use std::{sync::Arc, time::Duration};
use tokio::sync::watch::{self, Receiver, Sender};
use crate::RUNTIME;
#[derive(Clone, Debug)]
pub(crate) struct TopologyMessageManager {
topology_check_requester: Arc<Sender<()>>,
topology_check_listener: Receiver<()>,
topology_change_notifier: Arc<Sender<()>>,
topology_change_listener: Receiver<()>,
}
impl TopologyMessageManager {
pub(super) fn new() -> Self {
let (topology_check_requester, topology_check_listener) = watch::channel(());
let (topology_change_notifier, topology_change_listener) = watch::channel(());
Self {
topology_check_requester: Arc::new(topology_check_requester),
topology_check_listener,
topology_change_notifier: Arc::new(topology_change_notifier),
topology_change_listener,
}
}
pub(super) fn request_topology_check(&self) {
let _ = self.topology_check_requester.broadcast(());
}
pub(super) fn notify_topology_changed(&self) {
let _ = self.topology_change_notifier.broadcast(());
}
pub(super) async fn subscribe_to_topology_check_requests(&self) -> TopologyMessageSubscriber {
TopologyMessageSubscriber::new(&self.topology_check_listener).await
}
pub(super) async fn subscribe_to_topology_changes(&self) -> TopologyMessageSubscriber {
TopologyMessageSubscriber::new(&self.topology_change_listener).await
}
}
pub(crate) struct TopologyMessageSubscriber {
receiver: Receiver<()>,
}
impl TopologyMessageSubscriber {
async fn new(receiver: &Receiver<()>) -> Self {
let mut receiver = receiver.clone();
receiver.recv().await;
Self { receiver }
}
pub(crate) async fn wait_for_message(&mut self, timeout: Duration) -> bool {
RUNTIME.timeout(timeout, self.receiver.recv()).await.is_ok()
}
}