Skip to main content

agent_sdk_core/application/
agent_pool.rs

1//! Feature-layer agent-pool coordination over runs, messages, wake conditions, and
2//! subscriptions. Use this module for generic run-to-run coordination without
3//! introducing workflow-engine or product swarm behavior. Side-effecting operations may
4//! update pool membership, append source-run journal records, and publish agent-pool
5//! events through the configured runtime ports.
6//!
7use std::{
8    collections::{BTreeMap, BTreeSet, VecDeque},
9    sync::{Arc, Mutex},
10};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{
15    domain::{
16        AgentError, AgentErrorKind, AgentId, AgentPoolId, ContentRef, DestinationKind,
17        DestinationRef, EffectId, EntityRef, EventId, IdempotencyKey, MessageId, PolicyRef,
18        PrivacyClass, RetryClassification, RunId, SourceKind, SourceRef, SpanId, TopicId, TraceId,
19        WakeConditionId,
20    },
21    effect::{EffectIntent, EffectKind, EffectResult, EffectTerminalStatus},
22    event::{
23        AgentEvent, CompiledEventFilter, ContentCaptureMode, EVENT_SCHEMA_VERSION,
24        EventCorrelation, EventDeliverySemantics, EventEnvelope, EventFamily, EventFilter,
25        EventFilterSet, EventFrame, EventKind, EventStreamScope, PayloadAccessMode,
26    },
27    event_bus::AgentEventStream,
28    journal::{
29        AgentPoolLifecycleStatus, AgentPoolRecord, EventIndexProjection, JOURNAL_SCHEMA_VERSION,
30        JournalCursor, JournalRecord, JournalRecordKind, JournalRecordPayload,
31        RunMessageAddressTargetRecord, RunMessageDeliveryStatus, RunMessageRecord, WakeRecord,
32        WakeResumeInputPolicyRecord, WakeTriggerStatus,
33    },
34    run::RunRequest,
35    run_handle::RunHandle,
36    runtime::AgentRuntime,
37};
38
39#[derive(Clone)]
40/// Holds agent pool application-layer state or configuration.
41/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
42pub struct AgentPool {
43    pool_id: AgentPoolId,
44    runtime: AgentRuntime,
45    store: Arc<dyn AgentPoolStore>,
46}
47
48impl AgentPool {
49    /// Starts a builder for this application::agent_pool value.
50    /// Building is data-only; runtime side effects occur only when a
51    /// later coordinator or host port executes the built configuration.
52    pub fn builder(pool_id: AgentPoolId) -> AgentPoolBuilder {
53        AgentPoolBuilder {
54            pool_id,
55            runtime: None,
56            message_policy: AgentPoolMessagePolicy::bounded_defaults(),
57            wake_policy: AgentPoolWakePolicy::safe_defaults(),
58            policy_refs: Vec::new(),
59            store: None,
60        }
61    }
62
63    /// Returns the pool id currently held by this value.
64    /// This is a data-only accessor and does not change membership or wake state.
65    pub fn pool_id(&self) -> &AgentPoolId {
66        &self.pool_id
67    }
68
69    /// Starts a run through the shared runtime and joins it to this pool.
70    /// Runtime registration and provider-loop effects stay in `AgentRuntime`;
71    /// the pool side effect is membership tracking for coordination.
72    pub fn start_run(&self, request: RunRequest) -> Result<RunHandle, AgentError> {
73        let handle = self.runtime.start_run(request.clone())?;
74        self.join_run(AgentPoolMember::new(request.run_id, request.agent_id))?;
75        Ok(handle)
76    }
77
78    /// Join run.
79    /// This records pool membership in the coordinator so later pool messages and subscriptions
80    /// can target the run.
81    pub fn join_run(&self, member: AgentPoolMember) -> Result<(), AgentError> {
82        let should_create = {
83            let snapshot = self.snapshot()?;
84            !snapshot.created
85        };
86
87        if should_create {
88            self.append_pool_record(
89                &member.run_id,
90                &member.agent_id,
91                AgentPoolLifecycleStatus::Created,
92                EventKind::AgentPoolCreated,
93            )?;
94            self.store.record_pool_created(&self.pool_id)?;
95        }
96
97        self.store.join_member(&self.pool_id, member.clone())?;
98
99        self.append_pool_record(
100            &member.run_id,
101            &member.agent_id,
102            AgentPoolLifecycleStatus::RunJoined,
103            EventKind::AgentPoolRunJoined,
104        )?;
105        Ok(())
106    }
107
108    /// Returns the members currently held by this value.
109    /// This reads current pool membership without starting, stopping, or messaging runs.
110    pub fn members(&self) -> Result<Vec<AgentPoolMember>, AgentError> {
111        Ok(self.snapshot()?.members)
112    }
113
114    /// Records that a run has left the pool.
115    /// This removes membership from the shared store, appends a lifecycle record, and publishes
116    /// a pool event. It does not cancel or otherwise mutate the run itself.
117    pub fn leave_run(&self, run_id: &RunId) -> Result<AgentPoolMember, AgentError> {
118        let (member, _) = self.store.leave_member(&self.pool_id, run_id)?;
119        self.append_pool_record(
120            &member.run_id,
121            &member.agent_id,
122            AgentPoolLifecycleStatus::RunLeft,
123            EventKind::AgentPoolRunLeft,
124        )?;
125        Ok(member)
126    }
127
128    /// Sends a run message through the pool coordinator.
129    /// This resolves the addressed members, applies pool message policy, appends accepted
130    /// and terminal delivery records to the source run journal, publishes the matching
131    /// agent-pool events, and deduplicates repeated calls by idempotency key.
132    pub fn send(&self, message: RunMessage) -> Result<MessageReceipt, AgentError> {
133        if let Some(receipt) = self
134            .store
135            .message_receipt(&self.pool_id, &message.idempotency_key)?
136        {
137            return Ok(receipt);
138        }
139
140        let delivered_to = self.resolve_address(&message);
141        let terminal_status = if message.expires_at_millis == Some(0) {
142            MessageStatus::Expired
143        } else if delivered_to.is_empty() {
144            MessageStatus::Failed
145        } else {
146            MessageStatus::Delivered
147        };
148
149        if terminal_status == MessageStatus::Expired {
150            let receipt =
151                self.record_message_status(&message, MessageStatus::Expired, Vec::new())?;
152            return Ok(receipt);
153        }
154
155        if terminal_status == MessageStatus::Failed {
156            let receipt =
157                self.record_message_status(&message, MessageStatus::Failed, Vec::new())?;
158            return Ok(receipt);
159        }
160
161        self.record_message_status(&message, MessageStatus::Accepted, delivered_to.clone())?;
162        let receipt =
163            self.record_message_status(&message, MessageStatus::Delivered, delivered_to)?;
164        Ok(receipt)
165    }
166
167    /// Records one run-message status transition.
168    /// This appends the status record to the source run journal, publishes the matching
169    /// agent-pool event on the runtime event bus, and returns a receipt carrying the journal
170    /// cursor. Use [`AgentPool::send`] for the full accept-to-terminal delivery flow.
171    pub fn record_message_status(
172        &self,
173        message: &RunMessage,
174        status: MessageStatus,
175        delivered_to: Vec<RunId>,
176    ) -> Result<MessageReceipt, AgentError> {
177        let source_member = self.member(&message.from)?;
178        let journal = self.runtime.journal_port(&message.from)?;
179        let record = self.run_message_record(message, status.clone(), delivered_to.clone())?;
180        let cursor = journal.append(record)?;
181        let frame = self.publish_agent_pool_event(
182            message.from.clone(),
183            source_member.agent_id,
184            status.event_kind(),
185            Some(message.message_id.clone()),
186            None,
187            EntityRef::message(message.message_id.clone()),
188            message.target_related_refs(&delivered_to),
189            Some(message.to.destination_ref.clone()),
190            message.policy_refs.clone(),
191            Some(cursor.clone()),
192            status.redacted_summary(),
193        )?;
194
195        let receipt = MessageReceipt {
196            message_id: message.message_id.clone(),
197            status,
198            delivered_to,
199            journal_cursor: Some(cursor),
200        };
201        self.store
202            .record_message(&self.pool_id, message.clone(), receipt.clone())?;
203        self.trigger_matching_wakes(&frame)?;
204        Ok(receipt)
205    }
206
207    /// Subscribe.
208    /// This creates a read-only subscription scoped by pool membership and the supplied filter.
209    pub fn subscribe(
210        &self,
211        filter: EventFilter,
212        cursor: Option<crate::event::EventCursor>,
213    ) -> Result<AgentEventStream, AgentError> {
214        let compiled = self.compile_scoped_filter(filter)?;
215        self.runtime.subscribe_events(compiled, cursor)
216    }
217
218    /// Computes or returns compile scoped filter for the
219    /// application::agent_pool contract without external I/O or side effects.
220    pub fn compile_scoped_filter(
221        &self,
222        filter: EventFilter,
223    ) -> Result<CompiledEventFilter, AgentError> {
224        self.scope_filter(filter).compile()
225    }
226
227    /// Returns scope filter derived from the supplied state.
228    /// This operates on the named coordinator state or selected port; it does not create a
229    /// parallel runtime path.
230    pub fn scope_filter(&self, mut filter: EventFilter) -> EventFilter {
231        let allowed_runs = self.observable_member_runs();
232        filter.run_ids = intersect_run_ids(&filter.run_ids, &allowed_runs);
233        let envelope_only = self
234            .snapshot()
235            .map(|snapshot| snapshot.wake_policy.envelope_only)
236            .unwrap_or(true);
237        if envelope_only {
238            filter.payload_access = PayloadAccessMode::EnvelopeOnly;
239        }
240        filter
241    }
242
243    /// Registers a wake condition for a pool member run.
244    /// This mutates the pool's wake registry and dedupe index, scopes the event filter to current
245    /// members, and may poll the configured event subscription port to trigger immediately.
246    pub fn suspend_until(
247        &self,
248        run_id: RunId,
249        condition: WakeCondition,
250    ) -> Result<WakeRegistration, AgentError> {
251        if run_id != condition.run_id {
252            return Err(AgentError::new(
253                AgentErrorKind::InvalidStateTransition,
254                RetryClassification::NotRetryable,
255                "wake registration run_id must match condition run_id",
256            ));
257        }
258
259        if let Some(registration) = self
260            .store
261            .wake_registration(&self.pool_id, &condition.idempotency_key)?
262        {
263            return Ok(registration);
264        }
265
266        self.member(&condition.run_id)?;
267        let compiled = self.compile_scoped_filter(condition.filter.clone())?;
268        let mut registration = self.record_wake_status(
269            &condition,
270            compiled.clone(),
271            WakeRegistrationStatus::Registered,
272            None,
273        )?;
274
275        if condition.timeout_millis == Some(0) {
276            registration = self.record_wake_status(
277                &condition,
278                compiled,
279                WakeRegistrationStatus::TimedOut,
280                None,
281            )?;
282        } else if let Some(frame) = self
283            .runtime
284            .subscribe_events(compiled.clone(), None)?
285            .next()
286        {
287            registration = self.record_wake_status(
288                &condition,
289                compiled,
290                WakeRegistrationStatus::Triggered,
291                Some(frame.event.envelope.event_id),
292            )?;
293        }
294
295        Ok(registration)
296    }
297
298    /// Polls a registered wake condition for a matching event.
299    /// This reads and may update pool wake state through `record_wake_status`; it creates a
300    /// read-only event subscription but does not cancel or advance the target run.
301    pub fn poll_wake(
302        &self,
303        condition_id: &WakeConditionId,
304    ) -> Result<WakeRegistration, AgentError> {
305        let stored = self
306            .store
307            .wake(&self.pool_id, condition_id)?
308            .ok_or_else(|| AgentError::contract_violation("wake condition is not registered"))?;
309
310        if stored.registration.status != WakeRegistrationStatus::Registered {
311            return Ok(stored.registration);
312        }
313
314        let Some(frame) = self
315            .runtime
316            .subscribe_events(stored.compiled_filter.clone(), None)?
317            .next()
318        else {
319            return Ok(stored.registration);
320        };
321
322        self.record_wake_status(
323            &stored.condition,
324            stored.compiled_filter,
325            WakeRegistrationStatus::Triggered,
326            Some(frame.event.envelope.event_id),
327        )
328    }
329
330    /// Cancel wake.
331    /// This marks a registered wake condition as cancelled in pool state; it does not cancel
332    /// the run itself.
333    pub fn cancel_wake(
334        &self,
335        condition_id: &WakeConditionId,
336    ) -> Result<WakeRegistration, AgentError> {
337        let stored = self
338            .store
339            .wake(&self.pool_id, condition_id)?
340            .ok_or_else(|| AgentError::contract_violation("wake condition is not registered"))?;
341        self.record_wake_status(
342            &stored.condition,
343            stored.compiled_filter,
344            WakeRegistrationStatus::Cancelled,
345            None,
346        )
347    }
348
349    fn record_wake_status(
350        &self,
351        condition: &WakeCondition,
352        compiled_filter: CompiledEventFilter,
353        status: WakeRegistrationStatus,
354        matched_event_id: Option<EventId>,
355    ) -> Result<WakeRegistration, AgentError> {
356        let member = self.member(&condition.run_id)?;
357        let journal = self.runtime.journal_port(&condition.run_id)?;
358        let wake_record = WakeRecord {
359            condition_id: condition.condition_id.clone(),
360            run_id: condition.run_id.clone(),
361            event_filter_fingerprint: compiled_filter.filter_fingerprint.clone(),
362            timeout_millis: condition.timeout_millis,
363            resume_policy: condition.resume_with.clone().into(),
364            trigger_status: status.clone().into(),
365            policy_refs: condition.policy_refs.clone(),
366            idempotency_key: condition.idempotency_key.clone(),
367            matched_event_id,
368        };
369        let record = self.journal_record(
370            condition.run_id.clone(),
371            member.agent_id.clone(),
372            JournalRecordKind::Wake,
373            "agent_pool",
374            status.event_kind().wire_name(),
375            EntityRef::wake_condition(condition.condition_id.clone()),
376            vec![EntityRef::run(condition.run_id.clone())],
377            condition.policy_refs.clone(),
378            Vec::new(),
379            Some(condition.idempotency_key.clone()),
380            JournalRecordPayload::Wake(wake_record),
381        )?;
382        let cursor = journal.append(record)?;
383        self.publish_agent_pool_event(
384            condition.run_id.clone(),
385            member.agent_id,
386            status.event_kind(),
387            None,
388            Some(condition.condition_id.clone()),
389            EntityRef::wake_condition(condition.condition_id.clone()),
390            vec![EntityRef::run(condition.run_id.clone())],
391            Some(DestinationRef::with_kind(
392                DestinationKind::Agent,
393                condition.run_id.as_str(),
394            )),
395            condition.policy_refs.clone(),
396            Some(cursor.clone()),
397            status.redacted_summary(),
398        )?;
399
400        let registration = WakeRegistration {
401            condition_id: condition.condition_id.clone(),
402            run_id: condition.run_id.clone(),
403            status,
404            journal_cursor: Some(cursor),
405        };
406
407        self.store.record_wake(
408            &self.pool_id,
409            condition.clone(),
410            compiled_filter,
411            registration.clone(),
412        )?;
413
414        Ok(registration)
415    }
416
417    fn append_pool_record(
418        &self,
419        run_id: &RunId,
420        agent_id: &AgentId,
421        status: AgentPoolLifecycleStatus,
422        event_kind: EventKind,
423    ) -> Result<(), AgentError> {
424        let journal = self.runtime.journal_port(run_id)?;
425        let snapshot = self.snapshot()?;
426        let member_run_ids = snapshot
427            .members
428            .iter()
429            .map(|member| member.run_id.clone())
430            .collect::<Vec<_>>();
431        let topics = snapshot.topics;
432        let policy_refs = snapshot.policy_refs;
433
434        let record = AgentPoolRecord {
435            pool_id: self.pool_id.clone(),
436            member_run_ids,
437            topics,
438            policy_refs: policy_refs.clone(),
439            lifecycle_status: status,
440        };
441        let journal_record = self.journal_record(
442            run_id.clone(),
443            agent_id.clone(),
444            JournalRecordKind::AgentPool,
445            "agent_pool",
446            event_kind.wire_name(),
447            EntityRef::run(run_id.clone()),
448            Vec::new(),
449            policy_refs.clone(),
450            Vec::new(),
451            None,
452            JournalRecordPayload::AgentPool(record),
453        )?;
454        let cursor = journal.append(journal_record)?;
455        self.publish_agent_pool_event(
456            run_id.clone(),
457            agent_id.clone(),
458            event_kind,
459            None,
460            None,
461            EntityRef::run(run_id.clone()),
462            Vec::new(),
463            Some(DestinationRef::with_kind(
464                DestinationKind::Agent,
465                run_id.as_str(),
466            )),
467            policy_refs,
468            Some(cursor),
469            "agent pool membership updated",
470        )?;
471        Ok(())
472    }
473
474    fn run_message_record(
475        &self,
476        message: &RunMessage,
477        status: MessageStatus,
478        delivered_to: Vec<RunId>,
479    ) -> Result<JournalRecord, AgentError> {
480        let member = self.member(&message.from)?;
481        let mut effect_intent = None;
482        let mut effect_result = None;
483        let effect_id = EffectId::new(format!(
484            "effect.run_message.{}",
485            message.message_id.as_str()
486        ));
487
488        if status == MessageStatus::Accepted {
489            let mut intent = EffectIntent::new(
490                effect_id.clone(),
491                EffectKind::RunMessageDelivery,
492                EntityRef::message(message.message_id.clone()),
493                SourceRef::with_kind(SourceKind::Sdk, "source.sdk.agent_pool"),
494                "run message delivery intent",
495            );
496            intent.destination = Some(message.to.destination_ref.clone());
497            intent.policy_refs = message.policy_refs.clone();
498            intent.idempotency_key = Some(message.idempotency_key.clone());
499            intent.content_refs = vec![message.content_ref.clone()];
500            effect_intent = Some(intent);
501        }
502
503        if status.is_terminal_delivery() {
504            effect_result = Some(EffectResult {
505                effect_id,
506                terminal_status: status.effect_terminal_status(),
507                external_operation_id: None,
508                reconciliation_ref: None,
509                error_ref: None,
510                content_refs: vec![message.content_ref.clone()],
511                redacted_summary: status.redacted_summary().to_string(),
512            });
513        }
514
515        let record = RunMessageRecord {
516            message_id: message.message_id.clone(),
517            source_run_id: message.from.clone(),
518            address_target: message.to.target.clone().into(),
519            content_ref: message.content_ref.clone(),
520            correlation: message.correlation.clone(),
521            reply_to: message.reply_to.clone(),
522            delivery_status: status.clone().into(),
523            delivered_to: delivered_to.clone(),
524            policy_refs: message.policy_refs.clone(),
525            idempotency_key: message.idempotency_key.clone(),
526            effect_intent,
527            effect_result,
528        };
529
530        self.journal_record(
531            message.from.clone(),
532            member.agent_id,
533            JournalRecordKind::RunMessage,
534            "agent_pool",
535            status.event_kind().wire_name(),
536            EntityRef::message(message.message_id.clone()),
537            message.target_related_refs(&delivered_to),
538            message.policy_refs.clone(),
539            vec![message.content_ref.clone()],
540            Some(message.idempotency_key.clone()),
541            JournalRecordPayload::RunMessage(record),
542        )
543    }
544
545    #[expect(
546        clippy::too_many_arguments,
547        reason = "journal-backed pool records intentionally spell out lineage, refs, and payload until a dedicated record-builder API replaces this private helper"
548    )]
549    fn journal_record(
550        &self,
551        run_id: RunId,
552        agent_id: AgentId,
553        record_kind: JournalRecordKind,
554        event_family: impl Into<String>,
555        event_kind: impl Into<String>,
556        subject_ref: EntityRef,
557        related_refs: Vec<EntityRef>,
558        _policy_refs: Vec<PolicyRef>,
559        content_refs: Vec<ContentRef>,
560        idempotency_key: Option<IdempotencyKey>,
561        payload: JournalRecordPayload,
562    ) -> Result<JournalRecord, AgentError> {
563        let journal_seq = self.runtime.next_journal_seq();
564        let source = SourceRef::with_kind(SourceKind::Sdk, "source.sdk.agent_pool");
565        let fingerprint = self
566            .runtime
567            .run_snapshot(&run_id)
568            .map(|snapshot| snapshot.runtime_package_fingerprint.as_str().to_string())
569            .unwrap_or_else(|_| "runtime.package.fingerprint.agent_pool".to_string());
570        let session_id = self
571            .runtime
572            .run_snapshot(&run_id)
573            .ok()
574            .and_then(|snapshot| snapshot.session_id);
575        let event_family = event_family.into();
576        let event_kind = event_kind.into();
577
578        Ok(JournalRecord {
579            journal_schema_version: JOURNAL_SCHEMA_VERSION,
580            journal_seq,
581            record_id: format!("journal.record.agent_pool.{journal_seq}"),
582            record_kind,
583            run_id: run_id.clone(),
584            session_id: session_id.clone(),
585            agent_id: agent_id.clone(),
586            turn_id: None,
587            attempt_id: None,
588            subject_ref: subject_ref.clone(),
589            related_refs: related_refs.clone(),
590            causal_refs: Vec::new(),
591            source: source.clone(),
592            destination: Some(DestinationRef::with_kind(
593                DestinationKind::Journal,
594                "destination.journal.agent_pool",
595            )),
596            correlation_keys: Vec::new(),
597            tags: vec!["feature:agent_pool".to_string()],
598            delivery_semantics: "journal_backed".to_string(),
599            event_index: EventIndexProjection {
600                run_id,
601                session_id,
602                agent_id,
603                turn_id: None,
604                event_family,
605                event_kind,
606                source,
607                destination: Some(DestinationRef::with_kind(
608                    DestinationKind::EventStream,
609                    "destination.event_stream.agent_pool",
610                )),
611                subject_ref,
612                related_refs,
613                correlation_keys: Vec::new(),
614                tags: vec!["feature:agent_pool".to_string()],
615                privacy_class: PrivacyClass::ContentRefsOnly,
616                delivery_semantics: "journal_backed".to_string(),
617            },
618            timestamp_millis: journal_seq,
619            runtime_package_fingerprint: fingerprint,
620            privacy: PrivacyClass::ContentRefsOnly,
621            content_refs,
622            redaction_policy_id: "redaction.agent_pool.default".to_string(),
623            idempotency_key,
624            dedupe_key: None,
625            checkpoint_ref: None,
626            payload,
627        })
628    }
629
630    #[expect(
631        clippy::too_many_arguments,
632        reason = "event publication mirrors the durable event envelope fields so lineage stays explicit at the call site"
633    )]
634    fn publish_agent_pool_event(
635        &self,
636        run_id: RunId,
637        agent_id: AgentId,
638        event_kind: EventKind,
639        message_id: Option<MessageId>,
640        wake_condition_id: Option<WakeConditionId>,
641        subject_ref: EntityRef,
642        mut related_refs: Vec<EntityRef>,
643        destination: Option<DestinationRef>,
644        policy_refs: Vec<PolicyRef>,
645        journal_cursor: Option<JournalCursor>,
646        summary: impl Into<String>,
647    ) -> Result<EventFrame, AgentError> {
648        if let Some(condition_id) = wake_condition_id {
649            related_refs.push(EntityRef::wake_condition(condition_id));
650        }
651        let event_counter = self.store.next_event_sequence(&self.pool_id)?;
652        let fingerprint = self
653            .runtime
654            .run_snapshot(&run_id)
655            .map(|snapshot| snapshot.runtime_package_fingerprint.as_str().to_string())
656            .unwrap_or_else(|_| "runtime.package.fingerprint.agent_pool".to_string());
657        let session_id = self
658            .runtime
659            .run_snapshot(&run_id)
660            .ok()
661            .and_then(|snapshot| snapshot.session_id);
662        let event = AgentEvent::with_redacted_summary(
663            EventEnvelope {
664                schema_version: EVENT_SCHEMA_VERSION,
665                event_id: EventId::new(format!(
666                    "event.agent_pool.{}.{}",
667                    self.pool_id.as_str(),
668                    event_counter
669                )),
670                event_seq: 0,
671                event_family: EventFamily::AgentPool,
672                event_kind,
673                payload_schema_version: 1,
674                timestamp: format!("1970-01-01T00:00:{event_counter:02}Z"),
675                recorded_at: format!("1970-01-01T00:00:{event_counter:02}Z"),
676                run_id,
677                session_id,
678                agent_id,
679                turn_id: None,
680                attempt_id: None,
681                message_id,
682                context_item_id: None,
683                trace_id: TraceId::new(format!("trace.agent_pool.{}", self.pool_id.as_str())),
684                span_id: SpanId::new(format!(
685                    "span.agent_pool.{}.{}",
686                    self.pool_id.as_str(),
687                    event_counter
688                )),
689                parent_event_id: None,
690                caused_by: None,
691                subject_ref,
692                related_refs,
693                causal_refs: Vec::new(),
694                correlation: EventCorrelation::default(),
695                tags: vec![crate::event::EventTag::new("feature:agent_pool")],
696                source: SourceRef::with_kind(SourceKind::Sdk, "source.sdk.agent_pool"),
697                destination,
698                policy_refs,
699                journal_cursor,
700                state_before: None,
701                state_after: None,
702                delivery_semantics: EventDeliverySemantics::JournalBacked,
703                privacy: PrivacyClass::ContentRefsOnly,
704                content_capture: ContentCaptureMode::Off,
705                redaction_policy_id: "redaction.agent_pool.default".to_string(),
706                runtime_package_fingerprint: fingerprint,
707            },
708            summary,
709        );
710        let frame = EventFrame {
711            cursor: event.envelope.cursor(EventStreamScope::All),
712            event,
713            archive_cursor: None,
714            overflow: None,
715        };
716        self.runtime
717            .event_bus_port(&frame.event.envelope.run_id)?
718            .publish(frame.clone())?;
719        Ok(frame)
720    }
721
722    fn resolve_address(&self, message: &RunMessage) -> Vec<RunId> {
723        let Ok(snapshot) = self.snapshot() else {
724            return Vec::new();
725        };
726        let members = snapshot
727            .members
728            .iter()
729            .cloned()
730            .map(|member| (member.run_id.clone(), member))
731            .collect::<BTreeMap<_, _>>();
732        let topics = topics_from_members(&snapshot.members);
733
734        if !members.contains_key(&message.from) || !snapshot.message_policy.allows(message) {
735            return Vec::new();
736        }
737
738        let mut candidates = match &message.to.target {
739            RunAddressTarget::Run { run_id } => vec![run_id.clone()],
740            RunAddressTarget::Agent { agent_id } => members
741                .values()
742                .filter(|member| &member.agent_id == agent_id)
743                .map(|member| member.run_id.clone())
744                .collect::<Vec<_>>(),
745            RunAddressTarget::Topic { topic_id } => topics
746                .get(topic_id)
747                .map(|runs| runs.iter().cloned().collect::<Vec<_>>())
748                .unwrap_or_default(),
749            RunAddressTarget::Pool { pool_id } if pool_id == &self.pool_id => {
750                members.keys().cloned().collect::<Vec<_>>()
751            }
752            RunAddressTarget::Pool { .. } => Vec::new(),
753        };
754
755        candidates.retain(|run_id| {
756            members
757                .get(run_id)
758                .is_some_and(|member| member.allows_message_policies(&message.policy_refs))
759        });
760
761        if matches!(message.to.target, RunAddressTarget::Pool { .. })
762            && !snapshot.message_policy.include_sender_in_pool_broadcast
763        {
764            candidates.retain(|run_id| run_id != &message.from);
765        }
766
767        candidates.sort();
768        candidates.dedup();
769        candidates
770    }
771
772    fn observable_member_runs(&self) -> Vec<RunId> {
773        self.snapshot()
774            .map(|snapshot| {
775                snapshot
776                    .members
777                    .iter()
778                    .filter(|member| member.allows_message_policies(&snapshot.policy_refs))
779                    .map(|member| member.run_id.clone())
780                    .collect()
781            })
782            .unwrap_or_default()
783    }
784
785    fn member(&self, run_id: &RunId) -> Result<AgentPoolMember, AgentError> {
786        self.snapshot()?
787            .members
788            .into_iter()
789            .find(|member| &member.run_id == run_id)
790            .ok_or_else(|| {
791                AgentError::new(
792                    AgentErrorKind::InvalidStateTransition,
793                    RetryClassification::NotRetryable,
794                    "run is not a member of this agent pool",
795                )
796            })
797    }
798
799    /// Rehydrates the current durable pool snapshot from the configured store.
800    /// This returns only pool-backed membership, message, wake, policy, and cursor state; it
801    /// does not subscribe to the global event bus or synthesize missing records.
802    pub fn snapshot(&self) -> Result<AgentPoolSnapshot, AgentError> {
803        self.store.snapshot(&self.pool_id)
804    }
805
806    fn trigger_matching_wakes(&self, frame: &EventFrame) -> Result<(), AgentError> {
807        if matches!(
808            frame.event.envelope.event_kind,
809            EventKind::WakeConditionRegistered
810                | EventKind::WakeConditionTriggered
811                | EventKind::WakeConditionTimedOut
812                | EventKind::WakeConditionCancelled
813                | EventKind::WakeConditionFailed
814        ) {
815            return Ok(());
816        }
817
818        let wakes = self.snapshot()?.wakes;
819        for wake in wakes
820            .into_iter()
821            .filter(|wake| wake.registration.status == WakeRegistrationStatus::Registered)
822        {
823            if wake.compiled_filter.matches_envelope(&frame.event.envelope) {
824                self.record_wake_status(
825                    &wake.condition,
826                    wake.compiled_filter,
827                    WakeRegistrationStatus::Triggered,
828                    Some(frame.event.envelope.event_id.clone()),
829                )?;
830            }
831        }
832        Ok(())
833    }
834
835    /// Watches durable pool-store changes after the supplied cursor.
836    /// This is a pool-scoped coordination-record stream, not a global event bus.
837    pub fn watch_pool(
838        &self,
839        cursor: Option<AgentPoolStoreCursor>,
840    ) -> Result<AgentPoolStoreStream, AgentError> {
841        self.store.watch(&self.pool_id, cursor)
842    }
843}
844
845#[derive(Clone)]
846/// Holds agent pool builder application-layer state or configuration.
847/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
848pub struct AgentPoolBuilder {
849    pool_id: AgentPoolId,
850    runtime: Option<AgentRuntime>,
851    message_policy: AgentPoolMessagePolicy,
852    wake_policy: AgentPoolWakePolicy,
853    policy_refs: Vec<PolicyRef>,
854    store: Option<Arc<dyn AgentPoolStore>>,
855}
856
857impl AgentPoolBuilder {
858    /// Returns an updated value with runtime configured.
859    /// This stores the runtime used by the pool builder; no run is started until `start_run` is
860    /// called.
861    pub fn runtime(mut self, runtime: AgentRuntime) -> Self {
862        self.runtime = Some(runtime);
863        self
864    }
865
866    /// Returns an updated value with message policy configured.
867    /// This is builder configuration only and performs no I/O or run coordination.
868    pub fn message_policy(mut self, policy: AgentPoolMessagePolicy) -> Self {
869        self.message_policy = policy;
870        self
871    }
872
873    /// Returns an updated value with wake policy configured.
874    /// This is builder configuration only and performs no I/O or run coordination.
875    pub fn wake_policy(mut self, policy: AgentPoolWakePolicy) -> Self {
876        self.wake_policy = policy;
877        self
878    }
879
880    /// Returns an updated value with policy ref configured.
881    /// This sets the policy reference on the coordination value and performs no I/O.
882    pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
883        self.policy_refs.push(policy_ref);
884        self
885    }
886
887    /// Returns an updated value with the pool store configured.
888    /// The store is the shared coordination authority for membership,
889    /// messages, wake registrations, dedupe, rehydration, and pool watch.
890    pub fn store<S>(mut self, store: S) -> Self
891    where
892        S: AgentPoolStore + 'static,
893    {
894        self.store = Some(Arc::new(store));
895        self
896    }
897
898    /// Returns an updated value with a dynamically dispatched pool store.
899    /// Use this when sharing one store instance across multiple pool handles
900    /// or when a host provides its own adapter.
901    pub fn shared_store(mut self, store: Arc<dyn AgentPoolStore>) -> Self {
902        self.store = Some(store);
903        self
904    }
905
906    /// Finishes builder validation and returns the configured value.
907    /// This is data-only unless the surrounding builder explicitly
908    /// documents adapter or store access.
909    pub fn build(self) -> Result<AgentPool, AgentError> {
910        let runtime = self
911            .runtime
912            .ok_or_else(|| AgentError::host_configuration_needed("agent pool requires runtime"))?;
913        let store = self
914            .store
915            .unwrap_or_else(|| Arc::new(InMemoryAgentPoolStore::default()));
916        store.open_pool(
917            self.pool_id.clone(),
918            AgentPoolStoreConfig {
919                message_policy: self.message_policy,
920                wake_policy: self.wake_policy,
921                policy_refs: self.policy_refs,
922            },
923        )?;
924        Ok(AgentPool {
925            pool_id: self.pool_id,
926            runtime,
927            store,
928        })
929    }
930}
931
932#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
933/// Store configuration for one logical agent pool.
934/// This is durable pool metadata; constructing it does not open a
935/// concrete store or start coordination work.
936pub struct AgentPoolStoreConfig {
937    /// Message policy used when resolving pool messages.
938    pub message_policy: AgentPoolMessagePolicy,
939    /// Wake policy used when scoping wake filters.
940    pub wake_policy: AgentPoolWakePolicy,
941    #[serde(default, skip_serializing_if = "Vec::is_empty")]
942    /// Policy refs that scope pool-level observation and membership.
943    pub policy_refs: Vec<PolicyRef>,
944}
945
946#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
947/// Durable cursor for pool-scoped store records.
948/// Cursors are scoped by the pool passed to `watch` and should not be
949/// used as global event or journal cursors.
950pub struct AgentPoolStoreCursor {
951    /// Monotonic sequence within a logical pool store partition.
952    pub sequence: u64,
953}
954
955impl AgentPoolStoreCursor {
956    /// Builds the initial cursor before any pool-store record.
957    pub fn start() -> Self {
958        Self { sequence: 0 }
959    }
960
961    /// Builds a cursor for a known sequence.
962    pub fn new(sequence: u64) -> Self {
963        Self { sequence }
964    }
965}
966
967#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
968/// Rehydratable snapshot for one logical agent pool.
969/// The snapshot is derived only from durable store records; callers must
970/// not synthesize members, messages, or wakes outside the store.
971pub struct AgentPoolSnapshot {
972    /// Stable pool id used for typed lineage, lookup, or dedupe.
973    pub pool_id: AgentPoolId,
974    /// Whether the pool-created lifecycle record has been persisted.
975    pub created: bool,
976    /// Current members visible in the pool.
977    pub members: Vec<AgentPoolMember>,
978    /// Current topic ids known to the pool.
979    pub topics: Vec<TopicId>,
980    /// Message policy used when resolving pool messages.
981    pub message_policy: AgentPoolMessagePolicy,
982    /// Wake policy used when scoping wake filters.
983    pub wake_policy: AgentPoolWakePolicy,
984    #[serde(default, skip_serializing_if = "Vec::is_empty")]
985    /// Policy refs that scope pool-level observation and membership.
986    pub policy_refs: Vec<PolicyRef>,
987    #[serde(default, skip_serializing_if = "Vec::is_empty")]
988    /// Durable run-message status records known to the pool.
989    pub messages: Vec<AgentPoolStoredMessage>,
990    #[serde(default, skip_serializing_if = "Vec::is_empty")]
991    /// Durable wake registrations known to the pool.
992    pub wakes: Vec<AgentPoolStoredWake>,
993    #[serde(skip_serializing_if = "Option::is_none")]
994    /// Latest store cursor represented by this snapshot.
995    pub cursor: Option<AgentPoolStoreCursor>,
996}
997
998#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
999/// Durable message status stored for pool rehydration and dedupe.
1000pub struct AgentPoolStoredMessage {
1001    /// Original run message request.
1002    pub message: RunMessage,
1003    /// Receipt for the stored status transition.
1004    pub receipt: MessageReceipt,
1005}
1006
1007#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1008/// Durable wake state stored for pool rehydration and cross-handle wake
1009/// triggering.
1010pub struct AgentPoolStoredWake {
1011    /// Original wake condition.
1012    pub condition: WakeCondition,
1013    /// Scoped, compiled filter used for envelope matching.
1014    pub compiled_filter: CompiledEventFilter,
1015    /// Latest durable registration status.
1016    pub registration: WakeRegistration,
1017}
1018
1019#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1020/// One append-only pool-store record. These records are a durable
1021/// coordination view linked to journal-backed events; they are not a
1022/// replacement for `AgentEventBus`.
1023pub struct AgentPoolStoreRecord {
1024    /// Stable pool id used for typed lineage, lookup, or dedupe.
1025    pub pool_id: AgentPoolId,
1026    /// Cursor assigned by the store.
1027    pub cursor: AgentPoolStoreCursor,
1028    /// Stored pool change.
1029    pub payload: AgentPoolStoreRecordPayload,
1030}
1031
1032#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1033#[serde(tag = "type", rename_all = "snake_case")]
1034/// Finite pool-store record variants.
1035#[expect(
1036    clippy::large_enum_variant,
1037    reason = "pool store payloads are durable serde records; preserve direct variant ergonomics until a separate storage-envelope redesign"
1038)]
1039pub enum AgentPoolStoreRecordPayload {
1040    /// Pool metadata was opened or initialized.
1041    PoolOpened {
1042        /// Configuration persisted for this pool.
1043        config: AgentPoolStoreConfig,
1044    },
1045    /// Pool lifecycle was marked created by a journal-backed operation.
1046    PoolCreated,
1047    /// A run joined the pool.
1048    MemberJoined {
1049        /// Joined member.
1050        member: AgentPoolMember,
1051    },
1052    /// A run left the pool.
1053    MemberLeft {
1054        /// Left member.
1055        member: AgentPoolMember,
1056    },
1057    /// A run-message status was persisted.
1058    RunMessage {
1059        /// Stored message transition.
1060        stored: AgentPoolStoredMessage,
1061    },
1062    /// A wake status was persisted.
1063    Wake {
1064        /// Stored wake transition.
1065        stored: AgentPoolStoredWake,
1066    },
1067}
1068
1069#[derive(Clone, Debug)]
1070/// Iterator over durable pool-store records for one logical pool.
1071pub struct AgentPoolStoreStream {
1072    records: VecDeque<AgentPoolStoreRecord>,
1073}
1074
1075impl AgentPoolStoreStream {
1076    /// Builds a store stream from records already loaded by a store.
1077    pub fn new(records: impl IntoIterator<Item = AgentPoolStoreRecord>) -> Self {
1078        Self {
1079            records: records.into_iter().collect(),
1080        }
1081    }
1082}
1083
1084impl Iterator for AgentPoolStoreStream {
1085    type Item = AgentPoolStoreRecord;
1086
1087    fn next(&mut self) -> Option<Self::Item> {
1088        self.records.pop_front()
1089    }
1090}
1091
1092/// Port for durable/shared agent-pool coordination.
1093/// Implementations may use memory, SQLite, RPC, MCP, or another backing
1094/// service, but they must preserve the same pool-scoped records,
1095/// snapshots, idempotency, and watch semantics.
1096pub trait AgentPoolStore: Send + Sync {
1097    /// Create or open a logical pool and return the durable snapshot.
1098    fn open_pool(
1099        &self,
1100        pool_id: AgentPoolId,
1101        config: AgentPoolStoreConfig,
1102    ) -> Result<AgentPoolSnapshot, AgentError>;
1103
1104    /// Rehydrate the current durable pool snapshot.
1105    fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError>;
1106
1107    /// Mark the pool-created lifecycle as durable.
1108    fn record_pool_created(
1109        &self,
1110        pool_id: &AgentPoolId,
1111    ) -> Result<AgentPoolStoreCursor, AgentError>;
1112
1113    /// Persist member join.
1114    fn join_member(
1115        &self,
1116        pool_id: &AgentPoolId,
1117        member: AgentPoolMember,
1118    ) -> Result<AgentPoolStoreCursor, AgentError>;
1119
1120    /// Persist member leave and return the removed member.
1121    fn leave_member(
1122        &self,
1123        pool_id: &AgentPoolId,
1124        run_id: &RunId,
1125    ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError>;
1126
1127    /// Look up message dedupe state by idempotency key.
1128    fn message_receipt(
1129        &self,
1130        pool_id: &AgentPoolId,
1131        idempotency_key: &IdempotencyKey,
1132    ) -> Result<Option<MessageReceipt>, AgentError>;
1133
1134    /// Persist one message status transition.
1135    fn record_message(
1136        &self,
1137        pool_id: &AgentPoolId,
1138        message: RunMessage,
1139        receipt: MessageReceipt,
1140    ) -> Result<AgentPoolStoreCursor, AgentError>;
1141
1142    /// Look up wake dedupe state by idempotency key.
1143    fn wake_registration(
1144        &self,
1145        pool_id: &AgentPoolId,
1146        idempotency_key: &IdempotencyKey,
1147    ) -> Result<Option<WakeRegistration>, AgentError>;
1148
1149    /// Look up one stored wake by condition id.
1150    fn wake(
1151        &self,
1152        pool_id: &AgentPoolId,
1153        condition_id: &WakeConditionId,
1154    ) -> Result<Option<AgentPoolStoredWake>, AgentError>;
1155
1156    /// Persist one wake status transition.
1157    fn record_wake(
1158        &self,
1159        pool_id: &AgentPoolId,
1160        condition: WakeCondition,
1161        compiled_filter: CompiledEventFilter,
1162        registration: WakeRegistration,
1163    ) -> Result<AgentPoolStoreCursor, AgentError>;
1164
1165    /// Read durable pool changes after the supplied cursor.
1166    fn watch(
1167        &self,
1168        pool_id: &AgentPoolId,
1169        cursor: Option<AgentPoolStoreCursor>,
1170    ) -> Result<AgentPoolStoreStream, AgentError>;
1171
1172    /// Allocate a unique event sequence for pool event IDs.
1173    fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError>;
1174}
1175
1176#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1177/// Holds agent pool member application-layer state or configuration.
1178/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
1179pub struct AgentPoolMember {
1180    /// Run identifier used for lineage, filtering, replay, and dedupe.
1181    pub run_id: RunId,
1182    /// Agent identifier used for lineage, filtering, and ownership checks.
1183    pub agent_id: AgentId,
1184    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1185    /// Policy references that govern admission, projection, execution, or
1186    /// delivery.
1187    pub policy_refs: Vec<PolicyRef>,
1188    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1189    /// Collection of topics values.
1190    /// Ordering and membership should be treated as part of the serialized contract when
1191    /// relevant.
1192    pub topics: Vec<TopicId>,
1193}
1194
1195impl AgentPoolMember {
1196    /// Creates a new application::agent_pool value with explicit
1197    /// caller-provided inputs. This constructor is data-only and
1198    /// performs no I/O or external side effects.
1199    pub fn new(run_id: RunId, agent_id: AgentId) -> Self {
1200        Self {
1201            run_id,
1202            agent_id,
1203            policy_refs: Vec::new(),
1204            topics: Vec::new(),
1205        }
1206    }
1207
1208    /// Returns an updated value with policy ref configured.
1209    /// This sets the policy reference on the coordination value and performs no I/O.
1210    pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
1211        self.policy_refs.push(policy_ref);
1212        self
1213    }
1214
1215    /// Returns an updated value with topic configured.
1216    /// This sets the topic id on the address/filter value and performs no subscription by
1217    /// itself.
1218    pub fn topic(mut self, topic_id: TopicId) -> Self {
1219        self.topics.push(topic_id);
1220        self
1221    }
1222
1223    fn allows_message_policies(&self, required: &[PolicyRef]) -> bool {
1224        required
1225            .iter()
1226            .all(|required| self.policy_refs.contains(required))
1227    }
1228}
1229
1230#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1231/// Holds agent pool message policy application-layer state or configuration.
1232/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
1233pub struct AgentPoolMessagePolicy {
1234    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1235    /// Typed required policy refs references. Resolving them is separate from
1236    /// constructing this record.
1237    pub required_policy_refs: Vec<PolicyRef>,
1238    /// Whether pool broadcast delivery includes the sender run as a recipient.
1239    /// Use this for explicit loopback semantics; the default coordination path should avoid
1240    /// accidental self-delivery.
1241    pub include_sender_in_pool_broadcast: bool,
1242}
1243
1244impl AgentPoolMessagePolicy {
1245    /// Builds the bounded defaults value with the documented defaults.
1246    /// This uses only local coordinator state and performs no hidden host work.
1247    pub fn bounded_defaults() -> Self {
1248        Self {
1249            required_policy_refs: Vec::new(),
1250            include_sender_in_pool_broadcast: false,
1251        }
1252    }
1253
1254    fn allows(&self, message: &RunMessage) -> bool {
1255        self.required_policy_refs
1256            .iter()
1257            .all(|required| message.policy_refs.contains(required))
1258    }
1259}
1260
1261#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1262/// Holds agent pool wake policy application-layer state or configuration.
1263/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
1264pub struct AgentPoolWakePolicy {
1265    /// Whether envelope only is enabled.
1266    /// Policy, validation, or routing code uses this flag to choose the explicit behavior.
1267    pub envelope_only: bool,
1268}
1269
1270impl AgentPoolWakePolicy {
1271    /// Returns an updated value with safe defaults configured.
1272    /// This is data-only and does not perform I/O, call host ports, append journals, publish
1273    /// events, or start processes.
1274    pub fn safe_defaults() -> Self {
1275        Self {
1276            envelope_only: true,
1277        }
1278    }
1279}
1280
1281#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1282/// Holds run address application-layer state or configuration.
1283/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
1284pub struct RunAddress {
1285    /// Target used by this record or request.
1286    pub target: RunAddressTarget,
1287    /// Typed destination reference that records where this item is being sent
1288    /// or projected.
1289    pub destination_ref: DestinationRef,
1290    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1291    /// Typed related refs references. Resolving them is separate from
1292    /// constructing this record.
1293    pub related_refs: Vec<EntityRef>,
1294}
1295
1296impl RunAddress {
1297    /// Builds the run value with the documented defaults.
1298    /// This uses only local coordinator state and performs no hidden host work.
1299    pub fn run(run_id: RunId) -> Self {
1300        Self {
1301            destination_ref: DestinationRef::with_kind(DestinationKind::Agent, run_id.as_str()),
1302            related_refs: vec![EntityRef::run(run_id.clone())],
1303            target: RunAddressTarget::Run { run_id },
1304        }
1305    }
1306
1307    /// Returns agent for the current value.
1308    /// This is a read-only or data-construction helper unless the method body explicitly calls
1309    /// a port or store.
1310    pub fn agent(agent_id: AgentId) -> Self {
1311        Self {
1312            destination_ref: DestinationRef::with_kind(DestinationKind::Agent, agent_id.as_str()),
1313            related_refs: vec![EntityRef::agent(agent_id.clone())],
1314            target: RunAddressTarget::Agent { agent_id },
1315        }
1316    }
1317
1318    /// Returns an updated value with topic configured.
1319    /// This sets the topic id on the address/filter value and performs no subscription by
1320    /// itself.
1321    pub fn topic(topic_id: TopicId) -> Self {
1322        Self {
1323            destination_ref: DestinationRef::with_kind(DestinationKind::Topic, topic_id.as_str()),
1324            related_refs: vec![EntityRef::topic(topic_id.clone())],
1325            target: RunAddressTarget::Topic { topic_id },
1326        }
1327    }
1328
1329    /// Builds the pool value with the documented defaults.
1330    /// This uses only local coordinator state and performs no hidden host work.
1331    pub fn pool(pool_id: AgentPoolId) -> Self {
1332        Self {
1333            destination_ref: DestinationRef::with_kind(
1334                DestinationKind::AgentPool,
1335                pool_id.as_str(),
1336            ),
1337            related_refs: vec![EntityRef::agent_pool(pool_id.clone())],
1338            target: RunAddressTarget::Pool { pool_id },
1339        }
1340    }
1341}
1342
1343#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1344#[serde(tag = "type", rename_all = "snake_case")]
1345/// Enumerates the finite run address target cases.
1346/// Serialized names are part of the SDK contract; update fixtures when variants change.
1347pub enum RunAddressTarget {
1348    /// Use this variant when the contract needs to represent run; selecting it has no side effect by itself.
1349    Run {
1350        /// Run identifier used for lineage, filtering, replay, and dedupe.
1351        run_id: RunId,
1352    },
1353    /// Use this variant when the contract needs to represent agent; selecting it has no side effect by itself.
1354    Agent {
1355        /// Agent identifier used for lineage, filtering, and ownership
1356        /// checks.
1357        agent_id: AgentId,
1358    },
1359    /// Use this variant when the contract needs to represent topic; selecting it has no side effect by itself.
1360    Topic {
1361        /// Stable topic id used for typed lineage, lookup, or dedupe.
1362        topic_id: TopicId,
1363    },
1364    /// Use this variant when the contract needs to represent pool; selecting it has no side effect by itself.
1365    Pool {
1366        /// Stable pool id used for typed lineage, lookup, or dedupe.
1367        pool_id: AgentPoolId,
1368    },
1369}
1370
1371impl RunAddressTarget {
1372    /// Returns run id for this application::agent_pool value without
1373    /// performing external I/O.
1374    pub fn run_id(&self) -> Option<&RunId> {
1375        match self {
1376            Self::Run { run_id } => Some(run_id),
1377            _ => None,
1378        }
1379    }
1380}
1381
1382#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1383/// Holds run message application-layer state or configuration.
1384/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
1385pub struct RunMessage {
1386    /// Message identifier for transcript, projection, or provider-response
1387    /// lineage.
1388    pub message_id: MessageId,
1389    /// From used by this record or request.
1390    pub from: RunId,
1391    /// To used by this record or request.
1392    pub to: RunAddress,
1393    /// Content reference where payload bytes or structured tool output are
1394    /// stored.
1395    pub content_ref: ContentRef,
1396    /// Correlation used by this record or request.
1397    pub correlation: EventCorrelation,
1398    #[serde(skip_serializing_if = "Option::is_none")]
1399    /// Optional reply to value.
1400    /// When absent, callers should use the documented default or skip that optional behavior.
1401    pub reply_to: Option<MessageId>,
1402    #[serde(skip_serializing_if = "Option::is_none")]
1403    /// Optional response contract value.
1404    /// When absent, callers should use the documented default or skip that optional behavior.
1405    pub response_contract: Option<MessageResponseContract>,
1406    #[serde(skip_serializing_if = "Option::is_none")]
1407    /// Time value in milliseconds for expires at millis.
1408    /// Use it for timeout, ordering, or diagnostic calculations.
1409    pub expires_at_millis: Option<u64>,
1410    /// Idempotency setting or key for deduping retries.
1411    /// Use it to prevent duplicate side effects during replay or repair.
1412    pub idempotency_key: IdempotencyKey,
1413    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1414    /// Policy references that govern admission, projection, execution, or
1415    /// delivery.
1416    pub policy_refs: Vec<PolicyRef>,
1417}
1418
1419impl RunMessage {
1420    /// Creates a new application::agent_pool value with explicit
1421    /// caller-provided inputs. This constructor is data-only and
1422    /// performs no I/O or external side effects.
1423    pub fn new(
1424        message_id: MessageId,
1425        from: RunId,
1426        to: RunAddress,
1427        content_ref: ContentRef,
1428        idempotency_key: IdempotencyKey,
1429    ) -> Self {
1430        Self {
1431            message_id,
1432            from,
1433            to,
1434            content_ref,
1435            correlation: EventCorrelation::default(),
1436            reply_to: None,
1437            response_contract: None,
1438            expires_at_millis: None,
1439            idempotency_key,
1440            policy_refs: Vec::new(),
1441        }
1442    }
1443
1444    /// Returns an updated value with policy ref configured.
1445    /// This sets the policy reference on the coordination value and performs no I/O.
1446    pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
1447        self.policy_refs.push(policy_ref);
1448        self
1449    }
1450
1451    fn target_related_refs(&self, delivered_to: &[RunId]) -> Vec<EntityRef> {
1452        let mut refs = self.to.related_refs.clone();
1453        refs.extend(delivered_to.iter().cloned().map(EntityRef::run));
1454        refs.sort_by(|left, right| {
1455            left.kind
1456                .cmp(&right.kind)
1457                .then_with(|| left.as_str().cmp(right.as_str()))
1458        });
1459        refs.dedup_by(|left, right| left.kind == right.kind && left.as_str() == right.as_str());
1460        refs
1461    }
1462}
1463
1464#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1465/// Holds message response contract application-layer state or configuration.
1466/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
1467pub struct MessageResponseContract {
1468    /// Expected responses used by this record or request.
1469    pub expected_responses: u32,
1470    #[serde(skip_serializing_if = "Option::is_none")]
1471    /// Time value in milliseconds for timeout millis.
1472    /// Use it for timeout, ordering, or diagnostic calculations.
1473    pub timeout_millis: Option<u64>,
1474}
1475
1476impl MessageResponseContract {
1477    /// Builds the one response value with the documented defaults.
1478    /// This uses only local coordinator state and performs no hidden host work.
1479    pub fn one_response(timeout_millis: u64) -> Self {
1480        Self {
1481            expected_responses: 1,
1482            timeout_millis: Some(timeout_millis),
1483        }
1484    }
1485}
1486
1487#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1488/// Holds message receipt application-layer state or configuration.
1489/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
1490pub struct MessageReceipt {
1491    /// Message identifier for transcript, projection, or provider-response
1492    /// lineage.
1493    pub message_id: MessageId,
1494    /// Finite status for this record or lifecycle stage.
1495    pub status: MessageStatus,
1496    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1497    /// Collection of delivered to values.
1498    /// Ordering and membership should be treated as part of the serialized contract when
1499    /// relevant.
1500    pub delivered_to: Vec<RunId>,
1501    #[serde(skip_serializing_if = "Option::is_none")]
1502    /// Cursor identifying a replay, export, or subscription position.
1503    /// Use it to resume without widening the original scope.
1504    pub journal_cursor: Option<JournalCursor>,
1505}
1506
1507#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1508#[serde(rename_all = "snake_case")]
1509/// Enumerates the finite message status cases.
1510/// Serialized names are part of the SDK contract; update fixtures when variants change.
1511pub enum MessageStatus {
1512    /// Use this variant when the contract needs to represent accepted; selecting it has no side effect by itself.
1513    Accepted,
1514    /// Use this variant when the contract needs to represent delivered; selecting it has no side effect by itself.
1515    Delivered,
1516    /// Use this variant when the contract needs to represent responded; selecting it has no side effect by itself.
1517    Responded,
1518    /// Use this variant when the contract needs to represent failed; selecting it has no side effect by itself.
1519    Failed,
1520    /// Use this variant when the contract needs to represent timed out; selecting it has no side effect by itself.
1521    TimedOut,
1522    /// Use this variant when the contract needs to represent expired; selecting it has no side effect by itself.
1523    Expired,
1524    /// Use this variant when the contract needs to represent cancelled; selecting it has no side effect by itself.
1525    Cancelled,
1526}
1527
1528impl MessageStatus {
1529    fn event_kind(&self) -> EventKind {
1530        match self {
1531            Self::Accepted => EventKind::RunMessageAccepted,
1532            Self::Delivered => EventKind::RunMessageDelivered,
1533            Self::Responded => EventKind::RunMessageResponded,
1534            Self::Failed => EventKind::RunMessageFailed,
1535            Self::TimedOut => EventKind::RunMessageTimedOut,
1536            Self::Expired => EventKind::RunMessageExpired,
1537            Self::Cancelled => EventKind::RunMessageCancelled,
1538        }
1539    }
1540
1541    fn redacted_summary(&self) -> &'static str {
1542        match self {
1543            Self::Accepted => "run message accepted",
1544            Self::Delivered => "run message delivered",
1545            Self::Responded => "run message responded",
1546            Self::Failed => "run message failed",
1547            Self::TimedOut => "run message timed out",
1548            Self::Expired => "run message expired",
1549            Self::Cancelled => "run message cancelled",
1550        }
1551    }
1552
1553    fn is_terminal_delivery(&self) -> bool {
1554        matches!(
1555            self,
1556            Self::Delivered
1557                | Self::Responded
1558                | Self::Failed
1559                | Self::TimedOut
1560                | Self::Expired
1561                | Self::Cancelled
1562        )
1563    }
1564
1565    fn effect_terminal_status(&self) -> EffectTerminalStatus {
1566        match self {
1567            Self::Delivered | Self::Responded => EffectTerminalStatus::Completed,
1568            Self::TimedOut => EffectTerminalStatus::TimedOut,
1569            Self::Cancelled => EffectTerminalStatus::Cancelled,
1570            Self::Accepted => EffectTerminalStatus::Unknown,
1571            Self::Failed | Self::Expired => EffectTerminalStatus::Failed,
1572        }
1573    }
1574}
1575
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Wake condition for `run_id`: an event filter plus timeout, resume-input, idempotency,
/// and policy settings.
/// Constructing or serializing this value performs no I/O; registration side effects
/// happen only in the coordinator methods that accept it.
pub struct WakeCondition {
    /// Stable identifier of this condition; the in-memory store keys stored wakes by it.
    pub condition_id: WakeConditionId,
    /// Run this condition belongs to.
    pub run_id: RunId,
    /// Event filter the condition matches against. `compile_envelope_filter` compiles a
    /// copy of it with payload access forced to envelope-only.
    pub filter: EventFilter,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Optional timeout in milliseconds; `None` means no timeout was configured.
    /// Omitted from serialized output when `None`.
    pub timeout_millis: Option<u64>,
    /// Resume-input policy for this condition; `WakeCondition::new` defaults it to
    /// `ResumeInputPolicy::MatchingEventRefs`.
    pub resume_with: ResumeInputPolicy,
    /// Dedupe key for retried registrations; the in-memory store keeps one registration
    /// per key.
    pub idempotency_key: IdempotencyKey,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    /// Policy references attached to this condition, in insertion order.
    /// Defaults to empty when absent on deserialization and is omitted from serialized
    /// output when empty.
    pub policy_refs: Vec<PolicyRef>,
}
1600
1601impl WakeCondition {
1602    /// Creates a new application::agent_pool value with explicit
1603    /// caller-provided inputs. This constructor is data-only and
1604    /// performs no I/O or external side effects.
1605    pub fn new(
1606        condition_id: WakeConditionId,
1607        run_id: RunId,
1608        filter: EventFilter,
1609        idempotency_key: IdempotencyKey,
1610    ) -> Self {
1611        Self {
1612            condition_id,
1613            run_id,
1614            filter,
1615            timeout_millis: None,
1616            resume_with: ResumeInputPolicy::MatchingEventRefs,
1617            idempotency_key,
1618            policy_refs: Vec::new(),
1619        }
1620    }
1621
1622    /// Returns an updated value with timeout millis configured.
1623    /// This updates the wake timeout on the condition value and performs no scheduling by
1624    /// itself.
1625    pub fn timeout_millis(mut self, timeout_millis: u64) -> Self {
1626        self.timeout_millis = Some(timeout_millis);
1627        self
1628    }
1629
1630    /// Returns an updated value with policy ref configured.
1631    /// This sets the policy reference on the coordination value and performs no I/O.
1632    pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
1633        self.policy_refs.push(policy_ref);
1634        self
1635    }
1636
1637    /// Computes or returns compile envelope filter for the
1638    /// application::agent_pool contract without external I/O or side effects.
1639    pub fn compile_envelope_filter(&self) -> Result<CompiledEventFilter, AgentError> {
1640        let mut filter = self.filter.clone();
1641        filter.payload_access = PayloadAccessMode::EnvelopeOnly;
1642        filter.compile()
1643    }
1644}
1645
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
/// Policy selecting what input a woken run is resumed with.
/// Serialized in `snake_case`; serialized names are part of the SDK contract, so update
/// fixtures when variants change. Each variant maps one-to-one onto the same-named
/// `WakeResumeInputPolicyRecord` variant for journaling. Selecting a variant has no side
/// effect by itself; the coordinator applies the policy.
pub enum ResumeInputPolicy {
    /// Resume with references to the matching events (the default chosen by
    /// `WakeCondition::new`).
    MatchingEventRefs,
    /// Resume with a redacted summary rather than event references.
    RedactedSummary,
    /// Resume without wake-derived input.
    None,
}
1658
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Registration record for a wake condition: which condition and run it covers and its
/// current status. The in-memory store keeps one per condition idempotency key to dedupe
/// retried registrations. Constructing this value has no side effects.
pub struct WakeRegistration {
    /// Identifier of the registered wake condition.
    pub condition_id: WakeConditionId,
    /// Run the wake condition belongs to.
    pub run_id: RunId,
    /// Current lifecycle status of the registration.
    pub status: WakeRegistrationStatus,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Optional journal cursor associated with this registration; omitted from serialized
    /// output when `None`. Use it to resume without widening the original scope.
    pub journal_cursor: Option<JournalCursor>,
}
1674
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
/// Lifecycle status of a wake registration.
/// Serialized in `snake_case` (for example `timed_out`); serialized names are part of the
/// SDK contract, so update fixtures when variants change. Each status has a matching
/// `WakeCondition*` event kind and a same-named `WakeTriggerStatus` journal variant.
/// Selecting a variant has no side effect by itself.
pub enum WakeRegistrationStatus {
    /// The condition is registered and waiting.
    Registered,
    /// The condition was triggered.
    Triggered,
    /// The condition's timeout elapsed.
    TimedOut,
    /// The condition was cancelled.
    Cancelled,
    /// The condition failed.
    Failed,
}
1691
1692impl WakeRegistrationStatus {
1693    fn event_kind(&self) -> EventKind {
1694        match self {
1695            Self::Registered => EventKind::WakeConditionRegistered,
1696            Self::Triggered => EventKind::WakeConditionTriggered,
1697            Self::TimedOut => EventKind::WakeConditionTimedOut,
1698            Self::Cancelled => EventKind::WakeConditionCancelled,
1699            Self::Failed => EventKind::WakeConditionFailed,
1700        }
1701    }
1702
1703    fn redacted_summary(&self) -> &'static str {
1704        match self {
1705            Self::Registered => "wake condition registered",
1706            Self::Triggered => "wake condition triggered",
1707            Self::TimedOut => "wake condition timed out",
1708            Self::Cancelled => "wake condition cancelled",
1709            Self::Failed => "wake condition failed",
1710        }
1711    }
1712}
1713
#[derive(Clone, Debug)]
/// Mutable per-pool state held by `InMemoryAgentPoolStore`, one value per opened pool.
struct AgentPoolState {
    /// Set to `true` once `record_pool_created` has run for this pool.
    created: bool,
    /// Current members keyed by run id.
    members: BTreeMap<RunId, AgentPoolMember>,
    /// Topic index: the member runs listed under each topic. `remove_member` drops topics
    /// that become empty.
    topics: BTreeMap<TopicId, BTreeSet<RunId>>,
    /// Message policy from the config the pool was opened with.
    message_policy: AgentPoolMessagePolicy,
    /// Wake policy from the config the pool was opened with.
    wake_policy: AgentPoolWakePolicy,
    /// Policy references from the config the pool was opened with.
    policy_refs: Vec<PolicyRef>,
    /// Recorded messages keyed by message id.
    messages: BTreeMap<MessageId, AgentPoolStoredMessage>,
    /// Receipt per message idempotency key, returned for retried sends.
    message_dedupe: BTreeMap<IdempotencyKey, MessageReceipt>,
    /// Registration per wake idempotency key, returned for retried registrations.
    wake_dedupe: BTreeMap<IdempotencyKey, WakeRegistration>,
    /// Stored wakes keyed by condition id.
    wakes: BTreeMap<WakeConditionId, AgentPoolStoredWake>,
    /// Last event sequence handed out; starts at 0, so the first issued sequence is 1.
    next_event_counter: u64,
}
1728
1729impl AgentPoolState {
1730    fn new(config: AgentPoolStoreConfig) -> Self {
1731        Self {
1732            created: false,
1733            members: BTreeMap::new(),
1734            topics: BTreeMap::new(),
1735            message_policy: config.message_policy,
1736            wake_policy: config.wake_policy,
1737            policy_refs: config.policy_refs,
1738            messages: BTreeMap::new(),
1739            message_dedupe: BTreeMap::new(),
1740            wake_dedupe: BTreeMap::new(),
1741            wakes: BTreeMap::new(),
1742            next_event_counter: 0,
1743        }
1744    }
1745
1746    fn config(&self) -> AgentPoolStoreConfig {
1747        AgentPoolStoreConfig {
1748            message_policy: self.message_policy.clone(),
1749            wake_policy: self.wake_policy.clone(),
1750            policy_refs: self.policy_refs.clone(),
1751        }
1752    }
1753
1754    fn snapshot(
1755        &self,
1756        pool_id: AgentPoolId,
1757        cursor: Option<AgentPoolStoreCursor>,
1758    ) -> AgentPoolSnapshot {
1759        AgentPoolSnapshot {
1760            pool_id,
1761            created: self.created,
1762            members: self.members.values().cloned().collect(),
1763            topics: self.topics.keys().cloned().collect(),
1764            message_policy: self.message_policy.clone(),
1765            wake_policy: self.wake_policy.clone(),
1766            policy_refs: self.policy_refs.clone(),
1767            messages: self.messages.values().cloned().collect(),
1768            wakes: self.wakes.values().cloned().collect(),
1769            cursor,
1770        }
1771    }
1772
1773    fn index_member(&mut self, member: AgentPoolMember) {
1774        for topic in &member.topics {
1775            self.topics
1776                .entry(topic.clone())
1777                .or_default()
1778                .insert(member.run_id.clone());
1779        }
1780        self.members.insert(member.run_id.clone(), member);
1781    }
1782
1783    fn remove_member(&mut self, run_id: &RunId) -> Result<AgentPoolMember, AgentError> {
1784        let member = self.members.remove(run_id).ok_or_else(|| {
1785            AgentError::new(
1786                AgentErrorKind::InvalidStateTransition,
1787                RetryClassification::NotRetryable,
1788                "run is not a member of this agent pool",
1789            )
1790        })?;
1791        for topic in &member.topics {
1792            let remove_topic = if let Some(runs) = self.topics.get_mut(topic) {
1793                runs.remove(run_id);
1794                runs.is_empty()
1795            } else {
1796                false
1797            };
1798            if remove_topic {
1799                self.topics.remove(topic);
1800            }
1801        }
1802        Ok(member)
1803    }
1804}
1805
#[derive(Clone, Debug, Default)]
/// In-memory `AgentPoolStore` implementation.
/// Cloning this value shares the same backing maps (both fields are `Arc`s), making it
/// useful for tests that simulate two process-local `AgentPool` handles sharing one
/// coordination authority. Separate default values are isolated.
pub struct InMemoryAgentPoolStore {
    /// Live state for each opened pool, keyed by pool id.
    pools: Arc<Mutex<BTreeMap<AgentPoolId, AgentPoolState>>>,
    /// Append-only record log per pool. Cursor sequences start at 1 and follow log order.
    records: Arc<Mutex<BTreeMap<AgentPoolId, Vec<AgentPoolStoreRecord>>>>,
}
1815
1816impl InMemoryAgentPoolStore {
1817    fn with_pool_state<T>(
1818        &self,
1819        pool_id: &AgentPoolId,
1820        f: impl FnOnce(&mut AgentPoolState) -> Result<T, AgentError>,
1821    ) -> Result<T, AgentError> {
1822        let mut pools = self
1823            .pools
1824            .lock()
1825            .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1826        let state = pools.get_mut(pool_id).ok_or_else(|| {
1827            AgentError::new(
1828                AgentErrorKind::HostConfigurationNeeded,
1829                RetryClassification::HostConfigurationNeeded,
1830                "agent pool store has not opened this pool",
1831            )
1832        })?;
1833        f(state)
1834    }
1835
1836    fn append_record(
1837        &self,
1838        pool_id: &AgentPoolId,
1839        payload: AgentPoolStoreRecordPayload,
1840    ) -> Result<AgentPoolStoreCursor, AgentError> {
1841        let mut records = self
1842            .records
1843            .lock()
1844            .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1845        let entries = records.entry(pool_id.clone()).or_default();
1846        let cursor = AgentPoolStoreCursor::new(entries.len() as u64 + 1);
1847        entries.push(AgentPoolStoreRecord {
1848            pool_id: pool_id.clone(),
1849            cursor: cursor.clone(),
1850            payload,
1851        });
1852        Ok(cursor)
1853    }
1854
1855    fn latest_cursor(
1856        &self,
1857        pool_id: &AgentPoolId,
1858    ) -> Result<Option<AgentPoolStoreCursor>, AgentError> {
1859        let records = self
1860            .records
1861            .lock()
1862            .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1863        Ok(records
1864            .get(pool_id)
1865            .and_then(|records| records.last().map(|record| record.cursor.clone())))
1866    }
1867}
1868
1869impl AgentPoolStore for InMemoryAgentPoolStore {
1870    fn open_pool(
1871        &self,
1872        pool_id: AgentPoolId,
1873        config: AgentPoolStoreConfig,
1874    ) -> Result<AgentPoolSnapshot, AgentError> {
1875        {
1876            let mut pools = self
1877                .pools
1878                .lock()
1879                .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1880            if let Some(existing) = pools.get(&pool_id) {
1881                if existing.config() != config {
1882                    return Err(AgentError::new(
1883                        AgentErrorKind::InvalidStateTransition,
1884                        RetryClassification::RepairNeeded,
1885                        "agent pool store config conflicts with existing pool",
1886                    ));
1887                }
1888            } else {
1889                pools.insert(pool_id.clone(), AgentPoolState::new(config.clone()));
1890                drop(pools);
1891                self.append_record(&pool_id, AgentPoolStoreRecordPayload::PoolOpened { config })?;
1892            }
1893        }
1894        self.snapshot(&pool_id)
1895    }
1896
1897    fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError> {
1898        let cursor = self.latest_cursor(pool_id)?;
1899        let pools = self
1900            .pools
1901            .lock()
1902            .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1903        pools
1904            .get(pool_id)
1905            .map(|state| state.snapshot(pool_id.clone(), cursor))
1906            .ok_or_else(|| {
1907                AgentError::new(
1908                    AgentErrorKind::HostConfigurationNeeded,
1909                    RetryClassification::HostConfigurationNeeded,
1910                    "agent pool store has not opened this pool",
1911                )
1912            })
1913    }
1914
1915    fn record_pool_created(
1916        &self,
1917        pool_id: &AgentPoolId,
1918    ) -> Result<AgentPoolStoreCursor, AgentError> {
1919        self.with_pool_state(pool_id, |state| {
1920            state.created = true;
1921            Ok(())
1922        })?;
1923        self.append_record(pool_id, AgentPoolStoreRecordPayload::PoolCreated)
1924    }
1925
1926    fn join_member(
1927        &self,
1928        pool_id: &AgentPoolId,
1929        member: AgentPoolMember,
1930    ) -> Result<AgentPoolStoreCursor, AgentError> {
1931        self.with_pool_state(pool_id, |state| {
1932            state.index_member(member.clone());
1933            Ok(())
1934        })?;
1935        self.append_record(
1936            pool_id,
1937            AgentPoolStoreRecordPayload::MemberJoined { member },
1938        )
1939    }
1940
1941    fn leave_member(
1942        &self,
1943        pool_id: &AgentPoolId,
1944        run_id: &RunId,
1945    ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError> {
1946        let member = self.with_pool_state(pool_id, |state| state.remove_member(run_id))?;
1947        let cursor = self.append_record(
1948            pool_id,
1949            AgentPoolStoreRecordPayload::MemberLeft {
1950                member: member.clone(),
1951            },
1952        )?;
1953        Ok((member, cursor))
1954    }
1955
1956    fn message_receipt(
1957        &self,
1958        pool_id: &AgentPoolId,
1959        idempotency_key: &IdempotencyKey,
1960    ) -> Result<Option<MessageReceipt>, AgentError> {
1961        self.with_pool_state(pool_id, |state| {
1962            Ok(state.message_dedupe.get(idempotency_key).cloned())
1963        })
1964    }
1965
1966    fn record_message(
1967        &self,
1968        pool_id: &AgentPoolId,
1969        message: RunMessage,
1970        receipt: MessageReceipt,
1971    ) -> Result<AgentPoolStoreCursor, AgentError> {
1972        let stored = AgentPoolStoredMessage { message, receipt };
1973        self.with_pool_state(pool_id, |state| {
1974            state.message_dedupe.insert(
1975                stored.message.idempotency_key.clone(),
1976                stored.receipt.clone(),
1977            );
1978            state
1979                .messages
1980                .insert(stored.message.message_id.clone(), stored.clone());
1981            Ok(())
1982        })?;
1983        self.append_record(pool_id, AgentPoolStoreRecordPayload::RunMessage { stored })
1984    }
1985
1986    fn wake_registration(
1987        &self,
1988        pool_id: &AgentPoolId,
1989        idempotency_key: &IdempotencyKey,
1990    ) -> Result<Option<WakeRegistration>, AgentError> {
1991        self.with_pool_state(pool_id, |state| {
1992            Ok(state.wake_dedupe.get(idempotency_key).cloned())
1993        })
1994    }
1995
1996    fn wake(
1997        &self,
1998        pool_id: &AgentPoolId,
1999        condition_id: &WakeConditionId,
2000    ) -> Result<Option<AgentPoolStoredWake>, AgentError> {
2001        self.with_pool_state(pool_id, |state| Ok(state.wakes.get(condition_id).cloned()))
2002    }
2003
2004    fn record_wake(
2005        &self,
2006        pool_id: &AgentPoolId,
2007        condition: WakeCondition,
2008        compiled_filter: CompiledEventFilter,
2009        registration: WakeRegistration,
2010    ) -> Result<AgentPoolStoreCursor, AgentError> {
2011        let stored = AgentPoolStoredWake {
2012            condition,
2013            compiled_filter,
2014            registration,
2015        };
2016        self.with_pool_state(pool_id, |state| {
2017            state.wake_dedupe.insert(
2018                stored.condition.idempotency_key.clone(),
2019                stored.registration.clone(),
2020            );
2021            state
2022                .wakes
2023                .insert(stored.condition.condition_id.clone(), stored.clone());
2024            Ok(())
2025        })?;
2026        self.append_record(pool_id, AgentPoolStoreRecordPayload::Wake { stored })
2027    }
2028
2029    fn watch(
2030        &self,
2031        pool_id: &AgentPoolId,
2032        cursor: Option<AgentPoolStoreCursor>,
2033    ) -> Result<AgentPoolStoreStream, AgentError> {
2034        let start_after = cursor.map(|cursor| cursor.sequence).unwrap_or(0);
2035        let records = self
2036            .records
2037            .lock()
2038            .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
2039        Ok(AgentPoolStoreStream::new(
2040            records
2041                .get(pool_id)
2042                .cloned()
2043                .unwrap_or_default()
2044                .into_iter()
2045                .filter(|record| record.cursor.sequence > start_after),
2046        ))
2047    }
2048
2049    fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError> {
2050        self.with_pool_state(pool_id, |state| {
2051            state.next_event_counter += 1;
2052            Ok(state.next_event_counter)
2053        })
2054    }
2055}
2056
2057impl From<RunAddressTarget> for RunMessageAddressTargetRecord {
2058    fn from(value: RunAddressTarget) -> Self {
2059        match value {
2060            RunAddressTarget::Run { run_id } => Self::Run { run_id },
2061            RunAddressTarget::Agent { agent_id } => Self::Agent { agent_id },
2062            RunAddressTarget::Topic { topic_id } => Self::Topic { topic_id },
2063            RunAddressTarget::Pool { pool_id } => Self::Pool { pool_id },
2064        }
2065    }
2066}
2067
2068impl From<MessageStatus> for RunMessageDeliveryStatus {
2069    fn from(value: MessageStatus) -> Self {
2070        match value {
2071            MessageStatus::Accepted => Self::Accepted,
2072            MessageStatus::Delivered => Self::Delivered,
2073            MessageStatus::Responded => Self::Responded,
2074            MessageStatus::Failed => Self::Failed,
2075            MessageStatus::TimedOut => Self::TimedOut,
2076            MessageStatus::Expired => Self::Expired,
2077            MessageStatus::Cancelled => Self::Cancelled,
2078        }
2079    }
2080}
2081
2082impl From<ResumeInputPolicy> for WakeResumeInputPolicyRecord {
2083    fn from(value: ResumeInputPolicy) -> Self {
2084        match value {
2085            ResumeInputPolicy::MatchingEventRefs => Self::MatchingEventRefs,
2086            ResumeInputPolicy::RedactedSummary => Self::RedactedSummary,
2087            ResumeInputPolicy::None => Self::None,
2088        }
2089    }
2090}
2091
2092impl From<WakeRegistrationStatus> for WakeTriggerStatus {
2093    fn from(value: WakeRegistrationStatus) -> Self {
2094        match value {
2095            WakeRegistrationStatus::Registered => Self::Registered,
2096            WakeRegistrationStatus::Triggered => Self::Triggered,
2097            WakeRegistrationStatus::TimedOut => Self::TimedOut,
2098            WakeRegistrationStatus::Cancelled => Self::Cancelled,
2099            WakeRegistrationStatus::Failed => Self::Failed,
2100        }
2101    }
2102}
2103
/// Maps an event kind to the `snake_case` wire name used for agent-pool events.
trait AgentPoolEventKindName {
    /// Returns the static wire name for this kind. Kinds outside the agent-pool, run-message,
    /// and wake-condition families map to the generic `"agent_pool_event"`.
    fn wire_name(&self) -> &'static str;
}
2107
2108impl AgentPoolEventKindName for EventKind {
2109    fn wire_name(&self) -> &'static str {
2110        match self {
2111            EventKind::AgentPoolCreated => "agent_pool_created",
2112            EventKind::AgentPoolRunJoined => "agent_pool_run_joined",
2113            EventKind::AgentPoolRunLeft => "agent_pool_run_left",
2114            EventKind::RunMessageAccepted => "run_message_accepted",
2115            EventKind::RunMessageDelivered => "run_message_delivered",
2116            EventKind::RunMessageResponded => "run_message_responded",
2117            EventKind::RunMessageFailed => "run_message_failed",
2118            EventKind::RunMessageTimedOut => "run_message_timed_out",
2119            EventKind::RunMessageExpired => "run_message_expired",
2120            EventKind::RunMessageCancelled => "run_message_cancelled",
2121            EventKind::WakeConditionRegistered => "wake_condition_registered",
2122            EventKind::WakeConditionTriggered => "wake_condition_triggered",
2123            EventKind::WakeConditionTimedOut => "wake_condition_timed_out",
2124            EventKind::WakeConditionCancelled => "wake_condition_cancelled",
2125            EventKind::WakeConditionFailed => "wake_condition_failed",
2126            _ => "agent_pool_event",
2127        }
2128    }
2129}
2130
2131fn intersect_run_ids(filter: &EventFilterSet<RunId>, allowed: &[RunId]) -> EventFilterSet<RunId> {
2132    match filter {
2133        EventFilterSet::Any => EventFilterSet::Include(allowed.to_vec()),
2134        EventFilterSet::Include(requested) => EventFilterSet::Include(
2135            requested
2136                .iter()
2137                .filter(|run_id| allowed.contains(run_id))
2138                .cloned()
2139                .collect(),
2140        ),
2141    }
2142}
2143
2144fn topics_from_members(members: &[AgentPoolMember]) -> BTreeMap<TopicId, BTreeSet<RunId>> {
2145    let mut topics = BTreeMap::new();
2146    for member in members {
2147        for topic in &member.topics {
2148            topics
2149                .entry(topic.clone())
2150                .or_insert_with(BTreeSet::new)
2151                .insert(member.run_id.clone());
2152        }
2153    }
2154    topics
2155}