Skip to main content

agent_sdk_store_sqlite/
journal.rs

1use std::path::{Path, PathBuf};
2
3use agent_sdk_core::{
4    AgentError, JournalCursor, JournalRecord, RunId, RunJournal, RunJournalReader,
5    journal::JOURNAL_SCHEMA_VERSION,
6};
7use rusqlite::params;
8
9use crate::util::{decode, encode, journal_error, open, sqlite_error};
10
11const SCHEMA: &str = "
12CREATE TABLE IF NOT EXISTS run_journal_records (
13    run_id TEXT NOT NULL,
14    journal_seq INTEGER NOT NULL,
15    record_id TEXT NOT NULL,
16    record_json TEXT NOT NULL,
17    PRIMARY KEY (run_id, journal_seq)
18);
19CREATE UNIQUE INDEX IF NOT EXISTS idx_run_journal_record_id
20ON run_journal_records(run_id, record_id);
21";
22
23#[derive(Clone, Debug)]
24/// SQLite-backed run journal adapter.
25pub struct SqliteRunJournal {
26    path: PathBuf,
27}
28
29impl SqliteRunJournal {
30    /// Opens or creates a SQLite run journal.
31    pub fn open(path: impl AsRef<Path>) -> Result<Self, AgentError> {
32        crate::util::init(path.as_ref(), SCHEMA)?;
33        Ok(Self {
34            path: path.as_ref().to_path_buf(),
35        })
36    }
37
38    /// Returns the backing SQLite database path.
39    pub fn path(&self) -> &Path {
40        &self.path
41    }
42}
43
44impl RunJournal for SqliteRunJournal {
45    fn append(&self, record: JournalRecord) -> Result<JournalCursor, AgentError> {
46        if record.journal_schema_version != JOURNAL_SCHEMA_VERSION {
47            return Err(journal_error("journal record schema version mismatch"));
48        }
49        let connection = open(&self.path)?;
50        if let Ok(existing_seq) = connection.query_row(
51            "SELECT journal_seq FROM run_journal_records
52             WHERE run_id = ?1 AND record_id = ?2",
53            params![record.run_id.as_str(), record.record_id],
54            |row| row.get::<_, i64>(0),
55        ) {
56            return Ok(JournalCursor::new(format!("journal.{existing_seq}")));
57        }
58        let latest_seq = connection
59            .query_row(
60                "SELECT COALESCE(MAX(journal_seq), 0)
61                 FROM run_journal_records WHERE run_id = ?1",
62                params![record.run_id.as_str()],
63                |row| row.get::<_, i64>(0),
64            )
65            .map_err(sqlite_error)? as u64;
66        if record.journal_seq <= latest_seq {
67            return Err(journal_error(
68                "journal_seq must be strictly increasing for a run journal",
69            ));
70        }
71        connection
72            .execute(
73                "INSERT INTO run_journal_records
74                 (run_id, journal_seq, record_id, record_json)
75                 VALUES (?1, ?2, ?3, ?4)",
76                params![
77                    record.run_id.as_str(),
78                    record.journal_seq as i64,
79                    record.record_id,
80                    encode(&record)?,
81                ],
82            )
83            .map_err(sqlite_error)?;
84        Ok(JournalCursor::new(format!(
85            "journal.{}",
86            record.journal_seq
87        )))
88    }
89}
90
91impl RunJournalReader for SqliteRunJournal {
92    fn records_for_run(&self, run_id: &RunId) -> Result<Vec<JournalRecord>, AgentError> {
93        let connection = open(&self.path)?;
94        let mut statement = connection
95            .prepare(
96                "SELECT record_json FROM run_journal_records
97                 WHERE run_id = ?1 ORDER BY journal_seq ASC",
98            )
99            .map_err(sqlite_error)?;
100        let rows = statement
101            .query_map(params![run_id.as_str()], |row| row.get::<_, String>(0))
102            .map_err(sqlite_error)?;
103        let mut records = Vec::new();
104        for row in rows {
105            records.push(decode(&row.map_err(sqlite_error)?)?);
106        }
107        Ok(records)
108    }
109}