1use crate::agent::{AgentConfig, AgentEvent, AgentLoop};
18use crate::llm::LlmClient;
19use crate::mcp::manager::McpManager;
20use crate::subagent::AgentRegistry;
21use crate::tools::types::{Tool, ToolContext, ToolOutput};
22use anyhow::{Context, Result};
23use async_trait::async_trait;
24use serde::{Deserialize, Serialize};
25use std::path::PathBuf;
26use std::sync::Arc;
27use tokio::sync::broadcast;
28
/// Largest delegated-task output (in bytes) that is handed to the parent
/// context verbatim; anything longer is compacted to a head/tail excerpt.
const TASK_OUTPUT_CONTEXT_LIMIT: usize = 4_000;
/// Budget passed to `crate::text::truncate_utf8` for the leading excerpt.
const TASK_OUTPUT_CONTEXT_HEAD: usize = 3_000;
/// Maximum number of trailing bytes kept as the closing excerpt.
const TASK_OUTPUT_CONTEXT_TAIL: usize = 800;

// Compile-time guard: compaction only runs on outputs longer than the limit,
// so the head and tail excerpts never overlap as long as this holds.
const _: () = assert!(
    TASK_OUTPUT_CONTEXT_HEAD + TASK_OUTPUT_CONTEXT_TAIL <= TASK_OUTPUT_CONTEXT_LIMIT,
    "head + tail excerpt budgets must fit within the context limit"
);
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35#[serde(deny_unknown_fields)]
36pub struct TaskParams {
37 pub agent: String,
39 pub description: String,
41 pub prompt: String,
43 #[serde(default)]
45 pub background: bool,
46 #[serde(skip_serializing_if = "Option::is_none")]
48 pub max_steps: Option<usize>,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct TaskResult {
54 pub output: String,
56 pub session_id: String,
58 pub agent: String,
60 pub success: bool,
62 pub task_id: String,
64}
65
66fn compact_task_output(output: &str) -> (String, bool) {
67 if output.len() <= TASK_OUTPUT_CONTEXT_LIMIT {
68 return (output.to_string(), false);
69 }
70
71 let head = crate::text::truncate_utf8(output, TASK_OUTPUT_CONTEXT_HEAD);
72 let tail_start = output
73 .char_indices()
74 .find_map(|(idx, _)| {
75 if output.len().saturating_sub(idx) <= TASK_OUTPUT_CONTEXT_TAIL {
76 Some(idx)
77 } else {
78 None
79 }
80 })
81 .unwrap_or(output.len());
82 let tail = &output[tail_start..];
83
84 (
85 format!(
86 "{}\n\n[{} bytes omitted from delegated task output]\n\n{}",
87 head,
88 output.len().saturating_sub(head.len() + tail.len()),
89 tail
90 ),
91 true,
92 )
93}
94
95fn synthesize_subagent_progress(
106 event: &AgentEvent,
107 task_id: &str,
108 session_id: &str,
109) -> Option<AgentEvent> {
110 match event {
111 AgentEvent::ToolEnd {
112 name,
113 output,
114 exit_code,
115 error_kind,
116 ..
117 } => {
118 let mut metadata = serde_json::json!({
119 "tool": name,
120 "exit_code": exit_code,
121 "output_bytes": output.len(),
122 });
123 if let Some(kind) = error_kind {
124 metadata["error_kind"] =
125 serde_json::to_value(kind).unwrap_or(serde_json::Value::Null);
126 }
127 Some(AgentEvent::SubagentProgress {
128 task_id: task_id.to_string(),
129 session_id: session_id.to_string(),
130 status: "tool_completed".to_string(),
131 metadata,
132 })
133 }
134 AgentEvent::TurnEnd { turn, usage } => Some(AgentEvent::SubagentProgress {
135 task_id: task_id.to_string(),
136 session_id: session_id.to_string(),
137 status: "turn_completed".to_string(),
138 metadata: serde_json::json!({
139 "turn": turn,
140 "total_tokens": usage.total_tokens,
141 "prompt_tokens": usage.prompt_tokens,
142 "completion_tokens": usage.completion_tokens,
143 }),
144 }),
145 _ => None,
146 }
147}
148
149fn task_artifact_id(result: &TaskResult) -> String {
150 format!("task-output:{}", result.task_id)
151}
152
153fn task_artifact_uri(result: &TaskResult) -> String {
154 format!(
155 "a3s://tasks/{}/runs/{}/output",
156 result.session_id, result.task_id
157 )
158}
159
160fn format_task_result_for_context(result: &TaskResult) -> (String, bool) {
161 let (output, truncated) = compact_task_output(&result.output);
162 let status = if result.success {
163 "completed"
164 } else {
165 "failed"
166 };
167 let artifact_id = task_artifact_id(result);
168 let artifact_uri = task_artifact_uri(result);
169 let mut formatted = format!(
170 "Task {status}: {}\nAgent: {}\nSession: {}\nTask ID: {}\nArtifact ID: {}\nArtifact URI: {}\n",
171 result.task_id, result.agent, result.session_id, result.task_id, artifact_id, artifact_uri
172 );
173 if truncated {
174 formatted.push_str(
175 "Output excerpt: truncated for parent context. Use the artifact URI or child run session/events if exact omitted content is needed.\n",
176 );
177 } else {
178 formatted.push_str("Output:\n");
179 }
180 formatted.push_str(&output);
181 (formatted, truncated)
182}
183
184pub struct TaskExecutor {
186 registry: Arc<AgentRegistry>,
188 llm_client: Arc<dyn LlmClient>,
190 workspace: String,
192 mcp_manager: Option<Arc<McpManager>>,
194 parent_context: Option<crate::child_run::ChildRunContext>,
196 max_parallel_tasks: usize,
197 subagent_tracker: Option<Arc<crate::subagent_task_tracker::InMemorySubagentTaskTracker>>,
200}
201
202impl TaskExecutor {
203 pub fn new(
205 registry: Arc<AgentRegistry>,
206 llm_client: Arc<dyn LlmClient>,
207 workspace: String,
208 ) -> Self {
209 Self {
210 registry,
211 llm_client,
212 workspace,
213 mcp_manager: None,
214 parent_context: None,
215 max_parallel_tasks: crate::agent::DEFAULT_MAX_PARALLEL_TASKS,
216 subagent_tracker: None,
217 }
218 }
219
220 pub fn with_mcp(
222 registry: Arc<AgentRegistry>,
223 llm_client: Arc<dyn LlmClient>,
224 workspace: String,
225 mcp_manager: Arc<McpManager>,
226 ) -> Self {
227 Self {
228 registry,
229 llm_client,
230 workspace,
231 mcp_manager: Some(mcp_manager),
232 parent_context: None,
233 max_parallel_tasks: crate::agent::DEFAULT_MAX_PARALLEL_TASKS,
234 subagent_tracker: None,
235 }
236 }
237
238 pub fn with_parent_context(mut self, ctx: crate::child_run::ChildRunContext) -> Self {
240 if let Some(max_parallel_tasks) = ctx.max_parallel_tasks {
241 self.max_parallel_tasks = max_parallel_tasks.max(1);
242 }
243 self.parent_context = Some(ctx);
244 self
245 }
246
247 pub fn with_max_parallel_tasks(mut self, max_parallel_tasks: usize) -> Self {
248 self.max_parallel_tasks = max_parallel_tasks.max(1);
249 self
250 }
251
252 pub fn with_subagent_tracker(
256 mut self,
257 tracker: Arc<crate::subagent_task_tracker::InMemorySubagentTaskTracker>,
258 ) -> Self {
259 self.subagent_tracker = Some(tracker);
260 self
261 }
262
263 pub async fn execute(
268 &self,
269 params: TaskParams,
270 event_tx: Option<broadcast::Sender<AgentEvent>>,
271 parent_session_id: Option<&str>,
272 ) -> Result<TaskResult> {
273 let task_id = format!("task-{}", uuid::Uuid::new_v4());
274 self.execute_with_task_id(task_id, params, event_tx, parent_session_id, true)
275 .await
276 }
277
278 pub async fn execute_with_task_id(
283 &self,
284 task_id: String,
285 params: TaskParams,
286 event_tx: Option<broadcast::Sender<AgentEvent>>,
287 parent_session_id: Option<&str>,
288 emit_start: bool,
289 ) -> Result<TaskResult> {
290 let session_id = format!("task-run-{}", task_id);
291
292 let agent = self
293 .registry
294 .get(¶ms.agent)
295 .context(format!("Unknown agent type: '{}'", params.agent))?;
296
297 if emit_start {
298 if let Some(ref tx) = event_tx {
299 let _ = tx.send(AgentEvent::SubagentStart {
300 task_id: task_id.clone(),
301 session_id: session_id.clone(),
302 parent_session_id: parent_session_id.unwrap_or_default().to_string(),
303 agent: params.agent.clone(),
304 description: params.description.clone(),
305 });
306 }
307 }
308
309 let child_executor = if let Some(ref parent_ctx) = self.parent_context {
312 if let Some(ref services) = parent_ctx.workspace_services {
313 crate::tools::ToolExecutor::new_with_workspace_services_and_artifact_limits(
314 self.workspace.clone(),
315 Arc::clone(services),
316 crate::tools::ArtifactStoreLimits::default(),
317 )
318 } else {
319 crate::tools::ToolExecutor::new(self.workspace.clone())
320 }
321 } else {
322 crate::tools::ToolExecutor::new(self.workspace.clone())
323 };
324
325 if let Some(ref mcp) = self.mcp_manager {
327 let all_tools = mcp.get_all_tools().await;
328 let mut by_server: std::collections::HashMap<
329 String,
330 Vec<crate::mcp::protocol::McpTool>,
331 > = std::collections::HashMap::new();
332 for (server, tool) in all_tools {
333 by_server.entry(server).or_default().push(tool);
334 }
335 for (server_name, tools) in by_server {
336 let wrappers =
337 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp));
338 for wrapper in wrappers {
339 child_executor.register_dynamic_tool(wrapper);
340 }
341 }
342 }
343
344 let child_executor = Arc::new(child_executor);
345
346 let mut child_config = AgentConfig {
347 tools: child_executor.definitions(),
348 ..AgentConfig::default()
349 };
350 agent.apply_to(&mut child_config);
351 if let Some(ref parent_ctx) = self.parent_context {
352 parent_ctx.apply_to(&mut child_config);
353 }
354 if let Some(max_steps) = params.max_steps {
355 child_config.max_tool_rounds = max_steps;
356 }
357
358 let mut tool_context =
359 ToolContext::new(PathBuf::from(&self.workspace)).with_session_id(session_id.clone());
360 if let Some(ref parent_ctx) = self.parent_context {
361 if let Some(ref services) = parent_ctx.workspace_services {
362 tool_context = tool_context.with_workspace_services(Arc::clone(services));
363 }
364 }
365
366 let agent_loop = AgentLoop::new(
367 Arc::clone(&self.llm_client),
368 child_executor,
369 tool_context,
370 child_config,
371 );
372
373 let child_event_tx = if let Some(ref broadcast_tx) = event_tx {
378 let (mpsc_tx, mut mpsc_rx) = tokio::sync::mpsc::channel(100);
379 let broadcast_tx_clone = broadcast_tx.clone();
380 let progress_task_id = task_id.clone();
381 let progress_session_id = session_id.clone();
382
383 tokio::spawn(async move {
384 while let Some(event) = mpsc_rx.recv().await {
385 if let Some(progress) = synthesize_subagent_progress(
386 &event,
387 &progress_task_id,
388 &progress_session_id,
389 ) {
390 let _ = broadcast_tx_clone.send(progress);
391 }
392 let _ = broadcast_tx_clone.send(event);
393 }
394 });
395
396 Some(mpsc_tx)
397 } else {
398 None
399 };
400
401 let cancel_token = tokio_util::sync::CancellationToken::new();
404 if let Some(ref tracker) = self.subagent_tracker {
405 tracker
406 .register_canceller(&task_id, cancel_token.clone())
407 .await;
408 }
409
410 let (output, success) = match agent_loop
411 .execute_with_session(
412 &[],
413 ¶ms.prompt,
414 Some(&session_id),
415 child_event_tx,
416 Some(&cancel_token),
417 )
418 .await
419 {
420 Ok(result) => (result.text, true),
421 Err(e) if cancel_token.is_cancelled() => {
422 (format!("Task cancelled by caller: {}", e), false)
423 }
424 Err(e) => (format!("Task failed: {}", e), false),
425 };
426
427 if let Some(ref tracker) = self.subagent_tracker {
428 tracker.clear_canceller(&task_id).await;
429 }
430
431 if let Some(ref tx) = event_tx {
432 let _ = tx.send(AgentEvent::SubagentEnd {
433 task_id: task_id.clone(),
434 session_id: session_id.clone(),
435 agent: params.agent.clone(),
436 output: output.clone(),
437 success,
438 });
439 }
440
441 Ok(TaskResult {
442 output,
443 session_id,
444 agent: params.agent,
445 success,
446 task_id,
447 })
448 }
449
450 pub fn execute_background(
458 self: Arc<Self>,
459 params: TaskParams,
460 event_tx: Option<broadcast::Sender<AgentEvent>>,
461 parent_session_id: Option<String>,
462 ) -> String {
463 let task_id = format!("task-{}", uuid::Uuid::new_v4());
464 let session_id = format!("task-run-{}", task_id);
465
466 if let Some(ref tx) = event_tx {
467 let _ = tx.send(AgentEvent::SubagentStart {
468 task_id: task_id.clone(),
469 session_id,
470 parent_session_id: parent_session_id.clone().unwrap_or_default(),
471 agent: params.agent.clone(),
472 description: params.description.clone(),
473 });
474 }
475
476 let task_id_for_spawn = task_id.clone();
477 let task_id_for_log = task_id.clone();
478 tokio::spawn(async move {
479 if let Err(e) = self
480 .execute_with_task_id(
481 task_id_for_spawn,
482 params,
483 event_tx,
484 parent_session_id.as_deref(),
485 false,
486 )
487 .await
488 {
489 tracing::error!("Background task {} failed: {}", task_id_for_log, e);
490 }
491 });
492
493 task_id
494 }
495
496 pub async fn execute_parallel(
501 self: &Arc<Self>,
502 tasks: Vec<TaskParams>,
503 event_tx: Option<broadcast::Sender<AgentEvent>>,
504 parent_session_id: Option<&str>,
505 ) -> Vec<TaskResult> {
506 let fallback_agents = tasks
507 .iter()
508 .map(|params| params.agent.clone())
509 .collect::<Vec<_>>();
510 let executor = Arc::clone(self);
511 let parent_session_id = parent_session_id.map(|s| s.to_string());
512 let results = crate::ordered_parallel::run_ordered_parallel_with_limit(
513 tasks,
514 self.max_parallel_tasks,
515 move |_idx, params| {
516 let executor = Arc::clone(&executor);
517 let tx = event_tx.clone();
518 let parent = parent_session_id.clone();
519 async move {
520 match executor
521 .execute(params.clone(), tx, parent.as_deref())
522 .await
523 {
524 Ok(result) => result,
525 Err(e) => TaskResult {
526 output: format!("Task failed: {}", e),
527 session_id: String::new(),
528 agent: params.agent,
529 success: false,
530 task_id: format!("task-{}", uuid::Uuid::new_v4()),
531 },
532 }
533 }
534 },
535 )
536 .await;
537
538 results
539 .into_iter()
540 .map(|result| match result.output {
541 Ok(task_result) => task_result,
542 Err(error) => {
543 tracing::error!("Parallel task failed: {}", error);
544 TaskResult {
545 output: format!("Task failed: {}", error),
546 session_id: String::new(),
547 agent: fallback_agents
548 .get(result.index)
549 .cloned()
550 .unwrap_or_else(|| "unknown".to_string()),
551 success: false,
552 task_id: format!("task-{}", uuid::Uuid::new_v4()),
553 }
554 }
555 })
556 .collect()
557 }
558}
559
560pub fn task_params_schema() -> serde_json::Value {
562 serde_json::json!({
563 "type": "object",
564 "additionalProperties": false,
565 "properties": {
566 "agent": {
567 "type": "string",
568 "description": "Required. Canonical agent type to use (for example: explore, general, plan, verification, review). Always provide this exact field name: 'agent'."
569 },
570 "description": {
571 "type": "string",
572 "description": "Required. Short task label for display and tracking. Always provide this exact field name: 'description'."
573 },
574 "prompt": {
575 "type": "string",
576 "description": "Required. Detailed instruction for the delegated child run. Always provide this exact field name: 'prompt'."
577 },
578 "background": {
579 "type": "boolean",
580 "description": "Optional. Run the task in the background. Default: false.",
581 "default": false
582 },
583 "max_steps": {
584 "type": "integer",
585 "description": "Optional. Maximum number of steps for this task."
586 }
587 },
588 "required": ["agent", "description", "prompt"],
589 "examples": [
590 {
591 "agent": "explore",
592 "description": "Find Rust files",
593 "prompt": "Search the workspace for Rust files and summarize the layout."
594 },
595 {
596 "agent": "general",
597 "description": "Investigate test failure",
598 "prompt": "Inspect the failing tests and explain the root cause.",
599 "max_steps": 6
600 }
601 ]
602 })
603}
604
605pub struct TaskTool {
608 executor: Arc<TaskExecutor>,
609}
610
611impl TaskTool {
612 pub fn new(executor: Arc<TaskExecutor>) -> Self {
614 Self { executor }
615 }
616}
617
618#[async_trait]
619impl Tool for TaskTool {
620 fn name(&self) -> &str {
621 "task"
622 }
623
624 fn description(&self) -> &str {
625 "Delegate a bounded task to a specialized child run. Built-in agents: explore (read-only codebase search), general/general-purpose (full access multi-step), plan (read-only planning), verification (adversarial validation), review (code review). Custom agents from agent_dirs and .a3s/agents are also available; .claude/agents is read for compatibility."
626 }
627
628 fn parameters(&self) -> serde_json::Value {
629 task_params_schema()
630 }
631
632 async fn execute(&self, args: &serde_json::Value, ctx: &ToolContext) -> Result<ToolOutput> {
633 let params: TaskParams =
634 serde_json::from_value(args.clone()).context("Invalid task parameters")?;
635
636 if params.background {
637 let task_id = Arc::clone(&self.executor).execute_background(
638 params,
639 ctx.agent_event_tx.clone(),
640 ctx.session_id.clone(),
641 );
642 return Ok(ToolOutput::success(format!(
643 "Task started in background. Task ID: {}",
644 task_id
645 )));
646 }
647
648 let result = self
649 .executor
650 .execute(
651 params,
652 ctx.agent_event_tx.clone(),
653 ctx.session_id.as_deref(),
654 )
655 .await?;
656 let (content, truncated) = format_task_result_for_context(&result);
657 let metadata = serde_json::json!({
658 "task_id": result.task_id,
659 "session_id": result.session_id,
660 "agent": result.agent,
661 "success": result.success,
662 "output_bytes": result.output.len(),
663 "truncated_for_context": truncated,
664 "artifact_id": task_artifact_id(&result),
665 "artifact_uri": task_artifact_uri(&result),
666 });
667
668 if result.success {
669 Ok(ToolOutput::success(content).with_metadata(metadata))
670 } else {
671 Ok(ToolOutput::error(content).with_metadata(metadata))
672 }
673 }
674}
675
676#[derive(Debug, Clone, Serialize, Deserialize)]
678#[serde(deny_unknown_fields)]
679pub struct ParallelTaskParams {
680 pub tasks: Vec<TaskParams>,
682}
683
684pub fn parallel_task_params_schema() -> serde_json::Value {
686 serde_json::json!({
687 "type": "object",
688 "additionalProperties": false,
689 "properties": {
690 "tasks": {
691 "type": "array",
692 "description": "List of tasks to execute in parallel. Each task runs as an independent delegated child run concurrently.",
693 "items": {
694 "type": "object",
695 "additionalProperties": false,
696 "properties": {
697 "agent": {
698 "type": "string",
699 "description": "Required. Canonical agent type for this task."
700 },
701 "description": {
702 "type": "string",
703 "description": "Required. Short task label for display and tracking."
704 },
705 "prompt": {
706 "type": "string",
707 "description": "Required. Detailed instruction for the delegated child run."
708 }
709 },
710 "required": ["agent", "description", "prompt"]
711 },
712 "minItems": 1
713 }
714 },
715 "required": ["tasks"],
716 "examples": [
717 {
718 "tasks": [
719 {
720 "agent": "explore",
721 "description": "Find Rust files",
722 "prompt": "List Rust files under src/."
723 },
724 {
725 "agent": "explore",
726 "description": "Find tests",
727 "prompt": "List test files and summarize their purpose."
728 }
729 ]
730 }
731 ]
732 })
733}
734
735pub struct ParallelTaskTool {
739 executor: Arc<TaskExecutor>,
740}
741
742impl ParallelTaskTool {
743 pub fn new(executor: Arc<TaskExecutor>) -> Self {
745 Self { executor }
746 }
747}
748
749#[async_trait]
750impl Tool for ParallelTaskTool {
751 fn name(&self) -> &str {
752 "parallel_task"
753 }
754
755 fn description(&self) -> &str {
756 "Execute multiple delegated child runs in parallel. All tasks run concurrently and results are returned when all complete. Built-in agents: explore (read-only codebase search), general/general-purpose (full access multi-step), plan (read-only planning), verification (adversarial validation), review (code review). Custom agents from agent_dirs and .a3s/agents are also available; .claude/agents is read for compatibility."
757 }
758
759 fn parameters(&self) -> serde_json::Value {
760 parallel_task_params_schema()
761 }
762
763 async fn execute(&self, args: &serde_json::Value, ctx: &ToolContext) -> Result<ToolOutput> {
764 let params: ParallelTaskParams =
765 serde_json::from_value(args.clone()).context("Invalid parallel task parameters")?;
766
767 if params.tasks.is_empty() {
768 return Ok(ToolOutput::error("No tasks provided".to_string()));
769 }
770
771 let task_count = params.tasks.len();
772
773 let results = self
774 .executor
775 .execute_parallel(
776 params.tasks,
777 ctx.agent_event_tx.clone(),
778 ctx.session_id.as_deref(),
779 )
780 .await;
781
782 let mut output = format!("Executed {} tasks in parallel:\n\n", task_count);
784 let mut metadata_results = Vec::new();
785 for (i, result) in results.iter().enumerate() {
786 let status = if result.success { "[OK]" } else { "[ERR]" };
787 let (formatted, truncated) = format_task_result_for_context(result);
788 metadata_results.push(serde_json::json!({
789 "task_id": result.task_id,
790 "session_id": result.session_id,
791 "agent": result.agent,
792 "success": result.success,
793 "output": formatted.clone(),
794 "output_bytes": result.output.len(),
795 "truncated_for_context": truncated,
796 "artifact_id": task_artifact_id(result),
797 "artifact_uri": task_artifact_uri(result),
798 }));
799 output.push_str(&format!(
800 "--- Task {} ({}) {} ---\n{}\n\n",
801 i + 1,
802 result.agent,
803 status,
804 formatted
805 ));
806 }
807
808 let all_success = results.iter().all(|result| result.success);
809 let output = if all_success {
810 ToolOutput::success(output)
811 } else {
812 ToolOutput::error(output)
813 };
814
815 Ok(output.with_metadata(serde_json::json!({
816 "task_count": task_count,
817 "results": metadata_results,
818 })))
819 }
820}
821
822#[cfg(test)]
823mod tests {
824 use super::*;
825
826 #[test]
827 fn test_task_params_deserialize() {
828 let json = r#"{
829 "agent": "explore",
830 "description": "Find auth code",
831 "prompt": "Search for authentication files"
832 }"#;
833
834 let params: TaskParams = serde_json::from_str(json).unwrap();
835 assert_eq!(params.agent, "explore");
836 assert_eq!(params.description, "Find auth code");
837 assert!(!params.background);
838 }
839
840 #[test]
841 fn test_task_params_with_background() {
842 let json = r#"{
843 "agent": "general",
844 "description": "Long task",
845 "prompt": "Do something complex",
846 "background": true
847 }"#;
848
849 let params: TaskParams = serde_json::from_str(json).unwrap();
850 assert!(params.background);
851 }
852
853 #[test]
854 fn test_task_params_with_max_steps() {
855 let json = r#"{
856 "agent": "plan",
857 "description": "Planning task",
858 "prompt": "Create a plan",
859 "max_steps": 10
860 }"#;
861
862 let params: TaskParams = serde_json::from_str(json).unwrap();
863 assert_eq!(params.agent, "plan");
864 assert_eq!(params.max_steps, Some(10));
865 assert!(!params.background);
866 }
867
868 #[test]
869 fn test_task_params_all_fields() {
870 let json = r#"{
871 "agent": "general",
872 "description": "Complex task",
873 "prompt": "Do everything",
874 "background": true,
875 "max_steps": 20
876 }"#;
877
878 let params: TaskParams = serde_json::from_str(json).unwrap();
879 assert_eq!(params.agent, "general");
880 assert_eq!(params.description, "Complex task");
881 assert_eq!(params.prompt, "Do everything");
882 assert!(params.background);
883 assert_eq!(params.max_steps, Some(20));
884 }
885
886 #[test]
887 fn test_task_params_missing_required_field() {
888 let json = r#"{
889 "agent": "explore",
890 "description": "Missing prompt"
891 }"#;
892
893 let result: Result<TaskParams, _> = serde_json::from_str(json);
894 assert!(result.is_err());
895 }
896
897 #[test]
898 fn test_task_params_serialize() {
899 let params = TaskParams {
900 agent: "explore".to_string(),
901 description: "Test task".to_string(),
902 prompt: "Test prompt".to_string(),
903 background: false,
904 max_steps: Some(5),
905 };
906
907 let json = serde_json::to_string(¶ms).unwrap();
908 assert!(json.contains("explore"));
909 assert!(json.contains("Test task"));
910 assert!(json.contains("Test prompt"));
911 }
912
913 #[test]
914 fn test_task_params_clone() {
915 let params = TaskParams {
916 agent: "explore".to_string(),
917 description: "Test".to_string(),
918 prompt: "Prompt".to_string(),
919 background: true,
920 max_steps: None,
921 };
922
923 let cloned = params.clone();
924 assert_eq!(params.agent, cloned.agent);
925 assert_eq!(params.description, cloned.description);
926 assert_eq!(params.background, cloned.background);
927 }
928
929 #[test]
930 fn test_task_result_serialize() {
931 let result = TaskResult {
932 output: "Found 5 files".to_string(),
933 session_id: "session-123".to_string(),
934 agent: "explore".to_string(),
935 success: true,
936 task_id: "task-456".to_string(),
937 };
938
939 let json = serde_json::to_string(&result).unwrap();
940 assert!(json.contains("Found 5 files"));
941 assert!(json.contains("explore"));
942 }
943
944 #[test]
945 fn test_task_result_deserialize() {
946 let json = r#"{
947 "output": "Task completed",
948 "session_id": "sess-789",
949 "agent": "general",
950 "success": false,
951 "task_id": "task-123"
952 }"#;
953
954 let result: TaskResult = serde_json::from_str(json).unwrap();
955 assert_eq!(result.output, "Task completed");
956 assert_eq!(result.session_id, "sess-789");
957 assert_eq!(result.agent, "general");
958 assert!(!result.success);
959 assert_eq!(result.task_id, "task-123");
960 }
961
962 #[test]
963 fn test_task_result_clone() {
964 let result = TaskResult {
965 output: "Output".to_string(),
966 session_id: "session-1".to_string(),
967 agent: "explore".to_string(),
968 success: true,
969 task_id: "task-1".to_string(),
970 };
971
972 let cloned = result.clone();
973 assert_eq!(result.output, cloned.output);
974 assert_eq!(result.success, cloned.success);
975 }
976
977 #[test]
978 fn test_compact_task_output_preserves_small_output() {
979 let (output, truncated) = compact_task_output("short result");
980 assert_eq!(output, "short result");
981 assert!(!truncated);
982 }
983
984 #[test]
985 fn test_format_task_result_for_context_truncates_large_output() {
986 let result = TaskResult {
987 output: format!("{}TAIL", "x".repeat(TASK_OUTPUT_CONTEXT_LIMIT + 500)),
988 session_id: "session-1".to_string(),
989 agent: "explore".to_string(),
990 success: true,
991 task_id: "task-1".to_string(),
992 };
993
994 let (formatted, truncated) = format_task_result_for_context(&result);
995 assert!(truncated);
996 assert!(formatted.contains("Output excerpt"));
997 assert!(formatted.contains("bytes omitted"));
998 assert!(formatted.contains("Artifact ID: task-output:task-1"));
999 assert!(formatted.contains("Artifact URI: a3s://tasks/session-1/runs/task-1/output"));
1000 assert!(formatted.contains("TAIL"));
1001 assert!(formatted.len() < result.output.len());
1002 }
1003
1004 #[test]
1005 fn test_task_artifact_reference_is_stable() {
1006 let result = TaskResult {
1007 output: "done".to_string(),
1008 session_id: "session-1".to_string(),
1009 agent: "explore".to_string(),
1010 success: true,
1011 task_id: "task-1".to_string(),
1012 };
1013
1014 assert_eq!(task_artifact_id(&result), "task-output:task-1");
1015 assert_eq!(
1016 task_artifact_uri(&result),
1017 "a3s://tasks/session-1/runs/task-1/output"
1018 );
1019
1020 let (formatted, truncated) = format_task_result_for_context(&result);
1021 assert!(!truncated);
1022 assert!(formatted.contains("Artifact URI: a3s://tasks/session-1/runs/task-1/output"));
1023 }
1024
1025 #[test]
1026 fn test_task_params_schema() {
1027 let schema = task_params_schema();
1028 assert_eq!(schema["type"], "object");
1029 assert_eq!(schema["additionalProperties"], false);
1030 assert!(schema["properties"]["agent"].is_object());
1031 assert!(schema["properties"]["prompt"].is_object());
1032 }
1033
1034 #[test]
1035 fn test_task_params_schema_required_fields() {
1036 let schema = task_params_schema();
1037 let required = schema["required"].as_array().unwrap();
1038 assert!(required.contains(&serde_json::json!("agent")));
1039 assert!(required.contains(&serde_json::json!("description")));
1040 assert!(required.contains(&serde_json::json!("prompt")));
1041 }
1042
1043 #[test]
1044 fn test_task_params_schema_properties() {
1045 let schema = task_params_schema();
1046 let props = &schema["properties"];
1047
1048 assert_eq!(props["agent"]["type"], "string");
1049 assert_eq!(props["description"]["type"], "string");
1050 assert_eq!(props["prompt"]["type"], "string");
1051 assert_eq!(props["background"]["type"], "boolean");
1052 assert_eq!(props["background"]["default"], false);
1053 assert_eq!(props["max_steps"]["type"], "integer");
1054 }
1055
1056 #[test]
1057 fn test_task_params_schema_descriptions() {
1058 let schema = task_params_schema();
1059 let props = &schema["properties"];
1060
1061 assert!(props["agent"]["description"].is_string());
1062 assert!(props["description"]["description"].is_string());
1063 assert!(props["prompt"]["description"].is_string());
1064 assert!(props["background"]["description"].is_string());
1065 assert!(props["max_steps"]["description"].is_string());
1066 }
1067
1068 #[test]
1069 fn test_task_params_default_background() {
1070 let params = TaskParams {
1071 agent: "explore".to_string(),
1072 description: "Test".to_string(),
1073 prompt: "Test prompt".to_string(),
1074 background: false,
1075 max_steps: None,
1076 };
1077 assert!(!params.background);
1078 }
1079
1080 #[test]
1081 fn test_task_params_serialize_skip_none() {
1082 let params = TaskParams {
1083 agent: "explore".to_string(),
1084 description: "Test".to_string(),
1085 prompt: "Test prompt".to_string(),
1086 background: false,
1087 max_steps: None,
1088 };
1089 let json = serde_json::to_string(¶ms).unwrap();
1090 assert!(!json.contains("max_steps"));
1092 }
1093
1094 #[test]
1095 fn test_task_params_serialize_with_max_steps() {
1096 let params = TaskParams {
1097 agent: "explore".to_string(),
1098 description: "Test".to_string(),
1099 prompt: "Test prompt".to_string(),
1100 background: false,
1101 max_steps: Some(15),
1102 };
1103 let json = serde_json::to_string(¶ms).unwrap();
1104 assert!(json.contains("max_steps"));
1105 assert!(json.contains("15"));
1106 }
1107
1108 #[test]
1109 fn test_task_result_success_true() {
1110 let result = TaskResult {
1111 output: "Success".to_string(),
1112 session_id: "sess-1".to_string(),
1113 agent: "explore".to_string(),
1114 success: true,
1115 task_id: "task-1".to_string(),
1116 };
1117 assert!(result.success);
1118 }
1119
1120 #[test]
1121 fn test_task_result_success_false() {
1122 let result = TaskResult {
1123 output: "Failed".to_string(),
1124 session_id: "sess-1".to_string(),
1125 agent: "explore".to_string(),
1126 success: false,
1127 task_id: "task-1".to_string(),
1128 };
1129 assert!(!result.success);
1130 }
1131
1132 #[test]
1133 fn test_task_params_empty_strings() {
1134 let params = TaskParams {
1135 agent: "".to_string(),
1136 description: "".to_string(),
1137 prompt: "".to_string(),
1138 background: false,
1139 max_steps: None,
1140 };
1141 let json = serde_json::to_string(¶ms).unwrap();
1142 let deserialized: TaskParams = serde_json::from_str(&json).unwrap();
1143 assert_eq!(deserialized.agent, "");
1144 assert_eq!(deserialized.description, "");
1145 assert_eq!(deserialized.prompt, "");
1146 }
1147
1148 #[test]
1149 fn test_task_result_empty_output() {
1150 let result = TaskResult {
1151 output: "".to_string(),
1152 session_id: "sess-1".to_string(),
1153 agent: "explore".to_string(),
1154 success: true,
1155 task_id: "task-1".to_string(),
1156 };
1157 assert_eq!(result.output, "");
1158 }
1159
1160 #[test]
1161 fn test_task_params_debug_format() {
1162 let params = TaskParams {
1163 agent: "explore".to_string(),
1164 description: "Test".to_string(),
1165 prompt: "Test prompt".to_string(),
1166 background: false,
1167 max_steps: None,
1168 };
1169 let debug_str = format!("{:?}", params);
1170 assert!(debug_str.contains("explore"));
1171 assert!(debug_str.contains("Test"));
1172 }
1173
1174 #[test]
1175 fn test_task_result_debug_format() {
1176 let result = TaskResult {
1177 output: "Output".to_string(),
1178 session_id: "sess-1".to_string(),
1179 agent: "explore".to_string(),
1180 success: true,
1181 task_id: "task-1".to_string(),
1182 };
1183 let debug_str = format!("{:?}", result);
1184 assert!(debug_str.contains("Output"));
1185 assert!(debug_str.contains("explore"));
1186 }
1187
1188 #[test]
1189 fn test_task_params_roundtrip() {
1190 let original = TaskParams {
1191 agent: "general".to_string(),
1192 description: "Roundtrip test".to_string(),
1193 prompt: "Test roundtrip serialization".to_string(),
1194 background: true,
1195 max_steps: Some(42),
1196 };
1197 let json = serde_json::to_string(&original).unwrap();
1198 let deserialized: TaskParams = serde_json::from_str(&json).unwrap();
1199 assert_eq!(original.agent, deserialized.agent);
1200 assert_eq!(original.description, deserialized.description);
1201 assert_eq!(original.prompt, deserialized.prompt);
1202 assert_eq!(original.background, deserialized.background);
1203 assert_eq!(original.max_steps, deserialized.max_steps);
1204 }
1205
/// Serializing `TaskResult` to JSON and parsing it back preserves every field.
#[test]
fn test_task_result_roundtrip() {
    let before = TaskResult {
        output: String::from("Roundtrip output"),
        session_id: String::from("sess-roundtrip"),
        agent: String::from("plan"),
        success: false,
        task_id: String::from("task-roundtrip"),
    };

    let encoded = serde_json::to_string(&before).unwrap();
    let after = serde_json::from_str::<TaskResult>(&encoded).unwrap();

    assert_eq!(before.output, after.output);
    assert_eq!(before.session_id, after.session_id);
    assert_eq!(before.agent, after.agent);
    assert_eq!(before.success, after.success);
    assert_eq!(before.task_id, after.task_id);
}
1223
/// A `tasks` array with two entries parses into two `TaskParams` in input order.
#[test]
fn test_parallel_task_params_deserialize() {
    let payload = r#"{
            "tasks": [
                { "agent": "explore", "description": "Find auth", "prompt": "Search auth files" },
                { "agent": "general", "description": "Fix bug", "prompt": "Fix the login bug" }
            ]
        }"#;

    let parsed = serde_json::from_str::<ParallelTaskParams>(payload).unwrap();

    assert_eq!(parsed.tasks.len(), 2);
    let first = &parsed.tasks[0];
    let second = &parsed.tasks[1];
    assert_eq!(first.agent, "explore");
    assert_eq!(second.agent, "general");
}
1238
/// A `tasks` array holding a single entry parses into exactly one task.
#[test]
fn test_parallel_task_params_single_task() {
    let payload = r#"{
            "tasks": [
                { "agent": "plan", "description": "Plan work", "prompt": "Create a plan" }
            ]
        }"#;

    let parsed = serde_json::from_str::<ParallelTaskParams>(payload).unwrap();
    assert_eq!(parsed.tasks.len(), 1);
}
1250
/// Deserialization itself accepts an empty `tasks` array.
#[test]
fn test_parallel_task_params_empty_tasks() {
    let parsed = serde_json::from_str::<ParallelTaskParams>(r#"{ "tasks": [] }"#).unwrap();
    assert_eq!(parsed.tasks.len(), 0);
}
1257
/// An object without the `tasks` key fails to deserialize.
#[test]
fn test_parallel_task_params_missing_tasks() {
    let outcome = serde_json::from_str::<ParallelTaskParams>(r#"{}"#);
    assert!(outcome.is_err());
}
1264
/// Serializing `ParallelTaskParams` emits the agent and prompt text of every child task.
#[test]
fn test_parallel_task_params_serialize() {
    let params = ParallelTaskParams {
        tasks: vec![
            TaskParams {
                agent: "explore".to_string(),
                description: "Task 1".to_string(),
                prompt: "Prompt 1".to_string(),
                background: false,
                max_steps: None,
            },
            TaskParams {
                agent: "general".to_string(),
                description: "Task 2".to_string(),
                prompt: "Prompt 2".to_string(),
                background: false,
                max_steps: Some(10),
            },
        ],
    };

    // `serde_json::to_string` borrows the value; the argument must be `&params`.
    let json = serde_json::to_string(&params).unwrap();

    for expected in ["explore", "general", "Prompt 1", "Prompt 2"] {
        assert!(
            json.contains(expected),
            "serialized JSON is missing {:?}: {}",
            expected,
            json
        );
    }
}
1291
/// A JSON round trip of `ParallelTaskParams` keeps the task count, agents and `max_steps`.
#[test]
fn test_parallel_task_params_roundtrip() {
    let explore = TaskParams {
        agent: String::from("explore"),
        description: String::from("Explore"),
        prompt: String::from("Find files"),
        background: false,
        max_steps: None,
    };
    let plan = TaskParams {
        agent: String::from("plan"),
        description: String::from("Plan"),
        prompt: String::from("Make plan"),
        background: false,
        max_steps: Some(5),
    };
    let before = ParallelTaskParams {
        tasks: vec![explore, plan],
    };

    let encoded = serde_json::to_string(&before).unwrap();
    let after = serde_json::from_str::<ParallelTaskParams>(&encoded).unwrap();

    assert_eq!(before.tasks.len(), after.tasks.len());
    assert_eq!(before.tasks[0].agent, after.tasks[0].agent);
    assert_eq!(before.tasks[1].agent, after.tasks[1].agent);
    assert_eq!(before.tasks[1].max_steps, after.tasks[1].max_steps);
}
1319
/// Cloning `ParallelTaskParams` yields the same number of tasks with the same agent.
#[test]
fn test_parallel_task_params_clone() {
    let only_task = TaskParams {
        agent: String::from("explore"),
        description: String::from("Test"),
        prompt: String::from("Prompt"),
        background: false,
        max_steps: None,
    };
    let source = ParallelTaskParams {
        tasks: vec![only_task],
    };

    let copy = source.clone();

    assert_eq!(source.tasks.len(), copy.tasks.len());
    assert_eq!(source.tasks[0].agent, copy.tasks[0].agent);
}
1335
/// The parallel-task schema is a closed object whose `tasks` property is a
/// non-empty array (`minItems` of 1).
#[test]
fn test_parallel_task_params_schema() {
    let schema = parallel_task_params_schema();
    assert_eq!(schema["type"], "object");
    assert_eq!(schema["additionalProperties"], false);

    let tasks = &schema["properties"]["tasks"];
    assert!(tasks.is_object());
    assert_eq!(tasks["type"], "array");
    assert_eq!(tasks["minItems"], 1);
}
1345
/// The parallel-task schema lists `tasks` among its required properties.
#[test]
fn test_parallel_task_params_schema_required() {
    let schema = parallel_task_params_schema();
    let required = schema["required"].as_array().unwrap();
    let lists_tasks = required
        .iter()
        .any(|entry| entry.as_str() == Some("tasks"));
    assert!(lists_tasks);
}
1352
/// Each item of the `tasks` array is a closed object requiring `agent`,
/// `description` and `prompt`.
#[test]
fn test_parallel_task_params_schema_items() {
    let schema = parallel_task_params_schema();
    let item_schema = &schema["properties"]["tasks"]["items"];

    assert_eq!(item_schema["type"], "object");
    assert_eq!(item_schema["additionalProperties"], false);

    let required_fields = item_schema["required"].as_array().unwrap();
    for field in ["agent", "description", "prompt"] {
        assert!(required_fields.contains(&serde_json::json!(field)));
    }
}
1364
/// The first `task` schema example names the `explore` agent and has no `task`
/// key; the first parallel example carries a non-empty `tasks` array.
#[test]
fn test_task_schema_examples_use_delegation_core() {
    let single_schema = task_params_schema();
    let first_single = &single_schema["examples"].as_array().unwrap()[0];
    assert_eq!(first_single["agent"], "explore");
    assert!(first_single.get("task").is_none());

    let parallel_schema = parallel_task_params_schema();
    let first_parallel = &parallel_schema["examples"].as_array().unwrap()[0];
    let example_tasks = first_parallel["tasks"].as_array().unwrap();
    assert!(!example_tasks.is_empty());
}
1376
/// The `Debug` rendering of `ParallelTaskParams` includes the nested task's
/// agent and description.
#[test]
fn test_parallel_task_params_debug() {
    let only_task = TaskParams {
        agent: "explore".to_owned(),
        description: "Debug test".to_owned(),
        prompt: "Test".to_owned(),
        background: false,
        max_steps: None,
    };
    let rendered = format!(
        "{:?}",
        ParallelTaskParams {
            tasks: vec![only_task],
        }
    );
    for needle in ["explore", "Debug test"] {
        assert!(rendered.contains(needle));
    }
}
1392
/// A batch of 150 tasks survives a JSON round trip with its length and
/// ordering intact.
#[test]
fn test_parallel_task_params_large_count() {
    const TASK_COUNT: usize = 150;

    let tasks: Vec<TaskParams> = (0..TASK_COUNT)
        .map(|i| TaskParams {
            agent: "explore".to_string(),
            description: format!("Task {}", i),
            prompt: format!("Prompt for task {}", i),
            background: false,
            max_steps: Some(10),
        })
        .collect();

    let params = ParallelTaskParams { tasks };
    // `serde_json::to_string` borrows the value; the argument must be `&params`.
    let json = serde_json::to_string(&params).unwrap();
    let deserialized: ParallelTaskParams = serde_json::from_str(&json).unwrap();

    assert_eq!(deserialized.tasks.len(), TASK_COUNT);
    // The first and last entries pin the ordering.
    assert_eq!(deserialized.tasks[0].description, "Task 0");
    assert_eq!(deserialized.tasks[TASK_COUNT - 1].description, "Task 149");
}
1413
/// `max_steps: Some(0)` round-trips through JSON as `Some(0)` and is not
/// collapsed into `None`.
#[test]
fn test_task_params_max_steps_zero() {
    let params = TaskParams {
        agent: "explore".to_string(),
        description: "Edge case".to_string(),
        prompt: "Zero steps".to_string(),
        background: false,
        max_steps: Some(0),
    };

    // `serde_json::to_string` borrows the value; the argument must be `&params`.
    let json = serde_json::to_string(&params).unwrap();
    let deserialized: TaskParams = serde_json::from_str(&json).unwrap();

    assert_eq!(deserialized.max_steps, Some(0));
}
1428
/// Every task in a batch built with `background: true` keeps the flag set.
#[test]
fn test_parallel_task_params_all_background() {
    let tasks: Vec<TaskParams> = (0..5)
        .map(|i| TaskParams {
            agent: "general".to_string(),
            description: format!("BG task {}", i),
            prompt: "Run in background".to_string(),
            background: true,
            max_steps: None,
        })
        .collect();
    let params = ParallelTaskParams { tasks };

    // Guard against the flag check passing vacuously on an empty batch.
    assert_eq!(params.tasks.len(), 5);
    for task in &params.tasks {
        assert!(task.background);
    }
}
1445
/// `TaskParams` is declared with `deny_unknown_fields`, so a payload carrying
/// a `permissive` key is rejected.
#[test]
fn test_task_params_rejects_permissive_field() {
    let payload = r#"{
            "agent": "general",
            "description": "Legacy field rejection",
            "prompt": "Verify legacy fields are rejected",
            "permissive": true
        }"#;

    let outcome = serde_json::from_str::<TaskParams>(payload);
    assert!(outcome.is_err());
}
1458
/// The task schema does not advertise a `permissive` property.
#[test]
fn test_task_params_schema_hides_permissive_field() {
    let schema = task_params_schema();
    let permissive = schema["properties"].get("permissive");
    assert!(permissive.is_none());
}
1466
1467 use crate::agent::tests::MockLlmClient;
1472 use crate::llm::{ContentBlock, LlmResponse, Message, StreamEvent, TokenUsage, ToolDefinition};
1473 use crate::permissions::PermissionPolicy;
1474 use crate::subagent::AgentRegistry;
1475 use std::sync::atomic::{AtomicUsize, Ordering};
1476 use std::time::Duration;
1477 use tokio::sync::{mpsc, Barrier};
1478
1479 fn text_response(text: impl Into<String>) -> LlmResponse {
1480 LlmResponse {
1481 message: Message {
1482 role: "assistant".to_string(),
1483 content: vec![ContentBlock::Text { text: text.into() }],
1484 reasoning_content: None,
1485 },
1486 usage: TokenUsage {
1487 prompt_tokens: 10,
1488 completion_tokens: 5,
1489 total_tokens: 15,
1490 cache_read_tokens: None,
1491 cache_write_tokens: None,
1492 },
1493 stop_reason: Some("end_turn".to_string()),
1494 meta: None,
1495 }
1496 }
1497
1498 fn pre_analysis_response(messages: &[Message]) -> LlmResponse {
1499 let prompt = last_text(messages);
1500 let response = serde_json::json!({
1501 "intent": "GeneralPurpose",
1502 "requires_planning": false,
1503 "goal": {
1504 "description": prompt,
1505 "success_criteria": []
1506 },
1507 "execution_plan": {
1508 "complexity": "Simple",
1509 "steps": [{
1510 "id": "step-1",
1511 "description": prompt,
1512 "tool": null,
1513 "dependencies": [],
1514 "success_criteria": "Complete the request"
1515 }],
1516 "required_tools": []
1517 },
1518 "optimized_input": prompt
1519 });
1520 text_response(response.to_string())
1521 }
1522
1523 fn last_text(messages: &[Message]) -> String {
1524 messages
1525 .last()
1526 .and_then(|message| {
1527 message.content.iter().find_map(|block| {
1528 if let ContentBlock::Text { text } = block {
1529 Some(text.clone())
1530 } else {
1531 None
1532 }
1533 })
1534 })
1535 .unwrap_or_default()
1536 }
1537
/// Test double that holds one fixed reply text.
struct StaticLlmClient {
    text: String,
}

impl StaticLlmClient {
    /// Creates a client that stores `text` as its canned reply.
    fn new(text: impl Into<String>) -> Self {
        let text = text.into();
        Self { text }
    }
}
1547
1548 #[async_trait::async_trait]
1549 impl LlmClient for StaticLlmClient {
1550 async fn complete(
1551 &self,
1552 messages: &[Message],
1553 system: Option<&str>,
1554 _tools: &[ToolDefinition],
1555 ) -> Result<LlmResponse> {
1556 if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1557 return Ok(pre_analysis_response(messages));
1558 }
1559 Ok(text_response(self.text.clone()))
1560 }
1561
1562 async fn complete_streaming(
1563 &self,
1564 _messages: &[Message],
1565 _system: Option<&str>,
1566 _tools: &[ToolDefinition],
1567 _cancel_token: tokio_util::sync::CancellationToken,
1568 ) -> Result<mpsc::Receiver<StreamEvent>> {
1569 anyhow::bail!("streaming is not used by task executor tests")
1570 }
1571 }
1572
/// Test double that tracks how many calls are in flight at once and owns a
/// barrier sized to the expected number of concurrent callers.
struct ConcurrentLlmClient {
    /// Rendezvous point sized to the expected number of concurrent callers.
    barrier: Arc<Barrier>,
    /// Number of calls currently counted as in flight.
    active: AtomicUsize,
    /// Highest value `active` has reached so far.
    max_active: AtomicUsize,
}

impl ConcurrentLlmClient {
    /// Creates a client whose barrier releases once `task_count` callers wait on it.
    fn new(task_count: usize) -> Self {
        Self {
            barrier: Arc::new(Barrier::new(task_count)),
            active: AtomicUsize::new(0),
            max_active: AtomicUsize::new(0),
        }
    }

    /// Returns the peak number of simultaneously active calls observed so far.
    fn max_active(&self) -> usize {
        self.max_active.load(Ordering::SeqCst)
    }

    /// Counts one more active call and raises the recorded peak if needed.
    fn record_active(&self) {
        let active = self.active.fetch_add(1, Ordering::SeqCst) + 1;
        // `fetch_max` is the atomic equivalent of a compare-exchange retry loop
        // and matches `LimitedConcurrencyLlmClient::record_active`.
        self.max_active.fetch_max(active, Ordering::SeqCst);
    }
}
1608
/// Test double that records the peak number of calls in flight at once.
struct LimitedConcurrencyLlmClient {
    active: AtomicUsize,
    max_active: AtomicUsize,
}

impl LimitedConcurrencyLlmClient {
    /// Creates a client with both counters at zero.
    fn new() -> Self {
        let active = AtomicUsize::new(0);
        let max_active = AtomicUsize::new(0);
        Self { active, max_active }
    }

    /// Returns the highest number of simultaneously active calls seen so far.
    fn max_active(&self) -> usize {
        let peak = &self.max_active;
        peak.load(Ordering::SeqCst)
    }

    /// Counts one more active call and folds the new total into the peak.
    fn record_active(&self) {
        let previously_active = self.active.fetch_add(1, Ordering::SeqCst);
        self.max_active
            .fetch_max(previously_active + 1, Ordering::SeqCst);
    }
}
1631
// `LlmClient` test double that measures how many `complete` calls overlap.
#[async_trait::async_trait]
impl LlmClient for LimitedConcurrencyLlmClient {
    /// Answers pre-analysis requests immediately. Any other call is counted as
    /// active for the duration of a 40 ms sleep, then replies with
    /// `completed: <text of the last message>`.
    async fn complete(
        &self,
        messages: &[Message],
        system: Option<&str>,
        _tools: &[ToolDefinition],
    ) -> Result<LlmResponse> {
        // Pre-analysis calls return before `record_active`, so they are never counted.
        if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
            return Ok(pre_analysis_response(messages));
        }

        let prompt = last_text(messages);
        self.record_active();
        // The call stays counted across this await, so overlapping calls show
        // up in `max_active`.
        tokio::time::sleep(Duration::from_millis(40)).await;
        self.active.fetch_sub(1, Ordering::SeqCst);
        Ok(text_response(format!("completed: {prompt}")))
    }

    /// Always returns an error; this double does not support streaming.
    async fn complete_streaming(
        &self,
        _messages: &[Message],
        _system: Option<&str>,
        _tools: &[ToolDefinition],
        _cancel_token: tokio_util::sync::CancellationToken,
    ) -> Result<mpsc::Receiver<StreamEvent>> {
        anyhow::bail!("streaming is not used by task executor tests")
    }
}
1661
// `LlmClient` test double that makes callers wait for each other at a barrier.
#[async_trait::async_trait]
impl LlmClient for ConcurrentLlmClient {
    /// Answers pre-analysis requests immediately. Any other call is counted as
    /// active, waits on the shared barrier, sleeps (120 ms when the prompt
    /// contains `slow`, 10 ms otherwise) and replies with
    /// `completed: <text of the last message>`.
    async fn complete(
        &self,
        messages: &[Message],
        system: Option<&str>,
        _tools: &[ToolDefinition],
    ) -> Result<LlmResponse> {
        // Pre-analysis calls return before `record_active` and never touch the barrier.
        if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
            return Ok(pre_analysis_response(messages));
        }

        let prompt = last_text(messages);
        self.record_active();
        // The call is counted before it waits, so every caller parked at the
        // barrier contributes to `max_active`.
        self.barrier.wait().await;
        // The differing delays let "slow" prompts finish after the others.
        if prompt.contains("slow") {
            tokio::time::sleep(Duration::from_millis(120)).await;
        } else {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        self.active.fetch_sub(1, Ordering::SeqCst);
        Ok(text_response(format!("completed: {prompt}")))
    }

    /// Always returns an error; this double does not support streaming.
    async fn complete_streaming(
        &self,
        _messages: &[Message],
        _system: Option<&str>,
        _tools: &[ToolDefinition],
        _cancel_token: tokio_util::sync::CancellationToken,
    ) -> Result<mpsc::Receiver<StreamEvent>> {
        anyhow::bail!("streaming is not used by task executor tests")
    }
}
1696
1697 fn test_registry_with_writer() -> Arc<AgentRegistry> {
1698 let registry = AgentRegistry::new();
1699 let spec = crate::subagent::WorkerAgentSpec::custom("writer", "Write files")
1700 .with_permissions(PermissionPolicy::new().allow("write(*)").allow("read(*)"))
1701 .with_prompt("Write files when asked.")
1702 .with_max_steps(3);
1703 registry.register(spec.into_agent_definition());
1704 Arc::new(registry)
1705 }
1706
1707 fn test_registry_with_text_worker() -> Arc<AgentRegistry> {
1708 let registry = AgentRegistry::new();
1709 let spec = crate::subagent::WorkerAgentSpec::custom("worker", "Text worker")
1710 .with_prompt("Return a concise result.")
1711 .with_max_steps(1);
1712 registry.register(spec.into_agent_definition());
1713 Arc::new(registry)
1714 }
1715
1716 #[tokio::test]
1717 async fn task_child_run_permission_allow() {
1718 let workspace = tempfile::tempdir().unwrap();
1719 let mock = Arc::new(MockLlmClient::new(vec![
1720 MockLlmClient::tool_call_response(
1721 "t1",
1722 "write",
1723 serde_json::json!({
1724 "file_path": workspace.path().join("out.txt").to_string_lossy(),
1725 "content": "WRITTEN"
1726 }),
1727 ),
1728 MockLlmClient::text_response("Done."),
1729 ]));
1730
1731 let executor = TaskExecutor::new(
1732 test_registry_with_writer(),
1733 mock,
1734 workspace.path().to_string_lossy().to_string(),
1735 );
1736
1737 let result = executor
1738 .execute(
1739 TaskParams {
1740 agent: "writer".to_string(),
1741 description: "Write file".to_string(),
1742 prompt: "Write out.txt".to_string(),
1743 background: false,
1744 max_steps: Some(3),
1745 },
1746 None,
1747 None,
1748 )
1749 .await
1750 .unwrap();
1751
1752 assert!(
1753 result.success,
1754 "child run should succeed: {}",
1755 result.output
1756 );
1757 assert!(
1758 !result.output.contains("Permission denied"),
1759 "no permission denial: {}",
1760 result.output
1761 );
1762 let content = std::fs::read_to_string(workspace.path().join("out.txt")).unwrap();
1763 assert_eq!(content, "WRITTEN");
1764 }
1765
1766 #[tokio::test]
1767 async fn task_child_run_permission_deny() {
1768 let workspace = tempfile::tempdir().unwrap();
1769 let registry = AgentRegistry::new();
1770 let spec = crate::subagent::WorkerAgentSpec::custom("restricted", "Restricted agent")
1771 .with_permissions(PermissionPolicy::new().allow("read(*)").deny("bash(*)"))
1772 .with_max_steps(3);
1773 registry.register(spec.into_agent_definition());
1774
1775 let mock = Arc::new(MockLlmClient::new(vec![
1776 MockLlmClient::tool_call_response(
1777 "t1",
1778 "bash",
1779 serde_json::json!({"command": "echo hello"}),
1780 ),
1781 MockLlmClient::text_response("Could not run bash."),
1782 ]));
1783
1784 let executor = TaskExecutor::new(
1785 Arc::new(registry),
1786 mock,
1787 workspace.path().to_string_lossy().to_string(),
1788 );
1789
1790 let result = executor
1791 .execute(
1792 TaskParams {
1793 agent: "restricted".to_string(),
1794 description: "Try bash".to_string(),
1795 prompt: "Run echo hello".to_string(),
1796 background: false,
1797 max_steps: Some(3),
1798 },
1799 None,
1800 None,
1801 )
1802 .await
1803 .unwrap();
1804
1805 assert!(result.success, "agent should complete: {}", result.output);
1808 }
1809
1810 #[tokio::test]
1811 async fn task_child_run_confirmation_auto_approve() {
1812 let workspace = tempfile::tempdir().unwrap();
1813 let registry = AgentRegistry::new();
1814 let spec = crate::subagent::WorkerAgentSpec::custom("reader-writer", "Read and write")
1817 .with_permissions(PermissionPolicy::new().allow("read(*)"))
1818 .with_max_steps(3);
1819 registry.register(spec.into_agent_definition());
1820
1821 let mock = Arc::new(MockLlmClient::new(vec![
1822 MockLlmClient::tool_call_response(
1823 "t1",
1824 "write",
1825 serde_json::json!({
1826 "file_path": workspace.path().join("auto.txt").to_string_lossy(),
1827 "content": "AUTO_APPROVED"
1828 }),
1829 ),
1830 MockLlmClient::text_response("Written."),
1831 ]));
1832
1833 let executor = TaskExecutor::new(
1834 Arc::new(registry),
1835 mock,
1836 workspace.path().to_string_lossy().to_string(),
1837 );
1838
1839 let result = executor
1840 .execute(
1841 TaskParams {
1842 agent: "reader-writer".to_string(),
1843 description: "Write via auto-approve".to_string(),
1844 prompt: "Write auto.txt".to_string(),
1845 background: false,
1846 max_steps: Some(3),
1847 },
1848 None,
1849 None,
1850 )
1851 .await
1852 .unwrap();
1853
1854 assert!(
1855 result.success,
1856 "Ask should be auto-approved: {}",
1857 result.output
1858 );
1859 assert!(
1860 !result.output.contains("MissingConfirmationManager"),
1861 "no MissingConfirmationManager: {}",
1862 result.output
1863 );
1864 }
1865
1866 #[tokio::test]
1867 async fn task_child_run_step_budget_enforced() {
1868 let workspace = tempfile::tempdir().unwrap();
1869 let mock = Arc::new(MockLlmClient::new(vec![
1870 MockLlmClient::tool_call_response(
1871 "t1",
1872 "read",
1873 serde_json::json!({"file_path": "/tmp/a.txt"}),
1874 ),
1875 MockLlmClient::tool_call_response(
1876 "t2",
1877 "read",
1878 serde_json::json!({"file_path": "/tmp/b.txt"}),
1879 ),
1880 MockLlmClient::tool_call_response(
1881 "t3",
1882 "read",
1883 serde_json::json!({"file_path": "/tmp/c.txt"}),
1884 ),
1885 MockLlmClient::text_response("Should not reach here."),
1886 ]));
1887
1888 let executor = TaskExecutor::new(
1889 test_registry_with_writer(),
1890 mock,
1891 workspace.path().to_string_lossy().to_string(),
1892 );
1893
1894 let result = executor
1895 .execute(
1896 TaskParams {
1897 agent: "writer".to_string(),
1898 description: "Exceed budget".to_string(),
1899 prompt: "Read many files".to_string(),
1900 background: false,
1901 max_steps: Some(2),
1902 },
1903 None,
1904 None,
1905 )
1906 .await
1907 .unwrap();
1908
1909 assert!(
1911 !result.success,
1912 "should fail when exceeding step budget: {}",
1913 result.output
1914 );
1915 assert!(
1916 result.output.contains("Max tool rounds") || result.output.contains("max tool rounds"),
1917 "error should mention tool rounds: {}",
1918 result.output
1919 );
1920 }
1921
1922 #[tokio::test]
1923 async fn parallel_task_executor_runs_children_concurrently_and_preserves_input_order() {
1924 let workspace = tempfile::tempdir().unwrap();
1925 let client = Arc::new(ConcurrentLlmClient::new(2));
1926 let executor = Arc::new(TaskExecutor::new(
1927 test_registry_with_text_worker(),
1928 client.clone(),
1929 workspace.path().to_string_lossy().to_string(),
1930 ));
1931
1932 let tasks = vec![
1933 TaskParams {
1934 agent: "worker".to_string(),
1935 description: "Slow task".to_string(),
1936 prompt: "slow branch".to_string(),
1937 background: false,
1938 max_steps: Some(1),
1939 },
1940 TaskParams {
1941 agent: "worker".to_string(),
1942 description: "Fast task".to_string(),
1943 prompt: "fast branch".to_string(),
1944 background: false,
1945 max_steps: Some(1),
1946 },
1947 ];
1948
1949 let results = tokio::time::timeout(
1950 Duration::from_secs(2),
1951 executor.execute_parallel(tasks, None, None),
1952 )
1953 .await
1954 .expect("parallel children should reach the barrier and complete");
1955
1956 assert_eq!(results.len(), 2);
1957 assert!(
1958 client.max_active() >= 2,
1959 "expected concurrent child execution, max_active={}",
1960 client.max_active()
1961 );
1962 assert!(results[0].success);
1963 assert!(results[0].output.contains("slow branch"));
1964 assert!(results[1].success);
1965 assert!(results[1].output.contains("fast branch"));
1966 }
1967
1968 #[tokio::test]
1969 async fn parallel_task_executor_respects_configured_concurrency_limit() {
1970 let workspace = tempfile::tempdir().unwrap();
1971 let client = Arc::new(LimitedConcurrencyLlmClient::new());
1972 let executor = Arc::new(
1973 TaskExecutor::new(
1974 test_registry_with_text_worker(),
1975 client.clone(),
1976 workspace.path().to_string_lossy().to_string(),
1977 )
1978 .with_max_parallel_tasks(2),
1979 );
1980
1981 let tasks = (0..5)
1982 .map(|idx| TaskParams {
1983 agent: "worker".to_string(),
1984 description: format!("Task {idx}"),
1985 prompt: format!("branch {idx}"),
1986 background: false,
1987 max_steps: Some(1),
1988 })
1989 .collect::<Vec<_>>();
1990
1991 let results = executor.execute_parallel(tasks, None, None).await;
1992
1993 assert_eq!(results.len(), 5);
1994 assert!(results.iter().all(|result| result.success));
1995 assert_eq!(client.max_active(), 2);
1996 }
1997
1998 #[tokio::test]
1999 async fn parallel_task_executor_isolates_unknown_agent_failure() {
2000 let workspace = tempfile::tempdir().unwrap();
2001 let executor = Arc::new(TaskExecutor::new(
2002 test_registry_with_text_worker(),
2003 Arc::new(StaticLlmClient::new("valid branch done")),
2004 workspace.path().to_string_lossy().to_string(),
2005 ));
2006
2007 let tasks = vec![
2008 TaskParams {
2009 agent: "missing-agent".to_string(),
2010 description: "Missing".to_string(),
2011 prompt: "should fail".to_string(),
2012 background: false,
2013 max_steps: Some(1),
2014 },
2015 TaskParams {
2016 agent: "worker".to_string(),
2017 description: "Valid".to_string(),
2018 prompt: "should succeed".to_string(),
2019 background: false,
2020 max_steps: Some(1),
2021 },
2022 ];
2023
2024 let results = executor.execute_parallel(tasks, None, None).await;
2025
2026 assert_eq!(results.len(), 2);
2027 assert!(!results[0].success);
2028 assert_eq!(results[0].agent, "missing-agent");
2029 assert!(results[0].output.contains("Unknown agent type"));
2030 assert!(results[1].success);
2031 assert_eq!(results[1].agent, "worker");
2032 assert!(results[1].output.contains("valid branch done"));
2033 }
2034
2035 #[tokio::test]
2036 async fn parallel_task_executor_emits_subagent_events_for_each_child() {
2037 let workspace = tempfile::tempdir().unwrap();
2038 let executor = Arc::new(TaskExecutor::new(
2039 test_registry_with_text_worker(),
2040 Arc::new(StaticLlmClient::new("done")),
2041 workspace.path().to_string_lossy().to_string(),
2042 ));
2043 let (tx, mut rx) = broadcast::channel(64);
2044
2045 let tasks = vec![
2046 TaskParams {
2047 agent: "worker".to_string(),
2048 description: "One".to_string(),
2049 prompt: "first".to_string(),
2050 background: false,
2051 max_steps: Some(1),
2052 },
2053 TaskParams {
2054 agent: "worker".to_string(),
2055 description: "Two".to_string(),
2056 prompt: "second".to_string(),
2057 background: false,
2058 max_steps: Some(1),
2059 },
2060 ];
2061
2062 let results = executor.execute_parallel(tasks, Some(tx), None).await;
2063 assert_eq!(results.len(), 2);
2064 tokio::time::sleep(Duration::from_millis(20)).await;
2065
2066 let mut starts = Vec::new();
2067 let mut ends = Vec::new();
2068 let mut progress_statuses: Vec<String> = Vec::new();
2069 while let Ok(event) = rx.try_recv() {
2070 match event {
2071 AgentEvent::SubagentStart { description, .. } => starts.push(description),
2072 AgentEvent::SubagentEnd { agent, success, .. } => ends.push((agent, success)),
2073 AgentEvent::SubagentProgress { status, .. } => progress_statuses.push(status),
2074 _ => {}
2075 }
2076 }
2077
2078 starts.sort();
2079 assert_eq!(starts, vec!["One".to_string(), "Two".to_string()]);
2080 assert_eq!(ends.len(), 2);
2081 assert!(ends
2082 .iter()
2083 .all(|(agent, success)| agent == "worker" && *success));
2084 assert!(
2087 progress_statuses
2088 .iter()
2089 .filter(|s| s == &"turn_completed")
2090 .count()
2091 >= 2,
2092 "expected at least two turn_completed progress events, got {:?}",
2093 progress_statuses
2094 );
2095 }
2096
2097 #[tokio::test]
2098 async fn parallel_task_tool_reports_error_when_any_child_fails() {
2099 let workspace = tempfile::tempdir().unwrap();
2100 let executor = Arc::new(TaskExecutor::new(
2101 test_registry_with_text_worker(),
2102 Arc::new(StaticLlmClient::new("valid branch done")),
2103 workspace.path().to_string_lossy().to_string(),
2104 ));
2105 let tool = ParallelTaskTool::new(executor);
2106 let ctx = ToolContext::new(workspace.path().to_path_buf());
2107
2108 let output = tool
2109 .execute(
2110 &serde_json::json!({
2111 "tasks": [
2112 {
2113 "agent": "missing-agent",
2114 "description": "Missing",
2115 "prompt": "should fail"
2116 },
2117 {
2118 "agent": "worker",
2119 "description": "Valid",
2120 "prompt": "should succeed"
2121 }
2122 ]
2123 }),
2124 &ctx,
2125 )
2126 .await
2127 .unwrap();
2128
2129 assert!(
2130 !output.success,
2131 "parallel_task should fail when any child result fails"
2132 );
2133 assert!(output.content.contains("[ERR]"));
2134 assert!(output.content.contains("[OK]"));
2135 let metadata = output.metadata.expect("metadata");
2136 assert_eq!(metadata["task_count"], 2);
2137 assert_eq!(metadata["results"][0]["success"], false);
2138 assert_eq!(metadata["results"][1]["success"], true);
2139 }
2140
2141 #[tokio::test]
2142 async fn parallel_task_both_inherit_permissions() {
2143 let workspace = tempfile::tempdir().unwrap();
2144 let mock = Arc::new(MockLlmClient::new(vec![
2145 MockLlmClient::tool_call_response(
2147 "t1",
2148 "write",
2149 serde_json::json!({
2150 "file_path": workspace.path().join("p1.txt").to_string_lossy(),
2151 "content": "P1"
2152 }),
2153 ),
2154 MockLlmClient::text_response("Done 1."),
2155 MockLlmClient::tool_call_response(
2157 "t2",
2158 "write",
2159 serde_json::json!({
2160 "file_path": workspace.path().join("p2.txt").to_string_lossy(),
2161 "content": "P2"
2162 }),
2163 ),
2164 MockLlmClient::text_response("Done 2."),
2165 ]));
2166
2167 let executor = Arc::new(TaskExecutor::new(
2168 test_registry_with_writer(),
2169 mock,
2170 workspace.path().to_string_lossy().to_string(),
2171 ));
2172
2173 let tasks = vec![
2174 TaskParams {
2175 agent: "writer".to_string(),
2176 description: "Write p1".to_string(),
2177 prompt: "Write p1.txt".to_string(),
2178 background: false,
2179 max_steps: Some(3),
2180 },
2181 TaskParams {
2182 agent: "writer".to_string(),
2183 description: "Write p2".to_string(),
2184 prompt: "Write p2.txt".to_string(),
2185 background: false,
2186 max_steps: Some(3),
2187 },
2188 ];
2189
2190 let results = executor.execute_parallel(tasks, None, None).await;
2191 assert_eq!(results.len(), 2);
2192
2193 for result in &results {
2194 assert!(
2195 result.success,
2196 "parallel child should succeed: {}",
2197 result.output
2198 );
2199 }
2200 }
2201
/// A `ToolEnd` event becomes a `tool_completed` progress event carrying the
/// tool name, exit code and output size, with no `error_kind` key when the
/// tool reported none.
#[test]
fn synthesize_progress_emits_tool_completed_for_tool_end() {
    let tool_end = AgentEvent::ToolEnd {
        id: String::from("call-1"),
        name: String::from("bash"),
        output: String::from("hello"),
        exit_code: 0,
        metadata: None,
        error_kind: None,
    };

    let synthesized =
        synthesize_subagent_progress(&tool_end, "task-1", "task-run-task-1").expect("some");

    match synthesized {
        AgentEvent::SubagentProgress {
            task_id: reported_task,
            session_id: reported_session,
            status: reported_status,
            metadata: details,
        } => {
            assert_eq!(reported_task, "task-1");
            assert_eq!(reported_session, "task-run-task-1");
            assert_eq!(reported_status, "tool_completed");
            assert_eq!(details["tool"], "bash");
            assert_eq!(details["exit_code"], 0);
            // "hello" is five bytes long.
            assert_eq!(details["output_bytes"], 5);
            assert!(details.get("error_kind").is_none());
        }
        unexpected => panic!("expected SubagentProgress, got {:?}", unexpected),
    }
}
2232
/// When `ToolEnd` carries an `error_kind`, the synthesized progress metadata
/// contains an `error_kind` entry.
#[test]
fn synthesize_progress_includes_error_kind_when_present() {
    let not_found = crate::tools::ToolErrorKind::NotFound {
        path: String::from("missing.txt"),
    };
    let tool_end = AgentEvent::ToolEnd {
        id: String::from("call-2"),
        name: String::from("edit"),
        output: String::from("boom"),
        exit_code: 1,
        metadata: None,
        error_kind: Some(not_found),
    };

    let synthesized =
        synthesize_subagent_progress(&tool_end, "task-x", "task-run-task-x").expect("some");

    match synthesized {
        AgentEvent::SubagentProgress { metadata, .. } => {
            assert!(
                metadata.get("error_kind").is_some(),
                "error_kind should propagate into metadata"
            );
        }
        _ => panic!("expected SubagentProgress"),
    }
}
2256
/// A `TurnEnd` event becomes a `turn_completed` progress event that carries
/// the turn number and the token counts.
#[test]
fn synthesize_progress_emits_turn_completed_for_turn_end() {
    let usage = crate::llm::TokenUsage {
        prompt_tokens: 100,
        completion_tokens: 25,
        total_tokens: 125,
        cache_read_tokens: None,
        cache_write_tokens: None,
    };
    let turn_end = AgentEvent::TurnEnd { turn: 3, usage };

    let synthesized =
        synthesize_subagent_progress(&turn_end, "task-1", "task-run-task-1").expect("some");

    match synthesized {
        AgentEvent::SubagentProgress {
            status, metadata, ..
        } => {
            assert_eq!(status, "turn_completed");
            assert_eq!(metadata["turn"], 3);
            assert_eq!(metadata["total_tokens"], 125);
            assert_eq!(metadata["prompt_tokens"], 100);
            assert_eq!(metadata["completion_tokens"], 25);
        }
        _ => panic!("expected SubagentProgress"),
    }
}
2284
/// `TextDelta`, `ToolStart`, `TurnStart` and `SubagentStart` events produce no
/// synthesized progress event.
#[test]
fn synthesize_progress_ignores_unrelated_events() {
    let text_delta = AgentEvent::TextDelta {
        text: String::from("hi"),
    };
    let tool_start = AgentEvent::ToolStart {
        id: String::from("x"),
        name: String::from("bash"),
    };
    let turn_start = AgentEvent::TurnStart { turn: 1 };
    let nested_start = AgentEvent::SubagentStart {
        task_id: String::from("nested"),
        session_id: String::from("nested-run"),
        parent_session_id: String::from("parent"),
        agent: String::from("explore"),
        description: String::from("nested"),
    };

    let ignored = [text_delta, tool_start, turn_start, nested_start];
    ignored.iter().for_each(|event| {
        let synthesized = synthesize_subagent_progress(event, "task", "session");
        assert!(
            synthesized.is_none(),
            "{:?} should not emit progress",
            event
        );
    });
}
2312}