1use crate::collector::{AsyncCollectFn, Collector, Sink};
2use crate::error::OutputError;
3use crate::inspector::Inspector;
4use crate::output_stream::impls::{
5 impl_collect_chunks, impl_collect_chunks_async, impl_collect_lines, impl_collect_lines_async,
6 impl_inspect_chunks, impl_inspect_lines, impl_inspect_lines_async,
7};
8use crate::output_stream::{
9 BackpressureControl, Chunk, FromStreamOptions, LineReader, Next, OutputStream,
10};
11use crate::{InspectorError, LineParsingOptions, NumBytes};
12use atomic_take::AtomicTake;
13use std::borrow::Cow;
14use std::fmt::{Debug, Formatter};
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
19use tokio::sync::mpsc::error::TrySendError;
20use tokio::sync::{RwLock, mpsc};
21use tokio::task::JoinHandle;
22
23pub struct SingleSubscriberOutputStream {
30 stream_reader: JoinHandle<()>,
33
34 receiver: AtomicTake<mpsc::Receiver<Option<Chunk>>>,
40
41 chunk_size: NumBytes,
43
44 max_channel_capacity: usize,
46
47 name: &'static str,
49}
50
51impl OutputStream for SingleSubscriberOutputStream {
52 fn chunk_size(&self) -> NumBytes {
53 self.chunk_size
54 }
55
56 fn channel_capacity(&self) -> usize {
57 self.max_channel_capacity
58 }
59
60 fn name(&self) -> &'static str {
61 self.name
62 }
63}
64
65impl Drop for SingleSubscriberOutputStream {
66 fn drop(&mut self) {
67 self.stream_reader.abort();
68 }
69}
70
71impl Debug for SingleSubscriberOutputStream {
72 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct("SingleSubscriberOutputStream")
74 .field("output_collector", &"non-debug < JoinHandle<()> >")
75 .field(
76 "receiver",
77 &"non-debug < tokio::sync::mpsc::Receiver<Option<Chunk>> >",
78 )
79 .finish()
80 }
81}
82
83async fn read_chunked<R: AsyncRead + Unpin + Send + 'static>(
87 mut read: R,
88 chunk_size: NumBytes,
89 sender: mpsc::Sender<Option<Chunk>>,
90 backpressure_control: BackpressureControl,
91) {
92 struct AfterSend {
93 do_break: bool,
94 }
95
96 fn try_send_chunk(
97 chunk: Option<Chunk>,
98 sender: &mpsc::Sender<Option<Chunk>>,
99 lagged: &mut usize,
100 ) -> AfterSend {
101 match sender.try_send(chunk) {
102 Ok(()) => {
103 if *lagged > 0 {
104 tracing::debug!(lagged, "Stream reader is lagging behind");
105 *lagged = 0;
106 }
107 }
108 Err(err) => {
109 match err {
110 TrySendError::Full(_data) => {
111 *lagged += 1;
112 }
113 TrySendError::Closed(_data) => {
114 return AfterSend { do_break: true };
119 }
120 }
121 }
122 }
123 AfterSend { do_break: false }
124 }
125
126 async fn send_chunk(chunk: Option<Chunk>, sender: &mpsc::Sender<Option<Chunk>>) -> AfterSend {
127 match sender.send(chunk).await {
128 Ok(()) => {}
129 Err(_err) => {
130 return AfterSend { do_break: true };
135 }
136 }
137 AfterSend { do_break: false }
138 }
139
140 let mut buf = bytes::BytesMut::with_capacity(chunk_size.bytes());
142 let mut lagged: usize = 0;
143 loop {
144 let _ = buf.try_reclaim(chunk_size.bytes());
145 match read.read_buf(&mut buf).await {
146 Ok(bytes_read) => {
147 let is_eof = bytes_read == 0;
148
149 match is_eof {
150 true => match backpressure_control {
151 BackpressureControl::DropLatestIncomingIfBufferFull => {
152 let after = try_send_chunk(None, &sender, &mut lagged);
153 if after.do_break {
154 break;
155 }
156 }
157 BackpressureControl::BlockUntilBufferHasSpace => {
158 let after = send_chunk(None, &sender).await;
159 if after.do_break {
160 break;
161 }
162 }
163 },
164 false => {
165 while !buf.is_empty() {
166 let split_to = usize::min(chunk_size.bytes(), buf.len());
167
168 match backpressure_control {
169 BackpressureControl::DropLatestIncomingIfBufferFull => {
170 let after = try_send_chunk(
171 Some(Chunk(buf.split_to(split_to).freeze())),
172 &sender,
173 &mut lagged,
174 );
175 if after.do_break {
176 break;
177 }
178 }
179 BackpressureControl::BlockUntilBufferHasSpace => {
180 let after = send_chunk(
181 Some(Chunk(buf.split_to(split_to).freeze())),
182 &sender,
183 )
184 .await;
185 if after.do_break {
186 break;
187 }
188 }
189 }
190 }
191 }
192 };
193
194 if is_eof {
195 break;
196 }
197 }
198 Err(err) => panic!("Could not read from stream: {err}"),
199 }
200 }
201}
202
203impl SingleSubscriberOutputStream {
204 pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
206 stream: S,
207 stream_name: &'static str,
208 backpressure_control: BackpressureControl,
209 options: FromStreamOptions,
210 ) -> SingleSubscriberOutputStream {
211 let (tx_stdout, rx_stdout) = mpsc::channel::<Option<Chunk>>(options.channel_capacity);
212
213 let stream_reader = tokio::spawn(read_chunked(
214 stream,
215 options.chunk_size,
216 tx_stdout,
217 backpressure_control,
218 ));
219
220 SingleSubscriberOutputStream {
221 stream_reader,
222 receiver: AtomicTake::new(rx_stdout),
223 chunk_size: options.chunk_size,
224 max_channel_capacity: options.channel_capacity,
225 name: stream_name,
226 }
227 }
228
229 fn take_receiver(&self) -> mpsc::Receiver<Option<Chunk>> {
230 self.receiver.take().unwrap_or_else(|| {
231 panic!(
232 "Cannot create multiple consumers on SingleSubscriberOutputStream (stream: '{}'). \
233 Only one inspector or collector can be active at a time. \
234 Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.",
235 self.name
236 )
237 })
238 }
239}
240
/// Drives a subscription loop over an `mpsc` receiver.
///
/// Expands to a loop labeled `$loop_label` that repeatedly waits for either
/// * the next item from `$receiver`, which is bound to `$chunk` before `$body` is evaluated, or
/// * `$term_rx` resolving, which ends the loop.
///
/// The loop also ends when `$receiver.recv()` yields `None`, i.e. when the channel is closed.
/// `$body` may use `break`/`continue` with `$loop_label` to control the loop itself.
macro_rules! handle_subscription {
    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
        $loop_label: loop {
            tokio::select! {
                received = $receiver.recv() => {
                    // A closed channel means there will never be another item.
                    let Some($chunk) = received else {
                        break $loop_label;
                    };
                    $body
                }
                _ = &mut $term_rx => break $loop_label,
            }
        }
    };
}
265
266impl SingleSubscriberOutputStream {
268 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
273 pub fn inspect_chunks(&self, f: impl Fn(Chunk) -> Next + Send + 'static) -> Inspector {
274 let mut receiver = self.take_receiver();
275 impl_inspect_chunks!(self.name(), receiver, f, handle_subscription)
276 }
277
278 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
283 pub fn inspect_lines(
284 &self,
285 mut f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static,
286 options: LineParsingOptions,
287 ) -> Inspector {
288 let mut receiver = self.take_receiver();
289 impl_inspect_lines!(self.name(), receiver, f, options, handle_subscription)
290 }
291
292 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
297 pub fn inspect_lines_async<Fut>(
298 &self,
299 mut f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static,
300 options: LineParsingOptions,
301 ) -> Inspector
302 where
303 Fut: Future<Output = Next> + Send,
304 {
305 let mut receiver = self.take_receiver();
306 impl_inspect_lines_async!(self.name(), receiver, f, options, handle_subscription)
307 }
308}
309
310impl SingleSubscriberOutputStream {
312 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
316 pub fn collect_chunks<S: Sink>(
317 &self,
318 into: S,
319 collect: impl Fn(Chunk, &mut S) + Send + 'static,
320 ) -> Collector<S> {
321 let sink = Arc::new(RwLock::new(into));
322 let mut receiver = self.take_receiver();
323 impl_collect_chunks!(self.name(), receiver, collect, sink, handle_subscription)
324 }
325
326 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
330 pub fn collect_chunks_async<S, F>(&self, into: S, collect: F) -> Collector<S>
331 where
332 S: Sink,
333 F: Fn(Chunk, &mut S) -> AsyncCollectFn<'_> + Send + 'static,
334 {
335 let sink = Arc::new(RwLock::new(into));
336 let mut receiver = self.take_receiver();
337 impl_collect_chunks_async!(self.name(), receiver, collect, sink, handle_subscription)
338 }
339
340 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
345 pub fn collect_lines<S: Sink>(
346 &self,
347 into: S,
348 collect: impl Fn(Cow<'_, str>, &mut S) -> Next + Send + 'static,
349 options: LineParsingOptions,
350 ) -> Collector<S> {
351 let sink = Arc::new(RwLock::new(into));
352 let mut receiver = self.take_receiver();
353 impl_collect_lines!(
354 self.name(),
355 receiver,
356 collect,
357 options,
358 sink,
359 handle_subscription
360 )
361 }
362
363 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
368 pub fn collect_lines_async<S, F>(
369 &self,
370 into: S,
371 collect: F,
372 options: LineParsingOptions,
373 ) -> Collector<S>
374 where
375 S: Sink,
376 F: for<'a> Fn(Cow<'a, str>, &'a mut S) -> AsyncCollectFn<'a> + Send + Sync + 'static,
377 {
378 let sink = Arc::new(RwLock::new(into));
379 let mut receiver = self.take_receiver();
380 impl_collect_lines_async!(
381 self.name(),
382 receiver,
383 collect,
384 options,
385 sink,
386 handle_subscription
387 )
388 }
389
390 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
392 pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
393 self.collect_chunks(Vec::new(), |chunk, vec| vec.extend(chunk.as_ref()))
394 }
395
396 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
398 pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
399 self.collect_lines(
400 Vec::new(),
401 |line, vec| {
402 vec.push(line.into_owned());
403 Next::Continue
404 },
405 options,
406 )
407 }
408
409 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
411 pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
412 &self,
413 write: W,
414 ) -> Collector<W> {
415 self.collect_chunks_async(write, move |chunk, write| {
416 Box::pin(async move {
417 if let Err(err) = write.write_all(chunk.as_ref()).await {
418 tracing::warn!("Could not write chunk to write sink: {err:#?}");
419 };
420 Next::Continue
421 })
422 })
423 }
424
425 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
427 pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
428 &self,
429 write: W,
430 options: LineParsingOptions,
431 ) -> Collector<W> {
432 self.collect_lines_async(
433 write,
434 move |line, write| {
435 Box::pin(async move {
436 if let Err(err) = write.write_all(line.as_bytes()).await {
437 tracing::warn!("Could not write line to write sink: {err:#?}");
438 };
439 Next::Continue
440 })
441 },
442 options,
443 )
444 }
445
446 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
448 pub fn collect_chunks_into_write_mapped<
449 W: Sink + AsyncWriteExt + Unpin,
450 B: AsRef<[u8]> + Send,
451 >(
452 &self,
453 write: W,
454 mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
455 ) -> Collector<W> {
456 self.collect_chunks_async(write, move |chunk, write| {
457 Box::pin(async move {
458 let mapped = mapper(chunk);
459 let mapped = mapped.as_ref();
460 if let Err(err) = write.write_all(mapped).await {
461 tracing::warn!("Could not write chunk to write sink: {err:#?}");
462 };
463 Next::Continue
464 })
465 })
466 }
467
468 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
470 pub fn collect_lines_into_write_mapped<
471 W: Sink + AsyncWriteExt + Unpin,
472 B: AsRef<[u8]> + Send,
473 >(
474 &self,
475 write: W,
476 mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + Copy + 'static,
477 options: LineParsingOptions,
478 ) -> Collector<W> {
479 self.collect_lines_async(
480 write,
481 move |line, write| {
482 Box::pin(async move {
483 let mapped = mapper(line);
484 let mapped = mapped.as_ref();
485 if let Err(err) = write.write_all(mapped).await {
486 tracing::warn!("Could not write line to write sink: {err:#?}");
487 };
488 Next::Continue
489 })
490 },
491 options,
492 )
493 }
494}
495
496impl SingleSubscriberOutputStream {
498 pub async fn wait_for_line(
502 &self,
503 predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
504 options: LineParsingOptions,
505 ) -> Result<(), InspectorError> {
506 let inspector = self.inspect_lines(
507 move |line| {
508 if predicate(line) {
509 Next::Break
510 } else {
511 Next::Continue
512 }
513 },
514 options,
515 );
516 inspector.wait().await
517 }
518
519 pub async fn wait_for_line_with_timeout(
523 &self,
524 predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
525 options: LineParsingOptions,
526 timeout: Duration,
527 ) -> Result<(), OutputError> {
528 tokio::time::timeout(timeout, self.wait_for_line(predicate, options))
529 .await
530 .map_err(|_elapsed| OutputError::Timeout {
531 stream_name: self.name(),
532 timeout,
533 })
534 .and_then(|res| res.map_err(OutputError::InspectorFailed))
535 }
536}
537
#[cfg(test)]
mod tests {
    use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
    use crate::output_stream::tests::write_test_data;
    use crate::output_stream::{BackpressureControl, FromStreamOptions, Next};
    use crate::single_subscriber::read_chunked;
    use crate::{LineParsingOptions, NumBytesExt};
    use assertr::prelude::*;
    use mockall::{automock, predicate};
    use std::io::{Read, Seek, SeekFrom, Write};
    use std::time::Duration;
    use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
    use tokio::sync::mpsc;
    use tokio::time::sleep;
    use tracing_test::traced_test;

    /// The reader must split everything a single read delivered into `chunk_size` pieces and
    /// must not stop before EOF.
    #[tokio::test]
    #[traced_test]
    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
    {
        let (reader, mut writer) = tokio::io::duplex(64);
        let (chunk_tx, mut chunk_rx) = mpsc::channel(64);

        // Make all data available before the reader task starts.
        writer.write_all(b"hello world").await.unwrap();
        writer.flush().await.unwrap();

        let reader_task = tokio::spawn(read_chunked(
            reader,
            2.bytes(),
            chunk_tx,
            BackpressureControl::DropLatestIncomingIfBufferFull,
        ));

        // Closing the write half lets the reader observe EOF and finish.
        drop(writer);
        reader_task.await.unwrap();

        let mut received = Vec::<String>::new();
        while let Some(Some(chunk)) = chunk_rx.recv().await {
            received.push(String::from_utf8_lossy(chunk.as_ref()).into_owned());
        }
        assert_that(received).contains_exactly(["he", "ll", "o ", "wo", "rl", "d"]);
    }

    /// A slow consumer combined with a tiny channel must lead to dropped chunks, which the
    /// reader reports through its "lagging behind" log message.
    #[tokio::test]
    #[traced_test]
    async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
        let (reader, mut writer) = tokio::io::duplex(64);
        let stream = SingleSubscriberOutputStream::from_stream(
            reader,
            "custom",
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                channel_capacity: 2,
                ..Default::default()
            },
        );

        // Consumer: needs 100ms per line.
        let inspector = stream.inspect_lines_async(
            |_line| async move {
                sleep(Duration::from_millis(100)).await;
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        // Producer: emits a line every 25ms.
        #[rustfmt::skip]
        let producer = tokio::spawn(async move {
            for count in 1..=15 {
                let line = format!("{count}\n");
                writer.write_all(line.as_bytes()).await.unwrap();
                sleep(Duration::from_millis(25)).await;
            }
        });

        producer.await.unwrap();
        inspector.wait().await.unwrap();
        drop(stream);

        logs_assert(|lines: &[&str]| {
            let occurrences =
                |needle: &str| lines.iter().filter(|line| line.contains(needle)).count();

            match occurrences("Stream reader is lagging behind lagged=1") {
                1 => {}
                n => return Err(format!("Expected exactly one lagged=1 log, but found {n}")),
            };
            match occurrences("Stream reader is lagging behind lagged=3") {
                2 => {}
                n => return Err(format!("Expected exactly two lagged=3 logs, but found {n}")),
            };
            Ok(())
        });
    }

    /// Every line of the test data must reach the inspector callback exactly once.
    #[tokio::test]
    async fn inspect_lines() {
        let (reader, writer) = tokio::io::duplex(64);
        let stream = SingleSubscriberOutputStream::from_stream(
            reader,
            "custom",
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions::default(),
        );

        #[automock]
        trait LineVisitor {
            fn visit(&self, line: String);
        }

        let mut visitor = MockLineVisitor::new();
        for expected in ["Cargo.lock", "Cargo.toml", "README.md", "src", "target"] {
            visitor
                .expect_visit()
                .with(predicate::eq(expected.to_string()))
                .times(1)
                .return_const(());
        }

        let inspector = stream.inspect_lines(
            move |line| {
                visitor.visit(line.into_owned());
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(writer)).await.unwrap();

        inspector.cancel().await.unwrap();
        drop(stream);
    }

    /// Returning `Next::Break` from an async line callback must stop the collection; later
    /// lines must not be seen.
    #[tokio::test]
    #[traced_test]
    async fn inspect_lines_async() {
        let (reader, mut writer) = tokio::io::duplex(64);
        let stream = SingleSubscriberOutputStream::from_stream(
            reader,
            "custom",
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                chunk_size: 32.bytes(),
                ..Default::default()
            },
        );

        let initially_seen: Vec<String> = Vec::new();
        let collector = stream.collect_lines_async(
            initially_seen,
            move |line, seen: &mut Vec<String>| {
                Box::pin(async move {
                    let next = if line == "break" {
                        Next::Break
                    } else {
                        Next::Continue
                    };
                    seen.push(line.into_owned());
                    next
                })
            },
            LineParsingOptions::default(),
        );

        // Keeps producing output forever; the collector has to stop on its own.
        let _writer_task = tokio::spawn(async move {
            for line in ["start\n", "break\n", "end\n"] {
                writer.write_all(line.as_bytes()).await.unwrap();
            }

            loop {
                writer.write_all("gibberish\n".as_bytes()).await.unwrap();
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        });

        let seen = collector.wait().await.unwrap();

        assert_that(seen).contains_exactly(["start", "break"]);
    }

    /// Lines collected with a synchronous callback can be written to a file sink.
    #[tokio::test]
    async fn collect_lines_to_file() {
        let (reader, writer) = tokio::io::duplex(64);
        let stream = SingleSubscriberOutputStream::from_stream(
            reader,
            "custom",
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                channel_capacity: 32,
                ..Default::default()
            },
        );

        let sink = tempfile::tempfile().unwrap();
        let collector = stream.collect_lines(
            sink,
            |line, file| {
                writeln!(file, "{}", line).unwrap();
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(writer)).await.unwrap();

        let mut file = collector.cancel().await.unwrap();
        file.seek(SeekFrom::Start(0)).unwrap();
        let mut written = String::new();
        file.read_to_string(&mut written).unwrap();

        assert_that(written).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    /// Lines collected with an async callback can be written to a file sink.
    #[tokio::test]
    async fn collect_lines_async_to_file() {
        let (reader, writer) = tokio::io::duplex(64);
        let stream = SingleSubscriberOutputStream::from_stream(
            reader,
            "custom",
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                chunk_size: 32.bytes(),
                ..Default::default()
            },
        );

        let sink = tempfile::tempfile().unwrap();
        let collector = stream.collect_lines_async(
            sink,
            |line, file| {
                Box::pin(async move {
                    writeln!(file, "{}", line).unwrap();
                    Next::Continue
                })
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(writer)).await.unwrap();

        let mut file = collector.cancel().await.unwrap();
        file.seek(SeekFrom::Start(0)).unwrap();
        let mut written = String::new();
        file.read_to_string(&mut written).unwrap();

        assert_that(written).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    /// Chunks run through a mapper must arrive unaltered (for an identity-like mapper) in an
    /// async file sink.
    #[tokio::test]
    #[traced_test]
    async fn collect_chunks_into_write_mapped() {
        let (reader, writer) = tokio::io::duplex(64);
        let stream = SingleSubscriberOutputStream::from_stream(
            reader,
            "custom",
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions {
                chunk_size: 32.bytes(),
                ..Default::default()
            },
        );

        let sink_path = std::env::temp_dir().join(
            "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
        );
        let sink = tokio::fs::File::options()
            .create(true)
            .truncate(true)
            .write(true)
            .read(true)
            .open(sink_path)
            .await
            .unwrap();

        let collector = stream.collect_chunks_into_write_mapped(sink, |chunk| {
            String::from_utf8_lossy(chunk.as_ref()).into_owned()
        });

        tokio::spawn(write_test_data(writer)).await.unwrap();

        let mut file = collector.cancel().await.unwrap();
        file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut written = String::new();
        file.read_to_string(&mut written).await.unwrap();

        assert_that(written).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }

    /// Requesting a second consumer must panic with a descriptive message.
    #[tokio::test]
    #[traced_test]
    async fn multiple_subscribers_are_not_possible() {
        let (reader, _writer) = tokio::io::duplex(64);
        let stream = SingleSubscriberOutputStream::from_stream(
            reader,
            "custom",
            BackpressureControl::DropLatestIncomingIfBufferFull,
            FromStreamOptions::default(),
        );

        let _first = stream.inspect_lines(|_line| Next::Continue, Default::default());

        assert_that_panic_by(move || {
            stream.inspect_lines(|_line| Next::Continue, Default::default())
        })
        .has_type::<String>()
        .is_equal_to("Cannot create multiple consumers on SingleSubscriberOutputStream (stream: 'custom'). Only one inspector or collector can be active at a time. Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.");
    }
}