// tokio_process_tools/output_stream/backend/broadcast/mod.rs
//! Multi-consumer broadcast backend with optional replay.
//!
//! Two implementations live side by side and are selected by
//! [`BroadcastOutputStream::from_stream`]:
//!
//! - [`fast`]: a thin wrapper around `tokio::sync::broadcast` used only when the config
//!   is exactly `LossyWithoutBackpressure + NoReplay`. It avoids the shared-state mutex and the
//!   replay buffer entirely, at the cost of dropping output for slow or late subscribers.
//! - [`fanout`]: the generic `<D: Delivery, R: Replay>` path used for every other
//!   combination. It owns an `Arc<Shared>` that tracks the subscriber registry and replay
//!   history, and routes per-event dispatch through [`state::append_event`] to honor the
//!   configured delivery guarantee.
//!
//! The dispatch lives in [`BroadcastOutputStream::from_stream`] below; see each
//! submodule's `//!` block for the rationale of that path.

use crate::WaitForLineResult;
use crate::output_stream::config::StreamConfig;
use crate::output_stream::consumer::driver::consume_sync;
use crate::output_stream::event::StreamEvent;
use crate::output_stream::line::adapter::ParseLines;
use crate::output_stream::policy::{
    Delivery, DeliveryGuarantee, LossyWithoutBackpressure, NoReplay, Replay, ReplayEnabled,
};
use crate::output_stream::visitors::wait::WaitForLine;
use crate::output_stream::{Consumable, OutputStream, Subscribable};
use crate::{LineParsingOptions, NumBytes};
use std::borrow::Cow;
use std::convert::Infallible;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncRead;
#[cfg(test)]
use tokio::sync::watch;
use unwrap_infallible::UnwrapInfallible;

mod fanout;
mod fast;
mod state;
mod subscription;

use fanout::{FanoutReplayBackend, new_fanout_backend};
use fast::{FastBackend, new_fast_backend};
use state::{BestEffortLiveQueue, SubscriberSender};
use subscription::{FastSubscription, LiveReceiver, SharedSubscription};

pub use subscription::BroadcastSubscription;

/// Internal backend selection for [`BroadcastOutputStream`].
///
/// The variant is chosen once in [`BroadcastOutputStream::from_stream`] and never changes
/// for the lifetime of the stream.
enum Backend<D, R>
where
    D: Delivery,
    R: Replay,
{
    /// Thin `tokio::sync::broadcast` wrapper, used only for the exact
    /// `LossyWithoutBackpressure + NoReplay` configuration.
    Fast(FastBackend),
    /// Generic shared-state fanout path honoring the configured delivery and replay policies.
    FanoutReplay(FanoutReplayBackend<D, R>),
}

/// The output stream from a process using a multi-consumer broadcast backend.
///
/// Broadcast streams support multiple consumers and can optionally retain replay history for
/// consumers that attach after output has already arrived. Use this backend when the same stream
/// needs concurrent fanout, such as logging plus readiness checks or logging plus collection.
/// Delivery policy still determines whether slow active consumers observe gaps or apply
/// backpressure.
pub struct BroadcastOutputStream<D = LossyWithoutBackpressure, R = NoReplay>
where
    D: Delivery,
    R: Replay,
{
    // Selected once in `from_stream`; never swapped afterwards.
    backend: Backend<D, R>,
}

75impl<D, R> Drop for BroadcastOutputStream<D, R>
76where
77    D: Delivery,
78    R: Replay,
79{
80    fn drop(&mut self) {
81        match &self.backend {
82            Backend::Fast(backend) => {
83                backend.stream_reader.abort();
84            }
85            Backend::FanoutReplay(backend) => {
86                backend.stream_reader.abort();
87                {
88                    let mut state = backend
89                        .shared
90                        .state
91                        .lock()
92                        .expect("broadcast state poisoned");
93                    state.close_for_drop();
94                }
95            }
96        }
97    }
98}
99
100impl<D, R> Debug for BroadcastOutputStream<D, R>
101where
102    D: Delivery + Debug,
103    R: Replay + Debug,
104{
105    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
106        let mut debug = f.debug_struct("BroadcastOutputStream");
107        debug.field("output_collector", &"non-debug < JoinHandle<()> >");
108        match &self.backend {
109            Backend::Fast(backend) => {
110                debug.field("backend", &"tokio::sync::broadcast");
111                debug.field("options", &backend.options);
112                debug.field("name", &backend.name);
113            }
114            Backend::FanoutReplay(backend) => {
115                debug.field("backend", &"fanout replay");
116                debug.field("options", &backend.options);
117                debug.field("name", &backend.name);
118            }
119        }
120        debug.finish_non_exhaustive()
121    }
122}
123
124impl<D, R> OutputStream for BroadcastOutputStream<D, R>
125where
126    D: Delivery,
127    R: Replay,
128{
129    fn read_chunk_size(&self) -> NumBytes {
130        match &self.backend {
131            Backend::Fast(backend) => backend.options.read_chunk_size,
132            Backend::FanoutReplay(backend) => backend.options.read_chunk_size,
133        }
134    }
135
136    fn max_buffered_chunks(&self) -> usize {
137        match &self.backend {
138            Backend::Fast(backend) => backend.options.max_buffered_chunks,
139            Backend::FanoutReplay(backend) => backend.options.max_buffered_chunks,
140        }
141    }
142
143    fn name(&self) -> &'static str {
144        match &self.backend {
145            Backend::Fast(backend) => backend.name,
146            Backend::FanoutReplay(backend) => backend.name,
147        }
148    }
149}
150
151impl<D, R> BroadcastOutputStream<D, R>
152where
153    D: Delivery,
154    R: Replay,
155{
156    /// Creates a new broadcast output stream from an async read stream and typed stream config.
157    #[doc(hidden)]
158    #[must_use]
159    pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
160        stream: S,
161        stream_name: &'static str,
162        options: StreamConfig<D, R>,
163    ) -> Self {
164        options.assert_valid("options");
165
166        if options.delivery_guarantee() == DeliveryGuarantee::LossyWithoutBackpressure
167            && !options.replay_enabled()
168        {
169            return Self {
170                backend: Backend::Fast(new_fast_backend(
171                    stream,
172                    stream_name,
173                    options.read_chunk_size,
174                    options.max_buffered_chunks,
175                )),
176            };
177        }
178
179        Self {
180            backend: Backend::FanoutReplay(new_fanout_backend(stream, stream_name, options)),
181        }
182    }
183}
184
185impl<D> BroadcastOutputStream<D, ReplayEnabled>
186where
187    D: Delivery,
188{
189    /// Seals replay history for future subscribers.
190    ///
191    /// This is a one-way, idempotent operation. Active subscribers keep their unread tail data
192    /// according to the configured delivery policy.
193    ///
194    /// # Panics
195    ///
196    /// Panics if the internal state mutex is poisoned.
197    pub fn seal_replay(&self) {
198        let Backend::FanoutReplay(backend) = &self.backend else {
199            return;
200        };
201        {
202            let mut state = backend
203                .shared
204                .state
205                .lock()
206                .expect("broadcast state poisoned");
207            state.seal_replay();
208        }
209    }
210
211    /// Returns `true` once replay history has been sealed.
212    ///
213    /// # Panics
214    ///
215    /// Panics if the internal state mutex is poisoned.
216    #[must_use]
217    pub fn is_replay_sealed(&self) -> bool {
218        let Backend::FanoutReplay(backend) = &self.backend else {
219            return false;
220        };
221        backend
222            .shared
223            .state
224            .lock()
225            .expect("broadcast state poisoned")
226            .replay_sealed
227    }
228}
229
#[cfg(test)]
impl<D, R> BroadcastOutputStream<D, R>
where
    D: Delivery,
    R: Replay,
{
    /// Test-only hook: watch the running count of bytes ingested from the underlying stream.
    pub(super) fn subscribe_bytes_ingested(&self) -> watch::Receiver<u64> {
        match &self.backend {
            Backend::Fast(fast) => fast.bytes_ingested_tx.subscribe(),
            Backend::FanoutReplay(fanout) => fanout.shared.subscribe_bytes_ingested(),
        }
    }
}

impl<D, R> Subscribable for BroadcastOutputStream<D, R>
where
    D: Delivery,
    R: Replay,
{
    type Subscription = BroadcastSubscription<D, R>;
    type SubscribeError = Infallible;

    /// Attaches a new subscriber to this stream (infallible for this backend).
    fn try_subscribe(&self) -> Result<Self::Subscription, Self::SubscribeError> {
        Ok(match &self.backend {
            Backend::Fast(backend) => {
                let (receiver, emit_terminal_event) = {
                    // NOTE(review): the broadcast subscription and the terminal-event snapshot
                    // are both taken while `closure_state` is locked — presumably so a closure
                    // recorded concurrently is either captured here or seen by the new
                    // receiver, never lost between the two reads. Confirm against the writer
                    // side in `fast`.
                    let state = backend
                        .closure_state
                        .lock()
                        .expect("closure_state poisoned");
                    let receiver = backend.sender.subscribe();
                    // A read error takes precedence over plain EOF when both are recorded.
                    let terminal_event = state
                        .read_error
                        .clone()
                        .map(StreamEvent::ReadError)
                        .or_else(|| state.closed.then_some(StreamEvent::Eof));
                    (receiver, terminal_event)
                };

                BroadcastSubscription::fast(FastSubscription {
                    receiver,
                    emit_terminal_event,
                })
            }
            Backend::FanoutReplay(backend) => {
                // The shared-state lock is held for the whole registration so the replay
                // snapshot, the closed/terminal check, and `add_subscriber` observe one
                // consistent state.
                let mut state = backend
                    .shared
                    .state
                    .lock()
                    .expect("broadcast state poisoned");

                // The live channel flavor mirrors the configured delivery guarantee:
                // bounded mpsc applies backpressure; the best-effort queue drops instead.
                let (subscriber_sender, live_receiver) = match backend.options.delivery_guarantee()
                {
                    DeliveryGuarantee::ReliableWithBackpressure => {
                        let (sender, receiver) =
                            tokio::sync::mpsc::channel(backend.options.max_buffered_chunks);
                        (
                            SubscriberSender::Reliable(sender),
                            LiveReceiver::Reliable(receiver),
                        )
                    }
                    DeliveryGuarantee::LossyWithoutBackpressure => {
                        let queue = Arc::new(BestEffortLiveQueue::new(
                            backend.options.max_buffered_chunks,
                        ));
                        (
                            SubscriberSender::BestEffort(Arc::clone(&queue)),
                            LiveReceiver::BestEffort(queue),
                        )
                    }
                };
                let (replay, live_start_seq) = state.replay_snapshot(backend.options);
                // Once the stream is closed or terminal, no live registration happens; the
                // subscription then serves only the replay snapshot taken above.
                let id = if state.closed || state.terminal.is_some() {
                    None
                } else {
                    Some(state.add_subscriber(subscriber_sender))
                };

                BroadcastSubscription::shared(SharedSubscription {
                    shared: Arc::clone(&backend.shared),
                    id,
                    replay,
                    live_start_seq,
                    live_receiver: if id.is_some() {
                        live_receiver
                    } else {
                        LiveReceiver::Closed
                    },
                    _marker: std::marker::PhantomData,
                    done: false,
                })
            }
        })
    }
}

// Marker impl: broadcast streams are consumable and consumption itself cannot fail
// (subscription is infallible for this backend).
impl<D, R> Consumable for BroadcastOutputStream<D, R>
where
    D: Delivery,
    R: Replay,
{
    type Error = Infallible;
}

334impl<D, R> BroadcastOutputStream<D, R>
335where
336    D: Delivery,
337    R: Replay,
338{
339    /// Waits for a line that matches the given predicate within `timeout`.
340    ///
341    /// The returned future resolves to
342    /// `Ok(`[`WaitForLineResult::Matched`]`)` if a matching line is found,
343    /// `Ok(`[`WaitForLineResult::StreamClosed`]`)` if the stream ends first, or
344    /// `Ok(`[`WaitForLineResult::Timeout`]`)` if the timeout expires first.
345    ///
346    /// The stream subscription is acquired synchronously inside this method, *before* the
347    /// returned future is polled, so output produced between this call and the first
348    /// `.await` cannot race ahead of the matcher.
349    ///
350    /// The waiter starts at the earliest output currently available to new consumers. With
351    /// replay enabled and unsealed, that can include retained past output; otherwise it starts
352    /// at live output.
353    ///
354    /// When chunks are dropped in [`DeliveryGuarantee::LossyWithoutBackpressure`] mode, this waiter discards
355    /// any partial line in progress and resynchronizes at the next newline instead of matching
356    /// across the gap.
357    ///
358    /// # Errors
359    ///
360    /// Returns [`crate::StreamReadError`] if the underlying stream fails while being read.
361    ///
362    /// # Panics
363    ///
364    /// Panics if `options.max_line_length` is zero.
365    pub fn wait_for_line(
366        &self,
367        timeout: Duration,
368        predicate: impl Fn(Cow<'_, str>) -> bool + Send + 'static,
369        options: LineParsingOptions,
370    ) -> impl Future<Output = Result<WaitForLineResult, crate::StreamReadError>> + Send + 'static
371    {
372        let subscription = self.try_subscribe().unwrap_infallible();
373        let visitor = ParseLines::new(options, WaitForLine::new(predicate));
374        async move {
375            // Hold the sender on this stack frame so the receiver never fires while the future
376            // is alive (the sender drops naturally when the future returns or is canceled).
377            let (_term_sig_tx, term_sig_rx) = tokio::sync::oneshot::channel::<()>();
378            match tokio::time::timeout(timeout, consume_sync(subscription, visitor, term_sig_rx))
379                .await
380            {
381                Ok(Ok(true)) => Ok(WaitForLineResult::Matched),
382                Ok(Ok(false)) => Ok(WaitForLineResult::StreamClosed),
383                Ok(Err(err)) => Err(err),
384                Err(_) => Ok(WaitForLineResult::Timeout),
385            }
386        }
387    }
388}
389
390#[cfg(test)]
391mod tests;