1mod factory;
38
39pub use factory::SubagentFactory;
40
41use crate::events::AgentEvent;
42use crate::hooks::{AgentHooks, DefaultHooks};
43use crate::llm::LlmProvider;
44use crate::stores::{EventStore, InMemoryStore, MessageStore, StateStore};
45use crate::tools::{DynamicToolName, Tool, ToolContext, ToolRegistry};
46use crate::types::{AgentConfig, AgentInput, ThreadId, TokenUsage, ToolResult, ToolTier};
47use anyhow::{Context, Result, bail};
48use serde::{Deserialize, Serialize};
49use serde_json::{Value, json};
50use std::collections::HashMap;
51use std::sync::{Arc, Mutex, OnceLock, PoisonError};
52use std::time::{Duration, Instant};
53use tokio_util::sync::CancellationToken;
54
/// Returns the interned `(display_name, description)` pair for the subagent `name`.
///
/// The `Tool` implementation in this file hands both strings out as
/// `&'static str`, so they are built once per distinct name, leaked with
/// [`Box::leak`], and memoised in a process-wide map. Later calls with the same
/// name return the very same string slices instead of leaking again.
fn cached_tool_strings(name: &str) -> (&'static str, &'static str) {
    type Interned = (&'static str, &'static str);
    static CACHE: OnceLock<Mutex<HashMap<String, Interned>>> = OnceLock::new();

    // A poisoned lock only means another thread panicked while holding it; the
    // map is still usable, so recover the guard rather than panicking here.
    let mut interned = CACHE
        .get_or_init(Mutex::default)
        .lock()
        .unwrap_or_else(PoisonError::into_inner);

    if let Some(&hit) = interned.get(name) {
        return hit;
    }

    // Leaking happens while the lock is held, so each name leaks at most once.
    let leak = |text: String| -> &'static str { Box::leak(text.into_boxed_str()) };
    let fresh: Interned = (
        leak(format!("Subagent: {name}")),
        leak(format!(
            "Spawn a subagent named '{name}' to handle a task. The subagent will work independently and return only its final response."
        )),
    );
    interned.insert(name.to_string(), fresh);
    fresh
}
80
/// Metadata key under which the current subagent nesting depth is stored.
///
/// The value is read as a `u64`; a missing or non-numeric entry counts as `0`.
/// Every child context is created with a depth one above its parent's.
pub const METADATA_SUBAGENT_DEPTH: &str = "subagent_depth";

/// Metadata key holding the maximum allowed subagent nesting depth.
///
/// When the key is absent, the `execute` implementation of [`SubagentTool`]
/// falls back to a limit of `3`.
pub const METADATA_MAX_SUBAGENT_DEPTH: &str = "max_subagent_depth";
91
/// Static configuration for one subagent exposed through [`SubagentTool`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SubagentConfig {
    /// Subagent identifier; used for the tool name (`subagent_<name>`), the
    /// cached display strings, and in result and error messages.
    pub name: String,
    /// Optional nickname, forwarded on subagent progress events.
    pub nickname: Option<String>,
    /// System prompt handed to the subagent's agent loop.
    pub system_prompt: String,
    /// Turn limit for the subagent; `run_subagent` falls back to `100` when unset.
    pub max_turns: Option<usize>,
    /// Wall-clock budget in milliseconds; `None` waits without a timeout.
    pub timeout_ms: Option<u64>,
    /// Optional model name, left out of serialized output when `None`.
    /// Within this file it is only forwarded on progress events.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
}
109
110impl SubagentConfig {
111 #[must_use]
113 pub fn new(name: impl Into<String>) -> Self {
114 Self {
115 name: name.into(),
116 nickname: None,
117 system_prompt: String::new(),
118 max_turns: None,
119 timeout_ms: None,
120 model: None,
121 }
122 }
123
124 #[must_use]
126 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
127 self.system_prompt = prompt.into();
128 self
129 }
130
131 #[must_use]
133 pub const fn with_max_turns(mut self, max: usize) -> Self {
134 self.max_turns = Some(max);
135 self
136 }
137
138 #[must_use]
140 pub const fn with_timeout_ms(mut self, timeout: u64) -> Self {
141 self.timeout_ms = Some(timeout);
142 self
143 }
144
145 #[must_use]
147 pub fn with_model(mut self, model: impl Into<String>) -> Self {
148 self.model = Some(model.into());
149 self
150 }
151
152 #[must_use]
154 pub fn with_nickname(mut self, nickname: impl Into<String>) -> Self {
155 self.nickname = Some(nickname.into());
156 self
157 }
158}
159
/// Compact record of one tool call made by a subagent, built while replaying
/// the subagent's events.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ToolCallLog {
    /// Tool name as reported by the tool-call-end event.
    pub name: String,
    /// Display name as reported by the tool-call-end event.
    pub display_name: String,
    /// Short context string derived from the call's input (for example a path,
    /// command, or pattern); empty when none was extracted.
    pub context: String,
    /// One-line summary of the tool's output.
    pub result: String,
    /// Whether the tool reported success.
    pub success: bool,
    /// Duration copied from the tool result, when it provided one.
    pub duration_ms: Option<u64>,
}
176
/// Outcome of one subagent run, serialized into the `data` field of the tool
/// result returned to the parent agent.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SubagentResult {
    /// Name of the subagent (copied from its configuration).
    pub name: String,
    /// Final text produced by the subagent, or a short failure message.
    pub final_response: String,
    /// Number of turns reported by the subagent's events.
    pub total_turns: usize,
    /// Number of tool calls the subagent started.
    pub tool_count: u32,
    /// One entry per completed tool call, in replay order.
    pub tool_logs: Vec<ToolCallLog>,
    /// Token usage accumulated from the subagent's turn-complete events.
    pub usage: TokenUsage,
    /// `false` when the run timed out, was cancelled, refused, or errored.
    pub success: bool,
    /// Wall-clock duration of the run in milliseconds (saturating at `u64::MAX`).
    pub duration_ms: u64,
    /// Failure details, when the run did not succeed; omitted from serialized
    /// output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error_details: Option<String>,
    /// Omitted from serialized output when `None`.
    /// NOTE(review): nothing in the visible part of this file assigns a value
    /// here — confirm where it is meant to be populated.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub failed_tool: Option<String>,
}
211
/// A tool that runs a nested agent loop ("subagent") and returns only its
/// final response to the calling agent.
///
/// Type parameters select the LLM provider (`P`), hooks (`H`), message store
/// (`M`) and state store (`S`) used by the nested loop.
pub struct SubagentTool<P, H = DefaultHooks, M = InMemoryStore, S = InMemoryStore>
where
    P: LlmProvider,
    H: AgentHooks,
    M: MessageStore,
    S: StateStore,
{
    /// Name, prompt, and limits of the subagent.
    config: SubagentConfig,
    /// Provider cloned into every nested agent loop.
    provider: Arc<P>,
    /// Tool registry cloned into every nested agent loop.
    tools: Arc<ToolRegistry<()>>,
    /// Hooks cloned into every nested agent loop.
    hooks: Arc<H>,
    /// Called once per run to create the message store.
    message_store_factory: Arc<dyn Fn() -> M + Send + Sync>,
    /// Called once per run to create the state store.
    state_store_factory: Arc<dyn Fn() -> S + Send + Sync>,
    /// Called once per run to create the event store that is later replayed.
    event_store_factory: Arc<dyn Fn() -> Arc<dyn EventStore> + Send + Sync>,
    /// Interned display name returned by `Tool::display_name`.
    cached_display_name: &'static str,
    /// Interned description returned by `Tool::description`.
    cached_description: &'static str,
}
248
249impl<P> SubagentTool<P, DefaultHooks, InMemoryStore, InMemoryStore>
250where
251 P: LlmProvider + 'static,
252{
253 #[must_use]
255 pub fn new<EF>(
256 config: SubagentConfig,
257 provider: Arc<P>,
258 tools: Arc<ToolRegistry<()>>,
259 event_store_factory: EF,
260 ) -> Self
261 where
262 EF: Fn() -> Arc<dyn EventStore> + Send + Sync + 'static,
263 {
264 let (cached_display_name, cached_description) = cached_tool_strings(&config.name);
267 Self {
268 config,
269 provider,
270 tools,
271 hooks: Arc::new(DefaultHooks),
272 message_store_factory: Arc::new(InMemoryStore::new),
273 state_store_factory: Arc::new(InMemoryStore::new),
274 event_store_factory: Arc::new(event_store_factory),
275 cached_display_name,
276 cached_description,
277 }
278 }
279}
280
281impl<P, H, M, S> SubagentTool<P, H, M, S>
282where
283 P: LlmProvider + Clone + 'static,
284 H: AgentHooks + Clone + 'static,
285 M: MessageStore + 'static,
286 S: StateStore + 'static,
287{
288 #[must_use]
290 pub fn with_hooks<H2: AgentHooks + Clone + 'static>(
291 self,
292 hooks: Arc<H2>,
293 ) -> SubagentTool<P, H2, M, S> {
294 SubagentTool {
295 config: self.config,
296 provider: self.provider,
297 tools: self.tools,
298 hooks,
299 message_store_factory: self.message_store_factory,
300 state_store_factory: self.state_store_factory,
301 event_store_factory: self.event_store_factory,
302 cached_display_name: self.cached_display_name,
303 cached_description: self.cached_description,
304 }
305 }
306
307 #[must_use]
309 pub fn with_stores<M2, S2, MF, SF>(
310 self,
311 message_factory: MF,
312 state_factory: SF,
313 ) -> SubagentTool<P, H, M2, S2>
314 where
315 M2: MessageStore + 'static,
316 S2: StateStore + 'static,
317 MF: Fn() -> M2 + Send + Sync + 'static,
318 SF: Fn() -> S2 + Send + Sync + 'static,
319 {
320 SubagentTool {
321 config: self.config,
322 provider: self.provider,
323 tools: self.tools,
324 hooks: self.hooks,
325 message_store_factory: Arc::new(message_factory),
326 state_store_factory: Arc::new(state_factory),
327 event_store_factory: self.event_store_factory,
328 cached_display_name: self.cached_display_name,
329 cached_description: self.cached_description,
330 }
331 }
332
333 #[must_use]
335 pub const fn config(&self) -> &SubagentConfig {
336 &self.config
337 }
338
339 async fn run_subagent<Ctx>(
344 &self,
345 task: &str,
346 subagent_id: String,
347 parent_ctx: &ToolContext<Ctx>,
348 parent_cancel: CancellationToken,
349 ) -> Result<SubagentResult>
350 where
351 Ctx: Send + Sync + 'static,
352 {
353 use crate::agent_loop::AgentLoop;
354
355 let start = Instant::now();
356 let thread_id = ThreadId::new();
359
360 let message_store = (self.message_store_factory)();
362 let state_store = (self.state_store_factory)();
363 let event_store = (self.event_store_factory)();
364
365 let agent_config = AgentConfig {
367 max_turns: Some(self.config.max_turns.unwrap_or(100)),
368 system_prompt: self.config.system_prompt.clone(),
369 ..Default::default()
370 };
371
372 let agent = AgentLoop::new(
374 (*self.provider).clone(),
375 (*self.tools).clone(),
376 (*self.hooks).clone(),
377 message_store,
378 state_store,
379 Arc::clone(&event_store),
380 agent_config,
381 );
382
383 let tool_ctx = build_subagent_child_context(parent_ctx);
386
387 let cancel_token = parent_cancel.child_token();
391 let timeout_cancel = cancel_token.clone();
392 let (state_rx, task_handle) = agent.run_abortable(
393 thread_id.clone(),
394 AgentInput::Text(task.to_string()),
395 tool_ctx,
396 cancel_token,
397 );
398
399 let wait_result = wait_for_subagent_state(self.config.timeout_ms, start, state_rx).await;
400 let mut state = SubagentExecutionState::new();
401 let replay_events = apply_subagent_wait_outcome(
402 classify_subagent_wait_result(wait_result.as_ref()),
403 &self.config,
404 &timeout_cancel,
405 &task_handle,
406 &mut state,
407 );
408
409 if replay_events {
410 replay_subagent_events(
411 &event_store,
412 &thread_id,
413 parent_ctx,
414 &self.config,
415 &subagent_id,
416 &mut state,
417 )
418 .await?;
419 }
420
421 let result = state.into_result(self.config.name.clone(), start);
422 emit_subagent_observability(self, &result);
423 Ok(result)
424 }
425}
426
427fn build_subagent_child_context<Ctx>(parent_ctx: &ToolContext<Ctx>) -> ToolContext<()> {
435 let parent_depth = parent_ctx
436 .metadata
437 .get(METADATA_SUBAGENT_DEPTH)
438 .and_then(Value::as_u64)
439 .unwrap_or(0);
440
441 let mut child = ToolContext::new(());
442 child.metadata.clone_from(&parent_ctx.metadata);
443 child
444 .metadata
445 .insert(METADATA_SUBAGENT_DEPTH.to_string(), json!(parent_depth + 1));
446
447 if let Some(semaphore) = parent_ctx.subagent_semaphore() {
448 child = child.with_subagent_semaphore(semaphore);
449 }
450 child
451}
452
/// Raw result of waiting for a subagent's final run state.
///
/// The outer `Result` is `Err` when the timeout elapsed; the inner `Result` is
/// `Err` when the sending side of the oneshot channel was dropped before a
/// state was sent.
type SubagentWaitResult = Result<
    Result<crate::types::AgentRunState, tokio::sync::oneshot::error::RecvError>,
    tokio::time::error::Elapsed,
>;
457
/// Mutable accumulator filled in while a subagent run is evaluated; converted
/// into a [`SubagentResult`] at the end.
struct SubagentExecutionState {
    /// Text collected for the final response, or a failure message.
    final_response: String,
    /// Id of the message whose text is currently held in `final_response`;
    /// text from a different message id replaces the accumulated text.
    final_response_message_id: Option<String>,
    /// Latest turn count seen in the replayed events.
    total_turns: usize,
    /// Number of tool-call-start events seen.
    tool_count: u32,
    /// Logs of completed tool calls.
    tool_logs: Vec<ToolCallLog>,
    /// Started-but-not-finished tool calls: call id -> (tool name, context).
    pending_tools: HashMap<String, (String, String)>,
    /// Token usage summed over turn-complete events.
    total_usage: TokenUsage,
    /// Starts as `true`; cleared on any failure.
    success: bool,
    /// Failure details, when the run failed.
    error_details: Option<String>,
    /// NOTE(review): initialised to `None` and not assigned anywhere in the
    /// visible part of this file — confirm intended use.
    failed_tool: Option<String>,
}
473
474impl SubagentExecutionState {
475 fn new() -> Self {
476 Self {
477 final_response: String::new(),
478 final_response_message_id: None,
479 total_turns: 0,
480 tool_count: 0,
481 tool_logs: Vec::new(),
482 pending_tools: HashMap::new(),
483 total_usage: TokenUsage::default(),
484 success: true,
485 error_details: None,
486 failed_tool: None,
487 }
488 }
489
490 fn fail(&mut self, response: impl Into<String>, details: String) {
493 self.final_response = response.into();
494 self.error_details = Some(details);
495 self.success = false;
496 }
497
498 fn into_result(self, name: String, start: Instant) -> SubagentResult {
499 SubagentResult {
500 name,
501 final_response: self.final_response,
502 total_turns: self.total_turns,
503 tool_count: self.tool_count,
504 tool_logs: self.tool_logs,
505 usage: self.total_usage,
506 success: self.success,
507 duration_ms: u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
508 error_details: self.error_details,
509 failed_tool: self.failed_tool,
510 }
511 }
512}
513
514fn subagent_total_tokens(total_usage: &TokenUsage) -> u64 {
515 u64::from(total_usage.input_tokens) + u64::from(total_usage.output_tokens)
516}
517
/// Data for one subagent progress event forwarded to the parent context.
struct SubagentProgressUpdate<'a> {
    /// Identifier of this subagent invocation.
    subagent_id: &'a str,
    /// Turn count known at the time of the update.
    total_turns: usize,
    /// Token usage accumulated so far.
    total_usage: &'a TokenUsage,
    /// Name of the tool the update is about.
    tool_name: String,
    /// Short context string extracted from the tool input.
    tool_context: String,
    /// `false` for a tool-call start, `true` for a tool-call end.
    completed: bool,
    /// Tool success flag; always `false` on start updates.
    success: bool,
    /// Number of tool calls started so far.
    tool_count: u32,
}
528
/// Classified outcome of waiting for a subagent's final run state.
enum SubagentWaitOutcome {
    /// The run finished (done or refusal); its events should be replayed.
    ReplayEvents,
    /// The timeout budget was exhausted.
    TimedOut,
    /// The state channel closed without delivering a state.
    Disconnected,
    /// The run reported cancellation.
    Cancelled,
    /// The run stopped to ask for confirmation.
    AwaitingConfirmation,
    /// The run reported an error (or an unrecognized state).
    Error(crate::types::AgentError),
}
537
538async fn wait_for_subagent_state(
539 timeout_ms: Option<u64>,
540 start: Instant,
541 state_rx: tokio::sync::oneshot::Receiver<crate::types::AgentRunState>,
542) -> Option<SubagentWaitResult> {
543 let timeout_duration = timeout_ms.map(Duration::from_millis);
544 if timeout_duration.is_some_and(|timeout| timeout.saturating_sub(start.elapsed()).is_zero()) {
545 return None;
546 }
547 if let Some(timeout) = timeout_duration {
548 let remaining = timeout.saturating_sub(start.elapsed());
549 Some(tokio::time::timeout(remaining, state_rx).await)
550 } else {
551 Some(Ok(state_rx.await))
552 }
553}
554
555fn classify_subagent_wait_result(wait_result: Option<&SubagentWaitResult>) -> SubagentWaitOutcome {
556 match wait_result {
557 Some(Ok(Ok(
558 crate::types::AgentRunState::Done { .. } | crate::types::AgentRunState::Refusal { .. },
559 ))) => SubagentWaitOutcome::ReplayEvents,
560 Some(Ok(Ok(crate::types::AgentRunState::Cancelled { .. }))) => {
561 SubagentWaitOutcome::Cancelled
562 }
563 Some(Ok(Ok(crate::types::AgentRunState::AwaitingConfirmation { .. }))) => {
564 SubagentWaitOutcome::AwaitingConfirmation
565 }
566 Some(Ok(Ok(crate::types::AgentRunState::Error(error)))) => {
567 SubagentWaitOutcome::Error(error.clone())
568 }
569 Some(Ok(Ok(_))) => SubagentWaitOutcome::Error(crate::types::AgentError::new(
573 "subagent returned an unrecognized run state".to_string(),
574 false,
575 )),
576 Some(Ok(Err(_))) => SubagentWaitOutcome::Disconnected,
577 None | Some(Err(_)) => SubagentWaitOutcome::TimedOut,
578 }
579}
580
581fn apply_subagent_wait_outcome(
582 outcome: SubagentWaitOutcome,
583 config: &SubagentConfig,
584 timeout_cancel: &CancellationToken,
585 task_handle: &tokio::task::JoinHandle<()>,
586 state: &mut SubagentExecutionState,
587) -> bool {
588 let (response, details) = match outcome {
591 SubagentWaitOutcome::ReplayEvents => return true,
592 SubagentWaitOutcome::TimedOut => (
593 "Subagent timed out".to_string(),
594 format!(
595 "Subagent '{}' timed out after {}ms",
596 config.name,
597 config.timeout_ms.unwrap_or(0)
598 ),
599 ),
600 SubagentWaitOutcome::Disconnected => (
601 "Subagent ended unexpectedly".to_string(),
602 format!(
603 "Subagent '{}' ended before returning a final state",
604 config.name
605 ),
606 ),
607 SubagentWaitOutcome::Cancelled => (
608 "Subagent cancelled".to_string(),
609 format!("Subagent '{}' was cancelled", config.name),
610 ),
611 SubagentWaitOutcome::AwaitingConfirmation => (
612 "Subagent requires confirmation".to_string(),
613 format!(
614 "Subagent '{}' requested confirmation, which is not supported in nested runs",
615 config.name
616 ),
617 ),
618 SubagentWaitOutcome::Error(error) => (error.message.clone(), error.message),
619 };
620
621 timeout_cancel.cancel();
623 task_handle.abort();
624 state.fail(response, details);
625 false
626}
627
628async fn replay_subagent_events<Ctx: Send + Sync + 'static>(
629 event_store: &Arc<dyn EventStore>,
630 thread_id: &ThreadId,
631 parent_ctx: &ToolContext<Ctx>,
632 config: &SubagentConfig,
633 subagent_id: &str,
634 state: &mut SubagentExecutionState,
635) -> Result<()> {
636 for envelope in event_store.get_events(thread_id).await? {
637 match envelope.event {
638 AgentEvent::Text { message_id, text } => {
639 if state.final_response_message_id.as_deref() != Some(message_id.as_str()) {
645 state.final_response.clear();
646 state.final_response_message_id = Some(message_id);
647 }
648 state.final_response.push_str(&text);
649 }
650 AgentEvent::ToolCallStart {
651 id, name, input, ..
652 } => {
653 state.tool_count += 1;
654 let context = extract_tool_context(&name, &input);
655 state
656 .pending_tools
657 .insert(id, (name.clone(), context.clone()));
658
659 emit_subagent_progress_if_possible(
660 parent_ctx,
661 config,
662 SubagentProgressUpdate {
663 subagent_id,
664 total_turns: state.total_turns,
665 total_usage: &state.total_usage,
666 tool_name: name,
667 tool_context: context,
668 completed: false,
669 success: false,
670 tool_count: state.tool_count,
671 },
672 )
673 .await;
674 }
675 AgentEvent::ToolCallEnd {
676 id,
677 name,
678 display_name,
679 result,
680 } => {
681 let context = state
682 .pending_tools
683 .remove(&id)
684 .map(|(_, ctx)| ctx)
685 .unwrap_or_default();
686 let tool_success = result.success;
687 state.tool_logs.push(ToolCallLog {
688 name: name.clone(),
689 display_name: display_name.clone(),
690 context: context.clone(),
691 result: summarize_tool_result(&name, &result),
692 success: tool_success,
693 duration_ms: result.duration_ms,
694 });
695
696 emit_subagent_progress_if_possible(
697 parent_ctx,
698 config,
699 SubagentProgressUpdate {
700 subagent_id,
701 total_turns: state.total_turns,
702 total_usage: &state.total_usage,
703 tool_name: name,
704 tool_context: context,
705 completed: true,
706 success: tool_success,
707 tool_count: state.tool_count,
708 },
709 )
710 .await;
711 }
712 AgentEvent::TurnComplete { turn, usage, .. } => {
713 state.total_turns = turn;
714 state.total_usage.add(&usage);
715 }
716 AgentEvent::Done {
717 total_turns: turns, ..
718 } => {
719 state.total_turns = turns;
720 break;
721 }
722 AgentEvent::Refusal { text, .. } => {
723 let refusal_message =
724 text.unwrap_or_else(|| "Subagent refused the request".to_string());
725 state.error_details = Some(refusal_message.clone());
726 state.final_response = refusal_message;
727 state.success = false;
728 break;
729 }
730 AgentEvent::Error { message, .. } => {
731 state.error_details = Some(message.clone());
732 state.final_response = message;
733 state.success = false;
734 break;
735 }
736 _ => {}
737 }
738 }
739 Ok(())
740}
741
742async fn emit_subagent_progress_if_possible<Ctx: Send + Sync + 'static>(
743 parent_ctx: &ToolContext<Ctx>,
744 config: &SubagentConfig,
745 update: SubagentProgressUpdate<'_>,
746) {
747 if let Err(error) = emit_subagent_progress(parent_ctx, config, update).await {
748 log::warn!("Failed to emit subagent progress event: {error}");
749 }
750}
751
752async fn emit_subagent_progress<Ctx: Send + Sync + 'static>(
753 parent_ctx: &ToolContext<Ctx>,
754 config: &SubagentConfig,
755 SubagentProgressUpdate {
756 subagent_id,
757 total_turns,
758 total_usage,
759 tool_name,
760 tool_context,
761 completed,
762 success,
763 tool_count,
764 }: SubagentProgressUpdate<'_>,
765) -> Result<()> {
766 let max_turns = config.max_turns.map(usize_to_u32_saturating);
767 let current_turn = Some(usize_to_u32_saturating(total_turns));
768
769 parent_ctx
770 .emit_event(AgentEvent::SubagentProgress {
771 subagent_id: subagent_id.to_string(),
772 subagent_name: config.name.clone(),
773 nickname: config.nickname.clone(),
774 child_thread_id: None,
775 child_root_task_id: None,
776 subagent_task_id: None,
777 max_turns,
778 current_turn,
779 model: config.model.clone(),
780 tool_name,
781 tool_context,
782 completed,
783 success,
784 tool_count,
785 total_tokens: subagent_total_tokens(total_usage),
786 })
787 .await
788}
789
/// Narrows `value` to `u32`, clamping anything too large to `u32::MAX`.
fn usize_to_u32_saturating(value: usize) -> u32 {
    match u32::try_from(value) {
        Ok(narrowed) => narrowed,
        Err(_) => u32::MAX,
    }
}
793
/// Records an `invoke_agent` span and metrics for a finished subagent run
/// (only compiled with the `otel` feature).
///
/// The span carries the agent name, normalized provider name, request model,
/// outcome (`"done"` or `"error"`), turn count, and token usage. When the
/// current OpenTelemetry context holds a valid span, the new span is linked to
/// it. Afterwards the invocation counter is incremented and per-type token
/// usage is recorded.
#[cfg(feature = "otel")]
fn emit_subagent_observability<P, H, M, S>(tool: &SubagentTool<P, H, M, S>, result: &SubagentResult)
where
    P: LlmProvider + Clone + 'static,
    H: AgentHooks + Clone + 'static,
    M: MessageStore + 'static,
    S: StateStore + 'static,
{
    use crate::observability::{attrs, baggage, langfuse, metrics, provider_name, spans};
    use opentelemetry::Context;
    use opentelemetry::KeyValue;
    use opentelemetry::trace::{Span, TraceContextExt};

    // Capture the caller's span context before the new span is started.
    let parent_ctx = Context::current();
    let parent_span_context = parent_ctx.span().span_context().clone();

    let normalized_provider_name = provider_name::normalize(tool.provider.provider());
    let request_model = tool.provider.model().to_string();
    let agent_name = tool.config.name.clone();

    let mut span = spans::start_internal_span(
        "invoke_agent",
        vec![
            KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
            KeyValue::new(attrs::GEN_AI_AGENT_NAME, agent_name.clone()),
            KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, normalized_provider_name),
            KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, request_model.clone()),
            KeyValue::new(attrs::SDK_RUN_MODE, "loop"),
        ],
    );
    baggage::copy_baggage_to_active_span(&mut span);
    langfuse::tag_observation(&mut span, langfuse::ObservationType::Agent);
    if parent_span_context.is_valid() {
        spans::link_to_parent_turn(
            &mut span,
            &parent_span_context.trace_id().to_string(),
            &parent_span_context.span_id().to_string(),
        );
    }

    let outcome = if result.success { "done" } else { "error" };
    span.set_attribute(KeyValue::new(attrs::SDK_OUTCOME, outcome));
    // Saturate rather than report 0 for an out-of-range turn count, matching
    // the other narrowing conversions in this file.
    span.set_attribute(attrs::kv_i64(
        attrs::SDK_TOTAL_TURNS,
        i64::try_from(result.total_turns).unwrap_or(i64::MAX),
    ));
    span.set_attribute(attrs::kv_i64(
        attrs::GEN_AI_USAGE_INPUT_TOKENS,
        i64::from(result.usage.input_tokens),
    ));
    span.set_attribute(attrs::kv_i64(
        attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
        i64::from(result.usage.output_tokens),
    ));
    if outcome == "error" {
        spans::set_span_error(&mut span, "agent_error", "subagent invocation failed");
    }
    span.end();

    let metrics_handle = metrics::Metrics::global();
    metrics_handle.subagent_invocations.add(
        1,
        &[
            KeyValue::new(attrs::GEN_AI_AGENT_NAME, agent_name),
            KeyValue::new(attrs::SDK_OUTCOME, outcome),
        ],
    );
    record_subagent_token_usage(
        &metrics_handle,
        result,
        normalized_provider_name,
        &request_model,
    );
}
880
/// Records the subagent's input and output token counts on the token-usage
/// metric, one data point per token type; zero counts are not recorded.
#[cfg(feature = "otel")]
fn record_subagent_token_usage(
    metrics: &crate::observability::metrics::Metrics,
    result: &SubagentResult,
    provider_name: &'static str,
    request_model: &str,
) {
    use crate::observability::attrs;
    use opentelemetry::KeyValue;

    let per_type: [(&'static str, u32); 2] = [
        ("input", result.usage.input_tokens),
        ("output", result.usage.output_tokens),
    ];

    let non_zero = per_type.into_iter().filter(|&(_, count)| count != 0);
    for (token_type, count) in non_zero {
        let attributes = [
            KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
            KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider_name),
            KeyValue::new("gen_ai.token.type", token_type),
            KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, request_model.to_string()),
        ];
        metrics.token_usage.record(u64::from(count), &attributes);
    }
}
911
/// No-op stand-in used when the `otel` feature is disabled, so callers can
/// invoke `emit_subagent_observability` unconditionally.
#[cfg(not(feature = "otel"))]
const fn emit_subagent_observability<P, H, M, S>(
    _tool: &SubagentTool<P, H, M, S>,
    _result: &SubagentResult,
) where
    P: LlmProvider + Clone + 'static,
    H: AgentHooks + Clone + 'static,
    M: MessageStore + 'static,
    S: StateStore + 'static,
{
}
923
924fn extract_tool_context(name: &str, input: &Value) -> String {
926 match name {
927 "read" => input
928 .get("file_path")
929 .or_else(|| input.get("path"))
930 .and_then(Value::as_str)
931 .unwrap_or("")
932 .to_string(),
933 "write" | "edit" => input
934 .get("file_path")
935 .or_else(|| input.get("path"))
936 .and_then(Value::as_str)
937 .unwrap_or("")
938 .to_string(),
939 "bash" => {
940 let cmd = input.get("command").and_then(Value::as_str).unwrap_or("");
941 if cmd.len() > 60 {
943 format!("{}...", crate::primitive_tools::truncate_str(cmd, 57))
944 } else {
945 cmd.to_string()
946 }
947 }
948 "glob" | "grep" => input
949 .get("pattern")
950 .and_then(Value::as_str)
951 .unwrap_or("")
952 .to_string(),
953 "web_search" => input
954 .get("query")
955 .and_then(Value::as_str)
956 .unwrap_or("")
957 .to_string(),
958 _ => String::new(),
959 }
960}
961
962fn summarize_tool_result(name: &str, result: &ToolResult) -> String {
964 if !result.success {
965 let first_line = result.output.lines().next().unwrap_or("Error");
966 return if first_line.len() > 50 {
967 format!(
968 "{}...",
969 crate::primitive_tools::truncate_str(first_line, 47)
970 )
971 } else {
972 first_line.to_string()
973 };
974 }
975
976 match name {
977 "read" => {
978 let line_count = result.output.lines().count();
979 format!("{line_count} lines")
980 }
981 "write" => "wrote file".to_string(),
982 "edit" => "edited".to_string(),
983 "bash" => {
984 let lines: Vec<&str> = result.output.lines().collect();
985 if lines.is_empty() {
986 "done".to_string()
987 } else if lines.len() == 1 {
988 let line = lines[0];
989 if line.len() > 50 {
990 format!("{}...", crate::primitive_tools::truncate_str(line, 47))
991 } else {
992 line.to_string()
993 }
994 } else {
995 format!("{} lines", lines.len())
996 }
997 }
998 "glob" => {
999 let count = result.output.lines().count();
1000 format!("{count} files")
1001 }
1002 "grep" => {
1003 let count = result.output.lines().count();
1004 format!("{count} matches")
1005 }
1006 _ => {
1007 let line_count = result.output.lines().count();
1008 if line_count == 0 {
1009 "done".to_string()
1010 } else {
1011 format!("{line_count} lines")
1012 }
1013 }
1014 }
1015}
1016
1017impl<P, H, M, S, Ctx> Tool<Ctx> for SubagentTool<P, H, M, S>
1018where
1019 P: LlmProvider + Clone + 'static,
1020 H: AgentHooks + Clone + 'static,
1021 M: MessageStore + 'static,
1022 S: StateStore + 'static,
1023 Ctx: Send + Sync + 'static,
1024{
1025 type Name = DynamicToolName;
1026
1027 fn name(&self) -> DynamicToolName {
1028 DynamicToolName::new(format!("subagent_{}", self.config.name))
1029 }
1030
1031 fn display_name(&self) -> &'static str {
1032 self.cached_display_name
1033 }
1034
1035 fn description(&self) -> &'static str {
1036 self.cached_description
1037 }
1038
1039 fn input_schema(&self) -> Value {
1040 json!({
1041 "type": "object",
1042 "properties": {
1043 "task": {
1044 "type": "string",
1045 "description": "The task or question for the subagent to handle"
1046 }
1047 },
1048 "required": ["task"]
1049 })
1050 }
1051
1052 fn tier(&self) -> ToolTier {
1053 ToolTier::Confirm
1055 }
1056
1057 async fn execute(&self, ctx: &ToolContext<Ctx>, input: Value) -> Result<ToolResult> {
1058 let task = input
1059 .get("task")
1060 .and_then(Value::as_str)
1061 .context("Missing 'task' parameter")?;
1062
1063 let current_depth = ctx
1065 .metadata
1066 .get(METADATA_SUBAGENT_DEPTH)
1067 .and_then(Value::as_u64)
1068 .unwrap_or(0);
1069 let max_depth = ctx
1070 .metadata
1071 .get(METADATA_MAX_SUBAGENT_DEPTH)
1072 .and_then(Value::as_u64)
1073 .unwrap_or(3); if current_depth >= max_depth {
1076 bail!(
1077 "Subagent depth limit exceeded ({current_depth}/{max_depth}). \
1078 Cannot spawn nested subagent '{}' — maximum nesting depth reached.",
1079 self.config.name
1080 );
1081 }
1082
1083 let _permit = if let Some(ref sem) = ctx.subagent_semaphore() {
1085 match sem.clone().try_acquire_owned() {
1086 Ok(permit) => Some(permit),
1087 Err(_) => {
1088 return Ok(ToolResult {
1089 success: false,
1090 output: format!(
1091 "Cannot spawn subagent '{}': maximum concurrent subagent limit reached. \
1092 Try again when another subagent completes.",
1093 self.config.name
1094 ),
1095 data: None,
1096 documents: Vec::new(),
1097 duration_ms: Some(0),
1098 });
1099 }
1100 }
1101 } else {
1102 None
1103 };
1104
1105 let subagent_id = format!(
1107 "{}_{:x}",
1108 self.config.name,
1109 std::time::SystemTime::now()
1110 .duration_since(std::time::UNIX_EPOCH)
1111 .unwrap_or_default()
1112 .as_nanos()
1113 );
1114
1115 let cancel_token = ctx.cancel_token().unwrap_or_default();
1118
1119 let result = self
1120 .run_subagent(task, subagent_id, ctx, cancel_token)
1121 .await?;
1122
1123 Ok(ToolResult {
1124 success: result.success,
1125 output: result.final_response.clone(),
1126 data: Some(serde_json::to_value(&result).unwrap_or_default()),
1127 documents: Vec::new(),
1128 duration_ms: Some(result.duration_ms),
1129 })
1130 }
1131}
1132
1133#[cfg(test)]
1134mod tests {
1135 use super::*;
1136 use crate::authority::{EventAuthority, LocalEventAuthority};
1137 use crate::events::{AgentEvent, AgentEventEnvelope};
1138 use crate::llm::{ChatOutcome, ChatRequest, ChatResponse, ContentBlock, StopReason, Usage};
1139 use crate::stores::{EventStore, InMemoryEventStore, StoredTurnEvents};
1140 use anyhow::{Context, Result, bail};
1141 use async_trait::async_trait;
1142 use tokio::sync::Mutex;
1143
1144 #[derive(Clone)]
1145 struct TestProvider {
1146 responses: Arc<Mutex<Vec<ChatOutcome>>>,
1147 delay: Option<Duration>,
1148 }
1149
1150 impl TestProvider {
1151 fn new(responses: Vec<ChatOutcome>) -> Self {
1152 Self {
1153 responses: Arc::new(Mutex::new(responses)),
1154 delay: None,
1155 }
1156 }
1157
1158 fn with_delay(mut self, delay: Duration) -> Self {
1159 self.delay = Some(delay);
1160 self
1161 }
1162
1163 fn text_response(text: &str) -> ChatOutcome {
1164 ChatOutcome::Success(ChatResponse {
1165 id: "resp_text".to_string(),
1166 content: vec![ContentBlock::Text {
1167 text: text.to_string(),
1168 }],
1169 model: "test-model".to_string(),
1170 stop_reason: Some(StopReason::EndTurn),
1171 usage: Usage {
1172 input_tokens: 10,
1173 output_tokens: 20,
1174 cached_input_tokens: 0,
1175 cache_creation_input_tokens: 0,
1176 },
1177 })
1178 }
1179
1180 fn tool_use_response(tool_id: &str, tool_name: &str, input: Value) -> ChatOutcome {
1181 ChatOutcome::Success(ChatResponse {
1182 id: "resp_tool".to_string(),
1183 content: vec![ContentBlock::ToolUse {
1184 id: tool_id.to_string(),
1185 name: tool_name.to_string(),
1186 input,
1187 thought_signature: None,
1188 }],
1189 model: "test-model".to_string(),
1190 stop_reason: Some(StopReason::ToolUse),
1191 usage: Usage {
1192 input_tokens: 15,
1193 output_tokens: 25,
1194 cached_input_tokens: 0,
1195 cache_creation_input_tokens: 0,
1196 },
1197 })
1198 }
1199
1200 fn refusal_response(text: Option<&str>) -> ChatOutcome {
1201 let content = text.map_or_else(Vec::new, |text| {
1202 vec![ContentBlock::Text {
1203 text: text.to_string(),
1204 }]
1205 });
1206 ChatOutcome::Success(ChatResponse {
1207 id: "resp_refusal".to_string(),
1208 content,
1209 model: "test-model".to_string(),
1210 stop_reason: Some(StopReason::Refusal),
1211 usage: Usage {
1212 input_tokens: 12,
1213 output_tokens: 0,
1214 cached_input_tokens: 0,
1215 cache_creation_input_tokens: 0,
1216 },
1217 })
1218 }
1219 }
1220
1221 #[async_trait]
1222 impl LlmProvider for TestProvider {
1223 async fn chat(&self, _request: ChatRequest) -> Result<ChatOutcome> {
1224 if let Some(delay) = self.delay {
1225 tokio::time::sleep(delay).await;
1226 }
1227
1228 let mut responses = self.responses.lock().await;
1229 if responses.is_empty() {
1230 Ok(Self::text_response("default"))
1231 } else {
1232 Ok(responses.remove(0))
1233 }
1234 }
1235
1236 fn model(&self) -> &'static str {
1237 "test-model"
1238 }
1239
1240 fn provider(&self) -> &'static str {
1241 "mock"
1242 }
1243 }
1244
1245 struct TestEchoTool;
1246
1247 impl Tool<()> for TestEchoTool {
1248 type Name = DynamicToolName;
1249
1250 fn name(&self) -> DynamicToolName {
1251 DynamicToolName::new("echo")
1252 }
1253
1254 fn display_name(&self) -> &'static str {
1255 "Echo"
1256 }
1257
1258 fn description(&self) -> &'static str {
1259 "Echo the input"
1260 }
1261
1262 fn input_schema(&self) -> Value {
1263 json!({
1264 "type": "object",
1265 "properties": {
1266 "message": { "type": "string" }
1267 },
1268 "required": ["message"]
1269 })
1270 }
1271
1272 fn tier(&self) -> ToolTier {
1273 ToolTier::Observe
1274 }
1275
1276 async fn execute(&self, _ctx: &ToolContext<()>, input: Value) -> Result<ToolResult> {
1277 let message = input
1278 .get("message")
1279 .and_then(Value::as_str)
1280 .context("missing echo message")?;
1281 Ok(ToolResult::success(format!("Echo: {message}")))
1282 }
1283 }
1284
    /// Event store for tests that remembers every appended envelope while
    /// delegating all operations to an in-memory store.
    #[derive(Clone, Default)]
    struct RecordingEventStore {
        /// Backing store that actually holds the events.
        inner: Arc<InMemoryEventStore>,
        /// Every `(thread, turn, envelope)` passed to `append`, in call order.
        appended: Arc<Mutex<Vec<(ThreadId, usize, AgentEventEnvelope)>>>,
    }

    impl RecordingEventStore {
        /// Returns a snapshot of everything appended so far.
        async fn appended_events(&self) -> Vec<(ThreadId, usize, AgentEventEnvelope)> {
            self.appended.lock().await.clone()
        }
    }

    #[async_trait]
    impl EventStore for RecordingEventStore {
        /// Records the envelope, then forwards it to the backing store.
        async fn append(
            &self,
            thread_id: &ThreadId,
            turn: usize,
            envelope: AgentEventEnvelope,
        ) -> Result<()> {
            self.appended
                .lock()
                .await
                .push((thread_id.clone(), turn, envelope.clone()));
            self.inner.append(thread_id, turn, envelope).await
        }

        // The remaining methods are plain delegation to the backing store.
        async fn finish_turn(&self, thread_id: &ThreadId, turn: usize) -> Result<()> {
            self.inner.finish_turn(thread_id, turn).await
        }

        async fn get_turn(
            &self,
            thread_id: &ThreadId,
            turn: usize,
        ) -> Result<Option<StoredTurnEvents>> {
            self.inner.get_turn(thread_id, turn).await
        }

        async fn get_turns(&self, thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
            self.inner.get_turns(thread_id).await
        }

        async fn clear(&self, thread_id: &ThreadId) -> Result<()> {
            self.inner.clear(thread_id).await
        }
    }
1332
1333 #[derive(Clone, Default)]
1334 struct AlwaysFailAppendEventStore;
1335
1336 #[async_trait]
1337 impl EventStore for AlwaysFailAppendEventStore {
1338 async fn append(
1339 &self,
1340 _thread_id: &ThreadId,
1341 _turn: usize,
1342 _envelope: AgentEventEnvelope,
1343 ) -> Result<()> {
1344 bail!("append failed")
1345 }
1346
1347 async fn finish_turn(&self, _thread_id: &ThreadId, _turn: usize) -> Result<()> {
1348 Ok(())
1349 }
1350
1351 async fn get_turn(
1352 &self,
1353 _thread_id: &ThreadId,
1354 _turn: usize,
1355 ) -> Result<Option<StoredTurnEvents>> {
1356 Ok(None)
1357 }
1358
1359 async fn get_turns(&self, _thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
1360 Ok(Vec::new())
1361 }
1362
1363 async fn clear(&self, _thread_id: &ThreadId) -> Result<()> {
1364 Ok(())
1365 }
1366 }
1367
1368 #[derive(Clone, Default)]
1369 struct NoReadAfterFailureEventStore {
1370 inner: Arc<InMemoryEventStore>,
1371 }
1372
1373 #[async_trait]
1374 impl EventStore for NoReadAfterFailureEventStore {
1375 async fn append(
1376 &self,
1377 thread_id: &ThreadId,
1378 turn: usize,
1379 envelope: AgentEventEnvelope,
1380 ) -> Result<()> {
1381 self.inner.append(thread_id, turn, envelope).await
1382 }
1383
1384 async fn finish_turn(&self, thread_id: &ThreadId, turn: usize) -> Result<()> {
1385 self.inner.finish_turn(thread_id, turn).await
1386 }
1387
1388 async fn get_turn(
1389 &self,
1390 thread_id: &ThreadId,
1391 turn: usize,
1392 ) -> Result<Option<StoredTurnEvents>> {
1393 self.inner.get_turn(thread_id, turn).await
1394 }
1395
1396 async fn get_turns(&self, _thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
1397 bail!("get_events should not be called after subagent failure")
1398 }
1399
1400 async fn clear(&self, thread_id: &ThreadId) -> Result<()> {
1401 self.inner.clear(thread_id).await
1402 }
1403 }
1404
1405 #[derive(Clone, Default)]
1406 struct PanicProvider;
1407
1408 #[async_trait]
1409 impl LlmProvider for PanicProvider {
1410 async fn chat(&self, _request: ChatRequest) -> Result<ChatOutcome> {
1411 panic!("panic provider should disconnect subagent");
1417 }
1418
1419 fn model(&self) -> &'static str {
1420 "panic-model"
1421 }
1422
1423 fn provider(&self) -> &'static str {
1424 "panic"
1425 }
1426 }
1427
/// The `with_*` builder methods must store the values they are given.
#[test]
fn test_subagent_config_builder() {
    let SubagentConfig {
        name,
        system_prompt,
        max_turns,
        timeout_ms,
        ..
    } = SubagentConfig::new("test")
        .with_system_prompt("Test prompt")
        .with_max_turns(5)
        .with_timeout_ms(30_000);

    assert_eq!(name, "test");
    assert_eq!(system_prompt, "Test prompt");
    assert_eq!(max_turns, Some(5));
    assert_eq!(timeout_ms, Some(30_000));
}
1440
/// A config created with only a name has an empty prompt and no limits.
#[test]
fn test_subagent_config_defaults() {
    let SubagentConfig {
        name,
        system_prompt,
        max_turns,
        timeout_ms,
        ..
    } = SubagentConfig::new("default");

    assert_eq!(name, "default");
    assert!(system_prompt.is_empty());
    assert_eq!(max_turns, None);
    assert_eq!(timeout_ms, None);
}
1450
/// Serializes a fully populated `SubagentResult` to a JSON string and checks
/// individual fields of the parsed document.
///
/// The assertions run against the parsed JSON rather than raw substrings: a
/// probe such as `contains("test")` is already satisfied by the
/// `/tmp/test.rs` log context and would not notice a missing `name` field.
#[test]
fn test_subagent_result_serialization() -> Result<()> {
    let result = SubagentResult {
        name: "test".to_string(),
        final_response: "Done".to_string(),
        total_turns: 3,
        tool_count: 5,
        tool_logs: vec![
            ToolCallLog {
                name: "read".to_string(),
                display_name: "Read file".to_string(),
                context: "/tmp/test.rs".to_string(),
                result: "50 lines".to_string(),
                success: true,
                duration_ms: Some(10),
            },
            ToolCallLog {
                name: "grep".to_string(),
                display_name: "Grep TODO".to_string(),
                context: "TODO".to_string(),
                result: "3 matches".to_string(),
                success: true,
                duration_ms: Some(5),
            },
        ],
        usage: TokenUsage::default(),
        success: true,
        duration_ms: 1000,
        error_details: None,
        failed_tool: None,
    };

    let json = serde_json::to_string(&result).context("failed to serialize subagent result")?;
    // Round-trip through `Value` so each field is checked under its own key.
    let parsed: Value =
        serde_json::from_str(&json).context("serialized subagent result is not valid json")?;

    assert_eq!(parsed.get("name").and_then(Value::as_str), Some("test"));
    assert_eq!(
        parsed.get("final_response").and_then(Value::as_str),
        Some("Done")
    );
    assert_eq!(parsed.get("tool_count").and_then(Value::as_u64), Some(5));

    let logs = parsed
        .get("tool_logs")
        .and_then(Value::as_array)
        .context("missing tool_logs array")?;
    assert_eq!(logs.len(), 2);
    let first_log = logs.first().context("missing first tool log")?;
    assert_eq!(
        first_log.get("context").and_then(Value::as_str),
        Some("/tmp/test.rs")
    );

    Ok(())
}
1492
/// Converts a `SubagentResult` to a JSON value and reads back the tool count,
/// token usage and the single tool log entry.
#[test]
fn test_subagent_result_field_extraction() -> Result<()> {
    let glob_log = ToolCallLog {
        name: "glob".to_string(),
        display_name: "Glob config files".to_string(),
        context: "**/*.toml".to_string(),
        result: "3 files".to_string(),
        success: true,
        duration_ms: Some(15),
    };
    let token_usage = TokenUsage {
        input_tokens: 1500,
        output_tokens: 500,
        ..Default::default()
    };
    let result = SubagentResult {
        name: "explore".to_string(),
        final_response: "Found 3 config files".to_string(),
        total_turns: 2,
        tool_count: 5,
        tool_logs: vec![glob_log],
        usage: token_usage,
        success: true,
        duration_ms: 2500,
        error_details: None,
        failed_tool: None,
    };

    let encoded =
        serde_json::to_value(&result).context("failed to convert subagent result to json")?;

    assert_eq!(encoded.get("tool_count").and_then(Value::as_u64), Some(5));

    // Token counters live under the nested `usage` object.
    let usage = encoded.get("usage").context("missing usage field")?;
    assert_eq!(
        usage.get("input_tokens").and_then(Value::as_u64),
        Some(1500)
    );
    assert_eq!(
        usage.get("output_tokens").and_then(Value::as_u64),
        Some(500)
    );

    let logs = encoded
        .get("tool_logs")
        .and_then(Value::as_array)
        .context("missing tool_logs array")?;
    assert_eq!(logs.len(), 1);

    let entry = &logs[0];
    let expected_text = [
        ("name", "glob"),
        ("context", "**/*.toml"),
        ("result", "3 files"),
    ];
    for (key, expected) in expected_text {
        assert_eq!(entry.get(key).and_then(Value::as_str), Some(expected));
    }
    assert_eq!(entry.get("success").and_then(Value::as_bool), Some(true));

    Ok(())
}
1554
1555 #[tokio::test]
1556 async fn test_run_subagent_uses_isolated_child_thread() -> Result<()> {
1557 let event_store = Arc::new(RecordingEventStore::default());
1558 let provider = Arc::new(TestProvider::new(vec![
1559 TestProvider::tool_use_response("tool_1", "echo", json!({ "message": "child" })),
1560 TestProvider::text_response("Subagent complete"),
1561 ]));
1562 let mut tools = ToolRegistry::new();
1563 tools.register(TestEchoTool);
1564
1565 let tool = SubagentTool::new(SubagentConfig::new("worker"), provider, Arc::new(tools), {
1566 let store = Arc::clone(&event_store);
1567 move || -> Arc<dyn EventStore> { store.clone() }
1568 });
1569 let parent_thread = ThreadId::new();
1570 let parent_ctx = ToolContext::new(()).with_event_store(
1571 event_store.clone(),
1572 parent_thread.clone(),
1573 1,
1574 Arc::new(LocalEventAuthority::new()),
1575 );
1576
1577 let result = tool
1578 .run_subagent(
1579 "Inspect the repo",
1580 "subagent_1".to_string(),
1581 &parent_ctx,
1582 CancellationToken::new(),
1583 )
1584 .await?;
1585
1586 assert!(result.success);
1587 assert_eq!(result.tool_count, 1);
1588 assert_eq!(result.tool_logs.len(), 1);
1589
1590 let parent_turn = event_store
1591 .get_turn(&parent_thread, 1)
1592 .await?
1593 .context("missing parent turn")?;
1594 assert!(!parent_turn.events.is_empty());
1595 assert!(
1596 parent_turn
1597 .events
1598 .iter()
1599 .all(|envelope| { matches!(envelope.event, AgentEvent::SubagentProgress { .. }) })
1600 );
1601
1602 let appended = event_store.appended_events().await;
1603 let child_thread = appended
1604 .iter()
1605 .map(|(thread_id, _, _)| thread_id.clone())
1606 .find(|thread_id| thread_id != &parent_thread)
1607 .context("missing child thread events")?;
1608 let child_turn = event_store
1609 .get_turn(&child_thread, 1)
1610 .await?
1611 .context("missing child turn")?;
1612 let child_events = event_store.get_events(&child_thread).await?;
1613
1614 assert!(
1615 child_turn
1616 .events
1617 .iter()
1618 .any(|envelope| { matches!(envelope.event, AgentEvent::ToolCallStart { .. }) })
1619 );
1620 assert!(
1621 child_events
1622 .iter()
1623 .any(|envelope| { matches!(envelope.event, AgentEvent::Done { .. }) })
1624 );
1625
1626 Ok(())
1627 }
1628
1629 #[tokio::test]
1630 async fn test_run_subagent_timeout_marks_result_as_failed() -> Result<()> {
1631 let event_store = Arc::new(NoReadAfterFailureEventStore::default());
1632 let provider = Arc::new(
1633 TestProvider::new(vec![TestProvider::text_response("Too late")])
1634 .with_delay(Duration::from_millis(50)),
1635 );
1636 let tool = SubagentTool::new(
1637 SubagentConfig::new("worker").with_timeout_ms(10),
1638 provider,
1639 Arc::new(ToolRegistry::<()>::new()),
1640 {
1641 let store = Arc::clone(&event_store);
1642 move || -> Arc<dyn EventStore> { store.clone() }
1643 },
1644 );
1645
1646 let result = tool
1647 .run_subagent(
1648 "Take too long",
1649 "subagent_timeout".to_string(),
1650 &ToolContext::new(()),
1651 CancellationToken::new(),
1652 )
1653 .await?;
1654
1655 assert!(!result.success);
1656 assert_eq!(result.final_response, "Subagent timed out");
1657 assert!(
1658 result
1659 .error_details
1660 .context("missing timeout details")?
1661 .contains("timed out")
1662 );
1663
1664 Ok(())
1665 }
1666
1667 #[tokio::test]
1668 async fn test_run_subagent_progress_failures_do_not_abort_successful_runs() -> Result<()> {
1669 let provider = Arc::new(TestProvider::new(vec![
1670 TestProvider::tool_use_response("tool_1", "echo", json!({ "message": "child" })),
1671 TestProvider::text_response("Subagent complete"),
1672 ]));
1673 let mut tools = ToolRegistry::new();
1674 tools.register(TestEchoTool);
1675
1676 let tool = SubagentTool::new(SubagentConfig::new("worker"), provider, Arc::new(tools), {
1677 move || -> Arc<dyn EventStore> { Arc::new(InMemoryEventStore::new()) }
1678 });
1679 let parent_ctx = ToolContext::new(()).with_event_store(
1680 Arc::new(AlwaysFailAppendEventStore),
1681 ThreadId::new(),
1682 1,
1683 Arc::new(LocalEventAuthority::new()),
1684 );
1685
1686 let result = tool
1687 .run_subagent(
1688 "Inspect the repo",
1689 "subagent_progress".to_string(),
1690 &parent_ctx,
1691 CancellationToken::new(),
1692 )
1693 .await?;
1694
1695 assert!(result.success);
1696 assert_eq!(result.final_response, "Subagent complete");
1697 assert_eq!(result.tool_count, 1);
1698
1699 Ok(())
1700 }
1701
1702 #[tokio::test]
1703 async fn test_run_subagent_panic_classified_as_error_not_disconnected() -> Result<()> {
1704 let tool = SubagentTool::new(
1713 SubagentConfig::new("worker"),
1714 Arc::new(PanicProvider),
1715 Arc::new(ToolRegistry::<()>::new()),
1716 move || -> Arc<dyn EventStore> { Arc::new(InMemoryEventStore::new()) },
1717 );
1718
1719 let result = tool
1720 .run_subagent(
1721 "Crash",
1722 "subagent_panic".to_string(),
1723 &ToolContext::new(()),
1724 CancellationToken::new(),
1725 )
1726 .await?;
1727
1728 assert!(!result.success);
1729 assert_ne!(result.final_response, "Subagent ended unexpectedly");
1732 let details = result
1733 .error_details
1734 .context("panicking subagent must carry structured error details")?;
1735 assert!(
1736 !details.contains("ended before returning a final state"),
1737 "panic must not be classified as Disconnected; got {details:?}",
1738 );
1739 assert!(
1740 details.contains("panicked"),
1741 "structured error should reflect the panic; got {details:?}",
1742 );
1743 assert!(
1744 details.contains("panic provider should disconnect subagent"),
1745 "structured error should carry the original panic message; got {details:?}",
1746 );
1747
1748 Ok(())
1749 }
1750
1751 #[tokio::test]
1752 async fn test_run_subagent_refusal_marks_result_as_failed() -> Result<()> {
1753 let tool = SubagentTool::new(
1754 SubagentConfig::new("worker"),
1755 Arc::new(TestProvider::new(vec![TestProvider::refusal_response(
1756 Some("Refused for policy reasons"),
1757 )])),
1758 Arc::new(ToolRegistry::<()>::new()),
1759 || Arc::new(InMemoryEventStore::new()),
1760 );
1761
1762 let result = tool
1763 .run_subagent(
1764 "Refuse",
1765 "subagent_refusal".to_string(),
1766 &ToolContext::new(()),
1767 CancellationToken::new(),
1768 )
1769 .await?;
1770
1771 assert!(!result.success);
1772 assert_eq!(result.final_response, "Refused for policy reasons");
1773 assert_eq!(
1774 result.error_details.as_deref(),
1775 Some("Refused for policy reasons")
1776 );
1777
1778 Ok(())
1779 }
1780
1781 #[tokio::test]
1782 async fn test_run_subagent_cancelled_marks_result_as_failed() -> Result<()> {
1783 let tool = SubagentTool::new(
1784 SubagentConfig::new("worker"),
1785 Arc::new(
1786 TestProvider::new(vec![TestProvider::text_response("Too late")])
1787 .with_delay(Duration::from_millis(50)),
1788 ),
1789 Arc::new(ToolRegistry::<()>::new()),
1790 || Arc::new(InMemoryEventStore::new()),
1791 );
1792 let cancel_token = CancellationToken::new();
1793 cancel_token.cancel();
1794
1795 let result = tool
1796 .run_subagent(
1797 "Cancel",
1798 "subagent_cancelled".to_string(),
1799 &ToolContext::new(()),
1800 cancel_token,
1801 )
1802 .await?;
1803
1804 assert!(!result.success);
1805 assert_eq!(result.final_response, "Subagent cancelled");
1806 assert!(
1807 result
1808 .error_details
1809 .context("missing cancellation details")?
1810 .contains("cancelled")
1811 );
1812
1813 Ok(())
1814 }
1815
1816 #[tokio::test]
1817 async fn test_run_subagent_llm_error_does_not_infer_failed_tool() -> Result<()> {
1818 let provider = Arc::new(TestProvider::new(vec![
1819 ChatOutcome::ServerError("llm transport failed".to_string()),
1820 ChatOutcome::ServerError("llm transport failed".to_string()),
1821 ChatOutcome::ServerError("llm transport failed".to_string()),
1822 ChatOutcome::ServerError("llm transport failed".to_string()),
1823 ChatOutcome::ServerError("llm transport failed".to_string()),
1824 ChatOutcome::ServerError("llm transport failed".to_string()),
1825 ]));
1826 let mut tools = ToolRegistry::new();
1827 tools.register(TestEchoTool);
1828
1829 let tool = SubagentTool::new(
1830 SubagentConfig::new("worker"),
1831 provider,
1832 Arc::new(tools),
1833 || Arc::new(InMemoryEventStore::new()),
1834 );
1835
1836 let result = tool
1837 .run_subagent(
1838 "Trigger an llm failure",
1839 "subagent_llm_error".to_string(),
1840 &ToolContext::new(()),
1841 CancellationToken::new(),
1842 )
1843 .await?;
1844
1845 assert!(!result.success);
1846 assert!(result.failed_tool.is_none());
1847 assert!(
1848 result
1849 .error_details
1850 .as_deref()
1851 .unwrap_or_default()
1852 .contains("Server error")
1853 );
1854
1855 Ok(())
1856 }
1857
1858 #[tokio::test]
1859 async fn test_replay_subagent_events_stops_after_error() -> Result<()> {
1860 let event_store: Arc<dyn EventStore> = Arc::new(InMemoryEventStore::new());
1861 let thread_id = ThreadId::new();
1862 let authority = LocalEventAuthority::new();
1863 event_store
1864 .append(
1865 &thread_id,
1866 1,
1867 authority.wrap(AgentEvent::Error {
1868 message: "subagent boom".to_string(),
1869 recoverable: false,
1870 }),
1871 )
1872 .await?;
1873 event_store
1874 .append(
1875 &thread_id,
1876 1,
1877 authority.wrap(AgentEvent::Text {
1878 message_id: "msg_after_error".to_string(),
1879 text: "should not be appended".to_string(),
1880 }),
1881 )
1882 .await?;
1883
1884 let mut state = SubagentExecutionState::new();
1885 replay_subagent_events(
1886 &event_store,
1887 &thread_id,
1888 &ToolContext::new(()),
1889 &SubagentConfig::new("worker"),
1890 "subagent_error",
1891 &mut state,
1892 )
1893 .await?;
1894
1895 assert!(!state.success);
1896 assert_eq!(state.final_response, "subagent boom");
1897 assert_eq!(state.error_details.as_deref(), Some("subagent boom"));
1898
1899 Ok(())
1900 }
1901
1902 #[tokio::test]
1903 async fn test_replay_keeps_only_final_message_text() -> Result<()> {
1904 let event_store: Arc<dyn EventStore> = Arc::new(InMemoryEventStore::new());
1905 let thread_id = ThreadId::new();
1906 let authority = LocalEventAuthority::new();
1907
1908 event_store
1910 .append(
1911 &thread_id,
1912 1,
1913 authority.wrap(AgentEvent::Text {
1914 message_id: "m1".to_string(),
1915 text: "Let me check the repo...".to_string(),
1916 }),
1917 )
1918 .await?;
1919 event_store
1921 .append(
1922 &thread_id,
1923 2,
1924 authority.wrap(AgentEvent::Text {
1925 message_id: "m2".to_string(),
1926 text: "Final answer ".to_string(),
1927 }),
1928 )
1929 .await?;
1930 event_store
1931 .append(
1932 &thread_id,
1933 2,
1934 authority.wrap(AgentEvent::Text {
1935 message_id: "m2".to_string(),
1936 text: "part two".to_string(),
1937 }),
1938 )
1939 .await?;
1940 event_store
1941 .append(
1942 &thread_id,
1943 2,
1944 authority.wrap(AgentEvent::done(
1945 thread_id.clone(),
1946 2,
1947 TokenUsage::default(),
1948 Duration::from_millis(1),
1949 )),
1950 )
1951 .await?;
1952
1953 let mut state = SubagentExecutionState::new();
1954 replay_subagent_events(
1955 &event_store,
1956 &thread_id,
1957 &ToolContext::new(()),
1958 &SubagentConfig::new("worker"),
1959 "subagent_final",
1960 &mut state,
1961 )
1962 .await?;
1963
1964 assert_eq!(state.final_response, "Final answer part two");
1967
1968 Ok(())
1969 }
1970
/// With a parent at depth 2 and max depth 5, the child context must report
/// depth 3, keep max depth 5, and expose a subagent semaphore.
#[test]
fn build_subagent_child_context_increments_depth_and_propagates_semaphore() {
    let semaphore = Arc::new(tokio::sync::Semaphore::new(2));
    let parent = ToolContext::new(())
        .with_metadata(METADATA_SUBAGENT_DEPTH, json!(2))
        .with_metadata(METADATA_MAX_SUBAGENT_DEPTH, json!(5))
        .with_subagent_semaphore(Arc::clone(&semaphore));

    let child = build_subagent_child_context(&parent);

    let child_depth = child
        .metadata
        .get(METADATA_SUBAGENT_DEPTH)
        .and_then(Value::as_u64);
    assert_eq!(child_depth, Some(3));

    let child_max_depth = child
        .metadata
        .get(METADATA_MAX_SUBAGENT_DEPTH)
        .and_then(Value::as_u64);
    assert_eq!(child_max_depth, Some(5));

    assert!(child.subagent_semaphore().is_some());
}
1999
/// Two lookups for the same name must return the very same string slices,
/// and a different name must return a different display string.
#[test]
fn cached_tool_strings_are_interned_per_name() {
    let (first_display, first_description) = cached_tool_strings("worker");
    let (second_display, second_description) = cached_tool_strings("worker");

    // Pointer equality, not just content equality.
    assert!(std::ptr::eq(first_display, second_display));
    assert!(std::ptr::eq(first_description, second_description));
    assert_eq!(first_display, "Subagent: worker");

    let (other_display, _) = cached_tool_strings("a-different-name");
    assert!(!std::ptr::eq(first_display, other_display));
}
2013
2014 #[tokio::test]
2015 async fn test_nested_subagent_depth_limit_propagates() -> Result<()> {
2016 use crate::hooks::AllowAllHooks;
2017
2018 let inner = SubagentTool::new(
2020 SubagentConfig::new("inner"),
2021 Arc::new(TestProvider::new(vec![TestProvider::text_response(
2022 "inner ran (should not happen)",
2023 )])),
2024 Arc::new(ToolRegistry::<()>::new()),
2025 || -> Arc<dyn EventStore> { Arc::new(InMemoryEventStore::new()) },
2026 );
2027 let mut middle_tools = ToolRegistry::new();
2028 middle_tools.register(inner);
2029
2030 let outer = SubagentTool::new(
2034 SubagentConfig::new("outer"),
2035 Arc::new(TestProvider::new(vec![
2036 TestProvider::tool_use_response(
2037 "call_inner",
2038 "subagent_inner",
2039 json!({ "task": "go deeper" }),
2040 ),
2041 TestProvider::text_response("outer done"),
2042 ])),
2043 Arc::new(middle_tools),
2044 || -> Arc<dyn EventStore> { Arc::new(InMemoryEventStore::new()) },
2045 )
2046 .with_hooks(Arc::new(AllowAllHooks));
2047
2048 let parent_ctx = ToolContext::new(()).with_metadata(METADATA_MAX_SUBAGENT_DEPTH, json!(1));
2051
2052 let result = outer
2053 .execute(&parent_ctx, json!({ "task": "start" }))
2054 .await?;
2055
2056 assert!(result.success, "outer subagent should still complete");
2057 assert_eq!(result.output, "outer done");
2058
2059 let subresult: SubagentResult =
2060 serde_json::from_value(result.data.context("missing subagent result data")?)
2061 .context("subagent result should deserialize")?;
2062 let nested = subresult
2063 .tool_logs
2064 .iter()
2065 .find(|log| log.name == "subagent_inner")
2066 .context("missing nested subagent tool log")?;
2067 assert!(!nested.success, "nested spawn must be rejected");
2068 assert!(
2069 nested.result.contains("depth limit"),
2070 "nested failure should be the depth-limit error; got: {}",
2071 nested.result
2072 );
2073
2074 Ok(())
2075 }
2076}