Skip to main content

vantage_live/live_stream/
manual.rs

1//! Manually-driven `LiveStream` for tests.
2//!
3//! Tests that need to exercise the cache-invalidation path push events
4//! into a `ManualLiveStream` and observe what happens downstream.
5
6use futures_util::Stream;
7use std::pin::Pin;
8use std::sync::Arc;
9use tokio::sync::broadcast;
10use tokio_stream::wrappers::BroadcastStream;
11
12use super::{LiveEvent, LiveStream};
13
14/// Cheap clone, multi-subscriber, lossy on slow consumers. Backed by a
15/// `tokio::sync::broadcast` channel.
16#[derive(Clone)]
17pub struct ManualLiveStream {
18    tx: Arc<broadcast::Sender<LiveEvent>>,
19}
20
21impl ManualLiveStream {
22    /// Capacity is the per-subscriber buffer; older events are dropped if a
23    /// subscriber lags behind. Tests rarely want more than 16.
24    pub fn new(capacity: usize) -> Self {
25        let (tx, _) = broadcast::channel(capacity);
26        Self { tx: Arc::new(tx) }
27    }
28
29    /// Push an event to every current subscriber. Returns the count of
30    /// subscribers that received it (zero is fine, just means nobody's
31    /// listening yet).
32    pub fn push(&self, event: LiveEvent) -> usize {
33        self.tx.send(event).unwrap_or(0)
34    }
35}
36
37impl Default for ManualLiveStream {
38    fn default() -> Self {
39        Self::new(16)
40    }
41}
42
43impl LiveStream for ManualLiveStream {
44    fn subscribe(&self) -> Pin<Box<dyn Stream<Item = LiveEvent> + Send>> {
45        let rx = self.tx.subscribe();
46        // BroadcastStream yields Result<LiveEvent, BroadcastStreamRecvError>
47        // — drop the error variant; it only fires on lag, which a v1
48        // sloppy-invalidation consumer doesn't care about.
49        Box::pin(futures_util::StreamExt::filter_map(
50            BroadcastStream::new(rx),
51            |r| async move { r.ok() },
52        ))
53    }
54}