weavegraph 0.7.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
//! Replay conformance helpers for comparing workflow runs.
//!
//! Normalizes nondeterministic fields, compares final state and event streams,
//! and returns human-readable differences suitable for test assertions.

use serde_json::{Value, json};
use thiserror::Error;

use crate::{
    channels::Channel,
    event_bus::Event,
    state::{StateKey, StateLifecycle, VersionedState},
};

/// Captured output from one workflow run.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReplayRun {
    /// Final workflow state produced by the run.
    pub final_state: VersionedState,
    /// Events captured during the run.
    pub events: Vec<Event>,
}

impl ReplayRun {
    /// Construct from final state and captured events.
    #[must_use]
    pub fn new(final_state: VersionedState, events: Vec<Event>) -> Self {
        Self {
            final_state,
            events,
        }
    }
}

/// Result of comparing two replay artifacts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplayComparison {
    differences: Vec<String>,
}

impl ReplayComparison {
    /// No differences found.
    #[must_use]
    pub fn matched() -> Self {
        Self {
            differences: Vec::new(),
        }
    }

    /// Construct with the supplied differences.
    #[must_use]
    pub fn with_differences(differences: Vec<String>) -> Self {
        Self { differences }
    }

    /// Returns `true` when no differences were found.
    #[must_use]
    pub fn is_match(&self) -> bool {
        self.differences.is_empty()
    }

    /// Returns the differences found during comparison.
    #[must_use]
    pub fn differences(&self) -> &[String] {
        &self.differences
    }

    /// Convert into a `Result` for use in assertions.
    pub fn assert_matches(self) -> Result<(), ReplayConformanceError> {
        if self.is_match() {
            Ok(())
        } else {
            Err(ReplayConformanceError::Mismatch {
                differences: self.differences,
            })
        }
    }
}

/// Errors returned by replay conformance helpers.
#[derive(Debug, Error)]
#[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))]
#[non_exhaustive]
pub enum ReplayConformanceError {
    /// The compared runs were not equivalent.
    #[error("replay conformance mismatch: {differences:?}")]
    #[cfg_attr(
        feature = "diagnostics",
        diagnostic(code(weavegraph::replay::mismatch))
    )]
    Mismatch {
        /// Human-readable differences.
        differences: Vec<String>,
    },
}

/// Strip nondeterministic fields from an event for replay comparison.
///
/// Removes the top-level `timestamp` field from the event's JSON representation.
#[must_use]
pub fn normalize_event(event: &Event) -> Value {
    let mut value = event.to_json_value();
    if let Value::Object(obj) = &mut value {
        obj.remove("timestamp");
    }
    value
}

/// Serialize a final state to JSON for stable comparison.
#[must_use]
pub fn normalize_state(state: &VersionedState) -> Value {
    json!({
        "messages": state.messages.snapshot(),
        "messages_version": state.messages.version(),
        "extra": state.extra.snapshot(),
        "extra_version": state.extra.version(),
        "errors": state.errors.snapshot(),
        "errors_version": state.errors.version(),
    })
}

/// Compare two final states with default normalization.
#[must_use]
pub fn compare_final_state(left: &VersionedState, right: &VersionedState) -> ReplayComparison {
    let left_value = normalize_state(left);
    let right_value = normalize_state(right);
    if left_value == right_value {
        ReplayComparison::matched()
    } else {
        ReplayComparison::with_differences(vec![format!(
            "final state differs: left={left_value} right={right_value}"
        )])
    }
}

/// Compare two event streams with the default event normalizer.
#[must_use]
pub fn compare_event_sequences(left: &[Event], right: &[Event]) -> ReplayComparison {
    compare_event_sequences_with(left, right, normalize_event)
}

/// Compare two event streams with a caller-provided normalizer.
///
/// Use this when domain events contain timestamps, generated IDs, or other
/// values that need semantic rather than byte-for-byte comparison.
#[must_use]
pub fn compare_event_sequences_with<F>(
    left: &[Event],
    right: &[Event],
    normalizer: F,
) -> ReplayComparison
where
    F: Fn(&Event) -> Value,
{
    let left_values: Vec<Value> = left.iter().map(&normalizer).collect();
    let right_values: Vec<Value> = right.iter().map(&normalizer).collect();

    if left_values == right_values {
        return ReplayComparison::matched();
    }

    let mut differences = Vec::new();
    if left_values.len() != right_values.len() {
        differences.push(format!(
            "event count differs: left={} right={}",
            left_values.len(),
            right_values.len()
        ));
    }

    if let Some((i, (l, r))) = left_values
        .iter()
        .zip(&right_values)
        .enumerate()
        .find(|(_, (l, r))| l != r)
    {
        differences.push(format!("event {i} differs: left={l} right={r}"));
    }

    ReplayComparison::with_differences(differences)
}

/// Compare two captured runs with default state and event normalization.
#[must_use]
pub fn compare_replay_runs(left: &ReplayRun, right: &ReplayRun) -> ReplayComparison {
    compare_replay_runs_with(left, right, normalize_event)
}

/// Compare two captured runs with a caller-provided event normalizer.
#[must_use]
pub fn compare_replay_runs_with<F>(
    left: &ReplayRun,
    right: &ReplayRun,
    event_normalizer: F,
) -> ReplayComparison
where
    F: Fn(&Event) -> Value,
{
    let mut differences = Vec::new();
    let state_cmp = compare_final_state(&left.final_state, &right.final_state);
    differences.extend_from_slice(state_cmp.differences());
    let event_cmp = compare_event_sequences_with(&left.events, &right.events, event_normalizer);
    differences.extend_from_slice(event_cmp.differences());
    ReplayComparison::with_differences(differences)
}

/// A filter profile for [`normalize_state_with`] and [`compare_final_state_with`].
///
/// Lists extra-map keys to exclude from normalized state output — the primary
/// mechanism for separating durable state from per-invocation scratch values
/// during replay comparison and resume assertions.
///
/// ## Conflict detection
///
/// [`ignore_key`](Self::ignore_key) records the key's [`StateLifecycle`]
/// annotation. Registering the same storage key with a **different** lifecycle
/// annotation panics with a clear message, surfacing configuration mistakes at
/// test time rather than silently producing wrong results.
///
/// Raw-string keys added via [`ignore_extra_keys`](Self::ignore_extra_keys)
/// carry no lifecycle annotation and do not trigger conflict detection.
///
/// ## Examples
///
/// ```rust
/// use weavegraph::runtimes::replay::{StateNormalizeProfile, normalize_state_with};
/// use weavegraph::state::{StateKey, StateLifecycle};
/// use weavegraph::state::VersionedState;
///
/// const TICK_EVENT: StateKey<u64> = StateKey::new("wq", "event", 1).invocation_scoped();
///
/// let profile = StateNormalizeProfile::new().ignore_key(TICK_EVENT);
///
/// let state = VersionedState::new_with_user_message("hello");
/// let _normalized = normalize_state_with(&state, &profile);
/// ```
#[derive(Debug, Default, Clone)]
pub struct StateNormalizeProfile {
    // Each entry: (storage_key, lifecycle annotation if added via typed StateKey).
    ignored: Vec<(String, Option<StateLifecycle>)>,
}

impl StateNormalizeProfile {
    /// Create an empty profile (no keys ignored).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Ignore the given raw storage key strings during normalization.
    ///
    /// Prefer [`ignore_key`](Self::ignore_key) when a typed `StateKey` constant
    /// is available, as it also validates lifecycle consistency.
    #[must_use]
    pub fn ignore_extra_keys<I, S>(mut self, keys: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        for k in keys {
            self.add_raw(k.into(), None);
        }
        self
    }

    /// Ignore the storage slot identified by `key` during normalization.
    ///
    /// Panics if the same storage key has already been registered with a
    /// different [`StateLifecycle`] annotation.
    #[must_use]
    pub fn ignore_key<T>(mut self, key: StateKey<T>) -> Self {
        self.add_raw(key.storage_key(), Some(key.lifecycle()));
        self
    }

    fn add_raw(&mut self, storage_key: String, lifecycle: Option<StateLifecycle>) {
        if let Some((_, existing_lc)) = self.ignored.iter().find(|(k, _)| k == &storage_key) {
            match (existing_lc, &lifecycle) {
                (Some(a), Some(b)) if a != b => panic!(
                    "StateNormalizeProfile: conflicting lifecycle annotations for key {:?}: \
                     already registered as {:?}, attempted to re-register as {:?}. \
                     Ensure the same StateKey constant is used throughout.",
                    storage_key, a, b
                ),
                _ => {} // duplicate or compatible — idempotent
            }
            return;
        }
        self.ignored.push((storage_key, lifecycle));
    }

    /// Iterate over the storage key strings this profile ignores.
    pub fn ignored_keys(&self) -> impl Iterator<Item = &str> {
        self.ignored.iter().map(|(k, _)| k.as_str())
    }
}

/// Normalize a final state to JSON, excluding keys listed in `profile`.
///
/// Identical to [`normalize_state`] except named keys are suppressed from the
/// `extra` map. Use this to compare only durable state when some extra entries
/// are per-invocation scratch.
///
/// ## Examples
///
/// ```rust
/// use weavegraph::runtimes::replay::{StateNormalizeProfile, normalize_state_with};
/// use weavegraph::state::{StateKey, VersionedState};
///
/// const TICK: StateKey<u64> = StateKey::new("wq", "tick", 1).invocation_scoped();
///
/// let profile = StateNormalizeProfile::new().ignore_key(TICK);
/// let state = VersionedState::new_with_user_message("hello");
/// let _value = normalize_state_with(&state, &profile);
/// ```
#[must_use]
pub fn normalize_state_with(state: &VersionedState, profile: &StateNormalizeProfile) -> Value {
    let mut extra = state.extra.snapshot();
    for key in profile.ignored_keys() {
        extra.remove(key);
    }
    json!({
        "messages": state.messages.snapshot(),
        "messages_version": state.messages.version(),
        "extra": extra,
        "extra_version": state.extra.version(),
        "errors": state.errors.snapshot(),
        "errors_version": state.errors.version(),
    })
}

/// Compare two final states using a caller-provided normalization profile.
///
/// Equivalent to [`compare_final_state`] but filters the `extra` map through
/// `profile` before comparing.
#[must_use]
pub fn compare_final_state_with(
    left: &VersionedState,
    right: &VersionedState,
    profile: &StateNormalizeProfile,
) -> ReplayComparison {
    let left_value = normalize_state_with(left, profile);
    let right_value = normalize_state_with(right, profile);
    if left_value == right_value {
        ReplayComparison::matched()
    } else {
        ReplayComparison::with_differences(vec![format!(
            "final state differs: left={left_value} right={right_value}"
        )])
    }
}

/// Compare two captured runs using a state profile and a caller-provided event normalizer.
///
/// Combines [`compare_final_state_with`] and [`compare_event_sequences_with`] into
/// a single assertion. Use this in iterative resume tests that need both
/// durable-state filtering and custom event normalization.
#[must_use]
pub fn compare_replay_runs_with_profile<F>(
    left: &ReplayRun,
    right: &ReplayRun,
    state_profile: &StateNormalizeProfile,
    event_normalizer: F,
) -> ReplayComparison
where
    F: Fn(&Event) -> Value,
{
    let mut differences = Vec::new();
    let state_cmp = compare_final_state_with(&left.final_state, &right.final_state, state_profile);
    differences.extend_from_slice(state_cmp.differences());
    let event_cmp = compare_event_sequences_with(&left.events, &right.events, event_normalizer);
    differences.extend_from_slice(event_cmp.differences());
    ReplayComparison::with_differences(differences)
}