batpak 0.7.0

Event sourcing with causal graphs and policy gates. Sync API, zero async.
Documentation
//! Focused projection-cache edge pins split out of `projection_cache.rs`.
//!
//! PROVES: LAW-001 (cache failures and generation-aware replay stay honest)
//! CATCHES: Cache I/O downgrades and project_if_changed stale-generation shortcuts
//! SEEDED: mutation-smoke gaps in NativeCache::get and project_inner generation checks

use batpak::prelude::*;
use batpak::store::projection::{CacheMeta, NativeCache, ProjectionCache};
use batpak::store::{Freshness, IndexTopology, Store, StoreConfig, StoreError, SyncConfig};
use tempfile::TempDir;

const GENERATION_KIND: EventKind = EventKind::custom(0xF, 0x51);

#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
struct GenerationCounter {
    count: u32,
}

impl EventSourced for GenerationCounter {
    type Input = JsonValueInput;

    fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
        Some(Self {
            count: u32::try_from(events.len()).expect("test uses < 2^32 events"),
        })
    }

    fn apply_event(&mut self, _event: &Event<serde_json::Value>) {
        self.count += 1;
    }

    fn relevant_event_kinds() -> &'static [EventKind] {
        &[GENERATION_KIND]
    }
}

fn cache_meta() -> CacheMeta {
    CacheMeta {
        watermark: 42,
        cached_at_us: 1_000_000,
        cached_at_mono_ns: None,
        process_boot_ns: None,
    }
}

#[test]
fn native_get_surfaces_non_not_found_io_errors() {
    let dir = TempDir::new().expect("temp dir");
    let cache_path = dir.path().join("cache");
    let cache = NativeCache::open(&cache_path).expect("open native cache");
    let shard_path = cache_path.join("ab");
    std::fs::write(&shard_path, b"not a directory").expect("create shard path as a file");

    let result = cache.get(&[0xAB]);
    assert!(
        matches!(result, Err(StoreError::CacheFailed(_))),
        "PROPERTY: a cache shard path that is not a directory must propagate as CacheFailed, not degrade to a cache miss.\n\
         Investigate: src/store/projection/mod.rs NativeCache::get.\n\
         Common causes: treating every IO error as ErrorKind::NotFound."
    );
}

#[test]
fn project_if_changed_replays_when_watermark_matches_but_generation_advances() {
    let dir = TempDir::new().expect("temp dir");
    let config = StoreConfig {
        data_dir: dir.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    }
    .with_index_topology(IndexTopology::entity_local());
    let store = Store::open(config).expect("open store");

    let coord = Coordinate::new("entity:generation-watermark", "scope:test").expect("coord");
    store
        .append(&coord, GENERATION_KIND, &serde_json::json!({"x": 1}))
        .expect("append relevant 1");
    store
        .append(&coord, GENERATION_KIND, &serde_json::json!({"x": 2}))
        .expect("append relevant 2");

    let seeded: Option<GenerationCounter> = store
        .project("entity:generation-watermark", &Freshness::Consistent)
        .expect("seed group-local cache");
    assert_eq!(seeded, Some(GenerationCounter { count: 2 }));
    let baseline_generation = store
        .entity_generation("entity:generation-watermark")
        .expect("baseline generation");

    store
        .append(
            &coord,
            EventKind::custom(0xF, 9),
            &serde_json::json!({"irrelevant": true}),
        )
        .expect("append irrelevant");

    let changed = store
        .project_if_changed::<GenerationCounter>(
            "entity:generation-watermark",
            baseline_generation,
            &Freshness::Consistent,
        )
        .expect("project_if_changed")
        .expect("entity generation changed");

    assert!(
        changed.0 > baseline_generation,
        "PROPERTY: when an irrelevant append advances entity generation but leaves the relevant watermark unchanged, project_if_changed must replay and return the newer materialization generation"
    );
    assert_eq!(changed.1, Some(GenerationCounter { count: 2 }));

    store.close().expect("close");
}

#[test]
fn native_get_put_smoke_keeps_test_meta_used() {
    let dir = TempDir::new().expect("temp dir");
    let cache = NativeCache::open(dir.path().join("cache")).expect("open native cache");
    cache
        .put(b"key", b"value", cache_meta())
        .expect("put cache value");

    let result = cache.get(b"key").expect("get cache value");
    assert!(
        result.is_some(),
        "PROPERTY: the focused edge file still exercises a successful NativeCache get path"
    );
}