mod fixtures;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use async_raft::{Config, State};
use maplit::hashset;
use tokio::time::sleep;
use fixtures::RaftRouter;
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn stepdown() -> Result<()> {
fixtures::init_tracing();
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
let router = Arc::new(RaftRouter::new(config.clone()));
router.new_raft_node(0).await;
router.new_raft_node(1).await;
sleep(Duration::from_secs(3)).await;
router.assert_pristine_cluster().await;
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
sleep(Duration::from_secs(3)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
let orig_leader = router.leader().await.expect("expected the cluster to have a leader");
assert_eq!(0, orig_leader, "expected original leader to be node 0");
router.new_raft_node(2).await;
router.new_raft_node(3).await;
router.change_membership(orig_leader, hashset![1, 2, 3]).await?;
sleep(Duration::from_secs(5)).await;
{
let metrics = router
.latest_metrics()
.await
.into_iter()
.find(|node| node.id == 0)
.expect("expected to find metrics on original leader node");
let cfg = metrics.membership_config;
assert!(metrics.state != State::Leader, "expected old leader to have stepped down");
assert_eq!(
metrics.current_term, 1,
"expected old leader to still be in first term, got {}",
metrics.current_term
);
assert_eq!(
metrics.last_log_index, 3,
"expected old leader to have last log index of 3, got {}",
metrics.last_log_index
);
assert_eq!(
metrics.last_applied, 3,
"expected old leader to have last applied of 3, got {}",
metrics.last_applied
);
assert_eq!(
cfg.members,
hashset![1, 2, 3],
"expected old leader to have membership of [1, 2, 3], got {:?}",
cfg.members
);
assert!(cfg.members_after_consensus.is_none(), "expected old leader to be out of joint consensus");
}
let _ = router.remove_node(0).await;
sleep(Duration::from_secs(5)).await; router.assert_stable_cluster(Some(2), Some(4)).await;
router.assert_storage_state(2, 4, None, 0, None).await;
Ok(())
}