1use bytes::{Buf, BytesMut};
2use memchr::memchr;
3use std::borrow::Cow;
4
/// Default chunk size: `16 * 1024` bytes (16 KiB).
///
/// Used as the `chunk_size` of `FromStreamOptions::default()`.
pub const DEFAULT_CHUNK_SIZE: NumBytes = NumBytes(16 * 1024);
/// Default channel capacity: 128.
///
/// Used as the `channel_capacity` of `FromStreamOptions::default()`.
/// NOTE(review): the unit (presumably buffered stream events) is not visible here — confirm.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 128;
10
/// Public `broadcast` submodule.
///
/// NOTE(review): contents are outside this view; presumably the multi-consumer stream
/// implementation — confirm against the module itself.
pub mod broadcast;

/// Crate-internal `impls` submodule (contents are outside this view).
pub(crate) mod impls;

/// Public `single_subscriber` submodule.
///
/// NOTE(review): contents are outside this view; presumably the single-consumer stream
/// implementation — confirm against the module itself.
pub mod single_subscriber;
18
/// Read-only accessors shared by output stream types.
///
/// NOTE(review): the implementors are not part of this view (presumably the types in the
/// `broadcast` and `single_subscriber` submodules) — confirm.
pub trait OutputStream {
    /// Returns the stream's chunk size.
    fn chunk_size(&self) -> NumBytes;

    /// Returns the stream's channel capacity.
    fn channel_capacity(&self) -> usize;

    /// Returns a static name for the stream.
    fn name(&self) -> &'static str;
}
34
/// Options bundle holding a chunk size and a channel capacity.
///
/// The `Default` implementation fills both fields from [`DEFAULT_CHUNK_SIZE`] and
/// [`DEFAULT_CHANNEL_CAPACITY`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FromStreamOptions {
    /// Chunk size; defaults to [`DEFAULT_CHUNK_SIZE`].
    pub chunk_size: NumBytes,

    /// Channel capacity; defaults to [`DEFAULT_CHANNEL_CAPACITY`].
    pub channel_capacity: usize,
}
57
58impl Default for FromStreamOptions {
59 fn default() -> Self {
60 Self {
61 chunk_size: DEFAULT_CHUNK_SIZE,
62 channel_capacity: DEFAULT_CHANNEL_CAPACITY, }
64 }
65}
66
/// A chunk of stream data: a newtype around [`bytes::Bytes`].
///
/// The bytes are exposed read-only through the `AsRef<[u8]>` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Chunk(bytes::Bytes);
81
82impl AsRef<[u8]> for Chunk {
83 fn as_ref(&self) -> &[u8] {
84 self.0.chunk()
85 }
86}
87
/// Crate-internal event carried through a stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum StreamEvent {
    /// A chunk of data.
    Chunk(Chunk),
    /// A gap in the data.
    ///
    /// NOTE(review): presumably emitted when chunks were dropped; `LineParserState::on_gap`
    /// is the matching parser hook — confirm against the producers.
    Gap,
    /// End of the stream.
    Eof,
}
94
/// Selects what happens when the buffer is full.
///
/// NOTE(review): the code acting on this choice is outside this view; the variant
/// descriptions below follow the variant names — confirm against the consumers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
    /// Drop the latest incoming data while the buffer is full.
    DropLatestIncomingIfBufferFull,

    /// Block until the buffer has space again.
    BlockUntilBufferHasSpace,
}
111
/// Control-flow signal returned by line callbacks.
///
/// `LineParserState::visit_chunk` stops and returns [`Next::Break`] as soon as its callback
/// returns [`Next::Break`]; otherwise it keeps parsing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /// Keep processing.
    Continue,

    /// Stop processing.
    Break,
}
124
/// What the line parser does with a line that exceeds `LineParsingOptions::max_line_length`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
    /// Emit the first `max_line_length` bytes as the line and discard the rest of that line,
    /// up to and including its terminating `\n`. This is the default.
    #[default]
    DropAdditionalData,

    /// Emit the line in pieces of at most `max_line_length` bytes, each as its own line.
    EmitAdditionalAsNewLines,
}
145
/// Selects how a line is written.
///
/// NOTE(review): the writer using this mode is outside this view; the variant descriptions
/// follow the variant names — confirm against the consumers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineWriteMode {
    /// Write the line unchanged.
    AsIs,

    /// Append a line feed (`\n`) to the line.
    AppendLf,
}
161
/// Options controlling how chunks are split into lines.
///
/// The `Default` implementation uses a 16 KiB limit and the default
/// [`LineOverflowBehavior`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LineParsingOptions {
    /// Maximum number of bytes buffered for a single line.
    ///
    /// A value of zero disables the limit.
    pub max_line_length: NumBytes,

    /// What to do once a line reaches `max_line_length`.
    pub overflow_behavior: LineOverflowBehavior,
}
186
187impl Default for LineParsingOptions {
188 fn default() -> Self {
189 Self {
190 max_line_length: 16.kilobytes(),
191 overflow_behavior: LineOverflowBehavior::default(),
192 }
193 }
194}
195
/// Incremental line-splitting state that is carried from one chunk to the next.
pub(crate) struct LineParserState {
    /// Bytes of the current line that has not been terminated by `\n` yet.
    line_buffer: BytesMut,
    /// While set, input is skipped up to and including the next `\n`.
    discard_until_newline: bool,
}
201
202impl LineParserState {
203 pub fn new() -> Self {
204 Self {
205 line_buffer: BytesMut::new(),
206 discard_until_newline: false,
207 }
208 }
209
210 pub fn on_gap(&mut self) {
211 self.line_buffer.clear();
212 self.discard_until_newline = true;
213 }
214
215 pub fn visit_chunk(
216 &mut self,
217 mut chunk: &[u8],
218 options: LineParsingOptions,
219 mut f: impl FnMut(Cow<'_, str>) -> Next,
220 ) -> Next {
221 while !chunk.is_empty() {
222 if self.discard_until_newline {
223 match memchr(b'\n', chunk) {
224 Some(pos) => {
225 self.discard_until_newline = false;
226 chunk = &chunk[pos + 1..];
227 }
228 None => return Next::Continue,
229 }
230 continue;
231 }
232
233 if options.max_line_length.0 != 0 && self.line_buffer.len() == options.max_line_length.0
234 {
235 match options.overflow_behavior {
236 LineOverflowBehavior::DropAdditionalData => {
237 if self.emit_line(&mut f) == Next::Break {
238 return Next::Break;
239 }
240 self.discard_until_newline = true;
241 }
242 LineOverflowBehavior::EmitAdditionalAsNewLines => {
243 if self.emit_line(&mut f) == Next::Break {
244 return Next::Break;
245 }
246 }
247 }
248 continue;
249 }
250
251 let remaining_line_length = if options.max_line_length.0 == 0 {
252 chunk.len()
253 } else {
254 options.max_line_length.0 - self.line_buffer.len()
255 };
256 let scan_len = remaining_line_length.min(chunk.len());
257 let scan = &chunk[..scan_len];
258
259 if let Some(pos) = memchr(b'\n', scan) {
260 let result = if self.line_buffer.is_empty() {
262 f(String::from_utf8_lossy(&scan[..pos]))
263 } else {
264 self.line_buffer.extend_from_slice(&scan[..pos]);
265 self.emit_line(&mut f)
266 };
267
268 if result == Next::Break {
269 return Next::Break;
270 }
271 chunk = &chunk[pos + 1..];
272 continue;
273 }
274
275 self.line_buffer.extend_from_slice(scan);
276 chunk = &chunk[scan_len..];
277
278 if options.max_line_length.0 != 0
279 && self.line_buffer.len() == options.max_line_length.0
280 && matches!(
281 options.overflow_behavior,
282 LineOverflowBehavior::EmitAdditionalAsNewLines
283 )
284 && self.emit_line(&mut f) == Next::Break
285 {
286 return Next::Break;
287 }
288 }
289
290 Next::Continue
291 }
292
293 pub(crate) fn owned_lines<'a>(
294 &'a mut self,
295 chunk: &'a [u8],
296 options: LineParsingOptions,
297 ) -> OwnedLineReader<'a> {
298 OwnedLineReader {
299 parser: self,
300 chunk,
301 options,
302 }
303 }
304
305 pub fn finish(&self, f: impl FnOnce(Cow<'_, str>) -> Next) -> Next {
306 if self.discard_until_newline || self.line_buffer.is_empty() {
307 Next::Continue
308 } else {
309 f(String::from_utf8_lossy(&self.line_buffer))
310 }
311 }
312
313 pub(crate) fn finish_owned(&self) -> Option<String> {
314 if self.discard_until_newline || self.line_buffer.is_empty() {
315 None
316 } else {
317 Some(String::from_utf8_lossy(&self.line_buffer).into_owned())
318 }
319 }
320
321 fn emit_line(&mut self, f: &mut impl FnMut(Cow<'_, str>) -> Next) -> Next {
322 let line = self.line_buffer.split().freeze();
323 f(String::from_utf8_lossy(&line))
324 }
325
326 fn emit_owned_line(&mut self) -> String {
327 let line = self.line_buffer.split().freeze();
328 String::from_utf8_lossy(&line).into_owned()
329 }
330}
331
/// Iterator over the lines of one chunk, yielding owned `String`s.
///
/// Created by `LineParserState::owned_lines`. It reads and updates the borrowed parser, so
/// an unterminated trailing line stays buffered in the parser after iteration ends.
pub(crate) struct OwnedLineReader<'a> {
    /// Parser whose buffer and discard flag are read and updated.
    parser: &'a mut LineParserState,
    /// Part of the chunk that has not been consumed yet.
    chunk: &'a [u8],
    /// Line length limit and overflow behavior applied while reading.
    options: LineParsingOptions,
}
337
338impl Iterator for OwnedLineReader<'_> {
339 type Item = String;
340
341 fn next(&mut self) -> Option<Self::Item> {
342 while !self.chunk.is_empty() {
343 if self.parser.discard_until_newline {
344 if let Some(pos) = memchr(b'\n', self.chunk) {
345 self.parser.discard_until_newline = false;
346 self.chunk = &self.chunk[pos + 1..];
347 } else {
348 self.chunk = &[];
349 return None;
350 }
351 continue;
352 }
353
354 if self.options.max_line_length.0 != 0
355 && self.parser.line_buffer.len() == self.options.max_line_length.0
356 {
357 return match self.options.overflow_behavior {
358 LineOverflowBehavior::DropAdditionalData => {
359 self.parser.discard_until_newline = true;
360 Some(self.parser.emit_owned_line())
361 }
362 LineOverflowBehavior::EmitAdditionalAsNewLines => {
363 Some(self.parser.emit_owned_line())
364 }
365 };
366 }
367
368 let remaining_line_length = if self.options.max_line_length.0 == 0 {
369 self.chunk.len()
370 } else {
371 self.options.max_line_length.0 - self.parser.line_buffer.len()
372 };
373 let scan_len = remaining_line_length.min(self.chunk.len());
374 let scan = &self.chunk[..scan_len];
375
376 if let Some(pos) = memchr(b'\n', scan) {
377 self.chunk = &self.chunk[pos + 1..];
378 if self.parser.line_buffer.is_empty() {
379 return Some(String::from_utf8_lossy(&scan[..pos]).into_owned());
380 }
381 self.parser.line_buffer.extend_from_slice(&scan[..pos]);
382 return Some(self.parser.emit_owned_line());
383 }
384
385 self.parser.line_buffer.extend_from_slice(scan);
386 self.chunk = &self.chunk[scan_len..];
387
388 if self.options.max_line_length.0 != 0
389 && self.parser.line_buffer.len() == self.options.max_line_length.0
390 && matches!(
391 self.options.overflow_behavior,
392 LineOverflowBehavior::EmitAdditionalAsNewLines
393 )
394 {
395 return Some(self.parser.emit_owned_line());
396 }
397 }
398
399 None
400 }
401}
402
/// A number of bytes.
///
/// Values are created with [`NumBytes::zero`] or through the [`NumBytesExt`] helpers on
/// `usize` (for example `16.kilobytes()`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumBytes(usize);

impl NumBytes {
    /// Returns a size of zero bytes.
    #[must_use]
    pub fn zero() -> Self {
        Self(0)
    }

    /// Asserts that this size is greater than zero.
    ///
    /// # Panics
    ///
    /// Panics when the size is zero; `parameter_name` is used in the panic message.
    pub(crate) fn assert_non_zero(self, parameter_name: &str) {
        assert!(
            self.0 > 0,
            "{parameter_name} must be greater than zero bytes"
        );
    }

    /// Returns the size as a plain number of bytes.
    #[must_use]
    pub fn bytes(&self) -> usize {
        self.0
    }
}

/// Conversions from a plain number into [`NumBytes`], interpreting it in the given unit.
pub trait NumBytesExt {
    /// Interprets `self` as a number of bytes.
    fn bytes(self) -> NumBytes;

    /// Interprets `self` as a number of kilobytes (multiples of 1024 bytes).
    fn kilobytes(self) -> NumBytes;

    /// Interprets `self` as a number of megabytes (multiples of 1024 * 1024 bytes).
    fn megabytes(self) -> NumBytes;
}

impl NumBytesExt for usize {
    fn bytes(self) -> NumBytes {
        NumBytes(self)
    }

    /// # Panics
    ///
    /// Panics when `self * 1024` does not fit into `usize`. The multiplication is checked
    /// explicitly so that release builds do not silently wrap to a much smaller size.
    fn kilobytes(self) -> NumBytes {
        NumBytes(
            self.checked_mul(1024)
                .expect("NumBytesExt::kilobytes: byte count overflows usize"),
        )
    }

    /// # Panics
    ///
    /// Panics when `self * 1024 * 1024` does not fit into `usize`. The multiplication is
    /// checked explicitly so that release builds do not silently wrap to a much smaller size.
    fn megabytes(self) -> NumBytes {
        NumBytes(
            self.checked_mul(1024 * 1024)
                .expect("NumBytesExt::megabytes: byte count overflows usize"),
        )
    }
}
460
461#[cfg(test)]
462mod tests {
463 use std::time::Duration;
464 use tokio::io::{AsyncWrite, AsyncWriteExt};
465
466 pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
467 write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
468 tokio::time::sleep(Duration::from_millis(50)).await;
469 write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
470 tokio::time::sleep(Duration::from_millis(50)).await;
471 write.write_all("README.md\n".as_bytes()).await.unwrap();
472 tokio::time::sleep(Duration::from_millis(50)).await;
473 write.write_all("src\n".as_bytes()).await.unwrap();
474 tokio::time::sleep(Duration::from_millis(50)).await;
475 write.write_all("target\n".as_bytes()).await.unwrap();
476 tokio::time::sleep(Duration::from_millis(50)).await;
477 }
478
479 mod line_parser_state {
480 use crate::output_stream::LineParserState;
481 use crate::{LineOverflowBehavior, LineParsingOptions, Next, NumBytes, NumBytesExt};
482 use assertr::prelude::*;
483
484 fn run_test_case(
485 chunks: &[&[u8]],
486 mark_gap_before_chunk: Option<usize>,
487 expected_lines: &[&str],
488 options: LineParsingOptions,
489 ) {
490 let mut parser = LineParserState::new();
491 let mut collected_lines = Vec::<String>::new();
492
493 for (index, chunk) in chunks.iter().enumerate() {
494 if mark_gap_before_chunk == Some(index) {
495 parser.on_gap();
496 }
497
498 assert_that!(parser.visit_chunk(chunk, options, |line| {
499 collected_lines.push(line.into_owned());
500 Next::Continue
501 }))
502 .is_equal_to(Next::Continue);
503 }
504
505 let _ = parser.finish(|line| {
506 collected_lines.push(line.into_owned());
507 Next::Continue
508 });
509
510 let expected_lines: Vec<String> = expected_lines
511 .iter()
512 .map(std::string::ToString::to_string)
513 .collect();
514 assert_that!(collected_lines).is_equal_to(expected_lines);
515 }
516
517 fn as_single_byte_chunks(data: &str) -> Vec<&[u8]> {
518 data.as_bytes().iter().map(std::slice::from_ref).collect()
519 }
520
        // An empty chunk produces no lines.
        #[test]
        fn empty_chunk() {
            run_test_case(&[b""], None, &[], LineParsingOptions::default());
        }

        // Unterminated data is only emitted by the final `finish` call.
        #[test]
        fn chunk_without_any_newlines() {
            run_test_case(
                &[b"no newlines here"],
                None,
                &["no newlines here"],
                LineParsingOptions::default(),
            );
        }

        // The terminating newline is not part of the emitted line.
        #[test]
        fn single_completed_line() {
            run_test_case(
                &[b"one line\n"],
                None,
                &["one line"],
                LineParsingOptions::default(),
            );
        }

        // Several lines inside a single chunk are emitted in order.
        #[test]
        fn multiple_completed_lines() {
            run_test_case(
                &[b"first line\nsecond line\nthird line\n"],
                None,
                &["first line", "second line", "third line"],
                LineParsingOptions::default(),
            );
        }

        // A trailing partial line is flushed by `finish` after the completed line.
        #[test]
        fn partial_line_at_the_end() {
            run_test_case(
                &[b"complete line\npartial"],
                None,
                &["complete line", "partial"],
                LineParsingOptions::default(),
            );
        }

        // Two terminated lines in one chunk, the first containing a colon and spaces.
        #[test]
        fn initial_line_with_multiple_newlines() {
            run_test_case(
                &[b"previous: continuation\nmore lines\n"],
                None,
                &["previous: continuation", "more lines"],
                LineParsingOptions::default(),
            );
        }

        // Invalid UTF-8 bytes are replaced by U+FFFD (lossy decoding) instead of failing.
        #[test]
        fn invalid_utf8_data() {
            run_test_case(
                &[b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n"],
                None,
                &["valid utf8�(�� invalid utf8"],
                LineParsingOptions::default(),
            );
        }

        // With `DropAdditionalData`, only the first `max_line_length` bytes of a line survive.
        #[test]
        fn rest_of_too_long_line_is_dropped() {
            run_test_case(
                &[b"123456789\nabcdefghi\n"],
                None,
                &["1234", "abcd"],
                LineParsingOptions {
                    max_line_length: 4.bytes(),
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        // With `EmitAdditionalAsNewLines`, an overlong line is cut into `max_line_length` pieces.
        #[test]
        fn rest_of_too_long_line_is_returned_as_additional_lines() {
            run_test_case(
                &[b"123456789\nabcdefghi\n"],
                None,
                &["1234", "5678", "9", "abcd", "efgh", "i"],
                LineParsingOptions {
                    max_line_length: 4.bytes(),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        // A limit of zero disables the length check (`DropAdditionalData` variant).
        #[test]
        fn max_line_length_of_0_disables_line_length_checks_test1() {
            run_test_case(
                &[b"123456789\nabcdefghi\n"],
                None,
                &["123456789", "abcdefghi"],
                LineParsingOptions {
                    max_line_length: NumBytes::zero(),
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        // A limit of zero disables the length check (`EmitAdditionalAsNewLines` variant).
        #[test]
        fn max_line_length_of_0_disables_line_length_checks_test2() {
            run_test_case(
                &[b"123456789\nabcdefghi\n"],
                None,
                &["123456789", "abcdefghi"],
                LineParsingOptions {
                    max_line_length: NumBytes::zero(),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        // Lines are not trimmed: surrounding spaces are kept.
        #[test]
        fn leading_and_trailing_whitespace_is_preserved() {
            run_test_case(
                &[b" 123456789 \n abcdefghi \n"],
                None,
                &[" 123456789 ", " abcdefghi "],
                LineParsingOptions {
                    max_line_length: NumBytes::zero(),
                    overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
                },
            );
        }

        // Multi-byte characters split across one-byte chunks are reassembled before decoding.
        #[test]
        fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
            let chunks = as_single_byte_chunks("❤️❤️❤️\n👍\n");
            run_test_case(
                &chunks,
                None,
                &["❤️❤️❤️", "👍"],
                LineParsingOptions::default(),
            );
        }

        // The discard state survives chunk boundaries until the overlong line's newline arrives.
        #[test]
        fn overflow_drop_additional_data_persists_across_chunks() {
            run_test_case(
                &[b"1234", b"5678", b"9\nok\n"],
                None,
                &["1234", "ok"],
                LineParsingOptions {
                    max_line_length: 4.bytes(),
                    overflow_behavior: LineOverflowBehavior::DropAdditionalData,
                },
            );
        }

        // A gap drops the buffered partial line and everything up to the next newline.
        #[test]
        fn gap_discards_partial_line_until_next_newline() {
            run_test_case(
                &[b"rea", b"dy\nnext\n"],
                Some(1),
                &["next"],
                LineParsingOptions::default(),
            );
        }
684 }
685}