Skip to main content

lion_core/state/
workflow.rs

1// Copyright (C) 2026 HaiyangLi
2// SPDX-License-Identifier: AGPL-3.0-or-later
3//! Lion State Workflow
4//!
5//! Corresponds to: Lion/State/Workflow.lean
6//!
7//! Workflow DAG and termination (Theorem 008).
//! Uses precomputed adjacency lists and dense indexed arrays: a node ID is resolved to its dense index by binary search, after which per-node lookups are O(1).
9
10use crate::types::Time;
11
12/// Error type for workflow operations
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub enum WorkflowError {
15    /// Node not found in workflow
16    NodeNotFound(u64),
17    /// Invalid node state transition
18    InvalidTransition {
19        /// The node that had the invalid transition
20        node: u64,
21        /// The current state
22        from: NodeState,
23        /// The attempted state
24        to: NodeState,
25    },
26    /// Workflow has timed out
27    TimedOut,
28    /// Dependencies not satisfied
29    DependenciesNotSatisfied {
30        /// The node that has unsatisfied dependencies
31        node: u64,
32        /// The unsatisfied dependencies
33        deps: Vec<u64>,
34    },
35}
36
37impl std::fmt::Display for WorkflowError {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            WorkflowError::NodeNotFound(n) => write!(f, "Node {n} not found"),
41            WorkflowError::InvalidTransition { node, from, to } => {
42                write!(f, "Invalid transition for node {node}: {from:?} -> {to:?}")
43            }
44            WorkflowError::TimedOut => write!(f, "Workflow timed out"),
45            WorkflowError::DependenciesNotSatisfied { node, deps } => {
46                write!(f, "Node {node} has unsatisfied dependencies: {deps:?}")
47            }
48        }
49    }
50}
51
52impl std::error::Error for WorkflowError {}
53
/// Overall execution status of a workflow.
///
/// Corresponds to Lean: `inductive WorkflowStatus`
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum WorkflowStatus {
    /// The workflow is still running (this is the default status)
    #[default]
    Running,
    /// The workflow finished successfully
    Success,
    /// The workflow finished with a failure
    Failure,
}

impl WorkflowStatus {
    /// Returns `true` when the status is `Running`.
    #[inline]
    pub fn is_running(&self) -> bool {
        *self == Self::Running
    }

    /// Returns `true` when the status is final, i.e. `Success` or `Failure`.
    #[inline]
    pub fn is_terminal(&self) -> bool {
        match self {
            Self::Success | Self::Failure => true,
            Self::Running => false,
        }
    }

    /// Returns `true` when the status is `Success`.
    #[inline]
    pub fn is_success(&self) -> bool {
        *self == Self::Success
    }

    /// Returns `true` when the status is `Failure`.
    #[inline]
    pub fn is_failure(&self) -> bool {
        *self == Self::Failure
    }
}
93
/// Execution state of a single workflow node.
///
/// Corresponds to Lean: `inductive NodeState`
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum NodeState {
    /// The node is waiting for its dependencies (this is the default state)
    #[default]
    Pending,
    /// The node is currently executing
    Active,
    /// The node finished successfully
    Completed,
    /// The node failed
    Failed,
}

impl NodeState {
    /// Returns `true` when the state is `Pending`.
    #[inline]
    pub fn is_pending(&self) -> bool {
        *self == Self::Pending
    }

    /// Returns `true` when the state is `Active`.
    #[inline]
    pub fn is_active(&self) -> bool {
        *self == Self::Active
    }

    /// Returns `true` when the state is `Completed`.
    #[inline]
    pub fn is_completed(&self) -> bool {
        *self == Self::Completed
    }

    /// Returns `true` when the state is `Failed`.
    #[inline]
    pub fn is_failed(&self) -> bool {
        *self == Self::Failed
    }

    /// Returns `true` when the state is final, i.e. `Completed` or `Failed`.
    #[inline]
    pub fn is_terminal(&self) -> bool {
        match self {
            Self::Completed | Self::Failed => true,
            Self::Pending | Self::Active => false,
        }
    }
}
141
/// Directed DAG edge: `source` must complete before `target` starts.
///
/// Corresponds to Lean: `structure Edge`
/// NOTE: The field is called 'source' rather than 'from' to avoid a Lean keyword conflict
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct Edge {
    /// Node that has to complete first
    ///
    /// Corresponds to Lean: `from_ : Nat`
    pub(crate) source: u64,

    /// Node that may start once `source` has completed
    ///
    /// Corresponds to Lean: `to_ : Nat`
    pub(crate) target: u64,
}

impl Edge {
    /// Build an edge expressing that `source` precedes `target`.
    pub fn new(source: u64, target: u64) -> Self {
        Self { source, target }
    }

    /// Return the source node ID.
    /// NOTE: The accessor keeps the name 'from()' for API compatibility
    #[inline]
    pub fn from(&self) -> u64 {
        let Self { source, .. } = *self;
        source
    }

    /// Return the target node ID.
    /// NOTE: The accessor keeps the name 'to()' for API compatibility
    #[inline]
    pub fn to(&self) -> u64 {
        let Self { target, .. } = *self;
        target
    }
}
179
180/// Workflow definition (static DAG structure)
181///
182/// Corresponds to Lean: `structure WorkflowDef`
183///
184/// Precomputes adjacency lists and a node-to-index map for O(1) lookups.
185#[derive(Debug, Clone, PartialEq, Eq, Default)]
186pub struct WorkflowDef {
187    /// Node identifiers (sorted, deduplicated)
188    ///
189    /// Corresponds to Lean: `nodes : List Nat`
190    pub(crate) nodes: Vec<u64>,
191
192    /// DAG edges
193    ///
194    /// Corresponds to Lean: `edges : List Edge`
195    pub(crate) edges: Vec<Edge>,
196
197    /// Precomputed predecessors per node index: predecessors[i] = list of node IDs
198    /// that must complete before nodes[i] can start.
199    predecessors: Vec<Vec<u64>>,
200
201    /// Precomputed successors per node index: successors[i] = list of node IDs
202    /// that depend on nodes[i].
203    successors: Vec<Vec<u64>>,
204
205    /// Sorted (node_id, index) pairs for binary-search lookup.
206    node_to_index: Vec<(u64, usize)>,
207}
208
209impl WorkflowDef {
210    /// Build adjacency lists and index map from nodes + edges.
211    fn build_adjacency(&mut self) {
212        let n = self.nodes.len();
213        self.predecessors = vec![Vec::new(); n];
214        self.successors = vec![Vec::new(); n];
215        self.node_to_index = self
216            .nodes
217            .iter()
218            .enumerate()
219            .map(|(i, &id)| (id, i))
220            .collect();
221        // node_to_index is sorted because nodes is sorted
222        self.node_to_index.sort_unstable_by_key(|&(id, _)| id);
223
224        for e in &self.edges {
225            if let (Some(src_idx), Some(tgt_idx)) =
226                (self.node_index(e.source), self.node_index(e.target))
227            {
228                self.predecessors[tgt_idx].push(e.source);
229                self.successors[src_idx].push(e.target);
230            }
231        }
232    }
233
234    /// Create a new workflow definition (unchecked).
235    ///
236    /// Prefer `validated()` for production use to ensure all edge endpoints
237    /// reference existing nodes.
238    pub fn new(nodes: Vec<u64>, edges: Vec<Edge>) -> Self {
239        let mut def = WorkflowDef {
240            nodes,
241            edges,
242            predecessors: Vec::new(),
243            successors: Vec::new(),
244            node_to_index: Vec::new(),
245        };
246        def.build_adjacency();
247        def
248    }
249
250    /// Create a validated workflow definition.
251    ///
252    /// Sorts and deduplicates node IDs, then verifies that all edge
253    /// endpoints reference existing nodes.
254    ///
255    /// # Errors
256    ///
257    /// Returns `WorkflowError::NodeNotFound` if an edge references a node not in the node list.
258    pub fn validated(mut nodes: Vec<u64>, edges: Vec<Edge>) -> Result<Self, WorkflowError> {
259        nodes.sort_unstable();
260        nodes.dedup();
261
262        for e in &edges {
263            if nodes.binary_search(&e.source).is_err() {
264                return Err(WorkflowError::NodeNotFound(e.source));
265            }
266            if nodes.binary_search(&e.target).is_err() {
267                return Err(WorkflowError::NodeNotFound(e.target));
268            }
269        }
270
271        let mut def = WorkflowDef {
272            nodes,
273            edges,
274            predecessors: Vec::new(),
275            successors: Vec::new(),
276            node_to_index: Vec::new(),
277        };
278        def.build_adjacency();
279        Ok(def)
280    }
281
282    /// Look up the dense index for a node ID via binary search.
283    #[inline]
284    pub fn node_index(&self, node_id: u64) -> Option<usize> {
285        self.node_to_index
286            .binary_search_by_key(&node_id, |&(id, _)| id)
287            .ok()
288            .map(|pos| self.node_to_index[pos].1)
289    }
290
291    /// Require a node to exist, returning its index or an error.
292    pub fn require_node(&self, node_id: u64) -> Result<usize, WorkflowError> {
293        self.node_index(node_id)
294            .ok_or(WorkflowError::NodeNotFound(node_id))
295    }
296
297    /// Check if node has no incoming edges (ready to start)
298    ///
299    /// Corresponds to Lean: `def WorkflowDef.is_source`
300    pub fn is_source(&self, n: u64) -> bool {
301        match self.node_index(n) {
302            Some(idx) => self.predecessors[idx].is_empty(),
303            None => true, // non-existent nodes trivially have no predecessors
304        }
305    }
306
307    /// Check if node has no outgoing edges (terminal)
308    ///
309    /// Corresponds to Lean: `def WorkflowDef.is_sink`
310    pub fn is_sink(&self, n: u64) -> bool {
311        match self.node_index(n) {
312            Some(idx) => self.successors[idx].is_empty(),
313            None => true,
314        }
315    }
316
317    /// Get dependencies of a node (predecessors)
318    ///
319    /// Corresponds to Lean: `def WorkflowDef.dependencies`
320    pub fn dependencies(&self, n: u64) -> Vec<u64> {
321        match self.node_index(n) {
322            Some(idx) => self.predecessors[idx].clone(),
323            None => Vec::new(),
324        }
325    }
326
327    /// Get dependents of a node (successors)
328    ///
329    /// Corresponds to Lean: `def WorkflowDef.dependents`
330    pub fn dependents(&self, n: u64) -> Vec<u64> {
331        match self.node_index(n) {
332            Some(idx) => self.successors[idx].clone(),
333            None => Vec::new(),
334        }
335    }
336
337    /// Get the number of nodes
338    #[inline]
339    pub fn node_count(&self) -> usize {
340        self.nodes.len()
341    }
342
343    /// Get the number of edges
344    #[inline]
345    pub fn edge_count(&self) -> usize {
346        self.edges.len()
347    }
348
349    /// Check if a node exists in the workflow
350    #[inline]
351    pub fn contains_node(&self, n: u64) -> bool {
352        self.node_index(n).is_some()
353    }
354
355    /// Get all nodes
356    pub fn nodes(&self) -> &[u64] {
357        &self.nodes
358    }
359
360    /// Get all edges
361    pub fn edges(&self) -> &[Edge] {
362        &self.edges
363    }
364}
365
366/// Entry in node state map (node_id, state)
367///
368/// Kept for API compatibility. WorkflowInstance uses dense arrays internally.
369#[derive(Debug, Clone, Copy, PartialEq, Eq)]
370pub struct NodeStateEntry {
371    /// Node identifier
372    pub(crate) node_id: u64,
373    /// Current state of the node
374    pub(crate) state: NodeState,
375}
376
/// One `(node_id, retry_count)` pair of a retry count map.
///
/// Retained for API compatibility; `WorkflowInstance` stores retry counts in
/// dense arrays instead.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RetryEntry {
    /// ID of the node this entry describes
    pub(crate) node_id: u64,
    /// Retries the node still has left
    pub(crate) count: u64,
}
387
388/// Workflow instance (runtime state)
389///
390/// Corresponds to Lean: `structure WorkflowInstance`
391///
392/// Uses dense arrays indexed by node position for O(1) state access.
393/// Maintains counters for O(1) aggregate queries.
394#[derive(Debug, Clone, Default)]
395pub struct WorkflowInstance {
396    /// Static workflow definition
397    ///
398    /// Corresponds to Lean: `definition : WorkflowDef`
399    pub(crate) workflow_def: WorkflowDef,
400
401    /// Current workflow status
402    ///
403    /// Corresponds to Lean: `status : WorkflowStatus`
404    pub(crate) status: WorkflowStatus,
405
406    /// Per-node state (dense, indexed by node position in workflow_def.nodes)
407    ///
408    /// Corresponds to Lean: `node_states : Nat -> NodeState`
409    pub(crate) node_states: Vec<NodeState>,
410
411    /// Per-node retry count (dense, indexed by node position)
412    ///
413    /// Corresponds to Lean: `retries : Nat -> Nat`
414    pub(crate) retries: Vec<u16>,
415
416    /// Timeout deadline
417    ///
418    /// Corresponds to Lean: `timeout_at : Time`
419    pub(crate) timeout_at: Time,
420
421    /// Count of nodes in Pending state
422    pending_ct: u32,
423
424    /// Count of nodes in Active state
425    active_ct: u32,
426
427    /// Count of nodes in Failed state
428    failed_ct: u32,
429}
430
431impl WorkflowInstance {
432    /// Create a new workflow instance
433    pub fn new(workflow_def: WorkflowDef, timeout_at: Time, max_retries: u64) -> Self {
434        let n = workflow_def.nodes.len();
435        let node_states = vec![NodeState::Pending; n];
436        let retry_val = if max_retries > 65535u64 {
437            65535u16
438        } else {
439            max_retries as u16
440        };
441        let retries = vec![retry_val; n];
442
443        WorkflowInstance {
444            workflow_def,
445            status: WorkflowStatus::Running,
446            node_states,
447            retries,
448            timeout_at,
449            pending_ct: n as u32,
450            active_ct: 0,
451            failed_ct: 0,
452        }
453    }
454
455    /// Create a simple running workflow instance with specified timeout
456    ///
457    /// Useful for tests where we just need a running workflow.
458    pub fn running(timeout_at: Time) -> Self {
459        WorkflowInstance {
460            workflow_def: WorkflowDef::default(),
461            status: WorkflowStatus::Running,
462            node_states: Vec::new(),
463            retries: Vec::new(),
464            timeout_at,
465            pending_ct: 0,
466            active_ct: 0,
467            failed_ct: 0,
468        }
469    }
470
471    /// Check if this workflow is running
472    ///
473    /// Convenience method that delegates to status().is_running()
474    #[inline]
475    pub fn is_running(&self) -> bool {
476        self.status.is_running()
477    }
478
479    /// Check that a node exists and return its dense index.
480    ///
481    /// # Errors
482    ///
483    /// Returns `WorkflowError::NodeNotFound` if the node is not in the definition.
484    #[inline]
485    fn require_node(&self, n: u64) -> Result<usize, WorkflowError> {
486        self.workflow_def.require_node(n)
487    }
488
489    /// Get node state by node ID.
490    ///
491    /// Returns `Pending` for nodes that exist in the definition but have no
492    /// explicit state entry yet. For nodes NOT in the definition, also returns Pending
493    /// (callers should use `require_node()` first for validation).
494    pub fn get_node_state(&self, n: u64) -> NodeState {
495        match self.workflow_def.node_index(n) {
496            Some(idx) => self.node_states[idx],
497            None => NodeState::Pending,
498        }
499    }
500
501    /// Check if node is in pending state
502    ///
503    /// Corresponds to Lean: `def WorkflowInstance.is_pending`
504    #[inline]
505    pub fn is_pending(&self, n: u64) -> bool {
506        self.get_node_state(n).is_pending()
507    }
508
509    /// Check if node is in active state
510    ///
511    /// Corresponds to Lean: `def WorkflowInstance.is_active`
512    #[inline]
513    pub fn is_active(&self, n: u64) -> bool {
514        self.get_node_state(n).is_active()
515    }
516
517    /// Count pending nodes (O(1))
518    ///
519    /// Corresponds to Lean: `def WorkflowInstance.pending_count`
520    #[inline]
521    pub fn pending_count(&self) -> usize {
522        self.pending_ct as usize
523    }
524
525    /// Count active nodes (O(1))
526    ///
527    /// Corresponds to Lean: `def WorkflowInstance.active_count`
528    #[inline]
529    pub fn active_count(&self) -> usize {
530        self.active_ct as usize
531    }
532
533    /// Check if all nodes are terminal (completed or failed) (O(1))
534    ///
535    /// Corresponds to Lean: `def WorkflowInstance.is_terminal`
536    #[inline]
537    pub fn is_terminal(&self) -> bool {
538        self.pending_ct == 0 && self.active_ct == 0
539    }
540
541    /// Check if at least one node failed (O(1))
542    ///
543    /// Corresponds to Lean: `def WorkflowInstance.has_failure`
544    #[inline]
545    pub fn has_failure(&self) -> bool {
546        self.failed_ct > 0
547    }
548
549    /// Get the workflow status
550    #[inline]
551    pub fn status(&self) -> WorkflowStatus {
552        self.status
553    }
554
555    /// Get the timeout deadline
556    #[inline]
557    pub fn timeout_at(&self) -> Time {
558        self.timeout_at
559    }
560
561    /// Get retry count for a node
562    pub fn get_retries(&self, n: u64) -> u64 {
563        match self.workflow_def.node_index(n) {
564            Some(idx) => self.retries[idx] as u64,
565            None => 0,
566        }
567    }
568
569    /// Get a reference to the definition
570    #[inline]
571    pub fn definition(&self) -> &WorkflowDef {
572        &self.workflow_def
573    }
574
575    /// Check if dependencies are satisfied for a node
576    pub fn dependencies_satisfied(&self, n: u64) -> bool {
577        match self.workflow_def.node_index(n) {
578            Some(idx) => {
579                let preds = &self.workflow_def.predecessors[idx];
580                let mut j = 0;
581                while j < preds.len() {
582                    let pred_id = preds[j];
583                    if let Some(pred_idx) = self.workflow_def.node_index(pred_id) {
584                        if !self.node_states[pred_idx].is_completed() {
585                            return false;
586                        }
587                    }
588                    j += 1;
589                }
590                true
591            }
592            None => true,
593        }
594    }
595
596    /// Transition a node's state, updating counters.
597    fn transition(&mut self, idx: usize, new_state: NodeState) {
598        let old = self.node_states[idx];
599        // Decrement old counter
600        match old {
601            NodeState::Pending => {
602                debug_assert!(self.pending_ct > 0, "pending_ct underflow");
603                self.pending_ct = self.pending_ct.saturating_sub(1);
604            }
605            NodeState::Active => {
606                debug_assert!(self.active_ct > 0, "active_ct underflow");
607                self.active_ct = self.active_ct.saturating_sub(1);
608            }
609            NodeState::Failed => {
610                debug_assert!(self.failed_ct > 0, "failed_ct underflow");
611                self.failed_ct = self.failed_ct.saturating_sub(1);
612            }
613            NodeState::Completed => {}
614        }
615        // Increment new counter
616        match new_state {
617            NodeState::Pending => self.pending_ct += 1,
618            NodeState::Active => self.active_ct += 1,
619            NodeState::Failed => self.failed_ct += 1,
620            NodeState::Completed => {}
621        }
622        self.node_states[idx] = new_state;
623    }
624
625    /// Start a pending node (transition to active)
626    ///
627    /// Returns Err if node not in definition, not pending, or dependencies not satisfied.
628    ///
629    /// # Errors
630    ///
631    /// Returns `WorkflowError::NodeNotFound` if the node is not in the workflow definition.
632    /// Returns `WorkflowError::InvalidTransition` if the node is not in the `Pending` state.
633    /// Returns `WorkflowError::DependenciesNotSatisfied` if upstream nodes are not yet completed.
634    pub fn start_node(&mut self, n: u64) -> Result<(), WorkflowError> {
635        let idx = self.require_node(n)?;
636        let state = self.node_states[idx];
637        if !state.is_pending() {
638            return Err(WorkflowError::InvalidTransition {
639                node: n,
640                from: state,
641                to: NodeState::Active,
642            });
643        }
644
645        if !self.dependencies_satisfied(n) {
646            let preds = &self.workflow_def.predecessors[idx];
647            let mut unsatisfied = Vec::new();
648            let mut j = 0;
649            while j < preds.len() {
650                let pred_id = preds[j];
651                if let Some(pi) = self.workflow_def.node_index(pred_id) {
652                    if !self.node_states[pi].is_completed() {
653                        unsatisfied.push(pred_id);
654                    }
655                } else {
656                    unsatisfied.push(pred_id);
657                }
658                j += 1;
659            }
660            return Err(WorkflowError::DependenciesNotSatisfied {
661                node: n,
662                deps: unsatisfied,
663            });
664        }
665
666        self.transition(idx, NodeState::Active);
667        Ok(())
668    }
669
670    /// Complete an active node (transition to completed)
671    ///
672    /// # Errors
673    ///
674    /// Returns `WorkflowError::NodeNotFound` if the node is not in the workflow definition.
675    /// Returns `WorkflowError::InvalidTransition` if the node is not in the `Active` state.
676    pub fn complete_node(&mut self, n: u64) -> Result<(), WorkflowError> {
677        let idx = self.require_node(n)?;
678        let state = self.node_states[idx];
679        if !state.is_active() {
680            return Err(WorkflowError::InvalidTransition {
681                node: n,
682                from: state,
683                to: NodeState::Completed,
684            });
685        }
686
687        self.transition(idx, NodeState::Completed);
688        self.update_workflow_status();
689        Ok(())
690    }
691
692    /// Fail an active node (transition to failed)
693    ///
694    /// # Errors
695    ///
696    /// Returns `WorkflowError::NodeNotFound` if the node is not in the workflow definition.
697    /// Returns `WorkflowError::InvalidTransition` if the node is not in the `Active` state.
698    pub fn fail_node(&mut self, n: u64) -> Result<(), WorkflowError> {
699        let idx = self.require_node(n)?;
700        let state = self.node_states[idx];
701        if !state.is_active() {
702            return Err(WorkflowError::InvalidTransition {
703                node: n,
704                from: state,
705                to: NodeState::Failed,
706            });
707        }
708
709        // Check if we have retries
710        let retries = self.retries[idx];
711        if retries > 0 {
712            // Retry: go back to pending and decrement retries
713            self.transition(idx, NodeState::Pending);
714            self.retries[idx] = retries - 1;
715        } else {
716            // No retries left: fail
717            self.transition(idx, NodeState::Failed);
718            self.status = WorkflowStatus::Failure;
719        }
720
721        Ok(())
722    }
723
724    /// Apply timeout
725    pub fn apply_timeout(&mut self) {
726        self.status = WorkflowStatus::Failure;
727    }
728
729    /// Update workflow status based on node states
730    fn update_workflow_status(&mut self) {
731        if self.is_terminal() {
732            if self.has_failure() {
733                self.status = WorkflowStatus::Failure;
734            } else {
735                self.status = WorkflowStatus::Success;
736            }
737        }
738    }
739
740    /// Calculate the termination measure
741    ///
742    /// Corresponds to Lean: `def workflow_measure`
743    ///
744    /// A natural number that decreases on each workflow step.
745    pub fn measure(&self, now: Time) -> u64 {
746        let time_left = self.timeout_at.saturating_sub(now);
747
748        let pending = self.pending_ct as u64;
749        let active = self.active_ct as u64;
750
751        let retries_remaining: u64 = self
752            .retries
753            .iter()
754            .fold(0u64, |acc, &r| acc.saturating_add(r as u64));
755
756        // Weighted sum for strict decrease (from Lean)
757        // Weights: time (x1), pending (x1000), active (x100), retries_remaining (x1000)
758        // Use saturating arithmetic to prevent overflow (G.3 audit requirement)
759        time_left
760            .saturating_add(pending.saturating_mul(1000))
761            .saturating_add(active.saturating_mul(100))
762            .saturating_add(retries_remaining.saturating_mul(1000))
763    }
764}
765
#[cfg(test)]
mod tests {
    use super::*;

    /// Linear chain: 1 -> 2 -> 3
    fn make_linear_workflow() -> WorkflowDef {
        let edges = vec![Edge::new(1, 2), Edge::new(2, 3)];
        WorkflowDef::new(vec![1, 2, 3], edges)
    }

    /// Fan-in: 1 -> 3 and 2 -> 3 (both 1 and 2 must complete before 3)
    fn make_parallel_workflow() -> WorkflowDef {
        let edges = vec![Edge::new(1, 3), Edge::new(2, 3)];
        WorkflowDef::new(vec![1, 2, 3], edges)
    }

    /// Start `node` and complete it right away, panicking if either step fails.
    fn run_node(wi: &mut WorkflowInstance, node: u64) {
        wi.start_node(node).expect("start node");
        wi.complete_node(node).expect("complete node");
    }

    /// Panic unless `result` is a `DependenciesNotSatisfied` error.
    fn assert_blocked(result: Result<(), WorkflowError>) {
        match result {
            Err(WorkflowError::DependenciesNotSatisfied { .. }) => {}
            other => panic!("expected DependenciesNotSatisfied, got {other:?}"),
        }
    }

    #[test]
    fn test_workflow_status() {
        let running = WorkflowStatus::Running;
        assert!(running.is_running());
        assert!(!running.is_terminal());

        let success = WorkflowStatus::Success;
        assert!(success.is_terminal());
        assert!(success.is_success());

        let failure = WorkflowStatus::Failure;
        assert!(failure.is_terminal());
        assert!(failure.is_failure());
    }

    #[test]
    fn test_node_state() {
        assert!(NodeState::Pending.is_pending());
        assert!(NodeState::Active.is_active());
        assert!(NodeState::Completed.is_completed());
        assert!(NodeState::Failed.is_failed());

        for terminal in [NodeState::Completed, NodeState::Failed] {
            assert!(terminal.is_terminal());
        }
        for live in [NodeState::Pending, NodeState::Active] {
            assert!(!live.is_terminal());
        }
    }

    #[test]
    fn test_workflow_def_dependencies() {
        let wf = make_linear_workflow();

        // Node 1 is a source: nothing precedes it
        assert!(wf.is_source(1));
        assert!(wf.dependencies(1).is_empty());

        // Node 2 waits for node 1
        assert_eq!(wf.dependencies(2), vec![1]);

        // Node 3 waits for node 2 and is the sink
        assert_eq!(wf.dependencies(3), vec![2]);
        assert!(wf.is_sink(3));
    }

    #[test]
    fn test_workflow_def_dependents() {
        let wf = make_linear_workflow();

        assert_eq!(wf.dependents(1), vec![2]);
        assert_eq!(wf.dependents(2), vec![3]);
        assert!(wf.dependents(3).is_empty());
    }

    #[test]
    fn test_workflow_instance_new() {
        let wi = WorkflowInstance::new(make_linear_workflow(), 100, 3);

        assert!(wi.status().is_running());
        assert_eq!(wi.pending_count(), 3);
        assert_eq!(wi.active_count(), 0);
        assert_eq!(wi.get_retries(1), 3);
    }

    #[test]
    fn test_workflow_instance_start_node() {
        let mut wi = WorkflowInstance::new(make_linear_workflow(), 100, 0);

        // Node 1 is a source, so it can start immediately
        assert!(wi.start_node(1).is_ok());
        assert!(wi.is_active(1));

        // Node 2 is blocked until node 1 has completed
        assert_blocked(wi.start_node(2));
    }

    #[test]
    fn test_workflow_instance_complete_flow() {
        let mut wi = WorkflowInstance::new(make_linear_workflow(), 100, 0);

        // Walk the chain 1 -> 2 -> 3 in order
        for node in [1, 2, 3] {
            run_node(&mut wi, node);
        }

        assert!(wi.is_terminal());
        assert!(wi.status().is_success());
    }

    #[test]
    fn test_workflow_instance_failure() {
        let mut wi = WorkflowInstance::new(make_linear_workflow(), 100, 0);

        wi.start_node(1).expect("start 1");
        wi.fail_node(1).expect("fail 1");

        assert!(wi.status().is_failure());
    }

    #[test]
    fn test_workflow_instance_retry() {
        let mut wi = WorkflowInstance::new(make_linear_workflow(), 100, 2);

        wi.start_node(1).expect("start 1");
        assert_eq!(wi.get_retries(1), 2);

        // First failure consumes a retry and sends the node back to pending
        wi.fail_node(1).expect("fail 1");
        assert!(wi.is_pending(1));
        assert_eq!(wi.get_retries(1), 1);
        assert!(wi.status().is_running());

        // Second failure consumes the last retry
        wi.start_node(1).expect("start 1 again");
        wi.fail_node(1).expect("fail 1 again");
        assert!(wi.is_pending(1));
        assert_eq!(wi.get_retries(1), 0);

        // Third failure is final: no retries remain
        wi.start_node(1).expect("start 1 final");
        wi.fail_node(1).expect("fail 1 final");
        assert!(wi.get_node_state(1).is_failed());
        assert!(wi.status().is_failure());
    }

    #[test]
    fn test_workflow_measure_decreases_on_complete() {
        let mut wi = WorkflowInstance::new(make_linear_workflow(), 100, 0);

        let before_start = wi.measure(0);

        // start_node: one pending fewer, one active more (net -1000 + 100 = -900)
        wi.start_node(1).expect("start 1");
        let after_start = wi.measure(0);
        assert!(after_start < before_start);

        // complete_node: one active fewer (net -100)
        wi.complete_node(1).expect("complete 1");
        let after_complete = wi.measure(0);
        assert!(after_complete < after_start);
    }

    #[test]
    fn test_workflow_parallel() {
        let mut wi = WorkflowInstance::new(make_parallel_workflow(), 100, 0);

        // Nodes 1 and 2 are both sources and can run side by side
        wi.start_node(1).expect("start 1");
        wi.start_node(2).expect("start 2");

        // Node 3 is blocked while both are still active
        assert_blocked(wi.start_node(3));

        // Completing only node 1 is not enough
        wi.complete_node(1).expect("complete 1");
        assert_blocked(wi.start_node(3));

        // Once node 2 has completed as well, node 3 can run
        wi.complete_node(2).expect("complete 2");
        run_node(&mut wi, 3);

        assert!(wi.status().is_success());
    }
}