1use core::fmt;
7use std::{
8 collections::{BTreeMap, BTreeSet},
9 sync::{Arc, Mutex},
10};
11
12use serde::{Deserialize, Deserializer, Serialize, de::Error as DeError};
13
14use crate::{
15 agent_pool::{
16 AgentPool, AgentPoolMember, MessageReceipt, RunMessage, WakeCondition, WakeRegistration,
17 },
18 domain::{
19 AgentError, AgentErrorKind, AgentId, ContentRef as ContentRefId, DestinationKind,
20 DestinationRef, EffectId, EntityKind, EntityRef, EventId, IdempotencyKey, MessageId,
21 PolicyKind, PolicyRef, PrivacyClass, RetryClassification, RunId, SourceKind, SourceRef,
22 SpanId, ToolCallId, TraceId,
23 },
24 effect::{EffectIntent, EffectKind, EffectResult, EffectTerminalStatus},
25 event::{
26 AgentEvent, CompiledEventFilter, ContentCaptureMode, EVENT_SCHEMA_VERSION,
27 EventCorrelation, EventDeliverySemantics, EventEnvelope, EventFamily, EventFilter,
28 EventFilterSet, EventFrame, EventKind, EventStreamScope,
29 },
30 ids::{IdValidationError, validate_identifier},
31 journal::{
32 JournalCursor, JournalRecord, JournalRecordBase, JournalRecordKind, JournalRecordPayload,
33 },
34 package::{
35 ChildRuntimePackage, ChildRuntimePackagePolicy, ContextHandoffPolicy, DepthBudget,
36 RuntimePackage, RuntimePackageFingerprint, SubagentRoutePolicy, SubagentToolPolicy,
37 build_child_runtime_package,
38 },
39 run::RunRequest,
40 run_handle::RunHandle,
41 runtime::AgentRuntime,
42 subagent_records::{
43 ChildLifecycleRecord, RunJournalRef, SubagentCompletedRecord, SubagentHandoffRecord,
44 SubagentRecord, SubagentStartedRecord, SubagentTerminalStatus, SubagentUsageRolledUpRecord,
45 SubagentWrappedEventRecord,
46 },
47};
48
49#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
50#[serde(transparent)]
51pub struct SubagentRequestId(String);
54
55impl SubagentRequestId {
56 pub fn new(value: impl Into<String>) -> Self {
66 Self::try_new(value).expect("SubagentRequestId must be valid")
67 }
68
69 pub fn try_new(value: impl Into<String>) -> Result<Self, IdValidationError> {
73 let value = value.into();
74 validate_identifier(&value)?;
75 Ok(Self(value))
76 }
77
78 pub fn as_str(&self) -> &str {
81 &self.0
82 }
83}
84
85impl<'de> Deserialize<'de> for SubagentRequestId {
86 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
87 where
88 D: Deserializer<'de>,
89 {
90 let value = String::deserialize(deserializer)?;
91 Self::try_new(value).map_err(D::Error::custom)
92 }
93}
94
95impl fmt::Debug for SubagentRequestId {
96 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
97 formatter.write_str("SubagentRequestId(redacted)")
98 }
99}
100
101impl fmt::Display for SubagentRequestId {
102 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
103 formatter.write_str("SubagentRequestId(redacted)")
104 }
105}
106
107#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
108pub struct SubagentRequest {
111 pub request_id: SubagentRequestId,
113 pub parent_run_id: RunId,
115 pub parent_agent_id: AgentId,
117 pub parent_tool_call_id: ToolCallId,
119 pub child_run_id: RunId,
121 pub child_agent_id: AgentId,
123 pub child_source: SourceRef,
125 pub child_destination: DestinationRef,
127 pub route_policy: SubagentRoutePolicy,
129 pub context_handoff: ContextHandoffPolicy,
131 pub child_package_policy: ChildRuntimePackagePolicy,
133 pub child_tool_policy: SubagentToolPolicy,
135 pub message_policy_ref: PolicyRef,
138 pub wake_policy_ref: PolicyRef,
141 #[serde(skip_serializing_if = "Option::is_none")]
142 pub lifecycle_policy_ref: Option<PolicyRef>,
145 pub depth_budget: DepthBudget,
147 pub idempotency_key: IdempotencyKey,
150 #[serde(skip_serializing_if = "Option::is_none")]
151 pub initial_message_ref: Option<ContentRefId>,
154}
155
156impl SubagentRequest {
157 pub fn validate(&self) -> Result<(), AgentError> {
161 self.depth_budget.validate_child_start()?;
162 self.context_handoff.validate()?;
163 if self.child_destination.kind != DestinationKind::Subagent {
164 return Err(AgentError::contract_violation(
165 "subagent child destination must use DestinationKind::Subagent",
166 ));
167 }
168 if self.child_source.kind != SourceKind::Subagent {
169 return Err(AgentError::contract_violation(
170 "subagent child source must use SourceKind::Subagent",
171 ));
172 }
173 if self.message_policy_ref.kind == PolicyKind::Host
174 || self.wake_policy_ref.kind == PolicyKind::Host
175 {
176 return Err(AgentError::contract_violation(
177 "subagent message and wake policies must be explicit SDK policy refs",
178 ));
179 }
180 Ok(())
181 }
182
183 pub fn child_run_request(&self) -> RunRequest {
187 RunRequest::text(
188 self.child_run_id.clone(),
189 self.child_agent_id.clone(),
190 self.child_source.clone(),
191 format!("subagent child run {}", self.request_id.as_str()),
192 )
193 }
194}
195
196#[derive(Clone)]
197pub struct SubagentSupervisor {
200 runtime: AgentRuntime,
201 pool: AgentPool,
202 parent_package: RuntimePackage,
203 state: Arc<Mutex<SubagentSupervisorState>>,
204}
205
206impl SubagentSupervisor {
207 pub fn new(runtime: AgentRuntime, pool: AgentPool, parent_package: RuntimePackage) -> Self {
211 Self {
212 runtime,
213 pool,
214 parent_package,
215 state: Arc::new(Mutex::new(SubagentSupervisorState::default())),
216 }
217 }
218
219 pub fn start_child(&self, request: SubagentRequest) -> Result<ChildRunHandle, AgentError> {
223 request.validate()?;
224
225 let child_package = build_child_runtime_package(
226 &self.parent_package,
227 request.child_agent_id.clone(),
228 &request.route_policy,
229 &request.context_handoff,
230 &request.child_package_policy,
231 &request.child_tool_policy,
232 )?;
233 self.runtime.provider_for_route(
234 &child_package.package.provider_route.route_id,
235 &request.child_run_id,
236 )?;
237
238 let start_effect = child_start_intent(&request);
239 let journal_cursor = self.append_parent_effect_intent(&request, start_effect.clone())?;
240
241 let child_journal_ref = RunJournalRef::for_run(request.child_run_id.clone());
242 let message_ids = request
243 .initial_message_ref
244 .as_ref()
245 .map(|_| {
246 vec![MessageId::new(format!(
247 "message.{}.initial",
248 request.request_id.as_str()
249 ))]
250 })
251 .unwrap_or_default();
252
253 let started = SubagentStartedRecord {
254 request_id: request.request_id.clone(),
255 parent_run_id: request.parent_run_id.clone(),
256 child_run_id: request.child_run_id.clone(),
257 parent_tool_call_id: request.parent_tool_call_id.clone(),
258 child_agent_id: request.child_agent_id.clone(),
259 child_package_fingerprint: child_package.fingerprint.clone(),
260 child_journal_ref: child_journal_ref.clone(),
261 handoff_policy: request.context_handoff.clone(),
262 tool_policy: request.child_tool_policy.clone(),
263 message_ids: message_ids.clone(),
264 wake_condition_ids: Vec::new(),
265 effect_intent: start_effect,
266 };
267 let handoff = SubagentHandoffRecord {
268 request_id: request.request_id.clone(),
269 parent_run_id: request.parent_run_id.clone(),
270 child_run_id: request.child_run_id.clone(),
271 handoff_policy: request.context_handoff.clone(),
272 selected_content_refs: request.context_handoff.selected_content_refs(),
273 projection_audit_ref: match request.context_handoff {
274 ContextHandoffPolicy::FullHistoryWithPolicy { .. } => {
275 Some(format!("projection.audit.{}", request.request_id.as_str()))
276 }
277 _ => None,
278 },
279 policy_refs: request
280 .context_handoff
281 .policy_refs()
282 .into_iter()
283 .chain([request.child_package_policy.redaction_policy_ref.clone()])
284 .collect(),
285 redaction_policy_id: request
286 .child_package_policy
287 .redaction_policy_ref
288 .as_str()
289 .to_string(),
290 };
291 self.append_parent_subagent_record(&request, SubagentRecord::Started(started.clone()))?;
292 self.append_parent_subagent_record(&request, SubagentRecord::Handoff(handoff.clone()))?;
293
294 self.pool.join_run(pool_member_with_subagent_policies(
295 request.parent_run_id.clone(),
296 request.parent_agent_id.clone(),
297 &request,
298 ))?;
299 let run_handle = self.pool.start_run(request.child_run_request())?;
300 self.pool.join_run(pool_member_with_subagent_policies(
301 request.child_run_id.clone(),
302 request.child_agent_id.clone(),
303 &request,
304 ))?;
305
306 if let Some(content_ref) = request.initial_message_ref.clone() {
307 let message = RunMessage::new(
308 message_ids
309 .first()
310 .cloned()
311 .expect("initial message id was precomputed"),
312 request.parent_run_id.clone(),
313 crate::agent_pool::RunAddress::run(request.child_run_id.clone()),
314 content_ref,
315 IdempotencyKey::new(format!("idem.{}.initial", request.request_id.as_str())),
316 )
317 .policy_ref(request.message_policy_ref.clone());
318 self.pool.send(message)?;
319 }
320
321 let handle = ChildRunHandle {
322 child_run_id: request.child_run_id.clone(),
323 child_agent_id: request.child_agent_id.clone(),
324 parent_run_id: request.parent_run_id.clone(),
325 child_package_fingerprint: child_package.fingerprint.clone(),
326 child_journal_ref,
327 wrapped_event_filter: child_event_filter(request.child_run_id.clone())?,
328 run_handle,
329 child_package,
330 start_journal_cursor: Some(journal_cursor),
331 };
332
333 let mut state = self.state()?;
334 state.children.insert(
335 request.child_run_id.clone(),
336 ChildRunState {
337 request: request.clone(),
338 handle: handle.clone_without_run_handle(),
339 detached: false,
340 terminal: false,
341 },
342 );
343 state.records.push(SubagentRecord::Started(started));
344 state.records.push(SubagentRecord::Handoff(handoff));
345
346 Ok(handle)
347 }
348
349 pub fn send_message(&self, message: RunMessage) -> Result<MessageReceipt, AgentError> {
353 self.pool.send(message)
354 }
355
356 pub fn suspend_until(
360 &self,
361 run_id: RunId,
362 condition: WakeCondition,
363 ) -> Result<WakeRegistration, AgentError> {
364 self.pool.suspend_until(run_id, condition)
365 }
366
367 pub fn wrap_child_event(
371 &self,
372 event: AgentEvent,
373 ) -> Result<SubagentWrappedEventRecord, AgentError> {
374 let child_run_id = event.envelope.run_id.clone();
375 let child = self.child(&child_run_id)?;
376 let record = SubagentWrappedEventRecord {
377 parent_run_id: child.request.parent_run_id.clone(),
378 child_run_id: child_run_id.clone(),
379 child_agent_id: child.request.child_agent_id.clone(),
380 original_child_event_id: event.envelope.event_id.clone(),
381 original_child_event_kind: event.envelope.event_kind.clone(),
382 wrapped_event_ref: format!(
383 "subagent.event.{}.{}",
384 child.request.parent_run_id.as_str(),
385 event.envelope.event_id.as_str()
386 ),
387 child_journal_cursor: event.envelope.journal_cursor.clone(),
388 child_journal_ref: child.handle.child_journal_ref.clone(),
389 privacy: event.envelope.privacy,
390 };
391 self.append_parent_subagent_record(
392 &child.request,
393 SubagentRecord::WrappedEvent(record.clone()),
394 )?;
395 self.state()?
396 .records
397 .push(SubagentRecord::WrappedEvent(record.clone()));
398 Ok(record)
399 }
400
401 #[expect(
405 clippy::too_many_arguments,
406 reason = "usage rollup is a durable subagent audit record constructor; a parameter object should be a separate public API pass"
407 )]
408 pub fn rollup_usage(
409 &self,
410 child_run_id: RunId,
411 child_usage_ref: impl Into<String>,
412 input_tokens: u32,
413 output_tokens: u32,
414 cost_micros: Option<u64>,
415 currency: Option<String>,
416 terminal_status: SubagentTerminalStatus,
417 ) -> Result<SubagentUsageRolledUpRecord, AgentError> {
418 let child_usage_ref = child_usage_ref.into();
419 let child = self.child(&child_run_id)?;
420 let dedupe_key = format!("{}:{child_usage_ref}", child_run_id.as_str());
421
422 let mut state = self.state()?;
423 if !state.usage_rollup_dedupe.insert(dedupe_key) {
424 return state
425 .records
426 .iter()
427 .find_map(|record| match record {
428 SubagentRecord::UsageRolledUp(record)
429 if record.child_run_id == child_run_id
430 && record.child_usage_ref == child_usage_ref =>
431 {
432 Some(record.clone())
433 }
434 _ => None,
435 })
436 .ok_or_else(|| AgentError::contract_violation("usage rollup dedupe lost record"));
437 }
438
439 let record = SubagentUsageRolledUpRecord {
440 child_run_id: child_run_id.clone(),
441 parent_run_id: child.request.parent_run_id.clone(),
442 child_usage_ref: child_usage_ref.clone(),
443 parent_usage_ref: format!("usage.parent.{}.{}", child_run_id.as_str(), child_usage_ref),
444 input_tokens,
445 output_tokens,
446 total_tokens: input_tokens + output_tokens,
447 cost_micros,
448 currency,
449 terminal_status,
450 };
451 self.append_parent_subagent_record(
452 &child.request,
453 SubagentRecord::UsageRolledUp(record.clone()),
454 )?;
455 state
456 .records
457 .push(SubagentRecord::UsageRolledUp(record.clone()));
458 Ok(record)
459 }
460
461 pub fn complete_child(
465 &self,
466 child_run_id: RunId,
467 terminal_status: SubagentTerminalStatus,
468 result_ref: Option<ContentRefId>,
469 error_ref: Option<String>,
470 ) -> Result<SubagentCompletedRecord, AgentError> {
471 let child = self.child(&child_run_id)?;
472 let effect_id = EffectId::new(format!("effect.subagent.start.{}", child_run_id.as_str()));
473 let record = SubagentCompletedRecord {
474 child_run_id: child_run_id.clone(),
475 parent_run_id: child.request.parent_run_id.clone(),
476 terminal_status: terminal_status.clone(),
477 result_ref,
478 error_ref,
479 policy_outcome: "policy.subagent.terminal".to_string(),
480 effect_result: EffectResult {
481 effect_id,
482 terminal_status: match terminal_status {
483 SubagentTerminalStatus::Completed | SubagentTerminalStatus::Detached => {
484 EffectTerminalStatus::Completed
485 }
486 SubagentTerminalStatus::Failed => EffectTerminalStatus::Failed,
487 SubagentTerminalStatus::Cancelled => EffectTerminalStatus::Cancelled,
488 },
489 external_operation_id: None,
490 reconciliation_ref: None,
491 error_ref: None,
492 content_refs: Vec::new(),
493 redacted_summary: "subagent terminal status recorded".to_string(),
494 },
495 };
496 self.append_parent_effect_result(&child.request, record.effect_result.clone())?;
497 self.append_parent_subagent_record(
498 &child.request,
499 SubagentRecord::Completed(record.clone()),
500 )?;
501
502 let mut state = self.state()?;
503 if let Some(child) = state.children.get_mut(&child_run_id) {
504 child.terminal = true;
505 }
506 state
507 .records
508 .push(SubagentRecord::Completed(record.clone()));
509 Ok(record)
510 }
511
512 pub fn cancel_child(
516 &self,
517 child_run_id: RunId,
518 ) -> Result<Vec<ChildLifecycleRecord>, AgentError> {
519 let child = self.child(&child_run_id)?;
520 if child.detached {
521 return Err(AgentError::new(
522 AgentErrorKind::ChildLifecycleFailure,
523 RetryClassification::HostConfigurationNeeded,
524 "detached child lifecycle is host-owned after detach acknowledgement",
525 ));
526 }
527 let idempotency_key =
528 IdempotencyKey::new(format!("idem.subagent.cancel.{}", child_run_id.as_str()));
529 let intent = ChildLifecycleRecord::shutdown_intent(
530 child.request.parent_run_id.clone(),
531 child_run_id.clone(),
532 child.lifecycle_policy_refs(),
533 idempotency_key,
534 );
535 self.append_parent_child_lifecycle_record(&child.request, intent.clone())?;
536 self.runtime.cancel_run(&child_run_id)?;
537 let completed = intent.shutdown_completed();
538 self.append_parent_child_lifecycle_record(&child.request, completed.clone())?;
539 let mut state = self.state()?;
540 state
541 .records
542 .push(SubagentRecord::ChildLifecycle(intent.clone()));
543 state
544 .records
545 .push(SubagentRecord::ChildLifecycle(completed.clone()));
546 Ok(vec![intent, completed])
547 }
548
549 pub fn detach_child(
553 &self,
554 child_run_id: RunId,
555 host_ack_ref: impl Into<String>,
556 reclaim_policy_ref: PolicyRef,
557 ) -> Result<Vec<ChildLifecycleRecord>, AgentError> {
558 let child = self.child(&child_run_id)?;
559 let host_ack_ref = host_ack_ref.into();
560 if host_ack_ref.is_empty() {
561 return Err(AgentError::contract_violation(
562 "detached child run requires host acknowledgement",
563 ));
564 }
565 let idempotency_key =
566 IdempotencyKey::new(format!("idem.subagent.detach.{}", child_run_id.as_str()));
567 let intent = ChildLifecycleRecord::detach_intent(
568 child.request.parent_run_id.clone(),
569 child_run_id.clone(),
570 child.lifecycle_policy_refs(),
571 host_ack_ref,
572 reclaim_policy_ref,
573 idempotency_key,
574 );
575 let detached = intent.detached();
576 self.append_parent_child_lifecycle_record(&child.request, intent.clone())?;
577 self.append_parent_child_lifecycle_record(&child.request, detached.clone())?;
578 let mut state = self.state()?;
579 if let Some(child) = state.children.get_mut(&child_run_id) {
580 child.detached = true;
581 }
582 state
583 .records
584 .push(SubagentRecord::ChildLifecycle(intent.clone()));
585 state
586 .records
587 .push(SubagentRecord::ChildLifecycle(detached.clone()));
588 Ok(vec![intent, detached])
589 }
590
591 pub fn records(&self) -> Result<Vec<SubagentRecord>, AgentError> {
594 Ok(self.state()?.records.clone())
595 }
596
597 pub fn child_can_be_addressed_as_user_chat(
601 &self,
602 child_run_id: &RunId,
603 ) -> Result<bool, AgentError> {
604 self.child(child_run_id)?;
605 Ok(false)
606 }
607
608 pub fn child_requires_terminal_rollup_or_detach(
612 &self,
613 child_run_id: &RunId,
614 ) -> Result<bool, AgentError> {
615 let child = self.child(child_run_id)?;
616 Ok(!child.detached && !child.terminal)
617 }
618
619 fn append_parent_effect_intent(
620 &self,
621 request: &SubagentRequest,
622 intent: EffectIntent,
623 ) -> Result<JournalCursor, AgentError> {
624 let parent_journal = self.runtime.journal_port(&request.parent_run_id)?;
625 let mut base = JournalRecordBase::new(
626 self.runtime.next_journal_seq(),
627 format!(
628 "journal.record.subagent.start.{}",
629 request.request_id.as_str()
630 ),
631 request.parent_run_id.clone(),
632 request.parent_agent_id.clone(),
633 request.child_source.clone(),
634 );
635 base.destination = Some(request.child_destination.clone());
636 base.tags = vec!["feature:subagent".to_string()];
637 base.runtime_package_fingerprint = self
638 .parent_package
639 .fingerprint()
640 .map(|fingerprint| fingerprint.as_str().to_string())?;
641 base.privacy = PrivacyClass::ContentRefsOnly;
642 base.redaction_policy_id = request
643 .child_package_policy
644 .redaction_policy_ref
645 .as_str()
646 .to_string();
647 parent_journal.append(JournalRecord::effect_intent(base, intent))
648 }
649
650 fn append_parent_effect_result(
651 &self,
652 request: &SubagentRequest,
653 result: EffectResult,
654 ) -> Result<JournalCursor, AgentError> {
655 let parent_journal = self.runtime.journal_port(&request.parent_run_id)?;
656 let mut base = self.parent_record_base(
657 request,
658 format!(
659 "journal.record.subagent.effect.result.{}",
660 request.request_id.as_str()
661 ),
662 )?;
663 base.source = request.child_source.clone();
664 parent_journal.append(JournalRecord::effect_result(base, result))
665 }
666
667 fn append_parent_subagent_record(
668 &self,
669 request: &SubagentRequest,
670 record: SubagentRecord,
671 ) -> Result<JournalCursor, AgentError> {
672 let parent_journal = self.runtime.journal_port(&request.parent_run_id)?;
673 let base = self.parent_record_base(
674 request,
675 format!(
676 "journal.record.{}.{}",
677 record.kind().replace('_', "."),
678 request.request_id.as_str()
679 ),
680 )?;
681 parent_journal.append(JournalRecord::feature_record(
682 base,
683 JournalRecordKind::Subagent,
684 "subagent",
685 record.kind(),
686 EntityRef::new(EntityKind::SubagentRun, request.child_run_id.as_str()),
687 vec![EntityRef::run(request.parent_run_id.clone())],
688 subagent_content_refs(&record),
689 JournalRecordPayload::Subagent(record),
690 ))
691 }
692
693 fn append_parent_child_lifecycle_record(
694 &self,
695 request: &SubagentRequest,
696 record: ChildLifecycleRecord,
697 ) -> Result<JournalCursor, AgentError> {
698 let parent_journal = self.runtime.journal_port(&request.parent_run_id)?;
699 let event_kind = match record.status {
700 crate::subagent_records::ChildLifecycleStatus::Requested => "child_lifecycle_requested",
701 crate::subagent_records::ChildLifecycleStatus::Completed => "child_lifecycle_completed",
702 crate::subagent_records::ChildLifecycleStatus::Detached => "child_lifecycle_detached",
703 crate::subagent_records::ChildLifecycleStatus::Failed => "child_lifecycle_failed",
704 };
705 let base = self.parent_record_base(
706 request,
707 format!(
708 "journal.record.{}.{}",
709 event_kind.replace('_', "."),
710 request.request_id.as_str()
711 ),
712 )?;
713 parent_journal.append(JournalRecord::feature_record(
714 base,
715 JournalRecordKind::ChildLifecycle,
716 "child_lifecycle",
717 event_kind,
718 EntityRef::new(EntityKind::SubagentRun, record.child_run_id.as_str()),
719 vec![EntityRef::run(record.parent_run_id.clone())],
720 Vec::new(),
721 JournalRecordPayload::ChildLifecycle(record),
722 ))
723 }
724
725 fn parent_record_base(
726 &self,
727 request: &SubagentRequest,
728 record_id: String,
729 ) -> Result<JournalRecordBase, AgentError> {
730 let mut base = JournalRecordBase::new(
731 self.runtime.next_journal_seq(),
732 record_id,
733 request.parent_run_id.clone(),
734 request.parent_agent_id.clone(),
735 request.child_source.clone(),
736 );
737 base.destination = Some(request.child_destination.clone());
738 base.tags = vec!["feature:subagent".to_string()];
739 base.runtime_package_fingerprint = self
740 .parent_package
741 .fingerprint()
742 .map(|fingerprint| fingerprint.as_str().to_string())?;
743 base.privacy = PrivacyClass::ContentRefsOnly;
744 base.redaction_policy_id = request
745 .child_package_policy
746 .redaction_policy_ref
747 .as_str()
748 .to_string();
749 Ok(base)
750 }
751
752 fn child(&self, child_run_id: &RunId) -> Result<ChildRunState, AgentError> {
753 self.state()?
754 .children
755 .get(child_run_id)
756 .cloned()
757 .ok_or_else(|| {
758 AgentError::new(
759 AgentErrorKind::SubagentFailure,
760 RetryClassification::RepairNeeded,
761 "child run is not supervised by this subagent supervisor",
762 )
763 })
764 }
765
766 fn state(&self) -> Result<std::sync::MutexGuard<'_, SubagentSupervisorState>, AgentError> {
767 self.state
768 .lock()
769 .map_err(|_| AgentError::contract_violation("subagent supervisor state lock poisoned"))
770 }
771}
772
773#[derive(Clone, Debug)]
774pub struct ChildRunHandle {
777 pub child_run_id: RunId,
779 pub child_agent_id: AgentId,
781 pub parent_run_id: RunId,
783 pub child_package_fingerprint: RuntimePackageFingerprint,
786 pub child_journal_ref: RunJournalRef,
789 pub wrapped_event_filter: CompiledEventFilter,
791 pub run_handle: RunHandle,
793 pub child_package: ChildRuntimePackage,
795 pub start_journal_cursor: Option<JournalCursor>,
798}
799
800impl ChildRunHandle {
801 fn clone_without_run_handle(&self) -> ChildRunHandleSnapshot {
802 ChildRunHandleSnapshot {
803 child_journal_ref: self.child_journal_ref.clone(),
804 }
805 }
806}
807
808#[derive(Clone, Debug)]
809struct ChildRunHandleSnapshot {
810 child_journal_ref: RunJournalRef,
811}
812
813#[derive(Clone)]
814struct ChildRunState {
815 request: SubagentRequest,
816 handle: ChildRunHandleSnapshot,
817 detached: bool,
818 terminal: bool,
819}
820
821impl ChildRunState {
822 fn lifecycle_policy_refs(&self) -> Vec<PolicyRef> {
823 let mut refs = vec![
824 self.request
825 .child_package_policy
826 .child_lifecycle_bounds
827 .clone(),
828 self.request.message_policy_ref.clone(),
829 self.request.wake_policy_ref.clone(),
830 ];
831 if let Some(policy) = &self.request.lifecycle_policy_ref {
832 refs.push(policy.clone());
833 }
834 refs
835 }
836}
837
838#[derive(Default)]
839struct SubagentSupervisorState {
840 children: BTreeMap<RunId, ChildRunState>,
841 records: Vec<SubagentRecord>,
842 usage_rollup_dedupe: BTreeSet<String>,
843}
844
845fn child_start_intent(request: &SubagentRequest) -> EffectIntent {
846 let mut intent = EffectIntent::new(
847 EffectId::new(format!(
848 "effect.subagent.start.{}",
849 request.child_run_id.as_str()
850 )),
851 EffectKind::ChildAgentStart,
852 EntityRef::new(EntityKind::SubagentRun, request.child_run_id.as_str()),
853 request.child_source.clone(),
854 "parent requested child subagent start",
855 );
856 intent.destination = Some(request.child_destination.clone());
857 intent.policy_refs = vec![
858 request.message_policy_ref.clone(),
859 request.wake_policy_ref.clone(),
860 request.child_package_policy.child_lifecycle_bounds.clone(),
861 request.child_package_policy.redaction_policy_ref.clone(),
862 ];
863 intent.idempotency_key = Some(request.idempotency_key.clone());
864 if let Some(content_ref) = &request.initial_message_ref {
865 intent.content_refs.push(content_ref.clone());
866 }
867 intent
868}
869
870fn subagent_content_refs(record: &SubagentRecord) -> Vec<ContentRefId> {
871 match record {
872 SubagentRecord::Started(record) => record.effect_intent.content_refs.to_vec(),
873 SubagentRecord::Handoff(record) => record.selected_content_refs.clone(),
874 SubagentRecord::Completed(record) => record.result_ref.iter().cloned().collect(),
875 SubagentRecord::WrappedEvent(_)
876 | SubagentRecord::UsageRolledUp(_)
877 | SubagentRecord::ChildLifecycle(_) => Vec::new(),
878 }
879}
880
881fn pool_member_with_subagent_policies(
882 run_id: RunId,
883 agent_id: AgentId,
884 request: &SubagentRequest,
885) -> AgentPoolMember {
886 let mut member = AgentPoolMember::new(run_id, agent_id)
887 .policy_ref(request.message_policy_ref.clone())
888 .policy_ref(request.wake_policy_ref.clone())
889 .policy_ref(request.child_package_policy.child_lifecycle_bounds.clone());
890 if let Some(policy_ref) = &request.lifecycle_policy_ref {
891 member = member.policy_ref(policy_ref.clone());
892 }
893 member
894}
895
896fn child_event_filter(child_run_id: RunId) -> Result<CompiledEventFilter, AgentError> {
897 EventFilter {
898 run_ids: EventFilterSet::Include(vec![child_run_id]),
899 ..EventFilter::default()
900 }
901 .compile()
902}
903
904pub fn subagent_runtime_event_frame(
907 parent_run_id: RunId,
908 child_run_id: RunId,
909 child_agent_id: AgentId,
910 event_seq: u64,
911 event_kind: EventKind,
912 journal_cursor: Option<JournalCursor>,
913) -> EventFrame {
914 let event = AgentEvent::with_redacted_summary(
915 EventEnvelope {
916 schema_version: EVENT_SCHEMA_VERSION,
917 event_id: EventId::new(format!("event.subagent.child.{event_seq}")),
918 event_seq,
919 event_family: EventFamily::Run,
920 event_kind,
921 payload_schema_version: 1,
922 timestamp: "1970-01-01T00:00:00Z".to_string(),
923 recorded_at: "1970-01-01T00:00:00Z".to_string(),
924 run_id: child_run_id.clone(),
925 agent_id: child_agent_id,
926 turn_id: None,
927 attempt_id: None,
928 message_id: None,
929 context_item_id: None,
930 trace_id: TraceId::new(format!("trace.subagent.{}", parent_run_id.as_str())),
931 span_id: SpanId::new(format!("span.subagent.child.{event_seq}")),
932 parent_event_id: None,
933 caused_by: None,
934 subject_ref: EntityRef::new(EntityKind::SubagentRun, child_run_id.as_str()),
935 related_refs: vec![EntityRef::run(parent_run_id)],
936 causal_refs: Vec::new(),
937 correlation: EventCorrelation::default(),
938 tags: vec![crate::event::EventTag::new("feature:subagent")],
939 source: SourceRef::with_kind(SourceKind::Sdk, "source.sdk.subagent"),
940 destination: Some(DestinationRef::with_kind(
941 DestinationKind::EventStream,
942 "destination.event_stream.subagent",
943 )),
944 policy_refs: Vec::new(),
945 journal_cursor,
946 state_before: None,
947 state_after: None,
948 delivery_semantics: EventDeliverySemantics::JournalBacked,
949 privacy: PrivacyClass::ContentRefsOnly,
950 content_capture: ContentCaptureMode::Off,
951 redaction_policy_id: "policy.redaction.subagent.default".to_string(),
952 runtime_package_fingerprint: "runtime.package.fingerprint.subagent.child".to_string(),
953 },
954 "child event wrapped by subagent supervisor",
955 );
956 EventFrame {
957 cursor: event.envelope.cursor(EventStreamScope::All),
958 event,
959 archive_cursor: None,
960 overflow: None,
961 }
962}