#[cfg(test)]
mod tests;
use futures::{FutureExt as _, Stream, StreamExt as _};
use std::pin::Pin;
use tokio::sync::broadcast::Receiver;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
/// Reports whether the broadcast channel behind `tx` still has receivers.
///
/// The sender's `closed()` future is polled exactly once and never awaited,
/// so this call does not block. If that single poll already completes, the
/// function returns `false`; if the future is still pending, it returns
/// `true`.
pub fn has_subscribers<T>(tx: &tokio::sync::broadcast::Sender<T>) -> bool {
    match tx.closed().now_or_never() {
        // `closed()` finished on its first poll: nobody is listening.
        Some(()) => false,
        // Still pending: the channel has not been closed yet.
        None => true,
    }
}
/// Wraps a broadcast receiver in a boxed, pinned stream of plain values.
///
/// Each item produced by the underlying [`BroadcastStream`] is handled as
/// follows:
///
/// * `Ok(value)` is forwarded to the consumer unchanged.
/// * `Err(Lagged(n))` is reported through `tracing::warn!` and then skipped,
///   so the returned stream never yields an error item.
pub fn subscription_stream<T: Clone + Send + 'static>(
    rx: Receiver<T>,
) -> Pin<Box<dyn Stream<Item = T> + Send>> {
    let events = BroadcastStream::new(rx).filter_map(|received| async move {
        // Hand successful deliveries straight through; only the lag count
        // survives past this point.
        let n = match received {
            Ok(event) => return Some(event),
            Err(BroadcastStreamRecvError::Lagged(n)) => n,
        };
        tracing::warn!("broadcast subscription lagged: dropped {n} events");
        // Returning `None` makes `filter_map` skip this item.
        None
    });
    Box::pin(events)
}