#![cfg(feature = "replication")]
use std::net::TcpListener;
use std::sync::Arc;
use std::time::{Duration, Instant};
use armdb::FixedConfig;
use armdb::FixedTree;
use armdb::ShutdownSignal;
use armdb::fixed_replication::{
FixedReplicationClient, FixedReplicationServer, FixedReplicationTarget,
};
use tempfile::TempDir;
const VLEN: usize = 16;
type K = [u8; 8];
/// Asks the OS for a free TCP port on the loopback interface and returns it.
///
/// The probing listener is dropped before this function returns, so the port
/// is released again; nothing here reserves it for the caller.
fn pick_port() -> u16 {
    TcpListener::bind("127.0.0.1:0")
        .expect("bind ephemeral port")
        .local_addr()
        .expect("local_addr")
        .port()
}
/// Builds the configuration used by every tree in this file: the
/// `FixedConfig::test()` baseline with the requested `shard_count` and a
/// `grow_step` of 64.
fn test_config(shard_count: usize) -> FixedConfig {
    let mut config = FixedConfig::test();
    config.shard_count = shard_count;
    config.grow_step = 64;
    config
}
/// Encodes `i` as an 8-byte big-endian key.
fn kbe(i: u64) -> K {
    u64::to_be_bytes(i)
}
/// Builds the deterministic value paired with key index `i`.
///
/// The first 8 bytes are `i` in big-endian order; the remaining bytes are
/// `i * 31` (wrapping) in big-endian order.
fn vbuf(i: u64) -> [u8; VLEN] {
    let mut bytes = [0u8; VLEN];
    let (head, tail) = bytes.split_at_mut(8);
    head.copy_from_slice(&i.to_be_bytes());
    tail.copy_from_slice(&i.wrapping_mul(31).to_be_bytes());
    bytes
}
/// Polls `cond` every 20 ms until it returns `true` or `timeout` has elapsed.
///
/// Returns `true` as soon as `cond` succeeds. Once the timeout has passed,
/// `cond` is evaluated one final time and that result is returned, so `cond`
/// always runs at least once even with a zero timeout.
fn wait_for(timeout: Duration, mut cond: impl FnMut() -> bool) -> bool {
    let poll_interval = Duration::from_millis(20);
    let began = Instant::now();
    loop {
        if began.elapsed() >= timeout {
            // Out of time: one last check decides the outcome.
            return cond();
        }
        if cond() {
            return true;
        }
        std::thread::sleep(poll_interval);
    }
}
/// Seeds a leader with keys 0..100, deletes keys 0..10, then attaches a fresh
/// follower and checks that it converges on keys 10..100 while none of the
/// deleted keys are visible on it.
#[test]
fn test_initial_sync_empty_follower() {
    let leader_tmp = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_tmp.path(), test_config(2)).expect("open leader"),
    );

    // All leader-side writes happen before the replication server exists.
    for id in 0..100u64 {
        leader.put(&kbe(id), &vbuf(id)).unwrap();
    }
    for id in 0..10u64 {
        leader.delete(&kbe(id)).unwrap();
    }
    leader.flush().unwrap();

    let bind_addr: std::net::SocketAddr =
        format!("127.0.0.1:{}", pick_port()).parse().unwrap();
    let leader_stop = ShutdownSignal::new();
    let server = FixedReplicationServer::start(
        bind_addr,
        leader.fixed_engine_access(),
        leader_stop.clone(),
    )
    .expect("start server");
    // Brief pause before the client is started.
    std::thread::sleep(Duration::from_millis(50));

    let follower_tmp = TempDir::new().unwrap();
    let follower = Arc::new(
        FixedTree::<K, VLEN>::open(follower_tmp.path(), test_config(2)).expect("open follower"),
    );
    let replica: Arc<dyn FixedReplicationTarget> = follower.clone();
    let follower_stop = ShutdownSignal::new();
    let client = FixedReplicationClient::start(bind_addr, replica, follower_stop.clone())
        .expect("start client");

    // Surviving keys must all arrive with their expected values.
    let survivors_present =
        || (10..100u64).all(|id| follower.get(&kbe(id)) == Some(vbuf(id)));
    assert!(
        wait_for(Duration::from_secs(15), survivors_present),
        "follower did not converge: len={}",
        follower.len()
    );

    // Keys deleted on the leader must not be readable on the follower.
    for i in 0..10u64 {
        assert!(
            follower.get(&kbe(i)).is_none(),
            "deleted key {i} leaked to follower"
        );
    }

    // Tear down the follower side first, then the leader side.
    follower_stop.shutdown();
    drop(client);
    leader_stop.shutdown();
    drop(server);
}
/// Connects a follower to an initially empty leader, then writes 500 keys on
/// the leader and expects every one of them to become readable on the
/// follower within 20 seconds.
#[test]
fn test_streaming_live_writes() {
    let leader_tmp = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_tmp.path(), test_config(2)).expect("open leader"),
    );

    let bind_addr: std::net::SocketAddr =
        format!("127.0.0.1:{}", pick_port()).parse().unwrap();
    let leader_stop = ShutdownSignal::new();
    let server = FixedReplicationServer::start(
        bind_addr,
        leader.fixed_engine_access(),
        leader_stop.clone(),
    )
    .expect("start server");
    std::thread::sleep(Duration::from_millis(50));

    let follower_tmp = TempDir::new().unwrap();
    let follower = Arc::new(
        FixedTree::<K, VLEN>::open(follower_tmp.path(), test_config(2)).expect("open follower"),
    );
    let replica: Arc<dyn FixedReplicationTarget> = follower.clone();
    let follower_stop = ShutdownSignal::new();
    let client = FixedReplicationClient::start(bind_addr, replica, follower_stop.clone())
        .expect("start client");
    // Give the client time to run before the leader starts producing writes.
    std::thread::sleep(Duration::from_millis(300));

    for id in 0..500u64 {
        leader.put(&kbe(id), &vbuf(id)).unwrap();
    }
    leader.flush().unwrap();

    let all_present = || (0..500u64).all(|id| follower.get(&kbe(id)) == Some(vbuf(id)));
    assert!(
        wait_for(Duration::from_secs(20), all_present),
        "streaming did not converge: follower len={}",
        follower.len()
    );

    follower_stop.shutdown();
    drop(client);
    leader_stop.shutdown();
    drop(server);
}
/// Runs two consecutive client sessions against the same follower tree.
///
/// The first session must deliver keys 0..50. After it is shut down the
/// leader receives keys 50..100, and a second session must bring the follower
/// to the full 0..100 range.
#[test]
fn test_reconnect_after_disconnect() {
    let leader_tmp = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_tmp.path(), test_config(2)).expect("open leader"),
    );
    for id in 0..50u64 {
        leader.put(&kbe(id), &vbuf(id)).unwrap();
    }
    leader.flush().unwrap();

    let bind_addr: std::net::SocketAddr =
        format!("127.0.0.1:{}", pick_port()).parse().unwrap();
    let leader_stop = ShutdownSignal::new();
    let server = FixedReplicationServer::start(
        bind_addr,
        leader.fixed_engine_access(),
        leader_stop.clone(),
    )
    .expect("start server");
    std::thread::sleep(Duration::from_millis(50));

    let follower_tmp = TempDir::new().unwrap();
    let follower = Arc::new(
        FixedTree::<K, VLEN>::open(follower_tmp.path(), test_config(2)).expect("open follower"),
    );

    // --- first session: catch up on the pre-existing 50 keys ---
    let first_stop = ShutdownSignal::new();
    let first_target: Arc<dyn FixedReplicationTarget> = follower.clone();
    let first_client = FixedReplicationClient::start(bind_addr, first_target, first_stop.clone())
        .expect("start client1");
    let first_half_present =
        || (0..50u64).all(|id| follower.get(&kbe(id)) == Some(vbuf(id)));
    assert!(
        wait_for(Duration::from_secs(15), first_half_present),
        "first-phase catch-up failed"
    );
    first_stop.shutdown();
    drop(first_client);

    // Leader moves on while no client is connected.
    for id in 50..100u64 {
        leader.put(&kbe(id), &vbuf(id)).unwrap();
    }
    leader.flush().unwrap();

    // --- second session: must end with all 100 keys on the follower ---
    let second_stop = ShutdownSignal::new();
    let second_target: Arc<dyn FixedReplicationTarget> = follower.clone();
    let second_client =
        FixedReplicationClient::start(bind_addr, second_target, second_stop.clone())
            .expect("start client2");
    let everything_present =
        || (0..100u64).all(|id| follower.get(&kbe(id)) == Some(vbuf(id)));
    assert!(
        wait_for(Duration::from_secs(15), everything_present),
        "reconnect did not converge: len={}",
        follower.len()
    );

    second_stop.shutdown();
    drop(second_client);
    leader_stop.shutdown();
    drop(server);
}
/// Pairs a 2-shard leader holding 20 keys with a 4-shard follower and checks
/// that, two seconds after the client starts, the follower is still empty.
#[test]
fn test_topology_mismatch_errors() {
    let leader_tmp = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_tmp.path(), test_config(2)).expect("open leader"),
    );
    for id in 0..20u64 {
        leader.put(&kbe(id), &vbuf(id)).unwrap();
    }
    leader.flush().unwrap();

    let bind_addr: std::net::SocketAddr =
        format!("127.0.0.1:{}", pick_port()).parse().unwrap();
    let leader_stop = ShutdownSignal::new();
    let server = FixedReplicationServer::start(
        bind_addr,
        leader.fixed_engine_access(),
        leader_stop.clone(),
    )
    .expect("start server");
    std::thread::sleep(Duration::from_millis(50));

    // The follower deliberately uses a different shard count than the leader.
    let follower_tmp = TempDir::new().unwrap();
    let follower = Arc::new(
        FixedTree::<K, VLEN>::open(follower_tmp.path(), test_config(4))
            .expect("open follower with 4 shards"),
    );
    let replica: Arc<dyn FixedReplicationTarget> = follower.clone();
    let follower_stop = ShutdownSignal::new();
    let client = FixedReplicationClient::start(bind_addr, replica, follower_stop.clone())
        .expect("start client");

    // Fixed observation window; nothing may have been applied by the end of it.
    std::thread::sleep(Duration::from_secs(2));
    let applied = follower.len();
    assert_eq!(applied, 0, "topology-mismatched follower applied events");

    follower_stop.shutdown();
    drop(client);
    leader_stop.shutdown();
    drop(server);
}
/// Syncs 80 keys to a follower, shuts replication down, closes the follower
/// cleanly, checks that every shard directory contains a `fixed.versions`
/// file, and finally reopens the follower and verifies all 80 keys.
#[test]
fn test_follower_restart_clean_uses_sidecar() {
let leader_dir = TempDir::new().unwrap();
let leader = Arc::new(
FixedTree::<K, VLEN>::open(leader_dir.path(), test_config(2)).expect("open leader"),
);
for i in 0..80u64 {
leader.put(&kbe(i), &vbuf(i)).unwrap();
}
leader.flush().unwrap();
let port = pick_port();
let bind_addr = format!("127.0.0.1:{port}").parse().unwrap();
let stop_leader = ShutdownSignal::new();
let server =
FixedReplicationServer::start(bind_addr, leader.fixed_engine_access(), stop_leader.clone())
.expect("start server");
std::thread::sleep(Duration::from_millis(50));
let follower_dir = TempDir::new().unwrap();
let follower = Arc::new(
FixedTree::<K, VLEN>::open(follower_dir.path(), test_config(2)).expect("open follower"),
);
let target: Arc<dyn FixedReplicationTarget> = follower.clone();
let stop_follower = ShutdownSignal::new();
let client = FixedReplicationClient::start(bind_addr, target, stop_follower.clone())
.expect("start client");
// The closure borrows `follower` instead of cloning the Arc, so this test
// itself adds no extra strong reference that would block `try_unwrap` below.
assert!(
wait_for(Duration::from_secs(15), || {
(0..80u64).all(|i| follower.get(&kbe(i)) == Some(vbuf(i)))
}),
"initial sync did not converge"
);
// Stop replication on both sides before taking the follower apart.
stop_follower.shutdown();
drop(client);
stop_leader.shutdown();
drop(server);
// Wait up to 50 x 20 ms for every other holder of the follower Arc to let go.
// NOTE(review): presumably the client's worker threads still hold `target`
// for a moment after shutdown — confirm against the client implementation.
for _ in 0..50 {
if Arc::strong_count(&follower) == 1 {
break;
}
std::thread::sleep(Duration::from_millis(20));
}
// Take the tree out of the Arc; fail loudly if a reference is still alive.
let follower = Arc::try_unwrap(follower).unwrap_or_else(|arc| {
panic!(
"follower still has outstanding Arc refs: strong_count={}",
Arc::strong_count(&arc)
)
});
follower.close().expect("clean close of follower");
let follower_root = follower_dir.path();
// Both shards (the follower was opened with shard_count = 2) must have a
// `fixed.versions` file in their `shard_NNN` directory after the clean close.
for shard_id in 0..2 {
let shard_dir = follower_root.join(format!("shard_{:03}", shard_id));
let sidecar = shard_dir.join("fixed.versions");
assert!(
sidecar.exists(),
"missing versions sidecar for shard {shard_id}: {sidecar:?}"
);
}
// Reopen from the same directory and verify the data survived the restart.
let reopened = FixedTree::<K, VLEN>::open(follower_root, test_config(2))
.expect("reopen follower after clean close");
for i in 0..80u64 {
assert_eq!(
reopened.get(&kbe(i)),
Some(vbuf(i)),
"key {i} missing after reopen via sidecar"
);
}
assert_eq!(reopened.len(), 80);
}
/// Catches a single-shard follower up on 800 pre-existing keys and checks
/// both that it converges (within 5 s) and that the whole catch-up, measured
/// from just before the client starts, took less than 3 s.
#[test]
fn test_catchup_multibatch_does_not_stall() {
    const KEYS: u64 = 800;

    let leader_tmp = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_tmp.path(), test_config(1)).expect("open leader"),
    );
    for id in 0..KEYS {
        leader.put(&kbe(id), &vbuf(id)).unwrap();
    }
    leader.flush().unwrap();

    let bind_addr: std::net::SocketAddr =
        format!("127.0.0.1:{}", pick_port()).parse().unwrap();
    let leader_stop = ShutdownSignal::new();
    let server = FixedReplicationServer::start(
        bind_addr,
        leader.fixed_engine_access(),
        leader_stop.clone(),
    )
    .expect("start server");
    std::thread::sleep(Duration::from_millis(50));

    let follower_tmp = TempDir::new().unwrap();
    let follower = Arc::new(
        FixedTree::<K, VLEN>::open(follower_tmp.path(), test_config(1)).expect("open follower"),
    );
    let replica: Arc<dyn FixedReplicationTarget> = follower.clone();
    let follower_stop = ShutdownSignal::new();

    // The clock starts right before the client does.
    let start = Instant::now();
    let client = FixedReplicationClient::start(bind_addr, replica, follower_stop.clone())
        .expect("start client");

    let caught_up = wait_for(Duration::from_secs(5), || follower.len() as u64 == KEYS);
    assert!(
        caught_up,
        "catch-up did not converge within 5s: len={}, elapsed={:?}",
        follower.len(),
        start.elapsed()
    );

    // Converging is not enough: it also has to be fast.
    let elapsed = start.elapsed();
    assert!(
        elapsed < Duration::from_secs(3),
        "catch-up took {elapsed:?} — suggests Phase-1 is stalling on Acks"
    );

    follower_stop.shutdown();
    drop(client);
    leader_stop.shutdown();
    drop(server);
}
/// Speaks the wire protocol by hand: sends a `SyncRequest` carrying
/// `protocol_version: 99` and expects the server to answer with an `Error`
/// frame whose message contains "protocol version".
#[test]
fn test_protocol_version_mismatch_rejected() {
    use armdb::fixed_replication::protocol::{
        FixedMessageType, SyncRequest, decode_error, read_frame, write_frame,
    };
    use std::io::BufReader;
    use std::net::TcpStream;

    let leader_tmp = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_tmp.path(), test_config(1)).expect("open leader"),
    );

    let bind_addr: std::net::SocketAddr =
        format!("127.0.0.1:{}", pick_port()).parse().unwrap();
    let leader_stop = ShutdownSignal::new();
    let server = FixedReplicationServer::start(
        bind_addr,
        leader.fixed_engine_access(),
        leader_stop.clone(),
    )
    .expect("start server");
    std::thread::sleep(Duration::from_millis(50));

    // Raw socket: one handle for writing, a buffered clone for reading.
    let mut socket = TcpStream::connect(bind_addr).expect("connect to leader");
    socket.set_nodelay(true).ok();
    let mut incoming = BufReader::new(socket.try_clone().expect("clone"));

    let request = SyncRequest {
        shard_id: 0,
        protocol_version: 99,
        flags: 0,
    };
    write_frame(&mut socket, &request.encode()).expect("write frame");

    let reply = read_frame(&mut incoming).expect("read error frame");
    assert_eq!(reply.msg_type, FixedMessageType::Error);
    let msg = decode_error(&reply.payload);
    assert!(
        msg.contains("protocol version"),
        "unexpected error message: {msg}"
    );

    leader_stop.shutdown();
    drop(server);
}
/// Starts and stops one client against an empty leader, starts a second
/// client for the same follower, and then checks that a write made on the
/// leader afterwards reaches the follower within 5 seconds.
#[test]
fn fixed_reconnect_restores_streaming_after_caught_up() {
    let bind_addr: std::net::SocketAddr =
        format!("127.0.0.1:{}", pick_port()).parse().unwrap();

    let leader_tmp = TempDir::new().unwrap();
    let follower_tmp = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_tmp.path(), test_config(2)).expect("open leader"),
    );
    let follower = Arc::new(
        FixedTree::<K, VLEN>::open(follower_tmp.path(), test_config(2)).expect("open follower"),
    );

    let leader_stop = ShutdownSignal::new();
    let server = FixedReplicationServer::start(
        bind_addr,
        leader.fixed_engine_access(),
        leader_stop.clone(),
    )
    .expect("start server");
    std::thread::sleep(Duration::from_millis(50));

    let settle = Duration::from_millis(200);

    // First session: connect, let it run briefly, disconnect.
    let first_stop = ShutdownSignal::new();
    let replica: Arc<dyn FixedReplicationTarget> = follower.clone();
    let first_client =
        FixedReplicationClient::start(bind_addr, replica.clone(), first_stop.clone())
            .expect("start first client");
    std::thread::sleep(settle);
    first_stop.shutdown();
    drop(first_client);
    std::thread::sleep(settle);

    // Second session on the same follower.
    let second_stop = ShutdownSignal::new();
    let second_client = FixedReplicationClient::start(bind_addr, replica, second_stop.clone())
        .expect("start second client");
    std::thread::sleep(settle);

    // A write issued now must be delivered through the second session.
    let key = kbe(99);
    let expected = vbuf(99);
    leader.put(&key, &expected).unwrap();
    let delivered = wait_for(Duration::from_secs(5), || {
        follower.get(&key) == Some(expected)
    });
    assert!(
        delivered,
        "second follower did not receive streaming event after reconnect"
    );

    second_stop.shutdown();
    drop(second_client);
    leader_stop.shutdown();
    drop(server);
}
/// Starts two clients (each with its own follower tree) against one leader,
/// then writes a key on the leader and asserts that the first follower
/// receives it within 5 seconds.
///
/// Only the first follower is asserted on; the second follower's state is
/// never inspected by this test.
#[test]
fn fixed_second_concurrent_follower_is_rejected_and_first_keeps_streaming() {
    let bind_addr: std::net::SocketAddr =
        format!("127.0.0.1:{}", pick_port()).parse().unwrap();

    let leader_tmp = TempDir::new().unwrap();
    let first_tmp = TempDir::new().unwrap();
    let second_tmp = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_tmp.path(), test_config(2)).expect("open leader"),
    );
    let first_follower = Arc::new(
        FixedTree::<K, VLEN>::open(first_tmp.path(), test_config(2)).expect("open follower1"),
    );
    let second_follower = Arc::new(
        FixedTree::<K, VLEN>::open(second_tmp.path(), test_config(2)).expect("open follower2"),
    );

    let leader_stop = ShutdownSignal::new();
    let server = FixedReplicationServer::start(
        bind_addr,
        leader.fixed_engine_access(),
        leader_stop.clone(),
    )
    .expect("start server");
    std::thread::sleep(Duration::from_millis(50));

    // First client gets a head start before the second one is launched.
    let first_stop = ShutdownSignal::new();
    let first_target: Arc<dyn FixedReplicationTarget> = first_follower.clone();
    let first_client = FixedReplicationClient::start(bind_addr, first_target, first_stop.clone())
        .expect("start client1");
    std::thread::sleep(Duration::from_millis(200));

    let second_stop = ShutdownSignal::new();
    let second_target: Arc<dyn FixedReplicationTarget> = second_follower.clone();
    let second_client =
        FixedReplicationClient::start(bind_addr, second_target, second_stop.clone())
            .expect("start client2");
    std::thread::sleep(Duration::from_millis(300));

    // With both clients running, a fresh write must still reach the first follower.
    let key = kbe(55);
    let expected = vbuf(55);
    leader.put(&key, &expected).unwrap();
    let delivered = wait_for(Duration::from_secs(5), || {
        first_follower.get(&key) == Some(expected)
    });
    assert!(
        delivered,
        "first follower did not receive streaming event while second was rejected"
    );

    second_stop.shutdown();
    drop(second_client);
    first_stop.shutdown();
    drop(first_client);
    leader_stop.shutdown();
    drop(server);
}
/// Writes one key on the leader, flips a byte in the leader's on-disk data
/// file, reopens the leader, and then replicates to a follower that already
/// holds a different value for the same key. The test passes when the
/// follower no longer returns anything for that key within 5 seconds.
#[test]
fn fixed_phase1_invalid_occupied_sends_reset_for_non_empty_follower() {
let port = pick_port();
let bind_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
let leader_dir = TempDir::new().unwrap();
let follower_dir = TempDir::new().unwrap();
// Scoped so the first leader handle is dropped before the file is modified.
{
let leader = Arc::new(
FixedTree::<K, VLEN>::open(leader_dir.path(), test_config(1)).expect("open leader"),
);
let old_key = kbe(1);
let old_value = vbuf(10);
leader.put(&old_key, &old_value).unwrap();
}
// Damage the stored data on disk, then open the leader again on top of it.
corrupt_first_slot_value(leader_dir.path());
let leader = Arc::new(
FixedTree::<K, VLEN>::open(leader_dir.path(), test_config(1)).expect("open leader"),
);
let follower = Arc::new(
FixedTree::<K, VLEN>::open(follower_dir.path(), test_config(1)).expect("open follower"),
);
// Pre-populate the follower: same key as the leader's entry, different value.
let stale_key = kbe(1);
let stale_value = vbuf(99);
follower.put(&stale_key, &stale_value).unwrap();
assert_eq!(follower.get(&stale_key), Some(stale_value));
let stop_leader = ShutdownSignal::new();
let server =
FixedReplicationServer::start(bind_addr, leader.fixed_engine_access(), stop_leader.clone())
.unwrap();
// Note: unlike most tests in this file, there is no sleep between starting
// the server and starting the client.
let stop_follower = ShutdownSignal::new();
let target: Arc<dyn FixedReplicationTarget> = follower.clone();
let client = FixedReplicationClient::start(bind_addr, target, stop_follower.clone()).unwrap();
// The pre-existing follower entry must disappear once replication has run.
// NOTE(review): per the test name and message this is driven by a "reset"
// event from the leader — confirm against the replication protocol.
assert!(
wait_for(Duration::from_secs(5), || follower
.get(&stale_key)
.is_none()),
"follower stale mapping was not cleared after reset event"
);
stop_follower.shutdown();
drop(client);
stop_leader.shutdown();
drop(server);
}
use std::path::Path;
/// Inverts every bit of one byte in `<root>/shard_000/fixed.data` and syncs
/// the change to disk.
///
/// The byte sits at offset `4096 + 4 + 8`.
/// NOTE(review): presumably a 4096-byte file header, a 4-byte slot header and
/// the 8-byte key, i.e. the first value byte of slot 0 — confirm against the
/// on-disk format.
///
/// Panics if the file cannot be opened, read, written or synced.
fn corrupt_first_slot_value(root: &Path) {
    use std::io::{Read, Seek, SeekFrom, Write};

    const TARGET_OFFSET: u64 = 4096 + 4 + 8;

    let path = root.join("shard_000").join("fixed.data");
    let mut data = std::fs::OpenOptions::new()
        .read(true)
        .write(true)
        .open(path)
        .unwrap();

    // Read the current byte so it can be written back inverted.
    let mut cell = [0u8; 1];
    data.seek(SeekFrom::Start(TARGET_OFFSET)).unwrap();
    data.read_exact(&mut cell).unwrap();
    let flipped = [!cell[0]];

    // Reading advanced the cursor; rewind to the same offset before writing.
    data.seek(SeekFrom::Start(TARGET_OFFSET)).unwrap();
    data.write_all(&flipped).unwrap();
    data.sync_data().unwrap();
}
/// Repeats a connect / disconnect / write / reconnect cycle three times and
/// requires that each reconnecting client delivers the key written on the
/// leader while no client was running.
#[test]
fn fixed_reconnect_returns_consumer_multiple_times() {
    let bind_addr: std::net::SocketAddr =
        format!("127.0.0.1:{}", pick_port()).parse().unwrap();

    let leader_tmp = TempDir::new().unwrap();
    let follower_tmp = TempDir::new().unwrap();
    let leader = Arc::new(
        FixedTree::<K, VLEN>::open(leader_tmp.path(), test_config(2)).expect("open leader"),
    );
    let follower = Arc::new(
        FixedTree::<K, VLEN>::open(follower_tmp.path(), test_config(2)).expect("open follower"),
    );

    let leader_stop = ShutdownSignal::new();
    let server = FixedReplicationServer::start(
        bind_addr,
        leader.fixed_engine_access(),
        leader_stop.clone(),
    )
    .expect("start server");

    let replica: Arc<dyn FixedReplicationTarget> = follower.clone();
    let pause = Duration::from_millis(100);

    for i in 0..3u64 {
        // Short-lived session that is torn down before anything is written.
        let warmup_stop = ShutdownSignal::new();
        let warmup_client =
            FixedReplicationClient::start(bind_addr, replica.clone(), warmup_stop.clone())
                .unwrap();
        std::thread::sleep(pause);
        warmup_stop.shutdown();
        drop(warmup_client);
        std::thread::sleep(pause);

        // Write while disconnected; the key is unique per iteration.
        let key = kbe(10_000 + i);
        let value = vbuf(20_000 + i);
        leader.put(&key, &value).unwrap();

        // A new session must deliver that write.
        let session_stop = ShutdownSignal::new();
        let session_client =
            FixedReplicationClient::start(bind_addr, replica.clone(), session_stop.clone())
                .unwrap();
        let delivered = wait_for(Duration::from_secs(5), || follower.get(&key) == Some(value));
        assert!(
            delivered,
            "reconnect iteration {i} did not receive streaming updates"
        );
        session_stop.shutdown();
        drop(session_client);
        std::thread::sleep(pause);
    }

    leader_stop.shutdown();
    drop(server);
}