tokio_process_tools/output_stream/
mod.rs

1use bytes::{Buf, BytesMut};
2use std::io::BufRead;
3
4pub mod broadcast;
5pub(crate) mod impls;
6pub mod single_subscriber;
7
/// Marker trait shared by the output stream implementations of this crate:
///
/// - [`broadcast::BroadcastOutputStream`]
/// - [`single_subscriber::SingleSubscriberOutputStream`]
pub trait OutputStream {}
13
/// Options controlling how output is read from an underlying stream and buffered.
///
/// NOTE: The maximum possible memory consumption is `chunk_size * channel_capacity`.
/// Reaching that level requires both:
/// 1. a receiver listening for chunks, and
/// 2. the channel getting full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FromStreamOptions {
    /// The maximum size, in bytes, of an individual chunk read from the underlying stream.
    ///
    /// default: 16 * 1024 // 16 kb
    pub chunk_size: usize,

    /// The number of chunks held by the underlying async channel.
    ///
    /// When the subscriber (if present) consumes chunks more slowly than they are read, this
    /// acts as a buffer for not-yet-processed chunks. Larger values make the system more
    /// resilient to write spikes, at the cost of memory: multiply by `chunk_size` to obtain the
    /// maximum amount of memory this buffer may consume.
    ///
    /// default: 128
    pub channel_capacity: usize,
}
33
34impl Default for FromStreamOptions {
35    fn default() -> Self {
36        Self {
37            chunk_size: 16 * 1024, // 16 kb
38            channel_capacity: 128, // => 16 kb * 128 = 2 mb (max memory usage consumption)
39        }
40    }
41}
42
43/// A "chunk" is an arbitrarily sized byte slice read from the underlying stream.
44/// The slices' length is at max of the previously configured maximum `chunk_size`.
45///
46/// We use the word "chunk", as it is often used when processing collections in segments or when
47/// dealing with buffered I/O operations where data arrives in variable-sized pieces.
48///
49/// In contrast to this, a "frame" typically carries more specific semantics. It usually implies a
50/// complete logical unit with defined boundaries within a protocol or format. This we do not have
51/// here.
52///
53/// Note: If the underlying stream is of lower buffer size, chunks of length `chunk_size` may
54/// never be observed.
55#[derive(Debug, Clone, PartialEq, Eq, Hash)]
56pub struct Chunk(bytes::Bytes);
57
58impl AsRef<[u8]> for Chunk {
59    fn as_ref(&self) -> &[u8] {
60        self.0.chunk()
61    }
62}
63
/// How to react when the buffer holding not-yet-consumed chunks is full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
    /// Drop the newest incoming chunk when the buffer is full, instead of waiting for space.
    ///
    /// Reading never waits for slow consumers, but those consumers may "lag" and miss chunks.
    /// NOTE(review): described from the variant name and its counterpart's documentation;
    /// confirm against the stream implementations.
    DropLatestIncomingIfBufferFull,

    /// Wait until the buffer has space before accepting the next chunk.
    ///
    /// No chunks are lost ("lagging" cannot occur), but output is consumed more slowly. This
    /// may affect the captured application: once its pipe buffer is full, the application has
    /// to either block on writes to stdout/stderr or drop data itself.
    BlockUntilBufferHasSpace,
}
76
/// Tells a consumer of output, such as an `Inspector`, whether to keep processing.
///
/// Returning [`Next::Break`] from an `Inspector` makes that inspector stop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /// Keep processing further output.
    Continue,
    /// Stop processing.
    Break,
}
85
/// Determines what happens once a line exceeds the configured maximum line length.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
    /// Emit the line, cut at the maximum length, and discard further data until the next
    /// newline character, which then starts a new line. This is the default.
    ///
    /// NOTE(review): the line reader only remembers that it is discarding within the chunk
    /// currently being processed. Excess data of an over-long line that arrives in a later
    /// chunk may be emitted as a new line.
    #[default]
    DropAdditionalData,

    /// Emit the line as soon as the maximum length is reached, and treat any further data as
    /// the content of the next line.
    ///
    /// Effectively, intermediate line breaks are inserted so that no emitted line exceeds the
    /// length limit. No data is dropped.
    EmitAdditionalAsNewLines,
}
103
104/// Configuration options for parsing lines from a stream.
105#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106pub struct LineParsingOptions {
107    /// Maximum length of a single line in bytes.
108    /// When reached, further data won't be appended to the current line.
109    /// The line will be emitted in its current state.
110    ///
111    /// A value of `0` means that "no limit" is imposed.
112    ///
113    /// Only set this to `0` when you absolutely trust the input stream! Remember that an observed
114    /// stream maliciously writing endless amounts of data without ever writing a line break
115    /// would starve this system from ever emitting a line and will lead to an infinite amount of
116    /// memory being allocated to hold the line data, letting this process running out of memory!
117    ///
118    /// Defaults to 16 kilobytes.
119    pub max_line_length: NumBytes,
120
121    /// What should happen when a line is too long?
122    pub overflow_behavior: LineOverflowBehavior,
123}
124
125impl Default for LineParsingOptions {
126    fn default() -> Self {
127        Self {
128            max_line_length: 16.kilobytes(),
129            overflow_behavior: LineOverflowBehavior::default(),
130        }
131    }
132}
133
/// A number of bytes, e.g. a size limit such as a maximum line length.
///
/// Construct it directly (`NumBytes(42)`) or through the `NumBytesExt` helpers
/// (e.g. `16.kilobytes()`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumBytes(pub usize);

impl NumBytes {
    /// Zero bytes.
    ///
    /// Being a `const fn`, this can also be used to initialize constants.
    #[must_use]
    pub const fn zero() -> Self {
        Self(0)
    }
}
142
143pub trait NumBytesExt {
144    fn bytes(self) -> NumBytes;
145
146    fn kilobytes(self) -> NumBytes;
147
148    fn megabytes(self) -> NumBytes;
149}
150
151impl NumBytesExt for usize {
152    fn bytes(self) -> NumBytes {
153        NumBytes(self)
154    }
155
156    fn kilobytes(self) -> NumBytes {
157        NumBytes(self * 1024)
158    }
159
160    fn megabytes(self) -> NumBytes {
161        NumBytes(self * 1024 * 1024)
162    }
163}
164
165/// Conceptually, this iterator appends the given byte slice to the current line buffer, which may
166/// already hold some previously written data.
167/// The resulting view of data is split by newlines (`\n`). Every completed line is yielded.
168/// The remainder of the chunk, not completed with a newline character, will become the new content
169/// of `line_buffer`.
170///
171/// The implementation tries to allocate as little as possible.
172///
173/// It can be expected that `line_buffer` does not grow beyond `options.max_line_length` bytes
174/// **IF** any yielded line is dropped or cloned and **NOT** stored long term.
175/// Only then can the underlying storage, used to capture that line, be reused to capture the
176/// next line.
177///
178/// # Members
179/// * `chunk` - New slice of bytes to process.
180/// * `line_buffer` - Buffer for reading one line.
181///   May hold previously seen, not-yet-closed, line-data.
182pub(crate) struct LineReader<'c, 'b> {
183    chunk: &'c [u8],
184    line_buffer: &'b mut BytesMut,
185    last_line_length: Option<usize>,
186    options: LineParsingOptions,
187}
188
189impl<'c, 'b> LineReader<'c, 'b> {
190    pub fn new(
191        chunk: &'c [u8],
192        line_buffer: &'b mut BytesMut,
193        options: LineParsingOptions,
194    ) -> Self {
195        Self {
196            chunk,
197            line_buffer,
198            last_line_length: None,
199            options,
200        }
201    }
202
203    fn append_to_line_buffer(&mut self, chunk: &[u8]) {
204        self.line_buffer.extend_from_slice(chunk)
205    }
206
207    fn _take_line(&mut self) -> bytes::Bytes {
208        self.last_line_length = Some(self.line_buffer.len());
209        self.line_buffer.split().freeze()
210    }
211
212    fn take_line(&mut self, full_line_buffer: bool) -> bytes::Bytes {
213        if full_line_buffer {
214            match self.options.overflow_behavior {
215                LineOverflowBehavior::DropAdditionalData => {
216                    // Drop any additional (until the next newline character!)
217                    // and return the current (not regularly finished) line.
218                    let _ = self.chunk.skip_until(b'\n');
219                    self._take_line()
220                }
221                LineOverflowBehavior::EmitAdditionalAsNewLines => {
222                    // Do NOT drop any additional and return the current (not regularly finished)
223                    // line. This will lead to all additional data starting a new line in the
224                    // next iteration.
225                    self._take_line()
226                }
227            }
228        } else {
229            self._take_line()
230        }
231    }
232}
233
234impl Iterator for LineReader<'_, '_> {
235    type Item = bytes::Bytes;
236
237    fn next(&mut self) -> Option<Self::Item> {
238        // Ensure we never go out of bounds with our line buffer.
239        // This also ensures that no-one creates a `LineReader` with a line buffer that is already
240        // too large for our current `options.max_line_length`.
241        if self.options.max_line_length.0 != 0 {
242            assert!(self.line_buffer.len() <= self.options.max_line_length.0);
243        }
244
245        // Note: This will always be seen, even when the processed chunk ends with `\n`, as
246        // every iterator must once return `None` to signal that it has finished!
247        // And this, we only do later.
248        if let Some(last_line_length) = self.last_line_length.take() {
249            // The previous iteration yielded line of this length!
250            let reclaimed = self.line_buffer.try_reclaim(last_line_length);
251            if !reclaimed {
252                tracing::warn!(
253                    "Could not reclaim {last_line_length} bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."
254                );
255            }
256        }
257
258        // Code would work without this early-return. But this lets us skip a lot of actions on
259        // empty slices.
260        if self.chunk.is_empty() {
261            return None;
262        }
263
264        // Through our assert above, the first operand will always be bigger!
265        let remaining_line_length = if self.options.max_line_length.0 == 0 {
266            usize::MAX
267        } else {
268            self.options.max_line_length.0 - self.line_buffer.len()
269        };
270
271        // The previous iteration might have filled the line buffer completely.
272        // Apply overflow behavior.
273        if remaining_line_length == 0 {
274            return Some(self.take_line(true));
275        }
276
277        // We have space remaining in our line buffer.
278        // Split the chunk into two a usable portion (which would not "overflow" the line buffer)
279        // and the rest.
280        let (usable, rest) = self
281            .chunk
282            .split_at(usize::min(self.chunk.len(), remaining_line_length));
283
284        // Search for the next newline character in the usable portion of our current chunk.
285        match usable.iter().position(|b| *b == b'\n') {
286            None => {
287                // No line break found! Consume the whole usable chunk portion.
288                self.append_to_line_buffer(usable);
289                self.chunk = rest;
290
291                if rest.is_empty() {
292                    // Return None, as we have no more data to process.
293                    // Leftover data in `line_buffer` must be taken care of externally!
294                    None
295                } else {
296                    // Line now full. Would overflow using rest. Return the current line!
297                    assert_eq!(self.line_buffer.len(), self.options.max_line_length.0);
298                    Some(self.take_line(true))
299                }
300            }
301            Some(pos) => {
302                // Found a line break at `pos` - process the line and continue.
303                let (usable_until_line_break, _usable_rest) = usable.split_at(pos);
304                self.append_to_line_buffer(usable_until_line_break);
305
306                // We did split our chunk into `let (usable, rest) = ...` earlier.
307                // We then split usable into `let (usable_until_line_break, _usable_rest) = ...`.
308                // We know that `_usable_rest` and `rest` are consecutive in `chunk`!
309                // This is the combination of `_usable_rest` and `rest` expressed through `chunk`
310                // to get to the "real"/"complete" rest of data.
311                let rest = &self.chunk[usable_until_line_break.len()..];
312
313                // Skip the `\n` byte!
314                self.chunk = if rest.len() > 1 { &rest[1..] } else { &[] };
315
316                // Return the completed line.
317                Some(self.take_line(false))
318            }
319        }
320    }
321}
322
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::io::{AsyncWrite, AsyncWriteExt};

    /// Writes five newline-terminated entries (`Cargo.lock`, `Cargo.toml`, `README.md`, `src`,
    /// `target`) to `write`, sleeping 50 ms after each write. Panics if a write fails.
    pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
        write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("README.md\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("src\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("target\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    mod line_reader {
        use crate::output_stream::LineReader;
        use crate::{LineOverflowBehavior, LineParsingOptions, NumBytes};
        use assertr::prelude::*;
        use bytes::{Bytes, BytesMut};
        use tracing_test::traced_test;

        #[test]
        #[traced_test]
        fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
            let mut line_buffer = BytesMut::new();
            let mut collected_lines: Vec<String> = Vec::new();

            let data = "❤️❤️❤️\n👍\n";
            for byte in data.as_bytes() {
                let lr = LineReader {
                    chunk: &[*byte],
                    line_buffer: &mut line_buffer,
                    last_line_length: None,
                    options: LineParsingOptions::default(),
                };
                for line in lr {
                    collected_lines.push(String::from_utf8_lossy(&line).to_string());
                }
            }

            assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "👍"]);
        }

        #[test]
        #[traced_test]
        fn reclaims_line_buffer_space_before_collecting_new_line() {
            let mut line_buffer = BytesMut::new();
            let mut collected_lines: Vec<String> = Vec::new();
            let mut bytes: Vec<Bytes> = Vec::new();

            let data = "❤️❤️❤️\n❤️❤️❤️\n";
            for byte in data.as_bytes() {
                let lr = LineReader {
                    chunk: &[*byte],
                    line_buffer: &mut line_buffer,
                    last_line_length: None,
                    options: LineParsingOptions::default(),
                };
                for line in lr {
                    collected_lines.push(String::from_utf8_lossy(&line).to_string());
                    bytes.push(line);
                }
            }

            let data = "❤️❤️❤️\n";
            let lr = LineReader {
                chunk: data.as_bytes(),
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options: LineParsingOptions::default(),
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
                bytes.push(line);
            }

            assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "❤️❤️❤️", "❤️❤️❤️"]);

            logs_assert(|lines: &[&str]| {
                match lines
                    .iter()
                    .filter(|line| line.contains("Could not reclaim 18 bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."))
                    .count()
                {
                    3 => {}
                    n => return Err(format!("Expected exactly three logs, but found {n}")),
                };
                Ok(())
            });
        }

        /// Feeds `chunk` through a single `LineReader` whose line buffer starts out as
        /// `line_buffer_before`, then asserts the yielded lines and the final buffer content.
        fn run_test_case(
            chunk: &[u8],
            line_buffer_before: &str,
            line_buffer_after: &str,
            expected_lines: &[&str],
            options: LineParsingOptions,
        ) {
            let mut line_buffer = BytesMut::from(line_buffer_before);
            let mut collected_lines: Vec<String> = Vec::new();

            let lr = LineReader {
                chunk,
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options,
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
            }

            assert_that(line_buffer).is_equal_to(line_buffer_after);

            let expected_lines: Vec<String> =
                expected_lines.iter().map(|s| s.to_string()).collect();

            assert_that(collected_lines).is_equal_to(expected_lines);
        }

        #[test]
        fn empty_chunk() {
            run_test_case(
                b"",
                "previous: ",
                "previous: ",
                &[],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn chunk_without_any_newlines() {
            run_test_case(
                b"no newlines here",
                "previous: ",
                "previous: no newlines here",
                &[],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn single_completed_line() {
            run_test_case(
                b"one line\n",
                "",
                "",
                &["one line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn multiple_completed_lines() {
            run_test_case(
                b"first line\nsecond line\nthird line\n",
                "",
                "",
                &["first line", "second line", "third line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn partial_line_at_the_end() {
            run_test_case(
                b"complete line\npartial",
                "",
                "partial",
                &["complete line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn initial_line_with_multiple_newlines() {
            run_test_case(
                b"continuation\nmore lines\n",
                "previous: ",
                "",
                &["previous: continuation", "more lines"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn invalid_utf8_data() {
            run_test_case(
                b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n",
                "",
                "",
                &["valid utf8�(�� invalid utf8"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn rest_of_too_long_line_is_dropped() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["1234", "abcd"],
                LineParsingOptions {
                    max_line_length: NumBytes(4), // Only allow lines with 4 ascii chars (or equiv.) max.
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        #[test]
        fn line_of_exactly_max_length_is_emitted_once_when_dropping_additional_data() {
            run_test_case(
                b"1234\nabcd\n",
                "",
                "",
                &["1234", "abcd"],
                LineParsingOptions {
                    max_line_length: NumBytes(4), // Lines exactly hit the limit.
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        #[test]
        fn rest_of_too_long_line_is_returned_as_additional_lines() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["1234", "5678", "9", "abcd", "efgh", "i"],
                LineParsingOptions {
                    max_line_length: NumBytes(4), // Only allow lines with 4 ascii chars (or equiv.) max.
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        #[test]
        fn max_line_length_of_0_disables_line_length_checks_test1() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["123456789", "abcdefghi"],
                LineParsingOptions {
                    max_line_length: NumBytes(0),
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        #[test]
        fn max_line_length_of_0_disables_line_length_checks_test2() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["123456789", "abcdefghi"],
                LineParsingOptions {
                    max_line_length: NumBytes(0),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        #[test]
        fn leading_and_trailing_whitespace_is_preserved() {
            run_test_case(
                b"   123456789     \n    abcdefghi        \n",
                "",
                "",
                &["   123456789     ", "    abcdefghi        "],
                LineParsingOptions {
                    max_line_length: NumBytes(0),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }
    }
}