Skip to main content

agent_sdk_store_postgres/
event_archive.rs

1use agent_sdk_core::{
2    AgentError, ArchiveCursor, CompiledEventFilter, EventArchive, EventArchiveReader, EventFrame,
3    domain::ArchiveCursorId,
4};
5use serde_json::Value;
6
7use crate::{
8    PostgresStoreClient,
9    util::{decode_row, json_value},
10};
11
12#[derive(Clone)]
13pub struct PostgresEventArchive {
14    client: PostgresStoreClient,
15}
16
17impl PostgresEventArchive {
18    pub fn new(client: PostgresStoreClient) -> Self {
19        Self { client }
20    }
21
22    pub fn append_frame(&self, frame: EventFrame) -> Result<ArchiveCursor, AgentError> {
23        let response = self.client.execute(
24            format!("insert into {} (store_scope, event_id, frame_json) values ($1, $2, $3) returning archive_seq", self.client.table("agent_sdk_event_archive")),
25                vec![self.client.scope(), Value::String(frame.event.envelope.event_id.as_str().to_string()), json_value(&frame)?],
26        )?;
27        let seq = response
28            .rows
29            .first()
30            .and_then(|row| row.get("archive_seq"))
31            .and_then(Value::as_u64)
32            .unwrap_or(1);
33        Ok(ArchiveCursor {
34            archive_id: ArchiveCursorId::new("archive.postgres.default"),
35            position: format!("archive.{seq}"),
36            event_id: Some(frame.event.envelope.event_id),
37            watermark: None,
38        })
39    }
40}
41
42impl EventArchiveReader for PostgresEventArchive {
43    fn frames_after(&self, cursor: Option<ArchiveCursor>) -> Result<Vec<EventFrame>, AgentError> {
44        let after = cursor
45            .and_then(|cursor| {
46                cursor
47                    .position
48                    .strip_prefix("archive.")
49                    .unwrap_or(&cursor.position)
50                    .parse::<u64>()
51                    .ok()
52            })
53            .unwrap_or(0);
54        let response = self.client.execute(
55            format!("select frame_json from {} where store_scope = $1 and archive_seq > $2 order by archive_seq asc", self.client.table("agent_sdk_event_archive")),
56            vec![self.client.scope(), Value::from(after)],
57        )?;
58        response
59            .rows
60            .into_iter()
61            .map(|row| decode_row(row, "frame_json"))
62            .collect()
63    }
64}
65
66impl EventArchive for PostgresEventArchive {
67    fn replay_filtered_from_cursor(
68        &self,
69        filter: CompiledEventFilter,
70        cursor: ArchiveCursor,
71    ) -> Result<agent_sdk_core::AgentEventStream, AgentError> {
72        let frames = self
73            .frames_after(Some(cursor))?
74            .into_iter()
75            .filter(|frame| filter.matches_envelope(&frame.event.envelope))
76            .collect::<Vec<_>>();
77        Ok(agent_sdk_core::AgentEventStream::new(frames))
78    }
79}