1use crate::collector::{AsyncCollectFn, Collector, Sink};
2use crate::inspector::Inspector;
3use crate::output_stream::impls::{
4 impl_collect_chunks, impl_collect_chunks_async, impl_collect_lines, impl_collect_lines_async,
5 impl_inspect_chunks, impl_inspect_lines, impl_inspect_lines_async,
6};
7use crate::output_stream::{
8 BackpressureControl, Chunk, FromStreamOptions, LineReader, Next, OutputStream,
9};
10use crate::{InspectorError, LineParsingOptions};
11use std::fmt::{Debug, Formatter};
12use std::future::Future;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
16use tokio::sync::mpsc::error::TrySendError;
17use tokio::sync::{RwLock, mpsc};
18use tokio::task::JoinHandle;
19use tokio::time::error::Elapsed;
20
21pub struct SingleSubscriberOutputStream {
28 stream_reader: JoinHandle<()>,
31
32 receiver: Option<mpsc::Receiver<Option<Chunk>>>,
35}
36
37impl OutputStream for SingleSubscriberOutputStream {}
38
39impl Drop for SingleSubscriberOutputStream {
40 fn drop(&mut self) {
41 self.stream_reader.abort();
42 }
43}
44
45impl Debug for SingleSubscriberOutputStream {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("SingleSubscriberOutputStream")
48 .field("output_collector", &"non-debug < JoinHandle<()> >")
49 .field(
50 "receiver",
51 &"non-debug < tokio::sync::mpsc::Receiver<Option<Chunk>> >",
52 )
53 .finish()
54 }
55}
56
57async fn read_chunked<R: AsyncRead + Unpin + Send + 'static>(
61 mut read: R,
62 chunk_size: usize,
63 sender: mpsc::Sender<Option<Chunk>>,
64 backpressure_control: BackpressureControl,
65) {
66 struct AfterSend {
67 do_break: bool,
68 }
69
70 fn try_send_chunk(
71 chunk: Option<Chunk>,
72 sender: &mpsc::Sender<Option<Chunk>>,
73 lagged: &mut usize,
74 ) -> AfterSend {
75 match sender.try_send(chunk) {
76 Ok(()) => {
77 if *lagged > 0 {
78 tracing::debug!(lagged, "Stream reader is lagging behind");
79 *lagged = 0;
80 }
81 }
82 Err(err) => {
83 match err {
84 TrySendError::Full(_data) => {
85 *lagged += 1;
86 }
87 TrySendError::Closed(_data) => {
88 return AfterSend { do_break: true };
93 }
94 }
95 }
96 }
97 AfterSend { do_break: false }
98 }
99
100 async fn send_chunk(chunk: Option<Chunk>, sender: &mpsc::Sender<Option<Chunk>>) -> AfterSend {
101 match sender.send(chunk).await {
102 Ok(()) => {}
103 Err(_err) => {
104 return AfterSend { do_break: true };
109 }
110 }
111 AfterSend { do_break: false }
112 }
113
114 let mut buf = bytes::BytesMut::with_capacity(chunk_size);
116 let mut lagged: usize = 0;
117 loop {
118 let _ = buf.try_reclaim(chunk_size);
119 match read.read_buf(&mut buf).await {
120 Ok(bytes_read) => {
121 let is_eof = bytes_read == 0;
122
123 match is_eof {
124 true => match backpressure_control {
125 BackpressureControl::DropLatestIncomingIfBufferFull => {
126 let after = try_send_chunk(None, &sender, &mut lagged);
127 if after.do_break {
128 break;
129 }
130 }
131 BackpressureControl::BlockUntilBufferHasSpace => {
132 let after = send_chunk(None, &sender).await;
133 if after.do_break {
134 break;
135 }
136 }
137 },
138 false => {
139 while !buf.is_empty() {
140 let split_to = usize::min(chunk_size, buf.len());
141
142 match backpressure_control {
143 BackpressureControl::DropLatestIncomingIfBufferFull => {
144 let after = try_send_chunk(
145 Some(Chunk(buf.split_to(split_to).freeze())),
146 &sender,
147 &mut lagged,
148 );
149 if after.do_break {
150 break;
151 }
152 }
153 BackpressureControl::BlockUntilBufferHasSpace => {
154 let after = send_chunk(
155 Some(Chunk(buf.split_to(split_to).freeze())),
156 &sender,
157 )
158 .await;
159 if after.do_break {
160 break;
161 }
162 }
163 }
164 }
165 }
166 };
167
168 if is_eof {
169 break;
170 }
171 }
172 Err(err) => panic!("Could not read from stream: {err}"),
173 }
174 }
175}
176
177impl SingleSubscriberOutputStream {
178 pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
180 stream: S,
181 backpressure_control: BackpressureControl,
182 options: FromStreamOptions,
183 ) -> SingleSubscriberOutputStream {
184 let (tx_stdout, rx_stdout) = mpsc::channel::<Option<Chunk>>(options.channel_capacity);
185
186 let stream_reader = tokio::spawn(read_chunked(
187 stream,
188 options.chunk_size,
189 tx_stdout,
190 backpressure_control,
191 ));
192
193 SingleSubscriberOutputStream {
194 stream_reader,
195 receiver: Some(rx_stdout),
196 }
197 }
198
199 fn take_receiver(&mut self) -> mpsc::Receiver<Option<Chunk>> {
200 self.receiver.take().expect("Receiver not yet to be taken. The SingleSubscriberOutputStream only supports one subscriber, but one was already created.")
201 }
202}
203
204macro_rules! handle_subscription {
208 ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
209 $loop_label: loop {
210 tokio::select! {
211 out = $receiver.recv() => {
212 match out {
213 Some(maybe_chunk) => {
214 let $chunk = maybe_chunk;
215 $body
216 }
217 None => {
218 break $loop_label;
220 }
221 }
222 }
223 _msg = &mut $term_rx => break $loop_label,
224 }
225 }
226 };
227}
228
229impl SingleSubscriberOutputStream {
231 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
236 pub fn inspect_chunks(&mut self, f: impl Fn(Chunk) -> Next + Send + 'static) -> Inspector {
237 let mut receiver = self.take_receiver();
238 impl_inspect_chunks!(receiver, f, handle_subscription)
239 }
240
241 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
246 pub fn inspect_lines(
247 &mut self,
248 mut f: impl FnMut(String) -> Next + Send + 'static,
249 options: LineParsingOptions,
250 ) -> Inspector {
251 let mut receiver = self.take_receiver();
252 impl_inspect_lines!(receiver, f, options, handle_subscription)
253 }
254
255 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
260 pub fn inspect_lines_async<Fut>(
261 &mut self,
262 mut f: impl FnMut(String) -> Fut + Send + 'static,
263 options: LineParsingOptions,
264 ) -> Inspector
265 where
266 Fut: Future<Output = Next> + Send,
267 {
268 let mut receiver = self.take_receiver();
269 impl_inspect_lines_async!(receiver, f, options, handle_subscription)
270 }
271}
272
273impl SingleSubscriberOutputStream {
275 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
279 pub fn collect_chunks<S: Sink>(
280 &mut self,
281 into: S,
282 collect: impl Fn(Chunk, &mut S) + Send + 'static,
283 ) -> Collector<S> {
284 let sink = Arc::new(RwLock::new(into));
285 let mut receiver = self.take_receiver();
286 impl_collect_chunks!(receiver, collect, sink, handle_subscription)
287 }
288
289 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
293 pub fn collect_chunks_async<S, F>(&mut self, into: S, collect: F) -> Collector<S>
294 where
295 S: Sink,
296 F: Fn(Chunk, &mut S) -> AsyncCollectFn<'_> + Send + 'static,
297 {
298 let sink = Arc::new(RwLock::new(into));
299 let mut receiver = self.take_receiver();
300 impl_collect_chunks_async!(receiver, collect, sink, handle_subscription)
301 }
302
303 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
308 pub fn collect_lines<S: Sink>(
309 &mut self,
310 into: S,
311 collect: impl Fn(String, &mut S) -> Next + Send + 'static,
312 options: LineParsingOptions,
313 ) -> Collector<S> {
314 let sink = Arc::new(RwLock::new(into));
315 let mut receiver = self.take_receiver();
316 impl_collect_lines!(receiver, collect, options, sink, handle_subscription)
317 }
318
319 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
324 pub fn collect_lines_async<S, F>(
325 &mut self,
326 into: S,
327 collect: F,
328 options: LineParsingOptions,
329 ) -> Collector<S>
330 where
331 S: Sink,
332 F: Fn(String, &mut S) -> AsyncCollectFn<'_> + Send + Sync + 'static,
333 {
334 let sink = Arc::new(RwLock::new(into));
335 let mut receiver = self.take_receiver();
336 impl_collect_lines_async!(receiver, collect, options, sink, handle_subscription)
337 }
338
339 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
341 pub fn collect_chunks_into_vec(&mut self) -> Collector<Vec<u8>> {
342 self.collect_chunks(Vec::new(), |chunk, vec| vec.extend(chunk.as_ref()))
343 }
344
345 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
347 pub fn collect_lines_into_vec(
348 &mut self,
349 options: LineParsingOptions,
350 ) -> Collector<Vec<String>> {
351 self.collect_lines(
352 Vec::new(),
353 |line, vec| {
354 vec.push(line);
355 Next::Continue
356 },
357 options,
358 )
359 }
360
361 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
363 pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
364 &mut self,
365 write: W,
366 ) -> Collector<W> {
367 self.collect_chunks_async(write, move |chunk, write| {
368 Box::pin(async move {
369 if let Err(err) = write.write_all(chunk.as_ref()).await {
370 tracing::warn!("Could not write chunk to write sink: {err:#?}");
371 };
372 Next::Continue
373 })
374 })
375 }
376
377 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
379 pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
380 &mut self,
381 write: W,
382 options: LineParsingOptions,
383 ) -> Collector<W> {
384 self.collect_lines_async(
385 write,
386 move |line, write| {
387 Box::pin(async move {
388 if let Err(err) = write.write_all(line.as_bytes()).await {
389 tracing::warn!("Could not write line to write sink: {err:#?}");
390 };
391 Next::Continue
392 })
393 },
394 options,
395 )
396 }
397
398 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
400 pub fn collect_chunks_into_write_mapped<
401 W: Sink + AsyncWriteExt + Unpin,
402 B: AsRef<[u8]> + Send,
403 >(
404 &mut self,
405 write: W,
406 mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
407 ) -> Collector<W> {
408 self.collect_chunks_async(write, move |chunk, write| {
409 Box::pin(async move {
410 let mapped = mapper(chunk);
411 let mapped = mapped.as_ref();
412 if let Err(err) = write.write_all(mapped).await {
413 tracing::warn!("Could not write chunk to write sink: {err:#?}");
414 };
415 Next::Continue
416 })
417 })
418 }
419
420 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
422 pub fn collect_lines_into_write_mapped<
423 W: Sink + AsyncWriteExt + Unpin,
424 B: AsRef<[u8]> + Send,
425 >(
426 &mut self,
427 write: W,
428 mapper: impl Fn(String) -> B + Send + Sync + Copy + 'static,
429 options: LineParsingOptions,
430 ) -> Collector<W> {
431 self.collect_lines_async(
432 write,
433 move |line, write| {
434 Box::pin(async move {
435 let mapped = mapper(line);
436 let mapped = mapped.as_ref();
437 if let Err(err) = write.write_all(mapped).await {
438 tracing::warn!("Could not write line to write sink: {err:#?}");
439 };
440 Next::Continue
441 })
442 },
443 options,
444 )
445 }
446}
447
448impl SingleSubscriberOutputStream {
450 pub async fn wait_for_line(
454 &mut self,
455 predicate: impl Fn(String) -> bool + Send + Sync + 'static,
456 options: LineParsingOptions,
457 ) {
458 let inspector = self.inspect_lines(
459 move |line| {
460 if predicate(line) {
461 Next::Break
462 } else {
463 Next::Continue
464 }
465 },
466 options,
467 );
468 match inspector.wait().await {
469 Ok(()) => {}
470 Err(err) => match err {
471 InspectorError::TaskJoin(join_error) => {
472 panic!("Inspector task join error: {join_error:#?}");
473 }
474 },
475 };
476 }
477
478 pub async fn wait_for_line_with_timeout(
482 &mut self,
483 predicate: impl Fn(String) -> bool + Send + Sync + 'static,
484 options: LineParsingOptions,
485 timeout: Duration,
486 ) -> Result<(), Elapsed> {
487 tokio::time::timeout(timeout, self.wait_for_line(predicate, options)).await
488 }
489}
490
491#[cfg(test)]
492mod tests {
493 use crate::LineParsingOptions;
494 use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
495 use crate::output_stream::tests::write_test_data;
496 use crate::output_stream::{BackpressureControl, FromStreamOptions, Next};
497 use crate::single_subscriber::read_chunked;
498 use assertr::prelude::*;
499 use mockall::{automock, predicate};
500 use std::io::{Read, Seek, SeekFrom, Write};
501 use std::time::Duration;
502 use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
503 use tokio::sync::mpsc;
504 use tokio::time::sleep;
505 use tracing_test::traced_test;
506
507 #[tokio::test]
508 #[traced_test]
509 async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
510 {
511 let (read_half, mut write_half) = tokio::io::duplex(64);
512 let (tx, mut rx) = mpsc::channel(64);
513
514 write_half.write_all(b"hello world").await.unwrap();
521 write_half.flush().await.unwrap();
522
523 let stream_reader = tokio::spawn(read_chunked(
524 read_half,
525 2,
526 tx,
527 BackpressureControl::DropLatestIncomingIfBufferFull,
528 ));
529
530 drop(write_half); stream_reader.await.unwrap();
532
533 let mut chunks = Vec::<String>::new();
534 while let Some(Some(chunk)) = rx.recv().await {
535 chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
536 }
537 assert_that(chunks).contains_exactly(&["he", "ll", "o ", "wo", "rl", "d"]);
538 }
539
540 #[tokio::test]
541 #[traced_test]
542 async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
543 let (read_half, mut write_half) = tokio::io::duplex(64);
544 let mut os = SingleSubscriberOutputStream::from_stream(
545 read_half,
546 BackpressureControl::DropLatestIncomingIfBufferFull,
547 FromStreamOptions {
548 channel_capacity: 2,
549 ..Default::default()
550 },
551 );
552
553 let inspector = os.inspect_lines_async(
554 async |_line| {
555 sleep(Duration::from_millis(100)).await;
557 Next::Continue
558 },
559 LineParsingOptions::default(),
560 );
561
562 #[rustfmt::skip]
563 let producer = tokio::spawn(async move {
564 for count in 1..=15 {
565 write_half
566 .write(format!("{count}\n").as_bytes())
567 .await
568 .unwrap();
569 sleep(Duration::from_millis(25)).await;
570 }
571 });
572
573 producer.await.unwrap();
574 inspector.wait().await.unwrap();
575 drop(os);
576
577 logs_assert(|lines: &[&str]| {
578 match lines
579 .iter()
580 .filter(|line| line.contains("Stream reader is lagging behind lagged=1"))
581 .count()
582 {
583 1 => {}
584 n => return Err(format!("Expected exactly one lagged=1 log, but found {n}")),
585 };
586 match lines
587 .iter()
588 .filter(|line| line.contains("Stream reader is lagging behind lagged=3"))
589 .count()
590 {
591 2 => {}
592 n => return Err(format!("Expected exactly two lagged=3 logs, but found {n}")),
593 };
594 Ok(())
595 });
596 }
597
598 #[tokio::test]
599 async fn inspect_lines() {
600 let (read_half, write_half) = tokio::io::duplex(64);
601 let mut os = SingleSubscriberOutputStream::from_stream(
602 read_half,
603 BackpressureControl::DropLatestIncomingIfBufferFull,
604 FromStreamOptions::default(),
605 );
606
607 #[automock]
608 trait LineVisitor {
609 fn visit(&self, line: String);
610 }
611
612 let mut mock = MockLineVisitor::new();
613 #[rustfmt::skip]
614 fn configure(mock: &mut MockLineVisitor) {
615 mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
616 mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
617 mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
618 mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
619 mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
620 }
621 configure(&mut mock);
622
623 let inspector = os.inspect_lines(
624 move |line| {
625 mock.visit(line);
626 Next::Continue
627 },
628 LineParsingOptions::default(),
629 );
630
631 tokio::spawn(write_test_data(write_half)).await.unwrap();
632
633 inspector.cancel().await.unwrap();
634 drop(os)
635 }
636
637 #[tokio::test]
641 #[traced_test]
642 async fn inspect_lines_async() {
643 let (read_half, mut write_half) = tokio::io::duplex(64);
644 let mut os = SingleSubscriberOutputStream::from_stream(
645 read_half,
646 BackpressureControl::DropLatestIncomingIfBufferFull,
647 FromStreamOptions {
648 chunk_size: 32,
649 ..Default::default()
650 },
651 );
652
653 let seen: Vec<String> = Vec::new();
654 let collector = os.collect_lines_async(
655 seen,
656 move |line, seen: &mut Vec<String>| {
657 Box::pin(async move {
658 if line == "break" {
659 seen.push(line);
660 Next::Break
661 } else {
662 seen.push(line);
663 Next::Continue
664 }
665 })
666 },
667 LineParsingOptions::default(),
668 );
669
670 let _writer = tokio::spawn(async move {
671 write_half.write_all("start\n".as_bytes()).await.unwrap();
672 write_half.write_all("break\n".as_bytes()).await.unwrap();
673 write_half.write_all("end\n".as_bytes()).await.unwrap();
674
675 loop {
676 write_half
677 .write_all("gibberish\n".as_bytes())
678 .await
679 .unwrap();
680 tokio::time::sleep(Duration::from_millis(50)).await;
681 }
682 });
683
684 let seen = collector.wait().await.unwrap();
685
686 assert_that(seen).contains_exactly(&["start", "break"]);
687 }
688
689 #[tokio::test]
690 async fn collect_lines_to_file() {
691 let (read_half, write_half) = tokio::io::duplex(64);
692 let mut os = SingleSubscriberOutputStream::from_stream(
693 read_half,
694 BackpressureControl::DropLatestIncomingIfBufferFull,
695 FromStreamOptions {
696 channel_capacity: 32,
697 ..Default::default()
698 },
699 );
700
701 let temp_file = tempfile::tempfile().unwrap();
702 let collector = os.collect_lines(
703 temp_file,
704 |line, temp_file| {
705 writeln!(temp_file, "{}", line).unwrap();
706 Next::Continue
707 },
708 LineParsingOptions::default(),
709 );
710
711 tokio::spawn(write_test_data(write_half)).await.unwrap();
712
713 let mut temp_file = collector.cancel().await.unwrap();
714 temp_file.seek(SeekFrom::Start(0)).unwrap();
715 let mut contents = String::new();
716 temp_file.read_to_string(&mut contents).unwrap();
717
718 assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
719 }
720
721 #[tokio::test]
722 async fn collect_lines_async_to_file() {
723 let (read_half, write_half) = tokio::io::duplex(64);
724 let mut os = SingleSubscriberOutputStream::from_stream(
725 read_half,
726 BackpressureControl::DropLatestIncomingIfBufferFull,
727 FromStreamOptions {
728 chunk_size: 32,
729 ..Default::default()
730 },
731 );
732
733 let temp_file = tempfile::tempfile().unwrap();
734 let collector = os.collect_lines_async(
735 temp_file,
736 |line, temp_file| {
737 Box::pin(async move {
738 writeln!(temp_file, "{}", line).unwrap();
739 Next::Continue
740 })
741 },
742 LineParsingOptions::default(),
743 );
744
745 tokio::spawn(write_test_data(write_half)).await.unwrap();
746
747 let mut temp_file = collector.cancel().await.unwrap();
748 temp_file.seek(SeekFrom::Start(0)).unwrap();
749 let mut contents = String::new();
750 temp_file.read_to_string(&mut contents).unwrap();
751
752 assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
753 }
754
755 #[tokio::test]
756 #[traced_test]
757 async fn collect_chunks_into_write_mapped() {
758 let (read_half, write_half) = tokio::io::duplex(64);
759 let mut os = SingleSubscriberOutputStream::from_stream(
760 read_half,
761 BackpressureControl::DropLatestIncomingIfBufferFull,
762 FromStreamOptions {
763 chunk_size: 32,
764 ..Default::default()
765 },
766 );
767
768 let temp_file = tokio::fs::File::options()
769 .create(true)
770 .truncate(true)
771 .write(true)
772 .read(true)
773 .open(std::env::temp_dir().join(
774 "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
775 ))
776 .await
777 .unwrap();
778
779 let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
780 String::from_utf8_lossy(chunk.as_ref()).to_string()
781 });
782
783 tokio::spawn(write_test_data(write_half)).await.unwrap();
784
785 let mut temp_file = collector.cancel().await.unwrap();
786 temp_file.seek(SeekFrom::Start(0)).await.unwrap();
787 let mut contents = String::new();
788 temp_file.read_to_string(&mut contents).await.unwrap();
789
790 assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
791 }
792
793 #[tokio::test]
794 #[traced_test]
795 async fn multiple_subscribers_are_not_possible() {
796 let (read_half, _write_half) = tokio::io::duplex(64);
797 let mut os = SingleSubscriberOutputStream::from_stream(
798 read_half,
799 BackpressureControl::DropLatestIncomingIfBufferFull,
800 FromStreamOptions::default(),
801 );
802
803 let _inspector = os.inspect_lines(|_line| Next::Continue, Default::default());
804
805 assert_that_panic_by(move || os.inspect_lines(|_line| Next::Continue, Default::default()))
807 .has_type::<String>()
808 .is_equal_to("Receiver not yet to be taken. The SingleSubscriberOutputStream only supports one subscriber, but one was already created.");
809 }
810}