Skip to main content

zeph_core/orchestration/
scheduler.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! DAG execution scheduler: drives task graph execution by emitting `SchedulerAction` commands.
5
6use std::collections::{HashMap, VecDeque};
7use std::fmt::Write as _;
8use std::path::PathBuf;
9use std::time::{Duration, Instant};
10
11use tokio::sync::mpsc;
12
13use super::dag;
14use super::error::OrchestrationError;
15use super::graph::{GraphStatus, TaskGraph, TaskId, TaskNode, TaskResult, TaskStatus};
16use super::router::AgentRouter;
17use crate::config::OrchestrationConfig;
18use crate::sanitizer::{
19    ContentIsolationConfig, ContentSanitizer, ContentSource, ContentSourceKind,
20};
21use crate::subagent::SubAgentDef;
22use crate::subagent::error::SubAgentError;
23
/// Actions the scheduler requests the caller to perform.
///
/// The scheduler never holds `&mut SubAgentManager` — it produces these
/// actions for the caller to execute (ADR-026 command pattern).
#[derive(Debug)]
pub enum SchedulerAction {
    /// Spawn a sub-agent for a task.
    ///
    /// The caller reports the result back through `DagScheduler::record_spawn`
    /// or `DagScheduler::record_spawn_failure`.
    Spawn {
        /// Task the sub-agent should execute.
        task_id: TaskId,
        /// Name of the agent definition selected by the router.
        agent_def_name: String,
        /// Prompt built for the task (description plus dependency context, if any).
        prompt: String,
    },
    /// Cancel a running sub-agent (on graph abort/skip).
    ///
    /// Also emitted for tasks that exceed the per-task timeout and by
    /// `DagScheduler::cancel_all`.
    Cancel { agent_handle_id: String },
    /// Execute a task inline via the main agent (no sub-agents configured).
    ///
    /// Emitted when the router returns no agent for a ready task.
    RunInline { task_id: TaskId, prompt: String },
    /// Graph reached a terminal or paused state.
    Done { status: GraphStatus },
}
43
/// Event sent by a sub-agent loop when it terminates.
///
/// Delivered over the channel whose sender is handed out by
/// `DagScheduler::event_sender`.
#[derive(Debug)]
pub struct TaskEvent {
    /// Task the event belongs to.
    pub task_id: TaskId,
    /// Handle of the agent that produced the event. The scheduler compares it with the
    /// handle recorded for the task and discards the event when they differ.
    pub agent_handle_id: String,
    /// Whether the agent completed or failed.
    pub outcome: TaskOutcome,
}
51
/// Outcome of a sub-agent execution.
#[derive(Debug)]
pub enum TaskOutcome {
    /// Agent completed successfully.
    ///
    /// Both fields are stored on the task's `TaskResult`.
    Completed {
        /// Text output produced by the agent.
        output: String,
        /// Paths of artifacts produced by the agent.
        artifacts: Vec<PathBuf>,
    },
    /// Agent failed.
    ///
    /// The scheduler logs at most the first 512 characters of `error`.
    Failed { error: String },
}
63
/// Tracks a running task's spawn time and definition name for timeout detection.
struct RunningTask {
    /// Handle of the spawned agent; used to emit `Cancel` actions and to reject
    /// events coming from a different agent incarnation.
    agent_handle_id: String,
    /// Agent definition name, copied into `TaskResult::agent_def` on completion.
    agent_def_name: String,
    /// When the task was recorded as running; basis for timeout checks and `duration_ms`.
    started_at: Instant,
}
70
/// DAG execution engine.
///
/// Drives task graph execution by producing `SchedulerAction` values
/// that the caller executes against `SubAgentManager`.
///
/// # Caller Loop
///
/// ```text
/// loop {
///     let actions = scheduler.tick();
///     for action in actions {
///         match action {
///             Spawn { task_id, agent_def_name, prompt } => {
///                 match manager.spawn_for_task(...) {
///                     Ok(handle_id) => scheduler.record_spawn(task_id, handle_id, agent_def_name),
///                     Err(e) => { for a in scheduler.record_spawn_failure(task_id, &e) { /* exec */ } }
///                 }
///             }
///             Cancel { agent_handle_id } => { manager.cancel(&agent_handle_id); }
///             RunInline { task_id, prompt } => { /* run the prompt on the main agent */ }
///             Done { .. } => break,
///         }
///     }
///     // Once per tick, after the whole spawn batch has been attempted.
///     scheduler.record_batch_backoff(any_success, any_concurrency_failure);
///     scheduler.wait_event().await;
/// }
/// ```
pub struct DagScheduler {
    /// Task graph being executed; its task and graph statuses are updated in place.
    graph: TaskGraph,
    /// Configured parallelism (`config.max_parallel`); reported in the `Debug` output.
    max_parallel: usize,
    /// Maps `TaskId` -> running sub-agent state.
    running: HashMap<TaskId, RunningTask>,
    /// Receives completion/failure events from sub-agent loops.
    event_rx: mpsc::Receiver<TaskEvent>,
    /// Sender cloned into each spawned sub-agent via `spawn_for_task`.
    event_tx: mpsc::Sender<TaskEvent>,
    /// Per-task wall-clock timeout.
    task_timeout: Duration,
    /// Router for agent selection.
    router: Box<dyn AgentRouter>,
    /// Available agent definitions (cached from `SubAgentManager`).
    available_agents: Vec<SubAgentDef>,
    /// Total character budget for cross-task dependency context injection.
    dependency_context_budget: usize,
    /// Events buffered by `wait_event` for processing in the next `tick`.
    buffered_events: VecDeque<TaskEvent>,
    /// Sanitizer for dependency output injected into task prompts (SEC-ORCH-01).
    sanitizer: ContentSanitizer,
    /// Backoff duration before retrying deferred tasks when all ready tasks hit the concurrency limit.
    deferral_backoff: Duration,
    /// Consecutive spawn failures due to concurrency limits. Used to compute exponential backoff.
    consecutive_spawn_failures: u32,
}
122
123impl std::fmt::Debug for DagScheduler {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        f.debug_struct("DagScheduler")
126            .field("graph_id", &self.graph.id)
127            .field("graph_status", &self.graph.status)
128            .field("running_count", &self.running.len())
129            .field("max_parallel", &self.max_parallel)
130            .field("task_timeout_secs", &self.task_timeout.as_secs())
131            .finish_non_exhaustive()
132    }
133}
134
135impl DagScheduler {
136    /// Create a new scheduler for the given graph.
137    ///
138    /// The graph must be in `Created` status. The scheduler transitions
139    /// it to `Running` and marks root tasks as `Ready`.
140    ///
141    /// # Errors
142    ///
143    /// Returns `OrchestrationError::InvalidGraph` if the graph is not in
144    /// `Created` status or has no tasks.
145    pub fn new(
146        mut graph: TaskGraph,
147        config: &OrchestrationConfig,
148        router: Box<dyn AgentRouter>,
149        available_agents: Vec<SubAgentDef>,
150    ) -> Result<Self, OrchestrationError> {
151        if graph.status != GraphStatus::Created {
152            return Err(OrchestrationError::InvalidGraph(format!(
153                "graph must be in Created status, got {}",
154                graph.status
155            )));
156        }
157
158        dag::validate(&graph.tasks, config.max_tasks as usize)?;
159
160        graph.status = GraphStatus::Running;
161
162        for task in &mut graph.tasks {
163            if task.depends_on.is_empty() && task.status == TaskStatus::Pending {
164                task.status = TaskStatus::Ready;
165            }
166        }
167
168        let (event_tx, event_rx) = mpsc::channel(64);
169
170        let task_timeout = if config.task_timeout_secs > 0 {
171            Duration::from_secs(config.task_timeout_secs)
172        } else {
173            Duration::from_secs(600)
174        };
175
176        Ok(Self {
177            graph,
178            max_parallel: config.max_parallel as usize,
179            running: HashMap::new(),
180            event_rx,
181            event_tx,
182            task_timeout,
183            router,
184            available_agents,
185            dependency_context_budget: config.dependency_context_budget,
186            buffered_events: VecDeque::new(),
187            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
188            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
189            consecutive_spawn_failures: 0,
190        })
191    }
192
193    /// Create a scheduler from a graph that is in `Paused` or `Failed` status.
194    ///
195    /// Used for resume and retry flows. The caller is responsible for calling
196    /// [`dag::reset_for_retry`] (for retry) before passing the graph here.
197    ///
198    /// This constructor sets `graph.status = Running` (II3) and reconstructs
199    /// the `running` map from tasks that are still in `Running` state (IC1), so
200    /// their completion events are not silently dropped on the next tick.
201    ///
202    /// # Errors
203    ///
204    /// Returns `OrchestrationError::InvalidGraph` if the graph is in `Completed`
205    /// or `Canceled` status (terminal states that cannot be resumed).
206    pub fn resume_from(
207        mut graph: TaskGraph,
208        config: &OrchestrationConfig,
209        router: Box<dyn AgentRouter>,
210        available_agents: Vec<SubAgentDef>,
211    ) -> Result<Self, OrchestrationError> {
212        if graph.status == GraphStatus::Completed || graph.status == GraphStatus::Canceled {
213            return Err(OrchestrationError::InvalidGraph(format!(
214                "cannot resume a {} graph; only Paused, Failed, or Running graphs are resumable",
215                graph.status
216            )));
217        }
218
219        // II3: ensure the graph is in Running state so tick() does not immediately
220        // return Done{Paused}.
221        graph.status = GraphStatus::Running;
222
223        // IC1: reconstruct the `running` map from tasks that were still Running at
224        // pause time. Without this their completion events would arrive but
225        // process_event would ignore them (it checks self.running), leaving the
226        // task stuck until timeout.
227        let running: HashMap<TaskId, RunningTask> = graph
228            .tasks
229            .iter()
230            .filter(|t| t.status == TaskStatus::Running)
231            .filter_map(|t| {
232                let handle_id = t.assigned_agent.clone()?;
233                let def_name = t.agent_hint.clone().unwrap_or_default();
234                Some((
235                    t.id,
236                    RunningTask {
237                        agent_handle_id: handle_id,
238                        agent_def_name: def_name,
239                        // Conservative: treat as just-started so timeout window is reset.
240                        started_at: Instant::now(),
241                    },
242                ))
243            })
244            .collect();
245
246        let (event_tx, event_rx) = mpsc::channel(64);
247
248        let task_timeout = if config.task_timeout_secs > 0 {
249            Duration::from_secs(config.task_timeout_secs)
250        } else {
251            Duration::from_secs(600)
252        };
253
254        Ok(Self {
255            graph,
256            max_parallel: config.max_parallel as usize,
257            running,
258            event_rx,
259            event_tx,
260            task_timeout,
261            router,
262            available_agents,
263            dependency_context_budget: config.dependency_context_budget,
264            buffered_events: VecDeque::new(),
265            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
266            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
267            consecutive_spawn_failures: 0,
268        })
269    }
270
    /// Get a clone of the event sender for injection into sub-agent loops.
    ///
    /// Events sent through the returned handle arrive on the scheduler's own
    /// receiver, which `tick` and `wait_event` read from.
    #[must_use]
    pub fn event_sender(&self) -> mpsc::Sender<TaskEvent> {
        self.event_tx.clone()
    }
276
    /// Immutable reference to the current graph state.
    ///
    /// Reflects all status changes applied by the scheduler so far.
    #[must_use]
    pub fn graph(&self) -> &TaskGraph {
        &self.graph
    }
282
    /// Return the final graph state.
    ///
    /// Clones the graph since `Drop` is implemented on the scheduler.
    ///
    /// Despite the `into_` prefix this borrows `self`; the scheduler remains usable
    /// after the call, and the returned graph is an independent copy.
    #[must_use]
    pub fn into_graph(&self) -> TaskGraph {
        self.graph.clone()
    }
290}
291
292impl Drop for DagScheduler {
293    fn drop(&mut self) {
294        if !self.running.is_empty() {
295            tracing::warn!(
296                running_tasks = self.running.len(),
297                "DagScheduler dropped with running tasks; agents may continue until their \
298                 CancellationToken fires or they complete naturally"
299            );
300        }
301    }
302}
303
304impl DagScheduler {
305    /// Process pending events and produce actions for the caller.
306    ///
307    /// Call `wait_event` after processing all actions to block until the next event.
308    pub fn tick(&mut self) -> Vec<SchedulerAction> {
309        if self.graph.status != GraphStatus::Running {
310            return vec![SchedulerAction::Done {
311                status: self.graph.status,
312            }];
313        }
314
315        let mut actions = Vec::new();
316
317        // Drain events buffered by wait_event, then any new ones in the channel.
318        while let Some(event) = self.buffered_events.pop_front() {
319            let cancel_actions = self.process_event(event);
320            actions.extend(cancel_actions);
321        }
322        while let Ok(event) = self.event_rx.try_recv() {
323            let cancel_actions = self.process_event(event);
324            actions.extend(cancel_actions);
325        }
326
327        if self.graph.status != GraphStatus::Running {
328            return actions;
329        }
330
331        // Check for timed-out tasks.
332        let timeout_actions = self.check_timeouts();
333        actions.extend(timeout_actions);
334
335        if self.graph.status != GraphStatus::Running {
336            return actions;
337        }
338
339        // Dispatch ALL ready tasks. Concurrency is enforced by SubAgentManager::spawn()
340        // which returns ConcurrencyLimit when active + reserved >= max_concurrent.
341        // Non-transient spawn failures are handled by record_spawn_failure(); optimistic
342        // Running marks are reverted to Ready for ConcurrencyLimit errors.
343        let ready = dag::ready_tasks(&self.graph);
344
345        for task_id in ready {
346            let task = &self.graph.tasks[task_id.index()];
347
348            let Some(agent_def_name) = self.router.route(task, &self.available_agents) else {
349                tracing::debug!(
350                    task_id = %task_id,
351                    title = %task.title,
352                    "no agent available, routing task to main agent inline"
353                );
354                let prompt = self.build_task_prompt(task);
355                self.graph.tasks[task_id.index()].status = TaskStatus::Running;
356                actions.push(SchedulerAction::RunInline { task_id, prompt });
357                continue;
358            };
359
360            let prompt = self.build_task_prompt(task);
361
362            // Mark task as Running optimistically (before record_spawn is called).
363            self.graph.tasks[task_id.index()].status = TaskStatus::Running;
364
365            actions.push(SchedulerAction::Spawn {
366                task_id,
367                agent_def_name,
368                prompt,
369            });
370        }
371
372        // Check for completion or deadlock.
373        // Use graph Running status count to avoid false positives while Spawn actions
374        // are in-flight (record_spawn hasn't been called yet for freshly emitted spawns).
375        // Note: non-transient spawn failures (e.g. capability errors) are handled by
376        // record_spawn_failure() which marks the task Failed and propagates failure per
377        // the task's FailureStrategy — this detector does not fire for those cases because
378        // failed tasks are terminal and dag::ready_tasks() returns their unblocked dependents.
379        // ConcurrencyLimit errors are transient: record_spawn_failure() reverts the task
380        // from Running back to Ready, so ready_tasks() is non-empty and deadlock is not
381        // triggered.
382        let running_in_graph_now = self
383            .graph
384            .tasks
385            .iter()
386            .filter(|t| t.status == TaskStatus::Running)
387            .count();
388        if running_in_graph_now == 0 && self.running.is_empty() {
389            let all_terminal = self.graph.tasks.iter().all(|t| t.status.is_terminal());
390            if all_terminal {
391                self.graph.status = GraphStatus::Completed;
392                self.graph.finished_at = Some(super::graph::chrono_now());
393                actions.push(SchedulerAction::Done {
394                    status: GraphStatus::Completed,
395                });
396            } else if dag::ready_tasks(&self.graph).is_empty() {
397                tracing::error!(
398                    "scheduler deadlock: no running or ready tasks, but graph not complete"
399                );
400                self.graph.status = GraphStatus::Failed;
401                self.graph.finished_at = Some(super::graph::chrono_now());
402                actions.push(SchedulerAction::Done {
403                    status: GraphStatus::Failed,
404                });
405            }
406        }
407
408        actions
409    }
410
411    /// Wait for the next event from a running sub-agent.
412    ///
413    /// Buffers the received event for processing in the next `tick` call.
414    /// Returns immediately if no tasks are running. Uses a timeout so that
415    /// periodic timeout checking can occur.
416    /// Compute the current deferral backoff with exponential growth capped at 5 seconds.
417    ///
418    /// Each consecutive spawn failure due to concurrency limits doubles the base backoff.
419    fn current_deferral_backoff(&self) -> Duration {
420        const MAX_BACKOFF: Duration = Duration::from_secs(5);
421        let multiplier = 1u32
422            .checked_shl(self.consecutive_spawn_failures.min(10))
423            .unwrap_or(u32::MAX);
424        self.deferral_backoff
425            .saturating_mul(multiplier)
426            .min(MAX_BACKOFF)
427    }
428
429    pub async fn wait_event(&mut self) {
430        if self.running.is_empty() {
431            tokio::time::sleep(self.current_deferral_backoff()).await;
432            return;
433        }
434
435        // Find the nearest timeout deadline among running tasks.
436        let nearest_timeout = self
437            .running
438            .values()
439            .map(|r| {
440                self.task_timeout
441                    .checked_sub(r.started_at.elapsed())
442                    .unwrap_or(Duration::ZERO)
443            })
444            .min()
445            .unwrap_or(Duration::from_secs(1));
446
447        // Clamp to at least 100 ms to avoid busy-looping.
448        let wait_duration = nearest_timeout.max(Duration::from_millis(100));
449
450        tokio::select! {
451            Some(event) = self.event_rx.recv() => {
452                // SEC-ORCH-02: guard against unbounded buffer growth. Use total task
453                // count rather than max_parallel so that parallel bursts exceeding
454                // max_parallel do not cause premature event drops.
455                if self.buffered_events.len() >= self.graph.tasks.len() * 2 {
456                    // PERF-SCHED-02: log at error level — a dropped completion event
457                    // leaves a task stuck in Running until its timeout fires.
458                    if let Some(dropped) = self.buffered_events.pop_front() {
459                        tracing::error!(
460                            task_id = %dropped.task_id,
461                            buffer_len = self.buffered_events.len(),
462                            "event buffer saturated; completion event dropped — task may \
463                             remain Running until timeout"
464                        );
465                    }
466                }
467                self.buffered_events.push_back(event);
468            }
469            () = tokio::time::sleep(wait_duration) => {}
470        }
471    }
472
473    /// Record that a spawn action was successfully executed.
474    ///
475    /// Called by the caller after successfully spawning via `SubAgentManager`.
476    ///
477    /// Resets `consecutive_spawn_failures` to 0 as a "spawn succeeded = scheduler healthy"
478    /// signal. This is intentionally separate from the batch-level backoff in
479    /// [`record_batch_backoff`]: `record_spawn` provides an immediate reset on the first
480    /// success within a batch, while `record_batch_backoff` governs the tick-granular
481    /// failure counter used for exponential wait backoff.
482    pub fn record_spawn(
483        &mut self,
484        task_id: TaskId,
485        agent_handle_id: String,
486        agent_def_name: String,
487    ) {
488        self.consecutive_spawn_failures = 0;
489        self.graph.tasks[task_id.index()].assigned_agent = Some(agent_handle_id.clone());
490        self.running.insert(
491            task_id,
492            RunningTask {
493                agent_handle_id,
494                agent_def_name,
495                started_at: Instant::now(),
496            },
497        );
498    }
499
500    /// Handle a failed spawn attempt.
501    ///
502    /// If the error is a transient concurrency-limit rejection, reverts the task from
503    /// Running back to `Ready` so the next [`tick`] can retry the spawn when a slot opens.
504    /// Otherwise, marks the task as `Failed` and propagates failure.
505    /// Returns any cancel actions needed.
506    ///
507    /// # Errors (via returned actions)
508    ///
509    /// Propagates failure per the task's effective `FailureStrategy`.
510    pub fn record_spawn_failure(
511        &mut self,
512        task_id: TaskId,
513        error: &SubAgentError,
514    ) -> Vec<SchedulerAction> {
515        // Transient condition: the SubAgentManager rejected the spawn because all
516        // concurrency slots are occupied. Revert to Ready so the next tick retries.
517        // consecutive_spawn_failures is updated batch-wide by record_batch_backoff().
518        if let SubAgentError::ConcurrencyLimit { active, max } = error {
519            tracing::warn!(
520                task_id = %task_id,
521                active,
522                max,
523                next_backoff_ms = self.current_deferral_backoff().as_millis(),
524                "concurrency limit reached, deferring task to next tick"
525            );
526            self.graph.tasks[task_id.index()].status = TaskStatus::Ready;
527            return Vec::new();
528        }
529
530        // SEC-ORCH-04: truncate error to avoid logging sensitive internal details.
531        let error_excerpt: String = error.to_string().chars().take(512).collect();
532        tracing::warn!(
533            task_id = %task_id,
534            error = %error_excerpt,
535            "spawn failed, marking task failed"
536        );
537        self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
538        let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
539        let mut actions = Vec::new();
540        for cancel_task_id in cancel_ids {
541            if let Some(running) = self.running.remove(&cancel_task_id) {
542                actions.push(SchedulerAction::Cancel {
543                    agent_handle_id: running.agent_handle_id,
544                });
545            }
546        }
547        if self.graph.status != GraphStatus::Running {
548            self.graph.finished_at = Some(super::graph::chrono_now());
549            actions.push(SchedulerAction::Done {
550                status: self.graph.status,
551            });
552        }
553        actions
554    }
555
556    /// Update the batch-level backoff counter after processing a full tick's spawn batch.
557    ///
558    /// With parallel dispatch a single tick may produce N Spawn actions. Individual
559    /// per-spawn counter updates would miscount concurrent rejections as "consecutive"
560    /// failures. This method captures the batch semantics instead:
561    /// - If any spawn succeeded → reset the counter (scheduler is healthy).
562    /// - Else if any spawn hit `ConcurrencyLimit` → this entire tick was a deferral tick.
563    /// - If neither → no spawns were attempted; counter unchanged.
564    pub fn record_batch_backoff(&mut self, any_success: bool, any_concurrency_failure: bool) {
565        if any_success {
566            self.consecutive_spawn_failures = 0;
567        } else if any_concurrency_failure {
568            self.consecutive_spawn_failures = self.consecutive_spawn_failures.saturating_add(1);
569        }
570    }
571
572    /// Cancel all running tasks (for user-initiated plan cancellation).
573    ///
574    /// # Warning: Cooperative Cancellation
575    ///
576    /// Cancellation is cooperative and asynchronous. Tool operations (file writes, shell
577    /// executions) in progress at the time of cancellation complete before the agent loop
578    /// checks the cancellation token. Callers should inspect the task graph state and clean
579    /// up partially-written artifacts manually.
580    pub fn cancel_all(&mut self) -> Vec<SchedulerAction> {
581        self.graph.status = GraphStatus::Canceled;
582        self.graph.finished_at = Some(super::graph::chrono_now());
583
584        // Drain running map first to avoid split borrow issues (M3).
585        let running: Vec<(TaskId, RunningTask)> = self.running.drain().collect();
586        let mut actions: Vec<SchedulerAction> = running
587            .into_iter()
588            .map(|(task_id, r)| {
589                self.graph.tasks[task_id.index()].status = TaskStatus::Canceled;
590                SchedulerAction::Cancel {
591                    agent_handle_id: r.agent_handle_id,
592                }
593            })
594            .collect();
595
596        for task in &mut self.graph.tasks {
597            if !task.status.is_terminal() {
598                task.status = TaskStatus::Canceled;
599            }
600        }
601
602        actions.push(SchedulerAction::Done {
603            status: GraphStatus::Canceled,
604        });
605        actions
606    }
607}
608
609impl DagScheduler {
610    /// Process a single `TaskEvent` and return any cancel actions needed.
611    fn process_event(&mut self, event: TaskEvent) -> Vec<SchedulerAction> {
612        let TaskEvent {
613            task_id,
614            agent_handle_id,
615            outcome,
616        } = event;
617
618        // Guard against stale events from previous incarnations (e.g. after timeout+retry).
619        // A timed-out agent's event_tx outlives the timeout and may send a completion later.
620        match self.running.get(&task_id) {
621            Some(running) if running.agent_handle_id != agent_handle_id => {
622                tracing::warn!(
623                    task_id = %task_id,
624                    expected = %running.agent_handle_id,
625                    got = %agent_handle_id,
626                    "discarding stale event from previous agent incarnation"
627                );
628                return Vec::new();
629            }
630            None => {
631                tracing::debug!(
632                    task_id = %task_id,
633                    agent_handle_id = %agent_handle_id,
634                    "ignoring event for task not in running map"
635                );
636                return Vec::new();
637            }
638            Some(_) => {}
639        }
640
641        // Compute duration BEFORE removing from running map (C1 fix).
642        let duration_ms = self.running.get(&task_id).map_or(0, |r| {
643            u64::try_from(r.started_at.elapsed().as_millis()).unwrap_or(u64::MAX)
644        });
645        let agent_def_name = self.running.get(&task_id).map(|r| r.agent_def_name.clone());
646
647        self.running.remove(&task_id);
648
649        match outcome {
650            TaskOutcome::Completed { output, artifacts } => {
651                self.graph.tasks[task_id.index()].status = TaskStatus::Completed;
652                self.graph.tasks[task_id.index()].result = Some(TaskResult {
653                    output,
654                    artifacts,
655                    duration_ms,
656                    agent_id: Some(agent_handle_id),
657                    agent_def: agent_def_name,
658                });
659
660                // Mark newly unblocked tasks as Ready.
661                let newly_ready = dag::ready_tasks(&self.graph);
662                for ready_id in newly_ready {
663                    if self.graph.tasks[ready_id.index()].status == TaskStatus::Pending {
664                        self.graph.tasks[ready_id.index()].status = TaskStatus::Ready;
665                    }
666                }
667
668                Vec::new()
669            }
670
671            TaskOutcome::Failed { error } => {
672                // SEC-ORCH-04: truncate error to avoid logging sensitive internal details.
673                let error_excerpt: String = error.chars().take(512).collect();
674                tracing::warn!(
675                    task_id = %task_id,
676                    error = %error_excerpt,
677                    "task failed"
678                );
679                self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
680
681                let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
682                let mut actions = Vec::new();
683
684                for cancel_task_id in cancel_ids {
685                    if let Some(running) = self.running.remove(&cancel_task_id) {
686                        actions.push(SchedulerAction::Cancel {
687                            agent_handle_id: running.agent_handle_id,
688                        });
689                    }
690                }
691
692                if self.graph.status != GraphStatus::Running {
693                    self.graph.finished_at = Some(super::graph::chrono_now());
694                    actions.push(SchedulerAction::Done {
695                        status: self.graph.status,
696                    });
697                }
698
699                actions
700            }
701        }
702    }
703
704    /// Check all running tasks for timeout violations.
705    ///
706    /// # Warning: Cooperative Cancellation
707    ///
708    /// Cancel actions emitted here signal agents cooperatively. Tool operations in progress
709    /// at the time of cancellation complete before the agent loop checks the cancellation
710    /// token. Partially-written artifacts may remain on disk after cancellation.
711    fn check_timeouts(&mut self) -> Vec<SchedulerAction> {
712        let timed_out: Vec<(TaskId, String)> = self
713            .running
714            .iter()
715            .filter(|(_, r)| r.started_at.elapsed() > self.task_timeout)
716            .map(|(id, r)| (*id, r.agent_handle_id.clone()))
717            .collect();
718
719        let mut actions = Vec::new();
720        for (task_id, agent_handle_id) in timed_out {
721            tracing::warn!(
722                task_id = %task_id,
723                timeout_secs = self.task_timeout.as_secs(),
724                "task timed out"
725            );
726            self.running.remove(&task_id);
727            self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
728
729            actions.push(SchedulerAction::Cancel { agent_handle_id });
730
731            let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
732            for cancel_task_id in cancel_ids {
733                if let Some(running) = self.running.remove(&cancel_task_id) {
734                    actions.push(SchedulerAction::Cancel {
735                        agent_handle_id: running.agent_handle_id,
736                    });
737                }
738            }
739
740            if self.graph.status != GraphStatus::Running {
741                self.graph.finished_at = Some(super::graph::chrono_now());
742                actions.push(SchedulerAction::Done {
743                    status: self.graph.status,
744                });
745                break;
746            }
747        }
748
749        actions
750    }
751
752    /// Build the task prompt with dependency context injection (Section 14).
753    ///
754    /// Uses char-boundary-safe truncation (S1 fix) to avoid panics on multi-byte UTF-8.
755    /// Dependency output is sanitized (SEC-ORCH-01) and titles are XML-escaped to prevent
756    /// prompt injection via crafted task outputs.
757    fn build_task_prompt(&self, task: &TaskNode) -> String {
758        if task.depends_on.is_empty() {
759            return task.description.clone();
760        }
761
762        let completed_deps: Vec<&TaskNode> = task
763            .depends_on
764            .iter()
765            .filter_map(|dep_id| {
766                let dep = &self.graph.tasks[dep_id.index()];
767                if dep.status == TaskStatus::Completed {
768                    Some(dep)
769                } else {
770                    None
771                }
772            })
773            .collect();
774
775        if completed_deps.is_empty() {
776            return task.description.clone();
777        }
778
779        let budget_per_dep = self
780            .dependency_context_budget
781            .checked_div(completed_deps.len())
782            .unwrap_or(self.dependency_context_budget);
783
784        let mut context_block = String::from("<completed-dependencies>\n");
785
786        for dep in &completed_deps {
787            // SEC-ORCH-01: XML-escape dep.id and dep.title to prevent breaking out of the
788            // <completed-dependencies> wrapper via crafted titles.
789            let escaped_id = xml_escape(&dep.id.to_string());
790            let escaped_title = xml_escape(&dep.title);
791            let _ = writeln!(
792                context_block,
793                "## Task \"{escaped_id}\": \"{escaped_title}\" (completed)",
794            );
795
796            if let Some(ref result) = dep.result {
797                // SEC-ORCH-01: sanitize dep output to prevent prompt injection from upstream tasks.
798                let source = ContentSource::new(ContentSourceKind::A2aMessage);
799                let sanitized = self.sanitizer.sanitize(&result.output, source);
800                let safe_output = sanitized.body;
801
802                // Char-boundary-safe truncation (S1): use chars().take() instead of byte slicing.
803                let char_count = safe_output.chars().count();
804                if char_count > budget_per_dep {
805                    let truncated: String = safe_output.chars().take(budget_per_dep).collect();
806                    let _ = write!(
807                        context_block,
808                        "{truncated}...\n[truncated: {char_count} chars total]"
809                    );
810                } else {
811                    context_block.push_str(&safe_output);
812                }
813            } else {
814                context_block.push_str("[no output recorded]\n");
815            }
816            context_block.push('\n');
817        }
818
819        // Add notes for skipped deps.
820        for dep_id in &task.depends_on {
821            let dep = &self.graph.tasks[dep_id.index()];
822            if dep.status == TaskStatus::Skipped {
823                let escaped_id = xml_escape(&dep.id.to_string());
824                let escaped_title = xml_escape(&dep.title);
825                let _ = writeln!(
826                    context_block,
827                    "## Task \"{escaped_id}\": \"{escaped_title}\" (skipped -- no output available)\n",
828                );
829            }
830        }
831
832        context_block.push_str("</completed-dependencies>\n\n");
833        format!("{context_block}Your task: {}", task.description)
834    }
835}
836
/// Replace the five XML-significant characters (`<`, `>`, `&`, `"`, `'`) with their
/// entity references so the returned text cannot open or close a tag.
///
/// All other characters are copied through unchanged.
fn xml_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let entity = match ch {
            '<' => "&lt;",
            '>' => "&gt;",
            '&' => "&amp;",
            '"' => "&quot;",
            '\'' => "&#39;",
            _ => {
                escaped.push(ch);
                continue;
            }
        };
        escaped.push_str(entity);
    }
    escaped
}
852
853#[cfg(test)]
854mod tests {
855    #![allow(clippy::default_trait_access)]
856
857    use super::*;
858    use crate::orchestration::graph::{
859        FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus,
860    };
861
862    fn make_node(id: u32, deps: &[u32]) -> TaskNode {
863        let mut n = TaskNode::new(
864            id,
865            format!("task-{id}"),
866            format!("description for task {id}"),
867        );
868        n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
869        n
870    }
871
872    fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
873        let mut g = TaskGraph::new("test goal");
874        g.tasks = nodes;
875        g
876    }
877
878    fn make_def(name: &str) -> SubAgentDef {
879        use crate::subagent::def::{SkillFilter, SubAgentPermissions, ToolPolicy};
880        SubAgentDef {
881            name: name.to_string(),
882            description: format!("{name} agent"),
883            model: None,
884            tools: ToolPolicy::InheritAll,
885            disallowed_tools: vec![],
886            permissions: SubAgentPermissions::default(),
887            skills: SkillFilter::default(),
888            system_prompt: String::new(),
889            hooks: crate::subagent::SubagentHooks::default(),
890            memory: None,
891            source: None,
892            file_path: None,
893        }
894    }
895
896    fn make_config() -> crate::config::OrchestrationConfig {
897        crate::config::OrchestrationConfig {
898            enabled: true,
899            max_tasks: 20,
900            max_parallel: 4,
901            default_failure_strategy: "abort".to_string(),
902            default_max_retries: 3,
903            task_timeout_secs: 300,
904            planner_model: None,
905            planner_max_tokens: 4096,
906            dependency_context_budget: 16384,
907            confirm_before_execute: true,
908            aggregator_max_tokens: 4096,
909            deferral_backoff_ms: 250,
910        }
911    }
912
913    struct FirstRouter;
914    impl AgentRouter for FirstRouter {
915        fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
916            available.first().map(|d| d.name.clone())
917        }
918    }
919
920    struct NoneRouter;
921    impl AgentRouter for NoneRouter {
922        fn route(&self, _task: &TaskNode, _available: &[SubAgentDef]) -> Option<String> {
923            None
924        }
925    }
926
927    fn make_scheduler_with_router(graph: TaskGraph, router: Box<dyn AgentRouter>) -> DagScheduler {
928        let config = make_config();
929        let defs = vec![make_def("worker")];
930        DagScheduler::new(graph, &config, router, defs).unwrap()
931    }
932
933    fn make_scheduler(graph: TaskGraph) -> DagScheduler {
934        let config = make_config();
935        let defs = vec![make_def("worker")];
936        DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap()
937    }
938
939    // --- constructor tests ---
940
941    #[test]
942    fn test_new_validates_graph_status() {
943        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
944        graph.status = GraphStatus::Running; // wrong status
945        let config = make_config();
946        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
947        assert!(result.is_err());
948        let err = result.unwrap_err();
949        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
950    }
951
952    #[test]
953    fn test_new_marks_roots_ready() {
954        let graph = graph_from_nodes(vec![
955            make_node(0, &[]),
956            make_node(1, &[]),
957            make_node(2, &[0, 1]),
958        ]);
959        let scheduler = make_scheduler(graph);
960        assert_eq!(scheduler.graph().tasks[0].status, TaskStatus::Ready);
961        assert_eq!(scheduler.graph().tasks[1].status, TaskStatus::Ready);
962        assert_eq!(scheduler.graph().tasks[2].status, TaskStatus::Pending);
963        assert_eq!(scheduler.graph().status, GraphStatus::Running);
964    }
965
966    #[test]
967    fn test_new_validates_empty_graph() {
968        let graph = graph_from_nodes(vec![]);
969        let config = make_config();
970        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
971        assert!(result.is_err());
972    }
973
974    // --- tick tests ---
975
976    #[test]
977    fn test_tick_produces_spawn_for_ready() {
978        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
979        let mut scheduler = make_scheduler(graph);
980        let actions = scheduler.tick();
981        let spawns: Vec<_> = actions
982            .iter()
983            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
984            .collect();
985        assert_eq!(spawns.len(), 2);
986    }
987
988    #[test]
989    fn test_tick_dispatches_all_regardless_of_max_parallel() {
990        // With parallel dispatch, tick() emits Spawn for ALL ready tasks.
991        // max_parallel no longer caps the number of Spawn actions per tick;
992        // concurrency is enforced by SubAgentManager.
993        let graph = graph_from_nodes(vec![
994            make_node(0, &[]),
995            make_node(1, &[]),
996            make_node(2, &[]),
997            make_node(3, &[]),
998            make_node(4, &[]),
999        ]);
1000        let mut config = make_config();
1001        config.max_parallel = 2;
1002        let defs = vec![make_def("worker")];
1003        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1004        let actions = scheduler.tick();
1005        let spawn_count = actions
1006            .iter()
1007            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1008            .count();
1009        assert_eq!(spawn_count, 5, "all 5 ready tasks must be dispatched");
1010    }
1011
1012    #[test]
1013    fn test_tick_detects_completion() {
1014        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1015        graph.tasks[0].status = TaskStatus::Completed;
1016        let config = make_config();
1017        let defs = vec![make_def("worker")];
1018        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1019        // Manually set graph to Running since new() validated Created status
1020        // — but all tasks are terminal. tick() should detect completion.
1021        let actions = scheduler.tick();
1022        let has_done = actions.iter().any(|a| {
1023            matches!(
1024                a,
1025                SchedulerAction::Done {
1026                    status: GraphStatus::Completed
1027                }
1028            )
1029        });
1030        assert!(
1031            has_done,
1032            "should emit Done(Completed) when all tasks are terminal"
1033        );
1034    }
1035
1036    // --- completion event tests ---
1037
1038    #[test]
1039    fn test_completion_event_marks_deps_ready() {
1040        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1041        let mut scheduler = make_scheduler(graph);
1042
1043        // Simulate task 0 running.
1044        scheduler.graph.tasks[0].status = TaskStatus::Running;
1045        scheduler.running.insert(
1046            TaskId(0),
1047            RunningTask {
1048                agent_handle_id: "handle-0".to_string(),
1049                agent_def_name: "worker".to_string(),
1050                started_at: Instant::now(),
1051            },
1052        );
1053
1054        let event = TaskEvent {
1055            task_id: TaskId(0),
1056            agent_handle_id: "handle-0".to_string(),
1057            outcome: TaskOutcome::Completed {
1058                output: "done".to_string(),
1059                artifacts: vec![],
1060            },
1061        };
1062        scheduler.buffered_events.push_back(event);
1063
1064        let actions = scheduler.tick();
1065        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Completed);
1066        // Task 1 should now be Ready or Spawn action emitted.
1067        let has_spawn_1 = actions
1068            .iter()
1069            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(1)));
1070        assert!(
1071            has_spawn_1 || scheduler.graph.tasks[1].status == TaskStatus::Ready,
1072            "task 1 should be spawned or marked Ready"
1073        );
1074    }
1075
1076    #[test]
1077    fn test_failure_abort_cancels_running() {
1078        let graph = graph_from_nodes(vec![
1079            make_node(0, &[]),
1080            make_node(1, &[]),
1081            make_node(2, &[0, 1]),
1082        ]);
1083        let mut scheduler = make_scheduler(graph);
1084
1085        // Simulate tasks 0 and 1 running.
1086        scheduler.graph.tasks[0].status = TaskStatus::Running;
1087        scheduler.running.insert(
1088            TaskId(0),
1089            RunningTask {
1090                agent_handle_id: "h0".to_string(),
1091                agent_def_name: "worker".to_string(),
1092                started_at: Instant::now(),
1093            },
1094        );
1095        scheduler.graph.tasks[1].status = TaskStatus::Running;
1096        scheduler.running.insert(
1097            TaskId(1),
1098            RunningTask {
1099                agent_handle_id: "h1".to_string(),
1100                agent_def_name: "worker".to_string(),
1101                started_at: Instant::now(),
1102            },
1103        );
1104
1105        // Task 0 fails with default Abort strategy.
1106        let event = TaskEvent {
1107            task_id: TaskId(0),
1108            agent_handle_id: "h0".to_string(),
1109            outcome: TaskOutcome::Failed {
1110                error: "boom".to_string(),
1111            },
1112        };
1113        scheduler.buffered_events.push_back(event);
1114
1115        let actions = scheduler.tick();
1116        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1117        let cancel_ids: Vec<_> = actions
1118            .iter()
1119            .filter_map(|a| {
1120                if let SchedulerAction::Cancel { agent_handle_id } = a {
1121                    Some(agent_handle_id.as_str())
1122                } else {
1123                    None
1124                }
1125            })
1126            .collect();
1127        assert!(cancel_ids.contains(&"h1"), "task 1 should be canceled");
1128        assert!(
1129            actions
1130                .iter()
1131                .any(|a| matches!(a, SchedulerAction::Done { .. }))
1132        );
1133    }
1134
1135    #[test]
1136    fn test_failure_skip_propagates() {
1137        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1138        let mut scheduler = make_scheduler(graph);
1139
1140        // Set failure strategy to Skip on task 0.
1141        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
1142        scheduler.graph.tasks[0].status = TaskStatus::Running;
1143        scheduler.running.insert(
1144            TaskId(0),
1145            RunningTask {
1146                agent_handle_id: "h0".to_string(),
1147                agent_def_name: "worker".to_string(),
1148                started_at: Instant::now(),
1149            },
1150        );
1151
1152        let event = TaskEvent {
1153            task_id: TaskId(0),
1154            agent_handle_id: "h0".to_string(),
1155            outcome: TaskOutcome::Failed {
1156                error: "skip me".to_string(),
1157            },
1158        };
1159        scheduler.buffered_events.push_back(event);
1160        scheduler.tick();
1161
1162        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Skipped);
1163        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Skipped);
1164    }
1165
1166    #[test]
1167    fn test_failure_retry_reschedules() {
1168        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1169        let mut scheduler = make_scheduler(graph);
1170
1171        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
1172        scheduler.graph.tasks[0].max_retries = Some(3);
1173        scheduler.graph.tasks[0].retry_count = 0;
1174        scheduler.graph.tasks[0].status = TaskStatus::Running;
1175        scheduler.running.insert(
1176            TaskId(0),
1177            RunningTask {
1178                agent_handle_id: "h0".to_string(),
1179                agent_def_name: "worker".to_string(),
1180                started_at: Instant::now(),
1181            },
1182        );
1183
1184        let event = TaskEvent {
1185            task_id: TaskId(0),
1186            agent_handle_id: "h0".to_string(),
1187            outcome: TaskOutcome::Failed {
1188                error: "transient".to_string(),
1189            },
1190        };
1191        scheduler.buffered_events.push_back(event);
1192        let actions = scheduler.tick();
1193
1194        // Task should be rescheduled (Ready) and a Spawn action emitted.
1195        let has_spawn = actions
1196            .iter()
1197            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
1198        assert!(
1199            has_spawn || scheduler.graph.tasks[0].status == TaskStatus::Ready,
1200            "retry should produce spawn or Ready status"
1201        );
1202        // retry_count incremented
1203        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
1204    }
1205
1206    #[test]
1207    fn test_process_event_failed_retry() {
1208        // End-to-end: send Failed event, verify retry path produces Ready -> Spawn.
1209        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1210        let mut scheduler = make_scheduler(graph);
1211
1212        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
1213        scheduler.graph.tasks[0].max_retries = Some(2);
1214        scheduler.graph.tasks[0].retry_count = 0;
1215        scheduler.graph.tasks[0].status = TaskStatus::Running;
1216        scheduler.running.insert(
1217            TaskId(0),
1218            RunningTask {
1219                agent_handle_id: "h0".to_string(),
1220                agent_def_name: "worker".to_string(),
1221                started_at: Instant::now(),
1222            },
1223        );
1224
1225        let event = TaskEvent {
1226            task_id: TaskId(0),
1227            agent_handle_id: "h0".to_string(),
1228            outcome: TaskOutcome::Failed {
1229                error: "first failure".to_string(),
1230            },
1231        };
1232        scheduler.buffered_events.push_back(event);
1233        let actions = scheduler.tick();
1234
1235        // After retry: retry_count = 1, status = Ready or Spawn emitted.
1236        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
1237        let spawned = actions
1238            .iter()
1239            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
1240        assert!(
1241            spawned || scheduler.graph.tasks[0].status == TaskStatus::Ready,
1242            "retry should emit Spawn or set Ready"
1243        );
1244        // Graph must still be Running.
1245        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1246    }
1247
1248    #[test]
1249    fn test_timeout_cancels_stalled() {
1250        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1251        let mut config = make_config();
1252        config.task_timeout_secs = 1; // 1 second timeout
1253        let defs = vec![make_def("worker")];
1254        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1255
1256        // Simulate a running task that started just over 1 second ago.
1257        scheduler.graph.tasks[0].status = TaskStatus::Running;
1258        scheduler.running.insert(
1259            TaskId(0),
1260            RunningTask {
1261                agent_handle_id: "h0".to_string(),
1262                agent_def_name: "worker".to_string(),
1263                started_at: Instant::now().checked_sub(Duration::from_secs(2)).unwrap(), // already timed out
1264            },
1265        );
1266
1267        let actions = scheduler.tick();
1268        let has_cancel = actions.iter().any(
1269            |a| matches!(a, SchedulerAction::Cancel { agent_handle_id } if agent_handle_id == "h0"),
1270        );
1271        assert!(has_cancel, "timed-out task should emit Cancel action");
1272        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1273    }
1274
1275    #[test]
1276    fn test_cancel_all() {
1277        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1278        let mut scheduler = make_scheduler(graph);
1279
1280        scheduler.graph.tasks[0].status = TaskStatus::Running;
1281        scheduler.running.insert(
1282            TaskId(0),
1283            RunningTask {
1284                agent_handle_id: "h0".to_string(),
1285                agent_def_name: "worker".to_string(),
1286                started_at: Instant::now(),
1287            },
1288        );
1289        scheduler.graph.tasks[1].status = TaskStatus::Running;
1290        scheduler.running.insert(
1291            TaskId(1),
1292            RunningTask {
1293                agent_handle_id: "h1".to_string(),
1294                agent_def_name: "worker".to_string(),
1295                started_at: Instant::now(),
1296            },
1297        );
1298
1299        let actions = scheduler.cancel_all();
1300
1301        assert_eq!(scheduler.graph.status, GraphStatus::Canceled);
1302        assert!(scheduler.running.is_empty());
1303        let cancel_count = actions
1304            .iter()
1305            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1306            .count();
1307        assert_eq!(cancel_count, 2);
1308        assert!(actions.iter().any(|a| matches!(
1309            a,
1310            SchedulerAction::Done {
1311                status: GraphStatus::Canceled
1312            }
1313        )));
1314    }
1315
1316    #[test]
1317    fn test_record_spawn_failure() {
1318        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1319        let mut scheduler = make_scheduler(graph);
1320
1321        // Simulate task marked Running (by tick) but spawn failed.
1322        scheduler.graph.tasks[0].status = TaskStatus::Running;
1323
1324        let error = SubAgentError::Spawn("spawn error".to_string());
1325        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1326        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1327        // With Abort strategy and no other running tasks, graph should be Failed.
1328        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1329        assert!(
1330            actions
1331                .iter()
1332                .any(|a| matches!(a, SchedulerAction::Done { .. }))
1333        );
1334    }
1335
1336    #[test]
1337    fn test_record_spawn_failure_concurrency_limit_reverts_to_ready() {
1338        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1339        let mut scheduler = make_scheduler(graph);
1340
1341        // Simulate tick() optimistically marking the task Running before spawn.
1342        scheduler.graph.tasks[0].status = TaskStatus::Running;
1343
1344        // Concurrency limit hit — transient, should not fail the task.
1345        let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
1346        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1347        assert_eq!(
1348            scheduler.graph.tasks[0].status,
1349            TaskStatus::Ready,
1350            "task must revert to Ready so the next tick can retry"
1351        );
1352        assert_eq!(
1353            scheduler.graph.status,
1354            GraphStatus::Running,
1355            "graph must stay Running, not transition to Failed"
1356        );
1357        assert!(
1358            actions.is_empty(),
1359            "no cancel or done actions expected for a transient deferral"
1360        );
1361    }
1362
1363    #[test]
1364    fn test_record_spawn_failure_concurrency_limit_variant_spawn_for_task() {
1365        // Both spawn() and resume() now return SubAgentError::ConcurrencyLimit — verify handling.
1366        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1367        let mut scheduler = make_scheduler(graph);
1368        scheduler.graph.tasks[0].status = TaskStatus::Running;
1369
1370        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1371        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1372        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1373        assert!(actions.is_empty());
1374    }
1375
1376    // --- #1516 edge-case tests ---
1377
1378    #[test]
1379    fn test_concurrency_deferral_does_not_affect_running_task() {
1380        // Two root tasks. Task 0 is Running (successfully spawned).
1381        // Task 1 hits a concurrency limit and reverts to Ready.
1382        // Task 0 must be unaffected.
1383        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1384        let mut scheduler = make_scheduler(graph);
1385
1386        // Simulate both tasks optimistically marked Running by tick().
1387        scheduler.graph.tasks[0].status = TaskStatus::Running;
1388        scheduler.running.insert(
1389            TaskId(0),
1390            RunningTask {
1391                agent_handle_id: "h0".to_string(),
1392                agent_def_name: "worker".to_string(),
1393                started_at: Instant::now(),
1394            },
1395        );
1396        scheduler.graph.tasks[1].status = TaskStatus::Running;
1397
1398        // Task 1 spawn fails with concurrency limit.
1399        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1400        let actions = scheduler.record_spawn_failure(TaskId(1), &error);
1401
1402        assert_eq!(
1403            scheduler.graph.tasks[0].status,
1404            TaskStatus::Running,
1405            "task 0 must remain Running"
1406        );
1407        assert_eq!(
1408            scheduler.graph.tasks[1].status,
1409            TaskStatus::Ready,
1410            "task 1 must revert to Ready"
1411        );
1412        assert_eq!(
1413            scheduler.graph.status,
1414            GraphStatus::Running,
1415            "graph must stay Running"
1416        );
1417        assert!(actions.is_empty(), "no cancel or done actions expected");
1418    }
1419
1420    #[test]
1421    fn test_max_concurrent_zero_no_infinite_loop() {
1422        // max_parallel=0 is a degenerate config. With parallel dispatch, tick() still
1423        // emits Spawn for all ready tasks — concurrency enforcement is in SubAgentManager.
1424        // After the caller calls record_spawn_failure(ConcurrencyLimit), the task reverts
1425        // to Ready and the graph stays Running (no deadlock).
1426        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1427        let config = crate::config::OrchestrationConfig {
1428            max_parallel: 0,
1429            ..make_config()
1430        };
1431        let mut scheduler = DagScheduler::new(
1432            graph,
1433            &config,
1434            Box::new(FirstRouter),
1435            vec![make_def("worker")],
1436        )
1437        .unwrap();
1438
1439        let actions1 = scheduler.tick();
1440        // tick() dispatches all ready tasks regardless of max_parallel.
1441        assert!(
1442            actions1
1443                .iter()
1444                .any(|a| matches!(a, SchedulerAction::Spawn { .. })),
1445            "Spawn expected — parallel dispatch ignores max_parallel cap in tick()"
1446        );
1447        assert!(
1448            actions1
1449                .iter()
1450                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
1451            "no Done(Failed) expected — ready tasks exist, so no deadlock"
1452        );
1453        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1454
1455        // Simulate caller receiving ConcurrencyLimit from SubAgentManager.
1456        scheduler.graph.tasks[0].status = TaskStatus::Running;
1457        let error = SubAgentError::ConcurrencyLimit { active: 0, max: 0 };
1458        let extra = scheduler.record_spawn_failure(TaskId(0), &error);
1459        assert!(
1460            extra.is_empty(),
1461            "ConcurrencyLimit must not produce cancel/done actions"
1462        );
1463        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1464
1465        // Second tick must also dispatch (not deadlock).
1466        let actions2 = scheduler.tick();
1467        assert!(
1468            actions2
1469                .iter()
1470                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
1471            "second tick must not emit Done(Failed)"
1472        );
1473        assert_eq!(
1474            scheduler.graph.status,
1475            GraphStatus::Running,
1476            "graph must remain Running after two ticks"
1477        );
1478    }
1479
1480    #[test]
1481    fn test_all_tasks_deferred_graph_stays_running() {
1482        // Both root tasks are spawned optimistically, both fail with ConcurrencyLimit,
1483        // and both revert to Ready. The graph must remain Running (not Failed) and
1484        // the next tick must re-emit Spawn actions for the deferred tasks.
1485        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1486        let mut scheduler = make_scheduler(graph);
1487
1488        // First tick emits Spawn for both tasks and marks them Running.
1489        let actions = scheduler.tick();
1490        assert_eq!(
1491            actions
1492                .iter()
1493                .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1494                .count(),
1495            2,
1496            "expected 2 Spawn actions on first tick"
1497        );
1498        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
1499        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Running);
1500
1501        // Both spawns fail — both revert to Ready.
1502        let error = SubAgentError::ConcurrencyLimit { active: 2, max: 2 };
1503        let r0 = scheduler.record_spawn_failure(TaskId(0), &error);
1504        let r1 = scheduler.record_spawn_failure(TaskId(1), &error);
1505        assert!(r0.is_empty() && r1.is_empty(), "no cancel/done on deferral");
1506        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1507        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Ready);
1508        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1509
1510        // Second tick must retry both deferred tasks.
1511        let retry_actions = scheduler.tick();
1512        let spawn_count = retry_actions
1513            .iter()
1514            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1515            .count();
1516        assert!(
1517            spawn_count > 0,
1518            "second tick must re-emit Spawn for deferred tasks"
1519        );
1520        assert!(
1521            retry_actions.iter().all(|a| !matches!(
1522                a,
1523                SchedulerAction::Done {
1524                    status: GraphStatus::Failed,
1525                    ..
1526                }
1527            )),
1528            "no Done(Failed) expected"
1529        );
1530    }
1531
1532    #[test]
1533    fn test_build_prompt_no_deps() {
1534        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1535        let scheduler = make_scheduler(graph);
1536        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[0]);
1537        assert_eq!(prompt, "description for task 0");
1538    }
1539
1540    #[test]
1541    fn test_build_prompt_with_deps_and_truncation() {
1542        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1543        graph.tasks[0].status = TaskStatus::Completed;
1544        // Create output longer than budget
1545        graph.tasks[0].result = Some(TaskResult {
1546            output: "x".repeat(200),
1547            artifacts: vec![],
1548            duration_ms: 10,
1549            agent_id: None,
1550            agent_def: None,
1551        });
1552
1553        let config = crate::config::OrchestrationConfig {
1554            dependency_context_budget: 50,
1555            ..make_config()
1556        };
1557        let scheduler = DagScheduler::new(
1558            graph,
1559            &config,
1560            Box::new(FirstRouter),
1561            vec![make_def("worker")],
1562        )
1563        .unwrap();
1564
1565        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1566        assert!(prompt.contains("<completed-dependencies>"));
1567        assert!(prompt.contains("[truncated:"));
1568        assert!(prompt.contains("Your task:"));
1569    }
1570
1571    #[test]
1572    fn test_duration_ms_computed_correctly() {
1573        // Regression test for C1: duration_ms must be non-zero after completion.
1574        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1575        let mut scheduler = make_scheduler(graph);
1576
1577        scheduler.graph.tasks[0].status = TaskStatus::Running;
1578        scheduler.running.insert(
1579            TaskId(0),
1580            RunningTask {
1581                agent_handle_id: "h0".to_string(),
1582                agent_def_name: "worker".to_string(),
1583                started_at: Instant::now()
1584                    .checked_sub(Duration::from_millis(50))
1585                    .unwrap(),
1586            },
1587        );
1588
1589        let event = TaskEvent {
1590            task_id: TaskId(0),
1591            agent_handle_id: "h0".to_string(),
1592            outcome: TaskOutcome::Completed {
1593                output: "result".to_string(),
1594                artifacts: vec![],
1595            },
1596        };
1597        scheduler.buffered_events.push_back(event);
1598        scheduler.tick();
1599
1600        let result = scheduler.graph.tasks[0].result.as_ref().unwrap();
1601        assert!(
1602            result.duration_ms > 0,
1603            "duration_ms should be > 0, got {}",
1604            result.duration_ms
1605        );
1606    }
1607
1608    #[test]
1609    fn test_utf8_safe_truncation() {
1610        // S1 regression: truncation must not panic on multi-byte UTF-8.
1611        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1612        graph.tasks[0].status = TaskStatus::Completed;
1613        // Unicode: each char is 3 bytes in UTF-8.
1614        let unicode_output = "日本語テスト".repeat(100);
1615        graph.tasks[0].result = Some(TaskResult {
1616            output: unicode_output,
1617            artifacts: vec![],
1618            duration_ms: 10,
1619            agent_id: None,
1620            agent_def: None,
1621        });
1622
1623        // Budget large enough to hold the spotlighting wrapper + some Japanese chars.
1624        // The sanitizer adds ~200 chars of spotlight header, so 500 chars is sufficient.
1625        let config = crate::config::OrchestrationConfig {
1626            dependency_context_budget: 500,
1627            ..make_config()
1628        };
1629        let scheduler = DagScheduler::new(
1630            graph,
1631            &config,
1632            Box::new(FirstRouter),
1633            vec![make_def("worker")],
1634        )
1635        .unwrap();
1636
1637        // Must not panic, and Japanese chars must be preserved in the output.
1638        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1639        assert!(
1640            prompt.contains("日"),
1641            "Japanese characters should be in the prompt after safe truncation"
1642        );
1643    }
1644
1645    #[test]
1646    fn test_no_agent_routes_inline() {
1647        // NoneRouter: when no agent matches, task falls back to RunInline.
1648        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1649        let mut scheduler = make_scheduler_with_router(graph, Box::new(NoneRouter));
1650        let actions = scheduler.tick();
1651        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
1652        assert!(
1653            actions
1654                .iter()
1655                .any(|a| matches!(a, SchedulerAction::RunInline { .. }))
1656        );
1657    }
1658
1659    #[test]
1660    fn test_stale_event_rejected() {
1661        // Regression: events from a previous agent incarnation must be discarded.
1662        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1663        let mut scheduler = make_scheduler(graph);
1664
1665        // Simulate task running with handle "current-handle".
1666        scheduler.graph.tasks[0].status = TaskStatus::Running;
1667        scheduler.running.insert(
1668            TaskId(0),
1669            RunningTask {
1670                agent_handle_id: "current-handle".to_string(),
1671                agent_def_name: "worker".to_string(),
1672                started_at: Instant::now(),
1673            },
1674        );
1675
1676        // Send a completion event from the OLD agent (stale handle).
1677        let stale_event = TaskEvent {
1678            task_id: TaskId(0),
1679            agent_handle_id: "old-handle".to_string(),
1680            outcome: TaskOutcome::Completed {
1681                output: "stale output".to_string(),
1682                artifacts: vec![],
1683            },
1684        };
1685        scheduler.buffered_events.push_back(stale_event);
1686        let actions = scheduler.tick();
1687
1688        // Stale event must be discarded — task must NOT be completed.
1689        assert_ne!(
1690            scheduler.graph.tasks[0].status,
1691            TaskStatus::Completed,
1692            "stale event must not complete the task"
1693        );
1694        // No Spawn or Done actions should result from a discarded stale event.
1695        let has_done = actions
1696            .iter()
1697            .any(|a| matches!(a, SchedulerAction::Done { .. }));
1698        assert!(
1699            !has_done,
1700            "no Done action should be emitted for a stale event"
1701        );
1702        // Task must still be in the running map.
1703        assert!(
1704            scheduler.running.contains_key(&TaskId(0)),
1705            "running task must remain after stale event"
1706        );
1707    }
1708
1709    #[test]
1710    fn test_build_prompt_chars_count_in_truncation_message() {
1711        // Fix #3: truncation message must report char count, not byte count.
1712        // Use pure ASCII so sanitization doesn't significantly change char count.
1713        // Budget < output length => truncation triggered; verify the count label is "chars total".
1714        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1715        graph.tasks[0].status = TaskStatus::Completed;
1716        // ASCII output: byte count == char count, so both old and new code produce the same number,
1717        // but the label "chars total" (not "bytes total") is what matters here.
1718        let output = "x".repeat(200);
1719        graph.tasks[0].result = Some(TaskResult {
1720            output,
1721            artifacts: vec![],
1722            duration_ms: 10,
1723            agent_id: None,
1724            agent_def: None,
1725        });
1726
1727        let config = crate::config::OrchestrationConfig {
1728            dependency_context_budget: 10, // truncate: sanitized output >> 10 chars
1729            ..make_config()
1730        };
1731        let scheduler = DagScheduler::new(
1732            graph,
1733            &config,
1734            Box::new(FirstRouter),
1735            vec![make_def("worker")],
1736        )
1737        .unwrap();
1738
1739        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1740        // Truncation must have been triggered and the message must use "chars total" label.
1741        assert!(
1742            prompt.contains("chars total"),
1743            "truncation message must use 'chars total' label. Prompt: {prompt}"
1744        );
1745        assert!(
1746            prompt.contains("[truncated:"),
1747            "prompt must contain truncation notice. Prompt: {prompt}"
1748        );
1749    }
1750
1751    // --- resume_from tests (MT-1) ---
1752
1753    #[test]
1754    fn test_resume_from_accepts_paused_graph() {
1755        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1756        graph.status = GraphStatus::Paused;
1757        graph.tasks[0].status = TaskStatus::Pending;
1758
1759        let scheduler =
1760            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1761                .expect("resume_from should accept Paused graph");
1762        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1763    }
1764
1765    #[test]
1766    fn test_resume_from_accepts_failed_graph() {
1767        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1768        graph.status = GraphStatus::Failed;
1769        graph.tasks[0].status = TaskStatus::Failed;
1770
1771        let scheduler =
1772            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1773                .expect("resume_from should accept Failed graph");
1774        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1775    }
1776
1777    #[test]
1778    fn test_resume_from_rejects_completed_graph() {
1779        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1780        graph.status = GraphStatus::Completed;
1781
1782        let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1783            .unwrap_err();
1784        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1785    }
1786
1787    #[test]
1788    fn test_resume_from_rejects_canceled_graph() {
1789        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1790        graph.status = GraphStatus::Canceled;
1791
1792        let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1793            .unwrap_err();
1794        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1795    }
1796
1797    #[test]
1798    fn test_resume_from_reconstructs_running_tasks() {
1799        // IC1: tasks that were Running at pause time must appear in the running map.
1800        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1801        graph.status = GraphStatus::Paused;
1802        graph.tasks[0].status = TaskStatus::Running;
1803        graph.tasks[0].assigned_agent = Some("handle-abc".to_string());
1804        graph.tasks[0].agent_hint = Some("worker".to_string());
1805        graph.tasks[1].status = TaskStatus::Pending;
1806
1807        let scheduler =
1808            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1809                .expect("should succeed");
1810
1811        assert!(
1812            scheduler.running.contains_key(&TaskId(0)),
1813            "Running task must be reconstructed in the running map (IC1)"
1814        );
1815        assert_eq!(scheduler.running[&TaskId(0)].agent_handle_id, "handle-abc");
1816        assert!(
1817            !scheduler.running.contains_key(&TaskId(1)),
1818            "Pending task must not appear in running map"
1819        );
1820    }
1821
1822    #[test]
1823    fn test_resume_from_sets_status_running() {
1824        // II3: resume_from must set graph.status = Running regardless of input status.
1825        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1826        graph.status = GraphStatus::Paused;
1827
1828        let scheduler =
1829            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1830                .unwrap();
1831        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1832    }
1833
1834    // --- #1619 regression tests: consecutive_spawn_failures + exponential backoff ---
1835
1836    #[test]
1837    fn test_consecutive_spawn_failures_increments_on_concurrency_limit() {
1838        // Each tick where all spawns hit ConcurrencyLimit must increment the counter
1839        // via record_batch_backoff(false, true).
1840        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1841        let mut scheduler = make_scheduler(graph);
1842        scheduler.graph.tasks[0].status = TaskStatus::Running;
1843
1844        assert_eq!(scheduler.consecutive_spawn_failures, 0, "starts at zero");
1845
1846        let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
1847        scheduler.record_spawn_failure(TaskId(0), &error);
1848        // record_spawn_failure no longer increments; batch_backoff does.
1849        scheduler.record_batch_backoff(false, true);
1850        assert_eq!(
1851            scheduler.consecutive_spawn_failures, 1,
1852            "first deferral tick: consecutive_spawn_failures must be 1"
1853        );
1854
1855        scheduler.graph.tasks[0].status = TaskStatus::Running;
1856        scheduler.record_spawn_failure(TaskId(0), &error);
1857        scheduler.record_batch_backoff(false, true);
1858        assert_eq!(
1859            scheduler.consecutive_spawn_failures, 2,
1860            "second deferral tick: consecutive_spawn_failures must be 2"
1861        );
1862
1863        scheduler.graph.tasks[0].status = TaskStatus::Running;
1864        scheduler.record_spawn_failure(TaskId(0), &error);
1865        scheduler.record_batch_backoff(false, true);
1866        assert_eq!(
1867            scheduler.consecutive_spawn_failures, 3,
1868            "third deferral tick: consecutive_spawn_failures must be 3"
1869        );
1870    }
1871
1872    #[test]
1873    fn test_consecutive_spawn_failures_resets_on_success() {
1874        // record_spawn() after deferrals must reset consecutive_spawn_failures to 0
1875        // (via record_spawn internal reset; record_batch_backoff(true, _) also resets).
1876        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1877        let mut scheduler = make_scheduler(graph);
1878        scheduler.graph.tasks[0].status = TaskStatus::Running;
1879
1880        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1881        scheduler.record_spawn_failure(TaskId(0), &error);
1882        scheduler.record_batch_backoff(false, true);
1883        scheduler.graph.tasks[0].status = TaskStatus::Running;
1884        scheduler.record_spawn_failure(TaskId(0), &error);
1885        scheduler.record_batch_backoff(false, true);
1886        assert_eq!(scheduler.consecutive_spawn_failures, 2);
1887
1888        // Successful spawn resets the counter directly in record_spawn.
1889        scheduler.record_spawn(TaskId(0), "handle-0".to_string(), "worker".to_string());
1890        assert_eq!(
1891            scheduler.consecutive_spawn_failures, 0,
1892            "record_spawn must reset consecutive_spawn_failures to 0"
1893        );
1894    }
1895
1896    #[tokio::test]
1897    async fn test_exponential_backoff_duration() {
1898        // With consecutive_spawn_failures=0, backoff equals the base interval.
1899        // With consecutive_spawn_failures=3, backoff = min(base * 8, 5000ms).
1900        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1901        let config = crate::config::OrchestrationConfig {
1902            deferral_backoff_ms: 50,
1903            ..make_config()
1904        };
1905        let mut scheduler = DagScheduler::new(
1906            graph,
1907            &config,
1908            Box::new(FirstRouter),
1909            vec![make_def("worker")],
1910        )
1911        .unwrap();
1912
1913        // consecutive_spawn_failures=0 → sleep ≈ 50ms (base).
1914        assert_eq!(scheduler.consecutive_spawn_failures, 0);
1915        let start = tokio::time::Instant::now();
1916        scheduler.wait_event().await;
1917        let elapsed0 = start.elapsed();
1918        assert!(
1919            elapsed0.as_millis() >= 50,
1920            "backoff with 0 deferrals must be >= base (50ms), got {}ms",
1921            elapsed0.as_millis()
1922        );
1923
1924        // Simulate 3 consecutive deferrals: multiplier = 2^3 = 8 → 400ms, capped at 5000ms.
1925        scheduler.consecutive_spawn_failures = 3;
1926        let start = tokio::time::Instant::now();
1927        scheduler.wait_event().await;
1928        let elapsed3 = start.elapsed();
1929        assert!(
1930            elapsed3.as_millis() >= 400,
1931            "backoff with 3 deferrals must be >= 400ms (50 * 8), got {}ms",
1932            elapsed3.as_millis()
1933        );
1934
1935        // Simulate 20 consecutive deferrals: exponent capped at 10 → 50 * 1024 = 51200 → capped at 5000ms.
1936        scheduler.consecutive_spawn_failures = 20;
1937        let start = tokio::time::Instant::now();
1938        scheduler.wait_event().await;
1939        let elapsed20 = start.elapsed();
1940        assert!(
1941            elapsed20.as_millis() >= 5000,
1942            "backoff must be capped at 5000ms with high deferrals, got {}ms",
1943            elapsed20.as_millis()
1944        );
1945    }
1946
1947    // --- deferral_backoff regression test ---
1948
1949    #[tokio::test]
1950    async fn test_wait_event_sleeps_deferral_backoff_when_running_empty() {
1951        // Regression for issue #1519: wait_event must sleep deferral_backoff when
1952        // running is empty, preventing a busy spin-loop.
1953        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1954        let config = crate::config::OrchestrationConfig {
1955            deferral_backoff_ms: 50,
1956            ..make_config()
1957        };
1958        let mut scheduler = DagScheduler::new(
1959            graph,
1960            &config,
1961            Box::new(FirstRouter),
1962            vec![make_def("worker")],
1963        )
1964        .unwrap();
1965
1966        // Do not start any tasks — running map stays empty.
1967        assert!(scheduler.running.is_empty());
1968
1969        let start = tokio::time::Instant::now();
1970        scheduler.wait_event().await;
1971        let elapsed = start.elapsed();
1972
1973        assert!(
1974            elapsed.as_millis() >= 50,
1975            "wait_event must sleep at least deferral_backoff (50ms) when running is empty, but only slept {}ms",
1976            elapsed.as_millis()
1977        );
1978    }
1979
1980    #[test]
1981    fn test_current_deferral_backoff_exponential_growth() {
1982        // Regression for issue #1618: backoff must grow exponentially with consecutive
1983        // spawn failures so the scheduler does not busy-spin at 250ms when saturated.
1984        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1985        let config = crate::config::OrchestrationConfig {
1986            deferral_backoff_ms: 250,
1987            ..make_config()
1988        };
1989        let mut scheduler = DagScheduler::new(
1990            graph,
1991            &config,
1992            Box::new(FirstRouter),
1993            vec![make_def("worker")],
1994        )
1995        .unwrap();
1996
1997        assert_eq!(
1998            scheduler.current_deferral_backoff(),
1999            Duration::from_millis(250)
2000        );
2001
2002        scheduler.consecutive_spawn_failures = 1;
2003        assert_eq!(
2004            scheduler.current_deferral_backoff(),
2005            Duration::from_millis(500)
2006        );
2007
2008        scheduler.consecutive_spawn_failures = 2;
2009        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(1));
2010
2011        scheduler.consecutive_spawn_failures = 3;
2012        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(2));
2013
2014        scheduler.consecutive_spawn_failures = 4;
2015        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(4));
2016
2017        // Cap at 5 seconds.
2018        scheduler.consecutive_spawn_failures = 5;
2019        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2020
2021        scheduler.consecutive_spawn_failures = 100;
2022        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2023    }
2024
2025    #[test]
2026    fn test_record_spawn_resets_consecutive_failures() {
2027        // Regression for issue #1618: a successful spawn resets the backoff counter.
2028        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2029        let mut scheduler = DagScheduler::new(
2030            graph,
2031            &make_config(),
2032            Box::new(FirstRouter),
2033            vec![make_def("worker")],
2034        )
2035        .unwrap();
2036
2037        scheduler.consecutive_spawn_failures = 3;
2038        let task_id = TaskId(0);
2039        scheduler.graph.tasks[0].status = TaskStatus::Running;
2040        scheduler.record_spawn(task_id, "handle-1".into(), "worker".into());
2041
2042        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2043    }
2044
2045    #[test]
2046    fn test_record_spawn_failure_reverts_to_ready_no_counter_change() {
2047        // record_spawn_failure(ConcurrencyLimit) reverts task to Ready but does NOT
2048        // change consecutive_spawn_failures — that is the job of record_batch_backoff.
2049        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2050        let mut scheduler = DagScheduler::new(
2051            graph,
2052            &make_config(),
2053            Box::new(FirstRouter),
2054            vec![make_def("worker")],
2055        )
2056        .unwrap();
2057
2058        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2059        let task_id = TaskId(0);
2060        scheduler.graph.tasks[0].status = TaskStatus::Running;
2061
2062        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2063        scheduler.record_spawn_failure(task_id, &error);
2064
2065        // Counter unchanged — batch_backoff is responsible for incrementing.
2066        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2067        // Task reverted to Ready.
2068        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
2069    }
2070
2071    // --- #1628 parallel dispatch tests ---
2072
2073    #[test]
2074    fn test_parallel_dispatch_all_ready() {
2075        // tick() must emit Spawn for ALL ready tasks, not just max_parallel.
2076        // Here 6 independent tasks with max_parallel=2.
2077        let nodes: Vec<_> = (0..6).map(|i| make_node(i, &[])).collect();
2078        let graph = graph_from_nodes(nodes);
2079        let config = crate::config::OrchestrationConfig {
2080            max_parallel: 2,
2081            ..make_config()
2082        };
2083        let mut scheduler = DagScheduler::new(
2084            graph,
2085            &config,
2086            Box::new(FirstRouter),
2087            vec![make_def("worker")],
2088        )
2089        .unwrap();
2090
2091        let actions = scheduler.tick();
2092        let spawn_count = actions
2093            .iter()
2094            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2095            .count();
2096        assert_eq!(spawn_count, 6, "all 6 ready tasks must be dispatched");
2097
2098        let running_count = scheduler
2099            .graph
2100            .tasks
2101            .iter()
2102            .filter(|t| t.status == TaskStatus::Running)
2103            .count();
2104        assert_eq!(running_count, 6, "all 6 tasks must be marked Running");
2105    }
2106
2107    #[test]
2108    fn test_batch_backoff_partial_success() {
2109        // Some spawns succeed, some hit ConcurrencyLimit: counter resets to 0.
2110        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2111        let mut scheduler = make_scheduler(graph);
2112        scheduler.consecutive_spawn_failures = 3;
2113
2114        scheduler.record_batch_backoff(true, true);
2115        assert_eq!(
2116            scheduler.consecutive_spawn_failures, 0,
2117            "any success in batch must reset counter"
2118        );
2119    }
2120
2121    #[test]
2122    fn test_batch_backoff_all_failed() {
2123        // All spawns hit ConcurrencyLimit: counter increments by 1.
2124        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2125        let mut scheduler = make_scheduler(graph);
2126        scheduler.consecutive_spawn_failures = 2;
2127
2128        scheduler.record_batch_backoff(false, true);
2129        assert_eq!(
2130            scheduler.consecutive_spawn_failures, 3,
2131            "all-failure tick must increment counter"
2132        );
2133    }
2134
2135    #[test]
2136    fn test_batch_backoff_no_spawns() {
2137        // No spawn actions in tick: counter unchanged.
2138        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2139        let mut scheduler = make_scheduler(graph);
2140        scheduler.consecutive_spawn_failures = 5;
2141
2142        scheduler.record_batch_backoff(false, false);
2143        assert_eq!(
2144            scheduler.consecutive_spawn_failures, 5,
2145            "no spawns must not change counter"
2146        );
2147    }
2148
    #[test]
    fn test_buffer_guard_uses_task_count() {
        // Documents the scenario in which a buffer capacity of graph.tasks.len() * 2
        // differs from max_parallel * 2.
        //
        // NOTE(review): this test never calls wait_event(). The assertions below only
        // pin the arithmetic of the fixture (20 vs 4), so on their own they cannot
        // detect wait_event() being reverted to max_parallel * 2. Behavioral coverage
        // (actual buffer drop prevention) requires an async harness with a real
        // channel, which is outside the scope of this unit test.
        //
        // Scenario: 10 tasks, max_parallel=2 → tasks.len()*2=20, max_parallel*2=4.
        // The guard is intended to use 20, not 4.
        let nodes: Vec<_> = (0..10).map(|i| make_node(i, &[])).collect();
        let graph = graph_from_nodes(nodes);
        let config = crate::config::OrchestrationConfig {
            max_parallel: 2, // 2*2=4, but tasks.len()*2=20
            ..make_config()
        };
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();
        // Confirm the fixture: tasks.len() * 2 = 20, max_parallel * 2 = 4.
        assert_eq!(scheduler.graph.tasks.len() * 2, 20);
        assert_eq!(scheduler.max_parallel * 2, 4);
    }
2177
2178    #[test]
2179    fn test_batch_mixed_concurrency_and_fatal_failure() {
2180        // Mixed batch: task 0 gets ConcurrencyLimit (transient), task 1 gets a
2181        // non-transient Spawn error (fatal). Two independent tasks, no deps between them.
2182        // Verify:
2183        // - task 0 reverts to Ready (retried next tick)
2184        // - task 1 is marked Failed; with FailureStrategy::Skip the graph stays Running
2185        //   because task 1 has no dependents that would abort the graph
2186        // - record_batch_backoff(false, true) increments counter by 1
2187        let mut nodes = vec![make_node(0, &[]), make_node(1, &[])];
2188        // FailureStrategy::Skip: task 1 fails but its absence is ignored.
2189        nodes[1].failure_strategy = Some(FailureStrategy::Skip);
2190        let graph = graph_from_nodes(nodes);
2191        let mut scheduler = make_scheduler(graph);
2192
2193        // Optimistically mark both as Running (as tick() would do).
2194        scheduler.graph.tasks[0].status = TaskStatus::Running;
2195        scheduler.graph.tasks[1].status = TaskStatus::Running;
2196
2197        // Task 0: ConcurrencyLimit (transient).
2198        let concurrency_err = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2199        let actions0 = scheduler.record_spawn_failure(TaskId(0), &concurrency_err);
2200        assert!(
2201            actions0.is_empty(),
2202            "ConcurrencyLimit must produce no extra actions"
2203        );
2204        assert_eq!(
2205            scheduler.graph.tasks[0].status,
2206            TaskStatus::Ready,
2207            "task 0 must revert to Ready"
2208        );
2209
2210        // Task 1: non-transient Spawn failure. record_spawn_failure marks it Failed,
2211        // then propagate_failure applies FailureStrategy::Skip → status becomes Skipped.
2212        let fatal_err = SubAgentError::Spawn("provider unavailable".to_string());
2213        let actions1 = scheduler.record_spawn_failure(TaskId(1), &fatal_err);
2214        assert_eq!(
2215            scheduler.graph.tasks[1].status,
2216            TaskStatus::Skipped,
2217            "task 1: Skip strategy turns Failed into Skipped via propagate_failure"
2218        );
2219        // No Done action from record_spawn_failure — graph still has task 0 alive.
2220        assert!(
2221            actions1
2222                .iter()
2223                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
2224            "no Done action expected: task 0 is still Ready"
2225        );
2226
2227        // Batch result: no success, one ConcurrencyLimit failure.
2228        scheduler.consecutive_spawn_failures = 0;
2229        scheduler.record_batch_backoff(false, true);
2230        assert_eq!(
2231            scheduler.consecutive_spawn_failures, 1,
2232            "batch with only ConcurrencyLimit must increment counter"
2233        );
2234    }
2235}