tokio_process_tools/output_stream/mod.rs
1use bytes::{Buf, BytesMut};
2use std::io::BufRead;
3
/// Default chunk size read from the source stream. 16 kilobytes.
pub const DEFAULT_CHUNK_SIZE: NumBytes = NumBytes(16 * 1024); // 16 kb

/// Default channel capacity for stdout and stderr streams. 128 slots.
/// Together with [DEFAULT_CHUNK_SIZE] this bounds the maximum buffered memory:
/// 16 kb * 128 = 2 mb per stream.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 128;
9
/// Broadcast output stream implementation supporting multiple concurrent consumers.
pub mod broadcast;

/// Crate-internal implementation details. Not part of the public API.
pub(crate) mod impls;

/// Single subscriber output stream implementation for efficient single-consumer scenarios.
pub mod single_subscriber;
17
/// Common interface of all output stream implementations.
///
/// We support the following implementations:
///
/// - [broadcast::BroadcastOutputStream]
/// - [single_subscriber::SingleSubscriberOutputStream]
pub trait OutputStream {
    /// The maximum size of every chunk read by the backing `stream_reader`.
    fn chunk_size(&self) -> NumBytes;

    /// The number of chunks held by the underlying async channel.
    fn channel_capacity(&self) -> usize;

    /// Type of stream. Can be "stdout" or "stderr". But we do not guarantee this output.
    /// It should only be used for logging/debugging.
    fn name(&self) -> &'static str;
}
33
/// Options used when creating an output stream from a raw source stream.
///
/// NOTE: The maximum possible memory consumption is: `chunk_size * channel_capacity`.
/// Although reaching that level requires:
/// 1. A receiver to listen for chunks.
/// 2. The channel getting full.
pub struct FromStreamOptions {
    /// The size of an individual chunk read from the read buffer in bytes.
    ///
    /// default: 16 * 1024 // 16 kb
    pub chunk_size: NumBytes,

    /// The number of chunks held by the underlying async channel.
    ///
    /// When the subscriber (if present) is not fast enough to consume chunks equally fast or faster
    /// than them getting read, this acts as a buffer to hold not-yet processed messages.
    /// The bigger, the better, in terms of system resilience to write-spikes.
    /// Multiply with `chunk_size` to obtain the amount of system resources this will consume at
    /// max.
    pub channel_capacity: usize,
}
53
54impl Default for FromStreamOptions {
55 fn default() -> Self {
56 Self {
57 chunk_size: DEFAULT_CHUNK_SIZE,
58 channel_capacity: DEFAULT_CHANNEL_CAPACITY, // => 16 kb * 128 = 2 mb (max memory usage consumption)
59 }
60 }
61}
62
/// A "chunk" is an arbitrarily sized byte slice read from the underlying stream.
/// The slices' length is at max of the previously configured maximum `chunk_size`.
///
/// We use the word "chunk", as it is often used when processing collections in segments or when
/// dealing with buffered I/O operations where data arrives in variable-sized pieces.
///
/// In contrast to this, a "frame" typically carries more specific semantics. It usually implies a
/// complete logical unit with defined boundaries within a protocol or format. This we do not have
/// here.
///
/// Note: If the underlying stream is of lower buffer size, chunks of full `chunk_size` length may
/// never be observed.
///
/// Wraps a [bytes::Bytes], so cloning a `Chunk` is cheap (reference-counted, no data copy).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Chunk(bytes::Bytes);
77
78impl AsRef<[u8]> for Chunk {
79 fn as_ref(&self) -> &[u8] {
80 self.0.chunk()
81 }
82}
83
/// Strategy applied when chunks are read faster than a receiver consumes them,
/// i.e. when the underlying channel buffer runs full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
    /// When the channel buffer is full, the latest incoming chunk is dropped instead of
    /// waiting for free buffer space.
    ///
    /// Reading from the observed process never stalls, but captured output data may be
    /// lost under load.
    DropLatestIncomingIfBufferFull,

    /// Will not lead to "lagging" (and dropping frames in the process).
    /// But this lowers our speed at which we consume output and may affect the application
    /// captured, as their pipe buffer may get full, requiring the application /
    /// relying on the application to drop data instead of writing to stdout/stderr in order
    /// to not block.
    BlockUntilBufferHasSpace,
}
96
/// Control flag to indicate whether processing should continue or break.
///
/// Returning `Break` from an `Inspector`/`Collector` will let that instance stop visiting any
/// more data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /// Interested in receiving additional data. Processing continues.
    Continue,

    /// Not interested in receiving additional data. Will let the `inspector`/`collector` stop.
    Break,
}
109
/// What should happen when a line is too long (exceeds the configured maximum line length)?
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
    /// Drop any additional data received after the current line was considered too long until
    /// the next newline character is observed, which then starts a new line.
    ///
    /// This is the default behavior.
    #[default]
    DropAdditionalData,

    /// Emit the current line when the maximum allowed length is reached.
    /// Any additional data received is immediately taken as the content of the next line.
    ///
    /// This option really just adds intermediate line breaks to not let any emitted line exceed the
    /// length limit.
    ///
    /// No data is dropped with this behavior.
    EmitAdditionalAsNewLines,
}
127
/// Configuration options for parsing lines from a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LineParsingOptions {
    /// Maximum length of a single line in bytes.
    /// When reached, further data won't be appended to the current line.
    /// The line will be emitted in its current state.
    ///
    /// A value of `0` means that "no limit" is imposed.
    ///
    /// Only set this to `0` when you absolutely trust the input stream! Remember that an observed
    /// stream maliciously writing endless amounts of data without ever writing a line break
    /// would starve this system from ever emitting a line and will lead to an infinite amount of
    /// memory being allocated to hold the line data, letting this process run out of memory!
    ///
    /// Defaults to 16 kilobytes.
    pub max_line_length: NumBytes,

    /// What should happen when a line is too long?
    /// See [LineOverflowBehavior] for the available strategies.
    pub overflow_behavior: LineOverflowBehavior,
}
148
149impl Default for LineParsingOptions {
150 fn default() -> Self {
151 Self {
152 max_line_length: 16.kilobytes(),
153 overflow_behavior: LineOverflowBehavior::default(),
154 }
155 }
156}
157
/// A wrapper type representing a number of bytes.
///
/// Internally stores a plain `usize` byte count.
///
/// Use the [`NumBytesExt`] trait to conveniently create instances:
/// ```
/// use tokio_process_tools::NumBytesExt;
/// let kb = 16.kilobytes();
/// let mb = 2.megabytes();
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumBytes(usize);
168
169impl NumBytes {
170 /// Creates a NumBytes value of zero.
171 pub fn zero() -> Self {
172 Self(0)
173 }
174
175 /// The amount of bytes represented by this instance.
176 pub fn bytes(&self) -> usize {
177 self.0
178 }
179}
180
/// Extension trait providing convenience-functions for creation of [`NumBytes`] of certain sizes.
pub trait NumBytesExt {
    /// Interprets the value as literal bytes.
    fn bytes(self) -> NumBytes;

    /// Interprets the value as kilobytes (value * 1024).
    fn kilobytes(self) -> NumBytes;

    /// Interprets the value as megabytes (value * 1024 * 1024).
    fn megabytes(self) -> NumBytes;
}
192
193impl NumBytesExt for usize {
194 fn bytes(self) -> NumBytes {
195 NumBytes(self)
196 }
197
198 fn kilobytes(self) -> NumBytes {
199 NumBytes(self * 1024)
200 }
201
202 fn megabytes(self) -> NumBytes {
203 NumBytes(self * 1024 * 1024)
204 }
205}
206
/// Conceptually, this iterator appends the given byte slice to the current line buffer, which may
/// already hold some previously written data.
/// The resulting view of data is split by newlines (`\n`). Every completed line is yielded.
/// The remainder of the chunk, not completed with a newline character, will become the new content
/// of `line_buffer`.
///
/// The implementation tries to allocate as little as possible.
///
/// It can be expected that `line_buffer` does not grow beyond `options.max_line_length` bytes
/// **IF** any yielded line is dropped or cloned and **NOT** stored long term.
/// Only then can the underlying storage, used to capture that line, be reused to capture the
/// next line.
pub(crate) struct LineReader<'c, 'b> {
    /// New slice of bytes to process. Advanced in place as data is consumed.
    chunk: &'c [u8],
    /// Buffer for reading one line.
    /// May hold previously seen, not-yet-closed, line-data.
    line_buffer: &'b mut BytesMut,
    /// Length of the line yielded by the previous `next()` call, if any.
    /// Used to try to reclaim that buffer space on the following call.
    last_line_length: Option<usize>,
    /// Line length limit and overflow behavior.
    options: LineParsingOptions,
}
230
231impl<'c, 'b> LineReader<'c, 'b> {
232 pub fn new(
233 chunk: &'c [u8],
234 line_buffer: &'b mut BytesMut,
235 options: LineParsingOptions,
236 ) -> Self {
237 Self {
238 chunk,
239 line_buffer,
240 last_line_length: None,
241 options,
242 }
243 }
244
245 fn append_to_line_buffer(&mut self, chunk: &[u8]) {
246 self.line_buffer.extend_from_slice(chunk)
247 }
248
249 fn _take_line(&mut self) -> bytes::Bytes {
250 self.last_line_length = Some(self.line_buffer.len());
251 self.line_buffer.split().freeze()
252 }
253
254 fn take_line(&mut self, full_line_buffer: bool) -> bytes::Bytes {
255 if full_line_buffer {
256 match self.options.overflow_behavior {
257 LineOverflowBehavior::DropAdditionalData => {
258 // Drop any additional (until the next newline character!)
259 // and return the current (not regularly finished) line.
260 let _ = self.chunk.skip_until(b'\n');
261 self._take_line()
262 }
263 LineOverflowBehavior::EmitAdditionalAsNewLines => {
264 // Do NOT drop any additional and return the current (not regularly finished)
265 // line. This will lead to all additional data starting a new line in the
266 // next iteration.
267 self._take_line()
268 }
269 }
270 } else {
271 self._take_line()
272 }
273 }
274}
275
impl Iterator for LineReader<'_, '_> {
    /// One completed line, without its trailing `\n`.
    type Item = bytes::Bytes;

    fn next(&mut self) -> Option<Self::Item> {
        // Ensure we never go out of bounds with our line buffer.
        // This also ensures that no-one creates a `LineReader` with a line buffer that is already
        // too large for our current `options.max_line_length`.
        // (A `max_line_length` of 0 means "no limit"; the check is skipped then.)
        if self.options.max_line_length.0 != 0 {
            assert!(self.line_buffer.len() <= self.options.max_line_length.0);
        }

        // Note: This will always be seen, even when the processed chunk ends with `\n`, as
        // every iterator must once return `None` to signal that it has finished!
        // And this, we only do later.
        if let Some(last_line_length) = self.last_line_length.take() {
            // The previous iteration yielded a line of this length! If the caller already
            // dropped (or cloned) that `Bytes`, its storage can be reused for the next line.
            let reclaimed = self.line_buffer.try_reclaim(last_line_length);
            if !reclaimed {
                tracing::warn!(
                    "Could not reclaim {last_line_length} bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."
                );
            }
        }

        // Code would work without this early-return. But this lets us skip a lot of actions on
        // empty slices.
        if self.chunk.is_empty() {
            return None;
        }

        // Through our assert above, the first operand will always be bigger!
        let remaining_line_length = if self.options.max_line_length.0 == 0 {
            usize::MAX
        } else {
            self.options.max_line_length.0 - self.line_buffer.len()
        };

        // The previous iteration might have filled the line buffer completely.
        // Apply overflow behavior.
        if remaining_line_length == 0 {
            return Some(self.take_line(true));
        }

        // We have space remaining in our line buffer.
        // Split the chunk into a usable portion (which would not "overflow" the line buffer)
        // and the rest.
        let (usable, rest) = self
            .chunk
            .split_at(usize::min(self.chunk.len(), remaining_line_length));

        // Search for the next newline character in the usable portion of our current chunk.
        match usable.iter().position(|b| *b == b'\n') {
            None => {
                // No line break found! Consume the whole usable chunk portion.
                self.append_to_line_buffer(usable);
                self.chunk = rest;

                if rest.is_empty() {
                    // Return None, as we have no more data to process.
                    // Leftover data in `line_buffer` must be taken care of externally!
                    None
                } else {
                    // Line now full. Would overflow using rest. Return the current line!
                    assert_eq!(self.line_buffer.len(), self.options.max_line_length.0);
                    Some(self.take_line(true))
                }
            }
            Some(pos) => {
                // Found a line break at `pos` - process the line and continue.
                let (usable_until_line_break, _usable_rest) = usable.split_at(pos);
                self.append_to_line_buffer(usable_until_line_break);

                // We did split our chunk into `let (usable, rest) = ...` earlier.
                // We then split usable into `let (usable_until_line_break, _usable_rest) = ...`.
                // We know that `_usable_rest` and `rest` are consecutive in `chunk`!
                // This is the combination of `_usable_rest` and `rest` expressed through `chunk`
                // to get to the "real"/"complete" rest of data.
                let rest = &self.chunk[usable_until_line_break.len()..];

                // Skip the `\n` byte! (`rest` always starts with the newline just found.)
                self.chunk = if rest.len() > 1 { &rest[1..] } else { &[] };

                // Return the completed line.
                Some(self.take_line(false))
            }
        }
    }
}
364
365#[cfg(test)]
366mod tests {
367 use std::time::Duration;
368 use tokio::io::{AsyncWrite, AsyncWriteExt};
369
370 pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
371 write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
372 tokio::time::sleep(Duration::from_millis(50)).await;
373 write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
374 tokio::time::sleep(Duration::from_millis(50)).await;
375 write.write_all("README.md\n".as_bytes()).await.unwrap();
376 tokio::time::sleep(Duration::from_millis(50)).await;
377 write.write_all("src\n".as_bytes()).await.unwrap();
378 tokio::time::sleep(Duration::from_millis(50)).await;
379 write.write_all("target\n".as_bytes()).await.unwrap();
380 tokio::time::sleep(Duration::from_millis(50)).await;
381 }
382
383 mod line_reader {
384 use crate::output_stream::LineReader;
385 use crate::{LineOverflowBehavior, LineParsingOptions, NumBytes, NumBytesExt};
386 use assertr::prelude::*;
387 use bytes::{Bytes, BytesMut};
388 use tracing_test::traced_test;
389
390 #[test]
391 #[traced_test]
392 fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
393 let mut line_buffer = BytesMut::new();
394 let mut collected_lines: Vec<String> = Vec::new();
395
396 let data = "❤️❤️❤️\n👍\n";
397 for byte in data.as_bytes() {
398 let lr = LineReader {
399 chunk: &[*byte],
400 line_buffer: &mut line_buffer,
401 last_line_length: None,
402 options: LineParsingOptions::default(),
403 };
404 for line in lr {
405 collected_lines.push(String::from_utf8_lossy(&line).to_string());
406 }
407 }
408
409 assert_that(collected_lines).contains_exactly(["❤️❤️❤️", "👍"]);
410 }
411
412 #[test]
413 #[traced_test]
414 fn reclaims_line_buffer_space_before_collecting_new_line() {
415 let mut line_buffer = BytesMut::new();
416 let mut collected_lines: Vec<String> = Vec::new();
417 let mut bytes: Vec<Bytes> = Vec::new();
418
419 let data = "❤️❤️❤️\n❤️❤️❤️\n";
420 for byte in data.as_bytes() {
421 let lr = LineReader {
422 chunk: &[*byte],
423 line_buffer: &mut line_buffer,
424 last_line_length: None,
425 options: LineParsingOptions::default(),
426 };
427 for line in lr {
428 collected_lines.push(String::from_utf8_lossy(&line).to_string());
429 bytes.push(line);
430 }
431 }
432
433 let data = "❤️❤️❤️\n";
434 let lr = LineReader {
435 chunk: data.as_bytes(),
436 line_buffer: &mut line_buffer,
437 last_line_length: None,
438 options: LineParsingOptions::default(),
439 };
440 for line in lr {
441 collected_lines.push(String::from_utf8_lossy(&line).to_string());
442 bytes.push(line);
443 }
444
445 assert_that(collected_lines).contains_exactly(["❤️❤️❤️", "❤️❤️❤️", "❤️❤️❤️"]);
446
447 logs_assert(|lines: &[&str]| {
448 match lines
449 .iter()
450 .filter(|line| line.contains("Could not reclaim 18 bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."))
451 .count()
452 {
453 3 => {}
454 n => return Err(format!("Expected exactly one log, but found {n}")),
455 };
456 Ok(())
457 });
458 }
459
460 // Helper function to reduce duplication in test cases
461 fn run_test_case(
462 chunk: &[u8],
463 line_buffer_before: &str,
464 line_buffer_after: &str,
465 expected_lines: &[&str],
466 options: LineParsingOptions,
467 ) {
468 let mut line_buffer = BytesMut::from(line_buffer_before);
469 let mut collected_lines: Vec<String> = Vec::new();
470
471 let lr = LineReader {
472 chunk,
473 line_buffer: &mut line_buffer,
474 last_line_length: None,
475 options,
476 };
477 for line in lr {
478 collected_lines.push(String::from_utf8_lossy(&line).to_string());
479 }
480
481 assert_that(line_buffer).is_equal_to(line_buffer_after);
482
483 let expected_lines: Vec<String> =
484 expected_lines.iter().map(|s| s.to_string()).collect();
485
486 assert_that(collected_lines).is_equal_to(expected_lines);
487 }
488
489 #[test]
490 fn empty_chunk() {
491 run_test_case(
492 b"",
493 "previous: ",
494 "previous: ",
495 &[],
496 LineParsingOptions::default(),
497 );
498 }
499
500 #[test]
501 fn chunk_without_any_newlines() {
502 run_test_case(
503 b"no newlines here",
504 "previous: ",
505 "previous: no newlines here",
506 &[],
507 LineParsingOptions::default(),
508 );
509 }
510
511 #[test]
512 fn single_completed_line() {
513 run_test_case(
514 b"one line\n",
515 "",
516 "",
517 &["one line"],
518 LineParsingOptions::default(),
519 );
520 }
521
522 #[test]
523 fn multiple_completed_lines() {
524 run_test_case(
525 b"first line\nsecond line\nthird line\n",
526 "",
527 "",
528 &["first line", "second line", "third line"],
529 LineParsingOptions::default(),
530 );
531 }
532
533 #[test]
534 fn partial_line_at_the_end() {
535 run_test_case(
536 b"complete line\npartial",
537 "",
538 "partial",
539 &["complete line"],
540 LineParsingOptions::default(),
541 );
542 }
543
544 #[test]
545 fn initial_line_with_multiple_newlines() {
546 run_test_case(
547 b"continuation\nmore lines\n",
548 "previous: ",
549 "",
550 &["previous: continuation", "more lines"],
551 LineParsingOptions::default(),
552 );
553 }
554
555 #[test]
556 fn invalid_utf8_data() {
557 run_test_case(
558 b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n",
559 "",
560 "",
561 &["valid utf8�(�� invalid utf8"],
562 LineParsingOptions::default(),
563 );
564 }
565
566 #[test]
567 fn rest_of_too_long_line_is_dropped() {
568 run_test_case(
569 b"123456789\nabcdefghi\n",
570 "",
571 "",
572 &["1234", "abcd"],
573 LineParsingOptions {
574 max_line_length: 4.bytes(), // Only allow lines with 4 ascii chars (or equiv.) max.
575 overflow_behavior: LineOverflowBehavior::DropAdditionalData,
576 },
577 );
578 }
579
580 #[test]
581 fn rest_of_too_long_line_is_returned_as_additional_lines() {
582 run_test_case(
583 b"123456789\nabcdefghi\n",
584 "",
585 "",
586 &["1234", "5678", "9", "abcd", "efgh", "i"],
587 LineParsingOptions {
588 max_line_length: 4.bytes(), // Only allow lines with 4 ascii chars (or equiv.) max.
589 overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
590 },
591 );
592 }
593
594 #[test]
595 fn max_line_length_of_0_disables_line_length_checks_test1() {
596 run_test_case(
597 b"123456789\nabcdefghi\n",
598 "",
599 "",
600 &["123456789", "abcdefghi"],
601 LineParsingOptions {
602 max_line_length: NumBytes::zero(),
603 overflow_behavior: LineOverflowBehavior::DropAdditionalData,
604 },
605 );
606 }
607
608 #[test]
609 fn max_line_length_of_0_disables_line_length_checks_test2() {
610 run_test_case(
611 b"123456789\nabcdefghi\n",
612 "",
613 "",
614 &["123456789", "abcdefghi"],
615 LineParsingOptions {
616 max_line_length: NumBytes::zero(),
617 overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
618 },
619 );
620 }
621
622 #[test]
623 fn leading_and_trailing_whitespace_is_preserved() {
624 run_test_case(
625 b" 123456789 \n abcdefghi \n",
626 "",
627 "",
628 &[" 123456789 ", " abcdefghi "],
629 LineParsingOptions {
630 max_line_length: NumBytes::zero(),
631 overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
632 },
633 );
634 }
635 }
636}