Skip to main content

zeph_orchestration/
scheduler.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! DAG execution scheduler: drives task graph execution by emitting `SchedulerAction` commands.
5
6use std::collections::{HashMap, VecDeque};
7use std::fmt::Write as _;
8use std::path::PathBuf;
9use std::time::{Duration, Instant};
10
11use tokio::sync::mpsc;
12
13use super::cascade::{CascadeConfig, CascadeDetector};
14use super::dag;
15use super::error::OrchestrationError;
16use super::graph::{
17    ExecutionMode, GraphStatus, TaskGraph, TaskId, TaskNode, TaskResult, TaskStatus,
18};
19use super::router::AgentRouter;
20use super::topology::{DispatchStrategy, Topology, TopologyAnalysis, TopologyClassifier};
21use super::verifier::inject_tasks as verifier_inject_tasks;
22use zeph_config::OrchestrationConfig;
23use zeph_sanitizer::{ContentIsolationConfig, ContentSanitizer, ContentSource, ContentSourceKind};
24use zeph_subagent::{SubAgentDef, SubAgentError};
25
26/// Actions the scheduler requests the caller to perform.
27///
28/// The scheduler never holds `&mut SubAgentManager` — it produces these
29/// actions for the caller to execute (ADR-026 command pattern).
30#[derive(Debug)]
31pub enum SchedulerAction {
32    /// Spawn a sub-agent for a task.
33    Spawn {
34        task_id: TaskId,
35        agent_def_name: String,
36        prompt: String,
37    },
38    /// Cancel a running sub-agent (on graph abort/skip).
39    Cancel { agent_handle_id: String },
40    /// Execute a task inline via the main agent (no sub-agents configured).
41    RunInline { task_id: TaskId, prompt: String },
42    /// Graph reached a terminal or paused state.
43    Done { status: GraphStatus },
44    /// Request verification of a completed task's output (emitted when `verify_completeness=true`).
45    ///
46    /// The task stays `Completed` during verification. Downstream tasks are unblocked
47    /// immediately — verification is best-effort and does not gate dispatch.
48    /// The caller runs `PlanVerifier::verify()`, then optionally `PlanVerifier::replan()`,
49    /// then calls `DagScheduler::inject_tasks()` if new tasks were generated.
50    Verify { task_id: TaskId, output: String },
51}
52
53/// Event sent by a sub-agent loop when it terminates.
54#[derive(Debug)]
55pub struct TaskEvent {
56    pub task_id: TaskId,
57    pub agent_handle_id: String,
58    pub outcome: TaskOutcome,
59}
60
/// Final result reported for one sub-agent execution.
#[derive(Debug)]
pub enum TaskOutcome {
    /// The agent finished successfully, yielding its output and any artifact paths.
    Completed {
        output: String,
        artifacts: Vec<PathBuf>,
    },
    /// The agent failed; `error` carries the failure description.
    Failed { error: String },
}
72
/// Bookkeeping for a task that is currently executing: which agent handle runs it,
/// which agent definition it was spawned from, and when it started (used for
/// timeout detection).
struct RunningTask {
    agent_handle_id: String,
    agent_def_name: String,
    started_at: Instant,
}
79
80/// DAG execution engine.
81///
82/// Drives task graph execution by producing `SchedulerAction` values
83/// that the caller executes against `SubAgentManager`.
84///
85/// # Caller Loop
86///
87/// ```text
88/// loop {
89///     let actions = scheduler.tick();
90///     for action in actions {
91///         match action {
92///             Spawn { task_id, agent_def_name, prompt } => {
93///                 match manager.spawn_for_task(...) {
94///                     Ok(handle_id) => scheduler.record_spawn(task_id, handle_id),
95///                     Err(e) => { for a in scheduler.record_spawn_failure(task_id, &e) { /* exec */ } }
96///                 }
97///             }
98///             Cancel { agent_handle_id } => { manager.cancel(&agent_handle_id); }
99///             Done { .. } => break,
100///         }
101///     }
102///     scheduler.wait_event().await;
103/// }
104/// ```
105#[allow(clippy::struct_excessive_bools)]
106pub struct DagScheduler {
107    graph: TaskGraph,
108    max_parallel: usize,
109    /// Immutable base parallelism limit from config. Never changes after construction.
110    ///
111    /// `max_parallel` is derived from this via `TopologyClassifier::compute_max_parallel`
112    /// and may be lower (e.g., 1 for `LinearChain`). Using `config_max_parallel` as the
113    /// base prevents drift: successive replan cycles always compute from the original
114    /// config value, not from a previously reduced `max_parallel`.
115    config_max_parallel: usize,
116    /// Maps `TaskId` -> running sub-agent state.
117    running: HashMap<TaskId, RunningTask>,
118    /// Receives completion/failure events from sub-agent loops.
119    event_rx: mpsc::Receiver<TaskEvent>,
120    /// Sender cloned into each spawned sub-agent via `spawn_for_task`.
121    event_tx: mpsc::Sender<TaskEvent>,
122    /// Per-task wall-clock timeout.
123    task_timeout: Duration,
124    /// Router for agent selection.
125    router: Box<dyn AgentRouter>,
126    /// Available agent definitions (cached from `SubAgentManager`).
127    available_agents: Vec<SubAgentDef>,
128    /// Total character budget for cross-task dependency context injection.
129    dependency_context_budget: usize,
130    /// Events buffered by `wait_event` for processing in the next `tick`.
131    buffered_events: VecDeque<TaskEvent>,
132    /// Sanitizer for dependency output injected into task prompts (SEC-ORCH-01).
133    sanitizer: ContentSanitizer,
134    /// Backoff duration before retrying deferred tasks when all ready tasks hit the concurrency limit.
135    deferral_backoff: Duration,
136    /// Consecutive spawn failures due to concurrency limits. Used to compute exponential backoff.
137    consecutive_spawn_failures: u32,
138    /// Topology analysis result. Recomputed on next tick when `topology_dirty=true`.
139    topology: TopologyAnalysis,
140    /// When true, topology is re-analyzed at the start of the next tick.
141    /// Set by `inject_tasks()` after appending replan tasks (critic C2).
142    topology_dirty: bool,
143    /// Current dispatch level for `LevelBarrier` strategy.
144    current_level: usize,
145    /// Whether post-task verification is enabled (`config.verify_completeness`).
146    verify_completeness: bool,
147    /// Provider name for verification LLM calls (`config.verify_provider`).
148    /// Empty string = use the agent's primary provider.
149    verify_provider: String,
150    /// Per-task replan count. Limits replanning to 1 cycle per task (critic S2).
151    task_replan_counts: HashMap<TaskId, u32>,
152    /// Global replan counter across the entire scheduler run (critic S2).
153    global_replan_count: u32,
154    /// Global replan hard cap from config.
155    max_replans: u32,
156    /// Completeness score threshold from config. Replan is triggered when
157    /// `VerificationResult::confidence < completeness_threshold_value` AND gaps exist.
158    completeness_threshold_value: f32,
159    /// Cascade failure detector. `Some` when `cascade_routing = true`.
160    cascade_detector: Option<CascadeDetector>,
161    /// Whether `tree_optimized_dispatch` was enabled at construction.
162    /// Stored so the dirty-reanalysis path can reproduce the same strategy mapping.
163    tree_optimized_dispatch: bool,
164    /// Whether `cascade_routing` was enabled at construction.
165    cascade_routing: bool,
166}
167
168impl std::fmt::Debug for DagScheduler {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        f.debug_struct("DagScheduler")
171            .field("graph_id", &self.graph.id)
172            .field("graph_status", &self.graph.status)
173            .field("running_count", &self.running.len())
174            .field("max_parallel", &self.max_parallel)
175            .field("task_timeout_secs", &self.task_timeout.as_secs())
176            .field("topology", &self.topology.topology)
177            .field("strategy", &self.topology.strategy)
178            .field("current_level", &self.current_level)
179            .field("global_replan_count", &self.global_replan_count)
180            .field("cascade_routing", &self.cascade_routing)
181            .field("tree_optimized_dispatch", &self.tree_optimized_dispatch)
182            .finish_non_exhaustive()
183    }
184}
185
186impl DagScheduler {
187    /// Create a new scheduler for the given graph.
188    ///
189    /// The graph must be in `Created` status. The scheduler transitions
190    /// it to `Running` and marks root tasks as `Ready`.
191    ///
192    /// # Errors
193    ///
194    /// Returns `OrchestrationError::InvalidGraph` if the graph is not in
195    /// `Created` status or has no tasks.
196    pub fn new(
197        mut graph: TaskGraph,
198        config: &OrchestrationConfig,
199        router: Box<dyn AgentRouter>,
200        available_agents: Vec<SubAgentDef>,
201    ) -> Result<Self, OrchestrationError> {
202        if graph.status != GraphStatus::Created {
203            return Err(OrchestrationError::InvalidGraph(format!(
204                "graph must be in Created status, got {}",
205                graph.status
206            )));
207        }
208
209        dag::validate(&graph.tasks, config.max_tasks as usize)?;
210
211        graph.status = GraphStatus::Running;
212
213        for task in &mut graph.tasks {
214            if task.depends_on.is_empty() && task.status == TaskStatus::Pending {
215                task.status = TaskStatus::Ready;
216            }
217        }
218
219        let (event_tx, event_rx) = mpsc::channel(64);
220
221        let task_timeout = if config.task_timeout_secs > 0 {
222            Duration::from_secs(config.task_timeout_secs)
223        } else {
224            Duration::from_secs(600)
225        };
226
227        let topology = TopologyClassifier::analyze(&graph, config);
228        let max_parallel = topology.max_parallel;
229        let config_max_parallel = config.max_parallel as usize;
230
231        if config.topology_selection {
232            tracing::debug!(
233                topology = ?topology.topology,
234                strategy = ?topology.strategy,
235                max_parallel,
236                "topology-aware concurrency limit applied"
237            );
238        }
239
240        // Validate cascade_routing dependency on topology_selection.
241        if config.cascade_routing && !config.topology_selection {
242            tracing::warn!(
243                "cascade_routing = true requires topology_selection = true; \
244                 cascade routing is disabled (topology_selection is off)"
245            );
246        }
247
248        let cascade_detector = if config.cascade_routing && config.topology_selection {
249            Some(CascadeDetector::new(CascadeConfig {
250                failure_threshold: config.cascade_failure_threshold,
251            }))
252        } else {
253            None
254        };
255
256        Ok(Self {
257            graph,
258            max_parallel,
259            config_max_parallel,
260            running: HashMap::new(),
261            event_rx,
262            event_tx,
263            task_timeout,
264            router,
265            available_agents,
266            dependency_context_budget: config.dependency_context_budget,
267            buffered_events: VecDeque::new(),
268            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
269            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
270            consecutive_spawn_failures: 0,
271            topology,
272            topology_dirty: false,
273            current_level: 0,
274            verify_completeness: config.verify_completeness,
275            verify_provider: config.verify_provider.as_str().trim().to_owned(),
276            task_replan_counts: HashMap::new(),
277            global_replan_count: 0,
278            max_replans: config.max_replans,
279            completeness_threshold_value: config.completeness_threshold,
280            cascade_detector,
281            tree_optimized_dispatch: config.tree_optimized_dispatch,
282            cascade_routing: config.cascade_routing && config.topology_selection,
283        })
284    }
285
286    /// Create a scheduler from a graph that is in `Paused` or `Failed` status.
287    ///
288    /// Used for resume and retry flows. The caller is responsible for calling
289    /// [`dag::reset_for_retry`] (for retry) before passing the graph here.
290    ///
291    /// This constructor sets `graph.status = Running` (II3) and reconstructs
292    /// the `running` map from tasks that are still in `Running` state (IC1), so
293    /// their completion events are not silently dropped on the next tick.
294    ///
295    /// # Errors
296    ///
297    /// Returns `OrchestrationError::InvalidGraph` if the graph is in `Completed`
298    /// or `Canceled` status (terminal states that cannot be resumed).
299    pub fn resume_from(
300        mut graph: TaskGraph,
301        config: &OrchestrationConfig,
302        router: Box<dyn AgentRouter>,
303        available_agents: Vec<SubAgentDef>,
304    ) -> Result<Self, OrchestrationError> {
305        if graph.status == GraphStatus::Completed || graph.status == GraphStatus::Canceled {
306            return Err(OrchestrationError::InvalidGraph(format!(
307                "cannot resume a {} graph; only Paused, Failed, or Running graphs are resumable",
308                graph.status
309            )));
310        }
311
312        // II3: ensure the graph is in Running state so tick() does not immediately
313        // return Done{Paused}.
314        graph.status = GraphStatus::Running;
315
316        // IC1: reconstruct the `running` map from tasks that were still Running at
317        // pause time. Without this their completion events would arrive but
318        // process_event would ignore them (it checks self.running), leaving the
319        // task stuck until timeout.
320        let running: HashMap<TaskId, RunningTask> = graph
321            .tasks
322            .iter()
323            .filter(|t| t.status == TaskStatus::Running)
324            .filter_map(|t| {
325                let handle_id = t.assigned_agent.clone()?;
326                let def_name = t.agent_hint.clone().unwrap_or_default();
327                Some((
328                    t.id,
329                    RunningTask {
330                        agent_handle_id: handle_id,
331                        agent_def_name: def_name,
332                        // Conservative: treat as just-started so timeout window is reset.
333                        started_at: Instant::now(),
334                    },
335                ))
336            })
337            .collect();
338
339        let (event_tx, event_rx) = mpsc::channel(64);
340
341        let task_timeout = if config.task_timeout_secs > 0 {
342            Duration::from_secs(config.task_timeout_secs)
343        } else {
344            Duration::from_secs(600)
345        };
346
347        let topology = TopologyClassifier::analyze(&graph, config);
348        let max_parallel = topology.max_parallel;
349        let config_max_parallel = config.max_parallel as usize;
350
351        let cascade_detector = if config.cascade_routing && config.topology_selection {
352            Some(CascadeDetector::new(CascadeConfig {
353                failure_threshold: config.cascade_failure_threshold,
354            }))
355        } else {
356            None
357        };
358
359        Ok(Self {
360            graph,
361            max_parallel,
362            config_max_parallel,
363            running,
364            event_rx,
365            event_tx,
366            task_timeout,
367            router,
368            available_agents,
369            dependency_context_budget: config.dependency_context_budget,
370            buffered_events: VecDeque::new(),
371            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
372            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
373            consecutive_spawn_failures: 0,
374            topology,
375            topology_dirty: false,
376            current_level: 0,
377            verify_completeness: config.verify_completeness,
378            verify_provider: config.verify_provider.as_str().trim().to_owned(),
379            task_replan_counts: HashMap::new(),
380            global_replan_count: 0,
381            max_replans: config.max_replans,
382            completeness_threshold_value: config.completeness_threshold,
383            cascade_detector,
384            tree_optimized_dispatch: config.tree_optimized_dispatch,
385            cascade_routing: config.cascade_routing && config.topology_selection,
386        })
387    }
388
389    /// Validate that `verify_provider` references a known provider name.
390    ///
391    /// Call this after construction when `verify_completeness = true` to catch
392    /// misconfiguration early rather than failing open at runtime.
393    ///
394    /// - Empty `verify_provider` is always valid (falls back to the primary provider).
395    /// - If `provider_names` is empty, validation is skipped (provider set is unknown).
396    /// - Provider names are compared case-sensitively (matching the existing resolution convention).
397    ///
398    /// # Errors
399    ///
400    /// Returns `OrchestrationError::InvalidConfig` when `verify_completeness = true`,
401    /// `verify_provider` is non-empty, and the name is not present in `provider_names`.
402    pub fn validate_verify_config(
403        &self,
404        provider_names: &[&str],
405    ) -> Result<(), OrchestrationError> {
406        if !self.verify_completeness {
407            return Ok(());
408        }
409        let name = self.verify_provider.as_str();
410        if name.is_empty() || provider_names.is_empty() {
411            return Ok(());
412        }
413        if !provider_names.contains(&name) {
414            return Err(OrchestrationError::InvalidConfig(format!(
415                "verify_provider \"{}\" not found in [[llm.providers]]; available: [{}]",
416                name,
417                provider_names.join(", ")
418            )));
419        }
420        Ok(())
421    }
422
423    /// Get a clone of the event sender for injection into sub-agent loops.
424    #[must_use]
425    pub fn event_sender(&self) -> mpsc::Sender<TaskEvent> {
426        self.event_tx.clone()
427    }
428
429    /// Immutable reference to the current graph state.
430    #[must_use]
431    pub fn graph(&self) -> &TaskGraph {
432        &self.graph
433    }
434
435    /// Return the final graph state.
436    ///
437    /// Clones the graph since `Drop` is implemented on the scheduler.
438    #[must_use]
439    pub fn into_graph(&self) -> TaskGraph {
440        self.graph.clone()
441    }
442
443    /// Current topology analysis.
444    #[must_use]
445    pub fn topology(&self) -> &TopologyAnalysis {
446        &self.topology
447    }
448
449    /// Minimum completeness score threshold from config.
450    ///
451    /// Used by the agent loop to gate whole-plan replan: replan is triggered when
452    /// `VerificationResult::confidence < completeness_threshold` AND gaps exist.
453    #[must_use]
454    pub fn completeness_threshold(&self) -> f32 {
455        self.completeness_threshold_value
456    }
457
458    /// Provider name for verification LLM calls (empty = use primary provider).
459    #[must_use]
460    pub fn verify_provider_name(&self) -> &str {
461        &self.verify_provider
462    }
463
464    /// Remaining whole-plan replan budget: `max_replans - global_replan_count`.
465    ///
466    /// Returns 0 when the global cap has been reached.
467    #[must_use]
468    pub fn max_replans_remaining(&self) -> u32 {
469        self.max_replans.saturating_sub(self.global_replan_count)
470    }
471
472    /// Increment `global_replan_count` to record a whole-plan replan cycle.
473    ///
474    /// Called by the agent loop after executing a partial DAG from whole-plan gaps.
475    /// Does NOT inject tasks into the original graph (the partial DAG is a separate run).
476    pub fn record_whole_plan_replan(&mut self) {
477        self.global_replan_count = self.global_replan_count.saturating_add(1);
478    }
479
480    /// Inject new tasks into the graph after a verify-replan cycle.
481    ///
482    /// Appends tasks and validates DAG acyclicity. Sets `topology_dirty=true` so
483    /// topology is re-analyzed at the start of the next `tick()`. Does NOT
484    /// re-analyze topology here (critic C2 — topology computed during injection
485    /// would be stale by the next tick).
486    ///
487    /// Per-task replan cap: each task is limited to 1 replan (critic S2).
488    /// Global hard cap: total replan count across the run is limited to `max_replans`.
489    ///
490    /// # Errors
491    ///
492    /// Returns `OrchestrationError::VerificationFailed` if the graph would exceed
493    /// `max_tasks` or injection introduces a cycle.
494    pub fn inject_tasks(
495        &mut self,
496        verified_task_id: TaskId,
497        new_tasks: Vec<TaskNode>,
498        max_tasks: usize,
499    ) -> Result<(), OrchestrationError> {
500        if new_tasks.is_empty() {
501            return Ok(());
502        }
503
504        // Per-task replan limit: 1 replan per task (critic S2).
505        let task_replan_count = self.task_replan_counts.entry(verified_task_id).or_insert(0);
506        if *task_replan_count >= 1 {
507            tracing::warn!(
508                task_id = %verified_task_id,
509                "per-task replan limit (1) reached, skipping replan injection"
510            );
511            return Ok(());
512        }
513
514        // Global hard cap (critic S2).
515        if self.global_replan_count >= self.max_replans {
516            tracing::warn!(
517                global_replan_count = self.global_replan_count,
518                max_replans = self.max_replans,
519                "global replan limit reached, skipping replan injection"
520            );
521            return Ok(());
522        }
523
524        verifier_inject_tasks(&mut self.graph, new_tasks, max_tasks)?;
525
526        *task_replan_count += 1;
527        self.global_replan_count += 1;
528
529        // Signal that topology needs re-analysis on the next tick (critic C2).
530        self.topology_dirty = true;
531
532        // Reset cascade failure counts — the graph has fundamentally changed (C13 fix).
533        if let Some(ref mut det) = self.cascade_detector {
534            det.reset();
535        }
536
537        Ok(())
538    }
539}
540
541impl Drop for DagScheduler {
542    fn drop(&mut self) {
543        if !self.running.is_empty() {
544            tracing::warn!(
545                running_tasks = self.running.len(),
546                "DagScheduler dropped with running tasks; agents may continue until their \
547                 CancellationToken fires or they complete naturally"
548            );
549        }
550    }
551}
552
553impl DagScheduler {
554    /// Process pending events and produce actions for the caller.
555    ///
556    /// Call `wait_event` after processing all actions to block until the next event.
557    #[allow(clippy::too_many_lines)]
558    pub fn tick(&mut self) -> Vec<SchedulerAction> {
559        if self.graph.status != GraphStatus::Running {
560            return vec![SchedulerAction::Done {
561                status: self.graph.status,
562            }];
563        }
564
565        self.reanalyze_topology_if_dirty();
566
567        let mut actions = Vec::new();
568
569        // Drain events buffered by wait_event, then any new ones in the channel.
570        while let Some(event) = self.buffered_events.pop_front() {
571            let cancel_actions = self.process_event(event);
572            actions.extend(cancel_actions);
573        }
574        while let Ok(event) = self.event_rx.try_recv() {
575            let cancel_actions = self.process_event(event);
576            actions.extend(cancel_actions);
577        }
578
579        if self.graph.status != GraphStatus::Running {
580            return actions;
581        }
582
583        // Check for timed-out tasks.
584        let timeout_actions = self.check_timeouts();
585        actions.extend(timeout_actions);
586
587        if self.graph.status != GraphStatus::Running {
588            return actions;
589        }
590
591        // Dispatch ready tasks up to max_parallel slots. Concurrency is pre-enforced here
592        // (topology-aware cap) and also enforced by SubAgentManager::spawn() returning
593        // ConcurrencyLimit when active + reserved >= max_concurrent.
594        // Non-transient spawn failures are handled by record_spawn_failure(); optimistic
595        // Running marks are reverted to Ready for ConcurrencyLimit errors.
596        let raw_ready = dag::ready_tasks(&self.graph);
597
598        // CascadeAware: partition ready tasks into preferred (healthy region) and deferred
599        // (cascading region). Deferred tasks still run when no preferred tasks remain.
600        // Skip for Sequential tasks — they must not be reordered relative to each other (C14).
601        let ready: Vec<TaskId> = if self.topology.strategy == DispatchStrategy::CascadeAware {
602            if let Some(ref detector) = self.cascade_detector {
603                let deprioritized = detector.deprioritized_tasks(&self.graph);
604                if deprioritized.is_empty() {
605                    raw_ready
606                } else {
607                    let (preferred, deferred): (Vec<_>, Vec<_>) =
608                        raw_ready.into_iter().partition(|id| {
609                            let is_sequential = self.graph.tasks[id.index()].execution_mode
610                                == ExecutionMode::Sequential;
611                            // Sequential tasks are never reordered.
612                            is_sequential || !deprioritized.contains(id)
613                        });
614                    preferred.into_iter().chain(deferred).collect()
615                }
616            } else {
617                raw_ready
618            }
619        } else {
620            raw_ready
621        };
622
623        // TreeOptimized: sort ready tasks by critical-path distance descending
624        // (tasks deepest in the DAG go first — shortest distance to sinks).
625        // Skip for Sequential tasks to preserve their ordering invariant (C14).
626        let ready: Vec<TaskId> = if self.topology.strategy == DispatchStrategy::TreeOptimized {
627            let max_depth = self.topology.depth;
628            let mut sortable = ready;
629            sortable.sort_by_key(|id| {
630                let task_depth = self.topology.depths.get(id).copied().unwrap_or(0);
631                // Deeper tasks have smaller key → dispatched first.
632                max_depth.saturating_sub(task_depth)
633            });
634            sortable
635        } else {
636            ready
637        };
638
639        self.advance_level_barrier_if_needed();
640
641        // Available dispatch slots for this tick.
642        let mut slots = self.max_parallel.saturating_sub(self.running.len());
643
644        // For sequential dispatch: track whether we already scheduled one sequential task
645        // this tick AND whether any sequential task is currently running.
646        let mut sequential_spawned_this_tick = false;
647        let has_running_sequential = self
648            .running
649            .keys()
650            .any(|tid| self.graph.tasks[tid.index()].execution_mode == ExecutionMode::Sequential);
651
652        for task_id in ready {
653            if slots == 0 {
654                break;
655            }
656
657            // LevelBarrier: only dispatch tasks at the current level.
658            if self.topology.strategy == DispatchStrategy::LevelBarrier {
659                let task_depth = self
660                    .topology
661                    .depths
662                    .get(&task_id)
663                    .copied()
664                    .unwrap_or(usize::MAX);
665                if task_depth != self.current_level {
666                    continue;
667                }
668            }
669
670            let task = &self.graph.tasks[task_id.index()];
671
672            // Sequential tasks: only one may run at a time within the scheduler.
673            // Independent sequential tasks in separate DAG branches are still
674            // serialized here (they share exclusive-resource intent by annotation).
675            if task.execution_mode == ExecutionMode::Sequential {
676                if sequential_spawned_this_tick || has_running_sequential {
677                    continue;
678                }
679                sequential_spawned_this_tick = true;
680            }
681
682            let Some(agent_def_name) = self.router.route(task, &self.available_agents) else {
683                tracing::debug!(
684                    task_id = %task_id,
685                    title = %task.title,
686                    "no agent available, routing task to main agent inline"
687                );
688                let prompt = self.build_task_prompt(task);
689                self.graph.tasks[task_id.index()].status = TaskStatus::Running;
690                actions.push(SchedulerAction::RunInline { task_id, prompt });
691                slots -= 1;
692                continue;
693            };
694
695            let prompt = self.build_task_prompt(task);
696
697            // Mark task as Running optimistically (before record_spawn is called).
698            self.graph.tasks[task_id.index()].status = TaskStatus::Running;
699
700            actions.push(SchedulerAction::Spawn {
701                task_id,
702                agent_def_name,
703                prompt,
704            });
705            slots -= 1;
706        }
707
708        actions.extend(self.check_graph_completion());
709
710        actions
711    }
712
713    fn reanalyze_topology_if_dirty(&mut self) {
714        if !self.topology_dirty {
715            return;
716        }
717        let new_analysis = {
718            let n = self.graph.tasks.len();
719            if n == 0 {
720                TopologyAnalysis {
721                    topology: Topology::AllParallel,
722                    strategy: DispatchStrategy::FullParallel,
723                    max_parallel: self.config_max_parallel,
724                    depth: 0,
725                    depths: std::collections::HashMap::new(),
726                }
727            } else {
728                let (depth, depths) = super::topology::compute_depths_for_scheduler(&self.graph);
729                let topo = TopologyClassifier::classify_with_depths(&self.graph, depth, &depths);
730                let strategy_config = zeph_config::OrchestrationConfig {
731                    cascade_routing: self.cascade_routing,
732                    tree_optimized_dispatch: self.tree_optimized_dispatch,
733                    ..zeph_config::OrchestrationConfig::default()
734                };
735                let strategy = TopologyClassifier::strategy(topo, &strategy_config);
736                let max_parallel =
737                    TopologyClassifier::compute_max_parallel(topo, self.config_max_parallel);
738                TopologyAnalysis {
739                    topology: topo,
740                    strategy,
741                    max_parallel,
742                    depth,
743                    depths,
744                }
745            }
746        };
747        self.topology = new_analysis;
748        self.max_parallel = self.topology.max_parallel;
749        self.topology_dirty = false;
750        if self.topology.strategy == DispatchStrategy::LevelBarrier {
751            let min_active = self
752                .graph
753                .tasks
754                .iter()
755                .filter(|t| !t.status.is_terminal())
756                .filter_map(|t| self.topology.depths.get(&t.id).copied())
757                .min();
758            if let Some(min_depth) = min_active {
759                self.current_level = self.current_level.min(min_depth);
760            }
761        }
762    }
763
764    fn advance_level_barrier_if_needed(&mut self) {
765        if self.topology.strategy != DispatchStrategy::LevelBarrier {
766            return;
767        }
768        let all_current_level_terminal = self.graph.tasks.iter().all(|t| {
769            let task_depth = self
770                .topology
771                .depths
772                .get(&t.id)
773                .copied()
774                .unwrap_or(usize::MAX);
775            task_depth != self.current_level || t.status.is_terminal()
776        });
777        if all_current_level_terminal {
778            let max_depth = self.topology.depth;
779            while self.current_level <= max_depth {
780                let has_non_terminal = self.graph.tasks.iter().any(|t| {
781                    let d = self
782                        .topology
783                        .depths
784                        .get(&t.id)
785                        .copied()
786                        .unwrap_or(usize::MAX);
787                    d == self.current_level && !t.status.is_terminal()
788                });
789                if has_non_terminal {
790                    break;
791                }
792                self.current_level += 1;
793            }
794        }
795    }
796
797    fn check_graph_completion(&mut self) -> Vec<SchedulerAction> {
798        let running_in_graph_now = self
799            .graph
800            .tasks
801            .iter()
802            .filter(|t| t.status == TaskStatus::Running)
803            .count();
804        if running_in_graph_now != 0 || !self.running.is_empty() {
805            return vec![];
806        }
807        let all_terminal = self.graph.tasks.iter().all(|t| t.status.is_terminal());
808        if all_terminal {
809            self.graph.status = GraphStatus::Completed;
810            self.graph.finished_at = Some(super::graph::chrono_now());
811            return vec![SchedulerAction::Done {
812                status: GraphStatus::Completed,
813            }];
814        }
815        if dag::ready_tasks(&self.graph).is_empty() {
816            tracing::error!(
817                "scheduler deadlock: no running or ready tasks, but graph not complete"
818            );
819            self.graph.status = GraphStatus::Failed;
820            self.graph.finished_at = Some(super::graph::chrono_now());
821            debug_assert!(
822                self.running.is_empty(),
823                "deadlock branch reached with non-empty running map"
824            );
825            for task in &mut self.graph.tasks {
826                if !task.status.is_terminal() {
827                    task.status = TaskStatus::Canceled;
828                }
829            }
830            return vec![SchedulerAction::Done {
831                status: GraphStatus::Failed,
832            }];
833        }
834        vec![]
835    }
836
837    /// Wait for the next event from a running sub-agent.
838    ///
839    /// Buffers the received event for processing in the next `tick` call.
840    /// Returns immediately if no tasks are running. Uses a timeout so that
841    /// periodic timeout checking can occur.
842    /// Compute the current deferral backoff with exponential growth capped at 5 seconds.
843    ///
844    /// Each consecutive spawn failure due to concurrency limits doubles the base backoff.
845    fn current_deferral_backoff(&self) -> Duration {
846        const MAX_BACKOFF: Duration = Duration::from_secs(5);
847        let multiplier = 1u32
848            .checked_shl(self.consecutive_spawn_failures.min(10))
849            .unwrap_or(u32::MAX);
850        self.deferral_backoff
851            .saturating_mul(multiplier)
852            .min(MAX_BACKOFF)
853    }
854
855    pub async fn wait_event(&mut self) {
856        if self.running.is_empty() {
857            tokio::time::sleep(self.current_deferral_backoff()).await;
858            return;
859        }
860
861        // Find the nearest timeout deadline among running tasks.
862        let nearest_timeout = self
863            .running
864            .values()
865            .map(|r| {
866                self.task_timeout
867                    .checked_sub(r.started_at.elapsed())
868                    .unwrap_or(Duration::ZERO)
869            })
870            .min()
871            .unwrap_or(Duration::from_secs(1));
872
873        // Clamp to at least 100 ms to avoid busy-looping.
874        let wait_duration = nearest_timeout.max(Duration::from_millis(100));
875
876        tokio::select! {
877            Some(event) = self.event_rx.recv() => {
878                // SEC-ORCH-02: guard against unbounded buffer growth. Use total task
879                // count rather than max_parallel so that parallel bursts exceeding
880                // max_parallel do not cause premature event drops.
881                if self.buffered_events.len() >= self.graph.tasks.len() * 2 {
882                    // PERF-SCHED-02: log at error level — a dropped completion event
883                    // leaves a task stuck in Running until its timeout fires.
884                    if let Some(dropped) = self.buffered_events.pop_front() {
885                        tracing::error!(
886                            task_id = %dropped.task_id,
887                            buffer_len = self.buffered_events.len(),
888                            "event buffer saturated; completion event dropped — task may \
889                             remain Running until timeout"
890                        );
891                    }
892                }
893                self.buffered_events.push_back(event);
894            }
895            () = tokio::time::sleep(wait_duration) => {}
896        }
897    }
898
899    /// Record that a spawn action was successfully executed.
900    ///
901    /// Called by the caller after successfully spawning via `SubAgentManager`.
902    ///
903    /// Resets `consecutive_spawn_failures` to 0 as a "spawn succeeded = scheduler healthy"
904    /// signal. This is intentionally separate from the batch-level backoff in
905    /// [`record_batch_backoff`]: `record_spawn` provides an immediate reset on the first
906    /// success within a batch, while `record_batch_backoff` governs the tick-granular
907    /// failure counter used for exponential wait backoff.
908    pub fn record_spawn(
909        &mut self,
910        task_id: TaskId,
911        agent_handle_id: String,
912        agent_def_name: String,
913    ) {
914        self.consecutive_spawn_failures = 0;
915        self.graph.tasks[task_id.index()].assigned_agent = Some(agent_handle_id.clone());
916        self.running.insert(
917            task_id,
918            RunningTask {
919                agent_handle_id,
920                agent_def_name,
921                started_at: Instant::now(),
922            },
923        );
924    }
925
926    /// Handle a failed spawn attempt.
927    ///
928    /// If the error is a transient concurrency-limit rejection, reverts the task from
929    /// Running back to `Ready` so the next [`tick`] can retry the spawn when a slot opens.
930    /// Otherwise, marks the task as `Failed` and propagates failure.
931    /// Returns any cancel actions needed.
932    ///
933    /// # Errors (via returned actions)
934    ///
935    /// Propagates failure per the task's effective `FailureStrategy`.
936    pub fn record_spawn_failure(
937        &mut self,
938        task_id: TaskId,
939        error: &SubAgentError,
940    ) -> Vec<SchedulerAction> {
941        // Transient condition: the SubAgentManager rejected the spawn because all
942        // concurrency slots are occupied. Revert to Ready so the next tick retries.
943        // consecutive_spawn_failures is updated batch-wide by record_batch_backoff().
944        if let SubAgentError::ConcurrencyLimit { active, max } = error {
945            tracing::warn!(
946                task_id = %task_id,
947                active,
948                max,
949                next_backoff_ms = self.current_deferral_backoff().as_millis(),
950                "concurrency limit reached, deferring task to next tick"
951            );
952            self.graph.tasks[task_id.index()].status = TaskStatus::Ready;
953            return Vec::new();
954        }
955
956        // SEC-ORCH-04: truncate error to avoid logging sensitive internal details.
957        let error_excerpt: String = error.to_string().chars().take(512).collect();
958        tracing::warn!(
959            task_id = %task_id,
960            error = %error_excerpt,
961            "spawn failed, marking task failed"
962        );
963        self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
964        let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
965        let mut actions = Vec::new();
966        for cancel_task_id in cancel_ids {
967            if let Some(running) = self.running.remove(&cancel_task_id) {
968                actions.push(SchedulerAction::Cancel {
969                    agent_handle_id: running.agent_handle_id,
970                });
971            }
972        }
973        if self.graph.status != GraphStatus::Running {
974            self.graph.finished_at = Some(super::graph::chrono_now());
975            actions.push(SchedulerAction::Done {
976                status: self.graph.status,
977            });
978        }
979        actions
980    }
981
982    /// Update the batch-level backoff counter after processing a full tick's spawn batch.
983    ///
984    /// With parallel dispatch a single tick may produce N Spawn actions. Individual
985    /// per-spawn counter updates would miscount concurrent rejections as "consecutive"
986    /// failures. This method captures the batch semantics instead:
987    /// - If any spawn succeeded → reset the counter (scheduler is healthy).
988    /// - Else if any spawn hit `ConcurrencyLimit` → this entire tick was a deferral tick.
989    /// - If neither → no spawns were attempted; counter unchanged.
990    pub fn record_batch_backoff(&mut self, any_success: bool, any_concurrency_failure: bool) {
991        if any_success {
992            self.consecutive_spawn_failures = 0;
993        } else if any_concurrency_failure {
994            self.consecutive_spawn_failures = self.consecutive_spawn_failures.saturating_add(1);
995        }
996    }
997
998    /// Cancel all running tasks (for user-initiated plan cancellation).
999    ///
1000    /// # Warning: Cooperative Cancellation
1001    ///
1002    /// Cancellation is cooperative and asynchronous. Tool operations (file writes, shell
1003    /// executions) in progress at the time of cancellation complete before the agent loop
1004    /// checks the cancellation token. Callers should inspect the task graph state and clean
1005    /// up partially-written artifacts manually.
1006    pub fn cancel_all(&mut self) -> Vec<SchedulerAction> {
1007        self.graph.status = GraphStatus::Canceled;
1008        self.graph.finished_at = Some(super::graph::chrono_now());
1009
1010        // Drain running map first to avoid split borrow issues (M3).
1011        let running: Vec<(TaskId, RunningTask)> = self.running.drain().collect();
1012        let mut actions: Vec<SchedulerAction> = running
1013            .into_iter()
1014            .map(|(task_id, r)| {
1015                self.graph.tasks[task_id.index()].status = TaskStatus::Canceled;
1016                SchedulerAction::Cancel {
1017                    agent_handle_id: r.agent_handle_id,
1018                }
1019            })
1020            .collect();
1021
1022        for task in &mut self.graph.tasks {
1023            if !task.status.is_terminal() {
1024                task.status = TaskStatus::Canceled;
1025            }
1026        }
1027
1028        actions.push(SchedulerAction::Done {
1029            status: GraphStatus::Canceled,
1030        });
1031        actions
1032    }
1033}
1034
1035impl DagScheduler {
1036    /// Process a single `TaskEvent` and return any cancel actions needed.
1037    fn process_event(&mut self, event: TaskEvent) -> Vec<SchedulerAction> {
1038        let TaskEvent {
1039            task_id,
1040            agent_handle_id,
1041            outcome,
1042        } = event;
1043
1044        // Guard against stale events from previous incarnations (e.g. after timeout+retry).
1045        // A timed-out agent's event_tx outlives the timeout and may send a completion later.
1046        match self.running.get(&task_id) {
1047            Some(running) if running.agent_handle_id != agent_handle_id => {
1048                tracing::warn!(
1049                    task_id = %task_id,
1050                    expected = %running.agent_handle_id,
1051                    got = %agent_handle_id,
1052                    "discarding stale event from previous agent incarnation"
1053                );
1054                return Vec::new();
1055            }
1056            None => {
1057                tracing::debug!(
1058                    task_id = %task_id,
1059                    agent_handle_id = %agent_handle_id,
1060                    "ignoring event for task not in running map"
1061                );
1062                return Vec::new();
1063            }
1064            Some(_) => {}
1065        }
1066
1067        // Compute duration BEFORE removing from running map (C1 fix).
1068        let duration_ms = self.running.get(&task_id).map_or(0, |r| {
1069            u64::try_from(r.started_at.elapsed().as_millis()).unwrap_or(u64::MAX)
1070        });
1071        let agent_def_name = self.running.get(&task_id).map(|r| r.agent_def_name.clone());
1072
1073        self.running.remove(&task_id);
1074
1075        match outcome {
1076            TaskOutcome::Completed { output, artifacts } => {
1077                self.graph.tasks[task_id.index()].status = TaskStatus::Completed;
1078                self.graph.tasks[task_id.index()].result = Some(TaskResult {
1079                    output: output.clone(),
1080                    artifacts,
1081                    duration_ms,
1082                    agent_id: Some(agent_handle_id),
1083                    agent_def: agent_def_name,
1084                });
1085
1086                // Record success in cascade detector.
1087                if let Some(ref mut detector) = self.cascade_detector {
1088                    detector.record_outcome(task_id, true, &self.graph);
1089                }
1090
1091                // Mark newly unblocked tasks as Ready.
1092                // Downstream tasks are unblocked immediately — verification does not gate dispatch.
1093                let newly_ready = dag::ready_tasks(&self.graph);
1094                for ready_id in newly_ready {
1095                    if self.graph.tasks[ready_id.index()].status == TaskStatus::Pending {
1096                        self.graph.tasks[ready_id.index()].status = TaskStatus::Ready;
1097                    }
1098                }
1099
1100                // Emit Verify action when verify_completeness is enabled.
1101                // The replan budget is enforced inside inject_tasks() — the observation
1102                // (emitting Verify) must not be gated on the mutation budget, or tasks
1103                // after budget exhaustion never receive verification at all.
1104                // max_replans=0 still emits Verify; gaps are logged only (no inject_tasks call).
1105                if self.verify_completeness {
1106                    vec![SchedulerAction::Verify { task_id, output }]
1107                } else {
1108                    Vec::new()
1109                }
1110            }
1111
1112            TaskOutcome::Failed { error } => {
1113                // SEC-ORCH-04: truncate error to avoid logging sensitive internal details.
1114                let error_excerpt: String = error.chars().take(512).collect();
1115                tracing::warn!(
1116                    task_id = %task_id,
1117                    error = %error_excerpt,
1118                    "task failed"
1119                );
1120                self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
1121
1122                // Record failure in cascade detector.
1123                if let Some(ref mut detector) = self.cascade_detector {
1124                    detector.record_outcome(task_id, false, &self.graph);
1125                }
1126
1127                let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
1128                let mut actions = Vec::new();
1129
1130                for cancel_task_id in cancel_ids {
1131                    if let Some(running) = self.running.remove(&cancel_task_id) {
1132                        actions.push(SchedulerAction::Cancel {
1133                            agent_handle_id: running.agent_handle_id,
1134                        });
1135                    }
1136                }
1137
1138                if self.graph.status != GraphStatus::Running {
1139                    self.graph.finished_at = Some(super::graph::chrono_now());
1140                    actions.push(SchedulerAction::Done {
1141                        status: self.graph.status,
1142                    });
1143                }
1144
1145                actions
1146            }
1147        }
1148    }
1149
1150    /// Check all running tasks for timeout violations.
1151    ///
1152    /// # Warning: Cooperative Cancellation
1153    ///
1154    /// Cancel actions emitted here signal agents cooperatively. Tool operations in progress
1155    /// at the time of cancellation complete before the agent loop checks the cancellation
1156    /// token. Partially-written artifacts may remain on disk after cancellation.
1157    fn check_timeouts(&mut self) -> Vec<SchedulerAction> {
1158        let timed_out: Vec<(TaskId, String)> = self
1159            .running
1160            .iter()
1161            .filter(|(_, r)| r.started_at.elapsed() > self.task_timeout)
1162            .map(|(id, r)| (*id, r.agent_handle_id.clone()))
1163            .collect();
1164
1165        let mut actions = Vec::new();
1166        for (task_id, agent_handle_id) in timed_out {
1167            tracing::warn!(
1168                task_id = %task_id,
1169                timeout_secs = self.task_timeout.as_secs(),
1170                "task timed out"
1171            );
1172            self.running.remove(&task_id);
1173            self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
1174
1175            actions.push(SchedulerAction::Cancel { agent_handle_id });
1176
1177            let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
1178            for cancel_task_id in cancel_ids {
1179                if let Some(running) = self.running.remove(&cancel_task_id) {
1180                    actions.push(SchedulerAction::Cancel {
1181                        agent_handle_id: running.agent_handle_id,
1182                    });
1183                }
1184            }
1185
1186            if self.graph.status != GraphStatus::Running {
1187                self.graph.finished_at = Some(super::graph::chrono_now());
1188                actions.push(SchedulerAction::Done {
1189                    status: self.graph.status,
1190                });
1191                break;
1192            }
1193        }
1194
1195        actions
1196    }
1197
1198    /// Build the task prompt with dependency context injection (Section 14).
1199    ///
1200    /// Uses char-boundary-safe truncation (S1 fix) to avoid panics on multi-byte UTF-8.
1201    /// Dependency output is sanitized (SEC-ORCH-01) and titles are XML-escaped to prevent
1202    /// prompt injection via crafted task outputs.
1203    fn build_task_prompt(&self, task: &TaskNode) -> String {
1204        if task.depends_on.is_empty() {
1205            return task.description.clone();
1206        }
1207
1208        let completed_deps: Vec<&TaskNode> = task
1209            .depends_on
1210            .iter()
1211            .filter_map(|dep_id| {
1212                let dep = &self.graph.tasks[dep_id.index()];
1213                if dep.status == TaskStatus::Completed {
1214                    Some(dep)
1215                } else {
1216                    None
1217                }
1218            })
1219            .collect();
1220
1221        if completed_deps.is_empty() {
1222            return task.description.clone();
1223        }
1224
1225        let budget_per_dep = self
1226            .dependency_context_budget
1227            .checked_div(completed_deps.len())
1228            .unwrap_or(self.dependency_context_budget);
1229
1230        let mut context_block = String::from("<completed-dependencies>\n");
1231
1232        for dep in &completed_deps {
1233            // SEC-ORCH-01: XML-escape dep.id and dep.title to prevent breaking out of the
1234            // <completed-dependencies> wrapper via crafted titles.
1235            let escaped_id = xml_escape(&dep.id.to_string());
1236            let escaped_title = xml_escape(&dep.title);
1237            let _ = writeln!(
1238                context_block,
1239                "## Task \"{escaped_id}\": \"{escaped_title}\" (completed)",
1240            );
1241
1242            if let Some(ref result) = dep.result {
1243                // SEC-ORCH-01: sanitize dep output to prevent prompt injection from upstream tasks.
1244                let source = ContentSource::new(ContentSourceKind::A2aMessage);
1245                let sanitized = self.sanitizer.sanitize(&result.output, source);
1246                let safe_output = sanitized.body;
1247
1248                // Char-boundary-safe truncation (S1): use chars().take() instead of byte slicing.
1249                let char_count = safe_output.chars().count();
1250                if char_count > budget_per_dep {
1251                    let truncated: String = safe_output.chars().take(budget_per_dep).collect();
1252                    let _ = write!(
1253                        context_block,
1254                        "{truncated}...\n[truncated: {char_count} chars total]"
1255                    );
1256                } else {
1257                    context_block.push_str(&safe_output);
1258                }
1259            } else {
1260                context_block.push_str("[no output recorded]\n");
1261            }
1262            context_block.push('\n');
1263        }
1264
1265        // Add notes for skipped deps.
1266        for dep_id in &task.depends_on {
1267            let dep = &self.graph.tasks[dep_id.index()];
1268            if dep.status == TaskStatus::Skipped {
1269                let escaped_id = xml_escape(&dep.id.to_string());
1270                let escaped_title = xml_escape(&dep.title);
1271                let _ = writeln!(
1272                    context_block,
1273                    "## Task \"{escaped_id}\": \"{escaped_title}\" (skipped -- no output available)\n",
1274                );
1275            }
1276        }
1277
1278        context_block.push_str("</completed-dependencies>\n\n");
1279        format!("{context_block}Your task: {}", task.description)
1280    }
1281}
1282
/// Replace the five XML special characters (`<`, `>`, `&`, `"`, `'`) with entities.
///
/// All other characters are copied through unchanged. Used to keep text from
/// introducing or closing tags when it is embedded in an XML-like wrapper.
fn xml_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let entity = match ch {
            '<' => "&lt;",
            '>' => "&gt;",
            '&' => "&amp;",
            '"' => "&quot;",
            '\'' => "&#39;",
            _ => {
                escaped.push(ch);
                continue;
            }
        };
        escaped.push_str(entity);
    }
    escaped
}
1298
1299#[cfg(test)]
1300mod tests {
1301    #![allow(clippy::default_trait_access)]
1302
1303    use super::*;
1304    use crate::graph::{FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus};
1305
1306    fn make_node(id: u32, deps: &[u32]) -> TaskNode {
1307        let mut n = TaskNode::new(
1308            id,
1309            format!("task-{id}"),
1310            format!("description for task {id}"),
1311        );
1312        n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
1313        n
1314    }
1315
1316    fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
1317        let mut g = TaskGraph::new("test goal");
1318        g.tasks = nodes;
1319        g
1320    }
1321
1322    fn make_def(name: &str) -> SubAgentDef {
1323        use zeph_subagent::{SkillFilter, SubAgentPermissions, SubagentHooks, ToolPolicy};
1324        SubAgentDef {
1325            name: name.to_string(),
1326            description: format!("{name} agent"),
1327            model: None,
1328            tools: ToolPolicy::InheritAll,
1329            disallowed_tools: vec![],
1330            permissions: SubAgentPermissions::default(),
1331            skills: SkillFilter::default(),
1332            system_prompt: String::new(),
1333            hooks: SubagentHooks::default(),
1334            memory: None,
1335            source: None,
1336            file_path: None,
1337        }
1338    }
1339
1340    fn make_config() -> zeph_config::OrchestrationConfig {
1341        zeph_config::OrchestrationConfig {
1342            enabled: true,
1343            max_tasks: 20,
1344            max_parallel: 4,
1345            default_failure_strategy: "abort".to_string(),
1346            default_max_retries: 3,
1347            task_timeout_secs: 300,
1348            planner_provider: Default::default(),
1349            planner_max_tokens: 4096,
1350            dependency_context_budget: 16384,
1351            confirm_before_execute: true,
1352            aggregator_max_tokens: 4096,
1353            deferral_backoff_ms: 250,
1354            plan_cache: zeph_config::PlanCacheConfig::default(),
1355            topology_selection: false,
1356            verify_provider: Default::default(),
1357            verify_max_tokens: 1024,
1358            max_replans: 2,
1359            verify_completeness: false,
1360            completeness_threshold: 0.7,
1361            tool_provider: Default::default(),
1362            cascade_routing: false,
1363            cascade_failure_threshold: 0.5,
1364            tree_optimized_dispatch: false,
1365        }
1366    }
1367
1368    struct FirstRouter;
1369    impl AgentRouter for FirstRouter {
1370        fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
1371            available.first().map(|d| d.name.clone())
1372        }
1373    }
1374
1375    struct NoneRouter;
1376    impl AgentRouter for NoneRouter {
1377        fn route(&self, _task: &TaskNode, _available: &[SubAgentDef]) -> Option<String> {
1378            None
1379        }
1380    }
1381
1382    fn make_scheduler_with_router(graph: TaskGraph, router: Box<dyn AgentRouter>) -> DagScheduler {
1383        let config = make_config();
1384        let defs = vec![make_def("worker")];
1385        DagScheduler::new(graph, &config, router, defs).unwrap()
1386    }
1387
1388    fn make_scheduler(graph: TaskGraph) -> DagScheduler {
1389        let config = make_config();
1390        let defs = vec![make_def("worker")];
1391        DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap()
1392    }
1393
1394    // --- constructor tests ---
1395
1396    #[test]
1397    fn test_new_validates_graph_status() {
1398        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1399        graph.status = GraphStatus::Running; // wrong status
1400        let config = make_config();
1401        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
1402        assert!(result.is_err());
1403        let err = result.unwrap_err();
1404        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1405    }
1406
1407    #[test]
1408    fn test_new_marks_roots_ready() {
1409        let graph = graph_from_nodes(vec![
1410            make_node(0, &[]),
1411            make_node(1, &[]),
1412            make_node(2, &[0, 1]),
1413        ]);
1414        let scheduler = make_scheduler(graph);
1415        assert_eq!(scheduler.graph().tasks[0].status, TaskStatus::Ready);
1416        assert_eq!(scheduler.graph().tasks[1].status, TaskStatus::Ready);
1417        assert_eq!(scheduler.graph().tasks[2].status, TaskStatus::Pending);
1418        assert_eq!(scheduler.graph().status, GraphStatus::Running);
1419    }
1420
1421    #[test]
1422    fn test_new_validates_empty_graph() {
1423        let graph = graph_from_nodes(vec![]);
1424        let config = make_config();
1425        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
1426        assert!(result.is_err());
1427    }
1428
1429    // --- tick tests ---
1430
1431    #[test]
1432    fn test_tick_produces_spawn_for_ready() {
1433        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1434        let mut scheduler = make_scheduler(graph);
1435        let actions = scheduler.tick();
1436        let spawns: Vec<_> = actions
1437            .iter()
1438            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1439            .collect();
1440        assert_eq!(spawns.len(), 2);
1441    }
1442
1443    #[test]
1444    fn test_tick_dispatches_all_regardless_of_max_parallel() {
1445        // tick() enforces max_parallel as a pre-dispatch cap.
1446        // With 5 independent tasks and max_parallel=2, only 2 are dispatched per tick.
1447        let graph = graph_from_nodes(vec![
1448            make_node(0, &[]),
1449            make_node(1, &[]),
1450            make_node(2, &[]),
1451            make_node(3, &[]),
1452            make_node(4, &[]),
1453        ]);
1454        let mut config = make_config();
1455        config.max_parallel = 2;
1456        let defs = vec![make_def("worker")];
1457        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1458        let actions = scheduler.tick();
1459        let spawn_count = actions
1460            .iter()
1461            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1462            .count();
1463        assert_eq!(
1464            spawn_count, 2,
1465            "max_parallel=2 caps dispatched tasks per tick"
1466        );
1467    }
1468
1469    #[test]
1470    fn test_tick_detects_completion() {
1471        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1472        graph.tasks[0].status = TaskStatus::Completed;
1473        let config = make_config();
1474        let defs = vec![make_def("worker")];
1475        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1476        // Manually set graph to Running since new() validated Created status
1477        // — but all tasks are terminal. tick() should detect completion.
1478        let actions = scheduler.tick();
1479        let has_done = actions.iter().any(|a| {
1480            matches!(
1481                a,
1482                SchedulerAction::Done {
1483                    status: GraphStatus::Completed
1484                }
1485            )
1486        });
1487        assert!(
1488            has_done,
1489            "should emit Done(Completed) when all tasks are terminal"
1490        );
1491    }
1492
1493    // --- completion event tests ---
1494
1495    #[test]
1496    fn test_completion_event_marks_deps_ready() {
1497        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1498        let mut scheduler = make_scheduler(graph);
1499
1500        // Simulate task 0 running.
1501        scheduler.graph.tasks[0].status = TaskStatus::Running;
1502        scheduler.running.insert(
1503            TaskId(0),
1504            RunningTask {
1505                agent_handle_id: "handle-0".to_string(),
1506                agent_def_name: "worker".to_string(),
1507                started_at: Instant::now(),
1508            },
1509        );
1510
1511        let event = TaskEvent {
1512            task_id: TaskId(0),
1513            agent_handle_id: "handle-0".to_string(),
1514            outcome: TaskOutcome::Completed {
1515                output: "done".to_string(),
1516                artifacts: vec![],
1517            },
1518        };
1519        scheduler.buffered_events.push_back(event);
1520
1521        let actions = scheduler.tick();
1522        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Completed);
1523        // Task 1 should now be Ready or Spawn action emitted.
1524        let has_spawn_1 = actions
1525            .iter()
1526            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(1)));
1527        assert!(
1528            has_spawn_1 || scheduler.graph.tasks[1].status == TaskStatus::Ready,
1529            "task 1 should be spawned or marked Ready"
1530        );
1531    }
1532
1533    #[test]
1534    fn test_failure_abort_cancels_running() {
1535        let graph = graph_from_nodes(vec![
1536            make_node(0, &[]),
1537            make_node(1, &[]),
1538            make_node(2, &[0, 1]),
1539        ]);
1540        let mut scheduler = make_scheduler(graph);
1541
1542        // Simulate tasks 0 and 1 running.
1543        scheduler.graph.tasks[0].status = TaskStatus::Running;
1544        scheduler.running.insert(
1545            TaskId(0),
1546            RunningTask {
1547                agent_handle_id: "h0".to_string(),
1548                agent_def_name: "worker".to_string(),
1549                started_at: Instant::now(),
1550            },
1551        );
1552        scheduler.graph.tasks[1].status = TaskStatus::Running;
1553        scheduler.running.insert(
1554            TaskId(1),
1555            RunningTask {
1556                agent_handle_id: "h1".to_string(),
1557                agent_def_name: "worker".to_string(),
1558                started_at: Instant::now(),
1559            },
1560        );
1561
1562        // Task 0 fails with default Abort strategy.
1563        let event = TaskEvent {
1564            task_id: TaskId(0),
1565            agent_handle_id: "h0".to_string(),
1566            outcome: TaskOutcome::Failed {
1567                error: "boom".to_string(),
1568            },
1569        };
1570        scheduler.buffered_events.push_back(event);
1571
1572        let actions = scheduler.tick();
1573        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1574        let cancel_ids: Vec<_> = actions
1575            .iter()
1576            .filter_map(|a| {
1577                if let SchedulerAction::Cancel { agent_handle_id } = a {
1578                    Some(agent_handle_id.as_str())
1579                } else {
1580                    None
1581                }
1582            })
1583            .collect();
1584        assert!(cancel_ids.contains(&"h1"), "task 1 should be canceled");
1585        assert!(
1586            actions
1587                .iter()
1588                .any(|a| matches!(a, SchedulerAction::Done { .. }))
1589        );
1590    }
1591
1592    #[test]
1593    fn test_failure_skip_propagates() {
1594        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1595        let mut scheduler = make_scheduler(graph);
1596
1597        // Set failure strategy to Skip on task 0.
1598        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
1599        scheduler.graph.tasks[0].status = TaskStatus::Running;
1600        scheduler.running.insert(
1601            TaskId(0),
1602            RunningTask {
1603                agent_handle_id: "h0".to_string(),
1604                agent_def_name: "worker".to_string(),
1605                started_at: Instant::now(),
1606            },
1607        );
1608
1609        let event = TaskEvent {
1610            task_id: TaskId(0),
1611            agent_handle_id: "h0".to_string(),
1612            outcome: TaskOutcome::Failed {
1613                error: "skip me".to_string(),
1614            },
1615        };
1616        scheduler.buffered_events.push_back(event);
1617        scheduler.tick();
1618
1619        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Skipped);
1620        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Skipped);
1621    }
1622
1623    #[test]
1624    fn test_failure_retry_reschedules() {
1625        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1626        let mut scheduler = make_scheduler(graph);
1627
1628        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
1629        scheduler.graph.tasks[0].max_retries = Some(3);
1630        scheduler.graph.tasks[0].retry_count = 0;
1631        scheduler.graph.tasks[0].status = TaskStatus::Running;
1632        scheduler.running.insert(
1633            TaskId(0),
1634            RunningTask {
1635                agent_handle_id: "h0".to_string(),
1636                agent_def_name: "worker".to_string(),
1637                started_at: Instant::now(),
1638            },
1639        );
1640
1641        let event = TaskEvent {
1642            task_id: TaskId(0),
1643            agent_handle_id: "h0".to_string(),
1644            outcome: TaskOutcome::Failed {
1645                error: "transient".to_string(),
1646            },
1647        };
1648        scheduler.buffered_events.push_back(event);
1649        let actions = scheduler.tick();
1650
1651        // Task should be rescheduled (Ready) and a Spawn action emitted.
1652        let has_spawn = actions
1653            .iter()
1654            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
1655        assert!(
1656            has_spawn || scheduler.graph.tasks[0].status == TaskStatus::Ready,
1657            "retry should produce spawn or Ready status"
1658        );
1659        // retry_count incremented
1660        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
1661    }
1662
1663    #[test]
1664    fn test_process_event_failed_retry() {
1665        // End-to-end: send Failed event, verify retry path produces Ready -> Spawn.
1666        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1667        let mut scheduler = make_scheduler(graph);
1668
1669        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
1670        scheduler.graph.tasks[0].max_retries = Some(2);
1671        scheduler.graph.tasks[0].retry_count = 0;
1672        scheduler.graph.tasks[0].status = TaskStatus::Running;
1673        scheduler.running.insert(
1674            TaskId(0),
1675            RunningTask {
1676                agent_handle_id: "h0".to_string(),
1677                agent_def_name: "worker".to_string(),
1678                started_at: Instant::now(),
1679            },
1680        );
1681
1682        let event = TaskEvent {
1683            task_id: TaskId(0),
1684            agent_handle_id: "h0".to_string(),
1685            outcome: TaskOutcome::Failed {
1686                error: "first failure".to_string(),
1687            },
1688        };
1689        scheduler.buffered_events.push_back(event);
1690        let actions = scheduler.tick();
1691
1692        // After retry: retry_count = 1, status = Ready or Spawn emitted.
1693        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
1694        let spawned = actions
1695            .iter()
1696            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
1697        assert!(
1698            spawned || scheduler.graph.tasks[0].status == TaskStatus::Ready,
1699            "retry should emit Spawn or set Ready"
1700        );
1701        // Graph must still be Running.
1702        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1703    }
1704
1705    #[test]
1706    fn test_timeout_cancels_stalled() {
1707        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1708        let mut config = make_config();
1709        config.task_timeout_secs = 1; // 1 second timeout
1710        let defs = vec![make_def("worker")];
1711        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1712
1713        // Simulate a running task that started just over 1 second ago.
1714        scheduler.graph.tasks[0].status = TaskStatus::Running;
1715        scheduler.running.insert(
1716            TaskId(0),
1717            RunningTask {
1718                agent_handle_id: "h0".to_string(),
1719                agent_def_name: "worker".to_string(),
1720                started_at: Instant::now().checked_sub(Duration::from_secs(2)).unwrap(), // already timed out
1721            },
1722        );
1723
1724        let actions = scheduler.tick();
1725        let has_cancel = actions.iter().any(
1726            |a| matches!(a, SchedulerAction::Cancel { agent_handle_id } if agent_handle_id == "h0"),
1727        );
1728        assert!(has_cancel, "timed-out task should emit Cancel action");
1729        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1730    }
1731
1732    #[test]
1733    fn test_cancel_all() {
1734        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1735        let mut scheduler = make_scheduler(graph);
1736
1737        scheduler.graph.tasks[0].status = TaskStatus::Running;
1738        scheduler.running.insert(
1739            TaskId(0),
1740            RunningTask {
1741                agent_handle_id: "h0".to_string(),
1742                agent_def_name: "worker".to_string(),
1743                started_at: Instant::now(),
1744            },
1745        );
1746        scheduler.graph.tasks[1].status = TaskStatus::Running;
1747        scheduler.running.insert(
1748            TaskId(1),
1749            RunningTask {
1750                agent_handle_id: "h1".to_string(),
1751                agent_def_name: "worker".to_string(),
1752                started_at: Instant::now(),
1753            },
1754        );
1755
1756        let actions = scheduler.cancel_all();
1757
1758        assert_eq!(scheduler.graph.status, GraphStatus::Canceled);
1759        assert!(scheduler.running.is_empty());
1760        let cancel_count = actions
1761            .iter()
1762            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1763            .count();
1764        assert_eq!(cancel_count, 2);
1765        assert!(actions.iter().any(|a| matches!(
1766            a,
1767            SchedulerAction::Done {
1768                status: GraphStatus::Canceled
1769            }
1770        )));
1771    }
1772
1773    #[test]
1774    fn test_record_spawn_failure() {
1775        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1776        let mut scheduler = make_scheduler(graph);
1777
1778        // Simulate task marked Running (by tick) but spawn failed.
1779        scheduler.graph.tasks[0].status = TaskStatus::Running;
1780
1781        let error = SubAgentError::Spawn("spawn error".to_string());
1782        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1783        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1784        // With Abort strategy and no other running tasks, graph should be Failed.
1785        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1786        assert!(
1787            actions
1788                .iter()
1789                .any(|a| matches!(a, SchedulerAction::Done { .. }))
1790        );
1791    }
1792
1793    #[test]
1794    fn test_record_spawn_failure_concurrency_limit_reverts_to_ready() {
1795        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1796        let mut scheduler = make_scheduler(graph);
1797
1798        // Simulate tick() optimistically marking the task Running before spawn.
1799        scheduler.graph.tasks[0].status = TaskStatus::Running;
1800
1801        // Concurrency limit hit — transient, should not fail the task.
1802        let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
1803        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1804        assert_eq!(
1805            scheduler.graph.tasks[0].status,
1806            TaskStatus::Ready,
1807            "task must revert to Ready so the next tick can retry"
1808        );
1809        assert_eq!(
1810            scheduler.graph.status,
1811            GraphStatus::Running,
1812            "graph must stay Running, not transition to Failed"
1813        );
1814        assert!(
1815            actions.is_empty(),
1816            "no cancel or done actions expected for a transient deferral"
1817        );
1818    }
1819
1820    #[test]
1821    fn test_record_spawn_failure_concurrency_limit_variant_spawn_for_task() {
1822        // Both spawn() and resume() now return SubAgentError::ConcurrencyLimit — verify handling.
1823        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1824        let mut scheduler = make_scheduler(graph);
1825        scheduler.graph.tasks[0].status = TaskStatus::Running;
1826
1827        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1828        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1829        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1830        assert!(actions.is_empty());
1831    }
1832
1833    // --- #1516 edge-case tests ---
1834
1835    #[test]
1836    fn test_concurrency_deferral_does_not_affect_running_task() {
1837        // Two root tasks. Task 0 is Running (successfully spawned).
1838        // Task 1 hits a concurrency limit and reverts to Ready.
1839        // Task 0 must be unaffected.
1840        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1841        let mut scheduler = make_scheduler(graph);
1842
1843        // Simulate both tasks optimistically marked Running by tick().
1844        scheduler.graph.tasks[0].status = TaskStatus::Running;
1845        scheduler.running.insert(
1846            TaskId(0),
1847            RunningTask {
1848                agent_handle_id: "h0".to_string(),
1849                agent_def_name: "worker".to_string(),
1850                started_at: Instant::now(),
1851            },
1852        );
1853        scheduler.graph.tasks[1].status = TaskStatus::Running;
1854
1855        // Task 1 spawn fails with concurrency limit.
1856        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1857        let actions = scheduler.record_spawn_failure(TaskId(1), &error);
1858
1859        assert_eq!(
1860            scheduler.graph.tasks[0].status,
1861            TaskStatus::Running,
1862            "task 0 must remain Running"
1863        );
1864        assert_eq!(
1865            scheduler.graph.tasks[1].status,
1866            TaskStatus::Ready,
1867            "task 1 must revert to Ready"
1868        );
1869        assert_eq!(
1870            scheduler.graph.status,
1871            GraphStatus::Running,
1872            "graph must stay Running"
1873        );
1874        assert!(actions.is_empty(), "no cancel or done actions expected");
1875    }
1876
1877    #[test]
1878    fn test_max_concurrent_zero_no_infinite_loop() {
1879        // max_parallel=0 is a degenerate config. tick() uses saturating_sub so slots=0,
1880        // and no tasks are dispatched. The graph does not deadlock because ready tasks
1881        // still exist — the caller must increase max_parallel or handle this externally.
1882        // After max_parallel is increased and a new tick fires, tasks will be dispatched.
1883        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1884        let config = zeph_config::OrchestrationConfig {
1885            max_parallel: 0,
1886            ..make_config()
1887        };
1888        let mut scheduler = DagScheduler::new(
1889            graph,
1890            &config,
1891            Box::new(FirstRouter),
1892            vec![make_def("worker")],
1893        )
1894        .unwrap();
1895
1896        let actions1 = scheduler.tick();
1897        // No Spawn: slots = max_parallel(0) - running(0) = 0.
1898        assert!(
1899            actions1
1900                .iter()
1901                .all(|a| !matches!(a, SchedulerAction::Spawn { .. })),
1902            "no Spawn expected when max_parallel=0"
1903        );
1904        assert!(
1905            actions1
1906                .iter()
1907                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
1908            "no Done(Failed) expected — ready tasks exist, so no deadlock"
1909        );
1910        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1911
1912        // Second tick also dispatches nothing (still max_parallel=0, ready task exists).
1913        let actions2 = scheduler.tick();
1914        assert!(
1915            actions2
1916                .iter()
1917                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
1918            "second tick must not emit Done(Failed) — ready tasks still exist"
1919        );
1920        assert_eq!(
1921            scheduler.graph.status,
1922            GraphStatus::Running,
1923            "graph must remain Running"
1924        );
1925    }
1926
1927    #[test]
1928    fn test_all_tasks_deferred_graph_stays_running() {
1929        // Both root tasks are spawned optimistically, both fail with ConcurrencyLimit,
1930        // and both revert to Ready. The graph must remain Running (not Failed) and
1931        // the next tick must re-emit Spawn actions for the deferred tasks.
1932        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1933        let mut scheduler = make_scheduler(graph);
1934
1935        // First tick emits Spawn for both tasks and marks them Running.
1936        let actions = scheduler.tick();
1937        assert_eq!(
1938            actions
1939                .iter()
1940                .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1941                .count(),
1942            2,
1943            "expected 2 Spawn actions on first tick"
1944        );
1945        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
1946        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Running);
1947
1948        // Both spawns fail — both revert to Ready.
1949        let error = SubAgentError::ConcurrencyLimit { active: 2, max: 2 };
1950        let r0 = scheduler.record_spawn_failure(TaskId(0), &error);
1951        let r1 = scheduler.record_spawn_failure(TaskId(1), &error);
1952        assert!(r0.is_empty() && r1.is_empty(), "no cancel/done on deferral");
1953        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1954        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Ready);
1955        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1956
1957        // Second tick must retry both deferred tasks.
1958        let retry_actions = scheduler.tick();
1959        let spawn_count = retry_actions
1960            .iter()
1961            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1962            .count();
1963        assert!(
1964            spawn_count > 0,
1965            "second tick must re-emit Spawn for deferred tasks"
1966        );
1967        assert!(
1968            retry_actions.iter().all(|a| !matches!(
1969                a,
1970                SchedulerAction::Done {
1971                    status: GraphStatus::Failed,
1972                    ..
1973                }
1974            )),
1975            "no Done(Failed) expected"
1976        );
1977    }
1978
1979    #[test]
1980    fn test_build_prompt_no_deps() {
1981        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1982        let scheduler = make_scheduler(graph);
1983        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[0]);
1984        assert_eq!(prompt, "description for task 0");
1985    }
1986
1987    #[test]
1988    fn test_build_prompt_with_deps_and_truncation() {
1989        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1990        graph.tasks[0].status = TaskStatus::Completed;
1991        // Create output longer than budget
1992        graph.tasks[0].result = Some(TaskResult {
1993            output: "x".repeat(200),
1994            artifacts: vec![],
1995            duration_ms: 10,
1996            agent_id: None,
1997            agent_def: None,
1998        });
1999
2000        let config = zeph_config::OrchestrationConfig {
2001            dependency_context_budget: 50,
2002            ..make_config()
2003        };
2004        let scheduler = DagScheduler::new(
2005            graph,
2006            &config,
2007            Box::new(FirstRouter),
2008            vec![make_def("worker")],
2009        )
2010        .unwrap();
2011
2012        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
2013        assert!(prompt.contains("<completed-dependencies>"));
2014        assert!(prompt.contains("[truncated:"));
2015        assert!(prompt.contains("Your task:"));
2016    }
2017
2018    #[test]
2019    fn test_duration_ms_computed_correctly() {
2020        // Regression test for C1: duration_ms must be non-zero after completion.
2021        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2022        let mut scheduler = make_scheduler(graph);
2023
2024        scheduler.graph.tasks[0].status = TaskStatus::Running;
2025        scheduler.running.insert(
2026            TaskId(0),
2027            RunningTask {
2028                agent_handle_id: "h0".to_string(),
2029                agent_def_name: "worker".to_string(),
2030                started_at: Instant::now()
2031                    .checked_sub(Duration::from_millis(50))
2032                    .unwrap(),
2033            },
2034        );
2035
2036        let event = TaskEvent {
2037            task_id: TaskId(0),
2038            agent_handle_id: "h0".to_string(),
2039            outcome: TaskOutcome::Completed {
2040                output: "result".to_string(),
2041                artifacts: vec![],
2042            },
2043        };
2044        scheduler.buffered_events.push_back(event);
2045        scheduler.tick();
2046
2047        let result = scheduler.graph.tasks[0].result.as_ref().unwrap();
2048        assert!(
2049            result.duration_ms > 0,
2050            "duration_ms should be > 0, got {}",
2051            result.duration_ms
2052        );
2053    }
2054
2055    #[test]
2056    fn test_utf8_safe_truncation() {
2057        // S1 regression: truncation must not panic on multi-byte UTF-8.
2058        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2059        graph.tasks[0].status = TaskStatus::Completed;
2060        // Unicode: each char is 3 bytes in UTF-8.
2061        let unicode_output = "日本語テスト".repeat(100);
2062        graph.tasks[0].result = Some(TaskResult {
2063            output: unicode_output,
2064            artifacts: vec![],
2065            duration_ms: 10,
2066            agent_id: None,
2067            agent_def: None,
2068        });
2069
2070        // Budget large enough to hold the spotlighting wrapper + some Japanese chars.
2071        // The sanitizer adds ~200 chars of spotlight header, so 500 chars is sufficient.
2072        let config = zeph_config::OrchestrationConfig {
2073            dependency_context_budget: 500,
2074            ..make_config()
2075        };
2076        let scheduler = DagScheduler::new(
2077            graph,
2078            &config,
2079            Box::new(FirstRouter),
2080            vec![make_def("worker")],
2081        )
2082        .unwrap();
2083
2084        // Must not panic, and Japanese chars must be preserved in the output.
2085        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
2086        assert!(
2087            prompt.contains("日"),
2088            "Japanese characters should be in the prompt after safe truncation"
2089        );
2090    }
2091
2092    #[test]
2093    fn test_no_agent_routes_inline() {
2094        // NoneRouter: when no agent matches, task falls back to RunInline.
2095        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2096        let mut scheduler = make_scheduler_with_router(graph, Box::new(NoneRouter));
2097        let actions = scheduler.tick();
2098        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
2099        assert!(
2100            actions
2101                .iter()
2102                .any(|a| matches!(a, SchedulerAction::RunInline { .. }))
2103        );
2104    }
2105
2106    #[test]
2107    fn test_stale_event_rejected() {
2108        // Regression: events from a previous agent incarnation must be discarded.
2109        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2110        let mut scheduler = make_scheduler(graph);
2111
2112        // Simulate task running with handle "current-handle".
2113        scheduler.graph.tasks[0].status = TaskStatus::Running;
2114        scheduler.running.insert(
2115            TaskId(0),
2116            RunningTask {
2117                agent_handle_id: "current-handle".to_string(),
2118                agent_def_name: "worker".to_string(),
2119                started_at: Instant::now(),
2120            },
2121        );
2122
2123        // Send a completion event from the OLD agent (stale handle).
2124        let stale_event = TaskEvent {
2125            task_id: TaskId(0),
2126            agent_handle_id: "old-handle".to_string(),
2127            outcome: TaskOutcome::Completed {
2128                output: "stale output".to_string(),
2129                artifacts: vec![],
2130            },
2131        };
2132        scheduler.buffered_events.push_back(stale_event);
2133        let actions = scheduler.tick();
2134
2135        // Stale event must be discarded — task must NOT be completed.
2136        assert_ne!(
2137            scheduler.graph.tasks[0].status,
2138            TaskStatus::Completed,
2139            "stale event must not complete the task"
2140        );
2141        // No Spawn or Done actions should result from a discarded stale event.
2142        let has_done = actions
2143            .iter()
2144            .any(|a| matches!(a, SchedulerAction::Done { .. }));
2145        assert!(
2146            !has_done,
2147            "no Done action should be emitted for a stale event"
2148        );
2149        // Task must still be in the running map.
2150        assert!(
2151            scheduler.running.contains_key(&TaskId(0)),
2152            "running task must remain after stale event"
2153        );
2154    }
2155
2156    #[test]
2157    fn test_build_prompt_chars_count_in_truncation_message() {
2158        // Fix #3: truncation message must report char count, not byte count.
2159        // Use pure ASCII so sanitization doesn't significantly change char count.
2160        // Budget < output length => truncation triggered; verify the count label is "chars total".
2161        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2162        graph.tasks[0].status = TaskStatus::Completed;
2163        // ASCII output: byte count == char count, so both old and new code produce the same number,
2164        // but the label "chars total" (not "bytes total") is what matters here.
2165        let output = "x".repeat(200);
2166        graph.tasks[0].result = Some(TaskResult {
2167            output,
2168            artifacts: vec![],
2169            duration_ms: 10,
2170            agent_id: None,
2171            agent_def: None,
2172        });
2173
2174        let config = zeph_config::OrchestrationConfig {
2175            dependency_context_budget: 10, // truncate: sanitized output >> 10 chars
2176            ..make_config()
2177        };
2178        let scheduler = DagScheduler::new(
2179            graph,
2180            &config,
2181            Box::new(FirstRouter),
2182            vec![make_def("worker")],
2183        )
2184        .unwrap();
2185
2186        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
2187        // Truncation must have been triggered and the message must use "chars total" label.
2188        assert!(
2189            prompt.contains("chars total"),
2190            "truncation message must use 'chars total' label. Prompt: {prompt}"
2191        );
2192        assert!(
2193            prompt.contains("[truncated:"),
2194            "prompt must contain truncation notice. Prompt: {prompt}"
2195        );
2196    }
2197
2198    // --- resume_from tests (MT-1) ---
2199
2200    #[test]
2201    fn test_resume_from_accepts_paused_graph() {
2202        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2203        graph.status = GraphStatus::Paused;
2204        graph.tasks[0].status = TaskStatus::Pending;
2205
2206        let scheduler =
2207            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2208                .expect("resume_from should accept Paused graph");
2209        assert_eq!(scheduler.graph.status, GraphStatus::Running);
2210    }
2211
2212    #[test]
2213    fn test_resume_from_accepts_failed_graph() {
2214        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2215        graph.status = GraphStatus::Failed;
2216        graph.tasks[0].status = TaskStatus::Failed;
2217
2218        let scheduler =
2219            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2220                .expect("resume_from should accept Failed graph");
2221        assert_eq!(scheduler.graph.status, GraphStatus::Running);
2222    }
2223
2224    #[test]
2225    fn test_resume_from_rejects_completed_graph() {
2226        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2227        graph.status = GraphStatus::Completed;
2228
2229        let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2230            .unwrap_err();
2231        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
2232    }
2233
2234    #[test]
2235    fn test_resume_from_rejects_canceled_graph() {
2236        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2237        graph.status = GraphStatus::Canceled;
2238
2239        let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2240            .unwrap_err();
2241        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
2242    }
2243
2244    #[test]
2245    fn test_resume_from_reconstructs_running_tasks() {
2246        // IC1: tasks that were Running at pause time must appear in the running map.
2247        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2248        graph.status = GraphStatus::Paused;
2249        graph.tasks[0].status = TaskStatus::Running;
2250        graph.tasks[0].assigned_agent = Some("handle-abc".to_string());
2251        graph.tasks[0].agent_hint = Some("worker".to_string());
2252        graph.tasks[1].status = TaskStatus::Pending;
2253
2254        let scheduler =
2255            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2256                .expect("should succeed");
2257
2258        assert!(
2259            scheduler.running.contains_key(&TaskId(0)),
2260            "Running task must be reconstructed in the running map (IC1)"
2261        );
2262        assert_eq!(scheduler.running[&TaskId(0)].agent_handle_id, "handle-abc");
2263        assert!(
2264            !scheduler.running.contains_key(&TaskId(1)),
2265            "Pending task must not appear in running map"
2266        );
2267    }
2268
2269    #[test]
2270    fn test_resume_from_sets_status_running() {
2271        // II3: resume_from must set graph.status = Running regardless of input status.
2272        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2273        graph.status = GraphStatus::Paused;
2274
2275        let scheduler =
2276            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2277                .unwrap();
2278        assert_eq!(scheduler.graph.status, GraphStatus::Running);
2279    }
2280
2281    // --- #1619 regression tests: consecutive_spawn_failures + exponential backoff ---
2282
2283    #[test]
2284    fn test_consecutive_spawn_failures_increments_on_concurrency_limit() {
2285        // Each tick where all spawns hit ConcurrencyLimit must increment the counter
2286        // via record_batch_backoff(false, true).
2287        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2288        let mut scheduler = make_scheduler(graph);
2289        scheduler.graph.tasks[0].status = TaskStatus::Running;
2290
2291        assert_eq!(scheduler.consecutive_spawn_failures, 0, "starts at zero");
2292
2293        let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
2294        scheduler.record_spawn_failure(TaskId(0), &error);
2295        // record_spawn_failure no longer increments; batch_backoff does.
2296        scheduler.record_batch_backoff(false, true);
2297        assert_eq!(
2298            scheduler.consecutive_spawn_failures, 1,
2299            "first deferral tick: consecutive_spawn_failures must be 1"
2300        );
2301
2302        scheduler.graph.tasks[0].status = TaskStatus::Running;
2303        scheduler.record_spawn_failure(TaskId(0), &error);
2304        scheduler.record_batch_backoff(false, true);
2305        assert_eq!(
2306            scheduler.consecutive_spawn_failures, 2,
2307            "second deferral tick: consecutive_spawn_failures must be 2"
2308        );
2309
2310        scheduler.graph.tasks[0].status = TaskStatus::Running;
2311        scheduler.record_spawn_failure(TaskId(0), &error);
2312        scheduler.record_batch_backoff(false, true);
2313        assert_eq!(
2314            scheduler.consecutive_spawn_failures, 3,
2315            "third deferral tick: consecutive_spawn_failures must be 3"
2316        );
2317    }
2318
2319    #[test]
2320    fn test_consecutive_spawn_failures_resets_on_success() {
2321        // record_spawn() after deferrals must reset consecutive_spawn_failures to 0
2322        // (via record_spawn internal reset; record_batch_backoff(true, _) also resets).
2323        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2324        let mut scheduler = make_scheduler(graph);
2325        scheduler.graph.tasks[0].status = TaskStatus::Running;
2326
2327        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2328        scheduler.record_spawn_failure(TaskId(0), &error);
2329        scheduler.record_batch_backoff(false, true);
2330        scheduler.graph.tasks[0].status = TaskStatus::Running;
2331        scheduler.record_spawn_failure(TaskId(0), &error);
2332        scheduler.record_batch_backoff(false, true);
2333        assert_eq!(scheduler.consecutive_spawn_failures, 2);
2334
2335        // Successful spawn resets the counter directly in record_spawn.
2336        scheduler.record_spawn(TaskId(0), "handle-0".to_string(), "worker".to_string());
2337        assert_eq!(
2338            scheduler.consecutive_spawn_failures, 0,
2339            "record_spawn must reset consecutive_spawn_failures to 0"
2340        );
2341    }
2342
2343    #[tokio::test]
2344    async fn test_exponential_backoff_duration() {
2345        // With consecutive_spawn_failures=0, backoff equals the base interval.
2346        // With consecutive_spawn_failures=3, backoff = min(base * 8, 5000ms).
2347        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2348        let config = zeph_config::OrchestrationConfig {
2349            deferral_backoff_ms: 50,
2350            ..make_config()
2351        };
2352        let mut scheduler = DagScheduler::new(
2353            graph,
2354            &config,
2355            Box::new(FirstRouter),
2356            vec![make_def("worker")],
2357        )
2358        .unwrap();
2359
2360        // consecutive_spawn_failures=0 → sleep ≈ 50ms (base).
2361        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2362        let start = tokio::time::Instant::now();
2363        scheduler.wait_event().await;
2364        let elapsed0 = start.elapsed();
2365        assert!(
2366            elapsed0.as_millis() >= 50,
2367            "backoff with 0 deferrals must be >= base (50ms), got {}ms",
2368            elapsed0.as_millis()
2369        );
2370
2371        // Simulate 3 consecutive deferrals: multiplier = 2^3 = 8 → 400ms, capped at 5000ms.
2372        scheduler.consecutive_spawn_failures = 3;
2373        let start = tokio::time::Instant::now();
2374        scheduler.wait_event().await;
2375        let elapsed3 = start.elapsed();
2376        assert!(
2377            elapsed3.as_millis() >= 400,
2378            "backoff with 3 deferrals must be >= 400ms (50 * 8), got {}ms",
2379            elapsed3.as_millis()
2380        );
2381
2382        // Simulate 20 consecutive deferrals: exponent capped at 10 → 50 * 1024 = 51200 → capped at 5000ms.
2383        scheduler.consecutive_spawn_failures = 20;
2384        let start = tokio::time::Instant::now();
2385        scheduler.wait_event().await;
2386        let elapsed_capped = start.elapsed();
2387        assert!(
2388            elapsed_capped.as_millis() >= 5000,
2389            "backoff must be capped at 5000ms with high deferrals, got {}ms",
2390            elapsed_capped.as_millis()
2391        );
2392    }
2393
2394    // --- deferral_backoff regression test ---
2395
2396    #[tokio::test]
2397    async fn test_wait_event_sleeps_deferral_backoff_when_running_empty() {
2398        // Regression for issue #1519: wait_event must sleep deferral_backoff when
2399        // running is empty, preventing a busy spin-loop.
2400        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2401        let config = zeph_config::OrchestrationConfig {
2402            deferral_backoff_ms: 50,
2403            ..make_config()
2404        };
2405        let mut scheduler = DagScheduler::new(
2406            graph,
2407            &config,
2408            Box::new(FirstRouter),
2409            vec![make_def("worker")],
2410        )
2411        .unwrap();
2412
2413        // Do not start any tasks — running map stays empty.
2414        assert!(scheduler.running.is_empty());
2415
2416        let start = tokio::time::Instant::now();
2417        scheduler.wait_event().await;
2418        let elapsed = start.elapsed();
2419
2420        assert!(
2421            elapsed.as_millis() >= 50,
2422            "wait_event must sleep at least deferral_backoff (50ms) when running is empty, but only slept {}ms",
2423            elapsed.as_millis()
2424        );
2425    }
2426
2427    #[test]
2428    fn test_current_deferral_backoff_exponential_growth() {
2429        // Regression for issue #1618: backoff must grow exponentially with consecutive
2430        // spawn failures so the scheduler does not busy-spin at 250ms when saturated.
2431        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2432        let config = zeph_config::OrchestrationConfig {
2433            deferral_backoff_ms: 250,
2434            ..make_config()
2435        };
2436        let mut scheduler = DagScheduler::new(
2437            graph,
2438            &config,
2439            Box::new(FirstRouter),
2440            vec![make_def("worker")],
2441        )
2442        .unwrap();
2443
2444        assert_eq!(
2445            scheduler.current_deferral_backoff(),
2446            Duration::from_millis(250)
2447        );
2448
2449        scheduler.consecutive_spawn_failures = 1;
2450        assert_eq!(
2451            scheduler.current_deferral_backoff(),
2452            Duration::from_millis(500)
2453        );
2454
2455        scheduler.consecutive_spawn_failures = 2;
2456        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(1));
2457
2458        scheduler.consecutive_spawn_failures = 3;
2459        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(2));
2460
2461        scheduler.consecutive_spawn_failures = 4;
2462        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(4));
2463
2464        // Cap at 5 seconds.
2465        scheduler.consecutive_spawn_failures = 5;
2466        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2467
2468        scheduler.consecutive_spawn_failures = 100;
2469        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2470    }
2471
2472    #[test]
2473    fn test_record_spawn_resets_consecutive_failures() {
2474        // Regression for issue #1618: a successful spawn resets the backoff counter.
2475        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2476        let mut scheduler = DagScheduler::new(
2477            graph,
2478            &make_config(),
2479            Box::new(FirstRouter),
2480            vec![make_def("worker")],
2481        )
2482        .unwrap();
2483
2484        scheduler.consecutive_spawn_failures = 3;
2485        let task_id = TaskId(0);
2486        scheduler.graph.tasks[0].status = TaskStatus::Running;
2487        scheduler.record_spawn(task_id, "handle-1".into(), "worker".into());
2488
2489        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2490    }
2491
2492    #[test]
2493    fn test_record_spawn_failure_reverts_to_ready_no_counter_change() {
2494        // record_spawn_failure(ConcurrencyLimit) reverts task to Ready but does NOT
2495        // change consecutive_spawn_failures — that is the job of record_batch_backoff.
2496        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2497        let mut scheduler = DagScheduler::new(
2498            graph,
2499            &make_config(),
2500            Box::new(FirstRouter),
2501            vec![make_def("worker")],
2502        )
2503        .unwrap();
2504
2505        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2506        let task_id = TaskId(0);
2507        scheduler.graph.tasks[0].status = TaskStatus::Running;
2508
2509        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2510        scheduler.record_spawn_failure(task_id, &error);
2511
2512        // Counter unchanged — batch_backoff is responsible for incrementing.
2513        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2514        // Task reverted to Ready.
2515        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
2516    }
2517
2518    // --- #1628 parallel dispatch tests ---
2519
2520    #[test]
2521    fn test_parallel_dispatch_all_ready() {
2522        // tick() enforces max_parallel as a pre-dispatch cap. With 6 independent tasks
2523        // and max_parallel=2, only 2 tasks are dispatched per tick.
2524        let nodes: Vec<_> = (0..6).map(|i| make_node(i, &[])).collect();
2525        let graph = graph_from_nodes(nodes);
2526        let config = zeph_config::OrchestrationConfig {
2527            max_parallel: 2,
2528            ..make_config()
2529        };
2530        let mut scheduler = DagScheduler::new(
2531            graph,
2532            &config,
2533            Box::new(FirstRouter),
2534            vec![make_def("worker")],
2535        )
2536        .unwrap();
2537
2538        let actions = scheduler.tick();
2539        let spawn_count = actions
2540            .iter()
2541            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2542            .count();
2543        assert_eq!(
2544            spawn_count, 2,
2545            "only max_parallel=2 tasks dispatched per tick"
2546        );
2547
2548        let running_count = scheduler
2549            .graph
2550            .tasks
2551            .iter()
2552            .filter(|t| t.status == TaskStatus::Running)
2553            .count();
2554        assert_eq!(running_count, 2, "only 2 tasks marked Running");
2555    }
2556
2557    #[test]
2558    fn test_batch_backoff_partial_success() {
2559        // Some spawns succeed, some hit ConcurrencyLimit: counter resets to 0.
2560        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2561        let mut scheduler = make_scheduler(graph);
2562        scheduler.consecutive_spawn_failures = 3;
2563
2564        scheduler.record_batch_backoff(true, true);
2565        assert_eq!(
2566            scheduler.consecutive_spawn_failures, 0,
2567            "any success in batch must reset counter"
2568        );
2569    }
2570
2571    #[test]
2572    fn test_batch_backoff_all_failed() {
2573        // All spawns hit ConcurrencyLimit: counter increments by 1.
2574        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2575        let mut scheduler = make_scheduler(graph);
2576        scheduler.consecutive_spawn_failures = 2;
2577
2578        scheduler.record_batch_backoff(false, true);
2579        assert_eq!(
2580            scheduler.consecutive_spawn_failures, 3,
2581            "all-failure tick must increment counter"
2582        );
2583    }
2584
2585    #[test]
2586    fn test_batch_backoff_no_spawns() {
2587        // No spawn actions in tick: counter unchanged.
2588        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2589        let mut scheduler = make_scheduler(graph);
2590        scheduler.consecutive_spawn_failures = 5;
2591
2592        scheduler.record_batch_backoff(false, false);
2593        assert_eq!(
2594            scheduler.consecutive_spawn_failures, 5,
2595            "no spawns must not change counter"
2596        );
2597    }
2598
2599    #[test]
2600    fn test_buffer_guard_uses_task_count() {
2601        // Structural guard: verifies that the buffer capacity expression uses
2602        // graph.tasks.len() * 2 rather than max_parallel * 2. This is an intentional
2603        // regression-prevention test — if wait_event() is accidentally reverted to
2604        // max_parallel * 2 the assertion below catches the discrepancy.
2605        // Behavioral coverage (actual buffer drop prevention) requires an async harness
2606        // with a real channel, which is outside the scope of this unit test.
2607        //
2608        // Scenario: 10 tasks, max_parallel=2 → tasks.len()*2=20, max_parallel*2=4.
2609        // The guard must use 20, not 4.
2610        let nodes: Vec<_> = (0..10).map(|i| make_node(i, &[])).collect();
2611        let graph = graph_from_nodes(nodes);
2612        let config = zeph_config::OrchestrationConfig {
2613            max_parallel: 2, // 2*2=4, but tasks.len()*2=20
2614            ..make_config()
2615        };
2616        let scheduler = DagScheduler::new(
2617            graph,
2618            &config,
2619            Box::new(FirstRouter),
2620            vec![make_def("worker")],
2621        )
2622        .unwrap();
2623        // Confirm: tasks.len() * 2 = 20, max_parallel * 2 = 4.
2624        assert_eq!(scheduler.graph.tasks.len() * 2, 20);
2625        assert_eq!(scheduler.max_parallel * 2, 4);
2626    }
2627
2628    #[test]
2629    fn test_batch_mixed_concurrency_and_fatal_failure() {
2630        // Mixed batch: task 0 gets ConcurrencyLimit (transient), task 1 gets a
2631        // non-transient Spawn error (fatal). Two independent tasks, no deps between them.
2632        // Verify:
2633        // - task 0 reverts to Ready (retried next tick)
2634        // - task 1 is marked Failed; with FailureStrategy::Skip the graph stays Running
2635        //   because task 1 has no dependents that would abort the graph
2636        // - record_batch_backoff(false, true) increments counter by 1
2637        let mut nodes = vec![make_node(0, &[]), make_node(1, &[])];
2638        // FailureStrategy::Skip: task 1 fails but its absence is ignored.
2639        nodes[1].failure_strategy = Some(FailureStrategy::Skip);
2640        let graph = graph_from_nodes(nodes);
2641        let mut scheduler = make_scheduler(graph);
2642
2643        // Optimistically mark both as Running (as tick() would do).
2644        scheduler.graph.tasks[0].status = TaskStatus::Running;
2645        scheduler.graph.tasks[1].status = TaskStatus::Running;
2646
2647        // Task 0: ConcurrencyLimit (transient).
2648        let concurrency_err = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2649        let actions0 = scheduler.record_spawn_failure(TaskId(0), &concurrency_err);
2650        assert!(
2651            actions0.is_empty(),
2652            "ConcurrencyLimit must produce no extra actions"
2653        );
2654        assert_eq!(
2655            scheduler.graph.tasks[0].status,
2656            TaskStatus::Ready,
2657            "task 0 must revert to Ready"
2658        );
2659
2660        // Task 1: non-transient Spawn failure. record_spawn_failure marks it Failed,
2661        // then propagate_failure applies FailureStrategy::Skip → status becomes Skipped.
2662        let fatal_err = SubAgentError::Spawn("provider unavailable".to_string());
2663        let actions1 = scheduler.record_spawn_failure(TaskId(1), &fatal_err);
2664        assert_eq!(
2665            scheduler.graph.tasks[1].status,
2666            TaskStatus::Skipped,
2667            "task 1: Skip strategy turns Failed into Skipped via propagate_failure"
2668        );
2669        // No Done action from record_spawn_failure — graph still has task 0 alive.
2670        assert!(
2671            actions1
2672                .iter()
2673                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
2674            "no Done action expected: task 0 is still Ready"
2675        );
2676
2677        // Batch result: no success, one ConcurrencyLimit failure.
2678        scheduler.consecutive_spawn_failures = 0;
2679        scheduler.record_batch_backoff(false, true);
2680        assert_eq!(
2681            scheduler.consecutive_spawn_failures, 1,
2682            "batch with only ConcurrencyLimit must increment counter"
2683        );
2684    }
2685
2686    /// Regression for #1879: when the scheduler detects a deadlock (no running or ready tasks,
2687    /// but the graph is not complete), all non-terminal tasks must be marked Canceled, not left
2688    /// in their previous status (e.g. Pending).
2689    #[test]
2690    fn test_deadlock_marks_non_terminal_tasks_canceled() {
2691        // Build a graph in Failed status (as if a prior retry pass left task 0 failed and
2692        // task 1/2 still Pending). resume_from() transitions it to Running without resetting
2693        // task statuses, so tick() immediately sees no running, no ready, not all terminal —
2694        // triggering the deadlock branch.
2695        let mut nodes = vec![make_node(0, &[]), make_node(1, &[0]), make_node(2, &[0])];
2696        nodes[0].status = TaskStatus::Failed;
2697        nodes[1].status = TaskStatus::Pending;
2698        nodes[2].status = TaskStatus::Pending;
2699
2700        let mut graph = graph_from_nodes(nodes);
2701        graph.status = GraphStatus::Failed;
2702
2703        let mut scheduler = DagScheduler::resume_from(
2704            graph,
2705            &make_config(),
2706            Box::new(FirstRouter),
2707            vec![make_def("worker")],
2708        )
2709        .unwrap();
2710
2711        // After resume_from, graph is Running but no tasks are Ready/Running — deadlock.
2712        let actions = scheduler.tick();
2713
2714        // Must emit Done(Failed).
2715        assert!(
2716            actions.iter().any(|a| matches!(
2717                a,
2718                SchedulerAction::Done {
2719                    status: GraphStatus::Failed
2720                }
2721            )),
2722            "deadlock must emit Done(Failed); got: {actions:?}"
2723        );
2724        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
2725
2726        // task 0 was already Failed (terminal) — must remain unchanged.
2727        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
2728        // task 1 was Pending (non-terminal) — must be Canceled.
2729        assert_eq!(
2730            scheduler.graph.tasks[1].status,
2731            TaskStatus::Canceled,
2732            "Pending task must be Canceled on deadlock"
2733        );
2734        // task 2 was Pending (non-terminal) — must be Canceled.
2735        assert_eq!(
2736            scheduler.graph.tasks[2].status,
2737            TaskStatus::Canceled,
2738            "Pending task must be Canceled on deadlock"
2739        );
2740    }
2741
2742    /// Regression for #1879: deadlock with one task Running should NOT trigger the deadlock
2743    /// branch (`running_in_graph_now` > 0 suppresses the check).
2744    #[test]
2745    fn test_deadlock_not_triggered_when_task_running() {
2746        // Graph in Failed with one task still marked Running — resume_from reconstructs
2747        // the running map. tick() sees running_in_graph_now > 0 and skips deadlock check.
2748        let mut nodes = vec![make_node(0, &[]), make_node(1, &[0])];
2749        nodes[0].status = TaskStatus::Running;
2750        nodes[0].assigned_agent = Some("handle-1".into());
2751        nodes[1].status = TaskStatus::Pending;
2752
2753        let mut graph = graph_from_nodes(nodes);
2754        graph.status = GraphStatus::Failed;
2755
2756        let mut scheduler = DagScheduler::resume_from(
2757            graph,
2758            &make_config(),
2759            Box::new(FirstRouter),
2760            vec![make_def("worker")],
2761        )
2762        .unwrap();
2763
2764        let actions = scheduler.tick();
2765
2766        // Running task in graph — no deadlock triggered.
2767        assert!(
2768            actions
2769                .iter()
2770                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
2771            "no Done action expected when a task is running; got: {actions:?}"
2772        );
2773        assert_eq!(scheduler.graph.status, GraphStatus::Running);
2774    }
2775
2776    // --- topology_selection tests ---
2777
2778    #[test]
2779    fn topology_linear_chain_limits_parallelism_to_one() {
2780        // LinearChain topology with topology_selection=true → max_parallel overridden to 1.
2781        // tick() must dispatch exactly 1 task even though 1 root task is ready.
2782        let graph = graph_from_nodes(vec![
2783            make_node(0, &[]),
2784            make_node(1, &[0]),
2785            make_node(2, &[1]),
2786        ]);
2787        let config = zeph_config::OrchestrationConfig {
2788            topology_selection: true,
2789            max_parallel: 4,
2790            ..make_config()
2791        };
2792        let mut scheduler = DagScheduler::new(
2793            graph,
2794            &config,
2795            Box::new(FirstRouter),
2796            vec![make_def("worker")],
2797        )
2798        .unwrap();
2799
2800        assert_eq!(
2801            scheduler.topology().topology,
2802            crate::topology::Topology::LinearChain
2803        );
2804        assert_eq!(scheduler.max_parallel, 1);
2805
2806        let actions = scheduler.tick();
2807        let spawn_count = actions
2808            .iter()
2809            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2810            .count();
2811        assert_eq!(spawn_count, 1, "linear chain: only 1 task dispatched");
2812    }
2813
2814    #[test]
2815    fn topology_all_parallel_dispatches_all_ready() {
2816        // AllParallel topology with topology_selection=true → max_parallel unchanged.
2817        // tick() dispatches all 4 independent tasks in one go.
2818        let graph = graph_from_nodes(vec![
2819            make_node(0, &[]),
2820            make_node(1, &[]),
2821            make_node(2, &[]),
2822            make_node(3, &[]),
2823        ]);
2824        let config = zeph_config::OrchestrationConfig {
2825            topology_selection: true,
2826            max_parallel: 4,
2827            ..make_config()
2828        };
2829        let mut scheduler = DagScheduler::new(
2830            graph,
2831            &config,
2832            Box::new(FirstRouter),
2833            vec![make_def("worker")],
2834        )
2835        .unwrap();
2836
2837        assert_eq!(
2838            scheduler.topology().topology,
2839            crate::topology::Topology::AllParallel
2840        );
2841
2842        let actions = scheduler.tick();
2843        let spawn_count = actions
2844            .iter()
2845            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2846            .count();
2847        assert_eq!(spawn_count, 4, "all-parallel: all 4 tasks dispatched");
2848    }
2849
2850    #[test]
2851    fn sequential_dispatch_one_at_a_time_parallel_unblocked() {
2852        // Three ready tasks: A(sequential), B(sequential), C(parallel).
2853        // tick() must dispatch A + C, hold B (another sequential already scheduled this tick).
2854        use crate::graph::ExecutionMode;
2855
2856        let mut a = make_node(0, &[]);
2857        a.execution_mode = ExecutionMode::Sequential;
2858        let mut b = make_node(1, &[]);
2859        b.execution_mode = ExecutionMode::Sequential;
2860        let mut c = make_node(2, &[]);
2861        c.execution_mode = ExecutionMode::Parallel;
2862
2863        let graph = graph_from_nodes(vec![a, b, c]);
2864        let config = zeph_config::OrchestrationConfig {
2865            max_parallel: 4,
2866            ..make_config()
2867        };
2868        let mut scheduler = DagScheduler::new(
2869            graph,
2870            &config,
2871            Box::new(FirstRouter),
2872            vec![make_def("worker")],
2873        )
2874        .unwrap();
2875
2876        let actions = scheduler.tick();
2877        let spawned: Vec<TaskId> = actions
2878            .iter()
2879            .filter_map(|a| {
2880                if let SchedulerAction::Spawn { task_id, .. } = a {
2881                    Some(*task_id)
2882                } else {
2883                    None
2884                }
2885            })
2886            .collect();
2887
2888        // A (seq, idx=0) and C (par, idx=2) dispatched; B (seq, idx=1) held.
2889        assert!(
2890            spawned.contains(&TaskId(0)),
2891            "A(sequential) must be dispatched"
2892        );
2893        assert!(
2894            spawned.contains(&TaskId(2)),
2895            "C(parallel) must be dispatched"
2896        );
2897        assert!(!spawned.contains(&TaskId(1)), "B(sequential) must be held");
2898        assert_eq!(spawned.len(), 2);
2899    }
2900
2901    // --- inject_tasks replan cap tests (#2241) ---
2902
2903    #[test]
2904    fn test_inject_tasks_per_task_cap_skips_second() {
2905        // Per-task cap: 1 replan per task. Second inject for same task_id is a silent no-op.
2906        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2907        let mut scheduler = make_scheduler(graph);
2908
2909        let first = make_node(2, &[]);
2910        scheduler.inject_tasks(TaskId(0), vec![first], 20).unwrap();
2911        assert_eq!(
2912            scheduler.graph.tasks.len(),
2913            3,
2914            "first inject must append the task"
2915        );
2916        assert_eq!(scheduler.global_replan_count, 1);
2917
2918        // Second inject for the same verified task — per-task count is already 1.
2919        let second = make_node(3, &[]);
2920        scheduler.inject_tasks(TaskId(0), vec![second], 20).unwrap();
2921        assert_eq!(
2922            scheduler.graph.tasks.len(),
2923            3,
2924            "second inject must be silently skipped (per-task cap)"
2925        );
2926        assert_eq!(
2927            scheduler.global_replan_count, 1,
2928            "global counter must not increment on skipped inject"
2929        );
2930    }
2931
2932    #[test]
2933    fn test_inject_tasks_global_cap_skips_when_exhausted() {
2934        // Global cap: max_replans=1. First inject consumes the budget; second is a no-op.
2935        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2936        let mut config = make_config();
2937        config.max_replans = 1;
2938        let defs = vec![make_def("worker")];
2939        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
2940
2941        let new1 = make_node(2, &[]);
2942        scheduler.inject_tasks(TaskId(0), vec![new1], 20).unwrap();
2943        assert_eq!(scheduler.global_replan_count, 1);
2944
2945        // Second inject for a different task — global cap exhausted.
2946        let new2 = make_node(3, &[]);
2947        scheduler.inject_tasks(TaskId(1), vec![new2], 20).unwrap();
2948        assert_eq!(
2949            scheduler.graph.tasks.len(),
2950            3,
2951            "global cap must prevent the second inject"
2952        );
2953        assert_eq!(
2954            scheduler.global_replan_count, 1,
2955            "global counter must not increment past cap"
2956        );
2957    }
2958
2959    #[test]
2960    fn test_inject_tasks_sets_topology_dirty() {
2961        // inject_tasks must set topology_dirty; tick() must clear it after re-analysis.
2962        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2963        let mut scheduler = make_scheduler(graph);
2964        assert!(
2965            !scheduler.topology_dirty,
2966            "topology_dirty must be false initially"
2967        );
2968
2969        let new_task = make_node(1, &[]);
2970        scheduler
2971            .inject_tasks(TaskId(0), vec![new_task], 20)
2972            .unwrap();
2973        assert!(
2974            scheduler.topology_dirty,
2975            "inject_tasks must set topology_dirty=true"
2976        );
2977
2978        scheduler.tick();
2979        assert!(
2980            !scheduler.topology_dirty,
2981            "tick() must clear topology_dirty after re-analysis"
2982        );
2983    }
2984
2985    #[test]
2986    fn test_inject_tasks_rejects_cycle() {
2987        // Injecting a task that introduces a cycle must return VerificationFailed.
2988        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2989        let mut scheduler = make_scheduler(graph);
2990
2991        // New task ID=1 with a self-reference (depends on itself) → cycle.
2992        let cyclic_task = make_node(1, &[1]);
2993        let result = scheduler.inject_tasks(TaskId(0), vec![cyclic_task], 20);
2994        assert!(result.is_err(), "cyclic injection must return an error");
2995        assert!(
2996            matches!(
2997                result.unwrap_err(),
2998                OrchestrationError::VerificationFailed(_)
2999            ),
3000            "must return VerificationFailed for cycle"
3001        );
3002        // Global and per-task counters must not be incremented on error.
3003        assert_eq!(scheduler.global_replan_count, 0);
3004        assert!(
3005            !scheduler.topology_dirty,
3006            "topology_dirty must not be set when inject fails"
3007        );
3008    }
3009
3010    // --- LevelBarrier dispatch tests (#2242) ---
3011
3012    fn make_hierarchical_config() -> zeph_config::OrchestrationConfig {
3013        zeph_config::OrchestrationConfig {
3014            topology_selection: true,
3015            max_parallel: 4,
3016            ..make_config()
3017        }
3018    }
3019
3020    /// A(0)→{B(1),C(2)}, B(1)→D(3). Hierarchical topology, depths: A=0, B=1, C=1, D=2.
3021    fn make_hierarchical_graph() -> TaskGraph {
3022        graph_from_nodes(vec![
3023            make_node(0, &[]),
3024            make_node(1, &[0]),
3025            make_node(2, &[0]),
3026            make_node(3, &[1]),
3027        ])
3028    }
3029
3030    #[test]
3031    fn test_level_barrier_advances_on_terminal_level() {
3032        // When all tasks at current_level are terminal, tick() advances current_level
3033        // and dispatches tasks at the next non-terminal level.
3034        let graph = make_hierarchical_graph();
3035        let config = make_hierarchical_config();
3036        let defs = vec![make_def("worker")];
3037        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
3038
3039        assert_eq!(
3040            scheduler.topology().strategy,
3041            crate::topology::DispatchStrategy::LevelBarrier,
3042            "must use LevelBarrier strategy for Hierarchical graph"
3043        );
3044        assert_eq!(scheduler.current_level, 0);
3045
3046        // First tick: only A(0) at level 0 is dispatched.
3047        let actions = scheduler.tick();
3048        let spawned_ids: Vec<_> = actions
3049            .iter()
3050            .filter_map(|a| {
3051                if let SchedulerAction::Spawn { task_id, .. } = a {
3052                    Some(*task_id)
3053                } else {
3054                    None
3055                }
3056            })
3057            .collect();
3058        assert_eq!(
3059            spawned_ids,
3060            vec![TaskId(0)],
3061            "first tick must dispatch only A at level 0"
3062        );
3063
3064        // Simulate A completing: mark Completed, mark B and C Ready (deps satisfied).
3065        scheduler.graph.tasks[0].status = TaskStatus::Completed;
3066        scheduler.running.clear();
3067        scheduler.graph.tasks[1].status = TaskStatus::Ready;
3068        scheduler.graph.tasks[2].status = TaskStatus::Ready;
3069
3070        // Second tick: A is terminal → level advances to 1 → B and C dispatched.
3071        let actions2 = scheduler.tick();
3072        assert_eq!(
3073            scheduler.current_level, 1,
3074            "current_level must advance to 1 after level-0 tasks terminate"
3075        );
3076        let spawned2: Vec<_> = actions2
3077            .iter()
3078            .filter_map(|a| {
3079                if let SchedulerAction::Spawn { task_id, .. } = a {
3080                    Some(*task_id)
3081                } else {
3082                    None
3083                }
3084            })
3085            .collect();
3086        assert!(
3087            spawned2.contains(&TaskId(1)),
3088            "B must be dispatched after level advance"
3089        );
3090        assert!(
3091            spawned2.contains(&TaskId(2)),
3092            "C must be dispatched after level advance"
3093        );
3094    }
3095
3096    #[test]
3097    fn test_level_barrier_failure_propagates_transitively() {
3098        // When A fails with Skip strategy, propagate_failure() BFS-marks all
3099        // descendants (B, C, D) as Skipped. tick() must then advance past level 0.
3100        let graph = make_hierarchical_graph();
3101        let config = make_hierarchical_config();
3102        let defs = vec![make_def("worker")];
3103        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
3104
3105        // Set A to Skip failure strategy and simulate it running.
3106        scheduler.graph.tasks[0].failure_strategy = Some(crate::graph::FailureStrategy::Skip);
3107        scheduler.graph.tasks[0].status = TaskStatus::Running;
3108        scheduler.running.insert(
3109            TaskId(0),
3110            RunningTask {
3111                agent_handle_id: "h0".to_string(),
3112                agent_def_name: "worker".to_string(),
3113                started_at: Instant::now(),
3114            },
3115        );
3116
3117        // Push a failure event for A.
3118        scheduler.buffered_events.push_back(TaskEvent {
3119            task_id: TaskId(0),
3120            agent_handle_id: "h0".to_string(),
3121            outcome: TaskOutcome::Failed {
3122                error: "simulated failure".to_string(),
3123            },
3124        });
3125
3126        scheduler.tick();
3127
3128        // A failed with Skip → A=Skipped. B, C, D must be transitively Skipped.
3129        assert_eq!(
3130            scheduler.graph.tasks[0].status,
3131            TaskStatus::Skipped,
3132            "A must be Skipped (Skip strategy)"
3133        );
3134        assert_eq!(
3135            scheduler.graph.tasks[1].status,
3136            TaskStatus::Skipped,
3137            "B must be transitively Skipped"
3138        );
3139        assert_eq!(
3140            scheduler.graph.tasks[2].status,
3141            TaskStatus::Skipped,
3142            "C must be transitively Skipped"
3143        );
3144        assert_eq!(
3145            scheduler.graph.tasks[3].status,
3146            TaskStatus::Skipped,
3147            "D must be transitively Skipped"
3148        );
3149    }
3150
3151    #[test]
3152    fn test_level_barrier_current_level_reset_after_inject() {
3153        // inject_tasks() adding a task at depth < current_level must cause tick() to
3154        // reset current_level downward via the .min() guard (critic C2 / issue #2242).
3155        let graph = make_hierarchical_graph(); // A(0)→{B(1),C(2)}, B(1)→D(3)
3156        let config = make_hierarchical_config();
3157        let defs = vec![make_def("worker")];
3158        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
3159
3160        // Manually mark A, B, C as Completed (simulate levels 0 and 1 done).
3161        scheduler.graph.tasks[0].status = TaskStatus::Completed; // A depth 0
3162        scheduler.graph.tasks[1].status = TaskStatus::Completed; // B depth 1
3163        scheduler.graph.tasks[2].status = TaskStatus::Completed; // C depth 1
3164        // D(3) is Pending at depth 2. Manually set current_level to 2.
3165        scheduler.current_level = 2;
3166
3167        // Inject E(4) depending on A(0) (Completed) → E will be at depth 1 after re-analysis.
3168        // This is shallower than current_level=2 → tick() must reset current_level to 1.
3169        let e = make_node(4, &[0]);
3170        scheduler.inject_tasks(TaskId(3), vec![e], 20).unwrap();
3171        assert!(scheduler.topology_dirty);
3172
3173        // tick() re-analyzes topology (E at depth 1, D at depth 2).
3174        // min_non_terminal_depth = 1 (E is Ready). current_level = min(2, 1) = 1.
3175        scheduler.tick();
3176        assert_eq!(
3177            scheduler.current_level, 1,
3178            "current_level must reset to min non-terminal depth (1) after inject at depth 1"
3179        );
3180    }
3181
3182    #[test]
3183    fn resume_from_preserves_topology_classification() {
3184        // resume_from() must also apply topology classification (fix H3).
3185        let mut graph = graph_from_nodes(vec![
3186            make_node(0, &[]),
3187            make_node(1, &[0]),
3188            make_node(2, &[1]),
3189        ]);
3190        // Put graph in Paused state so resume_from accepts it.
3191        graph.status = GraphStatus::Paused;
3192        graph.tasks[0].status = TaskStatus::Completed;
3193        graph.tasks[1].status = TaskStatus::Pending;
3194        graph.tasks[2].status = TaskStatus::Pending;
3195
3196        let config = zeph_config::OrchestrationConfig {
3197            topology_selection: true,
3198            max_parallel: 4,
3199            ..make_config()
3200        };
3201        let scheduler = DagScheduler::resume_from(
3202            graph,
3203            &config,
3204            Box::new(FirstRouter),
3205            vec![make_def("worker")],
3206        )
3207        .unwrap();
3208
3209        assert_eq!(
3210            scheduler.topology().topology,
3211            crate::topology::Topology::LinearChain,
3212            "resume_from must classify topology"
3213        );
3214        assert_eq!(
3215            scheduler.max_parallel, 1,
3216            "resume_from must apply topology limit"
3217        );
3218    }
3219
3220    // --- #2238: validate_verify_config tests ---
3221
3222    fn make_verify_config(provider: &str) -> zeph_config::OrchestrationConfig {
3223        zeph_config::OrchestrationConfig {
3224            verify_completeness: true,
3225            verify_provider: zeph_config::ProviderName::new(provider),
3226            ..make_config()
3227        }
3228    }
3229
3230    #[test]
3231    fn validate_verify_config_unknown_provider_returns_err() {
3232        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3233        let config = make_verify_config("nonexistent");
3234        let scheduler = DagScheduler::new(
3235            graph,
3236            &config,
3237            Box::new(FirstRouter),
3238            vec![make_def("worker")],
3239        )
3240        .unwrap();
3241        let result = scheduler.validate_verify_config(&["fast", "quality"]);
3242        assert!(result.is_err());
3243        let err_msg = result.unwrap_err().to_string();
3244        assert!(err_msg.contains("nonexistent"));
3245        assert!(err_msg.contains("fast"));
3246    }
3247
3248    #[test]
3249    fn validate_verify_config_known_provider_returns_ok() {
3250        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3251        let config = make_verify_config("fast");
3252        let scheduler = DagScheduler::new(
3253            graph,
3254            &config,
3255            Box::new(FirstRouter),
3256            vec![make_def("worker")],
3257        )
3258        .unwrap();
3259        assert!(
3260            scheduler
3261                .validate_verify_config(&["fast", "quality"])
3262                .is_ok()
3263        );
3264    }
3265
3266    #[test]
3267    fn validate_verify_config_empty_provider_always_ok() {
3268        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3269        let config = make_verify_config("");
3270        let scheduler = DagScheduler::new(
3271            graph,
3272            &config,
3273            Box::new(FirstRouter),
3274            vec![make_def("worker")],
3275        )
3276        .unwrap();
3277        assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
3278    }
3279
3280    #[test]
3281    fn validate_verify_config_disabled_skips_validation() {
3282        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3283        // verify_completeness = false: no validation even with bogus provider name
3284        let scheduler = make_scheduler(graph);
3285        assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
3286    }
3287
3288    #[test]
3289    fn validate_verify_config_empty_pool_skips_validation() {
3290        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3291        let config = make_verify_config("nonexistent");
3292        let scheduler = DagScheduler::new(
3293            graph,
3294            &config,
3295            Box::new(FirstRouter),
3296            vec![make_def("worker")],
3297        )
3298        .unwrap();
3299        // Empty provider_names slice = unknown provider set, skip validation.
3300        assert!(scheduler.validate_verify_config(&[]).is_ok());
3301    }
3302
3303    #[test]
3304    fn validate_verify_config_trims_whitespace_in_config() {
3305        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3306        // verify_provider with surrounding whitespace in config is trimmed at construction.
3307        let config = make_verify_config("  fast  ");
3308        let scheduler = DagScheduler::new(
3309            graph,
3310            &config,
3311            Box::new(FirstRouter),
3312            vec![make_def("worker")],
3313        )
3314        .unwrap();
3315        assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
3316    }
3317
3318    // --- #2237 regression tests: max_parallel drift across replan cycles ---
3319
3320    #[test]
3321    fn config_max_parallel_initialized_from_config() {
3322        // config_max_parallel must always equal config.max_parallel, regardless
3323        // of whether topology analysis reduces max_parallel for the initial topology.
3324        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
3325        let config = zeph_config::OrchestrationConfig {
3326            topology_selection: true,
3327            max_parallel: 6,
3328            ..make_config()
3329        };
3330        let scheduler = DagScheduler::new(
3331            graph,
3332            &config,
3333            Box::new(FirstRouter),
3334            vec![make_def("worker")],
3335        )
3336        .unwrap();
3337
3338        assert_eq!(
3339            scheduler.config_max_parallel, 6,
3340            "config_max_parallel must equal config.max_parallel"
3341        );
3342        // LinearChain reduces max_parallel to 1, but config_max_parallel stays at 6.
3343        assert_eq!(
3344            scheduler.max_parallel, 1,
3345            "max_parallel reduced by topology analysis"
3346        );
3347        assert_eq!(
3348            scheduler.config_max_parallel, 6,
3349            "config_max_parallel must not be reduced by topology"
3350        );
3351    }
3352
3353    #[test]
3354    fn max_parallel_does_not_drift_across_inject_tick_cycles() {
3355        // Regression for #2237: successive inject_tasks+tick cycles with a Mixed graph
3356        // must not reduce max_parallel below compute_max_parallel(Mixed, config_max_parallel).
3357        //
3358        // Before the fix, the tick() dirty path used self.max_parallel as the base for
3359        // compute_max_parallel, so each replan cycle reduced it further:
3360        //   cycle 1: max_parallel = (4/2+1)      = 3
3361        //   cycle 2: max_parallel = (3/2+1)      = 2  ← drift!
3362        //   cycle 3: max_parallel = (2/2+1)      = 2
3363        //
3364        // After the fix, config_max_parallel=4 is always used as the base:
3365        //   all cycles: max_parallel = (4/2+1)   = 3  ← stable
3366        let graph = graph_from_nodes(vec![
3367            make_node(0, &[]),
3368            make_node(1, &[0]),
3369            make_node(2, &[0]),
3370            make_node(3, &[1, 2]), // diamond → Mixed
3371        ]);
3372        let config = zeph_config::OrchestrationConfig {
3373            topology_selection: true,
3374            max_parallel: 4,
3375            max_tasks: 50,
3376            ..make_config()
3377        };
3378        let mut scheduler = DagScheduler::new(
3379            graph,
3380            &config,
3381            Box::new(FirstRouter),
3382            vec![make_def("worker")],
3383        )
3384        .unwrap();
3385
3386        // Initial analysis: Mixed topology → max_parallel = (4/2+1) = 3.
3387        assert_eq!(
3388            scheduler.topology().topology,
3389            crate::topology::Topology::Mixed,
3390            "initial topology must be Mixed"
3391        );
3392        let expected_max_parallel = (4usize / 2 + 1).clamp(1, 4); // = 3
3393        assert_eq!(scheduler.max_parallel, expected_max_parallel);
3394
3395        // Simulate inject_tasks (which sets topology_dirty=true) followed by tick().
3396        // The injected task depends on task 3 to keep the graph Mixed.
3397        let extra_task_id = 4u32;
3398        let extra_task = {
3399            let mut n = crate::graph::TaskNode::new(
3400                extra_task_id,
3401                "extra".to_string(),
3402                "extra task injected by replan",
3403            );
3404            n.depends_on = vec![TaskId(3)];
3405            n
3406        };
3407
3408        // inject_tasks requires the verified task to be Completed.
3409        scheduler.graph.tasks[3].status = TaskStatus::Completed;
3410
3411        scheduler
3412            .inject_tasks(TaskId(3), vec![extra_task], 50)
3413            .expect("inject must succeed");
3414        assert!(
3415            scheduler.topology_dirty,
3416            "topology_dirty must be true after inject"
3417        );
3418
3419        // First tick() after inject: re-analyzes topology. Must use config_max_parallel as base.
3420        let _ = scheduler.tick();
3421        let max_after_first_inject = scheduler.max_parallel;
3422        assert_eq!(
3423            max_after_first_inject, expected_max_parallel,
3424            "max_parallel must not drift after first inject+tick"
3425        );
3426
3427        // Second inject+tick cycle: max_parallel must still equal the original computed value.
3428        let extra_task2 = {
3429            let mut n = crate::graph::TaskNode::new(5u32, "extra2".to_string(), "second replan");
3430            n.depends_on = vec![TaskId(extra_task_id)];
3431            n
3432        };
3433        scheduler.graph.tasks[extra_task_id as usize].status = TaskStatus::Completed;
3434        // Reset to created-like state to allow a second inject (per-task limit is 1,
3435        // so use a fresh task ID for the verified source).
3436        scheduler
3437            .inject_tasks(TaskId(extra_task_id), vec![extra_task2], 50)
3438            .expect("second inject must succeed");
3439
3440        let _ = scheduler.tick();
3441        let max_after_second_inject = scheduler.max_parallel;
3442        assert_eq!(
3443            max_after_second_inject, expected_max_parallel,
3444            "max_parallel must not drift after second inject+tick (was: {max_after_second_inject}, expected: {expected_max_parallel})"
3445        );
3446    }
3447
3448    // --- VMAO adaptive replanning accessor tests ---
3449
3450    #[test]
3451    fn completeness_threshold_returns_config_value() {
3452        let mut config = make_config();
3453        config.completeness_threshold = 0.85;
3454        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3455        let scheduler =
3456            DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![make_def("w")]).unwrap();
3457        assert!((scheduler.completeness_threshold() - 0.85).abs() < f32::EPSILON);
3458    }
3459
3460    #[test]
3461    fn completeness_threshold_default_is_0_7() {
3462        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3463        let scheduler = make_scheduler(graph);
3464        assert!((scheduler.completeness_threshold() - 0.7).abs() < f32::EPSILON);
3465    }
3466
3467    #[test]
3468    fn verify_provider_name_returns_config_value() {
3469        let mut config = make_config();
3470        config.verify_provider = zeph_config::ProviderName::new("fast");
3471        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3472        let scheduler =
3473            DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![make_def("w")]).unwrap();
3474        assert_eq!(scheduler.verify_provider_name(), "fast");
3475    }
3476
3477    #[test]
3478    fn verify_provider_name_empty_when_not_set() {
3479        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3480        let scheduler = make_scheduler(graph);
3481        assert_eq!(scheduler.verify_provider_name(), "");
3482    }
3483
3484    #[test]
3485    fn max_replans_remaining_initial_equals_max_replans() {
3486        let mut config = make_config();
3487        config.max_replans = 3;
3488        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3489        let scheduler =
3490            DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![make_def("w")]).unwrap();
3491        assert_eq!(scheduler.max_replans_remaining(), 3);
3492    }
3493
3494    #[test]
3495    fn max_replans_remaining_decrements_after_record() {
3496        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3497        let mut scheduler = make_scheduler(graph);
3498        assert_eq!(scheduler.max_replans_remaining(), 2);
3499        scheduler.record_whole_plan_replan();
3500        assert_eq!(scheduler.max_replans_remaining(), 1);
3501        scheduler.record_whole_plan_replan();
3502        assert_eq!(scheduler.max_replans_remaining(), 0);
3503        // saturating: stays at 0
3504        scheduler.record_whole_plan_replan();
3505        assert_eq!(scheduler.max_replans_remaining(), 0);
3506    }
3507
3508    #[test]
3509    fn record_whole_plan_replan_does_not_modify_graph() {
3510        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3511        let mut scheduler = make_scheduler(graph);
3512        let task_count_before = scheduler.graph().tasks.len();
3513        scheduler.record_whole_plan_replan();
3514        assert_eq!(
3515            scheduler.graph().tasks.len(),
3516            task_count_before,
3517            "record_whole_plan_replan must not modify the task graph"
3518        );
3519    }
3520
3521    // --- cascade routing tests ---
3522
3523    fn make_cascade_config() -> zeph_config::OrchestrationConfig {
3524        zeph_config::OrchestrationConfig {
3525            topology_selection: true,
3526            cascade_routing: true,
3527            cascade_failure_threshold: 0.4,
3528            max_parallel: 4,
3529            ..make_config()
3530        }
3531    }
3532
3533    #[test]
3534    fn inject_tasks_resets_cascade_detector() {
3535        // inject_tasks() must call cascade_detector.reset() (C13 fix).
3536        // Verify: recording a failure before inject makes the region healthy again after.
3537        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
3538        graph.tasks[0].status = TaskStatus::Completed;
3539        graph.tasks[1].status = TaskStatus::Completed;
3540        let config = make_cascade_config();
3541        let mut scheduler = DagScheduler::new(
3542            graph,
3543            &config,
3544            Box::new(FirstRouter),
3545            vec![make_def("worker")],
3546        )
3547        .unwrap();
3548
3549        // Record a failure in the detector (simulates a failed task before inject).
3550        if let Some(ref mut det) = scheduler.cascade_detector {
3551            let g = &scheduler.graph;
3552            det.record_outcome(TaskId(1), false, g);
3553            // Sanity: region has 1 entry.
3554            assert_eq!(det.region_health().len(), 1);
3555        } else {
3556            panic!(
3557                "cascade_detector must be Some when cascade_routing=true and topology_selection=true"
3558            );
3559        }
3560
3561        // inject_tasks resets the detector.
3562        let new_task = make_node(2, &[1]);
3563        scheduler
3564            .inject_tasks(TaskId(1), vec![new_task], 20)
3565            .unwrap();
3566
3567        assert!(
3568            scheduler
3569                .cascade_detector
3570                .as_ref()
3571                .is_some_and(|d| d.region_health().is_empty()),
3572            "cascade_detector must be cleared after inject_tasks (C13 fix)"
3573        );
3574    }
3575
3576    #[test]
3577    fn sequential_tasks_not_reordered_by_cascade() {
3578        // Sequential tasks must stay at the front of the dispatch queue even when
3579        // their region is cascading (C14 fix): they must not be moved to the deferred set.
3580        //
3581        // Graph: root0 (healthy region), root1 -> t2 (cascading region, Sequential).
3582        // After injecting failures for root1's region, deprioritized = {root1, t2}.
3583        // But t2 is Sequential — it must stay in "preferred" partition.
3584        let mut graph = graph_from_nodes(vec![
3585            make_node(0, &[]),  // root for healthy region
3586            make_node(1, &[]),  // root for cascading region
3587            make_node(2, &[1]), // child of root1
3588        ]);
3589        graph.tasks[2].execution_mode = ExecutionMode::Sequential;
3590        let config = make_cascade_config();
3591        let mut scheduler = DagScheduler::new(
3592            graph,
3593            &config,
3594            Box::new(FirstRouter),
3595            vec![make_def("worker")],
3596        )
3597        .unwrap();
3598
3599        // Record failures for root1's region to make it cascade.
3600        if let Some(ref mut det) = scheduler.cascade_detector {
3601            let g = &scheduler.graph;
3602            // Two failures → rate 1.0 > threshold 0.4
3603            det.record_outcome(TaskId(1), false, g);
3604            det.record_outcome(TaskId(2), false, g);
3605        } else {
3606            panic!("cascade_detector must be Some");
3607        }
3608
3609        // Mark root0 and root1 as Ready (inject nothing — just check tick dispatch order).
3610        // tick() should put Sequential task in preferred, not deferred.
3611        let actions = scheduler.tick();
3612
3613        // Both root tasks (0 and 1) are Ready and root1 is in a cascading region,
3614        // but t2 (Sequential) must not be deprioritized. We verify by checking that
3615        // task 1 (root of cascading region) and task 2 (Sequential child) both appear
3616        // in the Spawn actions — they are not silently deferred behind root0.
3617        let spawned_ids: Vec<TaskId> = actions
3618            .iter()
3619            .filter_map(|a| {
3620                if let SchedulerAction::Spawn { task_id, .. }
3621                | SchedulerAction::RunInline { task_id, .. } = a
3622                {
3623                    Some(*task_id)
3624                } else {
3625                    None
3626                }
3627            })
3628            .collect();
3629
3630        // root0 and root1 are both roots (in-degree 0). root1 is deprioritized,
3631        // but t2 (Sequential) stays in preferred. At minimum root0 or t2/root1 must be spawned.
3632        // The key invariant: the presence of at least one spawn confirms the sequential
3633        // task was not silently dropped.
3634        assert!(
3635            !spawned_ids.is_empty(),
3636            "tick must dispatch at least one ready task; Sequential tasks must not be dropped by cascade logic"
3637        );
3638    }
3639
3640    #[test]
3641    fn cascade_routing_without_topology_selection_creates_no_detector() {
3642        // cascade_routing=true but topology_selection=false must not create a detector
3643        // (the constructor emits a warn but does not fail).
3644        let config = zeph_config::OrchestrationConfig {
3645            cascade_routing: true,
3646            topology_selection: false,
3647            ..make_config()
3648        };
3649        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3650        let scheduler = DagScheduler::new(
3651            graph,
3652            &config,
3653            Box::new(FirstRouter),
3654            vec![make_def("worker")],
3655        )
3656        .unwrap();
3657        assert!(
3658            scheduler.cascade_detector.is_none(),
3659            "cascade_detector must be None when topology_selection=false"
3660        );
3661    }
3662}