car-workflow 0.22.0

Declarative multi-stage workflow orchestration for Common Agent Runtime
Documentation
//! Workflow definition types — fully serializable for use through all bindings.

use std::collections::HashMap;

use car_ir::{ActionProposal, Precondition};
use car_multi::AgentSpec;
use serde::{Deserialize, Serialize};
use serde_json::Value;

/// Top-level workflow definition: a named graph of stages with conditional edges.
///
/// Derives `Serialize`/`Deserialize`, so a workflow can be round-tripped through
/// any serde format. When deserializing, `goal`, `max_iterations` and `metadata`
/// may be omitted (each has a serde default); every other field is required.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Workflow {
    /// Unique identifier.
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// Stage ID of the entry point.
    pub start: String,
    /// The overall objective. When set, it is pinned into workflow state as
    /// `goal` at run start and re-anchored into every agent coordination's task
    /// ("Overall goal: … / Current step: …") so a long multi-stage run can't
    /// drift from the original objective. A structural defense against goal drift.
    ///
    /// Left out of the serialized form entirely when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub goal: Option<String>,
    /// All stages in this workflow.
    pub stages: Vec<Stage>,
    /// Directed edges between stages (may have conditions).
    pub edges: Vec<Edge>,
    /// Maximum total stage executions before aborting (loop guard).
    ///
    /// Falls back to `default_max_iterations()` when absent from the input.
    #[serde(default = "default_max_iterations")]
    pub max_iterations: u32,
    /// Opaque metadata (owner, version, tags, etc.).
    ///
    /// Defaults to an empty map when absent from the input.
    #[serde(default)]
    pub metadata: HashMap<String, Value>,
}

/// Serde fallback for `Workflow::max_iterations` when the field is missing
/// from the deserialized input.
fn default_max_iterations() -> u32 {
    // Named so the loop-guard default is not a bare magic number.
    const DEFAULT_MAX_ITERATIONS: u32 = 100;
    DEFAULT_MAX_ITERATIONS
}

/// A named step in the workflow.
///
/// Only `id`, `name` and `step` are required when deserializing; the remaining
/// fields carry serde defaults.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage {
    /// Unique stage identifier (referenced by edges).
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// What this stage does.
    pub step: StageStep,
    /// Optional compensation to run if a later stage fails (saga pattern).
    ///
    /// Left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compensation: Option<CompensationHandler>,
    /// Optional timeout for this stage in milliseconds.
    ///
    /// Left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
    /// Opaque metadata.
    ///
    /// Defaults to an empty map when absent from the input; always written on
    /// serialization (there is no `skip_serializing_if` here).
    #[serde(default)]
    pub metadata: HashMap<String, Value>,
}

/// What a stage actually does.
///
/// Serialized as an internally tagged enum: a `type` field names the variant in
/// snake_case (`pattern`, `proposal`, `sub_workflow`, `approval`, `loop_until`,
/// `for_each`) and sits alongside the fields of that variant's payload struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StageStep {
    /// Run a car-multi agent coordination pattern.
    Pattern(PatternStep),
    /// Execute a car-engine action proposal directly.
    Proposal(ProposalStep),
    /// Run a nested sub-workflow.
    SubWorkflow(SubWorkflowStep),
    /// Pause the workflow and wait for human input (human-in-the-loop gate).
    ///
    /// The engine snapshots the run, returns [`crate::WorkflowStatus::Paused`]
    /// with a [`crate::PausedWorkflow`] checkpoint, and does not execute this
    /// stage's body. The run continues only when
    /// [`crate::WorkflowEngine::resume`] is called with the human's response.
    Approval(ApprovalStep),
    /// Repeat an inner step until a state predicate holds or a cap is reached.
    ///
    /// This makes the blog-style "loop until done" idiom a first-class,
    /// verifiable construct rather than something hand-wired from a back-edge,
    /// a counter stage, and a conditional edge. Bounded by `max_iterations`.
    LoopUntil(LoopUntilStep),
    /// Fan an inner step out over a list of items resolved **at runtime** from
    /// workflow state (e.g. produced by an earlier stage), not hardcoded in the
    /// manifest. This is the declarative analogue of `parallel(items.map(...))`
    /// where the item count is unknown until the run reaches this stage.
    ForEach(ForEachStep),
}

/// Repeat `body` until `until` holds (AND of preconditions over workflow state)
/// or `max_iterations` is reached — whichever comes first.
///
/// `body` and `max_iterations` are required when deserializing; `until` may be
/// omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoopUntilStep {
    /// The step run each iteration. Its produced state (proposal `state_changes`,
    /// and `stage.<loop_id>.answer`/`.iteration`) is visible to `until`.
    ///
    /// Boxed because `StageStep` itself contains `LoopUntilStep`, so the type is
    /// recursive and needs the indirection.
    pub body: Box<StageStep>,
    /// Loop stops once ALL of these pass (AND) against the state after a body
    /// iteration. Empty `until` runs exactly `max_iterations` times.
    ///
    /// Defaults to an empty list when absent from the input.
    #[serde(default)]
    pub until: Vec<Precondition>,
    /// Hard cap on body iterations. Must be >= 1 (enforced by
    /// [`crate::verify_workflow`]); the loop guard against unbounded repetition.
    ///
    /// Unlike `Workflow::max_iterations`, this field has no serde default.
    pub max_iterations: u32,
}

/// Run `body` once per element of the array at state key `items_from`.
///
/// `items_from` and `body` are required when deserializing; `max_concurrent`
/// may be omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForEachStep {
    /// Workflow-state key holding a JSON array. Each element becomes one item.
    /// A missing key or non-array value yields zero iterations (a no-op).
    pub items_from: String,
    /// The step to run per item. Before each run, the placeholders `{{item}}`
    /// (the element rendered as a string) and `{{index}}` (0-based position) are
    /// substituted into every string *value* in the body — pattern tasks, agent
    /// prompts, proposal parameter values — so the body can be parameterized by
    /// the item. (JSON object *keys* are not templated; `{{item}}` is expanded
    /// before `{{index}}`, so item text containing `{{index}}` is itself
    /// expanded.)
    ///
    /// Per-item results land in workflow state under
    /// `foreach.<id>.<index>.{item,answer}`, and the body's own state deltas
    /// (e.g. a proposal's `state_changes`) under
    /// `foreach.<id>.<index>.state.<key>` — namespaced per item so concurrent
    /// bodies never clobber one another. (Body deltas are deliberately NOT
    /// merged under their bare keys, unlike `LoopUntil`, which runs serially.)
    ///
    /// Boxed because `StageStep` itself contains `ForEachStep`, so the type is
    /// recursive and needs the indirection.
    pub body: Box<StageStep>,
    /// Max bodies to run concurrently. `0` or `1` means sequential. Higher fans
    /// out via the runtime's bounded concurrency.
    ///
    /// Defaults to `0` (i.e. sequential) when absent from the input.
    #[serde(default)]
    pub max_concurrent: usize,
}

/// Run one of the car-multi coordination patterns.
///
/// `pattern`, `task` and `agents` are required when deserializing; `config`
/// may be omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatternStep {
    /// Which pattern to run.
    pub pattern: PatternKind,
    /// Task description passed to the pattern.
    pub task: String,
    /// Agents involved. Interpretation depends on pattern kind.
    pub agents: Vec<AgentSpec>,
    /// Pattern-specific configuration.
    /// - supervisor: `{"max_rounds": 3, "supervisor_index": 0}`
    /// - map_reduce: `{"max_concurrent": 5, "items": ["a", "b", "c"]}`
    /// - fleet: `{"timeout_secs": 60}`
    ///
    /// Values are untyped JSON, so nothing in this type checks the keys.
    /// Defaults to an empty map when absent from the input.
    #[serde(default)]
    pub config: HashMap<String, Value>,
}

/// All supported car-multi coordination patterns.
///
/// A fieldless, `Copy` enum serialized as the snake_case variant name
/// (e.g. `MapReduce` is written as `"map_reduce"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PatternKind {
    /// Serialized as `"swarm_parallel"`.
    SwarmParallel,
    /// Serialized as `"swarm_sequential"`.
    SwarmSequential,
    /// Serialized as `"swarm_debate"`.
    SwarmDebate,
    /// Serialized as `"pipeline"`.
    Pipeline,
    /// Serialized as `"supervisor"`.
    Supervisor,
    /// Serialized as `"delegator"`.
    Delegator,
    /// Serialized as `"map_reduce"`.
    MapReduce,
    /// Serialized as `"vote"`.
    Vote,
    /// Serialized as `"fleet"`.
    Fleet,
    /// Fresh-context verification: a reviewer agent checks prior work against
    /// acceptance criteria with no author context. A structural defense against
    /// self-preferential bias and premature "done". `agents[0]` is the reviewer;
    /// `config.criteria` is a string array; `config.review_key` is the state key
    /// holding the work to review (required — there is no default; a missing or
    /// empty value fails the review closed). The verdict is exposed as the typed
    /// `stage.<id>.review_passed` (bool) for edge branching.
    ///
    /// Serialized as `"adversarial_review"`.
    AdversarialReview,
    /// Rank competitors by single-elimination pairwise judging. All agents but
    /// the last are competitors; the last (or `config.judge_index`) is the judge.
    ///
    /// Serialized as `"tournament"`.
    Tournament,
}

/// Execute a car-engine action proposal.
///
/// Used both as a stage body (`StageStep::Proposal`) and as a compensation
/// handler (`CompensationHandler::Proposal`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProposalStep {
    /// The proposal to execute.
    pub proposal: ActionProposal,
}

/// Run a nested workflow.
///
/// The nested definition is embedded inline rather than referenced by ID.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubWorkflowStep {
    /// The complete nested workflow definition, stored on the heap.
    pub workflow: Box<Workflow>,
}

/// Pause for human-in-the-loop approval or input.
///
/// `prompt` and `output_key` are required when deserializing; `fields` may be
/// omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApprovalStep {
    /// Message shown to the human when the run pauses.
    pub prompt: String,
    /// Structured fields the human is asked to fill in (empty = free-form approval).
    ///
    /// Defaults to an empty list when absent from the input.
    #[serde(default)]
    pub fields: Vec<ApprovalField>,
    /// State key under which the human's response object is written on resume.
    ///
    /// The response is also mirrored to `stage.<id>.answer` for edge conditions.
    /// Must be non-empty (enforced by [`crate::verify_workflow`]).
    pub output_key: String,
}

/// One field in an [`ApprovalStep`] form.
///
/// Only `name` is required when deserializing; every other field has a serde
/// default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApprovalField {
    /// Machine name (key in the response object).
    pub name: String,
    /// Human-readable label.
    ///
    /// Defaults to an empty string when absent from the input.
    #[serde(default)]
    pub label: String,
    /// Input kind: `text`, `textarea`, `options`, `bool`, or `number`.
    ///
    /// Stored as a plain string, so this type does not restrict the value to
    /// that list. Falls back to `default_field_type()` when absent.
    #[serde(default = "default_field_type")]
    pub field_type: String,
    /// Allowed values when `field_type` is `options`.
    ///
    /// Left out of the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub options: Vec<String>,
    /// Whether the human must supply this field.
    ///
    /// Defaults to `false` when absent from the input.
    #[serde(default)]
    pub required: bool,
}

/// Serde fallback for `ApprovalField::field_type`: a plain single-line text
/// input is assumed when the manifest does not name an input kind.
fn default_field_type() -> String {
    String::from("text")
}

/// Directed edge between two stages, optionally conditional.
///
/// `from` and `to` are required when deserializing; `conditions` and `label`
/// may be omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Edge {
    /// Source stage ID.
    pub from: String,
    /// Target stage ID.
    pub to: String,
    /// Conditions that must ALL be satisfied (AND) for this edge to be taken.
    /// Evaluated against the workflow state after `from` completes.
    /// Empty = unconditional.
    ///
    /// Defaults to an empty list (unconditional) when absent from the input.
    #[serde(default)]
    pub conditions: Vec<Precondition>,
    /// Human-readable label (e.g., "on success", "if approved").
    ///
    /// Defaults to an empty string when absent from the input.
    #[serde(default)]
    pub label: String,
}

/// What to run when compensating a stage on workflow failure (saga pattern).
///
/// Serialized as an internally tagged enum: a `type` field holds `proposal` or
/// `stage_ref`, alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CompensationHandler {
    /// Run a proposal to undo side effects.
    Proposal(ProposalStep),
    /// Execute a named stage from this workflow.
    ///
    /// `stage_id` is the ID of the stage to run as the compensation.
    StageRef { stage_id: String },
}

impl Workflow {
    /// Find the stage whose `id` matches `id` exactly.
    ///
    /// Stages are scanned in declaration order and the first match wins;
    /// returns `None` when no stage carries that ID.
    pub fn stage(&self, id: &str) -> Option<&Stage> {
        let has_requested_id = |candidate: &&Stage| candidate.id == id;
        self.stages.iter().find(has_requested_id)
    }

    /// Collect every edge whose source (`from`) is `stage_id`.
    ///
    /// Edges are returned in declaration order, conditional or not; the result
    /// is empty when the stage has no outgoing edges.
    pub fn outgoing_edges(&self, stage_id: &str) -> Vec<&Edge> {
        let leaves_stage = |edge: &&Edge| edge.from == stage_id;
        self.edges.iter().filter(leaves_stage).collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a stage backed by an empty proposal, with no compensation,
    /// timeout or metadata.
    fn proposal_stage(stage_id: &str, stage_name: &str, proposal_id: &str) -> Stage {
        let proposal = ActionProposal {
            id: proposal_id.into(),
            source: "test".into(),
            actions: vec![],
            timestamp: chrono::Utc::now(),
            context: HashMap::new(),
        };

        Stage {
            id: stage_id.into(),
            name: stage_name.into(),
            step: StageStep::Proposal(ProposalStep { proposal }),
            compensation: None,
            timeout_ms: None,
            metadata: HashMap::new(),
        }
    }

    /// Two-stage fixture: `stage-a` flows unconditionally into `stage-b`.
    fn simple_workflow() -> Workflow {
        let stages = vec![
            proposal_stage("stage-a", "Stage A", "p1"),
            proposal_stage("stage-b", "Stage B", "p2"),
        ];
        let edges = vec![Edge {
            from: "stage-a".into(),
            to: "stage-b".into(),
            conditions: vec![],
            label: "always".into(),
        }];

        Workflow {
            id: "test-wf".into(),
            name: "Test Workflow".into(),
            start: "stage-a".into(),
            goal: None,
            stages,
            edges,
            max_iterations: 100,
            metadata: HashMap::new(),
        }
    }

    /// A workflow survives a JSON serialize/deserialize cycle intact.
    #[test]
    fn serde_roundtrip() {
        let original = simple_workflow();
        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let decoded: Workflow = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.id, "test-wf");
        assert_eq!(decoded.stages.len(), 2);
        assert_eq!(decoded.edges.len(), 1);
        assert_eq!(decoded.start, "stage-a");
    }

    /// `stage` finds known IDs and reports unknown ones as `None`.
    #[test]
    fn stage_lookup() {
        let workflow = simple_workflow();

        assert!(workflow.stage("stage-a").is_some());
        assert!(workflow.stage("nonexistent").is_none());
    }

    /// `outgoing_edges` only returns edges leaving the requested stage.
    #[test]
    fn outgoing_edges() {
        let workflow = simple_workflow();

        assert_eq!(workflow.outgoing_edges("stage-a").len(), 1);
        assert_eq!(workflow.outgoing_edges("stage-b").len(), 0);
    }
}