tokio_process_tools/output_stream/
single_subscriber.rs

1use crate::collector::{AsyncCollectFn, Collector, Sink};
2use crate::error::OutputError;
3use crate::inspector::Inspector;
4use crate::output_stream::impls::{
5    impl_collect_chunks, impl_collect_chunks_async, impl_collect_lines, impl_collect_lines_async,
6    impl_inspect_chunks, impl_inspect_lines, impl_inspect_lines_async,
7};
8use crate::output_stream::{
9    BackpressureControl, Chunk, FromStreamOptions, LineReader, Next, OutputStream,
10};
11use crate::{InspectorError, LineParsingOptions, NumBytes};
12use std::borrow::Cow;
13use std::cell::Cell;
14use std::fmt::{Debug, Formatter};
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
19use tokio::sync::mpsc::error::TrySendError;
20use tokio::sync::{RwLock, mpsc};
21use tokio::task::JoinHandle;
22
23/// The output stream from a process. Either representing stdout or stderr.
24///
25/// This is the single-subscriber variant, allowing for just one consumer.
26/// This has the upside of requiring as few memory allocations as possible.
27/// If multiple concurrent inspections are required, prefer using the
28/// `output_stream::broadcast::BroadcastOutputSteam`.
29pub struct SingleSubscriberOutputStream {
30    /// The task that captured our `mpsc::Sender` and is now asynchronously awaiting
31    /// new output from the underlying stream, sending it to our registered receiver (if present).
32    stream_reader: JoinHandle<()>,
33
34    /// The receiver is wrapped in a `Cell<Option<>>` to allow interior mutability and to take the
35    /// receiver out and move it into an inspector or collector task.
36    /// This enables `&self` methods while tracking if the receiver has been taken.
37    /// Once taken by a consumer, attempting to create another consumer will panic with a clear
38    /// message, stating that a broadcast subscriber should be used instead.
39    receiver: Cell<Option<mpsc::Receiver<Option<Chunk>>>>,
40
41    /// The maximum size of every chunk read by the backing `stream_reader`.
42    chunk_size: NumBytes,
43
44    /// The maximum capacity of the channel caching the chunks before being processed.
45    max_channel_capacity: usize,
46
47    /// Name of this stream.
48    name: &'static str,
49}
50
51impl OutputStream for SingleSubscriberOutputStream {
52    fn chunk_size(&self) -> NumBytes {
53        self.chunk_size
54    }
55
56    fn channel_capacity(&self) -> usize {
57        self.max_channel_capacity
58    }
59
60    fn name(&self) -> &'static str {
61        self.name
62    }
63}
64
65impl Drop for SingleSubscriberOutputStream {
66    fn drop(&mut self) {
67        self.stream_reader.abort();
68    }
69}
70
71impl Debug for SingleSubscriberOutputStream {
72    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("SingleSubscriberOutputStream")
74            .field("output_collector", &"non-debug < JoinHandle<()> >")
75            .field(
76                "receiver",
77                &"non-debug < tokio::sync::mpsc::Receiver<Option<Chunk>> >",
78            )
79            .finish()
80    }
81}
82
83/// Uses a single `bytes::BytesMut` instance into which the input stream is read.
84/// Every chunk sent into `sender` is a frozen slice of that buffer.
85/// Once chunks were handled by all active receivers, the space of the chunk is reclaimed and reused.
86async fn read_chunked<R: AsyncRead + Unpin + Send + 'static>(
87    mut read: R,
88    chunk_size: NumBytes,
89    sender: mpsc::Sender<Option<Chunk>>,
90    backpressure_control: BackpressureControl,
91) {
92    struct AfterSend {
93        do_break: bool,
94    }
95
96    fn try_send_chunk(
97        chunk: Option<Chunk>,
98        sender: &mpsc::Sender<Option<Chunk>>,
99        lagged: &mut usize,
100    ) -> AfterSend {
101        match sender.try_send(chunk) {
102            Ok(()) => {
103                if *lagged > 0 {
104                    tracing::debug!(lagged, "Stream reader is lagging behind");
105                    *lagged = 0;
106                }
107            }
108            Err(err) => {
109                match err {
110                    TrySendError::Full(_data) => {
111                        *lagged += 1;
112                    }
113                    TrySendError::Closed(_data) => {
114                        // All receivers already dropped.
115                        // We intentionally ignore this error.
116                        // If it occurs, the user just isn't interested in
117                        // newer chunks anymore.
118                        return AfterSend { do_break: true };
119                    }
120                }
121            }
122        }
123        AfterSend { do_break: false }
124    }
125
126    async fn send_chunk(chunk: Option<Chunk>, sender: &mpsc::Sender<Option<Chunk>>) -> AfterSend {
127        match sender.send(chunk).await {
128            Ok(()) => {}
129            Err(_err) => {
130                // All receivers already dropped.
131                // We intentionally ignore this error.
132                // If it occurs, the user just isn't interested in
133                // newer chunks anymore.
134                return AfterSend { do_break: true };
135            }
136        }
137        AfterSend { do_break: false }
138    }
139
140    // NOTE: buf may grow when required!
141    let mut buf = bytes::BytesMut::with_capacity(chunk_size.bytes());
142    let mut lagged: usize = 0;
143    loop {
144        let _ = buf.try_reclaim(chunk_size.bytes());
145        match read.read_buf(&mut buf).await {
146            Ok(bytes_read) => {
147                let is_eof = bytes_read == 0;
148
149                match is_eof {
150                    true => match backpressure_control {
151                        BackpressureControl::DropLatestIncomingIfBufferFull => {
152                            let after = try_send_chunk(None, &sender, &mut lagged);
153                            if after.do_break {
154                                break;
155                            }
156                        }
157                        BackpressureControl::BlockUntilBufferHasSpace => {
158                            let after = send_chunk(None, &sender).await;
159                            if after.do_break {
160                                break;
161                            }
162                        }
163                    },
164                    false => {
165                        while !buf.is_empty() {
166                            let split_to = usize::min(chunk_size.bytes(), buf.len());
167
168                            match backpressure_control {
169                                BackpressureControl::DropLatestIncomingIfBufferFull => {
170                                    let after = try_send_chunk(
171                                        Some(Chunk(buf.split_to(split_to).freeze())),
172                                        &sender,
173                                        &mut lagged,
174                                    );
175                                    if after.do_break {
176                                        break;
177                                    }
178                                }
179                                BackpressureControl::BlockUntilBufferHasSpace => {
180                                    let after = send_chunk(
181                                        Some(Chunk(buf.split_to(split_to).freeze())),
182                                        &sender,
183                                    )
184                                    .await;
185                                    if after.do_break {
186                                        break;
187                                    }
188                                }
189                            }
190                        }
191                    }
192                };
193
194                if is_eof {
195                    break;
196                }
197            }
198            Err(err) => panic!("Could not read from stream: {err}"),
199        }
200    }
201}
202
203impl SingleSubscriberOutputStream {
204    /// Creates a new single subscriber output stream from an async read stream.
205    pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
206        stream: S,
207        stream_name: &'static str,
208        backpressure_control: BackpressureControl,
209        options: FromStreamOptions,
210    ) -> SingleSubscriberOutputStream {
211        let (tx_stdout, rx_stdout) = mpsc::channel::<Option<Chunk>>(options.channel_capacity);
212
213        let stream_reader = tokio::spawn(read_chunked(
214            stream,
215            options.chunk_size,
216            tx_stdout,
217            backpressure_control,
218        ));
219
220        SingleSubscriberOutputStream {
221            stream_reader,
222            receiver: Cell::new(Some(rx_stdout)),
223            chunk_size: options.chunk_size,
224            max_channel_capacity: options.channel_capacity,
225            name: stream_name,
226        }
227    }
228
229    fn take_receiver(&self) -> mpsc::Receiver<Option<Chunk>> {
230        self.receiver.take().unwrap_or_else(|| {
231            panic!(
232                "Cannot create multiple consumers on SingleSubscriberOutputStream (stream: '{}'). \
233                Only one inspector or collector can be active at a time. \
234                Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.",
235                self.name
236            )
237        })
238    }
239}
240
241// Expected types:
242// receiver: tokio::sync::mpsc::Receiver<Option<Chunk>>
243// term_rx: tokio::sync::oneshot::Receiver<()>
244macro_rules! handle_subscription {
245    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
246        $loop_label: loop {
247            tokio::select! {
248                out = $receiver.recv() => {
249                    match out {
250                        Some(maybe_chunk) => {
251                            let $chunk = maybe_chunk;
252                            $body
253                        }
254                        None => {
255                            // All senders have been dropped.
256                            break $loop_label;
257                        }
258                    }
259                }
260                _msg = &mut $term_rx => break $loop_label,
261            }
262        }
263    };
264}
265
266// Impls for inspecting the output of the stream.
267impl SingleSubscriberOutputStream {
268    /// Inspects chunks of output from the stream without storing them.
269    ///
270    /// The provided closure is called for each chunk of data. Return [`Next::Continue`] to keep
271    /// processing or [`Next::Break`] to stop.
272    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
273    pub fn inspect_chunks(&self, f: impl Fn(Chunk) -> Next + Send + 'static) -> Inspector {
274        let mut receiver = self.take_receiver();
275        impl_inspect_chunks!(self.name(), receiver, f, handle_subscription)
276    }
277
278    /// Inspects lines of output from the stream without storing them.
279    ///
280    /// The provided closure is called for each line. Return [`Next::Continue`] to keep
281    /// processing or [`Next::Break`] to stop.
282    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
283    pub fn inspect_lines(
284        &self,
285        mut f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static,
286        options: LineParsingOptions,
287    ) -> Inspector {
288        let mut receiver = self.take_receiver();
289        impl_inspect_lines!(self.name(), receiver, f, options, handle_subscription)
290    }
291
292    /// Inspects lines of output from the stream without storing them, using an async closure.
293    ///
294    /// The provided async closure is called for each line. Return [`Next::Continue`] to keep
295    /// processing or [`Next::Break`] to stop.
296    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
297    pub fn inspect_lines_async<Fut>(
298        &self,
299        mut f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static,
300        options: LineParsingOptions,
301    ) -> Inspector
302    where
303        Fut: Future<Output = Next> + Send,
304    {
305        let mut receiver = self.take_receiver();
306        impl_inspect_lines_async!(self.name(), receiver, f, options, handle_subscription)
307    }
308}
309
310// Impls for collecting the output of the stream.
311impl SingleSubscriberOutputStream {
312    /// Collects chunks from the stream into a sink.
313    ///
314    /// The provided closure is called for each chunk, with mutable access to the sink.
315    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
316    pub fn collect_chunks<S: Sink>(
317        &self,
318        into: S,
319        collect: impl Fn(Chunk, &mut S) + Send + 'static,
320    ) -> Collector<S> {
321        let sink = Arc::new(RwLock::new(into));
322        let mut receiver = self.take_receiver();
323        impl_collect_chunks!(self.name(), receiver, collect, sink, handle_subscription)
324    }
325
326    /// Collects chunks from the stream into a sink using an async closure.
327    ///
328    /// The provided async closure is called for each chunk, with mutable access to the sink.
329    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
330    pub fn collect_chunks_async<S, F>(&self, into: S, collect: F) -> Collector<S>
331    where
332        S: Sink,
333        F: Fn(Chunk, &mut S) -> AsyncCollectFn<'_> + Send + 'static,
334    {
335        let sink = Arc::new(RwLock::new(into));
336        let mut receiver = self.take_receiver();
337        impl_collect_chunks_async!(self.name(), receiver, collect, sink, handle_subscription)
338    }
339
340    /// Collects lines from the stream into a sink.
341    ///
342    /// The provided closure is called for each line, with mutable access to the sink.
343    /// Return [`Next::Continue`] to keep processing or [`Next::Break`] to stop.
344    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
345    pub fn collect_lines<S: Sink>(
346        &self,
347        into: S,
348        collect: impl Fn(Cow<'_, str>, &mut S) -> Next + Send + 'static,
349        options: LineParsingOptions,
350    ) -> Collector<S> {
351        let sink = Arc::new(RwLock::new(into));
352        let mut receiver = self.take_receiver();
353        impl_collect_lines!(
354            self.name(),
355            receiver,
356            collect,
357            options,
358            sink,
359            handle_subscription
360        )
361    }
362
363    /// Collects lines from the stream into a sink using an async closure.
364    ///
365    /// The provided async closure is called for each line, with mutable access to the sink.
366    /// Return [`Next::Continue`] to keep processing or [`Next::Break`] to stop.
367    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
368    pub fn collect_lines_async<S, F>(
369        &self,
370        into: S,
371        collect: F,
372        options: LineParsingOptions,
373    ) -> Collector<S>
374    where
375        S: Sink,
376        F: for<'a> Fn(Cow<'a, str>, &'a mut S) -> AsyncCollectFn<'a> + Send + Sync + 'static,
377    {
378        let sink = Arc::new(RwLock::new(into));
379        let mut receiver = self.take_receiver();
380        impl_collect_lines_async!(
381            self.name(),
382            receiver,
383            collect,
384            options,
385            sink,
386            handle_subscription
387        )
388    }
389
390    /// Convenience method to collect all chunks into a `Vec<u8>`.
391    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
392    pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
393        self.collect_chunks(Vec::new(), |chunk, vec| vec.extend(chunk.as_ref()))
394    }
395
396    /// Convenience method to collect all lines into a `Vec<String>`.
397    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
398    pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
399        self.collect_lines(
400            Vec::new(),
401            |line, vec| {
402                vec.push(line.into_owned());
403                Next::Continue
404            },
405            options,
406        )
407    }
408
409    /// Collects chunks into an async writer.
410    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
411    pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
412        &self,
413        write: W,
414    ) -> Collector<W> {
415        self.collect_chunks_async(write, move |chunk, write| {
416            Box::pin(async move {
417                if let Err(err) = write.write_all(chunk.as_ref()).await {
418                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
419                };
420                Next::Continue
421            })
422        })
423    }
424
425    /// Collects lines into an async writer.
426    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
427    pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
428        &self,
429        write: W,
430        options: LineParsingOptions,
431    ) -> Collector<W> {
432        self.collect_lines_async(
433            write,
434            move |line, write| {
435                Box::pin(async move {
436                    if let Err(err) = write.write_all(line.as_bytes()).await {
437                        tracing::warn!("Could not write line to write sink: {err:#?}");
438                    };
439                    Next::Continue
440                })
441            },
442            options,
443        )
444    }
445
446    /// Collects chunks into an async writer after mapping them with the provided function.
447    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
448    pub fn collect_chunks_into_write_mapped<
449        W: Sink + AsyncWriteExt + Unpin,
450        B: AsRef<[u8]> + Send,
451    >(
452        &self,
453        write: W,
454        mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
455    ) -> Collector<W> {
456        self.collect_chunks_async(write, move |chunk, write| {
457            Box::pin(async move {
458                let mapped = mapper(chunk);
459                let mapped = mapped.as_ref();
460                if let Err(err) = write.write_all(mapped).await {
461                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
462                };
463                Next::Continue
464            })
465        })
466    }
467
468    /// Collects lines into an async writer after mapping them with the provided function.
469    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
470    pub fn collect_lines_into_write_mapped<
471        W: Sink + AsyncWriteExt + Unpin,
472        B: AsRef<[u8]> + Send,
473    >(
474        &self,
475        write: W,
476        mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + Copy + 'static,
477        options: LineParsingOptions,
478    ) -> Collector<W> {
479        self.collect_lines_async(
480            write,
481            move |line, write| {
482                Box::pin(async move {
483                    let mapped = mapper(line);
484                    let mapped = mapped.as_ref();
485                    if let Err(err) = write.write_all(mapped).await {
486                        tracing::warn!("Could not write line to write sink: {err:#?}");
487                    };
488                    Next::Continue
489                })
490            },
491            options,
492        )
493    }
494}
495
496// Impls for waiting for a specific line of output.
497impl SingleSubscriberOutputStream {
498    /// Waits for a line that matches the given predicate.
499    ///
500    /// This method blocks until a line is found that satisfies the predicate.
501    pub async fn wait_for_line(
502        &self,
503        predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
504        options: LineParsingOptions,
505    ) -> Result<(), InspectorError> {
506        let inspector = self.inspect_lines(
507            move |line| {
508                if predicate(line) {
509                    Next::Break
510                } else {
511                    Next::Continue
512                }
513            },
514            options,
515        );
516        inspector.wait().await
517    }
518
519    /// Waits for a line that matches the given predicate, with a timeout.
520    ///
521    /// Returns `Ok(())` if a matching line is found, or `Err(OutputError::Timeout)` if the timeout expires.
522    pub async fn wait_for_line_with_timeout(
523        &self,
524        predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
525        options: LineParsingOptions,
526        timeout: Duration,
527    ) -> Result<(), OutputError> {
528        tokio::time::timeout(timeout, self.wait_for_line(predicate, options))
529            .await
530            .map_err(|_elapsed| OutputError::Timeout {
531                stream_name: self.name(),
532                timeout,
533            })
534            .and_then(|res| res.map_err(OutputError::InspectorFailed))
535    }
536}
537
538#[cfg(test)]
539mod tests {
540    use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
541    use crate::output_stream::tests::write_test_data;
542    use crate::output_stream::{BackpressureControl, FromStreamOptions, Next};
543    use crate::single_subscriber::read_chunked;
544    use crate::{LineParsingOptions, NumBytesExt};
545    use assertr::prelude::*;
546    use mockall::{automock, predicate};
547    use std::io::{Read, Seek, SeekFrom, Write};
548    use std::time::Duration;
549    use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
550    use tokio::sync::mpsc;
551    use tokio::time::sleep;
552    use tracing_test::traced_test;
553
554    #[tokio::test]
555    #[traced_test]
556    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
557    {
558        let (read_half, mut write_half) = tokio::io::duplex(64);
559        let (tx, mut rx) = mpsc::channel(64);
560
561        // Let's preemptively write more data into the stream than our later selected chunk size (2)
562        // can handle, forcing the initial read to completely fill our chunk buffer.
563        // Our expectation is that we still receive all data written here through multiple
564        // consecutive reads.
565        // The behavior of bytes::BytesMut, potentially reaching zero capacity when splitting a
566        // full buffer of, must not prevent this from happening but allocate more memory instead!
567        write_half.write_all(b"hello world").await.unwrap();
568        write_half.flush().await.unwrap();
569
570        let stream_reader = tokio::spawn(read_chunked(
571            read_half,
572            2.bytes(),
573            tx,
574            BackpressureControl::DropLatestIncomingIfBufferFull,
575        ));
576
577        drop(write_half); // This closes the stream and should let stream_reader terminate.
578        stream_reader.await.unwrap();
579
580        let mut chunks = Vec::<String>::new();
581        while let Some(Some(chunk)) = rx.recv().await {
582            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
583        }
584        assert_that(chunks).contains_exactly(["he", "ll", "o ", "wo", "rl", "d"]);
585    }
586
587    #[tokio::test]
588    #[traced_test]
589    async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
590        let (read_half, mut write_half) = tokio::io::duplex(64);
591        let os = SingleSubscriberOutputStream::from_stream(
592            read_half,
593            "custom",
594            BackpressureControl::DropLatestIncomingIfBufferFull,
595            FromStreamOptions {
596                channel_capacity: 2,
597                ..Default::default()
598            },
599        );
600
601        let inspector = os.inspect_lines_async(
602            |_line| async move {
603                // Mimic a slow consumer.
604                sleep(Duration::from_millis(100)).await;
605                Next::Continue
606            },
607            LineParsingOptions::default(),
608        );
609
610        #[rustfmt::skip]
611        let producer = tokio::spawn(async move {
612            for count in 1..=15 {
613                write_half
614                    .write_all(format!("{count}\n").as_bytes())
615                    .await
616                    .unwrap();
617                sleep(Duration::from_millis(25)).await;
618            }
619        });
620
621        producer.await.unwrap();
622        inspector.wait().await.unwrap();
623        drop(os);
624
625        logs_assert(|lines: &[&str]| {
626            match lines
627                .iter()
628                .filter(|line| line.contains("Stream reader is lagging behind lagged=1"))
629                .count()
630            {
631                1 => {}
632                n => return Err(format!("Expected exactly one lagged=1 log, but found {n}")),
633            };
634            match lines
635                .iter()
636                .filter(|line| line.contains("Stream reader is lagging behind lagged=3"))
637                .count()
638            {
639                2 => {}
640                n => return Err(format!("Expected exactly two lagged=3 logs, but found {n}")),
641            };
642            Ok(())
643        });
644    }
645
646    #[tokio::test]
647    async fn inspect_lines() {
648        let (read_half, write_half) = tokio::io::duplex(64);
649        let os = SingleSubscriberOutputStream::from_stream(
650            read_half,
651            "custom",
652            BackpressureControl::DropLatestIncomingIfBufferFull,
653            FromStreamOptions::default(),
654        );
655
656        #[automock]
657        trait LineVisitor {
658            fn visit(&self, line: String);
659        }
660
661        let mut mock = MockLineVisitor::new();
662        #[rustfmt::skip]
663        fn configure(mock: &mut MockLineVisitor) {
664            mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
665            mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
666            mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
667            mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
668            mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
669        }
670        configure(&mut mock);
671
672        let inspector = os.inspect_lines(
673            move |line| {
674                mock.visit(line.into_owned());
675                Next::Continue
676            },
677            LineParsingOptions::default(),
678        );
679
680        tokio::spawn(write_test_data(write_half)).await.unwrap();
681
682        inspector.cancel().await.unwrap();
683        drop(os)
684    }
685
686    /// This tests that our impl macros properly `break 'outer`, as they might be in an inner loop!
687    /// With `break` instead of `break 'outer`, this test would never complete, as the `Next::Break`
688    /// would not terminate the collector!
689    #[tokio::test]
690    #[traced_test]
691    async fn inspect_lines_async() {
692        let (read_half, mut write_half) = tokio::io::duplex(64);
693        let os = SingleSubscriberOutputStream::from_stream(
694            read_half,
695            "custom",
696            BackpressureControl::DropLatestIncomingIfBufferFull,
697            FromStreamOptions {
698                chunk_size: 32.bytes(),
699                ..Default::default()
700            },
701        );
702
703        let seen: Vec<String> = Vec::new();
704        let collector = os.collect_lines_async(
705            seen,
706            move |line, seen: &mut Vec<String>| {
707                Box::pin(async move {
708                    if line == "break" {
709                        seen.push(line.into_owned());
710                        Next::Break
711                    } else {
712                        seen.push(line.into_owned());
713                        Next::Continue
714                    }
715                })
716            },
717            LineParsingOptions::default(),
718        );
719
720        let _writer = tokio::spawn(async move {
721            write_half.write_all("start\n".as_bytes()).await.unwrap();
722            write_half.write_all("break\n".as_bytes()).await.unwrap();
723            write_half.write_all("end\n".as_bytes()).await.unwrap();
724
725            loop {
726                write_half
727                    .write_all("gibberish\n".as_bytes())
728                    .await
729                    .unwrap();
730                tokio::time::sleep(Duration::from_millis(50)).await;
731            }
732        });
733
734        let seen = collector.wait().await.unwrap();
735
736        assert_that(seen).contains_exactly(["start", "break"]);
737    }
738
739    #[tokio::test]
740    async fn collect_lines_to_file() {
741        let (read_half, write_half) = tokio::io::duplex(64);
742        let os = SingleSubscriberOutputStream::from_stream(
743            read_half,
744            "custom",
745            BackpressureControl::DropLatestIncomingIfBufferFull,
746            FromStreamOptions {
747                channel_capacity: 32,
748                ..Default::default()
749            },
750        );
751
752        let temp_file = tempfile::tempfile().unwrap();
753        let collector = os.collect_lines(
754            temp_file,
755            |line, temp_file| {
756                writeln!(temp_file, "{}", line).unwrap();
757                Next::Continue
758            },
759            LineParsingOptions::default(),
760        );
761
762        tokio::spawn(write_test_data(write_half)).await.unwrap();
763
764        let mut temp_file = collector.cancel().await.unwrap();
765        temp_file.seek(SeekFrom::Start(0)).unwrap();
766        let mut contents = String::new();
767        temp_file.read_to_string(&mut contents).unwrap();
768
769        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
770    }
771
772    #[tokio::test]
773    async fn collect_lines_async_to_file() {
774        let (read_half, write_half) = tokio::io::duplex(64);
775        let os = SingleSubscriberOutputStream::from_stream(
776            read_half,
777            "custom",
778            BackpressureControl::DropLatestIncomingIfBufferFull,
779            FromStreamOptions {
780                chunk_size: 32.bytes(),
781                ..Default::default()
782            },
783        );
784
785        let temp_file = tempfile::tempfile().unwrap();
786        let collector = os.collect_lines_async(
787            temp_file,
788            |line, temp_file| {
789                Box::pin(async move {
790                    writeln!(temp_file, "{}", line).unwrap();
791                    Next::Continue
792                })
793            },
794            LineParsingOptions::default(),
795        );
796
797        tokio::spawn(write_test_data(write_half)).await.unwrap();
798
799        let mut temp_file = collector.cancel().await.unwrap();
800        temp_file.seek(SeekFrom::Start(0)).unwrap();
801        let mut contents = String::new();
802        temp_file.read_to_string(&mut contents).unwrap();
803
804        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
805    }
806
807    #[tokio::test]
808    #[traced_test]
809    async fn collect_chunks_into_write_mapped() {
810        let (read_half, write_half) = tokio::io::duplex(64);
811        let os = SingleSubscriberOutputStream::from_stream(
812            read_half,
813            "custom",
814            BackpressureControl::DropLatestIncomingIfBufferFull,
815            FromStreamOptions {
816                chunk_size: 32.bytes(),
817                ..Default::default()
818            },
819        );
820
821        let temp_file = tokio::fs::File::options()
822            .create(true)
823            .truncate(true)
824            .write(true)
825            .read(true)
826            .open(std::env::temp_dir().join(
827                "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
828            ))
829            .await
830            .unwrap();
831
832        let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
833            String::from_utf8_lossy(chunk.as_ref()).to_string()
834        });
835
836        tokio::spawn(write_test_data(write_half)).await.unwrap();
837
838        let mut temp_file = collector.cancel().await.unwrap();
839        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
840        let mut contents = String::new();
841        temp_file.read_to_string(&mut contents).await.unwrap();
842
843        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
844    }
845
846    #[tokio::test]
847    #[traced_test]
848    async fn multiple_subscribers_are_not_possible() {
849        let (read_half, _write_half) = tokio::io::duplex(64);
850        let os = SingleSubscriberOutputStream::from_stream(
851            read_half,
852            "custom",
853            BackpressureControl::DropLatestIncomingIfBufferFull,
854            FromStreamOptions::default(),
855        );
856
857        let _inspector = os.inspect_lines(|_line| Next::Continue, Default::default());
858
859        // Doesn't matter if we call `inspect_lines` or some other "consuming" function instead.
860        assert_that_panic_by(move || os.inspect_lines(|_line| Next::Continue, Default::default()))
861            .has_type::<String>()
862            .is_equal_to("Cannot create multiple consumers on SingleSubscriberOutputStream (stream: 'custom'). Only one inspector or collector can be active at a time. Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.");
863    }
864}