batpak 0.9.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
use super::ReplayInput;
use crate::event::{Event, EventKind, EventSourced, ProjectionInput};
use crate::store::index::{projection_kind_matches, ProjectionReplayItem};
use crate::store::{HlcPoint, ProjectionFusion3, Store, StoreError};
use std::collections::BTreeMap;
#[cfg(test)]
use std::sync::atomic::{AtomicUsize, Ordering};

#[cfg(test)]
static FUSED_REPLAY_BATCH_READS: AtomicUsize = AtomicUsize::new(0);

#[cfg(test)]
pub(crate) fn reset_fused_replay_batch_reads() {
    FUSED_REPLAY_BATCH_READS.store(0, Ordering::SeqCst);
}

#[cfg(test)]
pub(crate) fn fused_replay_batch_reads() -> usize {
    FUSED_REPLAY_BATCH_READS.load(Ordering::SeqCst)
}

#[cfg(test)]
fn observe_fused_replay_batch_read() {
    FUSED_REPLAY_BATCH_READS.fetch_add(1, Ordering::SeqCst);
}

#[cfg(not(test))]
fn observe_fused_replay_batch_read() {}

pub(crate) fn project_fused2<Left, Right, State: crate::store::StoreState>(
    store: &Store<State>,
    entity: &str,
) -> Result<(Option<Left>, Option<Right>), StoreError>
where
    Left: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
    Right: EventSourced<Input = Left::Input>
        + serde::Serialize
        + serde::de::DeserializeOwned
        + 'static,
    Left::Input: ReplayInput,
{
    let relevant_kinds = fused_relevant_kinds::<Left, Right>();
    let Some(plan) = store
        .index
        .projection_replay_plan(entity, relevant_kinds.as_slice())
    else {
        return Ok((None, None));
    };

    let positions: Vec<&crate::store::index::DiskPos> =
        plan.items.iter().map(|item| &item.disk_pos).collect();
    observe_fused_replay_batch_read();
    let events = Left::Input::read_batch(&store.reader, &positions)?;
    let (left_events, left_lanes) = filtered_projection_events::<Left, _>(&events, &plan.items);
    let (right_events, right_lanes) = filtered_projection_events::<Right, _>(&events, &plan.items);

    let left = Left::from_events(left_events.as_slice());
    let right = Right::from_events(right_events.as_slice());
    notify_projection_applied_lanes::<Left, State>(store, entity, &left_lanes);
    notify_projection_applied_lanes::<Right, State>(store, entity, &right_lanes);

    Ok((left, right))
}

pub(crate) fn project_fused3<First, Second, Third, State: crate::store::StoreState>(
    store: &Store<State>,
    entity: &str,
) -> Result<ProjectionFusion3<First, Second, Third>, StoreError>
where
    First: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
    Second: EventSourced<Input = First::Input>
        + serde::Serialize
        + serde::de::DeserializeOwned
        + 'static,
    Third: EventSourced<Input = First::Input>
        + serde::Serialize
        + serde::de::DeserializeOwned
        + 'static,
    First::Input: ReplayInput,
{
    let relevant_kinds = fused_relevant_kinds3::<First, Second, Third>();
    let Some(plan) = store
        .index
        .projection_replay_plan(entity, relevant_kinds.as_slice())
    else {
        return Ok((None, None, None));
    };

    let positions: Vec<&crate::store::index::DiskPos> =
        plan.items.iter().map(|item| &item.disk_pos).collect();
    observe_fused_replay_batch_read();
    let events = First::Input::read_batch(&store.reader, &positions)?;
    let (first_events, first_lanes) = filtered_projection_events::<First, _>(&events, &plan.items);
    let (second_events, second_lanes) =
        filtered_projection_events::<Second, _>(&events, &plan.items);
    let (third_events, third_lanes) = filtered_projection_events::<Third, _>(&events, &plan.items);

    let first = First::from_events(first_events.as_slice());
    let second = Second::from_events(second_events.as_slice());
    let third = Third::from_events(third_events.as_slice());
    notify_projection_applied_lanes::<First, State>(store, entity, &first_lanes);
    notify_projection_applied_lanes::<Second, State>(store, entity, &second_lanes);
    notify_projection_applied_lanes::<Third, State>(store, entity, &third_lanes);

    Ok((first, second, third))
}

fn fused_relevant_kinds<Left, Right>() -> Vec<EventKind>
where
    Left: EventSourced,
    Right: EventSourced,
{
    let left = Left::relevant_event_kinds();
    let right = Right::relevant_event_kinds();
    collect_relevant_kinds(&[left, right])
}

fn fused_relevant_kinds3<First, Second, Third>() -> Vec<EventKind>
where
    First: EventSourced,
    Second: EventSourced,
    Third: EventSourced,
{
    collect_relevant_kinds(&[
        First::relevant_event_kinds(),
        Second::relevant_event_kinds(),
        Third::relevant_event_kinds(),
    ])
}

fn collect_relevant_kinds(slices: &[&[EventKind]]) -> Vec<EventKind> {
    if slices.iter().any(|slice| slice.is_empty()) {
        return Vec::new();
    }

    let capacity = slices
        .iter()
        .fold(0usize, |total, slice| total.saturating_add(slice.len()));
    let mut kinds = Vec::with_capacity(capacity);
    for slice in slices {
        for kind in slice.iter().copied() {
            if !kinds.contains(&kind) {
                kinds.push(kind);
            }
        }
    }
    kinds
}

fn filtered_projection_events<T, I>(
    events: &[Event<I::Payload>],
    items: &[ProjectionReplayItem],
) -> (Vec<Event<I::Payload>>, BTreeMap<u32, HlcPoint>)
where
    T: EventSourced<Input = I>,
    I: ProjectionInput,
{
    let mut filtered = Vec::new();
    let mut lanes = BTreeMap::<u32, HlcPoint>::new();
    for (event, item) in events.iter().zip(items) {
        if projection_kind_matches(T::relevant_event_kinds(), event.event_kind()) {
            filtered.push(event.clone());
            lanes
                .entry(item.lane)
                .and_modify(|current| *current = (*current).max_by_sequence(item.point))
                .or_insert(item.point);
        }
    }
    (filtered, lanes)
}

fn notify_projection_applied_lanes<T, State: crate::store::StoreState>(
    store: &Store<State>,
    entity: &str,
    lanes: &BTreeMap<u32, HlcPoint>,
) where
    T: 'static,
{
    let projection_id =
        crate::store::projection::registry::ProjectionRegistry::id_for_type::<T>(entity);
    for (lane, point) in lanes {
        store
            .projection_registry
            .notify_applied_on_lane(projection_id.clone(), *lane, *point);
    }
}

#[cfg(test)]
mod relevant_kinds_tests {
    use super::{collect_relevant_kinds, fused_relevant_kinds, fused_relevant_kinds3};
    use crate::event::{Event, EventKind, EventSourced, JsonValueInput};

    const A_KIND: EventKind = EventKind::custom(0xE, 11);
    const B_KIND: EventKind = EventKind::custom(0xE, 12);
    const C_KIND: EventKind = EventKind::custom(0xE, 13);

    struct FoldA;
    struct FoldB;
    struct FoldC;

    macro_rules! single_kind_fold {
        ($ty:ty, $kind:expr) => {
            impl EventSourced for $ty {
                type Input = JsonValueInput;
                const STATE_CONTRACT: crate::event::ProjectionStateContract =
                    crate::event::ProjectionStateContract::single_entity(
                        "fusion-relevant-kinds-test",
                    );

                fn from_events(_events: &[Event<serde_json::Value>]) -> Option<Self> {
                    None
                }

                fn apply_event(&mut self, _event: &Event<serde_json::Value>) {}

                fn relevant_event_kinds() -> &'static [EventKind] {
                    &[$kind]
                }

                fn state_extent(&self) -> crate::event::StateExtent {
                    crate::event::StateExtent::single_entity()
                }
            }
        };
    }

    single_kind_fold!(FoldA, A_KIND);
    single_kind_fold!(FoldB, B_KIND);
    single_kind_fold!(FoldC, C_KIND);

    #[test]
    fn fused_relevant_kinds_returns_union_of_both_folds() {
        let kinds = fused_relevant_kinds::<FoldA, FoldB>();

        // Kills `-> vec![]`: the union must be the two declared, non-empty kinds.
        assert_eq!(
            kinds,
            vec![A_KIND, B_KIND],
            "PROPERTY: fused_relevant_kinds must return the union of both folds' declared kinds"
        );
    }

    #[test]
    fn fused_relevant_kinds3_returns_union_of_three_folds() {
        let kinds = fused_relevant_kinds3::<FoldA, FoldB, FoldC>();

        // Kills `-> vec![]`: the union must contain all three declared kinds.
        assert_eq!(
            kinds,
            vec![A_KIND, B_KIND, C_KIND],
            "PROPERTY: fused_relevant_kinds3 must return the union of all three folds' declared kinds"
        );
    }

    #[test]
    fn collect_relevant_kinds_dedups_overlapping_slices() {
        let first: &[EventKind] = &[A_KIND];
        let second: &[EventKind] = &[A_KIND, B_KIND];
        let kinds = collect_relevant_kinds(&[first, second]);

        // Kills `-> vec![]` (non-empty) AND `delete !` at the dedup guard: with the
        // `!` deleted the guard becomes `if kinds.contains(&kind)`, so nothing is
        // ever pushed (kinds starts empty) and the result would be empty/wrong. The
        // correct deduped union is exactly [A_KIND, B_KIND] with no duplicate A_KIND.
        assert_eq!(
            kinds,
            vec![A_KIND, B_KIND],
            "PROPERTY: collect_relevant_kinds must dedup overlapping kinds into a non-empty union"
        );
    }

    #[test]
    fn collect_relevant_kinds_repeated_kind_appears_once() {
        let first: &[EventKind] = &[A_KIND, A_KIND];
        let second: &[EventKind] = &[A_KIND];
        let kinds = collect_relevant_kinds(&[first, second]);

        // With the `!` dedup guard deleted, a repeated kind would either vanish
        // (guard never true on empty start) or duplicate; the correct result is a
        // single A_KIND.
        assert_eq!(
            kinds,
            vec![A_KIND],
            "PROPERTY: a kind repeated across slices must appear exactly once"
        );
    }
}