mod fixtures;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use async_raft::raft::MembershipConfig;
use async_raft::{Config, SnapshotPolicy};
use maplit::hashset;
use tokio::time::sleep;
use fixtures::RaftRouter;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn compaction() -> Result<()> {
fixtures::init_tracing();
let config = Arc::new(
Config::build("test".into())
.snapshot_policy(SnapshotPolicy::LogsSinceLast(500))
.validate()
.expect("failed to build Raft config"),
);
let router = Arc::new(RaftRouter::new(config.clone()));
router.new_raft_node(0).await;
sleep(Duration::from_secs(10)).await;
router.assert_pristine_cluster().await;
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
sleep(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
router.client_request_many(0, "0", 499).await; sleep(Duration::from_secs(5)).await; router.assert_stable_cluster(Some(1), Some(500)).await;
router
.assert_storage_state(
1,
500,
Some(0),
500,
Some((
500.into(),
1,
MembershipConfig {
members: hashset![0],
members_after_consensus: None,
},
)),
)
.await;
router.new_raft_node(1).await;
router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter");
router
.change_membership(0, hashset![0, 1])
.await
.expect("failed to modify cluster membership");
sleep(Duration::from_secs(5)).await; router.assert_stable_cluster(Some(1), Some(502)).await; let expected_snap = Some((
500.into(),
1,
MembershipConfig {
members: hashset![0u64],
members_after_consensus: None,
},
));
router.assert_storage_state(1, 502, None, 500, expected_snap).await;
Ok(())
}