#![cfg(feature = "replication")]
use std::net::TcpListener;
use std::sync::Arc;
use std::time::{Duration, Instant};
use armdb::FixedConfig;
use armdb::FixedTree;
use armdb::ShutdownSignal;
use armdb::fixed_replication::{
FixedReplicationClient, FixedReplicationServer, FixedReplicationTarget,
};
use tempfile::TempDir;
/// Value width, in bytes, of every tree opened by these tests (values are built by `vbuf`).
const VLEN: usize = 16;
/// Key type used by every tree in these tests; keys are built from a `u64` by `kbe`.
type K = [u8; 8];
/// Asks the OS for a free loopback port by binding to port 0 and reading back
/// the port it assigned.
///
/// The temporary listener is released before returning, so the caller can bind
/// the port itself. Another process could grab the port in that gap; this is
/// acceptable for tests.
fn pick_port() -> u16 {
    TcpListener::bind("127.0.0.1:0")
        .expect("bind ephemeral port")
        .local_addr()
        .expect("local_addr")
        .port()
}
/// Returns `FixedConfig::test()` with `shard_count` overridden and `grow_step`
/// set to 64; every other field keeps its test default.
fn test_config(shard_count: usize) -> FixedConfig {
    let grow_step = 64;
    FixedConfig {
        grow_step,
        shard_count,
        ..FixedConfig::test()
    }
}
/// Encodes `i` as the 8-byte big-endian key used throughout these tests.
fn kbe(i: u64) -> K {
    u64::to_be_bytes(i)
}
/// Builds the deterministic value stored for key `i`.
///
/// Bytes `0..8` hold `i` big-endian. The remaining bytes hold `i * 31`
/// (wrapping) big-endian, so each key has a distinct, checkable value.
fn vbuf(i: u64) -> [u8; VLEN] {
    let head = i.to_be_bytes();
    let tail = i.wrapping_mul(31).to_be_bytes();
    let mut value = [0u8; VLEN];
    let (front, back) = value.split_at_mut(8);
    front.copy_from_slice(&head);
    back.copy_from_slice(&tail);
    value
}
/// Repeatedly evaluates `cond`, sleeping 20ms between attempts, until it
/// returns `true` or `timeout` has elapsed.
///
/// Returns `true` as soon as `cond` succeeds. Once the timeout has passed,
/// `cond` is evaluated one last time and that result is returned.
fn wait_for(timeout: Duration, mut cond: impl FnMut() -> bool) -> bool {
    let began = Instant::now();
    loop {
        if began.elapsed() >= timeout {
            // Deadline reached: one final check decides the outcome.
            return cond();
        }
        if cond() {
            return true;
        }
        std::thread::sleep(Duration::from_millis(20));
    }
}
/// An empty follower that attaches to a leader which already holds data must
/// converge to the leader's live keys. Keys the leader deleted before the
/// follower connected must never show up on the follower.
#[test]
fn test_initial_sync_empty_follower() {
    // Leader: write keys 0..100, then delete 0..10, all before any follower exists.
    let leader_dir = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_dir.path(), test_config(2)).expect("open leader"),
    );
    for key_id in 0..100u64 {
        leader.put(&kbe(key_id), &vbuf(key_id)).unwrap();
    }
    for key_id in 0..10u64 {
        leader.delete(&kbe(key_id)).unwrap();
    }
    leader.flush().unwrap();

    let addr = std::net::SocketAddr::from(([127, 0, 0, 1], pick_port()));
    let leader_stop = ShutdownSignal::new();
    let server =
        FixedReplicationServer::start(addr, leader.fixed_engine_access(), leader_stop.clone())
            .expect("start server");
    std::thread::sleep(Duration::from_millis(50));

    let follower_dir = TempDir::new().unwrap();
    let replica = Arc::new(
        FixedTree::<K, VLEN>::open(follower_dir.path(), test_config(2)).expect("open follower"),
    );
    let follower_stop = ShutdownSignal::new();
    let client = FixedReplicationClient::start(
        addr,
        Arc::clone(&replica) as Arc<dyn FixedReplicationTarget>,
        follower_stop.clone(),
    )
    .expect("start client");

    let converged = wait_for(Duration::from_secs(15), || {
        (10..100u64).all(|key_id| replica.get(&kbe(key_id)) == Some(vbuf(key_id)))
    });
    assert!(converged, "follower did not converge: len={}", replica.len());

    // Keys deleted on the leader before the follower attached must not leak.
    if let Some(leaked) = (0..10u64).find(|&key_id| replica.get(&kbe(key_id)).is_some()) {
        panic!("deleted key {leaked} leaked to follower");
    }

    follower_stop.shutdown();
    drop(client);
    leader_stop.shutdown();
    drop(server);
}
/// Writes made on the leader after a follower has attached must be replicated
/// to that follower.
#[test]
fn test_streaming_live_writes() {
    const LIVE_KEYS: u64 = 500;

    let leader_dir = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_dir.path(), test_config(2)).expect("open leader"),
    );
    let port = pick_port();
    let bind_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
    let leader_stop = ShutdownSignal::new();
    let server =
        FixedReplicationServer::start(bind_addr, leader.fixed_engine_access(), leader_stop.clone())
            .expect("start server");
    std::thread::sleep(Duration::from_millis(50));

    let follower_dir = TempDir::new().unwrap();
    let follower = Arc::new(
        FixedTree::<K, VLEN>::open(follower_dir.path(), test_config(2)).expect("open follower"),
    );
    let follower_stop = ShutdownSignal::new();
    let client = FixedReplicationClient::start(
        bind_addr,
        Arc::clone(&follower) as Arc<dyn FixedReplicationTarget>,
        follower_stop.clone(),
    )
    .expect("start client");

    // Presumably gives the client time to connect, so the writes below reach the
    // follower via live streaming rather than an initial sync — TODO confirm.
    std::thread::sleep(Duration::from_millis(300));
    (0..LIVE_KEYS).for_each(|i| {
        leader.put(&kbe(i), &vbuf(i)).unwrap();
    });
    leader.flush().unwrap();

    let all_present = || (0..LIVE_KEYS).all(|i| follower.get(&kbe(i)) == Some(vbuf(i)));
    assert!(
        wait_for(Duration::from_secs(20), all_present),
        "streaming did not converge: follower len={}",
        follower.len()
    );

    follower_stop.shutdown();
    drop(client);
    leader_stop.shutdown();
    drop(server);
}
/// A follower that is disconnected and later reattached with a new client must
/// catch up on the writes the leader accepted while it was away.
#[test]
fn test_reconnect_after_disconnect() {
    let leader_dir = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_dir.path(), test_config(2)).expect("open leader"),
    );
    // Writes `range` to the leader and flushes it.
    let write_range = |range: std::ops::Range<u64>| {
        for i in range {
            leader.put(&kbe(i), &vbuf(i)).unwrap();
        }
        leader.flush().unwrap();
    };
    write_range(0..50);

    let port = pick_port();
    let bind_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
    let stop_leader = ShutdownSignal::new();
    let server =
        FixedReplicationServer::start(bind_addr, leader.fixed_engine_access(), stop_leader.clone())
            .expect("start server");
    std::thread::sleep(Duration::from_millis(50));

    let follower_dir = TempDir::new().unwrap();
    let follower = Arc::new(
        FixedTree::<K, VLEN>::open(follower_dir.path(), test_config(2)).expect("open follower"),
    );
    // Starts a fresh replication client for the same follower tree.
    let attach = |label: &str| {
        let stop = ShutdownSignal::new();
        let target: Arc<dyn FixedReplicationTarget> = follower.clone();
        let client = FixedReplicationClient::start(bind_addr, target, stop.clone()).expect(label);
        (stop, client)
    };
    // True when the follower holds the expected value for every key in 0..upto.
    let mirrors = |upto: u64| (0..upto).all(|i| follower.get(&kbe(i)) == Some(vbuf(i)));

    // Phase 1: initial catch-up, then disconnect.
    let (first_stop, first_client) = attach("start client1");
    assert!(
        wait_for(Duration::from_secs(15), || mirrors(50)),
        "first-phase catch-up failed"
    );
    first_stop.shutdown();
    drop(first_client);

    // Phase 2: the leader moves on while the follower is detached, then it reconnects.
    write_range(50..100);
    let (second_stop, second_client) = attach("start client2");
    assert!(
        wait_for(Duration::from_secs(15), || mirrors(100)),
        "reconnect did not converge: len={}",
        follower.len()
    );

    second_stop.shutdown();
    drop(second_client);
    stop_leader.shutdown();
    drop(server);
}
/// A follower whose shard count (4) differs from the leader's (2) must not
/// apply any replicated events. After a 2s grace period it must still be empty.
#[test]
fn test_topology_mismatch_errors() {
    const LEADER_SHARDS: usize = 2;
    const FOLLOWER_SHARDS: usize = 4;

    let leader_dir = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_dir.path(), test_config(LEADER_SHARDS))
            .expect("open leader"),
    );
    for key_id in 0..20u64 {
        leader.put(&kbe(key_id), &vbuf(key_id)).unwrap();
    }
    leader.flush().unwrap();

    let bind_addr = std::net::SocketAddr::from(([127, 0, 0, 1], pick_port()));
    let leader_stop = ShutdownSignal::new();
    let server =
        FixedReplicationServer::start(bind_addr, leader.fixed_engine_access(), leader_stop.clone())
            .expect("start server");
    std::thread::sleep(Duration::from_millis(50));

    let follower_dir = TempDir::new().unwrap();
    let follower = Arc::new(
        FixedTree::<K, VLEN>::open(follower_dir.path(), test_config(FOLLOWER_SHARDS))
            .expect("open follower with 4 shards"),
    );
    let follower_stop = ShutdownSignal::new();
    let client = FixedReplicationClient::start(
        bind_addr,
        Arc::clone(&follower) as Arc<dyn FixedReplicationTarget>,
        follower_stop.clone(),
    )
    .expect("start client");

    // No condition can signal that nothing happened, so allow a fixed window in
    // which a faulty implementation would have applied events.
    std::thread::sleep(Duration::from_secs(2));
    assert_eq!(
        follower.len(),
        0,
        "topology-mismatched follower applied events"
    );

    follower_stop.shutdown();
    drop(client);
    leader_stop.shutdown();
    drop(server);
}
/// After a clean `close()`, a replicated follower must have written a
/// `fixed.versions` sidecar in each shard directory. Reopening it must then
/// recover every key that was replicated before the close.
#[test]
fn test_follower_restart_clean_uses_sidecar() {
    let leader_dir = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_dir.path(), test_config(2)).expect("open leader"),
    );
    for i in 0..80u64 {
        leader.put(&kbe(i), &vbuf(i)).unwrap();
    }
    leader.flush().unwrap();
    let port = pick_port();
    let bind_addr = format!("127.0.0.1:{port}").parse().unwrap();
    let stop_leader = ShutdownSignal::new();
    let server =
        FixedReplicationServer::start(bind_addr, leader.fixed_engine_access(), stop_leader.clone())
            .expect("start server");
    std::thread::sleep(Duration::from_millis(50));
    let follower_dir = TempDir::new().unwrap();
    let follower = Arc::new(
        FixedTree::<K, VLEN>::open(follower_dir.path(), test_config(2)).expect("open follower"),
    );
    let target: Arc<dyn FixedReplicationTarget> = follower.clone();
    let stop_follower = ShutdownSignal::new();
    let client = FixedReplicationClient::start(bind_addr, target, stop_follower.clone())
        .expect("start client");
    assert!(
        wait_for(Duration::from_secs(15), || {
            (0..80u64).all(|i| follower.get(&kbe(i)) == Some(vbuf(i)))
        }),
        "initial sync did not converge"
    );
    stop_follower.shutdown();
    drop(client);
    stop_leader.shutdown();
    drop(server);
    // Replication threads may still be releasing their clones of the follower
    // `Arc` after shutdown. Poll for up to ~1s, using the file's shared helper,
    // until ours is the only reference. The result is deliberately ignored:
    // `try_unwrap` below reports a leak together with the outstanding count.
    let _ = wait_for(Duration::from_secs(1), || Arc::strong_count(&follower) == 1);
    let follower = Arc::try_unwrap(follower).unwrap_or_else(|arc| {
        panic!(
            "follower still has outstanding Arc refs: strong_count={}",
            Arc::strong_count(&arc)
        )
    });
    follower.close().expect("clean close of follower");
    let follower_root = follower_dir.path();
    // Expected on-disk layout (as asserted here): one `shard_NNN` directory per
    // shard, each holding a `fixed.versions` sidecar after a clean close.
    for shard_id in 0..2 {
        let sidecar = follower_root
            .join(format!("shard_{shard_id:03}"))
            .join("fixed.versions");
        assert!(
            sidecar.exists(),
            "missing versions sidecar for shard {shard_id}: {sidecar:?}"
        );
    }
    let reopened = FixedTree::<K, VLEN>::open(follower_root, test_config(2))
        .expect("reopen follower after clean close");
    for i in 0..80u64 {
        assert_eq!(
            reopened.get(&kbe(i)),
            Some(vbuf(i)),
            "key {i} missing after reopen via sidecar"
        );
    }
    assert_eq!(reopened.len(), 80, "unexpected key count after reopen");
}
/// Catching up a follower on a leader with many keys (enough to presumably
/// span several batches) must finish quickly. A slow catch-up suggests Phase-1
/// is stalling while it waits for Acks. The replicated data must also be
/// correct, not just the key count.
#[test]
fn test_catchup_multibatch_does_not_stall() {
    const KEYS: u64 = 800;
    let leader_dir = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_dir.path(), test_config(1)).expect("open leader"),
    );
    for i in 0..KEYS {
        leader.put(&kbe(i), &vbuf(i)).unwrap();
    }
    leader.flush().unwrap();
    let port = pick_port();
    let bind_addr = format!("127.0.0.1:{port}").parse().unwrap();
    let stop_leader = ShutdownSignal::new();
    let server =
        FixedReplicationServer::start(bind_addr, leader.fixed_engine_access(), stop_leader.clone())
            .expect("start server");
    std::thread::sleep(Duration::from_millis(50));
    let follower_dir = TempDir::new().unwrap();
    let follower = Arc::new(
        FixedTree::<K, VLEN>::open(follower_dir.path(), test_config(1)).expect("open follower"),
    );
    let target: Arc<dyn FixedReplicationTarget> = follower.clone();
    let stop_follower = ShutdownSignal::new();
    let start = Instant::now();
    let client = FixedReplicationClient::start(bind_addr, target, stop_follower.clone())
        .expect("start client");
    let f = follower.clone();
    assert!(
        wait_for(Duration::from_secs(5), || f.len() as u64 == KEYS),
        "catch-up did not converge within 5s: len={}, elapsed={:?}",
        follower.len(),
        start.elapsed()
    );
    // Capture the timing before any further verification so it is not inflated.
    let elapsed = start.elapsed();
    assert!(
        elapsed < Duration::from_secs(3),
        "catch-up took {elapsed:?} — suggests Phase-1 is stalling on Acks"
    );
    // A matching `len()` only proves the count. Also check that every key carries
    // the leader's value, so wrong keys or corrupted values are caught.
    for i in 0..KEYS {
        assert_eq!(
            follower.get(&kbe(i)),
            Some(vbuf(i)),
            "key {i} has wrong value after catch-up"
        );
    }
    stop_follower.shutdown();
    drop(client);
    stop_leader.shutdown();
    drop(server);
}
/// A raw client that sends a `SyncRequest` with an unsupported protocol version
/// (99) must get back an `Error` frame whose message mentions "protocol version".
#[test]
fn test_protocol_version_mismatch_rejected() {
    use armdb::fixed_replication::protocol::{
        FixedMessageType, SyncRequest, decode_error, read_frame, write_frame,
    };
    use std::io::BufReader;
    use std::net::TcpStream;
    let leader_dir = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_dir.path(), test_config(1)).expect("open leader"),
    );
    let port = pick_port();
    let bind_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
    let stop_leader = ShutdownSignal::new();
    let server =
        FixedReplicationServer::start(bind_addr, leader.fixed_engine_access(), stop_leader.clone())
            .expect("start server");
    std::thread::sleep(Duration::from_millis(50));
    let mut stream = TcpStream::connect(bind_addr).expect("connect to leader");
    // Best-effort latency tweak; failure to set it is irrelevant to the test.
    stream.set_nodelay(true).ok();
    // Bound the wait for the server's reply. Without a read timeout, a server that
    // never answers would hang the test instead of failing it. The timeout is set
    // before `try_clone`, whose handle refers to the same socket and shares its options.
    stream
        .set_read_timeout(Some(Duration::from_secs(5)))
        .expect("set read timeout");
    let mut reader = BufReader::new(stream.try_clone().expect("clone"));
    let bogus = SyncRequest {
        shard_id: 0,
        protocol_version: 99,
        flags: 0,
    };
    write_frame(&mut stream, &bogus.encode()).expect("write frame");
    let frame = read_frame(&mut reader).expect("read error frame");
    assert_eq!(frame.msg_type, FixedMessageType::Error);
    let msg = decode_error(&frame.payload);
    assert!(
        msg.contains("protocol version"),
        "unexpected error message: {msg}"
    );
    // Close our end of the connection before stopping the server, in case a
    // server-side handler is still reading from it during shutdown.
    drop(reader);
    drop(stream);
    stop_leader.shutdown();
    drop(server);
}