tokio_process_tools/output_stream/
mod.rs

1use bytes::{Buf, BytesMut};
2use std::io::BufRead;
3
/// Broadcast output stream implementation supporting multiple concurrent consumers.
pub mod broadcast;

// Crate-internal helper implementations shared by the stream flavors above.
// NOTE(review): contents not visible from this file — confirm against `impls.rs`.
pub(crate) mod impls;

/// Single subscriber output stream implementation for efficient single-consumer scenarios.
pub mod single_subscriber;

/// Marker trait implemented by all output stream types of this module.
///
/// We support the following implementations:
///
/// - [broadcast::BroadcastOutputStream]
/// - [single_subscriber::SingleSubscriberOutputStream]
pub trait OutputStream {}
17
/// Options controlling how raw stream data is read and buffered as chunks.
///
/// NOTE: The maximum possible memory consumption is: `chunk_size * channel_capacity`.
/// Although reaching that level requires:
/// 1. A receiver to listen for chunks.
/// 2. The channel getting full.
pub struct FromStreamOptions {
    /// The size of an individual chunk read from the read buffer in bytes.
    ///
    /// default: 16 * 1024 // 16 kb
    pub chunk_size: usize,

    /// The number of chunks held by the underlying async channel.
    ///
    /// When the subscriber (if present) is not fast enough to consume chunks equally fast or
    /// faster than they are read, this acts as a buffer holding not-yet processed messages.
    /// The bigger, the better, in terms of system resilience to write-spikes.
    /// Multiply with `chunk_size` to obtain the maximum amount of memory this will consume.
    pub channel_capacity: usize,
}

impl Default for FromStreamOptions {
    /// Returns the recommended defaults: 16 kb chunks and a 128-chunk channel,
    /// bounding the maximum buffered memory at 2 mb.
    fn default() -> Self {
        // 16 kb per individual chunk.
        const DEFAULT_CHUNK_SIZE: usize = 16 * 1024;
        // 16 kb * 128 = 2 mb maximum memory usage.
        const DEFAULT_CHANNEL_CAPACITY: usize = 128;

        FromStreamOptions {
            chunk_size: DEFAULT_CHUNK_SIZE,
            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
        }
    }
}
46
/// A "chunk" is an arbitrarily sized byte slice read from the underlying stream.
/// The slices' length is at max of the previously configured maximum `chunk_size`.
///
/// We use the word "chunk", as it is often used when processing collections in segments or when
/// dealing with buffered I/O operations where data arrives in variable-sized pieces.
///
/// In contrast to this, a "frame" typically carries more specific semantics. It usually implies a
/// complete logical unit with defined boundaries within a protocol or format. This we do not have
/// here.
///
/// Note: If the underlying stream is of lower buffer size, chunks of full `chunk_size` length may
/// never be observed.
///
/// Internally this wraps a [`bytes::Bytes`] handle, so cloning a `Chunk` is cheap and
/// does not copy the underlying data.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Chunk(bytes::Bytes);
61
62impl AsRef<[u8]> for Chunk {
63    fn as_ref(&self) -> &[u8] {
64        self.0.chunk()
65    }
66}
67
/// Strategy applied when the chunk channel's buffer is full while new data arrives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
    /// Drop the newest incoming data when the channel buffer is full.
    ///
    /// Reading from the observed stream is never slowed down, but slow consumers
    /// will miss (lag behind on) data.
    DropLatestIncomingIfBufferFull,

    /// Will not lead to "lagging" (and dropping frames in the process).
    /// But this lowers our speed at which we consume output and may affect the application
    /// captured, as their pipe buffer may get full, requiring the application /
    /// relying on the application to drop data instead of writing to stdout/stderr in order
    /// to not block.
    BlockUntilBufferHasSpace,
}
80
/// Control flag to indicate whether processing should continue or break.
///
/// Returning `Break` from an `Inspector`/`Collector` will let that instance stop visiting any
/// more data. Returning `Continue` keeps it subscribed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /// Interested in receiving additional data.
    Continue,

    /// Not interested in receiving additional data. Will let the `inspector`/`collector` stop.
    Break,
}
93
/// What should happen when a line is too long?
///
/// "Too long" is defined by [`LineParsingOptions::max_line_length`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
    /// Drop any additional data received after the current line was considered too long until
    /// the next newline character is observed, which then starts a new line.
    ///
    /// This is the default behavior.
    #[default]
    DropAdditionalData,

    /// Emit the current line when the maximum allowed length is reached.
    /// Any additional data received is immediately taken as the content of the next line.
    ///
    /// This option really just adds intermediate line breaks to not let any emitted line exceed the
    /// length limit.
    ///
    /// No data is dropped with this behavior.
    EmitAdditionalAsNewLines,
}
111
/// Configuration options for parsing lines from a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LineParsingOptions {
    /// Maximum length of a single line in bytes.
    /// When reached, further data won't be appended to the current line.
    /// The line will be emitted in its current state.
    ///
    /// A value of `0` means that "no limit" is imposed.
    ///
    /// Only set this to `0` when you absolutely trust the input stream! Remember that an observed
    /// stream maliciously writing endless amounts of data without ever writing a line break
    /// would starve this system from ever emitting a line and will lead to an infinite amount of
    /// memory being allocated to hold the line data, letting this process run out of memory!
    ///
    /// Defaults to 16 kilobytes.
    pub max_line_length: NumBytes,

    /// What should happen when a line is too long?
    pub overflow_behavior: LineOverflowBehavior,
}
132
133impl Default for LineParsingOptions {
134    fn default() -> Self {
135        Self {
136            max_line_length: 16.kilobytes(),
137            overflow_behavior: LineOverflowBehavior::default(),
138        }
139    }
140}
141
/// A wrapper type representing a number of bytes.
///
/// Use the [`NumBytesExt`] trait to conveniently create instances:
/// ```
/// use tokio_process_tools::NumBytesExt;
/// let kb = 16.kilobytes();
/// let mb = 2.megabytes();
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumBytes(pub usize);

impl NumBytes {
    /// Creates a NumBytes value of zero.
    pub fn zero() -> Self {
        NumBytes(0)
    }
}

/// Extension trait providing convenience-functions for creation of [`NumBytes`] of certain sizes.
pub trait NumBytesExt {
    /// Interprets the value as literal bytes.
    fn bytes(self) -> NumBytes;

    /// Interprets the value as kilobytes (value * 1024).
    fn kilobytes(self) -> NumBytes;

    /// Interprets the value as megabytes (value * 1024 * 1024).
    fn megabytes(self) -> NumBytes;
}

impl NumBytesExt for usize {
    fn bytes(self) -> NumBytes {
        NumBytes(self)
    }

    fn kilobytes(self) -> NumBytes {
        // 1 kilobyte == 1024 bytes.
        (self * 1024).bytes()
    }

    fn megabytes(self) -> NumBytes {
        // 1 megabyte == 1024 kilobytes.
        (self * 1024).kilobytes()
    }
}
185
/// Conceptually, this iterator appends the given byte slice to the current line buffer, which may
/// already hold some previously written data.
/// The resulting view of data is split by newlines (`\n`). Every completed line is yielded.
/// The remainder of the chunk, not completed with a newline character, will become the new content
/// of `line_buffer`.
///
/// The implementation tries to allocate as little as possible.
///
/// It can be expected that `line_buffer` does not grow beyond `options.max_line_length` bytes
/// **IF** any yielded line is dropped or cloned and **NOT** stored long term.
/// Only then can the underlying storage, used to capture that line, be reused to capture the
/// next line.
///
/// # Members
/// * `chunk` - New slice of bytes to process.
/// * `line_buffer` - Buffer for reading one line.
///   May hold previously seen, not-yet-closed, line-data.
pub(crate) struct LineReader<'c, 'b> {
    // New slice of bytes to process; shrinks towards empty as lines are yielded.
    chunk: &'c [u8],
    // Buffer for the current (possibly incomplete) line; may hold carry-over data.
    line_buffer: &'b mut BytesMut,
    // Length of the line yielded by the previous `next()` call. Used to try to
    // reclaim `line_buffer` capacity on the following call.
    last_line_length: Option<usize>,
    // Line length limit and overflow behavior.
    options: LineParsingOptions,
}
209
210impl<'c, 'b> LineReader<'c, 'b> {
211    pub fn new(
212        chunk: &'c [u8],
213        line_buffer: &'b mut BytesMut,
214        options: LineParsingOptions,
215    ) -> Self {
216        Self {
217            chunk,
218            line_buffer,
219            last_line_length: None,
220            options,
221        }
222    }
223
224    fn append_to_line_buffer(&mut self, chunk: &[u8]) {
225        self.line_buffer.extend_from_slice(chunk)
226    }
227
228    fn _take_line(&mut self) -> bytes::Bytes {
229        self.last_line_length = Some(self.line_buffer.len());
230        self.line_buffer.split().freeze()
231    }
232
233    fn take_line(&mut self, full_line_buffer: bool) -> bytes::Bytes {
234        if full_line_buffer {
235            match self.options.overflow_behavior {
236                LineOverflowBehavior::DropAdditionalData => {
237                    // Drop any additional (until the next newline character!)
238                    // and return the current (not regularly finished) line.
239                    let _ = self.chunk.skip_until(b'\n');
240                    self._take_line()
241                }
242                LineOverflowBehavior::EmitAdditionalAsNewLines => {
243                    // Do NOT drop any additional and return the current (not regularly finished)
244                    // line. This will lead to all additional data starting a new line in the
245                    // next iteration.
246                    self._take_line()
247                }
248            }
249        } else {
250            self._take_line()
251        }
252    }
253}
254
impl Iterator for LineReader<'_, '_> {
    type Item = bytes::Bytes;

    /// Yields the next completed line, or `None` once `chunk` is exhausted.
    ///
    /// Leftover, not newline-terminated data remains in `line_buffer` and must be
    /// handled by the caller.
    fn next(&mut self) -> Option<Self::Item> {
        // Ensure we never go out of bounds with our line buffer.
        // This also ensures that no-one creates a `LineReader` with a line buffer that is already
        // too large for our current `options.max_line_length`.
        if self.options.max_line_length.0 != 0 {
            assert!(self.line_buffer.len() <= self.options.max_line_length.0);
        }

        // Note: This will always be seen, even when the processed chunk ends with `\n`, as
        // every iterator must once return `None` to signal that it has finished!
        // And this, we only do later.
        if let Some(last_line_length) = self.last_line_length.take() {
            // The previous iteration yielded line of this length!
            // Reclaiming only succeeds if the yielded `Bytes` handle was dropped in the meantime.
            let reclaimed = self.line_buffer.try_reclaim(last_line_length);
            if !reclaimed {
                tracing::warn!(
                    "Could not reclaim {last_line_length} bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."
                );
            }
        }

        // Code would work without this early-return. But this lets us skip a lot of actions on
        // empty slices.
        if self.chunk.is_empty() {
            return None;
        }

        // Through our assert above, the first operand will always be bigger!
        // A `max_line_length` of 0 means "no limit" is imposed.
        let remaining_line_length = if self.options.max_line_length.0 == 0 {
            usize::MAX
        } else {
            self.options.max_line_length.0 - self.line_buffer.len()
        };

        // The previous iteration might have filled the line buffer completely.
        // Apply overflow behavior.
        if remaining_line_length == 0 {
            return Some(self.take_line(true));
        }

        // We have space remaining in our line buffer.
        // Split the chunk into two a usable portion (which would not "overflow" the line buffer)
        // and the rest.
        let (usable, rest) = self
            .chunk
            .split_at(usize::min(self.chunk.len(), remaining_line_length));

        // Search for the next newline character in the usable portion of our current chunk.
        match usable.iter().position(|b| *b == b'\n') {
            None => {
                // No line break found! Consume the whole usable chunk portion.
                self.append_to_line_buffer(usable);
                self.chunk = rest;

                if rest.is_empty() {
                    // Return None, as we have no more data to process.
                    // Leftover data in `line_buffer` must be taken care of externally!
                    None
                } else {
                    // Line now full. Would overflow using rest. Return the current line!
                    // (Unreachable with `max_line_length == 0`, as `rest` is then always empty.)
                    assert_eq!(self.line_buffer.len(), self.options.max_line_length.0);
                    Some(self.take_line(true))
                }
            }
            Some(pos) => {
                // Found a line break at `pos` - process the line and continue.
                let (usable_until_line_break, _usable_rest) = usable.split_at(pos);
                self.append_to_line_buffer(usable_until_line_break);

                // We did split our chunk into `let (usable, rest) = ...` earlier.
                // We then split usable into `let (usable_until_line_break, _usable_rest) = ...`.
                // We know that `_usable_rest` and `rest` are consecutive in `chunk`!
                // This is the combination of `_usable_rest` and `rest` expressed through `chunk`
                // to get to the "real"/"complete" rest of data.
                let rest = &self.chunk[usable_until_line_break.len()..];

                // Skip the `\n` byte! (`rest[0]` is guaranteed to be that newline here.)
                self.chunk = if rest.len() > 1 { &rest[1..] } else { &[] };

                // Return the completed line.
                Some(self.take_line(false))
            }
        }
    }
}
343
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::io::{AsyncWrite, AsyncWriteExt};

    /// Writes a well-known sequence of newline-terminated entries to `write`,
    /// sleeping between writes to simulate a process slowly emitting output.
    pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
        write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("README.md\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("src\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("target\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    mod line_reader {
        use crate::output_stream::LineReader;
        use crate::{LineOverflowBehavior, LineParsingOptions, NumBytes};
        use assertr::prelude::*;
        use bytes::{Bytes, BytesMut};
        use tracing_test::traced_test;

        #[test]
        #[traced_test]
        fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
            let mut line_buffer = BytesMut::new();
            let mut collected_lines: Vec<String> = Vec::new();

            let data = "❤️❤️❤️\n👍\n";
            for byte in data.as_bytes() {
                let lr = LineReader {
                    chunk: &[*byte],
                    line_buffer: &mut line_buffer,
                    last_line_length: None,
                    options: LineParsingOptions::default(),
                };
                for line in lr {
                    collected_lines.push(String::from_utf8_lossy(&line).to_string());
                }
            }

            assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "👍"]);
        }

        #[test]
        #[traced_test]
        fn reclaims_line_buffer_space_before_collecting_new_line() {
            let mut line_buffer = BytesMut::new();
            let mut collected_lines: Vec<String> = Vec::new();
            // Storing the yielded `Bytes` handles prevents `try_reclaim` from
            // succeeding, which must produce the warning log asserted below.
            let mut bytes: Vec<Bytes> = Vec::new();

            let data = "❤️❤️❤️\n❤️❤️❤️\n";
            for byte in data.as_bytes() {
                let lr = LineReader {
                    chunk: &[*byte],
                    line_buffer: &mut line_buffer,
                    last_line_length: None,
                    options: LineParsingOptions::default(),
                };
                for line in lr {
                    collected_lines.push(String::from_utf8_lossy(&line).to_string());
                    bytes.push(line);
                }
            }

            let data = "❤️❤️❤️\n";
            let lr = LineReader {
                chunk: data.as_bytes(),
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options: LineParsingOptions::default(),
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
                bytes.push(line);
            }

            assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "❤️❤️❤️", "❤️❤️❤️"]);

            // One warning per yielded line (3 lines total), as every yielded
            // `Bytes` is kept alive in `bytes` above.
            logs_assert(|lines: &[&str]| {
                match lines
                    .iter()
                    .filter(|line| line.contains("Could not reclaim 18 bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."))
                    .count()
                {
                    3 => {}
                    // Fixed: the message previously claimed "exactly one" while
                    // three matching logs are expected.
                    n => return Err(format!("Expected exactly three matching logs, but found {n}")),
                };
                Ok(())
            });
        }

        /// Helper to reduce duplication: runs one `LineReader` pass over `chunk`
        /// and asserts both the collected lines and the final buffer content.
        fn run_test_case(
            chunk: &[u8],
            line_buffer_before: &str,
            line_buffer_after: &str,
            expected_lines: &[&str],
            options: LineParsingOptions,
        ) {
            let mut line_buffer = BytesMut::from(line_buffer_before);
            let mut collected_lines: Vec<String> = Vec::new();

            let lr = LineReader {
                chunk,
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options,
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
            }

            assert_that(line_buffer).is_equal_to(line_buffer_after);

            let expected_lines: Vec<String> =
                expected_lines.iter().map(|s| s.to_string()).collect();

            assert_that(collected_lines).is_equal_to(expected_lines);
        }

        #[test]
        fn empty_chunk() {
            run_test_case(
                b"",
                "previous: ",
                "previous: ",
                &[],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn chunk_without_any_newlines() {
            run_test_case(
                b"no newlines here",
                "previous: ",
                "previous: no newlines here",
                &[],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn single_completed_line() {
            run_test_case(
                b"one line\n",
                "",
                "",
                &["one line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn multiple_completed_lines() {
            run_test_case(
                b"first line\nsecond line\nthird line\n",
                "",
                "",
                &["first line", "second line", "third line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn partial_line_at_the_end() {
            run_test_case(
                b"complete line\npartial",
                "",
                "partial",
                &["complete line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn initial_line_with_multiple_newlines() {
            run_test_case(
                b"continuation\nmore lines\n",
                "previous: ",
                "",
                &["previous: continuation", "more lines"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn invalid_utf8_data() {
            run_test_case(
                b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n",
                "",
                "",
                &["valid utf8�(�� invalid utf8"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn rest_of_too_long_line_is_dropped() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["1234", "abcd"],
                LineParsingOptions {
                    max_line_length: NumBytes(4), // Only allow lines with 4 ascii chars (or equiv.) max.
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        #[test]
        fn rest_of_too_long_line_is_returned_as_additional_lines() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["1234", "5678", "9", "abcd", "efgh", "i"],
                LineParsingOptions {
                    max_line_length: NumBytes(4), // Only allow lines with 4 ascii chars (or equiv.) max.
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        #[test]
        fn max_line_length_of_0_disables_line_length_checks_test1() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["123456789", "abcdefghi"],
                LineParsingOptions {
                    max_line_length: NumBytes(0),
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        #[test]
        fn max_line_length_of_0_disables_line_length_checks_test2() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["123456789", "abcdefghi"],
                LineParsingOptions {
                    max_line_length: NumBytes(0),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        #[test]
        fn leading_and_trailing_whitespace_is_preserved() {
            run_test_case(
                b"   123456789     \n    abcdefghi        \n",
                "",
                "",
                &["   123456789     ", "    abcdefghi        "],
                LineParsingOptions {
                    max_line_length: NumBytes(0),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }
    }
}