tokio_process_tools/output_stream/
broadcast.rs

1use crate::InspectorError;
2use crate::collector::{AsyncCollectFn, Collector, Sink};
3use crate::inspector::Inspector;
4use crate::output_stream::impls::{
5    impl_collect_chunks, impl_collect_chunks_async, impl_collect_lines, impl_collect_lines_async,
6    impl_inspect_chunks, impl_inspect_lines, impl_inspect_lines_async,
7};
8use crate::output_stream::{Chunk, FromStreamOptions, LineReader, Next};
9use crate::output_stream::{LineParsingOptions, OutputStream};
10use std::fmt::{Debug, Formatter};
11use std::future::Future;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
15use tokio::sync::broadcast::error::RecvError;
16use tokio::sync::{RwLock, broadcast};
17use tokio::task::JoinHandle;
18use tokio::time::error::Elapsed;
19
/// The output stream from a process. Either representing stdout or stderr.
///
/// This is the broadcast variant, allowing for multiple simultaneous consumers with the downside
/// of inducing memory allocations not required when only one consumer is listening.
/// For that case, prefer using the
/// [crate::output_stream::single_subscriber::SingleSubscriberOutputStream].
///
/// Dropping this value aborts the background task that reads from the underlying stream.
pub struct BroadcastOutputStream {
    /// The task that captured a clone of our `broadcast::Sender` and is now asynchronously
    /// awaiting new output from the underlying stream, sending it to all registered receivers.
    stream_reader: JoinHandle<()>,

    /// We only store the chunk sender. The initial receiver is dropped immediately after creating
    /// the channel.
    /// New subscribers can be created from this sender.
    ///
    /// Items are `Some(chunk)` for data; the reader task sends `None` once the underlying stream
    /// reported end-of-file.
    sender: broadcast::Sender<Option<Chunk>>,
}
36
// Marks `BroadcastOutputStream` as an `OutputStream`; no trait items are overridden here.
impl OutputStream for BroadcastOutputStream {}
38
impl Drop for BroadcastOutputStream {
    /// Aborts the background reader task, so that it does not keep reading from the underlying
    /// stream once this handle is gone.
    fn drop(&mut self) {
        self.stream_reader.abort();
    }
}
44
45impl Debug for BroadcastOutputStream {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("BroadcastOutputStream")
48            .field("output_collector", &"non-debug < JoinHandle<()> >")
49            .field(
50                "sender",
51                &"non-debug < tokio::sync::broadcast::Sender<Option<Chunk>> >",
52            )
53            .finish()
54    }
55}
56
57/// Uses a single `bytes::BytesMut` instance into which the input stream is read.
58/// Every chunk sent into `sender` is a frozen slice of that buffer.
59/// Once chunks were handled by all active receivers, the space of the chunk is reclaimed and reused.
60async fn read_chunked<B: AsyncRead + Unpin + Send + 'static>(
61    mut read: B,
62    chunk_size: usize,
63    sender: broadcast::Sender<Option<Chunk>>,
64) {
65    let send_chunk = move |chunk: Option<Chunk>| {
66        // When we could not send the chunk, we get it back in the error value and
67        // then drop it. This means that the BytesMut storage portion of that chunk
68        // is now reclaimable and can be used for storing the next chunk of incoming
69        // bytes.
70        match sender.send(chunk) {
71            Ok(_received_by) => {}
72            Err(err) => {
73                // No receivers: All already dropped or none was yet created.
74                // We intentionally ignore these errors.
75                // If they occur, the user just wasn't interested in seeing this chunk.
76                // We won't store it (history) to later feed it back to a new subscriber.
77                tracing::debug!(
78                    error = %err,
79                    "No active receivers for the output chunk, dropping it"
80                );
81            }
82        }
83    };
84
85    // A BytesMut may grow when used in a `read_buf` call.
86    let mut buf = bytes::BytesMut::with_capacity(chunk_size);
87    loop {
88        let _ = buf.try_reclaim(chunk_size);
89        match read.read_buf(&mut buf).await {
90            Ok(bytes_read) => {
91                let is_eof = bytes_read == 0;
92
93                match is_eof {
94                    true => send_chunk(None),
95                    false => {
96                        while !buf.is_empty() {
97                            // Split of at least `chunk_size` bytes and send it, even if we were
98                            // able to read more than `chunk_size` bytes.
99                            // We could have read more
100                            let split_to = usize::min(chunk_size, buf.len());
101                            // Splitting off bytes reduces the remaining capacity of our BytesMut.
102                            // It might have now reached a capacity of 0. But this is fine!
103                            // The next usage of it in `read_buf` will not return `0`, as you may
104                            // expect from the read_buf documentation. The BytesMut will grow
105                            // to allow buffering of more data.
106                            //
107                            // NOTE: If we only split of at max `chunk_size` bytes, we have to repeat
108                            // this, unless all data is processed.
109                            send_chunk(Some(Chunk(buf.split_to(split_to).freeze())));
110                        }
111                    }
112                };
113
114                if is_eof {
115                    break;
116                }
117            }
118            Err(err) => panic!("Could not read from stream: {err}"),
119        }
120    }
121}
122
123impl BroadcastOutputStream {
124    pub(crate) fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
125        stream: S,
126        options: FromStreamOptions,
127    ) -> BroadcastOutputStream {
128        let (sender, receiver) = broadcast::channel::<Option<Chunk>>(options.channel_capacity);
129        drop(receiver);
130
131        let stream_reader = tokio::spawn(read_chunked(stream, options.chunk_size, sender.clone()));
132
133        BroadcastOutputStream {
134            stream_reader,
135            sender,
136        }
137    }
138
139    fn subscribe(&self) -> broadcast::Receiver<Option<Chunk>> {
140        self.sender.subscribe()
141    }
142}
143
// Runs a labeled receive loop: every received item is bound to `$chunk` and handed to `$body`.
// The loop ends when the channel is closed or when `$term_rx` resolves. `$body` may leave the
// loop itself by breaking to `$loop_label`.
//
// Expected arguments:
// loop_label: a loop label token, e.g. `'outer`
// receiver: tokio::sync::broadcast::Receiver<Option<Chunk>>,
// term_rx: tokio::sync::oneshot::Receiver<()>,
macro_rules! handle_subscription {
    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
        $loop_label: loop {
            tokio::select! {
                received = $receiver.recv() => match received {
                    Ok($chunk) => $body,
                    Err(RecvError::Lagged(lagged)) => {
                        // The receiver fell behind and missed `lagged` items. Keep going with
                        // whatever is still available.
                        tracing::warn!(lagged, "Inspector is lagging behind");
                    }
                    // All senders have been dropped.
                    Err(RecvError::Closed) => break $loop_label,
                },
                _terminated = &mut $term_rx => break $loop_label,
            }
        }
    };
}
172
173// Impls for inspecting the output of the stream.
174impl BroadcastOutputStream {
175    /// Inspects chunks of output from the stream without storing them.
176    ///
177    /// The provided closure is called for each chunk of data. Return [`Next::Continue`] to keep
178    /// processing or [`Next::Break`] to stop.
179    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
180    pub fn inspect_chunks(&self, mut f: impl FnMut(Chunk) -> Next + Send + 'static) -> Inspector {
181        let mut receiver = self.subscribe();
182        impl_inspect_chunks!(receiver, f, handle_subscription)
183    }
184
185    /// Inspects lines of output from the stream without storing them.
186    ///
187    /// The provided closure is called for each line. Return [`Next::Continue`] to keep
188    /// processing or [`Next::Break`] to stop.
189    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
190    pub fn inspect_lines(
191        &self,
192        mut f: impl FnMut(String) -> Next + Send + 'static,
193        options: LineParsingOptions,
194    ) -> Inspector {
195        let mut receiver = self.subscribe();
196        impl_inspect_lines!(receiver, f, options, handle_subscription)
197    }
198
199    /// Inspects lines of output from the stream without storing them, using an async closure.
200    ///
201    /// The provided async closure is called for each line. Return [`Next::Continue`] to keep
202    /// processing or [`Next::Break`] to stop.
203    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
204    pub fn inspect_lines_async<Fut>(
205        &self,
206        mut f: impl FnMut(String) -> Fut + Send + 'static,
207        options: LineParsingOptions,
208    ) -> Inspector
209    where
210        Fut: Future<Output = Next> + Send,
211    {
212        let mut receiver = self.subscribe();
213        impl_inspect_lines_async!(receiver, f, options, handle_subscription)
214    }
215}
216
217// Impls for collecting the output of the stream.
218impl BroadcastOutputStream {
219    /// Collects chunks from the stream into a sink.
220    ///
221    /// The provided closure is called for each chunk, with mutable access to the sink.
222    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
223    pub fn collect_chunks<S: Sink>(
224        &self,
225        into: S,
226        mut collect: impl FnMut(Chunk, &mut S) + Send + 'static,
227    ) -> Collector<S> {
228        let sink = Arc::new(RwLock::new(into));
229        let mut receiver = self.subscribe();
230        impl_collect_chunks!(receiver, collect, sink, handle_subscription)
231    }
232
233    /// Collects chunks from the stream into a sink using an async closure.
234    ///
235    /// The provided async closure is called for each chunk, with mutable access to the sink.
236    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
237    pub fn collect_chunks_async<S, F>(&self, into: S, collect: F) -> Collector<S>
238    where
239        S: Sink,
240        F: Fn(Chunk, &mut S) -> AsyncCollectFn<'_> + Send + 'static,
241    {
242        let sink = Arc::new(RwLock::new(into));
243        let mut receiver = self.subscribe();
244        impl_collect_chunks_async!(receiver, collect, sink, handle_subscription)
245    }
246
247    /// Collects lines from the stream into a sink.
248    ///
249    /// The provided closure is called for each line, with mutable access to the sink.
250    /// Return [`Next::Continue`] to keep processing or [`Next::Break`] to stop.
251    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
252    pub fn collect_lines<S: Sink>(
253        &self,
254        into: S,
255        mut collect: impl FnMut(String, &mut S) -> Next + Send + 'static,
256        options: LineParsingOptions,
257    ) -> Collector<S> {
258        let sink = Arc::new(RwLock::new(into));
259        let mut receiver = self.subscribe();
260        impl_collect_lines!(receiver, collect, options, sink, handle_subscription)
261    }
262
263    /// Collects lines from the stream into a sink using an async closure.
264    ///
265    /// The provided async closure is called for each line, with mutable access to the sink.
266    /// Return [`Next::Continue`] to keep processing or [`Next::Break`] to stop.
267    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
268    pub fn collect_lines_async<S, F>(
269        &self,
270        into: S,
271        collect: F,
272        options: LineParsingOptions,
273    ) -> Collector<S>
274    where
275        S: Sink,
276        F: Fn(String, &mut S) -> AsyncCollectFn<'_> + Send + Sync + 'static,
277    {
278        let sink = Arc::new(RwLock::new(into));
279        let mut receiver = self.subscribe();
280        impl_collect_lines_async!(receiver, collect, options, sink, handle_subscription)
281    }
282
283    /// Convenience method to collect all chunks into a `Vec<u8>`.
284    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
285    pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
286        self.collect_chunks(Vec::new(), |chunk, vec| vec.extend(chunk.as_ref()))
287    }
288
289    /// Convenience method to collect all lines into a `Vec<String>`.
290    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
291    pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
292        self.collect_lines(
293            Vec::new(),
294            |line, vec| {
295                vec.push(line);
296                Next::Continue
297            },
298            options,
299        )
300    }
301
302    /// Collects chunks into an async writer.
303    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
304    pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
305        &self,
306        write: W,
307    ) -> Collector<W> {
308        self.collect_chunks_async(write, move |chunk, write| {
309            Box::pin(async move {
310                if let Err(err) = write.write_all(chunk.as_ref()).await {
311                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
312                };
313                Next::Continue
314            })
315        })
316    }
317
318    /// Collects lines into an async writer.
319    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
320    pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
321        &self,
322        write: W,
323        options: LineParsingOptions,
324    ) -> Collector<W> {
325        self.collect_lines_async(
326            write,
327            move |line, write| {
328                Box::pin(async move {
329                    if let Err(err) = write.write_all(line.as_bytes()).await {
330                        tracing::warn!("Could not write line to write sink: {err:#?}");
331                    };
332                    Next::Continue
333                })
334            },
335            options,
336        )
337    }
338
339    /// Collects chunks into an async writer after mapping them with the provided function.
340    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
341    pub fn collect_chunks_into_write_mapped<
342        W: Sink + AsyncWriteExt + Unpin,
343        B: AsRef<[u8]> + Send,
344    >(
345        &self,
346        write: W,
347        mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
348    ) -> Collector<W> {
349        self.collect_chunks_async(write, move |chunk, write| {
350            Box::pin(async move {
351                let mapped = mapper(chunk);
352                let mapped = mapped.as_ref();
353                if let Err(err) = write.write_all(mapped).await {
354                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
355                };
356                Next::Continue
357            })
358        })
359    }
360
361    /// Collects lines into an async writer after mapping them with the provided function.
362    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
363    pub fn collect_lines_into_write_mapped<
364        W: Sink + AsyncWriteExt + Unpin,
365        B: AsRef<[u8]> + Send,
366    >(
367        &self,
368        write: W,
369        mapper: impl Fn(String) -> B + Send + Sync + Copy + 'static,
370        options: LineParsingOptions,
371    ) -> Collector<W> {
372        self.collect_lines_async(
373            write,
374            move |line, write| {
375                Box::pin(async move {
376                    let mapped = mapper(line);
377                    let mapped = mapped.as_ref();
378                    if let Err(err) = write.write_all(mapped).await {
379                        tracing::warn!("Could not write line to write sink: {err:#?}");
380                    };
381                    Next::Continue
382                })
383            },
384            options,
385        )
386    }
387}
388
389// Impls for waiting for a specific line of output.
390impl BroadcastOutputStream {
391    /// Waits for a line that matches the given predicate.
392    ///
393    /// This method blocks until a line is found that satisfies the predicate.
394    pub async fn wait_for_line(
395        &self,
396        predicate: impl Fn(String) -> bool + Send + Sync + 'static,
397        options: LineParsingOptions,
398    ) {
399        let inspector = self.inspect_lines(
400            move |line| {
401                if predicate(line) {
402                    Next::Break
403                } else {
404                    Next::Continue
405                }
406            },
407            options,
408        );
409        match inspector.wait().await {
410            Ok(()) => {}
411            Err(err) => match err {
412                InspectorError::TaskJoin(join_error) => {
413                    panic!("Inspector task join error: {join_error:#?}");
414                }
415            },
416        };
417    }
418
419    /// Waits for a line that matches the given predicate, with a timeout.
420    ///
421    /// Returns `Ok(())` if a matching line is found, or `Err(Elapsed)` if the timeout expires.
422    pub async fn wait_for_line_with_timeout(
423        &self,
424        predicate: impl Fn(String) -> bool + Send + Sync + 'static,
425        options: LineParsingOptions,
426        timeout: Duration,
427    ) -> Result<(), Elapsed> {
428        tokio::time::timeout(timeout, self.wait_for_line(predicate, options)).await
429    }
430}
431
/// Configuration for line parsing behavior.
///
/// The [`Default`] value sets no line length limit.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LineConfig {
    /// Maximum length of a single line in bytes.
    /// When reached, the current line will be emitted.
    /// A value of 0 means no limit (default).
    pub max_line_length: usize,
}
441
442#[cfg(test)]
443mod tests {
444    use super::read_chunked;
445    use crate::output_stream::broadcast::BroadcastOutputStream;
446    use crate::output_stream::tests::write_test_data;
447    use crate::output_stream::{FromStreamOptions, LineParsingOptions, Next};
448    use assertr::assert_that;
449    use assertr::prelude::{LengthAssertions, PartialEqAssertions, VecAssertions};
450    use mockall::*;
451    use std::io::Read;
452    use std::io::Seek;
453    use std::io::SeekFrom;
454    use std::io::Write;
455    use std::time::Duration;
456    use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
457    use tokio::sync::broadcast;
458    use tokio::time::sleep;
459    use tracing_test::traced_test;
460
461    #[tokio::test]
462    #[traced_test]
463    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
464    {
465        let (read_half, mut write_half) = tokio::io::duplex(64);
466        let (tx, mut rx) = broadcast::channel(10);
467
468        // Let's preemptively write more data into the stream than our later selected chunk size (2)
469        // can handle, forcing the initial read to completely fill our chunk buffer.
470        // Our expectation is that we still receive all data written here through multiple
471        // consecutive reads.
472        // The behavior of bytes::BytesMut, potentially reaching zero capacity when splitting a
473        // full buffer of, must not prevent this from happening!
474        write_half.write_all(b"hello world").await.unwrap();
475        write_half.flush().await.unwrap();
476
477        let stream_reader = tokio::spawn(read_chunked(read_half, 2, tx));
478
479        drop(write_half); // This closes the stream and should let stream_reader terminate.
480        stream_reader.await.unwrap();
481
482        let mut chunks = Vec::<String>::new();
483        while let Ok(Some(chunk)) = rx.recv().await {
484            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
485        }
486        assert_that(chunks).contains_exactly(&["he", "ll", "o ", "wo", "rl", "d"]);
487    }
488
489    #[tokio::test]
490    #[traced_test]
491    async fn read_chunked_no_data() {
492        let (read_half, write_half) = tokio::io::duplex(64);
493        let (tx, mut rx) = broadcast::channel(10);
494
495        let stream_reader = tokio::spawn(read_chunked(read_half, 2, tx));
496
497        drop(write_half); // This closes the stream and should let stream_reader terminate.
498        stream_reader.await.unwrap();
499
500        let mut chunks = Vec::<String>::new();
501        while let Ok(Some(chunk)) = rx.recv().await {
502            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
503        }
504        assert_that(chunks).is_empty();
505    }
506
507    #[tokio::test]
508    #[traced_test]
509    async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
510        let (read_half, mut write_half) = tokio::io::duplex(64);
511        let os = BroadcastOutputStream::from_stream(
512            read_half,
513            FromStreamOptions {
514                channel_capacity: 2,
515                ..Default::default()
516            },
517        );
518
519        let consumer = os.inspect_lines_async(
520            async |_line| {
521                // Mimic a slow consumer.
522                sleep(Duration::from_millis(100)).await;
523                Next::Continue
524            },
525            LineParsingOptions::default(),
526        );
527
528        #[rustfmt::skip]
529        let producer = tokio::spawn(async move {
530            for count in 1..=15 {
531                write_half
532                    .write(format!("{count}\n").as_bytes())
533                    .await
534                    .unwrap();
535                sleep(Duration::from_millis(25)).await;
536            }
537            write_half.flush().await.unwrap();
538            drop(write_half);
539        });
540
541        producer.await.unwrap();
542        consumer.wait().await.unwrap();
543        drop(os);
544
545        logs_assert(|lines: &[&str]| {
546            match lines
547                .iter()
548                .filter(|line| line.contains("Inspector is lagging behind lagged=1"))
549                .count()
550            {
551                1 => {}
552                n => return Err(format!("Expected exactly one lagged=1 log, but found {n}")),
553            };
554            match lines
555                .iter()
556                .filter(|line| line.contains("Inspector is lagging behind lagged=3"))
557                .count()
558            {
559                2 => {}
560                3 => {}
561                n => return Err(format!("Expected exactly two lagged=3 logs, but found {n}")),
562            };
563            Ok(())
564        });
565    }
566
567    #[tokio::test]
568    async fn inspect_lines() {
569        let (read_half, write_half) = tokio::io::duplex(64);
570        let os = BroadcastOutputStream::from_stream(read_half, FromStreamOptions::default());
571
572        #[automock]
573        trait LineVisitor {
574            fn visit(&self, line: String);
575        }
576
577        let mut mock = MockLineVisitor::new();
578        #[rustfmt::skip]
579        fn configure(mock: &mut MockLineVisitor) {
580            mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
581            mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
582            mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
583            mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
584            mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
585        }
586        configure(&mut mock);
587
588        let inspector = os.inspect_lines(
589            move |line| {
590                mock.visit(line);
591                Next::Continue
592            },
593            LineParsingOptions::default(),
594        );
595
596        tokio::spawn(write_test_data(write_half)).await.unwrap();
597
598        inspector.cancel().await.unwrap();
599        drop(os)
600    }
601
602    /// This tests that our impl macros properly `break 'outer`, as they might be in an inner loop!
603    /// With `break` instead of `break 'outer`, this test would never complete, as the `Next::Break`
604    /// would not terminate the collector!
605    #[tokio::test]
606    #[traced_test]
607    async fn inspect_lines_async() {
608        let (read_half, mut write_half) = tokio::io::duplex(64);
609        let os = BroadcastOutputStream::from_stream(
610            read_half,
611            FromStreamOptions {
612                chunk_size: 32,
613                ..Default::default()
614            },
615        );
616
617        let seen: Vec<String> = Vec::new();
618        let collector = os.collect_lines_async(
619            seen,
620            move |line, seen: &mut Vec<String>| {
621                Box::pin(async move {
622                    if line == "break" {
623                        seen.push(line);
624                        Next::Break
625                    } else {
626                        seen.push(line);
627                        Next::Continue
628                    }
629                })
630            },
631            LineParsingOptions::default(),
632        );
633
634        let _writer = tokio::spawn(async move {
635            write_half.write_all("start\n".as_bytes()).await.unwrap();
636            write_half.write_all("break\n".as_bytes()).await.unwrap();
637            write_half.write_all("end\n".as_bytes()).await.unwrap();
638
639            loop {
640                write_half
641                    .write_all("gibberish\n".as_bytes())
642                    .await
643                    .unwrap();
644                tokio::time::sleep(Duration::from_millis(50)).await;
645            }
646        });
647
648        let seen = collector.wait().await.unwrap();
649
650        assert_that(seen).contains_exactly(&["start", "break"]);
651    }
652
653    #[tokio::test]
654    async fn collect_lines_to_file() {
655        let (read_half, write_half) = tokio::io::duplex(64);
656        let os = BroadcastOutputStream::from_stream(
657            read_half,
658            FromStreamOptions {
659                channel_capacity: 32,
660                ..Default::default()
661            },
662        );
663
664        let temp_file = tempfile::tempfile().unwrap();
665        let collector = os.collect_lines(
666            temp_file,
667            |line, temp_file| {
668                writeln!(temp_file, "{}", line).unwrap();
669                Next::Continue
670            },
671            LineParsingOptions::default(),
672        );
673
674        tokio::spawn(write_test_data(write_half)).await.unwrap();
675
676        let mut temp_file = collector.cancel().await.unwrap();
677        temp_file.seek(SeekFrom::Start(0)).unwrap();
678        let mut contents = String::new();
679        temp_file.read_to_string(&mut contents).unwrap();
680
681        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
682    }
683
684    #[tokio::test]
685    async fn collect_lines_async_to_file() {
686        let (read_half, write_half) = tokio::io::duplex(64);
687        let os = BroadcastOutputStream::from_stream(
688            read_half,
689            FromStreamOptions {
690                chunk_size: 32,
691                ..Default::default()
692            },
693        );
694
695        let temp_file = tempfile::tempfile().unwrap();
696        let collector = os.collect_lines_async(
697            temp_file,
698            |line, temp_file| {
699                Box::pin(async move {
700                    writeln!(temp_file, "{}", line).unwrap();
701                    Next::Continue
702                })
703            },
704            LineParsingOptions::default(),
705        );
706
707        tokio::spawn(write_test_data(write_half)).await.unwrap();
708
709        let mut temp_file = collector.cancel().await.unwrap();
710        temp_file.seek(SeekFrom::Start(0)).unwrap();
711        let mut contents = String::new();
712        temp_file.read_to_string(&mut contents).unwrap();
713
714        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
715    }
716
717    #[tokio::test]
718    #[traced_test]
719    async fn collect_chunks_into_write_mapped() {
720        let (read_half, write_half) = tokio::io::duplex(64);
721        let os = BroadcastOutputStream::from_stream(
722            read_half,
723            FromStreamOptions {
724                chunk_size: 32,
725                ..Default::default()
726            },
727        );
728
729        let temp_file = tokio::fs::File::options()
730            .create(true)
731            .truncate(true)
732            .write(true)
733            .read(true)
734            .open(std::env::temp_dir().join(
735                "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
736            ))
737            .await
738            .unwrap();
739
740        let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
741            String::from_utf8_lossy(chunk.as_ref()).to_string()
742        });
743
744        tokio::spawn(write_test_data(write_half)).await.unwrap();
745
746        let mut temp_file = collector.cancel().await.unwrap();
747        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
748        let mut contents = String::new();
749        temp_file.read_to_string(&mut contents).await.unwrap();
750
751        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
752    }
753
754    #[tokio::test]
755    #[traced_test]
756    async fn collect_chunks_into_write_in_parallel() {
757        // Big enough to hold any individual test write that we perform.
758        let (read_half, write_half) = tokio::io::duplex(64);
759
760        let os = BroadcastOutputStream::from_stream(
761            read_half,
762            FromStreamOptions {
763                // Big enough to hold any individual test write that we perform.
764                // Actual chunks will be smaller.
765                chunk_size: 64,
766                channel_capacity: 2,
767                ..Default::default()
768            },
769        );
770
771        let file1 = tokio::fs::File::options()
772            .create(true)
773            .truncate(true)
774            .write(true)
775            .read(true)
776            .open(
777                std::env::temp_dir()
778                    .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_1.txt"),
779            )
780            .await
781            .unwrap();
782        let file2 = tokio::fs::File::options()
783            .create(true)
784            .truncate(true)
785            .write(true)
786            .read(true)
787            .open(
788                std::env::temp_dir()
789                    .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_2.txt"),
790            )
791            .await
792            .unwrap();
793
794        let collector1 = os.collect_chunks_into_write(file1);
795        let collector2 = os.collect_chunks_into_write_mapped(file2, |chunk| {
796            format!("ok-{}", String::from_utf8_lossy(chunk.as_ref()))
797        });
798
799        tokio::spawn(write_test_data(write_half)).await.unwrap();
800
801        let mut temp_file1 = collector1.cancel().await.unwrap();
802        temp_file1.seek(SeekFrom::Start(0)).await.unwrap();
803        let mut contents = String::new();
804        temp_file1.read_to_string(&mut contents).await.unwrap();
805        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
806
807        let mut temp_file2 = collector2.cancel().await.unwrap();
808        temp_file2.seek(SeekFrom::Start(0)).await.unwrap();
809        let mut contents = String::new();
810        temp_file2.read_to_string(&mut contents).await.unwrap();
811        assert_that(contents)
812            .is_equal_to("ok-Cargo.lock\nok-Cargo.toml\nok-README.md\nok-src\nok-target\n");
813    }
814}