batpak 0.7.0

Event sourcing with causal graphs and policy gates. Sync API, zero async.
Documentation
use batpak::prelude::*;
use batpak::store::{BatchAppendItem, CausationRef, Store, StoreConfig};
use tempfile::TempDir;

const KIND: EventKind = EventKind::custom(0xF, 0x66);
const BATCH_LEN: usize = 96;

fn test_config(dir: &TempDir) -> StoreConfig {
    StoreConfig::new(dir.path())
        .with_group_commit_max_batch(16)
        .with_sync_every_n_events(512)
        .with_enable_checkpoint(false)
        .with_enable_mmap_index(false)
}

fn coord_parts(idx: usize) -> (String, String) {
    (
        format!("entity:batch-stage:{}", idx % 6),
        format!("scope:batch-stage:{}", idx % 3),
    )
}

fn reused_coordinate_batch() -> Vec<BatchAppendItem> {
    let templates: Vec<_> = (0..6)
        .map(|idx| {
            let (entity, scope) = coord_parts(idx);
            Coordinate::new(entity, scope).expect("template coordinate")
        })
        .collect();

    (0..BATCH_LEN)
        .map(|idx| {
            BatchAppendItem::new(
                templates[idx % templates.len()].clone(),
                KIND,
                &serde_json::json!({"batch": "reused", "i": idx, "bucket": idx % 4}),
                AppendOptions::new().with_idempotency(0xA000 + idx as u128),
                CausationRef::None,
            )
            .expect("reused batch item")
        })
        .collect()
}

fn fresh_coordinate_batch() -> Vec<BatchAppendItem> {
    (0..BATCH_LEN)
        .map(|idx| {
            let (entity, scope) = coord_parts(idx);
            BatchAppendItem::new(
                Coordinate::new(entity, scope).expect("fresh coordinate"),
                KIND,
                &serde_json::json!({"batch": "fresh", "i": idx, "bucket": idx % 4}),
                AppendOptions::new().with_idempotency(0xB000 + idx as u128),
                CausationRef::None,
            )
            .expect("fresh batch item")
        })
        .collect()
}

fn duplicate_heavy_batch() -> Vec<BatchAppendItem> {
    (0usize..72)
        .map(|idx| {
            let entity = if idx % 2 == 0 {
                "entity:dup:a"
            } else {
                "entity:dup:b"
            };
            let scope = if idx % 3 == 0 {
                "scope:dup:x"
            } else {
                "scope:dup:y"
            };
            BatchAppendItem::new(
                Coordinate::new(entity, scope).expect("duplicate-heavy coordinate"),
                KIND,
                &serde_json::json!({"i": idx, "entity": entity, "scope": scope}),
                AppendOptions::new().with_idempotency(0xC000 + idx as u128),
                CausationRef::None,
            )
            .expect("duplicate-heavy item")
        })
        .collect()
}

fn snapshot(store: &Store) -> Vec<(String, String, u64)> {
    let mut rows: Vec<_> = store
        .by_fact(KIND)
        .into_iter()
        .map(|entry| {
            (
                entry.coord.entity().to_owned(),
                entry.coord.scope().to_owned(),
                entry.global_sequence,
            )
        })
        .collect();
    rows.sort_by_key(|(_, _, sequence)| *sequence);
    rows
}

fn assert_contiguous_sequences(rows: &[(String, String, u64)], label: &str) {
    let sequences: Vec<_> = rows.iter().map(|(_, _, sequence)| *sequence).collect();
    assert!(
        !sequences.is_empty(),
        "PROPERTY: {label} should have at least one visible sequence"
    );
    let first_sequence = sequences[0];
    assert_eq!(
        sequences,
        (first_sequence..first_sequence + rows.len() as u64).collect::<Vec<_>>(),
        "{label} should publish contiguous visible sequences"
    );
}

#[test]
fn reused_and_fresh_coordinate_batches_have_identical_visibility_after_reopen() {
    let reused_dir = TempDir::new().expect("reused temp dir");
    let fresh_dir = TempDir::new().expect("fresh temp dir");

    let reused_store = Store::open(test_config(&reused_dir)).expect("open reused store");
    let fresh_store = Store::open(test_config(&fresh_dir)).expect("open fresh store");

    let reused_receipts = reused_store
        .append_batch(reused_coordinate_batch())
        .expect("append reused batch");
    let fresh_receipts = fresh_store
        .append_batch(fresh_coordinate_batch())
        .expect("append fresh batch");
    assert_eq!(reused_receipts.len(), BATCH_LEN);
    assert_eq!(fresh_receipts.len(), BATCH_LEN);

    reused_store.sync().expect("sync reused");
    fresh_store.sync().expect("sync fresh");

    let reused_snapshot = snapshot(&reused_store);
    let fresh_snapshot = snapshot(&fresh_store);
    assert_eq!(
        reused_snapshot.len(),
        BATCH_LEN,
        "reused-coordinate batch should surface every event before reopen"
    );
    assert_eq!(
        fresh_snapshot.len(),
        BATCH_LEN,
        "fresh-coordinate batch should surface every event before reopen"
    );

    assert_eq!(
        reused_snapshot
            .iter()
            .map(|(entity, scope, _)| (entity.clone(), scope.clone()))
            .collect::<Vec<_>>(),
        fresh_snapshot
            .iter()
            .map(|(entity, scope, _)| (entity.clone(), scope.clone()))
            .collect::<Vec<_>>(),
        "batch staging must preserve the same visible entity/scope ordering regardless of coordinate construction pattern"
    );
    assert_contiguous_sequences(&reused_snapshot, "reused-coordinate batch");
    assert_contiguous_sequences(&fresh_snapshot, "fresh-coordinate batch");

    reused_store.close().expect("close reused");
    fresh_store.close().expect("close fresh");

    let reopened_reused = Store::open(test_config(&reused_dir)).expect("reopen reused store");
    let reopened_fresh = Store::open(test_config(&fresh_dir)).expect("reopen fresh store");
    assert_eq!(
        snapshot(&reopened_reused),
        reused_snapshot,
        "reused-coordinate batch should reopen with identical visible state"
    );
    assert_eq!(
        snapshot(&reopened_fresh),
        fresh_snapshot,
        "fresh-coordinate batch should reopen with identical visible state"
    );
    assert_eq!(
        reopened_reused.by_scope("scope:batch-stage:0").len(),
        reopened_fresh.by_scope("scope:batch-stage:0").len(),
        "scope query results should match across coordinate construction patterns"
    );

    reopened_reused.close().expect("close reopened reused");
    reopened_fresh.close().expect("close reopened fresh");
}

#[test]
fn duplicate_heavy_batch_preserves_scope_and_stream_queries_after_reopen() {
    let dir = TempDir::new().expect("temp dir");
    let store = Store::open(test_config(&dir)).expect("open store");

    let receipts = store
        .append_batch(duplicate_heavy_batch())
        .expect("append duplicate-heavy batch");
    assert_eq!(receipts.len(), 72);

    store.sync().expect("sync duplicate-heavy batch");
    let live_snapshot = snapshot(&store);
    assert_eq!(
        live_snapshot.len(),
        72,
        "duplicate-heavy batch should surface every event"
    );
    assert_eq!(
        store.stream("entity:dup:a").len(),
        36,
        "entity stream should preserve duplicate-heavy batch membership"
    );
    assert_eq!(
        store.stream("entity:dup:b").len(),
        36,
        "entity stream should preserve duplicate-heavy batch membership"
    );
    let scope_x = live_snapshot
        .iter()
        .filter(|(_, scope, _)| scope == "scope:dup:x")
        .count();
    let scope_y = live_snapshot
        .iter()
        .filter(|(_, scope, _)| scope == "scope:dup:y")
        .count();
    assert_eq!(
        scope_x, 24,
        "live snapshot should preserve scope x membership"
    );
    assert_eq!(
        scope_y, 48,
        "live snapshot should preserve scope y membership"
    );

    assert_contiguous_sequences(&live_snapshot, "duplicate-heavy batch");

    store.close().expect("close");

    let reopened = Store::open(test_config(&dir)).expect("reopen store");
    let reopened_snapshot = snapshot(&reopened);
    assert_eq!(reopened_snapshot.len(), 72);
    assert_eq!(reopened.stream("entity:dup:a").len(), 36);
    assert_eq!(reopened.stream("entity:dup:b").len(), 36);
    assert_eq!(
        reopened_snapshot
            .iter()
            .filter(|(_, scope, _)| scope == "scope:dup:x")
            .count(),
        24
    );
    assert_eq!(
        reopened_snapshot
            .iter()
            .filter(|(_, scope, _)| scope == "scope:dup:y")
            .count(),
        48
    );
    reopened.close().expect("close reopened");
}