use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::step::StepDef;
/// Identifier of a flow definition; an alias for [`Uuid`].
pub type FlowId = Uuid;
/// Mode of a flow; selects which structural rules `FlowDef::validate` enforces.
///
/// The enum is `#[non_exhaustive]`, so code in other crates must keep a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum FlowMode {
/// Displayed as `"sequential"`. Steps must not declare `depends_on` in this mode.
// NOTE(review): presumably steps run one after another — the executor is not in this file; confirm.
Sequential,
/// Displayed as `"parallel"`. Steps must not declare `depends_on` in this mode.
// NOTE(review): presumably steps run concurrently — the executor is not in this file; confirm.
Parallel,
/// Displayed as `"dag"`. The only mode in which steps may declare `depends_on`;
/// validation rejects cycles and references to steps that are not part of the flow.
Dag,
/// Displayed as `"hierarchical"`. Nesting is expressed through `sub_steps`;
/// validation rejects `depends_on` on any step, including nested ones.
Hierarchical,
}
/// Formats the mode as its lowercase identifier:
/// `"sequential"`, `"parallel"`, `"dag"` or `"hierarchical"`.
impl std::fmt::Display for FlowMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the static label first, then emit it with a single write.
        let label = match *self {
            Self::Sequential => "sequential",
            Self::Parallel => "parallel",
            Self::Dag => "dag",
            Self::Hierarchical => "hierarchical",
        };
        f.write_str(label)
    }
}
/// Serializable definition of a flow: a list of steps plus flow-level settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDef {
/// Unique identifier; `FlowDef::new` assigns a random v4 UUID.
pub id: FlowId,
/// Name of the flow; carried in the `CycleDetected` error raised by `validate`.
pub name: String,
/// Free-form description; empty for flows created with `FlowDef::new`.
pub description: String,
/// Mode that determines which structural checks `validate` applies.
pub mode: FlowMode,
/// Steps of the flow; `add_step` appends to the end of this list.
pub steps: Vec<StepDef>,
/// Rollback flag; `false` by default, switched on by `with_rollback`.
// NOTE(review): how a rollback is carried out is decided outside this file.
pub rollback_on_failure: bool,
/// Optional timeout in milliseconds; `None` unless set through `with_timeout`.
// NOTE(review): presumably bounds the whole flow run — enforcement is not in this file; confirm.
pub timeout_ms: Option<u64>,
/// Definition version. When the field is absent from serialized input it
/// falls back to `default_flow_version()`, i.e. `1`.
#[serde(default = "default_flow_version")]
pub version: u32,
}
/// Version number given to a flow that does not specify one.
///
/// Serves as the serde default for `FlowDef::version` and as the initial
/// version set by `FlowDef::new`.
#[must_use]
#[inline]
pub(crate) fn default_flow_version() -> u32 {
    const INITIAL_VERSION: u32 = 1;
    INITIAL_VERSION
}
impl FlowDef {
#[must_use]
pub fn new(name: impl Into<String>, mode: FlowMode) -> Self {
Self {
id: Uuid::new_v4(),
name: name.into(),
description: String::new(),
mode,
steps: Vec::new(),
rollback_on_failure: false,
timeout_ms: None,
version: default_flow_version(),
}
}
pub fn add_step(&mut self, step: StepDef) {
self.steps.push(step);
}
#[must_use]
pub fn with_version(mut self, version: u32) -> Self {
self.version = version;
self
}
#[must_use]
pub fn with_rollback(mut self) -> Self {
self.rollback_on_failure = true;
self
}
#[must_use]
pub fn with_timeout(mut self, ms: u64) -> Self {
self.timeout_ms = Some(ms);
self
}
pub fn validate(&self) -> crate::Result<()> {
if self.mode == FlowMode::Dag {
self.check_cycles()?;
} else if self.steps.iter().any(|s| !s.depends_on.is_empty()) {
return Err(crate::SzalError::InvalidFlow(format!(
"steps have dependencies but flow mode is {} (use dag mode for dependencies)",
self.mode
)));
}
if self.mode == FlowMode::Hierarchical {
self.validate_hierarchical(&self.steps)?;
}
for step in &self.steps {
if step.trigger_mode == crate::step::TriggerMode::Any && step.depends_on.is_empty() {
return Err(crate::SzalError::InvalidFlow(format!(
"step '{}' has trigger_mode Any but no dependencies",
step.name
)));
}
}
Ok(())
}
fn validate_hierarchical(&self, steps: &[crate::step::StepDef]) -> crate::Result<()> {
for step in steps {
if !step.depends_on.is_empty() {
return Err(crate::SzalError::InvalidFlow(format!(
"step '{}' has depends_on in hierarchical mode (use sub_steps for nesting)",
step.name
)));
}
self.validate_hierarchical(&step.sub_steps)?;
}
Ok(())
}
fn check_cycles(&self) -> crate::Result<()> {
use std::collections::{HashMap, HashSet};
let id_set: HashSet<_> = self.steps.iter().map(|s| s.id).collect();
let mut visited = HashSet::new();
let mut in_stack = HashSet::new();
let deps: HashMap<_, _> = self.steps.iter().map(|s| (s.id, &s.depends_on)).collect();
fn dfs(
node: uuid::Uuid,
deps: &HashMap<uuid::Uuid, &Vec<uuid::Uuid>>,
visited: &mut HashSet<uuid::Uuid>,
in_stack: &mut HashSet<uuid::Uuid>,
) -> bool {
visited.insert(node);
in_stack.insert(node);
if let Some(neighbors) = deps.get(&node) {
for &n in *neighbors {
if !visited.contains(&n) {
if dfs(n, deps, visited, in_stack) {
return true;
}
} else if in_stack.contains(&n) {
return true;
}
}
}
in_stack.remove(&node);
false
}
for step in &self.steps {
if !visited.contains(&step.id) && dfs(step.id, &deps, &mut visited, &mut in_stack) {
return Err(crate::SzalError::CycleDetected(self.name.clone()));
}
}
for step in &self.steps {
for dep in &step.depends_on {
if !id_set.contains(dep) {
return Err(crate::SzalError::InvalidFlow(format!(
"step '{}' depends on non-existent step",
step.name
)));
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Steps are kept in insertion order and the mode is stored as given.
    #[test]
    fn flow_sequential() {
        let mut flow = FlowDef::new("deploy", FlowMode::Sequential);
        flow.add_step(StepDef::new("build"));
        flow.add_step(StepDef::new("test"));
        flow.add_step(StepDef::new("deploy"));
        assert_eq!(flow.steps.len(), 3);
        assert_eq!(flow.mode, FlowMode::Sequential);
    }

    /// A linear build -> test -> deploy chain is a valid DAG.
    #[test]
    fn flow_dag_valid() {
        let build = StepDef::new("build");
        let test = StepDef::new("test").depends_on(build.id);
        let deploy = StepDef::new("deploy").depends_on(test.id);
        let mut flow = FlowDef::new("pipeline", FlowMode::Dag);
        flow.add_step(build);
        flow.add_step(test);
        flow.add_step(deploy);
        assert!(flow.validate().is_ok());
    }

    /// Two steps depending on each other form a cycle.
    #[test]
    fn flow_dag_cycle_detected() {
        let mut a = StepDef::new("a");
        let mut b = StepDef::new("b");
        b.depends_on = vec![a.id];
        a.depends_on = vec![b.id];
        let mut flow = FlowDef::new("broken", FlowMode::Dag);
        flow.add_step(a);
        flow.add_step(b);
        assert!(flow.validate().is_err());
    }

    /// A step that depends on itself is the smallest possible cycle.
    #[test]
    fn flow_dag_self_dependency_is_cycle() {
        let mut a = StepDef::new("a");
        a.depends_on = vec![a.id];
        let mut flow = FlowDef::new("selfloop", FlowMode::Dag);
        flow.add_step(a);
        assert!(matches!(
            flow.validate(),
            Err(crate::SzalError::CycleDetected(_))
        ));
    }

    /// Depending on an id that no step of the flow carries is rejected.
    #[test]
    fn flow_dag_rejects_dangling_dependency() {
        let mut orphan = StepDef::new("orphan");
        orphan.depends_on = vec![Uuid::new_v4()];
        let mut flow = FlowDef::new("dangling", FlowMode::Dag);
        flow.add_step(orphan);
        assert!(matches!(
            flow.validate(),
            Err(crate::SzalError::InvalidFlow(_))
        ));
    }

    /// Trigger mode `Any` requires at least one dependency.
    #[test]
    fn flow_rejects_any_trigger_without_dependencies() {
        let mut step = StepDef::new("lonely");
        step.trigger_mode = crate::step::TriggerMode::Any;
        let mut flow = FlowDef::new("any", FlowMode::Sequential);
        flow.add_step(step);
        assert!(matches!(
            flow.validate(),
            Err(crate::SzalError::InvalidFlow(_))
        ));
    }

    /// Every variant renders as its lowercase identifier.
    #[test]
    fn flow_mode_display() {
        assert_eq!(FlowMode::Sequential.to_string(), "sequential");
        assert_eq!(FlowMode::Parallel.to_string(), "parallel");
        assert_eq!(FlowMode::Dag.to_string(), "dag");
        assert_eq!(FlowMode::Hierarchical.to_string(), "hierarchical");
    }

    /// Builder methods set the corresponding fields.
    #[test]
    fn flow_builder() {
        let flow = FlowDef::new("test", FlowMode::Parallel)
            .with_rollback()
            .with_timeout(120_000);
        assert!(flow.rollback_on_failure);
        assert_eq!(flow.timeout_ms, Some(120_000));
    }

    /// Dependencies are only allowed in DAG mode.
    #[test]
    fn flow_sequential_rejects_depends_on() {
        let a = StepDef::new("a");
        let b = StepDef::new("b").depends_on(a.id);
        let mut flow = FlowDef::new("bad", FlowMode::Sequential);
        flow.add_step(a);
        flow.add_step(b);
        assert!(flow.validate().is_err());
    }

    /// Dependencies are only allowed in DAG mode.
    #[test]
    fn flow_parallel_rejects_depends_on() {
        let a = StepDef::new("a");
        let b = StepDef::new("b").depends_on(a.id);
        let mut flow = FlowDef::new("bad", FlowMode::Parallel);
        flow.add_step(a);
        flow.add_step(b);
        assert!(flow.validate().is_err());
    }

    /// New flows start at version 1; `with_version` overrides it.
    #[test]
    fn flow_version_defaults_and_builder() {
        let flow = FlowDef::new("v", FlowMode::Sequential);
        assert_eq!(flow.version, 1);
        let flow = flow.with_version(5);
        assert_eq!(flow.version, 5);
    }

    /// Payloads written before the `version` field existed still deserialize.
    #[test]
    fn flow_deserializes_without_version_field() {
        let legacy = r#"{
            "id": "00000000-0000-0000-0000-000000000000",
            "name": "legacy",
            "description": "",
            "mode": "Sequential",
            "steps": [],
            "rollback_on_failure": false,
            "timeout_ms": null
        }"#;
        let flow: FlowDef = serde_json::from_str(legacy).unwrap();
        assert_eq!(flow.version, 1);
        assert_eq!(flow.name, "legacy");
    }

    /// An explicit version survives a JSON round trip.
    #[test]
    fn flow_version_serde_roundtrip() {
        let flow = FlowDef::new("v", FlowMode::Dag).with_version(7);
        let json = serde_json::to_string(&flow).unwrap();
        let back: FlowDef = serde_json::from_str(&json).unwrap();
        assert_eq!(back.version, 7);
    }

    /// Name, steps and the rollback flag survive a JSON round trip.
    #[test]
    fn flow_serde_roundtrip() {
        let build = StepDef::new("build");
        let test = StepDef::new("test").depends_on(build.id);
        let mut flow = FlowDef::new("pipeline", FlowMode::Dag).with_rollback();
        flow.add_step(build);
        flow.add_step(test);
        let json = serde_json::to_string(&flow).unwrap();
        let back: FlowDef = serde_json::from_str(&json).unwrap();
        assert_eq!(back.name, "pipeline");
        assert_eq!(back.steps.len(), 2);
        assert!(back.rollback_on_failure);
    }
}
// Property-based tests: generated flow shapes that must always pass validation.
#[cfg(test)]
mod proptests {
use super::*;
use proptest::prelude::*;
proptest! {
// A chain s0 <- s1 <- ... <- s(n-1), where each step depends only on the
// previously added one, must validate in DAG mode.
#[test]
fn linear_dag_never_cycles(n in 2usize..50) {
let mut flow = FlowDef::new("linear", FlowMode::Dag);
// Id of the step added in the previous iteration; `None` for the first step.
let mut prev_id = None;
for i in 0..n {
let mut step = StepDef::new(format!("s{i}"));
if let Some(pid) = prev_id {
step = step.depends_on(pid);
}
prev_id = Some(step.id);
flow.add_step(step);
}
prop_assert!(flow.validate().is_ok());
}
// One root with `leaves` steps that all depend on it must validate in DAG mode.
#[test]
fn fanout_dag_never_cycles(leaves in 2usize..50) {
let root = StepDef::new("root");
// Copy the id before `root` is moved into the flow.
let root_id = root.id;
let mut flow = FlowDef::new("fanout", FlowMode::Dag);
flow.add_step(root);
for i in 0..leaves {
flow.add_step(StepDef::new(format!("leaf{i}")).depends_on(root_id));
}
prop_assert!(flow.validate().is_ok());
}
// Sequential flows made of steps created with `StepDef::new` alone
// (no dependencies added) must validate.
#[test]
fn sequential_always_valid(n in 1usize..50) {
let mut flow = FlowDef::new("seq", FlowMode::Sequential);
for i in 0..n {
flow.add_step(StepDef::new(format!("s{i}")));
}
prop_assert!(flow.validate().is_ok());
}
}
}