use futures::Stream;
use openraft::async_runtime::watch::WatchReceiver;
use openraft::storage::RaftStateMachine;
use openraft::type_config::alias::WatchReceiverOf;
use openraft::{Raft, RaftMetrics, RaftTypeConfig, ServerState};
/// Leadership-focused view of a Raft node, projected from its [`RaftMetrics`].
///
/// Each variant corresponds to one [`ServerState`]. Where a `term` is carried,
/// it is the metrics' `current_term` converted to `u64`, or `0` when that
/// conversion yields no value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeadershipState<C: RaftTypeConfig> {
    /// The node reports [`ServerState::Leader`].
    Leader {
        /// Term in which the node is leader.
        term: u64,
    },
    /// The node reports [`ServerState::Follower`].
    Follower {
        /// Term the follower currently observes.
        term: u64,
        /// Id and node payload of the current leader. `None` when the metrics
        /// name no leader, or when the named leader is not found among the
        /// nodes of the membership config.
        leader: Option<(C::NodeId, C::Node)>,
    },
    /// The node reports [`ServerState::Candidate`].
    Candidate {
        /// Term the candidate currently observes.
        term: u64,
    },
    /// The node reports [`ServerState::Learner`].
    Learner,
    /// The node reports [`ServerState::Shutdown`].
    Shutdown,
}
/// Returns a stream of [`LeadershipState`] values for `raft`.
///
/// The stream is driven by the receiver returned from `raft.metrics()`: it
/// yields the state projected from the current metrics first, and afterwards
/// only yields again when the projected state differs from the last one
/// yielded.
pub fn leadership_events<C, SM>(raft: &Raft<C, SM>) -> impl Stream<Item = LeadershipState<C>>
where
    C: RaftTypeConfig,
    SM: RaftStateMachine<C>,
{
    let metrics_rx = raft.metrics();
    stream_from_receiver::<C>(metrics_rx)
}
/// Builds the leadership stream directly from a metrics watch receiver.
///
/// On each poll the current metrics are projected into a [`LeadershipState`].
/// The projection is yielded if nothing has been yielded yet or if it differs
/// from the previously yielded value; otherwise the stream waits on
/// `changed()` and tries again. The stream ends when `changed()` returns an
/// error (presumably because the sending side is gone — confirm against the
/// runtime's watch implementation).
#[doc(hidden)]
pub fn stream_from_receiver<C: RaftTypeConfig>(
    rx: WatchReceiverOf<C, RaftMetrics<C>>,
) -> impl Stream<Item = LeadershipState<C>> {
    // Unfold state: the receiver plus the last state handed to the consumer.
    let seed: (
        WatchReceiverOf<C, RaftMetrics<C>>,
        Option<LeadershipState<C>>,
    ) = (rx, None);

    futures::stream::unfold(seed, |(mut watch, previous)| async move {
        loop {
            // Keep the borrow guard inside this block so it is released
            // before the `.await` below.
            let current: LeadershipState<C> = {
                let metrics = watch.borrow_watched();
                project_state::<C>(&metrics)
            };

            // Yield on the very first pass (`previous` is `None`) or on any change.
            if previous.as_ref() != Some(&current) {
                let remembered = Some(current.clone());
                return Some((current, (watch, remembered)));
            }

            if watch.changed().await.is_err() {
                return None;
            }
        }
    })
}
/// Maps a metrics snapshot onto the matching [`LeadershipState`] variant.
///
/// The term is read from `m.current_term` via `as_u64()`, falling back to `0`
/// when that call yields no value. For followers the leader is looked up with
/// `resolve_leader`.
fn project_state<C: RaftTypeConfig>(m: &RaftMetrics<C>) -> LeadershipState<C> {
    use openraft::vote::RaftTerm;

    // Computed up front for every state, including those that do not carry a term.
    let current_term = m.current_term.as_u64().unwrap_or(0);

    match m.state {
        ServerState::Shutdown => LeadershipState::Shutdown,
        ServerState::Learner => LeadershipState::Learner,
        ServerState::Follower => {
            let leader = resolve_leader::<C>(m);
            LeadershipState::Follower {
                term: current_term,
                leader,
            }
        }
        ServerState::Candidate => LeadershipState::Candidate { term: current_term },
        ServerState::Leader => LeadershipState::Leader { term: current_term },
    }
}
/// Looks up the current leader's id and node payload in the membership config.
///
/// Returns `None` when `m.current_leader` is unset, or when no node in
/// `m.membership_config` has that id. On a match, the id and node are cloned
/// out of the membership config.
fn resolve_leader<C: RaftTypeConfig>(m: &RaftMetrics<C>) -> Option<(C::NodeId, C::Node)> {
    // Borrow the leader id: it is only compared by reference, so cloning it
    // up front is unnecessary.
    let leader_id = m.current_leader.as_ref()?;
    m.membership_config
        .nodes()
        .find(|(id, _)| *id == leader_id)
        .map(|(id, node)| (id.clone(), node.clone()))
}