use crate::types::Time;
/// Errors returned by workflow definition and instance operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkflowError {
    /// The given node id is not part of the workflow definition.
    NodeNotFound(u64),
    /// A node was asked to move to `to` while its current state `from`
    /// does not allow that move.
    InvalidTransition {
        /// Id of the node the transition was requested for.
        node: u64,
        /// State the node was in when the request was made.
        from: NodeState,
        /// State that was requested.
        to: NodeState,
    },
    /// The workflow timed out.
    TimedOut,
    /// A node could not be started because some of its dependencies have
    /// not completed.
    DependenciesNotSatisfied {
        /// Id of the node that could not be started.
        node: u64,
        /// Ids of the dependencies that are still unsatisfied.
        deps: Vec<u64>,
    },
}

impl std::fmt::Display for WorkflowError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::TimedOut => f.write_str("Workflow timed out"),
            Self::NodeNotFound(n) => write!(f, "Node {n} not found"),
            Self::DependenciesNotSatisfied { node, deps } => {
                write!(f, "Node {node} has unsatisfied dependencies: {deps:?}")
            }
            Self::InvalidTransition { node, from, to } => {
                write!(f, "Invalid transition for node {node}: {from:?} -> {to:?}")
            }
        }
    }
}

impl std::error::Error for WorkflowError {}
/// Overall status of a workflow.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum WorkflowStatus {
    /// The workflow is still in progress; this is the default value.
    #[default]
    Running,
    /// The workflow finished successfully.
    Success,
    /// The workflow finished unsuccessfully.
    Failure,
}

impl WorkflowStatus {
    /// Returns `true` only for [`WorkflowStatus::Running`].
    #[inline]
    pub fn is_running(&self) -> bool {
        *self == Self::Running
    }

    /// Returns `true` for [`WorkflowStatus::Success`] and
    /// [`WorkflowStatus::Failure`].
    #[inline]
    pub fn is_terminal(&self) -> bool {
        match self {
            Self::Running => false,
            Self::Success | Self::Failure => true,
        }
    }

    /// Returns `true` only for [`WorkflowStatus::Success`].
    #[inline]
    pub fn is_success(&self) -> bool {
        *self == Self::Success
    }

    /// Returns `true` only for [`WorkflowStatus::Failure`].
    #[inline]
    pub fn is_failure(&self) -> bool {
        *self == Self::Failure
    }
}
/// Execution state of a single workflow node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum NodeState {
    /// The node has not been started; this is the default value.
    #[default]
    Pending,
    /// The node has been started and has not finished.
    Active,
    /// The node finished successfully.
    Completed,
    /// The node failed.
    Failed,
}

impl NodeState {
    /// Returns `true` only for [`NodeState::Pending`].
    #[inline]
    pub fn is_pending(&self) -> bool {
        *self == Self::Pending
    }

    /// Returns `true` only for [`NodeState::Active`].
    #[inline]
    pub fn is_active(&self) -> bool {
        *self == Self::Active
    }

    /// Returns `true` only for [`NodeState::Completed`].
    #[inline]
    pub fn is_completed(&self) -> bool {
        *self == Self::Completed
    }

    /// Returns `true` only for [`NodeState::Failed`].
    #[inline]
    pub fn is_failed(&self) -> bool {
        *self == Self::Failed
    }

    /// Returns `true` for [`NodeState::Completed`] and [`NodeState::Failed`].
    #[inline]
    pub fn is_terminal(&self) -> bool {
        match self {
            Self::Pending | Self::Active => false,
            Self::Completed | Self::Failed => true,
        }
    }
}
/// A directed edge between two node ids.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct Edge {
    /// Id of the node the edge starts at.
    pub(crate) source: u64,
    /// Id of the node the edge points to.
    pub(crate) target: u64,
}

impl Edge {
    /// Creates an edge running from `source` to `target`.
    pub fn new(source: u64, target: u64) -> Self {
        Self { source, target }
    }

    /// Returns the id of the node the edge starts at.
    #[inline]
    pub fn from(&self) -> u64 {
        let Self { source, .. } = *self;
        source
    }

    /// Returns the id of the node the edge points to.
    #[inline]
    pub fn to(&self) -> u64 {
        let Self { target, .. } = *self;
        target
    }
}
/// Static description of a workflow graph: node ids plus directed edges.
///
/// The adjacency lists and the id lookup table are derived from `nodes` and
/// `edges` when the definition is constructed.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct WorkflowDef {
    /// Node ids; the position of an id in this vector is the node's index.
    pub(crate) nodes: Vec<u64>,
    /// Edges exactly as supplied at construction.
    pub(crate) edges: Vec<Edge>,
    /// `predecessors[i]` lists the source ids of edges whose target is `nodes[i]`.
    predecessors: Vec<Vec<u64>>,
    /// `successors[i]` lists the target ids of edges whose source is `nodes[i]`.
    successors: Vec<Vec<u64>>,
    /// `(node id, position in nodes)` pairs sorted by id, used for binary search.
    node_to_index: Vec<(u64, usize)>,
}

impl WorkflowDef {
    /// Rebuilds the lookup table and both adjacency lists from `nodes` and
    /// `edges`.
    ///
    /// An edge is recorded only when both of its endpoints resolve to a node;
    /// other edges are skipped here (they remain in `edges`).
    fn build_adjacency(&mut self) {
        let node_total = self.nodes.len();

        // The lookup table must be in place before edges are resolved below.
        let mut lookup: Vec<(u64, usize)> = Vec::with_capacity(node_total);
        for (position, &id) in self.nodes.iter().enumerate() {
            lookup.push((id, position));
        }
        lookup.sort_unstable_by_key(|&(id, _)| id);
        self.node_to_index = lookup;

        let mut incoming: Vec<Vec<u64>> = vec![Vec::new(); node_total];
        let mut outgoing: Vec<Vec<u64>> = vec![Vec::new(); node_total];
        for edge in &self.edges {
            let endpoints = self
                .node_index(edge.source)
                .zip(self.node_index(edge.target));
            if let Some((src_pos, tgt_pos)) = endpoints {
                incoming[tgt_pos].push(edge.source);
                outgoing[src_pos].push(edge.target);
            }
        }
        self.predecessors = incoming;
        self.successors = outgoing;
    }

    /// Builds a definition from `nodes` and `edges` without validating them.
    ///
    /// Edges that mention an id missing from `nodes` are kept in `edges` but
    /// contribute nothing to `dependencies` / `dependents`.
    ///
    /// NOTE(review): duplicate ids in `nodes` are not removed here; use
    /// [`WorkflowDef::validated`] when the input is not known to be clean.
    pub fn new(nodes: Vec<u64>, edges: Vec<Edge>) -> Self {
        let mut def = Self {
            nodes,
            edges,
            ..Self::default()
        };
        def.build_adjacency();
        def
    }

    /// Builds a definition after sorting and de-duplicating `nodes` and
    /// checking that every edge endpoint is a known node.
    ///
    /// # Errors
    ///
    /// Returns [`WorkflowError::NodeNotFound`] carrying the first unknown
    /// endpoint, scanning edges in order and checking each edge's source
    /// before its target.
    pub fn validated(mut nodes: Vec<u64>, edges: Vec<Edge>) -> Result<Self, WorkflowError> {
        nodes.sort_unstable();
        nodes.dedup();

        let is_unknown = |id: &u64| nodes.binary_search(id).is_err();
        let first_unknown = edges
            .iter()
            .flat_map(|edge| [edge.source, edge.target])
            .find(is_unknown);

        match first_unknown {
            Some(id) => Err(WorkflowError::NodeNotFound(id)),
            None => Ok(Self::new(nodes, edges)),
        }
    }

    /// Returns the position of `node_id` in `nodes`, or `None` when the id is
    /// unknown.
    #[inline]
    pub fn node_index(&self, node_id: u64) -> Option<usize> {
        match self
            .node_to_index
            .binary_search_by_key(&node_id, |&(id, _)| id)
        {
            Ok(slot) => Some(self.node_to_index[slot].1),
            Err(_) => None,
        }
    }

    /// Like [`WorkflowDef::node_index`], but reports an unknown id as
    /// [`WorkflowError::NodeNotFound`].
    pub fn require_node(&self, node_id: u64) -> Result<usize, WorkflowError> {
        match self.node_index(node_id) {
            Some(index) => Ok(index),
            None => Err(WorkflowError::NodeNotFound(node_id)),
        }
    }

    /// Returns `true` when `n` has no recorded predecessors.
    ///
    /// An unknown id also yields `true`.
    pub fn is_source(&self, n: u64) -> bool {
        self.node_index(n)
            .map_or(true, |idx| self.predecessors[idx].is_empty())
    }

    /// Returns `true` when `n` has no recorded successors.
    ///
    /// An unknown id also yields `true`.
    pub fn is_sink(&self, n: u64) -> bool {
        self.node_index(n)
            .map_or(true, |idx| self.successors[idx].is_empty())
    }

    /// Returns a copy of the ids `n` depends on (empty for an unknown id).
    pub fn dependencies(&self, n: u64) -> Vec<u64> {
        self.node_index(n)
            .map(|idx| self.predecessors[idx].clone())
            .unwrap_or_default()
    }

    /// Returns a copy of the ids that depend on `n` (empty for an unknown id).
    pub fn dependents(&self, n: u64) -> Vec<u64> {
        self.node_index(n)
            .map(|idx| self.successors[idx].clone())
            .unwrap_or_default()
    }

    /// Number of entries in `nodes`.
    #[inline]
    pub fn node_count(&self) -> usize {
        self.nodes.len()
    }

    /// Number of entries in `edges`, including any that were skipped when the
    /// adjacency lists were built.
    #[inline]
    pub fn edge_count(&self) -> usize {
        self.edges.len()
    }

    /// Returns `true` when `n` is a known node id.
    #[inline]
    pub fn contains_node(&self, n: u64) -> bool {
        self.node_index(n).is_some()
    }

    /// Node ids in stored order.
    pub fn nodes(&self) -> &[u64] {
        self.nodes.as_slice()
    }

    /// Edges in stored order.
    pub fn edges(&self) -> &[Edge] {
        self.edges.as_slice()
    }
}
/// Pairs a node id with a [`NodeState`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NodeStateEntry {
// Id of the node this entry refers to.
pub(crate) node_id: u64,
// State recorded for that node.
pub(crate) state: NodeState,
}
/// Pairs a node id with a retry count.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryEntry {
// Id of the node this entry refers to.
pub(crate) node_id: u64,
// Retry count recorded for that node.
// NOTE(review): whether this is "retries used" or "retries left" is not
// visible from this definition — confirm against the code that fills it.
pub(crate) count: u64,
}
/// Runtime state of one execution of a [`WorkflowDef`].
///
/// Tracks a state and a remaining retry budget for every node, the overall
/// workflow status, and running counts of pending, active and failed nodes so
/// the count queries do not have to scan `node_states`.
#[derive(Debug, Clone, Default)]
pub struct WorkflowInstance {
    /// Graph this instance executes.
    pub(crate) workflow_def: WorkflowDef,
    /// Overall status. Once it has left `Running`, the methods of this type
    /// never move it to a different terminal value.
    pub(crate) status: WorkflowStatus,
    /// State of each node, indexed like `workflow_def.nodes`.
    pub(crate) node_states: Vec<NodeState>,
    /// Remaining retries of each node, indexed like `workflow_def.nodes`.
    pub(crate) retries: Vec<u16>,
    /// Deadline of the workflow; this type only reads it in `measure`.
    pub(crate) timeout_at: Time,
    /// Number of nodes currently `Pending`.
    pending_ct: u32,
    /// Number of nodes currently `Active`.
    active_ct: u32,
    /// Number of nodes currently `Failed`.
    failed_ct: u32,
}

impl WorkflowInstance {
    /// Creates a running instance in which every node is `Pending`.
    ///
    /// `max_retries` is the retry budget given to each node. Budgets are
    /// stored as `u16`, so values above `u16::MAX` are clamped to `u16::MAX`.
    pub fn new(workflow_def: WorkflowDef, timeout_at: Time, max_retries: u64) -> Self {
        let n = workflow_def.nodes.len();
        let retry_val = max_retries.min(u64::from(u16::MAX)) as u16;
        WorkflowInstance {
            workflow_def,
            status: WorkflowStatus::Running,
            node_states: vec![NodeState::Pending; n],
            retries: vec![retry_val; n],
            timeout_at,
            // NOTE(review): this cast truncates for more than `u32::MAX`
            // nodes; kept as in the original because the counters are `u32`.
            pending_ct: n as u32,
            active_ct: 0,
            failed_ct: 0,
        }
    }

    /// Creates a running instance with an empty definition and the given
    /// deadline.
    pub fn running(timeout_at: Time) -> Self {
        WorkflowInstance {
            workflow_def: WorkflowDef::default(),
            status: WorkflowStatus::Running,
            node_states: Vec::new(),
            retries: Vec::new(),
            timeout_at,
            pending_ct: 0,
            active_ct: 0,
            failed_ct: 0,
        }
    }

    /// Returns `true` while the workflow status is `Running`.
    #[inline]
    pub fn is_running(&self) -> bool {
        self.status.is_running()
    }

    /// Resolves `n` to its node index or fails with `NodeNotFound`.
    #[inline]
    fn require_node(&self, n: u64) -> Result<usize, WorkflowError> {
        self.workflow_def.require_node(n)
    }

    /// Returns the state of node `n`; an unknown id is reported as `Pending`.
    pub fn get_node_state(&self, n: u64) -> NodeState {
        match self.workflow_def.node_index(n) {
            Some(idx) => self.node_states[idx],
            None => NodeState::Pending,
        }
    }

    /// Returns `true` when node `n` is `Pending` (also for an unknown id).
    #[inline]
    pub fn is_pending(&self, n: u64) -> bool {
        self.get_node_state(n).is_pending()
    }

    /// Returns `true` when node `n` is `Active`.
    #[inline]
    pub fn is_active(&self, n: u64) -> bool {
        self.get_node_state(n).is_active()
    }

    /// Number of nodes currently `Pending`.
    #[inline]
    pub fn pending_count(&self) -> usize {
        self.pending_ct as usize
    }

    /// Number of nodes currently `Active`.
    #[inline]
    pub fn active_count(&self) -> usize {
        self.active_ct as usize
    }

    /// Returns `true` when no node is `Pending` or `Active`.
    ///
    /// This looks at node states only, not at `status`: a timed-out workflow
    /// whose nodes are still pending or active is not terminal in this sense.
    #[inline]
    pub fn is_terminal(&self) -> bool {
        self.pending_ct == 0 && self.active_ct == 0
    }

    /// Returns `true` when at least one node is `Failed`.
    #[inline]
    pub fn has_failure(&self) -> bool {
        self.failed_ct > 0
    }

    /// Current overall status.
    #[inline]
    pub fn status(&self) -> WorkflowStatus {
        self.status
    }

    /// Deadline this instance was created with.
    #[inline]
    pub fn timeout_at(&self) -> Time {
        self.timeout_at
    }

    /// Remaining retries of node `n`; `0` for an unknown id.
    pub fn get_retries(&self, n: u64) -> u64 {
        match self.workflow_def.node_index(n) {
            Some(idx) => u64::from(self.retries[idx]),
            None => 0,
        }
    }

    /// The definition this instance executes.
    #[inline]
    pub fn definition(&self) -> &WorkflowDef {
        &self.workflow_def
    }

    /// Returns `true` when `node_id` is a known node that is not `Completed`.
    ///
    /// Unknown ids are not counted as blocking, matching how
    /// `dependencies_satisfied` has always treated them.
    fn is_unfinished(&self, node_id: u64) -> bool {
        self.workflow_def
            .node_index(node_id)
            .map_or(false, |idx| !self.node_states[idx].is_completed())
    }

    /// Ids of the predecessors of the node at index `idx` that are not yet
    /// `Completed`, in predecessor order.
    fn unsatisfied_dependencies(&self, idx: usize) -> Vec<u64> {
        self.workflow_def.predecessors[idx]
            .iter()
            .copied()
            .filter(|&pred_id| self.is_unfinished(pred_id))
            .collect()
    }

    /// Returns `true` when every predecessor of `n` is `Completed`.
    ///
    /// An unknown id has no predecessors and therefore yields `true`.
    pub fn dependencies_satisfied(&self, n: u64) -> bool {
        match self.workflow_def.node_index(n) {
            Some(idx) => self.workflow_def.predecessors[idx]
                .iter()
                .all(|&pred_id| !self.is_unfinished(pred_id)),
            None => true,
        }
    }

    /// Moves the node at `idx` to `new_state`, keeping the pending / active /
    /// failed counters in step with `node_states`.
    fn transition(&mut self, idx: usize, new_state: NodeState) {
        let old = self.node_states[idx];
        match old {
            NodeState::Pending => {
                debug_assert!(self.pending_ct > 0, "pending_ct underflow");
                self.pending_ct = self.pending_ct.saturating_sub(1);
            }
            NodeState::Active => {
                debug_assert!(self.active_ct > 0, "active_ct underflow");
                self.active_ct = self.active_ct.saturating_sub(1);
            }
            NodeState::Failed => {
                debug_assert!(self.failed_ct > 0, "failed_ct underflow");
                self.failed_ct = self.failed_ct.saturating_sub(1);
            }
            // Completed nodes are not counted.
            NodeState::Completed => {}
        }
        match new_state {
            NodeState::Pending => self.pending_ct += 1,
            NodeState::Active => self.active_ct += 1,
            NodeState::Failed => self.failed_ct += 1,
            NodeState::Completed => {}
        }
        self.node_states[idx] = new_state;
    }

    /// Moves node `n` from `Pending` to `Active`.
    ///
    /// # Errors
    ///
    /// * `NodeNotFound` when `n` is unknown.
    /// * `InvalidTransition` when `n` is not `Pending`.
    /// * `DependenciesNotSatisfied` listing every predecessor that is not
    ///   `Completed`.
    pub fn start_node(&mut self, n: u64) -> Result<(), WorkflowError> {
        let idx = self.require_node(n)?;
        let state = self.node_states[idx];
        if !state.is_pending() {
            return Err(WorkflowError::InvalidTransition {
                node: n,
                from: state,
                to: NodeState::Active,
            });
        }
        // One scan both decides whether the node may start and produces the
        // list reported to the caller.
        let unsatisfied = self.unsatisfied_dependencies(idx);
        if !unsatisfied.is_empty() {
            return Err(WorkflowError::DependenciesNotSatisfied {
                node: n,
                deps: unsatisfied,
            });
        }
        self.transition(idx, NodeState::Active);
        Ok(())
    }

    /// Moves node `n` from `Active` to `Completed` and, if the workflow is
    /// still running and no node is left pending or active, settles the
    /// workflow status.
    ///
    /// # Errors
    ///
    /// * `NodeNotFound` when `n` is unknown.
    /// * `InvalidTransition` when `n` is not `Active`.
    pub fn complete_node(&mut self, n: u64) -> Result<(), WorkflowError> {
        let idx = self.require_node(n)?;
        let state = self.node_states[idx];
        if !state.is_active() {
            return Err(WorkflowError::InvalidTransition {
                node: n,
                from: state,
                to: NodeState::Completed,
            });
        }
        self.transition(idx, NodeState::Completed);
        self.update_workflow_status();
        Ok(())
    }

    /// Records a failure of the active node `n`.
    ///
    /// While the node has retries left it goes back to `Pending` and one
    /// retry is consumed; otherwise it becomes `Failed` and the workflow
    /// status becomes `Failure`.
    ///
    /// # Errors
    ///
    /// * `NodeNotFound` when `n` is unknown.
    /// * `InvalidTransition` when `n` is not `Active`.
    pub fn fail_node(&mut self, n: u64) -> Result<(), WorkflowError> {
        let idx = self.require_node(n)?;
        let state = self.node_states[idx];
        if !state.is_active() {
            return Err(WorkflowError::InvalidTransition {
                node: n,
                from: state,
                to: NodeState::Failed,
            });
        }
        match self.retries[idx].checked_sub(1) {
            Some(remaining) => {
                self.transition(idx, NodeState::Pending);
                self.retries[idx] = remaining;
            }
            None => {
                self.transition(idx, NodeState::Failed);
                self.status = WorkflowStatus::Failure;
            }
        }
        Ok(())
    }

    /// Marks a running workflow as `Failure` because it timed out.
    ///
    /// A workflow that already reached `Success` or `Failure` is left
    /// untouched, so a late timeout cannot turn a finished success into a
    /// failure.
    pub fn apply_timeout(&mut self) {
        if self.status.is_running() {
            self.status = WorkflowStatus::Failure;
        }
    }

    /// Settles the status of a running workflow once no node is pending or
    /// active.
    ///
    /// The `is_running` guard keeps terminal statuses final: without it, a
    /// workflow failed by `apply_timeout` would flip to `Success` when its
    /// remaining nodes were completed afterwards.
    fn update_workflow_status(&mut self) {
        if self.status.is_running() && self.is_terminal() {
            self.status = if self.has_failure() {
                WorkflowStatus::Failure
            } else {
                WorkflowStatus::Success
            };
        }
    }

    /// Returns a saturating score of the work still outstanding at `now`:
    /// time left until the deadline, plus 1000 per pending node, 100 per
    /// active node and 1000 per remaining retry (summed over all nodes).
    ///
    /// With `now` fixed and no saturation, each successful `start_node`,
    /// `complete_node` or `fail_node` lowers this value.
    pub fn measure(&self, now: Time) -> u64 {
        let time_left = self.timeout_at.saturating_sub(now);
        let pending = u64::from(self.pending_ct);
        let active = u64::from(self.active_ct);
        let retries_remaining = self
            .retries
            .iter()
            .fold(0u64, |acc, &r| acc.saturating_add(u64::from(r)));
        time_left
            .saturating_add(pending.saturating_mul(1000))
            .saturating_add(active.saturating_mul(100))
            .saturating_add(retries_remaining.saturating_mul(1000))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Chain 1 -> 2 -> 3.
    fn make_linear_workflow() -> WorkflowDef {
        let edges = vec![Edge::new(1, 2), Edge::new(2, 3)];
        WorkflowDef::new(vec![1, 2, 3], edges)
    }

    /// Nodes 1 and 2 both feed node 3.
    fn make_parallel_workflow() -> WorkflowDef {
        let edges = vec![Edge::new(1, 3), Edge::new(2, 3)];
        WorkflowDef::new(vec![1, 2, 3], edges)
    }

    /// True when `outcome` is a `DependenciesNotSatisfied` error.
    fn is_blocked(outcome: &Result<(), WorkflowError>) -> bool {
        matches!(
            outcome,
            Err(WorkflowError::DependenciesNotSatisfied { .. })
        )
    }

    #[test]
    fn test_workflow_status() {
        let running = WorkflowStatus::Running;
        assert!(running.is_running());
        assert!(!running.is_terminal());

        let success = WorkflowStatus::Success;
        assert!(success.is_terminal());
        assert!(success.is_success());

        let failure = WorkflowStatus::Failure;
        assert!(failure.is_terminal());
        assert!(failure.is_failure());
    }

    #[test]
    fn test_node_state() {
        // Each variant answers its own predicate.
        assert!(NodeState::Pending.is_pending());
        assert!(NodeState::Active.is_active());
        assert!(NodeState::Completed.is_completed());
        assert!(NodeState::Failed.is_failed());

        // Only Completed and Failed are terminal.
        assert!(NodeState::Completed.is_terminal());
        assert!(NodeState::Failed.is_terminal());
        assert!(!NodeState::Pending.is_terminal());
        assert!(!NodeState::Active.is_terminal());
    }

    #[test]
    fn test_workflow_def_dependencies() {
        let def = make_linear_workflow();
        assert!(def.dependencies(1).is_empty());
        assert!(def.is_source(1));
        assert_eq!(def.dependencies(2), vec![1]);
        assert_eq!(def.dependencies(3), vec![2]);
        assert!(def.is_sink(3));
    }

    #[test]
    fn test_workflow_def_dependents() {
        let def = make_linear_workflow();
        assert_eq!(def.dependents(1), vec![2]);
        assert_eq!(def.dependents(2), vec![3]);
        assert!(def.dependents(3).is_empty());
    }

    #[test]
    fn test_workflow_instance_new() {
        let instance = WorkflowInstance::new(make_linear_workflow(), 100, 3);
        assert!(instance.status().is_running());
        assert_eq!(instance.pending_count(), 3);
        assert_eq!(instance.active_count(), 0);
        assert_eq!(instance.get_retries(1), 3);
    }

    #[test]
    fn test_workflow_instance_start_node() {
        let mut instance = WorkflowInstance::new(make_linear_workflow(), 100, 0);
        assert!(instance.start_node(1).is_ok());
        assert!(instance.is_active(1));

        // Node 2 depends on node 1, which is active but not completed.
        let outcome = instance.start_node(2);
        assert!(is_blocked(&outcome));
    }

    #[test]
    fn test_workflow_instance_complete_flow() {
        let mut instance = WorkflowInstance::new(make_linear_workflow(), 100, 0);

        instance.start_node(1).expect("start 1");
        instance.complete_node(1).expect("complete 1");

        instance.start_node(2).expect("start 2");
        instance.complete_node(2).expect("complete 2");

        instance.start_node(3).expect("start 3");
        instance.complete_node(3).expect("complete 3");

        assert!(instance.is_terminal());
        assert!(instance.status().is_success());
    }

    #[test]
    fn test_workflow_instance_failure() {
        let mut instance = WorkflowInstance::new(make_linear_workflow(), 100, 0);
        instance.start_node(1).expect("start 1");
        instance.fail_node(1).expect("fail 1");
        assert!(instance.status().is_failure());
    }

    #[test]
    fn test_workflow_instance_retry() {
        let mut instance = WorkflowInstance::new(make_linear_workflow(), 100, 2);

        // First attempt: the failure consumes one retry and re-queues the node.
        instance.start_node(1).expect("start 1");
        assert_eq!(instance.get_retries(1), 2);
        instance.fail_node(1).expect("fail 1");
        assert!(instance.is_pending(1));
        assert_eq!(instance.get_retries(1), 1);
        assert!(instance.status().is_running());

        // Second attempt: the last retry is consumed.
        instance.start_node(1).expect("start 1 again");
        instance.fail_node(1).expect("fail 1 again");
        assert!(instance.is_pending(1));
        assert_eq!(instance.get_retries(1), 0);

        // Third attempt: no retries left, so the node and the workflow fail.
        instance.start_node(1).expect("start 1 final");
        instance.fail_node(1).expect("fail 1 final");
        assert!(instance.get_node_state(1).is_failed());
        assert!(instance.status().is_failure());
    }

    #[test]
    fn test_workflow_measure_decreases_on_complete() {
        let mut instance = WorkflowInstance::new(make_linear_workflow(), 100, 0);

        let before_start = instance.measure(0);
        instance.start_node(1).expect("start 1");
        let after_start = instance.measure(0);
        assert!(after_start < before_start);

        instance.complete_node(1).expect("complete 1");
        let after_complete = instance.measure(0);
        assert!(after_complete < after_start);
    }

    #[test]
    fn test_workflow_parallel() {
        let mut instance = WorkflowInstance::new(make_parallel_workflow(), 100, 0);
        instance.start_node(1).expect("start 1");
        instance.start_node(2).expect("start 2");

        // Neither dependency of node 3 has completed yet.
        assert!(is_blocked(&instance.start_node(3)));

        // One completed dependency is still not enough.
        instance.complete_node(1).expect("complete 1");
        assert!(is_blocked(&instance.start_node(3)));

        instance.complete_node(2).expect("complete 2");
        instance.start_node(3).expect("start 3");
        instance.complete_node(3).expect("complete 3");
        assert!(instance.status().is_success());
    }
}