Skip to main content

agent_sdk_store_sqlite/
event_archive.rs

1use std::path::{Path, PathBuf};
2
3use agent_sdk_core::{
4    AgentError, AgentEventStream, ArchiveCursor, CompiledEventFilter, EventArchive,
5    EventArchiveReader, EventFrame, domain::ArchiveCursorId,
6};
7use rusqlite::params;
8
9use crate::util::{decode, encode, open, sqlite_error};
10
/// DDL for the archive table, passed to `crate::util::init` when an archive is
/// opened.
///
/// `seq` is the integer primary key that orders frames, `event_id` stores the
/// event identifier, and `frame_json` holds the encoded frame.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS event_archive_frames (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    event_id TEXT NOT NULL,
    frame_json TEXT NOT NULL
);
";
18
19#[derive(Clone, Debug)]
20/// SQLite-backed event archive.
21pub struct SqliteEventArchive {
22    path: PathBuf,
23    archive_id: ArchiveCursorId,
24}
25
26impl SqliteEventArchive {
27    /// Opens or creates a SQLite event archive.
28    pub fn open(path: impl AsRef<Path>) -> Result<Self, AgentError> {
29        crate::util::init(path.as_ref(), SCHEMA)?;
30        Ok(Self {
31            path: path.as_ref().to_path_buf(),
32            archive_id: ArchiveCursorId::new("archive.sqlite.default"),
33        })
34    }
35
36    /// Appends one event frame and assigns an archive cursor when absent.
37    pub fn append_frame(&self, mut frame: EventFrame) -> Result<ArchiveCursor, AgentError> {
38        let connection = open(&self.path)?;
39        let next_seq = connection
40            .query_row(
41                "SELECT COALESCE(MAX(seq), 0) + 1 FROM event_archive_frames",
42                [],
43                |row| row.get::<_, i64>(0),
44            )
45            .map_err(sqlite_error)?;
46        let cursor = frame
47            .archive_cursor
48            .clone()
49            .unwrap_or_else(|| ArchiveCursor {
50                archive_id: self.archive_id.clone(),
51                position: format!("archive.{next_seq}"),
52                event_id: Some(frame.event.envelope.event_id.clone()),
53                watermark: None,
54            });
55        frame.archive_cursor = Some(cursor.clone());
56        connection
57            .execute(
58                "INSERT INTO event_archive_frames (seq, event_id, frame_json)
59                 VALUES (?1, ?2, ?3)",
60                params![
61                    next_seq,
62                    frame.event.envelope.event_id.as_str(),
63                    encode(&frame)?,
64                ],
65            )
66            .map_err(sqlite_error)?;
67        Ok(cursor)
68    }
69}
70
71impl EventArchiveReader for SqliteEventArchive {
72    fn frames_after(&self, cursor: Option<ArchiveCursor>) -> Result<Vec<EventFrame>, AgentError> {
73        let start_after = cursor
74            .and_then(|cursor| {
75                cursor
76                    .position
77                    .strip_prefix("archive.")
78                    .unwrap_or(&cursor.position)
79                    .parse::<i64>()
80                    .ok()
81            })
82            .unwrap_or(0);
83        let connection = open(&self.path)?;
84        let mut statement = connection
85            .prepare(
86                "SELECT frame_json FROM event_archive_frames
87                 WHERE seq > ?1 ORDER BY seq ASC",
88            )
89            .map_err(sqlite_error)?;
90        let rows = statement
91            .query_map(params![start_after], |row| row.get::<_, String>(0))
92            .map_err(sqlite_error)?;
93        let mut frames = Vec::new();
94        for row in rows {
95            frames.push(decode(&row.map_err(sqlite_error)?)?);
96        }
97        Ok(frames)
98    }
99}
100
101impl EventArchive for SqliteEventArchive {
102    fn replay_filtered_from_cursor(
103        &self,
104        filter: CompiledEventFilter,
105        cursor: ArchiveCursor,
106    ) -> Result<AgentEventStream, AgentError> {
107        let frames = self
108            .frames_after(Some(cursor))?
109            .into_iter()
110            .filter(|frame| filter.matches_envelope(&frame.event.envelope))
111            .collect::<Vec<_>>();
112        Ok(AgentEventStream::new(frames))
113    }
114}