Skip to main content

phi_core/agents/
sub_agent.rs

//! Sub-agent tool — delegates tasks to a child agent loop.
//!
//! The `SubAgentTool` implements `AgentTool` and internally runs `agent_loop()`
//! with its own system prompt, tools, and provider. The parent LLM invokes it
//! like any other tool, passing a natural-language `task` string.
//!
//! # Design
//!
//! - **Context isolation**: each invocation starts a fresh conversation with new
//!   agent, session, and loop ids
//! - **Depth limiting**: by convention, sub-agents are not given other `SubAgentTool`s.
//!   This is not enforced at runtime (there is no depth counter); whoever configures
//!   the sub-agent's tools is responsible for it
//! - **Cancellation propagation**: the parent's cancel token is forwarded
//! - **Event forwarding**: sub-agent events stream to the parent via `on_update`
//!   (text deltas and tool-call notices) and `on_progress` (progress messages)
//!
//! # Example
//!
//! ```rust,no_run
//! use phi_core::agents::SubAgentTool;
//! use phi_core::provider::ModelConfig;
//!
//! let researcher = SubAgentTool::new(
//!     "researcher",
//!     ModelConfig::anthropic("claude-sonnet-4-20250514", "Claude Sonnet 4", "sk-..."),
//! )
//! .with_description("Searches codebases and documents")
//! .with_system_prompt("You are a research assistant.");
//! ```
27
28use crate::agent_loop::{agent_loop, AgentLoopConfig};
29use crate::context::ExecutionLimits;
30use crate::provider::{ModelConfig, StreamProvider};
31use crate::types::*;
32use std::sync::Arc;
33use tokio::sync::mpsc;
34
35/// Default max turns for sub-agents (prevents runaway execution).
36const DEFAULT_MAX_TURNS: usize = 10;
37
38/// A tool that delegates work to a child agent loop.
39///
40/// When the parent LLM calls this tool, it spawns a fresh `agent_loop()` with
41/// its own system prompt, tools, and provider. The sub-agent runs to completion
42/// and its final text output is returned as the tool result.
43pub struct SubAgentTool {
44    tool_name: String,
45    tool_description: String,
46    system_prompt: String,
47    model_config: ModelConfig,
48    provider_override: Option<Arc<dyn StreamProvider>>,
49    tools: Vec<Arc<dyn AgentTool>>,
50    thinking_level: ThinkingLevel,
51    max_tokens: Option<u32>,
52    cache_config: CacheConfig,
53    tool_execution: ToolExecutionStrategy,
54    retry_config: crate::provider::retry::RetryConfig,
55    max_turns: usize,
56    /// The `loop_id` of the parent agent loop that spawned this sub-agent.
57    /// Passed into the child context as `parent_loop_id` so that the full
58    /// parent → child ancestry chain is traceable via `AgentStart` events.
59    parent_loop_id: Option<String>,
60}
61
62impl SubAgentTool {
63    /// Create a new sub-agent tool with a name and model config.
64    pub fn new(name: impl Into<String>, model_config: ModelConfig) -> Self {
65        let name = name.into();
66        Self {
67            tool_description: format!("Delegate a task to the '{}' sub-agent", name),
68            tool_name: name,
69            system_prompt: String::new(),
70            model_config,
71            provider_override: None,
72            tools: Vec::new(),
73            thinking_level: ThinkingLevel::Off,
74            max_tokens: None,
75            cache_config: CacheConfig::default(),
76            tool_execution: ToolExecutionStrategy::default(),
77            retry_config: crate::provider::retry::RetryConfig::default(),
78            max_turns: DEFAULT_MAX_TURNS,
79            parent_loop_id: None,
80        }
81    }
82
83    /// Set the parent loop's `loop_id` for child → parent ancestry tracking.
84    ///
85    /// When set, this value is placed in the child `AgentContext.parent_loop_id`,
86    /// which is then emitted in the child's `AgentStart` event. This creates a
87    /// bidirectional link: the parent sees the child's `loop_id` via
88    /// `ToolExecutionEnd.child_loop_id`, and the child records the parent via
89    /// `AgentStart.parent_loop_id`.
90    pub fn with_parent_loop_id(mut self, id: impl Into<String>) -> Self {
91        self.parent_loop_id = Some(id.into());
92        self
93    }
94
95    /// Override the provider used by this sub-agent, bypassing `ProviderRegistry` dispatch.
96    /// Primarily used in tests to inject a `MockProvider`.
97    pub fn with_provider_override(mut self, provider: Arc<dyn StreamProvider>) -> Self {
98        self.provider_override = Some(provider);
99        self
100    }
101
102    pub fn with_description(mut self, desc: impl Into<String>) -> Self {
103        self.tool_description = desc.into();
104        self
105    }
106
107    pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
108        self.system_prompt = prompt.into();
109        self
110    }
111
112    pub fn with_tools(mut self, tools: Vec<Arc<dyn AgentTool>>) -> Self {
113        self.tools = tools;
114        self
115    }
116
117    pub fn with_thinking(mut self, level: ThinkingLevel) -> Self {
118        self.thinking_level = level;
119        self
120    }
121
122    pub fn with_max_tokens(mut self, max: u32) -> Self {
123        self.max_tokens = Some(max);
124        self
125    }
126
127    pub fn with_cache_config(mut self, config: CacheConfig) -> Self {
128        self.cache_config = config;
129        self
130    }
131
132    pub fn with_tool_execution(mut self, strategy: ToolExecutionStrategy) -> Self {
133        self.tool_execution = strategy;
134        self
135    }
136
137    pub fn with_retry_config(mut self, config: crate::provider::retry::RetryConfig) -> Self {
138        self.retry_config = config;
139        self
140    }
141
142    pub fn with_max_turns(mut self, max: usize) -> Self {
143        self.max_turns = max;
144        self
145    }
146}
147
/*
Both `SubAgentTool.tools` and `AgentContext.tools` are `Vec<Arc<dyn AgentTool>>`,
so the configured tools can be handed to the child context directly — no adapter
needed. Cloning that vector only clones the `Arc`s (one reference-count increment
per tool, no deep copy), so every child context shares the very tool instances that
were supplied via `SubAgentTool::with_tools` (which the parent may also hold).
*/
154
155#[async_trait::async_trait]
156impl AgentTool for SubAgentTool {
157    fn name(&self) -> &str {
158        &self.tool_name
159    }
160
161    fn label(&self) -> &str {
162        &self.tool_name
163    }
164
165    fn description(&self) -> &str {
166        &self.tool_description
167    }
168
169    fn parameters_schema(&self) -> serde_json::Value {
170        serde_json::json!({
171            "type": "object",
172            "properties": {
173                "task": {
174                    "type": "string",
175                    "description": "The task to delegate to this sub-agent"
176                }
177            },
178            "required": ["task"]
179        })
180    }
181
182    async fn execute(
183        &self,
184        params: serde_json::Value, // LLM INPUT — expects `{"task": "..."}` — the natural-language task to delegate
185        ctx: ToolContext, // SYSTEM ENV — cancel token + on_update/on_progress from parent agent loop
186    ) -> Result<ToolResult, ToolError> {
187        let cancel = ctx.cancel; // forwarded to the child agent_loop as its abort signal
188        let on_update = ctx.on_update; // forwarded to child for event fan-out (parent sees child progress)
189        let on_progress = ctx.on_progress;
190        /*
191        RUST QUIRK: Chaining on serde_json::Value — `get()`, `and_then()`, `ok_or_else()`
192
193        `params` is `serde_json::Value` — a dynamically typed JSON value.
194        Extracting nested values requires a chain of Option-returning methods:
195
196          .get("task")       → Option<&Value> (None if key absent)
197          .and_then(|v| ...) → flatMap: if Some, apply f and return its Option; if None, stay None
198          .as_str()          → Option<&str> (None if value is not a JSON string)
199          .ok_or_else(|| ..) → convert Option → Result: None → Err(ToolError::...)
200          ?                  → propagate the Err if still None
201          .to_string()       → convert &str → owned String
202
203        Python analogy:
204          task = params.get("task")
205          if not isinstance(task, str):
206              raise ToolError("Missing required 'task' parameter")
207
208        `.ok_or_else(|| ...)` uses a closure (lazy): the error is only constructed
209        if we actually need it (i.e., when the Option is None). vs `.ok_or(error)` which
210        eagerly constructs the error even when Ok — wasteful if construction is expensive.
211        */
212        // Extract the task parameter
213        let task = params
214            .get("task") // Option<&Value>
215            .and_then(|v| v.as_str()) // Option<&str> — None if not a string
216            .ok_or_else(|| ToolError::InvalidArgs("Missing required 'task' parameter".into()))?
217            .to_string(); // &str → owned String
218
219        // Clone Arc references — increments reference count, no deep copy.
220        let tools: Vec<Arc<dyn AgentTool>> = self.tools.iter().map(Arc::clone).collect();
221
222        // Generate stable identity for the child loop.
223        // Each sub-agent invocation is its own independent session: fresh agent_id,
224        // session_id, and loop_id. The parent's loop_id is carried as parent_loop_id
225        // so the ancestry chain is traceable via AgentStart events.
226        let child_agent_id = uuid::Uuid::new_v4().to_string();
227        let child_session_id = uuid::Uuid::new_v4().to_string();
228        // ".sub.1" — ".sub" marks this as a sub-agent loop (distinguishes from top-level loops
229        // in the parent session), ".1" is the loop counter (fresh session → always starts at 1).
230        let child_loop_id = format!("{}.sub.1", child_session_id);
231
232        // Fresh context for the sub-agent
233        let mut context = AgentContext {
234            system_prompt: self.system_prompt.clone(),
235            messages: Vec::new(),
236            tools,
237            agent_id: Some(child_agent_id),
238            session_id: Some(child_session_id),
239            loop_id: Some(child_loop_id),
240            parent_loop_id: self.parent_loop_id.clone(), // links child back to parent
241            continuation_kind: None,
242            session: None,
243            user_context: Vec::new(),
244            inrun_context: Vec::new(),
245        };
246
247        // Config for the sub-agent loop
248        let config = AgentLoopConfig {
249            model_config: self.model_config.clone(),
250            provider_override: self.provider_override.clone(),
251            thinking_level: self.thinking_level,
252            max_tokens: self.max_tokens,
253            temperature: None,
254            convert_to_llm: None,
255            transform_context: None,
256            get_steering_messages: None,
257            get_follow_up_messages: None,
258            context_config: None,
259            execution_limits: Some(ExecutionLimits {
260                max_turns: self.max_turns,
261                // Generous token/duration limits — turn limit is the primary guard
262                max_total_tokens: 1_000_000,
263                max_duration: std::time::Duration::from_secs(300),
264                max_cost: None,
265            }),
266            cache_config: self.cache_config.clone(),
267            tool_execution: self.tool_execution.clone(),
268            tool_timeout: None,
269            response_format: crate::provider::ResponseFormat::Text,
270            retry_config: self.retry_config.clone(),
271            before_turn: None,
272            after_turn: None,
273            before_loop: None,
274            after_loop: None,
275            before_tool_execution: None,
276            after_tool_execution: None,
277            before_tool_execution_update: None,
278            after_tool_execution_update: None,
279            before_compaction_start: None,
280            after_compaction_end: None,
281            on_error: None,
282            input_filters: vec![],
283            first_turn_trigger: TurnTrigger::SubAgent,
284            config_id: None,
285            context_translation: None,
286            prun_pending: None,
287        };
288
289        /*
290        RUST QUIRK: `tokio::spawn` — spawning a concurrent async task
291
292        `tokio::spawn(async move { ... })` launches an async task that runs
293        CONCURRENTLY with the current code. It returns a `JoinHandle<T>` —
294        a handle you can `.await` to get the task's return value.
295
296        `async move { ... }` — an async block that OWNS (moves) its captured values.
297        The block is a "future" that gets polled by the tokio runtime.
298
299        Why spawn a separate task for event forwarding?
300        We need to RECEIVE events from `rx` while SIMULTANEOUSLY running `agent_loop()`.
301        If we ran both sequentially, agent_loop() would block waiting for someone to drain rx
302        (an unbounded channel will buffer, but we want real-time forwarding).
303        By spawning a task, the event forwarding runs in parallel with the agent loop.
304
305        Python analogy:
306          asyncio.create_task(forward_events(rx, on_update, on_progress))
307        */
308        // Channel for sub-agent events
309        let (tx, mut rx) = mpsc::unbounded_channel();
310
311        // Forward sub-agent events to parent via on_update and on_progress callbacks
312        let forward_handle = if on_update.is_some() || on_progress.is_some() {
313            let tool_name = self.tool_name.clone();
314            Some(tokio::spawn(async move {
315                // `while let Some(event) = rx.recv().await` — receive events until channel closes.
316                // `rx.recv()` returns None when all senders (tx) are dropped.
317                // When agent_loop() returns, it drops tx, which closes the channel, which breaks this loop.
318                while let Some(event) = rx.recv().await {
319                    // Forward progress messages via on_progress
320                    if let AgentEvent::ProgressMessage { text, .. } = &event {
321                        if let Some(ref cb) = on_progress {
322                            cb(text.clone());
323                        }
324                    }
325
326                    // Convert interesting events to ToolResult updates for the parent
327                    if let Some(ref on_update) = on_update {
328                        let update_text = match &event {
329                            AgentEvent::MessageUpdate {
330                                delta: StreamDelta::Text { delta },
331                                ..
332                            } => Some(delta.clone()),
333                            AgentEvent::ToolExecutionStart { tool_name, .. } => {
334                                Some(format!("[sub-agent calling tool: {}]", tool_name))
335                            }
336                            _ => None,
337                        };
338
339                        if let Some(text) = update_text {
340                            on_update(ToolResult {
341                                content: vec![Content::Text { text }],
342                                details: serde_json::json!({ "sub_agent": tool_name }),
343                                child_loop_id: None,
344                            });
345                        }
346                    }
347                }
348            }))
349        } else {
350            None
351        };
352
353        // Run the sub-agent loop. We capture context.loop_id after the call to surface it
354        // in ToolExecutionEnd.child_loop_id. The loop_id is already Some (we set it above);
355        // agent_loop only writes it when None, so our value is preserved.
356        let prompt = AgentMessage::Llm(LlmMessage::new(Message::user(task)));
357        let new_messages = agent_loop(vec![prompt], &mut context, &config, tx, cancel).await;
358        let returned_child_loop_id = context.loop_id.clone();
359
360        /*
361        RUST QUIRK: `let _ = handle.await` — explicitly discarding a Result
362
363        `handle.await` returns `Result<(), JoinError>` — it can fail if the task panicked.
364        `let _ = ...` explicitly ignores the result. This is idiomatic for "I don't care
365        about this result" and suppresses the "unused Result" compiler warning.
366
367        Why not just `handle.await.ok()`? Both work; `let _ =` is slightly more explicit
368        about intentional discard. `handle.await?` would propagate the JoinError, but
369        we're in execute() which returns ToolError, not JoinError — type mismatch.
370        */
371        // Wait for event forwarding to complete
372        if let Some(handle) = forward_handle {
373            let _ = handle.await; // wait for the spawned task to finish (ignoring panic errors)
374        }
375
376        // Extract final assistant text from the returned messages
377        let result_text = extract_final_text(&new_messages);
378
379        // Include full sub-agent conversation in details for debugging
380        let details = serde_json::json!({
381            "sub_agent": self.tool_name,
382            "turns": new_messages.len(),
383        });
384
385        Ok(ToolResult {
386            content: vec![Content::Text { text: result_text }],
387            details,
388            child_loop_id: returned_child_loop_id,
389        })
390    }
391}
392
393/// Extract the final assistant text from agent messages.
394/// Collects text from the last assistant message, or returns a fallback.
395fn extract_final_text(messages: &[AgentMessage]) -> String {
396    for msg in messages.iter().rev() {
397        if let AgentMessage::Llm(LlmMessage {
398            message: Message::Assistant { content, .. },
399            ..
400        }) = msg
401        {
402            let texts: Vec<&str> = content
403                .iter()
404                .filter_map(|c| match c {
405                    Content::Text { text } => Some(text.as_str()),
406                    _ => None,
407                })
408                .collect();
409            if !texts.is_empty() {
410                return texts.join("\n");
411            }
412        }
413    }
414    "(sub-agent produced no text output)".to_string()
415}