1use std::{collections::BTreeMap, path::PathBuf};
2
3use agent_sdk_core::{
4 AgentError, AgentPoolId, AgentPoolMember, AgentPoolSnapshot, AgentPoolStore,
5 AgentPoolStoreConfig, AgentPoolStoreCursor, AgentPoolStoreRecord, AgentPoolStoreRecordPayload,
6 AgentPoolStoreStream, AgentPoolStoredMessage, AgentPoolStoredWake, IdempotencyKey,
7 MessageReceipt, RunId, RunMessage, WakeCondition, WakeConditionId, WakeRegistration,
8 agent_pool::AgentPoolStoredWake as CoreStoredWake, event::CompiledEventFilter,
9};
10use serde::{Deserialize, Serialize};
11
12use crate::util::{read_json, root_join, safe_segment, write_json};
13
/// File-backed [`AgentPoolStore`] implementation.
///
/// Each pool's state is read and written through the crate's `read_json` /
/// `write_json` helpers at `agent_pools/<safe_segment(pool id)>/state.json`
/// beneath `root` (see `state_path`).
#[derive(Debug, Clone)]
pub struct FileAgentPoolStore {
    /// Base directory under which every pool's state file is placed.
    root: PathBuf,
}
19
20#[derive(Clone, Debug, Deserialize, Serialize)]
21struct PoolState {
22 config: AgentPoolStoreConfig,
23 created: bool,
24 members: BTreeMap<RunId, AgentPoolMember>,
25 messages: BTreeMap<String, AgentPoolStoredMessage>,
26 message_dedupe: BTreeMap<IdempotencyKey, MessageReceipt>,
27 wakes: BTreeMap<WakeConditionId, AgentPoolStoredWake>,
28 wake_dedupe: BTreeMap<IdempotencyKey, WakeRegistration>,
29 records: Vec<AgentPoolStoreRecord>,
30 next_event_counter: u64,
31}
32
33impl PoolState {
34 fn new(config: AgentPoolStoreConfig) -> Self {
35 Self {
36 config,
37 created: false,
38 members: BTreeMap::new(),
39 messages: BTreeMap::new(),
40 message_dedupe: BTreeMap::new(),
41 wakes: BTreeMap::new(),
42 wake_dedupe: BTreeMap::new(),
43 records: Vec::new(),
44 next_event_counter: 0,
45 }
46 }
47
48 fn snapshot(&self, pool_id: AgentPoolId) -> AgentPoolSnapshot {
49 AgentPoolSnapshot {
50 pool_id,
51 created: self.created,
52 topics: self
53 .members
54 .values()
55 .flat_map(|member| member.topics.clone())
56 .collect(),
57 members: self.members.values().cloned().collect(),
58 message_policy: self.config.message_policy.clone(),
59 wake_policy: self.config.wake_policy.clone(),
60 policy_refs: self.config.policy_refs.clone(),
61 messages: self.messages.values().cloned().collect(),
62 wakes: self.wakes.values().cloned().collect(),
63 cursor: self.records.last().map(|record| record.cursor.clone()),
64 }
65 }
66}
67
68impl FileAgentPoolStore {
69 pub fn new(root: impl Into<PathBuf>) -> Self {
71 Self { root: root.into() }
72 }
73
74 fn state_path(&self, pool_id: &AgentPoolId) -> PathBuf {
75 root_join(
76 &self.root,
77 &[
78 "agent_pools".to_string(),
79 safe_segment(pool_id.as_str()),
80 "state.json".to_string(),
81 ],
82 )
83 }
84
85 fn load_state(&self, pool_id: &AgentPoolId) -> Result<Option<PoolState>, AgentError> {
86 read_json(&self.state_path(pool_id))
87 }
88
89 fn save_state(&self, pool_id: &AgentPoolId, state: &PoolState) -> Result<(), AgentError> {
90 write_json(&self.state_path(pool_id), state)
91 }
92
93 fn with_state<T>(
94 &self,
95 pool_id: &AgentPoolId,
96 f: impl FnOnce(&mut PoolState) -> Result<T, AgentError>,
97 ) -> Result<T, AgentError> {
98 let mut state = self
99 .load_state(pool_id)?
100 .ok_or_else(|| AgentError::host_configuration_needed("agent pool is not open"))?;
101 let output = f(&mut state)?;
102 self.save_state(pool_id, &state)?;
103 Ok(output)
104 }
105
106 fn append_record(
107 pool_id: &AgentPoolId,
108 state: &mut PoolState,
109 payload: AgentPoolStoreRecordPayload,
110 ) -> AgentPoolStoreCursor {
111 let cursor = AgentPoolStoreCursor::new(state.records.len() as u64 + 1);
112 state.records.push(AgentPoolStoreRecord {
113 pool_id: pool_id.clone(),
114 cursor: cursor.clone(),
115 payload,
116 });
117 cursor
118 }
119}
120
121impl AgentPoolStore for FileAgentPoolStore {
122 fn open_pool(
123 &self,
124 pool_id: AgentPoolId,
125 config: AgentPoolStoreConfig,
126 ) -> Result<AgentPoolSnapshot, AgentError> {
127 let state = if let Some(existing) = self.load_state(&pool_id)? {
128 if existing.config != config {
129 return Err(AgentError::contract_violation(
130 "agent pool store config conflicts with existing pool",
131 ));
132 }
133 existing
134 } else {
135 let mut state = PoolState::new(config.clone());
136 Self::append_record(
137 &pool_id,
138 &mut state,
139 AgentPoolStoreRecordPayload::PoolOpened { config },
140 );
141 state
142 };
143 let snapshot = state.snapshot(pool_id.clone());
144 self.save_state(&pool_id, &state)?;
145 Ok(snapshot)
146 }
147
148 fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError> {
149 self.load_state(pool_id)?
150 .map(|state| state.snapshot(pool_id.clone()))
151 .ok_or_else(|| AgentError::host_configuration_needed("agent pool is not open"))
152 }
153
154 fn record_pool_created(
155 &self,
156 pool_id: &AgentPoolId,
157 ) -> Result<AgentPoolStoreCursor, AgentError> {
158 self.with_state(pool_id, |state| {
159 state.created = true;
160 Ok(Self::append_record(
161 pool_id,
162 state,
163 AgentPoolStoreRecordPayload::PoolCreated,
164 ))
165 })
166 }
167
168 fn join_member(
169 &self,
170 pool_id: &AgentPoolId,
171 member: AgentPoolMember,
172 ) -> Result<AgentPoolStoreCursor, AgentError> {
173 self.with_state(pool_id, |state| {
174 state.members.insert(member.run_id.clone(), member.clone());
175 Ok(Self::append_record(
176 pool_id,
177 state,
178 AgentPoolStoreRecordPayload::MemberJoined { member },
179 ))
180 })
181 }
182
183 fn leave_member(
184 &self,
185 pool_id: &AgentPoolId,
186 run_id: &RunId,
187 ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError> {
188 self.with_state(pool_id, |state| {
189 let member = state.members.remove(run_id).ok_or_else(|| {
190 AgentError::contract_violation("run is not a member of this agent pool")
191 })?;
192 let cursor = Self::append_record(
193 pool_id,
194 state,
195 AgentPoolStoreRecordPayload::MemberLeft {
196 member: member.clone(),
197 },
198 );
199 Ok((member, cursor))
200 })
201 }
202
203 fn message_receipt(
204 &self,
205 pool_id: &AgentPoolId,
206 idempotency_key: &IdempotencyKey,
207 ) -> Result<Option<MessageReceipt>, AgentError> {
208 Ok(self
209 .load_state(pool_id)?
210 .and_then(|state| state.message_dedupe.get(idempotency_key).cloned()))
211 }
212
213 fn record_message(
214 &self,
215 pool_id: &AgentPoolId,
216 message: RunMessage,
217 receipt: MessageReceipt,
218 ) -> Result<AgentPoolStoreCursor, AgentError> {
219 self.with_state(pool_id, |state| {
220 let stored = AgentPoolStoredMessage {
221 message: message.clone(),
222 receipt: receipt.clone(),
223 };
224 state
225 .messages
226 .insert(message.message_id.as_str().to_string(), stored.clone());
227 state
228 .message_dedupe
229 .insert(message.idempotency_key.clone(), receipt);
230 Ok(Self::append_record(
231 pool_id,
232 state,
233 AgentPoolStoreRecordPayload::RunMessage { stored },
234 ))
235 })
236 }
237
238 fn wake_registration(
239 &self,
240 pool_id: &AgentPoolId,
241 idempotency_key: &IdempotencyKey,
242 ) -> Result<Option<WakeRegistration>, AgentError> {
243 Ok(self
244 .load_state(pool_id)?
245 .and_then(|state| state.wake_dedupe.get(idempotency_key).cloned()))
246 }
247
248 fn wake(
249 &self,
250 pool_id: &AgentPoolId,
251 condition_id: &WakeConditionId,
252 ) -> Result<Option<CoreStoredWake>, AgentError> {
253 Ok(self
254 .load_state(pool_id)?
255 .and_then(|state| state.wakes.get(condition_id).cloned()))
256 }
257
258 fn record_wake(
259 &self,
260 pool_id: &AgentPoolId,
261 condition: WakeCondition,
262 compiled_filter: CompiledEventFilter,
263 registration: WakeRegistration,
264 ) -> Result<AgentPoolStoreCursor, AgentError> {
265 self.with_state(pool_id, |state| {
266 let stored = AgentPoolStoredWake {
267 condition: condition.clone(),
268 compiled_filter,
269 registration: registration.clone(),
270 };
271 state
272 .wakes
273 .insert(condition.condition_id.clone(), stored.clone());
274 state
275 .wake_dedupe
276 .insert(condition.idempotency_key.clone(), registration);
277 Ok(Self::append_record(
278 pool_id,
279 state,
280 AgentPoolStoreRecordPayload::Wake { stored },
281 ))
282 })
283 }
284
285 fn watch(
286 &self,
287 pool_id: &AgentPoolId,
288 cursor: Option<AgentPoolStoreCursor>,
289 ) -> Result<AgentPoolStoreStream, AgentError> {
290 let after = cursor.map(|cursor| cursor.sequence).unwrap_or(0);
291 let records = self
292 .load_state(pool_id)?
293 .map(|state| {
294 state
295 .records
296 .into_iter()
297 .filter(|record| record.cursor.sequence > after)
298 .collect::<Vec<_>>()
299 })
300 .unwrap_or_default();
301 Ok(AgentPoolStoreStream::new(records))
302 }
303
304 fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError> {
305 self.with_state(pool_id, |state| {
306 state.next_event_counter += 1;
307 Ok(state.next_event_counter)
308 })
309 }
310}