#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::collections::BTreeMap;
use proptest::prelude::*;
use raft_io::{Action, Event, Message, NodeId, RaftConfig, RaftNode};
/// A message sitting on the simulated wire: `(sender id, recipient id, payload)`.
type InFlight = (NodeId, NodeId, Message);
/// Deterministic in-memory harness driving a set of Raft nodes.
///
/// All message passing goes through `wire`, so tests control delivery order,
/// duplication, loss and partitions explicitly.
struct Cluster {
/// The Raft nodes; a node's id is also its index in this vector.
nodes: Vec<RaftNode>,
/// Messages that have been sent but not yet delivered or dropped.
wire: Vec<InFlight>,
/// Partition assignment per node (indexed by node id); `None` means fully
/// connected. Two nodes can exchange messages only when on the same side.
side: Option<Vec<bool>>,
/// Per-node list of applied commands, in apply order (entry `k` is index `k + 1`).
applied: Vec<Vec<Vec<u8>>>,
/// First command observed applied at each log index by any node; later applies
/// at the same index are checked against it to detect divergence.
committed: BTreeMap<u64, Vec<u8>>,
/// Counter used to generate unique proposal payloads.
next_value: u64,
}
impl Cluster {
    /// Builds an `n`-node cluster whose node ids are `0..n`, every node being
    /// configured with the full peer list.
    ///
    /// Node ids double as indices into `nodes` and `applied`; `absorb`,
    /// `connected` and `deliver` all rely on that correspondence. Each node gets
    /// its own seed (`0x5000 + id`), presumably so that randomized election
    /// timeouts differ between nodes.
    fn new(n: usize) -> Self {
        let ids: Vec<NodeId> = (0..n as NodeId).collect();
        let nodes = ids
            .iter()
            .map(|&id| {
                let cfg = RaftConfig::new(id, ids.clone())
                    .with_election_timeout(10, 20)
                    .with_heartbeat_interval(3)
                    .with_max_batch(8)
                    .with_seed(0x5000 + id);
                RaftNode::new(cfg)
            })
            .collect();
        Self {
            nodes,
            wire: Vec::new(),
            side: None,
            applied: vec![Vec::new(); n],
            committed: BTreeMap::new(),
            next_value: 1,
        }
    }

    /// Number of nodes in the cluster.
    fn n(&self) -> usize {
        self.nodes.len()
    }

    /// Whether a message from `a` can currently reach `b`.
    ///
    /// This is always true when the cluster is whole. Under a partition it is
    /// true only if both nodes are on the same side.
    fn connected(&self, a: NodeId, b: NodeId) -> bool {
        match &self.side {
            None => true,
            Some(side) => side[a as usize] == side[b as usize],
        }
    }

    /// Processes the actions emitted by node `node_idx`.
    ///
    /// `Send` actions are queued on the wire. `Apply` actions are appended to the
    /// node's applied log and checked against the cluster-wide `committed` map.
    /// All other actions are ignored.
    ///
    /// # Panics
    ///
    /// Panics if a node applies indices out of order (each node must apply
    /// 1-based indices contiguously), or if two nodes apply different commands
    /// at the same index.
    fn absorb(&mut self, node_idx: usize, actions: Vec<Action>) {
        let from = self.nodes[node_idx].id();
        for action in actions {
            match action {
                Action::Send { to, message } => self.wire.push((from, to, message)),
                Action::Apply { index, command, .. } => {
                    let log = &mut self.applied[node_idx];
                    assert_eq!(
                        index as usize,
                        log.len() + 1,
                        "node {from} applied index {index} out of order (had {})",
                        log.len()
                    );
                    log.push(command.clone());
                    // The first node to apply an index defines the expected
                    // command; every later apply at that index must match it.
                    match self.committed.get(&index) {
                        Some(existing) => assert_eq!(
                            *existing, command,
                            "divergent committed entry at index {index}"
                        ),
                        None => {
                            let _ = self.committed.insert(index, command);
                        }
                    }
                }
                _ => {}
            }
        }
    }

    /// Advances node `node_idx` by one tick and absorbs whatever it emits.
    fn tick(&mut self, node_idx: usize) {
        let actions = self.nodes[node_idx].step(Event::Tick).expect("tick");
        self.absorb(node_idx, actions);
    }

    /// Proposes a fresh, unique value to the lowest-indexed node that reports
    /// itself as leader.
    ///
    /// This does nothing when no node is leader. A proposal the node rejects
    /// (`Err`) is deliberately ignored. The value counter still advances
    /// whenever a leader is found.
    fn propose(&mut self) {
        let leader = self.nodes.iter().position(RaftNode::is_leader);
        if let Some(i) = leader {
            let value = self.next_value.to_be_bytes().to_vec();
            self.next_value += 1;
            if let Ok(actions) = self.nodes[i].step(Event::Propose(value)) {
                self.absorb(i, actions);
            }
        }
    }

    /// Delivers one in-flight message, chosen as `pick % wire.len()`.
    ///
    /// With `duplicate` set, the message is delivered but also left on the wire,
    /// so it can arrive again later. A message whose endpoints are on opposite
    /// sides of a partition is not delivered. A non-duplicate is discarded in
    /// that case, while a duplicate simply stays queued. Does nothing when the
    /// wire is empty.
    fn deliver(&mut self, pick: usize, duplicate: bool) {
        if self.wire.is_empty() {
            return;
        }
        let idx = pick % self.wire.len();
        let (from, to, message) = if duplicate {
            self.wire[idx].clone()
        } else {
            self.wire.remove(idx)
        };
        if !self.connected(from, to) {
            return;
        }
        let target = to as usize;
        let actions = self.nodes[target]
            .step(Event::Message(message))
            .expect("message handling never fails in memory");
        self.absorb(target, actions);
    }

    /// Removes one in-flight message (chosen as `pick % wire.len()`) without
    /// delivering it. Does nothing when the wire is empty.
    fn drop_msg(&mut self, pick: usize) {
        if !self.wire.is_empty() {
            let idx = pick % self.wire.len();
            let _ = self.wire.remove(idx);
        }
    }

    /// Splits the cluster using bit `i` of `mask` as node `i`'s side.
    ///
    /// A mask that puts every node on the same side is treated as "no
    /// partition". Nodes with index 32 or more have no bit in `mask` and are
    /// placed on the `false` side.
    fn partition(&mut self, mask: u32) {
        let side: Vec<bool> = (0..self.n())
            // Guard the shift: `mask >> i` overflows (panicking in debug builds)
            // once `i` reaches the bit width of `mask`.
            .map(|i| i < u32::BITS as usize && (mask >> i) & 1 == 1)
            .collect();
        let all_same = side.iter().all(|&s| s) || side.iter().all(|&s| !s);
        self.side = if all_same { None } else { Some(side) };
    }

    /// Removes any partition, making every pair of nodes connected again.
    fn heal(&mut self) {
        self.side = None;
    }

    /// Runs `rounds` synchronous rounds.
    ///
    /// Each round ticks every node once, then drains the wire in FIFO order. The
    /// drain stops after 10_000 deliveries per round so a message storm cannot
    /// hang the test.
    fn settle(&mut self, rounds: usize) {
        for _ in 0..rounds {
            for i in 0..self.n() {
                self.tick(i);
            }
            let mut guard = 0;
            while !self.wire.is_empty() && guard < 10_000 {
                self.deliver(0, false);
                guard += 1;
            }
        }
    }

    /// Number of nodes that currently report themselves as leader.
    fn leaders(&self) -> usize {
        self.nodes.iter().filter(|n| n.is_leader()).count()
    }
}
/// One step of a randomized cluster schedule, interpreted by `run`.
#[derive(Clone, Copy, Debug)]
enum Op {
/// Tick the node at index `value % n`.
Tick(u8),
/// Propose a fresh value to the current leader, if any.
Propose,
/// Deliver (and remove) the in-flight message at `value % wire.len()`.
Deliver(u16),
/// Deliver the in-flight message at `value % wire.len()` but keep it queued.
Duplicate(u16),
/// Discard the in-flight message at `value % wire.len()`.
Drop(u16),
/// Partition the cluster by bitmask (bit `i` = side of node `i`).
Partition(u32),
/// Remove any partition.
Heal,
}
fn op_strategy() -> impl Strategy<Value = Op> {
prop_oneof![
4 => any::<u8>().prop_map(Op::Tick),
3 => Just(Op::Propose),
6 => any::<u16>().prop_map(Op::Deliver),
1 => any::<u16>().prop_map(Op::Duplicate),
1 => any::<u16>().prop_map(Op::Drop),
1 => any::<u32>().prop_map(Op::Partition),
1 => Just(Op::Heal),
]
}
/// Replays `ops` against a fresh `n`-node cluster.
///
/// Safety violations surface as panics inside `Cluster::absorb`, such as
/// out-of-order applies or divergent entries at the same index.
fn run(n: usize, ops: Vec<Op>) {
    let mut cluster = Cluster::new(n);
    ops.into_iter().for_each(|op| match op {
        Op::Tick(node) => cluster.tick(usize::from(node) % n),
        Op::Propose => cluster.propose(),
        Op::Deliver(pick) => cluster.deliver(usize::from(pick), false),
        Op::Duplicate(pick) => cluster.deliver(usize::from(pick), true),
        Op::Drop(pick) => cluster.drop_msg(usize::from(pick)),
        Op::Partition(mask) => cluster.partition(mask),
        Op::Heal => cluster.heal(),
    });
}
// Property tests: random schedules of ticks, proposals, deliveries and faults
// must never trigger the safety assertions in `Cluster::absorb`.
proptest! {
#![proptest_config(ProptestConfig::with_cases(192))]
// Three-node cluster, schedules of up to 499 operations.
#[test]
fn replication_safety_three_nodes(ops in prop::collection::vec(op_strategy(), 0..500)) {
run(3, ops);
}
// Five-node cluster, longer schedules (up to 699 operations).
#[test]
fn replication_safety_five_nodes(ops in prop::collection::vec(op_strategy(), 0..700)) {
run(5, ops);
}
}
/// Fault-free liveness check for 3- and 5-node clusters.
///
/// Each cluster must elect exactly one leader and commit at least 20 entries.
/// A quorum of its nodes must end up with the longest applied log.
#[test]
fn cluster_replicates_and_commits_everywhere() {
    for n in [3usize, 5] {
        let mut cluster = Cluster::new(n);

        // Phase 1: give the cluster time to elect a leader.
        cluster.settle(30);
        let leader_count = cluster.leaders();
        assert_eq!(leader_count, 1, "{n}-node cluster should have one leader");

        // Phase 2: submit 20 proposals, letting the cluster make progress between them.
        (0..20).for_each(|_| {
            cluster.propose();
            cluster.settle(3);
        });

        // Phase 3: let the cluster quiesce before inspecting the logs.
        cluster.settle(30);

        let lengths: Vec<usize> = cluster.applied.iter().map(|log| log.len()).collect();
        let max_applied = *lengths.iter().max().unwrap();
        assert!(
            max_applied >= 20,
            "{n}-node cluster committed too little: {max_applied}"
        );
        let agree = lengths.iter().filter(|&&len| len == max_applied).count();
        assert!(agree > n / 2, "a quorum should have the full log");
    }
}
/// Partition scenario on a 5-node cluster.
///
/// Nodes 3 and 4 are cut off from nodes 0-2. The majority side must keep
/// committing and the minority side must not apply anything new. After healing,
/// every node's applied log must be a prefix of the longest one.
#[test]
fn partition_minority_stalls_majority_progresses_then_heals() {
    let mut cluster = Cluster::new(5);
    cluster.settle(40);
    assert_eq!(cluster.leaders(), 1);

    // Baselines, so that progress is measured *during* the partition only.
    // Entries applied while electing the first leader must not count.
    let minority = [3usize, 4];
    let committed_before_partition = cluster.committed.len();
    let minority_before: Vec<usize> = minority
        .iter()
        .map(|&i| cluster.applied[i].len())
        .collect();

    // Bits 3 and 4 set: nodes 3 and 4 form the minority, nodes 0..=2 keep a quorum.
    cluster.partition(0b11000);
    for _ in 0..30 {
        cluster.propose();
        cluster.settle(4);
    }

    let committed_during_partition = cluster.committed.len();
    assert!(
        committed_during_partition > committed_before_partition,
        "the majority partition must keep committing"
    );
    let minority_after: Vec<usize> = minority
        .iter()
        .map(|&i| cluster.applied[i].len())
        .collect();
    assert_eq!(
        minority_before, minority_after,
        "the minority partition must not apply new entries"
    );

    cluster.heal();
    cluster.settle(80);

    // Every log must agree with the longest one on their common prefix. This
    // covers all nodes, not just the two longest logs.
    let longest = cluster
        .applied
        .iter()
        .max_by_key(|log| log.len())
        .expect("cluster has nodes");
    for (node, log) in cluster.applied.iter().enumerate() {
        assert_eq!(
            log.as_slice(),
            &longest[..log.len()],
            "logs diverged after heal (node {node})"
        );
    }
}