1use std::collections::{HashMap, VecDeque};
7use std::fmt::Write as _;
8use std::path::PathBuf;
9use std::time::{Duration, Instant};
10
11use tokio::sync::mpsc;
12
13use super::cascade::{CascadeConfig, CascadeDetector};
14use super::dag;
15use super::error::OrchestrationError;
16use super::graph::{
17 ExecutionMode, GraphStatus, TaskGraph, TaskId, TaskNode, TaskResult, TaskStatus,
18};
19use super::router::AgentRouter;
20use super::topology::{DispatchStrategy, Topology, TopologyAnalysis, TopologyClassifier};
21use super::verifier::inject_tasks as verifier_inject_tasks;
22use zeph_config::OrchestrationConfig;
23use zeph_sanitizer::{ContentIsolationConfig, ContentSanitizer, ContentSource, ContentSourceKind};
24use zeph_subagent::{SubAgentDef, SubAgentError};
25
/// Instruction produced by [`DagScheduler`] for its caller to carry out.
///
/// Actions are returned from `tick`, `record_spawn_failure` and `cancel_all`.
#[derive(Debug)]
pub enum SchedulerAction {
    /// Start a sub-agent for a task that the router matched to an agent definition.
    Spawn {
        /// Task being dispatched.
        task_id: TaskId,
        /// Name of the agent definition chosen by the router.
        agent_def_name: String,
        /// Prompt for the agent, as built by `build_task_prompt`.
        prompt: String,
    },
    /// Stop the agent identified by this handle (its task timed out or was cancelled).
    Cancel { agent_handle_id: String },
    /// Run the task inline because the router returned no agent for it.
    RunInline { task_id: TaskId, prompt: String },
    /// The graph is no longer `Running`; `status` is the status it ended in.
    Done { status: GraphStatus },
    /// Check the output of a task that just completed (only emitted when
    /// `verify_completeness` is enabled).
    Verify { task_id: TaskId, output: String },
}
52
/// Completion report for a task, delivered to the scheduler over its event channel.
#[derive(Debug)]
pub struct TaskEvent {
    /// Task the event refers to.
    pub task_id: TaskId,
    /// Handle of the agent that produced the event; events whose handle does not match the
    /// one recorded for the running task are discarded as stale.
    pub agent_handle_id: String,
    /// Whether the task completed or failed, with the associated payload.
    pub outcome: TaskOutcome,
}
60
/// Result carried by a [`TaskEvent`].
#[derive(Debug)]
pub enum TaskOutcome {
    /// The task finished successfully.
    Completed {
        /// Text output of the task; stored in the task's `TaskResult`.
        output: String,
        /// Paths reported alongside the output; stored in the task's `TaskResult`.
        artifacts: Vec<PathBuf>,
    },
    /// The task failed; `error` is logged (first 512 characters) and failure is propagated.
    Failed { error: String },
}
72
/// Bookkeeping for a task the scheduler currently considers in flight.
struct RunningTask {
    /// Handle of the agent executing the task; used to match events and to issue cancels.
    agent_handle_id: String,
    /// Name of the agent definition, copied into the task result on completion.
    agent_def_name: String,
    /// When the scheduler started tracking the task; basis for timeout and duration.
    started_at: Instant,
}
79
/// Tick-driven scheduler that walks a [`TaskGraph`] and emits [`SchedulerAction`]s.
///
/// The scheduler owns the graph, tracks which tasks are in flight, consumes
/// [`TaskEvent`]s from an internal channel and decides which ready tasks to dispatch next.
// The bool fields are independent configuration switches copied from `OrchestrationConfig`.
#[allow(clippy::struct_excessive_bools)]
pub struct DagScheduler {
    /// Graph being executed; task and graph statuses are mutated in place.
    graph: TaskGraph,
    /// Effective concurrency cap, taken from the current topology analysis.
    max_parallel: usize,
    /// Concurrency cap from configuration; input for recomputing `max_parallel`.
    config_max_parallel: usize,
    /// Tasks currently tracked as in flight, keyed by task id.
    running: HashMap<TaskId, RunningTask>,
    /// Receiving end of the task event channel.
    event_rx: mpsc::Receiver<TaskEvent>,
    /// Sending end of the task event channel; clones are handed out by `event_sender`.
    event_tx: mpsc::Sender<TaskEvent>,
    /// Maximum time a task may stay in `running` before it is failed and cancelled.
    task_timeout: Duration,
    /// Chooses an agent definition for each task; `None` means run inline.
    router: Box<dyn AgentRouter>,
    /// Agent definitions offered to the router.
    available_agents: Vec<SubAgentDef>,
    /// Character budget for dependency output in a prompt, split evenly across
    /// completed dependencies.
    dependency_context_budget: usize,
    /// Events received by `wait_event`, held until the next `tick` processes them.
    buffered_events: VecDeque<TaskEvent>,
    /// Sanitizes dependency output before it is embedded in a prompt.
    sanitizer: ContentSanitizer,
    /// Base sleep used by `wait_event` when nothing is running.
    deferral_backoff: Duration,
    /// Consecutive concurrency-limited batches; drives the exponential backoff multiplier.
    consecutive_spawn_failures: u32,
    /// Latest topology analysis of the graph (classification, strategy, depths).
    topology: TopologyAnalysis,
    /// Set when tasks are injected; makes the next `tick` recompute `topology`.
    topology_dirty: bool,
    /// Depth level currently being dispatched under the `LevelBarrier` strategy.
    current_level: usize,
    /// When true, each completed task yields a `Verify` action.
    verify_completeness: bool,
    /// Trimmed provider name from configuration used for verification.
    verify_provider: String,
    /// Number of replan injections performed per verified task (capped at 1).
    task_replan_counts: HashMap<TaskId, u32>,
    /// Total replans performed so far, across task injections and whole-plan replans.
    global_replan_count: u32,
    /// Upper bound for `global_replan_count`.
    max_replans: u32,
    /// Completeness threshold from configuration, exposed via `completeness_threshold`.
    completeness_threshold_value: f32,
    /// Present only when both cascade routing and topology selection are enabled.
    cascade_detector: Option<CascadeDetector>,
    /// Configuration switch forwarded to strategy selection on re-analysis.
    tree_optimized_dispatch: bool,
    /// Effective cascade routing flag (configured flag AND topology selection).
    cascade_routing: bool,
}
167
168impl std::fmt::Debug for DagScheduler {
169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170 f.debug_struct("DagScheduler")
171 .field("graph_id", &self.graph.id)
172 .field("graph_status", &self.graph.status)
173 .field("running_count", &self.running.len())
174 .field("max_parallel", &self.max_parallel)
175 .field("task_timeout_secs", &self.task_timeout.as_secs())
176 .field("topology", &self.topology.topology)
177 .field("strategy", &self.topology.strategy)
178 .field("current_level", &self.current_level)
179 .field("global_replan_count", &self.global_replan_count)
180 .field("cascade_routing", &self.cascade_routing)
181 .field("tree_optimized_dispatch", &self.tree_optimized_dispatch)
182 .finish_non_exhaustive()
183 }
184}
185
186impl DagScheduler {
187 pub fn new(
197 mut graph: TaskGraph,
198 config: &OrchestrationConfig,
199 router: Box<dyn AgentRouter>,
200 available_agents: Vec<SubAgentDef>,
201 ) -> Result<Self, OrchestrationError> {
202 if graph.status != GraphStatus::Created {
203 return Err(OrchestrationError::InvalidGraph(format!(
204 "graph must be in Created status, got {}",
205 graph.status
206 )));
207 }
208
209 dag::validate(&graph.tasks, config.max_tasks as usize)?;
210
211 graph.status = GraphStatus::Running;
212
213 for task in &mut graph.tasks {
214 if task.depends_on.is_empty() && task.status == TaskStatus::Pending {
215 task.status = TaskStatus::Ready;
216 }
217 }
218
219 let (event_tx, event_rx) = mpsc::channel(64);
220
221 let task_timeout = if config.task_timeout_secs > 0 {
222 Duration::from_secs(config.task_timeout_secs)
223 } else {
224 Duration::from_secs(600)
225 };
226
227 let topology = TopologyClassifier::analyze(&graph, config);
228 let max_parallel = topology.max_parallel;
229 let config_max_parallel = config.max_parallel as usize;
230
231 if config.topology_selection {
232 tracing::debug!(
233 topology = ?topology.topology,
234 strategy = ?topology.strategy,
235 max_parallel,
236 "topology-aware concurrency limit applied"
237 );
238 }
239
240 if config.cascade_routing && !config.topology_selection {
242 tracing::warn!(
243 "cascade_routing = true requires topology_selection = true; \
244 cascade routing is disabled (topology_selection is off)"
245 );
246 }
247
248 let cascade_detector = if config.cascade_routing && config.topology_selection {
249 Some(CascadeDetector::new(CascadeConfig {
250 failure_threshold: config.cascade_failure_threshold,
251 }))
252 } else {
253 None
254 };
255
256 Ok(Self {
257 graph,
258 max_parallel,
259 config_max_parallel,
260 running: HashMap::new(),
261 event_rx,
262 event_tx,
263 task_timeout,
264 router,
265 available_agents,
266 dependency_context_budget: config.dependency_context_budget,
267 buffered_events: VecDeque::new(),
268 sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
269 deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
270 consecutive_spawn_failures: 0,
271 topology,
272 topology_dirty: false,
273 current_level: 0,
274 verify_completeness: config.verify_completeness,
275 verify_provider: config.verify_provider.as_str().trim().to_owned(),
276 task_replan_counts: HashMap::new(),
277 global_replan_count: 0,
278 max_replans: config.max_replans,
279 completeness_threshold_value: config.completeness_threshold,
280 cascade_detector,
281 tree_optimized_dispatch: config.tree_optimized_dispatch,
282 cascade_routing: config.cascade_routing && config.topology_selection,
283 })
284 }
285
286 pub fn resume_from(
300 mut graph: TaskGraph,
301 config: &OrchestrationConfig,
302 router: Box<dyn AgentRouter>,
303 available_agents: Vec<SubAgentDef>,
304 ) -> Result<Self, OrchestrationError> {
305 if graph.status == GraphStatus::Completed || graph.status == GraphStatus::Canceled {
306 return Err(OrchestrationError::InvalidGraph(format!(
307 "cannot resume a {} graph; only Paused, Failed, or Running graphs are resumable",
308 graph.status
309 )));
310 }
311
312 graph.status = GraphStatus::Running;
315
316 let running: HashMap<TaskId, RunningTask> = graph
321 .tasks
322 .iter()
323 .filter(|t| t.status == TaskStatus::Running)
324 .filter_map(|t| {
325 let handle_id = t.assigned_agent.clone()?;
326 let def_name = t.agent_hint.clone().unwrap_or_default();
327 Some((
328 t.id,
329 RunningTask {
330 agent_handle_id: handle_id,
331 agent_def_name: def_name,
332 started_at: Instant::now(),
334 },
335 ))
336 })
337 .collect();
338
339 let (event_tx, event_rx) = mpsc::channel(64);
340
341 let task_timeout = if config.task_timeout_secs > 0 {
342 Duration::from_secs(config.task_timeout_secs)
343 } else {
344 Duration::from_secs(600)
345 };
346
347 let topology = TopologyClassifier::analyze(&graph, config);
348 let max_parallel = topology.max_parallel;
349 let config_max_parallel = config.max_parallel as usize;
350
351 let cascade_detector = if config.cascade_routing && config.topology_selection {
352 Some(CascadeDetector::new(CascadeConfig {
353 failure_threshold: config.cascade_failure_threshold,
354 }))
355 } else {
356 None
357 };
358
359 Ok(Self {
360 graph,
361 max_parallel,
362 config_max_parallel,
363 running,
364 event_rx,
365 event_tx,
366 task_timeout,
367 router,
368 available_agents,
369 dependency_context_budget: config.dependency_context_budget,
370 buffered_events: VecDeque::new(),
371 sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
372 deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
373 consecutive_spawn_failures: 0,
374 topology,
375 topology_dirty: false,
376 current_level: 0,
377 verify_completeness: config.verify_completeness,
378 verify_provider: config.verify_provider.as_str().trim().to_owned(),
379 task_replan_counts: HashMap::new(),
380 global_replan_count: 0,
381 max_replans: config.max_replans,
382 completeness_threshold_value: config.completeness_threshold,
383 cascade_detector,
384 tree_optimized_dispatch: config.tree_optimized_dispatch,
385 cascade_routing: config.cascade_routing && config.topology_selection,
386 })
387 }
388
389 pub fn validate_verify_config(
403 &self,
404 provider_names: &[&str],
405 ) -> Result<(), OrchestrationError> {
406 if !self.verify_completeness {
407 return Ok(());
408 }
409 let name = self.verify_provider.as_str();
410 if name.is_empty() || provider_names.is_empty() {
411 return Ok(());
412 }
413 if !provider_names.contains(&name) {
414 return Err(OrchestrationError::InvalidConfig(format!(
415 "verify_provider \"{}\" not found in [[llm.providers]]; available: [{}]",
416 name,
417 provider_names.join(", ")
418 )));
419 }
420 Ok(())
421 }
422
423 #[must_use]
425 pub fn event_sender(&self) -> mpsc::Sender<TaskEvent> {
426 self.event_tx.clone()
427 }
428
429 #[must_use]
431 pub fn graph(&self) -> &TaskGraph {
432 &self.graph
433 }
434
435 #[must_use]
439 pub fn into_graph(&self) -> TaskGraph {
440 self.graph.clone()
441 }
442
443 #[must_use]
445 pub fn topology(&self) -> &TopologyAnalysis {
446 &self.topology
447 }
448
449 #[must_use]
454 pub fn completeness_threshold(&self) -> f32 {
455 self.completeness_threshold_value
456 }
457
458 #[must_use]
460 pub fn verify_provider_name(&self) -> &str {
461 &self.verify_provider
462 }
463
464 #[must_use]
468 pub fn max_replans_remaining(&self) -> u32 {
469 self.max_replans.saturating_sub(self.global_replan_count)
470 }
471
472 pub fn record_whole_plan_replan(&mut self) {
477 self.global_replan_count = self.global_replan_count.saturating_add(1);
478 }
479
480 pub fn inject_tasks(
495 &mut self,
496 verified_task_id: TaskId,
497 new_tasks: Vec<TaskNode>,
498 max_tasks: usize,
499 ) -> Result<(), OrchestrationError> {
500 if new_tasks.is_empty() {
501 return Ok(());
502 }
503
504 let task_replan_count = self.task_replan_counts.entry(verified_task_id).or_insert(0);
506 if *task_replan_count >= 1 {
507 tracing::warn!(
508 task_id = %verified_task_id,
509 "per-task replan limit (1) reached, skipping replan injection"
510 );
511 return Ok(());
512 }
513
514 if self.global_replan_count >= self.max_replans {
516 tracing::warn!(
517 global_replan_count = self.global_replan_count,
518 max_replans = self.max_replans,
519 "global replan limit reached, skipping replan injection"
520 );
521 return Ok(());
522 }
523
524 verifier_inject_tasks(&mut self.graph, new_tasks, max_tasks)?;
525
526 *task_replan_count += 1;
527 self.global_replan_count += 1;
528
529 self.topology_dirty = true;
531
532 if let Some(ref mut det) = self.cascade_detector {
534 det.reset();
535 }
536
537 Ok(())
538 }
539}
540
541impl Drop for DagScheduler {
542 fn drop(&mut self) {
543 if !self.running.is_empty() {
544 tracing::warn!(
545 running_tasks = self.running.len(),
546 "DagScheduler dropped with running tasks; agents may continue until their \
547 CancellationToken fires or they complete naturally"
548 );
549 }
550 }
551}
552
553impl DagScheduler {
    /// Advances the scheduler by one step and returns the actions the caller must perform.
    ///
    /// In order: re-analyses the topology if flagged, processes buffered and pending
    /// events, fails timed-out tasks, orders the ready tasks according to the dispatch
    /// strategy, dispatches as many as there are free slots, and finally checks whether
    /// the graph has completed or deadlocked.
    ///
    /// When the graph is not `Running` on entry, a single `Done` action with the current
    /// status is returned.
    #[allow(clippy::too_many_lines)]
    pub fn tick(&mut self) -> Vec<SchedulerAction> {
        if self.graph.status != GraphStatus::Running {
            return vec![SchedulerAction::Done {
                status: self.graph.status,
            }];
        }

        self.reanalyze_topology_if_dirty();

        let mut actions = Vec::new();

        // Events buffered by wait_event() are handled first, then anything still queued
        // in the channel.
        while let Some(event) = self.buffered_events.pop_front() {
            let cancel_actions = self.process_event(event);
            actions.extend(cancel_actions);
        }
        while let Ok(event) = self.event_rx.try_recv() {
            let cancel_actions = self.process_event(event);
            actions.extend(cancel_actions);
        }

        // Event processing may have moved the graph out of Running; in that case the
        // actions collected so far are returned without dispatching anything.
        if self.graph.status != GraphStatus::Running {
            return actions;
        }

        let timeout_actions = self.check_timeouts();
        actions.extend(timeout_actions);

        if self.graph.status != GraphStatus::Running {
            return actions;
        }

        let raw_ready = dag::ready_tasks(&self.graph);

        // CascadeAware: keep every ready task, but move the ones the detector flags as
        // deprioritized behind the others. Sequential tasks are never moved back.
        let ready: Vec<TaskId> = if self.topology.strategy == DispatchStrategy::CascadeAware {
            if let Some(ref detector) = self.cascade_detector {
                let deprioritized = detector.deprioritized_tasks(&self.graph);
                if deprioritized.is_empty() {
                    raw_ready
                } else {
                    let (preferred, deferred): (Vec<_>, Vec<_>) =
                        raw_ready.into_iter().partition(|id| {
                            let is_sequential = self.graph.tasks[id.index()].execution_mode
                                == ExecutionMode::Sequential;
                            is_sequential || !deprioritized.contains(id)
                        });
                    preferred.into_iter().chain(deferred).collect()
                }
            } else {
                raw_ready
            }
        } else {
            raw_ready
        };

        // TreeOptimized: the sort key shrinks as depth grows, so tasks with a greater
        // recorded depth come first. The sort is stable, so ties keep their order.
        let ready: Vec<TaskId> = if self.topology.strategy == DispatchStrategy::TreeOptimized {
            let max_depth = self.topology.depth;
            let mut sortable = ready;
            sortable.sort_by_key(|id| {
                let task_depth = self.topology.depths.get(id).copied().unwrap_or(0);
                max_depth.saturating_sub(task_depth)
            });
            sortable
        } else {
            ready
        };

        self.advance_level_barrier_if_needed();

        // Free dispatch slots for this tick; both Spawn and RunInline consume one.
        let mut slots = self.max_parallel.saturating_sub(self.running.len());

        // At most one Sequential task is dispatched per tick, and none while one is
        // already tracked in the running map.
        let mut sequential_spawned_this_tick = false;
        let has_running_sequential = self
            .running
            .keys()
            .any(|tid| self.graph.tasks[tid.index()].execution_mode == ExecutionMode::Sequential);

        for task_id in ready {
            if slots == 0 {
                break;
            }

            // LevelBarrier: only tasks at the current depth level are eligible. Tasks
            // without a recorded depth get usize::MAX and are therefore skipped.
            if self.topology.strategy == DispatchStrategy::LevelBarrier {
                let task_depth = self
                    .topology
                    .depths
                    .get(&task_id)
                    .copied()
                    .unwrap_or(usize::MAX);
                if task_depth != self.current_level {
                    continue;
                }
            }

            let task = &self.graph.tasks[task_id.index()];

            if task.execution_mode == ExecutionMode::Sequential {
                if sequential_spawned_this_tick || has_running_sequential {
                    continue;
                }
                sequential_spawned_this_tick = true;
            }

            // No agent from the router: the task is handed back to the caller to run
            // inline. It is marked Running here but not added to the running map.
            let Some(agent_def_name) = self.router.route(task, &self.available_agents) else {
                tracing::debug!(
                    task_id = %task_id,
                    title = %task.title,
                    "no agent available, routing task to main agent inline"
                );
                let prompt = self.build_task_prompt(task);
                self.graph.tasks[task_id.index()].status = TaskStatus::Running;
                actions.push(SchedulerAction::RunInline { task_id, prompt });
                slots -= 1;
                continue;
            };

            let prompt = self.build_task_prompt(task);

            // Marked Running immediately so the task is not picked again; the running
            // map entry is added later via record_spawn().
            self.graph.tasks[task_id.index()].status = TaskStatus::Running;

            actions.push(SchedulerAction::Spawn {
                task_id,
                agent_def_name,
                prompt,
            });
            slots -= 1;
        }

        actions.extend(self.check_graph_completion());

        actions
    }
712
713 fn reanalyze_topology_if_dirty(&mut self) {
714 if !self.topology_dirty {
715 return;
716 }
717 let new_analysis = {
718 let n = self.graph.tasks.len();
719 if n == 0 {
720 TopologyAnalysis {
721 topology: Topology::AllParallel,
722 strategy: DispatchStrategy::FullParallel,
723 max_parallel: self.config_max_parallel,
724 depth: 0,
725 depths: std::collections::HashMap::new(),
726 }
727 } else {
728 let (depth, depths) = super::topology::compute_depths_for_scheduler(&self.graph);
729 let topo = TopologyClassifier::classify_with_depths(&self.graph, depth, &depths);
730 let strategy_config = zeph_config::OrchestrationConfig {
731 cascade_routing: self.cascade_routing,
732 tree_optimized_dispatch: self.tree_optimized_dispatch,
733 ..zeph_config::OrchestrationConfig::default()
734 };
735 let strategy = TopologyClassifier::strategy(topo, &strategy_config);
736 let max_parallel =
737 TopologyClassifier::compute_max_parallel(topo, self.config_max_parallel);
738 TopologyAnalysis {
739 topology: topo,
740 strategy,
741 max_parallel,
742 depth,
743 depths,
744 }
745 }
746 };
747 self.topology = new_analysis;
748 self.max_parallel = self.topology.max_parallel;
749 self.topology_dirty = false;
750 if self.topology.strategy == DispatchStrategy::LevelBarrier {
751 let min_active = self
752 .graph
753 .tasks
754 .iter()
755 .filter(|t| !t.status.is_terminal())
756 .filter_map(|t| self.topology.depths.get(&t.id).copied())
757 .min();
758 if let Some(min_depth) = min_active {
759 self.current_level = self.current_level.min(min_depth);
760 }
761 }
762 }
763
764 fn advance_level_barrier_if_needed(&mut self) {
765 if self.topology.strategy != DispatchStrategy::LevelBarrier {
766 return;
767 }
768 let all_current_level_terminal = self.graph.tasks.iter().all(|t| {
769 let task_depth = self
770 .topology
771 .depths
772 .get(&t.id)
773 .copied()
774 .unwrap_or(usize::MAX);
775 task_depth != self.current_level || t.status.is_terminal()
776 });
777 if all_current_level_terminal {
778 let max_depth = self.topology.depth;
779 while self.current_level <= max_depth {
780 let has_non_terminal = self.graph.tasks.iter().any(|t| {
781 let d = self
782 .topology
783 .depths
784 .get(&t.id)
785 .copied()
786 .unwrap_or(usize::MAX);
787 d == self.current_level && !t.status.is_terminal()
788 });
789 if has_non_terminal {
790 break;
791 }
792 self.current_level += 1;
793 }
794 }
795 }
796
797 fn check_graph_completion(&mut self) -> Vec<SchedulerAction> {
798 let running_in_graph_now = self
799 .graph
800 .tasks
801 .iter()
802 .filter(|t| t.status == TaskStatus::Running)
803 .count();
804 if running_in_graph_now != 0 || !self.running.is_empty() {
805 return vec![];
806 }
807 let all_terminal = self.graph.tasks.iter().all(|t| t.status.is_terminal());
808 if all_terminal {
809 self.graph.status = GraphStatus::Completed;
810 self.graph.finished_at = Some(super::graph::chrono_now());
811 return vec![SchedulerAction::Done {
812 status: GraphStatus::Completed,
813 }];
814 }
815 if dag::ready_tasks(&self.graph).is_empty() {
816 tracing::error!(
817 "scheduler deadlock: no running or ready tasks, but graph not complete"
818 );
819 self.graph.status = GraphStatus::Failed;
820 self.graph.finished_at = Some(super::graph::chrono_now());
821 debug_assert!(
822 self.running.is_empty(),
823 "deadlock branch reached with non-empty running map"
824 );
825 for task in &mut self.graph.tasks {
826 if !task.status.is_terminal() {
827 task.status = TaskStatus::Canceled;
828 }
829 }
830 return vec![SchedulerAction::Done {
831 status: GraphStatus::Failed,
832 }];
833 }
834 vec![]
835 }
836
837 fn current_deferral_backoff(&self) -> Duration {
846 const MAX_BACKOFF: Duration = Duration::from_secs(5);
847 let multiplier = 1u32
848 .checked_shl(self.consecutive_spawn_failures.min(10))
849 .unwrap_or(u32::MAX);
850 self.deferral_backoff
851 .saturating_mul(multiplier)
852 .min(MAX_BACKOFF)
853 }
854
855 pub async fn wait_event(&mut self) {
856 if self.running.is_empty() {
857 tokio::time::sleep(self.current_deferral_backoff()).await;
858 return;
859 }
860
861 let nearest_timeout = self
863 .running
864 .values()
865 .map(|r| {
866 self.task_timeout
867 .checked_sub(r.started_at.elapsed())
868 .unwrap_or(Duration::ZERO)
869 })
870 .min()
871 .unwrap_or(Duration::from_secs(1));
872
873 let wait_duration = nearest_timeout.max(Duration::from_millis(100));
875
876 tokio::select! {
877 Some(event) = self.event_rx.recv() => {
878 if self.buffered_events.len() >= self.graph.tasks.len() * 2 {
882 if let Some(dropped) = self.buffered_events.pop_front() {
885 tracing::error!(
886 task_id = %dropped.task_id,
887 buffer_len = self.buffered_events.len(),
888 "event buffer saturated; completion event dropped — task may \
889 remain Running until timeout"
890 );
891 }
892 }
893 self.buffered_events.push_back(event);
894 }
895 () = tokio::time::sleep(wait_duration) => {}
896 }
897 }
898
899 pub fn record_spawn(
909 &mut self,
910 task_id: TaskId,
911 agent_handle_id: String,
912 agent_def_name: String,
913 ) {
914 self.consecutive_spawn_failures = 0;
915 self.graph.tasks[task_id.index()].assigned_agent = Some(agent_handle_id.clone());
916 self.running.insert(
917 task_id,
918 RunningTask {
919 agent_handle_id,
920 agent_def_name,
921 started_at: Instant::now(),
922 },
923 );
924 }
925
926 pub fn record_spawn_failure(
937 &mut self,
938 task_id: TaskId,
939 error: &SubAgentError,
940 ) -> Vec<SchedulerAction> {
941 if let SubAgentError::ConcurrencyLimit { active, max } = error {
945 tracing::warn!(
946 task_id = %task_id,
947 active,
948 max,
949 next_backoff_ms = self.current_deferral_backoff().as_millis(),
950 "concurrency limit reached, deferring task to next tick"
951 );
952 self.graph.tasks[task_id.index()].status = TaskStatus::Ready;
953 return Vec::new();
954 }
955
956 let error_excerpt: String = error.to_string().chars().take(512).collect();
958 tracing::warn!(
959 task_id = %task_id,
960 error = %error_excerpt,
961 "spawn failed, marking task failed"
962 );
963 self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
964 let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
965 let mut actions = Vec::new();
966 for cancel_task_id in cancel_ids {
967 if let Some(running) = self.running.remove(&cancel_task_id) {
968 actions.push(SchedulerAction::Cancel {
969 agent_handle_id: running.agent_handle_id,
970 });
971 }
972 }
973 if self.graph.status != GraphStatus::Running {
974 self.graph.finished_at = Some(super::graph::chrono_now());
975 actions.push(SchedulerAction::Done {
976 status: self.graph.status,
977 });
978 }
979 actions
980 }
981
982 pub fn record_batch_backoff(&mut self, any_success: bool, any_concurrency_failure: bool) {
991 if any_success {
992 self.consecutive_spawn_failures = 0;
993 } else if any_concurrency_failure {
994 self.consecutive_spawn_failures = self.consecutive_spawn_failures.saturating_add(1);
995 }
996 }
997
998 pub fn cancel_all(&mut self) -> Vec<SchedulerAction> {
1007 self.graph.status = GraphStatus::Canceled;
1008 self.graph.finished_at = Some(super::graph::chrono_now());
1009
1010 let running: Vec<(TaskId, RunningTask)> = self.running.drain().collect();
1012 let mut actions: Vec<SchedulerAction> = running
1013 .into_iter()
1014 .map(|(task_id, r)| {
1015 self.graph.tasks[task_id.index()].status = TaskStatus::Canceled;
1016 SchedulerAction::Cancel {
1017 agent_handle_id: r.agent_handle_id,
1018 }
1019 })
1020 .collect();
1021
1022 for task in &mut self.graph.tasks {
1023 if !task.status.is_terminal() {
1024 task.status = TaskStatus::Canceled;
1025 }
1026 }
1027
1028 actions.push(SchedulerAction::Done {
1029 status: GraphStatus::Canceled,
1030 });
1031 actions
1032 }
1033}
1034
1035impl DagScheduler {
1036 fn process_event(&mut self, event: TaskEvent) -> Vec<SchedulerAction> {
1038 let TaskEvent {
1039 task_id,
1040 agent_handle_id,
1041 outcome,
1042 } = event;
1043
1044 match self.running.get(&task_id) {
1047 Some(running) if running.agent_handle_id != agent_handle_id => {
1048 tracing::warn!(
1049 task_id = %task_id,
1050 expected = %running.agent_handle_id,
1051 got = %agent_handle_id,
1052 "discarding stale event from previous agent incarnation"
1053 );
1054 return Vec::new();
1055 }
1056 None => {
1057 tracing::debug!(
1058 task_id = %task_id,
1059 agent_handle_id = %agent_handle_id,
1060 "ignoring event for task not in running map"
1061 );
1062 return Vec::new();
1063 }
1064 Some(_) => {}
1065 }
1066
1067 let duration_ms = self.running.get(&task_id).map_or(0, |r| {
1069 u64::try_from(r.started_at.elapsed().as_millis()).unwrap_or(u64::MAX)
1070 });
1071 let agent_def_name = self.running.get(&task_id).map(|r| r.agent_def_name.clone());
1072
1073 self.running.remove(&task_id);
1074
1075 match outcome {
1076 TaskOutcome::Completed { output, artifacts } => {
1077 self.graph.tasks[task_id.index()].status = TaskStatus::Completed;
1078 self.graph.tasks[task_id.index()].result = Some(TaskResult {
1079 output: output.clone(),
1080 artifacts,
1081 duration_ms,
1082 agent_id: Some(agent_handle_id),
1083 agent_def: agent_def_name,
1084 });
1085
1086 if let Some(ref mut detector) = self.cascade_detector {
1088 detector.record_outcome(task_id, true, &self.graph);
1089 }
1090
1091 let newly_ready = dag::ready_tasks(&self.graph);
1094 for ready_id in newly_ready {
1095 if self.graph.tasks[ready_id.index()].status == TaskStatus::Pending {
1096 self.graph.tasks[ready_id.index()].status = TaskStatus::Ready;
1097 }
1098 }
1099
1100 if self.verify_completeness {
1106 vec![SchedulerAction::Verify { task_id, output }]
1107 } else {
1108 Vec::new()
1109 }
1110 }
1111
1112 TaskOutcome::Failed { error } => {
1113 let error_excerpt: String = error.chars().take(512).collect();
1115 tracing::warn!(
1116 task_id = %task_id,
1117 error = %error_excerpt,
1118 "task failed"
1119 );
1120 self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
1121
1122 if let Some(ref mut detector) = self.cascade_detector {
1124 detector.record_outcome(task_id, false, &self.graph);
1125 }
1126
1127 let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
1128 let mut actions = Vec::new();
1129
1130 for cancel_task_id in cancel_ids {
1131 if let Some(running) = self.running.remove(&cancel_task_id) {
1132 actions.push(SchedulerAction::Cancel {
1133 agent_handle_id: running.agent_handle_id,
1134 });
1135 }
1136 }
1137
1138 if self.graph.status != GraphStatus::Running {
1139 self.graph.finished_at = Some(super::graph::chrono_now());
1140 actions.push(SchedulerAction::Done {
1141 status: self.graph.status,
1142 });
1143 }
1144
1145 actions
1146 }
1147 }
1148 }
1149
1150 fn check_timeouts(&mut self) -> Vec<SchedulerAction> {
1158 let timed_out: Vec<(TaskId, String)> = self
1159 .running
1160 .iter()
1161 .filter(|(_, r)| r.started_at.elapsed() > self.task_timeout)
1162 .map(|(id, r)| (*id, r.agent_handle_id.clone()))
1163 .collect();
1164
1165 let mut actions = Vec::new();
1166 for (task_id, agent_handle_id) in timed_out {
1167 tracing::warn!(
1168 task_id = %task_id,
1169 timeout_secs = self.task_timeout.as_secs(),
1170 "task timed out"
1171 );
1172 self.running.remove(&task_id);
1173 self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
1174
1175 actions.push(SchedulerAction::Cancel { agent_handle_id });
1176
1177 let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
1178 for cancel_task_id in cancel_ids {
1179 if let Some(running) = self.running.remove(&cancel_task_id) {
1180 actions.push(SchedulerAction::Cancel {
1181 agent_handle_id: running.agent_handle_id,
1182 });
1183 }
1184 }
1185
1186 if self.graph.status != GraphStatus::Running {
1187 self.graph.finished_at = Some(super::graph::chrono_now());
1188 actions.push(SchedulerAction::Done {
1189 status: self.graph.status,
1190 });
1191 break;
1192 }
1193 }
1194
1195 actions
1196 }
1197
1198 fn build_task_prompt(&self, task: &TaskNode) -> String {
1204 if task.depends_on.is_empty() {
1205 return task.description.clone();
1206 }
1207
1208 let completed_deps: Vec<&TaskNode> = task
1209 .depends_on
1210 .iter()
1211 .filter_map(|dep_id| {
1212 let dep = &self.graph.tasks[dep_id.index()];
1213 if dep.status == TaskStatus::Completed {
1214 Some(dep)
1215 } else {
1216 None
1217 }
1218 })
1219 .collect();
1220
1221 if completed_deps.is_empty() {
1222 return task.description.clone();
1223 }
1224
1225 let budget_per_dep = self
1226 .dependency_context_budget
1227 .checked_div(completed_deps.len())
1228 .unwrap_or(self.dependency_context_budget);
1229
1230 let mut context_block = String::from("<completed-dependencies>\n");
1231
1232 for dep in &completed_deps {
1233 let escaped_id = xml_escape(&dep.id.to_string());
1236 let escaped_title = xml_escape(&dep.title);
1237 let _ = writeln!(
1238 context_block,
1239 "## Task \"{escaped_id}\": \"{escaped_title}\" (completed)",
1240 );
1241
1242 if let Some(ref result) = dep.result {
1243 let source = ContentSource::new(ContentSourceKind::A2aMessage);
1245 let sanitized = self.sanitizer.sanitize(&result.output, source);
1246 let safe_output = sanitized.body;
1247
1248 let char_count = safe_output.chars().count();
1250 if char_count > budget_per_dep {
1251 let truncated: String = safe_output.chars().take(budget_per_dep).collect();
1252 let _ = write!(
1253 context_block,
1254 "{truncated}...\n[truncated: {char_count} chars total]"
1255 );
1256 } else {
1257 context_block.push_str(&safe_output);
1258 }
1259 } else {
1260 context_block.push_str("[no output recorded]\n");
1261 }
1262 context_block.push('\n');
1263 }
1264
1265 for dep_id in &task.depends_on {
1267 let dep = &self.graph.tasks[dep_id.index()];
1268 if dep.status == TaskStatus::Skipped {
1269 let escaped_id = xml_escape(&dep.id.to_string());
1270 let escaped_title = xml_escape(&dep.title);
1271 let _ = writeln!(
1272 context_block,
1273 "## Task \"{escaped_id}\": \"{escaped_title}\" (skipped -- no output available)\n",
1274 );
1275 }
1276 }
1277
1278 context_block.push_str("</completed-dependencies>\n\n");
1279 format!("{context_block}Your task: {}", task.description)
1280 }
1281}
1282
/// Escapes the five characters that are special in XML text and attribute values.
///
/// `<`, `>`, `&`, `"` and `'` are replaced by the XML predefined entities; every other
/// character is copied unchanged.
fn xml_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '&' => out.push_str("&amp;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            other => out.push(other),
        }
    }
    out
}
1298
1299#[cfg(test)]
1300mod tests {
1301 #![allow(clippy::default_trait_access)]
1302
1303 use super::*;
1304 use crate::graph::{FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus};
1305
1306 fn make_node(id: u32, deps: &[u32]) -> TaskNode {
1307 let mut n = TaskNode::new(
1308 id,
1309 format!("task-{id}"),
1310 format!("description for task {id}"),
1311 );
1312 n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
1313 n
1314 }
1315
1316 fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
1317 let mut g = TaskGraph::new("test goal");
1318 g.tasks = nodes;
1319 g
1320 }
1321
    /// Builds a minimal agent definition named `name`; every other field is a default,
    /// empty, or `None` value.
    fn make_def(name: &str) -> SubAgentDef {
        use zeph_subagent::{SkillFilter, SubAgentPermissions, SubagentHooks, ToolPolicy};
        SubAgentDef {
            name: name.to_string(),
            description: format!("{name} agent"),
            model: None,
            tools: ToolPolicy::InheritAll,
            disallowed_tools: vec![],
            permissions: SubAgentPermissions::default(),
            skills: SkillFilter::default(),
            system_prompt: String::new(),
            hooks: SubagentHooks::default(),
            memory: None,
            source: None,
            file_path: None,
        }
    }
1339
    /// Baseline orchestration config for tests: parallelism of 4, a 300 s task timeout,
    /// and topology selection, verification and cascade routing all switched off.
    fn make_config() -> zeph_config::OrchestrationConfig {
        zeph_config::OrchestrationConfig {
            enabled: true,
            max_tasks: 20,
            max_parallel: 4,
            default_failure_strategy: "abort".to_string(),
            default_max_retries: 3,
            task_timeout_secs: 300,
            planner_provider: Default::default(),
            planner_max_tokens: 4096,
            dependency_context_budget: 16384,
            confirm_before_execute: true,
            aggregator_max_tokens: 4096,
            deferral_backoff_ms: 250,
            plan_cache: zeph_config::PlanCacheConfig::default(),
            topology_selection: false,
            verify_provider: Default::default(),
            verify_max_tokens: 1024,
            max_replans: 2,
            verify_completeness: false,
            completeness_threshold: 0.7,
            tool_provider: Default::default(),
            cascade_routing: false,
            cascade_failure_threshold: 0.5,
            tree_optimized_dispatch: false,
        }
    }
1367
1368 struct FirstRouter;
1369 impl AgentRouter for FirstRouter {
1370 fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
1371 available.first().map(|d| d.name.clone())
1372 }
1373 }
1374
1375 struct NoneRouter;
1376 impl AgentRouter for NoneRouter {
1377 fn route(&self, _task: &TaskNode, _available: &[SubAgentDef]) -> Option<String> {
1378 None
1379 }
1380 }
1381
1382 fn make_scheduler_with_router(graph: TaskGraph, router: Box<dyn AgentRouter>) -> DagScheduler {
1383 let config = make_config();
1384 let defs = vec![make_def("worker")];
1385 DagScheduler::new(graph, &config, router, defs).unwrap()
1386 }
1387
1388 fn make_scheduler(graph: TaskGraph) -> DagScheduler {
1389 let config = make_config();
1390 let defs = vec![make_def("worker")];
1391 DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap()
1392 }
1393
1394 #[test]
1397 fn test_new_validates_graph_status() {
1398 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1399 graph.status = GraphStatus::Running; let config = make_config();
1401 let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
1402 assert!(result.is_err());
1403 let err = result.unwrap_err();
1404 assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1405 }
1406
1407 #[test]
1408 fn test_new_marks_roots_ready() {
1409 let graph = graph_from_nodes(vec![
1410 make_node(0, &[]),
1411 make_node(1, &[]),
1412 make_node(2, &[0, 1]),
1413 ]);
1414 let scheduler = make_scheduler(graph);
1415 assert_eq!(scheduler.graph().tasks[0].status, TaskStatus::Ready);
1416 assert_eq!(scheduler.graph().tasks[1].status, TaskStatus::Ready);
1417 assert_eq!(scheduler.graph().tasks[2].status, TaskStatus::Pending);
1418 assert_eq!(scheduler.graph().status, GraphStatus::Running);
1419 }
1420
1421 #[test]
1422 fn test_new_validates_empty_graph() {
1423 let graph = graph_from_nodes(vec![]);
1424 let config = make_config();
1425 let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
1426 assert!(result.is_err());
1427 }
1428
1429 #[test]
1432 fn test_tick_produces_spawn_for_ready() {
1433 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1434 let mut scheduler = make_scheduler(graph);
1435 let actions = scheduler.tick();
1436 let spawns: Vec<_> = actions
1437 .iter()
1438 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1439 .collect();
1440 assert_eq!(spawns.len(), 2);
1441 }
1442
1443 #[test]
1444 fn test_tick_dispatches_all_regardless_of_max_parallel() {
1445 let graph = graph_from_nodes(vec![
1448 make_node(0, &[]),
1449 make_node(1, &[]),
1450 make_node(2, &[]),
1451 make_node(3, &[]),
1452 make_node(4, &[]),
1453 ]);
1454 let mut config = make_config();
1455 config.max_parallel = 2;
1456 let defs = vec![make_def("worker")];
1457 let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1458 let actions = scheduler.tick();
1459 let spawn_count = actions
1460 .iter()
1461 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1462 .count();
1463 assert_eq!(
1464 spawn_count, 2,
1465 "max_parallel=2 caps dispatched tasks per tick"
1466 );
1467 }
1468
1469 #[test]
1470 fn test_tick_detects_completion() {
1471 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1472 graph.tasks[0].status = TaskStatus::Completed;
1473 let config = make_config();
1474 let defs = vec![make_def("worker")];
1475 let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1476 let actions = scheduler.tick();
1479 let has_done = actions.iter().any(|a| {
1480 matches!(
1481 a,
1482 SchedulerAction::Done {
1483 status: GraphStatus::Completed
1484 }
1485 )
1486 });
1487 assert!(
1488 has_done,
1489 "should emit Done(Completed) when all tasks are terminal"
1490 );
1491 }
1492
1493 #[test]
1496 fn test_completion_event_marks_deps_ready() {
1497 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1498 let mut scheduler = make_scheduler(graph);
1499
1500 scheduler.graph.tasks[0].status = TaskStatus::Running;
1502 scheduler.running.insert(
1503 TaskId(0),
1504 RunningTask {
1505 agent_handle_id: "handle-0".to_string(),
1506 agent_def_name: "worker".to_string(),
1507 started_at: Instant::now(),
1508 },
1509 );
1510
1511 let event = TaskEvent {
1512 task_id: TaskId(0),
1513 agent_handle_id: "handle-0".to_string(),
1514 outcome: TaskOutcome::Completed {
1515 output: "done".to_string(),
1516 artifacts: vec![],
1517 },
1518 };
1519 scheduler.buffered_events.push_back(event);
1520
1521 let actions = scheduler.tick();
1522 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Completed);
1523 let has_spawn_1 = actions
1525 .iter()
1526 .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(1)));
1527 assert!(
1528 has_spawn_1 || scheduler.graph.tasks[1].status == TaskStatus::Ready,
1529 "task 1 should be spawned or marked Ready"
1530 );
1531 }
1532
1533 #[test]
1534 fn test_failure_abort_cancels_running() {
1535 let graph = graph_from_nodes(vec![
1536 make_node(0, &[]),
1537 make_node(1, &[]),
1538 make_node(2, &[0, 1]),
1539 ]);
1540 let mut scheduler = make_scheduler(graph);
1541
1542 scheduler.graph.tasks[0].status = TaskStatus::Running;
1544 scheduler.running.insert(
1545 TaskId(0),
1546 RunningTask {
1547 agent_handle_id: "h0".to_string(),
1548 agent_def_name: "worker".to_string(),
1549 started_at: Instant::now(),
1550 },
1551 );
1552 scheduler.graph.tasks[1].status = TaskStatus::Running;
1553 scheduler.running.insert(
1554 TaskId(1),
1555 RunningTask {
1556 agent_handle_id: "h1".to_string(),
1557 agent_def_name: "worker".to_string(),
1558 started_at: Instant::now(),
1559 },
1560 );
1561
1562 let event = TaskEvent {
1564 task_id: TaskId(0),
1565 agent_handle_id: "h0".to_string(),
1566 outcome: TaskOutcome::Failed {
1567 error: "boom".to_string(),
1568 },
1569 };
1570 scheduler.buffered_events.push_back(event);
1571
1572 let actions = scheduler.tick();
1573 assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1574 let cancel_ids: Vec<_> = actions
1575 .iter()
1576 .filter_map(|a| {
1577 if let SchedulerAction::Cancel { agent_handle_id } = a {
1578 Some(agent_handle_id.as_str())
1579 } else {
1580 None
1581 }
1582 })
1583 .collect();
1584 assert!(cancel_ids.contains(&"h1"), "task 1 should be canceled");
1585 assert!(
1586 actions
1587 .iter()
1588 .any(|a| matches!(a, SchedulerAction::Done { .. }))
1589 );
1590 }
1591
1592 #[test]
1593 fn test_failure_skip_propagates() {
1594 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1595 let mut scheduler = make_scheduler(graph);
1596
1597 scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
1599 scheduler.graph.tasks[0].status = TaskStatus::Running;
1600 scheduler.running.insert(
1601 TaskId(0),
1602 RunningTask {
1603 agent_handle_id: "h0".to_string(),
1604 agent_def_name: "worker".to_string(),
1605 started_at: Instant::now(),
1606 },
1607 );
1608
1609 let event = TaskEvent {
1610 task_id: TaskId(0),
1611 agent_handle_id: "h0".to_string(),
1612 outcome: TaskOutcome::Failed {
1613 error: "skip me".to_string(),
1614 },
1615 };
1616 scheduler.buffered_events.push_back(event);
1617 scheduler.tick();
1618
1619 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Skipped);
1620 assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Skipped);
1621 }
1622
1623 #[test]
1624 fn test_failure_retry_reschedules() {
1625 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1626 let mut scheduler = make_scheduler(graph);
1627
1628 scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
1629 scheduler.graph.tasks[0].max_retries = Some(3);
1630 scheduler.graph.tasks[0].retry_count = 0;
1631 scheduler.graph.tasks[0].status = TaskStatus::Running;
1632 scheduler.running.insert(
1633 TaskId(0),
1634 RunningTask {
1635 agent_handle_id: "h0".to_string(),
1636 agent_def_name: "worker".to_string(),
1637 started_at: Instant::now(),
1638 },
1639 );
1640
1641 let event = TaskEvent {
1642 task_id: TaskId(0),
1643 agent_handle_id: "h0".to_string(),
1644 outcome: TaskOutcome::Failed {
1645 error: "transient".to_string(),
1646 },
1647 };
1648 scheduler.buffered_events.push_back(event);
1649 let actions = scheduler.tick();
1650
1651 let has_spawn = actions
1653 .iter()
1654 .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
1655 assert!(
1656 has_spawn || scheduler.graph.tasks[0].status == TaskStatus::Ready,
1657 "retry should produce spawn or Ready status"
1658 );
1659 assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
1661 }
1662
1663 #[test]
1664 fn test_process_event_failed_retry() {
1665 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1667 let mut scheduler = make_scheduler(graph);
1668
1669 scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
1670 scheduler.graph.tasks[0].max_retries = Some(2);
1671 scheduler.graph.tasks[0].retry_count = 0;
1672 scheduler.graph.tasks[0].status = TaskStatus::Running;
1673 scheduler.running.insert(
1674 TaskId(0),
1675 RunningTask {
1676 agent_handle_id: "h0".to_string(),
1677 agent_def_name: "worker".to_string(),
1678 started_at: Instant::now(),
1679 },
1680 );
1681
1682 let event = TaskEvent {
1683 task_id: TaskId(0),
1684 agent_handle_id: "h0".to_string(),
1685 outcome: TaskOutcome::Failed {
1686 error: "first failure".to_string(),
1687 },
1688 };
1689 scheduler.buffered_events.push_back(event);
1690 let actions = scheduler.tick();
1691
1692 assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
1694 let spawned = actions
1695 .iter()
1696 .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
1697 assert!(
1698 spawned || scheduler.graph.tasks[0].status == TaskStatus::Ready,
1699 "retry should emit Spawn or set Ready"
1700 );
1701 assert_eq!(scheduler.graph.status, GraphStatus::Running);
1703 }
1704
1705 #[test]
1706 fn test_timeout_cancels_stalled() {
1707 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1708 let mut config = make_config();
1709 config.task_timeout_secs = 1; let defs = vec![make_def("worker")];
1711 let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1712
1713 scheduler.graph.tasks[0].status = TaskStatus::Running;
1715 scheduler.running.insert(
1716 TaskId(0),
1717 RunningTask {
1718 agent_handle_id: "h0".to_string(),
1719 agent_def_name: "worker".to_string(),
1720 started_at: Instant::now().checked_sub(Duration::from_secs(2)).unwrap(), },
1722 );
1723
1724 let actions = scheduler.tick();
1725 let has_cancel = actions.iter().any(
1726 |a| matches!(a, SchedulerAction::Cancel { agent_handle_id } if agent_handle_id == "h0"),
1727 );
1728 assert!(has_cancel, "timed-out task should emit Cancel action");
1729 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1730 }
1731
1732 #[test]
1733 fn test_cancel_all() {
1734 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1735 let mut scheduler = make_scheduler(graph);
1736
1737 scheduler.graph.tasks[0].status = TaskStatus::Running;
1738 scheduler.running.insert(
1739 TaskId(0),
1740 RunningTask {
1741 agent_handle_id: "h0".to_string(),
1742 agent_def_name: "worker".to_string(),
1743 started_at: Instant::now(),
1744 },
1745 );
1746 scheduler.graph.tasks[1].status = TaskStatus::Running;
1747 scheduler.running.insert(
1748 TaskId(1),
1749 RunningTask {
1750 agent_handle_id: "h1".to_string(),
1751 agent_def_name: "worker".to_string(),
1752 started_at: Instant::now(),
1753 },
1754 );
1755
1756 let actions = scheduler.cancel_all();
1757
1758 assert_eq!(scheduler.graph.status, GraphStatus::Canceled);
1759 assert!(scheduler.running.is_empty());
1760 let cancel_count = actions
1761 .iter()
1762 .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1763 .count();
1764 assert_eq!(cancel_count, 2);
1765 assert!(actions.iter().any(|a| matches!(
1766 a,
1767 SchedulerAction::Done {
1768 status: GraphStatus::Canceled
1769 }
1770 )));
1771 }
1772
1773 #[test]
1774 fn test_record_spawn_failure() {
1775 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1776 let mut scheduler = make_scheduler(graph);
1777
1778 scheduler.graph.tasks[0].status = TaskStatus::Running;
1780
1781 let error = SubAgentError::Spawn("spawn error".to_string());
1782 let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1783 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1784 assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1786 assert!(
1787 actions
1788 .iter()
1789 .any(|a| matches!(a, SchedulerAction::Done { .. }))
1790 );
1791 }
1792
1793 #[test]
1794 fn test_record_spawn_failure_concurrency_limit_reverts_to_ready() {
1795 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1796 let mut scheduler = make_scheduler(graph);
1797
1798 scheduler.graph.tasks[0].status = TaskStatus::Running;
1800
1801 let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
1803 let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1804 assert_eq!(
1805 scheduler.graph.tasks[0].status,
1806 TaskStatus::Ready,
1807 "task must revert to Ready so the next tick can retry"
1808 );
1809 assert_eq!(
1810 scheduler.graph.status,
1811 GraphStatus::Running,
1812 "graph must stay Running, not transition to Failed"
1813 );
1814 assert!(
1815 actions.is_empty(),
1816 "no cancel or done actions expected for a transient deferral"
1817 );
1818 }
1819
1820 #[test]
1821 fn test_record_spawn_failure_concurrency_limit_variant_spawn_for_task() {
1822 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1824 let mut scheduler = make_scheduler(graph);
1825 scheduler.graph.tasks[0].status = TaskStatus::Running;
1826
1827 let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1828 let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1829 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1830 assert!(actions.is_empty());
1831 }
1832
1833 #[test]
1836 fn test_concurrency_deferral_does_not_affect_running_task() {
1837 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1841 let mut scheduler = make_scheduler(graph);
1842
1843 scheduler.graph.tasks[0].status = TaskStatus::Running;
1845 scheduler.running.insert(
1846 TaskId(0),
1847 RunningTask {
1848 agent_handle_id: "h0".to_string(),
1849 agent_def_name: "worker".to_string(),
1850 started_at: Instant::now(),
1851 },
1852 );
1853 scheduler.graph.tasks[1].status = TaskStatus::Running;
1854
1855 let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1857 let actions = scheduler.record_spawn_failure(TaskId(1), &error);
1858
1859 assert_eq!(
1860 scheduler.graph.tasks[0].status,
1861 TaskStatus::Running,
1862 "task 0 must remain Running"
1863 );
1864 assert_eq!(
1865 scheduler.graph.tasks[1].status,
1866 TaskStatus::Ready,
1867 "task 1 must revert to Ready"
1868 );
1869 assert_eq!(
1870 scheduler.graph.status,
1871 GraphStatus::Running,
1872 "graph must stay Running"
1873 );
1874 assert!(actions.is_empty(), "no cancel or done actions expected");
1875 }
1876
1877 #[test]
1878 fn test_max_concurrent_zero_no_infinite_loop() {
1879 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1884 let config = zeph_config::OrchestrationConfig {
1885 max_parallel: 0,
1886 ..make_config()
1887 };
1888 let mut scheduler = DagScheduler::new(
1889 graph,
1890 &config,
1891 Box::new(FirstRouter),
1892 vec![make_def("worker")],
1893 )
1894 .unwrap();
1895
1896 let actions1 = scheduler.tick();
1897 assert!(
1899 actions1
1900 .iter()
1901 .all(|a| !matches!(a, SchedulerAction::Spawn { .. })),
1902 "no Spawn expected when max_parallel=0"
1903 );
1904 assert!(
1905 actions1
1906 .iter()
1907 .all(|a| !matches!(a, SchedulerAction::Done { .. })),
1908 "no Done(Failed) expected — ready tasks exist, so no deadlock"
1909 );
1910 assert_eq!(scheduler.graph.status, GraphStatus::Running);
1911
1912 let actions2 = scheduler.tick();
1914 assert!(
1915 actions2
1916 .iter()
1917 .all(|a| !matches!(a, SchedulerAction::Done { .. })),
1918 "second tick must not emit Done(Failed) — ready tasks still exist"
1919 );
1920 assert_eq!(
1921 scheduler.graph.status,
1922 GraphStatus::Running,
1923 "graph must remain Running"
1924 );
1925 }
1926
1927 #[test]
1928 fn test_all_tasks_deferred_graph_stays_running() {
1929 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1933 let mut scheduler = make_scheduler(graph);
1934
1935 let actions = scheduler.tick();
1937 assert_eq!(
1938 actions
1939 .iter()
1940 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1941 .count(),
1942 2,
1943 "expected 2 Spawn actions on first tick"
1944 );
1945 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
1946 assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Running);
1947
1948 let error = SubAgentError::ConcurrencyLimit { active: 2, max: 2 };
1950 let r0 = scheduler.record_spawn_failure(TaskId(0), &error);
1951 let r1 = scheduler.record_spawn_failure(TaskId(1), &error);
1952 assert!(r0.is_empty() && r1.is_empty(), "no cancel/done on deferral");
1953 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1954 assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Ready);
1955 assert_eq!(scheduler.graph.status, GraphStatus::Running);
1956
1957 let retry_actions = scheduler.tick();
1959 let spawn_count = retry_actions
1960 .iter()
1961 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1962 .count();
1963 assert!(
1964 spawn_count > 0,
1965 "second tick must re-emit Spawn for deferred tasks"
1966 );
1967 assert!(
1968 retry_actions.iter().all(|a| !matches!(
1969 a,
1970 SchedulerAction::Done {
1971 status: GraphStatus::Failed,
1972 ..
1973 }
1974 )),
1975 "no Done(Failed) expected"
1976 );
1977 }
1978
1979 #[test]
1980 fn test_build_prompt_no_deps() {
1981 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1982 let scheduler = make_scheduler(graph);
1983 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[0]);
1984 assert_eq!(prompt, "description for task 0");
1985 }
1986
1987 #[test]
1988 fn test_build_prompt_with_deps_and_truncation() {
1989 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1990 graph.tasks[0].status = TaskStatus::Completed;
1991 graph.tasks[0].result = Some(TaskResult {
1993 output: "x".repeat(200),
1994 artifacts: vec![],
1995 duration_ms: 10,
1996 agent_id: None,
1997 agent_def: None,
1998 });
1999
2000 let config = zeph_config::OrchestrationConfig {
2001 dependency_context_budget: 50,
2002 ..make_config()
2003 };
2004 let scheduler = DagScheduler::new(
2005 graph,
2006 &config,
2007 Box::new(FirstRouter),
2008 vec![make_def("worker")],
2009 )
2010 .unwrap();
2011
2012 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
2013 assert!(prompt.contains("<completed-dependencies>"));
2014 assert!(prompt.contains("[truncated:"));
2015 assert!(prompt.contains("Your task:"));
2016 }
2017
2018 #[test]
2019 fn test_duration_ms_computed_correctly() {
2020 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2022 let mut scheduler = make_scheduler(graph);
2023
2024 scheduler.graph.tasks[0].status = TaskStatus::Running;
2025 scheduler.running.insert(
2026 TaskId(0),
2027 RunningTask {
2028 agent_handle_id: "h0".to_string(),
2029 agent_def_name: "worker".to_string(),
2030 started_at: Instant::now()
2031 .checked_sub(Duration::from_millis(50))
2032 .unwrap(),
2033 },
2034 );
2035
2036 let event = TaskEvent {
2037 task_id: TaskId(0),
2038 agent_handle_id: "h0".to_string(),
2039 outcome: TaskOutcome::Completed {
2040 output: "result".to_string(),
2041 artifacts: vec![],
2042 },
2043 };
2044 scheduler.buffered_events.push_back(event);
2045 scheduler.tick();
2046
2047 let result = scheduler.graph.tasks[0].result.as_ref().unwrap();
2048 assert!(
2049 result.duration_ms > 0,
2050 "duration_ms should be > 0, got {}",
2051 result.duration_ms
2052 );
2053 }
2054
2055 #[test]
2056 fn test_utf8_safe_truncation() {
2057 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2059 graph.tasks[0].status = TaskStatus::Completed;
2060 let unicode_output = "日本語テスト".repeat(100);
2062 graph.tasks[0].result = Some(TaskResult {
2063 output: unicode_output,
2064 artifacts: vec![],
2065 duration_ms: 10,
2066 agent_id: None,
2067 agent_def: None,
2068 });
2069
2070 let config = zeph_config::OrchestrationConfig {
2073 dependency_context_budget: 500,
2074 ..make_config()
2075 };
2076 let scheduler = DagScheduler::new(
2077 graph,
2078 &config,
2079 Box::new(FirstRouter),
2080 vec![make_def("worker")],
2081 )
2082 .unwrap();
2083
2084 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
2086 assert!(
2087 prompt.contains("日"),
2088 "Japanese characters should be in the prompt after safe truncation"
2089 );
2090 }
2091
2092 #[test]
2093 fn test_no_agent_routes_inline() {
2094 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2096 let mut scheduler = make_scheduler_with_router(graph, Box::new(NoneRouter));
2097 let actions = scheduler.tick();
2098 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
2099 assert!(
2100 actions
2101 .iter()
2102 .any(|a| matches!(a, SchedulerAction::RunInline { .. }))
2103 );
2104 }
2105
2106 #[test]
2107 fn test_stale_event_rejected() {
2108 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2110 let mut scheduler = make_scheduler(graph);
2111
2112 scheduler.graph.tasks[0].status = TaskStatus::Running;
2114 scheduler.running.insert(
2115 TaskId(0),
2116 RunningTask {
2117 agent_handle_id: "current-handle".to_string(),
2118 agent_def_name: "worker".to_string(),
2119 started_at: Instant::now(),
2120 },
2121 );
2122
2123 let stale_event = TaskEvent {
2125 task_id: TaskId(0),
2126 agent_handle_id: "old-handle".to_string(),
2127 outcome: TaskOutcome::Completed {
2128 output: "stale output".to_string(),
2129 artifacts: vec![],
2130 },
2131 };
2132 scheduler.buffered_events.push_back(stale_event);
2133 let actions = scheduler.tick();
2134
2135 assert_ne!(
2137 scheduler.graph.tasks[0].status,
2138 TaskStatus::Completed,
2139 "stale event must not complete the task"
2140 );
2141 let has_done = actions
2143 .iter()
2144 .any(|a| matches!(a, SchedulerAction::Done { .. }));
2145 assert!(
2146 !has_done,
2147 "no Done action should be emitted for a stale event"
2148 );
2149 assert!(
2151 scheduler.running.contains_key(&TaskId(0)),
2152 "running task must remain after stale event"
2153 );
2154 }
2155
2156 #[test]
2157 fn test_build_prompt_chars_count_in_truncation_message() {
2158 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2162 graph.tasks[0].status = TaskStatus::Completed;
2163 let output = "x".repeat(200);
2166 graph.tasks[0].result = Some(TaskResult {
2167 output,
2168 artifacts: vec![],
2169 duration_ms: 10,
2170 agent_id: None,
2171 agent_def: None,
2172 });
2173
2174 let config = zeph_config::OrchestrationConfig {
2175 dependency_context_budget: 10, ..make_config()
2177 };
2178 let scheduler = DagScheduler::new(
2179 graph,
2180 &config,
2181 Box::new(FirstRouter),
2182 vec![make_def("worker")],
2183 )
2184 .unwrap();
2185
2186 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
2187 assert!(
2189 prompt.contains("chars total"),
2190 "truncation message must use 'chars total' label. Prompt: {prompt}"
2191 );
2192 assert!(
2193 prompt.contains("[truncated:"),
2194 "prompt must contain truncation notice. Prompt: {prompt}"
2195 );
2196 }
2197
2198 #[test]
2201 fn test_resume_from_accepts_paused_graph() {
2202 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2203 graph.status = GraphStatus::Paused;
2204 graph.tasks[0].status = TaskStatus::Pending;
2205
2206 let scheduler =
2207 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2208 .expect("resume_from should accept Paused graph");
2209 assert_eq!(scheduler.graph.status, GraphStatus::Running);
2210 }
2211
2212 #[test]
2213 fn test_resume_from_accepts_failed_graph() {
2214 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2215 graph.status = GraphStatus::Failed;
2216 graph.tasks[0].status = TaskStatus::Failed;
2217
2218 let scheduler =
2219 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2220 .expect("resume_from should accept Failed graph");
2221 assert_eq!(scheduler.graph.status, GraphStatus::Running);
2222 }
2223
2224 #[test]
2225 fn test_resume_from_rejects_completed_graph() {
2226 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2227 graph.status = GraphStatus::Completed;
2228
2229 let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2230 .unwrap_err();
2231 assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
2232 }
2233
2234 #[test]
2235 fn test_resume_from_rejects_canceled_graph() {
2236 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2237 graph.status = GraphStatus::Canceled;
2238
2239 let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2240 .unwrap_err();
2241 assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
2242 }
2243
2244 #[test]
2245 fn test_resume_from_reconstructs_running_tasks() {
2246 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2248 graph.status = GraphStatus::Paused;
2249 graph.tasks[0].status = TaskStatus::Running;
2250 graph.tasks[0].assigned_agent = Some("handle-abc".to_string());
2251 graph.tasks[0].agent_hint = Some("worker".to_string());
2252 graph.tasks[1].status = TaskStatus::Pending;
2253
2254 let scheduler =
2255 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2256 .expect("should succeed");
2257
2258 assert!(
2259 scheduler.running.contains_key(&TaskId(0)),
2260 "Running task must be reconstructed in the running map (IC1)"
2261 );
2262 assert_eq!(scheduler.running[&TaskId(0)].agent_handle_id, "handle-abc");
2263 assert!(
2264 !scheduler.running.contains_key(&TaskId(1)),
2265 "Pending task must not appear in running map"
2266 );
2267 }
2268
2269 #[test]
2270 fn test_resume_from_sets_status_running() {
2271 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2273 graph.status = GraphStatus::Paused;
2274
2275 let scheduler =
2276 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2277 .unwrap();
2278 assert_eq!(scheduler.graph.status, GraphStatus::Running);
2279 }
2280
2281 #[test]
2284 fn test_consecutive_spawn_failures_increments_on_concurrency_limit() {
2285 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2288 let mut scheduler = make_scheduler(graph);
2289 scheduler.graph.tasks[0].status = TaskStatus::Running;
2290
2291 assert_eq!(scheduler.consecutive_spawn_failures, 0, "starts at zero");
2292
2293 let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
2294 scheduler.record_spawn_failure(TaskId(0), &error);
2295 scheduler.record_batch_backoff(false, true);
2297 assert_eq!(
2298 scheduler.consecutive_spawn_failures, 1,
2299 "first deferral tick: consecutive_spawn_failures must be 1"
2300 );
2301
2302 scheduler.graph.tasks[0].status = TaskStatus::Running;
2303 scheduler.record_spawn_failure(TaskId(0), &error);
2304 scheduler.record_batch_backoff(false, true);
2305 assert_eq!(
2306 scheduler.consecutive_spawn_failures, 2,
2307 "second deferral tick: consecutive_spawn_failures must be 2"
2308 );
2309
2310 scheduler.graph.tasks[0].status = TaskStatus::Running;
2311 scheduler.record_spawn_failure(TaskId(0), &error);
2312 scheduler.record_batch_backoff(false, true);
2313 assert_eq!(
2314 scheduler.consecutive_spawn_failures, 3,
2315 "third deferral tick: consecutive_spawn_failures must be 3"
2316 );
2317 }
2318
2319 #[test]
2320 fn test_consecutive_spawn_failures_resets_on_success() {
2321 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2324 let mut scheduler = make_scheduler(graph);
2325 scheduler.graph.tasks[0].status = TaskStatus::Running;
2326
2327 let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2328 scheduler.record_spawn_failure(TaskId(0), &error);
2329 scheduler.record_batch_backoff(false, true);
2330 scheduler.graph.tasks[0].status = TaskStatus::Running;
2331 scheduler.record_spawn_failure(TaskId(0), &error);
2332 scheduler.record_batch_backoff(false, true);
2333 assert_eq!(scheduler.consecutive_spawn_failures, 2);
2334
2335 scheduler.record_spawn(TaskId(0), "handle-0".to_string(), "worker".to_string());
2337 assert_eq!(
2338 scheduler.consecutive_spawn_failures, 0,
2339 "record_spawn must reset consecutive_spawn_failures to 0"
2340 );
2341 }
2342
2343 #[tokio::test]
2344 async fn test_exponential_backoff_duration() {
2345 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2348 let config = zeph_config::OrchestrationConfig {
2349 deferral_backoff_ms: 50,
2350 ..make_config()
2351 };
2352 let mut scheduler = DagScheduler::new(
2353 graph,
2354 &config,
2355 Box::new(FirstRouter),
2356 vec![make_def("worker")],
2357 )
2358 .unwrap();
2359
2360 assert_eq!(scheduler.consecutive_spawn_failures, 0);
2362 let start = tokio::time::Instant::now();
2363 scheduler.wait_event().await;
2364 let elapsed0 = start.elapsed();
2365 assert!(
2366 elapsed0.as_millis() >= 50,
2367 "backoff with 0 deferrals must be >= base (50ms), got {}ms",
2368 elapsed0.as_millis()
2369 );
2370
2371 scheduler.consecutive_spawn_failures = 3;
2373 let start = tokio::time::Instant::now();
2374 scheduler.wait_event().await;
2375 let elapsed3 = start.elapsed();
2376 assert!(
2377 elapsed3.as_millis() >= 400,
2378 "backoff with 3 deferrals must be >= 400ms (50 * 8), got {}ms",
2379 elapsed3.as_millis()
2380 );
2381
2382 scheduler.consecutive_spawn_failures = 20;
2384 let start = tokio::time::Instant::now();
2385 scheduler.wait_event().await;
2386 let elapsed_capped = start.elapsed();
2387 assert!(
2388 elapsed_capped.as_millis() >= 5000,
2389 "backoff must be capped at 5000ms with high deferrals, got {}ms",
2390 elapsed_capped.as_millis()
2391 );
2392 }
2393
2394 #[tokio::test]
2397 async fn test_wait_event_sleeps_deferral_backoff_when_running_empty() {
2398 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2401 let config = zeph_config::OrchestrationConfig {
2402 deferral_backoff_ms: 50,
2403 ..make_config()
2404 };
2405 let mut scheduler = DagScheduler::new(
2406 graph,
2407 &config,
2408 Box::new(FirstRouter),
2409 vec![make_def("worker")],
2410 )
2411 .unwrap();
2412
2413 assert!(scheduler.running.is_empty());
2415
2416 let start = tokio::time::Instant::now();
2417 scheduler.wait_event().await;
2418 let elapsed = start.elapsed();
2419
2420 assert!(
2421 elapsed.as_millis() >= 50,
2422 "wait_event must sleep at least deferral_backoff (50ms) when running is empty, but only slept {}ms",
2423 elapsed.as_millis()
2424 );
2425 }
2426
2427 #[test]
2428 fn test_current_deferral_backoff_exponential_growth() {
2429 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2432 let config = zeph_config::OrchestrationConfig {
2433 deferral_backoff_ms: 250,
2434 ..make_config()
2435 };
2436 let mut scheduler = DagScheduler::new(
2437 graph,
2438 &config,
2439 Box::new(FirstRouter),
2440 vec![make_def("worker")],
2441 )
2442 .unwrap();
2443
2444 assert_eq!(
2445 scheduler.current_deferral_backoff(),
2446 Duration::from_millis(250)
2447 );
2448
2449 scheduler.consecutive_spawn_failures = 1;
2450 assert_eq!(
2451 scheduler.current_deferral_backoff(),
2452 Duration::from_millis(500)
2453 );
2454
2455 scheduler.consecutive_spawn_failures = 2;
2456 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(1));
2457
2458 scheduler.consecutive_spawn_failures = 3;
2459 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(2));
2460
2461 scheduler.consecutive_spawn_failures = 4;
2462 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(4));
2463
2464 scheduler.consecutive_spawn_failures = 5;
2466 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2467
2468 scheduler.consecutive_spawn_failures = 100;
2469 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2470 }
2471
#[test]
// `record_spawn` must bring the consecutive-failure counter back to zero.
fn test_record_spawn_resets_consecutive_failures() {
    let single_task = graph_from_nodes(vec![make_node(0, &[])]);
    let mut sched = DagScheduler::new(
        single_task,
        &make_config(),
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    // Seed a non-zero counter and put the task in the Running state by hand.
    sched.consecutive_spawn_failures = 3;
    sched.graph.tasks[0].status = TaskStatus::Running;
    let spawned = TaskId(0);
    sched.record_spawn(spawned, "handle-1".into(), "worker".into());

    assert_eq!(sched.consecutive_spawn_failures, 0);
}
2491
#[test]
// A `ConcurrencyLimit` spawn failure must put the task back to Ready without
// touching the consecutive-failure counter and without emitting any actions.
fn test_record_spawn_failure_reverts_to_ready_no_counter_change() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut scheduler = DagScheduler::new(
        graph,
        &make_config(),
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    assert_eq!(scheduler.consecutive_spawn_failures, 0);
    let task_id = TaskId(0);
    // Set the task to Running by hand so the failure has a state to revert.
    scheduler.graph.tasks[0].status = TaskStatus::Running;

    let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
    // The returned actions used to be dropped; check them like the mixed-batch
    // test does for the same error kind.
    let actions = scheduler.record_spawn_failure(task_id, &error);
    assert!(
        actions.is_empty(),
        "ConcurrencyLimit must produce no extra actions"
    );

    assert_eq!(scheduler.consecutive_spawn_failures, 0);
    assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
}
2517
#[test]
// Six independent tasks with `max_parallel = 2`: a single tick may spawn only
// two of them and mark only two as Running.
fn test_parallel_dispatch_all_ready() {
    let independent: Vec<_> = (0..6).map(|idx| make_node(idx, &[])).collect();
    let graph = graph_from_nodes(independent);
    let config = zeph_config::OrchestrationConfig {
        max_parallel: 2,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let actions = scheduler.tick();

    let mut spawn_count = 0usize;
    for action in &actions {
        if let SchedulerAction::Spawn { .. } = action {
            spawn_count += 1;
        }
    }
    assert_eq!(
        spawn_count, 2,
        "only max_parallel=2 tasks dispatched per tick"
    );

    let mut running_count = 0usize;
    for task in &scheduler.graph.tasks {
        if task.status == TaskStatus::Running {
            running_count += 1;
        }
    }
    assert_eq!(running_count, 2, "only 2 tasks marked Running");
}
2556
#[test]
// `record_batch_backoff(true, true)` must reset a non-zero failure counter.
fn test_batch_backoff_partial_success() {
    let mut sched = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    sched.consecutive_spawn_failures = 3;

    sched.record_batch_backoff(true, true);

    let counter = sched.consecutive_spawn_failures;
    assert_eq!(counter, 0, "any success in batch must reset counter");
}
2570
#[test]
// `record_batch_backoff(false, true)` must bump the failure counter by one.
fn test_batch_backoff_all_failed() {
    let mut sched = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    sched.consecutive_spawn_failures = 2;

    sched.record_batch_backoff(false, true);

    let counter = sched.consecutive_spawn_failures;
    assert_eq!(counter, 3, "all-failure tick must increment counter");
}
2584
#[test]
// `record_batch_backoff(false, false)` must leave the failure counter untouched.
fn test_batch_backoff_no_spawns() {
    let mut sched = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    sched.consecutive_spawn_failures = 5;

    sched.record_batch_backoff(false, false);

    let counter = sched.consecutive_spawn_failures;
    assert_eq!(counter, 5, "no spawns must not change counter");
}
2598
#[test]
// Builds a ten-task graph with `max_parallel = 2` and checks that twice the
// task count is 20 while twice `max_parallel` is 4.
// NOTE(review): both assertions are plain arithmetic on scheduler fields; no
// buffer guard is exercised here — confirm this is the intended coverage.
fn test_buffer_guard_uses_task_count() {
    let ten_nodes: Vec<_> = (0..10).map(|idx| make_node(idx, &[])).collect();
    let graph = graph_from_nodes(ten_nodes);
    let config = zeph_config::OrchestrationConfig {
        max_parallel: 2,
        ..make_config()
    };
    let scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let doubled_task_count = scheduler.graph.tasks.len() * 2;
    assert_eq!(doubled_task_count, 20);
    let doubled_parallelism = scheduler.max_parallel * 2;
    assert_eq!(doubled_parallelism, 4);
}
2627
#[test]
// One batch with two spawn failures: task 0 hits `ConcurrencyLimit` (reverts to
// Ready, no actions) and task 1 hits a `Spawn` error under the Skip strategy
// (ends up Skipped, no `Done`). Then checks the batch backoff counter.
fn test_batch_mixed_concurrency_and_fatal_failure() {
    let limited = make_node(0, &[]);
    let mut skippable = make_node(1, &[]);
    skippable.failure_strategy = Some(FailureStrategy::Skip);
    let mut scheduler = make_scheduler(graph_from_nodes(vec![limited, skippable]));

    // Both tasks are set to Running by hand before the failures are recorded.
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.graph.tasks[1].status = TaskStatus::Running;

    let concurrency_err = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
    let actions0 = scheduler.record_spawn_failure(TaskId(0), &concurrency_err);
    assert!(
        actions0.is_empty(),
        "ConcurrencyLimit must produce no extra actions"
    );
    assert_eq!(
        scheduler.graph.tasks[0].status,
        TaskStatus::Ready,
        "task 0 must revert to Ready"
    );

    let fatal_err = SubAgentError::Spawn("provider unavailable".to_string());
    let actions1 = scheduler.record_spawn_failure(TaskId(1), &fatal_err);
    assert_eq!(
        scheduler.graph.tasks[1].status,
        TaskStatus::Skipped,
        "task 1: Skip strategy turns Failed into Skipped via propagate_failure"
    );
    let emitted_done = actions1
        .iter()
        .any(|action| matches!(action, SchedulerAction::Done { .. }));
    assert!(
        !emitted_done,
        "no Done action expected: task 0 is still Ready"
    );

    scheduler.consecutive_spawn_failures = 0;
    scheduler.record_batch_backoff(false, true);
    assert_eq!(
        scheduler.consecutive_spawn_failures, 1,
        "batch with only ConcurrencyLimit must increment counter"
    );
}
2685
#[test]
// Resumes a Failed graph whose root is Failed and whose two dependants are
// still Pending; the tick must emit `Done(Failed)` and cancel both dependants.
fn test_deadlock_marks_non_terminal_tasks_canceled() {
    let mut failed_root = make_node(0, &[]);
    failed_root.status = TaskStatus::Failed;
    let mut left = make_node(1, &[0]);
    left.status = TaskStatus::Pending;
    let mut right = make_node(2, &[0]);
    right.status = TaskStatus::Pending;

    let mut graph = graph_from_nodes(vec![failed_root, left, right]);
    graph.status = GraphStatus::Failed;

    let mut scheduler = DagScheduler::resume_from(
        graph,
        &make_config(),
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let actions = scheduler.tick();

    let emitted_failed_done = actions.iter().any(|action| {
        matches!(
            action,
            SchedulerAction::Done {
                status: GraphStatus::Failed
            }
        )
    });
    assert!(
        emitted_failed_done,
        "deadlock must emit Done(Failed); got: {actions:?}"
    );
    assert_eq!(scheduler.graph.status, GraphStatus::Failed);

    // The already-failed root keeps its status; only the pending tasks change.
    assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
    for idx in [1usize, 2] {
        assert_eq!(
            scheduler.graph.tasks[idx].status,
            TaskStatus::Canceled,
            "Pending task must be Canceled on deadlock"
        );
    }
}
2741
#[test]
// A resumed graph that still has a Running task must not be treated as
// deadlocked: the tick emits no `Done` and the graph status is Running.
fn test_deadlock_not_triggered_when_task_running() {
    let mut in_flight = make_node(0, &[]);
    in_flight.status = TaskStatus::Running;
    in_flight.assigned_agent = Some("handle-1".into());
    let mut waiting = make_node(1, &[0]);
    waiting.status = TaskStatus::Pending;

    let mut graph = graph_from_nodes(vec![in_flight, waiting]);
    graph.status = GraphStatus::Failed;

    let mut scheduler = DagScheduler::resume_from(
        graph,
        &make_config(),
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let actions = scheduler.tick();

    let emitted_done = actions
        .iter()
        .any(|action| matches!(action, SchedulerAction::Done { .. }));
    assert!(
        !emitted_done,
        "no Done action expected when a task is running; got: {actions:?}"
    );
    assert_eq!(scheduler.graph.status, GraphStatus::Running);
}
2775
#[test]
// A three-task chain with topology selection on must classify as LinearChain,
// set `max_parallel` to 1, and spawn exactly one task on the first tick.
fn topology_linear_chain_limits_parallelism_to_one() {
    let chain = vec![
        make_node(0, &[]),
        make_node(1, &[0]),
        make_node(2, &[1]),
    ];
    let graph = graph_from_nodes(chain);
    let config = zeph_config::OrchestrationConfig {
        topology_selection: true,
        max_parallel: 4,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    assert_eq!(
        scheduler.topology().topology,
        crate::topology::Topology::LinearChain
    );
    assert_eq!(scheduler.max_parallel, 1);

    let actions = scheduler.tick();
    let mut spawn_count = 0usize;
    for action in &actions {
        if let SchedulerAction::Spawn { .. } = action {
            spawn_count += 1;
        }
    }
    assert_eq!(spawn_count, 1, "linear chain: only 1 task dispatched");
}
2813
#[test]
// Four dependency-free tasks with topology selection on must classify as
// AllParallel and all be spawned by the first tick.
fn topology_all_parallel_dispatches_all_ready() {
    let independent: Vec<_> = (0..4).map(|idx| make_node(idx, &[])).collect();
    let graph = graph_from_nodes(independent);
    let config = zeph_config::OrchestrationConfig {
        topology_selection: true,
        max_parallel: 4,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    assert_eq!(
        scheduler.topology().topology,
        crate::topology::Topology::AllParallel
    );

    let actions = scheduler.tick();
    let mut spawn_count = 0usize;
    for action in &actions {
        if let SchedulerAction::Spawn { .. } = action {
            spawn_count += 1;
        }
    }
    assert_eq!(spawn_count, 4, "all-parallel: all 4 tasks dispatched");
}
2849
#[test]
// Three ready tasks: A and B are Sequential, C is Parallel. The first tick must
// spawn A and C and hold B back.
fn sequential_dispatch_one_at_a_time_parallel_unblocked() {
    use crate::graph::ExecutionMode;

    let mut seq_first = make_node(0, &[]);
    seq_first.execution_mode = ExecutionMode::Sequential;
    let mut seq_second = make_node(1, &[]);
    seq_second.execution_mode = ExecutionMode::Sequential;
    let mut parallel = make_node(2, &[]);
    parallel.execution_mode = ExecutionMode::Parallel;

    let graph = graph_from_nodes(vec![seq_first, seq_second, parallel]);
    let config = zeph_config::OrchestrationConfig {
        max_parallel: 4,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let actions = scheduler.tick();
    let mut spawned: Vec<TaskId> = Vec::new();
    for action in &actions {
        if let SchedulerAction::Spawn { task_id, .. } = action {
            spawned.push(*task_id);
        }
    }

    assert!(
        spawned.contains(&TaskId(0)),
        "A(sequential) must be dispatched"
    );
    assert!(
        spawned.contains(&TaskId(2)),
        "C(parallel) must be dispatched"
    );
    let second_sequential_spawned = spawned.contains(&TaskId(1));
    assert!(!second_sequential_spawned, "B(sequential) must be held");
    assert_eq!(spawned.len(), 2);
}
2900
#[test]
// Injecting twice for the same task id: the first call appends a task and bumps
// the global replan counter; the second returns Ok but changes neither.
fn test_inject_tasks_per_task_cap_skips_second() {
    let mut scheduler =
        make_scheduler(graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]));
    let verified = TaskId(0);

    scheduler
        .inject_tasks(verified, vec![make_node(2, &[])], 20)
        .unwrap();
    assert_eq!(
        scheduler.graph.tasks.len(),
        3,
        "first inject must append the task"
    );
    assert_eq!(scheduler.global_replan_count, 1);

    scheduler
        .inject_tasks(verified, vec![make_node(3, &[])], 20)
        .unwrap();
    assert_eq!(
        scheduler.graph.tasks.len(),
        3,
        "second inject must be silently skipped (per-task cap)"
    );
    assert_eq!(
        scheduler.global_replan_count, 1,
        "global counter must not increment on skipped inject"
    );
}
2931
#[test]
// With `max_replans = 1`, an injection for a different task id after the first
// one must return Ok without growing the graph or the global counter.
fn test_inject_tasks_global_cap_skips_when_exhausted() {
    let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    let config = zeph_config::OrchestrationConfig {
        max_replans: 1,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    scheduler
        .inject_tasks(TaskId(0), vec![make_node(2, &[])], 20)
        .unwrap();
    assert_eq!(scheduler.global_replan_count, 1);

    scheduler
        .inject_tasks(TaskId(1), vec![make_node(3, &[])], 20)
        .unwrap();
    assert_eq!(
        scheduler.graph.tasks.len(),
        3,
        "global cap must prevent the second inject"
    );
    assert_eq!(
        scheduler.global_replan_count, 1,
        "global counter must not increment past cap"
    );
}
2958
#[test]
// `topology_dirty` starts false, becomes true after a successful inject, and is
// false again after the following tick.
fn test_inject_tasks_sets_topology_dirty() {
    let mut scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    let dirty_at_start = scheduler.topology_dirty;
    assert!(!dirty_at_start, "topology_dirty must be false initially");

    let injected = vec![make_node(1, &[])];
    scheduler.inject_tasks(TaskId(0), injected, 20).unwrap();
    assert!(
        scheduler.topology_dirty,
        "inject_tasks must set topology_dirty=true"
    );

    scheduler.tick();
    let dirty_after_tick = scheduler.topology_dirty;
    assert!(
        !dirty_after_tick,
        "tick() must clear topology_dirty after re-analysis"
    );
}
2984
#[test]
// Injecting a task that depends on itself must fail with `VerificationFailed`
// and leave both the replan counter and `topology_dirty` untouched.
fn test_inject_tasks_rejects_cycle() {
    let mut scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));

    // Node 1 lists itself as a dependency.
    let self_dependent = make_node(1, &[1]);
    let outcome = scheduler.inject_tasks(TaskId(0), vec![self_dependent], 20);
    assert!(outcome.is_err(), "cyclic injection must return an error");

    let is_verification_failure = matches!(
        outcome.unwrap_err(),
        OrchestrationError::VerificationFailed(_)
    );
    assert!(
        is_verification_failure,
        "must return VerificationFailed for cycle"
    );
    assert_eq!(scheduler.global_replan_count, 0);
    let dirty = scheduler.topology_dirty;
    assert!(!dirty, "topology_dirty must not be set when inject fails");
}
3009
3010 fn make_hierarchical_config() -> zeph_config::OrchestrationConfig {
3013 zeph_config::OrchestrationConfig {
3014 topology_selection: true,
3015 max_parallel: 4,
3016 ..make_config()
3017 }
3018 }
3019
3020 fn make_hierarchical_graph() -> TaskGraph {
3022 graph_from_nodes(vec![
3023 make_node(0, &[]),
3024 make_node(1, &[0]),
3025 make_node(2, &[0]),
3026 make_node(3, &[1]),
3027 ])
3028 }
3029
#[test]
// Level-barrier dispatch on the hierarchical graph: the first tick spawns only
// the root; after the root is marked Completed the next tick must move
// `current_level` to 1 and spawn both of its children.
fn test_level_barrier_advances_on_terminal_level() {
    let graph = make_hierarchical_graph();
    let config = make_hierarchical_config();
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    assert_eq!(
        scheduler.topology().strategy,
        crate::topology::DispatchStrategy::LevelBarrier,
        "must use LevelBarrier strategy for Hierarchical graph"
    );
    assert_eq!(scheduler.current_level, 0);

    let first_tick = scheduler.tick();
    let mut spawned_ids = Vec::new();
    for action in &first_tick {
        if let SchedulerAction::Spawn { task_id, .. } = action {
            spawned_ids.push(*task_id);
        }
    }
    assert_eq!(
        spawned_ids,
        vec![TaskId(0)],
        "first tick must dispatch only A at level 0"
    );

    // Simulate completion of the root by editing scheduler state directly.
    scheduler.graph.tasks[0].status = TaskStatus::Completed;
    scheduler.running.clear();
    scheduler.graph.tasks[1].status = TaskStatus::Ready;
    scheduler.graph.tasks[2].status = TaskStatus::Ready;

    let second_tick = scheduler.tick();
    assert_eq!(
        scheduler.current_level, 1,
        "current_level must advance to 1 after level-0 tasks terminate"
    );
    let mut spawned2 = Vec::new();
    for action in &second_tick {
        if let SchedulerAction::Spawn { task_id, .. } = action {
            spawned2.push(*task_id);
        }
    }
    assert!(
        spawned2.contains(&TaskId(1)),
        "B must be dispatched after level advance"
    );
    assert!(
        spawned2.contains(&TaskId(2)),
        "C must be dispatched after level advance"
    );
}
3095
#[test]
// With the Skip strategy on the root of the hierarchical graph, a buffered
// failure event for the root must leave all four tasks Skipped after one tick.
fn test_level_barrier_failure_propagates_transitively() {
    let graph = make_hierarchical_graph();
    let config = make_hierarchical_config();
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    // Put the root into the Running state by hand, tracked under handle "h0".
    let root = TaskId(0);
    scheduler.graph.tasks[0].failure_strategy = Some(crate::graph::FailureStrategy::Skip);
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    let tracked = RunningTask {
        started_at: Instant::now(),
        agent_def_name: "worker".to_string(),
        agent_handle_id: "h0".to_string(),
    };
    scheduler.running.insert(root, tracked);

    // Queue the failure so the next tick picks it up from the buffer.
    let failure = TaskEvent {
        task_id: root,
        agent_handle_id: "h0".to_string(),
        outcome: TaskOutcome::Failed {
            error: "simulated failure".to_string(),
        },
    };
    scheduler.buffered_events.push_back(failure);

    scheduler.tick();

    let expectations = [
        (0usize, "A must be Skipped (Skip strategy)"),
        (1, "B must be transitively Skipped"),
        (2, "C must be transitively Skipped"),
        (3, "D must be transitively Skipped"),
    ];
    for (idx, why) in expectations {
        assert_eq!(
            scheduler.graph.tasks[idx].status,
            TaskStatus::Skipped,
            "{why}"
        );
    }
}
3150
#[test]
// With tasks 0-2 Completed and `current_level` forced to 2, injecting a new
// task that depends on task 0 and ticking must bring `current_level` back to 1.
fn test_level_barrier_current_level_reset_after_inject() {
    let graph = make_hierarchical_graph();
    let config = make_hierarchical_config();
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    // Mark the first three tasks as done and pretend the barrier is at level 2.
    for finished in scheduler.graph.tasks.iter_mut().take(3) {
        finished.status = TaskStatus::Completed;
    }
    scheduler.current_level = 2;

    let late_task = make_node(4, &[0]);
    scheduler
        .inject_tasks(TaskId(3), vec![late_task], 20)
        .unwrap();
    assert!(scheduler.topology_dirty);

    scheduler.tick();
    assert_eq!(
        scheduler.current_level, 1,
        "current_level must reset to min non-terminal depth (1) after inject at depth 1"
    );
}
3181
#[test]
// `resume_from` on a paused three-task chain must classify it as LinearChain
// and set `max_parallel` to 1, even though the config asks for 4.
fn resume_from_preserves_topology_classification() {
    let chain = vec![
        make_node(0, &[]),
        make_node(1, &[0]),
        make_node(2, &[1]),
    ];
    let mut graph = graph_from_nodes(chain);
    graph.status = GraphStatus::Paused;
    // First task already done, the remaining two still pending.
    graph.tasks[0].status = TaskStatus::Completed;
    graph.tasks[1].status = TaskStatus::Pending;
    graph.tasks[2].status = TaskStatus::Pending;

    let config = zeph_config::OrchestrationConfig {
        topology_selection: true,
        max_parallel: 4,
        ..make_config()
    };
    let resumed = DagScheduler::resume_from(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    assert_eq!(
        resumed.topology().topology,
        crate::topology::Topology::LinearChain,
        "resume_from must classify topology"
    );
    assert_eq!(
        resumed.max_parallel, 1,
        "resume_from must apply topology limit"
    );
}
3219
3220 fn make_verify_config(provider: &str) -> zeph_config::OrchestrationConfig {
3223 zeph_config::OrchestrationConfig {
3224 verify_completeness: true,
3225 verify_provider: zeph_config::ProviderName::new(provider),
3226 ..make_config()
3227 }
3228 }
3229
#[test]
// A provider name missing from the pool must produce an error whose message
// mentions both the unknown name and a pool entry.
fn validate_verify_config_unknown_provider_returns_err() {
    let config = make_verify_config("nonexistent");
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let outcome = scheduler.validate_verify_config(&["fast", "quality"]);
    assert!(outcome.is_err());

    let rendered = outcome.unwrap_err().to_string();
    for needle in ["nonexistent", "fast"] {
        assert!(rendered.contains(needle));
    }
}
3247
#[test]
// A provider name that appears in the pool must validate successfully.
fn validate_verify_config_known_provider_returns_ok() {
    let config = make_verify_config("fast");
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let outcome = scheduler.validate_verify_config(&["fast", "quality"]);
    assert!(outcome.is_ok());
}
3265
#[test]
// An empty provider name must validate successfully against a non-empty pool.
fn validate_verify_config_empty_provider_always_ok() {
    let config = make_verify_config("");
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let outcome = scheduler.validate_verify_config(&["fast"]);
    assert!(outcome.is_ok());
}
3279
#[test]
// A scheduler built by `make_scheduler` (no verify settings applied here) must
// pass validation.
fn validate_verify_config_disabled_skips_validation() {
    let scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    let outcome = scheduler.validate_verify_config(&["fast"]);
    assert!(outcome.is_ok());
}
3287
#[test]
// With an empty provider pool, validation must succeed even for a provider
// name that would otherwise be unknown.
fn validate_verify_config_empty_pool_skips_validation() {
    let config = make_verify_config("nonexistent");
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let outcome = scheduler.validate_verify_config(&[]);
    assert!(outcome.is_ok());
}
3302
#[test]
// A configured provider name padded with spaces must still match the pool
// entry "fast".
fn validate_verify_config_trims_whitespace_in_config() {
    let config = make_verify_config(" fast ");
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let outcome = scheduler.validate_verify_config(&["fast"]);
    assert!(outcome.is_ok());
}
3317
#[test]
// For a two-task chain with `max_parallel = 6`, `config_max_parallel` must keep
// the configured 6 while the effective `max_parallel` is asserted to be 1.
fn config_max_parallel_initialized_from_config() {
    let two_step_chain = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    let config = zeph_config::OrchestrationConfig {
        topology_selection: true,
        max_parallel: 6,
        ..make_config()
    };
    let scheduler = DagScheduler::new(
        two_step_chain,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let configured = scheduler.config_max_parallel;
    assert_eq!(
        configured, 6,
        "config_max_parallel must equal config.max_parallel"
    );
    let effective = scheduler.max_parallel;
    assert_eq!(effective, 1, "max_parallel reduced by topology analysis");
    // Read the field again after the effective limit was inspected.
    assert_eq!(
        scheduler.config_max_parallel, 6,
        "config_max_parallel must not be reduced by topology"
    );
}
3352
#[test]
// Diamond graph (0 -> {1, 2} -> 3) classified as Mixed. Two rounds of
// inject + tick must leave `max_parallel` at the value computed right after
// construction.
fn max_parallel_does_not_drift_across_inject_tick_cycles() {
    let diamond = vec![
        make_node(0, &[]),
        make_node(1, &[0]),
        make_node(2, &[0]),
        make_node(3, &[1, 2]),
    ];
    let graph = graph_from_nodes(diamond);
    let config = zeph_config::OrchestrationConfig {
        topology_selection: true,
        max_parallel: 4,
        max_tasks: 50,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    assert_eq!(
        scheduler.topology().topology,
        crate::topology::Topology::Mixed,
        "initial topology must be Mixed"
    );
    // Expected limit for this test, written out for a configured maximum of 4.
    let expected_max_parallel = (4usize / 2 + 1).clamp(1, 4);
    assert_eq!(scheduler.max_parallel, expected_max_parallel);

    // Round one: add task 4 behind task 3.
    let extra_task_id = 4u32;
    let mut first_extra = crate::graph::TaskNode::new(
        extra_task_id,
        "extra".to_string(),
        "extra task injected by replan",
    );
    first_extra.depends_on = vec![TaskId(3)];

    scheduler.graph.tasks[3].status = TaskStatus::Completed;

    scheduler
        .inject_tasks(TaskId(3), vec![first_extra], 50)
        .expect("inject must succeed");
    assert!(
        scheduler.topology_dirty,
        "topology_dirty must be true after inject"
    );

    let _ = scheduler.tick();
    let max_after_first_inject = scheduler.max_parallel;
    assert_eq!(
        max_after_first_inject, expected_max_parallel,
        "max_parallel must not drift after first inject+tick"
    );

    // Round two: add task 5 behind task 4.
    let mut second_extra =
        crate::graph::TaskNode::new(5u32, "extra2".to_string(), "second replan");
    second_extra.depends_on = vec![TaskId(extra_task_id)];

    scheduler.graph.tasks[extra_task_id as usize].status = TaskStatus::Completed;
    scheduler
        .inject_tasks(TaskId(extra_task_id), vec![second_extra], 50)
        .expect("second inject must succeed");

    let _ = scheduler.tick();
    let max_after_second_inject = scheduler.max_parallel;
    assert_eq!(
        max_after_second_inject, expected_max_parallel,
        "max_parallel must not drift after second inject+tick (was: {max_after_second_inject}, expected: {expected_max_parallel})"
    );
}
3447
#[test]
// `completeness_threshold()` must return the 0.85 set in the config.
fn completeness_threshold_returns_config_value() {
    let config = zeph_config::OrchestrationConfig {
        completeness_threshold: 0.85,
        ..make_config()
    };
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("w")],
    )
    .unwrap();

    let deviation = (scheduler.completeness_threshold() - 0.85).abs();
    assert!(deviation < f32::EPSILON);
}
3459
#[test]
// A scheduler built by `make_scheduler` must report a threshold of 0.7.
fn completeness_threshold_default_is_0_7() {
    let scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    let deviation = (scheduler.completeness_threshold() - 0.7).abs();
    assert!(deviation < f32::EPSILON);
}
3466
#[test]
// `verify_provider_name()` must return the provider name set in the config.
fn verify_provider_name_returns_config_value() {
    let config = zeph_config::OrchestrationConfig {
        verify_provider: zeph_config::ProviderName::new("fast"),
        ..make_config()
    };
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("w")],
    )
    .unwrap();

    assert_eq!(scheduler.verify_provider_name(), "fast");
}
3476
#[test]
// A scheduler built by `make_scheduler` must report an empty provider name.
fn verify_provider_name_empty_when_not_set() {
    let scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    assert_eq!(scheduler.verify_provider_name(), "");
}
3483
#[test]
// Before any replan is recorded, the remaining budget must equal `max_replans`.
fn max_replans_remaining_initial_equals_max_replans() {
    let config = zeph_config::OrchestrationConfig {
        max_replans: 3,
        ..make_config()
    };
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("w")],
    )
    .unwrap();

    assert_eq!(scheduler.max_replans_remaining(), 3);
}
3493
#[test]
// Starting from a budget of 2, each recorded whole-plan replan must lower the
// remaining count by one, and a third record must leave it at 0.
fn max_replans_remaining_decrements_after_record() {
    let mut scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    assert_eq!(scheduler.max_replans_remaining(), 2);

    // Expected remaining budget after the first, second and third record.
    for expected_remaining in [1, 0, 0] {
        scheduler.record_whole_plan_replan();
        assert_eq!(scheduler.max_replans_remaining(), expected_remaining);
    }
}
3507
#[test]
// Recording a whole-plan replan must not change the number of tasks.
fn record_whole_plan_replan_does_not_modify_graph() {
    let mut scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    let task_count_before = scheduler.graph().tasks.len();

    scheduler.record_whole_plan_replan();

    let task_count_after = scheduler.graph().tasks.len();
    assert_eq!(
        task_count_after, task_count_before,
        "record_whole_plan_replan must not modify the task graph"
    );
}
3520
3521 fn make_cascade_config() -> zeph_config::OrchestrationConfig {
3524 zeph_config::OrchestrationConfig {
3525 topology_selection: true,
3526 cascade_routing: true,
3527 cascade_failure_threshold: 0.4,
3528 max_parallel: 4,
3529 ..make_config()
3530 }
3531 }
3532
#[test]
// After one outcome has been recorded with the cascade detector, a successful
// `inject_tasks` must leave the detector with no region-health entries.
fn inject_tasks_resets_cascade_detector() {
    let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    for finished in graph.tasks.iter_mut().take(2) {
        finished.status = TaskStatus::Completed;
    }
    let config = make_cascade_config();
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let Some(detector) = scheduler.cascade_detector.as_mut() else {
        panic!(
            "cascade_detector must be Some when cascade_routing=true and topology_selection=true"
        );
    };
    // Record a single outcome for task 1 so the detector holds one entry.
    detector.record_outcome(TaskId(1), false, &scheduler.graph);
    assert_eq!(detector.region_health().len(), 1);

    let follow_up = make_node(2, &[1]);
    scheduler
        .inject_tasks(TaskId(1), vec![follow_up], 20)
        .unwrap();

    let detector_cleared = match scheduler.cascade_detector.as_ref() {
        Some(detector) => detector.region_health().is_empty(),
        None => false,
    };
    assert!(
        detector_cleared,
        "cascade_detector must be cleared after inject_tasks (C13 fix)"
    );
}
3575
#[test]
// Records two outcomes with the cascade detector (tasks 1 and 2, task 2 being
// Sequential) and checks that the following tick still dispatches at least one
// task, either spawned or run inline.
fn sequential_tasks_not_reordered_by_cascade() {
    let mut graph = graph_from_nodes(vec![
        make_node(0, &[]),
        make_node(1, &[]),
        make_node(2, &[1]),
    ]);
    graph.tasks[2].execution_mode = ExecutionMode::Sequential;
    let config = make_cascade_config();
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let Some(detector) = scheduler.cascade_detector.as_mut() else {
        panic!("cascade_detector must be Some");
    };
    for recorded in [TaskId(1), TaskId(2)] {
        detector.record_outcome(recorded, false, &scheduler.graph);
    }

    let actions = scheduler.tick();

    let mut spawned_ids: Vec<TaskId> = Vec::new();
    for action in &actions {
        match action {
            SchedulerAction::Spawn { task_id, .. }
            | SchedulerAction::RunInline { task_id, .. } => spawned_ids.push(*task_id),
            _ => {}
        }
    }

    let dispatched_any = !spawned_ids.is_empty();
    assert!(
        dispatched_any,
        "tick must dispatch at least one ready task; Sequential tasks must not be dropped by cascade logic"
    );
}
3639
#[test]
// Cascade routing on but topology selection off: the scheduler must be built
// without a cascade detector.
fn cascade_routing_without_topology_selection_creates_no_detector() {
    let config = zeph_config::OrchestrationConfig {
        cascade_routing: true,
        topology_selection: false,
        ..make_config()
    };
    let single_task = graph_from_nodes(vec![make_node(0, &[])]);
    let scheduler = DagScheduler::new(
        single_task,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();

    let detector_absent = scheduler.cascade_detector.is_none();
    assert!(
        detector_absent,
        "cascade_detector must be None when topology_selection=false"
    );
}
3662}