Skip to main content

tokio_process_tools/output_stream/backend/broadcast/
mod.rs

1//! Multi-consumer broadcast backend with optional replay.
2//!
3//! Two implementations live side by side and are selected by
4//! [`BroadcastOutputStream::from_stream`]:
5//!
6//! - [`fast`] — a thin wrapper around `tokio::sync::broadcast` used only when the config
7//!   is exactly `BestEffortDelivery + NoReplay`. It avoids the shared-state mutex and the
8//!   replay buffer entirely, at the cost of dropping output for slow or late subscribers.
9//! - [`fanout`] — the generic `<D: Delivery, R: Replay>` path used for every other
10//!   combination. It owns an `Arc<Shared>` that tracks the subscriber registry and replay
11//!   history, and routes per-event dispatch through [`state::append_event`] to honor the
12//!   configured delivery guarantee.
13//!
14//! The dispatch lives in [`BroadcastOutputStream::from_stream`] below; see each
15//! submodule's `//!` block for the rationale of that path.
16
17use crate::WaitForLineResult;
18use crate::output_stream::config::StreamConfig;
19use crate::output_stream::consumer::driver::consume_sync;
20use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
21use crate::output_stream::event::StreamEvent;
22use crate::output_stream::line::adapter::LineAdapter;
23use crate::output_stream::policy::{
24    BestEffortDelivery, Delivery, DeliveryGuarantee, NoReplay, Replay, ReplayEnabled,
25};
26use crate::output_stream::visitors::factories::impl_consumer_factories;
27use crate::output_stream::visitors::wait::WaitForLineSink;
28use crate::output_stream::{OutputStream, TrySubscribable};
29use crate::{
30    AsyncStreamVisitor, Consumer, LineParsingOptions, NumBytes, StreamConsumerError, StreamVisitor,
31};
32use std::borrow::Cow;
33use std::fmt::{Debug, Formatter};
34use std::future::Future;
35use std::sync::Arc;
36use std::time::Duration;
37use tokio::io::AsyncRead;
38#[cfg(test)]
39use tokio::sync::watch;
40
/// Per-backend return-type alias used by the [`impl_consumer_factories!`] macro to keep the
/// macro body backend-agnostic. Broadcast factories cannot fail, so the alias is the identity
/// (other backends presumably wrap `T` in a `Result` — see their `FactoryReturn`).
type FactoryReturn<T> = T;
44
45mod fanout;
46mod fast;
47mod state;
48mod subscription;
49
50use fanout::{FanoutReplayBackend, new_fanout_backend};
51use fast::{FastBackend, new_fast_backend};
52use state::{BestEffortLiveQueue, SubscriberSender};
53use subscription::{BroadcastSubscription, FastSubscription, LiveReceiver, SharedSubscription};
54
/// Internal backend selection made once in [`BroadcastOutputStream::from_stream`].
enum Backend<D, R>
where
    D: Delivery,
    R: Replay,
{
    /// Thin `tokio::sync::broadcast` wrapper; chosen only for
    /// `BestEffortDelivery + NoReplay` (see module docs).
    Fast(FastBackend),
    /// Generic fanout path with shared subscriber registry and optional replay history;
    /// chosen for every other delivery/replay combination.
    FanoutReplay(FanoutReplayBackend<D, R>),
}
63
/// The output stream from a process using a multi-consumer broadcast backend.
///
/// Broadcast streams support multiple consumers and can optionally retain replay history for
/// consumers that attach after output has already arrived. Use this backend when the same stream
/// needs concurrent fanout, such as logging plus readiness checks or logging plus collection.
/// Delivery policy still determines whether slow active consumers observe gaps or apply
/// backpressure.
pub struct BroadcastOutputStream<D = BestEffortDelivery, R = NoReplay>
where
    D: Delivery,
    R: Replay,
{
    // Which concrete implementation backs this stream; fixed at construction time
    // by `from_stream` and never changed afterwards.
    backend: Backend<D, R>,
}
78
79impl<D, R> Drop for BroadcastOutputStream<D, R>
80where
81    D: Delivery,
82    R: Replay,
83{
84    fn drop(&mut self) {
85        match &self.backend {
86            Backend::Fast(backend) => {
87                backend.stream_reader.abort();
88            }
89            Backend::FanoutReplay(backend) => {
90                backend.stream_reader.abort();
91                {
92                    let mut state = backend
93                        .shared
94                        .state
95                        .lock()
96                        .expect("broadcast state poisoned");
97                    state.close_for_drop();
98                }
99            }
100        }
101    }
102}
103
104impl<D, R> Debug for BroadcastOutputStream<D, R>
105where
106    D: Delivery + Debug,
107    R: Replay + Debug,
108{
109    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
110        let mut debug = f.debug_struct("BroadcastOutputStream");
111        debug.field("output_collector", &"non-debug < JoinHandle<()> >");
112        match &self.backend {
113            Backend::Fast(backend) => {
114                debug.field("backend", &"tokio::sync::broadcast");
115                debug.field("options", &backend.options);
116                debug.field("name", &backend.name);
117            }
118            Backend::FanoutReplay(backend) => {
119                debug.field("backend", &"fanout replay");
120                debug.field("options", &backend.options);
121                debug.field("name", &backend.name);
122            }
123        }
124        debug.finish_non_exhaustive()
125    }
126}
127
128impl<D, R> OutputStream for BroadcastOutputStream<D, R>
129where
130    D: Delivery,
131    R: Replay,
132{
133    fn read_chunk_size(&self) -> NumBytes {
134        match &self.backend {
135            Backend::Fast(backend) => backend.options.read_chunk_size,
136            Backend::FanoutReplay(backend) => backend.options.read_chunk_size,
137        }
138    }
139
140    fn max_buffered_chunks(&self) -> usize {
141        match &self.backend {
142            Backend::Fast(backend) => backend.options.max_buffered_chunks,
143            Backend::FanoutReplay(backend) => backend.options.max_buffered_chunks,
144        }
145    }
146
147    fn name(&self) -> &'static str {
148        match &self.backend {
149            Backend::Fast(backend) => backend.name,
150            Backend::FanoutReplay(backend) => backend.name,
151        }
152    }
153}
154
155impl<D, R> BroadcastOutputStream<D, R>
156where
157    D: Delivery,
158    R: Replay,
159{
160    /// Creates a new broadcast output stream from an async read stream and typed stream config.
161    pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
162        stream: S,
163        stream_name: &'static str,
164        options: StreamConfig<D, R>,
165    ) -> Self {
166        options.assert_valid("options");
167
168        if options.delivery_guarantee() == DeliveryGuarantee::BestEffort
169            && !options.replay_enabled()
170        {
171            return Self {
172                backend: Backend::Fast(new_fast_backend(
173                    stream,
174                    stream_name,
175                    options.read_chunk_size,
176                    options.max_buffered_chunks,
177                )),
178            };
179        }
180
181        Self {
182            backend: Backend::FanoutReplay(new_fanout_backend(stream, stream_name, options)),
183        }
184    }
185}
186
187impl<D> BroadcastOutputStream<D, ReplayEnabled>
188where
189    D: Delivery,
190{
191    /// Seals replay history for future subscribers.
192    ///
193    /// This is a one-way, idempotent operation. Active subscribers keep their unread tail data
194    /// according to the configured delivery policy.
195    ///
196    /// # Panics
197    ///
198    /// Panics if the internal state mutex is poisoned.
199    pub fn seal_replay(&self) {
200        let Backend::FanoutReplay(backend) = &self.backend else {
201            return;
202        };
203        {
204            let mut state = backend
205                .shared
206                .state
207                .lock()
208                .expect("broadcast state poisoned");
209            state.seal_replay();
210        }
211    }
212
213    /// Returns `true` once replay history has been sealed.
214    ///
215    /// # Panics
216    ///
217    /// Panics if the internal state mutex is poisoned.
218    #[must_use]
219    pub fn is_replay_sealed(&self) -> bool {
220        let Backend::FanoutReplay(backend) = &self.backend else {
221            return false;
222        };
223        backend
224            .shared
225            .state
226            .lock()
227            .expect("broadcast state poisoned")
228            .replay_sealed
229    }
230}
231
#[cfg(test)]
impl<D, R> BroadcastOutputStream<D, R>
where
    D: Delivery,
    R: Replay,
{
    /// Test-only hook: returns a `watch` receiver that tracks the cumulative number of
    /// bytes ingested from the underlying stream, regardless of which backend is active.
    pub(super) fn subscribe_bytes_ingested(&self) -> watch::Receiver<u64> {
        match &self.backend {
            Backend::Fast(backend) => backend.bytes_ingested_tx.subscribe(),
            Backend::FanoutReplay(backend) => backend.shared.subscribe_bytes_ingested(),
        }
    }
}
245
impl<D, R> BroadcastOutputStream<D, R>
where
    D: Delivery,
    R: Replay,
{
    /// Registers a new subscriber on the fanout backend and returns its subscription.
    ///
    /// Everything below happens under one lock of the shared state so that the replay
    /// snapshot, the live-start sequence number, and the registration are mutually
    /// consistent — no event can slip between "snapshot taken" and "subscriber added".
    ///
    /// # Panics
    ///
    /// Panics if called on the fast backend (callers must route through
    /// [`Self::subscribe_normal`]) or if the state mutex is poisoned.
    fn subscribe(&self) -> BroadcastSubscription<D, R> {
        let Backend::FanoutReplay(backend) = &self.backend else {
            panic!("fanout broadcast subscription requested for fast backend");
        };
        let mut state = backend
            .shared
            .state
            .lock()
            .expect("broadcast state poisoned");

        // Channel shape follows the delivery guarantee: a bounded mpsc applies
        // backpressure for reliable delivery; a shared best-effort queue drops
        // instead of blocking.
        let (subscriber_sender, live_receiver) = match backend.options.delivery_guarantee() {
            DeliveryGuarantee::ReliableForActiveSubscribers => {
                let (sender, receiver) =
                    tokio::sync::mpsc::channel(backend.options.max_buffered_chunks);
                (
                    SubscriberSender::Reliable(sender),
                    LiveReceiver::Reliable(receiver),
                )
            }
            DeliveryGuarantee::BestEffort => {
                let queue = Arc::new(BestEffortLiveQueue::new(
                    backend.options.max_buffered_chunks,
                ));
                (
                    SubscriberSender::BestEffort(Arc::clone(&queue)),
                    LiveReceiver::BestEffort(queue),
                )
            }
        };
        let (replay, live_start_seq) = state.replay_snapshot(backend.options);
        // Once the stream is closed or terminal, no live events will ever arrive, so
        // skip registration entirely; the subscription then serves replay only.
        let id = if state.closed || state.terminal.is_some() {
            None
        } else {
            Some(state.add_subscriber(subscriber_sender))
        };

        BroadcastSubscription::Shared(SharedSubscription {
            shared: Arc::clone(&backend.shared),
            id,
            replay,
            live_start_seq,
            // An unregistered subscriber gets a closed live receiver so its channel
            // half isn't leaked while never being fed.
            live_receiver: if id.is_some() {
                live_receiver
            } else {
                LiveReceiver::Closed
            },
            _marker: std::marker::PhantomData,
            done: false,
        })
    }

    /// Returns a subscription appropriate for the active backend.
    ///
    /// For the fast backend, the `closure_state` lock is held across both the
    /// `subscribe()` call and the terminal-event read so the two cannot race: a
    /// subscriber either sees the terminal event here or receives it on the channel.
    fn subscribe_normal(&self) -> BroadcastSubscription<D, R> {
        match &self.backend {
            Backend::Fast(backend) => {
                let (receiver, emit_terminal_event) = {
                    let state = backend
                        .closure_state
                        .lock()
                        .expect("closure_state poisoned");
                    let receiver = backend.sender.subscribe();
                    // Prefer a recorded read error over plain EOF when both apply.
                    let terminal_event = state
                        .read_error
                        .clone()
                        .map(StreamEvent::ReadError)
                        .or_else(|| state.closed.then_some(StreamEvent::Eof));
                    (receiver, terminal_event)
                };

                BroadcastSubscription::Fast(FastSubscription {
                    receiver,
                    emit_terminal_event,
                })
            }
            Backend::FanoutReplay(_) => self.subscribe(),
        }
    }
}
328
impl<D, R> TrySubscribable for BroadcastOutputStream<D, R>
where
    D: Delivery,
    R: Replay,
{
    /// Broadcast streams accept any number of subscribers, so this never returns `Err`;
    /// the `Result` exists only to satisfy the backend-agnostic trait signature.
    fn try_subscribe(
        &self,
    ) -> Result<impl crate::output_stream::Subscription, StreamConsumerError> {
        Ok(self.subscribe_normal())
    }
}
340
impl<D, R> BroadcastOutputStream<D, R>
where
    D: Delivery,
    R: Replay,
{
    /// Drives the provided synchronous [`StreamVisitor`] over this stream and returns a
    /// [`Consumer`] that owns the spawned task.
    ///
    /// All built-in `inspect_*`, `collect_*`, and `wait_for_line` factories construct a
    /// built-in visitor and call this method internally; reach for `consume_with` when the
    /// closure-shaped factories don't fit and you need direct access to the chunk/gap/EOF
    /// lifecycle. The returned [`Consumer`]'s [`wait`](Consumer::wait) yields whatever the
    /// visitor produces from [`StreamVisitor::into_output`].
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your visitor is never invoked and the consumer effectively dies immediately. You can safely do a `let _consumer = ...` binding to ignore the typical 'unused' warning."]
    pub fn consume_with<V>(&self, visitor: V) -> Consumer<V::Output>
    where
        V: StreamVisitor,
    {
        // Subscription is taken synchronously here, so no output between this call and
        // the task's first poll is missed (subject to the delivery guarantee).
        spawn_consumer_sync(self.name(), self.subscribe_normal(), visitor)
    }

    /// Drives the provided asynchronous [`AsyncStreamVisitor`] over this stream and returns a
    /// [`Consumer`] that owns the spawned task.
    ///
    /// Use this when observing a chunk requires `.await` (for example, forwarding chunks to an
    /// async writer or channel). See [`consume_with`](Self::consume_with) for the synchronous
    /// variant.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your visitor is never invoked and the consumer effectively dies immediately. You can safely do a `let _consumer = ...` binding to ignore the typical 'unused' warning."]
    pub fn consume_with_async<V>(&self, visitor: V) -> Consumer<V::Output>
    where
        V: AsyncStreamVisitor,
    {
        spawn_consumer_async(self.name(), self.subscribe_normal(), visitor)
    }

    // Expands to the shared inspect_*/collect_* factory methods; kept backend-agnostic
    // via the `FactoryReturn` alias defined at the top of this module.
    impl_consumer_factories!();

    /// Waits for a line that matches the given predicate within `timeout`.
    ///
    /// The returned future resolves to
    /// `Ok(`[`WaitForLineResult::Matched`]`)` if a matching line is found,
    /// `Ok(`[`WaitForLineResult::StreamClosed`]`)` if the stream ends first, or
    /// `Ok(`[`WaitForLineResult::Timeout`]`)` if the timeout expires first.
    ///
    /// The stream subscription is acquired synchronously inside this method, *before* the
    /// returned future is polled, so output produced between this call and the first
    /// `.await` cannot race ahead of the matcher.
    ///
    /// The waiter starts at the earliest output currently available to new consumers. With
    /// replay enabled and unsealed, that can include retained past output; otherwise it starts
    /// at live output.
    ///
    /// When chunks are dropped in [`DeliveryGuarantee::BestEffort`] mode, this waiter discards
    /// any partial line in progress and resynchronizes at the next newline instead of matching
    /// across the gap.
    ///
    /// # Errors
    ///
    /// Returns [`crate::StreamReadError`] if the underlying stream fails while being read.
    ///
    /// # Panics
    ///
    /// Panics if `options.max_line_length` is zero.
    pub fn wait_for_line(
        &self,
        timeout: Duration,
        predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
        options: LineParsingOptions,
    ) -> impl Future<Output = Result<WaitForLineResult, crate::StreamReadError>> + Send + 'static
    {
        let subscription = self.subscribe_normal();
        let visitor = LineAdapter::new(options, WaitForLineSink::new(predicate));
        async move {
            // Hold the sender on this stack frame so the receiver never fires while the future
            // is alive — the sender drops naturally when the future returns or is cancelled.
            let (_term_sig_tx, term_sig_rx) = tokio::sync::oneshot::channel::<()>();
            // `consume_sync` returns Ok(true) on match, Ok(false) on stream end; the
            // outer timeout Err maps to WaitForLineResult::Timeout.
            match tokio::time::timeout(timeout, consume_sync(subscription, visitor, term_sig_rx))
                .await
            {
                Ok(Ok(true)) => Ok(WaitForLineResult::Matched),
                Ok(Ok(false)) => Ok(WaitForLineResult::StreamClosed),
                Ok(Err(err)) => Err(err),
                Err(_) => Ok(WaitForLineResult::Timeout),
            }
        }
    }
}
428
429#[cfg(test)]
430mod tests;