tokio_process_tools/output_stream/line/
adapter.rs1use super::options::{LineParsingOptions, assert_max_line_length_non_zero};
27use super::parser::LineParser;
28use crate::output_stream::Next;
29use crate::output_stream::consumer::Sink;
30use crate::output_stream::event::Chunk;
31use crate::output_stream::visitor::{AsyncStreamVisitor, StreamVisitor};
32use crate::output_stream::visitors::collect::{CollectLines, CollectLinesAsync};
33use crate::output_stream::visitors::inspect::{InspectLines, InspectLinesAsync};
34use std::borrow::Cow;
35use std::future::Future;
36
37pub trait LineVisitor: Send + 'static {
42 type Output: Send + 'static;
45
46 fn on_line(&mut self, line: Cow<'_, str>) -> Next;
48
49 fn on_gap(&mut self) {}
53
54 fn on_eof(&mut self) {}
58
59 #[must_use]
61 fn into_output(self) -> Self::Output;
62}
63
64pub trait AsyncLineVisitor: Send + 'static {
81 type Output: Send + 'static;
83
84 fn on_line<'a>(&'a mut self, line: Cow<'a, str>) -> impl Future<Output = Next> + Send + 'a;
87
88 fn on_gap(&mut self) {}
90
91 fn on_eof(&mut self) -> impl Future<Output = ()> + Send + '_ {
94 async {}
95 }
96
97 #[must_use]
99 fn into_output(self) -> Self::Output;
100}
101
/// Adapter that turns chunk-level visitor callbacks into line-level callbacks.
///
/// It implements [`StreamVisitor`] when `S` is a [`LineVisitor`] and
/// [`AsyncStreamVisitor`] when `S` is an [`AsyncLineVisitor`], feeding every
/// chunk through a [`LineParser`] and forwarding the resulting lines to `inner`.
pub struct ParseLines<S> {
    /// Line-splitting state that persists between chunks.
    parser: LineParser,
    /// Parsing options handed to the parser on every `next_line` call.
    options: LineParsingOptions,
    /// The wrapped line visitor that receives parsed lines and notifications.
    inner: S,
}
112
113impl<S> ParseLines<S> {
114 pub fn new(options: LineParsingOptions, inner: S) -> Self {
122 assert_max_line_length_non_zero(&options);
123 Self {
124 parser: LineParser::new(),
125 options,
126 inner,
127 }
128 }
129}
130
131impl<F> ParseLines<InspectLines<F>>
132where
133 F: FnMut(Cow<'_, str>) -> Next + Send + 'static,
134{
135 pub fn inspect(options: LineParsingOptions, f: F) -> Self {
142 Self::new(options, InspectLines::new(f))
143 }
144}
145
146impl<F, Fut> ParseLines<InspectLinesAsync<F, Fut>>
147where
148 F: FnMut(Cow<'_, str>) -> Fut + Send + 'static,
149 Fut: Future<Output = Next> + Send + 'static,
150{
151 pub fn inspect_async(options: LineParsingOptions, f: F) -> Self {
158 Self::new(options, InspectLinesAsync::new(f))
159 }
160}
161
162impl<T, F> ParseLines<CollectLines<T, F>>
163where
164 T: Sink,
165 F: FnMut(Cow<'_, str>, &mut T) -> Next + Send + 'static,
166{
167 pub fn collect(options: LineParsingOptions, sink: T, f: F) -> Self {
174 Self::new(options, CollectLines::new(sink, f))
175 }
176}
177
178impl<T, F> ParseLines<CollectLinesAsync<T, F>>
179where
180 T: Sink,
181 F: for<'a> FnMut(
182 Cow<'a, str>,
183 &'a mut T,
184 ) -> std::pin::Pin<Box<dyn Future<Output = Next> + Send + 'a>>
185 + Send
186 + 'static,
187{
188 pub fn collect_async(options: LineParsingOptions, sink: T, f: F) -> Self {
195 Self::new(options, CollectLinesAsync::new(sink, f))
196 }
197}
198
199impl<S: LineVisitor> StreamVisitor for ParseLines<S> {
200 type Output = S::Output;
201
202 fn on_chunk(&mut self, chunk: Chunk) -> Next {
203 let Self {
204 parser,
205 options,
206 inner,
207 } = self;
208 let mut bytes: &[u8] = chunk.as_ref();
209 while let Some(line) = parser.next_line(&mut bytes, *options) {
210 if inner.on_line(line) == Next::Break {
211 return Next::Break;
212 }
213 }
214 Next::Continue
215 }
216
217 fn on_gap(&mut self) {
218 self.parser.on_gap();
219 self.inner.on_gap();
220 }
221
222 fn on_eof(&mut self) {
223 if let Some(line) = self.parser.finish() {
224 let _ = self.inner.on_line(line);
225 }
226 self.inner.on_eof();
227 }
228
229 fn into_output(self) -> Self::Output {
230 self.inner.into_output()
231 }
232}
233
234impl<S: AsyncLineVisitor> AsyncStreamVisitor for ParseLines<S> {
235 type Output = S::Output;
236
237 async fn on_chunk(&mut self, chunk: Chunk) -> Next {
238 let Self {
239 parser,
240 options,
241 inner,
242 } = self;
243 let mut bytes: &[u8] = chunk.as_ref();
244 loop {
245 let line = match parser.next_line(&mut bytes, *options) {
246 Some(line) => line.into_owned(),
247 None => return Next::Continue,
248 };
249 if inner.on_line(Cow::Owned(line)).await == Next::Break {
250 return Next::Break;
251 }
252 }
253 }
254
255 fn on_gap(&mut self) {
256 self.parser.on_gap();
257 self.inner.on_gap();
258 }
259
260 async fn on_eof(&mut self) {
261 let trailing = self.parser.finish().map(Cow::into_owned);
262 if let Some(line) = trailing {
263 let _ = self.inner.on_line(Cow::Owned(line)).await;
264 }
265 self.inner.on_eof().await;
266 }
267
268 fn into_output(self) -> Self::Output {
269 self.inner.into_output()
270 }
271}
272
#[cfg(test)]
mod tests {
    use super::super::options::LineOverflowBehavior;
    use super::*;
    use crate::NumBytesExt;
    use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
    use crate::output_stream::event::StreamEvent;
    use crate::output_stream::event::tests::event_receiver;
    use assertr::prelude::*;
    use bytes::Bytes;
    use std::sync::{Arc, Mutex};

    /// Shared record of the lines a test visitor has received.
    type LineLog = Arc<Mutex<Vec<String>>>;

    /// Creates an empty, shareable line log.
    fn new_log() -> LineLog {
        Arc::new(Mutex::new(Vec::new()))
    }

    /// Returns a snapshot of everything recorded in `log` so far.
    fn recorded(log: &LineLog) -> Vec<String> {
        log.lock().unwrap().clone()
    }

    /// Wraps static bytes in a chunk event.
    fn chunk(data: &'static [u8]) -> StreamEvent {
        StreamEvent::Chunk(Chunk(Bytes::from_static(data)))
    }

    /// Options whose `max_line_length` is zero, which `ParseLines::new` rejects.
    fn zero_length_options() -> LineParsingOptions {
        LineParsingOptions {
            max_line_length: 0.bytes(),
            overflow_behavior: LineOverflowBehavior::default(),
            buffer_compaction_threshold: None,
        }
    }

    /// Synchronous visitor that records every line and never stops early.
    struct CollectingSink {
        seen: LineLog,
    }

    impl LineVisitor for CollectingSink {
        type Output = ();

        fn on_line(&mut self, line: Cow<'_, str>) -> Next {
            let mut lines = self.seen.lock().unwrap();
            lines.push(line.into_owned());
            Next::Continue
        }

        fn into_output(self) -> Self::Output {}
    }

    /// Asynchronous visitor that records every line and never stops early.
    struct CollectingAsyncSink {
        seen: LineLog,
    }

    impl AsyncLineVisitor for CollectingAsyncSink {
        type Output = ();

        async fn on_line(&mut self, line: Cow<'_, str>) -> Next {
            let mut lines = self.seen.lock().unwrap();
            lines.push(line.into_owned());
            Next::Continue
        }

        fn into_output(self) -> Self::Output {}
    }

    mod sync {
        use super::*;

        #[test]
        #[should_panic(expected = "LineParsingOptions::max_line_length must be greater than zero")]
        fn new_panics_when_max_line_length_is_zero() {
            let sink = CollectingSink { seen: new_log() };
            let _ = ParseLines::new(zero_length_options(), sink);
        }

        #[tokio::test]
        async fn flushes_trailing_unterminated_line_at_eof() {
            let log = new_log();
            // "second" straddles two chunks and "third" has no terminator.
            let events = event_receiver(vec![
                chunk(b"first\nsec"),
                chunk(b"ond\nthird"),
                StreamEvent::Eof,
            ])
            .await;
            let visitor = ParseLines::new(
                LineParsingOptions::default(),
                CollectingSink {
                    seen: Arc::clone(&log),
                },
            );
            let consumer = spawn_consumer_sync("custom", events, visitor);

            consumer.wait().await.unwrap();
            assert_that!(recorded(&log)).contains_exactly(["first", "second", "third"]);
        }

        #[tokio::test]
        async fn gap_discards_partial_line() {
            let log = new_log();
            // The gap arrives while "par" is still unterminated.
            let events = event_receiver(vec![
                chunk(b"par"),
                StreamEvent::Gap,
                chunk(b"tial\nclean\n"),
                StreamEvent::Eof,
            ])
            .await;
            let visitor = ParseLines::new(
                LineParsingOptions::default(),
                CollectingSink {
                    seen: Arc::clone(&log),
                },
            );
            let consumer = spawn_consumer_sync("custom", events, visitor);

            consumer.wait().await.unwrap();
            assert_that!(recorded(&log)).contains_exactly(["clean"]);
        }

        #[tokio::test]
        async fn break_from_inner_stops_parsing_immediately() {
            /// Records lines and requests a stop on the second one.
            struct StopAtSecondLine {
                seen: LineLog,
                count: usize,
            }

            impl LineVisitor for StopAtSecondLine {
                type Output = ();

                fn on_line(&mut self, line: Cow<'_, str>) -> Next {
                    self.count += 1;
                    self.seen.lock().unwrap().push(line.into_owned());
                    match self.count {
                        2 => Next::Break,
                        _ => Next::Continue,
                    }
                }

                fn into_output(self) -> Self::Output {}
            }

            let log = new_log();
            let events = event_receiver(vec![chunk(b"a\nb\nc\nd\n"), StreamEvent::Eof]).await;
            let visitor = ParseLines::new(
                LineParsingOptions::default(),
                StopAtSecondLine {
                    seen: Arc::clone(&log),
                    count: 0,
                },
            );
            let consumer = spawn_consumer_sync("custom", events, visitor);

            consumer.wait().await.unwrap();
            assert_that!(recorded(&log)).contains_exactly(["a", "b"]);
        }
    }

    mod r#async {
        use super::*;

        #[test]
        #[should_panic(expected = "LineParsingOptions::max_line_length must be greater than zero")]
        fn new_panics_when_max_line_length_is_zero() {
            let sink = CollectingAsyncSink { seen: new_log() };
            let _ = ParseLines::new(zero_length_options(), sink);
        }

        #[tokio::test]
        async fn flushes_trailing_unterminated_line_at_eof() {
            let log = new_log();
            // "tail" has no terminator and must still be delivered at EOF.
            let events = event_receiver(vec![chunk(b"first\ntail"), StreamEvent::Eof]).await;
            let visitor = ParseLines::new(
                LineParsingOptions::default(),
                CollectingAsyncSink {
                    seen: Arc::clone(&log),
                },
            );
            let consumer = spawn_consumer_async("custom", events, visitor);

            consumer.wait().await.unwrap();
            assert_that!(recorded(&log)).contains_exactly(["first", "tail"]);
        }
    }
}