openraft 0.10.0-alpha.18

Advanced Raft consensus
Documentation
use std::sync::Arc;
use std::time::Duration;

use maplit::btreeset;

use crate::Membership;
use crate::RaftMetrics;
use crate::RaftTypeConfig;
use crate::Vote;
use crate::async_runtime::WatchSender;
use crate::core::ServerState;
use crate::engine::testing::UTConfig;
use crate::engine::testing::log_id;
use crate::log_id::LogIdOptionExt;
use crate::metrics::Wait;
use crate::metrics::WaitError;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::NodeIdOf;
use crate::type_config::alias::StoredMembershipOf;
use crate::type_config::alias::VoteOf;
use crate::type_config::alias::WatchSenderOf;
use crate::vote::raft_vote::RaftVoteExt;

/// Exercises the `Wait` condition helpers for leader, applied index, server state,
/// voter ids and snapshot, plus the timeout path.
///
/// Most scenarios build a fresh metrics channel via `init_wait_test`, spawn a task that
/// publishes an updated `RaftMetrics` after 10 ms, and then await the matching condition.
#[test]
fn test_wait() {
    UTConfig::<()>::run(async {
        // Scenario: wait for leader.
        {
            let (base, wait, sender) = init_wait_test::<UTConfig>();

            let publisher = UTConfig::<()>::spawn(async move {
                UTConfig::<()>::sleep(Duration::from_millis(10)).await;
                let mut metrics = base.clone();
                metrics.current_leader = Some(3);
                assert!(sender.send(metrics).is_ok());
            });
            let seen = wait.current_leader(3, "leader").await?;
            publisher.await?;
            assert_eq!(Some(3), seen.current_leader);
        }

        // Scenario: wait for applied log.
        {
            let (base, wait, sender) = init_wait_test::<UTConfig>();

            let publisher = UTConfig::<()>::spawn(async move {
                UTConfig::<()>::sleep(Duration::from_millis(10)).await;
                let mut metrics = base.clone();
                metrics.last_log_index = Some(3);
                metrics.last_applied = Some(log_id(1, 0, 3));
                assert!(sender.send(metrics).is_ok());
            });
            let exact = wait.applied_index(Some(3), "log").await?;
            let at_least_2 = wait.applied_index_at_least(Some(2), "log").await?;
            let at_least_3 = wait.applied_index_at_least(Some(3), "log").await?;
            // Index 4 is never published, so this wait is expected to end in an error.
            let at_least_4 = wait.applied_index_at_least(Some(4), "log").await;
            publisher.await?;

            assert_eq!(Some(3), exact.last_log_index);
            assert_eq!(Some(3), exact.last_applied.index());

            assert_eq!(Some(3), at_least_2.last_log_index);
            assert_eq!(Some(3), at_least_2.last_applied.index());

            assert_eq!(Some(3), at_least_3.last_log_index);
            assert_eq!(Some(3), at_least_3.last_applied.index());

            assert!(at_least_4.is_err());
        }

        // Scenario: wait for state.
        {
            let (base, wait, sender) = init_wait_test::<UTConfig>();

            let publisher = UTConfig::<()>::spawn(async move {
                UTConfig::<()>::sleep(Duration::from_millis(10)).await;
                let mut metrics = base.clone();
                metrics.state = ServerState::Leader;
                assert!(sender.send(metrics).is_ok());
            });
            let seen = wait.state(ServerState::Leader, "state").await?;
            publisher.await?;

            assert_eq!(ServerState::Leader, seen.state);
        }

        // Scenario: wait for members.
        {
            let (base, wait, sender) = init_wait_test::<UTConfig>();

            let publisher = UTConfig::<()>::spawn(async move {
                UTConfig::<()>::sleep(Duration::from_millis(10)).await;
                let mut metrics = base.clone();
                let membership = Membership::new_with_defaults(vec![btreeset! {1,2}], [3]);
                metrics.membership_config = Arc::new(StoredMembershipOf::<UTConfig>::new(None, membership));
                assert!(sender.send(metrics).is_ok());
            });
            let seen = wait.voter_ids([1, 2], "members").await?;
            publisher.await?;

            // Only the first joint config is inspected.
            let membership = seen.membership_config.membership();
            let first_config = membership.get_joint_config().first().unwrap().clone();
            assert_eq!(btreeset![1, 2], first_config);
        }

        tracing::info!("--- wait for snapshot, Ok");
        {
            let (base, wait, sender) = init_wait_test::<UTConfig>();

            let publisher = UTConfig::<()>::spawn(async move {
                UTConfig::<()>::sleep(Duration::from_millis(10)).await;
                let mut metrics = base.clone();
                metrics.snapshot = Some(log_id(1, 0, 2));
                assert!(sender.send(metrics).is_ok());
            });
            let seen = wait.snapshot(log_id(1, 0, 2), "snapshot").await?;
            publisher.await?;

            assert_eq!(Some(log_id(1, 0, 2)), seen.snapshot);
        }

        tracing::info!("--- wait for snapshot, only index matches");
        {
            let (base, wait, sender) = init_wait_test::<UTConfig>();

            let publisher = UTConfig::<()>::spawn(async move {
                UTConfig::<()>::sleep(Duration::from_millis(10)).await;
                let mut metrics = base.clone();
                // Same index as the awaited log id, but built from different leading arguments.
                metrics.snapshot = Some(log_id(3, 0, 2));
                assert!(sender.send(metrics).is_ok());
                // Keep the sender alive past the 100 ms wait timeout; otherwise the channel
                // closes first and the error is a shutdown instead of a timeout.
                UTConfig::<()>::sleep(Duration::from_millis(200)).await;
            });
            let outcome = wait.snapshot(log_id(1, 0, 2), "snapshot").await;
            publisher.await?;

            let WaitError::Timeout(elapsed, _) = outcome.unwrap_err() else {
                panic!("expect WaitError::Timeout");
            };
            assert_eq!(Duration::from_millis(100), elapsed);
        }

        // Scenario: timeout. Nothing is published, so the wait must give up.
        {
            let (_base, wait, _sender) = init_wait_test::<UTConfig>();

            let sleeper = UTConfig::<()>::spawn(async move {
                UTConfig::<()>::sleep(Duration::from_millis(200)).await;
            });
            let outcome = wait.state(ServerState::Follower, "timeout").await;
            sleeper.await?;

            let WaitError::Timeout(elapsed, _) = outcome.unwrap_err() else {
                panic!("expect WaitError::Timeout");
            };
            assert_eq!(Duration::from_millis(100), elapsed);
        }

        Ok::<(), anyhow::Error>(())
    })
    .unwrap();
}

/// Checks `Wait::log_index` and `Wait::log_index_at_least` against a metrics update
/// that sets `last_log_index` to 3.
#[test]
fn test_wait_log_index() {
    UTConfig::<()>::run(async {
        let (base, wait, sender) = init_wait_test::<UTConfig>();

        // Publish the new last log index after a short delay.
        let publisher = UTConfig::<()>::spawn(async move {
            UTConfig::<()>::sleep(Duration::from_millis(10)).await;
            let mut metrics = base.clone();
            metrics.last_log_index = Some(3);
            assert!(sender.send(metrics).is_ok());
        });

        let exact = wait.log_index(Some(3), "log").await?;
        let at_least_2 = wait.log_index_at_least(Some(2), "log").await?;
        let at_least_3 = wait.log_index_at_least(Some(3), "log").await?;
        // Index 4 is never published, so this wait is expected to end in an error.
        let at_least_4 = wait.log_index_at_least(Some(4), "log").await;
        publisher.await?;

        assert_eq!(Some(3), exact.last_log_index);
        assert_eq!(Some(3), at_least_2.last_log_index);
        assert_eq!(Some(3), at_least_3.last_log_index);
        assert!(at_least_4.is_err());

        Ok::<(), anyhow::Error>(())
    })
    .unwrap();
}

/// Checks `Wait::vote`: waiting for `Vote::new(1, 2)` fails while waiting for the
/// published `Vote::new_committed(1, 2)` succeeds.
#[test]
fn test_wait_vote() {
    UTConfig::<()>::run(async {
        let (base, wait, sender) = init_wait_test::<UTConfig>();

        let publisher = UTConfig::<()>::spawn(async move {
            UTConfig::<()>::sleep(Duration::from_millis(10)).await;
            let mut metrics = base.clone();
            metrics.vote = Vote::new_committed(1, 2);
            assert!(sender.send(metrics).is_ok());
        });

        // `Vote::new(1, 2)` is never published, so this wait must end in an error.
        let mismatch = wait.vote(Vote::new(1, 2), "vote").await;
        assert!(mismatch.is_err());

        let seen = wait.vote(Vote::new_committed(1, 2), "vote").await?;
        publisher.await?;
        assert_eq!(Vote::new_committed(1, 2), seen.vote);

        Ok::<(), anyhow::Error>(())
    })
    .unwrap();
}

/// Checks that `Wait::purged` resolves once the metrics report the expected purged log id.
#[test]
fn test_wait_purged() {
    UTConfig::<()>::run(async {
        let (base, wait, sender) = init_wait_test::<UTConfig>();

        // Publish the purged log id after a short delay.
        let publisher = UTConfig::<()>::spawn(async move {
            UTConfig::<()>::sleep(Duration::from_millis(10)).await;
            let mut metrics = base.clone();
            metrics.purged = Some(log_id(1, 2, 3));
            assert!(sender.send(metrics).is_ok());
        });

        let seen = wait.purged(Some(log_id(1, 2, 3)), "purged").await?;
        publisher.await?;
        assert_eq!(Some(log_id(1, 2, 3)), seen.purged);

        Ok::<(), anyhow::Error>(())
    })
    .unwrap();
}

/// Checks `Wait::committed_index` and `Wait::committed_index_at_least` against a metrics
/// update whose committed log id has index 3.
#[test]
fn test_wait_committed_index() {
    UTConfig::<()>::run(async {
        let (base, wait, sender) = init_wait_test::<UTConfig>();

        // Publish the committed log id after a short delay.
        let publisher = UTConfig::<()>::spawn(async move {
            UTConfig::<()>::sleep(Duration::from_millis(10)).await;
            let mut metrics = base.clone();
            metrics.committed = Some(log_id(1, 0, 3));
            assert!(sender.send(metrics).is_ok());
        });

        let exact = wait.committed_index(Some(3), "committed").await?;
        let at_least_2 = wait.committed_index_at_least(Some(2), "committed").await?;
        let at_least_3 = wait.committed_index_at_least(Some(3), "committed").await?;
        // Index 4 is never published, so this wait is expected to end in an error.
        let at_least_4 = wait.committed_index_at_least(Some(4), "committed").await;
        publisher.await?;

        assert_eq!(Some(3), exact.committed.index());
        assert_eq!(Some(3), at_least_2.committed.index());
        assert_eq!(Some(3), at_least_3.committed.index());
        assert!(at_least_4.is_err());

        Ok::<(), anyhow::Error>(())
    })
    .unwrap();
}

/// Fixture tuple returned by `init_wait_test`: the initial metrics, the `Wait` under test,
/// and the watch sender used to publish updated metrics.
pub(crate) type InitResult<C> = (RaftMetrics<C>, Wait<C>, WatchSenderOf<C, RaftMetrics<C>>);

/// Build an initial state for testing `Wait`.
///
/// Returns the initial metrics, a `Wait` with a 100 ms timeout that watches those metrics,
/// and the sender used to publish updated metrics.
fn init_wait_test<C>() -> InitResult<C>
where C: RaftTypeConfig<NodeId = u64> {
    // NOTE(review): presumably at least one `RaftMetrics` field is deprecated yet still has
    // to be set in a full struct literal — confirm against the struct definition.
    #[allow(deprecated)]
    let metrics = RaftMetrics {
        // Identity and role.
        running_state: Ok(()),
        id: NodeIdOf::<C>::default(),
        state: ServerState::Learner,
        current_term: Default::default(),
        vote: VoteOf::<C>::new_with_default_term(0),
        current_leader: None,

        // Log progress: nothing written, committed, applied, purged or snapshotted yet.
        last_log_index: None,
        committed: None,
        last_applied: None,
        purged: None,
        snapshot: None,

        #[cfg(feature = "metrics-logids")]
        log_id_list: Default::default(),

        // Cluster-level fields.
        membership_config: Arc::new(StoredMembershipOf::<C>::new(None, Membership::default())),
        millis_since_quorum_ack: None,
        last_quorum_acked: None,
        heartbeat: None,
        replication: None,
    };

    let timeout = Duration::from_millis(100);
    let (tx, rx) = C::watch_channel(metrics.clone());

    (metrics, Wait { timeout, rx }, tx)
}