use std::time::Duration;
use edgestore::{EdgestoreConfig, Engine, ImportResult};
use edgestore::replication::SegmentRef;
use tempfile::TempDir;
/// Opens an engine rooted at `dir` with the configuration returned by
/// `EdgestoreConfig::new`.
///
/// Panics if the engine cannot be opened.
fn open_engine(dir: &TempDir) -> Engine {
    let config = EdgestoreConfig::new(dir.path());
    Engine::open(config).unwrap()
}
/// Opens an engine rooted at `dir` with `segment_size_bytes` lowered to 256.
///
/// NOTE(review): presumably the small segment size lets a handful of short
/// records span more than one segment; confirm against the engine's
/// rollover rules.
///
/// Panics if the engine cannot be opened.
fn open_engine_small(dir: &TempDir) -> Engine {
    let mut small_config = EdgestoreConfig::new(dir.path());
    small_config.segment_size_bytes = 256;
    Engine::open(small_config).unwrap()
}
/// Returns the hash and the raw `.dat` bytes of the first segment listed in
/// the engine's exported manifest.
///
/// Panics if the manifest cannot be exported, is empty, or the segment file
/// cannot be read from the engine's database directory.
fn read_first_segment(engine: &Engine) -> ([u8; 32], Vec<u8>) {
    let manifest = engine.export_manifest().unwrap();
    assert!(!manifest.is_empty(), "engine must have at least one segment");

    let first = &manifest[0];
    let file_name = format!("segment-{:08}.dat", first.segment_id);
    let bytes =
        std::fs::read(engine.db_path().join(file_name)).expect("segment .dat file must exist");

    (first.segment_hash, bytes)
}
/// SC2: a sync interrupted after the first segment can be resumed from a
/// cursor file persisted on disk.
///
/// Flow:
/// 1. Engine A writes ten keys in two flushed batches and is reopened.
/// 2. Engine B (empty) computes which of A's segments it is missing.
/// 3. B imports the first missing segment, the remaining hashes are written
///    to a MessagePack cursor file, and B is dropped and reopened.
/// 4. The cursor is read back and must match what was written; re-importing
///    the first segment must be `Skipped`; the segments listed in the
///    *reloaded* cursor are imported.
/// 5. All ten keys must be readable from B.
#[test]
fn test_sc2_cursor_durability() {
    let dir_a = TempDir::new().unwrap();
    let dir_b = TempDir::new().unwrap();

    // Populate A in two flushed batches, then drop it so the reopened engine
    // below only sees what reached disk.
    {
        let mut a = open_engine_small(&dir_a);
        for i in 0u32..5 {
            a.put(b"ns", format!("key{:03}", i).as_bytes(), b"value_a")
                .unwrap();
        }
        // NOTE(review): the flush result is ignored here while every other
        // test unwraps it; confirm whether a failure is expected (e.g. nothing
        // left to flush) before tightening this.
        let _ = a.flush_to_segments();
        for i in 5u32..10 {
            a.put(b"ns", format!("key{:03}", i).as_bytes(), b"value_a")
                .unwrap();
        }
        let _ = a.flush_to_segments();
    }

    let a = open_engine_small(&dir_a);
    let peer_segments: Vec<SegmentRef> = a.export_manifest().unwrap();
    assert!(
        peer_segments.len() >= 2,
        "Engine A must have at least 2 segments; got {}",
        peer_segments.len()
    );

    let mut b = open_engine(&dir_b);
    // B's own root is computed only to exercise the call on an empty engine;
    // the comparison itself goes through `compare_merkle`.
    let _b_root = b.range_merkle_root().unwrap();
    let a_root = a.range_merkle_root().unwrap();
    assert!(
        !b.compare_merkle(&a_root).unwrap(),
        "B and A must have diverged Merkle roots before sync"
    );

    let missing = b.missing_segments(&peer_segments);
    assert!(
        missing.len() >= 2,
        "B must be missing at least 2 segments; got {}",
        missing.len()
    );

    // Step 1 of the sync: import only the first missing segment.
    let hash0 = missing[0];
    let hash0_hex: String = hash0.iter().map(|byte| format!("{:02x}", byte)).collect();
    let seg_ref0 = peer_segments
        .iter()
        .find(|s| s.segment_hash == hash0)
        .expect("hash0 must be in A's manifest");
    let dat_path0 = dir_a
        .path()
        .join(format!("segment-{:08}.dat", seg_ref0.segment_id));
    let data0 = std::fs::read(&dat_path0).unwrap();
    let result0 = b.import_segment(&data0, &hash0).unwrap();
    assert!(
        matches!(result0, ImportResult::Applied { keys_written, .. } if keys_written >= 1),
        "first import must be Applied with keys_written >= 1"
    );

    // Persist a cursor describing the still-pending segments.
    let cursor_dir = dir_b.path().join("sync");
    std::fs::create_dir_all(&cursor_dir).unwrap();
    let cursor_path = cursor_dir.join("peer-a.cursor");

    // Local stand-in for the on-disk cursor layout used by this test.
    #[derive(serde::Serialize, serde::Deserialize, Default)]
    struct CursorCompat {
        last_known_merkle_root: Vec<u8>,
        segments_pending: Vec<Vec<u8>>,
        last_attempt_secs: u64,
        segments_applied_total: u64,
    }

    let pending_hashes: Vec<Vec<u8>> = missing[1..].iter().map(|h| h.to_vec()).collect();
    let cursor = CursorCompat {
        last_known_merkle_root: vec![],
        segments_pending: pending_hashes.clone(),
        last_attempt_secs: 0,
        segments_applied_total: 1,
    };
    let cursor_bytes = rmp_serde::to_vec(&cursor).unwrap();
    std::fs::write(&cursor_path, &cursor_bytes).unwrap();

    // Simulate a restart of B between the partial sync and the resume.
    drop(b);
    let mut b = open_engine(&dir_b);

    let cursor_file = std::fs::File::open(&cursor_path).unwrap();
    let loaded: CursorCompat = rmp_serde::from_read(cursor_file).unwrap();
    assert!(
        !loaded.segments_pending.is_empty(),
        "cursor must have pending segments after partial sync"
    );
    // The reloaded cursor must carry exactly what was persisted; otherwise the
    // resume below would silently work from stale or corrupted state.
    assert_eq!(
        loaded.segments_pending, pending_hashes,
        "cursor must round-trip the pending segment hashes unchanged"
    );
    assert_eq!(
        loaded.segments_applied_total, 1,
        "cursor must round-trip the applied-segment counter unchanged"
    );

    // The segment applied before the restart must be recognised as present.
    let result_retry = b.import_segment(&data0, &hash0).unwrap();
    assert!(
        matches!(result_retry, ImportResult::Skipped),
        "re-importing already-applied segment {} must return Skipped",
        hash0_hex
    );

    // Resume from the cursor read back from disk, not from in-memory state.
    for hash_vec in &loaded.segments_pending {
        let mut hash = [0u8; 32];
        // Panics if the stored hash is not exactly 32 bytes long.
        hash.copy_from_slice(hash_vec);
        let seg_ref = peer_segments
            .iter()
            .find(|s| s.segment_hash == hash)
            .expect("pending hash must be in A's manifest");
        let dat_path = dir_a
            .path()
            .join(format!("segment-{:08}.dat", seg_ref.segment_id));
        let data = std::fs::read(&dat_path).unwrap();
        let r = b.import_segment(&data, &hash).unwrap();
        assert!(
            matches!(r, ImportResult::Applied { .. } | ImportResult::Skipped),
            "remaining import must be Applied or Skipped"
        );
    }

    // After the full sync every key written to A must be visible in B.
    for i in 0u32..10 {
        let key = format!("key{:03}", i);
        let val = b.get(b"ns", key.as_bytes()).unwrap();
        assert!(
            val.is_some(),
            "SC2: key {} must be readable in B after full sync",
            key
        );
    }
}
/// SC3a: when B wrote the same key after A did, importing A's segment into B
/// must leave B's value in place.
#[test]
fn test_sc3_lww_higher_timestamp_wins() {
    let dir_a = TempDir::new().unwrap();
    let dir_b = TempDir::new().unwrap();

    // A writes first; the pause puts B's write later in wall-clock time.
    let mut a = open_engine(&dir_a);
    a.put(b"ns", b"shared", b"from_a").unwrap();
    std::thread::sleep(Duration::from_millis(5));

    let mut b = open_engine(&dir_b);
    b.put(b"ns", b"shared", b"from_b").unwrap();

    a.flush_to_segments().unwrap();
    let (segment_hash, segment_bytes) = read_first_segment(&a);

    match b.import_segment(&segment_bytes, &segment_hash).unwrap() {
        ImportResult::HashMismatch => {
            panic!("SC3a: unexpected HashMismatch on import");
        }
        // A wholesale skip is tolerated; the value check below still applies.
        ImportResult::Skipped => {}
        ImportResult::Applied { keys_skipped, .. } => {
            assert!(
                keys_skipped >= 1,
                "SC3a: 'shared' key should be skipped (B timestamp > A timestamp); keys_skipped={}",
                keys_skipped
            );
        }
    }

    let stored = b.get(b"ns", b"shared").unwrap();
    assert_eq!(
        stored,
        Some(b"from_b".to_vec()),
        "SC3a: B's higher-timestamp record must win; expected 'from_b', got {:?}",
        stored.as_deref().map(String::from_utf8_lossy)
    );
}
/// SC3b: when both engines hold the same key and the incoming record's
/// timestamp is not newer than the local one, the local record must be kept.
#[test]
fn test_sc3_lww_collision_local_wins() {
    let dir_a = TempDir::new().unwrap();
    let dir_b = TempDir::new().unwrap();
    let mut a = open_engine(&dir_a);
    let mut b = open_engine(&dir_b);

    // A writes first, B writes after a short pause.
    a.put(b"ns", b"collision", b"from_alpha").unwrap();
    std::thread::sleep(Duration::from_millis(2));
    b.put(b"ns", b"collision", b"from_beta").unwrap();

    a.flush_to_segments().unwrap();
    let (segment_hash, segment_bytes) = read_first_segment(&a);

    let outcome = b.import_segment(&segment_bytes, &segment_hash).unwrap();
    match outcome {
        ImportResult::HashMismatch => panic!("SC3b: unexpected HashMismatch"),
        // A wholesale skip is tolerated; the value check below still applies.
        ImportResult::Skipped => {}
        ImportResult::Applied { keys_skipped, .. } => {
            assert!(
                keys_skipped >= 1,
                "SC3b: 'collision' must be skipped because B's timestamp >= A's; keys_skipped={}",
                keys_skipped
            );
        }
    }

    let stored = b.get(b"ns", b"collision").unwrap();
    assert_eq!(
        stored,
        Some(b"from_beta".to_vec()),
        "SC3b: local record must win on timestamp tie/lower-incoming; got {:?}",
        stored.as_deref().map(String::from_utf8_lossy)
    );
}
/// C-01: a key that was put and then deleted on A must read as absent on B
/// once A's flushed segment has been imported there.
#[test]
fn test_import_delete_tombstone_replicated() {
    let dir_a = TempDir::new().unwrap();
    let dir_b = TempDir::new().unwrap();

    // Source engine: write, delete, then flush so both operations land in a segment.
    let mut a = open_engine(&dir_a);
    a.put(b"ns", b"del_me", b"val").unwrap();
    a.delete(b"ns", b"del_me").unwrap();
    a.flush_to_segments().unwrap();
    let (segment_hash, segment_bytes) = read_first_segment(&a);

    // Target engine starts empty and imports the segment.
    let mut b = open_engine(&dir_b);
    let outcome = b.import_segment(&segment_bytes, &segment_hash).unwrap();
    assert!(
        matches!(outcome, ImportResult::Applied { .. }),
        "C-01: import must be Applied"
    );

    let stored = b.get(b"ns", b"del_me").unwrap();
    assert_eq!(
        stored, None,
        "C-01: delete tombstone must be replicated; expected None, got {:?}",
        stored
    );
}
/// C-02: a key imported into B must be readable both immediately and after
/// B has been dropped and reopened on the same directory.
#[test]
fn test_import_post_restart_get() {
    let dir_a = TempDir::new().unwrap();
    let dir_b = TempDir::new().unwrap();

    let mut a = open_engine(&dir_a);
    a.put(b"ns", b"persist", b"after_restart").unwrap();
    a.flush_to_segments().unwrap();
    let (segment_hash, segment_bytes) = read_first_segment(&a);

    // First lifetime of B: import and read back straight away.
    let mut importer = open_engine(&dir_b);
    let outcome = importer
        .import_segment(&segment_bytes, &segment_hash)
        .unwrap();
    assert!(
        matches!(outcome, ImportResult::Applied { keys_written, .. } if keys_written >= 1),
        "C-02: import must apply at least one key"
    );
    let before_restart = importer.get(b"ns", b"persist").unwrap();
    assert_eq!(before_restart, Some(b"after_restart".to_vec()));
    // Close the engine before reopening the same directory.
    drop(importer);

    // Second lifetime of B: the imported value must still be there.
    let reopened = open_engine(&dir_b);
    let after_restart = reopened.get(b"ns", b"persist").unwrap();
    assert_eq!(
        after_restart,
        Some(b"after_restart".to_vec()),
        "C-02: imported data must be readable after engine restart"
    );
}
/// C-03: keys imported into B must be returned by range and prefix queries
/// after B has been dropped and reopened.
#[test]
fn test_import_post_restart_range() {
    let dir_a = TempDir::new().unwrap();
    let dir_b = TempDir::new().unwrap();

    let mut a = open_engine(&dir_a);
    a.put(b"ns", b"alpha", b"1").unwrap();
    a.put(b"ns", b"beta", b"2").unwrap();
    a.put(b"ns", b"gamma", b"3").unwrap();
    a.flush_to_segments().unwrap();
    let (segment_hash, segment_bytes) = read_first_segment(&a);

    // Import into B, then close it before querying through a fresh handle.
    let mut importer = open_engine(&dir_b);
    let outcome = importer
        .import_segment(&segment_bytes, &segment_hash)
        .unwrap();
    assert!(matches!(outcome, ImportResult::Applied { .. }));
    drop(importer);

    let b = open_engine(&dir_b);

    // The expected result omits "gamma", i.e. the upper bound is not returned.
    let keys: Vec<String> = b
        .range(b"ns", b"alpha", b"gamma")
        .unwrap()
        .into_iter()
        .map(|(raw_key, _)| String::from_utf8_lossy(&raw_key).into_owned())
        .collect();
    assert_eq!(
        keys,
        vec!["alpha", "beta"],
        "C-03: range scan must include imported keys after restart; got {:?}",
        keys
    );

    let prefix_results = b.prefix(b"ns", b"be").unwrap();
    assert_eq!(
        prefix_results.len(),
        1,
        "C-03: prefix query must return 'beta' after restart"
    );
    assert_eq!(prefix_results[0].0, b"beta");
}