Skip to main content

tokio_process_tools/process/
stream_config.rs

1use crate::output_stream::OutputStream;
2use crate::output_stream::backend::broadcast::BroadcastOutputStream;
3use crate::output_stream::backend::discard::DiscardedOutputStream;
4use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;
5use crate::output_stream::config::{
6    StreamConfig, StreamConfigBuilder, StreamConfigMaxBufferedChunksBuilder,
7    StreamConfigReadChunkSizeBuilder, StreamConfigReadyBuilder, StreamConfigReplayBuilder,
8};
9use crate::output_stream::policy::{
10    Delivery, LossyWithoutBackpressure, ReliableWithBackpressure, Replay, ReplayEnabled,
11};
12use std::marker::PhantomData;
13use std::process::Stdio;
14use tokio::io::AsyncRead;
15
16mod process_stream_config {
17    use super::OutputStream;
18    use std::process::Stdio;
19    use tokio::io::AsyncRead;
20
21    pub trait Sealed<Stream>
22    where
23        Stream: OutputStream,
24    {
25        /// Returns the [`Stdio`] disposition this configuration requires for the matching child
26        /// stdio slot. Piped backends return [`Stdio::piped()`]; the discard backend returns
27        /// [`Stdio::null()`] so the OS routes the bytes to `/dev/null` without ever crossing into
28        /// the parent.
29        fn child_stdio(&self) -> Stdio;
30
31        /// Constructs the stream wrapper. `captured` is the parent end of the child's pipe when
32        /// `child_stdio()` returned [`Stdio::piped()`]; it is `None` when this configuration set
33        /// the child slot to [`Stdio::null()`] (in which case there is nothing to read from).
34        fn into_stream<S>(self, captured: Option<S>, stream_name: &'static str) -> Stream
35        where
36            S: AsyncRead + Unpin + Send + 'static;
37    }
38}
39
40/// Marker trait for process stream builder configurations.
41///
42/// This trait is sealed. External crates cannot implement additional process stream
43/// configuration types; use [`ProcessStreamBuilder`] to select one of the supported backends.
44pub trait ProcessStreamConfig<Stream>: process_stream_config::Sealed<Stream>
45where
46    Stream: OutputStream,
47{
48}
49
50impl<Config, Stream> ProcessStreamConfig<Stream> for Config
51where
52    Config: process_stream_config::Sealed<Stream>,
53    Stream: OutputStream,
54{
55}
56
57impl<D, R> process_stream_config::Sealed<BroadcastOutputStream<D, R>>
58    for PipedStreamConfig<BroadcastBackend, StreamConfigReadyBuilder<D, R>>
59where
60    D: Delivery,
61    R: Replay,
62{
63    fn child_stdio(&self) -> Stdio {
64        Stdio::piped()
65    }
66
67    fn into_stream<S>(
68        self,
69        captured: Option<S>,
70        stream_name: &'static str,
71    ) -> BroadcastOutputStream<D, R>
72    where
73        S: AsyncRead + Unpin + Send + 'static,
74    {
75        let stream = captured.expect(
76            "broadcast backend requires a captured pipe; child_stdio() promised Stdio::piped()",
77        );
78        BroadcastOutputStream::from_stream(stream, stream_name, self.stage.build())
79    }
80}
81
82impl<D, R> process_stream_config::Sealed<SingleSubscriberOutputStream<D, R>>
83    for PipedStreamConfig<SingleSubscriberBackend, StreamConfigReadyBuilder<D, R>>
84where
85    D: Delivery,
86    R: Replay,
87{
88    fn child_stdio(&self) -> Stdio {
89        Stdio::piped()
90    }
91
92    fn into_stream<S>(
93        self,
94        captured: Option<S>,
95        stream_name: &'static str,
96    ) -> SingleSubscriberOutputStream<D, R>
97    where
98        S: AsyncRead + Unpin + Send + 'static,
99    {
100        let stream = captured.expect(
101            "single-subscriber backend requires a captured pipe; child_stdio() promised \
102             Stdio::piped()",
103        );
104        SingleSubscriberOutputStream::from_stream(stream, stream_name, self.stage.build())
105    }
106}
107
108impl process_stream_config::Sealed<DiscardedOutputStream> for DiscardedStreamConfig {
109    fn child_stdio(&self) -> Stdio {
110        Stdio::null()
111    }
112
113    fn into_stream<S>(
114        self,
115        _captured: Option<S>,
116        stream_name: &'static str,
117    ) -> DiscardedOutputStream
118    where
119        S: AsyncRead + Unpin + Send + 'static,
120    {
121        DiscardedOutputStream::new(stream_name)
122    }
123}
124
/// Entry point for choosing the output stream backend of a single process stream.
///
/// The backend decides stream ownership and fanout. Delivery and replay policies are picked in
/// later builder stages and are independent of the backend choice. [`Self::discard`] skips the
/// rest of the chain for stdio that should go to `/dev/null` at the OS level.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ProcessStreamBuilder;
133
/// Type-level tag marking a [`PipedStreamConfig`] as using the broadcast backend.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BroadcastBackend;
137
/// Type-level tag marking a [`PipedStreamConfig`] as using the single-subscriber backend.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SingleSubscriberBackend;
141
/// Configuration returned by [`ProcessStreamBuilder::discard`].
///
/// The matching child stdio slot is set to [`Stdio::null()`], so the OS throws the bytes away.
/// No pipe is allocated and no reader task runs in the parent.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DiscardedStreamConfig;
147
/// Builder state for a piped backend: the current configuration stage, tagged at the type level
/// with the chosen backend.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PipedStreamConfig<Backend, Stage> {
    // Builder stage reached so far; each builder method swaps it for the next stage.
    stage: Stage,
    // Zero-sized marker recording the backend without storing a value of that type.
    _backend: PhantomData<Backend>,
}
154
155impl<Backend, Stage> PipedStreamConfig<Backend, Stage> {
156    fn new(stage: Stage) -> Self {
157        Self {
158            stage,
159            _backend: PhantomData,
160        }
161    }
162}
163
164impl<Backend> PipedStreamConfig<Backend, StreamConfigBuilder> {
165    /// Selects lossy delivery that never blocks the child. Slow active subscribers may observe
166    /// gaps; line-aware consumers resync at the next newline.
167    #[must_use]
168    pub fn lossy_without_backpressure(
169        self,
170    ) -> PipedStreamConfig<Backend, StreamConfigReplayBuilder<LossyWithoutBackpressure>> {
171        PipedStreamConfig::new(self.stage.lossy_without_backpressure())
172    }
173
174    /// Selects reliable delivery that pauses the reader task when an active subscriber's buffer is
175    /// full, applying backpressure to the child. Reliability is scoped to currently-attached
176    /// subscribers; late attachers depend on the replay axis for earlier output.
177    #[must_use]
178    pub fn reliable_with_backpressure(
179        self,
180    ) -> PipedStreamConfig<Backend, StreamConfigReplayBuilder<ReliableWithBackpressure>> {
181        PipedStreamConfig::new(self.stage.reliable_with_backpressure())
182    }
183}
184
185impl<Backend, D> PipedStreamConfig<Backend, StreamConfigReplayBuilder<D>>
186where
187    D: Delivery,
188{
189    /// Disables replay for future subscribers.
190    #[must_use]
191    pub fn no_replay(
192        self,
193    ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, crate::NoReplay>> {
194        PipedStreamConfig::new(self.stage.no_replay())
195    }
196
197    /// Keeps the latest number of chunks for future subscribers.
198    #[must_use]
199    pub fn replay_last_chunks(
200        self,
201        chunks: usize,
202    ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
203        PipedStreamConfig::new(self.stage.replay_last_chunks(chunks))
204    }
205
206    /// Keeps whole chunks covering at least the latest number of bytes.
207    #[must_use]
208    pub fn replay_last_bytes(
209        self,
210        bytes: crate::NumBytes,
211    ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
212        PipedStreamConfig::new(self.stage.replay_last_bytes(bytes))
213    }
214
215    /// Keeps all output for the stream lifetime.
216    #[must_use]
217    pub fn replay_all(
218        self,
219    ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
220        PipedStreamConfig::new(self.stage.replay_all())
221    }
222}
223
224impl<Backend, D, R> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, R>>
225where
226    D: Delivery,
227    R: Replay,
228{
229    /// Selects the size of chunks read from the underlying process stream.
230    #[must_use]
231    pub fn read_chunk_size(
232        self,
233        read_chunk_size: crate::NumBytes,
234    ) -> PipedStreamConfig<Backend, StreamConfigMaxBufferedChunksBuilder<D, R>> {
235        PipedStreamConfig::new(self.stage.read_chunk_size(read_chunk_size))
236    }
237}
238
239impl<Backend, D, R> PipedStreamConfig<Backend, StreamConfigMaxBufferedChunksBuilder<D, R>>
240where
241    D: Delivery,
242    R: Replay,
243{
244    /// Selects the maximum number of chunks held by the underlying async channel.
245    #[must_use]
246    pub fn max_buffered_chunks(
247        self,
248        max_buffered_chunks: usize,
249    ) -> PipedStreamConfig<Backend, StreamConfigReadyBuilder<D, R>> {
250        PipedStreamConfig::new(self.stage.max_buffered_chunks(max_buffered_chunks))
251    }
252}
253
254impl ProcessStreamBuilder {
255    /// Selects the broadcast backend for this stream.
256    ///
257    /// Use this when the same stdout or stderr stream must be consumed concurrently, such as
258    /// logging plus readiness checks or logging plus collection.
259    #[must_use]
260    pub fn broadcast(self) -> PipedStreamConfig<BroadcastBackend, StreamConfigBuilder> {
261        PipedStreamConfig::new(StreamConfig::builder())
262    }
263
264    /// Selects the single-subscriber backend for this stream.
265    ///
266    /// Use this when exactly one active consumer should own the stream and accidental concurrent
267    /// fanout should be rejected early. This backend can reduce coordination overhead in some
268    /// single-consumer paths, but delivery policy still determines lag behavior.
269    #[must_use]
270    pub fn single_subscriber(
271        self,
272    ) -> PipedStreamConfig<SingleSubscriberBackend, StreamConfigBuilder> {
273        PipedStreamConfig::new(StreamConfig::builder())
274    }
275
276    /// Routes the matching child stdio slot to [`Stdio::null()`].
277    ///
278    /// No pipe is allocated, no reader task is spawned, and the resulting stream is a
279    /// [`DiscardedOutputStream`] that does not expose any consumer methods. Reach for this when only
280    /// the exit status matters and the child's output should be dropped at the OS level.
281    #[must_use]
282    pub fn discard(self) -> DiscardedStreamConfig {
283        DiscardedStreamConfig
284    }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{NumBytes, NumBytesExt};
    use assertr::prelude::*;

    mod read_chunk_size {
        use super::*;

        /// A zero-byte read chunk size must make the builder chain panic with a fixed message.
        #[test]
        fn panics_on_zero_value() {
            let build_with_zero_chunk_size = || {
                let _unused = ProcessStreamBuilder
                    .single_subscriber()
                    .lossy_without_backpressure()
                    .no_replay()
                    .read_chunk_size(NumBytes::zero())
                    .max_buffered_chunks(1);
            };

            assert_that_panic_by(build_with_zero_chunk_size)
                .has_type::<String>()
                .is_equal_to("read_chunk_size must be greater than zero bytes");
        }
    }

    mod max_buffered_chunks {
        use super::*;

        /// Zero buffered chunks is rejected on the single-subscriber backend.
        #[test]
        fn panics_on_zero_for_single_subscriber() {
            let build_with_zero_capacity = || {
                let _unused = ProcessStreamBuilder
                    .single_subscriber()
                    .lossy_without_backpressure()
                    .no_replay()
                    .read_chunk_size(8.bytes())
                    .max_buffered_chunks(0);
            };

            assert_that_panic_by(build_with_zero_capacity)
                .has_type::<String>()
                .is_equal_to("max_buffered_chunks must be greater than zero");
        }

        /// Zero buffered chunks is rejected on the broadcast backend as well.
        #[test]
        fn panics_on_zero_for_broadcast() {
            let build_with_zero_capacity = || {
                let _unused = ProcessStreamBuilder
                    .broadcast()
                    .lossy_without_backpressure()
                    .no_replay()
                    .read_chunk_size(8.bytes())
                    .max_buffered_chunks(0);
            };

            assert_that_panic_by(build_with_zero_capacity)
                .has_type::<String>()
                .is_equal_to("max_buffered_chunks must be greater than zero");
        }
    }
}