tokio_process_tools/output_stream/
mod.rs1use bytes::{Buf, BytesMut};
2use std::io::BufRead;
3
4pub mod broadcast;
5pub(crate) mod impls;
6pub mod single_subscriber;
7
/// Marker trait for output stream types.
///
/// The trait declares no methods or associated items; it only tags a type as being an
/// output stream.
///
/// NOTE(review): presumably implemented by the stream types in the [`broadcast`] and
/// [`single_subscriber`] modules — the implementors are not visible from this section.
pub trait OutputStream {}
13
/// Tuning knobs used when constructing an output stream from an underlying reader.
///
/// Both fields are plain `usize` values, so the type is cheap to copy and compare. It derives
/// the same set of traits as [`LineParsingOptions`] so both option types can be logged, cloned
/// and compared in the same way.
///
/// NOTE(review): the worst-case buffered amount is presumably
/// `chunk_size * channel_capacity` bytes — confirm against the stream implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FromStreamOptions {
    /// Size of an individual chunk.
    ///
    /// Defaults to `16 * 1024`.
    ///
    /// NOTE(review): presumably measured in bytes read from the stream per chunk — confirm.
    pub chunk_size: usize,

    /// Capacity of the channel.
    ///
    /// Defaults to `128`.
    ///
    /// NOTE(review): presumably the number of chunks the underlying channel can hold before
    /// backpressure applies — confirm against the stream implementations.
    pub channel_capacity: usize,
}

impl Default for FromStreamOptions {
    /// Returns the default options: `chunk_size` of `16 * 1024` and `channel_capacity` of `128`.
    fn default() -> Self {
        Self {
            chunk_size: 16 * 1024,
            channel_capacity: 128,
        }
    }
}
42
/// A block of bytes, stored as a [`bytes::Bytes`].
///
/// The inner value is private; the bytes are exposed read-only through the
/// `AsRef<[u8]>` implementation.
///
/// NOTE(review): presumably one chunk as read from a process output stream — where chunks are
/// produced is not visible from this section.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Chunk(bytes::Bytes);
57
58impl AsRef<[u8]> for Chunk {
59 fn as_ref(&self) -> &[u8] {
60 self.0.chunk()
61 }
62}
63
/// Selects how a full buffer is handled.
///
/// NOTE(review): the consumers of this enum are not visible from this section; the variant
/// descriptions below are derived from the variant names only — confirm against the stream
/// implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
    /// Presumably: when the buffer is full, the newest incoming data is discarded instead of
    /// waiting for space.
    DropLatestIncomingIfBufferFull,

    /// Presumably: when the buffer is full, the producer waits until space becomes available,
    /// so no data is discarded.
    BlockUntilBufferHasSpace,
}
76
/// Two-state control-flow signal.
///
/// NOTE(review): presumably returned by consumer callbacks to decide whether processing of
/// further output should go on — the call sites are not visible from this section.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /// Keep going.
    Continue,
    /// Stop.
    Break,
}
85
/// What `LineReader` does with the remainder of a line once the line buffer has reached
/// `LineParsingOptions::max_line_length`.
///
/// In both cases the data collected so far is emitted as a line.
/// The default is [`LineOverflowBehavior::DropAdditionalData`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
    /// Discard the rest of the overlong line: the remaining bytes of the chunk currently being
    /// processed are skipped up to and including the next `\n`.
    ///
    /// Example with a maximum of 4 bytes: `"123456789\n"` yields only `"1234"`.
    #[default]
    DropAdditionalData,

    /// Keep the rest of the overlong line: the bytes following the emitted line are left in
    /// place and are parsed as the start of a new line.
    ///
    /// Example with a maximum of 4 bytes: `"123456789\n"` yields `"1234"`, `"5678"` and `"9"`.
    EmitAdditionalAsNewLines,
}
103
/// Options controlling how chunks of bytes are split into lines by `LineReader`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LineParsingOptions {
    /// Maximum number of bytes collected for a single line.
    ///
    /// Once the line buffer holds this many bytes, the line is emitted as-is and
    /// `overflow_behavior` decides what happens to the rest of the line.
    ///
    /// Defaults to 16 kilobytes.
    ///
    /// NOTE(review): prefer a value greater than zero. How `0` is meant to be treated
    /// (e.g. as "no limit") is not established by this declaration — check `LineReader`
    /// before relying on it.
    pub max_line_length: NumBytes,

    /// How data exceeding `max_line_length` is handled.
    ///
    /// Defaults to [`LineOverflowBehavior::DropAdditionalData`].
    pub overflow_behavior: LineOverflowBehavior,
}
124
125impl Default for LineParsingOptions {
126 fn default() -> Self {
127 Self {
128 max_line_length: 16.kilobytes(),
129 overflow_behavior: LineOverflowBehavior::default(),
130 }
131 }
132}
133
/// A number of bytes.
///
/// Usually created through the [`NumBytesExt`] helpers, e.g. `16.kilobytes()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumBytes(pub usize);

impl NumBytes {
    /// Returns a byte count of zero.
    pub fn zero() -> Self {
        Self(0)
    }
}

/// Conversions from a plain number into a [`NumBytes`], interpreting the number in the unit
/// named by the method.
pub trait NumBytesExt {
    /// Interprets `self` as a number of bytes.
    fn bytes(self) -> NumBytes;

    /// Interprets `self` as a number of kilobytes (1 kilobyte = 1024 bytes).
    fn kilobytes(self) -> NumBytes;

    /// Interprets `self` as a number of megabytes (1 megabyte = 1024 * 1024 bytes).
    fn megabytes(self) -> NumBytes;
}

impl NumBytesExt for usize {
    fn bytes(self) -> NumBytes {
        NumBytes(self)
    }

    /// Saturates at `usize::MAX` instead of overflowing.
    fn kilobytes(self) -> NumBytes {
        // A plain `*` would panic in debug builds and silently wrap around in release builds,
        // turning a huge requested size into an arbitrary small one.
        NumBytes(self.saturating_mul(1024))
    }

    /// Saturates at `usize::MAX` instead of overflowing.
    fn megabytes(self) -> NumBytes {
        // See `kilobytes` for why this saturates.
        NumBytes(self.saturating_mul(1024 * 1024))
    }
}
164
/// Splits one chunk of bytes into `\n`-delimited lines.
///
/// Used as an iterator: every yielded item is one line, without its trailing `\n`.
/// Bytes of a line that is not yet terminated when the chunk ends stay in `line_buffer`, so a
/// later reader created with the same buffer continues that line.
pub(crate) struct LineReader<'c, 'b> {
    /// The not yet processed part of the chunk. Shrinks from the front while iterating.
    chunk: &'c [u8],
    /// Caller-owned buffer accumulating the bytes of the line currently being built.
    line_buffer: &'b mut BytesMut,
    /// Length of the line yielded by the previous iteration, if any. The next iteration uses it
    /// to try to reclaim that much capacity for `line_buffer`.
    last_line_length: Option<usize>,
    /// Maximum line length and overflow handling.
    options: LineParsingOptions,
}
188
189impl<'c, 'b> LineReader<'c, 'b> {
190 pub fn new(
191 chunk: &'c [u8],
192 line_buffer: &'b mut BytesMut,
193 options: LineParsingOptions,
194 ) -> Self {
195 Self {
196 chunk,
197 line_buffer,
198 last_line_length: None,
199 options,
200 }
201 }
202
203 fn append_to_line_buffer(&mut self, chunk: &[u8]) {
204 self.line_buffer.extend_from_slice(chunk)
205 }
206
207 fn _take_line(&mut self) -> bytes::Bytes {
208 self.last_line_length = Some(self.line_buffer.len());
209 self.line_buffer.split().freeze()
210 }
211
212 fn take_line(&mut self, full_line_buffer: bool) -> bytes::Bytes {
213 if full_line_buffer {
214 match self.options.overflow_behavior {
215 LineOverflowBehavior::DropAdditionalData => {
216 let _ = self.chunk.skip_until(b'\n');
219 self._take_line()
220 }
221 LineOverflowBehavior::EmitAdditionalAsNewLines => {
222 self._take_line()
226 }
227 }
228 } else {
229 self._take_line()
230 }
231 }
232}
233
234impl Iterator for LineReader<'_, '_> {
235 type Item = bytes::Bytes;
236
237 fn next(&mut self) -> Option<Self::Item> {
238 assert!(self.line_buffer.len() <= self.options.max_line_length.0);
242
243 if let Some(last_line_length) = self.last_line_length.take() {
247 let reclaimed = self.line_buffer.try_reclaim(last_line_length);
249 if !reclaimed {
250 tracing::warn!(
251 "Could not reclaim {last_line_length} bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."
252 );
253 }
254 }
255
256 if self.chunk.is_empty() {
259 return None;
260 }
261
262 let remaining_line_length = self.options.max_line_length.0 - self.line_buffer.len();
264 if remaining_line_length == 0 {
267 return Some(self.take_line(true));
268 }
269
270 let (usable, rest) = self
274 .chunk
275 .split_at(usize::min(self.chunk.len(), remaining_line_length));
276
277 match usable.iter().position(|b| *b == b'\n') {
279 None => {
280 self.append_to_line_buffer(usable);
282 self.chunk = rest;
283
284 if rest.is_empty() {
285 None
288 } else {
289 assert_eq!(self.line_buffer.len(), self.options.max_line_length.0);
291 Some(self.take_line(true))
292 }
293 }
294 Some(pos) => {
295 let (usable_until_line_break, _usable_rest) = usable.split_at(pos);
297 self.append_to_line_buffer(usable_until_line_break);
298
299 let rest = &self.chunk[usable_until_line_break.len()..];
305
306 self.chunk = if rest.len() > 1 { &rest[1..] } else { &[] };
308
309 Some(self.take_line(false))
311 }
312 }
313 }
314}
315
316#[cfg(test)]
317mod tests {
318 use crate::output_stream::LineReader;
319 use crate::{LineOverflowBehavior, LineParsingOptions, NumBytes};
320 use assertr::prelude::*;
321 use bytes::{Bytes, BytesMut};
322 use std::time::Duration;
323 use tokio::io::{AsyncWrite, AsyncWriteExt};
324 use tracing_test::traced_test;
325
326 pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
327 write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
328 tokio::time::sleep(Duration::from_millis(50)).await;
329 write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
330 tokio::time::sleep(Duration::from_millis(50)).await;
331 write.write_all("README.md\n".as_bytes()).await.unwrap();
332 tokio::time::sleep(Duration::from_millis(50)).await;
333 write.write_all("src\n".as_bytes()).await.unwrap();
334 tokio::time::sleep(Duration::from_millis(50)).await;
335 write.write_all("target\n".as_bytes()).await.unwrap();
336 tokio::time::sleep(Duration::from_millis(50)).await;
337 }
338
339 #[test]
340 #[traced_test]
341 fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
342 let mut line_buffer = BytesMut::new();
343 let mut collected_lines: Vec<String> = Vec::new();
344
345 let data = "❤️❤️❤️\n👍\n";
346 for byte in data.as_bytes() {
347 let lr = LineReader {
348 chunk: &[*byte],
349 line_buffer: &mut line_buffer,
350 last_line_length: None,
351 options: LineParsingOptions::default(),
352 };
353 for line in lr {
354 collected_lines.push(String::from_utf8_lossy(&line).to_string());
355 }
356 }
357
358 assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "👍"]);
359 }
360
361 #[test]
362 #[traced_test]
363 fn reclaims_line_buffer_space_before_collecting_new_line() {
364 let mut line_buffer = BytesMut::new();
365 let mut collected_lines: Vec<String> = Vec::new();
366 let mut bytes: Vec<Bytes> = Vec::new();
367
368 let data = "❤️❤️❤️\n❤️❤️❤️\n";
369 for byte in data.as_bytes() {
370 let lr = LineReader {
371 chunk: &[*byte],
372 line_buffer: &mut line_buffer,
373 last_line_length: None,
374 options: LineParsingOptions::default(),
375 };
376 for line in lr {
377 collected_lines.push(String::from_utf8_lossy(&line).to_string());
378 bytes.push(line);
379 }
380 }
381
382 let data = "❤️❤️❤️\n";
383 let lr = LineReader {
384 chunk: data.as_bytes(),
385 line_buffer: &mut line_buffer,
386 last_line_length: None,
387 options: LineParsingOptions::default(),
388 };
389 for line in lr {
390 collected_lines.push(String::from_utf8_lossy(&line).to_string());
391 bytes.push(line);
392 }
393
394 assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "❤️❤️❤️", "❤️❤️❤️"]);
395
396 logs_assert(|lines: &[&str]| {
397 match lines
398 .iter()
399 .filter(|line| line.contains("Could not reclaim 18 bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."))
400 .count()
401 {
402 3 => {}
403 n => return Err(format!("Expected exactly one log, but found {n}")),
404 };
405 Ok(())
406 });
407 }
408
409 #[test]
410 fn line_reader() {
411 fn run_test_case(
413 test_name: &str,
414 chunk: &[u8],
415 line_buffer_before: &str,
416 line_buffer_after: &str,
417 expected_lines: &[&str],
418 options: LineParsingOptions,
419 ) {
420 let mut line_buffer = BytesMut::from(line_buffer_before);
421 let mut collected_lines: Vec<String> = Vec::new();
422
423 let lr = LineReader {
424 chunk,
425 line_buffer: &mut line_buffer,
426 last_line_length: None,
427 options,
428 };
429 for line in lr {
430 collected_lines.push(String::from_utf8_lossy(&line).to_string());
431 }
432
433 assert_that(line_buffer)
434 .with_detail_message(format!("Test case: {test_name}"))
435 .is_equal_to(line_buffer_after);
436
437 let expected_lines: Vec<String> =
438 expected_lines.iter().map(|s| s.to_string()).collect();
439
440 assert_that(collected_lines)
441 .with_detail_message(format!("Test case: {test_name}"))
442 .is_equal_to(expected_lines);
443 }
444
445 run_test_case(
446 "Test 1: Empty chunk",
447 b"",
448 "previous: ",
449 "previous: ",
450 &[],
451 LineParsingOptions::default(),
452 );
453
454 run_test_case(
455 "Test 2: Chunk with no newlines",
456 b"no newlines here",
457 "previous: ",
458 "previous: no newlines here",
459 &[],
460 LineParsingOptions::default(),
461 );
462
463 run_test_case(
464 "Test 3: Single complete line",
465 b"one line\n",
466 "",
467 "",
468 &["one line"],
469 LineParsingOptions::default(),
470 );
471
472 run_test_case(
473 "Test 4: Multiple complete lines",
474 b"first line\nsecond line\nthird line\n",
475 "",
476 "",
477 &["first line", "second line", "third line"],
478 LineParsingOptions::default(),
479 );
480
481 run_test_case(
482 "Test 5: Partial line at the end",
483 b"complete line\npartial",
484 "",
485 "partial",
486 &["complete line"],
487 LineParsingOptions::default(),
488 );
489
490 run_test_case(
491 "Test 6: Initial line with multiple newlines",
492 b"continuation\nmore lines\n",
493 "previous: ",
494 "",
495 &["previous: continuation", "more lines"],
496 LineParsingOptions::default(),
497 );
498
499 run_test_case(
500 "Test 7: Invalid UTF8 data",
501 b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n",
502 "",
503 "",
504 &["valid utf8�(�� invalid utf8"],
505 LineParsingOptions::default(),
506 );
507
508 run_test_case(
509 "Test 8 - Rest of too long line is dropped",
510 b"123456789\nabcdefghi\n",
511 "",
512 "",
513 &["1234", "abcd"],
514 LineParsingOptions {
515 max_line_length: NumBytes(4), overflow_behavior: LineOverflowBehavior::DropAdditionalData,
517 },
518 );
519
520 run_test_case(
521 "Test 9 - Rest of too long line is returned as additional lines",
522 b"123456789\nabcdefghi\n",
523 "",
524 "",
525 &["1234", "5678", "9", "abcd", "efgh", "i"],
526 LineParsingOptions {
527 max_line_length: NumBytes(4), overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
529 },
530 );
531 }
532}