Skip to main content

tokio_process_tools/output_stream/visitors/
collect.rs

1use crate::output_stream::Next;
2use crate::output_stream::consumer::Sink;
3use crate::output_stream::event::Chunk;
4use crate::output_stream::line::adapter::{AsyncLineVisitor, LineVisitor};
5use crate::output_stream::num_bytes::NumBytes;
6use crate::output_stream::visitor::{AsyncStreamVisitor, StreamVisitor};
7use std::borrow::Cow;
8use std::collections::VecDeque;
9use std::future::Future;
10use std::ops::Deref;
11use std::pin::Pin;
12use typed_builder::TypedBuilder;
13
/// Policy applied by a bounded in-memory collector once its configured limit is reached.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CollectionOverflowBehavior {
    /// Retain what was collected first; output observed after the limit is hit is dropped.
    #[default]
    DropAdditionalData,

    /// Retain the most recent output; previously retained output is evicted to make room.
    DropOldestData,
}
24
25/// Options for collecting raw output bytes into memory.
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27pub enum RawCollectionOptions {
28    /// Retain at most `max_bytes` bytes in memory.
29    Bounded {
30        /// Maximum number of bytes retained in memory.
31        max_bytes: NumBytes,
32
33        /// Which retained bytes to keep when more output is observed.
34        overflow_behavior: CollectionOverflowBehavior,
35    },
36
37    /// Retain all observed bytes in memory without a total output cap.
38    ///
39    /// Use only when the output source and its output volume are trusted.
40    TrustedUnbounded,
41}
42
43/// Options for collecting parsed output lines into memory.
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub enum LineCollectionOptions {
46    /// Retain at most `max_bytes` total line bytes and at most `max_lines` lines in memory.
47    Bounded {
48        /// Maximum total bytes retained across all collected lines.
49        max_bytes: NumBytes,
50
51        /// Maximum number of lines retained in memory.
52        max_lines: usize,
53
54        /// Which retained lines to keep when more output is observed.
55        overflow_behavior: CollectionOverflowBehavior,
56    },
57
58    /// Retain all observed lines in memory without a total output cap.
59    ///
60    /// Use only when the output source and its output volume are trusted.
61    TrustedUnbounded,
62}
63
/// In-memory result of collecting raw bytes from an output stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectedBytes {
    /// The bytes that were kept.
    pub bytes: Vec<u8>,

    /// Set to `true` once any bytes had to be dropped because the configured limit was exceeded.
    pub truncated: bool,
}
73
74impl CollectedBytes {
75    /// Creates an empty collected byte buffer.
76    #[must_use]
77    pub fn new() -> Self {
78        Self {
79            bytes: Vec::new(),
80            truncated: false,
81        }
82    }
83
84    /// Returns a closure suitable as the `f` field of a [`CollectChunks`] visitor that appends
85    /// each observed chunk into a [`CollectedBytes`] sink under `options`. Mirrors the
86    /// closure that the removed `collect_chunks_into_vec` factory used to inline.
87    #[must_use = "the returned closure must be wired into a CollectChunks visitor; dropping it discards the collection logic"]
88    pub fn collector(
89        options: RawCollectionOptions,
90    ) -> impl FnMut(Chunk, &mut Self) + Send + 'static {
91        move |chunk: Chunk, sink: &mut Self| {
92            sink.push_chunk(chunk.as_ref(), options);
93        }
94    }
95
96    /// Appends `chunk` into this buffer, honoring `options` (truncation / overflow behavior).
97    pub fn push_chunk(&mut self, chunk: &[u8], options: RawCollectionOptions) {
98        match options {
99            RawCollectionOptions::TrustedUnbounded => self.bytes.extend_from_slice(chunk),
100            RawCollectionOptions::Bounded {
101                max_bytes,
102                overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
103            } => {
104                let max_bytes = max_bytes.bytes();
105                let remaining = max_bytes.saturating_sub(self.bytes.len());
106                if chunk.len() > remaining {
107                    self.truncated = true;
108                }
109                self.bytes
110                    .extend_from_slice(&chunk[..remaining.min(chunk.len())]);
111            }
112            RawCollectionOptions::Bounded {
113                max_bytes,
114                overflow_behavior: CollectionOverflowBehavior::DropOldestData,
115            } => {
116                let max_bytes = max_bytes.bytes();
117                if chunk.len() > max_bytes {
118                    self.bytes.clear();
119                    self.bytes
120                        .extend_from_slice(&chunk[chunk.len().saturating_sub(max_bytes)..]);
121                    self.truncated = true;
122                    return;
123                }
124
125                let required = self.bytes.len() + chunk.len();
126                if required > max_bytes {
127                    self.bytes.drain(0..required - max_bytes);
128                    self.truncated = true;
129                }
130                self.bytes.extend_from_slice(chunk);
131            }
132        }
133    }
134}
135
136impl Default for CollectedBytes {
137    fn default() -> Self {
138        Self::new()
139    }
140}
141
142impl Deref for CollectedBytes {
143    type Target = [u8];
144
145    fn deref(&self) -> &Self::Target {
146        &self.bytes
147    }
148}
149
/// In-memory result of collecting parsed lines from an output stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectedLines {
    // Retained lines, oldest at the front.
    lines: VecDeque<String>,
    // Set once any line has been discarded.
    truncated: bool,
    // Running sum of the byte lengths of everything in `lines`.
    retained_bytes: usize,
}
157
158impl CollectedLines {
159    /// Creates an empty collected line buffer.
160    #[must_use]
161    pub fn new() -> Self {
162        Self {
163            lines: VecDeque::new(),
164            truncated: false,
165            retained_bytes: 0,
166        }
167    }
168
169    /// Retained output lines.
170    #[must_use]
171    pub fn lines(&self) -> &VecDeque<String> {
172        &self.lines
173    }
174
175    /// Whether any lines were discarded because the configured limit was exceeded.
176    #[must_use]
177    pub fn truncated(&self) -> bool {
178        self.truncated
179    }
180
181    /// Converts this collection into its retained output lines.
182    #[must_use]
183    pub fn into_lines(self) -> VecDeque<String> {
184        self.lines
185    }
186
187    /// Converts this collection into its retained output lines and truncation flag.
188    #[must_use]
189    pub fn into_parts(self) -> (VecDeque<String>, bool) {
190        (self.lines, self.truncated)
191    }
192
193    /// Returns a closure suitable as the `f` field of a [`CollectLines`] (composed inside a
194    /// [`ParseLines`](crate::output_stream::line::adapter::ParseLines)) that appends each
195    /// parsed line into a [`CollectedLines`] sink under `options`. Mirrors the closure that
196    /// the removed `collect_lines_into_vec` factory used to inline.
197    #[must_use = "the returned closure must be wired into a CollectLines visitor; dropping it discards the collection logic"]
198    pub fn line_collector(
199        options: LineCollectionOptions,
200    ) -> impl FnMut(Cow<'_, str>, &mut Self) -> Next + Send + 'static {
201        move |line: Cow<'_, str>, sink: &mut Self| {
202            sink.push_line(line.into_owned(), options);
203            Next::Continue
204        }
205    }
206
207    /// Appends `line` into this buffer, honoring `options` (truncation / overflow behavior).
208    ///
209    /// # Panics
210    ///
211    /// Panics if an internal invariant is violated while evicting older lines under
212    /// [`CollectionOverflowBehavior::DropOldestData`]; the eviction loop only runs while the
213    /// buffer is non-empty, so this is unreachable in correct use.
214    pub fn push_line(&mut self, line: String, options: LineCollectionOptions) {
215        match options {
216            LineCollectionOptions::TrustedUnbounded => self.push_back(line),
217            LineCollectionOptions::Bounded {
218                max_bytes,
219                max_lines,
220                overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
221            } => {
222                let line_len = line.len();
223                let max_bytes = max_bytes.bytes();
224                if self.lines.len() >= max_lines
225                    || line_len > max_bytes
226                    || line_len > max_bytes.saturating_sub(self.retained_bytes)
227                {
228                    self.truncated = true;
229                    return;
230                }
231                self.push_back(line);
232            }
233            LineCollectionOptions::Bounded {
234                max_bytes,
235                max_lines,
236                overflow_behavior: CollectionOverflowBehavior::DropOldestData,
237            } => {
238                let line_len = line.len();
239                let max_bytes = max_bytes.bytes();
240                if max_lines == 0 {
241                    self.truncated = true;
242                    return;
243                }
244                if line_len > max_bytes {
245                    self.truncated = true;
246                    return;
247                }
248
249                while self.lines.len() >= max_lines
250                    || line_len > max_bytes.saturating_sub(self.retained_bytes)
251                {
252                    self.pop_front()
253                        .expect("line buffer to contain an evictable line");
254                    self.truncated = true;
255                }
256                self.push_back(line);
257            }
258        }
259    }
260
261    fn push_back(&mut self, line: String) {
262        self.retained_bytes += line.len();
263        self.lines.push_back(line);
264    }
265
266    fn pop_front(&mut self) -> Option<String> {
267        let line = self.lines.pop_front()?;
268        self.retained_bytes -= line.len();
269        Some(line)
270    }
271}
272
273impl Default for CollectedLines {
274    fn default() -> Self {
275        Self::new()
276    }
277}
278
279impl Deref for CollectedLines {
280    type Target = VecDeque<String>;
281
282    fn deref(&self) -> &Self::Target {
283        &self.lines
284    }
285}
286
287/// Synchronous [`StreamVisitor`] that observes each chunk and lets `f` decide what to do with
288/// the sink `T`. The sink is yielded back to the caller via
289/// [`StreamVisitor::into_output`] when the consumer terminates, allowing patterns like
290/// "collect into a `Vec<u8>` and return it on completion."
291///
292/// Construct via [`builder`](Self::builder) and pass to
293/// [`Consumable::consume`](crate::Consumable::consume). For the common
294/// "collect-into-`CollectedBytes`" case, see [`CollectedBytes::collector`].
295#[derive(TypedBuilder)]
296pub struct CollectChunks<T, F>
297where
298    T: Sink,
299    F: FnMut(Chunk, &mut T) + Send + 'static,
300{
301    /// Sink to collect into.
302    pub sink: T,
303    /// Per-chunk collector closure.
304    pub f: F,
305}
306
307impl<T, F> CollectChunks<T, F>
308where
309    T: Sink,
310    F: FnMut(Chunk, &mut T) + Send + 'static,
311{
312    /// Fold-style constructor: `sink` is the initial accumulator and `f` folds each observed
313    /// chunk into it. Equivalent to
314    /// `CollectChunks::builder().sink(sink).f(f).build()`, but reads at the call site like a
315    /// `fold(initial, step)`.
316    #[must_use]
317    pub fn fold(sink: T, f: F) -> Self {
318        Self { sink, f }
319    }
320}
321
322impl<T, F> StreamVisitor for CollectChunks<T, F>
323where
324    T: Sink,
325    F: FnMut(Chunk, &mut T) + Send + 'static,
326{
327    type Output = T;
328
329    fn on_chunk(&mut self, chunk: Chunk) -> Next {
330        (self.f)(chunk, &mut self.sink);
331        Next::Continue
332    }
333
334    fn into_output(self) -> Self::Output {
335        self.sink
336    }
337}
338
339/// Asynchronous counterpart to [`CollectChunks`]. The per-chunk hook is a closure that returns a
340/// boxed future, allowing the future to borrow `chunk` and `sink` across `.await` while remaining
341/// `Send` for `tokio::spawn`. Construct via [`fold`](Self::fold) and pass to
342/// [`Consumable::consume_async`](crate::Consumable::consume_async).
343///
344/// The boxed future incurs one allocation per observed chunk; for hot paths that cannot afford
345/// that, implement [`AsyncStreamVisitor`] directly on a struct that owns the sink.
346///
347/// # Example
348///
349/// ```rust, no_run
350/// # use tokio_process_tools::{Chunk, Next};
351/// # use tokio_process_tools::visitors::collect::CollectChunksAsync;
352/// let visitor = CollectChunksAsync::fold(Vec::<u8>::new(), |chunk, sink| {
353///     Box::pin(async move {
354///         sink.extend_from_slice(chunk.as_ref());
355///         Next::Continue
356///     })
357/// });
358/// ```
359pub struct CollectChunksAsync<T, F>
360where
361    T: Sink,
362    F: for<'a> FnMut(Chunk, &'a mut T) -> Pin<Box<dyn Future<Output = Next> + Send + 'a>>
363        + Send
364        + 'static,
365{
366    sink: T,
367    f: F,
368}
369
370impl<T, F> CollectChunksAsync<T, F>
371where
372    T: Sink,
373    F: for<'a> FnMut(Chunk, &'a mut T) -> Pin<Box<dyn Future<Output = Next> + Send + 'a>>
374        + Send
375        + 'static,
376{
377    /// Fold-style constructor: `sink` is the initial accumulator and `f` folds each observed
378    /// chunk into it asynchronously. The closure must wrap its async body in
379    /// `Box::pin(async move { ... })` so the returned future can borrow `chunk` and `sink`
380    /// while remaining `Send`.
381    #[must_use]
382    pub fn fold(sink: T, f: F) -> Self {
383        Self { sink, f }
384    }
385}
386
387impl<T, F> AsyncStreamVisitor for CollectChunksAsync<T, F>
388where
389    T: Sink,
390    F: for<'a> FnMut(Chunk, &'a mut T) -> Pin<Box<dyn Future<Output = Next> + Send + 'a>>
391        + Send
392        + 'static,
393{
394    type Output = T;
395
396    fn on_chunk(&mut self, chunk: Chunk) -> impl Future<Output = Next> + Send + '_ {
397        (self.f)(chunk, &mut self.sink)
398    }
399
400    fn into_output(self) -> Self::Output {
401        self.sink
402    }
403}
404
/// [`LineVisitor`] that owns a user closure and a sink; `on_line` invokes the closure with the
/// parsed line and a `&mut` borrow of the sink. Compose with
/// [`ParseLines`](crate::output_stream::line::adapter::ParseLines) (most easily via
/// [`ParseLines::collect`](crate::output_stream::line::adapter::ParseLines::collect)) to
/// drive a line-aware collecting consumer.
pub struct CollectLines<T, F> {
    // Accumulator returned from `into_output`.
    sink: T,
    // Closure invoked once per parsed line.
    f: F,
}
414
415impl<T, F> CollectLines<T, F>
416where
417    T: Sink,
418    F: FnMut(Cow<'_, str>, &mut T) -> Next + Send + 'static,
419{
420    /// Creates a new sink that calls `f` with each parsed line and a `&mut` borrow of `sink`.
421    pub fn new(sink: T, f: F) -> Self {
422        Self { sink, f }
423    }
424}
425
426impl<T, F> LineVisitor for CollectLines<T, F>
427where
428    T: Sink,
429    F: FnMut(Cow<'_, str>, &mut T) -> Next + Send + 'static,
430{
431    type Output = T;
432
433    fn on_line(&mut self, line: Cow<'_, str>) -> Next {
434        (self.f)(line, &mut self.sink)
435    }
436
437    fn into_output(self) -> Self::Output {
438        self.sink
439    }
440}
441
442/// [`AsyncLineVisitor`] holding a closure that takes each parsed line and a `&mut` borrow of the
443/// sink, returning a boxed future. Compose with
444/// [`ParseLines`](crate::output_stream::line::adapter::ParseLines) (most easily via
445/// [`ParseLines::collect_async`](crate::output_stream::line::adapter::ParseLines::collect_async))
446/// to build an async line-aware collecting consumer.
447///
448/// The boxed future incurs one allocation per observed line; for hot paths that cannot afford
449/// that, implement [`AsyncLineVisitor`] directly on a struct that owns the sink.
450pub struct CollectLinesAsync<T, F>
451where
452    T: Sink,
453    F: for<'a> FnMut(Cow<'a, str>, &'a mut T) -> Pin<Box<dyn Future<Output = Next> + Send + 'a>>
454        + Send
455        + 'static,
456{
457    sink: T,
458    f: F,
459}
460
461impl<T, F> CollectLinesAsync<T, F>
462where
463    T: Sink,
464    F: for<'a> FnMut(Cow<'a, str>, &'a mut T) -> Pin<Box<dyn Future<Output = Next> + Send + 'a>>
465        + Send
466        + 'static,
467{
468    /// Creates a new visitor that awaits `f` for each parsed line with a `&mut` borrow of `sink`.
469    /// The closure must wrap its async body in `Box::pin(async move { ... })`.
470    pub fn new(sink: T, f: F) -> Self {
471        Self { sink, f }
472    }
473}
474
475impl<T, F> AsyncLineVisitor for CollectLinesAsync<T, F>
476where
477    T: Sink,
478    F: for<'a> FnMut(Cow<'a, str>, &'a mut T) -> Pin<Box<dyn Future<Output = Next> + Send + 'a>>
479        + Send
480        + 'static,
481{
482    type Output = T;
483
484    fn on_line<'a>(&'a mut self, line: Cow<'a, str>) -> impl Future<Output = Next> + Send + 'a {
485        (self.f)(line, &mut self.sink)
486    }
487
488    fn into_output(self) -> Self::Output {
489        self.sink
490    }
491}
492
493#[cfg(test)]
494mod tests {
495    use super::*;
496    use crate::ConsumerError;
497    use crate::output_stream::consumer::driver::{spawn_consumer_async, spawn_consumer_sync};
498    use crate::output_stream::event::StreamEvent;
499    use crate::output_stream::event::tests::event_receiver;
500    use crate::output_stream::line::adapter::ParseLines;
501    use crate::output_stream::line::options::LineParsingOptions;
502    use crate::output_stream::num_bytes::NumBytesExt;
503    use assertr::prelude::*;
504    use bytes::Bytes;
505    use std::borrow::Cow;
506    use std::io;
507
508    fn drop_oldest_options(max_bytes: usize, max_lines: usize) -> LineCollectionOptions {
509        LineCollectionOptions::Bounded {
510            max_bytes: max_bytes.bytes(),
511            max_lines,
512            overflow_behavior: CollectionOverflowBehavior::DropOldestData,
513        }
514    }
515
516    fn assert_retained_bytes_match_lines(collected: &CollectedLines) {
517        assert_that!(collected.retained_bytes)
518            .is_equal_to(collected.lines.iter().map(String::len).sum::<usize>());
519    }
520
521    struct ChunkCase {
522        name: &'static str,
523        overflow: CollectionOverflowBehavior,
524        max_bytes: usize,
525        chunks: &'static [&'static [u8]],
526        expected_bytes: &'static [u8],
527        expected_truncated: bool,
528    }
529
530    const CHUNK_BOUNDARY_CASES: &[ChunkCase] = &[
531        ChunkCase {
532            name: "drop_additional/empty_chunk_is_no_op",
533            overflow: CollectionOverflowBehavior::DropAdditionalData,
534            max_bytes: 5,
535            chunks: &[b""],
536            expected_bytes: b"",
537            expected_truncated: false,
538        },
539        ChunkCase {
540            name: "drop_additional/single_chunk_exactly_fills_buffer",
541            overflow: CollectionOverflowBehavior::DropAdditionalData,
542            max_bytes: 5,
543            chunks: &[b"abcde"],
544            expected_bytes: b"abcde",
545            expected_truncated: false,
546        },
547        ChunkCase {
548            name: "drop_additional/single_chunk_overshoots_by_one_byte",
549            overflow: CollectionOverflowBehavior::DropAdditionalData,
550            max_bytes: 5,
551            chunks: &[b"abcdef"],
552            expected_bytes: b"abcde",
553            expected_truncated: true,
554        },
555        ChunkCase {
556            name: "drop_additional/second_chunk_straddles_limit",
557            overflow: CollectionOverflowBehavior::DropAdditionalData,
558            max_bytes: 5,
559            chunks: &[b"abc", b"def"],
560            expected_bytes: b"abcde",
561            expected_truncated: true,
562        },
563        ChunkCase {
564            name: "drop_additional/first_chunk_exactly_fills_then_second_chunk_rejected",
565            overflow: CollectionOverflowBehavior::DropAdditionalData,
566            max_bytes: 5,
567            chunks: &[b"abcde", b"f"],
568            expected_bytes: b"abcde",
569            expected_truncated: true,
570        },
571        ChunkCase {
572            name: "drop_oldest/empty_chunk_is_no_op",
573            overflow: CollectionOverflowBehavior::DropOldestData,
574            max_bytes: 5,
575            chunks: &[b""],
576            expected_bytes: b"",
577            expected_truncated: false,
578        },
579        ChunkCase {
580            name: "drop_oldest/single_chunk_exactly_fills_buffer",
581            overflow: CollectionOverflowBehavior::DropOldestData,
582            max_bytes: 5,
583            chunks: &[b"abcde"],
584            expected_bytes: b"abcde",
585            expected_truncated: false,
586        },
587        ChunkCase {
588            name: "drop_oldest/single_chunk_overshoots_by_one_byte_into_empty",
589            overflow: CollectionOverflowBehavior::DropOldestData,
590            max_bytes: 5,
591            chunks: &[b"abcdef"],
592            expected_bytes: b"bcdef",
593            expected_truncated: true,
594        },
595        ChunkCase {
596            name: "drop_oldest/second_chunk_straddles_limit_evicts_front",
597            overflow: CollectionOverflowBehavior::DropOldestData,
598            max_bytes: 5,
599            chunks: &[b"abc", b"def"],
600            expected_bytes: b"bcdef",
601            expected_truncated: true,
602        },
603        ChunkCase {
604            name: "drop_oldest/oversized_chunk_into_empty_clears_and_keeps_tail",
605            overflow: CollectionOverflowBehavior::DropOldestData,
606            max_bytes: 5,
607            chunks: &[b"abcdefgh"],
608            expected_bytes: b"defgh",
609            expected_truncated: true,
610        },
611        ChunkCase {
612            name: "drop_oldest/oversized_chunk_into_partial_clears_existing",
613            overflow: CollectionOverflowBehavior::DropOldestData,
614            max_bytes: 5,
615            chunks: &[b"ab", b"cdefgh"],
616            expected_bytes: b"defgh",
617            expected_truncated: true,
618        },
619        ChunkCase {
620            name: "drop_oldest/first_chunk_exactly_fills_then_second_evicts_front",
621            overflow: CollectionOverflowBehavior::DropOldestData,
622            max_bytes: 5,
623            chunks: &[b"abcde", b"f"],
624            expected_bytes: b"bcdef",
625            expected_truncated: true,
626        },
627    ];
628
629    #[test]
630    fn push_chunk_boundary_matrix() {
631        for case in CHUNK_BOUNDARY_CASES {
632            let mut collected = CollectedBytes::new();
633            let options = RawCollectionOptions::Bounded {
634                max_bytes: case.max_bytes.bytes(),
635                overflow_behavior: case.overflow,
636            };
637            for chunk in case.chunks {
638                collected.push_chunk(chunk, options);
639            }
640
641            assert_that!(collected.bytes.as_slice())
642                .with_detail_message(format!("case: {}", case.name))
643                .is_equal_to(case.expected_bytes);
644            assert_that!(collected.truncated)
645                .with_detail_message(format!("case: {}", case.name))
646                .is_equal_to(case.expected_truncated);
647        }
648    }
649
650    struct LineCase {
651        name: &'static str,
652        overflow: CollectionOverflowBehavior,
653        max_bytes: usize,
654        max_lines: usize,
655        push: &'static [&'static str],
656        expected_lines: &'static [&'static str],
657        expected_truncated: bool,
658    }
659
660    const LINE_BOUNDARY_CASES: &[LineCase] = &[
661        LineCase {
662            name: "drop_additional/line_exactly_fills_byte_budget_with_slot_left",
663            overflow: CollectionOverflowBehavior::DropAdditionalData,
664            max_bytes: 5,
665            max_lines: 2,
666            push: &["abcde"],
667            expected_lines: &["abcde"],
668            expected_truncated: false,
669        },
670        LineCase {
671            name: "drop_additional/max_lines_reached_before_max_bytes",
672            overflow: CollectionOverflowBehavior::DropAdditionalData,
673            max_bytes: 100,
674            max_lines: 2,
675            push: &["a", "b", "c"],
676            expected_lines: &["a", "b"],
677            expected_truncated: true,
678        },
679        LineCase {
680            name: "drop_additional/max_bytes_reached_before_max_lines",
681            overflow: CollectionOverflowBehavior::DropAdditionalData,
682            max_bytes: 4,
683            max_lines: 10,
684            push: &["aa", "bb", "cc"],
685            expected_lines: &["aa", "bb"],
686            expected_truncated: true,
687        },
688        LineCase {
689            name: "drop_additional/line_equal_to_remaining_budget_accepted",
690            overflow: CollectionOverflowBehavior::DropAdditionalData,
691            max_bytes: 6,
692            max_lines: 10,
693            push: &["abc", "def"],
694            expected_lines: &["abc", "def"],
695            expected_truncated: false,
696        },
697        LineCase {
698            name: "drop_additional/line_one_byte_over_remaining_rejected",
699            overflow: CollectionOverflowBehavior::DropAdditionalData,
700            max_bytes: 6,
701            max_lines: 10,
702            push: &["abc", "defg"],
703            expected_lines: &["abc"],
704            expected_truncated: true,
705        },
706        LineCase {
707            name: "drop_additional/line_strictly_larger_than_max_bytes_rejected",
708            overflow: CollectionOverflowBehavior::DropAdditionalData,
709            max_bytes: 5,
710            max_lines: 10,
711            push: &["abc", "xxxxxxxxx"],
712            expected_lines: &["abc"],
713            expected_truncated: true,
714        },
715        LineCase {
716            name: "drop_oldest/line_exactly_fills_byte_budget_with_slot_left",
717            overflow: CollectionOverflowBehavior::DropOldestData,
718            max_bytes: 5,
719            max_lines: 2,
720            push: &["abcde"],
721            expected_lines: &["abcde"],
722            expected_truncated: false,
723        },
724        LineCase {
725            name: "drop_oldest/max_lines_reached_before_max_bytes_evicts_one",
726            overflow: CollectionOverflowBehavior::DropOldestData,
727            max_bytes: 100,
728            max_lines: 2,
729            push: &["a", "b", "c"],
730            expected_lines: &["b", "c"],
731            expected_truncated: true,
732        },
733        LineCase {
734            name: "drop_oldest/max_bytes_reached_before_max_lines_evicts_one",
735            overflow: CollectionOverflowBehavior::DropOldestData,
736            max_bytes: 6,
737            max_lines: 10,
738            push: &["aaa", "bbb", "ccc"],
739            expected_lines: &["bbb", "ccc"],
740            expected_truncated: true,
741        },
742        LineCase {
743            name: "drop_oldest/incoming_line_requires_evicting_multiple_lines",
744            overflow: CollectionOverflowBehavior::DropOldestData,
745            max_bytes: 8,
746            max_lines: 100,
747            push: &["a", "b", "cc", "dddd", "eeeeee"],
748            expected_lines: &["eeeeee"],
749            expected_truncated: true,
750        },
751        LineCase {
752            name: "drop_oldest/line_strictly_larger_than_max_bytes_rejected",
753            overflow: CollectionOverflowBehavior::DropOldestData,
754            max_bytes: 5,
755            max_lines: 10,
756            push: &["abc", "xxxxxxxxx"],
757            expected_lines: &["abc"],
758            expected_truncated: true,
759        },
760    ];
761
762    #[test]
763    fn push_line_boundary_matrix() {
764        for case in LINE_BOUNDARY_CASES {
765            let mut collected = CollectedLines::new();
766            let options = LineCollectionOptions::Bounded {
767                max_bytes: case.max_bytes.bytes(),
768                max_lines: case.max_lines,
769                overflow_behavior: case.overflow,
770            };
771            for line in case.push {
772                collected.push_line((*line).to_string(), options);
773            }
774
775            let actual_lines: Vec<&str> = collected.lines().iter().map(String::as_str).collect();
776            assert_that!(actual_lines)
777                .with_detail_message(format!("case: {}", case.name))
778                .is_equal_to(case.expected_lines.to_vec());
779            assert_that!(collected.truncated())
780                .with_detail_message(format!("case: {}", case.name))
781                .is_equal_to(case.expected_truncated);
782            assert_that!(collected.retained_bytes)
783                .with_detail_message(format!("case: {} (retained_bytes)", case.name))
784                .is_equal_to(
785                    case.expected_lines
786                        .iter()
787                        .map(|line| line.len())
788                        .sum::<usize>(),
789                );
790        }
791    }
792
793    #[test]
794    fn raw_collection_keeps_expected_bytes_when_truncated() {
795        let mut collected = CollectedBytes::new();
796        let options = RawCollectionOptions::Bounded {
797            max_bytes: 5.bytes(),
798            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
799        };
800
801        collected.push_chunk(b"abc", options);
802        collected.push_chunk(b"def", options);
803
804        assert_that!(collected.bytes.as_slice()).is_equal_to(b"abcde".as_slice());
805        assert_that!(collected.truncated).is_true();
806
807        let mut collected = CollectedBytes::new();
808        let options = RawCollectionOptions::Bounded {
809            max_bytes: 5.bytes(),
810            overflow_behavior: CollectionOverflowBehavior::DropOldestData,
811        };
812
813        collected.push_chunk(b"abc", options);
814        collected.push_chunk(b"def", options);
815
816        assert_that!(collected.bytes.as_slice()).is_equal_to(b"bcdef".as_slice());
817        assert_that!(collected.truncated).is_true();
818    }
819
820    #[test]
    // Smoke test covering both overflow behaviors of bounded line collection, one scenario each.
821    fn basic_line_collection_limit_modes() {
        // Scenario 1: `DropAdditionalData` with a 7-byte / 2-line cap. "one" and "two" use
        // 6 bytes and both line slots, so "three" would exceed both limits; the test expects
        // it to be discarded and the first two lines to stay.
822        let mut collected = CollectedLines::new();
823        let options = LineCollectionOptions::Bounded {
824            max_bytes: 7.bytes(),
825            max_lines: 2,
826            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
827        };
828
829        collected.push_line("one".to_string(), options);
830        collected.push_line("two".to_string(), options);
831        collected.push_line("three".to_string(), options);
832
833        assert_that!(
834            collected
835                .lines()
836                .iter()
837                .map(String::as_str)
838                .collect::<Vec<_>>()
839        )
840        .is_equal_to(vec!["one", "two"]);
841        assert_that!(collected.truncated()).is_true();
842
        // Scenario 2: `DropOldestData` with a 6-byte / 2-line cap. The third 3-byte line
        // does not fit alongside the first two, so the test expects the oldest line ("one")
        // to be evicted in favor of the newest.
843        let mut collected = CollectedLines::new();
844        let options = LineCollectionOptions::Bounded {
845            max_bytes: 6.bytes(),
846            max_lines: 2,
847            overflow_behavior: CollectionOverflowBehavior::DropOldestData,
848        };
849
850        collected.push_line("one".to_string(), options);
851        collected.push_line("two".to_string(), options);
852        collected.push_line("six".to_string(), options);
853
854        assert_that!(
855            collected
856                .lines()
857                .iter()
858                .map(String::as_str)
859                .collect::<Vec<_>>()
860        )
861        .is_equal_to(vec!["two", "six"]);
862        assert_that!(collected.truncated()).is_true();
863    }
864
865    #[test]
    // Checks the `retained_bytes` counter after two plain appends with no overflow.
866    fn retained_bytes_tracks_appended_lines() {
        // Both limits (100 bytes / 100 lines) are far above the 7 bytes / 2 lines pushed
        // below, so neither limit is reached by this input.
867        let options = LineCollectionOptions::Bounded {
868            max_bytes: 100.bytes(),
869            max_lines: 100,
870            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
871        };
872        let mut collected = CollectedLines::new();
873
874        collected.push_line("aaa".to_string(), options);
875        collected.push_line("bbbb".to_string(), options);
876
        // 3 + 4 bytes pushed.
877        assert_that!(collected.retained_bytes).is_equal_to(7);
        // NOTE(review): helper is defined outside this view; presumably it cross-checks
        // `retained_bytes` against the lengths of the retained lines — confirm.
878        assert_retained_bytes_match_lines(&collected);
879    }
880
881    #[test]
    // With `DropOldestData`, an incoming line larger than the whole byte budget must not
    // evict the lines that are already retained.
882    fn drop_oldest_preserves_retained_lines_when_oversized_line_arrives() {
        // NOTE(review): `drop_oldest_options` is defined outside this view; the arguments
        // are presumably (max_bytes, max_lines), i.e. a 10-byte / 100-line cap — confirm.
883        let options = drop_oldest_options(10, 100);
884        let mut collected = CollectedLines::new();
885
886        collected.push_line("aaa".to_string(), options);
887        collected.push_line("bbb".to_string(), options);
        // 13 bytes: larger than the 10 passed as the first option argument.
888        collected.push_line("x".repeat(13), options);
889
890        assert_that!(collected.lines())
891            .with_detail_message(
892                "previously-retained lines must survive an oversized incoming line",
893            )
894            .is_equal_to(VecDeque::from(["aaa".to_string(), "bbb".to_string()]));
        // Only the two 3-byte lines are expected to be counted.
895        assert_that!(collected.retained_bytes).is_equal_to(6);
896        assert_retained_bytes_match_lines(&collected);
        // Dropping the oversized line is still expected to be reported as truncation.
897        assert_that!(collected.truncated()).is_true();
898    }
899
900    #[test]
    // With `DropOldestData`, a new line that fits the budget on its own but not together
    // with the retained lines is expected to push out the oldest retained line.
901    fn drop_oldest_evicts_old_lines_when_new_line_fits_but_budget_is_exceeded() {
        // NOTE(review): arguments are presumably (max_bytes, max_lines) — helper is defined
        // outside this view.
902        let options = drop_oldest_options(10, 100);
903        let mut collected = CollectedLines::new();
904
        // Three 4-byte lines total 12 bytes, which is more than 10.
905        collected.push_line("aaaa".to_string(), options);
906        collected.push_line("bbbb".to_string(), options);
907        collected.push_line("cccc".to_string(), options);
908
        // Expected outcome: "aaaa" evicted, the two newest lines (8 bytes) retained.
909        assert_that!(collected.lines())
910            .is_equal_to(VecDeque::from(["bbbb".to_string(), "cccc".to_string()]));
911        assert_that!(collected.retained_bytes).is_equal_to(8);
912        assert_retained_bytes_match_lines(&collected);
913        assert_that!(collected.truncated()).is_true();
914    }
915
916    #[test]
    // When eviction is triggered by the line-count limit rather than the byte limit, the
    // `retained_bytes` counter must still be reduced by the evicted line's size.
917    fn drop_oldest_updates_retained_bytes_when_evicting_by_line_count() {
        // NOTE(review): presumably (max_bytes = 100, max_lines = 2) — helper is defined
        // outside this view. The 6 bytes pushed below stay well under 100.
918        let options = drop_oldest_options(100, 2);
919        let mut collected = CollectedLines::new();
920
921        collected.push_line("a".to_string(), options);
922        collected.push_line("bb".to_string(), options);
923        collected.push_line("ccc".to_string(), options);
924
        // Expected outcome: "a" evicted; 2 + 3 = 5 bytes remain.
925        assert_that!(collected.lines())
926            .is_equal_to(VecDeque::from(["bb".to_string(), "ccc".to_string()]));
927        assert_that!(collected.retained_bytes).is_equal_to(5);
928        assert_retained_bytes_match_lines(&collected);
929        assert_that!(collected.truncated()).is_true();
930    }
931
932    #[test]
    // Edge case: a zero line limit means no line can be kept, even under `DropOldestData`.
933    fn drop_oldest_with_zero_max_lines_retains_nothing() {
        // NOTE(review): presumably (max_bytes = 100, max_lines = 0) — helper is defined
        // outside this view.
934        let options = drop_oldest_options(100, 0);
935        let mut collected = CollectedLines::new();
936
937        collected.push_line("aaa".to_string(), options);
938
        // The pushed line is expected to be dropped, leave the byte counter at zero, and
        // still be reported as truncation.
939        assert_that!(collected.lines().is_empty()).is_true();
940        assert_that!(collected.retained_bytes).is_equal_to(0);
941        assert_retained_bytes_match_lines(&collected);
942        assert_that!(collected.truncated()).is_true();
943    }
944
945    #[test]
    // With `DropAdditionalData`, a line larger than the whole byte budget is expected to be
    // discarded without disturbing what was already retained.
946    fn drop_additional_preserves_retained_lines_when_oversized_line_arrives() {
947        let options = LineCollectionOptions::Bounded {
948            max_bytes: 10.bytes(),
949            max_lines: 100,
950            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
951        };
952        let mut collected = CollectedLines::new();
953
954        collected.push_line("aaa".to_string(), options);
        // 13 bytes: exceeds `max_bytes` (10) on its own.
955        collected.push_line("x".repeat(13), options);
956
957        assert_that!(collected.lines()).is_equal_to(VecDeque::from(["aaa".to_string()]));
958        assert_that!(collected.retained_bytes).is_equal_to(3);
959        assert_retained_bytes_match_lines(&collected);
960        assert_that!(collected.truncated()).is_true();
961    }
962
963    #[test]
    // With `DropAdditionalData`, a line that would fit alone but overflows the remaining
    // budget is expected to be rejected, leaving `retained_bytes` unchanged.
964    fn drop_additional_preserves_retained_bytes_when_limit_rejects_line() {
965        let options = LineCollectionOptions::Bounded {
966            max_bytes: 6.bytes(),
967            max_lines: 100,
968            overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
969        };
970        let mut collected = CollectedLines::new();
971
972        collected.push_line("aaa".to_string(), options);
        // 3 bytes retained + 4 incoming = 7, which is more than `max_bytes` (6).
973        collected.push_line("bbbb".to_string(), options);
974
975        assert_that!(collected.lines()).is_equal_to(VecDeque::from(["aaa".to_string()]));
        // The counter must not include the rejected 4-byte line.
976        assert_that!(collected.retained_bytes).is_equal_to(3);
977        assert_retained_bytes_match_lines(&collected);
978        assert_that!(collected.truncated()).is_true();
979    }
980
981    #[tokio::test]
    // A read error observed on the stream must be returned from the consumer's `wait()` as
    // `ConsumerError::StreamRead`, carrying the original stream name and I/O error kind.
982    async fn collectors_return_stream_read_error() {
983        let error =
984            crate::StreamReadError::new("custom", io::Error::from(io::ErrorKind::BrokenPipe));
        // NOTE(review): `spawn_consumer_sync` and `event_receiver` are defined outside this
        // view; presumably the consumer is fed the listed events in order — confirm.
985        let collector = spawn_consumer_sync(
986            "custom",
987            event_receiver(vec![
                // One complete line plus an unterminated tail, followed by the read error.
988                StreamEvent::Chunk(Chunk(Bytes::from_static(b"complete\npartial"))),
989                StreamEvent::ReadError(error),
990            ])
991            .await,
992            ParseLines::collect(
993                LineParsingOptions::default(),
994                Vec::<String>::new(),
                // Collect every parsed line and never request an early stop.
995                |line, lines: &mut Vec<String>| {
996                    lines.push(line.into_owned());
997                    Next::Continue
998                },
999            ),
1000        );
1001
1002        match collector.wait().await {
1003            Err(ConsumerError::StreamRead { source }) => {
1004                assert_that!(source.stream_name()).is_equal_to("custom");
1005                assert_that!(source.kind()).is_equal_to(io::ErrorKind::BrokenPipe);
1006            }
            // Any other result (success or a different error) fails the test, with the
            // actual value included in the message.
1007            other => {
1008                assert_that!(&other).fail(format_args!(
1009                    "expected consumer stream read error, got {other:?}"
1010                ));
1011            }
1012        }
1013    }
1014
1015    #[tokio::test]
    // Line collection across a `StreamEvent::Gap`: the partial line interrupted by the gap
    // must not be reported, while an unterminated line at EOF must be.
1016    async fn collectors_skip_gaps_and_keep_final_unterminated_line() {
        // NOTE(review): `spawn_consumer_sync` and `event_receiver` are defined outside this
        // view; presumably the consumer is fed the listed events in order — confirm.
1017        let collector = spawn_consumer_sync(
1018            "custom",
1019            event_receiver(vec![
                // "par" is an incomplete line when the gap arrives.
1020                StreamEvent::Chunk(Chunk(Bytes::from_static(b"one\npar"))),
1021                StreamEvent::Gap,
                // The chunk after the gap begins with a newline and ends without one.
1022                StreamEvent::Chunk(Chunk(Bytes::from_static(b"\ntwo\nfinal"))),
1023                StreamEvent::Eof,
1024            ])
1025            .await,
1026            ParseLines::collect(
1027                LineParsingOptions::default(),
1028                Vec::<String>::new(),
1029                |line, lines| {
1030                    lines.push(line.into_owned());
1031                    Next::Continue
1032                },
1033            ),
1034        );
1035
1036        let lines = collector.wait().await.unwrap();
        // Neither "par" nor an empty line from around the gap is expected; "final" is
        // expected even though it has no trailing newline.
1037        assert_that!(lines).contains_exactly(["one", "two", "final"]);
1038    }
1039
1040    #[tokio::test]
    // The async chunk collector must pass every chunk to the callback, and `wait()` must
    // return the accumulated sink once the stream reaches EOF.
1041    async fn chunk_collector_async_extends_sink_until_eof() {
        // NOTE(review): `spawn_consumer_async` and `event_receiver` are defined outside this
        // view; presumably the consumer is fed the listed events in order — confirm.
1042        let collector = spawn_consumer_async(
1043            "custom",
1044            event_receiver(vec![
1045                StreamEvent::Chunk(Chunk(Bytes::from_static(b"ab"))),
1046                StreamEvent::Chunk(Chunk(Bytes::from_static(b"cd"))),
1047                StreamEvent::Chunk(Chunk(Bytes::from_static(b"ef"))),
1048                StreamEvent::Eof,
1049            ])
1050            .await,
            // The callback returns a boxed future that appends the chunk's bytes to the
            // `Vec<u8>` sink and always asks to continue.
1051            CollectChunksAsync::fold(Vec::new(), |chunk, seen: &mut Vec<u8>| {
1052                Box::pin(async move {
1053                    seen.extend_from_slice(chunk.as_ref());
1054                    Next::Continue
1055                })
1056            }),
1057        );
1058
1059        let seen = collector.wait().await.unwrap();
        // All three chunks are expected, concatenated in arrival order.
1060        assert_that!(seen).is_equal_to(b"abcdef".to_vec());
1061    }
1062
1063    #[tokio::test]
    // The sync chunk collector must accept a callback that mutates its own captured state
    // between invocations.
1064    async fn chunk_collector_accepts_stateful_callback() {
        // Counter moved into the closure below and incremented on every chunk.
1065        let mut chunk_index = 0;
        // NOTE(review): `spawn_consumer_sync` and `event_receiver` are defined outside this
        // view; presumably the consumer is fed the listed events in order — confirm.
1066        let collector = spawn_consumer_sync(
1067            "custom",
1068            event_receiver(vec![
1069                StreamEvent::Chunk(Chunk(Bytes::from_static(b"ab"))),
1070                StreamEvent::Chunk(Chunk(Bytes::from_static(b"cd"))),
1071                StreamEvent::Chunk(Chunk(Bytes::from_static(b"ef"))),
1072                StreamEvent::Eof,
1073            ])
1074            .await,
1075            CollectChunks::builder()
1076                .sink(Vec::new())
1077                .f(
                    // Records each chunk together with its 1-based arrival index.
1078                    move |chunk: Chunk, indexed_chunks: &mut Vec<(usize, Vec<u8>)>| {
1079                        chunk_index += 1;
1080                        indexed_chunks.push((chunk_index, chunk.as_ref().to_vec()));
1081                    },
1082                )
1083                .build(),
1084        );
1085
1086        let indexed_chunks = collector.wait().await.unwrap();
        // Indices 1..=3 show the counter persisted across callback invocations.
1087        assert_that!(indexed_chunks).is_equal_to(vec![
1088            (1, b"ab".to_vec()),
1089            (2, b"cd".to_vec()),
1090            (3, b"ef".to_vec()),
1091        ]);
1092    }
1093
1094    #[tokio::test]
    // The sync line collector must accept a callback that mutates its own captured state
    // between invocations.
1095    async fn line_collector_accepts_stateful_callback() {
        // Counter moved into the closure below and incremented on every line.
1096        let mut line_index = 0;
        // NOTE(review): `spawn_consumer_sync` and `event_receiver` are defined outside this
        // view; presumably the consumer is fed the listed events in order — confirm.
1097        let collector = spawn_consumer_sync(
1098            "custom",
1099            event_receiver(vec![
                // Three newline-terminated lines delivered in a single chunk.
1100                StreamEvent::Chunk(Chunk(Bytes::from_static(b"alpha\nbeta\ngamma\n"))),
1101                StreamEvent::Eof,
1102            ])
1103            .await,
1104            ParseLines::collect(
1105                LineParsingOptions::default(),
1106                Vec::new(),
                // Prefixes each line with its 1-based index and never requests a stop.
1107                move |line: Cow<'_, str>, indexed_lines: &mut Vec<String>| {
1108                    line_index += 1;
1109                    indexed_lines.push(format!("{line_index}:{line}"));
1110                    Next::Continue
1111                },
1112            ),
1113        );
1114
1115        let indexed_lines = collector.wait().await.unwrap();
        // Prefixes 1..=3 show the counter persisted across callback invocations.
1116        assert_that!(indexed_lines).is_equal_to(vec![
1117            "1:alpha".to_string(),
1118            "2:beta".to_string(),
1119            "3:gamma".to_string(),
1120        ]);
1121    }
1122
1123    #[tokio::test]
    // Returning `Next::Break` from the async line callback must stop collection: lines after
    // the one that requested the break are not delivered to the callback.
1124    async fn line_collector_async_break_stops_after_requested_line() {
        // NOTE(review): `spawn_consumer_async` and `event_receiver` are defined outside this
        // view; presumably the consumer is fed the listed events in order — confirm.
1125        let collector = spawn_consumer_async(
1126            "custom",
1127            event_receiver(vec![
                // All three lines arrive in one chunk, so "end" is already buffered when
                // the callback asks to stop.
1128                StreamEvent::Chunk(Chunk(Bytes::from_static(b"start\nbreak\nend\n"))),
1129                StreamEvent::Eof,
1130            ])
1131            .await,
1132            ParseLines::collect_async(
1133                LineParsingOptions::default(),
1134                Vec::new(),
1135                |line, seen: &mut Vec<String>| {
1136                    Box::pin(async move {
                        // Decide before `into_owned` consumes `line`.
1137                        let is_break = line == "break";
                        // The breaking line itself is still recorded.
1138                        seen.push(line.into_owned());
1139                        if is_break {
1140                            Next::Break
1141                        } else {
1142                            Next::Continue
1143                        }
1144                    })
1145                },
1146            ),
1147        );
1148
1149        let seen = collector.wait().await.unwrap();
        // "end" must be absent from the collected sink.
1150        assert_that!(seen).contains_exactly(["start", "break"]);
1151    }
1152}