agent-sdk-store-postgres 0.1.0-alpha.4

Scripted Postgres-style durable store adapters for the Agent SDK.
Documentation
use agent_sdk_core::{
    AgentError, ArchiveCursor, CompiledEventFilter, EventArchive, EventArchiveReader, EventFrame,
    domain::ArchiveCursorId,
};
use serde_json::Value;

use crate::{
    PostgresStoreClient,
    util::{decode_row, json_value},
};

#[derive(Clone)]
pub struct PostgresEventArchive {
    client: PostgresStoreClient,
}

impl PostgresEventArchive {
    pub fn new(client: PostgresStoreClient) -> Self {
        Self { client }
    }

    pub fn append_frame(&self, frame: EventFrame) -> Result<ArchiveCursor, AgentError> {
        let response = self.client.execute(
            format!("insert into {} (store_scope, event_id, frame_json) values ($1, $2, $3) returning archive_seq", self.client.table("agent_sdk_event_archive")),
                vec![self.client.scope(), Value::String(frame.event.envelope.event_id.as_str().to_string()), json_value(&frame)?],
        )?;
        let seq = response
            .rows
            .first()
            .and_then(|row| row.get("archive_seq"))
            .and_then(Value::as_u64)
            .unwrap_or(1);
        Ok(ArchiveCursor {
            archive_id: ArchiveCursorId::new("archive.postgres.default"),
            position: format!("archive.{seq}"),
            event_id: Some(frame.event.envelope.event_id),
            watermark: None,
        })
    }
}

impl EventArchiveReader for PostgresEventArchive {
    fn frames_after(&self, cursor: Option<ArchiveCursor>) -> Result<Vec<EventFrame>, AgentError> {
        let after = cursor
            .and_then(|cursor| {
                cursor
                    .position
                    .strip_prefix("archive.")
                    .unwrap_or(&cursor.position)
                    .parse::<u64>()
                    .ok()
            })
            .unwrap_or(0);
        let response = self.client.execute(
            format!("select frame_json from {} where store_scope = $1 and archive_seq > $2 order by archive_seq asc", self.client.table("agent_sdk_event_archive")),
            vec![self.client.scope(), Value::from(after)],
        )?;
        response
            .rows
            .into_iter()
            .map(|row| decode_row(row, "frame_json"))
            .collect()
    }
}

impl EventArchive for PostgresEventArchive {
    fn replay_filtered_from_cursor(
        &self,
        filter: CompiledEventFilter,
        cursor: ArchiveCursor,
    ) -> Result<agent_sdk_core::AgentEventStream, AgentError> {
        let frames = self
            .frames_after(Some(cursor))?
            .into_iter()
            .filter(|frame| filter.matches_envelope(&frame.event.envelope))
            .collect::<Vec<_>>();
        Ok(agent_sdk_core::AgentEventStream::new(frames))
    }
}