1use crate::InspectorError;
2use crate::collector::{AsyncCollectFn, Collector, Sink};
3use crate::inspector::Inspector;
4use crate::output_stream::impls::{
5 impl_collect_chunks, impl_collect_chunks_async, impl_collect_lines, impl_collect_lines_async,
6 impl_inspect_chunks, impl_inspect_lines, impl_inspect_lines_async,
7};
8use crate::output_stream::{Chunk, FromStreamOptions, LineReader, Next};
9use crate::output_stream::{LineParsingOptions, OutputStream};
10use std::fmt::{Debug, Formatter};
11use std::future::Future;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
15use tokio::sync::broadcast::error::RecvError;
16use tokio::sync::{RwLock, broadcast};
17use tokio::task::JoinHandle;
18use tokio::time::error::Elapsed;
19
20pub struct BroadcastOutputStream {
27 stream_reader: JoinHandle<()>,
30
31 sender: broadcast::Sender<Option<Chunk>>,
35}
36
37impl OutputStream for BroadcastOutputStream {}
38
39impl Drop for BroadcastOutputStream {
40 fn drop(&mut self) {
41 self.stream_reader.abort();
42 }
43}
44
45impl Debug for BroadcastOutputStream {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("BroadcastOutputStream")
48 .field("output_collector", &"non-debug < JoinHandle<()> >")
49 .field(
50 "sender",
51 &"non-debug < tokio::sync::broadcast::Sender<Option<Chunk>> >",
52 )
53 .finish()
54 }
55}
56
57async fn read_chunked<B: AsyncRead + Unpin + Send + 'static>(
61 mut read: B,
62 chunk_size: usize,
63 sender: broadcast::Sender<Option<Chunk>>,
64) {
65 let send_chunk = move |chunk: Option<Chunk>| {
66 match sender.send(chunk) {
71 Ok(_received_by) => {}
72 Err(err) => {
73 tracing::debug!(
78 error = %err,
79 "No active receivers for the output chunk, dropping it"
80 );
81 }
82 }
83 };
84
85 let mut buf = bytes::BytesMut::with_capacity(chunk_size);
87 loop {
88 let _ = buf.try_reclaim(chunk_size);
89 match read.read_buf(&mut buf).await {
90 Ok(bytes_read) => {
91 let is_eof = bytes_read == 0;
92
93 match is_eof {
94 true => send_chunk(None),
95 false => {
96 while !buf.is_empty() {
97 let split_to = usize::min(chunk_size, buf.len());
101 send_chunk(Some(Chunk(buf.split_to(split_to).freeze())));
110 }
111 }
112 };
113
114 if is_eof {
115 break;
116 }
117 }
118 Err(err) => panic!("Could not read from stream: {err}"),
119 }
120 }
121}
122
123impl BroadcastOutputStream {
124 pub(crate) fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
125 stream: S,
126 options: FromStreamOptions,
127 ) -> BroadcastOutputStream {
128 let (sender, receiver) = broadcast::channel::<Option<Chunk>>(options.channel_capacity);
129 drop(receiver);
130
131 let stream_reader = tokio::spawn(read_chunked(stream, options.chunk_size, sender.clone()));
132
133 BroadcastOutputStream {
134 stream_reader,
135 sender,
136 }
137 }
138
139 fn subscribe(&self) -> broadcast::Receiver<Option<Chunk>> {
140 self.sender.subscribe()
141 }
142}
143
144macro_rules! handle_subscription {
149 ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
150 $loop_label: loop {
151 tokio::select! {
152 out = $receiver.recv() => {
153 match out {
154 Ok(maybe_chunk) => {
155 let $chunk = maybe_chunk;
156 $body
157 }
158 Err(RecvError::Closed) => {
159 break $loop_label;
161 },
162 Err(RecvError::Lagged(lagged)) => {
163 tracing::warn!(lagged, "Inspector is lagging behind");
164 }
165 }
166 }
167 _msg = &mut $term_rx => break $loop_label,
168 }
169 }
170 };
171}
172
173impl BroadcastOutputStream {
175 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
180 pub fn inspect_chunks(&self, mut f: impl FnMut(Chunk) -> Next + Send + 'static) -> Inspector {
181 let mut receiver = self.subscribe();
182 impl_inspect_chunks!(receiver, f, handle_subscription)
183 }
184
185 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
190 pub fn inspect_lines(
191 &self,
192 mut f: impl FnMut(String) -> Next + Send + 'static,
193 options: LineParsingOptions,
194 ) -> Inspector {
195 let mut receiver = self.subscribe();
196 impl_inspect_lines!(receiver, f, options, handle_subscription)
197 }
198
199 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
204 pub fn inspect_lines_async<Fut>(
205 &self,
206 mut f: impl FnMut(String) -> Fut + Send + 'static,
207 options: LineParsingOptions,
208 ) -> Inspector
209 where
210 Fut: Future<Output = Next> + Send,
211 {
212 let mut receiver = self.subscribe();
213 impl_inspect_lines_async!(receiver, f, options, handle_subscription)
214 }
215}
216
217impl BroadcastOutputStream {
219 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
223 pub fn collect_chunks<S: Sink>(
224 &self,
225 into: S,
226 mut collect: impl FnMut(Chunk, &mut S) + Send + 'static,
227 ) -> Collector<S> {
228 let sink = Arc::new(RwLock::new(into));
229 let mut receiver = self.subscribe();
230 impl_collect_chunks!(receiver, collect, sink, handle_subscription)
231 }
232
233 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
237 pub fn collect_chunks_async<S, F>(&self, into: S, collect: F) -> Collector<S>
238 where
239 S: Sink,
240 F: Fn(Chunk, &mut S) -> AsyncCollectFn<'_> + Send + 'static,
241 {
242 let sink = Arc::new(RwLock::new(into));
243 let mut receiver = self.subscribe();
244 impl_collect_chunks_async!(receiver, collect, sink, handle_subscription)
245 }
246
247 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
252 pub fn collect_lines<S: Sink>(
253 &self,
254 into: S,
255 mut collect: impl FnMut(String, &mut S) -> Next + Send + 'static,
256 options: LineParsingOptions,
257 ) -> Collector<S> {
258 let sink = Arc::new(RwLock::new(into));
259 let mut receiver = self.subscribe();
260 impl_collect_lines!(receiver, collect, options, sink, handle_subscription)
261 }
262
263 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
268 pub fn collect_lines_async<S, F>(
269 &self,
270 into: S,
271 collect: F,
272 options: LineParsingOptions,
273 ) -> Collector<S>
274 where
275 S: Sink,
276 F: Fn(String, &mut S) -> AsyncCollectFn<'_> + Send + Sync + 'static,
277 {
278 let sink = Arc::new(RwLock::new(into));
279 let mut receiver = self.subscribe();
280 impl_collect_lines_async!(receiver, collect, options, sink, handle_subscription)
281 }
282
283 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
285 pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
286 self.collect_chunks(Vec::new(), |chunk, vec| vec.extend(chunk.as_ref()))
287 }
288
289 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
291 pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
292 self.collect_lines(
293 Vec::new(),
294 |line, vec| {
295 vec.push(line);
296 Next::Continue
297 },
298 options,
299 )
300 }
301
302 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
304 pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
305 &self,
306 write: W,
307 ) -> Collector<W> {
308 self.collect_chunks_async(write, move |chunk, write| {
309 Box::pin(async move {
310 if let Err(err) = write.write_all(chunk.as_ref()).await {
311 tracing::warn!("Could not write chunk to write sink: {err:#?}");
312 };
313 Next::Continue
314 })
315 })
316 }
317
318 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
320 pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
321 &self,
322 write: W,
323 options: LineParsingOptions,
324 ) -> Collector<W> {
325 self.collect_lines_async(
326 write,
327 move |line, write| {
328 Box::pin(async move {
329 if let Err(err) = write.write_all(line.as_bytes()).await {
330 tracing::warn!("Could not write line to write sink: {err:#?}");
331 };
332 Next::Continue
333 })
334 },
335 options,
336 )
337 }
338
339 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
341 pub fn collect_chunks_into_write_mapped<
342 W: Sink + AsyncWriteExt + Unpin,
343 B: AsRef<[u8]> + Send,
344 >(
345 &self,
346 write: W,
347 mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
348 ) -> Collector<W> {
349 self.collect_chunks_async(write, move |chunk, write| {
350 Box::pin(async move {
351 let mapped = mapper(chunk);
352 let mapped = mapped.as_ref();
353 if let Err(err) = write.write_all(mapped).await {
354 tracing::warn!("Could not write chunk to write sink: {err:#?}");
355 };
356 Next::Continue
357 })
358 })
359 }
360
361 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
363 pub fn collect_lines_into_write_mapped<
364 W: Sink + AsyncWriteExt + Unpin,
365 B: AsRef<[u8]> + Send,
366 >(
367 &self,
368 write: W,
369 mapper: impl Fn(String) -> B + Send + Sync + Copy + 'static,
370 options: LineParsingOptions,
371 ) -> Collector<W> {
372 self.collect_lines_async(
373 write,
374 move |line, write| {
375 Box::pin(async move {
376 let mapped = mapper(line);
377 let mapped = mapped.as_ref();
378 if let Err(err) = write.write_all(mapped).await {
379 tracing::warn!("Could not write line to write sink: {err:#?}");
380 };
381 Next::Continue
382 })
383 },
384 options,
385 )
386 }
387}
388
389impl BroadcastOutputStream {
391 pub async fn wait_for_line(
395 &self,
396 predicate: impl Fn(String) -> bool + Send + Sync + 'static,
397 options: LineParsingOptions,
398 ) {
399 let inspector = self.inspect_lines(
400 move |line| {
401 if predicate(line) {
402 Next::Break
403 } else {
404 Next::Continue
405 }
406 },
407 options,
408 );
409 match inspector.wait().await {
410 Ok(()) => {}
411 Err(err) => match err {
412 InspectorError::TaskJoin(join_error) => {
413 panic!("Inspector task join error: {join_error:#?}");
414 }
415 },
416 };
417 }
418
419 pub async fn wait_for_line_with_timeout(
423 &self,
424 predicate: impl Fn(String) -> bool + Send + Sync + 'static,
425 options: LineParsingOptions,
426 timeout: Duration,
427 ) -> Result<(), Elapsed> {
428 tokio::time::timeout(timeout, self.wait_for_line(predicate, options)).await
429 }
430}
431
432pub struct LineConfig {
434 pub max_line_length: usize,
440}
441
442#[cfg(test)]
443mod tests {
444 use super::read_chunked;
445 use crate::output_stream::broadcast::BroadcastOutputStream;
446 use crate::output_stream::tests::write_test_data;
447 use crate::output_stream::{FromStreamOptions, LineParsingOptions, Next};
448 use assertr::assert_that;
449 use assertr::prelude::{LengthAssertions, PartialEqAssertions, VecAssertions};
450 use mockall::*;
451 use std::io::Read;
452 use std::io::Seek;
453 use std::io::SeekFrom;
454 use std::io::Write;
455 use std::time::Duration;
456 use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
457 use tokio::sync::broadcast;
458 use tokio::time::sleep;
459 use tracing_test::traced_test;
460
461 #[tokio::test]
462 #[traced_test]
463 async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
464 {
465 let (read_half, mut write_half) = tokio::io::duplex(64);
466 let (tx, mut rx) = broadcast::channel(10);
467
468 write_half.write_all(b"hello world").await.unwrap();
475 write_half.flush().await.unwrap();
476
477 let stream_reader = tokio::spawn(read_chunked(read_half, 2, tx));
478
479 drop(write_half); stream_reader.await.unwrap();
481
482 let mut chunks = Vec::<String>::new();
483 while let Ok(Some(chunk)) = rx.recv().await {
484 chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
485 }
486 assert_that(chunks).contains_exactly(&["he", "ll", "o ", "wo", "rl", "d"]);
487 }
488
489 #[tokio::test]
490 #[traced_test]
491 async fn read_chunked_no_data() {
492 let (read_half, write_half) = tokio::io::duplex(64);
493 let (tx, mut rx) = broadcast::channel(10);
494
495 let stream_reader = tokio::spawn(read_chunked(read_half, 2, tx));
496
497 drop(write_half); stream_reader.await.unwrap();
499
500 let mut chunks = Vec::<String>::new();
501 while let Ok(Some(chunk)) = rx.recv().await {
502 chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
503 }
504 assert_that(chunks).is_empty();
505 }
506
507 #[tokio::test]
508 #[traced_test]
509 async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
510 let (read_half, mut write_half) = tokio::io::duplex(64);
511 let os = BroadcastOutputStream::from_stream(
512 read_half,
513 FromStreamOptions {
514 channel_capacity: 2,
515 ..Default::default()
516 },
517 );
518
519 let consumer = os.inspect_lines_async(
520 async |_line| {
521 sleep(Duration::from_millis(100)).await;
523 Next::Continue
524 },
525 LineParsingOptions::default(),
526 );
527
528 #[rustfmt::skip]
529 let producer = tokio::spawn(async move {
530 for count in 1..=15 {
531 write_half
532 .write(format!("{count}\n").as_bytes())
533 .await
534 .unwrap();
535 sleep(Duration::from_millis(25)).await;
536 }
537 write_half.flush().await.unwrap();
538 drop(write_half);
539 });
540
541 producer.await.unwrap();
542 consumer.wait().await.unwrap();
543 drop(os);
544
545 logs_assert(|lines: &[&str]| {
546 match lines
547 .iter()
548 .filter(|line| line.contains("Inspector is lagging behind lagged=1"))
549 .count()
550 {
551 1 => {}
552 n => return Err(format!("Expected exactly one lagged=1 log, but found {n}")),
553 };
554 match lines
555 .iter()
556 .filter(|line| line.contains("Inspector is lagging behind lagged=3"))
557 .count()
558 {
559 2 => {}
560 3 => {}
561 n => return Err(format!("Expected exactly two lagged=3 logs, but found {n}")),
562 };
563 Ok(())
564 });
565 }
566
567 #[tokio::test]
568 async fn inspect_lines() {
569 let (read_half, write_half) = tokio::io::duplex(64);
570 let os = BroadcastOutputStream::from_stream(read_half, FromStreamOptions::default());
571
572 #[automock]
573 trait LineVisitor {
574 fn visit(&self, line: String);
575 }
576
577 let mut mock = MockLineVisitor::new();
578 #[rustfmt::skip]
579 fn configure(mock: &mut MockLineVisitor) {
580 mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
581 mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
582 mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
583 mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
584 mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
585 }
586 configure(&mut mock);
587
588 let inspector = os.inspect_lines(
589 move |line| {
590 mock.visit(line);
591 Next::Continue
592 },
593 LineParsingOptions::default(),
594 );
595
596 tokio::spawn(write_test_data(write_half)).await.unwrap();
597
598 inspector.cancel().await.unwrap();
599 drop(os)
600 }
601
602 #[tokio::test]
606 #[traced_test]
607 async fn inspect_lines_async() {
608 let (read_half, mut write_half) = tokio::io::duplex(64);
609 let os = BroadcastOutputStream::from_stream(
610 read_half,
611 FromStreamOptions {
612 chunk_size: 32,
613 ..Default::default()
614 },
615 );
616
617 let seen: Vec<String> = Vec::new();
618 let collector = os.collect_lines_async(
619 seen,
620 move |line, seen: &mut Vec<String>| {
621 Box::pin(async move {
622 if line == "break" {
623 seen.push(line);
624 Next::Break
625 } else {
626 seen.push(line);
627 Next::Continue
628 }
629 })
630 },
631 LineParsingOptions::default(),
632 );
633
634 let _writer = tokio::spawn(async move {
635 write_half.write_all("start\n".as_bytes()).await.unwrap();
636 write_half.write_all("break\n".as_bytes()).await.unwrap();
637 write_half.write_all("end\n".as_bytes()).await.unwrap();
638
639 loop {
640 write_half
641 .write_all("gibberish\n".as_bytes())
642 .await
643 .unwrap();
644 tokio::time::sleep(Duration::from_millis(50)).await;
645 }
646 });
647
648 let seen = collector.wait().await.unwrap();
649
650 assert_that(seen).contains_exactly(&["start", "break"]);
651 }
652
653 #[tokio::test]
654 async fn collect_lines_to_file() {
655 let (read_half, write_half) = tokio::io::duplex(64);
656 let os = BroadcastOutputStream::from_stream(
657 read_half,
658 FromStreamOptions {
659 channel_capacity: 32,
660 ..Default::default()
661 },
662 );
663
664 let temp_file = tempfile::tempfile().unwrap();
665 let collector = os.collect_lines(
666 temp_file,
667 |line, temp_file| {
668 writeln!(temp_file, "{}", line).unwrap();
669 Next::Continue
670 },
671 LineParsingOptions::default(),
672 );
673
674 tokio::spawn(write_test_data(write_half)).await.unwrap();
675
676 let mut temp_file = collector.cancel().await.unwrap();
677 temp_file.seek(SeekFrom::Start(0)).unwrap();
678 let mut contents = String::new();
679 temp_file.read_to_string(&mut contents).unwrap();
680
681 assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
682 }
683
684 #[tokio::test]
685 async fn collect_lines_async_to_file() {
686 let (read_half, write_half) = tokio::io::duplex(64);
687 let os = BroadcastOutputStream::from_stream(
688 read_half,
689 FromStreamOptions {
690 chunk_size: 32,
691 ..Default::default()
692 },
693 );
694
695 let temp_file = tempfile::tempfile().unwrap();
696 let collector = os.collect_lines_async(
697 temp_file,
698 |line, temp_file| {
699 Box::pin(async move {
700 writeln!(temp_file, "{}", line).unwrap();
701 Next::Continue
702 })
703 },
704 LineParsingOptions::default(),
705 );
706
707 tokio::spawn(write_test_data(write_half)).await.unwrap();
708
709 let mut temp_file = collector.cancel().await.unwrap();
710 temp_file.seek(SeekFrom::Start(0)).unwrap();
711 let mut contents = String::new();
712 temp_file.read_to_string(&mut contents).unwrap();
713
714 assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
715 }
716
717 #[tokio::test]
718 #[traced_test]
719 async fn collect_chunks_into_write_mapped() {
720 let (read_half, write_half) = tokio::io::duplex(64);
721 let os = BroadcastOutputStream::from_stream(
722 read_half,
723 FromStreamOptions {
724 chunk_size: 32,
725 ..Default::default()
726 },
727 );
728
729 let temp_file = tokio::fs::File::options()
730 .create(true)
731 .truncate(true)
732 .write(true)
733 .read(true)
734 .open(std::env::temp_dir().join(
735 "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
736 ))
737 .await
738 .unwrap();
739
740 let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
741 String::from_utf8_lossy(chunk.as_ref()).to_string()
742 });
743
744 tokio::spawn(write_test_data(write_half)).await.unwrap();
745
746 let mut temp_file = collector.cancel().await.unwrap();
747 temp_file.seek(SeekFrom::Start(0)).await.unwrap();
748 let mut contents = String::new();
749 temp_file.read_to_string(&mut contents).await.unwrap();
750
751 assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
752 }
753
754 #[tokio::test]
755 #[traced_test]
756 async fn collect_chunks_into_write_in_parallel() {
757 let (read_half, write_half) = tokio::io::duplex(64);
759
760 let os = BroadcastOutputStream::from_stream(
761 read_half,
762 FromStreamOptions {
763 chunk_size: 64,
766 channel_capacity: 2,
767 ..Default::default()
768 },
769 );
770
771 let file1 = tokio::fs::File::options()
772 .create(true)
773 .truncate(true)
774 .write(true)
775 .read(true)
776 .open(
777 std::env::temp_dir()
778 .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_1.txt"),
779 )
780 .await
781 .unwrap();
782 let file2 = tokio::fs::File::options()
783 .create(true)
784 .truncate(true)
785 .write(true)
786 .read(true)
787 .open(
788 std::env::temp_dir()
789 .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_2.txt"),
790 )
791 .await
792 .unwrap();
793
794 let collector1 = os.collect_chunks_into_write(file1);
795 let collector2 = os.collect_chunks_into_write_mapped(file2, |chunk| {
796 format!("ok-{}", String::from_utf8_lossy(chunk.as_ref()))
797 });
798
799 tokio::spawn(write_test_data(write_half)).await.unwrap();
800
801 let mut temp_file1 = collector1.cancel().await.unwrap();
802 temp_file1.seek(SeekFrom::Start(0)).await.unwrap();
803 let mut contents = String::new();
804 temp_file1.read_to_string(&mut contents).await.unwrap();
805 assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
806
807 let mut temp_file2 = collector2.cancel().await.unwrap();
808 temp_file2.seek(SeekFrom::Start(0)).await.unwrap();
809 let mut contents = String::new();
810 temp_file2.read_to_string(&mut contents).await.unwrap();
811 assert_that(contents)
812 .is_equal_to("ok-Cargo.lock\nok-Cargo.toml\nok-README.md\nok-src\nok-target\n");
813 }
814}