tokio_process_tools/output_stream/
broadcast.rs

1use crate::InspectorError;
2use crate::collector::{AsyncCollectFn, Collector, Sink};
3use crate::inspector::Inspector;
4use crate::output_stream::impls::{
5    impl_collect_chunks, impl_collect_chunks_async, impl_collect_lines, impl_collect_lines_async,
6    impl_inspect_chunks, impl_inspect_lines, impl_inspect_lines_async,
7};
8use crate::output_stream::{Chunk, FromStreamOptions, LineReader, Next};
9use crate::output_stream::{LineParsingOptions, OutputStream};
10use std::fmt::{Debug, Formatter};
11use std::future::Future;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
15use tokio::sync::broadcast::error::RecvError;
16use tokio::sync::{RwLock, broadcast};
17use tokio::task::JoinHandle;
18use tokio::time::error::Elapsed;
19
/// The output stream from a process. Either representing stdout or stderr.
///
/// This is the broadcast variant, allowing for multiple simultaneous consumers with the downside
/// of inducing memory allocations not required when only one consumer is listening.
/// For that case, prefer the single-subscriber stream from the
/// `output_stream::single_subscriber` module.
///
/// Dropping the stream aborts its background reader task.
pub struct BroadcastOutputStream {
    /// The task that captured a clone of our `broadcast::Sender` and is now asynchronously
    /// awaiting new output from the underlying stream, sending it to all registered receivers.
    stream_reader: JoinHandle<()>,

    /// We only store the chunk sender. The initial receiver is dropped immediately after creating
    /// the channel.
    /// New subscribers can be created from this sender.
    ///
    /// Items are `Option<Chunk>`: the reader task sends `None` once the underlying stream
    /// reported end-of-file.
    sender: broadcast::Sender<Option<Chunk>>,
}
35
// Marks the broadcast variant as an `OutputStream`. The impl body is empty: no trait items
// have to be provided here.
impl OutputStream for BroadcastOutputStream {}
37
38impl Drop for BroadcastOutputStream {
39    fn drop(&mut self) {
40        self.stream_reader.abort();
41    }
42}
43
44impl Debug for BroadcastOutputStream {
45    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
46        f.debug_struct("BroadcastOutputStream")
47            .field("output_collector", &"non-debug < JoinHandle<()> >")
48            .field(
49                "sender",
50                &"non-debug < tokio::sync::broadcast::Sender<Option<Chunk>> >",
51            )
52            .finish()
53    }
54}
55
56/// Uses a single `bytes::BytesMut` instance into which the input stream is read.
57/// Every chunk sent into `sender` is a frozen slice of that buffer.
58/// Once chunks were handled by all active receivers, the space of the chunk is reclaimed and reused.
59async fn read_chunked<B: AsyncRead + Unpin + Send + 'static>(
60    mut read: B,
61    chunk_size: usize,
62    sender: broadcast::Sender<Option<Chunk>>,
63) {
64    let send_chunk = move |chunk: Option<Chunk>| {
65        // When we could not send the chunk, we get it back in the error value and
66        // then drop it. This means that the BytesMut storage portion of that chunk
67        // is now reclaimable and can be used for storing the next chunk of incoming
68        // bytes.
69        match sender.send(chunk) {
70            Ok(_received_by) => {}
71            Err(err) => {
72                // No receivers: All already dropped or none was yet created.
73                // We intentionally ignore these errors.
74                // If they occur, the user just wasn't interested in seeing this chunk.
75                // We won't store it (history) to later feed it back to a new subscriber.
76                tracing::debug!(
77                    error = %err,
78                    "No active receivers for the output chunk, dropping it"
79                );
80            }
81        }
82    };
83
84    // A BytesMut may grow when used in a `read_buf` call.
85    let mut buf = bytes::BytesMut::with_capacity(chunk_size);
86    loop {
87        let _ = buf.try_reclaim(chunk_size);
88        match read.read_buf(&mut buf).await {
89            Ok(bytes_read) => {
90                let is_eof = bytes_read == 0;
91
92                match is_eof {
93                    true => send_chunk(None),
94                    false => {
95                        while !buf.is_empty() {
96                            // Split of at least `chunk_size` bytes and send it, even if we were
97                            // able to read more than `chunk_size` bytes.
98                            // We could have read more
99                            let split_to = usize::min(chunk_size, buf.len());
100                            // Splitting off bytes reduces the remaining capacity of our BytesMut.
101                            // It might have now reached a capacity of 0. But this is fine!
102                            // The next usage of it in `read_buf` will not return `0`, as you may
103                            // expect from the read_buf documentation. The BytesMut will grow
104                            // to allow buffering of more data.
105                            //
106                            // NOTE: If we only split of at max `chunk_size` bytes, we have to repeat
107                            // this, unless all data is processed.
108                            send_chunk(Some(Chunk(buf.split_to(split_to).freeze())));
109                        }
110                    }
111                };
112
113                if is_eof {
114                    break;
115                }
116            }
117            Err(err) => panic!("Could not read from stream: {err}"),
118        }
119    }
120}
121
122impl BroadcastOutputStream {
123    pub(crate) fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
124        stream: S,
125        options: FromStreamOptions,
126    ) -> BroadcastOutputStream {
127        let (sender, receiver) = broadcast::channel::<Option<Chunk>>(options.channel_capacity);
128        drop(receiver);
129
130        let stream_reader = tokio::spawn(read_chunked(stream, options.chunk_size, sender.clone()));
131
132        BroadcastOutputStream {
133            stream_reader,
134            sender,
135        }
136    }
137
138    fn subscribe(&self) -> broadcast::Receiver<Option<Chunk>> {
139        self.sender.subscribe()
140    }
141}
142
// Loops over a broadcast subscription, running `$body` for every received item, until the
// channel is closed or `$term_rx` resolves.
//
// Expected arguments:
// loop_label: the label given to the loop (a lifetime-like token such as `'outer`),
// receiver: tokio::sync::broadcast::Receiver<Option<Chunk>>,
// term_rx: tokio::sync::oneshot::Receiver<()>,
// |chunk| { ... }: the name under which each received item is bound, and the block run for it.
macro_rules! handle_subscription {
    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
        $loop_label: loop {
            tokio::select! {
                received = $receiver.recv() => {
                    match received {
                        Ok($chunk) => $body,
                        Err(RecvError::Lagged(lagged)) => {
                            // Items were missed by this receiver. Report it and keep going.
                            tracing::warn!(lagged, "Inspector is lagging behind");
                        }
                        // All senders have been dropped.
                        Err(RecvError::Closed) => break $loop_label,
                    }
                }
                _terminated = &mut $term_rx => break $loop_label,
            }
        }
    };
}
171
// Impls for inspecting the output of the stream.
//
// Every method creates its own subscription and hands it, together with the callback, to the
// matching `impl_inspect_*` macro. Output that was broadcast before the subscription existed is
// not replayed, as the reader task keeps no history.
impl BroadcastOutputStream {
    /// Inspects the stream chunk by chunk, calling `f` with each `Chunk`.
    ///
    /// The returned [`Inspector`] must be kept alive for the callback to run (see the
    /// `must_use` message).
    ///
    /// NOTE(review): the `Next` value returned by `f` presumably decides whether inspection
    /// continues or stops; the handling lives in `impl_inspect_chunks!` and is not visible here.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_chunks(&self, mut f: impl FnMut(Chunk) -> Next + Send + 'static) -> Inspector {
        let mut receiver = self.subscribe();
        impl_inspect_chunks!(receiver, f, handle_subscription)
    }

    /// Inspects the stream line by line, calling `f` with each line as a `String`.
    ///
    /// `options` is forwarded to the line parsing done inside `impl_inspect_lines!`.
    /// The returned [`Inspector`] must be kept alive for the callback to run (see the
    /// `must_use` message).
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_lines(
        &self,
        mut f: impl FnMut(String) -> Next + Send + 'static,
        options: LineParsingOptions,
    ) -> Inspector {
        let mut receiver = self.subscribe();
        impl_inspect_lines!(receiver, f, options, handle_subscription)
    }

    /// Like [`Self::inspect_lines`], but `f` returns a future resolving to `Next`.
    ///
    /// The returned [`Inspector`] must be kept alive for the callback to run (see the
    /// `must_use` message).
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_lines_async<Fut>(
        &self,
        mut f: impl FnMut(String) -> Fut + Send + 'static,
        options: LineParsingOptions,
    ) -> Inspector
    where
        Fut: Future<Output = Next> + Send,
    {
        let mut receiver = self.subscribe();
        impl_inspect_lines_async!(receiver, f, options, handle_subscription)
    }
}
203
// Impls for collecting the output of the stream.
//
// The four base methods wrap the sink in an `Arc<RwLock<_>>`, create their own subscription and
// delegate to the matching `impl_collect_*` macro. The remaining methods are conveniences built
// on top of them.
impl BroadcastOutputStream {
    /// Collects chunks into the sink `into`, calling `collect` with each `Chunk` and a mutable
    /// reference to the sink.
    ///
    /// The returned [`Collector`] must be kept alive for the callback to run (see the
    /// `must_use` message).
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks<S: Sink>(
        &self,
        into: S,
        mut collect: impl FnMut(Chunk, &mut S) + Send + 'static,
    ) -> Collector<S> {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.subscribe();
        impl_collect_chunks!(receiver, collect, sink, handle_subscription)
    }

    /// Like [`Self::collect_chunks`], but `collect` returns an `AsyncCollectFn` future.
    ///
    /// NOTE(review): unlike `collect_lines_async`, `F` is not required to be `Sync` here;
    /// confirm whether the difference is intended.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_async<S, F>(&self, into: S, collect: F) -> Collector<S>
    where
        S: Sink,
        F: Fn(Chunk, &mut S) -> AsyncCollectFn<'_> + Send + 'static,
    {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.subscribe();
        impl_collect_chunks_async!(receiver, collect, sink, handle_subscription)
    }

    /// Collects lines into the sink `into`, calling `collect` with each line and a mutable
    /// reference to the sink.
    ///
    /// `options` is forwarded to the line parsing done inside `impl_collect_lines!`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines<S: Sink>(
        &self,
        into: S,
        mut collect: impl FnMut(String, &mut S) -> Next + Send + 'static,
        options: LineParsingOptions,
    ) -> Collector<S> {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.subscribe();
        impl_collect_lines!(receiver, collect, options, sink, handle_subscription)
    }

    /// Like [`Self::collect_lines`], but `collect` returns an `AsyncCollectFn` future.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_async<S, F>(
        &self,
        into: S,
        collect: F,
        options: LineParsingOptions,
    ) -> Collector<S>
    where
        S: Sink,
        F: Fn(String, &mut S) -> AsyncCollectFn<'_> + Send + Sync + 'static,
    {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.subscribe();
        impl_collect_lines_async!(receiver, collect, options, sink, handle_subscription)
    }

    /// Collects the bytes of every chunk into a `Vec<u8>`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
        self.collect_chunks(Vec::new(), |chunk, vec| vec.extend(chunk.as_ref()))
    }

    /// Collects every line into a `Vec<String>`. The callback always returns `Next::Continue`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
        self.collect_lines(
            Vec::new(),
            |line, vec| {
                vec.push(line);
                Next::Continue
            },
            options,
        )
    }

    /// Writes the bytes of every chunk into `write` using `write_all`.
    ///
    /// A failed write is logged as a warning; the callback returns `Next::Continue` regardless.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
        &self,
        write: W,
    ) -> Collector<W> {
        self.collect_chunks_async(write, move |chunk, write| {
            Box::pin(async move {
                if let Err(err) = write.write_all(chunk.as_ref()).await {
                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
                };
                Next::Continue
            })
        })
    }

    /// Writes the bytes of every line into `write` using `write_all`.
    ///
    /// A failed write is logged as a warning; the callback returns `Next::Continue` regardless.
    ///
    /// NOTE(review): only `line.as_bytes()` is written, no separator is added here. If the
    /// lines handed to the callback do not carry their line terminator, the output is the
    /// concatenation of all lines; confirm that this is intended.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
        &self,
        write: W,
        options: LineParsingOptions,
    ) -> Collector<W> {
        self.collect_lines_async(
            write,
            move |line, write| {
                Box::pin(async move {
                    if let Err(err) = write.write_all(line.as_bytes()).await {
                        tracing::warn!("Could not write line to write sink: {err:#?}");
                    };
                    Next::Continue
                })
            },
            options,
        )
    }

    /// Passes every chunk through `mapper` and writes the resulting bytes into `write` using
    /// `write_all`.
    ///
    /// A failed write is logged as a warning; the callback returns `Next::Continue` regardless.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_write_mapped<
        W: Sink + AsyncWriteExt + Unpin,
        B: AsRef<[u8]> + Send,
    >(
        &self,
        write: W,
        mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
    ) -> Collector<W> {
        self.collect_chunks_async(write, move |chunk, write| {
            Box::pin(async move {
                let mapped = mapper(chunk);
                let mapped = mapped.as_ref();
                if let Err(err) = write.write_all(mapped).await {
                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
                };
                Next::Continue
            })
        })
    }

    /// Passes every line through `mapper` and writes the resulting bytes into `write` using
    /// `write_all`.
    ///
    /// A failed write is logged as a warning; the callback returns `Next::Continue` regardless.
    /// No separator is added between lines by this method; `mapper` can add one.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_write_mapped<
        W: Sink + AsyncWriteExt + Unpin,
        B: AsRef<[u8]> + Send,
    >(
        &self,
        write: W,
        mapper: impl Fn(String) -> B + Send + Sync + Copy + 'static,
        options: LineParsingOptions,
    ) -> Collector<W> {
        self.collect_lines_async(
            write,
            move |line, write| {
                Box::pin(async move {
                    let mapped = mapper(line);
                    let mapped = mapped.as_ref();
                    if let Err(err) = write.write_all(mapped).await {
                        tracing::warn!("Could not write line to write sink: {err:#?}");
                    };
                    Next::Continue
                })
            },
            options,
        )
    }
}
355
356// Impls for waiting for a specific line of output.
357impl BroadcastOutputStream {
358    pub async fn wait_for_line(
359        &self,
360        predicate: impl Fn(String) -> bool + Send + Sync + 'static,
361        options: LineParsingOptions,
362    ) {
363        let inspector = self.inspect_lines(
364            move |line| {
365                if predicate(line) {
366                    Next::Break
367                } else {
368                    Next::Continue
369                }
370            },
371            options,
372        );
373        match inspector.wait().await {
374            Ok(()) => {}
375            Err(err) => match err {
376                InspectorError::TaskJoin(join_error) => {
377                    panic!("Inspector task join error: {join_error:#?}");
378                }
379            },
380        };
381    }
382
383    pub async fn wait_for_line_with_timeout(
384        &self,
385        predicate: impl Fn(String) -> bool + Send + Sync + 'static,
386        options: LineParsingOptions,
387        timeout: Duration,
388    ) -> Result<(), Elapsed> {
389        tokio::time::timeout(timeout, self.wait_for_line(predicate, options)).await
390    }
391}
392
/// Configuration for line parsing behavior.
///
/// NOTE(review): nothing else in this file references this struct, and no `Default`
/// implementation is visible here; confirm how it relates to `LineParsingOptions` and where
/// the documented default comes from.
pub struct LineConfig {
    // Existing fields
    // ...
    /// Maximum length of a single line in bytes.
    /// When reached, the current line will be emitted.
    /// A value of 0 means no limit (default).
    pub max_line_length: usize,
}
402
#[cfg(test)]
mod tests {
    use super::read_chunked;
    use crate::output_stream::broadcast::BroadcastOutputStream;
    use crate::output_stream::tests::write_test_data;
    use crate::output_stream::{FromStreamOptions, LineParsingOptions, Next};
    use assertr::assert_that;
    use assertr::prelude::{LengthAssertions, PartialEqAssertions, VecAssertions};
    use mockall::*;
    use std::io::Read;
    use std::io::Seek;
    use std::io::SeekFrom;
    use std::io::Write;
    use std::time::Duration;
    use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
    use tokio::sync::broadcast;
    use tokio::time::sleep;
    use tracing_test::traced_test;

    #[tokio::test]
    #[traced_test]
    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
    {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let (tx, mut rx) = broadcast::channel(10);

        // Let's preemptively write more data into the stream than our later selected chunk size (2)
        // can handle, forcing the initial read to completely fill our chunk buffer.
        // Our expectation is that we still receive all data written here through multiple
        // consecutive reads.
        // The behavior of bytes::BytesMut, potentially reaching zero capacity when splitting off
        // a full buffer, must not prevent this from happening!
        write_half.write_all(b"hello world").await.unwrap();
        write_half.flush().await.unwrap();

        let stream_reader = tokio::spawn(read_chunked(read_half, 2, tx));

        drop(write_half); // This closes the stream and should let stream_reader terminate.
        stream_reader.await.unwrap();

        let mut chunks = Vec::<String>::new();
        while let Ok(Some(chunk)) = rx.recv().await {
            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
        }
        assert_that(chunks).contains_exactly(&["he", "ll", "o ", "wo", "rl", "d"]);
    }

    #[tokio::test]
    #[traced_test]
    async fn read_chunked_no_data() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let (tx, mut rx) = broadcast::channel(10);

        let stream_reader = tokio::spawn(read_chunked(read_half, 2, tx));

        drop(write_half); // This closes the stream and should let stream_reader terminate.
        stream_reader.await.unwrap();

        let mut chunks = Vec::<String>::new();
        while let Ok(Some(chunk)) = rx.recv().await {
            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
        }
        assert_that(chunks).is_empty();
    }

    #[tokio::test]
    #[traced_test]
    async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            FromStreamOptions {
                channel_capacity: 2,
                ..Default::default()
            },
        );

        let consumer = os.inspect_lines_async(
            async |_line| {
                // Mimic a slow consumer.
                sleep(Duration::from_millis(100)).await;
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        #[rustfmt::skip]
        let producer = tokio::spawn(async move {
            for count in 1..=15 {
                // `write_all` guarantees that the whole line is written; a plain `write` may
                // write only part of the buffer and its returned byte count was ignored.
                write_half
                    .write_all(format!("{count}\n").as_bytes())
                    .await
                    .unwrap();
                sleep(Duration::from_millis(25)).await;
            }
            write_half.flush().await.unwrap();
            drop(write_half);
        });

        producer.await.unwrap();
        consumer.wait().await.unwrap();
        drop(os);

        logs_assert(|lines: &[&str]| {
            match lines
                .iter()
                .filter(|line| line.contains("Inspector is lagging behind lagged=1"))
                .count()
            {
                1 => {}
                n => return Err(format!("Expected exactly one lagged=1 log, but found {n}")),
            };
            match lines
                .iter()
                .filter(|line| line.contains("Inspector is lagging behind lagged=3"))
                .count()
            {
                // Timing dependent: both two and three occurrences are accepted.
                2 | 3 => {}
                n => return Err(format!("Expected two or three lagged=3 logs, but found {n}")),
            };
            Ok(())
        });
    }

    #[tokio::test]
    async fn inspect_lines() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(read_half, FromStreamOptions::default());

        #[automock]
        trait LineVisitor {
            fn visit(&self, line: String);
        }

        let mut mock = MockLineVisitor::new();
        #[rustfmt::skip]
        fn configure(mock: &mut MockLineVisitor) {
            mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
        }
        configure(&mut mock);

        let inspector = os.inspect_lines(
            move |line| {
                mock.visit(line);
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        inspector.cancel().await.unwrap();
        drop(os)
    }

    /// This tests that our impl macros properly `break 'outer`, as they might be in an inner loop!
    /// With `break` instead of `break 'outer`, this test would never complete, as the `Next::Break`
    /// would not terminate the collector!
    ///
    /// Despite its name, this test drives `collect_lines_async`.
    #[tokio::test]
    #[traced_test]
    async fn inspect_lines_async() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        let seen: Vec<String> = Vec::new();
        let collector = os.collect_lines_async(
            seen,
            move |line, seen: &mut Vec<String>| {
                Box::pin(async move {
                    if line == "break" {
                        seen.push(line);
                        Next::Break
                    } else {
                        seen.push(line);
                        Next::Continue
                    }
                })
            },
            LineParsingOptions::default(),
        );

        let _writer = tokio::spawn(async move {
            write_half.write_all("start\n".as_bytes()).await.unwrap();
            write_half.write_all("break\n".as_bytes()).await.unwrap();
            write_half.write_all("end\n".as_bytes()).await.unwrap();

            loop {
                write_half
                    .write_all("gibberish\n".as_bytes())
                    .await
                    .unwrap();
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        });

        let seen = collector.wait().await.unwrap();

        assert_that(seen).contains_exactly(&["start", "break"]);
    }

    #[tokio::test]
    async fn collect_lines_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            FromStreamOptions {
                channel_capacity: 32,
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines(
            temp_file,
            |line, temp_file| {
                writeln!(temp_file, "{}", line).unwrap();
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    #[tokio::test]
    async fn collect_lines_async_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines_async(
            temp_file,
            |line, temp_file| {
                Box::pin(async move {
                    writeln!(temp_file, "{}", line).unwrap();
                    Next::Continue
                })
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    #[tokio::test]
    #[traced_test]
    async fn collect_chunks_into_write_mapped() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        // The file name carries the `broadcast_stream` prefix, like the other files created by
        // this module, so that it cannot clash with a file used by another module's tests.
        let temp_file = tokio::fs::File::options()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open(std::env::temp_dir().join(
                "tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_mapped.txt",
            ))
            .await
            .unwrap();

        let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
            String::from_utf8_lossy(chunk.as_ref()).to_string()
        });

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).await.unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    #[tokio::test]
    #[traced_test]
    async fn collect_chunks_into_write_in_parallel() {
        // Big enough to hold any individual test write that we perform.
        let (read_half, write_half) = tokio::io::duplex(64);

        let os = BroadcastOutputStream::from_stream(
            read_half,
            FromStreamOptions {
                // Big enough to hold any individual test write that we perform.
                // Actual chunks will be smaller.
                chunk_size: 64,
                channel_capacity: 2,
                ..Default::default()
            },
        );

        let file1 = tokio::fs::File::options()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open(
                std::env::temp_dir()
                    .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_1.txt"),
            )
            .await
            .unwrap();
        let file2 = tokio::fs::File::options()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open(
                std::env::temp_dir()
                    .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_2.txt"),
            )
            .await
            .unwrap();

        let collector1 = os.collect_chunks_into_write(file1);
        let collector2 = os.collect_chunks_into_write_mapped(file2, |chunk| {
            format!("ok-{}", String::from_utf8_lossy(chunk.as_ref()))
        });

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file1 = collector1.cancel().await.unwrap();
        temp_file1.seek(SeekFrom::Start(0)).await.unwrap();
        let mut contents = String::new();
        temp_file1.read_to_string(&mut contents).await.unwrap();
        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");

        let mut temp_file2 = collector2.cancel().await.unwrap();
        temp_file2.seek(SeekFrom::Start(0)).await.unwrap();
        let mut contents = String::new();
        temp_file2.read_to_string(&mut contents).await.unwrap();
        assert_that(contents)
            .is_equal_to("ok-Cargo.lock\nok-Cargo.toml\nok-README.md\nok-src\nok-target\n");
    }
}