tokio_process_tools/output_stream/
mod.rs

1use bytes::{Buf, BytesMut};
2
3pub mod broadcast;
4pub(crate) mod impls;
5pub mod single_subscriber;
6
/// Marker trait for the output stream types provided by this module.
///
/// The trait itself declares no methods or associated items.
///
/// We support the following implementations:
///
/// - [broadcast::BroadcastOutputStream]
/// - [single_subscriber::SingleSubscriberOutputStream]
pub trait OutputStream {}
12
/// Options describing how data is read from a stream in chunks and buffered in a channel.
///
/// NOTE: The maximum possible memory consumption is: `chunk_size * channel_capacity`.
/// Although reaching that level requires:
/// 1. A receiver to listen for chunks.
/// 2. The channel getting full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FromStreamOptions {
    /// The size of an individual chunk read from the read buffer in bytes.
    ///
    /// default: 16 * 1024 // 16 KiB
    pub chunk_size: usize,

    /// The number of chunks held by the underlying async channel.
    ///
    /// When the subscriber (if present) is not fast enough to consume chunks equally fast or faster
    /// than them getting read, this acts as a buffer to hold not-yet processed messages.
    /// The bigger, the better, in terms of system resilience to write-spikes.
    /// Multiply with `chunk_size` to obtain the amount of system resources this will consume at
    /// max.
    ///
    /// default: 128
    pub channel_capacity: usize,
}
32
33impl Default for FromStreamOptions {
34    fn default() -> Self {
35        Self {
36            chunk_size: 16 * 1024, // 16 kb
37            channel_capacity: 128, // => 16 kb * 128 = 2 mb (max memory usage consumption)
38        }
39    }
40}
41
/// A "chunk" is an arbitrarily sized byte slice read from the underlying stream.
/// A chunk is at most as long as the previously configured maximum `chunk_size`.
///
/// We use the word "chunk", as it is often used when processing collections in segments or when
/// dealing with buffered I/O operations where data arrives in variable-sized pieces.
///
/// In contrast to this, a "frame" typically carries more specific semantics. It usually implies a
/// complete logical unit with defined boundaries within a protocol or format. We do not have
/// that here.
///
/// Note: If the underlying stream has a smaller buffer size, chunks of length `chunk_size` may
/// never be observed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Chunk(bytes::Bytes);
56
57impl AsRef<[u8]> for Chunk {
58    fn as_ref(&self) -> &[u8] {
59        self.0.chunk()
60    }
61}
62
/// Selects what happens when the buffer holding not-yet-consumed output is full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
    /// NOTE(review): this variant was undocumented. Judging by its name and by the contrast
    /// drawn in [`Self::BlockUntilBufferHasSpace`], it presumably discards newly read data
    /// while the buffer is full instead of waiting for space. Confirm against the stream
    /// implementations.
    DropLatestIncomingIfBufferFull,

    /// Will not lead to "lagging" (and dropping frames in the process).
    /// But this lowers the speed at which we consume output and may affect the captured
    /// application, as its pipe buffer may get full, requiring the application /
    /// relying on the application to drop data instead of writing to stdout/stderr in order
    /// to not block.
    BlockUntilBufferHasSpace,
}
75
/// Control flag to indicate whether processing should continue or break.
///
/// Returning `Break` from an `Inspector` will let that inspector stop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /// Keep processing.
    Continue,
    /// Stop processing.
    Break,
}
84
/// What should happen when a line is too long?
///
/// A line counts as "too long" once it reaches `LineParsingOptions::max_line_length`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
    /// Drop any additional data received after the current line was considered too long until
    /// the next newline character is observed, which then starts a new line.
    ///
    /// This is the default behavior.
    #[default]
    DropAdditionalData,

    /// Emit the current line when the maximum allowed length is reached.
    /// Any additional data received is immediately taken as the content of the next line.
    ///
    /// This option really just adds intermediate line breaks so that no emitted line exceeds
    /// the length limit.
    ///
    /// No data is dropped with this behavior.
    EmitAdditionalAsNewLines,
}
102
/// Configuration options for parsing lines from a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LineParsingOptions {
    /// Maximum length of a single line in bytes.
    /// When reached, further data won't be appended to the current line.
    /// The line will be emitted in its current state.
    ///
    /// A value of `0` means that "no limit" is imposed.
    ///
    /// Only set this to `0` when you absolutely trust the input stream! Remember that an observed
    /// stream maliciously writing endless amounts of data without ever writing a line break
    /// would starve this system from ever emitting a line and would lead to an unbounded amount
    /// of memory being allocated to hold the line data, causing this process to run out of
    /// memory!
    ///
    /// Defaults to 16 kilobytes.
    pub max_line_length: NumBytes,

    /// What should happen when a line is too long?
    ///
    /// Defaults to [`LineOverflowBehavior::default()`].
    pub overflow_behavior: LineOverflowBehavior,
}
123
124impl Default for LineParsingOptions {
125    fn default() -> Self {
126        Self {
127            max_line_length: 16.kilobytes(),
128            overflow_behavior: LineOverflowBehavior::default(),
129        }
130    }
131}
132
/// A number of bytes.
///
/// Values can be created with [`NumBytes::zero`] or through the [`NumBytesExt`] helpers
/// implemented for `usize`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumBytes(usize);
135
136impl NumBytes {
137    pub fn zero() -> Self {
138        Self(0)
139    }
140}
141
/// Extension trait for turning a plain number into a [`NumBytes`] value.
///
/// Each method interprets `self` in the unit named by the method.
pub trait NumBytesExt {
    /// Interprets `self` as a number of bytes.
    fn bytes(self) -> NumBytes;

    /// Interprets `self` as a number of kilobytes.
    fn kilobytes(self) -> NumBytes;

    /// Interprets `self` as a number of megabytes.
    fn megabytes(self) -> NumBytes;
}
149
150impl NumBytesExt for usize {
151    fn bytes(self) -> NumBytes {
152        NumBytes(self)
153    }
154
155    fn kilobytes(self) -> NumBytes {
156        NumBytes(self * 1024)
157    }
158
159    fn megabytes(self) -> NumBytes {
160        NumBytes(self * 1024 * 1024)
161    }
162}
163
/// Iterator splitting a chunk of bytes into lines.
///
/// Conceptually, this iterator appends the given byte slice to the current line buffer, which may
/// already hold some previously written data.
/// The resulting view of data is split by newlines (`\n`). Every completed line is yielded.
/// The remainder of the chunk, not completed with a newline character, will become the new content
/// of `line_buffer`.
///
/// The implementation tries to allocate as little as possible.
///
/// It can be expected that `line_buffer` does not grow beyond `options.max_line_length` bytes
/// **IF** any yielded line is dropped or cloned and **NOT** stored long term.
/// Only then can the underlying storage, used to capture that line, be reused to capture the
/// next line.
///
/// # Members
/// * `chunk` - New slice of bytes to process.
/// * `line_buffer` - Buffer for reading one line.
///   May hold previously seen, not-yet-closed, line-data.
/// * `last_line_length` - Length of the line yielded by the previous iteration, if any.
///   Used to try to reclaim that much buffer space before reading the next line.
/// * `options` - Line length limit and overflow behavior to apply.
pub(crate) struct LineReader<'c, 'b> {
    // Not-yet-processed part of the chunk; shrinks as the iterator advances.
    chunk: &'c [u8],
    // Borrowed from the caller, so unfinished line data is still there after the reader is gone.
    line_buffer: &'b mut BytesMut,
    // Set whenever a line is taken out of `line_buffer`; consumed by the next iteration.
    last_line_length: Option<usize>,
    options: LineParsingOptions,
}
187
188impl<'c, 'b> LineReader<'c, 'b> {
189    pub fn new(
190        chunk: &'c [u8],
191        line_buffer: &'b mut BytesMut,
192        options: LineParsingOptions,
193    ) -> Self {
194        Self {
195            chunk,
196            line_buffer,
197            last_line_length: None,
198            options,
199        }
200    }
201
202    fn append_to_line_buffer(&mut self, chunk: &[u8]) {
203        self.line_buffer.extend_from_slice(chunk)
204    }
205
206    fn _take_line(&mut self) -> bytes::Bytes {
207        self.last_line_length = Some(self.line_buffer.len());
208        self.line_buffer.split().freeze()
209    }
210
211    fn take_line(&mut self, full_line_buffer: bool) -> bytes::Bytes {
212        if full_line_buffer {
213            match self.options.overflow_behavior {
214                LineOverflowBehavior::DropAdditionalData => {
215                    // Drop any additional and return the current (not regularly finished) line.
216                    self.chunk = &[];
217                    self._take_line()
218                }
219                LineOverflowBehavior::EmitAdditionalAsNewLines => {
220                    // Do NOT drop any additional and return the current (not regularly finished)
221                    // line. This will lead to all additional data starting a new line in the
222                    // next iteration.
223                    self._take_line()
224                }
225            }
226        } else {
227            self._take_line()
228        }
229    }
230}
231
232impl Iterator for LineReader<'_, '_> {
233    type Item = bytes::Bytes;
234
235    fn next(&mut self) -> Option<Self::Item> {
236        // Ensure we never go out of bounds with our line buffer.
237        // This also ensures that no-one creates a `LineReader` with a line buffer that is already
238        // too large for our current `options.max_line_length`.
239        assert!(self.line_buffer.len() <= self.options.max_line_length.0);
240
241        // Note: This will always be seen, even when the processed chunk ends with `\n`, as
242        // every iterator must once return `None` to signal that it has finished!
243        // And this, we only do later.
244        if let Some(last_line_length) = self.last_line_length.take() {
245            // The previous iteration yielded line of this length!
246            let reclaimed = self.line_buffer.try_reclaim(last_line_length);
247            if !reclaimed {
248                tracing::warn!(
249                    "Could not reclaim {last_line_length} bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."
250                );
251            }
252        }
253
254        // Code would work without this early-return. But this lets us skip a lot of actions on
255        // empty slices.
256        if self.chunk.is_empty() {
257            return None;
258        }
259
260        // Through our assert above, the first operand will always be bigger!
261        let remaining_line_length = self.options.max_line_length.0 - self.line_buffer.len();
262        // The previous iteration might have filled the line buffer completely.
263        // Apply overflow behavior.
264        if remaining_line_length == 0 {
265            return Some(self.take_line(true));
266        }
267
268        // We have space remaining in our line buffer.
269        // Split the chunk into two a usable portion (which would not "overflow" the line buffer)
270        // and the rest.
271        let (usable, rest) = self
272            .chunk
273            .split_at(usize::min(self.chunk.len(), remaining_line_length));
274
275        // Search for the next newline character in the usable portion of our current chunk.
276        match usable.iter().position(|b| *b == b'\n') {
277            None => {
278                // No line break found! Consume the whole usable chunk portion.
279                self.append_to_line_buffer(usable);
280                self.chunk = rest;
281
282                if rest.is_empty() {
283                    // Return None, as we have no more data to process.
284                    // Leftover data in `line_buffer` must be taken care of externally!
285                    None
286                } else {
287                    // Line now full. Would overflow using rest. Return the current line!
288                    assert_eq!(self.line_buffer.len(), self.options.max_line_length.0);
289                    Some(self.take_line(true))
290                }
291            }
292            Some(pos) => {
293                // Found a line break at `pos` - process the line and continue.
294                let (usable_until_line_break, _usable_rest) = usable.split_at(pos);
295                self.append_to_line_buffer(usable_until_line_break);
296
297                // We did split our chunk into `let (usable, rest) = ...` earlier.
298                // We then split usable into `let (usable_until_line_break, _usable_rest) = ...`.
299                // We know that `_usable_rest` and `rest` are consecutive in `chunk`!
300                // This is the combination of `_usable_rest` and `rest` expressed through `chunk`
301                // to get to the "real"/"complete" rest of data.
302                let rest = &self.chunk[usable_until_line_break.len()..];
303
304                // Skip the `\n` byte!
305                self.chunk = if rest.len() > 1 { &rest[1..] } else { &[] };
306
307                // Return the completed line.
308                Some(self.take_line(false))
309            }
310        }
311    }
312}
313
314#[cfg(test)]
315mod tests {
316    use crate::LineParsingOptions;
317    use crate::output_stream::LineReader;
318    use assertr::prelude::*;
319    use bytes::{Bytes, BytesMut};
320    use std::time::Duration;
321    use tokio::io::{AsyncWrite, AsyncWriteExt};
322    use tracing_test::traced_test;
323
324    pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
325        write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
326        tokio::time::sleep(Duration::from_millis(50)).await;
327        write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
328        tokio::time::sleep(Duration::from_millis(50)).await;
329        write.write_all("README.md\n".as_bytes()).await.unwrap();
330        tokio::time::sleep(Duration::from_millis(50)).await;
331        write.write_all("src\n".as_bytes()).await.unwrap();
332        tokio::time::sleep(Duration::from_millis(50)).await;
333        write.write_all("target\n".as_bytes()).await.unwrap();
334        tokio::time::sleep(Duration::from_millis(50)).await;
335    }
336
337    #[test]
338    #[traced_test]
339    fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
340        let mut line_buffer = BytesMut::new();
341        let mut collected_lines: Vec<String> = Vec::new();
342
343        let data = "❤️❤️❤️\n👍\n";
344        for byte in data.as_bytes() {
345            let lr = LineReader {
346                chunk: &[*byte],
347                line_buffer: &mut line_buffer,
348                last_line_length: None,
349                options: LineParsingOptions::default(),
350            };
351            for line in lr {
352                collected_lines.push(String::from_utf8_lossy(&line).to_string());
353            }
354        }
355
356        assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "👍"]);
357    }
358
359    #[test]
360    #[traced_test]
361    fn reclaims_line_buffer_space_before_collecting_new_line() {
362        let mut line_buffer = BytesMut::new();
363        let mut collected_lines: Vec<String> = Vec::new();
364        let mut bytes: Vec<Bytes> = Vec::new();
365
366        let data = "❤️❤️❤️\n❤️❤️❤️\n";
367        for byte in data.as_bytes() {
368            let lr = LineReader {
369                chunk: &[*byte],
370                line_buffer: &mut line_buffer,
371                last_line_length: None,
372                options: LineParsingOptions::default(),
373            };
374            for line in lr {
375                collected_lines.push(String::from_utf8_lossy(&line).to_string());
376                bytes.push(line);
377            }
378        }
379
380        let data = "❤️❤️❤️\n";
381        let lr = LineReader {
382            chunk: data.as_bytes(),
383            line_buffer: &mut line_buffer,
384            last_line_length: None,
385            options: LineParsingOptions::default(),
386        };
387        for line in lr {
388            collected_lines.push(String::from_utf8_lossy(&line).to_string());
389            bytes.push(line);
390        }
391
392        assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "❤️❤️❤️", "❤️❤️❤️"]);
393
394        logs_assert(|lines: &[&str]| {
395            match lines
396                .iter()
397                .filter(|line| line.contains("Could not reclaim 18 bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."))
398                .count()
399            {
400                3 => {}
401                n => return Err(format!("Expected exactly one log, but found {n}")),
402            };
403            Ok(())
404        });
405    }
406
407    #[test]
408    fn line_reader() {
409        // Helper function to reduce duplication in test cases
410        fn run_test_case(
411            test_name: &str,
412            chunk: &[u8],
413            line_buffer_before: &str,
414            line_buffer_after: &str,
415            expected_lines: &[&str],
416        ) {
417            let mut line_buffer = BytesMut::from(line_buffer_before);
418            let mut collected_lines: Vec<String> = Vec::new();
419
420            let lr = LineReader {
421                chunk,
422                line_buffer: &mut line_buffer,
423                last_line_length: None,
424                options: LineParsingOptions::default(),
425            };
426            for line in lr {
427                collected_lines.push(String::from_utf8_lossy(&line).to_string());
428            }
429
430            assert_that(line_buffer)
431                .with_detail_message(format!("Test case: {test_name}"))
432                .is_equal_to(line_buffer_after);
433
434            let expected_lines: Vec<String> =
435                expected_lines.iter().map(|s| s.to_string()).collect();
436
437            assert_that(collected_lines)
438                .with_detail_message(format!("Test case: {test_name}"))
439                .is_equal_to(expected_lines);
440        }
441
442        // Test case 1: Empty chunk
443        run_test_case(
444            "Empty chunk",
445            b"",
446            "previous: ",
447            "previous: ",
448            &[],
449        );
450
451        // Test case 2: Chunk with no newlines
452        run_test_case(
453            "Chunk with no newlines",
454            b"no newlines here",
455            "previous: ",
456            "previous: no newlines here",
457            &[],
458        );
459
460        // Test case 3: Single complete line
461        run_test_case("Single complete line", b"one line\n", "", "", &["one line"]);
462
463        // Test case 4: Multiple complete lines
464        run_test_case(
465            "Multiple complete lines",
466            b"first line\nsecond line\nthird line\n",
467            "",
468            "",
469            &["first line", "second line", "third line"],
470        );
471
472        // Test case 5: Partial line at the end
473        run_test_case(
474            "Partial line at the end",
475            b"complete line\npartial",
476            "",
477            "partial",
478            &["complete line"],
479        );
480
481        // Test case 6: Initial line with multiple newlines
482        run_test_case(
483            "Initial line with multiple newlines",
484            b"continuation\nmore lines\n",
485            "previous: ",
486            "",
487            &["previous: continuation", "more lines"],
488        );
489
490        // Test case 7: Invalid UTF8 data
491        run_test_case(
492            "Initial line with multiple newlines",
493            b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n",
494            "",
495            "",
496            &["valid utf8�(�� invalid utf8"],
497        );
498    }
499}