weavegraph 0.7.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
//! Serialization bridge between live runtime types and their persisted forms.
//!
//! The `Persisted*` types are stable, serde-friendly representations of runtime
//! state, checkpoints, and scheduler metadata. Conversion between live and persisted
//! types is handled through `From` / `TryFrom` impls, keeping checkpointer code
//! free of transformation detail.
//!
//! This module performs no I/O.

use chrono::Utc;
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;

use crate::{
    channels::{Channel, ExtrasChannel, MessagesChannel},
    message::Message,
    runtimes::checkpointer::Checkpoint,
    state::VersionedState,
    types::NodeKind,
    utils::json_ext::JsonSerializable,
};

/// Errors arising from persistence conversion or JSON serialization.
///
/// This is the error type of the [`JsonSerializable`] blanket impl and of the
/// `TryFrom` conversions from `Persisted*` types back into live runtime types
/// defined in this module.
#[derive(Debug, Error)]
#[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))]
pub enum PersistenceError {
    /// A serde_json serialization or deserialization failure.
    ///
    /// The display message embeds the underlying error's text, and the same
    /// error is also exposed as this error's `source`.
    #[error("serialization error: {source}")]
    #[cfg_attr(
        feature = "diagnostics",
        diagnostic(
            code(weavegraph::persistence::serde),
            help("Check that the JSON structure matches the Persisted* types.")
        )
    )]
    Serde {
        /// The underlying serde_json error.
        #[source]
        source: serde_json::Error,
    },

    /// A catch-all for other persistence failures, carrying a human-readable
    /// description that is appended to the display message.
    #[error("persistence error: {0}")]
    #[cfg_attr(
        feature = "diagnostics",
        diagnostic(code(weavegraph::persistence::other))
    )]
    Other(String),
}

/// Convenience alias for persistence results.
///
/// Inside this module the alias shadows std's `Result`; write
/// `std::result::Result` explicitly when the two-parameter form is needed.
pub type Result<T> = std::result::Result<T, PersistenceError>;

impl<T> JsonSerializable<PersistenceError> for T
where
    T: serde::Serialize + for<'de> serde::de::DeserializeOwned,
{
    fn to_json_string(&self) -> std::result::Result<String, PersistenceError> {
        serde_json::to_string(self).map_err(|source| PersistenceError::Serde { source })
    }

    fn from_json_str(s: &str) -> std::result::Result<Self, PersistenceError> {
        serde_json::from_str(s).map_err(|source| PersistenceError::Serde { source })
    }
}

/// Persisted form of a versioned list channel (e.g., messages, errors).
///
/// When deserializing, `version` is required, while a missing `items` key
/// yields an empty list.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistedVecChannel<T> {
    /// Change-detection counter; starts at 1 for a freshly created channel.
    pub version: u32,
    /// Stored items, as returned by the live channel's `snapshot()`.
    #[serde(default)]
    pub items: Vec<T>,
}

/// Yields an empty list channel at the initial version, 1.
impl<T> Default for PersistedVecChannel<T> {
    fn default() -> Self {
        let items: Vec<T> = Vec::default();
        Self { items, version: 1 }
    }
}

/// Persisted form of a versioned map channel (e.g., extras).
///
/// When deserializing, `version` is required, while a missing `map` key
/// yields an empty map.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistedMapChannel<V> {
    /// Change-detection counter; starts at 1 for a freshly created channel.
    pub version: u32,
    /// Stored key-value pairs, as returned by the live channel's `snapshot()`.
    #[serde(default)]
    pub map: FxHashMap<String, V>,
}

impl<V> Default for PersistedMapChannel<V> {
    fn default() -> Self {
        Self {
            version: 1,
            map: FxHashMap::default(),
        }
    }
}

/// Persisted snapshot of a [`VersionedState`].
///
/// Holds one persisted channel per live channel. When deserializing, only
/// `errors` may be absent; it then falls back to `PersistedVecChannel::default()`
/// (empty, version 1). NOTE(review): presumably this keeps checkpoints written
/// before the errors channel existed loadable — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistedState {
    /// Persisted messages channel (required when deserializing).
    pub messages: PersistedVecChannel<Message>,
    /// Persisted extras channel of string-keyed JSON values (required when
    /// deserializing).
    pub extra: PersistedMapChannel<Value>,
    /// Persisted errors channel; defaults to an empty channel when absent.
    #[serde(default)]
    pub errors: PersistedVecChannel<crate::channels::errors::ErrorEvent>,
}

/// Persisted form of the scheduler's `versions_seen` tracking map.
///
/// A thin newtype over a two-level `String -> String -> u64` map.
/// NOTE(review): presumably outer keys identify nodes and inner keys name
/// channels, mapping to the last channel version each node observed — confirm
/// against the scheduler.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedVersionsSeen(pub FxHashMap<String, FxHashMap<String, u64>>);

/// Complete persisted representation of a runtime checkpoint.
///
/// When deserializing, `ran_nodes`, `skipped_nodes`, and `updated_channels`
/// default to empty if absent; every other field is required.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistedCheckpoint {
    /// Session this checkpoint belongs to.
    pub session_id: String,
    /// Monotonically increasing step counter.
    pub step: u64,
    /// Full state snapshot at this step.
    pub state: PersistedState,
    /// Frontier nodes encoded via [`NodeKind::encode`].
    pub frontier: Vec<String>,
    /// Scheduler version-gating state.
    pub versions_seen: PersistedVersionsSeen,
    /// Maximum concurrent node executions for this session.
    pub concurrency_limit: usize,
    /// RFC 3339 timestamp, kept as a string to avoid a `chrono` type in the
    /// serialized shape.
    pub created_at: String,
    /// Nodes that executed in this step, encoded via [`NodeKind::encode`].
    #[serde(default)]
    pub ran_nodes: Vec<String>,
    /// Nodes that were skipped in this step, encoded via [`NodeKind::encode`].
    #[serde(default)]
    pub skipped_nodes: Vec<String>,
    /// Channels that changed during this step.
    #[serde(default)]
    pub updated_channels: Vec<String>,
}

/// Snapshots every live channel of a [`VersionedState`] together with its
/// current version counter.
impl From<&VersionedState> for PersistedState {
    fn from(state: &VersionedState) -> Self {
        let messages = PersistedVecChannel {
            version: state.messages.version(),
            items: state.messages.snapshot(),
        };
        let extra = PersistedMapChannel {
            version: state.extra.version(),
            map: state.extra.snapshot(),
        };
        let errors = PersistedVecChannel {
            version: state.errors.version(),
            items: state.errors.snapshot(),
        };

        Self {
            messages,
            extra,
            errors,
        }
    }
}

impl TryFrom<PersistedState> for VersionedState {
    type Error = PersistenceError;

    fn try_from(p: PersistedState) -> Result<Self> {
        Ok(Self {
            messages: MessagesChannel::new(p.messages.items, p.messages.version),
            extra: ExtrasChannel::new(p.extra.map, p.extra.version),
            errors: crate::channels::ErrorsChannel::new(p.errors.items, p.errors.version),
        })
    }
}

impl From<&FxHashMap<String, FxHashMap<String, u64>>> for PersistedVersionsSeen {
    fn from(v: &FxHashMap<String, FxHashMap<String, u64>>) -> Self {
        Self(v.clone())
    }
}

impl From<PersistedVersionsSeen> for FxHashMap<String, FxHashMap<String, u64>> {
    fn from(p: PersistedVersionsSeen) -> Self {
        p.0
    }
}

impl From<&Checkpoint> for PersistedCheckpoint {
    fn from(cp: &Checkpoint) -> Self {
        Self {
            session_id: cp.session_id.clone(),
            step: cp.step,
            state: PersistedState::from(&cp.state),
            frontier: cp.frontier.iter().map(NodeKind::encode).collect(),
            versions_seen: PersistedVersionsSeen(cp.versions_seen.clone()),
            concurrency_limit: cp.concurrency_limit,
            created_at: cp.created_at.to_rfc3339(),
            ran_nodes: cp.ran_nodes.iter().map(NodeKind::encode).collect(),
            skipped_nodes: cp.skipped_nodes.iter().map(NodeKind::encode).collect(),
            updated_channels: cp.updated_channels.clone(),
        }
    }
}

/// Restores a live [`Checkpoint`] from its persisted form.
impl TryFrom<PersistedCheckpoint> for Checkpoint {
    type Error = PersistenceError;

    /// Encoded node names are decoded with [`NodeKind::decode`]. The only error
    /// propagated is one from rebuilding the state snapshot.
    ///
    /// NOTE(review): a `created_at` that is not valid RFC 3339 is silently
    /// replaced with the current time instead of being reported as an error.
    /// Confirm this leniency is intended before relying on restored timestamps.
    fn try_from(p: PersistedCheckpoint) -> Result<Self> {
        let state = VersionedState::try_from(p.state)?;

        let created_at = match chrono::DateTime::parse_from_rfc3339(&p.created_at) {
            Ok(parsed) => parsed.with_timezone(&Utc),
            Err(_) => Utc::now(),
        };

        let frontier = p.frontier.iter().map(|name| NodeKind::decode(name)).collect();
        let ran_nodes = p.ran_nodes.iter().map(|name| NodeKind::decode(name)).collect();
        let skipped_nodes = p
            .skipped_nodes
            .iter()
            .map(|name| NodeKind::decode(name))
            .collect();

        Ok(Self {
            session_id: p.session_id,
            step: p.step,
            state,
            frontier,
            versions_seen: p.versions_seen.0,
            concurrency_limit: p.concurrency_limit,
            created_at,
            ran_nodes,
            skipped_nodes,
            updated_channels: p.updated_channels,
        })
    }
}