use std::collections::HashSet;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use edgestore::compactor::Compactor;
use edgestore::manifest::Manifest;
use edgestore::segment::SegmentReader;
use edgestore::{CompactionStats, EdgestoreConfig, Engine};
use tempfile::TempDir;
/// Opens an [`Engine`] rooted at `dir` with `segment_size_bytes` set to 512, so tests
/// can produce multiple small segments with few writes. Panics if the engine fails to open.
fn open_engine_small_segments(dir: &TempDir) -> Engine {
    let config = {
        let mut small = EdgestoreConfig::new(dir.path());
        small.segment_size_bytes = 512;
        small
    };
    Engine::open(config).unwrap()
}
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, this yields `0` (the default
/// `Duration`). If the `u128` nanosecond count ever exceeds `i64::MAX`, the result
/// saturates at `i64::MAX` rather than silently wrapping as a bare `as` cast would.
fn now_nanos() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // Clamp before narrowing so the `as` cast can never truncate.
    since_epoch.as_nanos().min(i64::MAX as u128) as i64
}
/// Opens the `manifest.mf` file located directly inside `dir`, panicking on failure.
fn open_manifest(dir: &TempDir) -> Manifest {
    let manifest_path = dir.path().join("manifest.mf");
    Manifest::open(&manifest_path).unwrap()
}
/// COMPACT-04: after every record's 1-second TTL has lapsed, a compaction cycle must
/// relocate nothing, remove at least one segment, and collect at least one cohort.
#[test]
fn test_ttl_expiry_zero_live_relocation() {
    let dir = TempDir::new().unwrap();
    let mut config = EdgestoreConfig::new(dir.path());
    config.segment_size_bytes = 512;
    // Unlimited write budget so the budget cannot cut the cycle short.
    config.compaction_write_budget_bytes = u64::MAX;
    let mut engine = Engine::open(config).unwrap();

    for n in 0u32..20 {
        let user_key = format!("key-{:04}", n);
        engine
            .put_with_ttl(b"ns", user_key.as_bytes(), b"value", 1)
            .unwrap();
    }
    // The flush result is discarded, matching the other tests in this file.
    let _ = engine.flush_to_segments();

    let initial_segments = open_manifest(&dir).list_segments().len();
    assert!(
        initial_segments >= 1,
        "expected at least 1 segment after writes, got {}",
        initial_segments
    );

    // Outlive the 1s TTL on every record before compacting.
    std::thread::sleep(Duration::from_secs(2));
    let stats = engine.compact_once().unwrap();

    assert_eq!(
        stats.live_records_relocated, 0,
        "COMPACT-04: all TTL=1s records should be dead after 2s sleep; live_records_relocated must be 0, got {}",
        stats.live_records_relocated
    );
    assert!(
        stats.segments_removed >= 1,
        "at least 1 segment should be removed after compaction, got {}",
        stats.segments_removed
    );
    assert!(
        stats.cohorts_collected >= 1,
        "at least 1 cohort should be collected, got {}",
        stats.cohorts_collected
    );
}
/// COMPACT-05: when a key exists in more than one segment, a range scan must return
/// the last-written value exactly once, and results must come back sorted by key.
#[test]
fn test_range_scan_overlapping_segments() {
    let dir = TempDir::new().unwrap();
    let mut engine = open_engine_small_segments(&dir);

    // Old value for the overlapping key goes out in the first flush.
    engine.put(b"ns", b"overlap_key", b"v1_old").unwrap();
    let _ = engine.flush_to_segments();

    // Filler writes so the later overwrite lands in a different (newer) segment.
    for i in 0u32..60 {
        engine
            .put(b"ns", format!("unique-{:04}", i).as_bytes(), b"unique_val")
            .unwrap();
    }
    engine.put(b"ns", b"overlap_key", b"v1_new").unwrap();
    let _ = engine.flush_to_segments();

    let seg_count = open_manifest(&dir).list_segments().len();
    assert!(
        seg_count >= 2,
        "expected at least 2 segments for LWW overlap test, got {}",
        seg_count
    );

    let results = engine.range(b"ns", b"", b"\xFF\xFF\xFF\xFF").unwrap();

    let (_, val) = results
        .iter()
        .find(|(k, _)| k == b"overlap_key")
        .expect("overlap_key should be present in range results");
    assert_eq!(
        val, b"v1_new",
        "COMPACT-05: LWW must return newest value for overlap_key; got {:?}",
        String::from_utf8_lossy(val)
    );

    // `HashSet::insert` returns false on a repeat, flagging the first duplicate key.
    let mut seen: HashSet<&[u8]> = HashSet::new();
    for (k, _) in &results {
        assert!(
            seen.insert(k.as_slice()),
            "key {:?} appears more than once in range results (expected 1 — no duplicates)",
            String::from_utf8_lossy(k)
        );
    }

    // Adjacent-pair check: O(n), no cloned/sorted copy of the keys needed.
    assert!(
        results.windows(2).all(|pair| pair[0].0 <= pair[1].0),
        "range results must be sorted by key"
    );
}
/// COMPACT-06: a snapshot taken before compaction must keep returning its values after
/// a compaction cycle runs, even though every record's 1-second TTL has lapsed by then.
/// After the snapshot is dropped, a further compaction cycle must still succeed.
#[test]
fn test_snapshot_survives_compaction() {
    let dir = TempDir::new().unwrap();
    let mut cfg = EdgestoreConfig::new(dir.path());
    cfg.segment_size_bytes = 512;
    cfg.compaction_write_budget_bytes = u64::MAX;
    let mut engine = Engine::open(cfg).unwrap();

    for i in 0u32..15 {
        engine
            .put_with_ttl(b"ns", format!("snap-key-{:04}", i).as_bytes(), b"snap-val", 1)
            .unwrap();
    }
    let _ = engine.flush_to_segments();

    let seg_count_before = open_manifest(&dir).list_segments().len();
    assert!(
        seg_count_before >= 1,
        "need at least 1 segment before snapshot, got {}",
        seg_count_before
    );

    let snapshot = engine.snapshot().unwrap();
    let before_val = snapshot.get(b"ns", b"snap-key-0000").unwrap();
    assert_eq!(
        before_val,
        Some(b"snap-val".to_vec()),
        "snapshot should see key before compaction"
    );

    // Outlive the TTL, then compact while the snapshot is still held.
    std::thread::sleep(Duration::from_secs(2));
    engine.compact_once().unwrap();

    let after_val = snapshot.get(b"ns", b"snap-key-0000").unwrap();
    assert_eq!(
        after_val,
        Some(b"snap-val".to_vec()),
        "COMPACT-06: snapshot must remain readable after compaction; got {:?}",
        after_val.as_deref().map(String::from_utf8_lossy)
    );
    for i in 0u32..15 {
        let key = format!("snap-key-{:04}", i);
        let val = snapshot.get(b"ns", key.as_bytes()).unwrap();
        assert_eq!(
            val,
            Some(b"snap-val".to_vec()),
            "snapshot key {} should still be readable after compaction",
            key
        );
    }

    // With the snapshot released, another compaction cycle must run without error.
    drop(snapshot);
    engine.compact_once().unwrap();
}
/// COMPACT-04: with a 1-byte write budget, one `compact_cycle` over three single-segment
/// cohorts should collect at most two of them and leave at least one segment behind.
#[test]
fn test_compaction_write_budget_enforced() {
    use edgestore::types::{encode_key, MemEntry, Operation};

    let dir = TempDir::new().unwrap();
    let mut manifest = Manifest::open(&dir.path().join("manifest.mf")).unwrap();
    let cohort_window_secs: u64 = 3600;
    let now = now_nanos();

    // NOTE(review): every record below has ttl = 0 and a timestamp 1-3 hours *after*
    // `now`. The merkle test in this file treats ttl = 0 as "never expires". If that holds
    // here, no record is ever dead, compaction may have nothing to do, and both assertions
    // pass trivially without exercising the budget. Confirm against the compactor.
    for cohort_idx in 0u64..3 {
        let write_time_nanos = now + (cohort_idx as i64 + 1) * 3_600_000_000_000_i64;

        let mut sorted: Vec<(Vec<u8>, MemEntry)> = (0..5u64)
            .map(|j| {
                let user_key = format!("cohort{}-key{}", cohort_idx, j);
                let key = encode_key(b"ns", user_key.as_bytes());
                let record = MemEntry {
                    key: key.clone(),
                    value: Some(format!("val-{}-{}", cohort_idx, j).into_bytes()),
                    op: Operation::Put,
                    lsn: cohort_idx * 10 + j + 1,
                    timestamp: write_time_nanos,
                    ttl: 0,
                };
                (key, record)
            })
            .collect();
        // The segment writer is fed entries in key order.
        sorted.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));

        // One segment per cohort; the segment id doubles as the cohort index.
        let mut writer = edgestore::segment::SegmentWriter::new(
            dir.path().to_path_buf(),
            cohort_idx,
            cohort_window_secs,
        );
        let meta = writer.flush(&sorted).unwrap();
        manifest.add_segment(meta).unwrap();
    }
    assert_eq!(manifest.list_segments().len(), 3, "should have 3 segments");

    // Second argument is presumably the write budget in bytes (the merkle test passes
    // u64::MAX there); 1 byte is intended to stop the cycle early.
    let compactor = Compactor::new(dir.path().to_path_buf(), 1, cohort_window_secs);
    let no_pinned_segments: HashSet<u64> = HashSet::new();
    let stats = compactor
        .compact_cycle(&mut manifest, now_nanos(), &no_pinned_segments)
        .unwrap();

    assert!(
        stats.cohorts_collected <= 2,
        "COMPACT-04: write budget should stop compaction early; cohorts_collected={} (expected <= 2)",
        stats.cohorts_collected
    );
    let remaining = manifest.list_segments().len();
    assert!(
        remaining >= 1,
        "some segments should remain after budget-bounded compaction, got {}",
        remaining
    );
}
/// COMPACT-07: after `compact_partial_cohort`, the output segment's stored `merkle_root`
/// must equal a root recomputed from the keys actually present in that segment.
///
/// The recomputation is: blake3 each key, sort the 32-byte digests, then blake3 their
/// concatenation.
#[test]
fn test_merkle_root_correct_after_compaction() {
    use edgestore::compactor::CohortInfo;
    use edgestore::types::{encode_key, MemEntry, Operation};

    let dir = TempDir::new().unwrap();
    let manifest_path = dir.path().join("manifest.mf");
    let mut manifest = Manifest::open(&manifest_path).unwrap();
    let cohort_window_secs: u64 = 3600;
    // Records are written at t = 1h after the epoch and compacted 2s later. The TTL=1s
    // "dead-*" records have then expired; the TTL=0 "live-*" records are the 4 expected
    // to be relocated (asserted at the end).
    let write_time_nanos: i64 = 3_600_000_000_000;
    let now_nanos_compact: i64 = write_time_nanos + 2_000_000_000;

    let entries: Vec<(Vec<u8>, MemEntry)> = {
        let mut v = vec![];
        for i in 0u64..3 {
            let key = encode_key(b"ns", format!("dead-{:04}", i).as_bytes());
            v.push((
                key.clone(),
                MemEntry {
                    key: key.clone(),
                    value: Some(b"dead-val".to_vec()),
                    op: Operation::Put,
                    lsn: i + 1,
                    timestamp: write_time_nanos,
                    ttl: 1,
                },
            ));
        }
        for i in 0u64..4 {
            let key = encode_key(b"ns", format!("live-{:04}", i).as_bytes());
            v.push((
                key.clone(),
                MemEntry {
                    key: key.clone(),
                    value: Some(b"live-val".to_vec()),
                    op: Operation::Put,
                    lsn: 100 + i,
                    timestamp: write_time_nanos,
                    ttl: 0,
                },
            ));
        }
        v.sort_by(|(a, _), (b, _)| a.cmp(b));
        v
    };

    let mut writer =
        edgestore::segment::SegmentWriter::new(dir.path().to_path_buf(), 0, cohort_window_secs);
    let meta0 = writer.flush(&entries).unwrap();
    manifest.add_segment(meta0.clone()).unwrap();

    let cohort = CohortInfo {
        cohort_bucket: meta0.cohort_bucket,
        segment_ids: vec![0],
        max_death_time_nanos: meta0.death_time,
        total_records: meta0.record_count,
        dead_record_estimate: 0,
        is_fully_expired: false,
    };
    let compactor = Compactor::new(dir.path().to_path_buf(), u64::MAX, cohort_window_secs);
    let mut stats = CompactionStats::default();
    // The `1` argument corresponds to the output segment id checked below.
    compactor
        .compact_partial_cohort(&mut manifest, &cohort, now_nanos_compact, 1, &mut stats)
        .unwrap();

    assert_eq!(
        manifest.list_segments().len(),
        1,
        "should have exactly 1 output segment after compaction"
    );
    let output_seg_id = manifest.list_segments()[0].segment_id;
    assert_eq!(output_seg_id, 1, "output segment should have id=1");

    let reader = SegmentReader::open(dir.path().to_path_buf(), output_seg_id).unwrap();
    let stored_merkle_root = reader.meta.merkle_root.clone();
    let all_entries = reader.range_scan(&[], &[0xFFu8; 256]).unwrap();
    // Stronger than "non-empty": exactly the 4 unexpired records must survive, so the
    // merkle root below is checked against the intended key set, not a leaky one.
    assert_eq!(
        all_entries.len(),
        4,
        "output segment should hold exactly the 4 surviving live entries, got {}",
        all_entries.len()
    );

    let mut key_hashes: Vec<[u8; 32]> = all_entries
        .iter()
        .map(|(k, _)| *blake3::hash(k).as_bytes())
        .collect();
    key_hashes.sort_unstable();
    let mut hasher = blake3::Hasher::new();
    for h in &key_hashes {
        hasher.update(h);
    }
    let recomputed_root = hasher.finalize().as_bytes().to_vec();
    assert_eq!(
        stored_merkle_root, recomputed_root,
        "COMPACT-07: merkle_root in output segment meta must match recomputed value"
    );

    assert_eq!(stats.live_records_relocated, 4, "4 live records should be relocated");
    assert_eq!(stats.segments_written, 1);
}