1use crate::InspectorError;
2use crate::collector::{AsyncCollectFn, Collector, Sink};
3use crate::inspector::Inspector;
4use crate::output_stream::impls::{
5 impl_collect_chunks, impl_collect_chunks_async, impl_collect_lines, impl_collect_lines_async,
6 impl_inspect_chunks, impl_inspect_lines, impl_inspect_lines_async,
7};
8use crate::output_stream::{Chunk, FromStreamOptions, LineReader, Next};
9use crate::output_stream::{LineParsingOptions, OutputStream};
10use std::fmt::{Debug, Formatter};
11use std::future::Future;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
15use tokio::sync::broadcast::error::RecvError;
16use tokio::sync::{RwLock, broadcast};
17use tokio::task::JoinHandle;
18use tokio::time::error::Elapsed;
19
20pub struct BroadcastOutputStream {
26 stream_reader: JoinHandle<()>,
29
30 sender: broadcast::Sender<Option<Chunk>>,
34}
35
36impl OutputStream for BroadcastOutputStream {}
37
38impl Drop for BroadcastOutputStream {
39 fn drop(&mut self) {
40 self.stream_reader.abort();
41 }
42}
43
44impl Debug for BroadcastOutputStream {
45 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
46 f.debug_struct("BroadcastOutputStream")
47 .field("output_collector", &"non-debug < JoinHandle<()> >")
48 .field(
49 "sender",
50 &"non-debug < tokio::sync::broadcast::Sender<Option<Chunk>> >",
51 )
52 .finish()
53 }
54}
55
56async fn read_chunked<B: AsyncRead + Unpin + Send + 'static>(
60 mut read: B,
61 chunk_size: usize,
62 sender: broadcast::Sender<Option<Chunk>>,
63) {
64 let send_chunk = move |chunk: Option<Chunk>| {
65 match sender.send(chunk) {
70 Ok(_received_by) => {}
71 Err(err) => {
72 tracing::debug!(
77 error = %err,
78 "No active receivers for the output chunk, dropping it"
79 );
80 }
81 }
82 };
83
84 let mut buf = bytes::BytesMut::with_capacity(chunk_size);
86 loop {
87 let _ = buf.try_reclaim(chunk_size);
88 match read.read_buf(&mut buf).await {
89 Ok(bytes_read) => {
90 let is_eof = bytes_read == 0;
91
92 match is_eof {
93 true => send_chunk(None),
94 false => {
95 while !buf.is_empty() {
96 let split_to = usize::min(chunk_size, buf.len());
100 send_chunk(Some(Chunk(buf.split_to(split_to).freeze())));
109 }
110 }
111 };
112
113 if is_eof {
114 break;
115 }
116 }
117 Err(err) => panic!("Could not read from stream: {err}"),
118 }
119 }
120}
121
122impl BroadcastOutputStream {
123 pub(crate) fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
124 stream: S,
125 options: FromStreamOptions,
126 ) -> BroadcastOutputStream {
127 let (sender, receiver) = broadcast::channel::<Option<Chunk>>(options.channel_capacity);
128 drop(receiver);
129
130 let stream_reader = tokio::spawn(read_chunked(stream, options.chunk_size, sender.clone()));
131
132 BroadcastOutputStream {
133 stream_reader,
134 sender,
135 }
136 }
137
138 fn subscribe(&self) -> broadcast::Receiver<Option<Chunk>> {
139 self.sender.subscribe()
140 }
141}
142
/// Drives a broadcast subscription inside a labeled loop.
///
/// Arguments:
/// * `$loop_label` - label of the generated loop, so that `$body` can `break` out of it.
/// * `$receiver`   - the broadcast receiver to read from.
/// * `$term_rx`    - a future (polled via `&mut`) whose completion terminates the loop.
/// * `|$chunk| $body` - code run for every received item, with the item bound to `$chunk`.
///
/// The loop ends when the channel is closed or the termination future completes. A lagging
/// receiver only produces a warning; processing then continues.
macro_rules! handle_subscription {
    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
        $loop_label: loop {
            tokio::select! {
                received = $receiver.recv() => {
                    match received {
                        // Items were missed; report how many and keep going.
                        Err(RecvError::Lagged(lagged)) => {
                            tracing::warn!(lagged, "Inspector is lagging behind");
                        }
                        // All senders are gone, nothing more will arrive.
                        Err(RecvError::Closed) => {
                            break $loop_label;
                        }
                        Ok($chunk) => $body
                    }
                }
                _termination = &mut $term_rx => break $loop_label,
            }
        }
    };
}
171
172impl BroadcastOutputStream {
174 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
175 pub fn inspect_chunks(&self, mut f: impl FnMut(Chunk) -> Next + Send + 'static) -> Inspector {
176 let mut receiver = self.subscribe();
177 impl_inspect_chunks!(receiver, f, handle_subscription)
178 }
179
180 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
181 pub fn inspect_lines(
182 &self,
183 mut f: impl FnMut(String) -> Next + Send + 'static,
184 options: LineParsingOptions,
185 ) -> Inspector {
186 let mut receiver = self.subscribe();
187 impl_inspect_lines!(receiver, f, options, handle_subscription)
188 }
189
190 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
191 pub fn inspect_lines_async<Fut>(
192 &self,
193 mut f: impl FnMut(String) -> Fut + Send + 'static,
194 options: LineParsingOptions,
195 ) -> Inspector
196 where
197 Fut: Future<Output = Next> + Send,
198 {
199 let mut receiver = self.subscribe();
200 impl_inspect_lines_async!(receiver, f, options, handle_subscription)
201 }
202}
203
204impl BroadcastOutputStream {
206 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
207 pub fn collect_chunks<S: Sink>(
208 &self,
209 into: S,
210 mut collect: impl FnMut(Chunk, &mut S) + Send + 'static,
211 ) -> Collector<S> {
212 let sink = Arc::new(RwLock::new(into));
213 let mut receiver = self.subscribe();
214 impl_collect_chunks!(receiver, collect, sink, handle_subscription)
215 }
216
217 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
218 pub fn collect_chunks_async<S, F>(&self, into: S, collect: F) -> Collector<S>
219 where
220 S: Sink,
221 F: Fn(Chunk, &mut S) -> AsyncCollectFn<'_> + Send + 'static,
222 {
223 let sink = Arc::new(RwLock::new(into));
224 let mut receiver = self.subscribe();
225 impl_collect_chunks_async!(receiver, collect, sink, handle_subscription)
226 }
227
228 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
229 pub fn collect_lines<S: Sink>(
230 &self,
231 into: S,
232 mut collect: impl FnMut(String, &mut S) -> Next + Send + 'static,
233 options: LineParsingOptions,
234 ) -> Collector<S> {
235 let sink = Arc::new(RwLock::new(into));
236 let mut receiver = self.subscribe();
237 impl_collect_lines!(receiver, collect, options, sink, handle_subscription)
238 }
239
240 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
241 pub fn collect_lines_async<S, F>(
242 &self,
243 into: S,
244 collect: F,
245 options: LineParsingOptions,
246 ) -> Collector<S>
247 where
248 S: Sink,
249 F: Fn(String, &mut S) -> AsyncCollectFn<'_> + Send + Sync + 'static,
250 {
251 let sink = Arc::new(RwLock::new(into));
252 let mut receiver = self.subscribe();
253 impl_collect_lines_async!(receiver, collect, options, sink, handle_subscription)
254 }
255
256 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
257 pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
258 self.collect_chunks(Vec::new(), |chunk, vec| vec.extend(chunk.as_ref()))
259 }
260
261 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
262 pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
263 self.collect_lines(
264 Vec::new(),
265 |line, vec| {
266 vec.push(line);
267 Next::Continue
268 },
269 options,
270 )
271 }
272
273 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
274 pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
275 &self,
276 write: W,
277 ) -> Collector<W> {
278 self.collect_chunks_async(write, move |chunk, write| {
279 Box::pin(async move {
280 if let Err(err) = write.write_all(chunk.as_ref()).await {
281 tracing::warn!("Could not write chunk to write sink: {err:#?}");
282 };
283 Next::Continue
284 })
285 })
286 }
287
288 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
289 pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
290 &self,
291 write: W,
292 options: LineParsingOptions,
293 ) -> Collector<W> {
294 self.collect_lines_async(
295 write,
296 move |line, write| {
297 Box::pin(async move {
298 if let Err(err) = write.write_all(line.as_bytes()).await {
299 tracing::warn!("Could not write line to write sink: {err:#?}");
300 };
301 Next::Continue
302 })
303 },
304 options,
305 )
306 }
307
308 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
309 pub fn collect_chunks_into_write_mapped<
310 W: Sink + AsyncWriteExt + Unpin,
311 B: AsRef<[u8]> + Send,
312 >(
313 &self,
314 write: W,
315 mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
316 ) -> Collector<W> {
317 self.collect_chunks_async(write, move |chunk, write| {
318 Box::pin(async move {
319 let mapped = mapper(chunk);
320 let mapped = mapped.as_ref();
321 if let Err(err) = write.write_all(mapped).await {
322 tracing::warn!("Could not write chunk to write sink: {err:#?}");
323 };
324 Next::Continue
325 })
326 })
327 }
328
329 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
330 pub fn collect_lines_into_write_mapped<
331 W: Sink + AsyncWriteExt + Unpin,
332 B: AsRef<[u8]> + Send,
333 >(
334 &self,
335 write: W,
336 mapper: impl Fn(String) -> B + Send + Sync + Copy + 'static,
337 options: LineParsingOptions,
338 ) -> Collector<W> {
339 self.collect_lines_async(
340 write,
341 move |line, write| {
342 Box::pin(async move {
343 let mapped = mapper(line);
344 let mapped = mapped.as_ref();
345 if let Err(err) = write.write_all(mapped).await {
346 tracing::warn!("Could not write line to write sink: {err:#?}");
347 };
348 Next::Continue
349 })
350 },
351 options,
352 )
353 }
354}
355
356impl BroadcastOutputStream {
358 pub async fn wait_for_line(
359 &self,
360 predicate: impl Fn(String) -> bool + Send + Sync + 'static,
361 options: LineParsingOptions,
362 ) {
363 let inspector = self.inspect_lines(
364 move |line| {
365 if predicate(line) {
366 Next::Break
367 } else {
368 Next::Continue
369 }
370 },
371 options,
372 );
373 match inspector.wait().await {
374 Ok(()) => {}
375 Err(err) => match err {
376 InspectorError::TaskJoin(join_error) => {
377 panic!("Inspector task join error: {join_error:#?}");
378 }
379 },
380 };
381 }
382
383 pub async fn wait_for_line_with_timeout(
384 &self,
385 predicate: impl Fn(String) -> bool + Send + Sync + 'static,
386 options: LineParsingOptions,
387 timeout: Duration,
388 ) -> Result<(), Elapsed> {
389 tokio::time::timeout(timeout, self.wait_for_line(predicate, options)).await
390 }
391}
392
/// Configuration for line handling.
///
/// Plain data: the type can be copied, compared and debug-printed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LineConfig {
    /// Upper bound for the length of a single line.
    // NOTE(review): unit (presumably bytes) and the meaning of `0` are not visible in this
    // file - confirm against the code consuming this value.
    pub max_line_length: usize,
}
402
#[cfg(test)]
mod tests {
    use super::read_chunked;
    use crate::output_stream::broadcast::BroadcastOutputStream;
    use crate::output_stream::tests::write_test_data;
    use crate::output_stream::{FromStreamOptions, LineParsingOptions, Next};
    use assertr::assert_that;
    use assertr::prelude::{LengthAssertions, PartialEqAssertions, VecAssertions};
    use mockall::*;
    use std::io::Read;
    use std::io::Seek;
    use std::io::SeekFrom;
    use std::io::Write;
    use std::time::Duration;
    use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
    use tokio::sync::broadcast;
    use tokio::time::sleep;
    use tracing_test::traced_test;

    /// More data than one chunk is available on the first read: everything must still be
    /// delivered, split into chunks of at most two bytes.
    #[tokio::test]
    #[traced_test]
    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
    {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let (tx, mut rx) = broadcast::channel(10);

        write_half.write_all(b"hello world").await.unwrap();
        write_half.flush().await.unwrap();

        let stream_reader = tokio::spawn(read_chunked(read_half, 2, tx));

        // Dropping the write half produces EOF on the read half, letting the reader finish.
        drop(write_half);
        stream_reader.await.unwrap();

        let mut chunks = Vec::<String>::new();
        while let Ok(Some(chunk)) = rx.recv().await {
            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
        }
        assert_that(chunks).contains_exactly(&["he", "ll", "o ", "wo", "rl", "d"]);
    }

    /// A stream that ends without any data yields no chunks.
    #[tokio::test]
    #[traced_test]
    async fn read_chunked_no_data() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let (tx, mut rx) = broadcast::channel(10);

        let stream_reader = tokio::spawn(read_chunked(read_half, 2, tx));

        // Dropping the write half produces EOF on the read half, letting the reader finish.
        drop(write_half);
        stream_reader.await.unwrap();

        let mut chunks = Vec::<String>::new();
        while let Ok(Some(chunk)) = rx.recv().await {
            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
        }
        assert_that(chunks).is_empty();
    }

    /// A slow consumer on a small channel must observe (and log) lagging.
    #[tokio::test]
    #[traced_test]
    async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            FromStreamOptions {
                channel_capacity: 2,
                ..Default::default()
            },
        );

        // The consumer is four times slower than the producer.
        let consumer = os.inspect_lines_async(
            async |_line| {
                sleep(Duration::from_millis(100)).await;
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        #[rustfmt::skip]
        let producer = tokio::spawn(async move {
            for count in 1..=15 {
                // `write_all` instead of `write`: a short write must not silently lose data.
                write_half
                    .write_all(format!("{count}\n").as_bytes())
                    .await
                    .unwrap();
                sleep(Duration::from_millis(25)).await;
            }
            write_half.flush().await.unwrap();
            drop(write_half);
        });

        producer.await.unwrap();
        consumer.wait().await.unwrap();
        drop(os);

        logs_assert(|lines: &[&str]| {
            match lines
                .iter()
                .filter(|line| line.contains("Inspector is lagging behind lagged=1"))
                .count()
            {
                1 => {}
                n => return Err(format!("Expected exactly one lagged=1 log, but found {n}")),
            };
            match lines
                .iter()
                .filter(|line| line.contains("Inspector is lagging behind lagged=3"))
                .count()
            {
                // Depending on timing, either two or three of these logs are produced.
                2 => {}
                3 => {}
                n => return Err(format!("Expected two or three lagged=3 logs, but found {n}")),
            };
            Ok(())
        });
    }

    /// Every line of the test data is passed to the inspector callback exactly once.
    #[tokio::test]
    async fn inspect_lines() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(read_half, FromStreamOptions::default());

        #[automock]
        trait LineVisitor {
            fn visit(&self, line: String);
        }

        let mut mock = MockLineVisitor::new();
        #[rustfmt::skip]
        fn configure(mock: &mut MockLineVisitor) {
            mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
        }
        configure(&mut mock);

        let inspector = os.inspect_lines(
            move |line| {
                mock.visit(line);
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        inspector.cancel().await.unwrap();
        drop(os)
    }

    /// `Next::Break` returned from an async line callback must terminate the collector, even
    /// though the writer keeps producing output forever.
    #[tokio::test]
    #[traced_test]
    async fn inspect_lines_async() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        let seen: Vec<String> = Vec::new();
        let collector = os.collect_lines_async(
            seen,
            move |line, seen: &mut Vec<String>| {
                Box::pin(async move {
                    if line == "break" {
                        seen.push(line);
                        Next::Break
                    } else {
                        seen.push(line);
                        Next::Continue
                    }
                })
            },
            LineParsingOptions::default(),
        );

        let _writer = tokio::spawn(async move {
            write_half.write_all("start\n".as_bytes()).await.unwrap();
            write_half.write_all("break\n".as_bytes()).await.unwrap();
            write_half.write_all("end\n".as_bytes()).await.unwrap();

            loop {
                write_half
                    .write_all("gibberish\n".as_bytes())
                    .await
                    .unwrap();
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        });

        let seen = collector.wait().await.unwrap();

        assert_that(seen).contains_exactly(&["start", "break"]);
    }

    /// Lines collected through a synchronous callback end up in the file sink.
    #[tokio::test]
    async fn collect_lines_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            FromStreamOptions {
                channel_capacity: 32,
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines(
            temp_file,
            |line, temp_file| {
                writeln!(temp_file, "{}", line).unwrap();
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    /// Lines collected through an async callback end up in the file sink.
    #[tokio::test]
    async fn collect_lines_async_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines_async(
            temp_file,
            |line, temp_file| {
                Box::pin(async move {
                    writeln!(temp_file, "{}", line).unwrap();
                    Next::Continue
                })
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    /// Chunks are mapped and written to an async file sink.
    #[tokio::test]
    #[traced_test]
    async fn collect_chunks_into_write_mapped() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        // The file name carries the `broadcast_stream` marker, like the other file-based test in
        // this module, so that it cannot clash with a same-named file of another test suite.
        let temp_file = tokio::fs::File::options()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open(std::env::temp_dir().join(
                "tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_mapped.txt",
            ))
            .await
            .unwrap();

        let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
            String::from_utf8_lossy(chunk.as_ref()).to_string()
        });

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).await.unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    /// Two collectors attached to the same stream each receive the complete output.
    #[tokio::test]
    #[traced_test]
    async fn collect_chunks_into_write_in_parallel() {
        let (read_half, write_half) = tokio::io::duplex(64);

        let os = BroadcastOutputStream::from_stream(
            read_half,
            FromStreamOptions {
                chunk_size: 64,
                channel_capacity: 2,
                ..Default::default()
            },
        );

        let file1 = tokio::fs::File::options()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open(
                std::env::temp_dir()
                    .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_1.txt"),
            )
            .await
            .unwrap();
        let file2 = tokio::fs::File::options()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open(
                std::env::temp_dir()
                    .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_2.txt"),
            )
            .await
            .unwrap();

        let collector1 = os.collect_chunks_into_write(file1);
        let collector2 = os.collect_chunks_into_write_mapped(file2, |chunk| {
            format!("ok-{}", String::from_utf8_lossy(chunk.as_ref()))
        });

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file1 = collector1.cancel().await.unwrap();
        temp_file1.seek(SeekFrom::Start(0)).await.unwrap();
        let mut contents = String::new();
        temp_file1.read_to_string(&mut contents).await.unwrap();
        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");

        let mut temp_file2 = collector2.cancel().await.unwrap();
        temp_file2.seek(SeekFrom::Start(0)).await.unwrap();
        let mut contents = String::new();
        temp_file2.read_to_string(&mut contents).await.unwrap();
        assert_that(contents)
            .is_equal_to("ok-Cargo.lock\nok-Cargo.toml\nok-README.md\nok-src\nok-target\n");
    }
}