Skip to main content

zeph_orchestration/
scheduler.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! DAG execution scheduler: drives task graph execution by emitting `SchedulerAction` commands.
5
6use std::collections::{HashMap, VecDeque};
7use std::fmt::Write as _;
8use std::path::PathBuf;
9use std::time::{Duration, Instant};
10
11use tokio::sync::mpsc;
12
13use super::cascade::{CascadeConfig, CascadeDetector};
14use super::dag;
15use super::error::OrchestrationError;
16use super::graph::{
17    ExecutionMode, GraphStatus, TaskGraph, TaskId, TaskNode, TaskResult, TaskStatus,
18};
19use super::router::AgentRouter;
20use super::topology::{DispatchStrategy, Topology, TopologyAnalysis, TopologyClassifier};
21use super::verifier::inject_tasks as verifier_inject_tasks;
22use zeph_config::OrchestrationConfig;
23use zeph_sanitizer::{ContentIsolationConfig, ContentSanitizer, ContentSource, ContentSourceKind};
24use zeph_subagent::{SubAgentDef, SubAgentError};
25
26/// Actions the scheduler requests the caller to perform.
27///
28/// The scheduler never holds `&mut SubAgentManager` — it produces these
29/// actions for the caller to execute (ADR-026 command pattern).
30#[derive(Debug)]
31pub enum SchedulerAction {
32    /// Spawn a sub-agent for a task.
33    Spawn {
34        task_id: TaskId,
35        agent_def_name: String,
36        prompt: String,
37    },
38    /// Cancel a running sub-agent (on graph abort/skip).
39    Cancel { agent_handle_id: String },
40    /// Execute a task inline via the main agent (no sub-agents configured).
41    RunInline { task_id: TaskId, prompt: String },
42    /// Graph reached a terminal or paused state.
43    Done { status: GraphStatus },
44    /// Request verification of a completed task's output (emitted when `verify_completeness=true`).
45    ///
46    /// The task stays `Completed` during verification. Downstream tasks are unblocked
47    /// immediately — verification is best-effort and does not gate dispatch.
48    /// The caller runs `PlanVerifier::verify()`, then optionally `PlanVerifier::replan()`,
49    /// then calls `DagScheduler::inject_tasks()` if new tasks were generated.
50    Verify { task_id: TaskId, output: String },
51}
52
53/// Event sent by a sub-agent loop when it terminates.
54#[derive(Debug)]
55pub struct TaskEvent {
56    pub task_id: TaskId,
57    pub agent_handle_id: String,
58    pub outcome: TaskOutcome,
59}
60
/// Final result reported by a sub-agent run.
#[derive(Debug)]
pub enum TaskOutcome {
    /// The agent finished successfully, yielding its textual output and any
    /// artifact paths it reported.
    Completed {
        output: String,
        artifacts: Vec<PathBuf>,
    },
    /// The agent did not succeed; `error` carries the failure description.
    Failed { error: String },
}
72
/// Bookkeeping for a task that currently has a sub-agent attached: the agent
/// handle, the agent definition name, and the start instant used for timeout
/// detection.
struct RunningTask {
    /// Handle of the sub-agent executing the task.
    agent_handle_id: String,
    /// Name of the agent definition the sub-agent was spawned from.
    agent_def_name: String,
    /// Moment the task was recorded as started.
    started_at: Instant,
}
79
80/// DAG execution engine.
81///
82/// Drives task graph execution by producing `SchedulerAction` values
83/// that the caller executes against `SubAgentManager`.
84///
85/// # Caller Loop
86///
87/// ```text
88/// loop {
89///     let actions = scheduler.tick();
90///     for action in actions {
91///         match action {
92///             Spawn { task_id, agent_def_name, prompt } => {
93///                 match manager.spawn_for_task(...) {
94///                     Ok(handle_id) => scheduler.record_spawn(task_id, handle_id),
95///                     Err(e) => { for a in scheduler.record_spawn_failure(task_id, &e) { /* exec */ } }
96///                 }
97///             }
98///             Cancel { agent_handle_id } => { manager.cancel(&agent_handle_id); }
99///             Done { .. } => break,
100///         }
101///     }
102///     scheduler.wait_event().await;
103/// }
104/// ```
105#[allow(clippy::struct_excessive_bools)]
106pub struct DagScheduler {
107    graph: TaskGraph,
108    max_parallel: usize,
109    /// Immutable base parallelism limit from config. Never changes after construction.
110    ///
111    /// `max_parallel` is derived from this via `TopologyClassifier::compute_max_parallel`
112    /// and may be lower (e.g., 1 for `LinearChain`). Using `config_max_parallel` as the
113    /// base prevents drift: successive replan cycles always compute from the original
114    /// config value, not from a previously reduced `max_parallel`.
115    config_max_parallel: usize,
116    /// Maps `TaskId` -> running sub-agent state.
117    running: HashMap<TaskId, RunningTask>,
118    /// Receives completion/failure events from sub-agent loops.
119    event_rx: mpsc::Receiver<TaskEvent>,
120    /// Sender cloned into each spawned sub-agent via `spawn_for_task`.
121    event_tx: mpsc::Sender<TaskEvent>,
122    /// Per-task wall-clock timeout.
123    task_timeout: Duration,
124    /// Router for agent selection.
125    router: Box<dyn AgentRouter>,
126    /// Available agent definitions (cached from `SubAgentManager`).
127    available_agents: Vec<SubAgentDef>,
128    /// Total character budget for cross-task dependency context injection.
129    dependency_context_budget: usize,
130    /// Events buffered by `wait_event` for processing in the next `tick`.
131    buffered_events: VecDeque<TaskEvent>,
132    /// Sanitizer for dependency output injected into task prompts (SEC-ORCH-01).
133    sanitizer: ContentSanitizer,
134    /// Backoff duration before retrying deferred tasks when all ready tasks hit the concurrency limit.
135    deferral_backoff: Duration,
136    /// Consecutive spawn failures due to concurrency limits. Used to compute exponential backoff.
137    consecutive_spawn_failures: u32,
138    /// Topology analysis result. Recomputed on next tick when `topology_dirty=true`.
139    topology: TopologyAnalysis,
140    /// When true, topology is re-analyzed at the start of the next tick.
141    /// Set by `inject_tasks()` after appending replan tasks (critic C2).
142    topology_dirty: bool,
143    /// Current dispatch level for `LevelBarrier` strategy.
144    current_level: usize,
145    /// Whether post-task verification is enabled (`config.verify_completeness`).
146    verify_completeness: bool,
147    /// Provider name for verification LLM calls (`config.verify_provider`).
148    /// Empty string = use the agent's primary provider.
149    verify_provider: String,
150    /// Per-task replan count. Limits replanning to 1 cycle per task (critic S2).
151    task_replan_counts: HashMap<TaskId, u32>,
152    /// Global replan counter across the entire scheduler run (critic S2).
153    global_replan_count: u32,
154    /// Global replan hard cap from config.
155    max_replans: u32,
156    /// Completeness score threshold from config. Replan is triggered when
157    /// `VerificationResult::confidence < completeness_threshold_value` AND gaps exist.
158    completeness_threshold_value: f32,
159    /// Cascade failure detector. `Some` when `cascade_routing = true`.
160    cascade_detector: Option<CascadeDetector>,
161    /// Whether `tree_optimized_dispatch` was enabled at construction.
162    /// Stored so the dirty-reanalysis path can reproduce the same strategy mapping.
163    tree_optimized_dispatch: bool,
164    /// Whether `cascade_routing` was enabled at construction.
165    cascade_routing: bool,
166}
167
168impl std::fmt::Debug for DagScheduler {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        f.debug_struct("DagScheduler")
171            .field("graph_id", &self.graph.id)
172            .field("graph_status", &self.graph.status)
173            .field("running_count", &self.running.len())
174            .field("max_parallel", &self.max_parallel)
175            .field("task_timeout_secs", &self.task_timeout.as_secs())
176            .field("topology", &self.topology.topology)
177            .field("strategy", &self.topology.strategy)
178            .field("current_level", &self.current_level)
179            .field("global_replan_count", &self.global_replan_count)
180            .field("cascade_routing", &self.cascade_routing)
181            .field("tree_optimized_dispatch", &self.tree_optimized_dispatch)
182            .finish_non_exhaustive()
183    }
184}
185
186impl DagScheduler {
187    /// Create a new scheduler for the given graph.
188    ///
189    /// The graph must be in `Created` status. The scheduler transitions
190    /// it to `Running` and marks root tasks as `Ready`.
191    ///
192    /// # Errors
193    ///
194    /// Returns `OrchestrationError::InvalidGraph` if the graph is not in
195    /// `Created` status or has no tasks.
196    pub fn new(
197        mut graph: TaskGraph,
198        config: &OrchestrationConfig,
199        router: Box<dyn AgentRouter>,
200        available_agents: Vec<SubAgentDef>,
201    ) -> Result<Self, OrchestrationError> {
202        if graph.status != GraphStatus::Created {
203            return Err(OrchestrationError::InvalidGraph(format!(
204                "graph must be in Created status, got {}",
205                graph.status
206            )));
207        }
208
209        dag::validate(&graph.tasks, config.max_tasks as usize)?;
210
211        graph.status = GraphStatus::Running;
212
213        for task in &mut graph.tasks {
214            if task.depends_on.is_empty() && task.status == TaskStatus::Pending {
215                task.status = TaskStatus::Ready;
216            }
217        }
218
219        let (event_tx, event_rx) = mpsc::channel(64);
220
221        let task_timeout = if config.task_timeout_secs > 0 {
222            Duration::from_secs(config.task_timeout_secs)
223        } else {
224            Duration::from_secs(600)
225        };
226
227        let topology = TopologyClassifier::analyze(&graph, config);
228        let max_parallel = topology.max_parallel;
229        let config_max_parallel = config.max_parallel as usize;
230
231        if config.topology_selection {
232            tracing::debug!(
233                topology = ?topology.topology,
234                strategy = ?topology.strategy,
235                max_parallel,
236                "topology-aware concurrency limit applied"
237            );
238        }
239
240        // Validate cascade_routing dependency on topology_selection.
241        if config.cascade_routing && !config.topology_selection {
242            tracing::warn!(
243                "cascade_routing = true requires topology_selection = true; \
244                 cascade routing is disabled (topology_selection is off)"
245            );
246        }
247
248        let cascade_detector = if config.cascade_routing && config.topology_selection {
249            Some(CascadeDetector::new(CascadeConfig {
250                failure_threshold: config.cascade_failure_threshold,
251            }))
252        } else {
253            None
254        };
255
256        Ok(Self {
257            graph,
258            max_parallel,
259            config_max_parallel,
260            running: HashMap::new(),
261            event_rx,
262            event_tx,
263            task_timeout,
264            router,
265            available_agents,
266            dependency_context_budget: config.dependency_context_budget,
267            buffered_events: VecDeque::new(),
268            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
269            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
270            consecutive_spawn_failures: 0,
271            topology,
272            topology_dirty: false,
273            current_level: 0,
274            verify_completeness: config.verify_completeness,
275            verify_provider: config.verify_provider.trim().to_string(),
276            task_replan_counts: HashMap::new(),
277            global_replan_count: 0,
278            max_replans: config.max_replans,
279            completeness_threshold_value: config.completeness_threshold,
280            cascade_detector,
281            tree_optimized_dispatch: config.tree_optimized_dispatch,
282            cascade_routing: config.cascade_routing && config.topology_selection,
283        })
284    }
285
286    /// Create a scheduler from a graph that is in `Paused` or `Failed` status.
287    ///
288    /// Used for resume and retry flows. The caller is responsible for calling
289    /// [`dag::reset_for_retry`] (for retry) before passing the graph here.
290    ///
291    /// This constructor sets `graph.status = Running` (II3) and reconstructs
292    /// the `running` map from tasks that are still in `Running` state (IC1), so
293    /// their completion events are not silently dropped on the next tick.
294    ///
295    /// # Errors
296    ///
297    /// Returns `OrchestrationError::InvalidGraph` if the graph is in `Completed`
298    /// or `Canceled` status (terminal states that cannot be resumed).
299    pub fn resume_from(
300        mut graph: TaskGraph,
301        config: &OrchestrationConfig,
302        router: Box<dyn AgentRouter>,
303        available_agents: Vec<SubAgentDef>,
304    ) -> Result<Self, OrchestrationError> {
305        if graph.status == GraphStatus::Completed || graph.status == GraphStatus::Canceled {
306            return Err(OrchestrationError::InvalidGraph(format!(
307                "cannot resume a {} graph; only Paused, Failed, or Running graphs are resumable",
308                graph.status
309            )));
310        }
311
312        // II3: ensure the graph is in Running state so tick() does not immediately
313        // return Done{Paused}.
314        graph.status = GraphStatus::Running;
315
316        // IC1: reconstruct the `running` map from tasks that were still Running at
317        // pause time. Without this their completion events would arrive but
318        // process_event would ignore them (it checks self.running), leaving the
319        // task stuck until timeout.
320        let running: HashMap<TaskId, RunningTask> = graph
321            .tasks
322            .iter()
323            .filter(|t| t.status == TaskStatus::Running)
324            .filter_map(|t| {
325                let handle_id = t.assigned_agent.clone()?;
326                let def_name = t.agent_hint.clone().unwrap_or_default();
327                Some((
328                    t.id,
329                    RunningTask {
330                        agent_handle_id: handle_id,
331                        agent_def_name: def_name,
332                        // Conservative: treat as just-started so timeout window is reset.
333                        started_at: Instant::now(),
334                    },
335                ))
336            })
337            .collect();
338
339        let (event_tx, event_rx) = mpsc::channel(64);
340
341        let task_timeout = if config.task_timeout_secs > 0 {
342            Duration::from_secs(config.task_timeout_secs)
343        } else {
344            Duration::from_secs(600)
345        };
346
347        let topology = TopologyClassifier::analyze(&graph, config);
348        let max_parallel = topology.max_parallel;
349        let config_max_parallel = config.max_parallel as usize;
350
351        let cascade_detector = if config.cascade_routing && config.topology_selection {
352            Some(CascadeDetector::new(CascadeConfig {
353                failure_threshold: config.cascade_failure_threshold,
354            }))
355        } else {
356            None
357        };
358
359        Ok(Self {
360            graph,
361            max_parallel,
362            config_max_parallel,
363            running,
364            event_rx,
365            event_tx,
366            task_timeout,
367            router,
368            available_agents,
369            dependency_context_budget: config.dependency_context_budget,
370            buffered_events: VecDeque::new(),
371            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
372            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
373            consecutive_spawn_failures: 0,
374            topology,
375            topology_dirty: false,
376            current_level: 0,
377            verify_completeness: config.verify_completeness,
378            verify_provider: config.verify_provider.trim().to_string(),
379            task_replan_counts: HashMap::new(),
380            global_replan_count: 0,
381            max_replans: config.max_replans,
382            completeness_threshold_value: config.completeness_threshold,
383            cascade_detector,
384            tree_optimized_dispatch: config.tree_optimized_dispatch,
385            cascade_routing: config.cascade_routing && config.topology_selection,
386        })
387    }
388
389    /// Validate that `verify_provider` references a known provider name.
390    ///
391    /// Call this after construction when `verify_completeness = true` to catch
392    /// misconfiguration early rather than failing open at runtime.
393    ///
394    /// - Empty `verify_provider` is always valid (falls back to the primary provider).
395    /// - If `provider_names` is empty, validation is skipped (provider set is unknown).
396    /// - Provider names are compared case-sensitively (matching the existing resolution convention).
397    ///
398    /// # Errors
399    ///
400    /// Returns `OrchestrationError::InvalidConfig` when `verify_completeness = true`,
401    /// `verify_provider` is non-empty, and the name is not present in `provider_names`.
402    pub fn validate_verify_config(
403        &self,
404        provider_names: &[&str],
405    ) -> Result<(), OrchestrationError> {
406        if !self.verify_completeness {
407            return Ok(());
408        }
409        let name = self.verify_provider.as_str();
410        if name.is_empty() || provider_names.is_empty() {
411            return Ok(());
412        }
413        if !provider_names.contains(&name) {
414            return Err(OrchestrationError::InvalidConfig(format!(
415                "verify_provider \"{}\" not found in [[llm.providers]]; available: [{}]",
416                name,
417                provider_names.join(", ")
418            )));
419        }
420        Ok(())
421    }
422
423    /// Get a clone of the event sender for injection into sub-agent loops.
424    #[must_use]
425    pub fn event_sender(&self) -> mpsc::Sender<TaskEvent> {
426        self.event_tx.clone()
427    }
428
429    /// Immutable reference to the current graph state.
430    #[must_use]
431    pub fn graph(&self) -> &TaskGraph {
432        &self.graph
433    }
434
435    /// Return the final graph state.
436    ///
437    /// Clones the graph since `Drop` is implemented on the scheduler.
438    #[must_use]
439    pub fn into_graph(&self) -> TaskGraph {
440        self.graph.clone()
441    }
442
443    /// Current topology analysis.
444    #[must_use]
445    pub fn topology(&self) -> &TopologyAnalysis {
446        &self.topology
447    }
448
449    /// Minimum completeness score threshold from config.
450    ///
451    /// Used by the agent loop to gate whole-plan replan: replan is triggered when
452    /// `VerificationResult::confidence < completeness_threshold` AND gaps exist.
453    #[must_use]
454    pub fn completeness_threshold(&self) -> f32 {
455        self.completeness_threshold_value
456    }
457
458    /// Provider name for verification LLM calls (empty = use primary provider).
459    #[must_use]
460    pub fn verify_provider_name(&self) -> &str {
461        &self.verify_provider
462    }
463
464    /// Remaining whole-plan replan budget: `max_replans - global_replan_count`.
465    ///
466    /// Returns 0 when the global cap has been reached.
467    #[must_use]
468    pub fn max_replans_remaining(&self) -> u32 {
469        self.max_replans.saturating_sub(self.global_replan_count)
470    }
471
472    /// Increment `global_replan_count` to record a whole-plan replan cycle.
473    ///
474    /// Called by the agent loop after executing a partial DAG from whole-plan gaps.
475    /// Does NOT inject tasks into the original graph (the partial DAG is a separate run).
476    pub fn record_whole_plan_replan(&mut self) {
477        self.global_replan_count = self.global_replan_count.saturating_add(1);
478    }
479
480    /// Inject new tasks into the graph after a verify-replan cycle.
481    ///
482    /// Appends tasks and validates DAG acyclicity. Sets `topology_dirty=true` so
483    /// topology is re-analyzed at the start of the next `tick()`. Does NOT
484    /// re-analyze topology here (critic C2 — topology computed during injection
485    /// would be stale by the next tick).
486    ///
487    /// Per-task replan cap: each task is limited to 1 replan (critic S2).
488    /// Global hard cap: total replan count across the run is limited to `max_replans`.
489    ///
490    /// # Errors
491    ///
492    /// Returns `OrchestrationError::VerificationFailed` if the graph would exceed
493    /// `max_tasks` or injection introduces a cycle.
494    pub fn inject_tasks(
495        &mut self,
496        verified_task_id: TaskId,
497        new_tasks: Vec<TaskNode>,
498        max_tasks: usize,
499    ) -> Result<(), OrchestrationError> {
500        if new_tasks.is_empty() {
501            return Ok(());
502        }
503
504        // Per-task replan limit: 1 replan per task (critic S2).
505        let task_replan_count = self.task_replan_counts.entry(verified_task_id).or_insert(0);
506        if *task_replan_count >= 1 {
507            tracing::warn!(
508                task_id = %verified_task_id,
509                "per-task replan limit (1) reached, skipping replan injection"
510            );
511            return Ok(());
512        }
513
514        // Global hard cap (critic S2).
515        if self.global_replan_count >= self.max_replans {
516            tracing::warn!(
517                global_replan_count = self.global_replan_count,
518                max_replans = self.max_replans,
519                "global replan limit reached, skipping replan injection"
520            );
521            return Ok(());
522        }
523
524        verifier_inject_tasks(&mut self.graph, new_tasks, max_tasks)?;
525
526        *task_replan_count += 1;
527        self.global_replan_count += 1;
528
529        // Signal that topology needs re-analysis on the next tick (critic C2).
530        self.topology_dirty = true;
531
532        // Reset cascade failure counts — the graph has fundamentally changed (C13 fix).
533        if let Some(ref mut det) = self.cascade_detector {
534            det.reset();
535        }
536
537        Ok(())
538    }
539}
540
541impl Drop for DagScheduler {
542    fn drop(&mut self) {
543        if !self.running.is_empty() {
544            tracing::warn!(
545                running_tasks = self.running.len(),
546                "DagScheduler dropped with running tasks; agents may continue until their \
547                 CancellationToken fires or they complete naturally"
548            );
549        }
550    }
551}
552
553impl DagScheduler {
554    /// Process pending events and produce actions for the caller.
555    ///
556    /// Call `wait_event` after processing all actions to block until the next event.
557    #[allow(clippy::too_many_lines)]
558    pub fn tick(&mut self) -> Vec<SchedulerAction> {
559        if self.graph.status != GraphStatus::Running {
560            return vec![SchedulerAction::Done {
561                status: self.graph.status,
562            }];
563        }
564
565        // Re-analyze topology when task set has changed (inject_tasks was called).
566        // Deferred from inject_tasks() to avoid stale analysis (critic C2).
567        if self.topology_dirty {
568            // Rebuild with topology_selection=true config equivalent: reuse existing analysis
569            // approach. Since we don't store the config here, re-analyze using a stub config
570            // that reflects what was originally set up.
571            let new_analysis = {
572                let n = self.graph.tasks.len();
573                if n == 0 {
574                    TopologyAnalysis {
575                        topology: Topology::AllParallel,
576                        strategy: DispatchStrategy::FullParallel,
577                        // Use the immutable config base so the fallback is consistent with
578                        // the initial analysis (not a previously reduced max_parallel).
579                        max_parallel: self.config_max_parallel,
580                        depth: 0,
581                        depths: std::collections::HashMap::new(),
582                    }
583                } else {
584                    // Compute depths once, then classify using pre-computed values.
585                    // This eliminates a redundant toposort compared to calling classify()
586                    // (which internally calls compute_longest_path_and_depths again).
587                    let (depth, depths) =
588                        super::topology::compute_depths_for_scheduler(&self.graph);
589                    let topo =
590                        TopologyClassifier::classify_with_depths(&self.graph, depth, &depths);
591                    // Reconstruct a minimal config stub carrying only the strategy-selection flags
592                    // so `strategy()` can apply the same overrides as at construction time.
593                    let strategy_config = zeph_config::OrchestrationConfig {
594                        cascade_routing: self.cascade_routing,
595                        tree_optimized_dispatch: self.tree_optimized_dispatch,
596                        ..zeph_config::OrchestrationConfig::default()
597                    };
598                    let strategy = TopologyClassifier::strategy(topo, &strategy_config);
599                    // ALWAYS use config_max_parallel as the base, never self.max_parallel,
600                    // to prevent drift across successive replan cycles (architect S2).
601                    let max_parallel =
602                        TopologyClassifier::compute_max_parallel(topo, self.config_max_parallel);
603                    TopologyAnalysis {
604                        topology: topo,
605                        strategy,
606                        max_parallel,
607                        depth,
608                        depths,
609                    }
610                }
611            };
612            self.topology = new_analysis;
613            // Sync max_parallel to match the newly computed topology analysis (architect S2).
614            // The dispatch logic at line 559 reads self.max_parallel for slot computation;
615            // without this sync, self.max_parallel and self.topology.max_parallel diverge.
616            self.max_parallel = self.topology.max_parallel;
617            self.topology_dirty = false;
618
619            // After re-analysis, reset the level pointer to the shallowest non-terminal depth
620            // so injected tasks at lower depths are not skipped by the LevelBarrier.
621            // Uses .min() (not unconditional reset to 0) to preserve forward progress when
622            // injected tasks are at equal or deeper levels than current_level.
623            if self.topology.strategy == DispatchStrategy::LevelBarrier {
624                let min_active = self
625                    .graph
626                    .tasks
627                    .iter()
628                    .filter(|t| !t.status.is_terminal())
629                    .filter_map(|t| self.topology.depths.get(&t.id).copied())
630                    .min();
631                if let Some(min_depth) = min_active {
632                    self.current_level = self.current_level.min(min_depth);
633                }
634            }
635        }
636
637        let mut actions = Vec::new();
638
639        // Drain events buffered by wait_event, then any new ones in the channel.
640        while let Some(event) = self.buffered_events.pop_front() {
641            let cancel_actions = self.process_event(event);
642            actions.extend(cancel_actions);
643        }
644        while let Ok(event) = self.event_rx.try_recv() {
645            let cancel_actions = self.process_event(event);
646            actions.extend(cancel_actions);
647        }
648
649        if self.graph.status != GraphStatus::Running {
650            return actions;
651        }
652
653        // Check for timed-out tasks.
654        let timeout_actions = self.check_timeouts();
655        actions.extend(timeout_actions);
656
657        if self.graph.status != GraphStatus::Running {
658            return actions;
659        }
660
661        // Dispatch ready tasks up to max_parallel slots. Concurrency is pre-enforced here
662        // (topology-aware cap) and also enforced by SubAgentManager::spawn() returning
663        // ConcurrencyLimit when active + reserved >= max_concurrent.
664        // Non-transient spawn failures are handled by record_spawn_failure(); optimistic
665        // Running marks are reverted to Ready for ConcurrencyLimit errors.
666        let raw_ready = dag::ready_tasks(&self.graph);
667
668        // CascadeAware: partition ready tasks into preferred (healthy region) and deferred
669        // (cascading region). Deferred tasks still run when no preferred tasks remain.
670        // Skip for Sequential tasks — they must not be reordered relative to each other (C14).
671        let ready: Vec<TaskId> = if self.topology.strategy == DispatchStrategy::CascadeAware {
672            if let Some(ref detector) = self.cascade_detector {
673                let deprioritized = detector.deprioritized_tasks(&self.graph);
674                if deprioritized.is_empty() {
675                    raw_ready
676                } else {
677                    let (preferred, deferred): (Vec<_>, Vec<_>) =
678                        raw_ready.into_iter().partition(|id| {
679                            let is_sequential = self.graph.tasks[id.index()].execution_mode
680                                == ExecutionMode::Sequential;
681                            // Sequential tasks are never reordered.
682                            is_sequential || !deprioritized.contains(id)
683                        });
684                    preferred.into_iter().chain(deferred).collect()
685                }
686            } else {
687                raw_ready
688            }
689        } else {
690            raw_ready
691        };
692
693        // TreeOptimized: sort ready tasks by critical-path distance descending
694        // (tasks deepest in the DAG go first — shortest distance to sinks).
695        // Skip for Sequential tasks to preserve their ordering invariant (C14).
696        let ready: Vec<TaskId> = if self.topology.strategy == DispatchStrategy::TreeOptimized {
697            let max_depth = self.topology.depth;
698            let mut sortable = ready;
699            sortable.sort_by_key(|id| {
700                let task_depth = self.topology.depths.get(id).copied().unwrap_or(0);
701                // Deeper tasks have smaller key → dispatched first.
702                max_depth.saturating_sub(task_depth)
703            });
704            sortable
705        } else {
706            ready
707        };
708
709        // LevelBarrier: advance the current level when all tasks at this level are terminal.
710        // This handles failures/skips correctly because propagate_failure() uses BFS to
711        // transitively mark all non-terminal dependents as Skipped before tick() runs dispatch.
712        if self.topology.strategy == DispatchStrategy::LevelBarrier {
713            let all_current_level_terminal = self.graph.tasks.iter().all(|t| {
714                let task_depth = self
715                    .topology
716                    .depths
717                    .get(&t.id)
718                    .copied()
719                    .unwrap_or(usize::MAX);
720                task_depth != self.current_level || t.status.is_terminal()
721            });
722            if all_current_level_terminal {
723                // Advance until we find a level with non-terminal tasks, or exhaust all levels.
724                let max_depth = self.topology.depth;
725                while self.current_level <= max_depth {
726                    let has_non_terminal = self.graph.tasks.iter().any(|t| {
727                        let d = self
728                            .topology
729                            .depths
730                            .get(&t.id)
731                            .copied()
732                            .unwrap_or(usize::MAX);
733                        d == self.current_level && !t.status.is_terminal()
734                    });
735                    if has_non_terminal {
736                        break;
737                    }
738                    self.current_level += 1;
739                }
740            }
741        }
742
743        // Available dispatch slots for this tick.
744        let mut slots = self.max_parallel.saturating_sub(self.running.len());
745
746        // For sequential dispatch: track whether we already scheduled one sequential task
747        // this tick AND whether any sequential task is currently running.
748        let mut sequential_spawned_this_tick = false;
749        let has_running_sequential = self
750            .running
751            .keys()
752            .any(|tid| self.graph.tasks[tid.index()].execution_mode == ExecutionMode::Sequential);
753
754        for task_id in ready {
755            if slots == 0 {
756                break;
757            }
758
759            // LevelBarrier: only dispatch tasks at the current level.
760            if self.topology.strategy == DispatchStrategy::LevelBarrier {
761                let task_depth = self
762                    .topology
763                    .depths
764                    .get(&task_id)
765                    .copied()
766                    .unwrap_or(usize::MAX);
767                if task_depth != self.current_level {
768                    continue;
769                }
770            }
771
772            let task = &self.graph.tasks[task_id.index()];
773
774            // Sequential tasks: only one may run at a time within the scheduler.
775            // Independent sequential tasks in separate DAG branches are still
776            // serialized here (they share exclusive-resource intent by annotation).
777            if task.execution_mode == ExecutionMode::Sequential {
778                if sequential_spawned_this_tick || has_running_sequential {
779                    continue;
780                }
781                sequential_spawned_this_tick = true;
782            }
783
784            let Some(agent_def_name) = self.router.route(task, &self.available_agents) else {
785                tracing::debug!(
786                    task_id = %task_id,
787                    title = %task.title,
788                    "no agent available, routing task to main agent inline"
789                );
790                let prompt = self.build_task_prompt(task);
791                self.graph.tasks[task_id.index()].status = TaskStatus::Running;
792                actions.push(SchedulerAction::RunInline { task_id, prompt });
793                slots -= 1;
794                continue;
795            };
796
797            let prompt = self.build_task_prompt(task);
798
799            // Mark task as Running optimistically (before record_spawn is called).
800            self.graph.tasks[task_id.index()].status = TaskStatus::Running;
801
802            actions.push(SchedulerAction::Spawn {
803                task_id,
804                agent_def_name,
805                prompt,
806            });
807            slots -= 1;
808        }
809
810        // Check for completion or deadlock.
811        // Use graph Running status count to avoid false positives while Spawn actions
812        // are in-flight (record_spawn hasn't been called yet for freshly emitted spawns).
813        // Note: non-transient spawn failures (e.g. capability errors) are handled by
814        // record_spawn_failure() which marks the task Failed and propagates failure per
815        // the task's FailureStrategy — this detector does not fire for those cases because
816        // failed tasks are terminal and dag::ready_tasks() returns their unblocked dependents.
817        // ConcurrencyLimit errors are transient: record_spawn_failure() reverts the task
818        // from Running back to Ready, so ready_tasks() is non-empty and deadlock is not
819        // triggered.
820        let running_in_graph_now = self
821            .graph
822            .tasks
823            .iter()
824            .filter(|t| t.status == TaskStatus::Running)
825            .count();
826        if running_in_graph_now == 0 && self.running.is_empty() {
827            let all_terminal = self.graph.tasks.iter().all(|t| t.status.is_terminal());
828            if all_terminal {
829                self.graph.status = GraphStatus::Completed;
830                self.graph.finished_at = Some(super::graph::chrono_now());
831                actions.push(SchedulerAction::Done {
832                    status: GraphStatus::Completed,
833                });
834            } else if dag::ready_tasks(&self.graph).is_empty() {
835                tracing::error!(
836                    "scheduler deadlock: no running or ready tasks, but graph not complete"
837                );
838                self.graph.status = GraphStatus::Failed;
839                self.graph.finished_at = Some(super::graph::chrono_now());
840                // Invariant: deadlock fires only when self.running is empty (checked above).
841                debug_assert!(
842                    self.running.is_empty(),
843                    "deadlock branch reached with non-empty running map"
844                );
845                for task in &mut self.graph.tasks {
846                    if !task.status.is_terminal() {
847                        task.status = TaskStatus::Canceled;
848                    }
849                }
850                actions.push(SchedulerAction::Done {
851                    status: GraphStatus::Failed,
852                });
853            }
854        }
855
856        actions
857    }
858
859    /// Wait for the next event from a running sub-agent.
860    ///
861    /// Buffers the received event for processing in the next `tick` call.
862    /// Returns immediately if no tasks are running. Uses a timeout so that
863    /// periodic timeout checking can occur.
864    /// Compute the current deferral backoff with exponential growth capped at 5 seconds.
865    ///
866    /// Each consecutive spawn failure due to concurrency limits doubles the base backoff.
867    fn current_deferral_backoff(&self) -> Duration {
868        const MAX_BACKOFF: Duration = Duration::from_secs(5);
869        let multiplier = 1u32
870            .checked_shl(self.consecutive_spawn_failures.min(10))
871            .unwrap_or(u32::MAX);
872        self.deferral_backoff
873            .saturating_mul(multiplier)
874            .min(MAX_BACKOFF)
875    }
876
877    pub async fn wait_event(&mut self) {
878        if self.running.is_empty() {
879            tokio::time::sleep(self.current_deferral_backoff()).await;
880            return;
881        }
882
883        // Find the nearest timeout deadline among running tasks.
884        let nearest_timeout = self
885            .running
886            .values()
887            .map(|r| {
888                self.task_timeout
889                    .checked_sub(r.started_at.elapsed())
890                    .unwrap_or(Duration::ZERO)
891            })
892            .min()
893            .unwrap_or(Duration::from_secs(1));
894
895        // Clamp to at least 100 ms to avoid busy-looping.
896        let wait_duration = nearest_timeout.max(Duration::from_millis(100));
897
898        tokio::select! {
899            Some(event) = self.event_rx.recv() => {
900                // SEC-ORCH-02: guard against unbounded buffer growth. Use total task
901                // count rather than max_parallel so that parallel bursts exceeding
902                // max_parallel do not cause premature event drops.
903                if self.buffered_events.len() >= self.graph.tasks.len() * 2 {
904                    // PERF-SCHED-02: log at error level — a dropped completion event
905                    // leaves a task stuck in Running until its timeout fires.
906                    if let Some(dropped) = self.buffered_events.pop_front() {
907                        tracing::error!(
908                            task_id = %dropped.task_id,
909                            buffer_len = self.buffered_events.len(),
910                            "event buffer saturated; completion event dropped — task may \
911                             remain Running until timeout"
912                        );
913                    }
914                }
915                self.buffered_events.push_back(event);
916            }
917            () = tokio::time::sleep(wait_duration) => {}
918        }
919    }
920
921    /// Record that a spawn action was successfully executed.
922    ///
923    /// Called by the caller after successfully spawning via `SubAgentManager`.
924    ///
925    /// Resets `consecutive_spawn_failures` to 0 as a "spawn succeeded = scheduler healthy"
926    /// signal. This is intentionally separate from the batch-level backoff in
927    /// [`record_batch_backoff`]: `record_spawn` provides an immediate reset on the first
928    /// success within a batch, while `record_batch_backoff` governs the tick-granular
929    /// failure counter used for exponential wait backoff.
930    pub fn record_spawn(
931        &mut self,
932        task_id: TaskId,
933        agent_handle_id: String,
934        agent_def_name: String,
935    ) {
936        self.consecutive_spawn_failures = 0;
937        self.graph.tasks[task_id.index()].assigned_agent = Some(agent_handle_id.clone());
938        self.running.insert(
939            task_id,
940            RunningTask {
941                agent_handle_id,
942                agent_def_name,
943                started_at: Instant::now(),
944            },
945        );
946    }
947
948    /// Handle a failed spawn attempt.
949    ///
950    /// If the error is a transient concurrency-limit rejection, reverts the task from
951    /// Running back to `Ready` so the next [`tick`] can retry the spawn when a slot opens.
952    /// Otherwise, marks the task as `Failed` and propagates failure.
953    /// Returns any cancel actions needed.
954    ///
955    /// # Errors (via returned actions)
956    ///
957    /// Propagates failure per the task's effective `FailureStrategy`.
958    pub fn record_spawn_failure(
959        &mut self,
960        task_id: TaskId,
961        error: &SubAgentError,
962    ) -> Vec<SchedulerAction> {
963        // Transient condition: the SubAgentManager rejected the spawn because all
964        // concurrency slots are occupied. Revert to Ready so the next tick retries.
965        // consecutive_spawn_failures is updated batch-wide by record_batch_backoff().
966        if let SubAgentError::ConcurrencyLimit { active, max } = error {
967            tracing::warn!(
968                task_id = %task_id,
969                active,
970                max,
971                next_backoff_ms = self.current_deferral_backoff().as_millis(),
972                "concurrency limit reached, deferring task to next tick"
973            );
974            self.graph.tasks[task_id.index()].status = TaskStatus::Ready;
975            return Vec::new();
976        }
977
978        // SEC-ORCH-04: truncate error to avoid logging sensitive internal details.
979        let error_excerpt: String = error.to_string().chars().take(512).collect();
980        tracing::warn!(
981            task_id = %task_id,
982            error = %error_excerpt,
983            "spawn failed, marking task failed"
984        );
985        self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
986        let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
987        let mut actions = Vec::new();
988        for cancel_task_id in cancel_ids {
989            if let Some(running) = self.running.remove(&cancel_task_id) {
990                actions.push(SchedulerAction::Cancel {
991                    agent_handle_id: running.agent_handle_id,
992                });
993            }
994        }
995        if self.graph.status != GraphStatus::Running {
996            self.graph.finished_at = Some(super::graph::chrono_now());
997            actions.push(SchedulerAction::Done {
998                status: self.graph.status,
999            });
1000        }
1001        actions
1002    }
1003
1004    /// Update the batch-level backoff counter after processing a full tick's spawn batch.
1005    ///
1006    /// With parallel dispatch a single tick may produce N Spawn actions. Individual
1007    /// per-spawn counter updates would miscount concurrent rejections as "consecutive"
1008    /// failures. This method captures the batch semantics instead:
1009    /// - If any spawn succeeded → reset the counter (scheduler is healthy).
1010    /// - Else if any spawn hit `ConcurrencyLimit` → this entire tick was a deferral tick.
1011    /// - If neither → no spawns were attempted; counter unchanged.
1012    pub fn record_batch_backoff(&mut self, any_success: bool, any_concurrency_failure: bool) {
1013        if any_success {
1014            self.consecutive_spawn_failures = 0;
1015        } else if any_concurrency_failure {
1016            self.consecutive_spawn_failures = self.consecutive_spawn_failures.saturating_add(1);
1017        }
1018    }
1019
1020    /// Cancel all running tasks (for user-initiated plan cancellation).
1021    ///
1022    /// # Warning: Cooperative Cancellation
1023    ///
1024    /// Cancellation is cooperative and asynchronous. Tool operations (file writes, shell
1025    /// executions) in progress at the time of cancellation complete before the agent loop
1026    /// checks the cancellation token. Callers should inspect the task graph state and clean
1027    /// up partially-written artifacts manually.
1028    pub fn cancel_all(&mut self) -> Vec<SchedulerAction> {
1029        self.graph.status = GraphStatus::Canceled;
1030        self.graph.finished_at = Some(super::graph::chrono_now());
1031
1032        // Drain running map first to avoid split borrow issues (M3).
1033        let running: Vec<(TaskId, RunningTask)> = self.running.drain().collect();
1034        let mut actions: Vec<SchedulerAction> = running
1035            .into_iter()
1036            .map(|(task_id, r)| {
1037                self.graph.tasks[task_id.index()].status = TaskStatus::Canceled;
1038                SchedulerAction::Cancel {
1039                    agent_handle_id: r.agent_handle_id,
1040                }
1041            })
1042            .collect();
1043
1044        for task in &mut self.graph.tasks {
1045            if !task.status.is_terminal() {
1046                task.status = TaskStatus::Canceled;
1047            }
1048        }
1049
1050        actions.push(SchedulerAction::Done {
1051            status: GraphStatus::Canceled,
1052        });
1053        actions
1054    }
1055}
1056
1057impl DagScheduler {
1058    /// Process a single `TaskEvent` and return any cancel actions needed.
1059    fn process_event(&mut self, event: TaskEvent) -> Vec<SchedulerAction> {
1060        let TaskEvent {
1061            task_id,
1062            agent_handle_id,
1063            outcome,
1064        } = event;
1065
1066        // Guard against stale events from previous incarnations (e.g. after timeout+retry).
1067        // A timed-out agent's event_tx outlives the timeout and may send a completion later.
1068        match self.running.get(&task_id) {
1069            Some(running) if running.agent_handle_id != agent_handle_id => {
1070                tracing::warn!(
1071                    task_id = %task_id,
1072                    expected = %running.agent_handle_id,
1073                    got = %agent_handle_id,
1074                    "discarding stale event from previous agent incarnation"
1075                );
1076                return Vec::new();
1077            }
1078            None => {
1079                tracing::debug!(
1080                    task_id = %task_id,
1081                    agent_handle_id = %agent_handle_id,
1082                    "ignoring event for task not in running map"
1083                );
1084                return Vec::new();
1085            }
1086            Some(_) => {}
1087        }
1088
1089        // Compute duration BEFORE removing from running map (C1 fix).
1090        let duration_ms = self.running.get(&task_id).map_or(0, |r| {
1091            u64::try_from(r.started_at.elapsed().as_millis()).unwrap_or(u64::MAX)
1092        });
1093        let agent_def_name = self.running.get(&task_id).map(|r| r.agent_def_name.clone());
1094
1095        self.running.remove(&task_id);
1096
1097        match outcome {
1098            TaskOutcome::Completed { output, artifacts } => {
1099                self.graph.tasks[task_id.index()].status = TaskStatus::Completed;
1100                self.graph.tasks[task_id.index()].result = Some(TaskResult {
1101                    output: output.clone(),
1102                    artifacts,
1103                    duration_ms,
1104                    agent_id: Some(agent_handle_id),
1105                    agent_def: agent_def_name,
1106                });
1107
1108                // Record success in cascade detector.
1109                if let Some(ref mut detector) = self.cascade_detector {
1110                    detector.record_outcome(task_id, true, &self.graph);
1111                }
1112
1113                // Mark newly unblocked tasks as Ready.
1114                // Downstream tasks are unblocked immediately — verification does not gate dispatch.
1115                let newly_ready = dag::ready_tasks(&self.graph);
1116                for ready_id in newly_ready {
1117                    if self.graph.tasks[ready_id.index()].status == TaskStatus::Pending {
1118                        self.graph.tasks[ready_id.index()].status = TaskStatus::Ready;
1119                    }
1120                }
1121
1122                // Emit Verify action when verify_completeness is enabled.
1123                // The replan budget is enforced inside inject_tasks() — the observation
1124                // (emitting Verify) must not be gated on the mutation budget, or tasks
1125                // after budget exhaustion never receive verification at all.
1126                // max_replans=0 still emits Verify; gaps are logged only (no inject_tasks call).
1127                if self.verify_completeness {
1128                    vec![SchedulerAction::Verify { task_id, output }]
1129                } else {
1130                    Vec::new()
1131                }
1132            }
1133
1134            TaskOutcome::Failed { error } => {
1135                // SEC-ORCH-04: truncate error to avoid logging sensitive internal details.
1136                let error_excerpt: String = error.chars().take(512).collect();
1137                tracing::warn!(
1138                    task_id = %task_id,
1139                    error = %error_excerpt,
1140                    "task failed"
1141                );
1142                self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
1143
1144                // Record failure in cascade detector.
1145                if let Some(ref mut detector) = self.cascade_detector {
1146                    detector.record_outcome(task_id, false, &self.graph);
1147                }
1148
1149                let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
1150                let mut actions = Vec::new();
1151
1152                for cancel_task_id in cancel_ids {
1153                    if let Some(running) = self.running.remove(&cancel_task_id) {
1154                        actions.push(SchedulerAction::Cancel {
1155                            agent_handle_id: running.agent_handle_id,
1156                        });
1157                    }
1158                }
1159
1160                if self.graph.status != GraphStatus::Running {
1161                    self.graph.finished_at = Some(super::graph::chrono_now());
1162                    actions.push(SchedulerAction::Done {
1163                        status: self.graph.status,
1164                    });
1165                }
1166
1167                actions
1168            }
1169        }
1170    }
1171
1172    /// Check all running tasks for timeout violations.
1173    ///
1174    /// # Warning: Cooperative Cancellation
1175    ///
1176    /// Cancel actions emitted here signal agents cooperatively. Tool operations in progress
1177    /// at the time of cancellation complete before the agent loop checks the cancellation
1178    /// token. Partially-written artifacts may remain on disk after cancellation.
1179    fn check_timeouts(&mut self) -> Vec<SchedulerAction> {
1180        let timed_out: Vec<(TaskId, String)> = self
1181            .running
1182            .iter()
1183            .filter(|(_, r)| r.started_at.elapsed() > self.task_timeout)
1184            .map(|(id, r)| (*id, r.agent_handle_id.clone()))
1185            .collect();
1186
1187        let mut actions = Vec::new();
1188        for (task_id, agent_handle_id) in timed_out {
1189            tracing::warn!(
1190                task_id = %task_id,
1191                timeout_secs = self.task_timeout.as_secs(),
1192                "task timed out"
1193            );
1194            self.running.remove(&task_id);
1195            self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
1196
1197            actions.push(SchedulerAction::Cancel { agent_handle_id });
1198
1199            let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
1200            for cancel_task_id in cancel_ids {
1201                if let Some(running) = self.running.remove(&cancel_task_id) {
1202                    actions.push(SchedulerAction::Cancel {
1203                        agent_handle_id: running.agent_handle_id,
1204                    });
1205                }
1206            }
1207
1208            if self.graph.status != GraphStatus::Running {
1209                self.graph.finished_at = Some(super::graph::chrono_now());
1210                actions.push(SchedulerAction::Done {
1211                    status: self.graph.status,
1212                });
1213                break;
1214            }
1215        }
1216
1217        actions
1218    }
1219
1220    /// Build the task prompt with dependency context injection (Section 14).
1221    ///
1222    /// Uses char-boundary-safe truncation (S1 fix) to avoid panics on multi-byte UTF-8.
1223    /// Dependency output is sanitized (SEC-ORCH-01) and titles are XML-escaped to prevent
1224    /// prompt injection via crafted task outputs.
1225    fn build_task_prompt(&self, task: &TaskNode) -> String {
1226        if task.depends_on.is_empty() {
1227            return task.description.clone();
1228        }
1229
1230        let completed_deps: Vec<&TaskNode> = task
1231            .depends_on
1232            .iter()
1233            .filter_map(|dep_id| {
1234                let dep = &self.graph.tasks[dep_id.index()];
1235                if dep.status == TaskStatus::Completed {
1236                    Some(dep)
1237                } else {
1238                    None
1239                }
1240            })
1241            .collect();
1242
1243        if completed_deps.is_empty() {
1244            return task.description.clone();
1245        }
1246
1247        let budget_per_dep = self
1248            .dependency_context_budget
1249            .checked_div(completed_deps.len())
1250            .unwrap_or(self.dependency_context_budget);
1251
1252        let mut context_block = String::from("<completed-dependencies>\n");
1253
1254        for dep in &completed_deps {
1255            // SEC-ORCH-01: XML-escape dep.id and dep.title to prevent breaking out of the
1256            // <completed-dependencies> wrapper via crafted titles.
1257            let escaped_id = xml_escape(&dep.id.to_string());
1258            let escaped_title = xml_escape(&dep.title);
1259            let _ = writeln!(
1260                context_block,
1261                "## Task \"{escaped_id}\": \"{escaped_title}\" (completed)",
1262            );
1263
1264            if let Some(ref result) = dep.result {
1265                // SEC-ORCH-01: sanitize dep output to prevent prompt injection from upstream tasks.
1266                let source = ContentSource::new(ContentSourceKind::A2aMessage);
1267                let sanitized = self.sanitizer.sanitize(&result.output, source);
1268                let safe_output = sanitized.body;
1269
1270                // Char-boundary-safe truncation (S1): use chars().take() instead of byte slicing.
1271                let char_count = safe_output.chars().count();
1272                if char_count > budget_per_dep {
1273                    let truncated: String = safe_output.chars().take(budget_per_dep).collect();
1274                    let _ = write!(
1275                        context_block,
1276                        "{truncated}...\n[truncated: {char_count} chars total]"
1277                    );
1278                } else {
1279                    context_block.push_str(&safe_output);
1280                }
1281            } else {
1282                context_block.push_str("[no output recorded]\n");
1283            }
1284            context_block.push('\n');
1285        }
1286
1287        // Add notes for skipped deps.
1288        for dep_id in &task.depends_on {
1289            let dep = &self.graph.tasks[dep_id.index()];
1290            if dep.status == TaskStatus::Skipped {
1291                let escaped_id = xml_escape(&dep.id.to_string());
1292                let escaped_title = xml_escape(&dep.title);
1293                let _ = writeln!(
1294                    context_block,
1295                    "## Task \"{escaped_id}\": \"{escaped_title}\" (skipped -- no output available)\n",
1296                );
1297            }
1298        }
1299
1300        context_block.push_str("</completed-dependencies>\n\n");
1301        format!("{context_block}Your task: {}", task.description)
1302    }
1303}
1304
/// Replace the five XML special characters (`<`, `>`, `&`, `"`, `'`) with entities.
///
/// All other characters are copied through unchanged. Used to prevent tag injection
/// when untrusted text is embedded in an XML-like wrapper.
fn xml_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let entity = match ch {
            '<' => "&lt;",
            '>' => "&gt;",
            '&' => "&amp;",
            '"' => "&quot;",
            '\'' => "&#39;",
            _ => {
                // Not a special character: copy it through unchanged.
                escaped.push(ch);
                continue;
            }
        };
        escaped.push_str(entity);
    }
    escaped
}
1320
1321#[cfg(test)]
1322mod tests {
1323    #![allow(clippy::default_trait_access)]
1324
1325    use super::*;
1326    use crate::graph::{FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus};
1327
1328    fn make_node(id: u32, deps: &[u32]) -> TaskNode {
1329        let mut n = TaskNode::new(
1330            id,
1331            format!("task-{id}"),
1332            format!("description for task {id}"),
1333        );
1334        n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
1335        n
1336    }
1337
1338    fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
1339        let mut g = TaskGraph::new("test goal");
1340        g.tasks = nodes;
1341        g
1342    }
1343
1344    fn make_def(name: &str) -> SubAgentDef {
1345        use zeph_subagent::{SkillFilter, SubAgentPermissions, SubagentHooks, ToolPolicy};
1346        SubAgentDef {
1347            name: name.to_string(),
1348            description: format!("{name} agent"),
1349            model: None,
1350            tools: ToolPolicy::InheritAll,
1351            disallowed_tools: vec![],
1352            permissions: SubAgentPermissions::default(),
1353            skills: SkillFilter::default(),
1354            system_prompt: String::new(),
1355            hooks: SubagentHooks::default(),
1356            memory: None,
1357            source: None,
1358            file_path: None,
1359        }
1360    }
1361
1362    fn make_config() -> zeph_config::OrchestrationConfig {
1363        zeph_config::OrchestrationConfig {
1364            enabled: true,
1365            max_tasks: 20,
1366            max_parallel: 4,
1367            default_failure_strategy: "abort".to_string(),
1368            default_max_retries: 3,
1369            task_timeout_secs: 300,
1370            planner_provider: String::new(),
1371            planner_max_tokens: 4096,
1372            dependency_context_budget: 16384,
1373            confirm_before_execute: true,
1374            aggregator_max_tokens: 4096,
1375            deferral_backoff_ms: 250,
1376            plan_cache: zeph_config::PlanCacheConfig::default(),
1377            topology_selection: false,
1378            verify_provider: String::new(),
1379            verify_max_tokens: 1024,
1380            max_replans: 2,
1381            verify_completeness: false,
1382            completeness_threshold: 0.7,
1383            tool_provider: String::new(),
1384            cascade_routing: false,
1385            cascade_failure_threshold: 0.5,
1386            tree_optimized_dispatch: false,
1387        }
1388    }
1389
1390    struct FirstRouter;
1391    impl AgentRouter for FirstRouter {
1392        fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
1393            available.first().map(|d| d.name.clone())
1394        }
1395    }
1396
1397    struct NoneRouter;
1398    impl AgentRouter for NoneRouter {
1399        fn route(&self, _task: &TaskNode, _available: &[SubAgentDef]) -> Option<String> {
1400            None
1401        }
1402    }
1403
1404    fn make_scheduler_with_router(graph: TaskGraph, router: Box<dyn AgentRouter>) -> DagScheduler {
1405        let config = make_config();
1406        let defs = vec![make_def("worker")];
1407        DagScheduler::new(graph, &config, router, defs).unwrap()
1408    }
1409
1410    fn make_scheduler(graph: TaskGraph) -> DagScheduler {
1411        let config = make_config();
1412        let defs = vec![make_def("worker")];
1413        DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap()
1414    }
1415
1416    // --- constructor tests ---
1417
1418    #[test]
1419    fn test_new_validates_graph_status() {
1420        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1421        graph.status = GraphStatus::Running; // wrong status
1422        let config = make_config();
1423        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
1424        assert!(result.is_err());
1425        let err = result.unwrap_err();
1426        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1427    }
1428
1429    #[test]
1430    fn test_new_marks_roots_ready() {
1431        let graph = graph_from_nodes(vec![
1432            make_node(0, &[]),
1433            make_node(1, &[]),
1434            make_node(2, &[0, 1]),
1435        ]);
1436        let scheduler = make_scheduler(graph);
1437        assert_eq!(scheduler.graph().tasks[0].status, TaskStatus::Ready);
1438        assert_eq!(scheduler.graph().tasks[1].status, TaskStatus::Ready);
1439        assert_eq!(scheduler.graph().tasks[2].status, TaskStatus::Pending);
1440        assert_eq!(scheduler.graph().status, GraphStatus::Running);
1441    }
1442
1443    #[test]
1444    fn test_new_validates_empty_graph() {
1445        let graph = graph_from_nodes(vec![]);
1446        let config = make_config();
1447        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
1448        assert!(result.is_err());
1449    }
1450
1451    // --- tick tests ---
1452
1453    #[test]
1454    fn test_tick_produces_spawn_for_ready() {
1455        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1456        let mut scheduler = make_scheduler(graph);
1457        let actions = scheduler.tick();
1458        let spawns: Vec<_> = actions
1459            .iter()
1460            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1461            .collect();
1462        assert_eq!(spawns.len(), 2);
1463    }
1464
1465    #[test]
1466    fn test_tick_dispatches_all_regardless_of_max_parallel() {
1467        // tick() enforces max_parallel as a pre-dispatch cap.
1468        // With 5 independent tasks and max_parallel=2, only 2 are dispatched per tick.
1469        let graph = graph_from_nodes(vec![
1470            make_node(0, &[]),
1471            make_node(1, &[]),
1472            make_node(2, &[]),
1473            make_node(3, &[]),
1474            make_node(4, &[]),
1475        ]);
1476        let mut config = make_config();
1477        config.max_parallel = 2;
1478        let defs = vec![make_def("worker")];
1479        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1480        let actions = scheduler.tick();
1481        let spawn_count = actions
1482            .iter()
1483            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1484            .count();
1485        assert_eq!(
1486            spawn_count, 2,
1487            "max_parallel=2 caps dispatched tasks per tick"
1488        );
1489    }
1490
1491    #[test]
1492    fn test_tick_detects_completion() {
1493        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1494        graph.tasks[0].status = TaskStatus::Completed;
1495        let config = make_config();
1496        let defs = vec![make_def("worker")];
1497        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1498        // Manually set graph to Running since new() validated Created status
1499        // — but all tasks are terminal. tick() should detect completion.
1500        let actions = scheduler.tick();
1501        let has_done = actions.iter().any(|a| {
1502            matches!(
1503                a,
1504                SchedulerAction::Done {
1505                    status: GraphStatus::Completed
1506                }
1507            )
1508        });
1509        assert!(
1510            has_done,
1511            "should emit Done(Completed) when all tasks are terminal"
1512        );
1513    }
1514
1515    // --- completion event tests ---
1516
1517    #[test]
1518    fn test_completion_event_marks_deps_ready() {
1519        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1520        let mut scheduler = make_scheduler(graph);
1521
1522        // Simulate task 0 running.
1523        scheduler.graph.tasks[0].status = TaskStatus::Running;
1524        scheduler.running.insert(
1525            TaskId(0),
1526            RunningTask {
1527                agent_handle_id: "handle-0".to_string(),
1528                agent_def_name: "worker".to_string(),
1529                started_at: Instant::now(),
1530            },
1531        );
1532
1533        let event = TaskEvent {
1534            task_id: TaskId(0),
1535            agent_handle_id: "handle-0".to_string(),
1536            outcome: TaskOutcome::Completed {
1537                output: "done".to_string(),
1538                artifacts: vec![],
1539            },
1540        };
1541        scheduler.buffered_events.push_back(event);
1542
1543        let actions = scheduler.tick();
1544        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Completed);
1545        // Task 1 should now be Ready or Spawn action emitted.
1546        let has_spawn_1 = actions
1547            .iter()
1548            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(1)));
1549        assert!(
1550            has_spawn_1 || scheduler.graph.tasks[1].status == TaskStatus::Ready,
1551            "task 1 should be spawned or marked Ready"
1552        );
1553    }
1554
1555    #[test]
1556    fn test_failure_abort_cancels_running() {
1557        let graph = graph_from_nodes(vec![
1558            make_node(0, &[]),
1559            make_node(1, &[]),
1560            make_node(2, &[0, 1]),
1561        ]);
1562        let mut scheduler = make_scheduler(graph);
1563
1564        // Simulate tasks 0 and 1 running.
1565        scheduler.graph.tasks[0].status = TaskStatus::Running;
1566        scheduler.running.insert(
1567            TaskId(0),
1568            RunningTask {
1569                agent_handle_id: "h0".to_string(),
1570                agent_def_name: "worker".to_string(),
1571                started_at: Instant::now(),
1572            },
1573        );
1574        scheduler.graph.tasks[1].status = TaskStatus::Running;
1575        scheduler.running.insert(
1576            TaskId(1),
1577            RunningTask {
1578                agent_handle_id: "h1".to_string(),
1579                agent_def_name: "worker".to_string(),
1580                started_at: Instant::now(),
1581            },
1582        );
1583
1584        // Task 0 fails with default Abort strategy.
1585        let event = TaskEvent {
1586            task_id: TaskId(0),
1587            agent_handle_id: "h0".to_string(),
1588            outcome: TaskOutcome::Failed {
1589                error: "boom".to_string(),
1590            },
1591        };
1592        scheduler.buffered_events.push_back(event);
1593
1594        let actions = scheduler.tick();
1595        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1596        let cancel_ids: Vec<_> = actions
1597            .iter()
1598            .filter_map(|a| {
1599                if let SchedulerAction::Cancel { agent_handle_id } = a {
1600                    Some(agent_handle_id.as_str())
1601                } else {
1602                    None
1603                }
1604            })
1605            .collect();
1606        assert!(cancel_ids.contains(&"h1"), "task 1 should be canceled");
1607        assert!(
1608            actions
1609                .iter()
1610                .any(|a| matches!(a, SchedulerAction::Done { .. }))
1611        );
1612    }
1613
1614    #[test]
1615    fn test_failure_skip_propagates() {
1616        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1617        let mut scheduler = make_scheduler(graph);
1618
1619        // Set failure strategy to Skip on task 0.
1620        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
1621        scheduler.graph.tasks[0].status = TaskStatus::Running;
1622        scheduler.running.insert(
1623            TaskId(0),
1624            RunningTask {
1625                agent_handle_id: "h0".to_string(),
1626                agent_def_name: "worker".to_string(),
1627                started_at: Instant::now(),
1628            },
1629        );
1630
1631        let event = TaskEvent {
1632            task_id: TaskId(0),
1633            agent_handle_id: "h0".to_string(),
1634            outcome: TaskOutcome::Failed {
1635                error: "skip me".to_string(),
1636            },
1637        };
1638        scheduler.buffered_events.push_back(event);
1639        scheduler.tick();
1640
1641        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Skipped);
1642        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Skipped);
1643    }
1644
1645    #[test]
1646    fn test_failure_retry_reschedules() {
1647        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1648        let mut scheduler = make_scheduler(graph);
1649
1650        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
1651        scheduler.graph.tasks[0].max_retries = Some(3);
1652        scheduler.graph.tasks[0].retry_count = 0;
1653        scheduler.graph.tasks[0].status = TaskStatus::Running;
1654        scheduler.running.insert(
1655            TaskId(0),
1656            RunningTask {
1657                agent_handle_id: "h0".to_string(),
1658                agent_def_name: "worker".to_string(),
1659                started_at: Instant::now(),
1660            },
1661        );
1662
1663        let event = TaskEvent {
1664            task_id: TaskId(0),
1665            agent_handle_id: "h0".to_string(),
1666            outcome: TaskOutcome::Failed {
1667                error: "transient".to_string(),
1668            },
1669        };
1670        scheduler.buffered_events.push_back(event);
1671        let actions = scheduler.tick();
1672
1673        // Task should be rescheduled (Ready) and a Spawn action emitted.
1674        let has_spawn = actions
1675            .iter()
1676            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
1677        assert!(
1678            has_spawn || scheduler.graph.tasks[0].status == TaskStatus::Ready,
1679            "retry should produce spawn or Ready status"
1680        );
1681        // retry_count incremented
1682        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
1683    }
1684
1685    #[test]
1686    fn test_process_event_failed_retry() {
1687        // End-to-end: send Failed event, verify retry path produces Ready -> Spawn.
1688        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1689        let mut scheduler = make_scheduler(graph);
1690
1691        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
1692        scheduler.graph.tasks[0].max_retries = Some(2);
1693        scheduler.graph.tasks[0].retry_count = 0;
1694        scheduler.graph.tasks[0].status = TaskStatus::Running;
1695        scheduler.running.insert(
1696            TaskId(0),
1697            RunningTask {
1698                agent_handle_id: "h0".to_string(),
1699                agent_def_name: "worker".to_string(),
1700                started_at: Instant::now(),
1701            },
1702        );
1703
1704        let event = TaskEvent {
1705            task_id: TaskId(0),
1706            agent_handle_id: "h0".to_string(),
1707            outcome: TaskOutcome::Failed {
1708                error: "first failure".to_string(),
1709            },
1710        };
1711        scheduler.buffered_events.push_back(event);
1712        let actions = scheduler.tick();
1713
1714        // After retry: retry_count = 1, status = Ready or Spawn emitted.
1715        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
1716        let spawned = actions
1717            .iter()
1718            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
1719        assert!(
1720            spawned || scheduler.graph.tasks[0].status == TaskStatus::Ready,
1721            "retry should emit Spawn or set Ready"
1722        );
1723        // Graph must still be Running.
1724        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1725    }
1726
1727    #[test]
1728    fn test_timeout_cancels_stalled() {
1729        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1730        let mut config = make_config();
1731        config.task_timeout_secs = 1; // 1 second timeout
1732        let defs = vec![make_def("worker")];
1733        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1734
1735        // Simulate a running task that started just over 1 second ago.
1736        scheduler.graph.tasks[0].status = TaskStatus::Running;
1737        scheduler.running.insert(
1738            TaskId(0),
1739            RunningTask {
1740                agent_handle_id: "h0".to_string(),
1741                agent_def_name: "worker".to_string(),
1742                started_at: Instant::now().checked_sub(Duration::from_secs(2)).unwrap(), // already timed out
1743            },
1744        );
1745
1746        let actions = scheduler.tick();
1747        let has_cancel = actions.iter().any(
1748            |a| matches!(a, SchedulerAction::Cancel { agent_handle_id } if agent_handle_id == "h0"),
1749        );
1750        assert!(has_cancel, "timed-out task should emit Cancel action");
1751        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1752    }
1753
1754    #[test]
1755    fn test_cancel_all() {
1756        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1757        let mut scheduler = make_scheduler(graph);
1758
1759        scheduler.graph.tasks[0].status = TaskStatus::Running;
1760        scheduler.running.insert(
1761            TaskId(0),
1762            RunningTask {
1763                agent_handle_id: "h0".to_string(),
1764                agent_def_name: "worker".to_string(),
1765                started_at: Instant::now(),
1766            },
1767        );
1768        scheduler.graph.tasks[1].status = TaskStatus::Running;
1769        scheduler.running.insert(
1770            TaskId(1),
1771            RunningTask {
1772                agent_handle_id: "h1".to_string(),
1773                agent_def_name: "worker".to_string(),
1774                started_at: Instant::now(),
1775            },
1776        );
1777
1778        let actions = scheduler.cancel_all();
1779
1780        assert_eq!(scheduler.graph.status, GraphStatus::Canceled);
1781        assert!(scheduler.running.is_empty());
1782        let cancel_count = actions
1783            .iter()
1784            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1785            .count();
1786        assert_eq!(cancel_count, 2);
1787        assert!(actions.iter().any(|a| matches!(
1788            a,
1789            SchedulerAction::Done {
1790                status: GraphStatus::Canceled
1791            }
1792        )));
1793    }
1794
1795    #[test]
1796    fn test_record_spawn_failure() {
1797        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1798        let mut scheduler = make_scheduler(graph);
1799
1800        // Simulate task marked Running (by tick) but spawn failed.
1801        scheduler.graph.tasks[0].status = TaskStatus::Running;
1802
1803        let error = SubAgentError::Spawn("spawn error".to_string());
1804        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1805        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1806        // With Abort strategy and no other running tasks, graph should be Failed.
1807        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1808        assert!(
1809            actions
1810                .iter()
1811                .any(|a| matches!(a, SchedulerAction::Done { .. }))
1812        );
1813    }
1814
1815    #[test]
1816    fn test_record_spawn_failure_concurrency_limit_reverts_to_ready() {
1817        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1818        let mut scheduler = make_scheduler(graph);
1819
1820        // Simulate tick() optimistically marking the task Running before spawn.
1821        scheduler.graph.tasks[0].status = TaskStatus::Running;
1822
1823        // Concurrency limit hit — transient, should not fail the task.
1824        let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
1825        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1826        assert_eq!(
1827            scheduler.graph.tasks[0].status,
1828            TaskStatus::Ready,
1829            "task must revert to Ready so the next tick can retry"
1830        );
1831        assert_eq!(
1832            scheduler.graph.status,
1833            GraphStatus::Running,
1834            "graph must stay Running, not transition to Failed"
1835        );
1836        assert!(
1837            actions.is_empty(),
1838            "no cancel or done actions expected for a transient deferral"
1839        );
1840    }
1841
1842    #[test]
1843    fn test_record_spawn_failure_concurrency_limit_variant_spawn_for_task() {
1844        // Both spawn() and resume() now return SubAgentError::ConcurrencyLimit — verify handling.
1845        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1846        let mut scheduler = make_scheduler(graph);
1847        scheduler.graph.tasks[0].status = TaskStatus::Running;
1848
1849        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1850        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1851        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1852        assert!(actions.is_empty());
1853    }
1854
1855    // --- #1516 edge-case tests ---
1856
1857    #[test]
1858    fn test_concurrency_deferral_does_not_affect_running_task() {
1859        // Two root tasks. Task 0 is Running (successfully spawned).
1860        // Task 1 hits a concurrency limit and reverts to Ready.
1861        // Task 0 must be unaffected.
1862        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1863        let mut scheduler = make_scheduler(graph);
1864
1865        // Simulate both tasks optimistically marked Running by tick().
1866        scheduler.graph.tasks[0].status = TaskStatus::Running;
1867        scheduler.running.insert(
1868            TaskId(0),
1869            RunningTask {
1870                agent_handle_id: "h0".to_string(),
1871                agent_def_name: "worker".to_string(),
1872                started_at: Instant::now(),
1873            },
1874        );
1875        scheduler.graph.tasks[1].status = TaskStatus::Running;
1876
1877        // Task 1 spawn fails with concurrency limit.
1878        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1879        let actions = scheduler.record_spawn_failure(TaskId(1), &error);
1880
1881        assert_eq!(
1882            scheduler.graph.tasks[0].status,
1883            TaskStatus::Running,
1884            "task 0 must remain Running"
1885        );
1886        assert_eq!(
1887            scheduler.graph.tasks[1].status,
1888            TaskStatus::Ready,
1889            "task 1 must revert to Ready"
1890        );
1891        assert_eq!(
1892            scheduler.graph.status,
1893            GraphStatus::Running,
1894            "graph must stay Running"
1895        );
1896        assert!(actions.is_empty(), "no cancel or done actions expected");
1897    }
1898
1899    #[test]
1900    fn test_max_concurrent_zero_no_infinite_loop() {
1901        // max_parallel=0 is a degenerate config. tick() uses saturating_sub so slots=0,
1902        // and no tasks are dispatched. The graph does not deadlock because ready tasks
1903        // still exist — the caller must increase max_parallel or handle this externally.
1904        // After max_parallel is increased and a new tick fires, tasks will be dispatched.
1905        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1906        let config = zeph_config::OrchestrationConfig {
1907            max_parallel: 0,
1908            ..make_config()
1909        };
1910        let mut scheduler = DagScheduler::new(
1911            graph,
1912            &config,
1913            Box::new(FirstRouter),
1914            vec![make_def("worker")],
1915        )
1916        .unwrap();
1917
1918        let actions1 = scheduler.tick();
1919        // No Spawn: slots = max_parallel(0) - running(0) = 0.
1920        assert!(
1921            actions1
1922                .iter()
1923                .all(|a| !matches!(a, SchedulerAction::Spawn { .. })),
1924            "no Spawn expected when max_parallel=0"
1925        );
1926        assert!(
1927            actions1
1928                .iter()
1929                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
1930            "no Done(Failed) expected — ready tasks exist, so no deadlock"
1931        );
1932        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1933
1934        // Second tick also dispatches nothing (still max_parallel=0, ready task exists).
1935        let actions2 = scheduler.tick();
1936        assert!(
1937            actions2
1938                .iter()
1939                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
1940            "second tick must not emit Done(Failed) — ready tasks still exist"
1941        );
1942        assert_eq!(
1943            scheduler.graph.status,
1944            GraphStatus::Running,
1945            "graph must remain Running"
1946        );
1947    }
1948
1949    #[test]
1950    fn test_all_tasks_deferred_graph_stays_running() {
1951        // Both root tasks are spawned optimistically, both fail with ConcurrencyLimit,
1952        // and both revert to Ready. The graph must remain Running (not Failed) and
1953        // the next tick must re-emit Spawn actions for the deferred tasks.
1954        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1955        let mut scheduler = make_scheduler(graph);
1956
1957        // First tick emits Spawn for both tasks and marks them Running.
1958        let actions = scheduler.tick();
1959        assert_eq!(
1960            actions
1961                .iter()
1962                .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1963                .count(),
1964            2,
1965            "expected 2 Spawn actions on first tick"
1966        );
1967        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
1968        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Running);
1969
1970        // Both spawns fail — both revert to Ready.
1971        let error = SubAgentError::ConcurrencyLimit { active: 2, max: 2 };
1972        let r0 = scheduler.record_spawn_failure(TaskId(0), &error);
1973        let r1 = scheduler.record_spawn_failure(TaskId(1), &error);
1974        assert!(r0.is_empty() && r1.is_empty(), "no cancel/done on deferral");
1975        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1976        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Ready);
1977        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1978
1979        // Second tick must retry both deferred tasks.
1980        let retry_actions = scheduler.tick();
1981        let spawn_count = retry_actions
1982            .iter()
1983            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1984            .count();
1985        assert!(
1986            spawn_count > 0,
1987            "second tick must re-emit Spawn for deferred tasks"
1988        );
1989        assert!(
1990            retry_actions.iter().all(|a| !matches!(
1991                a,
1992                SchedulerAction::Done {
1993                    status: GraphStatus::Failed,
1994                    ..
1995                }
1996            )),
1997            "no Done(Failed) expected"
1998        );
1999    }
2000
2001    #[test]
2002    fn test_build_prompt_no_deps() {
2003        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2004        let scheduler = make_scheduler(graph);
2005        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[0]);
2006        assert_eq!(prompt, "description for task 0");
2007    }
2008
2009    #[test]
2010    fn test_build_prompt_with_deps_and_truncation() {
2011        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2012        graph.tasks[0].status = TaskStatus::Completed;
2013        // Create output longer than budget
2014        graph.tasks[0].result = Some(TaskResult {
2015            output: "x".repeat(200),
2016            artifacts: vec![],
2017            duration_ms: 10,
2018            agent_id: None,
2019            agent_def: None,
2020        });
2021
2022        let config = zeph_config::OrchestrationConfig {
2023            dependency_context_budget: 50,
2024            ..make_config()
2025        };
2026        let scheduler = DagScheduler::new(
2027            graph,
2028            &config,
2029            Box::new(FirstRouter),
2030            vec![make_def("worker")],
2031        )
2032        .unwrap();
2033
2034        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
2035        assert!(prompt.contains("<completed-dependencies>"));
2036        assert!(prompt.contains("[truncated:"));
2037        assert!(prompt.contains("Your task:"));
2038    }
2039
2040    #[test]
2041    fn test_duration_ms_computed_correctly() {
2042        // Regression test for C1: duration_ms must be non-zero after completion.
2043        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2044        let mut scheduler = make_scheduler(graph);
2045
2046        scheduler.graph.tasks[0].status = TaskStatus::Running;
2047        scheduler.running.insert(
2048            TaskId(0),
2049            RunningTask {
2050                agent_handle_id: "h0".to_string(),
2051                agent_def_name: "worker".to_string(),
2052                started_at: Instant::now()
2053                    .checked_sub(Duration::from_millis(50))
2054                    .unwrap(),
2055            },
2056        );
2057
2058        let event = TaskEvent {
2059            task_id: TaskId(0),
2060            agent_handle_id: "h0".to_string(),
2061            outcome: TaskOutcome::Completed {
2062                output: "result".to_string(),
2063                artifacts: vec![],
2064            },
2065        };
2066        scheduler.buffered_events.push_back(event);
2067        scheduler.tick();
2068
2069        let result = scheduler.graph.tasks[0].result.as_ref().unwrap();
2070        assert!(
2071            result.duration_ms > 0,
2072            "duration_ms should be > 0, got {}",
2073            result.duration_ms
2074        );
2075    }
2076
2077    #[test]
2078    fn test_utf8_safe_truncation() {
2079        // S1 regression: truncation must not panic on multi-byte UTF-8.
2080        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2081        graph.tasks[0].status = TaskStatus::Completed;
2082        // Unicode: each char is 3 bytes in UTF-8.
2083        let unicode_output = "日本語テスト".repeat(100);
2084        graph.tasks[0].result = Some(TaskResult {
2085            output: unicode_output,
2086            artifacts: vec![],
2087            duration_ms: 10,
2088            agent_id: None,
2089            agent_def: None,
2090        });
2091
2092        // Budget large enough to hold the spotlighting wrapper + some Japanese chars.
2093        // The sanitizer adds ~200 chars of spotlight header, so 500 chars is sufficient.
2094        let config = zeph_config::OrchestrationConfig {
2095            dependency_context_budget: 500,
2096            ..make_config()
2097        };
2098        let scheduler = DagScheduler::new(
2099            graph,
2100            &config,
2101            Box::new(FirstRouter),
2102            vec![make_def("worker")],
2103        )
2104        .unwrap();
2105
2106        // Must not panic, and Japanese chars must be preserved in the output.
2107        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
2108        assert!(
2109            prompt.contains("日"),
2110            "Japanese characters should be in the prompt after safe truncation"
2111        );
2112    }
2113
2114    #[test]
2115    fn test_no_agent_routes_inline() {
2116        // NoneRouter: when no agent matches, task falls back to RunInline.
2117        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2118        let mut scheduler = make_scheduler_with_router(graph, Box::new(NoneRouter));
2119        let actions = scheduler.tick();
2120        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
2121        assert!(
2122            actions
2123                .iter()
2124                .any(|a| matches!(a, SchedulerAction::RunInline { .. }))
2125        );
2126    }
2127
2128    #[test]
2129    fn test_stale_event_rejected() {
2130        // Regression: events from a previous agent incarnation must be discarded.
2131        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2132        let mut scheduler = make_scheduler(graph);
2133
2134        // Simulate task running with handle "current-handle".
2135        scheduler.graph.tasks[0].status = TaskStatus::Running;
2136        scheduler.running.insert(
2137            TaskId(0),
2138            RunningTask {
2139                agent_handle_id: "current-handle".to_string(),
2140                agent_def_name: "worker".to_string(),
2141                started_at: Instant::now(),
2142            },
2143        );
2144
2145        // Send a completion event from the OLD agent (stale handle).
2146        let stale_event = TaskEvent {
2147            task_id: TaskId(0),
2148            agent_handle_id: "old-handle".to_string(),
2149            outcome: TaskOutcome::Completed {
2150                output: "stale output".to_string(),
2151                artifacts: vec![],
2152            },
2153        };
2154        scheduler.buffered_events.push_back(stale_event);
2155        let actions = scheduler.tick();
2156
2157        // Stale event must be discarded — task must NOT be completed.
2158        assert_ne!(
2159            scheduler.graph.tasks[0].status,
2160            TaskStatus::Completed,
2161            "stale event must not complete the task"
2162        );
2163        // No Spawn or Done actions should result from a discarded stale event.
2164        let has_done = actions
2165            .iter()
2166            .any(|a| matches!(a, SchedulerAction::Done { .. }));
2167        assert!(
2168            !has_done,
2169            "no Done action should be emitted for a stale event"
2170        );
2171        // Task must still be in the running map.
2172        assert!(
2173            scheduler.running.contains_key(&TaskId(0)),
2174            "running task must remain after stale event"
2175        );
2176    }
2177
2178    #[test]
2179    fn test_build_prompt_chars_count_in_truncation_message() {
2180        // Fix #3: truncation message must report char count, not byte count.
2181        // Use pure ASCII so sanitization doesn't significantly change char count.
2182        // Budget < output length => truncation triggered; verify the count label is "chars total".
2183        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2184        graph.tasks[0].status = TaskStatus::Completed;
2185        // ASCII output: byte count == char count, so both old and new code produce the same number,
2186        // but the label "chars total" (not "bytes total") is what matters here.
2187        let output = "x".repeat(200);
2188        graph.tasks[0].result = Some(TaskResult {
2189            output,
2190            artifacts: vec![],
2191            duration_ms: 10,
2192            agent_id: None,
2193            agent_def: None,
2194        });
2195
2196        let config = zeph_config::OrchestrationConfig {
2197            dependency_context_budget: 10, // truncate: sanitized output >> 10 chars
2198            ..make_config()
2199        };
2200        let scheduler = DagScheduler::new(
2201            graph,
2202            &config,
2203            Box::new(FirstRouter),
2204            vec![make_def("worker")],
2205        )
2206        .unwrap();
2207
2208        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
2209        // Truncation must have been triggered and the message must use "chars total" label.
2210        assert!(
2211            prompt.contains("chars total"),
2212            "truncation message must use 'chars total' label. Prompt: {prompt}"
2213        );
2214        assert!(
2215            prompt.contains("[truncated:"),
2216            "prompt must contain truncation notice. Prompt: {prompt}"
2217        );
2218    }
2219
2220    // --- resume_from tests (MT-1) ---
2221
2222    #[test]
2223    fn test_resume_from_accepts_paused_graph() {
2224        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2225        graph.status = GraphStatus::Paused;
2226        graph.tasks[0].status = TaskStatus::Pending;
2227
2228        let scheduler =
2229            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2230                .expect("resume_from should accept Paused graph");
2231        assert_eq!(scheduler.graph.status, GraphStatus::Running);
2232    }
2233
2234    #[test]
2235    fn test_resume_from_accepts_failed_graph() {
2236        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2237        graph.status = GraphStatus::Failed;
2238        graph.tasks[0].status = TaskStatus::Failed;
2239
2240        let scheduler =
2241            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2242                .expect("resume_from should accept Failed graph");
2243        assert_eq!(scheduler.graph.status, GraphStatus::Running);
2244    }
2245
2246    #[test]
2247    fn test_resume_from_rejects_completed_graph() {
2248        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2249        graph.status = GraphStatus::Completed;
2250
2251        let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2252            .unwrap_err();
2253        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
2254    }
2255
2256    #[test]
2257    fn test_resume_from_rejects_canceled_graph() {
2258        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2259        graph.status = GraphStatus::Canceled;
2260
2261        let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2262            .unwrap_err();
2263        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
2264    }
2265
2266    #[test]
2267    fn test_resume_from_reconstructs_running_tasks() {
2268        // IC1: tasks that were Running at pause time must appear in the running map.
2269        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2270        graph.status = GraphStatus::Paused;
2271        graph.tasks[0].status = TaskStatus::Running;
2272        graph.tasks[0].assigned_agent = Some("handle-abc".to_string());
2273        graph.tasks[0].agent_hint = Some("worker".to_string());
2274        graph.tasks[1].status = TaskStatus::Pending;
2275
2276        let scheduler =
2277            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2278                .expect("should succeed");
2279
2280        assert!(
2281            scheduler.running.contains_key(&TaskId(0)),
2282            "Running task must be reconstructed in the running map (IC1)"
2283        );
2284        assert_eq!(scheduler.running[&TaskId(0)].agent_handle_id, "handle-abc");
2285        assert!(
2286            !scheduler.running.contains_key(&TaskId(1)),
2287            "Pending task must not appear in running map"
2288        );
2289    }
2290
2291    #[test]
2292    fn test_resume_from_sets_status_running() {
2293        // II3: resume_from must set graph.status = Running regardless of input status.
2294        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2295        graph.status = GraphStatus::Paused;
2296
2297        let scheduler =
2298            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2299                .unwrap();
2300        assert_eq!(scheduler.graph.status, GraphStatus::Running);
2301    }
2302
2303    // --- #1619 regression tests: consecutive_spawn_failures + exponential backoff ---
2304
2305    #[test]
2306    fn test_consecutive_spawn_failures_increments_on_concurrency_limit() {
2307        // Each tick where all spawns hit ConcurrencyLimit must increment the counter
2308        // via record_batch_backoff(false, true).
2309        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2310        let mut scheduler = make_scheduler(graph);
2311        scheduler.graph.tasks[0].status = TaskStatus::Running;
2312
2313        assert_eq!(scheduler.consecutive_spawn_failures, 0, "starts at zero");
2314
2315        let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
2316        scheduler.record_spawn_failure(TaskId(0), &error);
2317        // record_spawn_failure no longer increments; batch_backoff does.
2318        scheduler.record_batch_backoff(false, true);
2319        assert_eq!(
2320            scheduler.consecutive_spawn_failures, 1,
2321            "first deferral tick: consecutive_spawn_failures must be 1"
2322        );
2323
2324        scheduler.graph.tasks[0].status = TaskStatus::Running;
2325        scheduler.record_spawn_failure(TaskId(0), &error);
2326        scheduler.record_batch_backoff(false, true);
2327        assert_eq!(
2328            scheduler.consecutive_spawn_failures, 2,
2329            "second deferral tick: consecutive_spawn_failures must be 2"
2330        );
2331
2332        scheduler.graph.tasks[0].status = TaskStatus::Running;
2333        scheduler.record_spawn_failure(TaskId(0), &error);
2334        scheduler.record_batch_backoff(false, true);
2335        assert_eq!(
2336            scheduler.consecutive_spawn_failures, 3,
2337            "third deferral tick: consecutive_spawn_failures must be 3"
2338        );
2339    }
2340
2341    #[test]
2342    fn test_consecutive_spawn_failures_resets_on_success() {
2343        // record_spawn() after deferrals must reset consecutive_spawn_failures to 0
2344        // (via record_spawn internal reset; record_batch_backoff(true, _) also resets).
2345        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2346        let mut scheduler = make_scheduler(graph);
2347        scheduler.graph.tasks[0].status = TaskStatus::Running;
2348
2349        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2350        scheduler.record_spawn_failure(TaskId(0), &error);
2351        scheduler.record_batch_backoff(false, true);
2352        scheduler.graph.tasks[0].status = TaskStatus::Running;
2353        scheduler.record_spawn_failure(TaskId(0), &error);
2354        scheduler.record_batch_backoff(false, true);
2355        assert_eq!(scheduler.consecutive_spawn_failures, 2);
2356
2357        // Successful spawn resets the counter directly in record_spawn.
2358        scheduler.record_spawn(TaskId(0), "handle-0".to_string(), "worker".to_string());
2359        assert_eq!(
2360            scheduler.consecutive_spawn_failures, 0,
2361            "record_spawn must reset consecutive_spawn_failures to 0"
2362        );
2363    }
2364
    #[tokio::test]
    async fn test_exponential_backoff_duration() {
        // Checks that the time spent inside `wait_event()` grows with
        // `consecutive_spawn_failures`, using a base `deferral_backoff_ms` of 50:
        //   - 0 failures  → at least 50ms (the base interval);
        //   - 3 failures  → at least 400ms (50 * 2^3);
        //   - 20 failures → at least 5000ms.
        //
        // Only lower bounds are asserted, so this test does not prove that the
        // 5000ms cap is actually enforced (no upper bound is checked).
        //
        // NOTE(review): unless the tokio clock is paused, this test waits roughly
        // 5.5s of wall-clock time in total. If tokio's `test-util` feature is
        // available, `#[tokio::test(start_paused = true)]` would presumably make it
        // instantaneous — confirm that `wait_event()` sleeps via `tokio::time`.
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = zeph_config::OrchestrationConfig {
            deferral_backoff_ms: 50,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        // consecutive_spawn_failures=0 → sleep ≈ 50ms (base).
        assert_eq!(scheduler.consecutive_spawn_failures, 0);
        let start = tokio::time::Instant::now();
        scheduler.wait_event().await;
        let elapsed0 = start.elapsed();
        assert!(
            elapsed0.as_millis() >= 50,
            "backoff with 0 deferrals must be >= base (50ms), got {}ms",
            elapsed0.as_millis()
        );

        // Simulate 3 consecutive deferrals: multiplier = 2^3 = 8 → 400ms, below the 5000ms cap.
        scheduler.consecutive_spawn_failures = 3;
        let start = tokio::time::Instant::now();
        scheduler.wait_event().await;
        let elapsed3 = start.elapsed();
        assert!(
            elapsed3.as_millis() >= 400,
            "backoff with 3 deferrals must be >= 400ms (50 * 8), got {}ms",
            elapsed3.as_millis()
        );

        // Simulate 20 consecutive deferrals: the uncapped value would far exceed 5000ms,
        // so the wait is expected to be the 5000ms cap (only the lower bound is checked).
        scheduler.consecutive_spawn_failures = 20;
        let start = tokio::time::Instant::now();
        scheduler.wait_event().await;
        let elapsed_capped = start.elapsed();
        assert!(
            elapsed_capped.as_millis() >= 5000,
            "backoff must be capped at 5000ms with high deferrals, got {}ms",
            elapsed_capped.as_millis()
        );
    }
2415
2416    // --- deferral_backoff regression test ---
2417
2418    #[tokio::test]
2419    async fn test_wait_event_sleeps_deferral_backoff_when_running_empty() {
2420        // Regression for issue #1519: wait_event must sleep deferral_backoff when
2421        // running is empty, preventing a busy spin-loop.
2422        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2423        let config = zeph_config::OrchestrationConfig {
2424            deferral_backoff_ms: 50,
2425            ..make_config()
2426        };
2427        let mut scheduler = DagScheduler::new(
2428            graph,
2429            &config,
2430            Box::new(FirstRouter),
2431            vec![make_def("worker")],
2432        )
2433        .unwrap();
2434
2435        // Do not start any tasks — running map stays empty.
2436        assert!(scheduler.running.is_empty());
2437
2438        let start = tokio::time::Instant::now();
2439        scheduler.wait_event().await;
2440        let elapsed = start.elapsed();
2441
2442        assert!(
2443            elapsed.as_millis() >= 50,
2444            "wait_event must sleep at least deferral_backoff (50ms) when running is empty, but only slept {}ms",
2445            elapsed.as_millis()
2446        );
2447    }
2448
2449    #[test]
2450    fn test_current_deferral_backoff_exponential_growth() {
2451        // Regression for issue #1618: backoff must grow exponentially with consecutive
2452        // spawn failures so the scheduler does not busy-spin at 250ms when saturated.
2453        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2454        let config = zeph_config::OrchestrationConfig {
2455            deferral_backoff_ms: 250,
2456            ..make_config()
2457        };
2458        let mut scheduler = DagScheduler::new(
2459            graph,
2460            &config,
2461            Box::new(FirstRouter),
2462            vec![make_def("worker")],
2463        )
2464        .unwrap();
2465
2466        assert_eq!(
2467            scheduler.current_deferral_backoff(),
2468            Duration::from_millis(250)
2469        );
2470
2471        scheduler.consecutive_spawn_failures = 1;
2472        assert_eq!(
2473            scheduler.current_deferral_backoff(),
2474            Duration::from_millis(500)
2475        );
2476
2477        scheduler.consecutive_spawn_failures = 2;
2478        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(1));
2479
2480        scheduler.consecutive_spawn_failures = 3;
2481        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(2));
2482
2483        scheduler.consecutive_spawn_failures = 4;
2484        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(4));
2485
2486        // Cap at 5 seconds.
2487        scheduler.consecutive_spawn_failures = 5;
2488        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2489
2490        scheduler.consecutive_spawn_failures = 100;
2491        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2492    }
2493
2494    #[test]
2495    fn test_record_spawn_resets_consecutive_failures() {
2496        // Regression for issue #1618: a successful spawn resets the backoff counter.
2497        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2498        let mut scheduler = DagScheduler::new(
2499            graph,
2500            &make_config(),
2501            Box::new(FirstRouter),
2502            vec![make_def("worker")],
2503        )
2504        .unwrap();
2505
2506        scheduler.consecutive_spawn_failures = 3;
2507        let task_id = TaskId(0);
2508        scheduler.graph.tasks[0].status = TaskStatus::Running;
2509        scheduler.record_spawn(task_id, "handle-1".into(), "worker".into());
2510
2511        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2512    }
2513
2514    #[test]
2515    fn test_record_spawn_failure_reverts_to_ready_no_counter_change() {
2516        // record_spawn_failure(ConcurrencyLimit) reverts task to Ready but does NOT
2517        // change consecutive_spawn_failures — that is the job of record_batch_backoff.
2518        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2519        let mut scheduler = DagScheduler::new(
2520            graph,
2521            &make_config(),
2522            Box::new(FirstRouter),
2523            vec![make_def("worker")],
2524        )
2525        .unwrap();
2526
2527        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2528        let task_id = TaskId(0);
2529        scheduler.graph.tasks[0].status = TaskStatus::Running;
2530
2531        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2532        scheduler.record_spawn_failure(task_id, &error);
2533
2534        // Counter unchanged — batch_backoff is responsible for incrementing.
2535        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2536        // Task reverted to Ready.
2537        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
2538    }
2539
2540    // --- #1628 parallel dispatch tests ---
2541
2542    #[test]
2543    fn test_parallel_dispatch_all_ready() {
2544        // tick() enforces max_parallel as a pre-dispatch cap. With 6 independent tasks
2545        // and max_parallel=2, only 2 tasks are dispatched per tick.
2546        let nodes: Vec<_> = (0..6).map(|i| make_node(i, &[])).collect();
2547        let graph = graph_from_nodes(nodes);
2548        let config = zeph_config::OrchestrationConfig {
2549            max_parallel: 2,
2550            ..make_config()
2551        };
2552        let mut scheduler = DagScheduler::new(
2553            graph,
2554            &config,
2555            Box::new(FirstRouter),
2556            vec![make_def("worker")],
2557        )
2558        .unwrap();
2559
2560        let actions = scheduler.tick();
2561        let spawn_count = actions
2562            .iter()
2563            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2564            .count();
2565        assert_eq!(
2566            spawn_count, 2,
2567            "only max_parallel=2 tasks dispatched per tick"
2568        );
2569
2570        let running_count = scheduler
2571            .graph
2572            .tasks
2573            .iter()
2574            .filter(|t| t.status == TaskStatus::Running)
2575            .count();
2576        assert_eq!(running_count, 2, "only 2 tasks marked Running");
2577    }
2578
2579    #[test]
2580    fn test_batch_backoff_partial_success() {
2581        // Some spawns succeed, some hit ConcurrencyLimit: counter resets to 0.
2582        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2583        let mut scheduler = make_scheduler(graph);
2584        scheduler.consecutive_spawn_failures = 3;
2585
2586        scheduler.record_batch_backoff(true, true);
2587        assert_eq!(
2588            scheduler.consecutive_spawn_failures, 0,
2589            "any success in batch must reset counter"
2590        );
2591    }
2592
2593    #[test]
2594    fn test_batch_backoff_all_failed() {
2595        // All spawns hit ConcurrencyLimit: counter increments by 1.
2596        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2597        let mut scheduler = make_scheduler(graph);
2598        scheduler.consecutive_spawn_failures = 2;
2599
2600        scheduler.record_batch_backoff(false, true);
2601        assert_eq!(
2602            scheduler.consecutive_spawn_failures, 3,
2603            "all-failure tick must increment counter"
2604        );
2605    }
2606
2607    #[test]
2608    fn test_batch_backoff_no_spawns() {
2609        // No spawn actions in tick: counter unchanged.
2610        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2611        let mut scheduler = make_scheduler(graph);
2612        scheduler.consecutive_spawn_failures = 5;
2613
2614        scheduler.record_batch_backoff(false, false);
2615        assert_eq!(
2616            scheduler.consecutive_spawn_failures, 5,
2617            "no spawns must not change counter"
2618        );
2619    }
2620
2621    #[test]
2622    fn test_buffer_guard_uses_task_count() {
2623        // Structural guard: verifies that the buffer capacity expression uses
2624        // graph.tasks.len() * 2 rather than max_parallel * 2. This is an intentional
2625        // regression-prevention test — if wait_event() is accidentally reverted to
2626        // max_parallel * 2 the assertion below catches the discrepancy.
2627        // Behavioral coverage (actual buffer drop prevention) requires an async harness
2628        // with a real channel, which is outside the scope of this unit test.
2629        //
2630        // Scenario: 10 tasks, max_parallel=2 → tasks.len()*2=20, max_parallel*2=4.
2631        // The guard must use 20, not 4.
2632        let nodes: Vec<_> = (0..10).map(|i| make_node(i, &[])).collect();
2633        let graph = graph_from_nodes(nodes);
2634        let config = zeph_config::OrchestrationConfig {
2635            max_parallel: 2, // 2*2=4, but tasks.len()*2=20
2636            ..make_config()
2637        };
2638        let scheduler = DagScheduler::new(
2639            graph,
2640            &config,
2641            Box::new(FirstRouter),
2642            vec![make_def("worker")],
2643        )
2644        .unwrap();
2645        // Confirm: tasks.len() * 2 = 20, max_parallel * 2 = 4.
2646        assert_eq!(scheduler.graph.tasks.len() * 2, 20);
2647        assert_eq!(scheduler.max_parallel * 2, 4);
2648    }
2649
2650    #[test]
2651    fn test_batch_mixed_concurrency_and_fatal_failure() {
2652        // Mixed batch: task 0 gets ConcurrencyLimit (transient), task 1 gets a
2653        // non-transient Spawn error (fatal). Two independent tasks, no deps between them.
2654        // Verify:
2655        // - task 0 reverts to Ready (retried next tick)
2656        // - task 1 is marked Failed; with FailureStrategy::Skip the graph stays Running
2657        //   because task 1 has no dependents that would abort the graph
2658        // - record_batch_backoff(false, true) increments counter by 1
2659        let mut nodes = vec![make_node(0, &[]), make_node(1, &[])];
2660        // FailureStrategy::Skip: task 1 fails but its absence is ignored.
2661        nodes[1].failure_strategy = Some(FailureStrategy::Skip);
2662        let graph = graph_from_nodes(nodes);
2663        let mut scheduler = make_scheduler(graph);
2664
2665        // Optimistically mark both as Running (as tick() would do).
2666        scheduler.graph.tasks[0].status = TaskStatus::Running;
2667        scheduler.graph.tasks[1].status = TaskStatus::Running;
2668
2669        // Task 0: ConcurrencyLimit (transient).
2670        let concurrency_err = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2671        let actions0 = scheduler.record_spawn_failure(TaskId(0), &concurrency_err);
2672        assert!(
2673            actions0.is_empty(),
2674            "ConcurrencyLimit must produce no extra actions"
2675        );
2676        assert_eq!(
2677            scheduler.graph.tasks[0].status,
2678            TaskStatus::Ready,
2679            "task 0 must revert to Ready"
2680        );
2681
2682        // Task 1: non-transient Spawn failure. record_spawn_failure marks it Failed,
2683        // then propagate_failure applies FailureStrategy::Skip → status becomes Skipped.
2684        let fatal_err = SubAgentError::Spawn("provider unavailable".to_string());
2685        let actions1 = scheduler.record_spawn_failure(TaskId(1), &fatal_err);
2686        assert_eq!(
2687            scheduler.graph.tasks[1].status,
2688            TaskStatus::Skipped,
2689            "task 1: Skip strategy turns Failed into Skipped via propagate_failure"
2690        );
2691        // No Done action from record_spawn_failure — graph still has task 0 alive.
2692        assert!(
2693            actions1
2694                .iter()
2695                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
2696            "no Done action expected: task 0 is still Ready"
2697        );
2698
2699        // Batch result: no success, one ConcurrencyLimit failure.
2700        scheduler.consecutive_spawn_failures = 0;
2701        scheduler.record_batch_backoff(false, true);
2702        assert_eq!(
2703            scheduler.consecutive_spawn_failures, 1,
2704            "batch with only ConcurrencyLimit must increment counter"
2705        );
2706    }
2707
2708    /// Regression for #1879: when the scheduler detects a deadlock (no running or ready tasks,
2709    /// but the graph is not complete), all non-terminal tasks must be marked Canceled, not left
2710    /// in their previous status (e.g. Pending).
2711    #[test]
2712    fn test_deadlock_marks_non_terminal_tasks_canceled() {
2713        // Build a graph in Failed status (as if a prior retry pass left task 0 failed and
2714        // task 1/2 still Pending). resume_from() transitions it to Running without resetting
2715        // task statuses, so tick() immediately sees no running, no ready, not all terminal —
2716        // triggering the deadlock branch.
2717        let mut nodes = vec![make_node(0, &[]), make_node(1, &[0]), make_node(2, &[0])];
2718        nodes[0].status = TaskStatus::Failed;
2719        nodes[1].status = TaskStatus::Pending;
2720        nodes[2].status = TaskStatus::Pending;
2721
2722        let mut graph = graph_from_nodes(nodes);
2723        graph.status = GraphStatus::Failed;
2724
2725        let mut scheduler = DagScheduler::resume_from(
2726            graph,
2727            &make_config(),
2728            Box::new(FirstRouter),
2729            vec![make_def("worker")],
2730        )
2731        .unwrap();
2732
2733        // After resume_from, graph is Running but no tasks are Ready/Running — deadlock.
2734        let actions = scheduler.tick();
2735
2736        // Must emit Done(Failed).
2737        assert!(
2738            actions.iter().any(|a| matches!(
2739                a,
2740                SchedulerAction::Done {
2741                    status: GraphStatus::Failed
2742                }
2743            )),
2744            "deadlock must emit Done(Failed); got: {actions:?}"
2745        );
2746        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
2747
2748        // task 0 was already Failed (terminal) — must remain unchanged.
2749        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
2750        // task 1 was Pending (non-terminal) — must be Canceled.
2751        assert_eq!(
2752            scheduler.graph.tasks[1].status,
2753            TaskStatus::Canceled,
2754            "Pending task must be Canceled on deadlock"
2755        );
2756        // task 2 was Pending (non-terminal) — must be Canceled.
2757        assert_eq!(
2758            scheduler.graph.tasks[2].status,
2759            TaskStatus::Canceled,
2760            "Pending task must be Canceled on deadlock"
2761        );
2762    }
2763
2764    /// Regression for #1879: deadlock with one task Running should NOT trigger the deadlock
2765    /// branch (`running_in_graph_now` > 0 suppresses the check).
2766    #[test]
2767    fn test_deadlock_not_triggered_when_task_running() {
2768        // Graph in Failed with one task still marked Running — resume_from reconstructs
2769        // the running map. tick() sees running_in_graph_now > 0 and skips deadlock check.
2770        let mut nodes = vec![make_node(0, &[]), make_node(1, &[0])];
2771        nodes[0].status = TaskStatus::Running;
2772        nodes[0].assigned_agent = Some("handle-1".into());
2773        nodes[1].status = TaskStatus::Pending;
2774
2775        let mut graph = graph_from_nodes(nodes);
2776        graph.status = GraphStatus::Failed;
2777
2778        let mut scheduler = DagScheduler::resume_from(
2779            graph,
2780            &make_config(),
2781            Box::new(FirstRouter),
2782            vec![make_def("worker")],
2783        )
2784        .unwrap();
2785
2786        let actions = scheduler.tick();
2787
2788        // Running task in graph — no deadlock triggered.
2789        assert!(
2790            actions
2791                .iter()
2792                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
2793            "no Done action expected when a task is running; got: {actions:?}"
2794        );
2795        assert_eq!(scheduler.graph.status, GraphStatus::Running);
2796    }
2797
2798    // --- topology_selection tests ---
2799
2800    #[test]
2801    fn topology_linear_chain_limits_parallelism_to_one() {
2802        // LinearChain topology with topology_selection=true → max_parallel overridden to 1.
2803        // tick() must dispatch exactly 1 task even though 1 root task is ready.
2804        let graph = graph_from_nodes(vec![
2805            make_node(0, &[]),
2806            make_node(1, &[0]),
2807            make_node(2, &[1]),
2808        ]);
2809        let config = zeph_config::OrchestrationConfig {
2810            topology_selection: true,
2811            max_parallel: 4,
2812            ..make_config()
2813        };
2814        let mut scheduler = DagScheduler::new(
2815            graph,
2816            &config,
2817            Box::new(FirstRouter),
2818            vec![make_def("worker")],
2819        )
2820        .unwrap();
2821
2822        assert_eq!(
2823            scheduler.topology().topology,
2824            crate::topology::Topology::LinearChain
2825        );
2826        assert_eq!(scheduler.max_parallel, 1);
2827
2828        let actions = scheduler.tick();
2829        let spawn_count = actions
2830            .iter()
2831            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2832            .count();
2833        assert_eq!(spawn_count, 1, "linear chain: only 1 task dispatched");
2834    }
2835
2836    #[test]
2837    fn topology_all_parallel_dispatches_all_ready() {
2838        // AllParallel topology with topology_selection=true → max_parallel unchanged.
2839        // tick() dispatches all 4 independent tasks in one go.
2840        let graph = graph_from_nodes(vec![
2841            make_node(0, &[]),
2842            make_node(1, &[]),
2843            make_node(2, &[]),
2844            make_node(3, &[]),
2845        ]);
2846        let config = zeph_config::OrchestrationConfig {
2847            topology_selection: true,
2848            max_parallel: 4,
2849            ..make_config()
2850        };
2851        let mut scheduler = DagScheduler::new(
2852            graph,
2853            &config,
2854            Box::new(FirstRouter),
2855            vec![make_def("worker")],
2856        )
2857        .unwrap();
2858
2859        assert_eq!(
2860            scheduler.topology().topology,
2861            crate::topology::Topology::AllParallel
2862        );
2863
2864        let actions = scheduler.tick();
2865        let spawn_count = actions
2866            .iter()
2867            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2868            .count();
2869        assert_eq!(spawn_count, 4, "all-parallel: all 4 tasks dispatched");
2870    }
2871
2872    #[test]
2873    fn sequential_dispatch_one_at_a_time_parallel_unblocked() {
2874        // Three ready tasks: A(sequential), B(sequential), C(parallel).
2875        // tick() must dispatch A + C, hold B (another sequential already scheduled this tick).
2876        use crate::graph::ExecutionMode;
2877
2878        let mut a = make_node(0, &[]);
2879        a.execution_mode = ExecutionMode::Sequential;
2880        let mut b = make_node(1, &[]);
2881        b.execution_mode = ExecutionMode::Sequential;
2882        let mut c = make_node(2, &[]);
2883        c.execution_mode = ExecutionMode::Parallel;
2884
2885        let graph = graph_from_nodes(vec![a, b, c]);
2886        let config = zeph_config::OrchestrationConfig {
2887            max_parallel: 4,
2888            ..make_config()
2889        };
2890        let mut scheduler = DagScheduler::new(
2891            graph,
2892            &config,
2893            Box::new(FirstRouter),
2894            vec![make_def("worker")],
2895        )
2896        .unwrap();
2897
2898        let actions = scheduler.tick();
2899        let spawned: Vec<TaskId> = actions
2900            .iter()
2901            .filter_map(|a| {
2902                if let SchedulerAction::Spawn { task_id, .. } = a {
2903                    Some(*task_id)
2904                } else {
2905                    None
2906                }
2907            })
2908            .collect();
2909
2910        // A (seq, idx=0) and C (par, idx=2) dispatched; B (seq, idx=1) held.
2911        assert!(
2912            spawned.contains(&TaskId(0)),
2913            "A(sequential) must be dispatched"
2914        );
2915        assert!(
2916            spawned.contains(&TaskId(2)),
2917            "C(parallel) must be dispatched"
2918        );
2919        assert!(!spawned.contains(&TaskId(1)), "B(sequential) must be held");
2920        assert_eq!(spawned.len(), 2);
2921    }
2922
2923    // --- inject_tasks replan cap tests (#2241) ---
2924
2925    #[test]
2926    fn test_inject_tasks_per_task_cap_skips_second() {
2927        // Per-task cap: 1 replan per task. Second inject for same task_id is a silent no-op.
2928        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2929        let mut scheduler = make_scheduler(graph);
2930
2931        let first = make_node(2, &[]);
2932        scheduler.inject_tasks(TaskId(0), vec![first], 20).unwrap();
2933        assert_eq!(
2934            scheduler.graph.tasks.len(),
2935            3,
2936            "first inject must append the task"
2937        );
2938        assert_eq!(scheduler.global_replan_count, 1);
2939
2940        // Second inject for the same verified task — per-task count is already 1.
2941        let second = make_node(3, &[]);
2942        scheduler.inject_tasks(TaskId(0), vec![second], 20).unwrap();
2943        assert_eq!(
2944            scheduler.graph.tasks.len(),
2945            3,
2946            "second inject must be silently skipped (per-task cap)"
2947        );
2948        assert_eq!(
2949            scheduler.global_replan_count, 1,
2950            "global counter must not increment on skipped inject"
2951        );
2952    }
2953
2954    #[test]
2955    fn test_inject_tasks_global_cap_skips_when_exhausted() {
2956        // Global cap: max_replans=1. First inject consumes the budget; second is a no-op.
2957        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2958        let mut config = make_config();
2959        config.max_replans = 1;
2960        let defs = vec![make_def("worker")];
2961        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
2962
2963        let new1 = make_node(2, &[]);
2964        scheduler.inject_tasks(TaskId(0), vec![new1], 20).unwrap();
2965        assert_eq!(scheduler.global_replan_count, 1);
2966
2967        // Second inject for a different task — global cap exhausted.
2968        let new2 = make_node(3, &[]);
2969        scheduler.inject_tasks(TaskId(1), vec![new2], 20).unwrap();
2970        assert_eq!(
2971            scheduler.graph.tasks.len(),
2972            3,
2973            "global cap must prevent the second inject"
2974        );
2975        assert_eq!(
2976            scheduler.global_replan_count, 1,
2977            "global counter must not increment past cap"
2978        );
2979    }
2980
2981    #[test]
2982    fn test_inject_tasks_sets_topology_dirty() {
2983        // inject_tasks must set topology_dirty; tick() must clear it after re-analysis.
2984        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2985        let mut scheduler = make_scheduler(graph);
2986        assert!(
2987            !scheduler.topology_dirty,
2988            "topology_dirty must be false initially"
2989        );
2990
2991        let new_task = make_node(1, &[]);
2992        scheduler
2993            .inject_tasks(TaskId(0), vec![new_task], 20)
2994            .unwrap();
2995        assert!(
2996            scheduler.topology_dirty,
2997            "inject_tasks must set topology_dirty=true"
2998        );
2999
3000        scheduler.tick();
3001        assert!(
3002            !scheduler.topology_dirty,
3003            "tick() must clear topology_dirty after re-analysis"
3004        );
3005    }
3006
3007    #[test]
3008    fn test_inject_tasks_rejects_cycle() {
3009        // Injecting a task that introduces a cycle must return VerificationFailed.
3010        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3011        let mut scheduler = make_scheduler(graph);
3012
3013        // New task ID=1 with a self-reference (depends on itself) → cycle.
3014        let cyclic_task = make_node(1, &[1]);
3015        let result = scheduler.inject_tasks(TaskId(0), vec![cyclic_task], 20);
3016        assert!(result.is_err(), "cyclic injection must return an error");
3017        assert!(
3018            matches!(
3019                result.unwrap_err(),
3020                OrchestrationError::VerificationFailed(_)
3021            ),
3022            "must return VerificationFailed for cycle"
3023        );
3024        // Global and per-task counters must not be incremented on error.
3025        assert_eq!(scheduler.global_replan_count, 0);
3026        assert!(
3027            !scheduler.topology_dirty,
3028            "topology_dirty must not be set when inject fails"
3029        );
3030    }
3031
3032    // --- LevelBarrier dispatch tests (#2242) ---
3033
3034    fn make_hierarchical_config() -> zeph_config::OrchestrationConfig {
3035        zeph_config::OrchestrationConfig {
3036            topology_selection: true,
3037            max_parallel: 4,
3038            ..make_config()
3039        }
3040    }
3041
3042    /// A(0)→{B(1),C(2)}, B(1)→D(3). Hierarchical topology, depths: A=0, B=1, C=1, D=2.
3043    fn make_hierarchical_graph() -> TaskGraph {
3044        graph_from_nodes(vec![
3045            make_node(0, &[]),
3046            make_node(1, &[0]),
3047            make_node(2, &[0]),
3048            make_node(3, &[1]),
3049        ])
3050    }
3051
3052    #[test]
3053    fn test_level_barrier_advances_on_terminal_level() {
3054        // When all tasks at current_level are terminal, tick() advances current_level
3055        // and dispatches tasks at the next non-terminal level.
3056        let graph = make_hierarchical_graph();
3057        let config = make_hierarchical_config();
3058        let defs = vec![make_def("worker")];
3059        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
3060
3061        assert_eq!(
3062            scheduler.topology().strategy,
3063            crate::topology::DispatchStrategy::LevelBarrier,
3064            "must use LevelBarrier strategy for Hierarchical graph"
3065        );
3066        assert_eq!(scheduler.current_level, 0);
3067
3068        // First tick: only A(0) at level 0 is dispatched.
3069        let actions = scheduler.tick();
3070        let spawned_ids: Vec<_> = actions
3071            .iter()
3072            .filter_map(|a| {
3073                if let SchedulerAction::Spawn { task_id, .. } = a {
3074                    Some(*task_id)
3075                } else {
3076                    None
3077                }
3078            })
3079            .collect();
3080        assert_eq!(
3081            spawned_ids,
3082            vec![TaskId(0)],
3083            "first tick must dispatch only A at level 0"
3084        );
3085
3086        // Simulate A completing: mark Completed, mark B and C Ready (deps satisfied).
3087        scheduler.graph.tasks[0].status = TaskStatus::Completed;
3088        scheduler.running.clear();
3089        scheduler.graph.tasks[1].status = TaskStatus::Ready;
3090        scheduler.graph.tasks[2].status = TaskStatus::Ready;
3091
3092        // Second tick: A is terminal → level advances to 1 → B and C dispatched.
3093        let actions2 = scheduler.tick();
3094        assert_eq!(
3095            scheduler.current_level, 1,
3096            "current_level must advance to 1 after level-0 tasks terminate"
3097        );
3098        let spawned2: Vec<_> = actions2
3099            .iter()
3100            .filter_map(|a| {
3101                if let SchedulerAction::Spawn { task_id, .. } = a {
3102                    Some(*task_id)
3103                } else {
3104                    None
3105                }
3106            })
3107            .collect();
3108        assert!(
3109            spawned2.contains(&TaskId(1)),
3110            "B must be dispatched after level advance"
3111        );
3112        assert!(
3113            spawned2.contains(&TaskId(2)),
3114            "C must be dispatched after level advance"
3115        );
3116    }
3117
3118    #[test]
3119    fn test_level_barrier_failure_propagates_transitively() {
3120        // When A fails with Skip strategy, propagate_failure() BFS-marks all
3121        // descendants (B, C, D) as Skipped. tick() must then advance past level 0.
3122        let graph = make_hierarchical_graph();
3123        let config = make_hierarchical_config();
3124        let defs = vec![make_def("worker")];
3125        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
3126
3127        // Set A to Skip failure strategy and simulate it running.
3128        scheduler.graph.tasks[0].failure_strategy = Some(crate::graph::FailureStrategy::Skip);
3129        scheduler.graph.tasks[0].status = TaskStatus::Running;
3130        scheduler.running.insert(
3131            TaskId(0),
3132            RunningTask {
3133                agent_handle_id: "h0".to_string(),
3134                agent_def_name: "worker".to_string(),
3135                started_at: Instant::now(),
3136            },
3137        );
3138
3139        // Push a failure event for A.
3140        scheduler.buffered_events.push_back(TaskEvent {
3141            task_id: TaskId(0),
3142            agent_handle_id: "h0".to_string(),
3143            outcome: TaskOutcome::Failed {
3144                error: "simulated failure".to_string(),
3145            },
3146        });
3147
3148        scheduler.tick();
3149
3150        // A failed with Skip → A=Skipped. B, C, D must be transitively Skipped.
3151        assert_eq!(
3152            scheduler.graph.tasks[0].status,
3153            TaskStatus::Skipped,
3154            "A must be Skipped (Skip strategy)"
3155        );
3156        assert_eq!(
3157            scheduler.graph.tasks[1].status,
3158            TaskStatus::Skipped,
3159            "B must be transitively Skipped"
3160        );
3161        assert_eq!(
3162            scheduler.graph.tasks[2].status,
3163            TaskStatus::Skipped,
3164            "C must be transitively Skipped"
3165        );
3166        assert_eq!(
3167            scheduler.graph.tasks[3].status,
3168            TaskStatus::Skipped,
3169            "D must be transitively Skipped"
3170        );
3171    }
3172
3173    #[test]
3174    fn test_level_barrier_current_level_reset_after_inject() {
3175        // inject_tasks() adding a task at depth < current_level must cause tick() to
3176        // reset current_level downward via the .min() guard (critic C2 / issue #2242).
3177        let graph = make_hierarchical_graph(); // A(0)→{B(1),C(2)}, B(1)→D(3)
3178        let config = make_hierarchical_config();
3179        let defs = vec![make_def("worker")];
3180        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
3181
3182        // Manually mark A, B, C as Completed (simulate levels 0 and 1 done).
3183        scheduler.graph.tasks[0].status = TaskStatus::Completed; // A depth 0
3184        scheduler.graph.tasks[1].status = TaskStatus::Completed; // B depth 1
3185        scheduler.graph.tasks[2].status = TaskStatus::Completed; // C depth 1
3186        // D(3) is Pending at depth 2. Manually set current_level to 2.
3187        scheduler.current_level = 2;
3188
3189        // Inject E(4) depending on A(0) (Completed) → E will be at depth 1 after re-analysis.
3190        // This is shallower than current_level=2 → tick() must reset current_level to 1.
3191        let e = make_node(4, &[0]);
3192        scheduler.inject_tasks(TaskId(3), vec![e], 20).unwrap();
3193        assert!(scheduler.topology_dirty);
3194
3195        // tick() re-analyzes topology (E at depth 1, D at depth 2).
3196        // min_non_terminal_depth = 1 (E is Ready). current_level = min(2, 1) = 1.
3197        scheduler.tick();
3198        assert_eq!(
3199            scheduler.current_level, 1,
3200            "current_level must reset to min non-terminal depth (1) after inject at depth 1"
3201        );
3202    }
3203
3204    #[test]
3205    fn resume_from_preserves_topology_classification() {
3206        // resume_from() must also apply topology classification (fix H3).
3207        let mut graph = graph_from_nodes(vec![
3208            make_node(0, &[]),
3209            make_node(1, &[0]),
3210            make_node(2, &[1]),
3211        ]);
3212        // Put graph in Paused state so resume_from accepts it.
3213        graph.status = GraphStatus::Paused;
3214        graph.tasks[0].status = TaskStatus::Completed;
3215        graph.tasks[1].status = TaskStatus::Pending;
3216        graph.tasks[2].status = TaskStatus::Pending;
3217
3218        let config = zeph_config::OrchestrationConfig {
3219            topology_selection: true,
3220            max_parallel: 4,
3221            ..make_config()
3222        };
3223        let scheduler = DagScheduler::resume_from(
3224            graph,
3225            &config,
3226            Box::new(FirstRouter),
3227            vec![make_def("worker")],
3228        )
3229        .unwrap();
3230
3231        assert_eq!(
3232            scheduler.topology().topology,
3233            crate::topology::Topology::LinearChain,
3234            "resume_from must classify topology"
3235        );
3236        assert_eq!(
3237            scheduler.max_parallel, 1,
3238            "resume_from must apply topology limit"
3239        );
3240    }
3241
3242    // --- #2238: validate_verify_config tests ---
3243
3244    fn make_verify_config(provider: &str) -> zeph_config::OrchestrationConfig {
3245        zeph_config::OrchestrationConfig {
3246            verify_completeness: true,
3247            verify_provider: provider.to_string(),
3248            ..make_config()
3249        }
3250    }
3251
3252    #[test]
3253    fn validate_verify_config_unknown_provider_returns_err() {
3254        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3255        let config = make_verify_config("nonexistent");
3256        let scheduler = DagScheduler::new(
3257            graph,
3258            &config,
3259            Box::new(FirstRouter),
3260            vec![make_def("worker")],
3261        )
3262        .unwrap();
3263        let result = scheduler.validate_verify_config(&["fast", "quality"]);
3264        assert!(result.is_err());
3265        let err_msg = result.unwrap_err().to_string();
3266        assert!(err_msg.contains("nonexistent"));
3267        assert!(err_msg.contains("fast"));
3268    }
3269
3270    #[test]
3271    fn validate_verify_config_known_provider_returns_ok() {
3272        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3273        let config = make_verify_config("fast");
3274        let scheduler = DagScheduler::new(
3275            graph,
3276            &config,
3277            Box::new(FirstRouter),
3278            vec![make_def("worker")],
3279        )
3280        .unwrap();
3281        assert!(
3282            scheduler
3283                .validate_verify_config(&["fast", "quality"])
3284                .is_ok()
3285        );
3286    }
3287
3288    #[test]
3289    fn validate_verify_config_empty_provider_always_ok() {
3290        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3291        let config = make_verify_config("");
3292        let scheduler = DagScheduler::new(
3293            graph,
3294            &config,
3295            Box::new(FirstRouter),
3296            vec![make_def("worker")],
3297        )
3298        .unwrap();
3299        assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
3300    }
3301
3302    #[test]
3303    fn validate_verify_config_disabled_skips_validation() {
3304        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3305        // verify_completeness = false: no validation even with bogus provider name
3306        let scheduler = make_scheduler(graph);
3307        assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
3308    }
3309
3310    #[test]
3311    fn validate_verify_config_empty_pool_skips_validation() {
3312        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3313        let config = make_verify_config("nonexistent");
3314        let scheduler = DagScheduler::new(
3315            graph,
3316            &config,
3317            Box::new(FirstRouter),
3318            vec![make_def("worker")],
3319        )
3320        .unwrap();
3321        // Empty provider_names slice = unknown provider set, skip validation.
3322        assert!(scheduler.validate_verify_config(&[]).is_ok());
3323    }
3324
3325    #[test]
3326    fn validate_verify_config_trims_whitespace_in_config() {
3327        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3328        // verify_provider with surrounding whitespace in config is trimmed at construction.
3329        let config = make_verify_config("  fast  ");
3330        let scheduler = DagScheduler::new(
3331            graph,
3332            &config,
3333            Box::new(FirstRouter),
3334            vec![make_def("worker")],
3335        )
3336        .unwrap();
3337        assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
3338    }
3339
3340    // --- #2237 regression tests: max_parallel drift across replan cycles ---
3341
3342    #[test]
3343    fn config_max_parallel_initialized_from_config() {
3344        // config_max_parallel must always equal config.max_parallel, regardless
3345        // of whether topology analysis reduces max_parallel for the initial topology.
3346        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
3347        let config = zeph_config::OrchestrationConfig {
3348            topology_selection: true,
3349            max_parallel: 6,
3350            ..make_config()
3351        };
3352        let scheduler = DagScheduler::new(
3353            graph,
3354            &config,
3355            Box::new(FirstRouter),
3356            vec![make_def("worker")],
3357        )
3358        .unwrap();
3359
3360        assert_eq!(
3361            scheduler.config_max_parallel, 6,
3362            "config_max_parallel must equal config.max_parallel"
3363        );
3364        // LinearChain reduces max_parallel to 1, but config_max_parallel stays at 6.
3365        assert_eq!(
3366            scheduler.max_parallel, 1,
3367            "max_parallel reduced by topology analysis"
3368        );
3369        assert_eq!(
3370            scheduler.config_max_parallel, 6,
3371            "config_max_parallel must not be reduced by topology"
3372        );
3373    }
3374
3375    #[test]
3376    fn max_parallel_does_not_drift_across_inject_tick_cycles() {
3377        // Regression for #2237: successive inject_tasks+tick cycles with a Mixed graph
3378        // must not reduce max_parallel below compute_max_parallel(Mixed, config_max_parallel).
3379        //
3380        // Before the fix, the tick() dirty path used self.max_parallel as the base for
3381        // compute_max_parallel, so each replan cycle reduced it further:
3382        //   cycle 1: max_parallel = (4/2+1)      = 3
3383        //   cycle 2: max_parallel = (3/2+1)      = 2  ← drift!
3384        //   cycle 3: max_parallel = (2/2+1)      = 2
3385        //
3386        // After the fix, config_max_parallel=4 is always used as the base:
3387        //   all cycles: max_parallel = (4/2+1)   = 3  ← stable
3388        let graph = graph_from_nodes(vec![
3389            make_node(0, &[]),
3390            make_node(1, &[0]),
3391            make_node(2, &[0]),
3392            make_node(3, &[1, 2]), // diamond → Mixed
3393        ]);
3394        let config = zeph_config::OrchestrationConfig {
3395            topology_selection: true,
3396            max_parallel: 4,
3397            max_tasks: 50,
3398            ..make_config()
3399        };
3400        let mut scheduler = DagScheduler::new(
3401            graph,
3402            &config,
3403            Box::new(FirstRouter),
3404            vec![make_def("worker")],
3405        )
3406        .unwrap();
3407
3408        // Initial analysis: Mixed topology → max_parallel = (4/2+1) = 3.
3409        assert_eq!(
3410            scheduler.topology().topology,
3411            crate::topology::Topology::Mixed,
3412            "initial topology must be Mixed"
3413        );
3414        let expected_max_parallel = (4usize / 2 + 1).clamp(1, 4); // = 3
3415        assert_eq!(scheduler.max_parallel, expected_max_parallel);
3416
3417        // Simulate inject_tasks (which sets topology_dirty=true) followed by tick().
3418        // The injected task depends on task 3 to keep the graph Mixed.
3419        let extra_task_id = 4u32;
3420        let extra_task = {
3421            let mut n = crate::graph::TaskNode::new(
3422                extra_task_id,
3423                "extra".to_string(),
3424                "extra task injected by replan",
3425            );
3426            n.depends_on = vec![TaskId(3)];
3427            n
3428        };
3429
3430        // inject_tasks requires the verified task to be Completed.
3431        scheduler.graph.tasks[3].status = TaskStatus::Completed;
3432
3433        scheduler
3434            .inject_tasks(TaskId(3), vec![extra_task], 50)
3435            .expect("inject must succeed");
3436        assert!(
3437            scheduler.topology_dirty,
3438            "topology_dirty must be true after inject"
3439        );
3440
3441        // First tick() after inject: re-analyzes topology. Must use config_max_parallel as base.
3442        let _ = scheduler.tick();
3443        let max_after_first_inject = scheduler.max_parallel;
3444        assert_eq!(
3445            max_after_first_inject, expected_max_parallel,
3446            "max_parallel must not drift after first inject+tick"
3447        );
3448
3449        // Second inject+tick cycle: max_parallel must still equal the original computed value.
3450        let extra_task2 = {
3451            let mut n = crate::graph::TaskNode::new(5u32, "extra2".to_string(), "second replan");
3452            n.depends_on = vec![TaskId(extra_task_id)];
3453            n
3454        };
3455        scheduler.graph.tasks[extra_task_id as usize].status = TaskStatus::Completed;
3456        // Reset to created-like state to allow a second inject (per-task limit is 1,
3457        // so use a fresh task ID for the verified source).
3458        scheduler
3459            .inject_tasks(TaskId(extra_task_id), vec![extra_task2], 50)
3460            .expect("second inject must succeed");
3461
3462        let _ = scheduler.tick();
3463        let max_after_second_inject = scheduler.max_parallel;
3464        assert_eq!(
3465            max_after_second_inject, expected_max_parallel,
3466            "max_parallel must not drift after second inject+tick (was: {max_after_second_inject}, expected: {expected_max_parallel})"
3467        );
3468    }
3469
3470    // --- VMAO adaptive replanning accessor tests ---
3471
3472    #[test]
3473    fn completeness_threshold_returns_config_value() {
3474        let mut config = make_config();
3475        config.completeness_threshold = 0.85;
3476        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3477        let scheduler =
3478            DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![make_def("w")]).unwrap();
3479        assert!((scheduler.completeness_threshold() - 0.85).abs() < f32::EPSILON);
3480    }
3481
3482    #[test]
3483    fn completeness_threshold_default_is_0_7() {
3484        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3485        let scheduler = make_scheduler(graph);
3486        assert!((scheduler.completeness_threshold() - 0.7).abs() < f32::EPSILON);
3487    }
3488
3489    #[test]
3490    fn verify_provider_name_returns_config_value() {
3491        let mut config = make_config();
3492        config.verify_provider = "fast".to_string();
3493        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3494        let scheduler =
3495            DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![make_def("w")]).unwrap();
3496        assert_eq!(scheduler.verify_provider_name(), "fast");
3497    }
3498
3499    #[test]
3500    fn verify_provider_name_empty_when_not_set() {
3501        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3502        let scheduler = make_scheduler(graph);
3503        assert_eq!(scheduler.verify_provider_name(), "");
3504    }
3505
3506    #[test]
3507    fn max_replans_remaining_initial_equals_max_replans() {
3508        let mut config = make_config();
3509        config.max_replans = 3;
3510        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3511        let scheduler =
3512            DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![make_def("w")]).unwrap();
3513        assert_eq!(scheduler.max_replans_remaining(), 3);
3514    }
3515
3516    #[test]
3517    fn max_replans_remaining_decrements_after_record() {
3518        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3519        let mut scheduler = make_scheduler(graph);
3520        assert_eq!(scheduler.max_replans_remaining(), 2);
3521        scheduler.record_whole_plan_replan();
3522        assert_eq!(scheduler.max_replans_remaining(), 1);
3523        scheduler.record_whole_plan_replan();
3524        assert_eq!(scheduler.max_replans_remaining(), 0);
3525        // saturating: stays at 0
3526        scheduler.record_whole_plan_replan();
3527        assert_eq!(scheduler.max_replans_remaining(), 0);
3528    }
3529
3530    #[test]
3531    fn record_whole_plan_replan_does_not_modify_graph() {
3532        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3533        let mut scheduler = make_scheduler(graph);
3534        let task_count_before = scheduler.graph().tasks.len();
3535        scheduler.record_whole_plan_replan();
3536        assert_eq!(
3537            scheduler.graph().tasks.len(),
3538            task_count_before,
3539            "record_whole_plan_replan must not modify the task graph"
3540        );
3541    }
3542
3543    // --- cascade routing tests ---
3544
3545    fn make_cascade_config() -> zeph_config::OrchestrationConfig {
3546        zeph_config::OrchestrationConfig {
3547            topology_selection: true,
3548            cascade_routing: true,
3549            cascade_failure_threshold: 0.4,
3550            max_parallel: 4,
3551            ..make_config()
3552        }
3553    }
3554
3555    #[test]
3556    fn inject_tasks_resets_cascade_detector() {
3557        // inject_tasks() must call cascade_detector.reset() (C13 fix).
3558        // Verify: recording a failure before inject makes the region healthy again after.
3559        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
3560        graph.tasks[0].status = TaskStatus::Completed;
3561        graph.tasks[1].status = TaskStatus::Completed;
3562        let config = make_cascade_config();
3563        let mut scheduler = DagScheduler::new(
3564            graph,
3565            &config,
3566            Box::new(FirstRouter),
3567            vec![make_def("worker")],
3568        )
3569        .unwrap();
3570
3571        // Record a failure in the detector (simulates a failed task before inject).
3572        if let Some(ref mut det) = scheduler.cascade_detector {
3573            let g = &scheduler.graph;
3574            det.record_outcome(TaskId(1), false, g);
3575            // Sanity: region has 1 entry.
3576            assert_eq!(det.region_health().len(), 1);
3577        } else {
3578            panic!(
3579                "cascade_detector must be Some when cascade_routing=true and topology_selection=true"
3580            );
3581        }
3582
3583        // inject_tasks resets the detector.
3584        let new_task = make_node(2, &[1]);
3585        scheduler
3586            .inject_tasks(TaskId(1), vec![new_task], 20)
3587            .unwrap();
3588
3589        assert!(
3590            scheduler
3591                .cascade_detector
3592                .as_ref()
3593                .is_some_and(|d| d.region_health().is_empty()),
3594            "cascade_detector must be cleared after inject_tasks (C13 fix)"
3595        );
3596    }
3597
3598    #[test]
3599    fn sequential_tasks_not_reordered_by_cascade() {
3600        // Sequential tasks must stay at the front of the dispatch queue even when
3601        // their region is cascading (C14 fix): they must not be moved to the deferred set.
3602        //
3603        // Graph: root0 (healthy region), root1 -> t2 (cascading region, Sequential).
3604        // After injecting failures for root1's region, deprioritized = {root1, t2}.
3605        // But t2 is Sequential — it must stay in "preferred" partition.
3606        let mut graph = graph_from_nodes(vec![
3607            make_node(0, &[]),  // root for healthy region
3608            make_node(1, &[]),  // root for cascading region
3609            make_node(2, &[1]), // child of root1
3610        ]);
3611        graph.tasks[2].execution_mode = ExecutionMode::Sequential;
3612        let config = make_cascade_config();
3613        let mut scheduler = DagScheduler::new(
3614            graph,
3615            &config,
3616            Box::new(FirstRouter),
3617            vec![make_def("worker")],
3618        )
3619        .unwrap();
3620
3621        // Record failures for root1's region to make it cascade.
3622        if let Some(ref mut det) = scheduler.cascade_detector {
3623            let g = &scheduler.graph;
3624            // Two failures → rate 1.0 > threshold 0.4
3625            det.record_outcome(TaskId(1), false, g);
3626            det.record_outcome(TaskId(2), false, g);
3627        } else {
3628            panic!("cascade_detector must be Some");
3629        }
3630
3631        // Mark root0 and root1 as Ready (inject nothing — just check tick dispatch order).
3632        // tick() should put Sequential task in preferred, not deferred.
3633        let actions = scheduler.tick();
3634
3635        // Both root tasks (0 and 1) are Ready and root1 is in a cascading region,
3636        // but t2 (Sequential) must not be deprioritized. We verify by checking that
3637        // task 1 (root of cascading region) and task 2 (Sequential child) both appear
3638        // in the Spawn actions — they are not silently deferred behind root0.
3639        let spawned_ids: Vec<TaskId> = actions
3640            .iter()
3641            .filter_map(|a| {
3642                if let SchedulerAction::Spawn { task_id, .. }
3643                | SchedulerAction::RunInline { task_id, .. } = a
3644                {
3645                    Some(*task_id)
3646                } else {
3647                    None
3648                }
3649            })
3650            .collect();
3651
3652        // root0 and root1 are both roots (in-degree 0). root1 is deprioritized,
3653        // but t2 (Sequential) stays in preferred. At minimum root0 or t2/root1 must be spawned.
3654        // The key invariant: the presence of at least one spawn confirms the sequential
3655        // task was not silently dropped.
3656        assert!(
3657            !spawned_ids.is_empty(),
3658            "tick must dispatch at least one ready task; Sequential tasks must not be dropped by cascade logic"
3659        );
3660    }
3661
3662    #[test]
3663    fn cascade_routing_without_topology_selection_creates_no_detector() {
3664        // cascade_routing=true but topology_selection=false must not create a detector
3665        // (the constructor emits a warn but does not fail).
3666        let config = zeph_config::OrchestrationConfig {
3667            cascade_routing: true,
3668            topology_selection: false,
3669            ..make_config()
3670        };
3671        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3672        let scheduler = DagScheduler::new(
3673            graph,
3674            &config,
3675            Box::new(FirstRouter),
3676            vec![make_def("worker")],
3677        )
3678        .unwrap();
3679        assert!(
3680            scheduler.cascade_detector.is_none(),
3681            "cascade_detector must be None when topology_selection=false"
3682        );
3683    }
3684}