Skip to main content

agent_sdk/
subagent.rs

//! Subagent support for spawning child agents.
//!
//! This module provides the ability to spawn subagents from within an agent.
//! Subagents are isolated agent instances that run to completion and return
//! only their final response to the parent agent.
//!
//! # Overview
//!
//! Subagents are useful for:
//! - Delegating complex subtasks to specialized agents
//! - Running parallel investigations
//! - Isolating context for specific operations
//!
//! # Example
//!
//! ```ignore
//! use agent_sdk::subagent::{SubagentTool, SubagentConfig};
//!
//! let config = SubagentConfig::new("researcher")
//!     .with_system_prompt("You are a research specialist...")
//!     .with_max_turns(10);
//!
//! let tool = SubagentTool::new(config, provider, tools, || {
//!     std::sync::Arc::new(agent_sdk::InMemoryEventStore::new())
//! });
//! registry.register(tool);
//! ```
//!
//! # Behavior
//!
//! When a subagent runs:
//! 1. A new, isolated conversation thread is created for it
//! 2. The subagent runs until it completes or reaches its turn limit; a
//!    configured timeout or cancellation of the parent can stop it earlier
//! 3. Only the final text response is returned to the parent
//! 4. The subagent's intermediate tool calls are not part of that response;
//!    they are surfaced to the parent only as progress events and as
//!    summarized tool logs in the result

37mod factory;
38
39pub use factory::SubagentFactory;
40
41use crate::events::AgentEvent;
42use crate::hooks::{AgentHooks, DefaultHooks};
43use crate::llm::LlmProvider;
44use crate::stores::{EventStore, InMemoryStore, MessageStore, StateStore};
45use crate::tools::{DynamicToolName, Tool, ToolContext, ToolRegistry};
46use crate::types::{AgentConfig, AgentInput, ThreadId, TokenUsage, ToolResult, ToolTier};
47use anyhow::{Context, Result, bail};
48use serde::{Deserialize, Serialize};
49use serde_json::{Value, json};
50use std::collections::HashMap;
51use std::sync::{Arc, Mutex, OnceLock, PoisonError};
52use std::time::{Duration, Instant};
53use tokio_util::sync::CancellationToken;
54
/// Intern and return the `(display name, description)` strings for the
/// subagent called `name`.
///
/// The strings are leaked to obtain `&'static str` values, but each distinct
/// name is leaked only once: later calls with the same name hand back the
/// same cached pointers. Because `SubagentFactory` builds a fresh
/// `SubagentTool` per request, this keeps the leak bounded by the number of
/// distinct subagent names instead of growing with every request.
fn cached_tool_strings(name: &str) -> (&'static str, &'static str) {
    static INTERNED: OnceLock<Mutex<HashMap<String, (&'static str, &'static str)>>> =
        OnceLock::new();

    // Recover from poisoning rather than panicking: the cache only holds
    // immutable leaked strings, so a panic elsewhere cannot corrupt them.
    let mut interned = INTERNED
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap_or_else(PoisonError::into_inner);

    // Fast path: reuse the strings leaked for this name earlier.
    if let Some(&strings) = interned.get(name) {
        return strings;
    }

    let leak = |text: String| -> &'static str { Box::leak(text.into_boxed_str()) };
    let strings = (
        leak(format!("Subagent: {name}")),
        leak(format!(
            "Spawn a subagent named '{name}' to handle a task. The subagent will work independently and return only its final response."
        )),
    );
    interned.insert(name.to_owned(), strings);
    strings
}
80
/// Tool-context metadata key holding how deeply the current run is nested
/// inside subagents.
///
/// Every nested subagent run records its parent's depth plus one under this
/// key. Tools compare it against [`METADATA_MAX_SUBAGENT_DEPTH`] to bound
/// nesting.
pub const METADATA_SUBAGENT_DEPTH: &str = "subagent_depth";

/// Tool-context metadata key for the deepest subagent nesting the host allows.
///
/// Set by the host application (e.g. bip) to prevent unbounded recursion; it
/// is copied unchanged into each child run's metadata.
pub const METADATA_MAX_SUBAGENT_DEPTH: &str = "max_subagent_depth";
91
92/// Configuration for a subagent.
93#[derive(Clone, Debug, Serialize, Deserialize)]
94pub struct SubagentConfig {
95    /// Name of the subagent (for identification).
96    pub name: String,
97    /// Human-friendly nickname assigned by the parent (e.g., "Zara").
98    pub nickname: Option<String>,
99    /// System prompt for the subagent.
100    pub system_prompt: String,
101    /// Maximum number of turns before stopping.
102    pub max_turns: Option<usize>,
103    /// Optional timeout in milliseconds.
104    pub timeout_ms: Option<u64>,
105    /// Optional model override for this subagent.
106    #[serde(default, skip_serializing_if = "Option::is_none")]
107    pub model: Option<String>,
108}
109
110impl SubagentConfig {
111    /// Create a new subagent configuration.
112    #[must_use]
113    pub fn new(name: impl Into<String>) -> Self {
114        Self {
115            name: name.into(),
116            nickname: None,
117            system_prompt: String::new(),
118            max_turns: None,
119            timeout_ms: None,
120            model: None,
121        }
122    }
123
124    /// Set the system prompt.
125    #[must_use]
126    pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
127        self.system_prompt = prompt.into();
128        self
129    }
130
131    /// Set the maximum number of turns.
132    #[must_use]
133    pub const fn with_max_turns(mut self, max: usize) -> Self {
134        self.max_turns = Some(max);
135        self
136    }
137
138    /// Set the timeout in milliseconds.
139    #[must_use]
140    pub const fn with_timeout_ms(mut self, timeout: u64) -> Self {
141        self.timeout_ms = Some(timeout);
142        self
143    }
144
145    /// Set the model override for this subagent.
146    #[must_use]
147    pub fn with_model(mut self, model: impl Into<String>) -> Self {
148        self.model = Some(model.into());
149        self
150    }
151
152    /// Set a human-friendly nickname for this subagent.
153    #[must_use]
154    pub fn with_nickname(mut self, nickname: impl Into<String>) -> Self {
155        self.nickname = Some(nickname.into());
156        self
157    }
158}
159
160/// Log entry for a single tool call within a subagent.
161#[derive(Clone, Debug, Serialize, Deserialize)]
162pub struct ToolCallLog {
163    /// Tool name.
164    pub name: String,
165    /// Tool display name.
166    pub display_name: String,
167    /// Brief context/args (e.g., file path, command).
168    pub context: String,
169    /// Brief result summary.
170    pub result: String,
171    /// Whether the tool call succeeded.
172    pub success: bool,
173    /// Duration in milliseconds.
174    pub duration_ms: Option<u64>,
175}
176
177/// Result from a subagent execution.
178#[derive(Clone, Debug, Serialize, Deserialize)]
179pub struct SubagentResult {
180    /// Name of the subagent.
181    pub name: String,
182    /// The final text response (only visible part to parent).
183    pub final_response: String,
184    /// Total number of turns taken.
185    pub total_turns: usize,
186    /// Number of tool calls made by the subagent.
187    pub tool_count: u32,
188    /// Log of tool calls made by the subagent.
189    pub tool_logs: Vec<ToolCallLog>,
190    /// Token usage statistics.
191    pub usage: TokenUsage,
192    /// Whether the subagent completed successfully.
193    pub success: bool,
194    /// Duration in milliseconds.
195    pub duration_ms: u64,
196    /// Detailed error information when `success` is false.
197    ///
198    /// Contains the raw error message from the agent event, which may include
199    /// stack trace information or structured error context.
200    #[serde(default, skip_serializing_if = "Option::is_none")]
201    pub error_details: Option<String>,
202    /// Retained for serialization compatibility with older clients.
203    ///
204    /// The previous implementation inferred this from the "last pending tool"
205    /// when any generic error occurred, which was incorrect for LLM or
206    /// transport failures. The field is currently never populated until the
207    /// SDK has deterministic error provenance.
208    #[serde(default, skip_serializing_if = "Option::is_none")]
209    pub failed_tool: Option<String>,
210}
211
212/// Tool for spawning subagents.
213///
214/// This tool allows an agent to spawn a child agent that runs independently
215/// and returns only its final response.
216///
217/// # Example
218///
219/// ```ignore
220/// use agent_sdk::subagent::{SubagentTool, SubagentConfig};
221///
222/// let config = SubagentConfig::new("analyzer")
223///     .with_system_prompt("You analyze code...");
224///
225/// let tool = SubagentTool::new(config, provider.clone(), tools.clone(), || {
226///     std::sync::Arc::new(agent_sdk::InMemoryEventStore::new())
227/// });
228/// ```
229pub struct SubagentTool<P, H = DefaultHooks, M = InMemoryStore, S = InMemoryStore>
230where
231    P: LlmProvider,
232    H: AgentHooks,
233    M: MessageStore,
234    S: StateStore,
235{
236    config: SubagentConfig,
237    provider: Arc<P>,
238    tools: Arc<ToolRegistry<()>>,
239    hooks: Arc<H>,
240    message_store_factory: Arc<dyn Fn() -> M + Send + Sync>,
241    state_store_factory: Arc<dyn Fn() -> S + Send + Sync>,
242    event_store_factory: Arc<dyn Fn() -> Arc<dyn EventStore> + Send + Sync>,
243    /// Cached display name to avoid `Box::leak` on every call.
244    cached_display_name: &'static str,
245    /// Cached description to avoid `Box::leak` on every call.
246    cached_description: &'static str,
247}
248
249impl<P> SubagentTool<P, DefaultHooks, InMemoryStore, InMemoryStore>
250where
251    P: LlmProvider + 'static,
252{
253    /// Create a new subagent tool with default hooks and in-memory message/state stores.
254    #[must_use]
255    pub fn new<EF>(
256        config: SubagentConfig,
257        provider: Arc<P>,
258        tools: Arc<ToolRegistry<()>>,
259        event_store_factory: EF,
260    ) -> Self
261    where
262        EF: Fn() -> Arc<dyn EventStore> + Send + Sync + 'static,
263    {
264        // Intern the leaked strings so repeated construction with the same
265        // name reuses one allocation (factories build a tool per request).
266        let (cached_display_name, cached_description) = cached_tool_strings(&config.name);
267        Self {
268            config,
269            provider,
270            tools,
271            hooks: Arc::new(DefaultHooks),
272            message_store_factory: Arc::new(InMemoryStore::new),
273            state_store_factory: Arc::new(InMemoryStore::new),
274            event_store_factory: Arc::new(event_store_factory),
275            cached_display_name,
276            cached_description,
277        }
278    }
279}
280
281impl<P, H, M, S> SubagentTool<P, H, M, S>
282where
283    P: LlmProvider + Clone + 'static,
284    H: AgentHooks + Clone + 'static,
285    M: MessageStore + 'static,
286    S: StateStore + 'static,
287{
288    /// Create with custom hooks.
289    #[must_use]
290    pub fn with_hooks<H2: AgentHooks + Clone + 'static>(
291        self,
292        hooks: Arc<H2>,
293    ) -> SubagentTool<P, H2, M, S> {
294        SubagentTool {
295            config: self.config,
296            provider: self.provider,
297            tools: self.tools,
298            hooks,
299            message_store_factory: self.message_store_factory,
300            state_store_factory: self.state_store_factory,
301            event_store_factory: self.event_store_factory,
302            cached_display_name: self.cached_display_name,
303            cached_description: self.cached_description,
304        }
305    }
306
307    /// Create with custom store factories.
308    #[must_use]
309    pub fn with_stores<M2, S2, MF, SF>(
310        self,
311        message_factory: MF,
312        state_factory: SF,
313    ) -> SubagentTool<P, H, M2, S2>
314    where
315        M2: MessageStore + 'static,
316        S2: StateStore + 'static,
317        MF: Fn() -> M2 + Send + Sync + 'static,
318        SF: Fn() -> S2 + Send + Sync + 'static,
319    {
320        SubagentTool {
321            config: self.config,
322            provider: self.provider,
323            tools: self.tools,
324            hooks: self.hooks,
325            message_store_factory: Arc::new(message_factory),
326            state_store_factory: Arc::new(state_factory),
327            event_store_factory: self.event_store_factory,
328            cached_display_name: self.cached_display_name,
329            cached_description: self.cached_description,
330        }
331    }
332
333    /// Get the subagent configuration.
334    #[must_use]
335    pub const fn config(&self) -> &SubagentConfig {
336        &self.config
337    }
338
339    /// Run the subagent with a task.
340    ///
341    /// The `parent_cancel` token links the subagent's lifecycle to its parent.
342    /// Cancelling the parent token will also cancel the subagent.
343    async fn run_subagent<Ctx>(
344        &self,
345        task: &str,
346        subagent_id: String,
347        parent_ctx: &ToolContext<Ctx>,
348        parent_cancel: CancellationToken,
349    ) -> Result<SubagentResult>
350    where
351        Ctx: Send + Sync + 'static,
352    {
353        use crate::agent_loop::AgentLoop;
354
355        let start = Instant::now();
356        // Each subagent run gets its own thread id, so a shared event store
357        // only returns this subagent's events when queried with `thread_id`.
358        let thread_id = ThreadId::new();
359
360        // Create stores for this subagent run
361        let message_store = (self.message_store_factory)();
362        let state_store = (self.state_store_factory)();
363        let event_store = (self.event_store_factory)();
364
365        // Create agent config with a default max_turns to prevent unbounded execution
366        let agent_config = AgentConfig {
367            max_turns: Some(self.config.max_turns.unwrap_or(100)),
368            system_prompt: self.config.system_prompt.clone(),
369            ..Default::default()
370        };
371
372        // Build the subagent
373        let agent = AgentLoop::new(
374            (*self.provider).clone(),
375            (*self.tools).clone(),
376            (*self.hooks).clone(),
377            message_store,
378            state_store,
379            Arc::clone(&event_store),
380            agent_config,
381        );
382
383        // Derive the child context from the parent so the subagent-depth guard
384        // and the concurrency semaphore actually propagate into nested runs.
385        let tool_ctx = build_subagent_child_context(parent_ctx);
386
387        // Run with a child cancellation token so parent cancellation propagates.
388        // Use `run_abortable` so we can abort the spawned task on timeout
389        // instead of leaving a detached task that continues making API calls.
390        let cancel_token = parent_cancel.child_token();
391        let timeout_cancel = cancel_token.clone();
392        let (state_rx, task_handle) = agent.run_abortable(
393            thread_id.clone(),
394            AgentInput::Text(task.to_string()),
395            tool_ctx,
396            cancel_token,
397        );
398
399        let wait_result = wait_for_subagent_state(self.config.timeout_ms, start, state_rx).await;
400        let mut state = SubagentExecutionState::new();
401        let replay_events = apply_subagent_wait_outcome(
402            classify_subagent_wait_result(wait_result.as_ref()),
403            &self.config,
404            &timeout_cancel,
405            &task_handle,
406            &mut state,
407        );
408
409        if replay_events {
410            replay_subagent_events(
411                &event_store,
412                &thread_id,
413                parent_ctx,
414                &self.config,
415                &subagent_id,
416                &mut state,
417            )
418            .await?;
419        }
420
421        let result = state.into_result(self.config.name.clone(), start);
422        emit_subagent_observability(self, &result);
423        Ok(result)
424    }
425}
426
427/// Derive the tool context for a subagent run from its parent.
428///
429/// Copies the parent metadata (so host-set policy such as
430/// [`METADATA_MAX_SUBAGENT_DEPTH`] carries through), increments
431/// [`METADATA_SUBAGENT_DEPTH`] by one, and propagates the shared subagent
432/// concurrency semaphore. Without this the depth guard is dead code and nested
433/// spawning is unbounded.
434fn build_subagent_child_context<Ctx>(parent_ctx: &ToolContext<Ctx>) -> ToolContext<()> {
435    let parent_depth = parent_ctx
436        .metadata
437        .get(METADATA_SUBAGENT_DEPTH)
438        .and_then(Value::as_u64)
439        .unwrap_or(0);
440
441    let mut child = ToolContext::new(());
442    child.metadata.clone_from(&parent_ctx.metadata);
443    child
444        .metadata
445        .insert(METADATA_SUBAGENT_DEPTH.to_string(), json!(parent_depth + 1));
446
447    if let Some(semaphore) = parent_ctx.subagent_semaphore() {
448        child = child.with_subagent_semaphore(semaphore);
449    }
450    child
451}
452
453type SubagentWaitResult = Result<
454    Result<crate::types::AgentRunState, tokio::sync::oneshot::error::RecvError>,
455    tokio::time::error::Elapsed,
456>;
457
458struct SubagentExecutionState {
459    final_response: String,
460    /// Message id whose text currently populates `final_response`. Used to keep
461    /// only the **last** assistant message's text (the documented contract),
462    /// rather than concatenating every interim turn's commentary.
463    final_response_message_id: Option<String>,
464    total_turns: usize,
465    tool_count: u32,
466    tool_logs: Vec<ToolCallLog>,
467    pending_tools: HashMap<String, (String, String)>,
468    total_usage: TokenUsage,
469    success: bool,
470    error_details: Option<String>,
471    failed_tool: Option<String>,
472}
473
474impl SubagentExecutionState {
475    fn new() -> Self {
476        Self {
477            final_response: String::new(),
478            final_response_message_id: None,
479            total_turns: 0,
480            tool_count: 0,
481            tool_logs: Vec::new(),
482            pending_tools: HashMap::new(),
483            total_usage: TokenUsage::default(),
484            success: true,
485            error_details: None,
486            failed_tool: None,
487        }
488    }
489
490    /// Record a failure: set the parent-visible response, the detailed error,
491    /// and clear the success flag.
492    fn fail(&mut self, response: impl Into<String>, details: String) {
493        self.final_response = response.into();
494        self.error_details = Some(details);
495        self.success = false;
496    }
497
498    fn into_result(self, name: String, start: Instant) -> SubagentResult {
499        SubagentResult {
500            name,
501            final_response: self.final_response,
502            total_turns: self.total_turns,
503            tool_count: self.tool_count,
504            tool_logs: self.tool_logs,
505            usage: self.total_usage,
506            success: self.success,
507            duration_ms: u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
508            error_details: self.error_details,
509            failed_tool: self.failed_tool,
510        }
511    }
512}
513
514fn subagent_total_tokens(total_usage: &TokenUsage) -> u64 {
515    u64::from(total_usage.input_tokens) + u64::from(total_usage.output_tokens)
516}
517
518struct SubagentProgressUpdate<'a> {
519    subagent_id: &'a str,
520    total_turns: usize,
521    total_usage: &'a TokenUsage,
522    tool_name: String,
523    tool_context: String,
524    completed: bool,
525    success: bool,
526    tool_count: u32,
527}
528
529enum SubagentWaitOutcome {
530    ReplayEvents,
531    TimedOut,
532    Disconnected,
533    Cancelled,
534    AwaitingConfirmation,
535    Error(crate::types::AgentError),
536}
537
538async fn wait_for_subagent_state(
539    timeout_ms: Option<u64>,
540    start: Instant,
541    state_rx: tokio::sync::oneshot::Receiver<crate::types::AgentRunState>,
542) -> Option<SubagentWaitResult> {
543    let timeout_duration = timeout_ms.map(Duration::from_millis);
544    if timeout_duration.is_some_and(|timeout| timeout.saturating_sub(start.elapsed()).is_zero()) {
545        return None;
546    }
547    if let Some(timeout) = timeout_duration {
548        let remaining = timeout.saturating_sub(start.elapsed());
549        Some(tokio::time::timeout(remaining, state_rx).await)
550    } else {
551        Some(Ok(state_rx.await))
552    }
553}
554
555fn classify_subagent_wait_result(wait_result: Option<&SubagentWaitResult>) -> SubagentWaitOutcome {
556    match wait_result {
557        Some(Ok(Ok(
558            crate::types::AgentRunState::Done { .. } | crate::types::AgentRunState::Refusal { .. },
559        ))) => SubagentWaitOutcome::ReplayEvents,
560        Some(Ok(Ok(crate::types::AgentRunState::Cancelled { .. }))) => {
561            SubagentWaitOutcome::Cancelled
562        }
563        Some(Ok(Ok(crate::types::AgentRunState::AwaitingConfirmation { .. }))) => {
564            SubagentWaitOutcome::AwaitingConfirmation
565        }
566        Some(Ok(Ok(crate::types::AgentRunState::Error(error)))) => {
567            SubagentWaitOutcome::Error(error.clone())
568        }
569        // `AgentRunState` is `#[non_exhaustive]`; an unrecognized future run
570        // state is surfaced as an error so the subagent failure is not
571        // silently treated as a successful replay.
572        Some(Ok(Ok(_))) => SubagentWaitOutcome::Error(crate::types::AgentError::new(
573            "subagent returned an unrecognized run state".to_string(),
574            false,
575        )),
576        Some(Ok(Err(_))) => SubagentWaitOutcome::Disconnected,
577        None | Some(Err(_)) => SubagentWaitOutcome::TimedOut,
578    }
579}
580
581fn apply_subagent_wait_outcome(
582    outcome: SubagentWaitOutcome,
583    config: &SubagentConfig,
584    timeout_cancel: &CancellationToken,
585    task_handle: &tokio::task::JoinHandle<()>,
586    state: &mut SubagentExecutionState,
587) -> bool {
588    // Compute the (parent-visible response, detailed error) for the failure
589    // arms; a successful replay returns early and leaves the finished task be.
590    let (response, details) = match outcome {
591        SubagentWaitOutcome::ReplayEvents => return true,
592        SubagentWaitOutcome::TimedOut => (
593            "Subagent timed out".to_string(),
594            format!(
595                "Subagent '{}' timed out after {}ms",
596                config.name,
597                config.timeout_ms.unwrap_or(0)
598            ),
599        ),
600        SubagentWaitOutcome::Disconnected => (
601            "Subagent ended unexpectedly".to_string(),
602            format!(
603                "Subagent '{}' ended before returning a final state",
604                config.name
605            ),
606        ),
607        SubagentWaitOutcome::Cancelled => (
608            "Subagent cancelled".to_string(),
609            format!("Subagent '{}' was cancelled", config.name),
610        ),
611        SubagentWaitOutcome::AwaitingConfirmation => (
612            "Subagent requires confirmation".to_string(),
613            format!(
614                "Subagent '{}' requested confirmation, which is not supported in nested runs",
615                config.name
616            ),
617        ),
618        SubagentWaitOutcome::Error(error) => (error.message.clone(), error.message),
619    };
620
621    // Every failure path reclaims the spawned run before reporting.
622    timeout_cancel.cancel();
623    task_handle.abort();
624    state.fail(response, details);
625    false
626}
627
628async fn replay_subagent_events<Ctx: Send + Sync + 'static>(
629    event_store: &Arc<dyn EventStore>,
630    thread_id: &ThreadId,
631    parent_ctx: &ToolContext<Ctx>,
632    config: &SubagentConfig,
633    subagent_id: &str,
634    state: &mut SubagentExecutionState,
635) -> Result<()> {
636    for envelope in event_store.get_events(thread_id).await? {
637        match envelope.event {
638            AgentEvent::Text { message_id, text } => {
639                // Keep only the last assistant message's text. A new message id
640                // means a new assistant message, so reset the accumulator;
641                // multiple text blocks within one message still concatenate.
642                // This honors the documented contract that the parent receives
643                // "only the final text response", not interim commentary.
644                if state.final_response_message_id.as_deref() != Some(message_id.as_str()) {
645                    state.final_response.clear();
646                    state.final_response_message_id = Some(message_id);
647                }
648                state.final_response.push_str(&text);
649            }
650            AgentEvent::ToolCallStart {
651                id, name, input, ..
652            } => {
653                state.tool_count += 1;
654                let context = extract_tool_context(&name, &input);
655                state
656                    .pending_tools
657                    .insert(id, (name.clone(), context.clone()));
658
659                emit_subagent_progress_if_possible(
660                    parent_ctx,
661                    config,
662                    SubagentProgressUpdate {
663                        subagent_id,
664                        total_turns: state.total_turns,
665                        total_usage: &state.total_usage,
666                        tool_name: name,
667                        tool_context: context,
668                        completed: false,
669                        success: false,
670                        tool_count: state.tool_count,
671                    },
672                )
673                .await;
674            }
675            AgentEvent::ToolCallEnd {
676                id,
677                name,
678                display_name,
679                result,
680            } => {
681                let context = state
682                    .pending_tools
683                    .remove(&id)
684                    .map(|(_, ctx)| ctx)
685                    .unwrap_or_default();
686                let tool_success = result.success;
687                state.tool_logs.push(ToolCallLog {
688                    name: name.clone(),
689                    display_name: display_name.clone(),
690                    context: context.clone(),
691                    result: summarize_tool_result(&name, &result),
692                    success: tool_success,
693                    duration_ms: result.duration_ms,
694                });
695
696                emit_subagent_progress_if_possible(
697                    parent_ctx,
698                    config,
699                    SubagentProgressUpdate {
700                        subagent_id,
701                        total_turns: state.total_turns,
702                        total_usage: &state.total_usage,
703                        tool_name: name,
704                        tool_context: context,
705                        completed: true,
706                        success: tool_success,
707                        tool_count: state.tool_count,
708                    },
709                )
710                .await;
711            }
712            AgentEvent::TurnComplete { turn, usage, .. } => {
713                state.total_turns = turn;
714                state.total_usage.add(&usage);
715            }
716            AgentEvent::Done {
717                total_turns: turns, ..
718            } => {
719                state.total_turns = turns;
720                break;
721            }
722            AgentEvent::Refusal { text, .. } => {
723                let refusal_message =
724                    text.unwrap_or_else(|| "Subagent refused the request".to_string());
725                state.error_details = Some(refusal_message.clone());
726                state.final_response = refusal_message;
727                state.success = false;
728                break;
729            }
730            AgentEvent::Error { message, .. } => {
731                state.error_details = Some(message.clone());
732                state.final_response = message;
733                state.success = false;
734                break;
735            }
736            _ => {}
737        }
738    }
739    Ok(())
740}
741
742async fn emit_subagent_progress_if_possible<Ctx: Send + Sync + 'static>(
743    parent_ctx: &ToolContext<Ctx>,
744    config: &SubagentConfig,
745    update: SubagentProgressUpdate<'_>,
746) {
747    if let Err(error) = emit_subagent_progress(parent_ctx, config, update).await {
748        log::warn!("Failed to emit subagent progress event: {error}");
749    }
750}
751
752async fn emit_subagent_progress<Ctx: Send + Sync + 'static>(
753    parent_ctx: &ToolContext<Ctx>,
754    config: &SubagentConfig,
755    SubagentProgressUpdate {
756        subagent_id,
757        total_turns,
758        total_usage,
759        tool_name,
760        tool_context,
761        completed,
762        success,
763        tool_count,
764    }: SubagentProgressUpdate<'_>,
765) -> Result<()> {
766    let max_turns = config.max_turns.map(usize_to_u32_saturating);
767    let current_turn = Some(usize_to_u32_saturating(total_turns));
768
769    parent_ctx
770        .emit_event(AgentEvent::SubagentProgress {
771            subagent_id: subagent_id.to_string(),
772            subagent_name: config.name.clone(),
773            nickname: config.nickname.clone(),
774            child_thread_id: None,
775            child_root_task_id: None,
776            subagent_task_id: None,
777            max_turns,
778            current_turn,
779            model: config.model.clone(),
780            tool_name,
781            tool_context,
782            completed,
783            success,
784            tool_count,
785            total_tokens: subagent_total_tokens(total_usage),
786        })
787        .await
788}
789
/// Narrow `value` to `u32`, clamping anything larger to `u32::MAX`.
fn usize_to_u32_saturating(value: usize) -> u32 {
    value.try_into().unwrap_or(u32::MAX)
}
793
/// Emit OpenTelemetry telemetry for one finished subagent run.
///
/// This does three things:
/// - records a synthetic `invoke_agent` span, linked to the caller's active
///   span when there is one;
/// - bumps the subagent-invocation counter;
/// - folds the run's token usage into the shared token-usage histogram.
///
/// The span is started and ended inside this function, after the run has
/// finished, so it marks the invocation rather than timing it.
#[cfg(feature = "otel")]
fn emit_subagent_observability<P, H, M, S>(tool: &SubagentTool<P, H, M, S>, result: &SubagentResult)
where
    P: LlmProvider + Clone + 'static,
    H: AgentHooks + Clone + 'static,
    M: MessageStore + 'static,
    S: StateStore + 'static,
{
    use crate::observability::{attrs, baggage, langfuse, metrics, provider_name, spans};
    use opentelemetry::Context;
    use opentelemetry::KeyValue;
    use opentelemetry::trace::{Span, TraceContextExt};

    // Capture the parent turn's SpanContext **before** starting the
    // synthetic subagent span, so the link points at the caller rather than
    // at ourselves. Without an active parent span (e.g. a subagent run from
    // a top-level test), the captured context is invalid and no link is added.
    let parent_ctx = Context::current();
    let parent_span_context = parent_ctx.span().span_context().clone();

    let normalized_provider_name = provider_name::normalize(tool.provider.provider());
    let request_model = tool.provider.model().to_string();
    let agent_name = tool.config.name.clone();

    let mut span = spans::start_internal_span(
        "invoke_agent",
        vec![
            KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
            KeyValue::new(attrs::GEN_AI_AGENT_NAME, agent_name.clone()),
            KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, normalized_provider_name),
            KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, request_model.clone()),
            KeyValue::new(attrs::SDK_RUN_MODE, "loop"),
        ],
    );
    baggage::copy_baggage_to_active_span(&mut span);
    langfuse::tag_observation(&mut span, langfuse::ObservationType::Agent);
    if parent_span_context.is_valid() {
        spans::link_to_parent_turn(
            &mut span,
            &parent_span_context.trace_id().to_string(),
            &parent_span_context.span_id().to_string(),
        );
    }

    let outcome = if result.success { "done" } else { "error" };
    span.set_attribute(KeyValue::new(attrs::SDK_OUTCOME, outcome));
    span.set_attribute(attrs::kv_i64(
        attrs::SDK_TOTAL_TURNS,
        // Saturate rather than reporting 0 for an out-of-range count, matching
        // the saturating conversions used elsewhere in this module.
        i64::try_from(result.total_turns).unwrap_or(i64::MAX),
    ));
    span.set_attribute(attrs::kv_i64(
        attrs::GEN_AI_USAGE_INPUT_TOKENS,
        i64::from(result.usage.input_tokens),
    ));
    span.set_attribute(attrs::kv_i64(
        attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
        i64::from(result.usage.output_tokens),
    ));
    // Branch on the boolean itself, not on the derived label string.
    if !result.success {
        spans::set_span_error(&mut span, "agent_error", "subagent invocation failed");
    }
    span.end();

    // ── Metrics ─────────────────────────────────────────────────────
    //
    // Subagent invocations get a dedicated counter, so dashboards can tell
    // parent-turn LLM activity apart from delegated subagent work. The
    // subagent's token usage is also folded into the shared
    // `gen_ai.client.token.usage` histogram, so global token-spend accounting
    // stays accurate; the `gen_ai.operation.name` label distinguishes
    // `invoke_agent` from regular `chat` calls.
    let metrics_handle = metrics::Metrics::global();
    metrics_handle.subagent_invocations.add(
        1,
        &[
            KeyValue::new(attrs::GEN_AI_AGENT_NAME, agent_name),
            KeyValue::new(attrs::SDK_OUTCOME, outcome),
        ],
    );
    record_subagent_token_usage(
        &metrics_handle,
        result,
        normalized_provider_name,
        &request_model,
    );
}
880
/// Folds a subagent's input and output token counts into the shared
/// token-usage histogram, labelled with `gen_ai.operation.name =
/// "invoke_agent"`, the provider, the token direction and the request model.
///
/// Directions with a zero count are skipped, so no empty samples are
/// recorded.
#[cfg(feature = "otel")]
fn record_subagent_token_usage(
    metrics: &crate::observability::metrics::Metrics,
    result: &SubagentResult,
    provider_name: &'static str,
    request_model: &str,
) {
    use crate::observability::attrs;
    use opentelemetry::KeyValue;

    let usage = &result.usage;
    let samples: [(&'static str, u32); 2] = [
        ("input", usage.input_tokens),
        ("output", usage.output_tokens),
    ];

    for &(token_type, count) in samples.iter().filter(|(_, count)| *count != 0) {
        let labels = [
            KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
            KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider_name),
            KeyValue::new("gen_ai.token.type", token_type),
            KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, request_model.to_string()),
        ];
        metrics.token_usage.record(u64::from(count), &labels);
    }
}
911
/// No-op stand-in for `emit_subagent_observability` when the `otel` feature
/// is disabled.
///
/// It keeps the same name, parameters and trait bounds as the `otel`
/// variant, so call sites compile identically with or without the feature.
#[cfg(not(feature = "otel"))]
const fn emit_subagent_observability<P, H, M, S>(
    _tool: &SubagentTool<P, H, M, S>,
    _result: &SubagentResult,
) where
    P: LlmProvider + Clone + 'static,
    H: AgentHooks + Clone + 'static,
    M: MessageStore + 'static,
    S: StateStore + 'static,
{
}
923
924/// Extracts context information from tool input for display.
925fn extract_tool_context(name: &str, input: &Value) -> String {
926    match name {
927        "read" => input
928            .get("file_path")
929            .or_else(|| input.get("path"))
930            .and_then(Value::as_str)
931            .unwrap_or("")
932            .to_string(),
933        "write" | "edit" => input
934            .get("file_path")
935            .or_else(|| input.get("path"))
936            .and_then(Value::as_str)
937            .unwrap_or("")
938            .to_string(),
939        "bash" => {
940            let cmd = input.get("command").and_then(Value::as_str).unwrap_or("");
941            // Truncate long commands (UTF-8 safe)
942            if cmd.len() > 60 {
943                format!("{}...", crate::primitive_tools::truncate_str(cmd, 57))
944            } else {
945                cmd.to_string()
946            }
947        }
948        "glob" | "grep" => input
949            .get("pattern")
950            .and_then(Value::as_str)
951            .unwrap_or("")
952            .to_string(),
953        "web_search" => input
954            .get("query")
955            .and_then(Value::as_str)
956            .unwrap_or("")
957            .to_string(),
958        _ => String::new(),
959    }
960}
961
962/// Summarizes tool result for logging.
963fn summarize_tool_result(name: &str, result: &ToolResult) -> String {
964    if !result.success {
965        let first_line = result.output.lines().next().unwrap_or("Error");
966        return if first_line.len() > 50 {
967            format!(
968                "{}...",
969                crate::primitive_tools::truncate_str(first_line, 47)
970            )
971        } else {
972            first_line.to_string()
973        };
974    }
975
976    match name {
977        "read" => {
978            let line_count = result.output.lines().count();
979            format!("{line_count} lines")
980        }
981        "write" => "wrote file".to_string(),
982        "edit" => "edited".to_string(),
983        "bash" => {
984            let lines: Vec<&str> = result.output.lines().collect();
985            if lines.is_empty() {
986                "done".to_string()
987            } else if lines.len() == 1 {
988                let line = lines[0];
989                if line.len() > 50 {
990                    format!("{}...", crate::primitive_tools::truncate_str(line, 47))
991                } else {
992                    line.to_string()
993                }
994            } else {
995                format!("{} lines", lines.len())
996            }
997        }
998        "glob" => {
999            let count = result.output.lines().count();
1000            format!("{count} files")
1001        }
1002        "grep" => {
1003            let count = result.output.lines().count();
1004            format!("{count} matches")
1005        }
1006        _ => {
1007            let line_count = result.output.lines().count();
1008            if line_count == 0 {
1009                "done".to_string()
1010            } else {
1011                format!("{line_count} lines")
1012            }
1013        }
1014    }
1015}
1016
1017impl<P, H, M, S, Ctx> Tool<Ctx> for SubagentTool<P, H, M, S>
1018where
1019    P: LlmProvider + Clone + 'static,
1020    H: AgentHooks + Clone + 'static,
1021    M: MessageStore + 'static,
1022    S: StateStore + 'static,
1023    Ctx: Send + Sync + 'static,
1024{
1025    type Name = DynamicToolName;
1026
1027    fn name(&self) -> DynamicToolName {
1028        DynamicToolName::new(format!("subagent_{}", self.config.name))
1029    }
1030
1031    fn display_name(&self) -> &'static str {
1032        self.cached_display_name
1033    }
1034
1035    fn description(&self) -> &'static str {
1036        self.cached_description
1037    }
1038
1039    fn input_schema(&self) -> Value {
1040        json!({
1041            "type": "object",
1042            "properties": {
1043                "task": {
1044                    "type": "string",
1045                    "description": "The task or question for the subagent to handle"
1046                }
1047            },
1048            "required": ["task"]
1049        })
1050    }
1051
1052    fn tier(&self) -> ToolTier {
1053        // Subagent spawning requires confirmation
1054        ToolTier::Confirm
1055    }
1056
1057    async fn execute(&self, ctx: &ToolContext<Ctx>, input: Value) -> Result<ToolResult> {
1058        let task = input
1059            .get("task")
1060            .and_then(Value::as_str)
1061            .context("Missing 'task' parameter")?;
1062
1063        // ── Depth limit enforcement ───────────────────────────────────
1064        let current_depth = ctx
1065            .metadata
1066            .get(METADATA_SUBAGENT_DEPTH)
1067            .and_then(Value::as_u64)
1068            .unwrap_or(0);
1069        let max_depth = ctx
1070            .metadata
1071            .get(METADATA_MAX_SUBAGENT_DEPTH)
1072            .and_then(Value::as_u64)
1073            .unwrap_or(3); // default: 3 levels deep
1074
1075        if current_depth >= max_depth {
1076            bail!(
1077                "Subagent depth limit exceeded ({current_depth}/{max_depth}). \
1078                 Cannot spawn nested subagent '{}' — maximum nesting depth reached.",
1079                self.config.name
1080            );
1081        }
1082
1083        // ── Thread limit enforcement (semaphore) ──────────────────────
1084        let _permit = if let Some(ref sem) = ctx.subagent_semaphore() {
1085            match sem.clone().try_acquire_owned() {
1086                Ok(permit) => Some(permit),
1087                Err(_) => {
1088                    return Ok(ToolResult {
1089                        success: false,
1090                        output: format!(
1091                            "Cannot spawn subagent '{}': maximum concurrent subagent limit reached. \
1092                             Try again when another subagent completes.",
1093                            self.config.name
1094                        ),
1095                        data: None,
1096                        documents: Vec::new(),
1097                        duration_ms: Some(0),
1098                    });
1099                }
1100            }
1101        } else {
1102            None
1103        };
1104
1105        // Generate a unique ID for this subagent execution
1106        let subagent_id = format!(
1107            "{}_{:x}",
1108            self.config.name,
1109            std::time::SystemTime::now()
1110                .duration_since(std::time::UNIX_EPOCH)
1111                .unwrap_or_default()
1112                .as_nanos()
1113        );
1114
1115        // Use the context's cancellation token if available, otherwise create a standalone one.
1116        // This ensures that when a parent agent is cancelled, subagents are also cancelled.
1117        let cancel_token = ctx.cancel_token().unwrap_or_default();
1118
1119        let result = self
1120            .run_subagent(task, subagent_id, ctx, cancel_token)
1121            .await?;
1122
1123        Ok(ToolResult {
1124            success: result.success,
1125            output: result.final_response.clone(),
1126            data: Some(serde_json::to_value(&result).unwrap_or_default()),
1127            documents: Vec::new(),
1128            duration_ms: Some(result.duration_ms),
1129        })
1130    }
1131}
1132
1133#[cfg(test)]
1134mod tests {
1135    use super::*;
1136    use crate::authority::{EventAuthority, LocalEventAuthority};
1137    use crate::events::{AgentEvent, AgentEventEnvelope};
1138    use crate::llm::{ChatOutcome, ChatRequest, ChatResponse, ContentBlock, StopReason, Usage};
1139    use crate::stores::{EventStore, InMemoryEventStore, StoredTurnEvents};
1140    use anyhow::{Context, Result, bail};
1141    use async_trait::async_trait;
1142    use tokio::sync::Mutex;
1143
    /// Scripted `LlmProvider` for tests.
    ///
    /// Replays queued `ChatOutcome`s in FIFO order and falls back to a
    /// `"default"` text response once the queue is empty. It can optionally
    /// sleep before every `chat` call.
    #[derive(Clone)]
    struct TestProvider {
        // Shared queue: clones of the provider consume the same responses.
        responses: Arc<Mutex<Vec<ChatOutcome>>>,
        // Optional latency applied before each `chat` response. The timeout
        // test uses it to outlast the subagent's deadline.
        delay: Option<Duration>,
    }

    impl TestProvider {
        /// Creates a provider that returns `responses` in order, with no delay.
        fn new(responses: Vec<ChatOutcome>) -> Self {
            Self {
                responses: Arc::new(Mutex::new(responses)),
                delay: None,
            }
        }

        /// Makes every `chat` call sleep for `delay` before responding.
        fn with_delay(mut self, delay: Duration) -> Self {
            self.delay = Some(delay);
            self
        }

        /// A plain text reply that ends the turn (10 input / 20 output tokens).
        fn text_response(text: &str) -> ChatOutcome {
            ChatOutcome::Success(ChatResponse {
                id: "resp_text".to_string(),
                content: vec![ContentBlock::Text {
                    text: text.to_string(),
                }],
                model: "test-model".to_string(),
                stop_reason: Some(StopReason::EndTurn),
                usage: Usage {
                    input_tokens: 10,
                    output_tokens: 20,
                    cached_input_tokens: 0,
                    cache_creation_input_tokens: 0,
                },
            })
        }

        /// A single tool-use request for `tool_name` with the given `input`
        /// (15 input / 25 output tokens).
        fn tool_use_response(tool_id: &str, tool_name: &str, input: Value) -> ChatOutcome {
            ChatOutcome::Success(ChatResponse {
                id: "resp_tool".to_string(),
                content: vec![ContentBlock::ToolUse {
                    id: tool_id.to_string(),
                    name: tool_name.to_string(),
                    input,
                    thought_signature: None,
                }],
                model: "test-model".to_string(),
                stop_reason: Some(StopReason::ToolUse),
                usage: Usage {
                    input_tokens: 15,
                    output_tokens: 25,
                    cached_input_tokens: 0,
                    cache_creation_input_tokens: 0,
                },
            })
        }

        /// A refusal stop, optionally carrying explanatory text
        /// (12 input / 0 output tokens).
        fn refusal_response(text: Option<&str>) -> ChatOutcome {
            let content = text.map_or_else(Vec::new, |text| {
                vec![ContentBlock::Text {
                    text: text.to_string(),
                }]
            });
            ChatOutcome::Success(ChatResponse {
                id: "resp_refusal".to_string(),
                content,
                model: "test-model".to_string(),
                stop_reason: Some(StopReason::Refusal),
                usage: Usage {
                    input_tokens: 12,
                    output_tokens: 0,
                    cached_input_tokens: 0,
                    cache_creation_input_tokens: 0,
                },
            })
        }
    }

    #[async_trait]
    impl LlmProvider for TestProvider {
        async fn chat(&self, _request: ChatRequest) -> Result<ChatOutcome> {
            if let Some(delay) = self.delay {
                tokio::time::sleep(delay).await;
            }

            // Pop the next scripted response from the front of the queue.
            // Once it is exhausted, keep answering with a plain "default"
            // text reply.
            let mut responses = self.responses.lock().await;
            if responses.is_empty() {
                Ok(Self::text_response("default"))
            } else {
                Ok(responses.remove(0))
            }
        }

        fn model(&self) -> &'static str {
            "test-model"
        }

        fn provider(&self) -> &'static str {
            "mock"
        }
    }
1244
    /// Minimal `echo` tool for tests.
    ///
    /// Returns `"Echo: <message>"` for its required `message` argument and
    /// fails when that argument is missing or is not a string.
    struct TestEchoTool;

    impl Tool<()> for TestEchoTool {
        type Name = DynamicToolName;

        fn name(&self) -> DynamicToolName {
            DynamicToolName::new("echo")
        }

        fn display_name(&self) -> &'static str {
            "Echo"
        }

        fn description(&self) -> &'static str {
            "Echo the input"
        }

        fn input_schema(&self) -> Value {
            json!({
                "type": "object",
                "properties": {
                    "message": { "type": "string" }
                },
                "required": ["message"]
            })
        }

        // Observe tier: unlike subagent tools, no confirmation is required.
        fn tier(&self) -> ToolTier {
            ToolTier::Observe
        }

        async fn execute(&self, _ctx: &ToolContext<()>, input: Value) -> Result<ToolResult> {
            let message = input
                .get("message")
                .and_then(Value::as_str)
                .context("missing echo message")?;
            Ok(ToolResult::success(format!("Echo: {message}")))
        }
    }
1284
    /// Event store that records every `append` call before delegating to an
    /// in-memory store.
    ///
    /// Tests use it to discover which threads (parent vs. child) received
    /// events.
    #[derive(Clone, Default)]
    struct RecordingEventStore {
        inner: Arc<InMemoryEventStore>,
        // Every appended `(thread, turn, envelope)`, in call order.
        appended: Arc<Mutex<Vec<(ThreadId, usize, AgentEventEnvelope)>>>,
    }

    impl RecordingEventStore {
        /// Returns a snapshot of all appends recorded so far.
        async fn appended_events(&self) -> Vec<(ThreadId, usize, AgentEventEnvelope)> {
            self.appended.lock().await.clone()
        }
    }

    #[async_trait]
    impl EventStore for RecordingEventStore {
        async fn append(
            &self,
            thread_id: &ThreadId,
            turn: usize,
            envelope: AgentEventEnvelope,
        ) -> Result<()> {
            // Record first, so the append is observable even if the inner
            // store fails.
            self.appended
                .lock()
                .await
                .push((thread_id.clone(), turn, envelope.clone()));
            self.inner.append(thread_id, turn, envelope).await
        }

        async fn finish_turn(&self, thread_id: &ThreadId, turn: usize) -> Result<()> {
            self.inner.finish_turn(thread_id, turn).await
        }

        async fn get_turn(
            &self,
            thread_id: &ThreadId,
            turn: usize,
        ) -> Result<Option<StoredTurnEvents>> {
            self.inner.get_turn(thread_id, turn).await
        }

        async fn get_turns(&self, thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
            self.inner.get_turns(thread_id).await
        }

        async fn clear(&self, thread_id: &ThreadId) -> Result<()> {
            self.inner.clear(thread_id).await
        }
    }
1332
    /// Event store whose `append` always fails, while every other operation
    /// succeeds and reports no stored data.
    ///
    /// Used as the parent store to check that failing progress writes do not
    /// abort a subagent run.
    #[derive(Clone, Default)]
    struct AlwaysFailAppendEventStore;

    #[async_trait]
    impl EventStore for AlwaysFailAppendEventStore {
        async fn append(
            &self,
            _thread_id: &ThreadId,
            _turn: usize,
            _envelope: AgentEventEnvelope,
        ) -> Result<()> {
            bail!("append failed")
        }

        async fn finish_turn(&self, _thread_id: &ThreadId, _turn: usize) -> Result<()> {
            Ok(())
        }

        async fn get_turn(
            &self,
            _thread_id: &ThreadId,
            _turn: usize,
        ) -> Result<Option<StoredTurnEvents>> {
            Ok(None)
        }

        async fn get_turns(&self, _thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
            Ok(Vec::new())
        }

        async fn clear(&self, _thread_id: &ThreadId) -> Result<()> {
            Ok(())
        }
    }
1367
    /// In-memory event store whose `get_turns` always fails.
    ///
    /// The timeout test uses it so the run fails loudly if the subagent reads
    /// back its whole event history after failing.
    #[derive(Clone, Default)]
    struct NoReadAfterFailureEventStore {
        inner: Arc<InMemoryEventStore>,
    }

    #[async_trait]
    impl EventStore for NoReadAfterFailureEventStore {
        async fn append(
            &self,
            thread_id: &ThreadId,
            turn: usize,
            envelope: AgentEventEnvelope,
        ) -> Result<()> {
            self.inner.append(thread_id, turn, envelope).await
        }

        async fn finish_turn(&self, thread_id: &ThreadId, turn: usize) -> Result<()> {
            self.inner.finish_turn(thread_id, turn).await
        }

        async fn get_turn(
            &self,
            thread_id: &ThreadId,
            turn: usize,
        ) -> Result<Option<StoredTurnEvents>> {
            self.inner.get_turn(thread_id, turn).await
        }

        // NOTE(review): the message names `get_events`; presumably the
        // provided `EventStore::get_events` is built on `get_turns`. Confirm
        // against the trait definition.
        async fn get_turns(&self, _thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
            bail!("get_events should not be called after subagent failure")
        }

        async fn clear(&self, thread_id: &ThreadId) -> Result<()> {
            self.inner.clear(thread_id).await
        }
    }
1404
    /// LLM provider whose `chat` always panics.
    ///
    /// Used to check that a panicking subagent is reported as a structured
    /// error rather than a disconnect.
    #[derive(Clone, Default)]
    struct PanicProvider;

    #[async_trait]
    impl LlmProvider for PanicProvider {
        async fn chat(&self, _request: ChatRequest) -> Result<ChatOutcome> {
            // Panic isolation catches this at the run-loop
            // boundary and turns it into a structured `Error`, so the
            // parent classifies it as an agent error rather than a
            // dropped channel. The literal text is asserted on by
            // `test_run_subagent_panic_classified_as_error_not_disconnected`.
            panic!("panic provider should disconnect subagent");
        }

        fn model(&self) -> &'static str {
            "panic-model"
        }

        fn provider(&self) -> &'static str {
            "panic"
        }
    }
1427
1428    #[test]
1429    fn test_subagent_config_builder() {
1430        let config = SubagentConfig::new("test")
1431            .with_system_prompt("Test prompt")
1432            .with_max_turns(5)
1433            .with_timeout_ms(30000);
1434
1435        assert_eq!(config.name, "test");
1436        assert_eq!(config.system_prompt, "Test prompt");
1437        assert_eq!(config.max_turns, Some(5));
1438        assert_eq!(config.timeout_ms, Some(30000));
1439    }
1440
1441    #[test]
1442    fn test_subagent_config_defaults() {
1443        let config = SubagentConfig::new("default");
1444
1445        assert_eq!(config.name, "default");
1446        assert!(config.system_prompt.is_empty());
1447        assert_eq!(config.max_turns, None);
1448        assert_eq!(config.timeout_ms, None);
1449    }
1450
1451    #[test]
1452    fn test_subagent_result_serialization() -> Result<()> {
1453        let result = SubagentResult {
1454            name: "test".to_string(),
1455            final_response: "Done".to_string(),
1456            total_turns: 3,
1457            tool_count: 5,
1458            tool_logs: vec![
1459                ToolCallLog {
1460                    name: "read".to_string(),
1461                    display_name: "Read file".to_string(),
1462                    context: "/tmp/test.rs".to_string(),
1463                    result: "50 lines".to_string(),
1464                    success: true,
1465                    duration_ms: Some(10),
1466                },
1467                ToolCallLog {
1468                    name: "grep".to_string(),
1469                    display_name: "Grep TODO".to_string(),
1470                    context: "TODO".to_string(),
1471                    result: "3 matches".to_string(),
1472                    success: true,
1473                    duration_ms: Some(5),
1474                },
1475            ],
1476            usage: TokenUsage::default(),
1477            success: true,
1478            duration_ms: 1000,
1479            error_details: None,
1480            failed_tool: None,
1481        };
1482
1483        let json = serde_json::to_string(&result).context("failed to serialize subagent result")?;
1484        assert!(json.contains("test"));
1485        assert!(json.contains("Done"));
1486        assert!(json.contains("tool_count"));
1487        assert!(json.contains("tool_logs"));
1488        assert!(json.contains("/tmp/test.rs"));
1489
1490        Ok(())
1491    }
1492
1493    #[test]
1494    fn test_subagent_result_field_extraction() -> Result<()> {
1495        let result = SubagentResult {
1496            name: "explore".to_string(),
1497            final_response: "Found 3 config files".to_string(),
1498            total_turns: 2,
1499            tool_count: 5,
1500            tool_logs: vec![ToolCallLog {
1501                name: "glob".to_string(),
1502                display_name: "Glob config files".to_string(),
1503                context: "**/*.toml".to_string(),
1504                result: "3 files".to_string(),
1505                success: true,
1506                duration_ms: Some(15),
1507            }],
1508            usage: TokenUsage {
1509                input_tokens: 1500,
1510                output_tokens: 500,
1511                ..Default::default()
1512            },
1513            success: true,
1514            duration_ms: 2500,
1515            error_details: None,
1516            failed_tool: None,
1517        };
1518
1519        let value =
1520            serde_json::to_value(&result).context("failed to convert subagent result to json")?;
1521
1522        let tool_count = value.get("tool_count").and_then(Value::as_u64);
1523        assert_eq!(tool_count, Some(5));
1524
1525        let usage = value.get("usage").context("missing usage field")?;
1526        let input_tokens = usage.get("input_tokens").and_then(Value::as_u64);
1527        let output_tokens = usage.get("output_tokens").and_then(Value::as_u64);
1528        assert_eq!(input_tokens, Some(1500));
1529        assert_eq!(output_tokens, Some(500));
1530
1531        let logs = value
1532            .get("tool_logs")
1533            .and_then(Value::as_array)
1534            .context("missing tool_logs array")?;
1535        assert_eq!(logs.len(), 1);
1536
1537        let first_log = &logs[0];
1538        assert_eq!(first_log.get("name").and_then(Value::as_str), Some("glob"));
1539        assert_eq!(
1540            first_log.get("context").and_then(Value::as_str),
1541            Some("**/*.toml")
1542        );
1543        assert_eq!(
1544            first_log.get("result").and_then(Value::as_str),
1545            Some("3 files")
1546        );
1547        assert_eq!(
1548            first_log.get("success").and_then(Value::as_bool),
1549            Some(true)
1550        );
1551
1552        Ok(())
1553    }
1554
    /// A subagent writes its own events to a separate child thread.
    ///
    /// The parent's turn receives only `SubagentProgress` events, while the
    /// child thread (found via the recording store) holds the tool call and
    /// the final `Done` event.
    #[tokio::test]
    async fn test_run_subagent_uses_isolated_child_thread() -> Result<()> {
        let event_store = Arc::new(RecordingEventStore::default());
        // Script: one echo tool call, then a final text answer.
        let provider = Arc::new(TestProvider::new(vec![
            TestProvider::tool_use_response("tool_1", "echo", json!({ "message": "child" })),
            TestProvider::text_response("Subagent complete"),
        ]));
        let mut tools = ToolRegistry::new();
        tools.register(TestEchoTool);

        // The child factory hands back the same recording store the parent
        // uses, so both threads can be inspected in one place.
        let tool = SubagentTool::new(SubagentConfig::new("worker"), provider, Arc::new(tools), {
            let store = Arc::clone(&event_store);
            move || -> Arc<dyn EventStore> { store.clone() }
        });
        let parent_thread = ThreadId::new();
        let parent_ctx = ToolContext::new(()).with_event_store(
            event_store.clone(),
            parent_thread.clone(),
            1,
            Arc::new(LocalEventAuthority::new()),
        );

        let result = tool
            .run_subagent(
                "Inspect the repo",
                "subagent_1".to_string(),
                &parent_ctx,
                CancellationToken::new(),
            )
            .await?;

        assert!(result.success);
        assert_eq!(result.tool_count, 1);
        assert_eq!(result.tool_logs.len(), 1);

        // The parent's turn must contain only progress notifications.
        let parent_turn = event_store
            .get_turn(&parent_thread, 1)
            .await?
            .context("missing parent turn")?;
        assert!(!parent_turn.events.is_empty());
        assert!(
            parent_turn
                .events
                .iter()
                .all(|envelope| { matches!(envelope.event, AgentEvent::SubagentProgress { .. }) })
        );

        // Any appended thread other than the parent's is the child thread.
        let appended = event_store.appended_events().await;
        let child_thread = appended
            .iter()
            .map(|(thread_id, _, _)| thread_id.clone())
            .find(|thread_id| thread_id != &parent_thread)
            .context("missing child thread events")?;
        let child_turn = event_store
            .get_turn(&child_thread, 1)
            .await?
            .context("missing child turn")?;
        let child_events = event_store.get_events(&child_thread).await?;

        assert!(
            child_turn
                .events
                .iter()
                .any(|envelope| { matches!(envelope.event, AgentEvent::ToolCallStart { .. }) })
        );
        assert!(
            child_events
                .iter()
                .any(|envelope| { matches!(envelope.event, AgentEvent::Done { .. }) })
        );

        Ok(())
    }
1628
    /// A subagent that exceeds its `timeout_ms` yields an unsuccessful result
    /// whose final response is "Subagent timed out".
    ///
    /// The backing store refuses `get_turns`, so the test would also fail if
    /// the timed-out run tried to read back its whole event history.
    #[tokio::test]
    async fn test_run_subagent_timeout_marks_result_as_failed() -> Result<()> {
        let event_store = Arc::new(NoReadAfterFailureEventStore::default());
        // 50 ms provider latency against a 10 ms subagent timeout.
        let provider = Arc::new(
            TestProvider::new(vec![TestProvider::text_response("Too late")])
                .with_delay(Duration::from_millis(50)),
        );
        let tool = SubagentTool::new(
            SubagentConfig::new("worker").with_timeout_ms(10),
            provider,
            Arc::new(ToolRegistry::<()>::new()),
            {
                let store = Arc::clone(&event_store);
                move || -> Arc<dyn EventStore> { store.clone() }
            },
        );

        let result = tool
            .run_subagent(
                "Take too long",
                "subagent_timeout".to_string(),
                &ToolContext::new(()),
                CancellationToken::new(),
            )
            .await?;

        assert!(!result.success);
        assert_eq!(result.final_response, "Subagent timed out");
        assert!(
            result
                .error_details
                .context("missing timeout details")?
                .contains("timed out")
        );

        Ok(())
    }
1666
    /// Failures while writing progress events to the parent's store are
    /// tolerated: the subagent still completes successfully with its normal
    /// result.
    #[tokio::test]
    async fn test_run_subagent_progress_failures_do_not_abort_successful_runs() -> Result<()> {
        let provider = Arc::new(TestProvider::new(vec![
            TestProvider::tool_use_response("tool_1", "echo", json!({ "message": "child" })),
            TestProvider::text_response("Subagent complete"),
        ]));
        let mut tools = ToolRegistry::new();
        tools.register(TestEchoTool);

        // The child gets a working in-memory store...
        let tool = SubagentTool::new(SubagentConfig::new("worker"), provider, Arc::new(tools), {
            move || -> Arc<dyn EventStore> { Arc::new(InMemoryEventStore::new()) }
        });
        // ...while every append to the parent's store fails.
        let parent_ctx = ToolContext::new(()).with_event_store(
            Arc::new(AlwaysFailAppendEventStore),
            ThreadId::new(),
            1,
            Arc::new(LocalEventAuthority::new()),
        );

        let result = tool
            .run_subagent(
                "Inspect the repo",
                "subagent_progress".to_string(),
                &parent_ctx,
                CancellationToken::new(),
            )
            .await?;

        assert!(result.success);
        assert_eq!(result.final_response, "Subagent complete");
        assert_eq!(result.tool_count, 1);

        Ok(())
    }
1701
    /// A panicking provider must surface as a structured subagent error that
    /// carries the panic message, not as the "ended unexpectedly" disconnect
    /// path.
    #[tokio::test]
    async fn test_run_subagent_panic_classified_as_error_not_disconnected() -> Result<()> {
        // A subagent whose LLM provider panics must surface as a
        // structured `Error` — NOT the `Disconnected` ("ended
        // unexpectedly") path. Before panic isolation the
        // panic unwound the spawned run task, dropped `state_tx`, and
        // the parent saw a `RecvError` it misclassified as
        // `Disconnected`. The run-loop catch_unwind boundary now turns
        // the panic into `AgentRunState::Error`, which the subagent
        // classifies correctly.
        let tool = SubagentTool::new(
            SubagentConfig::new("worker"),
            Arc::new(PanicProvider),
            Arc::new(ToolRegistry::<()>::new()),
            move || -> Arc<dyn EventStore> { Arc::new(InMemoryEventStore::new()) },
        );

        let result = tool
            .run_subagent(
                "Crash",
                "subagent_panic".to_string(),
                &ToolContext::new(()),
                CancellationToken::new(),
            )
            .await?;

        assert!(!result.success);
        // The disconnect path must NOT be taken: a caught panic is a
        // structured agent error, not an opaque dropped channel.
        assert_ne!(result.final_response, "Subagent ended unexpectedly");
        let details = result
            .error_details
            .context("panicking subagent must carry structured error details")?;
        assert!(
            !details.contains("ended before returning a final state"),
            "panic must not be classified as Disconnected; got {details:?}",
        );
        assert!(
            details.contains("panicked"),
            "structured error should reflect the panic; got {details:?}",
        );
        assert!(
            details.contains("panic provider should disconnect subagent"),
            "structured error should carry the original panic message; got {details:?}",
        );

        Ok(())
    }
1750
1751    #[tokio::test]
1752    async fn test_run_subagent_refusal_marks_result_as_failed() -> Result<()> {
1753        let tool = SubagentTool::new(
1754            SubagentConfig::new("worker"),
1755            Arc::new(TestProvider::new(vec![TestProvider::refusal_response(
1756                Some("Refused for policy reasons"),
1757            )])),
1758            Arc::new(ToolRegistry::<()>::new()),
1759            || Arc::new(InMemoryEventStore::new()),
1760        );
1761
1762        let result = tool
1763            .run_subagent(
1764                "Refuse",
1765                "subagent_refusal".to_string(),
1766                &ToolContext::new(()),
1767                CancellationToken::new(),
1768            )
1769            .await?;
1770
1771        assert!(!result.success);
1772        assert_eq!(result.final_response, "Refused for policy reasons");
1773        assert_eq!(
1774            result.error_details.as_deref(),
1775            Some("Refused for policy reasons")
1776        );
1777
1778        Ok(())
1779    }
1780
1781    #[tokio::test]
1782    async fn test_run_subagent_cancelled_marks_result_as_failed() -> Result<()> {
1783        let tool = SubagentTool::new(
1784            SubagentConfig::new("worker"),
1785            Arc::new(
1786                TestProvider::new(vec![TestProvider::text_response("Too late")])
1787                    .with_delay(Duration::from_millis(50)),
1788            ),
1789            Arc::new(ToolRegistry::<()>::new()),
1790            || Arc::new(InMemoryEventStore::new()),
1791        );
1792        let cancel_token = CancellationToken::new();
1793        cancel_token.cancel();
1794
1795        let result = tool
1796            .run_subagent(
1797                "Cancel",
1798                "subagent_cancelled".to_string(),
1799                &ToolContext::new(()),
1800                cancel_token,
1801            )
1802            .await?;
1803
1804        assert!(!result.success);
1805        assert_eq!(result.final_response, "Subagent cancelled");
1806        assert!(
1807            result
1808                .error_details
1809                .context("missing cancellation details")?
1810                .contains("cancelled")
1811        );
1812
1813        Ok(())
1814    }
1815
1816    #[tokio::test]
1817    async fn test_run_subagent_llm_error_does_not_infer_failed_tool() -> Result<()> {
1818        let provider = Arc::new(TestProvider::new(vec![
1819            ChatOutcome::ServerError("llm transport failed".to_string()),
1820            ChatOutcome::ServerError("llm transport failed".to_string()),
1821            ChatOutcome::ServerError("llm transport failed".to_string()),
1822            ChatOutcome::ServerError("llm transport failed".to_string()),
1823            ChatOutcome::ServerError("llm transport failed".to_string()),
1824            ChatOutcome::ServerError("llm transport failed".to_string()),
1825        ]));
1826        let mut tools = ToolRegistry::new();
1827        tools.register(TestEchoTool);
1828
1829        let tool = SubagentTool::new(
1830            SubagentConfig::new("worker"),
1831            provider,
1832            Arc::new(tools),
1833            || Arc::new(InMemoryEventStore::new()),
1834        );
1835
1836        let result = tool
1837            .run_subagent(
1838                "Trigger an llm failure",
1839                "subagent_llm_error".to_string(),
1840                &ToolContext::new(()),
1841                CancellationToken::new(),
1842            )
1843            .await?;
1844
1845        assert!(!result.success);
1846        assert!(result.failed_tool.is_none());
1847        assert!(
1848            result
1849                .error_details
1850                .as_deref()
1851                .unwrap_or_default()
1852                .contains("Server error")
1853        );
1854
1855        Ok(())
1856    }
1857
1858    #[tokio::test]
1859    async fn test_replay_subagent_events_stops_after_error() -> Result<()> {
1860        let event_store: Arc<dyn EventStore> = Arc::new(InMemoryEventStore::new());
1861        let thread_id = ThreadId::new();
1862        let authority = LocalEventAuthority::new();
1863        event_store
1864            .append(
1865                &thread_id,
1866                1,
1867                authority.wrap(AgentEvent::Error {
1868                    message: "subagent boom".to_string(),
1869                    recoverable: false,
1870                }),
1871            )
1872            .await?;
1873        event_store
1874            .append(
1875                &thread_id,
1876                1,
1877                authority.wrap(AgentEvent::Text {
1878                    message_id: "msg_after_error".to_string(),
1879                    text: "should not be appended".to_string(),
1880                }),
1881            )
1882            .await?;
1883
1884        let mut state = SubagentExecutionState::new();
1885        replay_subagent_events(
1886            &event_store,
1887            &thread_id,
1888            &ToolContext::new(()),
1889            &SubagentConfig::new("worker"),
1890            "subagent_error",
1891            &mut state,
1892        )
1893        .await?;
1894
1895        assert!(!state.success);
1896        assert_eq!(state.final_response, "subagent boom");
1897        assert_eq!(state.error_details.as_deref(), Some("subagent boom"));
1898
1899        Ok(())
1900    }
1901
1902    #[tokio::test]
1903    async fn test_replay_keeps_only_final_message_text() -> Result<()> {
1904        let event_store: Arc<dyn EventStore> = Arc::new(InMemoryEventStore::new());
1905        let thread_id = ThreadId::new();
1906        let authority = LocalEventAuthority::new();
1907
1908        // Interim commentary in an early assistant message.
1909        event_store
1910            .append(
1911                &thread_id,
1912                1,
1913                authority.wrap(AgentEvent::Text {
1914                    message_id: "m1".to_string(),
1915                    text: "Let me check the repo...".to_string(),
1916                }),
1917            )
1918            .await?;
1919        // The final answer in a later message, split across two text blocks.
1920        event_store
1921            .append(
1922                &thread_id,
1923                2,
1924                authority.wrap(AgentEvent::Text {
1925                    message_id: "m2".to_string(),
1926                    text: "Final answer ".to_string(),
1927                }),
1928            )
1929            .await?;
1930        event_store
1931            .append(
1932                &thread_id,
1933                2,
1934                authority.wrap(AgentEvent::Text {
1935                    message_id: "m2".to_string(),
1936                    text: "part two".to_string(),
1937                }),
1938            )
1939            .await?;
1940        event_store
1941            .append(
1942                &thread_id,
1943                2,
1944                authority.wrap(AgentEvent::done(
1945                    thread_id.clone(),
1946                    2,
1947                    TokenUsage::default(),
1948                    Duration::from_millis(1),
1949                )),
1950            )
1951            .await?;
1952
1953        let mut state = SubagentExecutionState::new();
1954        replay_subagent_events(
1955            &event_store,
1956            &thread_id,
1957            &ToolContext::new(()),
1958            &SubagentConfig::new("worker"),
1959            "subagent_final",
1960            &mut state,
1961        )
1962        .await?;
1963
1964        // Interim commentary must not be glued onto the final answer; only the
1965        // last assistant message's text is returned.
1966        assert_eq!(state.final_response, "Final answer part two");
1967
1968        Ok(())
1969    }
1970
1971    #[test]
1972    fn build_subagent_child_context_increments_depth_and_propagates_semaphore() {
1973        let semaphore = Arc::new(tokio::sync::Semaphore::new(2));
1974        let parent = ToolContext::new(())
1975            .with_metadata(METADATA_SUBAGENT_DEPTH, json!(2))
1976            .with_metadata(METADATA_MAX_SUBAGENT_DEPTH, json!(5))
1977            .with_subagent_semaphore(Arc::clone(&semaphore));
1978
1979        let child = build_subagent_child_context(&parent);
1980
1981        assert_eq!(
1982            child
1983                .metadata
1984                .get(METADATA_SUBAGENT_DEPTH)
1985                .and_then(Value::as_u64),
1986            Some(3)
1987        );
1988        // Host-set max-depth carries through unchanged.
1989        assert_eq!(
1990            child
1991                .metadata
1992                .get(METADATA_MAX_SUBAGENT_DEPTH)
1993                .and_then(Value::as_u64),
1994            Some(5)
1995        );
1996        // The shared concurrency semaphore propagates into the child run.
1997        assert!(child.subagent_semaphore().is_some());
1998    }
1999
2000    #[test]
2001    fn cached_tool_strings_are_interned_per_name() {
2002        let a = cached_tool_strings("worker");
2003        let b = cached_tool_strings("worker");
2004        // The same name reuses a single leaked allocation rather than leaking
2005        // afresh on every construction.
2006        assert!(std::ptr::eq(a.0, b.0));
2007        assert!(std::ptr::eq(a.1, b.1));
2008        assert_eq!(a.0, "Subagent: worker");
2009
2010        let c = cached_tool_strings("a-different-name");
2011        assert!(!std::ptr::eq(a.0, c.0));
2012    }
2013
2014    #[tokio::test]
2015    async fn test_nested_subagent_depth_limit_propagates() -> Result<()> {
2016        use crate::hooks::AllowAllHooks;
2017
2018        // The inner subagent must never run: the depth guard fires first.
2019        let inner = SubagentTool::new(
2020            SubagentConfig::new("inner"),
2021            Arc::new(TestProvider::new(vec![TestProvider::text_response(
2022                "inner ran (should not happen)",
2023            )])),
2024            Arc::new(ToolRegistry::<()>::new()),
2025            || -> Arc<dyn EventStore> { Arc::new(InMemoryEventStore::new()) },
2026        );
2027        let mut middle_tools = ToolRegistry::new();
2028        middle_tools.register(inner);
2029
2030        // The outer subagent tries to spawn the nested subagent, then finishes.
2031        // `AllowAllHooks` auto-approves the Confirm-tier nested spawn so it
2032        // actually reaches the depth guard.
2033        let outer = SubagentTool::new(
2034            SubagentConfig::new("outer"),
2035            Arc::new(TestProvider::new(vec![
2036                TestProvider::tool_use_response(
2037                    "call_inner",
2038                    "subagent_inner",
2039                    json!({ "task": "go deeper" }),
2040                ),
2041                TestProvider::text_response("outer done"),
2042            ])),
2043            Arc::new(middle_tools),
2044            || -> Arc<dyn EventStore> { Arc::new(InMemoryEventStore::new()) },
2045        )
2046        .with_hooks(Arc::new(AllowAllHooks));
2047
2048        // Cap nesting at depth 1: the outer runs at depth 1, so its nested
2049        // spawn (which would be depth 1 >= max 1) must be rejected.
2050        let parent_ctx = ToolContext::new(()).with_metadata(METADATA_MAX_SUBAGENT_DEPTH, json!(1));
2051
2052        let result = outer
2053            .execute(&parent_ctx, json!({ "task": "start" }))
2054            .await?;
2055
2056        assert!(result.success, "outer subagent should still complete");
2057        assert_eq!(result.output, "outer done");
2058
2059        let subresult: SubagentResult =
2060            serde_json::from_value(result.data.context("missing subagent result data")?)
2061                .context("subagent result should deserialize")?;
2062        let nested = subresult
2063            .tool_logs
2064            .iter()
2065            .find(|log| log.name == "subagent_inner")
2066            .context("missing nested subagent tool log")?;
2067        assert!(!nested.success, "nested spawn must be rejected");
2068        assert!(
2069            nested.result.contains("depth limit"),
2070            "nested failure should be the depth-limit error; got: {}",
2071            nested.result
2072        );
2073
2074        Ok(())
2075    }
2076}