tokio_process_tools/output_stream/
single_subscriber.rs

1use crate::collector::{AsyncCollectFn, Collector, Sink};
2use crate::inspector::Inspector;
3use crate::output_stream::impls::{
4    impl_collect_chunks, impl_collect_chunks_async, impl_collect_lines, impl_collect_lines_async,
5    impl_inspect_chunks, impl_inspect_lines, impl_inspect_lines_async,
6};
7use crate::output_stream::{
8    BackpressureControl, Chunk, FromStreamOptions, LineReader, Next, OutputStream,
9};
10use crate::{InspectorError, LineParsingOptions};
11use std::fmt::{Debug, Formatter};
12use std::future::Future;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
16use tokio::sync::mpsc::error::TrySendError;
17use tokio::sync::{RwLock, mpsc};
18use tokio::task::JoinHandle;
19use tokio::time::error::Elapsed;
20
/// The output stream from a process. Either representing stdout or stderr.
///
/// This is the single-subscriber variant, allowing for just one consumer.
/// This has the upside of requiring as few memory allocations as possible.
/// If multiple concurrent inspections are required, prefer using the
/// `output_stream::broadcast::BroadcastOutputStream`.
pub struct SingleSubscriberOutputStream {
    /// The task that captured our `mpsc::Sender` and is now asynchronously awaiting
    /// new output from the underlying stream, sending it to our registered receiver (if present).
    /// The task is aborted when this stream is dropped.
    stream_reader: JoinHandle<()>,

    /// The receiver is wrapped in an `Option` so that we can take it out of it and move it
    /// into an inspector or collector task.
    /// It is `Some` after construction and `None` once the one and only subscriber was created.
    receiver: Option<mpsc::Receiver<Option<Chunk>>>,
}
36
// The impl body is intentionally empty: nothing of `OutputStream` is overridden here.
impl OutputStream for SingleSubscriberOutputStream {}
38
impl Drop for SingleSubscriberOutputStream {
    /// Aborts the background reader task, as nobody can consume its output through this
    /// stream anymore.
    fn drop(&mut self) {
        self.stream_reader.abort();
    }
}
44
45impl Debug for SingleSubscriberOutputStream {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("SingleSubscriberOutputStream")
48            .field("output_collector", &"non-debug < JoinHandle<()> >")
49            .field(
50                "receiver",
51                &"non-debug < tokio::sync::mpsc::Receiver<Option<Chunk>> >",
52            )
53            .finish()
54    }
55}
56
57/// Uses a single `bytes::BytesMut` instance into which the input stream is read.
58/// Every chunk sent into `sender` is a frozen slice of that buffer.
59/// Once chunks were handled by all active receivers, the space of the chunk is reclaimed and reused.
60async fn read_chunked<R: AsyncRead + Unpin + Send + 'static>(
61    mut read: R,
62    chunk_size: usize,
63    sender: mpsc::Sender<Option<Chunk>>,
64    backpressure_control: BackpressureControl,
65) {
66    struct AfterSend {
67        do_break: bool,
68    }
69
70    fn try_send_chunk(
71        chunk: Option<Chunk>,
72        sender: &mpsc::Sender<Option<Chunk>>,
73        lagged: &mut usize,
74    ) -> AfterSend {
75        match sender.try_send(chunk) {
76            Ok(()) => {
77                if *lagged > 0 {
78                    tracing::debug!(lagged, "Stream reader is lagging behind");
79                    *lagged = 0;
80                }
81            }
82            Err(err) => {
83                match err {
84                    TrySendError::Full(_data) => {
85                        *lagged += 1;
86                    }
87                    TrySendError::Closed(_data) => {
88                        // All receivers already dropped.
89                        // We intentionally ignore this error.
90                        // If it occurs, the user just isn't interested in
91                        // newer chunks anymore.
92                        return AfterSend { do_break: true };
93                    }
94                }
95            }
96        }
97        AfterSend { do_break: false }
98    }
99
100    async fn send_chunk(chunk: Option<Chunk>, sender: &mpsc::Sender<Option<Chunk>>) -> AfterSend {
101        match sender.send(chunk).await {
102            Ok(()) => {}
103            Err(_err) => {
104                // All receivers already dropped.
105                // We intentionally ignore this error.
106                // If it occurs, the user just isn't interested in
107                // newer chunks anymore.
108                return AfterSend { do_break: true };
109            }
110        }
111        AfterSend { do_break: false }
112    }
113
114    // NOTE: buf may grow when required!
115    let mut buf = bytes::BytesMut::with_capacity(chunk_size);
116    let mut lagged: usize = 0;
117    loop {
118        let _ = buf.try_reclaim(chunk_size);
119        match read.read_buf(&mut buf).await {
120            Ok(bytes_read) => {
121                let is_eof = bytes_read == 0;
122
123                match is_eof {
124                    true => match backpressure_control {
125                        BackpressureControl::DropLatestIncomingIfBufferFull => {
126                            let after = try_send_chunk(None, &sender, &mut lagged);
127                            if after.do_break {
128                                break;
129                            }
130                        }
131                        BackpressureControl::BlockUntilBufferHasSpace => {
132                            let after = send_chunk(None, &sender).await;
133                            if after.do_break {
134                                break;
135                            }
136                        }
137                    },
138                    false => {
139                        while !buf.is_empty() {
140                            let split_to = usize::min(chunk_size, buf.len());
141
142                            match backpressure_control {
143                                BackpressureControl::DropLatestIncomingIfBufferFull => {
144                                    let after = try_send_chunk(
145                                        Some(Chunk(buf.split_to(split_to).freeze())),
146                                        &sender,
147                                        &mut lagged,
148                                    );
149                                    if after.do_break {
150                                        break;
151                                    }
152                                }
153                                BackpressureControl::BlockUntilBufferHasSpace => {
154                                    let after = send_chunk(
155                                        Some(Chunk(buf.split_to(split_to).freeze())),
156                                        &sender,
157                                    )
158                                    .await;
159                                    if after.do_break {
160                                        break;
161                                    }
162                                }
163                            }
164                        }
165                    }
166                };
167
168                if is_eof {
169                    break;
170                }
171            }
172            Err(err) => panic!("Could not read from stream: {err}"),
173        }
174    }
175}
176
177impl SingleSubscriberOutputStream {
178    /// Creates a new single subscriber output stream from an async read stream.
179    pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
180        stream: S,
181        backpressure_control: BackpressureControl,
182        options: FromStreamOptions,
183    ) -> SingleSubscriberOutputStream {
184        let (tx_stdout, rx_stdout) = mpsc::channel::<Option<Chunk>>(options.channel_capacity);
185
186        let stream_reader = tokio::spawn(read_chunked(
187            stream,
188            options.chunk_size,
189            tx_stdout,
190            backpressure_control,
191        ));
192
193        SingleSubscriberOutputStream {
194            stream_reader,
195            receiver: Some(rx_stdout),
196        }
197    }
198
199    fn take_receiver(&mut self) -> mpsc::Receiver<Option<Chunk>> {
200        self.receiver.take().expect("Receiver not yet to be taken. The SingleSubscriberOutputStream only supports one subscriber, but one was already created.")
201    }
202}
203
// Drives a subscription: repeatedly receives from `$receiver` and runs `$body` with every
// received message bound to `$chunk`.
//
// The loop labeled `$loop_label` ends when `$receiver.recv()` yields `None` (all senders have
// been dropped) or when `$term_rx` completes. `$body` may use `$loop_label` to end it as well.
//
// Expected types:
// receiver: tokio::sync::mpsc::Receiver<Option<Chunk>>
// term_rx: tokio::sync::oneshot::Receiver<()>
macro_rules! handle_subscription {
    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
        $loop_label: loop {
            tokio::select! {
                out = $receiver.recv() => {
                    match out {
                        Some(maybe_chunk) => {
                            // With the expected receiver type, this is an `Option<Chunk>`.
                            let $chunk = maybe_chunk;
                            $body
                        }
                        None => {
                            // All senders have been dropped.
                            break $loop_label;
                        }
                    }
                }
                // Any completion of the termination receiver ends the subscription.
                _msg = &mut $term_rx => break $loop_label,
            }
        }
    };
}
228
229// Impls for inspecting the output of the stream.
impl SingleSubscriberOutputStream {
    /// Inspects chunks of output from the stream without storing them.
    ///
    /// The provided closure is called for each chunk of data. Return [`Next::Continue`] to keep
    /// processing or [`Next::Break`] to stop.
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_chunks(&mut self, f: impl Fn(Chunk) -> Next + Send + 'static) -> Inspector {
        let mut receiver = self.take_receiver();
        impl_inspect_chunks!(receiver, f, handle_subscription)
    }

    /// Inspects lines of output from the stream without storing them.
    ///
    /// The provided closure is called for each line. Return [`Next::Continue`] to keep
    /// processing or [`Next::Break`] to stop.
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_lines(
        &mut self,
        mut f: impl FnMut(String) -> Next + Send + 'static,
        options: LineParsingOptions,
    ) -> Inspector {
        let mut receiver = self.take_receiver();
        impl_inspect_lines!(receiver, f, options, handle_subscription)
    }

    /// Inspects lines of output from the stream without storing them, using an async closure.
    ///
    /// The provided async closure is called for each line. Return [`Next::Continue`] to keep
    /// processing or [`Next::Break`] to stop.
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_lines_async<Fut>(
        &mut self,
        mut f: impl FnMut(String) -> Fut + Send + 'static,
        options: LineParsingOptions,
    ) -> Inspector
    where
        Fut: Future<Output = Next> + Send,
    {
        let mut receiver = self.take_receiver();
        impl_inspect_lines_async!(receiver, f, options, handle_subscription)
    }
}
272
273// Impls for collecting the output of the stream.
impl SingleSubscriberOutputStream {
    /// Collects chunks from the stream into a sink.
    ///
    /// The provided closure is called for each chunk, with mutable access to the sink.
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks<S: Sink>(
        &mut self,
        into: S,
        collect: impl Fn(Chunk, &mut S) + Send + 'static,
    ) -> Collector<S> {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.take_receiver();
        impl_collect_chunks!(receiver, collect, sink, handle_subscription)
    }

    /// Collects chunks from the stream into a sink using an async closure.
    ///
    /// The provided async closure is called for each chunk, with mutable access to the sink.
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_async<S, F>(&mut self, into: S, collect: F) -> Collector<S>
    where
        S: Sink,
        F: Fn(Chunk, &mut S) -> AsyncCollectFn<'_> + Send + 'static,
    {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.take_receiver();
        impl_collect_chunks_async!(receiver, collect, sink, handle_subscription)
    }

    /// Collects lines from the stream into a sink.
    ///
    /// The provided closure is called for each line, with mutable access to the sink.
    /// Return [`Next::Continue`] to keep processing or [`Next::Break`] to stop.
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines<S: Sink>(
        &mut self,
        into: S,
        collect: impl Fn(String, &mut S) -> Next + Send + 'static,
        options: LineParsingOptions,
    ) -> Collector<S> {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.take_receiver();
        impl_collect_lines!(receiver, collect, options, sink, handle_subscription)
    }

    /// Collects lines from the stream into a sink using an async closure.
    ///
    /// The provided async closure is called for each line, with mutable access to the sink.
    /// Return [`Next::Continue`] to keep processing or [`Next::Break`] to stop.
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_async<S, F>(
        &mut self,
        into: S,
        collect: F,
        options: LineParsingOptions,
    ) -> Collector<S>
    where
        S: Sink,
        F: Fn(String, &mut S) -> AsyncCollectFn<'_> + Send + Sync + 'static,
    {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.take_receiver();
        impl_collect_lines_async!(receiver, collect, options, sink, handle_subscription)
    }

    /// Convenience method to collect all chunks into a `Vec<u8>`.
    ///
    /// The bytes of every chunk are appended to the vector.
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_vec(&mut self) -> Collector<Vec<u8>> {
        self.collect_chunks(Vec::new(), |chunk, vec| vec.extend(chunk.as_ref()))
    }

    /// Convenience method to collect all lines into a `Vec<String>`.
    ///
    /// Every line is pushed as its own element. Collection never stops on its own accord, as
    /// the callback always returns [`Next::Continue`].
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_vec(
        &mut self,
        options: LineParsingOptions,
    ) -> Collector<Vec<String>> {
        self.collect_lines(
            Vec::new(),
            |line, vec| {
                vec.push(line);
                Next::Continue
            },
            options,
        )
    }

    /// Collects chunks into an async writer.
    ///
    /// The bytes of every chunk are written using `write_all`. A failed write is logged as a
    /// warning and does not change the callback's result, which is always [`Next::Continue`].
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
        &mut self,
        write: W,
    ) -> Collector<W> {
        self.collect_chunks_async(write, move |chunk, write| {
            Box::pin(async move {
                if let Err(err) = write.write_all(chunk.as_ref()).await {
                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
                };
                Next::Continue
            })
        })
    }

    /// Collects lines into an async writer.
    ///
    /// Only the bytes of each received line are written (using `write_all`). This method does
    /// not append anything itself, so no line terminator is added after a line. Use
    /// [`Self::collect_lines_into_write_mapped`] if the written data needs to differ from the
    /// received line.
    ///
    /// A failed write is logged as a warning and does not change the callback's result, which
    /// is always [`Next::Continue`].
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
        &mut self,
        write: W,
        options: LineParsingOptions,
    ) -> Collector<W> {
        self.collect_lines_async(
            write,
            move |line, write| {
                Box::pin(async move {
                    if let Err(err) = write.write_all(line.as_bytes()).await {
                        tracing::warn!("Could not write line to write sink: {err:#?}");
                    };
                    Next::Continue
                })
            },
            options,
        )
    }

    /// Collects chunks into an async writer after mapping them with the provided function.
    ///
    /// `mapper` is called once per chunk; the bytes of its result are written using
    /// `write_all`. A failed write is logged as a warning and does not change the callback's
    /// result, which is always [`Next::Continue`].
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_write_mapped<
        W: Sink + AsyncWriteExt + Unpin,
        B: AsRef<[u8]> + Send,
    >(
        &mut self,
        write: W,
        mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
    ) -> Collector<W> {
        self.collect_chunks_async(write, move |chunk, write| {
            Box::pin(async move {
                let mapped = mapper(chunk);
                let mapped = mapped.as_ref();
                if let Err(err) = write.write_all(mapped).await {
                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
                };
                Next::Continue
            })
        })
    }

    /// Collects lines into an async writer after mapping them with the provided function.
    ///
    /// `mapper` is called once per line; the bytes of its result are written using
    /// `write_all`. This method does not append anything itself, so a line terminator is only
    /// written if `mapper` produces one. A failed write is logged as a warning and does not
    /// change the callback's result, which is always [`Next::Continue`].
    ///
    /// # Panics
    ///
    /// Panics if a subscriber (inspector, collector, ...) was already created from this stream.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_write_mapped<
        W: Sink + AsyncWriteExt + Unpin,
        B: AsRef<[u8]> + Send,
    >(
        &mut self,
        write: W,
        mapper: impl Fn(String) -> B + Send + Sync + Copy + 'static,
        options: LineParsingOptions,
    ) -> Collector<W> {
        self.collect_lines_async(
            write,
            move |line, write| {
                Box::pin(async move {
                    let mapped = mapper(line);
                    let mapped = mapped.as_ref();
                    if let Err(err) = write.write_all(mapped).await {
                        tracing::warn!("Could not write line to write sink: {err:#?}");
                    };
                    Next::Continue
                })
            },
            options,
        )
    }
}
447
448// Impls for waiting for a specific line of output.
449impl SingleSubscriberOutputStream {
450    /// Waits for a line that matches the given predicate.
451    ///
452    /// This method blocks until a line is found that satisfies the predicate.
453    pub async fn wait_for_line(
454        &mut self,
455        predicate: impl Fn(String) -> bool + Send + Sync + 'static,
456        options: LineParsingOptions,
457    ) {
458        let inspector = self.inspect_lines(
459            move |line| {
460                if predicate(line) {
461                    Next::Break
462                } else {
463                    Next::Continue
464                }
465            },
466            options,
467        );
468        match inspector.wait().await {
469            Ok(()) => {}
470            Err(err) => match err {
471                InspectorError::TaskJoin(join_error) => {
472                    panic!("Inspector task join error: {join_error:#?}");
473                }
474            },
475        };
476    }
477
478    /// Waits for a line that matches the given predicate, with a timeout.
479    ///
480    /// Returns `Ok(())` if a matching line is found, or `Err(Elapsed)` if the timeout expires.
481    pub async fn wait_for_line_with_timeout(
482        &mut self,
483        predicate: impl Fn(String) -> bool + Send + Sync + 'static,
484        options: LineParsingOptions,
485        timeout: Duration,
486    ) -> Result<(), Elapsed> {
487        tokio::time::timeout(timeout, self.wait_for_line(predicate, options)).await
488    }
489}
490
#[cfg(test)]
mod tests {
    use crate::LineParsingOptions;
    use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
    use crate::output_stream::tests::write_test_data;
    use crate::output_stream::{BackpressureControl, FromStreamOptions, Next};
    use crate::single_subscriber::read_chunked;
    use assertr::prelude::*;
    use mockall::{automock, predicate};
    use std::io::{Read, Seek, SeekFrom, Write};
    use std::time::Duration;
    use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
    use tokio::sync::mpsc;
    use tokio::time::sleep;
    use tracing_test::traced_test;

    /// Data that is already available before the reader starts, and that is larger than the
    /// chunk size, must still arrive completely, split into chunks of the chunk size.
    #[tokio::test]
    #[traced_test]
    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
    {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let (tx, mut rx) = mpsc::channel(64);

        // Let's preemptively write more data into the stream than our later selected chunk size (2)
        // can handle, forcing the initial read to completely fill our chunk buffer.
        // Our expectation is that we still receive all data written here through multiple
        // consecutive reads.
        // The behavior of bytes::BytesMut, potentially reaching zero capacity when a full
        // buffer is split off, must not prevent this from happening. More memory must be
        // allocated instead!
        write_half.write_all(b"hello world").await.unwrap();
        write_half.flush().await.unwrap();

        let stream_reader = tokio::spawn(read_chunked(
            read_half,
            2,
            tx,
            BackpressureControl::DropLatestIncomingIfBufferFull,
        ));

        drop(write_half); // This closes the stream and should let stream_reader terminate.
        stream_reader.await.unwrap();

        // The loop ends with the end-of-stream marker (`Some(None)`) or the closed channel.
        let mut chunks = Vec::<String>::new();
        while let Some(Some(chunk)) = rx.recv().await {
            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
        }
        assert_that(chunks).contains_exactly(&["he", "ll", "o ", "wo", "rl", "d"]);
    }

    /// A consumer that is slower than the producer must lead to dropped chunks, which the
    /// reader reports through its "lagging behind" log messages.
    ///
    /// NOTE: The asserted log counts depend on the sleep durations used below.
    #[tokio::test]
    #[traced_test]
    async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                channel_capacity: 2,
                ..Default::default()
            },
        );

        let inspector = os.inspect_lines_async(
            async |_line| {
                // Mimic a slow consumer.
                sleep(Duration::from_millis(100)).await;
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        // Produces a line every 25ms, four times as fast as the consumer handles them.
        #[rustfmt::skip]
        let producer = tokio::spawn(async move {
            for count in 1..=15 {
                write_half
                    .write(format!("{count}\n").as_bytes())
                    .await
                    .unwrap();
                sleep(Duration::from_millis(25)).await;
            }
        });

        producer.await.unwrap();
        inspector.wait().await.unwrap();
        drop(os);

        logs_assert(|lines: &[&str]| {
            match lines
                .iter()
                .filter(|line| line.contains("Stream reader is lagging behind lagged=1"))
                .count()
            {
                1 => {}
                n => return Err(format!("Expected exactly one lagged=1 log, but found {n}")),
            };
            match lines
                .iter()
                .filter(|line| line.contains("Stream reader is lagging behind lagged=3"))
                .count()
            {
                2 => {}
                n => return Err(format!("Expected exactly two lagged=3 logs, but found {n}")),
            };
            Ok(())
        });
    }

    /// Every line of the test data must be passed to the inspector callback exactly once.
    #[tokio::test]
    async fn inspect_lines() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions::default(),
        );

        #[automock]
        trait LineVisitor {
            fn visit(&self, line: String);
        }

        // Expect exactly one visit per line of the test data.
        let mut mock = MockLineVisitor::new();
        #[rustfmt::skip]
        fn configure(mock: &mut MockLineVisitor) {
            mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
        }
        configure(&mut mock);

        let inspector = os.inspect_lines(
            move |line| {
                mock.visit(line);
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        inspector.cancel().await.unwrap();
        drop(os)
    }

    /// This tests that our impl macros properly `break 'outer`, as they might be in an inner loop!
    /// With `break` instead of `break 'outer`, this test would never complete, as the `Next::Break`
    /// would not terminate the collector!
    ///
    /// NOTE: Despite its name, this test exercises `collect_lines_async`.
    #[tokio::test]
    #[traced_test]
    async fn inspect_lines_async() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        let seen: Vec<String> = Vec::new();
        let collector = os.collect_lines_async(
            seen,
            move |line, seen: &mut Vec<String>| {
                Box::pin(async move {
                    if line == "break" {
                        seen.push(line);
                        Next::Break
                    } else {
                        seen.push(line);
                        Next::Continue
                    }
                })
            },
            LineParsingOptions::default(),
        );

        // Keeps on writing after the "break" line. None of that may reach the sink.
        let _writer = tokio::spawn(async move {
            write_half.write_all("start\n".as_bytes()).await.unwrap();
            write_half.write_all("break\n".as_bytes()).await.unwrap();
            write_half.write_all("end\n".as_bytes()).await.unwrap();

            loop {
                write_half
                    .write_all("gibberish\n".as_bytes())
                    .await
                    .unwrap();
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        });

        let seen = collector.wait().await.unwrap();

        assert_that(seen).contains_exactly(&["start", "break"]);
    }

    /// Lines collected through a synchronous callback must all end up in the file sink.
    #[tokio::test]
    async fn collect_lines_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                channel_capacity: 32,
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines(
            temp_file,
            |line, temp_file| {
                // The callback adds the line terminator itself.
                writeln!(temp_file, "{}", line).unwrap();
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        // Cancelling the collector hands the sink back to us.
        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    /// Lines collected through an async callback must all end up in the file sink.
    #[tokio::test]
    async fn collect_lines_async_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines_async(
            temp_file,
            |line, temp_file| {
                Box::pin(async move {
                    // The callback adds the line terminator itself.
                    writeln!(temp_file, "{}", line).unwrap();
                    Next::Continue
                })
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        // Cancelling the collector hands the sink back to us.
        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    /// Mapped chunks must be written to the async writer completely and in order.
    #[tokio::test]
    #[traced_test]
    async fn collect_chunks_into_write_mapped() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        // NOTE: This file has a fixed name inside the temp directory and is not removed by
        // this test. It is truncated on every run.
        let temp_file = tokio::fs::File::options()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open(std::env::temp_dir().join(
                "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
            ))
            .await
            .unwrap();

        let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
            String::from_utf8_lossy(chunk.as_ref()).to_string()
        });

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        // Cancelling the collector hands the sink back to us.
        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).await.unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    /// Creating a second subscriber must panic with a descriptive message.
    #[tokio::test]
    #[traced_test]
    async fn multiple_subscribers_are_not_possible() {
        let (read_half, _write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions::default(),
        );

        let _inspector = os.inspect_lines(|_line| Next::Continue, Default::default());

        // Doesn't matter if we call `inspect_lines` or some other "consuming" function instead.
        assert_that_panic_by(move || os.inspect_lines(|_line| Next::Continue, Default::default()))
            .has_type::<String>()
            .is_equal_to("Receiver not yet to be taken. The SingleSubscriberOutputStream only supports one subscriber, but one was already created.");
    }
}