phi_core/agents/sub_agent.rs
1//! Sub-agent tool — delegates tasks to a child agent loop.
2//!
3//! The `SubAgentTool` implements `AgentTool` and internally runs `agent_loop()`
4//! with its own system prompt, tools, and provider. The parent LLM invokes it
5//! like any other tool, passing a natural-language `task` string.
6//!
7//! # Design
8//!
9//! - **Context isolation**: each invocation starts a fresh conversation
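//! - **Depth limiting**: by convention, sub-agents are not given other `SubAgentTool`s;
//!   this is not enforced at runtime (there is no nesting-depth counter)
//! - **Cancellation propagation**: the parent's cancel token is forwarded
//! - **Event forwarding**: sub-agent events stream to the parent via `on_update`
//!   (text deltas and tool-call notices) and `on_progress` (progress messages)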
13//!
14//! # Example
15//!
16//! ```rust,no_run
17//! use phi_core::agents::SubAgentTool;
18//! use phi_core::provider::ModelConfig;
19//!
20//! let researcher = SubAgentTool::new(
21//! "researcher",
22//! ModelConfig::anthropic("claude-sonnet-4-20250514", "Claude Sonnet 4", "sk-..."),
23//! )
24//! .with_description("Searches codebases and documents")
25//! .with_system_prompt("You are a research assistant.");
26//! ```
27
28use crate::agent_loop::{agent_loop, AgentLoopConfig};
29use crate::context::ExecutionLimits;
30use crate::provider::{ModelConfig, StreamProvider};
31use crate::types::*;
32use std::sync::Arc;
33use tokio::sync::mpsc;
34
/// Turn budget a sub-agent gets unless `SubAgentTool::with_max_turns` overrides it.
///
/// Acts as the primary guard against a child loop running away.
const DEFAULT_MAX_TURNS: usize = 10;
37
38/// A tool that delegates work to a child agent loop.
39///
40/// When the parent LLM calls this tool, it spawns a fresh `agent_loop()` with
41/// its own system prompt, tools, and provider. The sub-agent runs to completion
42/// and its final text output is returned as the tool result.
43pub struct SubAgentTool {
44 tool_name: String,
45 tool_description: String,
46 system_prompt: String,
47 model_config: ModelConfig,
48 provider_override: Option<Arc<dyn StreamProvider>>,
49 tools: Vec<Arc<dyn AgentTool>>,
50 thinking_level: ThinkingLevel,
51 max_tokens: Option<u32>,
52 cache_config: CacheConfig,
53 tool_execution: ToolExecutionStrategy,
54 retry_config: crate::provider::retry::RetryConfig,
55 max_turns: usize,
56 /// The `loop_id` of the parent agent loop that spawned this sub-agent.
57 /// Passed into the child context as `parent_loop_id` so that the full
58 /// parent → child ancestry chain is traceable via `AgentStart` events.
59 parent_loop_id: Option<String>,
60}
61
62impl SubAgentTool {
63 /// Create a new sub-agent tool with a name and model config.
64 pub fn new(name: impl Into<String>, model_config: ModelConfig) -> Self {
65 let name = name.into();
66 Self {
67 tool_description: format!("Delegate a task to the '{}' sub-agent", name),
68 tool_name: name,
69 system_prompt: String::new(),
70 model_config,
71 provider_override: None,
72 tools: Vec::new(),
73 thinking_level: ThinkingLevel::Off,
74 max_tokens: None,
75 cache_config: CacheConfig::default(),
76 tool_execution: ToolExecutionStrategy::default(),
77 retry_config: crate::provider::retry::RetryConfig::default(),
78 max_turns: DEFAULT_MAX_TURNS,
79 parent_loop_id: None,
80 }
81 }
82
83 /// Set the parent loop's `loop_id` for child → parent ancestry tracking.
84 ///
85 /// When set, this value is placed in the child `AgentContext.parent_loop_id`,
86 /// which is then emitted in the child's `AgentStart` event. This creates a
87 /// bidirectional link: the parent sees the child's `loop_id` via
88 /// `ToolExecutionEnd.child_loop_id`, and the child records the parent via
89 /// `AgentStart.parent_loop_id`.
90 pub fn with_parent_loop_id(mut self, id: impl Into<String>) -> Self {
91 self.parent_loop_id = Some(id.into());
92 self
93 }
94
95 /// Override the provider used by this sub-agent, bypassing `ProviderRegistry` dispatch.
96 /// Primarily used in tests to inject a `MockProvider`.
97 pub fn with_provider_override(mut self, provider: Arc<dyn StreamProvider>) -> Self {
98 self.provider_override = Some(provider);
99 self
100 }
101
102 pub fn with_description(mut self, desc: impl Into<String>) -> Self {
103 self.tool_description = desc.into();
104 self
105 }
106
107 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
108 self.system_prompt = prompt.into();
109 self
110 }
111
112 pub fn with_tools(mut self, tools: Vec<Arc<dyn AgentTool>>) -> Self {
113 self.tools = tools;
114 self
115 }
116
117 pub fn with_thinking(mut self, level: ThinkingLevel) -> Self {
118 self.thinking_level = level;
119 self
120 }
121
122 pub fn with_max_tokens(mut self, max: u32) -> Self {
123 self.max_tokens = Some(max);
124 self
125 }
126
127 pub fn with_cache_config(mut self, config: CacheConfig) -> Self {
128 self.cache_config = config;
129 self
130 }
131
132 pub fn with_tool_execution(mut self, strategy: ToolExecutionStrategy) -> Self {
133 self.tool_execution = strategy;
134 self
135 }
136
137 pub fn with_retry_config(mut self, config: crate::provider::retry::RetryConfig) -> Self {
138 self.retry_config = config;
139 self
140 }
141
142 pub fn with_max_turns(mut self, max: usize) -> Self {
143 self.max_turns = max;
144 self
145 }
146}
147
/*
Both `SubAgentTool.tools` and `AgentContext.tools` use `Vec<Arc<dyn AgentTool>>`,
so tools can be passed directly — no adapter needed. `Arc::clone` on each tool only
increments the reference count (cheap). Every sub-agent invocation therefore shares
the same underlying tool instances held by this `SubAgentTool`, and by anyone else
holding the same `Arc`s (e.g. a parent agent configured with the same tools).
*/
154
155#[async_trait::async_trait]
156impl AgentTool for SubAgentTool {
157 fn name(&self) -> &str {
158 &self.tool_name
159 }
160
161 fn label(&self) -> &str {
162 &self.tool_name
163 }
164
165 fn description(&self) -> &str {
166 &self.tool_description
167 }
168
169 fn parameters_schema(&self) -> serde_json::Value {
170 serde_json::json!({
171 "type": "object",
172 "properties": {
173 "task": {
174 "type": "string",
175 "description": "The task to delegate to this sub-agent"
176 }
177 },
178 "required": ["task"]
179 })
180 }
181
182 async fn execute(
183 &self,
184 params: serde_json::Value, // LLM INPUT — expects `{"task": "..."}` — the natural-language task to delegate
185 ctx: ToolContext, // SYSTEM ENV — cancel token + on_update/on_progress from parent agent loop
186 ) -> Result<ToolResult, ToolError> {
187 let cancel = ctx.cancel; // forwarded to the child agent_loop as its abort signal
188 let on_update = ctx.on_update; // forwarded to child for event fan-out (parent sees child progress)
189 let on_progress = ctx.on_progress;
190 /*
191 RUST QUIRK: Chaining on serde_json::Value — `get()`, `and_then()`, `ok_or_else()`
192
193 `params` is `serde_json::Value` — a dynamically typed JSON value.
194 Extracting nested values requires a chain of Option-returning methods:
195
196 .get("task") → Option<&Value> (None if key absent)
197 .and_then(|v| ...) → flatMap: if Some, apply f and return its Option; if None, stay None
198 .as_str() → Option<&str> (None if value is not a JSON string)
199 .ok_or_else(|| ..) → convert Option → Result: None → Err(ToolError::...)
200 ? → propagate the Err if still None
201 .to_string() → convert &str → owned String
202
203 Python analogy:
204 task = params.get("task")
205 if not isinstance(task, str):
206 raise ToolError("Missing required 'task' parameter")
207
208 `.ok_or_else(|| ...)` uses a closure (lazy): the error is only constructed
209 if we actually need it (i.e., when the Option is None). vs `.ok_or(error)` which
210 eagerly constructs the error even when Ok — wasteful if construction is expensive.
211 */
212 // Extract the task parameter
213 let task = params
214 .get("task") // Option<&Value>
215 .and_then(|v| v.as_str()) // Option<&str> — None if not a string
216 .ok_or_else(|| ToolError::InvalidArgs("Missing required 'task' parameter".into()))?
217 .to_string(); // &str → owned String
218
219 // Clone Arc references — increments reference count, no deep copy.
220 let tools: Vec<Arc<dyn AgentTool>> = self.tools.iter().map(Arc::clone).collect();
221
222 // Generate stable identity for the child loop.
223 // Each sub-agent invocation is its own independent session: fresh agent_id,
224 // session_id, and loop_id. The parent's loop_id is carried as parent_loop_id
225 // so the ancestry chain is traceable via AgentStart events.
226 let child_agent_id = uuid::Uuid::new_v4().to_string();
227 let child_session_id = uuid::Uuid::new_v4().to_string();
228 // ".sub.1" — ".sub" marks this as a sub-agent loop (distinguishes from top-level loops
229 // in the parent session), ".1" is the loop counter (fresh session → always starts at 1).
230 let child_loop_id = format!("{}.sub.1", child_session_id);
231
232 // Fresh context for the sub-agent
233 let mut context = AgentContext {
234 system_prompt: self.system_prompt.clone(),
235 messages: Vec::new(),
236 tools,
237 agent_id: Some(child_agent_id),
238 session_id: Some(child_session_id),
239 loop_id: Some(child_loop_id),
240 parent_loop_id: self.parent_loop_id.clone(), // links child back to parent
241 continuation_kind: None,
242 session: None,
243 user_context: Vec::new(),
244 inrun_context: Vec::new(),
245 };
246
247 // Config for the sub-agent loop
248 let config = AgentLoopConfig {
249 model_config: self.model_config.clone(),
250 provider_override: self.provider_override.clone(),
251 thinking_level: self.thinking_level,
252 max_tokens: self.max_tokens,
253 temperature: None,
254 convert_to_llm: None,
255 transform_context: None,
256 get_steering_messages: None,
257 get_follow_up_messages: None,
258 context_config: None,
259 execution_limits: Some(ExecutionLimits {
260 max_turns: self.max_turns,
261 // Generous token/duration limits — turn limit is the primary guard
262 max_total_tokens: 1_000_000,
263 max_duration: std::time::Duration::from_secs(300),
264 max_cost: None,
265 }),
266 cache_config: self.cache_config.clone(),
267 tool_execution: self.tool_execution.clone(),
268 tool_timeout: None,
269 response_format: crate::provider::ResponseFormat::Text,
270 retry_config: self.retry_config.clone(),
271 before_turn: None,
272 after_turn: None,
273 before_loop: None,
274 after_loop: None,
275 before_tool_execution: None,
276 after_tool_execution: None,
277 before_tool_execution_update: None,
278 after_tool_execution_update: None,
279 before_compaction_start: None,
280 after_compaction_end: None,
281 on_error: None,
282 input_filters: vec![],
283 first_turn_trigger: TurnTrigger::SubAgent,
284 config_id: None,
285 context_translation: None,
286 prun_pending: None,
287 };
288
289 /*
290 RUST QUIRK: `tokio::spawn` — spawning a concurrent async task
291
292 `tokio::spawn(async move { ... })` launches an async task that runs
293 CONCURRENTLY with the current code. It returns a `JoinHandle<T>` —
294 a handle you can `.await` to get the task's return value.
295
296 `async move { ... }` — an async block that OWNS (moves) its captured values.
297 The block is a "future" that gets polled by the tokio runtime.
298
299 Why spawn a separate task for event forwarding?
300 We need to RECEIVE events from `rx` while SIMULTANEOUSLY running `agent_loop()`.
301 If we ran both sequentially, agent_loop() would block waiting for someone to drain rx
302 (an unbounded channel will buffer, but we want real-time forwarding).
303 By spawning a task, the event forwarding runs in parallel with the agent loop.
304
305 Python analogy:
306 asyncio.create_task(forward_events(rx, on_update, on_progress))
307 */
308 // Channel for sub-agent events
309 let (tx, mut rx) = mpsc::unbounded_channel();
310
311 // Forward sub-agent events to parent via on_update and on_progress callbacks
312 let forward_handle = if on_update.is_some() || on_progress.is_some() {
313 let tool_name = self.tool_name.clone();
314 Some(tokio::spawn(async move {
315 // `while let Some(event) = rx.recv().await` — receive events until channel closes.
316 // `rx.recv()` returns None when all senders (tx) are dropped.
317 // When agent_loop() returns, it drops tx, which closes the channel, which breaks this loop.
318 while let Some(event) = rx.recv().await {
319 // Forward progress messages via on_progress
320 if let AgentEvent::ProgressMessage { text, .. } = &event {
321 if let Some(ref cb) = on_progress {
322 cb(text.clone());
323 }
324 }
325
326 // Convert interesting events to ToolResult updates for the parent
327 if let Some(ref on_update) = on_update {
328 let update_text = match &event {
329 AgentEvent::MessageUpdate {
330 delta: StreamDelta::Text { delta },
331 ..
332 } => Some(delta.clone()),
333 AgentEvent::ToolExecutionStart { tool_name, .. } => {
334 Some(format!("[sub-agent calling tool: {}]", tool_name))
335 }
336 _ => None,
337 };
338
339 if let Some(text) = update_text {
340 on_update(ToolResult {
341 content: vec![Content::Text { text }],
342 details: serde_json::json!({ "sub_agent": tool_name }),
343 child_loop_id: None,
344 });
345 }
346 }
347 }
348 }))
349 } else {
350 None
351 };
352
353 // Run the sub-agent loop. We capture context.loop_id after the call to surface it
354 // in ToolExecutionEnd.child_loop_id. The loop_id is already Some (we set it above);
355 // agent_loop only writes it when None, so our value is preserved.
356 let prompt = AgentMessage::Llm(LlmMessage::new(Message::user(task)));
357 let new_messages = agent_loop(vec![prompt], &mut context, &config, tx, cancel).await;
358 let returned_child_loop_id = context.loop_id.clone();
359
360 /*
361 RUST QUIRK: `let _ = handle.await` — explicitly discarding a Result
362
363 `handle.await` returns `Result<(), JoinError>` — it can fail if the task panicked.
364 `let _ = ...` explicitly ignores the result. This is idiomatic for "I don't care
365 about this result" and suppresses the "unused Result" compiler warning.
366
367 Why not just `handle.await.ok()`? Both work; `let _ =` is slightly more explicit
368 about intentional discard. `handle.await?` would propagate the JoinError, but
369 we're in execute() which returns ToolError, not JoinError — type mismatch.
370 */
371 // Wait for event forwarding to complete
372 if let Some(handle) = forward_handle {
373 let _ = handle.await; // wait for the spawned task to finish (ignoring panic errors)
374 }
375
376 // Extract final assistant text from the returned messages
377 let result_text = extract_final_text(&new_messages);
378
379 // Include full sub-agent conversation in details for debugging
380 let details = serde_json::json!({
381 "sub_agent": self.tool_name,
382 "turns": new_messages.len(),
383 });
384
385 Ok(ToolResult {
386 content: vec![Content::Text { text: result_text }],
387 details,
388 child_loop_id: returned_child_loop_id,
389 })
390 }
391}
392
393/// Extract the final assistant text from agent messages.
394/// Collects text from the last assistant message, or returns a fallback.
395fn extract_final_text(messages: &[AgentMessage]) -> String {
396 for msg in messages.iter().rev() {
397 if let AgentMessage::Llm(LlmMessage {
398 message: Message::Assistant { content, .. },
399 ..
400 }) = msg
401 {
402 let texts: Vec<&str> = content
403 .iter()
404 .filter_map(|c| match c {
405 Content::Text { text } => Some(text.as_str()),
406 _ => None,
407 })
408 .collect();
409 if !texts.is_empty() {
410 return texts.join("\n");
411 }
412 }
413 }
414 "(sub-agent produced no text output)".to_string()
415}