use std::collections::HashMap;
use std::path::PathBuf;
use super::LogFile;
/// Writes the log files derived from chunks of type `C` under a directory,
/// skipping files whose content matches what was last recorded for their path.
pub struct LogWriter<C> {
/// Directory that each file's relative path is joined onto before writing.
logs_dir: PathBuf,
/// Turns a chunk into the files to write; `None` means the chunk yields nothing.
produce: fn(&C) -> Option<Vec<LogFile>>,
/// Id of the last file of the first non-empty batch seen; `None` until then.
primary_id: Option<String>,
/// Last content recorded for each file path; `write` uses it to skip files
/// whose content has not changed.
buffer: HashMap<String, Vec<u8>>,
}
impl<C> LogWriter<C> {
    /// Creates a writer that stores files under `logs_dir`.
    ///
    /// `produce` is called once per [`write`](Self::write) to turn the chunk
    /// into the files to persist; returning `None` makes that call a no-op.
    pub fn new(
        logs_dir: PathBuf,
        produce: fn(&C) -> Option<Vec<LogFile>>,
    ) -> Self {
        Self {
            logs_dir,
            produce,
            primary_id: None,
            buffer: HashMap::new(),
        }
    }

    /// Returns the id of the last file of the first non-empty batch produced
    /// so far, or `None` if no batch has contained a file yet.
    pub fn primary_id(&self) -> Option<&str> {
        self.primary_id.as_deref()
    }

    /// Persists the files produced from `chunk`, writing only those whose
    /// content differs from what was last successfully written to that path.
    ///
    /// Parent directories are created as needed and all changed files are
    /// written concurrently.
    ///
    /// # Errors
    ///
    /// Returns `Error::Write` carrying the offending path if creating a parent
    /// directory or writing a file fails. In that case every path attempted in
    /// this call is forgotten by the change cache, so the next call rewrites
    /// it instead of assuming the on-disk content is current.
    pub async fn write(&mut self, chunk: &C) -> Result<(), super::super::Error> {
        let files = match (self.produce)(chunk) {
            Some(files) => files,
            None => return Ok(()),
        };

        // The primary id is fixed by the first batch that contains any file.
        if self.primary_id.is_none() {
            self.primary_id = files.last().map(|file| file.id.clone());
        }

        // Contents staged by this call. They are merged into `self.buffer`
        // only after every write has succeeded, so a failed or cancelled call
        // can never leave the cache claiming content that is not on disk.
        let mut staged: HashMap<String, Vec<u8>> = HashMap::new();
        let mut changed: Vec<LogFile> = Vec::new();
        for file in files {
            let path = file.path();
            // Entries staged earlier in this batch take precedence over the
            // persisted cache, so duplicates within one batch are still skipped.
            let unchanged = staged
                .get(&path)
                .or_else(|| self.buffer.get(&path))
                .map_or(false, |prev| prev == &file.content);
            if unchanged {
                continue;
            }
            // Forget the old content now: once the write starts, the on-disk
            // state is unknown until it completes successfully.
            self.buffer.remove(&path);
            staged.insert(path, file.content.clone());
            changed.push(file);
        }

        let logs_dir = &self.logs_dir;
        futures::future::try_join_all(changed.into_iter().map(|file| {
            let full_path = logs_dir.join(file.path());
            async move {
                if let Some(parent) = full_path.parent() {
                    tokio::fs::create_dir_all(parent).await
                        .map_err(|e| super::super::Error::Write(parent.to_path_buf(), e))?;
                }
                tokio::fs::write(&full_path, file.content).await
                    .map_err(|e| super::super::Error::Write(full_path, e))
            }
        })).await?;

        // Every write succeeded: commit the staged contents to the cache.
        self.buffer.extend(staged);
        Ok(())
    }
}