batpak 0.7.0

Event sourcing with causal graphs and policy gates. Sync API, zero async.
Documentation
use crate::event::EventSourced;
use crate::store::index::columnar::CachedProjectionSlot;
use crate::store::index::ProjectionReplayPlan;
use crate::store::Freshness;
use std::any::TypeId;

/// Internal dispatch strategy for a single project() call.
/// Computed once from known metadata; makes the decision tree explicit and testable.
///
/// This is the payload-free counterpart of `ProjectionDispatch`: that enum's
/// `strategy()` method maps each of its variants to the same-named variant here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ProjectionStrategy {
    /// No replay plan exists — the entity has no matching events.
    ///
    /// `compute_strategy` never returns this variant; in the code visible here
    /// it is only produced by calling `strategy()` on `ProjectionDispatch::Empty`.
    Empty,
    /// Group-local cache hit is fresh; deserialize and return.
    ///
    /// Selected whenever a group-local slot is present and flagged fresh,
    /// regardless of the incremental and external-cache inputs.
    GroupLocalHit,
    /// Group-local slot exists but is stale; apply delta events incrementally.
    ///
    /// Selected only when the projected type supports incremental apply AND
    /// incremental projection is enabled for the call.
    GroupLocalIncremental,
    /// Probe the external cache first, then fall back to full replay.
    ///
    /// Selected when no group-local strategy applies and the cache is not a no-op.
    ExternalCacheThenReplay,
    /// Skip external cache entirely and go straight to disk replay.
    ///
    /// Selected when no group-local strategy applies and the cache is a no-op.
    DirectReplay,
}

/// Per-call replay state carried by every non-empty `ProjectionDispatch`
/// variant: the replay plan plus the cache key and the clock/process stamps
/// captured when the plan was built.
#[derive(Debug, Clone)]
pub(super) struct ReplayContext {
    /// Replay plan for this projection call.
    pub(super) plan: ProjectionReplayPlan,
    /// Opaque byte key. Presumably identifies this projection's row in the
    /// projection cache — TODO confirm against the code that builds it.
    pub(super) cache_key: Vec<u8>,
    /// Watermark for this replay.
    /// NOTE(review): `plan` carries its own `watermark` field and the test
    /// fixture sets both to the same value — confirm whether they must agree.
    pub(super) watermark: u64,
    /// Wall-clock µs-since-epoch captured at plan build. Used as the
    /// prefetch-hint predicted timestamp so backends can warm the right
    /// row; NOT used as the `cached_at_us` stamp written into the real
    /// cache row. The honest put-time stamp is taken inside
    /// `store_projection_value` right before `ProjectionCache::put`
    /// (see G6). Survives across process restarts via the cache format;
    /// not monotonic on its own.
    pub(super) cached_at_us: i64,
    /// Monotonic ns-since-process-anchor captured at plan build. Only
    /// meaningful within the producing process; readers compare
    /// `process_boot_ns` before trusting age deltas. Same rationale as
    /// `cached_at_us`: used for prefetch prediction, NOT as the stamp
    /// written at put time.
    pub(super) cached_at_mono_ns: i64,
    /// This process's monotonic-epoch marker. Stamped on every cached value
    /// produced by this replay so subsequent reads can detect cross-process
    /// monotonic-clock comparisons.
    pub(super) process_boot_ns: u64,
    /// `TypeId` associated with this replay. The test fixture uses the id of
    /// the projected `EventSourced` type — presumably production code does
    /// the same; confirm at the construction site.
    pub(super) type_id: TypeId,
}

/// Inputs `dispatch` needs to pick a strategy for a planned projection: the
/// replay context plus what is known about the group-local cache slot.
#[derive(Debug, Clone)]
pub(super) struct PreparedProjection {
    /// Replay state; moved into whichever non-empty `ProjectionDispatch`
    /// variant `dispatch` produces.
    pub(super) replay: ReplayContext,
    /// Group-local cached slot, if any. `None` rules out both group-local
    /// strategies.
    pub(super) group_local_slot: Option<CachedProjectionSlot>,
    /// Whether the group-local slot is fresh. Only consulted by
    /// `compute_strategy` when `group_local_slot` is `Some`.
    pub(super) group_local_fresh: bool,
}

/// Outcome of preparing a projection call, before a strategy is chosen.
///
/// NOTE(review): neither the producer nor the consumer of this enum is visible
/// in this part of the file; the variant notes below should be confirmed there.
#[derive(Debug, Clone)]
pub(super) enum ProjectionPreparation {
    /// Nothing was prepared. Presumably the "no replay plan" case that
    /// `ProjectionStrategy::Empty` describes — TODO confirm.
    Empty,
    /// A projection was planned; the payload is ready for
    /// `PreparedProjection::dispatch`.
    Planned(PreparedProjection),
}

/// Borrowed bundle of the arguments for one replay run; built by
/// `replay_execution`. It can be `Copy` because it holds only shared
/// references and an `Instant`.
#[derive(Debug, Clone, Copy)]
pub(super) struct ReplayExecution<'a> {
    /// Entity identifier the replay is for.
    pub(super) entity: &'a str,
    /// Freshness requirement supplied by the caller.
    pub(super) freshness: &'a Freshness,
    /// Replay context (plan, cache key, stamps) for this run.
    pub(super) replay: &'a ReplayContext,
    /// Instant supplied by the caller. Presumably the start of the enclosing
    /// project() call, kept for latency measurement — TODO confirm.
    pub(super) started_at: std::time::Instant,
}

/// A chosen projection strategy together with the data needed to execute it.
///
/// Variants mirror `ProjectionStrategy` one-to-one (see `strategy()`); the
/// difference is that each non-empty variant owns its payload.
#[derive(Debug, Clone)]
pub(super) enum ProjectionDispatch {
    /// Nothing to execute; carries no payload.
    Empty,
    /// Serve from the fresh group-local slot.
    GroupLocalHit {
        /// The group-local cached slot that was judged fresh.
        slot: CachedProjectionSlot,
        /// Replay context for this call.
        replay: ReplayContext,
    },
    /// Bring the stale group-local slot up to date incrementally.
    GroupLocalIncremental {
        /// The stale group-local cached slot to start from.
        slot: CachedProjectionSlot,
        /// Replay context for this call.
        replay: ReplayContext,
    },
    /// Try the external cache, then replay if that does not satisfy the call.
    ExternalCacheThenReplay {
        /// Replay context for this call.
        replay: ReplayContext,
    },
    /// Replay directly without consulting the external cache.
    DirectReplay {
        /// Replay context for this call.
        replay: ReplayContext,
    },
}

impl ProjectionDispatch {
    pub(super) fn strategy(&self) -> ProjectionStrategy {
        match self {
            Self::Empty => ProjectionStrategy::Empty,
            Self::GroupLocalHit { .. } => ProjectionStrategy::GroupLocalHit,
            Self::GroupLocalIncremental { .. } => ProjectionStrategy::GroupLocalIncremental,
            Self::ExternalCacheThenReplay { .. } => ProjectionStrategy::ExternalCacheThenReplay,
            Self::DirectReplay { .. } => ProjectionStrategy::DirectReplay,
        }
    }
}

/// Bundles the borrowed inputs of one replay run into a `ReplayExecution`.
///
/// Pure constructor: every argument is stored unchanged in the field of the
/// same name, and the returned value borrows `entity`, `freshness` and
/// `replay` for the shared lifetime `'a`.
pub(super) fn replay_execution<'a>(
    entity: &'a str,
    freshness: &'a Freshness,
    replay: &'a ReplayContext,
    started_at: std::time::Instant,
) -> ReplayExecution<'a> {
    ReplayExecution {
        entity,
        freshness,
        replay,
        started_at,
    }
}

impl PreparedProjection {
    pub(super) fn dispatch<T: EventSourced>(
        self,
        incremental_enabled: bool,
        cache_is_noop: bool,
    ) -> ProjectionDispatch {
        let strategy = compute_strategy(
            self.group_local_slot.as_ref(),
            self.group_local_fresh,
            T::supports_incremental_apply(),
            incremental_enabled,
            cache_is_noop,
        );
        let Self {
            replay,
            group_local_slot,
            ..
        } = self;

        match (strategy, group_local_slot) {
            (ProjectionStrategy::GroupLocalHit, Some(slot)) => {
                ProjectionDispatch::GroupLocalHit { slot, replay }
            }
            (ProjectionStrategy::GroupLocalIncremental, Some(slot)) => {
                ProjectionDispatch::GroupLocalIncremental { slot, replay }
            }
            (ProjectionStrategy::ExternalCacheThenReplay, _) => {
                ProjectionDispatch::ExternalCacheThenReplay { replay }
            }
            (ProjectionStrategy::DirectReplay, _) => ProjectionDispatch::DirectReplay { replay },
            (ProjectionStrategy::Empty, _) => ProjectionDispatch::Empty,
            (
                ProjectionStrategy::GroupLocalHit | ProjectionStrategy::GroupLocalIncremental,
                None,
            ) => {
                debug_assert!(
                    false,
                    "compute_strategy selected a group-local projection strategy without a cached slot"
                );
                ProjectionDispatch::DirectReplay { replay }
            }
        }
    }
}

/// Selects the projection strategy from already-known metadata.
///
/// Pure decision table — performs no I/O and has no side effects, so every
/// branch can be unit-tested directly.
///
/// Precedence, highest first:
/// 1. a group-local slot that is fresh → `GroupLocalHit`;
/// 2. a stale group-local slot, when the type supports incremental apply and
///    incremental is enabled → `GroupLocalIncremental`;
/// 3. a no-op external cache → `DirectReplay`;
/// 4. anything else → `ExternalCacheThenReplay`.
///
/// `ProjectionStrategy::Empty` is never returned from here.
pub(super) fn compute_strategy(
    group_local_slot: Option<&CachedProjectionSlot>,
    is_group_local_fresh: bool,
    supports_incremental: bool,
    incremental_enabled: bool,
    cache_is_noop: bool,
) -> ProjectionStrategy {
    let has_slot = group_local_slot.is_some();
    let can_apply_delta = supports_incremental && incremental_enabled;

    match (has_slot, is_group_local_fresh, can_apply_delta, cache_is_noop) {
        (true, true, _, _) => ProjectionStrategy::GroupLocalHit,
        (true, false, true, _) => ProjectionStrategy::GroupLocalIncremental,
        // No usable group-local slot: the external cache decides.
        (_, _, _, true) => ProjectionStrategy::DirectReplay,
        (_, _, _, false) => ProjectionStrategy::ExternalCacheThenReplay,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{Event, EventKind};
    use crate::store::index::ProjectionReplayPlan;

    /// Stateless `EventSourced` fixture; it exists only to supply the type
    /// parameter for `PreparedProjection::dispatch`.
    #[derive(Default, Debug, serde::Serialize, serde::Deserialize)]
    struct Counter;

    impl EventSourced for Counter {
        type Input = crate::event::JsonValueInput;

        // Events are ignored: the fixture has no state to update.
        fn apply_event(&mut self, _event: &Event<serde_json::Value>) {}

        // Yields a value only when at least one event is supplied.
        fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
            (!events.is_empty()).then_some(Self)
        }

        // A single custom event kind is declared as relevant.
        fn relevant_event_kinds() -> &'static [EventKind] {
            static KINDS: [EventKind; 1] = [EventKind::custom(0xF, 1)];
            &KINDS
        }
    }

    /// Builds a `ReplayContext` with recognisable values (generation 9,
    /// watermark 7) so tests can check that it is carried through dispatch.
    fn replay_context() -> ReplayContext {
        ReplayContext {
            plan: ProjectionReplayPlan {
                watermark: 7,
                generation: 9,
                items: vec![],
            },
            cache_key: vec![1, 2, 3],
            watermark: 7,
            cached_at_us: 0,
            cached_at_mono_ns: 0,
            process_boot_ns: 0,
            type_id: std::any::TypeId::of::<Counter>(),
        }
    }

    /// With no group-local slot and a no-op cache, dispatch must choose
    /// `DirectReplay` and hand over the original replay context.
    #[test]
    fn dispatch_uses_direct_replay_for_noop_cache_without_group_local_slot() {
        let prepared = PreparedProjection {
            replay: replay_context(),
            group_local_slot: None,
            group_local_fresh: false,
        };

        // Arguments are (incremental_enabled = false, cache_is_noop = true).
        // The guard checks that the fixture's replay context was moved through.
        assert!(matches!(
            prepared.dispatch::<Counter>(false, true),
            ProjectionDispatch::DirectReplay { replay }
                if replay.plan.generation == 9
        ));
    }
}