1use super::{AgentTool, AgentToolResult, ProgressCallback, ToolContext, ToolError};
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use std::collections::HashMap;
15use std::path::{Path, PathBuf};
16use tokio::io::{AsyncBufReadExt, BufReader};
17use tokio::sync::oneshot;
18
/// Upper bound on the number of entries accepted in the `tasks` array of a
/// parallel request (checked in `execute_parallel_mode`).
const MAX_PARALLEL_TASKS: usize = 8;
/// Maximum number of subagent processes that `run_parallel` spawns per batch.
const MAX_CONCURRENCY: usize = 4;

/// Module-local alias for the progress callback type supplied by the parent module.
type ProgressFn = ProgressCallback;
27
28fn create_system_prompt_temp_dir(prefix: &str) -> Result<PathBuf, String> {
31 let path = std::env::temp_dir().join(format!("{}-{}", prefix, uuid::Uuid::new_v4()));
32 std::fs::create_dir_all(&path).map_err(|e| format!("Failed to create temp dir: {}", e))?;
33 Ok(path)
34}
35
/// Selects which agent directories are searched by [`discover_agents`].
///
/// (De)serialized in `snake_case`, i.e. `"user"`, `"project"` or `"both"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[derive(Default)]
pub enum AgentScope {
    /// Only the per-user directory `~/.oxi/agents` (the default).
    #[default]
    User,
    /// Only the nearest `.oxi/agents` directory found by walking up from the
    /// working directory.
    Project,
    /// Both locations; the user directory is loaded first.
    Both,
}
51
/// A subagent definition loaded from a markdown file with optional frontmatter.
#[derive(Debug, Clone)]
pub struct AgentConfig {
    /// Name used to select the agent; taken from the `name` frontmatter key,
    /// falling back to the file stem.
    pub name: String,
    /// Free-form description from the `description` frontmatter key (may be empty).
    pub description: String,
    /// Optional model identifier, forwarded to the child as `--model`.
    pub model: Option<String>,
    /// Optional tool names parsed from the comma-separated `tools` key,
    /// forwarded to the child as `--tools`.
    pub tools: Option<Vec<String>>,
    /// Trimmed markdown body of the agent file, appended to the child's system prompt.
    pub system_prompt: String,
    /// Where the agent was discovered (`"user"` or `"project"`); empty until
    /// set by the directory loader.
    pub source: String,
}
68
69pub fn discover_agents(cwd: &Path, scope: AgentScope) -> Vec<AgentConfig> {
71 let mut agents = Vec::new();
72 let mut seen_names = std::collections::HashSet::new();
73
74 if scope == AgentScope::User || scope == AgentScope::Both {
76 if let Some(home) = dirs::home_dir() {
77 let user_dir = home.join(".oxi").join("agents");
78 load_agents_from_dir(&user_dir, "user", &mut agents, &mut seen_names);
79 }
80 }
81
82 if scope == AgentScope::Project || scope == AgentScope::Both {
84 if let Some(project_dir) = find_project_agents_dir(cwd) {
85 load_agents_from_dir(&project_dir, "project", &mut agents, &mut seen_names);
86 }
87 }
88
89 agents
90}
91
/// Locates the nearest `.oxi/agents` directory at or above `cwd`.
///
/// Each ancestor is checked in turn, starting with `cwd` itself. The search
/// ends without a result once a directory containing `.git` has been examined,
/// or when the filesystem root has been passed.
fn find_project_agents_dir(cwd: &Path) -> Option<PathBuf> {
    for dir in cwd.ancestors() {
        let agents_dir = dir.join(".oxi").join("agents");
        if agents_dir.is_dir() {
            return Some(agents_dir);
        }
        // A `.git` entry marks the repository root: do not look any higher.
        if dir.join(".git").exists() {
            break;
        }
    }
    None
}
108
109fn load_agents_from_dir(
110 dir: &Path,
111 source: &str,
112 agents: &mut Vec<AgentConfig>,
113 seen: &mut std::collections::HashSet<String>,
114) {
115 let entries = match std::fs::read_dir(dir) {
116 Ok(e) => e,
117 Err(_) => return,
118 };
119
120 for entry in entries.flatten() {
121 let path = entry.path();
122 if path.extension().and_then(|e| e.to_str()) != Some("md") {
123 continue;
124 }
125
126 let name = path
127 .file_stem()
128 .and_then(|s| s.to_str())
129 .unwrap_or("")
130 .to_string();
131
132 if name.is_empty() || seen.contains(&name) {
133 continue;
134 }
135
136 match parse_agent_file(&path) {
137 Ok(config) => {
138 seen.insert(name.clone());
139 let mut config = config;
140 config.source = source.to_string();
141 agents.push(config);
142 }
143 Err(e) => {
144 tracing::warn!("Failed to parse agent {}: {}", path.display(), e);
145 }
146 }
147 }
148}
149
150fn parse_agent_file(path: &Path) -> Result<AgentConfig, String> {
152 let content = std::fs::read_to_string(path).map_err(|e| format!("Failed to read: {}", e))?;
153
154 let (frontmatter, body) = parse_frontmatter(&content);
155
156 let name = frontmatter.get("name").cloned().unwrap_or_else(|| {
157 path.file_stem()
158 .and_then(|s| s.to_str())
159 .unwrap_or("unknown")
160 .to_string()
161 });
162
163 let description = frontmatter.get("description").cloned().unwrap_or_default();
164
165 let model = frontmatter.get("model").cloned();
166
167 let tools = frontmatter.get("tools").map(|s| {
168 s.split(',')
169 .map(|t| t.trim().to_string())
170 .filter(|t| !t.is_empty())
171 .collect()
172 });
173
174 Ok(AgentConfig {
175 name,
176 description,
177 model,
178 tools,
179 system_prompt: body.trim().to_string(),
180 source: String::new(),
181 })
182}
183
/// Splits a document into its `---`-delimited frontmatter and the body.
///
/// Only flat `key: value` lines are understood. Within the frontmatter:
/// * blank lines and `#` comment lines are ignored,
/// * the line is split at the first `:`, so values may contain colons,
/// * one pair of matching surrounding quotes (`"…"` or `'…'`) is removed
///   from the value.
///
/// Returns `(map, body)`. When the document does not start with `---`, or the
/// closing `---` is missing, the map is empty and the body is the unmodified
/// input.
fn parse_frontmatter(content: &str) -> (HashMap<String, String>, String) {
    /// Strips one pair of matching surrounding quotes, if present.
    fn unquote(value: &str) -> &str {
        ['"', '\'']
            .iter()
            .find_map(|&quote| value.strip_prefix(quote)?.strip_suffix(quote))
            .unwrap_or(value)
    }

    let mut map = HashMap::new();

    let after_open = match content.trim_start().strip_prefix("---") {
        Some(rest) => rest,
        None => return (map, content.to_string()),
    };
    let end_idx = match after_open.find("\n---") {
        Some(idx) => idx,
        None => return (map, content.to_string()),
    };

    // Skip the 4 bytes of the closing "\n---" marker.
    let body = after_open[end_idx + 4..].to_string();

    for line in after_open[..end_idx].lines().map(str::trim) {
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        if let Some((key, value)) = line.split_once(':') {
            map.insert(key.trim().to_string(), unquote(value.trim()).to_string());
        }
    }

    (map, body)
}
208
/// Token and turn accounting accumulated for one subagent run.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UsageStats {
    /// Sum of `input_tokens` over all `usage` events received.
    pub input_tokens: u64,
    /// Sum of `output_tokens` over all `usage` events received.
    pub output_tokens: u64,
    /// NOTE(review): presumably cache-read tokens; nothing in this file writes it.
    pub cache_read: u64,
    /// NOTE(review): presumably cache-write tokens; nothing in this file writes it.
    pub cache_write: u64,
    /// NOTE(review): presumably monetary cost; nothing in this file writes it.
    pub cost: f64,
    /// Number of `usage` events received (incremented once per event).
    pub turns: u32,
}
227
/// Outcome of running one subagent process.
#[derive(Debug, Clone)]
pub struct SingleResult {
    /// Name of the agent that was requested.
    pub agent: String,
    /// Source label of the agent config, or `"unknown"` when no agent matched.
    pub agent_source: String,
    /// Task text that was passed to the agent.
    pub task: String,
    /// Exit code of the child process; `1` is used for failures detected locally.
    pub exit_code: i32,
    /// Concatenated text from the child's `text_delta` events.
    pub output: String,
    /// Captured standard error of the child, or a local error message.
    pub stderr: String,
    /// Usage counters accumulated from the child's `usage` events.
    pub usage: UsageStats,
    /// Model configured for the agent, if any.
    pub model: Option<String>,
    /// Why the run ended: `"complete"`, `"error"` or `"aborted"` when known.
    pub stop_reason: Option<String>,
    /// Error description when the run failed or was aborted.
    pub error_message: Option<String>,
    /// 1-based position within a chain; `None` outside chain mode.
    pub step: Option<usize>,
}
254
/// Execution mode of a subagent request, (de)serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SubagentMode {
    /// One agent running one task.
    Single,
    /// Several independent tasks.
    Parallel,
    /// Sequential steps.
    Chain,
}
266
/// Pairs an execution mode with the per-agent results it produced.
#[derive(Debug, Clone)]
pub struct SubagentDetails {
    /// Mode the request was executed in.
    pub mode: SubagentMode,
    /// One entry per agent run.
    pub results: Vec<SingleResult>,
}
275
276fn process_json_line(
279 line: &str,
280 result: &mut SingleResult,
281 text: &mut String,
282 _on_progress: &Option<ProgressFn>,
283) {
284 let event: Value = match serde_json::from_str(line) {
285 Ok(v) => v,
286 Err(_) => return,
287 };
288 match event["type"].as_str().unwrap_or("") {
289 "text_delta" => {
290 if let Some(t) = event["text"].as_str() {
291 text.push_str(t);
292 }
293 }
294 "usage" => {
295 result.usage.input_tokens += event["input_tokens"].as_u64().unwrap_or(0);
296 result.usage.output_tokens += event["output_tokens"].as_u64().unwrap_or(0);
297 result.usage.turns += 1;
298 }
299 "complete" => {
300 result.stop_reason = Some("complete".to_string());
301 }
302 "error" => {
303 result.error_message = Some(
304 event["message"]
305 .as_str()
306 .unwrap_or("Unknown error")
307 .to_string(),
308 );
309 result.stop_reason = Some("error".to_string());
310 }
311 _ => {}
312 }
313}
314
315fn build_agent_args(agent: &AgentConfig, tmp_dir: &Path, task: &str) -> Vec<String> {
319 let mut args = vec!["--mode".to_string(), "json".to_string(), "-p".to_string()];
320
321 if let Some(ref model) = agent.model {
322 args.push("--model".to_string());
323 args.push(model.clone());
324 }
325
326 if let Some(ref agent_tools) = agent.tools {
327 if !agent_tools.is_empty() {
328 args.push("--tools".to_string());
329 args.push(agent_tools.join(","));
330 }
331 }
332
333 if !agent.system_prompt.is_empty()
334 && std::fs::write(tmp_dir.join("system_prompt.md"), &agent.system_prompt).is_ok()
335 {
336 args.push("--append-system-prompt".to_string());
337 args.push(
338 tmp_dir
339 .join("system_prompt.md")
340 .to_str()
341 .unwrap_or_default()
342 .to_string(),
343 );
344 }
345
346 args.push(format!("Task: {}", task));
347 args
348}
349
/// Stops a running child process and collects whatever stderr it produced.
///
/// On Unix the child is first sent `SIGTERM` and given up to 5 seconds to
/// exit before `start_kill` is issued. On other platforms `start_kill` is
/// issued immediately and the child is awaited for up to 5 seconds.
/// Afterwards the stderr reader task is awaited for at most 1 second; if it
/// finishes in time its output is stored in `result.stderr`.
async fn terminate_child(
    child: &mut tokio::process::Child,
    stderr_handle: tokio::task::JoinHandle<String>,
    result: &mut SingleResult,
) {
    #[cfg(unix)]
    {
        if let Some(pid) = child.id() {
            // SAFETY: `kill` takes only plain integer arguments and touches no
            // memory owned by this program. NOTE(review): this assumes the pid
            // still refers to our child — confirm `child.id()` cannot return a
            // stale pid here.
            unsafe {
                libc::kill(pid as i32, libc::SIGTERM);
            }
        }
        // Grace period: escalate to a forced kill if the child ignores SIGTERM.
        let deadline = tokio::time::sleep(std::time::Duration::from_secs(5));
        tokio::pin!(deadline);
        tokio::select! {
            _ = &mut deadline => { let _ = child.start_kill(); }
            _ = child.wait() => {}
        }
    }
    #[cfg(not(unix))]
    {
        let _ = child.start_kill();
        let _ = tokio::time::timeout(std::time::Duration::from_secs(5), child.wait()).await;
    }

    // Bounded wait so a stuck stderr reader cannot hang the caller.
    let _ = tokio::time::timeout(std::time::Duration::from_secs(1), async {
        if let Ok(err) = stderr_handle.await {
            result.stderr = err;
        }
    })
    .await;
}
387
388#[allow(clippy::too_many_arguments)]
390async fn run_single_agent(
391 cwd: &Path,
392 agents: &[AgentConfig],
393 agent_name: &str,
394 task: &str,
395 agent_cwd: Option<&str>,
396 step: Option<usize>,
397 signal: Option<oneshot::Receiver<()>>,
398 on_progress: Option<ProgressFn>,
399 binary_path: &Path,
400) -> SingleResult {
401 let agent = match agents.iter().find(|a| a.name == agent_name) {
402 Some(a) => a,
403 None => {
404 let available = agents
405 .iter()
406 .map(|a| format!("\"{}\"", a.name))
407 .collect::<Vec<_>>()
408 .join(", ");
409 return SingleResult {
410 agent: agent_name.to_string(),
411 agent_source: "unknown".to_string(),
412 task: task.to_string(),
413 exit_code: 1,
414 output: String::new(),
415 stderr: format!(
416 "Unknown agent: \"{}\". Available: {}",
417 agent_name, available
418 ),
419 usage: UsageStats::default(),
420 model: None,
421 stop_reason: None,
422 error_message: Some(format!("Unknown agent: {}", agent_name)),
423 step,
424 };
425 }
426 };
427
428 let mut result = SingleResult {
429 agent: agent_name.to_string(),
430 agent_source: agent.source.clone(),
431 task: task.to_string(),
432 exit_code: 0,
433 output: String::new(),
434 stderr: String::new(),
435 usage: UsageStats::default(),
436 model: agent.model.clone(),
437 stop_reason: None,
438 error_message: None,
439 step,
440 };
441
442 if let Some(ref cb) = on_progress {
444 cb(format!("[{}] running...", agent_name));
445 }
446
447 let tmp_dir = match create_system_prompt_temp_dir("oxi-subagent") {
449 Ok(tmp) => Some(tmp),
450 Err(e) => {
451 result.exit_code = 1;
452 result.stderr = e.clone();
453 result.error_message = Some(e);
454 return result;
455 }
456 };
457
458 let args = match tmp_dir {
459 Some(ref tmp) => build_agent_args(agent, tmp, task),
460 None => vec![
461 "--mode".to_string(),
462 "json".to_string(),
463 "-p".to_string(),
464 format!("Task: {}", task),
465 ],
466 };
467
468 let working_dir = agent_cwd
469 .map(PathBuf::from)
470 .unwrap_or_else(|| cwd.to_path_buf());
471
472 let mut cmd = tokio::process::Command::new(binary_path);
473 cmd.args(&args)
474 .current_dir(&working_dir)
475 .stdout(std::process::Stdio::piped())
476 .stderr(std::process::Stdio::piped())
477 .stdin(std::process::Stdio::null());
478
479 let mut child = match cmd.spawn() {
480 Ok(c) => c,
481 Err(e) => {
482 result.exit_code = 1;
483 result.stderr = format!("Failed to spawn: {}", e);
484 result.error_message = Some(format!("Failed to spawn: {}", e));
485 return result;
486 }
487 };
488
489 let stdout = child.stdout.take().expect("stdout piped but missing");
490 let stderr = child.stderr.take().expect("stderr piped but missing");
491
492 let (line_tx, mut line_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
494 let _reader_handle = tokio::spawn(async move {
495 let reader = BufReader::new(stdout);
496 let mut lines = reader.lines();
497 while let Ok(Some(line)) = lines.next_line().await {
498 if line_tx.send(line).is_err() {
499 break;
500 }
501 }
502 });
503
504 let stderr_handle = tokio::spawn(async move {
506 let mut err = String::new();
507 let reader = BufReader::new(stderr);
508 let mut lines = reader.lines();
509 while let Ok(Some(line)) = lines.next_line().await {
510 err.push_str(&line);
511 err.push('\n');
512 }
513 err
514 });
515
516 let mut final_text = String::new();
518 let mut signal_rx = signal;
519 let mut aborted = false;
520
521 loop {
522 tokio::select! {
523 line = line_rx.recv() => {
524 match line {
525 Some(line) => {
526 process_json_line(&line, &mut result, &mut final_text, &on_progress);
527 }
528 None => break, }
530 }
531 _ = async {
532 match &mut signal_rx {
533 Some(rx) => { let _ = rx.await; }
534 None => std::future::pending::<()>().await,
535 }
536 } => {
537 aborted = true;
538 break;
539 }
540 }
541 }
542
543 if aborted {
544 result.stop_reason = Some("aborted".into());
545 result.error_message = Some("Aborted by user".into());
546 terminate_child(&mut child, stderr_handle, &mut result).await;
547 } else {
548 if let Ok(err_output) = stderr_handle.await {
550 result.stderr = err_output;
551 }
552 match child.wait().await {
553 Ok(status) => result.exit_code = status.code().unwrap_or(1),
554 Err(_) => result.exit_code = 1,
555 }
556 }
557
558 result.output = final_text;
559
560 if let Some(ref cb) = on_progress {
561 let status = if result.exit_code == 0 {
562 "done"
563 } else {
564 "failed"
565 };
566 cb(format!("[{}] {}", agent_name, status));
567 }
568
569 result
570}
571
572async fn run_parallel(
574 cwd: &Path,
575 agents: &[AgentConfig],
576 tasks: Vec<ParallelTask>,
577 binary_path: PathBuf,
578 on_progress: Option<ProgressFn>,
579) -> Vec<SingleResult> {
580 let n = tasks.len();
581 if n == 0 {
582 return vec![];
583 }
584
585 let limit = MAX_CONCURRENCY.min(n);
586 let indexed_tasks: Vec<(usize, ParallelTask)> = tasks.into_iter().enumerate().collect();
587 let mut all_results: Vec<Option<SingleResult>> = vec![None; n];
588
589 let mut i = 0;
590 while i < indexed_tasks.len() {
591 let end = (i + limit).min(indexed_tasks.len());
592 let chunk: Vec<_> = indexed_tasks[i..end].to_vec();
593 let mut handles = Vec::new();
594
595 for (idx, task) in chunk {
596 let agents = agents.to_vec();
597 let cwd = cwd.to_path_buf();
598 let bp = binary_path.clone();
599 let progress = on_progress.clone();
600
601 handles.push((
602 idx,
603 tokio::spawn(async move {
604 run_single_agent(
605 &cwd,
606 &agents,
607 &task.agent,
608 &task.task,
609 task.cwd.as_deref(),
610 None,
611 None,
612 progress,
613 &bp,
614 )
615 .await
616 }),
617 ));
618 }
619
620 for (idx, handle) in handles {
621 if let Ok(r) = handle.await {
622 all_results[idx] = Some(r);
623 }
624 }
625
626 i = end;
627 }
628
629 all_results
630 .into_iter()
631 .map(|r| {
632 r.unwrap_or_else(|| SingleResult {
633 agent: "unknown".to_string(),
634 agent_source: "unknown".to_string(),
635 task: "unknown".to_string(),
636 exit_code: 1,
637 output: String::new(),
638 stderr: "Task did not complete".to_string(),
639 usage: UsageStats::default(),
640 model: None,
641 stop_reason: Some("error".to_string()),
642 error_message: Some("Task did not complete".to_string()),
643 step: None,
644 })
645 })
646 .collect()
647}
648
/// One entry of the `tasks` array in a parallel request.
#[derive(Debug, Deserialize, Clone)]
struct ParallelTask {
    /// Name of the agent to run.
    agent: String,
    /// Task text for that agent.
    task: String,
    /// Optional working directory for this task; defaults to absent.
    #[serde(default)]
    cwd: Option<String>,
}
658
/// One entry of the `chain` array in a chain request.
#[derive(Debug, Deserialize)]
struct ChainStep {
    /// Name of the agent to run for this step.
    agent: String,
    /// Task text; `{previous}` is replaced with the prior step's output.
    task: String,
    /// Optional working directory for this step; defaults to absent.
    #[serde(default)]
    cwd: Option<String>,
}
666
/// Tool that delegates work to subagent processes.
pub struct SubagentTool {
    /// Working directory override; when `None` the tool context's root is used.
    cwd: Option<PathBuf>,
    /// Executable to spawn; when `None` the current executable is used,
    /// falling back to `oxi`.
    binary_path: Option<PathBuf>,
    /// Progress callback registered via `on_progress`, if any.
    progress_callback: parking_lot::Mutex<Option<ProgressFn>>,
}
676
/// The default tool is the same as [`SubagentTool::new`].
impl Default for SubagentTool {
    fn default() -> Self {
        Self::new()
    }
}
682
683impl SubagentTool {
684 pub fn new() -> Self {
686 Self {
687 cwd: None,
688 binary_path: None,
689 progress_callback: parking_lot::Mutex::new(None),
690 }
691 }
692
693 pub fn with_cwd(cwd: impl Into<PathBuf>) -> Self {
695 Self {
696 cwd: Some(cwd.into()),
697 binary_path: None,
698 progress_callback: parking_lot::Mutex::new(None),
699 }
700 }
701
702 fn get_binary(&self) -> PathBuf {
703 self.binary_path
704 .clone()
705 .or_else(|| std::env::current_exe().ok())
706 .unwrap_or_else(|| PathBuf::from("oxi"))
707 }
708}
709
710#[async_trait]
711impl AgentTool for SubagentTool {
712 fn name(&self) -> &str {
713 "subagent"
714 }
715
716 fn label(&self) -> &str {
717 "Subagent"
718 }
719
720 fn description(&self) -> &str {
721 "Delegate tasks to specialized subagents with isolated context. \
722 Modes: single (agent + task), parallel (tasks array), chain (sequential with {previous} placeholder). \
723 Agents are discovered from ~/.oxi/agents/ (user) and .oxi/agents/ (project)."
724 }
725
726 fn parameters_schema(&self) -> Value {
727 json!({
728 "type": "object",
729 "properties": {
730 "agent": {
731 "type": "string",
732 "description": "Agent name for single mode"
733 },
734 "task": {
735 "type": "string",
736 "description": "Task to delegate (single mode)"
737 },
738 "tasks": {
739 "type": "array",
740 "description": "Array of {agent, task} for parallel execution (max 8)",
741 "items": {
742 "type": "object",
743 "properties": {
744 "agent": { "type": "string" },
745 "task": { "type": "string" },
746 "cwd": { "type": "string" }
747 },
748 "required": ["agent", "task"]
749 }
750 },
751 "chain": {
752 "type": "array",
753 "description": "Array of {agent, task} for sequential execution. Use {previous} in task for prior output.",
754 "items": {
755 "type": "object",
756 "properties": {
757 "agent": { "type": "string" },
758 "task": { "type": "string" },
759 "cwd": { "type": "string" }
760 },
761 "required": ["agent", "task"]
762 }
763 },
764 "agentScope": {
765 "type": "string",
766 "description": "Agent discovery scope: 'user' (default), 'project', or 'both'",
767 "enum": ["user", "project", "both"],
768 "default": "user"
769 },
770 "cwd": {
771 "type": "string",
772 "description": "Working directory for single mode"
773 }
774 }
775 })
776 }
777
778 fn on_progress(&self, callback: ProgressCallback) {
779 *self.progress_callback.lock() = Some(callback);
780 }
781
782 async fn execute(
783 &self,
784 _tool_call_id: &str,
785 params: Value,
786 signal: Option<oneshot::Receiver<()>>,
787 ctx: &ToolContext,
788 ) -> Result<AgentToolResult, ToolError> {
789 let effective_cwd = self.cwd.as_deref().unwrap_or(ctx.root());
791
792 let scope: AgentScope = params
793 .get("agentScope")
794 .and_then(|v| serde_json::from_value(v.clone()).ok())
795 .unwrap_or(AgentScope::User);
796
797 let agents = discover_agents(effective_cwd, scope);
798 let binary = self.get_binary();
799 let progress = self.progress_callback.lock().clone();
800
801 let has_chain = params["chain"]
802 .as_array()
803 .map(|a| !a.is_empty())
804 .unwrap_or(false);
805 let has_tasks = params["tasks"]
806 .as_array()
807 .map(|a| !a.is_empty())
808 .unwrap_or(false);
809 let has_single = params["agent"].is_string() && params["task"].is_string();
810
811 let mode_count = [has_chain, has_tasks, has_single]
812 .iter()
813 .filter(|&&x| x)
814 .count();
815
816 if mode_count != 1 {
817 let available = agents
818 .iter()
819 .map(|a| format!("{} ({})", a.name, a.source))
820 .collect::<Vec<_>>()
821 .join(", ");
822 return Ok(AgentToolResult::error(format!(
823 "Provide exactly one mode: agent+task, tasks, or chain.\nAvailable agents: {}",
824 if available.is_empty() {
825 "none".to_string()
826 } else {
827 available
828 }
829 )));
830 }
831
832 if has_chain {
834 return execute_chain_mode(effective_cwd, &agents, params, &binary, progress, signal)
835 .await;
836 }
837
838 if has_tasks {
840 return execute_parallel_mode(effective_cwd, &agents, params, &binary, progress).await;
841 }
842
843 if has_single {
845 return execute_single_mode(effective_cwd, &agents, params, &binary, progress, signal)
846 .await;
847 }
848
849 Ok(AgentToolResult::error("Invalid parameters".to_string()))
850 }
851}
852
853async fn execute_chain_mode(
855 cwd: &Path,
856 agents: &[AgentConfig],
857 params: Value,
858 binary: &Path,
859 progress: Option<ProgressFn>,
860 signal: Option<oneshot::Receiver<()>>,
861) -> Result<AgentToolResult, ToolError> {
862 let steps: Vec<ChainStep> = serde_json::from_value(params["chain"].clone())
863 .map_err(|e| format!("Invalid chain parameter: {}", e))?;
864 let total = steps.len();
865 let mut results = Vec::new();
866 let mut previous_output = String::new();
867 let mut abort_signal = signal;
868
869 for (i, step) in steps.into_iter().enumerate() {
870 let task = step.task.replace("{previous}", &previous_output);
871 let step_signal = if i == total - 1 {
872 abort_signal.take()
873 } else {
874 None
875 };
876
877 let result = run_single_agent(
878 cwd,
879 agents,
880 &step.agent,
881 &task,
882 step.cwd.as_deref(),
883 Some(i + 1),
884 step_signal,
885 progress.clone(),
886 binary,
887 )
888 .await;
889
890 let is_error = result.exit_code != 0
891 || result.stop_reason.as_deref() == Some("error")
892 || result.stop_reason.as_deref() == Some("aborted");
893
894 if is_error {
895 let agent_name = result.agent.clone();
896 let error_msg = result
897 .error_message
898 .clone()
899 .unwrap_or_else(|| result.stderr.clone());
900 results.push(result);
901 return Ok(AgentToolResult::error(format!(
902 "Chain stopped at step {}/{} ({}): {}",
903 i + 1,
904 total,
905 agent_name,
906 error_msg
907 )));
908 }
909
910 previous_output = result.output.clone();
911 results.push(result);
912 }
913
914 let output = results.last().map(|r| r.output.clone()).unwrap_or_default();
915 Ok(AgentToolResult::success(if output.is_empty() {
916 "(no output)".to_string()
917 } else {
918 output
919 })
920 .with_metadata(json!({
921 "mode": "chain",
922 "steps": results.len(),
923 })))
924}
925
926async fn execute_parallel_mode(
928 cwd: &Path,
929 agents: &[AgentConfig],
930 params: Value,
931 binary: &Path,
932 progress: Option<ProgressFn>,
933) -> Result<AgentToolResult, ToolError> {
934 let tasks: Vec<ParallelTask> = serde_json::from_value(params["tasks"].clone())
935 .map_err(|e| format!("Invalid tasks parameter: {}", e))?;
936
937 if tasks.len() > MAX_PARALLEL_TASKS {
938 return Ok(AgentToolResult::error(format!(
939 "Too many parallel tasks ({}). Max is {}.",
940 tasks.len(),
941 MAX_PARALLEL_TASKS
942 )));
943 }
944
945 let results = run_parallel(cwd, agents, tasks, binary.to_path_buf(), progress).await;
946
947 let success_count = results.iter().filter(|r| r.exit_code == 0).count();
948 let summaries: Vec<String> = results
949 .iter()
950 .map(|r| {
951 let _preview = truncate_output(&r.output, 100);
952 format!(
953 "[{}]: {}",
954 r.agent,
955 if r.exit_code == 0 {
956 "completed"
957 } else {
958 "failed"
959 },
960 )
961 })
962 .collect();
963
964 Ok(AgentToolResult::success(format!(
965 "Parallel: {}/{} succeeded\n\n{}",
966 success_count,
967 results.len(),
968 summaries.join("\n\n")
969 ))
970 .with_metadata(json!({
971 "mode": "parallel",
972 "results": results.iter().map(|r| json!({
973 "agent": r.agent,
974 "exit_code": r.exit_code,
975 })).collect::<Vec<_>>()
976 })))
977}
978
979async fn execute_single_mode(
981 cwd: &Path,
982 agents: &[AgentConfig],
983 params: Value,
984 binary: &Path,
985 progress: Option<ProgressFn>,
986 signal: Option<oneshot::Receiver<()>>,
987) -> Result<AgentToolResult, ToolError> {
988 let agent_name = params["agent"]
989 .as_str()
990 .ok_or("Missing required parameter: agent")?;
991 let task = params["task"]
992 .as_str()
993 .ok_or("Missing required parameter: task")?;
994 let agent_cwd = params["cwd"].as_str();
995
996 let result = run_single_agent(
997 cwd, agents, agent_name, task, agent_cwd, None, signal, progress, binary,
998 )
999 .await;
1000
1001 let is_error = result.exit_code != 0
1002 || result.stop_reason.as_deref() == Some("error")
1003 || result.stop_reason.as_deref() == Some("aborted");
1004
1005 if is_error {
1006 let error_msg = result.error_message.as_deref().unwrap_or(&result.stderr);
1007 return Ok(AgentToolResult::error(format!(
1008 "Agent {}: {}",
1009 result.stop_reason.as_deref().unwrap_or("failed"),
1010 error_msg
1011 )));
1012 }
1013
1014 Ok(AgentToolResult::success(if result.output.is_empty() {
1015 "(no output)".to_string()
1016 } else {
1017 result.output.clone()
1018 })
1019 .with_metadata(json!({
1020 "mode": "single",
1021 "agent": result.agent,
1022 "source": result.agent_source,
1023 "usage": {
1024 "input_tokens": result.usage.input_tokens,
1025 "output_tokens": result.usage.output_tokens,
1026 "turns": result.usage.turns,
1027 },
1028 })))
1029}
1030
/// Shortens `text` to at most `max_chars` characters.
///
/// Text that already fits is returned unchanged; otherwise the first
/// `max_chars` characters are kept and `...` is appended. The limit counts
/// characters rather than bytes, so the cut never lands inside a multi-byte
/// UTF-8 sequence.
fn truncate_output(text: &str, max_chars: usize) -> String {
    // The byte offset of the character *after* the limit is the cut point;
    // if there is no such character the text already fits.
    match text.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}...", &text[..cut]),
        None => text.to_string(),
    }
}
1040
/// Unit tests for frontmatter parsing, agent discovery, schema shape,
/// output truncation and JSON event handling.
#[cfg(test)]
mod tests {
    use super::*;

    // Frontmatter keys are extracted and the body follows the closing fence.
    #[test]
    fn test_parse_frontmatter_with_yaml() {
        let content = "---\nname: scout\ndescription: Fast recon\nmodel: haiku\ntools: read, grep\n---\nYou are a scout agent.";
        let (fm, body) = parse_frontmatter(content);
        assert_eq!(fm.get("name").unwrap(), "scout");
        assert_eq!(fm.get("description").unwrap(), "Fast recon");
        assert_eq!(fm.get("model").unwrap(), "haiku");
        assert_eq!(fm.get("tools").unwrap(), "read, grep");
        assert!(body.trim().starts_with("You are a scout agent."));
    }

    // Without a leading fence the whole input is the body.
    #[test]
    fn test_parse_frontmatter_no_yaml() {
        let content = "Just a plain system prompt.";
        let (fm, body) = parse_frontmatter(content);
        assert!(fm.is_empty());
        assert_eq!(body.trim(), "Just a plain system prompt.");
    }

    // A file with frontmatter yields name, description and trimmed prompt.
    #[test]
    fn test_parse_agent_file() {
        let tmp = tempfile::tempdir().unwrap();
        let file_path = tmp.path().join("scout.md");
        std::fs::write(
            &file_path,
            "---\nname: scout\ndescription: Fast recon\n---\nYou are a scout.",
        )
        .unwrap();
        let config = parse_agent_file(&file_path).unwrap();
        assert_eq!(config.name, "scout");
        assert_eq!(config.description, "Fast recon");
        assert_eq!(config.system_prompt, "You are a scout.");
    }

    // Without frontmatter the name falls back to the file stem.
    #[test]
    fn test_parse_agent_file_no_frontmatter() {
        let tmp = tempfile::tempdir().unwrap();
        let file_path = tmp.path().join("worker.md");
        std::fs::write(&file_path, "You are a worker agent.").unwrap();
        let config = parse_agent_file(&file_path).unwrap();
        assert_eq!(config.name, "worker");
        assert_eq!(config.system_prompt, "You are a worker agent.");
    }

    // NOTE(review): this uses the User scope, so it reads the real home
    // directory and only passes when `~/.oxi/agents` holds no agents.
    #[test]
    fn test_discover_agents_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let agents = discover_agents(tmp.path(), AgentScope::User);
        assert!(agents.is_empty());
    }

    // Only `.md` files in the project agents directory are loaded.
    #[test]
    fn test_discover_agents_with_files() {
        let tmp = tempfile::tempdir().unwrap();
        let agents_dir = tmp.path().join(".oxi").join("agents");
        std::fs::create_dir_all(&agents_dir).unwrap();
        std::fs::write(
            agents_dir.join("scout.md"),
            "---\nname: scout\ndescription: Recon\n---\nBe a scout.",
        )
        .unwrap();
        std::fs::write(
            agents_dir.join("worker.md"),
            "---\nname: worker\n---\nBe a worker.",
        )
        .unwrap();
        std::fs::write(agents_dir.join("ignore.txt"), "ignore me").unwrap();
        let agents = discover_agents(tmp.path(), AgentScope::Project);
        assert_eq!(agents.len(), 2);
        assert!(agents.iter().any(|a| a.name == "scout"));
        assert!(agents.iter().any(|a| a.name == "worker"));
    }

    // The agents directory is found from a subdirectory of the project root.
    #[test]
    fn test_find_project_agents_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let agents_dir = tmp.path().join(".oxi").join("agents");
        std::fs::create_dir_all(&agents_dir).unwrap();
        let git_dir = tmp.path().join(".git");
        std::fs::create_dir_all(&git_dir).unwrap();
        let sub = tmp.path().join("subdir");
        std::fs::create_dir_all(&sub).unwrap();
        assert_eq!(find_project_agents_dir(&sub), Some(agents_dir));
    }

    // The upward search ends at a directory containing `.git`.
    #[test]
    fn test_find_project_agents_dir_stops_at_git() {
        let tmp = tempfile::tempdir().unwrap();
        let git_dir = tmp.path().join(".git");
        std::fs::create_dir_all(&git_dir).unwrap();
        assert_eq!(find_project_agents_dir(tmp.path()), None);
    }

    // The default scope is the user scope.
    #[test]
    fn test_agent_scope_default() {
        assert_eq!(AgentScope::default(), AgentScope::User);
    }

    // The comma-separated `tools` value is split and trimmed.
    #[test]
    fn test_tools_parsing() {
        let tmp = tempfile::tempdir().unwrap();
        let file_path = tmp.path().join("agent.md");
        std::fs::write(
            &file_path,
            "---\ntools: read, grep, find, ls\n---\nSystem prompt.",
        )
        .unwrap();
        let config = parse_agent_file(&file_path).unwrap();
        let tools = config.tools.unwrap();
        assert_eq!(tools, vec!["read", "grep", "find", "ls"]);
    }

    // The schema is an object exposing the mode-selecting properties.
    #[test]
    fn test_schema_structure() {
        let tool = SubagentTool::new();
        let schema = tool.parameters_schema();
        assert_eq!(schema["type"], "object");
        assert!(schema["properties"]["agent"].is_object());
        assert!(schema["properties"]["tasks"].is_object());
        assert!(schema["properties"]["chain"].is_object());
        assert!(schema["properties"]["agentScope"].is_object());
    }

    // Short text is untouched; long text is cut and suffixed with "...".
    #[test]
    fn test_truncate_output() {
        assert_eq!(truncate_output("hello", 10), "hello");
        assert_eq!(truncate_output("hello world foo", 5), "hello...");
    }

    // A `text_delta` event appends its text to the accumulator.
    #[test]
    fn test_process_json_line_text_delta() {
        let mut result = SingleResult {
            agent: "test".into(),
            agent_source: "user".into(),
            task: "t".into(),
            exit_code: 0,
            output: String::new(),
            stderr: String::new(),
            usage: UsageStats::default(),
            model: None,
            stop_reason: None,
            error_message: None,
            step: None,
        };
        let mut text = String::new();
        process_json_line(
            r#"{"type":"text_delta","text":"hello"}"#,
            &mut result,
            &mut text,
            &None,
        );
        assert_eq!(text, "hello");
    }

    // A `usage` event adds its token counts and counts one turn.
    #[test]
    fn test_process_json_line_usage() {
        let mut result = SingleResult {
            agent: "test".into(),
            agent_source: "user".into(),
            task: "t".into(),
            exit_code: 0,
            output: String::new(),
            stderr: String::new(),
            usage: UsageStats::default(),
            model: None,
            stop_reason: None,
            error_message: None,
            step: None,
        };
        let mut text = String::new();
        process_json_line(
            r#"{"type":"usage","input_tokens":100,"output_tokens":50}"#,
            &mut result,
            &mut text,
            &None,
        );
        assert_eq!(result.usage.input_tokens, 100);
        assert_eq!(result.usage.output_tokens, 50);
        assert_eq!(result.usage.turns, 1);
    }
}
1229}