raft-io 0.3.0

Raft consensus and replicated-log engine for Rust. Leader election, log replication, membership changes, and snapshotting over a pluggable transport and a pluggable log store. The consensus layer above wal-db and the coordination substrate for Hive DB clustering.
Documentation
//! Replication-safety tests over an adversarial, deterministic cluster.
//!
//! Where `safety.rs` checks the election layer, this checks the heart of Raft:
//! that committed log entries never diverge, however hostile the network. The
//! harness models a network that can **reorder**, **drop**, **duplicate**, and
//! **partition** messages, and a schedule of ticks, proposals, and deliveries is
//! generated by `proptest`. After every step it asserts the two observable
//! safety properties:
//!
//! - **State Machine Safety / Log Matching** — if any two nodes apply an entry
//!   at the same index, it is the *same* command. A committed entry, once
//!   observed, is never contradicted by another node, across leader changes and
//!   partitions.
//! - **Apply ordering** — each node applies entries in strict index order with
//!   no gaps.
//!
//! Because the core is deterministic, any counterexample `proptest` finds is
//! replayable exactly from its seed.

#![allow(clippy::unwrap_used, clippy::expect_used)]

use std::collections::BTreeMap;

use proptest::prelude::*;
use raft_io::{Action, Event, Message, NodeId, RaftConfig, RaftNode};

/// An in-flight message as a `(sender, recipient, payload)` tuple, in that
/// order: who sent it, who it is for, and the message itself.
type InFlight = (NodeId, NodeId, Message);

/// A cluster plus a controllable network and the bookkeeping needed to assert
/// safety after every step.
///
/// Nodes are addressed by their position in `nodes`; the harness also uses a
/// node's id as an index into the per-node vectors below.
struct Cluster {
    /// The Raft nodes under test, one per cluster member.
    nodes: Vec<RaftNode>,
    /// Messages currently traversing the network, in no guaranteed order.
    /// Delivery picks an arbitrary position, so any reordering is possible.
    wire: Vec<InFlight>,
    /// Optional partition: `side[i]` places node `i` in one of two groups;
    /// messages crossing the boundary are dropped on delivery. `None` means
    /// the network is fully connected.
    side: Option<Vec<bool>>,
    /// Every command applied by each node, in order, for the apply-ordering check.
    /// `applied[i]` is the sequence applied by the node at position `i`.
    applied: Vec<Vec<Vec<u8>>>,
    /// The agreed command at each committed index, for the agreement check.
    /// An index is recorded the first time any node applies it; every later
    /// apply at that index must carry the same command.
    committed: BTreeMap<u64, Vec<u8>>,
    /// Monotonic source of distinct proposal payloads. Starts at 1 and is
    /// encoded big-endian into each proposed command.
    next_value: u64,
}

impl Cluster {
    /// Builds an `n`-node cluster with ids `0..n`, an empty network, no
    /// partition, and empty bookkeeping.
    ///
    /// Every node receives the same configuration apart from its own id and a
    /// fixed per-node seed (`0x5000 + id`).
    fn new(n: usize) -> Self {
        let ids: Vec<NodeId> = (0..n as NodeId).collect();
        let nodes = ids
            .iter()
            .map(|&id| {
                let cfg = RaftConfig::new(id, ids.clone())
                    .with_election_timeout(10, 20)
                    .with_heartbeat_interval(3)
                    .with_max_batch(8)
                    .with_seed(0x5000 + id);
                RaftNode::new(cfg)
            })
            .collect();
        Self {
            nodes,
            wire: Vec::new(),
            side: None,
            applied: vec![Vec::new(); n],
            committed: BTreeMap::new(),
            next_value: 1,
        }
    }

    /// Number of nodes in the cluster.
    fn n(&self) -> usize {
        self.nodes.len()
    }

    /// Two nodes can exchange messages unless a partition separates them.
    ///
    /// Node ids are used directly as indices into the partition table, so
    /// both ids must belong to this cluster.
    fn connected(&self, a: NodeId, b: NodeId) -> bool {
        match &self.side {
            None => true,
            Some(side) => side[a as usize] == side[b as usize],
        }
    }

    /// Processes the actions emitted by the node at `node_idx` and enforces
    /// the safety invariants.
    ///
    /// - `Send` actions are queued on the wire, stamped with the emitting
    ///   node's id as the sender.
    /// - `Apply` actions are checked for apply ordering and for agreement
    ///   with every earlier apply at the same index, then recorded.
    /// - All other actions are ignored.
    ///
    /// # Panics
    ///
    /// Panics if the node applies an index out of sequence, or applies a
    /// command that differs from the one already recorded at that index.
    fn absorb(&mut self, node_idx: usize, actions: Vec<Action>) {
        let from = self.nodes[node_idx].id();
        for action in actions {
            match action {
                Action::Send { to, message } => self.wire.push((from, to, message)),
                Action::Apply { index, command, .. } => {
                    // Apply ordering: strictly sequential, no gaps. The k-th
                    // apply recorded for a node must carry index k (1-based).
                    let log = &mut self.applied[node_idx];
                    assert_eq!(
                        index as usize,
                        log.len() + 1,
                        "node {from} applied index {index} out of order (had {})",
                        log.len()
                    );
                    log.push(command.clone());
                    // Agreement: same command at a given committed index everywhere.
                    match self.committed.get(&index) {
                        Some(existing) => assert_eq!(
                            *existing, command,
                            "divergent committed entry at index {index}"
                        ),
                        None => {
                            // First node to apply this index fixes the
                            // reference value for everyone else.
                            let _ = self.committed.insert(index, command);
                        }
                    }
                }
                // Actions other than sends and applies play no part in the
                // checks this harness performs.
                _ => {}
            }
        }
    }

    /// Advances the node at `node_idx` by one tick and absorbs its actions.
    ///
    /// # Panics
    ///
    /// Panics if the node reports an error while handling the tick.
    fn tick(&mut self, node_idx: usize) {
        let actions = self.nodes[node_idx].step(Event::Tick).expect("tick");
        self.absorb(node_idx, actions);
    }

    /// Proposes a fresh value to the first node that currently believes it
    /// leads; does nothing when no node does.
    ///
    /// The payload counter advances even if the node rejects the proposal, so
    /// payloads stay distinct. A rejected proposal is otherwise ignored.
    fn propose(&mut self) {
        let leader = self.nodes.iter().position(RaftNode::is_leader);
        if let Some(i) = leader {
            let value = self.next_value.to_be_bytes().to_vec();
            self.next_value += 1;
            if let Ok(actions) = self.nodes[i].step(Event::Propose(value)) {
                self.absorb(i, actions);
            }
        }
    }

    /// Delivers the in-flight message at `pick % len`; a no-op when the wire
    /// is empty.
    ///
    /// With `duplicate` set, a copy is delivered and the original stays on the
    /// wire, so the same message can be delivered again later. Otherwise the
    /// message is removed from the wire.
    ///
    /// If a partition separates sender and receiver, nothing reaches the
    /// receiver: a removed message is lost, while a duplicated one simply
    /// remains on the wire.
    ///
    /// # Panics
    ///
    /// Panics if the receiving node reports an error while handling the
    /// message.
    fn deliver(&mut self, pick: usize, duplicate: bool) {
        if self.wire.is_empty() {
            return;
        }
        let idx = pick % self.wire.len();
        let (from, to, message) = if duplicate {
            self.wire[idx].clone()
        } else {
            self.wire.remove(idx)
        };
        if !self.connected(from, to) {
            return; // partitioned: silently dropped
        }
        let target = to as usize;
        let actions = self.nodes[target]
            .step(Event::Message(message))
            .expect("message handling never fails in memory");
        self.absorb(target, actions);
    }

    /// Discards the in-flight message at `pick % len` without delivering it;
    /// a no-op when the wire is empty.
    fn drop_msg(&mut self, pick: usize) {
        if !self.wire.is_empty() {
            let idx = pick % self.wire.len();
            let _ = self.wire.remove(idx);
        }
    }

    /// Partitions the cluster into two groups by a bitmask over node indices:
    /// node `i` joins the group named by bit `i` of `mask`.
    ///
    /// A mask that leaves every node in the same group is not a partition at
    /// all, so it clears any existing partition instead of installing one.
    /// Nodes whose index has no corresponding bit in `mask` (index 32 and up)
    /// are treated as having a zero bit.
    fn partition(&mut self, mask: u32) {
        let n = self.n();
        // Shifting a `u32` by 32 or more is an overflow, so indices past the
        // mask width are answered without shifting.
        let side: Vec<bool> = (0..n)
            .map(|i| i < u32::BITS as usize && (mask >> i) & 1 == 1)
            .collect();
        if side.iter().all(|&s| s) || side.iter().all(|&s| !s) {
            self.side = None;
        } else {
            self.side = Some(side);
        }
    }

    /// Removes any partition, reconnecting every pair of nodes.
    fn heal(&mut self) {
        self.side = None;
    }

    /// Runs `rounds` fair rounds, used to settle the cluster after a schedule.
    ///
    /// Each round ticks every node once, then delivers messages from the
    /// front of the wire until it is empty. Draining is capped at 10,000
    /// deliveries per round so a cluster that never falls silent cannot hang
    /// the test; when the cap is hit, the remaining messages stay on the wire
    /// for the next round.
    fn settle(&mut self, rounds: usize) {
        /// Upper bound on deliveries while draining the wire in one round.
        const MAX_DELIVERIES_PER_ROUND: usize = 10_000;

        for _ in 0..rounds {
            for i in 0..self.n() {
                self.tick(i);
            }
            // Drain the wire fully (or until the safety cap is reached).
            let mut guard = 0;
            while !self.wire.is_empty() && guard < MAX_DELIVERIES_PER_ROUND {
                self.deliver(0, false);
                guard += 1;
            }
        }
    }

    /// Number of nodes that currently believe they are the leader.
    fn leaders(&self) -> usize {
        self.nodes.iter().filter(|n| n.is_leader()).count()
    }
}

/// One step of an adversarial schedule, as interpreted by `run`.
#[derive(Clone, Copy, Debug)]
enum Op {
    /// Tick one node; the payload is reduced modulo the cluster size to pick it.
    Tick(u8),
    /// Propose a fresh value to a node that believes it is the leader.
    Propose,
    /// Deliver one in-flight message, removing it from the wire; the payload
    /// selects which one.
    Deliver(u16),
    /// Deliver a copy of one in-flight message, leaving the original on the
    /// wire; the payload selects which one.
    Duplicate(u16),
    /// Discard one in-flight message without delivering it; the payload
    /// selects which one.
    Drop(u16),
    /// Split the cluster in two according to the payload, used as a bitmask
    /// over node indices.
    Partition(u32),
    /// Remove any partition.
    Heal,
}

fn op_strategy() -> impl Strategy<Value = Op> {
    prop_oneof![
        4 => any::<u8>().prop_map(Op::Tick),
        3 => Just(Op::Propose),
        6 => any::<u16>().prop_map(Op::Deliver),
        1 => any::<u16>().prop_map(Op::Duplicate),
        1 => any::<u16>().prop_map(Op::Drop),
        1 => any::<u32>().prop_map(Op::Partition),
        1 => Just(Op::Heal),
    ]
}

/// Replays `ops`, in order, against a fresh `n`-node cluster.
///
/// The function asserts nothing itself: the harness checks its invariants as
/// each step's actions are absorbed, so a violated invariant surfaces as a
/// panic from inside the cluster.
fn run(n: usize, ops: Vec<Op>) {
    let mut cluster = Cluster::new(n);
    ops.into_iter().for_each(|step| match step {
        // Network faults and repairs.
        Op::Partition(mask) => cluster.partition(mask),
        Op::Heal => cluster.heal(),
        Op::Drop(pick) => cluster.drop_msg(usize::from(pick)),
        // Message delivery, consuming or duplicating.
        Op::Deliver(pick) => cluster.deliver(usize::from(pick), false),
        Op::Duplicate(pick) => cluster.deliver(usize::from(pick), true),
        // Node-local progress.
        Op::Tick(who) => cluster.tick(usize::from(who) % n),
        Op::Propose => cluster.propose(),
    });
}

proptest! {
    #![proptest_config(ProptestConfig::with_cases(192))]

    /// Three nodes never diverge on a committed entry, under any adversarial
    /// schedule of ticks, proposals, reorderings, drops, duplicates, and
    /// partitions. Safety is asserted continuously inside the harness.
    ///
    /// Each case replays a generated schedule of fewer than 500 steps. The
    /// test body asserts nothing itself; a violation surfaces as a panic
    /// inside `run`.
    #[test]
    fn replication_safety_three_nodes(ops in prop::collection::vec(op_strategy(), 0..500)) {
        run(3, ops);
    }

    /// The same on a five-node cluster, with schedules of fewer than 700
    /// steps.
    #[test]
    fn replication_safety_five_nodes(ops in prop::collection::vec(op_strategy(), 0..700)) {
        run(5, ops);
    }
}

/// On a healthy (never partitioned, never lossy) cluster of three and of five
/// nodes: exactly one leader emerges, twenty proposals are committed, and at
/// least a quorum of nodes ends up having applied the longest log.
#[test]
fn cluster_replicates_and_commits_everywhere() {
    for n in [3usize, 5] {
        let mut cluster = Cluster::new(n);

        // Give the cluster time to elect a leader before proposing anything.
        cluster.settle(30);
        let leader_count = cluster.leaders();
        assert_eq!(leader_count, 1, "{n}-node cluster should have one leader");

        // Feed proposals one at a time, letting each replicate briefly, then
        // allow a longer quiet period for stragglers.
        (0..20).for_each(|_| {
            cluster.propose();
            cluster.settle(3);
        });
        cluster.settle(30);

        // How many entries each node has applied.
        let lengths: Vec<usize> = cluster.applied.iter().map(Vec::len).collect();

        let max_applied = lengths.iter().copied().max().unwrap();
        assert!(
            max_applied >= 20,
            "{n}-node cluster committed too little: {max_applied}"
        );

        // A strict majority must have applied the full log.
        let agree = lengths.iter().filter(|&&len| len == max_applied).count();
        assert!(agree > n / 2, "a quorum should have the full log");
    }
}

/// A minority partition cannot commit; the majority side keeps making progress;
/// healing leaves every node's applied log consistent.
///
/// Concretely, on a five-node cluster with nodes {3,4} cut off from {0,1,2}:
///
/// - the number of committed indices must grow while the partition is in
///   place (measured against a baseline taken just before the split);
/// - neither isolated node may apply anything beyond what was already
///   committed before the split;
/// - after healing, every node's applied log must be a prefix of the longest
///   applied log.
#[test]
fn partition_minority_stalls_majority_progresses_then_heals() {
    let mut cluster = Cluster::new(5);
    cluster.settle(40);
    assert_eq!(cluster.leaders(), 1);

    // Baseline: anything committed while the cluster was still whole must not
    // be mistaken for progress made during the partition.
    let committed_before_partition = cluster.committed.len();

    // Isolate nodes {3,4} (a minority) from {0,1,2} (a majority).
    cluster.partition(0b11000);

    // The majority side still elects/keeps a leader and commits.
    for _ in 0..30 {
        cluster.propose();
        cluster.settle(4);
    }
    let committed_during_partition = cluster.committed.len();
    assert!(
        committed_during_partition > committed_before_partition,
        "the majority partition must keep committing"
    );

    // The minority side must have stalled: it can finish applying entries that
    // were committed before the split, but nothing newer can reach it.
    for isolated in [3usize, 4] {
        let applied = cluster.applied[isolated].len();
        assert!(
            applied <= committed_before_partition,
            "isolated node {isolated} applied {applied} entries, but only \
             {committed_before_partition} were committed before the partition"
        );
    }

    // Heal and let the isolated minority catch up.
    cluster.heal();
    cluster.settle(80);

    // Every node, including the formerly isolated ones, must agree
    // entry-for-entry with the best-stocked node over what it has applied.
    let longest = cluster
        .applied
        .iter()
        .max_by_key(|log| log.len())
        .expect("cluster has five nodes");
    for log in &cluster.applied {
        assert_eq!(
            log[..],
            longest[..log.len()],
            "logs diverged after heal"
        );
    }
}