1use std::collections::{HashMap, VecDeque};
7use std::fmt::Write as _;
8use std::path::PathBuf;
9use std::time::{Duration, Instant};
10
11use tokio::sync::mpsc;
12
13use super::dag;
14use super::error::OrchestrationError;
15use super::graph::{
16 ExecutionMode, GraphStatus, TaskGraph, TaskId, TaskNode, TaskResult, TaskStatus,
17};
18use super::router::AgentRouter;
19use super::topology::{DispatchStrategy, Topology, TopologyAnalysis, TopologyClassifier};
20use super::verifier::inject_tasks as verifier_inject_tasks;
21use zeph_config::OrchestrationConfig;
22use zeph_sanitizer::{ContentIsolationConfig, ContentSanitizer, ContentSource, ContentSourceKind};
23use zeph_subagent::{SubAgentDef, SubAgentError};
24
/// Instruction emitted by the scheduler's `tick` for the orchestrator to act on.
#[derive(Debug)]
pub enum SchedulerAction {
    /// Start a sub-agent for `task_id` using the named agent definition.
    Spawn {
        task_id: TaskId,
        agent_def_name: String,
        prompt: String,
    },
    /// Stop the agent identified by `agent_handle_id`.
    Cancel { agent_handle_id: String },
    /// No routable agent was found: the main agent runs this task inline.
    RunInline { task_id: TaskId, prompt: String },
    /// The graph reached a terminal status; scheduling is finished.
    Done { status: GraphStatus },
    /// Run completeness verification on a finished task's output
    /// (only emitted when `verify_completeness` is enabled).
    Verify { task_id: TaskId, output: String },
}
51
/// Completion/failure notification sent back to the scheduler by a task runner.
#[derive(Debug)]
pub struct TaskEvent {
    pub task_id: TaskId,
    /// Handle of the agent incarnation that produced the event; used to
    /// discard stale events from superseded incarnations.
    pub agent_handle_id: String,
    pub outcome: TaskOutcome,
}
59
/// Terminal result of a single task execution.
#[derive(Debug)]
pub enum TaskOutcome {
    /// Task finished successfully, carrying its textual output and any
    /// artifact paths it produced.
    Completed {
        output: String,
        artifacts: Vec<PathBuf>,
    },
    /// Task failed; the error text is logged truncated to 512 chars.
    Failed { error: String },
}
71
/// Book-keeping for a task currently executing on an agent.
struct RunningTask {
    agent_handle_id: String,
    agent_def_name: String,
    /// Spawn time; compared against `task_timeout` in `check_timeouts`.
    started_at: Instant,
}
78
/// Event-driven scheduler that dispatches the tasks of a [`TaskGraph`] to
/// sub-agents, respecting dependencies, concurrency limits, and the selected
/// topology strategy. Driven by repeated `tick()` calls interleaved with
/// `wait_event()`.
pub struct DagScheduler {
    graph: TaskGraph,
    /// Effective concurrency cap (topology-derived; may differ from config).
    max_parallel: usize,
    /// Raw `max_parallel` from config, kept for topology re-analysis.
    config_max_parallel: usize,
    /// Tasks currently executing, keyed by task id.
    running: HashMap<TaskId, RunningTask>,
    event_rx: mpsc::Receiver<TaskEvent>,
    /// Cloned out via `event_sender()` so task runners can report back.
    event_tx: mpsc::Sender<TaskEvent>,
    task_timeout: Duration,
    router: Box<dyn AgentRouter>,
    available_agents: Vec<SubAgentDef>,
    /// Char budget for dependency outputs embedded into downstream prompts.
    dependency_context_budget: usize,
    /// Events received in `wait_event` and replayed at the next `tick`.
    buffered_events: VecDeque<TaskEvent>,
    sanitizer: ContentSanitizer,
    /// Base idle sleep; grows exponentially with consecutive spawn failures.
    deferral_backoff: Duration,
    consecutive_spawn_failures: u32,
    topology: TopologyAnalysis,
    /// Set when tasks are injected; forces topology re-analysis on next tick.
    topology_dirty: bool,
    /// Current depth level for the LevelBarrier dispatch strategy.
    current_level: usize,
    verify_completeness: bool,
    verify_provider: String,
    /// Per-task replan counts (capped at 1 each); paired with the global cap.
    task_replan_counts: HashMap<TaskId, u32>,
    global_replan_count: u32,
    max_replans: u32,
}
155
/// Hand-written `Debug`: summarizes key scheduler state instead of deriving,
/// since fields such as the router trait object and the channels are not
/// usefully printable. `finish_non_exhaustive` marks the omitted fields.
impl std::fmt::Debug for DagScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DagScheduler")
            .field("graph_id", &self.graph.id)
            .field("graph_status", &self.graph.status)
            .field("running_count", &self.running.len())
            .field("max_parallel", &self.max_parallel)
            .field("task_timeout_secs", &self.task_timeout.as_secs())
            .field("topology", &self.topology.topology)
            .field("strategy", &self.topology.strategy)
            .field("current_level", &self.current_level)
            .field("global_replan_count", &self.global_replan_count)
            .finish_non_exhaustive()
    }
}
171
172impl DagScheduler {
    /// Build a scheduler for a freshly planned graph.
    ///
    /// Validates the task list via `dag::validate` against `config.max_tasks`,
    /// moves the graph to `Running`, and marks dependency-free tasks `Ready`
    /// so the first `tick` can dispatch them.
    ///
    /// # Errors
    /// Returns `InvalidGraph` when the graph is not in `Created` status or
    /// when `dag::validate` rejects it.
    pub fn new(
        mut graph: TaskGraph,
        config: &OrchestrationConfig,
        router: Box<dyn AgentRouter>,
        available_agents: Vec<SubAgentDef>,
    ) -> Result<Self, OrchestrationError> {
        if graph.status != GraphStatus::Created {
            return Err(OrchestrationError::InvalidGraph(format!(
                "graph must be in Created status, got {}",
                graph.status
            )));
        }

        dag::validate(&graph.tasks, config.max_tasks as usize)?;

        graph.status = GraphStatus::Running;

        // Roots (no dependencies) are immediately dispatchable.
        for task in &mut graph.tasks {
            if task.depends_on.is_empty() && task.status == TaskStatus::Pending {
                task.status = TaskStatus::Ready;
            }
        }

        let (event_tx, event_rx) = mpsc::channel(64);

        // A timeout of 0 is treated as "unset"; fall back to 10 minutes.
        let task_timeout = if config.task_timeout_secs > 0 {
            Duration::from_secs(config.task_timeout_secs)
        } else {
            Duration::from_secs(600)
        };

        let topology = TopologyClassifier::analyze(&graph, config);
        let max_parallel = topology.max_parallel;
        let config_max_parallel = config.max_parallel as usize;

        if config.topology_selection {
            tracing::debug!(
                topology = ?topology.topology,
                strategy = ?topology.strategy,
                max_parallel,
                "topology-aware concurrency limit applied"
            );
        }

        Ok(Self {
            graph,
            max_parallel,
            config_max_parallel,
            running: HashMap::new(),
            event_rx,
            event_tx,
            task_timeout,
            router,
            available_agents,
            dependency_context_budget: config.dependency_context_budget,
            buffered_events: VecDeque::new(),
            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
            consecutive_spawn_failures: 0,
            topology,
            topology_dirty: false,
            current_level: 0,
            verify_completeness: config.verify_completeness,
            verify_provider: config.verify_provider.trim().to_string(),
            task_replan_counts: HashMap::new(),
            global_replan_count: 0,
            max_replans: config.max_replans,
        })
    }
251
    /// Rebuild a scheduler from a persisted graph (restart/recovery path).
    ///
    /// Tasks persisted as `Running` with an assigned agent are re-registered
    /// in the running map with a fresh `started_at`, effectively restarting
    /// their timeout clock.
    ///
    /// NOTE(review): tasks marked `Running` but lacking `assigned_agent` are
    /// filtered out here and get no timeout tracking while staying `Running`
    /// in the graph — confirm upstream resets such tasks before resuming.
    ///
    /// # Errors
    /// Returns `InvalidGraph` for `Completed` or `Canceled` graphs.
    pub fn resume_from(
        mut graph: TaskGraph,
        config: &OrchestrationConfig,
        router: Box<dyn AgentRouter>,
        available_agents: Vec<SubAgentDef>,
    ) -> Result<Self, OrchestrationError> {
        if graph.status == GraphStatus::Completed || graph.status == GraphStatus::Canceled {
            return Err(OrchestrationError::InvalidGraph(format!(
                "cannot resume a {} graph; only Paused, Failed, or Running graphs are resumable",
                graph.status
            )));
        }

        graph.status = GraphStatus::Running;

        let running: HashMap<TaskId, RunningTask> = graph
            .tasks
            .iter()
            .filter(|t| t.status == TaskStatus::Running)
            .filter_map(|t| {
                let handle_id = t.assigned_agent.clone()?;
                let def_name = t.agent_hint.clone().unwrap_or_default();
                Some((
                    t.id,
                    RunningTask {
                        agent_handle_id: handle_id,
                        agent_def_name: def_name,
                        // Timeout clock restarts on resume.
                        started_at: Instant::now(),
                    },
                ))
            })
            .collect();

        let (event_tx, event_rx) = mpsc::channel(64);

        // A timeout of 0 is treated as "unset"; fall back to 10 minutes.
        let task_timeout = if config.task_timeout_secs > 0 {
            Duration::from_secs(config.task_timeout_secs)
        } else {
            Duration::from_secs(600)
        };

        let topology = TopologyClassifier::analyze(&graph, config);
        let max_parallel = topology.max_parallel;
        let config_max_parallel = config.max_parallel as usize;

        Ok(Self {
            graph,
            max_parallel,
            config_max_parallel,
            running,
            event_rx,
            event_tx,
            task_timeout,
            router,
            available_agents,
            dependency_context_budget: config.dependency_context_budget,
            buffered_events: VecDeque::new(),
            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
            consecutive_spawn_failures: 0,
            topology,
            topology_dirty: false,
            current_level: 0,
            verify_completeness: config.verify_completeness,
            verify_provider: config.verify_provider.trim().to_string(),
            task_replan_counts: HashMap::new(),
            global_replan_count: 0,
            max_replans: config.max_replans,
        })
    }
342
343 pub fn validate_verify_config(
357 &self,
358 provider_names: &[&str],
359 ) -> Result<(), OrchestrationError> {
360 if !self.verify_completeness {
361 return Ok(());
362 }
363 let name = self.verify_provider.as_str();
364 if name.is_empty() || provider_names.is_empty() {
365 return Ok(());
366 }
367 if !provider_names.contains(&name) {
368 return Err(OrchestrationError::InvalidConfig(format!(
369 "verify_provider \"{}\" not found in [[llm.providers]]; available: [{}]",
370 name,
371 provider_names.join(", ")
372 )));
373 }
374 Ok(())
375 }
376
    /// A clone of the event channel's sender side, handed to task runners so
    /// they can report [`TaskEvent`]s back to the scheduler.
    #[must_use]
    pub fn event_sender(&self) -> mpsc::Sender<TaskEvent> {
        self.event_tx.clone()
    }
382
    /// Read-only view of the underlying task graph.
    #[must_use]
    pub fn graph(&self) -> &TaskGraph {
        &self.graph
    }
388
    /// Snapshot of the task graph (e.g. for persistence).
    ///
    /// NOTE(review): despite the `into_` prefix this clones via `&self`
    /// rather than consuming the scheduler; `to_graph` would match Rust
    /// naming conventions, but renaming would break existing callers.
    #[must_use]
    pub fn into_graph(&self) -> TaskGraph {
        self.graph.clone()
    }
396
    /// The most recent topology analysis for the graph.
    #[must_use]
    pub fn topology(&self) -> &TopologyAnalysis {
        &self.topology
    }
402
    /// Inject replan tasks produced after verifying `verified_task_id`.
    ///
    /// Injection is silently skipped (returns `Ok`) when either the per-task
    /// limit (one replan per verified task) or the global `max_replans` limit
    /// has been reached. On success the topology is marked dirty so the next
    /// `tick` re-analyzes the graph.
    ///
    /// # Errors
    /// Propagates errors from the verifier's task injection.
    pub fn inject_tasks(
        &mut self,
        verified_task_id: TaskId,
        new_tasks: Vec<TaskNode>,
        max_tasks: usize,
    ) -> Result<(), OrchestrationError> {
        if new_tasks.is_empty() {
            return Ok(());
        }

        let task_replan_count = self.task_replan_counts.entry(verified_task_id).or_insert(0);
        if *task_replan_count >= 1 {
            tracing::warn!(
                task_id = %verified_task_id,
                "per-task replan limit (1) reached, skipping replan injection"
            );
            return Ok(());
        }

        if self.global_replan_count >= self.max_replans {
            tracing::warn!(
                global_replan_count = self.global_replan_count,
                max_replans = self.max_replans,
                "global replan limit reached, skipping replan injection"
            );
            return Ok(());
        }

        verifier_inject_tasks(&mut self.graph, new_tasks, max_tasks)?;

        // Counters only advance after injection succeeds.
        *task_replan_count += 1;
        self.global_replan_count += 1;

        self.topology_dirty = true;

        Ok(())
    }
457}
458
/// Best-effort warning on drop: the scheduler cannot stop agents itself, so
/// being dropped with work still in flight is logged rather than handled.
impl Drop for DagScheduler {
    fn drop(&mut self) {
        if !self.running.is_empty() {
            tracing::warn!(
                running_tasks = self.running.len(),
                "DagScheduler dropped with running tasks; agents may continue until their \
                 CancellationToken fires or they complete naturally"
            );
        }
    }
}
470
471impl DagScheduler {
    /// One scheduling step: drain buffered and queued events, re-analyze the
    /// topology if marked dirty, enforce timeouts, advance the level barrier,
    /// and dispatch ready tasks up to the effective concurrency cap. Returns
    /// the actions for the orchestrator to execute.
    #[allow(clippy::too_many_lines)]
    pub fn tick(&mut self) -> Vec<SchedulerAction> {
        // A non-running graph yields a single terminal Done action.
        if self.graph.status != GraphStatus::Running {
            return vec![SchedulerAction::Done {
                status: self.graph.status,
            }];
        }

        // Re-analyze topology after task injection (see `inject_tasks`).
        if self.topology_dirty {
            let new_analysis = {
                let n = self.graph.tasks.len();
                if n == 0 {
                    TopologyAnalysis {
                        topology: Topology::AllParallel,
                        strategy: DispatchStrategy::FullParallel,
                        max_parallel: self.config_max_parallel,
                        depth: 0,
                        depths: std::collections::HashMap::new(),
                    }
                } else {
                    let (depth, depths) =
                        super::topology::compute_depths_for_scheduler(&self.graph);
                    let topo =
                        TopologyClassifier::classify_with_depths(&self.graph, depth, &depths);
                    let strategy = TopologyClassifier::strategy(topo);
                    let max_parallel =
                        TopologyClassifier::compute_max_parallel(topo, self.config_max_parallel);
                    TopologyAnalysis {
                        topology: topo,
                        strategy,
                        max_parallel,
                        depth,
                        depths,
                    }
                }
            };
            self.topology = new_analysis;
            self.max_parallel = self.topology.max_parallel;
            self.topology_dirty = false;

            // Injected tasks may sit at a shallower depth than the current
            // level; clamp down so the barrier does not skip them.
            if self.topology.strategy == DispatchStrategy::LevelBarrier {
                let min_active = self
                    .graph
                    .tasks
                    .iter()
                    .filter(|t| !t.status.is_terminal())
                    .filter_map(|t| self.topology.depths.get(&t.id).copied())
                    .min();
                if let Some(min_depth) = min_active {
                    self.current_level = self.current_level.min(min_depth);
                }
            }
        }

        let mut actions = Vec::new();

        // Events buffered by `wait_event` are replayed first, then anything
        // still queued on the channel.
        while let Some(event) = self.buffered_events.pop_front() {
            let cancel_actions = self.process_event(event);
            actions.extend(cancel_actions);
        }
        while let Ok(event) = self.event_rx.try_recv() {
            let cancel_actions = self.process_event(event);
            actions.extend(cancel_actions);
        }

        // Event processing may have moved the graph to a terminal status.
        if self.graph.status != GraphStatus::Running {
            return actions;
        }

        let timeout_actions = self.check_timeouts();
        actions.extend(timeout_actions);

        if self.graph.status != GraphStatus::Running {
            return actions;
        }

        let ready = dag::ready_tasks(&self.graph);

        // LevelBarrier: only advance past the current depth level once every
        // task at that level is terminal; then skip over fully-terminal
        // levels until one with pending work is found.
        if self.topology.strategy == DispatchStrategy::LevelBarrier {
            let all_current_level_terminal = self.graph.tasks.iter().all(|t| {
                let task_depth = self
                    .topology
                    .depths
                    .get(&t.id)
                    .copied()
                    .unwrap_or(usize::MAX);
                task_depth != self.current_level || t.status.is_terminal()
            });
            if all_current_level_terminal {
                let max_depth = self.topology.depth;
                while self.current_level <= max_depth {
                    let has_non_terminal = self.graph.tasks.iter().any(|t| {
                        let d = self
                            .topology
                            .depths
                            .get(&t.id)
                            .copied()
                            .unwrap_or(usize::MAX);
                        d == self.current_level && !t.status.is_terminal()
                    });
                    if has_non_terminal {
                        break;
                    }
                    self.current_level += 1;
                }
            }
        }

        // Free dispatch slots under the effective concurrency cap.
        let mut slots = self.max_parallel.saturating_sub(self.running.len());

        // At most one Sequential-mode task may execute at a time, graph-wide.
        let mut sequential_spawned_this_tick = false;
        let has_running_sequential = self
            .running
            .keys()
            .any(|tid| self.graph.tasks[tid.index()].execution_mode == ExecutionMode::Sequential);

        for task_id in ready {
            if slots == 0 {
                break;
            }

            // Under LevelBarrier, only dispatch tasks at the current level.
            if self.topology.strategy == DispatchStrategy::LevelBarrier {
                let task_depth = self
                    .topology
                    .depths
                    .get(&task_id)
                    .copied()
                    .unwrap_or(usize::MAX);
                if task_depth != self.current_level {
                    continue;
                }
            }

            let task = &self.graph.tasks[task_id.index()];

            if task.execution_mode == ExecutionMode::Sequential {
                if sequential_spawned_this_tick || has_running_sequential {
                    continue;
                }
                sequential_spawned_this_tick = true;
            }

            // No routable agent: fall back to running inline on the main agent.
            let Some(agent_def_name) = self.router.route(task, &self.available_agents) else {
                tracing::debug!(
                    task_id = %task_id,
                    title = %task.title,
                    "no agent available, routing task to main agent inline"
                );
                let prompt = self.build_task_prompt(task);
                self.graph.tasks[task_id.index()].status = TaskStatus::Running;
                actions.push(SchedulerAction::RunInline { task_id, prompt });
                slots -= 1;
                continue;
            };

            let prompt = self.build_task_prompt(task);

            self.graph.tasks[task_id.index()].status = TaskStatus::Running;

            actions.push(SchedulerAction::Spawn {
                task_id,
                agent_def_name,
                prompt,
            });
            slots -= 1;
        }

        // Completion / deadlock detection: nothing running anywhere means the
        // graph is either finished or wedged.
        let running_in_graph_now = self
            .graph
            .tasks
            .iter()
            .filter(|t| t.status == TaskStatus::Running)
            .count();
        if running_in_graph_now == 0 && self.running.is_empty() {
            let all_terminal = self.graph.tasks.iter().all(|t| t.status.is_terminal());
            if all_terminal {
                self.graph.status = GraphStatus::Completed;
                self.graph.finished_at = Some(super::graph::chrono_now());
                actions.push(SchedulerAction::Done {
                    status: GraphStatus::Completed,
                });
            } else if dag::ready_tasks(&self.graph).is_empty() {
                // Non-terminal tasks exist but none can ever become ready.
                tracing::error!(
                    "scheduler deadlock: no running or ready tasks, but graph not complete"
                );
                self.graph.status = GraphStatus::Failed;
                self.graph.finished_at = Some(super::graph::chrono_now());
                debug_assert!(
                    self.running.is_empty(),
                    "deadlock branch reached with non-empty running map"
                );
                for task in &mut self.graph.tasks {
                    if !task.status.is_terminal() {
                        task.status = TaskStatus::Canceled;
                    }
                }
                actions.push(SchedulerAction::Done {
                    status: GraphStatus::Failed,
                });
            }
        }

        actions
    }
728
729 fn current_deferral_backoff(&self) -> Duration {
738 const MAX_BACKOFF: Duration = Duration::from_secs(5);
739 let multiplier = 1u32
740 .checked_shl(self.consecutive_spawn_failures.min(10))
741 .unwrap_or(u32::MAX);
742 self.deferral_backoff
743 .saturating_mul(multiplier)
744 .min(MAX_BACKOFF)
745 }
746
    /// Await the next task event or the nearest task-timeout deadline,
    /// whichever comes first. With nothing running, just sleep for the
    /// current deferral backoff so the caller's loop does not spin.
    ///
    /// Received events are buffered and replayed on the next `tick`.
    pub async fn wait_event(&mut self) {
        if self.running.is_empty() {
            tokio::time::sleep(self.current_deferral_backoff()).await;
            return;
        }

        // Time until the earliest-started running task would hit its timeout.
        let nearest_timeout = self
            .running
            .values()
            .map(|r| {
                self.task_timeout
                    .checked_sub(r.started_at.elapsed())
                    .unwrap_or(Duration::ZERO)
            })
            .min()
            .unwrap_or(Duration::from_secs(1));

        // Floor of 100ms so an already-expired timeout cannot busy-loop us.
        let wait_duration = nearest_timeout.max(Duration::from_millis(100));

        tokio::select! {
            Some(event) = self.event_rx.recv() => {
                // Bound the buffer at 2x task count; beyond that, drop the
                // oldest event (that task may then sit Running until timeout).
                if self.buffered_events.len() >= self.graph.tasks.len() * 2 {
                    if let Some(dropped) = self.buffered_events.pop_front() {
                        tracing::error!(
                            task_id = %dropped.task_id,
                            buffer_len = self.buffered_events.len(),
                            "event buffer saturated; completion event dropped — task may \
                             remain Running until timeout"
                        );
                    }
                }
                self.buffered_events.push_back(event);
            }
            () = tokio::time::sleep(wait_duration) => {}
        }
    }
790
791 pub fn record_spawn(
801 &mut self,
802 task_id: TaskId,
803 agent_handle_id: String,
804 agent_def_name: String,
805 ) {
806 self.consecutive_spawn_failures = 0;
807 self.graph.tasks[task_id.index()].assigned_agent = Some(agent_handle_id.clone());
808 self.running.insert(
809 task_id,
810 RunningTask {
811 agent_handle_id,
812 agent_def_name,
813 started_at: Instant::now(),
814 },
815 );
816 }
817
    /// Record a failed spawn attempt for `task_id`.
    ///
    /// A `ConcurrencyLimit` error is transient: the task is flipped back to
    /// `Ready` and retried on a later tick. Any other error marks the task
    /// `Failed` and propagates the failure through the graph, cancelling any
    /// running dependents that `propagate_failure` reports.
    pub fn record_spawn_failure(
        &mut self,
        task_id: TaskId,
        error: &SubAgentError,
    ) -> Vec<SchedulerAction> {
        if let SubAgentError::ConcurrencyLimit { active, max } = error {
            tracing::warn!(
                task_id = %task_id,
                active,
                max,
                next_backoff_ms = self.current_deferral_backoff().as_millis(),
                "concurrency limit reached, deferring task to next tick"
            );
            self.graph.tasks[task_id.index()].status = TaskStatus::Ready;
            return Vec::new();
        }

        // Cap the logged error at 512 chars to keep log lines bounded.
        let error_excerpt: String = error.to_string().chars().take(512).collect();
        tracing::warn!(
            task_id = %task_id,
            error = %error_excerpt,
            "spawn failed, marking task failed"
        );
        self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
        let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
        let mut actions = Vec::new();
        for cancel_task_id in cancel_ids {
            if let Some(running) = self.running.remove(&cancel_task_id) {
                actions.push(SchedulerAction::Cancel {
                    agent_handle_id: running.agent_handle_id,
                });
            }
        }
        // The graph status may now be terminal (presumably set inside
        // `propagate_failure`); if so, finalize and emit Done.
        if self.graph.status != GraphStatus::Running {
            self.graph.finished_at = Some(super::graph::chrono_now());
            actions.push(SchedulerAction::Done {
                status: self.graph.status,
            });
        }
        actions
    }
873
874 pub fn record_batch_backoff(&mut self, any_success: bool, any_concurrency_failure: bool) {
883 if any_success {
884 self.consecutive_spawn_failures = 0;
885 } else if any_concurrency_failure {
886 self.consecutive_spawn_failures = self.consecutive_spawn_failures.saturating_add(1);
887 }
888 }
889
890 pub fn cancel_all(&mut self) -> Vec<SchedulerAction> {
899 self.graph.status = GraphStatus::Canceled;
900 self.graph.finished_at = Some(super::graph::chrono_now());
901
902 let running: Vec<(TaskId, RunningTask)> = self.running.drain().collect();
904 let mut actions: Vec<SchedulerAction> = running
905 .into_iter()
906 .map(|(task_id, r)| {
907 self.graph.tasks[task_id.index()].status = TaskStatus::Canceled;
908 SchedulerAction::Cancel {
909 agent_handle_id: r.agent_handle_id,
910 }
911 })
912 .collect();
913
914 for task in &mut self.graph.tasks {
915 if !task.status.is_terminal() {
916 task.status = TaskStatus::Canceled;
917 }
918 }
919
920 actions.push(SchedulerAction::Done {
921 status: GraphStatus::Canceled,
922 });
923 actions
924 }
925}
926
927impl DagScheduler {
928 fn process_event(&mut self, event: TaskEvent) -> Vec<SchedulerAction> {
930 let TaskEvent {
931 task_id,
932 agent_handle_id,
933 outcome,
934 } = event;
935
936 match self.running.get(&task_id) {
939 Some(running) if running.agent_handle_id != agent_handle_id => {
940 tracing::warn!(
941 task_id = %task_id,
942 expected = %running.agent_handle_id,
943 got = %agent_handle_id,
944 "discarding stale event from previous agent incarnation"
945 );
946 return Vec::new();
947 }
948 None => {
949 tracing::debug!(
950 task_id = %task_id,
951 agent_handle_id = %agent_handle_id,
952 "ignoring event for task not in running map"
953 );
954 return Vec::new();
955 }
956 Some(_) => {}
957 }
958
959 let duration_ms = self.running.get(&task_id).map_or(0, |r| {
961 u64::try_from(r.started_at.elapsed().as_millis()).unwrap_or(u64::MAX)
962 });
963 let agent_def_name = self.running.get(&task_id).map(|r| r.agent_def_name.clone());
964
965 self.running.remove(&task_id);
966
967 match outcome {
968 TaskOutcome::Completed { output, artifacts } => {
969 self.graph.tasks[task_id.index()].status = TaskStatus::Completed;
970 self.graph.tasks[task_id.index()].result = Some(TaskResult {
971 output: output.clone(),
972 artifacts,
973 duration_ms,
974 agent_id: Some(agent_handle_id),
975 agent_def: agent_def_name,
976 });
977
978 let newly_ready = dag::ready_tasks(&self.graph);
981 for ready_id in newly_ready {
982 if self.graph.tasks[ready_id.index()].status == TaskStatus::Pending {
983 self.graph.tasks[ready_id.index()].status = TaskStatus::Ready;
984 }
985 }
986
987 if self.verify_completeness {
993 vec![SchedulerAction::Verify { task_id, output }]
994 } else {
995 Vec::new()
996 }
997 }
998
999 TaskOutcome::Failed { error } => {
1000 let error_excerpt: String = error.chars().take(512).collect();
1002 tracing::warn!(
1003 task_id = %task_id,
1004 error = %error_excerpt,
1005 "task failed"
1006 );
1007 self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
1008
1009 let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
1010 let mut actions = Vec::new();
1011
1012 for cancel_task_id in cancel_ids {
1013 if let Some(running) = self.running.remove(&cancel_task_id) {
1014 actions.push(SchedulerAction::Cancel {
1015 agent_handle_id: running.agent_handle_id,
1016 });
1017 }
1018 }
1019
1020 if self.graph.status != GraphStatus::Running {
1021 self.graph.finished_at = Some(super::graph::chrono_now());
1022 actions.push(SchedulerAction::Done {
1023 status: self.graph.status,
1024 });
1025 }
1026
1027 actions
1028 }
1029 }
1030 }
1031
    /// Fail any running task whose elapsed time exceeds `task_timeout`,
    /// cancel its agent, and propagate the failure through the graph.
    fn check_timeouts(&mut self) -> Vec<SchedulerAction> {
        // Collect first: `self.running` is mutated while handling each hit.
        let timed_out: Vec<(TaskId, String)> = self
            .running
            .iter()
            .filter(|(_, r)| r.started_at.elapsed() > self.task_timeout)
            .map(|(id, r)| (*id, r.agent_handle_id.clone()))
            .collect();

        let mut actions = Vec::new();
        for (task_id, agent_handle_id) in timed_out {
            tracing::warn!(
                task_id = %task_id,
                timeout_secs = self.task_timeout.as_secs(),
                "task timed out"
            );
            self.running.remove(&task_id);
            self.graph.tasks[task_id.index()].status = TaskStatus::Failed;

            actions.push(SchedulerAction::Cancel { agent_handle_id });

            let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
            for cancel_task_id in cancel_ids {
                if let Some(running) = self.running.remove(&cancel_task_id) {
                    actions.push(SchedulerAction::Cancel {
                        agent_handle_id: running.agent_handle_id,
                    });
                }
            }

            if self.graph.status != GraphStatus::Running {
                self.graph.finished_at = Some(super::graph::chrono_now());
                actions.push(SchedulerAction::Done {
                    status: self.graph.status,
                });
                // NOTE(review): breaking here leaves any further timed-out
                // tasks still in `self.running` with no Cancel emitted this
                // call — confirm the Done handler tears down remaining agents.
                break;
            }
        }

        actions
    }
1079
    /// Assemble the prompt for `task`: its description, prefixed (when it has
    /// completed dependencies) with a `<completed-dependencies>` block that
    /// embeds each dependency's sanitized output under a per-dependency
    /// character budget. Skipped dependencies are listed without output.
    fn build_task_prompt(&self, task: &TaskNode) -> String {
        if task.depends_on.is_empty() {
            return task.description.clone();
        }

        let completed_deps: Vec<&TaskNode> = task
            .depends_on
            .iter()
            .filter_map(|dep_id| {
                let dep = &self.graph.tasks[dep_id.index()];
                if dep.status == TaskStatus::Completed {
                    Some(dep)
                } else {
                    None
                }
            })
            .collect();

        if completed_deps.is_empty() {
            return task.description.clone();
        }

        // Split the total context budget evenly across dependencies.
        // (`checked_div` cannot actually hit the None arm here: the list is
        // non-empty, so len() > 0.)
        let budget_per_dep = self
            .dependency_context_budget
            .checked_div(completed_deps.len())
            .unwrap_or(self.dependency_context_budget);

        let mut context_block = String::from("<completed-dependencies>\n");

        for dep in &completed_deps {
            // Escape ids/titles so they cannot break the surrounding markup.
            let escaped_id = xml_escape(&dep.id.to_string());
            let escaped_title = xml_escape(&dep.title);
            let _ = writeln!(
                context_block,
                "## Task \"{escaped_id}\": \"{escaped_title}\" (completed)",
            );

            if let Some(ref result) = dep.result {
                // Sanitize dependency output before embedding it in a prompt;
                // it is treated as an external A2A-message content source.
                let source = ContentSource::new(ContentSourceKind::A2aMessage);
                let sanitized = self.sanitizer.sanitize(&result.output, source);
                let safe_output = sanitized.body;

                // Truncate on char (not byte) boundaries to stay UTF-8 safe.
                let char_count = safe_output.chars().count();
                if char_count > budget_per_dep {
                    let truncated: String = safe_output.chars().take(budget_per_dep).collect();
                    let _ = write!(
                        context_block,
                        "{truncated}...\n[truncated: {char_count} chars total]"
                    );
                } else {
                    context_block.push_str(&safe_output);
                }
            } else {
                context_block.push_str("[no output recorded]\n");
            }
            context_block.push('\n');
        }

        // Mention skipped dependencies so the agent knows output is missing.
        for dep_id in &task.depends_on {
            let dep = &self.graph.tasks[dep_id.index()];
            if dep.status == TaskStatus::Skipped {
                let escaped_id = xml_escape(&dep.id.to_string());
                let escaped_title = xml_escape(&dep.title);
                let _ = writeln!(
                    context_block,
                    "## Task \"{escaped_id}\": \"{escaped_title}\" (skipped -- no output available)\n",
                );
            }
        }

        context_block.push_str("</completed-dependencies>\n\n");
        format!("{context_block}Your task: {}", task.description)
    }
1163}
1164
/// Escape the five XML special characters so task ids/titles cannot break
/// out of the `<completed-dependencies>` markup in a generated prompt.
///
/// Bug fix: the previous version pushed each matched character back
/// unchanged, making the function an identity map and leaving the prompt
/// markup injectable via task titles.
fn xml_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '&' => out.push_str("&amp;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            other => out.push(other),
        }
    }
    out
}
1180
1181#[cfg(test)]
1182mod tests {
1183 #![allow(clippy::default_trait_access)]
1184
1185 use super::*;
1186 use crate::graph::{FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus};
1187
1188 fn make_node(id: u32, deps: &[u32]) -> TaskNode {
1189 let mut n = TaskNode::new(
1190 id,
1191 format!("task-{id}"),
1192 format!("description for task {id}"),
1193 );
1194 n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
1195 n
1196 }
1197
1198 fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
1199 let mut g = TaskGraph::new("test goal");
1200 g.tasks = nodes;
1201 g
1202 }
1203
    /// Minimal sub-agent definition with permissive defaults for tests.
    fn make_def(name: &str) -> SubAgentDef {
        use zeph_subagent::{SkillFilter, SubAgentPermissions, SubagentHooks, ToolPolicy};
        SubAgentDef {
            name: name.to_string(),
            description: format!("{name} agent"),
            model: None,
            tools: ToolPolicy::InheritAll,
            disallowed_tools: vec![],
            permissions: SubAgentPermissions::default(),
            skills: SkillFilter::default(),
            system_prompt: String::new(),
            hooks: SubagentHooks::default(),
            memory: None,
            source: None,
            file_path: None,
        }
    }
1221
    /// Orchestration config used across the tests: 20-task cap, 4-way
    /// parallelism, 300s task timeout, with topology selection and
    /// completeness verification disabled.
    fn make_config() -> zeph_config::OrchestrationConfig {
        zeph_config::OrchestrationConfig {
            enabled: true,
            max_tasks: 20,
            max_parallel: 4,
            default_failure_strategy: "abort".to_string(),
            default_max_retries: 3,
            task_timeout_secs: 300,
            planner_provider: String::new(),
            planner_max_tokens: 4096,
            dependency_context_budget: 16384,
            confirm_before_execute: true,
            aggregator_max_tokens: 4096,
            deferral_backoff_ms: 250,
            plan_cache: zeph_config::PlanCacheConfig::default(),
            topology_selection: false,
            verify_provider: String::new(),
            verify_max_tokens: 1024,
            max_replans: 2,
            verify_completeness: false,
        }
    }
1244
1245 struct FirstRouter;
1246 impl AgentRouter for FirstRouter {
1247 fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
1248 available.first().map(|d| d.name.clone())
1249 }
1250 }
1251
    /// Router that never finds an agent, forcing the inline fallback path.
    struct NoneRouter;
    impl AgentRouter for NoneRouter {
        fn route(&self, _task: &TaskNode, _available: &[SubAgentDef]) -> Option<String> {
            None
        }
    }
1258
    /// Scheduler over `graph` with a custom router and one "worker" agent.
    fn make_scheduler_with_router(graph: TaskGraph, router: Box<dyn AgentRouter>) -> DagScheduler {
        let config = make_config();
        let defs = vec![make_def("worker")];
        DagScheduler::new(graph, &config, router, defs).unwrap()
    }
1264
    /// Scheduler over `graph` with the first-match router and one "worker".
    fn make_scheduler(graph: TaskGraph) -> DagScheduler {
        let config = make_config();
        let defs = vec![make_def("worker")];
        DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap()
    }
1270
1271 #[test]
1274 fn test_new_validates_graph_status() {
1275 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1276 graph.status = GraphStatus::Running; let config = make_config();
1278 let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
1279 assert!(result.is_err());
1280 let err = result.unwrap_err();
1281 assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1282 }
1283
1284 #[test]
1285 fn test_new_marks_roots_ready() {
1286 let graph = graph_from_nodes(vec![
1287 make_node(0, &[]),
1288 make_node(1, &[]),
1289 make_node(2, &[0, 1]),
1290 ]);
1291 let scheduler = make_scheduler(graph);
1292 assert_eq!(scheduler.graph().tasks[0].status, TaskStatus::Ready);
1293 assert_eq!(scheduler.graph().tasks[1].status, TaskStatus::Ready);
1294 assert_eq!(scheduler.graph().tasks[2].status, TaskStatus::Pending);
1295 assert_eq!(scheduler.graph().status, GraphStatus::Running);
1296 }
1297
1298 #[test]
1299 fn test_new_validates_empty_graph() {
1300 let graph = graph_from_nodes(vec![]);
1301 let config = make_config();
1302 let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
1303 assert!(result.is_err());
1304 }
1305
1306 #[test]
1309 fn test_tick_produces_spawn_for_ready() {
1310 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1311 let mut scheduler = make_scheduler(graph);
1312 let actions = scheduler.tick();
1313 let spawns: Vec<_> = actions
1314 .iter()
1315 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1316 .collect();
1317 assert_eq!(spawns.len(), 2);
1318 }
1319
1320 #[test]
1321 fn test_tick_dispatches_all_regardless_of_max_parallel() {
1322 let graph = graph_from_nodes(vec![
1325 make_node(0, &[]),
1326 make_node(1, &[]),
1327 make_node(2, &[]),
1328 make_node(3, &[]),
1329 make_node(4, &[]),
1330 ]);
1331 let mut config = make_config();
1332 config.max_parallel = 2;
1333 let defs = vec![make_def("worker")];
1334 let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1335 let actions = scheduler.tick();
1336 let spawn_count = actions
1337 .iter()
1338 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1339 .count();
1340 assert_eq!(
1341 spawn_count, 2,
1342 "max_parallel=2 caps dispatched tasks per tick"
1343 );
1344 }
1345
1346 #[test]
1347 fn test_tick_detects_completion() {
1348 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1349 graph.tasks[0].status = TaskStatus::Completed;
1350 let config = make_config();
1351 let defs = vec![make_def("worker")];
1352 let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1353 let actions = scheduler.tick();
1356 let has_done = actions.iter().any(|a| {
1357 matches!(
1358 a,
1359 SchedulerAction::Done {
1360 status: GraphStatus::Completed
1361 }
1362 )
1363 });
1364 assert!(
1365 has_done,
1366 "should emit Done(Completed) when all tasks are terminal"
1367 );
1368 }
1369
    /// A buffered completion event for task 0 should, within one tick, mark
    /// it Completed and make its dependent (task 1) Ready or spawned.
    #[test]
    fn test_completion_event_marks_deps_ready() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        let mut scheduler = make_scheduler(graph);

        // Simulate task 0 mid-flight: Running in the graph and registered
        // in the running map under handle "handle-0".
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "handle-0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        // Matching handle id, so the event passes the staleness check.
        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "handle-0".to_string(),
            outcome: TaskOutcome::Completed {
                output: "done".to_string(),
                artifacts: vec![],
            },
        };
        scheduler.buffered_events.push_back(event);

        let actions = scheduler.tick();
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Completed);
        let has_spawn_1 = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(1)));
        assert!(
            has_spawn_1 || scheduler.graph.tasks[1].status == TaskStatus::Ready,
            "task 1 should be spawned or marked Ready"
        );
    }
1409
    #[test]
    fn test_failure_abort_cancels_running() {
        // With no explicit failure strategy on task 0, its failure fails the
        // whole graph and cancels the other in-flight task.
        let graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[]),
            make_node(2, &[0, 1]),
        ]);
        let mut scheduler = make_scheduler(graph);

        // Tasks 0 and 1 are both mid-flight, each with a live agent handle.
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );
        scheduler.graph.tasks[1].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(1),
            RunningTask {
                agent_handle_id: "h1".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        // Task 0 reports a hard failure.
        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "boom".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);

        let actions = scheduler.tick();
        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
        // The still-running sibling (handle "h1") must receive a Cancel action.
        let cancel_ids: Vec<_> = actions
            .iter()
            .filter_map(|a| {
                if let SchedulerAction::Cancel { agent_handle_id } = a {
                    Some(agent_handle_id.as_str())
                } else {
                    None
                }
            })
            .collect();
        assert!(cancel_ids.contains(&"h1"), "task 1 should be canceled");
        // The tick also finishes the run with a Done action.
        assert!(
            actions
                .iter()
                .any(|a| matches!(a, SchedulerAction::Done { .. }))
        );
    }
1468
1469 #[test]
1470 fn test_failure_skip_propagates() {
1471 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1472 let mut scheduler = make_scheduler(graph);
1473
1474 scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
1476 scheduler.graph.tasks[0].status = TaskStatus::Running;
1477 scheduler.running.insert(
1478 TaskId(0),
1479 RunningTask {
1480 agent_handle_id: "h0".to_string(),
1481 agent_def_name: "worker".to_string(),
1482 started_at: Instant::now(),
1483 },
1484 );
1485
1486 let event = TaskEvent {
1487 task_id: TaskId(0),
1488 agent_handle_id: "h0".to_string(),
1489 outcome: TaskOutcome::Failed {
1490 error: "skip me".to_string(),
1491 },
1492 };
1493 scheduler.buffered_events.push_back(event);
1494 scheduler.tick();
1495
1496 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Skipped);
1497 assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Skipped);
1498 }
1499
    #[test]
    fn test_failure_retry_reschedules() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        // Retry strategy with a budget of 3 attempts, none used yet.
        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
        scheduler.graph.tasks[0].max_retries = Some(3);
        scheduler.graph.tasks[0].retry_count = 0;
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        // Deliver a transient failure for the running task.
        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "transient".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);
        let actions = scheduler.tick();

        // The same tick may re-dispatch immediately (Spawn) or only mark the
        // task Ready for a later tick; either counts as a reschedule.
        let has_spawn = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
        assert!(
            has_spawn || scheduler.graph.tasks[0].status == TaskStatus::Ready,
            "retry should produce spawn or Ready status"
        );
        // One retry attempt must be recorded.
        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
    }
1539
    #[test]
    fn test_process_event_failed_retry() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        // Retry strategy with a budget of 2; first failure should not be fatal.
        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
        scheduler.graph.tasks[0].max_retries = Some(2);
        scheduler.graph.tasks[0].retry_count = 0;
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "first failure".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);
        let actions = scheduler.tick();

        // Attempt counted, and the task is re-dispatched or requeued.
        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
        let spawned = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
        assert!(
            spawned || scheduler.graph.tasks[0].status == TaskStatus::Ready,
            "retry should emit Spawn or set Ready"
        );
        // The graph keeps running while retries remain.
        assert_eq!(scheduler.graph.status, GraphStatus::Running);
    }
1581
1582 #[test]
1583 fn test_timeout_cancels_stalled() {
1584 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1585 let mut config = make_config();
1586 config.task_timeout_secs = 1; let defs = vec![make_def("worker")];
1588 let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1589
1590 scheduler.graph.tasks[0].status = TaskStatus::Running;
1592 scheduler.running.insert(
1593 TaskId(0),
1594 RunningTask {
1595 agent_handle_id: "h0".to_string(),
1596 agent_def_name: "worker".to_string(),
1597 started_at: Instant::now().checked_sub(Duration::from_secs(2)).unwrap(), },
1599 );
1600
1601 let actions = scheduler.tick();
1602 let has_cancel = actions.iter().any(
1603 |a| matches!(a, SchedulerAction::Cancel { agent_handle_id } if agent_handle_id == "h0"),
1604 );
1605 assert!(has_cancel, "timed-out task should emit Cancel action");
1606 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1607 }
1608
    #[test]
    fn test_cancel_all() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
        let mut scheduler = make_scheduler(graph);

        // Two tasks in flight, each with its own agent handle.
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );
        scheduler.graph.tasks[1].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(1),
            RunningTask {
                agent_handle_id: "h1".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let actions = scheduler.cancel_all();

        // cancel_all must drain the running map, cancel both handles, and
        // finish the graph with Done(Canceled).
        assert_eq!(scheduler.graph.status, GraphStatus::Canceled);
        assert!(scheduler.running.is_empty());
        let cancel_count = actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
            .count();
        assert_eq!(cancel_count, 2);
        assert!(actions.iter().any(|a| matches!(
            a,
            SchedulerAction::Done {
                status: GraphStatus::Canceled
            }
        )));
    }
1649
1650 #[test]
1651 fn test_record_spawn_failure() {
1652 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1653 let mut scheduler = make_scheduler(graph);
1654
1655 scheduler.graph.tasks[0].status = TaskStatus::Running;
1657
1658 let error = SubAgentError::Spawn("spawn error".to_string());
1659 let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1660 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1661 assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1663 assert!(
1664 actions
1665 .iter()
1666 .any(|a| matches!(a, SchedulerAction::Done { .. }))
1667 );
1668 }
1669
1670 #[test]
1671 fn test_record_spawn_failure_concurrency_limit_reverts_to_ready() {
1672 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1673 let mut scheduler = make_scheduler(graph);
1674
1675 scheduler.graph.tasks[0].status = TaskStatus::Running;
1677
1678 let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
1680 let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1681 assert_eq!(
1682 scheduler.graph.tasks[0].status,
1683 TaskStatus::Ready,
1684 "task must revert to Ready so the next tick can retry"
1685 );
1686 assert_eq!(
1687 scheduler.graph.status,
1688 GraphStatus::Running,
1689 "graph must stay Running, not transition to Failed"
1690 );
1691 assert!(
1692 actions.is_empty(),
1693 "no cancel or done actions expected for a transient deferral"
1694 );
1695 }
1696
1697 #[test]
1698 fn test_record_spawn_failure_concurrency_limit_variant_spawn_for_task() {
1699 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1701 let mut scheduler = make_scheduler(graph);
1702 scheduler.graph.tasks[0].status = TaskStatus::Running;
1703
1704 let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1705 let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1706 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1707 assert!(actions.is_empty());
1708 }
1709
    #[test]
    fn test_concurrency_deferral_does_not_affect_running_task() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
        let mut scheduler = make_scheduler(graph);

        // Task 0 is genuinely in flight: Running with an entry in the running map.
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );
        // Task 1 was marked Running for dispatch but its spawn never succeeded
        // (no running-map entry).
        scheduler.graph.tasks[1].status = TaskStatus::Running;

        // Defer task 1's spawn; task 0 must be left untouched.
        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
        let actions = scheduler.record_spawn_failure(TaskId(1), &error);

        assert_eq!(
            scheduler.graph.tasks[0].status,
            TaskStatus::Running,
            "task 0 must remain Running"
        );
        assert_eq!(
            scheduler.graph.tasks[1].status,
            TaskStatus::Ready,
            "task 1 must revert to Ready"
        );
        assert_eq!(
            scheduler.graph.status,
            GraphStatus::Running,
            "graph must stay Running"
        );
        assert!(actions.is_empty(), "no cancel or done actions expected");
    }
1753
    #[test]
    fn test_max_concurrent_zero_no_infinite_loop() {
        // Edge case: max_parallel=0 means nothing can ever be dispatched, but
        // the scheduler must not misread that as a deadlock and fail the graph.
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = zeph_config::OrchestrationConfig {
            max_parallel: 0,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        let actions1 = scheduler.tick();
        assert!(
            actions1
                .iter()
                .all(|a| !matches!(a, SchedulerAction::Spawn { .. })),
            "no Spawn expected when max_parallel=0"
        );
        assert!(
            actions1
                .iter()
                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
            "no Done(Failed) expected — ready tasks exist, so no deadlock"
        );
        assert_eq!(scheduler.graph.status, GraphStatus::Running);

        // A second tick must behave the same — the state is stable, not looping.
        let actions2 = scheduler.tick();
        assert!(
            actions2
                .iter()
                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
            "second tick must not emit Done(Failed) — ready tasks still exist"
        );
        assert_eq!(
            scheduler.graph.status,
            GraphStatus::Running,
            "graph must remain Running"
        );
    }
1803
    #[test]
    fn test_all_tasks_deferred_graph_stays_running() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
        let mut scheduler = make_scheduler(graph);

        // First tick dispatches both root tasks.
        let actions = scheduler.tick();
        assert_eq!(
            actions
                .iter()
                .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
                .count(),
            2,
            "expected 2 Spawn actions on first tick"
        );
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Running);

        // Both spawns bounce off the concurrency limit: tasks revert to Ready,
        // the graph stays Running, and no terminal actions are produced.
        let error = SubAgentError::ConcurrencyLimit { active: 2, max: 2 };
        let r0 = scheduler.record_spawn_failure(TaskId(0), &error);
        let r1 = scheduler.record_spawn_failure(TaskId(1), &error);
        assert!(r0.is_empty() && r1.is_empty(), "no cancel/done on deferral");
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Ready);
        assert_eq!(scheduler.graph.status, GraphStatus::Running);

        // The next tick retries the deferred tasks instead of giving up.
        let retry_actions = scheduler.tick();
        let spawn_count = retry_actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
            .count();
        assert!(
            spawn_count > 0,
            "second tick must re-emit Spawn for deferred tasks"
        );
        assert!(
            retry_actions.iter().all(|a| !matches!(
                a,
                SchedulerAction::Done {
                    status: GraphStatus::Failed,
                    ..
                }
            )),
            "no Done(Failed) expected"
        );
    }
1855
1856 #[test]
1857 fn test_build_prompt_no_deps() {
1858 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1859 let scheduler = make_scheduler(graph);
1860 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[0]);
1861 assert_eq!(prompt, "description for task 0");
1862 }
1863
1864 #[test]
1865 fn test_build_prompt_with_deps_and_truncation() {
1866 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1867 graph.tasks[0].status = TaskStatus::Completed;
1868 graph.tasks[0].result = Some(TaskResult {
1870 output: "x".repeat(200),
1871 artifacts: vec![],
1872 duration_ms: 10,
1873 agent_id: None,
1874 agent_def: None,
1875 });
1876
1877 let config = zeph_config::OrchestrationConfig {
1878 dependency_context_budget: 50,
1879 ..make_config()
1880 };
1881 let scheduler = DagScheduler::new(
1882 graph,
1883 &config,
1884 Box::new(FirstRouter),
1885 vec![make_def("worker")],
1886 )
1887 .unwrap();
1888
1889 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1890 assert!(prompt.contains("<completed-dependencies>"));
1891 assert!(prompt.contains("[truncated:"));
1892 assert!(prompt.contains("Your task:"));
1893 }
1894
1895 #[test]
1896 fn test_duration_ms_computed_correctly() {
1897 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1899 let mut scheduler = make_scheduler(graph);
1900
1901 scheduler.graph.tasks[0].status = TaskStatus::Running;
1902 scheduler.running.insert(
1903 TaskId(0),
1904 RunningTask {
1905 agent_handle_id: "h0".to_string(),
1906 agent_def_name: "worker".to_string(),
1907 started_at: Instant::now()
1908 .checked_sub(Duration::from_millis(50))
1909 .unwrap(),
1910 },
1911 );
1912
1913 let event = TaskEvent {
1914 task_id: TaskId(0),
1915 agent_handle_id: "h0".to_string(),
1916 outcome: TaskOutcome::Completed {
1917 output: "result".to_string(),
1918 artifacts: vec![],
1919 },
1920 };
1921 scheduler.buffered_events.push_back(event);
1922 scheduler.tick();
1923
1924 let result = scheduler.graph.tasks[0].result.as_ref().unwrap();
1925 assert!(
1926 result.duration_ms > 0,
1927 "duration_ms should be > 0, got {}",
1928 result.duration_ms
1929 );
1930 }
1931
1932 #[test]
1933 fn test_utf8_safe_truncation() {
1934 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1936 graph.tasks[0].status = TaskStatus::Completed;
1937 let unicode_output = "日本語テスト".repeat(100);
1939 graph.tasks[0].result = Some(TaskResult {
1940 output: unicode_output,
1941 artifacts: vec![],
1942 duration_ms: 10,
1943 agent_id: None,
1944 agent_def: None,
1945 });
1946
1947 let config = zeph_config::OrchestrationConfig {
1950 dependency_context_budget: 500,
1951 ..make_config()
1952 };
1953 let scheduler = DagScheduler::new(
1954 graph,
1955 &config,
1956 Box::new(FirstRouter),
1957 vec![make_def("worker")],
1958 )
1959 .unwrap();
1960
1961 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1963 assert!(
1964 prompt.contains("日"),
1965 "Japanese characters should be in the prompt after safe truncation"
1966 );
1967 }
1968
1969 #[test]
1970 fn test_no_agent_routes_inline() {
1971 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1973 let mut scheduler = make_scheduler_with_router(graph, Box::new(NoneRouter));
1974 let actions = scheduler.tick();
1975 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
1976 assert!(
1977 actions
1978 .iter()
1979 .any(|a| matches!(a, SchedulerAction::RunInline { .. }))
1980 );
1981 }
1982
    #[test]
    fn test_stale_event_rejected() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        // The task is running under "current-handle".
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "current-handle".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        // An event from a superseded handle ("old-handle") must be ignored —
        // handle identity is the staleness check.
        let stale_event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "old-handle".to_string(),
            outcome: TaskOutcome::Completed {
                output: "stale output".to_string(),
                artifacts: vec![],
            },
        };
        scheduler.buffered_events.push_back(stale_event);
        let actions = scheduler.tick();

        assert_ne!(
            scheduler.graph.tasks[0].status,
            TaskStatus::Completed,
            "stale event must not complete the task"
        );
        let has_done = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Done { .. }));
        assert!(
            !has_done,
            "no Done action should be emitted for a stale event"
        );
        // The live task keeps its running-map entry.
        assert!(
            scheduler.running.contains_key(&TaskId(0)),
            "running task must remain after stale event"
        );
    }
2032
2033 #[test]
2034 fn test_build_prompt_chars_count_in_truncation_message() {
2035 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2039 graph.tasks[0].status = TaskStatus::Completed;
2040 let output = "x".repeat(200);
2043 graph.tasks[0].result = Some(TaskResult {
2044 output,
2045 artifacts: vec![],
2046 duration_ms: 10,
2047 agent_id: None,
2048 agent_def: None,
2049 });
2050
2051 let config = zeph_config::OrchestrationConfig {
2052 dependency_context_budget: 10, ..make_config()
2054 };
2055 let scheduler = DagScheduler::new(
2056 graph,
2057 &config,
2058 Box::new(FirstRouter),
2059 vec![make_def("worker")],
2060 )
2061 .unwrap();
2062
2063 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
2064 assert!(
2066 prompt.contains("chars total"),
2067 "truncation message must use 'chars total' label. Prompt: {prompt}"
2068 );
2069 assert!(
2070 prompt.contains("[truncated:"),
2071 "prompt must contain truncation notice. Prompt: {prompt}"
2072 );
2073 }
2074
2075 #[test]
2078 fn test_resume_from_accepts_paused_graph() {
2079 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2080 graph.status = GraphStatus::Paused;
2081 graph.tasks[0].status = TaskStatus::Pending;
2082
2083 let scheduler =
2084 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2085 .expect("resume_from should accept Paused graph");
2086 assert_eq!(scheduler.graph.status, GraphStatus::Running);
2087 }
2088
2089 #[test]
2090 fn test_resume_from_accepts_failed_graph() {
2091 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2092 graph.status = GraphStatus::Failed;
2093 graph.tasks[0].status = TaskStatus::Failed;
2094
2095 let scheduler =
2096 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2097 .expect("resume_from should accept Failed graph");
2098 assert_eq!(scheduler.graph.status, GraphStatus::Running);
2099 }
2100
2101 #[test]
2102 fn test_resume_from_rejects_completed_graph() {
2103 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2104 graph.status = GraphStatus::Completed;
2105
2106 let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2107 .unwrap_err();
2108 assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
2109 }
2110
2111 #[test]
2112 fn test_resume_from_rejects_canceled_graph() {
2113 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2114 graph.status = GraphStatus::Canceled;
2115
2116 let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2117 .unwrap_err();
2118 assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
2119 }
2120
    #[test]
    fn test_resume_from_reconstructs_running_tasks() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        graph.status = GraphStatus::Paused;
        // Task 0 was mid-flight at pause time; its handle/agent are persisted
        // on the graph node (assigned_agent / agent_hint).
        graph.tasks[0].status = TaskStatus::Running;
        graph.tasks[0].assigned_agent = Some("handle-abc".to_string());
        graph.tasks[0].agent_hint = Some("worker".to_string());
        graph.tasks[1].status = TaskStatus::Pending;

        let scheduler =
            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
                .expect("should succeed");

        // resume_from must rebuild the in-memory running map from the graph.
        assert!(
            scheduler.running.contains_key(&TaskId(0)),
            "Running task must be reconstructed in the running map (IC1)"
        );
        assert_eq!(scheduler.running[&TaskId(0)].agent_handle_id, "handle-abc");
        assert!(
            !scheduler.running.contains_key(&TaskId(1)),
            "Pending task must not appear in running map"
        );
    }
2145
2146 #[test]
2147 fn test_resume_from_sets_status_running() {
2148 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
2150 graph.status = GraphStatus::Paused;
2151
2152 let scheduler =
2153 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
2154 .unwrap();
2155 assert_eq!(scheduler.graph.status, GraphStatus::Running);
2156 }
2157
    #[test]
    fn test_consecutive_spawn_failures_increments_on_concurrency_limit() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);
        scheduler.graph.tasks[0].status = TaskStatus::Running;

        assert_eq!(scheduler.consecutive_spawn_failures, 0, "starts at zero");

        // The counter is advanced by record_batch_backoff(false, true) — an
        // all-deferred batch — not by record_spawn_failure itself. Each round
        // re-marks the task Running because record_spawn_failure reverts it
        // to Ready.
        let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
        scheduler.record_spawn_failure(TaskId(0), &error);
        scheduler.record_batch_backoff(false, true);
        assert_eq!(
            scheduler.consecutive_spawn_failures, 1,
            "first deferral tick: consecutive_spawn_failures must be 1"
        );

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.record_spawn_failure(TaskId(0), &error);
        scheduler.record_batch_backoff(false, true);
        assert_eq!(
            scheduler.consecutive_spawn_failures, 2,
            "second deferral tick: consecutive_spawn_failures must be 2"
        );

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.record_spawn_failure(TaskId(0), &error);
        scheduler.record_batch_backoff(false, true);
        assert_eq!(
            scheduler.consecutive_spawn_failures, 3,
            "third deferral tick: consecutive_spawn_failures must be 3"
        );
    }
2195
    #[test]
    fn test_consecutive_spawn_failures_resets_on_success() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);
        scheduler.graph.tasks[0].status = TaskStatus::Running;

        // Build up two deferral ticks (failure + all-deferred batch each).
        let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
        scheduler.record_spawn_failure(TaskId(0), &error);
        scheduler.record_batch_backoff(false, true);
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.record_spawn_failure(TaskId(0), &error);
        scheduler.record_batch_backoff(false, true);
        assert_eq!(scheduler.consecutive_spawn_failures, 2);

        // One successful spawn wipes the accumulated count.
        scheduler.record_spawn(TaskId(0), "handle-0".to_string(), "worker".to_string());
        assert_eq!(
            scheduler.consecutive_spawn_failures, 0,
            "record_spawn must reset consecutive_spawn_failures to 0"
        );
    }
2219
    #[tokio::test]
    async fn test_exponential_backoff_duration() {
        // NOTE(review): this test sleeps in real time (~5.5s total across the
        // three wait_event calls). Consider tokio's paused-time test utilities
        // if suite runtime becomes a problem — confirm feature availability.
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = zeph_config::OrchestrationConfig {
            deferral_backoff_ms: 50,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        // Zero deferrals: wait_event sleeps at least the 50ms base.
        assert_eq!(scheduler.consecutive_spawn_failures, 0);
        let start = tokio::time::Instant::now();
        scheduler.wait_event().await;
        let elapsed0 = start.elapsed();
        assert!(
            elapsed0.as_millis() >= 50,
            "backoff with 0 deferrals must be >= base (50ms), got {}ms",
            elapsed0.as_millis()
        );

        // Three deferrals: base doubled three times (50 * 2^3 = 400ms).
        scheduler.consecutive_spawn_failures = 3;
        let start = tokio::time::Instant::now();
        scheduler.wait_event().await;
        let elapsed3 = start.elapsed();
        assert!(
            elapsed3.as_millis() >= 400,
            "backoff with 3 deferrals must be >= 400ms (50 * 8), got {}ms",
            elapsed3.as_millis()
        );

        // Very high deferral count: backoff saturates at the 5000ms ceiling.
        scheduler.consecutive_spawn_failures = 20;
        let start = tokio::time::Instant::now();
        scheduler.wait_event().await;
        let elapsed20 = start.elapsed();
        assert!(
            elapsed20.as_millis() >= 5000,
            "backoff must be capped at 5000ms with high deferrals, got {}ms",
            elapsed20.as_millis()
        );
    }
2270
    #[tokio::test]
    async fn test_wait_event_sleeps_deferral_backoff_when_running_empty() {
        // With nothing running there is no event to await; wait_event must
        // still pause for the deferral backoff instead of returning instantly
        // (which would spin the tick loop).
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let config = zeph_config::OrchestrationConfig {
            deferral_backoff_ms: 50,
            ..make_config()
        };
        let mut scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        assert!(scheduler.running.is_empty());

        let start = tokio::time::Instant::now();
        scheduler.wait_event().await;
        let elapsed = start.elapsed();

        assert!(
            elapsed.as_millis() >= 50,
            "wait_event must sleep at least deferral_backoff (50ms) when running is empty, but only slept {}ms",
            elapsed.as_millis()
        );
    }
2303
2304 #[test]
2305 fn test_current_deferral_backoff_exponential_growth() {
2306 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2309 let config = zeph_config::OrchestrationConfig {
2310 deferral_backoff_ms: 250,
2311 ..make_config()
2312 };
2313 let mut scheduler = DagScheduler::new(
2314 graph,
2315 &config,
2316 Box::new(FirstRouter),
2317 vec![make_def("worker")],
2318 )
2319 .unwrap();
2320
2321 assert_eq!(
2322 scheduler.current_deferral_backoff(),
2323 Duration::from_millis(250)
2324 );
2325
2326 scheduler.consecutive_spawn_failures = 1;
2327 assert_eq!(
2328 scheduler.current_deferral_backoff(),
2329 Duration::from_millis(500)
2330 );
2331
2332 scheduler.consecutive_spawn_failures = 2;
2333 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(1));
2334
2335 scheduler.consecutive_spawn_failures = 3;
2336 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(2));
2337
2338 scheduler.consecutive_spawn_failures = 4;
2339 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(4));
2340
2341 scheduler.consecutive_spawn_failures = 5;
2343 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2344
2345 scheduler.consecutive_spawn_failures = 100;
2346 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2347 }
2348
2349 #[test]
2350 fn test_record_spawn_resets_consecutive_failures() {
2351 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2353 let mut scheduler = DagScheduler::new(
2354 graph,
2355 &make_config(),
2356 Box::new(FirstRouter),
2357 vec![make_def("worker")],
2358 )
2359 .unwrap();
2360
2361 scheduler.consecutive_spawn_failures = 3;
2362 let task_id = TaskId(0);
2363 scheduler.graph.tasks[0].status = TaskStatus::Running;
2364 scheduler.record_spawn(task_id, "handle-1".into(), "worker".into());
2365
2366 assert_eq!(scheduler.consecutive_spawn_failures, 0);
2367 }
2368
2369 #[test]
2370 fn test_record_spawn_failure_reverts_to_ready_no_counter_change() {
2371 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2374 let mut scheduler = DagScheduler::new(
2375 graph,
2376 &make_config(),
2377 Box::new(FirstRouter),
2378 vec![make_def("worker")],
2379 )
2380 .unwrap();
2381
2382 assert_eq!(scheduler.consecutive_spawn_failures, 0);
2383 let task_id = TaskId(0);
2384 scheduler.graph.tasks[0].status = TaskStatus::Running;
2385
2386 let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2387 scheduler.record_spawn_failure(task_id, &error);
2388
2389 assert_eq!(scheduler.consecutive_spawn_failures, 0);
2391 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
2393 }
2394
2395 #[test]
2398 fn test_parallel_dispatch_all_ready() {
2399 let nodes: Vec<_> = (0..6).map(|i| make_node(i, &[])).collect();
2402 let graph = graph_from_nodes(nodes);
2403 let config = zeph_config::OrchestrationConfig {
2404 max_parallel: 2,
2405 ..make_config()
2406 };
2407 let mut scheduler = DagScheduler::new(
2408 graph,
2409 &config,
2410 Box::new(FirstRouter),
2411 vec![make_def("worker")],
2412 )
2413 .unwrap();
2414
2415 let actions = scheduler.tick();
2416 let spawn_count = actions
2417 .iter()
2418 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2419 .count();
2420 assert_eq!(
2421 spawn_count, 2,
2422 "only max_parallel=2 tasks dispatched per tick"
2423 );
2424
2425 let running_count = scheduler
2426 .graph
2427 .tasks
2428 .iter()
2429 .filter(|t| t.status == TaskStatus::Running)
2430 .count();
2431 assert_eq!(running_count, 2, "only 2 tasks marked Running");
2432 }
2433
2434 #[test]
2435 fn test_batch_backoff_partial_success() {
2436 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2438 let mut scheduler = make_scheduler(graph);
2439 scheduler.consecutive_spawn_failures = 3;
2440
2441 scheduler.record_batch_backoff(true, true);
2442 assert_eq!(
2443 scheduler.consecutive_spawn_failures, 0,
2444 "any success in batch must reset counter"
2445 );
2446 }
2447
2448 #[test]
2449 fn test_batch_backoff_all_failed() {
2450 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2452 let mut scheduler = make_scheduler(graph);
2453 scheduler.consecutive_spawn_failures = 2;
2454
2455 scheduler.record_batch_backoff(false, true);
2456 assert_eq!(
2457 scheduler.consecutive_spawn_failures, 3,
2458 "all-failure tick must increment counter"
2459 );
2460 }
2461
2462 #[test]
2463 fn test_batch_backoff_no_spawns() {
2464 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2466 let mut scheduler = make_scheduler(graph);
2467 scheduler.consecutive_spawn_failures = 5;
2468
2469 scheduler.record_batch_backoff(false, false);
2470 assert_eq!(
2471 scheduler.consecutive_spawn_failures, 5,
2472 "no spawns must not change counter"
2473 );
2474 }
2475
2476 #[test]
2477 fn test_buffer_guard_uses_task_count() {
2478 let nodes: Vec<_> = (0..10).map(|i| make_node(i, &[])).collect();
2488 let graph = graph_from_nodes(nodes);
2489 let config = zeph_config::OrchestrationConfig {
2490 max_parallel: 2, ..make_config()
2492 };
2493 let scheduler = DagScheduler::new(
2494 graph,
2495 &config,
2496 Box::new(FirstRouter),
2497 vec![make_def("worker")],
2498 )
2499 .unwrap();
2500 assert_eq!(scheduler.graph.tasks.len() * 2, 20);
2502 assert_eq!(scheduler.max_parallel * 2, 4);
2503 }
2504
    #[test]
    fn test_batch_mixed_concurrency_and_fatal_failure() {
        // One batch where task 0 hits a transient concurrency deferral and
        // task 1 (Skip strategy) hits a fatal spawn error.
        let mut nodes = vec![make_node(0, &[]), make_node(1, &[])];
        nodes[1].failure_strategy = Some(FailureStrategy::Skip);
        let graph = graph_from_nodes(nodes);
        let mut scheduler = make_scheduler(graph);

        // Both tasks were marked Running for dispatch.
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.graph.tasks[1].status = TaskStatus::Running;

        // Task 0: transient deferral — revert to Ready, no actions.
        let concurrency_err = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
        let actions0 = scheduler.record_spawn_failure(TaskId(0), &concurrency_err);
        assert!(
            actions0.is_empty(),
            "ConcurrencyLimit must produce no extra actions"
        );
        assert_eq!(
            scheduler.graph.tasks[0].status,
            TaskStatus::Ready,
            "task 0 must revert to Ready"
        );

        // Task 1: fatal error, but its Skip strategy downgrades Failed to
        // Skipped rather than aborting the graph.
        let fatal_err = SubAgentError::Spawn("provider unavailable".to_string());
        let actions1 = scheduler.record_spawn_failure(TaskId(1), &fatal_err);
        assert_eq!(
            scheduler.graph.tasks[1].status,
            TaskStatus::Skipped,
            "task 1: Skip strategy turns Failed into Skipped via propagate_failure"
        );
        assert!(
            actions1
                .iter()
                .all(|a| !matches!(a, SchedulerAction::Done { .. })),
            "no Done action expected: task 0 is still Ready"
        );

        // Batch summary: no successes, spawns attempted — backoff counter +1.
        scheduler.consecutive_spawn_failures = 0;
        scheduler.record_batch_backoff(false, true);
        assert_eq!(
            scheduler.consecutive_spawn_failures, 1,
            "batch with only ConcurrencyLimit must increment counter"
        );
    }
2562
2563 #[test]
2567 fn test_deadlock_marks_non_terminal_tasks_canceled() {
2568 let mut nodes = vec![make_node(0, &[]), make_node(1, &[0]), make_node(2, &[0])];
2573 nodes[0].status = TaskStatus::Failed;
2574 nodes[1].status = TaskStatus::Pending;
2575 nodes[2].status = TaskStatus::Pending;
2576
2577 let mut graph = graph_from_nodes(nodes);
2578 graph.status = GraphStatus::Failed;
2579
2580 let mut scheduler = DagScheduler::resume_from(
2581 graph,
2582 &make_config(),
2583 Box::new(FirstRouter),
2584 vec![make_def("worker")],
2585 )
2586 .unwrap();
2587
2588 let actions = scheduler.tick();
2590
2591 assert!(
2593 actions.iter().any(|a| matches!(
2594 a,
2595 SchedulerAction::Done {
2596 status: GraphStatus::Failed
2597 }
2598 )),
2599 "deadlock must emit Done(Failed); got: {actions:?}"
2600 );
2601 assert_eq!(scheduler.graph.status, GraphStatus::Failed);
2602
2603 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
2605 assert_eq!(
2607 scheduler.graph.tasks[1].status,
2608 TaskStatus::Canceled,
2609 "Pending task must be Canceled on deadlock"
2610 );
2611 assert_eq!(
2613 scheduler.graph.tasks[2].status,
2614 TaskStatus::Canceled,
2615 "Pending task must be Canceled on deadlock"
2616 );
2617 }
2618
2619 #[test]
2622 fn test_deadlock_not_triggered_when_task_running() {
2623 let mut nodes = vec![make_node(0, &[]), make_node(1, &[0])];
2626 nodes[0].status = TaskStatus::Running;
2627 nodes[0].assigned_agent = Some("handle-1".into());
2628 nodes[1].status = TaskStatus::Pending;
2629
2630 let mut graph = graph_from_nodes(nodes);
2631 graph.status = GraphStatus::Failed;
2632
2633 let mut scheduler = DagScheduler::resume_from(
2634 graph,
2635 &make_config(),
2636 Box::new(FirstRouter),
2637 vec![make_def("worker")],
2638 )
2639 .unwrap();
2640
2641 let actions = scheduler.tick();
2642
2643 assert!(
2645 actions
2646 .iter()
2647 .all(|a| !matches!(a, SchedulerAction::Done { .. })),
2648 "no Done action expected when a task is running; got: {actions:?}"
2649 );
2650 assert_eq!(scheduler.graph.status, GraphStatus::Running);
2651 }
2652
2653 #[test]
2656 fn topology_linear_chain_limits_parallelism_to_one() {
2657 let graph = graph_from_nodes(vec![
2660 make_node(0, &[]),
2661 make_node(1, &[0]),
2662 make_node(2, &[1]),
2663 ]);
2664 let config = zeph_config::OrchestrationConfig {
2665 topology_selection: true,
2666 max_parallel: 4,
2667 ..make_config()
2668 };
2669 let mut scheduler = DagScheduler::new(
2670 graph,
2671 &config,
2672 Box::new(FirstRouter),
2673 vec![make_def("worker")],
2674 )
2675 .unwrap();
2676
2677 assert_eq!(
2678 scheduler.topology().topology,
2679 crate::topology::Topology::LinearChain
2680 );
2681 assert_eq!(scheduler.max_parallel, 1);
2682
2683 let actions = scheduler.tick();
2684 let spawn_count = actions
2685 .iter()
2686 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2687 .count();
2688 assert_eq!(spawn_count, 1, "linear chain: only 1 task dispatched");
2689 }
2690
2691 #[test]
2692 fn topology_all_parallel_dispatches_all_ready() {
2693 let graph = graph_from_nodes(vec![
2696 make_node(0, &[]),
2697 make_node(1, &[]),
2698 make_node(2, &[]),
2699 make_node(3, &[]),
2700 ]);
2701 let config = zeph_config::OrchestrationConfig {
2702 topology_selection: true,
2703 max_parallel: 4,
2704 ..make_config()
2705 };
2706 let mut scheduler = DagScheduler::new(
2707 graph,
2708 &config,
2709 Box::new(FirstRouter),
2710 vec![make_def("worker")],
2711 )
2712 .unwrap();
2713
2714 assert_eq!(
2715 scheduler.topology().topology,
2716 crate::topology::Topology::AllParallel
2717 );
2718
2719 let actions = scheduler.tick();
2720 let spawn_count = actions
2721 .iter()
2722 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2723 .count();
2724 assert_eq!(spawn_count, 4, "all-parallel: all 4 tasks dispatched");
2725 }
2726
2727 #[test]
2728 fn sequential_dispatch_one_at_a_time_parallel_unblocked() {
2729 use crate::graph::ExecutionMode;
2732
2733 let mut a = make_node(0, &[]);
2734 a.execution_mode = ExecutionMode::Sequential;
2735 let mut b = make_node(1, &[]);
2736 b.execution_mode = ExecutionMode::Sequential;
2737 let mut c = make_node(2, &[]);
2738 c.execution_mode = ExecutionMode::Parallel;
2739
2740 let graph = graph_from_nodes(vec![a, b, c]);
2741 let config = zeph_config::OrchestrationConfig {
2742 max_parallel: 4,
2743 ..make_config()
2744 };
2745 let mut scheduler = DagScheduler::new(
2746 graph,
2747 &config,
2748 Box::new(FirstRouter),
2749 vec![make_def("worker")],
2750 )
2751 .unwrap();
2752
2753 let actions = scheduler.tick();
2754 let spawned: Vec<TaskId> = actions
2755 .iter()
2756 .filter_map(|a| {
2757 if let SchedulerAction::Spawn { task_id, .. } = a {
2758 Some(*task_id)
2759 } else {
2760 None
2761 }
2762 })
2763 .collect();
2764
2765 assert!(
2767 spawned.contains(&TaskId(0)),
2768 "A(sequential) must be dispatched"
2769 );
2770 assert!(
2771 spawned.contains(&TaskId(2)),
2772 "C(parallel) must be dispatched"
2773 );
2774 assert!(!spawned.contains(&TaskId(1)), "B(sequential) must be held");
2775 assert_eq!(spawned.len(), 2);
2776 }
2777
2778 #[test]
2781 fn test_inject_tasks_per_task_cap_skips_second() {
2782 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2784 let mut scheduler = make_scheduler(graph);
2785
2786 let first = make_node(2, &[]);
2787 scheduler.inject_tasks(TaskId(0), vec![first], 20).unwrap();
2788 assert_eq!(
2789 scheduler.graph.tasks.len(),
2790 3,
2791 "first inject must append the task"
2792 );
2793 assert_eq!(scheduler.global_replan_count, 1);
2794
2795 let second = make_node(3, &[]);
2797 scheduler.inject_tasks(TaskId(0), vec![second], 20).unwrap();
2798 assert_eq!(
2799 scheduler.graph.tasks.len(),
2800 3,
2801 "second inject must be silently skipped (per-task cap)"
2802 );
2803 assert_eq!(
2804 scheduler.global_replan_count, 1,
2805 "global counter must not increment on skipped inject"
2806 );
2807 }
2808
2809 #[test]
2810 fn test_inject_tasks_global_cap_skips_when_exhausted() {
2811 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
2813 let mut config = make_config();
2814 config.max_replans = 1;
2815 let defs = vec![make_def("worker")];
2816 let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
2817
2818 let new1 = make_node(2, &[]);
2819 scheduler.inject_tasks(TaskId(0), vec![new1], 20).unwrap();
2820 assert_eq!(scheduler.global_replan_count, 1);
2821
2822 let new2 = make_node(3, &[]);
2824 scheduler.inject_tasks(TaskId(1), vec![new2], 20).unwrap();
2825 assert_eq!(
2826 scheduler.graph.tasks.len(),
2827 3,
2828 "global cap must prevent the second inject"
2829 );
2830 assert_eq!(
2831 scheduler.global_replan_count, 1,
2832 "global counter must not increment past cap"
2833 );
2834 }
2835
2836 #[test]
2837 fn test_inject_tasks_sets_topology_dirty() {
2838 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2840 let mut scheduler = make_scheduler(graph);
2841 assert!(
2842 !scheduler.topology_dirty,
2843 "topology_dirty must be false initially"
2844 );
2845
2846 let new_task = make_node(1, &[]);
2847 scheduler
2848 .inject_tasks(TaskId(0), vec![new_task], 20)
2849 .unwrap();
2850 assert!(
2851 scheduler.topology_dirty,
2852 "inject_tasks must set topology_dirty=true"
2853 );
2854
2855 scheduler.tick();
2856 assert!(
2857 !scheduler.topology_dirty,
2858 "tick() must clear topology_dirty after re-analysis"
2859 );
2860 }
2861
2862 #[test]
2863 fn test_inject_tasks_rejects_cycle() {
2864 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2866 let mut scheduler = make_scheduler(graph);
2867
2868 let cyclic_task = make_node(1, &[1]);
2870 let result = scheduler.inject_tasks(TaskId(0), vec![cyclic_task], 20);
2871 assert!(result.is_err(), "cyclic injection must return an error");
2872 assert!(
2873 matches!(
2874 result.unwrap_err(),
2875 OrchestrationError::VerificationFailed(_)
2876 ),
2877 "must return VerificationFailed for cycle"
2878 );
2879 assert_eq!(scheduler.global_replan_count, 0);
2881 assert_eq!(
2882 scheduler.topology_dirty, false,
2883 "topology_dirty must not be set when inject fails"
2884 );
2885 }
2886
2887 fn make_hierarchical_config() -> zeph_config::OrchestrationConfig {
2890 zeph_config::OrchestrationConfig {
2891 topology_selection: true,
2892 max_parallel: 4,
2893 ..make_config()
2894 }
2895 }
2896
2897 fn make_hierarchical_graph() -> TaskGraph {
2899 graph_from_nodes(vec![
2900 make_node(0, &[]),
2901 make_node(1, &[0]),
2902 make_node(2, &[0]),
2903 make_node(3, &[1]),
2904 ])
2905 }
2906
    /// LevelBarrier dispatch: only the current level's tasks are spawned; once
    /// every level-0 task is terminal, `current_level` advances and the next
    /// level's tasks (B and C) are dispatched together.
    #[test]
    fn test_level_barrier_advances_on_terminal_level() {
        let graph = make_hierarchical_graph();
        let config = make_hierarchical_config();
        let defs = vec![make_def("worker")];
        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();

        assert_eq!(
            scheduler.topology().strategy,
            crate::topology::DispatchStrategy::LevelBarrier,
            "must use LevelBarrier strategy for Hierarchical graph"
        );
        assert_eq!(scheduler.current_level, 0);

        // Tick 1: only the root (level 0) may be dispatched.
        let actions = scheduler.tick();
        let spawned_ids: Vec<_> = actions
            .iter()
            .filter_map(|a| {
                if let SchedulerAction::Spawn { task_id, .. } = a {
                    Some(*task_id)
                } else {
                    None
                }
            })
            .collect();
        assert_eq!(
            spawned_ids,
            vec![TaskId(0)],
            "first tick must dispatch only A at level 0"
        );

        // Simulate A finishing: terminal status, running map drained, and the
        // level-1 children unblocked to Ready.
        scheduler.graph.tasks[0].status = TaskStatus::Completed;
        scheduler.running.clear();
        scheduler.graph.tasks[1].status = TaskStatus::Ready;
        scheduler.graph.tasks[2].status = TaskStatus::Ready;

        // Tick 2: barrier lifts, level advances, B and C both go out.
        let actions2 = scheduler.tick();
        assert_eq!(
            scheduler.current_level, 1,
            "current_level must advance to 1 after level-0 tasks terminate"
        );
        let spawned2: Vec<_> = actions2
            .iter()
            .filter_map(|a| {
                if let SchedulerAction::Spawn { task_id, .. } = a {
                    Some(*task_id)
                } else {
                    None
                }
            })
            .collect();
        assert!(
            spawned2.contains(&TaskId(1)),
            "B must be dispatched after level advance"
        );
        assert!(
            spawned2.contains(&TaskId(2)),
            "C must be dispatched after level advance"
        );
    }
2972
    /// When the hierarchy's root fails with a Skip strategy, the skip must
    /// propagate transitively to every descendant (B, C directly; D via B).
    #[test]
    fn test_level_barrier_failure_propagates_transitively() {
        let graph = make_hierarchical_graph();
        let config = make_hierarchical_config();
        let defs = vec![make_def("worker")];
        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();

        // Put A in flight with a Skip strategy, mirroring real dispatch state:
        // status Running plus an entry in the running map.
        scheduler.graph.tasks[0].failure_strategy = Some(crate::graph::FailureStrategy::Skip);
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        // Queue a failure event for A; tick() will drain the buffer.
        scheduler.buffered_events.push_back(TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "simulated failure".to_string(),
            },
        });

        scheduler.tick();

        // A's Skip strategy converts its failure, and all descendants follow.
        assert_eq!(
            scheduler.graph.tasks[0].status,
            TaskStatus::Skipped,
            "A must be Skipped (Skip strategy)"
        );
        assert_eq!(
            scheduler.graph.tasks[1].status,
            TaskStatus::Skipped,
            "B must be transitively Skipped"
        );
        assert_eq!(
            scheduler.graph.tasks[2].status,
            TaskStatus::Skipped,
            "C must be transitively Skipped"
        );
        assert_eq!(
            scheduler.graph.tasks[3].status,
            TaskStatus::Skipped,
            "D must be transitively Skipped"
        );
    }
3027
3028 #[test]
3029 fn test_level_barrier_current_level_reset_after_inject() {
3030 let graph = make_hierarchical_graph(); let config = make_hierarchical_config();
3034 let defs = vec![make_def("worker")];
3035 let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
3036
3037 scheduler.graph.tasks[0].status = TaskStatus::Completed; scheduler.graph.tasks[1].status = TaskStatus::Completed; scheduler.graph.tasks[2].status = TaskStatus::Completed; scheduler.current_level = 2;
3043
3044 let e = make_node(4, &[0]);
3047 scheduler.inject_tasks(TaskId(3), vec![e], 20).unwrap();
3048 assert!(scheduler.topology_dirty);
3049
3050 scheduler.tick();
3053 assert_eq!(
3054 scheduler.current_level, 1,
3055 "current_level must reset to min non-terminal depth (1) after inject at depth 1"
3056 );
3057 }
3058
3059 #[test]
3060 fn resume_from_preserves_topology_classification() {
3061 let mut graph = graph_from_nodes(vec![
3063 make_node(0, &[]),
3064 make_node(1, &[0]),
3065 make_node(2, &[1]),
3066 ]);
3067 graph.status = GraphStatus::Paused;
3069 graph.tasks[0].status = TaskStatus::Completed;
3070 graph.tasks[1].status = TaskStatus::Pending;
3071 graph.tasks[2].status = TaskStatus::Pending;
3072
3073 let config = zeph_config::OrchestrationConfig {
3074 topology_selection: true,
3075 max_parallel: 4,
3076 ..make_config()
3077 };
3078 let scheduler = DagScheduler::resume_from(
3079 graph,
3080 &config,
3081 Box::new(FirstRouter),
3082 vec![make_def("worker")],
3083 )
3084 .unwrap();
3085
3086 assert_eq!(
3087 scheduler.topology().topology,
3088 crate::topology::Topology::LinearChain,
3089 "resume_from must classify topology"
3090 );
3091 assert_eq!(
3092 scheduler.max_parallel, 1,
3093 "resume_from must apply topology limit"
3094 );
3095 }
3096
3097 fn make_verify_config(provider: &str) -> zeph_config::OrchestrationConfig {
3100 zeph_config::OrchestrationConfig {
3101 verify_completeness: true,
3102 verify_provider: provider.to_string(),
3103 ..make_config()
3104 }
3105 }
3106
3107 #[test]
3108 fn validate_verify_config_unknown_provider_returns_err() {
3109 let graph = graph_from_nodes(vec![make_node(0, &[])]);
3110 let config = make_verify_config("nonexistent");
3111 let scheduler = DagScheduler::new(
3112 graph,
3113 &config,
3114 Box::new(FirstRouter),
3115 vec![make_def("worker")],
3116 )
3117 .unwrap();
3118 let result = scheduler.validate_verify_config(&["fast", "quality"]);
3119 assert!(result.is_err());
3120 let err_msg = result.unwrap_err().to_string();
3121 assert!(err_msg.contains("nonexistent"));
3122 assert!(err_msg.contains("fast"));
3123 }
3124
3125 #[test]
3126 fn validate_verify_config_known_provider_returns_ok() {
3127 let graph = graph_from_nodes(vec![make_node(0, &[])]);
3128 let config = make_verify_config("fast");
3129 let scheduler = DagScheduler::new(
3130 graph,
3131 &config,
3132 Box::new(FirstRouter),
3133 vec![make_def("worker")],
3134 )
3135 .unwrap();
3136 assert!(
3137 scheduler
3138 .validate_verify_config(&["fast", "quality"])
3139 .is_ok()
3140 );
3141 }
3142
3143 #[test]
3144 fn validate_verify_config_empty_provider_always_ok() {
3145 let graph = graph_from_nodes(vec![make_node(0, &[])]);
3146 let config = make_verify_config("");
3147 let scheduler = DagScheduler::new(
3148 graph,
3149 &config,
3150 Box::new(FirstRouter),
3151 vec![make_def("worker")],
3152 )
3153 .unwrap();
3154 assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
3155 }
3156
3157 #[test]
3158 fn validate_verify_config_disabled_skips_validation() {
3159 let graph = graph_from_nodes(vec![make_node(0, &[])]);
3160 let scheduler = make_scheduler(graph);
3162 assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
3163 }
3164
3165 #[test]
3166 fn validate_verify_config_empty_pool_skips_validation() {
3167 let graph = graph_from_nodes(vec![make_node(0, &[])]);
3168 let config = make_verify_config("nonexistent");
3169 let scheduler = DagScheduler::new(
3170 graph,
3171 &config,
3172 Box::new(FirstRouter),
3173 vec![make_def("worker")],
3174 )
3175 .unwrap();
3176 assert!(scheduler.validate_verify_config(&[]).is_ok());
3178 }
3179
3180 #[test]
3181 fn validate_verify_config_trims_whitespace_in_config() {
3182 let graph = graph_from_nodes(vec![make_node(0, &[])]);
3183 let config = make_verify_config(" fast ");
3185 let scheduler = DagScheduler::new(
3186 graph,
3187 &config,
3188 Box::new(FirstRouter),
3189 vec![make_def("worker")],
3190 )
3191 .unwrap();
3192 assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
3193 }
3194
3195 #[test]
3198 fn config_max_parallel_initialized_from_config() {
3199 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
3202 let config = zeph_config::OrchestrationConfig {
3203 topology_selection: true,
3204 max_parallel: 6,
3205 ..make_config()
3206 };
3207 let scheduler = DagScheduler::new(
3208 graph,
3209 &config,
3210 Box::new(FirstRouter),
3211 vec![make_def("worker")],
3212 )
3213 .unwrap();
3214
3215 assert_eq!(
3216 scheduler.config_max_parallel, 6,
3217 "config_max_parallel must equal config.max_parallel"
3218 );
3219 assert_eq!(
3221 scheduler.max_parallel, 1,
3222 "max_parallel reduced by topology analysis"
3223 );
3224 assert_eq!(
3225 scheduler.config_max_parallel, 6,
3226 "config_max_parallel must not be reduced by topology"
3227 );
3228 }
3229
3230 #[test]
3231 fn max_parallel_does_not_drift_across_inject_tick_cycles() {
3232 let graph = graph_from_nodes(vec![
3244 make_node(0, &[]),
3245 make_node(1, &[0]),
3246 make_node(2, &[0]),
3247 make_node(3, &[1, 2]), ]);
3249 let config = zeph_config::OrchestrationConfig {
3250 topology_selection: true,
3251 max_parallel: 4,
3252 max_tasks: 50,
3253 ..make_config()
3254 };
3255 let mut scheduler = DagScheduler::new(
3256 graph,
3257 &config,
3258 Box::new(FirstRouter),
3259 vec![make_def("worker")],
3260 )
3261 .unwrap();
3262
3263 assert_eq!(
3265 scheduler.topology().topology,
3266 crate::topology::Topology::Mixed,
3267 "initial topology must be Mixed"
3268 );
3269 let expected_max_parallel = (4usize / 2 + 1).min(4).max(1); assert_eq!(scheduler.max_parallel, expected_max_parallel);
3271
3272 let extra_task_id = 4u32;
3275 let extra_task = {
3276 let mut n = crate::graph::TaskNode::new(
3277 extra_task_id,
3278 "extra".to_string(),
3279 "extra task injected by replan",
3280 );
3281 n.depends_on = vec![TaskId(3)];
3282 n
3283 };
3284
3285 scheduler.graph.tasks[3].status = TaskStatus::Completed;
3287
3288 scheduler
3289 .inject_tasks(TaskId(3), vec![extra_task], 50)
3290 .expect("inject must succeed");
3291 assert!(
3292 scheduler.topology_dirty,
3293 "topology_dirty must be true after inject"
3294 );
3295
3296 let _ = scheduler.tick();
3298 let max_after_first_inject = scheduler.max_parallel;
3299 assert_eq!(
3300 max_after_first_inject, expected_max_parallel,
3301 "max_parallel must not drift after first inject+tick"
3302 );
3303
3304 let extra_task2 = {
3306 let mut n = crate::graph::TaskNode::new(5u32, "extra2".to_string(), "second replan");
3307 n.depends_on = vec![TaskId(extra_task_id)];
3308 n
3309 };
3310 scheduler.graph.tasks[extra_task_id as usize].status = TaskStatus::Completed;
3311 scheduler
3314 .inject_tasks(TaskId(extra_task_id), vec![extra_task2], 50)
3315 .expect("second inject must succeed");
3316
3317 let _ = scheduler.tick();
3318 let max_after_second_inject = scheduler.max_parallel;
3319 assert_eq!(
3320 max_after_second_inject, expected_max_parallel,
3321 "max_parallel must not drift after second inject+tick (was: {max_after_second_inject}, expected: {expected_max_parallel})"
3322 );
3323 }
3324}