1use std::{
8 collections::{BTreeMap, BTreeSet, VecDeque},
9 sync::{Arc, Mutex},
10};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{
15 domain::{
16 AgentError, AgentErrorKind, AgentId, AgentPoolId, ContentRef, DestinationKind,
17 DestinationRef, EffectId, EntityRef, EventId, IdempotencyKey, MessageId, PolicyRef,
18 PrivacyClass, RetryClassification, RunId, SourceKind, SourceRef, SpanId, TopicId, TraceId,
19 WakeConditionId,
20 },
21 effect::{EffectIntent, EffectKind, EffectResult, EffectTerminalStatus},
22 event::{
23 AgentEvent, CompiledEventFilter, ContentCaptureMode, EVENT_SCHEMA_VERSION,
24 EventCorrelation, EventDeliverySemantics, EventEnvelope, EventFamily, EventFilter,
25 EventFilterSet, EventFrame, EventKind, EventStreamScope, PayloadAccessMode,
26 },
27 event_bus::AgentEventStream,
28 journal::{
29 AgentPoolLifecycleStatus, AgentPoolRecord, EventIndexProjection, JOURNAL_SCHEMA_VERSION,
30 JournalCursor, JournalRecord, JournalRecordKind, JournalRecordPayload,
31 RunMessageAddressTargetRecord, RunMessageDeliveryStatus, RunMessageRecord, WakeRecord,
32 WakeResumeInputPolicyRecord, WakeTriggerStatus,
33 },
34 run::RunRequest,
35 run_handle::RunHandle,
36 runtime::AgentRuntime,
37};
38
39#[derive(Clone)]
40pub struct AgentPool {
43 pool_id: AgentPoolId,
44 runtime: AgentRuntime,
45 store: Arc<dyn AgentPoolStore>,
46}
47
48impl AgentPool {
49 pub fn builder(pool_id: AgentPoolId) -> AgentPoolBuilder {
53 AgentPoolBuilder {
54 pool_id,
55 runtime: None,
56 message_policy: AgentPoolMessagePolicy::bounded_defaults(),
57 wake_policy: AgentPoolWakePolicy::safe_defaults(),
58 policy_refs: Vec::new(),
59 store: None,
60 }
61 }
62
63 pub fn pool_id(&self) -> &AgentPoolId {
66 &self.pool_id
67 }
68
69 pub fn start_run(&self, request: RunRequest) -> Result<RunHandle, AgentError> {
73 let handle = self.runtime.start_run(request.clone())?;
74 self.join_run(AgentPoolMember::new(request.run_id, request.agent_id))?;
75 Ok(handle)
76 }
77
78 pub fn join_run(&self, member: AgentPoolMember) -> Result<(), AgentError> {
82 let should_create = {
83 let snapshot = self.snapshot()?;
84 !snapshot.created
85 };
86
87 if should_create {
88 self.append_pool_record(
89 &member.run_id,
90 &member.agent_id,
91 AgentPoolLifecycleStatus::Created,
92 EventKind::AgentPoolCreated,
93 )?;
94 self.store.record_pool_created(&self.pool_id)?;
95 }
96
97 self.store.join_member(&self.pool_id, member.clone())?;
98
99 self.append_pool_record(
100 &member.run_id,
101 &member.agent_id,
102 AgentPoolLifecycleStatus::RunJoined,
103 EventKind::AgentPoolRunJoined,
104 )?;
105 Ok(())
106 }
107
108 pub fn members(&self) -> Result<Vec<AgentPoolMember>, AgentError> {
111 Ok(self.snapshot()?.members)
112 }
113
114 pub fn leave_run(&self, run_id: &RunId) -> Result<AgentPoolMember, AgentError> {
118 let (member, _) = self.store.leave_member(&self.pool_id, run_id)?;
119 self.append_pool_record(
120 &member.run_id,
121 &member.agent_id,
122 AgentPoolLifecycleStatus::RunLeft,
123 EventKind::AgentPoolRunLeft,
124 )?;
125 Ok(member)
126 }
127
128 pub fn send(&self, message: RunMessage) -> Result<MessageReceipt, AgentError> {
133 if let Some(receipt) = self
134 .store
135 .message_receipt(&self.pool_id, &message.idempotency_key)?
136 {
137 return Ok(receipt);
138 }
139
140 let delivered_to = self.resolve_address(&message);
141 let terminal_status = if message.expires_at_millis == Some(0) {
142 MessageStatus::Expired
143 } else if delivered_to.is_empty() {
144 MessageStatus::Failed
145 } else {
146 MessageStatus::Delivered
147 };
148
149 if terminal_status == MessageStatus::Expired {
150 let receipt =
151 self.record_message_status(&message, MessageStatus::Expired, Vec::new())?;
152 return Ok(receipt);
153 }
154
155 if terminal_status == MessageStatus::Failed {
156 let receipt =
157 self.record_message_status(&message, MessageStatus::Failed, Vec::new())?;
158 return Ok(receipt);
159 }
160
161 self.record_message_status(&message, MessageStatus::Accepted, delivered_to.clone())?;
162 let receipt =
163 self.record_message_status(&message, MessageStatus::Delivered, delivered_to)?;
164 Ok(receipt)
165 }
166
167 pub fn record_message_status(
172 &self,
173 message: &RunMessage,
174 status: MessageStatus,
175 delivered_to: Vec<RunId>,
176 ) -> Result<MessageReceipt, AgentError> {
177 let source_member = self.member(&message.from)?;
178 let journal = self.runtime.journal_port(&message.from)?;
179 let record = self.run_message_record(message, status.clone(), delivered_to.clone())?;
180 let cursor = journal.append(record)?;
181 let frame = self.publish_agent_pool_event(
182 message.from.clone(),
183 source_member.agent_id,
184 status.event_kind(),
185 Some(message.message_id.clone()),
186 None,
187 EntityRef::message(message.message_id.clone()),
188 message.target_related_refs(&delivered_to),
189 Some(message.to.destination_ref.clone()),
190 message.policy_refs.clone(),
191 Some(cursor.clone()),
192 status.redacted_summary(),
193 )?;
194
195 let receipt = MessageReceipt {
196 message_id: message.message_id.clone(),
197 status,
198 delivered_to,
199 journal_cursor: Some(cursor),
200 };
201 self.store
202 .record_message(&self.pool_id, message.clone(), receipt.clone())?;
203 self.trigger_matching_wakes(&frame)?;
204 Ok(receipt)
205 }
206
207 pub fn subscribe(
210 &self,
211 filter: EventFilter,
212 cursor: Option<crate::event::EventCursor>,
213 ) -> Result<AgentEventStream, AgentError> {
214 let compiled = self.compile_scoped_filter(filter)?;
215 self.runtime.subscribe_events(compiled, cursor)
216 }
217
218 pub fn compile_scoped_filter(
221 &self,
222 filter: EventFilter,
223 ) -> Result<CompiledEventFilter, AgentError> {
224 self.scope_filter(filter).compile()
225 }
226
227 pub fn scope_filter(&self, mut filter: EventFilter) -> EventFilter {
231 let allowed_runs = self.observable_member_runs();
232 filter.run_ids = intersect_run_ids(&filter.run_ids, &allowed_runs);
233 let envelope_only = self
234 .snapshot()
235 .map(|snapshot| snapshot.wake_policy.envelope_only)
236 .unwrap_or(true);
237 if envelope_only {
238 filter.payload_access = PayloadAccessMode::EnvelopeOnly;
239 }
240 filter
241 }
242
243 pub fn suspend_until(
247 &self,
248 run_id: RunId,
249 condition: WakeCondition,
250 ) -> Result<WakeRegistration, AgentError> {
251 if run_id != condition.run_id {
252 return Err(AgentError::new(
253 AgentErrorKind::InvalidStateTransition,
254 RetryClassification::NotRetryable,
255 "wake registration run_id must match condition run_id",
256 ));
257 }
258
259 if let Some(registration) = self
260 .store
261 .wake_registration(&self.pool_id, &condition.idempotency_key)?
262 {
263 return Ok(registration);
264 }
265
266 self.member(&condition.run_id)?;
267 let compiled = self.compile_scoped_filter(condition.filter.clone())?;
268 let mut registration = self.record_wake_status(
269 &condition,
270 compiled.clone(),
271 WakeRegistrationStatus::Registered,
272 None,
273 )?;
274
275 if condition.timeout_millis == Some(0) {
276 registration = self.record_wake_status(
277 &condition,
278 compiled,
279 WakeRegistrationStatus::TimedOut,
280 None,
281 )?;
282 } else if let Some(frame) = self
283 .runtime
284 .subscribe_events(compiled.clone(), None)?
285 .next()
286 {
287 registration = self.record_wake_status(
288 &condition,
289 compiled,
290 WakeRegistrationStatus::Triggered,
291 Some(frame.event.envelope.event_id),
292 )?;
293 }
294
295 Ok(registration)
296 }
297
298 pub fn poll_wake(
302 &self,
303 condition_id: &WakeConditionId,
304 ) -> Result<WakeRegistration, AgentError> {
305 let stored = self
306 .store
307 .wake(&self.pool_id, condition_id)?
308 .ok_or_else(|| AgentError::contract_violation("wake condition is not registered"))?;
309
310 if stored.registration.status != WakeRegistrationStatus::Registered {
311 return Ok(stored.registration);
312 }
313
314 let Some(frame) = self
315 .runtime
316 .subscribe_events(stored.compiled_filter.clone(), None)?
317 .next()
318 else {
319 return Ok(stored.registration);
320 };
321
322 self.record_wake_status(
323 &stored.condition,
324 stored.compiled_filter,
325 WakeRegistrationStatus::Triggered,
326 Some(frame.event.envelope.event_id),
327 )
328 }
329
330 pub fn cancel_wake(
334 &self,
335 condition_id: &WakeConditionId,
336 ) -> Result<WakeRegistration, AgentError> {
337 let stored = self
338 .store
339 .wake(&self.pool_id, condition_id)?
340 .ok_or_else(|| AgentError::contract_violation("wake condition is not registered"))?;
341 self.record_wake_status(
342 &stored.condition,
343 stored.compiled_filter,
344 WakeRegistrationStatus::Cancelled,
345 None,
346 )
347 }
348
349 fn record_wake_status(
350 &self,
351 condition: &WakeCondition,
352 compiled_filter: CompiledEventFilter,
353 status: WakeRegistrationStatus,
354 matched_event_id: Option<EventId>,
355 ) -> Result<WakeRegistration, AgentError> {
356 let member = self.member(&condition.run_id)?;
357 let journal = self.runtime.journal_port(&condition.run_id)?;
358 let wake_record = WakeRecord {
359 condition_id: condition.condition_id.clone(),
360 run_id: condition.run_id.clone(),
361 event_filter_fingerprint: compiled_filter.filter_fingerprint.clone(),
362 timeout_millis: condition.timeout_millis,
363 resume_policy: condition.resume_with.clone().into(),
364 trigger_status: status.clone().into(),
365 policy_refs: condition.policy_refs.clone(),
366 idempotency_key: condition.idempotency_key.clone(),
367 matched_event_id,
368 };
369 let record = self.journal_record(
370 condition.run_id.clone(),
371 member.agent_id.clone(),
372 JournalRecordKind::Wake,
373 "agent_pool",
374 status.event_kind().wire_name(),
375 EntityRef::wake_condition(condition.condition_id.clone()),
376 vec![EntityRef::run(condition.run_id.clone())],
377 condition.policy_refs.clone(),
378 Vec::new(),
379 Some(condition.idempotency_key.clone()),
380 JournalRecordPayload::Wake(wake_record),
381 )?;
382 let cursor = journal.append(record)?;
383 self.publish_agent_pool_event(
384 condition.run_id.clone(),
385 member.agent_id,
386 status.event_kind(),
387 None,
388 Some(condition.condition_id.clone()),
389 EntityRef::wake_condition(condition.condition_id.clone()),
390 vec![EntityRef::run(condition.run_id.clone())],
391 Some(DestinationRef::with_kind(
392 DestinationKind::Agent,
393 condition.run_id.as_str(),
394 )),
395 condition.policy_refs.clone(),
396 Some(cursor.clone()),
397 status.redacted_summary(),
398 )?;
399
400 let registration = WakeRegistration {
401 condition_id: condition.condition_id.clone(),
402 run_id: condition.run_id.clone(),
403 status,
404 journal_cursor: Some(cursor),
405 };
406
407 self.store.record_wake(
408 &self.pool_id,
409 condition.clone(),
410 compiled_filter,
411 registration.clone(),
412 )?;
413
414 Ok(registration)
415 }
416
417 fn append_pool_record(
418 &self,
419 run_id: &RunId,
420 agent_id: &AgentId,
421 status: AgentPoolLifecycleStatus,
422 event_kind: EventKind,
423 ) -> Result<(), AgentError> {
424 let journal = self.runtime.journal_port(run_id)?;
425 let snapshot = self.snapshot()?;
426 let member_run_ids = snapshot
427 .members
428 .iter()
429 .map(|member| member.run_id.clone())
430 .collect::<Vec<_>>();
431 let topics = snapshot.topics;
432 let policy_refs = snapshot.policy_refs;
433
434 let record = AgentPoolRecord {
435 pool_id: self.pool_id.clone(),
436 member_run_ids,
437 topics,
438 policy_refs: policy_refs.clone(),
439 lifecycle_status: status,
440 };
441 let journal_record = self.journal_record(
442 run_id.clone(),
443 agent_id.clone(),
444 JournalRecordKind::AgentPool,
445 "agent_pool",
446 event_kind.wire_name(),
447 EntityRef::run(run_id.clone()),
448 Vec::new(),
449 policy_refs.clone(),
450 Vec::new(),
451 None,
452 JournalRecordPayload::AgentPool(record),
453 )?;
454 let cursor = journal.append(journal_record)?;
455 self.publish_agent_pool_event(
456 run_id.clone(),
457 agent_id.clone(),
458 event_kind,
459 None,
460 None,
461 EntityRef::run(run_id.clone()),
462 Vec::new(),
463 Some(DestinationRef::with_kind(
464 DestinationKind::Agent,
465 run_id.as_str(),
466 )),
467 policy_refs,
468 Some(cursor),
469 "agent pool membership updated",
470 )?;
471 Ok(())
472 }
473
474 fn run_message_record(
475 &self,
476 message: &RunMessage,
477 status: MessageStatus,
478 delivered_to: Vec<RunId>,
479 ) -> Result<JournalRecord, AgentError> {
480 let member = self.member(&message.from)?;
481 let mut effect_intent = None;
482 let mut effect_result = None;
483 let effect_id = EffectId::new(format!(
484 "effect.run_message.{}",
485 message.message_id.as_str()
486 ));
487
488 if status == MessageStatus::Accepted {
489 let mut intent = EffectIntent::new(
490 effect_id.clone(),
491 EffectKind::RunMessageDelivery,
492 EntityRef::message(message.message_id.clone()),
493 SourceRef::with_kind(SourceKind::Sdk, "source.sdk.agent_pool"),
494 "run message delivery intent",
495 );
496 intent.destination = Some(message.to.destination_ref.clone());
497 intent.policy_refs = message.policy_refs.clone();
498 intent.idempotency_key = Some(message.idempotency_key.clone());
499 intent.content_refs = vec![message.content_ref.clone()];
500 effect_intent = Some(intent);
501 }
502
503 if status.is_terminal_delivery() {
504 effect_result = Some(EffectResult {
505 effect_id,
506 terminal_status: status.effect_terminal_status(),
507 external_operation_id: None,
508 reconciliation_ref: None,
509 error_ref: None,
510 content_refs: vec![message.content_ref.clone()],
511 redacted_summary: status.redacted_summary().to_string(),
512 });
513 }
514
515 let record = RunMessageRecord {
516 message_id: message.message_id.clone(),
517 source_run_id: message.from.clone(),
518 address_target: message.to.target.clone().into(),
519 content_ref: message.content_ref.clone(),
520 correlation: message.correlation.clone(),
521 reply_to: message.reply_to.clone(),
522 delivery_status: status.clone().into(),
523 delivered_to: delivered_to.clone(),
524 policy_refs: message.policy_refs.clone(),
525 idempotency_key: message.idempotency_key.clone(),
526 effect_intent,
527 effect_result,
528 };
529
530 self.journal_record(
531 message.from.clone(),
532 member.agent_id,
533 JournalRecordKind::RunMessage,
534 "agent_pool",
535 status.event_kind().wire_name(),
536 EntityRef::message(message.message_id.clone()),
537 message.target_related_refs(&delivered_to),
538 message.policy_refs.clone(),
539 vec![message.content_ref.clone()],
540 Some(message.idempotency_key.clone()),
541 JournalRecordPayload::RunMessage(record),
542 )
543 }
544
545 #[expect(
546 clippy::too_many_arguments,
547 reason = "journal-backed pool records intentionally spell out lineage, refs, and payload until a dedicated record-builder API replaces this private helper"
548 )]
549 fn journal_record(
550 &self,
551 run_id: RunId,
552 agent_id: AgentId,
553 record_kind: JournalRecordKind,
554 event_family: impl Into<String>,
555 event_kind: impl Into<String>,
556 subject_ref: EntityRef,
557 related_refs: Vec<EntityRef>,
558 _policy_refs: Vec<PolicyRef>,
559 content_refs: Vec<ContentRef>,
560 idempotency_key: Option<IdempotencyKey>,
561 payload: JournalRecordPayload,
562 ) -> Result<JournalRecord, AgentError> {
563 let journal_seq = self.runtime.next_journal_seq();
564 let source = SourceRef::with_kind(SourceKind::Sdk, "source.sdk.agent_pool");
565 let fingerprint = self
566 .runtime
567 .run_snapshot(&run_id)
568 .map(|snapshot| snapshot.runtime_package_fingerprint.as_str().to_string())
569 .unwrap_or_else(|_| "runtime.package.fingerprint.agent_pool".to_string());
570 let event_family = event_family.into();
571 let event_kind = event_kind.into();
572
573 Ok(JournalRecord {
574 journal_schema_version: JOURNAL_SCHEMA_VERSION,
575 journal_seq,
576 record_id: format!("journal.record.agent_pool.{journal_seq}"),
577 record_kind,
578 run_id: run_id.clone(),
579 agent_id: agent_id.clone(),
580 turn_id: None,
581 attempt_id: None,
582 subject_ref: subject_ref.clone(),
583 related_refs: related_refs.clone(),
584 causal_refs: Vec::new(),
585 source: source.clone(),
586 destination: Some(DestinationRef::with_kind(
587 DestinationKind::Journal,
588 "destination.journal.agent_pool",
589 )),
590 correlation_keys: Vec::new(),
591 tags: vec!["feature:agent_pool".to_string()],
592 delivery_semantics: "journal_backed".to_string(),
593 event_index: EventIndexProjection {
594 run_id,
595 agent_id,
596 turn_id: None,
597 event_family,
598 event_kind,
599 source,
600 destination: Some(DestinationRef::with_kind(
601 DestinationKind::EventStream,
602 "destination.event_stream.agent_pool",
603 )),
604 subject_ref,
605 related_refs,
606 correlation_keys: Vec::new(),
607 tags: vec!["feature:agent_pool".to_string()],
608 privacy_class: PrivacyClass::ContentRefsOnly,
609 delivery_semantics: "journal_backed".to_string(),
610 },
611 timestamp_millis: journal_seq,
612 runtime_package_fingerprint: fingerprint,
613 privacy: PrivacyClass::ContentRefsOnly,
614 content_refs,
615 redaction_policy_id: "redaction.agent_pool.default".to_string(),
616 idempotency_key,
617 dedupe_key: None,
618 checkpoint_ref: None,
619 payload,
620 })
621 }
622
623 #[expect(
624 clippy::too_many_arguments,
625 reason = "event publication mirrors the durable event envelope fields so lineage stays explicit at the call site"
626 )]
627 fn publish_agent_pool_event(
628 &self,
629 run_id: RunId,
630 agent_id: AgentId,
631 event_kind: EventKind,
632 message_id: Option<MessageId>,
633 wake_condition_id: Option<WakeConditionId>,
634 subject_ref: EntityRef,
635 mut related_refs: Vec<EntityRef>,
636 destination: Option<DestinationRef>,
637 policy_refs: Vec<PolicyRef>,
638 journal_cursor: Option<JournalCursor>,
639 summary: impl Into<String>,
640 ) -> Result<EventFrame, AgentError> {
641 if let Some(condition_id) = wake_condition_id {
642 related_refs.push(EntityRef::wake_condition(condition_id));
643 }
644 let event_counter = self.store.next_event_sequence(&self.pool_id)?;
645 let fingerprint = self
646 .runtime
647 .run_snapshot(&run_id)
648 .map(|snapshot| snapshot.runtime_package_fingerprint.as_str().to_string())
649 .unwrap_or_else(|_| "runtime.package.fingerprint.agent_pool".to_string());
650 let event = AgentEvent::with_redacted_summary(
651 EventEnvelope {
652 schema_version: EVENT_SCHEMA_VERSION,
653 event_id: EventId::new(format!(
654 "event.agent_pool.{}.{}",
655 self.pool_id.as_str(),
656 event_counter
657 )),
658 event_seq: 0,
659 event_family: EventFamily::AgentPool,
660 event_kind,
661 payload_schema_version: 1,
662 timestamp: format!("1970-01-01T00:00:{event_counter:02}Z"),
663 recorded_at: format!("1970-01-01T00:00:{event_counter:02}Z"),
664 run_id,
665 agent_id,
666 turn_id: None,
667 attempt_id: None,
668 message_id,
669 context_item_id: None,
670 trace_id: TraceId::new(format!("trace.agent_pool.{}", self.pool_id.as_str())),
671 span_id: SpanId::new(format!(
672 "span.agent_pool.{}.{}",
673 self.pool_id.as_str(),
674 event_counter
675 )),
676 parent_event_id: None,
677 caused_by: None,
678 subject_ref,
679 related_refs,
680 causal_refs: Vec::new(),
681 correlation: EventCorrelation::default(),
682 tags: vec![crate::event::EventTag::new("feature:agent_pool")],
683 source: SourceRef::with_kind(SourceKind::Sdk, "source.sdk.agent_pool"),
684 destination,
685 policy_refs,
686 journal_cursor,
687 state_before: None,
688 state_after: None,
689 delivery_semantics: EventDeliverySemantics::JournalBacked,
690 privacy: PrivacyClass::ContentRefsOnly,
691 content_capture: ContentCaptureMode::Off,
692 redaction_policy_id: "redaction.agent_pool.default".to_string(),
693 runtime_package_fingerprint: fingerprint,
694 },
695 summary,
696 );
697 let frame = EventFrame {
698 cursor: event.envelope.cursor(EventStreamScope::All),
699 event,
700 archive_cursor: None,
701 overflow: None,
702 };
703 self.runtime
704 .event_bus_port(&frame.event.envelope.run_id)?
705 .publish(frame.clone())?;
706 Ok(frame)
707 }
708
709 fn resolve_address(&self, message: &RunMessage) -> Vec<RunId> {
710 let Ok(snapshot) = self.snapshot() else {
711 return Vec::new();
712 };
713 let members = snapshot
714 .members
715 .iter()
716 .cloned()
717 .map(|member| (member.run_id.clone(), member))
718 .collect::<BTreeMap<_, _>>();
719 let topics = topics_from_members(&snapshot.members);
720
721 if !members.contains_key(&message.from) || !snapshot.message_policy.allows(message) {
722 return Vec::new();
723 }
724
725 let mut candidates = match &message.to.target {
726 RunAddressTarget::Run { run_id } => vec![run_id.clone()],
727 RunAddressTarget::Agent { agent_id } => members
728 .values()
729 .filter(|member| &member.agent_id == agent_id)
730 .map(|member| member.run_id.clone())
731 .collect::<Vec<_>>(),
732 RunAddressTarget::Topic { topic_id } => topics
733 .get(topic_id)
734 .map(|runs| runs.iter().cloned().collect::<Vec<_>>())
735 .unwrap_or_default(),
736 RunAddressTarget::Pool { pool_id } if pool_id == &self.pool_id => {
737 members.keys().cloned().collect::<Vec<_>>()
738 }
739 RunAddressTarget::Pool { .. } => Vec::new(),
740 };
741
742 candidates.retain(|run_id| {
743 members
744 .get(run_id)
745 .is_some_and(|member| member.allows_message_policies(&message.policy_refs))
746 });
747
748 if matches!(message.to.target, RunAddressTarget::Pool { .. })
749 && !snapshot.message_policy.include_sender_in_pool_broadcast
750 {
751 candidates.retain(|run_id| run_id != &message.from);
752 }
753
754 candidates.sort();
755 candidates.dedup();
756 candidates
757 }
758
759 fn observable_member_runs(&self) -> Vec<RunId> {
760 self.snapshot()
761 .map(|snapshot| {
762 snapshot
763 .members
764 .iter()
765 .filter(|member| member.allows_message_policies(&snapshot.policy_refs))
766 .map(|member| member.run_id.clone())
767 .collect()
768 })
769 .unwrap_or_default()
770 }
771
772 fn member(&self, run_id: &RunId) -> Result<AgentPoolMember, AgentError> {
773 self.snapshot()?
774 .members
775 .into_iter()
776 .find(|member| &member.run_id == run_id)
777 .ok_or_else(|| {
778 AgentError::new(
779 AgentErrorKind::InvalidStateTransition,
780 RetryClassification::NotRetryable,
781 "run is not a member of this agent pool",
782 )
783 })
784 }
785
786 pub fn snapshot(&self) -> Result<AgentPoolSnapshot, AgentError> {
790 self.store.snapshot(&self.pool_id)
791 }
792
793 fn trigger_matching_wakes(&self, frame: &EventFrame) -> Result<(), AgentError> {
794 if matches!(
795 frame.event.envelope.event_kind,
796 EventKind::WakeConditionRegistered
797 | EventKind::WakeConditionTriggered
798 | EventKind::WakeConditionTimedOut
799 | EventKind::WakeConditionCancelled
800 | EventKind::WakeConditionFailed
801 ) {
802 return Ok(());
803 }
804
805 let wakes = self.snapshot()?.wakes;
806 for wake in wakes
807 .into_iter()
808 .filter(|wake| wake.registration.status == WakeRegistrationStatus::Registered)
809 {
810 if wake.compiled_filter.matches_envelope(&frame.event.envelope) {
811 self.record_wake_status(
812 &wake.condition,
813 wake.compiled_filter,
814 WakeRegistrationStatus::Triggered,
815 Some(frame.event.envelope.event_id.clone()),
816 )?;
817 }
818 }
819 Ok(())
820 }
821
822 pub fn watch_pool(
825 &self,
826 cursor: Option<AgentPoolStoreCursor>,
827 ) -> Result<AgentPoolStoreStream, AgentError> {
828 self.store.watch(&self.pool_id, cursor)
829 }
830}
831
832#[derive(Clone)]
833pub struct AgentPoolBuilder {
836 pool_id: AgentPoolId,
837 runtime: Option<AgentRuntime>,
838 message_policy: AgentPoolMessagePolicy,
839 wake_policy: AgentPoolWakePolicy,
840 policy_refs: Vec<PolicyRef>,
841 store: Option<Arc<dyn AgentPoolStore>>,
842}
843
844impl AgentPoolBuilder {
845 pub fn runtime(mut self, runtime: AgentRuntime) -> Self {
849 self.runtime = Some(runtime);
850 self
851 }
852
853 pub fn message_policy(mut self, policy: AgentPoolMessagePolicy) -> Self {
856 self.message_policy = policy;
857 self
858 }
859
860 pub fn wake_policy(mut self, policy: AgentPoolWakePolicy) -> Self {
863 self.wake_policy = policy;
864 self
865 }
866
867 pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
870 self.policy_refs.push(policy_ref);
871 self
872 }
873
874 pub fn store<S>(mut self, store: S) -> Self
878 where
879 S: AgentPoolStore + 'static,
880 {
881 self.store = Some(Arc::new(store));
882 self
883 }
884
885 pub fn shared_store(mut self, store: Arc<dyn AgentPoolStore>) -> Self {
889 self.store = Some(store);
890 self
891 }
892
893 pub fn build(self) -> Result<AgentPool, AgentError> {
897 let runtime = self
898 .runtime
899 .ok_or_else(|| AgentError::host_configuration_needed("agent pool requires runtime"))?;
900 let store = self
901 .store
902 .unwrap_or_else(|| Arc::new(InMemoryAgentPoolStore::default()));
903 store.open_pool(
904 self.pool_id.clone(),
905 AgentPoolStoreConfig {
906 message_policy: self.message_policy,
907 wake_policy: self.wake_policy,
908 policy_refs: self.policy_refs,
909 },
910 )?;
911 Ok(AgentPool {
912 pool_id: self.pool_id,
913 runtime,
914 store,
915 })
916 }
917}
918
/// Configuration handed to [`AgentPoolStore::open_pool`] by `AgentPoolBuilder::build`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolStoreConfig {
    /// Message admission and broadcast rules for the pool.
    pub message_policy: AgentPoolMessagePolicy,
    /// Observation rules for scoped filters and wakes.
    pub wake_policy: AgentPoolWakePolicy,
    /// Pool-level policy refs; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub policy_refs: Vec<PolicyRef>,
}
932
/// Position in a pool's store record stream, as carried by [`AgentPoolStoreRecord`]s and
/// accepted by `AgentPool::watch_pool`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolStoreCursor {
    /// Store sequence number; `AgentPoolStoreCursor::start` uses `0`.
    pub sequence: u64,
}
941
942impl AgentPoolStoreCursor {
943 pub fn start() -> Self {
945 Self { sequence: 0 }
946 }
947
948 pub fn new(sequence: u64) -> Self {
950 Self { sequence }
951 }
952}
953
/// Point-in-time view of a pool's state as returned by [`AgentPoolStore::snapshot`].
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolSnapshot {
    /// Pool this snapshot describes.
    pub pool_id: AgentPoolId,
    /// Whether pool creation has been recorded; `AgentPool::join_run` emits the
    /// `AgentPoolCreated` record only while this is `false`.
    pub created: bool,
    /// Current members of the pool.
    pub members: Vec<AgentPoolMember>,
    /// Pool-level topics copied into lifecycle journal records. Topic routing in
    /// `resolve_address` is derived from the members' own topics, not from this field.
    pub topics: Vec<TopicId>,
    /// Message admission and broadcast rules.
    pub message_policy: AgentPoolMessagePolicy,
    /// Observation rules for scoped filters and wakes.
    pub wake_policy: AgentPoolWakePolicy,
    /// Pool-level policy refs; members must hold all of them to be observable.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub policy_refs: Vec<PolicyRef>,
    /// Recorded messages with their receipts.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub messages: Vec<AgentPoolStoredMessage>,
    /// Recorded wake conditions with their compiled filters and registrations.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub wakes: Vec<AgentPoolStoredWake>,
    /// NOTE(review): presumably the store cursor of the latest record reflected in this
    /// snapshot; confirm against the store implementations.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<AgentPoolStoreCursor>,
}
984
/// A run message paired with a receipt, as passed to [`AgentPoolStore::record_message`].
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolStoredMessage {
    /// The message as sent.
    pub message: RunMessage,
    /// Receipt recorded for the message at the status it was stored with.
    pub receipt: MessageReceipt,
}
993
/// A wake condition together with its compiled, pool-scoped filter and latest registration.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolStoredWake {
    /// The condition as registered by `AgentPool::suspend_until`.
    pub condition: WakeCondition,
    /// The condition's filter after pool scoping and compilation; matched against events.
    pub compiled_filter: CompiledEventFilter,
    /// Current registration, including its status and journal cursor.
    pub registration: WakeRegistration,
}
1005
/// One entry in a pool's store record stream, as yielded by [`AgentPoolStoreStream`].
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolStoreRecord {
    /// Pool the record belongs to.
    pub pool_id: AgentPoolId,
    /// Position of this record in the pool's stream.
    pub cursor: AgentPoolStoreCursor,
    /// What changed.
    pub payload: AgentPoolStoreRecordPayload,
}
1018
/// Kind of state change captured by an [`AgentPoolStoreRecord`].
///
/// Serialized as an internally tagged object whose `type` field holds the snake_case
/// variant name.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[expect(
    clippy::large_enum_variant,
    reason = "pool store payloads are durable serde records; preserve direct variant ergonomics until a separate storage-envelope redesign"
)]
pub enum AgentPoolStoreRecordPayload {
    /// Presumably written by `AgentPoolStore::open_pool` with the configuration it received.
    PoolOpened {
        config: AgentPoolStoreConfig,
    },
    /// Presumably written by `AgentPoolStore::record_pool_created`.
    PoolCreated,
    /// A run joined the pool.
    MemberJoined {
        member: AgentPoolMember,
    },
    /// A run left the pool.
    MemberLeft {
        member: AgentPoolMember,
    },
    /// A run message was recorded at some delivery status.
    RunMessage {
        stored: AgentPoolStoredMessage,
    },
    /// A wake condition was recorded at some registration status.
    Wake {
        stored: AgentPoolStoredWake,
    },
}
1055
1056#[derive(Clone, Debug)]
1057pub struct AgentPoolStoreStream {
1059 records: VecDeque<AgentPoolStoreRecord>,
1060}
1061
1062impl AgentPoolStoreStream {
1063 pub fn new(records: impl IntoIterator<Item = AgentPoolStoreRecord>) -> Self {
1065 Self {
1066 records: records.into_iter().collect(),
1067 }
1068 }
1069}
1070
1071impl Iterator for AgentPoolStoreStream {
1072 type Item = AgentPoolStoreRecord;
1073
1074 fn next(&mut self) -> Option<Self::Item> {
1075 self.records.pop_front()
1076 }
1077}
1078
/// Persistence backend for pool state: configuration, membership, messages, and wakes.
///
/// [`AgentPool`] holds the store as an `Arc<dyn AgentPoolStore>` and is `Clone`, so
/// implementations must be `Send + Sync`.
pub trait AgentPoolStore: Send + Sync {
    /// Opens `pool_id` with `config` and returns the resulting snapshot.
    ///
    /// Called by `AgentPoolBuilder::build`, which ignores the returned snapshot.
    /// NOTE(review): behavior when the pool already exists is implementation-defined.
    fn open_pool(
        &self,
        pool_id: AgentPoolId,
        config: AgentPoolStoreConfig,
    ) -> Result<AgentPoolSnapshot, AgentError>;

    /// Returns the current state of `pool_id`.
    fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError>;

    /// Marks the pool as created. `AgentPool::join_run` calls this on the first join,
    /// after appending the `AgentPoolCreated` journal record.
    fn record_pool_created(
        &self,
        pool_id: &AgentPoolId,
    ) -> Result<AgentPoolStoreCursor, AgentError>;

    /// Adds `member` to the pool.
    fn join_member(
        &self,
        pool_id: &AgentPoolId,
        member: AgentPoolMember,
    ) -> Result<AgentPoolStoreCursor, AgentError>;

    /// Removes `run_id` from the pool and returns the removed member with the record's
    /// cursor.
    fn leave_member(
        &self,
        pool_id: &AgentPoolId,
        run_id: &RunId,
    ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError>;

    /// Returns the stored receipt for `idempotency_key`, if any. `AgentPool::send`
    /// returns a `Some` receipt as-is instead of sending again.
    fn message_receipt(
        &self,
        pool_id: &AgentPoolId,
        idempotency_key: &IdempotencyKey,
    ) -> Result<Option<MessageReceipt>, AgentError>;

    /// Persists `message` with `receipt`; called once per recorded delivery status.
    fn record_message(
        &self,
        pool_id: &AgentPoolId,
        message: RunMessage,
        receipt: MessageReceipt,
    ) -> Result<AgentPoolStoreCursor, AgentError>;

    /// Returns the registration for a wake condition's `idempotency_key`, if any.
    /// `AgentPool::suspend_until` returns a `Some` registration as-is.
    fn wake_registration(
        &self,
        pool_id: &AgentPoolId,
        idempotency_key: &IdempotencyKey,
    ) -> Result<Option<WakeRegistration>, AgentError>;

    /// Returns the stored wake for `condition_id`, if any.
    fn wake(
        &self,
        pool_id: &AgentPoolId,
        condition_id: &WakeConditionId,
    ) -> Result<Option<AgentPoolStoredWake>, AgentError>;

    /// Persists a wake condition, its compiled filter, and its latest registration;
    /// called once per recorded wake status.
    fn record_wake(
        &self,
        pool_id: &AgentPoolId,
        condition: WakeCondition,
        compiled_filter: CompiledEventFilter,
        registration: WakeRegistration,
    ) -> Result<AgentPoolStoreCursor, AgentError>;

    /// Returns the pool's store records from `cursor`.
    /// NOTE(review): presumably `None` means "from the beginning"; confirm per store.
    fn watch(
        &self,
        pool_id: &AgentPoolId,
        cursor: Option<AgentPoolStoreCursor>,
    ) -> Result<AgentPoolStoreStream, AgentError>;

    /// Returns the next pool event sequence number. Event ids, span ids, and synthetic
    /// event timestamps are derived from it, so it presumably must not repeat within a pool.
    fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError>;
}
1162
/// A run participating in a pool.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolMember {
    /// The member run.
    pub run_id: RunId,
    /// Agent the run executes; used for `RunAddressTarget::Agent` routing.
    pub agent_id: AgentId,
    /// Policy refs the member holds; a message reaches the member only if the member
    /// holds all of the message's policy refs.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub policy_refs: Vec<PolicyRef>,
    /// Topics the member subscribes to; used to build topic routing.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub topics: Vec<TopicId>,
}
1181
1182impl AgentPoolMember {
1183 pub fn new(run_id: RunId, agent_id: AgentId) -> Self {
1187 Self {
1188 run_id,
1189 agent_id,
1190 policy_refs: Vec::new(),
1191 topics: Vec::new(),
1192 }
1193 }
1194
1195 pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
1198 self.policy_refs.push(policy_ref);
1199 self
1200 }
1201
1202 pub fn topic(mut self, topic_id: TopicId) -> Self {
1206 self.topics.push(topic_id);
1207 self
1208 }
1209
1210 fn allows_message_policies(&self, required: &[PolicyRef]) -> bool {
1211 required
1212 .iter()
1213 .all(|required| self.policy_refs.contains(required))
1214 }
1215}
1216
/// Pool-wide rules for admitting and routing run messages.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolMessagePolicy {
    /// Policy refs every message must carry to be routed at all.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub required_policy_refs: Vec<PolicyRef>,
    /// Whether a `Pool`-addressed broadcast is also delivered back to its sender.
    pub include_sender_in_pool_broadcast: bool,
}
1230
1231impl AgentPoolMessagePolicy {
1232 pub fn bounded_defaults() -> Self {
1235 Self {
1236 required_policy_refs: Vec::new(),
1237 include_sender_in_pool_broadcast: false,
1238 }
1239 }
1240
1241 fn allows(&self, message: &RunMessage) -> bool {
1242 self.required_policy_refs
1243 .iter()
1244 .all(|required| message.policy_refs.contains(required))
1245 }
1246}
1247
/// Pool-wide rules for what wakes and scoped subscriptions may observe.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolWakePolicy {
    /// When `true`, `AgentPool::scope_filter` forces `PayloadAccessMode::EnvelopeOnly`.
    pub envelope_only: bool,
}
1256
1257impl AgentPoolWakePolicy {
1258 pub fn safe_defaults() -> Self {
1262 Self {
1263 envelope_only: true,
1264 }
1265 }
1266}
1267
/// Where a run message is sent: a routing target plus the destination and related refs
/// recorded alongside it.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct RunAddress {
    /// Routing target resolved against pool membership.
    pub target: RunAddressTarget,
    /// Destination recorded on message events and effect intents.
    pub destination_ref: DestinationRef,
    /// Entity refs describing the target; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub related_refs: Vec<EntityRef>,
}
1282
1283impl RunAddress {
1284 pub fn run(run_id: RunId) -> Self {
1287 Self {
1288 destination_ref: DestinationRef::with_kind(DestinationKind::Agent, run_id.as_str()),
1289 related_refs: vec![EntityRef::run(run_id.clone())],
1290 target: RunAddressTarget::Run { run_id },
1291 }
1292 }
1293
1294 pub fn agent(agent_id: AgentId) -> Self {
1298 Self {
1299 destination_ref: DestinationRef::with_kind(DestinationKind::Agent, agent_id.as_str()),
1300 related_refs: vec![EntityRef::agent(agent_id.clone())],
1301 target: RunAddressTarget::Agent { agent_id },
1302 }
1303 }
1304
1305 pub fn topic(topic_id: TopicId) -> Self {
1309 Self {
1310 destination_ref: DestinationRef::with_kind(DestinationKind::Topic, topic_id.as_str()),
1311 related_refs: vec![EntityRef::topic(topic_id.clone())],
1312 target: RunAddressTarget::Topic { topic_id },
1313 }
1314 }
1315
1316 pub fn pool(pool_id: AgentPoolId) -> Self {
1319 Self {
1320 destination_ref: DestinationRef::with_kind(
1321 DestinationKind::AgentPool,
1322 pool_id.as_str(),
1323 ),
1324 related_refs: vec![EntityRef::agent_pool(pool_id.clone())],
1325 target: RunAddressTarget::Pool { pool_id },
1326 }
1327 }
1328}
1329
/// The kind of recipient a `RunAddress` points at.
///
/// Serialized as an internally tagged object: a `type` field holding the snake_case variant
/// name (`"run"`, `"agent"`, `"topic"`, `"pool"`) next to the variant's id field.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RunAddressTarget {
    /// One specific run.
    Run {
        /// The addressed run.
        run_id: RunId,
    },
    /// An agent, identified by agent id.
    Agent {
        /// The addressed agent.
        agent_id: AgentId,
    },
    /// A topic.
    Topic {
        /// The addressed topic.
        topic_id: TopicId,
    },
    /// An entire agent pool.
    Pool {
        /// The addressed pool.
        pool_id: AgentPoolId,
    },
}
1357
1358impl RunAddressTarget {
1359 pub fn run_id(&self) -> Option<&RunId> {
1362 match self {
1363 Self::Run { run_id } => Some(run_id),
1364 _ => None,
1365 }
1366 }
1367}
1368
/// A message sent from one run to a `RunAddress` (a run, agent, topic or pool).
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct RunMessage {
    /// Identifier of this message; the in-memory store keys stored messages by it.
    pub message_id: MessageId,
    /// Run that sent the message.
    pub from: RunId,
    /// Where the message is addressed.
    pub to: RunAddress,
    /// Reference to the message content.
    pub content_ref: ContentRef,
    /// Event correlation metadata; `RunMessage::new` starts from `EventCorrelation::default()`.
    pub correlation: EventCorrelation,
    /// Message this one replies to, if any. Omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply_to: Option<MessageId>,
    /// Responses the sender expects, if any. Omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub response_contract: Option<MessageResponseContract>,
    /// Expiry time in milliseconds, if any. Omitted from output when `None`.
    /// NOTE(review): presumably an absolute (epoch) timestamp — confirm where expiry is
    /// enforced.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expires_at_millis: Option<u64>,
    /// Deduplication key; the in-memory store keeps one receipt per key.
    pub idempotency_key: IdempotencyKey,
    /// Policy references carried by the message, checked against the pool's
    /// `required_policy_refs`. Defaults to empty on deserialize and is omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub policy_refs: Vec<PolicyRef>,
}
1405
1406impl RunMessage {
1407 pub fn new(
1411 message_id: MessageId,
1412 from: RunId,
1413 to: RunAddress,
1414 content_ref: ContentRef,
1415 idempotency_key: IdempotencyKey,
1416 ) -> Self {
1417 Self {
1418 message_id,
1419 from,
1420 to,
1421 content_ref,
1422 correlation: EventCorrelation::default(),
1423 reply_to: None,
1424 response_contract: None,
1425 expires_at_millis: None,
1426 idempotency_key,
1427 policy_refs: Vec::new(),
1428 }
1429 }
1430
1431 pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
1434 self.policy_refs.push(policy_ref);
1435 self
1436 }
1437
1438 fn target_related_refs(&self, delivered_to: &[RunId]) -> Vec<EntityRef> {
1439 let mut refs = self.to.related_refs.clone();
1440 refs.extend(delivered_to.iter().cloned().map(EntityRef::run));
1441 refs.sort_by(|left, right| {
1442 left.kind
1443 .cmp(&right.kind)
1444 .then_with(|| left.as_str().cmp(right.as_str()))
1445 });
1446 refs.dedup_by(|left, right| left.kind == right.kind && left.as_str() == right.as_str());
1447 refs
1448 }
1449}
1450
/// Describes the responses a message sender expects.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct MessageResponseContract {
    /// Number of responses expected.
    pub expected_responses: u32,
    /// Optional response timeout in milliseconds. Omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_millis: Option<u64>,
}
1462
1463impl MessageResponseContract {
1464 pub fn one_response(timeout_millis: u64) -> Self {
1467 Self {
1468 expected_responses: 1,
1469 timeout_millis: Some(timeout_millis),
1470 }
1471 }
1472}
1473
/// Outcome returned for a sent message; the in-memory store deduplicates these by the
/// message's idempotency key.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct MessageReceipt {
    /// The message this receipt describes.
    pub message_id: MessageId,
    /// Current status of the message.
    pub status: MessageStatus,
    /// Runs the message reached. Defaults to empty on deserialize and is omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub delivered_to: Vec<RunId>,
    /// Journal position associated with the receipt, if any. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub journal_cursor: Option<JournalCursor>,
}
1493
/// Lifecycle status of a run message, serialized in snake_case (e.g. `"timed_out"`).
///
/// `Accepted` is the only non-terminal status (see `is_terminal_delivery`).
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum MessageStatus {
    /// Accepted by the pool; not yet in a terminal state.
    Accepted,
    /// Delivered to its recipients.
    Delivered,
    /// A response was received.
    Responded,
    /// Delivery failed.
    Failed,
    /// No response arrived in time.
    TimedOut,
    /// The message expired.
    Expired,
    /// The message was cancelled.
    Cancelled,
}
1514
1515impl MessageStatus {
1516 fn event_kind(&self) -> EventKind {
1517 match self {
1518 Self::Accepted => EventKind::RunMessageAccepted,
1519 Self::Delivered => EventKind::RunMessageDelivered,
1520 Self::Responded => EventKind::RunMessageResponded,
1521 Self::Failed => EventKind::RunMessageFailed,
1522 Self::TimedOut => EventKind::RunMessageTimedOut,
1523 Self::Expired => EventKind::RunMessageExpired,
1524 Self::Cancelled => EventKind::RunMessageCancelled,
1525 }
1526 }
1527
1528 fn redacted_summary(&self) -> &'static str {
1529 match self {
1530 Self::Accepted => "run message accepted",
1531 Self::Delivered => "run message delivered",
1532 Self::Responded => "run message responded",
1533 Self::Failed => "run message failed",
1534 Self::TimedOut => "run message timed out",
1535 Self::Expired => "run message expired",
1536 Self::Cancelled => "run message cancelled",
1537 }
1538 }
1539
1540 fn is_terminal_delivery(&self) -> bool {
1541 matches!(
1542 self,
1543 Self::Delivered
1544 | Self::Responded
1545 | Self::Failed
1546 | Self::TimedOut
1547 | Self::Expired
1548 | Self::Cancelled
1549 )
1550 }
1551
1552 fn effect_terminal_status(&self) -> EffectTerminalStatus {
1553 match self {
1554 Self::Delivered | Self::Responded => EffectTerminalStatus::Completed,
1555 Self::TimedOut => EffectTerminalStatus::TimedOut,
1556 Self::Cancelled => EffectTerminalStatus::Cancelled,
1557 Self::Accepted => EffectTerminalStatus::Unknown,
1558 Self::Failed | Self::Expired => EffectTerminalStatus::Failed,
1559 }
1560 }
1561}
1562
/// A request to wake a run when an event matching `filter` occurs.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct WakeCondition {
    /// Identifier of the condition; the in-memory store keys stored wakes by it.
    pub condition_id: WakeConditionId,
    /// Run to wake.
    pub run_id: RunId,
    /// Event filter to match. `compile_envelope_filter` compiles it with payload access
    /// forced to `EnvelopeOnly`.
    pub filter: EventFilter,
    /// Optional timeout in milliseconds. Omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_millis: Option<u64>,
    /// What input the woken run receives; `WakeCondition::new` uses `MatchingEventRefs`.
    pub resume_with: ResumeInputPolicy,
    /// Deduplication key; the in-memory store keeps one registration per key.
    pub idempotency_key: IdempotencyKey,
    /// Policy references attached to the condition. Defaults to empty on deserialize and is
    /// omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub policy_refs: Vec<PolicyRef>,
}
1587
1588impl WakeCondition {
1589 pub fn new(
1593 condition_id: WakeConditionId,
1594 run_id: RunId,
1595 filter: EventFilter,
1596 idempotency_key: IdempotencyKey,
1597 ) -> Self {
1598 Self {
1599 condition_id,
1600 run_id,
1601 filter,
1602 timeout_millis: None,
1603 resume_with: ResumeInputPolicy::MatchingEventRefs,
1604 idempotency_key,
1605 policy_refs: Vec::new(),
1606 }
1607 }
1608
1609 pub fn timeout_millis(mut self, timeout_millis: u64) -> Self {
1613 self.timeout_millis = Some(timeout_millis);
1614 self
1615 }
1616
1617 pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
1620 self.policy_refs.push(policy_ref);
1621 self
1622 }
1623
1624 pub fn compile_envelope_filter(&self) -> Result<CompiledEventFilter, AgentError> {
1627 let mut filter = self.filter.clone();
1628 filter.payload_access = PayloadAccessMode::EnvelopeOnly;
1629 filter.compile()
1630 }
1631}
1632
/// What input a run receives when a wake condition resumes it; serialized in snake_case.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ResumeInputPolicy {
    /// References to the matching event(s); the default used by `WakeCondition::new`.
    MatchingEventRefs,
    /// A redacted summary only.
    RedactedSummary,
    /// No resume input.
    None,
}
1645
/// Registration outcome for a wake condition; the in-memory store deduplicates these by the
/// condition's idempotency key.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct WakeRegistration {
    /// The registered condition.
    pub condition_id: WakeConditionId,
    /// Run the condition belongs to.
    pub run_id: RunId,
    /// Current status of the registration.
    pub status: WakeRegistrationStatus,
    /// Journal position associated with the registration, if any. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub journal_cursor: Option<JournalCursor>,
}
1661
/// Lifecycle status of a wake condition; serialized in snake_case (e.g. `"timed_out"`).
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum WakeRegistrationStatus {
    /// Registered and waiting for a matching event.
    Registered,
    /// A matching event triggered the wake.
    Triggered,
    /// The timeout elapsed first.
    TimedOut,
    /// The condition was cancelled.
    Cancelled,
    /// The condition failed.
    Failed,
}
1678
1679impl WakeRegistrationStatus {
1680 fn event_kind(&self) -> EventKind {
1681 match self {
1682 Self::Registered => EventKind::WakeConditionRegistered,
1683 Self::Triggered => EventKind::WakeConditionTriggered,
1684 Self::TimedOut => EventKind::WakeConditionTimedOut,
1685 Self::Cancelled => EventKind::WakeConditionCancelled,
1686 Self::Failed => EventKind::WakeConditionFailed,
1687 }
1688 }
1689
1690 fn redacted_summary(&self) -> &'static str {
1691 match self {
1692 Self::Registered => "wake condition registered",
1693 Self::Triggered => "wake condition triggered",
1694 Self::TimedOut => "wake condition timed out",
1695 Self::Cancelled => "wake condition cancelled",
1696 Self::Failed => "wake condition failed",
1697 }
1698 }
1699}
1700
/// Mutable per-pool state held by `InMemoryAgentPoolStore`.
#[derive(Clone, Debug)]
struct AgentPoolState {
    /// Set once `record_pool_created` has run for the pool.
    created: bool,
    /// Current members keyed by run id.
    members: BTreeMap<RunId, AgentPoolMember>,
    /// Topic index: for each topic, the member runs subscribed to it. Topics whose
    /// subscriber set becomes empty are removed by `remove_member`.
    topics: BTreeMap<TopicId, BTreeSet<RunId>>,
    /// Message policy the pool was opened with.
    message_policy: AgentPoolMessagePolicy,
    /// Wake policy the pool was opened with.
    wake_policy: AgentPoolWakePolicy,
    /// Pool-level policy references from the opening config.
    policy_refs: Vec<PolicyRef>,
    /// Recorded messages keyed by message id.
    messages: BTreeMap<MessageId, AgentPoolStoredMessage>,
    /// Message receipts keyed by idempotency key, used for deduplication.
    message_dedupe: BTreeMap<IdempotencyKey, MessageReceipt>,
    /// Wake registrations keyed by idempotency key, used for deduplication.
    wake_dedupe: BTreeMap<IdempotencyKey, WakeRegistration>,
    /// Recorded wake conditions keyed by condition id.
    wakes: BTreeMap<WakeConditionId, AgentPoolStoredWake>,
    /// Last event sequence handed out by `next_event_sequence`; starts at 0, so the first
    /// sequence returned is 1.
    next_event_counter: u64,
}
1715
1716impl AgentPoolState {
1717 fn new(config: AgentPoolStoreConfig) -> Self {
1718 Self {
1719 created: false,
1720 members: BTreeMap::new(),
1721 topics: BTreeMap::new(),
1722 message_policy: config.message_policy,
1723 wake_policy: config.wake_policy,
1724 policy_refs: config.policy_refs,
1725 messages: BTreeMap::new(),
1726 message_dedupe: BTreeMap::new(),
1727 wake_dedupe: BTreeMap::new(),
1728 wakes: BTreeMap::new(),
1729 next_event_counter: 0,
1730 }
1731 }
1732
1733 fn config(&self) -> AgentPoolStoreConfig {
1734 AgentPoolStoreConfig {
1735 message_policy: self.message_policy.clone(),
1736 wake_policy: self.wake_policy.clone(),
1737 policy_refs: self.policy_refs.clone(),
1738 }
1739 }
1740
1741 fn snapshot(
1742 &self,
1743 pool_id: AgentPoolId,
1744 cursor: Option<AgentPoolStoreCursor>,
1745 ) -> AgentPoolSnapshot {
1746 AgentPoolSnapshot {
1747 pool_id,
1748 created: self.created,
1749 members: self.members.values().cloned().collect(),
1750 topics: self.topics.keys().cloned().collect(),
1751 message_policy: self.message_policy.clone(),
1752 wake_policy: self.wake_policy.clone(),
1753 policy_refs: self.policy_refs.clone(),
1754 messages: self.messages.values().cloned().collect(),
1755 wakes: self.wakes.values().cloned().collect(),
1756 cursor,
1757 }
1758 }
1759
1760 fn index_member(&mut self, member: AgentPoolMember) {
1761 for topic in &member.topics {
1762 self.topics
1763 .entry(topic.clone())
1764 .or_default()
1765 .insert(member.run_id.clone());
1766 }
1767 self.members.insert(member.run_id.clone(), member);
1768 }
1769
1770 fn remove_member(&mut self, run_id: &RunId) -> Result<AgentPoolMember, AgentError> {
1771 let member = self.members.remove(run_id).ok_or_else(|| {
1772 AgentError::new(
1773 AgentErrorKind::InvalidStateTransition,
1774 RetryClassification::NotRetryable,
1775 "run is not a member of this agent pool",
1776 )
1777 })?;
1778 for topic in &member.topics {
1779 let remove_topic = if let Some(runs) = self.topics.get_mut(topic) {
1780 runs.remove(run_id);
1781 runs.is_empty()
1782 } else {
1783 false
1784 };
1785 if remove_topic {
1786 self.topics.remove(topic);
1787 }
1788 }
1789 Ok(member)
1790 }
1791}
1792
/// `AgentPoolStore` kept entirely in process memory.
///
/// Clones share the same underlying maps through `Arc<Mutex<..>>`.
#[derive(Clone, Debug, Default)]
pub struct InMemoryAgentPoolStore {
    /// Live state for every opened pool.
    pools: Arc<Mutex<BTreeMap<AgentPoolId, AgentPoolState>>>,
    /// Append-only record log per pool; record `i` (0-based) carries cursor sequence `i + 1`.
    records: Arc<Mutex<BTreeMap<AgentPoolId, Vec<AgentPoolStoreRecord>>>>,
}
1802
1803impl InMemoryAgentPoolStore {
1804 fn with_pool_state<T>(
1805 &self,
1806 pool_id: &AgentPoolId,
1807 f: impl FnOnce(&mut AgentPoolState) -> Result<T, AgentError>,
1808 ) -> Result<T, AgentError> {
1809 let mut pools = self
1810 .pools
1811 .lock()
1812 .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1813 let state = pools.get_mut(pool_id).ok_or_else(|| {
1814 AgentError::new(
1815 AgentErrorKind::HostConfigurationNeeded,
1816 RetryClassification::HostConfigurationNeeded,
1817 "agent pool store has not opened this pool",
1818 )
1819 })?;
1820 f(state)
1821 }
1822
1823 fn append_record(
1824 &self,
1825 pool_id: &AgentPoolId,
1826 payload: AgentPoolStoreRecordPayload,
1827 ) -> Result<AgentPoolStoreCursor, AgentError> {
1828 let mut records = self
1829 .records
1830 .lock()
1831 .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1832 let entries = records.entry(pool_id.clone()).or_default();
1833 let cursor = AgentPoolStoreCursor::new(entries.len() as u64 + 1);
1834 entries.push(AgentPoolStoreRecord {
1835 pool_id: pool_id.clone(),
1836 cursor: cursor.clone(),
1837 payload,
1838 });
1839 Ok(cursor)
1840 }
1841
1842 fn latest_cursor(
1843 &self,
1844 pool_id: &AgentPoolId,
1845 ) -> Result<Option<AgentPoolStoreCursor>, AgentError> {
1846 let records = self
1847 .records
1848 .lock()
1849 .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1850 Ok(records
1851 .get(pool_id)
1852 .and_then(|records| records.last().map(|record| record.cursor.clone())))
1853 }
1854}
1855
1856impl AgentPoolStore for InMemoryAgentPoolStore {
1857 fn open_pool(
1858 &self,
1859 pool_id: AgentPoolId,
1860 config: AgentPoolStoreConfig,
1861 ) -> Result<AgentPoolSnapshot, AgentError> {
1862 {
1863 let mut pools = self
1864 .pools
1865 .lock()
1866 .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1867 if let Some(existing) = pools.get(&pool_id) {
1868 if existing.config() != config {
1869 return Err(AgentError::new(
1870 AgentErrorKind::InvalidStateTransition,
1871 RetryClassification::RepairNeeded,
1872 "agent pool store config conflicts with existing pool",
1873 ));
1874 }
1875 } else {
1876 pools.insert(pool_id.clone(), AgentPoolState::new(config.clone()));
1877 drop(pools);
1878 self.append_record(&pool_id, AgentPoolStoreRecordPayload::PoolOpened { config })?;
1879 }
1880 }
1881 self.snapshot(&pool_id)
1882 }
1883
1884 fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError> {
1885 let cursor = self.latest_cursor(pool_id)?;
1886 let pools = self
1887 .pools
1888 .lock()
1889 .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1890 pools
1891 .get(pool_id)
1892 .map(|state| state.snapshot(pool_id.clone(), cursor))
1893 .ok_or_else(|| {
1894 AgentError::new(
1895 AgentErrorKind::HostConfigurationNeeded,
1896 RetryClassification::HostConfigurationNeeded,
1897 "agent pool store has not opened this pool",
1898 )
1899 })
1900 }
1901
1902 fn record_pool_created(
1903 &self,
1904 pool_id: &AgentPoolId,
1905 ) -> Result<AgentPoolStoreCursor, AgentError> {
1906 self.with_pool_state(pool_id, |state| {
1907 state.created = true;
1908 Ok(())
1909 })?;
1910 self.append_record(pool_id, AgentPoolStoreRecordPayload::PoolCreated)
1911 }
1912
1913 fn join_member(
1914 &self,
1915 pool_id: &AgentPoolId,
1916 member: AgentPoolMember,
1917 ) -> Result<AgentPoolStoreCursor, AgentError> {
1918 self.with_pool_state(pool_id, |state| {
1919 state.index_member(member.clone());
1920 Ok(())
1921 })?;
1922 self.append_record(
1923 pool_id,
1924 AgentPoolStoreRecordPayload::MemberJoined { member },
1925 )
1926 }
1927
1928 fn leave_member(
1929 &self,
1930 pool_id: &AgentPoolId,
1931 run_id: &RunId,
1932 ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError> {
1933 let member = self.with_pool_state(pool_id, |state| state.remove_member(run_id))?;
1934 let cursor = self.append_record(
1935 pool_id,
1936 AgentPoolStoreRecordPayload::MemberLeft {
1937 member: member.clone(),
1938 },
1939 )?;
1940 Ok((member, cursor))
1941 }
1942
1943 fn message_receipt(
1944 &self,
1945 pool_id: &AgentPoolId,
1946 idempotency_key: &IdempotencyKey,
1947 ) -> Result<Option<MessageReceipt>, AgentError> {
1948 self.with_pool_state(pool_id, |state| {
1949 Ok(state.message_dedupe.get(idempotency_key).cloned())
1950 })
1951 }
1952
1953 fn record_message(
1954 &self,
1955 pool_id: &AgentPoolId,
1956 message: RunMessage,
1957 receipt: MessageReceipt,
1958 ) -> Result<AgentPoolStoreCursor, AgentError> {
1959 let stored = AgentPoolStoredMessage { message, receipt };
1960 self.with_pool_state(pool_id, |state| {
1961 state.message_dedupe.insert(
1962 stored.message.idempotency_key.clone(),
1963 stored.receipt.clone(),
1964 );
1965 state
1966 .messages
1967 .insert(stored.message.message_id.clone(), stored.clone());
1968 Ok(())
1969 })?;
1970 self.append_record(pool_id, AgentPoolStoreRecordPayload::RunMessage { stored })
1971 }
1972
1973 fn wake_registration(
1974 &self,
1975 pool_id: &AgentPoolId,
1976 idempotency_key: &IdempotencyKey,
1977 ) -> Result<Option<WakeRegistration>, AgentError> {
1978 self.with_pool_state(pool_id, |state| {
1979 Ok(state.wake_dedupe.get(idempotency_key).cloned())
1980 })
1981 }
1982
1983 fn wake(
1984 &self,
1985 pool_id: &AgentPoolId,
1986 condition_id: &WakeConditionId,
1987 ) -> Result<Option<AgentPoolStoredWake>, AgentError> {
1988 self.with_pool_state(pool_id, |state| Ok(state.wakes.get(condition_id).cloned()))
1989 }
1990
1991 fn record_wake(
1992 &self,
1993 pool_id: &AgentPoolId,
1994 condition: WakeCondition,
1995 compiled_filter: CompiledEventFilter,
1996 registration: WakeRegistration,
1997 ) -> Result<AgentPoolStoreCursor, AgentError> {
1998 let stored = AgentPoolStoredWake {
1999 condition,
2000 compiled_filter,
2001 registration,
2002 };
2003 self.with_pool_state(pool_id, |state| {
2004 state.wake_dedupe.insert(
2005 stored.condition.idempotency_key.clone(),
2006 stored.registration.clone(),
2007 );
2008 state
2009 .wakes
2010 .insert(stored.condition.condition_id.clone(), stored.clone());
2011 Ok(())
2012 })?;
2013 self.append_record(pool_id, AgentPoolStoreRecordPayload::Wake { stored })
2014 }
2015
2016 fn watch(
2017 &self,
2018 pool_id: &AgentPoolId,
2019 cursor: Option<AgentPoolStoreCursor>,
2020 ) -> Result<AgentPoolStoreStream, AgentError> {
2021 let start_after = cursor.map(|cursor| cursor.sequence).unwrap_or(0);
2022 let records = self
2023 .records
2024 .lock()
2025 .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
2026 Ok(AgentPoolStoreStream::new(
2027 records
2028 .get(pool_id)
2029 .cloned()
2030 .unwrap_or_default()
2031 .into_iter()
2032 .filter(|record| record.cursor.sequence > start_after),
2033 ))
2034 }
2035
2036 fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError> {
2037 self.with_pool_state(pool_id, |state| {
2038 state.next_event_counter += 1;
2039 Ok(state.next_event_counter)
2040 })
2041 }
2042}
2043
2044impl From<RunAddressTarget> for RunMessageAddressTargetRecord {
2045 fn from(value: RunAddressTarget) -> Self {
2046 match value {
2047 RunAddressTarget::Run { run_id } => Self::Run { run_id },
2048 RunAddressTarget::Agent { agent_id } => Self::Agent { agent_id },
2049 RunAddressTarget::Topic { topic_id } => Self::Topic { topic_id },
2050 RunAddressTarget::Pool { pool_id } => Self::Pool { pool_id },
2051 }
2052 }
2053}
2054
2055impl From<MessageStatus> for RunMessageDeliveryStatus {
2056 fn from(value: MessageStatus) -> Self {
2057 match value {
2058 MessageStatus::Accepted => Self::Accepted,
2059 MessageStatus::Delivered => Self::Delivered,
2060 MessageStatus::Responded => Self::Responded,
2061 MessageStatus::Failed => Self::Failed,
2062 MessageStatus::TimedOut => Self::TimedOut,
2063 MessageStatus::Expired => Self::Expired,
2064 MessageStatus::Cancelled => Self::Cancelled,
2065 }
2066 }
2067}
2068
2069impl From<ResumeInputPolicy> for WakeResumeInputPolicyRecord {
2070 fn from(value: ResumeInputPolicy) -> Self {
2071 match value {
2072 ResumeInputPolicy::MatchingEventRefs => Self::MatchingEventRefs,
2073 ResumeInputPolicy::RedactedSummary => Self::RedactedSummary,
2074 ResumeInputPolicy::None => Self::None,
2075 }
2076 }
2077}
2078
2079impl From<WakeRegistrationStatus> for WakeTriggerStatus {
2080 fn from(value: WakeRegistrationStatus) -> Self {
2081 match value {
2082 WakeRegistrationStatus::Registered => Self::Registered,
2083 WakeRegistrationStatus::Triggered => Self::Triggered,
2084 WakeRegistrationStatus::TimedOut => Self::TimedOut,
2085 WakeRegistrationStatus::Cancelled => Self::Cancelled,
2086 WakeRegistrationStatus::Failed => Self::Failed,
2087 }
2088 }
2089}
2090
/// Gives an event kind its snake_case wire name as used for agent-pool events.
trait AgentPoolEventKindName {
    /// Static wire name for this event kind.
    fn wire_name(&self) -> &'static str;
}
2094
2095impl AgentPoolEventKindName for EventKind {
2096 fn wire_name(&self) -> &'static str {
2097 match self {
2098 EventKind::AgentPoolCreated => "agent_pool_created",
2099 EventKind::AgentPoolRunJoined => "agent_pool_run_joined",
2100 EventKind::AgentPoolRunLeft => "agent_pool_run_left",
2101 EventKind::RunMessageAccepted => "run_message_accepted",
2102 EventKind::RunMessageDelivered => "run_message_delivered",
2103 EventKind::RunMessageResponded => "run_message_responded",
2104 EventKind::RunMessageFailed => "run_message_failed",
2105 EventKind::RunMessageTimedOut => "run_message_timed_out",
2106 EventKind::RunMessageExpired => "run_message_expired",
2107 EventKind::RunMessageCancelled => "run_message_cancelled",
2108 EventKind::WakeConditionRegistered => "wake_condition_registered",
2109 EventKind::WakeConditionTriggered => "wake_condition_triggered",
2110 EventKind::WakeConditionTimedOut => "wake_condition_timed_out",
2111 EventKind::WakeConditionCancelled => "wake_condition_cancelled",
2112 EventKind::WakeConditionFailed => "wake_condition_failed",
2113 _ => "agent_pool_event",
2114 }
2115 }
2116}
2117
2118fn intersect_run_ids(filter: &EventFilterSet<RunId>, allowed: &[RunId]) -> EventFilterSet<RunId> {
2119 match filter {
2120 EventFilterSet::Any => EventFilterSet::Include(allowed.to_vec()),
2121 EventFilterSet::Include(requested) => EventFilterSet::Include(
2122 requested
2123 .iter()
2124 .filter(|run_id| allowed.contains(run_id))
2125 .cloned()
2126 .collect(),
2127 ),
2128 }
2129}
2130
2131fn topics_from_members(members: &[AgentPoolMember]) -> BTreeMap<TopicId, BTreeSet<RunId>> {
2132 let mut topics = BTreeMap::new();
2133 for member in members {
2134 for topic in &member.topics {
2135 topics
2136 .entry(topic.clone())
2137 .or_insert_with(BTreeSet::new)
2138 .insert(member.run_id.clone());
2139 }
2140 }
2141 topics
2142}