#![allow(
clippy::disallowed_methods, // concurrent tests use thread::spawn for stress probes
clippy::panic, // test assertions
clippy::clone_on_ref_ptr, // Arc::clone style preference
clippy::field_reassign_with_default
)]
use batpak::prelude::*;
use std::io::Write;
use tempfile::TempDir;
fn test_store(dir: &TempDir) -> Store {
let mut config = StoreConfig::new(dir.path());
config.segment_max_bytes = 64 * 1024;
Store::open(config).expect("open store")
}
fn test_coord() -> Coordinate {
Coordinate::new("entity:test", "scope:test").expect("coord")
}
#[test]
fn frame_decode_too_short() {
use batpak::store::segment::{frame_decode, FrameDecodeError};
let buf = [0u8; 7]; match frame_decode(&buf) {
Err(FrameDecodeError::TooShort) => {}
other => panic!("expected TooShort, got {other:?}"),
}
}
#[test]
fn frame_decode_truncated() {
use batpak::store::segment::{frame_decode, FrameDecodeError};
let mut buf = vec![0u8; 12];
buf[0..4].copy_from_slice(&100u32.to_be_bytes()); buf[4..8].copy_from_slice(&0u32.to_be_bytes()); match frame_decode(&buf) {
Err(FrameDecodeError::Truncated {
expected_len,
available,
}) => {
assert_eq!(expected_len, 108);
assert_eq!(available, 12);
}
other => panic!("expected Truncated, got {other:?}"),
}
}
#[test]
fn frame_decode_crc_mismatch() {
use batpak::store::segment::{frame_decode, FrameDecodeError};
let payload = b"hello";
#[allow(clippy::cast_possible_truncation)]
let len = payload.len() as u32;
let bad_crc = 0xDEADBEEFu32;
let mut buf = Vec::new();
buf.extend_from_slice(&len.to_be_bytes());
buf.extend_from_slice(&bad_crc.to_be_bytes());
buf.extend_from_slice(payload);
match frame_decode(&buf) {
Err(FrameDecodeError::CrcMismatch { .. }) => {}
other => panic!("expected CrcMismatch, got {other:?}"),
}
}
#[test]
fn frame_decode_valid_round_trip() {
use batpak::store::segment::{frame_decode, frame_encode};
let data = "test_data";
let frame = frame_encode(&data).expect("encode");
let (msgpack, consumed) = frame_decode(&frame).expect("decode");
assert_eq!(consumed, frame.len());
let decoded: String = rmp_serde::from_slice(msgpack).expect("deserialize");
assert_eq!(decoded, "test_data");
}
#[test]
fn subscription_recv_returns_none_on_store_drop() {
let dir = TempDir::new().expect("tmpdir");
let store = test_store(&dir);
let region = Region::entity("entity:test");
let sub = store.subscribe(®ion);
let handle = std::thread::spawn(move || sub.recv());
std::thread::sleep(std::time::Duration::from_millis(50));
drop(store);
let result = handle.join().expect("thread join");
assert!(
result.is_none(),
"recv should return None when store is dropped"
);
}
#[test]
fn subscription_filters_by_region_in_recv_loop() {
let dir = TempDir::new().expect("tmpdir");
let store = test_store(&dir);
let kind = EventKind::custom(1, 1);
let coord_a = Coordinate::new("entity:a", "scope:test").expect("coord");
let coord_b = Coordinate::new("entity:b", "scope:test").expect("coord");
let region = Region::entity("entity:a");
let sub = store.subscribe(®ion);
store.append(&coord_b, kind, &"ignored").expect("append b");
store.append(&coord_a, kind, &"wanted").expect("append a");
let notif = sub.recv().expect("should get notification");
assert_eq!(notif.coord.entity(), "entity:a");
}
#[test]
fn store_drop_without_close_persists_data() {
let dir = TempDir::new().expect("tmpdir");
let coord = test_coord();
let kind = EventKind::custom(1, 1);
{
let store = test_store(&dir);
store.append(&coord, kind, &"event1").expect("append");
store.sync().expect("sync");
}
let store = test_store(&dir);
let events = store.stream("entity:test");
assert_eq!(events.len(), 1, "event should survive drop-without-close");
}
#[test]
fn segment_max_bytes_very_small_forces_frequent_rotation() {
let dir = TempDir::new().expect("tmpdir");
let mut config = StoreConfig::new(dir.path());
config.segment_max_bytes = 128; let store = Store::open(config).expect("open");
let coord = test_coord();
let kind = EventKind::custom(1, 1);
for i in 0..5 {
store
.append(&coord, kind, &format!("event_{i}"))
.expect("append");
}
store.sync().expect("sync");
let events = store.stream("entity:test");
assert_eq!(
events.len(),
5,
"all events should survive frequent rotation"
);
}
#[test]
fn concurrent_appends_same_entity_all_persisted() {
let dir = TempDir::new().expect("tmpdir");
let store = test_store(&dir);
let coord = test_coord();
let kind = EventKind::custom(1, 1);
let store = std::sync::Arc::new(store);
let n_threads = 4;
let n_per_thread = 25;
let handles: Vec<_> = (0..n_threads)
.map(|t| {
let s = store.clone();
let c = coord.clone();
std::thread::spawn(move || {
for i in 0..n_per_thread {
s.append(&c, kind, &format!("t{t}_e{i}")).expect("append");
}
})
})
.collect();
for h in handles {
h.join().expect("thread join");
}
store.sync().expect("sync");
let events = store.stream("entity:test");
assert_eq!(
events.len(),
n_threads * n_per_thread,
"all concurrent appends should be persisted"
);
let mut global_seqs: Vec<u64> = events.iter().map(|e| e.global_sequence).collect();
global_seqs.sort();
global_seqs.dedup();
assert_eq!(
global_seqs.len(),
n_threads * n_per_thread,
"all global sequences should be unique"
);
}
#[test]
fn compact_skips_when_below_min_segments() {
let dir = TempDir::new().expect("tmpdir");
let store = test_store(&dir);
let coord = test_coord();
let kind = EventKind::custom(1, 1);
store.append(&coord, kind, &"e1").expect("append");
store.sync().expect("sync");
let mut compact_config = CompactionConfig::default();
compact_config.min_segments = 10; let result = store.compact(&compact_config).expect("compact");
assert_eq!(
result.segments_removed, 0,
"should skip compaction below min_segments"
);
}
#[test]
fn scan_recovers_events_before_corruption() {
let dir = TempDir::new().expect("tmpdir");
let coord = test_coord();
let kind = EventKind::custom(1, 1);
{
let store = test_store(&dir);
for i in 0..5 {
store
.append(&coord, kind, &format!("event_{i}"))
.expect("append");
}
store.sync().expect("sync");
store.close().expect("close");
}
let mut segments: Vec<_> = std::fs::read_dir(dir.path())
.expect("readdir")
.filter_map(|e| e.ok())
.filter(|e| {
e.path()
.extension()
.map(|ext| ext == "fbat")
.unwrap_or(false)
})
.collect();
assert!(!segments.is_empty(), "should have segment files");
let seg_path = segments.remove(0).path();
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(&seg_path)
.expect("open segment");
f.write_all(&[0xFF; 64]).expect("write garbage");
drop(f);
let store = test_store(&dir);
let events = store.stream("entity:test");
assert!(
events.len() >= 3,
"should recover at least some events before corruption, got {}",
events.len()
);
}