use std::{sync::Arc, time::Duration};
use tokio::sync::watch::{self, Receiver, Sender};
use crate::RUNTIME;
#[derive(Clone, Debug)]
pub(crate) struct TopologyMessageManager {
topology_check_requester: Arc<Sender<()>>,
topology_check_listener: Receiver<()>,
topology_change_notifier: Arc<Sender<()>>,
topology_change_listener: Receiver<()>,
}
impl TopologyMessageManager {
pub(super) fn new() -> Self {
let (topology_check_requester, topology_check_listener) = watch::channel(());
let (topology_change_notifier, topology_change_listener) = watch::channel(());
Self {
topology_check_requester: Arc::new(topology_check_requester),
topology_check_listener,
topology_change_notifier: Arc::new(topology_change_notifier),
topology_change_listener,
}
}
pub(super) fn request_topology_check(&self) {
let _ = self.topology_check_requester.broadcast(());
}
pub(super) fn notify_topology_changed(&self) {
let _ = self.topology_change_notifier.broadcast(());
}
pub(super) async fn wait_for_topology_check_request(&self, timeout: Duration) -> bool {
let mut listener = self.topology_check_listener.clone();
wait_for_notification(&mut listener, timeout).await
}
pub(crate) async fn wait_for_topology_change(&self, timeout: Duration) -> bool {
let mut listener = self.topology_change_listener.clone();
let _ = listener.recv().await;
wait_for_notification(&mut listener, timeout).await
}
}
async fn wait_for_notification(receiver: &mut Receiver<()>, timeout: Duration) -> bool {
RUNTIME.timeout(timeout, receiver.recv()).await.is_ok()
}