use tokio::sync::watch;
use crate::DhtHandle;
/// Sending half of a `watch` channel that publishes the current optional
/// [`DhtHandle`] to any number of [`DhtReceiver`]s.
///
/// Cloning yields another handle to the same underlying channel sender.
#[derive(Clone, Debug)]
pub struct DhtBroadcast {
    /// Underlying `tokio::sync::watch` sender holding the latest value.
    inner: watch::Sender<Option<DhtHandle>>,
}
impl DhtBroadcast {
    /// Creates a broadcast seeded with `initial`.
    ///
    /// The receiver produced together with the channel is dropped right away,
    /// so a fresh broadcast has zero receivers until
    /// [`subscribe`](Self::subscribe) is called.
    #[must_use]
    pub fn new(initial: Option<DhtHandle>) -> Self {
        let (tx, _rx) = watch::channel(initial);
        Self { inner: tx }
    }

    /// Stores `new_handle` as the current value and notifies every receiver.
    ///
    /// The value is stored even when no receiver is currently subscribed, so a
    /// later [`subscribe`](Self::subscribe) or [`borrow`](Self::borrow) sees it.
    pub fn replace(&self, new_handle: Option<DhtHandle>) {
        // `watch::Sender::send` refuses to store the value (and returns it in
        // the error) when no receiver is alive — which is the state right
        // after `new`. `send_replace` always stores the value and still
        // notifies live receivers; the previous value it returns is not needed.
        let _previous = self.inner.send_replace(new_handle);
    }

    /// Returns a guard giving read access to the value currently stored.
    ///
    /// Keep the guard short-lived: tokio documents that an outstanding
    /// `watch::Ref` holds a read lock on the channel's value.
    #[must_use]
    pub fn borrow(&self) -> watch::Ref<'_, Option<DhtHandle>> {
        self.inner.borrow()
    }

    /// Creates a new [`DhtReceiver`] attached to this broadcast.
    #[must_use]
    pub fn subscribe(&self) -> DhtReceiver {
        DhtReceiver {
            inner: self.inner.subscribe(),
        }
    }

    /// Returns the number of receivers currently attached to the channel.
    #[must_use]
    pub fn receiver_count(&self) -> usize {
        self.inner.receiver_count()
    }
}
/// Receiving half handed out by `DhtBroadcast::subscribe`.
///
/// Each clone is a separate receiver on the same channel and is counted
/// separately by the sender's receiver count.
#[derive(Clone, Debug)]
pub struct DhtReceiver {
    /// Underlying `tokio::sync::watch` receiver.
    inner: watch::Receiver<Option<DhtHandle>>,
}
impl DhtReceiver {
    /// Returns an owned copy of the value currently held by the channel.
    ///
    /// This reads via `borrow`, so it does not mark the value as seen for the
    /// purposes of [`changed`](Self::changed).
    #[must_use]
    pub fn current(&self) -> Option<DhtHandle> {
        // Clone out of the guard so the borrow ends before we return.
        let guard = self.inner.borrow();
        (*guard).clone()
    }

    /// Waits until the channel signals a change this receiver has not yet seen.
    ///
    /// # Errors
    ///
    /// Propagates the `RecvError` from the underlying `watch::Receiver`
    /// (tokio documents this as the sender having been dropped).
    pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> {
        self.inner.changed().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A receiver subscribed to a `None`-seeded broadcast reads `None`.
    #[tokio::test]
    async fn broadcast_starts_with_initial_none_value() {
        let bx = DhtBroadcast::new(None);
        let rx = bx.subscribe();
        assert!(rx.current().is_none());
    }

    /// `replace` must reach every receiver that is subscribed at the time.
    #[tokio::test]
    async fn replace_propagates_to_subscribed_receivers() {
        let bx = DhtBroadcast::new(None);
        let rx1 = bx.subscribe();
        let rx2 = bx.subscribe();
        assert!(rx1.current().is_none());
        assert!(rx2.current().is_none());
        // A fresh subscription starts with nothing unseen.
        assert!(!rx1.inner.has_changed().unwrap());
        assert!(!rx2.inner.has_changed().unwrap());

        bx.replace(None);

        // The payload is `None` both before and after, so the value alone
        // cannot show that the update arrived; the change flag can.
        assert!(rx1.inner.has_changed().unwrap());
        assert!(rx2.inner.has_changed().unwrap());
        assert!(rx1.current().is_none());
        assert!(rx2.current().is_none());
    }

    /// The receiver count follows subscriptions and drops.
    #[tokio::test]
    async fn receiver_count_tracks_subscriptions() {
        let bx = DhtBroadcast::new(None);
        assert_eq!(bx.receiver_count(), 0, "no subscribers initially");
        let rx1 = bx.subscribe();
        assert_eq!(bx.receiver_count(), 1);
        let rx2 = bx.subscribe();
        assert_eq!(bx.receiver_count(), 2);
        drop(rx1);
        assert_eq!(bx.receiver_count(), 1);
        drop(rx2);
        assert_eq!(bx.receiver_count(), 0, "all subscribers dropped");
    }

    /// A pending `changed()` completes once another task calls `replace`.
    #[tokio::test]
    async fn changed_resolves_after_replace() {
        let bx = DhtBroadcast::new(None);
        let mut rx = bx.subscribe();
        let bx2 = bx.clone();
        let task = tokio::spawn(async move {
            // Short delay so the receiver is normally already waiting.
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            bx2.replace(None);
        });
        let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx.changed()).await;
        assert!(result.is_ok(), "changed should resolve before deadline");
        result.unwrap().unwrap();
        task.await.unwrap();
    }

    /// Cloning a receiver yields a second, separately counted receiver.
    #[tokio::test]
    async fn cloned_receivers_observe_independently() {
        let bx = DhtBroadcast::new(None);
        let rx1 = bx.subscribe();
        let rx2 = rx1.clone();
        assert!(rx1.current().is_none());
        assert!(rx2.current().is_none());
        assert_eq!(bx.receiver_count(), 2);
    }
}