Skip to main content

oxi_agent/tools/
subagent.rs

//! Subagent tool — delegates tasks to specialized agents.
//!
//! By default each invocation spawns a separate `oxi --mode json` process,
//! giving the subagent an isolated context window. When a `SubagentRunner`
//! is wired into the `ToolContext`, the task is delegated to that runner
//! in-process instead of spawning a subprocess.
//!
//! Supports three modes:
//!   - Single: `{ agent: "name", task: "..." }`
//!   - Parallel: `{ tasks: [{ agent, task }, ...] }`
//!   - Chain: `{ chain: [{ agent, task: "... {previous} ..." }, ...] }`
//!
//! Agent definitions are markdown files with YAML frontmatter,
//! discovered from `~/.oxi/agents/` (user) and `.oxi/agents/` (project).
//! Discovery is delegated to [`crate::agent_definition::AgentDiscovery`].
14use super::{AgentTool, AgentToolResult, ProgressCallback, ToolContext, ToolError};
15use crate::agent_definition::{
16    AgentDefinition, AgentDiscovery, AgentScope, current_subagent_depth, max_subagent_depth,
17};
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20use serde_json::{Value, json};
21use std::path::{Path, PathBuf};
22use std::sync::Arc;
23use tokio::io::{AsyncBufReadExt, BufReader};
24use tokio::sync::oneshot;
25
26// ── Constants ──────────────────────────────────────────────────────────
27
28const MAX_PARALLEL_TASKS: usize = 8;
29const MAX_CONCURRENCY: usize = 4;
30
31// ── Progress callback type (reuse from tools.rs) ──────────────────────
32
33type ProgressFn = ProgressCallback;
34
35// ── Temp dir helper (no RAII — let OS clean up after subprocess exits) ──
36
37fn create_system_prompt_temp_dir(prefix: &str) -> Result<PathBuf, String> {
38    let path = std::env::temp_dir().join(format!("{}-{}", prefix, uuid::Uuid::new_v4()));
39    std::fs::create_dir_all(&path).map_err(|e| format!("Failed to create temp dir: {}", e))?;
40    Ok(path)
41}
42
43// ── Agent Discovery (delegates to SDK) ─────────────────────────────────
44
45/// Discover agents by delegating to [`AgentDiscovery`].
46pub fn discover_agents(cwd: &Path, scope: AgentScope) -> Vec<AgentDefinition> {
47    AgentDiscovery::discover(cwd, scope)
48        .unwrap_or_default()
49        .into_iter()
50        .map(|(_, def)| def)
51        .collect()
52}
53
54// ── Result Types ───────────────────────────────────────────────────────
55
/// Usage statistics from a subagent run.
///
/// `Default` yields all-zero counters. On the subprocess path the token
/// counters and `turns` are accumulated from the `usage` events the child
/// emits; on the in-process path they are copied from the runner's result.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UsageStats {
    /// Input tokens consumed.
    pub input_tokens: u64,
    /// Output tokens consumed.
    pub output_tokens: u64,
    /// Cache read tokens (reserved for future use; left at the default in the code in view).
    pub cache_read: u64,
    /// Cache write tokens (reserved for future use; left at the default in the code in view).
    pub cache_write: u64,
    /// Cost in USD (reserved for future use; left at the default in the code in view).
    pub cost: f64,
    /// Number of agent turns (one per `usage` event on the subprocess path).
    pub turns: u32,
}
72
/// Result from a single subagent execution.
///
/// Produced both by the subprocess path and by the in-process runner path,
/// so that output formatting can treat them uniformly.
#[derive(Debug, Clone)]
pub struct SingleResult {
    /// Agent name as requested by the caller.
    pub agent: String,
    /// Where the agent came from: the definition's `source` for subprocess runs,
    /// `"in-process"` for runner-backed runs, or `"unknown"` when the agent
    /// could not be resolved or the task never completed.
    pub agent_source: String,
    /// Task that was executed.
    pub task: String,
    /// Process exit code. `1` is also used for failures that never produced a
    /// real exit status (unknown agent, spawn failure, in-process error).
    pub exit_code: i32,
    /// Agent output text: the concatenated `text_delta` events of a subprocess
    /// run, or the text returned by the in-process runner.
    pub output: String,
    /// Captured stderr text, or a synthesized description of a local failure.
    pub stderr: String,
    /// Token usage.
    pub usage: UsageStats,
    /// Model used by the agent.
    pub model: Option<String>,
    /// Stop reason (`"complete"`, `"error"` or `"aborted"` in the code in view).
    pub stop_reason: Option<String>,
    /// Error message if the agent failed.
    pub error_message: Option<String>,
    /// Step index in chain mode.
    pub step: Option<usize>,
}
99
/// Execution mode used for a subagent invocation.
///
/// Serialized in `snake_case` (`"single"`, `"parallel"`, `"chain"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SubagentMode {
    /// Single agent, single task.
    Single,
    /// Multiple agents/tasks running concurrently.
    Parallel,
    /// Sequential agents, passing {previous} output forward.
    Chain,
}
111
/// Detailed results from a subagent invocation.
///
/// Pairs the mode that was selected with one [`SingleResult`] per executed
/// agent.
#[derive(Debug, Clone)]
pub struct SubagentDetails {
    /// Which mode was used.
    pub mode: SubagentMode,
    /// Per-agent results.
    pub results: Vec<SingleResult>,
}
120
121// ── JSON line processing ───────────────────────────────────────────────
122
123fn process_json_line(
124    line: &str,
125    result: &mut SingleResult,
126    text: &mut String,
127    _on_progress: &Option<ProgressFn>,
128) {
129    let event: Value = match serde_json::from_str(line) {
130        Ok(v) => v,
131        Err(_) => return,
132    };
133    match event["type"].as_str().unwrap_or("") {
134        "text_delta" => {
135            if let Some(t) = event["text"].as_str() {
136                text.push_str(t);
137            }
138        }
139        "usage" => {
140            result.usage.input_tokens += event["input_tokens"].as_u64().unwrap_or(0);
141            result.usage.output_tokens += event["output_tokens"].as_u64().unwrap_or(0);
142            result.usage.turns += 1;
143        }
144        "complete" => {
145            result.stop_reason = Some("complete".to_string());
146        }
147        "error" => {
148            result.error_message = Some(
149                event["message"]
150                    .as_str()
151                    .unwrap_or("Unknown error")
152                    .to_string(),
153            );
154            result.stop_reason = Some("error".to_string());
155        }
156        _ => {}
157    }
158}
159
160// ── Process Execution ──────────────────────────────────────────────────
161
162/// Build command-line arguments for launching a subagent process.
163fn build_agent_args(agent: &AgentDefinition, tmp_dir: &Path, task: &str) -> Vec<String> {
164    let mut args = vec!["--mode".to_string(), "json".to_string(), "-p".to_string()];
165
166    if let Some(ref model) = agent.model {
167        args.push("--model".to_string());
168        args.push(model.clone());
169    }
170
171    if !agent.tools.is_empty() {
172        args.push("--tools".to_string());
173        args.push(agent.tools.join(","));
174    }
175
176    if let Some(ref prompt) = agent.system_prompt
177        && !prompt.is_empty()
178        && std::fs::write(tmp_dir.join("system_prompt.md"), prompt).is_ok()
179    {
180        args.push("--append-system-prompt".to_string());
181        args.push(
182            tmp_dir
183                .join("system_prompt.md")
184                .to_str()
185                .unwrap_or_default()
186                .to_string(),
187        );
188    }
189
190    args.push(format!("Task: {}", task));
191    args
192}
193
/// Gracefully terminate a child process (SIGTERM → wait → SIGKILL).
///
/// On Unix the child is sent SIGTERM and given up to 5 seconds to exit; if
/// it has not exited by then a kill is started via `start_kill`. On other
/// platforms the kill is started immediately and the child is then awaited
/// for up to 5 seconds.
///
/// Afterwards the stderr reader task is awaited for at most 1 second and,
/// if it finished in time, its text is stored in `result.stderr`.
///
/// The child's exit status is not recorded here; every error from the kill
/// and wait calls is deliberately ignored (best-effort shutdown).
async fn terminate_child(
    child: &mut tokio::process::Child,
    stderr_handle: tokio::task::JoinHandle<String>,
    result: &mut SingleResult,
) {
    #[cfg(unix)]
    {
        // `child.id()` is `None` once the child has already been reaped, in
        // which case there is nothing to signal.
        if let Some(pid) = child.id() {
            // SAFETY: libc::kill sends SIGTERM to the child process. PID comes from
            // child.id() which is a valid running process. Used for graceful shutdown
            // before force-killing. Race (process exited) returns ESRCH harmlessly.
            unsafe {
                libc::kill(pid as i32, libc::SIGTERM);
            }
        }
        // Grace period: whichever comes first, the child exiting or the deadline.
        let deadline = tokio::time::sleep(std::time::Duration::from_secs(5));
        tokio::pin!(deadline);
        tokio::select! {
            // Deadline hit: escalate to a forced kill. The child is not awaited
            // again in this branch.
            _ = &mut deadline => { let _ = child.start_kill(); }
            _ = child.wait() => {}
        }
    }
    #[cfg(not(unix))]
    {
        // No SIGTERM step on this platform: kill right away, then give the
        // process a bounded amount of time to go away.
        let _ = child.start_kill();
        let _ = tokio::time::timeout(std::time::Duration::from_secs(5), child.wait()).await;
    }

    // Collect stderr with short timeout
    let _ = tokio::time::timeout(std::time::Duration::from_secs(1), async {
        if let Ok(err) = stderr_handle.await {
            result.stderr = err;
        }
    })
    .await;
}
231
232/// Run a single agent process with abort support.
233#[allow(clippy::too_many_arguments)]
234async fn run_single_agent(
235    cwd: &Path,
236    agents: &[AgentDefinition],
237    agent_name: &str,
238    task: &str,
239    agent_cwd: Option<&str>,
240    step: Option<usize>,
241    signal: Option<oneshot::Receiver<()>>,
242    on_progress: Option<ProgressFn>,
243    binary_path: &Path,
244) -> SingleResult {
245    let agent = match agents.iter().find(|a| a.name == agent_name) {
246        Some(a) => a,
247        None => {
248            let available = agents
249                .iter()
250                .map(|a| format!("\"{}\"", a.name))
251                .collect::<Vec<_>>()
252                .join(", ");
253            return SingleResult {
254                agent: agent_name.to_string(),
255                agent_source: "unknown".to_string(),
256                task: task.to_string(),
257                exit_code: 1,
258                output: String::new(),
259                stderr: format!(
260                    "Unknown agent: \"{}\". Available: {}",
261                    agent_name, available
262                ),
263                usage: UsageStats::default(),
264                model: None,
265                stop_reason: None,
266                error_message: Some(format!("Unknown agent: {}", agent_name)),
267                step,
268            };
269        }
270    };
271
272    let mut result = SingleResult {
273        agent: agent_name.to_string(),
274        agent_source: agent.source.clone(),
275        task: task.to_string(),
276        exit_code: 0,
277        output: String::new(),
278        stderr: String::new(),
279        usage: UsageStats::default(),
280        model: agent.model.clone(),
281        stop_reason: None,
282        error_message: None,
283        step,
284    };
285
286    // Notify progress
287    if let Some(ref cb) = on_progress {
288        cb(format!("[{}] running...", agent_name));
289    }
290
291    // Build command args
292    let tmp_dir = match create_system_prompt_temp_dir("oxi-subagent") {
293        Ok(tmp) => Some(tmp),
294        Err(e) => {
295            result.exit_code = 1;
296            result.stderr = e.clone();
297            result.error_message = Some(e);
298            return result;
299        }
300    };
301
302    let args = match tmp_dir {
303        Some(ref tmp) => build_agent_args(agent, tmp, task),
304        None => vec![
305            "--mode".to_string(),
306            "json".to_string(),
307            "-p".to_string(),
308            format!("Task: {}", task),
309        ],
310    };
311
312    let working_dir = agent_cwd
313        .map(PathBuf::from)
314        .unwrap_or_else(|| cwd.to_path_buf());
315
316    let mut cmd = tokio::process::Command::new(binary_path);
317    cmd.args(&args)
318        .current_dir(&working_dir)
319        .stdout(std::process::Stdio::piped())
320        .stderr(std::process::Stdio::piped())
321        .stdin(std::process::Stdio::null())
322        // Depth tracking for subagent nesting limits
323        .env(
324            "OXI_SUBAGENT_DEPTH",
325            (current_subagent_depth() + 1).to_string(),
326        )
327        .env(
328            "OXI_MAX_SUBAGENT_DEPTH",
329            agent.max_subagent_depth.to_string(),
330        );
331
332    let mut child = match cmd.spawn() {
333        Ok(c) => c,
334        Err(e) => {
335            result.exit_code = 1;
336            result.stderr = format!("Failed to spawn: {}", e);
337            result.error_message = Some(format!("Failed to spawn: {}", e));
338            return result;
339        }
340    };
341
342    let stdout = child.stdout.take().expect("stdout piped but missing");
343    let stderr = child.stderr.take().expect("stderr piped but missing");
344
345    // Spawn stdout reader → channel
346    let (line_tx, mut line_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
347    let _reader_handle = tokio::spawn(async move {
348        let reader = BufReader::new(stdout);
349        let mut lines = reader.lines();
350        while let Ok(Some(line)) = lines.next_line().await {
351            if line_tx.send(line).is_err() {
352                break;
353            }
354        }
355    });
356
357    // Spawn stderr reader
358    let stderr_handle = tokio::spawn(async move {
359        let mut err = String::new();
360        let reader = BufReader::new(stderr);
361        let mut lines = reader.lines();
362        while let Ok(Some(line)) = lines.next_line().await {
363            err.push_str(&line);
364            err.push('\n');
365        }
366        err
367    });
368
369    // Main loop: select between stdout lines and abort signal
370    let mut final_text = String::new();
371    let mut signal_rx = signal;
372    let mut aborted = false;
373
374    loop {
375        tokio::select! {
376            line = line_rx.recv() => {
377                match line {
378                    Some(line) => {
379                        process_json_line(&line, &mut result, &mut final_text, &on_progress);
380                    }
381                    None => break, // stdout EOF
382                }
383            }
384            _ = async {
385                match &mut signal_rx {
386                    Some(rx) => { let _ = rx.await; }
387                    None => std::future::pending::<()>().await,
388                }
389            } => {
390                aborted = true;
391                break;
392            }
393        }
394    }
395
396    if aborted {
397        result.stop_reason = Some("aborted".into());
398        result.error_message = Some("Aborted by user".into());
399        terminate_child(&mut child, stderr_handle, &mut result).await;
400    } else {
401        // Normal completion
402        if let Ok(err_output) = stderr_handle.await {
403            result.stderr = err_output;
404        }
405        match child.wait().await {
406            Ok(status) => result.exit_code = status.code().unwrap_or(1),
407            Err(_) => result.exit_code = 1,
408        }
409    }
410
411    result.output = final_text;
412
413    if let Some(ref cb) = on_progress {
414        let status = if result.exit_code == 0 {
415            "done"
416        } else {
417            "failed"
418        };
419        cb(format!("[{}] {}", agent_name, status));
420    }
421
422    result
423}
424
425/// Run multiple tasks with concurrency limit.
426async fn run_parallel(
427    cwd: &Path,
428    agents: &[AgentDefinition],
429    tasks: Vec<ParallelTask>,
430    binary_path: PathBuf,
431    on_progress: Option<ProgressFn>,
432) -> Vec<SingleResult> {
433    let n = tasks.len();
434    if n == 0 {
435        return vec![];
436    }
437
438    let limit = MAX_CONCURRENCY.min(n);
439    let indexed_tasks: Vec<(usize, ParallelTask)> = tasks.into_iter().enumerate().collect();
440    let mut all_results: Vec<Option<SingleResult>> = vec![None; n];
441
442    let mut i = 0;
443    while i < indexed_tasks.len() {
444        let end = (i + limit).min(indexed_tasks.len());
445        let chunk: Vec<_> = indexed_tasks[i..end].to_vec();
446        let mut handles = Vec::new();
447
448        for (idx, task) in chunk {
449            let agents = agents.to_vec();
450            let cwd = cwd.to_path_buf();
451            let bp = binary_path.clone();
452            let progress = on_progress.clone();
453
454            handles.push((
455                idx,
456                tokio::spawn(async move {
457                    run_single_agent(
458                        &cwd,
459                        &agents,
460                        &task.agent,
461                        &task.task,
462                        task.cwd.as_deref(),
463                        None,
464                        None,
465                        progress,
466                        &bp,
467                    )
468                    .await
469                }),
470            ));
471        }
472
473        for (idx, handle) in handles {
474            if let Ok(r) = handle.await {
475                all_results[idx] = Some(r);
476            }
477        }
478
479        i = end;
480    }
481
482    all_results
483        .into_iter()
484        .map(|r| {
485            r.unwrap_or_else(|| SingleResult {
486                agent: "unknown".to_string(),
487                agent_source: "unknown".to_string(),
488                task: "unknown".to_string(),
489                exit_code: 1,
490                output: String::new(),
491                stderr: "Task did not complete".to_string(),
492                usage: UsageStats::default(),
493                model: None,
494                stop_reason: Some("error".to_string()),
495                error_message: Some("Task did not complete".to_string()),
496                step: None,
497            })
498        })
499        .collect()
500}
501
502// ── Parameter Types ────────────────────────────────────────────────────
503
/// One entry of the `tasks` array (parallel mode), as deserialized from the
/// tool parameters.
#[derive(Debug, Deserialize, Clone)]
struct ParallelTask {
    /// Name of the agent to run.
    agent: String,
    /// Task text handed to the agent.
    task: String,
    /// Optional working directory for this task; absent means "not set".
    #[serde(default)]
    cwd: Option<String>,
}
511
/// One entry of the `chain` array (chain mode), as deserialized from the
/// tool parameters.
#[derive(Debug, Deserialize)]
struct ChainStep {
    /// Name of the agent to run for this step.
    agent: String,
    /// Task text for this step; `{previous}` is replaced with the prior
    /// step's output before the step runs.
    task: String,
    /// Optional working directory for this step; absent means "not set".
    #[serde(default)]
    cwd: Option<String>,
}
519
520// ── Tool Implementation ────────────────────────────────────────────────
521
/// Subagent tool for delegating tasks to specialized agents.
pub struct SubagentTool {
    /// Explicit working directory override. If None, uses ToolContext.root() at runtime.
    cwd: Option<PathBuf>,
    /// Explicit path of the binary to spawn. If None, the current executable
    /// is used, falling back to `oxi` when that cannot be determined.
    binary_path: Option<PathBuf>,
    /// Progress callback installed through `on_progress`; cloned out of the
    /// lock at the start of each `execute` call.
    progress_callback: parking_lot::Mutex<Option<ProgressFn>>,
}
529
impl Default for SubagentTool {
    /// Equivalent to [`SubagentTool::new`]: no explicit working directory,
    /// no explicit binary, no progress callback.
    fn default() -> Self {
        Self::new()
    }
}
535
536impl SubagentTool {
537    /// Create with no explicit root (uses ToolContext.root() at runtime).
538    pub fn new() -> Self {
539        Self {
540            cwd: None,
541            binary_path: None,
542            progress_callback: parking_lot::Mutex::new(None),
543        }
544    }
545
546    /// Create with an explicit working directory (overrides ToolContext).
547    pub fn with_cwd(cwd: impl Into<PathBuf>) -> Self {
548        Self {
549            cwd: Some(cwd.into()),
550            binary_path: None,
551            progress_callback: parking_lot::Mutex::new(None),
552        }
553    }
554
555    fn get_binary(&self) -> PathBuf {
556        self.binary_path
557            .clone()
558            .or_else(|| std::env::current_exe().ok())
559            .unwrap_or_else(|| PathBuf::from("oxi"))
560    }
561}
562
563#[async_trait]
564impl AgentTool for SubagentTool {
565    fn name(&self) -> &str {
566        "subagent"
567    }
568
569    fn label(&self) -> &str {
570        "Subagent"
571    }
572
573    fn description(&self) -> &str {
574        "Delegate tasks to specialized subagents with isolated context. \
575         Modes: single (agent + task), parallel (tasks array), chain (sequential with {previous} placeholder). \
576         Agents are discovered from ~/.oxi/agents/ (user) and .oxi/agents/ (project)."
577    }
578
579    fn parameters_schema(&self) -> Value {
580        json!({
581            "type": "object",
582            "properties": {
583                "agent": {
584                    "type": "string",
585                    "description": "Agent name for single mode"
586                },
587                "task": {
588                    "type": "string",
589                    "description": "Task to delegate (single mode)"
590                },
591                "tasks": {
592                    "type": "array",
593                    "description": "Array of {agent, task} for parallel execution (max 8)",
594                    "items": {
595                        "type": "object",
596                        "properties": {
597                            "agent": { "type": "string" },
598                            "task": { "type": "string" },
599                            "cwd": { "type": "string" }
600                        },
601                        "required": ["agent", "task"]
602                    }
603                },
604                "chain": {
605                    "type": "array",
606                    "description": "Array of {agent, task} for sequential execution. Use {previous} in task for prior output.",
607                    "items": {
608                        "type": "object",
609                        "properties": {
610                            "agent": { "type": "string" },
611                            "task": { "type": "string" },
612                            "cwd": { "type": "string" }
613                        },
614                        "required": ["agent", "task"]
615                    }
616                },
617                "agentScope": {
618                    "type": "string",
619                    "description": "Agent discovery scope: 'user' (default), 'project', or 'both'",
620                    "enum": ["user", "project", "both"],
621                    "default": "user"
622                },
623                "cwd": {
624                    "type": "string",
625                    "description": "Working directory for single mode"
626                }
627            }
628        })
629    }
630
631    fn on_progress(&self, callback: ProgressCallback) {
632        *self.progress_callback.lock() = Some(callback);
633    }
634
635    async fn execute(
636        &self,
637        _tool_call_id: &str,
638        params: Value,
639        signal: Option<oneshot::Receiver<()>>,
640        ctx: &ToolContext,
641    ) -> Result<AgentToolResult, ToolError> {
642        // ── Depth check ──
643        // For the in-process path, depth is tracked via ToolContext
644        // (NOT env vars — concurrent set_var is UB, and env state
645        // leaks between forks). For the CLI path, depth is tracked
646        // via OXI_SUBAGENT_DEPTH env var (safe: each subprocess has
647        // its own env).
648        let runner = ctx.subagent_runner.clone();
649        let depth = if runner.is_some() {
650            ctx.subagent_depth
651        } else {
652            current_subagent_depth()
653        };
654        let max = if runner.is_some() {
655            3 // default; overridden per-agent below
656        } else {
657            max_subagent_depth()
658        };
659        if depth >= max {
660            return Ok(AgentToolResult::error(format!(
661                "Subagent depth limit reached ({}/{}). \
662                 Increase max_subagent_depth in your agent definition.",
663                depth, max
664            )));
665        }
666
667        // Use explicit cwd if set, else ctx.root()
668        let effective_cwd = self.cwd.as_deref().unwrap_or(ctx.root());
669
670        let scope: AgentScope = params
671            .get("agentScope")
672            .and_then(|v| serde_json::from_value(v.clone()).ok())
673            .unwrap_or(AgentScope::User);
674
675        let agents = discover_agents(effective_cwd, scope);
676        let progress = self.progress_callback.lock().clone();
677
678        let has_chain = params["chain"]
679            .as_array()
680            .map(|a| !a.is_empty())
681            .unwrap_or(false);
682        let has_tasks = params["tasks"]
683            .as_array()
684            .map(|a| !a.is_empty())
685            .unwrap_or(false);
686        let has_single = params["agent"].is_string() && params["task"].is_string();
687
688        let mode_count = [has_chain, has_tasks, has_single]
689            .iter()
690            .filter(|&&x| x)
691            .count();
692
693        if mode_count != 1 {
694            let available = agents
695                .iter()
696                .map(|a| format!("{} ({})", a.name, a.source))
697                .collect::<Vec<_>>()
698                .join(", ");
699            return Ok(AgentToolResult::error(format!(
700                "Provide exactly one mode: agent+task, tasks, or chain.\nAvailable agents: {}",
701                if available.is_empty() {
702                    "none".to_string()
703                } else {
704                    available
705                }
706            )));
707        }
708
709        // ── In-process path (library-native delegation) ──
710        // When a SubagentRunner is wired, prefer it over shelling out.
711        // This is the path library consumers (Oxios) use — they have
712        // no `oxi` subprocess. The CLI fallback below is the default
713        // for oxi-cli.
714        if let Some(runner) = &runner {
715            return execute_in_process(
716                effective_cwd,
717                &agents,
718                params,
719                runner,
720                depth,
721                progress,
722                signal,
723            )
724            .await;
725        }
726
727        // ── CLI fallback (existing path) ──
728        let binary = self.get_binary();
729
730        // ── Chain mode ──
731        if has_chain {
732            return execute_chain_mode(effective_cwd, &agents, params, &binary, progress, signal)
733                .await;
734        }
735
736        // ── Parallel mode ──
737        if has_tasks {
738            return execute_parallel_mode(effective_cwd, &agents, params, &binary, progress).await;
739        }
740
741        // ── Single mode ──
742        if has_single {
743            return execute_single_mode(effective_cwd, &agents, params, &binary, progress, signal)
744                .await;
745        }
746
747        Ok(AgentToolResult::error("Invalid parameters".to_string()))
748    }
749}
750
751// ── In-process execution (issue #28 gap 3) ────────────────────────────
752//
753// When a SubagentRunner is wired into ToolContext, the subagent tool
754// delegates to it instead of spawning CLI subprocesses. This is the
755// library-native path — essential for consumers (Oxios) that embed
756// oxi-agent without an `oxi` binary. Depth tracking uses the
757// ToolContext.subagent_depth field (NOT env vars — concurrent set_var
758// is UB and state leaks between sequential forks).
759
760/// Convert a ForkResult to a SingleResult for output formatting.
761fn fork_to_single(
762    fork: super::ForkResult,
763    agent_name: &str,
764    task: &str,
765    step: Option<usize>,
766) -> SingleResult {
767    let error = fork.error.clone();
768    SingleResult {
769        agent: agent_name.to_string(),
770        agent_source: "in-process".to_string(),
771        task: task.to_string(),
772        exit_code: if error.is_some() { 1 } else { 0 },
773        output: fork.text,
774        stderr: String::new(),
775        usage: UsageStats {
776            input_tokens: fork.input_tokens as u64,
777            output_tokens: fork.output_tokens as u64,
778            turns: fork.turns,
779            ..Default::default()
780        },
781        model: fork.model,
782        stop_reason: if error.is_some() {
783            Some("error".to_string())
784        } else {
785            Some("complete".to_string())
786        },
787        error_message: error,
788        step,
789    }
790}
791
792/// Execute via in-process SubagentRunner — single, parallel, and chain modes.
793#[allow(clippy::too_many_arguments)]
794async fn execute_in_process(
795    cwd: &Path,
796    agents: &[AgentDefinition],
797    params: Value,
798    runner: &Arc<dyn super::SubagentRunner>,
799    depth: u8,
800    progress: Option<ProgressFn>,
801    _signal: Option<oneshot::Receiver<()>>,
802) -> Result<AgentToolResult, ToolError> {
803    let has_chain = params["chain"]
804        .as_array()
805        .map(|a| !a.is_empty())
806        .unwrap_or(false);
807    let has_tasks = params["tasks"]
808        .as_array()
809        .map(|a| !a.is_empty())
810        .unwrap_or(false);
811    let has_single = params["agent"].is_string() && params["task"].is_string();
812
813    // ── Single mode ──
814    if has_single {
815        let agent_name = params["agent"].as_str().unwrap_or("");
816        let task = params["task"].as_str().unwrap_or("");
817        let agent_def = agents.iter().find(|a| a.name == agent_name);
818
819        if let Some(ref cb) = progress {
820            cb(format!("[{}] running (in-process)...", agent_name));
821        }
822
823        let fork = runner
824            .run_isolated(
825                agent_name,
826                task,
827                agent_def.and_then(|d| d.system_prompt.as_deref()),
828                agent_def.and_then(|d| d.model.as_deref()),
829                agent_def.map(|d| d.tools.as_slice()).unwrap_or(&[]),
830                cwd,
831                depth,
832            )
833            .await
834            .map_err(|e| format!("In-process subagent failed: {e}"))?;
835
836        let result = fork_to_single(fork, agent_name, task, None);
837        let is_error = result.stop_reason.as_deref() == Some("error");
838
839        if is_error {
840            let error_msg = result.error_message.as_deref().unwrap_or("unknown error");
841            return Ok(AgentToolResult::error(format!("Agent failed: {error_msg}")));
842        }
843
844        return Ok(AgentToolResult::success(if result.output.is_empty() {
845            "(no output)".to_string()
846        } else {
847            result.output.clone()
848        })
849        .with_metadata(json!({
850            "mode": "single",
851            "agent": result.agent,
852            "source": result.agent_source,
853            "backend": "in-process",
854            "usage": {
855                "input_tokens": result.usage.input_tokens,
856                "output_tokens": result.usage.output_tokens,
857                "turns": result.usage.turns,
858            },
859        })));
860    }
861
862    // ── Parallel mode ──
863    if has_tasks {
864        let tasks: Vec<ParallelTask> = serde_json::from_value(params["tasks"].clone())
865            .map_err(|e| format!("Invalid tasks parameter: {e}"))?;
866        let total = tasks.len();
867        if total == 0 {
868            return Ok(AgentToolResult::error("No tasks provided".to_string()));
869        }
870
871        // Run concurrently — safe because SubagentRunner is Send+Sync
872        // and there are no env-var mutations (unlike the CLI path).
873        let limit = MAX_CONCURRENCY.min(total);
874        let mut all_results: Vec<SingleResult> = Vec::with_capacity(total);
875        let mut all_errors: Vec<String> = Vec::new();
876
877        for chunk in tasks.chunks(limit) {
878            let mut handles = Vec::new();
879            for task in chunk {
880                let agent_def = agents.iter().find(|a| a.name == task.agent);
881                let runner = Arc::clone(runner);
882                let agent_name = task.agent.clone();
883                let task_text = task.task.clone();
884                let system_prompt = agent_def.and_then(|d| d.system_prompt.clone());
885                let model = agent_def.and_then(|d| d.model.clone());
886                let tools: Vec<String> = agent_def.map(|d| d.tools.clone()).unwrap_or_default();
887                let cwd = cwd.to_path_buf();
888
889                handles.push(tokio::spawn(async move {
890                    runner
891                        .run_isolated(
892                            &agent_name,
893                            &task_text,
894                            system_prompt.as_deref(),
895                            model.as_deref(),
896                            &tools,
897                            &cwd,
898                            depth,
899                        )
900                        .await
901                }));
902            }
903
904            for (i, handle) in handles.into_iter().enumerate() {
905                let task = &chunk[i];
906                match handle.await {
907                    Ok(Ok(fork)) => {
908                        all_results.push(fork_to_single(fork, &task.agent, &task.task, None))
909                    }
910                    Ok(Err(e)) => all_errors.push(format!("{}: {e}", task.agent)),
911                    Err(e) => all_errors.push(format!("{}: join error: {e}", task.agent)),
912                }
913            }
914        }
915
916        if !all_errors.is_empty() {
917            return Ok(AgentToolResult::error(format!(
918                "Errors: {}",
919                all_errors.join("; ")
920            )));
921        }
922
923        let success_count = all_results.iter().filter(|r| r.exit_code == 0).count();
924        let summaries: Vec<String> = all_results
925            .iter()
926            .map(|r| format!("[{}] {}", r.agent, r.output))
927            .collect();
928
929        return Ok(AgentToolResult::success(format!(
930            "Parallel: {}/{} succeeded\n\n{}",
931            success_count,
932            all_results.len(),
933            summaries.join("\n\n---\n\n")
934        ))
935        .with_metadata(json!({
936            "mode": "parallel",
937            "backend": "in-process",
938            "results": all_results.iter().map(|r| json!({
939                "agent": r.agent,
940                "exit_code": r.exit_code,
941            })).collect::<Vec<_>>()
942        })));
943    }
944
945    // ── Chain mode ──
946    if has_chain {
947        let steps: Vec<ChainStep> = serde_json::from_value(params["chain"].clone())
948            .map_err(|e| format!("Invalid chain parameter: {e}"))?;
949        let total = steps.len();
950        let mut previous_output = String::new();
951        let mut results: Vec<SingleResult> = Vec::new();
952
953        for (i, step) in steps.into_iter().enumerate() {
954            let task = step.task.replace("{previous}", &previous_output);
955            let agent_def = agents.iter().find(|a| a.name == step.agent);
956
957            if let Some(ref cb) = progress {
958                cb(format!("[{}] chain step {}/{}", step.agent, i + 1, total));
959            }
960
961            let fork = runner
962                .run_isolated(
963                    &step.agent,
964                    &task,
965                    agent_def.and_then(|d| d.system_prompt.as_deref()),
966                    agent_def.and_then(|d| d.model.as_deref()),
967                    agent_def.map(|d| d.tools.as_slice()).unwrap_or(&[]),
968                    cwd,
969                    depth,
970                )
971                .await;
972
973            let fork = match fork {
974                Ok(f) => f,
975                Err(e) => {
976                    return Ok(AgentToolResult::error(format!(
977                        "Chain stopped at step {}/{} ({}): {e}",
978                        i + 1,
979                        total,
980                        step.agent
981                    )));
982                }
983            };
984
985            let is_error = fork.error.is_some();
986            let result = fork_to_single(fork, &step.agent, &task, Some(i + 1));
987
988            if is_error {
989                let error_msg = result.error_message.clone().unwrap_or_default();
990                return Ok(AgentToolResult::error(format!(
991                    "Chain stopped at step {}/{} ({}): {error_msg}",
992                    i + 1,
993                    total,
994                    step.agent
995                )));
996            }
997
998            previous_output = result.output.clone();
999            results.push(result);
1000        }
1001
1002        let output = results.last().map(|r| r.output.clone()).unwrap_or_default();
1003        return Ok(AgentToolResult::success(if output.is_empty() {
1004            "(no output)".to_string()
1005        } else {
1006            output
1007        })
1008        .with_metadata(json!({
1009            "mode": "chain",
1010            "backend": "in-process",
1011            "steps": results.len(),
1012        })));
1013    }
1014
1015    Ok(AgentToolResult::error("Invalid parameters".to_string()))
1016}
1017
1018/// Execute chain mode: sequential agents where each step can reference {previous} output.
1019async fn execute_chain_mode(
1020    cwd: &Path,
1021    agents: &[AgentDefinition],
1022    params: Value,
1023    binary: &Path,
1024    progress: Option<ProgressFn>,
1025    signal: Option<oneshot::Receiver<()>>,
1026) -> Result<AgentToolResult, ToolError> {
1027    let steps: Vec<ChainStep> = serde_json::from_value(params["chain"].clone())
1028        .map_err(|e| format!("Invalid chain parameter: {}", e))?;
1029    let total = steps.len();
1030    let mut results = Vec::new();
1031    let mut previous_output = String::new();
1032    let mut abort_signal = signal;
1033
1034    for (i, step) in steps.into_iter().enumerate() {
1035        let task = step.task.replace("{previous}", &previous_output);
1036        let step_signal = if i == total - 1 {
1037            abort_signal.take()
1038        } else {
1039            None
1040        };
1041
1042        let result = run_single_agent(
1043            cwd,
1044            agents,
1045            &step.agent,
1046            &task,
1047            step.cwd.as_deref(),
1048            Some(i + 1),
1049            step_signal,
1050            progress.clone(),
1051            binary,
1052        )
1053        .await;
1054
1055        let is_error = result.exit_code != 0
1056            || result.stop_reason.as_deref() == Some("error")
1057            || result.stop_reason.as_deref() == Some("aborted");
1058
1059        if is_error {
1060            let agent_name = result.agent.clone();
1061            let error_msg = result
1062                .error_message
1063                .clone()
1064                .unwrap_or_else(|| result.stderr.clone());
1065            results.push(result);
1066            return Ok(AgentToolResult::error(format!(
1067                "Chain stopped at step {}/{} ({}): {}",
1068                i + 1,
1069                total,
1070                agent_name,
1071                error_msg
1072            )));
1073        }
1074
1075        previous_output = result.output.clone();
1076        results.push(result);
1077    }
1078
1079    let output = results.last().map(|r| r.output.clone()).unwrap_or_default();
1080    Ok(AgentToolResult::success(if output.is_empty() {
1081        "(no output)".to_string()
1082    } else {
1083        output
1084    })
1085    .with_metadata(json!({
1086        "mode": "chain",
1087        "steps": results.len(),
1088    })))
1089}
1090
1091/// Execute parallel mode: multiple agents running concurrently.
1092async fn execute_parallel_mode(
1093    cwd: &Path,
1094    agents: &[AgentDefinition],
1095    params: Value,
1096    binary: &Path,
1097    progress: Option<ProgressFn>,
1098) -> Result<AgentToolResult, ToolError> {
1099    let tasks: Vec<ParallelTask> = serde_json::from_value(params["tasks"].clone())
1100        .map_err(|e| format!("Invalid tasks parameter: {}", e))?;
1101
1102    if tasks.len() > MAX_PARALLEL_TASKS {
1103        return Ok(AgentToolResult::error(format!(
1104            "Too many parallel tasks ({}). Max is {}.",
1105            tasks.len(),
1106            MAX_PARALLEL_TASKS
1107        )));
1108    }
1109
1110    let results = run_parallel(cwd, agents, tasks, binary.to_path_buf(), progress).await;
1111
1112    let success_count = results.iter().filter(|r| r.exit_code == 0).count();
1113    let summaries: Vec<String> = results
1114        .iter()
1115        .map(|r| {
1116            let _preview = truncate_output(&r.output, 100);
1117            format!(
1118                "[{}]: {}",
1119                r.agent,
1120                if r.exit_code == 0 {
1121                    "completed"
1122                } else {
1123                    "failed"
1124                },
1125            )
1126        })
1127        .collect();
1128
1129    Ok(AgentToolResult::success(format!(
1130        "Parallel: {}/{} succeeded\n\n{}",
1131        success_count,
1132        results.len(),
1133        summaries.join("\n\n")
1134    ))
1135    .with_metadata(json!({
1136        "mode": "parallel",
1137        "results": results.iter().map(|r| json!({
1138            "agent": r.agent,
1139            "exit_code": r.exit_code,
1140        })).collect::<Vec<_>>()
1141    })))
1142}
1143
1144/// Execute single mode: one agent, one task.
1145async fn execute_single_mode(
1146    cwd: &Path,
1147    agents: &[AgentDefinition],
1148    params: Value,
1149    binary: &Path,
1150    progress: Option<ProgressFn>,
1151    signal: Option<oneshot::Receiver<()>>,
1152) -> Result<AgentToolResult, ToolError> {
1153    let agent_name = params["agent"]
1154        .as_str()
1155        .ok_or("Missing required parameter: agent")?;
1156    let task = params["task"]
1157        .as_str()
1158        .ok_or("Missing required parameter: task")?;
1159    let agent_cwd = params["cwd"].as_str();
1160
1161    let result = run_single_agent(
1162        cwd, agents, agent_name, task, agent_cwd, None, signal, progress, binary,
1163    )
1164    .await;
1165
1166    let is_error = result.exit_code != 0
1167        || result.stop_reason.as_deref() == Some("error")
1168        || result.stop_reason.as_deref() == Some("aborted");
1169
1170    if is_error {
1171        let error_msg = result.error_message.as_deref().unwrap_or(&result.stderr);
1172        return Ok(AgentToolResult::error(format!(
1173            "Agent {}: {}",
1174            result.stop_reason.as_deref().unwrap_or("failed"),
1175            error_msg
1176        )));
1177    }
1178
1179    Ok(AgentToolResult::success(if result.output.is_empty() {
1180        "(no output)".to_string()
1181    } else {
1182        result.output.clone()
1183    })
1184    .with_metadata(json!({
1185        "mode": "single",
1186        "agent": result.agent,
1187        "source": result.agent_source,
1188        "usage": {
1189            "input_tokens": result.usage.input_tokens,
1190            "output_tokens": result.usage.output_tokens,
1191            "turns": result.usage.turns,
1192        },
1193    })))
1194}
1195
1196// ── Helpers ────────────────────────────────────────────────────────────
1197
/// Returns `text` limited to at most `max_chars` characters.
///
/// Text that already fits is returned unchanged. Longer text is cut after
/// `max_chars` characters and suffixed with `"..."`. The limit counts
/// characters (Unicode scalar values) rather than bytes, so the cut always
/// lands on a character boundary and multi-byte UTF-8 input cannot cause a
/// slicing panic.
fn truncate_output(text: &str, max_chars: usize) -> String {
    // `nth(max_chars)` yields the first character that does NOT fit; its byte
    // offset is exactly where the kept prefix ends. `None` means the whole
    // text has at most `max_chars` characters.
    match text.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}...", &text[..cut]),
        None => text.to_string(),
    }
}
1205
1206// ── Tests ──────────────────────────────────────────────────────────────
1207
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the empty `SingleResult` fixture shared by the
    /// `process_json_line` tests.
    fn blank_result() -> SingleResult {
        SingleResult {
            agent: "test".into(),
            agent_source: "user".into(),
            task: "t".into(),
            exit_code: 0,
            output: String::new(),
            stderr: String::new(),
            usage: UsageStats::default(),
            model: None,
            stop_reason: None,
            error_message: None,
            step: None,
        }
    }

    /// Writes `contents` to `file_name` inside `dir`, panicking on I/O failure.
    fn write_file(dir: &Path, file_name: &str, contents: &str) {
        std::fs::write(dir.join(file_name), contents).unwrap();
    }

    /// A project directory without `.oxi/agents` yields no agents.
    #[test]
    fn test_discover_agents_empty_dir() {
        let project = tempfile::tempdir().unwrap();
        let found = discover_agents(project.path(), AgentScope::Project);
        assert!(found.is_empty());
    }

    /// Markdown files directly under `.oxi/agents` are discovered; other
    /// files in that directory are not.
    #[test]
    fn test_discover_agents_with_flat_files() {
        let project = tempfile::tempdir().unwrap();
        let agents_dir = project.path().join(".oxi").join("agents");
        std::fs::create_dir_all(&agents_dir).unwrap();

        write_file(
            &agents_dir,
            "scout.md",
            "---\nname: scout\ndescription: Recon\n---\nBe a scout.",
        );
        write_file(
            &agents_dir,
            "worker.md",
            "---\nname: worker\n---\nBe a worker.",
        );
        write_file(&agents_dir, "ignore.txt", "ignore me");

        let found = discover_agents(project.path(), AgentScope::Project);
        assert_eq!(found.len(), 2);
        assert!(found.iter().any(|a| a.name == "scout"));
        assert!(found.iter().any(|a| a.name == "worker"));
    }

    /// The parameter schema is an object exposing every supported mode key.
    #[test]
    fn test_schema_structure() {
        let schema = SubagentTool::new().parameters_schema();
        assert_eq!(schema["type"], "object");

        let props = &schema["properties"];
        assert!(props["agent"].is_object());
        assert!(props["tasks"].is_object());
        assert!(props["chain"].is_object());
        assert!(props["agentScope"].is_object());
    }

    /// Short text passes through; long text is cut and gets an ellipsis.
    #[test]
    fn test_truncate_output() {
        assert_eq!(truncate_output("hello", 10), "hello");
        assert_eq!(truncate_output("hello world foo", 5), "hello...");
    }

    /// A `text_delta` event appends its text to the accumulated buffer.
    #[test]
    fn test_process_json_line_text_delta() {
        let mut record = blank_result();
        let mut buffer = String::new();
        process_json_line(
            r#"{"type":"text_delta","text":"hello"}"#,
            &mut record,
            &mut buffer,
            &None,
        );
        assert_eq!(buffer, "hello");
    }

    /// A `usage` event records token counts and counts as one turn.
    #[test]
    fn test_process_json_line_usage() {
        let mut record = blank_result();
        let mut buffer = String::new();
        process_json_line(
            r#"{"type":"usage","input_tokens":100,"output_tokens":50}"#,
            &mut record,
            &mut buffer,
            &None,
        );
        assert_eq!(record.usage.input_tokens, 100);
        assert_eq!(record.usage.output_tokens, 50);
        assert_eq!(record.usage.turns, 1);
    }

    /// With both environment variables unset, depth is 0 and the limit is 3.
    #[test]
    fn test_depth_limit_default() {
        // SAFETY: NOTE(review): mutating the process environment is only sound
        // while no other thread reads or writes it concurrently. The default
        // test harness runs tests on multiple threads, so confirm that no
        // other test touches the environment at the same time.
        unsafe {
            std::env::remove_var("OXI_SUBAGENT_DEPTH");
            std::env::remove_var("OXI_MAX_SUBAGENT_DEPTH");
        }
        assert_eq!(current_subagent_depth(), 0);
        assert_eq!(max_subagent_depth(), 3);
    }
}