// oxi_agent/tools/subagent.rs

//! Subagent tool — delegate tasks to specialized agents.
//!
//! Spawns a separate `oxi --mode json` process for each invocation,
//! giving it an isolated context window.
//!
//! Supports three modes:
//!   - Single: `{ agent: "name", task: "..." }`
//!   - Parallel: `{ tasks: [{ agent, task }, ...] }`
//!   - Chain: `{ chain: [{ agent, task: "... {previous} ..." }, ...] }`
//!
//! Agent definitions are markdown files with YAML frontmatter,
//! discovered from `~/.oxi/agents/` (user) and `.oxi/agents/` (project).
//! Discovery is delegated to [`crate::agent_definition::AgentDiscovery`].
14use super::{AgentTool, AgentToolResult, ProgressCallback, ToolContext, ToolError};
15use crate::agent_definition::{
16    AgentDefinition, AgentDiscovery, AgentScope, current_subagent_depth, max_subagent_depth,
17};
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20use serde_json::{Value, json};
21use std::path::{Path, PathBuf};
22use tokio::io::{AsyncBufReadExt, BufReader};
23use tokio::sync::oneshot;
24
25// ── Constants ──────────────────────────────────────────────────────────
26
/// Largest number of entries accepted in a parallel-mode `tasks` array.
const MAX_PARALLEL_TASKS: usize = 8;
/// Largest number of subagent processes started together in one batch.
const MAX_CONCURRENCY: usize = 4;
29
30// ── Progress callback type (reuse from tools.rs) ──────────────────────
31
/// Local alias for the shared [`ProgressCallback`] type.
type ProgressFn = ProgressCallback;
33
// ── Temp dir helper (no RAII guard — the directory must outlive the subprocess) ──
35
36fn create_system_prompt_temp_dir(prefix: &str) -> Result<PathBuf, String> {
37    let path = std::env::temp_dir().join(format!("{}-{}", prefix, uuid::Uuid::new_v4()));
38    std::fs::create_dir_all(&path).map_err(|e| format!("Failed to create temp dir: {}", e))?;
39    Ok(path)
40}
41
42// ── Agent Discovery (delegates to SDK) ─────────────────────────────────
43
44/// Discover agents by delegating to [`AgentDiscovery`].
45pub fn discover_agents(cwd: &Path, scope: AgentScope) -> Vec<AgentDefinition> {
46    AgentDiscovery::discover(cwd, scope)
47        .unwrap_or_default()
48        .into_iter()
49        .map(|(_, def)| def)
50        .collect()
51}
52
53// ── Result Types ───────────────────────────────────────────────────────
54
55#[derive(Debug, Clone, Serialize, Deserialize, Default)]
56/// Usage statistics from a subagent run.
57pub struct UsageStats {
58    /// Input tokens consumed.
59    pub input_tokens: u64,
60    /// Output tokens consumed.
61    pub output_tokens: u64,
62    /// Cache read tokens (reserved for future use).
63    pub cache_read: u64,
64    /// Cache write tokens (reserved for future use).
65    pub cache_write: u64,
66    /// Cost in USD (reserved for future use).
67    pub cost: f64,
68    /// Number of agent turns.
69    pub turns: u32,
70}
71
72#[derive(Debug, Clone)]
73/// Result from a single subagent execution.
74pub struct SingleResult {
75    /// Agent name.
76    pub agent: String,
77    /// Discovery source ("user" or "project").
78    pub agent_source: String,
79    /// Task that was executed.
80    pub task: String,
81    /// Process exit code.
82    pub exit_code: i32,
83    /// Captured stdout text.
84    pub output: String,
85    /// Captured stderr text.
86    pub stderr: String,
87    /// Token usage.
88    pub usage: UsageStats,
89    /// Model used by the agent.
90    pub model: Option<String>,
91    /// Stop reason.
92    pub stop_reason: Option<String>,
93    /// Error message if the agent failed.
94    pub error_message: Option<String>,
95    /// Step index in chain mode.
96    pub step: Option<usize>,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
100#[serde(rename_all = "snake_case")]
101/// Execution mode used for a subagent invocation.
102pub enum SubagentMode {
103    /// Single agent, single task.
104    Single,
105    /// Multiple agents/tasks running concurrently.
106    Parallel,
107    /// Sequential agents, passing {previous} output forward.
108    Chain,
109}
110
111#[derive(Debug, Clone)]
112/// Detailed results from a subagent invocation.
113pub struct SubagentDetails {
114    /// Which mode was used.
115    pub mode: SubagentMode,
116    /// Per-agent results.
117    pub results: Vec<SingleResult>,
118}
119
120// ── JSON line processing ───────────────────────────────────────────────
121
122fn process_json_line(
123    line: &str,
124    result: &mut SingleResult,
125    text: &mut String,
126    _on_progress: &Option<ProgressFn>,
127) {
128    let event: Value = match serde_json::from_str(line) {
129        Ok(v) => v,
130        Err(_) => return,
131    };
132    match event["type"].as_str().unwrap_or("") {
133        "text_delta" => {
134            if let Some(t) = event["text"].as_str() {
135                text.push_str(t);
136            }
137        }
138        "usage" => {
139            result.usage.input_tokens += event["input_tokens"].as_u64().unwrap_or(0);
140            result.usage.output_tokens += event["output_tokens"].as_u64().unwrap_or(0);
141            result.usage.turns += 1;
142        }
143        "complete" => {
144            result.stop_reason = Some("complete".to_string());
145        }
146        "error" => {
147            result.error_message = Some(
148                event["message"]
149                    .as_str()
150                    .unwrap_or("Unknown error")
151                    .to_string(),
152            );
153            result.stop_reason = Some("error".to_string());
154        }
155        _ => {}
156    }
157}
158
159// ── Process Execution ──────────────────────────────────────────────────
160
161/// Build command-line arguments for launching a subagent process.
162fn build_agent_args(agent: &AgentDefinition, tmp_dir: &Path, task: &str) -> Vec<String> {
163    let mut args = vec!["--mode".to_string(), "json".to_string(), "-p".to_string()];
164
165    if let Some(ref model) = agent.model {
166        args.push("--model".to_string());
167        args.push(model.clone());
168    }
169
170    if !agent.tools.is_empty() {
171        args.push("--tools".to_string());
172        args.push(agent.tools.join(","));
173    }
174
175    if let Some(ref prompt) = agent.system_prompt
176        && !prompt.is_empty()
177        && std::fs::write(tmp_dir.join("system_prompt.md"), prompt).is_ok()
178    {
179        args.push("--append-system-prompt".to_string());
180        args.push(
181            tmp_dir
182                .join("system_prompt.md")
183                .to_str()
184                .unwrap_or_default()
185                .to_string(),
186        );
187    }
188
189    args.push(format!("Task: {}", task));
190    args
191}
192
193/// Gracefully terminate a child process (SIGTERM → wait → SIGKILL).
194async fn terminate_child(
195    child: &mut tokio::process::Child,
196    stderr_handle: tokio::task::JoinHandle<String>,
197    result: &mut SingleResult,
198) {
199    #[cfg(unix)]
200    {
201        if let Some(pid) = child.id() {
202            // SAFETY: libc::kill sends SIGTERM to the child process. PID comes from
203            // child.id() which is a valid running process. Used for graceful shutdown
204            // before force-killing. Race (process exited) returns ESRCH harmlessly.
205            unsafe {
206                libc::kill(pid as i32, libc::SIGTERM);
207            }
208        }
209        let deadline = tokio::time::sleep(std::time::Duration::from_secs(5));
210        tokio::pin!(deadline);
211        tokio::select! {
212            _ = &mut deadline => { let _ = child.start_kill(); }
213            _ = child.wait() => {}
214        }
215    }
216    #[cfg(not(unix))]
217    {
218        let _ = child.start_kill();
219        let _ = tokio::time::timeout(std::time::Duration::from_secs(5), child.wait()).await;
220    }
221
222    // Collect stderr with short timeout
223    let _ = tokio::time::timeout(std::time::Duration::from_secs(1), async {
224        if let Ok(err) = stderr_handle.await {
225            result.stderr = err;
226        }
227    })
228    .await;
229}
230
231/// Run a single agent process with abort support.
232#[allow(clippy::too_many_arguments)]
233async fn run_single_agent(
234    cwd: &Path,
235    agents: &[AgentDefinition],
236    agent_name: &str,
237    task: &str,
238    agent_cwd: Option<&str>,
239    step: Option<usize>,
240    signal: Option<oneshot::Receiver<()>>,
241    on_progress: Option<ProgressFn>,
242    binary_path: &Path,
243) -> SingleResult {
244    let agent = match agents.iter().find(|a| a.name == agent_name) {
245        Some(a) => a,
246        None => {
247            let available = agents
248                .iter()
249                .map(|a| format!("\"{}\"", a.name))
250                .collect::<Vec<_>>()
251                .join(", ");
252            return SingleResult {
253                agent: agent_name.to_string(),
254                agent_source: "unknown".to_string(),
255                task: task.to_string(),
256                exit_code: 1,
257                output: String::new(),
258                stderr: format!(
259                    "Unknown agent: \"{}\". Available: {}",
260                    agent_name, available
261                ),
262                usage: UsageStats::default(),
263                model: None,
264                stop_reason: None,
265                error_message: Some(format!("Unknown agent: {}", agent_name)),
266                step,
267            };
268        }
269    };
270
271    let mut result = SingleResult {
272        agent: agent_name.to_string(),
273        agent_source: agent.source.clone(),
274        task: task.to_string(),
275        exit_code: 0,
276        output: String::new(),
277        stderr: String::new(),
278        usage: UsageStats::default(),
279        model: agent.model.clone(),
280        stop_reason: None,
281        error_message: None,
282        step,
283    };
284
285    // Notify progress
286    if let Some(ref cb) = on_progress {
287        cb(format!("[{}] running...", agent_name));
288    }
289
290    // Build command args
291    let tmp_dir = match create_system_prompt_temp_dir("oxi-subagent") {
292        Ok(tmp) => Some(tmp),
293        Err(e) => {
294            result.exit_code = 1;
295            result.stderr = e.clone();
296            result.error_message = Some(e);
297            return result;
298        }
299    };
300
301    let args = match tmp_dir {
302        Some(ref tmp) => build_agent_args(agent, tmp, task),
303        None => vec![
304            "--mode".to_string(),
305            "json".to_string(),
306            "-p".to_string(),
307            format!("Task: {}", task),
308        ],
309    };
310
311    let working_dir = agent_cwd
312        .map(PathBuf::from)
313        .unwrap_or_else(|| cwd.to_path_buf());
314
315    let mut cmd = tokio::process::Command::new(binary_path);
316    cmd.args(&args)
317        .current_dir(&working_dir)
318        .stdout(std::process::Stdio::piped())
319        .stderr(std::process::Stdio::piped())
320        .stdin(std::process::Stdio::null())
321        // Depth tracking for subagent nesting limits
322        .env(
323            "OXI_SUBAGENT_DEPTH",
324            (current_subagent_depth() + 1).to_string(),
325        )
326        .env(
327            "OXI_MAX_SUBAGENT_DEPTH",
328            agent.max_subagent_depth.to_string(),
329        );
330
331    let mut child = match cmd.spawn() {
332        Ok(c) => c,
333        Err(e) => {
334            result.exit_code = 1;
335            result.stderr = format!("Failed to spawn: {}", e);
336            result.error_message = Some(format!("Failed to spawn: {}", e));
337            return result;
338        }
339    };
340
341    let stdout = child.stdout.take().expect("stdout piped but missing");
342    let stderr = child.stderr.take().expect("stderr piped but missing");
343
344    // Spawn stdout reader → channel
345    let (line_tx, mut line_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
346    let _reader_handle = tokio::spawn(async move {
347        let reader = BufReader::new(stdout);
348        let mut lines = reader.lines();
349        while let Ok(Some(line)) = lines.next_line().await {
350            if line_tx.send(line).is_err() {
351                break;
352            }
353        }
354    });
355
356    // Spawn stderr reader
357    let stderr_handle = tokio::spawn(async move {
358        let mut err = String::new();
359        let reader = BufReader::new(stderr);
360        let mut lines = reader.lines();
361        while let Ok(Some(line)) = lines.next_line().await {
362            err.push_str(&line);
363            err.push('\n');
364        }
365        err
366    });
367
368    // Main loop: select between stdout lines and abort signal
369    let mut final_text = String::new();
370    let mut signal_rx = signal;
371    let mut aborted = false;
372
373    loop {
374        tokio::select! {
375            line = line_rx.recv() => {
376                match line {
377                    Some(line) => {
378                        process_json_line(&line, &mut result, &mut final_text, &on_progress);
379                    }
380                    None => break, // stdout EOF
381                }
382            }
383            _ = async {
384                match &mut signal_rx {
385                    Some(rx) => { let _ = rx.await; }
386                    None => std::future::pending::<()>().await,
387                }
388            } => {
389                aborted = true;
390                break;
391            }
392        }
393    }
394
395    if aborted {
396        result.stop_reason = Some("aborted".into());
397        result.error_message = Some("Aborted by user".into());
398        terminate_child(&mut child, stderr_handle, &mut result).await;
399    } else {
400        // Normal completion
401        if let Ok(err_output) = stderr_handle.await {
402            result.stderr = err_output;
403        }
404        match child.wait().await {
405            Ok(status) => result.exit_code = status.code().unwrap_or(1),
406            Err(_) => result.exit_code = 1,
407        }
408    }
409
410    result.output = final_text;
411
412    if let Some(ref cb) = on_progress {
413        let status = if result.exit_code == 0 {
414            "done"
415        } else {
416            "failed"
417        };
418        cb(format!("[{}] {}", agent_name, status));
419    }
420
421    result
422}
423
424/// Run multiple tasks with concurrency limit.
425async fn run_parallel(
426    cwd: &Path,
427    agents: &[AgentDefinition],
428    tasks: Vec<ParallelTask>,
429    binary_path: PathBuf,
430    on_progress: Option<ProgressFn>,
431) -> Vec<SingleResult> {
432    let n = tasks.len();
433    if n == 0 {
434        return vec![];
435    }
436
437    let limit = MAX_CONCURRENCY.min(n);
438    let indexed_tasks: Vec<(usize, ParallelTask)> = tasks.into_iter().enumerate().collect();
439    let mut all_results: Vec<Option<SingleResult>> = vec![None; n];
440
441    let mut i = 0;
442    while i < indexed_tasks.len() {
443        let end = (i + limit).min(indexed_tasks.len());
444        let chunk: Vec<_> = indexed_tasks[i..end].to_vec();
445        let mut handles = Vec::new();
446
447        for (idx, task) in chunk {
448            let agents = agents.to_vec();
449            let cwd = cwd.to_path_buf();
450            let bp = binary_path.clone();
451            let progress = on_progress.clone();
452
453            handles.push((
454                idx,
455                tokio::spawn(async move {
456                    run_single_agent(
457                        &cwd,
458                        &agents,
459                        &task.agent,
460                        &task.task,
461                        task.cwd.as_deref(),
462                        None,
463                        None,
464                        progress,
465                        &bp,
466                    )
467                    .await
468                }),
469            ));
470        }
471
472        for (idx, handle) in handles {
473            if let Ok(r) = handle.await {
474                all_results[idx] = Some(r);
475            }
476        }
477
478        i = end;
479    }
480
481    all_results
482        .into_iter()
483        .map(|r| {
484            r.unwrap_or_else(|| SingleResult {
485                agent: "unknown".to_string(),
486                agent_source: "unknown".to_string(),
487                task: "unknown".to_string(),
488                exit_code: 1,
489                output: String::new(),
490                stderr: "Task did not complete".to_string(),
491                usage: UsageStats::default(),
492                model: None,
493                stop_reason: Some("error".to_string()),
494                error_message: Some("Task did not complete".to_string()),
495                step: None,
496            })
497        })
498        .collect()
499}
500
501// ── Parameter Types ────────────────────────────────────────────────────
502
503#[derive(Debug, Deserialize, Clone)]
504struct ParallelTask {
505    agent: String,
506    task: String,
507    #[serde(default)]
508    cwd: Option<String>,
509}
510
511#[derive(Debug, Deserialize)]
512struct ChainStep {
513    agent: String,
514    task: String,
515    #[serde(default)]
516    cwd: Option<String>,
517}
518
519// ── Tool Implementation ────────────────────────────────────────────────
520
521/// Subagent tool for delegating tasks to specialized agents.
522pub struct SubagentTool {
523    /// Explicit working directory override. If None, uses ToolContext.root() at runtime.
524    cwd: Option<PathBuf>,
525    binary_path: Option<PathBuf>,
526    progress_callback: parking_lot::Mutex<Option<ProgressFn>>,
527}
528
529impl Default for SubagentTool {
530    fn default() -> Self {
531        Self::new()
532    }
533}
534
535impl SubagentTool {
536    /// Create with no explicit root (uses ToolContext.root() at runtime).
537    pub fn new() -> Self {
538        Self {
539            cwd: None,
540            binary_path: None,
541            progress_callback: parking_lot::Mutex::new(None),
542        }
543    }
544
545    /// Create with an explicit working directory (overrides ToolContext).
546    pub fn with_cwd(cwd: impl Into<PathBuf>) -> Self {
547        Self {
548            cwd: Some(cwd.into()),
549            binary_path: None,
550            progress_callback: parking_lot::Mutex::new(None),
551        }
552    }
553
554    fn get_binary(&self) -> PathBuf {
555        self.binary_path
556            .clone()
557            .or_else(|| std::env::current_exe().ok())
558            .unwrap_or_else(|| PathBuf::from("oxi"))
559    }
560}
561
562#[async_trait]
563impl AgentTool for SubagentTool {
564    fn name(&self) -> &str {
565        "subagent"
566    }
567
568    fn label(&self) -> &str {
569        "Subagent"
570    }
571
572    fn description(&self) -> &str {
573        "Delegate tasks to specialized subagents with isolated context. \
574         Modes: single (agent + task), parallel (tasks array), chain (sequential with {previous} placeholder). \
575         Agents are discovered from ~/.oxi/agents/ (user) and .oxi/agents/ (project)."
576    }
577
578    fn parameters_schema(&self) -> Value {
579        json!({
580            "type": "object",
581            "properties": {
582                "agent": {
583                    "type": "string",
584                    "description": "Agent name for single mode"
585                },
586                "task": {
587                    "type": "string",
588                    "description": "Task to delegate (single mode)"
589                },
590                "tasks": {
591                    "type": "array",
592                    "description": "Array of {agent, task} for parallel execution (max 8)",
593                    "items": {
594                        "type": "object",
595                        "properties": {
596                            "agent": { "type": "string" },
597                            "task": { "type": "string" },
598                            "cwd": { "type": "string" }
599                        },
600                        "required": ["agent", "task"]
601                    }
602                },
603                "chain": {
604                    "type": "array",
605                    "description": "Array of {agent, task} for sequential execution. Use {previous} in task for prior output.",
606                    "items": {
607                        "type": "object",
608                        "properties": {
609                            "agent": { "type": "string" },
610                            "task": { "type": "string" },
611                            "cwd": { "type": "string" }
612                        },
613                        "required": ["agent", "task"]
614                    }
615                },
616                "agentScope": {
617                    "type": "string",
618                    "description": "Agent discovery scope: 'user' (default), 'project', or 'both'",
619                    "enum": ["user", "project", "both"],
620                    "default": "user"
621                },
622                "cwd": {
623                    "type": "string",
624                    "description": "Working directory for single mode"
625                }
626            }
627        })
628    }
629
630    fn on_progress(&self, callback: ProgressCallback) {
631        *self.progress_callback.lock() = Some(callback);
632    }
633
634    async fn execute(
635        &self,
636        _tool_call_id: &str,
637        params: Value,
638        signal: Option<oneshot::Receiver<()>>,
639        ctx: &ToolContext,
640    ) -> Result<AgentToolResult, ToolError> {
641        // ── Depth check ──
642        let depth = current_subagent_depth();
643        let max = max_subagent_depth();
644        if depth >= max {
645            return Ok(AgentToolResult::error(format!(
646                "Subagent depth limit reached ({}/{}). \
647                 Increase max_subagent_depth in your agent definition.",
648                depth, max
649            )));
650        }
651
652        // Use explicit cwd if set, else ctx.root()
653        let effective_cwd = self.cwd.as_deref().unwrap_or(ctx.root());
654
655        let scope: AgentScope = params
656            .get("agentScope")
657            .and_then(|v| serde_json::from_value(v.clone()).ok())
658            .unwrap_or(AgentScope::User);
659
660        let agents = discover_agents(effective_cwd, scope);
661        let binary = self.get_binary();
662        let progress = self.progress_callback.lock().clone();
663
664        let has_chain = params["chain"]
665            .as_array()
666            .map(|a| !a.is_empty())
667            .unwrap_or(false);
668        let has_tasks = params["tasks"]
669            .as_array()
670            .map(|a| !a.is_empty())
671            .unwrap_or(false);
672        let has_single = params["agent"].is_string() && params["task"].is_string();
673
674        let mode_count = [has_chain, has_tasks, has_single]
675            .iter()
676            .filter(|&&x| x)
677            .count();
678
679        if mode_count != 1 {
680            let available = agents
681                .iter()
682                .map(|a| format!("{} ({})", a.name, a.source))
683                .collect::<Vec<_>>()
684                .join(", ");
685            return Ok(AgentToolResult::error(format!(
686                "Provide exactly one mode: agent+task, tasks, or chain.\nAvailable agents: {}",
687                if available.is_empty() {
688                    "none".to_string()
689                } else {
690                    available
691                }
692            )));
693        }
694
695        // ── Chain mode ──
696        if has_chain {
697            return execute_chain_mode(effective_cwd, &agents, params, &binary, progress, signal)
698                .await;
699        }
700
701        // ── Parallel mode ──
702        if has_tasks {
703            return execute_parallel_mode(effective_cwd, &agents, params, &binary, progress).await;
704        }
705
706        // ── Single mode ──
707        if has_single {
708            return execute_single_mode(effective_cwd, &agents, params, &binary, progress, signal)
709                .await;
710        }
711
712        Ok(AgentToolResult::error("Invalid parameters".to_string()))
713    }
714}
715
716/// Execute chain mode: sequential agents where each step can reference {previous} output.
717async fn execute_chain_mode(
718    cwd: &Path,
719    agents: &[AgentDefinition],
720    params: Value,
721    binary: &Path,
722    progress: Option<ProgressFn>,
723    signal: Option<oneshot::Receiver<()>>,
724) -> Result<AgentToolResult, ToolError> {
725    let steps: Vec<ChainStep> = serde_json::from_value(params["chain"].clone())
726        .map_err(|e| format!("Invalid chain parameter: {}", e))?;
727    let total = steps.len();
728    let mut results = Vec::new();
729    let mut previous_output = String::new();
730    let mut abort_signal = signal;
731
732    for (i, step) in steps.into_iter().enumerate() {
733        let task = step.task.replace("{previous}", &previous_output);
734        let step_signal = if i == total - 1 {
735            abort_signal.take()
736        } else {
737            None
738        };
739
740        let result = run_single_agent(
741            cwd,
742            agents,
743            &step.agent,
744            &task,
745            step.cwd.as_deref(),
746            Some(i + 1),
747            step_signal,
748            progress.clone(),
749            binary,
750        )
751        .await;
752
753        let is_error = result.exit_code != 0
754            || result.stop_reason.as_deref() == Some("error")
755            || result.stop_reason.as_deref() == Some("aborted");
756
757        if is_error {
758            let agent_name = result.agent.clone();
759            let error_msg = result
760                .error_message
761                .clone()
762                .unwrap_or_else(|| result.stderr.clone());
763            results.push(result);
764            return Ok(AgentToolResult::error(format!(
765                "Chain stopped at step {}/{} ({}): {}",
766                i + 1,
767                total,
768                agent_name,
769                error_msg
770            )));
771        }
772
773        previous_output = result.output.clone();
774        results.push(result);
775    }
776
777    let output = results.last().map(|r| r.output.clone()).unwrap_or_default();
778    Ok(AgentToolResult::success(if output.is_empty() {
779        "(no output)".to_string()
780    } else {
781        output
782    })
783    .with_metadata(json!({
784        "mode": "chain",
785        "steps": results.len(),
786    })))
787}
788
789/// Execute parallel mode: multiple agents running concurrently.
790async fn execute_parallel_mode(
791    cwd: &Path,
792    agents: &[AgentDefinition],
793    params: Value,
794    binary: &Path,
795    progress: Option<ProgressFn>,
796) -> Result<AgentToolResult, ToolError> {
797    let tasks: Vec<ParallelTask> = serde_json::from_value(params["tasks"].clone())
798        .map_err(|e| format!("Invalid tasks parameter: {}", e))?;
799
800    if tasks.len() > MAX_PARALLEL_TASKS {
801        return Ok(AgentToolResult::error(format!(
802            "Too many parallel tasks ({}). Max is {}.",
803            tasks.len(),
804            MAX_PARALLEL_TASKS
805        )));
806    }
807
808    let results = run_parallel(cwd, agents, tasks, binary.to_path_buf(), progress).await;
809
810    let success_count = results.iter().filter(|r| r.exit_code == 0).count();
811    let summaries: Vec<String> = results
812        .iter()
813        .map(|r| {
814            let _preview = truncate_output(&r.output, 100);
815            format!(
816                "[{}]: {}",
817                r.agent,
818                if r.exit_code == 0 {
819                    "completed"
820                } else {
821                    "failed"
822                },
823            )
824        })
825        .collect();
826
827    Ok(AgentToolResult::success(format!(
828        "Parallel: {}/{} succeeded\n\n{}",
829        success_count,
830        results.len(),
831        summaries.join("\n\n")
832    ))
833    .with_metadata(json!({
834        "mode": "parallel",
835        "results": results.iter().map(|r| json!({
836            "agent": r.agent,
837            "exit_code": r.exit_code,
838        })).collect::<Vec<_>>()
839    })))
840}
841
842/// Execute single mode: one agent, one task.
843async fn execute_single_mode(
844    cwd: &Path,
845    agents: &[AgentDefinition],
846    params: Value,
847    binary: &Path,
848    progress: Option<ProgressFn>,
849    signal: Option<oneshot::Receiver<()>>,
850) -> Result<AgentToolResult, ToolError> {
851    let agent_name = params["agent"]
852        .as_str()
853        .ok_or("Missing required parameter: agent")?;
854    let task = params["task"]
855        .as_str()
856        .ok_or("Missing required parameter: task")?;
857    let agent_cwd = params["cwd"].as_str();
858
859    let result = run_single_agent(
860        cwd, agents, agent_name, task, agent_cwd, None, signal, progress, binary,
861    )
862    .await;
863
864    let is_error = result.exit_code != 0
865        || result.stop_reason.as_deref() == Some("error")
866        || result.stop_reason.as_deref() == Some("aborted");
867
868    if is_error {
869        let error_msg = result.error_message.as_deref().unwrap_or(&result.stderr);
870        return Ok(AgentToolResult::error(format!(
871            "Agent {}: {}",
872            result.stop_reason.as_deref().unwrap_or("failed"),
873            error_msg
874        )));
875    }
876
877    Ok(AgentToolResult::success(if result.output.is_empty() {
878        "(no output)".to_string()
879    } else {
880        result.output.clone()
881    })
882    .with_metadata(json!({
883        "mode": "single",
884        "agent": result.agent,
885        "source": result.agent_source,
886        "usage": {
887            "input_tokens": result.usage.input_tokens,
888            "output_tokens": result.usage.output_tokens,
889            "turns": result.usage.turns,
890        },
891    })))
892}
893
894// ── Helpers ────────────────────────────────────────────────────────────
895
/// Shorten `text` to at most `max_chars` characters.
///
/// Text with `max_chars` characters or fewer is returned unchanged. Longer
/// text is cut after `max_chars` characters and `...` is appended. The cut
/// is made on a character boundary, so multi-byte text never causes a panic.
fn truncate_output(text: &str, max_chars: usize) -> String {
    // The byte offset of the character *after* the last one to keep exists
    // only when the text is longer than `max_chars` characters.
    match text.char_indices().nth(max_chars) {
        None => text.to_string(),
        Some((cut, _)) => format!("{}...", &text[..cut]),
    }
}
903
904// ── Tests ──────────────────────────────────────────────────────────────
905
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh `SingleResult` with neutral values, shared by the JSON-line tests.
    fn blank_result() -> SingleResult {
        SingleResult {
            agent: "test".into(),
            agent_source: "user".into(),
            task: "t".into(),
            exit_code: 0,
            output: String::new(),
            stderr: String::new(),
            usage: UsageStats::default(),
            model: None,
            stop_reason: None,
            error_message: None,
            step: None,
        }
    }

    /// A directory without `.oxi/agents` yields no project agents.
    #[test]
    fn test_discover_agents_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let found = discover_agents(tmp.path(), AgentScope::Project);
        assert!(found.is_empty());
    }

    /// Markdown files are picked up as agents; other files are ignored.
    #[test]
    fn test_discover_agents_with_flat_files() {
        let tmp = tempfile::tempdir().unwrap();
        let agents_dir = tmp.path().join(".oxi").join("agents");
        std::fs::create_dir_all(&agents_dir).unwrap();

        let fixtures = [
            (
                "scout.md",
                "---\nname: scout\ndescription: Recon\n---\nBe a scout.",
            ),
            ("worker.md", "---\nname: worker\n---\nBe a worker."),
            ("ignore.txt", "ignore me"),
        ];
        for (file_name, contents) in fixtures {
            std::fs::write(agents_dir.join(file_name), contents).unwrap();
        }

        let found = discover_agents(tmp.path(), AgentScope::Project);
        assert_eq!(found.len(), 2);
        for expected in ["scout", "worker"] {
            assert!(found.iter().any(|a| a.name == expected));
        }
    }

    /// The schema is an object exposing the mode-selecting properties.
    #[test]
    fn test_schema_structure() {
        let schema = SubagentTool::new().parameters_schema();
        assert_eq!(schema["type"], "object");
        for key in ["agent", "tasks", "chain", "agentScope"] {
            assert!(schema["properties"][key].is_object());
        }
    }

    /// Short text is untouched, long text is cut and suffixed with `...`.
    #[test]
    fn test_truncate_output() {
        assert_eq!(truncate_output("hello", 10), "hello");
        assert_eq!(truncate_output("hello world foo", 5), "hello...");
    }

    /// A `text_delta` event appends its text to the accumulator.
    #[test]
    fn test_process_json_line_text_delta() {
        let mut result = blank_result();
        let mut text = String::new();
        let line = r#"{"type":"text_delta","text":"hello"}"#;
        process_json_line(line, &mut result, &mut text, &None);
        assert_eq!(text, "hello");
    }

    /// A `usage` event adds token counts and counts one turn.
    #[test]
    fn test_process_json_line_usage() {
        let mut result = blank_result();
        let mut text = String::new();
        let line = r#"{"type":"usage","input_tokens":100,"output_tokens":50}"#;
        process_json_line(line, &mut result, &mut text, &None);
        assert_eq!(result.usage.input_tokens, 100);
        assert_eq!(result.usage.output_tokens, 50);
        assert_eq!(result.usage.turns, 1);
    }

    /// With both depth variables unset, the depth is 0 and the limit is 3.
    #[test]
    fn test_depth_limit_default() {
        // NOTE(review): this mutates the process environment, which is shared
        // with tests running in parallel — confirm no other test relies on
        // these variables.
        unsafe {
            std::env::remove_var("OXI_SUBAGENT_DEPTH");
            std::env::remove_var("OXI_MAX_SUBAGENT_DEPTH");
        }
        assert_eq!(current_subagent_depth(), 0);
        assert_eq!(max_subagent_depth(), 3);
    }
}