agent_sdk_store_file/
journal.rs1use std::path::{Path, PathBuf};
2
3use agent_sdk_core::{
4 AgentError, JournalCursor, JournalRecord, RunId, RunJournal, RunJournalReader,
5 journal::JOURNAL_SCHEMA_VERSION,
6};
7
8use crate::util::{append_json_line, journal_error, read_json_lines, root_join, safe_segment};
9
10#[derive(Clone, Debug)]
11pub struct FileRunJournal {
13 root: PathBuf,
14}
15
16impl FileRunJournal {
17 pub fn new(root: impl Into<PathBuf>) -> Self {
19 Self { root: root.into() }
20 }
21
22 pub fn journal_path(&self, run_id: &RunId) -> PathBuf {
24 root_join(
25 &self.root,
26 &[
27 "runs".to_string(),
28 safe_segment(run_id.as_str()),
29 "journal.ndjson".to_string(),
30 ],
31 )
32 }
33
34 fn records_at(path: &Path) -> Result<Vec<JournalRecord>, AgentError> {
35 read_json_lines(path).map_err(|error| journal_error(error.context().message))
36 }
37}
38
39impl RunJournal for FileRunJournal {
40 fn append(&self, record: JournalRecord) -> Result<JournalCursor, AgentError> {
41 if record.journal_schema_version != JOURNAL_SCHEMA_VERSION {
42 return Err(journal_error("journal record schema version mismatch"));
43 }
44 let path = self.journal_path(&record.run_id);
45 let records = Self::records_at(&path)?;
46 if let Some(existing) = records
47 .iter()
48 .find(|existing| existing.record_id == record.record_id)
49 {
50 return Ok(JournalCursor::new(format!(
51 "journal.{}",
52 existing.journal_seq
53 )));
54 }
55 if records
56 .last()
57 .is_some_and(|existing| record.journal_seq <= existing.journal_seq)
58 {
59 return Err(journal_error(
60 "journal_seq must be strictly increasing for a run journal",
61 ));
62 }
63 append_json_line(&path, &record).map_err(|error| journal_error(error.context().message))?;
64 Ok(JournalCursor::new(format!(
65 "journal.{}",
66 record.journal_seq
67 )))
68 }
69}
70
71impl RunJournalReader for FileRunJournal {
72 fn records_for_run(&self, run_id: &RunId) -> Result<Vec<JournalRecord>, AgentError> {
73 let mut records = Self::records_at(&self.journal_path(run_id))?
74 .into_iter()
75 .filter(|record| &record.run_id == run_id)
76 .collect::<Vec<_>>();
77 records.sort_by_key(|record| record.journal_seq);
78 Ok(records)
79 }
80}