Skip to main content

agent_sdk_store_file/
journal.rs

1use std::path::{Path, PathBuf};
2
3use agent_sdk_core::{
4    AgentError, JournalCursor, JournalRecord, RunId, RunJournal, RunJournalReader,
5    journal::JOURNAL_SCHEMA_VERSION,
6};
7
8use crate::util::{append_json_line, journal_error, read_json_lines, root_join, safe_segment};
9
/// Run journal adapter that keeps its records on the local filesystem.
///
/// All paths produced by this adapter are derived from the single root
/// directory stored here.
#[derive(Debug, Clone)]
pub struct FileRunJournal {
    /// Directory from which journal file paths are built.
    root: PathBuf,
}
15
16impl FileRunJournal {
17    /// Creates a journal adapter rooted under the provided directory.
18    pub fn new(root: impl Into<PathBuf>) -> Self {
19        Self { root: root.into() }
20    }
21
22    /// Returns the path used for one run's append-only journal file.
23    pub fn journal_path(&self, run_id: &RunId) -> PathBuf {
24        root_join(
25            &self.root,
26            &[
27                "runs".to_string(),
28                safe_segment(run_id.as_str()),
29                "journal.ndjson".to_string(),
30            ],
31        )
32    }
33
34    fn records_at(path: &Path) -> Result<Vec<JournalRecord>, AgentError> {
35        read_json_lines(path).map_err(|error| journal_error(error.context().message))
36    }
37}
38
39impl RunJournal for FileRunJournal {
40    fn append(&self, record: JournalRecord) -> Result<JournalCursor, AgentError> {
41        if record.journal_schema_version != JOURNAL_SCHEMA_VERSION {
42            return Err(journal_error("journal record schema version mismatch"));
43        }
44        let path = self.journal_path(&record.run_id);
45        let records = Self::records_at(&path)?;
46        if let Some(existing) = records
47            .iter()
48            .find(|existing| existing.record_id == record.record_id)
49        {
50            return Ok(JournalCursor::new(format!(
51                "journal.{}",
52                existing.journal_seq
53            )));
54        }
55        if records
56            .last()
57            .is_some_and(|existing| record.journal_seq <= existing.journal_seq)
58        {
59            return Err(journal_error(
60                "journal_seq must be strictly increasing for a run journal",
61            ));
62        }
63        append_json_line(&path, &record).map_err(|error| journal_error(error.context().message))?;
64        Ok(JournalCursor::new(format!(
65            "journal.{}",
66            record.journal_seq
67        )))
68    }
69}
70
71impl RunJournalReader for FileRunJournal {
72    fn records_for_run(&self, run_id: &RunId) -> Result<Vec<JournalRecord>, AgentError> {
73        let mut records = Self::records_at(&self.journal_path(run_id))?
74            .into_iter()
75            .filter(|record| &record.run_id == run_id)
76            .collect::<Vec<_>>();
77        records.sort_by_key(|record| record.journal_seq);
78        Ok(records)
79    }
80}