#![allow(
clippy::disallowed_methods, // concurrent tests use thread::spawn for stress probes
clippy::panic, // test assertions
clippy::clone_on_ref_ptr, // Arc::clone style preference
clippy::field_reassign_with_default
)]
use batpak::prelude::*;
use std::io::Write;
use tempfile::TempDir;
mod common;
use common::medium_segment_store as test_store;
use common::test_coord;
#[test]
fn frame_decode_too_short() {
    use batpak::store::segment::{frame_decode, FrameDecodeError};
    // A frame header needs 8 bytes (len + crc); 7 bytes cannot even hold it.
    let undersized = [0u8; 7];
    match frame_decode(&undersized) {
        Err(FrameDecodeError::TooShort) => (),
        other => panic!("expected TooShort, got {other:?}"),
    }
}
#[test]
fn frame_decode_truncated() {
    use batpak::store::segment::{frame_decode, FrameDecodeError};
    // Header advertises a 100-byte payload, but only 4 bytes follow the 8-byte header.
    let mut frame = vec![0u8; 12];
    frame[0..4].copy_from_slice(&100u32.to_be_bytes());
    frame[4..8].copy_from_slice(&0u32.to_be_bytes());
    match frame_decode(&frame) {
        Err(FrameDecodeError::Truncated {
            expected_len,
            available,
        }) => {
            // 8-byte header + 100-byte payload = 108 expected, 12 present.
            assert_eq!(expected_len, 108);
            assert_eq!(available, 12);
        }
        other => panic!("expected Truncated, got {other:?}"),
    }
}
#[test]
fn frame_decode_crc_mismatch() {
    use batpak::store::segment::{frame_decode, FrameDecodeError};
    let payload = b"hello";
    #[allow(clippy::cast_possible_truncation)]
    let len = payload.len() as u32;
    // Hand-build a frame with a deliberately bogus checksum: the decoder must
    // flag the corruption rather than hand back the payload.
    let mut frame = Vec::with_capacity(8 + payload.len());
    frame.extend_from_slice(&len.to_be_bytes());
    frame.extend_from_slice(&0xDEAD_BEEF_u32.to_be_bytes());
    frame.extend_from_slice(payload);
    match frame_decode(&frame) {
        Err(FrameDecodeError::CrcMismatch { .. }) => (),
        other => panic!("expected CrcMismatch, got {other:?}"),
    }
}
#[test]
fn frame_decode_valid_round_trip() {
    use batpak::store::segment::{frame_decode, frame_encode};
    // encode -> decode must consume the whole frame and yield the original value.
    let frame = frame_encode(&"test_data").expect("encode");
    let (msgpack, consumed) = frame_decode(&frame).expect("decode");
    assert_eq!(consumed, frame.len());
    let round_tripped: String = rmp_serde::from_slice(msgpack).expect("deserialize");
    assert_eq!(round_tripped, "test_data");
}
#[test]
fn append_frames_from_segment_copies_frame_bytes_exactly() {
    use batpak::store::segment::{frame_encode, Segment, SEGMENT_MAGIC};
    // Strip the FBAT magic and the big-endian length-prefixed header,
    // returning only the frame bytes of a sealed segment file.
    fn frame_bytes(path: &std::path::Path) -> Vec<u8> {
        let raw = std::fs::read(path).expect("read segment");
        assert_eq!(
            &raw[..4],
            SEGMENT_MAGIC,
            "segment should start with FBAT magic"
        );
        let header_len = u32::from_be_bytes([raw[4], raw[5], raw[6], raw[7]]) as usize;
        raw[(8 + header_len)..].to_vec()
    }
    let dir = TempDir::new().expect("tmpdir");
    // Build a source segment holding two frames, sync it, and seal it.
    let source_path = {
        let mut source = Segment::create(dir.path(), 1).expect("create source segment");
        let frame_a = frame_encode(&serde_json::json!({"a": 1})).expect("encode frame a");
        let frame_b = frame_encode(&serde_json::json!({"b": 2})).expect("encode frame b");
        source.write_frame(&frame_a).expect("write frame a");
        source.write_frame(&frame_b).expect("write frame b");
        source
            .sync_with_mode(&SyncMode::SyncData)
            .expect("sync source");
        let path = source.path.clone();
        let _sealed = source.seal();
        path
    };
    // Bulk-copy the source frames into a fresh destination segment.
    let destination_path = {
        let mut destination = Segment::create(dir.path(), 2).expect("create destination segment");
        destination
            .append_frames_from_segment(&source_path)
            .expect("append frames");
        destination
            .sync_with_mode(&SyncMode::SyncData)
            .expect("sync destination");
        let path = destination.path.clone();
        let _sealed = destination.seal();
        path
    };
    assert_eq!(
        frame_bytes(&destination_path),
        frame_bytes(&source_path),
        "APPEND FRAMES: destination segment should contain exactly the source frame bytes after both headers are stripped."
    );
}
#[test]
fn subscription_recv_returns_none_on_store_drop() {
    // A blocked `recv` must unblock and yield `None` once the owning store is
    // dropped — otherwise subscriber threads would hang forever on shutdown.
    let dir = TempDir::new().expect("tmpdir");
    let store = test_store(&dir);
    let region = Region::entity("entity:test");
    // Fix: the original read `subscribe_lossy(®ion)` — HTML-entity mojibake
    // (`&reg` -> `®`, U+00AE) for `&region`, which does not compile.
    let sub = store.subscribe_lossy(&region);
    let handle = std::thread::Builder::new()
        .name("store-edge-sub-recv-block".into())
        .spawn(move || sub.recv())
        .expect("spawn subscription recv thread");
    drop(store);
    let result = handle.join().expect("thread join");
    assert!(
        result.is_none(),
        "recv should return None when store is dropped"
    );
}
#[test]
fn subscription_filters_by_region_in_recv_loop() {
    // A region-scoped subscription must skip events outside its region: the
    // append to entity:b happens first but must not surface on the channel.
    let dir = TempDir::new().expect("tmpdir");
    let store = test_store(&dir);
    let kind = EventKind::custom(1, 1);
    let coord_a = Coordinate::new("entity:a", "scope:test").expect("coord");
    let coord_b = Coordinate::new("entity:b", "scope:test").expect("coord");
    let region = Region::entity("entity:a");
    // Fix: the original read `subscribe_lossy(®ion)` — HTML-entity mojibake
    // (`&reg` -> `®`, U+00AE) for `&region`, which does not compile.
    let sub = store.subscribe_lossy(&region);
    store.append(&coord_b, kind, &"ignored").expect("append b");
    store.append(&coord_a, kind, &"wanted").expect("append a");
    let notif = sub.recv().expect("should get notification");
    assert_eq!(notif.coord.entity(), "entity:a");
}
#[test]
fn store_drop_without_close_persists_data() {
    // Synced data must survive a plain Drop with no explicit close().
    let dir = TempDir::new().expect("tmpdir");
    let coord = test_coord();
    let kind = EventKind::custom(1, 1);
    {
        let writer = test_store(&dir);
        writer.append(&coord, kind, &"event1").expect("append");
        writer.sync().expect("sync");
        // `writer` is dropped here without close().
    }
    let reader = test_store(&dir);
    let events = reader.stream("entity:test");
    assert_eq!(events.len(), 1, "event should survive drop-without-close");
}
#[test]
fn segment_max_bytes_very_small_forces_frequent_rotation() {
    // A 128-byte segment cap forces a rotation on virtually every append;
    // no event may be lost across those rotations.
    let dir = TempDir::new().expect("tmpdir");
    let mut config = StoreConfig::new(dir.path());
    config.segment_max_bytes = 128;
    let store = Store::open(config).expect("open");
    let coord = test_coord();
    let kind = EventKind::custom(1, 1);
    (0..5).for_each(|i| {
        store
            .append(&coord, kind, &format!("event_{i}"))
            .expect("append");
    });
    store.sync().expect("sync");
    let events = store.stream("entity:test");
    assert_eq!(
        events.len(),
        5,
        "all events should survive frequent rotation"
    );
}
#[test]
fn single_append_payload_over_limit_is_rejected_cleanly() {
    // An 8-byte single-append cap must reject this payload with a
    // Configuration error, not a panic or a silent truncation.
    let dir = TempDir::new().expect("tmpdir");
    let store =
        Store::open(StoreConfig::new(dir.path()).with_single_append_max_bytes(8)).expect("open");
    let coord = test_coord();
    let kind = EventKind::custom(1, 1);
    let oversized = "this payload is larger than eight bytes";
    let Err(err) = store.append(&coord, kind, &oversized) else {
        panic!("PROPERTY: oversized payload should not append successfully");
    };
    assert!(
        matches!(err, StoreError::Configuration(ref msg) if msg.contains("single append bytes")),
        "expected Configuration payload-limit error, got {err:?}"
    );
}
#[test]
fn single_append_payload_at_exact_limit_succeeds() {
    let dir = TempDir::new().expect("tmpdir");
    let limit: u32 = 64;
    let config = StoreConfig::new(dir.path()).with_single_append_max_bytes(limit);
    let store = Store::open(config).expect("open");
    let coord = test_coord();
    let kind = EventKind::custom(1, 1);
    // Grow the string one character at a time until its msgpack encoding is
    // exactly `limit` bytes; bail out if we ever jump past the target.
    let mut payload = String::new();
    loop {
        let encoded_len = rmp_serde::to_vec_named(&payload).expect("serialize").len();
        if encoded_len == limit as usize {
            break;
        }
        if encoded_len > limit as usize {
            panic!("overshot: could not construct a payload of exactly {limit} msgpack bytes");
        }
        payload.push('A');
    }
    store
        .append(&coord, kind, &payload)
        .expect("PROPERTY: payload of exactly single_append_max_bytes must be accepted");
    // One more byte tips the encoding over the limit.
    payload.push('B');
    assert!(
        store.append(&coord, kind, &payload).is_err(),
        "PROPERTY: payload exceeding single_append_max_bytes must be rejected"
    );
}
#[test]
fn coordinate_component_length_limit_is_enforced() {
    // One byte past MAX_COORDINATE_COMPONENT_LEN must be rejected on either side.
    let overlong = "x".repeat(batpak::coordinate::MAX_COORDINATE_COMPONENT_LEN + 1);
    // Entity component.
    match Coordinate::new(&overlong, "scope:test") {
        Ok(_) => panic!("PROPERTY: overlong entity should be rejected"),
        Err(entity_err) => assert!(
            matches!(entity_err, CoordinateError::EntityTooLong { .. }),
            "expected EntityTooLong, got {entity_err:?}"
        ),
    }
    // Scope component.
    match Coordinate::new("entity:test", &overlong) {
        Ok(_) => panic!("PROPERTY: overlong scope should be rejected"),
        Err(scope_err) => assert!(
            matches!(scope_err, CoordinateError::ScopeTooLong { .. }),
            "expected ScopeTooLong, got {scope_err:?}"
        ),
    }
}
#[cfg(unix)]
#[test]
fn close_rejects_checkpoint_symlink_leaf() {
    use std::os::unix::fs::symlink;
    // If index.ckpt is a symlink, close() must refuse to write through it so a
    // planted link cannot redirect the checkpoint onto an attacker-chosen file.
    let dir = TempDir::new().expect("tmpdir");
    let store = Store::open(
        StoreConfig::new(dir.path())
            .with_enable_checkpoint(true)
            .with_enable_mmap_index(false),
    )
    .expect("open");
    let coord = test_coord();
    let kind = EventKind::custom(1, 1);
    store.append(&coord, kind, &"event").expect("append");
    // Plant a symlink where the checkpoint file would be written.
    let target = dir.path().join("attacker-target.ckpt");
    std::fs::write(&target, b"sentinel").expect("write target");
    symlink(&target, dir.path().join("index.ckpt")).expect("create checkpoint symlink");
    let Err(err) = store.close() else {
        panic!("PROPERTY: close should reject checkpoint symlink leaf");
    };
    assert!(
        matches!(err, StoreError::Io(ref io) if io.kind() == std::io::ErrorKind::InvalidInput),
        "expected Io(InvalidInput), got {err:?}"
    );
    assert_eq!(
        std::fs::read(&target).expect("read target"),
        b"sentinel",
        "checkpoint hardening must not clobber the symlink target"
    );
}
#[cfg(unix)]
#[test]
fn open_with_native_cache_rejects_symlink_leaf() {
    use std::os::unix::fs::symlink;
    // The cache root handed to open_with_native_cache is itself a symlink;
    // hardening must reject it with CacheFailed.
    let dir = TempDir::new().expect("tmpdir");
    let cache_real = dir.path().join("cache-real");
    std::fs::create_dir_all(&cache_real).expect("create real cache dir");
    let cache_link = dir.path().join("cache-link");
    symlink(&cache_real, &cache_link).expect("create cache symlink");
    let Err(err) = Store::open_with_native_cache(StoreConfig::new(dir.path()), &cache_link) else {
        panic!("PROPERTY: native cache root symlink should be rejected");
    };
    assert!(
        matches!(err, StoreError::CacheFailed(_)),
        "expected CacheFailed, got {err:?}"
    );
}
#[test]
fn concurrent_appends_same_entity_all_persisted() {
    // Hammer one coordinate from several threads and verify that every append
    // is persisted and that each one received a unique global sequence number.
    let dir = TempDir::new().expect("tmpdir");
    // Fold the shadowed `let store` rebinding into a single Arc construction.
    let store = std::sync::Arc::new(test_store(&dir));
    let coord = test_coord();
    let kind = EventKind::custom(1, 1);
    let n_threads = 4;
    let n_per_thread = 25;
    let handles: Vec<_> = (0..n_threads)
        .map(|t| {
            let s = std::sync::Arc::<Store>::clone(&store);
            let c = coord.clone();
            std::thread::Builder::new()
                .name(format!("store-edge-concurrent-append-{t}"))
                .spawn(move || {
                    for i in 0..n_per_thread {
                        s.append(&c, kind, &format!("t{t}_e{i}")).expect("append");
                    }
                })
                .expect("spawn concurrent append thread")
        })
        .collect();
    for h in handles {
        h.join().expect("thread join");
    }
    store.sync().expect("sync");
    let events = store.stream("entity:test");
    assert_eq!(
        events.len(),
        n_threads * n_per_thread,
        "all concurrent appends should be persisted"
    );
    // Global sequences are plain u64s: use the faster unstable sort — stability
    // is meaningless for primitives (clippy::stable_sort_primitive).
    let mut global_seqs: Vec<u64> = events.iter().map(|e| e.global_sequence).collect();
    global_seqs.sort_unstable();
    global_seqs.dedup();
    assert_eq!(
        global_seqs.len(),
        n_threads * n_per_thread,
        "all global sequences should be unique"
    );
}
#[test]
fn compact_skips_when_below_min_segments() {
    // With a single segment on disk and min_segments = 10, compaction must be
    // a no-op that removes nothing.
    let dir = TempDir::new().expect("tmpdir");
    let store = test_store(&dir);
    store.append(&test_coord(), EventKind::custom(1, 1), &"e1").expect("append");
    store.sync().expect("sync");
    let mut compact_config = CompactionConfig::default();
    compact_config.min_segments = 10;
    let result = store.compact(&compact_config).expect("compact");
    assert_eq!(
        result.segments_removed, 0,
        "should skip compaction below min_segments"
    );
}
#[test]
fn scan_recovers_events_before_corruption() {
    let dir = TempDir::new().expect("tmpdir");
    let coord = test_coord();
    let kind = EventKind::custom(1, 1);
    // Write five events and close cleanly so everything is on disk.
    {
        let store = test_store(&dir);
        for i in 0..5 {
            store
                .append(&coord, kind, &format!("event_{i}"))
                .expect("append");
        }
        store.sync().expect("sync");
        store.close().expect("close");
    }
    // Collect the .fbat segment paths in name order.
    let mut segment_paths: Vec<std::path::PathBuf> = std::fs::read_dir(dir.path())
        .expect("readdir")
        .flatten()
        .map(|entry| entry.path())
        .filter(|path| path.extension().map(|ext| ext == "fbat").unwrap_or(false))
        .collect();
    segment_paths.sort();
    assert!(!segment_paths.is_empty(), "should have segment files");
    // Append 64 bytes of garbage to the first segment to simulate a torn tail.
    let first_segment = segment_paths.remove(0);
    let mut seg_file = std::fs::OpenOptions::new()
        .append(true)
        .open(&first_segment)
        .expect("open segment");
    seg_file.write_all(&[0xFF; 64]).expect("write garbage");
    drop(seg_file);
    // Reopen: the recovery scan should salvage the events written before the garbage.
    let store = test_store(&dir);
    let events = store.stream("entity:test");
    assert!(
        events.len() >= 3,
        "should recover at least some events before corruption, got {}",
        events.len()
    );
}