tokio_process_tools/output_stream/
single_subscriber.rs

1use crate::collector::{AsyncCollectFn, Collector, Sink};
2use crate::inspector::Inspector;
3use crate::output_stream::impls::{
4    impl_collect_chunks, impl_collect_chunks_async, impl_collect_lines, impl_collect_lines_async,
5    impl_inspect_chunks, impl_inspect_lines, impl_inspect_lines_async,
6};
7use crate::output_stream::{
8    BackpressureControl, Chunk, FromStreamOptions, LineReader, Next, OutputStream,
9};
10use crate::{InspectorError, LineParsingOptions};
11use std::fmt::{Debug, Formatter};
12use std::future::Future;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
16use tokio::sync::mpsc::error::TrySendError;
17use tokio::sync::{RwLock, mpsc};
18use tokio::task::JoinHandle;
19use tokio::time::error::Elapsed;
20
/// The output stream from a process. Either representing stdout or stderr.
///
/// This is the single-subscriber variant, allowing for just one consumer.
/// This has the upside of requiring as few memory allocations as possible.
/// If multiple concurrent inspections are required, prefer using the
/// `output_stream::broadcast::BroadcastOutputStream`.
///
/// The one available receiver is handed out to the first inspector or collector that is
/// created from this stream. Requesting a second one panics.
///
/// Dropping the stream aborts the background reader task.
pub struct SingleSubscriberOutputStream {
    /// The task that captured our `mpsc::Sender` and is now asynchronously awaiting
    /// new output from the underlying stream, sending it to our registered receiver (if present).
    stream_reader: JoinHandle<()>,

    /// The receiver is wrapped in an `Option` so that we can take it out of it and move it
    /// into an inspector or collector task.
    ///
    /// Each received item is an `Option<Chunk>`; the reader task sends `None` once the
    /// underlying stream reported end-of-file.
    receiver: Option<mpsc::Receiver<Option<Chunk>>>,
}
36
// Registers this type as an `OutputStream`. No trait items are defined in this impl block.
impl OutputStream for SingleSubscriberOutputStream {}
38
impl Drop for SingleSubscriberOutputStream {
    /// Aborts the background reader task so that it does not outlive this stream handle.
    fn drop(&mut self) {
        // `abort` only requests cancellation of the task; it does not wait for it to finish.
        self.stream_reader.abort();
    }
}
44
45impl Debug for SingleSubscriberOutputStream {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("SingleSubscriberOutputStream")
48            .field("output_collector", &"non-debug < JoinHandle<()> >")
49            .field(
50                "receiver",
51                &"non-debug < tokio::sync::mpsc::Receiver<Option<Chunk>> >",
52            )
53            .finish()
54    }
55}
56
57/// Uses a single `bytes::BytesMut` instance into which the input stream is read.
58/// Every chunk sent into `sender` is a frozen slice of that buffer.
59/// Once chunks were handled by all active receivers, the space of the chunk is reclaimed and reused.
60async fn read_chunked<R: AsyncRead + Unpin + Send + 'static>(
61    mut read: R,
62    chunk_size: usize,
63    sender: mpsc::Sender<Option<Chunk>>,
64    backpressure_control: BackpressureControl,
65) {
66    struct AfterSend {
67        do_break: bool,
68    }
69
70    fn try_send_chunk(
71        chunk: Option<Chunk>,
72        sender: &mpsc::Sender<Option<Chunk>>,
73        lagged: &mut usize,
74    ) -> AfterSend {
75        match sender.try_send(chunk) {
76            Ok(()) => {
77                if *lagged > 0 {
78                    tracing::debug!(lagged, "Stream reader is lagging behind");
79                    *lagged = 0;
80                }
81            }
82            Err(err) => {
83                match err {
84                    TrySendError::Full(_data) => {
85                        *lagged += 1;
86                    }
87                    TrySendError::Closed(_data) => {
88                        // All receivers already dropped.
89                        // We intentionally ignore this error.
90                        // If it occurs, the user just isn't interested in
91                        // newer chunks anymore.
92                        return AfterSend { do_break: true };
93                    }
94                }
95            }
96        }
97        AfterSend { do_break: false }
98    }
99
100    async fn send_chunk(chunk: Option<Chunk>, sender: &mpsc::Sender<Option<Chunk>>) -> AfterSend {
101        match sender.send(chunk).await {
102            Ok(()) => {}
103            Err(_err) => {
104                // All receivers already dropped.
105                // We intentionally ignore this error.
106                // If it occurs, the user just isn't interested in
107                // newer chunks anymore.
108                return AfterSend { do_break: true };
109            }
110        }
111        AfterSend { do_break: false }
112    }
113
114    // NOTE: buf may grow when required!
115    let mut buf = bytes::BytesMut::with_capacity(chunk_size);
116    let mut lagged: usize = 0;
117    loop {
118        let _ = buf.try_reclaim(chunk_size);
119        match read.read_buf(&mut buf).await {
120            Ok(bytes_read) => {
121                let is_eof = bytes_read == 0;
122
123                match is_eof {
124                    true => match backpressure_control {
125                        BackpressureControl::DropLatestIncomingIfBufferFull => {
126                            let after = try_send_chunk(None, &sender, &mut lagged);
127                            if after.do_break {
128                                break;
129                            }
130                        }
131                        BackpressureControl::BlockUntilBufferHasSpace => {
132                            let after = send_chunk(None, &sender).await;
133                            if after.do_break {
134                                break;
135                            }
136                        }
137                    },
138                    false => {
139                        while !buf.is_empty() {
140                            let split_to = usize::min(chunk_size, buf.len());
141
142                            match backpressure_control {
143                                BackpressureControl::DropLatestIncomingIfBufferFull => {
144                                    let after = try_send_chunk(
145                                        Some(Chunk(buf.split_to(split_to).freeze())),
146                                        &sender,
147                                        &mut lagged,
148                                    );
149                                    if after.do_break {
150                                        break;
151                                    }
152                                }
153                                BackpressureControl::BlockUntilBufferHasSpace => {
154                                    let after = send_chunk(
155                                        Some(Chunk(buf.split_to(split_to).freeze())),
156                                        &sender,
157                                    )
158                                    .await;
159                                    if after.do_break {
160                                        break;
161                                    }
162                                }
163                            }
164                        }
165                    }
166                };
167
168                if is_eof {
169                    break;
170                }
171            }
172            Err(err) => panic!("Could not read from stream: {err}"),
173        }
174    }
175}
176
177impl SingleSubscriberOutputStream {
178    pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
179        stream: S,
180        backpressure_control: BackpressureControl,
181        options: FromStreamOptions,
182    ) -> SingleSubscriberOutputStream {
183        let (tx_stdout, rx_stdout) = mpsc::channel::<Option<Chunk>>(options.channel_capacity);
184
185        let stream_reader = tokio::spawn(read_chunked(
186            stream,
187            options.chunk_size,
188            tx_stdout,
189            backpressure_control,
190        ));
191
192        SingleSubscriberOutputStream {
193            stream_reader,
194            receiver: Some(rx_stdout),
195        }
196    }
197
198    fn take_receiver(&mut self) -> mpsc::Receiver<Option<Chunk>> {
199        self.receiver.take().expect("Receiver not yet to be taken. The SingleSubscriberOutputStream only supports one subscriber, but one was already created.")
200    }
201}
202
// Expands to a labeled loop which feeds every item received from `$receiver` into `$body`.
//
// Arguments:
// - `$loop_label`: label given to the generated loop, so that `$body` can `break` out of it.
// - `$receiver`: the channel receiver to read from.
// - `$term_rx`: termination signal; the loop ends as soon as this future completes.
// - `|$chunk| $body`: `$chunk` is bound to each received item (an `Option<Chunk>`, not yet
//   unwrapped) before `$body` is executed.
//
// Expected types:
// receiver: tokio::sync::mpsc::Receiver<Option<Chunk>>
// term_rx: tokio::sync::oneshot::Receiver<()>
macro_rules! handle_subscription {
    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
        $loop_label: loop {
            // Wait for whichever happens first: a new item or the termination signal.
            tokio::select! {
                out = $receiver.recv() => {
                    match out {
                        Some(maybe_chunk) => {
                            let $chunk = maybe_chunk;
                            $body
                        }
                        None => {
                            // All senders have been dropped.
                            break $loop_label;
                        }
                    }
                }
                // Any completion of the termination future (its output is ignored) ends the loop.
                _msg = &mut $term_rx => break $loop_label,
            }
        }
    };
}
227
// Impls for inspecting the output of the stream.
//
// Every method in this block takes the single receiver of the stream (see `take_receiver`)
// and therefore panics if a subscriber was already created before.
impl SingleSubscriberOutputStream {
    /// Creates an [`Inspector`] which passes chunks of this stream to `f`.
    ///
    /// The inspector itself is generated by the `impl_inspect_chunks!` macro, which is handed
    /// the receiver, `f` and the `handle_subscription` macro.
    /// NOTE(review): how the returned `Next` is interpreted is decided inside that macro;
    /// presumably `Next::Break` ends the inspection - confirm there.
    ///
    /// # Panics
    ///
    /// Panics if the receiver of this stream was already taken.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_chunks(&mut self, f: impl Fn(Chunk) -> Next + Send + 'static) -> Inspector {
        let mut receiver = self.take_receiver();
        impl_inspect_chunks!(receiver, f, handle_subscription)
    }

    /// Creates an [`Inspector`] which passes lines of this stream to `f`.
    ///
    /// `options` is forwarded to the line parsing performed by the `impl_inspect_lines!`
    /// macro, which generates the inspector from the receiver, `f`, `options` and the
    /// `handle_subscription` macro.
    ///
    /// # Panics
    ///
    /// Panics if the receiver of this stream was already taken.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_lines(
        &mut self,
        mut f: impl FnMut(String) -> Next + Send + 'static,
        options: LineParsingOptions,
    ) -> Inspector {
        let mut receiver = self.take_receiver();
        impl_inspect_lines!(receiver, f, options, handle_subscription)
    }

    /// Like [`Self::inspect_lines`], but `f` returns a future resolving to the [`Next`] value.
    ///
    /// The inspector is generated by the `impl_inspect_lines_async!` macro.
    ///
    /// # Panics
    ///
    /// Panics if the receiver of this stream was already taken.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_lines_async<Fut>(
        &mut self,
        mut f: impl FnMut(String) -> Fut + Send + 'static,
        options: LineParsingOptions,
    ) -> Inspector
    where
        Fut: Future<Output = Next> + Send,
    {
        let mut receiver = self.take_receiver();
        impl_inspect_lines_async!(receiver, f, options, handle_subscription)
    }
}
259
// Impls for collecting the output of the stream.
//
// Every method in this block (directly or through the method it delegates to) takes the single
// receiver of the stream (see `take_receiver`) and therefore panics if a subscriber was
// already created before.
impl SingleSubscriberOutputStream {
    /// Creates a [`Collector`] which calls `collect` with chunks of this stream and the sink.
    ///
    /// `into` is the sink. It is wrapped in an `Arc<RwLock<_>>` and handed, together with the
    /// receiver and `collect`, to the `impl_collect_chunks!` macro which generates the collector.
    ///
    /// # Panics
    ///
    /// Panics if the receiver of this stream was already taken.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks<S: Sink>(
        &mut self,
        into: S,
        collect: impl Fn(Chunk, &mut S) + Send + 'static,
    ) -> Collector<S> {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.take_receiver();
        impl_collect_chunks!(receiver, collect, sink, handle_subscription)
    }

    /// Like [`Self::collect_chunks`], but `collect` returns an [`AsyncCollectFn`].
    ///
    /// The collector is generated by the `impl_collect_chunks_async!` macro.
    ///
    /// # Panics
    ///
    /// Panics if the receiver of this stream was already taken.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_async<S, F>(&mut self, into: S, collect: F) -> Collector<S>
    where
        S: Sink,
        F: Fn(Chunk, &mut S) -> AsyncCollectFn<'_> + Send + 'static,
    {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.take_receiver();
        impl_collect_chunks_async!(receiver, collect, sink, handle_subscription)
    }

    /// Creates a [`Collector`] which calls `collect` with lines of this stream and the sink.
    ///
    /// `options` is forwarded to the line parsing performed by the `impl_collect_lines!` macro,
    /// which generates the collector. `collect` returns a [`Next`] value for every line.
    ///
    /// # Panics
    ///
    /// Panics if the receiver of this stream was already taken.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines<S: Sink>(
        &mut self,
        into: S,
        collect: impl Fn(String, &mut S) -> Next + Send + 'static,
        options: LineParsingOptions,
    ) -> Collector<S> {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.take_receiver();
        impl_collect_lines!(receiver, collect, options, sink, handle_subscription)
    }

    /// Like [`Self::collect_lines`], but `collect` returns an [`AsyncCollectFn`].
    ///
    /// The collector is generated by the `impl_collect_lines_async!` macro.
    ///
    /// # Panics
    ///
    /// Panics if the receiver of this stream was already taken.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_async<S, F>(
        &mut self,
        into: S,
        collect: F,
        options: LineParsingOptions,
    ) -> Collector<S>
    where
        S: Sink,
        F: Fn(String, &mut S) -> AsyncCollectFn<'_> + Send + Sync + 'static,
    {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.take_receiver();
        impl_collect_lines_async!(receiver, collect, options, sink, handle_subscription)
    }

    /// Collects all chunks into a `Vec<u8>` by appending the bytes of every chunk.
    ///
    /// Convenience wrapper around [`Self::collect_chunks`], starting with an empty vector.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_vec(&mut self) -> Collector<Vec<u8>> {
        self.collect_chunks(Vec::new(), |chunk, vec| vec.extend(chunk.as_ref()))
    }

    /// Collects all lines into a `Vec<String>`, one element per line.
    ///
    /// Convenience wrapper around [`Self::collect_lines`], starting with an empty vector.
    /// The callback always returns `Next::Continue`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_vec(
        &mut self,
        options: LineParsingOptions,
    ) -> Collector<Vec<String>> {
        self.collect_lines(
            Vec::new(),
            |line, vec| {
                vec.push(line);
                Next::Continue
            },
            options,
        )
    }

    /// Writes the bytes of every chunk into `write` using `write_all`.
    ///
    /// A failed write is logged as a warning and otherwise ignored: the callback still
    /// returns `Next::Continue`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
        &mut self,
        write: W,
    ) -> Collector<W> {
        self.collect_chunks_async(write, move |chunk, write| {
            Box::pin(async move {
                if let Err(err) = write.write_all(chunk.as_ref()).await {
                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
                };
                Next::Continue
            })
        })
    }

    /// Writes the bytes of every line into `write` using `write_all`.
    ///
    /// Only the bytes of the `String` passed to the callback are written; this method does
    /// not append any line terminator itself. A failed write is logged as a warning and
    /// otherwise ignored: the callback still returns `Next::Continue`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
        &mut self,
        write: W,
        options: LineParsingOptions,
    ) -> Collector<W> {
        self.collect_lines_async(
            write,
            move |line, write| {
                Box::pin(async move {
                    if let Err(err) = write.write_all(line.as_bytes()).await {
                        tracing::warn!("Could not write line to write sink: {err:#?}");
                    };
                    Next::Continue
                })
            },
            options,
        )
    }

    /// Passes every chunk through `mapper` and writes the resulting bytes into `write`.
    ///
    /// A failed write is logged as a warning and otherwise ignored: the callback still
    /// returns `Next::Continue`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_write_mapped<
        W: Sink + AsyncWriteExt + Unpin,
        B: AsRef<[u8]> + Send,
    >(
        &mut self,
        write: W,
        mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
    ) -> Collector<W> {
        self.collect_chunks_async(write, move |chunk, write| {
            Box::pin(async move {
                let mapped = mapper(chunk);
                let mapped = mapped.as_ref();
                if let Err(err) = write.write_all(mapped).await {
                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
                };
                Next::Continue
            })
        })
    }

    /// Passes every line through `mapper` and writes the resulting bytes into `write`.
    ///
    /// Exactly the bytes returned by `mapper` are written; this method does not append any
    /// line terminator itself. A failed write is logged as a warning and otherwise ignored:
    /// the callback still returns `Next::Continue`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_write_mapped<
        W: Sink + AsyncWriteExt + Unpin,
        B: AsRef<[u8]> + Send,
    >(
        &mut self,
        write: W,
        mapper: impl Fn(String) -> B + Send + Sync + Copy + 'static,
        options: LineParsingOptions,
    ) -> Collector<W> {
        self.collect_lines_async(
            write,
            move |line, write| {
                Box::pin(async move {
                    let mapped = mapper(line);
                    let mapped = mapped.as_ref();
                    if let Err(err) = write.write_all(mapped).await {
                        tracing::warn!("Could not write line to write sink: {err:#?}");
                    };
                    Next::Continue
                })
            },
            options,
        )
    }
}
414
415// Impls for waiting for a specific line of output.
416impl SingleSubscriberOutputStream {
417    pub async fn wait_for_line(
418        &mut self,
419        predicate: impl Fn(String) -> bool + Send + Sync + 'static,
420        options: LineParsingOptions,
421    ) {
422        let inspector = self.inspect_lines(
423            move |line| {
424                if predicate(line) {
425                    Next::Break
426                } else {
427                    Next::Continue
428                }
429            },
430            options,
431        );
432        match inspector.wait().await {
433            Ok(()) => {}
434            Err(err) => match err {
435                InspectorError::TaskJoin(join_error) => {
436                    panic!("Inspector task join error: {join_error:#?}");
437                }
438            },
439        };
440    }
441
442    pub async fn wait_for_line_with_timeout(
443        &mut self,
444        predicate: impl Fn(String) -> bool + Send + Sync + 'static,
445        options: LineParsingOptions,
446        timeout: Duration,
447    ) -> Result<(), Elapsed> {
448        tokio::time::timeout(timeout, self.wait_for_line(predicate, options)).await
449    }
450}
451
// Tests for the single-subscriber output stream. All streams are backed by in-memory
// `tokio::io::duplex` pipes instead of real processes.
#[cfg(test)]
mod tests {
    use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
    use crate::output_stream::tests::write_test_data;
    use crate::output_stream::{BackpressureControl, FromStreamOptions, Next};
    use crate::single_subscriber::read_chunked;
    use crate::LineParsingOptions;
    use assertr::prelude::*;
    use mockall::{automock, predicate};
    use std::io::{Read, Seek, SeekFrom, Write};
    use std::time::Duration;
    use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
    use tokio::sync::mpsc;
    use tokio::time::sleep;
    use tracing_test::traced_test;

    /// Runs `read_chunked` directly with a chunk size of 2 on a stream that already contains
    /// 11 bytes and expects all data to arrive, split into chunks of at most 2 bytes.
    #[tokio::test]
    #[traced_test]
    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
    {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let (tx, mut rx) = mpsc::channel(64);

        // Let's preemptively write more data into the stream than our later selected chunk size (2)
        // can handle, forcing the initial read to completely fill our chunk buffer.
        // Our expectation is that we still receive all data written here through multiple
        // consecutive reads.
        // The behavior of bytes::BytesMut, potentially reaching zero capacity when splitting a
        // full buffer of, must not prevent this from happening but allocate more memory instead!
        write_half.write_all(b"hello world").await.unwrap();
        write_half.flush().await.unwrap();

        let stream_reader = tokio::spawn(read_chunked(
            read_half,
            2,
            tx,
            BackpressureControl::DropLatestIncomingIfBufferFull,
        ));

        drop(write_half); // This closes the stream and should let stream_reader terminate.
        stream_reader.await.unwrap();

        // Drain the channel until the `None` end marker (or the closed channel) is reached.
        let mut chunks = Vec::<String>::new();
        while let Some(Some(chunk)) = rx.recv().await {
            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
        }
        assert_that(chunks).contains_exactly(&["he", "ll", "o ", "wo", "rl", "d"]);
    }

    /// Pairs a slow consumer (100ms per line) with a faster producer (one line every 25ms)
    /// on a channel of capacity 2 and checks the emitted "lagging behind" log lines.
    #[tokio::test]
    #[traced_test]
    async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                channel_capacity: 2,
                ..Default::default()
            },
        );

        let inspector = os.inspect_lines_async(
            async |_line| {
                // Mimic a slow consumer.
                sleep(Duration::from_millis(100)).await;
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        #[rustfmt::skip]
        let producer = tokio::spawn(async move {
            for count in 1..=15 {
                write_half
                    .write(format!("{count}\n").as_bytes())
                    .await
                    .unwrap();
                sleep(Duration::from_millis(25)).await;
            }
        });

        producer.await.unwrap();
        inspector.wait().await.unwrap();
        drop(os);

        // Expect exactly one `lagged=1` log line and exactly two `lagged=3` log lines.
        logs_assert(|lines: &[&str]| {
            match lines
                .iter()
                .filter(|line| line.contains("Stream reader is lagging behind lagged=1"))
                .count()
            {
                1 => {}
                n => return Err(format!("Expected exactly one lagged=1 log, but found {n}")),
            };
            match lines
                .iter()
                .filter(|line| line.contains("Stream reader is lagging behind lagged=3"))
                .count()
            {
                2 => {}
                n => return Err(format!("Expected exactly two lagged=3 logs, but found {n}")),
            };
            Ok(())
        });
    }

    /// Verifies, using a mock visitor, that each of the five expected lines is passed to the
    /// `inspect_lines` callback exactly once.
    #[tokio::test]
    async fn inspect_lines() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions::default(),
        );

        #[automock]
        trait LineVisitor {
            fn visit(&self, line: String);
        }

        let mut mock = MockLineVisitor::new();
        #[rustfmt::skip]
        fn configure(mock: &mut MockLineVisitor) {
            mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
        }
        configure(&mut mock);

        let inspector = os.inspect_lines(
            move |line| {
                mock.visit(line);
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        inspector.cancel().await.unwrap();
        drop(os)
    }

    /// This tests that our impl macros properly `break 'outer`, as they might be in an inner loop!
    /// With `break` instead of `break 'outer`, this test would never complete, as the `Next::Break`
    /// would not terminate the collector!
    ///
    /// Note that, despite its name, this test drives `collect_lines_async`.
    #[tokio::test]
    #[traced_test]
    async fn inspect_lines_async() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        let seen: Vec<String> = Vec::new();
        let collector = os.collect_lines_async(
            seen,
            move |line, seen: &mut Vec<String>| {
                Box::pin(async move {
                    if line == "break" {
                        seen.push(line);
                        Next::Break
                    } else {
                        seen.push(line);
                        Next::Continue
                    }
                })
            },
            LineParsingOptions::default(),
        );

        // The writer never stops on its own; the test relies on `Next::Break` ending the collector.
        let _writer = tokio::spawn(async move {
            write_half.write_all("start\n".as_bytes()).await.unwrap();
            write_half.write_all("break\n".as_bytes()).await.unwrap();
            write_half.write_all("end\n".as_bytes()).await.unwrap();

            loop {
                write_half
                    .write_all("gibberish\n".as_bytes())
                    .await
                    .unwrap();
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        });

        let seen = collector.wait().await.unwrap();

        assert_that(seen).contains_exactly(&["start", "break"]);
    }

    /// Collects lines into a temporary file using the synchronous `collect_lines` callback,
    /// which re-adds a newline after every line, and compares the file content.
    #[tokio::test]
    async fn collect_lines_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                channel_capacity: 32,
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines(
            temp_file,
            |line, temp_file| {
                writeln!(temp_file, "{}", line).unwrap();
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        // Cancelling the collector hands the sink (the file) back to us.
        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    /// Same scenario as `collect_lines_to_file`, but through `collect_lines_async`.
    #[tokio::test]
    async fn collect_lines_async_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines_async(
            temp_file,
            |line, temp_file| {
                Box::pin(async move {
                    writeln!(temp_file, "{}", line).unwrap();
                    Next::Continue
                })
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    /// Writes mapped chunks into a `tokio::fs::File` located at a fixed path inside the
    /// system's temp directory and compares the file content.
    #[tokio::test]
    #[traced_test]
    async fn collect_chunks_into_write_mapped() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        let temp_file = tokio::fs::File::options()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open(std::env::temp_dir().join(
                "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
            ))
            .await
            .unwrap();

        let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
            String::from_utf8_lossy(chunk.as_ref()).to_string()
        });

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).await.unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    /// Creating a second subscriber must panic with the exact message used by `take_receiver`.
    #[tokio::test]
    #[traced_test]
    async fn multiple_subscribers_are_not_possible() {
        let (read_half, _write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions::default(),
        );

        let _inspector = os.inspect_lines(|_line| Next::Continue, Default::default());

        // Doesn't matter if we call `inspect_lines` or some other "consuming" function instead.
        assert_that_panic_by(move || os.inspect_lines(|_line| Next::Continue, Default::default()))
            .has_type::<String>()
            .is_equal_to("Receiver not yet to be taken. The SingleSubscriberOutputStream only supports one subscriber, but one was already created.");
    }
}