1use crate::types::Time;
11
12#[derive(Debug, Clone, PartialEq, Eq)]
14pub enum WorkflowError {
15 NodeNotFound(u64),
17 InvalidTransition {
19 node: u64,
21 from: NodeState,
23 to: NodeState,
25 },
26 TimedOut,
28 DependenciesNotSatisfied {
30 node: u64,
32 deps: Vec<u64>,
34 },
35}
36
37impl std::fmt::Display for WorkflowError {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 WorkflowError::NodeNotFound(n) => write!(f, "Node {n} not found"),
41 WorkflowError::InvalidTransition { node, from, to } => {
42 write!(f, "Invalid transition for node {node}: {from:?} -> {to:?}")
43 }
44 WorkflowError::TimedOut => write!(f, "Workflow timed out"),
45 WorkflowError::DependenciesNotSatisfied { node, deps } => {
46 write!(f, "Node {node} has unsatisfied dependencies: {deps:?}")
47 }
48 }
49 }
50}
51
52impl std::error::Error for WorkflowError {}
53
/// Overall status of a workflow.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum WorkflowStatus {
    /// Neither terminal status has been reached yet; this is the default.
    #[default]
    Running,
    /// Terminal status: the workflow finished successfully.
    Success,
    /// Terminal status: the workflow failed.
    Failure,
}

impl WorkflowStatus {
    /// Returns `true` only for [`WorkflowStatus::Running`].
    #[inline]
    pub fn is_running(&self) -> bool {
        *self == Self::Running
    }

    /// Returns `true` for [`WorkflowStatus::Success`] and [`WorkflowStatus::Failure`].
    #[inline]
    pub fn is_terminal(&self) -> bool {
        match self {
            Self::Running => false,
            Self::Success | Self::Failure => true,
        }
    }

    /// Returns `true` only for [`WorkflowStatus::Success`].
    #[inline]
    pub fn is_success(&self) -> bool {
        *self == Self::Success
    }

    /// Returns `true` only for [`WorkflowStatus::Failure`].
    #[inline]
    pub fn is_failure(&self) -> bool {
        *self == Self::Failure
    }
}
93
/// Execution state of a single workflow node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum NodeState {
    /// The node has not been started (or is waiting to be retried); the default.
    #[default]
    Pending,
    /// The node has been started and has not finished yet.
    Active,
    /// Terminal state: the node finished successfully.
    Completed,
    /// Terminal state: the node failed.
    Failed,
}

impl NodeState {
    /// Returns `true` only for [`NodeState::Pending`].
    #[inline]
    pub fn is_pending(&self) -> bool {
        *self == Self::Pending
    }

    /// Returns `true` only for [`NodeState::Active`].
    #[inline]
    pub fn is_active(&self) -> bool {
        *self == Self::Active
    }

    /// Returns `true` only for [`NodeState::Completed`].
    #[inline]
    pub fn is_completed(&self) -> bool {
        *self == Self::Completed
    }

    /// Returns `true` only for [`NodeState::Failed`].
    #[inline]
    pub fn is_failed(&self) -> bool {
        *self == Self::Failed
    }

    /// Returns `true` for [`NodeState::Completed`] and [`NodeState::Failed`].
    #[inline]
    pub fn is_terminal(&self) -> bool {
        match self {
            Self::Pending | Self::Active => false,
            Self::Completed | Self::Failed => true,
        }
    }
}
141
/// Directed edge between two workflow nodes, identified by their ids.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct Edge {
    /// Id of the node the edge starts at.
    pub(crate) source: u64,

    /// Id of the node the edge points to.
    pub(crate) target: u64,
}

impl Edge {
    /// Creates an edge going from `source` to `target`.
    pub fn new(source: u64, target: u64) -> Self {
        Self { source, target }
    }

    /// Returns the id of the node the edge starts at.
    #[inline]
    pub fn from(&self) -> u64 {
        let Self { source, .. } = *self;
        source
    }

    /// Returns the id of the node the edge points to.
    #[inline]
    pub fn to(&self) -> u64 {
        let Self { target, .. } = *self;
        target
    }
}
179
180#[derive(Debug, Clone, PartialEq, Eq, Default)]
186pub struct WorkflowDef {
187 pub(crate) nodes: Vec<u64>,
191
192 pub(crate) edges: Vec<Edge>,
196
197 predecessors: Vec<Vec<u64>>,
200
201 successors: Vec<Vec<u64>>,
204
205 node_to_index: Vec<(u64, usize)>,
207}
208
209impl WorkflowDef {
210 fn build_adjacency(&mut self) {
212 let n = self.nodes.len();
213 self.predecessors = vec![Vec::new(); n];
214 self.successors = vec![Vec::new(); n];
215 self.node_to_index = self
216 .nodes
217 .iter()
218 .enumerate()
219 .map(|(i, &id)| (id, i))
220 .collect();
221 self.node_to_index.sort_unstable_by_key(|&(id, _)| id);
223
224 for e in &self.edges {
225 if let (Some(src_idx), Some(tgt_idx)) =
226 (self.node_index(e.source), self.node_index(e.target))
227 {
228 self.predecessors[tgt_idx].push(e.source);
229 self.successors[src_idx].push(e.target);
230 }
231 }
232 }
233
234 pub fn new(nodes: Vec<u64>, edges: Vec<Edge>) -> Self {
239 let mut def = WorkflowDef {
240 nodes,
241 edges,
242 predecessors: Vec::new(),
243 successors: Vec::new(),
244 node_to_index: Vec::new(),
245 };
246 def.build_adjacency();
247 def
248 }
249
250 pub fn validated(mut nodes: Vec<u64>, edges: Vec<Edge>) -> Result<Self, WorkflowError> {
259 nodes.sort_unstable();
260 nodes.dedup();
261
262 for e in &edges {
263 if nodes.binary_search(&e.source).is_err() {
264 return Err(WorkflowError::NodeNotFound(e.source));
265 }
266 if nodes.binary_search(&e.target).is_err() {
267 return Err(WorkflowError::NodeNotFound(e.target));
268 }
269 }
270
271 let mut def = WorkflowDef {
272 nodes,
273 edges,
274 predecessors: Vec::new(),
275 successors: Vec::new(),
276 node_to_index: Vec::new(),
277 };
278 def.build_adjacency();
279 Ok(def)
280 }
281
282 #[inline]
284 pub fn node_index(&self, node_id: u64) -> Option<usize> {
285 self.node_to_index
286 .binary_search_by_key(&node_id, |&(id, _)| id)
287 .ok()
288 .map(|pos| self.node_to_index[pos].1)
289 }
290
291 pub fn require_node(&self, node_id: u64) -> Result<usize, WorkflowError> {
293 self.node_index(node_id)
294 .ok_or(WorkflowError::NodeNotFound(node_id))
295 }
296
297 pub fn is_source(&self, n: u64) -> bool {
301 match self.node_index(n) {
302 Some(idx) => self.predecessors[idx].is_empty(),
303 None => true, }
305 }
306
307 pub fn is_sink(&self, n: u64) -> bool {
311 match self.node_index(n) {
312 Some(idx) => self.successors[idx].is_empty(),
313 None => true,
314 }
315 }
316
317 pub fn dependencies(&self, n: u64) -> Vec<u64> {
321 match self.node_index(n) {
322 Some(idx) => self.predecessors[idx].clone(),
323 None => Vec::new(),
324 }
325 }
326
327 pub fn dependents(&self, n: u64) -> Vec<u64> {
331 match self.node_index(n) {
332 Some(idx) => self.successors[idx].clone(),
333 None => Vec::new(),
334 }
335 }
336
337 #[inline]
339 pub fn node_count(&self) -> usize {
340 self.nodes.len()
341 }
342
343 #[inline]
345 pub fn edge_count(&self) -> usize {
346 self.edges.len()
347 }
348
349 #[inline]
351 pub fn contains_node(&self, n: u64) -> bool {
352 self.node_index(n).is_some()
353 }
354
355 pub fn nodes(&self) -> &[u64] {
357 &self.nodes
358 }
359
360 pub fn edges(&self) -> &[Edge] {
362 &self.edges
363 }
364}
365
/// Pairs a node id with a [`NodeState`].
///
/// NOTE(review): nothing in this file constructs or reads this type;
/// presumably it is used by other modules of the crate — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NodeStateEntry {
    /// Id of the node this entry refers to.
    pub(crate) node_id: u64,
    /// State recorded for that node.
    pub(crate) state: NodeState,
}
376
/// Pairs a node id with a retry counter.
///
/// NOTE(review): nothing in this file constructs or reads this type, so
/// whether `count` means retries used or retries remaining cannot be told
/// from here — confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryEntry {
    /// Id of the node this entry refers to.
    pub(crate) node_id: u64,
    /// Retry counter recorded for that node.
    pub(crate) count: u64,
}
387
388#[derive(Debug, Clone, Default)]
395pub struct WorkflowInstance {
396 pub(crate) workflow_def: WorkflowDef,
400
401 pub(crate) status: WorkflowStatus,
405
406 pub(crate) node_states: Vec<NodeState>,
410
411 pub(crate) retries: Vec<u16>,
415
416 pub(crate) timeout_at: Time,
420
421 pending_ct: u32,
423
424 active_ct: u32,
426
427 failed_ct: u32,
429}
430
431impl WorkflowInstance {
432 pub fn new(workflow_def: WorkflowDef, timeout_at: Time, max_retries: u64) -> Self {
434 let n = workflow_def.nodes.len();
435 let node_states = vec![NodeState::Pending; n];
436 let retry_val = if max_retries > 65535u64 {
437 65535u16
438 } else {
439 max_retries as u16
440 };
441 let retries = vec![retry_val; n];
442
443 WorkflowInstance {
444 workflow_def,
445 status: WorkflowStatus::Running,
446 node_states,
447 retries,
448 timeout_at,
449 pending_ct: n as u32,
450 active_ct: 0,
451 failed_ct: 0,
452 }
453 }
454
455 pub fn running(timeout_at: Time) -> Self {
459 WorkflowInstance {
460 workflow_def: WorkflowDef::default(),
461 status: WorkflowStatus::Running,
462 node_states: Vec::new(),
463 retries: Vec::new(),
464 timeout_at,
465 pending_ct: 0,
466 active_ct: 0,
467 failed_ct: 0,
468 }
469 }
470
471 #[inline]
475 pub fn is_running(&self) -> bool {
476 self.status.is_running()
477 }
478
479 #[inline]
485 fn require_node(&self, n: u64) -> Result<usize, WorkflowError> {
486 self.workflow_def.require_node(n)
487 }
488
489 pub fn get_node_state(&self, n: u64) -> NodeState {
495 match self.workflow_def.node_index(n) {
496 Some(idx) => self.node_states[idx],
497 None => NodeState::Pending,
498 }
499 }
500
501 #[inline]
505 pub fn is_pending(&self, n: u64) -> bool {
506 self.get_node_state(n).is_pending()
507 }
508
509 #[inline]
513 pub fn is_active(&self, n: u64) -> bool {
514 self.get_node_state(n).is_active()
515 }
516
517 #[inline]
521 pub fn pending_count(&self) -> usize {
522 self.pending_ct as usize
523 }
524
525 #[inline]
529 pub fn active_count(&self) -> usize {
530 self.active_ct as usize
531 }
532
533 #[inline]
537 pub fn is_terminal(&self) -> bool {
538 self.pending_ct == 0 && self.active_ct == 0
539 }
540
541 #[inline]
545 pub fn has_failure(&self) -> bool {
546 self.failed_ct > 0
547 }
548
549 #[inline]
551 pub fn status(&self) -> WorkflowStatus {
552 self.status
553 }
554
555 #[inline]
557 pub fn timeout_at(&self) -> Time {
558 self.timeout_at
559 }
560
561 pub fn get_retries(&self, n: u64) -> u64 {
563 match self.workflow_def.node_index(n) {
564 Some(idx) => self.retries[idx] as u64,
565 None => 0,
566 }
567 }
568
569 #[inline]
571 pub fn definition(&self) -> &WorkflowDef {
572 &self.workflow_def
573 }
574
575 pub fn dependencies_satisfied(&self, n: u64) -> bool {
577 match self.workflow_def.node_index(n) {
578 Some(idx) => {
579 let preds = &self.workflow_def.predecessors[idx];
580 let mut j = 0;
581 while j < preds.len() {
582 let pred_id = preds[j];
583 if let Some(pred_idx) = self.workflow_def.node_index(pred_id) {
584 if !self.node_states[pred_idx].is_completed() {
585 return false;
586 }
587 }
588 j += 1;
589 }
590 true
591 }
592 None => true,
593 }
594 }
595
596 fn transition(&mut self, idx: usize, new_state: NodeState) {
598 let old = self.node_states[idx];
599 match old {
601 NodeState::Pending => {
602 debug_assert!(self.pending_ct > 0, "pending_ct underflow");
603 self.pending_ct = self.pending_ct.saturating_sub(1);
604 }
605 NodeState::Active => {
606 debug_assert!(self.active_ct > 0, "active_ct underflow");
607 self.active_ct = self.active_ct.saturating_sub(1);
608 }
609 NodeState::Failed => {
610 debug_assert!(self.failed_ct > 0, "failed_ct underflow");
611 self.failed_ct = self.failed_ct.saturating_sub(1);
612 }
613 NodeState::Completed => {}
614 }
615 match new_state {
617 NodeState::Pending => self.pending_ct += 1,
618 NodeState::Active => self.active_ct += 1,
619 NodeState::Failed => self.failed_ct += 1,
620 NodeState::Completed => {}
621 }
622 self.node_states[idx] = new_state;
623 }
624
625 pub fn start_node(&mut self, n: u64) -> Result<(), WorkflowError> {
635 let idx = self.require_node(n)?;
636 let state = self.node_states[idx];
637 if !state.is_pending() {
638 return Err(WorkflowError::InvalidTransition {
639 node: n,
640 from: state,
641 to: NodeState::Active,
642 });
643 }
644
645 if !self.dependencies_satisfied(n) {
646 let preds = &self.workflow_def.predecessors[idx];
647 let mut unsatisfied = Vec::new();
648 let mut j = 0;
649 while j < preds.len() {
650 let pred_id = preds[j];
651 if let Some(pi) = self.workflow_def.node_index(pred_id) {
652 if !self.node_states[pi].is_completed() {
653 unsatisfied.push(pred_id);
654 }
655 } else {
656 unsatisfied.push(pred_id);
657 }
658 j += 1;
659 }
660 return Err(WorkflowError::DependenciesNotSatisfied {
661 node: n,
662 deps: unsatisfied,
663 });
664 }
665
666 self.transition(idx, NodeState::Active);
667 Ok(())
668 }
669
670 pub fn complete_node(&mut self, n: u64) -> Result<(), WorkflowError> {
677 let idx = self.require_node(n)?;
678 let state = self.node_states[idx];
679 if !state.is_active() {
680 return Err(WorkflowError::InvalidTransition {
681 node: n,
682 from: state,
683 to: NodeState::Completed,
684 });
685 }
686
687 self.transition(idx, NodeState::Completed);
688 self.update_workflow_status();
689 Ok(())
690 }
691
692 pub fn fail_node(&mut self, n: u64) -> Result<(), WorkflowError> {
699 let idx = self.require_node(n)?;
700 let state = self.node_states[idx];
701 if !state.is_active() {
702 return Err(WorkflowError::InvalidTransition {
703 node: n,
704 from: state,
705 to: NodeState::Failed,
706 });
707 }
708
709 let retries = self.retries[idx];
711 if retries > 0 {
712 self.transition(idx, NodeState::Pending);
714 self.retries[idx] = retries - 1;
715 } else {
716 self.transition(idx, NodeState::Failed);
718 self.status = WorkflowStatus::Failure;
719 }
720
721 Ok(())
722 }
723
724 pub fn apply_timeout(&mut self) {
726 self.status = WorkflowStatus::Failure;
727 }
728
729 fn update_workflow_status(&mut self) {
731 if self.is_terminal() {
732 if self.has_failure() {
733 self.status = WorkflowStatus::Failure;
734 } else {
735 self.status = WorkflowStatus::Success;
736 }
737 }
738 }
739
740 pub fn measure(&self, now: Time) -> u64 {
746 let time_left = self.timeout_at.saturating_sub(now);
747
748 let pending = self.pending_ct as u64;
749 let active = self.active_ct as u64;
750
751 let retries_remaining: u64 = self
752 .retries
753 .iter()
754 .fold(0u64, |acc, &r| acc.saturating_add(r as u64));
755
756 time_left
760 .saturating_add(pending.saturating_mul(1000))
761 .saturating_add(active.saturating_mul(100))
762 .saturating_add(retries_remaining.saturating_mul(1000))
763 }
764}
765
#[cfg(test)]
mod tests {
    use super::*;

    /// Three nodes in a chain: 1 -> 2 -> 3.
    fn make_linear_workflow() -> WorkflowDef {
        let nodes = vec![1, 2, 3];
        let edges = vec![Edge::new(1, 2), Edge::new(2, 3)];
        WorkflowDef::new(nodes, edges)
    }

    /// Two independent nodes feeding a join: 1 -> 3 and 2 -> 3.
    fn make_parallel_workflow() -> WorkflowDef {
        let nodes = vec![1, 2, 3];
        let edges = vec![Edge::new(1, 3), Edge::new(2, 3)];
        WorkflowDef::new(nodes, edges)
    }

    #[test]
    fn test_workflow_status() {
        let running = WorkflowStatus::Running;
        assert!(running.is_running());
        assert!(!running.is_terminal());

        let success = WorkflowStatus::Success;
        assert!(success.is_terminal());
        assert!(success.is_success());

        let failure = WorkflowStatus::Failure;
        assert!(failure.is_terminal());
        assert!(failure.is_failure());
    }

    #[test]
    fn test_node_state() {
        // Each variant answers its own predicate.
        assert!(NodeState::Pending.is_pending());
        assert!(NodeState::Active.is_active());
        assert!(NodeState::Completed.is_completed());
        assert!(NodeState::Failed.is_failed());

        // Only Completed and Failed are terminal.
        assert!(NodeState::Completed.is_terminal());
        assert!(NodeState::Failed.is_terminal());
        assert!(!NodeState::Pending.is_terminal());
        assert!(!NodeState::Active.is_terminal());
    }

    #[test]
    fn test_workflow_def_dependencies() {
        let def = make_linear_workflow();

        // Head of the chain: nothing upstream.
        assert!(def.dependencies(1).is_empty());
        assert!(def.is_source(1));

        // Middle node depends on the head.
        assert_eq!(def.dependencies(2), vec![1]);

        // Tail depends on the middle node and has nothing downstream.
        assert_eq!(def.dependencies(3), vec![2]);
        assert!(def.is_sink(3));
    }

    #[test]
    fn test_workflow_def_dependents() {
        let def = make_linear_workflow();

        assert_eq!(def.dependents(1), vec![2]);
        assert_eq!(def.dependents(2), vec![3]);
        assert!(def.dependents(3).is_empty());
    }

    #[test]
    fn test_workflow_instance_new() {
        let instance = WorkflowInstance::new(make_linear_workflow(), 100, 3);

        assert!(instance.status().is_running());
        assert_eq!(instance.pending_count(), 3);
        assert_eq!(instance.active_count(), 0);
        assert_eq!(instance.get_retries(1), 3);
    }

    #[test]
    fn test_workflow_instance_start_node() {
        let mut instance = WorkflowInstance::new(make_linear_workflow(), 100, 0);

        // The source node can start right away.
        assert!(instance.start_node(1).is_ok());
        assert!(instance.is_active(1));

        // Its successor cannot, because node 1 has not completed yet.
        let blocked = instance.start_node(2);
        assert!(matches!(
            blocked,
            Err(WorkflowError::DependenciesNotSatisfied { .. })
        ));
    }

    #[test]
    fn test_workflow_instance_complete_flow() {
        let mut instance = WorkflowInstance::new(make_linear_workflow(), 100, 0);

        // Walk the chain in dependency order.
        instance.start_node(1).expect("start 1");
        instance.complete_node(1).expect("complete 1");

        instance.start_node(2).expect("start 2");
        instance.complete_node(2).expect("complete 2");

        instance.start_node(3).expect("start 3");
        instance.complete_node(3).expect("complete 3");

        assert!(instance.is_terminal());
        assert!(instance.status().is_success());
    }

    #[test]
    fn test_workflow_instance_failure() {
        // No retries: the first failure fails the workflow.
        let mut instance = WorkflowInstance::new(make_linear_workflow(), 100, 0);

        instance.start_node(1).expect("start 1");
        instance.fail_node(1).expect("fail 1");

        assert!(instance.status().is_failure());
    }

    #[test]
    fn test_workflow_instance_retry() {
        let mut instance = WorkflowInstance::new(make_linear_workflow(), 100, 2);

        instance.start_node(1).expect("start 1");
        assert_eq!(instance.get_retries(1), 2);

        // First failure: back to pending, one retry consumed.
        instance.fail_node(1).expect("fail 1");
        assert!(instance.is_pending(1));
        assert_eq!(instance.get_retries(1), 1);
        assert!(instance.status().is_running());

        // Second failure: last retry consumed.
        instance.start_node(1).expect("start 1 again");
        instance.fail_node(1).expect("fail 1 again");
        assert!(instance.is_pending(1));
        assert_eq!(instance.get_retries(1), 0);

        // Third failure: no retries left, node and workflow fail.
        instance.start_node(1).expect("start 1 final");
        instance.fail_node(1).expect("fail 1 final");
        assert!(instance.get_node_state(1).is_failed());
        assert!(instance.status().is_failure());
    }

    #[test]
    fn test_workflow_measure_decreases_on_complete() {
        let mut instance = WorkflowInstance::new(make_linear_workflow(), 100, 0);

        let before_start = instance.measure(0);

        instance.start_node(1).expect("start 1");
        let after_start = instance.measure(0);

        assert!(after_start < before_start);

        instance.complete_node(1).expect("complete 1");
        let after_complete = instance.measure(0);

        assert!(after_complete < after_start);
    }

    #[test]
    fn test_workflow_parallel() {
        let mut instance = WorkflowInstance::new(make_parallel_workflow(), 100, 0);

        // Both branches can run at the same time.
        instance.start_node(1).expect("start 1");
        instance.start_node(2).expect("start 2");

        // The join is blocked while neither branch has completed...
        let blocked = instance.start_node(3);
        assert!(matches!(
            blocked,
            Err(WorkflowError::DependenciesNotSatisfied { .. })
        ));

        instance.complete_node(1).expect("complete 1");

        // ...and still blocked while only one of them has.
        let still_blocked = instance.start_node(3);
        assert!(matches!(
            still_blocked,
            Err(WorkflowError::DependenciesNotSatisfied { .. })
        ));

        instance.complete_node(2).expect("complete 2");

        instance.start_node(3).expect("start 3");
        instance.complete_node(3).expect("complete 3");

        assert!(instance.status().is_success());
    }
}