tokio_process_tools/output_stream/
mod.rs

1use bytes::{Buf, BytesMut};
2use std::io::BufRead;
3
4pub mod broadcast;
5pub(crate) mod impls;
6pub mod single_subscriber;
7
8/// We support the following implementations:
9///
10/// - [broadcast::BroadcastOutputStream]
11/// - [single_subscriber::SingleSubscriberOutputStream]
12pub trait OutputStream {}
13
14/// NOTE: The maximum possible memory consumption is: `chunk_size * channel_capacity`.
15/// Although reaching that level requires:
16/// 1. A receiver to listen for chunks.
17/// 2. The channel getting full.
18pub struct FromStreamOptions {
19    /// The size of an individual chunk read from the read buffer in bytes.
20    ///
21    /// default: 16 * 1024 // 16 kb
22    pub chunk_size: usize,
23
24    /// The number of chunks held by the underlying async channel.
25    ///
26    /// When the subscriber (if present) is not fast enough to consume chunks equally fast or faster
27    /// than them getting read, this acts as a buffer to hold not-yet processed messages.
28    /// The bigger, the better, in terms of system resilience to write-spikes.
29    /// Multiply with `chunk_size` to obtain the amount of system resources this will consume at
30    /// max.
31    pub channel_capacity: usize,
32}
33
34impl Default for FromStreamOptions {
35    fn default() -> Self {
36        Self {
37            chunk_size: 16 * 1024, // 16 kb
38            channel_capacity: 128, // => 16 kb * 128 = 2 mb (max memory usage consumption)
39        }
40    }
41}
42
43/// A "chunk" is an arbitrarily sized byte slice read from the underlying stream.
44/// The slices' length is at max of the previously configured maximum `chunk_size`.
45///
46/// We use the word "chunk", as it is often used when processing collections in segments or when
47/// dealing with buffered I/O operations where data arrives in variable-sized pieces.
48///
49/// In contrast to this, a "frame" typically carries more specific semantics. It usually implies a
50/// complete logical unit with defined boundaries within a protocol or format. This we do not have
51/// here.
52///
53/// Note: If the underlying stream is of lower buffer size, chunks of length `chunk_size` may
54/// never be observed.
55#[derive(Debug, Clone, PartialEq, Eq, Hash)]
56pub struct Chunk(bytes::Bytes);
57
58impl AsRef<[u8]> for Chunk {
59    fn as_ref(&self) -> &[u8] {
60        self.0.chunk()
61    }
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq)]
65pub enum BackpressureControl {
66    /// ...
67    DropLatestIncomingIfBufferFull,
68
69    /// Will not lead to "lagging" (and dropping frames in the process).
70    /// But this lowers our speed at which we consume output and may affect the application
71    /// captured, as their pipe buffer may get full, requiring the application /
72    /// relying on the application to drop data instead of writing to stdout/stderr in order
73    /// to not block.
74    BlockUntilBufferHasSpace,
75}
76
77/// Control flag to indicate whether processing should continue or break.
78///
79/// Returning `Break` from an `Inspector` will let that inspector stop.
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub enum Next {
82    Continue,
83    Break,
84}
85
86/// What should happen when a line is too long?
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
88pub enum LineOverflowBehavior {
89    /// Drop any additional data received after the current line was considered too long until
90    /// the next newline character is observed, which then starts a new line.
91    #[default]
92    DropAdditionalData,
93
94    /// Emit the current line when the maximum allowed length is reached.
95    /// Any additional data received is immediately taken as the content of the next line.
96    ///
97    /// This option really just adds intermediate line breaks to not let any emitted line exceed the
98    /// length limit.
99    ///
100    /// No data is dropped with this behavior.
101    EmitAdditionalAsNewLines,
102}
103
104/// Configuration options for parsing lines from a stream.
105#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106pub struct LineParsingOptions {
107    /// Maximum length of a single line in bytes.
108    /// When reached, further data won't be appended to the current line.
109    /// The line will be emitted in its current state.
110    ///
111    /// A value of `0` means that "no limit" is imposed.
112    ///
113    /// Only set this to `0` when you absolutely trust the input stream! Remember that an observed
114    /// stream maliciously writing endless amounts of data without ever writing a line break
115    /// would starve this system from ever emitting a line and will lead to an infinite amount of
116    /// memory being allocated to hold the line data, letting this process running out of memory!
117    ///
118    /// Defaults to 16 kilobytes.
119    pub max_line_length: NumBytes,
120
121    /// What should happen when a line is too long?
122    pub overflow_behavior: LineOverflowBehavior,
123}
124
125impl Default for LineParsingOptions {
126    fn default() -> Self {
127        Self {
128            max_line_length: 16.kilobytes(),
129            overflow_behavior: LineOverflowBehavior::default(),
130        }
131    }
132}
133
134#[derive(Debug, Clone, Copy, PartialEq, Eq)]
135pub struct NumBytes(pub usize);
136
137impl NumBytes {
138    pub fn zero() -> Self {
139        Self(0)
140    }
141}
142
143pub trait NumBytesExt {
144    fn bytes(self) -> NumBytes;
145
146    fn kilobytes(self) -> NumBytes;
147
148    fn megabytes(self) -> NumBytes;
149}
150
151impl NumBytesExt for usize {
152    fn bytes(self) -> NumBytes {
153        NumBytes(self)
154    }
155
156    fn kilobytes(self) -> NumBytes {
157        NumBytes(self * 1024)
158    }
159
160    fn megabytes(self) -> NumBytes {
161        NumBytes(self * 1024 * 1024)
162    }
163}
164
165/// Conceptually, this iterator appends the given byte slice to the current line buffer, which may
166/// already hold some previously written data.
167/// The resulting view of data is split by newlines (`\n`). Every completed line is yielded.
168/// The remainder of the chunk, not completed with a newline character, will become the new content
169/// of `line_buffer`.
170///
171/// The implementation tries to allocate as little as possible.
172///
173/// It can be expected that `line_buffer` does not grow beyond `options.max_line_length` bytes
174/// **IF** any yielded line is dropped or cloned and **NOT** stored long term.
175/// Only then can the underlying storage, used to capture that line, be reused to capture the
176/// next line.
177///
178/// # Members
179/// * `chunk` - New slice of bytes to process.
180/// * `line_buffer` - Buffer for reading one line.
181///   May hold previously seen, not-yet-closed, line-data.
182pub(crate) struct LineReader<'c, 'b> {
183    chunk: &'c [u8],
184    line_buffer: &'b mut BytesMut,
185    last_line_length: Option<usize>,
186    options: LineParsingOptions,
187}
188
189impl<'c, 'b> LineReader<'c, 'b> {
190    pub fn new(
191        chunk: &'c [u8],
192        line_buffer: &'b mut BytesMut,
193        options: LineParsingOptions,
194    ) -> Self {
195        Self {
196            chunk,
197            line_buffer,
198            last_line_length: None,
199            options,
200        }
201    }
202
203    fn append_to_line_buffer(&mut self, chunk: &[u8]) {
204        self.line_buffer.extend_from_slice(chunk)
205    }
206
207    fn _take_line(&mut self) -> bytes::Bytes {
208        self.last_line_length = Some(self.line_buffer.len());
209        self.line_buffer.split().freeze()
210    }
211
212    fn take_line(&mut self, full_line_buffer: bool) -> bytes::Bytes {
213        if full_line_buffer {
214            match self.options.overflow_behavior {
215                LineOverflowBehavior::DropAdditionalData => {
216                    // Drop any additional (until the next newline character!)
217                    // and return the current (not regularly finished) line.
218                    let _ = self.chunk.skip_until(b'\n');
219                    self._take_line()
220                }
221                LineOverflowBehavior::EmitAdditionalAsNewLines => {
222                    // Do NOT drop any additional and return the current (not regularly finished)
223                    // line. This will lead to all additional data starting a new line in the
224                    // next iteration.
225                    self._take_line()
226                }
227            }
228        } else {
229            self._take_line()
230        }
231    }
232}
233
234impl Iterator for LineReader<'_, '_> {
235    type Item = bytes::Bytes;
236
237    fn next(&mut self) -> Option<Self::Item> {
238        // Ensure we never go out of bounds with our line buffer.
239        // This also ensures that no-one creates a `LineReader` with a line buffer that is already
240        // too large for our current `options.max_line_length`.
241        assert!(self.line_buffer.len() <= self.options.max_line_length.0);
242
243        // Note: This will always be seen, even when the processed chunk ends with `\n`, as
244        // every iterator must once return `None` to signal that it has finished!
245        // And this, we only do later.
246        if let Some(last_line_length) = self.last_line_length.take() {
247            // The previous iteration yielded line of this length!
248            let reclaimed = self.line_buffer.try_reclaim(last_line_length);
249            if !reclaimed {
250                tracing::warn!(
251                    "Could not reclaim {last_line_length} bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."
252                );
253            }
254        }
255
256        // Code would work without this early-return. But this lets us skip a lot of actions on
257        // empty slices.
258        if self.chunk.is_empty() {
259            return None;
260        }
261
262        // Through our assert above, the first operand will always be bigger!
263        let remaining_line_length = self.options.max_line_length.0 - self.line_buffer.len();
264        // The previous iteration might have filled the line buffer completely.
265        // Apply overflow behavior.
266        if remaining_line_length == 0 {
267            return Some(self.take_line(true));
268        }
269
270        // We have space remaining in our line buffer.
271        // Split the chunk into two a usable portion (which would not "overflow" the line buffer)
272        // and the rest.
273        let (usable, rest) = self
274            .chunk
275            .split_at(usize::min(self.chunk.len(), remaining_line_length));
276
277        // Search for the next newline character in the usable portion of our current chunk.
278        match usable.iter().position(|b| *b == b'\n') {
279            None => {
280                // No line break found! Consume the whole usable chunk portion.
281                self.append_to_line_buffer(usable);
282                self.chunk = rest;
283
284                if rest.is_empty() {
285                    // Return None, as we have no more data to process.
286                    // Leftover data in `line_buffer` must be taken care of externally!
287                    None
288                } else {
289                    // Line now full. Would overflow using rest. Return the current line!
290                    assert_eq!(self.line_buffer.len(), self.options.max_line_length.0);
291                    Some(self.take_line(true))
292                }
293            }
294            Some(pos) => {
295                // Found a line break at `pos` - process the line and continue.
296                let (usable_until_line_break, _usable_rest) = usable.split_at(pos);
297                self.append_to_line_buffer(usable_until_line_break);
298
299                // We did split our chunk into `let (usable, rest) = ...` earlier.
300                // We then split usable into `let (usable_until_line_break, _usable_rest) = ...`.
301                // We know that `_usable_rest` and `rest` are consecutive in `chunk`!
302                // This is the combination of `_usable_rest` and `rest` expressed through `chunk`
303                // to get to the "real"/"complete" rest of data.
304                let rest = &self.chunk[usable_until_line_break.len()..];
305
306                // Skip the `\n` byte!
307                self.chunk = if rest.len() > 1 { &rest[1..] } else { &[] };
308
309                // Return the completed line.
310                Some(self.take_line(false))
311            }
312        }
313    }
314}
315
316#[cfg(test)]
317mod tests {
318    use crate::output_stream::LineReader;
319    use crate::{LineOverflowBehavior, LineParsingOptions, NumBytes};
320    use assertr::prelude::*;
321    use bytes::{Bytes, BytesMut};
322    use std::time::Duration;
323    use tokio::io::{AsyncWrite, AsyncWriteExt};
324    use tracing_test::traced_test;
325
326    pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
327        write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
328        tokio::time::sleep(Duration::from_millis(50)).await;
329        write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
330        tokio::time::sleep(Duration::from_millis(50)).await;
331        write.write_all("README.md\n".as_bytes()).await.unwrap();
332        tokio::time::sleep(Duration::from_millis(50)).await;
333        write.write_all("src\n".as_bytes()).await.unwrap();
334        tokio::time::sleep(Duration::from_millis(50)).await;
335        write.write_all("target\n".as_bytes()).await.unwrap();
336        tokio::time::sleep(Duration::from_millis(50)).await;
337    }
338
339    #[test]
340    #[traced_test]
341    fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
342        let mut line_buffer = BytesMut::new();
343        let mut collected_lines: Vec<String> = Vec::new();
344
345        let data = "❤️❤️❤️\n👍\n";
346        for byte in data.as_bytes() {
347            let lr = LineReader {
348                chunk: &[*byte],
349                line_buffer: &mut line_buffer,
350                last_line_length: None,
351                options: LineParsingOptions::default(),
352            };
353            for line in lr {
354                collected_lines.push(String::from_utf8_lossy(&line).to_string());
355            }
356        }
357
358        assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "👍"]);
359    }
360
361    #[test]
362    #[traced_test]
363    fn reclaims_line_buffer_space_before_collecting_new_line() {
364        let mut line_buffer = BytesMut::new();
365        let mut collected_lines: Vec<String> = Vec::new();
366        let mut bytes: Vec<Bytes> = Vec::new();
367
368        let data = "❤️❤️❤️\n❤️❤️❤️\n";
369        for byte in data.as_bytes() {
370            let lr = LineReader {
371                chunk: &[*byte],
372                line_buffer: &mut line_buffer,
373                last_line_length: None,
374                options: LineParsingOptions::default(),
375            };
376            for line in lr {
377                collected_lines.push(String::from_utf8_lossy(&line).to_string());
378                bytes.push(line);
379            }
380        }
381
382        let data = "❤️❤️❤️\n";
383        let lr = LineReader {
384            chunk: data.as_bytes(),
385            line_buffer: &mut line_buffer,
386            last_line_length: None,
387            options: LineParsingOptions::default(),
388        };
389        for line in lr {
390            collected_lines.push(String::from_utf8_lossy(&line).to_string());
391            bytes.push(line);
392        }
393
394        assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "❤️❤️❤️", "❤️❤️❤️"]);
395
396        logs_assert(|lines: &[&str]| {
397            match lines
398                .iter()
399                .filter(|line| line.contains("Could not reclaim 18 bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."))
400                .count()
401            {
402                3 => {}
403                n => return Err(format!("Expected exactly one log, but found {n}")),
404            };
405            Ok(())
406        });
407    }
408
409    #[test]
410    fn line_reader() {
411        // Helper function to reduce duplication in test cases
412        fn run_test_case(
413            test_name: &str,
414            chunk: &[u8],
415            line_buffer_before: &str,
416            line_buffer_after: &str,
417            expected_lines: &[&str],
418            options: LineParsingOptions,
419        ) {
420            let mut line_buffer = BytesMut::from(line_buffer_before);
421            let mut collected_lines: Vec<String> = Vec::new();
422
423            let lr = LineReader {
424                chunk,
425                line_buffer: &mut line_buffer,
426                last_line_length: None,
427                options,
428            };
429            for line in lr {
430                collected_lines.push(String::from_utf8_lossy(&line).to_string());
431            }
432
433            assert_that(line_buffer)
434                .with_detail_message(format!("Test case: {test_name}"))
435                .is_equal_to(line_buffer_after);
436
437            let expected_lines: Vec<String> =
438                expected_lines.iter().map(|s| s.to_string()).collect();
439
440            assert_that(collected_lines)
441                .with_detail_message(format!("Test case: {test_name}"))
442                .is_equal_to(expected_lines);
443        }
444
445        run_test_case(
446            "Test 1: Empty chunk",
447            b"",
448            "previous: ",
449            "previous: ",
450            &[],
451            LineParsingOptions::default(),
452        );
453
454        run_test_case(
455            "Test 2: Chunk with no newlines",
456            b"no newlines here",
457            "previous: ",
458            "previous: no newlines here",
459            &[],
460            LineParsingOptions::default(),
461        );
462
463        run_test_case(
464            "Test 3: Single complete line",
465            b"one line\n",
466            "",
467            "",
468            &["one line"],
469            LineParsingOptions::default(),
470        );
471
472        run_test_case(
473            "Test 4: Multiple complete lines",
474            b"first line\nsecond line\nthird line\n",
475            "",
476            "",
477            &["first line", "second line", "third line"],
478            LineParsingOptions::default(),
479        );
480
481        run_test_case(
482            "Test 5: Partial line at the end",
483            b"complete line\npartial",
484            "",
485            "partial",
486            &["complete line"],
487            LineParsingOptions::default(),
488        );
489
490        run_test_case(
491            "Test 6: Initial line with multiple newlines",
492            b"continuation\nmore lines\n",
493            "previous: ",
494            "",
495            &["previous: continuation", "more lines"],
496            LineParsingOptions::default(),
497        );
498
499        run_test_case(
500            "Test 7: Invalid UTF8 data",
501            b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n",
502            "",
503            "",
504            &["valid utf8�(�� invalid utf8"],
505            LineParsingOptions::default(),
506        );
507
508        run_test_case(
509            "Test 8 - Rest of too long line is dropped",
510            b"123456789\nabcdefghi\n",
511            "",
512            "",
513            &["1234", "abcd"],
514            LineParsingOptions {
515                max_line_length: NumBytes(4), // Only allow lines with 4 ascii chars (or equiv.) max.
516                overflow_behavior: LineOverflowBehavior::DropAdditionalData,
517            },
518        );
519
520        run_test_case(
521            "Test 9 - Rest of too long line is returned as additional lines",
522            b"123456789\nabcdefghi\n",
523            "",
524            "",
525            &["1234", "5678", "9", "abcd", "efgh", "i"],
526            LineParsingOptions {
527                max_line_length: NumBytes(4), // Only allow lines with 4 ascii chars (or equiv.) max.
528                overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
529            },
530        );
531    }
532}