runkon-flow 0.6.1-alpha

Portable workflow execution engine — DSL, traits, and in-memory reference implementations
Documentation
use std::collections::HashMap;

use crate::engine_error::EngineError;
use crate::status::{WorkflowRunStatus, WorkflowStepStatus};
use crate::types::{WorkflowRun, WorkflowRunStep};

pub use crate::traits::gate_approval_store::{
    gate_approval_state_from_fields, GateApprovalState, GateApprovalStore,
};
pub use crate::types::FanOutItemRow;

/// Parameters for creating a new workflow run.
///
/// Passed by value to [`WorkflowPersistence::create_run`]. `Debug` is derived
/// so a request can be logged, and `Clone` so a caller can keep a copy after
/// handing one to the persistence layer.
#[derive(Debug, Clone)]
pub struct NewRun {
    /// Name of the workflow being run.
    pub workflow_name: String,
    /// NOTE(review): a mandatory parent id, distinct from the optional
    /// `parent_workflow_run_id` below. What kind of parent it refers to is not
    /// visible in this file — confirm against callers.
    pub parent_run_id: String,
    /// Dry-run flag; how backends act on it is up to the implementation.
    pub dry_run: bool,
    /// Free-form trigger label. The set of valid values is not defined here.
    pub trigger: String,
    /// Optional snapshot of the workflow definition (serialization format is
    /// not visible here).
    pub definition_snapshot: Option<String>,
    /// Optional id of a parent workflow run; presumably set for nested
    /// workflows — confirm against callers.
    pub parent_workflow_run_id: Option<String>,
}

/// Parameters for inserting a new workflow step.
///
/// When `retry_count` is `Some`, the step is inserted with `status = 'running'`
/// and `started_at` set atomically. When `None`, the step starts as `'pending'`.
///
/// `Debug` and `Clone` are derived so an insert request can be logged or
/// retained by the caller after it is handed to the persistence layer.
#[derive(Debug, Clone)]
pub struct NewStep {
    /// Id of the workflow run this step belongs to.
    pub workflow_run_id: String,
    /// Name of the step.
    pub step_name: String,
    /// Role label for the step; the set of valid values is not defined here.
    pub role: String,
    /// Whether the step may commit; enforcement happens outside this file.
    pub can_commit: bool,
    /// Position of the step (presumably its ordinal within the run — confirm).
    pub position: i64,
    /// Iteration counter (presumably for looped steps — confirm).
    pub iteration: i64,
    /// `Some(n)` → insert as running with retry_count=n; `None` → insert as pending.
    pub retry_count: Option<i64>,
}

/// Fields to update on an existing workflow step.
///
/// `generation` must match the parent `workflow_runs.generation` at write time;
/// a mismatch means another engine re-claimed the run and the write is rejected
/// with `EngineError::Cancelled(CancellationReason::LeaseLost)`.
/// Default is intentionally not derived — a silent `generation: 0` would mask
/// stale-write bugs.
///
/// `Debug` is derived so an update can be logged when a write is rejected; it
/// does not weaken the "no `Default`" rule above.
#[derive(Debug)]
pub struct StepUpdate {
    /// Lease generation the writer believes it holds (see the type-level docs).
    pub generation: i64,
    /// Status to record for the step.
    pub status: WorkflowStepStatus,
    /// Optional id of a child run associated with the step.
    pub child_run_id: Option<String>,
    /// Optional result text; `StepUpdate::failed*` store the error message here.
    pub result_text: Option<String>,
    /// Optional context output (format not visible in this file).
    pub context_out: Option<String>,
    /// Optional markers output (format not visible in this file).
    pub markers_out: Option<String>,
    /// Optional retry count; the convenience constructors set it to the attempt
    /// number they are given.
    pub retry_count: Option<i64>,
    /// Optional structured output (presumably serialized JSON — confirm).
    pub structured_output: Option<String>,
    /// Optional error message; set by the `failed*` constructors and left
    /// `None` by `completed`.
    pub step_error: Option<String>,
}

impl StepUpdate {
    /// Convenience constructor for a successful step completion.
    pub fn completed(
        generation: i64,
        child_run_id: Option<String>,
        result_text: Option<String>,
        context_out: Option<String>,
        markers_out: Option<String>,
        attempt: u32,
        structured_output: Option<String>,
    ) -> Self {
        Self {
            generation,
            status: WorkflowStepStatus::Completed,
            child_run_id,
            result_text,
            context_out,
            markers_out,
            retry_count: Some(attempt as i64),
            structured_output,
            step_error: None,
        }
    }

    /// Convenience constructor for a failed step.
    pub fn failed(generation: i64, err_msg: impl Into<String>, attempt: u32) -> Self {
        Self::failed_with_child(generation, err_msg, attempt, None)
    }

    /// Convenience constructor for a failed step with an optional child run ID.
    pub fn failed_with_child(
        generation: i64,
        err_msg: impl Into<String>,
        attempt: u32,
        child_run_id: Option<String>,
    ) -> Self {
        let err_msg = err_msg.into();
        Self {
            generation,
            status: WorkflowStepStatus::Failed,
            child_run_id,
            result_text: Some(err_msg.clone()),
            context_out: None,
            markers_out: None,
            retry_count: Some(attempt as i64),
            structured_output: None,
            step_error: Some(err_msg),
        }
    }
}

/// Status values for fan-out items, mirroring the string constants stored in the DB.
///
/// Each variant maps to exactly one lowercase string; see
/// [`FanOutItemStatus::as_str`] for the mapping and the `TryFrom<&str>` impl
/// for the reverse direction.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FanOutItemStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Skipped,
}

impl FanOutItemStatus {
    /// Returns the lowercase string constant for this status
    /// (`"pending"`, `"running"`, `"completed"`, `"failed"` or `"skipped"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            FanOutItemStatus::Pending => "pending",
            FanOutItemStatus::Running => "running",
            FanOutItemStatus::Completed => "completed",
            FanOutItemStatus::Failed => "failed",
            FanOutItemStatus::Skipped => "skipped",
        }
    }
}

impl TryFrom<&str> for FanOutItemStatus {
    type Error = String;

    /// Parses a status from its string constant.
    ///
    /// The comparison is exact (case-sensitive, no trimming). Any other input
    /// yields `Err` with a message naming the offending value.
    fn try_from(s: &str) -> std::result::Result<Self, Self::Error> {
        // Every variant, so parsing is the exact inverse of `as_str`.
        const KNOWN: [FanOutItemStatus; 5] = [
            FanOutItemStatus::Pending,
            FanOutItemStatus::Running,
            FanOutItemStatus::Completed,
            FanOutItemStatus::Failed,
            FanOutItemStatus::Skipped,
        ];

        KNOWN
            .iter()
            .find(|candidate| candidate.as_str() == s)
            .cloned()
            .ok_or_else(|| format!("unknown FanOutItemStatus: {s}"))
    }
}

/// Update payload for a fan-out item, mapping the two existing update variants.
///
/// `Debug` is derived (alongside `Clone`) so a pending update can be logged,
/// e.g. when a batch write fails.
#[derive(Debug, Clone)]
pub enum FanOutItemUpdate {
    /// Carries the id of a child run; presumably the run started for the item
    /// — confirm against the backends.
    Running { child_run_id: String },
    /// Carries the status to record. NOTE(review): the name suggests a terminal
    /// state, but the type itself accepts any `FanOutItemStatus`.
    Terminal { status: FanOutItemStatus },
}

/// Storage contract covering every persistence read and write the workflow
/// engine performs.
///
/// `GateApprovalStore`, `Send` and `Sync` are all required bounds.
/// `GateApprovalStore` is a supertrait so that the gate-approval read/write
/// methods are reachable through this trait: engine code calling
/// `state.persistence.get_gate_approval()` compiles without changes to
/// `ExecutionState`. All methods acquire a lock internally; no external
/// synchronization is needed.
pub trait WorkflowPersistence: GateApprovalStore + Send + Sync {
    // --- Run lifecycle ---

    /// Creates a run from `new_run` and returns the resulting [`WorkflowRun`].
    fn create_run(&self, new_run: NewRun) -> Result<WorkflowRun, EngineError>;

    /// Looks up a run by id. `Ok(None)` presumably means no such run exists —
    /// confirm against the backends.
    fn get_run(&self, run_id: &str) -> Result<Option<WorkflowRun>, EngineError>;

    /// Lists runs filtered by `statuses` (presumably runs whose status is one
    /// of the given values — confirm against the backends).
    fn list_active_runs(
        &self,
        statuses: &[WorkflowRunStatus],
    ) -> Result<Vec<WorkflowRun>, EngineError>;

    /// Update run status.
    ///
    /// NOTE: must not be called with `WorkflowRunStatus::Waiting` — use the engine's
    /// `set_waiting_blocked_on` path instead.
    fn update_run_status(
        &self,
        run_id: &str,
        status: WorkflowRunStatus,
        result_summary: Option<&str>,
        error: Option<&str>,
    ) -> Result<(), EngineError>;

    // --- Steps ---

    /// Inserts a step described by `new_step`; the returned string is
    /// presumably the new step's id — confirm against the backends.
    fn insert_step(&self, new_step: NewStep) -> Result<String, EngineError>;

    /// Applies `update` to the step identified by `step_id`. See [`StepUpdate`]
    /// for the generation check that can reject the write.
    fn update_step(&self, step_id: &str, update: StepUpdate) -> Result<(), EngineError>;

    /// Returns the steps recorded for `run_id` (ordering is backend-defined as
    /// far as this file shows).
    fn get_steps(&self, run_id: &str) -> Result<Vec<WorkflowRunStep>, EngineError>;

    // --- Fan-out ---

    /// Insert a new fan-out item row. Backends serialize `context` as JSON.
    fn insert_fan_out_item(
        &self,
        step_run_id: &str,
        item_type: &str,
        item_id: &str,
        item_ref: &str,
        context: &HashMap<String, String>,
    ) -> Result<String, EngineError>;

    /// Applies a single `update` to the fan-out item identified by `item_id`.
    fn update_fan_out_item(
        &self,
        item_id: &str,
        update: FanOutItemUpdate,
    ) -> Result<(), EngineError>;

    /// Flush a batch of fan-out item updates in a single operation.
    ///
    /// The default impl applies the updates one by one, in slice order, through
    /// `update_fan_out_item` for backwards compatibility, and stops at the
    /// first error (updates already applied are not rolled back).
    /// Production backends override this with a transactional bulk-write.
    fn batch_update_fan_out_items(
        &self,
        updates: &[(String, FanOutItemUpdate)],
    ) -> Result<(), EngineError> {
        updates
            .iter()
            .try_for_each(|(item_id, update)| self.update_fan_out_item(item_id, update.clone()))
    }

    /// Returns the fan-out items of `step_run_id`, optionally restricted by
    /// `status_filter` (`None` presumably means "all statuses" — confirm).
    fn get_fan_out_items(
        &self,
        step_run_id: &str,
        status_filter: Option<FanOutItemStatus>,
    ) -> Result<Vec<FanOutItemRow>, EngineError>;

    // --- Engine lifecycle hooks ---

    /// Atomically claim ownership of a workflow run. Returns the new generation on
    /// success, or `None` if another engine already holds the lease.
    fn acquire_lease(
        &self,
        run_id: &str,
        token: &str,
        ttl_seconds: i64,
    ) -> Result<Option<i64>, EngineError>;

    /// Returns true if the run has been cancelled (e.g. via external request).
    fn is_run_cancelled(&self, run_id: &str) -> Result<bool, EngineError>;
}

// Unit tests for the re-exported `gate_approval_state_from_fields` helper and
// for the `StepUpdate` convenience constructors.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::status::WorkflowStepStatus;

    // --- gate_approval_state_from_fields ---

    /// No approval timestamp + `Waiting` status → the gate is still pending.
    #[test]
    fn pending_when_status_is_waiting_and_no_approved_at() {
        let state = gate_approval_state_from_fields(None, WorkflowStepStatus::Waiting, None, None);
        assert!(matches!(state, GateApprovalState::Pending));
    }

    /// An approval timestamp wins even though the status is still `Waiting`,
    /// and the stored feedback is carried into the `Approved` state.
    #[test]
    fn approved_when_approved_at_is_set_regardless_of_status() {
        let state = gate_approval_state_from_fields(
            Some("2025-01-01T00:00:00Z"),
            WorkflowStepStatus::Waiting,
            Some("lgtm".into()),
            None,
        );
        match state {
            GateApprovalState::Approved { feedback, .. } => {
                assert_eq!(feedback.as_deref(), Some("lgtm"));
            }
            other => panic!("expected Approved, got {other:?}"),
        }
    }

    /// A `Completed` status counts as approved even without a timestamp; the
    /// selections are passed through and feedback stays empty.
    #[test]
    fn approved_when_status_completed_and_no_approved_at() {
        let selections = vec!["option-a".to_string()];
        let state = gate_approval_state_from_fields(
            None,
            WorkflowStepStatus::Completed,
            None,
            Some(selections.clone()),
        );
        match state {
            GateApprovalState::Approved {
                feedback,
                selections: s,
            } => {
                assert!(feedback.is_none());
                assert_eq!(s, Some(selections));
            }
            other => panic!("expected Approved, got {other:?}"),
        }
    }

    /// A `Failed` status maps to `Rejected` and surfaces the stored feedback.
    #[test]
    fn rejected_when_status_failed_surfaces_feedback() {
        let state = gate_approval_state_from_fields(
            None,
            WorkflowStepStatus::Failed,
            Some("not ready".into()),
            None,
        );
        match state {
            GateApprovalState::Rejected { feedback } => {
                assert_eq!(feedback.as_deref(), Some("not ready"));
            }
            other => panic!("expected Rejected, got {other:?}"),
        }
    }

    /// A `Failed` status with no stored feedback yields `Rejected` with `None`.
    #[test]
    fn rejected_with_no_feedback_when_none_stored() {
        let state = gate_approval_state_from_fields(None, WorkflowStepStatus::Failed, None, None);
        match state {
            GateApprovalState::Rejected { feedback } => {
                assert!(feedback.is_none());
            }
            other => panic!("expected Rejected, got {other:?}"),
        }
    }

    // --- StepUpdate constructors ---

    /// `completed` passes every argument through, stores the attempt as
    /// `retry_count`, and leaves `step_error` unset.
    #[test]
    fn step_update_completed_sets_correct_fields() {
        let update = StepUpdate::completed(
            7,
            Some("child-123".into()),
            Some("result".into()),
            Some("ctx".into()),
            Some("markers".into()),
            3,
            Some("{\"key\": \"val\"}".into()),
        );
        assert_eq!(update.generation, 7);
        assert_eq!(update.status, WorkflowStepStatus::Completed);
        assert_eq!(update.child_run_id, Some("child-123".into()));
        assert_eq!(update.result_text, Some("result".into()));
        assert_eq!(update.context_out, Some("ctx".into()));
        assert_eq!(update.markers_out, Some("markers".into()));
        assert_eq!(update.retry_count, Some(3));
        assert_eq!(update.structured_output, Some("{\"key\": \"val\"}".into()));
        assert!(update.step_error.is_none());
    }

    /// `failed` duplicates the message into `result_text` and `step_error`,
    /// clears the output fields, and attaches no child run.
    #[test]
    fn step_update_failed_sets_correct_fields() {
        let update = StepUpdate::failed(5, "oops", 2);
        assert_eq!(update.generation, 5);
        assert_eq!(update.status, WorkflowStepStatus::Failed);
        assert_eq!(update.result_text, Some("oops".into()));
        assert_eq!(update.step_error, Some("oops".into()));
        assert!(update.child_run_id.is_none());
        assert!(update.context_out.is_none());
        assert!(update.markers_out.is_none());
        assert!(update.structured_output.is_none());
        assert_eq!(update.retry_count, Some(2));
    }

    /// `failed_with_child` behaves like `failed` but records the child run id.
    #[test]
    fn step_update_failed_with_child_sets_child_run_id() {
        let update = StepUpdate::failed_with_child(3, "child err", 1, Some("child-run-42".into()));
        assert_eq!(update.generation, 3);
        assert_eq!(update.status, WorkflowStepStatus::Failed);
        assert_eq!(update.result_text, Some("child err".into()));
        assert_eq!(update.step_error, Some("child err".into()));
        assert_eq!(update.child_run_id, Some("child-run-42".into()));
        assert_eq!(update.retry_count, Some(1));
        assert!(update.context_out.is_none());
        assert!(update.markers_out.is_none());
        assert!(update.structured_output.is_none());
    }
}