Skip to main content

tokio_process_tools/output_stream/
single_subscriber.rs

1use crate::collector::{AsyncChunkCollector, AsyncLineCollector, Collector, Sink};
2use crate::inspector::Inspector;
3use crate::output_stream::impls::{
4    impl_collect_chunks, impl_collect_chunks_async, impl_collect_chunks_into_write,
5    impl_collect_chunks_into_write_mapped, impl_collect_lines, impl_collect_lines_async,
6    impl_collect_lines_into_write, impl_collect_lines_into_write_mapped, impl_inspect_chunks,
7    impl_inspect_lines, impl_inspect_lines_async, visit_final_line, visit_lines,
8};
9use crate::output_stream::{
10    BackpressureControl, Chunk, FromStreamOptions, LineWriteMode, Next, OutputStream, StreamEvent,
11};
12use crate::{LineParsingOptions, NumBytes, WaitForLineResult};
13use atomic_take::AtomicTake;
14use bytes::Buf;
15use std::borrow::Cow;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::time::Duration;
19use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
20use tokio::sync::mpsc;
21use tokio::sync::mpsc::error::TrySendError;
22use tokio::task::JoinHandle;
23
24/// The output stream from a process. Either representing stdout or stderr.
25///
26/// This is the single-subscriber variant, allowing for just one consumer.
27/// This has the upside of requiring as few memory allocations as possible.
28/// If multiple concurrent inspections are required, prefer using the
29/// `output_stream::broadcast::BroadcastOutputSteam`.
30pub struct SingleSubscriberOutputStream {
31    /// The task that captured our `mpsc::Sender` and is now asynchronously awaiting
32    /// new output from the underlying stream, sending it to our registered receiver (if present).
33    stream_reader: JoinHandle<()>,
34
35    /// The receiver is wrapped in a `Cell<Option<>>` to allow interior mutability and to take the
36    /// receiver out and move it into an inspector or collector task.
37    /// This enables `&self` methods while tracking if the receiver has been taken.
38    /// Once taken by a consumer, attempting to create another consumer will panic with a clear
39    /// message, stating that a broadcast subscriber should be used instead.
40    receiver: AtomicTake<mpsc::Receiver<StreamEvent>>,
41
42    /// The maximum size of every chunk read by the backing `stream_reader`.
43    chunk_size: NumBytes,
44
45    /// The maximum capacity of the channel caching the chunks before being processed.
46    max_channel_capacity: usize,
47
48    /// The backpressure strategy used by the stream reader.
49    backpressure_control: BackpressureControl,
50
51    /// Name of this stream.
52    name: &'static str,
53}
54
55impl OutputStream for SingleSubscriberOutputStream {
56    fn chunk_size(&self) -> NumBytes {
57        self.chunk_size
58    }
59
60    fn channel_capacity(&self) -> usize {
61        self.max_channel_capacity
62    }
63
64    fn name(&self) -> &'static str {
65        self.name
66    }
67}
68
69impl Drop for SingleSubscriberOutputStream {
70    fn drop(&mut self) {
71        self.stream_reader.abort();
72    }
73}
74
75impl Debug for SingleSubscriberOutputStream {
76    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("SingleSubscriberOutputStream")
78            .field("output_collector", &"non-debug < JoinHandle<()> >")
79            .field(
80                "receiver",
81                &"non-debug < tokio::sync::mpsc::Receiver<StreamEvent> >",
82            )
83            .finish()
84    }
85}
86
87/// Uses a single `bytes::BytesMut` instance into which the input stream is read.
88/// Every chunk sent into `sender` is a frozen slice of that buffer.
89/// Once chunks were handled by all active receivers, the space of the chunk is reclaimed and reused.
90#[expect(
91    clippy::too_many_lines,
92    reason = "the stream reader keeps tightly coupled buffer and backpressure state together"
93)]
94async fn read_chunked<R: AsyncRead + Unpin + Send + 'static>(
95    mut read: R,
96    chunk_size: NumBytes,
97    sender: mpsc::Sender<StreamEvent>,
98    backpressure_control: BackpressureControl,
99) {
100    struct AfterSend {
101        do_break: bool,
102    }
103
104    enum TrySendStatus {
105        Sent,
106        Full,
107        Closed,
108    }
109
110    fn log_if_lagged(lagged: &mut usize) {
111        if *lagged > 0 {
112            tracing::debug!(lagged = *lagged, "Stream reader is lagging behind");
113            *lagged = 0;
114        }
115    }
116
117    fn try_send_gap(sender: &mpsc::Sender<StreamEvent>, lagged: &mut usize) -> TrySendStatus {
118        match sender.try_send(StreamEvent::Gap) {
119            Ok(()) => {
120                log_if_lagged(lagged);
121                TrySendStatus::Sent
122            }
123            Err(TrySendError::Full(_data)) => TrySendStatus::Full,
124            Err(TrySendError::Closed(_data)) => TrySendStatus::Closed,
125        }
126    }
127
128    fn try_send_chunk(
129        chunk: Chunk,
130        sender: &mpsc::Sender<StreamEvent>,
131        lagged: &mut usize,
132    ) -> TrySendStatus {
133        let event = StreamEvent::Chunk(chunk);
134        match sender.try_send(event) {
135            Ok(()) => {
136                log_if_lagged(lagged);
137                TrySendStatus::Sent
138            }
139            Err(TrySendError::Full(_data)) => {
140                *lagged += 1;
141                TrySendStatus::Full
142            }
143            Err(TrySendError::Closed(_data)) => {
144                // All receivers already dropped.
145                // We intentionally ignore this error.
146                // If it occurs, the user just isn't interested in
147                // newer chunks anymore.
148                TrySendStatus::Closed
149            }
150        }
151    }
152
153    async fn send_event(event: StreamEvent, sender: &mpsc::Sender<StreamEvent>) -> AfterSend {
154        match sender.send(event).await {
155            Ok(()) => {}
156            Err(_err) => {
157                // All receivers already dropped.
158                // We intentionally ignore this error.
159                // If it occurs, the user just isn't interested in
160                // newer chunks anymore.
161                return AfterSend { do_break: true };
162            }
163        }
164        AfterSend { do_break: false }
165    }
166
167    // NOTE: buf may grow when required!
168    let mut buf = bytes::BytesMut::with_capacity(chunk_size.bytes());
169    let mut lagged: usize = 0;
170    let mut gap_pending = false;
171    'outer: loop {
172        let _ = buf.try_reclaim(chunk_size.bytes());
173        match read.read_buf(&mut buf).await {
174            Ok(bytes_read) => {
175                let is_eof = bytes_read == 0;
176
177                if is_eof {
178                    match backpressure_control {
179                        BackpressureControl::DropLatestIncomingIfBufferFull => {
180                            if gap_pending {
181                                let after = send_event(StreamEvent::Gap, &sender).await;
182                                if after.do_break {
183                                    break 'outer;
184                                }
185                                gap_pending = false;
186                            }
187                            let after = send_event(StreamEvent::Eof, &sender).await;
188                            if after.do_break {
189                                break 'outer;
190                            }
191                        }
192                        BackpressureControl::BlockUntilBufferHasSpace => {
193                            let after = send_event(StreamEvent::Eof, &sender).await;
194                            if after.do_break {
195                                break 'outer;
196                            }
197                        }
198                    }
199                } else {
200                    while !buf.is_empty() {
201                        let split_to = usize::min(chunk_size.bytes(), buf.len());
202
203                        match backpressure_control {
204                            BackpressureControl::DropLatestIncomingIfBufferFull => {
205                                if gap_pending {
206                                    match try_send_gap(&sender, &mut lagged) {
207                                        TrySendStatus::Sent => {
208                                            gap_pending = false;
209                                        }
210                                        TrySendStatus::Full => {
211                                            let dropped_chunks = if chunk_size.bytes() == 0 {
212                                                buf.len()
213                                            } else {
214                                                buf.len().div_ceil(chunk_size.bytes())
215                                            };
216                                            buf.advance(buf.len());
217                                            lagged += dropped_chunks;
218                                            continue;
219                                        }
220                                        TrySendStatus::Closed => break 'outer,
221                                    }
222                                }
223
224                                let chunk = Chunk(buf.split_to(split_to).freeze());
225                                match try_send_chunk(chunk, &sender, &mut lagged) {
226                                    TrySendStatus::Sent => {}
227                                    TrySendStatus::Full => {
228                                        gap_pending = true;
229                                    }
230                                    TrySendStatus::Closed => break 'outer,
231                                }
232                            }
233                            BackpressureControl::BlockUntilBufferHasSpace => {
234                                let event =
235                                    StreamEvent::Chunk(Chunk(buf.split_to(split_to).freeze()));
236                                let after = send_event(event, &sender).await;
237                                if after.do_break {
238                                    break 'outer;
239                                }
240                            }
241                        }
242                    }
243                }
244
245                if is_eof {
246                    break;
247                }
248            }
249            Err(err) => panic!("Could not read from stream: {err}"),
250        }
251    }
252}
253
254impl SingleSubscriberOutputStream {
255    /// Creates a new single subscriber output stream from an async read stream.
256    pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
257        stream: S,
258        stream_name: &'static str,
259        backpressure_control: BackpressureControl,
260        options: FromStreamOptions,
261    ) -> SingleSubscriberOutputStream {
262        options.chunk_size.assert_non_zero("options.chunk_size");
263
264        let (tx_stdout, rx_stdout) = mpsc::channel::<StreamEvent>(options.channel_capacity);
265
266        let stream_reader = tokio::spawn(read_chunked(
267            stream,
268            options.chunk_size,
269            tx_stdout,
270            backpressure_control,
271        ));
272
273        SingleSubscriberOutputStream {
274            stream_reader,
275            receiver: AtomicTake::new(rx_stdout),
276            chunk_size: options.chunk_size,
277            max_channel_capacity: options.channel_capacity,
278            backpressure_control,
279            name: stream_name,
280        }
281    }
282
283    /// Returns the configured backpressure policy.
284    pub fn backpressure_control(&self) -> BackpressureControl {
285        self.backpressure_control
286    }
287
288    fn take_receiver(&self) -> mpsc::Receiver<StreamEvent> {
289        self.receiver.take().unwrap_or_else(|| {
290            panic!(
291                "Cannot create multiple consumers on SingleSubscriberOutputStream (stream: '{}'). \
292                Only one inspector or collector can be active at a time. \
293                Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.",
294                self.name
295            )
296        })
297    }
298}
299
300// Expected types:
301// receiver: tokio::sync::mpsc::Receiver<StreamEvent>
302// term_rx: tokio::sync::oneshot::Receiver<()>
303macro_rules! handle_subscription {
304    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
305        $loop_label: loop {
306            tokio::select! {
307                out = $receiver.recv() => {
308                    match out {
309                        Some(event) => {
310                            let $chunk = event;
311                            $body
312                        }
313                        None => {
314                            // All senders have been dropped.
315                            break $loop_label;
316                        }
317                    }
318                }
319                _msg = &mut $term_rx => break $loop_label,
320            }
321        }
322    };
323}
324
325// Impls for inspecting the output of the stream.
326impl SingleSubscriberOutputStream {
327    /// Inspects chunks of output from the stream without storing them.
328    ///
329    /// The provided closure is called for each chunk of data. Return [`Next::Continue`] to keep
330    /// processing or [`Next::Break`] to stop.
331    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
332    pub fn inspect_chunks(&self, f: impl Fn(Chunk) -> Next + Send + 'static) -> Inspector {
333        let mut receiver = self.take_receiver();
334        impl_inspect_chunks!(self.name(), receiver, f, handle_subscription)
335    }
336
337    /// Inspects lines of output from the stream without storing them.
338    ///
339    /// The provided closure is called for each line. Return [`Next::Continue`] to keep
340    /// processing or [`Next::Break`] to stop.
341    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
342    pub fn inspect_lines(
343        &self,
344        mut f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static,
345        options: LineParsingOptions,
346    ) -> Inspector {
347        let mut receiver = self.take_receiver();
348        impl_inspect_lines!(self.name(), receiver, f, options, handle_subscription)
349    }
350
351    /// Inspects lines of output from the stream without storing them, using an async closure.
352    ///
353    /// The provided async closure is called for each line. Return [`Next::Continue`] to keep
354    /// processing or [`Next::Break`] to stop.
355    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
356    pub fn inspect_lines_async<Fut>(
357        &self,
358        mut f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static,
359        options: LineParsingOptions,
360    ) -> Inspector
361    where
362        Fut: Future<Output = Next> + Send,
363    {
364        let mut receiver = self.take_receiver();
365        impl_inspect_lines_async!(self.name(), receiver, f, options, handle_subscription)
366    }
367}
368
369// Impls for collecting the output of the stream.
370impl SingleSubscriberOutputStream {
371    /// Collects chunks from the stream into a sink.
372    ///
373    /// The provided closure is called for each chunk, with mutable access to the sink.
374    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
375    pub fn collect_chunks<S: Sink>(
376        &self,
377        into: S,
378        collect: impl Fn(Chunk, &mut S) + Send + 'static,
379    ) -> Collector<S> {
380        let mut receiver = self.take_receiver();
381        impl_collect_chunks!(self.name(), receiver, collect, into, handle_subscription)
382    }
383
384    /// Collects chunks from the stream into a sink using an async collector.
385    ///
386    /// The provided async collector is called for each chunk, with mutable access to the sink.
387    ///
388    /// # Example
389    ///
390    /// ```rust,no_run
391    /// use tokio_process_tools::{AsyncChunkCollector, Chunk, Next, Process};
392    ///
393    /// struct ExtendChunks;
394    ///
395    /// impl AsyncChunkCollector<Vec<u8>> for ExtendChunks {
396    ///     async fn collect<'a>(&'a mut self, chunk: Chunk, bytes: &'a mut Vec<u8>) -> Next {
397    ///         bytes.extend_from_slice(chunk.as_ref());
398    ///         Next::Continue
399    ///     }
400    /// }
401    ///
402    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
403    /// let process = Process::new(tokio::process::Command::new("some-command"))
404    ///     .spawn_single_subscriber()?;
405    /// let collector = process.stdout().collect_chunks_async(Vec::new(), ExtendChunks);
406    /// # drop(collector);
407    /// # Ok(())
408    /// # }
409    /// ```
410    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
411    pub fn collect_chunks_async<S, C>(&self, into: S, collect: C) -> Collector<S>
412    where
413        S: Sink,
414        C: AsyncChunkCollector<S>,
415    {
416        let mut receiver = self.take_receiver();
417        impl_collect_chunks_async!(self.name(), receiver, collect, into, handle_subscription)
418    }
419
420    /// Collects lines from the stream into a sink.
421    ///
422    /// The provided closure is called for each line, with mutable access to the sink.
423    /// Return [`Next::Continue`] to keep processing or [`Next::Break`] to stop.
424    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
425    pub fn collect_lines<S: Sink>(
426        &self,
427        into: S,
428        collect: impl Fn(Cow<'_, str>, &mut S) -> Next + Send + 'static,
429        options: LineParsingOptions,
430    ) -> Collector<S> {
431        let mut receiver = self.take_receiver();
432        impl_collect_lines!(
433            self.name(),
434            receiver,
435            collect,
436            options,
437            into,
438            handle_subscription
439        )
440    }
441
442    /// Collects lines from the stream into a sink using an async collector.
443    ///
444    /// The provided async collector is called for each line, with mutable access to the sink.
445    /// Return [`Next::Continue`] to keep processing or [`Next::Break`] to stop.
446    ///
447    /// # Example
448    ///
449    /// ```rust,no_run
450    /// use std::borrow::Cow;
451    /// use tokio_process_tools::{AsyncLineCollector, LineParsingOptions, Next, Process};
452    ///
453    /// struct PushLines;
454    ///
455    /// impl AsyncLineCollector<Vec<String>> for PushLines {
456    ///     async fn collect<'a>(
457    ///         &'a mut self,
458    ///         line: Cow<'a, str>,
459    ///         lines: &'a mut Vec<String>,
460    ///     ) -> Next {
461    ///         lines.push(line.into_owned());
462    ///         Next::Continue
463    ///     }
464    /// }
465    ///
466    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
467    /// let process = Process::new(tokio::process::Command::new("some-command"))
468    ///     .spawn_single_subscriber()?;
469    /// let collector = process.stdout().collect_lines_async(
470    ///     Vec::new(),
471    ///     PushLines,
472    ///     LineParsingOptions::default(),
473    /// );
474    /// # drop(collector);
475    /// # Ok(())
476    /// # }
477    /// ```
478    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
479    pub fn collect_lines_async<S, C>(
480        &self,
481        into: S,
482        collect: C,
483        options: LineParsingOptions,
484    ) -> Collector<S>
485    where
486        S: Sink,
487        C: AsyncLineCollector<S>,
488    {
489        let mut receiver = self.take_receiver();
490        impl_collect_lines_async!(
491            self.name(),
492            receiver,
493            collect,
494            options,
495            into,
496            handle_subscription
497        )
498    }
499
500    /// Convenience method to collect all chunks into a `Vec<u8>`.
501    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
502    pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
503        self.collect_chunks(Vec::new(), |chunk, vec| {
504            vec.extend_from_slice(chunk.as_ref());
505        })
506    }
507
508    /// Convenience method to collect all lines into a `Vec<String>`.
509    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
510    pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
511        self.collect_lines(
512            Vec::new(),
513            |line, vec| {
514                vec.push(line.into_owned());
515                Next::Continue
516            },
517            options,
518        )
519    }
520
521    /// Collects chunks into an async writer.
522    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
523    pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
524        &self,
525        write: W,
526    ) -> Collector<W> {
527        let mut receiver = self.take_receiver();
528        impl_collect_chunks_into_write!(self.name(), receiver, write, handle_subscription)
529    }
530
531    /// Collects lines into an async writer.
532    ///
533    /// Parsed lines no longer include their trailing newline byte, so `mode` controls whether a
534    /// `\n` delimiter should be reintroduced for each emitted line.
535    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
536    pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
537        &self,
538        write: W,
539        options: LineParsingOptions,
540        mode: LineWriteMode,
541    ) -> Collector<W> {
542        let mut receiver = self.take_receiver();
543        impl_collect_lines_into_write!(
544            self.name(),
545            receiver,
546            write,
547            options,
548            mode,
549            handle_subscription
550        )
551    }
552
553    /// Collects chunks into an async writer after mapping them with the provided function.
554    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
555    pub fn collect_chunks_into_write_mapped<
556        W: Sink + AsyncWriteExt + Unpin,
557        B: AsRef<[u8]> + Send,
558    >(
559        &self,
560        write: W,
561        mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
562    ) -> Collector<W> {
563        let mut receiver = self.take_receiver();
564        impl_collect_chunks_into_write_mapped!(
565            self.name(),
566            receiver,
567            write,
568            mapper,
569            handle_subscription
570        )
571    }
572
573    /// Collects lines into an async writer after mapping them with the provided function.
574    ///
575    /// `mode` applies after `mapper`: choose [`LineWriteMode::AsIs`] when the mapped output
576    /// already contains delimiters, or [`LineWriteMode::AppendLf`] to append `\n` after each
577    /// mapped line.
578    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
579    pub fn collect_lines_into_write_mapped<
580        W: Sink + AsyncWriteExt + Unpin,
581        B: AsRef<[u8]> + Send,
582    >(
583        &self,
584        write: W,
585        mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + Copy + 'static,
586        options: LineParsingOptions,
587        mode: LineWriteMode,
588    ) -> Collector<W> {
589        let mut receiver = self.take_receiver();
590        impl_collect_lines_into_write_mapped!(
591            self.name(),
592            receiver,
593            write,
594            mapper,
595            options,
596            mode,
597            handle_subscription
598        )
599    }
600}
601
602// Impls for waiting for a specific line of output.
603impl SingleSubscriberOutputStream {
604    async fn wait_for_line_inner(
605        &self,
606        predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
607        options: LineParsingOptions,
608    ) -> WaitForLineResult {
609        let mut receiver = self.take_receiver();
610        let mut parser = crate::output_stream::LineParserState::new();
611
612        loop {
613            match receiver.recv().await {
614                Some(StreamEvent::Chunk(chunk)) => {
615                    if visit_lines(chunk.as_ref(), &mut parser, options, |line| {
616                        if predicate(line) {
617                            Next::Break
618                        } else {
619                            Next::Continue
620                        }
621                    }) == Next::Break
622                    {
623                        return WaitForLineResult::Matched;
624                    }
625                }
626                Some(StreamEvent::Gap) => {
627                    parser.on_gap();
628                }
629                Some(StreamEvent::Eof) | None => {
630                    if visit_final_line(&parser, |line| {
631                        if predicate(line) {
632                            Next::Break
633                        } else {
634                            Next::Continue
635                        }
636                    }) == Next::Break
637                    {
638                        return WaitForLineResult::Matched;
639                    }
640                    return WaitForLineResult::StreamClosed;
641                }
642            }
643        }
644    }
645
646    /// Waits for a line that matches the given predicate.
647    ///
648    /// Returns [`WaitForLineResult::Matched`] if a matching line is found, or
649    /// [`WaitForLineResult::StreamClosed`] if the stream ends first.
650    /// This method never returns [`WaitForLineResult::Timeout`]; use
651    /// [`SingleSubscriberOutputStream::wait_for_line_with_timeout`] if you need a bounded wait.
652    ///
653    /// This method consumes the only receiver owned by the single-subscriber stream. After calling
654    /// it, no other inspector or collector can be created for the same stream. Use the broadcast
655    /// stream implementation if you need multiple consumers.
656    ///
657    /// When chunks are dropped because [`BackpressureControl::DropLatestIncomingIfBufferFull`]
658    /// is active, this waiter discards any partial line in progress and resynchronizes at the next
659    /// newline instead of matching across the gap.
660    pub async fn wait_for_line(
661        &self,
662        predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
663        options: LineParsingOptions,
664    ) -> WaitForLineResult {
665        self.wait_for_line_inner(predicate, options).await
666    }
667
668    /// Waits for a line that matches the given predicate, with a timeout.
669    ///
670    /// Returns [`WaitForLineResult::Matched`] if a matching line is found,
671    /// [`WaitForLineResult::StreamClosed`] if the stream ends first, or
672    /// [`WaitForLineResult::Timeout`] if the timeout expires first.
673    /// This is the only line-wait variant on this type that can return
674    /// [`WaitForLineResult::Timeout`].
675    ///
676    /// This method consumes the only receiver owned by the single-subscriber stream. After calling
677    /// it, no other inspector or collector can be created for the same stream. Use the broadcast
678    /// stream implementation if you need multiple consumers.
679    ///
680    /// When chunks are dropped because [`BackpressureControl::DropLatestIncomingIfBufferFull`]
681    /// is active, this waiter discards any partial line in progress and resynchronizes at the next
682    /// newline instead of matching across the gap.
683    pub async fn wait_for_line_with_timeout(
684        &self,
685        predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
686        options: LineParsingOptions,
687        timeout: Duration,
688    ) -> WaitForLineResult {
689        tokio::time::timeout(timeout, self.wait_for_line_inner(predicate, options))
690            .await
691            .unwrap_or(WaitForLineResult::Timeout)
692    }
693}
694
695#[cfg(test)]
696mod tests {
697    use crate::output_stream::Chunk;
698    use crate::output_stream::StreamEvent;
699    use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
700    use crate::output_stream::tests::write_test_data;
701    use crate::output_stream::{BackpressureControl, FromStreamOptions, LineWriteMode, Next};
702    use crate::single_subscriber::read_chunked;
703    use crate::{AsyncChunkCollector, AsyncLineCollector};
704    use crate::{LineParsingOptions, NumBytes, NumBytesExt, WaitForLineResult};
705    use assertr::prelude::*;
706    use atomic_take::AtomicTake;
707    use bytes::Bytes;
708    use mockall::{automock, predicate};
709    use std::borrow::Cow;
710    use std::io::{Cursor, Read, Seek, SeekFrom, Write};
711    use std::time::Duration;
712    use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
713    use tokio::sync::mpsc;
714    use tokio::time::sleep;
715    use tracing_test::traced_test;
716
717    struct BreakOnLine;
718
719    impl AsyncLineCollector<Vec<String>> for BreakOnLine {
720        async fn collect<'a>(&'a mut self, line: Cow<'a, str>, seen: &'a mut Vec<String>) -> Next {
721            if line == "break" {
722                seen.push(line.into_owned());
723                Next::Break
724            } else {
725                seen.push(line.into_owned());
726                Next::Continue
727            }
728        }
729    }
730
731    struct WriteLine;
732
733    impl AsyncLineCollector<std::fs::File> for WriteLine {
734        async fn collect<'a>(
735            &'a mut self,
736            line: Cow<'a, str>,
737            temp_file: &'a mut std::fs::File,
738        ) -> Next {
739            writeln!(temp_file, "{line}").unwrap();
740            Next::Continue
741        }
742    }
743
744    struct ExtendChunks;
745
746    impl AsyncChunkCollector<Vec<u8>> for ExtendChunks {
747        async fn collect<'a>(&'a mut self, chunk: Chunk, seen: &'a mut Vec<u8>) -> Next {
748            seen.extend_from_slice(chunk.as_ref());
749            Next::Continue
750        }
751    }
752
753    #[test]
754    #[should_panic(expected = "options.chunk_size must be greater than zero bytes")]
755    fn from_stream_panics_on_zero_chunk_size() {
756        let _stream = SingleSubscriberOutputStream::from_stream(
757            tokio::io::empty(),
758            "custom",
759            BackpressureControl::DropLatestIncomingIfBufferFull,
760            FromStreamOptions {
761                chunk_size: NumBytes::zero(),
762                ..FromStreamOptions::default()
763            },
764        );
765    }
766
767    #[tokio::test]
768    #[traced_test]
769    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
770    {
771        let (read_half, mut write_half) = tokio::io::duplex(64);
772        let (tx, mut rx) = mpsc::channel(64);
773
774        // Let's preemptively write more data into the stream than our later selected chunk size (2)
775        // can handle, forcing the initial read to completely fill our chunk buffer.
776        // Our expectation is that we still receive all data written here through multiple
777        // consecutive reads.
778        // The behavior of bytes::BytesMut, potentially reaching zero capacity when splitting a
779        // full buffer of, must not prevent this from happening but allocate more memory instead!
780        write_half.write_all(b"hello world").await.unwrap();
781        write_half.flush().await.unwrap();
782
783        let stream_reader = tokio::spawn(read_chunked(
784            read_half,
785            2.bytes(),
786            tx,
787            BackpressureControl::DropLatestIncomingIfBufferFull,
788        ));
789
790        drop(write_half); // This closes the stream and should let stream_reader terminate.
791        stream_reader.await.unwrap();
792
793        let mut chunks = Vec::<String>::new();
794        while let Some(event) = rx.recv().await {
795            match event {
796                StreamEvent::Chunk(chunk) => {
797                    chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
798                }
799                StreamEvent::Gap => {}
800                StreamEvent::Eof => break,
801            }
802        }
803        assert_that!(chunks).contains_exactly(["he", "ll", "o ", "wo", "rl", "d"]);
804    }
805
806    #[tokio::test]
807    async fn read_chunked_sends_pending_gap_before_terminal_eof() {
808        let read = Cursor::new(b"aabbcc".to_vec());
809        let (tx, mut rx) = mpsc::channel(1);
810
811        let stream_reader = tokio::spawn(read_chunked(
812            read,
813            2.bytes(),
814            tx,
815            BackpressureControl::DropLatestIncomingIfBufferFull,
816        ));
817
818        match rx.recv().await.unwrap() {
819            StreamEvent::Chunk(chunk) => {
820                assert_that!(chunk.as_ref()).is_equal_to(b"aa".as_slice());
821            }
822            other => panic!("expected first chunk, got {other:?}"),
823        }
824        assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Gap);
825        assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Eof);
826
827        stream_reader.await.unwrap();
828        assert_that!(rx.recv().await).is_none();
829    }
830
831    #[tokio::test]
832    async fn read_chunked_sends_pending_gap_before_resumed_chunk_delivery() {
833        let (read_half, mut write_half) = tokio::io::duplex(64);
834        let (tx, mut rx) = mpsc::channel(2);
835
836        let stream_reader = tokio::spawn(read_chunked(
837            read_half,
838            2.bytes(),
839            tx,
840            BackpressureControl::DropLatestIncomingIfBufferFull,
841        ));
842
843        write_half.write_all(b"aabbcc").await.unwrap();
844        write_half.flush().await.unwrap();
845        sleep(Duration::from_millis(25)).await;
846
847        for expected in [b"aa".as_slice(), b"bb".as_slice()] {
848            match rx.recv().await.unwrap() {
849                StreamEvent::Chunk(chunk) => {
850                    assert_that!(chunk.as_ref()).is_equal_to(expected);
851                }
852                other => panic!("expected buffered chunk, got {other:?}"),
853            }
854        }
855
856        write_half.write_all(b"dd").await.unwrap();
857        write_half.flush().await.unwrap();
858        drop(write_half);
859
860        assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Gap);
861        match rx.recv().await.unwrap() {
862            StreamEvent::Chunk(chunk) => {
863                assert_that!(chunk.as_ref()).is_equal_to(b"dd".as_slice());
864            }
865            other => panic!("expected resumed chunk, got {other:?}"),
866        }
867        assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Eof);
868
869        stream_reader.await.unwrap();
870        assert_that!(rx.recv().await).is_none();
871    }
872
873    #[tokio::test]
874    async fn wait_for_line_returns_matched_when_line_appears_before_eof() {
875        let (read_half, mut write_half) = tokio::io::duplex(64);
876        let os = SingleSubscriberOutputStream::from_stream(
877            read_half,
878            "custom",
879            BackpressureControl::DropLatestIncomingIfBufferFull,
880            FromStreamOptions::default(),
881        );
882
883        let waiter = tokio::spawn(async move {
884            os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
885                .await
886        });
887
888        write_half.write_all(b"booting\nready\n").await.unwrap();
889        write_half.flush().await.unwrap();
890        drop(write_half);
891
892        let result = waiter.await.unwrap();
893        assert_eq!(result, WaitForLineResult::Matched);
894    }
895
896    #[tokio::test]
897    async fn wait_for_line_returns_stream_closed_when_stream_ends_before_match() {
898        let (read_half, mut write_half) = tokio::io::duplex(64);
899        let os = SingleSubscriberOutputStream::from_stream(
900            read_half,
901            "custom",
902            BackpressureControl::DropLatestIncomingIfBufferFull,
903            FromStreamOptions::default(),
904        );
905
906        let waiter = tokio::spawn(async move {
907            os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
908                .await
909        });
910
911        write_half
912            .write_all(b"booting\nstill starting\n")
913            .await
914            .unwrap();
915        write_half.flush().await.unwrap();
916        drop(write_half);
917
918        let result = waiter.await.unwrap();
919        assert_eq!(result, WaitForLineResult::StreamClosed);
920    }
921
922    #[tokio::test]
923    async fn wait_for_line_returns_matched_for_partial_final_line_at_eof() {
924        let (read_half, mut write_half) = tokio::io::duplex(64);
925        let os = SingleSubscriberOutputStream::from_stream(
926            read_half,
927            "custom",
928            BackpressureControl::DropLatestIncomingIfBufferFull,
929            FromStreamOptions::default(),
930        );
931
932        let waiter = tokio::spawn(async move {
933            os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
934                .await
935        });
936
937        write_half.write_all(b"booting\nready").await.unwrap();
938        write_half.flush().await.unwrap();
939        drop(write_half);
940
941        let result = waiter.await.unwrap();
942        assert_eq!(result, WaitForLineResult::Matched);
943    }
944
945    #[tokio::test]
946    async fn wait_for_line_with_timeout_returns_timeout_while_stream_stays_open() {
947        let (read_half, _write_half) = tokio::io::duplex(64);
948        let os = SingleSubscriberOutputStream::from_stream(
949            read_half,
950            "custom",
951            BackpressureControl::DropLatestIncomingIfBufferFull,
952            FromStreamOptions::default(),
953        );
954
955        let result = os
956            .wait_for_line_with_timeout(
957                |line| line.contains("ready"),
958                LineParsingOptions::default(),
959                Duration::from_millis(25),
960            )
961            .await;
962
963        assert_eq!(result, WaitForLineResult::Timeout);
964    }
965
966    #[tokio::test]
967    async fn wait_for_line_returns_stream_closed_when_stream_ends_after_writes_without_match() {
968        let (read_half, mut write_half) = tokio::io::duplex(64);
969        let os = SingleSubscriberOutputStream::from_stream(
970            read_half,
971            "custom",
972            BackpressureControl::DropLatestIncomingIfBufferFull,
973            FromStreamOptions::default(),
974        );
975
976        write_half.write_all(b"booting\n").await.unwrap();
977        write_half.flush().await.unwrap();
978        drop(write_half);
979
980        // No yield needed: `SingleSubscriberOutputStream` is built on an mpsc channel
981        // that buffers every chunk and the terminal EOF event regardless of when the
982        // consumer attaches, so this is race-free by construction.
983        let result = os
984            .wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
985            .await;
986
987        assert_eq!(result, WaitForLineResult::StreamClosed);
988    }
989
990    #[tokio::test]
991    async fn wait_for_line_does_not_match_across_explicit_gap_event() {
992        let (tx, rx) = mpsc::channel::<StreamEvent>(4);
993        let os = SingleSubscriberOutputStream {
994            stream_reader: tokio::spawn(async {}),
995            receiver: AtomicTake::new(rx),
996            chunk_size: 4.bytes(),
997            max_channel_capacity: 4,
998            backpressure_control: BackpressureControl::DropLatestIncomingIfBufferFull,
999            name: "custom",
1000        };
1001
1002        tx.send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"rea"))))
1003            .await
1004            .unwrap();
1005        tx.send(StreamEvent::Gap).await.unwrap();
1006        tx.send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"dy\n"))))
1007            .await
1008            .unwrap();
1009        tx.send(StreamEvent::Eof).await.unwrap();
1010        drop(tx);
1011
1012        let result = os
1013            .wait_for_line(|line| line == "ready", LineParsingOptions::default())
1014            .await;
1015
1016        assert_eq!(result, WaitForLineResult::StreamClosed);
1017    }
1018
1019    #[tokio::test]
1020    #[traced_test]
1021    async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
1022        let (read_half, mut write_half) = tokio::io::duplex(64);
1023        let os = SingleSubscriberOutputStream::from_stream(
1024            read_half,
1025            "custom",
1026            BackpressureControl::DropLatestIncomingIfBufferFull,
1027            FromStreamOptions {
1028                channel_capacity: 2,
1029                ..Default::default()
1030            },
1031        );
1032
1033        let inspector = os.inspect_lines_async(
1034            |_line| async move {
1035                // Mimic a slow consumer.
1036                sleep(Duration::from_millis(100)).await;
1037                Next::Continue
1038            },
1039            LineParsingOptions::default(),
1040        );
1041
1042        #[rustfmt::skip]
1043        let producer = tokio::spawn(async move {
1044            for count in 1..=15 {
1045                write_half
1046                    .write_all(format!("{count}\n").as_bytes())
1047                    .await
1048                    .unwrap();
1049                sleep(Duration::from_millis(25)).await;
1050            }
1051        });
1052
1053        producer.await.unwrap();
1054        inspector.wait().await.unwrap();
1055        drop(os);
1056
1057        logs_assert(|lines: &[&str]| {
1058            let lagged_logs = lines
1059                .iter()
1060                .filter(|line| line.contains("Stream reader is lagging behind lagged="))
1061                .count();
1062            if lagged_logs == 0 {
1063                return Err("Expected at least one lagged log".to_string());
1064            }
1065            Ok(())
1066        });
1067    }
1068
1069    #[tokio::test]
1070    async fn inspect_lines() {
1071        #[automock]
1072        trait LineVisitor {
1073            fn visit(&self, line: String);
1074        }
1075
1076        #[rustfmt::skip]
1077        fn configure(mock: &mut MockLineVisitor) {
1078            mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
1079            mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
1080            mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
1081            mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
1082            mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
1083        }
1084
1085        let (read_half, write_half) = tokio::io::duplex(64);
1086        let os = SingleSubscriberOutputStream::from_stream(
1087            read_half,
1088            "custom",
1089            BackpressureControl::DropLatestIncomingIfBufferFull,
1090            FromStreamOptions::default(),
1091        );
1092
1093        let mut mock = MockLineVisitor::new();
1094        configure(&mut mock);
1095
1096        let inspector = os.inspect_lines(
1097            move |line| {
1098                mock.visit(line.into_owned());
1099                Next::Continue
1100            },
1101            LineParsingOptions::default(),
1102        );
1103
1104        tokio::spawn(write_test_data(write_half)).await.unwrap();
1105
1106        inspector.cancel().await.unwrap();
1107        drop(os);
1108    }
1109
1110    /// This tests that our impl macros properly `break 'outer`, as they might be in an inner loop!
1111    /// With `break` instead of `break 'outer`, this test would never complete, as the `Next::Break`
1112    /// would not terminate the collector!
1113    #[tokio::test]
1114    #[traced_test]
1115    async fn inspect_lines_async() {
1116        let (read_half, mut write_half) = tokio::io::duplex(64);
1117        let os = SingleSubscriberOutputStream::from_stream(
1118            read_half,
1119            "custom",
1120            BackpressureControl::DropLatestIncomingIfBufferFull,
1121            FromStreamOptions {
1122                chunk_size: 32.bytes(),
1123                ..Default::default()
1124            },
1125        );
1126
1127        let seen: Vec<String> = Vec::new();
1128        let collector = os.collect_lines_async(seen, BreakOnLine, LineParsingOptions::default());
1129
1130        let _writer = tokio::spawn(async move {
1131            write_half.write_all("start\n".as_bytes()).await.unwrap();
1132            write_half.write_all("break\n".as_bytes()).await.unwrap();
1133            write_half.write_all("end\n".as_bytes()).await.unwrap();
1134
1135            loop {
1136                write_half
1137                    .write_all("gibberish\n".as_bytes())
1138                    .await
1139                    .unwrap();
1140                tokio::time::sleep(Duration::from_millis(50)).await;
1141            }
1142        });
1143
1144        let seen = collector.wait().await.unwrap();
1145
1146        assert_that!(seen).contains_exactly(["start", "break"]);
1147    }
1148
1149    #[tokio::test]
1150    async fn collect_chunks_async_into_vec() {
1151        let (read_half, mut write_half) = tokio::io::duplex(64);
1152        let os = SingleSubscriberOutputStream::from_stream(
1153            read_half,
1154            "custom",
1155            BackpressureControl::DropLatestIncomingIfBufferFull,
1156            FromStreamOptions {
1157                chunk_size: 2.bytes(),
1158                ..Default::default()
1159            },
1160        );
1161
1162        let collector = os.collect_chunks_async(Vec::new(), ExtendChunks);
1163
1164        write_half.write_all(b"abcdef").await.unwrap();
1165        drop(write_half);
1166
1167        let seen = collector.wait().await.unwrap();
1168        assert_that!(seen).is_equal_to(b"abcdef".to_vec());
1169    }
1170
1171    #[tokio::test]
1172    async fn collect_lines_to_file() {
1173        let (read_half, write_half) = tokio::io::duplex(64);
1174        let os = SingleSubscriberOutputStream::from_stream(
1175            read_half,
1176            "custom",
1177            BackpressureControl::DropLatestIncomingIfBufferFull,
1178            FromStreamOptions {
1179                channel_capacity: 32,
1180                ..Default::default()
1181            },
1182        );
1183
1184        let temp_file = tempfile::tempfile().unwrap();
1185        let collector = os.collect_lines(
1186            temp_file,
1187            |line, temp_file| {
1188                writeln!(temp_file, "{line}").unwrap();
1189                Next::Continue
1190            },
1191            LineParsingOptions::default(),
1192        );
1193
1194        tokio::spawn(write_test_data(write_half)).await.unwrap();
1195
1196        let mut temp_file = collector.cancel().await.unwrap();
1197        temp_file.seek(SeekFrom::Start(0)).unwrap();
1198        let mut contents = String::new();
1199        temp_file.read_to_string(&mut contents).unwrap();
1200
1201        assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1202    }
1203
1204    #[tokio::test]
1205    async fn collect_lines_async_to_file() {
1206        let (read_half, write_half) = tokio::io::duplex(64);
1207        let os = SingleSubscriberOutputStream::from_stream(
1208            read_half,
1209            "custom",
1210            BackpressureControl::DropLatestIncomingIfBufferFull,
1211            FromStreamOptions {
1212                chunk_size: 32.bytes(),
1213                ..Default::default()
1214            },
1215        );
1216
1217        let temp_file = tempfile::tempfile().unwrap();
1218        let collector = os.collect_lines_async(temp_file, WriteLine, LineParsingOptions::default());
1219
1220        tokio::spawn(write_test_data(write_half)).await.unwrap();
1221
1222        let mut temp_file = collector.cancel().await.unwrap();
1223        temp_file.seek(SeekFrom::Start(0)).unwrap();
1224        let mut contents = String::new();
1225        temp_file.read_to_string(&mut contents).unwrap();
1226
1227        assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1228    }
1229
1230    #[tokio::test]
1231    async fn collect_lines_into_write_respects_requested_line_delimiter_mode() {
1232        let (read_half, write_half) = tokio::io::duplex(64);
1233        let os = SingleSubscriberOutputStream::from_stream(
1234            read_half,
1235            "custom",
1236            BackpressureControl::DropLatestIncomingIfBufferFull,
1237            FromStreamOptions::default(),
1238        );
1239
1240        let temp_file = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
1241        let collector = os.collect_lines_into_write(
1242            temp_file,
1243            LineParsingOptions::default(),
1244            LineWriteMode::AsIs,
1245        );
1246
1247        tokio::spawn(write_test_data(write_half)).await.unwrap();
1248
1249        let mut temp_file = collector.cancel().await.unwrap();
1250        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
1251        let mut contents = String::new();
1252        temp_file.read_to_string(&mut contents).await.unwrap();
1253
1254        assert_that!(contents).is_equal_to("Cargo.lockCargo.tomlREADME.mdsrctarget");
1255    }
1256
1257    #[tokio::test]
1258    #[traced_test]
1259    async fn collect_chunks_into_write_mapped() {
1260        let (read_half, write_half) = tokio::io::duplex(64);
1261        let os = SingleSubscriberOutputStream::from_stream(
1262            read_half,
1263            "custom",
1264            BackpressureControl::DropLatestIncomingIfBufferFull,
1265            FromStreamOptions {
1266                chunk_size: 32.bytes(),
1267                ..Default::default()
1268            },
1269        );
1270
1271        let temp_file = tokio::fs::File::options()
1272            .create(true)
1273            .truncate(true)
1274            .write(true)
1275            .read(true)
1276            .open(std::env::temp_dir().join(
1277                "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
1278            ))
1279            .await
1280            .unwrap();
1281
1282        let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
1283            String::from_utf8_lossy(chunk.as_ref()).to_string()
1284        });
1285
1286        tokio::spawn(write_test_data(write_half)).await.unwrap();
1287
1288        let mut temp_file = collector.cancel().await.unwrap();
1289        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
1290        let mut contents = String::new();
1291        temp_file.read_to_string(&mut contents).await.unwrap();
1292
1293        assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1294    }
1295
1296    #[tokio::test]
1297    #[traced_test]
1298    async fn multiple_subscribers_are_not_possible() {
1299        let (read_half, _write_half) = tokio::io::duplex(64);
1300        let os = SingleSubscriberOutputStream::from_stream(
1301            read_half,
1302            "custom",
1303            BackpressureControl::DropLatestIncomingIfBufferFull,
1304            FromStreamOptions::default(),
1305        );
1306
1307        let _inspector = os.inspect_lines(|_line| Next::Continue, LineParsingOptions::default());
1308
1309        // Doesn't matter if we call `inspect_lines` or some other "consuming" function instead.
1310        assert_that_panic_by(move || {
1311            os.inspect_lines(|_line| Next::Continue, LineParsingOptions::default())
1312        })
1313        .has_type::<String>()
1314        .is_equal_to("Cannot create multiple consumers on SingleSubscriberOutputStream (stream: 'custom'). Only one inspector or collector can be active at a time. Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.");
1315    }
1316}