agent-sdk-store-postgres 0.1.0-alpha.4

Scripted Postgres-style durable store adapters for the Agent SDK.
Documentation
use agent_sdk_core::{
    AgentError, AgentPoolId, AgentPoolMember, AgentPoolMessagePolicy, AgentPoolSnapshot,
    AgentPoolStore, AgentPoolStoreConfig, AgentPoolStoreCursor, AgentPoolStoreRecord,
    AgentPoolStoreRecordPayload, AgentPoolStoreStream, AgentPoolStoredWake, AgentPoolWakePolicy,
    CompiledEventFilter, IdempotencyKey, MessageReceipt, RunId, RunMessage, WakeCondition,
    WakeConditionId, WakeRegistration,
};
use serde_json::{Value, json};

use crate::{PostgresStoreClient, util::json_value};

#[derive(Clone)]
pub struct PostgresAgentPoolStore {
    client: PostgresStoreClient,
}

impl PostgresAgentPoolStore {
    pub fn new(client: PostgresStoreClient) -> Self {
        Self { client }
    }
}

impl AgentPoolStore for PostgresAgentPoolStore {
    fn open_pool(
        &self,
        pool_id: AgentPoolId,
        config: AgentPoolStoreConfig,
    ) -> Result<AgentPoolSnapshot, AgentError> {
        self.client.execute(
            format!(
                "select state_json from {} where store_scope = $1 and pool_id = $2",
                self.client.table("agent_sdk_agent_pools")
            ),
            vec![
                self.client.scope(),
                Value::String(pool_id.as_str().to_string()),
            ],
        )?;
        Ok(AgentPoolSnapshot {
            pool_id,
            created: false,
            members: Vec::new(),
            topics: Vec::new(),
            message_policy: config.message_policy,
            wake_policy: config.wake_policy,
            policy_refs: config.policy_refs,
            messages: Vec::new(),
            wakes: Vec::new(),
            cursor: None,
        })
    }

    fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError> {
        Ok(AgentPoolSnapshot {
            pool_id: pool_id.clone(),
            created: false,
            members: Vec::new(),
            topics: Vec::new(),
            message_policy: AgentPoolMessagePolicy::bounded_defaults(),
            wake_policy: AgentPoolWakePolicy::safe_defaults(),
            policy_refs: Vec::new(),
            messages: Vec::new(),
            wakes: Vec::new(),
            cursor: None,
        })
    }

    fn record_pool_created(
        &self,
        pool_id: &AgentPoolId,
    ) -> Result<AgentPoolStoreCursor, AgentError> {
        self.append_pool_record(pool_id, json!({"kind": "pool_created"}))
    }

    fn join_member(
        &self,
        pool_id: &AgentPoolId,
        member: AgentPoolMember,
    ) -> Result<AgentPoolStoreCursor, AgentError> {
        self.append_pool_record(
            pool_id,
            json_value(&AgentPoolStoreRecordPayload::MemberJoined { member })?,
        )
    }

    fn leave_member(
        &self,
        _pool_id: &AgentPoolId,
        _run_id: &RunId,
    ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError> {
        Err(AgentError::contract_violation(
            "scripted Postgres agent pool leave_member requires caller-provided row fixture",
        ))
    }

    fn message_receipt(
        &self,
        _pool_id: &AgentPoolId,
        _idempotency_key: &IdempotencyKey,
    ) -> Result<Option<MessageReceipt>, AgentError> {
        Ok(None)
    }

    fn record_message(
        &self,
        pool_id: &AgentPoolId,
        message: RunMessage,
        receipt: MessageReceipt,
    ) -> Result<AgentPoolStoreCursor, AgentError> {
        self.append_pool_record(
            pool_id,
            json_value(&AgentPoolStoreRecordPayload::RunMessage {
                stored: agent_sdk_core::AgentPoolStoredMessage { message, receipt },
            })?,
        )
    }

    fn wake_registration(
        &self,
        _pool_id: &AgentPoolId,
        _idempotency_key: &IdempotencyKey,
    ) -> Result<Option<WakeRegistration>, AgentError> {
        Ok(None)
    }

    fn wake(
        &self,
        _pool_id: &AgentPoolId,
        _condition_id: &WakeConditionId,
    ) -> Result<Option<AgentPoolStoredWake>, AgentError> {
        Ok(None)
    }

    fn record_wake(
        &self,
        pool_id: &AgentPoolId,
        condition: WakeCondition,
        compiled_filter: CompiledEventFilter,
        registration: WakeRegistration,
    ) -> Result<AgentPoolStoreCursor, AgentError> {
        let wake = AgentPoolStoredWake {
            condition,
            compiled_filter,
            registration,
        };
        self.append_pool_record(
            pool_id,
            json_value(&AgentPoolStoreRecordPayload::Wake { stored: wake })?,
        )
    }

    fn watch(
        &self,
        _pool_id: &AgentPoolId,
        _cursor: Option<AgentPoolStoreCursor>,
    ) -> Result<AgentPoolStoreStream, AgentError> {
        Ok(AgentPoolStoreStream::new(Vec::<AgentPoolStoreRecord>::new()))
    }

    fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError> {
        let response = self.client.execute(
            format!(
                "select {}.next_agent_pool_event_sequence($1, $2) as next_sequence",
                self.client.config.schema
            ),
            vec![
                self.client.scope(),
                Value::String(pool_id.as_str().to_string()),
            ],
        )?;
        Ok(response
            .rows
            .first()
            .and_then(|row| row.get("next_sequence"))
            .and_then(Value::as_u64)
            .unwrap_or(1))
    }
}

impl PostgresAgentPoolStore {
    fn append_pool_record(
        &self,
        pool_id: &AgentPoolId,
        payload: Value,
    ) -> Result<AgentPoolStoreCursor, AgentError> {
        let response = self.client.execute(
            format!("insert into {} (store_scope, pool_id, payload_json) values ($1, $2, $3) returning seq", self.client.table("agent_sdk_agent_pool_records")),
            vec![self.client.scope(), Value::String(pool_id.as_str().to_string()), payload],
        )?;
        let seq = response
            .rows
            .first()
            .and_then(|row| row.get("seq"))
            .and_then(Value::as_u64)
            .unwrap_or(1);
        Ok(AgentPoolStoreCursor::new(seq))
    }
}