1mod factory;
38
39pub use factory::SubagentFactory;
40
41use crate::events::AgentEvent;
42use crate::hooks::{AgentHooks, DefaultHooks};
43use crate::llm::LlmProvider;
44use crate::stores::{EventStore, InMemoryStore, MessageStore, StateStore};
45use crate::tools::{DynamicToolName, Tool, ToolContext, ToolRegistry};
46use crate::types::{AgentConfig, AgentInput, ThreadId, TokenUsage, ToolResult, ToolTier};
47use anyhow::{Context, Result, bail};
48use serde::{Deserialize, Serialize};
49use serde_json::{Value, json};
50use std::collections::HashMap;
51use std::sync::Arc;
52use std::time::{Duration, Instant};
53use tokio_util::sync::CancellationToken;
54
/// Tool-context metadata key holding how many subagent levels deep the current call runs.
///
/// A missing value is treated as top level, i.e. depth `0`.
pub const METADATA_SUBAGENT_DEPTH: &str = "subagent_depth";

/// Tool-context metadata key holding the maximum permitted subagent nesting depth.
///
/// A missing value makes [`SubagentTool`] fall back to a limit of `3`.
pub const METADATA_MAX_SUBAGENT_DEPTH: &str = "max_subagent_depth";

66#[derive(Clone, Debug, Serialize, Deserialize)]
68pub struct SubagentConfig {
69 pub name: String,
71 pub nickname: Option<String>,
73 pub system_prompt: String,
75 pub max_turns: Option<usize>,
77 pub timeout_ms: Option<u64>,
79 #[serde(default, skip_serializing_if = "Option::is_none")]
81 pub model: Option<String>,
82}
83
84impl SubagentConfig {
85 #[must_use]
87 pub fn new(name: impl Into<String>) -> Self {
88 Self {
89 name: name.into(),
90 nickname: None,
91 system_prompt: String::new(),
92 max_turns: None,
93 timeout_ms: None,
94 model: None,
95 }
96 }
97
98 #[must_use]
100 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
101 self.system_prompt = prompt.into();
102 self
103 }
104
105 #[must_use]
107 pub const fn with_max_turns(mut self, max: usize) -> Self {
108 self.max_turns = Some(max);
109 self
110 }
111
112 #[must_use]
114 pub const fn with_timeout_ms(mut self, timeout: u64) -> Self {
115 self.timeout_ms = Some(timeout);
116 self
117 }
118
119 #[must_use]
121 pub fn with_model(mut self, model: impl Into<String>) -> Self {
122 self.model = Some(model.into());
123 self
124 }
125
126 #[must_use]
128 pub fn with_nickname(mut self, nickname: impl Into<String>) -> Self {
129 self.nickname = Some(nickname.into());
130 self
131 }
132}
133
134#[derive(Clone, Debug, Serialize, Deserialize)]
136pub struct ToolCallLog {
137 pub name: String,
139 pub display_name: String,
141 pub context: String,
143 pub result: String,
145 pub success: bool,
147 pub duration_ms: Option<u64>,
149}
150
151#[derive(Clone, Debug, Serialize, Deserialize)]
153pub struct SubagentResult {
154 pub name: String,
156 pub final_response: String,
158 pub total_turns: usize,
160 pub tool_count: u32,
162 pub tool_logs: Vec<ToolCallLog>,
164 pub usage: TokenUsage,
166 pub success: bool,
168 pub duration_ms: u64,
170 #[serde(default, skip_serializing_if = "Option::is_none")]
175 pub error_details: Option<String>,
176 #[serde(default, skip_serializing_if = "Option::is_none")]
183 pub failed_tool: Option<String>,
184}
185
186pub struct SubagentTool<P, H = DefaultHooks, M = InMemoryStore, S = InMemoryStore>
204where
205 P: LlmProvider,
206 H: AgentHooks,
207 M: MessageStore,
208 S: StateStore,
209{
210 config: SubagentConfig,
211 provider: Arc<P>,
212 tools: Arc<ToolRegistry<()>>,
213 hooks: Arc<H>,
214 message_store_factory: Arc<dyn Fn() -> M + Send + Sync>,
215 state_store_factory: Arc<dyn Fn() -> S + Send + Sync>,
216 event_store_factory: Arc<dyn Fn() -> Arc<dyn EventStore> + Send + Sync>,
217 cached_display_name: &'static str,
219 cached_description: &'static str,
221}
222
223impl<P> SubagentTool<P, DefaultHooks, InMemoryStore, InMemoryStore>
224where
225 P: LlmProvider + 'static,
226{
227 #[must_use]
229 pub fn new<EF>(
230 config: SubagentConfig,
231 provider: Arc<P>,
232 tools: Arc<ToolRegistry<()>>,
233 event_store_factory: EF,
234 ) -> Self
235 where
236 EF: Fn() -> Arc<dyn EventStore> + Send + Sync + 'static,
237 {
238 let cached_display_name = Box::leak(format!("Subagent: {}", config.name).into_boxed_str());
240 let cached_description = Box::leak(
241 format!(
242 "Spawn a subagent named '{}' to handle a task. The subagent will work independently and return only its final response.",
243 config.name
244 )
245 .into_boxed_str(),
246 );
247 Self {
248 config,
249 provider,
250 tools,
251 hooks: Arc::new(DefaultHooks),
252 message_store_factory: Arc::new(InMemoryStore::new),
253 state_store_factory: Arc::new(InMemoryStore::new),
254 event_store_factory: Arc::new(event_store_factory),
255 cached_display_name,
256 cached_description,
257 }
258 }
259}
260
261impl<P, H, M, S> SubagentTool<P, H, M, S>
262where
263 P: LlmProvider + Clone + 'static,
264 H: AgentHooks + Clone + 'static,
265 M: MessageStore + 'static,
266 S: StateStore + 'static,
267{
268 #[must_use]
270 pub fn with_hooks<H2: AgentHooks + Clone + 'static>(
271 self,
272 hooks: Arc<H2>,
273 ) -> SubagentTool<P, H2, M, S> {
274 SubagentTool {
275 config: self.config,
276 provider: self.provider,
277 tools: self.tools,
278 hooks,
279 message_store_factory: self.message_store_factory,
280 state_store_factory: self.state_store_factory,
281 event_store_factory: self.event_store_factory,
282 cached_display_name: self.cached_display_name,
283 cached_description: self.cached_description,
284 }
285 }
286
287 #[must_use]
289 pub fn with_stores<M2, S2, MF, SF>(
290 self,
291 message_factory: MF,
292 state_factory: SF,
293 ) -> SubagentTool<P, H, M2, S2>
294 where
295 M2: MessageStore + 'static,
296 S2: StateStore + 'static,
297 MF: Fn() -> M2 + Send + Sync + 'static,
298 SF: Fn() -> S2 + Send + Sync + 'static,
299 {
300 SubagentTool {
301 config: self.config,
302 provider: self.provider,
303 tools: self.tools,
304 hooks: self.hooks,
305 message_store_factory: Arc::new(message_factory),
306 state_store_factory: Arc::new(state_factory),
307 event_store_factory: self.event_store_factory,
308 cached_display_name: self.cached_display_name,
309 cached_description: self.cached_description,
310 }
311 }
312
313 #[must_use]
315 pub const fn config(&self) -> &SubagentConfig {
316 &self.config
317 }
318
319 async fn run_subagent<Ctx>(
324 &self,
325 task: &str,
326 subagent_id: String,
327 parent_ctx: &ToolContext<Ctx>,
328 parent_cancel: CancellationToken,
329 ) -> Result<SubagentResult>
330 where
331 Ctx: Send + Sync + 'static,
332 {
333 use crate::agent_loop::AgentLoop;
334
335 let start = Instant::now();
336 let thread_id = ThreadId::new();
339
340 let message_store = (self.message_store_factory)();
342 let state_store = (self.state_store_factory)();
343 let event_store = (self.event_store_factory)();
344
345 let agent_config = AgentConfig {
347 max_turns: Some(self.config.max_turns.unwrap_or(100)),
348 system_prompt: self.config.system_prompt.clone(),
349 ..Default::default()
350 };
351
352 let agent = AgentLoop::new(
354 (*self.provider).clone(),
355 (*self.tools).clone(),
356 (*self.hooks).clone(),
357 message_store,
358 state_store,
359 Arc::clone(&event_store),
360 agent_config,
361 );
362
363 let tool_ctx = ToolContext::new(());
365
366 let cancel_token = parent_cancel.child_token();
370 let timeout_cancel = cancel_token.clone();
371 let (state_rx, task_handle) = agent.run_abortable(
372 thread_id.clone(),
373 AgentInput::Text(task.to_string()),
374 tool_ctx,
375 cancel_token,
376 );
377
378 let wait_result = wait_for_subagent_state(self.config.timeout_ms, start, state_rx).await;
379 let mut state = SubagentExecutionState::new();
380 let replay_events = apply_subagent_wait_outcome(
381 classify_subagent_wait_result(wait_result.as_ref()),
382 &self.config,
383 &timeout_cancel,
384 &task_handle,
385 &mut state,
386 );
387
388 if replay_events {
389 replay_subagent_events(
390 &event_store,
391 &thread_id,
392 parent_ctx,
393 &self.config,
394 &subagent_id,
395 &mut state,
396 )
397 .await?;
398 }
399
400 let result = state.into_result(self.config.name.clone(), start);
401 emit_subagent_observability(self, &result);
402 Ok(result)
403 }
404}
405
406fn mark_subagent_timeout(
407 config: &SubagentConfig,
408 final_response: &mut String,
409 error_details: &mut Option<String>,
410 success: &mut bool,
411) {
412 *final_response = "Subagent timed out".to_string();
413 *error_details = Some(format!(
414 "Subagent '{}' timed out after {}ms",
415 config.name,
416 config.timeout_ms.unwrap_or(0)
417 ));
418 *success = false;
419}
420
421fn mark_subagent_disconnected(
422 config: &SubagentConfig,
423 final_response: &mut String,
424 error_details: &mut Option<String>,
425 success: &mut bool,
426) {
427 *final_response = "Subagent ended unexpectedly".to_string();
428 *error_details = Some(format!(
429 "Subagent '{}' ended before returning a final state",
430 config.name
431 ));
432 *success = false;
433}
434
435fn mark_subagent_cancelled(
436 config: &SubagentConfig,
437 final_response: &mut String,
438 error_details: &mut Option<String>,
439 success: &mut bool,
440) {
441 *final_response = "Subagent cancelled".to_string();
442 *error_details = Some(format!("Subagent '{}' was cancelled", config.name));
443 *success = false;
444}
445
446fn mark_subagent_awaiting_confirmation(
447 config: &SubagentConfig,
448 final_response: &mut String,
449 error_details: &mut Option<String>,
450 success: &mut bool,
451) {
452 *final_response = "Subagent requires confirmation".to_string();
453 *error_details = Some(format!(
454 "Subagent '{}' requested confirmation, which is not supported in nested runs",
455 config.name
456 ));
457 *success = false;
458}
459
/// Records a run that ended with an agent error, using `message` as both response and details.
fn mark_subagent_agent_error(
    final_response: &mut String,
    error_details: &mut Option<String>,
    success: &mut bool,
    message: &str,
) {
    *success = false;
    *error_details = Some(String::from(message));
    *final_response = String::from(message);
}

471type SubagentWaitResult = Result<
472 Result<crate::types::AgentRunState, tokio::sync::oneshot::error::RecvError>,
473 tokio::time::error::Elapsed,
474>;
475
476struct SubagentExecutionState {
477 final_response: String,
478 total_turns: usize,
479 tool_count: u32,
480 tool_logs: Vec<ToolCallLog>,
481 pending_tools: HashMap<String, (String, String)>,
482 total_usage: TokenUsage,
483 success: bool,
484 error_details: Option<String>,
485 failed_tool: Option<String>,
486}
487
488impl SubagentExecutionState {
489 fn new() -> Self {
490 Self {
491 final_response: String::new(),
492 total_turns: 0,
493 tool_count: 0,
494 tool_logs: Vec::new(),
495 pending_tools: HashMap::new(),
496 total_usage: TokenUsage::default(),
497 success: true,
498 error_details: None,
499 failed_tool: None,
500 }
501 }
502
503 fn into_result(self, name: String, start: Instant) -> SubagentResult {
504 SubagentResult {
505 name,
506 final_response: self.final_response,
507 total_turns: self.total_turns,
508 tool_count: self.tool_count,
509 tool_logs: self.tool_logs,
510 usage: self.total_usage,
511 success: self.success,
512 duration_ms: u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
513 error_details: self.error_details,
514 failed_tool: self.failed_tool,
515 }
516 }
517}
518
519fn subagent_total_tokens(total_usage: &TokenUsage) -> u64 {
520 u64::from(total_usage.input_tokens) + u64::from(total_usage.output_tokens)
521}
522
523struct SubagentProgressUpdate<'a> {
524 subagent_id: &'a str,
525 total_turns: usize,
526 total_usage: &'a TokenUsage,
527 tool_name: String,
528 tool_context: String,
529 completed: bool,
530 success: bool,
531 tool_count: u32,
532}
533
534enum SubagentWaitOutcome {
535 ReplayEvents,
536 TimedOut,
537 Disconnected,
538 Cancelled,
539 AwaitingConfirmation,
540 Error(crate::types::AgentError),
541}
542
543async fn wait_for_subagent_state(
544 timeout_ms: Option<u64>,
545 start: Instant,
546 state_rx: tokio::sync::oneshot::Receiver<crate::types::AgentRunState>,
547) -> Option<SubagentWaitResult> {
548 let timeout_duration = timeout_ms.map(Duration::from_millis);
549 if timeout_duration.is_some_and(|timeout| timeout.saturating_sub(start.elapsed()).is_zero()) {
550 return None;
551 }
552 if let Some(timeout) = timeout_duration {
553 let remaining = timeout.saturating_sub(start.elapsed());
554 Some(tokio::time::timeout(remaining, state_rx).await)
555 } else {
556 Some(Ok(state_rx.await))
557 }
558}
559
560fn classify_subagent_wait_result(wait_result: Option<&SubagentWaitResult>) -> SubagentWaitOutcome {
561 match wait_result {
562 Some(Ok(Ok(
563 crate::types::AgentRunState::Done { .. } | crate::types::AgentRunState::Refusal { .. },
564 ))) => SubagentWaitOutcome::ReplayEvents,
565 Some(Ok(Ok(crate::types::AgentRunState::Cancelled { .. }))) => {
566 SubagentWaitOutcome::Cancelled
567 }
568 Some(Ok(Ok(crate::types::AgentRunState::AwaitingConfirmation { .. }))) => {
569 SubagentWaitOutcome::AwaitingConfirmation
570 }
571 Some(Ok(Ok(crate::types::AgentRunState::Error(error)))) => {
572 SubagentWaitOutcome::Error(error.clone())
573 }
574 Some(Ok(Ok(_))) => SubagentWaitOutcome::Error(crate::types::AgentError::new(
578 "subagent returned an unrecognized run state".to_string(),
579 false,
580 )),
581 Some(Ok(Err(_))) => SubagentWaitOutcome::Disconnected,
582 None | Some(Err(_)) => SubagentWaitOutcome::TimedOut,
583 }
584}
585
586fn apply_subagent_wait_outcome(
587 outcome: SubagentWaitOutcome,
588 config: &SubagentConfig,
589 timeout_cancel: &CancellationToken,
590 task_handle: &tokio::task::JoinHandle<()>,
591 state: &mut SubagentExecutionState,
592) -> bool {
593 match outcome {
594 SubagentWaitOutcome::ReplayEvents => true,
595 SubagentWaitOutcome::TimedOut => {
596 timeout_cancel.cancel();
597 task_handle.abort();
598 mark_subagent_timeout(
599 config,
600 &mut state.final_response,
601 &mut state.error_details,
602 &mut state.success,
603 );
604 false
605 }
606 SubagentWaitOutcome::Disconnected => {
607 timeout_cancel.cancel();
608 task_handle.abort();
609 mark_subagent_disconnected(
610 config,
611 &mut state.final_response,
612 &mut state.error_details,
613 &mut state.success,
614 );
615 false
616 }
617 SubagentWaitOutcome::Cancelled => {
618 timeout_cancel.cancel();
619 task_handle.abort();
620 mark_subagent_cancelled(
621 config,
622 &mut state.final_response,
623 &mut state.error_details,
624 &mut state.success,
625 );
626 false
627 }
628 SubagentWaitOutcome::AwaitingConfirmation => {
629 timeout_cancel.cancel();
630 task_handle.abort();
631 mark_subagent_awaiting_confirmation(
632 config,
633 &mut state.final_response,
634 &mut state.error_details,
635 &mut state.success,
636 );
637 false
638 }
639 SubagentWaitOutcome::Error(error) => {
640 timeout_cancel.cancel();
641 task_handle.abort();
642 mark_subagent_agent_error(
643 &mut state.final_response,
644 &mut state.error_details,
645 &mut state.success,
646 &error.message,
647 );
648 false
649 }
650 }
651}
652
653async fn replay_subagent_events<Ctx: Send + Sync + 'static>(
654 event_store: &Arc<dyn EventStore>,
655 thread_id: &ThreadId,
656 parent_ctx: &ToolContext<Ctx>,
657 config: &SubagentConfig,
658 subagent_id: &str,
659 state: &mut SubagentExecutionState,
660) -> Result<()> {
661 for envelope in event_store.get_events(thread_id).await? {
662 match envelope.event {
663 AgentEvent::Text {
664 message_id: _,
665 text,
666 } => {
667 state.final_response.push_str(&text);
668 }
669 AgentEvent::ToolCallStart {
670 id, name, input, ..
671 } => {
672 state.tool_count += 1;
673 let context = extract_tool_context(&name, &input);
674 state
675 .pending_tools
676 .insert(id, (name.clone(), context.clone()));
677
678 emit_subagent_progress_if_possible(
679 parent_ctx,
680 config,
681 SubagentProgressUpdate {
682 subagent_id,
683 total_turns: state.total_turns,
684 total_usage: &state.total_usage,
685 tool_name: name,
686 tool_context: context,
687 completed: false,
688 success: false,
689 tool_count: state.tool_count,
690 },
691 )
692 .await;
693 }
694 AgentEvent::ToolCallEnd {
695 id,
696 name,
697 display_name,
698 result,
699 } => {
700 let context = state
701 .pending_tools
702 .remove(&id)
703 .map(|(_, ctx)| ctx)
704 .unwrap_or_default();
705 let tool_success = result.success;
706 state.tool_logs.push(ToolCallLog {
707 name: name.clone(),
708 display_name: display_name.clone(),
709 context: context.clone(),
710 result: summarize_tool_result(&name, &result),
711 success: tool_success,
712 duration_ms: result.duration_ms,
713 });
714
715 emit_subagent_progress_if_possible(
716 parent_ctx,
717 config,
718 SubagentProgressUpdate {
719 subagent_id,
720 total_turns: state.total_turns,
721 total_usage: &state.total_usage,
722 tool_name: name,
723 tool_context: context,
724 completed: true,
725 success: tool_success,
726 tool_count: state.tool_count,
727 },
728 )
729 .await;
730 }
731 AgentEvent::TurnComplete { turn, usage, .. } => {
732 state.total_turns = turn;
733 state.total_usage.add(&usage);
734 }
735 AgentEvent::Done {
736 total_turns: turns, ..
737 } => {
738 state.total_turns = turns;
739 break;
740 }
741 AgentEvent::Refusal { text, .. } => {
742 let refusal_message =
743 text.unwrap_or_else(|| "Subagent refused the request".to_string());
744 state.error_details = Some(refusal_message.clone());
745 state.final_response = refusal_message;
746 state.success = false;
747 break;
748 }
749 AgentEvent::Error { message, .. } => {
750 state.error_details = Some(message.clone());
751 state.final_response = message;
752 state.success = false;
753 break;
754 }
755 _ => {}
756 }
757 }
758 Ok(())
759}
760
761async fn emit_subagent_progress_if_possible<Ctx: Send + Sync + 'static>(
762 parent_ctx: &ToolContext<Ctx>,
763 config: &SubagentConfig,
764 update: SubagentProgressUpdate<'_>,
765) {
766 if let Err(error) = emit_subagent_progress(parent_ctx, config, update).await {
767 log::warn!("Failed to emit subagent progress event: {error}");
768 }
769}
770
771async fn emit_subagent_progress<Ctx: Send + Sync + 'static>(
772 parent_ctx: &ToolContext<Ctx>,
773 config: &SubagentConfig,
774 SubagentProgressUpdate {
775 subagent_id,
776 total_turns,
777 total_usage,
778 tool_name,
779 tool_context,
780 completed,
781 success,
782 tool_count,
783 }: SubagentProgressUpdate<'_>,
784) -> Result<()> {
785 let max_turns = config.max_turns.map(usize_to_u32_saturating);
786 let current_turn = Some(usize_to_u32_saturating(total_turns));
787
788 parent_ctx
789 .emit_event(AgentEvent::SubagentProgress {
790 subagent_id: subagent_id.to_string(),
791 subagent_name: config.name.clone(),
792 nickname: config.nickname.clone(),
793 child_thread_id: None,
794 child_root_task_id: None,
795 subagent_task_id: None,
796 max_turns,
797 current_turn,
798 model: config.model.clone(),
799 tool_name,
800 tool_context,
801 completed,
802 success,
803 tool_count,
804 total_tokens: subagent_total_tokens(total_usage),
805 })
806 .await
807}
808
/// Narrows `value` to `u32`, clamping anything larger to `u32::MAX`.
fn usize_to_u32_saturating(value: usize) -> u32 {
    match u32::try_from(value) {
        Ok(narrowed) => narrowed,
        Err(_) => u32::MAX,
    }
}

/// Records an `invoke_agent` span plus invocation and token metrics for a finished run.
///
/// The span is linked to the caller's current span when that span is valid.
///
/// NOTE(review): this runs after the subagent has already finished, and the span is ended
/// immediately after it is started. Its duration therefore does not reflect the run time.
#[cfg(feature = "otel")]
fn emit_subagent_observability<P, H, M, S>(tool: &SubagentTool<P, H, M, S>, result: &SubagentResult)
where
    P: LlmProvider + Clone + 'static,
    H: AgentHooks + Clone + 'static,
    M: MessageStore + 'static,
    S: StateStore + 'static,
{
    use crate::observability::{attrs, baggage, langfuse, metrics, provider_name, spans};
    use opentelemetry::KeyValue;
    use opentelemetry::trace::{Span, TraceContextExt};

    // Capture the caller's span context before starting ours so we can link back to it.
    let caller_span_context = opentelemetry::Context::current()
        .span()
        .span_context()
        .clone();

    let provider = provider_name::normalize(tool.provider.provider());
    let model = tool.provider.model().to_string();
    let agent_name = tool.config.name.clone();
    let succeeded = result.success;
    let outcome = if succeeded { "done" } else { "error" };

    let mut span = spans::start_internal_span(
        "invoke_agent",
        vec![
            KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
            KeyValue::new(attrs::GEN_AI_AGENT_NAME, agent_name.clone()),
            KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider),
            KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, model.clone()),
            KeyValue::new(attrs::SDK_RUN_MODE, "loop"),
        ],
    );
    baggage::copy_baggage_to_active_span(&mut span);
    langfuse::tag_observation(&mut span, langfuse::ObservationType::Agent);
    if caller_span_context.is_valid() {
        let trace_id = caller_span_context.trace_id().to_string();
        let span_id = caller_span_context.span_id().to_string();
        spans::link_to_parent_turn(&mut span, &trace_id, &span_id);
    }

    span.set_attribute(KeyValue::new(attrs::SDK_OUTCOME, outcome));
    span.set_attribute(attrs::kv_i64(
        attrs::SDK_TOTAL_TURNS,
        i64::try_from(result.total_turns).unwrap_or(0),
    ));
    span.set_attribute(attrs::kv_i64(
        attrs::GEN_AI_USAGE_INPUT_TOKENS,
        i64::from(result.usage.input_tokens),
    ));
    span.set_attribute(attrs::kv_i64(
        attrs::GEN_AI_USAGE_OUTPUT_TOKENS,
        i64::from(result.usage.output_tokens),
    ));
    if !succeeded {
        spans::set_span_error(&mut span, "agent_error", "subagent invocation failed");
    }
    span.end();

    let metrics_handle = metrics::Metrics::global();
    metrics_handle.subagent_invocations.add(
        1,
        &[
            KeyValue::new(attrs::GEN_AI_AGENT_NAME, agent_name),
            KeyValue::new(attrs::SDK_OUTCOME, outcome),
        ],
    );
    record_subagent_token_usage(&metrics_handle, result, provider, &model);
}

/// Records non-zero input and output token counts on the token-usage histogram.
///
/// Input is recorded before output. Zero counts are skipped.
#[cfg(feature = "otel")]
fn record_subagent_token_usage(
    metrics: &crate::observability::metrics::Metrics,
    result: &SubagentResult,
    provider_name: &'static str,
    request_model: &str,
) {
    use crate::observability::attrs;
    use opentelemetry::KeyValue;

    let counts_by_type = [
        ("input", result.usage.input_tokens),
        ("output", result.usage.output_tokens),
    ];

    for (token_type, count) in counts_by_type.into_iter().filter(|&(_, count)| count > 0) {
        let labels = [
            KeyValue::new(attrs::GEN_AI_OPERATION_NAME, "invoke_agent"),
            KeyValue::new(attrs::GEN_AI_PROVIDER_NAME, provider_name),
            KeyValue::new("gen_ai.token.type", token_type),
            KeyValue::new(attrs::GEN_AI_REQUEST_MODEL, request_model.to_string()),
        ];
        metrics.token_usage.record(u64::from(count), &labels);
    }
}

931#[cfg(not(feature = "otel"))]
932const fn emit_subagent_observability<P, H, M, S>(
933 _tool: &SubagentTool<P, H, M, S>,
934 _result: &SubagentResult,
935) where
936 P: LlmProvider + Clone + 'static,
937 H: AgentHooks + Clone + 'static,
938 M: MessageStore + 'static,
939 S: StateStore + 'static,
940{
941}
942
943fn extract_tool_context(name: &str, input: &Value) -> String {
945 match name {
946 "read" => input
947 .get("file_path")
948 .or_else(|| input.get("path"))
949 .and_then(Value::as_str)
950 .unwrap_or("")
951 .to_string(),
952 "write" | "edit" => input
953 .get("file_path")
954 .or_else(|| input.get("path"))
955 .and_then(Value::as_str)
956 .unwrap_or("")
957 .to_string(),
958 "bash" => {
959 let cmd = input.get("command").and_then(Value::as_str).unwrap_or("");
960 if cmd.len() > 60 {
962 format!("{}...", crate::primitive_tools::truncate_str(cmd, 57))
963 } else {
964 cmd.to_string()
965 }
966 }
967 "glob" | "grep" => input
968 .get("pattern")
969 .and_then(Value::as_str)
970 .unwrap_or("")
971 .to_string(),
972 "web_search" => input
973 .get("query")
974 .and_then(Value::as_str)
975 .unwrap_or("")
976 .to_string(),
977 _ => String::new(),
978 }
979}
980
981fn summarize_tool_result(name: &str, result: &ToolResult) -> String {
983 if !result.success {
984 let first_line = result.output.lines().next().unwrap_or("Error");
985 return if first_line.len() > 50 {
986 format!(
987 "{}...",
988 crate::primitive_tools::truncate_str(first_line, 47)
989 )
990 } else {
991 first_line.to_string()
992 };
993 }
994
995 match name {
996 "read" => {
997 let line_count = result.output.lines().count();
998 format!("{line_count} lines")
999 }
1000 "write" => "wrote file".to_string(),
1001 "edit" => "edited".to_string(),
1002 "bash" => {
1003 let lines: Vec<&str> = result.output.lines().collect();
1004 if lines.is_empty() {
1005 "done".to_string()
1006 } else if lines.len() == 1 {
1007 let line = lines[0];
1008 if line.len() > 50 {
1009 format!("{}...", crate::primitive_tools::truncate_str(line, 47))
1010 } else {
1011 line.to_string()
1012 }
1013 } else {
1014 format!("{} lines", lines.len())
1015 }
1016 }
1017 "glob" => {
1018 let count = result.output.lines().count();
1019 format!("{count} files")
1020 }
1021 "grep" => {
1022 let count = result.output.lines().count();
1023 format!("{count} matches")
1024 }
1025 _ => {
1026 let line_count = result.output.lines().count();
1027 if line_count == 0 {
1028 "done".to_string()
1029 } else {
1030 format!("{line_count} lines")
1031 }
1032 }
1033 }
1034}
1035
1036impl<P, H, M, S, Ctx> Tool<Ctx> for SubagentTool<P, H, M, S>
1037where
1038 P: LlmProvider + Clone + 'static,
1039 H: AgentHooks + Clone + 'static,
1040 M: MessageStore + 'static,
1041 S: StateStore + 'static,
1042 Ctx: Send + Sync + 'static,
1043{
1044 type Name = DynamicToolName;
1045
1046 fn name(&self) -> DynamicToolName {
1047 DynamicToolName::new(format!("subagent_{}", self.config.name))
1048 }
1049
1050 fn display_name(&self) -> &'static str {
1051 self.cached_display_name
1052 }
1053
1054 fn description(&self) -> &'static str {
1055 self.cached_description
1056 }
1057
1058 fn input_schema(&self) -> Value {
1059 json!({
1060 "type": "object",
1061 "properties": {
1062 "task": {
1063 "type": "string",
1064 "description": "The task or question for the subagent to handle"
1065 }
1066 },
1067 "required": ["task"]
1068 })
1069 }
1070
1071 fn tier(&self) -> ToolTier {
1072 ToolTier::Confirm
1074 }
1075
1076 async fn execute(&self, ctx: &ToolContext<Ctx>, input: Value) -> Result<ToolResult> {
1077 let task = input
1078 .get("task")
1079 .and_then(Value::as_str)
1080 .context("Missing 'task' parameter")?;
1081
1082 let current_depth = ctx
1084 .metadata
1085 .get(METADATA_SUBAGENT_DEPTH)
1086 .and_then(Value::as_u64)
1087 .unwrap_or(0);
1088 let max_depth = ctx
1089 .metadata
1090 .get(METADATA_MAX_SUBAGENT_DEPTH)
1091 .and_then(Value::as_u64)
1092 .unwrap_or(3); if current_depth >= max_depth {
1095 bail!(
1096 "Subagent depth limit exceeded ({current_depth}/{max_depth}). \
1097 Cannot spawn nested subagent '{}' — maximum nesting depth reached.",
1098 self.config.name
1099 );
1100 }
1101
1102 let _permit = if let Some(ref sem) = ctx.subagent_semaphore() {
1104 match sem.clone().try_acquire_owned() {
1105 Ok(permit) => Some(permit),
1106 Err(_) => {
1107 return Ok(ToolResult {
1108 success: false,
1109 output: format!(
1110 "Cannot spawn subagent '{}': maximum concurrent subagent limit reached. \
1111 Try again when another subagent completes.",
1112 self.config.name
1113 ),
1114 data: None,
1115 documents: Vec::new(),
1116 duration_ms: Some(0),
1117 });
1118 }
1119 }
1120 } else {
1121 None
1122 };
1123
1124 let subagent_id = format!(
1126 "{}_{:x}",
1127 self.config.name,
1128 std::time::SystemTime::now()
1129 .duration_since(std::time::UNIX_EPOCH)
1130 .unwrap_or_default()
1131 .as_nanos()
1132 );
1133
1134 let cancel_token = ctx.cancel_token().unwrap_or_default();
1137
1138 let result = self
1139 .run_subagent(task, subagent_id, ctx, cancel_token)
1140 .await?;
1141
1142 Ok(ToolResult {
1143 success: result.success,
1144 output: result.final_response.clone(),
1145 data: Some(serde_json::to_value(&result).unwrap_or_default()),
1146 documents: Vec::new(),
1147 duration_ms: Some(result.duration_ms),
1148 })
1149 }
1150}
1151
1152#[cfg(test)]
1153mod tests {
1154 use super::*;
1155 use crate::authority::{EventAuthority, LocalEventAuthority};
1156 use crate::events::{AgentEvent, AgentEventEnvelope};
1157 use crate::llm::{ChatOutcome, ChatRequest, ChatResponse, ContentBlock, StopReason, Usage};
1158 use crate::stores::{EventStore, InMemoryEventStore, StoredTurnEvents};
1159 use anyhow::{Context, Result, bail};
1160 use async_trait::async_trait;
1161 use tokio::sync::Mutex;
1162
1163 #[derive(Clone)]
1164 struct TestProvider {
1165 responses: Arc<Mutex<Vec<ChatOutcome>>>,
1166 delay: Option<Duration>,
1167 }
1168
1169 impl TestProvider {
1170 fn new(responses: Vec<ChatOutcome>) -> Self {
1171 Self {
1172 responses: Arc::new(Mutex::new(responses)),
1173 delay: None,
1174 }
1175 }
1176
1177 fn with_delay(mut self, delay: Duration) -> Self {
1178 self.delay = Some(delay);
1179 self
1180 }
1181
1182 fn text_response(text: &str) -> ChatOutcome {
1183 ChatOutcome::Success(ChatResponse {
1184 id: "resp_text".to_string(),
1185 content: vec![ContentBlock::Text {
1186 text: text.to_string(),
1187 }],
1188 model: "test-model".to_string(),
1189 stop_reason: Some(StopReason::EndTurn),
1190 usage: Usage {
1191 input_tokens: 10,
1192 output_tokens: 20,
1193 cached_input_tokens: 0,
1194 cache_creation_input_tokens: 0,
1195 },
1196 })
1197 }
1198
1199 fn tool_use_response(tool_id: &str, tool_name: &str, input: Value) -> ChatOutcome {
1200 ChatOutcome::Success(ChatResponse {
1201 id: "resp_tool".to_string(),
1202 content: vec![ContentBlock::ToolUse {
1203 id: tool_id.to_string(),
1204 name: tool_name.to_string(),
1205 input,
1206 thought_signature: None,
1207 }],
1208 model: "test-model".to_string(),
1209 stop_reason: Some(StopReason::ToolUse),
1210 usage: Usage {
1211 input_tokens: 15,
1212 output_tokens: 25,
1213 cached_input_tokens: 0,
1214 cache_creation_input_tokens: 0,
1215 },
1216 })
1217 }
1218
1219 fn refusal_response(text: Option<&str>) -> ChatOutcome {
1220 let content = text.map_or_else(Vec::new, |text| {
1221 vec![ContentBlock::Text {
1222 text: text.to_string(),
1223 }]
1224 });
1225 ChatOutcome::Success(ChatResponse {
1226 id: "resp_refusal".to_string(),
1227 content,
1228 model: "test-model".to_string(),
1229 stop_reason: Some(StopReason::Refusal),
1230 usage: Usage {
1231 input_tokens: 12,
1232 output_tokens: 0,
1233 cached_input_tokens: 0,
1234 cache_creation_input_tokens: 0,
1235 },
1236 })
1237 }
1238 }
1239
1240 #[async_trait]
1241 impl LlmProvider for TestProvider {
1242 async fn chat(&self, _request: ChatRequest) -> Result<ChatOutcome> {
1243 if let Some(delay) = self.delay {
1244 tokio::time::sleep(delay).await;
1245 }
1246
1247 let mut responses = self.responses.lock().await;
1248 if responses.is_empty() {
1249 Ok(Self::text_response("default"))
1250 } else {
1251 Ok(responses.remove(0))
1252 }
1253 }
1254
1255 fn model(&self) -> &'static str {
1256 "test-model"
1257 }
1258
1259 fn provider(&self) -> &'static str {
1260 "mock"
1261 }
1262 }
1263
1264 struct TestEchoTool;
1265
1266 impl Tool<()> for TestEchoTool {
1267 type Name = DynamicToolName;
1268
1269 fn name(&self) -> DynamicToolName {
1270 DynamicToolName::new("echo")
1271 }
1272
1273 fn display_name(&self) -> &'static str {
1274 "Echo"
1275 }
1276
1277 fn description(&self) -> &'static str {
1278 "Echo the input"
1279 }
1280
1281 fn input_schema(&self) -> Value {
1282 json!({
1283 "type": "object",
1284 "properties": {
1285 "message": { "type": "string" }
1286 },
1287 "required": ["message"]
1288 })
1289 }
1290
1291 fn tier(&self) -> ToolTier {
1292 ToolTier::Observe
1293 }
1294
1295 async fn execute(&self, _ctx: &ToolContext<()>, input: Value) -> Result<ToolResult> {
1296 let message = input
1297 .get("message")
1298 .and_then(Value::as_str)
1299 .context("missing echo message")?;
1300 Ok(ToolResult::success(format!("Echo: {message}")))
1301 }
1302 }
1303
1304 #[derive(Clone, Default)]
1305 struct RecordingEventStore {
1306 inner: Arc<InMemoryEventStore>,
1307 appended: Arc<Mutex<Vec<(ThreadId, usize, AgentEventEnvelope)>>>,
1308 }
1309
1310 impl RecordingEventStore {
1311 async fn appended_events(&self) -> Vec<(ThreadId, usize, AgentEventEnvelope)> {
1312 self.appended.lock().await.clone()
1313 }
1314 }
1315
1316 #[async_trait]
1317 impl EventStore for RecordingEventStore {
1318 async fn append(
1319 &self,
1320 thread_id: &ThreadId,
1321 turn: usize,
1322 envelope: AgentEventEnvelope,
1323 ) -> Result<()> {
1324 self.appended
1325 .lock()
1326 .await
1327 .push((thread_id.clone(), turn, envelope.clone()));
1328 self.inner.append(thread_id, turn, envelope).await
1329 }
1330
1331 async fn finish_turn(&self, thread_id: &ThreadId, turn: usize) -> Result<()> {
1332 self.inner.finish_turn(thread_id, turn).await
1333 }
1334
1335 async fn get_turn(
1336 &self,
1337 thread_id: &ThreadId,
1338 turn: usize,
1339 ) -> Result<Option<StoredTurnEvents>> {
1340 self.inner.get_turn(thread_id, turn).await
1341 }
1342
1343 async fn get_turns(&self, thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
1344 self.inner.get_turns(thread_id).await
1345 }
1346
1347 async fn clear(&self, thread_id: &ThreadId) -> Result<()> {
1348 self.inner.clear(thread_id).await
1349 }
1350 }
1351
1352 #[derive(Clone, Default)]
1353 struct AlwaysFailAppendEventStore;
1354
1355 #[async_trait]
1356 impl EventStore for AlwaysFailAppendEventStore {
1357 async fn append(
1358 &self,
1359 _thread_id: &ThreadId,
1360 _turn: usize,
1361 _envelope: AgentEventEnvelope,
1362 ) -> Result<()> {
1363 bail!("append failed")
1364 }
1365
1366 async fn finish_turn(&self, _thread_id: &ThreadId, _turn: usize) -> Result<()> {
1367 Ok(())
1368 }
1369
1370 async fn get_turn(
1371 &self,
1372 _thread_id: &ThreadId,
1373 _turn: usize,
1374 ) -> Result<Option<StoredTurnEvents>> {
1375 Ok(None)
1376 }
1377
1378 async fn get_turns(&self, _thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
1379 Ok(Vec::new())
1380 }
1381
1382 async fn clear(&self, _thread_id: &ThreadId) -> Result<()> {
1383 Ok(())
1384 }
1385 }
1386
1387 #[derive(Clone, Default)]
1388 struct NoReadAfterFailureEventStore {
1389 inner: Arc<InMemoryEventStore>,
1390 }
1391
1392 #[async_trait]
1393 impl EventStore for NoReadAfterFailureEventStore {
1394 async fn append(
1395 &self,
1396 thread_id: &ThreadId,
1397 turn: usize,
1398 envelope: AgentEventEnvelope,
1399 ) -> Result<()> {
1400 self.inner.append(thread_id, turn, envelope).await
1401 }
1402
1403 async fn finish_turn(&self, thread_id: &ThreadId, turn: usize) -> Result<()> {
1404 self.inner.finish_turn(thread_id, turn).await
1405 }
1406
1407 async fn get_turn(
1408 &self,
1409 thread_id: &ThreadId,
1410 turn: usize,
1411 ) -> Result<Option<StoredTurnEvents>> {
1412 self.inner.get_turn(thread_id, turn).await
1413 }
1414
1415 async fn get_turns(&self, _thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
1416 bail!("get_events should not be called after subagent failure")
1417 }
1418
1419 async fn clear(&self, thread_id: &ThreadId) -> Result<()> {
1420 self.inner.clear(thread_id).await
1421 }
1422 }
1423
1424 #[derive(Clone, Default)]
1425 struct PanicProvider;
1426
1427 #[async_trait]
1428 impl LlmProvider for PanicProvider {
1429 async fn chat(&self, _request: ChatRequest) -> Result<ChatOutcome> {
1430 panic!("panic provider should disconnect subagent");
1436 }
1437
1438 fn model(&self) -> &'static str {
1439 "panic-model"
1440 }
1441
1442 fn provider(&self) -> &'static str {
1443 "panic"
1444 }
1445 }
1446
1447 #[test]
1448 fn test_subagent_config_builder() {
1449 let config = SubagentConfig::new("test")
1450 .with_system_prompt("Test prompt")
1451 .with_max_turns(5)
1452 .with_timeout_ms(30000);
1453
1454 assert_eq!(config.name, "test");
1455 assert_eq!(config.system_prompt, "Test prompt");
1456 assert_eq!(config.max_turns, Some(5));
1457 assert_eq!(config.timeout_ms, Some(30000));
1458 }
1459
1460 #[test]
1461 fn test_subagent_config_defaults() {
1462 let config = SubagentConfig::new("default");
1463
1464 assert_eq!(config.name, "default");
1465 assert!(config.system_prompt.is_empty());
1466 assert_eq!(config.max_turns, None);
1467 assert_eq!(config.timeout_ms, None);
1468 }
1469
1470 #[test]
1471 fn test_subagent_result_serialization() -> Result<()> {
1472 let result = SubagentResult {
1473 name: "test".to_string(),
1474 final_response: "Done".to_string(),
1475 total_turns: 3,
1476 tool_count: 5,
1477 tool_logs: vec![
1478 ToolCallLog {
1479 name: "read".to_string(),
1480 display_name: "Read file".to_string(),
1481 context: "/tmp/test.rs".to_string(),
1482 result: "50 lines".to_string(),
1483 success: true,
1484 duration_ms: Some(10),
1485 },
1486 ToolCallLog {
1487 name: "grep".to_string(),
1488 display_name: "Grep TODO".to_string(),
1489 context: "TODO".to_string(),
1490 result: "3 matches".to_string(),
1491 success: true,
1492 duration_ms: Some(5),
1493 },
1494 ],
1495 usage: TokenUsage::default(),
1496 success: true,
1497 duration_ms: 1000,
1498 error_details: None,
1499 failed_tool: None,
1500 };
1501
1502 let json = serde_json::to_string(&result).context("failed to serialize subagent result")?;
1503 assert!(json.contains("test"));
1504 assert!(json.contains("Done"));
1505 assert!(json.contains("tool_count"));
1506 assert!(json.contains("tool_logs"));
1507 assert!(json.contains("/tmp/test.rs"));
1508
1509 Ok(())
1510 }
1511
1512 #[test]
1513 fn test_subagent_result_field_extraction() -> Result<()> {
1514 let result = SubagentResult {
1515 name: "explore".to_string(),
1516 final_response: "Found 3 config files".to_string(),
1517 total_turns: 2,
1518 tool_count: 5,
1519 tool_logs: vec![ToolCallLog {
1520 name: "glob".to_string(),
1521 display_name: "Glob config files".to_string(),
1522 context: "**/*.toml".to_string(),
1523 result: "3 files".to_string(),
1524 success: true,
1525 duration_ms: Some(15),
1526 }],
1527 usage: TokenUsage {
1528 input_tokens: 1500,
1529 output_tokens: 500,
1530 ..Default::default()
1531 },
1532 success: true,
1533 duration_ms: 2500,
1534 error_details: None,
1535 failed_tool: None,
1536 };
1537
1538 let value =
1539 serde_json::to_value(&result).context("failed to convert subagent result to json")?;
1540
1541 let tool_count = value.get("tool_count").and_then(Value::as_u64);
1542 assert_eq!(tool_count, Some(5));
1543
1544 let usage = value.get("usage").context("missing usage field")?;
1545 let input_tokens = usage.get("input_tokens").and_then(Value::as_u64);
1546 let output_tokens = usage.get("output_tokens").and_then(Value::as_u64);
1547 assert_eq!(input_tokens, Some(1500));
1548 assert_eq!(output_tokens, Some(500));
1549
1550 let logs = value
1551 .get("tool_logs")
1552 .and_then(Value::as_array)
1553 .context("missing tool_logs array")?;
1554 assert_eq!(logs.len(), 1);
1555
1556 let first_log = &logs[0];
1557 assert_eq!(first_log.get("name").and_then(Value::as_str), Some("glob"));
1558 assert_eq!(
1559 first_log.get("context").and_then(Value::as_str),
1560 Some("**/*.toml")
1561 );
1562 assert_eq!(
1563 first_log.get("result").and_then(Value::as_str),
1564 Some("3 files")
1565 );
1566 assert_eq!(
1567 first_log.get("success").and_then(Value::as_bool),
1568 Some(true)
1569 );
1570
1571 Ok(())
1572 }
1573
1574 #[tokio::test]
1575 async fn test_run_subagent_uses_isolated_child_thread() -> Result<()> {
1576 let event_store = Arc::new(RecordingEventStore::default());
1577 let provider = Arc::new(TestProvider::new(vec![
1578 TestProvider::tool_use_response("tool_1", "echo", json!({ "message": "child" })),
1579 TestProvider::text_response("Subagent complete"),
1580 ]));
1581 let mut tools = ToolRegistry::new();
1582 tools.register(TestEchoTool);
1583
1584 let tool = SubagentTool::new(SubagentConfig::new("worker"), provider, Arc::new(tools), {
1585 let store = Arc::clone(&event_store);
1586 move || -> Arc<dyn EventStore> { store.clone() }
1587 });
1588 let parent_thread = ThreadId::new();
1589 let parent_ctx = ToolContext::new(()).with_event_store(
1590 event_store.clone(),
1591 parent_thread.clone(),
1592 1,
1593 Arc::new(LocalEventAuthority::new()),
1594 );
1595
1596 let result = tool
1597 .run_subagent(
1598 "Inspect the repo",
1599 "subagent_1".to_string(),
1600 &parent_ctx,
1601 CancellationToken::new(),
1602 )
1603 .await?;
1604
1605 assert!(result.success);
1606 assert_eq!(result.tool_count, 1);
1607 assert_eq!(result.tool_logs.len(), 1);
1608
1609 let parent_turn = event_store
1610 .get_turn(&parent_thread, 1)
1611 .await?
1612 .context("missing parent turn")?;
1613 assert!(!parent_turn.events.is_empty());
1614 assert!(
1615 parent_turn
1616 .events
1617 .iter()
1618 .all(|envelope| { matches!(envelope.event, AgentEvent::SubagentProgress { .. }) })
1619 );
1620
1621 let appended = event_store.appended_events().await;
1622 let child_thread = appended
1623 .iter()
1624 .map(|(thread_id, _, _)| thread_id.clone())
1625 .find(|thread_id| thread_id != &parent_thread)
1626 .context("missing child thread events")?;
1627 let child_turn = event_store
1628 .get_turn(&child_thread, 1)
1629 .await?
1630 .context("missing child turn")?;
1631 let child_events = event_store.get_events(&child_thread).await?;
1632
1633 assert!(
1634 child_turn
1635 .events
1636 .iter()
1637 .any(|envelope| { matches!(envelope.event, AgentEvent::ToolCallStart { .. }) })
1638 );
1639 assert!(
1640 child_events
1641 .iter()
1642 .any(|envelope| { matches!(envelope.event, AgentEvent::Done { .. }) })
1643 );
1644
1645 Ok(())
1646 }
1647
1648 #[tokio::test]
1649 async fn test_run_subagent_timeout_marks_result_as_failed() -> Result<()> {
1650 let event_store = Arc::new(NoReadAfterFailureEventStore::default());
1651 let provider = Arc::new(
1652 TestProvider::new(vec![TestProvider::text_response("Too late")])
1653 .with_delay(Duration::from_millis(50)),
1654 );
1655 let tool = SubagentTool::new(
1656 SubagentConfig::new("worker").with_timeout_ms(10),
1657 provider,
1658 Arc::new(ToolRegistry::<()>::new()),
1659 {
1660 let store = Arc::clone(&event_store);
1661 move || -> Arc<dyn EventStore> { store.clone() }
1662 },
1663 );
1664
1665 let result = tool
1666 .run_subagent(
1667 "Take too long",
1668 "subagent_timeout".to_string(),
1669 &ToolContext::new(()),
1670 CancellationToken::new(),
1671 )
1672 .await?;
1673
1674 assert!(!result.success);
1675 assert_eq!(result.final_response, "Subagent timed out");
1676 assert!(
1677 result
1678 .error_details
1679 .context("missing timeout details")?
1680 .contains("timed out")
1681 );
1682
1683 Ok(())
1684 }
1685
1686 #[tokio::test]
1687 async fn test_run_subagent_progress_failures_do_not_abort_successful_runs() -> Result<()> {
1688 let provider = Arc::new(TestProvider::new(vec![
1689 TestProvider::tool_use_response("tool_1", "echo", json!({ "message": "child" })),
1690 TestProvider::text_response("Subagent complete"),
1691 ]));
1692 let mut tools = ToolRegistry::new();
1693 tools.register(TestEchoTool);
1694
1695 let tool = SubagentTool::new(SubagentConfig::new("worker"), provider, Arc::new(tools), {
1696 move || -> Arc<dyn EventStore> { Arc::new(InMemoryEventStore::new()) }
1697 });
1698 let parent_ctx = ToolContext::new(()).with_event_store(
1699 Arc::new(AlwaysFailAppendEventStore),
1700 ThreadId::new(),
1701 1,
1702 Arc::new(LocalEventAuthority::new()),
1703 );
1704
1705 let result = tool
1706 .run_subagent(
1707 "Inspect the repo",
1708 "subagent_progress".to_string(),
1709 &parent_ctx,
1710 CancellationToken::new(),
1711 )
1712 .await?;
1713
1714 assert!(result.success);
1715 assert_eq!(result.final_response, "Subagent complete");
1716 assert_eq!(result.tool_count, 1);
1717
1718 Ok(())
1719 }
1720
1721 #[tokio::test]
1722 async fn test_run_subagent_panic_classified_as_error_not_disconnected() -> Result<()> {
1723 let tool = SubagentTool::new(
1732 SubagentConfig::new("worker"),
1733 Arc::new(PanicProvider),
1734 Arc::new(ToolRegistry::<()>::new()),
1735 move || -> Arc<dyn EventStore> { Arc::new(InMemoryEventStore::new()) },
1736 );
1737
1738 let result = tool
1739 .run_subagent(
1740 "Crash",
1741 "subagent_panic".to_string(),
1742 &ToolContext::new(()),
1743 CancellationToken::new(),
1744 )
1745 .await?;
1746
1747 assert!(!result.success);
1748 assert_ne!(result.final_response, "Subagent ended unexpectedly");
1751 let details = result
1752 .error_details
1753 .context("panicking subagent must carry structured error details")?;
1754 assert!(
1755 !details.contains("ended before returning a final state"),
1756 "panic must not be classified as Disconnected; got {details:?}",
1757 );
1758 assert!(
1759 details.contains("panicked"),
1760 "structured error should reflect the panic; got {details:?}",
1761 );
1762 assert!(
1763 details.contains("panic provider should disconnect subagent"),
1764 "structured error should carry the original panic message; got {details:?}",
1765 );
1766
1767 Ok(())
1768 }
1769
1770 #[tokio::test]
1771 async fn test_run_subagent_refusal_marks_result_as_failed() -> Result<()> {
1772 let tool = SubagentTool::new(
1773 SubagentConfig::new("worker"),
1774 Arc::new(TestProvider::new(vec![TestProvider::refusal_response(
1775 Some("Refused for policy reasons"),
1776 )])),
1777 Arc::new(ToolRegistry::<()>::new()),
1778 || Arc::new(InMemoryEventStore::new()),
1779 );
1780
1781 let result = tool
1782 .run_subagent(
1783 "Refuse",
1784 "subagent_refusal".to_string(),
1785 &ToolContext::new(()),
1786 CancellationToken::new(),
1787 )
1788 .await?;
1789
1790 assert!(!result.success);
1791 assert_eq!(result.final_response, "Refused for policy reasons");
1792 assert_eq!(
1793 result.error_details.as_deref(),
1794 Some("Refused for policy reasons")
1795 );
1796
1797 Ok(())
1798 }
1799
1800 #[tokio::test]
1801 async fn test_run_subagent_cancelled_marks_result_as_failed() -> Result<()> {
1802 let tool = SubagentTool::new(
1803 SubagentConfig::new("worker"),
1804 Arc::new(
1805 TestProvider::new(vec![TestProvider::text_response("Too late")])
1806 .with_delay(Duration::from_millis(50)),
1807 ),
1808 Arc::new(ToolRegistry::<()>::new()),
1809 || Arc::new(InMemoryEventStore::new()),
1810 );
1811 let cancel_token = CancellationToken::new();
1812 cancel_token.cancel();
1813
1814 let result = tool
1815 .run_subagent(
1816 "Cancel",
1817 "subagent_cancelled".to_string(),
1818 &ToolContext::new(()),
1819 cancel_token,
1820 )
1821 .await?;
1822
1823 assert!(!result.success);
1824 assert_eq!(result.final_response, "Subagent cancelled");
1825 assert!(
1826 result
1827 .error_details
1828 .context("missing cancellation details")?
1829 .contains("cancelled")
1830 );
1831
1832 Ok(())
1833 }
1834
1835 #[tokio::test]
1836 async fn test_run_subagent_llm_error_does_not_infer_failed_tool() -> Result<()> {
1837 let provider = Arc::new(TestProvider::new(vec![
1838 ChatOutcome::ServerError("llm transport failed".to_string()),
1839 ChatOutcome::ServerError("llm transport failed".to_string()),
1840 ChatOutcome::ServerError("llm transport failed".to_string()),
1841 ChatOutcome::ServerError("llm transport failed".to_string()),
1842 ChatOutcome::ServerError("llm transport failed".to_string()),
1843 ChatOutcome::ServerError("llm transport failed".to_string()),
1844 ]));
1845 let mut tools = ToolRegistry::new();
1846 tools.register(TestEchoTool);
1847
1848 let tool = SubagentTool::new(
1849 SubagentConfig::new("worker"),
1850 provider,
1851 Arc::new(tools),
1852 || Arc::new(InMemoryEventStore::new()),
1853 );
1854
1855 let result = tool
1856 .run_subagent(
1857 "Trigger an llm failure",
1858 "subagent_llm_error".to_string(),
1859 &ToolContext::new(()),
1860 CancellationToken::new(),
1861 )
1862 .await?;
1863
1864 assert!(!result.success);
1865 assert!(result.failed_tool.is_none());
1866 assert!(
1867 result
1868 .error_details
1869 .as_deref()
1870 .unwrap_or_default()
1871 .contains("Server error")
1872 );
1873
1874 Ok(())
1875 }
1876
1877 #[tokio::test]
1878 async fn test_replay_subagent_events_stops_after_error() -> Result<()> {
1879 let event_store: Arc<dyn EventStore> = Arc::new(InMemoryEventStore::new());
1880 let thread_id = ThreadId::new();
1881 let authority = LocalEventAuthority::new();
1882 event_store
1883 .append(
1884 &thread_id,
1885 1,
1886 authority.wrap(AgentEvent::Error {
1887 message: "subagent boom".to_string(),
1888 recoverable: false,
1889 }),
1890 )
1891 .await?;
1892 event_store
1893 .append(
1894 &thread_id,
1895 1,
1896 authority.wrap(AgentEvent::Text {
1897 message_id: "msg_after_error".to_string(),
1898 text: "should not be appended".to_string(),
1899 }),
1900 )
1901 .await?;
1902
1903 let mut state = SubagentExecutionState::new();
1904 replay_subagent_events(
1905 &event_store,
1906 &thread_id,
1907 &ToolContext::new(()),
1908 &SubagentConfig::new("worker"),
1909 "subagent_error",
1910 &mut state,
1911 )
1912 .await?;
1913
1914 assert!(!state.success);
1915 assert_eq!(state.final_response, "subagent boom");
1916 assert_eq!(state.error_details.as_deref(), Some("subagent boom"));
1917
1918 Ok(())
1919 }
1920}