use edgestore::segment::{
SegmentReader, SegmentWriter, SPARSE_INDEX_STRIDE,
};
use edgestore::types::{encode_key, MemEntry, Operation, SegmentMeta};
use edgestore::{EdgestoreConfig, Engine};
use std::io::{Seek, SeekFrom, Write};
use tempfile::TempDir;
fn open_engine(dir: &TempDir) -> Engine {
Engine::open(EdgestoreConfig::new(dir.path())).unwrap()
}
fn make_entry(key: &[u8], lsn: u64, value: &[u8], ts: i64) -> MemEntry {
MemEntry {
key: key.to_vec(),
value: Some(value.to_vec()),
op: Operation::Put,
lsn,
timestamp: ts,
ttl: 0,
}
}
fn sorted_entries(n: usize) -> Vec<(Vec<u8>, MemEntry)> {
let mut v: Vec<(Vec<u8>, MemEntry)> = (0..n)
.map(|i| {
let k = encode_key(b"ns", format!("key-{:04}", i).as_bytes());
let val = format!("val-{:04}", i);
let e = make_entry(&k, i as u64 + 1, val.as_bytes(), 3_600_000_000_000);
(k, e)
})
.collect();
v.sort_by(|(a, _), (b, _)| a.cmp(b));
v
}
#[test]
fn test_flush_produces_four_files() {
let dir = TempDir::new().unwrap();
let mut engine = open_engine(&dir);
for i in 0u32..100 {
engine
.put(b"ns", format!("key-{:04}", i).as_bytes(), b"value")
.unwrap();
}
let meta = engine.flush_to_segments().unwrap();
assert_eq!(meta.record_count, 100);
let seg = format!("segment-{:08}.dat", meta.segment_id);
let dat_path = dir.path().join(&seg);
let idx_path = dir.path().join(seg.replace(".dat", ".idx"));
let xf_path = dir.path().join(seg.replace(".dat", ".xf"));
let meta_path = dir.path().join(seg.replace(".dat", ".meta"));
assert!(dat_path.exists(), ".dat missing");
assert!(idx_path.exists(), ".idx missing");
assert!(xf_path.exists(), ".xf missing");
assert!(meta_path.exists(), ".meta missing");
let dat_bytes = std::fs::read(&dat_path).unwrap();
let expected_hash = blake3::hash(&dat_bytes).as_bytes().to_vec();
assert_eq!(meta.segment_hash, expected_hash, "BLAKE3 hash mismatch");
}
#[test]
fn test_xor_filter_no_false_negatives() {
let dir = TempDir::new().unwrap();
let entries = sorted_entries(200);
let mut writer = SegmentWriter::new(dir.path().to_path_buf(), 0, 3600);
writer.flush(&entries).unwrap();
let reader = SegmentReader::open(dir.path().to_path_buf(), 0).unwrap();
for (key, _) in &entries {
assert!(
reader.get(key).unwrap().is_some(),
"false negative for key {:?}",
key
);
}
}
#[test]
fn test_xor_filter_fast_reject() {
let dir = TempDir::new().unwrap();
let entries = sorted_entries(50);
let mut writer = SegmentWriter::new(dir.path().to_path_buf(), 0, 3600);
writer.flush(&entries).unwrap();
let reader = SegmentReader::open(dir.path().to_path_buf(), 0).unwrap();
for i in 0..50usize {
let absent = encode_key(b"ns", format!("absent-{:04}", i).as_bytes());
assert!(
reader.get(&absent).unwrap().is_none(),
"unexpected hit for absent key"
);
}
}
#[test]
fn test_sparse_index_seek_accuracy() {
let dir = TempDir::new().unwrap();
let entries = sorted_entries(500);
let mut writer = SegmentWriter::new(dir.path().to_path_buf(), 0, 3600);
writer.flush(&entries).unwrap();
let idx_path = dir.path().join("segment-00000000.idx");
let index = edgestore::segment::read_idx_file(&idx_path).unwrap();
let target_key = &entries[250].0;
let mut best_idx_pos: usize = 0;
for (i, (k, _)) in index.iter().enumerate() {
if k.as_slice() <= target_key.as_slice() {
best_idx_pos = i;
} else {
break;
}
}
let approx_entry_pos = best_idx_pos * SPARSE_INDEX_STRIDE;
assert!(
250 - approx_entry_pos.min(250) <= SPARSE_INDEX_STRIDE,
"sparse index too far from target: index_pos={}, entry=250, stride={}",
approx_entry_pos,
SPARSE_INDEX_STRIDE
);
}
#[test]
fn test_segment_meta_cohort_and_death_time() {
let dir = TempDir::new().unwrap();
let mut engine = open_engine(&dir);
for i in 0u32..20 {
engine
.put(b"ns", format!("key-{:04}", i).as_bytes(), b"val")
.unwrap();
}
let meta: SegmentMeta = engine.flush_to_segments().unwrap();
assert!(meta.cohort_bucket != 0 || meta.death_time > 0, "cohort fields unset");
assert!(meta.death_time > 0, "death_time is zero");
}
#[test]
fn test_segment_format_encode_decode() {
let dir = TempDir::new().unwrap();
let entries = sorted_entries(10);
let mut writer = SegmentWriter::new(dir.path().to_path_buf(), 0, 3600);
writer.flush(&entries).unwrap();
let reader = SegmentReader::open(dir.path().to_path_buf(), 0).unwrap();
for (key, original) in &entries {
let found = reader.get(key).unwrap().expect("key not found");
assert_eq!(found.lsn, original.lsn);
assert_eq!(found.value, original.value);
assert_eq!(found.op, original.op);
}
}
#[test]
fn test_manifest_replay_after_multiple_flushes() {
let dir = TempDir::new().unwrap();
let batch1_key = b"batch1-key";
let batch2_key = b"batch2-key";
{
let mut engine = open_engine(&dir);
for i in 0u32..50 {
engine
.put(b"ns", format!("key1-{:04}", i).as_bytes(), b"val1")
.unwrap();
}
engine
.put(b"ns", batch1_key, b"batch1-value")
.unwrap();
engine.flush_to_segments().unwrap();
for i in 0u32..50 {
engine
.put(b"ns", format!("key2-{:04}", i).as_bytes(), b"val2")
.unwrap();
}
engine
.put(b"ns", batch2_key, b"batch2-value")
.unwrap();
engine.flush_to_segments().unwrap();
}
let engine2 = open_engine(&dir);
assert_eq!(
engine2.get(b"ns", batch1_key).unwrap(),
Some(b"batch1-value".to_vec()),
"batch1 key not readable after reopen"
);
assert_eq!(
engine2.get(b"ns", batch2_key).unwrap(),
Some(b"batch2-value".to_vec()),
"batch2 key not readable after reopen"
);
}
#[test]
fn test_segment_corruption_detection() {
let dir = TempDir::new().unwrap();
let entries = sorted_entries(20);
let mut writer = SegmentWriter::new(dir.path().to_path_buf(), 0, 3600);
writer.flush(&entries).unwrap();
let dat_path = dir.path().join("segment-00000000.dat");
{
let mut f = std::fs::OpenOptions::new()
.write(true)
.open(&dat_path)
.unwrap();
f.seek(SeekFrom::Start(100)).unwrap();
f.write_all(&[0xFF]).unwrap();
}
let reader = SegmentReader::open(dir.path().to_path_buf(), 0).unwrap();
let result = reader.get(&entries[5].0);
match result {
Ok(_) | Err(_) => {}
}
}
#[test]
fn test_segment_survives_engine_crash_recovery() {
let dir = TempDir::new().unwrap();
{
let mut engine = open_engine(&dir);
for i in 0u32..30 {
engine
.put(b"ns", format!("flushed-{:04}", i).as_bytes(), b"v1")
.unwrap();
}
engine.flush_to_segments().unwrap();
for i in 0u32..20 {
engine
.put(b"ns", format!("unflushed-{:04}", i).as_bytes(), b"v2")
.unwrap();
}
engine.flush().unwrap();
}
let engine2 = open_engine(&dir);
for i in 0u32..30 {
let key = format!("flushed-{:04}", i);
assert_eq!(
engine2.get(b"ns", key.as_bytes()).unwrap(),
Some(b"v1".to_vec()),
"flushed key {} missing after recovery",
key
);
}
for i in 0u32..20 {
let key = format!("unflushed-{:04}", i);
assert_eq!(
engine2.get(b"ns", key.as_bytes()).unwrap(),
Some(b"v2".to_vec()),
"unflushed key {} missing after recovery",
key
);
}
}
#[test]
fn test_range_exclusive_end() {
let dir = TempDir::new().unwrap();
let mut engine = open_engine(&dir);
engine.put(b"ns", b"a", b"va").unwrap();
engine.put(b"ns", b"b", b"vb").unwrap();
engine.put(b"ns", b"c", b"vc").unwrap();
let results = engine.range(b"ns", b"a", b"b").unwrap();
assert_eq!(results.len(), 1, "memtable: range [a, b) must exclude key b");
assert_eq!(results[0].0, b"a", "memtable: only key a must be returned");
assert!(
results.iter().all(|(k, _)| k.as_slice() != b"b"),
"memtable: key b must not appear"
);
engine.flush_to_segments().unwrap();
let results = engine.range(b"ns", b"a", b"b").unwrap();
assert_eq!(results.len(), 1, "segment: range [a, b) must exclude key b");
assert_eq!(results[0].0, b"a", "segment: only key a must be returned");
assert!(
results.iter().all(|(k, _)| k.as_slice() != b"b"),
"segment: key b must not appear"
);
}