Skip to main content

tokio_process_tools/output_stream/backend/single_subscriber/
mod.rs

1//! Single-active-consumer backend with optional replay.
2
3use crate::WaitForLineResult;
4use crate::output_stream::config::StreamConfig;
5use crate::output_stream::consumer::driver::consume_sync;
6use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
7use crate::output_stream::event::StreamEvent;
8use crate::output_stream::line::adapter::LineAdapter;
9use crate::output_stream::policy::{
10    BestEffortDelivery, Delivery, DeliveryGuarantee, NoReplay, Replay, ReplayEnabled,
11    ReplayRetention,
12};
13use crate::output_stream::visitors::factories::impl_consumer_factories;
14use crate::output_stream::visitors::wait::WaitForLineSink;
15use crate::output_stream::{OutputStream, Subscription, TrySubscribable};
16use crate::{
17    AsyncStreamVisitor, Consumer, LineParsingOptions, NumBytes, StreamConsumerError, StreamVisitor,
18};
19use std::borrow::Cow;
20use std::collections::VecDeque;
21use std::fmt::{Debug, Formatter};
22use std::future::Future;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::io::AsyncRead;
26use tokio::sync::mpsc;
27use tokio::task::JoinHandle;
28
29/// Per-backend return-type alias used by the [`impl_consumer_factories!`] macro to keep the
30/// macro body backend-agnostic. Single-subscriber factories may fail when a consumer is
31/// already active, so each factory return is wrapped in `Result<_, StreamConsumerError>`.
32type FactoryReturn<T> = Result<T, StreamConsumerError>;
33
34mod reader;
35mod state;
36mod subscription;
37
38use reader::{read_chunked_best_effort, read_chunked_reliable};
39use state::{ActiveSubscriber, ConfiguredShared};
40use subscription::SingleSubscriberSubscription;
41
42impl Subscription for mpsc::Receiver<StreamEvent> {
43    fn next_event(&mut self) -> impl Future<Output = Option<StreamEvent>> + Send + '_ {
44        self.recv()
45    }
46}
47
48/// The output stream from a process. Either representing stdout or stderr.
49///
50/// This is the single-subscriber variant, allowing exactly one active consumer at a time. It is
51/// useful when one inspector, collector, or line waiter should own the stream and accidental
52/// concurrent fanout should be rejected early. It can reduce coordination overhead in some
53/// single-consumer paths, but it is not a categorical throughput replacement for broadcast.
54/// If multiple concurrent consumers are required, use the
55/// `output_stream::backend::broadcast::BroadcastOutputStream`.
56pub struct SingleSubscriberOutputStream<D = BestEffortDelivery, R = NoReplay>
57where
58    D: Delivery,
59    R: Replay,
60{
61    /// The task that reads output from the underlying stream and routes it to the active
62    /// subscriber, replay storage, or discard path.
63    stream_reader: JoinHandle<()>,
64
65    /// Typed stream configuration selected by the process stream builder.
66    options: StreamConfig<D, R>,
67
68    /// Shared replay state for typed single-subscriber configurations.
69    configured_shared: Arc<ConfiguredShared>,
70
71    /// Name of this stream.
72    name: &'static str,
73}
74
75impl<D, R> Drop for SingleSubscriberOutputStream<D, R>
76where
77    D: Delivery,
78    R: Replay,
79{
80    fn drop(&mut self) {
81        self.stream_reader.abort();
82        self.configured_shared.clear_active();
83    }
84}
85
86impl<D, R> Debug for SingleSubscriberOutputStream<D, R>
87where
88    D: Delivery + Debug,
89    R: Replay + Debug,
90{
91    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92        f.debug_struct("SingleSubscriberOutputStream")
93            .field("output_collector", &"non-debug < JoinHandle<()> >")
94            .field("options", &self.options)
95            .field("name", &self.name)
96            .finish_non_exhaustive()
97    }
98}
99
100impl<D, R> OutputStream for SingleSubscriberOutputStream<D, R>
101where
102    D: Delivery,
103    R: Replay,
104{
105    fn read_chunk_size(&self) -> NumBytes {
106        self.options.read_chunk_size
107    }
108
109    fn max_buffered_chunks(&self) -> usize {
110        self.options.max_buffered_chunks
111    }
112
113    fn name(&self) -> &'static str {
114        self.name
115    }
116}
117
118impl<D, R> SingleSubscriberOutputStream<D, R>
119where
120    D: Delivery,
121    R: Replay,
122{
123    /// Creates a new single-subscriber output stream from an async read stream and typed stream config.
124    pub fn from_stream<S>(stream: S, stream_name: &'static str, options: StreamConfig<D, R>) -> Self
125    where
126        S: AsyncRead + Unpin + Send + 'static,
127    {
128        options.assert_valid("options");
129
130        let shared = Arc::new(ConfiguredShared::new());
131        let active_rx = shared.subscribe_active();
132        let delivery_guarantee = options.delivery_guarantee();
133        let replay_retention = options.replay_retention();
134
135        let stream_reader = match delivery_guarantee {
136            DeliveryGuarantee::BestEffort => tokio::spawn(read_chunked_best_effort(
137                stream,
138                Arc::clone(&shared),
139                active_rx,
140                options.read_chunk_size,
141                replay_retention,
142                stream_name,
143            )),
144            DeliveryGuarantee::ReliableForActiveSubscribers => tokio::spawn(read_chunked_reliable(
145                stream,
146                Arc::clone(&shared),
147                active_rx,
148                options.read_chunk_size,
149                replay_retention,
150                stream_name,
151            )),
152        };
153
154        Self {
155            stream_reader,
156            options,
157            configured_shared: shared,
158            name: stream_name,
159        }
160    }
161
162    /// Returns whether replay-specific APIs are enabled for this stream.
163    #[must_use]
164    pub fn replay_enabled(&self) -> bool {
165        self.options.replay_enabled()
166    }
167
168    /// Returns the configured replay retention.
169    #[must_use]
170    pub fn replay_retention(&self) -> Option<ReplayRetention> {
171        self.options.replay_retention()
172    }
173
174    fn take_subscription(&self) -> Result<SingleSubscriberSubscription, StreamConsumerError> {
175        let shared = &self.configured_shared;
176
177        let (sender, receiver) = mpsc::channel(self.options.max_buffered_chunks);
178        let (id, replay, terminal_event) = {
179            let mut state = shared
180                .state
181                .lock()
182                .expect("single-subscriber state poisoned");
183
184            if state.active_id.is_some() {
185                return Err(StreamConsumerError::ActiveConsumer {
186                    stream_name: self.name,
187                });
188            }
189
190            let replay = if state.replay_sealed || self.options.replay_retention().is_none() {
191                VecDeque::default()
192            } else {
193                state.snapshot_events()
194            };
195            let id = state.attach_subscriber();
196            shared
197                .active_tx
198                .send_replace(Some(Arc::new(ActiveSubscriber { id, sender })));
199            (id, replay, state.terminal_event.clone())
200        };
201
202        Ok(SingleSubscriberSubscription {
203            id,
204            shared: Arc::clone(shared),
205            replay,
206            terminal_event,
207            live_receiver: Some(receiver),
208        })
209    }
210}
211
212impl<D> SingleSubscriberOutputStream<D, ReplayEnabled>
213where
214    D: Delivery,
215{
216    /// Seals replay history for future subscribers.
217    ///
218    /// This is a one-way, idempotent operation.
219    ///
220    /// # Panics
221    ///
222    /// Panics if the internal state mutex is poisoned.
223    pub fn seal_replay(&self) {
224        let mut state = self
225            .configured_shared
226            .state
227            .lock()
228            .expect("single-subscriber state poisoned");
229        state.replay_sealed = true;
230        state.trim_replay_window(self.options.replay_retention());
231    }
232
233    /// Returns `true` once replay history has been sealed.
234    ///
235    /// # Panics
236    ///
237    /// Panics if the internal state mutex is poisoned.
238    #[must_use]
239    pub fn is_replay_sealed(&self) -> bool {
240        self.configured_shared
241            .state
242            .lock()
243            .expect("single-subscriber state poisoned")
244            .replay_sealed
245    }
246}
247
248impl<D, R> TrySubscribable for SingleSubscriberOutputStream<D, R>
249where
250    D: Delivery,
251    R: Replay,
252{
253    fn try_subscribe(&self) -> Result<impl Subscription, StreamConsumerError> {
254        self.take_subscription()
255    }
256}
257
258impl<D, R> SingleSubscriberOutputStream<D, R>
259where
260    D: Delivery,
261    R: Replay,
262{
263    /// Tries to drive the provided synchronous [`StreamVisitor`] over this stream.
264    ///
265    /// Returns a [`Consumer`] that owns the spawned task driving the visitor. All built-in
266    /// `inspect_*`, `collect_*`, and `wait_for_line` factories construct a built-in visitor and
267    /// call this method internally.
268    ///
269    /// # Errors
270    ///
271    /// Returns [`StreamConsumerError`] if the backend rejects the consumer (single-subscriber
272    /// streams allow only one active consumer or line waiter at a time).
273    pub fn consume_with<V>(&self, visitor: V) -> Result<Consumer<V::Output>, StreamConsumerError>
274    where
275        V: StreamVisitor,
276    {
277        Ok(spawn_consumer_sync(
278            self.name(),
279            self.take_subscription()?,
280            visitor,
281        ))
282    }
283
284    /// Tries to drive the provided asynchronous [`AsyncStreamVisitor`] over this stream.
285    ///
286    /// # Errors
287    ///
288    /// Returns [`StreamConsumerError`] if the backend rejects the consumer.
289    pub fn consume_with_async<V>(
290        &self,
291        visitor: V,
292    ) -> Result<Consumer<V::Output>, StreamConsumerError>
293    where
294        V: AsyncStreamVisitor,
295    {
296        Ok(spawn_consumer_async(
297            self.name(),
298            self.take_subscription()?,
299            visitor,
300        ))
301    }
302
303    impl_consumer_factories!();
304
305    /// Tries to wait for a line that matches the given predicate within `timeout`.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`StreamConsumerError`] if the backend rejects the line waiter.
310    ///
311    /// # Panics
312    ///
313    /// Panics if `options.max_line_length` is zero.
314    pub fn wait_for_line(
315        &self,
316        timeout: Duration,
317        predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
318        options: LineParsingOptions,
319    ) -> Result<
320        impl Future<Output = Result<WaitForLineResult, crate::StreamReadError>> + Send + 'static,
321        StreamConsumerError,
322    > {
323        let subscription = self.take_subscription()?;
324        let visitor = LineAdapter::new(options, WaitForLineSink::new(predicate));
325        Ok(async move {
326            let (_term_sig_tx, term_sig_rx) = tokio::sync::oneshot::channel::<()>();
327            match tokio::time::timeout(timeout, consume_sync(subscription, visitor, term_sig_rx))
328                .await
329            {
330                Ok(Ok(true)) => Ok(WaitForLineResult::Matched),
331                Ok(Ok(false)) => Ok(WaitForLineResult::StreamClosed),
332                Ok(Err(err)) => Err(err),
333                Err(_) => Ok(WaitForLineResult::Timeout),
334            }
335        })
336    }
337}
338
339#[cfg(test)]
340mod tests;