use batpak::store::projection::{CacheMeta, NoCache, ProjectionCache};
use batpak::store::StoreError;
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::Arc;
/// Builds the fixed `CacheMeta` fixture used by the cache tests in this file:
/// watermark 42, `cached_at_us` 1_000_000, and both optional timestamps unset.
fn test_meta() -> CacheMeta {
    let watermark = 42;
    let cached_at_us = 1_000_000;
    CacheMeta {
        watermark,
        cached_at_us,
        cached_at_mono_ns: None,
        process_boot_ns: None,
    }
}
/// Cache test double whose `get` always fails with `StoreError::CacheFailed`.
/// `put`, `delete_prefix` and `sync` succeed without storing anything.
struct GetErrorCache;

impl ProjectionCache for GetErrorCache {
    /// Reports `CacheCapabilities::none()`.
    fn capabilities(&self) -> batpak::store::projection::CacheCapabilities {
        use batpak::store::projection::CacheCapabilities;
        CacheCapabilities::none()
    }

    /// Always returns the simulated lookup failure, whatever the key.
    fn get(&self, _key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError> {
        let failure = StoreError::CacheFailed("simulated cache get failure".into());
        Err(failure)
    }

    /// Accepts the write and discards it.
    fn put(&self, _key: &[u8], _value: &[u8], _meta: CacheMeta) -> Result<(), StoreError> {
        Ok(())
    }

    /// Nothing is ever stored, so zero entries are reported as deleted.
    fn delete_prefix(&self, _prefix: &[u8]) -> Result<u64, StoreError> {
        let removed = 0;
        Ok(removed)
    }

    /// Nothing to flush.
    fn sync(&self) -> Result<(), StoreError> {
        Ok(())
    }
}
/// Call counters that `CountingCache` increments, so a test can observe how
/// often the store touched the cache. All counters start at zero via `Default`.
#[derive(Default)]
struct CacheProbeCounters {
/// Incremented once per `CountingCache::get` call.
gets: AtomicUsize,
/// Incremented once per `CountingCache::put` call.
puts: AtomicUsize,
/// Incremented once per `CountingCache::prefetch` call.
prefetches: AtomicUsize,
}
struct CountingCache {
counters: Arc<CacheProbeCounters>,
}
impl ProjectionCache for CountingCache {
fn capabilities(&self) -> batpak::store::projection::CacheCapabilities {
batpak::store::projection::CacheCapabilities {
is_noop: false,
supports_prefetch: true,
}
}
fn get(&self, _key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError> {
self.counters.gets.fetch_add(1, Ordering::SeqCst);
Ok(None)
}
fn put(&self, _key: &[u8], _value: &[u8], _meta: CacheMeta) -> Result<(), StoreError> {
self.counters.puts.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn delete_prefix(&self, _prefix: &[u8]) -> Result<u64, StoreError> {
Ok(0)
}
fn sync(&self) -> Result<(), StoreError> {
Ok(())
}
fn prefetch(&self, _key: &[u8], _predicted_meta: CacheMeta) -> Result<(), StoreError> {
self.counters.prefetches.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
/// Builds a byte buffer laid out as `value`, then `watermark` as 8
/// little-endian bytes, then `cached_at_us` as 8 little-endian bytes.
fn legacy_cache_bytes(value: &[u8], watermark: u64, cached_at_us: i64) -> Vec<u8> {
    let watermark_le = watermark.to_le_bytes();
    let cached_at_le = cached_at_us.to_le_bytes();
    [value, &watermark_le[..], &cached_at_le[..]].concat()
}
/// `NoCache::get` must succeed and report a miss for an arbitrary key.
#[test]
fn nocache_get_always_returns_none() {
    let lookup = NoCache.get(b"any_key").expect("get should not error");
    assert!(
        lookup.is_none(),
        "NoCache::get should always return None. Investigate: src/store/projection/mod.rs NoCache."
    );
}
/// A `put` into `NoCache` succeeds, but a following `get` for the same key is still a miss.
#[test]
fn nocache_put_is_noop() {
    let noop = NoCache;
    let write = noop.put(b"key", b"value", test_meta());
    write.expect("put should not error");
    let lookup = noop.get(b"key").expect("get");
    assert!(lookup.is_none(), "NoCache should not store anything.");
}
/// `NoCache::delete_prefix` succeeds and reports zero deleted entries.
#[test]
fn nocache_delete_prefix_returns_zero() {
    let removed = NoCache.delete_prefix(b"prefix").expect("delete_prefix");
    assert_eq!(removed, 0, "NoCache::delete_prefix should return 0.");
}
/// `NoCache::sync` must return `Ok`.
#[test]
fn nocache_sync_is_noop() {
    let outcome = NoCache.sync();
    outcome.expect("NoCache::sync should not error.");
}
mod native_tests {
use super::*;
use batpak::store::projection::NativeCache;
use tempfile::TempDir;
/// Opens a `NativeCache` at `<tempdir>/cache` inside a fresh temporary
/// directory. The `TempDir` is returned too, so the caller controls its lifetime.
fn native_cache() -> (NativeCache, TempDir) {
    let tmp = TempDir::new().expect("temp dir");
    let opened = NativeCache::open(&tmp.path().join("cache")).expect("open native cache");
    (opened, tmp)
}
/// Stores one entry and reads it back, checking the payload and the
/// `watermark` / `cached_at_us` metadata, then checks that a key that was
/// never inserted is a miss.
#[test]
fn native_get_put_round_trip() {
    let (cache, _dir) = native_cache();
    // The fixture is needed exactly once, so it is moved into `put` rather
    // than bound to a local and cloned.
    cache.put(b"key1", b"hello", test_meta()).expect("put");
    let (value, returned_meta) = cache.get(b"key1").expect("get").expect("should be Some");
    assert_eq!(
        value, b"hello",
        "NativeCache round-trip failed. Investigate: src/store/projection/mod.rs NativeCache."
    );
    // The expected values below are the ones `test_meta()` sets.
    assert_eq!(
        returned_meta.watermark, 42,
        "NATIVE ROUND-TRIP META WATERMARK: watermark should be preserved across put/get.\n\
         Investigate: src/store/projection/mod.rs NativeCache::put and NativeCache::get.\n\
         Common causes: CacheMeta serialization losing watermark field."
    );
    assert_eq!(
        returned_meta.cached_at_us, 1_000_000,
        "NATIVE ROUND-TRIP META CACHED_AT: cached_at_us should be preserved across put/get.\n\
         Investigate: src/store/projection/mod.rs NativeCache::put and NativeCache::get.\n\
         Common causes: CacheMeta serialization losing cached_at_us field."
    );
    assert!(
        cache.get(b"nonexistent").expect("get").is_none(),
        "NATIVE ROUND-TRIP: get for a key that was never inserted should return None.\n\
         Investigate: src/store/projection/mod.rs NativeCache::get."
    );
}
/// Inserts two `user:` keys and one `order:` key, deletes the `user:` prefix,
/// and checks that exactly the two matching keys are removed.
#[test]
fn native_delete_prefix() {
    let (cache, _dir) = native_cache();
    let meta = test_meta();
    cache.put(b"user:1", b"alice", meta.clone()).expect("put");
    cache.put(b"user:2", b"bob", meta.clone()).expect("put");
    // Last use of `meta`: move it instead of cloning.
    cache.put(b"order:1", b"widget", meta).expect("put");

    let deleted = cache.delete_prefix(b"user:").expect("delete_prefix");
    assert_eq!(deleted, 2, "Should delete 2 keys with prefix 'user:'.");
    assert!(
        cache.get(b"user:1").expect("get").is_none(),
        "NATIVE DELETE PREFIX: key 'user:1' should be gone after delete_prefix('user:').\n\
         Investigate: src/store/projection/mod.rs NativeCache::delete_prefix."
    );
    assert!(
        cache.get(b"user:2").expect("get").is_none(),
        "NATIVE DELETE PREFIX: key 'user:2' should be gone after delete_prefix('user:')."
    );
    assert!(
        cache.get(b"order:1").expect("get").is_some(),
        "NATIVE DELETE PREFIX: key 'order:1' should survive delete_prefix('user:')."
    );
}
/// Deleting the same prefix twice: the first call removes both entries, the
/// second removes none, and a following `sync` succeeds.
#[test]
fn native_delete_prefix_is_idempotent() {
    let (cache, _dir) = native_cache();
    cache.put(b"user:1", b"alice", test_meta()).expect("put");
    cache.put(b"user:2", b"bob", test_meta()).expect("put");

    let first_pass = cache.delete_prefix(b"user:").expect("delete prefix");
    let second_pass = cache.delete_prefix(b"user:").expect("delete prefix again");
    cache.sync().expect("sync");

    assert_eq!(
        first_pass, 2,
        "NATIVE DELETE PREFIX IDEMPOTENCE: first delete should remove both matching entries."
    );
    assert_eq!(
        second_pass, 0,
        "NATIVE DELETE PREFIX IDEMPOTENCE: repeating the delete must be a clean no-op."
    );
}
/// `sync` on a freshly opened, empty `NativeCache` must return `Ok`.
#[test]
fn native_sync_is_safe() {
    let (cache, _dir) = native_cache();
    let outcome = cache.sync();
    outcome.expect("NativeCache::sync should not error.");
}
/// Writes an entry, drops the cache handle, reopens the same directory and
/// checks that the value and watermark are still readable.
#[test]
fn native_reopen_preserves_cache() {
    let tmp = TempDir::new().expect("temp dir");
    let cache_root = tmp.path().join("cache");

    // First handle: write, then drop at the end of the scope.
    {
        let writer = NativeCache::open(&cache_root).expect("open");
        writer
            .put(b"persistent_key", b"durable_value", test_meta())
            .expect("put");
    }

    // Second handle over the same directory.
    let reader = NativeCache::open(&cache_root).expect("reopen");
    let found = reader.get(b"persistent_key").expect("get");
    let (value, returned_meta) = found.expect("should be Some after reopen");
    assert_eq!(
        value, b"durable_value",
        "NativeCache must survive process restart."
    );
    assert_eq!(returned_meta.watermark, 42);
}
/// Overwrites a stored entry's file with garbage and checks that `get` then
/// reports a miss (not an error) and that the corrupt file is removed.
#[test]
fn native_corruption_falls_back_to_cache_miss() {
    let key: &[u8] = b"corrupt_me";
    let tmp = TempDir::new().expect("temp dir");
    let cache_path = tmp.path().join("cache");
    let cache = NativeCache::open(&cache_path).expect("open");
    cache.put(key, b"valid_data", test_meta()).expect("put");

    // The test locates the entry at `<cache>/<first two hex chars>/<hex key>.bin`.
    let mut hex_key = String::new();
    for b in key {
        hex_key.push_str(&format!("{b:02x}"));
    }
    let shard_dir = cache_path.join(&hex_key[..2]);
    let corrupt_path = shard_dir.join(format!("{hex_key}.bin"));
    std::fs::write(&corrupt_path, b"garbage").expect("write garbage");

    let lookup = cache.get(key).expect("get should not error");
    assert!(
        lookup.is_none(),
        "NATIVE CORRUPTION: corrupt cache file should degrade to cache miss, not error.\n\
         Investigate: src/store/projection/mod.rs NativeCache::get decode error path."
    );
    assert!(
        !corrupt_path.exists(),
        "NATIVE SELF-HEAL: corrupt cache file should be deleted after failed decode."
    );
}
/// Prefix deletion with a `0xFF` prefix byte: all three keys starting with
/// `0xFF` (including `[0xFF, 0xFF]`) are removed, while `[0xFE, 0x01]` survives.
#[test]
fn native_delete_prefix_with_0xff_keys() {
    let (cache, _dir) = native_cache();
    let meta = test_meta();
    cache
        .put(&[0xFF, 0x01], b"val1", meta.clone())
        .expect("put");
    cache
        .put(&[0xFF, 0x02], b"val2", meta.clone())
        .expect("put");
    cache
        .put(&[0xFF, 0xFF], b"val3", meta.clone())
        .expect("put");
    // Last use of `meta`: move it instead of cloning.
    cache.put(&[0xFE, 0x01], b"other", meta).expect("put");

    let deleted = cache.delete_prefix(&[0xFF]).expect("delete_prefix");
    assert_eq!(
        deleted, 3,
        "DELETE PREFIX 0xFF: should delete all 3 keys starting with 0xFF."
    );
    assert!(
        cache.get(&[0xFE, 0x01]).expect("get").is_some(),
        "DELETE PREFIX 0xFF: key [0xFE, 0x01] should survive prefix delete of [0xFF]."
    );
}
/// An empty prefix matches every key: after three inserts,
/// `delete_prefix(b"")` must report three deletions.
#[test]
fn native_delete_prefix_empty_prefix_deletes_all() {
    let (cache, _dir) = native_cache();
    let meta = test_meta();
    cache.put(b"a", b"1", meta.clone()).expect("put");
    cache.put(b"b", b"2", meta.clone()).expect("put");
    // Last use of `meta`: move it instead of cloning.
    cache.put(b"z", b"3", meta).expect("put");

    let deleted = cache.delete_prefix(b"").expect("delete_prefix");
    assert_eq!(
        deleted, 3,
        "DELETE PREFIX EMPTY: empty prefix should match all keys."
    );
}
use batpak::prelude::*;
use batpak::store::SyncConfig;
/// Projection fixture whose state is the number of events folded into it.
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
struct Counter {
    count: u32,
}

impl EventSourced for Counter {
    type Input = batpak::prelude::JsonValueInput;

    /// Builds the counter from `events.len()`; always returns `Some`.
    fn from_events(events: &[batpak::prelude::Event<serde_json::Value>]) -> Option<Self> {
        let count = u32::try_from(events.len()).expect("test uses < 2^32 events");
        Some(Self { count })
    }

    /// Every event adds one, regardless of its contents.
    fn apply_event(&mut self, _event: &batpak::prelude::Event<serde_json::Value>) {
        self.count += 1;
    }

    /// Returns an empty slice of event kinds.
    fn relevant_event_kinds() -> &'static [EventKind] {
        const NO_KINDS: &[EventKind] = &[];
        NO_KINDS
    }
}
/// Projects an entity through a store opened with a native cache: two events
/// project to 2, a repeat projection is still 2, and after a third append the
/// projection is 3.
#[test]
fn native_projection_round_trip() {
    let entity = "entity:native1";
    let tmp = TempDir::new().expect("temp dir");
    let store_config = StoreConfig {
        data_dir: tmp.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    };
    let store = Store::open_with_native_cache(store_config, &tmp.path().join("cache"))
        .expect("open store with native cache");
    let coord = Coordinate::new(entity, "scope:test").expect("coord");
    let kind = EventKind::custom(0xF, 1);

    // Small helpers so the append/project sequence below reads as a script.
    let append = |x: i32, label: &str| {
        store
            .append(&coord, kind, &serde_json::json!({ "x": x }))
            .expect(label);
    };
    let project = |label: &str| -> Option<Counter> {
        store.project(entity, &Freshness::Consistent).expect(label)
    };

    append(1, "append 1");
    append(2, "append 2");
    assert_eq!(
        project("project"),
        Some(Counter { count: 2 }),
        "NATIVE PROJECTION ROUND-TRIP: first project should replay 2 events."
    );
    assert_eq!(project("project 2"), Some(Counter { count: 2 }));

    append(3, "append 3");
    assert_eq!(
        project("project 3"),
        Some(Counter { count: 3 }),
        "NATIVE CACHE INVALIDATION: after appending more events, project should re-replay."
    );
    store.close().expect("close");
}
/// Warms the native cache through a projection, deletes the entity's cache
/// keys directly, projects again through a reopened store, and finally checks
/// (with another `delete_prefix`) that the projection wrote the key back.
#[test]
fn native_delete_prefix_then_project_repopulates_cache() {
    let entity = "entity:native-miss";
    let tmp = TempDir::new().expect("temp dir");
    let cache_path = tmp.path().join("cache");
    let store_config = StoreConfig {
        data_dir: tmp.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    };
    let coord = Coordinate::new(entity, "scope:test").expect("coord");
    let kind = EventKind::custom(0xF, 1);

    // Phase 1: append two events and project once to warm the cache.
    {
        let store = Store::open_with_native_cache(store_config.clone(), &cache_path)
            .expect("open store");
        let append = |x: i32, label: &str| {
            store
                .append(&coord, kind, &serde_json::json!({ "x": x }))
                .expect(label);
        };
        append(1, "append 1");
        append(2, "append 2");
        let _: Option<Counter> = store
            .project(entity, &Freshness::Consistent)
            .expect("warm cache");
        store.close().expect("close");
    }

    // Phase 2: remove the warmed key(s) behind the store's back.
    {
        let cache = NativeCache::open(&cache_path).expect("reopen cache");
        let deleted = cache
            .delete_prefix(entity.as_bytes())
            .expect("delete prefix");
        assert!(
            deleted >= 1,
            "NATIVE CACHE MISS PROOF: delete_prefix should remove at least one warmed cache key, got {deleted}."
        );
    }

    // Phase 3: project through a reopened store.
    {
        let store =
            Store::open_with_native_cache(store_config, &cache_path).expect("reopen store");
        let result: Option<Counter> = store
            .project(entity, &Freshness::Consistent)
            .expect("project after delete");
        assert_eq!(result, Some(Counter { count: 2 }));
        store.close().expect("close");
    }

    // Phase 4: a second delete finds something only if phase 3 repopulated the key.
    let cache = NativeCache::open(&cache_path).expect("final reopen cache");
    let repopulated = cache
        .delete_prefix(entity.as_bytes())
        .expect("check repopulated");
    assert!(
        repopulated >= 1,
        "NATIVE CACHE MISS PROOF: projecting after delete_prefix must repopulate the cache key."
    );
}
}
/// Projection fixture for the freshness tests: its state is the number of
/// events folded into it.
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
struct MaybeStaleCounter {
    count: u32,
}

impl batpak::prelude::EventSourced for MaybeStaleCounter {
    type Input = batpak::prelude::JsonValueInput;

    /// Builds the counter from `events.len()`; always returns `Some`.
    fn from_events(events: &[batpak::prelude::Event<serde_json::Value>]) -> Option<Self> {
        let count = u32::try_from(events.len()).expect("test uses < 2^32 events");
        Some(Self { count })
    }

    /// Every event adds one, regardless of its contents.
    fn apply_event(&mut self, _event: &batpak::prelude::Event<serde_json::Value>) {
        self.count += 1;
    }

    /// Returns an empty slice of event kinds.
    fn relevant_event_kinds() -> &'static [batpak::prelude::EventKind] {
        const NO_KINDS: &[batpak::prelude::EventKind] = &[];
        NO_KINDS
    }
}
/// Event kind that `MaybeStaleGenerationCounter` declares as relevant.
const MAYBE_STALE_GENERATION_KIND: batpak::prelude::EventKind =
    batpak::prelude::EventKind::custom(0xF, 0x51);

/// Event-counting projection fixture that, unlike `MaybeStaleCounter`,
/// declares exactly one relevant event kind.
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
struct MaybeStaleGenerationCounter {
    count: u32,
}

impl batpak::prelude::EventSourced for MaybeStaleGenerationCounter {
    type Input = batpak::prelude::JsonValueInput;

    /// Builds the counter from `events.len()`; always returns `Some`.
    fn from_events(events: &[batpak::prelude::Event<serde_json::Value>]) -> Option<Self> {
        let count = u32::try_from(events.len()).expect("test uses < 2^32 events");
        Some(MaybeStaleGenerationCounter { count })
    }

    /// Every event adds one, regardless of its contents.
    fn apply_event(&mut self, _event: &batpak::prelude::Event<serde_json::Value>) {
        self.count += 1;
    }

    /// Returns a one-element slice holding `MAYBE_STALE_GENERATION_KIND`.
    fn relevant_event_kinds() -> &'static [batpak::prelude::EventKind] {
        const KINDS: &[batpak::prelude::EventKind] = &[MAYBE_STALE_GENERATION_KIND];
        KINDS
    }
}
/// Walks `cache_path` recursively and returns the single file whose extension
/// is `bin`.
///
/// # Panics
/// Panics if a directory cannot be read, if a directory entry cannot be
/// obtained, or if the number of `.bin` files found is not exactly one.
fn find_only_native_cache_entry(cache_path: &std::path::Path) -> std::path::PathBuf {
    /// Appends every `.bin` file at or below `dir` to `found`.
    fn collect_bin_files(dir: &std::path::Path, found: &mut Vec<std::path::PathBuf>) {
        for entry in std::fs::read_dir(dir).expect("read_dir") {
            let path = entry.expect("dir entry").path();
            if path.is_dir() {
                collect_bin_files(&path, found);
                continue;
            }
            let is_bin = matches!(
                path.extension().and_then(std::ffi::OsStr::to_str),
                Some("bin")
            );
            if is_bin {
                found.push(path);
            }
        }
    }

    let mut found = Vec::new();
    collect_bin_files(cache_path, &mut found);
    assert_eq!(
        found.len(),
        1,
        "PROJECTION CACHE TEST SETUP: expected exactly one native cache entry, found {}",
        found.len()
    );
    found.pop().expect("single cache entry")
}
/// After the cache is seeded at two events and a third is appended, a
/// `MaybeStale` projection with a 60 s window is expected to return the cached
/// count of 2, while `max_stale_ms: 0` is expected to return 3.
#[test]
fn freshness_maybe_stale_serves_stale_cache_within_window() {
    use batpak::prelude::*;
    use batpak::store::{Freshness, NativeCache, Store, StoreConfig, SyncConfig};
    use tempfile::TempDir;

    let entity = "entity:besteff1";
    let tmp = TempDir::new().expect("temp dir");
    let native = NativeCache::open(&tmp.path().join("cache")).expect("open native cache");
    let store_config = StoreConfig {
        data_dir: tmp.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    };
    let store = Store::open_with_cache(store_config, Box::new(native)).expect("open store");
    let coord = Coordinate::new(entity, "scope:test").expect("coord");
    let kind = EventKind::custom(0xF, 1);

    let append = |x: i32, label: &str| {
        store
            .append(&coord, kind, &serde_json::json!({ "x": x }))
            .expect(label);
    };
    let project = |freshness: &Freshness, label: &str| -> Option<MaybeStaleCounter> {
        store.project(entity, freshness).expect(label)
    };

    // Seed the cache at count = 2, then make it stale with a third event.
    append(1, "append 1");
    append(2, "append 2");
    let seeded = project(&Freshness::Consistent, "project consistent");
    assert_eq!(seeded, Some(MaybeStaleCounter { count: 2 }));
    append(3, "append 3");

    let wide_window = Freshness::MaybeStale {
        max_stale_ms: 60_000,
    };
    assert_eq!(
        project(&wide_window, "project maybe stale"),
        Some(MaybeStaleCounter { count: 2 }),
        "FRESHNESS BEST EFFORT: with large stale window, should serve cached value (count=2) \
         even though a 3rd event was appended.\n\
         Investigate: src/store/mod.rs project() MaybeStale branch."
    );

    let zero_window = Freshness::MaybeStale { max_stale_ms: 0 };
    assert_eq!(
        project(&zero_window, "project maybe stale strict"),
        Some(MaybeStaleCounter { count: 3 }),
        "FRESHNESS BEST EFFORT ZERO: with max_stale_ms=0, cache should always be considered \
         stale, forcing a full replay (count=3)."
    );
    store.close().expect("close");
}
/// Seeds the cache at two events, appends a third, flips bits in the last byte
/// of the on-disk cache entry, and expects a `MaybeStale` projection to return
/// the replayed count of 3.
#[test]
fn freshness_maybe_stale_replays_when_stale_cache_bytes_are_corrupt() {
    use batpak::prelude::*;
    use batpak::store::{Freshness, Store, StoreConfig, SyncConfig};
    use tempfile::TempDir;

    let entity = "entity:maybe-stale-corrupt";
    let tmp = TempDir::new().expect("temp dir");
    let cache_path = tmp.path().join("cache");
    let store_config = StoreConfig {
        data_dir: tmp.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    };
    let coord = Coordinate::new(entity, "scope:test").expect("coord");
    let kind = EventKind::custom(0xF, 1);

    // Seed session: cache row written at count = 2, then a third event lands.
    {
        let store = Store::open_with_native_cache(store_config.clone(), &cache_path)
            .expect("open store");
        let append = |x: i32, label: &str| {
            store
                .append(&coord, kind, &serde_json::json!({ "x": x }))
                .expect(label);
        };
        append(1, "append 1");
        append(2, "append 2");
        let seeded: Option<MaybeStaleCounter> = store
            .project(entity, &Freshness::Consistent)
            .expect("seed cache");
        assert_eq!(seeded, Some(MaybeStaleCounter { count: 2 }));
        append(3, "append 3");
        store.close().expect("close seeded store");
    }

    // Damage the final byte of the only cache file.
    let cache_entry = find_only_native_cache_entry(&cache_path);
    let mut bytes = std::fs::read(&cache_entry).expect("read cache entry");
    let tail = bytes.last_mut().expect("non-empty cache entry");
    *tail ^= 0x5A;
    std::fs::write(&cache_entry, bytes).expect("corrupt cache entry");

    let store = Store::open_with_native_cache(store_config, &cache_path).expect("reopen store");
    let window = Freshness::MaybeStale {
        max_stale_ms: 60_000,
    };
    let result: Option<MaybeStaleCounter> = store
        .project(entity, &window)
        .expect("project maybe stale after corruption");
    assert_eq!(
        result,
        Some(MaybeStaleCounter { count: 3 }),
        "MAYBE STALE CORRUPTION HONESTY: a stale-but-young corrupt cache row must fall back to replay and return the current folded state.\n\
         It must not serve garbage and must not preserve the stale count=2 row just because the age window still says 'fresh enough'."
    );
    store.close().expect("close");
}
/// Seeds the cache at two events, flips bits in the first byte of the on-disk
/// cache entry without appending anything further, and expects a `MaybeStale`
/// projection to return 2.
#[test]
fn freshness_maybe_stale_replays_when_fresh_cache_bytes_are_corrupt() {
    use batpak::prelude::*;
    use batpak::store::{Freshness, Store, StoreConfig, SyncConfig};
    use tempfile::TempDir;

    let entity = "entity:maybe-stale-fresh-corrupt";
    let tmp = TempDir::new().expect("temp dir");
    let cache_path = tmp.path().join("cache");
    let store_config = StoreConfig {
        data_dir: tmp.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    };
    let coord = Coordinate::new(entity, "scope:test").expect("coord");
    let kind = EventKind::custom(0xF, 1);

    // Seed session: the cache row matches the log (count = 2).
    {
        let store = Store::open_with_native_cache(store_config.clone(), &cache_path)
            .expect("open store");
        let append = |x: i32, label: &str| {
            store
                .append(&coord, kind, &serde_json::json!({ "x": x }))
                .expect(label);
        };
        append(1, "append 1");
        append(2, "append 2");
        let seeded: Option<MaybeStaleCounter> = store
            .project(entity, &Freshness::Consistent)
            .expect("seed cache");
        assert_eq!(seeded, Some(MaybeStaleCounter { count: 2 }));
        store.close().expect("close seeded store");
    }

    // Damage the leading byte of the only cache file.
    let cache_entry = find_only_native_cache_entry(&cache_path);
    let mut bytes = std::fs::read(&cache_entry).expect("read cache entry");
    let head = bytes.get_mut(0).expect("non-empty cache entry");
    *head ^= 0xA5;
    std::fs::write(&cache_entry, bytes).expect("corrupt cache entry");

    let store = Store::open_with_native_cache(store_config, &cache_path).expect("reopen store");
    let window = Freshness::MaybeStale {
        max_stale_ms: 60_000,
    };
    let result: Option<MaybeStaleCounter> = store
        .project(entity, &window)
        .expect("project maybe stale after fresh corruption");
    assert_eq!(
        result,
        Some(MaybeStaleCounter { count: 2 }),
        "MAYBE STALE FRESH CORRUPTION HONESTY: a fresh-but-corrupt cache row must fall back to replay and return the current folded state.\n\
         It must not fail open just because the age window still says 'fresh enough'."
    );
    store.close().expect("close");
}
/// Seeds the cache at two relevant events, records the entity generation,
/// appends a third event, and checks that `project_if_changed` under
/// `MaybeStale` returns a newer generation together with the count of 3.
#[test]
fn project_if_changed_never_pairs_maybe_stale_cache_with_new_generation() {
    use batpak::prelude::*;
    use batpak::store::{Freshness, NativeCache, Store, StoreConfig, SyncConfig};
    use tempfile::TempDir;

    let entity = "entity:generation-honesty";
    let tmp = TempDir::new().expect("temp dir");
    let native = NativeCache::open(&tmp.path().join("cache")).expect("open native cache");
    let store_config = StoreConfig {
        data_dir: tmp.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    };
    let store = Store::open_with_cache(store_config, Box::new(native)).expect("open store");
    let coord = Coordinate::new(entity, "scope:test").expect("coord");

    let append = |x: i32, label: &str| {
        store
            .append(
                &coord,
                MAYBE_STALE_GENERATION_KIND,
                &serde_json::json!({ "x": x }),
            )
            .expect(label);
    };

    append(1, "append 1");
    append(2, "append 2");
    let seeded: Option<MaybeStaleGenerationCounter> = store
        .project(entity, &Freshness::Consistent)
        .expect("seed cache");
    assert_eq!(seeded, Some(MaybeStaleGenerationCounter { count: 2 }));

    // Generation token observed while the cache row is current.
    let baseline_generation = store
        .entity_generation(entity)
        .expect("baseline generation");
    append(3, "append 3");

    let window = Freshness::MaybeStale {
        max_stale_ms: 60_000,
    };
    let outcome = store
        .project_if_changed::<MaybeStaleGenerationCounter>(entity, baseline_generation, &window)
        .expect("project_if_changed")
        .expect("changed projection");
    assert!(
        outcome.0 > baseline_generation,
        "generation should advance after the third relevant append"
    );
    assert_eq!(
        outcome.1,
        Some(MaybeStaleGenerationCounter { count: 3 }),
        "PROPERTY: project_if_changed must not return stale cache bytes together with a newer generation token.\n\
         Investigate: src/store/projection/flow.rs project_if_changed() MaybeStale path."
    );
    store.close().expect("close");
}
/// For an entity with no events, `project` (both freshness modes) and
/// `project_if_changed` must return `None`, and the counting cache must see no
/// `get`, `prefetch` or `put` calls.
#[test]
fn empty_projection_surface_skips_cache_for_no_replay_plan() {
    use batpak::store::{Freshness, Store, StoreConfig, SyncConfig};
    use tempfile::TempDir;

    let entity = "entity:no-events";
    let tmp = TempDir::new().expect("temp dir");
    let counters = Arc::new(CacheProbeCounters::default());
    let probe = CountingCache {
        counters: Arc::clone(&counters),
    };
    let store_config = StoreConfig {
        data_dir: tmp.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    };
    let store = Store::open_with_cache(store_config, Box::new(probe)).expect("open store");

    // Exercise all three projection entry points before asserting anything.
    let consistent: Option<MaybeStaleCounter> = store
        .project(entity, &Freshness::Consistent)
        .expect("project empty consistent");
    let window = Freshness::MaybeStale {
        max_stale_ms: 60_000,
    };
    let maybe_stale: Option<MaybeStaleCounter> = store
        .project(entity, &window)
        .expect("project empty maybe stale");
    let unchanged = store
        .project_if_changed::<MaybeStaleCounter>(entity, 0, &Freshness::Consistent)
        .expect("project_if_changed empty");

    assert_eq!(
        consistent, None,
        "EMPTY PROJECTION SURFACE: project() on an entity with no replay plan should return None."
    );
    assert_eq!(
        maybe_stale, None,
        "EMPTY PROJECTION SURFACE: MaybeStale must not invent a cache-backed value for an empty entity."
    );
    assert_eq!(
        unchanged, None,
        "EMPTY PROJECTION SURFACE: project_if_changed() should report no change for a never-seen entity."
    );

    let observed_gets = counters.gets.load(Ordering::SeqCst);
    let observed_prefetches = counters.prefetches.load(Ordering::SeqCst);
    let observed_puts = counters.puts.load(Ordering::SeqCst);
    assert_eq!(
        observed_gets, 0,
        "EMPTY PROJECTION SURFACE: no replay plan should skip external cache get entirely."
    );
    assert_eq!(
        observed_prefetches, 0,
        "EMPTY PROJECTION SURFACE: no replay plan should skip cache prefetch entirely."
    );
    assert_eq!(
        observed_puts, 0,
        "EMPTY PROJECTION SURFACE: no replay plan should not populate cache."
    );
    store.close().expect("close");
}
/// Seeds the native cache at two events, reopens the store, appends a third
/// event, and expects a `Consistent` projection to return 3.
#[test]
fn consistent_replays_when_reopened_native_cache_row_is_stale() {
    use batpak::prelude::*;
    use batpak::store::{Freshness, Store, StoreConfig, SyncConfig};
    use tempfile::TempDir;

    let entity = "entity:consistent-stale-cache";
    let tmp = TempDir::new().expect("temp dir");
    let cache_path = tmp.path().join("cache");
    let store_config = StoreConfig {
        data_dir: tmp.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    };
    let coord = Coordinate::new(entity, "scope:test").expect("coord");
    let kind = EventKind::custom(0xF, 1);

    // First session: write two events and populate the cache row.
    {
        let store = Store::open_with_native_cache(store_config.clone(), &cache_path)
            .expect("open store");
        let append = |x: i32, label: &str| {
            store
                .append(&coord, kind, &serde_json::json!({ "x": x }))
                .expect(label);
        };
        append(1, "append 1");
        append(2, "append 2");
        let seeded: Option<MaybeStaleCounter> = store
            .project(entity, &Freshness::Consistent)
            .expect("seed cache");
        assert_eq!(seeded, Some(MaybeStaleCounter { count: 2 }));
        store.close().expect("close seeded store");
    }

    // Second session: one more event, then a Consistent projection.
    let store = Store::open_with_native_cache(store_config, &cache_path).expect("reopen store");
    store
        .append(&coord, kind, &serde_json::json!({"x": 3}))
        .expect("append 3");
    let result: Option<MaybeStaleCounter> = store
        .project(entity, &Freshness::Consistent)
        .expect("project after stale cache");
    assert_eq!(
        result,
        Some(MaybeStaleCounter { count: 3 }),
        "CONSISTENT STALE CACHE HONESTY: after reopen, a populated external cache row with an older watermark must be bypassed and replayed."
    );
    store.close().expect("close");
}
/// Replaces the seeded cache file with a legacy-format row that has an empty
/// payload, then expects a `MaybeStale` projection with a 60 s window to
/// return the replayed count of 3.
///
/// The forged row is stamped with the current wall-clock time. With the
/// previous hard-coded `cached_at_us = 1_000_000` (one second after the Unix
/// epoch), the row would be far older than any 60 s window, so it could be
/// rejected on age alone and the empty-payload path would never be exercised.
/// NOTE(review): this assumes the store's default clock is wall-clock
/// microseconds since the Unix epoch — confirm against the store's clock.
#[test]
fn maybe_stale_replays_when_cache_row_has_valid_metadata_but_empty_payload() {
    use batpak::prelude::*;
    use batpak::store::{Freshness, Store, StoreConfig, SyncConfig};
    use tempfile::TempDir;

    let dir = TempDir::new().expect("temp dir");
    let cache_path = dir.path().join("cache");
    let config = StoreConfig {
        data_dir: dir.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    };
    let coord = Coordinate::new("entity:metadata-only-stale", "scope:test").expect("coord");
    let kind = EventKind::custom(0xF, 1);

    // Seed session: cache row written at count = 2, then a third event lands.
    {
        let store =
            Store::open_with_native_cache(config.clone(), &cache_path).expect("open store");
        store
            .append(&coord, kind, &serde_json::json!({"x": 1}))
            .expect("append 1");
        store
            .append(&coord, kind, &serde_json::json!({"x": 2}))
            .expect("append 2");
        let seeded: Option<MaybeStaleCounter> = store
            .project("entity:metadata-only-stale", &Freshness::Consistent)
            .expect("seed cache");
        assert_eq!(seeded, Some(MaybeStaleCounter { count: 2 }));
        store
            .append(&coord, kind, &serde_json::json!({"x": 3}))
            .expect("append 3");
        store.close().expect("close seeded store");
    }

    let cache_entry = find_only_native_cache_entry(&cache_path);
    // Use "now" as the row's timestamp so its age falls inside the window below.
    let now_us = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is after the Unix epoch")
        .as_micros();
    let cached_at_us = i64::try_from(now_us).expect("current time fits in i64 microseconds");
    std::fs::write(&cache_entry, legacy_cache_bytes(&[], 2, cached_at_us))
        .expect("write metadata-only legacy cache entry");

    let store = Store::open_with_native_cache(config, &cache_path).expect("reopen store");
    let result: Option<MaybeStaleCounter> = store
        .project(
            "entity:metadata-only-stale",
            &Freshness::MaybeStale {
                max_stale_ms: 60_000,
            },
        )
        .expect("project maybe stale after metadata-only cache row");
    assert_eq!(
        result,
        Some(MaybeStaleCounter { count: 3 }),
        "METADATA-ONLY CACHE HONESTY: a cache file with valid metadata but undecodable payload must replay rather than serve a stale MaybeStale success."
    );
    store.close().expect("close");
}
/// Replaces the seeded cache file with a legacy-format row whose payload is
/// the single byte `0x92`, then expects a `Consistent` projection to return 2.
#[test]
fn consistent_replays_when_cache_row_has_valid_metadata_but_truncated_payload() {
    use batpak::prelude::*;
    use batpak::store::{Freshness, Store, StoreConfig, SyncConfig};
    use tempfile::TempDir;

    let entity = "entity:metadata-valid-truncated";
    let tmp = TempDir::new().expect("temp dir");
    let cache_path = tmp.path().join("cache");
    let store_config = StoreConfig {
        data_dir: tmp.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    };
    let coord = Coordinate::new(entity, "scope:test").expect("coord");
    let kind = EventKind::custom(0xF, 1);

    // Seed session: two events and one cache row.
    {
        let store = Store::open_with_native_cache(store_config.clone(), &cache_path)
            .expect("open store");
        let append = |x: i32, label: &str| {
            store
                .append(&coord, kind, &serde_json::json!({ "x": x }))
                .expect(label);
        };
        append(1, "append 1");
        append(2, "append 2");
        let seeded: Option<MaybeStaleCounter> = store
            .project(entity, &Freshness::Consistent)
            .expect("seed cache");
        assert_eq!(seeded, Some(MaybeStaleCounter { count: 2 }));
        store.close().expect("close seeded store");
    }

    // Overwrite the row with a one-byte payload followed by legacy metadata.
    let cache_entry = find_only_native_cache_entry(&cache_path);
    let forged_row = legacy_cache_bytes(b"\x92", 2, 1_000_000);
    std::fs::write(&cache_entry, forged_row).expect("write truncated-payload legacy cache entry");

    let store = Store::open_with_native_cache(store_config, &cache_path).expect("reopen store");
    let result: Option<MaybeStaleCounter> = store
        .project(entity, &Freshness::Consistent)
        .expect("project after truncated cache payload");
    assert_eq!(
        result,
        Some(MaybeStaleCounter { count: 2 }),
        "TRUNCATED PAYLOAD CACHE HONESTY: valid cache metadata with an undecodable payload must fall back to replay under Consistent freshness."
    );
    store.close().expect("close");
}
/// With a cache backend whose `get` always errors, a `Consistent` projection
/// over two events must still succeed and return 2.
#[test]
fn projection_replays_when_cache_get_errors() {
    use batpak::prelude::*;
    use batpak::store::{Freshness, Store, StoreConfig, SyncConfig};
    use tempfile::TempDir;

    let entity = "entity:cache-get-error";
    let tmp = TempDir::new().expect("temp dir");
    let store_config = StoreConfig {
        data_dir: tmp.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    };
    let failing_cache = Box::new(GetErrorCache);
    let store = Store::open_with_cache(store_config, failing_cache).expect("open store");
    let coord = Coordinate::new(entity, "scope:test").expect("coord");
    let kind = EventKind::custom(0xF, 1);

    let append = |x: i32, label: &str| {
        store
            .append(&coord, kind, &serde_json::json!({ "x": x }))
            .expect(label);
    };
    append(1, "append 1");
    append(2, "append 2");

    let result: Option<MaybeStaleCounter> = store
        .project(entity, &Freshness::Consistent)
        .expect("project after cache get error");
    assert_eq!(
        result,
        Some(MaybeStaleCounter { count: 2 }),
        "CACHE GET ERROR HONESTY: a cache backend get failure must fall back to replay and return the honest folded state."
    );
    store.close().expect("close");
}
/// Drives the store with an injected clock: the cache is seeded while the clock
/// reads 1_000_000 µs, a third event is appended, the clock is moved to
/// 1_005_000 µs, and a `MaybeStale { max_stale_ms: 5 }` projection is expected
/// to return the replayed count of 3.
#[test]
fn freshness_maybe_stale_replays_at_exact_age_boundary() {
    use batpak::prelude::*;
    use batpak::store::{Freshness, NativeCache, Store, StoreConfig, SyncConfig};
    use tempfile::TempDir;

    let entity = "entity:maybe-stale-boundary";
    let tmp = TempDir::new().expect("temp dir");
    let native = NativeCache::open(&tmp.path().join("cache")).expect("open native cache");

    // Test-controlled time source, in microseconds.
    let now_us = Arc::new(AtomicI64::new(1_000_000));
    let clock_source = Arc::clone(&now_us);
    let clock: Arc<dyn Fn() -> i64 + Send + Sync> =
        Arc::new(move || clock_source.load(Ordering::SeqCst));

    let base_config = StoreConfig {
        data_dir: tmp.path().join("data"),
        segment_max_bytes: 4096,
        sync: SyncConfig {
            every_n_events: 1,
            ..SyncConfig::default()
        },
        ..StoreConfig::new("")
    };
    let store_config = base_config.with_clock(Some(clock));
    let store = Store::open_with_cache(store_config, Box::new(native)).expect("open store");
    let coord = Coordinate::new(entity, "scope:test").expect("coord");
    let kind = EventKind::custom(0xF, 1);

    let append = |x: i32, label: &str| {
        store
            .append(&coord, kind, &serde_json::json!({ "x": x }))
            .expect(label);
    };
    append(1, "append 1");
    append(2, "append 2");
    let seeded: Option<MaybeStaleCounter> = store
        .project(entity, &Freshness::Consistent)
        .expect("seed cache");
    assert_eq!(seeded, Some(MaybeStaleCounter { count: 2 }));
    append(3, "append 3");

    // Advance the clock by 5_000 µs, i.e. exactly the 5 ms window used below.
    now_us.store(1_005_000, Ordering::SeqCst);
    let boundary_window = Freshness::MaybeStale { max_stale_ms: 5 };
    let result: Option<MaybeStaleCounter> = store
        .project(entity, &boundary_window)
        .expect("project maybe stale at boundary");
    assert_eq!(
        result,
        Some(MaybeStaleCounter { count: 3 }),
        "MAYBE STALE BOUNDARY HONESTY: when cache age equals max_stale_ms exactly, the strict '<' boundary must force replay rather than serve the stale row."
    );
    store.close().expect("close");
}
/// After a successful `put`, `NoCache::get` for the same key is still a miss.
#[test]
fn nocache_ignores_put_and_always_returns_none() {
    let noop = NoCache;
    let write = noop.put(b"short", b"x", test_meta());
    write.expect("put");
    let lookup = noop.get(b"short").expect("get");
    assert!(
        lookup.is_none(),
        "CACHE METADATA: NoCache should return None regardless of what was put."
    );
}
/// `NoCache` must report `supports_prefetch == false` and `is_noop == true`,
/// and its `prefetch` must still return `Ok`.
#[test]
fn nocache_prefetch_is_noop() {
    let noop = NoCache;
    let capabilities = noop.capabilities();
    assert!(
        !capabilities.supports_prefetch,
        "NoCache must explicitly report that it does not support prefetch hints."
    );
    assert!(
        capabilities.is_noop,
        "NoCache must report itself as a no-op cache backend."
    );
    let hint = noop.prefetch(b"any_key", test_meta());
    hint.expect("NoCache::prefetch should not error — it's a no-op by default.");
}