Skip to main content

tokio_process_tools/output_stream/
config.rs

1use crate::NumBytes;
2use crate::output_stream::policy::{
3    BestEffortDelivery, Delivery, DeliveryGuarantee, NoReplay, ReliableDelivery, Replay,
4    ReplayEnabled, ReplayRetention,
5};
6
7/// Default chunk size read from the source stream. 16 kilobytes.
8pub const DEFAULT_READ_CHUNK_SIZE: NumBytes = NumBytes(16 * 1024); // 16 kb
9
10/// Default maximum buffered chunks for stdout and stderr streams. 128 slots.
11pub const DEFAULT_MAX_BUFFERED_CHUNKS: usize = 128;
12
13pub(crate) fn assert_max_buffered_chunks_non_zero(chunks: usize, parameter_name: &str) {
14    assert!(chunks > 0, "{parameter_name} must be greater than zero");
15}
16
17/// Shared output stream configuration for all stream backends.
18///
19/// Backend selection controls whether a stream has one active owner or can fan out to multiple
20/// consumers. This configuration controls delivery, replay, and buffering for whichever backend is
21/// selected.
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub struct StreamConfig<D = BestEffortDelivery, R = NoReplay>
24where
25    D: Delivery,
26    R: Replay,
27{
28    /// The size of an individual chunk read from the underlying process stream.
29    ///
30    /// Must be greater than zero. The default is [`crate::DEFAULT_READ_CHUNK_SIZE`].
31    pub read_chunk_size: NumBytes,
32
33    /// The number of chunks held by the underlying async channel.
34    ///
35    /// Must be greater than zero. The default is [`crate::DEFAULT_MAX_BUFFERED_CHUNKS`].
36    /// With [`DeliveryGuarantee::ReliableForActiveSubscribers`], it is the maximum unread chunk
37    /// lag an active subscriber can have before reading waits.
38    pub max_buffered_chunks: usize,
39
40    /// How slow active subscribers affect reading from the underlying stream.
41    pub delivery: D,
42
43    /// Whether and how replay history is retained for subscribers that attach after output arrives.
44    pub replay: R,
45}
46
47impl StreamConfig<BestEffortDelivery, NoReplay> {
48    /// Starts building an output stream configuration.
49    #[must_use]
50    pub fn builder() -> StreamConfigBuilder {
51        StreamConfigBuilder
52    }
53}
54
55/// Initial builder stage that requires selecting delivery behavior.
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub struct StreamConfigBuilder;
58
59impl StreamConfigBuilder {
60    /// Delivery that keeps reading while slow consumers lag behind.
61    ///
62    /// Bounded buffers may overflow, so active consumers can observe gaps or dropped output. This
63    /// policy avoids applying backpressure for slow consumers, but it is not a blanket throughput
64    /// guarantee; backend implementation and workload shape still matter.
65    #[must_use]
66    pub fn best_effort_delivery(self) -> StreamConfigReplayBuilder<BestEffortDelivery> {
67        StreamConfigReplayBuilder {
68            delivery: BestEffortDelivery,
69        }
70    }
71
72    /// Delivery that waits for active subscribers when their buffers are full.
73    ///
74    /// This applies backpressure to process-output reading so active consumers see all chunks
75    /// delivered inside the library. It does not retain output for consumers that attach later;
76    /// that is controlled by replay settings.
77    #[must_use]
78    pub fn reliable_for_active_subscribers(self) -> StreamConfigReplayBuilder<ReliableDelivery> {
79        StreamConfigReplayBuilder {
80            delivery: ReliableDelivery,
81        }
82    }
83}
84
85/// Builder stage that requires selecting replay behavior.
86#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87pub struct StreamConfigReplayBuilder<D>
88where
89    D: Delivery,
90{
91    delivery: D,
92}
93
94impl<D> StreamConfigReplayBuilder<D>
95where
96    D: Delivery,
97{
98    /// Disables replay for future subscribers.
99    ///
100    /// Consumers that attach after output has already arrived start at live output.
101    #[must_use]
102    pub fn no_replay(self) -> StreamConfigReadChunkSizeBuilder<D, NoReplay> {
103        StreamConfigReadChunkSizeBuilder {
104            delivery: self.delivery,
105            replay: NoReplay,
106        }
107    }
108
109    /// Keeps the latest number of chunks for future subscribers.
110    #[must_use]
111    pub fn replay_last_chunks(
112        self,
113        chunks: usize,
114    ) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
115        let replay_retention = ReplayRetention::LastChunks(chunks);
116        replay_retention.assert_non_zero("chunks");
117        StreamConfigReadChunkSizeBuilder {
118            delivery: self.delivery,
119            replay: ReplayEnabled::new(replay_retention),
120        }
121    }
122
123    /// Keeps whole chunks covering at least the latest number of bytes.
124    #[must_use]
125    pub fn replay_last_bytes(
126        self,
127        bytes: NumBytes,
128    ) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
129        let replay_retention = ReplayRetention::LastBytes(bytes);
130        replay_retention.assert_non_zero("bytes");
131        StreamConfigReadChunkSizeBuilder {
132            delivery: self.delivery,
133            replay: ReplayEnabled::new(replay_retention),
134        }
135    }
136
137    /// Retains all output of the stream.
138    ///
139    /// This can potentially grow massively and could require a lot of memory.
140    ///
141    /// Make sure to call `seal_replay()` on the stream when all subscribers were created. This
142    /// allows the system to free up memory for data already replayed to all subscribers.
143    #[must_use]
144    pub fn replay_all(self) -> StreamConfigReadChunkSizeBuilder<D, ReplayEnabled> {
145        StreamConfigReadChunkSizeBuilder {
146            delivery: self.delivery,
147            replay: ReplayEnabled::new(ReplayRetention::All),
148        }
149    }
150}
151
152/// Builder stage that requires selecting the read chunk size.
153#[derive(Debug, Clone, Copy, PartialEq, Eq)]
154pub struct StreamConfigReadChunkSizeBuilder<D, R>
155where
156    D: Delivery,
157    R: Replay,
158{
159    delivery: D,
160    replay: R,
161}
162
163impl<D, R> StreamConfigReadChunkSizeBuilder<D, R>
164where
165    D: Delivery,
166    R: Replay,
167{
168    /// Selects the size of chunks read from the underlying process stream.
169    ///
170    /// # Panics
171    ///
172    /// Panics if `read_chunk_size` is zero bytes.
173    #[must_use]
174    pub fn read_chunk_size(
175        self,
176        read_chunk_size: NumBytes,
177    ) -> StreamConfigMaxBufferedChunksBuilder<D, R> {
178        read_chunk_size.assert_non_zero("read_chunk_size");
179        StreamConfigMaxBufferedChunksBuilder {
180            delivery: self.delivery,
181            replay: self.replay,
182            read_chunk_size,
183        }
184    }
185}
186
187/// Builder stage that requires selecting the maximum number of buffered chunks.
188#[derive(Debug, Clone, Copy, PartialEq, Eq)]
189pub struct StreamConfigMaxBufferedChunksBuilder<D, R>
190where
191    D: Delivery,
192    R: Replay,
193{
194    delivery: D,
195    replay: R,
196    read_chunk_size: NumBytes,
197}
198
199impl<D, R> StreamConfigMaxBufferedChunksBuilder<D, R>
200where
201    D: Delivery,
202    R: Replay,
203{
204    /// Selects the number of chunks held by the underlying async channel.
205    ///
206    /// # Panics
207    ///
208    /// Panics if `max_buffered_chunks` is zero.
209    #[must_use]
210    pub fn max_buffered_chunks(self, max_buffered_chunks: usize) -> StreamConfigReadyBuilder<D, R> {
211        assert_max_buffered_chunks_non_zero(max_buffered_chunks, "max_buffered_chunks");
212        StreamConfigReadyBuilder {
213            config: StreamConfig {
214                read_chunk_size: self.read_chunk_size,
215                max_buffered_chunks,
216                delivery: self.delivery,
217                replay: self.replay,
218            },
219        }
220    }
221}
222
223/// Final builder stage for [`StreamConfig`].
224#[derive(Debug, Clone, Copy, PartialEq, Eq)]
225pub struct StreamConfigReadyBuilder<D, R>
226where
227    D: Delivery,
228    R: Replay,
229{
230    config: StreamConfig<D, R>,
231}
232
233impl<D, R> StreamConfigReadyBuilder<D, R>
234where
235    D: Delivery,
236    R: Replay,
237{
238    /// Builds the configured stream mode.
239    #[must_use]
240    pub fn build(self) -> StreamConfig<D, R> {
241        self.config
242    }
243}
244
245impl<D, R> StreamConfig<D, R>
246where
247    D: Delivery,
248    R: Replay,
249{
250    /// Returns the runtime delivery guarantee represented by this configuration.
251    #[must_use]
252    pub fn delivery_guarantee(self) -> DeliveryGuarantee {
253        self.delivery.guarantee()
254    }
255
256    /// Returns the replay retention represented by this configuration.
257    #[must_use]
258    pub fn replay_retention(self) -> Option<ReplayRetention> {
259        self.replay.replay_retention()
260    }
261
262    /// Returns whether this configuration enables replay-specific APIs.
263    #[must_use]
264    pub fn replay_enabled(self) -> bool {
265        self.replay.replay_enabled()
266    }
267
268    pub(crate) fn assert_valid(self, parameter_name: &str) {
269        self.read_chunk_size
270            .assert_non_zero(&format!("{parameter_name}.read_chunk_size"));
271        assert_max_buffered_chunks_non_zero(
272            self.max_buffered_chunks,
273            &format!("{parameter_name}.max_buffered_chunks"),
274        );
275        if let Some(replay_retention) = self.replay_retention() {
276            replay_retention.assert_non_zero(&format!("{parameter_name}.replay_retention"));
277        }
278    }
279}
280
281impl<D> StreamConfig<D, ReplayEnabled>
282where
283    D: Delivery,
284{
285    /// Returns this replay-enabled configuration with custom replay retention.
286    #[must_use]
287    pub fn with_replay_retention(mut self, replay_retention: ReplayRetention) -> Self {
288        replay_retention.assert_non_zero("replay_retention");
289        self.replay.replay_retention = replay_retention;
290        self
291    }
292}
293
294#[cfg(test)]
295mod tests {
296    use super::*;
297    use crate::output_stream::num_bytes::NumBytesExt;
298    use crate::{DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE};
299    use assertr::prelude::*;
300
301    #[test]
302    fn builder_creates_expected_delivery_and_replay_configs() {
303        let config: StreamConfig<BestEffortDelivery, NoReplay> = StreamConfig::builder()
304            .best_effort_delivery()
305            .no_replay()
306            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
307            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
308            .build();
309
310        assert_that!(config.delivery_guarantee()).is_equal_to(DeliveryGuarantee::BestEffort);
311        assert_that!(config.replay_enabled()).is_false();
312        assert_that!(config.replay_retention()).is_none();
313        assert_that!(config.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
314        assert_that!(config.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);
315
316        let config: StreamConfig<ReliableDelivery, NoReplay> = StreamConfig::builder()
317            .reliable_for_active_subscribers()
318            .no_replay()
319            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
320            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
321            .build();
322
323        assert_that!(config.delivery_guarantee())
324            .is_equal_to(DeliveryGuarantee::ReliableForActiveSubscribers);
325        assert_that!(config.replay_enabled()).is_false();
326        assert_that!(config.replay_retention()).is_none();
327        assert_that!(config.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
328        assert_that!(config.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);
329
330        let config: StreamConfig<BestEffortDelivery, ReplayEnabled> = StreamConfig::builder()
331            .best_effort_delivery()
332            .replay_last_chunks(2)
333            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
334            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
335            .build();
336
337        assert_that!(config.delivery_guarantee()).is_equal_to(DeliveryGuarantee::BestEffort);
338        assert_that!(config.replay_enabled()).is_true();
339        assert_that!(config.replay_retention()).is_equal_to(Some(ReplayRetention::LastChunks(2)));
340        assert_that!(config.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
341        assert_that!(config.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);
342
343        let config: StreamConfig<ReliableDelivery, ReplayEnabled> = StreamConfig::builder()
344            .reliable_for_active_subscribers()
345            .replay_last_bytes(16.bytes())
346            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
347            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
348            .build();
349
350        assert_that!(config.delivery_guarantee())
351            .is_equal_to(DeliveryGuarantee::ReliableForActiveSubscribers);
352        assert_that!(config.replay_enabled()).is_true();
353        assert_that!(config.replay_retention())
354            .is_equal_to(Some(ReplayRetention::LastBytes(16.bytes())));
355        assert_that!(config.read_chunk_size).is_equal_to(DEFAULT_READ_CHUNK_SIZE);
356        assert_that!(config.max_buffered_chunks).is_equal_to(DEFAULT_MAX_BUFFERED_CHUNKS);
357    }
358
359    #[test]
360    fn invalid_configs_panic_with_parameter_names() {
361        assert_that_panic_by(|| {
362            let _config = StreamConfig::builder()
363                .best_effort_delivery()
364                .no_replay()
365                .read_chunk_size(0.bytes());
366        })
367        .has_type::<String>()
368        .is_equal_to("read_chunk_size must be greater than zero bytes");
369
370        assert_that_panic_by(|| {
371            let _config = StreamConfig::builder()
372                .best_effort_delivery()
373                .no_replay()
374                .read_chunk_size(8.bytes())
375                .max_buffered_chunks(0);
376        })
377        .has_type::<String>()
378        .is_equal_to("max_buffered_chunks must be greater than zero");
379
380        assert_that_panic_by(|| {
381            let _config = StreamConfig::builder()
382                .best_effort_delivery()
383                .replay_last_chunks(0);
384        })
385        .has_type::<String>()
386        .is_equal_to("chunks must retain at least one chunk");
387
388        assert_that_panic_by(|| {
389            let _config = StreamConfig::builder()
390                .best_effort_delivery()
391                .replay_last_bytes(NumBytes::zero());
392        })
393        .has_type::<String>()
394        .is_equal_to("bytes must retain at least one byte");
395
396        assert_that_panic_by(|| {
397            let _replay = ReplayEnabled::new(ReplayRetention::LastChunks(0));
398        })
399        .has_type::<String>()
400        .is_equal_to("replay_retention must retain at least one chunk");
401
402        assert_that_panic_by(|| {
403            let config = StreamConfig::builder()
404                .best_effort_delivery()
405                .replay_all()
406                .read_chunk_size(8.bytes())
407                .max_buffered_chunks(2)
408                .build();
409
410            let _config =
411                config.with_replay_retention(ReplayRetention::LastBytes(NumBytes::zero()));
412        })
413        .has_type::<String>()
414        .is_equal_to("replay_retention must retain at least one byte");
415
416        assert_that_panic_by(|| {
417            let config = StreamConfig {
418                read_chunk_size: 8.bytes(),
419                max_buffered_chunks: 2,
420                delivery: BestEffortDelivery,
421                replay: ReplayEnabled {
422                    replay_retention: ReplayRetention::LastBytes(NumBytes::zero()),
423                },
424            };
425
426            config.assert_valid("options");
427        })
428        .has_type::<String>()
429        .is_equal_to("options.replay_retention must retain at least one byte");
430    }
431
432    #[tokio::test]
433    async fn one_config_constructs_both_stream_backends() {
434        use crate::OutputStream;
435        use crate::output_stream::backend::broadcast::BroadcastOutputStream;
436        use crate::output_stream::backend::single_subscriber::SingleSubscriberOutputStream;
437
438        let config = StreamConfig::builder()
439            .best_effort_delivery()
440            .no_replay()
441            .read_chunk_size(8.bytes())
442            .max_buffered_chunks(2)
443            .build();
444
445        let broadcast = BroadcastOutputStream::from_stream(tokio::io::empty(), "stdout", config);
446        let single_subscriber =
447            SingleSubscriberOutputStream::from_stream(tokio::io::empty(), "stderr", config);
448
449        assert_that!(broadcast.read_chunk_size()).is_equal_to(8.bytes());
450        assert_that!(single_subscriber.read_chunk_size()).is_equal_to(8.bytes());
451        assert_that!(broadcast.max_buffered_chunks()).is_equal_to(2);
452        assert_that!(single_subscriber.max_buffered_chunks()).is_equal_to(2);
453    }
454}