async-raft 0.6.1

An async implementation of the Raft distributed consensus protocol.
mod fixtures;

use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::{Config, State};
use maplit::hashset;
use tokio::time::sleep;

use fixtures::RaftRouter;

/// Client write tests.
///
/// What does this test do?
///
/// - create a stable 2-node cluster.
/// - starts a config change which adds two new nodes and removes the leader.
/// - the leader should commit the change to C0 & C1 with separate majorities and then stepdown
///   after the config change is committed.
///
/// RUST_LOG=async_raft,memstore,stepdown=trace cargo test -p async-raft --test stepdown
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn stepdown() -> Result<()> {
    fixtures::init_tracing();

    // Setup test dependencies.
    let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
    let router = Arc::new(RaftRouter::new(config.clone()));
    router.new_raft_node(0).await;
    router.new_raft_node(1).await;

    // Assert all nodes are in non-voter state & have no entries.
    sleep(Duration::from_secs(3)).await;
    router.assert_pristine_cluster().await;

    // Initialize the cluster, then assert that a stable cluster was formed & held.
    tracing::info!("--- initializing cluster");
    router.initialize_from_single_node(0).await?;
    sleep(Duration::from_secs(3)).await;
    router.assert_stable_cluster(Some(1), Some(1)).await;

    // Submit a config change which adds two new nodes and removes the current leader.
    let orig_leader = router.leader().await.expect("expected the cluster to have a leader");
    assert_eq!(0, orig_leader, "expected original leader to be node 0");
    router.new_raft_node(2).await;
    router.new_raft_node(3).await;
    router.change_membership(orig_leader, hashset![1, 2, 3]).await?;
    sleep(Duration::from_secs(5)).await; // Give time for step down metrics to flow through.

    // Assert on the state of the old leader.
    {
        let metrics = router
            .latest_metrics()
            .await
            .into_iter()
            .find(|node| node.id == 0)
            .expect("expected to find metrics on original leader node");
        let cfg = metrics.membership_config;
        assert!(metrics.state != State::Leader, "expected old leader to have stepped down");
        assert_eq!(
            metrics.current_term, 1,
            "expected old leader to still be in first term, got {}",
            metrics.current_term
        );
        assert_eq!(
            metrics.last_log_index, 3,
            "expected old leader to have last log index of 3, got {}",
            metrics.last_log_index
        );
        assert_eq!(
            metrics.last_applied, 3,
            "expected old leader to have last applied of 3, got {}",
            metrics.last_applied
        );
        assert_eq!(
            cfg.members,
            hashset![1, 2, 3],
            "expected old leader to have membership of [1, 2, 3], got {:?}",
            cfg.members
        );
        assert!(cfg.members_after_consensus.is_none(), "expected old leader to be out of joint consensus");
    }

    // Assert that the current cluster is stable.
    let _ = router.remove_node(0).await;
    sleep(Duration::from_secs(5)).await; // Give time for a new leader to be elected.
    router.assert_stable_cluster(Some(2), Some(4)).await;
    router.assert_storage_state(2, 4, None, 0, None).await;
    // ----------------------------------- ^^^ this is `0` instead of `4` because blank payloads from new leaders
    //                                         and config change entries are never applied to the state machine.

    Ok(())
}