tokio_process_tools/output_stream/
broadcast.rs

1use crate::collector::{AsyncCollectFn, Collector, Sink};
2use crate::error::OutputError;
3use crate::inspector::Inspector;
4use crate::output_stream::impls::{
5    impl_collect_chunks, impl_collect_chunks_async, impl_collect_lines, impl_collect_lines_async,
6    impl_inspect_chunks, impl_inspect_lines, impl_inspect_lines_async,
7};
8use crate::output_stream::{Chunk, FromStreamOptions, LineReader, Next};
9use crate::output_stream::{LineParsingOptions, OutputStream};
10use crate::{InspectorError, NumBytes};
11use std::borrow::Cow;
12use std::fmt::{Debug, Formatter};
13use std::future::Future;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
17use tokio::sync::broadcast::error::RecvError;
18use tokio::sync::{RwLock, broadcast};
19use tokio::task::JoinHandle;
20
/// The output stream from a process. Either representing stdout or stderr.
///
/// This is the broadcast variant, allowing for multiple simultaneous consumers with the downside
/// of inducing memory allocations not required when only one consumer is listening.
/// For that case, prefer using the
/// [crate::output_stream::single_subscriber::SingleSubscriberOutputStream].
///
/// Every consumer takes its own subscription on an internal broadcast channel and therefore only
/// sees output sent after it subscribed; chunks sent while nobody is subscribed are dropped, not
/// replayed later. Dropping this value aborts the background reader task.
pub struct BroadcastOutputStream {
    /// The task that captured a clone of our `broadcast::Sender` and is now asynchronously
    /// awaiting new output from the underlying stream, sending it to all registered receivers.
    /// It is aborted when this struct is dropped.
    stream_reader: JoinHandle<()>,

    /// We only store the chunk sender. The initial receiver is dropped immediately after creating
    /// the channel.
    /// New subscribers can be created from this sender.
    ///
    /// A `None` item marks the end of the underlying stream. Because this struct keeps a sender
    /// alive, receivers only observe a closed channel once this struct is gone as well.
    sender: broadcast::Sender<Option<Chunk>>,

    /// The maximum size of every chunk read by the backing `stream_reader`.
    chunk_size: NumBytes,

    /// The maximum capacity of the channel caching the chunks before being processed.
    /// Receivers falling further behind than this observe a lag (skipped chunks).
    max_channel_capacity: usize,

    /// Name of this stream.
    name: &'static str,
}
46
47impl OutputStream for BroadcastOutputStream {
48    fn chunk_size(&self) -> NumBytes {
49        self.chunk_size
50    }
51
52    fn channel_capacity(&self) -> usize {
53        self.max_channel_capacity
54    }
55
56    fn name(&self) -> &'static str {
57        self.name
58    }
59}
60
61impl Drop for BroadcastOutputStream {
62    fn drop(&mut self) {
63        self.stream_reader.abort();
64    }
65}
66
67impl Debug for BroadcastOutputStream {
68    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("BroadcastOutputStream")
70            .field("output_collector", &"non-debug < JoinHandle<()> >")
71            .field(
72                "sender",
73                &"non-debug < tokio::sync::broadcast::Sender<Option<Chunk>> >",
74            )
75            .finish()
76    }
77}
78
79/// Uses a single `bytes::BytesMut` instance into which the input stream is read.
80/// Every chunk sent into `sender` is a frozen slice of that buffer.
81/// Once chunks were handled by all active receivers, the space of the chunk is reclaimed and reused.
82async fn read_chunked<B: AsyncRead + Unpin + Send + 'static>(
83    mut read: B,
84    chunk_size: NumBytes,
85    sender: broadcast::Sender<Option<Chunk>>,
86) {
87    let send_chunk = move |chunk: Option<Chunk>| {
88        // When we could not send the chunk, we get it back in the error value and
89        // then drop it. This means that the BytesMut storage portion of that chunk
90        // is now reclaimable and can be used for storing the next chunk of incoming
91        // bytes.
92        match sender.send(chunk) {
93            Ok(_received_by) => {}
94            Err(err) => {
95                // No receivers: All already dropped or none was yet created.
96                // We intentionally ignore these errors.
97                // If they occur, the user just wasn't interested in seeing this chunk.
98                // We won't store it (history) to later feed it back to a new subscriber.
99                tracing::debug!(
100                    error = %err,
101                    "No active receivers for the output chunk, dropping it"
102                );
103            }
104        }
105    };
106
107    // A BytesMut may grow when used in a `read_buf` call.
108    let mut buf = bytes::BytesMut::with_capacity(chunk_size.bytes());
109    loop {
110        let _ = buf.try_reclaim(chunk_size.bytes());
111        match read.read_buf(&mut buf).await {
112            Ok(bytes_read) => {
113                let is_eof = bytes_read == 0;
114
115                match is_eof {
116                    true => send_chunk(None),
117                    false => {
118                        while !buf.is_empty() {
119                            // Split of at least `chunk_size` bytes and send it, even if we were
120                            // able to read more than `chunk_size` bytes.
121                            // We could have read more
122                            let split_to = usize::min(chunk_size.bytes(), buf.len());
123                            // Splitting off bytes reduces the remaining capacity of our BytesMut.
124                            // It might have now reached a capacity of 0. But this is fine!
125                            // The next usage of it in `read_buf` will not return `0`, as you may
126                            // expect from the read_buf documentation. The BytesMut will grow
127                            // to allow buffering of more data.
128                            //
129                            // NOTE: If we only split of at max `chunk_size` bytes, we have to repeat
130                            // this, unless all data is processed.
131                            send_chunk(Some(Chunk(buf.split_to(split_to).freeze())));
132                        }
133                    }
134                };
135
136                if is_eof {
137                    break;
138                }
139            }
140            Err(err) => panic!("Could not read from stream: {err}"),
141        }
142    }
143}
144
145impl BroadcastOutputStream {
146    pub(crate) fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
147        stream: S,
148        stream_name: &'static str,
149        options: FromStreamOptions,
150    ) -> BroadcastOutputStream {
151        let (sender, receiver) = broadcast::channel::<Option<Chunk>>(options.channel_capacity);
152        drop(receiver);
153
154        let stream_reader = tokio::spawn(read_chunked(stream, options.chunk_size, sender.clone()));
155
156        BroadcastOutputStream {
157            stream_reader,
158            sender,
159            chunk_size: options.chunk_size,
160            max_channel_capacity: options.channel_capacity,
161            name: stream_name,
162        }
163    }
164
165    fn subscribe(&self) -> broadcast::Receiver<Option<Chunk>> {
166        self.sender.subscribe()
167    }
168}
169
// Runs a receive loop over a broadcast subscription. `$body` executes for every received item,
// until the channel closes, the termination signal resolves, or `$body` itself breaks out of
// `$loop_label`.
//
// Expected types:
// loop_label: a loop label token such as `'outer` (matched as `tt`). It is used as
//             `$loop_label: loop` and `break $loop_label`, so `$body` can leave the whole loop
//             even from within nested loops of its own.
// receiver: tokio::sync::broadcast::Receiver<Option<Chunk>>,
// term_rx: tokio::sync::oneshot::Receiver<()>,
//
// `$chunk` is bound to the received `Option<Chunk>`; `read_chunked` sends `None` at EOF.
macro_rules! handle_subscription {
    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
        $loop_label: loop {
            tokio::select! {
                out = $receiver.recv() => {
                    match out {
                        Ok(maybe_chunk) => {
                            let $chunk = maybe_chunk;
                            $body
                        }
                        Err(RecvError::Closed) => {
                            // All senders have been dropped.
                            break $loop_label;
                        },
                        Err(RecvError::Lagged(lagged)) => {
                            // This receiver fell behind: `lagged` items were overwritten in the
                            // channel before they could be received, and are lost. Log and keep
                            // receiving. The text says "Inspector" although collectors use this
                            // macro too; tests match on this exact message.
                            tracing::warn!(lagged, "Inspector is lagging behind");
                        }
                    }
                }
                // Stop as soon as the termination signal resolves (its value is ignored).
                _msg = &mut $term_rx => break $loop_label,
            }
        }
    };
}
198
199// Impls for inspecting the output of the stream.
200impl BroadcastOutputStream {
201    /// Inspects chunks of output from the stream without storing them.
202    ///
203    /// The provided closure is called for each chunk of data. Return [`Next::Continue`] to keep
204    /// processing or [`Next::Break`] to stop.
205    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
206    pub fn inspect_chunks(&self, mut f: impl FnMut(Chunk) -> Next + Send + 'static) -> Inspector {
207        let mut receiver = self.subscribe();
208        impl_inspect_chunks!(self.name(), receiver, f, handle_subscription)
209    }
210
211    /// Inspects lines of output from the stream without storing them.
212    ///
213    /// The provided closure is called for each line. Return [`Next::Continue`] to keep
214    /// processing or [`Next::Break`] to stop.
215    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
216    pub fn inspect_lines(
217        &self,
218        mut f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static,
219        options: LineParsingOptions,
220    ) -> Inspector {
221        let mut receiver = self.subscribe();
222        impl_inspect_lines!(self.name(), receiver, f, options, handle_subscription)
223    }
224
225    /// Inspects lines of output from the stream without storing them, using an async closure.
226    ///
227    /// The provided async closure is called for each line. Return [`Next::Continue`] to keep
228    /// processing or [`Next::Break`] to stop.
229    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
230    pub fn inspect_lines_async<Fut>(
231        &self,
232        mut f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static,
233        options: LineParsingOptions,
234    ) -> Inspector
235    where
236        Fut: Future<Output = Next> + Send,
237    {
238        let mut receiver = self.subscribe();
239        impl_inspect_lines_async!(self.name(), receiver, f, options, handle_subscription)
240    }
241}
242
243// Impls for collecting the output of the stream.
244impl BroadcastOutputStream {
245    /// Collects chunks from the stream into a sink.
246    ///
247    /// The provided closure is called for each chunk, with mutable access to the sink.
248    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
249    pub fn collect_chunks<S: Sink>(
250        &self,
251        into: S,
252        mut collect: impl FnMut(Chunk, &mut S) + Send + 'static,
253    ) -> Collector<S> {
254        let sink = Arc::new(RwLock::new(into));
255        let mut receiver = self.subscribe();
256        impl_collect_chunks!(self.name(), receiver, collect, sink, handle_subscription)
257    }
258
259    /// Collects chunks from the stream into a sink using an async closure.
260    ///
261    /// The provided async closure is called for each chunk, with mutable access to the sink.
262    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
263    pub fn collect_chunks_async<S, F>(&self, into: S, collect: F) -> Collector<S>
264    where
265        S: Sink,
266        F: Fn(Chunk, &mut S) -> AsyncCollectFn<'_> + Send + 'static,
267    {
268        let sink = Arc::new(RwLock::new(into));
269        let mut receiver = self.subscribe();
270        impl_collect_chunks_async!(self.name(), receiver, collect, sink, handle_subscription)
271    }
272
273    /// Collects lines from the stream into a sink.
274    ///
275    /// The provided closure is called for each line, with mutable access to the sink.
276    /// Return [`Next::Continue`] to keep processing or [`Next::Break`] to stop.
277    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
278    pub fn collect_lines<S: Sink>(
279        &self,
280        into: S,
281        mut collect: impl FnMut(Cow<'_, str>, &mut S) -> Next + Send + 'static,
282        options: LineParsingOptions,
283    ) -> Collector<S> {
284        let sink = Arc::new(RwLock::new(into));
285        let mut receiver = self.subscribe();
286        impl_collect_lines!(
287            self.name(),
288            receiver,
289            collect,
290            options,
291            sink,
292            handle_subscription
293        )
294    }
295
296    /// Collects lines from the stream into a sink using an async closure.
297    ///
298    /// The provided async closure is called for each line, with mutable access to the sink.
299    /// Return [`Next::Continue`] to keep processing or [`Next::Break`] to stop.
300    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
301    pub fn collect_lines_async<S, F>(
302        &self,
303        into: S,
304        collect: F,
305        options: LineParsingOptions,
306    ) -> Collector<S>
307    where
308        S: Sink,
309        F: for<'a> Fn(Cow<'a, str>, &'a mut S) -> AsyncCollectFn<'a> + Send + Sync + 'static,
310    {
311        let sink = Arc::new(RwLock::new(into));
312        let mut receiver = self.subscribe();
313        impl_collect_lines_async!(
314            self.name(),
315            receiver,
316            collect,
317            options,
318            sink,
319            handle_subscription
320        )
321    }
322
323    /// Convenience method to collect all chunks into a `Vec<u8>`.
324    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
325    pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
326        self.collect_chunks(Vec::new(), |chunk, vec| vec.extend(chunk.as_ref()))
327    }
328
329    /// Convenience method to collect all lines into a `Vec<String>`.
330    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
331    pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
332        self.collect_lines(
333            Vec::new(),
334            |line, vec| {
335                vec.push(line.into_owned());
336                Next::Continue
337            },
338            options,
339        )
340    }
341
342    /// Collects chunks into an async writer.
343    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
344    pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
345        &self,
346        write: W,
347    ) -> Collector<W> {
348        self.collect_chunks_async(write, move |chunk, write| {
349            Box::pin(async move {
350                if let Err(err) = write.write_all(chunk.as_ref()).await {
351                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
352                };
353                Next::Continue
354            })
355        })
356    }
357
358    /// Collects lines into an async writer.
359    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
360    pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
361        &self,
362        write: W,
363        options: LineParsingOptions,
364    ) -> Collector<W> {
365        self.collect_lines_async(
366            write,
367            move |line, write| {
368                Box::pin(async move {
369                    if let Err(err) = write.write_all(line.as_bytes()).await {
370                        tracing::warn!("Could not write line to write sink: {err:#?}");
371                    };
372                    Next::Continue
373                })
374            },
375            options,
376        )
377    }
378
379    /// Collects chunks into an async writer after mapping them with the provided function.
380    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
381    pub fn collect_chunks_into_write_mapped<
382        W: Sink + AsyncWriteExt + Unpin,
383        B: AsRef<[u8]> + Send,
384    >(
385        &self,
386        write: W,
387        mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
388    ) -> Collector<W> {
389        self.collect_chunks_async(write, move |chunk, write| {
390            Box::pin(async move {
391                let mapped = mapper(chunk);
392                let mapped = mapped.as_ref();
393                if let Err(err) = write.write_all(mapped).await {
394                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
395                };
396                Next::Continue
397            })
398        })
399    }
400
401    /// Collects lines into an async writer after mapping them with the provided function.
402    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
403    pub fn collect_lines_into_write_mapped<
404        W: Sink + AsyncWriteExt + Unpin,
405        B: AsRef<[u8]> + Send,
406    >(
407        &self,
408        write: W,
409        mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + Copy + 'static,
410        options: LineParsingOptions,
411    ) -> Collector<W> {
412        self.collect_lines_async(
413            write,
414            move |line, write| {
415                Box::pin(async move {
416                    let mapped = mapper(line);
417                    let mapped = mapped.as_ref();
418                    if let Err(err) = write.write_all(mapped).await {
419                        tracing::warn!("Could not write line to write sink: {err:#?}");
420                    };
421                    Next::Continue
422                })
423            },
424            options,
425        )
426    }
427}
428
429// Impls for waiting for a specific line of output.
430impl BroadcastOutputStream {
431    /// Waits for a line that matches the given predicate.
432    ///
433    /// This method blocks until a line is found that satisfies the predicate.
434    pub async fn wait_for_line(
435        &self,
436        predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
437        options: LineParsingOptions,
438    ) -> Result<(), InspectorError> {
439        let inspector = self.inspect_lines(
440            move |line| {
441                if predicate(line) {
442                    Next::Break
443                } else {
444                    Next::Continue
445                }
446            },
447            options,
448        );
449        inspector.wait().await
450    }
451
452    /// Waits for a line that matches the given predicate, with a timeout.
453    ///
454    /// Returns `Ok(())` if a matching line is found, or `Err(OutputError::Timeout)` if the timeout expires.
455    pub async fn wait_for_line_with_timeout(
456        &self,
457        predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
458        options: LineParsingOptions,
459        timeout: Duration,
460    ) -> Result<(), OutputError> {
461        tokio::time::timeout(timeout, self.wait_for_line(predicate, options))
462            .await
463            .map_err(|_elapsed| OutputError::Timeout {
464                stream_name: self.name(),
465                timeout,
466            })
467            .and_then(|res| res.map_err(OutputError::InspectorFailed))
468    }
469}
470
/// Configuration for line parsing behavior.
///
/// NOTE(review): this type is not referenced anywhere in this file and overlaps with
/// `LineParsingOptions`; confirm where (or whether) it is consumed.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LineConfig {
    /// Maximum length of a single line in bytes.
    /// Intended semantics: when reached, the current line will be emitted.
    /// A value of 0 means no limit, which is also the [`Default`] value.
    pub max_line_length: usize,
}
480
481#[cfg(test)]
482mod tests {
483    use super::read_chunked;
484    use crate::NumBytesExt;
485    use crate::output_stream::broadcast::BroadcastOutputStream;
486    use crate::output_stream::tests::write_test_data;
487    use crate::output_stream::{FromStreamOptions, LineParsingOptions, Next};
488    use assertr::assert_that;
489    use assertr::prelude::{LengthAssertions, PartialEqAssertions, VecAssertions};
490    use mockall::*;
491    use std::io::Read;
492    use std::io::Seek;
493    use std::io::SeekFrom;
494    use std::io::Write;
495    use std::time::Duration;
496    use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
497    use tokio::sync::broadcast;
498    use tokio::time::sleep;
499    use tracing_test::traced_test;
500
501    #[tokio::test]
502    #[traced_test]
503    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
504    {
505        let (read_half, mut write_half) = tokio::io::duplex(64);
506        let (tx, mut rx) = broadcast::channel(10);
507
508        // Let's preemptively write more data into the stream than our later selected chunk size (2)
509        // can handle, forcing the initial read to completely fill our chunk buffer.
510        // Our expectation is that we still receive all data written here through multiple
511        // consecutive reads.
512        // The behavior of bytes::BytesMut, potentially reaching zero capacity when splitting a
513        // full buffer of, must not prevent this from happening!
514        write_half.write_all(b"hello world").await.unwrap();
515        write_half.flush().await.unwrap();
516
517        let stream_reader = tokio::spawn(read_chunked(read_half, 2.bytes(), tx));
518
519        drop(write_half); // This closes the stream and should let stream_reader terminate.
520        stream_reader.await.unwrap();
521
522        let mut chunks = Vec::<String>::new();
523        while let Ok(Some(chunk)) = rx.recv().await {
524            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
525        }
526        assert_that(chunks).contains_exactly(["he", "ll", "o ", "wo", "rl", "d"]);
527    }
528
529    #[tokio::test]
530    #[traced_test]
531    async fn read_chunked_no_data() {
532        let (read_half, write_half) = tokio::io::duplex(64);
533        let (tx, mut rx) = broadcast::channel(10);
534
535        let stream_reader = tokio::spawn(read_chunked(read_half, 2.bytes(), tx));
536
537        drop(write_half); // This closes the stream and should let stream_reader terminate.
538        stream_reader.await.unwrap();
539
540        let mut chunks = Vec::<String>::new();
541        while let Ok(Some(chunk)) = rx.recv().await {
542            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
543        }
544        assert_that(chunks).is_empty();
545    }
546
547    #[tokio::test]
548    #[traced_test]
549    async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
550        let (read_half, mut write_half) = tokio::io::duplex(64);
551        let os = BroadcastOutputStream::from_stream(
552            read_half,
553            "custom",
554            FromStreamOptions {
555                channel_capacity: 2,
556                ..Default::default()
557            },
558        );
559
560        let consumer = os.inspect_lines_async(
561            |_line| async move {
562                // Mimic a slow consumer.
563                sleep(Duration::from_millis(100)).await;
564                Next::Continue
565            },
566            LineParsingOptions::default(),
567        );
568
569        #[rustfmt::skip]
570        let producer = tokio::spawn(async move {
571            for count in 1..=15 {
572                write_half
573                    .write_all(format!("{count}\n").as_bytes())
574                    .await
575                    .unwrap();
576                sleep(Duration::from_millis(25)).await;
577            }
578            write_half.flush().await.unwrap();
579            drop(write_half);
580        });
581
582        producer.await.unwrap();
583        consumer.wait().await.unwrap();
584        drop(os);
585
586        logs_assert(|lines: &[&str]| {
587            match lines
588                .iter()
589                .filter(|line| line.contains("Inspector is lagging behind lagged=1"))
590                .count()
591            {
592                1 => {}
593                n => return Err(format!("Expected exactly one lagged=1 log, but found {n}")),
594            };
595            match lines
596                .iter()
597                .filter(|line| line.contains("Inspector is lagging behind lagged=3"))
598                .count()
599            {
600                2 => {}
601                3 => {}
602                n => return Err(format!("Expected exactly two lagged=3 logs, but found {n}")),
603            };
604            Ok(())
605        });
606    }
607
608    #[tokio::test]
609    async fn inspect_lines() {
610        let (read_half, write_half) = tokio::io::duplex(64);
611        let os =
612            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
613
614        #[automock]
615        trait LineVisitor {
616            fn visit(&self, line: String);
617        }
618
619        let mut mock = MockLineVisitor::new();
620        #[rustfmt::skip]
621        fn configure(mock: &mut MockLineVisitor) {
622            mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
623            mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
624            mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
625            mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
626            mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
627        }
628        configure(&mut mock);
629
630        let inspector = os.inspect_lines(
631            move |line| {
632                mock.visit(line.into_owned());
633                Next::Continue
634            },
635            LineParsingOptions::default(),
636        );
637
638        tokio::spawn(write_test_data(write_half)).await.unwrap();
639
640        inspector.cancel().await.unwrap();
641        drop(os)
642    }
643
644    /// This tests that our impl macros properly `break 'outer`, as they might be in an inner loop!
645    /// With `break` instead of `break 'outer`, this test would never complete, as the `Next::Break`
646    /// would not terminate the collector!
647    #[tokio::test]
648    #[traced_test]
649    async fn inspect_lines_async() {
650        let (read_half, mut write_half) = tokio::io::duplex(64);
651        let os = BroadcastOutputStream::from_stream(
652            read_half,
653            "custom",
654            FromStreamOptions {
655                chunk_size: 32.bytes(),
656                ..Default::default()
657            },
658        );
659
660        let seen: Vec<String> = Vec::new();
661        let collector = os.collect_lines_async(
662            seen,
663            move |line, seen: &mut Vec<String>| {
664                Box::pin(async move {
665                    if line == "break" {
666                        seen.push(line.into_owned());
667                        Next::Break
668                    } else {
669                        seen.push(line.into_owned());
670                        Next::Continue
671                    }
672                })
673            },
674            LineParsingOptions::default(),
675        );
676
677        let _writer = tokio::spawn(async move {
678            write_half.write_all("start\n".as_bytes()).await.unwrap();
679            write_half.write_all("break\n".as_bytes()).await.unwrap();
680            write_half.write_all("end\n".as_bytes()).await.unwrap();
681
682            loop {
683                write_half
684                    .write_all("gibberish\n".as_bytes())
685                    .await
686                    .unwrap();
687                tokio::time::sleep(Duration::from_millis(50)).await;
688            }
689        });
690
691        let seen = collector.wait().await.unwrap();
692
693        assert_that(seen).contains_exactly(["start", "break"]);
694    }
695
696    #[tokio::test]
697    async fn collect_lines_to_file() {
698        let (read_half, write_half) = tokio::io::duplex(64);
699        let os = BroadcastOutputStream::from_stream(
700            read_half,
701            "custom",
702            FromStreamOptions {
703                channel_capacity: 32,
704                ..Default::default()
705            },
706        );
707
708        let temp_file = tempfile::tempfile().unwrap();
709        let collector = os.collect_lines(
710            temp_file,
711            |line, temp_file| {
712                writeln!(temp_file, "{}", line).unwrap();
713                Next::Continue
714            },
715            LineParsingOptions::default(),
716        );
717
718        tokio::spawn(write_test_data(write_half)).await.unwrap();
719
720        let mut temp_file = collector.cancel().await.unwrap();
721        temp_file.seek(SeekFrom::Start(0)).unwrap();
722        let mut contents = String::new();
723        temp_file.read_to_string(&mut contents).unwrap();
724
725        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
726    }
727
728    #[tokio::test]
729    async fn collect_lines_async_to_file() {
730        let (read_half, write_half) = tokio::io::duplex(64);
731        let os = BroadcastOutputStream::from_stream(
732            read_half,
733            "custom",
734            FromStreamOptions {
735                chunk_size: 32.bytes(),
736                ..Default::default()
737            },
738        );
739
740        let temp_file = tempfile::tempfile().unwrap();
741        let collector = os.collect_lines_async(
742            temp_file,
743            |line, temp_file| {
744                Box::pin(async move {
745                    writeln!(temp_file, "{}", line).unwrap();
746                    Next::Continue
747                })
748            },
749            LineParsingOptions::default(),
750        );
751
752        tokio::spawn(write_test_data(write_half)).await.unwrap();
753
754        let mut temp_file = collector.cancel().await.unwrap();
755        temp_file.seek(SeekFrom::Start(0)).unwrap();
756        let mut contents = String::new();
757        temp_file.read_to_string(&mut contents).unwrap();
758
759        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
760    }
761
762    #[tokio::test]
763    #[traced_test]
764    async fn collect_chunks_into_write_mapped() {
765        let (read_half, write_half) = tokio::io::duplex(64);
766        let os = BroadcastOutputStream::from_stream(
767            read_half,
768            "custom",
769            FromStreamOptions {
770                chunk_size: 32.bytes(),
771                ..Default::default()
772            },
773        );
774
775        let temp_file = tokio::fs::File::options()
776            .create(true)
777            .truncate(true)
778            .write(true)
779            .read(true)
780            .open(std::env::temp_dir().join(
781                "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
782            ))
783            .await
784            .unwrap();
785
786        let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
787            String::from_utf8_lossy(chunk.as_ref()).to_string()
788        });
789
790        tokio::spawn(write_test_data(write_half)).await.unwrap();
791
792        let mut temp_file = collector.cancel().await.unwrap();
793        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
794        let mut contents = String::new();
795        temp_file.read_to_string(&mut contents).await.unwrap();
796
797        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
798    }
799
800    #[tokio::test]
801    #[traced_test]
802    async fn collect_chunks_into_write_in_parallel() {
803        // Big enough to hold any individual test write that we perform.
804        let (read_half, write_half) = tokio::io::duplex(64);
805
806        let os = BroadcastOutputStream::from_stream(
807            read_half,
808            "custom",
809            FromStreamOptions {
810                // Big enough to hold any individual test write that we perform.
811                // Actual chunks will be smaller.
812                chunk_size: 32.bytes(),
813                channel_capacity: 2,
814            },
815        );
816
817        let file1 = tokio::fs::File::options()
818            .create(true)
819            .truncate(true)
820            .write(true)
821            .read(true)
822            .open(
823                std::env::temp_dir()
824                    .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_1.txt"),
825            )
826            .await
827            .unwrap();
828        let file2 = tokio::fs::File::options()
829            .create(true)
830            .truncate(true)
831            .write(true)
832            .read(true)
833            .open(
834                std::env::temp_dir()
835                    .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_2.txt"),
836            )
837            .await
838            .unwrap();
839
840        let collector1 = os.collect_chunks_into_write(file1);
841        let collector2 = os.collect_chunks_into_write_mapped(file2, |chunk| {
842            format!("ok-{}", String::from_utf8_lossy(chunk.as_ref()))
843        });
844
845        tokio::spawn(write_test_data(write_half)).await.unwrap();
846
847        let mut temp_file1 = collector1.cancel().await.unwrap();
848        temp_file1.seek(SeekFrom::Start(0)).await.unwrap();
849        let mut contents = String::new();
850        temp_file1.read_to_string(&mut contents).await.unwrap();
851        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
852
853        let mut temp_file2 = collector2.cancel().await.unwrap();
854        temp_file2.seek(SeekFrom::Start(0)).await.unwrap();
855        let mut contents = String::new();
856        temp_file2.read_to_string(&mut contents).await.unwrap();
857        assert_that(contents)
858            .is_equal_to("ok-Cargo.lock\nok-Cargo.toml\nok-README.md\nok-src\nok-target\n");
859    }
860}