// weavegraph 0.7.0
//
// Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
// Documentation
//! Checkpointer trait, shared types, and the in-memory backend.
//!
//! Three concrete implementations are available, selected via [`CheckpointerType`]:
//! - [`InMemoryCheckpointer`] — volatile, process-local; suitable for tests and
//!   single-run workloads.
//! - `SQLiteCheckpointer` (`feature = "sqlite"`) — file-backed with full step history.
//! - `PostgresCheckpointer` (`feature = "postgres"`) — server-backed with full step history.
//!
//! Enable debug tracing to inspect operations:
//! ```bash
//! RUST_LOG=weavegraph::runtimes::checkpointer=debug cargo run
//! ```

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use rustc_hash::FxHashMap;
use std::sync::RwLock;

use crate::{
    runtimes::session::SessionState, schedulers::SchedulerState, state::VersionedState,
    types::NodeKind,
};

/// Point-in-time capture of a session's execution state, taken at a barrier boundary.
#[derive(Debug, Clone)]
pub struct Checkpoint {
    /// Identifier of the session that owns this checkpoint.
    pub session_id: String,
    /// Index of the barrier step at which the capture was made.
    pub step: u64,
    /// Complete snapshot of the versioned state.
    pub state: VersionedState,
    /// Nodes from which execution resumes.
    pub frontier: Vec<NodeKind>,
    /// Version-gating counters tracked by the scheduler.
    pub versions_seen: FxHashMap<String, FxHashMap<String, u64>>,
    /// Upper bound on concurrently running nodes for this session.
    pub concurrency_limit: usize,
    /// Wall-clock (UTC) timestamp of the capture.
    pub created_at: DateTime<Utc>,
    /// Nodes executed during this step; empty for step 0.
    pub ran_nodes: Vec<NodeKind>,
    /// Nodes skipped during this step; empty for step 0.
    pub skipped_nodes: Vec<NodeKind>,
    /// Channels updated during this step; empty for step 0.
    pub updated_channels: Vec<String>,
}

impl Checkpoint {
    /// Capture a checkpoint directly from session state.
    ///
    /// The step metadata (`ran_nodes`, `skipped_nodes`, `updated_channels`) is left
    /// empty and `created_at` is stamped with the current UTC time.
    #[must_use]
    pub fn from_session(session_id: &str, session: &SessionState) -> Self {
        Self {
            session_id: String::from(session_id),
            step: session.step,
            state: session.state.clone(),
            frontier: session.frontier.clone(),
            versions_seen: session.scheduler_state.versions_seen.clone(),
            concurrency_limit: session.scheduler.concurrency_limit,
            created_at: Utc::now(),
            ran_nodes: Vec::new(),
            skipped_nodes: Vec::new(),
            updated_channels: Vec::new(),
        }
    }

    /// Capture a checkpoint for a completed step, including its execution metadata.
    ///
    /// Session-derived fields are filled exactly as in [`Checkpoint::from_session`];
    /// the ran/skipped node lists and the updated channel names are copied from
    /// `step_report`.
    #[must_use]
    pub fn from_step_report(
        session_id: &str,
        session_state: &SessionState,
        step_report: &crate::runtimes::execution::StepReport,
    ) -> Self {
        // Channel names are stored as owned strings, so convert each one up front.
        let updated_channels: Vec<String> = step_report
            .barrier_outcome
            .updated_channels
            .iter()
            .map(|channel| channel.to_string())
            .collect();

        Self {
            ran_nodes: step_report.ran_nodes.clone(),
            skipped_nodes: step_report.skipped_nodes.clone(),
            updated_channels,
            // Everything else comes from the session snapshot.
            ..Self::from_session(session_id, session_state)
        }
    }
}

/// Errors returned by [`Checkpointer`] operations.
///
/// Each variant's `Display` text comes from its `#[error(...)]` attribute. When the
/// `diagnostics` feature is enabled the enum also derives `miette::Diagnostic`, giving
/// every variant a code under `weavegraph::checkpointer::*`.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))]
#[non_exhaustive]
pub enum CheckpointerError {
    /// No checkpoint found for the given session.
    ///
    /// Displays as `session not found: {session_id}`; the diagnostic help text repeats
    /// the session ID.
    #[error("session not found: {session_id}")]
    #[cfg_attr(
        feature = "diagnostics",
        diagnostic(
            code(weavegraph::checkpointer::not_found),
            help(
                "Ensure the session ID `{session_id}` is correct and the session has been created."
            )
        )
    )]
    NotFound {
        /// The session ID that was not found.
        session_id: String,
    },

    /// A storage backend error (database, filesystem, etc.).
    ///
    /// Displays as `backend error: {message}`; the diagnostic help text echoes the
    /// backend message.
    #[error("backend error: {message}")]
    #[cfg_attr(
        feature = "diagnostics",
        diagnostic(
            code(weavegraph::checkpointer::backend),
            help("Check backend connectivity and permissions; backend message: {message}.")
        )
    )]
    Backend {
        /// Description of the backend error.
        message: String,
    },

    /// An unexpected or miscellaneous checkpointer error.
    ///
    /// Displays as `checkpointer error: {message}`. Unlike the other variants, its
    /// diagnostic carries a code but no help text.
    #[error("checkpointer error: {message}")]
    #[cfg_attr(
        feature = "diagnostics",
        diagnostic(code(weavegraph::checkpointer::other))
    )]
    Other {
        /// Human-readable error description.
        message: String,
    },
}

/// Selects the backing store used by the runtime's checkpointer.
///
/// `InMemory` is always available; the `SQLite` and `Postgres` variants only exist when
/// the crate is built with the `sqlite` / `postgres` feature respectively.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CheckpointerType {
    /// Volatile, process-local storage. Fast; no durability.
    InMemory,
    /// File-backed SQLite storage with full step history.
    ///
    /// Requires the `sqlite` feature.
    #[cfg(feature = "sqlite")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
    SQLite,
    /// PostgreSQL-backed storage with full step history.
    ///
    /// Requires the `postgres` feature.
    #[cfg(feature = "postgres")]
    #[cfg_attr(docsrs, doc(cfg(feature = "postgres")))]
    Postgres,
}

/// Convenience alias for checkpointer results.
///
/// Fixes the error type to [`CheckpointerError`]. Inside this module the alias shadows
/// the prelude's `Result`, so a bare `Result<T>` here always means this type.
pub type Result<T> = std::result::Result<T, CheckpointerError>;

/// Persistent storage and retrieval of workflow execution state.
///
/// Implementations must be `Send + Sync` and handle concurrent access to the
/// same session gracefully. The `save` operation replaces any existing
/// snapshot for the session; `load_latest` returns `None` for absent sessions.
///
/// The trait is declared through `#[async_trait]` so that it can expose `async fn`
/// methods.
#[async_trait]
pub trait Checkpointer: Send + Sync {
    /// Persist a checkpoint, replacing any prior snapshot for the same session.
    ///
    /// Takes the checkpoint by value; the session is identified by
    /// `checkpoint.session_id`.
    ///
    /// # Errors
    /// Returns a [`CheckpointerError`] if the implementation fails to store the
    /// checkpoint; which variant is reported is up to the implementation.
    async fn save(&self, checkpoint: Checkpoint) -> Result<()>;

    /// Return the most recent checkpoint for a session, or `None` if absent.
    ///
    /// # Errors
    /// Returns a [`CheckpointerError`] if the lookup itself fails. A missing session
    /// is reported as `Ok(None)`, not as an error.
    async fn load_latest(&self, session_id: &str) -> Result<Option<Checkpoint>>;

    /// Return all session IDs that have at least one stored checkpoint.
    ///
    /// No ordering of the returned IDs is specified by this trait.
    ///
    /// # Errors
    /// Returns a [`CheckpointerError`] if the implementation fails to enumerate
    /// its sessions.
    async fn list_sessions(&self) -> Result<Vec<String>>;
}

/// Process-local, non-durable checkpointer that retains only the newest snapshot
/// for each session.
///
/// To inspect its operations, turn on debug tracing:
/// ```bash
/// RUST_LOG=weavegraph::runtimes::checkpointer=debug
/// ```
#[derive(Default)]
pub struct InMemoryCheckpointer {
    /// Stored checkpoints keyed by session ID, guarded by a reader-writer lock.
    inner: RwLock<FxHashMap<String, Checkpoint>>,
}

impl InMemoryCheckpointer {
    /// Construct a checkpointer that holds no sessions yet.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: RwLock::default(),
        }
    }
}

#[async_trait]
impl Checkpointer for InMemoryCheckpointer {
    /// Store `checkpoint` under its session ID, overwriting any previous entry.
    ///
    /// The checkpoint argument is skipped from the tracing span; only `session_id`
    /// and `step` are recorded. Leaving it unskipped would `Debug`-format the whole
    /// state snapshot into the span on every save.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    #[tracing::instrument(skip(self, checkpoint), fields(session_id = %checkpoint.session_id, step = checkpoint.step))]
    async fn save(&self, checkpoint: Checkpoint) -> Result<()> {
        self.inner
            .write()
            .expect("InMemoryCheckpointer RwLock poisoned")
            .insert(checkpoint.session_id.clone(), checkpoint);
        Ok(())
    }

    /// Return a clone of the checkpoint stored for `session_id`, or `None` if the
    /// session has never been saved.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    #[tracing::instrument(skip(self), fields(session_id = %session_id))]
    async fn load_latest(&self, session_id: &str) -> Result<Option<Checkpoint>> {
        Ok(self
            .inner
            .read()
            .expect("InMemoryCheckpointer RwLock poisoned")
            .get(session_id)
            .cloned())
    }

    /// Return the IDs of all sessions currently held in memory.
    ///
    /// The IDs come straight from the map's key iterator, so their order is
    /// unspecified.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    #[tracing::instrument(skip(self))]
    async fn list_sessions(&self) -> Result<Vec<String>> {
        Ok(self
            .inner
            .read()
            .expect("InMemoryCheckpointer RwLock poisoned")
            .keys()
            .cloned()
            .collect())
    }
}

/// Reconstruct a [`SessionState`] from a persisted [`Checkpoint`].
#[must_use = "restored session state should be used to continue execution"]
pub fn restore_session_state(cp: &Checkpoint) -> SessionState {
    use crate::schedulers::Scheduler;
    SessionState {
        state: cp.state.clone(),
        step: cp.step,
        frontier: cp.frontier.clone(),
        scheduler: Scheduler::new(cp.concurrency_limit),
        scheduler_state: SchedulerState {
            versions_seen: cp.versions_seen.clone(),
        },
    }
}