1use std::collections::{HashMap, VecDeque};
7use std::fmt::Write as _;
8use std::path::PathBuf;
9use std::time::{Duration, Instant};
10
11use tokio::sync::mpsc;
12
13use super::dag;
14use super::error::OrchestrationError;
15use super::graph::{GraphStatus, TaskGraph, TaskId, TaskNode, TaskResult, TaskStatus};
16use super::router::AgentRouter;
17use crate::config::OrchestrationConfig;
18use crate::sanitizer::{
19 ContentIsolationConfig, ContentSanitizer, ContentSource, ContentSourceKind,
20};
21use crate::subagent::SubAgentDef;
22
/// An instruction emitted by the scheduler for its driver loop to execute;
/// the scheduler itself never spawns or kills agents directly.
#[derive(Debug)]
pub enum SchedulerAction {
    /// Launch an agent for `task_id` using the definition named
    /// `agent_def_name`, seeded with `prompt`.
    Spawn {
        task_id: TaskId,
        agent_def_name: String,
        prompt: String,
    },
    /// Stop the agent identified by `agent_handle_id`.
    Cancel { agent_handle_id: String },
    /// The graph reached a terminal status; no further scheduling follows.
    Done { status: GraphStatus },
}
40
/// Outcome notification for a task, sent back to the scheduler over the
/// channel obtained from [`DagScheduler::event_sender`].
#[derive(Debug)]
pub struct TaskEvent {
    pub task_id: TaskId,
    /// Handle id of the agent that produced this event; used to discard
    /// stale events from an earlier incarnation of the same task.
    pub agent_handle_id: String,
    pub outcome: TaskOutcome,
}
48
/// Terminal result reported for a single task.
#[derive(Debug)]
pub enum TaskOutcome {
    /// The task finished successfully, with its textual output and any
    /// artifact files it produced.
    Completed {
        output: String,
        artifacts: Vec<PathBuf>,
    },
    /// The task failed; `error` is a human-readable description.
    Failed { error: String },
}
60
/// Book-keeping for a task that currently has a live agent attached.
struct RunningTask {
    agent_handle_id: String,
    agent_def_name: String,
    /// Spawn (or resume) time; drives timeout detection and duration reporting.
    started_at: Instant,
}
67
/// Event-driven scheduler that walks a [`TaskGraph`], emitting
/// [`SchedulerAction`]s on each `tick` and consuming [`TaskEvent`]s produced
/// by the agents the caller spawns on its behalf.
pub struct DagScheduler {
    graph: TaskGraph,
    // Upper bound on concurrently Running tasks.
    max_parallel: usize,
    // Tasks with a live agent, keyed by task id.
    running: HashMap<TaskId, RunningTask>,
    event_rx: mpsc::Receiver<TaskEvent>,
    // Cloned out via `event_sender` so agent drivers can report outcomes.
    event_tx: mpsc::Sender<TaskEvent>,
    // Per-task wall-clock deadline (falls back to 600s when config is 0).
    task_timeout: Duration,
    // Chooses an agent definition for each ready task.
    router: Box<dyn AgentRouter>,
    available_agents: Vec<SubAgentDef>,
    // Character budget shared across dependency outputs in a task prompt.
    dependency_context_budget: usize,
    // Events received in `wait_event` but not yet processed by `tick`.
    buffered_events: VecDeque<TaskEvent>,
    // Sanitizes dependency output before embedding it in prompts.
    sanitizer: ContentSanitizer,
}
115
// Manual Debug impl: prints only summary fields — presumably because the
// router trait object and the channel halves are not `Debug` (TODO confirm);
// `finish_non_exhaustive` marks the omission.
impl std::fmt::Debug for DagScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DagScheduler")
            .field("graph_id", &self.graph.id)
            .field("graph_status", &self.graph.status)
            .field("running_count", &self.running.len())
            .field("max_parallel", &self.max_parallel)
            .field("task_timeout_secs", &self.task_timeout.as_secs())
            .finish_non_exhaustive()
    }
}
127
impl DagScheduler {
    /// Creates a scheduler for a freshly planned graph.
    ///
    /// Validates the DAG via `dag::validate` (size cap from
    /// `config.max_tasks`), moves the graph to `Running`, and promotes every
    /// dependency-free `Pending` task to `Ready`.
    ///
    /// # Errors
    ///
    /// Returns [`OrchestrationError::InvalidGraph`] if the graph is not in
    /// `Created` status, or propagates whatever `dag::validate` reports.
    pub fn new(
        mut graph: TaskGraph,
        config: &OrchestrationConfig,
        router: Box<dyn AgentRouter>,
        available_agents: Vec<SubAgentDef>,
    ) -> Result<Self, OrchestrationError> {
        if graph.status != GraphStatus::Created {
            return Err(OrchestrationError::InvalidGraph(format!(
                "graph must be in Created status, got {}",
                graph.status
            )));
        }

        dag::validate(&graph.tasks, config.max_tasks as usize)?;

        graph.status = GraphStatus::Running;

        // Roots (no dependencies) are immediately schedulable.
        for task in &mut graph.tasks {
            if task.depends_on.is_empty() && task.status == TaskStatus::Pending {
                task.status = TaskStatus::Ready;
            }
        }

        let (event_tx, event_rx) = mpsc::channel(64);

        // A zero timeout in config means "use the 10-minute default".
        let task_timeout = if config.task_timeout_secs > 0 {
            Duration::from_secs(config.task_timeout_secs)
        } else {
            Duration::from_secs(600)
        };

        Ok(Self {
            graph,
            max_parallel: config.max_parallel as usize,
            running: HashMap::new(),
            event_rx,
            event_tx,
            task_timeout,
            router,
            available_agents,
            dependency_context_budget: config.dependency_context_budget,
            buffered_events: VecDeque::new(),
            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
        })
    }

    /// Rebuilds a scheduler around a previously persisted graph.
    ///
    /// Tasks still marked `Running` with an `assigned_agent` are re-entered
    /// into the running map. Note that `started_at` is reset to now, so each
    /// surviving task's timeout clock restarts on resume.
    ///
    /// # Errors
    ///
    /// Returns [`OrchestrationError::InvalidGraph`] for `Completed` or
    /// `Canceled` graphs; only Paused, Failed, or Running graphs resume.
    pub fn resume_from(
        mut graph: TaskGraph,
        config: &OrchestrationConfig,
        router: Box<dyn AgentRouter>,
        available_agents: Vec<SubAgentDef>,
    ) -> Result<Self, OrchestrationError> {
        if graph.status == GraphStatus::Completed || graph.status == GraphStatus::Canceled {
            return Err(OrchestrationError::InvalidGraph(format!(
                "cannot resume a {} graph; only Paused, Failed, or Running graphs are resumable",
                graph.status
            )));
        }

        graph.status = GraphStatus::Running;

        let running: HashMap<TaskId, RunningTask> = graph
            .tasks
            .iter()
            .filter(|t| t.status == TaskStatus::Running)
            .filter_map(|t| {
                // Tasks without an assigned agent handle cannot be tracked.
                let handle_id = t.assigned_agent.clone()?;
                // agent_hint is used as the best available guess of the
                // definition name — TODO confirm it matches the original def.
                let def_name = t.agent_hint.clone().unwrap_or_default();
                Some((
                    t.id,
                    RunningTask {
                        agent_handle_id: handle_id,
                        agent_def_name: def_name,
                        started_at: Instant::now(),
                    },
                ))
            })
            .collect();

        let (event_tx, event_rx) = mpsc::channel(64);

        // Same default as `new`: 0 means a 10-minute timeout.
        let task_timeout = if config.task_timeout_secs > 0 {
            Duration::from_secs(config.task_timeout_secs)
        } else {
            Duration::from_secs(600)
        };

        Ok(Self {
            graph,
            max_parallel: config.max_parallel as usize,
            running,
            event_rx,
            event_tx,
            task_timeout,
            router,
            available_agents,
            dependency_context_budget: config.dependency_context_budget,
            buffered_events: VecDeque::new(),
            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
        })
    }

    /// Returns a clone of the sender agents use to report [`TaskEvent`]s.
    #[must_use]
    pub fn event_sender(&self) -> mpsc::Sender<TaskEvent> {
        self.event_tx.clone()
    }

    /// Read-only view of the current graph state.
    #[must_use]
    pub fn graph(&self) -> &TaskGraph {
        &self.graph
    }

    /// Returns a snapshot of the graph.
    ///
    /// NOTE(review): despite the `into_` prefix this takes `&self` and
    /// clones rather than consuming the scheduler; callers expecting a
    /// cheap move should verify.
    #[must_use]
    pub fn into_graph(&self) -> TaskGraph {
        self.graph.clone()
    }
}
280
281impl Drop for DagScheduler {
282 fn drop(&mut self) {
283 if !self.running.is_empty() {
284 tracing::warn!(
285 running_tasks = self.running.len(),
286 "DagScheduler dropped with running tasks; agents may continue until their \
287 CancellationToken fires or they complete naturally"
288 );
289 }
290 }
291}
292
impl DagScheduler {
    /// Advances the schedule one step and returns the actions the caller
    /// must perform.
    ///
    /// Order of operations: drain buffered then channel events, fail any
    /// timed-out tasks, spawn ready tasks up to `max_parallel`, then detect
    /// overall completion or deadlock. Returns immediately with a `Done`
    /// action when the graph is no longer `Running`.
    pub fn tick(&mut self) -> Vec<SchedulerAction> {
        if self.graph.status != GraphStatus::Running {
            return vec![SchedulerAction::Done {
                status: self.graph.status,
            }];
        }

        let mut actions = Vec::new();

        // Events buffered by `wait_event` take priority, then anything still
        // sitting in the channel.
        while let Some(event) = self.buffered_events.pop_front() {
            let cancel_actions = self.process_event(event);
            actions.extend(cancel_actions);
        }
        while let Ok(event) = self.event_rx.try_recv() {
            let cancel_actions = self.process_event(event);
            actions.extend(cancel_actions);
        }

        // Event processing may have moved the graph to a terminal status.
        if self.graph.status != GraphStatus::Running {
            return actions;
        }

        let timeout_actions = self.check_timeouts();
        actions.extend(timeout_actions);

        if self.graph.status != GraphStatus::Running {
            return actions;
        }

        let ready = dag::ready_tasks(&self.graph);
        // Count Running from the graph (source of truth) rather than the
        // `running` map, which is updated separately via `record_spawn`.
        let running_in_graph = self
            .graph
            .tasks
            .iter()
            .filter(|t| t.status == TaskStatus::Running)
            .count();
        let slots_available = self.max_parallel.saturating_sub(running_in_graph);

        for task_id in ready.into_iter().take(slots_available) {
            let task = &self.graph.tasks[task_id.index()];

            // No routable agent is treated like a task failure, including
            // failure propagation to dependents.
            let Some(agent_def_name) = self.router.route(task, &self.available_agents) else {
                tracing::warn!(
                    task_id = %task_id,
                    title = %task.title,
                    "no agent available for task, marking failed"
                );
                self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
                let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
                for cancel_task_id in cancel_ids {
                    if let Some(running) = self.running.remove(&cancel_task_id) {
                        actions.push(SchedulerAction::Cancel {
                            agent_handle_id: running.agent_handle_id,
                        });
                    }
                }
                // propagate_failure may have moved the graph to a terminal
                // status (e.g. abort strategy).
                if self.graph.status != GraphStatus::Running {
                    self.graph.finished_at = Some(super::graph::chrono_now());
                    actions.push(SchedulerAction::Done {
                        status: self.graph.status,
                    });
                    return actions;
                }
                continue;
            };

            let prompt = self.build_task_prompt(task);

            // Mark Running now; the agent handle arrives later via
            // `record_spawn` once the caller actually spawns it.
            self.graph.tasks[task_id.index()].status = TaskStatus::Running;

            actions.push(SchedulerAction::Spawn {
                task_id,
                agent_def_name,
                prompt,
            });
        }

        // Completion / deadlock detection: nothing running in graph or map.
        let running_in_graph_now = self
            .graph
            .tasks
            .iter()
            .filter(|t| t.status == TaskStatus::Running)
            .count();
        if running_in_graph_now == 0 && self.running.is_empty() {
            let all_terminal = self.graph.tasks.iter().all(|t| t.status.is_terminal());
            if all_terminal {
                self.graph.status = GraphStatus::Completed;
                self.graph.finished_at = Some(super::graph::chrono_now());
                actions.push(SchedulerAction::Done {
                    status: GraphStatus::Completed,
                });
            } else if dag::ready_tasks(&self.graph).is_empty() {
                // Non-terminal tasks exist but none can ever run: deadlock.
                tracing::error!(
                    "scheduler deadlock: no running or ready tasks, but graph not complete"
                );
                self.graph.status = GraphStatus::Failed;
                self.graph.finished_at = Some(super::graph::chrono_now());
                actions.push(SchedulerAction::Done {
                    status: GraphStatus::Failed,
                });
            }
        }

        actions
    }

    /// Waits until either a task event arrives (buffering it for the next
    /// `tick`) or the nearest task timeout is due. Returns immediately when
    /// nothing is running.
    pub async fn wait_event(&mut self) {
        if self.running.is_empty() {
            return;
        }

        // Sleep no longer than the soonest task deadline so timeouts are
        // noticed promptly.
        let nearest_timeout = self
            .running
            .values()
            .map(|r| {
                self.task_timeout
                    .checked_sub(r.started_at.elapsed())
                    .unwrap_or(Duration::ZERO)
            })
            .min()
            .unwrap_or(Duration::from_secs(1));

        // Floor of 100ms avoids a busy spin when a deadline already passed.
        let wait_duration = nearest_timeout.max(Duration::from_millis(100));

        tokio::select! {
            Some(event) = self.event_rx.recv() => {
                // Bounded buffer: beyond 2x max_parallel, drop the oldest
                // event (loudly) rather than grow without limit.
                if self.buffered_events.len() >= self.max_parallel * 2 {
                    if let Some(dropped) = self.buffered_events.pop_front() {
                        tracing::error!(
                            task_id = %dropped.task_id,
                            buffer_len = self.buffered_events.len(),
                            "event buffer saturated; completion event dropped — task may \
                             remain Running until timeout"
                        );
                    }
                }
                self.buffered_events.push_back(event);
            }
            () = tokio::time::sleep(wait_duration) => {}
        }
    }

    /// Records that the caller actually spawned an agent for `task_id`,
    /// starting its timeout clock and storing the handle for later
    /// cancellation / stale-event filtering.
    pub fn record_spawn(
        &mut self,
        task_id: TaskId,
        agent_handle_id: String,
        agent_def_name: String,
    ) {
        self.graph.tasks[task_id.index()].assigned_agent = Some(agent_handle_id.clone());
        self.running.insert(
            task_id,
            RunningTask {
                agent_handle_id,
                agent_def_name,
                started_at: Instant::now(),
            },
        );
    }

    /// Marks `task_id` as failed because its agent could not be spawned,
    /// propagating the failure to dependents and emitting any resulting
    /// `Cancel`/`Done` actions.
    pub fn record_spawn_failure(&mut self, task_id: TaskId, error: &str) -> Vec<SchedulerAction> {
        // Cap logged error text; char-based take keeps UTF-8 boundaries safe.
        let error_excerpt: String = error.chars().take(512).collect();
        tracing::warn!(
            task_id = %task_id,
            error = %error_excerpt,
            "spawn failed, marking task failed"
        );
        self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
        let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
        let mut actions = Vec::new();
        for cancel_task_id in cancel_ids {
            if let Some(running) = self.running.remove(&cancel_task_id) {
                actions.push(SchedulerAction::Cancel {
                    agent_handle_id: running.agent_handle_id,
                });
            }
        }
        if self.graph.status != GraphStatus::Running {
            self.graph.finished_at = Some(super::graph::chrono_now());
            actions.push(SchedulerAction::Done {
                status: self.graph.status,
            });
        }
        actions
    }

    /// Cancels the whole graph: every running agent gets a `Cancel` action,
    /// all non-terminal tasks become `Canceled`, and a final
    /// `Done(Canceled)` is appended.
    pub fn cancel_all(&mut self) -> Vec<SchedulerAction> {
        self.graph.status = GraphStatus::Canceled;
        self.graph.finished_at = Some(super::graph::chrono_now());

        // Drain into a Vec first so `self.graph` can be mutated in the map.
        let running: Vec<(TaskId, RunningTask)> = self.running.drain().collect();
        let mut actions: Vec<SchedulerAction> = running
            .into_iter()
            .map(|(task_id, r)| {
                self.graph.tasks[task_id.index()].status = TaskStatus::Canceled;
                SchedulerAction::Cancel {
                    agent_handle_id: r.agent_handle_id,
                }
            })
            .collect();

        // Sweep up Pending/Ready (and any other non-terminal) tasks too.
        for task in &mut self.graph.tasks {
            if !task.status.is_terminal() {
                task.status = TaskStatus::Canceled;
            }
        }

        actions.push(SchedulerAction::Done {
            status: GraphStatus::Canceled,
        });
        actions
    }
}
550
impl DagScheduler {
    /// Applies one task outcome to the graph; returns follow-up actions
    /// (cancellations from failure propagation, and possibly `Done`).
    fn process_event(&mut self, event: TaskEvent) -> Vec<SchedulerAction> {
        let TaskEvent {
            task_id,
            agent_handle_id,
            outcome,
        } = event;

        // Guard against events from tasks we no longer track, or from an
        // older agent incarnation whose handle no longer matches.
        match self.running.get(&task_id) {
            Some(running) if running.agent_handle_id != agent_handle_id => {
                tracing::warn!(
                    task_id = %task_id,
                    expected = %running.agent_handle_id,
                    got = %agent_handle_id,
                    "discarding stale event from previous agent incarnation"
                );
                return Vec::new();
            }
            None => {
                tracing::debug!(
                    task_id = %task_id,
                    agent_handle_id = %agent_handle_id,
                    "ignoring event for task not in running map"
                );
                return Vec::new();
            }
            Some(_) => {}
        }

        // Capture timing/metadata before removing the running entry.
        let duration_ms = self.running.get(&task_id).map_or(0, |r| {
            u64::try_from(r.started_at.elapsed().as_millis()).unwrap_or(u64::MAX)
        });
        let agent_def_name = self.running.get(&task_id).map(|r| r.agent_def_name.clone());

        self.running.remove(&task_id);

        match outcome {
            TaskOutcome::Completed { output, artifacts } => {
                self.graph.tasks[task_id.index()].status = TaskStatus::Completed;
                self.graph.tasks[task_id.index()].result = Some(TaskResult {
                    output,
                    artifacts,
                    duration_ms,
                    agent_id: Some(agent_handle_id),
                    agent_def: agent_def_name,
                });

                // Promote any tasks whose dependencies are now satisfied so
                // the next tick can spawn them.
                let newly_ready = dag::ready_tasks(&self.graph);
                for ready_id in newly_ready {
                    if self.graph.tasks[ready_id.index()].status == TaskStatus::Pending {
                        self.graph.tasks[ready_id.index()].status = TaskStatus::Ready;
                    }
                }

                Vec::new()
            }

            TaskOutcome::Failed { error } => {
                // Char-based truncation keeps the log excerpt UTF-8 safe.
                let error_excerpt: String = error.chars().take(512).collect();
                tracing::warn!(
                    task_id = %task_id,
                    error = %error_excerpt,
                    "task failed"
                );
                self.graph.tasks[task_id.index()].status = TaskStatus::Failed;

                // propagate_failure applies the task's failure strategy
                // (abort/skip/retry) and returns ids whose agents must stop.
                let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
                let mut actions = Vec::new();

                for cancel_task_id in cancel_ids {
                    if let Some(running) = self.running.remove(&cancel_task_id) {
                        actions.push(SchedulerAction::Cancel {
                            agent_handle_id: running.agent_handle_id,
                        });
                    }
                }

                if self.graph.status != GraphStatus::Running {
                    self.graph.finished_at = Some(super::graph::chrono_now());
                    actions.push(SchedulerAction::Done {
                        status: self.graph.status,
                    });
                }

                actions
            }
        }
    }

    /// Fails every task whose agent has run past `task_timeout`, emitting
    /// `Cancel` actions and propagating the failure to dependents. Stops
    /// early (with `Done`) if propagation terminates the graph.
    fn check_timeouts(&mut self) -> Vec<SchedulerAction> {
        // Snapshot first: we mutate `self.running` inside the loop below.
        let timed_out: Vec<(TaskId, String)> = self
            .running
            .iter()
            .filter(|(_, r)| r.started_at.elapsed() > self.task_timeout)
            .map(|(id, r)| (*id, r.agent_handle_id.clone()))
            .collect();

        let mut actions = Vec::new();
        for (task_id, agent_handle_id) in timed_out {
            tracing::warn!(
                task_id = %task_id,
                timeout_secs = self.task_timeout.as_secs(),
                "task timed out"
            );
            self.running.remove(&task_id);
            self.graph.tasks[task_id.index()].status = TaskStatus::Failed;

            actions.push(SchedulerAction::Cancel { agent_handle_id });

            let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
            for cancel_task_id in cancel_ids {
                if let Some(running) = self.running.remove(&cancel_task_id) {
                    actions.push(SchedulerAction::Cancel {
                        agent_handle_id: running.agent_handle_id,
                    });
                }
            }

            if self.graph.status != GraphStatus::Running {
                self.graph.finished_at = Some(super::graph::chrono_now());
                actions.push(SchedulerAction::Done {
                    status: self.graph.status,
                });
                break;
            }
        }

        actions
    }

    /// Builds the prompt for `task`, prefixing its description with the
    /// sanitized, budget-truncated outputs of completed dependencies (and
    /// noting skipped ones) inside a `<completed-dependencies>` block.
    fn build_task_prompt(&self, task: &TaskNode) -> String {
        if task.depends_on.is_empty() {
            return task.description.clone();
        }

        let completed_deps: Vec<&TaskNode> = task
            .depends_on
            .iter()
            .filter_map(|dep_id| {
                let dep = &self.graph.tasks[dep_id.index()];
                if dep.status == TaskStatus::Completed {
                    Some(dep)
                } else {
                    None
                }
            })
            .collect();

        if completed_deps.is_empty() {
            return task.description.clone();
        }

        // Split the character budget evenly across dependencies. The
        // unwrap_or arm is unreachable here (completed_deps is non-empty)
        // but keeps checked_div total.
        let budget_per_dep = self
            .dependency_context_budget
            .checked_div(completed_deps.len())
            .unwrap_or(self.dependency_context_budget);

        let mut context_block = String::from("<completed-dependencies>\n");

        for dep in &completed_deps {
            // Escape ids/titles so untrusted text can't inject markup.
            let escaped_id = xml_escape(&dep.id.to_string());
            let escaped_title = xml_escape(&dep.title);
            let _ = writeln!(
                context_block,
                "## Task \"{escaped_id}\": \"{escaped_title}\" (completed)",
            );

            if let Some(ref result) = dep.result {
                // Dependency output is treated as untrusted agent content.
                let source = ContentSource::new(ContentSourceKind::A2aMessage);
                let sanitized = self.sanitizer.sanitize(&result.output, source);
                let safe_output = sanitized.body;

                // Char-based truncation avoids splitting UTF-8 sequences.
                let char_count = safe_output.chars().count();
                if char_count > budget_per_dep {
                    let truncated: String = safe_output.chars().take(budget_per_dep).collect();
                    let _ = write!(
                        context_block,
                        "{truncated}...\n[truncated: {char_count} chars total]"
                    );
                } else {
                    context_block.push_str(&safe_output);
                }
            } else {
                context_block.push_str("[no output recorded]\n");
            }
            context_block.push('\n');
        }

        // Mention skipped dependencies so the agent knows output is missing
        // by design, not by accident.
        for dep_id in &task.depends_on {
            let dep = &self.graph.tasks[dep_id.index()];
            if dep.status == TaskStatus::Skipped {
                let escaped_id = xml_escape(&dep.id.to_string());
                let escaped_title = xml_escape(&dep.title);
                let _ = writeln!(
                    context_block,
                    "## Task \"{escaped_id}\": \"{escaped_title}\" (skipped -- no output available)\n",
                );
            }
        }

        context_block.push_str("</completed-dependencies>\n\n");
        format!("{context_block}Your task: {}", task.description)
    }
}
778
/// Escapes the five XML special characters so untrusted task ids/titles
/// cannot inject or terminate markup in the structured prompt built by
/// `build_task_prompt`.
///
/// Bug fix: each arm previously pushed the character itself (e.g.
/// `'<' => push_str("<")`), making the function a no-op; it now emits the
/// predefined XML entities.
fn xml_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '&' => out.push_str("&amp;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            other => out.push(other),
        }
    }
    out
}
794
795#[cfg(test)]
796mod tests {
797 use super::*;
798 use crate::orchestration::graph::{
799 FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus,
800 };
801
802 fn make_node(id: u32, deps: &[u32]) -> TaskNode {
803 let mut n = TaskNode::new(
804 id,
805 format!("task-{id}"),
806 format!("description for task {id}"),
807 );
808 n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
809 n
810 }
811
812 fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
813 let mut g = TaskGraph::new("test goal");
814 g.tasks = nodes;
815 g
816 }
817
    /// Minimal agent definition: inherit-all tools, default permissions and
    /// skills; only name/description vary per test.
    fn make_def(name: &str) -> SubAgentDef {
        use crate::subagent::def::{SkillFilter, SubAgentPermissions, ToolPolicy};
        SubAgentDef {
            name: name.to_string(),
            description: format!("{name} agent"),
            model: None,
            tools: ToolPolicy::InheritAll,
            disallowed_tools: vec![],
            permissions: SubAgentPermissions::default(),
            skills: SkillFilter::default(),
            system_prompt: String::new(),
            hooks: Default::default(),
            memory: None,
            source: None,
            file_path: None,
        }
    }
835
    /// Baseline orchestration config used by most tests (parallelism 4,
    /// 300s task timeout, abort-on-failure); individual tests override
    /// fields as needed.
    fn make_config() -> crate::config::OrchestrationConfig {
        crate::config::OrchestrationConfig {
            enabled: true,
            max_tasks: 20,
            max_parallel: 4,
            default_failure_strategy: "abort".to_string(),
            default_max_retries: 3,
            task_timeout_secs: 300,
            planner_model: None,
            planner_max_tokens: 4096,
            dependency_context_budget: 16384,
            confirm_before_execute: true,
            aggregator_max_tokens: 4096,
        }
    }
851
852 struct FirstRouter;
853 impl AgentRouter for FirstRouter {
854 fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
855 available.first().map(|d| d.name.clone())
856 }
857 }
858
    /// Router that never matches any agent, exercising the
    /// "no agent available" failure path in `tick`.
    struct NoneRouter;
    impl AgentRouter for NoneRouter {
        fn route(&self, _task: &TaskNode, _available: &[SubAgentDef]) -> Option<String> {
            None
        }
    }
865
866 fn make_scheduler_with_router(graph: TaskGraph, router: Box<dyn AgentRouter>) -> DagScheduler {
867 let config = make_config();
868 let defs = vec![make_def("worker")];
869 DagScheduler::new(graph, &config, router, defs).unwrap()
870 }
871
872 fn make_scheduler(graph: TaskGraph) -> DagScheduler {
873 let config = make_config();
874 let defs = vec![make_def("worker")];
875 DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap()
876 }
877
878 #[test]
881 fn test_new_validates_graph_status() {
882 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
883 graph.status = GraphStatus::Running; let config = make_config();
885 let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
886 assert!(result.is_err());
887 let err = result.unwrap_err();
888 assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
889 }
890
891 #[test]
892 fn test_new_marks_roots_ready() {
893 let graph = graph_from_nodes(vec![
894 make_node(0, &[]),
895 make_node(1, &[]),
896 make_node(2, &[0, 1]),
897 ]);
898 let scheduler = make_scheduler(graph);
899 assert_eq!(scheduler.graph().tasks[0].status, TaskStatus::Ready);
900 assert_eq!(scheduler.graph().tasks[1].status, TaskStatus::Ready);
901 assert_eq!(scheduler.graph().tasks[2].status, TaskStatus::Pending);
902 assert_eq!(scheduler.graph().status, GraphStatus::Running);
903 }
904
905 #[test]
906 fn test_new_validates_empty_graph() {
907 let graph = graph_from_nodes(vec![]);
908 let config = make_config();
909 let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
910 assert!(result.is_err());
911 }
912
913 #[test]
916 fn test_tick_produces_spawn_for_ready() {
917 let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
918 let mut scheduler = make_scheduler(graph);
919 let actions = scheduler.tick();
920 let spawns: Vec<_> = actions
921 .iter()
922 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
923 .collect();
924 assert_eq!(spawns.len(), 2);
925 }
926
927 #[test]
928 fn test_tick_respects_max_parallel() {
929 let graph = graph_from_nodes(vec![
930 make_node(0, &[]),
931 make_node(1, &[]),
932 make_node(2, &[]),
933 make_node(3, &[]),
934 make_node(4, &[]),
935 ]);
936 let mut config = make_config();
937 config.max_parallel = 2;
938 let defs = vec![make_def("worker")];
939 let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
940 let actions = scheduler.tick();
941 let spawn_count = actions
942 .iter()
943 .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
944 .count();
945 assert_eq!(spawn_count, 2);
946 }
947
948 #[test]
949 fn test_tick_detects_completion() {
950 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
951 graph.tasks[0].status = TaskStatus::Completed;
952 let config = make_config();
953 let defs = vec![make_def("worker")];
954 let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
955 let actions = scheduler.tick();
958 let has_done = actions.iter().any(|a| {
959 matches!(
960 a,
961 SchedulerAction::Done {
962 status: GraphStatus::Completed
963 }
964 )
965 });
966 assert!(
967 has_done,
968 "should emit Done(Completed) when all tasks are terminal"
969 );
970 }
971
    #[test]
    // Completing task 0 should unlock its dependent task 1.
    fn test_completion_event_marks_deps_ready() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        let mut scheduler = make_scheduler(graph);

        // Simulate task 0 already running under agent "handle-0".
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "handle-0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "handle-0".to_string(),
            outcome: TaskOutcome::Completed {
                output: "done".to_string(),
                artifacts: vec![],
            },
        };
        scheduler.buffered_events.push_back(event);

        let actions = scheduler.tick();
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Completed);
        // Task 1 may be spawned in the same tick or just promoted to Ready.
        let has_spawn_1 = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(1)));
        assert!(
            has_spawn_1 || scheduler.graph.tasks[1].status == TaskStatus::Ready,
            "task 1 should be spawned or marked Ready"
        );
    }
1011
    #[test]
    // Default "abort" strategy: one task failing fails the graph and cancels
    // the other running agent.
    fn test_failure_abort_cancels_running() {
        let graph = graph_from_nodes(vec![
            make_node(0, &[]),
            make_node(1, &[]),
            make_node(2, &[0, 1]),
        ]);
        let mut scheduler = make_scheduler(graph);

        // Tasks 0 and 1 are both running under separate agents.
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );
        scheduler.graph.tasks[1].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(1),
            RunningTask {
                agent_handle_id: "h1".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        // Task 0 fails while task 1 is still in flight.
        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "boom".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);

        let actions = scheduler.tick();
        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
        let cancel_ids: Vec<_> = actions
            .iter()
            .filter_map(|a| {
                if let SchedulerAction::Cancel { agent_handle_id } = a {
                    Some(agent_handle_id.as_str())
                } else {
                    None
                }
            })
            .collect();
        assert!(cancel_ids.contains(&"h1"), "task 1 should be canceled");
        assert!(
            actions
                .iter()
                .any(|a| matches!(a, SchedulerAction::Done { .. }))
        );
    }
1070
    #[test]
    // Skip strategy: the failed task and its dependents become Skipped
    // instead of failing the graph.
    fn test_failure_skip_propagates() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "skip me".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);
        scheduler.tick();

        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Skipped);
        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Skipped);
    }
1101
    #[test]
    // Retry strategy with retries remaining: the task is rescheduled and
    // its retry counter increments.
    fn test_failure_retry_reschedules() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
        scheduler.graph.tasks[0].max_retries = Some(3);
        scheduler.graph.tasks[0].retry_count = 0;
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "transient".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);
        let actions = scheduler.tick();

        // Same tick may re-spawn the task or merely mark it Ready again.
        let has_spawn = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
        assert!(
            has_spawn || scheduler.graph.tasks[0].status == TaskStatus::Ready,
            "retry should produce spawn or Ready status"
        );
        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
    }
1141
    #[test]
    // First failure with max_retries = 2: retry fires and the graph keeps
    // running.
    fn test_process_event_failed_retry() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
        scheduler.graph.tasks[0].max_retries = Some(2);
        scheduler.graph.tasks[0].retry_count = 0;
        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let event = TaskEvent {
            task_id: TaskId(0),
            agent_handle_id: "h0".to_string(),
            outcome: TaskOutcome::Failed {
                error: "first failure".to_string(),
            },
        };
        scheduler.buffered_events.push_back(event);
        let actions = scheduler.tick();

        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
        let spawned = actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
        assert!(
            spawned || scheduler.graph.tasks[0].status == TaskStatus::Ready,
            "retry should emit Spawn or set Ready"
        );
        assert_eq!(scheduler.graph.status, GraphStatus::Running);
    }
1183
1184 #[test]
1185 fn test_timeout_cancels_stalled() {
1186 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1187 let mut config = make_config();
1188 config.task_timeout_secs = 1; let defs = vec![make_def("worker")];
1190 let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1191
1192 scheduler.graph.tasks[0].status = TaskStatus::Running;
1194 scheduler.running.insert(
1195 TaskId(0),
1196 RunningTask {
1197 agent_handle_id: "h0".to_string(),
1198 agent_def_name: "worker".to_string(),
1199 started_at: Instant::now() - Duration::from_secs(2), },
1201 );
1202
1203 let actions = scheduler.tick();
1204 let has_cancel = actions.iter().any(
1205 |a| matches!(a, SchedulerAction::Cancel { agent_handle_id } if agent_handle_id == "h0"),
1206 );
1207 assert!(has_cancel, "timed-out task should emit Cancel action");
1208 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1209 }
1210
    #[test]
    // cancel_all cancels every running agent, empties the running map, and
    // emits Done(Canceled).
    fn test_cancel_all() {
        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(0),
            RunningTask {
                agent_handle_id: "h0".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );
        scheduler.graph.tasks[1].status = TaskStatus::Running;
        scheduler.running.insert(
            TaskId(1),
            RunningTask {
                agent_handle_id: "h1".to_string(),
                agent_def_name: "worker".to_string(),
                started_at: Instant::now(),
            },
        );

        let actions = scheduler.cancel_all();

        assert_eq!(scheduler.graph.status, GraphStatus::Canceled);
        assert!(scheduler.running.is_empty());
        let cancel_count = actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
            .count();
        assert_eq!(cancel_count, 2);
        assert!(actions.iter().any(|a| matches!(
            a,
            SchedulerAction::Done {
                status: GraphStatus::Canceled
            }
        )));
    }
1251
    #[test]
    // A spawn failure fails the task and (under abort strategy) the graph,
    // emitting Done.
    fn test_record_spawn_failure() {
        let graph = graph_from_nodes(vec![make_node(0, &[])]);
        let mut scheduler = make_scheduler(graph);

        scheduler.graph.tasks[0].status = TaskStatus::Running;

        let actions = scheduler.record_spawn_failure(TaskId(0), "spawn error");
        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
        assert!(
            actions
                .iter()
                .any(|a| matches!(a, SchedulerAction::Done { .. }))
        );
    }
1270
1271 #[test]
1272 fn test_build_prompt_no_deps() {
1273 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1274 let scheduler = make_scheduler(graph);
1275 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[0]);
1276 assert_eq!(prompt, "description for task 0");
1277 }
1278
    #[test]
    // A completed dependency's output longer than the context budget is
    // embedded truncated inside the <completed-dependencies> block.
    fn test_build_prompt_with_deps_and_truncation() {
        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
        graph.tasks[0].status = TaskStatus::Completed;
        // 200-char output vs a 50-char budget forces truncation.
        graph.tasks[0].result = Some(TaskResult {
            output: "x".repeat(200),
            artifacts: vec![],
            duration_ms: 10,
            agent_id: None,
            agent_def: None,
        });

        let config = crate::config::OrchestrationConfig {
            dependency_context_budget: 50,
            ..make_config()
        };
        let scheduler = DagScheduler::new(
            graph,
            &config,
            Box::new(FirstRouter),
            vec![make_def("worker")],
        )
        .unwrap();

        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
        assert!(prompt.contains("<completed-dependencies>"));
        assert!(prompt.contains("[truncated:"));
        assert!(prompt.contains("Your task:"));
    }
1309
1310 #[test]
1311 fn test_duration_ms_computed_correctly() {
1312 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1314 let mut scheduler = make_scheduler(graph);
1315
1316 scheduler.graph.tasks[0].status = TaskStatus::Running;
1317 scheduler.running.insert(
1318 TaskId(0),
1319 RunningTask {
1320 agent_handle_id: "h0".to_string(),
1321 agent_def_name: "worker".to_string(),
1322 started_at: Instant::now() - Duration::from_millis(50),
1323 },
1324 );
1325
1326 let event = TaskEvent {
1327 task_id: TaskId(0),
1328 agent_handle_id: "h0".to_string(),
1329 outcome: TaskOutcome::Completed {
1330 output: "result".to_string(),
1331 artifacts: vec![],
1332 },
1333 };
1334 scheduler.buffered_events.push_back(event);
1335 scheduler.tick();
1336
1337 let result = scheduler.graph.tasks[0].result.as_ref().unwrap();
1338 assert!(
1339 result.duration_ms > 0,
1340 "duration_ms should be > 0, got {}",
1341 result.duration_ms
1342 );
1343 }
1344
1345 #[test]
1346 fn test_utf8_safe_truncation() {
1347 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1349 graph.tasks[0].status = TaskStatus::Completed;
1350 let unicode_output = "日本語テスト".repeat(100);
1352 graph.tasks[0].result = Some(TaskResult {
1353 output: unicode_output,
1354 artifacts: vec![],
1355 duration_ms: 10,
1356 agent_id: None,
1357 agent_def: None,
1358 });
1359
1360 let config = crate::config::OrchestrationConfig {
1363 dependency_context_budget: 500,
1364 ..make_config()
1365 };
1366 let scheduler = DagScheduler::new(
1367 graph,
1368 &config,
1369 Box::new(FirstRouter),
1370 vec![make_def("worker")],
1371 )
1372 .unwrap();
1373
1374 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1376 assert!(
1377 prompt.contains("日"),
1378 "Japanese characters should be in the prompt after safe truncation"
1379 );
1380 }
1381
1382 #[test]
1383 fn test_no_agent_marks_task_failed() {
1384 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1386 let mut scheduler = make_scheduler_with_router(graph, Box::new(NoneRouter));
1387 let actions = scheduler.tick();
1388 assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1389 assert!(
1390 actions
1391 .iter()
1392 .any(|a| matches!(a, SchedulerAction::Done { .. }))
1393 );
1394 }
1395
1396 #[test]
1397 fn test_stale_event_rejected() {
1398 let graph = graph_from_nodes(vec![make_node(0, &[])]);
1400 let mut scheduler = make_scheduler(graph);
1401
1402 scheduler.graph.tasks[0].status = TaskStatus::Running;
1404 scheduler.running.insert(
1405 TaskId(0),
1406 RunningTask {
1407 agent_handle_id: "current-handle".to_string(),
1408 agent_def_name: "worker".to_string(),
1409 started_at: Instant::now(),
1410 },
1411 );
1412
1413 let stale_event = TaskEvent {
1415 task_id: TaskId(0),
1416 agent_handle_id: "old-handle".to_string(),
1417 outcome: TaskOutcome::Completed {
1418 output: "stale output".to_string(),
1419 artifacts: vec![],
1420 },
1421 };
1422 scheduler.buffered_events.push_back(stale_event);
1423 let actions = scheduler.tick();
1424
1425 assert_ne!(
1427 scheduler.graph.tasks[0].status,
1428 TaskStatus::Completed,
1429 "stale event must not complete the task"
1430 );
1431 let has_done = actions
1433 .iter()
1434 .any(|a| matches!(a, SchedulerAction::Done { .. }));
1435 assert!(
1436 !has_done,
1437 "no Done action should be emitted for a stale event"
1438 );
1439 assert!(
1441 scheduler.running.contains_key(&TaskId(0)),
1442 "running task must remain after stale event"
1443 );
1444 }
1445
1446 #[test]
1447 fn test_build_prompt_chars_count_in_truncation_message() {
1448 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1452 graph.tasks[0].status = TaskStatus::Completed;
1453 let output = "x".repeat(200);
1456 graph.tasks[0].result = Some(TaskResult {
1457 output,
1458 artifacts: vec![],
1459 duration_ms: 10,
1460 agent_id: None,
1461 agent_def: None,
1462 });
1463
1464 let config = crate::config::OrchestrationConfig {
1465 dependency_context_budget: 10, ..make_config()
1467 };
1468 let scheduler = DagScheduler::new(
1469 graph,
1470 &config,
1471 Box::new(FirstRouter),
1472 vec![make_def("worker")],
1473 )
1474 .unwrap();
1475
1476 let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1477 assert!(
1479 prompt.contains("chars total"),
1480 "truncation message must use 'chars total' label. Prompt: {prompt}"
1481 );
1482 assert!(
1483 prompt.contains("[truncated:"),
1484 "prompt must contain truncation notice. Prompt: {prompt}"
1485 );
1486 }
1487
1488 #[test]
1491 fn test_resume_from_accepts_paused_graph() {
1492 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1493 graph.status = GraphStatus::Paused;
1494 graph.tasks[0].status = TaskStatus::Pending;
1495
1496 let scheduler =
1497 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1498 .expect("resume_from should accept Paused graph");
1499 assert_eq!(scheduler.graph.status, GraphStatus::Running);
1500 }
1501
1502 #[test]
1503 fn test_resume_from_accepts_failed_graph() {
1504 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1505 graph.status = GraphStatus::Failed;
1506 graph.tasks[0].status = TaskStatus::Failed;
1507
1508 let scheduler =
1509 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1510 .expect("resume_from should accept Failed graph");
1511 assert_eq!(scheduler.graph.status, GraphStatus::Running);
1512 }
1513
1514 #[test]
1515 fn test_resume_from_rejects_completed_graph() {
1516 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1517 graph.status = GraphStatus::Completed;
1518
1519 let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1520 .unwrap_err();
1521 assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1522 }
1523
1524 #[test]
1525 fn test_resume_from_rejects_canceled_graph() {
1526 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1527 graph.status = GraphStatus::Canceled;
1528
1529 let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1530 .unwrap_err();
1531 assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1532 }
1533
1534 #[test]
1535 fn test_resume_from_reconstructs_running_tasks() {
1536 let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1538 graph.status = GraphStatus::Paused;
1539 graph.tasks[0].status = TaskStatus::Running;
1540 graph.tasks[0].assigned_agent = Some("handle-abc".to_string());
1541 graph.tasks[0].agent_hint = Some("worker".to_string());
1542 graph.tasks[1].status = TaskStatus::Pending;
1543
1544 let scheduler =
1545 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1546 .expect("should succeed");
1547
1548 assert!(
1549 scheduler.running.contains_key(&TaskId(0)),
1550 "Running task must be reconstructed in the running map (IC1)"
1551 );
1552 assert_eq!(scheduler.running[&TaskId(0)].agent_handle_id, "handle-abc");
1553 assert!(
1554 !scheduler.running.contains_key(&TaskId(1)),
1555 "Pending task must not appear in running map"
1556 );
1557 }
1558
1559 #[test]
1560 fn test_resume_from_sets_status_running() {
1561 let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1563 graph.status = GraphStatus::Paused;
1564
1565 let scheduler =
1566 DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1567 .unwrap();
1568 assert_eq!(scheduler.graph.status, GraphStatus::Running);
1569 }
1570}