// tokio_process_tools/output_stream/mod.rs
1use bytes::{Buf, BytesMut};
2use std::io::BufRead;
3
4/// Broadcast output stream implementation supporting multiple concurrent consumers.
5pub mod broadcast;
6
7pub(crate) mod impls;
8
9/// Single subscriber output stream implementation for efficient single-consumer scenarios.
10pub mod single_subscriber;
11
/// Marker trait implemented by all output stream variants of this module.
///
/// We support the following implementations:
///
/// - [broadcast::BroadcastOutputStream]
/// - [single_subscriber::SingleSubscriberOutputStream]
///
/// NOTE(review): The trait currently declares no methods; it only groups the
/// implementations listed above. Confirm in `impls` before relying on more.
pub trait OutputStream {}
17
/// Options controlling how raw bytes are read from an output stream and
/// forwarded through the internal channel.
///
/// NOTE: The maximum possible memory consumption is: `chunk_size * channel_capacity`.
/// Although reaching that level requires:
/// 1. A receiver to listen for chunks.
/// 2. The channel getting full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FromStreamOptions {
    /// The size of an individual chunk read from the read buffer in bytes.
    ///
    /// default: 16 * 1024 // 16 kb
    pub chunk_size: usize,

    /// The number of chunks held by the underlying async channel.
    ///
    /// When the subscriber (if present) is not fast enough to consume chunks equally fast or faster
    /// than them getting read, this acts as a buffer to hold not-yet processed messages.
    /// The bigger, the better, in terms of system resilience to write-spikes.
    /// Multiply with `chunk_size` to obtain the amount of system resources this will consume at
    /// max.
    pub channel_capacity: usize,
}
37
38impl Default for FromStreamOptions {
39 fn default() -> Self {
40 Self {
41 chunk_size: 16 * 1024, // 16 kb
42 channel_capacity: 128, // => 16 kb * 128 = 2 mb (max memory usage consumption)
43 }
44 }
45}
46
/// A "chunk" is an arbitrarily sized byte slice read from the underlying stream.
/// The slices' length is at max of the previously configured maximum `chunk_size`.
///
/// We use the word "chunk", as it is often used when processing collections in segments or when
/// dealing with buffered I/O operations where data arrives in variable-sized pieces.
///
/// In contrast to this, a "frame" typically carries more specific semantics. It usually implies a
/// complete logical unit with defined boundaries within a protocol or format. This we do not have
/// here.
///
/// Note: If the underlying stream is of lower buffer size, chunks of full `chunk_size` length may
/// never be observed.
///
/// Access the raw bytes through the [`AsRef<[u8]>`] implementation.
/// Cloning is cheap: `bytes::Bytes` clones are reference-counted, not copied.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Chunk(bytes::Bytes);
61
62impl AsRef<[u8]> for Chunk {
63 fn as_ref(&self) -> &[u8] {
64 self.0.chunk()
65 }
66}
67
/// Strategy applied when chunks are produced faster than they are consumed
/// and the internal channel buffer runs full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
    /// When the channel buffer is full, drop the most recent incoming chunk
    /// instead of waiting for buffer space.
    ///
    /// This keeps reading from the observed process fast, at the cost of
    /// potentially losing output data.
    ///
    /// NOTE(review): The concrete drop behavior is implemented in the stream
    /// implementations (`broadcast` / `single_subscriber`) — confirm there.
    DropLatestIncomingIfBufferFull,

    /// Will not lead to "lagging" (and dropping frames in the process).
    /// But this lowers our speed at which we consume output and may affect the application
    /// captured, as their pipe buffer may get full, requiring the application /
    /// relying on the application to drop data instead of writing to stdout/stderr in order
    /// to not block.
    BlockUntilBufferHasSpace,
}
80
/// Control flag to indicate whether processing should continue or break.
///
/// Returning `Break` from an `Inspector`/`Collector` will let that instance stop visiting any
/// more data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /// Interested in receiving additional data.
    Continue,

    /// Not interested in receiving additional data. Will let the `inspector`/`collector` stop.
    /// Already-received data is unaffected; this only stops future visits.
    Break,
}
93
/// What should happen when a line is too long?
///
/// "Too long" means the line buffer reached `LineParsingOptions::max_line_length`
/// before a `\n` was observed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
    /// Drop any additional data received after the current line was considered too long until
    /// the next newline character is observed, which then starts a new line.
    ///
    /// NOTE(review): Dropping scans for the newline within the chunk currently
    /// being processed (see `LineReader::take_line`); it does not appear to
    /// persist a "dropping" state across chunk boundaries — confirm.
    #[default]
    DropAdditionalData,

    /// Emit the current line when the maximum allowed length is reached.
    /// Any additional data received is immediately taken as the content of the next line.
    ///
    /// This option really just adds intermediate line breaks to not let any emitted line exceed the
    /// length limit.
    ///
    /// No data is dropped with this behavior.
    EmitAdditionalAsNewLines,
}
111
/// Configuration options for parsing lines from a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LineParsingOptions {
    /// Maximum length of a single line in bytes.
    /// When reached, further data won't be appended to the current line.
    /// The line will be emitted in its current state.
    ///
    /// A value of `0` means that "no limit" is imposed.
    ///
    /// Only set this to `0` when you absolutely trust the input stream! Remember that an observed
    /// stream maliciously writing endless amounts of data without ever writing a line break
    /// would starve this system from ever emitting a line and will lead to an infinite amount of
    /// memory being allocated to hold the line data, letting this process run out of memory!
    ///
    /// Defaults to 16 kilobytes.
    pub max_line_length: NumBytes,

    /// What should happen when a line is too long?
    /// See [`LineOverflowBehavior`] for the available strategies.
    pub overflow_behavior: LineOverflowBehavior,
}
132
133impl Default for LineParsingOptions {
134 fn default() -> Self {
135 Self {
136 max_line_length: 16.kilobytes(),
137 overflow_behavior: LineOverflowBehavior::default(),
138 }
139 }
140}
141
/// A wrapper type representing a number of bytes.
///
/// Use the [`NumBytesExt`] trait to conveniently create instances:
/// ```
/// use tokio_process_tools::NumBytesExt;
/// let kb = 16.kilobytes();
/// let mb = 2.megabytes();
/// ```
// Ordering, hashing and a zero `Default` are natural for a numeric newtype
// and cost nothing to derive.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct NumBytes(pub usize);

impl NumBytes {
    /// Creates a NumBytes value of zero.
    ///
    /// `const` so it can be used in constant contexts.
    pub const fn zero() -> Self {
        Self(0)
    }
}

/// Extension trait providing convenience-functions for creation of [`NumBytes`] of certain sizes.
pub trait NumBytesExt {
    /// Interprets the value as literal bytes.
    fn bytes(self) -> NumBytes;

    /// Interprets the value as kilobytes (value * 1024).
    fn kilobytes(self) -> NumBytes;

    /// Interprets the value as megabytes (value * 1024 * 1024).
    fn megabytes(self) -> NumBytes;
}

impl NumBytesExt for usize {
    fn bytes(self) -> NumBytes {
        NumBytes(self)
    }

    fn kilobytes(self) -> NumBytes {
        // "Kilobyte" here means KiB (1024 bytes), consistent with the rest of
        // this module (e.g. `16 * 1024 // 16 kb`).
        NumBytes(self * 1024)
    }

    fn megabytes(self) -> NumBytes {
        NumBytes(self * 1024 * 1024)
    }
}
185
/// Conceptually, this iterator appends the given byte slice to the current line buffer, which may
/// already hold some previously written data.
/// The resulting view of data is split by newlines (`\n`). Every completed line is yielded.
/// The remainder of the chunk, not completed with a newline character, will become the new content
/// of `line_buffer`.
///
/// The implementation tries to allocate as little as possible.
///
/// It can be expected that `line_buffer` does not grow beyond `options.max_line_length` bytes
/// **IF** any yielded line is dropped or cloned and **NOT** stored long term.
/// Only then can the underlying storage, used to capture that line, be reused to capture the
/// next line.
///
/// # Members
/// * `chunk` - New slice of bytes to process.
/// * `line_buffer` - Buffer for reading one line.
///   May hold previously seen, not-yet-closed, line-data.
pub(crate) struct LineReader<'c, 'b> {
    /// The not-yet-processed remainder of the input; shrinks as iteration proceeds.
    chunk: &'c [u8],
    /// Accumulates the line currently being assembled across chunks.
    line_buffer: &'b mut BytesMut,
    /// Length of the line yielded by the previous `next()` call, if any.
    /// Used to `try_reclaim` that buffer space on the following call.
    last_line_length: Option<usize>,
    /// Line length limit and overflow handling.
    options: LineParsingOptions,
}
209
210impl<'c, 'b> LineReader<'c, 'b> {
211 pub fn new(
212 chunk: &'c [u8],
213 line_buffer: &'b mut BytesMut,
214 options: LineParsingOptions,
215 ) -> Self {
216 Self {
217 chunk,
218 line_buffer,
219 last_line_length: None,
220 options,
221 }
222 }
223
224 fn append_to_line_buffer(&mut self, chunk: &[u8]) {
225 self.line_buffer.extend_from_slice(chunk)
226 }
227
228 fn _take_line(&mut self) -> bytes::Bytes {
229 self.last_line_length = Some(self.line_buffer.len());
230 self.line_buffer.split().freeze()
231 }
232
233 fn take_line(&mut self, full_line_buffer: bool) -> bytes::Bytes {
234 if full_line_buffer {
235 match self.options.overflow_behavior {
236 LineOverflowBehavior::DropAdditionalData => {
237 // Drop any additional (until the next newline character!)
238 // and return the current (not regularly finished) line.
239 let _ = self.chunk.skip_until(b'\n');
240 self._take_line()
241 }
242 LineOverflowBehavior::EmitAdditionalAsNewLines => {
243 // Do NOT drop any additional and return the current (not regularly finished)
244 // line. This will lead to all additional data starting a new line in the
245 // next iteration.
246 self._take_line()
247 }
248 }
249 } else {
250 self._take_line()
251 }
252 }
253}
254
impl Iterator for LineReader<'_, '_> {
    type Item = bytes::Bytes;

    fn next(&mut self) -> Option<Self::Item> {
        // Ensure we never go out of bounds with our line buffer.
        // This also ensures that no-one creates a `LineReader` with a line buffer that is already
        // too large for our current `options.max_line_length`.
        // (`0` means "no limit", so no check is possible in that case.)
        if self.options.max_line_length.0 != 0 {
            assert!(self.line_buffer.len() <= self.options.max_line_length.0);
        }

        // Reclaim the buffer space of the line yielded by the PREVIOUS call (if any).
        // Note: This branch is always reached eventually, even when the processed chunk ends
        // with `\n`, as every iterator must once return `None` to signal that it has finished —
        // and that final call passes through here first.
        if let Some(last_line_length) = self.last_line_length.take() {
            // The previous iteration yielded line of this length!
            // `try_reclaim` can only succeed when no other handle (e.g. a stored
            // `bytes::Bytes`) still references that memory.
            let reclaimed = self.line_buffer.try_reclaim(last_line_length);
            if !reclaimed {
                tracing::warn!(
                    "Could not reclaim {last_line_length} bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."
                );
            }
        }

        // Code would work without this early-return. But this lets us skip a lot of actions on
        // empty slices.
        if self.chunk.is_empty() {
            return None;
        }

        // How many more bytes the current line may still take.
        // Through our assert above, the first operand will always be bigger!
        let remaining_line_length = if self.options.max_line_length.0 == 0 {
            usize::MAX
        } else {
            self.options.max_line_length.0 - self.line_buffer.len()
        };

        // The previous iteration might have filled the line buffer completely.
        // Apply overflow behavior.
        if remaining_line_length == 0 {
            return Some(self.take_line(true));
        }

        // We have space remaining in our line buffer.
        // Split the chunk into a usable portion (which would not "overflow" the line buffer)
        // and the rest.
        let (usable, rest) = self
            .chunk
            .split_at(usize::min(self.chunk.len(), remaining_line_length));

        // Search for the next newline character in the usable portion of our current chunk.
        match usable.iter().position(|b| *b == b'\n') {
            None => {
                // No line break found! Consume the whole usable chunk portion.
                self.append_to_line_buffer(usable);
                self.chunk = rest;

                if rest.is_empty() {
                    // Return None, as we have no more data to process.
                    // Leftover data in `line_buffer` must be taken care of externally!
                    None
                } else {
                    // Line now full. Would overflow using rest. Return the current line!
                    // (`rest` is only non-empty when the split above was limited by
                    // `remaining_line_length`, i.e. the buffer is now exactly full.)
                    assert_eq!(self.line_buffer.len(), self.options.max_line_length.0);
                    Some(self.take_line(true))
                }
            }
            Some(pos) => {
                // Found a line break at `pos` - process the line and continue.
                let (usable_until_line_break, _usable_rest) = usable.split_at(pos);
                self.append_to_line_buffer(usable_until_line_break);

                // We did split our chunk into `let (usable, rest) = ...` earlier.
                // We then split usable into `let (usable_until_line_break, _usable_rest) = ...`.
                // We know that `_usable_rest` and `rest` are consecutive in `chunk`!
                // This is the combination of `_usable_rest` and `rest` expressed through `chunk`
                // to get to the "real"/"complete" rest of data.
                let rest = &self.chunk[usable_until_line_break.len()..];

                // Skip the `\n` byte! (`rest` starts with that newline. For `len == 1`,
                // both branches produce an empty slice; the `if` just makes intent explicit.)
                self.chunk = if rest.len() > 1 { &rest[1..] } else { &[] };

                // Return the completed line.
                Some(self.take_line(false))
            }
        }
    }
}
343
344#[cfg(test)]
345mod tests {
346 use std::time::Duration;
347 use tokio::io::{AsyncWrite, AsyncWriteExt};
348
349 pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
350 write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
351 tokio::time::sleep(Duration::from_millis(50)).await;
352 write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
353 tokio::time::sleep(Duration::from_millis(50)).await;
354 write.write_all("README.md\n".as_bytes()).await.unwrap();
355 tokio::time::sleep(Duration::from_millis(50)).await;
356 write.write_all("src\n".as_bytes()).await.unwrap();
357 tokio::time::sleep(Duration::from_millis(50)).await;
358 write.write_all("target\n".as_bytes()).await.unwrap();
359 tokio::time::sleep(Duration::from_millis(50)).await;
360 }
361
362 mod line_reader {
363 use crate::output_stream::LineReader;
364 use crate::{LineOverflowBehavior, LineParsingOptions, NumBytes};
365 use assertr::prelude::*;
366 use bytes::{Bytes, BytesMut};
367 use tracing_test::traced_test;
368
369 #[test]
370 #[traced_test]
371 fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
372 let mut line_buffer = BytesMut::new();
373 let mut collected_lines: Vec<String> = Vec::new();
374
375 let data = "❤️❤️❤️\n👍\n";
376 for byte in data.as_bytes() {
377 let lr = LineReader {
378 chunk: &[*byte],
379 line_buffer: &mut line_buffer,
380 last_line_length: None,
381 options: LineParsingOptions::default(),
382 };
383 for line in lr {
384 collected_lines.push(String::from_utf8_lossy(&line).to_string());
385 }
386 }
387
388 assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "👍"]);
389 }
390
391 #[test]
392 #[traced_test]
393 fn reclaims_line_buffer_space_before_collecting_new_line() {
394 let mut line_buffer = BytesMut::new();
395 let mut collected_lines: Vec<String> = Vec::new();
396 let mut bytes: Vec<Bytes> = Vec::new();
397
398 let data = "❤️❤️❤️\n❤️❤️❤️\n";
399 for byte in data.as_bytes() {
400 let lr = LineReader {
401 chunk: &[*byte],
402 line_buffer: &mut line_buffer,
403 last_line_length: None,
404 options: LineParsingOptions::default(),
405 };
406 for line in lr {
407 collected_lines.push(String::from_utf8_lossy(&line).to_string());
408 bytes.push(line);
409 }
410 }
411
412 let data = "❤️❤️❤️\n";
413 let lr = LineReader {
414 chunk: data.as_bytes(),
415 line_buffer: &mut line_buffer,
416 last_line_length: None,
417 options: LineParsingOptions::default(),
418 };
419 for line in lr {
420 collected_lines.push(String::from_utf8_lossy(&line).to_string());
421 bytes.push(line);
422 }
423
424 assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "❤️❤️❤️", "❤️❤️❤️"]);
425
426 logs_assert(|lines: &[&str]| {
427 match lines
428 .iter()
429 .filter(|line| line.contains("Could not reclaim 18 bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."))
430 .count()
431 {
432 3 => {}
433 n => return Err(format!("Expected exactly one log, but found {n}")),
434 };
435 Ok(())
436 });
437 }
438
439 // Helper function to reduce duplication in test cases
440 fn run_test_case(
441 chunk: &[u8],
442 line_buffer_before: &str,
443 line_buffer_after: &str,
444 expected_lines: &[&str],
445 options: LineParsingOptions,
446 ) {
447 let mut line_buffer = BytesMut::from(line_buffer_before);
448 let mut collected_lines: Vec<String> = Vec::new();
449
450 let lr = LineReader {
451 chunk,
452 line_buffer: &mut line_buffer,
453 last_line_length: None,
454 options,
455 };
456 for line in lr {
457 collected_lines.push(String::from_utf8_lossy(&line).to_string());
458 }
459
460 assert_that(line_buffer).is_equal_to(line_buffer_after);
461
462 let expected_lines: Vec<String> =
463 expected_lines.iter().map(|s| s.to_string()).collect();
464
465 assert_that(collected_lines).is_equal_to(expected_lines);
466 }
467
468 #[test]
469 fn empty_chunk() {
470 run_test_case(
471 b"",
472 "previous: ",
473 "previous: ",
474 &[],
475 LineParsingOptions::default(),
476 );
477 }
478
479 #[test]
480 fn chunk_without_any_newlines() {
481 run_test_case(
482 b"no newlines here",
483 "previous: ",
484 "previous: no newlines here",
485 &[],
486 LineParsingOptions::default(),
487 );
488 }
489
490 #[test]
491 fn single_completed_line() {
492 run_test_case(
493 b"one line\n",
494 "",
495 "",
496 &["one line"],
497 LineParsingOptions::default(),
498 );
499 }
500
501 #[test]
502 fn multiple_completed_lines() {
503 run_test_case(
504 b"first line\nsecond line\nthird line\n",
505 "",
506 "",
507 &["first line", "second line", "third line"],
508 LineParsingOptions::default(),
509 );
510 }
511
512 #[test]
513 fn partial_line_at_the_end() {
514 run_test_case(
515 b"complete line\npartial",
516 "",
517 "partial",
518 &["complete line"],
519 LineParsingOptions::default(),
520 );
521 }
522
523 #[test]
524 fn initial_line_with_multiple_newlines() {
525 run_test_case(
526 b"continuation\nmore lines\n",
527 "previous: ",
528 "",
529 &["previous: continuation", "more lines"],
530 LineParsingOptions::default(),
531 );
532 }
533
534 #[test]
535 fn invalid_utf8_data() {
536 run_test_case(
537 b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n",
538 "",
539 "",
540 &["valid utf8�(�� invalid utf8"],
541 LineParsingOptions::default(),
542 );
543 }
544
545 #[test]
546 fn rest_of_too_long_line_is_dropped() {
547 run_test_case(
548 b"123456789\nabcdefghi\n",
549 "",
550 "",
551 &["1234", "abcd"],
552 LineParsingOptions {
553 max_line_length: NumBytes(4), // Only allow lines with 4 ascii chars (or equiv.) max.
554 overflow_behavior: LineOverflowBehavior::DropAdditionalData,
555 },
556 );
557 }
558
559 #[test]
560 fn rest_of_too_long_line_is_returned_as_additional_lines() {
561 run_test_case(
562 b"123456789\nabcdefghi\n",
563 "",
564 "",
565 &["1234", "5678", "9", "abcd", "efgh", "i"],
566 LineParsingOptions {
567 max_line_length: NumBytes(4), // Only allow lines with 4 ascii chars (or equiv.) max.
568 overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
569 },
570 );
571 }
572
573 #[test]
574 fn max_line_length_of_0_disables_line_length_checks_test1() {
575 run_test_case(
576 b"123456789\nabcdefghi\n",
577 "",
578 "",
579 &["123456789", "abcdefghi"],
580 LineParsingOptions {
581 max_line_length: NumBytes(0),
582 overflow_behavior: LineOverflowBehavior::DropAdditionalData,
583 },
584 );
585 }
586
587 #[test]
588 fn max_line_length_of_0_disables_line_length_checks_test2() {
589 run_test_case(
590 b"123456789\nabcdefghi\n",
591 "",
592 "",
593 &["123456789", "abcdefghi"],
594 LineParsingOptions {
595 max_line_length: NumBytes(0),
596 overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
597 },
598 );
599 }
600
601 #[test]
602 fn leading_and_trailing_whitespace_is_preserved() {
603 run_test_case(
604 b" 123456789 \n abcdefghi \n",
605 "",
606 "",
607 &[" 123456789 ", " abcdefghi "],
608 LineParsingOptions {
609 max_line_length: NumBytes(0),
610 overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
611 },
612 );
613 }
614 }
615}