use batpak::prelude::*;
use batpak::store::projection::{CacheMeta, NativeCache, ProjectionCache};
use batpak::store::{Freshness, IndexTopology, Store, StoreConfig, StoreError, SyncConfig};
use tempfile::TempDir;
/// Custom event kind built from `(0xF, 0x51)`; it is the only kind that
/// `GenerationCounter::relevant_event_kinds` reports as relevant.
const GENERATION_KIND: EventKind = EventKind::custom(0xF, 0x51);
/// Minimal test projection whose only state is a count of events.
///
/// Serializable and deserializable via serde, and comparable with
/// `assert_eq!` thanks to `Debug` + `PartialEq`.
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
struct GenerationCounter {
    /// Number of events this projection has been built from / applied.
    count: u32,
}
/// Event-sourcing glue for the counter: the projection's value is simply the
/// number of events it was handed.
impl EventSourced for GenerationCounter {
    type Input = JsonValueInput;

    /// Builds the counter from a full event slice; the count is the slice
    /// length. Always returns `Some`, even for an empty slice.
    ///
    /// # Panics
    ///
    /// Panics if the slice length does not fit in a `u32`.
    fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
        let count = u32::try_from(events.len()).expect("test uses < 2^32 events");
        Some(Self { count })
    }

    /// Bumps the count by one; the event's contents are ignored.
    fn apply_event(&mut self, _event: &Event<serde_json::Value>) {
        self.count += 1;
    }

    /// Only `GENERATION_KIND` is reported as relevant to this projection.
    fn relevant_event_kinds() -> &'static [EventKind] {
        &[GENERATION_KIND]
    }
}
/// Returns a fixed `CacheMeta` fixture for cache writes in these tests:
/// watermark 42, wall-clock timestamp 1_000_000, and both optional
/// monotonic/boot fields left unset.
fn cache_meta() -> CacheMeta {
    CacheMeta {
        process_boot_ns: None,
        cached_at_mono_ns: None,
        cached_at_us: 1_000_000,
        watermark: 42,
    }
}
/// A `get` that hits a non-"not found" IO failure must report
/// `StoreError::CacheFailed` rather than silently looking like a miss.
#[test]
fn native_get_surfaces_non_not_found_io_errors() {
    let tmp = TempDir::new().expect("temp dir");
    let cache_root = tmp.path().join("cache");
    let cache = NativeCache::open(&cache_root).expect("open native cache");

    // Plant a regular file named `ab` inside the cache root.
    // NOTE(review): presumably key byte 0xAB maps to a shard directory `ab`,
    // so the lookup has to traverse a non-directory — confirm against
    // NativeCache's on-disk layout.
    let blocker = cache_root.join("ab");
    std::fs::write(&blocker, b"not a directory").expect("create shard path as a file");

    let outcome = cache.get(&[0xAB]);
    let surfaced_as_cache_failure = matches!(outcome, Err(StoreError::CacheFailed(_)));
    assert!(
        surfaced_as_cache_failure,
        "PROPERTY: a cache shard path that is not a directory must propagate as CacheFailed, not degrade to a cache miss.\n\
         Investigate: src/store/projection/mod.rs NativeCache::get.\n\
         Common causes: treating every IO error as ErrorKind::NotFound."
    );
}
/// Appending an event of a kind the projection does not list as relevant
/// must still make `project_if_changed` report a newer generation, while the
/// projected value itself stays the same.
#[test]
fn project_if_changed_replays_when_watermark_matches_but_generation_advances() {
    // Single entity used for every append and projection call below.
    const ENTITY: &str = "entity:generation-watermark";

    let tmp = TempDir::new().expect("temp dir");
    let sync = SyncConfig {
        every_n_events: 1,
        ..SyncConfig::default()
    };
    let config = StoreConfig {
        data_dir: tmp.path().join("data"),
        segment_max_bytes: 4096,
        sync,
        ..StoreConfig::new("")
    }
    .with_index_topology(IndexTopology::entity_local());
    let store = Store::open(config).expect("open store");
    let coord = Coordinate::new(ENTITY, "scope:test").expect("coord");

    // Two events of the relevant kind.
    for (x, failure) in [(1, "append relevant 1"), (2, "append relevant 2")] {
        store
            .append(&coord, GENERATION_KIND, &serde_json::json!({ "x": x }))
            .expect(failure);
    }

    // First projection sees both relevant events.
    let seeded: Option<GenerationCounter> = store
        .project(ENTITY, &Freshness::Consistent)
        .expect("seed group-local cache");
    assert_eq!(seeded, Some(GenerationCounter { count: 2 }));

    let baseline_generation = store
        .entity_generation(ENTITY)
        .expect("baseline generation");

    // An event whose kind is NOT in GenerationCounter::relevant_event_kinds.
    let irrelevant_kind = EventKind::custom(0xF, 9);
    store
        .append(
            &coord,
            irrelevant_kind,
            &serde_json::json!({"irrelevant": true}),
        )
        .expect("append irrelevant");

    let refreshed = store
        .project_if_changed::<GenerationCounter>(
            ENTITY,
            baseline_generation,
            &Freshness::Consistent,
        )
        .expect("project_if_changed")
        .expect("entity generation changed");

    // `.0` is the reported generation, `.1` the projected value.
    assert!(
        refreshed.0 > baseline_generation,
        "PROPERTY: when an irrelevant append advances entity generation but leaves the relevant watermark unchanged, project_if_changed must replay and return the newer materialization generation"
    );
    assert_eq!(refreshed.1, Some(GenerationCounter { count: 2 }));

    store.close().expect("close");
}
/// Smoke test for the happy path: a value written with `put` is found by a
/// following `get` on the same key. Also keeps the `cache_meta` fixture in use.
#[test]
fn native_get_put_smoke_keeps_test_meta_used() {
    let tmp = TempDir::new().expect("temp dir");
    let cache_root = tmp.path().join("cache");
    let cache = NativeCache::open(cache_root).expect("open native cache");

    let meta = cache_meta();
    cache.put(b"key", b"value", meta).expect("put cache value");

    let fetched = cache.get(b"key").expect("get cache value");
    assert!(
        fetched.is_some(),
        "PROPERTY: the focused edge file still exercises a successful NativeCache get path"
    );
}