async-raft 0.6.1

An async implementation of the Raft distributed consensus protocol.
mod fixtures;

use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::raft::MembershipConfig;
use async_raft::Config;
use futures::prelude::*;
use maplit::hashset;
use tokio::time::sleep;

use fixtures::RaftRouter;

/// Client write tests.
///
/// What does this test do?
///
/// - create a stable 3-node cluster.
/// - write a lot of data to it.
/// - assert that the cluster stayed stable and has all of the expected data.
///
/// RUST_LOG=async_raft,memstore,client_writes=trace cargo test -p async-raft --test client_writes
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn client_writes() -> Result<()> {
    fixtures::init_tracing();

    // Setup test dependencies.
    let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
    let router = Arc::new(RaftRouter::new(config.clone()));
    router.new_raft_node(0).await;
    router.new_raft_node(1).await;
    router.new_raft_node(2).await;

    // Assert all nodes are in non-voter state & have no entries.
    sleep(Duration::from_secs(10)).await;
    router.assert_pristine_cluster().await;

    // Initialize the cluster, then assert that a stable cluster was formed & held.
    tracing::info!("--- initializing cluster");
    router.initialize_from_single_node(0).await?;
    sleep(Duration::from_secs(10)).await;
    router.assert_stable_cluster(Some(1), Some(1)).await;

    // Write a bunch of data and assert that the cluster stayes stable.
    let leader = router.leader().await.expect("leader not found");
    let mut clients = futures::stream::FuturesUnordered::new();
    clients.push(router.client_request_many(leader, "0", 1000));
    clients.push(router.client_request_many(leader, "1", 1000));
    clients.push(router.client_request_many(leader, "2", 1000));
    clients.push(router.client_request_many(leader, "3", 1000));
    clients.push(router.client_request_many(leader, "4", 1000));
    clients.push(router.client_request_many(leader, "5", 1000));
    while clients.next().await.is_some() {}
    sleep(Duration::from_secs(5)).await; // Ensure enough time is given for replication (this is WAY more than enough).
    router.assert_stable_cluster(Some(1), Some(6001)).await; // The extra 1 is from the leader's initial commit entry.
    router
        .assert_storage_state(
            1,
            6001,
            Some(0),
            6001,
            Some((
                (5000..5100).into(),
                1,
                MembershipConfig {
                    members: hashset![0, 1, 2],
                    members_after_consensus: None,
                },
            )),
        )
        .await;

    Ok(())
}