agent_sdk_store_sqlite/
event_archive.rs1use std::path::{Path, PathBuf};
2
3use agent_sdk_core::{
4 AgentError, AgentEventStream, ArchiveCursor, CompiledEventFilter, EventArchive,
5 EventArchiveReader, EventFrame, domain::ArchiveCursorId,
6};
7use rusqlite::params;
8
9use crate::util::{decode, encode, open, sqlite_error};
10
/// DDL for the archive's backing table, handed to `crate::util::init` by
/// [`SqliteEventArchive::open`].
///
/// `event_archive_frames` has one row per archived frame:
/// - `seq`: integer primary key; the reader orders and filters on it.
/// - `event_id`: id of the event wrapped by the frame.
/// - `frame_json`: the frame as produced by `encode`.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS event_archive_frames (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    event_id TEXT NOT NULL,
    frame_json TEXT NOT NULL
);
";
18
/// Event archive backed by a SQLite database file.
///
/// The value holds only the database path and an archive id; every operation
/// calls `open` on the path to obtain its own connection.
#[derive(Clone, Debug)]
pub struct SqliteEventArchive {
    /// Path of the SQLite database file passed to `open` on each operation.
    path: PathBuf,
    /// Archive id stamped on cursors minted by `append_frame`.
    archive_id: ArchiveCursorId,
}
25
26impl SqliteEventArchive {
27 pub fn open(path: impl AsRef<Path>) -> Result<Self, AgentError> {
29 crate::util::init(path.as_ref(), SCHEMA)?;
30 Ok(Self {
31 path: path.as_ref().to_path_buf(),
32 archive_id: ArchiveCursorId::new("archive.sqlite.default"),
33 })
34 }
35
36 pub fn append_frame(&self, mut frame: EventFrame) -> Result<ArchiveCursor, AgentError> {
38 let connection = open(&self.path)?;
39 let next_seq = connection
40 .query_row(
41 "SELECT COALESCE(MAX(seq), 0) + 1 FROM event_archive_frames",
42 [],
43 |row| row.get::<_, i64>(0),
44 )
45 .map_err(sqlite_error)?;
46 let cursor = frame
47 .archive_cursor
48 .clone()
49 .unwrap_or_else(|| ArchiveCursor {
50 archive_id: self.archive_id.clone(),
51 position: format!("archive.{next_seq}"),
52 event_id: Some(frame.event.envelope.event_id.clone()),
53 watermark: None,
54 });
55 frame.archive_cursor = Some(cursor.clone());
56 connection
57 .execute(
58 "INSERT INTO event_archive_frames (seq, event_id, frame_json)
59 VALUES (?1, ?2, ?3)",
60 params![
61 next_seq,
62 frame.event.envelope.event_id.as_str(),
63 encode(&frame)?,
64 ],
65 )
66 .map_err(sqlite_error)?;
67 Ok(cursor)
68 }
69}
70
71impl EventArchiveReader for SqliteEventArchive {
72 fn frames_after(&self, cursor: Option<ArchiveCursor>) -> Result<Vec<EventFrame>, AgentError> {
73 let start_after = cursor
74 .and_then(|cursor| {
75 cursor
76 .position
77 .strip_prefix("archive.")
78 .unwrap_or(&cursor.position)
79 .parse::<i64>()
80 .ok()
81 })
82 .unwrap_or(0);
83 let connection = open(&self.path)?;
84 let mut statement = connection
85 .prepare(
86 "SELECT frame_json FROM event_archive_frames
87 WHERE seq > ?1 ORDER BY seq ASC",
88 )
89 .map_err(sqlite_error)?;
90 let rows = statement
91 .query_map(params![start_after], |row| row.get::<_, String>(0))
92 .map_err(sqlite_error)?;
93 let mut frames = Vec::new();
94 for row in rows {
95 frames.push(decode(&row.map_err(sqlite_error)?)?);
96 }
97 Ok(frames)
98 }
99}
100
101impl EventArchive for SqliteEventArchive {
102 fn replay_filtered_from_cursor(
103 &self,
104 filter: CompiledEventFilter,
105 cursor: ArchiveCursor,
106 ) -> Result<AgentEventStream, AgentError> {
107 let frames = self
108 .frames_after(Some(cursor))?
109 .into_iter()
110 .filter(|frame| filter.matches_envelope(&frame.event.envelope))
111 .collect::<Vec<_>>();
112 Ok(AgentEventStream::new(frames))
113 }
114}