Skip to main content

zeph_orchestration/
scheduler.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! DAG execution scheduler: drives task graph execution by emitting `SchedulerAction` commands.
5
6use std::collections::{HashMap, VecDeque};
7use std::fmt::Write as _;
8use std::path::PathBuf;
9use std::time::{Duration, Instant};
10
11use tokio::sync::mpsc;
12
13use super::dag;
14use super::error::OrchestrationError;
15use super::graph::{
16    ExecutionMode, GraphStatus, TaskGraph, TaskId, TaskNode, TaskResult, TaskStatus,
17};
18use super::router::AgentRouter;
19use super::topology::{DispatchStrategy, Topology, TopologyAnalysis, TopologyClassifier};
20use super::verifier::inject_tasks as verifier_inject_tasks;
21use zeph_config::OrchestrationConfig;
22use zeph_sanitizer::{ContentIsolationConfig, ContentSanitizer, ContentSource, ContentSourceKind};
23use zeph_subagent::{SubAgentDef, SubAgentError};
24
/// Actions the scheduler requests the caller to perform.
///
/// The scheduler never holds `&mut SubAgentManager` — it produces these
/// actions for the caller to execute (ADR-026 command pattern).
///
/// `DagScheduler::tick` returns a batch of these on every call; the caller is
/// expected to execute each one and report spawn results back to the scheduler.
#[derive(Debug)]
pub enum SchedulerAction {
    /// Spawn a sub-agent for a task.
    ///
    /// The task has already been marked `Running` optimistically when this
    /// action is emitted.
    Spawn {
        /// Task the sub-agent should execute.
        task_id: TaskId,
        /// Name of the agent definition selected by the router.
        agent_def_name: String,
        /// Prompt built by the scheduler for this task.
        prompt: String,
    },
    /// Cancel a running sub-agent (on graph abort/skip).
    Cancel { agent_handle_id: String },
    /// Execute a task inline via the main agent (no sub-agents configured).
    ///
    /// Emitted when the router returns no agent for a ready task.
    RunInline { task_id: TaskId, prompt: String },
    /// Graph reached a terminal or paused state.
    Done { status: GraphStatus },
    /// Request verification of a completed task's output (emitted when `verify_completeness=true`).
    ///
    /// The task stays `Completed` during verification. Downstream tasks are unblocked
    /// immediately — verification is best-effort and does not gate dispatch.
    /// The caller runs `PlanVerifier::verify()`, then optionally `PlanVerifier::replan()`,
    /// then calls `DagScheduler::inject_tasks()` if new tasks were generated.
    Verify { task_id: TaskId, output: String },
}
51
/// Event sent by a sub-agent loop when it terminates.
///
/// Delivered to the scheduler over the `mpsc` channel whose sender is
/// handed out by `DagScheduler::event_sender`.
#[derive(Debug)]
pub struct TaskEvent {
    /// Task the terminating sub-agent was executing.
    pub task_id: TaskId,
    /// Handle id of the sub-agent that produced this event.
    pub agent_handle_id: String,
    /// Whether the sub-agent completed or failed, with its payload.
    pub outcome: TaskOutcome,
}
59
/// Outcome of a sub-agent execution.
#[derive(Debug)]
pub enum TaskOutcome {
    /// Agent completed successfully.
    Completed {
        /// Text output produced by the agent.
        output: String,
        /// Paths of artifacts reported by the agent.
        artifacts: Vec<PathBuf>,
    },
    /// Agent failed.
    Failed {
        /// Error description reported for the failure.
        error: String,
    },
}
71
/// Tracks a running task's spawn time and definition name for timeout detection.
struct RunningTask {
    /// Handle id of the sub-agent executing the task.
    agent_handle_id: String,
    /// Name of the agent definition the task was dispatched to.
    agent_def_name: String,
    /// When the task was recorded as running. `resume_from` resets this to the
    /// resume instant, so the timeout window restarts after a resume.
    started_at: Instant,
}
78
/// DAG execution engine.
///
/// Drives task graph execution by producing `SchedulerAction` values
/// that the caller executes against `SubAgentManager`.
///
/// # Caller Loop
///
/// ```text
/// loop {
///     let actions = scheduler.tick();
///     for action in actions {
///         match action {
///             Spawn { task_id, agent_def_name, prompt } => {
///                 match manager.spawn_for_task(...) {
///                     Ok(handle_id) => scheduler.record_spawn(task_id, handle_id),
///                     Err(e) => { for a in scheduler.record_spawn_failure(task_id, &e) { /* exec */ } }
///                 }
///             }
///             RunInline { task_id, prompt } => { /* run `prompt` on the main agent */ }
///             Cancel { agent_handle_id } => { manager.cancel(&agent_handle_id); }
///             Verify { task_id, output } => {
///                 /* PlanVerifier::verify(), optionally replan(), then scheduler.inject_tasks(...) */
///             }
///             Done { .. } => break,
///         }
///     }
///     scheduler.wait_event().await;
/// }
/// ```
pub struct DagScheduler {
    /// The task graph being executed; task and graph statuses are mutated in place.
    graph: TaskGraph,
    /// Effective concurrency cap used by `tick()` to compute free dispatch slots.
    /// Kept in sync with `topology.max_parallel`.
    max_parallel: usize,
    /// Immutable base parallelism limit from config. Never changes after construction.
    ///
    /// `max_parallel` is derived from this via `TopologyClassifier::compute_max_parallel`
    /// and may be lower (e.g., 1 for `LinearChain`). Using `config_max_parallel` as the
    /// base prevents drift: successive replan cycles always compute from the original
    /// config value, not from a previously reduced `max_parallel`.
    config_max_parallel: usize,
    /// Maps `TaskId` -> running sub-agent state.
    running: HashMap<TaskId, RunningTask>,
    /// Receives completion/failure events from sub-agent loops.
    event_rx: mpsc::Receiver<TaskEvent>,
    /// Sender cloned into each spawned sub-agent via `spawn_for_task`.
    event_tx: mpsc::Sender<TaskEvent>,
    /// Per-task wall-clock timeout.
    task_timeout: Duration,
    /// Router for agent selection.
    router: Box<dyn AgentRouter>,
    /// Available agent definitions (cached from `SubAgentManager`).
    available_agents: Vec<SubAgentDef>,
    /// Total character budget for cross-task dependency context injection.
    dependency_context_budget: usize,
    /// Events buffered by `wait_event` for processing in the next `tick`.
    buffered_events: VecDeque<TaskEvent>,
    /// Sanitizer for dependency output injected into task prompts (SEC-ORCH-01).
    sanitizer: ContentSanitizer,
    /// Backoff duration before retrying deferred tasks when all ready tasks hit the concurrency limit.
    deferral_backoff: Duration,
    /// Consecutive spawn failures due to concurrency limits. Used to compute exponential backoff.
    consecutive_spawn_failures: u32,
    /// Topology analysis result. Recomputed on next tick when `topology_dirty=true`.
    topology: TopologyAnalysis,
    /// When true, topology is re-analyzed at the start of the next tick.
    /// Set by `inject_tasks()` after appending replan tasks (critic C2).
    topology_dirty: bool,
    /// Current dispatch level for `LevelBarrier` strategy.
    current_level: usize,
    /// Whether post-task verification is enabled (`config.verify_completeness`).
    verify_completeness: bool,
    /// Provider name for verification LLM calls (`config.verify_provider`).
    /// Empty string = use the agent's primary provider.
    verify_provider: String,
    /// Per-task replan count. Limits replanning to 1 cycle per task (critic S2).
    task_replan_counts: HashMap<TaskId, u32>,
    /// Global replan counter across the entire scheduler run (critic S2).
    global_replan_count: u32,
    /// Global replan hard cap from config.
    max_replans: u32,
}
155
156impl std::fmt::Debug for DagScheduler {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("DagScheduler")
159            .field("graph_id", &self.graph.id)
160            .field("graph_status", &self.graph.status)
161            .field("running_count", &self.running.len())
162            .field("max_parallel", &self.max_parallel)
163            .field("task_timeout_secs", &self.task_timeout.as_secs())
164            .field("topology", &self.topology.topology)
165            .field("strategy", &self.topology.strategy)
166            .field("current_level", &self.current_level)
167            .field("global_replan_count", &self.global_replan_count)
168            .finish_non_exhaustive()
169    }
170}
171
172impl DagScheduler {
173    /// Create a new scheduler for the given graph.
174    ///
175    /// The graph must be in `Created` status. The scheduler transitions
176    /// it to `Running` and marks root tasks as `Ready`.
177    ///
178    /// # Errors
179    ///
180    /// Returns `OrchestrationError::InvalidGraph` if the graph is not in
181    /// `Created` status or has no tasks.
182    pub fn new(
183        mut graph: TaskGraph,
184        config: &OrchestrationConfig,
185        router: Box<dyn AgentRouter>,
186        available_agents: Vec<SubAgentDef>,
187    ) -> Result<Self, OrchestrationError> {
188        if graph.status != GraphStatus::Created {
189            return Err(OrchestrationError::InvalidGraph(format!(
190                "graph must be in Created status, got {}",
191                graph.status
192            )));
193        }
194
195        dag::validate(&graph.tasks, config.max_tasks as usize)?;
196
197        graph.status = GraphStatus::Running;
198
199        for task in &mut graph.tasks {
200            if task.depends_on.is_empty() && task.status == TaskStatus::Pending {
201                task.status = TaskStatus::Ready;
202            }
203        }
204
205        let (event_tx, event_rx) = mpsc::channel(64);
206
207        let task_timeout = if config.task_timeout_secs > 0 {
208            Duration::from_secs(config.task_timeout_secs)
209        } else {
210            Duration::from_secs(600)
211        };
212
213        let topology = TopologyClassifier::analyze(&graph, config);
214        let max_parallel = topology.max_parallel;
215        let config_max_parallel = config.max_parallel as usize;
216
217        if config.topology_selection {
218            tracing::debug!(
219                topology = ?topology.topology,
220                strategy = ?topology.strategy,
221                max_parallel,
222                "topology-aware concurrency limit applied"
223            );
224        }
225
226        Ok(Self {
227            graph,
228            max_parallel,
229            config_max_parallel,
230            running: HashMap::new(),
231            event_rx,
232            event_tx,
233            task_timeout,
234            router,
235            available_agents,
236            dependency_context_budget: config.dependency_context_budget,
237            buffered_events: VecDeque::new(),
238            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
239            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
240            consecutive_spawn_failures: 0,
241            topology,
242            topology_dirty: false,
243            current_level: 0,
244            verify_completeness: config.verify_completeness,
245            verify_provider: config.verify_provider.trim().to_string(),
246            task_replan_counts: HashMap::new(),
247            global_replan_count: 0,
248            max_replans: config.max_replans,
249        })
250    }
251
252    /// Create a scheduler from a graph that is in `Paused` or `Failed` status.
253    ///
254    /// Used for resume and retry flows. The caller is responsible for calling
255    /// [`dag::reset_for_retry`] (for retry) before passing the graph here.
256    ///
257    /// This constructor sets `graph.status = Running` (II3) and reconstructs
258    /// the `running` map from tasks that are still in `Running` state (IC1), so
259    /// their completion events are not silently dropped on the next tick.
260    ///
261    /// # Errors
262    ///
263    /// Returns `OrchestrationError::InvalidGraph` if the graph is in `Completed`
264    /// or `Canceled` status (terminal states that cannot be resumed).
265    pub fn resume_from(
266        mut graph: TaskGraph,
267        config: &OrchestrationConfig,
268        router: Box<dyn AgentRouter>,
269        available_agents: Vec<SubAgentDef>,
270    ) -> Result<Self, OrchestrationError> {
271        if graph.status == GraphStatus::Completed || graph.status == GraphStatus::Canceled {
272            return Err(OrchestrationError::InvalidGraph(format!(
273                "cannot resume a {} graph; only Paused, Failed, or Running graphs are resumable",
274                graph.status
275            )));
276        }
277
278        // II3: ensure the graph is in Running state so tick() does not immediately
279        // return Done{Paused}.
280        graph.status = GraphStatus::Running;
281
282        // IC1: reconstruct the `running` map from tasks that were still Running at
283        // pause time. Without this their completion events would arrive but
284        // process_event would ignore them (it checks self.running), leaving the
285        // task stuck until timeout.
286        let running: HashMap<TaskId, RunningTask> = graph
287            .tasks
288            .iter()
289            .filter(|t| t.status == TaskStatus::Running)
290            .filter_map(|t| {
291                let handle_id = t.assigned_agent.clone()?;
292                let def_name = t.agent_hint.clone().unwrap_or_default();
293                Some((
294                    t.id,
295                    RunningTask {
296                        agent_handle_id: handle_id,
297                        agent_def_name: def_name,
298                        // Conservative: treat as just-started so timeout window is reset.
299                        started_at: Instant::now(),
300                    },
301                ))
302            })
303            .collect();
304
305        let (event_tx, event_rx) = mpsc::channel(64);
306
307        let task_timeout = if config.task_timeout_secs > 0 {
308            Duration::from_secs(config.task_timeout_secs)
309        } else {
310            Duration::from_secs(600)
311        };
312
313        let topology = TopologyClassifier::analyze(&graph, config);
314        let max_parallel = topology.max_parallel;
315        let config_max_parallel = config.max_parallel as usize;
316
317        Ok(Self {
318            graph,
319            max_parallel,
320            config_max_parallel,
321            running,
322            event_rx,
323            event_tx,
324            task_timeout,
325            router,
326            available_agents,
327            dependency_context_budget: config.dependency_context_budget,
328            buffered_events: VecDeque::new(),
329            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
330            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
331            consecutive_spawn_failures: 0,
332            topology,
333            topology_dirty: false,
334            current_level: 0,
335            verify_completeness: config.verify_completeness,
336            verify_provider: config.verify_provider.trim().to_string(),
337            task_replan_counts: HashMap::new(),
338            global_replan_count: 0,
339            max_replans: config.max_replans,
340        })
341    }
342
343    /// Validate that `verify_provider` references a known provider name.
344    ///
345    /// Call this after construction when `verify_completeness = true` to catch
346    /// misconfiguration early rather than failing open at runtime.
347    ///
348    /// - Empty `verify_provider` is always valid (falls back to the primary provider).
349    /// - If `provider_names` is empty, validation is skipped (provider set is unknown).
350    /// - Provider names are compared case-sensitively (matching the existing resolution convention).
351    ///
352    /// # Errors
353    ///
354    /// Returns `OrchestrationError::InvalidConfig` when `verify_completeness = true`,
355    /// `verify_provider` is non-empty, and the name is not present in `provider_names`.
356    pub fn validate_verify_config(
357        &self,
358        provider_names: &[&str],
359    ) -> Result<(), OrchestrationError> {
360        if !self.verify_completeness {
361            return Ok(());
362        }
363        let name = self.verify_provider.as_str();
364        if name.is_empty() || provider_names.is_empty() {
365            return Ok(());
366        }
367        if !provider_names.contains(&name) {
368            return Err(OrchestrationError::InvalidConfig(format!(
369                "verify_provider \"{}\" not found in [[llm.providers]]; available: [{}]",
370                name,
371                provider_names.join(", ")
372            )));
373        }
374        Ok(())
375    }
376
377    /// Get a clone of the event sender for injection into sub-agent loops.
378    #[must_use]
379    pub fn event_sender(&self) -> mpsc::Sender<TaskEvent> {
380        self.event_tx.clone()
381    }
382
    /// Immutable reference to the current graph state.
    ///
    /// Reflects every status change the scheduler has applied so far.
    #[must_use]
    pub fn graph(&self) -> &TaskGraph {
        &self.graph
    }
388
389    /// Return the final graph state.
390    ///
391    /// Clones the graph since `Drop` is implemented on the scheduler.
392    #[must_use]
393    pub fn into_graph(&self) -> TaskGraph {
394        self.graph.clone()
395    }
396
    /// Current topology analysis.
    ///
    /// After `inject_tasks()` this may be stale until the next `tick()`,
    /// which re-analyzes the topology when it has been marked dirty.
    #[must_use]
    pub fn topology(&self) -> &TopologyAnalysis {
        &self.topology
    }
402
403    /// Inject new tasks into the graph after a verify-replan cycle.
404    ///
405    /// Appends tasks and validates DAG acyclicity. Sets `topology_dirty=true` so
406    /// topology is re-analyzed at the start of the next `tick()`. Does NOT
407    /// re-analyze topology here (critic C2 — topology computed during injection
408    /// would be stale by the next tick).
409    ///
410    /// Per-task replan cap: each task is limited to 1 replan (critic S2).
411    /// Global hard cap: total replan count across the run is limited to `max_replans`.
412    ///
413    /// # Errors
414    ///
415    /// Returns `OrchestrationError::VerificationFailed` if the graph would exceed
416    /// `max_tasks` or injection introduces a cycle.
417    pub fn inject_tasks(
418        &mut self,
419        verified_task_id: TaskId,
420        new_tasks: Vec<TaskNode>,
421        max_tasks: usize,
422    ) -> Result<(), OrchestrationError> {
423        if new_tasks.is_empty() {
424            return Ok(());
425        }
426
427        // Per-task replan limit: 1 replan per task (critic S2).
428        let task_replan_count = self.task_replan_counts.entry(verified_task_id).or_insert(0);
429        if *task_replan_count >= 1 {
430            tracing::warn!(
431                task_id = %verified_task_id,
432                "per-task replan limit (1) reached, skipping replan injection"
433            );
434            return Ok(());
435        }
436
437        // Global hard cap (critic S2).
438        if self.global_replan_count >= self.max_replans {
439            tracing::warn!(
440                global_replan_count = self.global_replan_count,
441                max_replans = self.max_replans,
442                "global replan limit reached, skipping replan injection"
443            );
444            return Ok(());
445        }
446
447        verifier_inject_tasks(&mut self.graph, new_tasks, max_tasks)?;
448
449        *task_replan_count += 1;
450        self.global_replan_count += 1;
451
452        // Signal that topology needs re-analysis on the next tick (critic C2).
453        self.topology_dirty = true;
454
455        Ok(())
456    }
457}
458
459impl Drop for DagScheduler {
460    fn drop(&mut self) {
461        if !self.running.is_empty() {
462            tracing::warn!(
463                running_tasks = self.running.len(),
464                "DagScheduler dropped with running tasks; agents may continue until their \
465                 CancellationToken fires or they complete naturally"
466            );
467        }
468    }
469}
470
471impl DagScheduler {
472    /// Process pending events and produce actions for the caller.
473    ///
474    /// Call `wait_event` after processing all actions to block until the next event.
475    #[allow(clippy::too_many_lines)]
476    pub fn tick(&mut self) -> Vec<SchedulerAction> {
477        if self.graph.status != GraphStatus::Running {
478            return vec![SchedulerAction::Done {
479                status: self.graph.status,
480            }];
481        }
482
483        // Re-analyze topology when task set has changed (inject_tasks was called).
484        // Deferred from inject_tasks() to avoid stale analysis (critic C2).
485        if self.topology_dirty {
486            // Rebuild with topology_selection=true config equivalent: reuse existing analysis
487            // approach. Since we don't store the config here, re-analyze using a stub config
488            // that reflects what was originally set up.
489            let new_analysis = {
490                let n = self.graph.tasks.len();
491                if n == 0 {
492                    TopologyAnalysis {
493                        topology: Topology::AllParallel,
494                        strategy: DispatchStrategy::FullParallel,
495                        // Use the immutable config base so the fallback is consistent with
496                        // the initial analysis (not a previously reduced max_parallel).
497                        max_parallel: self.config_max_parallel,
498                        depth: 0,
499                        depths: std::collections::HashMap::new(),
500                    }
501                } else {
502                    // Compute depths once, then classify using pre-computed values.
503                    // This eliminates a redundant toposort compared to calling classify()
504                    // (which internally calls compute_longest_path_and_depths again).
505                    let (depth, depths) =
506                        super::topology::compute_depths_for_scheduler(&self.graph);
507                    let topo =
508                        TopologyClassifier::classify_with_depths(&self.graph, depth, &depths);
509                    let strategy = TopologyClassifier::strategy(topo);
510                    // ALWAYS use config_max_parallel as the base, never self.max_parallel,
511                    // to prevent drift across successive replan cycles (architect S2).
512                    let max_parallel =
513                        TopologyClassifier::compute_max_parallel(topo, self.config_max_parallel);
514                    TopologyAnalysis {
515                        topology: topo,
516                        strategy,
517                        max_parallel,
518                        depth,
519                        depths,
520                    }
521                }
522            };
523            self.topology = new_analysis;
524            // Sync max_parallel to match the newly computed topology analysis (architect S2).
525            // The dispatch logic at line 559 reads self.max_parallel for slot computation;
526            // without this sync, self.max_parallel and self.topology.max_parallel diverge.
527            self.max_parallel = self.topology.max_parallel;
528            self.topology_dirty = false;
529
530            // After re-analysis, reset the level pointer to the shallowest non-terminal depth
531            // so injected tasks at lower depths are not skipped by the LevelBarrier.
532            // Uses .min() (not unconditional reset to 0) to preserve forward progress when
533            // injected tasks are at equal or deeper levels than current_level.
534            if self.topology.strategy == DispatchStrategy::LevelBarrier {
535                let min_active = self
536                    .graph
537                    .tasks
538                    .iter()
539                    .filter(|t| !t.status.is_terminal())
540                    .filter_map(|t| self.topology.depths.get(&t.id).copied())
541                    .min();
542                if let Some(min_depth) = min_active {
543                    self.current_level = self.current_level.min(min_depth);
544                }
545            }
546        }
547
548        let mut actions = Vec::new();
549
550        // Drain events buffered by wait_event, then any new ones in the channel.
551        while let Some(event) = self.buffered_events.pop_front() {
552            let cancel_actions = self.process_event(event);
553            actions.extend(cancel_actions);
554        }
555        while let Ok(event) = self.event_rx.try_recv() {
556            let cancel_actions = self.process_event(event);
557            actions.extend(cancel_actions);
558        }
559
560        if self.graph.status != GraphStatus::Running {
561            return actions;
562        }
563
564        // Check for timed-out tasks.
565        let timeout_actions = self.check_timeouts();
566        actions.extend(timeout_actions);
567
568        if self.graph.status != GraphStatus::Running {
569            return actions;
570        }
571
572        // Dispatch ready tasks up to max_parallel slots. Concurrency is pre-enforced here
573        // (topology-aware cap) and also enforced by SubAgentManager::spawn() returning
574        // ConcurrencyLimit when active + reserved >= max_concurrent.
575        // Non-transient spawn failures are handled by record_spawn_failure(); optimistic
576        // Running marks are reverted to Ready for ConcurrencyLimit errors.
577        let ready = dag::ready_tasks(&self.graph);
578
579        // LevelBarrier: advance the current level when all tasks at this level are terminal.
580        // This handles failures/skips correctly because propagate_failure() uses BFS to
581        // transitively mark all non-terminal dependents as Skipped before tick() runs dispatch.
582        if self.topology.strategy == DispatchStrategy::LevelBarrier {
583            let all_current_level_terminal = self.graph.tasks.iter().all(|t| {
584                let task_depth = self
585                    .topology
586                    .depths
587                    .get(&t.id)
588                    .copied()
589                    .unwrap_or(usize::MAX);
590                task_depth != self.current_level || t.status.is_terminal()
591            });
592            if all_current_level_terminal {
593                // Advance until we find a level with non-terminal tasks, or exhaust all levels.
594                let max_depth = self.topology.depth;
595                while self.current_level <= max_depth {
596                    let has_non_terminal = self.graph.tasks.iter().any(|t| {
597                        let d = self
598                            .topology
599                            .depths
600                            .get(&t.id)
601                            .copied()
602                            .unwrap_or(usize::MAX);
603                        d == self.current_level && !t.status.is_terminal()
604                    });
605                    if has_non_terminal {
606                        break;
607                    }
608                    self.current_level += 1;
609                }
610            }
611        }
612
613        // Available dispatch slots for this tick.
614        let mut slots = self.max_parallel.saturating_sub(self.running.len());
615
616        // For sequential dispatch: track whether we already scheduled one sequential task
617        // this tick AND whether any sequential task is currently running.
618        let mut sequential_spawned_this_tick = false;
619        let has_running_sequential = self
620            .running
621            .keys()
622            .any(|tid| self.graph.tasks[tid.index()].execution_mode == ExecutionMode::Sequential);
623
624        for task_id in ready {
625            if slots == 0 {
626                break;
627            }
628
629            // LevelBarrier: only dispatch tasks at the current level.
630            if self.topology.strategy == DispatchStrategy::LevelBarrier {
631                let task_depth = self
632                    .topology
633                    .depths
634                    .get(&task_id)
635                    .copied()
636                    .unwrap_or(usize::MAX);
637                if task_depth != self.current_level {
638                    continue;
639                }
640            }
641
642            let task = &self.graph.tasks[task_id.index()];
643
644            // Sequential tasks: only one may run at a time within the scheduler.
645            // Independent sequential tasks in separate DAG branches are still
646            // serialized here (they share exclusive-resource intent by annotation).
647            if task.execution_mode == ExecutionMode::Sequential {
648                if sequential_spawned_this_tick || has_running_sequential {
649                    continue;
650                }
651                sequential_spawned_this_tick = true;
652            }
653
654            let Some(agent_def_name) = self.router.route(task, &self.available_agents) else {
655                tracing::debug!(
656                    task_id = %task_id,
657                    title = %task.title,
658                    "no agent available, routing task to main agent inline"
659                );
660                let prompt = self.build_task_prompt(task);
661                self.graph.tasks[task_id.index()].status = TaskStatus::Running;
662                actions.push(SchedulerAction::RunInline { task_id, prompt });
663                slots -= 1;
664                continue;
665            };
666
667            let prompt = self.build_task_prompt(task);
668
669            // Mark task as Running optimistically (before record_spawn is called).
670            self.graph.tasks[task_id.index()].status = TaskStatus::Running;
671
672            actions.push(SchedulerAction::Spawn {
673                task_id,
674                agent_def_name,
675                prompt,
676            });
677            slots -= 1;
678        }
679
680        // Check for completion or deadlock.
681        // Use graph Running status count to avoid false positives while Spawn actions
682        // are in-flight (record_spawn hasn't been called yet for freshly emitted spawns).
683        // Note: non-transient spawn failures (e.g. capability errors) are handled by
684        // record_spawn_failure() which marks the task Failed and propagates failure per
685        // the task's FailureStrategy — this detector does not fire for those cases because
686        // failed tasks are terminal and dag::ready_tasks() returns their unblocked dependents.
687        // ConcurrencyLimit errors are transient: record_spawn_failure() reverts the task
688        // from Running back to Ready, so ready_tasks() is non-empty and deadlock is not
689        // triggered.
690        let running_in_graph_now = self
691            .graph
692            .tasks
693            .iter()
694            .filter(|t| t.status == TaskStatus::Running)
695            .count();
696        if running_in_graph_now == 0 && self.running.is_empty() {
697            let all_terminal = self.graph.tasks.iter().all(|t| t.status.is_terminal());
698            if all_terminal {
699                self.graph.status = GraphStatus::Completed;
700                self.graph.finished_at = Some(super::graph::chrono_now());
701                actions.push(SchedulerAction::Done {
702                    status: GraphStatus::Completed,
703                });
704            } else if dag::ready_tasks(&self.graph).is_empty() {
705                tracing::error!(
706                    "scheduler deadlock: no running or ready tasks, but graph not complete"
707                );
708                self.graph.status = GraphStatus::Failed;
709                self.graph.finished_at = Some(super::graph::chrono_now());
710                // Invariant: deadlock fires only when self.running is empty (checked above).
711                debug_assert!(
712                    self.running.is_empty(),
713                    "deadlock branch reached with non-empty running map"
714                );
715                for task in &mut self.graph.tasks {
716                    if !task.status.is_terminal() {
717                        task.status = TaskStatus::Canceled;
718                    }
719                }
720                actions.push(SchedulerAction::Done {
721                    status: GraphStatus::Failed,
722                });
723            }
724        }
725
726        actions
727    }
728
729    /// Wait for the next event from a running sub-agent.
730    ///
731    /// Buffers the received event for processing in the next `tick` call.
732    /// Returns immediately if no tasks are running. Uses a timeout so that
733    /// periodic timeout checking can occur.
734    /// Compute the current deferral backoff with exponential growth capped at 5 seconds.
735    ///
736    /// Each consecutive spawn failure due to concurrency limits doubles the base backoff.
737    fn current_deferral_backoff(&self) -> Duration {
738        const MAX_BACKOFF: Duration = Duration::from_secs(5);
739        let multiplier = 1u32
740            .checked_shl(self.consecutive_spawn_failures.min(10))
741            .unwrap_or(u32::MAX);
742        self.deferral_backoff
743            .saturating_mul(multiplier)
744            .min(MAX_BACKOFF)
745    }
746
747    pub async fn wait_event(&mut self) {
748        if self.running.is_empty() {
749            tokio::time::sleep(self.current_deferral_backoff()).await;
750            return;
751        }
752
753        // Find the nearest timeout deadline among running tasks.
754        let nearest_timeout = self
755            .running
756            .values()
757            .map(|r| {
758                self.task_timeout
759                    .checked_sub(r.started_at.elapsed())
760                    .unwrap_or(Duration::ZERO)
761            })
762            .min()
763            .unwrap_or(Duration::from_secs(1));
764
765        // Clamp to at least 100 ms to avoid busy-looping.
766        let wait_duration = nearest_timeout.max(Duration::from_millis(100));
767
768        tokio::select! {
769            Some(event) = self.event_rx.recv() => {
770                // SEC-ORCH-02: guard against unbounded buffer growth. Use total task
771                // count rather than max_parallel so that parallel bursts exceeding
772                // max_parallel do not cause premature event drops.
773                if self.buffered_events.len() >= self.graph.tasks.len() * 2 {
774                    // PERF-SCHED-02: log at error level — a dropped completion event
775                    // leaves a task stuck in Running until its timeout fires.
776                    if let Some(dropped) = self.buffered_events.pop_front() {
777                        tracing::error!(
778                            task_id = %dropped.task_id,
779                            buffer_len = self.buffered_events.len(),
780                            "event buffer saturated; completion event dropped — task may \
781                             remain Running until timeout"
782                        );
783                    }
784                }
785                self.buffered_events.push_back(event);
786            }
787            () = tokio::time::sleep(wait_duration) => {}
788        }
789    }
790
791    /// Record that a spawn action was successfully executed.
792    ///
793    /// Called by the caller after successfully spawning via `SubAgentManager`.
794    ///
795    /// Resets `consecutive_spawn_failures` to 0 as a "spawn succeeded = scheduler healthy"
796    /// signal. This is intentionally separate from the batch-level backoff in
797    /// [`record_batch_backoff`]: `record_spawn` provides an immediate reset on the first
798    /// success within a batch, while `record_batch_backoff` governs the tick-granular
799    /// failure counter used for exponential wait backoff.
800    pub fn record_spawn(
801        &mut self,
802        task_id: TaskId,
803        agent_handle_id: String,
804        agent_def_name: String,
805    ) {
806        self.consecutive_spawn_failures = 0;
807        self.graph.tasks[task_id.index()].assigned_agent = Some(agent_handle_id.clone());
808        self.running.insert(
809            task_id,
810            RunningTask {
811                agent_handle_id,
812                agent_def_name,
813                started_at: Instant::now(),
814            },
815        );
816    }
817
818    /// Handle a failed spawn attempt.
819    ///
820    /// If the error is a transient concurrency-limit rejection, reverts the task from
821    /// Running back to `Ready` so the next [`tick`] can retry the spawn when a slot opens.
822    /// Otherwise, marks the task as `Failed` and propagates failure.
823    /// Returns any cancel actions needed.
824    ///
825    /// # Errors (via returned actions)
826    ///
827    /// Propagates failure per the task's effective `FailureStrategy`.
828    pub fn record_spawn_failure(
829        &mut self,
830        task_id: TaskId,
831        error: &SubAgentError,
832    ) -> Vec<SchedulerAction> {
833        // Transient condition: the SubAgentManager rejected the spawn because all
834        // concurrency slots are occupied. Revert to Ready so the next tick retries.
835        // consecutive_spawn_failures is updated batch-wide by record_batch_backoff().
836        if let SubAgentError::ConcurrencyLimit { active, max } = error {
837            tracing::warn!(
838                task_id = %task_id,
839                active,
840                max,
841                next_backoff_ms = self.current_deferral_backoff().as_millis(),
842                "concurrency limit reached, deferring task to next tick"
843            );
844            self.graph.tasks[task_id.index()].status = TaskStatus::Ready;
845            return Vec::new();
846        }
847
848        // SEC-ORCH-04: truncate error to avoid logging sensitive internal details.
849        let error_excerpt: String = error.to_string().chars().take(512).collect();
850        tracing::warn!(
851            task_id = %task_id,
852            error = %error_excerpt,
853            "spawn failed, marking task failed"
854        );
855        self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
856        let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
857        let mut actions = Vec::new();
858        for cancel_task_id in cancel_ids {
859            if let Some(running) = self.running.remove(&cancel_task_id) {
860                actions.push(SchedulerAction::Cancel {
861                    agent_handle_id: running.agent_handle_id,
862                });
863            }
864        }
865        if self.graph.status != GraphStatus::Running {
866            self.graph.finished_at = Some(super::graph::chrono_now());
867            actions.push(SchedulerAction::Done {
868                status: self.graph.status,
869            });
870        }
871        actions
872    }
873
874    /// Update the batch-level backoff counter after processing a full tick's spawn batch.
875    ///
876    /// With parallel dispatch a single tick may produce N Spawn actions. Individual
877    /// per-spawn counter updates would miscount concurrent rejections as "consecutive"
878    /// failures. This method captures the batch semantics instead:
879    /// - If any spawn succeeded → reset the counter (scheduler is healthy).
880    /// - Else if any spawn hit `ConcurrencyLimit` → this entire tick was a deferral tick.
881    /// - If neither → no spawns were attempted; counter unchanged.
882    pub fn record_batch_backoff(&mut self, any_success: bool, any_concurrency_failure: bool) {
883        if any_success {
884            self.consecutive_spawn_failures = 0;
885        } else if any_concurrency_failure {
886            self.consecutive_spawn_failures = self.consecutive_spawn_failures.saturating_add(1);
887        }
888    }
889
890    /// Cancel all running tasks (for user-initiated plan cancellation).
891    ///
892    /// # Warning: Cooperative Cancellation
893    ///
894    /// Cancellation is cooperative and asynchronous. Tool operations (file writes, shell
895    /// executions) in progress at the time of cancellation complete before the agent loop
896    /// checks the cancellation token. Callers should inspect the task graph state and clean
897    /// up partially-written artifacts manually.
898    pub fn cancel_all(&mut self) -> Vec<SchedulerAction> {
899        self.graph.status = GraphStatus::Canceled;
900        self.graph.finished_at = Some(super::graph::chrono_now());
901
902        // Drain running map first to avoid split borrow issues (M3).
903        let running: Vec<(TaskId, RunningTask)> = self.running.drain().collect();
904        let mut actions: Vec<SchedulerAction> = running
905            .into_iter()
906            .map(|(task_id, r)| {
907                self.graph.tasks[task_id.index()].status = TaskStatus::Canceled;
908                SchedulerAction::Cancel {
909                    agent_handle_id: r.agent_handle_id,
910                }
911            })
912            .collect();
913
914        for task in &mut self.graph.tasks {
915            if !task.status.is_terminal() {
916                task.status = TaskStatus::Canceled;
917            }
918        }
919
920        actions.push(SchedulerAction::Done {
921            status: GraphStatus::Canceled,
922        });
923        actions
924    }
925}
926
927impl DagScheduler {
928    /// Process a single `TaskEvent` and return any cancel actions needed.
929    fn process_event(&mut self, event: TaskEvent) -> Vec<SchedulerAction> {
930        let TaskEvent {
931            task_id,
932            agent_handle_id,
933            outcome,
934        } = event;
935
936        // Guard against stale events from previous incarnations (e.g. after timeout+retry).
937        // A timed-out agent's event_tx outlives the timeout and may send a completion later.
938        match self.running.get(&task_id) {
939            Some(running) if running.agent_handle_id != agent_handle_id => {
940                tracing::warn!(
941                    task_id = %task_id,
942                    expected = %running.agent_handle_id,
943                    got = %agent_handle_id,
944                    "discarding stale event from previous agent incarnation"
945                );
946                return Vec::new();
947            }
948            None => {
949                tracing::debug!(
950                    task_id = %task_id,
951                    agent_handle_id = %agent_handle_id,
952                    "ignoring event for task not in running map"
953                );
954                return Vec::new();
955            }
956            Some(_) => {}
957        }
958
959        // Compute duration BEFORE removing from running map (C1 fix).
960        let duration_ms = self.running.get(&task_id).map_or(0, |r| {
961            u64::try_from(r.started_at.elapsed().as_millis()).unwrap_or(u64::MAX)
962        });
963        let agent_def_name = self.running.get(&task_id).map(|r| r.agent_def_name.clone());
964
965        self.running.remove(&task_id);
966
967        match outcome {
968            TaskOutcome::Completed { output, artifacts } => {
969                self.graph.tasks[task_id.index()].status = TaskStatus::Completed;
970                self.graph.tasks[task_id.index()].result = Some(TaskResult {
971                    output: output.clone(),
972                    artifacts,
973                    duration_ms,
974                    agent_id: Some(agent_handle_id),
975                    agent_def: agent_def_name,
976                });
977
978                // Mark newly unblocked tasks as Ready.
979                // Downstream tasks are unblocked immediately — verification does not gate dispatch.
980                let newly_ready = dag::ready_tasks(&self.graph);
981                for ready_id in newly_ready {
982                    if self.graph.tasks[ready_id.index()].status == TaskStatus::Pending {
983                        self.graph.tasks[ready_id.index()].status = TaskStatus::Ready;
984                    }
985                }
986
987                // Emit Verify action when verify_completeness is enabled.
988                // The replan budget is enforced inside inject_tasks() — the observation
989                // (emitting Verify) must not be gated on the mutation budget, or tasks
990                // after budget exhaustion never receive verification at all.
991                // max_replans=0 still emits Verify; gaps are logged only (no inject_tasks call).
992                if self.verify_completeness {
993                    vec![SchedulerAction::Verify { task_id, output }]
994                } else {
995                    Vec::new()
996                }
997            }
998
999            TaskOutcome::Failed { error } => {
1000                // SEC-ORCH-04: truncate error to avoid logging sensitive internal details.
1001                let error_excerpt: String = error.chars().take(512).collect();
1002                tracing::warn!(
1003                    task_id = %task_id,
1004                    error = %error_excerpt,
1005                    "task failed"
1006                );
1007                self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
1008
1009                let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
1010                let mut actions = Vec::new();
1011
1012                for cancel_task_id in cancel_ids {
1013                    if let Some(running) = self.running.remove(&cancel_task_id) {
1014                        actions.push(SchedulerAction::Cancel {
1015                            agent_handle_id: running.agent_handle_id,
1016                        });
1017                    }
1018                }
1019
1020                if self.graph.status != GraphStatus::Running {
1021                    self.graph.finished_at = Some(super::graph::chrono_now());
1022                    actions.push(SchedulerAction::Done {
1023                        status: self.graph.status,
1024                    });
1025                }
1026
1027                actions
1028            }
1029        }
1030    }
1031
1032    /// Check all running tasks for timeout violations.
1033    ///
1034    /// # Warning: Cooperative Cancellation
1035    ///
1036    /// Cancel actions emitted here signal agents cooperatively. Tool operations in progress
1037    /// at the time of cancellation complete before the agent loop checks the cancellation
1038    /// token. Partially-written artifacts may remain on disk after cancellation.
1039    fn check_timeouts(&mut self) -> Vec<SchedulerAction> {
1040        let timed_out: Vec<(TaskId, String)> = self
1041            .running
1042            .iter()
1043            .filter(|(_, r)| r.started_at.elapsed() > self.task_timeout)
1044            .map(|(id, r)| (*id, r.agent_handle_id.clone()))
1045            .collect();
1046
1047        let mut actions = Vec::new();
1048        for (task_id, agent_handle_id) in timed_out {
1049            tracing::warn!(
1050                task_id = %task_id,
1051                timeout_secs = self.task_timeout.as_secs(),
1052                "task timed out"
1053            );
1054            self.running.remove(&task_id);
1055            self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
1056
1057            actions.push(SchedulerAction::Cancel { agent_handle_id });
1058
1059            let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
1060            for cancel_task_id in cancel_ids {
1061                if let Some(running) = self.running.remove(&cancel_task_id) {
1062                    actions.push(SchedulerAction::Cancel {
1063                        agent_handle_id: running.agent_handle_id,
1064                    });
1065                }
1066            }
1067
1068            if self.graph.status != GraphStatus::Running {
1069                self.graph.finished_at = Some(super::graph::chrono_now());
1070                actions.push(SchedulerAction::Done {
1071                    status: self.graph.status,
1072                });
1073                break;
1074            }
1075        }
1076
1077        actions
1078    }
1079
1080    /// Build the task prompt with dependency context injection (Section 14).
1081    ///
1082    /// Uses char-boundary-safe truncation (S1 fix) to avoid panics on multi-byte UTF-8.
1083    /// Dependency output is sanitized (SEC-ORCH-01) and titles are XML-escaped to prevent
1084    /// prompt injection via crafted task outputs.
1085    fn build_task_prompt(&self, task: &TaskNode) -> String {
1086        if task.depends_on.is_empty() {
1087            return task.description.clone();
1088        }
1089
1090        let completed_deps: Vec<&TaskNode> = task
1091            .depends_on
1092            .iter()
1093            .filter_map(|dep_id| {
1094                let dep = &self.graph.tasks[dep_id.index()];
1095                if dep.status == TaskStatus::Completed {
1096                    Some(dep)
1097                } else {
1098                    None
1099                }
1100            })
1101            .collect();
1102
1103        if completed_deps.is_empty() {
1104            return task.description.clone();
1105        }
1106
1107        let budget_per_dep = self
1108            .dependency_context_budget
1109            .checked_div(completed_deps.len())
1110            .unwrap_or(self.dependency_context_budget);
1111
1112        let mut context_block = String::from("<completed-dependencies>\n");
1113
1114        for dep in &completed_deps {
1115            // SEC-ORCH-01: XML-escape dep.id and dep.title to prevent breaking out of the
1116            // <completed-dependencies> wrapper via crafted titles.
1117            let escaped_id = xml_escape(&dep.id.to_string());
1118            let escaped_title = xml_escape(&dep.title);
1119            let _ = writeln!(
1120                context_block,
1121                "## Task \"{escaped_id}\": \"{escaped_title}\" (completed)",
1122            );
1123
1124            if let Some(ref result) = dep.result {
1125                // SEC-ORCH-01: sanitize dep output to prevent prompt injection from upstream tasks.
1126                let source = ContentSource::new(ContentSourceKind::A2aMessage);
1127                let sanitized = self.sanitizer.sanitize(&result.output, source);
1128                let safe_output = sanitized.body;
1129
1130                // Char-boundary-safe truncation (S1): use chars().take() instead of byte slicing.
1131                let char_count = safe_output.chars().count();
1132                if char_count > budget_per_dep {
1133                    let truncated: String = safe_output.chars().take(budget_per_dep).collect();
1134                    let _ = write!(
1135                        context_block,
1136                        "{truncated}...\n[truncated: {char_count} chars total]"
1137                    );
1138                } else {
1139                    context_block.push_str(&safe_output);
1140                }
1141            } else {
1142                context_block.push_str("[no output recorded]\n");
1143            }
1144            context_block.push('\n');
1145        }
1146
1147        // Add notes for skipped deps.
1148        for dep_id in &task.depends_on {
1149            let dep = &self.graph.tasks[dep_id.index()];
1150            if dep.status == TaskStatus::Skipped {
1151                let escaped_id = xml_escape(&dep.id.to_string());
1152                let escaped_title = xml_escape(&dep.title);
1153                let _ = writeln!(
1154                    context_block,
1155                    "## Task \"{escaped_id}\": \"{escaped_title}\" (skipped -- no output available)\n",
1156                );
1157            }
1158        }
1159
1160        context_block.push_str("</completed-dependencies>\n\n");
1161        format!("{context_block}Your task: {}", task.description)
1162    }
1163}
1164
/// Escape XML special characters in a string to prevent tag injection.
///
/// Replaces `<`, `>`, `&`, `"` and `'` with `&lt;`, `&gt;`, `&amp;`, `&quot;` and
/// `&#39;` respectively; every other character is copied through unchanged.
fn xml_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let entity = match ch {
            '&' => "&amp;",
            '<' => "&lt;",
            '>' => "&gt;",
            '"' => "&quot;",
            '\'' => "&#39;",
            _ => {
                escaped.push(ch);
                continue;
            }
        };
        escaped.push_str(entity);
    }
    escaped
}
1180
1181#[cfg(test)]
1182mod tests {
1183    #![allow(clippy::default_trait_access)]
1184
1185    use super::*;
1186    use crate::graph::{FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus};
1187
1188    fn make_node(id: u32, deps: &[u32]) -> TaskNode {
1189        let mut n = TaskNode::new(
1190            id,
1191            format!("task-{id}"),
1192            format!("description for task {id}"),
1193        );
1194        n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
1195        n
1196    }
1197
1198    fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
1199        let mut g = TaskGraph::new("test goal");
1200        g.tasks = nodes;
1201        g
1202    }
1203
1204    fn make_def(name: &str) -> SubAgentDef {
1205        use zeph_subagent::{SkillFilter, SubAgentPermissions, SubagentHooks, ToolPolicy};
1206        SubAgentDef {
1207            name: name.to_string(),
1208            description: format!("{name} agent"),
1209            model: None,
1210            tools: ToolPolicy::InheritAll,
1211            disallowed_tools: vec![],
1212            permissions: SubAgentPermissions::default(),
1213            skills: SkillFilter::default(),
1214            system_prompt: String::new(),
1215            hooks: SubagentHooks::default(),
1216            memory: None,
1217            source: None,
1218            file_path: None,
1219        }
1220    }
1221
1222    fn make_config() -> zeph_config::OrchestrationConfig {
1223        zeph_config::OrchestrationConfig {
1224            enabled: true,
1225            max_tasks: 20,
1226            max_parallel: 4,
1227            default_failure_strategy: "abort".to_string(),
1228            default_max_retries: 3,
1229            task_timeout_secs: 300,
1230            planner_provider: String::new(),
1231            planner_max_tokens: 4096,
1232            dependency_context_budget: 16384,
1233            confirm_before_execute: true,
1234            aggregator_max_tokens: 4096,
1235            deferral_backoff_ms: 250,
1236            plan_cache: zeph_config::PlanCacheConfig::default(),
1237            topology_selection: false,
1238            verify_provider: String::new(),
1239            verify_max_tokens: 1024,
1240            max_replans: 2,
1241            verify_completeness: false,
1242        }
1243    }
1244
1245    struct FirstRouter;
1246    impl AgentRouter for FirstRouter {
1247        fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
1248            available.first().map(|d| d.name.clone())
1249        }
1250    }
1251
    /// Router stub that never selects an agent, regardless of the task or the
    /// available definitions.
    struct NoneRouter;
    impl AgentRouter for NoneRouter {
        fn route(&self, _task: &TaskNode, _available: &[SubAgentDef]) -> Option<String> {
            None
        }
    }
1258
1259    fn make_scheduler_with_router(graph: TaskGraph, router: Box<dyn AgentRouter>) -> DagScheduler {
1260        let config = make_config();
1261        let defs = vec![make_def("worker")];
1262        DagScheduler::new(graph, &config, router, defs).unwrap()
1263    }
1264
1265    fn make_scheduler(graph: TaskGraph) -> DagScheduler {
1266        let config = make_config();
1267        let defs = vec![make_def("worker")];
1268        DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap()
1269    }
1270
1271    // --- constructor tests ---
1272
1273    #[test]
1274    fn test_new_validates_graph_status() {
1275        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1276        graph.status = GraphStatus::Running; // wrong status
1277        let config = make_config();
1278        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
1279        assert!(result.is_err());
1280        let err = result.unwrap_err();
1281        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1282    }
1283
1284    #[test]
1285    fn test_new_marks_roots_ready() {
1286        let graph = graph_from_nodes(vec![
1287            make_node(0, &[]),
1288            make_node(1, &[]),
1289            make_node(2, &[0, 1]),
1290        ]);
1291        let scheduler = make_scheduler(graph);
1292        assert_eq!(scheduler.graph().tasks[0].status, TaskStatus::Ready);
1293        assert_eq!(scheduler.graph().tasks[1].status, TaskStatus::Ready);
1294        assert_eq!(scheduler.graph().tasks[2].status, TaskStatus::Pending);
1295        assert_eq!(scheduler.graph().status, GraphStatus::Running);
1296    }
1297
1298    #[test]
1299    fn test_new_validates_empty_graph() {
1300        let graph = graph_from_nodes(vec![]);
1301        let config = make_config();
1302        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
1303        assert!(result.is_err());
1304    }
1305
1306    // --- tick tests ---
1307
1308    #[test]
1309    fn test_tick_produces_spawn_for_ready() {
1310        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1311        let mut scheduler = make_scheduler(graph);
1312        let actions = scheduler.tick();
1313        let spawns: Vec<_> = actions
1314            .iter()
1315            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1316            .collect();
1317        assert_eq!(spawns.len(), 2);
1318    }
1319
1320    #[test]
1321    fn test_tick_dispatches_all_regardless_of_max_parallel() {
1322        // tick() enforces max_parallel as a pre-dispatch cap.
1323        // With 5 independent tasks and max_parallel=2, only 2 are dispatched per tick.
1324        let graph = graph_from_nodes(vec![
1325            make_node(0, &[]),
1326            make_node(1, &[]),
1327            make_node(2, &[]),
1328            make_node(3, &[]),
1329            make_node(4, &[]),
1330        ]);
1331        let mut config = make_config();
1332        config.max_parallel = 2;
1333        let defs = vec![make_def("worker")];
1334        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1335        let actions = scheduler.tick();
1336        let spawn_count = actions
1337            .iter()
1338            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1339            .count();
1340        assert_eq!(
1341            spawn_count, 2,
1342            "max_parallel=2 caps dispatched tasks per tick"
1343        );
1344    }
1345
1346    #[test]
1347    fn test_tick_detects_completion() {
1348        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1349        graph.tasks[0].status = TaskStatus::Completed;
1350        let config = make_config();
1351        let defs = vec![make_def("worker")];
1352        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1353        // Manually set graph to Running since new() validated Created status
1354        // — but all tasks are terminal. tick() should detect completion.
1355        let actions = scheduler.tick();
1356        let has_done = actions.iter().any(|a| {
1357            matches!(
1358                a,
1359                SchedulerAction::Done {
1360                    status: GraphStatus::Completed
1361                }
1362            )
1363        });
1364        assert!(
1365            has_done,
1366            "should emit Done(Completed) when all tasks are terminal"
1367        );
1368    }
1369
1370    // --- completion event tests ---
1371
1372    #[test]
1373    fn test_completion_event_marks_deps_ready() {
1374        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1375        let mut scheduler = make_scheduler(graph);
1376
1377        // Simulate task 0 running.
1378        scheduler.graph.tasks[0].status = TaskStatus::Running;
1379        scheduler.running.insert(
1380            TaskId(0),
1381            RunningTask {
1382                agent_handle_id: "handle-0".to_string(),
1383                agent_def_name: "worker".to_string(),
1384                started_at: Instant::now(),
1385            },
1386        );
1387
1388        let event = TaskEvent {
1389            task_id: TaskId(0),
1390            agent_handle_id: "handle-0".to_string(),
1391            outcome: TaskOutcome::Completed {
1392                output: "done".to_string(),
1393                artifacts: vec![],
1394            },
1395        };
1396        scheduler.buffered_events.push_back(event);
1397
1398        let actions = scheduler.tick();
1399        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Completed);
1400        // Task 1 should now be Ready or Spawn action emitted.
1401        let has_spawn_1 = actions
1402            .iter()
1403            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(1)));
1404        assert!(
1405            has_spawn_1 || scheduler.graph.tasks[1].status == TaskStatus::Ready,
1406            "task 1 should be spawned or marked Ready"
1407        );
1408    }
1409
1410    #[test]
1411    fn test_failure_abort_cancels_running() {
1412        let graph = graph_from_nodes(vec![
1413            make_node(0, &[]),
1414            make_node(1, &[]),
1415            make_node(2, &[0, 1]),
1416        ]);
1417        let mut scheduler = make_scheduler(graph);
1418
1419        // Simulate tasks 0 and 1 running.
1420        scheduler.graph.tasks[0].status = TaskStatus::Running;
1421        scheduler.running.insert(
1422            TaskId(0),
1423            RunningTask {
1424                agent_handle_id: "h0".to_string(),
1425                agent_def_name: "worker".to_string(),
1426                started_at: Instant::now(),
1427            },
1428        );
1429        scheduler.graph.tasks[1].status = TaskStatus::Running;
1430        scheduler.running.insert(
1431            TaskId(1),
1432            RunningTask {
1433                agent_handle_id: "h1".to_string(),
1434                agent_def_name: "worker".to_string(),
1435                started_at: Instant::now(),
1436            },
1437        );
1438
1439        // Task 0 fails with default Abort strategy.
1440        let event = TaskEvent {
1441            task_id: TaskId(0),
1442            agent_handle_id: "h0".to_string(),
1443            outcome: TaskOutcome::Failed {
1444                error: "boom".to_string(),
1445            },
1446        };
1447        scheduler.buffered_events.push_back(event);
1448
1449        let actions = scheduler.tick();
1450        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1451        let cancel_ids: Vec<_> = actions
1452            .iter()
1453            .filter_map(|a| {
1454                if let SchedulerAction::Cancel { agent_handle_id } = a {
1455                    Some(agent_handle_id.as_str())
1456                } else {
1457                    None
1458                }
1459            })
1460            .collect();
1461        assert!(cancel_ids.contains(&"h1"), "task 1 should be canceled");
1462        assert!(
1463            actions
1464                .iter()
1465                .any(|a| matches!(a, SchedulerAction::Done { .. }))
1466        );
1467    }
1468
1469    #[test]
1470    fn test_failure_skip_propagates() {
1471        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1472        let mut scheduler = make_scheduler(graph);
1473
1474        // Set failure strategy to Skip on task 0.
1475        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
1476        scheduler.graph.tasks[0].status = TaskStatus::Running;
1477        scheduler.running.insert(
1478            TaskId(0),
1479            RunningTask {
1480                agent_handle_id: "h0".to_string(),
1481                agent_def_name: "worker".to_string(),
1482                started_at: Instant::now(),
1483            },
1484        );
1485
1486        let event = TaskEvent {
1487            task_id: TaskId(0),
1488            agent_handle_id: "h0".to_string(),
1489            outcome: TaskOutcome::Failed {
1490                error: "skip me".to_string(),
1491            },
1492        };
1493        scheduler.buffered_events.push_back(event);
1494        scheduler.tick();
1495
1496        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Skipped);
1497        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Skipped);
1498    }
1499
1500    #[test]
1501    fn test_failure_retry_reschedules() {
1502        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1503        let mut scheduler = make_scheduler(graph);
1504
1505        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
1506        scheduler.graph.tasks[0].max_retries = Some(3);
1507        scheduler.graph.tasks[0].retry_count = 0;
1508        scheduler.graph.tasks[0].status = TaskStatus::Running;
1509        scheduler.running.insert(
1510            TaskId(0),
1511            RunningTask {
1512                agent_handle_id: "h0".to_string(),
1513                agent_def_name: "worker".to_string(),
1514                started_at: Instant::now(),
1515            },
1516        );
1517
1518        let event = TaskEvent {
1519            task_id: TaskId(0),
1520            agent_handle_id: "h0".to_string(),
1521            outcome: TaskOutcome::Failed {
1522                error: "transient".to_string(),
1523            },
1524        };
1525        scheduler.buffered_events.push_back(event);
1526        let actions = scheduler.tick();
1527
1528        // Task should be rescheduled (Ready) and a Spawn action emitted.
1529        let has_spawn = actions
1530            .iter()
1531            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
1532        assert!(
1533            has_spawn || scheduler.graph.tasks[0].status == TaskStatus::Ready,
1534            "retry should produce spawn or Ready status"
1535        );
1536        // retry_count incremented
1537        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
1538    }
1539
1540    #[test]
1541    fn test_process_event_failed_retry() {
1542        // End-to-end: send Failed event, verify retry path produces Ready -> Spawn.
1543        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1544        let mut scheduler = make_scheduler(graph);
1545
1546        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
1547        scheduler.graph.tasks[0].max_retries = Some(2);
1548        scheduler.graph.tasks[0].retry_count = 0;
1549        scheduler.graph.tasks[0].status = TaskStatus::Running;
1550        scheduler.running.insert(
1551            TaskId(0),
1552            RunningTask {
1553                agent_handle_id: "h0".to_string(),
1554                agent_def_name: "worker".to_string(),
1555                started_at: Instant::now(),
1556            },
1557        );
1558
1559        let event = TaskEvent {
1560            task_id: TaskId(0),
1561            agent_handle_id: "h0".to_string(),
1562            outcome: TaskOutcome::Failed {
1563                error: "first failure".to_string(),
1564            },
1565        };
1566        scheduler.buffered_events.push_back(event);
1567        let actions = scheduler.tick();
1568
1569        // After retry: retry_count = 1, status = Ready or Spawn emitted.
1570        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
1571        let spawned = actions
1572            .iter()
1573            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
1574        assert!(
1575            spawned || scheduler.graph.tasks[0].status == TaskStatus::Ready,
1576            "retry should emit Spawn or set Ready"
1577        );
1578        // Graph must still be Running.
1579        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1580    }
1581
1582    #[test]
1583    fn test_timeout_cancels_stalled() {
1584        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1585        let mut config = make_config();
1586        config.task_timeout_secs = 1; // 1 second timeout
1587        let defs = vec![make_def("worker")];
1588        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1589
1590        // Simulate a running task that started just over 1 second ago.
1591        scheduler.graph.tasks[0].status = TaskStatus::Running;
1592        scheduler.running.insert(
1593            TaskId(0),
1594            RunningTask {
1595                agent_handle_id: "h0".to_string(),
1596                agent_def_name: "worker".to_string(),
1597                started_at: Instant::now().checked_sub(Duration::from_secs(2)).unwrap(), // already timed out
1598            },
1599        );
1600
1601        let actions = scheduler.tick();
1602        let has_cancel = actions.iter().any(
1603            |a| matches!(a, SchedulerAction::Cancel { agent_handle_id } if agent_handle_id == "h0"),
1604        );
1605        assert!(has_cancel, "timed-out task should emit Cancel action");
1606        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1607    }
1608
1609    #[test]
1610    fn test_cancel_all() {
1611        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1612        let mut scheduler = make_scheduler(graph);
1613
1614        scheduler.graph.tasks[0].status = TaskStatus::Running;
1615        scheduler.running.insert(
1616            TaskId(0),
1617            RunningTask {
1618                agent_handle_id: "h0".to_string(),
1619                agent_def_name: "worker".to_string(),
1620                started_at: Instant::now(),
1621            },
1622        );
1623        scheduler.graph.tasks[1].status = TaskStatus::Running;
1624        scheduler.running.insert(
1625            TaskId(1),
1626            RunningTask {
1627                agent_handle_id: "h1".to_string(),
1628                agent_def_name: "worker".to_string(),
1629                started_at: Instant::now(),
1630            },
1631        );
1632
1633        let actions = scheduler.cancel_all();
1634
1635        assert_eq!(scheduler.graph.status, GraphStatus::Canceled);
1636        assert!(scheduler.running.is_empty());
1637        let cancel_count = actions
1638            .iter()
1639            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1640            .count();
1641        assert_eq!(cancel_count, 2);
1642        assert!(actions.iter().any(|a| matches!(
1643            a,
1644            SchedulerAction::Done {
1645                status: GraphStatus::Canceled
1646            }
1647        )));
1648    }
1649
1650    #[test]
1651    fn test_record_spawn_failure() {
1652        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1653        let mut scheduler = make_scheduler(graph);
1654
1655        // Simulate task marked Running (by tick) but spawn failed.
1656        scheduler.graph.tasks[0].status = TaskStatus::Running;
1657
1658        let error = SubAgentError::Spawn("spawn error".to_string());
1659        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1660        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1661        // With Abort strategy and no other running tasks, graph should be Failed.
1662        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1663        assert!(
1664            actions
1665                .iter()
1666                .any(|a| matches!(a, SchedulerAction::Done { .. }))
1667        );
1668    }
1669
1670    #[test]
1671    fn test_record_spawn_failure_concurrency_limit_reverts_to_ready() {
1672        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1673        let mut scheduler = make_scheduler(graph);
1674
1675        // Simulate tick() optimistically marking the task Running before spawn.
1676        scheduler.graph.tasks[0].status = TaskStatus::Running;
1677
1678        // Concurrency limit hit — transient, should not fail the task.
1679        let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
1680        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1681        assert_eq!(
1682            scheduler.graph.tasks[0].status,
1683            TaskStatus::Ready,
1684            "task must revert to Ready so the next tick can retry"
1685        );
1686        assert_eq!(
1687            scheduler.graph.status,
1688            GraphStatus::Running,
1689            "graph must stay Running, not transition to Failed"
1690        );
1691        assert!(
1692            actions.is_empty(),
1693            "no cancel or done actions expected for a transient deferral"
1694        );
1695    }
1696
1697    #[test]
1698    fn test_record_spawn_failure_concurrency_limit_variant_spawn_for_task() {
1699        // Both spawn() and resume() now return SubAgentError::ConcurrencyLimit — verify handling.
1700        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1701        let mut scheduler = make_scheduler(graph);
1702        scheduler.graph.tasks[0].status = TaskStatus::Running;
1703
1704        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1705        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1706        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1707        assert!(actions.is_empty());
1708    }
1709
1710    // --- #1516 edge-case tests ---
1711
1712    #[test]
1713    fn test_concurrency_deferral_does_not_affect_running_task() {
1714        // Two root tasks. Task 0 is Running (successfully spawned).
1715        // Task 1 hits a concurrency limit and reverts to Ready.
1716        // Task 0 must be unaffected.
1717        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1718        let mut scheduler = make_scheduler(graph);
1719
1720        // Simulate both tasks optimistically marked Running by tick().
1721        scheduler.graph.tasks[0].status = TaskStatus::Running;
1722        scheduler.running.insert(
1723            TaskId(0),
1724            RunningTask {
1725                agent_handle_id: "h0".to_string(),
1726                agent_def_name: "worker".to_string(),
1727                started_at: Instant::now(),
1728            },
1729        );
1730        scheduler.graph.tasks[1].status = TaskStatus::Running;
1731
1732        // Task 1 spawn fails with concurrency limit.
1733        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1734        let actions = scheduler.record_spawn_failure(TaskId(1), &error);
1735
1736        assert_eq!(
1737            scheduler.graph.tasks[0].status,
1738            TaskStatus::Running,
1739            "task 0 must remain Running"
1740        );
1741        assert_eq!(
1742            scheduler.graph.tasks[1].status,
1743            TaskStatus::Ready,
1744            "task 1 must revert to Ready"
1745        );
1746        assert_eq!(
1747            scheduler.graph.status,
1748            GraphStatus::Running,
1749            "graph must stay Running"
1750        );
1751        assert!(actions.is_empty(), "no cancel or done actions expected");
1752    }
1753
1754    #[test]
1755    fn test_max_concurrent_zero_no_infinite_loop() {
1756        // max_parallel=0 is a degenerate config. tick() uses saturating_sub so slots=0,
1757        // and no tasks are dispatched. The graph does not deadlock because ready tasks
1758        // still exist — the caller must increase max_parallel or handle this externally.
1759        // After max_parallel is increased and a new tick fires, tasks will be dispatched.
1760        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1761        let config = zeph_config::OrchestrationConfig {
1762            max_parallel: 0,
1763            ..make_config()
1764        };
1765        let mut scheduler = DagScheduler::new(
1766            graph,
1767            &config,
1768            Box::new(FirstRouter),
1769            vec![make_def("worker")],
1770        )
1771        .unwrap();
1772
1773        let actions1 = scheduler.tick();
1774        // No Spawn: slots = max_parallel(0) - running(0) = 0.
1775        assert!(
1776            actions1
1777                .iter()
1778                .all(|a| !matches!(a, SchedulerAction::Spawn { .. })),
1779            "no Spawn expected when max_parallel=0"
1780        );
1781        assert!(
1782            actions1
1783                .iter()
1784                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
1785            "no Done(Failed) expected — ready tasks exist, so no deadlock"
1786        );
1787        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1788
1789        // Second tick also dispatches nothing (still max_parallel=0, ready task exists).
1790        let actions2 = scheduler.tick();
1791        assert!(
1792            actions2
1793                .iter()
1794                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
1795            "second tick must not emit Done(Failed) — ready tasks still exist"
1796        );
1797        assert_eq!(
1798            scheduler.graph.status,
1799            GraphStatus::Running,
1800            "graph must remain Running"
1801        );
1802    }
1803
1804    #[test]
1805    fn test_all_tasks_deferred_graph_stays_running() {
1806        // Both root tasks are spawned optimistically, both fail with ConcurrencyLimit,
1807        // and both revert to Ready. The graph must remain Running (not Failed) and
1808        // the next tick must re-emit Spawn actions for the deferred tasks.
1809        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1810        let mut scheduler = make_scheduler(graph);
1811
1812        // First tick emits Spawn for both tasks and marks them Running.
1813        let actions = scheduler.tick();
1814        assert_eq!(
1815            actions
1816                .iter()
1817                .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1818                .count(),
1819            2,
1820            "expected 2 Spawn actions on first tick"
1821        );
1822        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
1823        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Running);
1824
1825        // Both spawns fail — both revert to Ready.
1826        let error = SubAgentError::ConcurrencyLimit { active: 2, max: 2 };
1827        let r0 = scheduler.record_spawn_failure(TaskId(0), &error);
1828        let r1 = scheduler.record_spawn_failure(TaskId(1), &error);
1829        assert!(r0.is_empty() && r1.is_empty(), "no cancel/done on deferral");
1830        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1831        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Ready);
1832        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1833
1834        // Second tick must retry both deferred tasks.
1835        let retry_actions = scheduler.tick();
1836        let spawn_count = retry_actions
1837            .iter()
1838            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1839            .count();
1840        assert!(
1841            spawn_count > 0,
1842            "second tick must re-emit Spawn for deferred tasks"
1843        );
1844        assert!(
1845            retry_actions.iter().all(|a| !matches!(
1846                a,
1847                SchedulerAction::Done {
1848                    status: GraphStatus::Failed,
1849                    ..
1850                }
1851            )),
1852            "no Done(Failed) expected"
1853        );
1854    }
1855
1856    #[test]
1857    fn test_build_prompt_no_deps() {
1858        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1859        let scheduler = make_scheduler(graph);
1860        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[0]);
1861        assert_eq!(prompt, "description for task 0");
1862    }
1863
1864    #[test]
1865    fn test_build_prompt_with_deps_and_truncation() {
1866        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1867        graph.tasks[0].status = TaskStatus::Completed;
1868        // Create output longer than budget
1869        graph.tasks[0].result = Some(TaskResult {
1870            output: "x".repeat(200),
1871            artifacts: vec![],
1872            duration_ms: 10,
1873            agent_id: None,
1874            agent_def: None,
1875        });
1876
1877        let config = zeph_config::OrchestrationConfig {
1878            dependency_context_budget: 50,
1879            ..make_config()
1880        };
1881        let scheduler = DagScheduler::new(
1882            graph,
1883            &config,
1884            Box::new(FirstRouter),
1885            vec![make_def("worker")],
1886        )
1887        .unwrap();
1888
1889        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1890        assert!(prompt.contains("<completed-dependencies>"));
1891        assert!(prompt.contains("[truncated:"));
1892        assert!(prompt.contains("Your task:"));
1893    }
1894
1895    #[test]
1896    fn test_duration_ms_computed_correctly() {
1897        // Regression test for C1: duration_ms must be non-zero after completion.
1898        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1899        let mut scheduler = make_scheduler(graph);
1900
1901        scheduler.graph.tasks[0].status = TaskStatus::Running;
1902        scheduler.running.insert(
1903            TaskId(0),
1904            RunningTask {
1905                agent_handle_id: "h0".to_string(),
1906                agent_def_name: "worker".to_string(),
1907                started_at: Instant::now()
1908                    .checked_sub(Duration::from_millis(50))
1909                    .unwrap(),
1910            },
1911        );
1912
1913        let event = TaskEvent {
1914            task_id: TaskId(0),
1915            agent_handle_id: "h0".to_string(),
1916            outcome: TaskOutcome::Completed {
1917                output: "result".to_string(),
1918                artifacts: vec![],
1919            },
1920        };
1921        scheduler.buffered_events.push_back(event);
1922        scheduler.tick();
1923
1924        let result = scheduler.graph.tasks[0].result.as_ref().unwrap();
1925        assert!(
1926            result.duration_ms > 0,
1927            "duration_ms should be > 0, got {}",
1928            result.duration_ms
1929        );
1930    }
1931
1932    #[test]
1933    fn test_utf8_safe_truncation() {
1934        // S1 regression: truncation must not panic on multi-byte UTF-8.
1935        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1936        graph.tasks[0].status = TaskStatus::Completed;
1937        // Unicode: each char is 3 bytes in UTF-8.
1938        let unicode_output = "日本語テスト".repeat(100);
1939        graph.tasks[0].result = Some(TaskResult {
1940            output: unicode_output,
1941            artifacts: vec![],
1942            duration_ms: 10,
1943            agent_id: None,
1944            agent_def: None,
1945        });
1946
1947        // Budget large enough to hold the spotlighting wrapper + some Japanese chars.
1948        // The sanitizer adds ~200 chars of spotlight header, so 500 chars is sufficient.
1949        let config = zeph_config::OrchestrationConfig {
1950            dependency_context_budget: 500,
1951            ..make_config()
1952        };
1953        let scheduler = DagScheduler::new(
1954            graph,
1955            &config,
1956            Box::new(FirstRouter),
1957            vec![make_def("worker")],
1958        )
1959        .unwrap();
1960
1961        // Must not panic, and Japanese chars must be preserved in the output.
1962        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1963        assert!(
1964            prompt.contains("日"),
1965            "Japanese characters should be in the prompt after safe truncation"
1966        );
1967    }
1968
1969    #[test]
1970    fn test_no_agent_routes_inline() {
1971        // NoneRouter: when no agent matches, task falls back to RunInline.
1972        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1973        let mut scheduler = make_scheduler_with_router(graph, Box::new(NoneRouter));
1974        let actions = scheduler.tick();
1975        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
1976        assert!(
1977            actions
1978                .iter()
1979                .any(|a| matches!(a, SchedulerAction::RunInline { .. }))
1980        );
1981    }
1982
1983    #[test]
1984    fn test_stale_event_rejected() {
1985        // Regression: events from a previous agent incarnation must be discarded.
1986        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1987        let mut scheduler = make_scheduler(graph);
1988
1989        // Simulate task running with handle "current-handle".
1990        scheduler.graph.tasks[0].status = TaskStatus::Running;
1991        scheduler.running.insert(
1992            TaskId(0),
1993            RunningTask {
1994                agent_handle_id: "current-handle".to_string(),
1995                agent_def_name: "worker".to_string(),
1996                started_at: Instant::now(),
1997            },
1998        );
1999
2000        // Send a completion event from the OLD agent (stale handle).
2001        let stale_event = TaskEvent {
2002            task_id: TaskId(0),
2003            agent_handle_id: "old-handle".to_string(),
2004            outcome: TaskOutcome::Completed {
2005                output: "stale output".to_string(),
2006                artifacts: vec![],
2007            },
2008        };
2009        scheduler.buffered_events.push_back(stale_event);
2010        let actions = scheduler.tick();
2011
2012        // Stale event must be discarded — task must NOT be completed.
2013        assert_ne!(
2014            scheduler.graph.tasks[0].status,
2015            TaskStatus::Completed,
2016            "stale event must not complete the task"
2017        );
2018        // No Spawn or Done actions should result from a discarded stale event.
2019        let has_done = actions
2020            .iter()
2021            .any(|a| matches!(a, SchedulerAction::Done { .. }));
2022        assert!(
2023            !has_done,
2024            "no Done action should be emitted for a stale event"
2025        );
2026        // Task must still be in the running map.
2027        assert!(
2028            scheduler.running.contains_key(&TaskId(0)),
2029            "running task must remain after stale event"
2030        );
2031    }
2032
2033    #[test]
2034    fn test_build_prompt_chars_count_in_truncation_message() {
2035        // Fix #3: truncation message must report char count, not byte count.
2036        // Use pure ASCII so sanitization doesn't significantly change char count.
2037        // Budget < output length => truncation triggered; verify the count label is "chars total".
2038        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2039        graph.tasks[0].status = TaskStatus::Completed;
2040        // ASCII output: byte count == char count, so both old and new code produce the same number,
2041        // but the label "chars total" (not "bytes total") is what matters here.
2042        let output = "x".repeat(200);
2043        graph.tasks[0].result = Some(TaskResult {
2044            output,
2045            artifacts: vec![],
2046            duration_ms: 10,
2047            agent_id: None,
2048            agent_def: None,
2049        });
2050
2051        let config = zeph_config::OrchestrationConfig {
2052            dependency_context_budget: 10, // truncate: sanitized output >> 10 chars
2053            ..make_config()
2054        };
2055        let scheduler = DagScheduler::new(
2056            graph,
2057            &config,
2058            Box::new(FirstRouter),
2059            vec![make_def("worker")],
2060        )
2061        .unwrap();
2062
2063        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
2064        // Truncation must have been triggered and the message must use "chars total" label.
2065        assert!(
2066            prompt.contains("chars total"),
2067            "truncation message must use 'chars total' label. Prompt: {prompt}"
2068        );
2069        assert!(
2070            prompt.contains("[truncated:"),
2071            "prompt must contain truncation notice. Prompt: {prompt}"
2072        );
2073    }
2074
2075    // --- resume_from tests (MT-1) ---
2076
2077    #[test]
2078    fn test_resume_from_accepts_paused_graph() {
2079        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2080        graph.status = GraphStatus::Paused;
2081        graph.tasks[0].status = TaskStatus::Pending;
2082
2083        let scheduler =
2084            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2085                .expect("resume_from should accept Paused graph");
2086        assert_eq!(scheduler.graph.status, GraphStatus::Running);
2087    }
2088
2089    #[test]
2090    fn test_resume_from_accepts_failed_graph() {
2091        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2092        graph.status = GraphStatus::Failed;
2093        graph.tasks[0].status = TaskStatus::Failed;
2094
2095        let scheduler =
2096            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2097                .expect("resume_from should accept Failed graph");
2098        assert_eq!(scheduler.graph.status, GraphStatus::Running);
2099    }
2100
2101    #[test]
2102    fn test_resume_from_rejects_completed_graph() {
2103        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2104        graph.status = GraphStatus::Completed;
2105
2106        let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2107            .unwrap_err();
2108        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
2109    }
2110
2111    #[test]
2112    fn test_resume_from_rejects_canceled_graph() {
2113        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2114        graph.status = GraphStatus::Canceled;
2115
2116        let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2117            .unwrap_err();
2118        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
2119    }
2120
2121    #[test]
2122    fn test_resume_from_reconstructs_running_tasks() {
2123        // IC1: tasks that were Running at pause time must appear in the running map.
2124        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2125        graph.status = GraphStatus::Paused;
2126        graph.tasks[0].status = TaskStatus::Running;
2127        graph.tasks[0].assigned_agent = Some("handle-abc".to_string());
2128        graph.tasks[0].agent_hint = Some("worker".to_string());
2129        graph.tasks[1].status = TaskStatus::Pending;
2130
2131        let scheduler =
2132            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2133                .expect("should succeed");
2134
2135        assert!(
2136            scheduler.running.contains_key(&TaskId(0)),
2137            "Running task must be reconstructed in the running map (IC1)"
2138        );
2139        assert_eq!(scheduler.running[&TaskId(0)].agent_handle_id, "handle-abc");
2140        assert!(
2141            !scheduler.running.contains_key(&TaskId(1)),
2142            "Pending task must not appear in running map"
2143        );
2144    }
2145
2146    #[test]
2147    fn test_resume_from_sets_status_running() {
2148        // II3: resume_from must set graph.status = Running regardless of input status.
2149        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2150        graph.status = GraphStatus::Paused;
2151
2152        let scheduler =
2153            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2154                .unwrap();
2155        assert_eq!(scheduler.graph.status, GraphStatus::Running);
2156    }
2157
2158    // --- #1619 regression tests: consecutive_spawn_failures + exponential backoff ---
2159
2160    #[test]
2161    fn test_consecutive_spawn_failures_increments_on_concurrency_limit() {
2162        // Each tick where all spawns hit ConcurrencyLimit must increment the counter
2163        // via record_batch_backoff(false, true).
2164        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2165        let mut scheduler = make_scheduler(graph);
2166        scheduler.graph.tasks[0].status = TaskStatus::Running;
2167
2168        assert_eq!(scheduler.consecutive_spawn_failures, 0, "starts at zero");
2169
2170        let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
2171        scheduler.record_spawn_failure(TaskId(0), &error);
2172        // record_spawn_failure no longer increments; batch_backoff does.
2173        scheduler.record_batch_backoff(false, true);
2174        assert_eq!(
2175            scheduler.consecutive_spawn_failures, 1,
2176            "first deferral tick: consecutive_spawn_failures must be 1"
2177        );
2178
2179        scheduler.graph.tasks[0].status = TaskStatus::Running;
2180        scheduler.record_spawn_failure(TaskId(0), &error);
2181        scheduler.record_batch_backoff(false, true);
2182        assert_eq!(
2183            scheduler.consecutive_spawn_failures, 2,
2184            "second deferral tick: consecutive_spawn_failures must be 2"
2185        );
2186
2187        scheduler.graph.tasks[0].status = TaskStatus::Running;
2188        scheduler.record_spawn_failure(TaskId(0), &error);
2189        scheduler.record_batch_backoff(false, true);
2190        assert_eq!(
2191            scheduler.consecutive_spawn_failures, 3,
2192            "third deferral tick: consecutive_spawn_failures must be 3"
2193        );
2194    }
2195
2196    #[test]
2197    fn test_consecutive_spawn_failures_resets_on_success() {
2198        // record_spawn() after deferrals must reset consecutive_spawn_failures to 0
2199        // (via record_spawn internal reset; record_batch_backoff(true, _) also resets).
2200        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2201        let mut scheduler = make_scheduler(graph);
2202        scheduler.graph.tasks[0].status = TaskStatus::Running;
2203
2204        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2205        scheduler.record_spawn_failure(TaskId(0), &error);
2206        scheduler.record_batch_backoff(false, true);
2207        scheduler.graph.tasks[0].status = TaskStatus::Running;
2208        scheduler.record_spawn_failure(TaskId(0), &error);
2209        scheduler.record_batch_backoff(false, true);
2210        assert_eq!(scheduler.consecutive_spawn_failures, 2);
2211
2212        // Successful spawn resets the counter directly in record_spawn.
2213        scheduler.record_spawn(TaskId(0), "handle-0".to_string(), "worker".to_string());
2214        assert_eq!(
2215            scheduler.consecutive_spawn_failures, 0,
2216            "record_spawn must reset consecutive_spawn_failures to 0"
2217        );
2218    }
2219
2220    #[tokio::test]
2221    async fn test_exponential_backoff_duration() {
2222        // With consecutive_spawn_failures=0, backoff equals the base interval.
2223        // With consecutive_spawn_failures=3, backoff = min(base * 8, 5000ms).
2224        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2225        let config = zeph_config::OrchestrationConfig {
2226            deferral_backoff_ms: 50,
2227            ..make_config()
2228        };
2229        let mut scheduler = DagScheduler::new(
2230            graph,
2231            &config,
2232            Box::new(FirstRouter),
2233            vec![make_def("worker")],
2234        )
2235        .unwrap();
2236
2237        // consecutive_spawn_failures=0 → sleep ≈ 50ms (base).
2238        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2239        let start = tokio::time::Instant::now();
2240        scheduler.wait_event().await;
2241        let elapsed0 = start.elapsed();
2242        assert!(
2243            elapsed0.as_millis() >= 50,
2244            "backoff with 0 deferrals must be >= base (50ms), got {}ms",
2245            elapsed0.as_millis()
2246        );
2247
2248        // Simulate 3 consecutive deferrals: multiplier = 2^3 = 8 → 400ms, capped at 5000ms.
2249        scheduler.consecutive_spawn_failures = 3;
2250        let start = tokio::time::Instant::now();
2251        scheduler.wait_event().await;
2252        let elapsed3 = start.elapsed();
2253        assert!(
2254            elapsed3.as_millis() >= 400,
2255            "backoff with 3 deferrals must be >= 400ms (50 * 8), got {}ms",
2256            elapsed3.as_millis()
2257        );
2258
2259        // Simulate 20 consecutive deferrals: exponent capped at 10 → 50 * 1024 = 51200 → capped at 5000ms.
2260        scheduler.consecutive_spawn_failures = 20;
2261        let start = tokio::time::Instant::now();
2262        scheduler.wait_event().await;
2263        let elapsed20 = start.elapsed();
2264        assert!(
2265            elapsed20.as_millis() >= 5000,
2266            "backoff must be capped at 5000ms with high deferrals, got {}ms",
2267            elapsed20.as_millis()
2268        );
2269    }
2270
2271    // --- deferral_backoff regression test ---
2272
2273    #[tokio::test]
2274    async fn test_wait_event_sleeps_deferral_backoff_when_running_empty() {
2275        // Regression for issue #1519: wait_event must sleep deferral_backoff when
2276        // running is empty, preventing a busy spin-loop.
2277        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2278        let config = zeph_config::OrchestrationConfig {
2279            deferral_backoff_ms: 50,
2280            ..make_config()
2281        };
2282        let mut scheduler = DagScheduler::new(
2283            graph,
2284            &config,
2285            Box::new(FirstRouter),
2286            vec![make_def("worker")],
2287        )
2288        .unwrap();
2289
2290        // Do not start any tasks — running map stays empty.
2291        assert!(scheduler.running.is_empty());
2292
2293        let start = tokio::time::Instant::now();
2294        scheduler.wait_event().await;
2295        let elapsed = start.elapsed();
2296
2297        assert!(
2298            elapsed.as_millis() >= 50,
2299            "wait_event must sleep at least deferral_backoff (50ms) when running is empty, but only slept {}ms",
2300            elapsed.as_millis()
2301        );
2302    }
2303
2304    #[test]
2305    fn test_current_deferral_backoff_exponential_growth() {
2306        // Regression for issue #1618: backoff must grow exponentially with consecutive
2307        // spawn failures so the scheduler does not busy-spin at 250ms when saturated.
2308        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2309        let config = zeph_config::OrchestrationConfig {
2310            deferral_backoff_ms: 250,
2311            ..make_config()
2312        };
2313        let mut scheduler = DagScheduler::new(
2314            graph,
2315            &config,
2316            Box::new(FirstRouter),
2317            vec![make_def("worker")],
2318        )
2319        .unwrap();
2320
2321        assert_eq!(
2322            scheduler.current_deferral_backoff(),
2323            Duration::from_millis(250)
2324        );
2325
2326        scheduler.consecutive_spawn_failures = 1;
2327        assert_eq!(
2328            scheduler.current_deferral_backoff(),
2329            Duration::from_millis(500)
2330        );
2331
2332        scheduler.consecutive_spawn_failures = 2;
2333        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(1));
2334
2335        scheduler.consecutive_spawn_failures = 3;
2336        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(2));
2337
2338        scheduler.consecutive_spawn_failures = 4;
2339        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(4));
2340
2341        // Cap at 5 seconds.
2342        scheduler.consecutive_spawn_failures = 5;
2343        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2344
2345        scheduler.consecutive_spawn_failures = 100;
2346        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2347    }
2348
2349    #[test]
2350    fn test_record_spawn_resets_consecutive_failures() {
2351        // Regression for issue #1618: a successful spawn resets the backoff counter.
2352        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2353        let mut scheduler = DagScheduler::new(
2354            graph,
2355            &make_config(),
2356            Box::new(FirstRouter),
2357            vec![make_def("worker")],
2358        )
2359        .unwrap();
2360
2361        scheduler.consecutive_spawn_failures = 3;
2362        let task_id = TaskId(0);
2363        scheduler.graph.tasks[0].status = TaskStatus::Running;
2364        scheduler.record_spawn(task_id, "handle-1".into(), "worker".into());
2365
2366        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2367    }
2368
2369    #[test]
2370    fn test_record_spawn_failure_reverts_to_ready_no_counter_change() {
2371        // record_spawn_failure(ConcurrencyLimit) reverts task to Ready but does NOT
2372        // change consecutive_spawn_failures — that is the job of record_batch_backoff.
2373        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2374        let mut scheduler = DagScheduler::new(
2375            graph,
2376            &make_config(),
2377            Box::new(FirstRouter),
2378            vec![make_def("worker")],
2379        )
2380        .unwrap();
2381
2382        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2383        let task_id = TaskId(0);
2384        scheduler.graph.tasks[0].status = TaskStatus::Running;
2385
2386        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2387        scheduler.record_spawn_failure(task_id, &error);
2388
2389        // Counter unchanged — batch_backoff is responsible for incrementing.
2390        assert_eq!(scheduler.consecutive_spawn_failures, 0);
2391        // Task reverted to Ready.
2392        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
2393    }
2394
2395    // --- #1628 parallel dispatch tests ---
2396
2397    #[test]
2398    fn test_parallel_dispatch_all_ready() {
2399        // tick() enforces max_parallel as a pre-dispatch cap. With 6 independent tasks
2400        // and max_parallel=2, only 2 tasks are dispatched per tick.
2401        let nodes: Vec<_> = (0..6).map(|i| make_node(i, &[])).collect();
2402        let graph = graph_from_nodes(nodes);
2403        let config = zeph_config::OrchestrationConfig {
2404            max_parallel: 2,
2405            ..make_config()
2406        };
2407        let mut scheduler = DagScheduler::new(
2408            graph,
2409            &config,
2410            Box::new(FirstRouter),
2411            vec![make_def("worker")],
2412        )
2413        .unwrap();
2414
2415        let actions = scheduler.tick();
2416        let spawn_count = actions
2417            .iter()
2418            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2419            .count();
2420        assert_eq!(
2421            spawn_count, 2,
2422            "only max_parallel=2 tasks dispatched per tick"
2423        );
2424
2425        let running_count = scheduler
2426            .graph
2427            .tasks
2428            .iter()
2429            .filter(|t| t.status == TaskStatus::Running)
2430            .count();
2431        assert_eq!(running_count, 2, "only 2 tasks marked Running");
2432    }
2433
2434    #[test]
2435    fn test_batch_backoff_partial_success() {
2436        // Some spawns succeed, some hit ConcurrencyLimit: counter resets to 0.
2437        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2438        let mut scheduler = make_scheduler(graph);
2439        scheduler.consecutive_spawn_failures = 3;
2440
2441        scheduler.record_batch_backoff(true, true);
2442        assert_eq!(
2443            scheduler.consecutive_spawn_failures, 0,
2444            "any success in batch must reset counter"
2445        );
2446    }
2447
2448    #[test]
2449    fn test_batch_backoff_all_failed() {
2450        // All spawns hit ConcurrencyLimit: counter increments by 1.
2451        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2452        let mut scheduler = make_scheduler(graph);
2453        scheduler.consecutive_spawn_failures = 2;
2454
2455        scheduler.record_batch_backoff(false, true);
2456        assert_eq!(
2457            scheduler.consecutive_spawn_failures, 3,
2458            "all-failure tick must increment counter"
2459        );
2460    }
2461
2462    #[test]
2463    fn test_batch_backoff_no_spawns() {
2464        // No spawn actions in tick: counter unchanged.
2465        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2466        let mut scheduler = make_scheduler(graph);
2467        scheduler.consecutive_spawn_failures = 5;
2468
2469        scheduler.record_batch_backoff(false, false);
2470        assert_eq!(
2471            scheduler.consecutive_spawn_failures, 5,
2472            "no spawns must not change counter"
2473        );
2474    }
2475
2476    #[test]
2477    fn test_buffer_guard_uses_task_count() {
2478        // Structural guard: verifies that the buffer capacity expression uses
2479        // graph.tasks.len() * 2 rather than max_parallel * 2. This is an intentional
2480        // regression-prevention test — if wait_event() is accidentally reverted to
2481        // max_parallel * 2 the assertion below catches the discrepancy.
2482        // Behavioral coverage (actual buffer drop prevention) requires an async harness
2483        // with a real channel, which is outside the scope of this unit test.
2484        //
2485        // Scenario: 10 tasks, max_parallel=2 → tasks.len()*2=20, max_parallel*2=4.
2486        // The guard must use 20, not 4.
2487        let nodes: Vec<_> = (0..10).map(|i| make_node(i, &[])).collect();
2488        let graph = graph_from_nodes(nodes);
2489        let config = zeph_config::OrchestrationConfig {
2490            max_parallel: 2, // 2*2=4, but tasks.len()*2=20
2491            ..make_config()
2492        };
2493        let scheduler = DagScheduler::new(
2494            graph,
2495            &config,
2496            Box::new(FirstRouter),
2497            vec![make_def("worker")],
2498        )
2499        .unwrap();
2500        // Confirm: tasks.len() * 2 = 20, max_parallel * 2 = 4.
2501        assert_eq!(scheduler.graph.tasks.len() * 2, 20);
2502        assert_eq!(scheduler.max_parallel * 2, 4);
2503    }
2504
2505    #[test]
2506    fn test_batch_mixed_concurrency_and_fatal_failure() {
2507        // Mixed batch: task 0 gets ConcurrencyLimit (transient), task 1 gets a
2508        // non-transient Spawn error (fatal). Two independent tasks, no deps between them.
2509        // Verify:
2510        // - task 0 reverts to Ready (retried next tick)
2511        // - task 1 is marked Failed; with FailureStrategy::Skip the graph stays Running
2512        //   because task 1 has no dependents that would abort the graph
2513        // - record_batch_backoff(false, true) increments counter by 1
2514        let mut nodes = vec![make_node(0, &[]), make_node(1, &[])];
2515        // FailureStrategy::Skip: task 1 fails but its absence is ignored.
2516        nodes[1].failure_strategy = Some(FailureStrategy::Skip);
2517        let graph = graph_from_nodes(nodes);
2518        let mut scheduler = make_scheduler(graph);
2519
2520        // Optimistically mark both as Running (as tick() would do).
2521        scheduler.graph.tasks[0].status = TaskStatus::Running;
2522        scheduler.graph.tasks[1].status = TaskStatus::Running;
2523
2524        // Task 0: ConcurrencyLimit (transient).
2525        let concurrency_err = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2526        let actions0 = scheduler.record_spawn_failure(TaskId(0), &concurrency_err);
2527        assert!(
2528            actions0.is_empty(),
2529            "ConcurrencyLimit must produce no extra actions"
2530        );
2531        assert_eq!(
2532            scheduler.graph.tasks[0].status,
2533            TaskStatus::Ready,
2534            "task 0 must revert to Ready"
2535        );
2536
2537        // Task 1: non-transient Spawn failure. record_spawn_failure marks it Failed,
2538        // then propagate_failure applies FailureStrategy::Skip → status becomes Skipped.
2539        let fatal_err = SubAgentError::Spawn("provider unavailable".to_string());
2540        let actions1 = scheduler.record_spawn_failure(TaskId(1), &fatal_err);
2541        assert_eq!(
2542            scheduler.graph.tasks[1].status,
2543            TaskStatus::Skipped,
2544            "task 1: Skip strategy turns Failed into Skipped via propagate_failure"
2545        );
2546        // No Done action from record_spawn_failure — graph still has task 0 alive.
2547        assert!(
2548            actions1
2549                .iter()
2550                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
2551            "no Done action expected: task 0 is still Ready"
2552        );
2553
2554        // Batch result: no success, one ConcurrencyLimit failure.
2555        scheduler.consecutive_spawn_failures = 0;
2556        scheduler.record_batch_backoff(false, true);
2557        assert_eq!(
2558            scheduler.consecutive_spawn_failures, 1,
2559            "batch with only ConcurrencyLimit must increment counter"
2560        );
2561    }
2562
2563    /// Regression for #1879: when the scheduler detects a deadlock (no running or ready tasks,
2564    /// but the graph is not complete), all non-terminal tasks must be marked Canceled, not left
2565    /// in their previous status (e.g. Pending).
2566    #[test]
2567    fn test_deadlock_marks_non_terminal_tasks_canceled() {
2568        // Build a graph in Failed status (as if a prior retry pass left task 0 failed and
2569        // task 1/2 still Pending). resume_from() transitions it to Running without resetting
2570        // task statuses, so tick() immediately sees no running, no ready, not all terminal —
2571        // triggering the deadlock branch.
2572        let mut nodes = vec![make_node(0, &[]), make_node(1, &[0]), make_node(2, &[0])];
2573        nodes[0].status = TaskStatus::Failed;
2574        nodes[1].status = TaskStatus::Pending;
2575        nodes[2].status = TaskStatus::Pending;
2576
2577        let mut graph = graph_from_nodes(nodes);
2578        graph.status = GraphStatus::Failed;
2579
2580        let mut scheduler = DagScheduler::resume_from(
2581            graph,
2582            &make_config(),
2583            Box::new(FirstRouter),
2584            vec![make_def("worker")],
2585        )
2586        .unwrap();
2587
2588        // After resume_from, graph is Running but no tasks are Ready/Running — deadlock.
2589        let actions = scheduler.tick();
2590
2591        // Must emit Done(Failed).
2592        assert!(
2593            actions.iter().any(|a| matches!(
2594                a,
2595                SchedulerAction::Done {
2596                    status: GraphStatus::Failed
2597                }
2598            )),
2599            "deadlock must emit Done(Failed); got: {actions:?}"
2600        );
2601        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
2602
2603        // task 0 was already Failed (terminal) — must remain unchanged.
2604        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
2605        // task 1 was Pending (non-terminal) — must be Canceled.
2606        assert_eq!(
2607            scheduler.graph.tasks[1].status,
2608            TaskStatus::Canceled,
2609            "Pending task must be Canceled on deadlock"
2610        );
2611        // task 2 was Pending (non-terminal) — must be Canceled.
2612        assert_eq!(
2613            scheduler.graph.tasks[2].status,
2614            TaskStatus::Canceled,
2615            "Pending task must be Canceled on deadlock"
2616        );
2617    }
2618
2619    /// Regression for #1879: deadlock with one task Running should NOT trigger the deadlock
2620    /// branch (running_in_graph_now > 0 suppresses the check).
2621    #[test]
2622    fn test_deadlock_not_triggered_when_task_running() {
2623        // Graph in Failed with one task still marked Running — resume_from reconstructs
2624        // the running map. tick() sees running_in_graph_now > 0 and skips deadlock check.
2625        let mut nodes = vec![make_node(0, &[]), make_node(1, &[0])];
2626        nodes[0].status = TaskStatus::Running;
2627        nodes[0].assigned_agent = Some("handle-1".into());
2628        nodes[1].status = TaskStatus::Pending;
2629
2630        let mut graph = graph_from_nodes(nodes);
2631        graph.status = GraphStatus::Failed;
2632
2633        let mut scheduler = DagScheduler::resume_from(
2634            graph,
2635            &make_config(),
2636            Box::new(FirstRouter),
2637            vec![make_def("worker")],
2638        )
2639        .unwrap();
2640
2641        let actions = scheduler.tick();
2642
2643        // Running task in graph — no deadlock triggered.
2644        assert!(
2645            actions
2646                .iter()
2647                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
2648            "no Done action expected when a task is running; got: {actions:?}"
2649        );
2650        assert_eq!(scheduler.graph.status, GraphStatus::Running);
2651    }
2652
2653    // --- topology_selection tests ---
2654
2655    #[test]
2656    fn topology_linear_chain_limits_parallelism_to_one() {
2657        // LinearChain topology with topology_selection=true → max_parallel overridden to 1.
2658        // tick() must dispatch exactly 1 task even though 1 root task is ready.
2659        let graph = graph_from_nodes(vec![
2660            make_node(0, &[]),
2661            make_node(1, &[0]),
2662            make_node(2, &[1]),
2663        ]);
2664        let config = zeph_config::OrchestrationConfig {
2665            topology_selection: true,
2666            max_parallel: 4,
2667            ..make_config()
2668        };
2669        let mut scheduler = DagScheduler::new(
2670            graph,
2671            &config,
2672            Box::new(FirstRouter),
2673            vec![make_def("worker")],
2674        )
2675        .unwrap();
2676
2677        assert_eq!(
2678            scheduler.topology().topology,
2679            crate::topology::Topology::LinearChain
2680        );
2681        assert_eq!(scheduler.max_parallel, 1);
2682
2683        let actions = scheduler.tick();
2684        let spawn_count = actions
2685            .iter()
2686            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2687            .count();
2688        assert_eq!(spawn_count, 1, "linear chain: only 1 task dispatched");
2689    }
2690
2691    #[test]
2692    fn topology_all_parallel_dispatches_all_ready() {
2693        // AllParallel topology with topology_selection=true → max_parallel unchanged.
2694        // tick() dispatches all 4 independent tasks in one go.
2695        let graph = graph_from_nodes(vec![
2696            make_node(0, &[]),
2697            make_node(1, &[]),
2698            make_node(2, &[]),
2699            make_node(3, &[]),
2700        ]);
2701        let config = zeph_config::OrchestrationConfig {
2702            topology_selection: true,
2703            max_parallel: 4,
2704            ..make_config()
2705        };
2706        let mut scheduler = DagScheduler::new(
2707            graph,
2708            &config,
2709            Box::new(FirstRouter),
2710            vec![make_def("worker")],
2711        )
2712        .unwrap();
2713
2714        assert_eq!(
2715            scheduler.topology().topology,
2716            crate::topology::Topology::AllParallel
2717        );
2718
2719        let actions = scheduler.tick();
2720        let spawn_count = actions
2721            .iter()
2722            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2723            .count();
2724        assert_eq!(spawn_count, 4, "all-parallel: all 4 tasks dispatched");
2725    }
2726
2727    #[test]
2728    fn sequential_dispatch_one_at_a_time_parallel_unblocked() {
2729        // Three ready tasks: A(sequential), B(sequential), C(parallel).
2730        // tick() must dispatch A + C, hold B (another sequential already scheduled this tick).
2731        use crate::graph::ExecutionMode;
2732
2733        let mut a = make_node(0, &[]);
2734        a.execution_mode = ExecutionMode::Sequential;
2735        let mut b = make_node(1, &[]);
2736        b.execution_mode = ExecutionMode::Sequential;
2737        let mut c = make_node(2, &[]);
2738        c.execution_mode = ExecutionMode::Parallel;
2739
2740        let graph = graph_from_nodes(vec![a, b, c]);
2741        let config = zeph_config::OrchestrationConfig {
2742            max_parallel: 4,
2743            ..make_config()
2744        };
2745        let mut scheduler = DagScheduler::new(
2746            graph,
2747            &config,
2748            Box::new(FirstRouter),
2749            vec![make_def("worker")],
2750        )
2751        .unwrap();
2752
2753        let actions = scheduler.tick();
2754        let spawned: Vec<TaskId> = actions
2755            .iter()
2756            .filter_map(|a| {
2757                if let SchedulerAction::Spawn { task_id, .. } = a {
2758                    Some(*task_id)
2759                } else {
2760                    None
2761                }
2762            })
2763            .collect();
2764
2765        // A (seq, idx=0) and C (par, idx=2) dispatched; B (seq, idx=1) held.
2766        assert!(
2767            spawned.contains(&TaskId(0)),
2768            "A(sequential) must be dispatched"
2769        );
2770        assert!(
2771            spawned.contains(&TaskId(2)),
2772            "C(parallel) must be dispatched"
2773        );
2774        assert!(!spawned.contains(&TaskId(1)), "B(sequential) must be held");
2775        assert_eq!(spawned.len(), 2);
2776    }
2777
2778    // --- inject_tasks replan cap tests (#2241) ---
2779
2780    #[test]
2781    fn test_inject_tasks_per_task_cap_skips_second() {
2782        // Per-task cap: 1 replan per task. Second inject for same task_id is a silent no-op.
2783        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2784        let mut scheduler = make_scheduler(graph);
2785
2786        let first = make_node(2, &[]);
2787        scheduler.inject_tasks(TaskId(0), vec![first], 20).unwrap();
2788        assert_eq!(
2789            scheduler.graph.tasks.len(),
2790            3,
2791            "first inject must append the task"
2792        );
2793        assert_eq!(scheduler.global_replan_count, 1);
2794
2795        // Second inject for the same verified task — per-task count is already 1.
2796        let second = make_node(3, &[]);
2797        scheduler.inject_tasks(TaskId(0), vec![second], 20).unwrap();
2798        assert_eq!(
2799            scheduler.graph.tasks.len(),
2800            3,
2801            "second inject must be silently skipped (per-task cap)"
2802        );
2803        assert_eq!(
2804            scheduler.global_replan_count, 1,
2805            "global counter must not increment on skipped inject"
2806        );
2807    }
2808
2809    #[test]
2810    fn test_inject_tasks_global_cap_skips_when_exhausted() {
2811        // Global cap: max_replans=1. First inject consumes the budget; second is a no-op.
2812        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2813        let mut config = make_config();
2814        config.max_replans = 1;
2815        let defs = vec![make_def("worker")];
2816        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
2817
2818        let new1 = make_node(2, &[]);
2819        scheduler.inject_tasks(TaskId(0), vec![new1], 20).unwrap();
2820        assert_eq!(scheduler.global_replan_count, 1);
2821
2822        // Second inject for a different task — global cap exhausted.
2823        let new2 = make_node(3, &[]);
2824        scheduler.inject_tasks(TaskId(1), vec![new2], 20).unwrap();
2825        assert_eq!(
2826            scheduler.graph.tasks.len(),
2827            3,
2828            "global cap must prevent the second inject"
2829        );
2830        assert_eq!(
2831            scheduler.global_replan_count, 1,
2832            "global counter must not increment past cap"
2833        );
2834    }
2835
2836    #[test]
2837    fn test_inject_tasks_sets_topology_dirty() {
2838        // inject_tasks must set topology_dirty; tick() must clear it after re-analysis.
2839        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2840        let mut scheduler = make_scheduler(graph);
2841        assert!(
2842            !scheduler.topology_dirty,
2843            "topology_dirty must be false initially"
2844        );
2845
2846        let new_task = make_node(1, &[]);
2847        scheduler
2848            .inject_tasks(TaskId(0), vec![new_task], 20)
2849            .unwrap();
2850        assert!(
2851            scheduler.topology_dirty,
2852            "inject_tasks must set topology_dirty=true"
2853        );
2854
2855        scheduler.tick();
2856        assert!(
2857            !scheduler.topology_dirty,
2858            "tick() must clear topology_dirty after re-analysis"
2859        );
2860    }
2861
2862    #[test]
2863    fn test_inject_tasks_rejects_cycle() {
2864        // Injecting a task that introduces a cycle must return VerificationFailed.
2865        let graph = graph_from_nodes(vec![make_node(0, &[])]);
2866        let mut scheduler = make_scheduler(graph);
2867
2868        // New task ID=1 with a self-reference (depends on itself) → cycle.
2869        let cyclic_task = make_node(1, &[1]);
2870        let result = scheduler.inject_tasks(TaskId(0), vec![cyclic_task], 20);
2871        assert!(result.is_err(), "cyclic injection must return an error");
2872        assert!(
2873            matches!(
2874                result.unwrap_err(),
2875                OrchestrationError::VerificationFailed(_)
2876            ),
2877            "must return VerificationFailed for cycle"
2878        );
2879        // Global and per-task counters must not be incremented on error.
2880        assert_eq!(scheduler.global_replan_count, 0);
2881        assert_eq!(
2882            scheduler.topology_dirty, false,
2883            "topology_dirty must not be set when inject fails"
2884        );
2885    }
2886
2887    // --- LevelBarrier dispatch tests (#2242) ---
2888
2889    fn make_hierarchical_config() -> zeph_config::OrchestrationConfig {
2890        zeph_config::OrchestrationConfig {
2891            topology_selection: true,
2892            max_parallel: 4,
2893            ..make_config()
2894        }
2895    }
2896
2897    /// A(0)→{B(1),C(2)}, B(1)→D(3). Hierarchical topology, depths: A=0, B=1, C=1, D=2.
2898    fn make_hierarchical_graph() -> TaskGraph {
2899        graph_from_nodes(vec![
2900            make_node(0, &[]),
2901            make_node(1, &[0]),
2902            make_node(2, &[0]),
2903            make_node(3, &[1]),
2904        ])
2905    }
2906
2907    #[test]
2908    fn test_level_barrier_advances_on_terminal_level() {
2909        // When all tasks at current_level are terminal, tick() advances current_level
2910        // and dispatches tasks at the next non-terminal level.
2911        let graph = make_hierarchical_graph();
2912        let config = make_hierarchical_config();
2913        let defs = vec![make_def("worker")];
2914        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
2915
2916        assert_eq!(
2917            scheduler.topology().strategy,
2918            crate::topology::DispatchStrategy::LevelBarrier,
2919            "must use LevelBarrier strategy for Hierarchical graph"
2920        );
2921        assert_eq!(scheduler.current_level, 0);
2922
2923        // First tick: only A(0) at level 0 is dispatched.
2924        let actions = scheduler.tick();
2925        let spawned_ids: Vec<_> = actions
2926            .iter()
2927            .filter_map(|a| {
2928                if let SchedulerAction::Spawn { task_id, .. } = a {
2929                    Some(*task_id)
2930                } else {
2931                    None
2932                }
2933            })
2934            .collect();
2935        assert_eq!(
2936            spawned_ids,
2937            vec![TaskId(0)],
2938            "first tick must dispatch only A at level 0"
2939        );
2940
2941        // Simulate A completing: mark Completed, mark B and C Ready (deps satisfied).
2942        scheduler.graph.tasks[0].status = TaskStatus::Completed;
2943        scheduler.running.clear();
2944        scheduler.graph.tasks[1].status = TaskStatus::Ready;
2945        scheduler.graph.tasks[2].status = TaskStatus::Ready;
2946
2947        // Second tick: A is terminal → level advances to 1 → B and C dispatched.
2948        let actions2 = scheduler.tick();
2949        assert_eq!(
2950            scheduler.current_level, 1,
2951            "current_level must advance to 1 after level-0 tasks terminate"
2952        );
2953        let spawned2: Vec<_> = actions2
2954            .iter()
2955            .filter_map(|a| {
2956                if let SchedulerAction::Spawn { task_id, .. } = a {
2957                    Some(*task_id)
2958                } else {
2959                    None
2960                }
2961            })
2962            .collect();
2963        assert!(
2964            spawned2.contains(&TaskId(1)),
2965            "B must be dispatched after level advance"
2966        );
2967        assert!(
2968            spawned2.contains(&TaskId(2)),
2969            "C must be dispatched after level advance"
2970        );
2971    }
2972
2973    #[test]
2974    fn test_level_barrier_failure_propagates_transitively() {
2975        // When A fails with Skip strategy, propagate_failure() BFS-marks all
2976        // descendants (B, C, D) as Skipped. tick() must then advance past level 0.
2977        let graph = make_hierarchical_graph();
2978        let config = make_hierarchical_config();
2979        let defs = vec![make_def("worker")];
2980        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
2981
2982        // Set A to Skip failure strategy and simulate it running.
2983        scheduler.graph.tasks[0].failure_strategy = Some(crate::graph::FailureStrategy::Skip);
2984        scheduler.graph.tasks[0].status = TaskStatus::Running;
2985        scheduler.running.insert(
2986            TaskId(0),
2987            RunningTask {
2988                agent_handle_id: "h0".to_string(),
2989                agent_def_name: "worker".to_string(),
2990                started_at: Instant::now(),
2991            },
2992        );
2993
2994        // Push a failure event for A.
2995        scheduler.buffered_events.push_back(TaskEvent {
2996            task_id: TaskId(0),
2997            agent_handle_id: "h0".to_string(),
2998            outcome: TaskOutcome::Failed {
2999                error: "simulated failure".to_string(),
3000            },
3001        });
3002
3003        scheduler.tick();
3004
3005        // A failed with Skip → A=Skipped. B, C, D must be transitively Skipped.
3006        assert_eq!(
3007            scheduler.graph.tasks[0].status,
3008            TaskStatus::Skipped,
3009            "A must be Skipped (Skip strategy)"
3010        );
3011        assert_eq!(
3012            scheduler.graph.tasks[1].status,
3013            TaskStatus::Skipped,
3014            "B must be transitively Skipped"
3015        );
3016        assert_eq!(
3017            scheduler.graph.tasks[2].status,
3018            TaskStatus::Skipped,
3019            "C must be transitively Skipped"
3020        );
3021        assert_eq!(
3022            scheduler.graph.tasks[3].status,
3023            TaskStatus::Skipped,
3024            "D must be transitively Skipped"
3025        );
3026    }
3027
3028    #[test]
3029    fn test_level_barrier_current_level_reset_after_inject() {
3030        // inject_tasks() adding a task at depth < current_level must cause tick() to
3031        // reset current_level downward via the .min() guard (critic C2 / issue #2242).
3032        let graph = make_hierarchical_graph(); // A(0)→{B(1),C(2)}, B(1)→D(3)
3033        let config = make_hierarchical_config();
3034        let defs = vec![make_def("worker")];
3035        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
3036
3037        // Manually mark A, B, C as Completed (simulate levels 0 and 1 done).
3038        scheduler.graph.tasks[0].status = TaskStatus::Completed; // A depth 0
3039        scheduler.graph.tasks[1].status = TaskStatus::Completed; // B depth 1
3040        scheduler.graph.tasks[2].status = TaskStatus::Completed; // C depth 1
3041        // D(3) is Pending at depth 2. Manually set current_level to 2.
3042        scheduler.current_level = 2;
3043
3044        // Inject E(4) depending on A(0) (Completed) → E will be at depth 1 after re-analysis.
3045        // This is shallower than current_level=2 → tick() must reset current_level to 1.
3046        let e = make_node(4, &[0]);
3047        scheduler.inject_tasks(TaskId(3), vec![e], 20).unwrap();
3048        assert!(scheduler.topology_dirty);
3049
3050        // tick() re-analyzes topology (E at depth 1, D at depth 2).
3051        // min_non_terminal_depth = 1 (E is Ready). current_level = min(2, 1) = 1.
3052        scheduler.tick();
3053        assert_eq!(
3054            scheduler.current_level, 1,
3055            "current_level must reset to min non-terminal depth (1) after inject at depth 1"
3056        );
3057    }
3058
3059    #[test]
3060    fn resume_from_preserves_topology_classification() {
3061        // resume_from() must also apply topology classification (fix H3).
3062        let mut graph = graph_from_nodes(vec![
3063            make_node(0, &[]),
3064            make_node(1, &[0]),
3065            make_node(2, &[1]),
3066        ]);
3067        // Put graph in Paused state so resume_from accepts it.
3068        graph.status = GraphStatus::Paused;
3069        graph.tasks[0].status = TaskStatus::Completed;
3070        graph.tasks[1].status = TaskStatus::Pending;
3071        graph.tasks[2].status = TaskStatus::Pending;
3072
3073        let config = zeph_config::OrchestrationConfig {
3074            topology_selection: true,
3075            max_parallel: 4,
3076            ..make_config()
3077        };
3078        let scheduler = DagScheduler::resume_from(
3079            graph,
3080            &config,
3081            Box::new(FirstRouter),
3082            vec![make_def("worker")],
3083        )
3084        .unwrap();
3085
3086        assert_eq!(
3087            scheduler.topology().topology,
3088            crate::topology::Topology::LinearChain,
3089            "resume_from must classify topology"
3090        );
3091        assert_eq!(
3092            scheduler.max_parallel, 1,
3093            "resume_from must apply topology limit"
3094        );
3095    }
3096
3097    // --- #2238: validate_verify_config tests ---
3098
3099    fn make_verify_config(provider: &str) -> zeph_config::OrchestrationConfig {
3100        zeph_config::OrchestrationConfig {
3101            verify_completeness: true,
3102            verify_provider: provider.to_string(),
3103            ..make_config()
3104        }
3105    }
3106
3107    #[test]
3108    fn validate_verify_config_unknown_provider_returns_err() {
3109        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3110        let config = make_verify_config("nonexistent");
3111        let scheduler = DagScheduler::new(
3112            graph,
3113            &config,
3114            Box::new(FirstRouter),
3115            vec![make_def("worker")],
3116        )
3117        .unwrap();
3118        let result = scheduler.validate_verify_config(&["fast", "quality"]);
3119        assert!(result.is_err());
3120        let err_msg = result.unwrap_err().to_string();
3121        assert!(err_msg.contains("nonexistent"));
3122        assert!(err_msg.contains("fast"));
3123    }
3124
3125    #[test]
3126    fn validate_verify_config_known_provider_returns_ok() {
3127        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3128        let config = make_verify_config("fast");
3129        let scheduler = DagScheduler::new(
3130            graph,
3131            &config,
3132            Box::new(FirstRouter),
3133            vec![make_def("worker")],
3134        )
3135        .unwrap();
3136        assert!(
3137            scheduler
3138                .validate_verify_config(&["fast", "quality"])
3139                .is_ok()
3140        );
3141    }
3142
3143    #[test]
3144    fn validate_verify_config_empty_provider_always_ok() {
3145        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3146        let config = make_verify_config("");
3147        let scheduler = DagScheduler::new(
3148            graph,
3149            &config,
3150            Box::new(FirstRouter),
3151            vec![make_def("worker")],
3152        )
3153        .unwrap();
3154        assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
3155    }
3156
3157    #[test]
3158    fn validate_verify_config_disabled_skips_validation() {
3159        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3160        // verify_completeness = false: no validation even with bogus provider name
3161        let scheduler = make_scheduler(graph);
3162        assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
3163    }
3164
3165    #[test]
3166    fn validate_verify_config_empty_pool_skips_validation() {
3167        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3168        let config = make_verify_config("nonexistent");
3169        let scheduler = DagScheduler::new(
3170            graph,
3171            &config,
3172            Box::new(FirstRouter),
3173            vec![make_def("worker")],
3174        )
3175        .unwrap();
3176        // Empty provider_names slice = unknown provider set, skip validation.
3177        assert!(scheduler.validate_verify_config(&[]).is_ok());
3178    }
3179
3180    #[test]
3181    fn validate_verify_config_trims_whitespace_in_config() {
3182        let graph = graph_from_nodes(vec![make_node(0, &[])]);
3183        // verify_provider with surrounding whitespace in config is trimmed at construction.
3184        let config = make_verify_config("  fast  ");
3185        let scheduler = DagScheduler::new(
3186            graph,
3187            &config,
3188            Box::new(FirstRouter),
3189            vec![make_def("worker")],
3190        )
3191        .unwrap();
3192        assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
3193    }
3194
3195    // --- #2237 regression tests: max_parallel drift across replan cycles ---
3196
3197    #[test]
3198    fn config_max_parallel_initialized_from_config() {
3199        // config_max_parallel must always equal config.max_parallel, regardless
3200        // of whether topology analysis reduces max_parallel for the initial topology.
3201        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
3202        let config = zeph_config::OrchestrationConfig {
3203            topology_selection: true,
3204            max_parallel: 6,
3205            ..make_config()
3206        };
3207        let scheduler = DagScheduler::new(
3208            graph,
3209            &config,
3210            Box::new(FirstRouter),
3211            vec![make_def("worker")],
3212        )
3213        .unwrap();
3214
3215        assert_eq!(
3216            scheduler.config_max_parallel, 6,
3217            "config_max_parallel must equal config.max_parallel"
3218        );
3219        // LinearChain reduces max_parallel to 1, but config_max_parallel stays at 6.
3220        assert_eq!(
3221            scheduler.max_parallel, 1,
3222            "max_parallel reduced by topology analysis"
3223        );
3224        assert_eq!(
3225            scheduler.config_max_parallel, 6,
3226            "config_max_parallel must not be reduced by topology"
3227        );
3228    }
3229
3230    #[test]
3231    fn max_parallel_does_not_drift_across_inject_tick_cycles() {
3232        // Regression for #2237: successive inject_tasks+tick cycles with a Mixed graph
3233        // must not reduce max_parallel below compute_max_parallel(Mixed, config_max_parallel).
3234        //
3235        // Before the fix, the tick() dirty path used self.max_parallel as the base for
3236        // compute_max_parallel, so each replan cycle reduced it further:
3237        //   cycle 1: max_parallel = (4/2+1)      = 3
3238        //   cycle 2: max_parallel = (3/2+1)      = 2  ← drift!
3239        //   cycle 3: max_parallel = (2/2+1)      = 2
3240        //
3241        // After the fix, config_max_parallel=4 is always used as the base:
3242        //   all cycles: max_parallel = (4/2+1)   = 3  ← stable
3243        let graph = graph_from_nodes(vec![
3244            make_node(0, &[]),
3245            make_node(1, &[0]),
3246            make_node(2, &[0]),
3247            make_node(3, &[1, 2]), // diamond → Mixed
3248        ]);
3249        let config = zeph_config::OrchestrationConfig {
3250            topology_selection: true,
3251            max_parallel: 4,
3252            max_tasks: 50,
3253            ..make_config()
3254        };
3255        let mut scheduler = DagScheduler::new(
3256            graph,
3257            &config,
3258            Box::new(FirstRouter),
3259            vec![make_def("worker")],
3260        )
3261        .unwrap();
3262
3263        // Initial analysis: Mixed topology → max_parallel = (4/2+1) = 3.
3264        assert_eq!(
3265            scheduler.topology().topology,
3266            crate::topology::Topology::Mixed,
3267            "initial topology must be Mixed"
3268        );
3269        let expected_max_parallel = (4usize / 2 + 1).min(4).max(1); // = 3
3270        assert_eq!(scheduler.max_parallel, expected_max_parallel);
3271
3272        // Simulate inject_tasks (which sets topology_dirty=true) followed by tick().
3273        // The injected task depends on task 3 to keep the graph Mixed.
3274        let extra_task_id = 4u32;
3275        let extra_task = {
3276            let mut n = crate::graph::TaskNode::new(
3277                extra_task_id,
3278                "extra".to_string(),
3279                "extra task injected by replan",
3280            );
3281            n.depends_on = vec![TaskId(3)];
3282            n
3283        };
3284
3285        // inject_tasks requires the verified task to be Completed.
3286        scheduler.graph.tasks[3].status = TaskStatus::Completed;
3287
3288        scheduler
3289            .inject_tasks(TaskId(3), vec![extra_task], 50)
3290            .expect("inject must succeed");
3291        assert!(
3292            scheduler.topology_dirty,
3293            "topology_dirty must be true after inject"
3294        );
3295
3296        // First tick() after inject: re-analyzes topology. Must use config_max_parallel as base.
3297        let _ = scheduler.tick();
3298        let max_after_first_inject = scheduler.max_parallel;
3299        assert_eq!(
3300            max_after_first_inject, expected_max_parallel,
3301            "max_parallel must not drift after first inject+tick"
3302        );
3303
3304        // Second inject+tick cycle: max_parallel must still equal the original computed value.
3305        let extra_task2 = {
3306            let mut n = crate::graph::TaskNode::new(5u32, "extra2".to_string(), "second replan");
3307            n.depends_on = vec![TaskId(extra_task_id)];
3308            n
3309        };
3310        scheduler.graph.tasks[extra_task_id as usize].status = TaskStatus::Completed;
3311        // Reset to created-like state to allow a second inject (per-task limit is 1,
3312        // so use a fresh task ID for the verified source).
3313        scheduler
3314            .inject_tasks(TaskId(extra_task_id), vec![extra_task2], 50)
3315            .expect("second inject must succeed");
3316
3317        let _ = scheduler.tick();
3318        let max_after_second_inject = scheduler.max_parallel;
3319        assert_eq!(
3320            max_after_second_inject, expected_max_parallel,
3321            "max_parallel must not drift after second inject+tick (was: {max_after_second_inject}, expected: {expected_max_parallel})"
3322        );
3323    }
3324}