batpak 0.9.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
use super::*;
use crate::coordinate::Coordinate;
use crate::event::{Event, EventKind, EventSourced, JsonValueInput};
use crate::store::{Freshness, Store, StoreConfig};
use std::sync::Mutex;
use tempfile::TempDir;

type TestResult = Result<(), Box<dyn std::error::Error>>;

static FUSION_TEST_LOCK: Mutex<()> = Mutex::new(());

const LEFT_KIND: EventKind = EventKind::custom(0xF, 41);
const RIGHT_KIND: EventKind = EventKind::custom(0xF, 42);
const OVERLAP_KIND: EventKind = EventKind::custom(0xF, 43);
const NOISE_KIND: EventKind = EventKind::custom(0xF, 44);

macro_rules! single_entity_state_contract {
    ($key_space:literal) => {
        const STATE_CONTRACT: crate::event::ProjectionStateContract =
            crate::event::ProjectionStateContract::single_entity($key_space);

        fn state_extent(&self) -> crate::event::StateExtent {
            crate::event::StateExtent::single_entity()
        }
    };
}

#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
struct LeftCount {
    count: usize,
}

impl EventSourced for LeftCount {
    type Input = JsonValueInput;
    single_entity_state_contract!("fusion-left-count");

    fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
        (!events.is_empty()).then_some(Self {
            count: events.len(),
        })
    }

    fn apply_event(&mut self, _event: &Event<serde_json::Value>) {
        self.count += 1;
    }

    fn relevant_event_kinds() -> &'static [EventKind] {
        &[LEFT_KIND]
    }
}

#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
struct RightTotal {
    total: u64,
}

impl EventSourced for RightTotal {
    type Input = JsonValueInput;
    single_entity_state_contract!("fusion-right-total");

    fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
        let total = events
            .iter()
            .filter_map(|event| event.payload.get("n"))
            .filter_map(serde_json::Value::as_u64)
            .sum::<u64>();
        (total > 0).then_some(Self { total })
    }

    fn apply_event(&mut self, event: &Event<serde_json::Value>) {
        self.total += event
            .payload
            .get("n")
            .and_then(serde_json::Value::as_u64)
            .unwrap_or(0);
    }

    fn relevant_event_kinds() -> &'static [EventKind] {
        &[RIGHT_KIND]
    }
}

#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
struct OverlapCount {
    count: usize,
}

impl EventSourced for OverlapCount {
    type Input = JsonValueInput;
    single_entity_state_contract!("fusion-overlap-count");

    fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
        (!events.is_empty()).then_some(Self {
            count: events.len(),
        })
    }

    fn apply_event(&mut self, _event: &Event<serde_json::Value>) {
        self.count += 1;
    }

    fn relevant_event_kinds() -> &'static [EventKind] {
        &[OVERLAP_KIND]
    }
}

#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
struct OverlapTotal {
    total: u64,
}

impl EventSourced for OverlapTotal {
    type Input = JsonValueInput;
    single_entity_state_contract!("fusion-overlap-total");

    fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
        let total = events
            .iter()
            .filter_map(|event| event.payload.get("n"))
            .filter_map(serde_json::Value::as_u64)
            .sum::<u64>();
        (total > 0).then_some(Self { total })
    }

    fn apply_event(&mut self, event: &Event<serde_json::Value>) {
        self.total += event
            .payload
            .get("n")
            .and_then(serde_json::Value::as_u64)
            .unwrap_or(0);
    }

    fn relevant_event_kinds() -> &'static [EventKind] {
        &[OVERLAP_KIND]
    }
}

#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
struct MatchAllCount {
    count: usize,
}

#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
struct NoiseCount {
    count: usize,
}

impl EventSourced for NoiseCount {
    type Input = JsonValueInput;
    single_entity_state_contract!("fusion-noise-count");

    fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
        (!events.is_empty()).then_some(Self {
            count: events.len(),
        })
    }

    fn apply_event(&mut self, _event: &Event<serde_json::Value>) {
        self.count += 1;
    }

    fn relevant_event_kinds() -> &'static [EventKind] {
        &[NOISE_KIND]
    }
}

impl EventSourced for MatchAllCount {
    type Input = JsonValueInput;
    single_entity_state_contract!("fusion-match-all-count");

    fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
        (!events.is_empty()).then_some(Self {
            count: events.len(),
        })
    }

    fn apply_event(&mut self, _event: &Event<serde_json::Value>) {
        self.count += 1;
    }

    fn relevant_event_kinds() -> &'static [EventKind] {
        &[]
    }
}

#[test]
fn fused_direct_replay_reads_shared_stream_once() -> TestResult {
    let _lock = match FUSION_TEST_LOCK.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    let dir = TempDir::new()?;
    let store = Store::open(StoreConfig::new(dir.path()))?;
    let coord = Coordinate::new("entity:fused-once", "scope:fused")?;
    drop(store.append(&coord, LEFT_KIND, &serde_json::json!({ "n": 1 }))?);
    drop(store.append(&coord, RIGHT_KIND, &serde_json::json!({ "n": 2 }))?);
    drop(store.append(&coord, RIGHT_KIND, &serde_json::json!({ "n": 3 }))?);

    reset_fused_replay_batch_reads();
    let fused: (Option<LeftCount>, Option<RightTotal>) =
        store.project_fused2("entity:fused-once")?;

    assert_eq!(fused.0, Some(LeftCount { count: 1 }));
    assert_eq!(fused.1, Some(RightTotal { total: 5 }));
    assert_eq!(
        fused_replay_batch_reads(),
        1,
        "PROPERTY: fused projection must batch-read the shared replay stream once"
    );
    store.close()?;
    Ok(())
}

#[test]
fn fused_result_matches_separate_consistent_projections() -> TestResult {
    let _lock = match FUSION_TEST_LOCK.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    let dir = TempDir::new()?;
    let store = Store::open(StoreConfig::new(dir.path()))?;
    let coord = Coordinate::new("entity:fused-equiv", "scope:fused")?;
    drop(store.append(&coord, LEFT_KIND, &serde_json::json!({ "n": 1 }))?);
    drop(store.append(&coord, RIGHT_KIND, &serde_json::json!({ "n": 2 }))?);
    drop(store.append(&coord, LEFT_KIND, &serde_json::json!({ "n": 3 }))?);

    let separate_left: Option<LeftCount> =
        store.project("entity:fused-equiv", &Freshness::Consistent)?;
    let separate_right: Option<RightTotal> =
        store.project("entity:fused-equiv", &Freshness::Consistent)?;
    let fused: (Option<LeftCount>, Option<RightTotal>) =
        store.project_fused2("entity:fused-equiv")?;

    assert_eq!(fused, (separate_left, separate_right));
    store.close()?;
    Ok(())
}

#[test]
fn fused_overlapping_kinds_match_separate_projections_and_batch_read_once() -> TestResult {
    let _lock = match FUSION_TEST_LOCK.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    let dir = TempDir::new()?;
    let store = Store::open(StoreConfig::new(dir.path()))?;
    let coord = Coordinate::new("entity:fused-overlap", "scope:fused")?;
    drop(store.append(&coord, OVERLAP_KIND, &serde_json::json!({ "n": 2 }))?);
    drop(store.append(&coord, NOISE_KIND, &serde_json::json!({ "n": 100 }))?);
    drop(store.append(&coord, OVERLAP_KIND, &serde_json::json!({ "n": 5 }))?);

    let separate_count: Option<OverlapCount> =
        store.project("entity:fused-overlap", &Freshness::Consistent)?;
    let separate_total: Option<OverlapTotal> =
        store.project("entity:fused-overlap", &Freshness::Consistent)?;
    reset_fused_replay_batch_reads();
    let fused: (Option<OverlapCount>, Option<OverlapTotal>) =
        store.project_fused2("entity:fused-overlap")?;

    assert_eq!(
        fused,
        (separate_count, separate_total),
        "PROPERTY: overlapping-kind fused replay must equal separate projections"
    );
    assert_eq!(
        fused_replay_batch_reads(),
        1,
        "PROPERTY: overlapping-kind fusion must still batch-read the shared replay stream once"
    );
    store.close()?;
    Ok(())
}

#[test]
fn fused_empty_relevant_kind_projection_matches_all_events() -> TestResult {
    let _lock = match FUSION_TEST_LOCK.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    let dir = TempDir::new()?;
    let store = Store::open(StoreConfig::new(dir.path()))?;
    let coord = Coordinate::new("entity:fused-match-all", "scope:fused")?;
    drop(store.append(&coord, LEFT_KIND, &serde_json::json!({ "n": 1 }))?);
    drop(store.append(&coord, RIGHT_KIND, &serde_json::json!({ "n": 2 }))?);
    drop(store.append(&coord, NOISE_KIND, &serde_json::json!({ "n": 3 }))?);

    reset_fused_replay_batch_reads();
    let fused: (Option<MatchAllCount>, Option<RightTotal>) =
        store.project_fused2("entity:fused-match-all")?;

    assert_eq!(
        fused,
        (
            Some(MatchAllCount { count: 3 }),
            Some(RightTotal { total: 2 })
        ),
        "PROPERTY: an empty relevant_event_kinds slice means match all events in fused replay"
    );
    assert_eq!(
        fused_replay_batch_reads(),
        1,
        "PROPERTY: match-all fusion must batch-read the shared replay stream once"
    );
    store.close()?;
    Ok(())
}

#[test]
fn fused_three_projection_tuple_batch_reads_once() -> TestResult {
    let _lock = match FUSION_TEST_LOCK.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    let dir = TempDir::new()?;
    let store = Store::open(StoreConfig::new(dir.path()))?;
    let coord = Coordinate::new("entity:fused-three", "scope:fused")?;
    drop(store.append(&coord, LEFT_KIND, &serde_json::json!({ "n": 1 }))?);
    drop(store.append(&coord, RIGHT_KIND, &serde_json::json!({ "n": 2 }))?);
    drop(store.append(&coord, NOISE_KIND, &serde_json::json!({ "n": 3 }))?);

    let separate_left: Option<LeftCount> =
        store.project("entity:fused-three", &Freshness::Consistent)?;
    let separate_right: Option<RightTotal> =
        store.project("entity:fused-three", &Freshness::Consistent)?;
    let separate_noise: Option<NoiseCount> =
        store.project("entity:fused-three", &Freshness::Consistent)?;
    reset_fused_replay_batch_reads();
    let fused: (Option<LeftCount>, Option<RightTotal>, Option<NoiseCount>) =
        store.project_fused3("entity:fused-three")?;

    assert_eq!(
        fused,
        (separate_left, separate_right, separate_noise),
        "PROPERTY: fused 3-projection tuple fold must equal separate consistent folds"
    );
    assert_eq!(
        fused_replay_batch_reads(),
        1,
        "PROPERTY: fused 3-projection tuple must batch-read the shared replay stream once"
    );
    store.close()?;
    Ok(())
}