vantage_live/live_stream/manual.rs
1//! Manually-driven `LiveStream` for tests.
2//!
3//! Tests that need to exercise the cache-invalidation path push events
4//! into a `ManualLiveStream` and observe what happens downstream.
5
6use futures_util::Stream;
7use std::pin::Pin;
8use std::sync::Arc;
9use tokio::sync::broadcast;
10use tokio_stream::wrappers::BroadcastStream;
11
12use super::{LiveEvent, LiveStream};
13
14/// Cheap clone, multi-subscriber, lossy on slow consumers. Backed by a
15/// `tokio::sync::broadcast` channel.
16#[derive(Clone)]
17pub struct ManualLiveStream {
18 tx: Arc<broadcast::Sender<LiveEvent>>,
19}
20
21impl ManualLiveStream {
22 /// Capacity is the per-subscriber buffer; older events are dropped if a
23 /// subscriber lags behind. Tests rarely want more than 16.
24 pub fn new(capacity: usize) -> Self {
25 let (tx, _) = broadcast::channel(capacity);
26 Self { tx: Arc::new(tx) }
27 }
28
29 /// Push an event to every current subscriber. Returns the count of
30 /// subscribers that received it (zero is fine, just means nobody's
31 /// listening yet).
32 pub fn push(&self, event: LiveEvent) -> usize {
33 self.tx.send(event).unwrap_or(0)
34 }
35}
36
37impl Default for ManualLiveStream {
38 fn default() -> Self {
39 Self::new(16)
40 }
41}
42
43impl LiveStream for ManualLiveStream {
44 fn subscribe(&self) -> Pin<Box<dyn Stream<Item = LiveEvent> + Send>> {
45 let rx = self.tx.subscribe();
46 // BroadcastStream yields Result<LiveEvent, BroadcastStreamRecvError>
47 // — drop the error variant; it only fires on lag, which a v1
48 // sloppy-invalidation consumer doesn't care about.
49 Box::pin(futures_util::StreamExt::filter_map(
50 BroadcastStream::new(rx),
51 |r| async move { r.ok() },
52 ))
53 }
54}