use std::sync::{Arc, Mutex};
use std::time::Duration;
use edgestore::{EdgestoreConfig, Engine, RemoteStore};
use edgestore_repl::{AntiEntropyLoop, FilesystemRemoteStore, HttpReplicationServer};
use tempfile::TempDir;
fn open_engine(dir: &TempDir) -> Engine {
Engine::open(EdgestoreConfig::new(dir.path())).unwrap()
}
fn open_engine_small(dir: &TempDir) -> Engine {
let mut cfg = EdgestoreConfig::new(dir.path());
cfg.segment_size_bytes = 256;
Engine::open(cfg).unwrap()
}
#[test]
fn test_sc1_two_engine_sync_via_http() {
let dir_a = TempDir::new().unwrap();
let dir_b = TempDir::new().unwrap();
let engine_a: Arc<Mutex<Engine>> = {
let mut a = open_engine_small(&dir_a);
for i in 0u32..10 {
a.put(b"ns", format!("key{:03}", i).as_bytes(), b"value_from_a")
.unwrap();
}
let _ = a.flush_to_segments();
let refs = a.export_manifest().unwrap();
assert!(
!refs.is_empty(),
"SC1: Engine A must have at least 1 segment after writes+flush"
);
Arc::new(Mutex::new(a))
};
let server = HttpReplicationServer::new(Arc::clone(&engine_a));
let (_handle_a, port_a) = server.start("127.0.0.1:0").unwrap();
let peer_url = format!("http://127.0.0.1:{}", port_a);
std::thread::sleep(Duration::from_millis(50));
let engine_b: Arc<Mutex<Engine>> = Arc::new(Mutex::new(open_engine(&dir_b)));
{
let a = engine_a.lock().unwrap();
let b = engine_b.lock().unwrap();
let a_root = a.range_merkle_root().unwrap();
let b_root = b.range_merkle_root().unwrap();
assert_ne!(
a_root, b_root,
"SC1: A and B must have different Merkle roots before sync"
);
}
let loop_b = AntiEntropyLoop::new(
Arc::clone(&engine_b),
peer_url,
"peer-a".to_string(),
dir_b.path().to_path_buf(),
)
.with_interval(1);
let _handle_b = loop_b.start();
std::thread::sleep(Duration::from_secs(4));
let a_root = engine_a.lock().unwrap().range_merkle_root().unwrap();
let b_in_sync = engine_b
.lock()
.unwrap()
.compare_merkle(&a_root)
.unwrap();
assert!(
b_in_sync,
"SC1: B's Merkle root must match A's after anti-entropy sync"
);
let b = engine_b.lock().unwrap();
for i in 0u32..10 {
let key = format!("key{:03}", i);
let val = b.get(b"ns", key.as_bytes()).unwrap();
assert!(
val.is_some(),
"SC1: key '{}' must be readable in B after sync",
key
);
assert_eq!(
val.unwrap(),
b"value_from_a".to_vec(),
"SC1: key '{}' value mismatch in B after sync",
key
);
}
}
#[test]
fn test_sc4_filesystem_remote_store_roundtrip() {
let dir = TempDir::new().unwrap();
let store = FilesystemRemoteStore::new(dir.path().to_path_buf())
.expect("FilesystemRemoteStore::new must succeed");
let data: Vec<u8> = (0u16..1024).map(|i| (i % 256) as u8).collect();
let hash: [u8; 32] = *blake3::hash(&data).as_bytes();
store.upload(&hash, &data).expect("upload must succeed");
let downloaded = store.download(&hash).expect("download must succeed");
assert_eq!(
downloaded, data,
"SC4: downloaded bytes must exactly match uploaded bytes"
);
let downloaded_hash: [u8; 32] = *blake3::hash(&downloaded).as_bytes();
assert_eq!(
downloaded_hash, hash,
"SC4: BLAKE3 of downloaded bytes must match original hash"
);
store.delete(&hash).expect("delete must succeed");
let result = store.download(&hash);
assert!(
result.is_err(),
"SC4: download after delete must return Err; got Ok"
);
}
#[test]
fn test_sc5_debug_json_endpoint() {
let dir_a = TempDir::new().unwrap();
let engine_a: Arc<Mutex<Engine>> = {
let mut a = open_engine(&dir_a);
a.put(b"ns", b"k1", b"v1").unwrap();
a.put(b"ns", b"k2", b"v2").unwrap();
a.put(b"ns", b"k3", b"v3").unwrap();
a.flush_to_segments().unwrap();
Arc::new(Mutex::new(a))
};
let server = HttpReplicationServer::new(Arc::clone(&engine_a));
let (_handle, port) = server.start("127.0.0.1:0").unwrap();
std::thread::sleep(Duration::from_millis(100));
let base = format!("http://127.0.0.1:{}", port);
let resp = ureq::get(&format!("{}/merkle?debug=json", base))
.call()
.expect("GET /merkle?debug=json must succeed");
assert_eq!(
resp.status(),
200,
"SC5: /merkle?debug=json must return 200"
);
let content_type = resp
.header("Content-Type")
.unwrap_or("")
.to_string();
assert!(
content_type.contains("application/json"),
"SC5: /merkle?debug=json Content-Type must be application/json; got '{}'",
content_type
);
let body = resp.into_string().expect("must read body as string");
let json: serde_json::Value =
serde_json::from_str(&body).expect("SC5: /merkle?debug=json body must be valid JSON");
assert!(
json.is_object(),
"SC5: /merkle?debug=json must return a JSON object"
);
assert!(
json.get("root").is_some(),
"SC5: /merkle?debug=json must contain 'root' key; got: {}",
json
);
let resp2 = ureq::get(&format!("{}/segments?debug=json", base))
.call()
.expect("GET /segments?debug=json must succeed");
assert_eq!(
resp2.status(),
200,
"SC5: /segments?debug=json must return 200"
);
let body2 = resp2.into_string().expect("must read body as string");
let json2: serde_json::Value =
serde_json::from_str(&body2).expect("SC5: /segments?debug=json body must be valid JSON");
assert!(
json2.is_array(),
"SC5: /segments?debug=json must return a JSON array; got: {}",
json2
);
let arr = json2.as_array().unwrap();
assert!(
!arr.is_empty(),
"SC5: /segments?debug=json array must have at least 1 element (we flushed 3 records)"
);
for (i, element) in arr.iter().enumerate() {
assert!(
element.get("segment_id").is_some(),
"SC5: segment[{}] must have 'segment_id' key; got: {}",
i,
element
);
assert!(
element.get("segment_hash").is_some(),
"SC5: segment[{}] must have 'segment_hash' key; got: {}",
i,
element
);
}
}