use std::time::Duration;
use tokio::sync::broadcast::{self, Receiver, Sender};
use crate::RUNTIME;
/// Sending side of two unit-message broadcast channels related to the topology.
///
/// One channel carries "please check the topology" requests, the other carries
/// "the topology changed" notifications. Messages have no payload (`()`); only
/// their arrival matters. Receivers are obtained via the `subscribe_to_*`
/// methods on this type.
#[derive(Clone, Debug)]
pub(crate) struct TopologyMessageManager {
/// Sender used by `request_topology_check`.
topology_check_requester: Sender<()>,
/// Sender used by `notify_topology_changed`.
topology_change_notifier: Sender<()>,
}
impl TopologyMessageManager {
pub(super) fn new() -> Self {
let (topology_check_requester, _) = broadcast::channel(1);
let (topology_change_notifier, _) = broadcast::channel(1);
Self {
topology_check_requester,
topology_change_notifier,
}
}
pub(super) fn request_topology_check(&self) {
let _: Result<_, _> = self.topology_check_requester.send(());
}
pub(super) fn notify_topology_changed(&self) {
let _: Result<_, _> = self.topology_change_notifier.send(());
}
pub(super) fn subscribe_to_topology_check_requests(&self) -> TopologyMessageSubscriber {
TopologyMessageSubscriber::new(self.topology_check_requester.subscribe())
}
pub(super) fn subscribe_to_topology_changes(&self) -> TopologyMessageSubscriber {
TopologyMessageSubscriber::new(self.topology_change_notifier.subscribe())
}
}
/// Receiving side of one of the unit-message broadcast channels owned by
/// `TopologyMessageManager`.
///
/// Instances are handed out by the manager's `subscribe_to_*` methods.
pub(crate) struct TopologyMessageSubscriber {
/// Broadcast receiver polled by `wait_for_message`.
receiver: Receiver<()>,
}
impl TopologyMessageSubscriber {
fn new(receiver: Receiver<()>) -> Self {
Self { receiver }
}
pub(crate) async fn wait_for_message(&mut self, timeout: Duration) -> bool {
RUNTIME.timeout(timeout, self.receiver.recv()).await.is_ok()
}
}