1use crate::collector::{AsyncCollectFn, Collector, Sink};
2use crate::error::OutputError;
3use crate::inspector::Inspector;
4use crate::output_stream::impls::{
5 impl_collect_chunks, impl_collect_chunks_async, impl_collect_lines, impl_collect_lines_async,
6 impl_inspect_chunks, impl_inspect_lines, impl_inspect_lines_async,
7};
8use crate::output_stream::{Chunk, FromStreamOptions, LineReader, Next};
9use crate::output_stream::{LineParsingOptions, OutputStream};
10use crate::{InspectorError, NumBytes};
11use std::borrow::Cow;
12use std::fmt::{Debug, Formatter};
13use std::future::Future;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
17use tokio::sync::broadcast::error::RecvError;
18use tokio::sync::{RwLock, broadcast};
19use tokio::task::JoinHandle;
20
/// A byte stream whose chunks are fanned out to any number of subscribers through a
/// `tokio::sync::broadcast` channel.
///
/// A background task (see `read_chunked`) reads the underlying stream and publishes
/// `Some(chunk)` messages, followed by a single `None` once the stream reports end-of-file.
pub struct BroadcastOutputStream {
    /// Handle of the background reader task spawned in `from_stream`.
    /// The task is aborted when this value is dropped.
    stream_reader: JoinHandle<()>,

    /// Sending half of the broadcast channel. New subscribers are created from it via
    /// `subscribe()`. `None` messages mark the end of the stream.
    sender: broadcast::Sender<Option<Chunk>>,

    /// The chunk size this stream was configured with (returned by `chunk_size()`).
    chunk_size: NumBytes,

    /// The capacity the broadcast channel was created with (returned by `channel_capacity()`).
    max_channel_capacity: usize,

    /// Name of this stream (returned by `name()`), e.g. used in timeout errors.
    name: &'static str,
}
46
/// Exposes the configuration values this stream was created with.
impl OutputStream for BroadcastOutputStream {
    /// Returns the configured chunk size.
    fn chunk_size(&self) -> NumBytes {
        self.chunk_size
    }

    /// Returns the capacity the underlying broadcast channel was created with.
    fn channel_capacity(&self) -> usize {
        self.max_channel_capacity
    }

    /// Returns the name given to this stream at construction time.
    fn name(&self) -> &'static str {
        self.name
    }
}
60
/// Stops the background reader task when the stream handle goes away.
impl Drop for BroadcastOutputStream {
    fn drop(&mut self) {
        // Abort the reader task so it does not keep reading from the underlying stream
        // after this handle is gone.
        self.stream_reader.abort();
    }
}
66
67impl Debug for BroadcastOutputStream {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("BroadcastOutputStream")
70 .field("output_collector", &"non-debug < JoinHandle<()> >")
71 .field(
72 "sender",
73 &"non-debug < tokio::sync::broadcast::Sender<Option<Chunk>> >",
74 )
75 .finish()
76 }
77}
78
79async fn read_chunked<B: AsyncRead + Unpin + Send + 'static>(
83 mut read: B,
84 chunk_size: NumBytes,
85 sender: broadcast::Sender<Option<Chunk>>,
86) {
87 let send_chunk = move |chunk: Option<Chunk>| {
88 match sender.send(chunk) {
93 Ok(_received_by) => {}
94 Err(err) => {
95 tracing::debug!(
100 error = %err,
101 "No active receivers for the output chunk, dropping it"
102 );
103 }
104 }
105 };
106
107 let mut buf = bytes::BytesMut::with_capacity(chunk_size.bytes());
109 loop {
110 let _ = buf.try_reclaim(chunk_size.bytes());
111 match read.read_buf(&mut buf).await {
112 Ok(bytes_read) => {
113 let is_eof = bytes_read == 0;
114
115 match is_eof {
116 true => send_chunk(None),
117 false => {
118 while !buf.is_empty() {
119 let split_to = usize::min(chunk_size.bytes(), buf.len());
123 send_chunk(Some(Chunk(buf.split_to(split_to).freeze())));
132 }
133 }
134 };
135
136 if is_eof {
137 break;
138 }
139 }
140 Err(err) => panic!("Could not read from stream: {err}"),
141 }
142 }
143}
144
145impl BroadcastOutputStream {
146 pub(crate) fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
147 stream: S,
148 stream_name: &'static str,
149 options: FromStreamOptions,
150 ) -> BroadcastOutputStream {
151 let (sender, receiver) = broadcast::channel::<Option<Chunk>>(options.channel_capacity);
152 drop(receiver);
153
154 let stream_reader = tokio::spawn(read_chunked(stream, options.chunk_size, sender.clone()));
155
156 BroadcastOutputStream {
157 stream_reader,
158 sender,
159 chunk_size: options.chunk_size,
160 max_channel_capacity: options.channel_capacity,
161 name: stream_name,
162 }
163 }
164
165 fn subscribe(&self) -> broadcast::Receiver<Option<Chunk>> {
166 self.sender.subscribe()
167 }
168}
169
// Expands to a labelled loop that drives one broadcast subscription.
//
// Arguments:
// - `$loop_label`: label of the generated loop, so that `$body` can `break` out of it.
// - `$receiver`:   the broadcast receiver to read `Option<Chunk>` messages from.
// - `$term_rx`:    a future polled by mutable reference; the loop ends once it resolves.
//                  NOTE(review): presumably a termination/cancellation channel, confirm at
//                  the call sites.
// - `|$chunk| $body`: code run for every received message, with the message bound to `$chunk`.
macro_rules! handle_subscription {
    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
        $loop_label: loop {
            tokio::select! {
                out = $receiver.recv() => {
                    match out {
                        Ok(maybe_chunk) => {
                            let $chunk = maybe_chunk;
                            $body
                        }
                        // All senders are gone: nothing more will ever arrive.
                        Err(RecvError::Closed) => {
                            break $loop_label;
                        },
                        // This receiver was too slow and missed `lagged` messages.
                        // Only log it and keep going with the next available message.
                        Err(RecvError::Lagged(lagged)) => {
                            tracing::warn!(lagged, "Inspector is lagging behind");
                        }
                    }
                }
                // Termination was requested (or `$term_rx` otherwise resolved).
                _msg = &mut $term_rx => break $loop_label,
            }
        }
    };
}
198
// Inspectors: observe the stream through a callback without collecting data.
//
// Every method subscribes a fresh receiver and delegates to the matching `impl_inspect_*`
// macro from `crate::output_stream::impls`, passing `handle_subscription` as the
// subscription loop.
impl BroadcastOutputStream {
    /// Calls `f` with chunks of this stream. `f` returns a `Next` value.
    ///
    /// NOTE(review): presumably `Next::Break` stops the inspector early; the exact behavior
    /// is defined by `impl_inspect_chunks!`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_chunks(&self, mut f: impl FnMut(Chunk) -> Next + Send + 'static) -> Inspector {
        let mut receiver = self.subscribe();
        impl_inspect_chunks!(self.name(), receiver, f, handle_subscription)
    }

    /// Calls `f` with lines of this stream, parsed according to `options`.
    /// `f` returns a `Next` value.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_lines(
        &self,
        mut f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static,
        options: LineParsingOptions,
    ) -> Inspector {
        let mut receiver = self.subscribe();
        impl_inspect_lines!(self.name(), receiver, f, options, handle_subscription)
    }

    /// Like `inspect_lines`, but `f` returns a future resolving to the `Next` value.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_lines_async<Fut>(
        &self,
        mut f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static,
        options: LineParsingOptions,
    ) -> Inspector
    where
        Fut: Future<Output = Next> + Send,
    {
        let mut receiver = self.subscribe();
        impl_inspect_lines_async!(self.name(), receiver, f, options, handle_subscription)
    }
}
242
// Collectors: feed the stream into a caller-provided sink.
//
// Every base method wraps the sink in `Arc<RwLock<_>>`, subscribes a fresh receiver and
// delegates to the matching `impl_collect_*` macro from `crate::output_stream::impls`,
// passing `handle_subscription` as the subscription loop. The `*_into_*` methods are
// convenience wrappers around the base methods.
impl BroadcastOutputStream {
    /// Collects chunks of this stream by calling `collect` with each chunk and the sink `into`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks<S: Sink>(
        &self,
        into: S,
        mut collect: impl FnMut(Chunk, &mut S) + Send + 'static,
    ) -> Collector<S> {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.subscribe();
        impl_collect_chunks!(self.name(), receiver, collect, sink, handle_subscription)
    }

    /// Like `collect_chunks`, but `collect` returns a boxed future (`AsyncCollectFn`).
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_async<S, F>(&self, into: S, collect: F) -> Collector<S>
    where
        S: Sink,
        F: Fn(Chunk, &mut S) -> AsyncCollectFn<'_> + Send + 'static,
    {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.subscribe();
        impl_collect_chunks_async!(self.name(), receiver, collect, sink, handle_subscription)
    }

    /// Collects lines of this stream, parsed according to `options`, by calling `collect`
    /// with each line and the sink `into`. `collect` returns a `Next` value.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines<S: Sink>(
        &self,
        into: S,
        mut collect: impl FnMut(Cow<'_, str>, &mut S) -> Next + Send + 'static,
        options: LineParsingOptions,
    ) -> Collector<S> {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.subscribe();
        impl_collect_lines!(
            self.name(),
            receiver,
            collect,
            options,
            sink,
            handle_subscription
        )
    }

    /// Like `collect_lines`, but `collect` returns a boxed future (`AsyncCollectFn`).
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_async<S, F>(
        &self,
        into: S,
        collect: F,
        options: LineParsingOptions,
    ) -> Collector<S>
    where
        S: Sink,
        F: for<'a> Fn(Cow<'a, str>, &'a mut S) -> AsyncCollectFn<'a> + Send + Sync + 'static,
    {
        let sink = Arc::new(RwLock::new(into));
        let mut receiver = self.subscribe();
        impl_collect_lines_async!(
            self.name(),
            receiver,
            collect,
            options,
            sink,
            handle_subscription
        )
    }

    /// Collects all chunks into a `Vec<u8>` by appending the bytes of every chunk.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
        self.collect_chunks(Vec::new(), |chunk, vec| vec.extend(chunk.as_ref()))
    }

    /// Collects all lines into a `Vec<String>`, one owned `String` per line. Never breaks early.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
        self.collect_lines(
            Vec::new(),
            |line, vec| {
                vec.push(line.into_owned());
                Next::Continue
            },
            options,
        )
    }

    /// Writes the bytes of every chunk into the async writer `write`.
    ///
    /// Write errors are logged at warn level and do not stop the collector.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
        &self,
        write: W,
    ) -> Collector<W> {
        self.collect_chunks_async(write, move |chunk, write| {
            Box::pin(async move {
                if let Err(err) = write.write_all(chunk.as_ref()).await {
                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
                };
                Next::Continue
            })
        })
    }

    /// Writes the bytes of every parsed line into the async writer `write`.
    ///
    /// Each line is written exactly as received from the line parser; this method does not
    /// append any separator itself.
    /// NOTE(review): if the parser strips line terminators, the written lines end up
    /// concatenated; confirm whether that is intended (the `_mapped` variant can add them).
    ///
    /// Write errors are logged at warn level and do not stop the collector.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
        &self,
        write: W,
        options: LineParsingOptions,
    ) -> Collector<W> {
        self.collect_lines_async(
            write,
            move |line, write| {
                Box::pin(async move {
                    if let Err(err) = write.write_all(line.as_bytes()).await {
                        tracing::warn!("Could not write line to write sink: {err:#?}");
                    };
                    Next::Continue
                })
            },
            options,
        )
    }

    /// Passes every chunk through `mapper` and writes the resulting bytes into `write`.
    ///
    /// Write errors are logged at warn level and do not stop the collector.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_write_mapped<
        W: Sink + AsyncWriteExt + Unpin,
        B: AsRef<[u8]> + Send,
    >(
        &self,
        write: W,
        mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
    ) -> Collector<W> {
        self.collect_chunks_async(write, move |chunk, write| {
            Box::pin(async move {
                let mapped = mapper(chunk);
                let mapped = mapped.as_ref();
                if let Err(err) = write.write_all(mapped).await {
                    tracing::warn!("Could not write chunk to write sink: {err:#?}");
                };
                Next::Continue
            })
        })
    }

    /// Passes every parsed line through `mapper` and writes the resulting bytes into `write`.
    ///
    /// Write errors are logged at warn level and do not stop the collector.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_write_mapped<
        W: Sink + AsyncWriteExt + Unpin,
        B: AsRef<[u8]> + Send,
    >(
        &self,
        write: W,
        mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + Copy + 'static,
        options: LineParsingOptions,
    ) -> Collector<W> {
        self.collect_lines_async(
            write,
            move |line, write| {
                Box::pin(async move {
                    let mapped = mapper(line);
                    let mapped = mapped.as_ref();
                    if let Err(err) = write.write_all(mapped).await {
                        tracing::warn!("Could not write line to write sink: {err:#?}");
                    };
                    Next::Continue
                })
            },
            options,
        )
    }
}
428
429impl BroadcastOutputStream {
431 pub async fn wait_for_line(
435 &self,
436 predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
437 options: LineParsingOptions,
438 ) -> Result<(), InspectorError> {
439 let inspector = self.inspect_lines(
440 move |line| {
441 if predicate(line) {
442 Next::Break
443 } else {
444 Next::Continue
445 }
446 },
447 options,
448 );
449 inspector.wait().await
450 }
451
452 pub async fn wait_for_line_with_timeout(
456 &self,
457 predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
458 options: LineParsingOptions,
459 timeout: Duration,
460 ) -> Result<(), OutputError> {
461 tokio::time::timeout(timeout, self.wait_for_line(predicate, options))
462 .await
463 .map_err(|_elapsed| OutputError::Timeout {
464 stream_name: self.name(),
465 timeout,
466 })
467 .and_then(|res| res.map_err(OutputError::InspectorFailed))
468 }
469}
470
/// Configuration for line handling.
///
/// NOTE(review): nothing in this file reads this type; confirm where it is consumed.
pub struct LineConfig {
    /// Upper bound for the length of a line.
    /// NOTE(review): unit (bytes vs. chars) and the behavior on overflow are not visible
    /// here; confirm against the consumer of this setting.
    pub max_line_length: usize,
}
480
481#[cfg(test)]
482mod tests {
483 use super::read_chunked;
484 use crate::NumBytesExt;
485 use crate::output_stream::broadcast::BroadcastOutputStream;
486 use crate::output_stream::tests::write_test_data;
487 use crate::output_stream::{FromStreamOptions, LineParsingOptions, Next};
488 use assertr::assert_that;
489 use assertr::prelude::{LengthAssertions, PartialEqAssertions, VecAssertions};
490 use mockall::*;
491 use std::io::Read;
492 use std::io::Seek;
493 use std::io::SeekFrom;
494 use std::io::Write;
495 use std::time::Duration;
496 use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
497 use tokio::sync::broadcast;
498 use tokio::time::sleep;
499 use tracing_test::traced_test;
500
    // Feeds 11 bytes to `read_chunked` with a chunk size of 2 and expects the data to be
    // delivered completely, split into 2-byte chunks (plus one trailing 1-byte chunk).
    #[tokio::test]
    #[traced_test]
    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
    {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        // Capacity 10 is enough to hold all 6 chunks plus the final `None` without lagging.
        let (tx, mut rx) = broadcast::channel(10);

        // All data is available before the reader starts, so the very first read can
        // return more than one chunk worth of bytes.
        write_half.write_all(b"hello world").await.unwrap();
        write_half.flush().await.unwrap();

        let stream_reader = tokio::spawn(read_chunked(read_half, 2.bytes(), tx));

        // Closing the write side lets the reader hit end-of-file and finish.
        drop(write_half);
        stream_reader.await.unwrap();

        // Drain everything up to (excluding) the `None` end marker.
        let mut chunks = Vec::<String>::new();
        while let Ok(Some(chunk)) = rx.recv().await {
            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
        }
        assert_that(chunks).contains_exactly(["he", "ll", "o ", "wo", "rl", "d"]);
    }
528
    // A stream that is closed without any data must yield no chunks at all.
    #[tokio::test]
    #[traced_test]
    async fn read_chunked_no_data() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let (tx, mut rx) = broadcast::channel(10);

        let stream_reader = tokio::spawn(read_chunked(read_half, 2.bytes(), tx));

        // Closing the write side immediately makes the first read report end-of-file.
        drop(write_half);
        stream_reader.await.unwrap();

        // The loop stops at the `None` end marker (or once the channel is closed).
        let mut chunks = Vec::<String>::new();
        while let Ok(Some(chunk)) = rx.recv().await {
            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
        }
        assert_that(chunks).is_empty();
    }
546
547 #[tokio::test]
548 #[traced_test]
549 async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
550 let (read_half, mut write_half) = tokio::io::duplex(64);
551 let os = BroadcastOutputStream::from_stream(
552 read_half,
553 "custom",
554 FromStreamOptions {
555 channel_capacity: 2,
556 ..Default::default()
557 },
558 );
559
560 let consumer = os.inspect_lines_async(
561 |_line| async move {
562 sleep(Duration::from_millis(100)).await;
564 Next::Continue
565 },
566 LineParsingOptions::default(),
567 );
568
569 #[rustfmt::skip]
570 let producer = tokio::spawn(async move {
571 for count in 1..=15 {
572 write_half
573 .write_all(format!("{count}\n").as_bytes())
574 .await
575 .unwrap();
576 sleep(Duration::from_millis(25)).await;
577 }
578 write_half.flush().await.unwrap();
579 drop(write_half);
580 });
581
582 producer.await.unwrap();
583 consumer.wait().await.unwrap();
584 drop(os);
585
586 logs_assert(|lines: &[&str]| {
587 match lines
588 .iter()
589 .filter(|line| line.contains("Inspector is lagging behind lagged=1"))
590 .count()
591 {
592 1 => {}
593 n => return Err(format!("Expected exactly one lagged=1 log, but found {n}")),
594 };
595 match lines
596 .iter()
597 .filter(|line| line.contains("Inspector is lagging behind lagged=3"))
598 .count()
599 {
600 2 => {}
601 3 => {}
602 n => return Err(format!("Expected exactly two lagged=3 logs, but found {n}")),
603 };
604 Ok(())
605 });
606 }
607
    // Verifies that `inspect_lines` invokes the callback exactly once for each expected line.
    #[tokio::test]
    async fn inspect_lines() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os =
            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());

        // Mocked visitor used to record which lines the inspector callback saw.
        #[automock]
        trait LineVisitor {
            fn visit(&self, line: String);
        }

        let mut mock = MockLineVisitor::new();
        // Each of these lines must be visited exactly once.
        // NOTE(review): assumes `write_test_data` writes exactly these five lines; confirm.
        #[rustfmt::skip]
        fn configure(mock: &mut MockLineVisitor) {
            mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
        }
        configure(&mut mock);

        // The mock is moved into the callback, which runs inside the inspector.
        let inspector = os.inspect_lines(
            move |line| {
                mock.visit(line.into_owned());
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        // Wait until the writer task has written all of its data.
        tokio::spawn(write_test_data(write_half)).await.unwrap();

        inspector.cancel().await.unwrap();
        drop(os)
    }
643
    // Verifies that returning `Next::Break` from an async line callback stops processing:
    // lines written after "break" must not be seen.
    //
    // NOTE(review): despite its name, this test exercises `collect_lines_async`, not
    // `inspect_lines_async`.
    #[tokio::test]
    #[traced_test]
    async fn inspect_lines_async() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            "custom",
            FromStreamOptions {
                chunk_size: 32.bytes(),
                ..Default::default()
            },
        );

        let seen: Vec<String> = Vec::new();
        let collector = os.collect_lines_async(
            seen,
            move |line, seen: &mut Vec<String>| {
                Box::pin(async move {
                    // Record every line; stop right after recording "break".
                    if line == "break" {
                        seen.push(line.into_owned());
                        Next::Break
                    } else {
                        seen.push(line.into_owned());
                        Next::Continue
                    }
                })
            },
            LineParsingOptions::default(),
        );

        // The writer never finishes on its own: after the three meaningful lines it keeps
        // producing output, so the collector can only end through `Next::Break`.
        let _writer = tokio::spawn(async move {
            write_half.write_all("start\n".as_bytes()).await.unwrap();
            write_half.write_all("break\n".as_bytes()).await.unwrap();
            write_half.write_all("end\n".as_bytes()).await.unwrap();

            loop {
                write_half
                    .write_all("gibberish\n".as_bytes())
                    .await
                    .unwrap();
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        });

        let seen = collector.wait().await.unwrap();

        assert_that(seen).contains_exactly(["start", "break"]);
    }
695
    // Collects lines into a (std) temporary file using the synchronous `collect_lines` API
    // and checks the resulting file content.
    #[tokio::test]
    async fn collect_lines_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            "custom",
            FromStreamOptions {
                channel_capacity: 32,
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines(
            temp_file,
            |line, temp_file| {
                // The callback adds the line terminator itself via `writeln!`.
                writeln!(temp_file, "{}", line).unwrap();
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        // Wait until the writer task has written all of its data.
        tokio::spawn(write_test_data(write_half)).await.unwrap();

        // Cancelling hands the sink back; rewind it to read what was collected.
        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }
727
    // Same scenario as the synchronous file test, but using `collect_lines_async` with a
    // boxed async callback.
    #[tokio::test]
    async fn collect_lines_async_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            "custom",
            FromStreamOptions {
                chunk_size: 32.bytes(),
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines_async(
            temp_file,
            |line, temp_file| {
                Box::pin(async move {
                    // The callback adds the line terminator itself via `writeln!`.
                    writeln!(temp_file, "{}", line).unwrap();
                    Next::Continue
                })
            },
            LineParsingOptions::default(),
        );

        // Wait until the writer task has written all of its data.
        tokio::spawn(write_test_data(write_half)).await.unwrap();

        // Cancelling hands the sink back; rewind it to read what was collected.
        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }
761
762 #[tokio::test]
763 #[traced_test]
764 async fn collect_chunks_into_write_mapped() {
765 let (read_half, write_half) = tokio::io::duplex(64);
766 let os = BroadcastOutputStream::from_stream(
767 read_half,
768 "custom",
769 FromStreamOptions {
770 chunk_size: 32.bytes(),
771 ..Default::default()
772 },
773 );
774
775 let temp_file = tokio::fs::File::options()
776 .create(true)
777 .truncate(true)
778 .write(true)
779 .read(true)
780 .open(std::env::temp_dir().join(
781 "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
782 ))
783 .await
784 .unwrap();
785
786 let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
787 String::from_utf8_lossy(chunk.as_ref()).to_string()
788 });
789
790 tokio::spawn(write_test_data(write_half)).await.unwrap();
791
792 let mut temp_file = collector.cancel().await.unwrap();
793 temp_file.seek(SeekFrom::Start(0)).await.unwrap();
794 let mut contents = String::new();
795 temp_file.read_to_string(&mut contents).await.unwrap();
796
797 assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
798 }
799
    // Two collectors subscribed to the same stream must each receive the full output:
    // one writes chunks unchanged, the other writes them through a mapper adding a prefix.
    #[tokio::test]
    #[traced_test]
    async fn collect_chunks_into_write_in_parallel() {
        let (read_half, write_half) = tokio::io::duplex(64);

        let os = BroadcastOutputStream::from_stream(
            read_half,
            "custom",
            FromStreamOptions {
                chunk_size: 32.bytes(),
                channel_capacity: 2,
            },
        );

        // Each collector gets its own file at a fixed, test-specific path in the temp dir.
        let file1 = tokio::fs::File::options()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open(
                std::env::temp_dir()
                    .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_1.txt"),
            )
            .await
            .unwrap();
        let file2 = tokio::fs::File::options()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open(
                std::env::temp_dir()
                    .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_2.txt"),
            )
            .await
            .unwrap();

        let collector1 = os.collect_chunks_into_write(file1);
        let collector2 = os.collect_chunks_into_write_mapped(file2, |chunk| {
            format!("ok-{}", String::from_utf8_lossy(chunk.as_ref()))
        });

        // Wait until the writer task has written all of its data.
        tokio::spawn(write_test_data(write_half)).await.unwrap();

        // First collector: content is written unchanged.
        let mut temp_file1 = collector1.cancel().await.unwrap();
        temp_file1.seek(SeekFrom::Start(0)).await.unwrap();
        let mut contents = String::new();
        temp_file1.read_to_string(&mut contents).await.unwrap();
        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");

        // Second collector: every chunk is prefixed with "ok-" by the mapper.
        // NOTE(review): the expected value implies each chunk carries exactly one line;
        // this depends on how `write_test_data` writes its data.
        let mut temp_file2 = collector2.cancel().await.unwrap();
        temp_file2.seek(SeekFrom::Start(0)).await.unwrap();
        let mut contents = String::new();
        temp_file2.read_to_string(&mut contents).await.unwrap();
        assert_that(contents)
            .is_equal_to("ok-Cargo.lock\nok-Cargo.toml\nok-README.md\nok-src\nok-target\n");
    }
860}