1use crate::agent::{AgentConfig, AgentEvent, AgentLoop};
18use crate::llm::LlmClient;
19use crate::mcp::manager::McpManager;
20use crate::subagent::AgentRegistry;
21use crate::tools::types::{Tool, ToolContext, ToolOutput};
22use anyhow::{Context, Result};
23use async_trait::async_trait;
24use serde::{Deserialize, Serialize};
25use std::path::PathBuf;
26use std::sync::Arc;
27use tokio::sync::broadcast;
28
29const TASK_OUTPUT_CONTEXT_LIMIT: usize = 4_000;
30const TASK_OUTPUT_CONTEXT_HEAD: usize = 3_000;
31const TASK_OUTPUT_CONTEXT_TAIL: usize = 800;
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35#[serde(deny_unknown_fields)]
36pub struct TaskParams {
37 pub agent: String,
39 pub description: String,
41 pub prompt: String,
43 #[serde(default)]
45 pub background: bool,
46 #[serde(skip_serializing_if = "Option::is_none")]
48 pub max_steps: Option<usize>,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct TaskResult {
54 pub output: String,
56 pub session_id: String,
58 pub agent: String,
60 pub success: bool,
62 pub task_id: String,
64}
65
66fn compact_task_output(output: &str) -> (String, bool) {
67 if output.len() <= TASK_OUTPUT_CONTEXT_LIMIT {
68 return (output.to_string(), false);
69 }
70
71 let head = crate::text::truncate_utf8(output, TASK_OUTPUT_CONTEXT_HEAD);
72 let tail_start = output
73 .char_indices()
74 .find_map(|(idx, _)| {
75 if output.len().saturating_sub(idx) <= TASK_OUTPUT_CONTEXT_TAIL {
76 Some(idx)
77 } else {
78 None
79 }
80 })
81 .unwrap_or(output.len());
82 let tail = &output[tail_start..];
83
84 (
85 format!(
86 "{}\n\n[{} bytes omitted from delegated task output]\n\n{}",
87 head,
88 output.len().saturating_sub(head.len() + tail.len()),
89 tail
90 ),
91 true,
92 )
93}
94
95fn task_artifact_id(result: &TaskResult) -> String {
96 format!("task-output:{}", result.task_id)
97}
98
99fn task_artifact_uri(result: &TaskResult) -> String {
100 format!(
101 "a3s://tasks/{}/runs/{}/output",
102 result.session_id, result.task_id
103 )
104}
105
106fn format_task_result_for_context(result: &TaskResult) -> (String, bool) {
107 let (output, truncated) = compact_task_output(&result.output);
108 let status = if result.success {
109 "completed"
110 } else {
111 "failed"
112 };
113 let artifact_id = task_artifact_id(result);
114 let artifact_uri = task_artifact_uri(result);
115 let mut formatted = format!(
116 "Task {status}: {}\nAgent: {}\nSession: {}\nTask ID: {}\nArtifact ID: {}\nArtifact URI: {}\n",
117 result.task_id, result.agent, result.session_id, result.task_id, artifact_id, artifact_uri
118 );
119 if truncated {
120 formatted.push_str(
121 "Output excerpt: truncated for parent context. Use the artifact URI or child run session/events if exact omitted content is needed.\n",
122 );
123 } else {
124 formatted.push_str("Output:\n");
125 }
126 formatted.push_str(&output);
127 (formatted, truncated)
128}
129
130pub struct TaskExecutor {
132 registry: Arc<AgentRegistry>,
134 llm_client: Arc<dyn LlmClient>,
136 workspace: String,
138 mcp_manager: Option<Arc<McpManager>>,
140 parent_context: Option<crate::child_run::ChildRunContext>,
142 max_parallel_tasks: usize,
143}
144
145impl TaskExecutor {
146 pub fn new(
148 registry: Arc<AgentRegistry>,
149 llm_client: Arc<dyn LlmClient>,
150 workspace: String,
151 ) -> Self {
152 Self {
153 registry,
154 llm_client,
155 workspace,
156 mcp_manager: None,
157 parent_context: None,
158 max_parallel_tasks: crate::agent::DEFAULT_MAX_PARALLEL_TASKS,
159 }
160 }
161
162 pub fn with_mcp(
164 registry: Arc<AgentRegistry>,
165 llm_client: Arc<dyn LlmClient>,
166 workspace: String,
167 mcp_manager: Arc<McpManager>,
168 ) -> Self {
169 Self {
170 registry,
171 llm_client,
172 workspace,
173 mcp_manager: Some(mcp_manager),
174 parent_context: None,
175 max_parallel_tasks: crate::agent::DEFAULT_MAX_PARALLEL_TASKS,
176 }
177 }
178
179 pub fn with_parent_context(mut self, ctx: crate::child_run::ChildRunContext) -> Self {
181 if let Some(max_parallel_tasks) = ctx.max_parallel_tasks {
182 self.max_parallel_tasks = max_parallel_tasks.max(1);
183 }
184 self.parent_context = Some(ctx);
185 self
186 }
187
188 pub fn with_max_parallel_tasks(mut self, max_parallel_tasks: usize) -> Self {
189 self.max_parallel_tasks = max_parallel_tasks.max(1);
190 self
191 }
192
193 pub async fn execute(
195 &self,
196 params: TaskParams,
197 event_tx: Option<broadcast::Sender<AgentEvent>>,
198 ) -> Result<TaskResult> {
199 let task_id = format!("task-{}", uuid::Uuid::new_v4());
200 let session_id = format!("task-run-{}", task_id);
201
202 let agent = self
203 .registry
204 .get(¶ms.agent)
205 .context(format!("Unknown agent type: '{}'", params.agent))?;
206
207 if let Some(ref tx) = event_tx {
208 let _ = tx.send(AgentEvent::SubagentStart {
209 task_id: task_id.clone(),
210 session_id: session_id.clone(),
211 parent_session_id: String::new(),
212 agent: params.agent.clone(),
213 description: params.description.clone(),
214 });
215 }
216
217 let child_executor = if let Some(ref parent_ctx) = self.parent_context {
220 if let Some(ref services) = parent_ctx.workspace_services {
221 crate::tools::ToolExecutor::new_with_workspace_services_and_artifact_limits(
222 self.workspace.clone(),
223 Arc::clone(services),
224 crate::tools::ArtifactStoreLimits::default(),
225 )
226 } else {
227 crate::tools::ToolExecutor::new(self.workspace.clone())
228 }
229 } else {
230 crate::tools::ToolExecutor::new(self.workspace.clone())
231 };
232
233 if let Some(ref mcp) = self.mcp_manager {
235 let all_tools = mcp.get_all_tools().await;
236 let mut by_server: std::collections::HashMap<
237 String,
238 Vec<crate::mcp::protocol::McpTool>,
239 > = std::collections::HashMap::new();
240 for (server, tool) in all_tools {
241 by_server.entry(server).or_default().push(tool);
242 }
243 for (server_name, tools) in by_server {
244 let wrappers =
245 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp));
246 for wrapper in wrappers {
247 child_executor.register_dynamic_tool(wrapper);
248 }
249 }
250 }
251
252 let child_executor = Arc::new(child_executor);
253
254 let mut child_config = AgentConfig {
255 tools: child_executor.definitions(),
256 ..AgentConfig::default()
257 };
258 agent.apply_to(&mut child_config);
259 if let Some(ref parent_ctx) = self.parent_context {
260 parent_ctx.apply_to(&mut child_config);
261 }
262 if let Some(max_steps) = params.max_steps {
263 child_config.max_tool_rounds = max_steps;
264 }
265
266 let mut tool_context =
267 ToolContext::new(PathBuf::from(&self.workspace)).with_session_id(session_id.clone());
268 if let Some(ref parent_ctx) = self.parent_context {
269 if let Some(ref services) = parent_ctx.workspace_services {
270 tool_context = tool_context.with_workspace_services(Arc::clone(services));
271 }
272 }
273
274 let agent_loop = AgentLoop::new(
275 Arc::clone(&self.llm_client),
276 child_executor,
277 tool_context,
278 child_config,
279 );
280
281 let child_event_tx = if let Some(ref broadcast_tx) = event_tx {
283 let (mpsc_tx, mut mpsc_rx) = tokio::sync::mpsc::channel(100);
284 let broadcast_tx_clone = broadcast_tx.clone();
285
286 tokio::spawn(async move {
288 while let Some(event) = mpsc_rx.recv().await {
289 let _ = broadcast_tx_clone.send(event);
290 }
291 });
292
293 Some(mpsc_tx)
294 } else {
295 None
296 };
297
298 let (output, success) = match agent_loop
299 .execute(&[], ¶ms.prompt, child_event_tx)
300 .await
301 {
302 Ok(result) => (result.text, true),
303 Err(e) => (format!("Task failed: {}", e), false),
304 };
305
306 if let Some(ref tx) = event_tx {
307 let _ = tx.send(AgentEvent::SubagentEnd {
308 task_id: task_id.clone(),
309 session_id: session_id.clone(),
310 agent: params.agent.clone(),
311 output: output.clone(),
312 success,
313 });
314 }
315
316 Ok(TaskResult {
317 output,
318 session_id,
319 agent: params.agent,
320 success,
321 task_id,
322 })
323 }
324
325 pub fn execute_background(
329 self: Arc<Self>,
330 params: TaskParams,
331 event_tx: Option<broadcast::Sender<AgentEvent>>,
332 ) -> String {
333 let task_id = format!("task-{}", uuid::Uuid::new_v4());
334 let task_id_clone = task_id.clone();
335
336 tokio::spawn(async move {
337 if let Err(e) = self.execute(params, event_tx).await {
338 tracing::error!("Background task {} failed: {}", task_id_clone, e);
339 }
340 });
341
342 task_id
343 }
344
345 pub async fn execute_parallel(
350 self: &Arc<Self>,
351 tasks: Vec<TaskParams>,
352 event_tx: Option<broadcast::Sender<AgentEvent>>,
353 ) -> Vec<TaskResult> {
354 let fallback_agents = tasks
355 .iter()
356 .map(|params| params.agent.clone())
357 .collect::<Vec<_>>();
358 let executor = Arc::clone(self);
359 let results = crate::ordered_parallel::run_ordered_parallel_with_limit(
360 tasks,
361 self.max_parallel_tasks,
362 move |_idx, params| {
363 let executor = Arc::clone(&executor);
364 let tx = event_tx.clone();
365 async move {
366 match executor.execute(params.clone(), tx).await {
367 Ok(result) => result,
368 Err(e) => TaskResult {
369 output: format!("Task failed: {}", e),
370 session_id: String::new(),
371 agent: params.agent,
372 success: false,
373 task_id: format!("task-{}", uuid::Uuid::new_v4()),
374 },
375 }
376 }
377 },
378 )
379 .await;
380
381 results
382 .into_iter()
383 .map(|result| match result.output {
384 Ok(task_result) => task_result,
385 Err(error) => {
386 tracing::error!("Parallel task failed: {}", error);
387 TaskResult {
388 output: format!("Task failed: {}", error),
389 session_id: String::new(),
390 agent: fallback_agents
391 .get(result.index)
392 .cloned()
393 .unwrap_or_else(|| "unknown".to_string()),
394 success: false,
395 task_id: format!("task-{}", uuid::Uuid::new_v4()),
396 }
397 }
398 })
399 .collect()
400 }
401}
402
403pub fn task_params_schema() -> serde_json::Value {
405 serde_json::json!({
406 "type": "object",
407 "additionalProperties": false,
408 "properties": {
409 "agent": {
410 "type": "string",
411 "description": "Required. Canonical agent type to use (for example: explore, general, plan, verification, review). Always provide this exact field name: 'agent'."
412 },
413 "description": {
414 "type": "string",
415 "description": "Required. Short task label for display and tracking. Always provide this exact field name: 'description'."
416 },
417 "prompt": {
418 "type": "string",
419 "description": "Required. Detailed instruction for the delegated child run. Always provide this exact field name: 'prompt'."
420 },
421 "background": {
422 "type": "boolean",
423 "description": "Optional. Run the task in the background. Default: false.",
424 "default": false
425 },
426 "max_steps": {
427 "type": "integer",
428 "description": "Optional. Maximum number of steps for this task."
429 }
430 },
431 "required": ["agent", "description", "prompt"],
432 "examples": [
433 {
434 "agent": "explore",
435 "description": "Find Rust files",
436 "prompt": "Search the workspace for Rust files and summarize the layout."
437 },
438 {
439 "agent": "general",
440 "description": "Investigate test failure",
441 "prompt": "Inspect the failing tests and explain the root cause.",
442 "max_steps": 6
443 }
444 ]
445 })
446}
447
448pub struct TaskTool {
451 executor: Arc<TaskExecutor>,
452}
453
454impl TaskTool {
455 pub fn new(executor: Arc<TaskExecutor>) -> Self {
457 Self { executor }
458 }
459}
460
461#[async_trait]
462impl Tool for TaskTool {
463 fn name(&self) -> &str {
464 "task"
465 }
466
467 fn description(&self) -> &str {
468 "Delegate a bounded task to a specialized child run. Built-in agents: explore (read-only codebase search), general/general-purpose (full access multi-step), plan (read-only planning), verification (adversarial validation), review (code review). Custom agents from agent_dirs and .a3s/agents are also available; .claude/agents is read for compatibility."
469 }
470
471 fn parameters(&self) -> serde_json::Value {
472 task_params_schema()
473 }
474
475 async fn execute(&self, args: &serde_json::Value, ctx: &ToolContext) -> Result<ToolOutput> {
476 let params: TaskParams =
477 serde_json::from_value(args.clone()).context("Invalid task parameters")?;
478
479 if params.background {
480 let task_id =
481 Arc::clone(&self.executor).execute_background(params, ctx.agent_event_tx.clone());
482 return Ok(ToolOutput::success(format!(
483 "Task started in background. Task ID: {}",
484 task_id
485 )));
486 }
487
488 let result = self
489 .executor
490 .execute(params, ctx.agent_event_tx.clone())
491 .await?;
492 let (content, truncated) = format_task_result_for_context(&result);
493 let metadata = serde_json::json!({
494 "task_id": result.task_id,
495 "session_id": result.session_id,
496 "agent": result.agent,
497 "success": result.success,
498 "output_bytes": result.output.len(),
499 "truncated_for_context": truncated,
500 "artifact_id": task_artifact_id(&result),
501 "artifact_uri": task_artifact_uri(&result),
502 });
503
504 if result.success {
505 Ok(ToolOutput::success(content).with_metadata(metadata))
506 } else {
507 Ok(ToolOutput::error(content).with_metadata(metadata))
508 }
509 }
510}
511
512#[derive(Debug, Clone, Serialize, Deserialize)]
514#[serde(deny_unknown_fields)]
515pub struct ParallelTaskParams {
516 pub tasks: Vec<TaskParams>,
518}
519
520pub fn parallel_task_params_schema() -> serde_json::Value {
522 serde_json::json!({
523 "type": "object",
524 "additionalProperties": false,
525 "properties": {
526 "tasks": {
527 "type": "array",
528 "description": "List of tasks to execute in parallel. Each task runs as an independent delegated child run concurrently.",
529 "items": {
530 "type": "object",
531 "additionalProperties": false,
532 "properties": {
533 "agent": {
534 "type": "string",
535 "description": "Required. Canonical agent type for this task."
536 },
537 "description": {
538 "type": "string",
539 "description": "Required. Short task label for display and tracking."
540 },
541 "prompt": {
542 "type": "string",
543 "description": "Required. Detailed instruction for the delegated child run."
544 }
545 },
546 "required": ["agent", "description", "prompt"]
547 },
548 "minItems": 1
549 }
550 },
551 "required": ["tasks"],
552 "examples": [
553 {
554 "tasks": [
555 {
556 "agent": "explore",
557 "description": "Find Rust files",
558 "prompt": "List Rust files under src/."
559 },
560 {
561 "agent": "explore",
562 "description": "Find tests",
563 "prompt": "List test files and summarize their purpose."
564 }
565 ]
566 }
567 ]
568 })
569}
570
571pub struct ParallelTaskTool {
575 executor: Arc<TaskExecutor>,
576}
577
578impl ParallelTaskTool {
579 pub fn new(executor: Arc<TaskExecutor>) -> Self {
581 Self { executor }
582 }
583}
584
585#[async_trait]
586impl Tool for ParallelTaskTool {
587 fn name(&self) -> &str {
588 "parallel_task"
589 }
590
591 fn description(&self) -> &str {
592 "Execute multiple delegated child runs in parallel. All tasks run concurrently and results are returned when all complete. Built-in agents: explore (read-only codebase search), general/general-purpose (full access multi-step), plan (read-only planning), verification (adversarial validation), review (code review). Custom agents from agent_dirs and .a3s/agents are also available; .claude/agents is read for compatibility."
593 }
594
595 fn parameters(&self) -> serde_json::Value {
596 parallel_task_params_schema()
597 }
598
599 async fn execute(&self, args: &serde_json::Value, ctx: &ToolContext) -> Result<ToolOutput> {
600 let params: ParallelTaskParams =
601 serde_json::from_value(args.clone()).context("Invalid parallel task parameters")?;
602
603 if params.tasks.is_empty() {
604 return Ok(ToolOutput::error("No tasks provided".to_string()));
605 }
606
607 let task_count = params.tasks.len();
608
609 let results = self
610 .executor
611 .execute_parallel(params.tasks, ctx.agent_event_tx.clone())
612 .await;
613
614 let mut output = format!("Executed {} tasks in parallel:\n\n", task_count);
616 let mut metadata_results = Vec::new();
617 for (i, result) in results.iter().enumerate() {
618 let status = if result.success { "[OK]" } else { "[ERR]" };
619 let (formatted, truncated) = format_task_result_for_context(result);
620 metadata_results.push(serde_json::json!({
621 "task_id": result.task_id,
622 "session_id": result.session_id,
623 "agent": result.agent,
624 "success": result.success,
625 "output": formatted.clone(),
626 "output_bytes": result.output.len(),
627 "truncated_for_context": truncated,
628 "artifact_id": task_artifact_id(result),
629 "artifact_uri": task_artifact_uri(result),
630 }));
631 output.push_str(&format!(
632 "--- Task {} ({}) {} ---\n{}\n\n",
633 i + 1,
634 result.agent,
635 status,
636 formatted
637 ));
638 }
639
640 let all_success = results.iter().all(|result| result.success);
641 let output = if all_success {
642 ToolOutput::success(output)
643 } else {
644 ToolOutput::error(output)
645 };
646
647 Ok(output.with_metadata(serde_json::json!({
648 "task_count": task_count,
649 "results": metadata_results,
650 })))
651 }
652}
653
654#[cfg(test)]
655mod tests {
656 use super::*;
657
658 #[test]
659 fn test_task_params_deserialize() {
660 let json = r#"{
661 "agent": "explore",
662 "description": "Find auth code",
663 "prompt": "Search for authentication files"
664 }"#;
665
666 let params: TaskParams = serde_json::from_str(json).unwrap();
667 assert_eq!(params.agent, "explore");
668 assert_eq!(params.description, "Find auth code");
669 assert!(!params.background);
670 }
671
672 #[test]
673 fn test_task_params_with_background() {
674 let json = r#"{
675 "agent": "general",
676 "description": "Long task",
677 "prompt": "Do something complex",
678 "background": true
679 }"#;
680
681 let params: TaskParams = serde_json::from_str(json).unwrap();
682 assert!(params.background);
683 }
684
685 #[test]
686 fn test_task_params_with_max_steps() {
687 let json = r#"{
688 "agent": "plan",
689 "description": "Planning task",
690 "prompt": "Create a plan",
691 "max_steps": 10
692 }"#;
693
694 let params: TaskParams = serde_json::from_str(json).unwrap();
695 assert_eq!(params.agent, "plan");
696 assert_eq!(params.max_steps, Some(10));
697 assert!(!params.background);
698 }
699
700 #[test]
701 fn test_task_params_all_fields() {
702 let json = r#"{
703 "agent": "general",
704 "description": "Complex task",
705 "prompt": "Do everything",
706 "background": true,
707 "max_steps": 20
708 }"#;
709
710 let params: TaskParams = serde_json::from_str(json).unwrap();
711 assert_eq!(params.agent, "general");
712 assert_eq!(params.description, "Complex task");
713 assert_eq!(params.prompt, "Do everything");
714 assert!(params.background);
715 assert_eq!(params.max_steps, Some(20));
716 }
717
718 #[test]
719 fn test_task_params_missing_required_field() {
720 let json = r#"{
721 "agent": "explore",
722 "description": "Missing prompt"
723 }"#;
724
725 let result: Result<TaskParams, _> = serde_json::from_str(json);
726 assert!(result.is_err());
727 }
728
729 #[test]
730 fn test_task_params_serialize() {
731 let params = TaskParams {
732 agent: "explore".to_string(),
733 description: "Test task".to_string(),
734 prompt: "Test prompt".to_string(),
735 background: false,
736 max_steps: Some(5),
737 };
738
739 let json = serde_json::to_string(¶ms).unwrap();
740 assert!(json.contains("explore"));
741 assert!(json.contains("Test task"));
742 assert!(json.contains("Test prompt"));
743 }
744
745 #[test]
746 fn test_task_params_clone() {
747 let params = TaskParams {
748 agent: "explore".to_string(),
749 description: "Test".to_string(),
750 prompt: "Prompt".to_string(),
751 background: true,
752 max_steps: None,
753 };
754
755 let cloned = params.clone();
756 assert_eq!(params.agent, cloned.agent);
757 assert_eq!(params.description, cloned.description);
758 assert_eq!(params.background, cloned.background);
759 }
760
761 #[test]
762 fn test_task_result_serialize() {
763 let result = TaskResult {
764 output: "Found 5 files".to_string(),
765 session_id: "session-123".to_string(),
766 agent: "explore".to_string(),
767 success: true,
768 task_id: "task-456".to_string(),
769 };
770
771 let json = serde_json::to_string(&result).unwrap();
772 assert!(json.contains("Found 5 files"));
773 assert!(json.contains("explore"));
774 }
775
776 #[test]
777 fn test_task_result_deserialize() {
778 let json = r#"{
779 "output": "Task completed",
780 "session_id": "sess-789",
781 "agent": "general",
782 "success": false,
783 "task_id": "task-123"
784 }"#;
785
786 let result: TaskResult = serde_json::from_str(json).unwrap();
787 assert_eq!(result.output, "Task completed");
788 assert_eq!(result.session_id, "sess-789");
789 assert_eq!(result.agent, "general");
790 assert!(!result.success);
791 assert_eq!(result.task_id, "task-123");
792 }
793
794 #[test]
795 fn test_task_result_clone() {
796 let result = TaskResult {
797 output: "Output".to_string(),
798 session_id: "session-1".to_string(),
799 agent: "explore".to_string(),
800 success: true,
801 task_id: "task-1".to_string(),
802 };
803
804 let cloned = result.clone();
805 assert_eq!(result.output, cloned.output);
806 assert_eq!(result.success, cloned.success);
807 }
808
809 #[test]
810 fn test_compact_task_output_preserves_small_output() {
811 let (output, truncated) = compact_task_output("short result");
812 assert_eq!(output, "short result");
813 assert!(!truncated);
814 }
815
816 #[test]
817 fn test_format_task_result_for_context_truncates_large_output() {
818 let result = TaskResult {
819 output: format!("{}TAIL", "x".repeat(TASK_OUTPUT_CONTEXT_LIMIT + 500)),
820 session_id: "session-1".to_string(),
821 agent: "explore".to_string(),
822 success: true,
823 task_id: "task-1".to_string(),
824 };
825
826 let (formatted, truncated) = format_task_result_for_context(&result);
827 assert!(truncated);
828 assert!(formatted.contains("Output excerpt"));
829 assert!(formatted.contains("bytes omitted"));
830 assert!(formatted.contains("Artifact ID: task-output:task-1"));
831 assert!(formatted.contains("Artifact URI: a3s://tasks/session-1/runs/task-1/output"));
832 assert!(formatted.contains("TAIL"));
833 assert!(formatted.len() < result.output.len());
834 }
835
836 #[test]
837 fn test_task_artifact_reference_is_stable() {
838 let result = TaskResult {
839 output: "done".to_string(),
840 session_id: "session-1".to_string(),
841 agent: "explore".to_string(),
842 success: true,
843 task_id: "task-1".to_string(),
844 };
845
846 assert_eq!(task_artifact_id(&result), "task-output:task-1");
847 assert_eq!(
848 task_artifact_uri(&result),
849 "a3s://tasks/session-1/runs/task-1/output"
850 );
851
852 let (formatted, truncated) = format_task_result_for_context(&result);
853 assert!(!truncated);
854 assert!(formatted.contains("Artifact URI: a3s://tasks/session-1/runs/task-1/output"));
855 }
856
857 #[test]
858 fn test_task_params_schema() {
859 let schema = task_params_schema();
860 assert_eq!(schema["type"], "object");
861 assert_eq!(schema["additionalProperties"], false);
862 assert!(schema["properties"]["agent"].is_object());
863 assert!(schema["properties"]["prompt"].is_object());
864 }
865
866 #[test]
867 fn test_task_params_schema_required_fields() {
868 let schema = task_params_schema();
869 let required = schema["required"].as_array().unwrap();
870 assert!(required.contains(&serde_json::json!("agent")));
871 assert!(required.contains(&serde_json::json!("description")));
872 assert!(required.contains(&serde_json::json!("prompt")));
873 }
874
875 #[test]
876 fn test_task_params_schema_properties() {
877 let schema = task_params_schema();
878 let props = &schema["properties"];
879
880 assert_eq!(props["agent"]["type"], "string");
881 assert_eq!(props["description"]["type"], "string");
882 assert_eq!(props["prompt"]["type"], "string");
883 assert_eq!(props["background"]["type"], "boolean");
884 assert_eq!(props["background"]["default"], false);
885 assert_eq!(props["max_steps"]["type"], "integer");
886 }
887
888 #[test]
889 fn test_task_params_schema_descriptions() {
890 let schema = task_params_schema();
891 let props = &schema["properties"];
892
893 assert!(props["agent"]["description"].is_string());
894 assert!(props["description"]["description"].is_string());
895 assert!(props["prompt"]["description"].is_string());
896 assert!(props["background"]["description"].is_string());
897 assert!(props["max_steps"]["description"].is_string());
898 }
899
900 #[test]
901 fn test_task_params_default_background() {
902 let params = TaskParams {
903 agent: "explore".to_string(),
904 description: "Test".to_string(),
905 prompt: "Test prompt".to_string(),
906 background: false,
907 max_steps: None,
908 };
909 assert!(!params.background);
910 }
911
912 #[test]
913 fn test_task_params_serialize_skip_none() {
914 let params = TaskParams {
915 agent: "explore".to_string(),
916 description: "Test".to_string(),
917 prompt: "Test prompt".to_string(),
918 background: false,
919 max_steps: None,
920 };
921 let json = serde_json::to_string(¶ms).unwrap();
922 assert!(!json.contains("max_steps"));
924 }
925
926 #[test]
927 fn test_task_params_serialize_with_max_steps() {
928 let params = TaskParams {
929 agent: "explore".to_string(),
930 description: "Test".to_string(),
931 prompt: "Test prompt".to_string(),
932 background: false,
933 max_steps: Some(15),
934 };
935 let json = serde_json::to_string(¶ms).unwrap();
936 assert!(json.contains("max_steps"));
937 assert!(json.contains("15"));
938 }
939
940 #[test]
941 fn test_task_result_success_true() {
942 let result = TaskResult {
943 output: "Success".to_string(),
944 session_id: "sess-1".to_string(),
945 agent: "explore".to_string(),
946 success: true,
947 task_id: "task-1".to_string(),
948 };
949 assert!(result.success);
950 }
951
952 #[test]
953 fn test_task_result_success_false() {
954 let result = TaskResult {
955 output: "Failed".to_string(),
956 session_id: "sess-1".to_string(),
957 agent: "explore".to_string(),
958 success: false,
959 task_id: "task-1".to_string(),
960 };
961 assert!(!result.success);
962 }
963
964 #[test]
965 fn test_task_params_empty_strings() {
966 let params = TaskParams {
967 agent: "".to_string(),
968 description: "".to_string(),
969 prompt: "".to_string(),
970 background: false,
971 max_steps: None,
972 };
973 let json = serde_json::to_string(¶ms).unwrap();
974 let deserialized: TaskParams = serde_json::from_str(&json).unwrap();
975 assert_eq!(deserialized.agent, "");
976 assert_eq!(deserialized.description, "");
977 assert_eq!(deserialized.prompt, "");
978 }
979
980 #[test]
981 fn test_task_result_empty_output() {
982 let result = TaskResult {
983 output: "".to_string(),
984 session_id: "sess-1".to_string(),
985 agent: "explore".to_string(),
986 success: true,
987 task_id: "task-1".to_string(),
988 };
989 assert_eq!(result.output, "");
990 }
991
992 #[test]
993 fn test_task_params_debug_format() {
994 let params = TaskParams {
995 agent: "explore".to_string(),
996 description: "Test".to_string(),
997 prompt: "Test prompt".to_string(),
998 background: false,
999 max_steps: None,
1000 };
1001 let debug_str = format!("{:?}", params);
1002 assert!(debug_str.contains("explore"));
1003 assert!(debug_str.contains("Test"));
1004 }
1005
1006 #[test]
1007 fn test_task_result_debug_format() {
1008 let result = TaskResult {
1009 output: "Output".to_string(),
1010 session_id: "sess-1".to_string(),
1011 agent: "explore".to_string(),
1012 success: true,
1013 task_id: "task-1".to_string(),
1014 };
1015 let debug_str = format!("{:?}", result);
1016 assert!(debug_str.contains("Output"));
1017 assert!(debug_str.contains("explore"));
1018 }
1019
1020 #[test]
1021 fn test_task_params_roundtrip() {
1022 let original = TaskParams {
1023 agent: "general".to_string(),
1024 description: "Roundtrip test".to_string(),
1025 prompt: "Test roundtrip serialization".to_string(),
1026 background: true,
1027 max_steps: Some(42),
1028 };
1029 let json = serde_json::to_string(&original).unwrap();
1030 let deserialized: TaskParams = serde_json::from_str(&json).unwrap();
1031 assert_eq!(original.agent, deserialized.agent);
1032 assert_eq!(original.description, deserialized.description);
1033 assert_eq!(original.prompt, deserialized.prompt);
1034 assert_eq!(original.background, deserialized.background);
1035 assert_eq!(original.max_steps, deserialized.max_steps);
1036 }
1037
1038 #[test]
1039 fn test_task_result_roundtrip() {
1040 let original = TaskResult {
1041 output: "Roundtrip output".to_string(),
1042 session_id: "sess-roundtrip".to_string(),
1043 agent: "plan".to_string(),
1044 success: false,
1045 task_id: "task-roundtrip".to_string(),
1046 };
1047 let json = serde_json::to_string(&original).unwrap();
1048 let deserialized: TaskResult = serde_json::from_str(&json).unwrap();
1049 assert_eq!(original.output, deserialized.output);
1050 assert_eq!(original.session_id, deserialized.session_id);
1051 assert_eq!(original.agent, deserialized.agent);
1052 assert_eq!(original.success, deserialized.success);
1053 assert_eq!(original.task_id, deserialized.task_id);
1054 }
1055
1056 #[test]
1057 fn test_parallel_task_params_deserialize() {
1058 let json = r#"{
1059 "tasks": [
1060 { "agent": "explore", "description": "Find auth", "prompt": "Search auth files" },
1061 { "agent": "general", "description": "Fix bug", "prompt": "Fix the login bug" }
1062 ]
1063 }"#;
1064
1065 let params: ParallelTaskParams = serde_json::from_str(json).unwrap();
1066 assert_eq!(params.tasks.len(), 2);
1067 assert_eq!(params.tasks[0].agent, "explore");
1068 assert_eq!(params.tasks[1].agent, "general");
1069 }
1070
1071 #[test]
1072 fn test_parallel_task_params_single_task() {
1073 let json = r#"{
1074 "tasks": [
1075 { "agent": "plan", "description": "Plan work", "prompt": "Create a plan" }
1076 ]
1077 }"#;
1078
1079 let params: ParallelTaskParams = serde_json::from_str(json).unwrap();
1080 assert_eq!(params.tasks.len(), 1);
1081 }
1082
1083 #[test]
1084 fn test_parallel_task_params_empty_tasks() {
1085 let json = r#"{ "tasks": [] }"#;
1086 let params: ParallelTaskParams = serde_json::from_str(json).unwrap();
1087 assert!(params.tasks.is_empty());
1088 }
1089
1090 #[test]
1091 fn test_parallel_task_params_missing_tasks() {
1092 let json = r#"{}"#;
1093 let result: Result<ParallelTaskParams, _> = serde_json::from_str(json);
1094 assert!(result.is_err());
1095 }
1096
1097 #[test]
1098 fn test_parallel_task_params_serialize() {
1099 let params = ParallelTaskParams {
1100 tasks: vec![
1101 TaskParams {
1102 agent: "explore".to_string(),
1103 description: "Task 1".to_string(),
1104 prompt: "Prompt 1".to_string(),
1105 background: false,
1106 max_steps: None,
1107 },
1108 TaskParams {
1109 agent: "general".to_string(),
1110 description: "Task 2".to_string(),
1111 prompt: "Prompt 2".to_string(),
1112 background: false,
1113 max_steps: Some(10),
1114 },
1115 ],
1116 };
1117 let json = serde_json::to_string(¶ms).unwrap();
1118 assert!(json.contains("explore"));
1119 assert!(json.contains("general"));
1120 assert!(json.contains("Prompt 1"));
1121 assert!(json.contains("Prompt 2"));
1122 }
1123
1124 #[test]
1125 fn test_parallel_task_params_roundtrip() {
1126 let original = ParallelTaskParams {
1127 tasks: vec![
1128 TaskParams {
1129 agent: "explore".to_string(),
1130 description: "Explore".to_string(),
1131 prompt: "Find files".to_string(),
1132 background: false,
1133 max_steps: None,
1134 },
1135 TaskParams {
1136 agent: "plan".to_string(),
1137 description: "Plan".to_string(),
1138 prompt: "Make plan".to_string(),
1139 background: false,
1140 max_steps: Some(5),
1141 },
1142 ],
1143 };
1144 let json = serde_json::to_string(&original).unwrap();
1145 let deserialized: ParallelTaskParams = serde_json::from_str(&json).unwrap();
1146 assert_eq!(original.tasks.len(), deserialized.tasks.len());
1147 assert_eq!(original.tasks[0].agent, deserialized.tasks[0].agent);
1148 assert_eq!(original.tasks[1].agent, deserialized.tasks[1].agent);
1149 assert_eq!(original.tasks[1].max_steps, deserialized.tasks[1].max_steps);
1150 }
1151
1152 #[test]
1153 fn test_parallel_task_params_clone() {
1154 let params = ParallelTaskParams {
1155 tasks: vec![TaskParams {
1156 agent: "explore".to_string(),
1157 description: "Test".to_string(),
1158 prompt: "Prompt".to_string(),
1159 background: false,
1160 max_steps: None,
1161 }],
1162 };
1163 let cloned = params.clone();
1164 assert_eq!(params.tasks.len(), cloned.tasks.len());
1165 assert_eq!(params.tasks[0].agent, cloned.tasks[0].agent);
1166 }
1167
1168 #[test]
1169 fn test_parallel_task_params_schema() {
1170 let schema = parallel_task_params_schema();
1171 assert_eq!(schema["type"], "object");
1172 assert_eq!(schema["additionalProperties"], false);
1173 assert!(schema["properties"]["tasks"].is_object());
1174 assert_eq!(schema["properties"]["tasks"]["type"], "array");
1175 assert_eq!(schema["properties"]["tasks"]["minItems"], 1);
1176 }
1177
1178 #[test]
1179 fn test_parallel_task_params_schema_required() {
1180 let schema = parallel_task_params_schema();
1181 let required = schema["required"].as_array().unwrap();
1182 assert!(required.contains(&serde_json::json!("tasks")));
1183 }
1184
1185 #[test]
1186 fn test_parallel_task_params_schema_items() {
1187 let schema = parallel_task_params_schema();
1188 let items = &schema["properties"]["tasks"]["items"];
1189 assert_eq!(items["type"], "object");
1190 assert_eq!(items["additionalProperties"], false);
1191 let item_required = items["required"].as_array().unwrap();
1192 assert!(item_required.contains(&serde_json::json!("agent")));
1193 assert!(item_required.contains(&serde_json::json!("description")));
1194 assert!(item_required.contains(&serde_json::json!("prompt")));
1195 }
1196
1197 #[test]
1198 fn test_task_schema_examples_use_delegation_core() {
1199 let task = task_params_schema();
1200 let task_examples = task["examples"].as_array().unwrap();
1201 assert_eq!(task_examples[0]["agent"], "explore");
1202 assert!(task_examples[0].get("task").is_none());
1203
1204 let parallel = parallel_task_params_schema();
1205 let parallel_examples = parallel["examples"].as_array().unwrap();
1206 assert!(!parallel_examples[0]["tasks"].as_array().unwrap().is_empty());
1207 }
1208
1209 #[test]
1210 fn test_parallel_task_params_debug() {
1211 let params = ParallelTaskParams {
1212 tasks: vec![TaskParams {
1213 agent: "explore".to_string(),
1214 description: "Debug test".to_string(),
1215 prompt: "Test".to_string(),
1216 background: false,
1217 max_steps: None,
1218 }],
1219 };
1220 let debug_str = format!("{:?}", params);
1221 assert!(debug_str.contains("explore"));
1222 assert!(debug_str.contains("Debug test"));
1223 }
1224
1225 #[test]
1226 fn test_parallel_task_params_large_count() {
1227 let tasks: Vec<TaskParams> = (0..150)
1229 .map(|i| TaskParams {
1230 agent: "explore".to_string(),
1231 description: format!("Task {}", i),
1232 prompt: format!("Prompt for task {}", i),
1233 background: false,
1234 max_steps: Some(10),
1235 })
1236 .collect();
1237
1238 let params = ParallelTaskParams { tasks };
1239 let json = serde_json::to_string(¶ms).unwrap();
1240 let deserialized: ParallelTaskParams = serde_json::from_str(&json).unwrap();
1241 assert_eq!(deserialized.tasks.len(), 150);
1242 assert_eq!(deserialized.tasks[0].description, "Task 0");
1243 assert_eq!(deserialized.tasks[149].description, "Task 149");
1244 }
1245
1246 #[test]
1247 fn test_task_params_max_steps_zero() {
1248 let params = TaskParams {
1250 agent: "explore".to_string(),
1251 description: "Edge case".to_string(),
1252 prompt: "Zero steps".to_string(),
1253 background: false,
1254 max_steps: Some(0),
1255 };
1256 let json = serde_json::to_string(¶ms).unwrap();
1257 let deserialized: TaskParams = serde_json::from_str(&json).unwrap();
1258 assert_eq!(deserialized.max_steps, Some(0));
1259 }
1260
1261 #[test]
1262 fn test_parallel_task_params_all_background() {
1263 let tasks: Vec<TaskParams> = (0..5)
1264 .map(|i| TaskParams {
1265 agent: "general".to_string(),
1266 description: format!("BG task {}", i),
1267 prompt: "Run in background".to_string(),
1268 background: true,
1269 max_steps: None,
1270 })
1271 .collect();
1272 let params = ParallelTaskParams { tasks };
1273 for task in ¶ms.tasks {
1274 assert!(task.background);
1275 }
1276 }
1277
1278 #[test]
1279 fn test_task_params_rejects_permissive_field() {
1280 let json = r#"{
1281 "agent": "general",
1282 "description": "Legacy field rejection",
1283 "prompt": "Verify legacy fields are rejected",
1284 "permissive": true
1285 }"#;
1286
1287 let result: Result<TaskParams, _> = serde_json::from_str(json);
1288 assert!(result.is_err());
1289 }
1290
1291 #[test]
1292 fn test_task_params_schema_hides_permissive_field() {
1293 let schema = task_params_schema();
1294 let props = &schema["properties"];
1295
1296 assert!(props.get("permissive").is_none());
1297 }
1298
1299 use crate::agent::tests::MockLlmClient;
1304 use crate::llm::{ContentBlock, LlmResponse, Message, StreamEvent, TokenUsage, ToolDefinition};
1305 use crate::permissions::PermissionPolicy;
1306 use crate::subagent::AgentRegistry;
1307 use std::sync::atomic::{AtomicUsize, Ordering};
1308 use std::time::Duration;
1309 use tokio::sync::{mpsc, Barrier};
1310
1311 fn text_response(text: impl Into<String>) -> LlmResponse {
1312 LlmResponse {
1313 message: Message {
1314 role: "assistant".to_string(),
1315 content: vec![ContentBlock::Text { text: text.into() }],
1316 reasoning_content: None,
1317 },
1318 usage: TokenUsage {
1319 prompt_tokens: 10,
1320 completion_tokens: 5,
1321 total_tokens: 15,
1322 cache_read_tokens: None,
1323 cache_write_tokens: None,
1324 },
1325 stop_reason: Some("end_turn".to_string()),
1326 meta: None,
1327 }
1328 }
1329
1330 fn pre_analysis_response(messages: &[Message]) -> LlmResponse {
1331 let prompt = last_text(messages);
1332 let response = serde_json::json!({
1333 "intent": "GeneralPurpose",
1334 "requires_planning": false,
1335 "goal": {
1336 "description": prompt,
1337 "success_criteria": []
1338 },
1339 "execution_plan": {
1340 "complexity": "Simple",
1341 "steps": [{
1342 "id": "step-1",
1343 "description": prompt,
1344 "tool": null,
1345 "dependencies": [],
1346 "success_criteria": "Complete the request"
1347 }],
1348 "required_tools": []
1349 },
1350 "optimized_input": prompt
1351 });
1352 text_response(response.to_string())
1353 }
1354
1355 fn last_text(messages: &[Message]) -> String {
1356 messages
1357 .last()
1358 .and_then(|message| {
1359 message.content.iter().find_map(|block| {
1360 if let ContentBlock::Text { text } = block {
1361 Some(text.clone())
1362 } else {
1363 None
1364 }
1365 })
1366 })
1367 .unwrap_or_default()
1368 }
1369
1370 struct StaticLlmClient {
1371 text: String,
1372 }
1373
1374 impl StaticLlmClient {
1375 fn new(text: impl Into<String>) -> Self {
1376 Self { text: text.into() }
1377 }
1378 }
1379
1380 #[async_trait::async_trait]
1381 impl LlmClient for StaticLlmClient {
1382 async fn complete(
1383 &self,
1384 messages: &[Message],
1385 system: Option<&str>,
1386 _tools: &[ToolDefinition],
1387 ) -> Result<LlmResponse> {
1388 if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1389 return Ok(pre_analysis_response(messages));
1390 }
1391 Ok(text_response(self.text.clone()))
1392 }
1393
1394 async fn complete_streaming(
1395 &self,
1396 _messages: &[Message],
1397 _system: Option<&str>,
1398 _tools: &[ToolDefinition],
1399 _cancel_token: tokio_util::sync::CancellationToken,
1400 ) -> Result<mpsc::Receiver<StreamEvent>> {
1401 anyhow::bail!("streaming is not used by task executor tests")
1402 }
1403 }
1404
1405 struct ConcurrentLlmClient {
1406 barrier: Arc<Barrier>,
1407 active: AtomicUsize,
1408 max_active: AtomicUsize,
1409 }
1410
1411 impl ConcurrentLlmClient {
1412 fn new(task_count: usize) -> Self {
1413 Self {
1414 barrier: Arc::new(Barrier::new(task_count)),
1415 active: AtomicUsize::new(0),
1416 max_active: AtomicUsize::new(0),
1417 }
1418 }
1419
1420 fn max_active(&self) -> usize {
1421 self.max_active.load(Ordering::SeqCst)
1422 }
1423
1424 fn record_active(&self) {
1425 let active = self.active.fetch_add(1, Ordering::SeqCst) + 1;
1426 let mut observed = self.max_active.load(Ordering::SeqCst);
1427 while active > observed {
1428 match self.max_active.compare_exchange(
1429 observed,
1430 active,
1431 Ordering::SeqCst,
1432 Ordering::SeqCst,
1433 ) {
1434 Ok(_) => break,
1435 Err(next) => observed = next,
1436 }
1437 }
1438 }
1439 }
1440
1441 struct LimitedConcurrencyLlmClient {
1442 active: AtomicUsize,
1443 max_active: AtomicUsize,
1444 }
1445
1446 impl LimitedConcurrencyLlmClient {
1447 fn new() -> Self {
1448 Self {
1449 active: AtomicUsize::new(0),
1450 max_active: AtomicUsize::new(0),
1451 }
1452 }
1453
1454 fn max_active(&self) -> usize {
1455 self.max_active.load(Ordering::SeqCst)
1456 }
1457
1458 fn record_active(&self) {
1459 let active = self.active.fetch_add(1, Ordering::SeqCst) + 1;
1460 self.max_active.fetch_max(active, Ordering::SeqCst);
1461 }
1462 }
1463
1464 #[async_trait::async_trait]
1465 impl LlmClient for LimitedConcurrencyLlmClient {
1466 async fn complete(
1467 &self,
1468 messages: &[Message],
1469 system: Option<&str>,
1470 _tools: &[ToolDefinition],
1471 ) -> Result<LlmResponse> {
1472 if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1473 return Ok(pre_analysis_response(messages));
1474 }
1475
1476 let prompt = last_text(messages);
1477 self.record_active();
1478 tokio::time::sleep(Duration::from_millis(40)).await;
1479 self.active.fetch_sub(1, Ordering::SeqCst);
1480 Ok(text_response(format!("completed: {prompt}")))
1481 }
1482
1483 async fn complete_streaming(
1484 &self,
1485 _messages: &[Message],
1486 _system: Option<&str>,
1487 _tools: &[ToolDefinition],
1488 _cancel_token: tokio_util::sync::CancellationToken,
1489 ) -> Result<mpsc::Receiver<StreamEvent>> {
1490 anyhow::bail!("streaming is not used by task executor tests")
1491 }
1492 }
1493
1494 #[async_trait::async_trait]
1495 impl LlmClient for ConcurrentLlmClient {
1496 async fn complete(
1497 &self,
1498 messages: &[Message],
1499 system: Option<&str>,
1500 _tools: &[ToolDefinition],
1501 ) -> Result<LlmResponse> {
1502 if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1503 return Ok(pre_analysis_response(messages));
1504 }
1505
1506 let prompt = last_text(messages);
1507 self.record_active();
1508 self.barrier.wait().await;
1509 if prompt.contains("slow") {
1510 tokio::time::sleep(Duration::from_millis(120)).await;
1511 } else {
1512 tokio::time::sleep(Duration::from_millis(10)).await;
1513 }
1514 self.active.fetch_sub(1, Ordering::SeqCst);
1515 Ok(text_response(format!("completed: {prompt}")))
1516 }
1517
1518 async fn complete_streaming(
1519 &self,
1520 _messages: &[Message],
1521 _system: Option<&str>,
1522 _tools: &[ToolDefinition],
1523 _cancel_token: tokio_util::sync::CancellationToken,
1524 ) -> Result<mpsc::Receiver<StreamEvent>> {
1525 anyhow::bail!("streaming is not used by task executor tests")
1526 }
1527 }
1528
1529 fn test_registry_with_writer() -> Arc<AgentRegistry> {
1530 let registry = AgentRegistry::new();
1531 let spec = crate::subagent::WorkerAgentSpec::custom("writer", "Write files")
1532 .with_permissions(PermissionPolicy::new().allow("write(*)").allow("read(*)"))
1533 .with_prompt("Write files when asked.")
1534 .with_max_steps(3);
1535 registry.register(spec.into_agent_definition());
1536 Arc::new(registry)
1537 }
1538
1539 fn test_registry_with_text_worker() -> Arc<AgentRegistry> {
1540 let registry = AgentRegistry::new();
1541 let spec = crate::subagent::WorkerAgentSpec::custom("worker", "Text worker")
1542 .with_prompt("Return a concise result.")
1543 .with_max_steps(1);
1544 registry.register(spec.into_agent_definition());
1545 Arc::new(registry)
1546 }
1547
1548 #[tokio::test]
1549 async fn task_child_run_permission_allow() {
1550 let workspace = tempfile::tempdir().unwrap();
1551 let mock = Arc::new(MockLlmClient::new(vec![
1552 MockLlmClient::tool_call_response(
1553 "t1",
1554 "write",
1555 serde_json::json!({
1556 "file_path": workspace.path().join("out.txt").to_string_lossy(),
1557 "content": "WRITTEN"
1558 }),
1559 ),
1560 MockLlmClient::text_response("Done."),
1561 ]));
1562
1563 let executor = TaskExecutor::new(
1564 test_registry_with_writer(),
1565 mock,
1566 workspace.path().to_string_lossy().to_string(),
1567 );
1568
1569 let result = executor
1570 .execute(
1571 TaskParams {
1572 agent: "writer".to_string(),
1573 description: "Write file".to_string(),
1574 prompt: "Write out.txt".to_string(),
1575 background: false,
1576 max_steps: Some(3),
1577 },
1578 None,
1579 )
1580 .await
1581 .unwrap();
1582
1583 assert!(
1584 result.success,
1585 "child run should succeed: {}",
1586 result.output
1587 );
1588 assert!(
1589 !result.output.contains("Permission denied"),
1590 "no permission denial: {}",
1591 result.output
1592 );
1593 let content = std::fs::read_to_string(workspace.path().join("out.txt")).unwrap();
1594 assert_eq!(content, "WRITTEN");
1595 }
1596
1597 #[tokio::test]
1598 async fn task_child_run_permission_deny() {
1599 let workspace = tempfile::tempdir().unwrap();
1600 let registry = AgentRegistry::new();
1601 let spec = crate::subagent::WorkerAgentSpec::custom("restricted", "Restricted agent")
1602 .with_permissions(PermissionPolicy::new().allow("read(*)").deny("bash(*)"))
1603 .with_max_steps(3);
1604 registry.register(spec.into_agent_definition());
1605
1606 let mock = Arc::new(MockLlmClient::new(vec![
1607 MockLlmClient::tool_call_response(
1608 "t1",
1609 "bash",
1610 serde_json::json!({"command": "echo hello"}),
1611 ),
1612 MockLlmClient::text_response("Could not run bash."),
1613 ]));
1614
1615 let executor = TaskExecutor::new(
1616 Arc::new(registry),
1617 mock,
1618 workspace.path().to_string_lossy().to_string(),
1619 );
1620
1621 let result = executor
1622 .execute(
1623 TaskParams {
1624 agent: "restricted".to_string(),
1625 description: "Try bash".to_string(),
1626 prompt: "Run echo hello".to_string(),
1627 background: false,
1628 max_steps: Some(3),
1629 },
1630 None,
1631 )
1632 .await
1633 .unwrap();
1634
1635 assert!(result.success, "agent should complete: {}", result.output);
1638 }
1639
1640 #[tokio::test]
1641 async fn task_child_run_confirmation_auto_approve() {
1642 let workspace = tempfile::tempdir().unwrap();
1643 let registry = AgentRegistry::new();
1644 let spec = crate::subagent::WorkerAgentSpec::custom("reader-writer", "Read and write")
1647 .with_permissions(PermissionPolicy::new().allow("read(*)"))
1648 .with_max_steps(3);
1649 registry.register(spec.into_agent_definition());
1650
1651 let mock = Arc::new(MockLlmClient::new(vec![
1652 MockLlmClient::tool_call_response(
1653 "t1",
1654 "write",
1655 serde_json::json!({
1656 "file_path": workspace.path().join("auto.txt").to_string_lossy(),
1657 "content": "AUTO_APPROVED"
1658 }),
1659 ),
1660 MockLlmClient::text_response("Written."),
1661 ]));
1662
1663 let executor = TaskExecutor::new(
1664 Arc::new(registry),
1665 mock,
1666 workspace.path().to_string_lossy().to_string(),
1667 );
1668
1669 let result = executor
1670 .execute(
1671 TaskParams {
1672 agent: "reader-writer".to_string(),
1673 description: "Write via auto-approve".to_string(),
1674 prompt: "Write auto.txt".to_string(),
1675 background: false,
1676 max_steps: Some(3),
1677 },
1678 None,
1679 )
1680 .await
1681 .unwrap();
1682
1683 assert!(
1684 result.success,
1685 "Ask should be auto-approved: {}",
1686 result.output
1687 );
1688 assert!(
1689 !result.output.contains("MissingConfirmationManager"),
1690 "no MissingConfirmationManager: {}",
1691 result.output
1692 );
1693 }
1694
1695 #[tokio::test]
1696 async fn task_child_run_step_budget_enforced() {
1697 let workspace = tempfile::tempdir().unwrap();
1698 let mock = Arc::new(MockLlmClient::new(vec![
1699 MockLlmClient::tool_call_response(
1700 "t1",
1701 "read",
1702 serde_json::json!({"file_path": "/tmp/a.txt"}),
1703 ),
1704 MockLlmClient::tool_call_response(
1705 "t2",
1706 "read",
1707 serde_json::json!({"file_path": "/tmp/b.txt"}),
1708 ),
1709 MockLlmClient::tool_call_response(
1710 "t3",
1711 "read",
1712 serde_json::json!({"file_path": "/tmp/c.txt"}),
1713 ),
1714 MockLlmClient::text_response("Should not reach here."),
1715 ]));
1716
1717 let executor = TaskExecutor::new(
1718 test_registry_with_writer(),
1719 mock,
1720 workspace.path().to_string_lossy().to_string(),
1721 );
1722
1723 let result = executor
1724 .execute(
1725 TaskParams {
1726 agent: "writer".to_string(),
1727 description: "Exceed budget".to_string(),
1728 prompt: "Read many files".to_string(),
1729 background: false,
1730 max_steps: Some(2),
1731 },
1732 None,
1733 )
1734 .await
1735 .unwrap();
1736
1737 assert!(
1739 !result.success,
1740 "should fail when exceeding step budget: {}",
1741 result.output
1742 );
1743 assert!(
1744 result.output.contains("Max tool rounds") || result.output.contains("max tool rounds"),
1745 "error should mention tool rounds: {}",
1746 result.output
1747 );
1748 }
1749
1750 #[tokio::test]
1751 async fn parallel_task_executor_runs_children_concurrently_and_preserves_input_order() {
1752 let workspace = tempfile::tempdir().unwrap();
1753 let client = Arc::new(ConcurrentLlmClient::new(2));
1754 let executor = Arc::new(TaskExecutor::new(
1755 test_registry_with_text_worker(),
1756 client.clone(),
1757 workspace.path().to_string_lossy().to_string(),
1758 ));
1759
1760 let tasks = vec![
1761 TaskParams {
1762 agent: "worker".to_string(),
1763 description: "Slow task".to_string(),
1764 prompt: "slow branch".to_string(),
1765 background: false,
1766 max_steps: Some(1),
1767 },
1768 TaskParams {
1769 agent: "worker".to_string(),
1770 description: "Fast task".to_string(),
1771 prompt: "fast branch".to_string(),
1772 background: false,
1773 max_steps: Some(1),
1774 },
1775 ];
1776
1777 let results = tokio::time::timeout(
1778 Duration::from_secs(2),
1779 executor.execute_parallel(tasks, None),
1780 )
1781 .await
1782 .expect("parallel children should reach the barrier and complete");
1783
1784 assert_eq!(results.len(), 2);
1785 assert!(
1786 client.max_active() >= 2,
1787 "expected concurrent child execution, max_active={}",
1788 client.max_active()
1789 );
1790 assert!(results[0].success);
1791 assert!(results[0].output.contains("slow branch"));
1792 assert!(results[1].success);
1793 assert!(results[1].output.contains("fast branch"));
1794 }
1795
1796 #[tokio::test]
1797 async fn parallel_task_executor_respects_configured_concurrency_limit() {
1798 let workspace = tempfile::tempdir().unwrap();
1799 let client = Arc::new(LimitedConcurrencyLlmClient::new());
1800 let executor = Arc::new(
1801 TaskExecutor::new(
1802 test_registry_with_text_worker(),
1803 client.clone(),
1804 workspace.path().to_string_lossy().to_string(),
1805 )
1806 .with_max_parallel_tasks(2),
1807 );
1808
1809 let tasks = (0..5)
1810 .map(|idx| TaskParams {
1811 agent: "worker".to_string(),
1812 description: format!("Task {idx}"),
1813 prompt: format!("branch {idx}"),
1814 background: false,
1815 max_steps: Some(1),
1816 })
1817 .collect::<Vec<_>>();
1818
1819 let results = executor.execute_parallel(tasks, None).await;
1820
1821 assert_eq!(results.len(), 5);
1822 assert!(results.iter().all(|result| result.success));
1823 assert_eq!(client.max_active(), 2);
1824 }
1825
1826 #[tokio::test]
1827 async fn parallel_task_executor_isolates_unknown_agent_failure() {
1828 let workspace = tempfile::tempdir().unwrap();
1829 let executor = Arc::new(TaskExecutor::new(
1830 test_registry_with_text_worker(),
1831 Arc::new(StaticLlmClient::new("valid branch done")),
1832 workspace.path().to_string_lossy().to_string(),
1833 ));
1834
1835 let tasks = vec![
1836 TaskParams {
1837 agent: "missing-agent".to_string(),
1838 description: "Missing".to_string(),
1839 prompt: "should fail".to_string(),
1840 background: false,
1841 max_steps: Some(1),
1842 },
1843 TaskParams {
1844 agent: "worker".to_string(),
1845 description: "Valid".to_string(),
1846 prompt: "should succeed".to_string(),
1847 background: false,
1848 max_steps: Some(1),
1849 },
1850 ];
1851
1852 let results = executor.execute_parallel(tasks, None).await;
1853
1854 assert_eq!(results.len(), 2);
1855 assert!(!results[0].success);
1856 assert_eq!(results[0].agent, "missing-agent");
1857 assert!(results[0].output.contains("Unknown agent type"));
1858 assert!(results[1].success);
1859 assert_eq!(results[1].agent, "worker");
1860 assert!(results[1].output.contains("valid branch done"));
1861 }
1862
1863 #[tokio::test]
1864 async fn parallel_task_executor_emits_subagent_events_for_each_child() {
1865 let workspace = tempfile::tempdir().unwrap();
1866 let executor = Arc::new(TaskExecutor::new(
1867 test_registry_with_text_worker(),
1868 Arc::new(StaticLlmClient::new("done")),
1869 workspace.path().to_string_lossy().to_string(),
1870 ));
1871 let (tx, mut rx) = broadcast::channel(64);
1872
1873 let tasks = vec![
1874 TaskParams {
1875 agent: "worker".to_string(),
1876 description: "One".to_string(),
1877 prompt: "first".to_string(),
1878 background: false,
1879 max_steps: Some(1),
1880 },
1881 TaskParams {
1882 agent: "worker".to_string(),
1883 description: "Two".to_string(),
1884 prompt: "second".to_string(),
1885 background: false,
1886 max_steps: Some(1),
1887 },
1888 ];
1889
1890 let results = executor.execute_parallel(tasks, Some(tx)).await;
1891 assert_eq!(results.len(), 2);
1892 tokio::time::sleep(Duration::from_millis(20)).await;
1893
1894 let mut starts = Vec::new();
1895 let mut ends = Vec::new();
1896 while let Ok(event) = rx.try_recv() {
1897 match event {
1898 AgentEvent::SubagentStart { description, .. } => starts.push(description),
1899 AgentEvent::SubagentEnd { agent, success, .. } => ends.push((agent, success)),
1900 _ => {}
1901 }
1902 }
1903
1904 starts.sort();
1905 assert_eq!(starts, vec!["One".to_string(), "Two".to_string()]);
1906 assert_eq!(ends.len(), 2);
1907 assert!(ends
1908 .iter()
1909 .all(|(agent, success)| agent == "worker" && *success));
1910 }
1911
1912 #[tokio::test]
1913 async fn parallel_task_tool_reports_error_when_any_child_fails() {
1914 let workspace = tempfile::tempdir().unwrap();
1915 let executor = Arc::new(TaskExecutor::new(
1916 test_registry_with_text_worker(),
1917 Arc::new(StaticLlmClient::new("valid branch done")),
1918 workspace.path().to_string_lossy().to_string(),
1919 ));
1920 let tool = ParallelTaskTool::new(executor);
1921 let ctx = ToolContext::new(workspace.path().to_path_buf());
1922
1923 let output = tool
1924 .execute(
1925 &serde_json::json!({
1926 "tasks": [
1927 {
1928 "agent": "missing-agent",
1929 "description": "Missing",
1930 "prompt": "should fail"
1931 },
1932 {
1933 "agent": "worker",
1934 "description": "Valid",
1935 "prompt": "should succeed"
1936 }
1937 ]
1938 }),
1939 &ctx,
1940 )
1941 .await
1942 .unwrap();
1943
1944 assert!(
1945 !output.success,
1946 "parallel_task should fail when any child result fails"
1947 );
1948 assert!(output.content.contains("[ERR]"));
1949 assert!(output.content.contains("[OK]"));
1950 let metadata = output.metadata.expect("metadata");
1951 assert_eq!(metadata["task_count"], 2);
1952 assert_eq!(metadata["results"][0]["success"], false);
1953 assert_eq!(metadata["results"][1]["success"], true);
1954 }
1955
1956 #[tokio::test]
1957 async fn parallel_task_both_inherit_permissions() {
1958 let workspace = tempfile::tempdir().unwrap();
1959 let mock = Arc::new(MockLlmClient::new(vec![
1960 MockLlmClient::tool_call_response(
1962 "t1",
1963 "write",
1964 serde_json::json!({
1965 "file_path": workspace.path().join("p1.txt").to_string_lossy(),
1966 "content": "P1"
1967 }),
1968 ),
1969 MockLlmClient::text_response("Done 1."),
1970 MockLlmClient::tool_call_response(
1972 "t2",
1973 "write",
1974 serde_json::json!({
1975 "file_path": workspace.path().join("p2.txt").to_string_lossy(),
1976 "content": "P2"
1977 }),
1978 ),
1979 MockLlmClient::text_response("Done 2."),
1980 ]));
1981
1982 let executor = Arc::new(TaskExecutor::new(
1983 test_registry_with_writer(),
1984 mock,
1985 workspace.path().to_string_lossy().to_string(),
1986 ));
1987
1988 let tasks = vec![
1989 TaskParams {
1990 agent: "writer".to_string(),
1991 description: "Write p1".to_string(),
1992 prompt: "Write p1.txt".to_string(),
1993 background: false,
1994 max_steps: Some(3),
1995 },
1996 TaskParams {
1997 agent: "writer".to_string(),
1998 description: "Write p2".to_string(),
1999 prompt: "Write p2.txt".to_string(),
2000 background: false,
2001 max_steps: Some(3),
2002 },
2003 ];
2004
2005 let results = executor.execute_parallel(tasks, None).await;
2006 assert_eq!(results.len(), 2);
2007
2008 for result in &results {
2009 assert!(
2010 result.success,
2011 "parallel child should succeed: {}",
2012 result.output
2013 );
2014 }
2015 }
2016}