car-workflow 0.7.0

Declarative multi-stage workflow orchestration for Common Agent Runtime
Documentation
//! Workflow definition types — fully serializable for use through all bindings.

use std::collections::HashMap;

use car_ir::{ActionProposal, Precondition};
use car_multi::AgentSpec;
use serde::{Deserialize, Serialize};
use serde_json::Value;

/// Top-level workflow definition: a named graph of stages with conditional edges.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Workflow {
    /// Unique identifier.
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// Stage ID of the entry point.
    pub start: String,
    /// All stages in this workflow.
    pub stages: Vec<Stage>,
    /// Directed edges between stages (may have conditions).
    pub edges: Vec<Edge>,
    /// Maximum total stage executions before aborting (loop guard).
    #[serde(default = "default_max_iterations")]
    pub max_iterations: u32,
    /// Opaque metadata (owner, version, tags, etc.).
    #[serde(default)]
    pub metadata: HashMap<String, Value>,
}

fn default_max_iterations() -> u32 {
    100
}

/// A named step in the workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage {
    /// Unique stage identifier (referenced by edges).
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// What this stage does.
    pub step: StageStep,
    /// Optional compensation to run if a later stage fails (saga pattern).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compensation: Option<CompensationHandler>,
    /// Optional timeout for this stage in milliseconds.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
    /// Opaque metadata.
    #[serde(default)]
    pub metadata: HashMap<String, Value>,
}

/// What a stage actually does.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StageStep {
    /// Run a car-multi agent coordination pattern.
    Pattern(PatternStep),
    /// Execute a car-engine action proposal directly.
    Proposal(ProposalStep),
    /// Run a nested sub-workflow.
    SubWorkflow(SubWorkflowStep),
}

/// Run one of the car-multi coordination patterns.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatternStep {
    /// Which pattern to run.
    pub pattern: PatternKind,
    /// Task description passed to the pattern.
    pub task: String,
    /// Agents involved. Interpretation depends on pattern kind.
    pub agents: Vec<AgentSpec>,
    /// Pattern-specific configuration.
    /// - supervisor: `{"max_rounds": 3, "supervisor_index": 0}`
    /// - map_reduce: `{"max_concurrent": 5, "items": ["a", "b", "c"]}`
    /// - fleet: `{"timeout_secs": 60}`
    #[serde(default)]
    pub config: HashMap<String, Value>,
}

/// All supported car-multi coordination patterns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PatternKind {
    SwarmParallel,
    SwarmSequential,
    SwarmDebate,
    Pipeline,
    Supervisor,
    Delegator,
    MapReduce,
    Vote,
    Fleet,
}

/// Execute a car-engine action proposal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProposalStep {
    pub proposal: ActionProposal,
}

/// Run a nested workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubWorkflowStep {
    pub workflow: Box<Workflow>,
}

/// Directed edge between two stages, optionally conditional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Edge {
    /// Source stage ID.
    pub from: String,
    /// Target stage ID.
    pub to: String,
    /// Conditions that must ALL be satisfied (AND) for this edge to be taken.
    /// Evaluated against the workflow state after `from` completes.
    /// Empty = unconditional.
    #[serde(default)]
    pub conditions: Vec<Precondition>,
    /// Human-readable label (e.g., "on success", "if approved").
    #[serde(default)]
    pub label: String,
}

/// What to run when compensating a stage on workflow failure (saga pattern).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CompensationHandler {
    /// Run a proposal to undo side effects.
    Proposal(ProposalStep),
    /// Execute a named stage from this workflow.
    StageRef { stage_id: String },
}

impl Workflow {
    /// Look up a stage by ID.
    pub fn stage(&self, id: &str) -> Option<&Stage> {
        self.stages.iter().find(|s| s.id == id)
    }

    /// Get all outgoing edges from a stage.
    pub fn outgoing_edges(&self, stage_id: &str) -> Vec<&Edge> {
        self.edges.iter().filter(|e| e.from == stage_id).collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn simple_workflow() -> Workflow {
        Workflow {
            id: "test-wf".into(),
            name: "Test Workflow".into(),
            start: "stage-a".into(),
            stages: vec![
                Stage {
                    id: "stage-a".into(),
                    name: "Stage A".into(),
                    step: StageStep::Proposal(ProposalStep {
                        proposal: ActionProposal {
                            id: "p1".into(),
                            source: "test".into(),
                            actions: vec![],
                            timestamp: chrono::Utc::now(),
                            context: HashMap::new(),
                        },
                    }),
                    compensation: None,
                    timeout_ms: None,
                    metadata: HashMap::new(),
                },
                Stage {
                    id: "stage-b".into(),
                    name: "Stage B".into(),
                    step: StageStep::Proposal(ProposalStep {
                        proposal: ActionProposal {
                            id: "p2".into(),
                            source: "test".into(),
                            actions: vec![],
                            timestamp: chrono::Utc::now(),
                            context: HashMap::new(),
                        },
                    }),
                    compensation: None,
                    timeout_ms: None,
                    metadata: HashMap::new(),
                },
            ],
            edges: vec![Edge {
                from: "stage-a".into(),
                to: "stage-b".into(),
                conditions: vec![],
                label: "always".into(),
            }],
            max_iterations: 100,
            metadata: HashMap::new(),
        }
    }

    #[test]
    fn serde_roundtrip() {
        let wf = simple_workflow();
        let json = serde_json::to_string_pretty(&wf).unwrap();
        let parsed: Workflow = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.id, "test-wf");
        assert_eq!(parsed.stages.len(), 2);
        assert_eq!(parsed.edges.len(), 1);
        assert_eq!(parsed.start, "stage-a");
    }

    #[test]
    fn stage_lookup() {
        let wf = simple_workflow();
        assert!(wf.stage("stage-a").is_some());
        assert!(wf.stage("nonexistent").is_none());
    }

    #[test]
    fn outgoing_edges() {
        let wf = simple_workflow();
        assert_eq!(wf.outgoing_edges("stage-a").len(), 1);
        assert_eq!(wf.outgoing_edges("stage-b").len(), 0);
    }
}