use std::sync::Arc;
use nodedb_types::NodeId;
use tokio::sync::Notify;
use tracing::debug;
use crate::swim::member::MemberState;
use crate::swim::subscriber::MembershipSubscriber;
pub struct RebalancerKickHook {
kick: Arc<Notify>,
}
impl RebalancerKickHook {
pub fn new(kick: Arc<Notify>) -> Self {
Self { kick }
}
}
impl MembershipSubscriber for RebalancerKickHook {
fn on_state_change(&self, node_id: &NodeId, old: Option<MemberState>, new: MemberState) {
let relevant = match (old, new) {
(None, MemberState::Alive) => true,
(_, MemberState::Dead) | (_, MemberState::Left) => true,
(Some(MemberState::Dead), MemberState::Alive)
| (Some(MemberState::Left), MemberState::Alive) => true,
_ => false,
};
if relevant {
debug!(?node_id, ?old, ?new, "rebalancer kick: membership change");
self.kick.notify_one();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
fn counting_notify() -> (Arc<Notify>, Arc<AtomicU32>, tokio::task::JoinHandle<()>) {
let notify = Arc::new(Notify::new());
let counter = Arc::new(AtomicU32::new(0));
let n = notify.clone();
let c = counter.clone();
let handle = tokio::spawn(async move {
loop {
n.notified().await;
c.fetch_add(1, Ordering::SeqCst);
}
});
(notify, counter, handle)
}
#[tokio::test]
async fn kick_fires_on_new_node_alive() {
let (notify, counter, handle) = counting_notify();
let hook = RebalancerKickHook::new(notify);
hook.on_state_change(&NodeId::new("new"), None, MemberState::Alive);
tokio::task::yield_now().await;
tokio::task::yield_now().await;
assert!(counter.load(Ordering::SeqCst) >= 1);
handle.abort();
}
#[tokio::test]
async fn kick_fires_on_dead() {
let (notify, counter, handle) = counting_notify();
let hook = RebalancerKickHook::new(notify);
hook.on_state_change(
&NodeId::new("x"),
Some(MemberState::Alive),
MemberState::Dead,
);
tokio::task::yield_now().await;
tokio::task::yield_now().await;
assert!(counter.load(Ordering::SeqCst) >= 1);
handle.abort();
}
#[tokio::test]
async fn kick_fires_on_left() {
let (notify, counter, handle) = counting_notify();
let hook = RebalancerKickHook::new(notify);
hook.on_state_change(
&NodeId::new("x"),
Some(MemberState::Alive),
MemberState::Left,
);
tokio::task::yield_now().await;
tokio::task::yield_now().await;
assert!(counter.load(Ordering::SeqCst) >= 1);
handle.abort();
}
#[test]
fn kick_does_not_fire_on_suspect() {
let notify = Arc::new(Notify::new());
let hook = RebalancerKickHook::new(notify);
hook.on_state_change(
&NodeId::new("x"),
Some(MemberState::Alive),
MemberState::Suspect,
);
}
}