tokio_process_tools/output_stream/
mod.rs

1use bytes::{Buf, BytesMut};
2use std::io::BufRead;
3
4/// Default chunk size read from the source stream. 16 kilobytes.
5pub const DEFAULT_CHUNK_SIZE: NumBytes = NumBytes(16 * 1024); // 16 kb
6
7/// Default channel capacity for stdout and stderr streams. 128 slots.
8pub const DEFAULT_CHANNEL_CAPACITY: usize = 128;
9
10/// Broadcast output stream implementation supporting multiple concurrent consumers.
11pub mod broadcast;
12
13pub(crate) mod impls;
14
15/// Single subscriber output stream implementation for efficient single-consumer scenarios.
16pub mod single_subscriber;
17
18/// We support the following implementations:
19///
20/// - [broadcast::BroadcastOutputStream]
21/// - [single_subscriber::SingleSubscriberOutputStream]
22pub trait OutputStream {
23    /// The maximum size of every chunk read by the backing `stream_reader`.
24    fn chunk_size(&self) -> NumBytes;
25
26    /// The number of chunks held by the underlying async channel.
27    fn channel_capacity(&self) -> usize;
28
29    /// Type of stream. Can be "stdout" or "stderr". But we do not guarantee this output.
30    /// It should only be used for logging/debugging.
31    fn name(&self) -> &'static str;
32}
33
34/// NOTE: The maximum possible memory consumption is: `chunk_size * channel_capacity`.
35/// Although reaching that level requires:
36/// 1. A receiver to listen for chunks.
37/// 2. The channel getting full.
38pub struct FromStreamOptions {
39    /// The size of an individual chunk read from the read buffer in bytes.
40    ///
41    /// default: 16 * 1024 // 16 kb
42    pub chunk_size: NumBytes,
43
44    /// The number of chunks held by the underlying async channel.
45    ///
46    /// When the subscriber (if present) is not fast enough to consume chunks equally fast or faster
47    /// than them getting read, this acts as a buffer to hold not-yet processed messages.
48    /// The bigger, the better, in terms of system resilience to write-spikes.
49    /// Multiply with `chunk_size` to obtain the amount of system resources this will consume at
50    /// max.
51    pub channel_capacity: usize,
52}
53
54impl Default for FromStreamOptions {
55    fn default() -> Self {
56        Self {
57            chunk_size: DEFAULT_CHUNK_SIZE,
58            channel_capacity: DEFAULT_CHANNEL_CAPACITY, // => 16 kb * 128 = 2 mb (max memory usage consumption)
59        }
60    }
61}
62
63/// A "chunk" is an arbitrarily sized byte slice read from the underlying stream.
64/// The slices' length is at max of the previously configured maximum `chunk_size`.
65///
66/// We use the word "chunk", as it is often used when processing collections in segments or when
67/// dealing with buffered I/O operations where data arrives in variable-sized pieces.
68///
69/// In contrast to this, a "frame" typically carries more specific semantics. It usually implies a
70/// complete logical unit with defined boundaries within a protocol or format. This we do not have
71/// here.
72///
73/// Note: If the underlying stream is of lower buffer size, chunks of full `chunk_size` length may
74/// never be observed.
75#[derive(Debug, Clone, PartialEq, Eq, Hash)]
76pub struct Chunk(bytes::Bytes);
77
78impl AsRef<[u8]> for Chunk {
79    fn as_ref(&self) -> &[u8] {
80        self.0.chunk()
81    }
82}
83
/// What to do when the buffer holding not-yet-consumed output is full.
///
/// The two strategies trade completeness of the captured output against the speed at which
/// output is consumed from the captured application.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
    /// Do not wait for buffer space. When the buffer is full, the latest incoming data is
    /// dropped, so consumers may "lag" and miss chunks.
    ///
    /// This is the counterpart of [`BackpressureControl::BlockUntilBufferHasSpace`]: data may be
    /// lost, but reading output is not held back by a slow consumer.
    /// NOTE(review): the exact drop semantics live in the stream implementations; confirm there.
    DropLatestIncomingIfBufferFull,

    /// Wait until the buffer has space again before accepting more data.
    ///
    /// This never leads to "lagging" (dropping chunks in the process). It does lower the speed
    /// at which output is consumed, which may affect the captured application: once its pipe
    /// buffer is full, it has to block on writing to stdout/stderr, unless it chooses to drop
    /// data instead.
    BlockUntilBufferHasSpace,
}
96
/// Signals whether a visitor wants to see more data.
///
/// An `Inspector`/`Collector` returning [`Next::Break`] stops visiting any further data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /// Keep going: additional data is wanted.
    Continue,

    /// Stop: no additional data is wanted, so the `inspector`/`collector` stops.
    Break,
}
109
/// Strategy applied once a line exceeds the configured maximum line length.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
    /// Emit the truncated line and discard everything that follows it, up to the next newline
    /// character. The data after that newline starts a new line.
    ///
    /// This is the default.
    #[default]
    DropAdditionalData,

    /// Emit the line as soon as it reaches the maximum length and treat any further data as the
    /// content of the next line.
    ///
    /// Effectively, this inserts intermediate line breaks so that no emitted line exceeds the
    /// length limit. No data is dropped.
    EmitAdditionalAsNewLines,
}
127
128/// Configuration options for parsing lines from a stream.
129#[derive(Debug, Clone, Copy, PartialEq, Eq)]
130pub struct LineParsingOptions {
131    /// Maximum length of a single line in bytes.
132    /// When reached, further data won't be appended to the current line.
133    /// The line will be emitted in its current state.
134    ///
135    /// A value of `0` means that "no limit" is imposed.
136    ///
137    /// Only set this to `0` when you absolutely trust the input stream! Remember that an observed
138    /// stream maliciously writing endless amounts of data without ever writing a line break
139    /// would starve this system from ever emitting a line and will lead to an infinite amount of
140    /// memory being allocated to hold the line data, letting this process running out of memory!
141    ///
142    /// Defaults to 16 kilobytes.
143    pub max_line_length: NumBytes,
144
145    /// What should happen when a line is too long?
146    pub overflow_behavior: LineOverflowBehavior,
147}
148
149impl Default for LineParsingOptions {
150    fn default() -> Self {
151        Self {
152            max_line_length: 16.kilobytes(),
153            overflow_behavior: LineOverflowBehavior::default(),
154        }
155    }
156}
157
/// A wrapper type representing a number of bytes.
///
/// Use the [`NumBytesExt`] trait to conveniently create instances:
/// ```
/// use tokio_process_tools::NumBytesExt;
/// let kb = 16.kilobytes();
/// let mb = 2.megabytes();
/// ```
///
/// Values are totally ordered (e.g. `16.kilobytes() < 2.megabytes()`) and can be used as hash
/// keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NumBytes(usize);

impl NumBytes {
    /// Creates a `NumBytes` value of zero.
    ///
    /// Usable in constant contexts.
    #[must_use]
    pub const fn zero() -> Self {
        Self(0)
    }

    /// The amount of bytes represented by this instance.
    ///
    /// Usable in constant contexts.
    #[must_use]
    pub const fn bytes(&self) -> usize {
        self.0
    }
}
180
181/// Extension trait providing convenience-functions for creation of [`NumBytes`] of certain sizes.
182pub trait NumBytesExt {
183    /// Interprets the value as literal bytes.
184    fn bytes(self) -> NumBytes;
185
186    /// Interprets the value as kilobytes (value * 1024).
187    fn kilobytes(self) -> NumBytes;
188
189    /// Interprets the value as megabytes (value * 1024 * 1024).
190    fn megabytes(self) -> NumBytes;
191}
192
193impl NumBytesExt for usize {
194    fn bytes(self) -> NumBytes {
195        NumBytes(self)
196    }
197
198    fn kilobytes(self) -> NumBytes {
199        NumBytes(self * 1024)
200    }
201
202    fn megabytes(self) -> NumBytes {
203        NumBytes(self * 1024 * 1024)
204    }
205}
206
207/// Conceptually, this iterator appends the given byte slice to the current line buffer, which may
208/// already hold some previously written data.
209/// The resulting view of data is split by newlines (`\n`). Every completed line is yielded.
210/// The remainder of the chunk, not completed with a newline character, will become the new content
211/// of `line_buffer`.
212///
213/// The implementation tries to allocate as little as possible.
214///
215/// It can be expected that `line_buffer` does not grow beyond `options.max_line_length` bytes
216/// **IF** any yielded line is dropped or cloned and **NOT** stored long term.
217/// Only then can the underlying storage, used to capture that line, be reused to capture the
218/// next line.
219///
220/// # Members
221/// * `chunk` - New slice of bytes to process.
222/// * `line_buffer` - Buffer for reading one line.
223///   May hold previously seen, not-yet-closed, line-data.
224pub(crate) struct LineReader<'c, 'b> {
225    chunk: &'c [u8],
226    line_buffer: &'b mut BytesMut,
227    last_line_length: Option<usize>,
228    options: LineParsingOptions,
229}
230
231impl<'c, 'b> LineReader<'c, 'b> {
232    pub fn new(
233        chunk: &'c [u8],
234        line_buffer: &'b mut BytesMut,
235        options: LineParsingOptions,
236    ) -> Self {
237        Self {
238            chunk,
239            line_buffer,
240            last_line_length: None,
241            options,
242        }
243    }
244
245    fn append_to_line_buffer(&mut self, chunk: &[u8]) {
246        self.line_buffer.extend_from_slice(chunk)
247    }
248
249    fn _take_line(&mut self) -> bytes::Bytes {
250        self.last_line_length = Some(self.line_buffer.len());
251        self.line_buffer.split().freeze()
252    }
253
254    fn take_line(&mut self, full_line_buffer: bool) -> bytes::Bytes {
255        if full_line_buffer {
256            match self.options.overflow_behavior {
257                LineOverflowBehavior::DropAdditionalData => {
258                    // Drop any additional (until the next newline character!)
259                    // and return the current (not regularly finished) line.
260                    let _ = self.chunk.skip_until(b'\n');
261                    self._take_line()
262                }
263                LineOverflowBehavior::EmitAdditionalAsNewLines => {
264                    // Do NOT drop any additional and return the current (not regularly finished)
265                    // line. This will lead to all additional data starting a new line in the
266                    // next iteration.
267                    self._take_line()
268                }
269            }
270        } else {
271            self._take_line()
272        }
273    }
274}
275
impl Iterator for LineReader<'_, '_> {
    type Item = bytes::Bytes;

    /// Yields the next line completed by the current chunk, without its trailing `\n`.
    ///
    /// Returns `None` once the chunk is exhausted. Data not yet terminated by a newline is then
    /// left in `line_buffer`, and the caller has to take care of it.
    ///
    /// # Panics
    ///
    /// Panics if `line_buffer` holds more than `options.max_line_length` bytes when this is
    /// called, unless that limit is `0` (no limit).
    fn next(&mut self) -> Option<Self::Item> {
        // Ensure we never go out of bounds with our line buffer.
        // This also ensures that no-one creates a `LineReader` with a line buffer that is already
        // too large for our current `options.max_line_length`.
        if self.options.max_line_length.0 != 0 {
            assert!(self.line_buffer.len() <= self.options.max_line_length.0);
        }

        // Reclaim the space of the line yielded by the previous call. This also runs after the
        // last line of a chunk, because every iterator must eventually return `None`, which only
        // happens further below.
        if let Some(last_line_length) = self.last_line_length.take() {
            // The previous iteration yielded a line of this length. Reclaiming fails when that
            // line is still being held, as the warning text below explains.
            let reclaimed = self.line_buffer.try_reclaim(last_line_length);
            if !reclaimed {
                tracing::warn!(
                    "Could not reclaim {last_line_length} bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."
                );
            }
        }

        // The code would also work without this early return, but it skips a lot of work on
        // empty slices.
        if self.chunk.is_empty() {
            return None;
        }

        // The assert above guarantees that the subtraction cannot underflow. The result may
        // be `0` when the buffer is exactly full. A limit of `0` means "unlimited".
        let remaining_line_length = if self.options.max_line_length.0 == 0 {
            usize::MAX
        } else {
            self.options.max_line_length.0 - self.line_buffer.len()
        };

        // The previous iteration (or the previous chunk) might have filled the line buffer
        // completely. Apply the overflow behavior.
        if remaining_line_length == 0 {
            return Some(self.take_line(true));
        }

        // We have space remaining in our line buffer.
        // Split the chunk into a usable portion (which cannot overflow the line buffer) and the
        // rest.
        let (usable, rest) = self
            .chunk
            .split_at(usize::min(self.chunk.len(), remaining_line_length));

        // Search for the next newline character in the usable portion of our current chunk.
        match usable.iter().position(|b| *b == b'\n') {
            None => {
                // No line break found! Consume the whole usable portion of the chunk.
                self.append_to_line_buffer(usable);
                self.chunk = rest;

                if rest.is_empty() {
                    // Return None, as we have no more data to process.
                    // Leftover data in `line_buffer` must be taken care of externally!
                    None
                } else {
                    // The line is now full; appending `rest` would overflow it. Return the
                    // current line. `take_line(true)` applies the overflow behavior to `rest`,
                    // which is now stored in `self.chunk`.
                    assert_eq!(self.line_buffer.len(), self.options.max_line_length.0);
                    Some(self.take_line(true))
                }
            }
            Some(pos) => {
                // Found a line break at `pos`: complete the line and continue.
                let (usable_until_line_break, _usable_rest) = usable.split_at(pos);
                self.append_to_line_buffer(usable_until_line_break);

                // We did split our chunk into `let (usable, rest) = ...` earlier.
                // We then split usable into `let (usable_until_line_break, _usable_rest) = ...`.
                // We know that `_usable_rest` and `rest` are consecutive in `chunk`!
                // This is the combination of `_usable_rest` and `rest` expressed through `chunk`
                // to get to the "real"/"complete" rest of data. It starts with the `\n`.
                let rest = &self.chunk[usable_until_line_break.len()..];

                // Skip the `\n` byte!
                self.chunk = if rest.len() > 1 { &rest[1..] } else { &[] };

                // Return the completed line.
                Some(self.take_line(false))
            }
        }
    }
}
364
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::io::{AsyncWrite, AsyncWriteExt};

    /// Writes five newline-terminated test lines to `write`, sleeping 50 ms after each write
    /// (presumably so that readers observe the lines arriving separately).
    pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
        for line in ["Cargo.lock\n", "Cargo.toml\n", "README.md\n", "src\n", "target\n"] {
            write.write_all(line.as_bytes()).await.unwrap();
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }

    mod line_reader {
        use crate::output_stream::LineReader;
        use crate::{LineOverflowBehavior, LineParsingOptions, NumBytes, NumBytesExt};
        use assertr::prelude::*;
        use bytes::{Bytes, BytesMut};
        use tracing_test::traced_test;

        #[test]
        #[traced_test]
        fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
            let mut line_buffer = BytesMut::new();
            let mut collected_lines: Vec<String> = Vec::new();

            let data = "❤️❤️❤️\n👍\n";
            for byte in data.as_bytes() {
                let lr = LineReader {
                    chunk: &[*byte],
                    line_buffer: &mut line_buffer,
                    last_line_length: None,
                    options: LineParsingOptions::default(),
                };
                for line in lr {
                    collected_lines.push(String::from_utf8_lossy(&line).to_string());
                }
            }

            assert_that(collected_lines).contains_exactly(["❤️❤️❤️", "👍"]);
        }

        #[test]
        #[traced_test]
        fn reclaims_line_buffer_space_before_collecting_new_line() {
            let mut line_buffer = BytesMut::new();
            let mut collected_lines: Vec<String> = Vec::new();
            let mut bytes: Vec<Bytes> = Vec::new();

            let data = "❤️❤️❤️\n❤️❤️❤️\n";
            for byte in data.as_bytes() {
                let lr = LineReader {
                    chunk: &[*byte],
                    line_buffer: &mut line_buffer,
                    last_line_length: None,
                    options: LineParsingOptions::default(),
                };
                for line in lr {
                    collected_lines.push(String::from_utf8_lossy(&line).to_string());
                    bytes.push(line);
                }
            }

            let data = "❤️❤️❤️\n";
            let lr = LineReader {
                chunk: data.as_bytes(),
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options: LineParsingOptions::default(),
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
                bytes.push(line);
            }

            assert_that(collected_lines).contains_exactly(["❤️❤️❤️", "❤️❤️❤️", "❤️❤️❤️"]);

            // Each of the three yielded lines (18 bytes each) is kept alive in `bytes`, so
            // reclaiming its space is expected to fail once per line.
            logs_assert(|lines: &[&str]| {
                match lines
                    .iter()
                    .filter(|line| line.contains("Could not reclaim 18 bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."))
                    .count()
                {
                    3 => {}
                    n => return Err(format!("Expected exactly 3 logs, but found {n}")),
                };
                Ok(())
            });
        }

        // Helper function to reduce duplication in test cases
        fn run_test_case(
            chunk: &[u8],
            line_buffer_before: &str,
            line_buffer_after: &str,
            expected_lines: &[&str],
            options: LineParsingOptions,
        ) {
            let mut line_buffer = BytesMut::from(line_buffer_before);
            let mut collected_lines: Vec<String> = Vec::new();

            let lr = LineReader {
                chunk,
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options,
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
            }

            assert_that(line_buffer).is_equal_to(line_buffer_after);

            let expected_lines: Vec<String> =
                expected_lines.iter().map(|s| s.to_string()).collect();

            assert_that(collected_lines).is_equal_to(expected_lines);
        }

        #[test]
        fn empty_chunk() {
            run_test_case(
                b"",
                "previous: ",
                "previous: ",
                &[],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn chunk_without_any_newlines() {
            run_test_case(
                b"no newlines here",
                "previous: ",
                "previous: no newlines here",
                &[],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn single_completed_line() {
            run_test_case(
                b"one line\n",
                "",
                "",
                &["one line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn multiple_completed_lines() {
            run_test_case(
                b"first line\nsecond line\nthird line\n",
                "",
                "",
                &["first line", "second line", "third line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn partial_line_at_the_end() {
            run_test_case(
                b"complete line\npartial",
                "",
                "partial",
                &["complete line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn initial_line_with_multiple_newlines() {
            run_test_case(
                b"continuation\nmore lines\n",
                "previous: ",
                "",
                &["previous: continuation", "more lines"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn invalid_utf8_data() {
            run_test_case(
                b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n",
                "",
                "",
                &["valid utf8�(�� invalid utf8"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn rest_of_too_long_line_is_dropped() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["1234", "abcd"],
                LineParsingOptions {
                    max_line_length: 4.bytes(), // Only allow lines with 4 ascii chars (or equiv.) max.
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        #[test]
        fn rest_of_too_long_line_is_returned_as_additional_lines() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["1234", "5678", "9", "abcd", "efgh", "i"],
                LineParsingOptions {
                    max_line_length: 4.bytes(), // Only allow lines with 4 ascii chars (or equiv.) max.
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        #[test]
        fn max_line_length_of_0_disables_line_length_checks_test1() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["123456789", "abcdefghi"],
                LineParsingOptions {
                    max_line_length: NumBytes::zero(),
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        #[test]
        fn max_line_length_of_0_disables_line_length_checks_test2() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["123456789", "abcdefghi"],
                LineParsingOptions {
                    max_line_length: NumBytes::zero(),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        #[test]
        fn leading_and_trailing_whitespace_is_preserved() {
            run_test_case(
                b"   123456789     \n    abcdefghi        \n",
                "",
                "",
                &["   123456789     ", "    abcdefghi        "],
                LineParsingOptions {
                    max_line_length: NumBytes::zero(),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }
    }
}