1use crate::output_stream::Next;
2use crate::output_stream::consumer::Sink;
3use crate::output_stream::event::Chunk;
4use crate::output_stream::line::adapter::{AsyncLineVisitor, LineVisitor};
5use crate::output_stream::num_bytes::NumBytes;
6use crate::output_stream::visitor::{AsyncStreamVisitor, StreamVisitor};
7use std::borrow::Cow;
8use std::collections::VecDeque;
9use std::future::Future;
10use std::ops::Deref;
11use std::pin::Pin;
12use typed_builder::TypedBuilder;
13
/// Policy a bounded collector applies once its configured limit has been reached.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CollectionOverflowBehavior {
    /// Keep what has already been collected and discard incoming data that does not fit.
    ///
    /// This is the default policy.
    #[default]
    DropAdditionalData,

    /// Evict the oldest retained data to make room for incoming data, so the collector
    /// ends up holding the most recent output. How input larger than the entire budget
    /// is handled differs per collector; see `CollectedBytes::push_chunk` and
    /// `CollectedLines::push_line`.
    DropOldestData,
}
24
/// Limits applied when collecting raw output bytes into a [`CollectedBytes`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RawCollectionOptions {
    /// Retain at most `max_bytes` bytes and apply `overflow_behavior` to anything beyond.
    Bounded {
        /// Maximum number of bytes that will be retained.
        max_bytes: NumBytes,

        /// What to do with bytes that do not fit into `max_bytes`.
        overflow_behavior: CollectionOverflowBehavior,
    },

    /// Retain every byte, with no limit.
    ///
    /// Memory grows with the amount of output collected; as the name suggests, this is
    /// meant only for sources whose output size is trusted to stay reasonable.
    TrustedUnbounded,
}
42
/// Limits applied when collecting parsed lines into a [`CollectedLines`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineCollectionOptions {
    /// Retain lines while both budgets hold; apply `overflow_behavior` otherwise.
    Bounded {
        /// Maximum total size of the retained lines, measured as `String::len`
        /// (UTF-8 bytes, line terminators not included).
        max_bytes: NumBytes,

        /// Maximum number of retained lines.
        max_lines: usize,

        /// What to do with a line that would exceed either budget.
        overflow_behavior: CollectionOverflowBehavior,
    },

    /// Retain every line, with no limit.
    ///
    /// Memory grows with the amount of output collected; as the name suggests, this is
    /// meant only for sources whose output size is trusted to stay reasonable.
    TrustedUnbounded,
}
63
/// Raw bytes collected from an output stream, optionally bounded by
/// [`RawCollectionOptions`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectedBytes {
    /// The retained bytes, in stream order.
    pub bytes: Vec<u8>,

    /// `true` once any byte has been discarded because a limit was exceeded.
    /// `push_chunk` only ever sets this flag; it never clears it.
    pub truncated: bool,
}
73
74impl CollectedBytes {
75 #[must_use]
77 pub fn new() -> Self {
78 Self {
79 bytes: Vec::new(),
80 truncated: false,
81 }
82 }
83
84 #[must_use = "the returned closure must be wired into a CollectChunks visitor; dropping it discards the collection logic"]
88 pub fn collector(
89 options: RawCollectionOptions,
90 ) -> impl FnMut(Chunk, &mut Self) + Send + 'static {
91 move |chunk: Chunk, sink: &mut Self| {
92 sink.push_chunk(chunk.as_ref(), options);
93 }
94 }
95
96 pub fn push_chunk(&mut self, chunk: &[u8], options: RawCollectionOptions) {
98 match options {
99 RawCollectionOptions::TrustedUnbounded => self.bytes.extend_from_slice(chunk),
100 RawCollectionOptions::Bounded {
101 max_bytes,
102 overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
103 } => {
104 let max_bytes = max_bytes.bytes();
105 let remaining = max_bytes.saturating_sub(self.bytes.len());
106 if chunk.len() > remaining {
107 self.truncated = true;
108 }
109 self.bytes
110 .extend_from_slice(&chunk[..remaining.min(chunk.len())]);
111 }
112 RawCollectionOptions::Bounded {
113 max_bytes,
114 overflow_behavior: CollectionOverflowBehavior::DropOldestData,
115 } => {
116 let max_bytes = max_bytes.bytes();
117 if chunk.len() > max_bytes {
118 self.bytes.clear();
119 self.bytes
120 .extend_from_slice(&chunk[chunk.len().saturating_sub(max_bytes)..]);
121 self.truncated = true;
122 return;
123 }
124
125 let required = self.bytes.len() + chunk.len();
126 if required > max_bytes {
127 self.bytes.drain(0..required - max_bytes);
128 self.truncated = true;
129 }
130 self.bytes.extend_from_slice(chunk);
131 }
132 }
133 }
134}
135
136impl Default for CollectedBytes {
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142impl Deref for CollectedBytes {
143 type Target = [u8];
144
145 fn deref(&self) -> &Self::Target {
146 &self.bytes
147 }
148}
149
/// Lines collected from an output stream, optionally bounded by
/// [`LineCollectionOptions`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectedLines {
    // Retained lines, oldest at the front (new lines are pushed to the back and
    // evictions pop from the front).
    lines: VecDeque<String>,
    // Set once any line has been discarded because of a limit; never cleared.
    truncated: bool,
    // Running sum of `len()` over `lines`, maintained by `push_back`/`pop_front` so
    // byte-limit checks do not need to rescan the buffer.
    retained_bytes: usize,
}
157
158impl CollectedLines {
159 #[must_use]
161 pub fn new() -> Self {
162 Self {
163 lines: VecDeque::new(),
164 truncated: false,
165 retained_bytes: 0,
166 }
167 }
168
169 #[must_use]
171 pub fn lines(&self) -> &VecDeque<String> {
172 &self.lines
173 }
174
175 #[must_use]
177 pub fn truncated(&self) -> bool {
178 self.truncated
179 }
180
181 #[must_use]
183 pub fn into_lines(self) -> VecDeque<String> {
184 self.lines
185 }
186
187 #[must_use]
189 pub fn into_parts(self) -> (VecDeque<String>, bool) {
190 (self.lines, self.truncated)
191 }
192
193 #[must_use = "the returned closure must be wired into a CollectLines visitor; dropping it discards the collection logic"]
198 pub fn line_collector(
199 options: LineCollectionOptions,
200 ) -> impl FnMut(Cow<'_, str>, &mut Self) -> Next + Send + 'static {
201 move |line: Cow<'_, str>, sink: &mut Self| {
202 sink.push_line(line.into_owned(), options);
203 Next::Continue
204 }
205 }
206
207 pub fn push_line(&mut self, line: String, options: LineCollectionOptions) {
215 match options {
216 LineCollectionOptions::TrustedUnbounded => self.push_back(line),
217 LineCollectionOptions::Bounded {
218 max_bytes,
219 max_lines,
220 overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
221 } => {
222 let line_len = line.len();
223 let max_bytes = max_bytes.bytes();
224 if self.lines.len() >= max_lines
225 || line_len > max_bytes
226 || line_len > max_bytes.saturating_sub(self.retained_bytes)
227 {
228 self.truncated = true;
229 return;
230 }
231 self.push_back(line);
232 }
233 LineCollectionOptions::Bounded {
234 max_bytes,
235 max_lines,
236 overflow_behavior: CollectionOverflowBehavior::DropOldestData,
237 } => {
238 let line_len = line.len();
239 let max_bytes = max_bytes.bytes();
240 if max_lines == 0 {
241 self.truncated = true;
242 return;
243 }
244 if line_len > max_bytes {
245 self.truncated = true;
246 return;
247 }
248
249 while self.lines.len() >= max_lines
250 || line_len > max_bytes.saturating_sub(self.retained_bytes)
251 {
252 self.pop_front()
253 .expect("line buffer to contain an evictable line");
254 self.truncated = true;
255 }
256 self.push_back(line);
257 }
258 }
259 }
260
261 fn push_back(&mut self, line: String) {
262 self.retained_bytes += line.len();
263 self.lines.push_back(line);
264 }
265
266 fn pop_front(&mut self) -> Option<String> {
267 let line = self.lines.pop_front()?;
268 self.retained_bytes -= line.len();
269 Some(line)
270 }
271}
272
273impl Default for CollectedLines {
274 fn default() -> Self {
275 Self::new()
276 }
277}
278
279impl Deref for CollectedLines {
280 type Target = VecDeque<String>;
281
282 fn deref(&self) -> &Self::Target {
283 &self.lines
284 }
285}
286
/// A synchronous [`StreamVisitor`] that folds every chunk into `sink` by calling
/// `f(chunk, &mut sink)`, and hands the sink back from `into_output`.
///
/// Build it with [`CollectChunks::fold`], with the generated `CollectChunks::builder()`,
/// or with a struct literal (both fields are public).
#[derive(TypedBuilder)]
pub struct CollectChunks<T, F>
where
    T: Sink,
    F: FnMut(Chunk, &mut T) + Send + 'static,
{
    /// Accumulator that every chunk is folded into; returned as the visitor's output.
    pub sink: T,
    /// Fold function invoked once per chunk with mutable access to `sink`.
    pub f: F,
}
306
307impl<T, F> CollectChunks<T, F>
308where
309 T: Sink,
310 F: FnMut(Chunk, &mut T) + Send + 'static,
311{
312 #[must_use]
317 pub fn fold(sink: T, f: F) -> Self {
318 Self { sink, f }
319 }
320}
321
322impl<T, F> StreamVisitor for CollectChunks<T, F>
323where
324 T: Sink,
325 F: FnMut(Chunk, &mut T) + Send + 'static,
326{
327 type Output = T;
328
329 fn on_chunk(&mut self, chunk: Chunk) -> Next {
330 (self.f)(chunk, &mut self.sink);
331 Next::Continue
332 }
333
334 fn into_output(self) -> Self::Output {
335 self.sink
336 }
337}
338
/// Asynchronous counterpart of [`CollectChunks`]: an [`AsyncStreamVisitor`] whose fold
/// function returns a boxed future that may borrow the sink mutably while it runs.
///
/// The [`Next`] produced by that future is forwarded as the visitor's answer, so the
/// callback decides whether consumption continues. Construct with
/// [`CollectChunksAsync::fold`]; the fields are private.
pub struct CollectChunksAsync<T, F>
where
    T: Sink,
    F: for<'a> FnMut(Chunk, &'a mut T) -> Pin<Box<dyn Future<Output = Next> + Send + 'a>>
        + Send
        + 'static,
{
    // Accumulator handed to every invocation of `f`; returned from `into_output`.
    sink: T,
    // Per-chunk callback; the returned future may hold `&mut sink` until it completes.
    f: F,
}
369
370impl<T, F> CollectChunksAsync<T, F>
371where
372 T: Sink,
373 F: for<'a> FnMut(Chunk, &'a mut T) -> Pin<Box<dyn Future<Output = Next> + Send + 'a>>
374 + Send
375 + 'static,
376{
377 #[must_use]
382 pub fn fold(sink: T, f: F) -> Self {
383 Self { sink, f }
384 }
385}
386
387impl<T, F> AsyncStreamVisitor for CollectChunksAsync<T, F>
388where
389 T: Sink,
390 F: for<'a> FnMut(Chunk, &'a mut T) -> Pin<Box<dyn Future<Output = Next> + Send + 'a>>
391 + Send
392 + 'static,
393{
394 type Output = T;
395
396 fn on_chunk(&mut self, chunk: Chunk) -> impl Future<Output = Next> + Send + '_ {
397 (self.f)(chunk, &mut self.sink)
398 }
399
400 fn into_output(self) -> Self::Output {
401 self.sink
402 }
403}
404
/// A synchronous [`LineVisitor`] that passes every parsed line to `f` together with
/// mutable access to `sink`, and hands the sink back from `into_output`.
///
/// The [`Next`] returned by `f` is forwarded unchanged, so the callback decides whether
/// consumption continues. Construct with [`CollectLines::new`]; the fields are private.
pub struct CollectLines<T, F> {
    // Accumulator handed to every invocation of `f`; returned from `into_output`.
    sink: T,
    // Per-line callback.
    f: F,
}
414
415impl<T, F> CollectLines<T, F>
416where
417 T: Sink,
418 F: FnMut(Cow<'_, str>, &mut T) -> Next + Send + 'static,
419{
420 pub fn new(sink: T, f: F) -> Self {
422 Self { sink, f }
423 }
424}
425
426impl<T, F> LineVisitor for CollectLines<T, F>
427where
428 T: Sink,
429 F: FnMut(Cow<'_, str>, &mut T) -> Next + Send + 'static,
430{
431 type Output = T;
432
433 fn on_line(&mut self, line: Cow<'_, str>) -> Next {
434 (self.f)(line, &mut self.sink)
435 }
436
437 fn into_output(self) -> Self::Output {
438 self.sink
439 }
440}
441
/// Asynchronous counterpart of [`CollectLines`]: an [`AsyncLineVisitor`] whose callback
/// returns a boxed future that may borrow both the line and the sink while it runs.
///
/// The [`Next`] produced by that future is forwarded as the visitor's answer. Construct
/// with [`CollectLinesAsync::new`]; the fields are private.
pub struct CollectLinesAsync<T, F>
where
    T: Sink,
    F: for<'a> FnMut(Cow<'a, str>, &'a mut T) -> Pin<Box<dyn Future<Output = Next> + Send + 'a>>
        + Send
        + 'static,
{
    // Accumulator handed to every invocation of `f`; returned from `into_output`.
    sink: T,
    // Per-line callback; line and sink share the lifetime `'a` of the returned future.
    f: F,
}
460
461impl<T, F> CollectLinesAsync<T, F>
462where
463 T: Sink,
464 F: for<'a> FnMut(Cow<'a, str>, &'a mut T) -> Pin<Box<dyn Future<Output = Next> + Send + 'a>>
465 + Send
466 + 'static,
467{
468 pub fn new(sink: T, f: F) -> Self {
471 Self { sink, f }
472 }
473}
474
475impl<T, F> AsyncLineVisitor for CollectLinesAsync<T, F>
476where
477 T: Sink,
478 F: for<'a> FnMut(Cow<'a, str>, &'a mut T) -> Pin<Box<dyn Future<Output = Next> + Send + 'a>>
479 + Send
480 + 'static,
481{
482 type Output = T;
483
484 fn on_line<'a>(&'a mut self, line: Cow<'a, str>) -> impl Future<Output = Next> + Send + 'a {
485 (self.f)(line, &mut self.sink)
486 }
487
488 fn into_output(self) -> Self::Output {
489 self.sink
490 }
491}
492
// Unit tests for the collection helpers: table-driven boundary checks for the byte and
// line limits, `retained_bytes` bookkeeping, and end-to-end runs of the visitor types
// through the sync/async consumer drivers.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ConsumerError;
    use crate::output_stream::consumer::driver::{spawn_consumer_async, spawn_consumer_sync};
    use crate::output_stream::event::StreamEvent;
    use crate::output_stream::event::tests::event_receiver;
    use crate::output_stream::line::adapter::ParseLines;
    use crate::output_stream::line::options::LineParsingOptions;
    use crate::output_stream::num_bytes::NumBytesExt;
    use assertr::prelude::*;
    use bytes::Bytes;
    use std::borrow::Cow;
    use std::io;

    // Bounded line options using the `DropOldestData` policy.
    fn drop_oldest_options(max_bytes: usize, max_lines: usize) -> LineCollectionOptions {
        LineCollectionOptions::Bounded {
            max_bytes: max_bytes.bytes(),
            max_lines,
            overflow_behavior: CollectionOverflowBehavior::DropOldestData,
        }
    }

    // Checks that the incrementally maintained `retained_bytes` counter equals the actual
    // total length of the retained lines.
    fn assert_retained_bytes_match_lines(collected: &CollectedLines) {
        assert_that!(collected.retained_bytes)
            .is_equal_to(collected.lines.iter().map(String::len).sum::<usize>());
    }

    // One scenario for `push_chunk_boundary_matrix`: push `chunks` in order under a
    // `max_bytes` limit, then compare the retained bytes and the truncation flag.
    struct ChunkCase {
        name: &'static str,
        overflow: CollectionOverflowBehavior,
        max_bytes: usize,
        chunks: &'static [&'static [u8]],
        expected_bytes: &'static [u8],
        expected_truncated: bool,
    }

    // Exact-fit, off-by-one and oversized-chunk cases for both overflow policies.
    const CHUNK_BOUNDARY_CASES: &[ChunkCase] = &[
        ChunkCase {
            name: "drop_additional/empty_chunk_is_no_op",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            chunks: &[b""],
            expected_bytes: b"",
            expected_truncated: false,
        },
        ChunkCase {
            name: "drop_additional/single_chunk_exactly_fills_buffer",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            chunks: &[b"abcde"],
            expected_bytes: b"abcde",
            expected_truncated: false,
        },
        ChunkCase {
            name: "drop_additional/single_chunk_overshoots_by_one_byte",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            chunks: &[b"abcdef"],
            expected_bytes: b"abcde",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_additional/second_chunk_straddles_limit",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            chunks: &[b"abc", b"def"],
            expected_bytes: b"abcde",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_additional/first_chunk_exactly_fills_then_second_chunk_rejected",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            chunks: &[b"abcde", b"f"],
            expected_bytes: b"abcde",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_oldest/empty_chunk_is_no_op",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b""],
            expected_bytes: b"",
            expected_truncated: false,
        },
        ChunkCase {
            name: "drop_oldest/single_chunk_exactly_fills_buffer",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b"abcde"],
            expected_bytes: b"abcde",
            expected_truncated: false,
        },
        ChunkCase {
            name: "drop_oldest/single_chunk_overshoots_by_one_byte_into_empty",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b"abcdef"],
            expected_bytes: b"bcdef",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_oldest/second_chunk_straddles_limit_evicts_front",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b"abc", b"def"],
            expected_bytes: b"bcdef",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_oldest/oversized_chunk_into_empty_clears_and_keeps_tail",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b"abcdefgh"],
            expected_bytes: b"defgh",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_oldest/oversized_chunk_into_partial_clears_existing",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b"ab", b"cdefgh"],
            expected_bytes: b"defgh",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_oldest/first_chunk_exactly_fills_then_second_evicts_front",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b"abcde", b"f"],
            expected_bytes: b"bcdef",
            expected_truncated: true,
        },
    ];

    // Runs every `CHUNK_BOUNDARY_CASES` entry; the case name is attached to failures.
    #[test]
    fn push_chunk_boundary_matrix() {
        for case in CHUNK_BOUNDARY_CASES {
            let mut collected = CollectedBytes::new();
            let options = RawCollectionOptions::Bounded {
                max_bytes: case.max_bytes.bytes(),
                overflow_behavior: case.overflow,
            };
            for chunk in case.chunks {
                collected.push_chunk(chunk, options);
            }

            assert_that!(collected.bytes.as_slice())
                .with_detail_message(format!("case: {}", case.name))
                .is_equal_to(case.expected_bytes);
            assert_that!(collected.truncated)
                .with_detail_message(format!("case: {}", case.name))
                .is_equal_to(case.expected_truncated);
        }
    }

    // One scenario for `push_line_boundary_matrix`: push `push` in order under both
    // limits, then compare retained lines, truncation flag and `retained_bytes`.
    struct LineCase {
        name: &'static str,
        overflow: CollectionOverflowBehavior,
        max_bytes: usize,
        max_lines: usize,
        push: &'static [&'static str],
        expected_lines: &'static [&'static str],
        expected_truncated: bool,
    }

    // Exact-fit, line-count-first, byte-budget-first and oversized-line cases for both
    // overflow policies.
    const LINE_BOUNDARY_CASES: &[LineCase] = &[
        LineCase {
            name: "drop_additional/line_exactly_fills_byte_budget_with_slot_left",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            max_lines: 2,
            push: &["abcde"],
            expected_lines: &["abcde"],
            expected_truncated: false,
        },
        LineCase {
            name: "drop_additional/max_lines_reached_before_max_bytes",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 100,
            max_lines: 2,
            push: &["a", "b", "c"],
            expected_lines: &["a", "b"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_additional/max_bytes_reached_before_max_lines",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 4,
            max_lines: 10,
            push: &["aa", "bb", "cc"],
            expected_lines: &["aa", "bb"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_additional/line_equal_to_remaining_budget_accepted",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 6,
            max_lines: 10,
            push: &["abc", "def"],
            expected_lines: &["abc", "def"],
            expected_truncated: false,
        },
        LineCase {
            name: "drop_additional/line_one_byte_over_remaining_rejected",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 6,
            max_lines: 10,
            push: &["abc", "defg"],
            expected_lines: &["abc"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_additional/line_strictly_larger_than_max_bytes_rejected",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            max_lines: 10,
            push: &["abc", "xxxxxxxxx"],
            expected_lines: &["abc"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_oldest/line_exactly_fills_byte_budget_with_slot_left",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            max_lines: 2,
            push: &["abcde"],
            expected_lines: &["abcde"],
            expected_truncated: false,
        },
        LineCase {
            name: "drop_oldest/max_lines_reached_before_max_bytes_evicts_one",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 100,
            max_lines: 2,
            push: &["a", "b", "c"],
            expected_lines: &["b", "c"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_oldest/max_bytes_reached_before_max_lines_evicts_one",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 6,
            max_lines: 10,
            push: &["aaa", "bbb", "ccc"],
            expected_lines: &["bbb", "ccc"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_oldest/incoming_line_requires_evicting_multiple_lines",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 8,
            max_lines: 100,
            push: &["a", "b", "cc", "dddd", "eeeeee"],
            expected_lines: &["eeeeee"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_oldest/line_strictly_larger_than_max_bytes_rejected",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            max_lines: 10,
            push: &["abc", "xxxxxxxxx"],
            expected_lines: &["abc"],
            expected_truncated: true,
        },
    ];

    // Runs every `LINE_BOUNDARY_CASES` entry, also checking `retained_bytes` against the
    // expected lines.
    #[test]
    fn push_line_boundary_matrix() {
        for case in LINE_BOUNDARY_CASES {
            let mut collected = CollectedLines::new();
            let options = LineCollectionOptions::Bounded {
                max_bytes: case.max_bytes.bytes(),
                max_lines: case.max_lines,
                overflow_behavior: case.overflow,
            };
            for line in case.push {
                collected.push_line((*line).to_string(), options);
            }

            let actual_lines: Vec<&str> = collected.lines().iter().map(String::as_str).collect();
            assert_that!(actual_lines)
                .with_detail_message(format!("case: {}", case.name))
                .is_equal_to(case.expected_lines.to_vec());
            assert_that!(collected.truncated())
                .with_detail_message(format!("case: {}", case.name))
                .is_equal_to(case.expected_truncated);
            assert_that!(collected.retained_bytes)
                .with_detail_message(format!("case: {} (retained_bytes)", case.name))
                .is_equal_to(
                    case.expected_lines
                        .iter()
                        .map(|line| line.len())
                        .sum::<usize>(),
                );
        }
    }

    // Both byte policies on a chunk that straddles the limit: prefix vs. suffix retained.
    #[test]
    fn raw_collection_keeps_expected_bytes_when_truncated() {
        let mut collected = CollectedBytes::new();
        let options = RawCollectionOptions::Bounded {
            max_bytes: 5.bytes(),
            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
        };

        collected.push_chunk(b"abc", options);
        collected.push_chunk(b"def", options);

        assert_that!(collected.bytes.as_slice()).is_equal_to(b"abcde".as_slice());
        assert_that!(collected.truncated).is_true();

        let mut collected = CollectedBytes::new();
        let options = RawCollectionOptions::Bounded {
            max_bytes: 5.bytes(),
            overflow_behavior: CollectionOverflowBehavior::DropOldestData,
        };

        collected.push_chunk(b"abc", options);
        collected.push_chunk(b"def", options);

        assert_that!(collected.bytes.as_slice()).is_equal_to(b"bcdef".as_slice());
        assert_that!(collected.truncated).is_true();
    }

    // Both line policies when the line-count limit is hit first.
    #[test]
    fn basic_line_collection_limit_modes() {
        let mut collected = CollectedLines::new();
        let options = LineCollectionOptions::Bounded {
            max_bytes: 7.bytes(),
            max_lines: 2,
            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
        };

        collected.push_line("one".to_string(), options);
        collected.push_line("two".to_string(), options);
        collected.push_line("three".to_string(), options);

        assert_that!(
            collected
                .lines()
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
        )
        .is_equal_to(vec!["one", "two"]);
        assert_that!(collected.truncated()).is_true();

        let mut collected = CollectedLines::new();
        let options = LineCollectionOptions::Bounded {
            max_bytes: 6.bytes(),
            max_lines: 2,
            overflow_behavior: CollectionOverflowBehavior::DropOldestData,
        };

        collected.push_line("one".to_string(), options);
        collected.push_line("two".to_string(), options);
        collected.push_line("six".to_string(), options);

        assert_that!(
            collected
                .lines()
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
        )
        .is_equal_to(vec!["two", "six"]);
        assert_that!(collected.truncated()).is_true();
    }

    // `retained_bytes` grows by each accepted line's length.
    #[test]
    fn retained_bytes_tracks_appended_lines() {
        let options = LineCollectionOptions::Bounded {
            max_bytes: 100.bytes(),
            max_lines: 100,
            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
        };
        let mut collected = CollectedLines::new();

        collected.push_line("aaa".to_string(), options);
        collected.push_line("bbbb".to_string(), options);

        assert_that!(collected.retained_bytes).is_equal_to(7);
        assert_retained_bytes_match_lines(&collected);
    }

    // DropOldest must not evict anything for a line that could never fit.
    #[test]
    fn drop_oldest_preserves_retained_lines_when_oversized_line_arrives() {
        let options = drop_oldest_options(10, 100);
        let mut collected = CollectedLines::new();

        collected.push_line("aaa".to_string(), options);
        collected.push_line("bbb".to_string(), options);
        collected.push_line("x".repeat(13), options);

        assert_that!(collected.lines())
            .with_detail_message(
                "previously-retained lines must survive an oversized incoming line",
            )
            .is_equal_to(VecDeque::from(["aaa".to_string(), "bbb".to_string()]));
        assert_that!(collected.retained_bytes).is_equal_to(6);
        assert_retained_bytes_match_lines(&collected);
        assert_that!(collected.truncated()).is_true();
    }

    // DropOldest evicts just enough old lines to fit a line that fits on its own.
    #[test]
    fn drop_oldest_evicts_old_lines_when_new_line_fits_but_budget_is_exceeded() {
        let options = drop_oldest_options(10, 100);
        let mut collected = CollectedLines::new();

        collected.push_line("aaaa".to_string(), options);
        collected.push_line("bbbb".to_string(), options);
        collected.push_line("cccc".to_string(), options);

        assert_that!(collected.lines())
            .is_equal_to(VecDeque::from(["bbbb".to_string(), "cccc".to_string()]));
        assert_that!(collected.retained_bytes).is_equal_to(8);
        assert_retained_bytes_match_lines(&collected);
        assert_that!(collected.truncated()).is_true();
    }

    // Eviction driven by the line-count limit must also update `retained_bytes`.
    #[test]
    fn drop_oldest_updates_retained_bytes_when_evicting_by_line_count() {
        let options = drop_oldest_options(100, 2);
        let mut collected = CollectedLines::new();

        collected.push_line("a".to_string(), options);
        collected.push_line("bb".to_string(), options);
        collected.push_line("ccc".to_string(), options);

        assert_that!(collected.lines())
            .is_equal_to(VecDeque::from(["bb".to_string(), "ccc".to_string()]));
        assert_that!(collected.retained_bytes).is_equal_to(5);
        assert_retained_bytes_match_lines(&collected);
        assert_that!(collected.truncated()).is_true();
    }

    // `max_lines == 0` retains nothing (and must not hit the eviction `expect`).
    #[test]
    fn drop_oldest_with_zero_max_lines_retains_nothing() {
        let options = drop_oldest_options(100, 0);
        let mut collected = CollectedLines::new();

        collected.push_line("aaa".to_string(), options);

        assert_that!(collected.lines().is_empty()).is_true();
        assert_that!(collected.retained_bytes).is_equal_to(0);
        assert_retained_bytes_match_lines(&collected);
        assert_that!(collected.truncated()).is_true();
    }

    // DropAdditional rejects a line larger than the whole budget, keeping prior lines.
    #[test]
    fn drop_additional_preserves_retained_lines_when_oversized_line_arrives() {
        let options = LineCollectionOptions::Bounded {
            max_bytes: 10.bytes(),
            max_lines: 100,
            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
        };
        let mut collected = CollectedLines::new();

        collected.push_line("aaa".to_string(), options);
        collected.push_line("x".repeat(13), options);

        assert_that!(collected.lines()).is_equal_to(VecDeque::from(["aaa".to_string()]));
        assert_that!(collected.retained_bytes).is_equal_to(3);
        assert_retained_bytes_match_lines(&collected);
        assert_that!(collected.truncated()).is_true();
    }

    // A rejected line must not change `retained_bytes`.
    #[test]
    fn drop_additional_preserves_retained_bytes_when_limit_rejects_line() {
        let options = LineCollectionOptions::Bounded {
            max_bytes: 6.bytes(),
            max_lines: 100,
            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
        };
        let mut collected = CollectedLines::new();

        collected.push_line("aaa".to_string(), options);
        collected.push_line("bbbb".to_string(), options);

        assert_that!(collected.lines()).is_equal_to(VecDeque::from(["aaa".to_string()]));
        assert_that!(collected.retained_bytes).is_equal_to(3);
        assert_retained_bytes_match_lines(&collected);
        assert_that!(collected.truncated()).is_true();
    }

    // A read error on the stream surfaces from `wait()` as `ConsumerError::StreamRead`
    // carrying the stream name and the I/O error kind.
    #[tokio::test]
    async fn collectors_return_stream_read_error() {
        let error =
            crate::StreamReadError::new("custom", io::Error::from(io::ErrorKind::BrokenPipe));
        let collector = spawn_consumer_sync(
            "custom",
            event_receiver(vec![
                StreamEvent::Chunk(Chunk(Bytes::from_static(b"complete\npartial"))),
                StreamEvent::ReadError(error),
            ])
            .await,
            ParseLines::collect(
                LineParsingOptions::default(),
                Vec::<String>::new(),
                |line, lines: &mut Vec<String>| {
                    lines.push(line.into_owned());
                    Next::Continue
                },
            ),
        );

        match collector.wait().await {
            Err(ConsumerError::StreamRead { source }) => {
                assert_that!(source.stream_name()).is_equal_to("custom");
                assert_that!(source.kind()).is_equal_to(io::ErrorKind::BrokenPipe);
            }
            other => {
                assert_that!(&other).fail(format_args!(
                    "expected consumer stream read error, got {other:?}"
                ));
            }
        }
    }

    // A `Gap` event drops the partial line in progress ("par"); the trailing line
    // without a newline is still emitted at EOF.
    #[tokio::test]
    async fn collectors_skip_gaps_and_keep_final_unterminated_line() {
        let collector = spawn_consumer_sync(
            "custom",
            event_receiver(vec![
                StreamEvent::Chunk(Chunk(Bytes::from_static(b"one\npar"))),
                StreamEvent::Gap,
                StreamEvent::Chunk(Chunk(Bytes::from_static(b"\ntwo\nfinal"))),
                StreamEvent::Eof,
            ])
            .await,
            ParseLines::collect(
                LineParsingOptions::default(),
                Vec::<String>::new(),
                |line, lines| {
                    lines.push(line.into_owned());
                    Next::Continue
                },
            ),
        );

        let lines = collector.wait().await.unwrap();
        assert_that!(lines).contains_exactly(["one", "two", "final"]);
    }

    // `CollectChunksAsync` sees every chunk in order until EOF.
    #[tokio::test]
    async fn chunk_collector_async_extends_sink_until_eof() {
        let collector = spawn_consumer_async(
            "custom",
            event_receiver(vec![
                StreamEvent::Chunk(Chunk(Bytes::from_static(b"ab"))),
                StreamEvent::Chunk(Chunk(Bytes::from_static(b"cd"))),
                StreamEvent::Chunk(Chunk(Bytes::from_static(b"ef"))),
                StreamEvent::Eof,
            ])
            .await,
            CollectChunksAsync::fold(Vec::new(), |chunk, seen: &mut Vec<u8>| {
                Box::pin(async move {
                    seen.extend_from_slice(chunk.as_ref());
                    Next::Continue
                })
            }),
        );

        let seen = collector.wait().await.unwrap();
        assert_that!(seen).is_equal_to(b"abcdef".to_vec());
    }

    // The fold closure is `FnMut`, so it may carry its own state (a running index).
    #[tokio::test]
    async fn chunk_collector_accepts_stateful_callback() {
        let mut chunk_index = 0;
        let collector = spawn_consumer_sync(
            "custom",
            event_receiver(vec![
                StreamEvent::Chunk(Chunk(Bytes::from_static(b"ab"))),
                StreamEvent::Chunk(Chunk(Bytes::from_static(b"cd"))),
                StreamEvent::Chunk(Chunk(Bytes::from_static(b"ef"))),
                StreamEvent::Eof,
            ])
            .await,
            CollectChunks::builder()
                .sink(Vec::new())
                .f(
                    move |chunk: Chunk, indexed_chunks: &mut Vec<(usize, Vec<u8>)>| {
                        chunk_index += 1;
                        indexed_chunks.push((chunk_index, chunk.as_ref().to_vec()));
                    },
                )
                .build(),
        );

        let indexed_chunks = collector.wait().await.unwrap();
        assert_that!(indexed_chunks).is_equal_to(vec![
            (1, b"ab".to_vec()),
            (2, b"cd".to_vec()),
            (3, b"ef".to_vec()),
        ]);
    }

    // Same as above for line callbacks.
    #[tokio::test]
    async fn line_collector_accepts_stateful_callback() {
        let mut line_index = 0;
        let collector = spawn_consumer_sync(
            "custom",
            event_receiver(vec![
                StreamEvent::Chunk(Chunk(Bytes::from_static(b"alpha\nbeta\ngamma\n"))),
                StreamEvent::Eof,
            ])
            .await,
            ParseLines::collect(
                LineParsingOptions::default(),
                Vec::new(),
                move |line: Cow<'_, str>, indexed_lines: &mut Vec<String>| {
                    line_index += 1;
                    indexed_lines.push(format!("{line_index}:{line}"));
                    Next::Continue
                },
            ),
        );

        let indexed_lines = collector.wait().await.unwrap();
        assert_that!(indexed_lines).is_equal_to(vec![
            "1:alpha".to_string(),
            "2:beta".to_string(),
            "3:gamma".to_string(),
        ]);
    }

    // Returning `Next::Break` from an async line callback stops consumption right after
    // that line ("end" is never seen).
    #[tokio::test]
    async fn line_collector_async_break_stops_after_requested_line() {
        let collector = spawn_consumer_async(
            "custom",
            event_receiver(vec![
                StreamEvent::Chunk(Chunk(Bytes::from_static(b"start\nbreak\nend\n"))),
                StreamEvent::Eof,
            ])
            .await,
            ParseLines::collect_async(
                LineParsingOptions::default(),
                Vec::new(),
                |line, seen: &mut Vec<String>| {
                    Box::pin(async move {
                        let is_break = line == "break";
                        seen.push(line.into_owned());
                        if is_break {
                            Next::Break
                        } else {
                            Next::Continue
                        }
                    })
                },
            ),
        );

        let seen = collector.wait().await.unwrap();
        assert_that!(seen).contains_exactly(["start", "break"]);
    }
}