1use std::{
8 collections::{BTreeMap, BTreeSet, VecDeque},
9 sync::{Arc, Mutex},
10};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{
15 domain::{
16 AgentError, AgentErrorKind, AgentId, AgentPoolId, ContentRef, DestinationKind,
17 DestinationRef, EffectId, EntityRef, EventId, IdempotencyKey, MessageId, PolicyRef,
18 PrivacyClass, RetryClassification, RunId, SourceKind, SourceRef, SpanId, TopicId, TraceId,
19 WakeConditionId,
20 },
21 effect::{EffectIntent, EffectKind, EffectResult, EffectTerminalStatus},
22 event::{
23 AgentEvent, CompiledEventFilter, ContentCaptureMode, EVENT_SCHEMA_VERSION,
24 EventCorrelation, EventDeliverySemantics, EventEnvelope, EventFamily, EventFilter,
25 EventFilterSet, EventFrame, EventKind, EventStreamScope, PayloadAccessMode,
26 },
27 event_bus::AgentEventStream,
28 journal::{
29 AgentPoolLifecycleStatus, AgentPoolRecord, EventIndexProjection, JOURNAL_SCHEMA_VERSION,
30 JournalCursor, JournalRecord, JournalRecordKind, JournalRecordPayload,
31 RunMessageAddressTargetRecord, RunMessageDeliveryStatus, RunMessageRecord, WakeRecord,
32 WakeResumeInputPolicyRecord, WakeTriggerStatus,
33 },
34 run::RunRequest,
35 run_handle::RunHandle,
36 runtime::AgentRuntime,
37};
38
/// A group of runs that exchange messages, share a scoped event view, and can
/// suspend on wake conditions.
///
/// All durable pool state lives in the store. Clones share that store through the
/// `Arc`, so they observe and mutate the same pool state.
#[derive(Clone)]
pub struct AgentPool {
    /// Identifier under which this pool's state is kept in `store`.
    pool_id: AgentPoolId,
    /// Runtime used to start runs, reach run journals, and publish/subscribe events.
    runtime: AgentRuntime,
    /// Backing store for membership, messages, and wake registrations.
    store: Arc<dyn AgentPoolStore>,
}
47
48impl AgentPool {
49 pub fn builder(pool_id: AgentPoolId) -> AgentPoolBuilder {
53 AgentPoolBuilder {
54 pool_id,
55 runtime: None,
56 message_policy: AgentPoolMessagePolicy::bounded_defaults(),
57 wake_policy: AgentPoolWakePolicy::safe_defaults(),
58 policy_refs: Vec::new(),
59 store: None,
60 }
61 }
62
63 pub fn pool_id(&self) -> &AgentPoolId {
66 &self.pool_id
67 }
68
69 pub fn start_run(&self, request: RunRequest) -> Result<RunHandle, AgentError> {
73 let handle = self.runtime.start_run(request.clone())?;
74 self.join_run(AgentPoolMember::new(request.run_id, request.agent_id))?;
75 Ok(handle)
76 }
77
78 pub fn join_run(&self, member: AgentPoolMember) -> Result<(), AgentError> {
82 let should_create = {
83 let snapshot = self.snapshot()?;
84 !snapshot.created
85 };
86
87 if should_create {
88 self.append_pool_record(
89 &member.run_id,
90 &member.agent_id,
91 AgentPoolLifecycleStatus::Created,
92 EventKind::AgentPoolCreated,
93 )?;
94 self.store.record_pool_created(&self.pool_id)?;
95 }
96
97 self.store.join_member(&self.pool_id, member.clone())?;
98
99 self.append_pool_record(
100 &member.run_id,
101 &member.agent_id,
102 AgentPoolLifecycleStatus::RunJoined,
103 EventKind::AgentPoolRunJoined,
104 )?;
105 Ok(())
106 }
107
108 pub fn members(&self) -> Result<Vec<AgentPoolMember>, AgentError> {
111 Ok(self.snapshot()?.members)
112 }
113
114 pub fn leave_run(&self, run_id: &RunId) -> Result<AgentPoolMember, AgentError> {
118 let (member, _) = self.store.leave_member(&self.pool_id, run_id)?;
119 self.append_pool_record(
120 &member.run_id,
121 &member.agent_id,
122 AgentPoolLifecycleStatus::RunLeft,
123 EventKind::AgentPoolRunLeft,
124 )?;
125 Ok(member)
126 }
127
128 pub fn send(&self, message: RunMessage) -> Result<MessageReceipt, AgentError> {
133 if let Some(receipt) = self
134 .store
135 .message_receipt(&self.pool_id, &message.idempotency_key)?
136 {
137 return Ok(receipt);
138 }
139
140 let delivered_to = self.resolve_address(&message);
141 let terminal_status = if message.expires_at_millis == Some(0) {
142 MessageStatus::Expired
143 } else if delivered_to.is_empty() {
144 MessageStatus::Failed
145 } else {
146 MessageStatus::Delivered
147 };
148
149 if terminal_status == MessageStatus::Expired {
150 let receipt =
151 self.record_message_status(&message, MessageStatus::Expired, Vec::new())?;
152 return Ok(receipt);
153 }
154
155 if terminal_status == MessageStatus::Failed {
156 let receipt =
157 self.record_message_status(&message, MessageStatus::Failed, Vec::new())?;
158 return Ok(receipt);
159 }
160
161 self.record_message_status(&message, MessageStatus::Accepted, delivered_to.clone())?;
162 let receipt =
163 self.record_message_status(&message, MessageStatus::Delivered, delivered_to)?;
164 Ok(receipt)
165 }
166
    /// Records `message` with the delivery `status` and returns the resulting receipt.
    ///
    /// Steps, in order:
    /// 1. Resolve the sender's pool membership.
    /// 2. Append a `RunMessage` journal record to the sender's journal.
    /// 3. Publish the matching agent-pool event.
    /// 4. Store the message/receipt pair in the pool store.
    /// 5. Trigger every registered wake whose filter matches the published event.
    ///
    /// # Errors
    /// Fails if the sender is not a pool member, or if any journal, event-bus, or
    /// store operation fails. NOTE(review): a failure after the journal append leaves
    /// the appended record in place; nothing here rolls it back.
    pub fn record_message_status(
        &self,
        message: &RunMessage,
        status: MessageStatus,
        delivered_to: Vec<RunId>,
    ) -> Result<MessageReceipt, AgentError> {
        let source_member = self.member(&message.from)?;
        let journal = self.runtime.journal_port(&message.from)?;
        let record = self.run_message_record(message, status.clone(), delivered_to.clone())?;
        let cursor = journal.append(record)?;
        let frame = self.publish_agent_pool_event(
            message.from.clone(),
            source_member.agent_id,
            status.event_kind(),
            Some(message.message_id.clone()),
            None,
            EntityRef::message(message.message_id.clone()),
            message.target_related_refs(&delivered_to),
            Some(message.to.destination_ref.clone()),
            message.policy_refs.clone(),
            Some(cursor.clone()),
            status.redacted_summary(),
        )?;

        // The receipt keeps the cursor of the journal record appended above.
        let receipt = MessageReceipt {
            message_id: message.message_id.clone(),
            status,
            delivered_to,
            journal_cursor: Some(cursor),
        };
        self.store
            .record_message(&self.pool_id, message.clone(), receipt.clone())?;
        // Message events can satisfy pending wake conditions.
        self.trigger_matching_wakes(&frame)?;
        Ok(receipt)
    }
206
207 pub fn subscribe(
210 &self,
211 filter: EventFilter,
212 cursor: Option<crate::event::EventCursor>,
213 ) -> Result<AgentEventStream, AgentError> {
214 let compiled = self.compile_scoped_filter(filter)?;
215 self.runtime.subscribe_events(compiled, cursor)
216 }
217
218 pub fn compile_scoped_filter(
221 &self,
222 filter: EventFilter,
223 ) -> Result<CompiledEventFilter, AgentError> {
224 self.scope_filter(filter).compile()
225 }
226
227 pub fn scope_filter(&self, mut filter: EventFilter) -> EventFilter {
231 let allowed_runs = self.observable_member_runs();
232 filter.run_ids = intersect_run_ids(&filter.run_ids, &allowed_runs);
233 let envelope_only = self
234 .snapshot()
235 .map(|snapshot| snapshot.wake_policy.envelope_only)
236 .unwrap_or(true);
237 if envelope_only {
238 filter.payload_access = PayloadAccessMode::EnvelopeOnly;
239 }
240 filter
241 }
242
243 pub fn suspend_until(
247 &self,
248 run_id: RunId,
249 condition: WakeCondition,
250 ) -> Result<WakeRegistration, AgentError> {
251 if run_id != condition.run_id {
252 return Err(AgentError::new(
253 AgentErrorKind::InvalidStateTransition,
254 RetryClassification::NotRetryable,
255 "wake registration run_id must match condition run_id",
256 ));
257 }
258
259 if let Some(registration) = self
260 .store
261 .wake_registration(&self.pool_id, &condition.idempotency_key)?
262 {
263 return Ok(registration);
264 }
265
266 self.member(&condition.run_id)?;
267 let compiled = self.compile_scoped_filter(condition.filter.clone())?;
268 let mut registration = self.record_wake_status(
269 &condition,
270 compiled.clone(),
271 WakeRegistrationStatus::Registered,
272 None,
273 )?;
274
275 if condition.timeout_millis == Some(0) {
276 registration = self.record_wake_status(
277 &condition,
278 compiled,
279 WakeRegistrationStatus::TimedOut,
280 None,
281 )?;
282 } else if let Some(frame) = self
283 .runtime
284 .subscribe_events(compiled.clone(), None)?
285 .next()
286 {
287 registration = self.record_wake_status(
288 &condition,
289 compiled,
290 WakeRegistrationStatus::Triggered,
291 Some(frame.event.envelope.event_id),
292 )?;
293 }
294
295 Ok(registration)
296 }
297
298 pub fn poll_wake(
302 &self,
303 condition_id: &WakeConditionId,
304 ) -> Result<WakeRegistration, AgentError> {
305 let stored = self
306 .store
307 .wake(&self.pool_id, condition_id)?
308 .ok_or_else(|| AgentError::contract_violation("wake condition is not registered"))?;
309
310 if stored.registration.status != WakeRegistrationStatus::Registered {
311 return Ok(stored.registration);
312 }
313
314 let Some(frame) = self
315 .runtime
316 .subscribe_events(stored.compiled_filter.clone(), None)?
317 .next()
318 else {
319 return Ok(stored.registration);
320 };
321
322 self.record_wake_status(
323 &stored.condition,
324 stored.compiled_filter,
325 WakeRegistrationStatus::Triggered,
326 Some(frame.event.envelope.event_id),
327 )
328 }
329
330 pub fn cancel_wake(
334 &self,
335 condition_id: &WakeConditionId,
336 ) -> Result<WakeRegistration, AgentError> {
337 let stored = self
338 .store
339 .wake(&self.pool_id, condition_id)?
340 .ok_or_else(|| AgentError::contract_violation("wake condition is not registered"))?;
341 self.record_wake_status(
342 &stored.condition,
343 stored.compiled_filter,
344 WakeRegistrationStatus::Cancelled,
345 None,
346 )
347 }
348
    /// Persists a wake-condition status change and returns the resulting registration.
    ///
    /// Steps, in order:
    /// 1. Resolve the condition's run as a pool member.
    /// 2. Append a `Wake` journal record to that run's journal.
    /// 3. Publish the matching agent-pool event. The condition id is passed as the wake
    ///    condition, so it also lands in the event's related refs.
    /// 4. Store the condition, compiled filter, and registration in the pool store.
    ///
    /// `matched_event_id` is written to the journal record only. The returned
    /// registration does not carry it.
    ///
    /// # Errors
    /// Fails if the run is not a pool member or any journal, event-bus, or store
    /// operation fails.
    fn record_wake_status(
        &self,
        condition: &WakeCondition,
        compiled_filter: CompiledEventFilter,
        status: WakeRegistrationStatus,
        matched_event_id: Option<EventId>,
    ) -> Result<WakeRegistration, AgentError> {
        let member = self.member(&condition.run_id)?;
        let journal = self.runtime.journal_port(&condition.run_id)?;
        // The journal stores the filter's fingerprint rather than the filter itself.
        let wake_record = WakeRecord {
            condition_id: condition.condition_id.clone(),
            run_id: condition.run_id.clone(),
            event_filter_fingerprint: compiled_filter.filter_fingerprint.clone(),
            timeout_millis: condition.timeout_millis,
            resume_policy: condition.resume_with.clone().into(),
            trigger_status: status.clone().into(),
            policy_refs: condition.policy_refs.clone(),
            idempotency_key: condition.idempotency_key.clone(),
            matched_event_id,
        };
        let record = self.journal_record(
            condition.run_id.clone(),
            member.agent_id.clone(),
            JournalRecordKind::Wake,
            "agent_pool",
            status.event_kind().wire_name(),
            EntityRef::wake_condition(condition.condition_id.clone()),
            vec![EntityRef::run(condition.run_id.clone())],
            condition.policy_refs.clone(),
            Vec::new(),
            Some(condition.idempotency_key.clone()),
            JournalRecordPayload::Wake(wake_record),
        )?;
        let cursor = journal.append(record)?;
        self.publish_agent_pool_event(
            condition.run_id.clone(),
            member.agent_id,
            status.event_kind(),
            None,
            Some(condition.condition_id.clone()),
            EntityRef::wake_condition(condition.condition_id.clone()),
            vec![EntityRef::run(condition.run_id.clone())],
            Some(DestinationRef::with_kind(
                DestinationKind::Agent,
                condition.run_id.as_str(),
            )),
            condition.policy_refs.clone(),
            Some(cursor.clone()),
            status.redacted_summary(),
        )?;

        let registration = WakeRegistration {
            condition_id: condition.condition_id.clone(),
            run_id: condition.run_id.clone(),
            status,
            journal_cursor: Some(cursor),
        };

        // The store keeps the compiled filter so later polls/triggers can re-match events.
        self.store.record_wake(
            &self.pool_id,
            condition.clone(),
            compiled_filter,
            registration.clone(),
        )?;

        Ok(registration)
    }
416
    /// Journals and publishes a pool lifecycle change (created, run joined, run left).
    ///
    /// The `AgentPoolRecord` captures the pool as currently stored: member run ids,
    /// topics, and pool policy refs, read at the time of the call. Whether the affected
    /// run appears in the member list therefore depends on whether the caller updated
    /// the store before or after this call. The record goes to `run_id`'s journal. The
    /// event uses that run as both subject and destination.
    ///
    /// # Errors
    /// Propagates journal-port, snapshot, journal-append, and event-bus failures.
    fn append_pool_record(
        &self,
        run_id: &RunId,
        agent_id: &AgentId,
        status: AgentPoolLifecycleStatus,
        event_kind: EventKind,
    ) -> Result<(), AgentError> {
        let journal = self.runtime.journal_port(run_id)?;
        let snapshot = self.snapshot()?;
        let member_run_ids = snapshot
            .members
            .iter()
            .map(|member| member.run_id.clone())
            .collect::<Vec<_>>();
        let topics = snapshot.topics;
        let policy_refs = snapshot.policy_refs;

        let record = AgentPoolRecord {
            pool_id: self.pool_id.clone(),
            member_run_ids,
            topics,
            policy_refs: policy_refs.clone(),
            lifecycle_status: status,
        };
        let journal_record = self.journal_record(
            run_id.clone(),
            agent_id.clone(),
            JournalRecordKind::AgentPool,
            "agent_pool",
            event_kind.wire_name(),
            EntityRef::run(run_id.clone()),
            Vec::new(),
            policy_refs.clone(),
            Vec::new(),
            None,
            JournalRecordPayload::AgentPool(record),
        )?;
        let cursor = journal.append(journal_record)?;
        self.publish_agent_pool_event(
            run_id.clone(),
            agent_id.clone(),
            event_kind,
            None,
            None,
            EntityRef::run(run_id.clone()),
            Vec::new(),
            Some(DestinationRef::with_kind(
                DestinationKind::Agent,
                run_id.as_str(),
            )),
            policy_refs,
            Some(cursor),
            "agent pool membership updated",
        )?;
        Ok(())
    }
473
474 fn run_message_record(
475 &self,
476 message: &RunMessage,
477 status: MessageStatus,
478 delivered_to: Vec<RunId>,
479 ) -> Result<JournalRecord, AgentError> {
480 let member = self.member(&message.from)?;
481 let mut effect_intent = None;
482 let mut effect_result = None;
483 let effect_id = EffectId::new(format!(
484 "effect.run_message.{}",
485 message.message_id.as_str()
486 ));
487
488 if status == MessageStatus::Accepted {
489 let mut intent = EffectIntent::new(
490 effect_id.clone(),
491 EffectKind::RunMessageDelivery,
492 EntityRef::message(message.message_id.clone()),
493 SourceRef::with_kind(SourceKind::Sdk, "source.sdk.agent_pool"),
494 "run message delivery intent",
495 );
496 intent.destination = Some(message.to.destination_ref.clone());
497 intent.policy_refs = message.policy_refs.clone();
498 intent.idempotency_key = Some(message.idempotency_key.clone());
499 intent.content_refs = vec![message.content_ref.clone()];
500 effect_intent = Some(intent);
501 }
502
503 if status.is_terminal_delivery() {
504 effect_result = Some(EffectResult {
505 effect_id,
506 terminal_status: status.effect_terminal_status(),
507 external_operation_id: None,
508 reconciliation_ref: None,
509 error_ref: None,
510 content_refs: vec![message.content_ref.clone()],
511 redacted_summary: status.redacted_summary().to_string(),
512 });
513 }
514
515 let record = RunMessageRecord {
516 message_id: message.message_id.clone(),
517 source_run_id: message.from.clone(),
518 address_target: message.to.target.clone().into(),
519 content_ref: message.content_ref.clone(),
520 correlation: message.correlation.clone(),
521 reply_to: message.reply_to.clone(),
522 delivery_status: status.clone().into(),
523 delivered_to: delivered_to.clone(),
524 policy_refs: message.policy_refs.clone(),
525 idempotency_key: message.idempotency_key.clone(),
526 effect_intent,
527 effect_result,
528 };
529
530 self.journal_record(
531 message.from.clone(),
532 member.agent_id,
533 JournalRecordKind::RunMessage,
534 "agent_pool",
535 status.event_kind().wire_name(),
536 EntityRef::message(message.message_id.clone()),
537 message.target_related_refs(&delivered_to),
538 message.policy_refs.clone(),
539 vec![message.content_ref.clone()],
540 Some(message.idempotency_key.clone()),
541 JournalRecordPayload::RunMessage(record),
542 )
543 }
544
545 #[expect(
546 clippy::too_many_arguments,
547 reason = "journal-backed pool records intentionally spell out lineage, refs, and payload until a dedicated record-builder API replaces this private helper"
548 )]
549 fn journal_record(
550 &self,
551 run_id: RunId,
552 agent_id: AgentId,
553 record_kind: JournalRecordKind,
554 event_family: impl Into<String>,
555 event_kind: impl Into<String>,
556 subject_ref: EntityRef,
557 related_refs: Vec<EntityRef>,
558 _policy_refs: Vec<PolicyRef>,
559 content_refs: Vec<ContentRef>,
560 idempotency_key: Option<IdempotencyKey>,
561 payload: JournalRecordPayload,
562 ) -> Result<JournalRecord, AgentError> {
563 let journal_seq = self.runtime.next_journal_seq();
564 let source = SourceRef::with_kind(SourceKind::Sdk, "source.sdk.agent_pool");
565 let fingerprint = self
566 .runtime
567 .run_snapshot(&run_id)
568 .map(|snapshot| snapshot.runtime_package_fingerprint.as_str().to_string())
569 .unwrap_or_else(|_| "runtime.package.fingerprint.agent_pool".to_string());
570 let session_id = self
571 .runtime
572 .run_snapshot(&run_id)
573 .ok()
574 .and_then(|snapshot| snapshot.session_id);
575 let event_family = event_family.into();
576 let event_kind = event_kind.into();
577
578 Ok(JournalRecord {
579 journal_schema_version: JOURNAL_SCHEMA_VERSION,
580 journal_seq,
581 record_id: format!("journal.record.agent_pool.{journal_seq}"),
582 record_kind,
583 run_id: run_id.clone(),
584 session_id: session_id.clone(),
585 agent_id: agent_id.clone(),
586 turn_id: None,
587 attempt_id: None,
588 subject_ref: subject_ref.clone(),
589 related_refs: related_refs.clone(),
590 causal_refs: Vec::new(),
591 source: source.clone(),
592 destination: Some(DestinationRef::with_kind(
593 DestinationKind::Journal,
594 "destination.journal.agent_pool",
595 )),
596 correlation_keys: Vec::new(),
597 tags: vec!["feature:agent_pool".to_string()],
598 delivery_semantics: "journal_backed".to_string(),
599 event_index: EventIndexProjection {
600 run_id,
601 session_id,
602 agent_id,
603 turn_id: None,
604 event_family,
605 event_kind,
606 source,
607 destination: Some(DestinationRef::with_kind(
608 DestinationKind::EventStream,
609 "destination.event_stream.agent_pool",
610 )),
611 subject_ref,
612 related_refs,
613 correlation_keys: Vec::new(),
614 tags: vec!["feature:agent_pool".to_string()],
615 privacy_class: PrivacyClass::ContentRefsOnly,
616 delivery_semantics: "journal_backed".to_string(),
617 },
618 timestamp_millis: journal_seq,
619 runtime_package_fingerprint: fingerprint,
620 privacy: PrivacyClass::ContentRefsOnly,
621 content_refs,
622 redaction_policy_id: "redaction.agent_pool.default".to_string(),
623 idempotency_key,
624 dedupe_key: None,
625 checkpoint_ref: None,
626 payload,
627 })
628 }
629
630 #[expect(
631 clippy::too_many_arguments,
632 reason = "event publication mirrors the durable event envelope fields so lineage stays explicit at the call site"
633 )]
634 fn publish_agent_pool_event(
635 &self,
636 run_id: RunId,
637 agent_id: AgentId,
638 event_kind: EventKind,
639 message_id: Option<MessageId>,
640 wake_condition_id: Option<WakeConditionId>,
641 subject_ref: EntityRef,
642 mut related_refs: Vec<EntityRef>,
643 destination: Option<DestinationRef>,
644 policy_refs: Vec<PolicyRef>,
645 journal_cursor: Option<JournalCursor>,
646 summary: impl Into<String>,
647 ) -> Result<EventFrame, AgentError> {
648 if let Some(condition_id) = wake_condition_id {
649 related_refs.push(EntityRef::wake_condition(condition_id));
650 }
651 let event_counter = self.store.next_event_sequence(&self.pool_id)?;
652 let fingerprint = self
653 .runtime
654 .run_snapshot(&run_id)
655 .map(|snapshot| snapshot.runtime_package_fingerprint.as_str().to_string())
656 .unwrap_or_else(|_| "runtime.package.fingerprint.agent_pool".to_string());
657 let session_id = self
658 .runtime
659 .run_snapshot(&run_id)
660 .ok()
661 .and_then(|snapshot| snapshot.session_id);
662 let event = AgentEvent::with_redacted_summary(
663 EventEnvelope {
664 schema_version: EVENT_SCHEMA_VERSION,
665 event_id: EventId::new(format!(
666 "event.agent_pool.{}.{}",
667 self.pool_id.as_str(),
668 event_counter
669 )),
670 event_seq: 0,
671 event_family: EventFamily::AgentPool,
672 event_kind,
673 payload_schema_version: 1,
674 timestamp: format!("1970-01-01T00:00:{event_counter:02}Z"),
675 recorded_at: format!("1970-01-01T00:00:{event_counter:02}Z"),
676 run_id,
677 session_id,
678 agent_id,
679 turn_id: None,
680 attempt_id: None,
681 message_id,
682 context_item_id: None,
683 trace_id: TraceId::new(format!("trace.agent_pool.{}", self.pool_id.as_str())),
684 span_id: SpanId::new(format!(
685 "span.agent_pool.{}.{}",
686 self.pool_id.as_str(),
687 event_counter
688 )),
689 parent_event_id: None,
690 caused_by: None,
691 subject_ref,
692 related_refs,
693 causal_refs: Vec::new(),
694 correlation: EventCorrelation::default(),
695 tags: vec![crate::event::EventTag::new("feature:agent_pool")],
696 source: SourceRef::with_kind(SourceKind::Sdk, "source.sdk.agent_pool"),
697 destination,
698 policy_refs,
699 journal_cursor,
700 state_before: None,
701 state_after: None,
702 delivery_semantics: EventDeliverySemantics::JournalBacked,
703 privacy: PrivacyClass::ContentRefsOnly,
704 content_capture: ContentCaptureMode::Off,
705 redaction_policy_id: "redaction.agent_pool.default".to_string(),
706 runtime_package_fingerprint: fingerprint,
707 },
708 summary,
709 );
710 let frame = EventFrame {
711 cursor: event.envelope.cursor(EventStreamScope::All),
712 event,
713 archive_cursor: None,
714 overflow: None,
715 };
716 self.runtime
717 .event_bus_port(&frame.event.envelope.run_id)?
718 .publish(frame.clone())?;
719 Ok(frame)
720 }
721
722 fn resolve_address(&self, message: &RunMessage) -> Vec<RunId> {
723 let Ok(snapshot) = self.snapshot() else {
724 return Vec::new();
725 };
726 let members = snapshot
727 .members
728 .iter()
729 .cloned()
730 .map(|member| (member.run_id.clone(), member))
731 .collect::<BTreeMap<_, _>>();
732 let topics = topics_from_members(&snapshot.members);
733
734 if !members.contains_key(&message.from) || !snapshot.message_policy.allows(message) {
735 return Vec::new();
736 }
737
738 let mut candidates = match &message.to.target {
739 RunAddressTarget::Run { run_id } => vec![run_id.clone()],
740 RunAddressTarget::Agent { agent_id } => members
741 .values()
742 .filter(|member| &member.agent_id == agent_id)
743 .map(|member| member.run_id.clone())
744 .collect::<Vec<_>>(),
745 RunAddressTarget::Topic { topic_id } => topics
746 .get(topic_id)
747 .map(|runs| runs.iter().cloned().collect::<Vec<_>>())
748 .unwrap_or_default(),
749 RunAddressTarget::Pool { pool_id } if pool_id == &self.pool_id => {
750 members.keys().cloned().collect::<Vec<_>>()
751 }
752 RunAddressTarget::Pool { .. } => Vec::new(),
753 };
754
755 candidates.retain(|run_id| {
756 members
757 .get(run_id)
758 .is_some_and(|member| member.allows_message_policies(&message.policy_refs))
759 });
760
761 if matches!(message.to.target, RunAddressTarget::Pool { .. })
762 && !snapshot.message_policy.include_sender_in_pool_broadcast
763 {
764 candidates.retain(|run_id| run_id != &message.from);
765 }
766
767 candidates.sort();
768 candidates.dedup();
769 candidates
770 }
771
772 fn observable_member_runs(&self) -> Vec<RunId> {
773 self.snapshot()
774 .map(|snapshot| {
775 snapshot
776 .members
777 .iter()
778 .filter(|member| member.allows_message_policies(&snapshot.policy_refs))
779 .map(|member| member.run_id.clone())
780 .collect()
781 })
782 .unwrap_or_default()
783 }
784
785 fn member(&self, run_id: &RunId) -> Result<AgentPoolMember, AgentError> {
786 self.snapshot()?
787 .members
788 .into_iter()
789 .find(|member| &member.run_id == run_id)
790 .ok_or_else(|| {
791 AgentError::new(
792 AgentErrorKind::InvalidStateTransition,
793 RetryClassification::NotRetryable,
794 "run is not a member of this agent pool",
795 )
796 })
797 }
798
799 pub fn snapshot(&self) -> Result<AgentPoolSnapshot, AgentError> {
803 self.store.snapshot(&self.pool_id)
804 }
805
806 fn trigger_matching_wakes(&self, frame: &EventFrame) -> Result<(), AgentError> {
807 if matches!(
808 frame.event.envelope.event_kind,
809 EventKind::WakeConditionRegistered
810 | EventKind::WakeConditionTriggered
811 | EventKind::WakeConditionTimedOut
812 | EventKind::WakeConditionCancelled
813 | EventKind::WakeConditionFailed
814 ) {
815 return Ok(());
816 }
817
818 let wakes = self.snapshot()?.wakes;
819 for wake in wakes
820 .into_iter()
821 .filter(|wake| wake.registration.status == WakeRegistrationStatus::Registered)
822 {
823 if wake.compiled_filter.matches_envelope(&frame.event.envelope) {
824 self.record_wake_status(
825 &wake.condition,
826 wake.compiled_filter,
827 WakeRegistrationStatus::Triggered,
828 Some(frame.event.envelope.event_id.clone()),
829 )?;
830 }
831 }
832 Ok(())
833 }
834
835 pub fn watch_pool(
838 &self,
839 cursor: Option<AgentPoolStoreCursor>,
840 ) -> Result<AgentPoolStoreStream, AgentError> {
841 self.store.watch(&self.pool_id, cursor)
842 }
843}
844
/// Builder for [`AgentPool`], created by `AgentPool::builder`.
#[derive(Clone)]
pub struct AgentPoolBuilder {
    /// Identifier of the pool being configured.
    pool_id: AgentPoolId,
    /// Runtime for the pool. `build` fails while this is `None`.
    runtime: Option<AgentRuntime>,
    /// Message routing policy handed to the store when the pool is opened.
    message_policy: AgentPoolMessagePolicy,
    /// Wake/subscription policy handed to the store when the pool is opened.
    wake_policy: AgentPoolWakePolicy,
    /// Pool-level policy refs handed to the store when the pool is opened.
    policy_refs: Vec<PolicyRef>,
    /// Backing store. `build` falls back to `InMemoryAgentPoolStore::default()` when `None`.
    store: Option<Arc<dyn AgentPoolStore>>,
}
856
857impl AgentPoolBuilder {
858 pub fn runtime(mut self, runtime: AgentRuntime) -> Self {
862 self.runtime = Some(runtime);
863 self
864 }
865
866 pub fn message_policy(mut self, policy: AgentPoolMessagePolicy) -> Self {
869 self.message_policy = policy;
870 self
871 }
872
873 pub fn wake_policy(mut self, policy: AgentPoolWakePolicy) -> Self {
876 self.wake_policy = policy;
877 self
878 }
879
880 pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
883 self.policy_refs.push(policy_ref);
884 self
885 }
886
887 pub fn store<S>(mut self, store: S) -> Self
891 where
892 S: AgentPoolStore + 'static,
893 {
894 self.store = Some(Arc::new(store));
895 self
896 }
897
898 pub fn shared_store(mut self, store: Arc<dyn AgentPoolStore>) -> Self {
902 self.store = Some(store);
903 self
904 }
905
906 pub fn build(self) -> Result<AgentPool, AgentError> {
910 let runtime = self
911 .runtime
912 .ok_or_else(|| AgentError::host_configuration_needed("agent pool requires runtime"))?;
913 let store = self
914 .store
915 .unwrap_or_else(|| Arc::new(InMemoryAgentPoolStore::default()));
916 store.open_pool(
917 self.pool_id.clone(),
918 AgentPoolStoreConfig {
919 message_policy: self.message_policy,
920 wake_policy: self.wake_policy,
921 policy_refs: self.policy_refs,
922 },
923 )?;
924 Ok(AgentPool {
925 pool_id: self.pool_id,
926 runtime,
927 store,
928 })
929 }
930}
931
/// Configuration passed to [`AgentPoolStore::open_pool`] when a pool is built.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolStoreConfig {
    /// Routing and policy rules for run messages.
    pub message_policy: AgentPoolMessagePolicy,
    /// Payload-access rules for subscriptions and wake filters.
    pub wake_policy: AgentPoolWakePolicy,
    /// Pool-level policy refs. Omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub policy_refs: Vec<PolicyRef>,
}
945
/// Position in a pool store's record sequence.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolStoreCursor {
    /// Sequence number. `start()` uses 0. NOTE(review): confirm with store
    /// implementations whether `watch` treats it as inclusive or exclusive.
    pub sequence: u64,
}
954
955impl AgentPoolStoreCursor {
956 pub fn start() -> Self {
958 Self { sequence: 0 }
959 }
960
961 pub fn new(sequence: u64) -> Self {
963 Self { sequence }
964 }
965}
966
/// Point-in-time view of a pool's stored state, as returned by the store.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolSnapshot {
    /// Pool this snapshot describes.
    pub pool_id: AgentPoolId,
    /// Whether the pool-created step has been recorded. `AgentPool::join_run` emits
    /// `AgentPoolCreated` while this is `false`.
    pub created: bool,
    /// Current members.
    pub members: Vec<AgentPoolMember>,
    /// Pool topics. Copied into `AgentPoolRecord` lifecycle journal records.
    pub topics: Vec<TopicId>,
    /// Routing and policy rules for run messages.
    pub message_policy: AgentPoolMessagePolicy,
    /// Payload-access rules for subscriptions and wake filters.
    pub wake_policy: AgentPoolWakePolicy,
    /// Pool-level policy refs. Only members holding all of them are observable.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub policy_refs: Vec<PolicyRef>,
    /// Stored messages with their latest receipts.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub messages: Vec<AgentPoolStoredMessage>,
    /// Stored wake conditions with their compiled filters and registrations.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub wakes: Vec<AgentPoolStoredWake>,
    /// Store cursor associated with this snapshot, if the store provides one.
    /// NOTE(review): presumably the latest applied record; confirm with implementations.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<AgentPoolStoreCursor>,
}
997
/// A run message together with the receipt recorded for it.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolStoredMessage {
    /// The message as sent.
    pub message: RunMessage,
    /// Receipt from the most recent status recorded for the message.
    pub receipt: MessageReceipt,
}
1006
/// A wake condition with the scoped filter compiled for it and its current registration.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolStoredWake {
    /// The condition as registered.
    pub condition: WakeCondition,
    /// Pool-scoped compiled filter, re-used when polling or triggering the wake.
    pub compiled_filter: CompiledEventFilter,
    /// Current registration, including its status.
    pub registration: WakeRegistration,
}
1018
/// One entry in a pool store's record sequence, as yielded by `AgentPoolStore::watch`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolStoreRecord {
    /// Pool the record belongs to.
    pub pool_id: AgentPoolId,
    /// Position of the record in the store's sequence.
    pub cursor: AgentPoolStoreCursor,
    /// What changed.
    pub payload: AgentPoolStoreRecordPayload,
}
1031
/// Kind-specific content of an [`AgentPoolStoreRecord`].
///
/// Serialized as an internally tagged object: the variant name in snake_case is stored
/// under `"type"`. NOTE(review): each variant presumably corresponds to the
/// `AgentPoolStore` method of the same purpose; confirm against store implementations.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[expect(
    clippy::large_enum_variant,
    reason = "pool store payloads are durable serde records; preserve direct variant ergonomics until a separate storage-envelope redesign"
)]
pub enum AgentPoolStoreRecordPayload {
    /// The pool was opened with this configuration.
    PoolOpened {
        /// Configuration supplied when the pool was opened.
        config: AgentPoolStoreConfig,
    },
    /// The pool-created step was recorded.
    PoolCreated,
    /// A member joined.
    MemberJoined {
        /// The member that joined.
        member: AgentPoolMember,
    },
    /// A member left.
    MemberLeft {
        /// The member that left.
        member: AgentPoolMember,
    },
    /// A run message and receipt were stored.
    RunMessage {
        /// Stored message and receipt.
        stored: AgentPoolStoredMessage,
    },
    /// A wake condition, filter, and registration were stored.
    Wake {
        /// Stored wake state.
        stored: AgentPoolStoredWake,
    },
}
1068
/// Iterator over already-materialized pool store records, yielded front to back.
#[derive(Clone, Debug)]
pub struct AgentPoolStoreStream {
    /// Records not yet yielded, in order.
    records: VecDeque<AgentPoolStoreRecord>,
}
1074
1075impl AgentPoolStoreStream {
1076 pub fn new(records: impl IntoIterator<Item = AgentPoolStoreRecord>) -> Self {
1078 Self {
1079 records: records.into_iter().collect(),
1080 }
1081 }
1082}
1083
1084impl Iterator for AgentPoolStoreStream {
1085 type Item = AgentPoolStoreRecord;
1086
1087 fn next(&mut self) -> Option<Self::Item> {
1088 self.records.pop_front()
1089 }
1090}
1091
/// Persistence backend for agent pools.
///
/// Every method is keyed by pool id. NOTE(review): a single store can presumably hold
/// several pools; confirm against implementations. `AgentPool` reads all pool state
/// through [`AgentPoolStore::snapshot`] and mutates it through the remaining methods.
/// Implementations must be `Send + Sync`.
pub trait AgentPoolStore: Send + Sync {
    /// Opens the pool with `config` and returns its snapshot.
    /// Called by `AgentPoolBuilder::build`, which ignores the returned snapshot.
    fn open_pool(
        &self,
        pool_id: AgentPoolId,
        config: AgentPoolStoreConfig,
    ) -> Result<AgentPoolSnapshot, AgentError>;

    /// Returns the pool's current state.
    fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError>;

    /// Records that the pool-created step happened. `AgentPool` expects this to make
    /// `AgentPoolSnapshot::created` true.
    fn record_pool_created(
        &self,
        pool_id: &AgentPoolId,
    ) -> Result<AgentPoolStoreCursor, AgentError>;

    /// Adds `member` to the pool.
    fn join_member(
        &self,
        pool_id: &AgentPoolId,
        member: AgentPoolMember,
    ) -> Result<AgentPoolStoreCursor, AgentError>;

    /// Removes the member for `run_id` and returns it with a store cursor.
    /// NOTE(review): presumably errors when `run_id` is not a member; confirm.
    fn leave_member(
        &self,
        pool_id: &AgentPoolId,
        run_id: &RunId,
    ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError>;

    /// Looks up a stored receipt by idempotency key. `AgentPool::send` returns a hit
    /// as-is instead of sending again.
    fn message_receipt(
        &self,
        pool_id: &AgentPoolId,
        idempotency_key: &IdempotencyKey,
    ) -> Result<Option<MessageReceipt>, AgentError>;

    /// Stores a message with its receipt. Called once per recorded status.
    fn record_message(
        &self,
        pool_id: &AgentPoolId,
        message: RunMessage,
        receipt: MessageReceipt,
    ) -> Result<AgentPoolStoreCursor, AgentError>;

    /// Looks up a wake registration by idempotency key. `AgentPool::suspend_until`
    /// returns a hit as-is instead of registering again.
    fn wake_registration(
        &self,
        pool_id: &AgentPoolId,
        idempotency_key: &IdempotencyKey,
    ) -> Result<Option<WakeRegistration>, AgentError>;

    /// Looks up a stored wake by condition id.
    fn wake(
        &self,
        pool_id: &AgentPoolId,
        condition_id: &WakeConditionId,
    ) -> Result<Option<AgentPoolStoredWake>, AgentError>;

    /// Stores a wake condition, its compiled filter, and its registration.
    /// Called on every wake status change.
    fn record_wake(
        &self,
        pool_id: &AgentPoolId,
        condition: WakeCondition,
        compiled_filter: CompiledEventFilter,
        registration: WakeRegistration,
    ) -> Result<AgentPoolStoreCursor, AgentError>;

    /// Returns the pool's store records, starting from `cursor` when one is given.
    fn watch(
        &self,
        pool_id: &AgentPoolId,
        cursor: Option<AgentPoolStoreCursor>,
    ) -> Result<AgentPoolStoreStream, AgentError>;

    /// Returns the next per-pool event counter. `AgentPool` uses it to build event ids,
    /// span ids, and synthetic timestamps.
    fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError>;
}
1175
/// A run enrolled in a pool, with the policy refs it holds and the topics it joins.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolMember {
    /// The member run.
    pub run_id: RunId,
    /// Agent executing the run. Used for agent-addressed messages.
    pub agent_id: AgentId,
    /// Policy refs held by the member. A member only receives messages, and is only
    /// observable, when it holds every required ref.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub policy_refs: Vec<PolicyRef>,
    /// Topics the member participates in. Used to resolve topic-addressed messages.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub topics: Vec<TopicId>,
}
1194
1195impl AgentPoolMember {
1196 pub fn new(run_id: RunId, agent_id: AgentId) -> Self {
1200 Self {
1201 run_id,
1202 agent_id,
1203 policy_refs: Vec::new(),
1204 topics: Vec::new(),
1205 }
1206 }
1207
1208 pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
1211 self.policy_refs.push(policy_ref);
1212 self
1213 }
1214
1215 pub fn topic(mut self, topic_id: TopicId) -> Self {
1219 self.topics.push(topic_id);
1220 self
1221 }
1222
1223 fn allows_message_policies(&self, required: &[PolicyRef]) -> bool {
1224 required
1225 .iter()
1226 .all(|required| self.policy_refs.contains(required))
1227 }
1228}
1229
/// Pool-wide rules for run messages.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolMessagePolicy {
    /// Policy refs every message must carry to be routed at all.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub required_policy_refs: Vec<PolicyRef>,
    /// Whether a pool-wide broadcast is also delivered back to its sender.
    pub include_sender_in_pool_broadcast: bool,
}
1243
1244impl AgentPoolMessagePolicy {
1245 pub fn bounded_defaults() -> Self {
1248 Self {
1249 required_policy_refs: Vec::new(),
1250 include_sender_in_pool_broadcast: false,
1251 }
1252 }
1253
1254 fn allows(&self, message: &RunMessage) -> bool {
1255 self.required_policy_refs
1256 .iter()
1257 .all(|required| message.policy_refs.contains(required))
1258 }
1259}
1260
/// Pool-wide rules for event subscriptions and wake filters.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AgentPoolWakePolicy {
    /// When true, scoped filters are forced to `PayloadAccessMode::EnvelopeOnly`.
    pub envelope_only: bool,
}
1269
1270impl AgentPoolWakePolicy {
1271 pub fn safe_defaults() -> Self {
1275 Self {
1276 envelope_only: true,
1277 }
1278 }
1279}
1280
/// Where a run message is addressed, plus the refs recorded alongside it.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct RunAddress {
    /// The run, agent, topic, or pool the message targets.
    pub target: RunAddressTarget,
    /// Destination descriptor; the `RunAddress` constructors derive it from the
    /// target id.
    pub destination_ref: DestinationRef,
    /// Entity refs related to the target; the constructors seed it with a ref
    /// to the target itself. Omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub related_refs: Vec<EntityRef>,
}
1295
1296impl RunAddress {
1297 pub fn run(run_id: RunId) -> Self {
1300 Self {
1301 destination_ref: DestinationRef::with_kind(DestinationKind::Agent, run_id.as_str()),
1302 related_refs: vec![EntityRef::run(run_id.clone())],
1303 target: RunAddressTarget::Run { run_id },
1304 }
1305 }
1306
1307 pub fn agent(agent_id: AgentId) -> Self {
1311 Self {
1312 destination_ref: DestinationRef::with_kind(DestinationKind::Agent, agent_id.as_str()),
1313 related_refs: vec![EntityRef::agent(agent_id.clone())],
1314 target: RunAddressTarget::Agent { agent_id },
1315 }
1316 }
1317
1318 pub fn topic(topic_id: TopicId) -> Self {
1322 Self {
1323 destination_ref: DestinationRef::with_kind(DestinationKind::Topic, topic_id.as_str()),
1324 related_refs: vec![EntityRef::topic(topic_id.clone())],
1325 target: RunAddressTarget::Topic { topic_id },
1326 }
1327 }
1328
1329 pub fn pool(pool_id: AgentPoolId) -> Self {
1332 Self {
1333 destination_ref: DestinationRef::with_kind(
1334 DestinationKind::AgentPool,
1335 pool_id.as_str(),
1336 ),
1337 related_refs: vec![EntityRef::agent_pool(pool_id.clone())],
1338 target: RunAddressTarget::Pool { pool_id },
1339 }
1340 }
1341}
1342
/// The addressee of a run message.
///
/// Serialized as an internally tagged object: a `type` field holding the
/// snake_case variant name (`run`, `agent`, `topic`, `pool`) next to the
/// variant's id field.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RunAddressTarget {
    /// A single run.
    Run {
        /// Target run.
        run_id: RunId,
    },
    /// An agent.
    Agent {
        /// Target agent.
        agent_id: AgentId,
    },
    /// A topic.
    Topic {
        /// Target topic.
        topic_id: TopicId,
    },
    /// A whole agent pool.
    Pool {
        /// Target pool.
        pool_id: AgentPoolId,
    },
}
1370
1371impl RunAddressTarget {
1372 pub fn run_id(&self) -> Option<&RunId> {
1375 match self {
1376 Self::Run { run_id } => Some(run_id),
1377 _ => None,
1378 }
1379 }
1380}
1381
/// A message sent from one run to a [`RunAddress`].
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct RunMessage {
    /// Id of this message.
    pub message_id: MessageId,
    /// Sending run.
    pub from: RunId,
    /// Where the message is addressed.
    pub to: RunAddress,
    /// Reference to the message content.
    pub content_ref: ContentRef,
    /// Correlation data for the message; `RunMessage::new` uses the default.
    pub correlation: EventCorrelation,
    /// Message this one replies to, if any. Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply_to: Option<MessageId>,
    /// Responses the sender expects, if any. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub response_contract: Option<MessageResponseContract>,
    /// Expiry time in milliseconds (NOTE(review): presumably a Unix-epoch
    /// timestamp; confirm). Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expires_at_millis: Option<u64>,
    /// Dedupe key; `InMemoryAgentPoolStore` indexes message receipts by it.
    pub idempotency_key: IdempotencyKey,
    /// Policy refs carried by the message; `AgentPoolMessagePolicy::allows`
    /// checks them. Omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub policy_refs: Vec<PolicyRef>,
}
1418
1419impl RunMessage {
1420 pub fn new(
1424 message_id: MessageId,
1425 from: RunId,
1426 to: RunAddress,
1427 content_ref: ContentRef,
1428 idempotency_key: IdempotencyKey,
1429 ) -> Self {
1430 Self {
1431 message_id,
1432 from,
1433 to,
1434 content_ref,
1435 correlation: EventCorrelation::default(),
1436 reply_to: None,
1437 response_contract: None,
1438 expires_at_millis: None,
1439 idempotency_key,
1440 policy_refs: Vec::new(),
1441 }
1442 }
1443
1444 pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
1447 self.policy_refs.push(policy_ref);
1448 self
1449 }
1450
1451 fn target_related_refs(&self, delivered_to: &[RunId]) -> Vec<EntityRef> {
1452 let mut refs = self.to.related_refs.clone();
1453 refs.extend(delivered_to.iter().cloned().map(EntityRef::run));
1454 refs.sort_by(|left, right| {
1455 left.kind
1456 .cmp(&right.kind)
1457 .then_with(|| left.as_str().cmp(right.as_str()))
1458 });
1459 refs.dedup_by(|left, right| left.kind == right.kind && left.as_str() == right.as_str());
1460 refs
1461 }
1462}
1463
/// The responses a message sender expects to receive.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct MessageResponseContract {
    /// Number of responses expected.
    pub expected_responses: u32,
    /// Optional time limit for the responses, in milliseconds. Omitted from
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_millis: Option<u64>,
}
1475
1476impl MessageResponseContract {
1477 pub fn one_response(timeout_millis: u64) -> Self {
1480 Self {
1481 expected_responses: 1,
1482 timeout_millis: Some(timeout_millis),
1483 }
1484 }
1485}
1486
/// The outcome reported for a sent run message.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct MessageReceipt {
    /// Message this receipt refers to.
    pub message_id: MessageId,
    /// Current status of the message.
    pub status: MessageStatus,
    /// Runs the message was delivered to. Omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub delivered_to: Vec<RunId>,
    /// Journal position associated with this receipt, if one was recorded.
    /// Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub journal_cursor: Option<JournalCursor>,
}
1506
/// Lifecycle status of a run message, serialized in snake_case.
///
/// `Accepted` is the only status that `is_terminal_delivery` treats as
/// non-terminal.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum MessageStatus {
    /// Accepted; not yet in a terminal state.
    Accepted,
    /// Delivered to its target run(s).
    Delivered,
    /// A response was received.
    Responded,
    /// Delivery failed.
    Failed,
    /// Timed out.
    TimedOut,
    /// Expired.
    Expired,
    /// Cancelled.
    Cancelled,
}
1527
1528impl MessageStatus {
1529 fn event_kind(&self) -> EventKind {
1530 match self {
1531 Self::Accepted => EventKind::RunMessageAccepted,
1532 Self::Delivered => EventKind::RunMessageDelivered,
1533 Self::Responded => EventKind::RunMessageResponded,
1534 Self::Failed => EventKind::RunMessageFailed,
1535 Self::TimedOut => EventKind::RunMessageTimedOut,
1536 Self::Expired => EventKind::RunMessageExpired,
1537 Self::Cancelled => EventKind::RunMessageCancelled,
1538 }
1539 }
1540
1541 fn redacted_summary(&self) -> &'static str {
1542 match self {
1543 Self::Accepted => "run message accepted",
1544 Self::Delivered => "run message delivered",
1545 Self::Responded => "run message responded",
1546 Self::Failed => "run message failed",
1547 Self::TimedOut => "run message timed out",
1548 Self::Expired => "run message expired",
1549 Self::Cancelled => "run message cancelled",
1550 }
1551 }
1552
1553 fn is_terminal_delivery(&self) -> bool {
1554 matches!(
1555 self,
1556 Self::Delivered
1557 | Self::Responded
1558 | Self::Failed
1559 | Self::TimedOut
1560 | Self::Expired
1561 | Self::Cancelled
1562 )
1563 }
1564
1565 fn effect_terminal_status(&self) -> EffectTerminalStatus {
1566 match self {
1567 Self::Delivered | Self::Responded => EffectTerminalStatus::Completed,
1568 Self::TimedOut => EffectTerminalStatus::TimedOut,
1569 Self::Cancelled => EffectTerminalStatus::Cancelled,
1570 Self::Accepted => EffectTerminalStatus::Unknown,
1571 Self::Failed | Self::Expired => EffectTerminalStatus::Failed,
1572 }
1573 }
1574}
1575
/// Describes when `run_id` should be woken: an event filter plus an optional
/// timeout (NOTE(review): the matching/trigger logic is not in this chunk).
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct WakeCondition {
    /// Id of this wake condition.
    pub condition_id: WakeConditionId,
    /// Run to wake.
    pub run_id: RunId,
    /// Event filter to match; `compile_envelope_filter` compiles a copy with
    /// envelope-only payload access.
    pub filter: EventFilter,
    /// Optional timeout in milliseconds. Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_millis: Option<u64>,
    /// Resume input policy; `WakeCondition::new` sets `MatchingEventRefs`.
    pub resume_with: ResumeInputPolicy,
    /// Dedupe key; `InMemoryAgentPoolStore` indexes wake registrations by it.
    pub idempotency_key: IdempotencyKey,
    /// Policy refs attached to the condition. Omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub policy_refs: Vec<PolicyRef>,
}
1600
1601impl WakeCondition {
1602 pub fn new(
1606 condition_id: WakeConditionId,
1607 run_id: RunId,
1608 filter: EventFilter,
1609 idempotency_key: IdempotencyKey,
1610 ) -> Self {
1611 Self {
1612 condition_id,
1613 run_id,
1614 filter,
1615 timeout_millis: None,
1616 resume_with: ResumeInputPolicy::MatchingEventRefs,
1617 idempotency_key,
1618 policy_refs: Vec::new(),
1619 }
1620 }
1621
1622 pub fn timeout_millis(mut self, timeout_millis: u64) -> Self {
1626 self.timeout_millis = Some(timeout_millis);
1627 self
1628 }
1629
1630 pub fn policy_ref(mut self, policy_ref: PolicyRef) -> Self {
1633 self.policy_refs.push(policy_ref);
1634 self
1635 }
1636
1637 pub fn compile_envelope_filter(&self) -> Result<CompiledEventFilter, AgentError> {
1640 let mut filter = self.filter.clone();
1641 filter.payload_access = PayloadAccessMode::EnvelopeOnly;
1642 filter.compile()
1643 }
1644}
1645
/// What input a woken run receives on resume, serialized in snake_case.
///
/// NOTE(review): variant meanings are inferred from their names; the resume
/// path is not in this chunk.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ResumeInputPolicy {
    /// Refs to the events that matched the wake filter.
    MatchingEventRefs,
    /// A redacted summary.
    RedactedSummary,
    /// No resume input.
    None,
}
1658
/// The recorded registration of a wake condition.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct WakeRegistration {
    /// Condition this registration refers to.
    pub condition_id: WakeConditionId,
    /// Run the condition wakes.
    pub run_id: RunId,
    /// Current registration status.
    pub status: WakeRegistrationStatus,
    /// Journal position associated with the registration, if any. Omitted from
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub journal_cursor: Option<JournalCursor>,
}
1674
/// Lifecycle status of a wake registration, serialized in snake_case.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum WakeRegistrationStatus {
    /// Registered and waiting.
    Registered,
    /// The condition was triggered.
    Triggered,
    /// The condition timed out.
    TimedOut,
    /// The registration was cancelled.
    Cancelled,
    /// The registration failed.
    Failed,
}
1691
1692impl WakeRegistrationStatus {
1693 fn event_kind(&self) -> EventKind {
1694 match self {
1695 Self::Registered => EventKind::WakeConditionRegistered,
1696 Self::Triggered => EventKind::WakeConditionTriggered,
1697 Self::TimedOut => EventKind::WakeConditionTimedOut,
1698 Self::Cancelled => EventKind::WakeConditionCancelled,
1699 Self::Failed => EventKind::WakeConditionFailed,
1700 }
1701 }
1702
1703 fn redacted_summary(&self) -> &'static str {
1704 match self {
1705 Self::Registered => "wake condition registered",
1706 Self::Triggered => "wake condition triggered",
1707 Self::TimedOut => "wake condition timed out",
1708 Self::Cancelled => "wake condition cancelled",
1709 Self::Failed => "wake condition failed",
1710 }
1711 }
1712}
1713
/// Mutable per-pool state held by `InMemoryAgentPoolStore`.
#[derive(Clone, Debug)]
struct AgentPoolState {
    /// Starts `false`; set to `true` by `record_pool_created`.
    created: bool,
    /// Current members, keyed by run id.
    members: BTreeMap<RunId, AgentPoolMember>,
    /// Topic index (topic -> subscribed runs), maintained from `members`.
    topics: BTreeMap<TopicId, BTreeSet<RunId>>,
    /// Message policy taken from the open-time config.
    message_policy: AgentPoolMessagePolicy,
    /// Wake policy taken from the open-time config.
    wake_policy: AgentPoolWakePolicy,
    /// Pool-level policy refs taken from the open-time config.
    policy_refs: Vec<PolicyRef>,
    /// Recorded messages with their receipts, keyed by message id.
    messages: BTreeMap<MessageId, AgentPoolStoredMessage>,
    /// Message receipts keyed by idempotency key, used for dedupe lookups.
    message_dedupe: BTreeMap<IdempotencyKey, MessageReceipt>,
    /// Wake registrations keyed by idempotency key, used for dedupe lookups.
    wake_dedupe: BTreeMap<IdempotencyKey, WakeRegistration>,
    /// Recorded wake conditions keyed by condition id.
    wakes: BTreeMap<WakeConditionId, AgentPoolStoredWake>,
    /// Last value handed out by `next_event_sequence`; 0 means none yet.
    next_event_counter: u64,
}
1728
1729impl AgentPoolState {
1730 fn new(config: AgentPoolStoreConfig) -> Self {
1731 Self {
1732 created: false,
1733 members: BTreeMap::new(),
1734 topics: BTreeMap::new(),
1735 message_policy: config.message_policy,
1736 wake_policy: config.wake_policy,
1737 policy_refs: config.policy_refs,
1738 messages: BTreeMap::new(),
1739 message_dedupe: BTreeMap::new(),
1740 wake_dedupe: BTreeMap::new(),
1741 wakes: BTreeMap::new(),
1742 next_event_counter: 0,
1743 }
1744 }
1745
1746 fn config(&self) -> AgentPoolStoreConfig {
1747 AgentPoolStoreConfig {
1748 message_policy: self.message_policy.clone(),
1749 wake_policy: self.wake_policy.clone(),
1750 policy_refs: self.policy_refs.clone(),
1751 }
1752 }
1753
1754 fn snapshot(
1755 &self,
1756 pool_id: AgentPoolId,
1757 cursor: Option<AgentPoolStoreCursor>,
1758 ) -> AgentPoolSnapshot {
1759 AgentPoolSnapshot {
1760 pool_id,
1761 created: self.created,
1762 members: self.members.values().cloned().collect(),
1763 topics: self.topics.keys().cloned().collect(),
1764 message_policy: self.message_policy.clone(),
1765 wake_policy: self.wake_policy.clone(),
1766 policy_refs: self.policy_refs.clone(),
1767 messages: self.messages.values().cloned().collect(),
1768 wakes: self.wakes.values().cloned().collect(),
1769 cursor,
1770 }
1771 }
1772
1773 fn index_member(&mut self, member: AgentPoolMember) {
1774 for topic in &member.topics {
1775 self.topics
1776 .entry(topic.clone())
1777 .or_default()
1778 .insert(member.run_id.clone());
1779 }
1780 self.members.insert(member.run_id.clone(), member);
1781 }
1782
1783 fn remove_member(&mut self, run_id: &RunId) -> Result<AgentPoolMember, AgentError> {
1784 let member = self.members.remove(run_id).ok_or_else(|| {
1785 AgentError::new(
1786 AgentErrorKind::InvalidStateTransition,
1787 RetryClassification::NotRetryable,
1788 "run is not a member of this agent pool",
1789 )
1790 })?;
1791 for topic in &member.topics {
1792 let remove_topic = if let Some(runs) = self.topics.get_mut(topic) {
1793 runs.remove(run_id);
1794 runs.is_empty()
1795 } else {
1796 false
1797 };
1798 if remove_topic {
1799 self.topics.remove(topic);
1800 }
1801 }
1802 Ok(member)
1803 }
1804}
1805
/// In-process `AgentPoolStore` backed by `BTreeMap`s.
///
/// Clones share the same maps through `Arc`. Pool state and the per-pool
/// record log sit behind two separate mutexes.
#[derive(Clone, Debug, Default)]
pub struct InMemoryAgentPoolStore {
    /// Per-pool state; an entry is created by `open_pool`.
    pools: Arc<Mutex<BTreeMap<AgentPoolId, AgentPoolState>>>,
    /// Per-pool record log; records are only ever pushed, and each cursor is
    /// the record's 1-based position in its pool's vector.
    records: Arc<Mutex<BTreeMap<AgentPoolId, Vec<AgentPoolStoreRecord>>>>,
}
1815
1816impl InMemoryAgentPoolStore {
1817 fn with_pool_state<T>(
1818 &self,
1819 pool_id: &AgentPoolId,
1820 f: impl FnOnce(&mut AgentPoolState) -> Result<T, AgentError>,
1821 ) -> Result<T, AgentError> {
1822 let mut pools = self
1823 .pools
1824 .lock()
1825 .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1826 let state = pools.get_mut(pool_id).ok_or_else(|| {
1827 AgentError::new(
1828 AgentErrorKind::HostConfigurationNeeded,
1829 RetryClassification::HostConfigurationNeeded,
1830 "agent pool store has not opened this pool",
1831 )
1832 })?;
1833 f(state)
1834 }
1835
1836 fn append_record(
1837 &self,
1838 pool_id: &AgentPoolId,
1839 payload: AgentPoolStoreRecordPayload,
1840 ) -> Result<AgentPoolStoreCursor, AgentError> {
1841 let mut records = self
1842 .records
1843 .lock()
1844 .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1845 let entries = records.entry(pool_id.clone()).or_default();
1846 let cursor = AgentPoolStoreCursor::new(entries.len() as u64 + 1);
1847 entries.push(AgentPoolStoreRecord {
1848 pool_id: pool_id.clone(),
1849 cursor: cursor.clone(),
1850 payload,
1851 });
1852 Ok(cursor)
1853 }
1854
1855 fn latest_cursor(
1856 &self,
1857 pool_id: &AgentPoolId,
1858 ) -> Result<Option<AgentPoolStoreCursor>, AgentError> {
1859 let records = self
1860 .records
1861 .lock()
1862 .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1863 Ok(records
1864 .get(pool_id)
1865 .and_then(|records| records.last().map(|record| record.cursor.clone())))
1866 }
1867}
1868
1869impl AgentPoolStore for InMemoryAgentPoolStore {
1870 fn open_pool(
1871 &self,
1872 pool_id: AgentPoolId,
1873 config: AgentPoolStoreConfig,
1874 ) -> Result<AgentPoolSnapshot, AgentError> {
1875 {
1876 let mut pools = self
1877 .pools
1878 .lock()
1879 .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1880 if let Some(existing) = pools.get(&pool_id) {
1881 if existing.config() != config {
1882 return Err(AgentError::new(
1883 AgentErrorKind::InvalidStateTransition,
1884 RetryClassification::RepairNeeded,
1885 "agent pool store config conflicts with existing pool",
1886 ));
1887 }
1888 } else {
1889 pools.insert(pool_id.clone(), AgentPoolState::new(config.clone()));
1890 drop(pools);
1891 self.append_record(&pool_id, AgentPoolStoreRecordPayload::PoolOpened { config })?;
1892 }
1893 }
1894 self.snapshot(&pool_id)
1895 }
1896
1897 fn snapshot(&self, pool_id: &AgentPoolId) -> Result<AgentPoolSnapshot, AgentError> {
1898 let cursor = self.latest_cursor(pool_id)?;
1899 let pools = self
1900 .pools
1901 .lock()
1902 .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
1903 pools
1904 .get(pool_id)
1905 .map(|state| state.snapshot(pool_id.clone(), cursor))
1906 .ok_or_else(|| {
1907 AgentError::new(
1908 AgentErrorKind::HostConfigurationNeeded,
1909 RetryClassification::HostConfigurationNeeded,
1910 "agent pool store has not opened this pool",
1911 )
1912 })
1913 }
1914
1915 fn record_pool_created(
1916 &self,
1917 pool_id: &AgentPoolId,
1918 ) -> Result<AgentPoolStoreCursor, AgentError> {
1919 self.with_pool_state(pool_id, |state| {
1920 state.created = true;
1921 Ok(())
1922 })?;
1923 self.append_record(pool_id, AgentPoolStoreRecordPayload::PoolCreated)
1924 }
1925
1926 fn join_member(
1927 &self,
1928 pool_id: &AgentPoolId,
1929 member: AgentPoolMember,
1930 ) -> Result<AgentPoolStoreCursor, AgentError> {
1931 self.with_pool_state(pool_id, |state| {
1932 state.index_member(member.clone());
1933 Ok(())
1934 })?;
1935 self.append_record(
1936 pool_id,
1937 AgentPoolStoreRecordPayload::MemberJoined { member },
1938 )
1939 }
1940
1941 fn leave_member(
1942 &self,
1943 pool_id: &AgentPoolId,
1944 run_id: &RunId,
1945 ) -> Result<(AgentPoolMember, AgentPoolStoreCursor), AgentError> {
1946 let member = self.with_pool_state(pool_id, |state| state.remove_member(run_id))?;
1947 let cursor = self.append_record(
1948 pool_id,
1949 AgentPoolStoreRecordPayload::MemberLeft {
1950 member: member.clone(),
1951 },
1952 )?;
1953 Ok((member, cursor))
1954 }
1955
1956 fn message_receipt(
1957 &self,
1958 pool_id: &AgentPoolId,
1959 idempotency_key: &IdempotencyKey,
1960 ) -> Result<Option<MessageReceipt>, AgentError> {
1961 self.with_pool_state(pool_id, |state| {
1962 Ok(state.message_dedupe.get(idempotency_key).cloned())
1963 })
1964 }
1965
1966 fn record_message(
1967 &self,
1968 pool_id: &AgentPoolId,
1969 message: RunMessage,
1970 receipt: MessageReceipt,
1971 ) -> Result<AgentPoolStoreCursor, AgentError> {
1972 let stored = AgentPoolStoredMessage { message, receipt };
1973 self.with_pool_state(pool_id, |state| {
1974 state.message_dedupe.insert(
1975 stored.message.idempotency_key.clone(),
1976 stored.receipt.clone(),
1977 );
1978 state
1979 .messages
1980 .insert(stored.message.message_id.clone(), stored.clone());
1981 Ok(())
1982 })?;
1983 self.append_record(pool_id, AgentPoolStoreRecordPayload::RunMessage { stored })
1984 }
1985
1986 fn wake_registration(
1987 &self,
1988 pool_id: &AgentPoolId,
1989 idempotency_key: &IdempotencyKey,
1990 ) -> Result<Option<WakeRegistration>, AgentError> {
1991 self.with_pool_state(pool_id, |state| {
1992 Ok(state.wake_dedupe.get(idempotency_key).cloned())
1993 })
1994 }
1995
1996 fn wake(
1997 &self,
1998 pool_id: &AgentPoolId,
1999 condition_id: &WakeConditionId,
2000 ) -> Result<Option<AgentPoolStoredWake>, AgentError> {
2001 self.with_pool_state(pool_id, |state| Ok(state.wakes.get(condition_id).cloned()))
2002 }
2003
2004 fn record_wake(
2005 &self,
2006 pool_id: &AgentPoolId,
2007 condition: WakeCondition,
2008 compiled_filter: CompiledEventFilter,
2009 registration: WakeRegistration,
2010 ) -> Result<AgentPoolStoreCursor, AgentError> {
2011 let stored = AgentPoolStoredWake {
2012 condition,
2013 compiled_filter,
2014 registration,
2015 };
2016 self.with_pool_state(pool_id, |state| {
2017 state.wake_dedupe.insert(
2018 stored.condition.idempotency_key.clone(),
2019 stored.registration.clone(),
2020 );
2021 state
2022 .wakes
2023 .insert(stored.condition.condition_id.clone(), stored.clone());
2024 Ok(())
2025 })?;
2026 self.append_record(pool_id, AgentPoolStoreRecordPayload::Wake { stored })
2027 }
2028
2029 fn watch(
2030 &self,
2031 pool_id: &AgentPoolId,
2032 cursor: Option<AgentPoolStoreCursor>,
2033 ) -> Result<AgentPoolStoreStream, AgentError> {
2034 let start_after = cursor.map(|cursor| cursor.sequence).unwrap_or(0);
2035 let records = self
2036 .records
2037 .lock()
2038 .map_err(|_| AgentError::contract_violation("agent pool store lock poisoned"))?;
2039 Ok(AgentPoolStoreStream::new(
2040 records
2041 .get(pool_id)
2042 .cloned()
2043 .unwrap_or_default()
2044 .into_iter()
2045 .filter(|record| record.cursor.sequence > start_after),
2046 ))
2047 }
2048
2049 fn next_event_sequence(&self, pool_id: &AgentPoolId) -> Result<u64, AgentError> {
2050 self.with_pool_state(pool_id, |state| {
2051 state.next_event_counter += 1;
2052 Ok(state.next_event_counter)
2053 })
2054 }
2055}
2056
2057impl From<RunAddressTarget> for RunMessageAddressTargetRecord {
2058 fn from(value: RunAddressTarget) -> Self {
2059 match value {
2060 RunAddressTarget::Run { run_id } => Self::Run { run_id },
2061 RunAddressTarget::Agent { agent_id } => Self::Agent { agent_id },
2062 RunAddressTarget::Topic { topic_id } => Self::Topic { topic_id },
2063 RunAddressTarget::Pool { pool_id } => Self::Pool { pool_id },
2064 }
2065 }
2066}
2067
2068impl From<MessageStatus> for RunMessageDeliveryStatus {
2069 fn from(value: MessageStatus) -> Self {
2070 match value {
2071 MessageStatus::Accepted => Self::Accepted,
2072 MessageStatus::Delivered => Self::Delivered,
2073 MessageStatus::Responded => Self::Responded,
2074 MessageStatus::Failed => Self::Failed,
2075 MessageStatus::TimedOut => Self::TimedOut,
2076 MessageStatus::Expired => Self::Expired,
2077 MessageStatus::Cancelled => Self::Cancelled,
2078 }
2079 }
2080}
2081
2082impl From<ResumeInputPolicy> for WakeResumeInputPolicyRecord {
2083 fn from(value: ResumeInputPolicy) -> Self {
2084 match value {
2085 ResumeInputPolicy::MatchingEventRefs => Self::MatchingEventRefs,
2086 ResumeInputPolicy::RedactedSummary => Self::RedactedSummary,
2087 ResumeInputPolicy::None => Self::None,
2088 }
2089 }
2090}
2091
2092impl From<WakeRegistrationStatus> for WakeTriggerStatus {
2093 fn from(value: WakeRegistrationStatus) -> Self {
2094 match value {
2095 WakeRegistrationStatus::Registered => Self::Registered,
2096 WakeRegistrationStatus::Triggered => Self::Triggered,
2097 WakeRegistrationStatus::TimedOut => Self::TimedOut,
2098 WakeRegistrationStatus::Cancelled => Self::Cancelled,
2099 WakeRegistrationStatus::Failed => Self::Failed,
2100 }
2101 }
2102}
2103
/// Snake_case wire names for event kinds emitted by agent pools.
trait AgentPoolEventKindName {
    /// Returns the wire name for this kind. The `EventKind` implementation
    /// falls back to `"agent_pool_event"` for kinds outside the pool,
    /// run-message, and wake-condition families.
    fn wire_name(&self) -> &'static str;
}
2107
2108impl AgentPoolEventKindName for EventKind {
2109 fn wire_name(&self) -> &'static str {
2110 match self {
2111 EventKind::AgentPoolCreated => "agent_pool_created",
2112 EventKind::AgentPoolRunJoined => "agent_pool_run_joined",
2113 EventKind::AgentPoolRunLeft => "agent_pool_run_left",
2114 EventKind::RunMessageAccepted => "run_message_accepted",
2115 EventKind::RunMessageDelivered => "run_message_delivered",
2116 EventKind::RunMessageResponded => "run_message_responded",
2117 EventKind::RunMessageFailed => "run_message_failed",
2118 EventKind::RunMessageTimedOut => "run_message_timed_out",
2119 EventKind::RunMessageExpired => "run_message_expired",
2120 EventKind::RunMessageCancelled => "run_message_cancelled",
2121 EventKind::WakeConditionRegistered => "wake_condition_registered",
2122 EventKind::WakeConditionTriggered => "wake_condition_triggered",
2123 EventKind::WakeConditionTimedOut => "wake_condition_timed_out",
2124 EventKind::WakeConditionCancelled => "wake_condition_cancelled",
2125 EventKind::WakeConditionFailed => "wake_condition_failed",
2126 _ => "agent_pool_event",
2127 }
2128 }
2129}
2130
2131fn intersect_run_ids(filter: &EventFilterSet<RunId>, allowed: &[RunId]) -> EventFilterSet<RunId> {
2132 match filter {
2133 EventFilterSet::Any => EventFilterSet::Include(allowed.to_vec()),
2134 EventFilterSet::Include(requested) => EventFilterSet::Include(
2135 requested
2136 .iter()
2137 .filter(|run_id| allowed.contains(run_id))
2138 .cloned()
2139 .collect(),
2140 ),
2141 }
2142}
2143
2144fn topics_from_members(members: &[AgentPoolMember]) -> BTreeMap<TopicId, BTreeSet<RunId>> {
2145 let mut topics = BTreeMap::new();
2146 for member in members {
2147 for topic in &member.topics {
2148 topics
2149 .entry(topic.clone())
2150 .or_insert_with(BTreeSet::new)
2151 .insert(member.run_id.clone());
2152 }
2153 }
2154 topics
2155}