#![cfg(feature = "redex")]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use net::adapter::net::channel::ChannelName;
use net::adapter::net::redex::{
PlacementStrategy, Redex, RedexFileConfig, ReplicaRole, ReplicationConfig, TransitionSignal,
};
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
// Socket buffer size (256 KiB) applied to both the send and receive buffers of every test node.
const TEST_BUFFER_SIZE: usize = 256 * 1024;
// Fixed 32-byte pre-shared key (every byte 0x42) handed to `MeshNodeConfig::new` for all test nodes.
const PSK: [u8; 32] = [0x42u8; 32];
/// Builds the `MeshNodeConfig` shared by every node in this test file.
///
/// The node binds to `127.0.0.1:0`, uses the fixed test `PSK`, and gets
/// `TEST_BUFFER_SIZE` send/receive socket buffers. Timing knobs: 200 ms
/// heartbeat interval, 5 s session timeout, 250 ms capability GC interval.
fn test_config() -> MeshNodeConfig {
    let socket_buffers = SocketBufferConfig {
        send_buffer_size: TEST_BUFFER_SIZE,
        recv_buffer_size: TEST_BUFFER_SIZE,
    };
    let bind_addr = "127.0.0.1:0".parse::<SocketAddr>().unwrap();

    let mut config = MeshNodeConfig::new(bind_addr, PSK)
        .with_heartbeat_interval(Duration::from_millis(200))
        .with_session_timeout(Duration::from_secs(5))
        // NOTE(review): presumably (attempt count, per-attempt timeout) — confirm against
        // `MeshNodeConfig::with_handshake`.
        .with_handshake(3, Duration::from_secs(2))
        .with_capability_gc_interval(Duration::from_millis(250));
    config.socket_buffers = socket_buffers;
    config
}
/// Creates a mesh node from a freshly generated keypair and the shared test configuration.
///
/// # Panics
/// Panics with `"MeshNode::new"` if node construction fails.
async fn build_node() -> Arc<MeshNode> {
    let identity = EntityKeypair::generate();
    let node = MeshNode::new(identity, test_config())
        .await
        .expect("MeshNode::new");
    Arc::new(node)
}
/// Connects `a` to `b` (see `handshake_no_start`) and then calls `start()` on both nodes,
/// `a` first.
async fn handshake(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
    handshake_no_start(a, b).await;
    for node in [a, b] {
        node.start();
    }
}
async fn handshake_no_start(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
let a_id = a.node_id();
let b_id = b.node_id();
let b_pub = *b.public_key();
let b_addr = b.local_addr();
let b_clone = b.clone();
let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
a.connect(b_addr, &b_pub, b_id).await.expect("connect");
accept.await.expect("accept task").expect("accept");
}
/// Calls `start()` on every node in `nodes`, in slice order.
fn start_all(nodes: &[&Arc<MeshNode>]) {
    nodes.iter().for_each(|node| {
        node.start();
    });
}
/// Shorthand for building a `ChannelName` from a string literal in tests.
///
/// # Panics
/// Panics if `ChannelName::new` rejects `s`.
fn cn(s: &str) -> ChannelName {
    let parsed = ChannelName::new(s);
    parsed.unwrap()
}
/// End-to-end catch-up between two nodes.
///
/// Node A is driven to `Leader` and node B to `Replica` for the channel `repl/e2e`. A appends
/// `N` events locally; the test then polls B (up to 5 s) until its tail reaches `N`, checks
/// that B holds the same events in order, and finally inspects A's replication metrics.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn two_node_replication_catches_replica_up() {
    const N: u64 = 32;

    // Two connected, started mesh nodes, each backing its own Redex instance.
    let node_a = build_node().await;
    let node_b = build_node().await;
    handshake(&node_a, &node_b).await;

    let redex_a = Arc::new(Redex::new());
    let redex_b = Arc::new(Redex::new());
    redex_a.enable_replication(node_a.clone());
    redex_b.enable_replication(node_b.clone());

    // Same channel and replication settings on both sides, pinned to exactly {A, B}.
    let name = cn("repl/e2e");
    let placement = PlacementStrategy::Pinned(vec![node_a.node_id(), node_b.node_id()]);
    let replication = ReplicationConfig::new()
        .with_heartbeat_ms(150)
        .with_placement(placement);
    let cfg = RedexFileConfig::default().with_replication(Some(replication));

    let file_a = redex_a.open_file(&name, cfg.clone()).expect("open A");
    let file_b = redex_b.open_file(&name, cfg).expect("open B");
    assert_eq!(redex_a.replication_runtime_count(), 1);
    assert_eq!(redex_b.replication_runtime_count(), 1);

    let coord_a = redex_a.replication_coordinator_for(&name).expect("coord A");
    let coord_b = redex_b.replication_coordinator_for(&name).expect("coord B");

    // Walk A step by step up to Leader; B only needs to become a Replica.
    for (role, signal, step) in [
        (
            ReplicaRole::Replica,
            TransitionSignal::CapabilitySelected,
            "A → Replica",
        ),
        (
            ReplicaRole::Candidate,
            TransitionSignal::MissedHeartbeats,
            "A → Candidate",
        ),
        (
            ReplicaRole::Leader,
            TransitionSignal::ElectionWon,
            "A → Leader",
        ),
    ] {
        coord_a.transition_to(role, signal).await.expect(step);
    }
    coord_b
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .expect("B → Replica");
    assert_eq!(coord_a.role(), ReplicaRole::Leader);
    assert_eq!(coord_b.role(), ReplicaRole::Replica);

    // Append on the leader only.
    for i in 0..N {
        let payload = format!("event-{i}");
        file_a.append(payload.as_bytes()).expect("append leader");
    }
    assert_eq!(file_a.next_seq(), N);
    // NOTE(review): this assumes nothing has been replicated yet at this point; if replication
    // can run concurrently with the append loop this precondition could race — confirm.
    assert_eq!(file_b.next_seq(), 0);

    // Poll B's tail every 50 ms until it matches the leader or the 5 s deadline passes.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    let mut last_b_tail = 0u64;
    loop {
        if tokio::time::Instant::now() >= deadline {
            break;
        }
        last_b_tail = file_b.next_seq();
        if last_b_tail == N {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    assert_eq!(
        last_b_tail, N,
        "replica did not catch up to leader's tail within 5s (got {last_b_tail}, expected {N})"
    );

    // B must hold the same events, in sequence order, with identical payloads.
    let events = file_b.read_range(0, N);
    assert_eq!(events.len(), N as usize);
    for (i, ev) in events.iter().enumerate() {
        let expected = format!("event-{i}");
        assert_eq!(ev.entry.seq, i as u64, "event {i} out of order");
        assert_eq!(
            ev.payload.as_ref(),
            expected.as_bytes(),
            "event {i} payload mismatch",
        );
    }

    // Leader-side metrics: bytes were shipped and exactly one leader change was recorded.
    let snap_a = redex_a
        .replication_metrics_snapshot()
        .expect("snapshot on enabled Redex");
    let chan_a = snap_a
        .channels
        .iter()
        .find(|c| c.channel == "repl/e2e")
        .expect("channel in snapshot");
    assert!(
        chan_a.sync_bytes_total > 0,
        "leader's sync_bytes_total should bump on SyncResponse ship; got {}",
        chan_a.sync_bytes_total
    );
    assert_eq!(
        chan_a.leader_changes_total, 1,
        "leader changed exactly once (Idle → Replica → Candidate → Leader)"
    );

    redex_a.close_file(&name).expect("close A");
    redex_b.close_file(&name).expect("close B");
}
/// Steady state under heartbeats: with A as `Leader` and B as `Replica` on `repl/heartbeat`,
/// both roles must still hold once B has had a chance to observe the leader.
///
/// The previous wait loop polled `coord_b.role() == Replica`, which is already true as soon as
/// `transition_to` returns, so the role assertions ran before any 150 ms heartbeat could be
/// exchanged. The test now waits (best-effort, up to 3 s) for B's metrics to show that it has
/// seen the leader before asserting the roles.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn two_node_heartbeat_records_believed_leader() {
    let node_a = build_node().await;
    let node_b = build_node().await;
    handshake(&node_a, &node_b).await;

    let redex_a = Arc::new(Redex::new());
    let redex_b = Arc::new(Redex::new());
    redex_a.enable_replication(node_a.clone());
    redex_b.enable_replication(node_b.clone());

    // Same channel and replication settings on both sides, pinned to exactly {A, B}.
    let name = cn("repl/heartbeat");
    let a_id = node_a.node_id();
    let b_id = node_b.node_id();
    let cfg = RedexFileConfig::default().with_replication(Some(
        ReplicationConfig::new()
            .with_heartbeat_ms(150)
            .with_placement(PlacementStrategy::Pinned(vec![a_id, b_id])),
    ));
    redex_a.open_file(&name, cfg.clone()).expect("open A");
    redex_b.open_file(&name, cfg).expect("open B");

    let coord_a = redex_a.replication_coordinator_for(&name).unwrap();
    let coord_b = redex_b.replication_coordinator_for(&name).unwrap();

    // Walk A step by step up to Leader; B only needs to become a Replica.
    coord_a
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();
    coord_a
        .transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
        .await
        .unwrap();
    coord_a
        .transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
        .await
        .unwrap();
    coord_b
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    // Best-effort wait for B to observe the leader. Hitting the deadline is not itself a
    // failure; the role assertions below decide the outcome.
    // NOTE(review): `replica_lag_seconds` is presumably populated once B has processed a
    // leader heartbeat — confirm against the metrics implementation.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
    loop {
        let snap = redex_b
            .replication_metrics_snapshot()
            .expect("metrics snapshot");
        let heartbeat_seen = snap
            .channel(name.as_str())
            .map(|c| c.replica_lag_seconds.is_some())
            .unwrap_or(false);
        if heartbeat_seen || tokio::time::Instant::now() >= deadline {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    // Heartbeats must not have disturbed either role.
    assert_eq!(coord_a.role(), ReplicaRole::Leader);
    assert_eq!(coord_b.role(), ReplicaRole::Replica);

    redex_a.close_file(&name).expect("close A");
    redex_b.close_file(&name).expect("close B");
}
/// Failover: after the leader closes its file, the replica must promote itself to `Leader`.
///
/// A is driven to `Leader` and B to `Replica` on `repl/failover`. The test first waits
/// (best-effort, up to 3 s) for B's metrics to report a `replica_lag_seconds` value, then
/// closes the file on A and polls B's role for up to 3 s, expecting it to become `Leader`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn leader_close_triggers_replica_election_and_promotion() {
    let node_a = build_node().await;
    let node_b = build_node().await;
    handshake(&node_a, &node_b).await;

    let redex_a = Arc::new(Redex::new());
    let redex_b = Arc::new(Redex::new());
    redex_a.enable_replication(node_a.clone());
    redex_b.enable_replication(node_b.clone());

    // Same channel and replication settings on both sides, pinned to exactly {A, B}.
    let name = cn("repl/failover");
    let a_id = node_a.node_id();
    let b_id = node_b.node_id();
    let cfg = RedexFileConfig::default().with_replication(Some(
        ReplicationConfig::new()
            .with_heartbeat_ms(150)
            .with_placement(PlacementStrategy::Pinned(vec![a_id, b_id])),
    ));
    redex_a.open_file(&name, cfg.clone()).expect("open A");
    redex_b.open_file(&name, cfg).expect("open B");

    let coord_a = redex_a.replication_coordinator_for(&name).unwrap();
    let coord_b = redex_b.replication_coordinator_for(&name).unwrap();

    // Walk A step by step up to Leader; B only needs to become a Replica.
    coord_a
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();
    coord_a
        .transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
        .await
        .unwrap();
    coord_a
        .transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
        .await
        .unwrap();
    coord_b
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    // Best-effort wait until B's metrics carry a replica-lag reading for this channel.
    // Reaching the deadline without one is tolerated; the test proceeds either way.
    // NOTE(review): the reading presumably appears once B has seen a leader heartbeat — confirm.
    let poll_deadline = tokio::time::Instant::now() + Duration::from_secs(3);
    loop {
        let snap = redex_b
            .replication_metrics_snapshot()
            .expect("metrics snapshot");
        let observed = snap
            .channel(name.as_str())
            .map(|c| c.replica_lag_seconds.is_some())
            .unwrap_or(false);
        if observed || tokio::time::Instant::now() >= poll_deadline {
            break;
        }
        tokio::time::sleep(Duration::from_millis(25)).await;
    }

    // Take the leader away by closing its file.
    redex_a.close_file(&name).expect("close A");

    // B should end up as Leader within 3 s; sample its role every 50 ms.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
    let mut final_role = coord_b.role();
    while tokio::time::Instant::now() < deadline {
        final_role = coord_b.role();
        if final_role == ReplicaRole::Leader {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    assert_eq!(
        final_role,
        ReplicaRole::Leader,
        "replica failed to win election within 3s after leader silence (final role: {final_role:?})"
    );

    redex_b.close_file(&name).expect("close B");
}
/// Fan-out across three nodes.
///
/// A, B and C are fully connected before any of them is started. A is driven to `Leader`,
/// B and C to `Replica`, on `repl/three_node` with a replication factor of 3. After A appends
/// `N` events, both replicas must reach tail `N` within 5 s and hold identical payloads.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn three_node_replication_fans_out_to_every_replica() {
    const N: u64 = 24;

    // Full mesh: every pair handshakes first, then all nodes are started together.
    let node_a = build_node().await;
    let node_b = build_node().await;
    let node_c = build_node().await;
    handshake_no_start(&node_a, &node_b).await;
    handshake_no_start(&node_a, &node_c).await;
    handshake_no_start(&node_b, &node_c).await;
    start_all(&[&node_a, &node_b, &node_c]);

    let redex_a = Arc::new(Redex::new());
    let redex_b = Arc::new(Redex::new());
    let redex_c = Arc::new(Redex::new());
    redex_a.enable_replication(node_a.clone());
    redex_b.enable_replication(node_b.clone());
    redex_c.enable_replication(node_c.clone());

    // Same channel and replication settings everywhere, pinned to {A, B, C}.
    let name = cn("repl/three_node");
    let members = vec![node_a.node_id(), node_b.node_id(), node_c.node_id()];
    let replication = ReplicationConfig::new()
        .with_factor(3)
        .with_heartbeat_ms(150)
        .with_placement(PlacementStrategy::Pinned(members));
    let cfg = RedexFileConfig::default().with_replication(Some(replication));

    let file_a = redex_a.open_file(&name, cfg.clone()).expect("open A");
    let file_b = redex_b.open_file(&name, cfg.clone()).expect("open B");
    let file_c = redex_c.open_file(&name, cfg).expect("open C");

    let coord_a = redex_a.replication_coordinator_for(&name).unwrap();
    let coord_b = redex_b.replication_coordinator_for(&name).unwrap();
    let coord_c = redex_c.replication_coordinator_for(&name).unwrap();

    // Walk A step by step up to Leader, then make B and C replicas (B first).
    for (role, signal) in [
        (ReplicaRole::Replica, TransitionSignal::CapabilitySelected),
        (ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats),
        (ReplicaRole::Leader, TransitionSignal::ElectionWon),
    ] {
        coord_a.transition_to(role, signal).await.unwrap();
    }
    for follower in [&coord_b, &coord_c] {
        follower
            .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
            .await
            .unwrap();
    }

    // Append on the leader only.
    for i in 0..N {
        let payload = format!("event-{i}");
        file_a.append(payload.as_bytes()).expect("append leader");
    }

    // Poll both replica tails every 50 ms until both match or the 5 s deadline passes.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    let (mut b_tail, mut c_tail) = (0u64, 0u64);
    loop {
        if tokio::time::Instant::now() >= deadline {
            break;
        }
        b_tail = file_b.next_seq();
        c_tail = file_c.next_seq();
        let still_lagging = b_tail != N || c_tail != N;
        if !still_lagging {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    assert_eq!(
        b_tail, N,
        "replica B did not catch up (got {b_tail}, expected {N})"
    );
    assert_eq!(
        c_tail, N,
        "replica C did not catch up (got {c_tail}, expected {N})"
    );

    // Both replicas must hold every payload, in order. The length checks guarantee the zip
    // below covers all N positions.
    let events_b = file_b.read_range(0, N);
    let events_c = file_c.read_range(0, N);
    assert_eq!(events_b.len(), N as usize);
    assert_eq!(events_c.len(), N as usize);
    for (i, (ev_b, ev_c)) in events_b.iter().zip(events_c.iter()).enumerate() {
        let expected = format!("event-{i}");
        assert_eq!(ev_b.payload.as_ref(), expected.as_bytes());
        assert_eq!(ev_c.payload.as_ref(), expected.as_bytes());
    }

    redex_a.close_file(&name).expect("close A");
    redex_b.close_file(&name).expect("close B");
    redex_c.close_file(&name).expect("close C");
}
/// Returns the element that would sit at index `len / 2` if `xs` were sorted ascending.
///
/// For an even number of samples this is the upper of the two middle values.
///
/// # Panics
/// Panics if `xs` is empty.
fn median(mut xs: Vec<Duration>) -> Duration {
    let mid = xs.len() / 2;
    // Partial selection is enough: only the element at `mid` has to be in its sorted position.
    let (_, middle, _) = xs.select_nth_unstable(mid);
    *middle
}
/// Measures the wall-clock time taken to append `n` payloads of `payload_size` bytes to `file`.
///
/// A single buffer filled with `0x42` is allocated before the timer starts and reused for
/// every append.
///
/// # Panics
/// Panics with `"append failed"` on the first append that returns an error.
fn time_appends(
    file: &net::adapter::net::redex::RedexFile,
    n: u64,
    payload_size: usize,
) -> Duration {
    let buf = vec![0x42u8; payload_size];
    let timer = std::time::Instant::now();
    (0..n).for_each(|_| {
        file.append(&buf).expect("append failed");
    });
    timer.elapsed()
}
/// Performance check: appending to a replicated file must cost at most `OVERHEAD_BUDGET`
/// times the median cost of appending to a plain, unreplicated file.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested explicitly.
///
/// Both sides do a warm-up of `N / 10` appends, then `TRIALS` timed runs of `N` appends each;
/// the medians are compared.
///
/// NOTE(review): every baseline trial uses a fresh `Redex` and file, whereas all replicated
/// trials append to the same `file_a`. That asymmetry may skew the ratio — confirm it is
/// intended.
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn replication_overhead_within_30_percent_budget() {
    const N: u64 = 50_000;
    const PAYLOAD_BYTES: usize = 64;
    const TRIALS: usize = 5;
    const OVERHEAD_BUDGET: f64 = 1.3;

    // --- Baseline: no replication -------------------------------------------------------
    let baseline_redex = Arc::new(Redex::new());
    let baseline_file = baseline_redex
        .open_file(&cn("perf/baseline"), RedexFileConfig::default())
        .expect("open baseline");
    // Warm-up run; its timing is discarded.
    let _ = time_appends(&baseline_file, N / 10, PAYLOAD_BYTES);

    let mut baseline_trials = Vec::with_capacity(TRIALS);
    for _ in 0..TRIALS {
        let trial_redex = Redex::new();
        let trial_file = trial_redex
            .open_file(&cn("perf/baseline_trial"), RedexFileConfig::default())
            .unwrap();
        let elapsed = time_appends(&trial_file, N, PAYLOAD_BYTES);
        baseline_trials.push(elapsed);
    }
    let baseline_median = median(baseline_trials);

    // --- Replicated: A is leader, B is replica ------------------------------------------
    let node_a = build_node().await;
    let node_b = build_node().await;
    handshake(&node_a, &node_b).await;
    let placement = PlacementStrategy::Pinned(vec![node_a.node_id(), node_b.node_id()]);

    let redex_a = Arc::new(Redex::new());
    let redex_b = Arc::new(Redex::new());
    redex_a.enable_replication(node_a.clone());
    redex_b.enable_replication(node_b.clone());

    let replication = ReplicationConfig::new()
        .with_heartbeat_ms(500)
        .with_placement(placement);
    let cfg = RedexFileConfig::default().with_replication(Some(replication));
    let name = cn("perf/replicated");
    let file_a = redex_a.open_file(&name, cfg.clone()).expect("open A");
    // Held only so B's side of the channel stays open for the duration of the measurement.
    let _file_b = redex_b.open_file(&name, cfg).expect("open B");

    let coord_a = redex_a.replication_coordinator_for(&name).unwrap();
    let coord_b = redex_b.replication_coordinator_for(&name).unwrap();
    for (role, signal) in [
        (ReplicaRole::Replica, TransitionSignal::CapabilitySelected),
        (ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats),
        (ReplicaRole::Leader, TransitionSignal::ElectionWon),
    ] {
        coord_a.transition_to(role, signal).await.unwrap();
    }
    coord_b
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    // Warm-up run; its timing is discarded.
    let _ = time_appends(&file_a, N / 10, PAYLOAD_BYTES);
    let mut replicated_trials = Vec::with_capacity(TRIALS);
    for _ in 0..TRIALS {
        let elapsed = time_appends(&file_a, N, PAYLOAD_BYTES);
        replicated_trials.push(elapsed);
    }
    let replicated_median = median(replicated_trials);

    // --- Verdict ------------------------------------------------------------------------
    let ratio = replicated_median.as_secs_f64() / baseline_median.as_secs_f64();
    eprintln!(
        "replication overhead: baseline={:?} replicated={:?} ratio={:.3}x",
        baseline_median, replicated_median, ratio
    );
    assert!(
        ratio <= OVERHEAD_BUDGET,
        "replication overhead = {:.2}x; Dataforts Phase 2 budget is ≤{}x (≤30% overhead). \
         baseline median={:?}, replicated median={:?}",
        ratio,
        OVERHEAD_BUDGET,
        baseline_median,
        replicated_median,
    );

    // Close errors are deliberately ignored here.
    redex_a.close_file(&name).ok();
    redex_b.close_file(&name).ok();
}
/// Checks that the bandwidth-budget metrics are wired through to the metrics snapshot.
///
/// A (leader) and B (replica) share `perf/bandwidth` with a replication budget fraction of
/// 0.5. A appends 256 small events; the test waits up to 3 s for B to catch up and then
/// requires, on A's snapshot, that `sync_bytes_total` is non-zero and `under_capacity_total`
/// is still zero.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn bandwidth_budget_metric_field_is_plumbed() {
    let node_a = build_node().await;
    let node_b = build_node().await;
    handshake(&node_a, &node_b).await;

    let redex_a = Arc::new(Redex::new());
    let redex_b = Arc::new(Redex::new());
    redex_a.enable_replication(node_a.clone());
    redex_b.enable_replication(node_b.clone());

    let name = cn("perf/bandwidth");
    let cfg = RedexFileConfig::default().with_replication(Some(
        ReplicationConfig::new()
            .with_heartbeat_ms(150)
            .with_placement(PlacementStrategy::Pinned(vec![
                node_a.node_id(),
                node_b.node_id(),
            ]))
            .with_replication_budget_fraction(0.5),
    ));
    let file_a = redex_a.open_file(&name, cfg.clone()).expect("open A");
    // This handle is read in the catch-up poll below, so it carries no `_` prefix.
    let file_b = redex_b.open_file(&name, cfg).expect("open B");

    let coord_a = redex_a.replication_coordinator_for(&name).unwrap();
    let coord_b = redex_b.replication_coordinator_for(&name).unwrap();

    // Walk A step by step up to Leader; B only needs to become a Replica.
    coord_a
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();
    coord_a
        .transition_to(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats)
        .await
        .unwrap();
    coord_a
        .transition_to(ReplicaRole::Leader, TransitionSignal::ElectionWon)
        .await
        .unwrap();
    coord_b
        .transition_to(ReplicaRole::Replica, TransitionSignal::CapabilitySelected)
        .await
        .unwrap();

    // Burst of 256 small events on the leader.
    for i in 0..256u64 {
        file_a.append(format!("bw-{i}").as_bytes()).unwrap();
    }

    // Best-effort wait (up to 3 s) for B to catch up, judged by either the coordinator's tail
    // or the file's next sequence number. The already-held `coord_b` handle is polled directly
    // instead of re-resolving the coordinator on every iteration. A timeout is not asserted
    // here; the metric assertions below decide the outcome.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
    while tokio::time::Instant::now() < deadline {
        if coord_b.tail_seq() >= 256 || file_b.next_seq() >= 256 {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    let snap = redex_a
        .replication_metrics_snapshot()
        .expect("snapshot enabled");
    let chan = snap
        .channels
        .iter()
        .find(|c| c.channel == "perf/bandwidth")
        .expect("channel in snapshot");
    assert!(
        chan.sync_bytes_total > 0,
        "leader shipped at least one SyncResponse",
    );
    assert_eq!(
        chan.under_capacity_total, 0,
        "Dataforts Phase 2: bandwidth budget at 0.5×NIC must NOT be \
         exceeded under a 256-event burst. under_capacity_total bumping \
         means the budget gate let too much through (or this test's \
         workload is larger than the placeholder NIC peak's burst \
         allowance)."
    );

    // Close errors are deliberately ignored here.
    redex_a.close_file(&name).ok();
    redex_b.close_file(&name).ok();
}