Skip to main content

tokio_process_tools/process/
stream_config.rs

1use crate::output_stream::OutputStream;
2use crate::output_stream::backend::broadcast::BroadcastOutputStream;
3use crate::output_stream::backend::discard::DiscardedOutputStream;
4use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;
5use crate::output_stream::config::{
6    StreamConfig, StreamConfigBuilder, StreamConfigMaxBufferedChunksBuilder,
7    StreamConfigReadChunkSizeBuilder, StreamConfigReadyBuilder, StreamConfigReplayBuilder,
8};
9use crate::output_stream::policy::{
10    BestEffortDelivery, Delivery, ReliableDelivery, Replay, ReplayEnabled,
11};
12use std::marker::PhantomData;
13use std::process::Stdio;
14use tokio::io::AsyncRead;
15
16mod process_stream_config {
17    use super::OutputStream;
18    use std::process::Stdio;
19    use tokio::io::AsyncRead;
20
21    pub trait Sealed<Stream>
22    where
23        Stream: OutputStream,
24    {
25        /// Returns the [`Stdio`] disposition this configuration requires for the matching child
26        /// stdio slot. Piped backends return [`Stdio::piped()`]; the discard backend returns
27        /// [`Stdio::null()`] so the OS routes the bytes to `/dev/null` without ever crossing into
28        /// the parent.
29        fn child_stdio(&self) -> Stdio;
30
31        /// Constructs the stream wrapper. `captured` is the parent end of the child's pipe when
32        /// `child_stdio()` returned [`Stdio::piped()`]; it is `None` when this configuration set
33        /// the child slot to [`Stdio::null()`] (in which case there is nothing to read from).
34        fn into_stream<S>(self, captured: Option<S>, stream_name: &'static str) -> Stream
35        where
36            S: AsyncRead + Unpin + Send + 'static;
37    }
38}
39
40/// Marker trait for process stream builder configurations.
41///
42/// This trait is sealed. External crates cannot implement additional process stream
43/// configuration types; use [`ProcessStreamBuilder`] to select one of the supported backends.
44pub trait ProcessStreamConfig<Stream>: process_stream_config::Sealed<Stream>
45where
46    Stream: OutputStream,
47{
48}
49
50impl<Config, Stream> ProcessStreamConfig<Stream> for Config
51where
52    Config: process_stream_config::Sealed<Stream>,
53    Stream: OutputStream,
54{
55}
56
57impl<D, R> process_stream_config::Sealed<BroadcastOutputStream<D, R>>
58    for PipedStreamConfig<BroadcastBackend, StreamConfigReadyBuilder<D, R>>
59where
60    D: Delivery,
61    R: Replay,
62{
63    fn child_stdio(&self) -> Stdio {
64        Stdio::piped()
65    }
66
67    fn into_stream<S>(
68        self,
69        captured: Option<S>,
70        stream_name: &'static str,
71    ) -> BroadcastOutputStream<D, R>
72    where
73        S: AsyncRead + Unpin + Send + 'static,
74    {
75        let stream = captured.expect(
76            "broadcast backend requires a captured pipe; child_stdio() promised Stdio::piped()",
77        );
78        BroadcastOutputStream::from_stream(stream, stream_name, self.stage.build())
79    }
80}
81
82impl<D, R> process_stream_config::Sealed<SingleSubscriberOutputStream<D, R>>
83    for PipedStreamConfig<SingleSubscriberBackend, StreamConfigReadyBuilder<D, R>>
84where
85    D: Delivery,
86    R: Replay,
87{
88    fn child_stdio(&self) -> Stdio {
89        Stdio::piped()
90    }
91
92    fn into_stream<S>(
93        self,
94        captured: Option<S>,
95        stream_name: &'static str,
96    ) -> SingleSubscriberOutputStream<D, R>
97    where
98        S: AsyncRead + Unpin + Send + 'static,
99    {
100        let stream = captured.expect(
101            "single-subscriber backend requires a captured pipe; child_stdio() promised \
102             Stdio::piped()",
103        );
104        SingleSubscriberOutputStream::from_stream(stream, stream_name, self.stage.build())
105    }
106}
107
108impl process_stream_config::Sealed<DiscardedOutputStream> for DiscardedStreamConfig {
109    fn child_stdio(&self) -> Stdio {
110        Stdio::null()
111    }
112
113    fn into_stream<S>(
114        self,
115        _captured: Option<S>,
116        stream_name: &'static str,
117    ) -> DiscardedOutputStream
118    where
119        S: AsyncRead + Unpin + Send + 'static,
120    {
121        DiscardedOutputStream::new(stream_name)
122    }
123}
124
125/// Builder for selecting the output stream backend for one process stream.
126///
127/// Backend choice controls stream ownership and fanout. Delivery policy and replay policy are
128/// selected in later builder stages and are independent decisions. The [`Self::discard`] entry
129/// short-circuits the chain entirely for stdio that should be routed to `/dev/null` at the OS
130/// level.
131#[derive(Debug, Clone, Copy, PartialEq, Eq)]
132pub struct ProcessStreamBuilder;
133
134#[doc(hidden)]
135#[derive(Debug, Clone, Copy, PartialEq, Eq)]
136pub struct BroadcastBackend;
137
138#[doc(hidden)]
139#[derive(Debug, Clone, Copy, PartialEq, Eq)]
140pub struct SingleSubscriberBackend;
141
142/// Configuration produced by [`ProcessStreamBuilder::discard`]. The matching child stdio slot is
143/// set to [`Stdio::null()`], so the OS discards the bytes; no pipe is allocated and no reader
144/// task runs in the parent.
145#[derive(Debug, Clone, Copy, PartialEq, Eq)]
146pub struct DiscardedStreamConfig;
147
148#[doc(hidden)]
149#[derive(Debug, Clone, Copy, PartialEq, Eq)]
150pub struct PipedStreamConfig<Backend, Stage> {
151    stage: Stage,
152    _backend: PhantomData<Backend>,
153}
154
155impl<Backend, Stage> PipedStreamConfig<Backend, Stage> {
156    fn new(stage: Stage) -> Self {
157        Self {
158            stage,
159            _backend: PhantomData,
160        }
161    }
162}
163
164impl<Backend> PipedStreamConfig<Backend, StreamConfigBuilder> {
165    /// Selects bounded live delivery where slow consumers may observe gaps or dropped output.
166    #[must_use]
167    pub fn best_effort_delivery(
168        self,
169    ) -> PipedStreamConfig<Backend, StreamConfigReplayBuilder<BestEffortDelivery>> {
170        PipedStreamConfig::new(self.stage.best_effort_delivery())
171    }
172
173    /// Selects delivery that waits for active consumers when their buffers are full.
174    #[must_use]
175    pub fn reliable_for_active_subscribers(
176        self,
177    ) -> PipedStreamConfig<Backend, StreamConfigReplayBuilder<ReliableDelivery>> {
178        PipedStreamConfig::new(self.stage.reliable_for_active_subscribers())
179    }
180}
181
182impl<Backend, D> PipedStreamConfig<Backend, StreamConfigReplayBuilder<D>>
183where
184    D: Delivery,
185{
186    /// Disables replay for future subscribers.
187    #[must_use]
188    pub fn no_replay(
189        self,
190    ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, crate::NoReplay>> {
191        PipedStreamConfig::new(self.stage.no_replay())
192    }
193
194    /// Keeps the latest number of chunks for future subscribers.
195    #[must_use]
196    pub fn replay_last_chunks(
197        self,
198        chunks: usize,
199    ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
200        PipedStreamConfig::new(self.stage.replay_last_chunks(chunks))
201    }
202
203    /// Keeps whole chunks covering at least the latest number of bytes.
204    #[must_use]
205    pub fn replay_last_bytes(
206        self,
207        bytes: crate::NumBytes,
208    ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
209        PipedStreamConfig::new(self.stage.replay_last_bytes(bytes))
210    }
211
212    /// Keeps all output for the stream lifetime.
213    #[must_use]
214    pub fn replay_all(
215        self,
216    ) -> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, ReplayEnabled>> {
217        PipedStreamConfig::new(self.stage.replay_all())
218    }
219}
220
221impl<Backend, D, R> PipedStreamConfig<Backend, StreamConfigReadChunkSizeBuilder<D, R>>
222where
223    D: Delivery,
224    R: Replay,
225{
226    /// Selects the size of chunks read from the underlying process stream.
227    #[must_use]
228    pub fn read_chunk_size(
229        self,
230        read_chunk_size: crate::NumBytes,
231    ) -> PipedStreamConfig<Backend, StreamConfigMaxBufferedChunksBuilder<D, R>> {
232        PipedStreamConfig::new(self.stage.read_chunk_size(read_chunk_size))
233    }
234}
235
236impl<Backend, D, R> PipedStreamConfig<Backend, StreamConfigMaxBufferedChunksBuilder<D, R>>
237where
238    D: Delivery,
239    R: Replay,
240{
241    /// Selects the maximum number of chunks held by the underlying async channel.
242    #[must_use]
243    pub fn max_buffered_chunks(
244        self,
245        max_buffered_chunks: usize,
246    ) -> PipedStreamConfig<Backend, StreamConfigReadyBuilder<D, R>> {
247        PipedStreamConfig::new(self.stage.max_buffered_chunks(max_buffered_chunks))
248    }
249}
250
251impl ProcessStreamBuilder {
252    /// Selects the broadcast backend for this stream.
253    ///
254    /// Use this when the same stdout or stderr stream must be consumed concurrently, such as
255    /// logging plus readiness checks or logging plus collection.
256    #[must_use]
257    pub fn broadcast(self) -> PipedStreamConfig<BroadcastBackend, StreamConfigBuilder> {
258        PipedStreamConfig::new(StreamConfig::builder())
259    }
260
261    /// Selects the single-subscriber backend for this stream.
262    ///
263    /// Use this when exactly one active consumer should own the stream and accidental concurrent
264    /// fanout should be rejected early. This backend can reduce coordination overhead in some
265    /// single-consumer paths, but delivery policy still determines lag behavior.
266    #[must_use]
267    pub fn single_subscriber(
268        self,
269    ) -> PipedStreamConfig<SingleSubscriberBackend, StreamConfigBuilder> {
270        PipedStreamConfig::new(StreamConfig::builder())
271    }
272
273    /// Routes the matching child stdio slot to [`Stdio::null()`].
274    ///
275    /// No pipe is allocated, no reader task is spawned, and the resulting stream is a
276    /// [`DiscardedOutputStream`] that does not expose any consumer methods. Reach for this when only
277    /// the exit status matters and the child's output should be dropped at the OS level.
278    #[must_use]
279    pub fn discard(self) -> DiscardedStreamConfig {
280        DiscardedStreamConfig
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use super::*;
287    use crate::{NumBytes, NumBytesExt};
288    use assertr::prelude::*;
289
290    mod read_chunk_size {
291        use super::*;
292
293        #[test]
294        fn panics_on_zero_value() {
295            assert_that_panic_by(|| {
296                let _config = ProcessStreamBuilder
297                    .single_subscriber()
298                    .best_effort_delivery()
299                    .no_replay()
300                    .read_chunk_size(NumBytes::zero())
301                    .max_buffered_chunks(1);
302            })
303            .has_type::<String>()
304            .is_equal_to("read_chunk_size must be greater than zero bytes");
305        }
306    }
307
308    mod max_buffered_chunks {
309        use super::*;
310
311        #[test]
312        fn panics_on_zero_for_single_subscriber() {
313            assert_that_panic_by(|| {
314                let _config = ProcessStreamBuilder
315                    .single_subscriber()
316                    .best_effort_delivery()
317                    .no_replay()
318                    .read_chunk_size(8.bytes())
319                    .max_buffered_chunks(0);
320            })
321            .has_type::<String>()
322            .is_equal_to("max_buffered_chunks must be greater than zero");
323        }
324
325        #[test]
326        fn panics_on_zero_for_broadcast() {
327            assert_that_panic_by(|| {
328                let _config = ProcessStreamBuilder
329                    .broadcast()
330                    .best_effort_delivery()
331                    .no_replay()
332                    .read_chunk_size(8.bytes())
333                    .max_buffered_chunks(0);
334            })
335            .has_type::<String>()
336            .is_equal_to("max_buffered_chunks must be greater than zero");
337        }
338    }
339}