use std::{
collections::HashMap,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use tokio::{fs::File, sync::Mutex};
use super::ConsumerOutput;
use crate::{error::IoriResult, StreamingSegment};
/// Consumer that stages segments as files on disk and copies them to stdout
/// in ascending sequence order.
///
/// Segments may finish out of order; finished ones are parked in `segments`
/// until every lower-numbered segment has been written out.
pub struct PipeConsumer {
/// Directory in which segment files are created.
output_dir: PathBuf,
/// When `true`, a segment file is deleted after it has been copied to stdout.
recycle: bool,
/// Sequence number of the next segment to copy to stdout; starts at 0.
next: Arc<AtomicU64>,
/// Finished segments waiting for their turn, keyed by sequence number.
segments: Arc<Mutex<HashMap<u64, PathBuf>>>,
}
impl PipeConsumer {
    /// Creates a consumer that stages segment files in `output_dir`.
    ///
    /// The directory (including missing parents) is created immediately.
    /// When `recycle` is `true`, each staged file is deleted once it has been
    /// copied to stdout.
    ///
    /// # Errors
    ///
    /// Returns an error if `output_dir` cannot be created.
    pub fn new<P>(output_dir: P, recycle: bool) -> IoriResult<Self>
    where
        P: Into<PathBuf>,
    {
        let output_dir = output_dir.into();
        std::fs::create_dir_all(&output_dir)?;

        Ok(Self {
            output_dir,
            recycle,
            next: Arc::new(AtomicU64::new(0)),
            segments: Arc::new(Mutex::new(HashMap::new())),
        })
    }

    /// Creates the staging file for `segment` and returns an output that writes to it.
    ///
    /// The file is named `{sequence:06}_{file_name}` inside the output
    /// directory. The returned output carries a finish callback which records
    /// the segment as ready and then copies every ready segment to stdout in
    /// sequence order, starting from the next expected sequence number.
    ///
    /// This implementation always returns `Ok(Some(_))` on success.
    ///
    /// # Errors
    ///
    /// Returns an error if the staging file cannot be created. The finish
    /// callback reports the first error hit while opening or deleting a
    /// staged file; a failing segment is skipped rather than blocking the
    /// segments queued behind it.
    pub async fn open_writer(
        &self,
        segment: &(impl StreamingSegment + Send + Sync + 'static),
    ) -> IoriResult<Option<ConsumerOutput>> {
        let filename = segment.file_name();
        let sequence = segment.sequence();
        let filename = format!("{sequence:06}_{filename}");
        let path = self.output_dir.join(filename);
        let file = File::create(&path).await?;

        let recycle = self.recycle;
        let next = self.next.clone();
        let segments = self.segments.clone();

        Ok(Some(ConsumerOutput::new(Box::pin(file)).on_finish(
            move || {
                Box::pin(async move {
                    // The lock is held for the whole drain so that concurrent
                    // finish callbacks cannot interleave their output.
                    let mut segments = segments.lock().await;
                    segments.insert(sequence, path);

                    // Drain every consecutive ready segment. The counter is
                    // advanced together with the map removal so the two can
                    // never disagree: bailing out between them would leave
                    // `next` pointing at an entry that no longer exists and
                    // stall all later output.
                    let mut outcome: std::io::Result<()> = Ok(());
                    while let Some(path) = segments.remove(&next.load(Ordering::Relaxed)) {
                        next.fetch_add(1, Ordering::Relaxed);

                        let emitted = Self::emit_segment(&path, recycle).await;
                        // Keep draining after a failure, but remember the
                        // first error so the caller still learns about it.
                        if outcome.is_ok() {
                            outcome = emitted;
                        }
                    }

                    outcome?;
                    Ok(())
                })
            },
        )))
    }

    /// Copies one staged segment file to stdout and optionally deletes it.
    ///
    /// NOTE(review): the copy uses blocking std I/O inside an async context,
    /// as the original code did.
    async fn emit_segment(path: &std::path::Path, recycle: bool) -> std::io::Result<()> {
        {
            let mut file = std::fs::File::open(path)?;
            // Best-effort: a failed write to stdout (e.g. the reader went
            // away) is deliberately not treated as an error.
            let _ = std::io::copy(&mut file, &mut std::io::stdout());
            // `file` is closed here, before any deletion below.
        }

        if recycle {
            tokio::fs::remove_file(path).await?;
        }
        Ok(())
    }
}