Skip to main content

tokio_process_tools/output_stream/
broadcast.rs

1use crate::collector::{AsyncChunkCollector, AsyncLineCollector, Collector, Sink};
2use crate::inspector::Inspector;
3use crate::output_stream::impls::{
4    impl_collect_chunks, impl_collect_chunks_async, impl_collect_chunks_into_write,
5    impl_collect_chunks_into_write_mapped, impl_collect_lines, impl_collect_lines_async,
6    impl_collect_lines_into_write, impl_collect_lines_into_write_mapped, impl_inspect_chunks,
7    impl_inspect_lines, impl_inspect_lines_async, visit_final_line, visit_lines,
8};
9use crate::output_stream::{Chunk, FromStreamOptions, LineWriteMode, Next, StreamEvent};
10use crate::output_stream::{LineParserState, LineParsingOptions, OutputStream};
11use crate::{NumBytes, WaitForLineResult};
12use std::borrow::Cow;
13use std::fmt::{Debug, Formatter};
14use std::future::Future;
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
17use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
18use tokio::sync::broadcast;
19use tokio::sync::broadcast::error::RecvError;
20use tokio::task::JoinHandle;
21
22/// The output stream from a process. Either representing stdout or stderr.
23///
24/// This is the broadcast variant, allowing for multiple simultaneous consumers with the downside
25/// of inducing memory allocations not required when only one consumer is listening.
26/// For that case, prefer using the
27/// [`crate::output_stream::single_subscriber::SingleSubscriberOutputStream`].
28pub struct BroadcastOutputStream {
29    /// The task that captured a clone of our `broadcast::Sender` and is now asynchronously
30    /// awaiting new output from the underlying stream, sending it to all registered receivers.
31    stream_reader: JoinHandle<()>,
32
33    /// We only store the chunk sender. The initial receiver is dropped immediately after creating
34    /// the channel.
35    /// New subscribers can be created from this sender.
36    sender: broadcast::Sender<StreamEvent>,
37
38    /// Tracks whether the underlying stream has already reached EOF.
39    ///
40    /// Broadcast channels do not replay old messages to late subscribers, so we need to know
41    /// whether to synthesize a terminal EOF event for consumers that attach after stream closure.
42    closure_state: Arc<Mutex<ClosureState>>,
43
44    /// The maximum size of every chunk read by the backing `stream_reader`.
45    chunk_size: NumBytes,
46
47    /// The maximum capacity of the channel caching the chunks before being processed.
48    max_channel_capacity: usize,
49
50    /// Name of this stream.
51    name: &'static str,
52}
53
54struct ClosureState {
55    closed: bool,
56}
57
58struct Subscription {
59    receiver: broadcast::Receiver<StreamEvent>,
60    emit_terminal_eof: bool,
61}
62
63impl Subscription {
64    async fn recv(&mut self) -> Result<StreamEvent, RecvError> {
65        if self.emit_terminal_eof {
66            self.emit_terminal_eof = false;
67            return Ok(StreamEvent::Eof);
68        }
69
70        self.receiver.recv().await
71    }
72}
73
74impl OutputStream for BroadcastOutputStream {
75    fn chunk_size(&self) -> NumBytes {
76        self.chunk_size
77    }
78
79    fn channel_capacity(&self) -> usize {
80        self.max_channel_capacity
81    }
82
83    fn name(&self) -> &'static str {
84        self.name
85    }
86}
87
88impl Drop for BroadcastOutputStream {
89    fn drop(&mut self) {
90        self.stream_reader.abort();
91    }
92}
93
94impl Debug for BroadcastOutputStream {
95    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96        f.debug_struct("BroadcastOutputStream")
97            .field("output_collector", &"non-debug < JoinHandle<()> >")
98            .field(
99                "sender",
100                &"non-debug < tokio::sync::broadcast::Sender<StreamEvent> >",
101            )
102            .finish()
103    }
104}
105
106/// Uses a single `bytes::BytesMut` instance into which the input stream is read.
107/// Every chunk sent into `sender` is a frozen slice of that buffer.
108/// Once chunks were handled by all active receivers, the space of the chunk is reclaimed and reused.
109async fn read_chunked<B: AsyncRead + Unpin + Send + 'static>(
110    mut read: B,
111    chunk_size: NumBytes,
112    sender: broadcast::Sender<StreamEvent>,
113    closure_state: Arc<Mutex<ClosureState>>,
114) {
115    let send_chunk = move |event: StreamEvent| {
116        // When we could not send the chunk, we get it back in the error value and
117        // then drop it. This means that the BytesMut storage portion of that chunk
118        // is now reclaimable and can be used for storing the next chunk of incoming
119        // bytes.
120        match sender.send(event) {
121            Ok(_received_by) => {}
122            Err(err) => {
123                // No receivers: All already dropped or none was yet created.
124                // We intentionally ignore these errors.
125                // If they occur, the user just wasn't interested in seeing this chunk.
126                // We won't store it (history) to later feed it back to a new subscriber.
127                tracing::debug!(
128                    error = %err,
129                    "No active receivers for the output chunk, dropping it"
130                );
131            }
132        }
133    };
134
135    // A BytesMut may grow when used in a `read_buf` call.
136    let mut buf = bytes::BytesMut::with_capacity(chunk_size.bytes());
137    loop {
138        let _ = buf.try_reclaim(chunk_size.bytes());
139        match read.read_buf(&mut buf).await {
140            Ok(bytes_read) => {
141                let is_eof = bytes_read == 0;
142
143                if is_eof {
144                    // `subscribe()` and EOF publication both synchronize on `closure_state`.
145                    // This makes late subscribers observe either:
146                    // 1. a receiver that was created before the real terminal EOF, or
147                    // 2. a closed state that requires synthesizing EOF.
148                    // They can no longer observe "closed" while still skipping queued tail data.
149                    let mut state = closure_state.lock().expect("closure_state poisoned");
150                    state.closed = true;
151                    send_chunk(StreamEvent::Eof);
152                } else {
153                    while !buf.is_empty() {
154                        // Split of at least `chunk_size` bytes and send it, even if we were
155                        // able to read more than `chunk_size` bytes.
156                        // We could have read more
157                        let split_to = usize::min(chunk_size.bytes(), buf.len());
158                        // Splitting off bytes reduces the remaining capacity of our BytesMut.
159                        // It might have now reached a capacity of 0. But this is fine!
160                        // The next usage of it in `read_buf` will not return `0`, as you may
161                        // expect from the read_buf documentation. The BytesMut will grow
162                        // to allow buffering of more data.
163                        //
164                        // NOTE: If we only split of at max `chunk_size` bytes, we have to repeat
165                        // this, unless all data is processed.
166                        send_chunk(StreamEvent::Chunk(Chunk(buf.split_to(split_to).freeze())));
167                    }
168                }
169
170                if is_eof {
171                    break;
172                }
173            }
174            Err(err) => panic!("Could not read from stream: {err}"),
175        }
176    }
177}
178
179impl BroadcastOutputStream {
180    /// Creates a new broadcast output stream from an async read stream.
181    pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
182        stream: S,
183        stream_name: &'static str,
184        options: FromStreamOptions,
185    ) -> BroadcastOutputStream {
186        options.chunk_size.assert_non_zero("options.chunk_size");
187
188        let (sender, receiver) = broadcast::channel::<StreamEvent>(options.channel_capacity);
189        drop(receiver);
190
191        let closure_state = Arc::new(Mutex::new(ClosureState { closed: false }));
192
193        let stream_reader = tokio::spawn(read_chunked(
194            stream,
195            options.chunk_size,
196            sender.clone(),
197            Arc::clone(&closure_state),
198        ));
199
200        BroadcastOutputStream {
201            stream_reader,
202            sender,
203            closure_state,
204            chunk_size: options.chunk_size,
205            max_channel_capacity: options.channel_capacity,
206            name: stream_name,
207        }
208    }
209
210    fn subscribe(&self) -> Subscription {
211        let (receiver, emit_terminal_eof) = {
212            let state = self.closure_state.lock().expect("closure_state poisoned");
213            let receiver = self.sender.subscribe();
214
215            // If the stream already finished before this subscriber attached, broadcast channels
216            // have no history for us. Synthesize EOF for this subscriber only so
217            // we do not disturb existing subscribers or evict buffered chunks from the shared
218            // channel. The lock makes this decision linearizable with EOF publication.
219            (receiver, state.closed)
220        };
221
222        Subscription {
223            receiver,
224            emit_terminal_eof,
225        }
226    }
227}
228
229// Expected types:
230// loop_label: &'static str
231// receiver: Subscription
232// term_rx: tokio::sync::oneshot::Receiver<()>,
233macro_rules! handle_subscription {
234    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
235        $loop_label: loop {
236            tokio::select! {
237                out = $receiver.recv() => {
238                    match out {
239                        Ok(event) => {
240                            let $chunk = event;
241                            $body
242                        }
243                        Err(RecvError::Closed) => {
244                            // All senders have been dropped.
245                            break $loop_label;
246                        },
247                        Err(RecvError::Lagged(lagged)) => {
248                            tracing::warn!(lagged, "Inspector is lagging behind");
249                            let $chunk = StreamEvent::Gap;
250                            $body
251                        }
252                    }
253                }
254                _msg = &mut $term_rx => break $loop_label,
255            }
256        }
257    };
258}
259
260// Impls for inspecting the output of the stream.
261impl BroadcastOutputStream {
262    /// Inspects chunks of output from the stream without storing them.
263    ///
264    /// The provided closure is called for each chunk of data. Return [`Next::Continue`] to keep
265    /// processing or [`Next::Break`] to stop.
266    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
267    pub fn inspect_chunks(&self, mut f: impl FnMut(Chunk) -> Next + Send + 'static) -> Inspector {
268        let mut receiver = self.subscribe();
269        impl_inspect_chunks!(self.name(), receiver, f, handle_subscription)
270    }
271
272    /// Inspects lines of output from the stream without storing them.
273    ///
274    /// The provided closure is called for each line. Return [`Next::Continue`] to keep
275    /// processing or [`Next::Break`] to stop.
276    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
277    pub fn inspect_lines(
278        &self,
279        mut f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static,
280        options: LineParsingOptions,
281    ) -> Inspector {
282        let mut receiver = self.subscribe();
283        impl_inspect_lines!(self.name(), receiver, f, options, handle_subscription)
284    }
285
286    /// Inspects lines of output from the stream without storing them, using an async closure.
287    ///
288    /// The provided async closure is called for each line. Return [`Next::Continue`] to keep
289    /// processing or [`Next::Break`] to stop.
290    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
291    pub fn inspect_lines_async<Fut>(
292        &self,
293        mut f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static,
294        options: LineParsingOptions,
295    ) -> Inspector
296    where
297        Fut: Future<Output = Next> + Send,
298    {
299        let mut receiver = self.subscribe();
300        impl_inspect_lines_async!(self.name(), receiver, f, options, handle_subscription)
301    }
302}
303
304// Impls for collecting the output of the stream.
305impl BroadcastOutputStream {
306    /// Collects chunks from the stream into a sink.
307    ///
308    /// The provided closure is called for each chunk, with mutable access to the sink.
309    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
310    pub fn collect_chunks<S: Sink>(
311        &self,
312        into: S,
313        mut collect: impl FnMut(Chunk, &mut S) + Send + 'static,
314    ) -> Collector<S> {
315        let mut receiver = self.subscribe();
316        impl_collect_chunks!(self.name(), receiver, collect, into, handle_subscription)
317    }
318
319    /// Collects chunks from the stream into a sink using an async collector.
320    ///
321    /// The provided async collector is called for each chunk, with mutable access to the sink.
322    ///
323    /// # Example
324    ///
325    /// ```rust,no_run
326    /// use tokio_process_tools::{AsyncChunkCollector, Chunk, Next, Process};
327    ///
328    /// struct ExtendChunks;
329    ///
330    /// impl AsyncChunkCollector<Vec<u8>> for ExtendChunks {
331    ///     async fn collect<'a>(&'a mut self, chunk: Chunk, bytes: &'a mut Vec<u8>) -> Next {
332    ///         bytes.extend_from_slice(chunk.as_ref());
333    ///         Next::Continue
334    ///     }
335    /// }
336    ///
337    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
338    /// let process = Process::new(tokio::process::Command::new("some-command"))
339    ///     .spawn_broadcast()?;
340    /// let collector = process.stdout().collect_chunks_async(Vec::new(), ExtendChunks);
341    /// # drop(collector);
342    /// # Ok(())
343    /// # }
344    /// ```
345    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
346    pub fn collect_chunks_async<S, C>(&self, into: S, collect: C) -> Collector<S>
347    where
348        S: Sink,
349        C: AsyncChunkCollector<S>,
350    {
351        let mut receiver = self.subscribe();
352        impl_collect_chunks_async!(self.name(), receiver, collect, into, handle_subscription)
353    }
354
355    /// Collects lines from the stream into a sink.
356    ///
357    /// The provided closure is called for each line, with mutable access to the sink.
358    /// Return [`Next::Continue`] to keep processing or [`Next::Break`] to stop.
359    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
360    pub fn collect_lines<S: Sink>(
361        &self,
362        into: S,
363        mut collect: impl FnMut(Cow<'_, str>, &mut S) -> Next + Send + 'static,
364        options: LineParsingOptions,
365    ) -> Collector<S> {
366        let mut receiver = self.subscribe();
367        impl_collect_lines!(
368            self.name(),
369            receiver,
370            collect,
371            options,
372            into,
373            handle_subscription
374        )
375    }
376
377    /// Collects lines from the stream into a sink using an async collector.
378    ///
379    /// The provided async collector is called for each line, with mutable access to the sink.
380    /// Return [`Next::Continue`] to keep processing or [`Next::Break`] to stop.
381    ///
382    /// # Example
383    ///
384    /// ```rust,no_run
385    /// use std::borrow::Cow;
386    /// use tokio_process_tools::{AsyncLineCollector, LineParsingOptions, Next, Process};
387    ///
388    /// struct PushLines;
389    ///
390    /// impl AsyncLineCollector<Vec<String>> for PushLines {
391    ///     async fn collect<'a>(
392    ///         &'a mut self,
393    ///         line: Cow<'a, str>,
394    ///         lines: &'a mut Vec<String>,
395    ///     ) -> Next {
396    ///         lines.push(line.into_owned());
397    ///         Next::Continue
398    ///     }
399    /// }
400    ///
401    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
402    /// let process = Process::new(tokio::process::Command::new("some-command"))
403    ///     .spawn_broadcast()?;
404    /// let collector = process.stdout().collect_lines_async(
405    ///     Vec::new(),
406    ///     PushLines,
407    ///     LineParsingOptions::default(),
408    /// );
409    /// # drop(collector);
410    /// # Ok(())
411    /// # }
412    /// ```
413    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
414    pub fn collect_lines_async<S, C>(
415        &self,
416        into: S,
417        collect: C,
418        options: LineParsingOptions,
419    ) -> Collector<S>
420    where
421        S: Sink,
422        C: AsyncLineCollector<S>,
423    {
424        let mut receiver = self.subscribe();
425        impl_collect_lines_async!(
426            self.name(),
427            receiver,
428            collect,
429            options,
430            into,
431            handle_subscription
432        )
433    }
434
435    /// Convenience method to collect all chunks into a `Vec<u8>`.
436    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
437    pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
438        self.collect_chunks(Vec::new(), |chunk, vec| {
439            vec.extend_from_slice(chunk.as_ref());
440        })
441    }
442
443    /// Convenience method to collect all lines into a `Vec<String>`.
444    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
445    pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
446        self.collect_lines(
447            Vec::new(),
448            |line, vec| {
449                vec.push(line.into_owned());
450                Next::Continue
451            },
452            options,
453        )
454    }
455
456    /// Collects chunks into an async writer.
457    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
458    pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
459        &self,
460        write: W,
461    ) -> Collector<W> {
462        let mut receiver = self.subscribe();
463        impl_collect_chunks_into_write!(self.name(), receiver, write, handle_subscription)
464    }
465
466    /// Collects lines into an async writer.
467    ///
468    /// Parsed lines no longer include their trailing newline byte, so `mode` controls whether a
469    /// `\n` delimiter should be reintroduced for each emitted line.
470    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
471    pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
472        &self,
473        write: W,
474        options: LineParsingOptions,
475        mode: LineWriteMode,
476    ) -> Collector<W> {
477        let mut receiver = self.subscribe();
478        impl_collect_lines_into_write!(
479            self.name(),
480            receiver,
481            write,
482            options,
483            mode,
484            handle_subscription
485        )
486    }
487
488    /// Collects chunks into an async writer after mapping them with the provided function.
489    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
490    pub fn collect_chunks_into_write_mapped<
491        W: Sink + AsyncWriteExt + Unpin,
492        B: AsRef<[u8]> + Send,
493    >(
494        &self,
495        write: W,
496        mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
497    ) -> Collector<W> {
498        let mut receiver = self.subscribe();
499        impl_collect_chunks_into_write_mapped!(
500            self.name(),
501            receiver,
502            write,
503            mapper,
504            handle_subscription
505        )
506    }
507
508    /// Collects lines into an async writer after mapping them with the provided function.
509    ///
510    /// `mode` applies after `mapper`: choose [`LineWriteMode::AsIs`] when the mapped output
511    /// already contains delimiters, or [`LineWriteMode::AppendLf`] to append `\n` after each
512    /// mapped line.
513    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
514    pub fn collect_lines_into_write_mapped<
515        W: Sink + AsyncWriteExt + Unpin,
516        B: AsRef<[u8]> + Send,
517    >(
518        &self,
519        write: W,
520        mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + Copy + 'static,
521        options: LineParsingOptions,
522        mode: LineWriteMode,
523    ) -> Collector<W> {
524        let mut receiver = self.subscribe();
525        impl_collect_lines_into_write_mapped!(
526            self.name(),
527            receiver,
528            write,
529            mapper,
530            options,
531            mode,
532            handle_subscription
533        )
534    }
535}
536
537// Impls for waiting for a specific line of output.
538impl BroadcastOutputStream {
539    async fn wait_for_line_inner(
540        &self,
541        predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
542        options: LineParsingOptions,
543    ) -> WaitForLineResult {
544        let mut receiver = self.subscribe();
545        let mut parser = LineParserState::new();
546
547        loop {
548            match receiver.recv().await {
549                Ok(StreamEvent::Chunk(chunk)) => {
550                    if visit_lines(chunk.as_ref(), &mut parser, options, |line| {
551                        if predicate(line) {
552                            Next::Break
553                        } else {
554                            Next::Continue
555                        }
556                    }) == Next::Break
557                    {
558                        return WaitForLineResult::Matched;
559                    }
560                }
561                Ok(StreamEvent::Gap) => {
562                    parser.on_gap();
563                }
564                Ok(StreamEvent::Eof) | Err(RecvError::Closed) => {
565                    if visit_final_line(&parser, |line| {
566                        if predicate(line) {
567                            Next::Break
568                        } else {
569                            Next::Continue
570                        }
571                    }) == Next::Break
572                    {
573                        return WaitForLineResult::Matched;
574                    }
575                    return WaitForLineResult::StreamClosed;
576                }
577                Err(RecvError::Lagged(lagged)) => {
578                    tracing::warn!(lagged, "Waiter is lagging behind");
579                    parser.on_gap();
580                }
581            }
582        }
583    }
584
585    /// Waits for a line that matches the given predicate.
586    ///
587    /// Returns [`WaitForLineResult::Matched`] if a matching line is found, or
588    /// [`WaitForLineResult::StreamClosed`] if the stream ends first.
589    /// This method never returns [`WaitForLineResult::Timeout`]; use
590    /// [`BroadcastOutputStream::wait_for_line_with_timeout`] if you need a bounded wait.
591    ///
592    /// Broadcast delivery is lossy under pressure. If this waiter lags and misses chunks, it
593    /// discards any partial line in progress and resynchronizes at the next newline instead of
594    /// matching across a gap.
595    pub async fn wait_for_line(
596        &self,
597        predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
598        options: LineParsingOptions,
599    ) -> WaitForLineResult {
600        self.wait_for_line_inner(predicate, options).await
601    }
602
603    /// Waits for a line that matches the given predicate, with a timeout.
604    ///
605    /// Returns [`WaitForLineResult::Matched`] if a matching line is found,
606    /// [`WaitForLineResult::StreamClosed`] if the stream ends first, or
607    /// [`WaitForLineResult::Timeout`] if the timeout expires first.
608    ///
609    /// This is the only line-wait variant on this type that can return
610    /// [`WaitForLineResult::Timeout`].
611    ///
612    /// Broadcast delivery is lossy under pressure. If this waiter lags and misses chunks, it
613    /// discards any partial line in progress and resynchronizes at the next newline instead of
614    /// matching across a gap.
615    pub async fn wait_for_line_with_timeout(
616        &self,
617        predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
618        options: LineParsingOptions,
619        timeout: Duration,
620    ) -> WaitForLineResult {
621        tokio::time::timeout(timeout, self.wait_for_line_inner(predicate, options))
622            .await
623            .unwrap_or(WaitForLineResult::Timeout)
624    }
625}
626
627/// Configuration for line parsing behavior.
628pub struct LineConfig {
629    // Existing fields
630    // ...
631    /// Maximum length of a single line in bytes.
632    /// When reached, the current line will be emitted.
633    /// A value of 0 means no limit (default).
634    pub max_line_length: usize,
635}
636
637#[cfg(test)]
638mod tests {
639    use super::{ClosureState, read_chunked};
640    use crate::WaitForLineResult;
641    use crate::output_stream::broadcast::BroadcastOutputStream;
642    use crate::output_stream::tests::write_test_data;
643    use crate::output_stream::{Chunk, StreamEvent};
644    use crate::output_stream::{FromStreamOptions, LineParsingOptions, LineWriteMode, Next};
645    use crate::{AsyncChunkCollector, AsyncLineCollector};
646    use crate::{NumBytes, NumBytesExt};
647    use assertr::prelude::*;
648    use bytes::Bytes;
649    use mockall::*;
650    use std::borrow::Cow;
651    use std::future::poll_fn;
652    use std::io::Read;
653    use std::io::Seek;
654    use std::io::SeekFrom;
655    use std::io::Write;
656    use std::pin::pin;
657    use std::sync::{Arc, Mutex};
658    use std::task::Poll;
659    use std::time::Duration;
660    use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
661    use tokio::sync::broadcast;
662    use tokio::sync::broadcast::error::RecvError;
663    use tokio::time::sleep;
664    use tracing_test::traced_test;
665
666    struct BreakOnLine;
667
668    impl AsyncLineCollector<Vec<String>> for BreakOnLine {
669        async fn collect<'a>(&'a mut self, line: Cow<'a, str>, seen: &'a mut Vec<String>) -> Next {
670            if line == "break" {
671                seen.push(line.into_owned());
672                Next::Break
673            } else {
674                seen.push(line.into_owned());
675                Next::Continue
676            }
677        }
678    }
679
680    struct WriteLine;
681
682    impl AsyncLineCollector<std::fs::File> for WriteLine {
683        async fn collect<'a>(
684            &'a mut self,
685            line: Cow<'a, str>,
686            temp_file: &'a mut std::fs::File,
687        ) -> Next {
688            writeln!(temp_file, "{line}").unwrap();
689            Next::Continue
690        }
691    }
692
693    struct ExtendChunks;
694
695    impl AsyncChunkCollector<Vec<u8>> for ExtendChunks {
696        async fn collect<'a>(&'a mut self, chunk: Chunk, seen: &'a mut Vec<u8>) -> Next {
697            seen.extend_from_slice(chunk.as_ref());
698            Next::Continue
699        }
700    }
701
702    #[test]
703    #[should_panic(expected = "options.chunk_size must be greater than zero bytes")]
704    fn from_stream_panics_on_zero_chunk_size() {
705        let _stream = BroadcastOutputStream::from_stream(
706            tokio::io::empty(),
707            "custom",
708            FromStreamOptions {
709                chunk_size: NumBytes::zero(),
710                ..FromStreamOptions::default()
711            },
712        );
713    }
714
715    #[tokio::test]
716    #[traced_test]
717    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
718    {
719        let (read_half, mut write_half) = tokio::io::duplex(64);
720        let (tx, mut rx) = broadcast::channel(10);
721
722        // Let's preemptively write more data into the stream than our later selected chunk size (2)
723        // can handle, forcing the initial read to completely fill our chunk buffer.
724        // Our expectation is that we still receive all data written here through multiple
725        // consecutive reads.
726        // The behavior of bytes::BytesMut, potentially reaching zero capacity when splitting a
727        // full buffer of, must not prevent this from happening!
728        write_half.write_all(b"hello world").await.unwrap();
729        write_half.flush().await.unwrap();
730
731        let stream_reader = tokio::spawn(read_chunked(
732            read_half,
733            2.bytes(),
734            tx,
735            Arc::new(Mutex::new(ClosureState { closed: false })),
736        ));
737
738        drop(write_half); // This closes the stream and should let stream_reader terminate.
739        stream_reader.await.unwrap();
740
741        let mut chunks = Vec::<String>::new();
742        while let Ok(event) = rx.recv().await {
743            match event {
744                StreamEvent::Chunk(chunk) => {
745                    chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
746                }
747                StreamEvent::Gap => {}
748                StreamEvent::Eof => break,
749            }
750        }
751        assert_that!(chunks).contains_exactly(["he", "ll", "o ", "wo", "rl", "d"]);
752    }
753
754    #[tokio::test]
755    #[traced_test]
756    async fn read_chunked_no_data() {
757        let (read_half, write_half) = tokio::io::duplex(64);
758        let (tx, mut rx) = broadcast::channel(10);
759
760        let stream_reader = tokio::spawn(read_chunked(
761            read_half,
762            2.bytes(),
763            tx,
764            Arc::new(Mutex::new(ClosureState { closed: false })),
765        ));
766
767        drop(write_half); // This closes the stream and should let stream_reader terminate.
768        stream_reader.await.unwrap();
769
770        let mut chunks = Vec::<String>::new();
771        while let Ok(event) = rx.recv().await {
772            match event {
773                StreamEvent::Chunk(chunk) => {
774                    chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
775                }
776                StreamEvent::Gap => {}
777                StreamEvent::Eof => break,
778            }
779        }
780        assert_that!(chunks).is_empty();
781    }
782
783    #[tokio::test]
784    async fn wait_for_line_returns_matched_when_line_appears_before_eof() {
785        let (read_half, mut write_half) = tokio::io::duplex(64);
786        let os =
787            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
788
789        let waiter = os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default());
790        let mut waiter = pin!(waiter);
791
792        // Drive the first poll on the current task so the synchronous subscribe step
793        // inside `wait_for_line` runs before the producer writes. Once polled, the
794        // future has either completed already or is parked waiting on the broadcast
795        // channel — either outcome is race-free with respect to subsequent writes.
796        poll_fn(|cx| {
797            let _ = waiter.as_mut().poll(cx);
798            Poll::Ready(())
799        })
800        .await;
801
802        write_half.write_all(b"booting\nready\n").await.unwrap();
803        write_half.flush().await.unwrap();
804        drop(write_half);
805
806        let result = waiter.await;
807        assert_that!(result).is_equal_to(WaitForLineResult::Matched);
808    }
809
810    #[tokio::test]
811    async fn wait_for_line_returns_stream_closed_when_stream_ends_before_match() {
812        let (read_half, mut write_half) = tokio::io::duplex(64);
813        let os =
814            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
815
816        let waiter = os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default());
817        let mut waiter = pin!(waiter);
818
819        // See `wait_for_line_returns_matched_when_line_appears_before_eof` for why
820        // we drive the first poll explicitly here instead of yielding once.
821        poll_fn(|cx| {
822            let _ = waiter.as_mut().poll(cx);
823            Poll::Ready(())
824        })
825        .await;
826
827        write_half
828            .write_all(b"booting\nstill starting\n")
829            .await
830            .unwrap();
831        write_half.flush().await.unwrap();
832        drop(write_half);
833
834        let result = waiter.await;
835        assert_that!(result).is_equal_to(WaitForLineResult::StreamClosed);
836    }
837
838    #[tokio::test]
839    async fn wait_for_line_returns_matched_for_partial_final_line_at_eof() {
840        let (read_half, mut write_half) = tokio::io::duplex(64);
841        let os =
842            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
843
844        let waiter = os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default());
845        let mut waiter = pin!(waiter);
846
847        // See `wait_for_line_returns_matched_when_line_appears_before_eof` for why
848        // we drive the first poll explicitly here instead of yielding once.
849        poll_fn(|cx| {
850            let _ = waiter.as_mut().poll(cx);
851            Poll::Ready(())
852        })
853        .await;
854
855        write_half.write_all(b"booting\nready").await.unwrap();
856        write_half.flush().await.unwrap();
857        drop(write_half);
858
859        let result = waiter.await;
860        assert_that!(result).is_equal_to(WaitForLineResult::Matched);
861    }
862
863    #[tokio::test]
864    async fn wait_for_line_with_timeout_returns_timeout_while_stream_stays_open() {
865        let (read_half, _write_half) = tokio::io::duplex(64);
866        let os =
867            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
868
869        let result = os
870            .wait_for_line_with_timeout(
871                |line| line.contains("ready"),
872                LineParsingOptions::default(),
873                Duration::from_millis(25),
874            )
875            .await;
876
877        assert_that!(result).is_equal_to(WaitForLineResult::Timeout);
878    }
879
880    #[tokio::test]
881    async fn wait_for_line_returns_stream_closed_for_late_subscriber_after_eof() {
882        let (read_half, mut write_half) = tokio::io::duplex(64);
883        let os =
884            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
885
886        write_half.write_all(b"booting\n").await.unwrap();
887        write_half.flush().await.unwrap();
888        drop(write_half);
889
890        // Wait deterministically until `read_chunked` has observed EOF and flipped
891        // `closed`. This ensures we exercise the late-subscriber path (with the
892        // synthesized terminal EOF) rather than racing it.
893        while !os
894            .closure_state
895            .lock()
896            .expect("closure_state poisoned")
897            .closed
898        {
899            tokio::task::yield_now().await;
900        }
901
902        let result = os
903            .wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
904            .await;
905
906        assert_that!(result).is_equal_to(WaitForLineResult::StreamClosed);
907    }
908
909    #[tokio::test]
910    async fn late_subscriber_after_eof_does_not_disturb_existing_subscribers() {
911        let (sender, receiver) = broadcast::channel::<StreamEvent>(2);
912        drop(receiver);
913
914        let closure_state = Arc::new(Mutex::new(ClosureState { closed: false }));
915        let os = BroadcastOutputStream {
916            stream_reader: tokio::spawn(async {}),
917            sender: sender.clone(),
918            closure_state: Arc::clone(&closure_state),
919            chunk_size: 4.bytes(),
920            max_channel_capacity: 2,
921            name: "custom",
922        };
923
924        let mut existing = os.subscribe();
925
926        sender
927            .send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"one\n"))))
928            .unwrap();
929        sender
930            .send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"two\n"))))
931            .unwrap();
932        sender.send(StreamEvent::Eof).unwrap();
933        closure_state.lock().expect("closure_state poisoned").closed = true;
934
935        let mut late = os.subscribe();
936        assert_that!(late.recv().await)
937            .is_ok()
938            .is_equal_to(StreamEvent::Eof);
939
940        assert_that!(existing.recv().await)
941            .is_err()
942            .is_equal_to(RecvError::Lagged(1));
943        let chunk = existing.recv().await.unwrap();
944        assert_that!(chunk).is_equal_to(StreamEvent::Chunk(Chunk(Bytes::from_static(b"two\n"))));
945        assert_that!(existing.recv().await)
946            .is_ok()
947            .is_equal_to(StreamEvent::Eof);
948    }
949
950    #[tokio::test]
951    async fn subscriber_created_before_closure_receives_tail_data_before_terminal_eof() {
952        let (sender, receiver) = broadcast::channel::<StreamEvent>(4);
953        drop(receiver);
954
955        let closure_state = Arc::new(Mutex::new(ClosureState { closed: false }));
956        let os = BroadcastOutputStream {
957            stream_reader: tokio::spawn(async {}),
958            sender: sender.clone(),
959            closure_state: Arc::clone(&closure_state),
960            chunk_size: 4.bytes(),
961            max_channel_capacity: 4,
962            name: "custom",
963        };
964
965        let mut subscriber = os.subscribe();
966
967        sender
968            .send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"tail\n"))))
969            .unwrap();
970        {
971            let mut state = closure_state.lock().expect("closure_state poisoned");
972            state.closed = true;
973            sender.send(StreamEvent::Eof).unwrap();
974        }
975
976        let chunk = subscriber.recv().await.unwrap();
977        assert_that!(chunk).is_equal_to(StreamEvent::Chunk(Chunk(Bytes::from_static(b"tail\n"))));
978        assert_that!(subscriber.recv().await)
979            .is_ok()
980            .is_equal_to(StreamEvent::Eof);
981    }
982
983    #[tokio::test]
984    async fn wait_for_line_does_not_match_across_lag_gap() {
985        let (sender, receiver) = broadcast::channel::<StreamEvent>(2);
986        drop(receiver);
987
988        let closure_state = Arc::new(Mutex::new(ClosureState { closed: false }));
989        let os = BroadcastOutputStream {
990            stream_reader: tokio::spawn(async {}),
991            sender: sender.clone(),
992            closure_state: Arc::clone(&closure_state),
993            chunk_size: 4.bytes(),
994            max_channel_capacity: 2,
995            name: "custom",
996        };
997
998        let waiter = os.wait_for_line(|line| line == "ready", LineParsingOptions::default());
999        let mut waiter = pin!(waiter);
1000
1001        poll_fn(|cx| {
1002            let _ = waiter.as_mut().poll(cx);
1003            Poll::Ready(())
1004        })
1005        .await;
1006
1007        sender
1008            .send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"rea"))))
1009            .unwrap();
1010        sender
1011            .send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"lost"))))
1012            .unwrap();
1013        sender
1014            .send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"dy\n"))))
1015            .unwrap();
1016        {
1017            let mut state = closure_state.lock().expect("closure_state poisoned");
1018            state.closed = true;
1019        }
1020        sender.send(StreamEvent::Eof).unwrap();
1021
1022        assert_that!(waiter.await).is_equal_to(WaitForLineResult::StreamClosed);
1023    }
1024
1025    #[tokio::test]
1026    async fn collect_lines_into_write_appends_requested_line_delimiter() {
1027        let (read_half, write_half) = tokio::io::duplex(64);
1028        let os =
1029            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
1030
1031        let temp_file = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
1032        let collector = os.collect_lines_into_write(
1033            temp_file,
1034            LineParsingOptions::default(),
1035            LineWriteMode::AppendLf,
1036        );
1037
1038        tokio::spawn(write_test_data(write_half)).await.unwrap();
1039
1040        let mut temp_file = collector.cancel().await.unwrap();
1041        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
1042        let mut contents = String::new();
1043        temp_file.read_to_string(&mut contents).await.unwrap();
1044
1045        assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1046    }
1047
1048    #[tokio::test]
1049    #[traced_test]
1050    async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
1051        let (read_half, mut write_half) = tokio::io::duplex(64);
1052        let os = BroadcastOutputStream::from_stream(
1053            read_half,
1054            "custom",
1055            FromStreamOptions {
1056                channel_capacity: 2,
1057                ..Default::default()
1058            },
1059        );
1060
1061        let consumer = os.inspect_lines_async(
1062            |_line| async move {
1063                // Mimic a slow consumer.
1064                sleep(Duration::from_millis(100)).await;
1065                Next::Continue
1066            },
1067            LineParsingOptions::default(),
1068        );
1069
1070        #[rustfmt::skip]
1071        let producer = tokio::spawn(async move {
1072            for count in 1..=15 {
1073                write_half
1074                    .write_all(format!("{count}\n").as_bytes())
1075                    .await
1076                    .unwrap();
1077                sleep(Duration::from_millis(25)).await;
1078            }
1079            write_half.flush().await.unwrap();
1080            drop(write_half);
1081        });
1082
1083        producer.await.unwrap();
1084        consumer.wait().await.unwrap();
1085        drop(os);
1086
1087        logs_assert(|lines: &[&str]| {
1088            let lagged_logs = lines
1089                .iter()
1090                .filter(|line| line.contains("Inspector is lagging behind lagged="))
1091                .count();
1092            if lagged_logs == 0 {
1093                return Err("Expected at least one lagged log".to_string());
1094            }
1095            Ok(())
1096        });
1097    }
1098
1099    #[tokio::test]
1100    async fn inspect_lines() {
1101        #[automock]
1102        trait LineVisitor {
1103            fn visit(&self, line: String);
1104        }
1105
1106        #[rustfmt::skip]
1107        fn configure(mock: &mut MockLineVisitor) {
1108            mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
1109            mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
1110            mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
1111            mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
1112            mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
1113        }
1114
1115        let (read_half, write_half) = tokio::io::duplex(64);
1116        let os =
1117            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
1118
1119        let mut mock = MockLineVisitor::new();
1120        configure(&mut mock);
1121
1122        let inspector = os.inspect_lines(
1123            move |line| {
1124                mock.visit(line.into_owned());
1125                Next::Continue
1126            },
1127            LineParsingOptions::default(),
1128        );
1129
1130        tokio::spawn(write_test_data(write_half)).await.unwrap();
1131
1132        inspector.cancel().await.unwrap();
1133        drop(os);
1134    }
1135
1136    /// This tests that our impl macros properly `break 'outer`, as they might be in an inner loop!
1137    /// With `break` instead of `break 'outer`, this test would never complete, as the `Next::Break`
1138    /// would not terminate the collector!
1139    #[tokio::test]
1140    #[traced_test]
1141    async fn inspect_lines_async() {
1142        let (read_half, mut write_half) = tokio::io::duplex(64);
1143        let os = BroadcastOutputStream::from_stream(
1144            read_half,
1145            "custom",
1146            FromStreamOptions {
1147                chunk_size: 32.bytes(),
1148                ..Default::default()
1149            },
1150        );
1151
1152        let seen: Vec<String> = Vec::new();
1153        let collector = os.collect_lines_async(seen, BreakOnLine, LineParsingOptions::default());
1154
1155        let _writer = tokio::spawn(async move {
1156            write_half.write_all("start\n".as_bytes()).await.unwrap();
1157            write_half.write_all("break\n".as_bytes()).await.unwrap();
1158            write_half.write_all("end\n".as_bytes()).await.unwrap();
1159
1160            loop {
1161                write_half
1162                    .write_all("gibberish\n".as_bytes())
1163                    .await
1164                    .unwrap();
1165                tokio::time::sleep(Duration::from_millis(50)).await;
1166            }
1167        });
1168
1169        let seen = collector.wait().await.unwrap();
1170
1171        assert_that!(seen).contains_exactly(["start", "break"]);
1172    }
1173
1174    #[tokio::test]
1175    async fn collect_chunks_async_into_vec() {
1176        let (read_half, mut write_half) = tokio::io::duplex(64);
1177        let os = BroadcastOutputStream::from_stream(
1178            read_half,
1179            "custom",
1180            FromStreamOptions {
1181                chunk_size: 2.bytes(),
1182                ..Default::default()
1183            },
1184        );
1185
1186        let collector = os.collect_chunks_async(Vec::new(), ExtendChunks);
1187
1188        write_half.write_all(b"abcdef").await.unwrap();
1189        drop(write_half);
1190
1191        let seen = collector.wait().await.unwrap();
1192        assert_that!(seen).is_equal_to(b"abcdef".to_vec());
1193    }
1194
1195    #[tokio::test]
1196    async fn collect_lines_to_file() {
1197        let (read_half, write_half) = tokio::io::duplex(64);
1198        let os = BroadcastOutputStream::from_stream(
1199            read_half,
1200            "custom",
1201            FromStreamOptions {
1202                channel_capacity: 32,
1203                ..Default::default()
1204            },
1205        );
1206
1207        let temp_file = tempfile::tempfile().unwrap();
1208        let collector = os.collect_lines(
1209            temp_file,
1210            |line, temp_file| {
1211                writeln!(temp_file, "{line}").unwrap();
1212                Next::Continue
1213            },
1214            LineParsingOptions::default(),
1215        );
1216
1217        tokio::spawn(write_test_data(write_half)).await.unwrap();
1218
1219        let mut temp_file = collector.cancel().await.unwrap();
1220        temp_file.seek(SeekFrom::Start(0)).unwrap();
1221        let mut contents = String::new();
1222        temp_file.read_to_string(&mut contents).unwrap();
1223
1224        assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1225    }
1226
1227    #[tokio::test]
1228    async fn collect_lines_async_to_file() {
1229        let (read_half, write_half) = tokio::io::duplex(64);
1230        let os = BroadcastOutputStream::from_stream(
1231            read_half,
1232            "custom",
1233            FromStreamOptions {
1234                chunk_size: 32.bytes(),
1235                ..Default::default()
1236            },
1237        );
1238
1239        let temp_file = tempfile::tempfile().unwrap();
1240        let collector = os.collect_lines_async(temp_file, WriteLine, LineParsingOptions::default());
1241
1242        tokio::spawn(write_test_data(write_half)).await.unwrap();
1243
1244        let mut temp_file = collector.cancel().await.unwrap();
1245        temp_file.seek(SeekFrom::Start(0)).unwrap();
1246        let mut contents = String::new();
1247        temp_file.read_to_string(&mut contents).unwrap();
1248
1249        assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1250    }
1251
1252    #[tokio::test]
1253    #[traced_test]
1254    async fn collect_chunks_into_write_mapped() {
1255        let (read_half, write_half) = tokio::io::duplex(64);
1256        let os = BroadcastOutputStream::from_stream(
1257            read_half,
1258            "custom",
1259            FromStreamOptions {
1260                chunk_size: 32.bytes(),
1261                ..Default::default()
1262            },
1263        );
1264
1265        let temp_file = tokio::fs::File::options()
1266            .create(true)
1267            .truncate(true)
1268            .write(true)
1269            .read(true)
1270            .open(std::env::temp_dir().join(
1271                "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
1272            ))
1273            .await
1274            .unwrap();
1275
1276        let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
1277            String::from_utf8_lossy(chunk.as_ref()).to_string()
1278        });
1279
1280        tokio::spawn(write_test_data(write_half)).await.unwrap();
1281
1282        let mut temp_file = collector.cancel().await.unwrap();
1283        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
1284        let mut contents = String::new();
1285        temp_file.read_to_string(&mut contents).await.unwrap();
1286
1287        assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1288    }
1289
1290    #[tokio::test]
1291    #[traced_test]
1292    async fn collect_chunks_into_write_in_parallel() {
1293        // Big enough to hold any individual test write that we perform.
1294        let (read_half, write_half) = tokio::io::duplex(64);
1295
1296        let os = BroadcastOutputStream::from_stream(
1297            read_half,
1298            "custom",
1299            FromStreamOptions {
1300                // Big enough to hold any individual test write that we perform.
1301                // Actual chunks will be smaller.
1302                chunk_size: 32.bytes(),
1303                channel_capacity: 2,
1304            },
1305        );
1306
1307        let file1 = tokio::fs::File::options()
1308            .create(true)
1309            .truncate(true)
1310            .write(true)
1311            .read(true)
1312            .open(
1313                std::env::temp_dir()
1314                    .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_1.txt"),
1315            )
1316            .await
1317            .unwrap();
1318        let file2 = tokio::fs::File::options()
1319            .create(true)
1320            .truncate(true)
1321            .write(true)
1322            .read(true)
1323            .open(
1324                std::env::temp_dir()
1325                    .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_2.txt"),
1326            )
1327            .await
1328            .unwrap();
1329
1330        let collector1 = os.collect_chunks_into_write(file1);
1331        let collector2 = os.collect_chunks_into_write_mapped(file2, |chunk| {
1332            format!("ok-{}", String::from_utf8_lossy(chunk.as_ref()))
1333        });
1334
1335        tokio::spawn(write_test_data(write_half)).await.unwrap();
1336
1337        let mut temp_file1 = collector1.cancel().await.unwrap();
1338        temp_file1.seek(SeekFrom::Start(0)).await.unwrap();
1339        let mut contents = String::new();
1340        temp_file1.read_to_string(&mut contents).await.unwrap();
1341        assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1342
1343        let mut temp_file2 = collector2.cancel().await.unwrap();
1344        temp_file2.seek(SeekFrom::Start(0)).await.unwrap();
1345        let mut contents = String::new();
1346        temp_file2.read_to_string(&mut contents).await.unwrap();
1347        assert_that!(contents)
1348            .is_equal_to("ok-Cargo.lock\nok-Cargo.toml\nok-README.md\nok-src\nok-target\n");
1349    }
1350}