1use std::collections::{HashMap, VecDeque};
7use std::fmt::Write as _;
8use std::path::PathBuf;
9use std::time::{Duration, Instant};
10
11use tokio::sync::mpsc;
12
13use super::dag;
14use super::error::OrchestrationError;
15use super::graph::{GraphStatus, TaskGraph, TaskId, TaskNode, TaskResult, TaskStatus};
16use super::router::AgentRouter;
17use crate::config::OrchestrationConfig;
18use crate::sanitizer::{
19 ContentIsolationConfig, ContentSanitizer, ContentSource, ContentSourceKind,
20};
21use crate::subagent::SubAgentDef;
22use crate::subagent::error::SubAgentError;
23
/// An instruction emitted by [`DagScheduler::tick`] for the caller to execute.
///
/// The scheduler never spawns or cancels agents itself; it only returns these
/// actions and relies on the driving loop to carry them out.
#[derive(Debug)]
pub enum SchedulerAction {
    /// Spawn sub-agent `agent_def_name` to run `task_id` with `prompt`.
    Spawn {
        task_id: TaskId,
        agent_def_name: String,
        prompt: String,
    },
    /// Cancel the running agent identified by `agent_handle_id`.
    Cancel { agent_handle_id: String },
    /// Run `task_id` inline on the main agent (no sub-agent was routable).
    RunInline { task_id: TaskId, prompt: String },
    /// The graph reached a terminal status; no further ticks are required.
    Done { status: GraphStatus },
}
43
/// A completion/failure notification sent back to the scheduler by a task runner.
#[derive(Debug)]
pub struct TaskEvent {
    /// The task the event is about.
    pub task_id: TaskId,
    /// Handle of the agent incarnation that produced the event; events whose
    /// handle does not match the currently-recorded incarnation are discarded.
    pub agent_handle_id: String,
    /// What happened to the task.
    pub outcome: TaskOutcome,
}
51
/// The terminal result carried by a [`TaskEvent`].
#[derive(Debug)]
pub enum TaskOutcome {
    /// The task finished successfully.
    Completed {
        output: String,
        artifacts: Vec<PathBuf>,
    },
    /// The task failed; `error` is a human-readable description.
    Failed { error: String },
}
63
/// Book-keeping for a task currently executing on an agent.
struct RunningTask {
    /// Handle used to cancel the agent and to detect stale events.
    agent_handle_id: String,
    /// Name of the sub-agent definition the task was routed to.
    agent_def_name: String,
    /// When the task was spawned (or observed at resume); drives timeouts.
    started_at: Instant,
}
70
/// Drives a [`TaskGraph`] to completion by emitting [`SchedulerAction`]s and
/// consuming [`TaskEvent`]s.
pub struct DagScheduler {
    /// Graph under execution; task statuses are mutated in place.
    graph: TaskGraph,
    /// Configured parallelism bound. `tick` dispatches every ready task; the
    /// spawner is expected to enforce the limit and report
    /// `SubAgentError::ConcurrencyLimit`, which defers the task.
    max_parallel: usize,
    /// Tasks currently in flight, keyed by task id.
    running: HashMap<TaskId, RunningTask>,
    /// Receiving end for task completion/failure events.
    event_rx: mpsc::Receiver<TaskEvent>,
    /// Prototype sender handed out via [`Self::event_sender`].
    event_tx: mpsc::Sender<TaskEvent>,
    /// Per-task wall-clock timeout after which a running task is failed.
    task_timeout: Duration,
    /// Strategy for picking a sub-agent definition for each task.
    router: Box<dyn AgentRouter>,
    /// Sub-agent definitions the router may choose from.
    available_agents: Vec<SubAgentDef>,
    /// Character budget shared by all dependency outputs in a task prompt.
    dependency_context_budget: usize,
    /// Events received in `wait_event`, drained at the next `tick`.
    buffered_events: VecDeque<TaskEvent>,
    /// Sanitizer applied to dependency outputs before prompt embedding.
    sanitizer: ContentSanitizer,
    /// Base sleep used when nothing is running (exponential backoff base).
    deferral_backoff: Duration,
    /// Consecutive spawn batches that failed only on concurrency limits.
    consecutive_spawn_failures: u32,
}
122
// Manual `Debug` impl: some fields (e.g. the boxed router — presumably not
// `Debug`; confirm against the trait) prevent derive, so expose only a
// summary of the scheduler state.
impl std::fmt::Debug for DagScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DagScheduler")
            .field("graph_id", &self.graph.id)
            .field("graph_status", &self.graph.status)
            .field("running_count", &self.running.len())
            .field("max_parallel", &self.max_parallel)
            .field("task_timeout_secs", &self.task_timeout.as_secs())
            .finish_non_exhaustive()
    }
}
134
135impl DagScheduler {
    /// Builds a scheduler for a freshly created graph.
    ///
    /// Validates the DAG, transitions the graph to `Running`, and marks all
    /// dependency-free tasks `Ready` so the first `tick` can dispatch them.
    ///
    /// # Errors
    ///
    /// Returns [`OrchestrationError::InvalidGraph`] if the graph is not in
    /// `Created` status, or a validation error from [`dag::validate`].
    pub fn new(
        mut graph: TaskGraph,
        config: &OrchestrationConfig,
        router: Box<dyn AgentRouter>,
        available_agents: Vec<SubAgentDef>,
    ) -> Result<Self, OrchestrationError> {
        if graph.status != GraphStatus::Created {
            return Err(OrchestrationError::InvalidGraph(format!(
                "graph must be in Created status, got {}",
                graph.status
            )));
        }

        dag::validate(&graph.tasks, config.max_tasks as usize)?;

        graph.status = GraphStatus::Running;

        // Roots (no dependencies) are immediately runnable.
        for task in &mut graph.tasks {
            if task.depends_on.is_empty() && task.status == TaskStatus::Pending {
                task.status = TaskStatus::Ready;
            }
        }

        let (event_tx, event_rx) = mpsc::channel(64);

        // A zero/unset configured timeout falls back to 10 minutes.
        let task_timeout = if config.task_timeout_secs > 0 {
            Duration::from_secs(config.task_timeout_secs)
        } else {
            Duration::from_secs(600)
        };

        Ok(Self {
            graph,
            max_parallel: config.max_parallel as usize,
            running: HashMap::new(),
            event_rx,
            event_tx,
            task_timeout,
            router,
            available_agents,
            dependency_context_budget: config.dependency_context_budget,
            buffered_events: VecDeque::new(),
            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
            consecutive_spawn_failures: 0,
        })
    }
192
    /// Rebuilds a scheduler from a persisted graph (Paused, Failed, or
    /// Running), e.g. after a process restart.
    ///
    /// Tasks persisted as `Running` with an assigned agent are re-entered
    /// into the running map; their timeout clocks restart from now.
    ///
    /// # Errors
    ///
    /// Returns [`OrchestrationError::InvalidGraph`] for graphs already in the
    /// terminal `Completed` or `Canceled` status.
    pub fn resume_from(
        mut graph: TaskGraph,
        config: &OrchestrationConfig,
        router: Box<dyn AgentRouter>,
        available_agents: Vec<SubAgentDef>,
    ) -> Result<Self, OrchestrationError> {
        if graph.status == GraphStatus::Completed || graph.status == GraphStatus::Canceled {
            return Err(OrchestrationError::InvalidGraph(format!(
                "cannot resume a {} graph; only Paused, Failed, or Running graphs are resumable",
                graph.status
            )));
        }

        graph.status = GraphStatus::Running;

        // Reconstruct the running map from tasks persisted as Running. Tasks
        // without an assigned agent are silently left out; normal scheduling
        // and timeout paths will pick them up.
        let running: HashMap<TaskId, RunningTask> = graph
            .tasks
            .iter()
            .filter(|t| t.status == TaskStatus::Running)
            .filter_map(|t| {
                let handle_id = t.assigned_agent.clone()?;
                let def_name = t.agent_hint.clone().unwrap_or_default();
                Some((
                    t.id,
                    RunningTask {
                        agent_handle_id: handle_id,
                        agent_def_name: def_name,
                        // Timeout clock restarts at resume time.
                        started_at: Instant::now(),
                    },
                ))
            })
            .collect();

        let (event_tx, event_rx) = mpsc::channel(64);

        // A zero/unset configured timeout falls back to 10 minutes (same as
        // `new`).
        let task_timeout = if config.task_timeout_secs > 0 {
            Duration::from_secs(config.task_timeout_secs)
        } else {
            Duration::from_secs(600)
        };

        Ok(Self {
            graph,
            max_parallel: config.max_parallel as usize,
            running,
            event_rx,
            event_tx,
            task_timeout,
            router,
            available_agents,
            dependency_context_budget: config.dependency_context_budget,
            buffered_events: VecDeque::new(),
            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
            deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
            consecutive_spawn_failures: 0,
        })
    }
270
    /// Returns a clone of the sender that task runners use to report
    /// [`TaskEvent`]s back to this scheduler.
    #[must_use]
    pub fn event_sender(&self) -> mpsc::Sender<TaskEvent> {
        self.event_tx.clone()
    }
276
    /// Read-only view of the graph being executed.
    #[must_use]
    pub fn graph(&self) -> &TaskGraph {
        &self.graph
    }
282
    /// Returns an owned snapshot of the graph.
    ///
    /// NOTE(review): despite the `into_` name this clones rather than
    /// consumes — likely because `DagScheduler` implements `Drop`, which
    /// forbids moving `graph` out of `self`. Consider renaming (e.g.
    /// `graph_snapshot`) to match Rust naming conventions.
    #[must_use]
    pub fn into_graph(&self) -> TaskGraph {
        self.graph.clone()
    }
290}
291
// Dropping the scheduler does not cancel in-flight agents — it only abandons
// the bookkeeping — so warn loudly if work is still running.
impl Drop for DagScheduler {
    fn drop(&mut self) {
        if !self.running.is_empty() {
            tracing::warn!(
                running_tasks = self.running.len(),
                "DagScheduler dropped with running tasks; agents may continue until their \
                 CancellationToken fires or they complete naturally"
            );
        }
    }
}
303
304impl DagScheduler {
    /// Advances the schedule one step and returns the actions the caller
    /// must perform.
    ///
    /// Order of operations: drain buffered and queued events, check running
    /// tasks for timeouts, dispatch every ready task, then detect graph
    /// termination (completion or deadlock).
    pub fn tick(&mut self) -> Vec<SchedulerAction> {
        // A non-running graph has nothing to schedule; report its status.
        if self.graph.status != GraphStatus::Running {
            return vec![SchedulerAction::Done {
                status: self.graph.status,
            }];
        }

        let mut actions = Vec::new();

        // Events buffered by `wait_event` first, then anything still queued
        // on the channel; each may yield Cancel/Done actions.
        while let Some(event) = self.buffered_events.pop_front() {
            let cancel_actions = self.process_event(event);
            actions.extend(cancel_actions);
        }
        while let Ok(event) = self.event_rx.try_recv() {
            let cancel_actions = self.process_event(event);
            actions.extend(cancel_actions);
        }

        // An event may have transitioned the graph to a terminal status.
        if self.graph.status != GraphStatus::Running {
            return actions;
        }

        let timeout_actions = self.check_timeouts();
        actions.extend(timeout_actions);

        // A timeout may also have terminated the graph.
        if self.graph.status != GraphStatus::Running {
            return actions;
        }

        // Dispatch every ready task. `max_parallel` is deliberately not
        // enforced here: the spawner reports ConcurrencyLimit and the task
        // gets deferred via `record_spawn_failure`.
        let ready = dag::ready_tasks(&self.graph);

        for task_id in ready {
            let task = &self.graph.tasks[task_id.index()];

            // No routable sub-agent: fall back to running inline on the
            // main agent.
            let Some(agent_def_name) = self.router.route(task, &self.available_agents) else {
                tracing::debug!(
                    task_id = %task_id,
                    title = %task.title,
                    "no agent available, routing task to main agent inline"
                );
                let prompt = self.build_task_prompt(task);
                self.graph.tasks[task_id.index()].status = TaskStatus::Running;
                actions.push(SchedulerAction::RunInline { task_id, prompt });
                continue;
            };

            let prompt = self.build_task_prompt(task);

            self.graph.tasks[task_id.index()].status = TaskStatus::Running;

            actions.push(SchedulerAction::Spawn {
                task_id,
                agent_def_name,
                prompt,
            });
        }

        // Termination detection: with nothing running (in the graph or in
        // the running map), either every task is terminal (Completed) or no
        // task is ready either and the graph is deadlocked (Failed).
        let running_in_graph_now = self
            .graph
            .tasks
            .iter()
            .filter(|t| t.status == TaskStatus::Running)
            .count();
        if running_in_graph_now == 0 && self.running.is_empty() {
            let all_terminal = self.graph.tasks.iter().all(|t| t.status.is_terminal());
            if all_terminal {
                self.graph.status = GraphStatus::Completed;
                self.graph.finished_at = Some(super::graph::chrono_now());
                actions.push(SchedulerAction::Done {
                    status: GraphStatus::Completed,
                });
            } else if dag::ready_tasks(&self.graph).is_empty() {
                tracing::error!(
                    "scheduler deadlock: no running or ready tasks, but graph not complete"
                );
                self.graph.status = GraphStatus::Failed;
                self.graph.finished_at = Some(super::graph::chrono_now());
                actions.push(SchedulerAction::Done {
                    status: GraphStatus::Failed,
                });
            }
        }

        actions
    }
410
411 fn current_deferral_backoff(&self) -> Duration {
420 const MAX_BACKOFF: Duration = Duration::from_secs(5);
421 let multiplier = 1u32
422 .checked_shl(self.consecutive_spawn_failures.min(10))
423 .unwrap_or(u32::MAX);
424 self.deferral_backoff
425 .saturating_mul(multiplier)
426 .min(MAX_BACKOFF)
427 }
428
    /// Sleeps until there is plausibly something for the next `tick` to do.
    ///
    /// With no running tasks this simply backs off (exponentially, via
    /// [`Self::current_deferral_backoff`]). Otherwise it waits for either
    /// the next task event or the nearest task timeout, whichever fires
    /// first.
    pub async fn wait_event(&mut self) {
        if self.running.is_empty() {
            tokio::time::sleep(self.current_deferral_backoff()).await;
            return;
        }

        // Time remaining until the earliest-started running task would time
        // out; saturates at zero for already-expired tasks.
        let nearest_timeout = self
            .running
            .values()
            .map(|r| {
                self.task_timeout
                    .checked_sub(r.started_at.elapsed())
                    .unwrap_or(Duration::ZERO)
            })
            .min()
            .unwrap_or(Duration::from_secs(1));

        // Floor of 100ms so an already-expired timeout cannot busy-loop.
        let wait_duration = nearest_timeout.max(Duration::from_millis(100));

        tokio::select! {
            Some(event) = self.event_rx.recv() => {
                // Bound the buffer at 2x the task count; beyond that, drop
                // the oldest event (the affected task will eventually be
                // failed by timeout).
                if self.buffered_events.len() >= self.graph.tasks.len() * 2 {
                    if let Some(dropped) = self.buffered_events.pop_front() {
                        tracing::error!(
                            task_id = %dropped.task_id,
                            buffer_len = self.buffered_events.len(),
                            "event buffer saturated; completion event dropped — task may \
                             remain Running until timeout"
                        );
                    }
                }
                self.buffered_events.push_back(event);
            }
            () = tokio::time::sleep(wait_duration) => {}
        }
    }
472
473 pub fn record_spawn(
483 &mut self,
484 task_id: TaskId,
485 agent_handle_id: String,
486 agent_def_name: String,
487 ) {
488 self.consecutive_spawn_failures = 0;
489 self.graph.tasks[task_id.index()].assigned_agent = Some(agent_handle_id.clone());
490 self.running.insert(
491 task_id,
492 RunningTask {
493 agent_handle_id,
494 agent_def_name,
495 started_at: Instant::now(),
496 },
497 );
498 }
499
    /// Records a failed spawn attempt for `task_id`.
    ///
    /// A `ConcurrencyLimit` error is transient: the task reverts to `Ready`
    /// so the next tick can retry, and no actions are emitted. Any other
    /// error marks the task `Failed` and propagates the failure through the
    /// graph, cancelling downstream running tasks as needed.
    pub fn record_spawn_failure(
        &mut self,
        task_id: TaskId,
        error: &SubAgentError,
    ) -> Vec<SchedulerAction> {
        if let SubAgentError::ConcurrencyLimit { active, max } = error {
            tracing::warn!(
                task_id = %task_id,
                active,
                max,
                next_backoff_ms = self.current_deferral_backoff().as_millis(),
                "concurrency limit reached, deferring task to next tick"
            );
            self.graph.tasks[task_id.index()].status = TaskStatus::Ready;
            return Vec::new();
        }

        // Bound the logged error text to 512 chars.
        let error_excerpt: String = error.to_string().chars().take(512).collect();
        tracing::warn!(
            task_id = %task_id,
            error = %error_excerpt,
            "spawn failed, marking task failed"
        );
        self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
        // Failure propagation may request cancellation of running dependents.
        let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
        let mut actions = Vec::new();
        for cancel_task_id in cancel_ids {
            if let Some(running) = self.running.remove(&cancel_task_id) {
                actions.push(SchedulerAction::Cancel {
                    agent_handle_id: running.agent_handle_id,
                });
            }
        }
        // propagate_failure may have moved the graph to a terminal status
        // (e.g. abort strategy) — surface that as a Done action.
        if self.graph.status != GraphStatus::Running {
            self.graph.finished_at = Some(super::graph::chrono_now());
            actions.push(SchedulerAction::Done {
                status: self.graph.status,
            });
        }
        actions
    }
555
556 pub fn record_batch_backoff(&mut self, any_success: bool, any_concurrency_failure: bool) {
565 if any_success {
566 self.consecutive_spawn_failures = 0;
567 } else if any_concurrency_failure {
568 self.consecutive_spawn_failures = self.consecutive_spawn_failures.saturating_add(1);
569 }
570 }
571
572 pub fn cancel_all(&mut self) -> Vec<SchedulerAction> {
581 self.graph.status = GraphStatus::Canceled;
582 self.graph.finished_at = Some(super::graph::chrono_now());
583
584 let running: Vec<(TaskId, RunningTask)> = self.running.drain().collect();
586 let mut actions: Vec<SchedulerAction> = running
587 .into_iter()
588 .map(|(task_id, r)| {
589 self.graph.tasks[task_id.index()].status = TaskStatus::Canceled;
590 SchedulerAction::Cancel {
591 agent_handle_id: r.agent_handle_id,
592 }
593 })
594 .collect();
595
596 for task in &mut self.graph.tasks {
597 if !task.status.is_terminal() {
598 task.status = TaskStatus::Canceled;
599 }
600 }
601
602 actions.push(SchedulerAction::Done {
603 status: GraphStatus::Canceled,
604 });
605 actions
606 }
607}
608
609impl DagScheduler {
    /// Applies a single task event to the graph.
    ///
    /// Returns follow-up actions: cancels for downstream tasks invalidated
    /// by a failure and, when the graph reaches a terminal status, a `Done`.
    fn process_event(&mut self, event: TaskEvent) -> Vec<SchedulerAction> {
        let TaskEvent {
            task_id,
            agent_handle_id,
            outcome,
        } = event;

        // Only accept events from the agent incarnation we actually
        // recorded; anything else is stale (e.g. from before a retry) or
        // entirely unknown.
        match self.running.get(&task_id) {
            Some(running) if running.agent_handle_id != agent_handle_id => {
                tracing::warn!(
                    task_id = %task_id,
                    expected = %running.agent_handle_id,
                    got = %agent_handle_id,
                    "discarding stale event from previous agent incarnation"
                );
                return Vec::new();
            }
            None => {
                tracing::debug!(
                    task_id = %task_id,
                    agent_handle_id = %agent_handle_id,
                    "ignoring event for task not in running map"
                );
                return Vec::new();
            }
            Some(_) => {}
        }

        // Capture timing/routing metadata before removing the entry.
        let duration_ms = self.running.get(&task_id).map_or(0, |r| {
            u64::try_from(r.started_at.elapsed().as_millis()).unwrap_or(u64::MAX)
        });
        let agent_def_name = self.running.get(&task_id).map(|r| r.agent_def_name.clone());

        self.running.remove(&task_id);

        match outcome {
            TaskOutcome::Completed { output, artifacts } => {
                self.graph.tasks[task_id.index()].status = TaskStatus::Completed;
                self.graph.tasks[task_id.index()].result = Some(TaskResult {
                    output,
                    artifacts,
                    duration_ms,
                    agent_id: Some(agent_handle_id),
                    agent_def: agent_def_name,
                });

                // Promote tasks whose dependencies are now all satisfied.
                let newly_ready = dag::ready_tasks(&self.graph);
                for ready_id in newly_ready {
                    if self.graph.tasks[ready_id.index()].status == TaskStatus::Pending {
                        self.graph.tasks[ready_id.index()].status = TaskStatus::Ready;
                    }
                }

                Vec::new()
            }

            TaskOutcome::Failed { error } => {
                // Bound the logged error text to 512 chars.
                let error_excerpt: String = error.chars().take(512).collect();
                tracing::warn!(
                    task_id = %task_id,
                    error = %error_excerpt,
                    "task failed"
                );
                self.graph.tasks[task_id.index()].status = TaskStatus::Failed;

                // Apply the task's failure strategy and collect downstream
                // tasks that must be cancelled as a result.
                let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
                let mut actions = Vec::new();

                for cancel_task_id in cancel_ids {
                    if let Some(running) = self.running.remove(&cancel_task_id) {
                        actions.push(SchedulerAction::Cancel {
                            agent_handle_id: running.agent_handle_id,
                        });
                    }
                }

                // Propagation may have terminated the graph (abort strategy).
                if self.graph.status != GraphStatus::Running {
                    self.graph.finished_at = Some(super::graph::chrono_now());
                    actions.push(SchedulerAction::Done {
                        status: self.graph.status,
                    });
                }

                actions
            }
        }
    }
703
    /// Fails every running task whose wall-clock runtime exceeds
    /// `task_timeout`, cancelling its agent and propagating the failure.
    fn check_timeouts(&mut self) -> Vec<SchedulerAction> {
        // Snapshot first: `self.running` is mutated while handling each hit.
        let timed_out: Vec<(TaskId, String)> = self
            .running
            .iter()
            .filter(|(_, r)| r.started_at.elapsed() > self.task_timeout)
            .map(|(id, r)| (*id, r.agent_handle_id.clone()))
            .collect();

        let mut actions = Vec::new();
        for (task_id, agent_handle_id) in timed_out {
            tracing::warn!(
                task_id = %task_id,
                timeout_secs = self.task_timeout.as_secs(),
                "task timed out"
            );
            self.running.remove(&task_id);
            self.graph.tasks[task_id.index()].status = TaskStatus::Failed;

            actions.push(SchedulerAction::Cancel { agent_handle_id });

            // A timeout is a failure: apply the failure strategy and cancel
            // downstream running tasks it invalidates.
            let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
            for cancel_task_id in cancel_ids {
                if let Some(running) = self.running.remove(&cancel_task_id) {
                    actions.push(SchedulerAction::Cancel {
                        agent_handle_id: running.agent_handle_id,
                    });
                }
            }

            // Stop early once the graph itself reached a terminal status.
            if self.graph.status != GraphStatus::Running {
                self.graph.finished_at = Some(super::graph::chrono_now());
                actions.push(SchedulerAction::Done {
                    status: self.graph.status,
                });
                break;
            }
        }

        actions
    }
751
    /// Builds the prompt for `task`, prefixing its description with the
    /// sanitized outputs of completed dependencies (plus notes for skipped
    /// ones).
    ///
    /// Dependency outputs share `dependency_context_budget` characters,
    /// divided evenly; over-budget outputs are truncated.
    fn build_task_prompt(&self, task: &TaskNode) -> String {
        if task.depends_on.is_empty() {
            return task.description.clone();
        }

        let completed_deps: Vec<&TaskNode> = task
            .depends_on
            .iter()
            .filter_map(|dep_id| {
                let dep = &self.graph.tasks[dep_id.index()];
                if dep.status == TaskStatus::Completed {
                    Some(dep)
                } else {
                    None
                }
            })
            .collect();

        if completed_deps.is_empty() {
            return task.description.clone();
        }

        // Even split of the budget; the divisor is non-zero here (checked
        // just above), so the unwrap_or fallback is effectively unreachable.
        let budget_per_dep = self
            .dependency_context_budget
            .checked_div(completed_deps.len())
            .unwrap_or(self.dependency_context_budget);

        let mut context_block = String::from("<completed-dependencies>\n");

        for dep in &completed_deps {
            // Escape id/title so they cannot break the surrounding markup.
            let escaped_id = xml_escape(&dep.id.to_string());
            let escaped_title = xml_escape(&dep.title);
            let _ = writeln!(
                context_block,
                "## Task \"{escaped_id}\": \"{escaped_title}\" (completed)",
            );

            if let Some(ref result) = dep.result {
                // Sanitize dependency output before embedding it in a prompt
                // — it originated from another agent.
                let source = ContentSource::new(ContentSourceKind::A2aMessage);
                let sanitized = self.sanitizer.sanitize(&result.output, source);
                let safe_output = sanitized.body;

                // Budget is counted in chars, not bytes, so truncation stays
                // UTF-8 safe.
                let char_count = safe_output.chars().count();
                if char_count > budget_per_dep {
                    let truncated: String = safe_output.chars().take(budget_per_dep).collect();
                    let _ = write!(
                        context_block,
                        "{truncated}...\n[truncated: {char_count} chars total]"
                    );
                } else {
                    context_block.push_str(&safe_output);
                }
            } else {
                context_block.push_str("[no output recorded]\n");
            }
            context_block.push('\n');
        }

        // Mention skipped dependencies so the agent knows their output is
        // missing rather than accidentally omitted.
        for dep_id in &task.depends_on {
            let dep = &self.graph.tasks[dep_id.index()];
            if dep.status == TaskStatus::Skipped {
                let escaped_id = xml_escape(&dep.id.to_string());
                let escaped_title = xml_escape(&dep.title);
                let _ = writeln!(
                    context_block,
                    "## Task \"{escaped_id}\": \"{escaped_title}\" (skipped -- no output available)\n",
                );
            }
        }

        context_block.push_str("</completed-dependencies>\n\n");
        format!("{context_block}Your task: {}", task.description)
    }
835}
836
/// Escapes the five XML special characters so task ids/titles cannot break
/// out of (or inject markup into) the `<completed-dependencies>` block.
///
/// BUG FIX: the previous version pushed each special character back
/// unchanged (e.g. `'<' => push_str("<")`), making the function an identity
/// that escaped nothing. It now emits the standard XML entities.
fn xml_escape(s: &str) -> String {
    // `s.len()` is only a lower bound — escaped chars expand — but it avoids
    // most reallocations for typical inputs.
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '&' => out.push_str("&amp;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            other => out.push(other),
        }
    }
    out
}
852
853#[cfg(test)]
854mod tests {
855 #![allow(clippy::default_trait_access)]
856
857 use super::*;
858 use crate::orchestration::graph::{
859 FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus,
860 };
861
862 fn make_node(id: u32, deps: &[u32]) -> TaskNode {
863 let mut n = TaskNode::new(
864 id,
865 format!("task-{id}"),
866 format!("description for task {id}"),
867 );
868 n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
869 n
870 }
871
872 fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
873 let mut g = TaskGraph::new("test goal");
874 g.tasks = nodes;
875 g
876 }
877
    /// Builds a minimal sub-agent definition with the given name and
    /// default-everything policies.
    fn make_def(name: &str) -> SubAgentDef {
        use crate::subagent::def::{SkillFilter, SubAgentPermissions, ToolPolicy};
        SubAgentDef {
            name: name.to_string(),
            description: format!("{name} agent"),
            model: None,
            tools: ToolPolicy::InheritAll,
            disallowed_tools: vec![],
            permissions: SubAgentPermissions::default(),
            skills: SkillFilter::default(),
            system_prompt: String::new(),
            hooks: crate::subagent::SubagentHooks::default(),
            memory: None,
            source: None,
            file_path: None,
        }
    }
895
    /// Baseline orchestration config used by most tests (abort strategy,
    /// 300s timeout, parallelism of 4).
    fn make_config() -> crate::config::OrchestrationConfig {
        crate::config::OrchestrationConfig {
            enabled: true,
            max_tasks: 20,
            max_parallel: 4,
            default_failure_strategy: "abort".to_string(),
            default_max_retries: 3,
            task_timeout_secs: 300,
            planner_model: None,
            planner_max_tokens: 4096,
            dependency_context_budget: 16384,
            confirm_before_execute: true,
            aggregator_max_tokens: 4096,
            deferral_backoff_ms: 250,
        }
    }
912
    // Router that always picks the first available agent definition.
    struct FirstRouter;
    impl AgentRouter for FirstRouter {
        fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
            available.first().map(|d| d.name.clone())
        }
    }
919
    // Router that never routes, forcing the inline (main-agent) fallback.
    struct NoneRouter;
    impl AgentRouter for NoneRouter {
        fn route(&self, _task: &TaskNode, _available: &[SubAgentDef]) -> Option<String> {
            None
        }
    }
926
927 fn make_scheduler_with_router(graph: TaskGraph, router: Box<dyn AgentRouter>) -> DagScheduler {
928 let config = make_config();
929 let defs = vec![make_def("worker")];
930 DagScheduler::new(graph, &config, router, defs).unwrap()
931 }
932
933 fn make_scheduler(graph: TaskGraph) -> DagScheduler {
934 let config = make_config();
935 let defs = vec![make_def("worker")];
936 DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap()
937 }
938
    // `new` must reject graphs that are not in `Created` status.
    #[test]
    fn test_new_validates_graph_status() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
        graph.status = GraphStatus::Running;
        let config = make_config();
        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
    }
951
    // Dependency-free tasks become Ready at construction; dependents stay
    // Pending, and the graph transitions to Running.
    #[test]
    fn test_new_marks_roots_ready() {
        let graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[]),
            make_node(2, &[0, 1]),
        ]);
        let scheduler = make_scheduler(graph);
        assert_eq!(scheduler.graph().tasks[0].status, TaskStatus::Ready);
        assert_eq!(scheduler.graph().tasks[1].status, TaskStatus::Ready);
        assert_eq!(scheduler.graph().tasks[2].status, TaskStatus::Pending);
        assert_eq!(scheduler.graph().status, GraphStatus::Running);
    }
965
    // An empty task list fails construction (rejected by validation).
    #[test]
    fn test_new_validates_empty_graph() {
        let graph = graph_from_nodes(vec![]);
        let config = make_config();
        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
        assert!(result.is_err());
    }
973
    // Every Ready root task yields a Spawn action on the first tick.
    #[test]
    fn test_tick_produces_spawn_for_ready() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
        let mut scheduler = make_scheduler(graph);
        let actions = scheduler.tick();
        let spawns: Vec<_> = actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
            .collect();
        assert_eq!(spawns.len(), 2);
    }
987
    // `tick` intentionally dispatches every ready task even beyond
    // max_parallel; concurrency is enforced at spawn time via
    // `record_spawn_failure(ConcurrencyLimit)`.
    #[test]
    fn test_tick_dispatches_all_regardless_of_max_parallel() {
        let graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[]),
            make_node(2, &[]),
            make_node(3, &[]),
            make_node(4, &[]),
        ]);
        let mut config = make_config();
        config.max_parallel = 2;
        let defs = vec![make_def("worker")];
        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
        let actions = scheduler.tick();
        let spawn_count = actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
            .count();
        assert_eq!(spawn_count, 5, "all 5 ready tasks must be dispatched");
    }
1011
    // A graph whose only task is already terminal completes on the first
    // tick with Done(Completed).
    #[test]
    fn test_tick_detects_completion() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
        graph.tasks[0].status = TaskStatus::Completed;
        let config = make_config();
        let defs = vec![make_def("worker")];
        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
        let actions = scheduler.tick();
        let has_done = actions.iter().any(|a| {
            matches!(
                a,
                SchedulerAction::Done {
                    status: GraphStatus::Completed
                }
            )
        });
        assert!(
            has_done,
            "should emit Done(Completed) when all tasks are terminal"
        );
    }
1035
    // Completing task 0 (via a buffered event) unblocks dependent task 1.
    #[test]
    fn test_completion_event_marks_deps_ready() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        let mut scheduler = make_scheduler(graph);

        // Simulate task 0 already running on agent "handle-0".
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "handle-0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "handle-0".to_string(),
            outcome: TaskOutcome::Completed {
                output: "done".to_string(),
                artifacts: vec![],
            },
        };
        scheduler.buffered_events.push_back(event);

        let actions = scheduler.tick();
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Completed);
        let has_spawn_1 = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(1)));
        assert!(
            has_spawn_1 || scheduler.graph.tasks[1].status == TaskStatus::Ready,
            "task 1 should be spawned or marked Ready"
        );
    }
1075
    // With the default abort strategy, one failure fails the whole graph
    // and cancels the other still-running task.
    #[test]
    fn test_failure_abort_cancels_running() {
        let graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[]),
            make_node(2, &[0, 1]),
        ]);
        let mut scheduler = make_scheduler(graph);

        // Both roots are in flight.
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );
        scheduler.graph.tasks[1].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(1),
            RunningTask {
                agent_handle_id: "h1".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        // Task 0 fails.
        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "boom".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);

        let actions = scheduler.tick();
        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
        let cancel_ids: Vec<_> = actions
            .iter()
            .filter_map(|a| {
                if let SchedulerAction::Cancel { agent_handle_id } = a {
                    Some(agent_handle_id.as_str())
                } else {
                    None
                }
            })
            .collect();
        assert!(cancel_ids.contains(&"h1"), "task 1 should be canceled");
        assert!(
            actions
                .iter()
                .any(|a| matches!(a, SchedulerAction::Done { .. }))
        );
    }
1134
    // With the Skip strategy, a failed task and its dependents are marked
    // Skipped instead of failing the graph.
    #[test]
    fn test_failure_skip_propagates() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "skip me".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);
        scheduler.tick();

        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Skipped);
        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Skipped);
    }
1165
    // With the Retry strategy, a failed task is rescheduled (respawned or
    // Ready) and its retry counter increments.
    #[test]
    fn test_failure_retry_reschedules() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
        scheduler.graph.tasks[0].max_retries = Some(3);
        scheduler.graph.tasks[0].retry_count = 0;
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "transient".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);
        let actions = scheduler.tick();

        let has_spawn = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
        assert!(
            has_spawn || scheduler.graph.tasks[0].status == TaskStatus::Ready,
            "retry should produce spawn or Ready status"
        );
        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
    }
1205
    // First failure under Retry: counter bumps to 1, task is respawned or
    // Ready, and the graph keeps Running.
    #[test]
    fn test_process_event_failed_retry() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
        scheduler.graph.tasks[0].max_retries = Some(2);
        scheduler.graph.tasks[0].retry_count = 0;
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "first failure".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);
        let actions = scheduler.tick();

        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
        let spawned = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
        assert!(
            spawned || scheduler.graph.tasks[0].status == TaskStatus::Ready,
            "retry should emit Spawn or set Ready"
        );
        assert_eq!(scheduler.graph.status, GraphStatus::Running);
    }
1247
    // A running task whose start time is older than the timeout is failed
    // and its agent canceled on the next tick.
    #[test]
    fn test_timeout_cancels_stalled() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut config = make_config();
        config.task_timeout_secs = 1;
        let defs = vec![make_def("worker")];
        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                // Backdate the start so the task is already past timeout.
                started_at: Instant::now().checked_sub(Duration::from_secs(2)).unwrap(),
            },
        );

        let actions = scheduler.tick();
        let has_cancel = actions.iter().any(
            |a| matches!(a, SchedulerAction::Cancel { agent_handle_id } if agent_handle_id == "h0"),
        );
        assert!(has_cancel, "timed-out task should emit Cancel action");
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
    }
1274
    // `cancel_all` cancels every running agent, drains the running map,
    // and finishes with Done(Canceled).
    #[test]
    fn test_cancel_all() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );
        scheduler.graph.tasks[1].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(1),
            RunningTask {
                agent_handle_id: "h1".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let actions = scheduler.cancel_all();

        assert_eq!(scheduler.graph.status, GraphStatus::Canceled);
        assert!(scheduler.running.is_empty());
        let cancel_count = actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
            .count();
        assert_eq!(cancel_count, 2);
        assert!(actions.iter().any(|a| matches!(
            a,
            SchedulerAction::Done {
                status: GraphStatus::Canceled
            }
        )));
    }
1315
    // A non-transient spawn error fails the task and, under the default
    // abort strategy, the graph — with a Done action emitted.
    #[test]
    fn test_record_spawn_failure() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;

        let error = SubAgentError::Spawn("spawn error".to_string());
        let actions = scheduler.record_spawn_failure(TaskId(0), &error);
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
        assert!(
            actions
                .iter()
                .any(|a| matches!(a, SchedulerAction::Done { .. }))
        );
    }
1335
1336 #[test]
1337 fn test_record_spawn_failure_concurrency_limit_reverts_to_ready() {
1338 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1339 let mut scheduler = make_scheduler(graph);
1340
1341 scheduler.graph.tasks[0].status = TaskStatus::Running;
1343
1344 let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
1346 let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1347 assert_eq!(
1348 scheduler.graph.tasks[0].status,
1349 TaskStatus::Ready,
1350 "task must revert to Ready so the next tick can retry"
1351 );
1352 assert_eq!(
1353 scheduler.graph.status,
1354 GraphStatus::Running,
1355 "graph must stay Running, not transition to Failed"
1356 );
1357 assert!(
1358 actions.is_empty(),
1359 "no cancel or done actions expected for a transient deferral"
1360 );
1361 }
1362
1363 #[test]
1364 fn test_record_spawn_failure_concurrency_limit_variant_spawn_for_task() {
1365 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1367 let mut scheduler = make_scheduler(graph);
1368 scheduler.graph.tasks[0].status = TaskStatus::Running;
1369
1370 let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1371 let actions = scheduler.record_spawn_failure(TaskId(0), &error);
1372 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1373 assert!(actions.is_empty());
1374 }
1375
1376 #[test]
1379 fn test_concurrency_deferral_does_not_affect_running_task() {
1380 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1384 let mut scheduler = make_scheduler(graph);
1385
1386 scheduler.graph.tasks[0].status = TaskStatus::Running;
1388 scheduler.running.insert(
1389 TaskId(0),
1390 RunningTask {
1391 agent_handle_id: "h0".to_string(),
1392 agent_def_name: "worker".to_string(),
1393 started_at: Instant::now(),
1394 },
1395 );
1396 scheduler.graph.tasks[1].status = TaskStatus::Running;
1397
1398 let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1400 let actions = scheduler.record_spawn_failure(TaskId(1), &error);
1401
1402 assert_eq!(
1403 scheduler.graph.tasks[0].status,
1404 TaskStatus::Running,
1405 "task 0 must remain Running"
1406 );
1407 assert_eq!(
1408 scheduler.graph.tasks[1].status,
1409 TaskStatus::Ready,
1410 "task 1 must revert to Ready"
1411 );
1412 assert_eq!(
1413 scheduler.graph.status,
1414 GraphStatus::Running,
1415 "graph must stay Running"
1416 );
1417 assert!(actions.is_empty(), "no cancel or done actions expected");
1418 }
1419
1420 #[test]
1421 fn test_max_concurrent_zero_no_infinite_loop() {
1422 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1427 let config = crate::config::OrchestrationConfig {
1428 max_parallel: 0,
1429 ..make_config()
1430 };
1431 let mut scheduler = DagScheduler::new(
1432 graph,
1433 &config,
1434 Box::new(FirstRouter),
1435 vec![make_def("worker")],
1436 )
1437 .unwrap();
1438
1439 let actions1 = scheduler.tick();
1440 assert!(
1442 actions1
1443 .iter()
1444 .any(|a| matches!(a, SchedulerAction::Spawn { .. })),
1445 "Spawn expected — parallel dispatch ignores max_parallel cap in tick()"
1446 );
1447 assert!(
1448 actions1
1449 .iter()
1450 .all(|a| !matches!(a, SchedulerAction::Done { .. })),
1451 "no Done(Failed) expected — ready tasks exist, so no deadlock"
1452 );
1453 assert_eq!(scheduler.graph.status, GraphStatus::Running);
1454
1455 scheduler.graph.tasks[0].status = TaskStatus::Running;
1457 let error = SubAgentError::ConcurrencyLimit { active: 0, max: 0 };
1458 let extra = scheduler.record_spawn_failure(TaskId(0), &error);
1459 assert!(
1460 extra.is_empty(),
1461 "ConcurrencyLimit must not produce cancel/done actions"
1462 );
1463 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1464
1465 let actions2 = scheduler.tick();
1467 assert!(
1468 actions2
1469 .iter()
1470 .all(|a| !matches!(a, SchedulerAction::Done { .. })),
1471 "second tick must not emit Done(Failed)"
1472 );
1473 assert_eq!(
1474 scheduler.graph.status,
1475 GraphStatus::Running,
1476 "graph must remain Running after two ticks"
1477 );
1478 }
1479
1480 #[test]
1481 fn test_all_tasks_deferred_graph_stays_running() {
1482 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1486 let mut scheduler = make_scheduler(graph);
1487
1488 let actions = scheduler.tick();
1490 assert_eq!(
1491 actions
1492 .iter()
1493 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1494 .count(),
1495 2,
1496 "expected 2 Spawn actions on first tick"
1497 );
1498 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
1499 assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Running);
1500
1501 let error = SubAgentError::ConcurrencyLimit { active: 2, max: 2 };
1503 let r0 = scheduler.record_spawn_failure(TaskId(0), &error);
1504 let r1 = scheduler.record_spawn_failure(TaskId(1), &error);
1505 assert!(r0.is_empty() && r1.is_empty(), "no cancel/done on deferral");
1506 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
1507 assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Ready);
1508 assert_eq!(scheduler.graph.status, GraphStatus::Running);
1509
1510 let retry_actions = scheduler.tick();
1512 let spawn_count = retry_actions
1513 .iter()
1514 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
1515 .count();
1516 assert!(
1517 spawn_count > 0,
1518 "second tick must re-emit Spawn for deferred tasks"
1519 );
1520 assert!(
1521 retry_actions.iter().all(|a| !matches!(
1522 a,
1523 SchedulerAction::Done {
1524 status: GraphStatus::Failed,
1525 ..
1526 }
1527 )),
1528 "no Done(Failed) expected"
1529 );
1530 }
1531
1532 #[test]
1533 fn test_build_prompt_no_deps() {
1534 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1535 let scheduler = make_scheduler(graph);
1536 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[0]);
1537 assert_eq!(prompt, "description for task 0");
1538 }
1539
1540 #[test]
1541 fn test_build_prompt_with_deps_and_truncation() {
1542 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1543 graph.tasks[0].status = TaskStatus::Completed;
1544 graph.tasks[0].result = Some(TaskResult {
1546 output: "x".repeat(200),
1547 artifacts: vec![],
1548 duration_ms: 10,
1549 agent_id: None,
1550 agent_def: None,
1551 });
1552
1553 let config = crate::config::OrchestrationConfig {
1554 dependency_context_budget: 50,
1555 ..make_config()
1556 };
1557 let scheduler = DagScheduler::new(
1558 graph,
1559 &config,
1560 Box::new(FirstRouter),
1561 vec![make_def("worker")],
1562 )
1563 .unwrap();
1564
1565 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1566 assert!(prompt.contains("<completed-dependencies>"));
1567 assert!(prompt.contains("[truncated:"));
1568 assert!(prompt.contains("Your task:"));
1569 }
1570
1571 #[test]
1572 fn test_duration_ms_computed_correctly() {
1573 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1575 let mut scheduler = make_scheduler(graph);
1576
1577 scheduler.graph.tasks[0].status = TaskStatus::Running;
1578 scheduler.running.insert(
1579 TaskId(0),
1580 RunningTask {
1581 agent_handle_id: "h0".to_string(),
1582 agent_def_name: "worker".to_string(),
1583 started_at: Instant::now()
1584 .checked_sub(Duration::from_millis(50))
1585 .unwrap(),
1586 },
1587 );
1588
1589 let event = TaskEvent {
1590 task_id: TaskId(0),
1591 agent_handle_id: "h0".to_string(),
1592 outcome: TaskOutcome::Completed {
1593 output: "result".to_string(),
1594 artifacts: vec![],
1595 },
1596 };
1597 scheduler.buffered_events.push_back(event);
1598 scheduler.tick();
1599
1600 let result = scheduler.graph.tasks[0].result.as_ref().unwrap();
1601 assert!(
1602 result.duration_ms > 0,
1603 "duration_ms should be > 0, got {}",
1604 result.duration_ms
1605 );
1606 }
1607
1608 #[test]
1609 fn test_utf8_safe_truncation() {
1610 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1612 graph.tasks[0].status = TaskStatus::Completed;
1613 let unicode_output = "日本語テスト".repeat(100);
1615 graph.tasks[0].result = Some(TaskResult {
1616 output: unicode_output,
1617 artifacts: vec![],
1618 duration_ms: 10,
1619 agent_id: None,
1620 agent_def: None,
1621 });
1622
1623 let config = crate::config::OrchestrationConfig {
1626 dependency_context_budget: 500,
1627 ..make_config()
1628 };
1629 let scheduler = DagScheduler::new(
1630 graph,
1631 &config,
1632 Box::new(FirstRouter),
1633 vec![make_def("worker")],
1634 )
1635 .unwrap();
1636
1637 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1639 assert!(
1640 prompt.contains("日"),
1641 "Japanese characters should be in the prompt after safe truncation"
1642 );
1643 }
1644
1645 #[test]
1646 fn test_no_agent_routes_inline() {
1647 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1649 let mut scheduler = make_scheduler_with_router(graph, Box::new(NoneRouter));
1650 let actions = scheduler.tick();
1651 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
1652 assert!(
1653 actions
1654 .iter()
1655 .any(|a| matches!(a, SchedulerAction::RunInline { .. }))
1656 );
1657 }
1658
1659 #[test]
1660 fn test_stale_event_rejected() {
1661 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1663 let mut scheduler = make_scheduler(graph);
1664
1665 scheduler.graph.tasks[0].status = TaskStatus::Running;
1667 scheduler.running.insert(
1668 TaskId(0),
1669 RunningTask {
1670 agent_handle_id: "current-handle".to_string(),
1671 agent_def_name: "worker".to_string(),
1672 started_at: Instant::now(),
1673 },
1674 );
1675
1676 let stale_event = TaskEvent {
1678 task_id: TaskId(0),
1679 agent_handle_id: "old-handle".to_string(),
1680 outcome: TaskOutcome::Completed {
1681 output: "stale output".to_string(),
1682 artifacts: vec![],
1683 },
1684 };
1685 scheduler.buffered_events.push_back(stale_event);
1686 let actions = scheduler.tick();
1687
1688 assert_ne!(
1690 scheduler.graph.tasks[0].status,
1691 TaskStatus::Completed,
1692 "stale event must not complete the task"
1693 );
1694 let has_done = actions
1696 .iter()
1697 .any(|a| matches!(a, SchedulerAction::Done { .. }));
1698 assert!(
1699 !has_done,
1700 "no Done action should be emitted for a stale event"
1701 );
1702 assert!(
1704 scheduler.running.contains_key(&TaskId(0)),
1705 "running task must remain after stale event"
1706 );
1707 }
1708
1709 #[test]
1710 fn test_build_prompt_chars_count_in_truncation_message() {
1711 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1715 graph.tasks[0].status = TaskStatus::Completed;
1716 let output = "x".repeat(200);
1719 graph.tasks[0].result = Some(TaskResult {
1720 output,
1721 artifacts: vec![],
1722 duration_ms: 10,
1723 agent_id: None,
1724 agent_def: None,
1725 });
1726
1727 let config = crate::config::OrchestrationConfig {
1728 dependency_context_budget: 10, ..make_config()
1730 };
1731 let scheduler = DagScheduler::new(
1732 graph,
1733 &config,
1734 Box::new(FirstRouter),
1735 vec![make_def("worker")],
1736 )
1737 .unwrap();
1738
1739 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1740 assert!(
1742 prompt.contains("chars total"),
1743 "truncation message must use 'chars total' label. Prompt: {prompt}"
1744 );
1745 assert!(
1746 prompt.contains("[truncated:"),
1747 "prompt must contain truncation notice. Prompt: {prompt}"
1748 );
1749 }
1750
1751 #[test]
1754 fn test_resume_from_accepts_paused_graph() {
1755 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1756 graph.status = GraphStatus::Paused;
1757 graph.tasks[0].status = TaskStatus::Pending;
1758
1759 let scheduler =
1760 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1761 .expect("resume_from should accept Paused graph");
1762 assert_eq!(scheduler.graph.status, GraphStatus::Running);
1763 }
1764
1765 #[test]
1766 fn test_resume_from_accepts_failed_graph() {
1767 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1768 graph.status = GraphStatus::Failed;
1769 graph.tasks[0].status = TaskStatus::Failed;
1770
1771 let scheduler =
1772 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1773 .expect("resume_from should accept Failed graph");
1774 assert_eq!(scheduler.graph.status, GraphStatus::Running);
1775 }
1776
1777 #[test]
1778 fn test_resume_from_rejects_completed_graph() {
1779 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1780 graph.status = GraphStatus::Completed;
1781
1782 let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1783 .unwrap_err();
1784 assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1785 }
1786
1787 #[test]
1788 fn test_resume_from_rejects_canceled_graph() {
1789 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1790 graph.status = GraphStatus::Canceled;
1791
1792 let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1793 .unwrap_err();
1794 assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1795 }
1796
1797 #[test]
1798 fn test_resume_from_reconstructs_running_tasks() {
1799 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1801 graph.status = GraphStatus::Paused;
1802 graph.tasks[0].status = TaskStatus::Running;
1803 graph.tasks[0].assigned_agent = Some("handle-abc".to_string());
1804 graph.tasks[0].agent_hint = Some("worker".to_string());
1805 graph.tasks[1].status = TaskStatus::Pending;
1806
1807 let scheduler =
1808 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1809 .expect("should succeed");
1810
1811 assert!(
1812 scheduler.running.contains_key(&TaskId(0)),
1813 "Running task must be reconstructed in the running map (IC1)"
1814 );
1815 assert_eq!(scheduler.running[&TaskId(0)].agent_handle_id, "handle-abc");
1816 assert!(
1817 !scheduler.running.contains_key(&TaskId(1)),
1818 "Pending task must not appear in running map"
1819 );
1820 }
1821
1822 #[test]
1823 fn test_resume_from_sets_status_running() {
1824 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1826 graph.status = GraphStatus::Paused;
1827
1828 let scheduler =
1829 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1830 .unwrap();
1831 assert_eq!(scheduler.graph.status, GraphStatus::Running);
1832 }
1833
1834 #[test]
1837 fn test_consecutive_spawn_failures_increments_on_concurrency_limit() {
1838 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1841 let mut scheduler = make_scheduler(graph);
1842 scheduler.graph.tasks[0].status = TaskStatus::Running;
1843
1844 assert_eq!(scheduler.consecutive_spawn_failures, 0, "starts at zero");
1845
1846 let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
1847 scheduler.record_spawn_failure(TaskId(0), &error);
1848 scheduler.record_batch_backoff(false, true);
1850 assert_eq!(
1851 scheduler.consecutive_spawn_failures, 1,
1852 "first deferral tick: consecutive_spawn_failures must be 1"
1853 );
1854
1855 scheduler.graph.tasks[0].status = TaskStatus::Running;
1856 scheduler.record_spawn_failure(TaskId(0), &error);
1857 scheduler.record_batch_backoff(false, true);
1858 assert_eq!(
1859 scheduler.consecutive_spawn_failures, 2,
1860 "second deferral tick: consecutive_spawn_failures must be 2"
1861 );
1862
1863 scheduler.graph.tasks[0].status = TaskStatus::Running;
1864 scheduler.record_spawn_failure(TaskId(0), &error);
1865 scheduler.record_batch_backoff(false, true);
1866 assert_eq!(
1867 scheduler.consecutive_spawn_failures, 3,
1868 "third deferral tick: consecutive_spawn_failures must be 3"
1869 );
1870 }
1871
1872 #[test]
1873 fn test_consecutive_spawn_failures_resets_on_success() {
1874 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1877 let mut scheduler = make_scheduler(graph);
1878 scheduler.graph.tasks[0].status = TaskStatus::Running;
1879
1880 let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
1881 scheduler.record_spawn_failure(TaskId(0), &error);
1882 scheduler.record_batch_backoff(false, true);
1883 scheduler.graph.tasks[0].status = TaskStatus::Running;
1884 scheduler.record_spawn_failure(TaskId(0), &error);
1885 scheduler.record_batch_backoff(false, true);
1886 assert_eq!(scheduler.consecutive_spawn_failures, 2);
1887
1888 scheduler.record_spawn(TaskId(0), "handle-0".to_string(), "worker".to_string());
1890 assert_eq!(
1891 scheduler.consecutive_spawn_failures, 0,
1892 "record_spawn must reset consecutive_spawn_failures to 0"
1893 );
1894 }
1895
1896 #[tokio::test]
1897 async fn test_exponential_backoff_duration() {
1898 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1901 let config = crate::config::OrchestrationConfig {
1902 deferral_backoff_ms: 50,
1903 ..make_config()
1904 };
1905 let mut scheduler = DagScheduler::new(
1906 graph,
1907 &config,
1908 Box::new(FirstRouter),
1909 vec![make_def("worker")],
1910 )
1911 .unwrap();
1912
1913 assert_eq!(scheduler.consecutive_spawn_failures, 0);
1915 let start = tokio::time::Instant::now();
1916 scheduler.wait_event().await;
1917 let elapsed0 = start.elapsed();
1918 assert!(
1919 elapsed0.as_millis() >= 50,
1920 "backoff with 0 deferrals must be >= base (50ms), got {}ms",
1921 elapsed0.as_millis()
1922 );
1923
1924 scheduler.consecutive_spawn_failures = 3;
1926 let start = tokio::time::Instant::now();
1927 scheduler.wait_event().await;
1928 let elapsed3 = start.elapsed();
1929 assert!(
1930 elapsed3.as_millis() >= 400,
1931 "backoff with 3 deferrals must be >= 400ms (50 * 8), got {}ms",
1932 elapsed3.as_millis()
1933 );
1934
1935 scheduler.consecutive_spawn_failures = 20;
1937 let start = tokio::time::Instant::now();
1938 scheduler.wait_event().await;
1939 let elapsed20 = start.elapsed();
1940 assert!(
1941 elapsed20.as_millis() >= 5000,
1942 "backoff must be capped at 5000ms with high deferrals, got {}ms",
1943 elapsed20.as_millis()
1944 );
1945 }
1946
1947 #[tokio::test]
1950 async fn test_wait_event_sleeps_deferral_backoff_when_running_empty() {
1951 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1954 let config = crate::config::OrchestrationConfig {
1955 deferral_backoff_ms: 50,
1956 ..make_config()
1957 };
1958 let mut scheduler = DagScheduler::new(
1959 graph,
1960 &config,
1961 Box::new(FirstRouter),
1962 vec![make_def("worker")],
1963 )
1964 .unwrap();
1965
1966 assert!(scheduler.running.is_empty());
1968
1969 let start = tokio::time::Instant::now();
1970 scheduler.wait_event().await;
1971 let elapsed = start.elapsed();
1972
1973 assert!(
1974 elapsed.as_millis() >= 50,
1975 "wait_event must sleep at least deferral_backoff (50ms) when running is empty, but only slept {}ms",
1976 elapsed.as_millis()
1977 );
1978 }
1979
1980 #[test]
1981 fn test_current_deferral_backoff_exponential_growth() {
1982 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1985 let config = crate::config::OrchestrationConfig {
1986 deferral_backoff_ms: 250,
1987 ..make_config()
1988 };
1989 let mut scheduler = DagScheduler::new(
1990 graph,
1991 &config,
1992 Box::new(FirstRouter),
1993 vec![make_def("worker")],
1994 )
1995 .unwrap();
1996
1997 assert_eq!(
1998 scheduler.current_deferral_backoff(),
1999 Duration::from_millis(250)
2000 );
2001
2002 scheduler.consecutive_spawn_failures = 1;
2003 assert_eq!(
2004 scheduler.current_deferral_backoff(),
2005 Duration::from_millis(500)
2006 );
2007
2008 scheduler.consecutive_spawn_failures = 2;
2009 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(1));
2010
2011 scheduler.consecutive_spawn_failures = 3;
2012 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(2));
2013
2014 scheduler.consecutive_spawn_failures = 4;
2015 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(4));
2016
2017 scheduler.consecutive_spawn_failures = 5;
2019 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2020
2021 scheduler.consecutive_spawn_failures = 100;
2022 assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
2023 }
2024
2025 #[test]
2026 fn test_record_spawn_resets_consecutive_failures() {
2027 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2029 let mut scheduler = DagScheduler::new(
2030 graph,
2031 &make_config(),
2032 Box::new(FirstRouter),
2033 vec![make_def("worker")],
2034 )
2035 .unwrap();
2036
2037 scheduler.consecutive_spawn_failures = 3;
2038 let task_id = TaskId(0);
2039 scheduler.graph.tasks[0].status = TaskStatus::Running;
2040 scheduler.record_spawn(task_id, "handle-1".into(), "worker".into());
2041
2042 assert_eq!(scheduler.consecutive_spawn_failures, 0);
2043 }
2044
2045 #[test]
2046 fn test_record_spawn_failure_reverts_to_ready_no_counter_change() {
2047 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2050 let mut scheduler = DagScheduler::new(
2051 graph,
2052 &make_config(),
2053 Box::new(FirstRouter),
2054 vec![make_def("worker")],
2055 )
2056 .unwrap();
2057
2058 assert_eq!(scheduler.consecutive_spawn_failures, 0);
2059 let task_id = TaskId(0);
2060 scheduler.graph.tasks[0].status = TaskStatus::Running;
2061
2062 let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2063 scheduler.record_spawn_failure(task_id, &error);
2064
2065 assert_eq!(scheduler.consecutive_spawn_failures, 0);
2067 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
2069 }
2070
2071 #[test]
2074 fn test_parallel_dispatch_all_ready() {
2075 let nodes: Vec<_> = (0..6).map(|i| make_node(i, &[])).collect();
2078 let graph = graph_from_nodes(nodes);
2079 let config = crate::config::OrchestrationConfig {
2080 max_parallel: 2,
2081 ..make_config()
2082 };
2083 let mut scheduler = DagScheduler::new(
2084 graph,
2085 &config,
2086 Box::new(FirstRouter),
2087 vec![make_def("worker")],
2088 )
2089 .unwrap();
2090
2091 let actions = scheduler.tick();
2092 let spawn_count = actions
2093 .iter()
2094 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
2095 .count();
2096 assert_eq!(spawn_count, 6, "all 6 ready tasks must be dispatched");
2097
2098 let running_count = scheduler
2099 .graph
2100 .tasks
2101 .iter()
2102 .filter(|t| t.status == TaskStatus::Running)
2103 .count();
2104 assert_eq!(running_count, 6, "all 6 tasks must be marked Running");
2105 }
2106
2107 #[test]
2108 fn test_batch_backoff_partial_success() {
2109 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2111 let mut scheduler = make_scheduler(graph);
2112 scheduler.consecutive_spawn_failures = 3;
2113
2114 scheduler.record_batch_backoff(true, true);
2115 assert_eq!(
2116 scheduler.consecutive_spawn_failures, 0,
2117 "any success in batch must reset counter"
2118 );
2119 }
2120
2121 #[test]
2122 fn test_batch_backoff_all_failed() {
2123 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2125 let mut scheduler = make_scheduler(graph);
2126 scheduler.consecutive_spawn_failures = 2;
2127
2128 scheduler.record_batch_backoff(false, true);
2129 assert_eq!(
2130 scheduler.consecutive_spawn_failures, 3,
2131 "all-failure tick must increment counter"
2132 );
2133 }
2134
2135 #[test]
2136 fn test_batch_backoff_no_spawns() {
2137 let graph = graph_from_nodes(vec![make_node(0, &[])]);
2139 let mut scheduler = make_scheduler(graph);
2140 scheduler.consecutive_spawn_failures = 5;
2141
2142 scheduler.record_batch_backoff(false, false);
2143 assert_eq!(
2144 scheduler.consecutive_spawn_failures, 5,
2145 "no spawns must not change counter"
2146 );
2147 }
2148
2149 #[test]
2150 fn test_buffer_guard_uses_task_count() {
2151 let nodes: Vec<_> = (0..10).map(|i| make_node(i, &[])).collect();
2161 let graph = graph_from_nodes(nodes);
2162 let config = crate::config::OrchestrationConfig {
2163 max_parallel: 2, ..make_config()
2165 };
2166 let scheduler = DagScheduler::new(
2167 graph,
2168 &config,
2169 Box::new(FirstRouter),
2170 vec![make_def("worker")],
2171 )
2172 .unwrap();
2173 assert_eq!(scheduler.graph.tasks.len() * 2, 20);
2175 assert_eq!(scheduler.max_parallel * 2, 4);
2176 }
2177
2178 #[test]
2179 fn test_batch_mixed_concurrency_and_fatal_failure() {
2180 let mut nodes = vec![make_node(0, &[]), make_node(1, &[])];
2188 nodes[1].failure_strategy = Some(FailureStrategy::Skip);
2190 let graph = graph_from_nodes(nodes);
2191 let mut scheduler = make_scheduler(graph);
2192
2193 scheduler.graph.tasks[0].status = TaskStatus::Running;
2195 scheduler.graph.tasks[1].status = TaskStatus::Running;
2196
2197 let concurrency_err = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
2199 let actions0 = scheduler.record_spawn_failure(TaskId(0), &concurrency_err);
2200 assert!(
2201 actions0.is_empty(),
2202 "ConcurrencyLimit must produce no extra actions"
2203 );
2204 assert_eq!(
2205 scheduler.graph.tasks[0].status,
2206 TaskStatus::Ready,
2207 "task 0 must revert to Ready"
2208 );
2209
2210 let fatal_err = SubAgentError::Spawn("provider unavailable".to_string());
2213 let actions1 = scheduler.record_spawn_failure(TaskId(1), &fatal_err);
2214 assert_eq!(
2215 scheduler.graph.tasks[1].status,
2216 TaskStatus::Skipped,
2217 "task 1: Skip strategy turns Failed into Skipped via propagate_failure"
2218 );
2219 assert!(
2221 actions1
2222 .iter()
2223 .all(|a| !matches!(a, SchedulerAction::Done { .. })),
2224 "no Done action expected: task 0 is still Ready"
2225 );
2226
2227 scheduler.consecutive_spawn_failures = 0;
2229 scheduler.record_batch_backoff(false, true);
2230 assert_eq!(
2231 scheduler.consecutive_spawn_failures, 1,
2232 "batch with only ConcurrencyLimit must increment counter"
2233 );
2234 }
2235}