// Core types and traits for the Juncture state machine framework
// (`juncture-core` 0.2.0).

use crate::error::InvalidUpdateError;
use std::sync::Arc;

/// Version counters, one per state field, used to track state changes.
///
/// Index `i` holds the counter for the field declared at position `i`
/// (0-based). The Pregel scheduler compares these counters to decide which
/// nodes must re-execute after a field they subscribe to has changed.
///
/// `#[derive(State)]` emits a concrete `FieldVersions` type for each struct
/// alongside the generated `State` implementation.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct FieldVersions(pub Vec<u64>);

impl FieldVersions {
    /// Build a tracker for `num_fields` fields, every counter starting at zero.
    #[must_use]
    pub fn new(num_fields: usize) -> Self {
        let counters: Vec<u64> = vec![0; num_fields];
        Self(counters)
    }

    /// Current counter of field `field_idx`; indices past the end read as `0`.
    #[must_use]
    pub fn get(&self, field_idx: usize) -> u64 {
        *self.0.get(field_idx).unwrap_or(&0)
    }

    /// Advance the counter of field `field_idx` by one, wrapping on overflow.
    ///
    /// An index past the end is silently ignored.
    pub fn bump(&mut self, field_idx: usize) {
        let Some(counter) = self.0.get_mut(field_idx) else {
            return;
        };
        *counter = counter.wrapping_add(1);
    }
}

/// Contract implemented by every state type flowing through a Juncture graph.
///
/// States are cloned, shared across threads and debug-printed, hence the
/// `Clone + Default + Send + Sync + Debug + 'static` supertraits. The usual
/// implementation is generated by `#[derive(State)]`; every method with a
/// default body below may be left as-is by hand-written implementations.
pub trait State: Clone + Default + Send + Sync + std::fmt::Debug + 'static {
    /// Partial-update type generated by `#[derive(State)]` and consumed by
    /// [`State::apply`].
    type Update: Default + Clone + Send + Sync + 'static;

    /// Per-field version tracking type generated by `#[derive(State)]`.
    type FieldVersions: Default + Clone + Send + Sync + 'static;

    /// Snapshot of the per-field version counters.
    ///
    /// The Pregel engine compares these against the versions each node last
    /// saw, re-executing nodes whose subscribed fields have moved on.
    ///
    /// The default body returns the `Default` value of
    /// `Self::FieldVersions`; `#[derive(State)]` supplies a real one.
    #[must_use]
    fn field_versions(&self) -> Self::FieldVersions {
        <Self::FieldVersions as Default>::default()
    }

    /// Bump the version counter of every field flagged in `changed`.
    ///
    /// Invoked after each superstep for the fields that were modified; the
    /// resulting version comparisons drive reactive scheduling.
    ///
    /// The default body does nothing; `#[derive(State)]` supplies a real one.
    fn bump_versions(&mut self, _changed: &FieldsChanged) {
        // Intentionally a no-op by default.
    }

    /// Merge `update` into `self` and report which fields changed.
    fn apply(&mut self, update: Self::Update) -> FieldsChanged;

    /// Fallible counterpart of [`State::apply`].
    ///
    /// Reports a structured error when reducer constraints are violated
    /// (for example, several writers targeting a replace channel). The
    /// default body forwards to `apply()` and therefore never fails, which
    /// keeps existing implementations working unchanged.
    ///
    /// # Errors
    ///
    /// Returns `InvalidUpdateError` when `update` breaks a reducer
    /// constraint, e.g. `InvalidUpdateError::MultipleOverwrite` when more
    /// than one node writes a replace channel in the same superstep.
    fn try_apply(&mut self, update: Self::Update) -> Result<FieldsChanged, InvalidUpdateError> {
        let changed = self.apply(update);
        Ok(changed)
    }

    /// Clear ephemeral fields; called after each superstep.
    fn reset_ephemeral(&mut self);

    /// Finalize one field; called when graph execution completes.
    ///
    /// Gives channels a chance to finalize their state: for instance,
    /// `LastValueAfterFinishChannel` only exposes its value once `finish()`
    /// has been called.
    ///
    /// The default body does nothing, which suits channels without finish
    /// semantics.
    fn finish_field(&mut self, _field_idx: usize) {}

    /// Mark one field as consumed; called after `apply_writes()` in each superstep.
    ///
    /// For `EphemeralChannel` this raises the `consumed` flag, signalling
    /// that the value has been read; the next `update()` clears the flag
    /// again.
    ///
    /// Invoked from `after_tick()` for each field that changed during the
    /// superstep.
    ///
    /// The default body does nothing, which suits field types without
    /// consume semantics.
    fn consume_field(&mut self, _field_idx: usize) {}

    /// Declaration indices of the fields using the `ephemeral` reducer.
    ///
    /// Lets the Pregel engine restrict `consume_field()` calls to the fields
    /// that actually need them. The proc-macro derives this static slice from
    /// `#[reducer(ephemeral)]` annotations; the default is empty, as suits
    /// manually implemented states.
    #[must_use]
    fn consume_field_indices() -> &'static [usize] {
        &[]
    }

    /// Schema version used for migration; defaults to `1`.
    #[must_use]
    fn schema_version() -> u32 {
        1
    }

    /// Upgrade a serialized state written under schema `_from_version`.
    ///
    /// The default body returns `value` unchanged.
    #[must_use]
    fn migrate(_from_version: u32, value: serde_json::Value) -> serde_json::Value {
        value
    }

    /// Declaration indices of the fields using the `replace` reducer.
    ///
    /// The Pregel engine uses these to detect several writers hitting the
    /// same field within one superstep. Generated as a static slice by the
    /// proc-macro; the default is empty, as suits manually implemented states.
    #[must_use]
    fn replace_field_indices() -> &'static [usize] {
        &[]
    }

    /// Declaration indices of the fields using the `replace_after_finish` reducer.
    ///
    /// Lets the Pregel engine restrict `finish_field()` calls to the fields
    /// with finish semantics. Generated as a static slice by the proc-macro;
    /// the default is empty, as suits manually implemented states.
    #[must_use]
    fn replace_after_finish_field_indices() -> &'static [usize] {
        &[]
    }

    /// Whether the field at `_field_idx` is set (`Some`) in `_update`.
    ///
    /// Gives the Pregel engine cheap, serialization-free field inspection
    /// for multi-writer conflict detection. The proc-macro emits a
    /// match-based implementation; the default answers `false`, as suits
    /// manually implemented states.
    #[must_use]
    fn field_is_set(_update: &Self::Update, _field_idx: usize) -> bool {
        false
    }

    /// How many fields this state type declares.
    ///
    /// `validate_keys()` uses it to check that every reducer index array
    /// points at an existing field. The proc-macro emits a constant derived
    /// from the struct's field count; the default is `0`, as suits manually
    /// implemented states.
    #[must_use]
    fn field_count() -> usize {
        0
    }

    /// Field names in declaration order.
    ///
    /// `validate_keys()` uses them to name offending field indices in its
    /// reports. The proc-macro emits a static slice of the struct's field
    /// names; the default is empty, as suits manually implemented states.
    #[must_use]
    fn field_names() -> &'static [&'static str] {
        &[]
    }

    /// `(field_index, snapshot_frequency)` pairs for the `DeltaChannel` fields.
    ///
    /// Identifies which fields use
    /// [`DeltaChannel`](super::channel::DeltaChannel) together with their
    /// configured snapshot interval. The Pregel engine keeps a delta counter
    /// per such channel to choose between persisting a full snapshot and an
    /// incremental delta.
    ///
    /// The proc-macro builds this static slice from `#[delta(frequency = N)]`
    /// annotations; the default is empty (no delta channels), as suits
    /// manually implemented states.
    #[must_use]
    fn delta_channel_specs() -> &'static [(usize, usize)] {
        &[]
    }
}

/// Bitmask recording which state fields changed.
///
/// Bit `i` is set when the field with declaration index `i` changed, so a
/// single `u64` covers at most 64 fields. For states with more fields,
/// enable the "wide-state" feature to use `FixedBitSet` instead.
#[derive(Clone, Debug, Default)]
pub struct FieldsChanged(pub u64);

impl FieldsChanged {
    /// Number of field indices the `u64` mask can represent.
    const CAPACITY: usize = 64;

    /// Returns `true` when no field is marked as changed.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        self.0 == 0
    }

    /// Returns `true` if the field at `index` is marked as changed.
    ///
    /// Indices at or beyond the 64-field capacity can never be marked, so
    /// they report `false` instead of overflowing the shift. Shifting a `u64`
    /// by 64 or more panics in debug builds and wraps the shift amount in
    /// release builds, which would inspect the wrong bit.
    #[must_use]
    pub const fn has_field(&self, index: usize) -> bool {
        index < Self::CAPACITY && (self.0 & (1_u64 << index)) != 0
    }

    /// Mark the field at `index` as changed.
    ///
    /// # Panics
    ///
    /// Panics if `index >= 64`. The `u64` mask cannot represent such a field,
    /// and silently marking a different bit would corrupt change tracking.
    #[allow(
        clippy::missing_const_for_fn,
        reason = "mutable methods cannot be const"
    )]
    pub fn set_field(&mut self, index: usize) {
        assert!(
            index < Self::CAPACITY,
            "FieldsChanged supports at most 64 fields (got index {index})"
        );
        self.0 |= 1_u64 << index;
    }

    /// Mark every field that is marked in `other` as changed in `self` too.
    #[allow(
        clippy::missing_const_for_fn,
        reason = "mutable methods cannot be const"
    )]
    pub fn merge(&mut self, other: &Self) {
        self.0 |= other.0;
    }
}

/// Copy-on-write handle to a graph state; Juncture's default state wrapper.
///
/// Cloning a large state (e.g. a long conversation history) for every node
/// spawn is expensive. `CowState` keeps the state behind an `Arc` so handles
/// share one immutable copy, and a clone is only made when a modification
/// is made.
///
/// This wrapper is the default in Juncture, not an opt-in optimization.
#[derive(Debug)]
pub struct CowState<S>
where
    S: State,
{
    /// State shared read-only between handles.
    shared: Arc<S>,
    /// Update recorded locally and not yet committed, if any.
    pending: Option<S::Update>,
}

impl<S: State> CowState<S> {
    /// Wrap an already shared state; the new handle has no pending update.
    #[must_use]
    pub const fn new(state: Arc<S>) -> Self {
        Self {
            shared: state,
            pending: None,
        }
    }

    /// Borrow the current shared state (read-only).
    ///
    /// An update recorded with [`update`](Self::update) is not visible here.
    /// It only takes effect in the `Arc` returned by [`commit`](Self::commit)
    /// or [`try_commit`](Self::try_commit).
    #[must_use]
    pub fn get(&self) -> &S {
        &self.shared
    }

    /// Get mutable access to the state, cloning the inner value if the `Arc` is shared.
    ///
    /// Uses [`Arc::make_mut`]. When other handles still reference the same
    /// allocation, the state value is cloned first so their view is
    /// unaffected; otherwise it is mutated in place. A pending update is
    /// still applied on top of these changes at commit time.
    pub fn get_mut(&mut self) -> &mut S
    where
        S: Clone,
    {
        Arc::make_mut(&mut self.shared)
    }

    /// Record `changes` to be applied at commit time.
    ///
    /// Only one update is held at a time. Calling this again replaces (and
    /// discards) any previously recorded update rather than merging with it.
    pub fn update(&mut self, changes: S::Update) {
        self.pending = Some(changes);
    }

    /// Apply the pending update (if any) and return the resulting shared state.
    ///
    /// With no pending update, the original `Arc` is returned unchanged.
    /// Otherwise the update is applied through [`Arc::make_mut`]: the state
    /// is cloned only when other handles still share it, and a uniquely
    /// owned state is updated in place without a copy. The changed-field set
    /// reported by [`State::apply`] is discarded.
    pub fn commit(self) -> Arc<S> {
        let Self { mut shared, pending } = self;
        if let Some(changes) = pending {
            Arc::make_mut(&mut shared).apply(changes);
        }
        shared
    }

    /// Commit the pending update using [`State::try_apply`], propagating reducer errors.
    ///
    /// Behaves like [`commit`](Self::commit) (including cloning only when the
    /// state is shared), but surfaces reducer constraint violations instead
    /// of ignoring them.
    ///
    /// # Errors
    ///
    /// Returns `InvalidUpdateError` if the pending update violates reducer
    /// constraints (e.g. multiple writers on a replace channel). The handle
    /// is consumed either way. Other handles sharing the state never observe
    /// a partially applied update, because a shared value is cloned before
    /// it is modified.
    pub fn try_commit(self) -> Result<Arc<S>, InvalidUpdateError> {
        let Self { mut shared, pending } = self;
        if let Some(changes) = pending {
            Arc::make_mut(&mut shared).try_apply(changes)?;
        }
        Ok(shared)
    }
}

/// Cloning produces another handle onto the same shared state.
///
/// Only the `Arc` reference count is bumped; the state itself is not copied.
/// A pending update held by `self` is not carried over, so the clone starts
/// with no pending update.
impl<S: State> Clone for CowState<S> {
    fn clone(&self) -> Self {
        Self::new(Arc::clone(&self.shared))
    }
}

impl<S: State> std::ops::Deref for CowState<S> {
    type Target = S;

    fn deref(&self) -> &Self::Target {
        &self.shared
    }
}

/// Conversion from a graph's input schema into its full state `S`.
///
/// Any type implementing this trait can serve as the input type `I` of
/// [`StateGraph<S, I, O>`](crate::graph::StateGraph). The blanket impl below
/// makes every state convert into itself, so `I = S` keeps working unchanged.
pub trait IntoState<S: State>: Clone + Send + Sync + 'static {
    /// Consume `self`, producing the full state `S`.
    fn into_state(self) -> S;
}

/// Identity conversion: a `State` already is its own full state.
impl<S> IntoState<S> for S
where
    S: State,
{
    fn into_state(self) -> S {
        self
    }
}

/// Projection from a full state `S` onto a graph's output schema.
///
/// Any type implementing this trait can serve as the output type `O` of
/// [`StateGraph<S, I, O>`](crate::graph::StateGraph). The blanket impl below
/// extracts a state from itself by cloning, so `O = S` keeps working
/// unchanged.
pub trait FromState<S: State>: Clone + Send + Sync + 'static {
    /// Build `Self` from a borrowed full state.
    fn from_state(state: &S) -> Self;
}

/// Identity projection: a `State` is extracted from itself via `Clone`.
impl<S> FromState<S> for S
where
    S: State,
{
    fn from_state(state: &S) -> Self {
        S::clone(state)
    }
}

// Rust guideline compliant 2026-05-22