use batpak::store::projection::{CacheMeta, NoCache, ProjectionCache};
fn test_meta() -> CacheMeta {
CacheMeta {
watermark: 42,
cached_at_us: 1_000_000,
}
}
#[test]
fn nocache_get_always_returns_none() {
let cache = NoCache;
let result = cache.get(b"any_key").expect("get should not error");
assert!(
result.is_none(),
"NoCache::get should always return None. Investigate: src/store/projection.rs NoCache."
);
}
#[test]
fn nocache_put_is_noop() {
let cache = NoCache;
cache
.put(b"key", b"value", test_meta())
.expect("put should not error");
let result = cache.get(b"key").expect("get");
assert!(result.is_none(), "NoCache should not store anything.");
}
#[test]
fn nocache_delete_prefix_returns_zero() {
let cache = NoCache;
let count = cache.delete_prefix(b"prefix").expect("delete_prefix");
assert_eq!(count, 0, "NoCache::delete_prefix should return 0.");
}
#[test]
fn nocache_sync_is_noop() {
let cache = NoCache;
cache.sync().expect("NoCache::sync should not error.");
}
#[cfg(feature = "redb")]
mod redb_tests {
use super::*;
use batpak::store::projection::RedbCache;
use tempfile::TempDir;
fn redb_cache() -> (RedbCache, TempDir) {
let dir = TempDir::new().expect("temp dir");
let path = dir.path().join("test.redb");
let cache = RedbCache::open(&path).expect("open redb");
(cache, dir)
}
#[test]
fn redb_get_put_round_trip() {
let (cache, _dir) = redb_cache();
let meta = test_meta();
cache.put(b"key1", b"hello", meta.clone()).expect("put");
let (value, returned_meta) = cache.get(b"key1").expect("get").expect("should be Some");
assert_eq!(
value, b"hello",
"RedbCache round-trip failed. Investigate: src/store/projection.rs RedbCache."
);
assert_eq!(
returned_meta.watermark, 42,
"REDB ROUND-TRIP META WATERMARK: watermark should be preserved across put/get.\n\
Investigate: src/store/projection.rs RedbCache::put and RedbCache::get.\n\
Common causes: CacheMeta serialization losing watermark field.\n\
Run: cargo test --test projection_cache redb_get_put_round_trip"
);
assert_eq!(
returned_meta.cached_at_us, 1_000_000,
"REDB ROUND-TRIP META CACHED_AT: cached_at_us should be preserved across put/get.\n\
Investigate: src/store/projection.rs RedbCache::put and RedbCache::get.\n\
Common causes: CacheMeta serialization losing cached_at_us field.\n\
Run: cargo test --test projection_cache redb_get_put_round_trip"
);
assert!(
cache.get(b"nonexistent").expect("get").is_none(),
"REDB ROUND-TRIP: get for a key that was never inserted should return None.\n\
Investigate: src/store/projection.rs RedbCache::get.\n\
Common causes: get returning stale data, missing key check logic.\n\
Run: cargo test --test projection_cache redb_get_put_round_trip"
);
}
#[test]
fn redb_delete_prefix() {
let (cache, _dir) = redb_cache();
let meta = test_meta();
cache.put(b"user:1", b"alice", meta.clone()).expect("put");
cache.put(b"user:2", b"bob", meta.clone()).expect("put");
cache.put(b"order:1", b"widget", meta.clone()).expect("put");
let deleted = cache.delete_prefix(b"user:").expect("delete_prefix");
assert_eq!(deleted, 2, "Should delete 2 keys with prefix 'user:'.");
assert!(
cache.get(b"user:1").expect("get").is_none(),
"REDB DELETE PREFIX: key 'user:1' should be gone after delete_prefix('user:').\n\
Investigate: src/store/projection.rs RedbCache::delete_prefix.\n\
Common causes: prefix scan not matching key, deletion not committed.\n\
Run: cargo test --test projection_cache redb_delete_prefix"
);
assert!(
cache.get(b"user:2").expect("get").is_none(),
"REDB DELETE PREFIX: key 'user:2' should be gone after delete_prefix('user:').\n\
Investigate: src/store/projection.rs RedbCache::delete_prefix.\n\
Common causes: prefix scan stopping early, deletion not committed.\n\
Run: cargo test --test projection_cache redb_delete_prefix"
);
assert!(
cache.get(b"order:1").expect("get").is_some(),
"REDB DELETE PREFIX: key 'order:1' should survive delete_prefix('user:').\n\
Investigate: src/store/projection.rs RedbCache::delete_prefix.\n\
Common causes: prefix matching too broad, deleting keys that don't share the prefix.\n\
Run: cargo test --test projection_cache redb_delete_prefix"
);
}
#[test]
fn redb_sync_is_safe() {
let (cache, _dir) = redb_cache();
cache.sync().expect("RedbCache::sync should not error.");
}
use batpak::prelude::*;
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
struct Counter {
count: u32,
}
impl EventSourced<serde_json::Value> for Counter {
fn from_events(events: &[batpak::prelude::Event<serde_json::Value>]) -> Option<Self> {
Some(Counter {
count: u32::try_from(events.len()).expect("test uses < 2^32 events"),
})
}
fn apply_event(&mut self, _event: &batpak::prelude::Event<serde_json::Value>) {
self.count += 1;
}
fn relevant_event_kinds() -> &'static [EventKind] {
&[]
}
}
#[test]
fn redb_projection_round_trip() {
let dir = TempDir::new().expect("temp dir");
let cache_path = dir.path().join("cache.redb");
let cache = RedbCache::open(&cache_path).expect("open redb cache");
let config = StoreConfig {
data_dir: dir.path().join("data"),
segment_max_bytes: 4096,
sync_every_n_events: 1,
..StoreConfig::new("")
};
let store =
Store::open_with_cache(config, Box::new(cache)).expect("open store with redb cache");
let coord = Coordinate::new("entity:redb1", "scope:test").expect("coord");
let kind = EventKind::custom(0xF, 1);
store
.append(&coord, kind, &serde_json::json!({"x": 1}))
.expect("append 1");
store
.append(&coord, kind, &serde_json::json!({"x": 2}))
.expect("append 2");
let result: Option<Counter> = store
.project("entity:redb1", &Freshness::Consistent)
.expect("project");
assert_eq!(
result,
Some(Counter { count: 2 }),
"REDB PROJECTION ROUND-TRIP: first project should replay 2 events.\n\
Investigate: src/store/mod.rs project, src/store/projection.rs RedbCache.\n\
Run: cargo test --features redb --test projection_cache redb_projection_round_trip"
);
let result2: Option<Counter> = store
.project("entity:redb1", &Freshness::Consistent)
.expect("project 2");
assert_eq!(result2, Some(Counter { count: 2 }));
store
.append(&coord, kind, &serde_json::json!({"x": 3}))
.expect("append 3");
let result3: Option<Counter> = store
.project("entity:redb1", &Freshness::Consistent)
.expect("project 3");
assert_eq!(
result3,
Some(Counter { count: 3 }),
"REDB CACHE INVALIDATION: after appending more events, project should re-replay.\n\
Investigate: src/store/mod.rs project watermark comparison.\n\
Run: cargo test --features redb --test projection_cache redb_projection_round_trip"
);
store.close().expect("close");
}
}
#[cfg(feature = "lmdb")]
mod lmdb_tests {
use super::*;
use batpak::store::projection::LmdbCache;
use tempfile::TempDir;
fn lmdb_cache() -> (LmdbCache, TempDir) {
let dir = TempDir::new().expect("temp dir");
let path = dir.path().join("lmdb");
let cache = LmdbCache::open(&path, 10 * 1024 * 1024).expect("open lmdb");
(cache, dir)
}
#[test]
fn lmdb_get_put_round_trip() {
let (cache, _dir) = lmdb_cache();
let meta = test_meta();
assert!(cache.get(b"key1").expect("get").is_none());
cache.put(b"key1", b"hello", meta.clone()).expect("put");
let (value, returned_meta) = cache.get(b"key1").expect("get").expect("should be Some");
assert_eq!(
value, b"hello",
"LMDB ROUND-TRIP VALUE: value should be preserved across put/get.\n\
Investigate: src/store/projection.rs LmdbCache::put and LmdbCache::get.\n\
Common causes: value bytes not written or read correctly from LMDB.\n\
Run: cargo test --test projection_cache lmdb_get_put_round_trip"
);
assert_eq!(
returned_meta.watermark, 42,
"LMDB ROUND-TRIP META WATERMARK: watermark should be preserved across put/get.\n\
Investigate: src/store/projection.rs LmdbCache::put and LmdbCache::get.\n\
Common causes: CacheMeta serialization losing watermark field.\n\
Run: cargo test --test projection_cache lmdb_get_put_round_trip"
);
}
#[test]
fn lmdb_delete_prefix() {
let (cache, _dir) = lmdb_cache();
let meta = test_meta();
cache.put(b"user:1", b"alice", meta.clone()).expect("put");
cache.put(b"user:2", b"bob", meta.clone()).expect("put");
cache.put(b"order:1", b"widget", meta.clone()).expect("put");
let deleted = cache.delete_prefix(b"user:").expect("delete_prefix");
assert_eq!(
deleted, 2,
"LMDB DELETE PREFIX: should delete exactly 2 keys with prefix 'user:'.\n\
Investigate: src/store/projection.rs LmdbCache::delete_prefix.\n\
Common causes: prefix scan not matching both keys, count not incremented correctly.\n\
Run: cargo test --test projection_cache lmdb_delete_prefix"
);
assert!(
cache.get(b"user:1").expect("get").is_none(),
"LMDB DELETE PREFIX: key 'user:1' should be gone after delete_prefix('user:').\n\
Investigate: src/store/projection.rs LmdbCache::delete_prefix.\n\
Common causes: prefix scan not matching key, deletion not committed.\n\
Run: cargo test --test projection_cache lmdb_delete_prefix"
);
assert!(
cache.get(b"order:1").expect("get").is_some(),
"LMDB DELETE PREFIX: key 'order:1' should survive delete_prefix('user:').\n\
Investigate: src/store/projection.rs LmdbCache::delete_prefix.\n\
Common causes: prefix matching too broad, deleting keys outside the prefix.\n\
Run: cargo test --test projection_cache lmdb_delete_prefix"
);
}
#[test]
fn lmdb_sync() {
let (cache, _dir) = lmdb_cache();
cache.sync().expect("LmdbCache::sync should not error.");
}
use batpak::prelude::*;
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
struct Counter {
count: u32,
}
impl EventSourced<serde_json::Value> for Counter {
fn from_events(events: &[batpak::prelude::Event<serde_json::Value>]) -> Option<Self> {
Some(Counter {
count: u32::try_from(events.len()).expect("test uses < 2^32 events"),
})
}
fn apply_event(&mut self, _event: &batpak::prelude::Event<serde_json::Value>) {
self.count += 1;
}
fn relevant_event_kinds() -> &'static [EventKind] {
&[]
}
}
#[test]
fn lmdb_projection_round_trip() {
let dir = TempDir::new().expect("temp dir");
let cache_path = dir.path().join("lmdb_cache");
let cache = LmdbCache::open(&cache_path, 10 * 1024 * 1024).expect("open lmdb cache");
let config = StoreConfig {
data_dir: dir.path().join("data"),
segment_max_bytes: 4096,
sync_every_n_events: 1,
..StoreConfig::new("")
};
let store =
Store::open_with_cache(config, Box::new(cache)).expect("open store with lmdb cache");
let coord = Coordinate::new("entity:lmdb1", "scope:test").expect("coord");
let kind = EventKind::custom(0xF, 1);
store
.append(&coord, kind, &serde_json::json!({"x": 1}))
.expect("append 1");
store
.append(&coord, kind, &serde_json::json!({"x": 2}))
.expect("append 2");
let result: Option<Counter> = store
.project("entity:lmdb1", &Freshness::Consistent)
.expect("project");
assert_eq!(
result,
Some(Counter { count: 2 }),
"LMDB PROJECTION ROUND-TRIP: first project should replay 2 events.\n\
Investigate: src/store/mod.rs project, src/store/projection.rs LmdbCache.\n\
Run: cargo test --features lmdb --test projection_cache lmdb_projection_round_trip"
);
let result2: Option<Counter> = store
.project("entity:lmdb1", &Freshness::Consistent)
.expect("project 2");
assert_eq!(result2, Some(Counter { count: 2 }));
store
.append(&coord, kind, &serde_json::json!({"x": 3}))
.expect("append 3");
let result3: Option<Counter> = store
.project("entity:lmdb1", &Freshness::Consistent)
.expect("project 3");
assert_eq!(
result3,
Some(Counter { count: 3 }),
"LMDB CACHE INVALIDATION: after appending more events, project should re-replay.\n\
Investigate: src/store/mod.rs project watermark comparison.\n\
Run: cargo test --features lmdb --test projection_cache lmdb_projection_round_trip"
);
store.close().expect("close");
}
}