use d_engine_proto::common::LeaderInfo;
use tokio::sync::watch;
pub struct LeaderNotifier {
tx: watch::Sender<Option<LeaderInfo>>,
_rx: watch::Receiver<Option<LeaderInfo>>,
}
impl LeaderNotifier {
pub fn new() -> Self {
let (tx, rx) = watch::channel(None);
Self { tx, _rx: rx }
}
pub fn sender(&self) -> watch::Sender<Option<LeaderInfo>> {
self.tx.clone()
}
pub fn subscribe(&self) -> watch::Receiver<Option<LeaderInfo>> {
self.tx.subscribe()
}
}
impl Default for LeaderNotifier {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_leader_notifier_basic() {
let notifier = LeaderNotifier::new();
let sender = notifier.sender();
let mut rx = notifier.subscribe();
assert!(rx.borrow().is_none());
let leader_info = LeaderInfo {
leader_id: 1,
term: 5,
};
sender.send(Some(leader_info)).unwrap();
rx.changed().await.unwrap();
let received = *rx.borrow();
assert_eq!(received.unwrap().leader_id, 1);
assert_eq!(received.unwrap().term, 5);
}
#[tokio::test]
async fn test_multiple_subscribers() {
let notifier = LeaderNotifier::new();
let sender = notifier.sender();
let mut rx1 = notifier.subscribe();
let mut rx2 = notifier.subscribe();
let leader_info = LeaderInfo {
leader_id: 2,
term: 10,
};
sender.send(Some(leader_info)).unwrap();
rx1.changed().await.unwrap();
rx2.changed().await.unwrap();
assert_eq!(rx1.borrow().unwrap().leader_id, 2);
assert_eq!(rx2.borrow().unwrap().leader_id, 2);
}
}