agent_sdk_store_postgres/
agent_pool.rs1use agent_sdk_core::{
2 AgentError, AgentPoolId, AgentPoolMember, AgentPoolMessagePolicy, AgentPoolSnapshot,
3 AgentPoolStore, AgentPoolStoreConfig, AgentPoolStoreCursor, AgentPoolStoreRecord,
4 AgentPoolStoreRecordPayload, AgentPoolStoreStream, AgentPoolStoredWake, AgentPoolWakePolicy,
5 CompiledEventFilter, IdempotencyKey, MessageReceipt, RunId, RunMessage, WakeCondition,
6 WakeConditionId, WakeRegistration,
7};
8use serde_json::{Value, json};
9
10use crate::{PostgresStoreClient, util::json_value};
11
12#[derive(Clone)]
13pub struct PostgresAgentPoolStore {
14 client: PostgresStoreClient,
15}
16
17impl PostgresAgentPoolStore {
18 pub fn new(client: PostgresStoreClient) -> Self {
19 Self { client }
20 }
21}
22
23impl AgentPoolStore for PostgresAgentPoolStore {
24 fn open_pool(
25 &self,
26 pool_id: AgentPoolId,
27 config: AgentPoolStoreConfig,
28 ) -> Result<AgentPoolSnapshot, AgentError> {
29 self.client.execute(
30 format!(
31 "select state_json from {} where store_scope = $1 and pool_id = $2",
32 self.client.table("agent_sdk_agent_pools")
33 ),
34 vec![
35 self.client.scope(),
36 Value::String(pool_id.as_str().to_string()),
37 ],
38 )?;
39 Ok(AgentPoolSnapshot {
40 pool_id,
41 created: false,
42 members: Vec::new(),
43 topics: Vec::new(),
44 message_policy: config.message_policy,
45 wake_policy: config.wake_policy,
46 policy_refs: config.policy_refs,
47 messages: Vec::new(),
48 wakes: Vec::new(),
49 cursor: None,
50 })
51 }
52
53 fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError> {
54 Ok(AgentPoolSnapshot {
55 pool_id: pool_id.clone(),
56 created: false,
57 members: Vec::new(),
58 topics: Vec::new(),
59 message_policy: AgentPoolMessagePolicy::bounded_defaults(),
60 wake_policy: AgentPoolWakePolicy::safe_defaults(),
61 policy_refs: Vec::new(),
62 messages: Vec::new(),
63 wakes: Vec::new(),
64 cursor: None,
65 })
66 }
67
68 fn record_pool_created(
69 &self,
70 pool_id: &AgentPoolId,
71 ) -> Result<AgentPoolStoreCursor, AgentError> {
72 self.append_pool_record(pool_id, json!({"kind": "pool_created"}))
73 }
74
75 fn join_member(
76 &self,
77 pool_id: &AgentPoolId,
78 member: AgentPoolMember,
79 ) -> Result<AgentPoolStoreCursor, AgentError> {
80 self.append_pool_record(
81 pool_id,
82 json_value(&AgentPoolStoreRecordPayload::MemberJoined { member })?,
83 )
84 }
85
86 fn leave_member(
87 &self,
88 _pool_id: &AgentPoolId,
89 _run_id: &RunId,
90 ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError> {
91 Err(AgentError::contract_violation(
92 "scripted Postgres agent pool leave_member requires caller-provided row fixture",
93 ))
94 }
95
96 fn message_receipt(
97 &self,
98 _pool_id: &AgentPoolId,
99 _idempotency_key: &IdempotencyKey,
100 ) -> Result<Option<MessageReceipt>, AgentError> {
101 Ok(None)
102 }
103
104 fn record_message(
105 &self,
106 pool_id: &AgentPoolId,
107 message: RunMessage,
108 receipt: MessageReceipt,
109 ) -> Result<AgentPoolStoreCursor, AgentError> {
110 self.append_pool_record(
111 pool_id,
112 json_value(&AgentPoolStoreRecordPayload::RunMessage {
113 stored: agent_sdk_core::AgentPoolStoredMessage { message, receipt },
114 })?,
115 )
116 }
117
118 fn wake_registration(
119 &self,
120 _pool_id: &AgentPoolId,
121 _idempotency_key: &IdempotencyKey,
122 ) -> Result<Option<WakeRegistration>, AgentError> {
123 Ok(None)
124 }
125
126 fn wake(
127 &self,
128 _pool_id: &AgentPoolId,
129 _condition_id: &WakeConditionId,
130 ) -> Result<Option<AgentPoolStoredWake>, AgentError> {
131 Ok(None)
132 }
133
134 fn record_wake(
135 &self,
136 pool_id: &AgentPoolId,
137 condition: WakeCondition,
138 compiled_filter: CompiledEventFilter,
139 registration: WakeRegistration,
140 ) -> Result<AgentPoolStoreCursor, AgentError> {
141 let wake = AgentPoolStoredWake {
142 condition,
143 compiled_filter,
144 registration,
145 };
146 self.append_pool_record(
147 pool_id,
148 json_value(&AgentPoolStoreRecordPayload::Wake { stored: wake })?,
149 )
150 }
151
152 fn watch(
153 &self,
154 _pool_id: &AgentPoolId,
155 _cursor: Option<AgentPoolStoreCursor>,
156 ) -> Result<AgentPoolStoreStream, AgentError> {
157 Ok(AgentPoolStoreStream::new(Vec::<AgentPoolStoreRecord>::new()))
158 }
159
160 fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError> {
161 let response = self.client.execute(
162 format!(
163 "select {}.next_agent_pool_event_sequence($1, $2) as next_sequence",
164 self.client.config.schema
165 ),
166 vec![
167 self.client.scope(),
168 Value::String(pool_id.as_str().to_string()),
169 ],
170 )?;
171 Ok(response
172 .rows
173 .first()
174 .and_then(|row| row.get("next_sequence"))
175 .and_then(Value::as_u64)
176 .unwrap_or(1))
177 }
178}
179
180impl PostgresAgentPoolStore {
181 fn append_pool_record(
182 &self,
183 pool_id: &AgentPoolId,
184 payload: Value,
185 ) -> Result<AgentPoolStoreCursor, AgentError> {
186 let response = self.client.execute(
187 format!("insert into {} (store_scope, pool_id, payload_json) values ($1, $2, $3) returning seq", self.client.table("agent_sdk_agent_pool_records")),
188 vec![self.client.scope(), Value::String(pool_id.as_str().to_string()), payload],
189 )?;
190 let seq = response
191 .rows
192 .first()
193 .and_then(|row| row.get("seq"))
194 .and_then(Value::as_u64)
195 .unwrap_or(1);
196 Ok(AgentPoolStoreCursor::new(seq))
197 }
198}