car-workflow 0.22.0

Declarative multi-stage workflow orchestration for Common Agent Runtime
Documentation
//! Durable persistence for paused workflow runs.
//!
//! The [`WorkflowEngine`](crate::WorkflowEngine) itself is pure: `run` returns a
//! [`PausedWorkflow`] checkpoint and `resume` takes one back. To survive a
//! process restart, that checkpoint has to live somewhere. [`CheckpointStore`]
//! is a minimal file-per-run JSON store: one `<run_id>.json` per paused run.
//!
//! It is deliberately separate from the engine so the execution logic stays
//! I/O-free and unit-testable, and so callers can swap in a different backing
//! store (database, object store) without touching the engine.
//!
//! ## Resume safety
//!
//! [`resume`](crate::WorkflowEngine::resume) may run side-effecting downstream
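//! stages. [`CheckpointStore::claim`] atomically renames `<run_id>.json` to
//! `<run_id>.inflight` and hands back the checkpoint; a racing or duplicate
//! claim finds no `.json` and gets `None`. This guarantees **at most one
//! concurrent resume per run** — two in-flight resumes of the same run cannot
//! both proceed. The caller runs `resume`, then
//! [`save`](CheckpointStore::save)s a fresh checkpoint if the run paused
//! again, and finally [`complete`](CheckpointStore::complete)s to drop the
//! in-flight marker. Note that a freshly saved checkpoint is claimable again
//! immediately: a second claim that lands between that `save` and the first
//! caller's `complete` will have its `.inflight` marker removed by that
//! `complete`, so avoid re-claiming a run until its previous resume has
//! completed.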
//!
//! This is *not* exactly-once across a crash: if the process dies after `claim`
//! but before `complete`, the `.inflight` marker is orphaned. Call
//! [`recover_orphaned`](CheckpointStore::recover_orphaned) once at startup
//! (before any resume can be in flight) to re-arm such runs. True
//! exactly-once for non-idempotent side effects needs an idempotency key at the
//! side-effect boundary and is out of scope here.
//!
//! ## Durability scope
//!
//! `save` writes a temp file and renames it into place, which is atomic against
//! a *clean* process restart (no half-written checkpoint is ever visible). It
//! does not `fsync`, so it is not a guarantee against power-loss / kernel crash.

use std::fs;
use std::path::{Path, PathBuf};

use crate::result::PausedWorkflow;

/// A directory of paused-workflow checkpoints, one JSON file per `run_id`.
///
/// Cheap to clone: the only state is the root directory path.
#[derive(Clone, Debug)]
pub struct CheckpointStore {
    /// Root directory holding the `<run_id>.json` and `<run_id>.inflight` files.
    dir: PathBuf,
}

impl CheckpointStore {
    /// Open (creating if needed) a checkpoint store rooted at `dir`.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from creating `dir` (and its missing parents).
    pub fn open(dir: impl AsRef<Path>) -> std::io::Result<Self> {
        let dir = dir.as_ref().to_path_buf();
        fs::create_dir_all(&dir)?;
        Ok(Self { dir })
    }

    /// Path of the waiting (claimable) checkpoint for `run_id`.
    ///
    /// NOTE(review): `run_id` is joined onto `dir` without validation, so an id
    /// containing path separators or `..` would resolve outside the store.
    /// Presumably run ids are engine-generated; confirm before accepting ids
    /// from untrusted input.
    fn path_for(&self, run_id: &str) -> PathBuf {
        self.dir.join(format!("{run_id}.json"))
    }

    /// Path of the in-flight marker that [`claim`](Self::claim) creates.
    fn inflight_path_for(&self, run_id: &str) -> PathBuf {
        self.dir.join(format!("{run_id}.inflight"))
    }

    /// Read and deserialize a checkpoint file.
    ///
    /// Returns `Ok(None)` if the file does not exist. A file that exists but
    /// does not deserialize is reported as [`std::io::ErrorKind::InvalidData`].
    fn read_paused(path: &Path) -> std::io::Result<Option<PausedWorkflow>> {
        match fs::read_to_string(path) {
            Ok(json) => {
                let paused = serde_json::from_str(&json)
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
                Ok(Some(paused))
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// Persist a paused run as `<run_id>.json`, replacing any existing one.
    ///
    /// Write-to-temp + atomic rename so no half-written checkpoint is ever
    /// visible. The temp name carries a fresh UUID so concurrent writers for
    /// the same `run_id` never share a temp path. If writing or renaming fails,
    /// the temp file is removed (best effort) so failures do not accumulate
    /// stray `.tmp` files in the store directory.
    ///
    /// # Errors
    ///
    /// [`std::io::ErrorKind::InvalidData`] if `paused` cannot be serialized;
    /// otherwise the I/O error from writing or renaming the temp file.
    pub fn save(&self, paused: &PausedWorkflow) -> std::io::Result<()> {
        let json = serde_json::to_string_pretty(paused)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        let final_path = self.path_for(&paused.run_id);
        let tmp_path = self.dir.join(format!(
            ".{}.{}.tmp",
            paused.run_id,
            uuid::Uuid::new_v4().simple()
        ));
        if let Err(e) = fs::write(&tmp_path, json).and_then(|()| fs::rename(&tmp_path, &final_path))
        {
            // Best-effort cleanup of a partial/abandoned temp file. The original
            // error is what the caller needs, so a cleanup failure is ignored.
            let _ = fs::remove_file(&tmp_path);
            return Err(e);
        }
        Ok(())
    }

    /// Load a paused run by id without claiming it, or `None` if absent.
    /// Read-only peek — does not protect against double-resume; use [`claim`]
    /// for that.
    ///
    /// [`claim`]: CheckpointStore::claim
    pub fn load(&self, run_id: &str) -> std::io::Result<Option<PausedWorkflow>> {
        Self::read_paused(&self.path_for(run_id))
    }

    /// Atomically take ownership of a paused run for resumption.
    ///
    /// Renames `<run_id>.json` to `<run_id>.inflight` (atomic) and returns the
    /// checkpoint. A second concurrent or duplicate claim finds no `.json` and
    /// gets `None` — this is the at-most-one-concurrent guard against
    /// double-resume (it is not exactly-once across a crash; see
    /// [`recover_orphaned`](Self::recover_orphaned)). After resuming, call
    /// [`save`](Self::save) (if it paused again) then
    /// [`complete`](Self::complete).
    ///
    /// # Errors
    ///
    /// Any I/O error from the rename other than "not found", or from reading
    /// the claimed checkpoint (including `InvalidData` if it does not parse;
    /// the `.inflight` marker is then left in place).
    pub fn claim(&self, run_id: &str) -> std::io::Result<Option<PausedWorkflow>> {
        let src = self.path_for(run_id);
        let dst = self.inflight_path_for(run_id);
        // The single rename *is* the claim: it either moves the `.json` (we own
        // the run) or fails with NotFound (another claimer already moved it).
        // `fs::rename` is documented to replace an existing destination file
        // (and `save` already relies on that to overwrite `.json`), so a stale
        // `.inflight` left by a crashed resume is overwritten atomically.
        // Deleting the marker beforehand would open a check-then-act window in
        // which a losing concurrent claimer deletes the winner's live marker.
        match fs::rename(&src, &dst) {
            Ok(()) => Self::read_paused(&dst),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// Recover runs orphaned by a crash between [`claim`](CheckpointStore::claim)
    /// and [`complete`](CheckpointStore::complete). Call **once at startup**,
    /// before any resume can be in flight — running it concurrently with a live
    /// resume could re-arm a run another caller is mid-way through.
    ///
    /// For each `<run_id>.inflight`: if a sibling `<run_id>.json` exists (a crash
    /// after a re-pause save), the `.json` is authoritative and the stale marker
    /// is removed; otherwise the marker is renamed back to `<run_id>.json`,
    /// re-arming the run for a fresh `claim`. Entries whose names are not valid
    /// UTF-8 are skipped. Returns the number re-armed.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first I/O error from listing the directory
    /// or removing/renaming a marker.
    pub fn recover_orphaned(&self) -> std::io::Result<usize> {
        let mut rearmed = 0;
        for entry in fs::read_dir(&self.dir)? {
            let path = entry?.path();
            if path.extension().and_then(|e| e.to_str()) != Some("inflight") {
                continue;
            }
            let Some(run_id) = path.file_stem().and_then(|s| s.to_str()) else {
                continue;
            };
            let json_path = self.path_for(run_id);
            if json_path.exists() {
                // Crash after re-pause save: the fresh .json wins.
                remove_if_present(&path)?;
            } else {
                // Crash mid-resume: re-arm so the run can be claimed again.
                fs::rename(&path, &json_path)?;
                rearmed += 1;
            }
        }
        Ok(rearmed)
    }

    /// Drop the in-flight marker after a claimed run finishes resuming.
    /// Idempotent. Leaves any fresh `<run_id>.json` written by a re-pause.
    pub fn complete(&self, run_id: &str) -> std::io::Result<()> {
        remove_if_present(&self.inflight_path_for(run_id))
    }

    /// Remove a checkpoint and any in-flight marker entirely (hard cleanup).
    /// Idempotent: removing a missing checkpoint is not an error.
    pub fn remove(&self, run_id: &str) -> std::io::Result<()> {
        remove_if_present(&self.path_for(run_id))?;
        remove_if_present(&self.inflight_path_for(run_id))
    }

    /// List the run IDs of all currently-paused (claimable) runs, sorted.
    ///
    /// Only `<run_id>.json` files count: runs currently claimed (`.inflight`)
    /// and in-progress `save` temp files (`.tmp`) are excluded, as are entries
    /// whose names are not valid UTF-8.
    pub fn list(&self) -> std::io::Result<Vec<String>> {
        let mut ids = Vec::new();
        for entry in fs::read_dir(&self.dir)? {
            let path = entry?.path();
            if path.extension().and_then(|e| e.to_str()) != Some("json") {
                continue;
            }
            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
                ids.push(stem.to_owned());
            }
        }
        ids.sort_unstable();
        Ok(ids)
    }
}

/// Delete the file at `path`; a file that is already absent counts as success.
///
/// Every other I/O error (permissions, `path` being a directory, …) is
/// returned unchanged.
fn remove_if_present(path: &Path) -> std::io::Result<()> {
    fs::remove_file(path).or_else(|err| match err.kind() {
        std::io::ErrorKind::NotFound => Ok(()),
        _ => Err(err),
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Workflow;
    use std::collections::HashMap;

    /// Minimal paused checkpoint for `run_id`, paused at a `gate` stage.
    fn sample(run_id: &str) -> PausedWorkflow {
        PausedWorkflow {
            run_id: run_id.into(),
            workflow: Workflow {
                id: "wf".into(),
                name: "WF".into(),
                start: "gate".into(),
                goal: None,
                stages: vec![],
                edges: vec![],
                max_iterations: 100,
                metadata: HashMap::new(),
            },
            paused_stage_id: "gate".into(),
            prompt: "approve?".into(),
            fields: vec![],
            output_key: "approval".into(),
            wf_state: HashMap::new(),
            stage_results: vec![],
            completed_stage_ids: vec![],
            iterations: 1,
            prior_duration_ms: 0.0,
            created_at: chrono::Utc::now(),
        }
    }

    #[test]
    fn save_load_remove_roundtrip() {
        let dir = std::env::temp_dir().join(format!("car-ckpt-{}", new_id()));
        let store = CheckpointStore::open(&dir).unwrap();

        assert!(store.load("missing").unwrap().is_none());

        let p = sample("run-1");
        store.save(&p).unwrap();
        let loaded = store.load("run-1").unwrap().unwrap();
        assert_eq!(loaded.run_id, "run-1");
        assert_eq!(loaded.output_key, "approval");
        assert_eq!(loaded.prompt, "approve?");

        assert_eq!(store.list().unwrap(), vec!["run-1".to_string()]);

        store.remove("run-1").unwrap();
        assert!(store.load("run-1").unwrap().is_none());
        // Idempotent.
        store.remove("run-1").unwrap();

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn save_leaves_only_the_final_checkpoint() {
        let dir = std::env::temp_dir().join(format!("car-ckpt-save-{}", new_id()));
        let store = CheckpointStore::open(&dir).unwrap();

        // Saving twice also exercises overwriting an existing `.json`.
        store.save(&sample("run-s")).unwrap();
        store.save(&sample("run-s")).unwrap();

        let mut names: Vec<String> = fs::read_dir(&dir)
            .unwrap()
            .map(|e| e.unwrap().file_name().to_string_lossy().into_owned())
            .collect();
        names.sort();
        assert_eq!(names, vec!["run-s.json".to_string()], "no temp files left behind");

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn claim_is_at_most_once() {
        let dir = std::env::temp_dir().join(format!("car-ckpt-claim-{}", new_id()));
        let store = CheckpointStore::open(&dir).unwrap();

        store.save(&sample("run-x")).unwrap();

        // First claim wins and gets the checkpoint.
        let first = store.claim("run-x").unwrap();
        assert!(first.is_some());
        // The run is no longer listed as paused-waiting.
        assert!(store.list().unwrap().is_empty());
        // A second claim finds nothing — the double-resume guard.
        let second = store.claim("run-x").unwrap();
        assert!(second.is_none());

        // Completing drops the in-flight marker; idempotent.
        store.complete("run-x").unwrap();
        store.complete("run-x").unwrap();
        assert!(store.claim("run-x").unwrap().is_none());

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn claim_replaces_stale_inflight_marker() {
        let dir = std::env::temp_dir().join(format!("car-ckpt-stale-{}", new_id()));
        let store = CheckpointStore::open(&dir).unwrap();

        // A leftover marker from a crashed resume sits next to a fresh .json.
        store.save(&sample("stale")).unwrap();
        fs::write(dir.join("stale.inflight"), "stale").unwrap();

        // The claim still succeeds, and what it reads back is the real
        // checkpoint (the forged bytes are not valid JSON), proving the rename
        // replaced the stale marker.
        let claimed = store
            .claim("stale")
            .unwrap()
            .expect(".json present → claim succeeds");
        assert_eq!(claimed.run_id, "stale");
        assert!(store.load("stale").unwrap().is_none(), ".json consumed by claim");

        store.complete("stale").unwrap();
        assert!(!dir.join("stale.inflight").exists());

        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn recover_rearms_orphaned_inflight() {
        let dir = std::env::temp_dir().join(format!("car-ckpt-recover-{}", new_id()));
        let store = CheckpointStore::open(&dir).unwrap();

        // Simulate a crash mid-resume: claimed (so .inflight exists, .json gone)
        // but never completed.
        store.save(&sample("crashed")).unwrap();
        let _ = store.claim("crashed").unwrap();
        assert!(store.claim("crashed").unwrap().is_none(), "claimed, no .json");

        // Recovery re-arms it.
        assert_eq!(store.recover_orphaned().unwrap(), 1);
        assert!(store.claim("crashed").unwrap().is_some(), "re-armed, claimable");
        // Finish the run so it doesn't look orphaned to the next recovery scan.
        store.complete("crashed").unwrap();

        // Both-files state (crash after re-pause save): .json wins, marker dropped.
        store.save(&sample("both")).unwrap();
        // Forge a stale inflight alongside the json.
        std::fs::write(dir.join("both.inflight"), "stale").unwrap();
        assert_eq!(store.recover_orphaned().unwrap(), 0, "json present → not re-armed");
        assert!(!dir.join("both.inflight").exists(), "stale marker removed");
        assert!(store.claim("both").unwrap().is_some());

        let _ = fs::remove_dir_all(&dir);
    }

    /// Short random suffix so parallel tests get distinct temp directories.
    fn new_id() -> String {
        uuid::Uuid::new_v4().simple().to_string()[..8].to_string()
    }
}