use std::collections::{HashMap, VecDeque};
use std::fmt::Write as _;
use std::path::PathBuf;
use std::time::{Duration, Instant};

use tokio::sync::mpsc;

use super::cascade::{CascadeConfig, CascadeDetector};
use super::dag;
use super::error::OrchestrationError;
use super::graph::{
    ExecutionMode, GraphStatus, TaskGraph, TaskId, TaskNode, TaskResult, TaskStatus,
};
use super::router::AgentRouter;
use super::topology::{DispatchStrategy, Topology, TopologyAnalysis, TopologyClassifier};
use super::verifier::inject_tasks as verifier_inject_tasks;
use zeph_config::OrchestrationConfig;
use zeph_sanitizer::{ContentIsolationConfig, ContentSanitizer, ContentSource, ContentSourceKind};
use zeph_subagent::{SubAgentDef, SubAgentError};

/// An action requested by the scheduler; the embedding runtime executes it.
#[derive(Debug)]
pub enum SchedulerAction {
    /// Spawn a sub-agent for `task_id` using the named agent definition.
    Spawn {
        task_id: TaskId,
        agent_def_name: String,
        prompt: String,
    },
    /// Cancel the agent identified by `agent_handle_id`.
    Cancel { agent_handle_id: String },
    /// Run the task inline on the main agent (no sub-agent was routable).
    RunInline { task_id: TaskId, prompt: String },
    /// The graph reached a terminal status; scheduling is finished.
    Done { status: GraphStatus },
    /// Verify a completed task's output for completeness.
    Verify { task_id: TaskId, output: String },
}
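
// Illustrative driver loop (a sketch, not part of this module's API): the
// owner of a `DagScheduler` alternates `tick()` with `wait_event().await`,
// executing each returned action. `spawn_agent` / `cancel_agent` below are
// hypothetical stand-ins for whatever runtime embeds the scheduler.
//
//     loop {
//         for action in scheduler.tick() {
//             match action {
//                 SchedulerAction::Spawn { task_id, agent_def_name, prompt } => {
//                     // spawn_agent(task_id, &agent_def_name, &prompt);
//                 }
//                 SchedulerAction::Cancel { agent_handle_id } => {
//                     // cancel_agent(&agent_handle_id);
//                 }
//                 SchedulerAction::Done { status } => return status,
//                 _ => { /* RunInline and Verify handled analogously */ }
//             }
//         }
//         scheduler.wait_event().await;
//     }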

/// A completion or failure report delivered by an executing agent.
#[derive(Debug)]
pub struct TaskEvent {
    pub task_id: TaskId,
    pub agent_handle_id: String,
    pub outcome: TaskOutcome,
}

/// The terminal outcome of a single task execution.
#[derive(Debug)]
pub enum TaskOutcome {
    Completed {
        output: String,
        artifacts: Vec<PathBuf>,
    },
    Failed { error: String },
}

/// Bookkeeping for a task that currently has a live agent.
struct RunningTask {
    agent_handle_id: String,
    agent_def_name: String,
    started_at: Instant,
}

#[allow(clippy::struct_excessive_bools)]
pub struct DagScheduler {
    graph: TaskGraph,
    /// Effective concurrency limit (topology-adjusted).
    max_parallel: usize,
    /// The raw limit from config, kept for topology re-analysis.
    config_max_parallel: usize,
    running: HashMap<TaskId, RunningTask>,
    event_rx: mpsc::Receiver<TaskEvent>,
    event_tx: mpsc::Sender<TaskEvent>,
    task_timeout: Duration,
    router: Box<dyn AgentRouter>,
    available_agents: Vec<SubAgentDef>,
    /// Character budget shared across dependency outputs in a task prompt.
    dependency_context_budget: usize,
    buffered_events: VecDeque<TaskEvent>,
    sanitizer: ContentSanitizer,
    /// Base sleep when nothing is running; grows exponentially with failures.
    deferral_backoff: Duration,
    consecutive_spawn_failures: u32,
    topology: TopologyAnalysis,
    /// Set after task injection; forces topology re-analysis on the next tick.
    topology_dirty: bool,
    /// Current depth level for the level-barrier dispatch strategy.
    current_level: usize,
    verify_completeness: bool,
    verify_provider: String,
    task_replan_counts: HashMap<TaskId, u32>,
    global_replan_count: u32,
    max_replans: u32,
    completeness_threshold_value: f32,
    cascade_detector: Option<CascadeDetector>,
    tree_optimized_dispatch: bool,
    cascade_routing: bool,
}

impl std::fmt::Debug for DagScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DagScheduler")
            .field("graph_id", &self.graph.id)
            .field("graph_status", &self.graph.status)
            .field("running_count", &self.running.len())
            .field("max_parallel", &self.max_parallel)
            .field("task_timeout_secs", &self.task_timeout.as_secs())
            .field("topology", &self.topology.topology)
            .field("strategy", &self.topology.strategy)
            .field("current_level", &self.current_level)
            .field("global_replan_count", &self.global_replan_count)
            .field("cascade_routing", &self.cascade_routing)
            .field("tree_optimized_dispatch", &self.tree_optimized_dispatch)
            .finish_non_exhaustive()
    }
}

impl DagScheduler {
    /// Builds a scheduler from a freshly created graph: validates the DAG,
    /// marks dependency-free tasks `Ready`, and moves the graph to `Running`.
    pub fn new(
        mut graph: TaskGraph,
        config: &OrchestrationConfig,
        router: Box<dyn AgentRouter>,
        available_agents: Vec<SubAgentDef>,
    ) -> Result<Self, OrchestrationError> {
        if graph.status != GraphStatus::Created {
            return Err(OrchestrationError::InvalidGraph(format!(
                "graph must be in Created status, got {}",
                graph.status
            )));
        }

        dag::validate(&graph.tasks, config.max_tasks as usize)?;

        graph.status = GraphStatus::Running;

        for task in &mut graph.tasks {
            if task.depends_on.is_empty() && task.status == TaskStatus::Pending {
                task.status = TaskStatus::Ready;
            }
        }

        let (event_tx, event_rx) = mpsc::channel(64);

        let task_timeout = if config.task_timeout_secs > 0 {
            Duration::from_secs(config.task_timeout_secs)
        } else {
            Duration::from_secs(600)
        };

        let topology = TopologyClassifier::analyze(&graph, config);
        let max_parallel = topology.max_parallel;
        let config_max_parallel = config.max_parallel as usize;

        if config.topology_selection {
            tracing::debug!(
                topology = ?topology.topology,
                strategy = ?topology.strategy,
                max_parallel,
                "topology-aware concurrency limit applied"
            );
        }

        if config.cascade_routing && !config.topology_selection {
            tracing::warn!(
                "cascade_routing = true requires topology_selection = true; \
                 cascade routing is disabled (topology_selection is off)"
            );
        }

        let cascade_detector = if config.cascade_routing && config.topology_selection {
            Some(CascadeDetector::new(CascadeConfig {
                failure_threshold: config.cascade_failure_threshold,
            }))
        } else {
            None
        };

        Ok(Self {
            graph,
            max_parallel,
            config_max_parallel,
            running: HashMap::new(),
            event_rx,
            event_tx,
            task_timeout,
            router,
            available_agents,
            dependency_context_budget: config.dependency_context_budget,
            buffered_events: VecDeque::new(),
            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
            consecutive_spawn_failures: 0,
            topology,
            topology_dirty: false,
            current_level: 0,
            verify_completeness: config.verify_completeness,
            verify_provider: config.verify_provider.trim().to_string(),
            task_replan_counts: HashMap::new(),
            global_replan_count: 0,
            max_replans: config.max_replans,
            completeness_threshold_value: config.completeness_threshold,
            cascade_detector,
            tree_optimized_dispatch: config.tree_optimized_dispatch,
            cascade_routing: config.cascade_routing && config.topology_selection,
        })
    }

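    /// Rebuilds a scheduler around a previously persisted graph. `Paused`,
    /// `Failed`, and `Running` graphs are resumable; `Completed` and
    /// `Canceled` graphs are rejected. Tasks still marked `Running` are
    /// reconstructed into the running map with fresh `started_at` stamps,
    /// so their timeout clocks restart on resume.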
    pub fn resume_from(
        mut graph: TaskGraph,
        config: &OrchestrationConfig,
        router: Box<dyn AgentRouter>,
        available_agents: Vec<SubAgentDef>,
    ) -> Result<Self, OrchestrationError> {
        if graph.status == GraphStatus::Completed || graph.status == GraphStatus::Canceled {
            return Err(OrchestrationError::InvalidGraph(format!(
                "cannot resume a {} graph; only Paused, Failed, or Running graphs are resumable",
                graph.status
            )));
        }

        graph.status = GraphStatus::Running;

        let running: HashMap<TaskId, RunningTask> = graph
            .tasks
            .iter()
            .filter(|t| t.status == TaskStatus::Running)
            .filter_map(|t| {
                let handle_id = t.assigned_agent.clone()?;
                let def_name = t.agent_hint.clone().unwrap_or_default();
                Some((
                    t.id,
                    RunningTask {
                        agent_handle_id: handle_id,
                        agent_def_name: def_name,
                        started_at: Instant::now(),
                    },
                ))
            })
            .collect();

        let (event_tx, event_rx) = mpsc::channel(64);

        let task_timeout = if config.task_timeout_secs > 0 {
            Duration::from_secs(config.task_timeout_secs)
        } else {
            Duration::from_secs(600)
        };

        let topology = TopologyClassifier::analyze(&graph, config);
        let max_parallel = topology.max_parallel;
        let config_max_parallel = config.max_parallel as usize;

        let cascade_detector = if config.cascade_routing && config.topology_selection {
            Some(CascadeDetector::new(CascadeConfig {
                failure_threshold: config.cascade_failure_threshold,
            }))
        } else {
            None
        };

        Ok(Self {
            graph,
            max_parallel,
            config_max_parallel,
            running,
            event_rx,
            event_tx,
            task_timeout,
            router,
            available_agents,
            dependency_context_budget: config.dependency_context_budget,
            buffered_events: VecDeque::new(),
            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
            consecutive_spawn_failures: 0,
            topology,
            topology_dirty: false,
            current_level: 0,
            verify_completeness: config.verify_completeness,
            verify_provider: config.verify_provider.trim().to_string(),
            task_replan_counts: HashMap::new(),
            global_replan_count: 0,
            max_replans: config.max_replans,
            completeness_threshold_value: config.completeness_threshold,
            cascade_detector,
            tree_optimized_dispatch: config.tree_optimized_dispatch,
            cascade_routing: config.cascade_routing && config.topology_selection,
        })
    }

    /// Checks that the configured verify provider exists when completeness
    /// verification is enabled. An empty provider name or provider list is
    /// accepted so callers can defer validation.
    pub fn validate_verify_config(
        &self,
        provider_names: &[&str],
    ) -> Result<(), OrchestrationError> {
        if !self.verify_completeness {
            return Ok(());
        }
        let name = self.verify_provider.as_str();
        if name.is_empty() || provider_names.is_empty() {
            return Ok(());
        }
        if !provider_names.contains(&name) {
            return Err(OrchestrationError::InvalidConfig(format!(
                "verify_provider \"{}\" not found in [[llm.providers]]; available: [{}]",
                name,
                provider_names.join(", ")
            )));
        }
        Ok(())
    }

    #[must_use]
    pub fn event_sender(&self) -> mpsc::Sender<TaskEvent> {
        self.event_tx.clone()
    }

    #[must_use]
    pub fn graph(&self) -> &TaskGraph {
        &self.graph
    }

    /// Returns a snapshot of the graph. This clones rather than moving out of
    /// `self` because `DagScheduler` implements `Drop`.
    #[must_use]
    pub fn into_graph(&self) -> TaskGraph {
        self.graph.clone()
    }

    #[must_use]
    pub fn topology(&self) -> &TopologyAnalysis {
        &self.topology
    }

    #[must_use]
    pub fn completeness_threshold(&self) -> f32 {
        self.completeness_threshold_value
    }

    #[must_use]
    pub fn verify_provider_name(&self) -> &str {
        &self.verify_provider
    }

    #[must_use]
    pub fn max_replans_remaining(&self) -> u32 {
        self.max_replans.saturating_sub(self.global_replan_count)
    }

    pub fn record_whole_plan_replan(&mut self) {
        self.global_replan_count = self.global_replan_count.saturating_add(1);
    }

    /// Injects verifier-produced follow-up tasks after `verified_task_id`,
    /// subject to per-task and global replan limits. A successful injection
    /// marks the topology dirty and resets the cascade detector.
    pub fn inject_tasks(
        &mut self,
        verified_task_id: TaskId,
        new_tasks: Vec<TaskNode>,
        max_tasks: usize,
    ) -> Result<(), OrchestrationError> {
        if new_tasks.is_empty() {
            return Ok(());
        }

        let task_replan_count = self.task_replan_counts.entry(verified_task_id).or_insert(0);
        if *task_replan_count >= 1 {
            tracing::warn!(
                task_id = %verified_task_id,
                "per-task replan limit (1) reached, skipping replan injection"
            );
            return Ok(());
        }

        if self.global_replan_count >= self.max_replans {
            tracing::warn!(
                global_replan_count = self.global_replan_count,
                max_replans = self.max_replans,
                "global replan limit reached, skipping replan injection"
            );
            return Ok(());
        }

        verifier_inject_tasks(&mut self.graph, new_tasks, max_tasks)?;

        *task_replan_count += 1;
        self.global_replan_count += 1;

        self.topology_dirty = true;

        if let Some(ref mut det) = self.cascade_detector {
            det.reset();
        }

        Ok(())
    }
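
    // Replan budget, by example (a restatement of the guards above, not new
    // behavior): with `max_replans = 2`, at most two injections succeed across
    // the whole graph, and a given verified task triggers at most one
    // injection regardless of the remaining global budget.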
}

impl Drop for DagScheduler {
    fn drop(&mut self) {
        if !self.running.is_empty() {
            tracing::warn!(
                running_tasks = self.running.len(),
                "DagScheduler dropped with running tasks; agents may continue until their \
                 CancellationToken fires or they complete naturally"
            );
        }
    }
}

impl DagScheduler {
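    /// Advances the scheduler one step: re-analyzes topology if tasks were
    /// injected, drains buffered and newly received events, checks timeouts,
    /// then dispatches ready tasks up to `max_parallel`, honoring the active
    /// dispatch strategy. Returns the actions the caller must execute before
    /// the next tick; a `Done` action means the graph reached a terminal state.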
    #[allow(clippy::too_many_lines)]
    pub fn tick(&mut self) -> Vec<SchedulerAction> {
        if self.graph.status != GraphStatus::Running {
            return vec![SchedulerAction::Done {
                status: self.graph.status,
            }];
        }

        if self.topology_dirty {
            let new_analysis = {
                let n = self.graph.tasks.len();
                if n == 0 {
                    TopologyAnalysis {
                        topology: Topology::AllParallel,
                        strategy: DispatchStrategy::FullParallel,
                        max_parallel: self.config_max_parallel,
                        depth: 0,
                        depths: std::collections::HashMap::new(),
                    }
                } else {
                    let (depth, depths) =
                        super::topology::compute_depths_for_scheduler(&self.graph);
                    let topo =
                        TopologyClassifier::classify_with_depths(&self.graph, depth, &depths);
                    let strategy_config = zeph_config::OrchestrationConfig {
                        cascade_routing: self.cascade_routing,
                        tree_optimized_dispatch: self.tree_optimized_dispatch,
                        ..zeph_config::OrchestrationConfig::default()
                    };
                    let strategy = TopologyClassifier::strategy(topo, &strategy_config);
                    let max_parallel =
                        TopologyClassifier::compute_max_parallel(topo, self.config_max_parallel);
                    TopologyAnalysis {
                        topology: topo,
                        strategy,
                        max_parallel,
                        depth,
                        depths,
                    }
                }
            };
            self.topology = new_analysis;
            self.max_parallel = self.topology.max_parallel;
            self.topology_dirty = false;

            if self.topology.strategy == DispatchStrategy::LevelBarrier {
                let min_active = self
                    .graph
                    .tasks
                    .iter()
                    .filter(|t| !t.status.is_terminal())
                    .filter_map(|t| self.topology.depths.get(&t.id).copied())
                    .min();
                if let Some(min_depth) = min_active {
                    self.current_level = self.current_level.min(min_depth);
                }
            }
        }

        let mut actions = Vec::new();

        while let Some(event) = self.buffered_events.pop_front() {
            let cancel_actions = self.process_event(event);
            actions.extend(cancel_actions);
        }
        while let Ok(event) = self.event_rx.try_recv() {
            let cancel_actions = self.process_event(event);
            actions.extend(cancel_actions);
        }

        if self.graph.status != GraphStatus::Running {
            return actions;
        }

        let timeout_actions = self.check_timeouts();
        actions.extend(timeout_actions);

        if self.graph.status != GraphStatus::Running {
            return actions;
        }

        let raw_ready = dag::ready_tasks(&self.graph);

        let ready: Vec<TaskId> = if self.topology.strategy == DispatchStrategy::CascadeAware {
            if let Some(ref detector) = self.cascade_detector {
                let deprioritized = detector.deprioritized_tasks(&self.graph);
                if deprioritized.is_empty() {
                    raw_ready
                } else {
                    let (preferred, deferred): (Vec<_>, Vec<_>) =
                        raw_ready.into_iter().partition(|id| {
                            let is_sequential = self.graph.tasks[id.index()].execution_mode
                                == ExecutionMode::Sequential;
                            is_sequential || !deprioritized.contains(id)
                        });
                    preferred.into_iter().chain(deferred).collect()
                }
            } else {
                raw_ready
            }
        } else {
            raw_ready
        };

        let ready: Vec<TaskId> = if self.topology.strategy == DispatchStrategy::TreeOptimized {
            let max_depth = self.topology.depth;
            let mut sortable = ready;
            sortable.sort_by_key(|id| {
                let task_depth = self.topology.depths.get(id).copied().unwrap_or(0);
                max_depth.saturating_sub(task_depth)
            });
            sortable
        } else {
            ready
        };

        if self.topology.strategy == DispatchStrategy::LevelBarrier {
            let all_current_level_terminal = self.graph.tasks.iter().all(|t| {
                let task_depth = self
                    .topology
                    .depths
                    .get(&t.id)
                    .copied()
                    .unwrap_or(usize::MAX);
                task_depth != self.current_level || t.status.is_terminal()
            });
            if all_current_level_terminal {
                let max_depth = self.topology.depth;
                while self.current_level <= max_depth {
                    let has_non_terminal = self.graph.tasks.iter().any(|t| {
                        let d = self
                            .topology
                            .depths
                            .get(&t.id)
                            .copied()
                            .unwrap_or(usize::MAX);
                        d == self.current_level && !t.status.is_terminal()
                    });
                    if has_non_terminal {
                        break;
                    }
                    self.current_level += 1;
                }
            }
        }

        let mut slots = self.max_parallel.saturating_sub(self.running.len());

        let mut sequential_spawned_this_tick = false;
        let has_running_sequential = self
            .running
            .keys()
            .any(|tid| self.graph.tasks[tid.index()].execution_mode == ExecutionMode::Sequential);

        for task_id in ready {
            if slots == 0 {
                break;
            }

            if self.topology.strategy == DispatchStrategy::LevelBarrier {
                let task_depth = self
                    .topology
                    .depths
                    .get(&task_id)
                    .copied()
                    .unwrap_or(usize::MAX);
                if task_depth != self.current_level {
                    continue;
                }
            }

            let task = &self.graph.tasks[task_id.index()];

            if task.execution_mode == ExecutionMode::Sequential {
                if sequential_spawned_this_tick || has_running_sequential {
                    continue;
                }
                sequential_spawned_this_tick = true;
            }

            let Some(agent_def_name) = self.router.route(task, &self.available_agents) else {
                tracing::debug!(
                    task_id = %task_id,
                    title = %task.title,
                    "no agent available, routing task to main agent inline"
                );
                let prompt = self.build_task_prompt(task);
                self.graph.tasks[task_id.index()].status = TaskStatus::Running;
                actions.push(SchedulerAction::RunInline { task_id, prompt });
                slots -= 1;
                continue;
            };

            let prompt = self.build_task_prompt(task);

            self.graph.tasks[task_id.index()].status = TaskStatus::Running;

            actions.push(SchedulerAction::Spawn {
                task_id,
                agent_def_name,
                prompt,
            });
            slots -= 1;
        }

        let running_in_graph_now = self
            .graph
            .tasks
            .iter()
            .filter(|t| t.status == TaskStatus::Running)
            .count();
        if running_in_graph_now == 0 && self.running.is_empty() {
            let all_terminal = self.graph.tasks.iter().all(|t| t.status.is_terminal());
            if all_terminal {
                self.graph.status = GraphStatus::Completed;
                self.graph.finished_at = Some(super::graph::chrono_now());
                actions.push(SchedulerAction::Done {
                    status: GraphStatus::Completed,
                });
            } else if dag::ready_tasks(&self.graph).is_empty() {
                tracing::error!(
                    "scheduler deadlock: no running or ready tasks, but graph not complete"
                );
                self.graph.status = GraphStatus::Failed;
                self.graph.finished_at = Some(super::graph::chrono_now());
                debug_assert!(
                    self.running.is_empty(),
                    "deadlock branch reached with non-empty running map"
                );
                for task in &mut self.graph.tasks {
                    if !task.status.is_terminal() {
                        task.status = TaskStatus::Canceled;
                    }
                }
                actions.push(SchedulerAction::Done {
                    status: GraphStatus::Failed,
                });
            }
        }

        actions
    }

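    /// Exponential backoff used when nothing is running. Worked values,
    /// assuming `deferral_backoff_ms = 250`: 0 failures -> 250ms, 1 -> 500ms,
    /// 2 -> 1s, 3 -> 2s, 4 -> 4s, 5 or more -> capped at 5s.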
    fn current_deferral_backoff(&self) -> Duration {
        const MAX_BACKOFF: Duration = Duration::from_secs(5);
        let multiplier = 1u32
            .checked_shl(self.consecutive_spawn_failures.min(10))
            .unwrap_or(u32::MAX);
        self.deferral_backoff
            .saturating_mul(multiplier)
            .min(MAX_BACKOFF)
    }

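    /// Parks the caller until the next interesting moment: with nothing
    /// running it sleeps for the current backoff; otherwise it waits for the
    /// next event or the nearest task timeout, whichever comes first.
    /// Received events are buffered for the next `tick`.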
    pub async fn wait_event(&mut self) {
        if self.running.is_empty() {
            tokio::time::sleep(self.current_deferral_backoff()).await;
            return;
        }

        let nearest_timeout = self
            .running
            .values()
            .map(|r| {
                self.task_timeout
                    .checked_sub(r.started_at.elapsed())
                    .unwrap_or(Duration::ZERO)
            })
            .min()
            .unwrap_or(Duration::from_secs(1));

        let wait_duration = nearest_timeout.max(Duration::from_millis(100));

        tokio::select! {
            Some(event) = self.event_rx.recv() => {
                if self.buffered_events.len() >= self.graph.tasks.len() * 2 {
                    if let Some(dropped) = self.buffered_events.pop_front() {
                        tracing::error!(
                            task_id = %dropped.task_id,
                            buffer_len = self.buffered_events.len(),
                            "event buffer saturated; completion event dropped — task may \
                             remain Running until timeout"
                        );
                    }
                }
                self.buffered_events.push_back(event);
            }
            () = tokio::time::sleep(wait_duration) => {}
        }
    }

    pub fn record_spawn(
        &mut self,
        task_id: TaskId,
        agent_handle_id: String,
        agent_def_name: String,
    ) {
        self.consecutive_spawn_failures = 0;
        self.graph.tasks[task_id.index()].assigned_agent = Some(agent_handle_id.clone());
        self.running.insert(
            task_id,
            RunningTask {
                agent_handle_id,
                agent_def_name,
                started_at: Instant::now(),
            },
        );
    }

    pub fn record_spawn_failure(
        &mut self,
        task_id: TaskId,
        error: &SubAgentError,
    ) -> Vec<SchedulerAction> {
        if let SubAgentError::ConcurrencyLimit { active, max } = error {
            tracing::warn!(
                task_id = %task_id,
                active,
                max,
                next_backoff_ms = self.current_deferral_backoff().as_millis(),
                "concurrency limit reached, deferring task to next tick"
            );
            self.graph.tasks[task_id.index()].status = TaskStatus::Ready;
            return Vec::new();
        }

        let error_excerpt: String = error.to_string().chars().take(512).collect();
        tracing::warn!(
            task_id = %task_id,
            error = %error_excerpt,
            "spawn failed, marking task failed"
        );
        self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
        let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
        let mut actions = Vec::new();
        for cancel_task_id in cancel_ids {
            if let Some(running) = self.running.remove(&cancel_task_id) {
                actions.push(SchedulerAction::Cancel {
                    agent_handle_id: running.agent_handle_id,
                });
            }
        }
        if self.graph.status != GraphStatus::Running {
            self.graph.finished_at = Some(super::graph::chrono_now());
            actions.push(SchedulerAction::Done {
                status: self.graph.status,
            });
        }
        actions
    }

    pub fn record_batch_backoff(&mut self, any_success: bool, any_concurrency_failure: bool) {
        if any_success {
            self.consecutive_spawn_failures = 0;
        } else if any_concurrency_failure {
            self.consecutive_spawn_failures = self.consecutive_spawn_failures.saturating_add(1);
        }
    }

    pub fn cancel_all(&mut self) -> Vec<SchedulerAction> {
        self.graph.status = GraphStatus::Canceled;
        self.graph.finished_at = Some(super::graph::chrono_now());

        let running: Vec<(TaskId, RunningTask)> = self.running.drain().collect();
        let mut actions: Vec<SchedulerAction> = running
            .into_iter()
            .map(|(task_id, r)| {
                self.graph.tasks[task_id.index()].status = TaskStatus::Canceled;
                SchedulerAction::Cancel {
                    agent_handle_id: r.agent_handle_id,
                }
            })
            .collect();

        for task in &mut self.graph.tasks {
            if !task.status.is_terminal() {
                task.status = TaskStatus::Canceled;
            }
        }

        actions.push(SchedulerAction::Done {
            status: GraphStatus::Canceled,
        });
        actions
    }
}

impl DagScheduler {
    fn process_event(&mut self, event: TaskEvent) -> Vec<SchedulerAction> {
        let TaskEvent {
            task_id,
            agent_handle_id,
            outcome,
        } = event;

        match self.running.get(&task_id) {
            Some(running) if running.agent_handle_id != agent_handle_id => {
                tracing::warn!(
                    task_id = %task_id,
                    expected = %running.agent_handle_id,
                    got = %agent_handle_id,
                    "discarding stale event from previous agent incarnation"
                );
                return Vec::new();
            }
            None => {
                tracing::debug!(
                    task_id = %task_id,
                    agent_handle_id = %agent_handle_id,
                    "ignoring event for task not in running map"
                );
                return Vec::new();
            }
            Some(_) => {}
        }

        let duration_ms = self.running.get(&task_id).map_or(0, |r| {
            u64::try_from(r.started_at.elapsed().as_millis()).unwrap_or(u64::MAX)
        });
        let agent_def_name = self.running.get(&task_id).map(|r| r.agent_def_name.clone());

        self.running.remove(&task_id);

        match outcome {
            TaskOutcome::Completed { output, artifacts } => {
                self.graph.tasks[task_id.index()].status = TaskStatus::Completed;
                self.graph.tasks[task_id.index()].result = Some(TaskResult {
                    output: output.clone(),
                    artifacts,
                    duration_ms,
                    agent_id: Some(agent_handle_id),
                    agent_def: agent_def_name,
                });

                if let Some(ref mut detector) = self.cascade_detector {
                    detector.record_outcome(task_id, true, &self.graph);
                }

                let newly_ready = dag::ready_tasks(&self.graph);
                for ready_id in newly_ready {
                    if self.graph.tasks[ready_id.index()].status == TaskStatus::Pending {
                        self.graph.tasks[ready_id.index()].status = TaskStatus::Ready;
                    }
                }

                if self.verify_completeness {
                    vec![SchedulerAction::Verify { task_id, output }]
                } else {
                    Vec::new()
                }
            }

            TaskOutcome::Failed { error } => {
                let error_excerpt: String = error.chars().take(512).collect();
                tracing::warn!(
                    task_id = %task_id,
                    error = %error_excerpt,
                    "task failed"
                );
                self.graph.tasks[task_id.index()].status = TaskStatus::Failed;

                if let Some(ref mut detector) = self.cascade_detector {
                    detector.record_outcome(task_id, false, &self.graph);
                }

                let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
                let mut actions = Vec::new();

                for cancel_task_id in cancel_ids {
                    if let Some(running) = self.running.remove(&cancel_task_id) {
                        actions.push(SchedulerAction::Cancel {
                            agent_handle_id: running.agent_handle_id,
                        });
                    }
                }

                if self.graph.status != GraphStatus::Running {
                    self.graph.finished_at = Some(super::graph::chrono_now());
                    actions.push(SchedulerAction::Done {
                        status: self.graph.status,
                    });
                }

                actions
            }
        }
    }

    fn check_timeouts(&mut self) -> Vec<SchedulerAction> {
        let timed_out: Vec<(TaskId, String)> = self
            .running
            .iter()
            .filter(|(_, r)| r.started_at.elapsed() > self.task_timeout)
            .map(|(id, r)| (*id, r.agent_handle_id.clone()))
            .collect();

        let mut actions = Vec::new();
        for (task_id, agent_handle_id) in timed_out {
            tracing::warn!(
                task_id = %task_id,
                timeout_secs = self.task_timeout.as_secs(),
                "task timed out"
            );
            self.running.remove(&task_id);
            self.graph.tasks[task_id.index()].status = TaskStatus::Failed;

            actions.push(SchedulerAction::Cancel { agent_handle_id });

            let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
            for cancel_task_id in cancel_ids {
                if let Some(running) = self.running.remove(&cancel_task_id) {
                    actions.push(SchedulerAction::Cancel {
                        agent_handle_id: running.agent_handle_id,
                    });
                }
            }

            if self.graph.status != GraphStatus::Running {
                self.graph.finished_at = Some(super::graph::chrono_now());
                actions.push(SchedulerAction::Done {
                    status: self.graph.status,
                });
                break;
            }
        }

        actions
    }

    fn build_task_prompt(&self, task: &TaskNode) -> String {
        if task.depends_on.is_empty() {
            return task.description.clone();
        }

        let completed_deps: Vec<&TaskNode> = task
            .depends_on
            .iter()
            .filter_map(|dep_id| {
                let dep = &self.graph.tasks[dep_id.index()];
                if dep.status == TaskStatus::Completed {
                    Some(dep)
                } else {
                    None
                }
            })
            .collect();

        if completed_deps.is_empty() {
            return task.description.clone();
        }

        let budget_per_dep = self
            .dependency_context_budget
            .checked_div(completed_deps.len())
            .unwrap_or(self.dependency_context_budget);

        let mut context_block = String::from("<completed-dependencies>\n");

        for dep in &completed_deps {
            let escaped_id = xml_escape(&dep.id.to_string());
            let escaped_title = xml_escape(&dep.title);
            let _ = writeln!(
                context_block,
                "## Task \"{escaped_id}\": \"{escaped_title}\" (completed)",
            );

            if let Some(ref result) = dep.result {
                let source = ContentSource::new(ContentSourceKind::A2aMessage);
                let sanitized = self.sanitizer.sanitize(&result.output, source);
                let safe_output = sanitized.body;

                let char_count = safe_output.chars().count();
                if char_count > budget_per_dep {
                    let truncated: String = safe_output.chars().take(budget_per_dep).collect();
                    let _ = write!(
                        context_block,
                        "{truncated}...\n[truncated: {char_count} chars total]"
                    );
                } else {
                    context_block.push_str(&safe_output);
                }
            } else {
                context_block.push_str("[no output recorded]\n");
            }
            context_block.push('\n');
        }

        for dep_id in &task.depends_on {
            let dep = &self.graph.tasks[dep_id.index()];
            if dep.status == TaskStatus::Skipped {
                let escaped_id = xml_escape(&dep.id.to_string());
                let escaped_title = xml_escape(&dep.title);
                let _ = writeln!(
                    context_block,
                    "## Task \"{escaped_id}\": \"{escaped_title}\" (skipped -- no output available)\n",
                );
            }
        }

        context_block.push_str("</completed-dependencies>\n\n");
        format!("{context_block}Your task: {}", task.description)
    }
}
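
// Shape of a generated dependency prompt, by example (illustrative values,
// not produced by any real run):
//
//     <completed-dependencies>
//     ## Task "0": "fetch data" (completed)
//     ...sanitized, possibly truncated dependency output...
//
//     </completed-dependencies>
//
//     Your task: summarize the fetched data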

/// Escapes the five XML-special characters so task ids and titles cannot
/// break out of the <completed-dependencies> prompt block.
fn xml_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '&' => out.push_str("&amp;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&#39;"),
            other => out.push(other),
        }
    }
    out
}

#[cfg(test)]
mod tests {
    #![allow(clippy::default_trait_access)]

    use super::*;
    use crate::graph::{FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus};

    fn make_node(id: u32, deps: &[u32]) -> TaskNode {
        let mut n = TaskNode::new(
            id,
            format!("task-{id}"),
            format!("description for task {id}"),
        );
        n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
        n
    }

    fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
        let mut g = TaskGraph::new("test goal");
        g.tasks = nodes;
        g
    }

    fn make_def(name: &str) -> SubAgentDef {
        use zeph_subagent::{SkillFilter, SubAgentPermissions, SubagentHooks, ToolPolicy};
        SubAgentDef {
            name: name.to_string(),
            description: format!("{name} agent"),
            model: None,
            tools: ToolPolicy::InheritAll,
            disallowed_tools: vec![],
            permissions: SubAgentPermissions::default(),
            skills: SkillFilter::default(),
            system_prompt: String::new(),
            hooks: SubagentHooks::default(),
            memory: None,
            source: None,
            file_path: None,
        }
    }

    fn make_config() -> zeph_config::OrchestrationConfig {
        zeph_config::OrchestrationConfig {
            enabled: true,
            max_tasks: 20,
            max_parallel: 4,
            default_failure_strategy: "abort".to_string(),
            default_max_retries: 3,
            task_timeout_secs: 300,
            planner_provider: String::new(),
            planner_max_tokens: 4096,
            dependency_context_budget: 16384,
            confirm_before_execute: true,
            aggregator_max_tokens: 4096,
            deferral_backoff_ms: 250,
            plan_cache: zeph_config::PlanCacheConfig::default(),
            topology_selection: false,
            verify_provider: String::new(),
            verify_max_tokens: 1024,
            max_replans: 2,
            verify_completeness: false,
            completeness_threshold: 0.7,
            tool_provider: String::new(),
            cascade_routing: false,
            cascade_failure_threshold: 0.5,
            tree_optimized_dispatch: false,
        }
    }

    struct FirstRouter;
    impl AgentRouter for FirstRouter {
        fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
            available.first().map(|d| d.name.clone())
        }
    }

    struct NoneRouter;
    impl AgentRouter for NoneRouter {
        fn route(&self, _task: &TaskNode, _available: &[SubAgentDef]) -> Option<String> {
            None
        }
    }

    fn make_scheduler_with_router(graph: TaskGraph, router: Box<dyn AgentRouter>) -> DagScheduler {
        let config = make_config();
        let defs = vec![make_def("worker")];
        DagScheduler::new(graph, &config, router, defs).unwrap()
    }

    fn make_scheduler(graph: TaskGraph) -> DagScheduler {
        let config = make_config();
        let defs = vec![make_def("worker")];
        DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap()
    }

    #[test]
    fn test_new_validates_graph_status() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
        graph.status = GraphStatus::Running;
        let config = make_config();
        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
    }

    #[test]
    fn test_new_marks_roots_ready() {
        let graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[]),
            make_node(2, &[0, 1]),
        ]);
        let scheduler = make_scheduler(graph);
        assert_eq!(scheduler.graph().tasks[0].status, TaskStatus::Ready);
        assert_eq!(scheduler.graph().tasks[1].status, TaskStatus::Ready);
        assert_eq!(scheduler.graph().tasks[2].status, TaskStatus::Pending);
        assert_eq!(scheduler.graph().status, GraphStatus::Running);
    }

    #[test]
    fn test_new_validates_empty_graph() {
        let graph = graph_from_nodes(vec![]);
        let config = make_config();
        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
        assert!(result.is_err());
    }

    #[test]
    fn test_tick_produces_spawn_for_ready() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
        let mut scheduler = make_scheduler(graph);
        let actions = scheduler.tick();
        let spawns: Vec<_> = actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
            .collect();
        assert_eq!(spawns.len(), 2);
    }

    #[test]
    fn test_tick_caps_spawns_at_max_parallel() {
        let graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[]),
            make_node(2, &[]),
            make_node(3, &[]),
            make_node(4, &[]),
        ]);
        let mut config = make_config();
        config.max_parallel = 2;
        let defs = vec![make_def("worker")];
        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
        let actions = scheduler.tick();
        let spawn_count = actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
            .count();
        assert_eq!(
            spawn_count, 2,
            "max_parallel=2 caps dispatched tasks per tick"
        );
    }

    #[test]
    fn test_tick_detects_completion() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
        graph.tasks[0].status = TaskStatus::Completed;
        let config = make_config();
        let defs = vec![make_def("worker")];
        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
        let actions = scheduler.tick();
        let has_done = actions.iter().any(|a| {
            matches!(
                a,
                SchedulerAction::Done {
                    status: GraphStatus::Completed
                }
            )
        });
        assert!(
            has_done,
            "should emit Done(Completed) when all tasks are terminal"
        );
    }

    #[test]
    fn test_completion_event_marks_deps_ready() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "handle-0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "handle-0".to_string(),
            outcome: TaskOutcome::Completed {
                output: "done".to_string(),
                artifacts: vec![],
            },
        };
        scheduler.buffered_events.push_back(event);

        let actions = scheduler.tick();
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Completed);
        let has_spawn_1 = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(1)));
        assert!(
            has_spawn_1 || scheduler.graph.tasks[1].status == TaskStatus::Ready,
            "task 1 should be spawned or marked Ready"
        );
    }

    #[test]
    fn test_failure_abort_cancels_running() {
        let graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[]),
            make_node(2, &[0, 1]),
        ]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );
        scheduler.graph.tasks[1].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(1),
            RunningTask {
                agent_handle_id: "h1".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "boom".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);

        let actions = scheduler.tick();
        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
        let cancel_ids: Vec<_> = actions
            .iter()
            .filter_map(|a| {
                if let SchedulerAction::Cancel { agent_handle_id } = a {
                    Some(agent_handle_id.as_str())
                } else {
                    None
                }
            })
            .collect();
        assert!(cancel_ids.contains(&"h1"), "task 1 should be canceled");
        assert!(
            actions
                .iter()
                .any(|a| matches!(a, SchedulerAction::Done { .. }))
        );
    }

    #[test]
    fn test_failure_skip_propagates() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "skip me".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);
        scheduler.tick();

        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Skipped);
        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Skipped);
    }

    #[test]
    fn test_failure_retry_reschedules() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
        scheduler.graph.tasks[0].max_retries = Some(3);
        scheduler.graph.tasks[0].retry_count = 0;
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "transient".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);
        let actions = scheduler.tick();

        let has_spawn = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
        assert!(
            has_spawn || scheduler.graph.tasks[0].status == TaskStatus::Ready,
            "retry should produce spawn or Ready status"
        );
        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
    }

    #[test]
    fn test_process_event_failed_retry() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
        scheduler.graph.tasks[0].max_retries = Some(2);
        scheduler.graph.tasks[0].retry_count = 0;
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "first failure".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);
        let actions = scheduler.tick();

        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
        let spawned = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
        assert!(
            spawned || scheduler.graph.tasks[0].status == TaskStatus::Ready,
            "retry should emit Spawn or set Ready"
        );
        assert_eq!(scheduler.graph.status, GraphStatus::Running);
    }

    #[test]
    fn test_timeout_cancels_stalled() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut config = make_config();
        config.task_timeout_secs = 1;
        let defs = vec![make_def("worker")];
        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now().checked_sub(Duration::from_secs(2)).unwrap(),
            },
        );

        let actions = scheduler.tick();
        let has_cancel = actions.iter().any(
            |a| matches!(a, SchedulerAction::Cancel { agent_handle_id } if agent_handle_id == "h0"),
        );
        assert!(has_cancel, "timed-out task should emit Cancel action");
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
    }

    #[test]
    fn test_cancel_all() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );
        scheduler.graph.tasks[1].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(1),
            RunningTask {
                agent_handle_id: "h1".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let actions = scheduler.cancel_all();

        assert_eq!(scheduler.graph.status, GraphStatus::Canceled);
        assert!(scheduler.running.is_empty());
        let cancel_count = actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
            .count();
        assert_eq!(cancel_count, 2);
        assert!(actions.iter().any(|a| matches!(
            a,
            SchedulerAction::Done {
                status: GraphStatus::Canceled
            }
        )));
    }

    #[test]
    fn test_record_spawn_failure() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;

        let error = SubAgentError::Spawn("spawn error".to_string());
        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
        assert!(
            actions
                .iter()
                .any(|a| matches!(a, SchedulerAction::Done { .. }))
        );
    }

    #[test]
    fn test_record_spawn_failure_concurrency_limit_reverts_to_ready() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;

        let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
        assert_eq!(
            scheduler.graph.tasks[0].status,
            TaskStatus::Ready,
            "task must revert to Ready so the next tick can retry"
        );
        assert_eq!(
            scheduler.graph.status,
            GraphStatus::Running,
            "graph must stay Running, not transition to Failed"
        );
        assert!(
            actions.is_empty(),
            "no cancel or done actions expected for a transient deferral"
        );
    }

    #[test]
    fn test_record_spawn_failure_concurrency_limit_variant_spawn_for_task() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);
        scheduler.graph.tasks[0].status = TaskStatus::Running;

        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
        assert!(actions.is_empty());
    }

    #[test]
    fn test_concurrency_deferral_does_not_affect_running_task() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );
        scheduler.graph.tasks[1].status = TaskStatus::Running;

        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
        let actions = scheduler.record_spawn_failure(TaskId(1), &error);

        assert_eq!(
            scheduler.graph.tasks[0].status,
            TaskStatus::Running,
            "task 0 must remain Running"
        );
        assert_eq!(
            scheduler.graph.tasks[1].status,
            TaskStatus::Ready,
            "task 1 must revert to Ready"
        );
        assert_eq!(
            scheduler.graph.status,
            GraphStatus::Running,
            "graph must stay Running"
        );
        assert!(actions.is_empty(), "no cancel or done actions expected");
    }

    #[test]
    fn test_max_concurrent_zero_no_infinite_loop() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = zeph_config::OrchestrationConfig {
            max_parallel: 0,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        let actions1 = scheduler.tick();
        assert!(
            actions1
                .iter()
                .all(|a| !matches!(a, SchedulerAction::Spawn { .. })),
            "no Spawn expected when max_parallel=0"
        );
        assert!(
            actions1
                .iter()
                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
            "no Done(Failed) expected — ready tasks exist, so no deadlock"
        );
        assert_eq!(scheduler.graph.status, GraphStatus::Running);

        let actions2 = scheduler.tick();
        assert!(
            actions2
                .iter()
                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
            "second tick must not emit Done(Failed) — ready tasks still exist"
        );
        assert_eq!(
            scheduler.graph.status,
            GraphStatus::Running,
            "graph must remain Running"
        );
    }

    #[test]
    fn test_all_tasks_deferred_graph_stays_running() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
        let mut scheduler = make_scheduler(graph);

        let actions = scheduler.tick();
        assert_eq!(
            actions
                .iter()
                .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
                .count(),
            2,
            "expected 2 Spawn actions on first tick"
        );
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Running);

        let error = SubAgentError::ConcurrencyLimit { active: 2, max: 2 };
        let r0 = scheduler.record_spawn_failure(TaskId(0), &error);
        let r1 = scheduler.record_spawn_failure(TaskId(1), &error);
        assert!(r0.is_empty() && r1.is_empty(), "no cancel/done on deferral");
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Ready);
        assert_eq!(scheduler.graph.status, GraphStatus::Running);

        let retry_actions = scheduler.tick();
        let spawn_count = retry_actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
            .count();
        assert!(
            spawn_count > 0,
            "second tick must re-emit Spawn for deferred tasks"
        );
        assert!(
            retry_actions.iter().all(|a| !matches!(
                a,
                SchedulerAction::Done {
                    status: GraphStatus::Failed,
                    ..
                }
            )),
            "no Done(Failed) expected"
        );
    }

    #[test]
    fn test_build_prompt_no_deps() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let scheduler = make_scheduler(graph);
        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[0]);
        assert_eq!(prompt, "description for task 0");
    }
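
    // Added test (a sketch of the escaping contract used by build_task_prompt):
    // xml_escape must map the five XML-special characters to entities and pass
    // everything else through unchanged.
    #[test]
    fn test_xml_escape_special_characters() {
        assert_eq!(xml_escape("a<b>c"), "a&lt;b&gt;c");
        assert_eq!(xml_escape("x & y"), "x &amp; y");
        assert_eq!(xml_escape("say \"hi\""), "say &quot;hi&quot;");
        assert_eq!(xml_escape("it's"), "it&#39;s");
        assert_eq!(xml_escape("plain"), "plain");
    }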

    #[test]
    fn test_build_prompt_with_deps_and_truncation() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        graph.tasks[0].status = TaskStatus::Completed;
        graph.tasks[0].result = Some(TaskResult {
            output: "x".repeat(200),
            artifacts: vec![],
            duration_ms: 10,
            agent_id: None,
            agent_def: None,
        });

        let config = zeph_config::OrchestrationConfig {
            dependency_context_budget: 50,
            ..make_config()
        };
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
        assert!(prompt.contains("<completed-dependencies>"));
        assert!(prompt.contains("[truncated:"));
        assert!(prompt.contains("Your task:"));
    }

    #[test]
    fn test_duration_ms_computed_correctly() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now()
                    .checked_sub(Duration::from_millis(50))
                    .unwrap(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Completed {
                output: "result".to_string(),
                artifacts: vec![],
            },
        };
        scheduler.buffered_events.push_back(event);
        scheduler.tick();

        let result = scheduler.graph.tasks[0].result.as_ref().unwrap();
        assert!(
            result.duration_ms > 0,
            "duration_ms should be > 0, got {}",
            result.duration_ms
        );
    }

    #[test]
    fn test_utf8_safe_truncation() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        graph.tasks[0].status = TaskStatus::Completed;
        let unicode_output = "日本語テスト".repeat(100);
        graph.tasks[0].result = Some(TaskResult {
            output: unicode_output,
            artifacts: vec![],
            duration_ms: 10,
            agent_id: None,
            agent_def: None,
        });

        let config = zeph_config::OrchestrationConfig {
            dependency_context_budget: 500,
            ..make_config()
        };
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
        assert!(
            prompt.contains("日"),
            "Japanese characters should be in the prompt after safe truncation"
        );
    }

    #[test]
    fn test_no_agent_routes_inline() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler_with_router(graph, Box::new(NoneRouter));
        let actions = scheduler.tick();
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
        assert!(
            actions
                .iter()
                .any(|a| matches!(a, SchedulerAction::RunInline { .. }))
        );
    }

    #[test]
    fn test_stale_event_rejected() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "current-handle".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let stale_event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "old-handle".to_string(),
            outcome: TaskOutcome::Completed {
                output: "stale output".to_string(),
                artifacts: vec![],
            },
        };
        scheduler.buffered_events.push_back(stale_event);
        let actions = scheduler.tick();

        assert_ne!(
            scheduler.graph.tasks[0].status,
            TaskStatus::Completed,
            "stale event must not complete the task"
        );
        let has_done = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Done { .. }));
        assert!(
            !has_done,
            "no Done action should be emitted for a stale event"
        );
        assert!(
            scheduler.running.contains_key(&TaskId(0)),
            "running task must remain after stale event"
        );
    }

    #[test]
    fn test_build_prompt_chars_count_in_truncation_message() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        graph.tasks[0].status = TaskStatus::Completed;
        let output = "x".repeat(200);
        graph.tasks[0].result = Some(TaskResult {
            output,
            artifacts: vec![],
            duration_ms: 10,
            agent_id: None,
            agent_def: None,
        });

        let config = zeph_config::OrchestrationConfig {
            dependency_context_budget: 10,
            ..make_config()
        };
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
        assert!(
            prompt.contains("chars total"),
            "truncation message must use 'chars total' label. Prompt: {prompt}"
        );
        assert!(
            prompt.contains("[truncated:"),
            "prompt must contain truncation notice. Prompt: {prompt}"
        );
    }
2219
    #[test]
    fn test_resume_from_accepts_paused_graph() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
        graph.status = GraphStatus::Paused;
        graph.tasks[0].status = TaskStatus::Pending;

        let scheduler =
            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
                .expect("resume_from should accept Paused graph");
        assert_eq!(scheduler.graph.status, GraphStatus::Running);
    }

    #[test]
    fn test_resume_from_accepts_failed_graph() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
        graph.status = GraphStatus::Failed;
        graph.tasks[0].status = TaskStatus::Failed;

        let scheduler =
            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
                .expect("resume_from should accept Failed graph");
        assert_eq!(scheduler.graph.status, GraphStatus::Running);
    }

    #[test]
    fn test_resume_from_rejects_completed_graph() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
        graph.status = GraphStatus::Completed;

        let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
            .unwrap_err();
        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
    }

    #[test]
    fn test_resume_from_rejects_canceled_graph() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
        graph.status = GraphStatus::Canceled;

        let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
            .unwrap_err();
        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
    }

    #[test]
    fn test_resume_from_reconstructs_running_tasks() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        graph.status = GraphStatus::Paused;
        graph.tasks[0].status = TaskStatus::Running;
        graph.tasks[0].assigned_agent = Some("handle-abc".to_string());
        graph.tasks[0].agent_hint = Some("worker".to_string());
        graph.tasks[1].status = TaskStatus::Pending;

        let scheduler =
            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
                .expect("should succeed");

        assert!(
            scheduler.running.contains_key(&TaskId(0)),
            "Running task must be reconstructed in the running map (IC1)"
        );
        assert_eq!(scheduler.running[&TaskId(0)].agent_handle_id, "handle-abc");
        assert!(
            !scheduler.running.contains_key(&TaskId(1)),
            "Pending task must not appear in running map"
        );
    }

    #[test]
    fn test_resume_from_sets_status_running() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
        graph.status = GraphStatus::Paused;

        let scheduler =
            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
                .unwrap();
        assert_eq!(scheduler.graph.status, GraphStatus::Running);
    }

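    // Deferral backoff counter: a tick whose spawn attempts all hit the concurrency
    // limit increments consecutive_spawn_failures; any successful spawn resets it.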
    #[test]
    fn test_consecutive_spawn_failures_increments_on_concurrency_limit() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);
        scheduler.graph.tasks[0].status = TaskStatus::Running;

        assert_eq!(scheduler.consecutive_spawn_failures, 0, "starts at zero");

        let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
        scheduler.record_spawn_failure(TaskId(0), &error);
        scheduler.record_batch_backoff(false, true);
        assert_eq!(
            scheduler.consecutive_spawn_failures, 1,
            "first deferral tick: consecutive_spawn_failures must be 1"
        );

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.record_spawn_failure(TaskId(0), &error);
        scheduler.record_batch_backoff(false, true);
        assert_eq!(
            scheduler.consecutive_spawn_failures, 2,
            "second deferral tick: consecutive_spawn_failures must be 2"
        );

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.record_spawn_failure(TaskId(0), &error);
        scheduler.record_batch_backoff(false, true);
        assert_eq!(
            scheduler.consecutive_spawn_failures, 3,
            "third deferral tick: consecutive_spawn_failures must be 3"
        );
    }

    #[test]
    fn test_consecutive_spawn_failures_resets_on_success() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);
        scheduler.graph.tasks[0].status = TaskStatus::Running;

        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
        scheduler.record_spawn_failure(TaskId(0), &error);
        scheduler.record_batch_backoff(false, true);
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.record_spawn_failure(TaskId(0), &error);
        scheduler.record_batch_backoff(false, true);
        assert_eq!(scheduler.consecutive_spawn_failures, 2);

        scheduler.record_spawn(TaskId(0), "handle-0".to_string(), "worker".to_string());
        assert_eq!(
            scheduler.consecutive_spawn_failures, 0,
            "record_spawn must reset consecutive_spawn_failures to 0"
        );
    }

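    // wait_event backs off exponentially when nothing can make progress: the bounds
    // asserted below are consistent with base_ms * 2^failures, capped at 5000ms
    // (e.g. 50 * 2^3 = 400).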
    #[tokio::test]
    async fn test_exponential_backoff_duration() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = zeph_config::OrchestrationConfig {
            deferral_backoff_ms: 50,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        assert_eq!(scheduler.consecutive_spawn_failures, 0);
        let start = tokio::time::Instant::now();
        scheduler.wait_event().await;
        let elapsed0 = start.elapsed();
        assert!(
            elapsed0.as_millis() >= 50,
            "backoff with 0 deferrals must be >= base (50ms), got {}ms",
            elapsed0.as_millis()
        );

        scheduler.consecutive_spawn_failures = 3;
        let start = tokio::time::Instant::now();
        scheduler.wait_event().await;
        let elapsed3 = start.elapsed();
        assert!(
            elapsed3.as_millis() >= 400,
            "backoff with 3 deferrals must be >= 400ms (50 * 8), got {}ms",
            elapsed3.as_millis()
        );

        scheduler.consecutive_spawn_failures = 20;
        let start = tokio::time::Instant::now();
        scheduler.wait_event().await;
        let elapsed_capped = start.elapsed();
        assert!(
            elapsed_capped.as_millis() >= 5000,
            "backoff must be capped at 5000ms with high deferrals, got {}ms",
            elapsed_capped.as_millis()
        );
    }

    #[tokio::test]
    async fn test_wait_event_sleeps_deferral_backoff_when_running_empty() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = zeph_config::OrchestrationConfig {
            deferral_backoff_ms: 50,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        assert!(scheduler.running.is_empty());

        let start = tokio::time::Instant::now();
        scheduler.wait_event().await;
        let elapsed = start.elapsed();

        assert!(
            elapsed.as_millis() >= 50,
            "wait_event must sleep at least deferral_backoff (50ms) when running is empty, but only slept {}ms",
            elapsed.as_millis()
        );
    }

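    // Doubling schedule from a 250ms base: 250 -> 500 -> 1000 -> 2000 -> 4000,
    // then clamped to the 5s ceiling.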
    #[test]
    fn test_current_deferral_backoff_exponential_growth() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = zeph_config::OrchestrationConfig {
            deferral_backoff_ms: 250,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        assert_eq!(
            scheduler.current_deferral_backoff(),
            Duration::from_millis(250)
        );

        scheduler.consecutive_spawn_failures = 1;
        assert_eq!(
            scheduler.current_deferral_backoff(),
            Duration::from_millis(500)
        );

        scheduler.consecutive_spawn_failures = 2;
        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(1));

        scheduler.consecutive_spawn_failures = 3;
        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(2));

        scheduler.consecutive_spawn_failures = 4;
        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(4));

        scheduler.consecutive_spawn_failures = 5;
        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));

        scheduler.consecutive_spawn_failures = 100;
        assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
    }

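    // record_spawn clears the failure streak; a ConcurrencyLimit spawn failure
    // reverts the task to Ready and leaves the counter alone (the counter itself
    // is advanced by record_batch_backoff, as the tests above exercise).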
    #[test]
    fn test_record_spawn_resets_consecutive_failures() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = DagScheduler::new(
            graph,
            &make_config(),
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        scheduler.consecutive_spawn_failures = 3;
        let task_id = TaskId(0);
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.record_spawn(task_id, "handle-1".into(), "worker".into());

        assert_eq!(scheduler.consecutive_spawn_failures, 0);
    }

    #[test]
    fn test_record_spawn_failure_reverts_to_ready_no_counter_change() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = DagScheduler::new(
            graph,
            &make_config(),
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        assert_eq!(scheduler.consecutive_spawn_failures, 0);
        let task_id = TaskId(0);
        scheduler.graph.tasks[0].status = TaskStatus::Running;

        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
        scheduler.record_spawn_failure(task_id, &error);

        assert_eq!(scheduler.consecutive_spawn_failures, 0);
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
    }

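    // Dispatch is capped at max_parallel per tick even when more tasks are ready.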
    #[test]
    fn test_parallel_dispatch_all_ready() {
        let nodes: Vec<_> = (0..6).map(|i| make_node(i, &[])).collect();
        let graph = graph_from_nodes(nodes);
        let config = zeph_config::OrchestrationConfig {
            max_parallel: 2,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        let actions = scheduler.tick();
        let spawn_count = actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
            .count();
        assert_eq!(
            spawn_count, 2,
            "only max_parallel=2 tasks dispatched per tick"
        );

        let running_count = scheduler
            .graph
            .tasks
            .iter()
            .filter(|t| t.status == TaskStatus::Running)
            .count();
        assert_eq!(running_count, 2, "only 2 tasks marked Running");
    }

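    // record_batch_backoff(any_success, any_attempted): a success resets the streak,
    // an all-deferred batch increments it, and a tick with no spawn attempts leaves
    // it untouched. (The flag names here are descriptive, inferred from call sites.)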
    #[test]
    fn test_batch_backoff_partial_success() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);
        scheduler.consecutive_spawn_failures = 3;

        scheduler.record_batch_backoff(true, true);
        assert_eq!(
            scheduler.consecutive_spawn_failures, 0,
            "any success in batch must reset counter"
        );
    }

    #[test]
    fn test_batch_backoff_all_failed() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);
        scheduler.consecutive_spawn_failures = 2;

        scheduler.record_batch_backoff(false, true);
        assert_eq!(
            scheduler.consecutive_spawn_failures, 3,
            "all-failure tick must increment counter"
        );
    }

    #[test]
    fn test_batch_backoff_no_spawns() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);
        scheduler.consecutive_spawn_failures = 5;

        scheduler.record_batch_backoff(false, false);
        assert_eq!(
            scheduler.consecutive_spawn_failures, 5,
            "no spawns must not change counter"
        );
    }

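    // Buffer guard sizing: the guard appears to be derived from the task count
    // (tasks.len() * 2 = 20) rather than from max_parallel (max_parallel * 2 = 4);
    // the asserts below spell out that arithmetic.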
    #[test]
    fn test_buffer_guard_uses_task_count() {
        let nodes: Vec<_> = (0..10).map(|i| make_node(i, &[])).collect();
        let graph = graph_from_nodes(nodes);
        let config = zeph_config::OrchestrationConfig {
            max_parallel: 2,
            ..make_config()
        };
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();
        assert_eq!(scheduler.graph.tasks.len() * 2, 20);
        assert_eq!(scheduler.max_parallel * 2, 4);
    }

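    // Mixed batch: a retryable ConcurrencyLimit reverts its task to Ready, while a
    // fatal Spawn error fails its task immediately (Skip strategy -> Skipped).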
    #[test]
    fn test_batch_mixed_concurrency_and_fatal_failure() {
        let mut nodes = vec![make_node(0, &[]), make_node(1, &[])];
        nodes[1].failure_strategy = Some(crate::graph::FailureStrategy::Skip);
        let graph = graph_from_nodes(nodes);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.graph.tasks[1].status = TaskStatus::Running;

        let concurrency_err = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
        let actions0 = scheduler.record_spawn_failure(TaskId(0), &concurrency_err);
        assert!(
            actions0.is_empty(),
            "ConcurrencyLimit must produce no extra actions"
        );
        assert_eq!(
            scheduler.graph.tasks[0].status,
            TaskStatus::Ready,
            "task 0 must revert to Ready"
        );

        let fatal_err = SubAgentError::Spawn("provider unavailable".to_string());
        let actions1 = scheduler.record_spawn_failure(TaskId(1), &fatal_err);
        assert_eq!(
            scheduler.graph.tasks[1].status,
            TaskStatus::Skipped,
            "task 1: Skip strategy turns Failed into Skipped via propagate_failure"
        );
        assert!(
            actions1
                .iter()
                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
            "no Done action expected: task 0 is still Ready"
        );

        scheduler.consecutive_spawn_failures = 0;
        scheduler.record_batch_backoff(false, true);
        assert_eq!(
            scheduler.consecutive_spawn_failures, 1,
            "batch with only ConcurrencyLimit must increment counter"
        );
    }

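    // Deadlock detection: when nothing is running and no task can become Ready,
    // the scheduler cancels the remaining non-terminal tasks and emits Done(Failed).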
    #[test]
    fn test_deadlock_marks_non_terminal_tasks_canceled() {
        let mut nodes = vec![make_node(0, &[]), make_node(1, &[0]), make_node(2, &[0])];
        nodes[0].status = TaskStatus::Failed;
        nodes[1].status = TaskStatus::Pending;
        nodes[2].status = TaskStatus::Pending;

        let mut graph = graph_from_nodes(nodes);
        graph.status = GraphStatus::Failed;

        let mut scheduler = DagScheduler::resume_from(
            graph,
            &make_config(),
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        let actions = scheduler.tick();

        assert!(
            actions.iter().any(|a| matches!(
                a,
                SchedulerAction::Done {
                    status: GraphStatus::Failed
                }
            )),
            "deadlock must emit Done(Failed); got: {actions:?}"
        );
        assert_eq!(scheduler.graph.status, GraphStatus::Failed);

        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
        assert_eq!(
            scheduler.graph.tasks[1].status,
            TaskStatus::Canceled,
            "Pending task must be Canceled on deadlock"
        );
        assert_eq!(
            scheduler.graph.tasks[2].status,
            TaskStatus::Canceled,
            "Pending task must be Canceled on deadlock"
        );
    }

    #[test]
    fn test_deadlock_not_triggered_when_task_running() {
        let mut nodes = vec![make_node(0, &[]), make_node(1, &[0])];
        nodes[0].status = TaskStatus::Running;
        nodes[0].assigned_agent = Some("handle-1".into());
        nodes[1].status = TaskStatus::Pending;

        let mut graph = graph_from_nodes(nodes);
        graph.status = GraphStatus::Failed;

        let mut scheduler = DagScheduler::resume_from(
            graph,
            &make_config(),
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        let actions = scheduler.tick();

        assert!(
            actions
                .iter()
                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
            "no Done action expected when a task is running; got: {actions:?}"
        );
        assert_eq!(scheduler.graph.status, GraphStatus::Running);
    }

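    // Topology-aware dispatch: classification adjusts the effective parallelism
    // (LinearChain forces 1, AllParallel keeps the configured limit), and
    // Sequential tasks gate one another without blocking Parallel peers.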
    #[test]
    fn topology_linear_chain_limits_parallelism_to_one() {
        let graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[0]),
            make_node(2, &[1]),
        ]);
        let config = zeph_config::OrchestrationConfig {
            topology_selection: true,
            max_parallel: 4,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        assert_eq!(
            scheduler.topology().topology,
            crate::topology::Topology::LinearChain
        );
        assert_eq!(scheduler.max_parallel, 1);

        let actions = scheduler.tick();
        let spawn_count = actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
            .count();
        assert_eq!(spawn_count, 1, "linear chain: only 1 task dispatched");
    }

    #[test]
    fn topology_all_parallel_dispatches_all_ready() {
        let graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[]),
            make_node(2, &[]),
            make_node(3, &[]),
        ]);
        let config = zeph_config::OrchestrationConfig {
            topology_selection: true,
            max_parallel: 4,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        assert_eq!(
            scheduler.topology().topology,
            crate::topology::Topology::AllParallel
        );

        let actions = scheduler.tick();
        let spawn_count = actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
            .count();
        assert_eq!(spawn_count, 4, "all-parallel: all 4 tasks dispatched");
    }

    #[test]
    fn sequential_dispatch_one_at_a_time_parallel_unblocked() {
        use crate::graph::ExecutionMode;

        let mut a = make_node(0, &[]);
        a.execution_mode = ExecutionMode::Sequential;
        let mut b = make_node(1, &[]);
        b.execution_mode = ExecutionMode::Sequential;
        let mut c = make_node(2, &[]);
        c.execution_mode = ExecutionMode::Parallel;

        let graph = graph_from_nodes(vec![a, b, c]);
        let config = zeph_config::OrchestrationConfig {
            max_parallel: 4,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        let actions = scheduler.tick();
        let spawned: Vec<TaskId> = actions
            .iter()
            .filter_map(|a| {
                if let SchedulerAction::Spawn { task_id, .. } = a {
                    Some(*task_id)
                } else {
                    None
                }
            })
            .collect();

        assert!(
            spawned.contains(&TaskId(0)),
            "A(sequential) must be dispatched"
        );
        assert!(
            spawned.contains(&TaskId(2)),
            "C(parallel) must be dispatched"
        );
        assert!(!spawned.contains(&TaskId(1)), "B(sequential) must be held");
        assert_eq!(spawned.len(), 2);
    }

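    // inject_tasks (replanning): per-task and global caps skip silently, cycles are
    // rejected with VerificationFailed, and a successful inject marks the topology dirty.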
    #[test]
    fn test_inject_tasks_per_task_cap_skips_second() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        let mut scheduler = make_scheduler(graph);

        let first = make_node(2, &[]);
        scheduler.inject_tasks(TaskId(0), vec![first], 20).unwrap();
        assert_eq!(
            scheduler.graph.tasks.len(),
            3,
            "first inject must append the task"
        );
        assert_eq!(scheduler.global_replan_count, 1);

        let second = make_node(3, &[]);
        scheduler.inject_tasks(TaskId(0), vec![second], 20).unwrap();
        assert_eq!(
            scheduler.graph.tasks.len(),
            3,
            "second inject must be silently skipped (per-task cap)"
        );
        assert_eq!(
            scheduler.global_replan_count, 1,
            "global counter must not increment on skipped inject"
        );
    }

    #[test]
    fn test_inject_tasks_global_cap_skips_when_exhausted() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        let mut config = make_config();
        config.max_replans = 1;
        let defs = vec![make_def("worker")];
        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();

        let new1 = make_node(2, &[]);
        scheduler.inject_tasks(TaskId(0), vec![new1], 20).unwrap();
        assert_eq!(scheduler.global_replan_count, 1);

        let new2 = make_node(3, &[]);
        scheduler.inject_tasks(TaskId(1), vec![new2], 20).unwrap();
        assert_eq!(
            scheduler.graph.tasks.len(),
            3,
            "global cap must prevent the second inject"
        );
        assert_eq!(
            scheduler.global_replan_count, 1,
            "global counter must not increment past cap"
        );
    }

    #[test]
    fn test_inject_tasks_sets_topology_dirty() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);
        assert!(
            !scheduler.topology_dirty,
            "topology_dirty must be false initially"
        );

        let new_task = make_node(1, &[]);
        scheduler
            .inject_tasks(TaskId(0), vec![new_task], 20)
            .unwrap();
        assert!(
            scheduler.topology_dirty,
            "inject_tasks must set topology_dirty=true"
        );

        scheduler.tick();
        assert!(
            !scheduler.topology_dirty,
            "tick() must clear topology_dirty after re-analysis"
        );
    }

    #[test]
    fn test_inject_tasks_rejects_cycle() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        let cyclic_task = make_node(1, &[1]);
        let result = scheduler.inject_tasks(TaskId(0), vec![cyclic_task], 20);
        assert!(result.is_err(), "cyclic injection must return an error");
        assert!(
            matches!(
                result.unwrap_err(),
                OrchestrationError::VerificationFailed(_)
            ),
            "must return VerificationFailed for cycle"
        );
        assert_eq!(scheduler.global_replan_count, 0);
        assert!(
            !scheduler.topology_dirty,
            "topology_dirty must not be set when inject fails"
        );
    }

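    // LevelBarrier dispatch over a hierarchical graph: A(0) fans out to B(1) and
    // C(2); D(3) depends on B. Levels advance only once the current level is terminal.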
    fn make_hierarchical_config() -> zeph_config::OrchestrationConfig {
        zeph_config::OrchestrationConfig {
            topology_selection: true,
            max_parallel: 4,
            ..make_config()
        }
    }

    fn make_hierarchical_graph() -> TaskGraph {
        graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[0]),
            make_node(2, &[0]),
            make_node(3, &[1]),
        ])
    }

    #[test]
    fn test_level_barrier_advances_on_terminal_level() {
        let graph = make_hierarchical_graph();
        let config = make_hierarchical_config();
        let defs = vec![make_def("worker")];
        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();

        assert_eq!(
            scheduler.topology().strategy,
            crate::topology::DispatchStrategy::LevelBarrier,
            "must use LevelBarrier strategy for Hierarchical graph"
        );
        assert_eq!(scheduler.current_level, 0);

        let actions = scheduler.tick();
        let spawned_ids: Vec<_> = actions
            .iter()
            .filter_map(|a| {
                if let SchedulerAction::Spawn { task_id, .. } = a {
                    Some(*task_id)
                } else {
                    None
                }
            })
            .collect();
        assert_eq!(
            spawned_ids,
            vec![TaskId(0)],
            "first tick must dispatch only A at level 0"
        );

        scheduler.graph.tasks[0].status = TaskStatus::Completed;
        scheduler.running.clear();
        scheduler.graph.tasks[1].status = TaskStatus::Ready;
        scheduler.graph.tasks[2].status = TaskStatus::Ready;

        let actions2 = scheduler.tick();
        assert_eq!(
            scheduler.current_level, 1,
            "current_level must advance to 1 after level-0 tasks terminate"
        );
        let spawned2: Vec<_> = actions2
            .iter()
            .filter_map(|a| {
                if let SchedulerAction::Spawn { task_id, .. } = a {
                    Some(*task_id)
                } else {
                    None
                }
            })
            .collect();
        assert!(
            spawned2.contains(&TaskId(1)),
            "B must be dispatched after level advance"
        );
        assert!(
            spawned2.contains(&TaskId(2)),
            "C must be dispatched after level advance"
        );
    }

    #[test]
    fn test_level_barrier_failure_propagates_transitively() {
        let graph = make_hierarchical_graph();
        let config = make_hierarchical_config();
        let defs = vec![make_def("worker")];
        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();

        scheduler.graph.tasks[0].failure_strategy = Some(crate::graph::FailureStrategy::Skip);
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        scheduler.buffered_events.push_back(TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "simulated failure".to_string(),
            },
        });

        scheduler.tick();

        assert_eq!(
            scheduler.graph.tasks[0].status,
            TaskStatus::Skipped,
            "A must be Skipped (Skip strategy)"
        );
        assert_eq!(
            scheduler.graph.tasks[1].status,
            TaskStatus::Skipped,
            "B must be transitively Skipped"
        );
        assert_eq!(
            scheduler.graph.tasks[2].status,
            TaskStatus::Skipped,
            "C must be transitively Skipped"
        );
        assert_eq!(
            scheduler.graph.tasks[3].status,
            TaskStatus::Skipped,
            "D must be transitively Skipped"
        );
    }

    #[test]
    fn test_level_barrier_current_level_reset_after_inject() {
        let graph = make_hierarchical_graph();
        let config = make_hierarchical_config();
        let defs = vec![make_def("worker")];
        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();

        scheduler.graph.tasks[0].status = TaskStatus::Completed;
        scheduler.graph.tasks[1].status = TaskStatus::Completed;
        scheduler.graph.tasks[2].status = TaskStatus::Completed;
        scheduler.current_level = 2;

        let e = make_node(4, &[0]);
        scheduler.inject_tasks(TaskId(3), vec![e], 20).unwrap();
        assert!(scheduler.topology_dirty);

        scheduler.tick();
        assert_eq!(
            scheduler.current_level, 1,
            "current_level must reset to min non-terminal depth (1) after inject at depth 1"
        );
    }

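    // resume_from re-runs topology classification, so resumed graphs keep the same
    // parallelism limits a fresh scheduler would compute.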
    #[test]
    fn resume_from_preserves_topology_classification() {
        let mut graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[0]),
            make_node(2, &[1]),
        ]);
        graph.status = GraphStatus::Paused;
        graph.tasks[0].status = TaskStatus::Completed;
        graph.tasks[1].status = TaskStatus::Pending;
        graph.tasks[2].status = TaskStatus::Pending;

        let config = zeph_config::OrchestrationConfig {
            topology_selection: true,
            max_parallel: 4,
            ..make_config()
        };
        let scheduler = DagScheduler::resume_from(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        assert_eq!(
            scheduler.topology().topology,
            crate::topology::Topology::LinearChain,
            "resume_from must classify topology"
        );
        assert_eq!(
            scheduler.max_parallel, 1,
            "resume_from must apply topology limit"
        );
    }

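    // validate_verify_config: only enforced when verification is enabled, a provider
    // is set, and the provider pool is non-empty; names are compared after trimming.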
    fn make_verify_config(provider: &str) -> zeph_config::OrchestrationConfig {
        zeph_config::OrchestrationConfig {
            verify_completeness: true,
            verify_provider: provider.to_string(),
            ..make_config()
        }
    }

    #[test]
    fn validate_verify_config_unknown_provider_returns_err() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = make_verify_config("nonexistent");
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();
        let result = scheduler.validate_verify_config(&["fast", "quality"]);
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(err_msg.contains("nonexistent"));
        assert!(err_msg.contains("fast"));
    }

    #[test]
    fn validate_verify_config_known_provider_returns_ok() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = make_verify_config("fast");
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();
        assert!(
            scheduler
                .validate_verify_config(&["fast", "quality"])
                .is_ok()
        );
    }

    #[test]
    fn validate_verify_config_empty_provider_always_ok() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = make_verify_config("");
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();
        assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
    }

    #[test]
    fn validate_verify_config_disabled_skips_validation() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let scheduler = make_scheduler(graph);
        assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
    }

    #[test]
    fn validate_verify_config_empty_pool_skips_validation() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = make_verify_config("nonexistent");
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();
        assert!(scheduler.validate_verify_config(&[]).is_ok());
    }

    #[test]
    fn validate_verify_config_trims_whitespace_in_config() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = make_verify_config(" fast ");
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();
        assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
    }

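    // config_max_parallel snapshots the configured ceiling so topology re-analysis
    // after inject_tasks recomputes max_parallel from the same baseline every time,
    // rather than compounding reductions across inject+tick cycles.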
    #[test]
    fn config_max_parallel_initialized_from_config() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        let config = zeph_config::OrchestrationConfig {
            topology_selection: true,
            max_parallel: 6,
            ..make_config()
        };
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        assert_eq!(
            scheduler.config_max_parallel, 6,
            "config_max_parallel must equal config.max_parallel"
        );
        assert_eq!(
            scheduler.max_parallel, 1,
            "max_parallel reduced by topology analysis"
        );
        assert_eq!(
            scheduler.config_max_parallel, 6,
            "config_max_parallel must not be reduced by topology"
        );
    }

    #[test]
    fn max_parallel_does_not_drift_across_inject_tick_cycles() {
        let graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[0]),
            make_node(2, &[0]),
            make_node(3, &[1, 2]),
        ]);
        let config = zeph_config::OrchestrationConfig {
            topology_selection: true,
            max_parallel: 4,
            max_tasks: 50,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        assert_eq!(
            scheduler.topology().topology,
            crate::topology::Topology::Mixed,
            "initial topology must be Mixed"
        );
        let expected_max_parallel = (4usize / 2 + 1).clamp(1, 4);
        assert_eq!(scheduler.max_parallel, expected_max_parallel);

        let extra_task_id = 4u32;
        let extra_task = {
            let mut n = crate::graph::TaskNode::new(
                extra_task_id,
                "extra".to_string(),
                "extra task injected by replan",
            );
            n.depends_on = vec![TaskId(3)];
            n
        };

        scheduler.graph.tasks[3].status = TaskStatus::Completed;

        scheduler
            .inject_tasks(TaskId(3), vec![extra_task], 50)
            .expect("inject must succeed");
        assert!(
            scheduler.topology_dirty,
            "topology_dirty must be true after inject"
        );

        let _ = scheduler.tick();
        let max_after_first_inject = scheduler.max_parallel;
        assert_eq!(
            max_after_first_inject, expected_max_parallel,
            "max_parallel must not drift after first inject+tick"
        );

        let extra_task2 = {
            let mut n = crate::graph::TaskNode::new(5u32, "extra2".to_string(), "second replan");
            n.depends_on = vec![TaskId(extra_task_id)];
            n
        };
        scheduler.graph.tasks[extra_task_id as usize].status = TaskStatus::Completed;
        scheduler
            .inject_tasks(TaskId(extra_task_id), vec![extra_task2], 50)
            .expect("second inject must succeed");

        let _ = scheduler.tick();
        let max_after_second_inject = scheduler.max_parallel;
        assert_eq!(
            max_after_second_inject, expected_max_parallel,
            "max_parallel must not drift after second inject+tick (was: {max_after_second_inject}, expected: {expected_max_parallel})"
        );
    }

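    // Accessor passthroughs: completeness_threshold, verify_provider_name, and the
    // remaining whole-plan replan budget.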
    #[test]
    fn completeness_threshold_returns_config_value() {
        let mut config = make_config();
        config.completeness_threshold = 0.85;
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let scheduler =
            DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![make_def("w")]).unwrap();
        assert!((scheduler.completeness_threshold() - 0.85).abs() < f32::EPSILON);
    }

    #[test]
    fn completeness_threshold_default_is_0_7() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let scheduler = make_scheduler(graph);
        assert!((scheduler.completeness_threshold() - 0.7).abs() < f32::EPSILON);
    }

    #[test]
    fn verify_provider_name_returns_config_value() {
        let mut config = make_config();
        config.verify_provider = "fast".to_string();
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let scheduler =
            DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![make_def("w")]).unwrap();
        assert_eq!(scheduler.verify_provider_name(), "fast");
    }

    #[test]
    fn verify_provider_name_empty_when_not_set() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let scheduler = make_scheduler(graph);
        assert_eq!(scheduler.verify_provider_name(), "");
    }

    #[test]
    fn max_replans_remaining_initial_equals_max_replans() {
        let mut config = make_config();
        config.max_replans = 3;
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let scheduler =
            DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![make_def("w")]).unwrap();
        assert_eq!(scheduler.max_replans_remaining(), 3);
    }

    #[test]
    fn max_replans_remaining_decrements_after_record() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);
        assert_eq!(scheduler.max_replans_remaining(), 2);
        scheduler.record_whole_plan_replan();
        assert_eq!(scheduler.max_replans_remaining(), 1);
        scheduler.record_whole_plan_replan();
        assert_eq!(scheduler.max_replans_remaining(), 0);
        scheduler.record_whole_plan_replan();
        assert_eq!(scheduler.max_replans_remaining(), 0);
    }

    #[test]
    fn record_whole_plan_replan_does_not_modify_graph() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);
        let task_count_before = scheduler.graph().tasks.len();
        scheduler.record_whole_plan_replan();
        assert_eq!(
            scheduler.graph().tasks.len(),
            task_count_before,
            "record_whole_plan_replan must not modify the task graph"
        );
    }

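    // Cascade detection: the detector exists only when both cascade_routing and
    // topology_selection are enabled, and is reset whenever the graph is replanned.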
    fn make_cascade_config() -> zeph_config::OrchestrationConfig {
        zeph_config::OrchestrationConfig {
            topology_selection: true,
            cascade_routing: true,
            cascade_failure_threshold: 0.4,
            max_parallel: 4,
            ..make_config()
        }
    }

    #[test]
    fn inject_tasks_resets_cascade_detector() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        graph.tasks[0].status = TaskStatus::Completed;
        graph.tasks[1].status = TaskStatus::Completed;
        let config = make_cascade_config();
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        if let Some(ref mut det) = scheduler.cascade_detector {
            let g = &scheduler.graph;
            det.record_outcome(TaskId(1), false, g);
            assert_eq!(det.region_health().len(), 1);
        } else {
            panic!(
                "cascade_detector must be Some when cascade_routing=true and topology_selection=true"
            );
        }

        let new_task = make_node(2, &[1]);
        scheduler
            .inject_tasks(TaskId(1), vec![new_task], 20)
            .unwrap();

        assert!(
            scheduler
                .cascade_detector
                .as_ref()
                .is_some_and(|d| d.region_health().is_empty()),
            "cascade_detector must be cleared after inject_tasks (C13 fix)"
        );
    }

    #[test]
    fn sequential_tasks_not_reordered_by_cascade() {
        let mut graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[]),
            make_node(2, &[1]),
        ]);
        graph.tasks[2].execution_mode = ExecutionMode::Sequential;
        let config = make_cascade_config();
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        if let Some(ref mut det) = scheduler.cascade_detector {
            let g = &scheduler.graph;
            det.record_outcome(TaskId(1), false, g);
            det.record_outcome(TaskId(2), false, g);
        } else {
            panic!("cascade_detector must be Some");
        }

        let actions = scheduler.tick();

        let spawned_ids: Vec<TaskId> = actions
            .iter()
            .filter_map(|a| {
                if let SchedulerAction::Spawn { task_id, .. }
                | SchedulerAction::RunInline { task_id, .. } = a
                {
                    Some(*task_id)
                } else {
                    None
                }
            })
            .collect();

        assert!(
            !spawned_ids.is_empty(),
            "tick must dispatch at least one ready task; Sequential tasks must not be dropped by cascade logic"
        );
    }

    #[test]
    fn cascade_routing_without_topology_selection_creates_no_detector() {
        let config = zeph_config::OrchestrationConfig {
            cascade_routing: true,
            topology_selection: false,
            ..make_config()
        };
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();
        assert!(
            scheduler.cascade_detector.is_none(),
            "cascade_detector must be None when topology_selection=false"
        );
    }
}