1use crate::workflow::task::{TaskId, WorkflowTask};
7use petgraph::algo::toposort as petgraph_toposort;
8use petgraph::graph::{DiGraph, NodeIndex};
9use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11use thiserror::Error;
12
13#[derive(Error, Debug)]
15pub enum WorkflowError {
16 #[error("Cycle detected in workflow involving tasks: {0:?}")]
18 CycleDetected(Vec<TaskId>),
19
20 #[error("Task not found: {0}")]
22 TaskNotFound(TaskId),
23
24 #[error("Workflow cannot be empty")]
26 EmptyWorkflow,
27
28 #[error("Missing dependency: {0}")]
30 MissingDependency(TaskId),
31
32 #[error("Checkpoint corrupted: {0}")]
34 CheckpointCorrupted(String),
35
36 #[error("Checkpoint not found: {0}")]
38 CheckpointNotFound(String),
39
40 #[error("Workflow structure changed: {0}")]
42 WorkflowChanged(String),
43
44 #[error("Timeout: {0}")]
46 Timeout(#[from] crate::workflow::timeout::TimeoutError),
47
48 #[error("Task execution failed: {0}")]
50 TaskFailed(String),
51}
52
53#[derive(Clone)]
57pub(in crate::workflow) struct TaskNode {
58 id: TaskId,
59 pub(in crate::workflow) name: String,
60 dependencies: Vec<TaskId>,
61 task: Arc<dyn WorkflowTask>,
62}
63
64impl TaskNode {
65 pub(in crate::workflow) fn id(&self) -> &TaskId {
67 &self.id
68 }
69
70 pub(in crate::workflow) fn task(&self) -> &Arc<dyn WorkflowTask> {
72 &self.task
73 }
74}
75
76pub struct Workflow {
92 pub(in crate::workflow) graph: DiGraph<TaskNode, ()>,
94 pub(in crate::workflow) task_map: HashMap<TaskId, NodeIndex>,
96}
97
98impl Workflow {
99 pub fn new() -> Self {
101 Self {
102 graph: DiGraph::new(),
103 task_map: HashMap::new(),
104 }
105 }
106
107 pub fn add_task(&mut self, task: Box<dyn WorkflowTask>) -> NodeIndex {
128 let id = task.id();
129 let name = task.name().to_string();
130 let dependencies = task.dependencies();
131
132 let task_arc = Arc::from(task);
134
135 let node = TaskNode {
136 id: id.clone(),
137 name,
138 dependencies,
139 task: task_arc,
140 };
141
142 let idx = self.graph.add_node(node);
143 self.task_map.insert(id, idx);
144
145 idx
146 }
147
148 pub fn add_dependency(
171 &mut self,
172 from_task: impl Into<TaskId>,
173 to_task: impl Into<TaskId>,
174 ) -> Result<(), WorkflowError> {
175 let from = from_task.into();
176 let to = to_task.into();
177
178 let from_idx = *self
180 .task_map
181 .get(&from)
182 .ok_or_else(|| WorkflowError::TaskNotFound(from.clone()))?;
183 let to_idx = *self
184 .task_map
185 .get(&to)
186 .ok_or_else(|| WorkflowError::TaskNotFound(to.clone()))?;
187
188 self.graph.add_edge(from_idx, to_idx, ());
190
191 match petgraph_toposort(&self.graph, None) {
193 Ok(_) => Ok(()),
194 Err(_) => {
195 self.graph.remove_edge(
197 self.graph
198 .find_edge(from_idx, to_idx)
199 .expect("Edge just added"),
200 );
201
202 let cycle_path = self.find_cycle_path(from_idx, to_idx);
204 Err(WorkflowError::CycleDetected(cycle_path))
205 }
206 }
207 }
208
209 pub fn execution_order(&self) -> Result<Vec<TaskId>, WorkflowError> {
220 if self.graph.node_count() == 0 {
221 return Err(WorkflowError::EmptyWorkflow);
222 }
223
224 let sorted_indices = petgraph_toposort(&self.graph, None)
226 .map_err(|_| WorkflowError::CycleDetected(self.detect_cycle_nodes()))?;
227
228 let mut order = Vec::new();
230 for idx in sorted_indices {
231 if let Some(node) = self.graph.node_weight(idx) {
232 order.push(node.id.clone());
233 }
234 }
235
236 Ok(order)
237 }
238
239 pub fn execution_layers(&self) -> Result<Vec<Vec<TaskId>>, WorkflowError> {
266 if self.graph.node_count() == 0 {
267 return Err(WorkflowError::EmptyWorkflow);
268 }
269
270 let _sorted_indices = petgraph_toposort(&self.graph, None)
272 .map_err(|_| WorkflowError::CycleDetected(self.detect_cycle_nodes()))?;
273
274 let roots: Vec<NodeIndex> = self
276 .graph
277 .node_indices()
278 .filter(|&idx| {
279 self.graph
280 .neighbors_directed(idx, petgraph::Direction::Incoming)
281 .count()
282 == 0
283 })
284 .collect();
285
286 if roots.is_empty() && self.graph.node_count() > 0 {
287 return Err(WorkflowError::CycleDetected(self.detect_cycle_nodes()));
289 }
290
291 let mut distances: HashMap<NodeIndex, usize> = HashMap::new();
294
295 for &root in &roots {
297 distances.insert(root, 0);
298 }
299
300 let sorted_indices = petgraph_toposort(&self.graph, None).unwrap();
302 for idx in sorted_indices {
303 let max_incoming = self
304 .graph
305 .neighbors_directed(idx, petgraph::Direction::Incoming)
306 .filter_map(|neighbor| distances.get(&neighbor).copied())
307 .max()
308 .unwrap_or(0);
309
310 let current_distance = distances.get(&idx).copied().unwrap_or(0);
311 distances.insert(idx, std::cmp::max(current_distance, max_incoming + 1));
312
313 for neighbor in self.graph.neighbors_directed(idx, petgraph::Direction::Outgoing) {
315 let neighbor_dist = distances.get(&neighbor).copied().unwrap_or(0);
316 if distances[&idx] + 1 > neighbor_dist {
317 distances.insert(neighbor, distances[&idx] + 1);
318 }
319 }
320 }
321
322 let mut layer_map: HashMap<usize, Vec<TaskId>> = HashMap::new();
324 for (idx, distance) in &distances {
325 if let Some(node) = self.graph.node_weight(*idx) {
326 let layer = if *distance == 0 { 0 } else { distance - 1 };
327 layer_map
328 .entry(layer)
329 .or_insert_with(Vec::new)
330 .push(node.id.clone());
331 }
332 }
333
334 let mut layers: Vec<(usize, Vec<TaskId>)> = layer_map.into_iter().collect();
336 layers.sort_by_key(|(layer, _)| *layer);
337
338 let result: Vec<Vec<TaskId>> = layers.into_iter().map(|(_, tasks)| tasks).collect();
340
341 Ok(result)
342 }
343
344 pub(in crate::workflow) fn ready_tasks(&self) -> Vec<&TaskNode> {
349 self.graph
350 .node_indices()
351 .filter(|&idx| self.graph.neighbors_directed(idx, petgraph::Direction::Incoming).count() == 0)
352 .filter_map(|idx| self.graph.node_weight(idx))
353 .collect()
354 }
355
356 pub fn task_ids(&self) -> Vec<TaskId> {
358 self.task_map.keys().cloned().collect()
359 }
360
361 pub fn task_count(&self) -> usize {
363 self.graph.node_count()
364 }
365
366 pub fn contains_task(&self, id: &TaskId) -> bool {
368 self.task_map.contains_key(id)
369 }
370
371 pub fn task_dependencies(&self, id: &TaskId) -> Option<Vec<TaskId>> {
376 self.task_map.get(id).map(|&idx| {
377 self.graph
378 .neighbors_directed(idx, petgraph::Direction::Incoming)
379 .filter_map(|neighbor_idx| self.graph.node_weight(neighbor_idx))
380 .map(|node| node.id.clone())
381 .collect()
382 })
383 }
384
385 pub fn task_name(&self, id: &TaskId) -> Option<String> {
387 self.task_map
388 .get(id)
389 .and_then(|&idx| self.graph.node_weight(idx))
390 .map(|node| node.name.clone())
391 }
392
393 pub fn apply_suggestions(
408 &mut self,
409 suggestions: Vec<crate::workflow::auto_detect::DependencySuggestion>,
410 ) -> Result<usize, WorkflowError> {
411 use crate::workflow::auto_detect::DependencyReason;
412
413 let mut applied = 0;
414
415 for suggestion in suggestions {
416 if self.task_dependencies(&suggestion.to_task)
418 .as_ref()
419 .map(|deps| deps.contains(&suggestion.from_task))
420 .unwrap_or(false)
421 {
422 continue;
423 }
424
425 self.add_dependency(suggestion.from_task, suggestion.to_task)?;
427 applied += 1;
428 }
429
430 Ok(applied)
431 }
432
433 pub fn preview_suggestions(
443 &self,
444 suggestions: &[crate::workflow::auto_detect::DependencySuggestion],
445 ) -> Vec<String> {
446 use crate::workflow::auto_detect::DependencyReason;
447
448 suggestions
449 .iter()
450 .map(|s| {
451 let reason_text = match &s.reason {
452 DependencyReason::SymbolImpact { symbol, hops } => {
453 format!("symbol '{}' impact ({} hops)", symbol, hops)
454 }
455 DependencyReason::Reference { symbol } => {
456 format!("reference to '{}'", symbol)
457 }
458 DependencyReason::Call { function } => {
459 format!("call to '{}'", function)
460 }
461 };
462
463 format!(
464 "Task '{}' should depend on task '{}' (reason: {}, confidence: {:.2})",
465 s.to_task, s.from_task, reason_text, s.confidence
466 )
467 })
468 .collect()
469 }
470
471 fn find_cycle_path(&self, start: NodeIndex, end: NodeIndex) -> Vec<TaskId> {
475 let mut visited = HashSet::new();
477 let mut queue = vec![(end, vec![end])];
478
479 while let Some((current, path)) = queue.pop() {
480 if current == start {
481 return path
483 .iter()
484 .filter_map(|&idx| {
485 self.graph.node_weight(idx).map(|node| node.id.clone())
486 })
487 .collect();
488 }
489
490 if visited.contains(¤t) {
491 continue;
492 }
493 visited.insert(current);
494
495 for neighbor in self
497 .graph
498 .neighbors_directed(current, petgraph::Direction::Incoming)
499 {
500 if !visited.contains(&neighbor) {
501 let mut new_path = path.clone();
502 new_path.push(neighbor);
503 queue.push((neighbor, new_path));
504 }
505 }
506 }
507
508 vec![
510 self.graph[start].id.clone(),
511 self.graph[end].id.clone(),
512 ]
513 }
514
515 fn detect_cycle_nodes(&self) -> Vec<TaskId> {
517 let sccs = petgraph::algo::tarjan_scc(&self.graph);
519
520 sccs
522 .into_iter()
523 .filter(|scc| scc.len() > 1)
524 .flat_map(|scc| {
525 scc.into_iter()
526 .filter_map(|idx| self.graph.node_weight(idx))
527 .map(|node| node.id.clone())
528 })
529 .collect()
530 }
531}
532
533impl Default for Workflow {
534 fn default() -> Self {
535 Self::new()
536 }
537}
538
539#[cfg(test)]
540mod tests {
541 use super::*;
542 use crate::workflow::task::{TaskContext, TaskError, TaskResult, WorkflowTask};
543 use async_trait::async_trait;
544
545 struct MockTask {
547 id: TaskId,
548 name: String,
549 deps: Vec<TaskId>,
550 }
551
552 impl MockTask {
553 fn new(id: impl Into<TaskId>, name: &str) -> Self {
554 Self {
555 id: id.into(),
556 name: name.to_string(),
557 deps: Vec::new(),
558 }
559 }
560
561 fn with_dep(mut self, dep: impl Into<TaskId>) -> Self {
562 self.deps.push(dep.into());
563 self
564 }
565 }
566
567 #[async_trait]
568 impl WorkflowTask for MockTask {
569 async fn execute(&self, _context: &TaskContext) -> Result<TaskResult, TaskError> {
570 Ok(TaskResult::Success)
571 }
572
573 fn id(&self) -> TaskId {
574 self.id.clone()
575 }
576
577 fn name(&self) -> &str {
578 &self.name
579 }
580
581 fn dependencies(&self) -> Vec<TaskId> {
582 self.deps.clone()
583 }
584 }
585
586 #[test]
587 fn test_workflow_creation() {
588 let workflow = Workflow::new();
589 assert_eq!(workflow.task_count(), 0);
590 assert!(workflow.execution_order().is_err());
591 }
592
593 #[test]
594 fn test_add_task() {
595 let mut workflow = Workflow::new();
596 let task = Box::new(MockTask::new("task-1", "Task 1"));
597
598 workflow.add_task(task);
599
600 assert_eq!(workflow.task_count(), 1);
601 assert!(workflow.contains_task(&TaskId::new("task-1")));
602 }
603
604 #[test]
605 fn test_add_multiple_tasks() {
606 let mut workflow = Workflow::new();
607
608 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
609 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
610 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
611
612 assert_eq!(workflow.task_count(), 3);
613 }
614
615 #[test]
616 fn test_add_dependency() {
617 let mut workflow = Workflow::new();
618
619 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
620 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
621
622 let result = workflow.add_dependency("a", "b");
623 assert!(result.is_ok());
624 }
625
626 #[test]
627 fn test_cycle_detection_on_add() {
628 let mut workflow = Workflow::new();
629
630 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
631 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
632 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
633
634 workflow.add_dependency("a", "b").unwrap();
636 workflow.add_dependency("b", "c").unwrap();
637
638 let result = workflow.add_dependency("c", "a");
639 assert!(matches!(result, Err(WorkflowError::CycleDetected(_))));
640 }
641
642 #[test]
643 fn test_topological_sort() {
644 let mut workflow = Workflow::new();
645
646 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
647 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
648 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
649
650 workflow.add_dependency("a", "b").unwrap();
651 workflow.add_dependency("a", "c").unwrap();
652
653 let order = workflow.execution_order().unwrap();
654 assert_eq!(order.len(), 3);
655
656 assert_eq!(order[0], TaskId::new("a"));
658 }
659
660 #[test]
661 fn test_ready_tasks() {
662 let mut workflow = Workflow::new();
663
664 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
665 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
666 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
667
668 workflow.add_dependency("a", "b").unwrap();
669
670 let ready = workflow.ready_tasks();
671 assert_eq!(ready.len(), 2); let ready_ids: Vec<&TaskId> = ready.iter().map(|node| &node.id).collect();
674 assert!(ready_ids.contains(&&TaskId::new("a")));
675 assert!(ready_ids.contains(&&TaskId::new("c")));
676 }
677
678 #[test]
679 fn test_execution_order_with_complex_dag() {
680 let mut workflow = Workflow::new();
681
682 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
684 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
685 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
686 workflow.add_task(Box::new(MockTask::new("d", "Task D")));
687
688 workflow.add_dependency("a", "b").unwrap();
689 workflow.add_dependency("a", "c").unwrap();
690 workflow.add_dependency("b", "d").unwrap();
691 workflow.add_dependency("c", "d").unwrap();
692
693 let order = workflow.execution_order().unwrap();
694 assert_eq!(order.len(), 4);
695
696 let pos_a = order.iter().position(|id| id == &TaskId::new("a")).unwrap();
698 let pos_b = order.iter().position(|id| id == &TaskId::new("b")).unwrap();
699 let pos_c = order.iter().position(|id| id == &TaskId::new("c")).unwrap();
700 let pos_d = order.iter().position(|id| id == &TaskId::new("d")).unwrap();
701
702 assert!(pos_a < pos_b);
703 assert!(pos_a < pos_c);
704 assert!(pos_b < pos_d);
705 assert!(pos_c < pos_d);
706 }
707
708 #[test]
709 fn test_dependency_nonexistent_task() {
710 let mut workflow = Workflow::new();
711 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
712
713 let result = workflow.add_dependency("a", "nonexistent");
714 assert!(matches!(result, Err(WorkflowError::TaskNotFound(_))));
715
716 let result = workflow.add_dependency("nonexistent", "a");
717 assert!(matches!(result, Err(WorkflowError::TaskNotFound(_))));
718 }
719
720 #[test]
721 fn test_self_cycle_detection() {
722 let mut workflow = Workflow::new();
723 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
724
725 let result = workflow.add_dependency("a", "a");
727 let _ = result;
731 }
732
733 #[test]
734 fn test_apply_suggestions() {
735 use crate::workflow::auto_detect::{DependencySuggestion, DependencyReason};
736
737 let mut workflow = Workflow::new();
738 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
739 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
740 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
741
742 let suggestions = vec![
743 DependencySuggestion {
744 from_task: TaskId::new("a"),
745 to_task: TaskId::new("b"),
746 reason: DependencyReason::SymbolImpact {
747 symbol: "test".to_string(),
748 hops: 1,
749 },
750 confidence: 0.9,
751 },
752 DependencySuggestion {
753 from_task: TaskId::new("b"),
754 to_task: TaskId::new("c"),
755 reason: DependencyReason::Reference {
756 symbol: "test".to_string(),
757 },
758 confidence: 0.85,
759 },
760 ];
761
762 let applied = workflow.apply_suggestions(suggestions).unwrap();
763 assert_eq!(applied, 2);
764
765 let deps_b = workflow.task_dependencies(&TaskId::new("b")).unwrap();
767 assert!(deps_b.contains(&TaskId::new("a")));
768
769 let deps_c = workflow.task_dependencies(&TaskId::new("c")).unwrap();
770 assert!(deps_c.contains(&TaskId::new("b")));
771 }
772
773 #[test]
774 fn test_apply_suggestions_skips_existing() {
775 use crate::workflow::auto_detect::{DependencySuggestion, DependencyReason};
776
777 let mut workflow = Workflow::new();
778 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
779 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
780
781 workflow.add_dependency("a", "b").unwrap();
783
784 let suggestions = vec![
785 DependencySuggestion {
786 from_task: TaskId::new("a"),
787 to_task: TaskId::new("b"),
788 reason: DependencyReason::SymbolImpact {
789 symbol: "test".to_string(),
790 hops: 1,
791 },
792 confidence: 0.9,
793 },
794 ];
795
796 let applied = workflow.apply_suggestions(suggestions).unwrap();
797 assert_eq!(applied, 0); }
799
800 #[test]
801 fn test_preview_suggestions() {
802 use crate::workflow::auto_detect::{DependencySuggestion, DependencyReason};
803
804 let workflow = Workflow::new();
805
806 let suggestions = vec![
807 DependencySuggestion {
808 from_task: TaskId::new("a"),
809 to_task: TaskId::new("b"),
810 reason: DependencyReason::SymbolImpact {
811 symbol: "test_func".to_string(),
812 hops: 2,
813 },
814 confidence: 0.85,
815 },
816 DependencySuggestion {
817 from_task: TaskId::new("b"),
818 to_task: TaskId::new("c"),
819 reason: DependencyReason::Reference {
820 symbol: "test_struct".to_string(),
821 },
822 confidence: 0.9,
823 },
824 ];
825
826 let preview = workflow.preview_suggestions(&suggestions);
827 assert_eq!(preview.len(), 2);
828
829 assert!(preview[0].contains("'b' should depend on task 'a'"));
830 assert!(preview[0].contains("test_func"));
831 assert!(preview[0].contains("2 hops"));
832
833 assert!(preview[1].contains("'c' should depend on task 'b'"));
834 assert!(preview[1].contains("test_struct"));
835 assert!(preview[1].contains("reference"));
836 }
837
838 #[test]
841 fn test_execution_layers_empty_workflow() {
842 let workflow = Workflow::new();
843 let result = workflow.execution_layers();
844 assert!(matches!(result, Err(WorkflowError::EmptyWorkflow)));
845 }
846
847 #[test]
848 fn test_execution_layers_single_task() {
849 let mut workflow = Workflow::new();
850 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
851
852 let layers = workflow.execution_layers().unwrap();
853 assert_eq!(layers.len(), 1);
854 assert_eq!(layers[0].len(), 1);
855 assert_eq!(layers[0][0], TaskId::new("a"));
856 }
857
858 #[test]
859 fn test_execution_layers_two_independent_tasks() {
860 let mut workflow = Workflow::new();
861 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
862 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
863
864 let layers = workflow.execution_layers().unwrap();
865 assert_eq!(layers.len(), 1);
866 assert_eq!(layers[0].len(), 2);
867 assert!(layers[0].contains(&TaskId::new("a")));
868 assert!(layers[0].contains(&TaskId::new("b")));
869 }
870
871 #[test]
872 fn test_execution_layers_linear_chain() {
873 let mut workflow = Workflow::new();
874 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
875 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
876 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
877
878 workflow.add_dependency("a", "b").unwrap();
879 workflow.add_dependency("b", "c").unwrap();
880
881 let layers = workflow.execution_layers().unwrap();
882 assert_eq!(layers.len(), 3);
883 assert_eq!(layers[0], vec![TaskId::new("a")]);
884 assert_eq!(layers[1], vec![TaskId::new("b")]);
885 assert_eq!(layers[2], vec![TaskId::new("c")]);
886 }
887
888 #[test]
889 fn test_execution_layers_diamond_pattern() {
890 let mut workflow = Workflow::new();
891
892 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
894 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
895 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
896 workflow.add_task(Box::new(MockTask::new("d", "Task D")));
897
898 workflow.add_dependency("a", "b").unwrap();
899 workflow.add_dependency("a", "c").unwrap();
900 workflow.add_dependency("b", "d").unwrap();
901 workflow.add_dependency("c", "d").unwrap();
902
903 let layers = workflow.execution_layers().unwrap();
904 assert_eq!(layers.len(), 3);
905
906 assert_eq!(layers[0], vec![TaskId::new("a")]);
908
909 assert_eq!(layers[1].len(), 2);
911 assert!(layers[1].contains(&TaskId::new("b")));
912 assert!(layers[1].contains(&TaskId::new("c")));
913
914 assert_eq!(layers[2], vec![TaskId::new("d")]);
916 }
917
918 #[test]
919 fn test_execution_layers_fan_out() {
920 let mut workflow = Workflow::new();
921
922 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
924 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
925 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
926 workflow.add_task(Box::new(MockTask::new("d", "Task D")));
927
928 workflow.add_dependency("a", "b").unwrap();
929 workflow.add_dependency("a", "c").unwrap();
930 workflow.add_dependency("a", "d").unwrap();
931
932 let layers = workflow.execution_layers().unwrap();
933 assert_eq!(layers.len(), 2);
934 assert_eq!(layers[0], vec![TaskId::new("a")]);
935 assert_eq!(layers[1].len(), 3);
936 }
937
938 #[test]
939 fn test_execution_layers_fan_in() {
940 let mut workflow = Workflow::new();
941
942 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
944 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
945 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
946 workflow.add_task(Box::new(MockTask::new("d", "Task D")));
947
948 workflow.add_dependency("a", "d").unwrap();
949 workflow.add_dependency("b", "d").unwrap();
950 workflow.add_dependency("c", "d").unwrap();
951
952 let layers = workflow.execution_layers().unwrap();
953 assert_eq!(layers.len(), 2);
954 assert_eq!(layers[0].len(), 3); assert_eq!(layers[1], vec![TaskId::new("d")]);
956 }
957
958 #[test]
959 fn test_execution_layers_complex_dag() {
960 let mut workflow = Workflow::new();
961
962 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
971 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
972 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
973 workflow.add_task(Box::new(MockTask::new("d", "Task D")));
974 workflow.add_task(Box::new(MockTask::new("e", "Task E")));
975 workflow.add_task(Box::new(MockTask::new("f", "Task F")));
976
977 workflow.add_dependency("a", "b").unwrap();
978 workflow.add_dependency("a", "c").unwrap();
979 workflow.add_dependency("b", "d").unwrap();
980 workflow.add_dependency("c", "e").unwrap();
981 workflow.add_dependency("d", "f").unwrap();
982 workflow.add_dependency("e", "f").unwrap();
983
984 let layers = workflow.execution_layers().unwrap();
985 assert_eq!(layers.len(), 4);
986 assert_eq!(layers[0], vec![TaskId::new("a")]);
987 assert_eq!(layers[1].len(), 2); assert_eq!(layers[2].len(), 2); assert_eq!(layers[3], vec![TaskId::new("f")]);
990 }
991
992 #[test]
993 fn test_execution_layers_with_cycle() {
994 let mut workflow = Workflow::new();
995
996 workflow.add_task(Box::new(MockTask::new("a", "Task A")));
997 workflow.add_task(Box::new(MockTask::new("b", "Task B")));
998 workflow.add_task(Box::new(MockTask::new("c", "Task C")));
999
1000 let a_idx = workflow.task_map.get(&TaskId::new("a")).copied().unwrap();
1004 let b_idx = workflow.task_map.get(&TaskId::new("b")).copied().unwrap();
1005 let c_idx = workflow.task_map.get(&TaskId::new("c")).copied().unwrap();
1006
1007 workflow.graph.add_edge(a_idx, b_idx, ());
1008 workflow.graph.add_edge(b_idx, c_idx, ());
1009 workflow.graph.add_edge(c_idx, a_idx, ()); let result = workflow.execution_layers();
1012 assert!(matches!(result, Err(WorkflowError::CycleDetected(_))));
1013 }
1014}