use std::collections::BTreeMap;
use tsoracle_openraft_toolkit::BootstrapMode;
mod common;
use common::{TestPeer, TestTypeConfig};
#[test]
fn bootstrap_mode_constructs_in_each_shape() {
let mut members: BTreeMap<u64, TestPeer> = BTreeMap::new();
members.insert(
1,
TestPeer {
addr: "host-1:9000".into(),
},
);
let _fresh: BootstrapMode<TestTypeConfig> = BootstrapMode::Fresh {
initial_members: members,
};
let _reopen: BootstrapMode<TestTypeConfig> = BootstrapMode::Reopen;
let _join: BootstrapMode<TestTypeConfig> = BootstrapMode::Join;
}
#[allow(dead_code)]
fn _bootstrap_signature_compiles<C, SM>(raft: &openraft::Raft<C, SM>, mode: BootstrapMode<C>)
where
C: openraft::RaftTypeConfig,
SM: openraft::storage::RaftStateMachine<C>,
{
let fut = async move { tsoracle_openraft_toolkit::bootstrap(raft, mode).await };
drop(fut);
}
#[allow(dead_code)]
fn _change_membership_signature_compiles<C, SM>(
raft: &openraft::Raft<C, SM>,
voters: std::collections::BTreeSet<C::NodeId>,
) where
C: openraft::RaftTypeConfig,
SM: openraft::storage::RaftStateMachine<C>,
{
let fut =
async move { tsoracle_openraft_toolkit::change_membership(raft, voters, false).await };
drop(fut);
}
#[allow(dead_code)]
fn _add_learner_signature_compiles<C, SM>(
raft: &openraft::Raft<C, SM>,
id: C::NodeId,
node: C::Node,
) where
C: openraft::RaftTypeConfig,
SM: openraft::storage::RaftStateMachine<C>,
{
let fut = async move { tsoracle_openraft_toolkit::add_learner(raft, id, node, false).await };
drop(fut);
}
#[allow(dead_code)]
fn _leadership_events_signature_compiles<C, SM>(
raft: &openraft::Raft<C, SM>,
) -> impl futures::Stream<Item = tsoracle_openraft_toolkit::LeadershipState<C>>
where
C: openraft::RaftTypeConfig,
SM: openraft::storage::RaftStateMachine<C>,
{
tsoracle_openraft_toolkit::leadership_events(raft)
}
#[tokio::test]
async fn leadership_events_emits_initial_state_and_terminates_on_drop() {
use futures::StreamExt;
use openraft::RaftMetrics;
use openraft::type_config::TypeConfigExt;
use tsoracle_openraft_toolkit::LeadershipState;
let metrics: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
let (tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(metrics);
let mut stream = std::pin::pin!(
tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(rx)
);
let first = stream.next().await.expect("initial state emitted");
assert!(
matches!(
first,
LeadershipState::Follower {
term: 0,
leader: None
}
),
"expected initial Follower {{ term: 0, leader: None }}; got {first:?}",
);
drop(tx);
assert!(
stream.next().await.is_none(),
"stream should terminate when sender drops"
);
}
#[tokio::test]
async fn leadership_events_dedups_repeated_class_until_transition() {
use futures::StreamExt;
use openraft::RaftMetrics;
use openraft::ServerState;
use openraft::WatchSender;
use openraft::type_config::TypeConfigExt;
use tsoracle_openraft_toolkit::LeadershipState;
let initial: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
let (tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(initial);
let mut stream = std::pin::pin!(
tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(rx)
);
let first = stream.next().await.expect("initial");
assert!(
matches!(first, LeadershipState::Follower { .. }),
"got {first:?}"
);
let mut next_follower: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
next_follower.current_term = 1;
tx.send(next_follower).unwrap();
let mut leader_metrics: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
leader_metrics.state = ServerState::Leader;
leader_metrics.current_term = 1;
tx.send(leader_metrics).unwrap();
let next = stream.next().await.expect("transition");
assert!(
matches!(next, LeadershipState::Leader { term: 1 }),
"expected Leader {{ term: 1 }}; got {next:?}",
);
drop(tx);
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn leadership_events_projects_candidate_learner_and_shutdown() {
use futures::StreamExt;
use openraft::RaftMetrics;
use openraft::ServerState;
use openraft::type_config::TypeConfigExt;
use tsoracle_openraft_toolkit::LeadershipState;
async fn first_emission(state: ServerState, term: u64) -> LeadershipState<TestTypeConfig> {
let mut metrics: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
metrics.state = state;
metrics.current_term = term;
let (_tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(metrics);
let mut stream = std::pin::pin!(
tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(
rx
)
);
stream.next().await.expect("initial state emitted")
}
assert!(matches!(
first_emission(ServerState::Candidate, 5).await,
LeadershipState::Candidate { term: 5 }
));
assert!(matches!(
first_emission(ServerState::Learner, 9).await,
LeadershipState::Learner
));
assert!(matches!(
first_emission(ServerState::Shutdown, 2).await,
LeadershipState::Shutdown
));
}
#[tokio::test]
async fn leadership_events_resolves_follower_leader_when_in_membership() {
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use futures::StreamExt;
use openraft::Membership;
use openraft::RaftMetrics;
use openraft::StoredMembership;
use openraft::type_config::TypeConfigExt;
use openraft::type_config::alias::StoredMembershipOf;
use tsoracle_openraft_toolkit::LeadershipState;
let peer_2 = TestPeer {
addr: "host-2:9000".into(),
};
let nodes: BTreeMap<u64, TestPeer> = BTreeMap::from([
(
1,
TestPeer {
addr: "host-1:9000".into(),
},
),
(2, peer_2.clone()),
]);
let membership: Membership<u64, TestPeer> =
Membership::new(vec![BTreeSet::from([1u64, 2])], nodes).unwrap();
let stored: StoredMembershipOf<TestTypeConfig> = StoredMembership::new(None, membership);
let mut metrics: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
metrics.current_leader = Some(2);
metrics.membership_config = Arc::new(stored);
let (_tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(metrics);
let mut stream = std::pin::pin!(
tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(rx)
);
let first = stream.next().await.expect("initial state emitted");
match first {
LeadershipState::Follower {
term: 0,
leader: Some((id, node)),
} => {
assert_eq!(id, 2);
assert_eq!(node, peer_2);
}
other => panic!("expected Follower with resolved leader; got {other:?}"),
}
}
#[tokio::test]
async fn leadership_events_emits_leader_after_coalesced_term_change() {
use futures::StreamExt;
use openraft::RaftMetrics;
use openraft::ServerState;
use openraft::WatchSender;
use openraft::type_config::TypeConfigExt;
use tsoracle_openraft_toolkit::LeadershipState;
let mut initial: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
initial.state = ServerState::Leader;
initial.current_term = 1;
let (tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(initial);
let mut stream = std::pin::pin!(
tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(rx)
);
let first = stream.next().await.expect("initial Leader");
assert!(
matches!(first, LeadershipState::Leader { term: 1 }),
"expected initial Leader {{ term: 1 }}; got {first:?}",
);
let mut as_follower: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
as_follower.state = ServerState::Follower;
as_follower.current_term = 2;
tx.send(as_follower).unwrap();
let mut as_leader_again: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
as_leader_again.state = ServerState::Leader;
as_leader_again.current_term = 3;
tx.send(as_leader_again).unwrap();
drop(tx);
let next = stream.next().await;
assert!(
matches!(next, Some(LeadershipState::Leader { term: 3 })),
"expected Leader {{ term: 3 }} after coalesced Follower(2)/Leader(3); \
got {next:?} — a None here means the dedup suppressed the new term, \
which is the bug this test exists to prevent",
);
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn leadership_events_suppresses_identical_projection() {
use futures::StreamExt;
use openraft::RaftMetrics;
use openraft::WatchSender;
use openraft::type_config::TypeConfigExt;
use tsoracle_openraft_toolkit::LeadershipState;
let initial: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
let (tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(initial);
let mut stream = std::pin::pin!(
tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(rx)
);
let first = stream.next().await.expect("initial Follower");
assert!(
matches!(
first,
LeadershipState::Follower {
term: 0,
leader: None,
},
),
"expected initial Follower {{ term: 0, leader: None }}; got {first:?}",
);
let identical: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
tx.send(identical).unwrap();
drop(tx);
assert!(
stream.next().await.is_none(),
"identical projection must be suppressed; stream should terminate \
on sender drop without re-emitting",
);
}
#[tokio::test]
async fn leadership_events_drops_follower_leader_when_not_in_membership() {
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use futures::StreamExt;
use openraft::Membership;
use openraft::RaftMetrics;
use openraft::StoredMembership;
use openraft::type_config::TypeConfigExt;
use openraft::type_config::alias::StoredMembershipOf;
use tsoracle_openraft_toolkit::LeadershipState;
let nodes: BTreeMap<u64, TestPeer> = BTreeMap::from([(
1,
TestPeer {
addr: "host-1:9000".into(),
},
)]);
let membership: Membership<u64, TestPeer> =
Membership::new(vec![BTreeSet::from([1u64])], nodes).unwrap();
let stored: StoredMembershipOf<TestTypeConfig> = StoredMembership::new(None, membership);
let mut metrics: RaftMetrics<TestTypeConfig> = RaftMetrics::new_initial(1u64);
metrics.current_leader = Some(99);
metrics.membership_config = Arc::new(stored);
let (_tx, rx) = <TestTypeConfig as TypeConfigExt>::watch_channel(metrics);
let mut stream = std::pin::pin!(
tsoracle_openraft_toolkit::lifecycle::leader::stream_from_receiver::<TestTypeConfig>(rx)
);
let first = stream.next().await.expect("initial state emitted");
assert!(
matches!(
first,
LeadershipState::Follower {
term: 0,
leader: None
}
),
"expected Follower with no resolved leader; got {first:?}",
);
}