1use super::{AgentTool, AgentToolResult, ProgressCallback, ToolContext, ToolError};
15use crate::agent_definition::{
16 AgentDefinition, AgentDiscovery, AgentScope, current_subagent_depth, max_subagent_depth,
17};
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20use serde_json::{Value, json};
21use std::path::{Path, PathBuf};
22use std::sync::Arc;
23use tokio::io::{AsyncBufReadExt, BufReader};
24use tokio::sync::oneshot;
25
26const MAX_PARALLEL_TASKS: usize = 8;
29const MAX_CONCURRENCY: usize = 4;
30
31type ProgressFn = ProgressCallback;
34
35fn create_system_prompt_temp_dir(prefix: &str) -> Result<PathBuf, String> {
38 let path = std::env::temp_dir().join(format!("{}-{}", prefix, uuid::Uuid::new_v4()));
39 std::fs::create_dir_all(&path).map_err(|e| format!("Failed to create temp dir: {}", e))?;
40 Ok(path)
41}
42
43pub fn discover_agents(cwd: &Path, scope: AgentScope) -> Vec<AgentDefinition> {
47 AgentDiscovery::discover(cwd, scope)
48 .unwrap_or_default()
49 .into_iter()
50 .map(|(_, def)| def)
51 .collect()
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize, Default)]
57pub struct UsageStats {
59 pub input_tokens: u64,
61 pub output_tokens: u64,
63 pub cache_read: u64,
65 pub cache_write: u64,
67 pub cost: f64,
69 pub turns: u32,
71}
72
73#[derive(Debug, Clone)]
74pub struct SingleResult {
76 pub agent: String,
78 pub agent_source: String,
80 pub task: String,
82 pub exit_code: i32,
84 pub output: String,
86 pub stderr: String,
88 pub usage: UsageStats,
90 pub model: Option<String>,
92 pub stop_reason: Option<String>,
94 pub error_message: Option<String>,
96 pub step: Option<usize>,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101#[serde(rename_all = "snake_case")]
102pub enum SubagentMode {
104 Single,
106 Parallel,
108 Chain,
110}
111
112#[derive(Debug, Clone)]
113pub struct SubagentDetails {
115 pub mode: SubagentMode,
117 pub results: Vec<SingleResult>,
119}
120
121fn process_json_line(
124 line: &str,
125 result: &mut SingleResult,
126 text: &mut String,
127 _on_progress: &Option<ProgressFn>,
128) {
129 let event: Value = match serde_json::from_str(line) {
130 Ok(v) => v,
131 Err(_) => return,
132 };
133 match event["type"].as_str().unwrap_or("") {
134 "text_delta" => {
135 if let Some(t) = event["text"].as_str() {
136 text.push_str(t);
137 }
138 }
139 "usage" => {
140 result.usage.input_tokens += event["input_tokens"].as_u64().unwrap_or(0);
141 result.usage.output_tokens += event["output_tokens"].as_u64().unwrap_or(0);
142 result.usage.turns += 1;
143 }
144 "complete" => {
145 result.stop_reason = Some("complete".to_string());
146 }
147 "error" => {
148 result.error_message = Some(
149 event["message"]
150 .as_str()
151 .unwrap_or("Unknown error")
152 .to_string(),
153 );
154 result.stop_reason = Some("error".to_string());
155 }
156 _ => {}
157 }
158}
159
160fn build_agent_args(agent: &AgentDefinition, tmp_dir: &Path, task: &str) -> Vec<String> {
164 let mut args = vec!["--mode".to_string(), "json".to_string(), "-p".to_string()];
165
166 if let Some(ref model) = agent.model {
167 args.push("--model".to_string());
168 args.push(model.clone());
169 }
170
171 if !agent.tools.is_empty() {
172 args.push("--tools".to_string());
173 args.push(agent.tools.join(","));
174 }
175
176 if let Some(ref prompt) = agent.system_prompt
177 && !prompt.is_empty()
178 && std::fs::write(tmp_dir.join("system_prompt.md"), prompt).is_ok()
179 {
180 args.push("--append-system-prompt".to_string());
181 args.push(
182 tmp_dir
183 .join("system_prompt.md")
184 .to_str()
185 .unwrap_or_default()
186 .to_string(),
187 );
188 }
189
190 args.push(format!("Task: {}", task));
191 args
192}
193
194async fn terminate_child(
196 child: &mut tokio::process::Child,
197 stderr_handle: tokio::task::JoinHandle<String>,
198 result: &mut SingleResult,
199) {
200 #[cfg(unix)]
201 {
202 if let Some(pid) = child.id() {
203 unsafe {
207 libc::kill(pid as i32, libc::SIGTERM);
208 }
209 }
210 let deadline = tokio::time::sleep(std::time::Duration::from_secs(5));
211 tokio::pin!(deadline);
212 tokio::select! {
213 _ = &mut deadline => { let _ = child.start_kill(); }
214 _ = child.wait() => {}
215 }
216 }
217 #[cfg(not(unix))]
218 {
219 let _ = child.start_kill();
220 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), child.wait()).await;
221 }
222
223 let _ = tokio::time::timeout(std::time::Duration::from_secs(1), async {
225 if let Ok(err) = stderr_handle.await {
226 result.stderr = err;
227 }
228 })
229 .await;
230}
231
232#[allow(clippy::too_many_arguments)]
234async fn run_single_agent(
235 cwd: &Path,
236 agents: &[AgentDefinition],
237 agent_name: &str,
238 task: &str,
239 agent_cwd: Option<&str>,
240 step: Option<usize>,
241 signal: Option<oneshot::Receiver<()>>,
242 on_progress: Option<ProgressFn>,
243 binary_path: &Path,
244) -> SingleResult {
245 let agent = match agents.iter().find(|a| a.name == agent_name) {
246 Some(a) => a,
247 None => {
248 let available = agents
249 .iter()
250 .map(|a| format!("\"{}\"", a.name))
251 .collect::<Vec<_>>()
252 .join(", ");
253 return SingleResult {
254 agent: agent_name.to_string(),
255 agent_source: "unknown".to_string(),
256 task: task.to_string(),
257 exit_code: 1,
258 output: String::new(),
259 stderr: format!(
260 "Unknown agent: \"{}\". Available: {}",
261 agent_name, available
262 ),
263 usage: UsageStats::default(),
264 model: None,
265 stop_reason: None,
266 error_message: Some(format!("Unknown agent: {}", agent_name)),
267 step,
268 };
269 }
270 };
271
272 let mut result = SingleResult {
273 agent: agent_name.to_string(),
274 agent_source: agent.source.clone(),
275 task: task.to_string(),
276 exit_code: 0,
277 output: String::new(),
278 stderr: String::new(),
279 usage: UsageStats::default(),
280 model: agent.model.clone(),
281 stop_reason: None,
282 error_message: None,
283 step,
284 };
285
286 if let Some(ref cb) = on_progress {
288 cb(format!("[{}] running...", agent_name));
289 }
290
291 let tmp_dir = match create_system_prompt_temp_dir("oxi-subagent") {
293 Ok(tmp) => Some(tmp),
294 Err(e) => {
295 result.exit_code = 1;
296 result.stderr = e.clone();
297 result.error_message = Some(e);
298 return result;
299 }
300 };
301
302 let args = match tmp_dir {
303 Some(ref tmp) => build_agent_args(agent, tmp, task),
304 None => vec![
305 "--mode".to_string(),
306 "json".to_string(),
307 "-p".to_string(),
308 format!("Task: {}", task),
309 ],
310 };
311
312 let working_dir = agent_cwd
313 .map(PathBuf::from)
314 .unwrap_or_else(|| cwd.to_path_buf());
315
316 let mut cmd = tokio::process::Command::new(binary_path);
317 cmd.args(&args)
318 .current_dir(&working_dir)
319 .stdout(std::process::Stdio::piped())
320 .stderr(std::process::Stdio::piped())
321 .stdin(std::process::Stdio::null())
322 .env(
324 "OXI_SUBAGENT_DEPTH",
325 (current_subagent_depth() + 1).to_string(),
326 )
327 .env(
328 "OXI_MAX_SUBAGENT_DEPTH",
329 agent.max_subagent_depth.to_string(),
330 );
331
332 let mut child = match cmd.spawn() {
333 Ok(c) => c,
334 Err(e) => {
335 result.exit_code = 1;
336 result.stderr = format!("Failed to spawn: {}", e);
337 result.error_message = Some(format!("Failed to spawn: {}", e));
338 return result;
339 }
340 };
341
342 let stdout = child.stdout.take().expect("stdout piped but missing");
343 let stderr = child.stderr.take().expect("stderr piped but missing");
344
345 let (line_tx, mut line_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
347 let _reader_handle = tokio::spawn(async move {
348 let reader = BufReader::new(stdout);
349 let mut lines = reader.lines();
350 while let Ok(Some(line)) = lines.next_line().await {
351 if line_tx.send(line).is_err() {
352 break;
353 }
354 }
355 });
356
357 let stderr_handle = tokio::spawn(async move {
359 let mut err = String::new();
360 let reader = BufReader::new(stderr);
361 let mut lines = reader.lines();
362 while let Ok(Some(line)) = lines.next_line().await {
363 err.push_str(&line);
364 err.push('\n');
365 }
366 err
367 });
368
369 let mut final_text = String::new();
371 let mut signal_rx = signal;
372 let mut aborted = false;
373
374 loop {
375 tokio::select! {
376 line = line_rx.recv() => {
377 match line {
378 Some(line) => {
379 process_json_line(&line, &mut result, &mut final_text, &on_progress);
380 }
381 None => break, }
383 }
384 _ = async {
385 match &mut signal_rx {
386 Some(rx) => { let _ = rx.await; }
387 None => std::future::pending::<()>().await,
388 }
389 } => {
390 aborted = true;
391 break;
392 }
393 }
394 }
395
396 if aborted {
397 result.stop_reason = Some("aborted".into());
398 result.error_message = Some("Aborted by user".into());
399 terminate_child(&mut child, stderr_handle, &mut result).await;
400 } else {
401 if let Ok(err_output) = stderr_handle.await {
403 result.stderr = err_output;
404 }
405 match child.wait().await {
406 Ok(status) => result.exit_code = status.code().unwrap_or(1),
407 Err(_) => result.exit_code = 1,
408 }
409 }
410
411 result.output = final_text;
412
413 if let Some(ref cb) = on_progress {
414 let status = if result.exit_code == 0 {
415 "done"
416 } else {
417 "failed"
418 };
419 cb(format!("[{}] {}", agent_name, status));
420 }
421
422 result
423}
424
425async fn run_parallel(
427 cwd: &Path,
428 agents: &[AgentDefinition],
429 tasks: Vec<ParallelTask>,
430 binary_path: PathBuf,
431 on_progress: Option<ProgressFn>,
432) -> Vec<SingleResult> {
433 let n = tasks.len();
434 if n == 0 {
435 return vec![];
436 }
437
438 let limit = MAX_CONCURRENCY.min(n);
439 let indexed_tasks: Vec<(usize, ParallelTask)> = tasks.into_iter().enumerate().collect();
440 let mut all_results: Vec<Option<SingleResult>> = vec![None; n];
441
442 let mut i = 0;
443 while i < indexed_tasks.len() {
444 let end = (i + limit).min(indexed_tasks.len());
445 let chunk: Vec<_> = indexed_tasks[i..end].to_vec();
446 let mut handles = Vec::new();
447
448 for (idx, task) in chunk {
449 let agents = agents.to_vec();
450 let cwd = cwd.to_path_buf();
451 let bp = binary_path.clone();
452 let progress = on_progress.clone();
453
454 handles.push((
455 idx,
456 tokio::spawn(async move {
457 run_single_agent(
458 &cwd,
459 &agents,
460 &task.agent,
461 &task.task,
462 task.cwd.as_deref(),
463 None,
464 None,
465 progress,
466 &bp,
467 )
468 .await
469 }),
470 ));
471 }
472
473 for (idx, handle) in handles {
474 if let Ok(r) = handle.await {
475 all_results[idx] = Some(r);
476 }
477 }
478
479 i = end;
480 }
481
482 all_results
483 .into_iter()
484 .map(|r| {
485 r.unwrap_or_else(|| SingleResult {
486 agent: "unknown".to_string(),
487 agent_source: "unknown".to_string(),
488 task: "unknown".to_string(),
489 exit_code: 1,
490 output: String::new(),
491 stderr: "Task did not complete".to_string(),
492 usage: UsageStats::default(),
493 model: None,
494 stop_reason: Some("error".to_string()),
495 error_message: Some("Task did not complete".to_string()),
496 step: None,
497 })
498 })
499 .collect()
500}
501
502#[derive(Debug, Deserialize, Clone)]
505struct ParallelTask {
506 agent: String,
507 task: String,
508 #[serde(default)]
509 cwd: Option<String>,
510}
511
512#[derive(Debug, Deserialize)]
513struct ChainStep {
514 agent: String,
515 task: String,
516 #[serde(default)]
517 cwd: Option<String>,
518}
519
520pub struct SubagentTool {
524 cwd: Option<PathBuf>,
526 binary_path: Option<PathBuf>,
527 progress_callback: parking_lot::Mutex<Option<ProgressFn>>,
528}
529
530impl Default for SubagentTool {
531 fn default() -> Self {
532 Self::new()
533 }
534}
535
536impl SubagentTool {
537 pub fn new() -> Self {
539 Self {
540 cwd: None,
541 binary_path: None,
542 progress_callback: parking_lot::Mutex::new(None),
543 }
544 }
545
546 pub fn with_cwd(cwd: impl Into<PathBuf>) -> Self {
548 Self {
549 cwd: Some(cwd.into()),
550 binary_path: None,
551 progress_callback: parking_lot::Mutex::new(None),
552 }
553 }
554
555 fn get_binary(&self) -> PathBuf {
556 self.binary_path
557 .clone()
558 .or_else(|| std::env::current_exe().ok())
559 .unwrap_or_else(|| PathBuf::from("oxi"))
560 }
561}
562
563#[async_trait]
564impl AgentTool for SubagentTool {
565 fn name(&self) -> &str {
566 "subagent"
567 }
568
569 fn label(&self) -> &str {
570 "Subagent"
571 }
572
573 fn description(&self) -> &str {
574 "Delegate tasks to specialized subagents with isolated context. \
575 Modes: single (agent + task), parallel (tasks array), chain (sequential with {previous} placeholder). \
576 Agents are discovered from ~/.oxi/agents/ (user) and .oxi/agents/ (project)."
577 }
578
579 fn parameters_schema(&self) -> Value {
580 json!({
581 "type": "object",
582 "properties": {
583 "agent": {
584 "type": "string",
585 "description": "Agent name for single mode"
586 },
587 "task": {
588 "type": "string",
589 "description": "Task to delegate (single mode)"
590 },
591 "tasks": {
592 "type": "array",
593 "description": "Array of {agent, task} for parallel execution (max 8)",
594 "items": {
595 "type": "object",
596 "properties": {
597 "agent": { "type": "string" },
598 "task": { "type": "string" },
599 "cwd": { "type": "string" }
600 },
601 "required": ["agent", "task"]
602 }
603 },
604 "chain": {
605 "type": "array",
606 "description": "Array of {agent, task} for sequential execution. Use {previous} in task for prior output.",
607 "items": {
608 "type": "object",
609 "properties": {
610 "agent": { "type": "string" },
611 "task": { "type": "string" },
612 "cwd": { "type": "string" }
613 },
614 "required": ["agent", "task"]
615 }
616 },
617 "agentScope": {
618 "type": "string",
619 "description": "Agent discovery scope: 'user' (default), 'project', or 'both'",
620 "enum": ["user", "project", "both"],
621 "default": "user"
622 },
623 "cwd": {
624 "type": "string",
625 "description": "Working directory for single mode"
626 }
627 }
628 })
629 }
630
631 fn on_progress(&self, callback: ProgressCallback) {
632 *self.progress_callback.lock() = Some(callback);
633 }
634
635 async fn execute(
636 &self,
637 _tool_call_id: &str,
638 params: Value,
639 signal: Option<oneshot::Receiver<()>>,
640 ctx: &ToolContext,
641 ) -> Result<AgentToolResult, ToolError> {
642 let runner = ctx.subagent_runner.clone();
649 let depth = if runner.is_some() {
650 ctx.subagent_depth
651 } else {
652 current_subagent_depth()
653 };
654 let max = if runner.is_some() {
655 3 } else {
657 max_subagent_depth()
658 };
659 if depth >= max {
660 return Ok(AgentToolResult::error(format!(
661 "Subagent depth limit reached ({}/{}). \
662 Increase max_subagent_depth in your agent definition.",
663 depth, max
664 )));
665 }
666
667 let effective_cwd = self.cwd.as_deref().unwrap_or(ctx.root());
669
670 let scope: AgentScope = params
671 .get("agentScope")
672 .and_then(|v| serde_json::from_value(v.clone()).ok())
673 .unwrap_or(AgentScope::User);
674
675 let agents = discover_agents(effective_cwd, scope);
676 let progress = self.progress_callback.lock().clone();
677
678 let has_chain = params["chain"]
679 .as_array()
680 .map(|a| !a.is_empty())
681 .unwrap_or(false);
682 let has_tasks = params["tasks"]
683 .as_array()
684 .map(|a| !a.is_empty())
685 .unwrap_or(false);
686 let has_single = params["agent"].is_string() && params["task"].is_string();
687
688 let mode_count = [has_chain, has_tasks, has_single]
689 .iter()
690 .filter(|&&x| x)
691 .count();
692
693 if mode_count != 1 {
694 let available = agents
695 .iter()
696 .map(|a| format!("{} ({})", a.name, a.source))
697 .collect::<Vec<_>>()
698 .join(", ");
699 return Ok(AgentToolResult::error(format!(
700 "Provide exactly one mode: agent+task, tasks, or chain.\nAvailable agents: {}",
701 if available.is_empty() {
702 "none".to_string()
703 } else {
704 available
705 }
706 )));
707 }
708
709 if let Some(runner) = &runner {
715 return execute_in_process(
716 effective_cwd,
717 &agents,
718 params,
719 runner,
720 depth,
721 progress,
722 signal,
723 )
724 .await;
725 }
726
727 let binary = self.get_binary();
729
730 if has_chain {
732 return execute_chain_mode(effective_cwd, &agents, params, &binary, progress, signal)
733 .await;
734 }
735
736 if has_tasks {
738 return execute_parallel_mode(effective_cwd, &agents, params, &binary, progress).await;
739 }
740
741 if has_single {
743 return execute_single_mode(effective_cwd, &agents, params, &binary, progress, signal)
744 .await;
745 }
746
747 Ok(AgentToolResult::error("Invalid parameters".to_string()))
748 }
749}
750
751fn fork_to_single(
762 fork: super::ForkResult,
763 agent_name: &str,
764 task: &str,
765 step: Option<usize>,
766) -> SingleResult {
767 let error = fork.error.clone();
768 SingleResult {
769 agent: agent_name.to_string(),
770 agent_source: "in-process".to_string(),
771 task: task.to_string(),
772 exit_code: if error.is_some() { 1 } else { 0 },
773 output: fork.text,
774 stderr: String::new(),
775 usage: UsageStats {
776 input_tokens: fork.input_tokens as u64,
777 output_tokens: fork.output_tokens as u64,
778 turns: fork.turns,
779 ..Default::default()
780 },
781 model: fork.model,
782 stop_reason: if error.is_some() {
783 Some("error".to_string())
784 } else {
785 Some("complete".to_string())
786 },
787 error_message: error,
788 step,
789 }
790}
791
792#[allow(clippy::too_many_arguments)]
794async fn execute_in_process(
795 cwd: &Path,
796 agents: &[AgentDefinition],
797 params: Value,
798 runner: &Arc<dyn super::SubagentRunner>,
799 depth: u8,
800 progress: Option<ProgressFn>,
801 _signal: Option<oneshot::Receiver<()>>,
802) -> Result<AgentToolResult, ToolError> {
803 let has_chain = params["chain"]
804 .as_array()
805 .map(|a| !a.is_empty())
806 .unwrap_or(false);
807 let has_tasks = params["tasks"]
808 .as_array()
809 .map(|a| !a.is_empty())
810 .unwrap_or(false);
811 let has_single = params["agent"].is_string() && params["task"].is_string();
812
813 if has_single {
815 let agent_name = params["agent"].as_str().unwrap_or("");
816 let task = params["task"].as_str().unwrap_or("");
817 let agent_def = agents.iter().find(|a| a.name == agent_name);
818
819 if let Some(ref cb) = progress {
820 cb(format!("[{}] running (in-process)...", agent_name));
821 }
822
823 let fork = runner
824 .run_isolated(
825 agent_name,
826 task,
827 agent_def.and_then(|d| d.system_prompt.as_deref()),
828 agent_def.and_then(|d| d.model.as_deref()),
829 agent_def.map(|d| d.tools.as_slice()).unwrap_or(&[]),
830 cwd,
831 depth,
832 )
833 .await
834 .map_err(|e| format!("In-process subagent failed: {e}"))?;
835
836 let result = fork_to_single(fork, agent_name, task, None);
837 let is_error = result.stop_reason.as_deref() == Some("error");
838
839 if is_error {
840 let error_msg = result.error_message.as_deref().unwrap_or("unknown error");
841 return Ok(AgentToolResult::error(format!("Agent failed: {error_msg}")));
842 }
843
844 return Ok(AgentToolResult::success(if result.output.is_empty() {
845 "(no output)".to_string()
846 } else {
847 result.output.clone()
848 })
849 .with_metadata(json!({
850 "mode": "single",
851 "agent": result.agent,
852 "source": result.agent_source,
853 "backend": "in-process",
854 "usage": {
855 "input_tokens": result.usage.input_tokens,
856 "output_tokens": result.usage.output_tokens,
857 "turns": result.usage.turns,
858 },
859 })));
860 }
861
862 if has_tasks {
864 let tasks: Vec<ParallelTask> = serde_json::from_value(params["tasks"].clone())
865 .map_err(|e| format!("Invalid tasks parameter: {e}"))?;
866 let total = tasks.len();
867 if total == 0 {
868 return Ok(AgentToolResult::error("No tasks provided".to_string()));
869 }
870
871 let limit = MAX_CONCURRENCY.min(total);
874 let mut all_results: Vec<SingleResult> = Vec::with_capacity(total);
875 let mut all_errors: Vec<String> = Vec::new();
876
877 for chunk in tasks.chunks(limit) {
878 let mut handles = Vec::new();
879 for task in chunk {
880 let agent_def = agents.iter().find(|a| a.name == task.agent);
881 let runner = Arc::clone(runner);
882 let agent_name = task.agent.clone();
883 let task_text = task.task.clone();
884 let system_prompt = agent_def.and_then(|d| d.system_prompt.clone());
885 let model = agent_def.and_then(|d| d.model.clone());
886 let tools: Vec<String> = agent_def.map(|d| d.tools.clone()).unwrap_or_default();
887 let cwd = cwd.to_path_buf();
888
889 handles.push(tokio::spawn(async move {
890 runner
891 .run_isolated(
892 &agent_name,
893 &task_text,
894 system_prompt.as_deref(),
895 model.as_deref(),
896 &tools,
897 &cwd,
898 depth,
899 )
900 .await
901 }));
902 }
903
904 for (i, handle) in handles.into_iter().enumerate() {
905 let task = &chunk[i];
906 match handle.await {
907 Ok(Ok(fork)) => {
908 all_results.push(fork_to_single(fork, &task.agent, &task.task, None))
909 }
910 Ok(Err(e)) => all_errors.push(format!("{}: {e}", task.agent)),
911 Err(e) => all_errors.push(format!("{}: join error: {e}", task.agent)),
912 }
913 }
914 }
915
916 if !all_errors.is_empty() {
917 return Ok(AgentToolResult::error(format!(
918 "Errors: {}",
919 all_errors.join("; ")
920 )));
921 }
922
923 let success_count = all_results.iter().filter(|r| r.exit_code == 0).count();
924 let summaries: Vec<String> = all_results
925 .iter()
926 .map(|r| format!("[{}] {}", r.agent, r.output))
927 .collect();
928
929 return Ok(AgentToolResult::success(format!(
930 "Parallel: {}/{} succeeded\n\n{}",
931 success_count,
932 all_results.len(),
933 summaries.join("\n\n---\n\n")
934 ))
935 .with_metadata(json!({
936 "mode": "parallel",
937 "backend": "in-process",
938 "results": all_results.iter().map(|r| json!({
939 "agent": r.agent,
940 "exit_code": r.exit_code,
941 })).collect::<Vec<_>>()
942 })));
943 }
944
945 if has_chain {
947 let steps: Vec<ChainStep> = serde_json::from_value(params["chain"].clone())
948 .map_err(|e| format!("Invalid chain parameter: {e}"))?;
949 let total = steps.len();
950 let mut previous_output = String::new();
951 let mut results: Vec<SingleResult> = Vec::new();
952
953 for (i, step) in steps.into_iter().enumerate() {
954 let task = step.task.replace("{previous}", &previous_output);
955 let agent_def = agents.iter().find(|a| a.name == step.agent);
956
957 if let Some(ref cb) = progress {
958 cb(format!("[{}] chain step {}/{}", step.agent, i + 1, total));
959 }
960
961 let fork = runner
962 .run_isolated(
963 &step.agent,
964 &task,
965 agent_def.and_then(|d| d.system_prompt.as_deref()),
966 agent_def.and_then(|d| d.model.as_deref()),
967 agent_def.map(|d| d.tools.as_slice()).unwrap_or(&[]),
968 cwd,
969 depth,
970 )
971 .await;
972
973 let fork = match fork {
974 Ok(f) => f,
975 Err(e) => {
976 return Ok(AgentToolResult::error(format!(
977 "Chain stopped at step {}/{} ({}): {e}",
978 i + 1,
979 total,
980 step.agent
981 )));
982 }
983 };
984
985 let is_error = fork.error.is_some();
986 let result = fork_to_single(fork, &step.agent, &task, Some(i + 1));
987
988 if is_error {
989 let error_msg = result.error_message.clone().unwrap_or_default();
990 return Ok(AgentToolResult::error(format!(
991 "Chain stopped at step {}/{} ({}): {error_msg}",
992 i + 1,
993 total,
994 step.agent
995 )));
996 }
997
998 previous_output = result.output.clone();
999 results.push(result);
1000 }
1001
1002 let output = results.last().map(|r| r.output.clone()).unwrap_or_default();
1003 return Ok(AgentToolResult::success(if output.is_empty() {
1004 "(no output)".to_string()
1005 } else {
1006 output
1007 })
1008 .with_metadata(json!({
1009 "mode": "chain",
1010 "backend": "in-process",
1011 "steps": results.len(),
1012 })));
1013 }
1014
1015 Ok(AgentToolResult::error("Invalid parameters".to_string()))
1016}
1017
1018async fn execute_chain_mode(
1020 cwd: &Path,
1021 agents: &[AgentDefinition],
1022 params: Value,
1023 binary: &Path,
1024 progress: Option<ProgressFn>,
1025 signal: Option<oneshot::Receiver<()>>,
1026) -> Result<AgentToolResult, ToolError> {
1027 let steps: Vec<ChainStep> = serde_json::from_value(params["chain"].clone())
1028 .map_err(|e| format!("Invalid chain parameter: {}", e))?;
1029 let total = steps.len();
1030 let mut results = Vec::new();
1031 let mut previous_output = String::new();
1032 let mut abort_signal = signal;
1033
1034 for (i, step) in steps.into_iter().enumerate() {
1035 let task = step.task.replace("{previous}", &previous_output);
1036 let step_signal = if i == total - 1 {
1037 abort_signal.take()
1038 } else {
1039 None
1040 };
1041
1042 let result = run_single_agent(
1043 cwd,
1044 agents,
1045 &step.agent,
1046 &task,
1047 step.cwd.as_deref(),
1048 Some(i + 1),
1049 step_signal,
1050 progress.clone(),
1051 binary,
1052 )
1053 .await;
1054
1055 let is_error = result.exit_code != 0
1056 || result.stop_reason.as_deref() == Some("error")
1057 || result.stop_reason.as_deref() == Some("aborted");
1058
1059 if is_error {
1060 let agent_name = result.agent.clone();
1061 let error_msg = result
1062 .error_message
1063 .clone()
1064 .unwrap_or_else(|| result.stderr.clone());
1065 results.push(result);
1066 return Ok(AgentToolResult::error(format!(
1067 "Chain stopped at step {}/{} ({}): {}",
1068 i + 1,
1069 total,
1070 agent_name,
1071 error_msg
1072 )));
1073 }
1074
1075 previous_output = result.output.clone();
1076 results.push(result);
1077 }
1078
1079 let output = results.last().map(|r| r.output.clone()).unwrap_or_default();
1080 Ok(AgentToolResult::success(if output.is_empty() {
1081 "(no output)".to_string()
1082 } else {
1083 output
1084 })
1085 .with_metadata(json!({
1086 "mode": "chain",
1087 "steps": results.len(),
1088 })))
1089}
1090
1091async fn execute_parallel_mode(
1093 cwd: &Path,
1094 agents: &[AgentDefinition],
1095 params: Value,
1096 binary: &Path,
1097 progress: Option<ProgressFn>,
1098) -> Result<AgentToolResult, ToolError> {
1099 let tasks: Vec<ParallelTask> = serde_json::from_value(params["tasks"].clone())
1100 .map_err(|e| format!("Invalid tasks parameter: {}", e))?;
1101
1102 if tasks.len() > MAX_PARALLEL_TASKS {
1103 return Ok(AgentToolResult::error(format!(
1104 "Too many parallel tasks ({}). Max is {}.",
1105 tasks.len(),
1106 MAX_PARALLEL_TASKS
1107 )));
1108 }
1109
1110 let results = run_parallel(cwd, agents, tasks, binary.to_path_buf(), progress).await;
1111
1112 let success_count = results.iter().filter(|r| r.exit_code == 0).count();
1113 let summaries: Vec<String> = results
1114 .iter()
1115 .map(|r| {
1116 let _preview = truncate_output(&r.output, 100);
1117 format!(
1118 "[{}]: {}",
1119 r.agent,
1120 if r.exit_code == 0 {
1121 "completed"
1122 } else {
1123 "failed"
1124 },
1125 )
1126 })
1127 .collect();
1128
1129 Ok(AgentToolResult::success(format!(
1130 "Parallel: {}/{} succeeded\n\n{}",
1131 success_count,
1132 results.len(),
1133 summaries.join("\n\n")
1134 ))
1135 .with_metadata(json!({
1136 "mode": "parallel",
1137 "results": results.iter().map(|r| json!({
1138 "agent": r.agent,
1139 "exit_code": r.exit_code,
1140 })).collect::<Vec<_>>()
1141 })))
1142}
1143
1144async fn execute_single_mode(
1146 cwd: &Path,
1147 agents: &[AgentDefinition],
1148 params: Value,
1149 binary: &Path,
1150 progress: Option<ProgressFn>,
1151 signal: Option<oneshot::Receiver<()>>,
1152) -> Result<AgentToolResult, ToolError> {
1153 let agent_name = params["agent"]
1154 .as_str()
1155 .ok_or("Missing required parameter: agent")?;
1156 let task = params["task"]
1157 .as_str()
1158 .ok_or("Missing required parameter: task")?;
1159 let agent_cwd = params["cwd"].as_str();
1160
1161 let result = run_single_agent(
1162 cwd, agents, agent_name, task, agent_cwd, None, signal, progress, binary,
1163 )
1164 .await;
1165
1166 let is_error = result.exit_code != 0
1167 || result.stop_reason.as_deref() == Some("error")
1168 || result.stop_reason.as_deref() == Some("aborted");
1169
1170 if is_error {
1171 let error_msg = result.error_message.as_deref().unwrap_or(&result.stderr);
1172 return Ok(AgentToolResult::error(format!(
1173 "Agent {}: {}",
1174 result.stop_reason.as_deref().unwrap_or("failed"),
1175 error_msg
1176 )));
1177 }
1178
1179 Ok(AgentToolResult::success(if result.output.is_empty() {
1180 "(no output)".to_string()
1181 } else {
1182 result.output.clone()
1183 })
1184 .with_metadata(json!({
1185 "mode": "single",
1186 "agent": result.agent,
1187 "source": result.agent_source,
1188 "usage": {
1189 "input_tokens": result.usage.input_tokens,
1190 "output_tokens": result.usage.output_tokens,
1191 "turns": result.usage.turns,
1192 },
1193 })))
1194}
1195
/// Shortens `text` to at most `max_chars` characters, appending `...` when
/// anything was cut off.
///
/// The limit counts Unicode scalar values, not bytes, so the cut always falls
/// on a character boundary and multi-byte text cannot cause a slicing panic.
/// Text that already fits is returned unchanged.
fn truncate_output(text: &str, max_chars: usize) -> String {
    // The byte offset of the (max_chars + 1)-th character is where to cut;
    // if there is no such character the text is short enough.
    match text.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}...", &text[..cut]),
        None => text.to_string(),
    }
}
1205
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty result fixture for the JSON-line tests.
    fn blank_result() -> SingleResult {
        SingleResult {
            agent: "test".into(),
            agent_source: "user".into(),
            task: "t".into(),
            exit_code: 0,
            output: String::new(),
            stderr: String::new(),
            usage: UsageStats::default(),
            model: None,
            stop_reason: None,
            error_message: None,
            step: None,
        }
    }

    /// A project directory without agent files yields no agents.
    #[test]
    fn test_discover_agents_empty_dir() {
        let project = tempfile::tempdir().unwrap();
        let found = discover_agents(project.path(), AgentScope::Project);
        assert!(found.is_empty());
    }

    /// Markdown files in `.oxi/agents` are discovered; other files are ignored.
    #[test]
    fn test_discover_agents_with_flat_files() {
        let project = tempfile::tempdir().unwrap();
        let agents_dir = project.path().join(".oxi").join("agents");
        std::fs::create_dir_all(&agents_dir).unwrap();

        let fixtures = [
            (
                "scout.md",
                "---\nname: scout\ndescription: Recon\n---\nBe a scout.",
            ),
            ("worker.md", "---\nname: worker\n---\nBe a worker."),
            ("ignore.txt", "ignore me"),
        ];
        for (file_name, contents) in fixtures {
            std::fs::write(agents_dir.join(file_name), contents).unwrap();
        }

        let found = discover_agents(project.path(), AgentScope::Project);
        assert_eq!(found.len(), 2);
        for expected in ["scout", "worker"] {
            assert!(found.iter().any(|a| a.name == expected));
        }
    }

    /// The schema is an object exposing the mode-selecting properties.
    #[test]
    fn test_schema_structure() {
        let schema = SubagentTool::new().parameters_schema();
        assert_eq!(schema["type"], "object");
        for property in ["agent", "tasks", "chain", "agentScope"] {
            assert!(schema["properties"][property].is_object());
        }
    }

    /// Short text passes through; long text is cut and gets an ellipsis.
    #[test]
    fn test_truncate_output() {
        assert_eq!(truncate_output("hello", 10), "hello");
        assert_eq!(truncate_output("hello world foo", 5), "hello...");
    }

    /// A `text_delta` event appends its text to the accumulator.
    #[test]
    fn test_process_json_line_text_delta() {
        let mut result = blank_result();
        let mut text = String::new();
        process_json_line(
            r#"{"type":"text_delta","text":"hello"}"#,
            &mut result,
            &mut text,
            &None,
        );
        assert_eq!(text, "hello");
    }

    /// A `usage` event adds its token counts and counts as one turn.
    #[test]
    fn test_process_json_line_usage() {
        let mut result = blank_result();
        let mut text = String::new();
        process_json_line(
            r#"{"type":"usage","input_tokens":100,"output_tokens":50}"#,
            &mut result,
            &mut text,
            &None,
        );
        assert_eq!(result.usage.input_tokens, 100);
        assert_eq!(result.usage.output_tokens, 50);
        assert_eq!(result.usage.turns, 1);
    }

    /// With both environment variables unset, depth is 0 and the limit is 3.
    #[test]
    fn test_depth_limit_default() {
        // SAFETY: NOTE(review): mutating the process environment is only sound
        // while no other thread reads or writes it; tests run in parallel by
        // default, so confirm nothing else touches these variables.
        unsafe {
            std::env::remove_var("OXI_SUBAGENT_DEPTH");
            std::env::remove_var("OXI_MAX_SUBAGENT_DEPTH");
        }
        assert_eq!(current_subagent_depth(), 0);
        assert_eq!(max_subagent_depth(), 3);
    }
}