mod fixtures;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use async_raft::raft::MembershipConfig;
use async_raft::Config;
use futures::prelude::*;
use maplit::hashset;
use tokio::time::sleep;
use fixtures::RaftRouter;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn client_writes() -> Result<()> {
fixtures::init_tracing();
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
let router = Arc::new(RaftRouter::new(config.clone()));
router.new_raft_node(0).await;
router.new_raft_node(1).await;
router.new_raft_node(2).await;
sleep(Duration::from_secs(10)).await;
router.assert_pristine_cluster().await;
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
sleep(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
let leader = router.leader().await.expect("leader not found");
let mut clients = futures::stream::FuturesUnordered::new();
clients.push(router.client_request_many(leader, "0", 1000));
clients.push(router.client_request_many(leader, "1", 1000));
clients.push(router.client_request_many(leader, "2", 1000));
clients.push(router.client_request_many(leader, "3", 1000));
clients.push(router.client_request_many(leader, "4", 1000));
clients.push(router.client_request_many(leader, "5", 1000));
while clients.next().await.is_some() {}
sleep(Duration::from_secs(5)).await; router.assert_stable_cluster(Some(1), Some(6001)).await; router
.assert_storage_state(
1,
6001,
Some(0),
6001,
Some((
(5000..5100).into(),
1,
MembershipConfig {
members: hashset![0, 1, 2],
members_after_consensus: None,
},
)),
)
.await;
Ok(())
}