use bytes::Bytes;
use kimberlite_crypto::ChainHash;
use kimberlite_types::{Offset, StreamId};
use tempfile::TempDir;
use crate::{MemoryStorage, OffsetIndex, Record, Storage, StorageBackend, StorageError};
/// Pins the exact byte layout emitted by `Record::to_bytes` for a 5-byte
/// payload with no previous hash.
///
/// Layout asserted here (integers little-endian): `[0..4)` start sentinel,
/// `[4..12)` offset, `[12..44)` previous-hash slot (all zero when `None`),
/// bytes 44 and 45 (both zero for this record), `[46..50)` payload length,
/// `[50..55)` payload, `[55..59)` CRC32 of `[0..55)`, `[59..63)` end sentinel.
#[test]
fn record_to_bytes_produces_correct_format() {
    let encoded = Record::new(Offset::new(42), None, Bytes::from("hello")).to_bytes();
    assert_eq!(encoded.len(), 63);

    // Small little-endian readers so each field check stays on one line.
    let read_u32 = |at: usize| u32::from_le_bytes(encoded[at..at + 4].try_into().unwrap());
    let read_u64 = |at: usize| u64::from_le_bytes(encoded[at..at + 8].try_into().unwrap());

    assert_eq!(read_u32(0), 0xBADC_0FFE);
    assert_eq!(read_u64(4), 42);
    assert_eq!(&encoded[12..44], &[0u8; 32]);
    assert_eq!(encoded[44], 0);
    assert_eq!(encoded[45], 0);
    assert_eq!(read_u32(46), 5);
    assert_eq!(&encoded[50..55], b"hello");

    // The stored checksum must cover everything that precedes it.
    let crc_on_wire = read_u32(55);
    let crc_expected = kimberlite_crypto::crc32(&encoded[0..55]);
    assert_eq!(crc_on_wire, crc_expected);

    assert_eq!(read_u32(59), 0xC0FF_EE42);
}
/// A record without a previous hash survives `to_bytes` -> `from_bytes`
/// unchanged, and the parser reports consuming the entire buffer.
#[test]
fn record_roundtrip_preserves_data() {
    let source = Record::new(Offset::new(123), None, Bytes::from("test payload"));
    let encoded = Bytes::from(source.to_bytes());

    let (decoded, used) = Record::from_bytes(&encoded).unwrap();

    assert_eq!(decoded.offset(), Offset::new(123));
    assert_eq!(decoded.prev_hash(), None);
    assert_eq!(decoded.payload().as_ref(), b"test payload");
    assert_eq!(used, encoded.len());
}
/// A record carrying a previous hash round-trips with the hash, offset and
/// payload intact.
#[test]
fn record_roundtrip_with_prev_hash() {
    let link = ChainHash::from_bytes(&[42u8; 32]);
    let source = Record::new(Offset::new(1), Some(link), Bytes::from("linked"));
    let encoded = Bytes::from(source.to_bytes());

    let (decoded, _) = Record::from_bytes(&encoded).unwrap();

    assert_eq!(decoded.offset(), Offset::new(1));
    assert_eq!(decoded.prev_hash(), Some(link));
    assert_eq!(decoded.payload().as_ref(), b"linked");
}
/// Inverting a byte at index 50 (the first payload byte in the layout checked
/// by `record_to_bytes_produces_correct_format`) must surface as
/// `StorageError::CorruptedRecord`.
#[test]
fn record_from_bytes_detects_corruption() {
    let mut encoded: Vec<u8> = Record::new(Offset::new(0), None, Bytes::from("data")).to_bytes();
    encoded[50] ^= 0xFF;

    let outcome = Record::from_bytes(&Bytes::from(encoded));

    assert!(matches!(outcome, Err(StorageError::CorruptedRecord)));
}
/// A 40-byte all-zero buffer is too short to hold a record and must be
/// rejected with `StorageError::UnexpectedEof`.
#[test]
fn record_from_bytes_handles_truncated_header() {
    let too_short = Bytes::from(vec![0u8; 40]);
    let outcome = Record::from_bytes(&too_short);
    assert!(matches!(outcome, Err(StorageError::UnexpectedEof)));
}
/// A header that announces a 100-byte payload but is followed by only
/// 50 bytes must be rejected with `StorageError::UnexpectedEof`.
#[test]
fn record_from_bytes_handles_truncated_payload() {
    // Hand-built header, field by field, in the order `to_bytes` emits them.
    let mut raw = Vec::new();
    raw.extend_from_slice(&0xBADC_0FFE_u32.to_le_bytes()); // start sentinel
    raw.extend_from_slice(&0u64.to_le_bytes()); // offset
    raw.extend_from_slice(&[0u8; 32]); // previous-hash slot
    raw.push(0); // byte 44
    raw.push(0); // byte 45
    raw.extend_from_slice(&100u32.to_le_bytes()); // declared payload length
    raw.extend_from_slice(&[0u8; 50]); // only half of the declared payload

    let outcome = Record::from_bytes(&Bytes::from(raw));

    assert!(matches!(outcome, Err(StorageError::UnexpectedEof)));
}
/// A record with a zero-length payload round-trips and still has an empty
/// payload afterwards.
#[test]
fn record_empty_payload() {
    let encoded = Bytes::from(Record::new(Offset::new(0), None, Bytes::new()).to_bytes());
    let (decoded, _) = Record::from_bytes(&encoded).unwrap();
    assert!(decoded.payload().is_empty());
}
/// `compute_hash` yields distinct hashes for two chained records and is
/// deterministic: an identically constructed record hashes to the same value.
#[test]
fn record_compute_hash_creates_chain() {
    let genesis_hash = Record::new(Offset::new(0), None, Bytes::from("hello")).compute_hash();

    let successor = Record::new(Offset::new(1), Some(genesis_hash), Bytes::from("world"));
    let successor_hash = successor.compute_hash();
    assert_ne!(genesis_hash, successor_hash);

    // Same inputs must produce the same hash.
    let twin = Record::new(Offset::new(1), Some(genesis_hash), Bytes::from("world"));
    assert_eq!(twin.compute_hash(), successor_hash);
}
/// Positions appended to an `OffsetIndex` are retrievable by their insertion
/// order (0, 1, 2, ...); looking up one past the end yields `None`.
#[test]
fn test_append_and_lookup() {
    let mut idx = OffsetIndex::new();
    for byte_position in [0, 100, 250] {
        idx.append(byte_position);
    }

    assert_eq!(idx.lookup(Offset::new(0)), Some(0));
    assert_eq!(idx.lookup(Offset::new(1)), Some(100));
    assert_eq!(idx.lookup(Offset::new(2)), Some(250));
    assert_eq!(idx.lookup(Offset::new(3)), None);
    assert_eq!(idx.len(), 3);
    assert!(!idx.is_empty());
}
/// An index with three entries saved to disk loads back with the same length
/// and the same position for every offset.
#[test]
fn test_save_and_load_roundtrip() {
    let scratch = TempDir::new().unwrap();
    let file = scratch.path().join("test.idx");

    let mut written = OffsetIndex::new();
    for byte_position in [0, 100, 250] {
        written.append(byte_position);
    }
    written.save(&file).expect("save succeeds");

    let reloaded = OffsetIndex::load(&file).expect("load succeeds");

    assert_eq!(reloaded.len(), 3);
    assert_eq!(reloaded.lookup(Offset::new(0)), Some(0));
    assert_eq!(reloaded.lookup(Offset::new(1)), Some(100));
    assert_eq!(reloaded.lookup(Offset::new(2)), Some(250));
}
/// An empty index can be saved and loaded, and is still empty afterwards.
#[test]
fn test_empty_index_roundtrip() {
    let scratch = TempDir::new().unwrap();
    let file = scratch.path().join("empty.idx");

    let written = OffsetIndex::new();
    assert!(written.is_empty());
    written.save(&file).expect("save succeeds");

    let reloaded = OffsetIndex::load(&file).expect("load succeeds");
    assert!(reloaded.is_empty());
    assert_eq!(reloaded.len(), 0);
}
/// A 10,000-entry index (positions spaced 53 bytes apart) round-trips through
/// save/load; the first and last entries are spot-checked.
#[test]
fn test_large_index_roundtrip() {
    let scratch = TempDir::new().unwrap();
    let file = scratch.path().join("large.idx");

    let mut written = OffsetIndex::new();
    for slot in 0..10_000u64 {
        // Entry `slot` lives at byte position `slot * 53`.
        written.append(slot * 53);
    }
    written.save(&file).expect("save succeeds");

    let reloaded = OffsetIndex::load(&file).expect("load succeeds");

    assert_eq!(reloaded.len(), 10_000);
    assert_eq!(reloaded.lookup(Offset::new(0)), Some(0));
    assert_eq!(reloaded.lookup(Offset::new(9999)), Some(9999 * 53));
}
/// A 20-byte file whose first four bytes are `XXXX` must fail to load with
/// `StorageError::InvalidIndexMagic`.
#[test]
fn test_load_detects_invalid_magic() {
    let scratch = TempDir::new().unwrap();
    let file = scratch.path().join("bad_magic.idx");

    let mut raw = vec![0u8; 20];
    raw[..4].copy_from_slice(b"XXXX");
    std::fs::write(&file, &raw).unwrap();

    let outcome = OffsetIndex::load(&file);
    assert!(matches!(outcome, Err(StorageError::InvalidIndexMagic)));
}
/// A file with the `VDXI` magic but byte 4 set to `0xFF` must fail to load
/// with `StorageError::UnsupportedIndexVersion(0xFF)`.
///
/// The 24-byte buffer ends with a CRC32 of its first 20 bytes.
#[test]
fn test_load_detects_unsupported_version() {
    let scratch = TempDir::new().unwrap();
    let file = scratch.path().join("bad_version.idx");

    let mut raw = vec![0u8; 24];
    raw[..4].copy_from_slice(b"VDXI");
    raw[4] = 0xFF;
    let checksum = kimberlite_crypto::crc32(&raw[..20]);
    raw[20..].copy_from_slice(&checksum.to_le_bytes());
    std::fs::write(&file, &raw).unwrap();

    let outcome = OffsetIndex::load(&file);
    assert!(matches!(
        outcome,
        Err(StorageError::UnsupportedIndexVersion(0xFF))
    ));
}
/// A 10-byte file must be rejected as truncated, with the error reporting
/// 20 expected bytes against 10 actual.
#[test]
fn test_load_detects_truncated_file() {
    let scratch = TempDir::new().unwrap();
    let file = scratch.path().join("truncated.idx");
    std::fs::write(&file, vec![0u8; 10]).unwrap();

    let outcome = OffsetIndex::load(&file);

    assert!(matches!(
        outcome,
        Err(StorageError::IndexTruncated {
            expected: 20,
            actual: 10
        })
    ));
}
/// A file whose header declares 10 positions but only has room for 5 must be
/// rejected as truncated (expected 100 bytes, actual 60).
#[test]
fn test_load_detects_truncated_positions() {
    let scratch = TempDir::new().unwrap();
    let file = scratch.path().join("truncated_pos.idx");

    // 16 header bytes + five 8-byte positions + 4 trailing checksum bytes.
    let mut raw = vec![0u8; 16 + (5 * 8) + 4];
    raw[..4].copy_from_slice(b"VDXI");
    raw[4] = 0x01;
    // Bytes 8..16 claim ten entries, twice what the buffer can hold.
    raw[8..16].copy_from_slice(&10u64.to_le_bytes());

    // Checksum over everything but the last four bytes, stored in those bytes.
    let trailer_at = raw.len() - 4;
    let checksum = kimberlite_crypto::crc32(&raw[..trailer_at]);
    raw[trailer_at..].copy_from_slice(&checksum.to_le_bytes());
    std::fs::write(&file, &raw).unwrap();

    let outcome = OffsetIndex::load(&file);

    assert!(matches!(
        outcome,
        Err(StorageError::IndexTruncated {
            expected: 100,
            actual: 60
        })
    ));
}
/// Inverting byte 16 of a saved index file must make `load` fail with
/// `StorageError::IndexChecksumMismatch`.
#[test]
fn test_load_detects_corrupted_crc() {
    let scratch = TempDir::new().unwrap();
    let file = scratch.path().join("corrupted.idx");

    let mut idx = OffsetIndex::new();
    idx.append(0);
    idx.append(100);
    idx.save(&file).expect("save succeeds");

    // Damage the saved file in place without touching its checksum.
    let mut raw = std::fs::read(&file).unwrap();
    raw[16] ^= 0xFF;
    std::fs::write(&file, &raw).unwrap();

    let outcome = OffsetIndex::load(&file);

    assert!(matches!(
        outcome,
        Err(StorageError::IndexChecksumMismatch { .. })
    ));
}
/// `OffsetIndex::from_positions` builds an index that exposes the given
/// positions verbatim and answers lookups by slot.
#[test]
fn test_from_positions_creates_valid_index() {
    let byte_positions = vec![0, 100, 250, 400];
    let idx = OffsetIndex::from_positions(byte_positions.clone());

    assert_eq!(idx.len(), 4);
    assert_eq!(idx.positions(), byte_positions.as_slice());
    assert_eq!(idx.lookup(Offset::new(2)), Some(250));
}
/// An index built by successive `append` calls compares equal to one built
/// from the same positions via `from_positions`.
#[test]
fn test_index_equality() {
    let mut appended = OffsetIndex::new();
    appended.append(0);
    appended.append(100);

    let prebuilt = OffsetIndex::from_positions(vec![0, 100]);

    assert_eq!(appended, prebuilt);
}
/// Cloning an index yields an equal index that answers lookups on its own.
#[test]
fn test_index_clone() {
    let mut source = OffsetIndex::new();
    source.append(0);
    source.append(100);

    let copy = source.clone();

    assert_eq!(source, copy);
    assert_eq!(copy.lookup(Offset::new(1)), Some(100));
}
/// End-to-end tests for the on-disk `Storage` backend; every test works in
/// its own temporary directory.
mod integration {
    use super::*;
    use tempfile::TempDir;

    /// Builds a `Storage` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` handle is returned too so the caller can keep it in
    /// scope (and inspect the directory) for the duration of the test.
    fn setup_storage() -> (Storage, TempDir) {
        let scratch = TempDir::new().unwrap();
        let store = Storage::new(scratch.path());
        (store, scratch)
    }

    /// Produces `count` payloads named `event-0`, `event-1`, ...
    fn test_events(count: usize) -> Vec<Bytes> {
        let mut batch = Vec::with_capacity(count);
        for i in 0..count {
            batch.push(Bytes::from(format!("event-{i}")));
        }
        batch
    }

    /// Appending one event returns next offset 1 and reads back verbatim.
    #[test]
    fn append_and_read_single_event() {
        let (mut store, _scratch) = setup_storage();
        let stream = StreamId::new(1);

        let (next, _tail_hash) = store
            .append_batch(stream, test_events(1), Offset::new(0), None, false)
            .unwrap();
        assert_eq!(next, Offset::new(1));

        let read_back = store.read_from(stream, Offset::new(0), u64::MAX).unwrap();
        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].as_ref(), b"event-0");
    }

    /// Five appended events read back in order.
    #[test]
    fn append_and_read_multiple_events() {
        let (mut store, _scratch) = setup_storage();
        let stream = StreamId::new(1);

        store
            .append_batch(stream, test_events(5), Offset::new(0), None, false)
            .unwrap();

        let read_back = store.read_from(stream, Offset::new(0), u64::MAX).unwrap();
        assert_eq!(read_back.len(), 5);
        for i in 0..5 {
            assert_eq!(read_back[i].as_ref(), format!("event-{i}").as_bytes());
        }
    }

    /// Reading from offset 5 of a 10-event stream returns events 5 through 9.
    #[test]
    fn read_from_middle_offset() {
        let (mut store, _scratch) = setup_storage();
        let stream = StreamId::new(1);

        store
            .append_batch(stream, test_events(10), Offset::new(0), None, false)
            .unwrap();

        let tail = store.read_from(stream, Offset::new(5), u64::MAX).unwrap();
        assert_eq!(tail.len(), 5);
        assert_eq!(tail[0].as_ref(), b"event-5");
        assert_eq!(tail[4].as_ref(), b"event-9");
    }

    /// With a 25-byte read budget, some but not all of ten 10-byte events
    /// come back.
    #[test]
    fn read_respects_max_bytes() {
        let (mut store, _scratch) = setup_storage();
        let stream = StreamId::new(1);

        // Zero-padded names keep every payload the same length.
        let batch: Vec<Bytes> = (0..10)
            .map(|i| Bytes::from(format!("event-{i:04}")))
            .collect();
        store
            .append_batch(stream, batch, Offset::new(0), None, false)
            .unwrap();

        let limited = store.read_from(stream, Offset::new(0), 25).unwrap();
        assert!(limited.len() < 10);
        assert!(!limited.is_empty());
    }

    /// A second batch appended at the offset and hash returned by the first
    /// extends the stream; all five events read back in order.
    #[test]
    fn append_multiple_batches_sequential() {
        let (mut store, _scratch) = setup_storage();
        let stream = StreamId::new(1);

        let (after_first, first_tail_hash) = store
            .append_batch(stream, test_events(3), Offset::new(0), None, false)
            .unwrap();
        assert_eq!(after_first, Offset::new(3));

        let second_batch: Vec<Bytes> = vec![Bytes::from("batch2-0"), Bytes::from("batch2-1")];
        let (after_second, _) = store
            .append_batch(
                stream,
                second_batch,
                Offset::new(3),
                Some(first_tail_hash),
                false,
            )
            .unwrap();
        assert_eq!(after_second, Offset::new(5));

        let read_back = store.read_from(stream, Offset::new(0), u64::MAX).unwrap();
        assert_eq!(read_back.len(), 5);
        assert_eq!(read_back[0].as_ref(), b"event-0");
        assert_eq!(read_back[2].as_ref(), b"event-2");
        assert_eq!(read_back[3].as_ref(), b"batch2-0");
        assert_eq!(read_back[4].as_ref(), b"batch2-1");
    }

    /// Appending with the final (fsync) flag set to `true` succeeds.
    #[test]
    fn append_with_fsync() {
        let (mut store, _scratch) = setup_storage();
        let stream = StreamId::new(1);

        let outcome = store.append_batch(stream, test_events(1), Offset::new(0), None, true);

        assert!(outcome.is_ok());
    }

    /// Events appended to two different streams never show up in each other.
    #[test]
    fn multiple_streams_are_isolated() {
        let (mut store, _scratch) = setup_storage();
        let first = StreamId::new(1);
        let second = StreamId::new(2);

        store
            .append_batch(
                first,
                vec![Bytes::from("stream1-event")],
                Offset::new(0),
                None,
                false,
            )
            .unwrap();
        store
            .append_batch(
                second,
                vec![Bytes::from("stream2-event")],
                Offset::new(0),
                None,
                false,
            )
            .unwrap();

        let from_first = store.read_from(first, Offset::new(0), u64::MAX).unwrap();
        let from_second = store.read_from(second, Offset::new(0), u64::MAX).unwrap();

        assert_eq!(from_first.len(), 1);
        assert_eq!(from_first[0].as_ref(), b"stream1-event");
        assert_eq!(from_second.len(), 1);
        assert_eq!(from_second[0].as_ref(), b"stream2-event");
    }

    /// The hash returned by `append_batch` equals the hash obtained by
    /// chaining the three records by hand.
    #[test]
    fn hash_chain_is_built_correctly() {
        let (mut store, _scratch) = setup_storage();
        let stream = StreamId::new(1);

        let (_, reported_tail) = store
            .append_batch(stream, test_events(3), Offset::new(0), None, false)
            .unwrap();

        // Rebuild the chain independently of the storage layer.
        let h0 = Record::new(Offset::new(0), None, Bytes::from("event-0")).compute_hash();
        let h1 = Record::new(Offset::new(1), Some(h0), Bytes::from("event-1")).compute_hash();
        let h2 = Record::new(Offset::new(2), Some(h1), Bytes::from("event-2")).compute_hash();

        assert_eq!(reported_tail, h2);
    }

    /// Inverting byte 64 of the first segment file makes a subsequent read
    /// fail.
    #[test]
    fn tampered_record_is_detected() {
        let (mut store, scratch) = setup_storage();
        let stream = StreamId::new(1);

        store
            .append_batch(stream, test_events(3), Offset::new(0), None, false)
            .unwrap();

        let segment_file = scratch
            .path()
            .join(stream.to_string())
            .join("segment_000000.log");
        let mut raw = std::fs::read(&segment_file).unwrap();
        raw[64] ^= 0xFF;
        std::fs::write(&segment_file, &raw).unwrap();

        let outcome = store.read_from(stream, Offset::new(0), u64::MAX);
        assert!(outcome.is_err());
    }

    /// `read_records_from_genesis` returns full records whose offsets are
    /// sequential and whose `prev_hash` links to the preceding record's hash.
    #[test]
    fn read_records_returns_full_records() {
        let (mut store, _scratch) = setup_storage();
        let stream = StreamId::new(1);

        store
            .append_batch(stream, test_events(3), Offset::new(0), None, false)
            .unwrap();

        let chain = store
            .read_records_from_genesis(stream, Offset::new(0), u64::MAX)
            .unwrap();

        assert_eq!(chain.len(), 3);
        assert_eq!(chain[0].prev_hash(), None);
        assert_eq!(chain[0].offset(), Offset::new(0));
        assert_eq!(chain[1].prev_hash(), Some(chain[0].compute_hash()));
        assert_eq!(chain[1].offset(), Offset::new(1));
        assert_eq!(chain[2].prev_hash(), Some(chain[1].compute_hash()));
        assert_eq!(chain[2].offset(), Offset::new(2));
    }
}
/// Tests for segment rotation in the on-disk `Storage` backend, using a
/// deliberately small maximum segment size so that a handful of events is
/// enough to trigger a rotation.
mod segment_rotation_tests {
    use super::*;
    use kimberlite_types::CheckpointPolicy;
    use tempfile::TempDir;

    /// Builds a `Storage` in a fresh temporary directory with the default
    /// checkpoint policy and a maximum segment size of 500.
    fn setup_small_segment_storage() -> (Storage, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let storage =
            Storage::with_max_segment_size(temp_dir.path(), CheckpointPolicy::default(), 500);
        (storage, temp_dir)
    }

    /// Produces `count` payloads named `event-0`, `event-1`, ...
    fn test_events(count: usize) -> Vec<Bytes> {
        (0..count)
            .map(|i| Bytes::from(format!("event-{i}")))
            .collect()
    }

    /// Ten events overflow the 500-unit limit and produce at least two
    /// segments; reads still return every event in order across segments.
    #[test]
    fn segment_rotates_when_size_exceeded() {
        let (mut storage, _dir) = setup_small_segment_storage();
        let stream_id = StreamId::new(1);

        let (offset1, hash1) = storage
            .append_batch(stream_id, test_events(10), Offset::new(0), None, false)
            .unwrap();
        assert_eq!(offset1, Offset::new(10));
        assert!(storage.segment_count(stream_id) >= 2);

        let (_offset2, _hash2) = storage
            .append_batch(stream_id, test_events(5), offset1, Some(hash1), false)
            .unwrap();

        let events = storage
            .read_from(stream_id, Offset::new(0), u64::MAX)
            .unwrap();
        assert_eq!(events.len(), 15);
        assert_eq!(events[0].as_ref(), b"event-0");
        assert_eq!(events[9].as_ref(), b"event-9");
        // The second batch restarts payload numbering at zero.
        assert_eq!(events[10].as_ref(), b"event-0");
        assert_eq!(events[14].as_ref(), b"event-4");
    }

    /// Reading from offset 12 of a 15-event, multi-batch stream returns the
    /// last three events, starting with the third payload of the second batch.
    #[test]
    fn read_from_middle_across_segments() {
        let (mut storage, _dir) = setup_small_segment_storage();
        let stream_id = StreamId::new(1);

        let (offset1, hash1) = storage
            .append_batch(stream_id, test_events(10), Offset::new(0), None, false)
            .unwrap();
        let (_offset2, _hash2) = storage
            .append_batch(stream_id, test_events(5), offset1, Some(hash1), false)
            .unwrap();

        let events = storage
            .read_from(stream_id, Offset::new(12), u64::MAX)
            .unwrap();
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].as_ref(), b"event-2");
    }

    /// Thirty events appended in three chained batches read back as one
    /// unbroken hash chain, even though they span several segments.
    #[test]
    fn hash_chain_integrity_across_segments() {
        let (mut storage, _dir) = setup_small_segment_storage();
        let stream_id = StreamId::new(1);

        let (offset1, hash1) = storage
            .append_batch(stream_id, test_events(10), Offset::new(0), None, false)
            .unwrap();
        let (offset2, hash2) = storage
            .append_batch(stream_id, test_events(10), offset1, Some(hash1), false)
            .unwrap();
        let (_offset3, _hash3) = storage
            .append_batch(stream_id, test_events(10), offset2, Some(hash2), false)
            .unwrap();

        let records = storage
            .read_records_from_genesis(stream_id, Offset::new(0), u64::MAX)
            .unwrap();
        assert_eq!(records.len(), 30);

        // Counting records alone would not notice a broken link, so verify
        // the chain explicitly: sequential offsets, no predecessor for the
        // first record, and every later record pointing at the hash of the
        // record before it (including across batch and segment boundaries).
        assert_eq!(records[0].offset(), Offset::new(0));
        assert_eq!(records[0].prev_hash(), None);
        for i in 1..records.len() {
            assert_eq!(records[i].offset(), Offset::new(i as u64));
            assert_eq!(
                records[i].prev_hash(),
                Some(records[i - 1].compute_hash())
            );
        }
    }

    /// After rotation, segment 0 is reported among the completed segments.
    #[test]
    fn completed_segments_are_listed() {
        let (mut storage, _dir) = setup_small_segment_storage();
        let stream_id = StreamId::new(1);

        let (offset1, hash1) = storage
            .append_batch(stream_id, test_events(10), Offset::new(0), None, false)
            .unwrap();
        let (_offset2, _hash2) = storage
            .append_batch(stream_id, test_events(5), offset1, Some(hash1), false)
            .unwrap();

        let completed = storage.completed_segments(stream_id);
        assert!(!completed.is_empty());
        assert!(completed.contains(&0));
    }

    /// With a 1 GiB segment limit, 100 small events stay in a single segment.
    #[test]
    fn no_rotation_when_below_threshold() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = Storage::with_max_segment_size(
            temp_dir.path(),
            CheckpointPolicy::default(),
            1024 * 1024 * 1024,
        );
        let stream_id = StreamId::new(1);

        storage
            .append_batch(stream_id, test_events(100), Offset::new(0), None, false)
            .unwrap();

        assert_eq!(storage.segment_count(stream_id), 1);
    }
}
/// Tests for checkpoint creation, lookup, and policy on the on-disk
/// `Storage` backend.
mod checkpoint_tests {
    use super::*;
    use kimberlite_types::CheckpointPolicy;
    use tempfile::TempDir;

    /// Builds a `Storage` rooted in a fresh temporary directory and returns
    /// the directory handle alongside it.
    fn setup_storage() -> (Storage, TempDir) {
        let scratch = TempDir::new().unwrap();
        let store = Storage::new(scratch.path());
        (store, scratch)
    }

    /// Produces `count` payloads named `event-0`, `event-1`, ...
    fn test_events(count: usize) -> Vec<Bytes> {
        let mut batch = Vec::with_capacity(count);
        for i in 0..count {
            batch.push(Bytes::from(format!("event-{i}")));
        }
        batch
    }

    /// Creating a checkpoint at offset 5 advances the next offset to 6 and
    /// makes offset 5 the last known checkpoint.
    #[test]
    fn create_and_read_checkpoint() {
        let (mut store, _scratch) = setup_storage();
        let stream = StreamId::new(1);

        let (after_events, tail_hash) = store
            .append_batch(stream, test_events(5), Offset::new(0), None, false)
            .unwrap();
        assert_eq!(after_events, Offset::new(5));

        let (after_checkpoint, _checkpoint_hash) = store
            .create_checkpoint(stream, after_events, Some(tail_hash), 5, false)
            .unwrap();
        assert_eq!(after_checkpoint, Offset::new(6));

        let latest = store.last_checkpoint(stream).unwrap();
        assert_eq!(latest, Some(Offset::new(5)));
    }

    /// With five events, a checkpoint, and five more events on the stream,
    /// a verified read from offset 7 returns the four records 7 through 10.
    #[test]
    fn read_with_checkpoint_verification() {
        let (mut store, _scratch) = setup_storage();
        let stream = StreamId::new(1);

        let (after_events, events_hash) = store
            .append_batch(stream, test_events(5), Offset::new(0), None, false)
            .unwrap();
        let (after_checkpoint, checkpoint_hash) = store
            .create_checkpoint(stream, after_events, Some(events_hash), 5, false)
            .unwrap();
        store
            .append_batch(
                stream,
                test_events(5),
                after_checkpoint,
                Some(checkpoint_hash),
                false,
            )
            .unwrap();

        let verified = store
            .read_records_verified(stream, Offset::new(7), u64::MAX)
            .unwrap();
        assert_eq!(verified.len(), 4);
        assert_eq!(verified[0].offset(), Offset::new(7));
    }

    /// A policy of `every(3)` is stored on the `Storage` and asks for a
    /// checkpoint at offset 2 but not at offset 3.
    #[test]
    fn checkpoint_policy_triggers() {
        let scratch = TempDir::new().unwrap();
        let store = Storage::with_checkpoint_policy(scratch.path(), CheckpointPolicy::every(3));

        assert_eq!(store.checkpoint_policy().every_n_records, 3);

        let wants_at_two = store
            .checkpoint_policy()
            .should_checkpoint(Offset::new(2));
        assert!(wants_at_two);

        let wants_at_three = store
            .checkpoint_policy()
            .should_checkpoint(Offset::new(3));
        assert!(!wants_at_three);
    }

    /// A stream that has never been written to reports no last checkpoint.
    #[test]
    fn empty_stream_has_no_checkpoints() {
        let (mut store, _scratch) = setup_storage();
        let stream = StreamId::new(1);

        let latest = store.last_checkpoint(stream).unwrap();

        assert_eq!(latest, None);
    }
}
/// Property-based tests for record serialization, corruption detection, and
/// hash chaining.
mod proptests {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        // Any payload of up to 999 bytes survives a serialize/parse round trip.
        #[test]
        fn record_roundtrip_any_payload(payload in prop::collection::vec(any::<u8>(), 0..1000)) {
            let source = Record::new(Offset::new(42), None, Bytes::from(payload.clone()));
            let encoded = Bytes::from(source.to_bytes());

            let (decoded, used) = Record::from_bytes(&encoded).unwrap();

            prop_assert_eq!(decoded.offset(), Offset::new(42));
            prop_assert_eq!(decoded.prev_hash(), None);
            prop_assert_eq!(decoded.payload().as_ref(), payload.as_slice());
            prop_assert_eq!(used, encoded.len());
        }

        // Any offset below `u64::MAX` survives a round trip.
        #[test]
        fn record_roundtrip_any_offset(offset in 0u64..u64::MAX) {
            let source = Record::new(Offset::new(offset), None, Bytes::from("test"));
            let encoded = Bytes::from(source.to_bytes());

            let (decoded, _) = Record::from_bytes(&encoded).unwrap();

            prop_assert_eq!(decoded.offset().as_u64(), offset);
        }

        // Any 32-byte previous hash survives a round trip.
        #[test]
        fn record_roundtrip_with_any_prev_hash(hash_bytes in prop::collection::vec(any::<u8>(), 32..=32)) {
            let raw_hash: [u8; 32] = hash_bytes.try_into().unwrap();
            let link = ChainHash::from_bytes(&raw_hash);
            let source = Record::new(Offset::new(1), Some(link), Bytes::from("data"));
            let encoded = Bytes::from(source.to_bytes());

            let (decoded, _) = Record::from_bytes(&encoded).unwrap();

            prop_assert_eq!(decoded.prev_hash(), Some(link));
        }

        // Flipping the low bit of any byte before the final four makes
        // parsing fail.
        #[test]
        fn corruption_is_detected(
            payload in prop::collection::vec(any::<u8>(), 1..100),
            flip_pos in 0usize..1000
        ) {
            let mut encoded = Record::new(Offset::new(0), None, Bytes::from(payload)).to_bytes();

            // Only positions before the last four bytes are eligible.
            let eligible = encoded.len().saturating_sub(4);
            if eligible > 0 {
                let target = flip_pos % eligible;
                encoded[target] ^= 1;

                let outcome = Record::from_bytes(&Bytes::from(encoded));
                prop_assert!(outcome.is_err());
            }
        }

        // Tampering with a record's payload changes its hash, so the next
        // record's `prev_hash` no longer matches the tampered record.
        #[test]
        fn hash_chain_detects_tampering(
            payloads in prop::collection::vec(prop::collection::vec(any::<u8>(), 10..100), 3..10),
            tamper_index in 0usize..9
        ) {
            // Build a chain where each record links to its predecessor.
            let mut chain = Vec::new();
            let mut link = None;
            for (position, body) in payloads.iter().enumerate() {
                let entry = Record::new(Offset::new(position as u64), link, Bytes::from(body.clone()));
                link = Some(entry.compute_hash());
                chain.push(entry);
            }

            // The modulus excludes the last record from being the victim.
            let victim = tamper_index % chain.len().saturating_sub(1);
            if victim < chain.len() {
                let mut altered_body = chain[victim].payload().to_vec();
                if !altered_body.is_empty() {
                    altered_body[0] ^= 1;
                }
                let forged = Record::new(
                    chain[victim].offset(),
                    chain[victim].prev_hash(),
                    Bytes::from(altered_body),
                );

                prop_assert_ne!(
                    forged.compute_hash(),
                    chain[victim].compute_hash(),
                    "Tampering should change record hash"
                );

                if victim + 1 < chain.len() {
                    let successor = &chain[victim + 1];
                    prop_assert_ne!(
                        successor.prev_hash(),
                        Some(forged.compute_hash()),
                        "Next record's prev_hash should not match tampered hash"
                    );
                }
            }
        }

        // Any strict, non-empty prefix of a serialized record fails to parse.
        #[test]
        fn partial_write_detected(
            payload in prop::collection::vec(any::<u8>(), 64..1024),
            truncate_bytes in 1usize..1023
        ) {
            let encoded = Record::new(Offset::new(0), None, Bytes::from(payload)).to_bytes();

            let keep = truncate_bytes % encoded.len();
            if keep > 0 && keep < encoded.len() {
                let prefix = Bytes::from(encoded[..keep].to_vec());
                let outcome = Record::from_bytes(&prefix);
                prop_assert!(outcome.is_err(), "Truncated record should fail to parse");
            }
        }

        // Flipping the low bit at two or more distinct positions (before the
        // final four bytes) makes parsing fail.
        #[test]
        fn multiple_bit_flips_detected(
            payload in prop::collection::vec(any::<u8>(), 32..256),
            flip_positions in prop::collection::vec(0usize..300, 2..5)
        ) {
            let mut encoded = Record::new(Offset::new(0), None, Bytes::from(payload)).to_bytes();

            let eligible = encoded.len().saturating_sub(4);
            if eligible > 0 {
                // Deduplicate so that no flip cancels out another one.
                let distinct: Vec<usize> = flip_positions.iter()
                    .map(|p| p % eligible)
                    .collect::<std::collections::HashSet<_>>()
                    .into_iter()
                    .collect();

                if distinct.len() >= 2 {
                    for target in &distinct {
                        encoded[*target] ^= 1;
                    }
                    let outcome = Record::from_bytes(&Bytes::from(encoded));
                    prop_assert!(outcome.is_err(), "Multiple bit flips should be detected");
                }
            }
        }

        // Replacing any byte (before the final four) with a different value
        // makes parsing fail.
        #[test]
        fn byte_substitution_detected(
            payload in prop::collection::vec(any::<u8>(), 32..256),
            substitute_pos in 0usize..300,
            new_value in any::<u8>()
        ) {
            let mut encoded = Record::new(Offset::new(0), None, Bytes::from(payload)).to_bytes();

            let eligible = encoded.len().saturating_sub(4);
            if eligible > 0 {
                let target = substitute_pos % eligible;
                let previous = encoded[target];
                // Writing the same value back would not be a substitution.
                if new_value != previous {
                    encoded[target] = new_value;
                    let outcome = Record::from_bytes(&Bytes::from(encoded));
                    prop_assert!(outcome.is_err(), "Byte substitution should be detected");
                }
            }
        }
    }
}
/// Overwriting the four leading sentinel bytes with `0xFF` must yield a
/// `TornWrite` error whose reason mentions `RECORD_START`.
#[test]
fn torn_write_missing_record_start_sentinel() {
    let mut encoded = Record::new(Offset::new(42), None, Bytes::from("test")).to_bytes();
    encoded[..4].copy_from_slice(&[0xFF; 4]);

    let outcome = Record::from_bytes(&Bytes::from(encoded));

    assert!(matches!(outcome, Err(StorageError::TornWrite { .. })));
    if let Err(StorageError::TornWrite { reason }) = outcome {
        assert!(reason.contains("RECORD_START"));
    }
}
/// Overwriting the four trailing sentinel bytes with `0xFF` must yield a
/// `TornWrite` error whose reason mentions `RECORD_END`.
#[test]
fn torn_write_missing_record_end_sentinel() {
    let mut encoded = Record::new(Offset::new(42), None, Bytes::from("test")).to_bytes();
    let sentinel_at = encoded.len() - 4;
    encoded[sentinel_at..].copy_from_slice(&[0xFF; 4]);

    let outcome = Record::from_bytes(&Bytes::from(encoded));

    assert!(matches!(outcome, Err(StorageError::TornWrite { .. })));
    if let Err(StorageError::TornWrite { reason }) = outcome {
        assert!(reason.contains("RECORD_END"));
    }
}
/// Cutting off the last four bytes of a record is reported as
/// `UnexpectedEof`.
#[test]
fn torn_write_truncated_record_missing_end_sentinel() {
    let mut encoded = Record::new(Offset::new(42), None, Bytes::from("test")).to_bytes();
    let shortened = encoded.len() - 4;
    encoded.truncate(shortened);

    let outcome = Record::from_bytes(&Bytes::from(encoded));

    assert!(matches!(outcome, Err(StorageError::UnexpectedEof)));
}
/// Truncating an 11-byte-payload record to 55 bytes (partway through the
/// payload, which starts at byte 50) is reported as `UnexpectedEof`.
#[test]
fn torn_write_truncated_record_mid_payload() {
    let mut encoded = Record::new(Offset::new(42), None, Bytes::from("hello world")).to_bytes();
    encoded.truncate(55);

    let outcome = Record::from_bytes(&Bytes::from(encoded));

    assert!(matches!(outcome, Err(StorageError::UnexpectedEof)));
}
/// An untouched record parses cleanly, so torn-write detection does not
/// reject valid data.
#[test]
fn torn_write_detection_with_valid_record() {
    let encoded = Record::new(Offset::new(42), None, Bytes::from("test")).to_bytes();

    let outcome = Record::from_bytes(&Bytes::from(encoded));
    assert!(outcome.is_ok());

    let (decoded, _) = outcome.unwrap();
    assert_eq!(decoded.offset(), Offset::new(42));
    assert_eq!(decoded.payload(), &Bytes::from("test"));
}
/// A record carrying a previous hash parses with that hash and its offset
/// intact.
#[test]
fn torn_write_detection_preserves_hash_chain() {
    let link = Some(kimberlite_crypto::chain_hash(None, b"previous record"));
    let encoded = Record::new(Offset::new(100), link, Bytes::from("data")).to_bytes();

    let (decoded, _) = Record::from_bytes(&Bytes::from(encoded)).unwrap();

    assert_eq!(decoded.prev_hash(), link);
    assert_eq!(decoded.offset(), Offset::new(100));
}
/// For a 10,000-byte payload, damaging the final byte is reported as a
/// `TornWrite`, while the undamaged encoding still parses to the original
/// payload.
#[test]
fn torn_write_large_payload() {
    let body = vec![0xAB; 10000];
    let pristine = Record::new(Offset::new(999), None, Bytes::from(body.clone())).to_bytes();

    // Damage a copy so the pristine bytes remain available afterwards.
    let mut damaged = pristine.clone();
    let last = damaged.len() - 1;
    damaged[last] = 0xFF;
    let outcome = Record::from_bytes(&Bytes::from(damaged));
    assert!(matches!(outcome, Err(StorageError::TornWrite { .. })));

    let (decoded, _) = Record::from_bytes(&Bytes::from(pristine)).unwrap();
    assert_eq!(decoded.payload(), &Bytes::from(body));
}
/// For an empty payload, zeroing the first byte is reported as a `TornWrite`,
/// while the undamaged encoding parses successfully.
#[test]
fn torn_write_empty_payload() {
    let pristine = Record::new(Offset::new(1), None, Bytes::from("")).to_bytes();

    let mut damaged = pristine.clone();
    damaged[0] = 0x00;
    let damaged_outcome = Record::from_bytes(&Bytes::from(damaged));
    assert!(matches!(
        damaged_outcome,
        Err(StorageError::TornWrite { .. })
    ));

    let pristine_outcome = Record::from_bytes(&Bytes::from(pristine));
    assert!(pristine_outcome.is_ok());
}
/// Asking a fresh on-disk `Storage` for the latest chain hash of a stream
/// that was never written succeeds and yields `None`.
#[test]
fn latest_chain_hash_empty_stream_returns_none() {
    let scratch = TempDir::new().expect("temp dir");
    let mut store = Storage::new(scratch.path());

    let tail = store
        .latest_chain_hash(StreamId::new(1))
        .expect("latest_chain_hash should not error on unknown stream");

    assert!(tail.is_none(), "empty stream must return None");
}
/// The tail hash of a stream can be recovered by a new `Storage` instance
/// opened on the same directory, and appending with that hash continues the
/// stream.
///
/// A restart is simulated by letting the first `Storage` go out of scope and
/// constructing a second one over the same path.
#[test]
fn latest_chain_hash_recovers_after_restart() {
    let scratch = TempDir::new().expect("temp dir");
    let stream = StreamId::new(1);

    // First "process": write three events and remember the resulting hash.
    let hash_before = {
        let mut first_instance = Storage::new(scratch.path());
        let batch = vec![
            Bytes::from_static(b"event1"),
            Bytes::from_static(b"event2"),
            Bytes::from_static(b"event3"),
        ];
        let (_next, tail) = first_instance
            .append_batch(stream, batch, Offset::ZERO, None, true)
            .expect("initial append must succeed");
        tail
    };

    // Second "process": a brand-new instance over the same directory.
    let mut second_instance = Storage::new(scratch.path());
    let hash_after = second_instance
        .latest_chain_hash(stream)
        .expect("latest_chain_hash must succeed after restart");
    assert_eq!(
        hash_after,
        Some(hash_before),
        "recovered hash must match the last hash from before restart"
    );

    // The recovered hash must be usable to continue the chain.
    let (after_append, _) = second_instance
        .append_batch(
            stream,
            vec![Bytes::from_static(b"post_restart")],
            Offset::new(3),
            hash_after,
            true,
        )
        .expect("append after restart must succeed");
    assert_eq!(after_append, Offset::new(4));

    let everything = second_instance
        .read_from(stream, Offset::ZERO, 1024 * 1024)
        .expect("verified read must succeed");
    assert_eq!(everything.len(), 4, "all four records must read back");
}
/// Tests for the in-memory `MemoryStorage` backend, including parity checks
/// against the on-disk `Storage` backend.
mod memory_tests {
    use super::*;

    /// Produces `count` payloads named `event-0`, `event-1`, ...
    fn test_events(count: usize) -> Vec<Bytes> {
        (0..count)
            .map(|i| Bytes::from(format!("event-{i}")))
            .collect()
    }

    /// Appending one event returns next offset 1 and reads back verbatim.
    #[test]
    fn append_and_read_single_event() {
        let mut storage = MemoryStorage::new();
        let stream_id = StreamId::new(1);

        let (new_offset, _hash) = storage
            .append_batch(stream_id, test_events(1), Offset::new(0), None, false)
            .unwrap();
        assert_eq!(new_offset, Offset::new(1));

        let events = storage
            .read_from(stream_id, Offset::new(0), u64::MAX)
            .unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].as_ref(), b"event-0");
    }

    /// Five appended events read back in order.
    #[test]
    fn append_and_read_multiple_events() {
        let mut storage = MemoryStorage::new();
        let stream_id = StreamId::new(1);

        storage
            .append_batch(stream_id, test_events(5), Offset::new(0), None, false)
            .unwrap();

        let events = storage
            .read_from(stream_id, Offset::new(0), u64::MAX)
            .unwrap();
        assert_eq!(events.len(), 5);
        for i in 0..5 {
            assert_eq!(events[i].as_ref(), format!("event-{i}").as_bytes());
        }
    }

    /// Reading from offset 5 of a 10-event stream returns events 5 through 9.
    #[test]
    fn read_from_middle_offset() {
        let mut storage = MemoryStorage::new();
        let stream_id = StreamId::new(1);

        storage
            .append_batch(stream_id, test_events(10), Offset::new(0), None, false)
            .unwrap();

        let events = storage
            .read_from(stream_id, Offset::new(5), u64::MAX)
            .unwrap();
        assert_eq!(events.len(), 5);
        assert_eq!(events[0].as_ref(), b"event-5");
        assert_eq!(events[4].as_ref(), b"event-9");
    }

    /// Events appended to two different streams never show up in each other.
    #[test]
    fn multiple_streams_are_isolated() {
        let mut storage = MemoryStorage::new();
        let stream1 = StreamId::new(1);
        let stream2 = StreamId::new(2);

        storage
            .append_batch(
                stream1,
                vec![Bytes::from("stream1-event")],
                Offset::new(0),
                None,
                false,
            )
            .unwrap();
        storage
            .append_batch(
                stream2,
                vec![Bytes::from("stream2-event")],
                Offset::new(0),
                None,
                false,
            )
            .unwrap();

        let events1 = storage
            .read_from(stream1, Offset::new(0), u64::MAX)
            .unwrap();
        let events2 = storage
            .read_from(stream2, Offset::new(0), u64::MAX)
            .unwrap();

        // Checking only index 0 would let a cross-stream leak slip through,
        // so pin the lengths as well (same as the on-disk isolation test).
        assert_eq!(events1.len(), 1);
        assert_eq!(events1[0].as_ref(), b"stream1-event");
        assert_eq!(events2.len(), 1);
        assert_eq!(events2[0].as_ref(), b"stream2-event");
    }

    /// The same batch appended to the memory and disk backends yields the
    /// same next offset and the same chain hash.
    #[test]
    fn hash_chain_matches_on_disk_storage() {
        let stream_id = StreamId::new(42);
        let events = test_events(7);

        let mut mem = MemoryStorage::new();
        let (mem_offset, mem_hash) = mem
            .append_batch(stream_id, events.clone(), Offset::new(0), None, false)
            .unwrap();

        let dir = TempDir::new().unwrap();
        let mut disk = Storage::new(dir.path());
        let (disk_offset, disk_hash) = disk
            .append_batch(stream_id, events, Offset::new(0), None, false)
            .unwrap();

        assert_eq!(
            mem_offset, disk_offset,
            "offsets must match across backends"
        );
        assert_eq!(
            mem_hash, disk_hash,
            "hash chain must be deterministic across backends"
        );
    }

    /// Two chained batches produce the same final hash on both backends.
    #[test]
    fn hash_chain_continuity_matches_on_disk() {
        let stream_id = StreamId::new(1);
        let batch1 = test_events(3);
        let batch2 = vec![Bytes::from("batch2-0"), Bytes::from("batch2-1")];

        let mem_final = {
            let mut mem = MemoryStorage::new();
            let (offset1, hash1) = mem
                .append_batch(stream_id, batch1.clone(), Offset::new(0), None, false)
                .unwrap();
            let (_, final_hash) = mem
                .append_batch(stream_id, batch2.clone(), offset1, Some(hash1), false)
                .unwrap();
            final_hash
        };

        let disk_final = {
            let dir = TempDir::new().unwrap();
            let mut disk = Storage::new(dir.path());
            let (offset1, hash1) = disk
                .append_batch(stream_id, batch1, Offset::new(0), None, false)
                .unwrap();
            let (_, final_hash) = disk
                .append_batch(stream_id, batch2, offset1, Some(hash1), false)
                .unwrap();
            final_hash
        };

        assert_eq!(
            mem_final, disk_final,
            "multi-batch chain hash must be deterministic across backends"
        );
    }

    /// A stream that was never written has no latest chain hash.
    #[test]
    fn latest_chain_hash_empty_stream_returns_none() {
        let mut storage = MemoryStorage::new();
        let stream_id = StreamId::new(1);

        let result = storage.latest_chain_hash(stream_id).unwrap();

        assert!(result.is_none());
    }

    /// `latest_chain_hash` returns the hash reported by the last append.
    #[test]
    fn latest_chain_hash_returns_last_appended() {
        let mut storage = MemoryStorage::new();
        let stream_id = StreamId::new(1);

        let (_, expected_hash) = storage
            .append_batch(stream_id, test_events(3), Offset::new(0), None, false)
            .unwrap();

        let recovered = storage.latest_chain_hash(stream_id).unwrap();
        assert_eq!(recovered, Some(expected_hash));
    }

    /// Appending a record with a `prev_hash` that does not match the real
    /// tail is accepted at write time but makes a later read fail with
    /// `ChainVerificationFailed`.
    #[test]
    fn tampered_buffer_is_detected_on_read() {
        let mut storage = MemoryStorage::new();
        let stream_id = StreamId::new(1);

        let (offset1, _real_hash) = storage
            .append_batch(stream_id, test_events(3), Offset::new(0), None, false)
            .unwrap();

        // Deliberately link the next record to a hash that is not the tail.
        let wrong_hash = ChainHash::from_bytes(&[0x42u8; 32]);
        storage
            .append_batch(
                stream_id,
                vec![Bytes::from("broken")],
                offset1,
                Some(wrong_hash),
                false,
            )
            .unwrap();

        let result = storage.read_from(stream_id, Offset::new(0), u64::MAX);
        assert!(
            matches!(result, Err(StorageError::ChainVerificationFailed { .. })),
            "wrong prev_hash must trigger ChainVerificationFailed, got {result:?}"
        );
    }

    /// Reading a stream that was never written returns an empty result
    /// rather than an error.
    #[test]
    fn empty_stream_read_returns_empty_vec() {
        let mut storage = MemoryStorage::new();
        let stream_id = StreamId::new(99);

        let events = storage
            .read_from(stream_id, Offset::new(0), u64::MAX)
            .unwrap();

        assert!(events.is_empty());
    }

    /// The segment count is 0 before the first append and 1 after it.
    #[test]
    fn segment_count_is_one_for_fresh_stream() {
        let mut storage = MemoryStorage::new();
        let stream_id = StreamId::new(1);

        assert_eq!(storage.segment_count(stream_id), 0);

        storage
            .append_batch(stream_id, test_events(1), Offset::new(0), None, false)
            .unwrap();

        assert_eq!(storage.segment_count(stream_id), 1);
    }

    /// With a maximum segment size of 500, ten events produce at least two
    /// segments and segment 0 is reported as completed.
    #[test]
    fn segment_rotates_when_size_exceeded() {
        let mut storage = MemoryStorage::with_max_segment_size(500);
        let stream_id = StreamId::new(1);

        storage
            .append_batch(stream_id, test_events(10), Offset::new(0), None, false)
            .unwrap();

        assert!(
            storage.segment_count(stream_id) >= 2,
            "expected at least 2 segments after rotation threshold, got {}",
            storage.segment_count(stream_id)
        );
        let completed = storage.completed_segments(stream_id);
        assert!(completed.contains(&0), "segment 0 should be completed");
    }

    /// With a 25-byte read budget, some but not all of ten 10-byte events
    /// come back.
    #[test]
    fn read_respects_max_bytes() {
        let mut storage = MemoryStorage::new();
        let stream_id = StreamId::new(1);

        let events: Vec<Bytes> = (0..10)
            .map(|i| Bytes::from(format!("event-{i:04}")))
            .collect();
        storage
            .append_batch(stream_id, events, Offset::new(0), None, false)
            .unwrap();

        let events = storage.read_from(stream_id, Offset::new(0), 25).unwrap();
        assert!(events.len() < 10);
        assert!(!events.is_empty());
    }

    /// `flush_indexes` succeeds on the memory backend, including when called
    /// twice in a row.
    #[test]
    fn flush_indexes_is_noop_and_ok() {
        let mut storage = MemoryStorage::new();
        assert!(storage.flush_indexes().is_ok());
        assert!(storage.flush_indexes().is_ok());
    }

    /// Appending an empty batch panics with the documented message.
    #[test]
    #[should_panic(expected = "cannot append empty batch")]
    fn empty_batch_panics() {
        let mut storage = MemoryStorage::new();
        let _ = storage.append_batch(StreamId::new(1), Vec::new(), Offset::new(0), None, false);
    }
}