Skip to main content

tokio_process_tools/output_stream/line/
adapter.rs

1//! Adapter that turns a chunk-level [`StreamVisitor`] / [`AsyncStreamVisitor`] into a
2//! line-level sink.
3//!
4//! Six visitors used to drive the same five-step dance independently:
5//!
6//! 1. Hold a [`LineParser`] + [`LineParsingOptions`].
7//! 2. On every chunk, feed bytes into the parser and dispatch each emitted line to the
8//!    visitor-specific per-line callback.
9//! 3. On a gap, reset the parser's partial-line buffer.
10//! 4. On EOF, flush any unterminated trailing line through the same callback.
11//! 5. Assert at construction time that `options.max_line_length` is non-zero.
12//!
13//! [`LineAdapter`] owns steps 1–5 in one place; each line-aware visitor (`InspectLines`,
14//! `CollectLines`, `WaitForLine`, `WriteLines`, plus their async twins) collapses to an inner
15//! [`LineSink`] / [`AsyncLineSink`] that only describes the per-line action. Adding a new
16//! line-aware visitor is now "implement [`LineSink`] (or [`AsyncLineSink`]) and compose with
17//! [`LineAdapter`]," not "re-derive the parser plumbing for the seventh time."
18//!
19//! A single [`LineAdapter<S>`] struct serves both the sync and async paths: it carries two
20//! trait impls — [`StreamVisitor`] when `S: LineSink`, [`AsyncStreamVisitor`] when
21//! `S: AsyncLineSink` — and Rust selects the right one based on which trait the inner sink
22//! implements. Implementing both [`LineSink`] and [`AsyncLineSink`] for the same sink is
23//! supported but creates ambiguity at the [`LineAdapter`] call site: the caller has to make
24//! the trait selection explicit (e.g., by which consumer driver they hand the adapter to).
25//! In practice you implement whichever trait matches the work the sink does — sync if
26//! `on_line` is non-blocking, async if it `.await`s.
27//!
28//! Encoding the `max_line_length > 0` invariant as a `NonZero*` newtype was rejected on
29//! ergonomic grounds: every caller of `LineParsingOptions` would inherit the wrapper, even
30//! when they construct it from a known-non-zero literal. The runtime assert lives in
31//! [`super::options`] as the single source of that check; the constraint is documented on
32//! [`LineParsingOptions::max_line_length`].
33
34use super::options::{LineParsingOptions, assert_max_line_length_non_zero};
35use super::parser::LineParser;
36use crate::output_stream::Next;
37use crate::output_stream::event::Chunk;
38use crate::output_stream::visitor::{AsyncStreamVisitor, StreamVisitor};
39use std::borrow::Cow;
40use std::future::Future;
41
42/// Per-line action selected by the [`StreamVisitor`] impl of [`LineAdapter`].
43///
44/// Implementors only describe what should happen for each parsed line; chunk parsing, gap
45/// handling, and EOF flushing are carried by the adapter.
46pub trait LineSink: Send + 'static {
47    /// Final value produced once the adapter is finished. Returned via
48    /// [`Consumer::wait`](crate::Consumer::wait).
49    type Output: Send + 'static;
50
51    /// Invoked for every parsed line. Return [`Next::Break`] to stop further parsing.
52    fn on_line(&mut self, line: Cow<'_, str>) -> Next;
53
54    /// Invoked after the adapter resets the parser following a stream gap. The default does
55    /// nothing; override when the inner sink keeps line-spanning state that the gap
56    /// invalidates.
57    fn on_gap(&mut self) {}
58
59    /// Invoked once the adapter finishes flushing any trailing line at EOF. The default does
60    /// nothing; override for sinks that want a finalization hook beyond the last `on_line`
61    /// call.
62    fn on_eof(&mut self) {}
63
64    /// Consumes the sink and returns its final output.
65    fn into_output(self) -> Self::Output;
66}
67
68/// Async per-line action selected by the [`AsyncStreamVisitor`] impl of [`LineAdapter`].
69///
70/// `on_line` is async because the line-aware async visitors (`inspect_lines_async`,
71/// `collect_lines_async`, `collect_lines_into_write*`) all need to `.await` per-line work.
72/// `on_gap` stays synchronous because gap notification carries no payload to await on,
73/// mirroring [`AsyncStreamVisitor::on_gap`].
74///
75/// # Allocation note
76///
77/// The async path materializes every parsed line as an owned `String` before handing it to
78/// `on_line`. The synchronous [`LineSink`] path can instead pass a `Cow::Borrowed` straight out
79/// of the chunk on the fast path, so it avoids a per-line allocation when the line fits in the
80/// current chunk. The allocation is the cost of holding the parser's borrow across an `.await`,
81/// which Rust does not allow because the next iteration re-borrows the parser. Use [`LineSink`]
82/// (and the corresponding `inspect_lines` / `collect_lines` factories) when per-line work is
83/// non-blocking and you want the zero-copy fast path; reach for [`AsyncLineSink`] only when the
84/// per-line work genuinely needs to await.
85pub trait AsyncLineSink: Send + 'static {
86    /// Final value produced once the adapter is finished.
87    type Output: Send + 'static;
88
89    /// Asynchronously observes a single parsed line. Return [`Next::Break`] to stop further
90    /// parsing.
91    fn on_line<'a>(&'a mut self, line: Cow<'a, str>) -> impl Future<Output = Next> + Send + 'a;
92
93    /// Synchronous gap hook; default no-op. See [`LineSink::on_gap`].
94    fn on_gap(&mut self) {}
95
96    /// Asynchronous EOF hook; default no-op. Invoked after the adapter has flushed any
97    /// trailing line through `on_line`.
98    fn on_eof(&mut self) -> impl Future<Output = ()> + Send + '_ {
99        async {}
100    }
101
102    /// Consumes the sink and returns its final output.
103    fn into_output(self) -> Self::Output;
104}
105
106/// Adapter that drives [`LineParser`] over chunk events and dispatches every emitted line to
107/// the inner [`LineSink`] (sync) or [`AsyncLineSink`] (async).
108///
109/// One struct, two trait impls: [`StreamVisitor`] when `S: LineSink`, [`AsyncStreamVisitor`]
110/// when `S: AsyncLineSink`. Rust selects the right impl from the inner sink's type at the
111/// call site.
112pub struct LineAdapter<S> {
113    parser: LineParser,
114    options: LineParsingOptions,
115    inner: S,
116}
117
118impl<S> LineAdapter<S> {
119    /// Creates a new line adapter.
120    ///
121    /// # Panics
122    ///
123    /// Panics if `options.max_line_length` is zero. See
124    /// [`LineParsingOptions::max_line_length`] for the rationale; pass
125    /// [`crate::NumBytes::MAX`] for effectively-unbounded line parsing.
126    pub fn new(options: LineParsingOptions, inner: S) -> Self {
127        assert_max_line_length_non_zero(&options);
128        Self {
129            parser: LineParser::new(),
130            options,
131            inner,
132        }
133    }
134}
135
136impl<S: LineSink> StreamVisitor for LineAdapter<S> {
137    type Output = S::Output;
138
139    fn on_chunk(&mut self, chunk: Chunk) -> Next {
140        let Self {
141            parser,
142            options,
143            inner,
144        } = self;
145        let mut bytes: &[u8] = chunk.as_ref();
146        while let Some(line) = parser.next_line(&mut bytes, *options) {
147            if inner.on_line(line) == Next::Break {
148                return Next::Break;
149            }
150        }
151        Next::Continue
152    }
153
154    fn on_gap(&mut self) {
155        self.parser.on_gap();
156        self.inner.on_gap();
157    }
158
159    fn on_eof(&mut self) {
160        if let Some(line) = self.parser.finish() {
161            let _ = self.inner.on_line(line);
162        }
163        self.inner.on_eof();
164    }
165
166    fn into_output(self) -> Self::Output {
167        self.inner.into_output()
168    }
169}
170
171/// Async impl of the same [`LineAdapter`] struct.
172///
173/// Each per-line iteration calls `next_line` synchronously, materializes the line as a fresh
174/// `String` via `Cow::into_owned`, drops the parser borrow, then awaits the inner sink. The
175/// allocation per line is the price of supporting async per-line callbacks on stable Rust —
176/// holding a parser borrow across an `.await` is forbidden because the next iteration
177/// re-borrows the parser.
178impl<S: AsyncLineSink> AsyncStreamVisitor for LineAdapter<S> {
179    type Output = S::Output;
180
181    async fn on_chunk(&mut self, chunk: Chunk) -> Next {
182        let Self {
183            parser,
184            options,
185            inner,
186        } = self;
187        let mut bytes: &[u8] = chunk.as_ref();
188        loop {
189            let line = match parser.next_line(&mut bytes, *options) {
190                Some(line) => line.into_owned(),
191                None => return Next::Continue,
192            };
193            if inner.on_line(Cow::Owned(line)).await == Next::Break {
194                return Next::Break;
195            }
196        }
197    }
198
199    fn on_gap(&mut self) {
200        self.parser.on_gap();
201        self.inner.on_gap();
202    }
203
204    async fn on_eof(&mut self) {
205        let trailing = self.parser.finish().map(Cow::into_owned);
206        if let Some(line) = trailing {
207            let _ = self.inner.on_line(Cow::Owned(line)).await;
208        }
209        self.inner.on_eof().await;
210    }
211
212    fn into_output(self) -> Self::Output {
213        self.inner.into_output()
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::super::options::LineOverflowBehavior;
    use super::*;
    use crate::NumBytesExt;
    use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
    use crate::output_stream::event::StreamEvent;
    use crate::output_stream::event::tests::event_receiver;
    use assertr::prelude::*;
    use bytes::Bytes;
    use std::sync::{Arc, Mutex};

    /// Shared record of the lines a sink has observed, readable after the consumer is done.
    type LineLog = Arc<Mutex<Vec<String>>>;

    /// Creates an empty line log.
    fn empty_log() -> LineLog {
        Arc::new(Mutex::new(Vec::new()))
    }

    /// Copies the lines recorded so far out of the log.
    fn snapshot(log: &LineLog) -> Vec<String> {
        log.lock().unwrap().clone()
    }

    /// Builds a chunk from a static byte string.
    fn chunk(bytes: &'static [u8]) -> Chunk {
        Chunk(Bytes::from_static(bytes))
    }

    /// Options with a zero `max_line_length`, which `LineAdapter::new` must reject.
    fn zero_length_options() -> LineParsingOptions {
        LineParsingOptions {
            max_line_length: 0.bytes(),
            overflow_behavior: LineOverflowBehavior::default(),
            buffer_compaction_threshold: None,
        }
    }

    /// Sync sink that records every line and never asks to stop.
    struct CollectingSink {
        seen: LineLog,
    }

    impl LineSink for CollectingSink {
        type Output = ();

        fn on_line(&mut self, line: Cow<'_, str>) -> Next {
            let mut recorded = self.seen.lock().unwrap();
            recorded.push(line.into_owned());
            Next::Continue
        }

        fn into_output(self) -> Self::Output {}
    }

    /// Async sink that records every line and never asks to stop.
    struct CollectingAsyncSink {
        seen: LineLog,
    }

    impl AsyncLineSink for CollectingAsyncSink {
        type Output = ();

        async fn on_line(&mut self, line: Cow<'_, str>) -> Next {
            // The guard is taken and released without any await in between.
            self.seen.lock().unwrap().push(line.into_owned());
            Next::Continue
        }

        fn into_output(self) -> Self::Output {}
    }

    mod sync {
        use super::*;

        #[test]
        #[should_panic(expected = "LineParsingOptions::max_line_length must be greater than zero")]
        fn new_panics_when_max_line_length_is_zero() {
            let sink = CollectingSink { seen: empty_log() };
            let _ = LineAdapter::new(zero_length_options(), sink);
        }

        #[tokio::test]
        async fn flushes_trailing_unterminated_line_at_eof() {
            let seen = empty_log();
            let events = event_receiver(vec![
                StreamEvent::Chunk(chunk(b"first\nsec")),
                StreamEvent::Chunk(chunk(b"ond\nthird")),
                StreamEvent::Eof,
            ])
            .await;
            let adapter = LineAdapter::new(
                LineParsingOptions::default(),
                CollectingSink {
                    seen: Arc::clone(&seen),
                },
            );

            let consumer = spawn_consumer_sync("custom", events, adapter);

            consumer.wait().await.unwrap();
            assert_that!(snapshot(&seen)).contains_exactly(["first", "second", "third"]);
        }

        #[tokio::test]
        async fn gap_discards_partial_line() {
            let seen = empty_log();
            let events = event_receiver(vec![
                StreamEvent::Chunk(chunk(b"par")),
                StreamEvent::Gap,
                StreamEvent::Chunk(chunk(b"tial\nclean\n")),
                StreamEvent::Eof,
            ])
            .await;
            let adapter = LineAdapter::new(
                LineParsingOptions::default(),
                CollectingSink {
                    seen: Arc::clone(&seen),
                },
            );

            let consumer = spawn_consumer_sync("custom", events, adapter);

            consumer.wait().await.unwrap();
            assert_that!(snapshot(&seen)).contains_exactly(["clean"]);
        }

        #[tokio::test]
        async fn break_from_inner_stops_parsing_immediately() {
            /// Records every line and asks to stop right after the second one.
            struct StopAtSecondLine {
                seen: LineLog,
                count: usize,
            }

            impl LineSink for StopAtSecondLine {
                type Output = ();

                fn on_line(&mut self, line: Cow<'_, str>) -> Next {
                    self.count += 1;
                    self.seen.lock().unwrap().push(line.into_owned());
                    match self.count {
                        2 => Next::Break,
                        _ => Next::Continue,
                    }
                }

                fn into_output(self) -> Self::Output {}
            }

            let seen = empty_log();
            let events = event_receiver(vec![
                StreamEvent::Chunk(chunk(b"a\nb\nc\nd\n")),
                StreamEvent::Eof,
            ])
            .await;
            let adapter = LineAdapter::new(
                LineParsingOptions::default(),
                StopAtSecondLine {
                    seen: Arc::clone(&seen),
                    count: 0,
                },
            );

            let consumer = spawn_consumer_sync("custom", events, adapter);

            consumer.wait().await.unwrap();
            assert_that!(snapshot(&seen)).contains_exactly(["a", "b"]);
        }
    }

    mod r#async {
        use super::*;

        #[test]
        #[should_panic(expected = "LineParsingOptions::max_line_length must be greater than zero")]
        fn new_panics_when_max_line_length_is_zero() {
            let sink = CollectingAsyncSink { seen: empty_log() };
            let _ = LineAdapter::new(zero_length_options(), sink);
        }

        #[tokio::test]
        async fn flushes_trailing_unterminated_line_at_eof() {
            let seen = empty_log();
            let events = event_receiver(vec![
                StreamEvent::Chunk(chunk(b"first\ntail")),
                StreamEvent::Eof,
            ])
            .await;
            let adapter = LineAdapter::new(
                LineParsingOptions::default(),
                CollectingAsyncSink {
                    seen: Arc::clone(&seen),
                },
            );

            let consumer = spawn_consumer_async("custom", events, adapter);

            consumer.wait().await.unwrap();
            assert_that!(snapshot(&seen)).contains_exactly(["first", "tail"]);
        }
    }
}