tokio_process_tools/output_stream/
mod.rs

1use bytes::{Buf, BytesMut};
2use std::io::BufRead;
3
4pub mod broadcast;
5pub(crate) mod impls;
6pub mod single_subscriber;
7
8/// We support the following implementations:
9///
10/// - [broadcast::BroadcastOutputStream]
11/// - [single_subscriber::SingleSubscriberOutputStream]
12pub trait OutputStream {}
13
14/// NOTE: The maximum possible memory consumption is: `chunk_size * channel_capacity`.
15/// Although reaching that level requires:
16/// 1. A receiver to listen for chunks.
17/// 2. The channel getting full.
18pub struct FromStreamOptions {
19    /// The size of an individual chunk read from the read buffer in bytes.
20    ///
21    /// default: 16 * 1024 // 16 kb
22    pub chunk_size: usize,
23
24    /// The number of chunks held by the underlying async channel.
25    ///
26    /// When the subscriber (if present) is not fast enough to consume chunks equally fast or faster
27    /// than them getting read, this acts as a buffer to hold not-yet processed messages.
28    /// The bigger, the better, in terms of system resilience to write-spikes.
29    /// Multiply with `chunk_size` to obtain the amount of system resources this will consume at
30    /// max.
31    pub channel_capacity: usize,
32}
33
34impl Default for FromStreamOptions {
35    fn default() -> Self {
36        Self {
37            chunk_size: 16 * 1024, // 16 kb
38            channel_capacity: 128, // => 16 kb * 128 = 2 mb (max memory usage consumption)
39        }
40    }
41}
42
43/// A "chunk" is an arbitrarily sized byte slice read from the underlying stream.
44/// The slices' length is at max of the previously configured maximum `chunk_size`.
45///
46/// We use the word "chunk", as it is often used when processing collections in segments or when
47/// dealing with buffered I/O operations where data arrives in variable-sized pieces.
48///
49/// In contrast to this, a "frame" typically carries more specific semantics. It usually implies a
50/// complete logical unit with defined boundaries within a protocol or format. This we do not have
51/// here.
52///
53/// Note: If the underlying stream is of lower buffer size, chunks of length `chunk_size` may
54/// never be observed.
55#[derive(Debug, Clone, PartialEq, Eq, Hash)]
56pub struct Chunk(bytes::Bytes);
57
58impl AsRef<[u8]> for Chunk {
59    fn as_ref(&self) -> &[u8] {
60        self.0.chunk()
61    }
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq)]
65pub enum BackpressureControl {
66    /// ...
67    DropLatestIncomingIfBufferFull,
68
69    /// Will not lead to "lagging" (and dropping frames in the process).
70    /// But this lowers our speed at which we consume output and may affect the application
71    /// captured, as their pipe buffer may get full, requiring the application /
72    /// relying on the application to drop data instead of writing to stdout/stderr in order
73    /// to not block.
74    BlockUntilBufferHasSpace,
75}
76
77/// Control flag to indicate whether processing should continue or break.
78///
79/// Returning `Break` from an `Inspector` will let that inspector stop.
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub enum Next {
82    Continue,
83    Break,
84}
85
86/// What should happen when a line is too long?
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
88pub enum LineOverflowBehavior {
89    /// Drop any additional data received after the current line was considered too long until
90    /// the next newline character is observed, which then starts a new line.
91    #[default]
92    DropAdditionalData,
93
94    /// Emit the current line when the maximum allowed length is reached.
95    /// Any additional data received is immediately taken as the content of the next line.
96    ///
97    /// This option really just adds intermediate line breaks to not let any emitted line exceed the
98    /// length limit.
99    ///
100    /// No data is dropped with this behavior.
101    EmitAdditionalAsNewLines,
102}
103
104/// Configuration options for parsing lines from a stream.
105#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106pub struct LineParsingOptions {
107    /// Maximum length of a single line in bytes.
108    /// When reached, further data won't be appended to the current line.
109    /// The line will be emitted in its current state.
110    ///
111    /// A value of `0` means that "no limit" is imposed.
112    ///
113    /// Only set this to `0` when you absolutely trust the input stream! Remember that an observed
114    /// stream maliciously writing endless amounts of data without ever writing a line break
115    /// would starve this system from ever emitting a line and will lead to an infinite amount of
116    /// memory being allocated to hold the line data, letting this process running out of memory!
117    ///
118    /// Defaults to 16 kilobytes.
119    pub max_line_length: NumBytes,
120
121    /// What should happen when a line is too long?
122    pub overflow_behavior: LineOverflowBehavior,
123}
124
125impl Default for LineParsingOptions {
126    fn default() -> Self {
127        Self {
128            max_line_length: 16.kilobytes(),
129            overflow_behavior: LineOverflowBehavior::default(),
130        }
131    }
132}
133
134#[derive(Debug, Clone, Copy, PartialEq, Eq)]
135pub struct NumBytes(pub usize);
136
137impl NumBytes {
138    pub fn zero() -> Self {
139        Self(0)
140    }
141}
142
143pub trait NumBytesExt {
144    fn bytes(self) -> NumBytes;
145
146    fn kilobytes(self) -> NumBytes;
147
148    fn megabytes(self) -> NumBytes;
149}
150
151impl NumBytesExt for usize {
152    fn bytes(self) -> NumBytes {
153        NumBytes(self)
154    }
155
156    fn kilobytes(self) -> NumBytes {
157        NumBytes(self * 1024)
158    }
159
160    fn megabytes(self) -> NumBytes {
161        NumBytes(self * 1024 * 1024)
162    }
163}
164
165/// Conceptually, this iterator appends the given byte slice to the current line buffer, which may
166/// already hold some previously written data.
167/// The resulting view of data is split by newlines (`\n`). Every completed line is yielded.
168/// The remainder of the chunk, not completed with a newline character, will become the new content
169/// of `line_buffer`.
170///
171/// The implementation tries to allocate as little as possible.
172///
173/// It can be expected that `line_buffer` does not grow beyond `options.max_line_length` bytes
174/// **IF** any yielded line is dropped or cloned and **NOT** stored long term.
175/// Only then can the underlying storage, used to capture that line, be reused to capture the
176/// next line.
177///
178/// # Members
179/// * `chunk` - New slice of bytes to process.
180/// * `line_buffer` - Buffer for reading one line.
181///   May hold previously seen, not-yet-closed, line-data.
182pub(crate) struct LineReader<'c, 'b> {
183    chunk: &'c [u8],
184    line_buffer: &'b mut BytesMut,
185    last_line_length: Option<usize>,
186    options: LineParsingOptions,
187}
188
189impl<'c, 'b> LineReader<'c, 'b> {
190    pub fn new(
191        chunk: &'c [u8],
192        line_buffer: &'b mut BytesMut,
193        options: LineParsingOptions,
194    ) -> Self {
195        Self {
196            chunk,
197            line_buffer,
198            last_line_length: None,
199            options,
200        }
201    }
202
203    fn append_to_line_buffer(&mut self, chunk: &[u8]) {
204        self.line_buffer.extend_from_slice(chunk)
205    }
206
207    fn _take_line(&mut self) -> bytes::Bytes {
208        self.last_line_length = Some(self.line_buffer.len());
209        self.line_buffer.split().freeze()
210    }
211
212    fn take_line(&mut self, full_line_buffer: bool) -> bytes::Bytes {
213        if full_line_buffer {
214            match self.options.overflow_behavior {
215                LineOverflowBehavior::DropAdditionalData => {
216                    // Drop any additional (until the next newline character!)
217                    // and return the current (not regularly finished) line.
218                    let _ = self.chunk.skip_until(b'\n');
219                    self._take_line()
220                }
221                LineOverflowBehavior::EmitAdditionalAsNewLines => {
222                    // Do NOT drop any additional and return the current (not regularly finished)
223                    // line. This will lead to all additional data starting a new line in the
224                    // next iteration.
225                    self._take_line()
226                }
227            }
228        } else {
229            self._take_line()
230        }
231    }
232}
233
234impl Iterator for LineReader<'_, '_> {
235    type Item = bytes::Bytes;
236
237    fn next(&mut self) -> Option<Self::Item> {
238        // Ensure we never go out of bounds with our line buffer.
239        // This also ensures that no-one creates a `LineReader` with a line buffer that is already
240        // too large for our current `options.max_line_length`.
241        if self.options.max_line_length.0 != 0 {
242            assert!(self.line_buffer.len() <= self.options.max_line_length.0);
243        }
244
245        // Note: This will always be seen, even when the processed chunk ends with `\n`, as
246        // every iterator must once return `None` to signal that it has finished!
247        // And this, we only do later.
248        if let Some(last_line_length) = self.last_line_length.take() {
249            // The previous iteration yielded line of this length!
250            let reclaimed = self.line_buffer.try_reclaim(last_line_length);
251            if !reclaimed {
252                tracing::warn!(
253                    "Could not reclaim {last_line_length} bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."
254                );
255            }
256        }
257
258        // Code would work without this early-return. But this lets us skip a lot of actions on
259        // empty slices.
260        if self.chunk.is_empty() {
261            return None;
262        }
263
264        // Through our assert above, the first operand will always be bigger!
265        let remaining_line_length = if self.options.max_line_length.0 == 0 {
266            usize::MAX
267        } else {
268            self.options.max_line_length.0 - self.line_buffer.len()
269       };
270
271        // The previous iteration might have filled the line buffer completely.
272        // Apply overflow behavior.
273        if remaining_line_length == 0 {
274            return Some(self.take_line(true));
275        }
276
277        // We have space remaining in our line buffer.
278        // Split the chunk into two a usable portion (which would not "overflow" the line buffer)
279        // and the rest.
280        let (usable, rest) = self
281            .chunk
282            .split_at(usize::min(self.chunk.len(), remaining_line_length));
283
284        // Search for the next newline character in the usable portion of our current chunk.
285        match usable.iter().position(|b| *b == b'\n') {
286            None => {
287                // No line break found! Consume the whole usable chunk portion.
288                self.append_to_line_buffer(usable);
289                self.chunk = rest;
290
291                if rest.is_empty() {
292                    // Return None, as we have no more data to process.
293                    // Leftover data in `line_buffer` must be taken care of externally!
294                    None
295                } else {
296                    // Line now full. Would overflow using rest. Return the current line!
297                    assert_eq!(self.line_buffer.len(), self.options.max_line_length.0);
298                    Some(self.take_line(true))
299                }
300            }
301            Some(pos) => {
302                // Found a line break at `pos` - process the line and continue.
303                let (usable_until_line_break, _usable_rest) = usable.split_at(pos);
304                self.append_to_line_buffer(usable_until_line_break);
305
306                // We did split our chunk into `let (usable, rest) = ...` earlier.
307                // We then split usable into `let (usable_until_line_break, _usable_rest) = ...`.
308                // We know that `_usable_rest` and `rest` are consecutive in `chunk`!
309                // This is the combination of `_usable_rest` and `rest` expressed through `chunk`
310                // to get to the "real"/"complete" rest of data.
311                let rest = &self.chunk[usable_until_line_break.len()..];
312
313                // Skip the `\n` byte!
314                self.chunk = if rest.len() > 1 { &rest[1..] } else { &[] };
315
316                // Return the completed line.
317                Some(self.take_line(false))
318            }
319        }
320    }
321}
322
323#[cfg(test)]
324mod tests {
325    use crate::output_stream::LineReader;
326    use crate::{LineOverflowBehavior, LineParsingOptions, NumBytes};
327    use assertr::prelude::*;
328    use bytes::{Bytes, BytesMut};
329    use std::time::Duration;
330    use tokio::io::{AsyncWrite, AsyncWriteExt};
331    use tracing_test::traced_test;
332
333    pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
334        write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
335        tokio::time::sleep(Duration::from_millis(50)).await;
336        write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
337        tokio::time::sleep(Duration::from_millis(50)).await;
338        write.write_all("README.md\n".as_bytes()).await.unwrap();
339        tokio::time::sleep(Duration::from_millis(50)).await;
340        write.write_all("src\n".as_bytes()).await.unwrap();
341        tokio::time::sleep(Duration::from_millis(50)).await;
342        write.write_all("target\n".as_bytes()).await.unwrap();
343        tokio::time::sleep(Duration::from_millis(50)).await;
344    }
345
346    #[test]
347    #[traced_test]
348    fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
349        let mut line_buffer = BytesMut::new();
350        let mut collected_lines: Vec<String> = Vec::new();
351
352        let data = "❤️❤️❤️\n👍\n";
353        for byte in data.as_bytes() {
354            let lr = LineReader {
355                chunk: &[*byte],
356                line_buffer: &mut line_buffer,
357                last_line_length: None,
358                options: LineParsingOptions::default(),
359            };
360            for line in lr {
361                collected_lines.push(String::from_utf8_lossy(&line).to_string());
362            }
363        }
364
365        assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "👍"]);
366    }
367
368    #[test]
369    #[traced_test]
370    fn reclaims_line_buffer_space_before_collecting_new_line() {
371        let mut line_buffer = BytesMut::new();
372        let mut collected_lines: Vec<String> = Vec::new();
373        let mut bytes: Vec<Bytes> = Vec::new();
374
375        let data = "❤️❤️❤️\n❤️❤️❤️\n";
376        for byte in data.as_bytes() {
377            let lr = LineReader {
378                chunk: &[*byte],
379                line_buffer: &mut line_buffer,
380                last_line_length: None,
381                options: LineParsingOptions::default(),
382            };
383            for line in lr {
384                collected_lines.push(String::from_utf8_lossy(&line).to_string());
385                bytes.push(line);
386            }
387        }
388
389        let data = "❤️❤️❤️\n";
390        let lr = LineReader {
391            chunk: data.as_bytes(),
392            line_buffer: &mut line_buffer,
393            last_line_length: None,
394            options: LineParsingOptions::default(),
395        };
396        for line in lr {
397            collected_lines.push(String::from_utf8_lossy(&line).to_string());
398            bytes.push(line);
399        }
400
401        assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "❤️❤️❤️", "❤️❤️❤️"]);
402
403        logs_assert(|lines: &[&str]| {
404            match lines
405                .iter()
406                .filter(|line| line.contains("Could not reclaim 18 bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."))
407                .count()
408            {
409                3 => {}
410                n => return Err(format!("Expected exactly one log, but found {n}")),
411            };
412            Ok(())
413        });
414    }
415
416    #[test]
417    fn line_reader() {
418        // Helper function to reduce duplication in test cases
419        fn run_test_case(
420            test_name: &str,
421            chunk: &[u8],
422            line_buffer_before: &str,
423            line_buffer_after: &str,
424            expected_lines: &[&str],
425            options: LineParsingOptions,
426        ) {
427            let mut line_buffer = BytesMut::from(line_buffer_before);
428            let mut collected_lines: Vec<String> = Vec::new();
429
430            let lr = LineReader {
431                chunk,
432                line_buffer: &mut line_buffer,
433                last_line_length: None,
434                options,
435            };
436            for line in lr {
437                collected_lines.push(String::from_utf8_lossy(&line).to_string());
438            }
439
440            assert_that(line_buffer)
441                .with_detail_message(format!("Test case: {test_name}"))
442                .is_equal_to(line_buffer_after);
443
444            let expected_lines: Vec<String> =
445                expected_lines.iter().map(|s| s.to_string()).collect();
446
447            assert_that(collected_lines)
448                .with_detail_message(format!("Test case: {test_name}"))
449                .is_equal_to(expected_lines);
450        }
451
452        run_test_case(
453            "Test 1: Empty chunk",
454            b"",
455            "previous: ",
456            "previous: ",
457            &[],
458            LineParsingOptions::default(),
459        );
460
461        run_test_case(
462            "Test 2: Chunk with no newlines",
463            b"no newlines here",
464            "previous: ",
465            "previous: no newlines here",
466            &[],
467            LineParsingOptions::default(),
468        );
469
470        run_test_case(
471            "Test 3: Single complete line",
472            b"one line\n",
473            "",
474            "",
475            &["one line"],
476            LineParsingOptions::default(),
477        );
478
479        run_test_case(
480            "Test 4: Multiple complete lines",
481            b"first line\nsecond line\nthird line\n",
482            "",
483            "",
484            &["first line", "second line", "third line"],
485            LineParsingOptions::default(),
486        );
487
488        run_test_case(
489            "Test 5: Partial line at the end",
490            b"complete line\npartial",
491            "",
492            "partial",
493            &["complete line"],
494            LineParsingOptions::default(),
495        );
496
497        run_test_case(
498            "Test 6: Initial line with multiple newlines",
499            b"continuation\nmore lines\n",
500            "previous: ",
501            "",
502            &["previous: continuation", "more lines"],
503            LineParsingOptions::default(),
504        );
505
506        run_test_case(
507            "Test 7: Invalid UTF8 data",
508            b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n",
509            "",
510            "",
511            &["valid utf8�(�� invalid utf8"],
512            LineParsingOptions::default(),
513        );
514
515        run_test_case(
516            "Test 8 - Rest of too long line is dropped",
517            b"123456789\nabcdefghi\n",
518            "",
519            "",
520            &["1234", "abcd"],
521            LineParsingOptions {
522                max_line_length: NumBytes(4), // Only allow lines with 4 ascii chars (or equiv.) max.
523                overflow_behavior: LineOverflowBehavior::DropAdditionalData,
524            },
525        );
526
527        run_test_case(
528            "Test 9 - Rest of too long line is returned as additional lines",
529            b"123456789\nabcdefghi\n",
530            "",
531            "",
532            &["1234", "5678", "9", "abcd", "efgh", "i"],
533            LineParsingOptions {
534                max_line_length: NumBytes(4), // Only allow lines with 4 ascii chars (or equiv.) max.
535                overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
536            },
537        );
538
539        run_test_case(
540            "Test 10 - max line length of 0 disables line length checks #1",
541            b"123456789\nabcdefghi\n",
542            "",
543            "",
544            &["123456789", "abcdefghi"],
545            LineParsingOptions {
546                max_line_length: NumBytes(0),
547                overflow_behavior: LineOverflowBehavior::DropAdditionalData,
548            },
549        );
550
551        run_test_case(
552            "Test 11 - max line length of 0 disables line length checks #2",
553            b"123456789\nabcdefghi\n",
554            "",
555            "",
556            &["123456789", "abcdefghi"],
557            LineParsingOptions {
558                max_line_length: NumBytes(0),
559                overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
560            },
561        );
562    }
563}