1use crate::collector::{AsyncCollectFn, Collector, Sink};
2use crate::inspector::Inspector;
3use crate::output_stream::impls::{
4 impl_collect_chunks, impl_collect_chunks_async, impl_collect_lines, impl_collect_lines_async,
5 impl_inspect_chunks, impl_inspect_lines, impl_inspect_lines_async,
6};
7use crate::output_stream::{
8 BackpressureControl, Chunk, FromStreamOptions, LineReader, Next, OutputStream,
9};
10use crate::{InspectorError, LineParsingOptions};
11use std::fmt::{Debug, Formatter};
12use std::future::Future;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
16use tokio::sync::mpsc::error::TrySendError;
17use tokio::sync::{RwLock, mpsc};
18use tokio::task::JoinHandle;
19use tokio::time::error::Elapsed;
20
/// Output stream that can be consumed by exactly one subscriber (an inspector or a collector).
///
/// `from_stream` spawns a background task running `read_chunked`, which forwards the content of
/// the underlying stream through a channel. The receiving end of that channel is kept here until
/// the first subscriber takes it.
pub struct SingleSubscriberOutputStream {
    /// Handle of the spawned reader task. The task is aborted when this stream is dropped.
    stream_reader: JoinHandle<()>,

    /// Receiving end of the chunk channel.
    /// `Some` until the first (and only) subscriber takes it, `None` afterwards.
    receiver: Option<mpsc::Receiver<Option<Chunk>>>,
}
36
// Marks this type as an `OutputStream`; no trait items are provided or overridden here.
impl OutputStream for SingleSubscriberOutputStream {}
38
39impl Drop for SingleSubscriberOutputStream {
40 fn drop(&mut self) {
41 self.stream_reader.abort();
42 }
43}
44
45impl Debug for SingleSubscriberOutputStream {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("SingleSubscriberOutputStream")
48 .field("output_collector", &"non-debug < JoinHandle<()> >")
49 .field(
50 "receiver",
51 &"non-debug < tokio::sync::mpsc::Receiver<Option<Chunk>> >",
52 )
53 .finish()
54 }
55}
56
57async fn read_chunked<R: AsyncRead + Unpin + Send + 'static>(
61 mut read: R,
62 chunk_size: usize,
63 sender: mpsc::Sender<Option<Chunk>>,
64 backpressure_control: BackpressureControl,
65) {
66 struct AfterSend {
67 do_break: bool,
68 }
69
70 fn try_send_chunk(
71 chunk: Option<Chunk>,
72 sender: &mpsc::Sender<Option<Chunk>>,
73 lagged: &mut usize,
74 ) -> AfterSend {
75 match sender.try_send(chunk) {
76 Ok(()) => {
77 if *lagged > 0 {
78 tracing::debug!(lagged, "Stream reader is lagging behind");
79 *lagged = 0;
80 }
81 }
82 Err(err) => {
83 match err {
84 TrySendError::Full(_data) => {
85 *lagged += 1;
86 }
87 TrySendError::Closed(_data) => {
88 return AfterSend { do_break: true };
93 }
94 }
95 }
96 }
97 AfterSend { do_break: false }
98 }
99
100 async fn send_chunk(chunk: Option<Chunk>, sender: &mpsc::Sender<Option<Chunk>>) -> AfterSend {
101 match sender.send(chunk).await {
102 Ok(()) => {}
103 Err(_err) => {
104 return AfterSend { do_break: true };
109 }
110 }
111 AfterSend { do_break: false }
112 }
113
114 let mut buf = bytes::BytesMut::with_capacity(chunk_size);
116 let mut lagged: usize = 0;
117 loop {
118 let _ = buf.try_reclaim(chunk_size);
119 match read.read_buf(&mut buf).await {
120 Ok(bytes_read) => {
121 let is_eof = bytes_read == 0;
122
123 match is_eof {
124 true => match backpressure_control {
125 BackpressureControl::DropLatestIncomingIfBufferFull => {
126 let after = try_send_chunk(None, &sender, &mut lagged);
127 if after.do_break {
128 break;
129 }
130 }
131 BackpressureControl::BlockUntilBufferHasSpace => {
132 let after = send_chunk(None, &sender).await;
133 if after.do_break {
134 break;
135 }
136 }
137 },
138 false => {
139 while !buf.is_empty() {
140 let split_to = usize::min(chunk_size, buf.len());
141
142 match backpressure_control {
143 BackpressureControl::DropLatestIncomingIfBufferFull => {
144 let after = try_send_chunk(
145 Some(Chunk(buf.split_to(split_to).freeze())),
146 &sender,
147 &mut lagged,
148 );
149 if after.do_break {
150 break;
151 }
152 }
153 BackpressureControl::BlockUntilBufferHasSpace => {
154 let after = send_chunk(
155 Some(Chunk(buf.split_to(split_to).freeze())),
156 &sender,
157 )
158 .await;
159 if after.do_break {
160 break;
161 }
162 }
163 }
164 }
165 }
166 };
167
168 if is_eof {
169 break;
170 }
171 }
172 Err(err) => panic!("Could not read from stream: {err}"),
173 }
174 }
175}
176
177impl SingleSubscriberOutputStream {
178 pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
179 stream: S,
180 backpressure_control: BackpressureControl,
181 options: FromStreamOptions,
182 ) -> SingleSubscriberOutputStream {
183 let (tx_stdout, rx_stdout) = mpsc::channel::<Option<Chunk>>(options.channel_capacity);
184
185 let stream_reader = tokio::spawn(read_chunked(
186 stream,
187 options.chunk_size,
188 tx_stdout,
189 backpressure_control,
190 ));
191
192 SingleSubscriberOutputStream {
193 stream_reader,
194 receiver: Some(rx_stdout),
195 }
196 }
197
198 fn take_receiver(&mut self) -> mpsc::Receiver<Option<Chunk>> {
199 self.receiver.take().expect("Receiver not yet to be taken. The SingleSubscriberOutputStream only supports one subscriber, but one was already created.")
200 }
201}
202
/// Expands to a labeled loop that receives messages from `$receiver` and runs `$body` for each.
///
/// - `$loop_label`: label given to the generated loop, so that `$body` can break out of it.
/// - `$receiver`: value whose `recv()` future yields `Option<message>`.
/// - `$term_rx`: future polled through `&mut` alongside `recv()`; as soon as it completes, the
///   loop ends (presumably a termination signal, see the calling macros).
/// - `|$chunk| $body`: `$body` is evaluated with the received message bound to `$chunk`.
///
/// The loop also ends when `recv()` yields `None`, i.e. when the channel is closed.
macro_rules! handle_subscription {
    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
        $loop_label: loop {
            tokio::select! {
                received = $receiver.recv() => {
                    // `None` means the channel is closed: nothing more will ever arrive.
                    let Some($chunk) = received else {
                        break $loop_label;
                    };
                    $body
                }
                _msg = &mut $term_rx => break $loop_label,
            }
        }
    };
}
227
228impl SingleSubscriberOutputStream {
230 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
231 pub fn inspect_chunks(&mut self, f: impl Fn(Chunk) -> Next + Send + 'static) -> Inspector {
232 let mut receiver = self.take_receiver();
233 impl_inspect_chunks!(receiver, f, handle_subscription)
234 }
235
236 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
237 pub fn inspect_lines(
238 &mut self,
239 mut f: impl FnMut(String) -> Next + Send + 'static,
240 options: LineParsingOptions,
241 ) -> Inspector {
242 let mut receiver = self.take_receiver();
243 impl_inspect_lines!(receiver, f, options, handle_subscription)
244 }
245
246 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
247 pub fn inspect_lines_async<Fut>(
248 &mut self,
249 mut f: impl FnMut(String) -> Fut + Send + 'static,
250 options: LineParsingOptions,
251 ) -> Inspector
252 where
253 Fut: Future<Output = Next> + Send,
254 {
255 let mut receiver = self.take_receiver();
256 impl_inspect_lines_async!(receiver, f, options, handle_subscription)
257 }
258}
259
260impl SingleSubscriberOutputStream {
262 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
263 pub fn collect_chunks<S: Sink>(
264 &mut self,
265 into: S,
266 collect: impl Fn(Chunk, &mut S) + Send + 'static,
267 ) -> Collector<S> {
268 let sink = Arc::new(RwLock::new(into));
269 let mut receiver = self.take_receiver();
270 impl_collect_chunks!(receiver, collect, sink, handle_subscription)
271 }
272
273 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
274 pub fn collect_chunks_async<S, F>(&mut self, into: S, collect: F) -> Collector<S>
275 where
276 S: Sink,
277 F: Fn(Chunk, &mut S) -> AsyncCollectFn<'_> + Send + 'static,
278 {
279 let sink = Arc::new(RwLock::new(into));
280 let mut receiver = self.take_receiver();
281 impl_collect_chunks_async!(receiver, collect, sink, handle_subscription)
282 }
283
284 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
285 pub fn collect_lines<S: Sink>(
286 &mut self,
287 into: S,
288 collect: impl Fn(String, &mut S) -> Next + Send + 'static,
289 options: LineParsingOptions,
290 ) -> Collector<S> {
291 let sink = Arc::new(RwLock::new(into));
292 let mut receiver = self.take_receiver();
293 impl_collect_lines!(receiver, collect, options, sink, handle_subscription)
294 }
295
296 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
297 pub fn collect_lines_async<S, F>(
298 &mut self,
299 into: S,
300 collect: F,
301 options: LineParsingOptions,
302 ) -> Collector<S>
303 where
304 S: Sink,
305 F: Fn(String, &mut S) -> AsyncCollectFn<'_> + Send + Sync + 'static,
306 {
307 let sink = Arc::new(RwLock::new(into));
308 let mut receiver = self.take_receiver();
309 impl_collect_lines_async!(receiver, collect, options, sink, handle_subscription)
310 }
311
312 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
313 pub fn collect_chunks_into_vec(&mut self) -> Collector<Vec<u8>> {
314 self.collect_chunks(Vec::new(), |chunk, vec| vec.extend(chunk.as_ref()))
315 }
316
317 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
318 pub fn collect_lines_into_vec(
319 &mut self,
320 options: LineParsingOptions,
321 ) -> Collector<Vec<String>> {
322 self.collect_lines(
323 Vec::new(),
324 |line, vec| {
325 vec.push(line);
326 Next::Continue
327 },
328 options,
329 )
330 }
331
332 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
333 pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
334 &mut self,
335 write: W,
336 ) -> Collector<W> {
337 self.collect_chunks_async(write, move |chunk, write| {
338 Box::pin(async move {
339 if let Err(err) = write.write_all(chunk.as_ref()).await {
340 tracing::warn!("Could not write chunk to write sink: {err:#?}");
341 };
342 Next::Continue
343 })
344 })
345 }
346
347 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
348 pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
349 &mut self,
350 write: W,
351 options: LineParsingOptions,
352 ) -> Collector<W> {
353 self.collect_lines_async(
354 write,
355 move |line, write| {
356 Box::pin(async move {
357 if let Err(err) = write.write_all(line.as_bytes()).await {
358 tracing::warn!("Could not write line to write sink: {err:#?}");
359 };
360 Next::Continue
361 })
362 },
363 options,
364 )
365 }
366
367 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
368 pub fn collect_chunks_into_write_mapped<
369 W: Sink + AsyncWriteExt + Unpin,
370 B: AsRef<[u8]> + Send,
371 >(
372 &mut self,
373 write: W,
374 mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
375 ) -> Collector<W> {
376 self.collect_chunks_async(write, move |chunk, write| {
377 Box::pin(async move {
378 let mapped = mapper(chunk);
379 let mapped = mapped.as_ref();
380 if let Err(err) = write.write_all(mapped).await {
381 tracing::warn!("Could not write chunk to write sink: {err:#?}");
382 };
383 Next::Continue
384 })
385 })
386 }
387
388 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
389 pub fn collect_lines_into_write_mapped<
390 W: Sink + AsyncWriteExt + Unpin,
391 B: AsRef<[u8]> + Send,
392 >(
393 &mut self,
394 write: W,
395 mapper: impl Fn(String) -> B + Send + Sync + Copy + 'static,
396 options: LineParsingOptions,
397 ) -> Collector<W> {
398 self.collect_lines_async(
399 write,
400 move |line, write| {
401 Box::pin(async move {
402 let mapped = mapper(line);
403 let mapped = mapped.as_ref();
404 if let Err(err) = write.write_all(mapped).await {
405 tracing::warn!("Could not write line to write sink: {err:#?}");
406 };
407 Next::Continue
408 })
409 },
410 options,
411 )
412 }
413}
414
415impl SingleSubscriberOutputStream {
417 pub async fn wait_for_line(
418 &mut self,
419 predicate: impl Fn(String) -> bool + Send + Sync + 'static,
420 options: LineParsingOptions,
421 ) {
422 let inspector = self.inspect_lines(
423 move |line| {
424 if predicate(line) {
425 Next::Break
426 } else {
427 Next::Continue
428 }
429 },
430 options,
431 );
432 match inspector.wait().await {
433 Ok(()) => {}
434 Err(err) => match err {
435 InspectorError::TaskJoin(join_error) => {
436 panic!("Inspector task join error: {join_error:#?}");
437 }
438 },
439 };
440 }
441
442 pub async fn wait_for_line_with_timeout(
443 &mut self,
444 predicate: impl Fn(String) -> bool + Send + Sync + 'static,
445 options: LineParsingOptions,
446 timeout: Duration,
447 ) -> Result<(), Elapsed> {
448 tokio::time::timeout(timeout, self.wait_for_line(predicate, options)).await
449 }
450}
451
#[cfg(test)]
mod tests {
    use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
    use crate::output_stream::tests::write_test_data;
    use crate::output_stream::{BackpressureControl, FromStreamOptions, Next};
    use crate::single_subscriber::read_chunked;
    use crate::LineParsingOptions;
    use assertr::prelude::*;
    use mockall::{automock, predicate};
    use std::io::{Read, Seek, SeekFrom, Write};
    use std::time::Duration;
    use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
    use tokio::sync::mpsc;
    use tokio::time::sleep;
    use tracing_test::traced_test;

    // All 11 bytes are written before the reader starts, with a chunk size of only 2.
    // Every byte must still arrive, split into chunks of at most 2 bytes.
    #[tokio::test]
    #[traced_test]
    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
    {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let (tx, mut rx) = mpsc::channel(64);

        write_half.write_all(b"hello world").await.unwrap();
        write_half.flush().await.unwrap();

        let stream_reader = tokio::spawn(read_chunked(
            read_half,
            2,
            tx,
            BackpressureControl::DropLatestIncomingIfBufferFull,
        ));

        // Closing the write half lets the reader observe EOF and finish.
        drop(write_half);
        stream_reader.await.unwrap();

        let mut chunks = Vec::<String>::new();
        while let Some(Some(chunk)) = rx.recv().await {
            chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
        }
        assert_that(chunks).contains_exactly(&["he", "ll", "o ", "wo", "rl", "d"]);
    }

    // The consumer (100ms per line) is slower than the producer (one line every 25ms), so the
    // channel of capacity 2 fills up and chunks get dropped. The assertions pin the resulting
    // "lagging behind" log lines.
    #[tokio::test]
    #[traced_test]
    async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                channel_capacity: 2,
                ..Default::default()
            },
        );

        let inspector = os.inspect_lines_async(
            async |_line| {
                sleep(Duration::from_millis(100)).await;
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        #[rustfmt::skip]
        let producer = tokio::spawn(async move {
            for count in 1..=15 {
                // `write_all` instead of `write`: a partial write must not go unnoticed.
                write_half
                    .write_all(format!("{count}\n").as_bytes())
                    .await
                    .unwrap();
                sleep(Duration::from_millis(25)).await;
            }
        });

        producer.await.unwrap();
        inspector.wait().await.unwrap();
        drop(os);

        logs_assert(|lines: &[&str]| {
            match lines
                .iter()
                .filter(|line| line.contains("Stream reader is lagging behind lagged=1"))
                .count()
            {
                1 => {}
                n => return Err(format!("Expected exactly one lagged=1 log, but found {n}")),
            };
            match lines
                .iter()
                .filter(|line| line.contains("Stream reader is lagging behind lagged=3"))
                .count()
            {
                2 => {}
                n => return Err(format!("Expected exactly two lagged=3 logs, but found {n}")),
            };
            Ok(())
        });
    }

    // Every line of the test data must be handed to the inspector callback exactly once.
    #[tokio::test]
    async fn inspect_lines() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions::default(),
        );

        #[automock]
        trait LineVisitor {
            fn visit(&self, line: String);
        }

        let mut mock = MockLineVisitor::new();
        #[rustfmt::skip]
        fn configure(mock: &mut MockLineVisitor) {
            mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
        }
        configure(&mut mock);

        let inspector = os.inspect_lines(
            move |line| {
                mock.visit(line);
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        inspector.cancel().await.unwrap();
        drop(os)
    }

    // Despite its name, this test goes through `collect_lines_async`: returning `Next::Break`
    // from the callback must end the collection even though the stream keeps producing data.
    #[tokio::test]
    #[traced_test]
    async fn inspect_lines_async() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        let seen: Vec<String> = Vec::new();
        let collector = os.collect_lines_async(
            seen,
            move |line, seen: &mut Vec<String>| {
                Box::pin(async move {
                    if line == "break" {
                        seen.push(line);
                        Next::Break
                    } else {
                        seen.push(line);
                        Next::Continue
                    }
                })
            },
            LineParsingOptions::default(),
        );

        let _writer = tokio::spawn(async move {
            write_half.write_all("start\n".as_bytes()).await.unwrap();
            write_half.write_all("break\n".as_bytes()).await.unwrap();
            write_half.write_all("end\n".as_bytes()).await.unwrap();

            // Keep the stream busy so that only `Next::Break` can end the collection.
            // This task is detached: if the read half is closed, end quietly instead of
            // panicking in the background.
            loop {
                if write_half
                    .write_all("gibberish\n".as_bytes())
                    .await
                    .is_err()
                {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        });

        let seen = collector.wait().await.unwrap();

        assert_that(seen).contains_exactly(&["start", "break"]);
    }

    // Lines collected with a synchronous callback end up in the file, one per line.
    #[tokio::test]
    async fn collect_lines_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                channel_capacity: 32,
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines(
            temp_file,
            |line, temp_file| {
                writeln!(temp_file, "{}", line).unwrap();
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    // Same as `collect_lines_to_file`, but using the asynchronous callback variant.
    #[tokio::test]
    async fn collect_lines_async_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines_async(
            temp_file,
            |line, temp_file| {
                Box::pin(async move {
                    writeln!(temp_file, "{}", line).unwrap();
                    Next::Continue
                })
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    // Chunks mapped to strings and written to an async file must reproduce the input verbatim.
    #[tokio::test]
    #[traced_test]
    async fn collect_chunks_into_write_mapped() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                chunk_size: 32,
                ..Default::default()
            },
        );

        let temp_file = tokio::fs::File::options()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open(std::env::temp_dir().join(
                "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
            ))
            .await
            .unwrap();

        let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
            String::from_utf8_lossy(chunk.as_ref()).to_string()
        });

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).await.unwrap();

        assert_that(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    // Creating a second subscriber must panic with the documented message.
    #[tokio::test]
    #[traced_test]
    async fn multiple_subscribers_are_not_possible() {
        let (read_half, _write_half) = tokio::io::duplex(64);
        let mut os = SingleSubscriberOutputStream::from_stream(
            read_half,
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions::default(),
        );

        let _inspector = os.inspect_lines(|_line| Next::Continue, Default::default());

        assert_that_panic_by(move || os.inspect_lines(|_line| Next::Continue, Default::default()))
            .has_type::<String>()
            .is_equal_to("Receiver not yet to be taken. The SingleSubscriberOutputStream only supports one subscriber, but one was already created.");
    }
}