#![cfg(feature = "redex")]
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::time::{Duration, Instant};
use net::adapter::net::behavior::placement::NodeId;
use net::adapter::net::channel::ChannelName;
use net::adapter::net::redex::{
apply_sync_response, election_outcome, handle_sync_request, tick, ApplyError, ChannelId,
HeartbeatTracker, Inbound, OutboundMessage, Redex, RedexFile, RedexFileConfig, ReplicaRole,
StateTransition, SyncNack, SyncRequestOutcome, TickInputs, TransitionSignal,
};
/// Chunk-size cap handed to `tick` as `chunk_max_bytes` (256 KiB).
const DST_CHUNK_MAX_BYTES: u32 = 1 << 18;
/// Heartbeat period, in milliseconds, passed to every `HeartbeatTracker::new`.
const DST_HEARTBEAT_MS: u64 = 50;
/// Virtual time added by each simulation step; pinned to the heartbeat period.
const STEP_DURATION_MS: u64 = DST_HEARTBEAT_MS;
/// Maximum number of steps `run_until` simulates before failing a scenario.
const STEP_BUDGET: usize = 200;
/// One simulated replica: its state-machine role, its heartbeat view of peers,
/// its local copy of the channel log, and the messages delivered to it.
struct VirtualNode {
    /// Current role; only changed through `StateTransition`-validated moves.
    role: ReplicaRole,
    /// Heartbeats this node has recorded from its peers.
    tracker: HeartbeatTracker,
    /// This node's local log for the simulated channel.
    file: RedexFile,
    /// Delivered-but-unprocessed messages, consumed in FIFO order.
    inbox: VecDeque<Inbound>,
    /// Set when the harness kills the node; killed nodes neither tick nor receive.
    killed: bool,
    /// How many of this node's transitions were driven by `MissedHeartbeats`.
    election_thrash_count: u64,
}
impl VirtualNode {
    /// Builds an `Idle` node whose log is opened on a brand-new `Redex`
    /// instance under `channel_name`.
    ///
    /// # Panics
    /// Panics if the file cannot be opened.
    fn new(channel_name: &ChannelName) -> Self {
        let file = Redex::new()
            .open_file(channel_name, RedexFileConfig::default())
            .expect("open file");
        Self {
            role: ReplicaRole::Idle,
            tracker: HeartbeatTracker::new(DST_HEARTBEAT_MS),
            file,
            inbox: VecDeque::new(),
            killed: false,
            election_thrash_count: 0,
        }
    }
    /// Local log tail, as reported by `RedexFile::next_seq`.
    fn tail_seq(&self) -> u64 {
        self.file.next_seq()
    }
    /// Moves this node to `target` after checking the move with
    /// `StateTransition::apply`. A move driven by `MissedHeartbeats` also
    /// bumps `election_thrash_count`.
    ///
    /// # Panics
    /// Panics if the state machine rejects `role -> target` under `signal`.
    fn force_transition(&mut self, target: ReplicaRole, signal: TransitionSignal) {
        let checked = StateTransition::apply(self.role, target, signal);
        let _ = checked.expect("DST harness drove an invalid state-machine transition");
        self.role = target;
        if let TransitionSignal::MissedHeartbeats = signal {
            self.election_thrash_count += 1;
        }
    }
}
/// Deterministic simulated replica set for DST scenarios.
///
/// All nodes share one virtual clock that `step` advances by exactly
/// `STEP_DURATION_MS`. Every inter-node message is routed through `pending`,
/// so partitions and kills take effect at delivery time instead of depending
/// on real scheduling.
struct VirtualCluster {
    /// Simulated nodes; a `BTreeMap`, so per-step iteration order is by id.
    nodes: BTreeMap<NodeId, VirtualNode>,
    /// The single channel every node replicates.
    channel_id: ChannelId,
    /// Node ids in construction order, handed to `tick` and `election_outcome`.
    replica_set: Vec<NodeId>,
    /// Severed links, stored as `pair`-normalised `(min, max)` tuples.
    partitions: HashSet<(NodeId, NodeId)>,
    /// Current virtual time.
    now: Instant,
    /// Virtual time at construction; `now - initial_now` becomes `wall_clock_ms`.
    initial_now: Instant,
    /// Queued `(src, dst, message)` triples, drained by the delivery phase of `step`.
    pending: VecDeque<(NodeId, NodeId, Inbound)>,
    /// Per-link RTT consulted during elections, keyed by `pair`-normalised tuples.
    rtt: BTreeMap<(NodeId, NodeId), Duration>,
    /// Bandwidth class passed to `tick` for every node.
    default_bandwidth_class: net::adapter::net::redex::BandwidthClass,
}
impl VirtualCluster {
    /// Creates one `Idle` node per id on `channel_name`, with a uniform 5 ms
    /// RTT between every pair of ids and no partitions.
    ///
    /// # Panics
    /// Panics if `channel_name` is rejected by `ChannelName::new` or a node's
    /// file cannot be opened.
    fn new(ids: &[NodeId], channel_name: &str) -> Self {
        let channel_name = ChannelName::new(channel_name).expect("DST: invalid channel name");
        let channel_id = ChannelId::from_name(&channel_name);
        let mut nodes = BTreeMap::new();
        for &id in ids {
            nodes.insert(id, VirtualNode::new(&channel_name));
        }
        // Key every link by its normalised pair so lookups are direction-agnostic.
        let mut rtt = BTreeMap::new();
        for (i, &x) in ids.iter().enumerate() {
            for &y in &ids[i + 1..] {
                rtt.insert(Self::pair(x, y), Duration::from_millis(5));
            }
        }
        let now = Instant::now();
        Self {
            nodes,
            channel_id,
            replica_set: ids.to_vec(),
            partitions: HashSet::new(),
            now,
            initial_now: now,
            pending: VecDeque::new(),
            rtt,
            default_bandwidth_class: net::adapter::net::redex::BandwidthClass::Foreground,
        }
    }
    /// Builder-style override of the bandwidth class every node passes to `tick`.
    fn with_default_bandwidth_class(
        mut self,
        class: net::adapter::net::redex::BandwidthClass,
    ) -> Self {
        self.default_bandwidth_class = class;
        self
    }
    /// Normalises an unordered link to `(min, max)` so both directions share one key.
    fn pair(a: NodeId, b: NodeId) -> (NodeId, NodeId) {
        (a.min(b), a.max(b))
    }
    /// Drops all future traffic between `a` and `b`, in both directions.
    fn partition(&mut self, a: NodeId, b: NodeId) {
        self.partitions.insert(Self::pair(a, b));
    }
    /// Restores traffic between `a` and `b`; a no-op if they were not partitioned.
    fn heal_partition(&mut self, a: NodeId, b: NodeId) {
        self.partitions.remove(&Self::pair(a, b));
    }
    /// True when a message between `a` and `b` would be dropped: either
    /// endpoint is unknown or killed, or the link is partitioned.
    fn is_partitioned(&self, a: NodeId, b: NodeId) -> bool {
        if self.nodes.get(&a).map(|n| n.killed).unwrap_or(true) {
            return true;
        }
        if self.nodes.get(&b).map(|n| n.killed).unwrap_or(true) {
            return true;
        }
        self.partitions.contains(&Self::pair(a, b))
    }
    /// Marks `id` dead and discards anything already in its inbox. Messages
    /// still in `pending` are dropped at delivery by `is_partitioned`.
    fn kill(&mut self, id: NodeId) {
        if let Some(n) = self.nodes.get_mut(&id) {
            n.killed = true;
            n.inbox.clear();
        }
    }
    /// Walks `id` through Replica -> Candidate -> Leader using validated
    /// transitions. The Candidate hop uses `MissedHeartbeats`, so it bumps the
    /// node's thrash counter once.
    ///
    /// # Panics
    /// Panics if `id` is unknown or any hop is rejected by the state machine.
    fn force_leader(&mut self, id: NodeId) {
        let n = self.nodes.get_mut(&id).expect("force_leader: unknown id");
        n.force_transition(ReplicaRole::Replica, TransitionSignal::CapabilitySelected);
        n.force_transition(ReplicaRole::Candidate, TransitionSignal::MissedHeartbeats);
        n.force_transition(ReplicaRole::Leader, TransitionSignal::ElectionWon);
    }
    /// Moves `id` to `Replica` via `CapabilitySelected`.
    ///
    /// # Panics
    /// Panics if `id` is unknown or the transition is rejected.
    fn force_replica(&mut self, id: NodeId) {
        let n = self.nodes.get_mut(&id).expect("force_replica: unknown id");
        n.force_transition(ReplicaRole::Replica, TransitionSignal::CapabilitySelected);
    }
    /// Advances virtual time by one step and runs three phases:
    /// 1. tick every live node (in id order);
    /// 2. deliver every queued message whose link is up;
    /// 3. let every live node process its inbox.
    ///
    /// Replies produced in phase 3 are queued and delivered on the next step.
    fn step(&mut self) {
        self.now += Duration::from_millis(STEP_DURATION_MS);
        let node_ids: Vec<NodeId> = self.nodes.keys().copied().collect();
        for &id in &node_ids {
            let killed = self.nodes.get(&id).map(|n| n.killed).unwrap_or(true);
            if !killed {
                self.tick_node(id);
            }
        }
        for (src, dst, msg) in std::mem::take(&mut self.pending) {
            if self.is_partitioned(src, dst) {
                continue;
            }
            if let Some(n) = self.nodes.get_mut(&dst) {
                if !n.killed {
                    n.inbox.push_back(msg);
                }
            }
        }
        for &id in &node_ids {
            let killed = self.nodes.get(&id).map(|n| n.killed).unwrap_or(true);
            if !killed {
                self.drain_inbox(id);
            }
        }
    }
    /// Runs one `tick` for `id`, queues its outbound messages, and applies any
    /// role transition it requests.
    ///
    /// When that transition makes the node a `Candidate`, the election is
    /// resolved immediately via `election_outcome`. It uses the cluster's RTT
    /// table (killed or partitioned peers are unreachable) and the node's own
    /// set of healthy peers.
    ///
    /// # Panics
    /// Panics if `id` is unknown or a requested transition is invalid.
    fn tick_node(&mut self, id: NodeId) {
        let outcome = {
            let n = self.nodes.get(&id).expect("tick: unknown id");
            tick(TickInputs {
                self_node_id: id,
                current_role: n.role,
                channel_id: self.channel_id,
                tail_seq: n.tail_seq(),
                replica_set: &self.replica_set,
                tracker: &n.tracker,
                // Derived from virtual time only, so runs are reproducible.
                wall_clock_ms: self.now.duration_since(self.initial_now).as_millis() as u64,
                chunk_max_bytes: DST_CHUNK_MAX_BYTES,
                now: self.now,
                default_bandwidth_class: self.default_bandwidth_class,
            })
        };
        for msg in outcome.outbound {
            let queued = match msg {
                OutboundMessage::Heartbeat { target, msg } => {
                    (id, target, Inbound::Heartbeat { from: id, msg })
                }
                OutboundMessage::SyncRequest { target, msg } => {
                    (id, target, Inbound::SyncRequest { from: id, msg })
                }
            };
            self.pending.push_back(queued);
        }
        let Some(pt) = outcome.transition else {
            return;
        };
        let n = self.nodes.get_mut(&id).expect("tick: unknown id");
        let _ = StateTransition::apply(n.role, pt.target, pt.signal)
            .expect("tick produced an invalid transition");
        n.role = pt.target;
        if matches!(pt.signal, TransitionSignal::MissedHeartbeats) {
            n.election_thrash_count += 1;
        }
        if pt.target != ReplicaRole::Candidate {
            return;
        }
        let healthy = n.tracker.healthy_peers(self.now);
        // Snapshot reachability into an owned closure. A peer is unreachable
        // if it is killed or partitioned from `id`; otherwise its RTT comes
        // from the cluster table.
        let rtt_lookup = {
            let cluster_rtt = self.rtt.clone();
            let partitions = self.partitions.clone();
            let killed_set: HashSet<NodeId> = self
                .nodes
                .iter()
                .filter(|(_, node)| node.killed)
                .map(|(&peer_id, _)| peer_id)
                .collect();
            move |peer: NodeId| -> Option<Duration> {
                if peer == id {
                    return Some(Duration::ZERO);
                }
                if killed_set.contains(&peer) {
                    return None;
                }
                let link = Self::pair(id, peer);
                if partitions.contains(&link) {
                    return None;
                }
                cluster_rtt.get(&link).copied()
            }
        };
        let elect = election_outcome(id, &self.replica_set, rtt_lookup, |peer| {
            peer == id || healthy.contains(&peer)
        });
        if let Some(decided) = elect {
            let n = self.nodes.get_mut(&id).expect("tick: unknown id");
            let _ = StateTransition::apply(n.role, decided.target, decided.signal)
                .expect("election produced an invalid transition");
            n.role = decided.target;
            n.tracker.clear_believed_leader();
        }
    }
    /// Feeds every message currently in `id`'s inbox to `handle_inbound`, in
    /// FIFO order. Handlers only queue replies on `pending`, so the loop ends
    /// once the inbox is empty.
    ///
    /// # Panics
    /// Panics if `id` is unknown.
    fn drain_inbox(&mut self, id: NodeId) {
        while let Some(event) = self
            .nodes
            .get_mut(&id)
            .expect("drain: unknown id")
            .inbox
            .pop_front()
        {
            self.handle_inbound(id, event);
        }
    }
    /// Applies one delivered message to node `id`:
    /// - `Heartbeat`: recorded in the tracker, but only if it is for this
    ///   cluster's channel.
    /// - `SyncRequest`: served only by a `Leader`. The response or NACK is
    ///   queued back to the requester.
    /// - `SyncResponse`: applied only by a `Replica`. On
    ///   `ApplyError::GapBeforeChunk`, the local file is skipped forward to the
    ///   chunk's `first_seq` and the apply retried once. Any other apply error
    ///   is dropped.
    /// - `SyncNack`: ignored.
    ///
    /// # Panics
    /// Panics if `id` is unknown, or on `Inbound::Shutdown` (the harness never
    /// queues one).
    fn handle_inbound(&mut self, id: NodeId, event: Inbound) {
        match event {
            Inbound::Heartbeat { from, msg } => {
                if msg.channel_id != self.channel_id {
                    return;
                }
                let n = self.nodes.get_mut(&id).expect("handle: unknown id");
                n.tracker
                    .record_heartbeat(from, msg.role, msg.tail_seq, self.now);
            }
            Inbound::SyncRequest { from, msg } => {
                let role = self.nodes.get(&id).expect("handle: unknown id").role;
                if role != ReplicaRole::Leader {
                    return;
                }
                let outcome = {
                    let n = self.nodes.get(&id).expect("handle: unknown id");
                    handle_sync_request(&n.file, &msg, self.channel_id)
                };
                let reply = match outcome {
                    SyncRequestOutcome::Response(resp) => Inbound::SyncResponse {
                        from: id,
                        msg: resp,
                    },
                    SyncRequestOutcome::Nack {
                        error_code,
                        leader_first_retained_seq,
                        detail,
                    } => Inbound::SyncNack {
                        from: id,
                        msg: SyncNack {
                            channel_id: self.channel_id,
                            since_seq: msg.since_seq,
                            error_code,
                            leader_first_retained_seq,
                            detail,
                            request_id: msg.request_id,
                        },
                    },
                };
                self.pending.push_back((id, from, reply));
            }
            Inbound::SyncResponse { from: _, msg } => {
                let role = self.nodes.get(&id).expect("handle: unknown id").role;
                if role != ReplicaRole::Replica {
                    return;
                }
                let result = {
                    let n = self.nodes.get(&id).expect("handle: unknown id");
                    apply_sync_response(&n.file, &msg, self.channel_id)
                };
                if let Err(ApplyError::GapBeforeChunk { first_seq, .. }) = result {
                    let n = self.nodes.get_mut(&id).expect("handle: unknown id");
                    if n.file.skip_to(first_seq).is_ok() {
                        // Best-effort single retry; a second failure is left
                        // for a later sync round.
                        let _ = apply_sync_response(&n.file, &msg, self.channel_id);
                    }
                }
            }
            Inbound::SyncNack { .. } => {
                // The harness takes no recovery action on NACKs.
            }
            Inbound::Shutdown => unreachable!("DST harness doesn't queue Shutdown"),
        }
    }
    /// Steps the cluster until `predicate` holds, returning the 1-based step
    /// on which it first held.
    ///
    /// # Panics
    /// Panics with a cluster snapshot if `predicate` is still false after
    /// `STEP_BUDGET` steps.
    fn run_until<F: Fn(&Self) -> bool>(&mut self, predicate: F, scenario: &str) -> usize {
        for step in 1..=STEP_BUDGET {
            self.step();
            if predicate(self) {
                return step;
            }
        }
        panic!(
            "DST scenario {scenario} did not reach predicate within {STEP_BUDGET} steps. \
             Final state: {}",
            self.snapshot()
        );
    }
    /// One-line, deterministic rendering of every node's role, tail and kill
    /// flag, followed by the active partitions (sorted).
    fn snapshot(&self) -> String {
        use std::fmt::Write;
        let mut s = String::new();
        for (&id, n) in &self.nodes {
            let _ = write!(
                &mut s,
                "{:#x}@{:?}(t={},k={}) ",
                id,
                n.role,
                n.tail_seq(),
                n.killed
            );
        }
        if !self.partitions.is_empty() {
            // `HashSet` iteration order is randomised per process. Sort so the
            // same cluster state always renders identically in DST output.
            let mut parts: Vec<(NodeId, NodeId)> = self.partitions.iter().copied().collect();
            parts.sort_unstable();
            s.push_str("| parts: ");
            for (a, b) in parts {
                let _ = write!(&mut s, "({a:#x}↔{b:#x}) ");
            }
        }
        s
    }
    /// Number of nodes that are not killed and currently hold `Leader`.
    fn live_leader_count(&self) -> usize {
        self.nodes
            .values()
            .filter(|n| !n.killed && n.role == ReplicaRole::Leader)
            .count()
    }
}
/// Appends `count` events with payloads `"{prefix}-0"`, `"{prefix}-1"`, … to
/// the local file of `leader_id`.
///
/// # Panics
/// Panics if `leader_id` is unknown, is not currently `Leader`, or any append fails.
fn append_on_leader(cluster: &VirtualCluster, leader_id: NodeId, count: u64, prefix: &str) {
    let leader = cluster.nodes.get(&leader_id).expect("append: unknown leader");
    assert_eq!(leader.role, ReplicaRole::Leader, "append: not leader");
    (0..count)
        .map(|i| format!("{prefix}-{i}"))
        .for_each(|payload| {
            leader.file.append(payload.as_bytes()).expect("append failed");
        });
}
/// Happy path: a leader holding 16 fresh events and two fully connected
/// replicas. Both replicas must reach tail 16 within the step budget.
#[test]
fn three_node_happy_path_replicas_catch_up_to_leader() {
    let (a, b, c) = (0x10u64, 0x20u64, 0x30u64);
    let mut cluster = VirtualCluster::new(&[a, b, c], "dst/happy");
    cluster.force_leader(a);
    for replica in [b, c] {
        cluster.force_replica(replica);
    }
    append_on_leader(&cluster, a, 16, "evt");
    let steps = cluster.run_until(
        |cl| {
            [b, c]
                .iter()
                .all(|id| cl.nodes.get(id).map_or(0, |n| n.tail_seq()) == 16)
        },
        "three_node_happy_path",
    );
    assert!(
        steps < STEP_BUDGET,
        "happy-path convergence in {steps} steps (budget {STEP_BUDGET})"
    );
}
/// Fault-free run: for 50 consecutive steps the cluster must hold exactly one
/// live Leader.
#[test]
fn no_two_leaders_during_steady_state() {
    let (a, b, c) = (0x10u64, 0x20u64, 0x30u64);
    let mut cluster = VirtualCluster::new(&[a, b, c], "dst/steady");
    cluster.force_leader(a);
    for replica in [b, c] {
        cluster.force_replica(replica);
    }
    for step in 1..=50 {
        cluster.step();
        let leader_count = cluster.live_leader_count();
        assert_eq!(
            leader_count,
            1,
            "steady-state must keep exactly one Leader at step {step}; got {} ({})",
            leader_count,
            cluster.snapshot(),
        );
    }
}
/// Two-node failover: once the leader is killed, the only surviving replica
/// must promote itself, leaving exactly one live Leader.
#[test]
fn leader_crash_two_node_failover_converges_on_lone_survivor() {
    let (a, b) = (0x10u64, 0x20u64);
    let mut cluster = VirtualCluster::new(&[a, b], "dst/leader_crash_2node");
    cluster.force_leader(a);
    cluster.force_replica(b);
    // Let a few heartbeat rounds flow before the crash.
    let warmup_steps = 5;
    (0..warmup_steps).for_each(|_| cluster.step());
    assert_eq!(cluster.live_leader_count(), 1);
    cluster.kill(a);
    let steps = cluster.run_until(
        |cl| cl.nodes.get(&b).map_or(ReplicaRole::Idle, |n| n.role) == ReplicaRole::Leader,
        "leader_crash_failover_2node",
    );
    assert!(steps < STEP_BUDGET, "failover converged in {steps} steps");
    assert_eq!(cluster.live_leader_count(), 1);
}
/// Pins the expected dual-leader window: with identical RTTs, killing the
/// leader of a three-node set lets both survivors (B and C) promote
/// themselves.
#[test]
fn symmetric_failover_with_three_survivors_produces_dual_leaders() {
    let (a, b, c) = (0x10u64, 0x20u64, 0x30u64);
    let mut cluster = VirtualCluster::new(&[a, b, c], "dst/dual_leader_doc");
    cluster.force_leader(a);
    for replica in [b, c] {
        cluster.force_replica(replica);
    }
    for _ in 0..5 {
        cluster.step();
    }
    assert_eq!(cluster.live_leader_count(), 1);
    cluster.kill(a);
    cluster.run_until(
        |cl| {
            cl.nodes
                .values()
                .filter(|n| !n.killed)
                .all(|n| !matches!(n.role, ReplicaRole::Replica | ReplicaRole::Candidate))
        },
        "survivors transition past Replica/Candidate",
    );
    let leaders: Vec<NodeId> = cluster
        .nodes
        .iter()
        .filter_map(|(&id, n)| (!n.killed && n.role == ReplicaRole::Leader).then_some(id))
        .collect();
    assert_eq!(
        leaders.len(),
        2,
        "symmetric-RTT failover with 3 survivors produces dual-leader window per plan §4; \
         got {} ({})",
        leaders.len(),
        cluster.snapshot(),
    );
    assert!(leaders.contains(&b) && leaders.contains(&c));
}
/// With B at zero RTT to C, B is the clearly central survivor. After A dies,
/// exactly one Leader (B) must emerge and C must settle as a Replica.
#[test]
fn asymmetric_rtt_failover_converges_when_one_peer_clearly_central() {
    let (a, b, c) = (0x10u64, 0x20u64, 0x30u64);
    let mut cluster = VirtualCluster::new(&[a, b, c], "dst/central_failover");
    cluster.force_leader(a);
    for replica in [b, c] {
        cluster.force_replica(replica);
    }
    cluster.rtt.insert(VirtualCluster::pair(b, c), Duration::ZERO);
    for _ in 0..5 {
        cluster.step();
    }
    cluster.kill(a);
    cluster.run_until(
        |cl| {
            if cl.live_leader_count() != 1 {
                return false;
            }
            cl.nodes
                .values()
                .filter(|n| !n.killed)
                .all(|n| n.role != ReplicaRole::Candidate)
        },
        "asymmetric: survivors settle to exactly one Leader",
    );
    assert_eq!(
        cluster.live_leader_count(),
        1,
        "central-peer convergence: exactly one Leader; got {}",
        cluster.snapshot(),
    );
    let role_of = |id: NodeId| cluster.nodes.get(&id).unwrap().role;
    assert_eq!(role_of(b), ReplicaRole::Leader);
    assert_eq!(role_of(c), ReplicaRole::Replica);
}
/// C is cut off from both A and B. B must still catch up to the leader's 8
/// events while C's tail stays at 0.
#[test]
fn isolated_replica_does_not_advance_tail() {
    let (a, b, c) = (0x10u64, 0x20u64, 0x30u64);
    let mut cluster = VirtualCluster::new(&[a, b, c], "dst/isolated");
    cluster.force_leader(a);
    for replica in [b, c] {
        cluster.force_replica(replica);
    }
    for peer in [a, b] {
        cluster.partition(peer, c);
    }
    append_on_leader(&cluster, a, 8, "iso");
    let steps = cluster.run_until(
        |cl| cl.nodes.get(&b).map_or(0, |n| n.tail_seq()) == 8,
        "isolated_replica_b_catches_up",
    );
    assert!(steps < STEP_BUDGET);
    let c_tail = cluster.nodes.get(&c).map_or(99, |n| n.tail_seq());
    assert_eq!(
        c_tail, 0,
        "isolated replica's tail must NOT advance; got {c_tail}"
    );
}
/// C is partitioned while B syncs 12 events. Once both links to C heal, C must
/// reach the same 12-event tail.
#[test]
fn partition_heal_lets_isolated_replica_catch_up() {
    let (a, b, c) = (0x10u64, 0x20u64, 0x30u64);
    let mut cluster = VirtualCluster::new(&[a, b, c], "dst/heal");
    cluster.force_leader(a);
    for replica in [b, c] {
        cluster.force_replica(replica);
    }
    let links_to_c = [(a, c), (b, c)];
    for (x, y) in links_to_c {
        cluster.partition(x, y);
    }
    append_on_leader(&cluster, a, 12, "preheal");
    cluster.run_until(
        |cl| cl.nodes.get(&b).map_or(0, |n| n.tail_seq()) == 12,
        "pre-heal: B catches up",
    );
    for (x, y) in links_to_c {
        cluster.heal_partition(x, y);
    }
    let steps = cluster.run_until(
        |cl| cl.nodes.get(&c).map_or(0, |n| n.tail_seq()) == 12,
        "post-heal: C catches up",
    );
    assert!(steps < STEP_BUDGET);
}
/// After a clean catch-up, every replica must hold exactly the leader's 24
/// events: same sequence numbers, byte-identical payloads.
#[test]
fn divergence_freedom_no_two_replicas_hold_different_payload_at_same_seq() {
    let (a, b, c) = (0x10u64, 0x20u64, 0x30u64);
    let mut cluster = VirtualCluster::new(&[a, b, c], "dst/divergence");
    cluster.force_leader(a);
    for replica in [b, c] {
        cluster.force_replica(replica);
    }
    append_on_leader(&cluster, a, 24, "div");
    cluster.run_until(
        |cl| {
            [b, c]
                .iter()
                .all(|id| cl.nodes.get(id).map_or(0, |n| n.tail_seq()) == 24)
        },
        "divergence-freedom convergence",
    );
    let leader_events = cluster.nodes.get(&a).unwrap().file.read_range(0, 24);
    assert_eq!(leader_events.len(), 24);
    for replica_id in [b, c] {
        let events = cluster.nodes.get(&replica_id).unwrap().file.read_range(0, 24);
        assert_eq!(
            events.len(),
            24,
            "replica {replica_id:#x} has {} events; leader has 24",
            events.len(),
        );
        for (i, (leader_ev, replica_ev)) in leader_events.iter().zip(&events).enumerate() {
            assert_eq!(
                leader_ev.entry.seq, replica_ev.entry.seq,
                "replica {replica_id:#x} event {i}: seq mismatch"
            );
            assert_eq!(
                leader_ev.payload.as_ref(),
                replica_ev.payload.as_ref(),
                "replica {replica_id:#x} event {i}: payload bytes differ"
            );
        }
    }
}
/// B syncs 5 events, is killed while the leader appends 10 more, then is
/// revived in place. It must resume from its surviving local tail and end at 15.
#[test]
fn restart_during_sync_replica_resumes_from_local_tail() {
    let (a, b) = (0x10u64, 0x20u64);
    let mut cluster = VirtualCluster::new(&[a, b], "dst/restart");
    cluster.force_leader(a);
    cluster.force_replica(b);
    append_on_leader(&cluster, a, 5, "pre");
    cluster.run_until(
        |cl| cl.nodes.get(&b).map_or(0, |n| n.tail_seq()) == 5,
        "B catches up pre-kill",
    );
    let b_tail_before_kill = cluster.nodes.get(&b).unwrap().tail_seq();
    cluster.kill(b);
    append_on_leader(&cluster, a, 10, "post-kill");
    // Revive B in place. Its file and role are kept; it gets a fresh
    // heartbeat tracker.
    let revived = cluster.nodes.get_mut(&b).unwrap();
    revived.killed = false;
    revived.tracker = HeartbeatTracker::new(DST_HEARTBEAT_MS);
    let target_tail = b_tail_before_kill + 10;
    let steps = cluster.run_until(
        |cl| cl.nodes.get(&b).map_or(0, |n| n.tail_seq()) == target_tail,
        "B resumes catch-up post-revival",
    );
    assert!(steps < STEP_BUDGET);
    assert_eq!(cluster.nodes.get(&b).unwrap().tail_seq(), 15);
}
/// Four nodes with B at zero RTT to both C and D. After A dies, exactly one
/// Leader must emerge, it must be B, and that must hold for 30 further steps.
#[test]
fn no_two_leaders_within_a_partition_post_failover_with_central_peer() {
    let (a, b, c, d) = (0x10u64, 0x20u64, 0x30u64, 0x40u64);
    let mut cluster = VirtualCluster::new(&[a, b, c, d], "dst/no_two_leaders_central");
    cluster.force_leader(a);
    for replica in [b, c, d] {
        cluster.force_replica(replica);
    }
    for spoke in [c, d] {
        cluster.rtt.insert(VirtualCluster::pair(b, spoke), Duration::ZERO);
    }
    for _ in 0..5 {
        cluster.step();
    }
    cluster.kill(a);
    cluster.run_until(
        |cl| cl.live_leader_count() == 1,
        "exactly one leader post-failover (central peer)",
    );
    for step in 1..=30 {
        cluster.step();
        assert_eq!(
            cluster.live_leader_count(),
            1,
            "no_two_leaders: state must stay at exactly one Leader at step +{step}: {}",
            cluster.snapshot(),
        );
    }
    let b_role = cluster.nodes.get(&b).unwrap().role;
    assert_eq!(b_role, ReplicaRole::Leader);
}
/// The `wall_clock_ms` stamped on heartbeats must depend only on the virtual
/// step counter. Two runs separated by a real-time sleep must emit the same
/// sequence. Every value must be a multiple of `STEP_DURATION_MS` and no larger
/// than the virtual time elapsed over the run.
#[test]
fn wall_clock_ms_is_deterministic_function_of_step_counter() {
    /// Number of virtual steps simulated per run.
    const STEPS: u64 = 6;

    /// Runs a fresh two-node cluster for `STEPS` steps. After each step it
    /// ticks the leader once more by hand and records the `wall_clock_ms` of
    /// every heartbeat that extra tick emits.
    fn run_and_capture(channel: &str) -> Vec<u64> {
        let (a, b) = (0x10u64, 0x20u64);
        let mut cluster = VirtualCluster::new(&[a, b], channel);
        cluster.force_leader(a);
        cluster.force_replica(b);
        let mut emitted_wall_clocks = Vec::new();
        for _ in 0..STEPS {
            cluster.step();
            let leader = cluster.nodes.get(&a).expect("leader present");
            let outcome = tick(TickInputs {
                self_node_id: a,
                current_role: leader.role,
                channel_id: cluster.channel_id,
                tail_seq: leader.tail_seq(),
                replica_set: &cluster.replica_set,
                tracker: &leader.tracker,
                wall_clock_ms: cluster.now.duration_since(cluster.initial_now).as_millis() as u64,
                chunk_max_bytes: DST_CHUNK_MAX_BYTES,
                now: cluster.now,
                default_bandwidth_class: cluster.default_bandwidth_class,
            });
            for msg in outcome.outbound {
                if let OutboundMessage::Heartbeat { msg, .. } = msg {
                    emitted_wall_clocks.push(msg.wall_clock_ms);
                }
            }
        }
        emitted_wall_clocks
    }
    let trace_a = run_and_capture("dst/wall_clock_a");
    // Real time passes between the two runs; virtual time must not notice.
    std::thread::sleep(Duration::from_millis(75));
    let trace_b = run_and_capture("dst/wall_clock_b");
    // Check non-emptiness first so the per-value checks below are not vacuous.
    assert!(!trace_a.is_empty(), "no heartbeats emitted in 6 steps");
    assert_eq!(
        trace_a, trace_b,
        "wall_clock_ms sequence must be deterministic across real-time gaps"
    );
    for &ms in &trace_a {
        assert!(
            ms % STEP_DURATION_MS == 0,
            "wall_clock_ms {ms} is not a multiple of STEP_DURATION_MS ({STEP_DURATION_MS})"
        );
        assert!(
            ms <= STEPS * STEP_DURATION_MS,
            "wall_clock_ms {ms} exceeds 6×STEP_DURATION_MS ({})",
            STEPS * STEP_DURATION_MS,
        );
    }
}
/// Two back-to-back leader failures.
///
/// - Round 1: B has zero RTT to C and D, so B must win after A dies.
/// - Round 2: after B also dies, C and D must leave `Candidate` with at least
///   one `Leader`.
///
/// The thrash counters must also record the `MissedHeartbeats`-driven
/// transitions caused by the failures themselves. The bump that
/// `force_leader` records during setup is excluded.
#[test]
fn election_storm_two_rounds_each_converges_within_hysteresis() {
    let (a, b, c, d) = (0x10u64, 0x20u64, 0x30u64, 0x40u64);
    let mut cluster = VirtualCluster::new(&[a, b, c, d], "dst/election_storm");
    cluster.force_leader(a);
    for replica in [b, c, d] {
        cluster.force_replica(replica);
    }
    // `force_leader` walks A through Candidate via `MissedHeartbeats`, which
    // bumps its counter. Baseline that out so the final assertion only counts
    // transitions caused by the storm.
    let forced_thrash: u64 = cluster
        .nodes
        .values()
        .map(|n| n.election_thrash_count)
        .sum();
    // RTT keys must be `pair`-normalised. A hand-ordered tuple only works
    // while the ids happen to be ascending.
    cluster.rtt.insert(VirtualCluster::pair(b, c), Duration::ZERO);
    cluster.rtt.insert(VirtualCluster::pair(b, d), Duration::ZERO);
    for _ in 0..5 {
        cluster.step();
    }
    cluster.kill(a);
    cluster.run_until(
        |cl| cl.live_leader_count() >= 1,
        "storm r1: at least one new leader",
    );
    assert_eq!(
        cluster.nodes.get(&b).unwrap().role,
        ReplicaRole::Leader,
        "storm r1: B (central peer) must win"
    );
    for _ in 0..5 {
        cluster.step();
    }
    cluster.kill(b);
    cluster.run_until(
        |cl| cl.live_leader_count() >= 1,
        "storm r2: at least one new leader",
    );
    let c_role = cluster.nodes.get(&c).unwrap().role;
    let d_role = cluster.nodes.get(&d).unwrap().role;
    assert!(
        c_role != ReplicaRole::Candidate && d_role != ReplicaRole::Candidate,
        "storm r2: no node stuck in Candidate (c={c_role:?}, d={d_role:?})"
    );
    assert!(
        c_role == ReplicaRole::Leader || d_role == ReplicaRole::Leader,
        "storm r2: at least one survivor is Leader"
    );
    let total_thrash: u64 = cluster
        .nodes
        .values()
        .map(|n| n.election_thrash_count)
        .sum();
    let organic_thrash = total_thrash.saturating_sub(forced_thrash);
    assert!(
        organic_thrash >= 2,
        "election_thrash counter must bump on each MissedHeartbeats transition; \
         got {organic_thrash} beyond the {forced_thrash} forced during setup"
    );
}
/// C misses 12 events while partitioned, then catches up after the heal.
/// Afterwards both replicas must match the leader event-for-event: same
/// sequence numbers, byte-identical payloads.
#[test]
fn divergence_freedom_after_partition_heal() {
    let (a, b, c) = (0x10u64, 0x20u64, 0x30u64);
    let mut cluster = VirtualCluster::new(&[a, b, c], "dst/divergence_after_heal");
    cluster.force_leader(a);
    for replica in [b, c] {
        cluster.force_replica(replica);
    }
    for peer in [a, b] {
        cluster.partition(peer, c);
    }
    append_on_leader(&cluster, a, 12, "during-partition");
    cluster.run_until(
        |cl| cl.nodes.get(&b).map_or(0, |n| n.tail_seq()) == 12,
        "B catches up while C is partitioned",
    );
    assert_eq!(cluster.nodes.get(&c).unwrap().tail_seq(), 0);
    for peer in [a, b] {
        cluster.heal_partition(peer, c);
    }
    cluster.run_until(
        |cl| cl.nodes.get(&c).map_or(0, |n| n.tail_seq()) == 12,
        "C catches up post-heal",
    );
    let leader_events = cluster.nodes.get(&a).unwrap().file.read_range(0, 12);
    assert_eq!(leader_events.len(), 12);
    for replica_id in [b, c] {
        let events = cluster.nodes.get(&replica_id).unwrap().file.read_range(0, 12);
        assert_eq!(
            events.len(),
            12,
            "post-heal: replica {replica_id:#x} has {} events; leader has 12",
            events.len(),
        );
        for (i, (le, re)) in leader_events.iter().zip(&events).enumerate() {
            assert_eq!(
                le.entry.seq, re.entry.seq,
                "post-heal: replica {replica_id:#x} event {i} seq mismatch"
            );
            assert_eq!(
                le.payload.as_ref(),
                re.payload.as_ref(),
                "post-heal: replica {replica_id:#x} event {i} payload bytes differ"
            );
        }
    }
}
/// B is killed after syncing 5 events and revived after the leader appends 7
/// more. Once B reaches 12, its log must match the leader's exactly.
#[test]
fn divergence_freedom_after_replica_revival() {
    let (a, b) = (0x10u64, 0x20u64);
    let mut cluster = VirtualCluster::new(&[a, b], "dst/divergence_after_revival");
    cluster.force_leader(a);
    cluster.force_replica(b);
    append_on_leader(&cluster, a, 5, "pre-kill");
    cluster.run_until(
        |cl| cl.nodes.get(&b).map_or(0, |n| n.tail_seq()) == 5,
        "pre-kill convergence",
    );
    cluster.kill(b);
    append_on_leader(&cluster, a, 7, "during-kill");
    // Revive B in place with a fresh heartbeat tracker; its file is kept.
    let revived = cluster.nodes.get_mut(&b).unwrap();
    revived.killed = false;
    revived.tracker = HeartbeatTracker::new(DST_HEARTBEAT_MS);
    cluster.run_until(
        |cl| cl.nodes.get(&b).map_or(0, |n| n.tail_seq()) == 12,
        "B catches up post-revival",
    );
    let read_all = |id: NodeId| cluster.nodes.get(&id).unwrap().file.read_range(0, 12);
    let leader_events = read_all(a);
    let replica_events = read_all(b);
    assert_eq!(leader_events.len(), 12);
    assert_eq!(replica_events.len(), 12);
    for (i, (le, re)) in leader_events.iter().zip(&replica_events).enumerate() {
        assert_eq!(
            le.entry.seq, re.entry.seq,
            "revival: event {i} seq mismatch"
        );
        assert_eq!(
            le.payload.as_ref(),
            re.payload.as_ref(),
            "revival: event {i} payload bytes differ"
        );
    }
}
/// A replica told that the leader is ahead of it must emit a `SyncRequest`
/// whose `class` is the cluster's configured bandwidth class (`Background`),
/// not the harness default.
#[test]
fn dst_emitted_sync_request_carries_configured_bandwidth_class() {
    let a: NodeId = 0xA0;
    let b: NodeId = 0xB0;
    let mut cluster = VirtualCluster::new(&[a, b], "dst-class-plumbing")
        .with_default_bandwidth_class(net::adapter::net::redex::BandwidthClass::Background);
    // Same validated Replica -> Candidate -> Leader walk used by every other scenario.
    cluster.force_leader(a);
    append_on_leader(&cluster, a, 5, "evt");
    cluster.force_replica(b);
    // Hand B a Leader heartbeat advertising tail 5. This is presumably what
    // prompts B's next tick to request a sync.
    let now = cluster.now;
    cluster
        .nodes
        .get_mut(&b)
        .unwrap()
        .tracker
        .record_heartbeat(a, ReplicaRole::Leader, 5, now);
    cluster.tick_node(b);
    let mut saw_request = false;
    for (src, dst, inbound) in cluster.pending.drain(..) {
        if (src, dst) != (b, a) {
            continue;
        }
        if let Inbound::SyncRequest { msg, .. } = inbound {
            assert_eq!(
                msg.class,
                net::adapter::net::redex::BandwidthClass::Background,
                "DST: emitted SyncRequest must carry the cluster's configured class",
            );
            saw_request = true;
        }
    }
    assert!(
        saw_request,
        "expected at least one SyncRequest to be staged for delivery"
    );
}