use std::sync::Arc;
use std::time::Duration;
use maplit::btreeset;
use crate::Membership;
use crate::RaftMetrics;
use crate::RaftTypeConfig;
use crate::Vote;
use crate::async_runtime::WatchSender;
use crate::core::ServerState;
use crate::engine::testing::UTConfig;
use crate::engine::testing::log_id;
use crate::log_id::LogIdOptionExt;
use crate::metrics::Wait;
use crate::metrics::WaitError;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::NodeIdOf;
use crate::type_config::alias::StoredMembershipOf;
use crate::type_config::alias::VoteOf;
use crate::type_config::alias::WatchSenderOf;
use crate::vote::raft_vote::RaftVoteExt;
#[test]
fn test_wait() {
UTConfig::<()>::run(async {
{
let (init, w, tx) = init_wait_test::<UTConfig>();
let h = UTConfig::<()>::spawn(async move {
UTConfig::<()>::sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.current_leader = Some(3);
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.current_leader(3, "leader").await?;
h.await?;
assert_eq!(Some(3), got.current_leader);
}
{
let (init, w, tx) = init_wait_test::<UTConfig>();
let h = UTConfig::<()>::spawn(async move {
UTConfig::<()>::sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.last_log_index = Some(3);
update.last_applied = Some(log_id(1, 0, 3));
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.applied_index(Some(3), "log").await?;
let got_least2 = w.applied_index_at_least(Some(2), "log").await?;
let got_least3 = w.applied_index_at_least(Some(3), "log").await?;
let got_least4 = w.applied_index_at_least(Some(4), "log").await;
h.await?;
assert_eq!(Some(3), got.last_log_index);
assert_eq!(Some(3), got.last_applied.index());
assert_eq!(Some(3), got_least2.last_log_index);
assert_eq!(Some(3), got_least2.last_applied.index());
assert_eq!(Some(3), got_least3.last_log_index);
assert_eq!(Some(3), got_least3.last_applied.index());
assert!(got_least4.is_err());
}
{
let (init, w, tx) = init_wait_test::<UTConfig>();
let h = UTConfig::<()>::spawn(async move {
UTConfig::<()>::sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.state = ServerState::Leader;
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.state(ServerState::Leader, "state").await?;
h.await?;
assert_eq!(ServerState::Leader, got.state);
}
{
let (init, w, tx) = init_wait_test::<UTConfig>();
let h = UTConfig::<()>::spawn(async move {
UTConfig::<()>::sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.membership_config = Arc::new(StoredMembershipOf::<UTConfig>::new(
None,
Membership::new_with_defaults(vec![btreeset! {1,2}], [3]),
));
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.voter_ids([1, 2], "members").await?;
h.await?;
assert_eq!(
btreeset![1, 2],
got.membership_config.membership().get_joint_config().first().unwrap().clone()
);
}
tracing::info!("--- wait for snapshot, Ok");
{
let (init, w, tx) = init_wait_test::<UTConfig>();
let h = UTConfig::<()>::spawn(async move {
UTConfig::<()>::sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.snapshot = Some(log_id(1, 0, 2));
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.snapshot(log_id(1, 0, 2), "snapshot").await?;
h.await?;
assert_eq!(Some(log_id(1, 0, 2)), got.snapshot);
}
tracing::info!("--- wait for snapshot, only index matches");
{
let (init, w, tx) = init_wait_test::<UTConfig>();
let h = UTConfig::<()>::spawn(async move {
UTConfig::<()>::sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.snapshot = Some(log_id(3, 0, 2));
let rst = tx.send(update);
assert!(rst.is_ok());
UTConfig::<()>::sleep(Duration::from_millis(200)).await;
});
let got = w.snapshot(log_id(1, 0, 2), "snapshot").await;
h.await?;
match got.unwrap_err() {
WaitError::Timeout(t, _) => {
assert_eq!(Duration::from_millis(100), t);
}
_ => {
panic!("expect WaitError::Timeout");
}
}
}
{
let (_init, w, _tx) = init_wait_test::<UTConfig>();
let h = UTConfig::<()>::spawn(async move {
UTConfig::<()>::sleep(Duration::from_millis(200)).await;
});
let got = w.state(ServerState::Follower, "timeout").await;
h.await?;
match got.unwrap_err() {
WaitError::Timeout(t, _) => {
assert_eq!(Duration::from_millis(100), t);
}
_ => {
panic!("expect WaitError::Timeout");
}
}
}
Ok::<(), anyhow::Error>(())
})
.unwrap();
}
#[test]
fn test_wait_log_index() {
UTConfig::<()>::run(async {
let (init, w, tx) = init_wait_test::<UTConfig>();
let h = UTConfig::<()>::spawn(async move {
UTConfig::<()>::sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.last_log_index = Some(3);
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.log_index(Some(3), "log").await?;
let got_least2 = w.log_index_at_least(Some(2), "log").await?;
let got_least3 = w.log_index_at_least(Some(3), "log").await?;
let got_least4 = w.log_index_at_least(Some(4), "log").await;
h.await?;
assert_eq!(Some(3), got.last_log_index);
assert_eq!(Some(3), got_least2.last_log_index);
assert_eq!(Some(3), got_least3.last_log_index);
assert!(got_least4.is_err());
Ok::<(), anyhow::Error>(())
})
.unwrap();
}
#[test]
fn test_wait_vote() {
UTConfig::<()>::run(async {
let (init, w, tx) = init_wait_test::<UTConfig>();
let h = UTConfig::<()>::spawn(async move {
UTConfig::<()>::sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.vote = Vote::new_committed(1, 2);
let rst = tx.send(update);
assert!(rst.is_ok());
});
let res = w.vote(Vote::new(1, 2), "vote").await;
assert!(res.is_err());
let got = w.vote(Vote::new_committed(1, 2), "vote").await?;
h.await?;
assert_eq!(Vote::new_committed(1, 2), got.vote);
Ok::<(), anyhow::Error>(())
})
.unwrap();
}
#[test]
fn test_wait_purged() {
UTConfig::<()>::run(async {
let (init, w, tx) = init_wait_test::<UTConfig>();
let h = UTConfig::<()>::spawn(async move {
UTConfig::<()>::sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.purged = Some(log_id(1, 2, 3));
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.purged(Some(log_id(1, 2, 3)), "purged").await?;
h.await?;
assert_eq!(Some(log_id(1, 2, 3)), got.purged);
Ok::<(), anyhow::Error>(())
})
.unwrap();
}
#[test]
fn test_wait_committed_index() {
UTConfig::<()>::run(async {
let (init, w, tx) = init_wait_test::<UTConfig>();
let h = UTConfig::<()>::spawn(async move {
UTConfig::<()>::sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.committed = Some(log_id(1, 0, 3));
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.committed_index(Some(3), "committed").await?;
let got_least2 = w.committed_index_at_least(Some(2), "committed").await?;
let got_least3 = w.committed_index_at_least(Some(3), "committed").await?;
let got_least4 = w.committed_index_at_least(Some(4), "committed").await;
h.await?;
assert_eq!(Some(3), got.committed.index());
assert_eq!(Some(3), got_least2.committed.index());
assert_eq!(Some(3), got_least3.committed.index());
assert!(got_least4.is_err());
Ok::<(), anyhow::Error>(())
})
.unwrap();
}
pub(crate) type InitResult<C> = (RaftMetrics<C>, Wait<C>, WatchSenderOf<C, RaftMetrics<C>>);
fn init_wait_test<C>() -> InitResult<C>
where C: RaftTypeConfig<NodeId = u64> {
#[allow(deprecated)]
let init = RaftMetrics {
running_state: Ok(()),
id: NodeIdOf::<C>::default(),
state: ServerState::Learner,
current_term: Default::default(),
vote: VoteOf::<C>::new_with_default_term(0),
last_log_index: None,
committed: None,
last_applied: None,
purged: None,
#[cfg(feature = "metrics-logids")]
log_id_list: Default::default(),
current_leader: None,
millis_since_quorum_ack: None,
last_quorum_acked: None,
membership_config: Arc::new(StoredMembershipOf::<C>::new(None, Membership::default())),
heartbeat: None,
snapshot: None,
replication: None,
};
let (tx, rx) = C::watch_channel(init.clone());
let w = Wait {
timeout: Duration::from_millis(100),
rx,
};
(init, w, tx)
}