Skip to main content

agent_sdk_core/application/
agent_pool.rs

1//! Feature-layer agent-pool coordination over runs, messages, wake conditions, and
2//! subscriptions. Use this module for generic run-to-run coordination without
3//! introducing workflow-engine or product swarm behavior. Side-effecting operations may
4//! update pool membership, append source-run journal records, and publish agent-pool
5//! events through the configured runtime ports.
6//!
7use std::{
8    collections::{BTreeMap, BTreeSet, VecDeque},
9    sync::{Arc, Mutex},
10};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{
15    domain::{
16        AgentError, AgentErrorKind, AgentId, AgentPoolId, ContentRef, DestinationKind,
17        DestinationRef, EffectId, EntityRef, EventId, IdempotencyKey, MessageId, PolicyRef,
18        PrivacyClass, RetryClassification, RunId, SourceKind, SourceRef, SpanId, TopicId, TraceId,
19        WakeConditionId,
20    },
21    effect::{EffectIntent, EffectKind, EffectResult, EffectTerminalStatus},
22    event::{
23        AgentEvent, CompiledEventFilter, ContentCaptureMode, EVENT_SCHEMA_VERSION,
24        EventCorrelation, EventDeliverySemantics, EventEnvelope, EventFamily, EventFilter,
25        EventFilterSet, EventFrame, EventKind, EventStreamScope, PayloadAccessMode,
26    },
27    event_bus::AgentEventStream,
28    journal::{
29        AgentPoolLifecycleStatus, AgentPoolRecord, EventIndexProjection, JOURNAL_SCHEMA_VERSION,
30        JournalCursor, JournalRecord, JournalRecordKind, JournalRecordPayload,
31        RunMessageAddressTargetRecord, RunMessageDeliveryStatus, RunMessageRecord, WakeRecord,
32        WakeResumeInputPolicyRecord, WakeTriggerStatus,
33    },
34    run::RunRequest,
35    run_handle::RunHandle,
36    runtime::AgentRuntime,
37};
38
39#[derive(Clone)]
40/// Holds agent pool application-layer state or configuration.
41/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
42pub struct AgentPool {
43    pool_id: AgentPoolId,
44    runtime: AgentRuntime,
45    store: Arc<dyn AgentPoolStore>,
46}
47
48impl AgentPool {
49    /// Starts a builder for this application::agent_pool value.
50    /// Building is data-only; runtime side effects occur only when a
51    /// later coordinator or host port executes the built configuration.
52    pub fn builder(pool_id: AgentPoolId) -> AgentPoolBuilder {
53        AgentPoolBuilder {
54            pool_id,
55            runtime: None,
56            message_policy: AgentPoolMessagePolicy::bounded_defaults(),
57            wake_policy: AgentPoolWakePolicy::safe_defaults(),
58            policy_refs: Vec::new(),
59            store: None,
60        }
61    }
62
63    /// Returns the pool id currently held by this value.
64    /// This is a data-only accessor and does not change membership or wake state.
65    pub fn pool_id(&self) -> &AgentPoolId {
66        &self.pool_id
67    }
68
69    /// Starts a run through the shared runtime and joins it to this pool.
70    /// Runtime registration and provider-loop effects stay in `AgentRuntime`;
71    /// the pool side effect is membership tracking for coordination.
72    pub fn start_run(&self, request: RunRequest) -> Result<RunHandle, AgentError> {
73        let handle = self.runtime.start_run(request.clone())?;
74        self.join_run(AgentPoolMember::new(request.run_id, request.agent_id))?;
75        Ok(handle)
76    }
77
78    /// Join run.
79    /// This records pool membership in the coordinator so later pool messages and subscriptions
80    /// can target the run.
81    pub fn join_run(&self, member: AgentPoolMember) -> Result<(), AgentError> {
82        let should_create = {
83            let snapshot = self.snapshot()?;
84            !snapshot.created
85        };
86
87        if should_create {
88            self.append_pool_record(
89                &member.run_id,
90                &member.agent_id,
91                AgentPoolLifecycleStatus::Created,
92                EventKind::AgentPoolCreated,
93            )?;
94            self.store.record_pool_created(&self.pool_id)?;
95        }
96
97        self.store.join_member(&self.pool_id, member.clone())?;
98
99        self.append_pool_record(
100            &member.run_id,
101            &member.agent_id,
102            AgentPoolLifecycleStatus::RunJoined,
103            EventKind::AgentPoolRunJoined,
104        )?;
105        Ok(())
106    }
107
108    /// Returns the members currently held by this value.
109    /// This reads current pool membership without starting, stopping, or messaging runs.
110    pub fn members(&self) -> Result<Vec<AgentPoolMember>, AgentError> {
111        Ok(self.snapshot()?.members)
112    }
113
114    /// Records that a run has left the pool.
115    /// This removes membership from the shared store, appends a lifecycle record, and publishes
116    /// a pool event. It does not cancel or otherwise mutate the run itself.
117    pub fn leave_run(&self, run_id: &RunId) -> Result<AgentPoolMember, AgentError> {
118        let (member, _) = self.store.leave_member(&self.pool_id, run_id)?;
119        self.append_pool_record(
120            &member.run_id,
121            &member.agent_id,
122            AgentPoolLifecycleStatus::RunLeft,
123            EventKind::AgentPoolRunLeft,
124        )?;
125        Ok(member)
126    }
127
128    /// Sends a run message through the pool coordinator.
129    /// This resolves the addressed members, applies pool message policy, appends accepted
130    /// and terminal delivery records to the source run journal, publishes the matching
131    /// agent-pool events, and deduplicates repeated calls by idempotency key.
132    pub fn send(&self, message: RunMessage) -> Result<MessageReceipt, AgentError> {
133        if let Some(receipt) = self
134            .store
135            .message_receipt(&self.pool_id, &message.idempotency_key)?
136        {
137            return Ok(receipt);
138        }
139
140        let delivered_to = self.resolve_address(&message);
141        let terminal_status = if message.expires_at_millis == Some(0) {
142            MessageStatus::Expired
143        } else if delivered_to.is_empty() {
144            MessageStatus::Failed
145        } else {
146            MessageStatus::Delivered
147        };
148
149        if terminal_status == MessageStatus::Expired {
150            let receipt =
151                self.record_message_status(&message, MessageStatus::Expired, Vec::new())?;
152            return Ok(receipt);
153        }
154
155        if terminal_status == MessageStatus::Failed {
156            let receipt =
157                self.record_message_status(&message, MessageStatus::Failed, Vec::new())?;
158            return Ok(receipt);
159        }
160
161        self.record_message_status(&message, MessageStatus::Accepted, delivered_to.clone())?;
162        let receipt =
163            self.record_message_status(&message, MessageStatus::Delivered, delivered_to)?;
164        Ok(receipt)
165    }
166
167    /// Records one run-message status transition.
168    /// This appends the status record to the source run journal, publishes the matching
169    /// agent-pool event on the runtime event bus, and returns a receipt carrying the journal
170    /// cursor. Use [`AgentPool::send`] for the full accept-to-terminal delivery flow.
171    pub fn record_message_status(
172        &self,
173        message: &RunMessage,
174        status: MessageStatus,
175        delivered_to: Vec<RunId>,
176    ) -> Result<MessageReceipt, AgentError> {
177        let source_member = self.member(&message.from)?;
178        let journal = self.runtime.journal_port(&message.from)?;
179        let record = self.run_message_record(message, status.clone(), delivered_to.clone())?;
180        let cursor = journal.append(record)?;
181        let frame = self.publish_agent_pool_event(
182            message.from.clone(),
183            source_member.agent_id,
184            status.event_kind(),
185            Some(message.message_id.clone()),
186            None,
187            EntityRef::message(message.message_id.clone()),
188            message.target_related_refs(&delivered_to),
189            Some(message.to.destination_ref.clone()),
190            message.policy_refs.clone(),
191            Some(cursor.clone()),
192            status.redacted_summary(),
193        )?;
194
195        let receipt = MessageReceipt {
196            message_id: message.message_id.clone(),
197            status,
198            delivered_to,
199            journal_cursor: Some(cursor),
200        };
201        self.store
202            .record_message(&self.pool_id, message.clone(), receipt.clone())?;
203        self.trigger_matching_wakes(&frame)?;
204        Ok(receipt)
205    }
206
207    /// Subscribe.
208    /// This creates a read-only subscription scoped by pool membership and the supplied filter.
209    pub fn subscribe(
210        &self,
211        filter: EventFilter,
212        cursor: Option<crate::event::EventCursor>,
213    ) -> Result<AgentEventStream, AgentError> {
214        let compiled = self.compile_scoped_filter(filter)?;
215        self.runtime.subscribe_events(compiled, cursor)
216    }
217
218    /// Computes or returns compile scoped filter for the
219    /// application::agent_pool contract without external I/O or side effects.
220    pub fn compile_scoped_filter(
221        &self,
222        filter: EventFilter,
223    ) -> Result<CompiledEventFilter, AgentError> {
224        self.scope_filter(filter).compile()
225    }
226
227    /// Returns scope filter derived from the supplied state.
228    /// This operates on the named coordinator state or selected port; it does not create a
229    /// parallel runtime path.
230    pub fn scope_filter(&self, mut filter: EventFilter) -> EventFilter {
231        let allowed_runs = self.observable_member_runs();
232        filter.run_ids = intersect_run_ids(&filter.run_ids, &allowed_runs);
233        let envelope_only = self
234            .snapshot()
235            .map(|snapshot| snapshot.wake_policy.envelope_only)
236            .unwrap_or(true);
237        if envelope_only {
238            filter.payload_access = PayloadAccessMode::EnvelopeOnly;
239        }
240        filter
241    }
242
243    /// Registers a wake condition for a pool member run.
244    /// This mutates the pool's wake registry and dedupe index, scopes the event filter to current
245    /// members, and may poll the configured event subscription port to trigger immediately.
246    pub fn suspend_until(
247        &self,
248        run_id: RunId,
249        condition: WakeCondition,
250    ) -> Result<WakeRegistration, AgentError> {
251        if run_id != condition.run_id {
252            return Err(AgentError::new(
253                AgentErrorKind::InvalidStateTransition,
254                RetryClassification::NotRetryable,
255                "wake registration run_id must match condition run_id",
256            ));
257        }
258
259        if let Some(registration) = self
260            .store
261            .wake_registration(&self.pool_id, &condition.idempotency_key)?
262        {
263            return Ok(registration);
264        }
265
266        self.member(&condition.run_id)?;
267        let compiled = self.compile_scoped_filter(condition.filter.clone())?;
268        let mut registration = self.record_wake_status(
269            &condition,
270            compiled.clone(),
271            WakeRegistrationStatus::Registered,
272            None,
273        )?;
274
275        if condition.timeout_millis == Some(0) {
276            registration = self.record_wake_status(
277                &condition,
278                compiled,
279                WakeRegistrationStatus::TimedOut,
280                None,
281            )?;
282        } else if let Some(frame) = self
283            .runtime
284            .subscribe_events(compiled.clone(), None)?
285            .next()
286        {
287            registration = self.record_wake_status(
288                &condition,
289                compiled,
290                WakeRegistrationStatus::Triggered,
291                Some(frame.event.envelope.event_id),
292            )?;
293        }
294
295        Ok(registration)
296    }
297
298    /// Polls a registered wake condition for a matching event.
299    /// This reads and may update pool wake state through `record_wake_status`; it creates a
300    /// read-only event subscription but does not cancel or advance the target run.
301    pub fn poll_wake(
302        &self,
303        condition_id: &WakeConditionId,
304    ) -> Result<WakeRegistration, AgentError> {
305        let stored = self
306            .store
307            .wake(&self.pool_id, condition_id)?
308            .ok_or_else(|| AgentError::contract_violation("wake condition is not registered"))?;
309
310        if stored.registration.status != WakeRegistrationStatus::Registered {
311            return Ok(stored.registration);
312        }
313
314        let Some(frame) = self
315            .runtime
316            .subscribe_events(stored.compiled_filter.clone(), None)?
317            .next()
318        else {
319            return Ok(stored.registration);
320        };
321
322        self.record_wake_status(
323            &stored.condition,
324            stored.compiled_filter,
325            WakeRegistrationStatus::Triggered,
326            Some(frame.event.envelope.event_id),
327        )
328    }
329
330    /// Cancel wake.
331    /// This marks a registered wake condition as cancelled in pool state; it does not cancel
332    /// the run itself.
333    pub fn cancel_wake(
334        &self,
335        condition_id: &WakeConditionId,
336    ) -> Result<WakeRegistration, AgentError> {
337        let stored = self
338            .store
339            .wake(&self.pool_id, condition_id)?
340            .ok_or_else(|| AgentError::contract_violation("wake condition is not registered"))?;
341        self.record_wake_status(
342            &stored.condition,
343            stored.compiled_filter,
344            WakeRegistrationStatus::Cancelled,
345            None,
346        )
347    }
348
349    fn record_wake_status(
350        &self,
351        condition: &WakeCondition,
352        compiled_filter: CompiledEventFilter,
353        status: WakeRegistrationStatus,
354        matched_event_id: Option<EventId>,
355    ) -> Result<WakeRegistration, AgentError> {
356        let member = self.member(&condition.run_id)?;
357        let journal = self.runtime.journal_port(&condition.run_id)?;
358        let wake_record = WakeRecord {
359            condition_id: condition.condition_id.clone(),
360            run_id: condition.run_id.clone(),
361            event_filter_fingerprint: compiled_filter.filter_fingerprint.clone(),
362            timeout_millis: condition.timeout_millis,
363            resume_policy: condition.resume_with.clone().into(),
364            trigger_status: status.clone().into(),
365            policy_refs: condition.policy_refs.clone(),
366            idempotency_key: condition.idempotency_key.clone(),
367            matched_event_id,
368        };
369        let record = self.journal_record(
370            condition.run_id.clone(),
371            member.agent_id.clone(),
372            JournalRecordKind::Wake,
373            "agent_pool",
374            status.event_kind().wire_name(),
375            EntityRef::wake_condition(condition.condition_id.clone()),
376            vec![EntityRef::run(condition.run_id.clone())],
377            condition.policy_refs.clone(),
378            Vec::new(),
379            Some(condition.idempotency_key.clone()),
380            JournalRecordPayload::Wake(wake_record),
381        )?;
382        let cursor = journal.append(record)?;
383        self.publish_agent_pool_event(
384            condition.run_id.clone(),
385            member.agent_id,
386            status.event_kind(),
387            None,
388            Some(condition.condition_id.clone()),
389            EntityRef::wake_condition(condition.condition_id.clone()),
390            vec![EntityRef::run(condition.run_id.clone())],
391            Some(DestinationRef::with_kind(
392                DestinationKind::Agent,
393                condition.run_id.as_str(),
394            )),
395            condition.policy_refs.clone(),
396            Some(cursor.clone()),
397            status.redacted_summary(),
398        )?;
399
400        let registration = WakeRegistration {
401            condition_id: condition.condition_id.clone(),
402            run_id: condition.run_id.clone(),
403            status,
404            journal_cursor: Some(cursor),
405        };
406
407        self.store.record_wake(
408            &self.pool_id,
409            condition.clone(),
410            compiled_filter,
411            registration.clone(),
412        )?;
413
414        Ok(registration)
415    }
416
417    fn append_pool_record(
418        &self,
419        run_id: &RunId,
420        agent_id: &AgentId,
421        status: AgentPoolLifecycleStatus,
422        event_kind: EventKind,
423    ) -> Result<(), AgentError> {
424        let journal = self.runtime.journal_port(run_id)?;
425        let snapshot = self.snapshot()?;
426        let member_run_ids = snapshot
427            .members
428            .iter()
429            .map(|member| member.run_id.clone())
430            .collect::<Vec<_>>();
431        let topics = snapshot.topics;
432        let policy_refs = snapshot.policy_refs;
433
434        let record = AgentPoolRecord {
435            pool_id: self.pool_id.clone(),
436            member_run_ids,
437            topics,
438            policy_refs: policy_refs.clone(),
439            lifecycle_status: status,
440        };
441        let journal_record = self.journal_record(
442            run_id.clone(),
443            agent_id.clone(),
444            JournalRecordKind::AgentPool,
445            "agent_pool",
446            event_kind.wire_name(),
447            EntityRef::run(run_id.clone()),
448            Vec::new(),
449            policy_refs.clone(),
450            Vec::new(),
451            None,
452            JournalRecordPayload::AgentPool(record),
453        )?;
454        let cursor = journal.append(journal_record)?;
455        self.publish_agent_pool_event(
456            run_id.clone(),
457            agent_id.clone(),
458            event_kind,
459            None,
460            None,
461            EntityRef::run(run_id.clone()),
462            Vec::new(),
463            Some(DestinationRef::with_kind(
464                DestinationKind::Agent,
465                run_id.as_str(),
466            )),
467            policy_refs,
468            Some(cursor),
469            "agent pool membership updated",
470        )?;
471        Ok(())
472    }
473
474    fn run_message_record(
475        &self,
476        message: &RunMessage,
477        status: MessageStatus,
478        delivered_to: Vec<RunId>,
479    ) -> Result<JournalRecord, AgentError> {
480        let member = self.member(&message.from)?;
481        let mut effect_intent = None;
482        let mut effect_result = None;
483        let effect_id = EffectId::new(format!(
484            "effect.run_message.{}",
485            message.message_id.as_str()
486        ));
487
488        if status == MessageStatus::Accepted {
489            let mut intent = EffectIntent::new(
490                effect_id.clone(),
491                EffectKind::RunMessageDelivery,
492                EntityRef::message(message.message_id.clone()),
493                SourceRef::with_kind(SourceKind::Sdk, "source.sdk.agent_pool"),
494                "run message delivery intent",
495            );
496            intent.destination = Some(message.to.destination_ref.clone());
497            intent.policy_refs = message.policy_refs.clone();
498            intent.idempotency_key = Some(message.idempotency_key.clone());
499            intent.content_refs = vec![message.content_ref.clone()];
500            effect_intent = Some(intent);
501        }
502
503        if status.is_terminal_delivery() {
504            effect_result = Some(EffectResult {
505                effect_id,
506                terminal_status: status.effect_terminal_status(),
507                external_operation_id: None,
508                reconciliation_ref: None,
509                error_ref: None,
510                content_refs: vec![message.content_ref.clone()],
511                redacted_summary: status.redacted_summary().to_string(),
512            });
513        }
514
515        let record = RunMessageRecord {
516            message_id: message.message_id.clone(),
517            source_run_id: message.from.clone(),
518            address_target: message.to.target.clone().into(),
519            content_ref: message.content_ref.clone(),
520            correlation: message.correlation.clone(),
521            reply_to: message.reply_to.clone(),
522            delivery_status: status.clone().into(),
523            delivered_to: delivered_to.clone(),
524            policy_refs: message.policy_refs.clone(),
525            idempotency_key: message.idempotency_key.clone(),
526            effect_intent,
527            effect_result,
528        };
529
530        self.journal_record(
531            message.from.clone(),
532            member.agent_id,
533            JournalRecordKind::RunMessage,
534            "agent_pool",
535            status.event_kind().wire_name(),
536            EntityRef::message(message.message_id.clone()),
537            message.target_related_refs(&delivered_to),
538            message.policy_refs.clone(),
539            vec![message.content_ref.clone()],
540            Some(message.idempotency_key.clone()),
541            JournalRecordPayload::RunMessage(record),
542        )
543    }
544
545    #[expect(
546        clippy::too_many_arguments,
547        reason = "journal-backed pool records intentionally spell out lineage, refs, and payload until a dedicated record-builder API replaces this private helper"
548    )]
549    fn journal_record(
550        &self,
551        run_id: RunId,
552        agent_id: AgentId,
553        record_kind: JournalRecordKind,
554        event_family: impl Into<String>,
555        event_kind: impl Into<String>,
556        subject_ref: EntityRef,
557        related_refs: Vec<EntityRef>,
558        _policy_refs: Vec<PolicyRef>,
559        content_refs: Vec<ContentRef>,
560        idempotency_key: Option<IdempotencyKey>,
561        payload: JournalRecordPayload,
562    ) -> Result<JournalRecord, AgentError> {
563        let journal_seq = self.runtime.next_journal_seq();
564        let source = SourceRef::with_kind(SourceKind::Sdk, "source.sdk.agent_pool");
565        let fingerprint = self
566            .runtime
567            .run_snapshot(&run_id)
568            .map(|snapshot| snapshot.runtime_package_fingerprint.as_str().to_string())
569            .unwrap_or_else(|_| "runtime.package.fingerprint.agent_pool".to_string());
570        let event_family = event_family.into();
571        let event_kind = event_kind.into();
572
573        Ok(JournalRecord {
574            journal_schema_version: JOURNAL_SCHEMA_VERSION,
575            journal_seq,
576            record_id: format!("journal.record.agent_pool.{journal_seq}"),
577            record_kind,
578            run_id: run_id.clone(),
579            agent_id: agent_id.clone(),
580            turn_id: None,
581            attempt_id: None,
582            subject_ref: subject_ref.clone(),
583            related_refs: related_refs.clone(),
584            causal_refs: Vec::new(),
585            source: source.clone(),
586            destination: Some(DestinationRef::with_kind(
587                DestinationKind::Journal,
588                "destination.journal.agent_pool",
589            )),
590            correlation_keys: Vec::new(),
591            tags: vec!["feature:agent_pool".to_string()],
592            delivery_semantics: "journal_backed".to_string(),
593            event_index: EventIndexProjection {
594                run_id,
595                agent_id,
596                turn_id: None,
597                event_family,
598                event_kind,
599                source,
600                destination: Some(DestinationRef::with_kind(
601                    DestinationKind::EventStream,
602                    "destination.event_stream.agent_pool",
603                )),
604                subject_ref,
605                related_refs,
606                correlation_keys: Vec::new(),
607                tags: vec!["feature:agent_pool".to_string()],
608                privacy_class: PrivacyClass::ContentRefsOnly,
609                delivery_semantics: "journal_backed".to_string(),
610            },
611            timestamp_millis: journal_seq,
612            runtime_package_fingerprint: fingerprint,
613            privacy: PrivacyClass::ContentRefsOnly,
614            content_refs,
615            redaction_policy_id: "redaction.agent_pool.default".to_string(),
616            idempotency_key,
617            dedupe_key: None,
618            checkpoint_ref: None,
619            payload,
620        })
621    }
622
623    #[expect(
624        clippy::too_many_arguments,
625        reason = "event publication mirrors the durable event envelope fields so lineage stays explicit at the call site"
626    )]
627    fn publish_agent_pool_event(
628        &self,
629        run_id: RunId,
630        agent_id: AgentId,
631        event_kind: EventKind,
632        message_id: Option<MessageId>,
633        wake_condition_id: Option<WakeConditionId>,
634        subject_ref: EntityRef,
635        mut related_refs: Vec<EntityRef>,
636        destination: Option<DestinationRef>,
637        policy_refs: Vec<PolicyRef>,
638        journal_cursor: Option<JournalCursor>,
639        summary: impl Into<String>,
640    ) -> Result<EventFrame, AgentError> {
641        if let Some(condition_id) = wake_condition_id {
642            related_refs.push(EntityRef::wake_condition(condition_id));
643        }
644        let event_counter = self.store.next_event_sequence(&self.pool_id)?;
645        let fingerprint = self
646            .runtime
647            .run_snapshot(&run_id)
648            .map(|snapshot| snapshot.runtime_package_fingerprint.as_str().to_string())
649            .unwrap_or_else(|_| "runtime.package.fingerprint.agent_pool".to_string());
650        let event = AgentEvent::with_redacted_summary(
651            EventEnvelope {
652                schema_version: EVENT_SCHEMA_VERSION,
653                event_id: EventId::new(format!(
654                    "event.agent_pool.{}.{}",
655                    self.pool_id.as_str(),
656                    event_counter
657                )),
658                event_seq: 0,
659                event_family: EventFamily::AgentPool,
660                event_kind,
661                payload_schema_version: 1,
662                timestamp: format!("1970-01-01T00:00:{event_counter:02}Z"),
663                recorded_at: format!("1970-01-01T00:00:{event_counter:02}Z"),
664                run_id,
665                agent_id,
666                turn_id: None,
667                attempt_id: None,
668                message_id,
669                context_item_id: None,
670                trace_id: TraceId::new(format!("trace.agent_pool.{}", self.pool_id.as_str())),
671                span_id: SpanId::new(format!(
672                    "span.agent_pool.{}.{}",
673                    self.pool_id.as_str(),
674                    event_counter
675                )),
676                parent_event_id: None,
677                caused_by: None,
678                subject_ref,
679                related_refs,
680                causal_refs: Vec::new(),
681                correlation: EventCorrelation::default(),
682                tags: vec![crate::event::EventTag::new("feature:agent_pool")],
683                source: SourceRef::with_kind(SourceKind::Sdk, "source.sdk.agent_pool"),
684                destination,
685                policy_refs,
686                journal_cursor,
687                state_before: None,
688                state_after: None,
689                delivery_semantics: EventDeliverySemantics::JournalBacked,
690                privacy: PrivacyClass::ContentRefsOnly,
691                content_capture: ContentCaptureMode::Off,
692                redaction_policy_id: "redaction.agent_pool.default".to_string(),
693                runtime_package_fingerprint: fingerprint,
694            },
695            summary,
696        );
697        let frame = EventFrame {
698            cursor: event.envelope.cursor(EventStreamScope::All),
699            event,
700            archive_cursor: None,
701            overflow: None,
702        };
703        self.runtime
704            .event_bus_port(&frame.event.envelope.run_id)?
705            .publish(frame.clone())?;
706        Ok(frame)
707    }
708
709    fn resolve_address(&self, message: &RunMessage) -> Vec<RunId> {
710        let Ok(snapshot) = self.snapshot() else {
711            return Vec::new();
712        };
713        let members = snapshot
714            .members
715            .iter()
716            .cloned()
717            .map(|member| (member.run_id.clone(), member))
718            .collect::<BTreeMap<_, _>>();
719        let topics = topics_from_members(&snapshot.members);
720
721        if !members.contains_key(&message.from) || !snapshot.message_policy.allows(message) {
722            return Vec::new();
723        }
724
725        let mut candidates = match &message.to.target {
726            RunAddressTarget::Run { run_id } => vec![run_id.clone()],
727            RunAddressTarget::Agent { agent_id } => members
728                .values()
729                .filter(|member| &member.agent_id == agent_id)
730                .map(|member| member.run_id.clone())
731                .collect::<Vec<_>>(),
732            RunAddressTarget::Topic { topic_id } => topics
733                .get(topic_id)
734                .map(|runs| runs.iter().cloned().collect::<Vec<_>>())
735                .unwrap_or_default(),
736            RunAddressTarget::Pool { pool_id } if pool_id == &self.pool_id => {
737                members.keys().cloned().collect::<Vec<_>>()
738            }
739            RunAddressTarget::Pool { .. } => Vec::new(),
740        };
741
742        candidates.retain(|run_id| {
743            members
744                .get(run_id)
745                .is_some_and(|member| member.allows_message_policies(&message.policy_refs))
746        });
747
748        if matches!(message.to.target, RunAddressTarget::Pool { .. })
749            && !snapshot.message_policy.include_sender_in_pool_broadcast
750        {
751            candidates.retain(|run_id| run_id != &message.from);
752        }
753
754        candidates.sort();
755        candidates.dedup();
756        candidates
757    }
758
759    fn observable_member_runs(&self) -> Vec<RunId> {
760        self.snapshot()
761            .map(|snapshot| {
762                snapshot
763                    .members
764                    .iter()
765                    .filter(|member| member.allows_message_policies(&snapshot.policy_refs))
766                    .map(|member| member.run_id.clone())
767                    .collect()
768            })
769            .unwrap_or_default()
770    }
771
772    fn member(&self, run_id: &RunId) -> Result<AgentPoolMember, AgentError> {
773        self.snapshot()?
774            .members
775            .into_iter()
776            .find(|member| &member.run_id == run_id)
777            .ok_or_else(|| {
778                AgentError::new(
779                    AgentErrorKind::InvalidStateTransition,
780                    RetryClassification::NotRetryable,
781                    "run is not a member of this agent pool",
782                )
783            })
784    }
785
    /// Returns the current snapshot of this pool as reported by the configured store.
    ///
    /// This is a direct delegation to the store's `snapshot` for this handle's pool id, so
    /// the result contains only what the store returns; this method adds nothing of its own
    /// and does not subscribe to any event bus.
    ///
    /// # Errors
    /// Propagates any `AgentError` returned by the store.
    pub fn snapshot(&self) -> Result<AgentPoolSnapshot, AgentError> {
        self.store.snapshot(&self.pool_id)
    }
792
793    fn trigger_matching_wakes(&self, frame: &EventFrame) -> Result<(), AgentError> {
794        if matches!(
795            frame.event.envelope.event_kind,
796            EventKind::WakeConditionRegistered
797                | EventKind::WakeConditionTriggered
798                | EventKind::WakeConditionTimedOut
799                | EventKind::WakeConditionCancelled
800                | EventKind::WakeConditionFailed
801        ) {
802            return Ok(());
803        }
804
805        let wakes = self.snapshot()?.wakes;
806        for wake in wakes
807            .into_iter()
808            .filter(|wake| wake.registration.status == WakeRegistrationStatus::Registered)
809        {
810            if wake.compiled_filter.matches_envelope(&frame.event.envelope) {
811                self.record_wake_status(
812                    &wake.condition,
813                    wake.compiled_filter,
814                    WakeRegistrationStatus::Triggered,
815                    Some(frame.event.envelope.event_id.clone()),
816                )?;
817            }
818        }
819        Ok(())
820    }
821
    /// Watches durable pool-store changes after the supplied cursor.
    ///
    /// `cursor` is forwarded unchanged to the store's `watch` together with this handle's
    /// pool id; what `None` means is decided by the store implementation. The result is a
    /// pool-scoped coordination-record stream, not a global event bus.
    ///
    /// # Errors
    /// Propagates any `AgentError` returned by the store.
    pub fn watch_pool(
        &self,
        cursor: Option<AgentPoolStoreCursor>,
    ) -> Result<AgentPoolStoreStream, AgentError> {
        self.store.watch(&self.pool_id, cursor)
    }
830}
831
#[derive(Clone)]
/// Builder that collects the configuration for one `AgentPool` handle.
/// The setter methods only store values on the builder; `build` checks that a runtime was
/// supplied, opens the pool in the store, and returns the handle.
pub struct AgentPoolBuilder {
    /// Id of the pool that `build` opens in the store.
    pool_id: AgentPoolId,
    /// Runtime for the pool; `build` returns an error while this is `None`.
    runtime: Option<AgentRuntime>,
    /// Message policy passed to the store when the pool is opened.
    message_policy: AgentPoolMessagePolicy,
    /// Wake policy passed to the store when the pool is opened.
    wake_policy: AgentPoolWakePolicy,
    /// Pool-level policy refs passed to the store when the pool is opened.
    policy_refs: Vec<PolicyRef>,
    /// Store backing the pool; `build` falls back to an in-memory store when `None`.
    store: Option<Arc<dyn AgentPoolStore>>,
}
843
844impl AgentPoolBuilder {
845    /// Returns an updated value with runtime configured.
846    /// This stores the runtime used by the pool builder; no run is started until `start_run` is
847    /// called.
848    pub fn runtime(mut self, runtime: AgentRuntime) -> Self {
849        self.runtime = Some(runtime);
850        self
851    }
852
853    /// Returns an updated value with message policy configured.
854    /// This is builder configuration only and performs no I/O or run coordination.
855    pub fn message_policy(mut self, policy: AgentPoolMessagePolicy) -> Self {
856        self.message_policy = policy;
857        self
858    }
859
860    /// Returns an updated value with wake policy configured.
861    /// This is builder configuration only and performs no I/O or run coordination.
862    pub fn wake_policy(mut self, policy: AgentPoolWakePolicy) -> Self {
863        self.wake_policy = policy;
864        self
865    }
866
867    /// Returns an updated value with policy ref configured.
868    /// This sets the policy reference on the coordination value and performs no I/O.
869    pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
870        self.policy_refs.push(policy_ref);
871        self
872    }
873
874    /// Returns an updated value with the pool store configured.
875    /// The store is the shared coordination authority for membership,
876    /// messages, wake registrations, dedupe, rehydration, and pool watch.
877    pub fn store<S>(mut self, store: S) -> Self
878    where
879        S: AgentPoolStore + 'static,
880    {
881        self.store = Some(Arc::new(store));
882        self
883    }
884
885    /// Returns an updated value with a dynamically dispatched pool store.
886    /// Use this when sharing one store instance across multiple pool handles
887    /// or when a host provides its own adapter.
888    pub fn shared_store(mut self, store: Arc<dyn AgentPoolStore>) -> Self {
889        self.store = Some(store);
890        self
891    }
892
893    /// Finishes builder validation and returns the configured value.
894    /// This is data-only unless the surrounding builder explicitly
895    /// documents adapter or store access.
896    pub fn build(self) -> Result<AgentPool, AgentError> {
897        let runtime = self
898            .runtime
899            .ok_or_else(|| AgentError::host_configuration_needed("agent pool requires runtime"))?;
900        let store = self
901            .store
902            .unwrap_or_else(|| Arc::new(InMemoryAgentPoolStore::default()));
903        store.open_pool(
904            self.pool_id.clone(),
905            AgentPoolStoreConfig {
906                message_policy: self.message_policy,
907                wake_policy: self.wake_policy,
908                policy_refs: self.policy_refs,
909            },
910        )?;
911        Ok(AgentPool {
912            pool_id: self.pool_id,
913            runtime,
914            store,
915        })
916    }
917}
918
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Store configuration for one logical agent pool, passed to the store when the pool is
/// opened.
/// This is plain, serializable data; constructing it does not open a
/// concrete store or start coordination work.
pub struct AgentPoolStoreConfig {
    /// Message policy used when resolving pool messages.
    pub message_policy: AgentPoolMessagePolicy,
    /// Wake policy used when scoping wake filters.
    pub wake_policy: AgentPoolWakePolicy,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    /// Policy refs that scope pool-level observation and membership.
    /// Omitted from serialized output when empty and read back as empty when absent.
    pub policy_refs: Vec<PolicyRef>,
}
932
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Durable cursor for pool-scoped store records.
/// Cursors are scoped by the pool passed to `watch` and should not be
/// used as global event or journal cursors. The cursor is a thin wrapper around a single
/// `u64` sequence number.
pub struct AgentPoolStoreCursor {
    /// Monotonic sequence within a logical pool store partition.
    /// `0` is the value produced by `AgentPoolStoreCursor::start`, i.e. the position before
    /// any record.
    pub sequence: u64,
}
941
942impl AgentPoolStoreCursor {
943    /// Builds the initial cursor before any pool-store record.
944    pub fn start() -> Self {
945        Self { sequence: 0 }
946    }
947
948    /// Builds a cursor for a known sequence.
949    pub fn new(sequence: u64) -> Self {
950        Self { sequence }
951    }
952}
953
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Rehydratable snapshot for one logical agent pool.
/// The snapshot is derived only from durable store records; callers must
/// not synthesize members, messages, or wakes outside the store.
///
/// Serialization note: the `policy_refs`, `messages`, and `wakes` lists are omitted when
/// empty (and default to empty when absent); `cursor` is omitted when `None`.
pub struct AgentPoolSnapshot {
    /// Stable pool id used for typed lineage, lookup, or dedupe.
    pub pool_id: AgentPoolId,
    /// Whether the pool-created lifecycle record has been persisted.
    pub created: bool,
    /// Current members visible in the pool.
    pub members: Vec<AgentPoolMember>,
    /// Current topic ids known to the pool.
    pub topics: Vec<TopicId>,
    /// Message policy used when resolving pool messages.
    pub message_policy: AgentPoolMessagePolicy,
    /// Wake policy used when scoping wake filters.
    pub wake_policy: AgentPoolWakePolicy,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    /// Policy refs that scope pool-level observation and membership.
    pub policy_refs: Vec<PolicyRef>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    /// Durable run-message status records known to the pool.
    pub messages: Vec<AgentPoolStoredMessage>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    /// Durable wake registrations known to the pool.
    pub wakes: Vec<AgentPoolStoredWake>,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Latest store cursor represented by this snapshot, when the store reported one.
    pub cursor: Option<AgentPoolStoreCursor>,
}
984
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Durable message status stored for pool rehydration and dedupe.
/// Pairs the original request with the receipt describing one status transition.
pub struct AgentPoolStoredMessage {
    /// Original run message request.
    pub message: RunMessage,
    /// Receipt for the stored status transition (status, recipients, optional journal
    /// cursor).
    pub receipt: MessageReceipt,
}
993
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Durable wake state stored for pool rehydration and cross-handle wake
/// triggering.
/// Keeps the compiled filter alongside the condition so a handle can match event
/// envelopes against a stored wake without recompiling the filter.
pub struct AgentPoolStoredWake {
    /// Original wake condition.
    pub condition: WakeCondition,
    /// Scoped, compiled filter used for envelope matching.
    pub compiled_filter: CompiledEventFilter,
    /// Latest durable registration status.
    pub registration: WakeRegistration,
}
1005
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// One append-only pool-store record: a pool id, the cursor the store assigned, and the
/// change payload. These records are a durable
/// coordination view linked to journal-backed events; they are not a
/// replacement for `AgentEventBus`.
pub struct AgentPoolStoreRecord {
    /// Stable pool id used for typed lineage, lookup, or dedupe.
    pub pool_id: AgentPoolId,
    /// Cursor assigned by the store.
    pub cursor: AgentPoolStoreCursor,
    /// Stored pool change.
    pub payload: AgentPoolStoreRecordPayload,
}
1018
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
/// Finite pool-store record variants.
/// Serialized as an internally tagged value: the `type` field holds the snake_case
/// variant name (`pool_opened`, `pool_created`, `member_joined`, `member_left`,
/// `run_message`, `wake`).
#[expect(
    clippy::large_enum_variant,
    reason = "pool store payloads are durable serde records; preserve direct variant ergonomics until a separate storage-envelope redesign"
)]
pub enum AgentPoolStoreRecordPayload {
    /// Pool metadata was opened or initialized.
    PoolOpened {
        /// Configuration persisted for this pool.
        config: AgentPoolStoreConfig,
    },
    /// Pool lifecycle was marked created by a journal-backed operation.
    PoolCreated,
    /// A run joined the pool.
    MemberJoined {
        /// Member that joined.
        member: AgentPoolMember,
    },
    /// A run left the pool.
    MemberLeft {
        /// Member that left.
        member: AgentPoolMember,
    },
    /// A run-message status was persisted.
    RunMessage {
        /// Stored message transition.
        stored: AgentPoolStoredMessage,
    },
    /// A wake status was persisted.
    Wake {
        /// Stored wake transition.
        stored: AgentPoolStoredWake,
    },
}
1055
#[derive(Clone, Debug)]
/// Iterator over durable pool-store records for one logical pool.
/// The stream owns an in-memory queue of records that were already loaded; iterating
/// drains that queue from the front and does not contact a store.
pub struct AgentPoolStoreStream {
    /// Records not yet yielded, in the order they were supplied.
    records: VecDeque<AgentPoolStoreRecord>,
}
1061
1062impl AgentPoolStoreStream {
1063    /// Builds a store stream from records already loaded by a store.
1064    pub fn new(records: impl IntoIterator<Item = AgentPoolStoreRecord>) -> Self {
1065        Self {
1066            records: records.into_iter().collect(),
1067        }
1068    }
1069}
1070
1071impl Iterator for AgentPoolStoreStream {
1072    type Item = AgentPoolStoreRecord;
1073
1074    fn next(&mut self) -> Option<Self::Item> {
1075        self.records.pop_front()
1076    }
1077}
1078
/// Port for durable/shared agent-pool coordination.
/// Implementations may use memory, SQLite, RPC, MCP, or another backing
/// service, but they must preserve the same pool-scoped records,
/// snapshots, idempotency, and watch semantics.
///
/// Every method takes `&self` and the trait requires `Send + Sync`, so an implementation
/// that mutates state has to do so through interior mutability. All methods report
/// failures as `AgentError`.
pub trait AgentPoolStore: Send + Sync {
    /// Create or open a logical pool and return the durable snapshot.
    /// `config` carries the message policy, wake policy, and pool-level policy refs.
    fn open_pool(
        &self,
        pool_id: AgentPoolId,
        config: AgentPoolStoreConfig,
    ) -> Result<AgentPoolSnapshot, AgentError>;

    /// Rehydrate the current durable pool snapshot for `pool_id`.
    fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError>;

    /// Mark the pool-created lifecycle as durable and return the cursor of that change.
    fn record_pool_created(
        &self,
        pool_id: &AgentPoolId,
    ) -> Result<AgentPoolStoreCursor, AgentError>;

    /// Persist a member join and return the cursor of that change.
    fn join_member(
        &self,
        pool_id: &AgentPoolId,
        member: AgentPoolMember,
    ) -> Result<AgentPoolStoreCursor, AgentError>;

    /// Persist a member leave for `run_id` and return the removed member together with
    /// the cursor of that change.
    fn leave_member(
        &self,
        pool_id: &AgentPoolId,
        run_id: &RunId,
    ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError>;

    /// Look up message dedupe state by idempotency key.
    /// Returns `Ok(None)` when no receipt is stored under the key.
    fn message_receipt(
        &self,
        pool_id: &AgentPoolId,
        idempotency_key: &IdempotencyKey,
    ) -> Result<Option<MessageReceipt>, AgentError>;

    /// Persist one message status transition and return the cursor of that change.
    fn record_message(
        &self,
        pool_id: &AgentPoolId,
        message: RunMessage,
        receipt: MessageReceipt,
    ) -> Result<AgentPoolStoreCursor, AgentError>;

    /// Look up wake dedupe state by idempotency key.
    /// Returns `Ok(None)` when no registration is stored under the key.
    fn wake_registration(
        &self,
        pool_id: &AgentPoolId,
        idempotency_key: &IdempotencyKey,
    ) -> Result<Option<WakeRegistration>, AgentError>;

    /// Look up one stored wake by condition id.
    /// Returns `Ok(None)` when the pool has no wake with that id.
    fn wake(
        &self,
        pool_id: &AgentPoolId,
        condition_id: &WakeConditionId,
    ) -> Result<Option<AgentPoolStoredWake>, AgentError>;

    /// Persist one wake status transition (condition, compiled filter, and registration)
    /// and return the cursor of that change.
    fn record_wake(
        &self,
        pool_id: &AgentPoolId,
        condition: WakeCondition,
        compiled_filter: CompiledEventFilter,
        registration: WakeRegistration,
    ) -> Result<AgentPoolStoreCursor, AgentError>;

    /// Read durable pool changes after the supplied cursor.
    /// NOTE(review): the meaning of a `None` cursor is left to the implementation here;
    /// presumably it reads from the beginning. Confirm against the in-memory store.
    fn watch(
        &self,
        pool_id: &AgentPoolId,
        cursor: Option<AgentPoolStoreCursor>,
    ) -> Result<AgentPoolStoreStream, AgentError>;

    /// Allocate a unique event sequence for pool event IDs.
    fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError>;
}
1162
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// One run's membership in an agent pool: the run, its agent, the policy refs it holds,
/// and the topics it is associated with.
/// This is plain, serializable data; constructing it does not join a pool.
pub struct AgentPoolMember {
    /// Run identifier used for lineage, filtering, replay, and dedupe.
    pub run_id: RunId,
    /// Agent identifier used for lineage, filtering, and ownership checks.
    pub agent_id: AgentId,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    /// Policy refs held by this member. A member passes a policy check only when this
    /// list contains every required ref. Omitted from serialized output when empty.
    pub policy_refs: Vec<PolicyRef>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    /// Topic ids attached to this member, in insertion order.
    /// Omitted from serialized output when empty.
    pub topics: Vec<TopicId>,
}
1181
1182impl AgentPoolMember {
1183    /// Creates a new application::agent_pool value with explicit
1184    /// caller-provided inputs. This constructor is data-only and
1185    /// performs no I/O or external side effects.
1186    pub fn new(run_id: RunId, agent_id: AgentId) -> Self {
1187        Self {
1188            run_id,
1189            agent_id,
1190            policy_refs: Vec::new(),
1191            topics: Vec::new(),
1192        }
1193    }
1194
1195    /// Returns an updated value with policy ref configured.
1196    /// This sets the policy reference on the coordination value and performs no I/O.
1197    pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
1198        self.policy_refs.push(policy_ref);
1199        self
1200    }
1201
1202    /// Returns an updated value with topic configured.
1203    /// This sets the topic id on the address/filter value and performs no subscription by
1204    /// itself.
1205    pub fn topic(mut self, topic_id: TopicId) -> Self {
1206        self.topics.push(topic_id);
1207        self
1208    }
1209
1210    fn allows_message_policies(&self, required: &[PolicyRef]) -> bool {
1211        required
1212            .iter()
1213            .all(|required| self.policy_refs.contains(required))
1214    }
1215}
1216
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Pool-level rules applied to run messages: which policy refs a message must carry and
/// whether a pool broadcast loops back to its sender.
/// This is plain, serializable data; constructing it performs no I/O.
pub struct AgentPoolMessagePolicy {
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    /// Policy refs that a message must carry to be allowed by this policy.
    /// Omitted from serialized output when empty.
    pub required_policy_refs: Vec<PolicyRef>,
    /// Whether pool broadcast delivery includes the sender run as a recipient.
    /// Use this for explicit loopback semantics; the default coordination path should avoid
    /// accidental self-delivery.
    pub include_sender_in_pool_broadcast: bool,
}
1230
1231impl AgentPoolMessagePolicy {
1232    /// Builds the bounded defaults value with the documented defaults.
1233    /// This uses only local coordinator state and performs no hidden host work.
1234    pub fn bounded_defaults() -> Self {
1235        Self {
1236            required_policy_refs: Vec::new(),
1237            include_sender_in_pool_broadcast: false,
1238        }
1239    }
1240
1241    fn allows(&self, message: &RunMessage) -> bool {
1242        self.required_policy_refs
1243            .iter()
1244            .all(|required| message.policy_refs.contains(required))
1245    }
1246}
1247
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Pool-level wake configuration, stored with the pool and passed to the store when the
/// pool is opened.
/// This is plain, serializable data; constructing it performs no I/O.
pub struct AgentPoolWakePolicy {
    /// Whether wake handling is restricted to "envelope only" mode.
    /// `AgentPoolWakePolicy::safe_defaults` sets this to `true`.
    /// NOTE(review): presumably this limits wake filters to event-envelope metadata rather
    /// than payload content; confirm where the flag is read.
    pub envelope_only: bool,
}
1256
1257impl AgentPoolWakePolicy {
1258    /// Returns an updated value with safe defaults configured.
1259    /// This is data-only and does not perform I/O, call host ports, append journals, publish
1260    /// events, or start processes.
1261    pub fn safe_defaults() -> Self {
1262        Self {
1263            envelope_only: true,
1264        }
1265    }
1266}
1267
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Address of a run-message recipient: a typed target plus the destination and related
/// entity refs derived from it.
/// Build one with `RunAddress::run`, `agent`, `topic`, or `pool`; constructing an address
/// sends nothing.
pub struct RunAddress {
    /// Which run, agent, topic, or pool the message is addressed to.
    pub target: RunAddressTarget,
    /// Typed destination reference that records where this item is being sent
    /// or projected.
    pub destination_ref: DestinationRef,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    /// Entity refs related to the target; the constructors seed this with a single ref
    /// for the addressed entity. Omitted from serialized output when empty.
    pub related_refs: Vec<EntityRef>,
}
1282
1283impl RunAddress {
1284    /// Builds the run value with the documented defaults.
1285    /// This uses only local coordinator state and performs no hidden host work.
1286    pub fn run(run_id: RunId) -> Self {
1287        Self {
1288            destination_ref: DestinationRef::with_kind(DestinationKind::Agent, run_id.as_str()),
1289            related_refs: vec![EntityRef::run(run_id.clone())],
1290            target: RunAddressTarget::Run { run_id },
1291        }
1292    }
1293
1294    /// Returns agent for the current value.
1295    /// This is a read-only or data-construction helper unless the method body explicitly calls
1296    /// a port or store.
1297    pub fn agent(agent_id: AgentId) -> Self {
1298        Self {
1299            destination_ref: DestinationRef::with_kind(DestinationKind::Agent, agent_id.as_str()),
1300            related_refs: vec![EntityRef::agent(agent_id.clone())],
1301            target: RunAddressTarget::Agent { agent_id },
1302        }
1303    }
1304
1305    /// Returns an updated value with topic configured.
1306    /// This sets the topic id on the address/filter value and performs no subscription by
1307    /// itself.
1308    pub fn topic(topic_id: TopicId) -> Self {
1309        Self {
1310            destination_ref: DestinationRef::with_kind(DestinationKind::Topic, topic_id.as_str()),
1311            related_refs: vec![EntityRef::topic(topic_id.clone())],
1312            target: RunAddressTarget::Topic { topic_id },
1313        }
1314    }
1315
1316    /// Builds the pool value with the documented defaults.
1317    /// This uses only local coordinator state and performs no hidden host work.
1318    pub fn pool(pool_id: AgentPoolId) -> Self {
1319        Self {
1320            destination_ref: DestinationRef::with_kind(
1321                DestinationKind::AgentPool,
1322                pool_id.as_str(),
1323            ),
1324            related_refs: vec![EntityRef::agent_pool(pool_id.clone())],
1325            target: RunAddressTarget::Pool { pool_id },
1326        }
1327    }
1328}
1329
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
/// The kinds of recipient a run message can be addressed to.
/// Serialized as an internally tagged value whose `type` field is `run`, `agent`,
/// `topic`, or `pool`. Serialized names are part of the SDK contract; update fixtures when
/// variants change.
pub enum RunAddressTarget {
    /// A single run, addressed by its run id.
    Run {
        /// Run identifier used for lineage, filtering, replay, and dedupe.
        run_id: RunId,
    },
    /// An agent, addressed by its agent id.
    Agent {
        /// Agent identifier used for lineage, filtering, and ownership
        /// checks.
        agent_id: AgentId,
    },
    /// A topic, addressed by its topic id.
    Topic {
        /// Stable topic id used for typed lineage, lookup, or dedupe.
        topic_id: TopicId,
    },
    /// A whole agent pool, addressed by its pool id.
    Pool {
        /// Stable pool id used for typed lineage, lookup, or dedupe.
        pool_id: AgentPoolId,
    },
}
1357
1358impl RunAddressTarget {
1359    /// Returns run id for this application::agent_pool value without
1360    /// performing external I/O.
1361    pub fn run_id(&self) -> Option<&RunId> {
1362        match self {
1363            Self::Run { run_id } => Some(run_id),
1364            _ => None,
1365        }
1366    }
1367}
1368
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// A message sent from one run to a run, agent, topic, or pool address.
/// This is plain, serializable data; constructing it sends nothing. The optional fields
/// (`reply_to`, `response_contract`, `expires_at_millis`) are omitted from serialized
/// output when `None`, and `policy_refs` is omitted when empty.
pub struct RunMessage {
    /// Message identifier for transcript, projection, or provider-response
    /// lineage.
    pub message_id: MessageId,
    /// Run id of the sender.
    pub from: RunId,
    /// Address the message is sent to.
    pub to: RunAddress,
    /// Content reference where payload bytes or structured tool output are
    /// stored.
    pub content_ref: ContentRef,
    /// Event correlation attached to this message; `RunMessage::new` starts it at the
    /// default value.
    pub correlation: EventCorrelation,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Id of the message this one replies to, if any.
    pub reply_to: Option<MessageId>,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Expected-response contract for this message, if the sender set one.
    pub response_contract: Option<MessageResponseContract>,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Expiry time in milliseconds, if any.
    /// NOTE(review): the time base (epoch vs. relative) is not visible here; confirm.
    pub expires_at_millis: Option<u64>,
    /// Idempotency key for deduping retries.
    /// Use it to prevent duplicate side effects during replay or repair.
    pub idempotency_key: IdempotencyKey,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    /// Policy references that govern admission, projection, execution, or
    /// delivery.
    pub policy_refs: Vec<PolicyRef>,
}
1405
1406impl RunMessage {
1407    /// Creates a new application::agent_pool value with explicit
1408    /// caller-provided inputs. This constructor is data-only and
1409    /// performs no I/O or external side effects.
1410    pub fn new(
1411        message_id: MessageId,
1412        from: RunId,
1413        to: RunAddress,
1414        content_ref: ContentRef,
1415        idempotency_key: IdempotencyKey,
1416    ) -> Self {
1417        Self {
1418            message_id,
1419            from,
1420            to,
1421            content_ref,
1422            correlation: EventCorrelation::default(),
1423            reply_to: None,
1424            response_contract: None,
1425            expires_at_millis: None,
1426            idempotency_key,
1427            policy_refs: Vec::new(),
1428        }
1429    }
1430
1431    /// Returns an updated value with policy ref configured.
1432    /// This sets the policy reference on the coordination value and performs no I/O.
1433    pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
1434        self.policy_refs.push(policy_ref);
1435        self
1436    }
1437
1438    fn target_related_refs(&self, delivered_to: &[RunId]) -> Vec<EntityRef> {
1439        let mut refs = self.to.related_refs.clone();
1440        refs.extend(delivered_to.iter().cloned().map(EntityRef::run));
1441        refs.sort_by(|left, right| {
1442            left.kind
1443                .cmp(&right.kind)
1444                .then_with(|| left.as_str().cmp(right.as_str()))
1445        });
1446        refs.dedup_by(|left, right| left.kind == right.kind && left.as_str() == right.as_str());
1447        refs
1448    }
1449}
1450
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Describes the responses a sender expects for a run message: how many, and an optional
/// timeout.
/// This is plain, serializable data; constructing it performs no I/O.
pub struct MessageResponseContract {
    /// Number of responses the sender expects.
    pub expected_responses: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Timeout in milliseconds, if any. Omitted from serialized output when `None`.
    pub timeout_millis: Option<u64>,
}
1462
1463impl MessageResponseContract {
1464    /// Builds the one response value with the documented defaults.
1465    /// This uses only local coordinator state and performs no hidden host work.
1466    pub fn one_response(timeout_millis: u64) -> Self {
1467        Self {
1468            expected_responses: 1,
1469            timeout_millis: Some(timeout_millis),
1470        }
1471    }
1472}
1473
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Receipt for one run-message status transition: the message id, its status, the runs it
/// was delivered to, and an optional journal cursor.
/// This is plain, serializable data; constructing it performs no I/O.
pub struct MessageReceipt {
    /// Message identifier for transcript, projection, or provider-response
    /// lineage.
    pub message_id: MessageId,
    /// Status of the message for this transition.
    pub status: MessageStatus,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    /// Run ids the message was delivered to.
    /// Omitted from serialized output when empty and read back as empty when absent.
    pub delivered_to: Vec<RunId>,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Journal cursor associated with this transition, if one was recorded.
    /// Omitted from serialized output when `None`.
    pub journal_cursor: Option<JournalCursor>,
}
1493
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
/// Lifecycle status of a run message.
/// Serialized as the snake_case variant name. Serialized names are part of the SDK
/// contract; update fixtures when variants change.
pub enum MessageStatus {
    /// Serialized as `accepted`. The only status not treated as terminal delivery.
    Accepted,
    /// Serialized as `delivered`.
    Delivered,
    /// Serialized as `responded`.
    Responded,
    /// Serialized as `failed`.
    Failed,
    /// Serialized as `timed_out`.
    TimedOut,
    /// Serialized as `expired`.
    Expired,
    /// Serialized as `cancelled`.
    Cancelled,
}
1514
1515impl MessageStatus {
1516    fn event_kind(&self) -> EventKind {
1517        match self {
1518            Self::Accepted => EventKind::RunMessageAccepted,
1519            Self::Delivered => EventKind::RunMessageDelivered,
1520            Self::Responded => EventKind::RunMessageResponded,
1521            Self::Failed => EventKind::RunMessageFailed,
1522            Self::TimedOut => EventKind::RunMessageTimedOut,
1523            Self::Expired => EventKind::RunMessageExpired,
1524            Self::Cancelled => EventKind::RunMessageCancelled,
1525        }
1526    }
1527
1528    fn redacted_summary(&self) -> &'static str {
1529        match self {
1530            Self::Accepted => "run message accepted",
1531            Self::Delivered => "run message delivered",
1532            Self::Responded => "run message responded",
1533            Self::Failed => "run message failed",
1534            Self::TimedOut => "run message timed out",
1535            Self::Expired => "run message expired",
1536            Self::Cancelled => "run message cancelled",
1537        }
1538    }
1539
1540    fn is_terminal_delivery(&self) -> bool {
1541        matches!(
1542            self,
1543            Self::Delivered
1544                | Self::Responded
1545                | Self::Failed
1546                | Self::TimedOut
1547                | Self::Expired
1548                | Self::Cancelled
1549        )
1550    }
1551
1552    fn effect_terminal_status(&self) -> EffectTerminalStatus {
1553        match self {
1554            Self::Delivered | Self::Responded => EffectTerminalStatus::Completed,
1555            Self::TimedOut => EffectTerminalStatus::TimedOut,
1556            Self::Cancelled => EffectTerminalStatus::Cancelled,
1557            Self::Accepted => EffectTerminalStatus::Unknown,
1558            Self::Failed | Self::Expired => EffectTerminalStatus::Failed,
1559        }
1560    }
1561}
1562
1563#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1564/// Holds wake condition application-layer state or configuration.
1565/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
1566pub struct WakeCondition {
1567    /// Stable condition id used for typed lineage, lookup, or dedupe.
1568    pub condition_id: WakeConditionId,
1569    /// Run identifier used for lineage, filtering, replay, and dedupe.
1570    pub run_id: RunId,
1571    /// Filter used by this record or request.
1572    pub filter: EventFilter,
1573    #[serde(skip_serializing_if = "Option::is_none")]
1574    /// Time value in milliseconds for timeout millis.
1575    /// Use it for timeout, ordering, or diagnostic calculations.
1576    pub timeout_millis: Option<u64>,
1577    /// Resume with used by this record or request.
1578    pub resume_with: ResumeInputPolicy,
1579    /// Idempotency setting or key for deduping retries.
1580    /// Use it to prevent duplicate side effects during replay or repair.
1581    pub idempotency_key: IdempotencyKey,
1582    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1583    /// Policy references that govern admission, projection, execution, or
1584    /// delivery.
1585    pub policy_refs: Vec<PolicyRef>,
1586}
1587
1588impl WakeCondition {
1589    /// Creates a new application::agent_pool value with explicit
1590    /// caller-provided inputs. This constructor is data-only and
1591    /// performs no I/O or external side effects.
1592    pub fn new(
1593        condition_id: WakeConditionId,
1594        run_id: RunId,
1595        filter: EventFilter,
1596        idempotency_key: IdempotencyKey,
1597    ) -> Self {
1598        Self {
1599            condition_id,
1600            run_id,
1601            filter,
1602            timeout_millis: None,
1603            resume_with: ResumeInputPolicy::MatchingEventRefs,
1604            idempotency_key,
1605            policy_refs: Vec::new(),
1606        }
1607    }
1608
1609    /// Returns an updated value with timeout millis configured.
1610    /// This updates the wake timeout on the condition value and performs no scheduling by
1611    /// itself.
1612    pub fn timeout_millis(mut self, timeout_millis: u64) -> Self {
1613        self.timeout_millis = Some(timeout_millis);
1614        self
1615    }
1616
1617    /// Returns an updated value with policy ref configured.
1618    /// This sets the policy reference on the coordination value and performs no I/O.
1619    pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
1620        self.policy_refs.push(policy_ref);
1621        self
1622    }
1623
1624    /// Computes or returns compile envelope filter for the
1625    /// application::agent_pool contract without external I/O or side effects.
1626    pub fn compile_envelope_filter(&self) -> Result<CompiledEventFilter, AgentError> {
1627        let mut filter = self.filter.clone();
1628        filter.payload_access = PayloadAccessMode::EnvelopeOnly;
1629        filter.compile()
1630    }
1631}
1632
1633#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1634#[serde(rename_all = "snake_case")]
1635/// Enumerates the finite resume input policy cases.
1636/// Serialized names are part of the SDK contract; update fixtures when variants change.
1637pub enum ResumeInputPolicy {
1638    /// Use this variant when the contract needs to represent matching event refs; selecting it has no side effect by itself.
1639    MatchingEventRefs,
1640    /// Use this variant when the contract needs to represent redacted summary; selecting it has no side effect by itself.
1641    RedactedSummary,
1642    /// Use this variant when the contract needs to represent none; selecting it has no side effect by itself.
1643    None,
1644}
1645
1646#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1647/// Holds wake registration application-layer state or configuration.
1648/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
1649pub struct WakeRegistration {
1650    /// Stable condition id used for typed lineage, lookup, or dedupe.
1651    pub condition_id: WakeConditionId,
1652    /// Run identifier used for lineage, filtering, replay, and dedupe.
1653    pub run_id: RunId,
1654    /// Finite status for this record or lifecycle stage.
1655    pub status: WakeRegistrationStatus,
1656    #[serde(skip_serializing_if = "Option::is_none")]
1657    /// Cursor identifying a replay, export, or subscription position.
1658    /// Use it to resume without widening the original scope.
1659    pub journal_cursor: Option<JournalCursor>,
1660}
1661
1662#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1663#[serde(rename_all = "snake_case")]
1664/// Enumerates the finite wake registration status cases.
1665/// Serialized names are part of the SDK contract; update fixtures when variants change.
1666pub enum WakeRegistrationStatus {
1667    /// Use this variant when the contract needs to represent registered; selecting it has no side effect by itself.
1668    Registered,
1669    /// Use this variant when the contract needs to represent triggered; selecting it has no side effect by itself.
1670    Triggered,
1671    /// Use this variant when the contract needs to represent timed out; selecting it has no side effect by itself.
1672    TimedOut,
1673    /// Use this variant when the contract needs to represent cancelled; selecting it has no side effect by itself.
1674    Cancelled,
1675    /// Use this variant when the contract needs to represent failed; selecting it has no side effect by itself.
1676    Failed,
1677}
1678
1679impl WakeRegistrationStatus {
1680    fn event_kind(&self) -> EventKind {
1681        match self {
1682            Self::Registered => EventKind::WakeConditionRegistered,
1683            Self::Triggered => EventKind::WakeConditionTriggered,
1684            Self::TimedOut => EventKind::WakeConditionTimedOut,
1685            Self::Cancelled => EventKind::WakeConditionCancelled,
1686            Self::Failed => EventKind::WakeConditionFailed,
1687        }
1688    }
1689
1690    fn redacted_summary(&self) -> &'static str {
1691        match self {
1692            Self::Registered => "wake condition registered",
1693            Self::Triggered => "wake condition triggered",
1694            Self::TimedOut => "wake condition timed out",
1695            Self::Cancelled => "wake condition cancelled",
1696            Self::Failed => "wake condition failed",
1697        }
1698    }
1699}
1700
1701#[derive(Clone, Debug)]
1702struct AgentPoolState {
1703    created: bool,
1704    members: BTreeMap<RunId, AgentPoolMember>,
1705    topics: BTreeMap<TopicId, BTreeSet<RunId>>,
1706    message_policy: AgentPoolMessagePolicy,
1707    wake_policy: AgentPoolWakePolicy,
1708    policy_refs: Vec<PolicyRef>,
1709    messages: BTreeMap<MessageId, AgentPoolStoredMessage>,
1710    message_dedupe: BTreeMap<IdempotencyKey, MessageReceipt>,
1711    wake_dedupe: BTreeMap<IdempotencyKey, WakeRegistration>,
1712    wakes: BTreeMap<WakeConditionId, AgentPoolStoredWake>,
1713    next_event_counter: u64,
1714}
1715
1716impl AgentPoolState {
1717    fn new(config: AgentPoolStoreConfig) -> Self {
1718        Self {
1719            created: false,
1720            members: BTreeMap::new(),
1721            topics: BTreeMap::new(),
1722            message_policy: config.message_policy,
1723            wake_policy: config.wake_policy,
1724            policy_refs: config.policy_refs,
1725            messages: BTreeMap::new(),
1726            message_dedupe: BTreeMap::new(),
1727            wake_dedupe: BTreeMap::new(),
1728            wakes: BTreeMap::new(),
1729            next_event_counter: 0,
1730        }
1731    }
1732
1733    fn config(&self) -> AgentPoolStoreConfig {
1734        AgentPoolStoreConfig {
1735            message_policy: self.message_policy.clone(),
1736            wake_policy: self.wake_policy.clone(),
1737            policy_refs: self.policy_refs.clone(),
1738        }
1739    }
1740
1741    fn snapshot(
1742        &self,
1743        pool_id: AgentPoolId,
1744        cursor: Option<AgentPoolStoreCursor>,
1745    ) -> AgentPoolSnapshot {
1746        AgentPoolSnapshot {
1747            pool_id,
1748            created: self.created,
1749            members: self.members.values().cloned().collect(),
1750            topics: self.topics.keys().cloned().collect(),
1751            message_policy: self.message_policy.clone(),
1752            wake_policy: self.wake_policy.clone(),
1753            policy_refs: self.policy_refs.clone(),
1754            messages: self.messages.values().cloned().collect(),
1755            wakes: self.wakes.values().cloned().collect(),
1756            cursor,
1757        }
1758    }
1759
1760    fn index_member(&mut self, member: AgentPoolMember) {
1761        for topic in &member.topics {
1762            self.topics
1763                .entry(topic.clone())
1764                .or_default()
1765                .insert(member.run_id.clone());
1766        }
1767        self.members.insert(member.run_id.clone(), member);
1768    }
1769
1770    fn remove_member(&mut self, run_id: &RunId) -> Result<AgentPoolMember, AgentError> {
1771        let member = self.members.remove(run_id).ok_or_else(|| {
1772            AgentError::new(
1773                AgentErrorKind::InvalidStateTransition,
1774                RetryClassification::NotRetryable,
1775                "run is not a member of this agent pool",
1776            )
1777        })?;
1778        for topic in &member.topics {
1779            let remove_topic = if let Some(runs) = self.topics.get_mut(topic) {
1780                runs.remove(run_id);
1781                runs.is_empty()
1782            } else {
1783                false
1784            };
1785            if remove_topic {
1786                self.topics.remove(topic);
1787            }
1788        }
1789        Ok(member)
1790    }
1791}
1792
1793#[derive(Clone, Debug, Default)]
1794/// In-memory `AgentPoolStore` implementation.
1795/// Cloning this value shares the same backing map, making it useful for
1796/// tests that simulate two process-local `AgentPool` handles sharing one
1797/// coordination authority. Separate default values are isolated.
1798pub struct InMemoryAgentPoolStore {
1799    pools: Arc<Mutex<BTreeMap<AgentPoolId, AgentPoolState>>>,
1800    records: Arc<Mutex<BTreeMap<AgentPoolId, Vec<AgentPoolStoreRecord>>>>,
1801}
1802
1803impl InMemoryAgentPoolStore {
1804    fn with_pool_state<T>(
1805        &self,
1806        pool_id: &AgentPoolId,
1807        f: impl FnOnce(&mut AgentPoolState) -> Result<T, AgentError>,
1808    ) -> Result<T, AgentError> {
1809        let mut pools = self
1810            .pools
1811            .lock()
1812            .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1813        let state = pools.get_mut(pool_id).ok_or_else(|| {
1814            AgentError::new(
1815                AgentErrorKind::HostConfigurationNeeded,
1816                RetryClassification::HostConfigurationNeeded,
1817                "agent pool store has not opened this pool",
1818            )
1819        })?;
1820        f(state)
1821    }
1822
1823    fn append_record(
1824        &self,
1825        pool_id: &AgentPoolId,
1826        payload: AgentPoolStoreRecordPayload,
1827    ) -> Result<AgentPoolStoreCursor, AgentError> {
1828        let mut records = self
1829            .records
1830            .lock()
1831            .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1832        let entries = records.entry(pool_id.clone()).or_default();
1833        let cursor = AgentPoolStoreCursor::new(entries.len() as u64 + 1);
1834        entries.push(AgentPoolStoreRecord {
1835            pool_id: pool_id.clone(),
1836            cursor: cursor.clone(),
1837            payload,
1838        });
1839        Ok(cursor)
1840    }
1841
1842    fn latest_cursor(
1843        &self,
1844        pool_id: &AgentPoolId,
1845    ) -> Result<Option<AgentPoolStoreCursor>, AgentError> {
1846        let records = self
1847            .records
1848            .lock()
1849            .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1850        Ok(records
1851            .get(pool_id)
1852            .and_then(|records| records.last().map(|record| record.cursor.clone())))
1853    }
1854}
1855
1856impl AgentPoolStore for InMemoryAgentPoolStore {
1857    fn open_pool(
1858        &self,
1859        pool_id: AgentPoolId,
1860        config: AgentPoolStoreConfig,
1861    ) -> Result<AgentPoolSnapshot, AgentError> {
1862        {
1863            let mut pools = self
1864                .pools
1865                .lock()
1866                .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1867            if let Some(existing) = pools.get(&pool_id) {
1868                if existing.config() != config {
1869                    return Err(AgentError::new(
1870                        AgentErrorKind::InvalidStateTransition,
1871                        RetryClassification::RepairNeeded,
1872                        "agent pool store config conflicts with existing pool",
1873                    ));
1874                }
1875            } else {
1876                pools.insert(pool_id.clone(), AgentPoolState::new(config.clone()));
1877                drop(pools);
1878                self.append_record(&pool_id, AgentPoolStoreRecordPayload::PoolOpened { config })?;
1879            }
1880        }
1881        self.snapshot(&pool_id)
1882    }
1883
1884    fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError> {
1885        let cursor = self.latest_cursor(pool_id)?;
1886        let pools = self
1887            .pools
1888            .lock()
1889            .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1890        pools
1891            .get(pool_id)
1892            .map(|state| state.snapshot(pool_id.clone(), cursor))
1893            .ok_or_else(|| {
1894                AgentError::new(
1895                    AgentErrorKind::HostConfigurationNeeded,
1896                    RetryClassification::HostConfigurationNeeded,
1897                    "agent pool store has not opened this pool",
1898                )
1899            })
1900    }
1901
1902    fn record_pool_created(
1903        &self,
1904        pool_id: &AgentPoolId,
1905    ) -> Result<AgentPoolStoreCursor, AgentError> {
1906        self.with_pool_state(pool_id, |state| {
1907            state.created = true;
1908            Ok(())
1909        })?;
1910        self.append_record(pool_id, AgentPoolStoreRecordPayload::PoolCreated)
1911    }
1912
1913    fn join_member(
1914        &self,
1915        pool_id: &AgentPoolId,
1916        member: AgentPoolMember,
1917    ) -> Result<AgentPoolStoreCursor, AgentError> {
1918        self.with_pool_state(pool_id, |state| {
1919            state.index_member(member.clone());
1920            Ok(())
1921        })?;
1922        self.append_record(
1923            pool_id,
1924            AgentPoolStoreRecordPayload::MemberJoined { member },
1925        )
1926    }
1927
1928    fn leave_member(
1929        &self,
1930        pool_id: &AgentPoolId,
1931        run_id: &RunId,
1932    ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError> {
1933        let member = self.with_pool_state(pool_id, |state| state.remove_member(run_id))?;
1934        let cursor = self.append_record(
1935            pool_id,
1936            AgentPoolStoreRecordPayload::MemberLeft {
1937                member: member.clone(),
1938            },
1939        )?;
1940        Ok((member, cursor))
1941    }
1942
1943    fn message_receipt(
1944        &self,
1945        pool_id: &AgentPoolId,
1946        idempotency_key: &IdempotencyKey,
1947    ) -> Result<Option<MessageReceipt>, AgentError> {
1948        self.with_pool_state(pool_id, |state| {
1949            Ok(state.message_dedupe.get(idempotency_key).cloned())
1950        })
1951    }
1952
1953    fn record_message(
1954        &self,
1955        pool_id: &AgentPoolId,
1956        message: RunMessage,
1957        receipt: MessageReceipt,
1958    ) -> Result<AgentPoolStoreCursor, AgentError> {
1959        let stored = AgentPoolStoredMessage { message, receipt };
1960        self.with_pool_state(pool_id, |state| {
1961            state.message_dedupe.insert(
1962                stored.message.idempotency_key.clone(),
1963                stored.receipt.clone(),
1964            );
1965            state
1966                .messages
1967                .insert(stored.message.message_id.clone(), stored.clone());
1968            Ok(())
1969        })?;
1970        self.append_record(pool_id, AgentPoolStoreRecordPayload::RunMessage { stored })
1971    }
1972
1973    fn wake_registration(
1974        &self,
1975        pool_id: &AgentPoolId,
1976        idempotency_key: &IdempotencyKey,
1977    ) -> Result<Option<WakeRegistration>, AgentError> {
1978        self.with_pool_state(pool_id, |state| {
1979            Ok(state.wake_dedupe.get(idempotency_key).cloned())
1980        })
1981    }
1982
1983    fn wake(
1984        &self,
1985        pool_id: &AgentPoolId,
1986        condition_id: &WakeConditionId,
1987    ) -> Result<Option<AgentPoolStoredWake>, AgentError> {
1988        self.with_pool_state(pool_id, |state| Ok(state.wakes.get(condition_id).cloned()))
1989    }
1990
1991    fn record_wake(
1992        &self,
1993        pool_id: &AgentPoolId,
1994        condition: WakeCondition,
1995        compiled_filter: CompiledEventFilter,
1996        registration: WakeRegistration,
1997    ) -> Result<AgentPoolStoreCursor, AgentError> {
1998        let stored = AgentPoolStoredWake {
1999            condition,
2000            compiled_filter,
2001            registration,
2002        };
2003        self.with_pool_state(pool_id, |state| {
2004            state.wake_dedupe.insert(
2005                stored.condition.idempotency_key.clone(),
2006                stored.registration.clone(),
2007            );
2008            state
2009                .wakes
2010                .insert(stored.condition.condition_id.clone(), stored.clone());
2011            Ok(())
2012        })?;
2013        self.append_record(pool_id, AgentPoolStoreRecordPayload::Wake { stored })
2014    }
2015
2016    fn watch(
2017        &self,
2018        pool_id: &AgentPoolId,
2019        cursor: Option<AgentPoolStoreCursor>,
2020    ) -> Result<AgentPoolStoreStream, AgentError> {
2021        let start_after = cursor.map(|cursor| cursor.sequence).unwrap_or(0);
2022        let records = self
2023            .records
2024            .lock()
2025            .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
2026        Ok(AgentPoolStoreStream::new(
2027            records
2028                .get(pool_id)
2029                .cloned()
2030                .unwrap_or_default()
2031                .into_iter()
2032                .filter(|record| record.cursor.sequence > start_after),
2033        ))
2034    }
2035
2036    fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError> {
2037        self.with_pool_state(pool_id, |state| {
2038            state.next_event_counter += 1;
2039            Ok(state.next_event_counter)
2040        })
2041    }
2042}
2043
2044impl From<RunAddressTarget> for RunMessageAddressTargetRecord {
2045    fn from(value: RunAddressTarget) -> Self {
2046        match value {
2047            RunAddressTarget::Run { run_id } => Self::Run { run_id },
2048            RunAddressTarget::Agent { agent_id } => Self::Agent { agent_id },
2049            RunAddressTarget::Topic { topic_id } => Self::Topic { topic_id },
2050            RunAddressTarget::Pool { pool_id } => Self::Pool { pool_id },
2051        }
2052    }
2053}
2054
2055impl From<MessageStatus> for RunMessageDeliveryStatus {
2056    fn from(value: MessageStatus) -> Self {
2057        match value {
2058            MessageStatus::Accepted => Self::Accepted,
2059            MessageStatus::Delivered => Self::Delivered,
2060            MessageStatus::Responded => Self::Responded,
2061            MessageStatus::Failed => Self::Failed,
2062            MessageStatus::TimedOut => Self::TimedOut,
2063            MessageStatus::Expired => Self::Expired,
2064            MessageStatus::Cancelled => Self::Cancelled,
2065        }
2066    }
2067}
2068
2069impl From<ResumeInputPolicy> for WakeResumeInputPolicyRecord {
2070    fn from(value: ResumeInputPolicy) -> Self {
2071        match value {
2072            ResumeInputPolicy::MatchingEventRefs => Self::MatchingEventRefs,
2073            ResumeInputPolicy::RedactedSummary => Self::RedactedSummary,
2074            ResumeInputPolicy::None => Self::None,
2075        }
2076    }
2077}
2078
2079impl From<WakeRegistrationStatus> for WakeTriggerStatus {
2080    fn from(value: WakeRegistrationStatus) -> Self {
2081        match value {
2082            WakeRegistrationStatus::Registered => Self::Registered,
2083            WakeRegistrationStatus::Triggered => Self::Triggered,
2084            WakeRegistrationStatus::TimedOut => Self::TimedOut,
2085            WakeRegistrationStatus::Cancelled => Self::Cancelled,
2086            WakeRegistrationStatus::Failed => Self::Failed,
2087        }
2088    }
2089}
2090
2091trait AgentPoolEventKindName {
2092    fn wire_name(&self) -> &'static str;
2093}
2094
2095impl AgentPoolEventKindName for EventKind {
2096    fn wire_name(&self) -> &'static str {
2097        match self {
2098            EventKind::AgentPoolCreated => "agent_pool_created",
2099            EventKind::AgentPoolRunJoined => "agent_pool_run_joined",
2100            EventKind::AgentPoolRunLeft => "agent_pool_run_left",
2101            EventKind::RunMessageAccepted => "run_message_accepted",
2102            EventKind::RunMessageDelivered => "run_message_delivered",
2103            EventKind::RunMessageResponded => "run_message_responded",
2104            EventKind::RunMessageFailed => "run_message_failed",
2105            EventKind::RunMessageTimedOut => "run_message_timed_out",
2106            EventKind::RunMessageExpired => "run_message_expired",
2107            EventKind::RunMessageCancelled => "run_message_cancelled",
2108            EventKind::WakeConditionRegistered => "wake_condition_registered",
2109            EventKind::WakeConditionTriggered => "wake_condition_triggered",
2110            EventKind::WakeConditionTimedOut => "wake_condition_timed_out",
2111            EventKind::WakeConditionCancelled => "wake_condition_cancelled",
2112            EventKind::WakeConditionFailed => "wake_condition_failed",
2113            _ => "agent_pool_event",
2114        }
2115    }
2116}
2117
2118fn intersect_run_ids(filter: &EventFilterSet<RunId>, allowed: &[RunId]) -> EventFilterSet<RunId> {
2119    match filter {
2120        EventFilterSet::Any => EventFilterSet::Include(allowed.to_vec()),
2121        EventFilterSet::Include(requested) => EventFilterSet::Include(
2122            requested
2123                .iter()
2124                .filter(|run_id| allowed.contains(run_id))
2125                .cloned()
2126                .collect(),
2127        ),
2128    }
2129}
2130
2131fn topics_from_members(members: &[AgentPoolMember]) -> BTreeMap<TopicId, BTreeSet<RunId>> {
2132    let mut topics = BTreeMap::new();
2133    for member in members {
2134        for topic in &member.topics {
2135            topics
2136                .entry(topic.clone())
2137                .or_insert_with(BTreeSet::new)
2138                .insert(member.run_id.clone());
2139        }
2140    }
2141    topics
2142}