use futures_util::Stream;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use super::{LiveEvent, LiveStream};
/// A live stream whose events are injected by hand through [`ManualLiveStream::push`].
///
/// The handle wraps the sending half of a tokio broadcast channel behind an
/// [`Arc`], so cloning it only bumps a reference count and every clone feeds
/// the same underlying channel.
#[derive(Clone)]
pub struct ManualLiveStream {
    /// Shared sending half of the broadcast channel that carries the events.
    tx: Arc<broadcast::Sender<LiveEvent>>,
}
impl ManualLiveStream {
    /// Builds a stream backed by a broadcast channel of the given `capacity`.
    ///
    /// The receiver handed back by the channel constructor is discarded right
    /// away; consumers obtain their own receiver by subscribing later.
    ///
    /// # Panics
    ///
    /// Per the tokio documentation, `broadcast::channel` panics when
    /// `capacity` is `0` or larger than `usize::MAX / 2`.
    pub fn new(capacity: usize) -> Self {
        let (sender, _) = broadcast::channel::<LiveEvent>(capacity);
        let tx = Arc::new(sender);
        Self { tx }
    }

    /// Broadcasts `event` to the current subscribers.
    ///
    /// Returns the count reported by the channel's `send` on success. A failed
    /// send (tokio reports one when no receiver is subscribed) is not treated
    /// as an error: the event is dropped and `0` is returned.
    pub fn push(&self, event: LiveEvent) -> usize {
        // `usize::default()` is 0, which is the fallback for a failed send.
        self.tx.send(event).unwrap_or_default()
    }
}
impl Default for ManualLiveStream {
    /// Creates a stream whose broadcast channel has a capacity of 16.
    fn default() -> Self {
        /// Channel capacity used when the caller does not pick one.
        const DEFAULT_CAPACITY: usize = 16;

        Self::new(DEFAULT_CAPACITY)
    }
}
impl LiveStream for ManualLiveStream {
fn subscribe(&self) -> Pin<Box<dyn Stream<Item = LiveEvent> + Send>> {
let rx = self.tx.subscribe();
Box::pin(futures_util::StreamExt::filter_map(
BroadcastStream::new(rx),
|r| async move { r.ok() },
))
}
}