objectiveai_sdk/filesystem/logs/
writer.rs1use std::collections::HashMap;
2use std::path::PathBuf;
3
4use super::LogFile;
5
6pub struct LogWriter<C> {
14 logs_dir: PathBuf,
15 produce: fn(&C) -> Option<Vec<LogFile>>,
16 primary_id: Option<String>,
17 buffer: HashMap<String, Vec<u8>>,
18}
19
20impl<C> LogWriter<C> {
21 pub fn new(
22 logs_dir: PathBuf,
23 produce: fn(&C) -> Option<Vec<LogFile>>,
24 ) -> Self {
25 Self {
26 logs_dir,
27 produce,
28 primary_id: None,
29 buffer: HashMap::new(),
30 }
31 }
32
33 pub fn primary_id(&self) -> Option<&str> {
37 self.primary_id.as_deref()
38 }
39
40 pub async fn write(&mut self, chunk: &C) -> Result<(), super::super::Error> {
43 let files = match (self.produce)(chunk) {
44 Some(files) => files,
45 None => return Ok(()),
46 };
47
48 if self.primary_id.is_none() {
50 if let Some(last) = files.last() {
51 self.primary_id = Some(last.id.clone());
52 }
53 }
54
55 let changed: Vec<LogFile> = files.into_iter().filter(|file| {
57 let path = file.path();
58 if self.buffer.get(&path).map_or(false, |prev| prev == &file.content) {
59 return false;
60 }
61 self.buffer.insert(path, file.content.clone());
62 true
63 }).collect();
64
65 futures::future::try_join_all(changed.into_iter().map(|file| {
66 let full_path = self.logs_dir.join(file.path());
67 async move {
68 if let Some(parent) = full_path.parent() {
69 tokio::fs::create_dir_all(parent).await
70 .map_err(|e| super::super::Error::Write(parent.to_path_buf(), e))?;
71 }
72 tokio::fs::write(&full_path, file.content).await
73 .map_err(|e| super::super::Error::Write(full_path, e))
74 }
75 })).await?;
76
77 Ok(())
78 }
79}