use std::time::Duration;
use tracing::Instrument;
use crate::Config;
use crate::RaftTypeConfig;
use crate::StepDownPolicy;
use crate::async_runtime::MpscSender;
use crate::async_runtime::MpscWeakSender;
use crate::async_runtime::watch::WatchReceiver;
use crate::core::ServerState;
use crate::core::raft_msg::RaftMsg;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::metrics::RaftMetrics;
use crate::metrics::RaftServerMetrics;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::MpscWeakSenderOf;
use crate::type_config::alias::WatchReceiverOf;
/// Background watcher that makes a Leader step down once it is no longer part
/// of the committed membership.
///
/// Constructed by [`StepDownWatcher::spawn`].
pub(crate) struct StepDownWatcher<C: RaftTypeConfig> {
    /// Server metrics. The watcher reads `state`, `id`, `vote`,
    /// `membership_config` and `committed_membership_config` from them.
    rx_server_metrics: WatchReceiverOf<C, RaftServerMetrics<C>>,

    /// Full metrics. They are consulted for per-node replication progress
    /// when choosing a leadership-transfer target.
    rx_metrics: WatchReceiverOf<C, RaftMetrics<C>>,

    /// Weak sender to RaftCore's API channel. It is upgraded on every send.
    tx_api: MpscWeakSenderOf<C, RaftMsg<C>>,

    /// Delay between first seeing the removed-Leader condition and
    /// re-checking it before acting.
    step_down_delay: Duration,

    /// Pause between requesting a leadership transfer and asking RaftCore to
    /// refresh its server state.
    transfer_wait: Duration,
}
impl<C> StepDownWatcher<C>
where C: RaftTypeConfig
{
    /// Start the background task that makes a removed Leader step down.
    ///
    /// Nothing is spawned when `config.removed_leader_step_down` is
    /// [`StepDownPolicy::Never`]. With `StepDownPolicy::After(ms)` the
    /// removed-Leader condition is re-checked `ms` milliseconds after it is
    /// first seen. After a leadership transfer is requested, the task waits
    /// `config.heartbeat_interval` milliseconds.
    pub(crate) fn spawn(
        rx_server_metrics: WatchReceiverOf<C, RaftServerMetrics<C>>,
        rx_metrics: WatchReceiverOf<C, RaftMetrics<C>>,
        tx_api: MpscWeakSenderOf<C, RaftMsg<C>>,
        config: &Config,
    ) {
        let step_down_delay = match config.removed_leader_step_down {
            StepDownPolicy::Never => return,
            StepDownPolicy::After(ms) => Duration::from_millis(ms),
        };

        let watcher = Self {
            rx_server_metrics,
            rx_metrics,
            tx_api,
            step_down_delay,
            transfer_wait: Duration::from_millis(config.heartbeat_interval),
        };

        let span = tracing::debug_span!("step_down_watcher");
        // The handle is not awaited. The task returns on its own once
        // `rx_server_metrics.changed()` reports an error.
        let _join_handle = C::spawn(watcher.watch_loop().instrument(span));
    }

    /// Evaluate the step-down condition now and again after every change of
    /// the server metrics.
    ///
    /// Returns when `changed()` reports an error. That case is logged as
    /// RaftCore termination.
    async fn watch_loop(mut self) {
        loop {
            self.try_step_down().await;

            if self.rx_server_metrics.changed().await.is_err() {
                break;
            }
        }
        tracing::info!("RaftCore terminated, quit step_down_watcher");
    }

    /// Step down if this node is a Leader missing from the committed
    /// membership.
    ///
    /// The condition must hold both before and after `step_down_delay`. If it
    /// still holds, the method does two things:
    ///
    /// 1. Requests a leadership transfer to the voter chosen by
    ///    `transfer_target`, if one can be chosen.
    /// 2. After `transfer_wait`, sends `RefreshServerState`. The command
    ///    carries the vote and membership log id observed after the delay.
    async fn try_step_down(&self) {
        if !is_removed_leader(&self.load_server_metrics()) {
            return;
        }

        C::sleep(self.step_down_delay).await;

        // Re-check: the membership or the role may have changed while waiting.
        let observed = self.load_server_metrics();
        if !is_removed_leader(&observed) {
            return;
        }

        let metrics = self.rx_metrics.borrow_watched().clone();
        if let Some(to) = transfer_target(&metrics) {
            tracing::info!("removed Leader steps down: transfer leadership to {}", to);
            self.send(ExternalCommand::TriggerTransferLeader { to }).await;
        }

        C::sleep(self.transfer_wait).await;

        debug_assert!(
            observed.membership_config.log_id().is_some(),
            "the membership config of a Leader must have a log id"
        );
        let refresh = ExternalCommand::RefreshServerState {
            vote: Some(observed.vote.clone()),
            membership_log_id: observed.membership_config.log_id().clone(),
        };
        self.send(refresh).await;
    }

    /// Clone the most recent [`RaftServerMetrics`] out of the watch channel.
    fn load_server_metrics(&self) -> RaftServerMetrics<C> {
        self.rx_server_metrics.borrow_watched().clone()
    }

    /// Deliver `cmd` to RaftCore on a best-effort basis.
    ///
    /// Nothing is sent if the weak sender can no longer be upgraded. A failed
    /// send is ignored.
    async fn send(&self, cmd: ExternalCommand<C>) {
        if let Some(tx_api) = self.tx_api.upgrade() {
            let _ = tx_api.send(RaftMsg::ExternalCommand { cmd }).await;
        }
    }
}
/// Returns `true` only when all of the following hold:
///
/// - this node is in the `Leader` state;
/// - its `id` is not contained in the effective membership;
/// - the effective membership equals the committed one.
fn is_removed_leader<C>(metrics: &RaftServerMetrics<C>) -> bool
where C: RaftTypeConfig {
    metrics.state == ServerState::Leader
        && !metrics.membership_config.membership().contains(&metrics.id)
        && metrics.membership_config == metrics.committed_membership_config
}
/// Choose the voter with the greatest matching log id as the leadership
/// transfer target.
///
/// Returns `None` when `metrics.replication` is `None`. Voters without a
/// replication entry, or with a `None` matching log id, rank lowest. Among
/// equal keys `max_by_key` picks the last voter yielded.
fn transfer_target<C>(metrics: &RaftMetrics<C>) -> Option<C::NodeId>
where C: RaftTypeConfig {
    let Some(replication) = metrics.replication.as_ref() else {
        return None;
    };

    metrics
        .membership_config
        .membership()
        .voter_ids()
        .max_by_key(|id: &C::NodeId| replication.get(id).and_then(|matched| matched.clone()))
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use maplit::btreemap;
    use maplit::btreeset;

    use super::is_removed_leader;
    use super::transfer_target;
    use crate::Membership;
    use crate::core::ServerState;
    use crate::engine::testing::UTConfig;
    use crate::engine::testing::log_id;
    use crate::metrics::RaftMetrics;
    use crate::metrics::RaftServerMetrics;
    use crate::type_config::alias::StoredMembershipOf;

    /// Membership with voters {2, 3} and no learners; node 1 is not in it.
    fn voters_2_3() -> Membership<u64, ()> {
        Membership::new_with_defaults(vec![btreeset! {2,3}], [])
    }

    /// Wrap `mem` in a stored membership whose log id is `log_id(2, 1, index)`.
    fn stored_at(index: u64, mem: Membership<u64, ()>) -> Arc<StoredMembershipOf<UTConfig>> {
        let lid = log_id(2, 1, index);
        Arc::new(StoredMembershipOf::<UTConfig>::new(Some(lid), mem))
    }

    /// Server metrics built with `new_initial(1)` and set to `Leader`, using
    /// `mem` at index 3 as both the effective and the committed membership.
    fn leader_metrics(mem: Membership<u64, ()>) -> RaftServerMetrics<UTConfig> {
        let mut metrics = RaftServerMetrics::new_initial(1);
        metrics.state = ServerState::Leader;
        metrics.membership_config = stored_at(3, mem.clone());
        metrics.committed_membership_config = stored_at(3, mem);
        metrics
    }

    #[test]
    fn test_is_removed_leader_not_leader() {
        let mut metrics = leader_metrics(voters_2_3());
        metrics.state = ServerState::Follower;
        assert!(!is_removed_leader(&metrics));
    }

    #[test]
    fn test_is_removed_leader_voter_leader() {
        let with_self = Membership::new_with_defaults(vec![btreeset! {1,2,3}], []);
        assert!(!is_removed_leader(&leader_metrics(with_self)));
    }

    #[test]
    fn test_is_removed_leader_learner_leader() {
        let self_as_learner = Membership::new_with_defaults(vec![btreeset! {2,3}], btreeset! {1});
        assert!(!is_removed_leader(&leader_metrics(self_as_learner)));
    }

    #[test]
    fn test_is_removed_leader_uncommitted_membership() {
        let mut metrics = leader_metrics(voters_2_3());
        // The committed membership still lists node 1, so the removal is not committed yet.
        let older = Membership::new_with_defaults(vec![btreeset! {1,2}], []);
        metrics.committed_membership_config = stored_at(1, older);
        assert!(!is_removed_leader(&metrics));
    }

    #[test]
    fn test_is_removed_leader_committed_membership() {
        assert!(is_removed_leader(&leader_metrics(voters_2_3())));
    }

    #[test]
    fn test_transfer_target_not_leader() {
        let metrics = RaftMetrics::<UTConfig>::new_initial(1);
        assert_eq!(transfer_target(&metrics), None);
    }

    #[test]
    fn test_transfer_target_greatest_matching() {
        let mut metrics = RaftMetrics::<UTConfig>::new_initial(1);
        metrics.membership_config = stored_at(3, voters_2_3());

        // Both voters have matched some log; node 3 is further ahead.
        metrics.replication = Some(btreemap! {
            2u64 => Some(log_id(2, 1, 2)),
            3u64 => Some(log_id(2, 1, 3)),
        });
        assert_eq!(transfer_target(&metrics), Some(3));

        // A voter with no matching log id ranks below one that has any.
        metrics.replication = Some(btreemap! {
            2u64 => Some(log_id(2, 1, 3)),
            3u64 => None,
        });
        assert_eq!(transfer_target(&metrics), Some(2));
    }
}