agent_sdk_store_sqlite/
journal.rs1use std::path::{Path, PathBuf};
2
3use agent_sdk_core::{
4 AgentError, JournalCursor, JournalRecord, RunId, RunJournal, RunJournalReader,
5 journal::JOURNAL_SCHEMA_VERSION,
6};
7use rusqlite::params;
8
9use crate::util::{decode, encode, journal_error, open, sqlite_error};
10
11const SCHEMA: &str = "
12CREATE TABLE IF NOT EXISTS run_journal_records (
13 run_id TEXT NOT NULL,
14 journal_seq INTEGER NOT NULL,
15 record_id TEXT NOT NULL,
16 record_json TEXT NOT NULL,
17 PRIMARY KEY (run_id, journal_seq)
18);
19CREATE UNIQUE INDEX IF NOT EXISTS idx_run_journal_record_id
20ON run_journal_records(run_id, record_id);
21";
22
23#[derive(Clone, Debug)]
24pub struct SqliteRunJournal {
26 path: PathBuf,
27}
28
29impl SqliteRunJournal {
30 pub fn open(path: impl AsRef<Path>) -> Result<Self, AgentError> {
32 crate::util::init(path.as_ref(), SCHEMA)?;
33 Ok(Self {
34 path: path.as_ref().to_path_buf(),
35 })
36 }
37
38 pub fn path(&self) -> &Path {
40 &self.path
41 }
42}
43
44impl RunJournal for SqliteRunJournal {
45 fn append(&self, record: JournalRecord) -> Result<JournalCursor, AgentError> {
46 if record.journal_schema_version != JOURNAL_SCHEMA_VERSION {
47 return Err(journal_error("journal record schema version mismatch"));
48 }
49 let connection = open(&self.path)?;
50 if let Ok(existing_seq) = connection.query_row(
51 "SELECT journal_seq FROM run_journal_records
52 WHERE run_id = ?1 AND record_id = ?2",
53 params![record.run_id.as_str(), record.record_id],
54 |row| row.get::<_, i64>(0),
55 ) {
56 return Ok(JournalCursor::new(format!("journal.{existing_seq}")));
57 }
58 let latest_seq = connection
59 .query_row(
60 "SELECT COALESCE(MAX(journal_seq), 0)
61 FROM run_journal_records WHERE run_id = ?1",
62 params![record.run_id.as_str()],
63 |row| row.get::<_, i64>(0),
64 )
65 .map_err(sqlite_error)? as u64;
66 if record.journal_seq <= latest_seq {
67 return Err(journal_error(
68 "journal_seq must be strictly increasing for a run journal",
69 ));
70 }
71 connection
72 .execute(
73 "INSERT INTO run_journal_records
74 (run_id, journal_seq, record_id, record_json)
75 VALUES (?1, ?2, ?3, ?4)",
76 params![
77 record.run_id.as_str(),
78 record.journal_seq as i64,
79 record.record_id,
80 encode(&record)?,
81 ],
82 )
83 .map_err(sqlite_error)?;
84 Ok(JournalCursor::new(format!(
85 "journal.{}",
86 record.journal_seq
87 )))
88 }
89}
90
91impl RunJournalReader for SqliteRunJournal {
92 fn records_for_run(&self, run_id: &RunId) -> Result<Vec<JournalRecord>, AgentError> {
93 let connection = open(&self.path)?;
94 let mut statement = connection
95 .prepare(
96 "SELECT record_json FROM run_journal_records
97 WHERE run_id = ?1 ORDER BY journal_seq ASC",
98 )
99 .map_err(sqlite_error)?;
100 let rows = statement
101 .query_map(params![run_id.as_str()], |row| row.get::<_, String>(0))
102 .map_err(sqlite_error)?;
103 let mut records = Vec::new();
104 for row in rows {
105 records.push(decode(&row.map_err(sqlite_error)?)?);
106 }
107 Ok(records)
108 }
109}