Skip to main content

agent_sdk_store_file/
agent_pool.rs

1use std::{collections::BTreeMap, path::PathBuf};
2
3use agent_sdk_core::{
4    AgentError, AgentPoolId, AgentPoolMember, AgentPoolSnapshot, AgentPoolStore,
5    AgentPoolStoreConfig, AgentPoolStoreCursor, AgentPoolStoreRecord, AgentPoolStoreRecordPayload,
6    AgentPoolStoreStream, AgentPoolStoredMessage, AgentPoolStoredWake, IdempotencyKey,
7    MessageReceipt, RunId, RunMessage, WakeCondition, WakeConditionId, WakeRegistration,
8    agent_pool::AgentPoolStoredWake as CoreStoredWake, event::CompiledEventFilter,
9};
10use serde::{Deserialize, Serialize};
11
12use crate::util::{read_json, root_join, safe_segment, write_json};
13
/// Filesystem-backed agent-pool coordination store.
///
/// The state of each pool is read from and written to a `state.json` path
/// built from `root`, the `agent_pools` segment, and the pool id passed
/// through `safe_segment`.
#[derive(Debug, Clone)]
pub struct FileAgentPoolStore {
    /// Base directory handed to `root_join` when building state paths.
    root: PathBuf,
}
19
20#[derive(Clone, Debug, Deserialize, Serialize)]
21struct PoolState {
22    config: AgentPoolStoreConfig,
23    created: bool,
24    members: BTreeMap<RunId, AgentPoolMember>,
25    messages: BTreeMap<String, AgentPoolStoredMessage>,
26    message_dedupe: BTreeMap<IdempotencyKey, MessageReceipt>,
27    wakes: BTreeMap<WakeConditionId, AgentPoolStoredWake>,
28    wake_dedupe: BTreeMap<IdempotencyKey, WakeRegistration>,
29    records: Vec<AgentPoolStoreRecord>,
30    next_event_counter: u64,
31}
32
33impl PoolState {
34    fn new(config: AgentPoolStoreConfig) -> Self {
35        Self {
36            config,
37            created: false,
38            members: BTreeMap::new(),
39            messages: BTreeMap::new(),
40            message_dedupe: BTreeMap::new(),
41            wakes: BTreeMap::new(),
42            wake_dedupe: BTreeMap::new(),
43            records: Vec::new(),
44            next_event_counter: 0,
45        }
46    }
47
48    fn snapshot(&self, pool_id: AgentPoolId) -> AgentPoolSnapshot {
49        AgentPoolSnapshot {
50            pool_id,
51            created: self.created,
52            topics: self
53                .members
54                .values()
55                .flat_map(|member| member.topics.clone())
56                .collect(),
57            members: self.members.values().cloned().collect(),
58            message_policy: self.config.message_policy.clone(),
59            wake_policy: self.config.wake_policy.clone(),
60            policy_refs: self.config.policy_refs.clone(),
61            messages: self.messages.values().cloned().collect(),
62            wakes: self.wakes.values().cloned().collect(),
63            cursor: self.records.last().map(|record| record.cursor.clone()),
64        }
65    }
66}
67
68impl FileAgentPoolStore {
69    /// Creates an agent-pool store rooted under the provided directory.
70    pub fn new(root: impl Into<PathBuf>) -> Self {
71        Self { root: root.into() }
72    }
73
74    fn state_path(&self, pool_id: &AgentPoolId) -> PathBuf {
75        root_join(
76            &self.root,
77            &[
78                "agent_pools".to_string(),
79                safe_segment(pool_id.as_str()),
80                "state.json".to_string(),
81            ],
82        )
83    }
84
85    fn load_state(&self, pool_id: &AgentPoolId) -> Result<Option<PoolState>, AgentError> {
86        read_json(&self.state_path(pool_id))
87    }
88
89    fn save_state(&self, pool_id: &AgentPoolId, state: &PoolState) -> Result<(), AgentError> {
90        write_json(&self.state_path(pool_id), state)
91    }
92
93    fn with_state<T>(
94        &self,
95        pool_id: &AgentPoolId,
96        f: impl FnOnce(&mut PoolState) -> Result<T, AgentError>,
97    ) -> Result<T, AgentError> {
98        let mut state = self
99            .load_state(pool_id)?
100            .ok_or_else(|| AgentError::host_configuration_needed("agent pool is not open"))?;
101        let output = f(&mut state)?;
102        self.save_state(pool_id, &state)?;
103        Ok(output)
104    }
105
106    fn append_record(
107        pool_id: &AgentPoolId,
108        state: &mut PoolState,
109        payload: AgentPoolStoreRecordPayload,
110    ) -> AgentPoolStoreCursor {
111        let cursor = AgentPoolStoreCursor::new(state.records.len() as u64 + 1);
112        state.records.push(AgentPoolStoreRecord {
113            pool_id: pool_id.clone(),
114            cursor: cursor.clone(),
115            payload,
116        });
117        cursor
118    }
119}
120
121impl AgentPoolStore for FileAgentPoolStore {
122    fn open_pool(
123        &self,
124        pool_id: AgentPoolId,
125        config: AgentPoolStoreConfig,
126    ) -> Result<AgentPoolSnapshot, AgentError> {
127        let state = if let Some(existing) = self.load_state(&pool_id)? {
128            if existing.config != config {
129                return Err(AgentError::contract_violation(
130                    "agent pool store config conflicts with existing pool",
131                ));
132            }
133            existing
134        } else {
135            let mut state = PoolState::new(config.clone());
136            Self::append_record(
137                &pool_id,
138                &mut state,
139                AgentPoolStoreRecordPayload::PoolOpened { config },
140            );
141            state
142        };
143        let snapshot = state.snapshot(pool_id.clone());
144        self.save_state(&pool_id, &state)?;
145        Ok(snapshot)
146    }
147
148    fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError> {
149        self.load_state(pool_id)?
150            .map(|state| state.snapshot(pool_id.clone()))
151            .ok_or_else(|| AgentError::host_configuration_needed("agent pool is not open"))
152    }
153
154    fn record_pool_created(
155        &self,
156        pool_id: &AgentPoolId,
157    ) -> Result<AgentPoolStoreCursor, AgentError> {
158        self.with_state(pool_id, |state| {
159            state.created = true;
160            Ok(Self::append_record(
161                pool_id,
162                state,
163                AgentPoolStoreRecordPayload::PoolCreated,
164            ))
165        })
166    }
167
168    fn join_member(
169        &self,
170        pool_id: &AgentPoolId,
171        member: AgentPoolMember,
172    ) -> Result<AgentPoolStoreCursor, AgentError> {
173        self.with_state(pool_id, |state| {
174            state.members.insert(member.run_id.clone(), member.clone());
175            Ok(Self::append_record(
176                pool_id,
177                state,
178                AgentPoolStoreRecordPayload::MemberJoined { member },
179            ))
180        })
181    }
182
183    fn leave_member(
184        &self,
185        pool_id: &AgentPoolId,
186        run_id: &RunId,
187    ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError> {
188        self.with_state(pool_id, |state| {
189            let member = state.members.remove(run_id).ok_or_else(|| {
190                AgentError::contract_violation("run is not a member of this agent pool")
191            })?;
192            let cursor = Self::append_record(
193                pool_id,
194                state,
195                AgentPoolStoreRecordPayload::MemberLeft {
196                    member: member.clone(),
197                },
198            );
199            Ok((member, cursor))
200        })
201    }
202
203    fn message_receipt(
204        &self,
205        pool_id: &AgentPoolId,
206        idempotency_key: &IdempotencyKey,
207    ) -> Result<Option<MessageReceipt>, AgentError> {
208        Ok(self
209            .load_state(pool_id)?
210            .and_then(|state| state.message_dedupe.get(idempotency_key).cloned()))
211    }
212
213    fn record_message(
214        &self,
215        pool_id: &AgentPoolId,
216        message: RunMessage,
217        receipt: MessageReceipt,
218    ) -> Result<AgentPoolStoreCursor, AgentError> {
219        self.with_state(pool_id, |state| {
220            let stored = AgentPoolStoredMessage {
221                message: message.clone(),
222                receipt: receipt.clone(),
223            };
224            state
225                .messages
226                .insert(message.message_id.as_str().to_string(), stored.clone());
227            state
228                .message_dedupe
229                .insert(message.idempotency_key.clone(), receipt);
230            Ok(Self::append_record(
231                pool_id,
232                state,
233                AgentPoolStoreRecordPayload::RunMessage { stored },
234            ))
235        })
236    }
237
238    fn wake_registration(
239        &self,
240        pool_id: &AgentPoolId,
241        idempotency_key: &IdempotencyKey,
242    ) -> Result<Option<WakeRegistration>, AgentError> {
243        Ok(self
244            .load_state(pool_id)?
245            .and_then(|state| state.wake_dedupe.get(idempotency_key).cloned()))
246    }
247
248    fn wake(
249        &self,
250        pool_id: &AgentPoolId,
251        condition_id: &WakeConditionId,
252    ) -> Result<Option<CoreStoredWake>, AgentError> {
253        Ok(self
254            .load_state(pool_id)?
255            .and_then(|state| state.wakes.get(condition_id).cloned()))
256    }
257
258    fn record_wake(
259        &self,
260        pool_id: &AgentPoolId,
261        condition: WakeCondition,
262        compiled_filter: CompiledEventFilter,
263        registration: WakeRegistration,
264    ) -> Result<AgentPoolStoreCursor, AgentError> {
265        self.with_state(pool_id, |state| {
266            let stored = AgentPoolStoredWake {
267                condition: condition.clone(),
268                compiled_filter,
269                registration: registration.clone(),
270            };
271            state
272                .wakes
273                .insert(condition.condition_id.clone(), stored.clone());
274            state
275                .wake_dedupe
276                .insert(condition.idempotency_key.clone(), registration);
277            Ok(Self::append_record(
278                pool_id,
279                state,
280                AgentPoolStoreRecordPayload::Wake { stored },
281            ))
282        })
283    }
284
285    fn watch(
286        &self,
287        pool_id: &AgentPoolId,
288        cursor: Option<AgentPoolStoreCursor>,
289    ) -> Result<AgentPoolStoreStream, AgentError> {
290        let after = cursor.map(|cursor| cursor.sequence).unwrap_or(0);
291        let records = self
292            .load_state(pool_id)?
293            .map(|state| {
294                state
295                    .records
296                    .into_iter()
297                    .filter(|record| record.cursor.sequence > after)
298                    .collect::<Vec<_>>()
299            })
300            .unwrap_or_default();
301        Ok(AgentPoolStoreStream::new(records))
302    }
303
304    fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError> {
305        self.with_state(pool_id, |state| {
306            state.next_event_counter += 1;
307            Ok(state.next_event_counter)
308        })
309    }
310}