Skip to main content

tokio_process_tools/output_stream/line/
adapter.rs

1//! Adapter that turns a chunk-level [`StreamVisitor`] / [`AsyncStreamVisitor`] into a
2//! line-level [`LineVisitor`] / [`AsyncLineVisitor`].
3//!
4//! Each line-aware visitor ([`InspectLines`], [`CollectLines`],
5//! [`crate::visitors::wait::WaitForLine`], [`crate::visitors::write::WriteLines`],
//! plus their async twins) collapses to an inner [`LineVisitor`] / [`AsyncLineVisitor`] that only
7//! describes the per-line action. Adding a new line-aware visitor is now "implement [`LineVisitor`]
8//! (or [`AsyncLineVisitor`]) and compose with [`ParseLines`]," not "re-derive the parser plumbing
9//! for the seventh time."
10//!
11//! A single [`ParseLines<S>`] struct serves both the sync and async paths: it carries two
12//! trait impls ([`StreamVisitor`] when `S: LineVisitor`, [`AsyncStreamVisitor`] when
13//! `S: AsyncLineVisitor`), and Rust selects the right one based on which trait the inner sink
14//! implements. Implementing both [`LineVisitor`] and [`AsyncLineVisitor`] for the same sink is
15//! supported but creates ambiguity at the [`ParseLines`] call site: the caller has to make
16//! the trait selection explicit (e.g., by which consumer driver they hand the adapter to).
17//! In practice, you implement whichever trait matches the work the sink does: sync if
18//! `on_line` is non-blocking, async if it `.await`s.
19//!
20//! Encoding the `max_line_length > 0` invariant as a `NonZero*` newtype was rejected on
21//! ergonomic grounds: every caller of `LineParsingOptions` would inherit the wrapper, even
22//! when they construct it from a known-non-zero literal. The runtime assert lives in
23//! [`super::options`] as the single source of that check; the constraint is documented on
24//! [`LineParsingOptions::max_line_length`].
25
26use super::options::{LineParsingOptions, assert_max_line_length_non_zero};
27use super::parser::LineParser;
28use crate::output_stream::Next;
29use crate::output_stream::consumer::Sink;
30use crate::output_stream::event::Chunk;
31use crate::output_stream::visitor::{AsyncStreamVisitor, StreamVisitor};
32use crate::output_stream::visitors::collect::{CollectLines, CollectLinesAsync};
33use crate::output_stream::visitors::inspect::{InspectLines, InspectLinesAsync};
34use std::borrow::Cow;
35use std::future::Future;
36
37/// Per-line action selected by the [`StreamVisitor`] impl of [`ParseLines`].
38///
39/// Implementors only describe what should happen for each parsed line; chunk parsing, gap
40/// handling, and EOF flushing are carried by the adapter.
41pub trait LineVisitor: Send + 'static {
42    /// Final value produced once the adapter is finished. Returned via
43    /// [`Consumer::wait`](crate::Consumer::wait).
44    type Output: Send + 'static;
45
46    /// Invoked for every parsed line. Return [`Next::Break`] to stop further parsing.
47    fn on_line(&mut self, line: Cow<'_, str>) -> Next;
48
49    /// Invoked after the adapter resets the parser following a stream gap. The default does
50    /// nothing; override when the inner sink keeps line-spanning state that the gap
51    /// invalidates.
52    fn on_gap(&mut self) {}
53
54    /// Invoked once the adapter finishes flushing any trailing line at EOF. The default does
55    /// nothing; override for sinks that want a finalization hook beyond the last `on_line`
56    /// call.
57    fn on_eof(&mut self) {}
58
59    /// Consumes the sink and returns its final output.
60    #[must_use]
61    fn into_output(self) -> Self::Output;
62}
63
64/// Async per-line action selected by the [`AsyncStreamVisitor`] impl of [`ParseLines`].
65///
66/// `on_line` is async because async per-line work (writing to an async sink, awaiting a
67/// channel) needs `.await`. `on_gap` stays synchronous because gap notification carries no
68/// payload to await on, mirroring [`AsyncStreamVisitor::on_gap`].
69///
70/// # Allocation note
71///
72/// The async path materializes every parsed line as an owned `String` before handing it to
73/// `on_line`. The synchronous [`LineVisitor`] path can instead pass a `Cow::Borrowed` straight out
74/// of the chunk on the fast path, so it avoids a per-line allocation when the line fits in the
75/// current chunk. The allocation is the cost of holding the parser's borrow across an `.await`,
76/// which Rust does not allow because the next iteration re-borrows the parser. Prefer
77/// [`LineVisitor`] (composed via [`ParseLines::inspect`] / [`ParseLines::collect`]) when
78/// per-line work is non-blocking, and you want the zero-copy fast path; reach for
79/// [`AsyncLineVisitor`] only when the per-line work genuinely needs to await.
80pub trait AsyncLineVisitor: Send + 'static {
81    /// Final value produced once the adapter is finished.
82    type Output: Send + 'static;
83
84    /// Asynchronously observes a single parsed line. Return [`Next::Break`] to stop further
85    /// parsing.
86    fn on_line<'a>(&'a mut self, line: Cow<'a, str>) -> impl Future<Output = Next> + Send + 'a;
87
88    /// Synchronous gap hook; default no-op. See [`LineVisitor::on_gap`].
89    fn on_gap(&mut self) {}
90
91    /// Asynchronous EOF hook; default no-op. Invoked after the adapter has flushed any
92    /// trailing line through `on_line`.
93    fn on_eof(&mut self) -> impl Future<Output = ()> + Send + '_ {
94        async {}
95    }
96
97    /// Consumes the sink and returns its final output.
98    #[must_use]
99    fn into_output(self) -> Self::Output;
100}
101
102/// Adapter that drives a [`LineParser`] over chunk events and dispatches every emitted line to
103/// the `inner` [`LineVisitor`] (sync) or [`AsyncLineVisitor`] (async).
104///
105/// Implements [`StreamVisitor`] when `S: LineVisitor` and [`AsyncStreamVisitor`] when
106/// `S: AsyncLineVisitor`.
107pub struct ParseLines<S> {
108    parser: LineParser,
109    options: LineParsingOptions,
110    inner: S,
111}
112
113impl<S> ParseLines<S> {
114    /// Creates a new line adapter.
115    ///
116    /// # Panics
117    ///
118    /// Panics if `options.max_line_length` is zero. See
119    /// [`LineParsingOptions::max_line_length`] for the rationale; pass
120    /// [`crate::NumBytes::MAX`] for effectively-unbounded line parsing.
121    pub fn new(options: LineParsingOptions, inner: S) -> Self {
122        assert_max_line_length_non_zero(&options);
123        Self {
124            parser: LineParser::new(),
125            options,
126            inner,
127        }
128    }
129}
130
131impl<F> ParseLines<InspectLines<F>>
132where
133    F: FnMut(Cow<'_, str>) -> Next + Send + 'static,
134{
135    /// Convenience constructor: wraps `f` in an [`InspectLines`] and composes it with this
136    /// adapter. Equivalent to `ParseLines::new(options, InspectLines::new(f))`.
137    ///
138    /// # Panics
139    ///
140    /// Panics if `options.max_line_length` is zero.
141    pub fn inspect(options: LineParsingOptions, f: F) -> Self {
142        Self::new(options, InspectLines::new(f))
143    }
144}
145
146impl<F, Fut> ParseLines<InspectLinesAsync<F, Fut>>
147where
148    F: FnMut(Cow<'_, str>) -> Fut + Send + 'static,
149    Fut: Future<Output = Next> + Send + 'static,
150{
151    /// Convenience constructor: wraps `f` in an [`InspectLinesAsync`] and composes it with this
152    /// adapter. Equivalent to `ParseLines::new(options, InspectLinesAsync::new(f))`.
153    ///
154    /// # Panics
155    ///
156    /// Panics if `options.max_line_length` is zero.
157    pub fn inspect_async(options: LineParsingOptions, f: F) -> Self {
158        Self::new(options, InspectLinesAsync::new(f))
159    }
160}
161
162impl<T, F> ParseLines<CollectLines<T, F>>
163where
164    T: Sink,
165    F: FnMut(Cow<'_, str>, &mut T) -> Next + Send + 'static,
166{
167    /// Convenience constructor: wraps `sink` and `f` in a [`CollectLines`] and composes it with
168    /// this adapter.
169    ///
170    /// # Panics
171    ///
172    /// Panics if `options.max_line_length` is zero.
173    pub fn collect(options: LineParsingOptions, sink: T, f: F) -> Self {
174        Self::new(options, CollectLines::new(sink, f))
175    }
176}
177
178impl<T, F> ParseLines<CollectLinesAsync<T, F>>
179where
180    T: Sink,
181    F: for<'a> FnMut(
182            Cow<'a, str>,
183            &'a mut T,
184        ) -> std::pin::Pin<Box<dyn Future<Output = Next> + Send + 'a>>
185        + Send
186        + 'static,
187{
188    /// Convenience constructor: wraps `sink` and `f` in a [`CollectLinesAsync`] and composes it
189    /// with this adapter. The closure must wrap its async body in `Box::pin(async move { ... })`.
190    ///
191    /// # Panics
192    ///
193    /// Panics if `options.max_line_length` is zero.
194    pub fn collect_async(options: LineParsingOptions, sink: T, f: F) -> Self {
195        Self::new(options, CollectLinesAsync::new(sink, f))
196    }
197}
198
199impl<S: LineVisitor> StreamVisitor for ParseLines<S> {
200    type Output = S::Output;
201
202    fn on_chunk(&mut self, chunk: Chunk) -> Next {
203        let Self {
204            parser,
205            options,
206            inner,
207        } = self;
208        let mut bytes: &[u8] = chunk.as_ref();
209        while let Some(line) = parser.next_line(&mut bytes, *options) {
210            if inner.on_line(line) == Next::Break {
211                return Next::Break;
212            }
213        }
214        Next::Continue
215    }
216
217    fn on_gap(&mut self) {
218        self.parser.on_gap();
219        self.inner.on_gap();
220    }
221
222    fn on_eof(&mut self) {
223        if let Some(line) = self.parser.finish() {
224            let _ = self.inner.on_line(line);
225        }
226        self.inner.on_eof();
227    }
228
229    fn into_output(self) -> Self::Output {
230        self.inner.into_output()
231    }
232}
233
234impl<S: AsyncLineVisitor> AsyncStreamVisitor for ParseLines<S> {
235    type Output = S::Output;
236
237    async fn on_chunk(&mut self, chunk: Chunk) -> Next {
238        let Self {
239            parser,
240            options,
241            inner,
242        } = self;
243        let mut bytes: &[u8] = chunk.as_ref();
244        loop {
245            let line = match parser.next_line(&mut bytes, *options) {
246                Some(line) => line.into_owned(),
247                None => return Next::Continue,
248            };
249            if inner.on_line(Cow::Owned(line)).await == Next::Break {
250                return Next::Break;
251            }
252        }
253    }
254
255    fn on_gap(&mut self) {
256        self.parser.on_gap();
257        self.inner.on_gap();
258    }
259
260    async fn on_eof(&mut self) {
261        let trailing = self.parser.finish().map(Cow::into_owned);
262        if let Some(line) = trailing {
263            let _ = self.inner.on_line(Cow::Owned(line)).await;
264        }
265        self.inner.on_eof().await;
266    }
267
268    fn into_output(self) -> Self::Output {
269        self.inner.into_output()
270    }
271}
272
#[cfg(test)]
mod tests {
    use super::super::options::LineOverflowBehavior;
    use super::*;
    use crate::NumBytesExt;
    use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
    use crate::output_stream::event::StreamEvent;
    use crate::output_stream::event::tests::event_receiver;
    use assertr::prelude::*;
    use bytes::Bytes;
    use std::sync::{Arc, Mutex};

    /// Shared log of the lines a test visitor has observed.
    type SeenLines = Arc<Mutex<Vec<String>>>;

    /// Creates an empty shared line log.
    fn new_seen() -> SeenLines {
        Arc::new(Mutex::new(Vec::<String>::new()))
    }

    /// Copies the current contents of the shared line log.
    fn snapshot(seen: &SeenLines) -> Vec<String> {
        seen.lock().unwrap().clone()
    }

    /// Wraps a static byte string in a chunk event.
    fn chunk_event(bytes: &'static [u8]) -> StreamEvent {
        StreamEvent::Chunk(Chunk(Bytes::from_static(bytes)))
    }

    /// Options whose `max_line_length` is zero, used to trigger the constructor panic.
    fn zero_max_line_length_options() -> LineParsingOptions {
        LineParsingOptions {
            max_line_length: 0.bytes(),
            overflow_behavior: LineOverflowBehavior::default(),
            buffer_compaction_threshold: None,
        }
    }

    /// Sync visitor that records every line and never stops the stream.
    struct CollectingSink {
        seen: SeenLines,
    }

    impl LineVisitor for CollectingSink {
        type Output = ();

        fn on_line(&mut self, line: Cow<'_, str>) -> Next {
            let mut log = self.seen.lock().unwrap();
            log.push(line.into_owned());
            Next::Continue
        }

        fn into_output(self) -> Self::Output {}
    }

    /// Async visitor that records every line and never stops the stream.
    struct CollectingAsyncSink {
        seen: SeenLines,
    }

    impl AsyncLineVisitor for CollectingAsyncSink {
        type Output = ();

        async fn on_line(&mut self, line: Cow<'_, str>) -> Next {
            // The guard is a temporary dropped at the end of this statement.
            self.seen.lock().unwrap().push(line.into_owned());
            Next::Continue
        }

        fn into_output(self) -> Self::Output {}
    }

    mod sync {
        use super::*;

        #[test]
        #[should_panic(expected = "LineParsingOptions::max_line_length must be greater than zero")]
        fn new_panics_when_max_line_length_is_zero() {
            let sink = CollectingSink { seen: new_seen() };
            let _ = ParseLines::new(zero_max_line_length_options(), sink);
        }

        #[tokio::test]
        async fn flushes_trailing_unterminated_line_at_eof() {
            let seen = new_seen();
            let events = vec![
                chunk_event(b"first\nsec"),
                chunk_event(b"ond\nthird"),
                StreamEvent::Eof,
            ];
            let visitor = ParseLines::new(
                LineParsingOptions::default(),
                CollectingSink {
                    seen: Arc::clone(&seen),
                },
            );
            let consumer = spawn_consumer_sync("custom", event_receiver(events).await, visitor);

            consumer.wait().await.unwrap();
            assert_that!(snapshot(&seen)).contains_exactly(["first", "second", "third"]);
        }

        #[tokio::test]
        async fn gap_discards_partial_line() {
            let seen = new_seen();
            let events = vec![
                chunk_event(b"par"),
                StreamEvent::Gap,
                chunk_event(b"tial\nclean\n"),
                StreamEvent::Eof,
            ];
            let visitor = ParseLines::new(
                LineParsingOptions::default(),
                CollectingSink {
                    seen: Arc::clone(&seen),
                },
            );
            let consumer = spawn_consumer_sync("custom", event_receiver(events).await, visitor);

            consumer.wait().await.unwrap();
            assert_that!(snapshot(&seen)).contains_exactly(["clean"]);
        }

        #[tokio::test]
        async fn break_from_inner_stops_parsing_immediately() {
            /// Records lines and asks the adapter to stop once the second one arrives.
            struct StopAtSecondLine {
                seen: SeenLines,
                count: usize,
            }

            impl LineVisitor for StopAtSecondLine {
                type Output = ();

                fn on_line(&mut self, line: Cow<'_, str>) -> Next {
                    self.count += 1;
                    self.seen.lock().unwrap().push(line.into_owned());
                    match self.count {
                        2 => Next::Break,
                        _ => Next::Continue,
                    }
                }

                fn into_output(self) -> Self::Output {}
            }

            let seen = new_seen();
            let events = vec![chunk_event(b"a\nb\nc\nd\n"), StreamEvent::Eof];
            let visitor = ParseLines::new(
                LineParsingOptions::default(),
                StopAtSecondLine {
                    seen: Arc::clone(&seen),
                    count: 0,
                },
            );
            let consumer = spawn_consumer_sync("custom", event_receiver(events).await, visitor);

            consumer.wait().await.unwrap();
            assert_that!(snapshot(&seen)).contains_exactly(["a", "b"]);
        }
    }

    mod r#async {
        use super::*;

        #[test]
        #[should_panic(expected = "LineParsingOptions::max_line_length must be greater than zero")]
        fn new_panics_when_max_line_length_is_zero() {
            let sink = CollectingAsyncSink { seen: new_seen() };
            let _ = ParseLines::new(zero_max_line_length_options(), sink);
        }

        #[tokio::test]
        async fn flushes_trailing_unterminated_line_at_eof() {
            let seen = new_seen();
            let events = vec![chunk_event(b"first\ntail"), StreamEvent::Eof];
            let visitor = ParseLines::new(
                LineParsingOptions::default(),
                CollectingAsyncSink {
                    seen: Arc::clone(&seen),
                },
            );
            let consumer = spawn_consumer_async("custom", event_receiver(events).await, visitor);

            consumer.wait().await.unwrap();
            assert_that!(snapshot(&seen)).contains_exactly(["first", "tail"]);
        }
    }
}