Skip to main content

a3s_code_core/tools/
task.rs

1//! Task tools for delegated child runs.
2//!
3//! The Task tool allows the main agent to delegate specialized work to focused
4//! child runs. Each child run gets bounded context and the permissions declared
5//! by its agent definition.
6//!
7//! ## Usage
8//!
9//! ```json
10//! {
11//!   "agent": "explore",
12//!   "description": "Find authentication code",
13//!   "prompt": "Search for files related to user authentication..."
14//! }
15//! ```
16
17use crate::agent::{AgentConfig, AgentEvent, AgentLoop};
18use crate::llm::structured::{generate_blocking, StructuredMode, StructuredRequest};
19use crate::llm::LlmClient;
20use crate::mcp::manager::McpManager;
21use crate::orchestration::{AgentExecutor, AgentStepSpec, StepOutcome};
22use crate::subagent::AgentRegistry;
23use crate::tools::types::{Tool, ToolContext, ToolOutput};
24use anyhow::{Context, Result};
25use async_trait::async_trait;
26use serde::{Deserialize, Serialize};
27use std::path::PathBuf;
28use std::sync::Arc;
29use tokio::sync::broadcast;
30
/// Largest delegated-task output (in bytes) passed to the parent context verbatim.
const TASK_OUTPUT_CONTEXT_LIMIT: usize = 4000;
/// Byte budget for the leading excerpt kept when an output exceeds the limit.
const TASK_OUTPUT_CONTEXT_HEAD: usize = 3000;
/// Byte budget for the trailing excerpt kept when an output exceeds the limit.
const TASK_OUTPUT_CONTEXT_TAIL: usize = 800;
34
/// Arguments accepted by the `task` tool.
///
/// Unknown fields are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct TaskParams {
    /// Name of the agent definition to run, looked up in the agent registry
    /// (for example: explore, general, plan, verification, review).
    pub agent: String,
    /// Short human-readable label, used for display and in `SubagentStart`.
    pub description: String,
    /// Full instruction handed to the child run as its prompt.
    pub prompt: String,
    /// When `true`, the `task` tool starts the run in the background and
    /// returns immediately with its task id. Defaults to `false` when omitted.
    #[serde(default)]
    pub background: bool,
    /// Optional cap on the child's tool rounds. When set, it is assigned to
    /// `max_tool_rounds` after the agent definition and parent context have
    /// been applied. Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_steps: Option<usize>,
}
52
/// Outcome of one delegated child run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    /// Text produced by the child run. When the run fails or is cancelled,
    /// this holds a human-readable error message instead.
    pub output: String,
    /// Child session id (for local runs: `task-run-<task_id>`).
    pub session_id: String,
    /// Name of the agent definition the child ran as.
    pub agent: String,
    /// Whether the child run completed successfully.
    pub success: bool,
    /// Task id used to correlate lifecycle and progress events.
    pub task_id: String,
}
67
68fn compact_task_output(output: &str) -> (String, bool) {
69    if output.len() <= TASK_OUTPUT_CONTEXT_LIMIT {
70        return (output.to_string(), false);
71    }
72
73    let head = crate::text::truncate_utf8(output, TASK_OUTPUT_CONTEXT_HEAD);
74    let tail_start = output
75        .char_indices()
76        .find_map(|(idx, _)| {
77            if output.len().saturating_sub(idx) <= TASK_OUTPUT_CONTEXT_TAIL {
78                Some(idx)
79            } else {
80                None
81            }
82        })
83        .unwrap_or(output.len());
84    let tail = &output[tail_start..];
85
86    (
87        format!(
88            "{}\n\n[{} bytes omitted from delegated task output]\n\n{}",
89            head,
90            output.len().saturating_sub(head.len() + tail.len()),
91            tail
92        ),
93        true,
94    )
95}
96
97/// Translate selected child-loop events into a `SubagentProgress` milestone
98/// for the parent broadcast. Returns `None` for events that aren't worth
99/// surfacing as progress (text deltas, tool starts, subagent events from
100/// nested delegation, etc.).
101///
102/// Currently emits progress for:
103/// - `ToolEnd`   → `status = "tool_completed"`,
104///   `metadata = { tool, exit_code, output_bytes, error_kind? }`
105/// - `TurnEnd`   → `status = "turn_completed"`,
106///   `metadata = { turn, total_tokens, prompt_tokens, completion_tokens }`
107fn synthesize_subagent_progress(
108    event: &AgentEvent,
109    task_id: &str,
110    session_id: &str,
111) -> Option<AgentEvent> {
112    match event {
113        AgentEvent::ToolEnd {
114            name,
115            output,
116            exit_code,
117            error_kind,
118            ..
119        } => {
120            let mut metadata = serde_json::json!({
121                "tool": name,
122                "exit_code": exit_code,
123                "output_bytes": output.len(),
124            });
125            if let Some(kind) = error_kind {
126                metadata["error_kind"] =
127                    serde_json::to_value(kind).unwrap_or(serde_json::Value::Null);
128            }
129            Some(AgentEvent::SubagentProgress {
130                task_id: task_id.to_string(),
131                session_id: session_id.to_string(),
132                status: "tool_completed".to_string(),
133                metadata,
134            })
135        }
136        AgentEvent::TurnEnd { turn, usage } => Some(AgentEvent::SubagentProgress {
137            task_id: task_id.to_string(),
138            session_id: session_id.to_string(),
139            status: "turn_completed".to_string(),
140            metadata: serde_json::json!({
141                "turn": turn,
142                "total_tokens": usage.total_tokens,
143                "prompt_tokens": usage.prompt_tokens,
144                "completion_tokens": usage.completion_tokens,
145            }),
146        }),
147        _ => None,
148    }
149}
150
151fn task_artifact_id(result: &TaskResult) -> String {
152    format!("task-output:{}", result.task_id)
153}
154
155fn task_artifact_uri(result: &TaskResult) -> String {
156    format!(
157        "a3s://tasks/{}/runs/{}/output",
158        result.session_id, result.task_id
159    )
160}
161
162fn format_task_result_for_context(result: &TaskResult) -> (String, bool) {
163    let (output, truncated) = compact_task_output(&result.output);
164    let status = if result.success {
165        "completed"
166    } else {
167        "failed"
168    };
169    let artifact_id = task_artifact_id(result);
170    let artifact_uri = task_artifact_uri(result);
171    let mut formatted = format!(
172        "Task {status}: {}\nAgent: {}\nSession: {}\nTask ID: {}\nArtifact ID: {}\nArtifact URI: {}\n",
173        result.task_id, result.agent, result.session_id, result.task_id, artifact_id, artifact_uri
174    );
175    if truncated {
176        formatted.push_str(
177            "Output excerpt: truncated for parent context. Use the artifact URI or child run session/events if exact omitted content is needed.\n",
178        );
179    } else {
180        formatted.push_str("Output:\n");
181    }
182    formatted.push_str(&output);
183    (formatted, truncated)
184}
185
/// Runs delegated tasks as isolated child `AgentLoop`s.
pub struct TaskExecutor {
    /// Agent registry for looking up agent definitions by name.
    registry: Arc<AgentRegistry>,
    /// LLM client used to power child agent loops (and structured-output coercion).
    llm_client: Arc<dyn LlmClient>,
    /// Workspace path shared with child agents.
    workspace: String,
    /// Optional MCP manager whose tools are registered in each child's tool executor.
    mcp_manager: Option<Arc<McpManager>>,
    /// Parent capabilities to inherit into child runs.
    parent_context: Option<crate::child_run::ChildRunContext>,
    /// Concurrency hint reported to the orchestration layer. The builder
    /// methods clamp it to at least 1.
    max_parallel_tasks: usize,
    /// Optional shared tracker — when present each task registers a
    /// `CancellationToken` so callers can cancel by `task_id`.
    subagent_tracker: Option<Arc<crate::subagent_task_tracker::InMemorySubagentTaskTracker>>,
}
203
204impl TaskExecutor {
205    /// Create a new task executor
206    pub fn new(
207        registry: Arc<AgentRegistry>,
208        llm_client: Arc<dyn LlmClient>,
209        workspace: String,
210    ) -> Self {
211        Self {
212            registry,
213            llm_client,
214            workspace,
215            mcp_manager: None,
216            parent_context: None,
217            max_parallel_tasks: crate::agent::DEFAULT_MAX_PARALLEL_TASKS,
218            subagent_tracker: None,
219        }
220    }
221
222    /// Create a new task executor with MCP manager for tool inheritance
223    pub fn with_mcp(
224        registry: Arc<AgentRegistry>,
225        llm_client: Arc<dyn LlmClient>,
226        workspace: String,
227        mcp_manager: Arc<McpManager>,
228    ) -> Self {
229        Self {
230            registry,
231            llm_client,
232            workspace,
233            mcp_manager: Some(mcp_manager),
234            parent_context: None,
235            max_parallel_tasks: crate::agent::DEFAULT_MAX_PARALLEL_TASKS,
236            subagent_tracker: None,
237        }
238    }
239
240    /// Set parent session capabilities to inherit into child runs.
241    pub fn with_parent_context(mut self, ctx: crate::child_run::ChildRunContext) -> Self {
242        if let Some(max_parallel_tasks) = ctx.max_parallel_tasks {
243            self.max_parallel_tasks = max_parallel_tasks.max(1);
244        }
245        self.parent_context = Some(ctx);
246        self
247    }
248
249    pub fn with_max_parallel_tasks(mut self, max_parallel_tasks: usize) -> Self {
250        self.max_parallel_tasks = max_parallel_tasks.max(1);
251        self
252    }
253
254    /// Share a tracker with this executor. When set, each task registers
255    /// a `CancellationToken` against the tracker so the parent session
256    /// can cancel by `task_id`.
257    pub fn with_subagent_tracker(
258        mut self,
259        tracker: Arc<crate::subagent_task_tracker::InMemorySubagentTaskTracker>,
260    ) -> Self {
261        self.subagent_tracker = Some(tracker);
262        self
263    }
264
265    /// Execute a task by spawning an isolated child AgentLoop.
266    ///
267    /// `parent_session_id` flows into the emitted `SubagentStart`/`SubagentEnd`
268    /// events so dashboards can associate child runs with the parent session.
269    pub async fn execute(
270        &self,
271        params: TaskParams,
272        event_tx: Option<broadcast::Sender<AgentEvent>>,
273        parent_session_id: Option<&str>,
274    ) -> Result<TaskResult> {
275        let task_id = format!("task-{}", uuid::Uuid::new_v4());
276        self.execute_with_task_id(task_id, params, event_tx, parent_session_id, true)
277            .await
278    }
279
280    /// Execute a task using a caller-supplied task id. Used by `execute_background`
281    /// so the synchronously-returned task id matches the one in lifecycle events.
282    /// When `emit_start` is `false` the caller is responsible for emitting
283    /// `SubagentStart` themselves (e.g. to avoid a race against a tracker query).
284    pub async fn execute_with_task_id(
285        &self,
286        task_id: String,
287        params: TaskParams,
288        event_tx: Option<broadcast::Sender<AgentEvent>>,
289        parent_session_id: Option<&str>,
290        emit_start: bool,
291    ) -> Result<TaskResult> {
292        let session_id = format!("task-run-{}", task_id);
293
294        let agent = self
295            .registry
296            .get(&params.agent)
297            .context(format!("Unknown agent type: '{}'", params.agent))?;
298
299        if emit_start {
300            if let Some(ref tx) = event_tx {
301                let _ = tx.send(AgentEvent::SubagentStart {
302                    task_id: task_id.clone(),
303                    session_id: session_id.clone(),
304                    parent_session_id: parent_session_id.unwrap_or_default().to_string(),
305                    agent: params.agent.clone(),
306                    description: params.description.clone(),
307                });
308            }
309        }
310
311        // Build a child ToolExecutor. Task tools are intentionally omitted
312        // here to prevent unlimited delegation nesting.
313        let child_executor = if let Some(ref parent_ctx) = self.parent_context {
314            if let Some(ref services) = parent_ctx.workspace_services {
315                crate::tools::ToolExecutor::new_with_workspace_services_and_artifact_limits(
316                    self.workspace.clone(),
317                    Arc::clone(services),
318                    crate::tools::ArtifactStoreLimits::default(),
319                )
320            } else {
321                crate::tools::ToolExecutor::new(self.workspace.clone())
322            }
323        } else {
324            crate::tools::ToolExecutor::new(self.workspace.clone())
325        };
326
327        // Register MCP tools so child agents can access MCP servers.
328        if let Some(ref mcp) = self.mcp_manager {
329            let all_tools = mcp.get_all_tools().await;
330            let mut by_server: std::collections::HashMap<
331                String,
332                Vec<crate::mcp::protocol::McpTool>,
333            > = std::collections::HashMap::new();
334            for (server, tool) in all_tools {
335                by_server.entry(server).or_default().push(tool);
336            }
337            for (server_name, tools) in by_server {
338                let wrappers =
339                    crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp));
340                for wrapper in wrappers {
341                    child_executor.register_dynamic_tool(wrapper);
342                }
343            }
344        }
345
346        let child_executor = Arc::new(child_executor);
347
348        let mut child_config = AgentConfig {
349            tools: child_executor.definitions(),
350            ..AgentConfig::default()
351        };
352        agent.apply_to(&mut child_config);
353        if let Some(ref parent_ctx) = self.parent_context {
354            parent_ctx.apply_to(&mut child_config);
355        }
356        if let Some(max_steps) = params.max_steps {
357            child_config.max_tool_rounds = max_steps;
358        }
359
360        let mut tool_context =
361            ToolContext::new(PathBuf::from(&self.workspace)).with_session_id(session_id.clone());
362        if let Some(ref parent_ctx) = self.parent_context {
363            if let Some(ref services) = parent_ctx.workspace_services {
364                tool_context = tool_context.with_workspace_services(Arc::clone(services));
365            }
366        }
367
368        let agent_loop = AgentLoop::new(
369            Arc::clone(&self.llm_client),
370            child_executor,
371            tool_context,
372            child_config,
373        );
374
375        // Create an mpsc channel for the child agent and forward events to broadcast.
376        // Selected child events (ToolEnd, TurnEnd) are also surfaced to the parent
377        // broadcast as synthetic `SubagentProgress` events so dashboards can observe
378        // mid-task milestones without subscribing to the raw event stream.
379        let child_event_tx = if let Some(ref broadcast_tx) = event_tx {
380            let (mpsc_tx, mut mpsc_rx) = tokio::sync::mpsc::channel(100);
381            let broadcast_tx_clone = broadcast_tx.clone();
382            let progress_task_id = task_id.clone();
383            let progress_session_id = session_id.clone();
384
385            tokio::spawn(async move {
386                while let Some(event) = mpsc_rx.recv().await {
387                    if let Some(progress) = synthesize_subagent_progress(
388                        &event,
389                        &progress_task_id,
390                        &progress_session_id,
391                    ) {
392                        let _ = broadcast_tx_clone.send(progress);
393                    }
394                    let _ = broadcast_tx_clone.send(event);
395                }
396            });
397
398            Some(mpsc_tx)
399        } else {
400            None
401        };
402
403        // Register a CancellationToken with the tracker (if shared) so the
404        // parent session's `cancel_subagent_task` can interrupt this run.
405        let cancel_token = tokio_util::sync::CancellationToken::new();
406        if let Some(ref tracker) = self.subagent_tracker {
407            tracker
408                .register_canceller(&task_id, cancel_token.clone())
409                .await;
410        }
411
412        let (output, success) = match agent_loop
413            .execute_with_session(
414                &[],
415                &params.prompt,
416                Some(&session_id),
417                child_event_tx,
418                Some(&cancel_token),
419            )
420            .await
421        {
422            Ok(result) => (result.text, true),
423            Err(e) if cancel_token.is_cancelled() => {
424                (format!("Task cancelled by caller: {}", e), false)
425            }
426            Err(e) => (format!("Task failed: {}", e), false),
427        };
428
429        if let Some(ref tracker) = self.subagent_tracker {
430            tracker.clear_canceller(&task_id).await;
431        }
432
433        if let Some(ref tx) = event_tx {
434            let _ = tx.send(AgentEvent::SubagentEnd {
435                task_id: task_id.clone(),
436                session_id: session_id.clone(),
437                agent: params.agent.clone(),
438                output: output.clone(),
439                success,
440            });
441        }
442
443        Ok(TaskResult {
444            output,
445            session_id,
446            agent: params.agent,
447            success,
448            task_id,
449        })
450    }
451
452    /// Execute a task in the background.
453    ///
454    /// Returns immediately with the task ID; the same id is used in the emitted
455    /// `SubagentStart`/`SubagentEnd` events so callers can correlate. Pre-emits
456    /// `SubagentStart` synchronously when an event channel is available so a
457    /// caller that queries the subagent task tracker right after this call
458    /// observes the task in `Running` state without a race window.
459    pub fn execute_background(
460        self: Arc<Self>,
461        params: TaskParams,
462        event_tx: Option<broadcast::Sender<AgentEvent>>,
463        parent_session_id: Option<String>,
464    ) -> String {
465        let task_id = format!("task-{}", uuid::Uuid::new_v4());
466        let session_id = format!("task-run-{}", task_id);
467
468        if let Some(ref tx) = event_tx {
469            let _ = tx.send(AgentEvent::SubagentStart {
470                task_id: task_id.clone(),
471                session_id,
472                parent_session_id: parent_session_id.clone().unwrap_or_default(),
473                agent: params.agent.clone(),
474                description: params.description.clone(),
475            });
476        }
477
478        let task_id_for_spawn = task_id.clone();
479        let task_id_for_log = task_id.clone();
480        tokio::spawn(async move {
481            if let Err(e) = self
482                .execute_with_task_id(
483                    task_id_for_spawn,
484                    params,
485                    event_tx,
486                    parent_session_id.as_deref(),
487                    false,
488                )
489                .await
490            {
491                tracing::error!("Background task {} failed: {}", task_id_for_log, e);
492            }
493        });
494
495        task_id
496    }
497
498    /// Execute multiple tasks in parallel.
499    ///
500    /// Spawns all tasks concurrently and waits for all to complete.
501    /// Returns results in the same order as the input tasks. Routed through
502    /// the [`AgentExecutor`](crate::orchestration::AgentExecutor) seam so the
503    /// same fan-out works whether steps run locally (default) or are placed
504    /// on remote nodes by a host.
505    pub async fn execute_parallel(
506        self: &Arc<Self>,
507        tasks: Vec<TaskParams>,
508        event_tx: Option<broadcast::Sender<AgentEvent>>,
509        parent_session_id: Option<&str>,
510    ) -> Vec<TaskResult> {
511        let parent = parent_session_id.map(|s| s.to_string());
512        let specs = tasks
513            .into_iter()
514            .map(|params| AgentStepSpec {
515                task_id: format!("task-{}", uuid::Uuid::new_v4()),
516                agent: params.agent,
517                description: params.description,
518                prompt: params.prompt,
519                max_steps: params.max_steps,
520                parent_session_id: parent.clone(),
521                output_schema: None,
522            })
523            .collect();
524
525        let executor: Arc<dyn AgentExecutor> = Arc::<Self>::clone(self);
526        crate::orchestration::execute_steps_parallel(executor, specs, event_tx)
527            .await
528            .into_iter()
529            .map(TaskResult::from)
530            .collect()
531    }
532}
533
534impl From<TaskResult> for StepOutcome {
535    fn from(r: TaskResult) -> Self {
536        StepOutcome {
537            task_id: r.task_id,
538            session_id: r.session_id,
539            agent: r.agent,
540            output: r.output,
541            success: r.success,
542            structured: None,
543        }
544    }
545}
546
547impl From<StepOutcome> for TaskResult {
548    fn from(o: StepOutcome) -> Self {
549        TaskResult {
550            output: o.output,
551            session_id: o.session_id,
552            agent: o.agent,
553            success: o.success,
554            task_id: o.task_id,
555        }
556    }
557}
558
559/// The local, in-process executor: every step runs as a child `AgentLoop` on
560/// this node's tokio runtime. This is the default; a host (书安OS) substitutes
561/// its own [`AgentExecutor`] to place steps across a cluster.
562#[async_trait]
563impl AgentExecutor for TaskExecutor {
564    async fn execute_step(
565        &self,
566        spec: AgentStepSpec,
567        event_tx: Option<broadcast::Sender<AgentEvent>>,
568    ) -> StepOutcome {
569        let agent = spec.agent.clone();
570        let task_id = spec.task_id.clone();
571        let output_schema = spec.output_schema.clone();
572        let params = TaskParams {
573            agent: spec.agent,
574            description: spec.description,
575            prompt: spec.prompt,
576            background: false,
577            max_steps: spec.max_steps,
578        };
579        let mut outcome: StepOutcome = match self
580            .execute_with_task_id(
581                task_id.clone(),
582                params,
583                event_tx,
584                spec.parent_session_id.as_deref(),
585                true,
586            )
587            .await
588        {
589            Ok(result) => result.into(),
590            Err(e) => return StepOutcome::failed(task_id, agent, format!("Task failed: {e}")),
591        };
592
593        // When the step requested structured output, coerce the (succeeded)
594        // free-text result to the schema. A coercion failure demotes the step
595        // to unsuccessful so callers never treat unvalidated text as the
596        // promised object.
597        if outcome.success {
598            if let Some(schema) = output_schema {
599                match self.coerce_to_schema(&outcome.output, schema).await {
600                    Ok(object) => outcome.structured = Some(object),
601                    Err(e) => {
602                        outcome.success = false;
603                        outcome.output =
604                            format!("{}\n\n[structured output failed: {e}]", outcome.output);
605                    }
606                }
607            }
608        }
609        outcome
610    }
611
612    fn concurrency_hint(&self) -> usize {
613        self.max_parallel_tasks
614    }
615}
616
617impl TaskExecutor {
618    /// Coerce a step's free-text output into a JSON object validated against
619    /// `schema`, reusing the structured-output machinery (Tool mode — the most
620    /// portable across providers, with built-in repair). This is one extra LLM
621    /// call beyond the step's own run.
622    async fn coerce_to_schema(
623        &self,
624        output: &str,
625        schema: serde_json::Value,
626    ) -> Result<serde_json::Value> {
627        let req = StructuredRequest {
628            prompt: format!(
629                "Convert the following task result into a single JSON object that conforms to \
630                 the required schema. Use only information present in the result.\n\n\
631                 --- TASK RESULT ---\n{output}"
632            ),
633            system: Some(
634                "You output exactly one JSON object matching the provided schema.".to_string(),
635            ),
636            schema,
637            schema_name: "step_output".to_string(),
638            schema_description: None,
639            // Tool mode works on every provider that supports tool use and
640            // does not depend on response_format wiring.
641            mode: StructuredMode::Tool,
642            max_repair_attempts: 2,
643        };
644        let result = generate_blocking(&*self.llm_client, &req).await?;
645        Ok(result.object)
646    }
647}
648
649/// Get the JSON schema for TaskParams
650pub fn task_params_schema() -> serde_json::Value {
651    serde_json::json!({
652        "type": "object",
653        "additionalProperties": false,
654        "properties": {
655            "agent": {
656                "type": "string",
657                "description": "Required. Canonical agent type to use (for example: explore, general, plan, verification, review). Always provide this exact field name: 'agent'."
658            },
659            "description": {
660                "type": "string",
661                "description": "Required. Short task label for display and tracking. Always provide this exact field name: 'description'."
662            },
663            "prompt": {
664                "type": "string",
665                "description": "Required. Detailed instruction for the delegated child run. Always provide this exact field name: 'prompt'."
666            },
667            "background": {
668                "type": "boolean",
669                "description": "Optional. Run the task in the background. Default: false.",
670                "default": false
671            },
672            "max_steps": {
673                "type": "integer",
674                "description": "Optional. Maximum number of steps for this task."
675            }
676        },
677        "required": ["agent", "description", "prompt"],
678        "examples": [
679            {
680                "agent": "explore",
681                "description": "Find Rust files",
682                "prompt": "Search the workspace for Rust files and summarize the layout."
683            },
684            {
685                "agent": "general",
686                "description": "Investigate test failure",
687                "prompt": "Inspect the failing tests and explain the root cause.",
688                "max_steps": 6
689            }
690        ]
691    })
692}
693
/// Adapter that exposes a [`TaskExecutor`] as the `task` tool, so the LLM can
/// delegate work through the standard tool interface.
pub struct TaskTool {
    /// Shared executor that runs the delegated child tasks.
    executor: Arc<TaskExecutor>,
}
699
700impl TaskTool {
701    /// Create a new TaskTool
702    pub fn new(executor: Arc<TaskExecutor>) -> Self {
703        Self { executor }
704    }
705}
706
707#[async_trait]
708impl Tool for TaskTool {
709    fn name(&self) -> &str {
710        "task"
711    }
712
713    fn description(&self) -> &str {
714        "Delegate a bounded task to a specialized child run. Built-in agents: explore (read-only codebase search), general/general-purpose (full access multi-step), plan (read-only planning), verification (adversarial validation), review (code review). Custom agents from agent_dirs and .a3s/agents are also available; .claude/agents is read for compatibility."
715    }
716
717    fn parameters(&self) -> serde_json::Value {
718        task_params_schema()
719    }
720
721    async fn execute(&self, args: &serde_json::Value, ctx: &ToolContext) -> Result<ToolOutput> {
722        let params: TaskParams =
723            serde_json::from_value(args.clone()).context("Invalid task parameters")?;
724
725        if params.background {
726            let task_id = Arc::clone(&self.executor).execute_background(
727                params,
728                ctx.agent_event_tx.clone(),
729                ctx.session_id.clone(),
730            );
731            return Ok(ToolOutput::success(format!(
732                "Task started in background. Task ID: {}",
733                task_id
734            )));
735        }
736
737        let result = self
738            .executor
739            .execute(
740                params,
741                ctx.agent_event_tx.clone(),
742                ctx.session_id.as_deref(),
743            )
744            .await?;
745        let (content, truncated) = format_task_result_for_context(&result);
746        let metadata = serde_json::json!({
747            "task_id": result.task_id,
748            "session_id": result.session_id,
749            "agent": result.agent,
750            "success": result.success,
751            "output_bytes": result.output.len(),
752            "truncated_for_context": truncated,
753            "artifact_id": task_artifact_id(&result),
754            "artifact_uri": task_artifact_uri(&result),
755        });
756
757        if result.success {
758            Ok(ToolOutput::success(content).with_metadata(metadata))
759        } else {
760            Ok(ToolOutput::error(content).with_metadata(metadata))
761        }
762    }
763}
764
/// Arguments accepted by the `parallel_task` tool.
///
/// Unknown top-level fields are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ParallelTaskParams {
    /// Tasks to execute concurrently; each entry is parsed as a [`TaskParams`].
    /// A per-task `background` flag is not used by parallel execution.
    pub tasks: Vec<TaskParams>,
}
772
773/// Get the JSON schema for ParallelTaskParams
774pub fn parallel_task_params_schema() -> serde_json::Value {
775    serde_json::json!({
776        "type": "object",
777        "additionalProperties": false,
778        "properties": {
779            "tasks": {
780                "type": "array",
781                "description": "List of tasks to execute in parallel. Each task runs as an independent delegated child run concurrently.",
782                "items": {
783                    "type": "object",
784                    "additionalProperties": false,
785                    "properties": {
786                        "agent": {
787                            "type": "string",
788                            "description": "Required. Canonical agent type for this task."
789                        },
790                        "description": {
791                            "type": "string",
792                            "description": "Required. Short task label for display and tracking."
793                        },
794                        "prompt": {
795                            "type": "string",
796                            "description": "Required. Detailed instruction for the delegated child run."
797                        }
798                    },
799                    "required": ["agent", "description", "prompt"]
800                },
801                "minItems": 1
802            }
803        },
804        "required": ["tasks"],
805        "examples": [
806            {
807                "tasks": [
808                    {
809                        "agent": "explore",
810                        "description": "Find Rust files",
811                        "prompt": "List Rust files under src/."
812                    },
813                    {
814                        "agent": "explore",
815                        "description": "Find tests",
816                        "prompt": "List test files and summarize their purpose."
817                    }
818                ]
819            }
820        ]
821    })
822}
823
/// Tool (`parallel_task`) that lets the LLM fan out multiple delegated tasks
/// concurrently.
///
/// All tasks execute in parallel and the tool returns when all complete.
pub struct ParallelTaskTool {
    /// Shared executor that runs each delegated child task.
    executor: Arc<TaskExecutor>,
}
830
831impl ParallelTaskTool {
832    /// Create a new ParallelTaskTool
833    pub fn new(executor: Arc<TaskExecutor>) -> Self {
834        Self { executor }
835    }
836}
837
838#[async_trait]
839impl Tool for ParallelTaskTool {
840    fn name(&self) -> &str {
841        "parallel_task"
842    }
843
844    fn description(&self) -> &str {
845        "Execute multiple delegated child runs in parallel. All tasks run concurrently and results are returned when all complete. Built-in agents: explore (read-only codebase search), general/general-purpose (full access multi-step), plan (read-only planning), verification (adversarial validation), review (code review). Custom agents from agent_dirs and .a3s/agents are also available; .claude/agents is read for compatibility."
846    }
847
848    fn parameters(&self) -> serde_json::Value {
849        parallel_task_params_schema()
850    }
851
852    async fn execute(&self, args: &serde_json::Value, ctx: &ToolContext) -> Result<ToolOutput> {
853        let params: ParallelTaskParams =
854            serde_json::from_value(args.clone()).context("Invalid parallel task parameters")?;
855
856        if params.tasks.is_empty() {
857            return Ok(ToolOutput::error("No tasks provided".to_string()));
858        }
859
860        let task_count = params.tasks.len();
861
862        let results = self
863            .executor
864            .execute_parallel(
865                params.tasks,
866                ctx.agent_event_tx.clone(),
867                ctx.session_id.as_deref(),
868            )
869            .await;
870
871        // Format results with compact per-task excerpts for parent context.
872        let mut output = format!("Executed {} tasks in parallel:\n\n", task_count);
873        let mut metadata_results = Vec::new();
874        for (i, result) in results.iter().enumerate() {
875            let status = if result.success { "[OK]" } else { "[ERR]" };
876            let (formatted, truncated) = format_task_result_for_context(result);
877            metadata_results.push(serde_json::json!({
878                "task_id": result.task_id,
879                "session_id": result.session_id,
880                "agent": result.agent,
881                "success": result.success,
882                "output": formatted.clone(),
883                "output_bytes": result.output.len(),
884                "truncated_for_context": truncated,
885                "artifact_id": task_artifact_id(result),
886                "artifact_uri": task_artifact_uri(result),
887            }));
888            output.push_str(&format!(
889                "--- Task {} ({}) {} ---\n{}\n\n",
890                i + 1,
891                result.agent,
892                status,
893                formatted
894            ));
895        }
896
897        let all_success = results.iter().all(|result| result.success);
898        let output = if all_success {
899            ToolOutput::success(output)
900        } else {
901            ToolOutput::error(output)
902        };
903
904        Ok(output.with_metadata(serde_json::json!({
905            "task_count": task_count,
906            "results": metadata_results,
907        })))
908    }
909}
910
911#[cfg(test)]
912mod tests {
913    use super::*;
914
915    #[test]
916    fn test_task_params_deserialize() {
917        let json = r#"{
918            "agent": "explore",
919            "description": "Find auth code",
920            "prompt": "Search for authentication files"
921        }"#;
922
923        let params: TaskParams = serde_json::from_str(json).unwrap();
924        assert_eq!(params.agent, "explore");
925        assert_eq!(params.description, "Find auth code");
926        assert!(!params.background);
927    }
928
929    #[test]
930    fn test_task_params_with_background() {
931        let json = r#"{
932            "agent": "general",
933            "description": "Long task",
934            "prompt": "Do something complex",
935            "background": true
936        }"#;
937
938        let params: TaskParams = serde_json::from_str(json).unwrap();
939        assert!(params.background);
940    }
941
942    #[test]
943    fn test_task_params_with_max_steps() {
944        let json = r#"{
945            "agent": "plan",
946            "description": "Planning task",
947            "prompt": "Create a plan",
948            "max_steps": 10
949        }"#;
950
951        let params: TaskParams = serde_json::from_str(json).unwrap();
952        assert_eq!(params.agent, "plan");
953        assert_eq!(params.max_steps, Some(10));
954        assert!(!params.background);
955    }
956
957    #[test]
958    fn test_task_params_all_fields() {
959        let json = r#"{
960            "agent": "general",
961            "description": "Complex task",
962            "prompt": "Do everything",
963            "background": true,
964            "max_steps": 20
965        }"#;
966
967        let params: TaskParams = serde_json::from_str(json).unwrap();
968        assert_eq!(params.agent, "general");
969        assert_eq!(params.description, "Complex task");
970        assert_eq!(params.prompt, "Do everything");
971        assert!(params.background);
972        assert_eq!(params.max_steps, Some(20));
973    }
974
975    #[test]
976    fn test_task_params_missing_required_field() {
977        let json = r#"{
978            "agent": "explore",
979            "description": "Missing prompt"
980        }"#;
981
982        let result: Result<TaskParams, _> = serde_json::from_str(json);
983        assert!(result.is_err());
984    }
985
986    #[test]
987    fn test_task_params_serialize() {
988        let params = TaskParams {
989            agent: "explore".to_string(),
990            description: "Test task".to_string(),
991            prompt: "Test prompt".to_string(),
992            background: false,
993            max_steps: Some(5),
994        };
995
996        let json = serde_json::to_string(&params).unwrap();
997        assert!(json.contains("explore"));
998        assert!(json.contains("Test task"));
999        assert!(json.contains("Test prompt"));
1000    }
1001
1002    #[test]
1003    fn test_task_params_clone() {
1004        let params = TaskParams {
1005            agent: "explore".to_string(),
1006            description: "Test".to_string(),
1007            prompt: "Prompt".to_string(),
1008            background: true,
1009            max_steps: None,
1010        };
1011
1012        let cloned = params.clone();
1013        assert_eq!(params.agent, cloned.agent);
1014        assert_eq!(params.description, cloned.description);
1015        assert_eq!(params.background, cloned.background);
1016    }
1017
1018    #[test]
1019    fn test_task_result_serialize() {
1020        let result = TaskResult {
1021            output: "Found 5 files".to_string(),
1022            session_id: "session-123".to_string(),
1023            agent: "explore".to_string(),
1024            success: true,
1025            task_id: "task-456".to_string(),
1026        };
1027
1028        let json = serde_json::to_string(&result).unwrap();
1029        assert!(json.contains("Found 5 files"));
1030        assert!(json.contains("explore"));
1031    }
1032
1033    #[test]
1034    fn test_task_result_deserialize() {
1035        let json = r#"{
1036            "output": "Task completed",
1037            "session_id": "sess-789",
1038            "agent": "general",
1039            "success": false,
1040            "task_id": "task-123"
1041        }"#;
1042
1043        let result: TaskResult = serde_json::from_str(json).unwrap();
1044        assert_eq!(result.output, "Task completed");
1045        assert_eq!(result.session_id, "sess-789");
1046        assert_eq!(result.agent, "general");
1047        assert!(!result.success);
1048        assert_eq!(result.task_id, "task-123");
1049    }
1050
1051    #[test]
1052    fn test_task_result_clone() {
1053        let result = TaskResult {
1054            output: "Output".to_string(),
1055            session_id: "session-1".to_string(),
1056            agent: "explore".to_string(),
1057            success: true,
1058            task_id: "task-1".to_string(),
1059        };
1060
1061        let cloned = result.clone();
1062        assert_eq!(result.output, cloned.output);
1063        assert_eq!(result.success, cloned.success);
1064    }
1065
1066    #[test]
1067    fn test_compact_task_output_preserves_small_output() {
1068        let (output, truncated) = compact_task_output("short result");
1069        assert_eq!(output, "short result");
1070        assert!(!truncated);
1071    }
1072
1073    #[test]
1074    fn test_format_task_result_for_context_truncates_large_output() {
1075        let result = TaskResult {
1076            output: format!("{}TAIL", "x".repeat(TASK_OUTPUT_CONTEXT_LIMIT + 500)),
1077            session_id: "session-1".to_string(),
1078            agent: "explore".to_string(),
1079            success: true,
1080            task_id: "task-1".to_string(),
1081        };
1082
1083        let (formatted, truncated) = format_task_result_for_context(&result);
1084        assert!(truncated);
1085        assert!(formatted.contains("Output excerpt"));
1086        assert!(formatted.contains("bytes omitted"));
1087        assert!(formatted.contains("Artifact ID: task-output:task-1"));
1088        assert!(formatted.contains("Artifact URI: a3s://tasks/session-1/runs/task-1/output"));
1089        assert!(formatted.contains("TAIL"));
1090        assert!(formatted.len() < result.output.len());
1091    }
1092
1093    #[test]
1094    fn test_task_artifact_reference_is_stable() {
1095        let result = TaskResult {
1096            output: "done".to_string(),
1097            session_id: "session-1".to_string(),
1098            agent: "explore".to_string(),
1099            success: true,
1100            task_id: "task-1".to_string(),
1101        };
1102
1103        assert_eq!(task_artifact_id(&result), "task-output:task-1");
1104        assert_eq!(
1105            task_artifact_uri(&result),
1106            "a3s://tasks/session-1/runs/task-1/output"
1107        );
1108
1109        let (formatted, truncated) = format_task_result_for_context(&result);
1110        assert!(!truncated);
1111        assert!(formatted.contains("Artifact URI: a3s://tasks/session-1/runs/task-1/output"));
1112    }
1113
1114    #[test]
1115    fn test_task_params_schema() {
1116        let schema = task_params_schema();
1117        assert_eq!(schema["type"], "object");
1118        assert_eq!(schema["additionalProperties"], false);
1119        assert!(schema["properties"]["agent"].is_object());
1120        assert!(schema["properties"]["prompt"].is_object());
1121    }
1122
1123    #[test]
1124    fn test_task_params_schema_required_fields() {
1125        let schema = task_params_schema();
1126        let required = schema["required"].as_array().unwrap();
1127        assert!(required.contains(&serde_json::json!("agent")));
1128        assert!(required.contains(&serde_json::json!("description")));
1129        assert!(required.contains(&serde_json::json!("prompt")));
1130    }
1131
1132    #[test]
1133    fn test_task_params_schema_properties() {
1134        let schema = task_params_schema();
1135        let props = &schema["properties"];
1136
1137        assert_eq!(props["agent"]["type"], "string");
1138        assert_eq!(props["description"]["type"], "string");
1139        assert_eq!(props["prompt"]["type"], "string");
1140        assert_eq!(props["background"]["type"], "boolean");
1141        assert_eq!(props["background"]["default"], false);
1142        assert_eq!(props["max_steps"]["type"], "integer");
1143    }
1144
1145    #[test]
1146    fn test_task_params_schema_descriptions() {
1147        let schema = task_params_schema();
1148        let props = &schema["properties"];
1149
1150        assert!(props["agent"]["description"].is_string());
1151        assert!(props["description"]["description"].is_string());
1152        assert!(props["prompt"]["description"].is_string());
1153        assert!(props["background"]["description"].is_string());
1154        assert!(props["max_steps"]["description"].is_string());
1155    }
1156
1157    #[test]
1158    fn test_task_params_default_background() {
1159        let params = TaskParams {
1160            agent: "explore".to_string(),
1161            description: "Test".to_string(),
1162            prompt: "Test prompt".to_string(),
1163            background: false,
1164            max_steps: None,
1165        };
1166        assert!(!params.background);
1167    }
1168
1169    #[test]
1170    fn test_task_params_serialize_skip_none() {
1171        let params = TaskParams {
1172            agent: "explore".to_string(),
1173            description: "Test".to_string(),
1174            prompt: "Test prompt".to_string(),
1175            background: false,
1176            max_steps: None,
1177        };
1178        let json = serde_json::to_string(&params).unwrap();
1179        // max_steps should not appear when None
1180        assert!(!json.contains("max_steps"));
1181    }
1182
1183    #[test]
1184    fn test_task_params_serialize_with_max_steps() {
1185        let params = TaskParams {
1186            agent: "explore".to_string(),
1187            description: "Test".to_string(),
1188            prompt: "Test prompt".to_string(),
1189            background: false,
1190            max_steps: Some(15),
1191        };
1192        let json = serde_json::to_string(&params).unwrap();
1193        assert!(json.contains("max_steps"));
1194        assert!(json.contains("15"));
1195    }
1196
1197    #[test]
1198    fn test_task_result_success_true() {
1199        let result = TaskResult {
1200            output: "Success".to_string(),
1201            session_id: "sess-1".to_string(),
1202            agent: "explore".to_string(),
1203            success: true,
1204            task_id: "task-1".to_string(),
1205        };
1206        assert!(result.success);
1207    }
1208
1209    #[test]
1210    fn test_task_result_success_false() {
1211        let result = TaskResult {
1212            output: "Failed".to_string(),
1213            session_id: "sess-1".to_string(),
1214            agent: "explore".to_string(),
1215            success: false,
1216            task_id: "task-1".to_string(),
1217        };
1218        assert!(!result.success);
1219    }
1220
1221    #[test]
1222    fn test_task_params_empty_strings() {
1223        let params = TaskParams {
1224            agent: "".to_string(),
1225            description: "".to_string(),
1226            prompt: "".to_string(),
1227            background: false,
1228            max_steps: None,
1229        };
1230        let json = serde_json::to_string(&params).unwrap();
1231        let deserialized: TaskParams = serde_json::from_str(&json).unwrap();
1232        assert_eq!(deserialized.agent, "");
1233        assert_eq!(deserialized.description, "");
1234        assert_eq!(deserialized.prompt, "");
1235    }
1236
1237    #[test]
1238    fn test_task_result_empty_output() {
1239        let result = TaskResult {
1240            output: "".to_string(),
1241            session_id: "sess-1".to_string(),
1242            agent: "explore".to_string(),
1243            success: true,
1244            task_id: "task-1".to_string(),
1245        };
1246        assert_eq!(result.output, "");
1247    }
1248
1249    #[test]
1250    fn test_task_params_debug_format() {
1251        let params = TaskParams {
1252            agent: "explore".to_string(),
1253            description: "Test".to_string(),
1254            prompt: "Test prompt".to_string(),
1255            background: false,
1256            max_steps: None,
1257        };
1258        let debug_str = format!("{:?}", params);
1259        assert!(debug_str.contains("explore"));
1260        assert!(debug_str.contains("Test"));
1261    }
1262
1263    #[test]
1264    fn test_task_result_debug_format() {
1265        let result = TaskResult {
1266            output: "Output".to_string(),
1267            session_id: "sess-1".to_string(),
1268            agent: "explore".to_string(),
1269            success: true,
1270            task_id: "task-1".to_string(),
1271        };
1272        let debug_str = format!("{:?}", result);
1273        assert!(debug_str.contains("Output"));
1274        assert!(debug_str.contains("explore"));
1275    }
1276
1277    #[test]
1278    fn test_task_params_roundtrip() {
1279        let original = TaskParams {
1280            agent: "general".to_string(),
1281            description: "Roundtrip test".to_string(),
1282            prompt: "Test roundtrip serialization".to_string(),
1283            background: true,
1284            max_steps: Some(42),
1285        };
1286        let json = serde_json::to_string(&original).unwrap();
1287        let deserialized: TaskParams = serde_json::from_str(&json).unwrap();
1288        assert_eq!(original.agent, deserialized.agent);
1289        assert_eq!(original.description, deserialized.description);
1290        assert_eq!(original.prompt, deserialized.prompt);
1291        assert_eq!(original.background, deserialized.background);
1292        assert_eq!(original.max_steps, deserialized.max_steps);
1293    }
1294
1295    #[test]
1296    fn test_task_result_roundtrip() {
1297        let original = TaskResult {
1298            output: "Roundtrip output".to_string(),
1299            session_id: "sess-roundtrip".to_string(),
1300            agent: "plan".to_string(),
1301            success: false,
1302            task_id: "task-roundtrip".to_string(),
1303        };
1304        let json = serde_json::to_string(&original).unwrap();
1305        let deserialized: TaskResult = serde_json::from_str(&json).unwrap();
1306        assert_eq!(original.output, deserialized.output);
1307        assert_eq!(original.session_id, deserialized.session_id);
1308        assert_eq!(original.agent, deserialized.agent);
1309        assert_eq!(original.success, deserialized.success);
1310        assert_eq!(original.task_id, deserialized.task_id);
1311    }
1312
1313    #[test]
1314    fn test_parallel_task_params_deserialize() {
1315        let json = r#"{
1316            "tasks": [
1317                { "agent": "explore", "description": "Find auth", "prompt": "Search auth files" },
1318                { "agent": "general", "description": "Fix bug", "prompt": "Fix the login bug" }
1319            ]
1320        }"#;
1321
1322        let params: ParallelTaskParams = serde_json::from_str(json).unwrap();
1323        assert_eq!(params.tasks.len(), 2);
1324        assert_eq!(params.tasks[0].agent, "explore");
1325        assert_eq!(params.tasks[1].agent, "general");
1326    }
1327
1328    #[test]
1329    fn test_parallel_task_params_single_task() {
1330        let json = r#"{
1331            "tasks": [
1332                { "agent": "plan", "description": "Plan work", "prompt": "Create a plan" }
1333            ]
1334        }"#;
1335
1336        let params: ParallelTaskParams = serde_json::from_str(json).unwrap();
1337        assert_eq!(params.tasks.len(), 1);
1338    }
1339
1340    #[test]
1341    fn test_parallel_task_params_empty_tasks() {
1342        let json = r#"{ "tasks": [] }"#;
1343        let params: ParallelTaskParams = serde_json::from_str(json).unwrap();
1344        assert!(params.tasks.is_empty());
1345    }
1346
1347    #[test]
1348    fn test_parallel_task_params_missing_tasks() {
1349        let json = r#"{}"#;
1350        let result: Result<ParallelTaskParams, _> = serde_json::from_str(json);
1351        assert!(result.is_err());
1352    }
1353
1354    #[test]
1355    fn test_parallel_task_params_serialize() {
1356        let params = ParallelTaskParams {
1357            tasks: vec![
1358                TaskParams {
1359                    agent: "explore".to_string(),
1360                    description: "Task 1".to_string(),
1361                    prompt: "Prompt 1".to_string(),
1362                    background: false,
1363                    max_steps: None,
1364                },
1365                TaskParams {
1366                    agent: "general".to_string(),
1367                    description: "Task 2".to_string(),
1368                    prompt: "Prompt 2".to_string(),
1369                    background: false,
1370                    max_steps: Some(10),
1371                },
1372            ],
1373        };
1374        let json = serde_json::to_string(&params).unwrap();
1375        assert!(json.contains("explore"));
1376        assert!(json.contains("general"));
1377        assert!(json.contains("Prompt 1"));
1378        assert!(json.contains("Prompt 2"));
1379    }
1380
1381    #[test]
1382    fn test_parallel_task_params_roundtrip() {
1383        let original = ParallelTaskParams {
1384            tasks: vec![
1385                TaskParams {
1386                    agent: "explore".to_string(),
1387                    description: "Explore".to_string(),
1388                    prompt: "Find files".to_string(),
1389                    background: false,
1390                    max_steps: None,
1391                },
1392                TaskParams {
1393                    agent: "plan".to_string(),
1394                    description: "Plan".to_string(),
1395                    prompt: "Make plan".to_string(),
1396                    background: false,
1397                    max_steps: Some(5),
1398                },
1399            ],
1400        };
1401        let json = serde_json::to_string(&original).unwrap();
1402        let deserialized: ParallelTaskParams = serde_json::from_str(&json).unwrap();
1403        assert_eq!(original.tasks.len(), deserialized.tasks.len());
1404        assert_eq!(original.tasks[0].agent, deserialized.tasks[0].agent);
1405        assert_eq!(original.tasks[1].agent, deserialized.tasks[1].agent);
1406        assert_eq!(original.tasks[1].max_steps, deserialized.tasks[1].max_steps);
1407    }
1408
1409    #[test]
1410    fn test_parallel_task_params_clone() {
1411        let params = ParallelTaskParams {
1412            tasks: vec![TaskParams {
1413                agent: "explore".to_string(),
1414                description: "Test".to_string(),
1415                prompt: "Prompt".to_string(),
1416                background: false,
1417                max_steps: None,
1418            }],
1419        };
1420        let cloned = params.clone();
1421        assert_eq!(params.tasks.len(), cloned.tasks.len());
1422        assert_eq!(params.tasks[0].agent, cloned.tasks[0].agent);
1423    }
1424
1425    #[test]
1426    fn test_parallel_task_params_schema() {
1427        let schema = parallel_task_params_schema();
1428        assert_eq!(schema["type"], "object");
1429        assert_eq!(schema["additionalProperties"], false);
1430        assert!(schema["properties"]["tasks"].is_object());
1431        assert_eq!(schema["properties"]["tasks"]["type"], "array");
1432        assert_eq!(schema["properties"]["tasks"]["minItems"], 1);
1433    }
1434
1435    #[test]
1436    fn test_parallel_task_params_schema_required() {
1437        let schema = parallel_task_params_schema();
1438        let required = schema["required"].as_array().unwrap();
1439        assert!(required.contains(&serde_json::json!("tasks")));
1440    }
1441
1442    #[test]
1443    fn test_parallel_task_params_schema_items() {
1444        let schema = parallel_task_params_schema();
1445        let items = &schema["properties"]["tasks"]["items"];
1446        assert_eq!(items["type"], "object");
1447        assert_eq!(items["additionalProperties"], false);
1448        let item_required = items["required"].as_array().unwrap();
1449        assert!(item_required.contains(&serde_json::json!("agent")));
1450        assert!(item_required.contains(&serde_json::json!("description")));
1451        assert!(item_required.contains(&serde_json::json!("prompt")));
1452    }
1453
1454    #[test]
1455    fn test_task_schema_examples_use_delegation_core() {
1456        let task = task_params_schema();
1457        let task_examples = task["examples"].as_array().unwrap();
1458        assert_eq!(task_examples[0]["agent"], "explore");
1459        assert!(task_examples[0].get("task").is_none());
1460
1461        let parallel = parallel_task_params_schema();
1462        let parallel_examples = parallel["examples"].as_array().unwrap();
1463        assert!(!parallel_examples[0]["tasks"].as_array().unwrap().is_empty());
1464    }
1465
1466    #[test]
1467    fn test_parallel_task_params_debug() {
1468        let params = ParallelTaskParams {
1469            tasks: vec![TaskParams {
1470                agent: "explore".to_string(),
1471                description: "Debug test".to_string(),
1472                prompt: "Test".to_string(),
1473                background: false,
1474                max_steps: None,
1475            }],
1476        };
1477        let debug_str = format!("{:?}", params);
1478        assert!(debug_str.contains("explore"));
1479        assert!(debug_str.contains("Debug test"));
1480    }
1481
1482    #[test]
1483    fn test_parallel_task_params_large_count() {
1484        // Validate that ParallelTaskParams can hold 150 tasks without truncation
1485        let tasks: Vec<TaskParams> = (0..150)
1486            .map(|i| TaskParams {
1487                agent: "explore".to_string(),
1488                description: format!("Task {}", i),
1489                prompt: format!("Prompt for task {}", i),
1490                background: false,
1491                max_steps: Some(10),
1492            })
1493            .collect();
1494
1495        let params = ParallelTaskParams { tasks };
1496        let json = serde_json::to_string(&params).unwrap();
1497        let deserialized: ParallelTaskParams = serde_json::from_str(&json).unwrap();
1498        assert_eq!(deserialized.tasks.len(), 150);
1499        assert_eq!(deserialized.tasks[0].description, "Task 0");
1500        assert_eq!(deserialized.tasks[149].description, "Task 149");
1501    }
1502
1503    #[test]
1504    fn test_task_params_max_steps_zero() {
1505        // max_steps = 0 is a valid edge case (callers decide enforcement)
1506        let params = TaskParams {
1507            agent: "explore".to_string(),
1508            description: "Edge case".to_string(),
1509            prompt: "Zero steps".to_string(),
1510            background: false,
1511            max_steps: Some(0),
1512        };
1513        let json = serde_json::to_string(&params).unwrap();
1514        let deserialized: TaskParams = serde_json::from_str(&json).unwrap();
1515        assert_eq!(deserialized.max_steps, Some(0));
1516    }
1517
1518    #[test]
1519    fn test_parallel_task_params_all_background() {
1520        let tasks: Vec<TaskParams> = (0..5)
1521            .map(|i| TaskParams {
1522                agent: "general".to_string(),
1523                description: format!("BG task {}", i),
1524                prompt: "Run in background".to_string(),
1525                background: true,
1526                max_steps: None,
1527            })
1528            .collect();
1529        let params = ParallelTaskParams { tasks };
1530        for task in &params.tasks {
1531            assert!(task.background);
1532        }
1533    }
1534
1535    #[test]
1536    fn test_task_params_rejects_permissive_field() {
1537        let json = r#"{
1538            "agent": "general",
1539            "description": "Legacy field rejection",
1540            "prompt": "Verify legacy fields are rejected",
1541            "permissive": true
1542        }"#;
1543
1544        let result: Result<TaskParams, _> = serde_json::from_str(json);
1545        assert!(result.is_err());
1546    }
1547
1548    #[test]
1549    fn test_task_params_schema_hides_permissive_field() {
1550        let schema = task_params_schema();
1551        let props = &schema["properties"];
1552
1553        assert!(props.get("permissive").is_none());
1554    }
1555
1556    // ========================================================================
1557    // Contract tests — verify task delegation with MockLlmClient (no network)
1558    // ========================================================================
1559
1560    use crate::agent::tests::MockLlmClient;
1561    use crate::llm::{ContentBlock, LlmResponse, Message, StreamEvent, TokenUsage, ToolDefinition};
1562    use crate::permissions::PermissionPolicy;
1563    use crate::subagent::AgentRegistry;
1564    use std::sync::atomic::{AtomicUsize, Ordering};
1565    use std::time::Duration;
1566    use tokio::sync::{mpsc, Barrier};
1567
1568    fn text_response(text: impl Into<String>) -> LlmResponse {
1569        LlmResponse {
1570            message: Message {
1571                role: "assistant".to_string(),
1572                content: vec![ContentBlock::Text { text: text.into() }],
1573                reasoning_content: None,
1574            },
1575            usage: TokenUsage {
1576                prompt_tokens: 10,
1577                completion_tokens: 5,
1578                total_tokens: 15,
1579                cache_read_tokens: None,
1580                cache_write_tokens: None,
1581            },
1582            stop_reason: Some("end_turn".to_string()),
1583            meta: None,
1584        }
1585    }
1586
1587    fn pre_analysis_response(messages: &[Message]) -> LlmResponse {
1588        let prompt = last_text(messages);
1589        let response = serde_json::json!({
1590            "intent": "GeneralPurpose",
1591            "requires_planning": false,
1592            "goal": {
1593                "description": prompt,
1594                "success_criteria": []
1595            },
1596            "execution_plan": {
1597                "complexity": "Simple",
1598                "steps": [{
1599                    "id": "step-1",
1600                    "description": prompt,
1601                    "tool": null,
1602                    "dependencies": [],
1603                    "success_criteria": "Complete the request"
1604                }],
1605                "required_tools": []
1606            },
1607            "optimized_input": prompt
1608        });
1609        text_response(response.to_string())
1610    }
1611
1612    fn last_text(messages: &[Message]) -> String {
1613        messages
1614            .last()
1615            .and_then(|message| {
1616                message.content.iter().find_map(|block| {
1617                    if let ContentBlock::Text { text } = block {
1618                        Some(text.clone())
1619                    } else {
1620                        None
1621                    }
1622                })
1623            })
1624            .unwrap_or_default()
1625    }
1626
1627    /// Client for the schema-coercion tests. The agent's own turn returns
1628    /// plain text (which ends the loop); the structured-output coercion call
1629    /// — recognizable by the injected `step_output` tool — returns a tool call
1630    /// carrying the object.
1631    struct SchemaCoercionClient;
1632
1633    #[async_trait::async_trait]
1634    impl LlmClient for SchemaCoercionClient {
1635        async fn complete(
1636            &self,
1637            messages: &[Message],
1638            system: Option<&str>,
1639            tools: &[ToolDefinition],
1640        ) -> Result<LlmResponse> {
1641            if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1642                return Ok(pre_analysis_response(messages));
1643            }
1644            // The structured-output coercion injects a synthetic tool named
1645            // `emit_<schema_name>` (here `emit_step_output`).
1646            if tools.iter().any(|t| t.name == "emit_step_output") {
1647                return Ok(MockLlmClient::tool_call_response(
1648                    "coerce-1",
1649                    "emit_step_output",
1650                    serde_json::json!({ "verdict": "ok" }),
1651                ));
1652            }
1653            Ok(text_response("The verdict is ok."))
1654        }
1655
1656        async fn complete_streaming(
1657            &self,
1658            _messages: &[Message],
1659            _system: Option<&str>,
1660            _tools: &[ToolDefinition],
1661            _cancel_token: tokio_util::sync::CancellationToken,
1662        ) -> Result<mpsc::Receiver<StreamEvent>> {
1663            anyhow::bail!("streaming is not used by schema coercion tests")
1664        }
1665    }
1666
1667    fn verdict_schema() -> serde_json::Value {
1668        serde_json::json!({
1669            "type": "object",
1670            "properties": { "verdict": { "type": "string" } },
1671            "required": ["verdict"]
1672        })
1673    }
1674
1675    #[tokio::test]
1676    async fn execute_step_with_schema_coerces_structured_output() {
1677        let workspace = tempfile::tempdir().unwrap();
1678        let executor = TaskExecutor::new(
1679            Arc::new(AgentRegistry::new()),
1680            Arc::new(SchemaCoercionClient),
1681            workspace.path().to_string_lossy().to_string(),
1682        );
1683        let spec = AgentStepSpec::new("step-1", "general", "assess", "Assess the thing.")
1684            .with_output_schema(verdict_schema());
1685
1686        let outcome = executor.execute_step(spec, None).await;
1687
1688        assert!(outcome.success, "step should succeed: {}", outcome.output);
1689        assert_eq!(
1690            outcome.structured,
1691            Some(serde_json::json!({ "verdict": "ok" })),
1692            "a schema'd step returns the validated object in `structured`"
1693        );
1694    }
1695
1696    #[tokio::test]
1697    async fn execute_step_without_schema_has_no_structured_output() {
1698        let workspace = tempfile::tempdir().unwrap();
1699        let executor = TaskExecutor::new(
1700            Arc::new(AgentRegistry::new()),
1701            Arc::new(SchemaCoercionClient),
1702            workspace.path().to_string_lossy().to_string(),
1703        );
1704        let spec = AgentStepSpec::new("step-2", "general", "assess", "Assess the thing.");
1705
1706        let outcome = executor.execute_step(spec, None).await;
1707
1708        assert!(outcome.success, "step should succeed: {}", outcome.output);
1709        assert_eq!(
1710            outcome.structured, None,
1711            "no schema requested → no structured output, no coercion call"
1712        );
1713    }
1714
1715    /// The agent's turn returns text; the coercion call (`emit_step_output`)
1716    /// always returns an object that VIOLATES the schema, so `generate_blocking`
1717    /// exhausts its repairs and bails.
1718    struct SchemaFailClient;
1719
1720    #[async_trait::async_trait]
1721    impl LlmClient for SchemaFailClient {
1722        async fn complete(
1723            &self,
1724            messages: &[Message],
1725            system: Option<&str>,
1726            tools: &[ToolDefinition],
1727        ) -> Result<LlmResponse> {
1728            if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1729                return Ok(pre_analysis_response(messages));
1730            }
1731            if tools.iter().any(|t| t.name == "emit_step_output") {
1732                // `{}` is missing the required `verdict` field → schema invalid.
1733                return Ok(MockLlmClient::tool_call_response(
1734                    "coerce-fail",
1735                    "emit_step_output",
1736                    serde_json::json!({}),
1737                ));
1738            }
1739            Ok(text_response("some answer"))
1740        }
1741
1742        async fn complete_streaming(
1743            &self,
1744            _messages: &[Message],
1745            _system: Option<&str>,
1746            _tools: &[ToolDefinition],
1747            _cancel_token: tokio_util::sync::CancellationToken,
1748        ) -> Result<mpsc::Receiver<StreamEvent>> {
1749            anyhow::bail!("streaming unused")
1750        }
1751    }
1752
1753    #[tokio::test]
1754    async fn execute_step_with_schema_demotes_step_on_coercion_failure() {
1755        let workspace = tempfile::tempdir().unwrap();
1756        let executor = TaskExecutor::new(
1757            Arc::new(AgentRegistry::new()),
1758            Arc::new(SchemaFailClient),
1759            workspace.path().to_string_lossy().to_string(),
1760        );
1761        let spec = AgentStepSpec::new("step-x", "general", "assess", "Assess the thing.")
1762            .with_output_schema(verdict_schema());
1763
1764        let outcome = executor.execute_step(spec, None).await;
1765
1766        assert!(
1767            !outcome.success,
1768            "a step whose output can't satisfy the schema is demoted to failure"
1769        );
1770        assert_eq!(outcome.structured, None, "no validated object on failure");
1771        assert!(
1772            outcome.output.contains("[structured output failed"),
1773            "the demotion marker is appended: {}",
1774            outcome.output
1775        );
1776    }
1777
1778    #[tokio::test]
1779    async fn parallel_isolates_schema_coercion_failure_from_sibling() {
1780        let workspace = tempfile::tempdir().unwrap();
1781        let executor: Arc<dyn AgentExecutor> = Arc::new(TaskExecutor::new(
1782            Arc::new(AgentRegistry::new()),
1783            Arc::new(SchemaFailClient),
1784            workspace.path().to_string_lossy().to_string(),
1785        ));
1786        // A plain step (no schema → succeeds) alongside a schema'd step whose
1787        // coercion fails. The failure must not drop or fail the sibling.
1788        let specs = vec![
1789            AgentStepSpec::new("plain", "general", "d", "p"),
1790            AgentStepSpec::new("schemad", "general", "d", "p").with_output_schema(verdict_schema()),
1791        ];
1792        let out = crate::orchestration::execute_steps_parallel(executor, specs, None).await;
1793
1794        assert_eq!(out.len(), 2);
1795        assert_eq!(out[0].task_id, "plain");
1796        assert!(out[0].success, "no-schema sibling unaffected");
1797        assert_eq!(out[0].structured, None);
1798        assert_eq!(out[1].task_id, "schemad");
1799        assert!(!out[1].success, "schema-failing step surfaces as failure");
1800        assert_eq!(out[1].structured, None);
1801        assert!(out[1].output.contains("[structured output failed"));
1802    }
1803
1804    #[tokio::test]
1805    async fn failed_step_with_schema_skips_coercion() {
1806        let workspace = tempfile::tempdir().unwrap();
1807        let executor = TaskExecutor::new(
1808            Arc::new(AgentRegistry::new()),
1809            Arc::new(SchemaCoercionClient),
1810            workspace.path().to_string_lossy().to_string(),
1811        );
1812        // Unknown agent → the run fails BEFORE coercion. The failure is the
1813        // run error, not a coercion failure — coercion must not run.
1814        let spec = AgentStepSpec::new("step-y", "no-such-agent", "d", "p")
1815            .with_output_schema(verdict_schema());
1816
1817        let outcome = executor.execute_step(spec, None).await;
1818
1819        assert!(!outcome.success);
1820        assert_eq!(outcome.structured, None);
1821        assert!(
1822            !outcome.output.contains("[structured output failed"),
1823            "coercion never ran — failure is the run error, not a coercion failure: {}",
1824            outcome.output
1825        );
1826    }
1827
1828    struct StaticLlmClient {
1829        text: String,
1830    }
1831
1832    impl StaticLlmClient {
1833        fn new(text: impl Into<String>) -> Self {
1834            Self { text: text.into() }
1835        }
1836    }
1837
1838    #[async_trait::async_trait]
1839    impl LlmClient for StaticLlmClient {
1840        async fn complete(
1841            &self,
1842            messages: &[Message],
1843            system: Option<&str>,
1844            _tools: &[ToolDefinition],
1845        ) -> Result<LlmResponse> {
1846            if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1847                return Ok(pre_analysis_response(messages));
1848            }
1849            Ok(text_response(self.text.clone()))
1850        }
1851
1852        async fn complete_streaming(
1853            &self,
1854            _messages: &[Message],
1855            _system: Option<&str>,
1856            _tools: &[ToolDefinition],
1857            _cancel_token: tokio_util::sync::CancellationToken,
1858        ) -> Result<mpsc::Receiver<StreamEvent>> {
1859            anyhow::bail!("streaming is not used by task executor tests")
1860        }
1861    }
1862
1863    struct ConcurrentLlmClient {
1864        barrier: Arc<Barrier>,
1865        active: AtomicUsize,
1866        max_active: AtomicUsize,
1867    }
1868
1869    impl ConcurrentLlmClient {
1870        fn new(task_count: usize) -> Self {
1871            Self {
1872                barrier: Arc::new(Barrier::new(task_count)),
1873                active: AtomicUsize::new(0),
1874                max_active: AtomicUsize::new(0),
1875            }
1876        }
1877
1878        fn max_active(&self) -> usize {
1879            self.max_active.load(Ordering::SeqCst)
1880        }
1881
1882        fn record_active(&self) {
1883            let active = self.active.fetch_add(1, Ordering::SeqCst) + 1;
1884            let mut observed = self.max_active.load(Ordering::SeqCst);
1885            while active > observed {
1886                match self.max_active.compare_exchange(
1887                    observed,
1888                    active,
1889                    Ordering::SeqCst,
1890                    Ordering::SeqCst,
1891                ) {
1892                    Ok(_) => break,
1893                    Err(next) => observed = next,
1894                }
1895            }
1896        }
1897    }
1898
1899    struct LimitedConcurrencyLlmClient {
1900        active: AtomicUsize,
1901        max_active: AtomicUsize,
1902    }
1903
1904    impl LimitedConcurrencyLlmClient {
1905        fn new() -> Self {
1906            Self {
1907                active: AtomicUsize::new(0),
1908                max_active: AtomicUsize::new(0),
1909            }
1910        }
1911
1912        fn max_active(&self) -> usize {
1913            self.max_active.load(Ordering::SeqCst)
1914        }
1915
1916        fn record_active(&self) {
1917            let active = self.active.fetch_add(1, Ordering::SeqCst) + 1;
1918            self.max_active.fetch_max(active, Ordering::SeqCst);
1919        }
1920    }
1921
1922    #[async_trait::async_trait]
1923    impl LlmClient for LimitedConcurrencyLlmClient {
1924        async fn complete(
1925            &self,
1926            messages: &[Message],
1927            system: Option<&str>,
1928            _tools: &[ToolDefinition],
1929        ) -> Result<LlmResponse> {
1930            if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1931                return Ok(pre_analysis_response(messages));
1932            }
1933
1934            let prompt = last_text(messages);
1935            self.record_active();
1936            tokio::time::sleep(Duration::from_millis(40)).await;
1937            self.active.fetch_sub(1, Ordering::SeqCst);
1938            Ok(text_response(format!("completed: {prompt}")))
1939        }
1940
1941        async fn complete_streaming(
1942            &self,
1943            _messages: &[Message],
1944            _system: Option<&str>,
1945            _tools: &[ToolDefinition],
1946            _cancel_token: tokio_util::sync::CancellationToken,
1947        ) -> Result<mpsc::Receiver<StreamEvent>> {
1948            anyhow::bail!("streaming is not used by task executor tests")
1949        }
1950    }
1951
1952    #[async_trait::async_trait]
1953    impl LlmClient for ConcurrentLlmClient {
1954        async fn complete(
1955            &self,
1956            messages: &[Message],
1957            system: Option<&str>,
1958            _tools: &[ToolDefinition],
1959        ) -> Result<LlmResponse> {
1960            if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) {
1961                return Ok(pre_analysis_response(messages));
1962            }
1963
1964            let prompt = last_text(messages);
1965            self.record_active();
1966            self.barrier.wait().await;
1967            if prompt.contains("slow") {
1968                tokio::time::sleep(Duration::from_millis(120)).await;
1969            } else {
1970                tokio::time::sleep(Duration::from_millis(10)).await;
1971            }
1972            self.active.fetch_sub(1, Ordering::SeqCst);
1973            Ok(text_response(format!("completed: {prompt}")))
1974        }
1975
1976        async fn complete_streaming(
1977            &self,
1978            _messages: &[Message],
1979            _system: Option<&str>,
1980            _tools: &[ToolDefinition],
1981            _cancel_token: tokio_util::sync::CancellationToken,
1982        ) -> Result<mpsc::Receiver<StreamEvent>> {
1983            anyhow::bail!("streaming is not used by task executor tests")
1984        }
1985    }
1986
1987    fn test_registry_with_writer() -> Arc<AgentRegistry> {
1988        let registry = AgentRegistry::new();
1989        let spec = crate::subagent::WorkerAgentSpec::custom("writer", "Write files")
1990            .with_permissions(PermissionPolicy::new().allow("write(*)").allow("read(*)"))
1991            .with_prompt("Write files when asked.")
1992            .with_max_steps(3);
1993        registry.register(spec.into_agent_definition());
1994        Arc::new(registry)
1995    }
1996
1997    fn test_registry_with_text_worker() -> Arc<AgentRegistry> {
1998        let registry = AgentRegistry::new();
1999        let spec = crate::subagent::WorkerAgentSpec::custom("worker", "Text worker")
2000            .with_prompt("Return a concise result.")
2001            .with_max_steps(1);
2002        registry.register(spec.into_agent_definition());
2003        Arc::new(registry)
2004    }
2005
2006    #[tokio::test]
2007    async fn task_child_run_permission_allow() {
2008        let workspace = tempfile::tempdir().unwrap();
2009        let mock = Arc::new(MockLlmClient::new(vec![
2010            MockLlmClient::tool_call_response(
2011                "t1",
2012                "write",
2013                serde_json::json!({
2014                    "file_path": workspace.path().join("out.txt").to_string_lossy(),
2015                    "content": "WRITTEN"
2016                }),
2017            ),
2018            MockLlmClient::text_response("Done."),
2019        ]));
2020
2021        let executor = TaskExecutor::new(
2022            test_registry_with_writer(),
2023            mock,
2024            workspace.path().to_string_lossy().to_string(),
2025        );
2026
2027        let result = executor
2028            .execute(
2029                TaskParams {
2030                    agent: "writer".to_string(),
2031                    description: "Write file".to_string(),
2032                    prompt: "Write out.txt".to_string(),
2033                    background: false,
2034                    max_steps: Some(3),
2035                },
2036                None,
2037                None,
2038            )
2039            .await
2040            .unwrap();
2041
2042        assert!(
2043            result.success,
2044            "child run should succeed: {}",
2045            result.output
2046        );
2047        assert!(
2048            !result.output.contains("Permission denied"),
2049            "no permission denial: {}",
2050            result.output
2051        );
2052        let content = std::fs::read_to_string(workspace.path().join("out.txt")).unwrap();
2053        assert_eq!(content, "WRITTEN");
2054    }
2055
2056    #[tokio::test]
2057    async fn task_child_run_permission_deny() {
2058        let workspace = tempfile::tempdir().unwrap();
2059        let registry = AgentRegistry::new();
2060        let spec = crate::subagent::WorkerAgentSpec::custom("restricted", "Restricted agent")
2061            .with_permissions(PermissionPolicy::new().allow("read(*)").deny("bash(*)"))
2062            .with_max_steps(3);
2063        registry.register(spec.into_agent_definition());
2064
2065        let mock = Arc::new(MockLlmClient::new(vec![
2066            MockLlmClient::tool_call_response(
2067                "t1",
2068                "bash",
2069                serde_json::json!({"command": "echo hello"}),
2070            ),
2071            MockLlmClient::text_response("Could not run bash."),
2072        ]));
2073
2074        let executor = TaskExecutor::new(
2075            Arc::new(registry),
2076            mock,
2077            workspace.path().to_string_lossy().to_string(),
2078        );
2079
2080        let result = executor
2081            .execute(
2082                TaskParams {
2083                    agent: "restricted".to_string(),
2084                    description: "Try bash".to_string(),
2085                    prompt: "Run echo hello".to_string(),
2086                    background: false,
2087                    max_steps: Some(3),
2088                },
2089                None,
2090                None,
2091            )
2092            .await
2093            .unwrap();
2094
2095        // The agent completes (LLM responds after denial), but bash was denied.
2096        // The denial is sent as a tool result to the LLM, which then responds.
2097        assert!(result.success, "agent should complete: {}", result.output);
2098    }
2099
2100    #[tokio::test]
2101    async fn task_child_run_confirmation_auto_approve() {
2102        let workspace = tempfile::tempdir().unwrap();
2103        let registry = AgentRegistry::new();
2104        // Agent with allow("read(*)") — write is not in allow list, so it returns Ask.
2105        // With AutoApproveConfirmation (default for agents with permissions), Ask → approve.
2106        let spec = crate::subagent::WorkerAgentSpec::custom("reader-writer", "Read and write")
2107            .with_permissions(PermissionPolicy::new().allow("read(*)"))
2108            .with_max_steps(3);
2109        registry.register(spec.into_agent_definition());
2110
2111        let mock = Arc::new(MockLlmClient::new(vec![
2112            MockLlmClient::tool_call_response(
2113                "t1",
2114                "write",
2115                serde_json::json!({
2116                    "file_path": workspace.path().join("auto.txt").to_string_lossy(),
2117                    "content": "AUTO_APPROVED"
2118                }),
2119            ),
2120            MockLlmClient::text_response("Written."),
2121        ]));
2122
2123        let executor = TaskExecutor::new(
2124            Arc::new(registry),
2125            mock,
2126            workspace.path().to_string_lossy().to_string(),
2127        );
2128
2129        let result = executor
2130            .execute(
2131                TaskParams {
2132                    agent: "reader-writer".to_string(),
2133                    description: "Write via auto-approve".to_string(),
2134                    prompt: "Write auto.txt".to_string(),
2135                    background: false,
2136                    max_steps: Some(3),
2137                },
2138                None,
2139                None,
2140            )
2141            .await
2142            .unwrap();
2143
2144        assert!(
2145            result.success,
2146            "Ask should be auto-approved: {}",
2147            result.output
2148        );
2149        assert!(
2150            !result.output.contains("MissingConfirmationManager"),
2151            "no MissingConfirmationManager: {}",
2152            result.output
2153        );
2154    }
2155
2156    #[tokio::test]
2157    async fn task_child_run_step_budget_enforced() {
2158        let workspace = tempfile::tempdir().unwrap();
2159        let mock = Arc::new(MockLlmClient::new(vec![
2160            MockLlmClient::tool_call_response(
2161                "t1",
2162                "read",
2163                serde_json::json!({"file_path": "/tmp/a.txt"}),
2164            ),
2165            MockLlmClient::tool_call_response(
2166                "t2",
2167                "read",
2168                serde_json::json!({"file_path": "/tmp/b.txt"}),
2169            ),
2170            MockLlmClient::tool_call_response(
2171                "t3",
2172                "read",
2173                serde_json::json!({"file_path": "/tmp/c.txt"}),
2174            ),
2175            MockLlmClient::text_response("Should not reach here."),
2176        ]));
2177
2178        let executor = TaskExecutor::new(
2179            test_registry_with_writer(),
2180            mock,
2181            workspace.path().to_string_lossy().to_string(),
2182        );
2183
2184        let result = executor
2185            .execute(
2186                TaskParams {
2187                    agent: "writer".to_string(),
2188                    description: "Exceed budget".to_string(),
2189                    prompt: "Read many files".to_string(),
2190                    background: false,
2191                    max_steps: Some(2),
2192                },
2193                None,
2194                None,
2195            )
2196            .await
2197            .unwrap();
2198
2199        // The agent should fail after exceeding 2 tool rounds
2200        assert!(
2201            !result.success,
2202            "should fail when exceeding step budget: {}",
2203            result.output
2204        );
2205        assert!(
2206            result.output.contains("Max tool rounds") || result.output.contains("max tool rounds"),
2207            "error should mention tool rounds: {}",
2208            result.output
2209        );
2210    }
2211
2212    #[tokio::test]
2213    async fn parallel_task_executor_runs_children_concurrently_and_preserves_input_order() {
2214        let workspace = tempfile::tempdir().unwrap();
2215        let client = Arc::new(ConcurrentLlmClient::new(2));
2216        let executor = Arc::new(TaskExecutor::new(
2217            test_registry_with_text_worker(),
2218            client.clone(),
2219            workspace.path().to_string_lossy().to_string(),
2220        ));
2221
2222        let tasks = vec![
2223            TaskParams {
2224                agent: "worker".to_string(),
2225                description: "Slow task".to_string(),
2226                prompt: "slow branch".to_string(),
2227                background: false,
2228                max_steps: Some(1),
2229            },
2230            TaskParams {
2231                agent: "worker".to_string(),
2232                description: "Fast task".to_string(),
2233                prompt: "fast branch".to_string(),
2234                background: false,
2235                max_steps: Some(1),
2236            },
2237        ];
2238
2239        let results = tokio::time::timeout(
2240            Duration::from_secs(2),
2241            executor.execute_parallel(tasks, None, None),
2242        )
2243        .await
2244        .expect("parallel children should reach the barrier and complete");
2245
2246        assert_eq!(results.len(), 2);
2247        assert!(
2248            client.max_active() >= 2,
2249            "expected concurrent child execution, max_active={}",
2250            client.max_active()
2251        );
2252        assert!(results[0].success);
2253        assert!(results[0].output.contains("slow branch"));
2254        assert!(results[1].success);
2255        assert!(results[1].output.contains("fast branch"));
2256    }
2257
2258    #[tokio::test]
2259    async fn parallel_task_executor_respects_configured_concurrency_limit() {
2260        let workspace = tempfile::tempdir().unwrap();
2261        let client = Arc::new(LimitedConcurrencyLlmClient::new());
2262        let executor = Arc::new(
2263            TaskExecutor::new(
2264                test_registry_with_text_worker(),
2265                client.clone(),
2266                workspace.path().to_string_lossy().to_string(),
2267            )
2268            .with_max_parallel_tasks(2),
2269        );
2270
2271        let tasks = (0..5)
2272            .map(|idx| TaskParams {
2273                agent: "worker".to_string(),
2274                description: format!("Task {idx}"),
2275                prompt: format!("branch {idx}"),
2276                background: false,
2277                max_steps: Some(1),
2278            })
2279            .collect::<Vec<_>>();
2280
2281        let results = executor.execute_parallel(tasks, None, None).await;
2282
2283        assert_eq!(results.len(), 5);
2284        assert!(results.iter().all(|result| result.success));
2285        assert_eq!(client.max_active(), 2);
2286    }
2287
2288    #[tokio::test]
2289    async fn parallel_task_executor_isolates_unknown_agent_failure() {
2290        let workspace = tempfile::tempdir().unwrap();
2291        let executor = Arc::new(TaskExecutor::new(
2292            test_registry_with_text_worker(),
2293            Arc::new(StaticLlmClient::new("valid branch done")),
2294            workspace.path().to_string_lossy().to_string(),
2295        ));
2296
2297        let tasks = vec![
2298            TaskParams {
2299                agent: "missing-agent".to_string(),
2300                description: "Missing".to_string(),
2301                prompt: "should fail".to_string(),
2302                background: false,
2303                max_steps: Some(1),
2304            },
2305            TaskParams {
2306                agent: "worker".to_string(),
2307                description: "Valid".to_string(),
2308                prompt: "should succeed".to_string(),
2309                background: false,
2310                max_steps: Some(1),
2311            },
2312        ];
2313
2314        let results = executor.execute_parallel(tasks, None, None).await;
2315
2316        assert_eq!(results.len(), 2);
2317        assert!(!results[0].success);
2318        assert_eq!(results[0].agent, "missing-agent");
2319        assert!(results[0].output.contains("Unknown agent type"));
2320        assert!(results[1].success);
2321        assert_eq!(results[1].agent, "worker");
2322        assert!(results[1].output.contains("valid branch done"));
2323    }
2324
2325    #[tokio::test]
2326    async fn parallel_task_executor_emits_subagent_events_for_each_child() {
2327        let workspace = tempfile::tempdir().unwrap();
2328        let executor = Arc::new(TaskExecutor::new(
2329            test_registry_with_text_worker(),
2330            Arc::new(StaticLlmClient::new("done")),
2331            workspace.path().to_string_lossy().to_string(),
2332        ));
2333        let (tx, mut rx) = broadcast::channel(64);
2334
2335        let tasks = vec![
2336            TaskParams {
2337                agent: "worker".to_string(),
2338                description: "One".to_string(),
2339                prompt: "first".to_string(),
2340                background: false,
2341                max_steps: Some(1),
2342            },
2343            TaskParams {
2344                agent: "worker".to_string(),
2345                description: "Two".to_string(),
2346                prompt: "second".to_string(),
2347                background: false,
2348                max_steps: Some(1),
2349            },
2350        ];
2351
2352        let results = executor.execute_parallel(tasks, Some(tx), None).await;
2353        assert_eq!(results.len(), 2);
2354        tokio::time::sleep(Duration::from_millis(20)).await;
2355
2356        let mut starts = Vec::new();
2357        let mut ends = Vec::new();
2358        let mut progress_statuses: Vec<String> = Vec::new();
2359        while let Ok(event) = rx.try_recv() {
2360            match event {
2361                AgentEvent::SubagentStart { description, .. } => starts.push(description),
2362                AgentEvent::SubagentEnd { agent, success, .. } => ends.push((agent, success)),
2363                AgentEvent::SubagentProgress { status, .. } => progress_statuses.push(status),
2364                _ => {}
2365            }
2366        }
2367
2368        starts.sort();
2369        assert_eq!(starts, vec!["One".to_string(), "Two".to_string()]);
2370        assert_eq!(ends.len(), 2);
2371        assert!(ends
2372            .iter()
2373            .all(|(agent, success)| agent == "worker" && *success));
2374        // Each child loop emits at least one TurnEnd, so we expect at least
2375        // two synthesized turn_completed progress events across the run.
2376        assert!(
2377            progress_statuses
2378                .iter()
2379                .filter(|s| s == &"turn_completed")
2380                .count()
2381                >= 2,
2382            "expected at least two turn_completed progress events, got {:?}",
2383            progress_statuses
2384        );
2385    }
2386
2387    #[tokio::test]
2388    async fn parallel_task_tool_reports_error_when_any_child_fails() {
2389        let workspace = tempfile::tempdir().unwrap();
2390        let executor = Arc::new(TaskExecutor::new(
2391            test_registry_with_text_worker(),
2392            Arc::new(StaticLlmClient::new("valid branch done")),
2393            workspace.path().to_string_lossy().to_string(),
2394        ));
2395        let tool = ParallelTaskTool::new(executor);
2396        let ctx = ToolContext::new(workspace.path().to_path_buf());
2397
2398        let output = tool
2399            .execute(
2400                &serde_json::json!({
2401                    "tasks": [
2402                        {
2403                            "agent": "missing-agent",
2404                            "description": "Missing",
2405                            "prompt": "should fail"
2406                        },
2407                        {
2408                            "agent": "worker",
2409                            "description": "Valid",
2410                            "prompt": "should succeed"
2411                        }
2412                    ]
2413                }),
2414                &ctx,
2415            )
2416            .await
2417            .unwrap();
2418
2419        assert!(
2420            !output.success,
2421            "parallel_task should fail when any child result fails"
2422        );
2423        assert!(output.content.contains("[ERR]"));
2424        assert!(output.content.contains("[OK]"));
2425        let metadata = output.metadata.expect("metadata");
2426        assert_eq!(metadata["task_count"], 2);
2427        assert_eq!(metadata["results"][0]["success"], false);
2428        assert_eq!(metadata["results"][1]["success"], true);
2429    }
2430
2431    #[tokio::test]
2432    async fn parallel_task_both_inherit_permissions() {
2433        let workspace = tempfile::tempdir().unwrap();
2434        let mock = Arc::new(MockLlmClient::new(vec![
2435            // Task 1 responses
2436            MockLlmClient::tool_call_response(
2437                "t1",
2438                "write",
2439                serde_json::json!({
2440                    "file_path": workspace.path().join("p1.txt").to_string_lossy(),
2441                    "content": "P1"
2442                }),
2443            ),
2444            MockLlmClient::text_response("Done 1."),
2445            // Task 2 responses
2446            MockLlmClient::tool_call_response(
2447                "t2",
2448                "write",
2449                serde_json::json!({
2450                    "file_path": workspace.path().join("p2.txt").to_string_lossy(),
2451                    "content": "P2"
2452                }),
2453            ),
2454            MockLlmClient::text_response("Done 2."),
2455        ]));
2456
2457        let executor = Arc::new(TaskExecutor::new(
2458            test_registry_with_writer(),
2459            mock,
2460            workspace.path().to_string_lossy().to_string(),
2461        ));
2462
2463        let tasks = vec![
2464            TaskParams {
2465                agent: "writer".to_string(),
2466                description: "Write p1".to_string(),
2467                prompt: "Write p1.txt".to_string(),
2468                background: false,
2469                max_steps: Some(3),
2470            },
2471            TaskParams {
2472                agent: "writer".to_string(),
2473                description: "Write p2".to_string(),
2474                prompt: "Write p2.txt".to_string(),
2475                background: false,
2476                max_steps: Some(3),
2477            },
2478        ];
2479
2480        let results = executor.execute_parallel(tasks, None, None).await;
2481        assert_eq!(results.len(), 2);
2482
2483        for result in &results {
2484            assert!(
2485                result.success,
2486                "parallel child should succeed: {}",
2487                result.output
2488            );
2489        }
2490    }
2491
2492    #[test]
2493    fn synthesize_progress_emits_tool_completed_for_tool_end() {
2494        let event = AgentEvent::ToolEnd {
2495            id: "call-1".to_string(),
2496            name: "bash".to_string(),
2497            output: "hello".to_string(),
2498            exit_code: 0,
2499            metadata: None,
2500            error_kind: None,
2501        };
2502        let progress =
2503            synthesize_subagent_progress(&event, "task-1", "task-run-task-1").expect("some");
2504        match progress {
2505            AgentEvent::SubagentProgress {
2506                task_id,
2507                session_id,
2508                status,
2509                metadata,
2510            } => {
2511                assert_eq!(task_id, "task-1");
2512                assert_eq!(session_id, "task-run-task-1");
2513                assert_eq!(status, "tool_completed");
2514                assert_eq!(metadata["tool"], "bash");
2515                assert_eq!(metadata["exit_code"], 0);
2516                assert_eq!(metadata["output_bytes"], 5);
2517                assert!(metadata.get("error_kind").is_none());
2518            }
2519            other => panic!("expected SubagentProgress, got {:?}", other),
2520        }
2521    }
2522
2523    #[test]
2524    fn synthesize_progress_includes_error_kind_when_present() {
2525        let event = AgentEvent::ToolEnd {
2526            id: "call-2".to_string(),
2527            name: "edit".to_string(),
2528            output: "boom".to_string(),
2529            exit_code: 1,
2530            metadata: None,
2531            error_kind: Some(crate::tools::ToolErrorKind::NotFound {
2532                path: "missing.txt".to_string(),
2533            }),
2534        };
2535        let progress =
2536            synthesize_subagent_progress(&event, "task-x", "task-run-task-x").expect("some");
2537        if let AgentEvent::SubagentProgress { metadata, .. } = progress {
2538            assert!(
2539                metadata.get("error_kind").is_some(),
2540                "error_kind should propagate into metadata"
2541            );
2542        } else {
2543            panic!("expected SubagentProgress");
2544        }
2545    }
2546
2547    #[test]
2548    fn synthesize_progress_emits_turn_completed_for_turn_end() {
2549        let event = AgentEvent::TurnEnd {
2550            turn: 3,
2551            usage: crate::llm::TokenUsage {
2552                prompt_tokens: 100,
2553                completion_tokens: 25,
2554                total_tokens: 125,
2555                cache_read_tokens: None,
2556                cache_write_tokens: None,
2557            },
2558        };
2559        let progress =
2560            synthesize_subagent_progress(&event, "task-1", "task-run-task-1").expect("some");
2561        if let AgentEvent::SubagentProgress {
2562            status, metadata, ..
2563        } = progress
2564        {
2565            assert_eq!(status, "turn_completed");
2566            assert_eq!(metadata["turn"], 3);
2567            assert_eq!(metadata["total_tokens"], 125);
2568            assert_eq!(metadata["prompt_tokens"], 100);
2569            assert_eq!(metadata["completion_tokens"], 25);
2570        } else {
2571            panic!("expected SubagentProgress");
2572        }
2573    }
2574
2575    #[test]
2576    fn synthesize_progress_ignores_unrelated_events() {
2577        let ignored = [
2578            AgentEvent::TextDelta {
2579                text: "hi".to_string(),
2580            },
2581            AgentEvent::ToolStart {
2582                id: "x".to_string(),
2583                name: "bash".to_string(),
2584            },
2585            AgentEvent::TurnStart { turn: 1 },
2586            AgentEvent::SubagentStart {
2587                task_id: "nested".to_string(),
2588                session_id: "nested-run".to_string(),
2589                parent_session_id: "parent".to_string(),
2590                agent: "explore".to_string(),
2591                description: "nested".to_string(),
2592            },
2593        ];
2594        for event in &ignored {
2595            assert!(
2596                synthesize_subagent_progress(event, "task", "session").is_none(),
2597                "{:?} should not emit progress",
2598                event
2599            );
2600        }
2601    }
2602}