Skip to main content

agent_sdk_core/application/
subagent.rs

1//! Subagent supervision helpers layered over agent pools and child run requests. Use
2//! this module when a parent run starts bounded child work and needs event wrapping
3//! or usage rollup. Child lifecycle is parent-owned; product-specific agent societies
4//! stay outside core.
5//!
6use core::fmt;
7use std::{
8    collections::{BTreeMap, BTreeSet},
9    sync::{Arc, Mutex},
10};
11
12use serde::{Deserialize, Deserializer, Serialize, de::Error as DeError};
13
14use crate::{
15    agent_pool::{
16        AgentPool, AgentPoolMember, MessageReceipt, RunMessage, WakeCondition, WakeRegistration,
17    },
18    domain::{
19        AgentError, AgentErrorKind, AgentId, ContentRef as ContentRefId, DestinationKind,
20        DestinationRef, EffectId, EntityKind, EntityRef, EventId, IdempotencyKey, MessageId,
21        PolicyKind, PolicyRef, PrivacyClass, RetryClassification, RunId, SourceKind, SourceRef,
22        SpanId, ToolCallId, TraceId,
23    },
24    effect::{EffectIntent, EffectKind, EffectResult, EffectTerminalStatus},
25    event::{
26        AgentEvent, CompiledEventFilter, ContentCaptureMode, EVENT_SCHEMA_VERSION,
27        EventCorrelation, EventDeliverySemantics, EventEnvelope, EventFamily, EventFilter,
28        EventFilterSet, EventFrame, EventKind, EventStreamScope,
29    },
30    ids::{IdValidationError, validate_identifier},
31    journal::{
32        JournalCursor, JournalRecord, JournalRecordBase, JournalRecordKind, JournalRecordPayload,
33    },
34    package::{
35        ChildRuntimePackage, ChildRuntimePackagePolicy, ContextHandoffPolicy, DepthBudget,
36        RuntimePackage, RuntimePackageFingerprint, SubagentRoutePolicy, SubagentToolPolicy,
37        build_child_runtime_package,
38    },
39    run::RunRequest,
40    run_handle::RunHandle,
41    runtime::AgentRuntime,
42    subagent_records::{
43        ChildLifecycleRecord, RunJournalRef, SubagentCompletedRecord, SubagentHandoffRecord,
44        SubagentRecord, SubagentStartedRecord, SubagentTerminalStatus, SubagentUsageRolledUpRecord,
45        SubagentWrappedEventRecord,
46    },
47};
48
49#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
50#[serde(transparent)]
51/// Holds subagent request id application-layer state or configuration.
52/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
53pub struct SubagentRequestId(String);
54
55impl SubagentRequestId {
56    /// Creates a new application::subagent value with explicit
57    /// caller-provided inputs. This constructor is data-only and
58    /// performs no I/O or external side effects.
59    ///
60    /// # Panics
61    ///
62    /// Panics if constructor invariants fail, such as invalid identifier
63    /// text or constructor-specific bounds. Use a fallible constructor such as
64    /// `try_new` when one is available for untrusted input.
65    pub fn new(value: impl Into<String>) -> Self {
66        Self::try_new(value).expect("SubagentRequestId must be valid")
67    }
68
69    /// Creates a new application::subagent value after validation.
70    /// Returns an SDK error instead of panicking when the identifier or
71    /// input does not satisfy the contract.
72    pub fn try_new(value: impl Into<String>) -> Result<Self, IdValidationError> {
73        let value = value.into();
74        validate_identifier(&value)?;
75        Ok(Self(value))
76    }
77
78    /// Returns this value as str. The accessor is side-effect free and
79    /// keeps ownership with the caller.
80    pub fn as_str(&self) -> &str {
81        &self.0
82    }
83}
84
85impl<'de> Deserialize<'de> for SubagentRequestId {
86    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
87    where
88        D: Deserializer<'de>,
89    {
90        let value = String::deserialize(deserializer)?;
91        Self::try_new(value).map_err(D::Error::custom)
92    }
93}
94
95impl fmt::Debug for SubagentRequestId {
96    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
97        formatter.write_str("SubagentRequestId(redacted)")
98    }
99}
100
101impl fmt::Display for SubagentRequestId {
102    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
103        formatter.write_str("SubagentRequestId(redacted)")
104    }
105}
106
107#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
108/// Holds subagent request application-layer state or configuration.
109/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
110pub struct SubagentRequest {
111    /// Stable request id used for typed lineage, lookup, or dedupe.
112    pub request_id: SubagentRequestId,
113    /// Stable parent run id used for typed lineage, lookup, or dedupe.
114    pub parent_run_id: RunId,
115    /// Stable parent agent id used for typed lineage, lookup, or dedupe.
116    pub parent_agent_id: AgentId,
117    /// Stable parent tool call id used for typed lineage, lookup, or dedupe.
118    pub parent_tool_call_id: ToolCallId,
119    /// Stable child run id used for typed lineage, lookup, or dedupe.
120    pub child_run_id: RunId,
121    /// Stable child agent id used for typed lineage, lookup, or dedupe.
122    pub child_agent_id: AgentId,
123    /// Child source used by this record or request.
124    pub child_source: SourceRef,
125    /// Child destination used by this record or request.
126    pub child_destination: DestinationRef,
127    /// Route policy used by this record or request.
128    pub route_policy: SubagentRoutePolicy,
129    /// Context handoff used by this record or request.
130    pub context_handoff: ContextHandoffPolicy,
131    /// Child package policy used by this record or request.
132    pub child_package_policy: ChildRuntimePackagePolicy,
133    /// Child tool policy used by this record or request.
134    pub child_tool_policy: SubagentToolPolicy,
135    /// Typed message policy ref reference. Resolving or executing it is a
136    /// separate policy-gated step.
137    pub message_policy_ref: PolicyRef,
138    /// Typed wake policy ref reference. Resolving or executing it is a
139    /// separate policy-gated step.
140    pub wake_policy_ref: PolicyRef,
141    #[serde(skip_serializing_if = "Option::is_none")]
142    /// Typed lifecycle policy ref reference. Resolving or executing it is a
143    /// separate policy-gated step.
144    pub lifecycle_policy_ref: Option<PolicyRef>,
145    /// Depth budget used by this record or request.
146    pub depth_budget: DepthBudget,
147    /// Idempotency setting or key for deduping retries.
148    /// Use it to prevent duplicate side effects during replay or repair.
149    pub idempotency_key: IdempotencyKey,
150    #[serde(skip_serializing_if = "Option::is_none")]
151    /// Typed initial message ref reference. Resolving or executing it is a
152    /// separate policy-gated step.
153    pub initial_message_ref: Option<ContentRefId>,
154}
155
156impl SubagentRequest {
157    /// Validates the application::subagent invariants and returns a
158    /// typed error on failure. Validation is pure and does not perform
159    /// I/O, dispatch, journal appends, or adapter calls.
160    pub fn validate(&self) -> Result<(), AgentError> {
161        self.depth_budget.validate_child_start()?;
162        self.context_handoff.validate()?;
163        if self.child_destination.kind != DestinationKind::Subagent {
164            return Err(AgentError::contract_violation(
165                "subagent child destination must use DestinationKind::Subagent",
166            ));
167        }
168        if self.child_source.kind != SourceKind::Subagent {
169            return Err(AgentError::contract_violation(
170                "subagent child source must use SourceKind::Subagent",
171            ));
172        }
173        if self.message_policy_ref.kind == PolicyKind::Host
174            || self.wake_policy_ref.kind == PolicyKind::Host
175        {
176            return Err(AgentError::contract_violation(
177                "subagent message and wake policies must be explicit SDK policy refs",
178            ));
179        }
180        Ok(())
181    }
182
183    /// Builds the child run request value.
184    /// This is data construction and performs no I/O, journal append, event publication, or
185    /// process work.
186    pub fn child_run_request(&self) -> RunRequest {
187        RunRequest::text(
188            self.child_run_id.clone(),
189            self.child_agent_id.clone(),
190            self.child_source.clone(),
191            format!("subagent child run {}", self.request_id.as_str()),
192        )
193    }
194}
195
196#[derive(Clone)]
197/// Holds subagent supervisor application-layer state or configuration.
198/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
199pub struct SubagentSupervisor {
200    runtime: AgentRuntime,
201    pool: AgentPool,
202    parent_package: RuntimePackage,
203    state: Arc<Mutex<SubagentSupervisorState>>,
204}
205
206impl SubagentSupervisor {
207    /// Creates a new application::subagent value with explicit
208    /// caller-provided inputs. This constructor is data-only and
209    /// performs no I/O or external side effects.
210    pub fn new(runtime: AgentRuntime, pool: AgentPool, parent_package: RuntimePackage) -> Self {
211        Self {
212            runtime,
213            pool,
214            parent_package,
215            state: Arc::new(Mutex::new(SubagentSupervisorState::default())),
216        }
217    }
218
219    /// Start child.
220    /// This delegates to child run control to start/register the child and records parent-owned
221    /// lifecycle state.
222    pub fn start_child(&self, request: SubagentRequest) -> Result<ChildRunHandle, AgentError> {
223        request.validate()?;
224
225        let child_package = build_child_runtime_package(
226            &self.parent_package,
227            request.child_agent_id.clone(),
228            &request.route_policy,
229            &request.context_handoff,
230            &request.child_package_policy,
231            &request.child_tool_policy,
232        )?;
233        self.runtime.provider_for_route(
234            &child_package.package.provider_route.route_id,
235            &request.child_run_id,
236        )?;
237
238        let start_effect = child_start_intent(&request);
239        let journal_cursor = self.append_parent_effect_intent(&request, start_effect.clone())?;
240
241        let child_journal_ref = RunJournalRef::for_run(request.child_run_id.clone());
242        let message_ids = request
243            .initial_message_ref
244            .as_ref()
245            .map(|_| {
246                vec![MessageId::new(format!(
247                    "message.{}.initial",
248                    request.request_id.as_str()
249                ))]
250            })
251            .unwrap_or_default();
252
253        let started = SubagentStartedRecord {
254            request_id: request.request_id.clone(),
255            parent_run_id: request.parent_run_id.clone(),
256            child_run_id: request.child_run_id.clone(),
257            parent_tool_call_id: request.parent_tool_call_id.clone(),
258            child_agent_id: request.child_agent_id.clone(),
259            child_package_fingerprint: child_package.fingerprint.clone(),
260            child_journal_ref: child_journal_ref.clone(),
261            handoff_policy: request.context_handoff.clone(),
262            tool_policy: request.child_tool_policy.clone(),
263            message_ids: message_ids.clone(),
264            wake_condition_ids: Vec::new(),
265            effect_intent: start_effect,
266        };
267        let handoff = SubagentHandoffRecord {
268            request_id: request.request_id.clone(),
269            parent_run_id: request.parent_run_id.clone(),
270            child_run_id: request.child_run_id.clone(),
271            handoff_policy: request.context_handoff.clone(),
272            selected_content_refs: request.context_handoff.selected_content_refs(),
273            projection_audit_ref: match request.context_handoff {
274                ContextHandoffPolicy::FullHistoryWithPolicy { .. } => {
275                    Some(format!("projection.audit.{}", request.request_id.as_str()))
276                }
277                _ => None,
278            },
279            policy_refs: request
280                .context_handoff
281                .policy_refs()
282                .into_iter()
283                .chain([request.child_package_policy.redaction_policy_ref.clone()])
284                .collect(),
285            redaction_policy_id: request
286                .child_package_policy
287                .redaction_policy_ref
288                .as_str()
289                .to_string(),
290        };
291        self.append_parent_subagent_record(&request, SubagentRecord::Started(started.clone()))?;
292        self.append_parent_subagent_record(&request, SubagentRecord::Handoff(handoff.clone()))?;
293
294        self.pool.join_run(pool_member_with_subagent_policies(
295            request.parent_run_id.clone(),
296            request.parent_agent_id.clone(),
297            &request,
298        ))?;
299        let run_handle = self.pool.start_run(request.child_run_request())?;
300        self.pool.join_run(pool_member_with_subagent_policies(
301            request.child_run_id.clone(),
302            request.child_agent_id.clone(),
303            &request,
304        ))?;
305
306        if let Some(content_ref) = request.initial_message_ref.clone() {
307            let message = RunMessage::new(
308                message_ids
309                    .first()
310                    .cloned()
311                    .expect("initial message id was precomputed"),
312                request.parent_run_id.clone(),
313                crate::agent_pool::RunAddress::run(request.child_run_id.clone()),
314                content_ref,
315                IdempotencyKey::new(format!("idem.{}.initial", request.request_id.as_str())),
316            )
317            .policy_ref(request.message_policy_ref.clone());
318            self.pool.send(message)?;
319        }
320
321        let handle = ChildRunHandle {
322            child_run_id: request.child_run_id.clone(),
323            child_agent_id: request.child_agent_id.clone(),
324            parent_run_id: request.parent_run_id.clone(),
325            child_package_fingerprint: child_package.fingerprint.clone(),
326            child_journal_ref,
327            wrapped_event_filter: child_event_filter(request.child_run_id.clone())?,
328            run_handle,
329            child_package,
330            start_journal_cursor: Some(journal_cursor),
331        };
332
333        let mut state = self.state()?;
334        state.children.insert(
335            request.child_run_id.clone(),
336            ChildRunState {
337                request: request.clone(),
338                handle: handle.clone_without_run_handle(),
339                detached: false,
340                terminal: false,
341            },
342        );
343        state.records.push(SubagentRecord::Started(started));
344        state.records.push(SubagentRecord::Handoff(handoff));
345
346        Ok(handle)
347    }
348
349    /// Send message.
350    /// This sends a run message through the parent coordination channel and records the
351    /// receipt.
352    pub fn send_message(&self, message: RunMessage) -> Result<MessageReceipt, AgentError> {
353        self.pool.send(message)
354    }
355
356    /// Registers a wake condition for a child run through the parent pool.
357    /// This delegates to `AgentPoolCoordinator::suspend_until`, so it can mutate wake state and
358    /// poll event subscriptions while leaving the child run itself paused.
359    pub fn suspend_until(
360        &self,
361        run_id: RunId,
362        condition: WakeCondition,
363    ) -> Result<WakeRegistration, AgentError> {
364        self.pool.suspend_until(run_id, condition)
365    }
366
367    /// Wraps a child event into the parent subagent record stream.
368    /// This appends a parent subagent record and stores the wrapped-event record in supervisor
369    /// state; it does not re-publish the child event or execute child work.
370    pub fn wrap_child_event(
371        &self,
372        event: AgentEvent,
373    ) -> Result<SubagentWrappedEventRecord, AgentError> {
374        let child_run_id = event.envelope.run_id.clone();
375        let child = self.child(&child_run_id)?;
376        let record = SubagentWrappedEventRecord {
377            parent_run_id: child.request.parent_run_id.clone(),
378            child_run_id: child_run_id.clone(),
379            child_agent_id: child.request.child_agent_id.clone(),
380            original_child_event_id: event.envelope.event_id.clone(),
381            original_child_event_kind: event.envelope.event_kind.clone(),
382            wrapped_event_ref: format!(
383                "subagent.event.{}.{}",
384                child.request.parent_run_id.as_str(),
385                event.envelope.event_id.as_str()
386            ),
387            child_journal_cursor: event.envelope.journal_cursor.clone(),
388            child_journal_ref: child.handle.child_journal_ref.clone(),
389            privacy: event.envelope.privacy,
390        };
391        self.append_parent_subagent_record(
392            &child.request,
393            SubagentRecord::WrappedEvent(record.clone()),
394        )?;
395        self.state()?
396            .records
397            .push(SubagentRecord::WrappedEvent(record.clone()));
398        Ok(record)
399    }
400
401    /// Rolls child usage into parent-visible subagent accounting.
402    /// This dedupes by child usage ref, appends the parent subagent usage record, and stores the
403    /// rollup in supervisor state; it does not call a provider or sink.
404    #[expect(
405        clippy::too_many_arguments,
406        reason = "usage rollup is a durable subagent audit record constructor; a parameter object should be a separate public API pass"
407    )]
408    pub fn rollup_usage(
409        &self,
410        child_run_id: RunId,
411        child_usage_ref: impl Into<String>,
412        input_tokens: u32,
413        output_tokens: u32,
414        cost_micros: Option<u64>,
415        currency: Option<String>,
416        terminal_status: SubagentTerminalStatus,
417    ) -> Result<SubagentUsageRolledUpRecord, AgentError> {
418        let child_usage_ref = child_usage_ref.into();
419        let child = self.child(&child_run_id)?;
420        let dedupe_key = format!("{}:{child_usage_ref}", child_run_id.as_str());
421
422        let mut state = self.state()?;
423        if !state.usage_rollup_dedupe.insert(dedupe_key) {
424            return state
425                .records
426                .iter()
427                .find_map(|record| match record {
428                    SubagentRecord::UsageRolledUp(record)
429                        if record.child_run_id == child_run_id
430                            && record.child_usage_ref == child_usage_ref =>
431                    {
432                        Some(record.clone())
433                    }
434                    _ => None,
435                })
436                .ok_or_else(|| AgentError::contract_violation("usage rollup dedupe lost record"));
437        }
438
439        let record = SubagentUsageRolledUpRecord {
440            child_run_id: child_run_id.clone(),
441            parent_run_id: child.request.parent_run_id.clone(),
442            child_usage_ref: child_usage_ref.clone(),
443            parent_usage_ref: format!("usage.parent.{}.{}", child_run_id.as_str(), child_usage_ref),
444            input_tokens,
445            output_tokens,
446            total_tokens: input_tokens + output_tokens,
447            cost_micros,
448            currency,
449            terminal_status,
450        };
451        self.append_parent_subagent_record(
452            &child.request,
453            SubagentRecord::UsageRolledUp(record.clone()),
454        )?;
455        state
456            .records
457            .push(SubagentRecord::UsageRolledUp(record.clone()));
458        Ok(record)
459    }
460
461    /// Complete child.
462    /// This records child completion in parent-owned lifecycle state and returns the completion
463    /// summary.
464    pub fn complete_child(
465        &self,
466        child_run_id: RunId,
467        terminal_status: SubagentTerminalStatus,
468        result_ref: Option<ContentRefId>,
469        error_ref: Option<String>,
470    ) -> Result<SubagentCompletedRecord, AgentError> {
471        let child = self.child(&child_run_id)?;
472        let effect_id = EffectId::new(format!("effect.subagent.start.{}", child_run_id.as_str()));
473        let record = SubagentCompletedRecord {
474            child_run_id: child_run_id.clone(),
475            parent_run_id: child.request.parent_run_id.clone(),
476            terminal_status: terminal_status.clone(),
477            result_ref,
478            error_ref,
479            policy_outcome: "policy.subagent.terminal".to_string(),
480            effect_result: EffectResult {
481                effect_id,
482                terminal_status: match terminal_status {
483                    SubagentTerminalStatus::Completed | SubagentTerminalStatus::Detached => {
484                        EffectTerminalStatus::Completed
485                    }
486                    SubagentTerminalStatus::Failed => EffectTerminalStatus::Failed,
487                    SubagentTerminalStatus::Cancelled => EffectTerminalStatus::Cancelled,
488                },
489                external_operation_id: None,
490                reconciliation_ref: None,
491                error_ref: None,
492                content_refs: Vec::new(),
493                redacted_summary: "subagent terminal status recorded".to_string(),
494            },
495        };
496        self.append_parent_effect_result(&child.request, record.effect_result.clone())?;
497        self.append_parent_subagent_record(
498            &child.request,
499            SubagentRecord::Completed(record.clone()),
500        )?;
501
502        let mut state = self.state()?;
503        if let Some(child) = state.children.get_mut(&child_run_id) {
504            child.terminal = true;
505        }
506        state
507            .records
508            .push(SubagentRecord::Completed(record.clone()));
509        Ok(record)
510    }
511
512    /// Cancel child.
513    /// This records parent-owned cancellation intent and asks child run control to cancel the
514    /// child.
515    pub fn cancel_child(
516        &self,
517        child_run_id: RunId,
518    ) -> Result<Vec<ChildLifecycleRecord>, AgentError> {
519        let child = self.child(&child_run_id)?;
520        if child.detached {
521            return Err(AgentError::new(
522                AgentErrorKind::ChildLifecycleFailure,
523                RetryClassification::HostConfigurationNeeded,
524                "detached child lifecycle is host-owned after detach acknowledgement",
525            ));
526        }
527        let idempotency_key =
528            IdempotencyKey::new(format!("idem.subagent.cancel.{}", child_run_id.as_str()));
529        let intent = ChildLifecycleRecord::shutdown_intent(
530            child.request.parent_run_id.clone(),
531            child_run_id.clone(),
532            child.lifecycle_policy_refs(),
533            idempotency_key,
534        );
535        self.append_parent_child_lifecycle_record(&child.request, intent.clone())?;
536        self.runtime.cancel_run(&child_run_id)?;
537        let completed = intent.shutdown_completed();
538        self.append_parent_child_lifecycle_record(&child.request, completed.clone())?;
539        let mut state = self.state()?;
540        state
541            .records
542            .push(SubagentRecord::ChildLifecycle(intent.clone()));
543        state
544            .records
545            .push(SubagentRecord::ChildLifecycle(completed.clone()));
546        Ok(vec![intent, completed])
547    }
548
549    /// Detach child.
550    /// This records parent-owned detach intent and transfers lifecycle ownership according to
551    /// policy.
552    pub fn detach_child(
553        &self,
554        child_run_id: RunId,
555        host_ack_ref: impl Into<String>,
556        reclaim_policy_ref: PolicyRef,
557    ) -> Result<Vec<ChildLifecycleRecord>, AgentError> {
558        let child = self.child(&child_run_id)?;
559        let host_ack_ref = host_ack_ref.into();
560        if host_ack_ref.is_empty() {
561            return Err(AgentError::contract_violation(
562                "detached child run requires host acknowledgement",
563            ));
564        }
565        let idempotency_key =
566            IdempotencyKey::new(format!("idem.subagent.detach.{}", child_run_id.as_str()));
567        let intent = ChildLifecycleRecord::detach_intent(
568            child.request.parent_run_id.clone(),
569            child_run_id.clone(),
570            child.lifecycle_policy_refs(),
571            host_ack_ref,
572            reclaim_policy_ref,
573            idempotency_key,
574        );
575        let detached = intent.detached();
576        self.append_parent_child_lifecycle_record(&child.request, intent.clone())?;
577        self.append_parent_child_lifecycle_record(&child.request, detached.clone())?;
578        let mut state = self.state()?;
579        if let Some(child) = state.children.get_mut(&child_run_id) {
580            child.detached = true;
581        }
582        state
583            .records
584            .push(SubagentRecord::ChildLifecycle(intent.clone()));
585        state
586            .records
587            .push(SubagentRecord::ChildLifecycle(detached.clone()));
588        Ok(vec![intent, detached])
589    }
590
591    /// Records.
592    /// This reads the in-memory child lifecycle ledger for assertions or projection.
593    pub fn records(&self) -> Result<Vec<SubagentRecord>, AgentError> {
594        Ok(self.state()?.records.clone())
595    }
596
597    /// Builds the child can be addressed as user chat value.
598    /// This is data construction and performs no I/O, journal append, event publication, or
599    /// process work.
600    pub fn child_can_be_addressed_as_user_chat(
601        &self,
602        child_run_id: &RunId,
603    ) -> Result<bool, AgentError> {
604        self.child(child_run_id)?;
605        Ok(false)
606    }
607
608    /// Builds the child requires terminal rollup or detach value.
609    /// This is data construction and performs no I/O, journal append, event publication, or
610    /// process work.
611    pub fn child_requires_terminal_rollup_or_detach(
612        &self,
613        child_run_id: &RunId,
614    ) -> Result<bool, AgentError> {
615        let child = self.child(child_run_id)?;
616        Ok(!child.detached && !child.terminal)
617    }
618
619    fn append_parent_effect_intent(
620        &self,
621        request: &SubagentRequest,
622        intent: EffectIntent,
623    ) -> Result<JournalCursor, AgentError> {
624        let parent_journal = self.runtime.journal_port(&request.parent_run_id)?;
625        let mut base = JournalRecordBase::new(
626            self.runtime.next_journal_seq(),
627            format!(
628                "journal.record.subagent.start.{}",
629                request.request_id.as_str()
630            ),
631            request.parent_run_id.clone(),
632            request.parent_agent_id.clone(),
633            request.child_source.clone(),
634        );
635        base.destination = Some(request.child_destination.clone());
636        base.tags = vec!["feature:subagent".to_string()];
637        base.runtime_package_fingerprint = self
638            .parent_package
639            .fingerprint()
640            .map(|fingerprint| fingerprint.as_str().to_string())?;
641        base.privacy = PrivacyClass::ContentRefsOnly;
642        base.redaction_policy_id = request
643            .child_package_policy
644            .redaction_policy_ref
645            .as_str()
646            .to_string();
647        parent_journal.append(JournalRecord::effect_intent(base, intent))
648    }
649
650    fn append_parent_effect_result(
651        &self,
652        request: &SubagentRequest,
653        result: EffectResult,
654    ) -> Result<JournalCursor, AgentError> {
655        let parent_journal = self.runtime.journal_port(&request.parent_run_id)?;
656        let mut base = self.parent_record_base(
657            request,
658            format!(
659                "journal.record.subagent.effect.result.{}",
660                request.request_id.as_str()
661            ),
662        )?;
663        base.source = request.child_source.clone();
664        parent_journal.append(JournalRecord::effect_result(base, result))
665    }
666
667    fn append_parent_subagent_record(
668        &self,
669        request: &SubagentRequest,
670        record: SubagentRecord,
671    ) -> Result<JournalCursor, AgentError> {
672        let parent_journal = self.runtime.journal_port(&request.parent_run_id)?;
673        let base = self.parent_record_base(
674            request,
675            format!(
676                "journal.record.{}.{}",
677                record.kind().replace('_', "."),
678                request.request_id.as_str()
679            ),
680        )?;
681        parent_journal.append(JournalRecord::feature_record(
682            base,
683            JournalRecordKind::Subagent,
684            "subagent",
685            record.kind(),
686            EntityRef::new(EntityKind::SubagentRun, request.child_run_id.as_str()),
687            vec![EntityRef::run(request.parent_run_id.clone())],
688            subagent_content_refs(&record),
689            JournalRecordPayload::Subagent(record),
690        ))
691    }
692
693    fn append_parent_child_lifecycle_record(
694        &self,
695        request: &SubagentRequest,
696        record: ChildLifecycleRecord,
697    ) -> Result<JournalCursor, AgentError> {
698        let parent_journal = self.runtime.journal_port(&request.parent_run_id)?;
699        let event_kind = match record.status {
700            crate::subagent_records::ChildLifecycleStatus::Requested => "child_lifecycle_requested",
701            crate::subagent_records::ChildLifecycleStatus::Completed => "child_lifecycle_completed",
702            crate::subagent_records::ChildLifecycleStatus::Detached => "child_lifecycle_detached",
703            crate::subagent_records::ChildLifecycleStatus::Failed => "child_lifecycle_failed",
704        };
705        let base = self.parent_record_base(
706            request,
707            format!(
708                "journal.record.{}.{}",
709                event_kind.replace('_', "."),
710                request.request_id.as_str()
711            ),
712        )?;
713        parent_journal.append(JournalRecord::feature_record(
714            base,
715            JournalRecordKind::ChildLifecycle,
716            "child_lifecycle",
717            event_kind,
718            EntityRef::new(EntityKind::SubagentRun, record.child_run_id.as_str()),
719            vec![EntityRef::run(record.parent_run_id.clone())],
720            Vec::new(),
721            JournalRecordPayload::ChildLifecycle(record),
722        ))
723    }
724
725    fn parent_record_base(
726        &self,
727        request: &SubagentRequest,
728        record_id: String,
729    ) -> Result<JournalRecordBase, AgentError> {
730        let mut base = JournalRecordBase::new(
731            self.runtime.next_journal_seq(),
732            record_id,
733            request.parent_run_id.clone(),
734            request.parent_agent_id.clone(),
735            request.child_source.clone(),
736        );
737        base.destination = Some(request.child_destination.clone());
738        base.tags = vec!["feature:subagent".to_string()];
739        base.runtime_package_fingerprint = self
740            .parent_package
741            .fingerprint()
742            .map(|fingerprint| fingerprint.as_str().to_string())?;
743        base.privacy = PrivacyClass::ContentRefsOnly;
744        base.redaction_policy_id = request
745            .child_package_policy
746            .redaction_policy_ref
747            .as_str()
748            .to_string();
749        Ok(base)
750    }
751
752    fn child(&self, child_run_id: &RunId) -> Result<ChildRunState, AgentError> {
753        self.state()?
754            .children
755            .get(child_run_id)
756            .cloned()
757            .ok_or_else(|| {
758                AgentError::new(
759                    AgentErrorKind::SubagentFailure,
760                    RetryClassification::RepairNeeded,
761                    "child run is not supervised by this subagent supervisor",
762                )
763            })
764    }
765
766    fn state(&self) -> Result<std::sync::MutexGuard<'_, SubagentSupervisorState>, AgentError> {
767        self.state
768            .lock()
769            .map_err(|_| AgentError::contract_violation("subagent supervisor state lock poisoned"))
770    }
771}
772
773#[derive(Clone, Debug)]
774/// Holds child run handle application-layer state or configuration.
775/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
776pub struct ChildRunHandle {
777    /// Stable child run id used for typed lineage, lookup, or dedupe.
778    pub child_run_id: RunId,
779    /// Stable child agent id used for typed lineage, lookup, or dedupe.
780    pub child_agent_id: AgentId,
781    /// Stable parent run id used for typed lineage, lookup, or dedupe.
782    pub parent_run_id: RunId,
783    /// Deterministic child package fingerprint used for stale checks, package
784    /// evidence, or replay comparisons.
785    pub child_package_fingerprint: RuntimePackageFingerprint,
786    /// Typed child journal ref reference. Resolving or executing it is a
787    /// separate policy-gated step.
788    pub child_journal_ref: RunJournalRef,
789    /// Wrapped event filter used by this record or request.
790    pub wrapped_event_filter: CompiledEventFilter,
791    /// Run handle used by this record or request.
792    pub run_handle: RunHandle,
793    /// Child package used by this record or request.
794    pub child_package: ChildRuntimePackage,
795    /// Cursor identifying a replay, export, or subscription position.
796    /// Use it to resume without widening the original scope.
797    pub start_journal_cursor: Option<JournalCursor>,
798}
799
800impl ChildRunHandle {
801    fn clone_without_run_handle(&self) -> ChildRunHandleSnapshot {
802        ChildRunHandleSnapshot {
803            child_journal_ref: self.child_journal_ref.clone(),
804        }
805    }
806}
807
808#[derive(Clone, Debug)]
809struct ChildRunHandleSnapshot {
810    child_journal_ref: RunJournalRef,
811}
812
813#[derive(Clone)]
814struct ChildRunState {
815    request: SubagentRequest,
816    handle: ChildRunHandleSnapshot,
817    detached: bool,
818    terminal: bool,
819}
820
821impl ChildRunState {
822    fn lifecycle_policy_refs(&self) -> Vec<PolicyRef> {
823        let mut refs = vec![
824            self.request
825                .child_package_policy
826                .child_lifecycle_bounds
827                .clone(),
828            self.request.message_policy_ref.clone(),
829            self.request.wake_policy_ref.clone(),
830        ];
831        if let Some(policy) = &self.request.lifecycle_policy_ref {
832            refs.push(policy.clone());
833        }
834        refs
835    }
836}
837
838#[derive(Default)]
839struct SubagentSupervisorState {
840    children: BTreeMap<RunId, ChildRunState>,
841    records: Vec<SubagentRecord>,
842    usage_rollup_dedupe: BTreeSet<String>,
843}
844
845fn child_start_intent(request: &SubagentRequest) -> EffectIntent {
846    let mut intent = EffectIntent::new(
847        EffectId::new(format!(
848            "effect.subagent.start.{}",
849            request.child_run_id.as_str()
850        )),
851        EffectKind::ChildAgentStart,
852        EntityRef::new(EntityKind::SubagentRun, request.child_run_id.as_str()),
853        request.child_source.clone(),
854        "parent requested child subagent start",
855    );
856    intent.destination = Some(request.child_destination.clone());
857    intent.policy_refs = vec![
858        request.message_policy_ref.clone(),
859        request.wake_policy_ref.clone(),
860        request.child_package_policy.child_lifecycle_bounds.clone(),
861        request.child_package_policy.redaction_policy_ref.clone(),
862    ];
863    intent.idempotency_key = Some(request.idempotency_key.clone());
864    if let Some(content_ref) = &request.initial_message_ref {
865        intent.content_refs.push(content_ref.clone());
866    }
867    intent
868}
869
870fn subagent_content_refs(record: &SubagentRecord) -> Vec<ContentRefId> {
871    match record {
872        SubagentRecord::Started(record) => record.effect_intent.content_refs.to_vec(),
873        SubagentRecord::Handoff(record) => record.selected_content_refs.clone(),
874        SubagentRecord::Completed(record) => record.result_ref.iter().cloned().collect(),
875        SubagentRecord::WrappedEvent(_)
876        | SubagentRecord::UsageRolledUp(_)
877        | SubagentRecord::ChildLifecycle(_) => Vec::new(),
878    }
879}
880
881fn pool_member_with_subagent_policies(
882    run_id: RunId,
883    agent_id: AgentId,
884    request: &SubagentRequest,
885) -> AgentPoolMember {
886    let mut member = AgentPoolMember::new(run_id, agent_id)
887        .policy_ref(request.message_policy_ref.clone())
888        .policy_ref(request.wake_policy_ref.clone())
889        .policy_ref(request.child_package_policy.child_lifecycle_bounds.clone());
890    if let Some(policy_ref) = &request.lifecycle_policy_ref {
891        member = member.policy_ref(policy_ref.clone());
892    }
893    member
894}
895
896fn child_event_filter(child_run_id: RunId) -> Result<CompiledEventFilter, AgentError> {
897    EventFilter {
898        run_ids: EventFilterSet::Include(vec![child_run_id]),
899        ..EventFilter::default()
900    }
901    .compile()
902}
903
904/// Builds the subagent runtime event frame value.
905/// This is data construction and performs no I/O, journal append, event publication, or process
906pub fn subagent_runtime_event_frame(
907    parent_run_id: RunId,
908    child_run_id: RunId,
909    child_agent_id: AgentId,
910    event_seq: u64,
911    event_kind: EventKind,
912    journal_cursor: Option<JournalCursor>,
913) -> EventFrame {
914    let event = AgentEvent::with_redacted_summary(
915        EventEnvelope {
916            schema_version: EVENT_SCHEMA_VERSION,
917            event_id: EventId::new(format!("event.subagent.child.{event_seq}")),
918            event_seq,
919            event_family: EventFamily::Run,
920            event_kind,
921            payload_schema_version: 1,
922            timestamp: "1970-01-01T00:00:00Z".to_string(),
923            recorded_at: "1970-01-01T00:00:00Z".to_string(),
924            run_id: child_run_id.clone(),
925            agent_id: child_agent_id,
926            turn_id: None,
927            attempt_id: None,
928            message_id: None,
929            context_item_id: None,
930            trace_id: TraceId::new(format!("trace.subagent.{}", parent_run_id.as_str())),
931            span_id: SpanId::new(format!("span.subagent.child.{event_seq}")),
932            parent_event_id: None,
933            caused_by: None,
934            subject_ref: EntityRef::new(EntityKind::SubagentRun, child_run_id.as_str()),
935            related_refs: vec![EntityRef::run(parent_run_id)],
936            causal_refs: Vec::new(),
937            correlation: EventCorrelation::default(),
938            tags: vec![crate::event::EventTag::new("feature:subagent")],
939            source: SourceRef::with_kind(SourceKind::Sdk, "source.sdk.subagent"),
940            destination: Some(DestinationRef::with_kind(
941                DestinationKind::EventStream,
942                "destination.event_stream.subagent",
943            )),
944            policy_refs: Vec::new(),
945            journal_cursor,
946            state_before: None,
947            state_after: None,
948            delivery_semantics: EventDeliverySemantics::JournalBacked,
949            privacy: PrivacyClass::ContentRefsOnly,
950            content_capture: ContentCaptureMode::Off,
951            redaction_policy_id: "policy.redaction.subagent.default".to_string(),
952            runtime_package_fingerprint: "runtime.package.fingerprint.subagent.child".to_string(),
953        },
954        "child event wrapped by subagent supervisor",
955    );
956    EventFrame {
957        cursor: event.envelope.cursor(EventStreamScope::All),
958        event,
959        archive_cursor: None,
960        overflow: None,
961    }
962}