1use bytes::{Buf, BytesMut};
2use std::io::BufRead;
3
4pub mod broadcast;
5pub(crate) mod impls;
6pub mod single_subscriber;
7
/// Marker trait for output stream types; it declares no methods of its own.
///
/// NOTE(review): presumably implemented by the stream types in the `broadcast` and
/// `single_subscriber` submodules — confirm.
pub trait OutputStream {}
13
/// Options used when creating an output stream from an underlying byte stream.
///
/// Defaults: `chunk_size` = 16 * 1024 bytes, `channel_capacity` = 128.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FromStreamOptions {
    /// Size of the chunks the raw data is handled in, in bytes.
    ///
    /// NOTE(review): presumably the read-buffer size used per read from the underlying
    /// stream — confirm against the stream implementations.
    pub chunk_size: usize,

    /// Capacity of the channel that carries data to consumers.
    ///
    /// NOTE(review): presumably counted in chunks, not bytes — confirm.
    pub channel_capacity: usize,
}

impl Default for FromStreamOptions {
    fn default() -> Self {
        Self {
            chunk_size: 16 * 1024,
            channel_capacity: 128,
        }
    }
}
42
43#[derive(Debug, Clone, PartialEq, Eq, Hash)]
56pub struct Chunk(bytes::Bytes);
57
58impl AsRef<[u8]> for Chunk {
59 fn as_ref(&self) -> &[u8] {
60 self.0.chunk()
61 }
62}
63
/// Backpressure strategy selector.
///
/// NOTE(review): the code acting on these variants is not in this file; the variant
/// descriptions below are inferred from their names — confirm against the stream
/// implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
    /// Presumably: while the buffer is full, newly incoming data is dropped.
    DropLatestIncomingIfBufferFull,

    /// Presumably: delivery waits until the buffer has space again.
    BlockUntilBufferHasSpace,
}
76
/// Two-state control-flow signal.
///
/// NOTE(review): not used in this file; presumably returned by consumer callbacks to tell
/// the stream whether to keep going — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /// Presumably: continue processing.
    Continue,
    /// Presumably: stop processing.
    Break,
}
85
/// What `LineReader` does with the bytes of a line that exceed
/// `LineParsingOptions::max_line_length`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
    /// Emit only the first `max_line_length` bytes of the line and discard the bytes after
    /// them, up to and including the next `\n`. This is the default.
    #[default]
    DropAdditionalData,

    /// Emit the first `max_line_length` bytes as one line and treat the bytes that follow as
    /// the start of a new line, so an overly long line is split into several lines of at most
    /// `max_line_length` bytes each.
    EmitAdditionalAsNewLines,
}
103
/// Controls how raw output bytes are split into lines.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LineParsingOptions {
    /// Maximum number of bytes collected for a single line (the terminating `\n` is not
    /// included in a yielded line).
    ///
    /// `NumBytes(0)` disables the limit, so the line buffer may grow without bound.
    pub max_line_length: NumBytes,

    /// What to do with the bytes of a line beyond `max_line_length`.
    pub overflow_behavior: LineOverflowBehavior,
}
124
125impl Default for LineParsingOptions {
126 fn default() -> Self {
127 Self {
128 max_line_length: 16.kilobytes(),
129 overflow_behavior: LineOverflowBehavior::default(),
130 }
131 }
132}
133
/// A byte count.
///
/// Build one directly (`NumBytes(42)`) or through [`NumBytesExt`] (`4.kilobytes()`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumBytes(pub usize);

impl NumBytes {
    /// Returns a count of zero bytes.
    pub fn zero() -> Self {
        Self(0)
    }
}

/// Helpers for writing byte counts fluently, e.g. `16.kilobytes()`.
///
/// Multiples are binary: one kilobyte is 1024 bytes, one megabyte is 1024 * 1024 bytes.
pub trait NumBytesExt {
    /// Interprets `self` as a number of bytes.
    fn bytes(self) -> NumBytes;

    /// Interprets `self` as a number of kilobytes (1024 bytes each).
    ///
    /// # Panics
    ///
    /// If the resulting byte count does not fit into a `usize`.
    fn kilobytes(self) -> NumBytes;

    /// Interprets `self` as a number of megabytes (1024 * 1024 bytes each).
    ///
    /// # Panics
    ///
    /// If the resulting byte count does not fit into a `usize`.
    fn megabytes(self) -> NumBytes;
}

impl NumBytesExt for usize {
    fn bytes(self) -> NumBytes {
        NumBytes(self)
    }

    fn kilobytes(self) -> NumBytes {
        // `checked_mul` instead of `*`: a plain multiplication only panics in debug builds and
        // silently wraps around in release builds, yielding a small, wrong byte count.
        self.checked_mul(1024)
            .map(NumBytes)
            .unwrap_or_else(|| panic!("{self} kilobytes do not fit into a usize"))
    }

    fn megabytes(self) -> NumBytes {
        self.checked_mul(1024 * 1024)
            .map(NumBytes)
            .unwrap_or_else(|| panic!("{self} megabytes do not fit into a usize"))
    }
}
164
165pub(crate) struct LineReader<'c, 'b> {
183 chunk: &'c [u8],
184 line_buffer: &'b mut BytesMut,
185 last_line_length: Option<usize>,
186 options: LineParsingOptions,
187}
188
189impl<'c, 'b> LineReader<'c, 'b> {
190 pub fn new(
191 chunk: &'c [u8],
192 line_buffer: &'b mut BytesMut,
193 options: LineParsingOptions,
194 ) -> Self {
195 Self {
196 chunk,
197 line_buffer,
198 last_line_length: None,
199 options,
200 }
201 }
202
203 fn append_to_line_buffer(&mut self, chunk: &[u8]) {
204 self.line_buffer.extend_from_slice(chunk)
205 }
206
207 fn _take_line(&mut self) -> bytes::Bytes {
208 self.last_line_length = Some(self.line_buffer.len());
209 self.line_buffer.split().freeze()
210 }
211
212 fn take_line(&mut self, full_line_buffer: bool) -> bytes::Bytes {
213 if full_line_buffer {
214 match self.options.overflow_behavior {
215 LineOverflowBehavior::DropAdditionalData => {
216 let _ = self.chunk.skip_until(b'\n');
219 self._take_line()
220 }
221 LineOverflowBehavior::EmitAdditionalAsNewLines => {
222 self._take_line()
226 }
227 }
228 } else {
229 self._take_line()
230 }
231 }
232}
233
234impl Iterator for LineReader<'_, '_> {
235 type Item = bytes::Bytes;
236
237 fn next(&mut self) -> Option<Self::Item> {
238 if self.options.max_line_length.0 != 0 {
242 assert!(self.line_buffer.len() <= self.options.max_line_length.0);
243 }
244
245 if let Some(last_line_length) = self.last_line_length.take() {
249 let reclaimed = self.line_buffer.try_reclaim(last_line_length);
251 if !reclaimed {
252 tracing::warn!(
253 "Could not reclaim {last_line_length} bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."
254 );
255 }
256 }
257
258 if self.chunk.is_empty() {
261 return None;
262 }
263
264 let remaining_line_length = if self.options.max_line_length.0 == 0 {
266 usize::MAX
267 } else {
268 self.options.max_line_length.0 - self.line_buffer.len()
269 };
270
271 if remaining_line_length == 0 {
274 return Some(self.take_line(true));
275 }
276
277 let (usable, rest) = self
281 .chunk
282 .split_at(usize::min(self.chunk.len(), remaining_line_length));
283
284 match usable.iter().position(|b| *b == b'\n') {
286 None => {
287 self.append_to_line_buffer(usable);
289 self.chunk = rest;
290
291 if rest.is_empty() {
292 None
295 } else {
296 assert_eq!(self.line_buffer.len(), self.options.max_line_length.0);
298 Some(self.take_line(true))
299 }
300 }
301 Some(pos) => {
302 let (usable_until_line_break, _usable_rest) = usable.split_at(pos);
304 self.append_to_line_buffer(usable_until_line_break);
305
306 let rest = &self.chunk[usable_until_line_break.len()..];
312
313 self.chunk = if rest.len() > 1 { &rest[1..] } else { &[] };
315
316 Some(self.take_line(false))
318 }
319 }
320 }
321}
322
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::io::{AsyncWrite, AsyncWriteExt};

    /// Writes five newline-terminated entries to `write`, sleeping 50ms after each write.
    /// Presumably the pauses make readers observe the entries as separate chunks.
    pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
        write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("README.md\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("src\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("target\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    mod line_reader {
        use crate::output_stream::LineReader;
        use crate::{LineOverflowBehavior, LineParsingOptions, NumBytes};
        use assertr::prelude::*;
        use bytes::{Bytes, BytesMut};
        use tracing_test::traced_test;

        #[test]
        #[traced_test]
        fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
            let mut line_buffer = BytesMut::new();
            let mut collected_lines: Vec<String> = Vec::new();

            let data = "❤️❤️❤️\n👍\n";
            for byte in data.as_bytes() {
                let lr = LineReader {
                    chunk: &[*byte],
                    line_buffer: &mut line_buffer,
                    last_line_length: None,
                    options: LineParsingOptions::default(),
                };
                for line in lr {
                    collected_lines.push(String::from_utf8_lossy(&line).to_string());
                }
            }

            assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "👍"]);
        }

        #[test]
        #[traced_test]
        fn reclaims_line_buffer_space_before_collecting_new_line() {
            let mut line_buffer = BytesMut::new();
            let mut collected_lines: Vec<String> = Vec::new();
            // Every yielded line is kept alive here, so no reclaim attempt can succeed.
            let mut bytes: Vec<Bytes> = Vec::new();

            let data = "❤️❤️❤️\n❤️❤️❤️\n";
            for byte in data.as_bytes() {
                let lr = LineReader {
                    chunk: &[*byte],
                    line_buffer: &mut line_buffer,
                    last_line_length: None,
                    options: LineParsingOptions::default(),
                };
                for line in lr {
                    collected_lines.push(String::from_utf8_lossy(&line).to_string());
                    bytes.push(line);
                }
            }

            let data = "❤️❤️❤️\n";
            let lr = LineReader {
                chunk: data.as_bytes(),
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options: LineParsingOptions::default(),
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
                bytes.push(line);
            }

            assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "❤️❤️❤️", "❤️❤️❤️"]);

            // One failed reclaim (of the 18-byte line) per yielded line, i.e. three in total.
            logs_assert(|lines: &[&str]| {
                match lines
                    .iter()
                    .filter(|line| line.contains("Could not reclaim 18 bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."))
                    .count()
                {
                    3 => {}
                    n => return Err(format!("Expected exactly three logs, but found {n}")),
                };
                Ok(())
            });
        }

        /// Feeds `chunk` through a single `LineReader`, starting with `line_buffer_before` as the
        /// pending partial line. Asserts the yielded lines and the buffer content left afterwards.
        fn run_test_case(
            chunk: &[u8],
            line_buffer_before: &str,
            line_buffer_after: &str,
            expected_lines: &[&str],
            options: LineParsingOptions,
        ) {
            let mut line_buffer = BytesMut::from(line_buffer_before);
            let mut collected_lines: Vec<String> = Vec::new();

            let lr = LineReader {
                chunk,
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options,
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
            }

            assert_that(line_buffer).is_equal_to(line_buffer_after);

            let expected_lines: Vec<String> =
                expected_lines.iter().map(|s| s.to_string()).collect();

            assert_that(collected_lines).is_equal_to(expected_lines);
        }

        #[test]
        fn empty_chunk() {
            run_test_case(
                b"",
                "previous: ",
                "previous: ",
                &[],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn chunk_without_any_newlines() {
            run_test_case(
                b"no newlines here",
                "previous: ",
                "previous: no newlines here",
                &[],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn single_completed_line() {
            run_test_case(
                b"one line\n",
                "",
                "",
                &["one line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn multiple_completed_lines() {
            run_test_case(
                b"first line\nsecond line\nthird line\n",
                "",
                "",
                &["first line", "second line", "third line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn partial_line_at_the_end() {
            run_test_case(
                b"complete line\npartial",
                "",
                "partial",
                &["complete line"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn initial_line_with_multiple_newlines() {
            run_test_case(
                b"continuation\nmore lines\n",
                "previous: ",
                "",
                &["previous: continuation", "more lines"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn invalid_utf8_data() {
            run_test_case(
                b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n",
                "",
                "",
                &["valid utf8�(�� invalid utf8"],
                LineParsingOptions::default(),
            );
        }

        #[test]
        fn rest_of_too_long_line_is_dropped() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["1234", "abcd"],
                LineParsingOptions {
                    max_line_length: NumBytes(4),
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        #[test]
        fn rest_of_too_long_line_is_returned_as_additional_lines() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["1234", "5678", "9", "abcd", "efgh", "i"],
                LineParsingOptions {
                    max_line_length: NumBytes(4),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        #[test]
        fn max_line_length_of_0_disables_line_length_checks_test1() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["123456789", "abcdefghi"],
                LineParsingOptions {
                    max_line_length: NumBytes(0),
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        #[test]
        fn max_line_length_of_0_disables_line_length_checks_test2() {
            run_test_case(
                b"123456789\nabcdefghi\n",
                "",
                "",
                &["123456789", "abcdefghi"],
                LineParsingOptions {
                    max_line_length: NumBytes(0),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        #[test]
        fn leading_and_trailing_whitespace_is_preserved() {
            run_test_case(
                b" 123456789 \n abcdefghi \n",
                "",
                "",
                &[" 123456789 ", " abcdefghi "],
                LineParsingOptions {
                    max_line_length: NumBytes(0),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }
    }
}