Skip to main content

tokio_process_tools/output_stream/
config.rs

1use crate::NumBytes;
2use crate::output_stream::policy::{
3    Delivery, DeliveryGuarantee, LossyWithoutBackpressure, NoReplay, ReliableWithBackpressure,
4    Replay, ReplayEnabled, ReplayRetention,
5};
6
/// Default chunk size read from the source stream: 16 KiB (`16 * 1024` bytes).
pub const DEFAULT_READ_CHUNK_SIZE: NumBytes = NumBytes(16 * 1024); // 16 KiB
9
/// Default maximum number of buffered chunks for stdout and stderr streams: 128 slots.
pub const DEFAULT_MAX_BUFFERED_CHUNKS: usize = 128;
12
/// Validates that a buffered-chunk count is usable.
///
/// `parameter_name` identifies the offending argument in the panic message so callers can
/// report the setting under the name the user supplied it as.
///
/// # Panics
///
/// Panics with "`<parameter_name>` must be greater than zero" when `chunks` is zero.
pub(crate) fn assert_max_buffered_chunks_non_zero(chunks: usize, parameter_name: &str) {
    assert!(chunks != 0, "{parameter_name} must be greater than zero");
}
16
/// Shared output stream configuration for all stream backends.
///
/// Backend selection controls whether a stream has one active owner or can fan out to multiple
/// consumers. This configuration controls delivery, replay, and buffering for whichever backend is
/// selected.
///
/// # Type parameters
///
/// - `D`: the delivery policy; defaults to [`LossyWithoutBackpressure`].
/// - `R`: the replay policy; defaults to [`NoReplay`].
///
/// Use [`StreamConfig::builder`] to construct a value through the staged builder, which checks
/// each numeric setting as it is supplied.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamConfig<D = LossyWithoutBackpressure, R = NoReplay>
where
    D: Delivery,
    R: Replay,
{
    /// The size of an individual chunk read from the underlying process stream.
    ///
    /// Must be greater than zero. The default is [`crate::DEFAULT_READ_CHUNK_SIZE`].
    pub read_chunk_size: NumBytes,

    /// The number of chunks held by the underlying async channel.
    ///
    /// Must be greater than zero. The default is [`crate::DEFAULT_MAX_BUFFERED_CHUNKS`].
    /// With [`DeliveryGuarantee::ReliableWithBackpressure`], it is the maximum unread chunk lag
    /// an active subscriber can have before reading waits.
    pub max_buffered_chunks: usize,

    /// How slow active subscribers affect reading from the underlying stream.
    pub delivery: D,

    /// Whether and how replay history is retained for subscribers that attach after output arrives.
    pub replay: R,
}
46
47impl StreamConfig<LossyWithoutBackpressure, NoReplay> {
48    /// Starts building an output stream configuration.
49    #[must_use]
50    pub fn builder() -> StreamConfigBuilder {
51        StreamConfigBuilder
52    }
53}
54
55/// Initial builder stage that requires selecting delivery behavior.
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub struct StreamConfigBuilder;
58
59impl StreamConfigBuilder {
60    /// Lossy delivery that never blocks the child.
61    ///
62    /// **Mechanism:** the reader task keeps draining the child's pipe regardless of consumer
63    /// pace. When a subscriber's buffer fills, that subscriber's chunk is dropped instead of
64    /// pausing the child.
65    ///
66    /// **Cost:** slow active consumers may observe gaps or dropped output. Line-aware consumers
67    /// discard the in-progress partial line and resync at the next newline rather than splicing
68    /// across the gap.
69    #[must_use]
70    pub fn lossy_without_backpressure(self) -> StreamConfigReplayBuilder<LossyWithoutBackpressure> {
71        StreamConfigReplayBuilder {
72            delivery: LossyWithoutBackpressure,
73        }
74    }
75
76    /// Reliable delivery to active subscribers, paid for with backpressure on the child.
77    ///
78    /// **Mechanism:** when an active subscriber's buffer is full, the reader task waits before
79    /// reading more from the child's pipe. The kernel pipe then fills and the child's next write
80    /// blocks. That blocking is the cost of reliability.
81    ///
82    /// **Scope:** the guarantee applies only to subscribers that are *currently attached* when
83    /// each chunk is produced. Subscribers that attach later are not retroactively delivered
84    /// earlier chunks by this policy; that is what the replay axis is for.
85    #[must_use]
86    pub fn reliable_with_backpressure(self) -> StreamConfigReplayBuilder<ReliableWithBackpressure> {
87        StreamConfigReplayBuilder {
88            delivery: ReliableWithBackpressure,
89        }
90    }
91}
92
93/// Builder stage that requires selecting replay behavior.
94#[derive(Debug, Clone, Copy, PartialEq, Eq)]
95pub struct StreamConfigReplayBuilder<D>
96where
97    D: Delivery,
98{
99    delivery: D,
100}
101
102impl<D> StreamConfigReplayBuilder<D>
103where
104    D: Delivery,
105{
106    /// Disables replay for future subscribers.
107    ///
108    /// Consumers that attach after output has already arrived start at live output.
109    #[must_use]
110    pub fn no_replay(self) -> StreamConfigReadChunkSizeBuilder<D, NoReplay> {
111        StreamConfigReadChunkSizeBuilder {
112            delivery: self.delivery,
113            replay: NoReplay,
114        }
115    }
116
117    /// Keeps the latest number of chunks for future subscribers.
118    #[must_use]
119    pub fn replay_last_chunks(
120        self,
121        chunks: usize,
122    ) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
123        let replay_retention = ReplayRetention::LastChunks(chunks);
124        replay_retention.assert_non_zero("chunks");
125        StreamConfigReadChunkSizeBuilder {
126            delivery: self.delivery,
127            replay: ReplayEnabled::new(replay_retention),
128        }
129    }
130
131    /// Keeps whole chunks covering at least the latest number of bytes.
132    #[must_use]
133    pub fn replay_last_bytes(
134        self,
135        bytes: NumBytes,
136    ) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
137        let replay_retention = ReplayRetention::LastBytes(bytes);
138        replay_retention.assert_non_zero("bytes");
139        StreamConfigReadChunkSizeBuilder {
140            delivery: self.delivery,
141            replay: ReplayEnabled::new(replay_retention),
142        }
143    }
144
145    /// Retains all output of the stream.
146    ///
147    /// This can potentially grow massively and could require a lot of memory.
148    ///
149    /// Call `seal_replay()` on the stream once every consumer that needs the retained history has
150    /// attached. Sealing applies to *future* consumers only: it stops new replay from being
151    /// served, allowing the retained buffer to be released. Already-attached subscribers keep
152    /// the snapshot they were given at subscription time and are unaffected by the seal.
153    #[must_use]
154    pub fn replay_all(self) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
155        StreamConfigReadChunkSizeBuilder {
156            delivery: self.delivery,
157            replay: ReplayEnabled::new(ReplayRetention::All),
158        }
159    }
160}
161
162/// Builder stage that requires selecting the read chunk size.
163#[derive(Debug, Clone, Copy, PartialEq, Eq)]
164pub struct StreamConfigReadChunkSizeBuilder<D, R>
165where
166    D: Delivery,
167    R: Replay,
168{
169    delivery: D,
170    replay: R,
171}
172
173impl<D, R> StreamConfigReadChunkSizeBuilder<D, R>
174where
175    D: Delivery,
176    R: Replay,
177{
178    /// Selects the size of chunks read from the underlying process stream.
179    ///
180    /// # Panics
181    ///
182    /// Panics if `read_chunk_size` is zero bytes.
183    #[must_use]
184    pub fn read_chunk_size(
185        self,
186        read_chunk_size: NumBytes,
187    ) -> StreamConfigMaxBufferedChunksBuilder<D, R> {
188        read_chunk_size.assert_non_zero("read_chunk_size");
189        StreamConfigMaxBufferedChunksBuilder {
190            delivery: self.delivery,
191            replay: self.replay,
192            read_chunk_size,
193        }
194    }
195}
196
197/// Builder stage that requires selecting the maximum number of buffered chunks.
198#[derive(Debug, Clone, Copy, PartialEq, Eq)]
199pub struct StreamConfigMaxBufferedChunksBuilder<D, R>
200where
201    D: Delivery,
202    R: Replay,
203{
204    delivery: D,
205    replay: R,
206    read_chunk_size: NumBytes,
207}
208
209impl<D, R> StreamConfigMaxBufferedChunksBuilder<D, R>
210where
211    D: Delivery,
212    R: Replay,
213{
214    /// Selects the number of chunks held by the underlying async channel.
215    ///
216    /// # Panics
217    ///
218    /// Panics if `max_buffered_chunks` is zero.
219    #[must_use]
220    pub fn max_buffered_chunks(self, max_buffered_chunks: usize) -> StreamConfigReadyBuilder<D, R> {
221        assert_max_buffered_chunks_non_zero(max_buffered_chunks, "max_buffered_chunks");
222        StreamConfigReadyBuilder {
223            config: StreamConfig {
224                read_chunk_size: self.read_chunk_size,
225                max_buffered_chunks,
226                delivery: self.delivery,
227                replay: self.replay,
228            },
229        }
230    }
231}
232
233/// Final builder stage for [`StreamConfig`].
234#[derive(Debug, Clone, Copy, PartialEq, Eq)]
235pub struct StreamConfigReadyBuilder<D, R>
236where
237    D: Delivery,
238    R: Replay,
239{
240    config: StreamConfig<D, R>,
241}
242
243impl<D, R> StreamConfigReadyBuilder<D, R>
244where
245    D: Delivery,
246    R: Replay,
247{
248    /// Builds the configured stream mode.
249    #[must_use]
250    pub fn build(self) -> StreamConfig<D, R> {
251        self.config
252    }
253}
254
255impl<D, R> StreamConfig<D, R>
256where
257    D: Delivery,
258    R: Replay,
259{
260    /// Returns the runtime delivery guarantee represented by this configuration.
261    #[must_use]
262    pub fn delivery_guarantee(self) -> DeliveryGuarantee {
263        self.delivery.guarantee()
264    }
265
266    /// Returns the replay retention represented by this configuration.
267    #[must_use]
268    pub fn replay_retention(self) -> Option<ReplayRetention> {
269        self.replay.replay_retention()
270    }
271
272    /// Returns whether this configuration enables replay-specific APIs.
273    #[must_use]
274    pub fn replay_enabled(self) -> bool {
275        self.replay.replay_enabled()
276    }
277
278    pub(crate) fn assert_valid(self, parameter_name: &str) {
279        self.read_chunk_size
280            .assert_non_zero(&format!("{parameter_name}.read_chunk_size"));
281        assert_max_buffered_chunks_non_zero(
282            self.max_buffered_chunks,
283            &format!("{parameter_name}.max_buffered_chunks"),
284        );
285        if let Some(replay_retention) = self.replay_retention() {
286            replay_retention.assert_non_zero(&format!("{parameter_name}.replay_retention"));
287        }
288    }
289}
290
291impl<D> StreamConfig<D, ReplayEnabled>
292where
293    D: Delivery,
294{
295    /// Returns this replay-enabled configuration with custom replay retention.
296    #[must_use]
297    pub fn with_replay_retention(mut self, replay_retention: ReplayRetention) -> Self {
298        replay_retention.assert_non_zero("replay_retention");
299        self.replay.replay_retention = replay_retention;
300        self
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use super::*;
307    use crate::output_stream::num_bytes::NumBytesExt;
308    use crate::{DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE};
309    use assertr::prelude::*;
310
    /// Walks the staged builder once per delivery/replay combination and checks that the built
    /// configuration reports the selected policies and the numeric settings passed in.
    #[test]
    fn builder_creates_expected_delivery_and_replay_configs() {
        // Lossy delivery, replay disabled.
        let config: StreamConfig<LossyWithoutBackpressure, NoReplay> = StreamConfig::builder()
            .lossy_without_backpressure()
            .no_replay()
            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            .build();

        assert_that!(config.delivery_guarantee())
            .is_equal_to(DeliveryGuarantee::LossyWithoutBackpressure);
        assert_that!(config.replay_enabled()).is_false();
        assert_that!(config.replay_retention()).is_none();
        assert_that!(config.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
        assert_that!(config.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);

        // Reliable delivery, replay disabled.
        let config: StreamConfig<ReliableWithBackpressure, NoReplay> = StreamConfig::builder()
            .reliable_with_backpressure()
            .no_replay()
            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            .build();

        assert_that!(config.delivery_guarantee())
            .is_equal_to(DeliveryGuarantee::ReliableWithBackpressure);
        assert_that!(config.replay_enabled()).is_false();
        assert_that!(config.replay_retention()).is_none();
        assert_that!(config.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
        assert_that!(config.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);

        // Lossy delivery, replay of the last two chunks.
        let config: StreamConfig<LossyWithoutBackpressure, ReplayEnabled> = StreamConfig::builder()
            .lossy_without_backpressure()
            .replay_last_chunks(2)
            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            .build();

        assert_that!(config.delivery_guarantee())
            .is_equal_to(DeliveryGuarantee::LossyWithoutBackpressure);
        assert_that!(config.replay_enabled()).is_true();
        assert_that!(config.replay_retention()).is_equal_to(Some(ReplayRetention::LastChunks(2)));
        assert_that!(config.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
        assert_that!(config.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);

        // Reliable delivery, replay of the last 16 bytes.
        let config: StreamConfig<ReliableWithBackpressure, ReplayEnabled> = StreamConfig::builder()
            .reliable_with_backpressure()
            .replay_last_bytes(16.bytes())
            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            .build();

        assert_that!(config.delivery_guarantee())
            .is_equal_to(DeliveryGuarantee::ReliableWithBackpressure);
        assert_that!(config.replay_enabled()).is_true();
        assert_that!(config.replay_retention())
            .is_equal_to(Some(ReplayRetention::LastBytes(16.bytes())));
        assert_that!(config.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
        assert_that!(config.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);
    }
370
    /// Checks that every zero-valued setting is rejected with a panic whose message names the
    /// parameter it was supplied under.
    #[test]
    fn invalid_configs_panic_with_parameter_names() {
        // A zero-byte read chunk size is rejected by the read-chunk-size builder stage.
        assert_that_panic_by(|| {
            let _config = StreamConfig::builder()
                .lossy_without_backpressure()
                .no_replay()
                .read_chunk_size(0.bytes());
        })
        .has_type::<String>()
        .is_equal_to("read_chunk_size must be greater than zero bytes");

        // Zero buffered chunks are rejected by the max-buffered-chunks builder stage.
        assert_that_panic_by(|| {
            let _config = StreamConfig::builder()
                .lossy_without_backpressure()
                .no_replay()
                .read_chunk_size(8.bytes())
                .max_buffered_chunks(0);
        })
        .has_type::<String>()
        .is_equal_to("max_buffered_chunks must be greater than zero");

        // The replay builder methods report the failure under their own parameter names
        // (`chunks` / `bytes`).
        assert_that_panic_by(|| {
            let _config = StreamConfig::builder()
                .lossy_without_backpressure()
                .replay_last_chunks(0);
        })
        .has_type::<String>()
        .is_equal_to("chunks must retain at least one chunk");

        assert_that_panic_by(|| {
            let _config = StreamConfig::builder()
                .lossy_without_backpressure()
                .replay_last_bytes(NumBytes::zero());
        })
        .has_type::<String>()
        .is_equal_to("bytes must retain at least one byte");

        // Constructing the replay policy directly reports the failure as `replay_retention`.
        assert_that_panic_by(|| {
            let _replay = ReplayEnabled::new(ReplayRetention::LastChunks(0));
        })
        .has_type::<String>()
        .is_equal_to("replay_retention must retain at least one chunk");

        // Swapping the retention on an otherwise valid configuration is validated as well.
        assert_that_panic_by(|| {
            let config = StreamConfig::builder()
                .lossy_without_backpressure()
                .replay_all()
                .read_chunk_size(8.bytes())
                .max_buffered_chunks(2)
                .build();

            let _config =
                config.with_replay_retention(ReplayRetention::LastBytes(NumBytes::zero()));
        })
        .has_type::<String>()
        .is_equal_to("replay_retention must retain at least one byte");

        // A struct literal skips the builder stages; `assert_valid` still catches the invalid
        // retention and prefixes the message with the name passed to it.
        assert_that_panic_by(|| {
            let config = StreamConfig {
                read_chunk_size: 8.bytes(),
                max_buffered_chunks: 2,
                delivery: LossyWithoutBackpressure,
                replay: ReplayEnabled {
                    replay_retention: ReplayRetention::LastBytes(NumBytes::zero()),
                },
            };

            config.assert_valid("options");
        })
        .has_type::<String>()
        .is_equal_to("options.replay_retention must retain at least one byte");
    }
443
    /// Checks that one and the same configuration value can be handed to both the broadcast and
    /// the single-subscriber backend, and that each backend reports the configured sizes.
    #[tokio::test]
    async fn one_config_constructs_both_stream_backends() {
        use crate::OutputStream;
        use crate::output_stream::backend::broadcast::BroadcastOutputStream;
        use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;

        let config = StreamConfig::builder()
            .lossy_without_backpressure()
            .no_replay()
            .read_chunk_size(8.bytes())
            .max_buffered_chunks(2)
            .build();

        // `config` is passed by value to both constructors; the empty reader stands in for a
        // real process stream.
        let broadcast = BroadcastOutputStream::from_stream(tokio::io::empty(), "stdout", config);
        let single_subscriber =
            SingleSubscriberOutputStream::from_stream(tokio::io::empty(), "stderr", config);

        assert_that!(broadcast.read_chunk_size()).is_equal_to(8.bytes());
        assert_that!(single_subscriber.read_chunk_size()).is_equal_to(8.bytes());
        assert_that!(broadcast.max_buffered_chunks()).is_equal_to(2);
        assert_that!(single_subscriber.max_buffered_chunks()).is_equal_to(2);
    }
466}