Skip to main content

agent_sdk/
subagent.rs

1//! Subagent support for spawning child agents.
2//!
3//! This module provides the ability to spawn subagents from within an agent.
4//! Subagents are isolated agent instances that run to completion and return
5//! only their final response to the parent agent.
6//!
7//! # Overview
8//!
9//! Subagents are useful for:
10//! - Delegating complex subtasks to specialized agents
11//! - Running parallel investigations
12//! - Isolating context for specific operations
13//!
14//! # Example
15//!
16//! ```ignore
17//! use agent_sdk::subagent::{SubagentTool, SubagentConfig};
18//!
19//! let config = SubagentConfig::new("researcher")
20//!     .with_system_prompt("You are a research specialist...")
21//!     .with_max_turns(10);
22//!
23//! let tool = SubagentTool::new(config, provider, tools, || {
24//!     std::sync::Arc::new(agent_sdk::InMemoryEventStore::new())
25//! });
26//! registry.register(tool);
27//! ```
28//!
29//! # Behavior
30//!
31//! When a subagent runs:
//! 1. A new isolated conversation thread (with its own `ThreadId`) is created
33//! 2. The subagent runs until completion or max turns
34//! 3. Only the final text response is returned to the parent
//! 4. The subagent's intermediate tool calls are not added to the parent's conversation; they are surfaced only as progress events and summarized tool logs
36
37mod factory;
38
39pub use factory::SubagentFactory;
40
41use crate::events::AgentEvent;
42use crate::hooks::{AgentHooks, DefaultHooks};
43use crate::llm::LlmProvider;
44use crate::stores::{EventStore, InMemoryStore, MessageStore, StateStore};
45use crate::tools::{DynamicToolName, Tool, ToolContext, ToolRegistry};
46use crate::types::{AgentConfig, AgentInput, ThreadId, TokenUsage, ToolResult, ToolTier};
47use anyhow::{Context, Result, bail};
48use serde::{Deserialize, Serialize};
49use serde_json::{Value, json};
50use std::collections::HashMap;
51use std::sync::Arc;
52use std::time::{Duration, Instant};
53use tokio_util::sync::CancellationToken;
54
/// Tool-context metadata key holding how deeply the current subagent is nested.
///
/// A subagent that spawns another subagent is expected to bump this value; the
/// consuming tool compares it with the limit stored under
/// [`METADATA_MAX_SUBAGENT_DEPTH`].
pub const METADATA_SUBAGENT_DEPTH: &str = "subagent_depth";

/// Tool-context metadata key holding the deepest subagent nesting allowed.
///
/// The host application (e.g. bip) sets this to keep subagent recursion bounded.
pub const METADATA_MAX_SUBAGENT_DEPTH: &str = "max_subagent_depth";
65
/// Configuration for a subagent.
///
/// Create one with [`SubagentConfig::new`] and refine it with the `with_*` builder
/// methods.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SubagentConfig {
    /// Name of the subagent (for identification). Also used to build the tool's
    /// display name and description.
    pub name: String,
    /// Human-friendly nickname assigned by the parent (e.g., "Zara"). Forwarded in
    /// subagent progress events.
    pub nickname: Option<String>,
    /// System prompt for the subagent.
    pub system_prompt: String,
    /// Maximum number of turns before stopping. When `None`, the run is still capped
    /// by a default limit applied in `SubagentTool::run_subagent`.
    pub max_turns: Option<usize>,
    /// Optional timeout in milliseconds, measured from the start of the subagent run.
    pub timeout_ms: Option<u64>,
    /// Optional model override for this subagent.
    ///
    /// NOTE(review): within this file the value is only reported in progress events;
    /// `run_subagent` does not pass it to the provider. Confirm the override is applied
    /// elsewhere (e.g. when the provider is constructed).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
}
83
84impl SubagentConfig {
85    /// Create a new subagent configuration.
86    #[must_use]
87    pub fn new(name: impl Into<String>) -> Self {
88        Self {
89            name: name.into(),
90            nickname: None,
91            system_prompt: String::new(),
92            max_turns: None,
93            timeout_ms: None,
94            model: None,
95        }
96    }
97
98    /// Set the system prompt.
99    #[must_use]
100    pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
101        self.system_prompt = prompt.into();
102        self
103    }
104
105    /// Set the maximum number of turns.
106    #[must_use]
107    pub const fn with_max_turns(mut self, max: usize) -> Self {
108        self.max_turns = Some(max);
109        self
110    }
111
112    /// Set the timeout in milliseconds.
113    #[must_use]
114    pub const fn with_timeout_ms(mut self, timeout: u64) -> Self {
115        self.timeout_ms = Some(timeout);
116        self
117    }
118
119    /// Set the model override for this subagent.
120    #[must_use]
121    pub fn with_model(mut self, model: impl Into<String>) -> Self {
122        self.model = Some(model.into());
123        self
124    }
125
126    /// Set a human-friendly nickname for this subagent.
127    #[must_use]
128    pub fn with_nickname(mut self, nickname: impl Into<String>) -> Self {
129        self.nickname = Some(nickname.into());
130        self
131    }
132}
133
/// Log entry for a single tool call within a subagent.
///
/// One entry is recorded for every `ToolCallEnd` event replayed from the
/// subagent's event stream.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ToolCallLog {
    /// Tool name.
    pub name: String,
    /// Tool display name, as reported when the call ended.
    pub display_name: String,
    /// Brief context/args (e.g., file path, command), taken from the matching
    /// `ToolCallStart` event; empty when no start event was seen for the call id.
    pub context: String,
    /// Brief result summary.
    pub result: String,
    /// Whether the tool call succeeded.
    pub success: bool,
    /// Duration in milliseconds, when the tool result carries one.
    pub duration_ms: Option<u64>,
}
150
/// Result from a subagent execution.
///
/// `tool_count`, `tool_logs` and `usage` are only filled in when the run reached a
/// terminal `Done`/`Refusal` state and its events were replayed. After a timeout,
/// cancellation, disconnect, confirmation request or run-level error they stay at
/// their zero values.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SubagentResult {
    /// Name of the subagent.
    pub name: String,
    /// The final text response (only visible part to parent). On failure this holds
    /// a short failure message instead.
    pub final_response: String,
    /// Total number of turns taken, as reported by the replayed turn/done events.
    pub total_turns: usize,
    /// Number of tool calls started by the subagent.
    pub tool_count: u32,
    /// Log of tool calls completed by the subagent.
    pub tool_logs: Vec<ToolCallLog>,
    /// Token usage summed over the replayed turns.
    pub usage: TokenUsage,
    /// Whether the subagent completed successfully. `false` after a timeout,
    /// cancellation, disconnect, confirmation request, error or refusal.
    pub success: bool,
    /// Wall-clock duration of the run in milliseconds (saturating at `u64::MAX`).
    pub duration_ms: u64,
    /// Detailed error information when `success` is false.
    ///
    /// Holds the error or refusal message reported by the subagent, or a description
    /// naming the subagent and the reason the run did not finish (timeout,
    /// cancellation, disconnect, confirmation request).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error_details: Option<String>,
    /// Retained for serialization compatibility with older clients.
    ///
    /// The previous implementation inferred this from the "last pending tool"
    /// when any generic error occurred, which was incorrect for LLM or
    /// transport failures. The field is currently never populated until the
    /// SDK has deterministic error provenance.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub failed_tool: Option<String>,
}
185
186/// Tool for spawning subagents.
187///
188/// This tool allows an agent to spawn a child agent that runs independently
189/// and returns only its final response.
190///
191/// # Example
192///
193/// ```ignore
194/// use agent_sdk::subagent::{SubagentTool, SubagentConfig};
195///
196/// let config = SubagentConfig::new("analyzer")
197///     .with_system_prompt("You analyze code...");
198///
199/// let tool = SubagentTool::new(config, provider.clone(), tools.clone(), || {
200///     std::sync::Arc::new(agent_sdk::InMemoryEventStore::new())
201/// });
202/// ```
203pub struct SubagentTool<P, H = DefaultHooks, M = InMemoryStore, S = InMemoryStore>
204where
205    P: LlmProvider,
206    H: AgentHooks,
207    M: MessageStore,
208    S: StateStore,
209{
210    config: SubagentConfig,
211    provider: Arc<P>,
212    tools: Arc<ToolRegistry<()>>,
213    hooks: Arc<H>,
214    message_store_factory: Arc<dyn Fn() -> M + Send + Sync>,
215    state_store_factory: Arc<dyn Fn() -> S + Send + Sync>,
216    event_store_factory: Arc<dyn Fn() -> Arc<dyn EventStore> + Send + Sync>,
217    /// Cached display name to avoid `Box::leak` on every call.
218    cached_display_name: &'static str,
219    /// Cached description to avoid `Box::leak` on every call.
220    cached_description: &'static str,
221}
222
223impl<P> SubagentTool<P, DefaultHooks, InMemoryStore, InMemoryStore>
224where
225    P: LlmProvider + 'static,
226{
227    /// Create a new subagent tool with default hooks and in-memory message/state stores.
228    #[must_use]
229    pub fn new<EF>(
230        config: SubagentConfig,
231        provider: Arc<P>,
232        tools: Arc<ToolRegistry<()>>,
233        event_store_factory: EF,
234    ) -> Self
235    where
236        EF: Fn() -> Arc<dyn EventStore> + Send + Sync + 'static,
237    {
238        // Cache leaked strings at construction time (bounded by number of tools)
239        let cached_display_name = Box::leak(format!("Subagent: {}", config.name).into_boxed_str());
240        let cached_description = Box::leak(
241            format!(
242                "Spawn a subagent named '{}' to handle a task. The subagent will work independently and return only its final response.",
243                config.name
244            )
245            .into_boxed_str(),
246        );
247        Self {
248            config,
249            provider,
250            tools,
251            hooks: Arc::new(DefaultHooks),
252            message_store_factory: Arc::new(InMemoryStore::new),
253            state_store_factory: Arc::new(InMemoryStore::new),
254            event_store_factory: Arc::new(event_store_factory),
255            cached_display_name,
256            cached_description,
257        }
258    }
259}
260
261impl<P, H, M, S> SubagentTool<P, H, M, S>
262where
263    P: LlmProvider + Clone + 'static,
264    H: AgentHooks + Clone + 'static,
265    M: MessageStore + 'static,
266    S: StateStore + 'static,
267{
268    /// Create with custom hooks.
269    #[must_use]
270    pub fn with_hooks<H2: AgentHooks + Clone + 'static>(
271        self,
272        hooks: Arc<H2>,
273    ) -> SubagentTool<P, H2, M, S> {
274        SubagentTool {
275            config: self.config,
276            provider: self.provider,
277            tools: self.tools,
278            hooks,
279            message_store_factory: self.message_store_factory,
280            state_store_factory: self.state_store_factory,
281            event_store_factory: self.event_store_factory,
282            cached_display_name: self.cached_display_name,
283            cached_description: self.cached_description,
284        }
285    }
286
287    /// Create with custom store factories.
288    #[must_use]
289    pub fn with_stores<M2, S2, MF, SF>(
290        self,
291        message_factory: MF,
292        state_factory: SF,
293    ) -> SubagentTool<P, H, M2, S2>
294    where
295        M2: MessageStore + 'static,
296        S2: StateStore + 'static,
297        MF: Fn() -> M2 + Send + Sync + 'static,
298        SF: Fn() -> S2 + Send + Sync + 'static,
299    {
300        SubagentTool {
301            config: self.config,
302            provider: self.provider,
303            tools: self.tools,
304            hooks: self.hooks,
305            message_store_factory: Arc::new(message_factory),
306            state_store_factory: Arc::new(state_factory),
307            event_store_factory: self.event_store_factory,
308            cached_display_name: self.cached_display_name,
309            cached_description: self.cached_description,
310        }
311    }
312
313    /// Get the subagent configuration.
314    #[must_use]
315    pub const fn config(&self) -> &SubagentConfig {
316        &self.config
317    }
318
319    /// Run the subagent with a task.
320    ///
321    /// The `parent_cancel` token links the subagent's lifecycle to its parent.
322    /// Cancelling the parent token will also cancel the subagent.
323    async fn run_subagent<Ctx>(
324        &self,
325        task: &str,
326        subagent_id: String,
327        parent_ctx: &ToolContext<Ctx>,
328        parent_cancel: CancellationToken,
329    ) -> Result<SubagentResult>
330    where
331        Ctx: Send + Sync + 'static,
332    {
333        use crate::agent_loop::AgentLoop;
334
335        let start = Instant::now();
336        // Each subagent run gets its own thread id, so a shared event store
337        // only returns this subagent's events when queried with `thread_id`.
338        let thread_id = ThreadId::new();
339
340        // Create stores for this subagent run
341        let message_store = (self.message_store_factory)();
342        let state_store = (self.state_store_factory)();
343        let event_store = (self.event_store_factory)();
344
345        // Create agent config with a default max_turns to prevent unbounded execution
346        let agent_config = AgentConfig {
347            max_turns: Some(self.config.max_turns.unwrap_or(100)),
348            system_prompt: self.config.system_prompt.clone(),
349            ..Default::default()
350        };
351
352        // Build the subagent
353        let agent = AgentLoop::new(
354            (*self.provider).clone(),
355            (*self.tools).clone(),
356            (*self.hooks).clone(),
357            message_store,
358            state_store,
359            Arc::clone(&event_store),
360            agent_config,
361        );
362
363        // Create tool context
364        let tool_ctx = ToolContext::new(());
365
366        // Run with a child cancellation token so parent cancellation propagates.
367        // Use `run_abortable` so we can abort the spawned task on timeout
368        // instead of leaving a detached task that continues making API calls.
369        let cancel_token = parent_cancel.child_token();
370        let timeout_cancel = cancel_token.clone();
371        let (state_rx, task_handle) = agent.run_abortable(
372            thread_id.clone(),
373            AgentInput::Text(task.to_string()),
374            tool_ctx,
375            cancel_token,
376        );
377
378        let wait_result = wait_for_subagent_state(self.config.timeout_ms, start, state_rx).await;
379        let mut state = SubagentExecutionState::new();
380        let replay_events = apply_subagent_wait_outcome(
381            classify_subagent_wait_result(wait_result.as_ref()),
382            &self.config,
383            &timeout_cancel,
384            &task_handle,
385            &mut state,
386        );
387
388        if replay_events {
389            replay_subagent_events(
390                &event_store,
391                &thread_id,
392                parent_ctx,
393                &self.config,
394                &subagent_id,
395                &mut state,
396            )
397            .await?;
398        }
399
400        let result = state.into_result(self.config.name.clone(), start);
401        emit_subagent_observability(self, &result);
402        Ok(result)
403    }
404}
405
406fn mark_subagent_timeout(
407    config: &SubagentConfig,
408    final_response: &mut String,
409    error_details: &mut Option<String>,
410    success: &mut bool,
411) {
412    *final_response = "Subagent timed out".to_string();
413    *error_details = Some(format!(
414        "Subagent '{}' timed out after {}ms",
415        config.name,
416        config.timeout_ms.unwrap_or(0)
417    ));
418    *success = false;
419}
420
421fn mark_subagent_disconnected(
422    config: &SubagentConfig,
423    final_response: &mut String,
424    error_details: &mut Option<String>,
425    success: &mut bool,
426) {
427    *final_response = "Subagent ended unexpectedly".to_string();
428    *error_details = Some(format!(
429        "Subagent '{}' ended before returning a final state",
430        config.name
431    ));
432    *success = false;
433}
434
435fn mark_subagent_cancelled(
436    config: &SubagentConfig,
437    final_response: &mut String,
438    error_details: &mut Option<String>,
439    success: &mut bool,
440) {
441    *final_response = "Subagent cancelled".to_string();
442    *error_details = Some(format!("Subagent '{}' was cancelled", config.name));
443    *success = false;
444}
445
446fn mark_subagent_awaiting_confirmation(
447    config: &SubagentConfig,
448    final_response: &mut String,
449    error_details: &mut Option<String>,
450    success: &mut bool,
451) {
452    *final_response = "Subagent requires confirmation".to_string();
453    *error_details = Some(format!(
454        "Subagent '{}' requested confirmation, which is not supported in nested runs",
455        config.name
456    ));
457    *success = false;
458}
459
/// Record a run-level agent error; `message` becomes both the response shown to
/// the parent and the error details.
fn mark_subagent_agent_error(
    final_response: &mut String,
    error_details: &mut Option<String>,
    success: &mut bool,
    message: &str,
) {
    let owned = message.to_owned();

    *success = false;
    *error_details = Some(owned.clone());
    *final_response = owned;
}
470
471type SubagentWaitResult = Result<
472    Result<crate::types::AgentRunState, tokio::sync::oneshot::error::RecvError>,
473    tokio::time::error::Elapsed,
474>;
475
476struct SubagentExecutionState {
477    final_response: String,
478    total_turns: usize,
479    tool_count: u32,
480    tool_logs: Vec<ToolCallLog>,
481    pending_tools: HashMap<String, (String, String)>,
482    total_usage: TokenUsage,
483    success: bool,
484    error_details: Option<String>,
485    failed_tool: Option<String>,
486}
487
488impl SubagentExecutionState {
489    fn new() -> Self {
490        Self {
491            final_response: String::new(),
492            total_turns: 0,
493            tool_count: 0,
494            tool_logs: Vec::new(),
495            pending_tools: HashMap::new(),
496            total_usage: TokenUsage::default(),
497            success: true,
498            error_details: None,
499            failed_tool: None,
500        }
501    }
502
503    fn into_result(self, name: String, start: Instant) -> SubagentResult {
504        SubagentResult {
505            name,
506            final_response: self.final_response,
507            total_turns: self.total_turns,
508            tool_count: self.tool_count,
509            tool_logs: self.tool_logs,
510            usage: self.total_usage,
511            success: self.success,
512            duration_ms: u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
513            error_details: self.error_details,
514            failed_tool: self.failed_tool,
515        }
516    }
517}
518
519fn subagent_total_tokens(total_usage: &TokenUsage) -> u64 {
520    u64::from(total_usage.input_tokens) + u64::from(total_usage.output_tokens)
521}
522
523struct SubagentProgressUpdate<'a> {
524    subagent_id: &'a str,
525    total_turns: usize,
526    total_usage: &'a TokenUsage,
527    tool_name: String,
528    tool_context: String,
529    completed: bool,
530    success: bool,
531    tool_count: u32,
532}
533
534enum SubagentWaitOutcome {
535    ReplayEvents,
536    TimedOut,
537    Disconnected,
538    Cancelled,
539    AwaitingConfirmation,
540    Error(crate::types::AgentError),
541}
542
543async fn wait_for_subagent_state(
544    timeout_ms: Option<u64>,
545    start: Instant,
546    state_rx: tokio::sync::oneshot::Receiver<crate::types::AgentRunState>,
547) -> Option<SubagentWaitResult> {
548    let timeout_duration = timeout_ms.map(Duration::from_millis);
549    if timeout_duration.is_some_and(|timeout| timeout.saturating_sub(start.elapsed()).is_zero()) {
550        return None;
551    }
552    if let Some(timeout) = timeout_duration {
553        let remaining = timeout.saturating_sub(start.elapsed());
554        Some(tokio::time::timeout(remaining, state_rx).await)
555    } else {
556        Some(Ok(state_rx.await))
557    }
558}
559
560fn classify_subagent_wait_result(wait_result: Option<&SubagentWaitResult>) -> SubagentWaitOutcome {
561    match wait_result {
562        Some(Ok(Ok(
563            crate::types::AgentRunState::Done { .. } | crate::types::AgentRunState::Refusal { .. },
564        ))) => SubagentWaitOutcome::ReplayEvents,
565        Some(Ok(Ok(crate::types::AgentRunState::Cancelled { .. }))) => {
566            SubagentWaitOutcome::Cancelled
567        }
568        Some(Ok(Ok(crate::types::AgentRunState::AwaitingConfirmation { .. }))) => {
569            SubagentWaitOutcome::AwaitingConfirmation
570        }
571        Some(Ok(Ok(crate::types::AgentRunState::Error(error)))) => {
572            SubagentWaitOutcome::Error(error.clone())
573        }
574        // `AgentRunState` is `#[non_exhaustive]`; an unrecognized future run
575        // state is surfaced as an error so the subagent failure is not
576        // silently treated as a successful replay.
577        Some(Ok(Ok(_))) => SubagentWaitOutcome::Error(crate::types::AgentError::new(
578            "subagent returned an unrecognized run state".to_string(),
579            false,
580        )),
581        Some(Ok(Err(_))) => SubagentWaitOutcome::Disconnected,
582        None | Some(Err(_)) => SubagentWaitOutcome::TimedOut,
583    }
584}
585
586fn apply_subagent_wait_outcome(
587    outcome: SubagentWaitOutcome,
588    config: &SubagentConfig,
589    timeout_cancel: &CancellationToken,
590    task_handle: &tokio::task::JoinHandle<()>,
591    state: &mut SubagentExecutionState,
592) -> bool {
593    match outcome {
594        SubagentWaitOutcome::ReplayEvents => true,
595        SubagentWaitOutcome::TimedOut => {
596            timeout_cancel.cancel();
597            task_handle.abort();
598            mark_subagent_timeout(
599                config,
600                &mut state.final_response,
601                &mut state.error_details,
602                &mut state.success,
603            );
604            false
605        }
606        SubagentWaitOutcome::Disconnected => {
607            timeout_cancel.cancel();
608            task_handle.abort();
609            mark_subagent_disconnected(
610                config,
611                &mut state.final_response,
612                &mut state.error_details,
613                &mut state.success,
614            );
615            false
616        }
617        SubagentWaitOutcome::Cancelled => {
618            timeout_cancel.cancel();
619            task_handle.abort();
620            mark_subagent_cancelled(
621                config,
622                &mut state.final_response,
623                &mut state.error_details,
624                &mut state.success,
625            );
626            false
627        }
628        SubagentWaitOutcome::AwaitingConfirmation => {
629            timeout_cancel.cancel();
630            task_handle.abort();
631            mark_subagent_awaiting_confirmation(
632                config,
633                &mut state.final_response,
634                &mut state.error_details,
635                &mut state.success,
636            );
637            false
638        }
639        SubagentWaitOutcome::Error(error) => {
640            timeout_cancel.cancel();
641            task_handle.abort();
642            mark_subagent_agent_error(
643                &mut state.final_response,
644                &mut state.error_details,
645                &mut state.success,
646                &error.message,
647            );
648            false
649        }
650    }
651}
652
653async fn replay_subagent_events<Ctx: Send + Sync + 'static>(
654    event_store: &Arc<dyn EventStore>,
655    thread_id: &ThreadId,
656    parent_ctx: &ToolContext<Ctx>,
657    config: &SubagentConfig,
658    subagent_id: &str,
659    state: &mut SubagentExecutionState,
660) -> Result<()> {
661    for envelope in event_store.get_events(thread_id).await? {
662        match envelope.event {
663            AgentEvent::Text {
664                message_id: _,
665                text,
666            } => {
667                state.final_response.push_str(&text);
668            }
669            AgentEvent::ToolCallStart {
670                id, name, input, ..
671            } => {
672                state.tool_count += 1;
673                let context = extract_tool_context(&name, &input);
674                state
675                    .pending_tools
676                    .insert(id, (name.clone(), context.clone()));
677
678                emit_subagent_progress_if_possible(
679                    parent_ctx,
680                    config,
681                    SubagentProgressUpdate {
682                        subagent_id,
683                        total_turns: state.total_turns,
684                        total_usage: &state.total_usage,
685                        tool_name: name,
686                        tool_context: context,
687                        completed: false,
688                        success: false,
689                        tool_count: state.tool_count,
690                    },
691                )
692                .await;
693            }
694            AgentEvent::ToolCallEnd {
695                id,
696                name,
697                display_name,
698                result,
699            } => {
700                let context = state
701                    .pending_tools
702                    .remove(&id)
703                    .map(|(_, ctx)| ctx)
704                    .unwrap_or_default();
705                let tool_success = result.success;
706                state.tool_logs.push(ToolCallLog {
707                    name: name.clone(),
708                    display_name: display_name.clone(),
709                    context: context.clone(),
710                    result: summarize_tool_result(&name, &result),
711                    success: tool_success,
712                    duration_ms: result.duration_ms,
713                });
714
715                emit_subagent_progress_if_possible(
716                    parent_ctx,
717                    config,
718                    SubagentProgressUpdate {
719                        subagent_id,
720                        total_turns: state.total_turns,
721                        total_usage: &state.total_usage,
722                        tool_name: name,
723                        tool_context: context,
724                        completed: true,
725                        success: tool_success,
726                        tool_count: state.tool_count,
727                    },
728                )
729                .await;
730            }
731            AgentEvent::TurnComplete { turn, usage, .. } => {
732                state.total_turns = turn;
733                state.total_usage.add(&usage);
734            }
735            AgentEvent::Done {
736                total_turns: turns, ..
737            } => {
738                state.total_turns = turns;
739                break;
740            }
741            AgentEvent::Refusal { text, .. } => {
742                let refusal_message =
743                    text.unwrap_or_else(|| "Subagent refused the request".to_string());
744                state.error_details = Some(refusal_message.clone());
745                state.final_response = refusal_message;
746                state.success = false;
747                break;
748            }
749            AgentEvent::Error { message, .. } => {
750                state.error_details = Some(message.clone());
751                state.final_response = message;
752                state.success = false;
753                break;
754            }
755            _ => {}
756        }
757    }
758    Ok(())
759}
760
761async fn emit_subagent_progress_if_possible<Ctx: Send + Sync + 'static>(
762    parent_ctx: &ToolContext<Ctx>,
763    config: &SubagentConfig,
764    update: SubagentProgressUpdate<'_>,
765) {
766    if let Err(error) = emit_subagent_progress(parent_ctx, config, update).await {
767        log::warn!("Failed to emit subagent progress event: {error}");
768    }
769}
770
771async fn emit_subagent_progress<Ctx: Send + Sync + 'static>(
772    parent_ctx: &ToolContext<Ctx>,
773    config: &SubagentConfig,
774    SubagentProgressUpdate {
775        subagent_id,
776        total_turns,
777        total_usage,
778        tool_name,
779        tool_context,
780        completed,
781        success,
782        tool_count,
783    }: SubagentProgressUpdate<'_>,
784) -> Result<()> {
785    let max_turns = config.max_turns.map(usize_to_u32_saturating);
786    let current_turn = Some(usize_to_u32_saturating(total_turns));
787
788    parent_ctx
789        .emit_event(AgentEvent::SubagentProgress {
790            subagent_id: subagent_id.to_string(),
791            subagent_name: config.name.clone(),
792            nickname: config.nickname.clone(),
793            child_thread_id: None,
794            child_root_task_id: None,
795            subagent_task_id: None,
796            max_turns,
797            current_turn,
798            model: config.model.clone(),
799            tool_name,
800            tool_context,
801            completed,
802            success,
803            tool_count,
804            total_tokens: subagent_total_tokens(total_usage),
805        })
806        .await
807}
808
/// Narrow a `usize` to `u32`, clamping anything that does not fit to `u32::MAX`.
fn usize_to_u32_saturating(value: usize) -> u32 {
    match u32::try_from(value) {
        Ok(narrowed) => narrowed,
        Err(_) => u32::MAX,
    }
}
812
/// Record OpenTelemetry data for a finished subagent run.
///
/// This emits three things: a synthetic `invoke_agent` span (started here, after
/// the run has completed), a `subagent_invocations` counter increment, and
/// `invoke_agent` token-usage histogram samples. The span is linked to the caller's
/// active span when there is a valid one.
#[cfg(feature = "otel")]
fn emit_subagent_observability<P, H, M, S>(tool: &SubagentTool<P, H, M, S>, result: &SubagentResult)
where
    P: LlmProvider + Clone + 'static,
    H: AgentHooks + Clone + 'static,
    M: MessageStore + 'static,
    S: StateStore + 'static,
{
    use crate::observability::{attrs, baggage, langfuse, metrics, provider_name, spans};
    use opentelemetry::Context;
    use opentelemetry::KeyValue;
    use opentelemetry::trace::{Span, TraceContextExt};

    // Capture the parent turn's SpanContext **before** starting the
    // synthetic subagent span so the link points at the caller, not at
    // ourselves.  When the agent runs without an active parent span
    // (e.g. running a subagent from a top-level test) the captured
    // context is invalid and `link_to_parent_turn` becomes a no-op.
    let parent_ctx = Context::current();
    let parent_span_context = parent_ctx.span().span_context().clone();

    let normalized_provider_name = provider_name::normalize(tool.provider.provider());
    let request_model = tool.provider.model().to_string();
    let agent_name = tool.config.name.clone();

    let mut span = spans::start_internal_span(
        "invoke_agent",
        vec![
            KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
            KeyValue::new(attrs::GEN_AI_AGENT_NAME, agent_name.clone()),
            KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, normalized_provider_name),
            KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, request_model.clone()),
            KeyValue::new(attrs::SDK_RUN_MODE, "loop"),
        ],
    );
    baggage::copy_baggage_to_active_span(&mut span);
    langfuse::tag_observation(&mut span, langfuse::ObservationType::Agent);
    if parent_span_context.is_valid() {
        spans::link_to_parent_turn(
            &mut span,
            &parent_span_context.trace_id().to_string(),
            &parent_span_context.span_id().to_string(),
        );
    }
    let outcome = if result.success { "done" } else { "error" };
    span.set_attribute(KeyValue::new(attrs::SDK_OUTCOME, outcome));
    // Saturate (like the file's other narrowing conversions) instead of reporting
    // an overflowing turn count as 0.
    span.set_attribute(attrs::kv_i64(
        attrs::SDK_TOTAL_TURNS,
        i64::try_from(result.total_turns).unwrap_or(i64::MAX),
    ));
    span.set_attribute(attrs::kv_i64(
        attrs::GEN_AI_USAGE_INPUT_TOKENS,
        i64::from(result.usage.input_tokens),
    ));
    span.set_attribute(attrs::kv_i64(
        attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
        i64::from(result.usage.output_tokens),
    ));
    if outcome == "error" {
        spans::set_span_error(&mut span, "agent_error", "subagent invocation failed");
    }
    span.end();

    // ── Metrics ─────────────────────────────────────────────────────
    //
    // Subagent invocations get a dedicated counter so dashboards can
    // tell parent-turn LLM activity apart from delegated subagent
    // work. We also fold the subagent's token usage into the shared
    // `gen_ai.client.token.usage` histogram so global token-spend
    // accounting stays accurate; the `gen_ai.operation.name` label
    // discriminates `invoke_agent` from regular `chat` calls.
    let metrics_handle = metrics::Metrics::global();
    metrics_handle.subagent_invocations.add(
        1,
        &[
            KeyValue::new(attrs::GEN_AI_AGENT_NAME, agent_name),
            KeyValue::new(attrs::SDK_OUTCOME, outcome),
        ],
    );
    record_subagent_token_usage(
        &metrics_handle,
        result,
        normalized_provider_name,
        &request_model,
    );
}
899
#[cfg(feature = "otel")]
/// Folds a subagent's token usage into the shared token-usage histogram.
///
/// Input and output counts are recorded as separate samples, distinguished by
/// the `gen_ai.token.type` label, and tagged with the `invoke_agent`
/// operation, the provider name, and the request model. A zero count is not
/// recorded.
fn record_subagent_token_usage(
    metrics: &crate::observability::metrics::Metrics,
    result: &SubagentResult,
    provider_name: &'static str,
    request_model: &str,
) {
    use crate::observability::attrs;
    use opentelemetry::KeyValue;

    let usage = &result.usage;
    [("input", usage.input_tokens), ("output", usage.output_tokens)]
        .into_iter()
        .filter(|&(_, tokens)| tokens != 0)
        .for_each(|(token_type, tokens)| {
            let labels = [
                KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
                KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider_name),
                KeyValue::new("gen_ai.token.type", token_type),
                KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, request_model.to_string()),
            ];
            metrics.token_usage.record(u64::from(tokens), &labels);
        });
}
930
931#[cfg(not(feature = "otel"))]
932const fn emit_subagent_observability<P, H, M, S>(
933    _tool: &SubagentTool<P, H, M, S>,
934    _result: &SubagentResult,
935) where
936    P: LlmProvider + Clone + 'static,
937    H: AgentHooks + Clone + 'static,
938    M: MessageStore + 'static,
939    S: StateStore + 'static,
940{
941}
942
943/// Extracts context information from tool input for display.
944fn extract_tool_context(name: &str, input: &Value) -> String {
945    match name {
946        "read" => input
947            .get("file_path")
948            .or_else(|| input.get("path"))
949            .and_then(Value::as_str)
950            .unwrap_or("")
951            .to_string(),
952        "write" | "edit" => input
953            .get("file_path")
954            .or_else(|| input.get("path"))
955            .and_then(Value::as_str)
956            .unwrap_or("")
957            .to_string(),
958        "bash" => {
959            let cmd = input.get("command").and_then(Value::as_str).unwrap_or("");
960            // Truncate long commands (UTF-8 safe)
961            if cmd.len() > 60 {
962                format!("{}...", crate::primitive_tools::truncate_str(cmd, 57))
963            } else {
964                cmd.to_string()
965            }
966        }
967        "glob" | "grep" => input
968            .get("pattern")
969            .and_then(Value::as_str)
970            .unwrap_or("")
971            .to_string(),
972        "web_search" => input
973            .get("query")
974            .and_then(Value::as_str)
975            .unwrap_or("")
976            .to_string(),
977        _ => String::new(),
978    }
979}
980
981/// Summarizes tool result for logging.
982fn summarize_tool_result(name: &str, result: &ToolResult) -> String {
983    if !result.success {
984        let first_line = result.output.lines().next().unwrap_or("Error");
985        return if first_line.len() > 50 {
986            format!(
987                "{}...",
988                crate::primitive_tools::truncate_str(first_line, 47)
989            )
990        } else {
991            first_line.to_string()
992        };
993    }
994
995    match name {
996        "read" => {
997            let line_count = result.output.lines().count();
998            format!("{line_count} lines")
999        }
1000        "write" => "wrote file".to_string(),
1001        "edit" => "edited".to_string(),
1002        "bash" => {
1003            let lines: Vec<&str> = result.output.lines().collect();
1004            if lines.is_empty() {
1005                "done".to_string()
1006            } else if lines.len() == 1 {
1007                let line = lines[0];
1008                if line.len() > 50 {
1009                    format!("{}...", crate::primitive_tools::truncate_str(line, 47))
1010                } else {
1011                    line.to_string()
1012                }
1013            } else {
1014                format!("{} lines", lines.len())
1015            }
1016        }
1017        "glob" => {
1018            let count = result.output.lines().count();
1019            format!("{count} files")
1020        }
1021        "grep" => {
1022            let count = result.output.lines().count();
1023            format!("{count} matches")
1024        }
1025        _ => {
1026            let line_count = result.output.lines().count();
1027            if line_count == 0 {
1028                "done".to_string()
1029            } else {
1030                format!("{line_count} lines")
1031            }
1032        }
1033    }
1034}
1035
1036impl<P, H, M, S, Ctx> Tool<Ctx> for SubagentTool<P, H, M, S>
1037where
1038    P: LlmProvider + Clone + 'static,
1039    H: AgentHooks + Clone + 'static,
1040    M: MessageStore + 'static,
1041    S: StateStore + 'static,
1042    Ctx: Send + Sync + 'static,
1043{
1044    type Name = DynamicToolName;
1045
1046    fn name(&self) -> DynamicToolName {
1047        DynamicToolName::new(format!("subagent_{}", self.config.name))
1048    }
1049
1050    fn display_name(&self) -> &'static str {
1051        self.cached_display_name
1052    }
1053
1054    fn description(&self) -> &'static str {
1055        self.cached_description
1056    }
1057
1058    fn input_schema(&self) -> Value {
1059        json!({
1060            "type": "object",
1061            "properties": {
1062                "task": {
1063                    "type": "string",
1064                    "description": "The task or question for the subagent to handle"
1065                }
1066            },
1067            "required": ["task"]
1068        })
1069    }
1070
1071    fn tier(&self) -> ToolTier {
1072        // Subagent spawning requires confirmation
1073        ToolTier::Confirm
1074    }
1075
1076    async fn execute(&self, ctx: &ToolContext<Ctx>, input: Value) -> Result<ToolResult> {
1077        let task = input
1078            .get("task")
1079            .and_then(Value::as_str)
1080            .context("Missing 'task' parameter")?;
1081
1082        // ── Depth limit enforcement ───────────────────────────────────
1083        let current_depth = ctx
1084            .metadata
1085            .get(METADATA_SUBAGENT_DEPTH)
1086            .and_then(Value::as_u64)
1087            .unwrap_or(0);
1088        let max_depth = ctx
1089            .metadata
1090            .get(METADATA_MAX_SUBAGENT_DEPTH)
1091            .and_then(Value::as_u64)
1092            .unwrap_or(3); // default: 3 levels deep
1093
1094        if current_depth >= max_depth {
1095            bail!(
1096                "Subagent depth limit exceeded ({current_depth}/{max_depth}). \
1097                 Cannot spawn nested subagent '{}' — maximum nesting depth reached.",
1098                self.config.name
1099            );
1100        }
1101
1102        // ── Thread limit enforcement (semaphore) ──────────────────────
1103        let _permit = if let Some(ref sem) = ctx.subagent_semaphore() {
1104            match sem.clone().try_acquire_owned() {
1105                Ok(permit) => Some(permit),
1106                Err(_) => {
1107                    return Ok(ToolResult {
1108                        success: false,
1109                        output: format!(
1110                            "Cannot spawn subagent '{}': maximum concurrent subagent limit reached. \
1111                             Try again when another subagent completes.",
1112                            self.config.name
1113                        ),
1114                        data: None,
1115                        documents: Vec::new(),
1116                        duration_ms: Some(0),
1117                    });
1118                }
1119            }
1120        } else {
1121            None
1122        };
1123
1124        // Generate a unique ID for this subagent execution
1125        let subagent_id = format!(
1126            "{}_{:x}",
1127            self.config.name,
1128            std::time::SystemTime::now()
1129                .duration_since(std::time::UNIX_EPOCH)
1130                .unwrap_or_default()
1131                .as_nanos()
1132        );
1133
1134        // Use the context's cancellation token if available, otherwise create a standalone one.
1135        // This ensures that when a parent agent is cancelled, subagents are also cancelled.
1136        let cancel_token = ctx.cancel_token().unwrap_or_default();
1137
1138        let result = self
1139            .run_subagent(task, subagent_id, ctx, cancel_token)
1140            .await?;
1141
1142        Ok(ToolResult {
1143            success: result.success,
1144            output: result.final_response.clone(),
1145            data: Some(serde_json::to_value(&result).unwrap_or_default()),
1146            documents: Vec::new(),
1147            duration_ms: Some(result.duration_ms),
1148        })
1149    }
1150}
1151
1152#[cfg(test)]
1153mod tests {
1154    use super::*;
1155    use crate::authority::{EventAuthority, LocalEventAuthority};
1156    use crate::events::{AgentEvent, AgentEventEnvelope};
1157    use crate::llm::{ChatOutcome, ChatRequest, ChatResponse, ContentBlock, StopReason, Usage};
1158    use crate::stores::{EventStore, InMemoryEventStore, StoredTurnEvents};
1159    use anyhow::{Context, Result, bail};
1160    use async_trait::async_trait;
1161    use tokio::sync::Mutex;
1162
1163    #[derive(Clone)]
1164    struct TestProvider {
1165        responses: Arc<Mutex<Vec<ChatOutcome>>>,
1166        delay: Option<Duration>,
1167    }
1168
1169    impl TestProvider {
1170        fn new(responses: Vec<ChatOutcome>) -> Self {
1171            Self {
1172                responses: Arc::new(Mutex::new(responses)),
1173                delay: None,
1174            }
1175        }
1176
1177        fn with_delay(mut self, delay: Duration) -> Self {
1178            self.delay = Some(delay);
1179            self
1180        }
1181
1182        fn text_response(text: &str) -> ChatOutcome {
1183            ChatOutcome::Success(ChatResponse {
1184                id: "resp_text".to_string(),
1185                content: vec![ContentBlock::Text {
1186                    text: text.to_string(),
1187                }],
1188                model: "test-model".to_string(),
1189                stop_reason: Some(StopReason::EndTurn),
1190                usage: Usage {
1191                    input_tokens: 10,
1192                    output_tokens: 20,
1193                    cached_input_tokens: 0,
1194                    cache_creation_input_tokens: 0,
1195                },
1196            })
1197        }
1198
1199        fn tool_use_response(tool_id: &str, tool_name: &str, input: Value) -> ChatOutcome {
1200            ChatOutcome::Success(ChatResponse {
1201                id: "resp_tool".to_string(),
1202                content: vec![ContentBlock::ToolUse {
1203                    id: tool_id.to_string(),
1204                    name: tool_name.to_string(),
1205                    input,
1206                    thought_signature: None,
1207                }],
1208                model: "test-model".to_string(),
1209                stop_reason: Some(StopReason::ToolUse),
1210                usage: Usage {
1211                    input_tokens: 15,
1212                    output_tokens: 25,
1213                    cached_input_tokens: 0,
1214                    cache_creation_input_tokens: 0,
1215                },
1216            })
1217        }
1218
1219        fn refusal_response(text: Option<&str>) -> ChatOutcome {
1220            let content = text.map_or_else(Vec::new, |text| {
1221                vec![ContentBlock::Text {
1222                    text: text.to_string(),
1223                }]
1224            });
1225            ChatOutcome::Success(ChatResponse {
1226                id: "resp_refusal".to_string(),
1227                content,
1228                model: "test-model".to_string(),
1229                stop_reason: Some(StopReason::Refusal),
1230                usage: Usage {
1231                    input_tokens: 12,
1232                    output_tokens: 0,
1233                    cached_input_tokens: 0,
1234                    cache_creation_input_tokens: 0,
1235                },
1236            })
1237        }
1238    }
1239
1240    #[async_trait]
1241    impl LlmProvider for TestProvider {
1242        async fn chat(&self, _request: ChatRequest) -> Result<ChatOutcome> {
1243            if let Some(delay) = self.delay {
1244                tokio::time::sleep(delay).await;
1245            }
1246
1247            let mut responses = self.responses.lock().await;
1248            if responses.is_empty() {
1249                Ok(Self::text_response("default"))
1250            } else {
1251                Ok(responses.remove(0))
1252            }
1253        }
1254
1255        fn model(&self) -> &'static str {
1256            "test-model"
1257        }
1258
1259        fn provider(&self) -> &'static str {
1260            "mock"
1261        }
1262    }
1263
1264    struct TestEchoTool;
1265
1266    impl Tool<()> for TestEchoTool {
1267        type Name = DynamicToolName;
1268
1269        fn name(&self) -> DynamicToolName {
1270            DynamicToolName::new("echo")
1271        }
1272
1273        fn display_name(&self) -> &'static str {
1274            "Echo"
1275        }
1276
1277        fn description(&self) -> &'static str {
1278            "Echo the input"
1279        }
1280
1281        fn input_schema(&self) -> Value {
1282            json!({
1283                "type": "object",
1284                "properties": {
1285                    "message": { "type": "string" }
1286                },
1287                "required": ["message"]
1288            })
1289        }
1290
1291        fn tier(&self) -> ToolTier {
1292            ToolTier::Observe
1293        }
1294
1295        async fn execute(&self, _ctx: &ToolContext<()>, input: Value) -> Result<ToolResult> {
1296            let message = input
1297                .get("message")
1298                .and_then(Value::as_str)
1299                .context("missing echo message")?;
1300            Ok(ToolResult::success(format!("Echo: {message}")))
1301        }
1302    }
1303
1304    #[derive(Clone, Default)]
1305    struct RecordingEventStore {
1306        inner: Arc<InMemoryEventStore>,
1307        appended: Arc<Mutex<Vec<(ThreadId, usize, AgentEventEnvelope)>>>,
1308    }
1309
1310    impl RecordingEventStore {
1311        async fn appended_events(&self) -> Vec<(ThreadId, usize, AgentEventEnvelope)> {
1312            self.appended.lock().await.clone()
1313        }
1314    }
1315
1316    #[async_trait]
1317    impl EventStore for RecordingEventStore {
1318        async fn append(
1319            &self,
1320            thread_id: &ThreadId,
1321            turn: usize,
1322            envelope: AgentEventEnvelope,
1323        ) -> Result<()> {
1324            self.appended
1325                .lock()
1326                .await
1327                .push((thread_id.clone(), turn, envelope.clone()));
1328            self.inner.append(thread_id, turn, envelope).await
1329        }
1330
1331        async fn finish_turn(&self, thread_id: &ThreadId, turn: usize) -> Result<()> {
1332            self.inner.finish_turn(thread_id, turn).await
1333        }
1334
1335        async fn get_turn(
1336            &self,
1337            thread_id: &ThreadId,
1338            turn: usize,
1339        ) -> Result<Option<StoredTurnEvents>> {
1340            self.inner.get_turn(thread_id, turn).await
1341        }
1342
1343        async fn get_turns(&self, thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
1344            self.inner.get_turns(thread_id).await
1345        }
1346
1347        async fn clear(&self, thread_id: &ThreadId) -> Result<()> {
1348            self.inner.clear(thread_id).await
1349        }
1350    }
1351
1352    #[derive(Clone, Default)]
1353    struct AlwaysFailAppendEventStore;
1354
1355    #[async_trait]
1356    impl EventStore for AlwaysFailAppendEventStore {
1357        async fn append(
1358            &self,
1359            _thread_id: &ThreadId,
1360            _turn: usize,
1361            _envelope: AgentEventEnvelope,
1362        ) -> Result<()> {
1363            bail!("append failed")
1364        }
1365
1366        async fn finish_turn(&self, _thread_id: &ThreadId, _turn: usize) -> Result<()> {
1367            Ok(())
1368        }
1369
1370        async fn get_turn(
1371            &self,
1372            _thread_id: &ThreadId,
1373            _turn: usize,
1374        ) -> Result<Option<StoredTurnEvents>> {
1375            Ok(None)
1376        }
1377
1378        async fn get_turns(&self, _thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
1379            Ok(Vec::new())
1380        }
1381
1382        async fn clear(&self, _thread_id: &ThreadId) -> Result<()> {
1383            Ok(())
1384        }
1385    }
1386
1387    #[derive(Clone, Default)]
1388    struct NoReadAfterFailureEventStore {
1389        inner: Arc<InMemoryEventStore>,
1390    }
1391
1392    #[async_trait]
1393    impl EventStore for NoReadAfterFailureEventStore {
1394        async fn append(
1395            &self,
1396            thread_id: &ThreadId,
1397            turn: usize,
1398            envelope: AgentEventEnvelope,
1399        ) -> Result<()> {
1400            self.inner.append(thread_id, turn, envelope).await
1401        }
1402
1403        async fn finish_turn(&self, thread_id: &ThreadId, turn: usize) -> Result<()> {
1404            self.inner.finish_turn(thread_id, turn).await
1405        }
1406
1407        async fn get_turn(
1408            &self,
1409            thread_id: &ThreadId,
1410            turn: usize,
1411        ) -> Result<Option<StoredTurnEvents>> {
1412            self.inner.get_turn(thread_id, turn).await
1413        }
1414
1415        async fn get_turns(&self, _thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
1416            bail!("get_events should not be called after subagent failure")
1417        }
1418
1419        async fn clear(&self, thread_id: &ThreadId) -> Result<()> {
1420            self.inner.clear(thread_id).await
1421        }
1422    }
1423
1424    #[derive(Clone, Default)]
1425    struct PanicProvider;
1426
1427    #[async_trait]
1428    impl LlmProvider for PanicProvider {
1429        async fn chat(&self, _request: ChatRequest) -> Result<ChatOutcome> {
1430            // Panic isolation catches this at the run-loop
1431            // boundary and turns it into a structured `Error`, so the
1432            // parent classifies it as an agent error rather than a
1433            // dropped channel. The literal text is asserted on by
1434            // `test_run_subagent_panic_classified_as_error_not_disconnected`.
1435            panic!("panic provider should disconnect subagent");
1436        }
1437
1438        fn model(&self) -> &'static str {
1439            "panic-model"
1440        }
1441
1442        fn provider(&self) -> &'static str {
1443            "panic"
1444        }
1445    }
1446
1447    #[test]
1448    fn test_subagent_config_builder() {
1449        let config = SubagentConfig::new("test")
1450            .with_system_prompt("Test prompt")
1451            .with_max_turns(5)
1452            .with_timeout_ms(30000);
1453
1454        assert_eq!(config.name, "test");
1455        assert_eq!(config.system_prompt, "Test prompt");
1456        assert_eq!(config.max_turns, Some(5));
1457        assert_eq!(config.timeout_ms, Some(30000));
1458    }
1459
1460    #[test]
1461    fn test_subagent_config_defaults() {
1462        let config = SubagentConfig::new("default");
1463
1464        assert_eq!(config.name, "default");
1465        assert!(config.system_prompt.is_empty());
1466        assert_eq!(config.max_turns, None);
1467        assert_eq!(config.timeout_ms, None);
1468    }
1469
1470    #[test]
1471    fn test_subagent_result_serialization() -> Result<()> {
1472        let result = SubagentResult {
1473            name: "test".to_string(),
1474            final_response: "Done".to_string(),
1475            total_turns: 3,
1476            tool_count: 5,
1477            tool_logs: vec![
1478                ToolCallLog {
1479                    name: "read".to_string(),
1480                    display_name: "Read file".to_string(),
1481                    context: "/tmp/test.rs".to_string(),
1482                    result: "50 lines".to_string(),
1483                    success: true,
1484                    duration_ms: Some(10),
1485                },
1486                ToolCallLog {
1487                    name: "grep".to_string(),
1488                    display_name: "Grep TODO".to_string(),
1489                    context: "TODO".to_string(),
1490                    result: "3 matches".to_string(),
1491                    success: true,
1492                    duration_ms: Some(5),
1493                },
1494            ],
1495            usage: TokenUsage::default(),
1496            success: true,
1497            duration_ms: 1000,
1498            error_details: None,
1499            failed_tool: None,
1500        };
1501
1502        let json = serde_json::to_string(&result).context("failed to serialize subagent result")?;
1503        assert!(json.contains("test"));
1504        assert!(json.contains("Done"));
1505        assert!(json.contains("tool_count"));
1506        assert!(json.contains("tool_logs"));
1507        assert!(json.contains("/tmp/test.rs"));
1508
1509        Ok(())
1510    }
1511
1512    #[test]
1513    fn test_subagent_result_field_extraction() -> Result<()> {
1514        let result = SubagentResult {
1515            name: "explore".to_string(),
1516            final_response: "Found 3 config files".to_string(),
1517            total_turns: 2,
1518            tool_count: 5,
1519            tool_logs: vec![ToolCallLog {
1520                name: "glob".to_string(),
1521                display_name: "Glob config files".to_string(),
1522                context: "**/*.toml".to_string(),
1523                result: "3 files".to_string(),
1524                success: true,
1525                duration_ms: Some(15),
1526            }],
1527            usage: TokenUsage {
1528                input_tokens: 1500,
1529                output_tokens: 500,
1530                ..Default::default()
1531            },
1532            success: true,
1533            duration_ms: 2500,
1534            error_details: None,
1535            failed_tool: None,
1536        };
1537
1538        let value =
1539            serde_json::to_value(&result).context("failed to convert subagent result to json")?;
1540
1541        let tool_count = value.get("tool_count").and_then(Value::as_u64);
1542        assert_eq!(tool_count, Some(5));
1543
1544        let usage = value.get("usage").context("missing usage field")?;
1545        let input_tokens = usage.get("input_tokens").and_then(Value::as_u64);
1546        let output_tokens = usage.get("output_tokens").and_then(Value::as_u64);
1547        assert_eq!(input_tokens, Some(1500));
1548        assert_eq!(output_tokens, Some(500));
1549
1550        let logs = value
1551            .get("tool_logs")
1552            .and_then(Value::as_array)
1553            .context("missing tool_logs array")?;
1554        assert_eq!(logs.len(), 1);
1555
1556        let first_log = &logs[0];
1557        assert_eq!(first_log.get("name").and_then(Value::as_str), Some("glob"));
1558        assert_eq!(
1559            first_log.get("context").and_then(Value::as_str),
1560            Some("**/*.toml")
1561        );
1562        assert_eq!(
1563            first_log.get("result").and_then(Value::as_str),
1564            Some("3 files")
1565        );
1566        assert_eq!(
1567            first_log.get("success").and_then(Value::as_bool),
1568            Some(true)
1569        );
1570
1571        Ok(())
1572    }
1573
    /// The subagent runs on its own thread ID.
    ///
    /// Only `SubagentProgress` events land on the parent turn. The child's
    /// tool calls and `Done` event are stored under a separate thread.
    #[tokio::test]
    async fn test_run_subagent_uses_isolated_child_thread() -> Result<()> {
        // One recording store serves both parent and child, so the test can
        // see exactly which thread each appended event was written to.
        let event_store = Arc::new(RecordingEventStore::default());
        // Script: one echo tool call, then a final text answer.
        let provider = Arc::new(TestProvider::new(vec![
            TestProvider::tool_use_response("tool_1", "echo", json!({ "message": "child" })),
            TestProvider::text_response("Subagent complete"),
        ]));
        let mut tools = ToolRegistry::new();
        tools.register(TestEchoTool);

        let tool = SubagentTool::new(SubagentConfig::new("worker"), provider, Arc::new(tools), {
            let store = Arc::clone(&event_store);
            move || -> Arc<dyn EventStore> { store.clone() }
        });
        let parent_thread = ThreadId::new();
        let parent_ctx = ToolContext::new(()).with_event_store(
            event_store.clone(),
            parent_thread.clone(),
            1,
            Arc::new(LocalEventAuthority::new()),
        );

        let result = tool
            .run_subagent(
                "Inspect the repo",
                "subagent_1".to_string(),
                &parent_ctx,
                CancellationToken::new(),
            )
            .await?;

        assert!(result.success);
        assert_eq!(result.tool_count, 1);
        assert_eq!(result.tool_logs.len(), 1);

        // The parent's turn 1 must contain only progress notifications.
        let parent_turn = event_store
            .get_turn(&parent_thread, 1)
            .await?
            .context("missing parent turn")?;
        assert!(!parent_turn.events.is_empty());
        assert!(
            parent_turn
                .events
                .iter()
                .all(|envelope| { matches!(envelope.event, AgentEvent::SubagentProgress { .. }) })
        );

        // The first appended thread that is not the parent is taken to be the child.
        let appended = event_store.appended_events().await;
        let child_thread = appended
            .iter()
            .map(|(thread_id, _, _)| thread_id.clone())
            .find(|thread_id| thread_id != &parent_thread)
            .context("missing child thread events")?;
        let child_turn = event_store
            .get_turn(&child_thread, 1)
            .await?
            .context("missing child turn")?;
        let child_events = event_store.get_events(&child_thread).await?;

        // The child's own history holds its tool call and completion event.
        assert!(
            child_turn
                .events
                .iter()
                .any(|envelope| { matches!(envelope.event, AgentEvent::ToolCallStart { .. }) })
        );
        assert!(
            child_events
                .iter()
                .any(|envelope| { matches!(envelope.event, AgentEvent::Done { .. }) })
        );

        Ok(())
    }
1647
    /// Exceeding `timeout_ms` yields a failed result with timeout details.
    ///
    /// The child store errors on `get_turns`, so the test also fails if
    /// history is read after the timeout.
    #[tokio::test]
    async fn test_run_subagent_timeout_marks_result_as_failed() -> Result<()> {
        let event_store = Arc::new(NoReadAfterFailureEventStore::default());
        // The provider sleeps 50ms, well past the 10ms timeout below.
        let provider = Arc::new(
            TestProvider::new(vec![TestProvider::text_response("Too late")])
                .with_delay(Duration::from_millis(50)),
        );
        let tool = SubagentTool::new(
            SubagentConfig::new("worker").with_timeout_ms(10),
            provider,
            Arc::new(ToolRegistry::<()>::new()),
            {
                let store = Arc::clone(&event_store);
                move || -> Arc<dyn EventStore> { store.clone() }
            },
        );

        let result = tool
            .run_subagent(
                "Take too long",
                "subagent_timeout".to_string(),
                &ToolContext::new(()),
                CancellationToken::new(),
            )
            .await?;

        assert!(!result.success);
        assert_eq!(result.final_response, "Subagent timed out");
        assert!(
            result
                .error_details
                .context("missing timeout details")?
                .contains("timed out")
        );

        Ok(())
    }
1685
    /// Failures while writing progress to the parent's event store must not
    /// fail an otherwise successful subagent run.
    #[tokio::test]
    async fn test_run_subagent_progress_failures_do_not_abort_successful_runs() -> Result<()> {
        let provider = Arc::new(TestProvider::new(vec![
            TestProvider::tool_use_response("tool_1", "echo", json!({ "message": "child" })),
            TestProvider::text_response("Subagent complete"),
        ]));
        let mut tools = ToolRegistry::new();
        tools.register(TestEchoTool);

        // The child writes to a working in-memory store...
        let tool = SubagentTool::new(SubagentConfig::new("worker"), provider, Arc::new(tools), {
            move || -> Arc<dyn EventStore> { Arc::new(InMemoryEventStore::new()) }
        });
        // ...while every append to the parent's store fails.
        let parent_ctx = ToolContext::new(()).with_event_store(
            Arc::new(AlwaysFailAppendEventStore),
            ThreadId::new(),
            1,
            Arc::new(LocalEventAuthority::new()),
        );

        let result = tool
            .run_subagent(
                "Inspect the repo",
                "subagent_progress".to_string(),
                &parent_ctx,
                CancellationToken::new(),
            )
            .await?;

        assert!(result.success);
        assert_eq!(result.final_response, "Subagent complete");
        assert_eq!(result.tool_count, 1);

        Ok(())
    }
1720
    /// A panicking provider must produce a structured error carrying the
    /// panic message, not the "ended unexpectedly" disconnect result.
    #[tokio::test]
    async fn test_run_subagent_panic_classified_as_error_not_disconnected() -> Result<()> {
        // A subagent whose LLM provider panics must surface as a
        // structured `Error` — NOT the `Disconnected` ("ended
        // unexpectedly") path. Before panic isolation the
        // panic unwound the spawned run task, dropped `state_tx`, and
        // the parent saw a `RecvError` it misclassified as
        // `Disconnected`. The run-loop catch_unwind boundary now turns
        // the panic into `AgentRunState::Error`, which the subagent
        // classifies correctly.
        let tool = SubagentTool::new(
            SubagentConfig::new("worker"),
            Arc::new(PanicProvider),
            Arc::new(ToolRegistry::<()>::new()),
            move || -> Arc<dyn EventStore> { Arc::new(InMemoryEventStore::new()) },
        );

        let result = tool
            .run_subagent(
                "Crash",
                "subagent_panic".to_string(),
                &ToolContext::new(()),
                CancellationToken::new(),
            )
            .await?;

        assert!(!result.success);
        // The disconnect path must NOT be taken: a caught panic is a
        // structured agent error, not an opaque dropped channel.
        assert_ne!(result.final_response, "Subagent ended unexpectedly");
        let details = result
            .error_details
            .context("panicking subagent must carry structured error details")?;
        assert!(
            !details.contains("ended before returning a final state"),
            "panic must not be classified as Disconnected; got {details:?}",
        );
        assert!(
            details.contains("panicked"),
            "structured error should reflect the panic; got {details:?}",
        );
        assert!(
            details.contains("panic provider should disconnect subagent"),
            "structured error should carry the original panic message; got {details:?}",
        );

        Ok(())
    }
1769
1770    #[tokio::test]
1771    async fn test_run_subagent_refusal_marks_result_as_failed() -> Result<()> {
1772        let tool = SubagentTool::new(
1773            SubagentConfig::new("worker"),
1774            Arc::new(TestProvider::new(vec![TestProvider::refusal_response(
1775                Some("Refused for policy reasons"),
1776            )])),
1777            Arc::new(ToolRegistry::<()>::new()),
1778            || Arc::new(InMemoryEventStore::new()),
1779        );
1780
1781        let result = tool
1782            .run_subagent(
1783                "Refuse",
1784                "subagent_refusal".to_string(),
1785                &ToolContext::new(()),
1786                CancellationToken::new(),
1787            )
1788            .await?;
1789
1790        assert!(!result.success);
1791        assert_eq!(result.final_response, "Refused for policy reasons");
1792        assert_eq!(
1793            result.error_details.as_deref(),
1794            Some("Refused for policy reasons")
1795        );
1796
1797        Ok(())
1798    }
1799
1800    #[tokio::test]
1801    async fn test_run_subagent_cancelled_marks_result_as_failed() -> Result<()> {
1802        let tool = SubagentTool::new(
1803            SubagentConfig::new("worker"),
1804            Arc::new(
1805                TestProvider::new(vec![TestProvider::text_response("Too late")])
1806                    .with_delay(Duration::from_millis(50)),
1807            ),
1808            Arc::new(ToolRegistry::<()>::new()),
1809            || Arc::new(InMemoryEventStore::new()),
1810        );
1811        let cancel_token = CancellationToken::new();
1812        cancel_token.cancel();
1813
1814        let result = tool
1815            .run_subagent(
1816                "Cancel",
1817                "subagent_cancelled".to_string(),
1818                &ToolContext::new(()),
1819                cancel_token,
1820            )
1821            .await?;
1822
1823        assert!(!result.success);
1824        assert_eq!(result.final_response, "Subagent cancelled");
1825        assert!(
1826            result
1827                .error_details
1828                .context("missing cancellation details")?
1829                .contains("cancelled")
1830        );
1831
1832        Ok(())
1833    }
1834
1835    #[tokio::test]
1836    async fn test_run_subagent_llm_error_does_not_infer_failed_tool() -> Result<()> {
1837        let provider = Arc::new(TestProvider::new(vec![
1838            ChatOutcome::ServerError("llm transport failed".to_string()),
1839            ChatOutcome::ServerError("llm transport failed".to_string()),
1840            ChatOutcome::ServerError("llm transport failed".to_string()),
1841            ChatOutcome::ServerError("llm transport failed".to_string()),
1842            ChatOutcome::ServerError("llm transport failed".to_string()),
1843            ChatOutcome::ServerError("llm transport failed".to_string()),
1844        ]));
1845        let mut tools = ToolRegistry::new();
1846        tools.register(TestEchoTool);
1847
1848        let tool = SubagentTool::new(
1849            SubagentConfig::new("worker"),
1850            provider,
1851            Arc::new(tools),
1852            || Arc::new(InMemoryEventStore::new()),
1853        );
1854
1855        let result = tool
1856            .run_subagent(
1857                "Trigger an llm failure",
1858                "subagent_llm_error".to_string(),
1859                &ToolContext::new(()),
1860                CancellationToken::new(),
1861            )
1862            .await?;
1863
1864        assert!(!result.success);
1865        assert!(result.failed_tool.is_none());
1866        assert!(
1867            result
1868                .error_details
1869                .as_deref()
1870                .unwrap_or_default()
1871                .contains("Server error")
1872        );
1873
1874        Ok(())
1875    }
1876
1877    #[tokio::test]
1878    async fn test_replay_subagent_events_stops_after_error() -> Result<()> {
1879        let event_store: Arc<dyn EventStore> = Arc::new(InMemoryEventStore::new());
1880        let thread_id = ThreadId::new();
1881        let authority = LocalEventAuthority::new();
1882        event_store
1883            .append(
1884                &thread_id,
1885                1,
1886                authority.wrap(AgentEvent::Error {
1887                    message: "subagent boom".to_string(),
1888                    recoverable: false,
1889                }),
1890            )
1891            .await?;
1892        event_store
1893            .append(
1894                &thread_id,
1895                1,
1896                authority.wrap(AgentEvent::Text {
1897                    message_id: "msg_after_error".to_string(),
1898                    text: "should not be appended".to_string(),
1899                }),
1900            )
1901            .await?;
1902
1903        let mut state = SubagentExecutionState::new();
1904        replay_subagent_events(
1905            &event_store,
1906            &thread_id,
1907            &ToolContext::new(()),
1908            &SubagentConfig::new("worker"),
1909            "subagent_error",
1910            &mut state,
1911        )
1912        .await?;
1913
1914        assert!(!state.success);
1915        assert_eq!(state.final_response, "subagent boom");
1916        assert_eq!(state.error_details.as_deref(), Some("subagent boom"));
1917
1918        Ok(())
1919    }
1920}