Skip to main content

agent_sdk_store_postgres/
agent_pool.rs

1use agent_sdk_core::{
2    AgentError, AgentPoolId, AgentPoolMember, AgentPoolMessagePolicy, AgentPoolSnapshot,
3    AgentPoolStore, AgentPoolStoreConfig, AgentPoolStoreCursor, AgentPoolStoreRecord,
4    AgentPoolStoreRecordPayload, AgentPoolStoreStream, AgentPoolStoredWake, AgentPoolWakePolicy,
5    CompiledEventFilter, IdempotencyKey, MessageReceipt, RunId, RunMessage, WakeCondition,
6    WakeConditionId, WakeRegistration,
7};
8use serde_json::{Value, json};
9
10use crate::{PostgresStoreClient, util::json_value};
11
12#[derive(Clone)]
13pub struct PostgresAgentPoolStore {
14    client: PostgresStoreClient,
15}
16
17impl PostgresAgentPoolStore {
18    pub fn new(client: PostgresStoreClient) -> Self {
19        Self { client }
20    }
21}
22
23impl AgentPoolStore for PostgresAgentPoolStore {
24    fn open_pool(
25        &self,
26        pool_id: AgentPoolId,
27        config: AgentPoolStoreConfig,
28    ) -> Result<AgentPoolSnapshot, AgentError> {
29        self.client.execute(
30            format!(
31                "select state_json from {} where store_scope = $1 and pool_id = $2",
32                self.client.table("agent_sdk_agent_pools")
33            ),
34            vec![
35                self.client.scope(),
36                Value::String(pool_id.as_str().to_string()),
37            ],
38        )?;
39        Ok(AgentPoolSnapshot {
40            pool_id,
41            created: false,
42            members: Vec::new(),
43            topics: Vec::new(),
44            message_policy: config.message_policy,
45            wake_policy: config.wake_policy,
46            policy_refs: config.policy_refs,
47            messages: Vec::new(),
48            wakes: Vec::new(),
49            cursor: None,
50        })
51    }
52
53    fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError> {
54        Ok(AgentPoolSnapshot {
55            pool_id: pool_id.clone(),
56            created: false,
57            members: Vec::new(),
58            topics: Vec::new(),
59            message_policy: AgentPoolMessagePolicy::bounded_defaults(),
60            wake_policy: AgentPoolWakePolicy::safe_defaults(),
61            policy_refs: Vec::new(),
62            messages: Vec::new(),
63            wakes: Vec::new(),
64            cursor: None,
65        })
66    }
67
68    fn record_pool_created(
69        &self,
70        pool_id: &AgentPoolId,
71    ) -> Result<AgentPoolStoreCursor, AgentError> {
72        self.append_pool_record(pool_id, json!({"kind": "pool_created"}))
73    }
74
75    fn join_member(
76        &self,
77        pool_id: &AgentPoolId,
78        member: AgentPoolMember,
79    ) -> Result<AgentPoolStoreCursor, AgentError> {
80        self.append_pool_record(
81            pool_id,
82            json_value(&AgentPoolStoreRecordPayload::MemberJoined { member })?,
83        )
84    }
85
86    fn leave_member(
87        &self,
88        _pool_id: &AgentPoolId,
89        _run_id: &RunId,
90    ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError> {
91        Err(AgentError::contract_violation(
92            "scripted Postgres agent pool leave_member requires caller-provided row fixture",
93        ))
94    }
95
96    fn message_receipt(
97        &self,
98        _pool_id: &AgentPoolId,
99        _idempotency_key: &IdempotencyKey,
100    ) -> Result<Option<MessageReceipt>, AgentError> {
101        Ok(None)
102    }
103
104    fn record_message(
105        &self,
106        pool_id: &AgentPoolId,
107        message: RunMessage,
108        receipt: MessageReceipt,
109    ) -> Result<AgentPoolStoreCursor, AgentError> {
110        self.append_pool_record(
111            pool_id,
112            json_value(&AgentPoolStoreRecordPayload::RunMessage {
113                stored: agent_sdk_core::AgentPoolStoredMessage { message, receipt },
114            })?,
115        )
116    }
117
118    fn wake_registration(
119        &self,
120        _pool_id: &AgentPoolId,
121        _idempotency_key: &IdempotencyKey,
122    ) -> Result<Option<WakeRegistration>, AgentError> {
123        Ok(None)
124    }
125
126    fn wake(
127        &self,
128        _pool_id: &AgentPoolId,
129        _condition_id: &WakeConditionId,
130    ) -> Result<Option<AgentPoolStoredWake>, AgentError> {
131        Ok(None)
132    }
133
134    fn record_wake(
135        &self,
136        pool_id: &AgentPoolId,
137        condition: WakeCondition,
138        compiled_filter: CompiledEventFilter,
139        registration: WakeRegistration,
140    ) -> Result<AgentPoolStoreCursor, AgentError> {
141        let wake = AgentPoolStoredWake {
142            condition,
143            compiled_filter,
144            registration,
145        };
146        self.append_pool_record(
147            pool_id,
148            json_value(&AgentPoolStoreRecordPayload::Wake { stored: wake })?,
149        )
150    }
151
152    fn watch(
153        &self,
154        _pool_id: &AgentPoolId,
155        _cursor: Option<AgentPoolStoreCursor>,
156    ) -> Result<AgentPoolStoreStream, AgentError> {
157        Ok(AgentPoolStoreStream::new(Vec::<AgentPoolStoreRecord>::new()))
158    }
159
160    fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError> {
161        let response = self.client.execute(
162            format!(
163                "select {}.next_agent_pool_event_sequence($1, $2) as next_sequence",
164                self.client.config.schema
165            ),
166            vec![
167                self.client.scope(),
168                Value::String(pool_id.as_str().to_string()),
169            ],
170        )?;
171        Ok(response
172            .rows
173            .first()
174            .and_then(|row| row.get("next_sequence"))
175            .and_then(Value::as_u64)
176            .unwrap_or(1))
177    }
178}
179
180impl PostgresAgentPoolStore {
181    fn append_pool_record(
182        &self,
183        pool_id: &AgentPoolId,
184        payload: Value,
185    ) -> Result<AgentPoolStoreCursor, AgentError> {
186        let response = self.client.execute(
187            format!("insert into {} (store_scope, pool_id, payload_json) values ($1, $2, $3) returning seq", self.client.table("agent_sdk_agent_pool_records")),
188            vec![self.client.scope(), Value::String(pool_id.as_str().to_string()), payload],
189        )?;
190        let seq = response
191            .rows
192            .first()
193            .and_then(|row| row.get("seq"))
194            .and_then(Value::as_u64)
195            .unwrap_or(1);
196        Ok(AgentPoolStoreCursor::new(seq))
197    }
198}