Skip to main content

tokio_process_tools/output_stream/visitors/
collect.rs

1use crate::output_stream::Next;
2use crate::output_stream::consumer::Sink;
3use crate::output_stream::event::Chunk;
4use crate::output_stream::line::adapter::{AsyncLineSink, LineSink};
5use crate::output_stream::num_bytes::NumBytes;
6use crate::output_stream::visitor::{AsyncStreamVisitor, StreamVisitor};
7use std::borrow::Cow;
8use std::collections::VecDeque;
9use std::future::Future;
10use std::ops::Deref;
11use typed_builder::TypedBuilder;
12
/// Controls which output is retained once a bounded in-memory collection reaches its limit.
///
/// Shared by [`RawCollectionOptions::Bounded`] and [`LineCollectionOptions::Bounded`]. The
/// default is [`CollectionOverflowBehavior::DropAdditionalData`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CollectionOverflowBehavior {
    /// Keep the first retained output and discard additional output.
    ///
    /// Raw bytes keep the leading bytes up to the limit (a chunk straddling the limit is cut).
    /// Lines are checked one at a time: each incoming line that does not fit the remaining line
    /// slots or byte budget is discarded, while a later, smaller line may still be accepted.
    #[default]
    DropAdditionalData,

    /// Keep the newest retained output by evicting older retained output.
    ///
    /// Raw bytes are evicted from the front; a chunk that alone exceeds the limit keeps only its
    /// trailing bytes. Lines are evicted oldest-first, except that a single line larger than the
    /// whole byte budget is discarded without evicting anything.
    DropOldestData,
}
23
/// Options for collecting raw output bytes into memory.
///
/// Passed by value to every `CollectedBytes::push_chunk` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RawCollectionOptions {
    /// Retain at most `max_bytes` bytes in memory.
    ///
    /// With a `max_bytes` of zero nothing is retained, and any non-empty chunk marks the
    /// collection as truncated.
    Bounded {
        /// Maximum number of bytes retained in memory.
        max_bytes: NumBytes,

        /// Which retained bytes to keep when more output is observed.
        ///
        /// `DropAdditionalData` keeps the leading bytes (a chunk straddling the limit is split);
        /// `DropOldestData` keeps the trailing bytes (older bytes are evicted from the front).
        overflow_behavior: CollectionOverflowBehavior,
    },

    /// Retain all observed bytes in memory without a total output cap.
    ///
    /// Use only when the output source and its output volume are trusted.
    TrustedUnbounded,
}
41
/// Options for collecting parsed output lines into memory.
///
/// Passed by value to every `CollectedLines::push_line` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineCollectionOptions {
    /// Retain at most `max_bytes` total line bytes and at most `max_lines` lines in memory.
    ///
    /// Both limits apply at once, and a line's size is its `String::len`. Lines are never split:
    /// a line that does not fit is either discarded or, with `DropOldestData`, makes room by
    /// evicting older lines. A single line longer than `max_bytes` is always discarded.
    Bounded {
        /// Maximum total bytes retained across all collected lines.
        max_bytes: NumBytes,

        /// Maximum number of lines retained in memory. Zero retains no lines at all.
        max_lines: usize,

        /// Which retained lines to keep when more output is observed.
        overflow_behavior: CollectionOverflowBehavior,
    },

    /// Retain all observed lines in memory without a total output cap.
    ///
    /// Use only when the output source and its output volume are trusted.
    TrustedUnbounded,
}
62
/// Raw bytes collected from an output stream.
///
/// Dereferences to the retained bytes as a `[u8]` slice.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectedBytes {
    /// Retained output bytes, in the order they were observed.
    pub bytes: Vec<u8>,

    /// Whether any bytes were discarded because the configured limit was exceeded.
    ///
    /// Pushing more output only ever sets this flag; it is never cleared again.
    pub truncated: bool,
}
72
73impl CollectedBytes {
74    /// Creates an empty collected byte buffer.
75    #[must_use]
76    pub fn new() -> Self {
77        Self {
78            bytes: Vec::new(),
79            truncated: false,
80        }
81    }
82
83    pub(crate) fn push_chunk(&mut self, chunk: &[u8], options: RawCollectionOptions) {
84        match options {
85            RawCollectionOptions::TrustedUnbounded => self.bytes.extend_from_slice(chunk),
86            RawCollectionOptions::Bounded {
87                max_bytes,
88                overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
89            } => {
90                let max_bytes = max_bytes.bytes();
91                let remaining = max_bytes.saturating_sub(self.bytes.len());
92                if chunk.len() > remaining {
93                    self.truncated = true;
94                }
95                self.bytes
96                    .extend_from_slice(&chunk[..remaining.min(chunk.len())]);
97            }
98            RawCollectionOptions::Bounded {
99                max_bytes,
100                overflow_behavior: CollectionOverflowBehavior::DropOldestData,
101            } => {
102                let max_bytes = max_bytes.bytes();
103                if chunk.len() > max_bytes {
104                    self.bytes.clear();
105                    self.bytes
106                        .extend_from_slice(&chunk[chunk.len().saturating_sub(max_bytes)..]);
107                    self.truncated = true;
108                    return;
109                }
110
111                let required = self.bytes.len() + chunk.len();
112                if required > max_bytes {
113                    self.bytes.drain(0..required - max_bytes);
114                    self.truncated = true;
115                }
116                self.bytes.extend_from_slice(chunk);
117            }
118        }
119    }
120}
121
122impl Default for CollectedBytes {
123    fn default() -> Self {
124        Self::new()
125    }
126}
127
128impl Deref for CollectedBytes {
129    type Target = [u8];
130
131    fn deref(&self) -> &Self::Target {
132        &self.bytes
133    }
134}
135
/// Parsed lines collected from an output stream.
///
/// Dereferences to the retained lines (`VecDeque<String>`), oldest first.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectedLines {
    /// Retained lines; new lines are appended at the back and evictions happen at the front.
    lines: VecDeque<String>,
    /// Whether any line was discarded (rejected or evicted) because of a configured limit.
    truncated: bool,
    /// Cached sum of `String::len` over `lines`, kept in sync by `push_back`/`pop_front` so the
    /// byte budget can be checked without re-summing every line.
    retained_bytes: usize,
}
143
144impl CollectedLines {
145    /// Creates an empty collected line buffer.
146    #[must_use]
147    pub fn new() -> Self {
148        Self {
149            lines: VecDeque::new(),
150            truncated: false,
151            retained_bytes: 0,
152        }
153    }
154
155    /// Retained output lines.
156    #[must_use]
157    pub fn lines(&self) -> &VecDeque<String> {
158        &self.lines
159    }
160
161    /// Whether any lines were discarded because the configured limit was exceeded.
162    #[must_use]
163    pub fn truncated(&self) -> bool {
164        self.truncated
165    }
166
167    /// Converts this collection into its retained output lines.
168    #[must_use]
169    pub fn into_lines(self) -> VecDeque<String> {
170        self.lines
171    }
172
173    /// Converts this collection into its retained output lines and truncation flag.
174    #[must_use]
175    pub fn into_parts(self) -> (VecDeque<String>, bool) {
176        (self.lines, self.truncated)
177    }
178
179    pub(crate) fn push_line(&mut self, line: String, options: LineCollectionOptions) {
180        match options {
181            LineCollectionOptions::TrustedUnbounded => self.push_back(line),
182            LineCollectionOptions::Bounded {
183                max_bytes,
184                max_lines,
185                overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
186            } => {
187                let line_len = line.len();
188                let max_bytes = max_bytes.bytes();
189                if self.lines.len() >= max_lines
190                    || line_len > max_bytes
191                    || line_len > max_bytes.saturating_sub(self.retained_bytes)
192                {
193                    self.truncated = true;
194                    return;
195                }
196                self.push_back(line);
197            }
198            LineCollectionOptions::Bounded {
199                max_bytes,
200                max_lines,
201                overflow_behavior: CollectionOverflowBehavior::DropOldestData,
202            } => {
203                let line_len = line.len();
204                let max_bytes = max_bytes.bytes();
205                if max_lines == 0 {
206                    self.truncated = true;
207                    return;
208                }
209                if line_len > max_bytes {
210                    self.truncated = true;
211                    return;
212                }
213
214                while self.lines.len() >= max_lines
215                    || line_len > max_bytes.saturating_sub(self.retained_bytes)
216                {
217                    self.pop_front()
218                        .expect("line buffer to contain an evictable line");
219                    self.truncated = true;
220                }
221                self.push_back(line);
222            }
223        }
224    }
225
226    fn push_back(&mut self, line: String) {
227        self.retained_bytes += line.len();
228        self.lines.push_back(line);
229    }
230
231    fn pop_front(&mut self) -> Option<String> {
232        let line = self.lines.pop_front()?;
233        self.retained_bytes -= line.len();
234        Some(line)
235    }
236}
237
238impl Default for CollectedLines {
239    fn default() -> Self {
240        Self::new()
241    }
242}
243
244impl Deref for CollectedLines {
245    type Target = VecDeque<String>;
246
247    fn deref(&self) -> &Self::Target {
248        &self.lines
249    }
250}
251
/// An async collector for raw output chunks.
///
/// The collector itself may hold state via `&mut self`, but only the sink `S` is returned from
/// [`Consumer::wait`](crate::Consumer::wait) or [`Consumer::cancel`](crate::Consumer::cancel).
///
/// This trait-based API avoids allocating a boxed future for every collected item while still
/// letting the returned future own `chunk` and borrow `self` and `sink` across `.await`.
///
/// This uses a trait rather than `std::ops::AsyncFn` because stable Rust can express the lending
/// async callback shape, but cannot yet express the `Send` bound required on an `AsyncFn`
/// callback's returned future for use inside `tokio::spawn`.
pub trait AsyncChunkCollector<S: Sink>: Send + 'static {
    /// Collect a single chunk into `sink`.
    ///
    /// `chunk` is passed by value; `self` and `sink` are borrowed for `'a`, the lifetime of the
    /// returned `Send` future. The future resolves to the [`Next`] value reported for this chunk
    /// (for example `Next::Continue`).
    fn collect<'a>(
        &'a mut self,
        chunk: Chunk,
        sink: &'a mut S,
    ) -> impl Future<Output = Next> + Send + 'a;
}
271
/// An async collector for parsed output lines.
///
/// The collector itself may hold state via `&mut self`, but only the sink `S` is returned from
/// [`Consumer::wait`](crate::Consumer::wait) or [`Consumer::cancel`](crate::Consumer::cancel).
///
/// This uses a trait rather than `std::ops::AsyncFn` because stable Rust can express the lending
/// async callback shape, but cannot yet express the `Send` bound required on an `AsyncFn`
/// callback's returned future for use inside `tokio::spawn`. Once that bound is expressible on
/// stable Rust, this API can move back toward async-closure ergonomics.
pub trait AsyncLineCollector<S: Sink>: Send + 'static {
    /// Collect a single parsed line into `sink`.
    ///
    /// `line` (borrowed or owned, as a `Cow`) and `sink` share the lifetime `'a` with the
    /// returned `Send` future, so the future may hold both across `.await` points. The future
    /// resolves to the [`Next`] value reported for this line.
    fn collect<'a>(
        &'a mut self,
        line: Cow<'a, str>,
        sink: &'a mut S,
    ) -> impl Future<Output = Next> + Send + 'a;
}
289
/// Synchronous stream visitor that hands every chunk to `f` together with a `&mut` borrow of
/// `sink`, and yields `sink` as its output.
#[derive(TypedBuilder)]
pub(crate) struct CollectChunks<T, F>
where
    T: Sink,
    F: FnMut(Chunk, &mut T) + Send + 'static,
{
    /// Accumulator passed to `f` for every chunk and returned by `into_output`.
    pub sink: T,
    /// Per-chunk callback. It returns nothing, so this visitor never asks the stream to stop.
    pub f: F,
}
299
300impl<T, F> StreamVisitor for CollectChunks<T, F>
301where
302    T: Sink,
303    F: FnMut(Chunk, &mut T) + Send + 'static,
304{
305    type Output = T;
306
307    fn on_chunk(&mut self, chunk: Chunk) -> Next {
308        (self.f)(chunk, &mut self.sink);
309        Next::Continue
310    }
311
312    fn into_output(self) -> Self::Output {
313        self.sink
314    }
315}
316
/// Async stream visitor whose `on_chunk` returns the future produced by `collector` for each
/// chunk (with a `&mut` borrow of `sink`), and which yields `sink` as its output.
#[derive(TypedBuilder)]
pub(crate) struct CollectChunksAsync<T, C>
where
    T: Sink,
    C: AsyncChunkCollector<T>,
{
    /// Accumulator passed to `collector` for every chunk and returned by `into_output`.
    pub sink: T,
    /// User collector; the [`Next`] its future resolves to is the visitor's per-chunk result.
    pub collector: C,
}
326
327impl<T, C> AsyncStreamVisitor for CollectChunksAsync<T, C>
328where
329    T: Sink,
330    C: AsyncChunkCollector<T>,
331{
332    type Output = T;
333
334    fn on_chunk(&mut self, chunk: Chunk) -> impl Future<Output = Next> + Send + '_ {
335        self.collector.collect(chunk, &mut self.sink)
336    }
337
338    fn into_output(self) -> Self::Output {
339        self.sink
340    }
341}
342
/// [`LineSink`] holding the user closure and a sink; `on_line` calls the closure with the
/// line and a `&mut` borrow of the sink. Compose with
/// [`LineAdapter`](crate::output_stream::line::adapter::LineAdapter) to drive `collect_lines`, or to
/// build your own custom collect-lines consumer outside the built-in factory methods.
///
/// The closure's [`Next`] return value is passed straight through as the result of `on_line`,
/// and `into_output` yields the sink.
pub struct CollectLineSink<T, F> {
    /// Accumulator handed to `f` for every line and returned by `into_output`.
    sink: T,
    /// User callback invoked once per parsed line.
    f: F,
}
351
352impl<T, F> CollectLineSink<T, F>
353where
354    T: Sink,
355    F: FnMut(Cow<'_, str>, &mut T) -> Next + Send + 'static,
356{
357    /// Creates a new sink that calls `f` with each parsed line and a `&mut` borrow of `sink`.
358    pub fn new(sink: T, f: F) -> Self {
359        Self { sink, f }
360    }
361}
362
363impl<T, F> LineSink for CollectLineSink<T, F>
364where
365    T: Sink,
366    F: FnMut(Cow<'_, str>, &mut T) -> Next + Send + 'static,
367{
368    type Output = T;
369
370    fn on_line(&mut self, line: Cow<'_, str>) -> Next {
371        (self.f)(line, &mut self.sink)
372    }
373
374    fn into_output(self) -> Self::Output {
375        self.sink
376    }
377}
378
/// [`AsyncLineSink`] holding the user collector and a sink. Compose with
/// [`LineAdapter`](crate::output_stream::line::adapter::LineAdapter) (its [`AsyncStreamVisitor`] impl
/// is selected automatically when the inner sink is an [`AsyncLineSink`]) to drive
/// `collect_lines_async`.
///
/// `on_line` returns the collector's future for each line, and `into_output` yields the sink.
pub struct CollectLineSinkAsync<T, C> {
    /// Accumulator handed to `collector` for every line and returned by `into_output`.
    sink: T,
    /// User collector whose future is returned for each parsed line.
    collector: C,
}
387
388impl<T, C> CollectLineSinkAsync<T, C>
389where
390    T: Sink,
391    C: AsyncLineCollector<T>,
392{
393    /// Creates a new sink that awaits `collector` with each parsed line and a `&mut` borrow of
394    /// `sink`.
395    pub fn new(sink: T, collector: C) -> Self {
396        Self { sink, collector }
397    }
398}
399
400impl<T, C> AsyncLineSink for CollectLineSinkAsync<T, C>
401where
402    T: Sink,
403    C: AsyncLineCollector<T>,
404{
405    type Output = T;
406
407    fn on_line<'a>(&'a mut self, line: Cow<'a, str>) -> impl Future<Output = Next> + Send + 'a {
408        self.collector.collect(line, &mut self.sink)
409    }
410
411    fn into_output(self) -> Self::Output {
412        self.sink
413    }
414}
415
416#[cfg(test)]
417mod tests {
418    use super::*;
419    use crate::ConsumerError;
420    use crate::output_stream::consumer::driver::{spawn_consumer_async, spawn_consumer_sync};
421    use crate::output_stream::event::StreamEvent;
422    use crate::output_stream::event::tests::event_receiver;
423    use crate::output_stream::line::adapter::LineAdapter;
424    use crate::output_stream::line::options::LineParsingOptions;
425    use crate::output_stream::num_bytes::NumBytesExt;
426    use crate::{AsyncChunkCollector, AsyncLineCollector};
427    use assertr::prelude::*;
428    use bytes::Bytes;
429    use std::borrow::Cow;
430    use std::io;
431
432    fn drop_oldest_options(max_bytes: usize, max_lines: usize) -> LineCollectionOptions {
433        LineCollectionOptions::Bounded {
434            max_bytes: max_bytes.bytes(),
435            max_lines,
436            overflow_behavior: CollectionOverflowBehavior::DropOldestData,
437        }
438    }
439
440    fn assert_retained_bytes_match_lines(collected: &CollectedLines) {
441        assert_that!(collected.retained_bytes)
442            .is_equal_to(collected.lines.iter().map(String::len).sum::<usize>());
443    }
444
445    struct ChunkCase {
446        name: &'static str,
447        overflow: CollectionOverflowBehavior,
448        max_bytes: usize,
449        chunks: &'static [&'static [u8]],
450        expected_bytes: &'static [u8],
451        expected_truncated: bool,
452    }
453
454    const CHUNK_BOUNDARY_CASES: &[ChunkCase] = &[
455        ChunkCase {
456            name: "drop_additional/empty_chunk_is_no_op",
457            overflow: CollectionOverflowBehavior::DropAdditionalData,
458            max_bytes: 5,
459            chunks: &[b""],
460            expected_bytes: b"",
461            expected_truncated: false,
462        },
463        ChunkCase {
464            name: "drop_additional/single_chunk_exactly_fills_buffer",
465            overflow: CollectionOverflowBehavior::DropAdditionalData,
466            max_bytes: 5,
467            chunks: &[b"abcde"],
468            expected_bytes: b"abcde",
469            expected_truncated: false,
470        },
471        ChunkCase {
472            name: "drop_additional/single_chunk_overshoots_by_one_byte",
473            overflow: CollectionOverflowBehavior::DropAdditionalData,
474            max_bytes: 5,
475            chunks: &[b"abcdef"],
476            expected_bytes: b"abcde",
477            expected_truncated: true,
478        },
479        ChunkCase {
480            name: "drop_additional/second_chunk_straddles_limit",
481            overflow: CollectionOverflowBehavior::DropAdditionalData,
482            max_bytes: 5,
483            chunks: &[b"abc", b"def"],
484            expected_bytes: b"abcde",
485            expected_truncated: true,
486        },
487        ChunkCase {
488            name: "drop_additional/first_chunk_exactly_fills_then_second_chunk_rejected",
489            overflow: CollectionOverflowBehavior::DropAdditionalData,
490            max_bytes: 5,
491            chunks: &[b"abcde", b"f"],
492            expected_bytes: b"abcde",
493            expected_truncated: true,
494        },
495        ChunkCase {
496            name: "drop_oldest/empty_chunk_is_no_op",
497            overflow: CollectionOverflowBehavior::DropOldestData,
498            max_bytes: 5,
499            chunks: &[b""],
500            expected_bytes: b"",
501            expected_truncated: false,
502        },
503        ChunkCase {
504            name: "drop_oldest/single_chunk_exactly_fills_buffer",
505            overflow: CollectionOverflowBehavior::DropOldestData,
506            max_bytes: 5,
507            chunks: &[b"abcde"],
508            expected_bytes: b"abcde",
509            expected_truncated: false,
510        },
511        ChunkCase {
512            name: "drop_oldest/single_chunk_overshoots_by_one_byte_into_empty",
513            overflow: CollectionOverflowBehavior::DropOldestData,
514            max_bytes: 5,
515            chunks: &[b"abcdef"],
516            expected_bytes: b"bcdef",
517            expected_truncated: true,
518        },
519        ChunkCase {
520            name: "drop_oldest/second_chunk_straddles_limit_evicts_front",
521            overflow: CollectionOverflowBehavior::DropOldestData,
522            max_bytes: 5,
523            chunks: &[b"abc", b"def"],
524            expected_bytes: b"bcdef",
525            expected_truncated: true,
526        },
527        ChunkCase {
528            name: "drop_oldest/oversized_chunk_into_empty_clears_and_keeps_tail",
529            overflow: CollectionOverflowBehavior::DropOldestData,
530            max_bytes: 5,
531            chunks: &[b"abcdefgh"],
532            expected_bytes: b"defgh",
533            expected_truncated: true,
534        },
535        ChunkCase {
536            name: "drop_oldest/oversized_chunk_into_partial_clears_existing",
537            overflow: CollectionOverflowBehavior::DropOldestData,
538            max_bytes: 5,
539            chunks: &[b"ab", b"cdefgh"],
540            expected_bytes: b"defgh",
541            expected_truncated: true,
542        },
543        ChunkCase {
544            name: "drop_oldest/first_chunk_exactly_fills_then_second_evicts_front",
545            overflow: CollectionOverflowBehavior::DropOldestData,
546            max_bytes: 5,
547            chunks: &[b"abcde", b"f"],
548            expected_bytes: b"bcdef",
549            expected_truncated: true,
550        },
551    ];
552
553    #[test]
554    fn push_chunk_boundary_matrix() {
555        for case in CHUNK_BOUNDARY_CASES {
556            let mut collected = CollectedBytes::new();
557            let options = RawCollectionOptions::Bounded {
558                max_bytes: case.max_bytes.bytes(),
559                overflow_behavior: case.overflow,
560            };
561            for chunk in case.chunks {
562                collected.push_chunk(chunk, options);
563            }
564
565            assert_that!(collected.bytes.as_slice())
566                .with_detail_message(format!("case: {}", case.name))
567                .is_equal_to(case.expected_bytes);
568            assert_that!(collected.truncated)
569                .with_detail_message(format!("case: {}", case.name))
570                .is_equal_to(case.expected_truncated);
571        }
572    }
573
574    struct LineCase {
575        name: &'static str,
576        overflow: CollectionOverflowBehavior,
577        max_bytes: usize,
578        max_lines: usize,
579        push: &'static [&'static str],
580        expected_lines: &'static [&'static str],
581        expected_truncated: bool,
582    }
583
584    const LINE_BOUNDARY_CASES: &[LineCase] = &[
585        LineCase {
586            name: "drop_additional/line_exactly_fills_byte_budget_with_slot_left",
587            overflow: CollectionOverflowBehavior::DropAdditionalData,
588            max_bytes: 5,
589            max_lines: 2,
590            push: &["abcde"],
591            expected_lines: &["abcde"],
592            expected_truncated: false,
593        },
594        LineCase {
595            name: "drop_additional/max_lines_reached_before_max_bytes",
596            overflow: CollectionOverflowBehavior::DropAdditionalData,
597            max_bytes: 100,
598            max_lines: 2,
599            push: &["a", "b", "c"],
600            expected_lines: &["a", "b"],
601            expected_truncated: true,
602        },
603        LineCase {
604            name: "drop_additional/max_bytes_reached_before_max_lines",
605            overflow: CollectionOverflowBehavior::DropAdditionalData,
606            max_bytes: 4,
607            max_lines: 10,
608            push: &["aa", "bb", "cc"],
609            expected_lines: &["aa", "bb"],
610            expected_truncated: true,
611        },
612        LineCase {
613            name: "drop_additional/line_equal_to_remaining_budget_accepted",
614            overflow: CollectionOverflowBehavior::DropAdditionalData,
615            max_bytes: 6,
616            max_lines: 10,
617            push: &["abc", "def"],
618            expected_lines: &["abc", "def"],
619            expected_truncated: false,
620        },
621        LineCase {
622            name: "drop_additional/line_one_byte_over_remaining_rejected",
623            overflow: CollectionOverflowBehavior::DropAdditionalData,
624            max_bytes: 6,
625            max_lines: 10,
626            push: &["abc", "defg"],
627            expected_lines: &["abc"],
628            expected_truncated: true,
629        },
630        LineCase {
631            name: "drop_additional/line_strictly_larger_than_max_bytes_rejected",
632            overflow: CollectionOverflowBehavior::DropAdditionalData,
633            max_bytes: 5,
634            max_lines: 10,
635            push: &["abc", "xxxxxxxxx"],
636            expected_lines: &["abc"],
637            expected_truncated: true,
638        },
639        LineCase {
640            name: "drop_oldest/line_exactly_fills_byte_budget_with_slot_left",
641            overflow: CollectionOverflowBehavior::DropOldestData,
642            max_bytes: 5,
643            max_lines: 2,
644            push: &["abcde"],
645            expected_lines: &["abcde"],
646            expected_truncated: false,
647        },
648        LineCase {
649            name: "drop_oldest/max_lines_reached_before_max_bytes_evicts_one",
650            overflow: CollectionOverflowBehavior::DropOldestData,
651            max_bytes: 100,
652            max_lines: 2,
653            push: &["a", "b", "c"],
654            expected_lines: &["b", "c"],
655            expected_truncated: true,
656        },
657        LineCase {
658            name: "drop_oldest/max_bytes_reached_before_max_lines_evicts_one",
659            overflow: CollectionOverflowBehavior::DropOldestData,
660            max_bytes: 6,
661            max_lines: 10,
662            push: &["aaa", "bbb", "ccc"],
663            expected_lines: &["bbb", "ccc"],
664            expected_truncated: true,
665        },
666        LineCase {
667            name: "drop_oldest/incoming_line_requires_evicting_multiple_lines",
668            overflow: CollectionOverflowBehavior::DropOldestData,
669            max_bytes: 8,
670            max_lines: 100,
671            push: &["a", "b", "cc", "dddd", "eeeeee"],
672            expected_lines: &["eeeeee"],
673            expected_truncated: true,
674        },
675        LineCase {
676            name: "drop_oldest/line_strictly_larger_than_max_bytes_rejected",
677            overflow: CollectionOverflowBehavior::DropOldestData,
678            max_bytes: 5,
679            max_lines: 10,
680            push: &["abc", "xxxxxxxxx"],
681            expected_lines: &["abc"],
682            expected_truncated: true,
683        },
684    ];
685
686    #[test]
687    fn push_line_boundary_matrix() {
688        for case in LINE_BOUNDARY_CASES {
689            let mut collected = CollectedLines::new();
690            let options = LineCollectionOptions::Bounded {
691                max_bytes: case.max_bytes.bytes(),
692                max_lines: case.max_lines,
693                overflow_behavior: case.overflow,
694            };
695            for line in case.push {
696                collected.push_line((*line).to_string(), options);
697            }
698
699            let actual_lines: Vec<&str> = collected.lines().iter().map(String::as_str).collect();
700            assert_that!(actual_lines)
701                .with_detail_message(format!("case: {}", case.name))
702                .is_equal_to(case.expected_lines.to_vec());
703            assert_that!(collected.truncated())
704                .with_detail_message(format!("case: {}", case.name))
705                .is_equal_to(case.expected_truncated);
706            assert_that!(collected.retained_bytes)
707                .with_detail_message(format!("case: {} (retained_bytes)", case.name))
708                .is_equal_to(
709                    case.expected_lines
710                        .iter()
711                        .map(|line| line.len())
712                        .sum::<usize>(),
713                );
714        }
715    }
716
717    #[test]
718    fn raw_collection_keeps_expected_bytes_when_truncated() {
719        let mut collected = CollectedBytes::new();
720        let options = RawCollectionOptions::Bounded {
721            max_bytes: 5.bytes(),
722            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
723        };
724
725        collected.push_chunk(b"abc", options);
726        collected.push_chunk(b"def", options);
727
728        assert_that!(collected.bytes.as_slice()).is_equal_to(b"abcde".as_slice());
729        assert_that!(collected.truncated).is_true();
730
731        let mut collected = CollectedBytes::new();
732        let options = RawCollectionOptions::Bounded {
733            max_bytes: 5.bytes(),
734            overflow_behavior: CollectionOverflowBehavior::DropOldestData,
735        };
736
737        collected.push_chunk(b"abc", options);
738        collected.push_chunk(b"def", options);
739
740        assert_that!(collected.bytes.as_slice()).is_equal_to(b"bcdef".as_slice());
741        assert_that!(collected.truncated).is_true();
742    }
743
744    #[test]
745    fn basic_line_collection_limit_modes() {
746        let mut collected = CollectedLines::new();
747        let options = LineCollectionOptions::Bounded {
748            max_bytes: 7.bytes(),
749            max_lines: 2,
750            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
751        };
752
753        collected.push_line("one".to_string(), options);
754        collected.push_line("two".to_string(), options);
755        collected.push_line("three".to_string(), options);
756
757        assert_that!(
758            collected
759                .lines()
760                .iter()
761                .map(String::as_str)
762                .collect::<Vec<_>>()
763        )
764        .is_equal_to(vec!["one", "two"]);
765        assert_that!(collected.truncated()).is_true();
766
767        let mut collected = CollectedLines::new();
768        let options = LineCollectionOptions::Bounded {
769            max_bytes: 6.bytes(),
770            max_lines: 2,
771            overflow_behavior: CollectionOverflowBehavior::DropOldestData,
772        };
773
774        collected.push_line("one".to_string(), options);
775        collected.push_line("two".to_string(), options);
776        collected.push_line("six".to_string(), options);
777
778        assert_that!(
779            collected
780                .lines()
781                .iter()
782                .map(String::as_str)
783                .collect::<Vec<_>>()
784        )
785        .is_equal_to(vec!["two", "six"]);
786        assert_that!(collected.truncated()).is_true();
787    }
788
789    #[test]
790    fn retained_bytes_tracks_appended_lines() {
791        let options = LineCollectionOptions::Bounded {
792            max_bytes: 100.bytes(),
793            max_lines: 100,
794            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
795        };
796        let mut collected = CollectedLines::new();
797
798        collected.push_line("aaa".to_string(), options);
799        collected.push_line("bbbb".to_string(), options);
800
801        assert_that!(collected.retained_bytes).is_equal_to(7);
802        assert_retained_bytes_match_lines(&collected);
803    }
804
805    #[test]
806    fn drop_oldest_preserves_retained_lines_when_oversized_line_arrives() {
807        let options = drop_oldest_options(10, 100);
808        let mut collected = CollectedLines::new();
809
810        collected.push_line("aaa".to_string(), options);
811        collected.push_line("bbb".to_string(), options);
812        collected.push_line("x".repeat(13), options);
813
814        assert_that!(collected.lines())
815            .with_detail_message(
816                "previously-retained lines must survive an oversized incoming line",
817            )
818            .is_equal_to(VecDeque::from(["aaa".to_string(), "bbb".to_string()]));
819        assert_that!(collected.retained_bytes).is_equal_to(6);
820        assert_retained_bytes_match_lines(&collected);
821        assert_that!(collected.truncated()).is_true();
822    }
823
824    #[test]
825    fn drop_oldest_evicts_old_lines_when_new_line_fits_but_budget_is_exceeded() {
826        let options = drop_oldest_options(10, 100);
827        let mut collected = CollectedLines::new();
828
829        collected.push_line("aaaa".to_string(), options);
830        collected.push_line("bbbb".to_string(), options);
831        collected.push_line("cccc".to_string(), options);
832
833        assert_that!(collected.lines())
834            .is_equal_to(VecDeque::from(["bbbb".to_string(), "cccc".to_string()]));
835        assert_that!(collected.retained_bytes).is_equal_to(8);
836        assert_retained_bytes_match_lines(&collected);
837        assert_that!(collected.truncated()).is_true();
838    }
839
840    #[test]
841    fn drop_oldest_updates_retained_bytes_when_evicting_by_line_count() {
842        let options = drop_oldest_options(100, 2);
843        let mut collected = CollectedLines::new();
844
845        collected.push_line("a".to_string(), options);
846        collected.push_line("bb".to_string(), options);
847        collected.push_line("ccc".to_string(), options);
848
849        assert_that!(collected.lines())
850            .is_equal_to(VecDeque::from(["bb".to_string(), "ccc".to_string()]));
851        assert_that!(collected.retained_bytes).is_equal_to(5);
852        assert_retained_bytes_match_lines(&collected);
853        assert_that!(collected.truncated()).is_true();
854    }
855
856    #[test]
857    fn drop_oldest_with_zero_max_lines_retains_nothing() {
858        let options = drop_oldest_options(100, 0);
859        let mut collected = CollectedLines::new();
860
861        collected.push_line("aaa".to_string(), options);
862
863        assert_that!(collected.lines().is_empty()).is_true();
864        assert_that!(collected.retained_bytes).is_equal_to(0);
865        assert_retained_bytes_match_lines(&collected);
866        assert_that!(collected.truncated()).is_true();
867    }
868
869    #[test]
870    fn drop_additional_preserves_retained_lines_when_oversized_line_arrives() {
871        let options = LineCollectionOptions::Bounded {
872            max_bytes: 10.bytes(),
873            max_lines: 100,
874            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
875        };
876        let mut collected = CollectedLines::new();
877
878        collected.push_line("aaa".to_string(), options);
879        collected.push_line("x".repeat(13), options);
880
881        assert_that!(collected.lines()).is_equal_to(VecDeque::from(["aaa".to_string()]));
882        assert_that!(collected.retained_bytes).is_equal_to(3);
883        assert_retained_bytes_match_lines(&collected);
884        assert_that!(collected.truncated()).is_true();
885    }
886
887    #[test]
888    fn drop_additional_preserves_retained_bytes_when_limit_rejects_line() {
889        let options = LineCollectionOptions::Bounded {
890            max_bytes: 6.bytes(),
891            max_lines: 100,
892            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
893        };
894        let mut collected = CollectedLines::new();
895
896        collected.push_line("aaa".to_string(), options);
897        collected.push_line("bbbb".to_string(), options);
898
899        assert_that!(collected.lines()).is_equal_to(VecDeque::from(["aaa".to_string()]));
900        assert_that!(collected.retained_bytes).is_equal_to(3);
901        assert_retained_bytes_match_lines(&collected);
902        assert_that!(collected.truncated()).is_true();
903    }
904
905    #[tokio::test]
906    async fn collectors_return_stream_read_error() {
907        let error =
908            crate::StreamReadError::new("custom", io::Error::from(io::ErrorKind::BrokenPipe));
909        let collector = spawn_consumer_sync(
910            "custom",
911            event_receiver(vec![
912                StreamEvent::Chunk(Chunk(Bytes::from_static(b"complete\npartial"))),
913                StreamEvent::ReadError(error),
914            ])
915            .await,
916            LineAdapter::new(
917                LineParsingOptions::default(),
918                CollectLineSink::new(Vec::<String>::new(), |line, lines: &mut Vec<String>| {
919                    lines.push(line.into_owned());
920                    Next::Continue
921                }),
922            ),
923        );
924
925        match collector.wait().await {
926            Err(ConsumerError::StreamRead { source }) => {
927                assert_that!(source.stream_name()).is_equal_to("custom");
928                assert_that!(source.kind()).is_equal_to(io::ErrorKind::BrokenPipe);
929            }
930            other => {
931                assert_that!(&other).fail(format_args!(
932                    "expected consumer stream read error, got {other:?}"
933                ));
934            }
935        }
936    }
937
938    #[tokio::test]
939    async fn collectors_skip_gaps_and_keep_final_unterminated_line() {
940        let collector = spawn_consumer_sync(
941            "custom",
942            event_receiver(vec![
943                StreamEvent::Chunk(Chunk(Bytes::from_static(b"one\npar"))),
944                StreamEvent::Gap,
945                StreamEvent::Chunk(Chunk(Bytes::from_static(b"\ntwo\nfinal"))),
946                StreamEvent::Eof,
947            ])
948            .await,
949            LineAdapter::new(
950                LineParsingOptions::default(),
951                CollectLineSink::new(Vec::<String>::new(), |line, lines: &mut Vec<String>| {
952                    lines.push(line.into_owned());
953                    Next::Continue
954                }),
955            ),
956        );
957
958        let lines = collector.wait().await.unwrap();
959        assert_that!(lines).contains_exactly(["one", "two", "final"]);
960    }
961
962    struct ExtendChunks;
963
964    impl AsyncChunkCollector<Vec<u8>> for ExtendChunks {
965        async fn collect<'a>(&'a mut self, chunk: Chunk, seen: &'a mut Vec<u8>) -> Next {
966            seen.extend_from_slice(chunk.as_ref());
967            Next::Continue
968        }
969    }
970
971    #[tokio::test]
972    async fn chunk_collector_async_extends_sink_until_eof() {
973        let collector = spawn_consumer_async(
974            "custom",
975            event_receiver(vec![
976                StreamEvent::Chunk(Chunk(Bytes::from_static(b"ab"))),
977                StreamEvent::Chunk(Chunk(Bytes::from_static(b"cd"))),
978                StreamEvent::Chunk(Chunk(Bytes::from_static(b"ef"))),
979                StreamEvent::Eof,
980            ])
981            .await,
982            CollectChunksAsync::builder()
983                .sink(Vec::new())
984                .collector(ExtendChunks)
985                .build(),
986        );
987
988        let seen = collector.wait().await.unwrap();
989        assert_that!(seen).is_equal_to(b"abcdef".to_vec());
990    }
991
992    #[tokio::test]
993    async fn chunk_collector_accepts_stateful_callback() {
994        let mut chunk_index = 0;
995        let collector = spawn_consumer_sync(
996            "custom",
997            event_receiver(vec![
998                StreamEvent::Chunk(Chunk(Bytes::from_static(b"ab"))),
999                StreamEvent::Chunk(Chunk(Bytes::from_static(b"cd"))),
1000                StreamEvent::Chunk(Chunk(Bytes::from_static(b"ef"))),
1001                StreamEvent::Eof,
1002            ])
1003            .await,
1004            CollectChunks::builder()
1005                .sink(Vec::new())
1006                .f(
1007                    move |chunk: Chunk, indexed_chunks: &mut Vec<(usize, Vec<u8>)>| {
1008                        chunk_index += 1;
1009                        indexed_chunks.push((chunk_index, chunk.as_ref().to_vec()));
1010                    },
1011                )
1012                .build(),
1013        );
1014
1015        let indexed_chunks = collector.wait().await.unwrap();
1016        assert_that!(indexed_chunks).is_equal_to(vec![
1017            (1, b"ab".to_vec()),
1018            (2, b"cd".to_vec()),
1019            (3, b"ef".to_vec()),
1020        ]);
1021    }
1022
1023    #[tokio::test]
1024    async fn line_collector_accepts_stateful_callback() {
1025        let mut line_index = 0;
1026        let collector = spawn_consumer_sync(
1027            "custom",
1028            event_receiver(vec![
1029                StreamEvent::Chunk(Chunk(Bytes::from_static(b"alpha\nbeta\ngamma\n"))),
1030                StreamEvent::Eof,
1031            ])
1032            .await,
1033            LineAdapter::new(
1034                LineParsingOptions::default(),
1035                CollectLineSink::new(
1036                    Vec::new(),
1037                    move |line: Cow<'_, str>, indexed_lines: &mut Vec<String>| {
1038                        line_index += 1;
1039                        indexed_lines.push(format!("{line_index}:{line}"));
1040                        Next::Continue
1041                    },
1042                ),
1043            ),
1044        );
1045
1046        let indexed_lines = collector.wait().await.unwrap();
1047        assert_that!(indexed_lines).is_equal_to(vec![
1048            "1:alpha".to_string(),
1049            "2:beta".to_string(),
1050            "3:gamma".to_string(),
1051        ]);
1052    }
1053
1054    struct BreakOnLine;
1055
1056    impl AsyncLineCollector<Vec<String>> for BreakOnLine {
1057        async fn collect<'a>(&'a mut self, line: Cow<'a, str>, seen: &'a mut Vec<String>) -> Next {
1058            if line == "break" {
1059                seen.push(line.into_owned());
1060                Next::Break
1061            } else {
1062                seen.push(line.into_owned());
1063                Next::Continue
1064            }
1065        }
1066    }
1067
1068    #[tokio::test]
1069    async fn line_collector_async_break_stops_after_requested_line() {
1070        let collector = spawn_consumer_async(
1071            "custom",
1072            event_receiver(vec![
1073                StreamEvent::Chunk(Chunk(Bytes::from_static(b"start\nbreak\nend\n"))),
1074                StreamEvent::Eof,
1075            ])
1076            .await,
1077            LineAdapter::new(
1078                LineParsingOptions::default(),
1079                CollectLineSinkAsync::new(Vec::new(), BreakOnLine),
1080            ),
1081        );
1082
1083        let seen = collector.wait().await.unwrap();
1084        assert_that!(seen).contains_exactly(["start", "break"]);
1085    }
1086}