Skip to main content

tokio_process_tools/output_stream/backend/single_subscriber/
mod.rs

1//! Single-active-consumer backend with optional replay.
2
3use crate::WaitForLineResult;
4use crate::output_stream::config::StreamConfig;
5use crate::output_stream::consumer::driver::consume_sync;
6use crate::output_stream::event::StreamEvent;
7use crate::output_stream::line::adapter::ParseLines;
8use crate::output_stream::policy::{
9    Delivery, DeliveryGuarantee, LossyWithoutBackpressure, NoReplay, Replay, ReplayEnabled,
10    ReplayRetention,
11};
12use crate::output_stream::visitors::wait::WaitForLine;
13use crate::output_stream::{Consumable, OutputStream, Subscribable, Subscription};
14use crate::{LineParsingOptions, NumBytes, StreamConsumerError};
15use std::borrow::Cow;
16use std::collections::VecDeque;
17use std::fmt::{Debug, Formatter};
18use std::future::Future;
19use std::sync::Arc;
20use std::time::Duration;
21use tokio::io::AsyncRead;
22use tokio::sync::mpsc;
23use tokio::task::JoinHandle;
24
25mod reader;
26mod state;
27mod subscription;
28
29use reader::{read_chunked_best_effort, read_chunked_reliable};
30use state::{ActiveSubscriber, ConfiguredShared};
31
32pub use subscription::SingleSubscriberSubscription;
33
34impl Subscription for mpsc::Receiver<StreamEvent> {
35    fn next_event(&mut self) -> impl Future<Output = Option<StreamEvent>> + Send + '_ {
36        self.recv()
37    }
38}
39
/// The output stream of a process, representing either stdout or stderr.
///
/// This is the single-subscriber variant, allowing exactly one active consumer at a time. It is
/// useful when one inspector, collector, or line waiter should own the stream and accidental
/// concurrent fanout should be rejected early. It can reduce coordination overhead in some
/// single-consumer paths, but it is not a categorical throughput replacement for broadcast.
/// If multiple concurrent consumers are required, use the
/// `output_stream::backend::broadcast::BroadcastOutputStream`.
///
/// The type parameters select the delivery policy (`D`, defaulting to
/// [`LossyWithoutBackpressure`]) and the replay policy (`R`, defaulting to [`NoReplay`]).
pub struct SingleSubscriberOutputStream<D = LossyWithoutBackpressure, R = NoReplay>
where
    D: Delivery,
    R: Replay,
{
    /// The task that reads output from the underlying stream and routes it to the active
    /// subscriber, replay storage, or discard path. It is aborted when this stream is dropped.
    stream_reader: JoinHandle<()>,

    /// Typed stream configuration selected by the process stream builder.
    options: StreamConfig<D, R>,

    /// State shared with the reader task and with handed-out subscriptions: the
    /// active-subscriber slot plus the mutex-guarded replay/terminal state.
    configured_shared: Arc<ConfiguredShared>,

    /// Name of this stream.
    name: &'static str,
}
66
67impl<D, R> Drop for SingleSubscriberOutputStream<D, R>
68where
69    D: Delivery,
70    R: Replay,
71{
72    fn drop(&mut self) {
73        self.stream_reader.abort();
74        self.configured_shared.clear_active();
75    }
76}
77
78impl<D, R> Debug for SingleSubscriberOutputStream<D, R>
79where
80    D: Delivery + Debug,
81    R: Replay + Debug,
82{
83    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
84        f.debug_struct("SingleSubscriberOutputStream")
85            .field("output_collector", &"non-debug < JoinHandle<()> >")
86            .field("options", &self.options)
87            .field("name", &self.name)
88            .finish_non_exhaustive()
89    }
90}
91
92impl<D, R> OutputStream for SingleSubscriberOutputStream<D, R>
93where
94    D: Delivery,
95    R: Replay,
96{
97    fn read_chunk_size(&self) -> NumBytes {
98        self.options.read_chunk_size
99    }
100
101    fn max_buffered_chunks(&self) -> usize {
102        self.options.max_buffered_chunks
103    }
104
105    fn name(&self) -> &'static str {
106        self.name
107    }
108}
109
110impl<D, R> SingleSubscriberOutputStream<D, R>
111where
112    D: Delivery,
113    R: Replay,
114{
115    /// Creates a new single-subscriber output stream from an async read stream and typed stream config.
116    #[doc(hidden)]
117    #[must_use]
118    pub fn from_stream<S>(stream: S, stream_name: &'static str, options: StreamConfig<D, R>) -> Self
119    where
120        S: AsyncRead + Unpin + Send + 'static,
121    {
122        options.assert_valid("options");
123
124        let shared = Arc::new(ConfiguredShared::new());
125        let active_rx = shared.subscribe_active();
126        let delivery_guarantee = options.delivery_guarantee();
127        let replay_retention = options.replay_retention();
128
129        let stream_reader = match delivery_guarantee {
130            DeliveryGuarantee::LossyWithoutBackpressure => tokio::spawn(read_chunked_best_effort(
131                stream,
132                Arc::clone(&shared),
133                active_rx,
134                options.read_chunk_size,
135                replay_retention,
136                stream_name,
137            )),
138            DeliveryGuarantee::ReliableWithBackpressure => tokio::spawn(read_chunked_reliable(
139                stream,
140                Arc::clone(&shared),
141                active_rx,
142                options.read_chunk_size,
143                replay_retention,
144                stream_name,
145            )),
146        };
147
148        Self {
149            stream_reader,
150            options,
151            configured_shared: shared,
152            name: stream_name,
153        }
154    }
155
156    /// Returns whether replay-specific APIs are enabled for this stream.
157    #[must_use]
158    pub fn replay_enabled(&self) -> bool {
159        self.options.replay_enabled()
160    }
161
162    /// Returns the configured replay retention.
163    #[must_use]
164    pub fn replay_retention(&self) -> Option<ReplayRetention> {
165        self.options.replay_retention()
166    }
167}
168
169impl<D> SingleSubscriberOutputStream<D, ReplayEnabled>
170where
171    D: Delivery,
172{
173    /// Seals replay history for future subscribers.
174    ///
175    /// This is a one-way, idempotent operation.
176    ///
177    /// # Panics
178    ///
179    /// Panics if the internal state mutex is poisoned.
180    pub fn seal_replay(&self) {
181        let mut state = self
182            .configured_shared
183            .state
184            .lock()
185            .expect("single-subscriber state poisoned");
186        state.replay_sealed = true;
187        state.trim_replay_window(self.options.replay_retention());
188    }
189
190    /// Returns `true` once replay history has been sealed.
191    ///
192    /// # Panics
193    ///
194    /// Panics if the internal state mutex is poisoned.
195    #[must_use]
196    pub fn is_replay_sealed(&self) -> bool {
197        self.configured_shared
198            .state
199            .lock()
200            .expect("single-subscriber state poisoned")
201            .replay_sealed
202    }
203}
204
205impl<D, R> Subscribable for SingleSubscriberOutputStream<D, R>
206where
207    D: Delivery,
208    R: Replay,
209{
210    type Subscription = SingleSubscriberSubscription;
211    type SubscribeError = StreamConsumerError;
212
213    fn try_subscribe(&self) -> Result<Self::Subscription, Self::SubscribeError> {
214        let shared = &self.configured_shared;
215
216        let (sender, receiver) = mpsc::channel(self.options.max_buffered_chunks);
217        let (id, replay, terminal_event) = {
218            let mut state = shared
219                .state
220                .lock()
221                .expect("single-subscriber state poisoned");
222
223            if state.active_id.is_some() {
224                return Err(StreamConsumerError::ActiveConsumer {
225                    stream_name: self.name,
226                });
227            }
228
229            let replay = if state.replay_sealed || self.options.replay_retention().is_none() {
230                VecDeque::default()
231            } else {
232                state.snapshot_events()
233            };
234            let id = state.attach_subscriber();
235            shared
236                .active_tx
237                .send_replace(Some(Arc::new(ActiveSubscriber { id, sender })));
238            (id, replay, state.terminal_event.clone())
239        };
240
241        Ok(SingleSubscriberSubscription {
242            id,
243            shared: Arc::clone(shared),
244            replay,
245            terminal_event,
246            live_receiver: Some(receiver),
247        })
248    }
249}
250
/// Marks the single-subscriber stream as consumable; consumer-related failures are
/// reported as [`StreamConsumerError`].
impl<D, R> Consumable for SingleSubscriberOutputStream<D, R>
where
    D: Delivery,
    R: Replay,
{
    /// Error type associated with consuming this stream.
    type Error = StreamConsumerError;
}
258
259impl<D, R> SingleSubscriberOutputStream<D, R>
260where
261    D: Delivery,
262    R: Replay,
263{
264    /// Tries to wait for a line that matches the given predicate within `timeout`.
265    ///
266    /// # Errors
267    ///
268    /// Returns [`StreamConsumerError`] if the backend rejects the line waiter.
269    ///
270    /// # Panics
271    ///
272    /// Panics if `options.max_line_length` is zero.
273    pub fn wait_for_line(
274        &self,
275        timeout: Duration,
276        predicate: impl Fn(Cow<'_, str>) -> bool + Send + 'static,
277        options: LineParsingOptions,
278    ) -> Result<
279        impl Future<Output = Result<WaitForLineResult, crate::StreamReadError>> + Send + 'static,
280        StreamConsumerError,
281    > {
282        let subscription = self.try_subscribe()?;
283        let visitor = ParseLines::new(options, WaitForLine::new(predicate));
284        Ok(async move {
285            let (_term_sig_tx, term_sig_rx) = tokio::sync::oneshot::channel::<()>();
286            match tokio::time::timeout(timeout, consume_sync(subscription, visitor, term_sig_rx))
287                .await
288            {
289                Ok(Ok(true)) => Ok(WaitForLineResult::Matched),
290                Ok(Ok(false)) => Ok(WaitForLineResult::StreamClosed),
291                Ok(Err(err)) => Err(err),
292                Err(_) => Ok(WaitForLineResult::Timeout),
293            }
294        })
295    }
296}
297
298#[cfg(test)]
299mod tests;