1use crate::agent::{AgentConfig, AgentEvent, AgentLoop};
18use crate::llm::structured::{generate_blocking, StructuredMode, StructuredRequest};
19use crate::llm::LlmClient;
20use crate::mcp::manager::McpManager;
21use crate::orchestration::{AgentExecutor, AgentStepSpec, StepOutcome};
22use crate::subagent::AgentRegistry;
23use crate::tools::types::{Tool, ToolContext, ToolOutput};
24use anyhow::{Context, Result};
25use async_trait::async_trait;
26use serde::{Deserialize, Serialize};
27use std::path::PathBuf;
28use std::sync::Arc;
29use tokio::sync::broadcast;
30
/// Largest delegated-task output, in bytes, that `compact_task_output` returns unchanged.
const TASK_OUTPUT_CONTEXT_LIMIT: usize = 4_000;
/// Byte budget passed to `crate::text::truncate_utf8` for the leading excerpt of a compacted output.
const TASK_OUTPUT_CONTEXT_HEAD: usize = 3_000;
/// Maximum number of trailing bytes kept from a compacted output.
const TASK_OUTPUT_CONTEXT_TAIL: usize = 800;
34
/// Arguments describing one delegated task.
///
/// Unknown JSON fields are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct TaskParams {
    /// Agent type name; looked up in the executor's agent registry.
    pub agent: String,
    /// Short label for the task; copied into the `SubagentStart` event.
    pub description: String,
    /// Prompt handed to the child agent loop.
    pub prompt: String,
    /// When `true`, `TaskTool` starts the task in the background and returns immediately.
    /// Defaults to `false` when absent from the input.
    #[serde(default)]
    pub background: bool,
    /// Optional override for the child config's `max_tool_rounds`. Omitted from serialized
    /// output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_steps: Option<usize>,
}
52
/// Outcome of one delegated task run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    /// Final text produced by the child run, or a failure/cancellation message.
    pub output: String,
    /// Session id of the child run (`task-run-<task_id>` when produced by `TaskExecutor`).
    pub session_id: String,
    /// Agent type name the task was run with.
    pub agent: String,
    /// `true` when the child agent loop returned successfully.
    pub success: bool,
    /// Identifier of the task.
    pub task_id: String,
}
67
68fn compact_task_output(output: &str) -> (String, bool) {
69 if output.len() <= TASK_OUTPUT_CONTEXT_LIMIT {
70 return (output.to_string(), false);
71 }
72
73 let head = crate::text::truncate_utf8(output, TASK_OUTPUT_CONTEXT_HEAD);
74 let tail_start = output
75 .char_indices()
76 .find_map(|(idx, _)| {
77 if output.len().saturating_sub(idx) <= TASK_OUTPUT_CONTEXT_TAIL {
78 Some(idx)
79 } else {
80 None
81 }
82 })
83 .unwrap_or(output.len());
84 let tail = &output[tail_start..];
85
86 (
87 format!(
88 "{}\n\n[{} bytes omitted from delegated task output]\n\n{}",
89 head,
90 output.len().saturating_sub(head.len() + tail.len()),
91 tail
92 ),
93 true,
94 )
95}
96
97fn synthesize_subagent_progress(
108 event: &AgentEvent,
109 task_id: &str,
110 session_id: &str,
111) -> Option<AgentEvent> {
112 match event {
113 AgentEvent::ToolEnd {
114 name,
115 output,
116 exit_code,
117 error_kind,
118 ..
119 } => {
120 let mut metadata = serde_json::json!({
121 "tool": name,
122 "exit_code": exit_code,
123 "output_bytes": output.len(),
124 });
125 if let Some(kind) = error_kind {
126 metadata["error_kind"] =
127 serde_json::to_value(kind).unwrap_or(serde_json::Value::Null);
128 }
129 Some(AgentEvent::SubagentProgress {
130 task_id: task_id.to_string(),
131 session_id: session_id.to_string(),
132 status: "tool_completed".to_string(),
133 metadata,
134 })
135 }
136 AgentEvent::TurnEnd { turn, usage } => Some(AgentEvent::SubagentProgress {
137 task_id: task_id.to_string(),
138 session_id: session_id.to_string(),
139 status: "turn_completed".to_string(),
140 metadata: serde_json::json!({
141 "turn": turn,
142 "total_tokens": usage.total_tokens,
143 "prompt_tokens": usage.prompt_tokens,
144 "completion_tokens": usage.completion_tokens,
145 }),
146 }),
147 _ => None,
148 }
149}
150
151fn task_artifact_id(result: &TaskResult) -> String {
152 format!("task-output:{}", result.task_id)
153}
154
155fn task_artifact_uri(result: &TaskResult) -> String {
156 format!(
157 "a3s://tasks/{}/runs/{}/output",
158 result.session_id, result.task_id
159 )
160}
161
162fn format_task_result_for_context(result: &TaskResult) -> (String, bool) {
163 let (output, truncated) = compact_task_output(&result.output);
164 let status = if result.success {
165 "completed"
166 } else {
167 "failed"
168 };
169 let artifact_id = task_artifact_id(result);
170 let artifact_uri = task_artifact_uri(result);
171 let mut formatted = format!(
172 "Task {status}: {}\nAgent: {}\nSession: {}\nTask ID: {}\nArtifact ID: {}\nArtifact URI: {}\n",
173 result.task_id, result.agent, result.session_id, result.task_id, artifact_id, artifact_uri
174 );
175 if truncated {
176 formatted.push_str(
177 "Output excerpt: truncated for parent context. Use the artifact URI or child run session/events if exact omitted content is needed.\n",
178 );
179 } else {
180 formatted.push_str("Output:\n");
181 }
182 formatted.push_str(&output);
183 (formatted, truncated)
184}
185
/// Runs delegated tasks as child agent loops.
pub struct TaskExecutor {
    /// Registry used to resolve `TaskParams::agent` to an agent definition.
    registry: Arc<AgentRegistry>,
    /// LLM client shared with every child agent loop and used for structured-output coercion.
    llm_client: Arc<dyn LlmClient>,
    /// Workspace path given to each child's tool executor and tool context.
    workspace: String,
    /// When set, all tools reported by the MCP manager are registered on each child's tool executor.
    mcp_manager: Option<Arc<McpManager>>,
    /// Optional parent run context; applied to the child config and consulted for workspace services.
    parent_context: Option<crate::child_run::ChildRunContext>,
    /// Value reported by `concurrency_hint`; the builder methods clamp it to at least 1.
    max_parallel_tasks: usize,
    /// When set, a cancellation token is registered per task for the duration of its run.
    subagent_tracker: Option<Arc<crate::subagent_task_tracker::InMemorySubagentTaskTracker>>,
}
203
204impl TaskExecutor {
205 pub fn new(
207 registry: Arc<AgentRegistry>,
208 llm_client: Arc<dyn LlmClient>,
209 workspace: String,
210 ) -> Self {
211 Self {
212 registry,
213 llm_client,
214 workspace,
215 mcp_manager: None,
216 parent_context: None,
217 max_parallel_tasks: crate::agent::DEFAULT_MAX_PARALLEL_TASKS,
218 subagent_tracker: None,
219 }
220 }
221
222 pub fn with_mcp(
224 registry: Arc<AgentRegistry>,
225 llm_client: Arc<dyn LlmClient>,
226 workspace: String,
227 mcp_manager: Arc<McpManager>,
228 ) -> Self {
229 Self {
230 registry,
231 llm_client,
232 workspace,
233 mcp_manager: Some(mcp_manager),
234 parent_context: None,
235 max_parallel_tasks: crate::agent::DEFAULT_MAX_PARALLEL_TASKS,
236 subagent_tracker: None,
237 }
238 }
239
240 pub fn with_parent_context(mut self, ctx: crate::child_run::ChildRunContext) -> Self {
242 if let Some(max_parallel_tasks) = ctx.max_parallel_tasks {
243 self.max_parallel_tasks = max_parallel_tasks.max(1);
244 }
245 self.parent_context = Some(ctx);
246 self
247 }
248
249 pub fn with_max_parallel_tasks(mut self, max_parallel_tasks: usize) -> Self {
250 self.max_parallel_tasks = max_parallel_tasks.max(1);
251 self
252 }
253
254 pub fn with_subagent_tracker(
258 mut self,
259 tracker: Arc<crate::subagent_task_tracker::InMemorySubagentTaskTracker>,
260 ) -> Self {
261 self.subagent_tracker = Some(tracker);
262 self
263 }
264
265 pub async fn execute(
270 &self,
271 params: TaskParams,
272 event_tx: Option<broadcast::Sender<AgentEvent>>,
273 parent_session_id: Option<&str>,
274 ) -> Result<TaskResult> {
275 let task_id = format!("task-{}", uuid::Uuid::new_v4());
276 self.execute_with_task_id(task_id, params, event_tx, parent_session_id, true)
277 .await
278 }
279
280 pub async fn execute_with_task_id(
285 &self,
286 task_id: String,
287 params: TaskParams,
288 event_tx: Option<broadcast::Sender<AgentEvent>>,
289 parent_session_id: Option<&str>,
290 emit_start: bool,
291 ) -> Result<TaskResult> {
292 let session_id = format!("task-run-{}", task_id);
293
294 let agent = self
295 .registry
296 .get(¶ms.agent)
297 .context(format!("Unknown agent type: '{}'", params.agent))?;
298
299 if emit_start {
300 if let Some(ref tx) = event_tx {
301 let _ = tx.send(AgentEvent::SubagentStart {
302 task_id: task_id.clone(),
303 session_id: session_id.clone(),
304 parent_session_id: parent_session_id.unwrap_or_default().to_string(),
305 agent: params.agent.clone(),
306 description: params.description.clone(),
307 });
308 }
309 }
310
311 let child_executor = if let Some(ref parent_ctx) = self.parent_context {
314 if let Some(ref services) = parent_ctx.workspace_services {
315 crate::tools::ToolExecutor::new_with_workspace_services_and_artifact_limits(
316 self.workspace.clone(),
317 Arc::clone(services),
318 crate::tools::ArtifactStoreLimits::default(),
319 )
320 } else {
321 crate::tools::ToolExecutor::new(self.workspace.clone())
322 }
323 } else {
324 crate::tools::ToolExecutor::new(self.workspace.clone())
325 };
326
327 if let Some(ref mcp) = self.mcp_manager {
329 let all_tools = mcp.get_all_tools().await;
330 let mut by_server: std::collections::HashMap<
331 String,
332 Vec<crate::mcp::protocol::McpTool>,
333 > = std::collections::HashMap::new();
334 for (server, tool) in all_tools {
335 by_server.entry(server).or_default().push(tool);
336 }
337 for (server_name, tools) in by_server {
338 let wrappers =
339 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp));
340 for wrapper in wrappers {
341 child_executor.register_dynamic_tool(wrapper);
342 }
343 }
344 }
345
346 let child_executor = Arc::new(child_executor);
347
348 let mut child_config = AgentConfig {
349 tools: child_executor.definitions(),
350 ..AgentConfig::default()
351 };
352 agent.apply_to(&mut child_config);
353 if let Some(ref parent_ctx) = self.parent_context {
354 parent_ctx.apply_to(&mut child_config);
355 }
356 if let Some(max_steps) = params.max_steps {
357 child_config.max_tool_rounds = max_steps;
358 }
359
360 let mut tool_context =
361 ToolContext::new(PathBuf::from(&self.workspace)).with_session_id(session_id.clone());
362 if let Some(ref parent_ctx) = self.parent_context {
363 if let Some(ref services) = parent_ctx.workspace_services {
364 tool_context = tool_context.with_workspace_services(Arc::clone(services));
365 }
366 }
367
368 let agent_loop = AgentLoop::new(
369 Arc::clone(&self.llm_client),
370 child_executor,
371 tool_context,
372 child_config,
373 );
374
375 let child_event_tx = if let Some(ref broadcast_tx) = event_tx {
380 let (mpsc_tx, mut mpsc_rx) = tokio::sync::mpsc::channel(100);
381 let broadcast_tx_clone = broadcast_tx.clone();
382 let progress_task_id = task_id.clone();
383 let progress_session_id = session_id.clone();
384
385 tokio::spawn(async move {
386 while let Some(event) = mpsc_rx.recv().await {
387 if let Some(progress) = synthesize_subagent_progress(
388 &event,
389 &progress_task_id,
390 &progress_session_id,
391 ) {
392 let _ = broadcast_tx_clone.send(progress);
393 }
394 let _ = broadcast_tx_clone.send(event);
395 }
396 });
397
398 Some(mpsc_tx)
399 } else {
400 None
401 };
402
403 let cancel_token = tokio_util::sync::CancellationToken::new();
406 if let Some(ref tracker) = self.subagent_tracker {
407 tracker
408 .register_canceller(&task_id, cancel_token.clone())
409 .await;
410 }
411
412 let (output, success) = match agent_loop
413 .execute_with_session(
414 &[],
415 ¶ms.prompt,
416 Some(&session_id),
417 child_event_tx,
418 Some(&cancel_token),
419 )
420 .await
421 {
422 Ok(result) => (result.text, true),
423 Err(e) if cancel_token.is_cancelled() => {
424 (format!("Task cancelled by caller: {}", e), false)
425 }
426 Err(e) => (format!("Task failed: {}", e), false),
427 };
428
429 if let Some(ref tracker) = self.subagent_tracker {
430 tracker.clear_canceller(&task_id).await;
431 }
432
433 if let Some(ref tx) = event_tx {
434 let _ = tx.send(AgentEvent::SubagentEnd {
435 task_id: task_id.clone(),
436 session_id: session_id.clone(),
437 agent: params.agent.clone(),
438 output: output.clone(),
439 success,
440 });
441 }
442
443 Ok(TaskResult {
444 output,
445 session_id,
446 agent: params.agent,
447 success,
448 task_id,
449 })
450 }
451
452 pub fn execute_background(
460 self: Arc<Self>,
461 params: TaskParams,
462 event_tx: Option<broadcast::Sender<AgentEvent>>,
463 parent_session_id: Option<String>,
464 ) -> String {
465 let task_id = format!("task-{}", uuid::Uuid::new_v4());
466 let session_id = format!("task-run-{}", task_id);
467
468 if let Some(ref tx) = event_tx {
469 let _ = tx.send(AgentEvent::SubagentStart {
470 task_id: task_id.clone(),
471 session_id,
472 parent_session_id: parent_session_id.clone().unwrap_or_default(),
473 agent: params.agent.clone(),
474 description: params.description.clone(),
475 });
476 }
477
478 let task_id_for_spawn = task_id.clone();
479 let task_id_for_log = task_id.clone();
480 tokio::spawn(async move {
481 if let Err(e) = self
482 .execute_with_task_id(
483 task_id_for_spawn,
484 params,
485 event_tx,
486 parent_session_id.as_deref(),
487 false,
488 )
489 .await
490 {
491 tracing::error!("Background task {} failed: {}", task_id_for_log, e);
492 }
493 });
494
495 task_id
496 }
497
498 pub async fn execute_parallel(
506 self: &Arc<Self>,
507 tasks: Vec<TaskParams>,
508 event_tx: Option<broadcast::Sender<AgentEvent>>,
509 parent_session_id: Option<&str>,
510 ) -> Vec<TaskResult> {
511 let parent = parent_session_id.map(|s| s.to_string());
512 let specs = tasks
513 .into_iter()
514 .map(|params| AgentStepSpec {
515 task_id: format!("task-{}", uuid::Uuid::new_v4()),
516 agent: params.agent,
517 description: params.description,
518 prompt: params.prompt,
519 max_steps: params.max_steps,
520 parent_session_id: parent.clone(),
521 output_schema: None,
522 })
523 .collect();
524
525 let executor: Arc<dyn AgentExecutor> = Arc::<Self>::clone(self);
526 crate::orchestration::execute_steps_parallel(executor, specs, event_tx)
527 .await
528 .into_iter()
529 .map(TaskResult::from)
530 .collect()
531 }
532}
533
534impl From<TaskResult> for StepOutcome {
535 fn from(r: TaskResult) -> Self {
536 StepOutcome {
537 task_id: r.task_id,
538 session_id: r.session_id,
539 agent: r.agent,
540 output: r.output,
541 success: r.success,
542 structured: None,
543 }
544 }
545}
546
547impl From<StepOutcome> for TaskResult {
548 fn from(o: StepOutcome) -> Self {
549 TaskResult {
550 output: o.output,
551 session_id: o.session_id,
552 agent: o.agent,
553 success: o.success,
554 task_id: o.task_id,
555 }
556 }
557}
558
559#[async_trait]
563impl AgentExecutor for TaskExecutor {
564 async fn execute_step(
565 &self,
566 spec: AgentStepSpec,
567 event_tx: Option<broadcast::Sender<AgentEvent>>,
568 ) -> StepOutcome {
569 let agent = spec.agent.clone();
570 let task_id = spec.task_id.clone();
571 let output_schema = spec.output_schema.clone();
572 let params = TaskParams {
573 agent: spec.agent,
574 description: spec.description,
575 prompt: spec.prompt,
576 background: false,
577 max_steps: spec.max_steps,
578 };
579 let mut outcome: StepOutcome = match self
580 .execute_with_task_id(
581 task_id.clone(),
582 params,
583 event_tx,
584 spec.parent_session_id.as_deref(),
585 true,
586 )
587 .await
588 {
589 Ok(result) => result.into(),
590 Err(e) => return StepOutcome::failed(task_id, agent, format!("Task failed: {e}")),
591 };
592
593 if outcome.success {
598 if let Some(schema) = output_schema {
599 match self.coerce_to_schema(&outcome.output, schema).await {
600 Ok(object) => outcome.structured = Some(object),
601 Err(e) => {
602 outcome.success = false;
603 outcome.output =
604 format!("{}\n\n[structured output failed: {e}]", outcome.output);
605 }
606 }
607 }
608 }
609 outcome
610 }
611
612 fn concurrency_hint(&self) -> usize {
613 self.max_parallel_tasks
614 }
615}
616
617impl TaskExecutor {
618 async fn coerce_to_schema(
623 &self,
624 output: &str,
625 schema: serde_json::Value,
626 ) -> Result<serde_json::Value> {
627 let req = StructuredRequest {
628 prompt: format!(
629 "Convert the following task result into a single JSON object that conforms to \
630 the required schema. Use only information present in the result.\n\n\
631 --- TASK RESULT ---\n{output}"
632 ),
633 system: Some(
634 "You output exactly one JSON object matching the provided schema.".to_string(),
635 ),
636 schema,
637 schema_name: "step_output".to_string(),
638 schema_description: None,
639 mode: StructuredMode::Tool,
642 max_repair_attempts: 2,
643 };
644 let result = generate_blocking(&*self.llm_client, &req).await?;
645 Ok(result.object)
646 }
647}
648
649pub fn task_params_schema() -> serde_json::Value {
651 serde_json::json!({
652 "type": "object",
653 "additionalProperties": false,
654 "properties": {
655 "agent": {
656 "type": "string",
657 "description": "Required. Canonical agent type to use (for example: explore, general, plan, verification, review). Always provide this exact field name: 'agent'."
658 },
659 "description": {
660 "type": "string",
661 "description": "Required. Short task label for display and tracking. Always provide this exact field name: 'description'."
662 },
663 "prompt": {
664 "type": "string",
665 "description": "Required. Detailed instruction for the delegated child run. Always provide this exact field name: 'prompt'."
666 },
667 "background": {
668 "type": "boolean",
669 "description": "Optional. Run the task in the background. Default: false.",
670 "default": false
671 },
672 "max_steps": {
673 "type": "integer",
674 "description": "Optional. Maximum number of steps for this task."
675 }
676 },
677 "required": ["agent", "description", "prompt"],
678 "examples": [
679 {
680 "agent": "explore",
681 "description": "Find Rust files",
682 "prompt": "Search the workspace for Rust files and summarize the layout."
683 },
684 {
685 "agent": "general",
686 "description": "Investigate test failure",
687 "prompt": "Inspect the failing tests and explain the root cause.",
688 "max_steps": 6
689 }
690 ]
691 })
692}
693
/// Tool named `task` that delegates a single task to a child run.
pub struct TaskTool {
    /// Executor that runs the delegated task, in the foreground or the background.
    executor: Arc<TaskExecutor>,
}
699
700impl TaskTool {
701 pub fn new(executor: Arc<TaskExecutor>) -> Self {
703 Self { executor }
704 }
705}
706
707#[async_trait]
708impl Tool for TaskTool {
709 fn name(&self) -> &str {
710 "task"
711 }
712
713 fn description(&self) -> &str {
714 "Delegate a bounded task to a specialized child run. Built-in agents: explore (read-only codebase search), general/general-purpose (full access multi-step), plan (read-only planning), verification (adversarial validation), review (code review). Custom agents from agent_dirs and .a3s/agents are also available; .claude/agents is read for compatibility."
715 }
716
717 fn parameters(&self) -> serde_json::Value {
718 task_params_schema()
719 }
720
721 async fn execute(&self, args: &serde_json::Value, ctx: &ToolContext) -> Result<ToolOutput> {
722 let params: TaskParams =
723 serde_json::from_value(args.clone()).context("Invalid task parameters")?;
724
725 if params.background {
726 let task_id = Arc::clone(&self.executor).execute_background(
727 params,
728 ctx.agent_event_tx.clone(),
729 ctx.session_id.clone(),
730 );
731 return Ok(ToolOutput::success(format!(
732 "Task started in background. Task ID: {}",
733 task_id
734 )));
735 }
736
737 let result = self
738 .executor
739 .execute(
740 params,
741 ctx.agent_event_tx.clone(),
742 ctx.session_id.as_deref(),
743 )
744 .await?;
745 let (content, truncated) = format_task_result_for_context(&result);
746 let metadata = serde_json::json!({
747 "task_id": result.task_id,
748 "session_id": result.session_id,
749 "agent": result.agent,
750 "success": result.success,
751 "output_bytes": result.output.len(),
752 "truncated_for_context": truncated,
753 "artifact_id": task_artifact_id(&result),
754 "artifact_uri": task_artifact_uri(&result),
755 });
756
757 if result.success {
758 Ok(ToolOutput::success(content).with_metadata(metadata))
759 } else {
760 Ok(ToolOutput::error(content).with_metadata(metadata))
761 }
762 }
763}
764
/// Arguments of the `parallel_task` tool.
///
/// Unknown JSON fields are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ParallelTaskParams {
    /// Tasks to run; each entry is parsed as a full [`TaskParams`].
    pub tasks: Vec<TaskParams>,
}
772
773pub fn parallel_task_params_schema() -> serde_json::Value {
775 serde_json::json!({
776 "type": "object",
777 "additionalProperties": false,
778 "properties": {
779 "tasks": {
780 "type": "array",
781 "description": "List of tasks to execute in parallel. Each task runs as an independent delegated child run concurrently.",
782 "items": {
783 "type": "object",
784 "additionalProperties": false,
785 "properties": {
786 "agent": {
787 "type": "string",
788 "description": "Required. Canonical agent type for this task."
789 },
790 "description": {
791 "type": "string",
792 "description": "Required. Short task label for display and tracking."
793 },
794 "prompt": {
795 "type": "string",
796 "description": "Required. Detailed instruction for the delegated child run."
797 }
798 },
799 "required": ["agent", "description", "prompt"]
800 },
801 "minItems": 1
802 }
803 },
804 "required": ["tasks"],
805 "examples": [
806 {
807 "tasks": [
808 {
809 "agent": "explore",
810 "description": "Find Rust files",
811 "prompt": "List Rust files under src/."
812 },
813 {
814 "agent": "explore",
815 "description": "Find tests",
816 "prompt": "List test files and summarize their purpose."
817 }
818 ]
819 }
820 ]
821 })
822}
823
/// Tool named `parallel_task` that delegates several tasks at once and waits for all of them.
pub struct ParallelTaskTool {
    /// Executor whose `execute_parallel` runs the delegated tasks.
    executor: Arc<TaskExecutor>,
}
830
831impl ParallelTaskTool {
832 pub fn new(executor: Arc<TaskExecutor>) -> Self {
834 Self { executor }
835 }
836}
837
838#[async_trait]
839impl Tool for ParallelTaskTool {
840 fn name(&self) -> &str {
841 "parallel_task"
842 }
843
844 fn description(&self) -> &str {
845 "Execute multiple delegated child runs in parallel. All tasks run concurrently and results are returned when all complete. Built-in agents: explore (read-only codebase search), general/general-purpose (full access multi-step), plan (read-only planning), verification (adversarial validation), review (code review). Custom agents from agent_dirs and .a3s/agents are also available; .claude/agents is read for compatibility."
846 }
847
848 fn parameters(&self) -> serde_json::Value {
849 parallel_task_params_schema()
850 }
851
852 async fn execute(&self, args: &serde_json::Value, ctx: &ToolContext) -> Result<ToolOutput> {
853 let params: ParallelTaskParams =
854 serde_json::from_value(args.clone()).context("Invalid parallel task parameters")?;
855
856 if params.tasks.is_empty() {
857 return Ok(ToolOutput::error("No tasks provided".to_string()));
858 }
859
860 let task_count = params.tasks.len();
861
862 let results = self
863 .executor
864 .execute_parallel(
865 params.tasks,
866 ctx.agent_event_tx.clone(),
867 ctx.session_id.as_deref(),
868 )
869 .await;
870
871 let mut output = format!("Executed {} tasks in parallel:\n\n", task_count);
873 let mut metadata_results = Vec::new();
874 for (i, result) in results.iter().enumerate() {
875 let status = if result.success { "[OK]" } else { "[ERR]" };
876 let (formatted, truncated) = format_task_result_for_context(result);
877 metadata_results.push(serde_json::json!({
878 "task_id": result.task_id,
879 "session_id": result.session_id,
880 "agent": result.agent,
881 "success": result.success,
882 "output": formatted.clone(),
883 "output_bytes": result.output.len(),
884 "truncated_for_context": truncated,
885 "artifact_id": task_artifact_id(result),
886 "artifact_uri": task_artifact_uri(result),
887 }));
888 output.push_str(&format!(
889 "--- Task {} ({}) {} ---\n{}\n\n",
890 i + 1,
891 result.agent,
892 status,
893 formatted
894 ));
895 }
896
897 let all_success = results.iter().all(|result| result.success);
898 let output = if all_success {
899 ToolOutput::success(output)
900 } else {
901 ToolOutput::error(output)
902 };
903
904 Ok(output.with_metadata(serde_json::json!({
905 "task_count": task_count,
906 "results": metadata_results,
907 })))
908 }
909}
910
911#[cfg(test)]
912mod tests {
913 use super::*;
914
915 #[test]
916 fn test_task_params_deserialize() {
917 let json = r#"{
918 "agent": "explore",
919 "description": "Find auth code",
920 "prompt": "Search for authentication files"
921 }"#;
922
923 let params: TaskParams = serde_json::from_str(json).unwrap();
924 assert_eq!(params.agent, "explore");
925 assert_eq!(params.description, "Find auth code");
926 assert!(!params.background);
927 }
928
929 #[test]
930 fn test_task_params_with_background() {
931 let json = r#"{
932 "agent": "general",
933 "description": "Long task",
934 "prompt": "Do something complex",
935 "background": true
936 }"#;
937
938 let params: TaskParams = serde_json::from_str(json).unwrap();
939 assert!(params.background);
940 }
941
942 #[test]
943 fn test_task_params_with_max_steps() {
944 let json = r#"{
945 "agent": "plan",
946 "description": "Planning task",
947 "prompt": "Create a plan",
948 "max_steps": 10
949 }"#;
950
951 let params: TaskParams = serde_json::from_str(json).unwrap();
952 assert_eq!(params.agent, "plan");
953 assert_eq!(params.max_steps, Some(10));
954 assert!(!params.background);
955 }
956
957 #[test]
958 fn test_task_params_all_fields() {
959 let json = r#"{
960 "agent": "general",
961 "description": "Complex task",
962 "prompt": "Do everything",
963 "background": true,
964 "max_steps": 20
965 }"#;
966
967 let params: TaskParams = serde_json::from_str(json).unwrap();
968 assert_eq!(params.agent, "general");
969 assert_eq!(params.description, "Complex task");
970 assert_eq!(params.prompt, "Do everything");
971 assert!(params.background);
972 assert_eq!(params.max_steps, Some(20));
973 }
974
975 #[test]
976 fn test_task_params_missing_required_field() {
977 let json = r#"{
978 "agent": "explore",
979 "description": "Missing prompt"
980 }"#;
981
982 let result: Result<TaskParams, _> = serde_json::from_str(json);
983 assert!(result.is_err());
984 }
985
986 #[test]
987 fn test_task_params_serialize() {
988 let params = TaskParams {
989 agent: "explore".to_string(),
990 description: "Test task".to_string(),
991 prompt: "Test prompt".to_string(),
992 background: false,
993 max_steps: Some(5),
994 };
995
996 let json = serde_json::to_string(¶ms).unwrap();
997 assert!(json.contains("explore"));
998 assert!(json.contains("Test task"));
999 assert!(json.contains("Test prompt"));
1000 }
1001
1002 #[test]
1003 fn test_task_params_clone() {
1004 let params = TaskParams {
1005 agent: "explore".to_string(),
1006 description: "Test".to_string(),
1007 prompt: "Prompt".to_string(),
1008 background: true,
1009 max_steps: None,
1010 };
1011
1012 let cloned = params.clone();
1013 assert_eq!(params.agent, cloned.agent);
1014 assert_eq!(params.description, cloned.description);
1015 assert_eq!(params.background, cloned.background);
1016 }
1017
1018 #[test]
1019 fn test_task_result_serialize() {
1020 let result = TaskResult {
1021 output: "Found 5 files".to_string(),
1022 session_id: "session-123".to_string(),
1023 agent: "explore".to_string(),
1024 success: true,
1025 task_id: "task-456".to_string(),
1026 };
1027
1028 let json = serde_json::to_string(&result).unwrap();
1029 assert!(json.contains("Found 5 files"));
1030 assert!(json.contains("explore"));
1031 }
1032
1033 #[test]
1034 fn test_task_result_deserialize() {
1035 let json = r#"{
1036 "output": "Task completed",
1037 "session_id": "sess-789",
1038 "agent": "general",
1039 "success": false,
1040 "task_id": "task-123"
1041 }"#;
1042
1043 let result: TaskResult = serde_json::from_str(json).unwrap();
1044 assert_eq!(result.output, "Task completed");
1045 assert_eq!(result.session_id, "sess-789");
1046 assert_eq!(result.agent, "general");
1047 assert!(!result.success);
1048 assert_eq!(result.task_id, "task-123");
1049 }
1050
1051 #[test]
1052 fn test_task_result_clone() {
1053 let result = TaskResult {
1054 output: "Output".to_string(),
1055 session_id: "session-1".to_string(),
1056 agent: "explore".to_string(),
1057 success: true,
1058 task_id: "task-1".to_string(),
1059 };
1060
1061 let cloned = result.clone();
1062 assert_eq!(result.output, cloned.output);
1063 assert_eq!(result.success, cloned.success);
1064 }
1065
1066 #[test]
1067 fn test_compact_task_output_preserves_small_output() {
1068 let (output, truncated) = compact_task_output("short result");
1069 assert_eq!(output, "short result");
1070 assert!(!truncated);
1071 }
1072
1073 #[test]
1074 fn test_format_task_result_for_context_truncates_large_output() {
1075 let result = TaskResult {
1076 output: format!("{}TAIL", "x".repeat(TASK_OUTPUT_CONTEXT_LIMIT + 500)),
1077 session_id: "session-1".to_string(),
1078 agent: "explore".to_string(),
1079 success: true,
1080 task_id: "task-1".to_string(),
1081 };
1082
1083 let (formatted, truncated) = format_task_result_for_context(&result);
1084 assert!(truncated);
1085 assert!(formatted.contains("Output excerpt"));
1086 assert!(formatted.contains("bytes omitted"));
1087 assert!(formatted.contains("Artifact ID: task-output:task-1"));
1088 assert!(formatted.contains("Artifact URI: a3s://tasks/session-1/runs/task-1/output"));
1089 assert!(formatted.contains("TAIL"));
1090 assert!(formatted.len() < result.output.len());
1091 }
1092
1093 #[test]
1094 fn test_task_artifact_reference_is_stable() {
1095 let result = TaskResult {
1096 output: "done".to_string(),
1097 session_id: "session-1".to_string(),
1098 agent: "explore".to_string(),
1099 success: true,
1100 task_id: "task-1".to_string(),
1101 };
1102
1103 assert_eq!(task_artifact_id(&result), "task-output:task-1");
1104 assert_eq!(
1105 task_artifact_uri(&result),
1106 "a3s://tasks/session-1/runs/task-1/output"
1107 );
1108
1109 let (formatted, truncated) = format_task_result_for_context(&result);
1110 assert!(!truncated);
1111 assert!(formatted.contains("Artifact URI: a3s://tasks/session-1/runs/task-1/output"));
1112 }
1113
1114 #[test]
1115 fn test_task_params_schema() {
1116 let schema = task_params_schema();
1117 assert_eq!(schema["type"], "object");
1118 assert_eq!(schema["additionalProperties"], false);
1119 assert!(schema["properties"]["agent"].is_object());
1120 assert!(schema["properties"]["prompt"].is_object());
1121 }
1122
1123 #[test]
1124 fn test_task_params_schema_required_fields() {
1125 let schema = task_params_schema();
1126 let required = schema["required"].as_array().unwrap();
1127 assert!(required.contains(&serde_json::json!("agent")));
1128 assert!(required.contains(&serde_json::json!("description")));
1129 assert!(required.contains(&serde_json::json!("prompt")));
1130 }
1131
1132 #[test]
1133 fn test_task_params_schema_properties() {
1134 let schema = task_params_schema();
1135 let props = &schema["properties"];
1136
1137 assert_eq!(props["agent"]["type"], "string");
1138 assert_eq!(props["description"]["type"], "string");
1139 assert_eq!(props["prompt"]["type"], "string");
1140 assert_eq!(props["background"]["type"], "boolean");
1141 assert_eq!(props["background"]["default"], false);
1142 assert_eq!(props["max_steps"]["type"], "integer");
1143 }
1144
1145 #[test]
1146 fn test_task_params_schema_descriptions() {
1147 let schema = task_params_schema();
1148 let props = &schema["properties"];
1149
1150 assert!(props["agent"]["description"].is_string());
1151 assert!(props["description"]["description"].is_string());
1152 assert!(props["prompt"]["description"].is_string());
1153 assert!(props["background"]["description"].is_string());
1154 assert!(props["max_steps"]["description"].is_string());
1155 }
1156
1157 #[test]
1158 fn test_task_params_default_background() {
1159 let params = TaskParams {
1160 agent: "explore".to_string(),
1161 description: "Test".to_string(),
1162 prompt: "Test prompt".to_string(),
1163 background: false,
1164 max_steps: None,
1165 };
1166 assert!(!params.background);
1167 }
1168
1169 #[test]
1170 fn test_task_params_serialize_skip_none() {
1171 let params = TaskParams {
1172 agent: "explore".to_string(),
1173 description: "Test".to_string(),
1174 prompt: "Test prompt".to_string(),
1175 background: false,
1176 max_steps: None,
1177 };
1178 let json = serde_json::to_string(¶ms).unwrap();
1179 assert!(!json.contains("max_steps"));
1181 }
1182
1183 #[test]
1184 fn test_task_params_serialize_with_max_steps() {
1185 let params = TaskParams {
1186 agent: "explore".to_string(),
1187 description: "Test".to_string(),
1188 prompt: "Test prompt".to_string(),
1189 background: false,
1190 max_steps: Some(15),
1191 };
1192 let json = serde_json::to_string(¶ms).unwrap();
1193 assert!(json.contains("max_steps"));
1194 assert!(json.contains("15"));
1195 }
1196
1197 #[test]
1198 fn test_task_result_success_true() {
1199 let result = TaskResult {
1200 output: "Success".to_string(),
1201 session_id: "sess-1".to_string(),
1202 agent: "explore".to_string(),
1203 success: true,
1204 task_id: "task-1".to_string(),
1205 };
1206 assert!(result.success);
1207 }
1208
1209 #[test]
1210 fn test_task_result_success_false() {
1211 let result = TaskResult {
1212 output: "Failed".to_string(),
1213 session_id: "sess-1".to_string(),
1214 agent: "explore".to_string(),
1215 success: false,
1216 task_id: "task-1".to_string(),
1217 };
1218 assert!(!result.success);
1219 }
1220
/// Empty `agent`, `description` and `prompt` strings survive a JSON round trip.
#[test]
fn test_task_params_empty_strings() {
    let params = TaskParams {
        agent: "".to_string(),
        description: "".to_string(),
        prompt: "".to_string(),
        background: false,
        max_steps: None,
    };
    let json = serde_json::to_string(&params).unwrap();
    let deserialized: TaskParams = serde_json::from_str(&json).unwrap();
    assert_eq!(deserialized.agent, "");
    assert_eq!(deserialized.description, "");
    assert_eq!(deserialized.prompt, "");
}
1236
/// A `TaskResult` may carry an empty `output` string.
#[test]
fn test_task_result_empty_output() {
    let empty = TaskResult {
        output: String::new(),
        session_id: String::from("sess-1"),
        agent: String::from("explore"),
        success: true,
        task_id: String::from("task-1"),
    };
    assert!(empty.output.is_empty());
}
1248
/// The `Debug` rendering of `TaskParams` includes the agent and description text.
#[test]
fn test_task_params_debug_format() {
    let subject = TaskParams {
        agent: String::from("explore"),
        description: String::from("Test"),
        prompt: String::from("Test prompt"),
        background: false,
        max_steps: None,
    };
    let rendered = format!("{subject:?}");
    for needle in ["explore", "Test"] {
        assert!(rendered.contains(needle));
    }
}
1262
/// The `Debug` rendering of `TaskResult` includes the output and agent text.
#[test]
fn test_task_result_debug_format() {
    let subject = TaskResult {
        output: String::from("Output"),
        session_id: String::from("sess-1"),
        agent: String::from("explore"),
        success: true,
        task_id: String::from("task-1"),
    };
    let rendered = format!("{subject:?}");
    for needle in ["Output", "explore"] {
        assert!(rendered.contains(needle));
    }
}
1276
/// Every `TaskParams` field is preserved by serialize-then-deserialize.
#[test]
fn test_task_params_roundtrip() {
    let original = TaskParams {
        agent: String::from("general"),
        description: String::from("Roundtrip test"),
        prompt: String::from("Test roundtrip serialization"),
        background: true,
        max_steps: Some(42),
    };

    let wire = serde_json::to_string(&original).unwrap();
    let restored: TaskParams = serde_json::from_str(&wire).unwrap();

    assert_eq!(original.agent, restored.agent);
    assert_eq!(original.description, restored.description);
    assert_eq!(original.prompt, restored.prompt);
    assert_eq!(original.background, restored.background);
    assert_eq!(original.max_steps, restored.max_steps);
}
1294
/// Every `TaskResult` field is preserved by serialize-then-deserialize.
#[test]
fn test_task_result_roundtrip() {
    let original = TaskResult {
        output: String::from("Roundtrip output"),
        session_id: String::from("sess-roundtrip"),
        agent: String::from("plan"),
        success: false,
        task_id: String::from("task-roundtrip"),
    };

    let wire = serde_json::to_string(&original).unwrap();
    let restored: TaskResult = serde_json::from_str(&wire).unwrap();

    assert_eq!(original.output, restored.output);
    assert_eq!(original.session_id, restored.session_id);
    assert_eq!(original.agent, restored.agent);
    assert_eq!(original.success, restored.success);
    assert_eq!(original.task_id, restored.task_id);
}
1312
/// A two-element `tasks` array deserializes with both entries in input order.
#[test]
fn test_parallel_task_params_deserialize() {
    let payload = r#"{
        "tasks": [
            { "agent": "explore", "description": "Find auth", "prompt": "Search auth files" },
            { "agent": "general", "description": "Fix bug", "prompt": "Fix the login bug" }
        ]
    }"#;

    let parsed: ParallelTaskParams = serde_json::from_str(payload).unwrap();
    let tasks = &parsed.tasks;
    assert_eq!(tasks.len(), 2);
    assert_eq!(tasks[0].agent, "explore");
    assert_eq!(tasks[1].agent, "general");
}
1327
/// A `tasks` array holding a single entry deserializes to one task.
#[test]
fn test_parallel_task_params_single_task() {
    let payload = r#"{
        "tasks": [
            { "agent": "plan", "description": "Plan work", "prompt": "Create a plan" }
        ]
    }"#;

    let parsed = serde_json::from_str::<ParallelTaskParams>(payload).unwrap();
    assert_eq!(parsed.tasks.len(), 1);
}
1339
/// An empty `tasks` array is accepted by deserialization and yields no tasks.
#[test]
fn test_parallel_task_params_empty_tasks() {
    let parsed = serde_json::from_str::<ParallelTaskParams>(r#"{ "tasks": [] }"#).unwrap();
    assert!(parsed.tasks.is_empty());
}
1346
/// An object without the `tasks` key fails to deserialize.
#[test]
fn test_parallel_task_params_missing_tasks() {
    let outcome = serde_json::from_str::<ParallelTaskParams>(r#"{}"#);
    assert!(outcome.is_err());
}
1353
/// Serializing `ParallelTaskParams` emits the agent and prompt of every task.
#[test]
fn test_parallel_task_params_serialize() {
    let params = ParallelTaskParams {
        tasks: vec![
            TaskParams {
                agent: "explore".to_string(),
                description: "Task 1".to_string(),
                prompt: "Prompt 1".to_string(),
                background: false,
                max_steps: None,
            },
            TaskParams {
                agent: "general".to_string(),
                description: "Task 2".to_string(),
                prompt: "Prompt 2".to_string(),
                background: false,
                max_steps: Some(10),
            },
        ],
    };
    let json = serde_json::to_string(&params).unwrap();
    assert!(json.contains("explore"));
    assert!(json.contains("general"));
    assert!(json.contains("Prompt 1"));
    assert!(json.contains("Prompt 2"));
}
1380
/// Task count, agents and the second task's `max_steps` survive a JSON round trip.
#[test]
fn test_parallel_task_params_roundtrip() {
    let explore = TaskParams {
        agent: String::from("explore"),
        description: String::from("Explore"),
        prompt: String::from("Find files"),
        background: false,
        max_steps: None,
    };
    let plan = TaskParams {
        agent: String::from("plan"),
        description: String::from("Plan"),
        prompt: String::from("Make plan"),
        background: false,
        max_steps: Some(5),
    };
    let original = ParallelTaskParams {
        tasks: vec![explore, plan],
    };

    let wire = serde_json::to_string(&original).unwrap();
    let restored: ParallelTaskParams = serde_json::from_str(&wire).unwrap();

    let (before, after) = (&original.tasks, &restored.tasks);
    assert_eq!(before.len(), after.len());
    assert_eq!(before[0].agent, after[0].agent);
    assert_eq!(before[1].agent, after[1].agent);
    assert_eq!(before[1].max_steps, after[1].max_steps);
}
1408
/// Cloning `ParallelTaskParams` keeps the task count and the first task's agent.
#[test]
fn test_parallel_task_params_clone() {
    let only_task = TaskParams {
        agent: String::from("explore"),
        description: String::from("Test"),
        prompt: String::from("Prompt"),
        background: false,
        max_steps: None,
    };
    let source = ParallelTaskParams {
        tasks: vec![only_task],
    };

    let copy = source.clone();

    assert_eq!(source.tasks.len(), copy.tasks.len());
    assert_eq!(source.tasks[0].agent, copy.tasks[0].agent);
}
1424
/// The parallel-task schema is a closed object whose `tasks` property is an
/// array with `minItems` of 1.
#[test]
fn test_parallel_task_params_schema() {
    let schema = parallel_task_params_schema();
    assert_eq!(schema["type"], "object");
    assert_eq!(schema["additionalProperties"], false);

    let tasks = &schema["properties"]["tasks"];
    assert!(tasks.is_object());
    assert_eq!(tasks["type"], "array");
    assert_eq!(tasks["minItems"], 1);
}
1434
/// The parallel-task schema lists `tasks` among its required properties.
#[test]
fn test_parallel_task_params_schema_required() {
    let schema = parallel_task_params_schema();
    let required = schema["required"].as_array().unwrap();
    let lists_tasks = required.iter().any(|entry| entry.as_str() == Some("tasks"));
    assert!(lists_tasks);
}
1441
/// Each item of the `tasks` array is a closed object requiring `agent`,
/// `description` and `prompt`.
#[test]
fn test_parallel_task_params_schema_items() {
    let schema = parallel_task_params_schema();
    let item_schema = &schema["properties"]["tasks"]["items"];
    assert_eq!(item_schema["type"], "object");
    assert_eq!(item_schema["additionalProperties"], false);

    let item_required = item_schema["required"].as_array().unwrap();
    for field in ["agent", "description", "prompt"] {
        assert!(item_required.contains(&serde_json::json!(field)));
    }
}
1453
/// The first single-task example uses the `explore` agent and has no `task`
/// key; the first parallel example carries a non-empty `tasks` array.
#[test]
fn test_task_schema_examples_use_delegation_core() {
    let single_schema = task_params_schema();
    let first_single = &single_schema["examples"].as_array().unwrap()[0];
    assert_eq!(first_single["agent"], "explore");
    assert!(first_single.get("task").is_none());

    let parallel_schema = parallel_task_params_schema();
    let first_parallel = &parallel_schema["examples"].as_array().unwrap()[0];
    let example_tasks = first_parallel["tasks"].as_array().unwrap();
    assert!(!example_tasks.is_empty());
}
1465
/// The `Debug` rendering of `ParallelTaskParams` includes nested task fields.
#[test]
fn test_parallel_task_params_debug() {
    let only_task = TaskParams {
        agent: String::from("explore"),
        description: String::from("Debug test"),
        prompt: String::from("Test"),
        background: false,
        max_steps: None,
    };
    let subject = ParallelTaskParams {
        tasks: vec![only_task],
    };

    let rendered = format!("{subject:?}");
    for needle in ["explore", "Debug test"] {
        assert!(rendered.contains(needle));
    }
}
1481
/// A batch of 150 tasks round-trips through JSON with its length and the
/// first and last descriptions intact.
#[test]
fn test_parallel_task_params_large_count() {
    let tasks: Vec<TaskParams> = (0..150)
        .map(|i| TaskParams {
            agent: "explore".to_string(),
            description: format!("Task {}", i),
            prompt: format!("Prompt for task {}", i),
            background: false,
            max_steps: Some(10),
        })
        .collect();

    let params = ParallelTaskParams { tasks };
    let json = serde_json::to_string(&params).unwrap();
    let deserialized: ParallelTaskParams = serde_json::from_str(&json).unwrap();
    assert_eq!(deserialized.tasks.len(), 150);
    assert_eq!(deserialized.tasks[0].description, "Task 0");
    assert_eq!(deserialized.tasks[149].description, "Task 149");
}
1502
/// `max_steps: Some(0)` is kept as `Some(0)` across a JSON round trip rather
/// than collapsing to `None`.
#[test]
fn test_task_params_max_steps_zero() {
    let params = TaskParams {
        agent: "explore".to_string(),
        description: "Edge case".to_string(),
        prompt: "Zero steps".to_string(),
        background: false,
        max_steps: Some(0),
    };
    let json = serde_json::to_string(&params).unwrap();
    let deserialized: TaskParams = serde_json::from_str(&json).unwrap();
    assert_eq!(deserialized.max_steps, Some(0));
}
1517
/// Every task in a batch built with `background: true` keeps that flag.
#[test]
fn test_parallel_task_params_all_background() {
    let tasks: Vec<TaskParams> = (0..5)
        .map(|i| TaskParams {
            agent: "general".to_string(),
            description: format!("BG task {}", i),
            prompt: "Run in background".to_string(),
            background: true,
            max_steps: None,
        })
        .collect();
    let params = ParallelTaskParams { tasks };
    for task in &params.tasks {
        assert!(task.background);
    }
}
1534
/// A payload carrying the unknown `permissive` key fails to deserialize into
/// `TaskParams` (the struct is declared with `deny_unknown_fields`).
#[test]
fn test_task_params_rejects_permissive_field() {
    let payload = r#"{
        "agent": "general",
        "description": "Legacy field rejection",
        "prompt": "Verify legacy fields are rejected",
        "permissive": true
    }"#;

    let outcome = serde_json::from_str::<TaskParams>(payload);
    assert!(outcome.is_err());
}
1547
/// The task schema's `properties` has no `permissive` entry.
#[test]
fn test_task_params_schema_hides_permissive_field() {
    let schema = task_params_schema();
    let permissive = schema["properties"].get("permissive");
    assert!(permissive.is_none());
}
1555
1556 use crate::agent::tests::MockLlmClient;
1561 use crate::llm::{ContentBlock, LlmResponse, Message, StreamEvent, TokenUsage, ToolDefinition};
1562 use crate::permissions::PermissionPolicy;
1563 use crate::subagent::AgentRegistry;
1564 use std::sync::atomic::{AtomicUsize, Ordering};
1565 use std::time::Duration;
1566 use tokio::sync::{mpsc, Barrier};
1567
1568 fn text_response(text: impl Into<String>) -> LlmResponse {
1569 LlmResponse {
1570 message: Message {
1571 role: "assistant".to_string(),
1572 content: vec![ContentBlock::Text { text: text.into() }],
1573 reasoning_content: None,
1574 },
1575 usage: TokenUsage {
1576 prompt_tokens: 10,
1577 completion_tokens: 5,
1578 total_tokens: 15,
1579 cache_read_tokens: None,
1580 cache_write_tokens: None,
1581 },
1582 stop_reason: Some("end_turn".to_string()),
1583 meta: None,
1584 }
1585 }
1586
1587 fn pre_analysis_response(messages: &[Message]) -> LlmResponse {
1588 let prompt = last_text(messages);
1589 let response = serde_json::json!({
1590 "intent": "GeneralPurpose",
1591 "requires_planning": false,
1592 "goal": {
1593 "description": prompt,
1594 "success_criteria": []
1595 },
1596 "execution_plan": {
1597 "complexity": "Simple",
1598 "steps": [{
1599 "id": "step-1",
1600 "description": prompt,
1601 "tool": null,
1602 "dependencies": [],
1603 "success_criteria": "Complete the request"
1604 }],
1605 "required_tools": []
1606 },
1607 "optimized_input": prompt
1608 });
1609 text_response(response.to_string())
1610 }
1611
1612 fn last_text(messages: &[Message]) -> String {
1613 messages
1614 .last()
1615 .and_then(|message| {
1616 message.content.iter().find_map(|block| {
1617 if let ContentBlock::Text { text } = block {
1618 Some(text.clone())
1619 } else {
1620 None
1621 }
1622 })
1623 })
1624 .unwrap_or_default()
1625 }
1626
1627 struct SchemaCoercionClient;
1632
1633 #[async_trait::async_trait]
1634 impl LlmClient for SchemaCoercionClient {
1635 async fn complete(
1636 &self,
1637 messages: &[Message],
1638 system: Option<&str>,
1639 tools: &[ToolDefinition],
1640 ) -> Result<LlmResponse> {
1641 if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1642 return Ok(pre_analysis_response(messages));
1643 }
1644 if tools.iter().any(|t| t.name == "emit_step_output") {
1647 return Ok(MockLlmClient::tool_call_response(
1648 "coerce-1",
1649 "emit_step_output",
1650 serde_json::json!({ "verdict": "ok" }),
1651 ));
1652 }
1653 Ok(text_response("The verdict is ok."))
1654 }
1655
1656 async fn complete_streaming(
1657 &self,
1658 _messages: &[Message],
1659 _system: Option<&str>,
1660 _tools: &[ToolDefinition],
1661 _cancel_token: tokio_util::sync::CancellationToken,
1662 ) -> Result<mpsc::Receiver<StreamEvent>> {
1663 anyhow::bail!("streaming is not used by schema coercion tests")
1664 }
1665 }
1666
1667 fn verdict_schema() -> serde_json::Value {
1668 serde_json::json!({
1669 "type": "object",
1670 "properties": { "verdict": { "type": "string" } },
1671 "required": ["verdict"]
1672 })
1673 }
1674
1675 #[tokio::test]
1676 async fn execute_step_with_schema_coerces_structured_output() {
1677 let workspace = tempfile::tempdir().unwrap();
1678 let executor = TaskExecutor::new(
1679 Arc::new(AgentRegistry::new()),
1680 Arc::new(SchemaCoercionClient),
1681 workspace.path().to_string_lossy().to_string(),
1682 );
1683 let spec = AgentStepSpec::new("step-1", "general", "assess", "Assess the thing.")
1684 .with_output_schema(verdict_schema());
1685
1686 let outcome = executor.execute_step(spec, None).await;
1687
1688 assert!(outcome.success, "step should succeed: {}", outcome.output);
1689 assert_eq!(
1690 outcome.structured,
1691 Some(serde_json::json!({ "verdict": "ok" })),
1692 "a schema'd step returns the validated object in `structured`"
1693 );
1694 }
1695
1696 #[tokio::test]
1697 async fn execute_step_without_schema_has_no_structured_output() {
1698 let workspace = tempfile::tempdir().unwrap();
1699 let executor = TaskExecutor::new(
1700 Arc::new(AgentRegistry::new()),
1701 Arc::new(SchemaCoercionClient),
1702 workspace.path().to_string_lossy().to_string(),
1703 );
1704 let spec = AgentStepSpec::new("step-2", "general", "assess", "Assess the thing.");
1705
1706 let outcome = executor.execute_step(spec, None).await;
1707
1708 assert!(outcome.success, "step should succeed: {}", outcome.output);
1709 assert_eq!(
1710 outcome.structured, None,
1711 "no schema requested → no structured output, no coercion call"
1712 );
1713 }
1714
1715 struct SchemaFailClient;
1719
1720 #[async_trait::async_trait]
1721 impl LlmClient for SchemaFailClient {
1722 async fn complete(
1723 &self,
1724 messages: &[Message],
1725 system: Option<&str>,
1726 tools: &[ToolDefinition],
1727 ) -> Result<LlmResponse> {
1728 if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1729 return Ok(pre_analysis_response(messages));
1730 }
1731 if tools.iter().any(|t| t.name == "emit_step_output") {
1732 return Ok(MockLlmClient::tool_call_response(
1734 "coerce-fail",
1735 "emit_step_output",
1736 serde_json::json!({}),
1737 ));
1738 }
1739 Ok(text_response("some answer"))
1740 }
1741
1742 async fn complete_streaming(
1743 &self,
1744 _messages: &[Message],
1745 _system: Option<&str>,
1746 _tools: &[ToolDefinition],
1747 _cancel_token: tokio_util::sync::CancellationToken,
1748 ) -> Result<mpsc::Receiver<StreamEvent>> {
1749 anyhow::bail!("streaming unused")
1750 }
1751 }
1752
1753 #[tokio::test]
1754 async fn execute_step_with_schema_demotes_step_on_coercion_failure() {
1755 let workspace = tempfile::tempdir().unwrap();
1756 let executor = TaskExecutor::new(
1757 Arc::new(AgentRegistry::new()),
1758 Arc::new(SchemaFailClient),
1759 workspace.path().to_string_lossy().to_string(),
1760 );
1761 let spec = AgentStepSpec::new("step-x", "general", "assess", "Assess the thing.")
1762 .with_output_schema(verdict_schema());
1763
1764 let outcome = executor.execute_step(spec, None).await;
1765
1766 assert!(
1767 !outcome.success,
1768 "a step whose output can't satisfy the schema is demoted to failure"
1769 );
1770 assert_eq!(outcome.structured, None, "no validated object on failure");
1771 assert!(
1772 outcome.output.contains("[structured output failed"),
1773 "the demotion marker is appended: {}",
1774 outcome.output
1775 );
1776 }
1777
1778 #[tokio::test]
1779 async fn parallel_isolates_schema_coercion_failure_from_sibling() {
1780 let workspace = tempfile::tempdir().unwrap();
1781 let executor: Arc<dyn AgentExecutor> = Arc::new(TaskExecutor::new(
1782 Arc::new(AgentRegistry::new()),
1783 Arc::new(SchemaFailClient),
1784 workspace.path().to_string_lossy().to_string(),
1785 ));
1786 let specs = vec![
1789 AgentStepSpec::new("plain", "general", "d", "p"),
1790 AgentStepSpec::new("schemad", "general", "d", "p").with_output_schema(verdict_schema()),
1791 ];
1792 let out = crate::orchestration::execute_steps_parallel(executor, specs, None).await;
1793
1794 assert_eq!(out.len(), 2);
1795 assert_eq!(out[0].task_id, "plain");
1796 assert!(out[0].success, "no-schema sibling unaffected");
1797 assert_eq!(out[0].structured, None);
1798 assert_eq!(out[1].task_id, "schemad");
1799 assert!(!out[1].success, "schema-failing step surfaces as failure");
1800 assert_eq!(out[1].structured, None);
1801 assert!(out[1].output.contains("[structured output failed"));
1802 }
1803
1804 #[tokio::test]
1805 async fn failed_step_with_schema_skips_coercion() {
1806 let workspace = tempfile::tempdir().unwrap();
1807 let executor = TaskExecutor::new(
1808 Arc::new(AgentRegistry::new()),
1809 Arc::new(SchemaCoercionClient),
1810 workspace.path().to_string_lossy().to_string(),
1811 );
1812 let spec = AgentStepSpec::new("step-y", "no-such-agent", "d", "p")
1815 .with_output_schema(verdict_schema());
1816
1817 let outcome = executor.execute_step(spec, None).await;
1818
1819 assert!(!outcome.success);
1820 assert_eq!(outcome.structured, None);
1821 assert!(
1822 !outcome.output.contains("[structured output failed"),
1823 "coercion never ran — failure is the run error, not a coercion failure: {}",
1824 outcome.output
1825 );
1826 }
1827
1828 struct StaticLlmClient {
1829 text: String,
1830 }
1831
1832 impl StaticLlmClient {
1833 fn new(text: impl Into<String>) -> Self {
1834 Self { text: text.into() }
1835 }
1836 }
1837
1838 #[async_trait::async_trait]
1839 impl LlmClient for StaticLlmClient {
1840 async fn complete(
1841 &self,
1842 messages: &[Message],
1843 system: Option<&str>,
1844 _tools: &[ToolDefinition],
1845 ) -> Result<LlmResponse> {
1846 if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1847 return Ok(pre_analysis_response(messages));
1848 }
1849 Ok(text_response(self.text.clone()))
1850 }
1851
1852 async fn complete_streaming(
1853 &self,
1854 _messages: &[Message],
1855 _system: Option<&str>,
1856 _tools: &[ToolDefinition],
1857 _cancel_token: tokio_util::sync::CancellationToken,
1858 ) -> Result<mpsc::Receiver<StreamEvent>> {
1859 anyhow::bail!("streaming is not used by task executor tests")
1860 }
1861 }
1862
1863 struct ConcurrentLlmClient {
1864 barrier: Arc<Barrier>,
1865 active: AtomicUsize,
1866 max_active: AtomicUsize,
1867 }
1868
1869 impl ConcurrentLlmClient {
1870 fn new(task_count: usize) -> Self {
1871 Self {
1872 barrier: Arc::new(Barrier::new(task_count)),
1873 active: AtomicUsize::new(0),
1874 max_active: AtomicUsize::new(0),
1875 }
1876 }
1877
1878 fn max_active(&self) -> usize {
1879 self.max_active.load(Ordering::SeqCst)
1880 }
1881
1882 fn record_active(&self) {
1883 let active = self.active.fetch_add(1, Ordering::SeqCst) + 1;
1884 let mut observed = self.max_active.load(Ordering::SeqCst);
1885 while active > observed {
1886 match self.max_active.compare_exchange(
1887 observed,
1888 active,
1889 Ordering::SeqCst,
1890 Ordering::SeqCst,
1891 ) {
1892 Ok(_) => break,
1893 Err(next) => observed = next,
1894 }
1895 }
1896 }
1897 }
1898
1899 struct LimitedConcurrencyLlmClient {
1900 active: AtomicUsize,
1901 max_active: AtomicUsize,
1902 }
1903
1904 impl LimitedConcurrencyLlmClient {
1905 fn new() -> Self {
1906 Self {
1907 active: AtomicUsize::new(0),
1908 max_active: AtomicUsize::new(0),
1909 }
1910 }
1911
1912 fn max_active(&self) -> usize {
1913 self.max_active.load(Ordering::SeqCst)
1914 }
1915
1916 fn record_active(&self) {
1917 let active = self.active.fetch_add(1, Ordering::SeqCst) + 1;
1918 self.max_active.fetch_max(active, Ordering::SeqCst);
1919 }
1920 }
1921
1922 #[async_trait::async_trait]
1923 impl LlmClient for LimitedConcurrencyLlmClient {
1924 async fn complete(
1925 &self,
1926 messages: &[Message],
1927 system: Option<&str>,
1928 _tools: &[ToolDefinition],
1929 ) -> Result<LlmResponse> {
1930 if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1931 return Ok(pre_analysis_response(messages));
1932 }
1933
1934 let prompt = last_text(messages);
1935 self.record_active();
1936 tokio::time::sleep(Duration::from_millis(40)).await;
1937 self.active.fetch_sub(1, Ordering::SeqCst);
1938 Ok(text_response(format!("completed: {prompt}")))
1939 }
1940
1941 async fn complete_streaming(
1942 &self,
1943 _messages: &[Message],
1944 _system: Option<&str>,
1945 _tools: &[ToolDefinition],
1946 _cancel_token: tokio_util::sync::CancellationToken,
1947 ) -> Result<mpsc::Receiver<StreamEvent>> {
1948 anyhow::bail!("streaming is not used by task executor tests")
1949 }
1950 }
1951
1952 #[async_trait::async_trait]
1953 impl LlmClient for ConcurrentLlmClient {
1954 async fn complete(
1955 &self,
1956 messages: &[Message],
1957 system: Option<&str>,
1958 _tools: &[ToolDefinition],
1959 ) -> Result<LlmResponse> {
1960 if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1961 return Ok(pre_analysis_response(messages));
1962 }
1963
1964 let prompt = last_text(messages);
1965 self.record_active();
1966 self.barrier.wait().await;
1967 if prompt.contains("slow") {
1968 tokio::time::sleep(Duration::from_millis(120)).await;
1969 } else {
1970 tokio::time::sleep(Duration::from_millis(10)).await;
1971 }
1972 self.active.fetch_sub(1, Ordering::SeqCst);
1973 Ok(text_response(format!("completed: {prompt}")))
1974 }
1975
1976 async fn complete_streaming(
1977 &self,
1978 _messages: &[Message],
1979 _system: Option<&str>,
1980 _tools: &[ToolDefinition],
1981 _cancel_token: tokio_util::sync::CancellationToken,
1982 ) -> Result<mpsc::Receiver<StreamEvent>> {
1983 anyhow::bail!("streaming is not used by task executor tests")
1984 }
1985 }
1986
1987 fn test_registry_with_writer() -> Arc<AgentRegistry> {
1988 let registry = AgentRegistry::new();
1989 let spec = crate::subagent::WorkerAgentSpec::custom("writer", "Write files")
1990 .with_permissions(PermissionPolicy::new().allow("write(*)").allow("read(*)"))
1991 .with_prompt("Write files when asked.")
1992 .with_max_steps(3);
1993 registry.register(spec.into_agent_definition());
1994 Arc::new(registry)
1995 }
1996
1997 fn test_registry_with_text_worker() -> Arc<AgentRegistry> {
1998 let registry = AgentRegistry::new();
1999 let spec = crate::subagent::WorkerAgentSpec::custom("worker", "Text worker")
2000 .with_prompt("Return a concise result.")
2001 .with_max_steps(1);
2002 registry.register(spec.into_agent_definition());
2003 Arc::new(registry)
2004 }
2005
2006 #[tokio::test]
2007 async fn task_child_run_permission_allow() {
2008 let workspace = tempfile::tempdir().unwrap();
2009 let mock = Arc::new(MockLlmClient::new(vec![
2010 MockLlmClient::tool_call_response(
2011 "t1",
2012 "write",
2013 serde_json::json!({
2014 "file_path": workspace.path().join("out.txt").to_string_lossy(),
2015 "content": "WRITTEN"
2016 }),
2017 ),
2018 MockLlmClient::text_response("Done."),
2019 ]));
2020
2021 let executor = TaskExecutor::new(
2022 test_registry_with_writer(),
2023 mock,
2024 workspace.path().to_string_lossy().to_string(),
2025 );
2026
2027 let result = executor
2028 .execute(
2029 TaskParams {
2030 agent: "writer".to_string(),
2031 description: "Write file".to_string(),
2032 prompt: "Write out.txt".to_string(),
2033 background: false,
2034 max_steps: Some(3),
2035 },
2036 None,
2037 None,
2038 )
2039 .await
2040 .unwrap();
2041
2042 assert!(
2043 result.success,
2044 "child run should succeed: {}",
2045 result.output
2046 );
2047 assert!(
2048 !result.output.contains("Permission denied"),
2049 "no permission denial: {}",
2050 result.output
2051 );
2052 let content = std::fs::read_to_string(workspace.path().join("out.txt")).unwrap();
2053 assert_eq!(content, "WRITTEN");
2054 }
2055
2056 #[tokio::test]
2057 async fn task_child_run_permission_deny() {
2058 let workspace = tempfile::tempdir().unwrap();
2059 let registry = AgentRegistry::new();
2060 let spec = crate::subagent::WorkerAgentSpec::custom("restricted", "Restricted agent")
2061 .with_permissions(PermissionPolicy::new().allow("read(*)").deny("bash(*)"))
2062 .with_max_steps(3);
2063 registry.register(spec.into_agent_definition());
2064
2065 let mock = Arc::new(MockLlmClient::new(vec![
2066 MockLlmClient::tool_call_response(
2067 "t1",
2068 "bash",
2069 serde_json::json!({"command": "echo hello"}),
2070 ),
2071 MockLlmClient::text_response("Could not run bash."),
2072 ]));
2073
2074 let executor = TaskExecutor::new(
2075 Arc::new(registry),
2076 mock,
2077 workspace.path().to_string_lossy().to_string(),
2078 );
2079
2080 let result = executor
2081 .execute(
2082 TaskParams {
2083 agent: "restricted".to_string(),
2084 description: "Try bash".to_string(),
2085 prompt: "Run echo hello".to_string(),
2086 background: false,
2087 max_steps: Some(3),
2088 },
2089 None,
2090 None,
2091 )
2092 .await
2093 .unwrap();
2094
2095 assert!(result.success, "agent should complete: {}", result.output);
2098 }
2099
2100 #[tokio::test]
2101 async fn task_child_run_confirmation_auto_approve() {
2102 let workspace = tempfile::tempdir().unwrap();
2103 let registry = AgentRegistry::new();
2104 let spec = crate::subagent::WorkerAgentSpec::custom("reader-writer", "Read and write")
2107 .with_permissions(PermissionPolicy::new().allow("read(*)"))
2108 .with_max_steps(3);
2109 registry.register(spec.into_agent_definition());
2110
2111 let mock = Arc::new(MockLlmClient::new(vec![
2112 MockLlmClient::tool_call_response(
2113 "t1",
2114 "write",
2115 serde_json::json!({
2116 "file_path": workspace.path().join("auto.txt").to_string_lossy(),
2117 "content": "AUTO_APPROVED"
2118 }),
2119 ),
2120 MockLlmClient::text_response("Written."),
2121 ]));
2122
2123 let executor = TaskExecutor::new(
2124 Arc::new(registry),
2125 mock,
2126 workspace.path().to_string_lossy().to_string(),
2127 );
2128
2129 let result = executor
2130 .execute(
2131 TaskParams {
2132 agent: "reader-writer".to_string(),
2133 description: "Write via auto-approve".to_string(),
2134 prompt: "Write auto.txt".to_string(),
2135 background: false,
2136 max_steps: Some(3),
2137 },
2138 None,
2139 None,
2140 )
2141 .await
2142 .unwrap();
2143
2144 assert!(
2145 result.success,
2146 "Ask should be auto-approved: {}",
2147 result.output
2148 );
2149 assert!(
2150 !result.output.contains("MissingConfirmationManager"),
2151 "no MissingConfirmationManager: {}",
2152 result.output
2153 );
2154 }
2155
2156 #[tokio::test]
2157 async fn task_child_run_step_budget_enforced() {
2158 let workspace = tempfile::tempdir().unwrap();
2159 let mock = Arc::new(MockLlmClient::new(vec![
2160 MockLlmClient::tool_call_response(
2161 "t1",
2162 "read",
2163 serde_json::json!({"file_path": "/tmp/a.txt"}),
2164 ),
2165 MockLlmClient::tool_call_response(
2166 "t2",
2167 "read",
2168 serde_json::json!({"file_path": "/tmp/b.txt"}),
2169 ),
2170 MockLlmClient::tool_call_response(
2171 "t3",
2172 "read",
2173 serde_json::json!({"file_path": "/tmp/c.txt"}),
2174 ),
2175 MockLlmClient::text_response("Should not reach here."),
2176 ]));
2177
2178 let executor = TaskExecutor::new(
2179 test_registry_with_writer(),
2180 mock,
2181 workspace.path().to_string_lossy().to_string(),
2182 );
2183
2184 let result = executor
2185 .execute(
2186 TaskParams {
2187 agent: "writer".to_string(),
2188 description: "Exceed budget".to_string(),
2189 prompt: "Read many files".to_string(),
2190 background: false,
2191 max_steps: Some(2),
2192 },
2193 None,
2194 None,
2195 )
2196 .await
2197 .unwrap();
2198
2199 assert!(
2201 !result.success,
2202 "should fail when exceeding step budget: {}",
2203 result.output
2204 );
2205 assert!(
2206 result.output.contains("Max tool rounds") || result.output.contains("max tool rounds"),
2207 "error should mention tool rounds: {}",
2208 result.output
2209 );
2210 }
2211
2212 #[tokio::test]
2213 async fn parallel_task_executor_runs_children_concurrently_and_preserves_input_order() {
2214 let workspace = tempfile::tempdir().unwrap();
2215 let client = Arc::new(ConcurrentLlmClient::new(2));
2216 let executor = Arc::new(TaskExecutor::new(
2217 test_registry_with_text_worker(),
2218 client.clone(),
2219 workspace.path().to_string_lossy().to_string(),
2220 ));
2221
2222 let tasks = vec![
2223 TaskParams {
2224 agent: "worker".to_string(),
2225 description: "Slow task".to_string(),
2226 prompt: "slow branch".to_string(),
2227 background: false,
2228 max_steps: Some(1),
2229 },
2230 TaskParams {
2231 agent: "worker".to_string(),
2232 description: "Fast task".to_string(),
2233 prompt: "fast branch".to_string(),
2234 background: false,
2235 max_steps: Some(1),
2236 },
2237 ];
2238
2239 let results = tokio::time::timeout(
2240 Duration::from_secs(2),
2241 executor.execute_parallel(tasks, None, None),
2242 )
2243 .await
2244 .expect("parallel children should reach the barrier and complete");
2245
2246 assert_eq!(results.len(), 2);
2247 assert!(
2248 client.max_active() >= 2,
2249 "expected concurrent child execution, max_active={}",
2250 client.max_active()
2251 );
2252 assert!(results[0].success);
2253 assert!(results[0].output.contains("slow branch"));
2254 assert!(results[1].success);
2255 assert!(results[1].output.contains("fast branch"));
2256 }
2257
2258 #[tokio::test]
2259 async fn parallel_task_executor_respects_configured_concurrency_limit() {
2260 let workspace = tempfile::tempdir().unwrap();
2261 let client = Arc::new(LimitedConcurrencyLlmClient::new());
2262 let executor = Arc::new(
2263 TaskExecutor::new(
2264 test_registry_with_text_worker(),
2265 client.clone(),
2266 workspace.path().to_string_lossy().to_string(),
2267 )
2268 .with_max_parallel_tasks(2),
2269 );
2270
2271 let tasks = (0..5)
2272 .map(|idx| TaskParams {
2273 agent: "worker".to_string(),
2274 description: format!("Task {idx}"),
2275 prompt: format!("branch {idx}"),
2276 background: false,
2277 max_steps: Some(1),
2278 })
2279 .collect::<Vec<_>>();
2280
2281 let results = executor.execute_parallel(tasks, None, None).await;
2282
2283 assert_eq!(results.len(), 5);
2284 assert!(results.iter().all(|result| result.success));
2285 assert_eq!(client.max_active(), 2);
2286 }
2287
2288 #[tokio::test]
2289 async fn parallel_task_executor_isolates_unknown_agent_failure() {
2290 let workspace = tempfile::tempdir().unwrap();
2291 let executor = Arc::new(TaskExecutor::new(
2292 test_registry_with_text_worker(),
2293 Arc::new(StaticLlmClient::new("valid branch done")),
2294 workspace.path().to_string_lossy().to_string(),
2295 ));
2296
2297 let tasks = vec![
2298 TaskParams {
2299 agent: "missing-agent".to_string(),
2300 description: "Missing".to_string(),
2301 prompt: "should fail".to_string(),
2302 background: false,
2303 max_steps: Some(1),
2304 },
2305 TaskParams {
2306 agent: "worker".to_string(),
2307 description: "Valid".to_string(),
2308 prompt: "should succeed".to_string(),
2309 background: false,
2310 max_steps: Some(1),
2311 },
2312 ];
2313
2314 let results = executor.execute_parallel(tasks, None, None).await;
2315
2316 assert_eq!(results.len(), 2);
2317 assert!(!results[0].success);
2318 assert_eq!(results[0].agent, "missing-agent");
2319 assert!(results[0].output.contains("Unknown agent type"));
2320 assert!(results[1].success);
2321 assert_eq!(results[1].agent, "worker");
2322 assert!(results[1].output.contains("valid branch done"));
2323 }
2324
2325 #[tokio::test]
2326 async fn parallel_task_executor_emits_subagent_events_for_each_child() {
2327 let workspace = tempfile::tempdir().unwrap();
2328 let executor = Arc::new(TaskExecutor::new(
2329 test_registry_with_text_worker(),
2330 Arc::new(StaticLlmClient::new("done")),
2331 workspace.path().to_string_lossy().to_string(),
2332 ));
2333 let (tx, mut rx) = broadcast::channel(64);
2334
2335 let tasks = vec![
2336 TaskParams {
2337 agent: "worker".to_string(),
2338 description: "One".to_string(),
2339 prompt: "first".to_string(),
2340 background: false,
2341 max_steps: Some(1),
2342 },
2343 TaskParams {
2344 agent: "worker".to_string(),
2345 description: "Two".to_string(),
2346 prompt: "second".to_string(),
2347 background: false,
2348 max_steps: Some(1),
2349 },
2350 ];
2351
2352 let results = executor.execute_parallel(tasks, Some(tx), None).await;
2353 assert_eq!(results.len(), 2);
2354 tokio::time::sleep(Duration::from_millis(20)).await;
2355
2356 let mut starts = Vec::new();
2357 let mut ends = Vec::new();
2358 let mut progress_statuses: Vec<String> = Vec::new();
2359 while let Ok(event) = rx.try_recv() {
2360 match event {
2361 AgentEvent::SubagentStart { description, .. } => starts.push(description),
2362 AgentEvent::SubagentEnd { agent, success, .. } => ends.push((agent, success)),
2363 AgentEvent::SubagentProgress { status, .. } => progress_statuses.push(status),
2364 _ => {}
2365 }
2366 }
2367
2368 starts.sort();
2369 assert_eq!(starts, vec!["One".to_string(), "Two".to_string()]);
2370 assert_eq!(ends.len(), 2);
2371 assert!(ends
2372 .iter()
2373 .all(|(agent, success)| agent == "worker" && *success));
2374 assert!(
2377 progress_statuses
2378 .iter()
2379 .filter(|s| s == &"turn_completed")
2380 .count()
2381 >= 2,
2382 "expected at least two turn_completed progress events, got {:?}",
2383 progress_statuses
2384 );
2385 }
2386
2387 #[tokio::test]
2388 async fn parallel_task_tool_reports_error_when_any_child_fails() {
2389 let workspace = tempfile::tempdir().unwrap();
2390 let executor = Arc::new(TaskExecutor::new(
2391 test_registry_with_text_worker(),
2392 Arc::new(StaticLlmClient::new("valid branch done")),
2393 workspace.path().to_string_lossy().to_string(),
2394 ));
2395 let tool = ParallelTaskTool::new(executor);
2396 let ctx = ToolContext::new(workspace.path().to_path_buf());
2397
2398 let output = tool
2399 .execute(
2400 &serde_json::json!({
2401 "tasks": [
2402 {
2403 "agent": "missing-agent",
2404 "description": "Missing",
2405 "prompt": "should fail"
2406 },
2407 {
2408 "agent": "worker",
2409 "description": "Valid",
2410 "prompt": "should succeed"
2411 }
2412 ]
2413 }),
2414 &ctx,
2415 )
2416 .await
2417 .unwrap();
2418
2419 assert!(
2420 !output.success,
2421 "parallel_task should fail when any child result fails"
2422 );
2423 assert!(output.content.contains("[ERR]"));
2424 assert!(output.content.contains("[OK]"));
2425 let metadata = output.metadata.expect("metadata");
2426 assert_eq!(metadata["task_count"], 2);
2427 assert_eq!(metadata["results"][0]["success"], false);
2428 assert_eq!(metadata["results"][1]["success"], true);
2429 }
2430
2431 #[tokio::test]
2432 async fn parallel_task_both_inherit_permissions() {
2433 let workspace = tempfile::tempdir().unwrap();
2434 let mock = Arc::new(MockLlmClient::new(vec![
2435 MockLlmClient::tool_call_response(
2437 "t1",
2438 "write",
2439 serde_json::json!({
2440 "file_path": workspace.path().join("p1.txt").to_string_lossy(),
2441 "content": "P1"
2442 }),
2443 ),
2444 MockLlmClient::text_response("Done 1."),
2445 MockLlmClient::tool_call_response(
2447 "t2",
2448 "write",
2449 serde_json::json!({
2450 "file_path": workspace.path().join("p2.txt").to_string_lossy(),
2451 "content": "P2"
2452 }),
2453 ),
2454 MockLlmClient::text_response("Done 2."),
2455 ]));
2456
2457 let executor = Arc::new(TaskExecutor::new(
2458 test_registry_with_writer(),
2459 mock,
2460 workspace.path().to_string_lossy().to_string(),
2461 ));
2462
2463 let tasks = vec![
2464 TaskParams {
2465 agent: "writer".to_string(),
2466 description: "Write p1".to_string(),
2467 prompt: "Write p1.txt".to_string(),
2468 background: false,
2469 max_steps: Some(3),
2470 },
2471 TaskParams {
2472 agent: "writer".to_string(),
2473 description: "Write p2".to_string(),
2474 prompt: "Write p2.txt".to_string(),
2475 background: false,
2476 max_steps: Some(3),
2477 },
2478 ];
2479
2480 let results = executor.execute_parallel(tasks, None, None).await;
2481 assert_eq!(results.len(), 2);
2482
2483 for result in &results {
2484 assert!(
2485 result.success,
2486 "parallel child should succeed: {}",
2487 result.output
2488 );
2489 }
2490 }
2491
/// A `ToolEnd` event with no `error_kind` must map to a `SubagentProgress`
/// event with status `tool_completed`, carrying the tool name, exit code and
/// output byte count in its metadata, and no `error_kind` key.
#[test]
fn synthesize_progress_emits_tool_completed_for_tool_end() {
    let event = AgentEvent::ToolEnd {
        id: String::from("call-1"),
        name: String::from("bash"),
        output: String::from("hello"),
        exit_code: 0,
        metadata: None,
        error_kind: None,
    };

    let progress =
        synthesize_subagent_progress(&event, "task-1", "task-run-task-1").expect("some");

    // Pull the fields out up front so the assertions read as a flat list.
    let (task_id, session_id, status, metadata) = match progress {
        AgentEvent::SubagentProgress {
            task_id,
            session_id,
            status,
            metadata,
        } => (task_id, session_id, status, metadata),
        other => panic!("expected SubagentProgress, got {:?}", other),
    };

    assert_eq!(task_id, "task-1");
    assert_eq!(session_id, "task-run-task-1");
    assert_eq!(status, "tool_completed");
    assert_eq!(metadata["tool"], "bash");
    assert_eq!(metadata["exit_code"], 0);
    // "hello" is five bytes long.
    assert_eq!(metadata["output_bytes"], 5);
    assert!(metadata.get("error_kind").is_none());
}
2522
/// A `ToolEnd` event that carries an `error_kind` must produce progress
/// metadata containing an `error_kind` entry.
#[test]
fn synthesize_progress_includes_error_kind_when_present() {
    let not_found = crate::tools::ToolErrorKind::NotFound {
        path: String::from("missing.txt"),
    };
    let event = AgentEvent::ToolEnd {
        id: String::from("call-2"),
        name: String::from("edit"),
        output: String::from("boom"),
        exit_code: 1,
        metadata: None,
        error_kind: Some(not_found),
    };

    let progress =
        synthesize_subagent_progress(&event, "task-x", "task-run-task-x").expect("some");

    let metadata = match progress {
        AgentEvent::SubagentProgress { metadata, .. } => metadata,
        _ => panic!("expected SubagentProgress"),
    };

    // Only the key's presence is checked, not its serialized shape.
    assert!(
        metadata.get("error_kind").is_some(),
        "error_kind should propagate into metadata"
    );
}
2546
/// A `TurnEnd` event must map to a `SubagentProgress` event with status
/// `turn_completed` whose metadata echoes the turn number and the prompt,
/// completion and total token counts.
#[test]
fn synthesize_progress_emits_turn_completed_for_turn_end() {
    let usage = crate::llm::TokenUsage {
        prompt_tokens: 100,
        completion_tokens: 25,
        total_tokens: 125,
        cache_read_tokens: None,
        cache_write_tokens: None,
    };
    let event = AgentEvent::TurnEnd { turn: 3, usage };

    let progress =
        synthesize_subagent_progress(&event, "task-1", "task-run-task-1").expect("some");

    let (status, metadata) = match progress {
        AgentEvent::SubagentProgress {
            status, metadata, ..
        } => (status, metadata),
        _ => panic!("expected SubagentProgress"),
    };

    assert_eq!(status, "turn_completed");
    assert_eq!(metadata["turn"], 3);
    assert_eq!(metadata["total_tokens"], 125);
    assert_eq!(metadata["prompt_tokens"], 100);
    assert_eq!(metadata["completion_tokens"], 25);
}
2574
/// `TextDelta`, `ToolStart`, `TurnStart` and `SubagentStart` events must not
/// be turned into progress events: `synthesize_subagent_progress` returns
/// `None` for each of them.
#[test]
fn synthesize_progress_ignores_unrelated_events() {
    let text_delta = AgentEvent::TextDelta {
        text: String::from("hi"),
    };
    let tool_start = AgentEvent::ToolStart {
        id: String::from("x"),
        name: String::from("bash"),
    };
    let turn_start = AgentEvent::TurnStart { turn: 1 };
    let nested_start = AgentEvent::SubagentStart {
        task_id: String::from("nested"),
        session_id: String::from("nested-run"),
        parent_session_id: String::from("parent"),
        agent: String::from("explore"),
        description: String::from("nested"),
    };

    let ignored = [text_delta, tool_start, turn_start, nested_start];
    ignored.iter().for_each(|event| {
        let emitted = synthesize_subagent_progress(event, "task", "session");
        assert!(
            emitted.is_none(),
            "{:?} should not emit progress",
            event
        );
    });
}
2602}