tokio_process_tools/output_stream/line/adapter.rs
1//! Adapter that turns a chunk-level [`StreamVisitor`] / [`AsyncStreamVisitor`] into a
2//! line-level sink.
3//!
4//! Six visitors used to drive the same five-step dance independently:
5//!
6//! 1. Hold a [`LineParser`] + [`LineParsingOptions`].
7//! 2. On every chunk, feed bytes into the parser and dispatch each emitted line to the
8//! visitor-specific per-line callback.
9//! 3. On a gap, reset the parser's partial-line buffer.
10//! 4. On EOF, flush any unterminated trailing line through the same callback.
11//! 5. Assert at construction time that `options.max_line_length` is non-zero.
12//!
13//! [`LineAdapter`] owns steps 1–5 in one place; each line-aware visitor (`InspectLines`,
14//! `CollectLines`, `WaitForLine`, `WriteLines`, plus their async twins) collapses to an inner
15//! [`LineSink`] / [`AsyncLineSink`] that only describes the per-line action. Adding a new
16//! line-aware visitor is now "implement [`LineSink`] (or [`AsyncLineSink`]) and compose with
17//! [`LineAdapter`]," not "re-derive the parser plumbing for the seventh time."
18//!
19//! A single [`LineAdapter<S>`] struct serves both the sync and async paths: it carries two
20//! trait impls — [`StreamVisitor`] when `S: LineSink`, [`AsyncStreamVisitor`] when
21//! `S: AsyncLineSink` — and Rust selects the right one based on which trait the inner sink
22//! implements. Implementing both [`LineSink`] and [`AsyncLineSink`] for the same sink is
23//! supported but creates ambiguity at the [`LineAdapter`] call site: the caller has to make
24//! the trait selection explicit (e.g., by which consumer driver they hand the adapter to).
25//! In practice you implement whichever trait matches the work the sink does — sync if
26//! `on_line` is non-blocking, async if it `.await`s.
27//!
28//! Encoding the `max_line_length > 0` invariant as a `NonZero*` newtype was rejected on
29//! ergonomic grounds: every caller of `LineParsingOptions` would inherit the wrapper, even
30//! when they construct it from a known-non-zero literal. The runtime assert lives in
31//! [`super::options`] as the single source of that check; the constraint is documented on
32//! [`LineParsingOptions::max_line_length`].
33
34use super::options::{LineParsingOptions, assert_max_line_length_non_zero};
35use super::parser::LineParser;
36use crate::output_stream::Next;
37use crate::output_stream::event::Chunk;
38use crate::output_stream::visitor::{AsyncStreamVisitor, StreamVisitor};
39use std::borrow::Cow;
40use std::future::Future;
41
42/// Per-line action selected by the [`StreamVisitor`] impl of [`LineAdapter`].
43///
44/// Implementors only describe what should happen for each parsed line; chunk parsing, gap
45/// handling, and EOF flushing are carried by the adapter.
46pub trait LineSink: Send + 'static {
47 /// Final value produced once the adapter is finished. Returned via
48 /// [`Consumer::wait`](crate::Consumer::wait).
49 type Output: Send + 'static;
50
51 /// Invoked for every parsed line. Return [`Next::Break`] to stop further parsing.
52 fn on_line(&mut self, line: Cow<'_, str>) -> Next;
53
54 /// Invoked after the adapter resets the parser following a stream gap. The default does
55 /// nothing; override when the inner sink keeps line-spanning state that the gap
56 /// invalidates.
57 fn on_gap(&mut self) {}
58
59 /// Invoked once the adapter finishes flushing any trailing line at EOF. The default does
60 /// nothing; override for sinks that want a finalization hook beyond the last `on_line`
61 /// call.
62 fn on_eof(&mut self) {}
63
64 /// Consumes the sink and returns its final output.
65 fn into_output(self) -> Self::Output;
66}
67
68/// Async per-line action selected by the [`AsyncStreamVisitor`] impl of [`LineAdapter`].
69///
70/// `on_line` is async because the line-aware async visitors (`inspect_lines_async`,
71/// `collect_lines_async`, `collect_lines_into_write*`) all need to `.await` per-line work.
72/// `on_gap` stays synchronous because gap notification carries no payload to await on,
73/// mirroring [`AsyncStreamVisitor::on_gap`].
74///
75/// # Allocation note
76///
77/// The async path materializes every parsed line as an owned `String` before handing it to
78/// `on_line`. The synchronous [`LineSink`] path can instead pass a `Cow::Borrowed` straight out
79/// of the chunk on the fast path, so it avoids a per-line allocation when the line fits in the
80/// current chunk. The allocation is the cost of holding the parser's borrow across an `.await`,
81/// which Rust does not allow because the next iteration re-borrows the parser. Use [`LineSink`]
82/// (and the corresponding `inspect_lines` / `collect_lines` factories) when per-line work is
83/// non-blocking and you want the zero-copy fast path; reach for [`AsyncLineSink`] only when the
84/// per-line work genuinely needs to await.
85pub trait AsyncLineSink: Send + 'static {
86 /// Final value produced once the adapter is finished.
87 type Output: Send + 'static;
88
89 /// Asynchronously observes a single parsed line. Return [`Next::Break`] to stop further
90 /// parsing.
91 fn on_line<'a>(&'a mut self, line: Cow<'a, str>) -> impl Future<Output = Next> + Send + 'a;
92
93 /// Synchronous gap hook; default no-op. See [`LineSink::on_gap`].
94 fn on_gap(&mut self) {}
95
96 /// Asynchronous EOF hook; default no-op. Invoked after the adapter has flushed any
97 /// trailing line through `on_line`.
98 fn on_eof(&mut self) -> impl Future<Output = ()> + Send + '_ {
99 async {}
100 }
101
102 /// Consumes the sink and returns its final output.
103 fn into_output(self) -> Self::Output;
104}
105
106/// Adapter that drives [`LineParser`] over chunk events and dispatches every emitted line to
107/// the inner [`LineSink`] (sync) or [`AsyncLineSink`] (async).
108///
109/// One struct, two trait impls: [`StreamVisitor`] when `S: LineSink`, [`AsyncStreamVisitor`]
110/// when `S: AsyncLineSink`. Rust selects the right impl from the inner sink's type at the
111/// call site.
112pub struct LineAdapter<S> {
113 parser: LineParser,
114 options: LineParsingOptions,
115 inner: S,
116}
117
118impl<S> LineAdapter<S> {
119 /// Creates a new line adapter.
120 ///
121 /// # Panics
122 ///
123 /// Panics if `options.max_line_length` is zero. See
124 /// [`LineParsingOptions::max_line_length`] for the rationale; pass
125 /// [`crate::NumBytes::MAX`] for effectively-unbounded line parsing.
126 pub fn new(options: LineParsingOptions, inner: S) -> Self {
127 assert_max_line_length_non_zero(&options);
128 Self {
129 parser: LineParser::new(),
130 options,
131 inner,
132 }
133 }
134}
135
136impl<S: LineSink> StreamVisitor for LineAdapter<S> {
137 type Output = S::Output;
138
139 fn on_chunk(&mut self, chunk: Chunk) -> Next {
140 let Self {
141 parser,
142 options,
143 inner,
144 } = self;
145 let mut bytes: &[u8] = chunk.as_ref();
146 while let Some(line) = parser.next_line(&mut bytes, *options) {
147 if inner.on_line(line) == Next::Break {
148 return Next::Break;
149 }
150 }
151 Next::Continue
152 }
153
154 fn on_gap(&mut self) {
155 self.parser.on_gap();
156 self.inner.on_gap();
157 }
158
159 fn on_eof(&mut self) {
160 if let Some(line) = self.parser.finish() {
161 let _ = self.inner.on_line(line);
162 }
163 self.inner.on_eof();
164 }
165
166 fn into_output(self) -> Self::Output {
167 self.inner.into_output()
168 }
169}
170
171/// Async impl of the same [`LineAdapter`] struct.
172///
173/// Each per-line iteration calls `next_line` synchronously, materializes the line as a fresh
174/// `String` via `Cow::into_owned`, drops the parser borrow, then awaits the inner sink. The
175/// allocation per line is the price of supporting async per-line callbacks on stable Rust —
176/// holding a parser borrow across an `.await` is forbidden because the next iteration
177/// re-borrows the parser.
178impl<S: AsyncLineSink> AsyncStreamVisitor for LineAdapter<S> {
179 type Output = S::Output;
180
181 async fn on_chunk(&mut self, chunk: Chunk) -> Next {
182 let Self {
183 parser,
184 options,
185 inner,
186 } = self;
187 let mut bytes: &[u8] = chunk.as_ref();
188 loop {
189 let line = match parser.next_line(&mut bytes, *options) {
190 Some(line) => line.into_owned(),
191 None => return Next::Continue,
192 };
193 if inner.on_line(Cow::Owned(line)).await == Next::Break {
194 return Next::Break;
195 }
196 }
197 }
198
199 fn on_gap(&mut self) {
200 self.parser.on_gap();
201 self.inner.on_gap();
202 }
203
204 async fn on_eof(&mut self) {
205 let trailing = self.parser.finish().map(Cow::into_owned);
206 if let Some(line) = trailing {
207 let _ = self.inner.on_line(Cow::Owned(line)).await;
208 }
209 self.inner.on_eof().await;
210 }
211
212 fn into_output(self) -> Self::Output {
213 self.inner.into_output()
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::super::options::LineOverflowBehavior;
    use super::*;
    use crate::NumBytesExt;
    use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
    use crate::output_stream::event::StreamEvent;
    use crate::output_stream::event::tests::event_receiver;
    use assertr::prelude::*;
    use bytes::Bytes;
    use std::sync::{Arc, Mutex};

    /// Sync sink that records every line it receives and never asks to stop.
    struct CollectingSink {
        seen: Arc<Mutex<Vec<String>>>,
    }

    impl LineSink for CollectingSink {
        type Output = ();

        fn on_line(&mut self, line: Cow<'_, str>) -> Next {
            self.seen.lock().unwrap().push(line.into_owned());
            Next::Continue
        }

        fn into_output(self) -> Self::Output {}
    }

    /// Async twin of [`CollectingSink`].
    struct CollectingAsyncSink {
        seen: Arc<Mutex<Vec<String>>>,
    }

    impl AsyncLineSink for CollectingAsyncSink {
        type Output = ();

        async fn on_line(&mut self, line: Cow<'_, str>) -> Next {
            self.seen.lock().unwrap().push(line.into_owned());
            Next::Continue
        }

        fn into_output(self) -> Self::Output {}
    }

    mod sync {
        use super::*;

        #[test]
        #[should_panic(expected = "LineParsingOptions::max_line_length must be greater than zero")]
        fn new_panics_when_max_line_length_is_zero() {
            let _ = LineAdapter::new(
                LineParsingOptions {
                    max_line_length: 0.bytes(),
                    overflow_behavior: LineOverflowBehavior::default(),
                    buffer_compaction_threshold: None,
                },
                CollectingSink {
                    seen: Arc::new(Mutex::new(Vec::new())),
                },
            );
        }

        #[tokio::test]
        async fn flushes_trailing_unterminated_line_at_eof() {
            let seen = Arc::new(Mutex::new(Vec::<String>::new()));
            let consumer = spawn_consumer_sync(
                "custom",
                event_receiver(vec![
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"first\nsec"))),
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"ond\nthird"))),
                    StreamEvent::Eof,
                ])
                .await,
                LineAdapter::new(
                    LineParsingOptions::default(),
                    CollectingSink {
                        seen: Arc::clone(&seen),
                    },
                ),
            );

            consumer.wait().await.unwrap();
            assert_that!(seen.lock().unwrap().clone())
                .contains_exactly(["first", "second", "third"]);
        }

        #[tokio::test]
        async fn gap_discards_partial_line() {
            let seen = Arc::new(Mutex::new(Vec::<String>::new()));
            let consumer = spawn_consumer_sync(
                "custom",
                event_receiver(vec![
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"par"))),
                    StreamEvent::Gap,
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"tial\nclean\n"))),
                    StreamEvent::Eof,
                ])
                .await,
                LineAdapter::new(
                    LineParsingOptions::default(),
                    CollectingSink {
                        seen: Arc::clone(&seen),
                    },
                ),
            );

            consumer.wait().await.unwrap();
            assert_that!(seen.lock().unwrap().clone()).contains_exactly(["clean"]);
        }

        #[tokio::test]
        async fn break_from_inner_stops_parsing_immediately() {
            /// Records lines and requests a stop right after the second one.
            struct StopAtSecondLine {
                seen: Arc<Mutex<Vec<String>>>,
                count: usize,
            }

            impl LineSink for StopAtSecondLine {
                type Output = ();
                fn on_line(&mut self, line: Cow<'_, str>) -> Next {
                    self.count += 1;
                    self.seen.lock().unwrap().push(line.into_owned());
                    if self.count == 2 {
                        Next::Break
                    } else {
                        Next::Continue
                    }
                }
                fn into_output(self) -> Self::Output {}
            }

            let seen = Arc::new(Mutex::new(Vec::<String>::new()));
            let consumer = spawn_consumer_sync(
                "custom",
                event_receiver(vec![
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"a\nb\nc\nd\n"))),
                    StreamEvent::Eof,
                ])
                .await,
                LineAdapter::new(
                    LineParsingOptions::default(),
                    StopAtSecondLine {
                        seen: Arc::clone(&seen),
                        count: 0,
                    },
                ),
            );

            consumer.wait().await.unwrap();
            assert_that!(seen.lock().unwrap().clone()).contains_exactly(["a", "b"]);
        }
    }

    // The async impl is a separate copy of the chunk/gap/EOF logic, so it gets the same
    // coverage as the sync impl rather than relying on the sync tests alone.
    mod r#async {
        use super::*;

        #[test]
        #[should_panic(expected = "LineParsingOptions::max_line_length must be greater than zero")]
        fn new_panics_when_max_line_length_is_zero() {
            let _ = LineAdapter::new(
                LineParsingOptions {
                    max_line_length: 0.bytes(),
                    overflow_behavior: LineOverflowBehavior::default(),
                    buffer_compaction_threshold: None,
                },
                CollectingAsyncSink {
                    seen: Arc::new(Mutex::new(Vec::new())),
                },
            );
        }

        #[tokio::test]
        async fn flushes_trailing_unterminated_line_at_eof() {
            let seen = Arc::new(Mutex::new(Vec::<String>::new()));
            let consumer = spawn_consumer_async(
                "custom",
                event_receiver(vec![
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"first\ntail"))),
                    StreamEvent::Eof,
                ])
                .await,
                LineAdapter::new(
                    LineParsingOptions::default(),
                    CollectingAsyncSink {
                        seen: Arc::clone(&seen),
                    },
                ),
            );

            consumer.wait().await.unwrap();
            assert_that!(seen.lock().unwrap().clone()).contains_exactly(["first", "tail"]);
        }

        #[tokio::test]
        async fn gap_discards_partial_line() {
            let seen = Arc::new(Mutex::new(Vec::<String>::new()));
            let consumer = spawn_consumer_async(
                "custom",
                event_receiver(vec![
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"par"))),
                    StreamEvent::Gap,
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"tial\nclean\n"))),
                    StreamEvent::Eof,
                ])
                .await,
                LineAdapter::new(
                    LineParsingOptions::default(),
                    CollectingAsyncSink {
                        seen: Arc::clone(&seen),
                    },
                ),
            );

            consumer.wait().await.unwrap();
            assert_that!(seen.lock().unwrap().clone()).contains_exactly(["clean"]);
        }

        #[tokio::test]
        async fn break_from_inner_stops_parsing_immediately() {
            /// Records lines and requests a stop right after the second one.
            struct StopAtSecondLine {
                seen: Arc<Mutex<Vec<String>>>,
                count: usize,
            }

            impl AsyncLineSink for StopAtSecondLine {
                type Output = ();
                async fn on_line(&mut self, line: Cow<'_, str>) -> Next {
                    self.count += 1;
                    self.seen.lock().unwrap().push(line.into_owned());
                    if self.count == 2 {
                        Next::Break
                    } else {
                        Next::Continue
                    }
                }
                fn into_output(self) -> Self::Output {}
            }

            let seen = Arc::new(Mutex::new(Vec::<String>::new()));
            let consumer = spawn_consumer_async(
                "custom",
                event_receiver(vec![
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"a\nb\nc\nd\n"))),
                    StreamEvent::Eof,
                ])
                .await,
                LineAdapter::new(
                    LineParsingOptions::default(),
                    StopAtSecondLine {
                        seen: Arc::clone(&seen),
                        count: 0,
                    },
                ),
            );

            consumer.wait().await.unwrap();
            assert_that!(seen.lock().unwrap().clone()).contains_exactly(["a", "b"]);
        }
    }
}