1use crate::output_stream::Next;
2use crate::output_stream::consumer::Sink;
3use crate::output_stream::event::Chunk;
4use crate::output_stream::line::adapter::{AsyncLineSink, LineSink};
5use crate::output_stream::num_bytes::NumBytes;
6use crate::output_stream::visitor::{AsyncStreamVisitor, StreamVisitor};
7use std::borrow::Cow;
8use std::collections::VecDeque;
9use std::future::Future;
10use std::ops::Deref;
11use typed_builder::TypedBuilder;
12
/// Strategy applied when a bounded collection receives more data than its limits allow.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CollectionOverflowBehavior {
    /// Keep what has already been collected and discard the data that does not fit.
    ///
    /// This is the default.
    #[default]
    DropAdditionalData,

    /// Evict the oldest collected data to make room for newly arriving data.
    DropOldestData,
}
23
/// Limits applied while collecting raw bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RawCollectionOptions {
    /// Retain at most `max_bytes` bytes.
    Bounded {
        /// Upper bound on the number of retained bytes.
        max_bytes: NumBytes,

        /// Which data is discarded once `max_bytes` would be exceeded.
        overflow_behavior: CollectionOverflowBehavior,
    },

    /// Collect every byte without any size limit.
    ///
    /// NOTE(review): the name suggests this is meant only for output whose size is trusted;
    /// confirm against the callers.
    TrustedUnbounded,
}
41
/// Limits applied while collecting whole lines.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineCollectionOptions {
    /// Retain at most `max_lines` lines whose combined length is at most `max_bytes` bytes.
    Bounded {
        /// Upper bound on the summed byte length of all retained lines.
        max_bytes: NumBytes,

        /// Upper bound on the number of retained lines.
        max_lines: usize,

        /// Which data is discarded once either limit would be exceeded.
        overflow_behavior: CollectionOverflowBehavior,
    },

    /// Collect every line without any limit.
    ///
    /// NOTE(review): the name suggests this is meant only for output whose size is trusted;
    /// confirm against the callers.
    TrustedUnbounded,
}
62
/// Bytes retained by a raw collection, plus a flag recording whether data was discarded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectedBytes {
    /// The bytes retained so far.
    pub bytes: Vec<u8>,

    /// `true` once a size limit caused some data to be discarded.
    pub truncated: bool,
}
72
73impl CollectedBytes {
74 #[must_use]
76 pub fn new() -> Self {
77 Self {
78 bytes: Vec::new(),
79 truncated: false,
80 }
81 }
82
83 pub(crate) fn push_chunk(&mut self, chunk: &[u8], options: RawCollectionOptions) {
84 match options {
85 RawCollectionOptions::TrustedUnbounded => self.bytes.extend_from_slice(chunk),
86 RawCollectionOptions::Bounded {
87 max_bytes,
88 overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
89 } => {
90 let max_bytes = max_bytes.bytes();
91 let remaining = max_bytes.saturating_sub(self.bytes.len());
92 if chunk.len() > remaining {
93 self.truncated = true;
94 }
95 self.bytes
96 .extend_from_slice(&chunk[..remaining.min(chunk.len())]);
97 }
98 RawCollectionOptions::Bounded {
99 max_bytes,
100 overflow_behavior: CollectionOverflowBehavior::DropOldestData,
101 } => {
102 let max_bytes = max_bytes.bytes();
103 if chunk.len() > max_bytes {
104 self.bytes.clear();
105 self.bytes
106 .extend_from_slice(&chunk[chunk.len().saturating_sub(max_bytes)..]);
107 self.truncated = true;
108 return;
109 }
110
111 let required = self.bytes.len() + chunk.len();
112 if required > max_bytes {
113 self.bytes.drain(0..required - max_bytes);
114 self.truncated = true;
115 }
116 self.bytes.extend_from_slice(chunk);
117 }
118 }
119 }
120}
121
impl Default for CollectedBytes {
    /// Returns the same empty, non-truncated collection as [`CollectedBytes::new`].
    fn default() -> Self {
        Self::new()
    }
}
127
128impl Deref for CollectedBytes {
129 type Target = [u8];
130
131 fn deref(&self) -> &Self::Target {
132 &self.bytes
133 }
134}
135
/// Lines retained by a line collection, plus a flag recording whether data was discarded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectedLines {
    // Retained lines, oldest at the front.
    lines: VecDeque<String>,
    // Set once a limit caused a line to be rejected or evicted.
    truncated: bool,
    // Sum of the byte lengths of all entries in `lines`; kept in sync by the private
    // `push_back` / `pop_front` helpers.
    retained_bytes: usize,
}
143
144impl CollectedLines {
145 #[must_use]
147 pub fn new() -> Self {
148 Self {
149 lines: VecDeque::new(),
150 truncated: false,
151 retained_bytes: 0,
152 }
153 }
154
155 #[must_use]
157 pub fn lines(&self) -> &VecDeque<String> {
158 &self.lines
159 }
160
161 #[must_use]
163 pub fn truncated(&self) -> bool {
164 self.truncated
165 }
166
167 #[must_use]
169 pub fn into_lines(self) -> VecDeque<String> {
170 self.lines
171 }
172
173 #[must_use]
175 pub fn into_parts(self) -> (VecDeque<String>, bool) {
176 (self.lines, self.truncated)
177 }
178
179 pub(crate) fn push_line(&mut self, line: String, options: LineCollectionOptions) {
180 match options {
181 LineCollectionOptions::TrustedUnbounded => self.push_back(line),
182 LineCollectionOptions::Bounded {
183 max_bytes,
184 max_lines,
185 overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
186 } => {
187 let line_len = line.len();
188 let max_bytes = max_bytes.bytes();
189 if self.lines.len() >= max_lines
190 || line_len > max_bytes
191 || line_len > max_bytes.saturating_sub(self.retained_bytes)
192 {
193 self.truncated = true;
194 return;
195 }
196 self.push_back(line);
197 }
198 LineCollectionOptions::Bounded {
199 max_bytes,
200 max_lines,
201 overflow_behavior: CollectionOverflowBehavior::DropOldestData,
202 } => {
203 let line_len = line.len();
204 let max_bytes = max_bytes.bytes();
205 if max_lines == 0 {
206 self.truncated = true;
207 return;
208 }
209 if line_len > max_bytes {
210 self.truncated = true;
211 return;
212 }
213
214 while self.lines.len() >= max_lines
215 || line_len > max_bytes.saturating_sub(self.retained_bytes)
216 {
217 self.pop_front()
218 .expect("line buffer to contain an evictable line");
219 self.truncated = true;
220 }
221 self.push_back(line);
222 }
223 }
224 }
225
226 fn push_back(&mut self, line: String) {
227 self.retained_bytes += line.len();
228 self.lines.push_back(line);
229 }
230
231 fn pop_front(&mut self) -> Option<String> {
232 let line = self.lines.pop_front()?;
233 self.retained_bytes -= line.len();
234 Some(line)
235 }
236}
237
impl Default for CollectedLines {
    /// Returns the same empty, non-truncated collection as [`CollectedLines::new`].
    fn default() -> Self {
        Self::new()
    }
}
243
244impl Deref for CollectedLines {
245 type Target = VecDeque<String>;
246
247 fn deref(&self) -> &Self::Target {
248 &self.lines
249 }
250}
251
/// Asynchronous, stateful callback that folds stream chunks into a sink of type `S`.
///
/// Implementors must be `Send + 'static`.
pub trait AsyncChunkCollector<S: Sink>: Send + 'static {
    /// Handles one `chunk`, with mutable access to both the collector and `sink`.
    ///
    /// The returned future must be `Send` and may borrow `self` and `sink` for `'a`. It
    /// resolves to a [`Next`] value (presumably telling the stream consumer whether to keep
    /// reading; confirm against the consumer driver).
    fn collect<'a>(
        &'a mut self,
        chunk: Chunk,
        sink: &'a mut S,
    ) -> impl Future<Output = Next> + Send + 'a;
}
271
/// Asynchronous, stateful callback that folds parsed lines into a sink of type `S`.
///
/// Implementors must be `Send + 'static`.
pub trait AsyncLineCollector<S: Sink>: Send + 'static {
    /// Handles one `line`, with mutable access to both the collector and `sink`.
    ///
    /// The line is passed as a [`Cow`], so it may be borrowed or owned. The returned future
    /// must be `Send` and may borrow `self`, `line` and `sink` for `'a`. It resolves to a
    /// [`Next`] value (presumably telling the stream consumer whether to keep reading;
    /// confirm against the consumer driver).
    fn collect<'a>(
        &'a mut self,
        line: Cow<'a, str>,
        sink: &'a mut S,
    ) -> impl Future<Output = Next> + Send + 'a;
}
289
/// Synchronous stream visitor that passes every chunk to a callback together with a sink.
///
/// Constructed through the builder generated by `TypedBuilder`.
#[derive(TypedBuilder)]
pub(crate) struct CollectChunks<T, F>
where
    T: Sink,
    F: FnMut(Chunk, &mut T) + Send + 'static,
{
    /// Accumulator handed to the callback; becomes the visitor's output.
    pub sink: T,
    /// Callback invoked once per chunk.
    pub f: F,
}
299
300impl<T, F> StreamVisitor for CollectChunks<T, F>
301where
302 T: Sink,
303 F: FnMut(Chunk, &mut T) + Send + 'static,
304{
305 type Output = T;
306
307 fn on_chunk(&mut self, chunk: Chunk) -> Next {
308 (self.f)(chunk, &mut self.sink);
309 Next::Continue
310 }
311
312 fn into_output(self) -> Self::Output {
313 self.sink
314 }
315}
316
/// Asynchronous stream visitor that passes every chunk to an [`AsyncChunkCollector`]
/// together with a sink.
///
/// Constructed through the builder generated by `TypedBuilder`.
#[derive(TypedBuilder)]
pub(crate) struct CollectChunksAsync<T, C>
where
    T: Sink,
    C: AsyncChunkCollector<T>,
{
    /// Accumulator handed to the collector; becomes the visitor's output.
    pub sink: T,
    /// Collector invoked once per chunk.
    pub collector: C,
}
326
327impl<T, C> AsyncStreamVisitor for CollectChunksAsync<T, C>
328where
329 T: Sink,
330 C: AsyncChunkCollector<T>,
331{
332 type Output = T;
333
334 fn on_chunk(&mut self, chunk: Chunk) -> impl Future<Output = Next> + Send + '_ {
335 self.collector.collect(chunk, &mut self.sink)
336 }
337
338 fn into_output(self) -> Self::Output {
339 self.sink
340 }
341}
342
/// Synchronous line sink that passes every line to a callback together with an accumulator.
pub struct CollectLineSink<T, F> {
    // Accumulator handed to the callback; returned from `into_output`.
    sink: T,
    // Callback invoked once per line; its `Next` result is returned from `on_line`.
    f: F,
}
351
352impl<T, F> CollectLineSink<T, F>
353where
354 T: Sink,
355 F: FnMut(Cow<'_, str>, &mut T) -> Next + Send + 'static,
356{
357 pub fn new(sink: T, f: F) -> Self {
359 Self { sink, f }
360 }
361}
362
363impl<T, F> LineSink for CollectLineSink<T, F>
364where
365 T: Sink,
366 F: FnMut(Cow<'_, str>, &mut T) -> Next + Send + 'static,
367{
368 type Output = T;
369
370 fn on_line(&mut self, line: Cow<'_, str>) -> Next {
371 (self.f)(line, &mut self.sink)
372 }
373
374 fn into_output(self) -> Self::Output {
375 self.sink
376 }
377}
378
/// Asynchronous line sink that passes every line to an [`AsyncLineCollector`] together with
/// an accumulator.
pub struct CollectLineSinkAsync<T, C> {
    // Accumulator handed to the collector; returned from `into_output`.
    sink: T,
    // Collector invoked once per line.
    collector: C,
}
387
388impl<T, C> CollectLineSinkAsync<T, C>
389where
390 T: Sink,
391 C: AsyncLineCollector<T>,
392{
393 pub fn new(sink: T, collector: C) -> Self {
396 Self { sink, collector }
397 }
398}
399
400impl<T, C> AsyncLineSink for CollectLineSinkAsync<T, C>
401where
402 T: Sink,
403 C: AsyncLineCollector<T>,
404{
405 type Output = T;
406
407 fn on_line<'a>(&'a mut self, line: Cow<'a, str>) -> impl Future<Output = Next> + Send + 'a {
408 self.collector.collect(line, &mut self.sink)
409 }
410
411 fn into_output(self) -> Self::Output {
412 self.sink
413 }
414}
415
416#[cfg(test)]
417mod tests {
418 use super::*;
419 use crate::ConsumerError;
420 use crate::output_stream::consumer::driver::{spawn_consumer_async, spawn_consumer_sync};
421 use crate::output_stream::event::StreamEvent;
422 use crate::output_stream::event::tests::event_receiver;
423 use crate::output_stream::line::adapter::LineAdapter;
424 use crate::output_stream::line::options::LineParsingOptions;
425 use crate::output_stream::num_bytes::NumBytesExt;
426 use crate::{AsyncChunkCollector, AsyncLineCollector};
427 use assertr::prelude::*;
428 use bytes::Bytes;
429 use std::borrow::Cow;
430 use std::io;
431
432 fn drop_oldest_options(max_bytes: usize, max_lines: usize) -> LineCollectionOptions {
433 LineCollectionOptions::Bounded {
434 max_bytes: max_bytes.bytes(),
435 max_lines,
436 overflow_behavior: CollectionOverflowBehavior::DropOldestData,
437 }
438 }
439
440 fn assert_retained_bytes_match_lines(collected: &CollectedLines) {
441 assert_that!(collected.retained_bytes)
442 .is_equal_to(collected.lines.iter().map(String::len).sum::<usize>());
443 }
444
    /// One row of the `push_chunk` boundary matrix.
    struct ChunkCase {
        /// Label included in assertion failure messages.
        name: &'static str,
        /// Overflow behavior under test.
        overflow: CollectionOverflowBehavior,
        /// Byte limit of the bounded collection.
        max_bytes: usize,
        /// Chunks pushed in order.
        chunks: &'static [&'static [u8]],
        /// Bytes expected to be retained after all pushes.
        expected_bytes: &'static [u8],
        /// Expected value of the `truncated` flag after all pushes.
        expected_truncated: bool,
    }
453
    /// Boundary scenarios for `CollectedBytes::push_chunk`, all with a 5-byte limit.
    const CHUNK_BOUNDARY_CASES: &[ChunkCase] = &[
        // --- DropAdditionalData: the head of the stream is kept. ---
        ChunkCase {
            name: "drop_additional/empty_chunk_is_no_op",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            chunks: &[b""],
            expected_bytes: b"",
            expected_truncated: false,
        },
        ChunkCase {
            name: "drop_additional/single_chunk_exactly_fills_buffer",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            chunks: &[b"abcde"],
            expected_bytes: b"abcde",
            expected_truncated: false,
        },
        ChunkCase {
            name: "drop_additional/single_chunk_overshoots_by_one_byte",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            chunks: &[b"abcdef"],
            expected_bytes: b"abcde",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_additional/second_chunk_straddles_limit",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            chunks: &[b"abc", b"def"],
            expected_bytes: b"abcde",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_additional/first_chunk_exactly_fills_then_second_chunk_rejected",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            chunks: &[b"abcde", b"f"],
            expected_bytes: b"abcde",
            expected_truncated: true,
        },
        // --- DropOldestData: the tail of the stream is kept. ---
        ChunkCase {
            name: "drop_oldest/empty_chunk_is_no_op",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b""],
            expected_bytes: b"",
            expected_truncated: false,
        },
        ChunkCase {
            name: "drop_oldest/single_chunk_exactly_fills_buffer",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b"abcde"],
            expected_bytes: b"abcde",
            expected_truncated: false,
        },
        ChunkCase {
            name: "drop_oldest/single_chunk_overshoots_by_one_byte_into_empty",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b"abcdef"],
            expected_bytes: b"bcdef",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_oldest/second_chunk_straddles_limit_evicts_front",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b"abc", b"def"],
            expected_bytes: b"bcdef",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_oldest/oversized_chunk_into_empty_clears_and_keeps_tail",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b"abcdefgh"],
            expected_bytes: b"defgh",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_oldest/oversized_chunk_into_partial_clears_existing",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b"ab", b"cdefgh"],
            expected_bytes: b"defgh",
            expected_truncated: true,
        },
        ChunkCase {
            name: "drop_oldest/first_chunk_exactly_fills_then_second_evicts_front",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            chunks: &[b"abcde", b"f"],
            expected_bytes: b"bcdef",
            expected_truncated: true,
        },
    ];
552
553 #[test]
554 fn push_chunk_boundary_matrix() {
555 for case in CHUNK_BOUNDARY_CASES {
556 let mut collected = CollectedBytes::new();
557 let options = RawCollectionOptions::Bounded {
558 max_bytes: case.max_bytes.bytes(),
559 overflow_behavior: case.overflow,
560 };
561 for chunk in case.chunks {
562 collected.push_chunk(chunk, options);
563 }
564
565 assert_that!(collected.bytes.as_slice())
566 .with_detail_message(format!("case: {}", case.name))
567 .is_equal_to(case.expected_bytes);
568 assert_that!(collected.truncated)
569 .with_detail_message(format!("case: {}", case.name))
570 .is_equal_to(case.expected_truncated);
571 }
572 }
573
    /// One row of the `push_line` boundary matrix.
    struct LineCase {
        /// Label included in assertion failure messages.
        name: &'static str,
        /// Overflow behavior under test.
        overflow: CollectionOverflowBehavior,
        /// Byte limit of the bounded collection.
        max_bytes: usize,
        /// Line-count limit of the bounded collection.
        max_lines: usize,
        /// Lines pushed in order.
        push: &'static [&'static str],
        /// Lines expected to be retained after all pushes, oldest first.
        expected_lines: &'static [&'static str],
        /// Expected value of the truncation flag after all pushes.
        expected_truncated: bool,
    }
583
    /// Boundary scenarios for `CollectedLines::push_line`.
    const LINE_BOUNDARY_CASES: &[LineCase] = &[
        // --- DropAdditionalData: lines that do not fit are rejected. ---
        LineCase {
            name: "drop_additional/line_exactly_fills_byte_budget_with_slot_left",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            max_lines: 2,
            push: &["abcde"],
            expected_lines: &["abcde"],
            expected_truncated: false,
        },
        LineCase {
            name: "drop_additional/max_lines_reached_before_max_bytes",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 100,
            max_lines: 2,
            push: &["a", "b", "c"],
            expected_lines: &["a", "b"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_additional/max_bytes_reached_before_max_lines",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 4,
            max_lines: 10,
            push: &["aa", "bb", "cc"],
            expected_lines: &["aa", "bb"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_additional/line_equal_to_remaining_budget_accepted",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 6,
            max_lines: 10,
            push: &["abc", "def"],
            expected_lines: &["abc", "def"],
            expected_truncated: false,
        },
        LineCase {
            name: "drop_additional/line_one_byte_over_remaining_rejected",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 6,
            max_lines: 10,
            push: &["abc", "defg"],
            expected_lines: &["abc"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_additional/line_strictly_larger_than_max_bytes_rejected",
            overflow: CollectionOverflowBehavior::DropAdditionalData,
            max_bytes: 5,
            max_lines: 10,
            push: &["abc", "xxxxxxxxx"],
            expected_lines: &["abc"],
            expected_truncated: true,
        },
        // --- DropOldestData: the oldest lines are evicted to make room. ---
        LineCase {
            name: "drop_oldest/line_exactly_fills_byte_budget_with_slot_left",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            max_lines: 2,
            push: &["abcde"],
            expected_lines: &["abcde"],
            expected_truncated: false,
        },
        LineCase {
            name: "drop_oldest/max_lines_reached_before_max_bytes_evicts_one",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 100,
            max_lines: 2,
            push: &["a", "b", "c"],
            expected_lines: &["b", "c"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_oldest/max_bytes_reached_before_max_lines_evicts_one",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 6,
            max_lines: 10,
            push: &["aaa", "bbb", "ccc"],
            expected_lines: &["bbb", "ccc"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_oldest/incoming_line_requires_evicting_multiple_lines",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 8,
            max_lines: 100,
            push: &["a", "b", "cc", "dddd", "eeeeee"],
            expected_lines: &["eeeeee"],
            expected_truncated: true,
        },
        LineCase {
            name: "drop_oldest/line_strictly_larger_than_max_bytes_rejected",
            overflow: CollectionOverflowBehavior::DropOldestData,
            max_bytes: 5,
            max_lines: 10,
            push: &["abc", "xxxxxxxxx"],
            expected_lines: &["abc"],
            expected_truncated: true,
        },
    ];
685
686 #[test]
687 fn push_line_boundary_matrix() {
688 for case in LINE_BOUNDARY_CASES {
689 let mut collected = CollectedLines::new();
690 let options = LineCollectionOptions::Bounded {
691 max_bytes: case.max_bytes.bytes(),
692 max_lines: case.max_lines,
693 overflow_behavior: case.overflow,
694 };
695 for line in case.push {
696 collected.push_line((*line).to_string(), options);
697 }
698
699 let actual_lines: Vec<&str> = collected.lines().iter().map(String::as_str).collect();
700 assert_that!(actual_lines)
701 .with_detail_message(format!("case: {}", case.name))
702 .is_equal_to(case.expected_lines.to_vec());
703 assert_that!(collected.truncated())
704 .with_detail_message(format!("case: {}", case.name))
705 .is_equal_to(case.expected_truncated);
706 assert_that!(collected.retained_bytes)
707 .with_detail_message(format!("case: {} (retained_bytes)", case.name))
708 .is_equal_to(
709 case.expected_lines
710 .iter()
711 .map(|line| line.len())
712 .sum::<usize>(),
713 );
714 }
715 }
716
717 #[test]
718 fn raw_collection_keeps_expected_bytes_when_truncated() {
719 let mut collected = CollectedBytes::new();
720 let options = RawCollectionOptions::Bounded {
721 max_bytes: 5.bytes(),
722 overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
723 };
724
725 collected.push_chunk(b"abc", options);
726 collected.push_chunk(b"def", options);
727
728 assert_that!(collected.bytes.as_slice()).is_equal_to(b"abcde".as_slice());
729 assert_that!(collected.truncated).is_true();
730
731 let mut collected = CollectedBytes::new();
732 let options = RawCollectionOptions::Bounded {
733 max_bytes: 5.bytes(),
734 overflow_behavior: CollectionOverflowBehavior::DropOldestData,
735 };
736
737 collected.push_chunk(b"abc", options);
738 collected.push_chunk(b"def", options);
739
740 assert_that!(collected.bytes.as_slice()).is_equal_to(b"bcdef".as_slice());
741 assert_that!(collected.truncated).is_true();
742 }
743
744 #[test]
745 fn basic_line_collection_limit_modes() {
746 let mut collected = CollectedLines::new();
747 let options = LineCollectionOptions::Bounded {
748 max_bytes: 7.bytes(),
749 max_lines: 2,
750 overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
751 };
752
753 collected.push_line("one".to_string(), options);
754 collected.push_line("two".to_string(), options);
755 collected.push_line("three".to_string(), options);
756
757 assert_that!(
758 collected
759 .lines()
760 .iter()
761 .map(String::as_str)
762 .collect::<Vec<_>>()
763 )
764 .is_equal_to(vec!["one", "two"]);
765 assert_that!(collected.truncated()).is_true();
766
767 let mut collected = CollectedLines::new();
768 let options = LineCollectionOptions::Bounded {
769 max_bytes: 6.bytes(),
770 max_lines: 2,
771 overflow_behavior: CollectionOverflowBehavior::DropOldestData,
772 };
773
774 collected.push_line("one".to_string(), options);
775 collected.push_line("two".to_string(), options);
776 collected.push_line("six".to_string(), options);
777
778 assert_that!(
779 collected
780 .lines()
781 .iter()
782 .map(String::as_str)
783 .collect::<Vec<_>>()
784 )
785 .is_equal_to(vec!["two", "six"]);
786 assert_that!(collected.truncated()).is_true();
787 }
788
789 #[test]
790 fn retained_bytes_tracks_appended_lines() {
791 let options = LineCollectionOptions::Bounded {
792 max_bytes: 100.bytes(),
793 max_lines: 100,
794 overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
795 };
796 let mut collected = CollectedLines::new();
797
798 collected.push_line("aaa".to_string(), options);
799 collected.push_line("bbbb".to_string(), options);
800
801 assert_that!(collected.retained_bytes).is_equal_to(7);
802 assert_retained_bytes_match_lines(&collected);
803 }
804
805 #[test]
806 fn drop_oldest_preserves_retained_lines_when_oversized_line_arrives() {
807 let options = drop_oldest_options(10, 100);
808 let mut collected = CollectedLines::new();
809
810 collected.push_line("aaa".to_string(), options);
811 collected.push_line("bbb".to_string(), options);
812 collected.push_line("x".repeat(13), options);
813
814 assert_that!(collected.lines())
815 .with_detail_message(
816 "previously-retained lines must survive an oversized incoming line",
817 )
818 .is_equal_to(VecDeque::from(["aaa".to_string(), "bbb".to_string()]));
819 assert_that!(collected.retained_bytes).is_equal_to(6);
820 assert_retained_bytes_match_lines(&collected);
821 assert_that!(collected.truncated()).is_true();
822 }
823
824 #[test]
825 fn drop_oldest_evicts_old_lines_when_new_line_fits_but_budget_is_exceeded() {
826 let options = drop_oldest_options(10, 100);
827 let mut collected = CollectedLines::new();
828
829 collected.push_line("aaaa".to_string(), options);
830 collected.push_line("bbbb".to_string(), options);
831 collected.push_line("cccc".to_string(), options);
832
833 assert_that!(collected.lines())
834 .is_equal_to(VecDeque::from(["bbbb".to_string(), "cccc".to_string()]));
835 assert_that!(collected.retained_bytes).is_equal_to(8);
836 assert_retained_bytes_match_lines(&collected);
837 assert_that!(collected.truncated()).is_true();
838 }
839
840 #[test]
841 fn drop_oldest_updates_retained_bytes_when_evicting_by_line_count() {
842 let options = drop_oldest_options(100, 2);
843 let mut collected = CollectedLines::new();
844
845 collected.push_line("a".to_string(), options);
846 collected.push_line("bb".to_string(), options);
847 collected.push_line("ccc".to_string(), options);
848
849 assert_that!(collected.lines())
850 .is_equal_to(VecDeque::from(["bb".to_string(), "ccc".to_string()]));
851 assert_that!(collected.retained_bytes).is_equal_to(5);
852 assert_retained_bytes_match_lines(&collected);
853 assert_that!(collected.truncated()).is_true();
854 }
855
856 #[test]
857 fn drop_oldest_with_zero_max_lines_retains_nothing() {
858 let options = drop_oldest_options(100, 0);
859 let mut collected = CollectedLines::new();
860
861 collected.push_line("aaa".to_string(), options);
862
863 assert_that!(collected.lines().is_empty()).is_true();
864 assert_that!(collected.retained_bytes).is_equal_to(0);
865 assert_retained_bytes_match_lines(&collected);
866 assert_that!(collected.truncated()).is_true();
867 }
868
869 #[test]
870 fn drop_additional_preserves_retained_lines_when_oversized_line_arrives() {
871 let options = LineCollectionOptions::Bounded {
872 max_bytes: 10.bytes(),
873 max_lines: 100,
874 overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
875 };
876 let mut collected = CollectedLines::new();
877
878 collected.push_line("aaa".to_string(), options);
879 collected.push_line("x".repeat(13), options);
880
881 assert_that!(collected.lines()).is_equal_to(VecDeque::from(["aaa".to_string()]));
882 assert_that!(collected.retained_bytes).is_equal_to(3);
883 assert_retained_bytes_match_lines(&collected);
884 assert_that!(collected.truncated()).is_true();
885 }
886
887 #[test]
888 fn drop_additional_preserves_retained_bytes_when_limit_rejects_line() {
889 let options = LineCollectionOptions::Bounded {
890 max_bytes: 6.bytes(),
891 max_lines: 100,
892 overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
893 };
894 let mut collected = CollectedLines::new();
895
896 collected.push_line("aaa".to_string(), options);
897 collected.push_line("bbbb".to_string(), options);
898
899 assert_that!(collected.lines()).is_equal_to(VecDeque::from(["aaa".to_string()]));
900 assert_that!(collected.retained_bytes).is_equal_to(3);
901 assert_retained_bytes_match_lines(&collected);
902 assert_that!(collected.truncated()).is_true();
903 }
904
905 #[tokio::test]
906 async fn collectors_return_stream_read_error() {
907 let error =
908 crate::StreamReadError::new("custom", io::Error::from(io::ErrorKind::BrokenPipe));
909 let collector = spawn_consumer_sync(
910 "custom",
911 event_receiver(vec![
912 StreamEvent::Chunk(Chunk(Bytes::from_static(b"complete\npartial"))),
913 StreamEvent::ReadError(error),
914 ])
915 .await,
916 LineAdapter::new(
917 LineParsingOptions::default(),
918 CollectLineSink::new(Vec::<String>::new(), |line, lines: &mut Vec<String>| {
919 lines.push(line.into_owned());
920 Next::Continue
921 }),
922 ),
923 );
924
925 match collector.wait().await {
926 Err(ConsumerError::StreamRead { source }) => {
927 assert_that!(source.stream_name()).is_equal_to("custom");
928 assert_that!(source.kind()).is_equal_to(io::ErrorKind::BrokenPipe);
929 }
930 other => {
931 assert_that!(&other).fail(format_args!(
932 "expected consumer stream read error, got {other:?}"
933 ));
934 }
935 }
936 }
937
938 #[tokio::test]
939 async fn collectors_skip_gaps_and_keep_final_unterminated_line() {
940 let collector = spawn_consumer_sync(
941 "custom",
942 event_receiver(vec![
943 StreamEvent::Chunk(Chunk(Bytes::from_static(b"one\npar"))),
944 StreamEvent::Gap,
945 StreamEvent::Chunk(Chunk(Bytes::from_static(b"\ntwo\nfinal"))),
946 StreamEvent::Eof,
947 ])
948 .await,
949 LineAdapter::new(
950 LineParsingOptions::default(),
951 CollectLineSink::new(Vec::<String>::new(), |line, lines: &mut Vec<String>| {
952 lines.push(line.into_owned());
953 Next::Continue
954 }),
955 ),
956 );
957
958 let lines = collector.wait().await.unwrap();
959 assert_that!(lines).contains_exactly(["one", "two", "final"]);
960 }
961
962 struct ExtendChunks;
963
964 impl AsyncChunkCollector<Vec<u8>> for ExtendChunks {
965 async fn collect<'a>(&'a mut self, chunk: Chunk, seen: &'a mut Vec<u8>) -> Next {
966 seen.extend_from_slice(chunk.as_ref());
967 Next::Continue
968 }
969 }
970
971 #[tokio::test]
972 async fn chunk_collector_async_extends_sink_until_eof() {
973 let collector = spawn_consumer_async(
974 "custom",
975 event_receiver(vec![
976 StreamEvent::Chunk(Chunk(Bytes::from_static(b"ab"))),
977 StreamEvent::Chunk(Chunk(Bytes::from_static(b"cd"))),
978 StreamEvent::Chunk(Chunk(Bytes::from_static(b"ef"))),
979 StreamEvent::Eof,
980 ])
981 .await,
982 CollectChunksAsync::builder()
983 .sink(Vec::new())
984 .collector(ExtendChunks)
985 .build(),
986 );
987
988 let seen = collector.wait().await.unwrap();
989 assert_that!(seen).is_equal_to(b"abcdef".to_vec());
990 }
991
992 #[tokio::test]
993 async fn chunk_collector_accepts_stateful_callback() {
994 let mut chunk_index = 0;
995 let collector = spawn_consumer_sync(
996 "custom",
997 event_receiver(vec![
998 StreamEvent::Chunk(Chunk(Bytes::from_static(b"ab"))),
999 StreamEvent::Chunk(Chunk(Bytes::from_static(b"cd"))),
1000 StreamEvent::Chunk(Chunk(Bytes::from_static(b"ef"))),
1001 StreamEvent::Eof,
1002 ])
1003 .await,
1004 CollectChunks::builder()
1005 .sink(Vec::new())
1006 .f(
1007 move |chunk: Chunk, indexed_chunks: &mut Vec<(usize, Vec<u8>)>| {
1008 chunk_index += 1;
1009 indexed_chunks.push((chunk_index, chunk.as_ref().to_vec()));
1010 },
1011 )
1012 .build(),
1013 );
1014
1015 let indexed_chunks = collector.wait().await.unwrap();
1016 assert_that!(indexed_chunks).is_equal_to(vec![
1017 (1, b"ab".to_vec()),
1018 (2, b"cd".to_vec()),
1019 (3, b"ef".to_vec()),
1020 ]);
1021 }
1022
1023 #[tokio::test]
1024 async fn line_collector_accepts_stateful_callback() {
1025 let mut line_index = 0;
1026 let collector = spawn_consumer_sync(
1027 "custom",
1028 event_receiver(vec![
1029 StreamEvent::Chunk(Chunk(Bytes::from_static(b"alpha\nbeta\ngamma\n"))),
1030 StreamEvent::Eof,
1031 ])
1032 .await,
1033 LineAdapter::new(
1034 LineParsingOptions::default(),
1035 CollectLineSink::new(
1036 Vec::new(),
1037 move |line: Cow<'_, str>, indexed_lines: &mut Vec<String>| {
1038 line_index += 1;
1039 indexed_lines.push(format!("{line_index}:{line}"));
1040 Next::Continue
1041 },
1042 ),
1043 ),
1044 );
1045
1046 let indexed_lines = collector.wait().await.unwrap();
1047 assert_that!(indexed_lines).is_equal_to(vec![
1048 "1:alpha".to_string(),
1049 "2:beta".to_string(),
1050 "3:gamma".to_string(),
1051 ]);
1052 }
1053
1054 struct BreakOnLine;
1055
1056 impl AsyncLineCollector<Vec<String>> for BreakOnLine {
1057 async fn collect<'a>(&'a mut self, line: Cow<'a, str>, seen: &'a mut Vec<String>) -> Next {
1058 if line == "break" {
1059 seen.push(line.into_owned());
1060 Next::Break
1061 } else {
1062 seen.push(line.into_owned());
1063 Next::Continue
1064 }
1065 }
1066 }
1067
1068 #[tokio::test]
1069 async fn line_collector_async_break_stops_after_requested_line() {
1070 let collector = spawn_consumer_async(
1071 "custom",
1072 event_receiver(vec![
1073 StreamEvent::Chunk(Chunk(Bytes::from_static(b"start\nbreak\nend\n"))),
1074 StreamEvent::Eof,
1075 ])
1076 .await,
1077 LineAdapter::new(
1078 LineParsingOptions::default(),
1079 CollectLineSinkAsync::new(Vec::new(), BreakOnLine),
1080 ),
1081 );
1082
1083 let seen = collector.wait().await.unwrap();
1084 assert_that!(seen).contains_exactly(["start", "break"]);
1085 }
1086}