// tokio_process_tools/output_stream/mod.rs

1use bytes::{Buf, BytesMut};
2use memchr::memchr;
3use std::borrow::Cow;
4
/// Default chunk size read from the source stream: 16 kilobytes (`16 * 1024` bytes).
///
/// This is the `chunk_size` used by `FromStreamOptions::default()`.
pub const DEFAULT_CHUNK_SIZE: NumBytes = NumBytes(16 * 1024); // 16 kb
7
/// Default channel capacity for stdout and stderr streams: 128 slots.
///
/// This is the `channel_capacity` used by `FromStreamOptions::default()`.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 128;
10
/// Broadcast output stream implementation supporting multiple concurrent consumers.
pub mod broadcast;

// Crate-internal module; its contents are not visible from this file.
pub(crate) mod impls;

/// Single subscriber output stream implementation for efficient single-consumer scenarios.
pub mod single_subscriber;
18
/// Common accessors shared by all output stream implementations.
///
/// We support the following implementations:
///
/// - [`broadcast::BroadcastOutputStream`]
/// - [`single_subscriber::SingleSubscriberOutputStream`]
pub trait OutputStream {
    /// The maximum size of every chunk read by the backing `stream_reader`.
    fn chunk_size(&self) -> NumBytes;

    /// The number of chunks held by the underlying async channel.
    fn channel_capacity(&self) -> usize;

    /// Type of stream. Can be "stdout" or "stderr", but this exact output is not guaranteed.
    ///
    /// It should only be used for logging/debugging.
    fn name(&self) -> &'static str;
}
34
/// Chunking and buffering options (chunk size and channel capacity) for an output stream.
///
/// NOTE: The maximum possible memory consumption is: `chunk_size * channel_capacity`.
/// Reaching that level, however, requires:
/// 1. A receiver to listen for chunks.
/// 2. The channel getting full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FromStreamOptions {
    /// The size of an individual chunk read from the read buffer in bytes.
    ///
    /// Must be greater than zero.
    ///
    /// default: 16 * 1024 // 16 kb
    pub chunk_size: NumBytes,

    /// The number of chunks held by the underlying async channel.
    ///
    /// When the subscriber (if present) does not consume chunks at least as fast as they are
    /// read, this acts as a buffer holding the not-yet-processed messages.
    /// The bigger, the better, in terms of system resilience to write-spikes.
    /// Multiply with `chunk_size` to obtain the maximum amount of memory this will consume.
    pub channel_capacity: usize,
}
57
58impl Default for FromStreamOptions {
59    fn default() -> Self {
60        Self {
61            chunk_size: DEFAULT_CHUNK_SIZE,
62            channel_capacity: DEFAULT_CHANNEL_CAPACITY, // => 16 kb * 128 = 2 mb (max memory usage consumption)
63        }
64    }
65}
66
/// A "chunk" is an arbitrarily sized byte slice read from the underlying stream.
/// A slice's length is at most the previously configured maximum `chunk_size`.
///
/// We use the word "chunk", as it is often used when processing collections in segments or when
/// dealing with buffered I/O operations where data arrives in variable-sized pieces.
///
/// In contrast, a "frame" typically carries more specific semantics. It usually implies a
/// complete logical unit with defined boundaries within a protocol or format, which we do not
/// have here.
///
/// Note: If the underlying stream uses a smaller buffer size, chunks of full `chunk_size` length
/// may never be observed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Chunk(bytes::Bytes);
81
82impl AsRef<[u8]> for Chunk {
83    fn as_ref(&self) -> &[u8] {
84        self.0.chunk()
85    }
86}
87
/// Crate-internal event carried alongside stream data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum StreamEvent {
    // A chunk of bytes.
    Chunk(Chunk),
    // NOTE(review): presumably signals that chunks were dropped before this point, which is
    // what `LineParserState::on_gap` reacts to — confirm against the producers.
    Gap,
    // NOTE(review): presumably marks the end of the stream — confirm against the producers.
    Eof,
}
94
/// Controls how a single-subscriber stream reacts when its in-memory buffer fills up.
///
/// The choice is a trade-off between losing output and slowing down the child process.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
    /// Drop newly read chunks whenever the in-memory buffer is full.
    ///
    /// This keeps the stream reader moving and avoids backpressuring the child process, but a slow
    /// consumer may miss output.
    DropLatestIncomingIfBufferFull,

    /// Wait for buffer space instead of dropping chunks.
    ///
    /// This avoids losing output inside the library, but it can slow down stream consumption. If
    /// the child process writes faster than the consumer can keep up, the OS pipe may fill and
    /// backpressure the child process itself.
    BlockUntilBufferHasSpace,
}
111
/// Control flag to indicate whether processing should continue or break.
///
/// Returning `Break` from an `Inspector`/`Collector` makes that instance stop visiting any
/// more data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /// Interested in receiving additional data.
    Continue,

    /// Not interested in receiving additional data. Makes the `inspector`/`collector` stop.
    Break,
}
124
/// Determines what happens when a line exceeds the configured maximum line length.
///
/// The default is [`LineOverflowBehavior::DropAdditionalData`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
    /// Drop any additional data received after the current line was considered too long until
    /// the next newline character is observed, which then starts a new line.
    ///
    /// The discard state persists across chunk boundaries. Once the limit is reached, subsequent
    /// bytes are ignored until a real newline is observed.
    #[default]
    DropAdditionalData,

    /// Emit the current line when the maximum allowed length is reached.
    /// Any additional data received is immediately taken as the content of the next line.
    ///
    /// This option effectively adds intermediate line breaks so that no emitted line exceeds the
    /// length limit.
    ///
    /// No data is dropped with this behavior.
    EmitAdditionalAsNewLines,
}
145
/// Controls how line-based write helpers delimit successive lines.
///
/// Parsing strips the newline byte from each line; this mode decides whether it is re-added.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineWriteMode {
    /// Write lines exactly as parsed, without appending any delimiter.
    ///
    /// Use this when your mapper already includes delimiters or when the downstream format does
    /// not want line separators reintroduced.
    AsIs,

    /// Append a trailing `\n` after each emitted line.
    ///
    /// This reconstructs conventional line-oriented output after parsing removed the original
    /// newline byte.
    AppendLf,
}
161
/// Configuration options for parsing lines from a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LineParsingOptions {
    /// Maximum length of a single line in bytes.
    /// When reached, further data won't be appended to the current line.
    /// The line will be emitted in its current state.
    ///
    /// A value of `0` means that no limit is imposed.
    ///
    /// Only set this to `0` when you absolutely trust the input stream! Remember that an observed
    /// stream maliciously writing endless amounts of data without ever writing a line break
    /// would prevent this system from ever emitting a line and would lead to an unbounded amount
    /// of memory being allocated to hold the line data, causing this process to run out of memory!
    ///
    /// Defaults to 16 kilobytes.
    pub max_line_length: NumBytes,

    /// Determines what happens when a line is too long.
    ///
    /// When lossy buffering drops chunks before they reach the parser, line-based consumers
    /// conservatively discard any partial line and resynchronize at the next newline instead of
    /// joining bytes across the gap.
    pub overflow_behavior: LineOverflowBehavior,
}
186
187impl Default for LineParsingOptions {
188    fn default() -> Self {
189        Self {
190            max_line_length: 16.kilobytes(),
191            overflow_behavior: LineOverflowBehavior::default(),
192        }
193    }
194}
195
/// Stateful parser for turning arbitrary byte chunks into lines.
///
/// The state carries over between chunks, so a line may span any number of chunks.
pub(crate) struct LineParserState {
    // Bytes of the current, not yet emitted line.
    line_buffer: BytesMut,
    // While `true`, incoming bytes are skipped up to and including the next `\n`.
    discard_until_newline: bool,
}
201
202impl LineParserState {
203    pub fn new() -> Self {
204        Self {
205            line_buffer: BytesMut::new(),
206            discard_until_newline: false,
207        }
208    }
209
210    pub fn on_gap(&mut self) {
211        self.line_buffer.clear();
212        self.discard_until_newline = true;
213    }
214
215    pub fn visit_chunk(
216        &mut self,
217        mut chunk: &[u8],
218        options: LineParsingOptions,
219        mut f: impl FnMut(Cow<'_, str>) -> Next,
220    ) -> Next {
221        while !chunk.is_empty() {
222            if self.discard_until_newline {
223                match memchr(b'\n', chunk) {
224                    Some(pos) => {
225                        self.discard_until_newline = false;
226                        chunk = &chunk[pos + 1..];
227                    }
228                    None => return Next::Continue,
229                }
230                continue;
231            }
232
233            if options.max_line_length.0 != 0 && self.line_buffer.len() == options.max_line_length.0
234            {
235                match options.overflow_behavior {
236                    LineOverflowBehavior::DropAdditionalData => {
237                        if self.emit_line(&mut f) == Next::Break {
238                            return Next::Break;
239                        }
240                        self.discard_until_newline = true;
241                    }
242                    LineOverflowBehavior::EmitAdditionalAsNewLines => {
243                        if self.emit_line(&mut f) == Next::Break {
244                            return Next::Break;
245                        }
246                    }
247                }
248                continue;
249            }
250
251            let remaining_line_length = if options.max_line_length.0 == 0 {
252                chunk.len()
253            } else {
254                options.max_line_length.0 - self.line_buffer.len()
255            };
256            let scan_len = remaining_line_length.min(chunk.len());
257            let scan = &chunk[..scan_len];
258
259            if let Some(pos) = memchr(b'\n', scan) {
260                // Optimization: Complete line in chunk? Then do not copy into BytesMut first.
261                let result = if self.line_buffer.is_empty() {
262                    f(String::from_utf8_lossy(&scan[..pos]))
263                } else {
264                    self.line_buffer.extend_from_slice(&scan[..pos]);
265                    self.emit_line(&mut f)
266                };
267
268                if result == Next::Break {
269                    return Next::Break;
270                }
271                chunk = &chunk[pos + 1..];
272                continue;
273            }
274
275            self.line_buffer.extend_from_slice(scan);
276            chunk = &chunk[scan_len..];
277
278            if options.max_line_length.0 != 0
279                && self.line_buffer.len() == options.max_line_length.0
280                && matches!(
281                    options.overflow_behavior,
282                    LineOverflowBehavior::EmitAdditionalAsNewLines
283                )
284                && self.emit_line(&mut f) == Next::Break
285            {
286                return Next::Break;
287            }
288        }
289
290        Next::Continue
291    }
292
293    pub(crate) fn owned_lines<'a>(
294        &'a mut self,
295        chunk: &'a [u8],
296        options: LineParsingOptions,
297    ) -> OwnedLineReader<'a> {
298        OwnedLineReader {
299            parser: self,
300            chunk,
301            options,
302        }
303    }
304
305    pub fn finish(&self, f: impl FnOnce(Cow<'_, str>) -> Next) -> Next {
306        if self.discard_until_newline || self.line_buffer.is_empty() {
307            Next::Continue
308        } else {
309            f(String::from_utf8_lossy(&self.line_buffer))
310        }
311    }
312
313    pub(crate) fn finish_owned(&self) -> Option<String> {
314        if self.discard_until_newline || self.line_buffer.is_empty() {
315            None
316        } else {
317            Some(String::from_utf8_lossy(&self.line_buffer).into_owned())
318        }
319    }
320
321    fn emit_line(&mut self, f: &mut impl FnMut(Cow<'_, str>) -> Next) -> Next {
322        let line = self.line_buffer.split().freeze();
323        f(String::from_utf8_lossy(&line))
324    }
325
326    fn emit_owned_line(&mut self) -> String {
327        let line = self.line_buffer.split().freeze();
328        String::from_utf8_lossy(&line).into_owned()
329    }
330}
331
/// Iterator over the lines contained in a single chunk, yielding owned `String`s.
///
/// Created by `LineParserState::owned_lines`.
pub(crate) struct OwnedLineReader<'a> {
    // Parser whose line buffer and discard flag are updated while iterating.
    parser: &'a mut LineParserState,
    // Part of the chunk that has not been consumed yet.
    chunk: &'a [u8],
    // Line length limit and overflow behavior applied while parsing.
    options: LineParsingOptions,
}
337
338impl Iterator for OwnedLineReader<'_> {
339    type Item = String;
340
341    fn next(&mut self) -> Option<Self::Item> {
342        while !self.chunk.is_empty() {
343            if self.parser.discard_until_newline {
344                if let Some(pos) = memchr(b'\n', self.chunk) {
345                    self.parser.discard_until_newline = false;
346                    self.chunk = &self.chunk[pos + 1..];
347                } else {
348                    self.chunk = &[];
349                    return None;
350                }
351                continue;
352            }
353
354            if self.options.max_line_length.0 != 0
355                && self.parser.line_buffer.len() == self.options.max_line_length.0
356            {
357                return match self.options.overflow_behavior {
358                    LineOverflowBehavior::DropAdditionalData => {
359                        self.parser.discard_until_newline = true;
360                        Some(self.parser.emit_owned_line())
361                    }
362                    LineOverflowBehavior::EmitAdditionalAsNewLines => {
363                        Some(self.parser.emit_owned_line())
364                    }
365                };
366            }
367
368            let remaining_line_length = if self.options.max_line_length.0 == 0 {
369                self.chunk.len()
370            } else {
371                self.options.max_line_length.0 - self.parser.line_buffer.len()
372            };
373            let scan_len = remaining_line_length.min(self.chunk.len());
374            let scan = &self.chunk[..scan_len];
375
376            if let Some(pos) = memchr(b'\n', scan) {
377                self.chunk = &self.chunk[pos + 1..];
378                if self.parser.line_buffer.is_empty() {
379                    return Some(String::from_utf8_lossy(&scan[..pos]).into_owned());
380                }
381                self.parser.line_buffer.extend_from_slice(&scan[..pos]);
382                return Some(self.parser.emit_owned_line());
383            }
384
385            self.parser.line_buffer.extend_from_slice(scan);
386            self.chunk = &self.chunk[scan_len..];
387
388            if self.options.max_line_length.0 != 0
389                && self.parser.line_buffer.len() == self.options.max_line_length.0
390                && matches!(
391                    self.options.overflow_behavior,
392                    LineOverflowBehavior::EmitAdditionalAsNewLines
393                )
394            {
395                return Some(self.parser.emit_owned_line());
396            }
397        }
398
399        None
400    }
401}
402
/// A wrapper type representing a number of bytes.
///
/// Use the [`NumBytesExt`] trait to conveniently create instances:
/// ```
/// use tokio_process_tools::NumBytesExt;
/// let kb = 16.kilobytes();
/// let mb = 2.megabytes();
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumBytes(usize);

impl NumBytes {
    /// Creates a `NumBytes` value of zero.
    #[must_use]
    pub const fn zero() -> Self {
        Self(0)
    }

    /// Asserts that this value is greater than zero.
    ///
    /// # Panics
    ///
    /// Panics with a message naming `parameter_name` if this value is zero.
    pub(crate) fn assert_non_zero(self, parameter_name: &str) {
        assert!(
            self.0 > 0,
            "{parameter_name} must be greater than zero bytes"
        );
    }

    /// The amount of bytes represented by this instance.
    #[must_use]
    pub const fn bytes(&self) -> usize {
        self.0
    }
}

/// Extension trait providing convenience-functions for creation of [`NumBytes`] of certain sizes.
pub trait NumBytesExt {
    /// Interprets the value as literal bytes.
    fn bytes(self) -> NumBytes;

    /// Interprets the value as kilobytes (value * 1024).
    fn kilobytes(self) -> NumBytes;

    /// Interprets the value as megabytes (value * 1024 * 1024).
    fn megabytes(self) -> NumBytes;
}

/// Multiplies `value` by `bytes_per_unit`, panicking with a descriptive message on overflow.
///
/// A plain `*` would silently wrap in release builds and yield a far too small (possibly zero)
/// byte count.
fn scale_to_bytes(value: usize, bytes_per_unit: usize, unit: &str) -> usize {
    value
        .checked_mul(bytes_per_unit)
        .unwrap_or_else(|| panic!("{value} {unit} overflows usize when converted to bytes"))
}

impl NumBytesExt for usize {
    fn bytes(self) -> NumBytes {
        NumBytes(self)
    }

    /// # Panics
    ///
    /// Panics if `self * 1024` does not fit into a `usize`.
    fn kilobytes(self) -> NumBytes {
        NumBytes(scale_to_bytes(self, 1024, "kilobytes"))
    }

    /// # Panics
    ///
    /// Panics if `self * 1024 * 1024` does not fit into a `usize`.
    fn megabytes(self) -> NumBytes {
        NumBytes(scale_to_bytes(self, 1024 * 1024, "megabytes"))
    }
}
460
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::io::{AsyncWrite, AsyncWriteExt};

    /// Writes five newline-terminated entries to `write`, sleeping 50 ms after each one.
    pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
        write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("README.md\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("src\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("target\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    mod line_parser_state {
        use crate::output_stream::LineParserState;
        use crate::{LineOverflowBehavior, LineParsingOptions, Next, NumBytes, NumBytesExt};
        use assertr::prelude::*;

        /// Feeds `chunks` through the parser and asserts that exactly `expected_lines` come out.
        ///
        /// Every case is run twice: once through the callback-based `visit_chunk`/`finish` API
        /// and once through the owned `owned_lines`/`finish_owned` API, so both code paths are
        /// held to the same expectations. `mark_gap_before_chunk` optionally reports a gap
        /// right before the chunk with the given index.
        fn run_test_case(
            chunks: &[&[u8]],
            mark_gap_before_chunk: Option<usize>,
            expected_lines: &[&str],
            options: LineParsingOptions,
        ) {
            let expected_lines: Vec<String> = expected_lines
                .iter()
                .map(std::string::ToString::to_string)
                .collect();

            // Callback-based API.
            let mut parser = LineParserState::new();
            let mut collected_lines = Vec::<String>::new();

            for (index, chunk) in chunks.iter().enumerate() {
                if mark_gap_before_chunk == Some(index) {
                    parser.on_gap();
                }

                assert_that!(parser.visit_chunk(chunk, options, |line| {
                    collected_lines.push(line.into_owned());
                    Next::Continue
                }))
                .is_equal_to(Next::Continue);
            }

            let _ = parser.finish(|line| {
                collected_lines.push(line.into_owned());
                Next::Continue
            });

            assert_that!(collected_lines).is_equal_to(expected_lines.clone());

            // Owned iterator API.
            let mut parser = LineParserState::new();
            let mut owned_lines = Vec::<String>::new();

            for (index, chunk) in chunks.iter().enumerate() {
                if mark_gap_before_chunk == Some(index) {
                    parser.on_gap();
                }

                owned_lines.extend(parser.owned_lines(chunk, options));
            }

            owned_lines.extend(parser.finish_owned());

            assert_that!(owned_lines).is_equal_to(expected_lines);
        }

        /// Splits `data` into one chunk per byte.
        fn as_single_byte_chunks(data: &str) -> Vec<&[u8]> {
            data.as_bytes().iter().map(std::slice::from_ref).collect()
        }

        #[test]
        fn empty_chunk() {
            run_test_case(&[b""], None, &[], LineParsingOptions::default());
        }

        #[test]
        fn chunk_without_any_newlines() {
            run_test_case(
                &[b"no newlines here"],
                None,
                &["no newlines here"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn single_completed_line() {
            run_test_case(
                &[b"one line\n"],
                None,
                &["one line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn multiple_completed_lines() {
            run_test_case(
                &[b"first line\nsecond line\nthird line\n"],
                None,
                &["first line", "second line", "third line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn partial_line_at_the_end() {
            run_test_case(
                &[b"complete line\npartial"],
                None,
                &["complete line", "partial"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn initial_line_with_multiple_newlines() {
            run_test_case(
                &[b"previous: continuation\nmore lines\n"],
                None,
                &["previous: continuation", "more lines"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn invalid_utf8_data() {
            run_test_case(
                &[b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n"],
                None,
                &["valid utf8�(�� invalid utf8"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn rest_of_too_long_line_is_dropped() {
            run_test_case(
                &[b"123456789\nabcdefghi\n"],
                None,
                &["1234", "abcd"],
                LineParsingOptions {
                    max_line_length: 4.bytes(),
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        #[test]
        fn rest_of_too_long_line_is_returned_as_additional_lines() {
            run_test_case(
                &[b"123456789\nabcdefghi\n"],
                None,
                &["1234", "5678", "9", "abcd", "efgh", "i"],
                LineParsingOptions {
                    max_line_length: 4.bytes(),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        #[test]
        fn max_line_length_of_0_disables_line_length_checks_test1() {
            run_test_case(
                &[b"123456789\nabcdefghi\n"],
                None,
                &["123456789", "abcdefghi"],
                LineParsingOptions {
                    max_line_length: NumBytes::zero(),
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        #[test]
        fn max_line_length_of_0_disables_line_length_checks_test2() {
            run_test_case(
                &[b"123456789\nabcdefghi\n"],
                None,
                &["123456789", "abcdefghi"],
                LineParsingOptions {
                    max_line_length: NumBytes::zero(),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        #[test]
        fn leading_and_trailing_whitespace_is_preserved() {
            run_test_case(
                &[b"   123456789     \n    abcdefghi        \n"],
                None,
                &["   123456789     ", "    abcdefghi        "],
                LineParsingOptions {
                    max_line_length: NumBytes::zero(),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        #[test]
        fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
            let chunks = as_single_byte_chunks("❤️❤️❤️\n👍\n");
            run_test_case(
                &chunks,
                None,
                &["❤️❤️❤️", "👍"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn overflow_drop_additional_data_persists_across_chunks() {
            run_test_case(
                &[b"1234", b"5678", b"9\nok\n"],
                None,
                &["1234", "ok"],
                LineParsingOptions {
                    max_line_length: 4.bytes(),
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        #[test]
        fn gap_discards_partial_line_until_next_newline() {
            run_test_case(
                &[b"rea", b"dy\nnext\n"],
                Some(1),
                &["next"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn break_from_callback_stops_parsing_of_the_chunk() {
            let mut parser = LineParserState::new();
            let mut collected_lines = Vec::<String>::new();

            let result = parser.visit_chunk(
                b"first\nsecond\n",
                LineParsingOptions::default(),
                |line| {
                    collected_lines.push(line.into_owned());
                    Next::Break
                },
            );

            assert_that!(result).is_equal_to(Next::Break);
            assert_that!(collected_lines).is_equal_to(vec!["first".to_string()]);
        }
    }
}