agent_sdk_store_postgres/
event_archive.rs1use agent_sdk_core::{
2 AgentError, ArchiveCursor, CompiledEventFilter, EventArchive, EventArchiveReader, EventFrame,
3 domain::ArchiveCursorId,
4};
5use serde_json::Value;
6
7use crate::{
8 PostgresStoreClient,
9 util::{decode_row, json_value},
10};
11
12#[derive(Clone)]
13pub struct PostgresEventArchive {
14 client: PostgresStoreClient,
15}
16
17impl PostgresEventArchive {
18 pub fn new(client: PostgresStoreClient) -> Self {
19 Self { client }
20 }
21
22 pub fn append_frame(&self, frame: EventFrame) -> Result<ArchiveCursor, AgentError> {
23 let response = self.client.execute(
24 format!("insert into {} (store_scope, event_id, frame_json) values ($1, $2, $3) returning archive_seq", self.client.table("agent_sdk_event_archive")),
25 vec![self.client.scope(), Value::String(frame.event.envelope.event_id.as_str().to_string()), json_value(&frame)?],
26 )?;
27 let seq = response
28 .rows
29 .first()
30 .and_then(|row| row.get("archive_seq"))
31 .and_then(Value::as_u64)
32 .unwrap_or(1);
33 Ok(ArchiveCursor {
34 archive_id: ArchiveCursorId::new("archive.postgres.default"),
35 position: format!("archive.{seq}"),
36 event_id: Some(frame.event.envelope.event_id),
37 watermark: None,
38 })
39 }
40}
41
42impl EventArchiveReader for PostgresEventArchive {
43 fn frames_after(&self, cursor: Option<ArchiveCursor>) -> Result<Vec<EventFrame>, AgentError> {
44 let after = cursor
45 .and_then(|cursor| {
46 cursor
47 .position
48 .strip_prefix("archive.")
49 .unwrap_or(&cursor.position)
50 .parse::<u64>()
51 .ok()
52 })
53 .unwrap_or(0);
54 let response = self.client.execute(
55 format!("select frame_json from {} where store_scope = $1 and archive_seq > $2 order by archive_seq asc", self.client.table("agent_sdk_event_archive")),
56 vec![self.client.scope(), Value::from(after)],
57 )?;
58 response
59 .rows
60 .into_iter()
61 .map(|row| decode_row(row, "frame_json"))
62 .collect()
63 }
64}
65
66impl EventArchive for PostgresEventArchive {
67 fn replay_filtered_from_cursor(
68 &self,
69 filter: CompiledEventFilter,
70 cursor: ArchiveCursor,
71 ) -> Result<agent_sdk_core::AgentEventStream, AgentError> {
72 let frames = self
73 .frames_after(Some(cursor))?
74 .into_iter()
75 .filter(|frame| filter.matches_envelope(&frame.event.envelope))
76 .collect::<Vec<_>>();
77 Ok(agent_sdk_core::AgentEventStream::new(frames))
78 }
79}