tokio_process_tools/output_stream/mod.rs
1use bytes::{Buf, BytesMut};
2
3pub mod broadcast;
4pub(crate) mod impls;
5pub mod single_subscriber;
6
/// Marker trait tying together the output-stream implementations of this module.
///
/// We support the following implementations:
///
/// - [broadcast::BroadcastOutputStream]
/// - [single_subscriber::SingleSubscriberOutputStream]
///
/// NOTE(review): the trait currently declares no methods; it only tags the
/// listed types as output streams.
pub trait OutputStream {}
12
/// Options controlling how a raw stream is read into [Chunk]-sized pieces.
///
/// NOTE: The maximum possible memory consumption is: `chunk_size * channel_capacity`.
/// Although reaching that level requires:
/// 1. A receiver to listen for chunks.
/// 2. The channel getting full.
// Derives added for consistency with `LineParsingOptions`: a plain-data config
// struct should be debuggable, copyable and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FromStreamOptions {
    /// The size of an individual chunk read from the read buffer in bytes.
    ///
    /// default: 16 * 1024 // 16 kb
    pub chunk_size: usize,

    /// The number of chunks held by the underlying async channel.
    ///
    /// When the subscriber (if present) is not fast enough to consume chunks equally fast or faster
    /// than them getting read, this acts as a buffer to hold not-yet processed messages.
    /// The bigger, the better, in terms of system resilience to write-spikes.
    /// Multiply with `chunk_size` to obtain the amount of system resources this will consume at
    /// max.
    pub channel_capacity: usize,
}

impl Default for FromStreamOptions {
    /// Defaults to 16 kb chunks and a capacity of 128 chunks.
    fn default() -> Self {
        Self {
            chunk_size: 16 * 1024, // 16 kb
            channel_capacity: 128, // => 16 kb * 128 = 2 mb (max memory usage consumption)
        }
    }
}
41
/// A raw segment of bytes read from the observed stream.
///
/// A "chunk" is an arbitrarily sized byte slice read from the underlying stream.
/// The slices' length is at max of the previously configured maximum `chunk_size`.
///
/// We use the word "chunk", as it is often used when processing collections in segments or when
/// dealing with buffered I/O operations where data arrives in variable-sized pieces.
///
/// In contrast to this, a "frame" typically carries more specific semantics. It usually implies a
/// complete logical unit with defined boundaries within a protocol or format. This we do not have
/// here.
///
/// Note: If the underlying stream is of lower buffer size, chunks of length `chunk_size` may
/// never be observed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Chunk(/* the raw bytes backing this chunk */ bytes::Bytes);
56
57impl AsRef<[u8]> for Chunk {
58 fn as_ref(&self) -> &[u8] {
59 self.0.chunk()
60 }
61}
62
/// How to react when the chunk channel's buffer is full and new data arrives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
    /// When the channel buffer is full, discard the most recently read
    /// (incoming) chunk instead of blocking the reader.
    ///
    /// NOTE(review): semantics inferred from the variant name — confirm against
    /// the channel handling in the `broadcast` / `single_subscriber` impls.
    DropLatestIncomingIfBufferFull,

    /// Will not lead to "lagging" (and dropping frames in the process).
    /// But this lowers our speed at which we consume output and may affect the application
    /// captured, as their pipe buffer may get full, requiring the application /
    /// relying on the application to drop data instead of writing to stdout/stderr in order
    /// to not block.
    BlockUntilBufferHasSpace,
}
75
/// Control flag to indicate whether processing should continue or break.
///
/// Returning `Break` from an `Inspector` will let that inspector stop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /// Keep processing further data.
    Continue,
    /// Stop processing; the inspector terminates.
    Break,
}
84
/// What should happen when a line is too long?
///
/// "Too long" means the line reached the configured
/// [LineParsingOptions::max_line_length].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
    /// Drop any additional data received after the current line was considered too long until
    /// the next newline character is observed, which then starts a new line.
    #[default]
    DropAdditionalData,

    /// Emit the current line when the maximum allowed length is reached.
    /// Any additional data received is immediately taken as the content of the next line.
    ///
    /// This option really just adds intermediate line breaks to not let any emitted line exceed the
    /// length limit.
    ///
    /// No data is dropped with this behavior.
    EmitAdditionalAsNewLines,
}
102
/// Configuration options for parsing lines from a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LineParsingOptions {
    /// Maximum length of a single line in bytes.
    /// When reached, further data won't be appended to the current line.
    /// The line will be emitted in its current state.
    ///
    /// A value of `0` means that "no limit" is imposed.
    ///
    /// NOTE(review): verify that every consumer (e.g. the internal line reader)
    /// actually treats `0` as "unlimited" rather than as a zero-byte limit.
    ///
    /// Only set this to `0` when you absolutely trust the input stream! Remember that an observed
    /// stream maliciously writing endless amounts of data without ever writing a line break
    /// would starve this system from ever emitting a line and will lead to an infinite amount of
    /// memory being allocated to hold the line data, letting this process run out of memory!
    ///
    /// Defaults to 16 kilobytes.
    pub max_line_length: NumBytes,

    /// What should happen when a line is too long?
    pub overflow_behavior: LineOverflowBehavior,
}
123
124impl Default for LineParsingOptions {
125 fn default() -> Self {
126 Self {
127 max_line_length: 16.kilobytes(),
128 overflow_behavior: LineOverflowBehavior::default(),
129 }
130 }
131}
132
/// A number of bytes, as a strongly-typed wrapper around `usize`.
// `PartialOrd`/`Ord`/`Hash`/`Default` added: a byte count is naturally ordered
// (e.g. for comparing against limits) and usable as a map key; all derives are
// backward compatible.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct NumBytes(usize);

impl NumBytes {
    /// Zero bytes. `const`, so it can be used in constant contexts.
    pub const fn zero() -> Self {
        Self(0)
    }
}

/// Extension trait for ergonomically constructing [NumBytes] values,
/// e.g. `16.kilobytes()`.
pub trait NumBytesExt {
    /// `self` bytes.
    fn bytes(self) -> NumBytes;

    /// `self * 1024` bytes.
    fn kilobytes(self) -> NumBytes;

    /// `self * 1024 * 1024` bytes.
    fn megabytes(self) -> NumBytes;
}

impl NumBytesExt for usize {
    fn bytes(self) -> NumBytes {
        NumBytes(self)
    }

    fn kilobytes(self) -> NumBytes {
        // NOTE(review): plain multiplication — overflows (panics in debug,
        // wraps in release) for absurdly large inputs; acceptable for
        // configuration-sized values.
        NumBytes(self * 1024)
    }

    fn megabytes(self) -> NumBytes {
        NumBytes(self * 1024 * 1024)
    }
}
163
164/// Conceptually, this iterator appends the given byte slice to the current line buffer, which may
165/// already hold some previously written data.
166/// The resulting view of data is split by newlines (`\n`). Every completed line is yielded.
167/// The remainder of the chunk, not completed with a newline character, will become the new content
168/// of `line_buffer`.
169///
170/// The implementation tries to allocate as little as possible.
171///
172/// It can be expected that `line_buffer` does not grow beyond `options.max_line_length` bytes
173/// **IF** any yielded line is dropped or cloned and **NOT** stored long term.
174/// Only then can the underlying storage, used to capture that line, be reused to capture the
175/// next line.
176///
177/// # Members
178/// * `chunk` - New slice of bytes to process.
179/// * `line_buffer` - Buffer for reading one line.
180/// May hold previously seen, not-yet-closed, line-data.
181pub(crate) struct LineReader<'c, 'b> {
182 chunk: &'c [u8],
183 line_buffer: &'b mut BytesMut,
184 last_line_length: Option<usize>,
185 options: LineParsingOptions,
186}
187
188impl<'c, 'b> LineReader<'c, 'b> {
189 pub fn new(
190 chunk: &'c [u8],
191 line_buffer: &'b mut BytesMut,
192 options: LineParsingOptions,
193 ) -> Self {
194 Self {
195 chunk,
196 line_buffer,
197 last_line_length: None,
198 options,
199 }
200 }
201
202 fn append_to_line_buffer(&mut self, chunk: &[u8]) {
203 self.line_buffer.extend_from_slice(chunk)
204 }
205
206 fn _take_line(&mut self) -> bytes::Bytes {
207 self.last_line_length = Some(self.line_buffer.len());
208 self.line_buffer.split().freeze()
209 }
210
211 fn take_line(&mut self, full_line_buffer: bool) -> bytes::Bytes {
212 if full_line_buffer {
213 match self.options.overflow_behavior {
214 LineOverflowBehavior::DropAdditionalData => {
215 // Drop any additional and return the current (not regularly finished) line.
216 self.chunk = &[];
217 self._take_line()
218 }
219 LineOverflowBehavior::EmitAdditionalAsNewLines => {
220 // Do NOT drop any additional and return the current (not regularly finished)
221 // line. This will lead to all additional data starting a new line in the
222 // next iteration.
223 self._take_line()
224 }
225 }
226 } else {
227 self._take_line()
228 }
229 }
230}
231
232impl Iterator for LineReader<'_, '_> {
233 type Item = bytes::Bytes;
234
235 fn next(&mut self) -> Option<Self::Item> {
236 // Ensure we never go out of bounds with our line buffer.
237 // This also ensures that no-one creates a `LineReader` with a line buffer that is already
238 // too large for our current `options.max_line_length`.
239 assert!(self.line_buffer.len() <= self.options.max_line_length.0);
240
241 // Note: This will always be seen, even when the processed chunk ends with `\n`, as
242 // every iterator must once return `None` to signal that it has finished!
243 // And this, we only do later.
244 if let Some(last_line_length) = self.last_line_length.take() {
245 // The previous iteration yielded line of this length!
246 let reclaimed = self.line_buffer.try_reclaim(last_line_length);
247 if !reclaimed {
248 tracing::warn!(
249 "Could not reclaim {last_line_length} bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."
250 );
251 }
252 }
253
254 // Code would work without this early-return. But this lets us skip a lot of actions on
255 // empty slices.
256 if self.chunk.is_empty() {
257 return None;
258 }
259
260 // Through our assert above, the first operand will always be bigger!
261 let remaining_line_length = self.options.max_line_length.0 - self.line_buffer.len();
262 // The previous iteration might have filled the line buffer completely.
263 // Apply overflow behavior.
264 if remaining_line_length == 0 {
265 return Some(self.take_line(true));
266 }
267
268 // We have space remaining in our line buffer.
269 // Split the chunk into two a usable portion (which would not "overflow" the line buffer)
270 // and the rest.
271 let (usable, rest) = self
272 .chunk
273 .split_at(usize::min(self.chunk.len(), remaining_line_length));
274
275 // Search for the next newline character in the usable portion of our current chunk.
276 match usable.iter().position(|b| *b == b'\n') {
277 None => {
278 // No line break found! Consume the whole usable chunk portion.
279 self.append_to_line_buffer(usable);
280 self.chunk = rest;
281
282 if rest.is_empty() {
283 // Return None, as we have no more data to process.
284 // Leftover data in `line_buffer` must be taken care of externally!
285 None
286 } else {
287 // Line now full. Would overflow using rest. Return the current line!
288 assert_eq!(self.line_buffer.len(), self.options.max_line_length.0);
289 Some(self.take_line(true))
290 }
291 }
292 Some(pos) => {
293 // Found a line break at `pos` - process the line and continue.
294 let (usable_until_line_break, _usable_rest) = usable.split_at(pos);
295 self.append_to_line_buffer(usable_until_line_break);
296
297 // We did split our chunk into `let (usable, rest) = ...` earlier.
298 // We then split usable into `let (usable_until_line_break, _usable_rest) = ...`.
299 // We know that `_usable_rest` and `rest` are consecutive in `chunk`!
300 // This is the combination of `_usable_rest` and `rest` expressed through `chunk`
301 // to get to the "real"/"complete" rest of data.
302 let rest = &self.chunk[usable_until_line_break.len()..];
303
304 // Skip the `\n` byte!
305 self.chunk = if rest.len() > 1 { &rest[1..] } else { &[] };
306
307 // Return the completed line.
308 Some(self.take_line(false))
309 }
310 }
311 }
312}
313
#[cfg(test)]
mod tests {
    use crate::LineParsingOptions;
    use crate::output_stream::LineReader;
    use assertr::prelude::*;
    use bytes::{Bytes, BytesMut};
    use std::time::Duration;
    use tokio::io::{AsyncWrite, AsyncWriteExt};
    use tracing_test::traced_test;

    /// Writes a fixed sequence of newline-terminated entries with small delays,
    /// simulating a process emitting output over time.
    pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
        write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("README.md\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("src\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("target\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    #[test]
    #[traced_test]
    fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
        let mut line_buffer = BytesMut::new();
        let mut collected_lines: Vec<String> = Vec::new();

        // Feed the data one byte at a time to ensure multi-byte code points
        // survive being split across chunk boundaries.
        let data = "❤️❤️❤️\n👍\n";
        for byte in data.as_bytes() {
            let lr = LineReader {
                chunk: &[*byte],
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options: LineParsingOptions::default(),
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
            }
        }

        assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "👍"]);
    }

    #[test]
    #[traced_test]
    fn reclaims_line_buffer_space_before_collecting_new_line() {
        let mut line_buffer = BytesMut::new();
        let mut collected_lines: Vec<String> = Vec::new();
        // Keeping the yielded `Bytes` alive on purpose: this prevents the
        // buffer space from being reclaimed, which must trigger the warning.
        let mut bytes: Vec<Bytes> = Vec::new();

        let data = "❤️❤️❤️\n❤️❤️❤️\n";
        for byte in data.as_bytes() {
            let lr = LineReader {
                chunk: &[*byte],
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options: LineParsingOptions::default(),
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
                bytes.push(line);
            }
        }

        let data = "❤️❤️❤️\n";
        let lr = LineReader {
            chunk: data.as_bytes(),
            line_buffer: &mut line_buffer,
            last_line_length: None,
            options: LineParsingOptions::default(),
        };
        for line in lr {
            collected_lines.push(String::from_utf8_lossy(&line).to_string());
            bytes.push(line);
        }

        assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "❤️❤️❤️", "❤️❤️❤️"]);

        // One warning per yielded-and-retained line is expected.
        logs_assert(|lines: &[&str]| {
            match lines
                .iter()
                .filter(|line| line.contains("Could not reclaim 18 bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."))
                .count()
            {
                3 => {}
                // BUGFIX: message previously claimed "exactly one" while the
                // match arm requires exactly 3 occurrences.
                n => return Err(format!("Expected exactly 3 matching logs, but found {n}")),
            };
            Ok(())
        });
    }

    #[test]
    fn line_reader() {
        // Helper function to reduce duplication in test cases
        fn run_test_case(
            test_name: &str,
            chunk: &[u8],
            line_buffer_before: &str,
            line_buffer_after: &str,
            expected_lines: &[&str],
        ) {
            let mut line_buffer = BytesMut::from(line_buffer_before);
            let mut collected_lines: Vec<String> = Vec::new();

            let lr = LineReader {
                chunk,
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options: LineParsingOptions::default(),
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
            }

            assert_that(line_buffer)
                .with_detail_message(format!("Test case: {test_name}"))
                .is_equal_to(line_buffer_after);

            let expected_lines: Vec<String> =
                expected_lines.iter().map(|s| s.to_string()).collect();

            assert_that(collected_lines)
                .with_detail_message(format!("Test case: {test_name}"))
                .is_equal_to(expected_lines);
        }

        // Test case 1: Empty chunk
        run_test_case(
            "Empty chunk",
            b"",
            "previous: ",
            "previous: ",
            &[],
        );

        // Test case 2: Chunk with no newlines
        run_test_case(
            "Chunk with no newlines",
            b"no newlines here",
            "previous: ",
            "previous: no newlines here",
            &[],
        );

        // Test case 3: Single complete line
        run_test_case("Single complete line", b"one line\n", "", "", &["one line"]);

        // Test case 4: Multiple complete lines
        run_test_case(
            "Multiple complete lines",
            b"first line\nsecond line\nthird line\n",
            "",
            "",
            &["first line", "second line", "third line"],
        );

        // Test case 5: Partial line at the end
        run_test_case(
            "Partial line at the end",
            b"complete line\npartial",
            "",
            "partial",
            &["complete line"],
        );

        // Test case 6: Initial line with multiple newlines
        run_test_case(
            "Initial line with multiple newlines",
            b"continuation\nmore lines\n",
            "previous: ",
            "",
            &["previous: continuation", "more lines"],
        );

        // Test case 7: Invalid UTF8 data
        // BUGFIX: test name previously copy-pasted from test case 6.
        run_test_case(
            "Invalid UTF8 data",
            b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n",
            "",
            "",
            &["valid utf8�(�� invalid utf8"],
        );
    }
}
499}