Skip to main content

objectiveai_sdk/filesystem/logs/
writer.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3
4use super::LogFile;
5
6/// Writes streaming chunks to the log file structure on disk.
7///
8/// `C` is the chunk type. The `produce` function pointer extracts
9/// [`LogFile`]s from each chunk.
10///
11/// Maintains a buffer of previously written file contents so that
12/// unchanged files are not rewritten on every chunk.
13pub struct LogWriter<C> {
14    logs_dir: PathBuf,
15    produce: fn(&C) -> Option<Vec<LogFile>>,
16    primary_id: Option<String>,
17    buffer: HashMap<String, Vec<u8>>,
18}
19
20impl<C> LogWriter<C> {
21    pub fn new(
22        logs_dir: PathBuf,
23        produce: fn(&C) -> Option<Vec<LogFile>>,
24    ) -> Self {
25        Self {
26            logs_dir,
27            produce,
28            primary_id: None,
29            buffer: HashMap::new(),
30        }
31    }
32
33    /// The ID of the primary (root) log entry.
34    ///
35    /// Returns `None` until at least one chunk has been written.
36    pub fn primary_id(&self) -> Option<&str> {
37        self.primary_id.as_deref()
38    }
39
40    /// Write a chunk to disk. Files whose content hasn't changed since the
41    /// last write are skipped.
42    pub async fn write(&mut self, chunk: &C) -> Result<(), super::super::Error> {
43        let files = match (self.produce)(chunk) {
44            Some(files) => files,
45            None => return Ok(()),
46        };
47
48        // The last file is always the root — capture its id on first write
49        if self.primary_id.is_none() {
50            if let Some(last) = files.last() {
51                self.primary_id = Some(last.id.clone());
52            }
53        }
54
55        // Filter out files whose content matches the buffer
56        let changed: Vec<LogFile> = files.into_iter().filter(|file| {
57            let path = file.path();
58            if self.buffer.get(&path).map_or(false, |prev| prev == &file.content) {
59                return false;
60            }
61            self.buffer.insert(path, file.content.clone());
62            true
63        }).collect();
64
65        futures::future::try_join_all(changed.into_iter().map(|file| {
66            let full_path = self.logs_dir.join(file.path());
67            async move {
68                if let Some(parent) = full_path.parent() {
69                    tokio::fs::create_dir_all(parent).await
70                        .map_err(|e| super::super::Error::Write(parent.to_path_buf(), e))?;
71                }
72                tokio::fs::write(&full_path, file.content).await
73                    .map_err(|e| super::super::Error::Write(full_path, e))
74            }
75        })).await?;
76
77        Ok(())
78    }
79}