1use crate::collector::{AsyncChunkCollector, AsyncLineCollector, Collector, Sink};
2use crate::inspector::Inspector;
3use crate::output_stream::impls::{
4 impl_collect_chunks, impl_collect_chunks_async, impl_collect_chunks_into_write,
5 impl_collect_chunks_into_write_mapped, impl_collect_lines, impl_collect_lines_async,
6 impl_collect_lines_into_write, impl_collect_lines_into_write_mapped, impl_inspect_chunks,
7 impl_inspect_lines, impl_inspect_lines_async, visit_final_line, visit_lines,
8};
9use crate::output_stream::{
10 BackpressureControl, Chunk, FromStreamOptions, LineWriteMode, Next, OutputStream, StreamEvent,
11};
12use crate::{LineParsingOptions, NumBytes, WaitForLineResult};
13use atomic_take::AtomicTake;
14use bytes::Buf;
15use std::borrow::Cow;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::time::Duration;
19use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
20use tokio::sync::mpsc;
21use tokio::sync::mpsc::error::TrySendError;
22use tokio::task::JoinHandle;
23
24pub struct SingleSubscriberOutputStream {
31 stream_reader: JoinHandle<()>,
34
35 receiver: AtomicTake<mpsc::Receiver<StreamEvent>>,
41
42 chunk_size: NumBytes,
44
45 max_channel_capacity: usize,
47
48 backpressure_control: BackpressureControl,
50
51 name: &'static str,
53}
54
55impl OutputStream for SingleSubscriberOutputStream {
56 fn chunk_size(&self) -> NumBytes {
57 self.chunk_size
58 }
59
60 fn channel_capacity(&self) -> usize {
61 self.max_channel_capacity
62 }
63
64 fn name(&self) -> &'static str {
65 self.name
66 }
67}
68
69impl Drop for SingleSubscriberOutputStream {
70 fn drop(&mut self) {
71 self.stream_reader.abort();
72 }
73}
74
75impl Debug for SingleSubscriberOutputStream {
76 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("SingleSubscriberOutputStream")
78 .field("output_collector", &"non-debug < JoinHandle<()> >")
79 .field(
80 "receiver",
81 &"non-debug < tokio::sync::mpsc::Receiver<StreamEvent> >",
82 )
83 .finish()
84 }
85}
86
87#[expect(
91 clippy::too_many_lines,
92 reason = "the stream reader keeps tightly coupled buffer and backpressure state together"
93)]
94async fn read_chunked<R: AsyncRead + Unpin + Send + 'static>(
95 mut read: R,
96 chunk_size: NumBytes,
97 sender: mpsc::Sender<StreamEvent>,
98 backpressure_control: BackpressureControl,
99) {
100 struct AfterSend {
101 do_break: bool,
102 }
103
104 enum TrySendStatus {
105 Sent,
106 Full,
107 Closed,
108 }
109
110 fn log_if_lagged(lagged: &mut usize) {
111 if *lagged > 0 {
112 tracing::debug!(lagged = *lagged, "Stream reader is lagging behind");
113 *lagged = 0;
114 }
115 }
116
117 fn try_send_gap(sender: &mpsc::Sender<StreamEvent>, lagged: &mut usize) -> TrySendStatus {
118 match sender.try_send(StreamEvent::Gap) {
119 Ok(()) => {
120 log_if_lagged(lagged);
121 TrySendStatus::Sent
122 }
123 Err(TrySendError::Full(_data)) => TrySendStatus::Full,
124 Err(TrySendError::Closed(_data)) => TrySendStatus::Closed,
125 }
126 }
127
128 fn try_send_chunk(
129 chunk: Chunk,
130 sender: &mpsc::Sender<StreamEvent>,
131 lagged: &mut usize,
132 ) -> TrySendStatus {
133 let event = StreamEvent::Chunk(chunk);
134 match sender.try_send(event) {
135 Ok(()) => {
136 log_if_lagged(lagged);
137 TrySendStatus::Sent
138 }
139 Err(TrySendError::Full(_data)) => {
140 *lagged += 1;
141 TrySendStatus::Full
142 }
143 Err(TrySendError::Closed(_data)) => {
144 TrySendStatus::Closed
149 }
150 }
151 }
152
153 async fn send_event(event: StreamEvent, sender: &mpsc::Sender<StreamEvent>) -> AfterSend {
154 match sender.send(event).await {
155 Ok(()) => {}
156 Err(_err) => {
157 return AfterSend { do_break: true };
162 }
163 }
164 AfterSend { do_break: false }
165 }
166
167 let mut buf = bytes::BytesMut::with_capacity(chunk_size.bytes());
169 let mut lagged: usize = 0;
170 let mut gap_pending = false;
171 'outer: loop {
172 let _ = buf.try_reclaim(chunk_size.bytes());
173 match read.read_buf(&mut buf).await {
174 Ok(bytes_read) => {
175 let is_eof = bytes_read == 0;
176
177 if is_eof {
178 match backpressure_control {
179 BackpressureControl::DropLatestIncomingIfBufferFull => {
180 if gap_pending {
181 let after = send_event(StreamEvent::Gap, &sender).await;
182 if after.do_break {
183 break 'outer;
184 }
185 gap_pending = false;
186 }
187 let after = send_event(StreamEvent::Eof, &sender).await;
188 if after.do_break {
189 break 'outer;
190 }
191 }
192 BackpressureControl::BlockUntilBufferHasSpace => {
193 let after = send_event(StreamEvent::Eof, &sender).await;
194 if after.do_break {
195 break 'outer;
196 }
197 }
198 }
199 } else {
200 while !buf.is_empty() {
201 let split_to = usize::min(chunk_size.bytes(), buf.len());
202
203 match backpressure_control {
204 BackpressureControl::DropLatestIncomingIfBufferFull => {
205 if gap_pending {
206 match try_send_gap(&sender, &mut lagged) {
207 TrySendStatus::Sent => {
208 gap_pending = false;
209 }
210 TrySendStatus::Full => {
211 let dropped_chunks = if chunk_size.bytes() == 0 {
212 buf.len()
213 } else {
214 buf.len().div_ceil(chunk_size.bytes())
215 };
216 buf.advance(buf.len());
217 lagged += dropped_chunks;
218 continue;
219 }
220 TrySendStatus::Closed => break 'outer,
221 }
222 }
223
224 let chunk = Chunk(buf.split_to(split_to).freeze());
225 match try_send_chunk(chunk, &sender, &mut lagged) {
226 TrySendStatus::Sent => {}
227 TrySendStatus::Full => {
228 gap_pending = true;
229 }
230 TrySendStatus::Closed => break 'outer,
231 }
232 }
233 BackpressureControl::BlockUntilBufferHasSpace => {
234 let event =
235 StreamEvent::Chunk(Chunk(buf.split_to(split_to).freeze()));
236 let after = send_event(event, &sender).await;
237 if after.do_break {
238 break 'outer;
239 }
240 }
241 }
242 }
243 }
244
245 if is_eof {
246 break;
247 }
248 }
249 Err(err) => panic!("Could not read from stream: {err}"),
250 }
251 }
252}
253
254impl SingleSubscriberOutputStream {
255 pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
257 stream: S,
258 stream_name: &'static str,
259 backpressure_control: BackpressureControl,
260 options: FromStreamOptions,
261 ) -> SingleSubscriberOutputStream {
262 options.chunk_size.assert_non_zero("options.chunk_size");
263
264 let (tx_stdout, rx_stdout) = mpsc::channel::<StreamEvent>(options.channel_capacity);
265
266 let stream_reader = tokio::spawn(read_chunked(
267 stream,
268 options.chunk_size,
269 tx_stdout,
270 backpressure_control,
271 ));
272
273 SingleSubscriberOutputStream {
274 stream_reader,
275 receiver: AtomicTake::new(rx_stdout),
276 chunk_size: options.chunk_size,
277 max_channel_capacity: options.channel_capacity,
278 backpressure_control,
279 name: stream_name,
280 }
281 }
282
283 pub fn backpressure_control(&self) -> BackpressureControl {
285 self.backpressure_control
286 }
287
288 fn take_receiver(&self) -> mpsc::Receiver<StreamEvent> {
289 self.receiver.take().unwrap_or_else(|| {
290 panic!(
291 "Cannot create multiple consumers on SingleSubscriberOutputStream (stream: '{}'). \
292 Only one inspector or collector can be active at a time. \
293 Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.",
294 self.name
295 )
296 })
297 }
298}
299
300macro_rules! handle_subscription {
304 ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
305 $loop_label: loop {
306 tokio::select! {
307 out = $receiver.recv() => {
308 match out {
309 Some(event) => {
310 let $chunk = event;
311 $body
312 }
313 None => {
314 break $loop_label;
316 }
317 }
318 }
319 _msg = &mut $term_rx => break $loop_label,
320 }
321 }
322 };
323}
324
325impl SingleSubscriberOutputStream {
327 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
332 pub fn inspect_chunks(&self, f: impl Fn(Chunk) -> Next + Send + 'static) -> Inspector {
333 let mut receiver = self.take_receiver();
334 impl_inspect_chunks!(self.name(), receiver, f, handle_subscription)
335 }
336
337 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
342 pub fn inspect_lines(
343 &self,
344 mut f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static,
345 options: LineParsingOptions,
346 ) -> Inspector {
347 let mut receiver = self.take_receiver();
348 impl_inspect_lines!(self.name(), receiver, f, options, handle_subscription)
349 }
350
351 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
356 pub fn inspect_lines_async<Fut>(
357 &self,
358 mut f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static,
359 options: LineParsingOptions,
360 ) -> Inspector
361 where
362 Fut: Future<Output = Next> + Send,
363 {
364 let mut receiver = self.take_receiver();
365 impl_inspect_lines_async!(self.name(), receiver, f, options, handle_subscription)
366 }
367}
368
369impl SingleSubscriberOutputStream {
371 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
375 pub fn collect_chunks<S: Sink>(
376 &self,
377 into: S,
378 collect: impl Fn(Chunk, &mut S) + Send + 'static,
379 ) -> Collector<S> {
380 let mut receiver = self.take_receiver();
381 impl_collect_chunks!(self.name(), receiver, collect, into, handle_subscription)
382 }
383
384 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
411 pub fn collect_chunks_async<S, C>(&self, into: S, collect: C) -> Collector<S>
412 where
413 S: Sink,
414 C: AsyncChunkCollector<S>,
415 {
416 let mut receiver = self.take_receiver();
417 impl_collect_chunks_async!(self.name(), receiver, collect, into, handle_subscription)
418 }
419
420 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
425 pub fn collect_lines<S: Sink>(
426 &self,
427 into: S,
428 collect: impl Fn(Cow<'_, str>, &mut S) -> Next + Send + 'static,
429 options: LineParsingOptions,
430 ) -> Collector<S> {
431 let mut receiver = self.take_receiver();
432 impl_collect_lines!(
433 self.name(),
434 receiver,
435 collect,
436 options,
437 into,
438 handle_subscription
439 )
440 }
441
442 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
479 pub fn collect_lines_async<S, C>(
480 &self,
481 into: S,
482 collect: C,
483 options: LineParsingOptions,
484 ) -> Collector<S>
485 where
486 S: Sink,
487 C: AsyncLineCollector<S>,
488 {
489 let mut receiver = self.take_receiver();
490 impl_collect_lines_async!(
491 self.name(),
492 receiver,
493 collect,
494 options,
495 into,
496 handle_subscription
497 )
498 }
499
500 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
502 pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
503 self.collect_chunks(Vec::new(), |chunk, vec| {
504 vec.extend_from_slice(chunk.as_ref());
505 })
506 }
507
508 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
510 pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
511 self.collect_lines(
512 Vec::new(),
513 |line, vec| {
514 vec.push(line.into_owned());
515 Next::Continue
516 },
517 options,
518 )
519 }
520
521 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
523 pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
524 &self,
525 write: W,
526 ) -> Collector<W> {
527 let mut receiver = self.take_receiver();
528 impl_collect_chunks_into_write!(self.name(), receiver, write, handle_subscription)
529 }
530
531 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
536 pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
537 &self,
538 write: W,
539 options: LineParsingOptions,
540 mode: LineWriteMode,
541 ) -> Collector<W> {
542 let mut receiver = self.take_receiver();
543 impl_collect_lines_into_write!(
544 self.name(),
545 receiver,
546 write,
547 options,
548 mode,
549 handle_subscription
550 )
551 }
552
553 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
555 pub fn collect_chunks_into_write_mapped<
556 W: Sink + AsyncWriteExt + Unpin,
557 B: AsRef<[u8]> + Send,
558 >(
559 &self,
560 write: W,
561 mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
562 ) -> Collector<W> {
563 let mut receiver = self.take_receiver();
564 impl_collect_chunks_into_write_mapped!(
565 self.name(),
566 receiver,
567 write,
568 mapper,
569 handle_subscription
570 )
571 }
572
573 #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
579 pub fn collect_lines_into_write_mapped<
580 W: Sink + AsyncWriteExt + Unpin,
581 B: AsRef<[u8]> + Send,
582 >(
583 &self,
584 write: W,
585 mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + Copy + 'static,
586 options: LineParsingOptions,
587 mode: LineWriteMode,
588 ) -> Collector<W> {
589 let mut receiver = self.take_receiver();
590 impl_collect_lines_into_write_mapped!(
591 self.name(),
592 receiver,
593 write,
594 mapper,
595 options,
596 mode,
597 handle_subscription
598 )
599 }
600}
601
602impl SingleSubscriberOutputStream {
604 async fn wait_for_line_inner(
605 &self,
606 predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
607 options: LineParsingOptions,
608 ) -> WaitForLineResult {
609 let mut receiver = self.take_receiver();
610 let mut parser = crate::output_stream::LineParserState::new();
611
612 loop {
613 match receiver.recv().await {
614 Some(StreamEvent::Chunk(chunk)) => {
615 if visit_lines(chunk.as_ref(), &mut parser, options, |line| {
616 if predicate(line) {
617 Next::Break
618 } else {
619 Next::Continue
620 }
621 }) == Next::Break
622 {
623 return WaitForLineResult::Matched;
624 }
625 }
626 Some(StreamEvent::Gap) => {
627 parser.on_gap();
628 }
629 Some(StreamEvent::Eof) | None => {
630 if visit_final_line(&parser, |line| {
631 if predicate(line) {
632 Next::Break
633 } else {
634 Next::Continue
635 }
636 }) == Next::Break
637 {
638 return WaitForLineResult::Matched;
639 }
640 return WaitForLineResult::StreamClosed;
641 }
642 }
643 }
644 }
645
646 pub async fn wait_for_line(
661 &self,
662 predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
663 options: LineParsingOptions,
664 ) -> WaitForLineResult {
665 self.wait_for_line_inner(predicate, options).await
666 }
667
668 pub async fn wait_for_line_with_timeout(
684 &self,
685 predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
686 options: LineParsingOptions,
687 timeout: Duration,
688 ) -> WaitForLineResult {
689 tokio::time::timeout(timeout, self.wait_for_line_inner(predicate, options))
690 .await
691 .unwrap_or(WaitForLineResult::Timeout)
692 }
693}
694
695#[cfg(test)]
696mod tests {
697 use crate::output_stream::Chunk;
698 use crate::output_stream::StreamEvent;
699 use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
700 use crate::output_stream::tests::write_test_data;
701 use crate::output_stream::{BackpressureControl, FromStreamOptions, LineWriteMode, Next};
702 use crate::single_subscriber::read_chunked;
703 use crate::{AsyncChunkCollector, AsyncLineCollector};
704 use crate::{LineParsingOptions, NumBytes, NumBytesExt, WaitForLineResult};
705 use assertr::prelude::*;
706 use atomic_take::AtomicTake;
707 use bytes::Bytes;
708 use mockall::{automock, predicate};
709 use std::borrow::Cow;
710 use std::io::{Cursor, Read, Seek, SeekFrom, Write};
711 use std::time::Duration;
712 use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
713 use tokio::sync::mpsc;
714 use tokio::time::sleep;
715 use tracing_test::traced_test;
716
717 struct BreakOnLine;
718
719 impl AsyncLineCollector<Vec<String>> for BreakOnLine {
720 async fn collect<'a>(&'a mut self, line: Cow<'a, str>, seen: &'a mut Vec<String>) -> Next {
721 if line == "break" {
722 seen.push(line.into_owned());
723 Next::Break
724 } else {
725 seen.push(line.into_owned());
726 Next::Continue
727 }
728 }
729 }
730
731 struct WriteLine;
732
733 impl AsyncLineCollector<std::fs::File> for WriteLine {
734 async fn collect<'a>(
735 &'a mut self,
736 line: Cow<'a, str>,
737 temp_file: &'a mut std::fs::File,
738 ) -> Next {
739 writeln!(temp_file, "{line}").unwrap();
740 Next::Continue
741 }
742 }
743
744 struct ExtendChunks;
745
746 impl AsyncChunkCollector<Vec<u8>> for ExtendChunks {
747 async fn collect<'a>(&'a mut self, chunk: Chunk, seen: &'a mut Vec<u8>) -> Next {
748 seen.extend_from_slice(chunk.as_ref());
749 Next::Continue
750 }
751 }
752
753 #[test]
754 #[should_panic(expected = "options.chunk_size must be greater than zero bytes")]
755 fn from_stream_panics_on_zero_chunk_size() {
756 let _stream = SingleSubscriberOutputStream::from_stream(
757 tokio::io::empty(),
758 "custom",
759 BackpressureControl::DropLatestIncomingIfBufferFull,
760 FromStreamOptions {
761 chunk_size: NumBytes::zero(),
762 ..FromStreamOptions::default()
763 },
764 );
765 }
766
767 #[tokio::test]
768 #[traced_test]
769 async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
770 {
771 let (read_half, mut write_half) = tokio::io::duplex(64);
772 let (tx, mut rx) = mpsc::channel(64);
773
774 write_half.write_all(b"hello world").await.unwrap();
781 write_half.flush().await.unwrap();
782
783 let stream_reader = tokio::spawn(read_chunked(
784 read_half,
785 2.bytes(),
786 tx,
787 BackpressureControl::DropLatestIncomingIfBufferFull,
788 ));
789
790 drop(write_half); stream_reader.await.unwrap();
792
793 let mut chunks = Vec::<String>::new();
794 while let Some(event) = rx.recv().await {
795 match event {
796 StreamEvent::Chunk(chunk) => {
797 chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
798 }
799 StreamEvent::Gap => {}
800 StreamEvent::Eof => break,
801 }
802 }
803 assert_that!(chunks).contains_exactly(["he", "ll", "o ", "wo", "rl", "d"]);
804 }
805
806 #[tokio::test]
807 async fn read_chunked_sends_pending_gap_before_terminal_eof() {
808 let read = Cursor::new(b"aabbcc".to_vec());
809 let (tx, mut rx) = mpsc::channel(1);
810
811 let stream_reader = tokio::spawn(read_chunked(
812 read,
813 2.bytes(),
814 tx,
815 BackpressureControl::DropLatestIncomingIfBufferFull,
816 ));
817
818 match rx.recv().await.unwrap() {
819 StreamEvent::Chunk(chunk) => {
820 assert_that!(chunk.as_ref()).is_equal_to(b"aa".as_slice());
821 }
822 other => panic!("expected first chunk, got {other:?}"),
823 }
824 assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Gap);
825 assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Eof);
826
827 stream_reader.await.unwrap();
828 assert_that!(rx.recv().await).is_none();
829 }
830
831 #[tokio::test]
832 async fn read_chunked_sends_pending_gap_before_resumed_chunk_delivery() {
833 let (read_half, mut write_half) = tokio::io::duplex(64);
834 let (tx, mut rx) = mpsc::channel(2);
835
836 let stream_reader = tokio::spawn(read_chunked(
837 read_half,
838 2.bytes(),
839 tx,
840 BackpressureControl::DropLatestIncomingIfBufferFull,
841 ));
842
843 write_half.write_all(b"aabbcc").await.unwrap();
844 write_half.flush().await.unwrap();
845 sleep(Duration::from_millis(25)).await;
846
847 for expected in [b"aa".as_slice(), b"bb".as_slice()] {
848 match rx.recv().await.unwrap() {
849 StreamEvent::Chunk(chunk) => {
850 assert_that!(chunk.as_ref()).is_equal_to(expected);
851 }
852 other => panic!("expected buffered chunk, got {other:?}"),
853 }
854 }
855
856 write_half.write_all(b"dd").await.unwrap();
857 write_half.flush().await.unwrap();
858 drop(write_half);
859
860 assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Gap);
861 match rx.recv().await.unwrap() {
862 StreamEvent::Chunk(chunk) => {
863 assert_that!(chunk.as_ref()).is_equal_to(b"dd".as_slice());
864 }
865 other => panic!("expected resumed chunk, got {other:?}"),
866 }
867 assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Eof);
868
869 stream_reader.await.unwrap();
870 assert_that!(rx.recv().await).is_none();
871 }
872
873 #[tokio::test]
874 async fn wait_for_line_returns_matched_when_line_appears_before_eof() {
875 let (read_half, mut write_half) = tokio::io::duplex(64);
876 let os = SingleSubscriberOutputStream::from_stream(
877 read_half,
878 "custom",
879 BackpressureControl::DropLatestIncomingIfBufferFull,
880 FromStreamOptions::default(),
881 );
882
883 let waiter = tokio::spawn(async move {
884 os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
885 .await
886 });
887
888 write_half.write_all(b"booting\nready\n").await.unwrap();
889 write_half.flush().await.unwrap();
890 drop(write_half);
891
892 let result = waiter.await.unwrap();
893 assert_eq!(result, WaitForLineResult::Matched);
894 }
895
896 #[tokio::test]
897 async fn wait_for_line_returns_stream_closed_when_stream_ends_before_match() {
898 let (read_half, mut write_half) = tokio::io::duplex(64);
899 let os = SingleSubscriberOutputStream::from_stream(
900 read_half,
901 "custom",
902 BackpressureControl::DropLatestIncomingIfBufferFull,
903 FromStreamOptions::default(),
904 );
905
906 let waiter = tokio::spawn(async move {
907 os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
908 .await
909 });
910
911 write_half
912 .write_all(b"booting\nstill starting\n")
913 .await
914 .unwrap();
915 write_half.flush().await.unwrap();
916 drop(write_half);
917
918 let result = waiter.await.unwrap();
919 assert_eq!(result, WaitForLineResult::StreamClosed);
920 }
921
922 #[tokio::test]
923 async fn wait_for_line_returns_matched_for_partial_final_line_at_eof() {
924 let (read_half, mut write_half) = tokio::io::duplex(64);
925 let os = SingleSubscriberOutputStream::from_stream(
926 read_half,
927 "custom",
928 BackpressureControl::DropLatestIncomingIfBufferFull,
929 FromStreamOptions::default(),
930 );
931
932 let waiter = tokio::spawn(async move {
933 os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
934 .await
935 });
936
937 write_half.write_all(b"booting\nready").await.unwrap();
938 write_half.flush().await.unwrap();
939 drop(write_half);
940
941 let result = waiter.await.unwrap();
942 assert_eq!(result, WaitForLineResult::Matched);
943 }
944
945 #[tokio::test]
946 async fn wait_for_line_with_timeout_returns_timeout_while_stream_stays_open() {
947 let (read_half, _write_half) = tokio::io::duplex(64);
948 let os = SingleSubscriberOutputStream::from_stream(
949 read_half,
950 "custom",
951 BackpressureControl::DropLatestIncomingIfBufferFull,
952 FromStreamOptions::default(),
953 );
954
955 let result = os
956 .wait_for_line_with_timeout(
957 |line| line.contains("ready"),
958 LineParsingOptions::default(),
959 Duration::from_millis(25),
960 )
961 .await;
962
963 assert_eq!(result, WaitForLineResult::Timeout);
964 }
965
966 #[tokio::test]
967 async fn wait_for_line_returns_stream_closed_when_stream_ends_after_writes_without_match() {
968 let (read_half, mut write_half) = tokio::io::duplex(64);
969 let os = SingleSubscriberOutputStream::from_stream(
970 read_half,
971 "custom",
972 BackpressureControl::DropLatestIncomingIfBufferFull,
973 FromStreamOptions::default(),
974 );
975
976 write_half.write_all(b"booting\n").await.unwrap();
977 write_half.flush().await.unwrap();
978 drop(write_half);
979
980 let result = os
984 .wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
985 .await;
986
987 assert_eq!(result, WaitForLineResult::StreamClosed);
988 }
989
990 #[tokio::test]
991 async fn wait_for_line_does_not_match_across_explicit_gap_event() {
992 let (tx, rx) = mpsc::channel::<StreamEvent>(4);
993 let os = SingleSubscriberOutputStream {
994 stream_reader: tokio::spawn(async {}),
995 receiver: AtomicTake::new(rx),
996 chunk_size: 4.bytes(),
997 max_channel_capacity: 4,
998 backpressure_control: BackpressureControl::DropLatestIncomingIfBufferFull,
999 name: "custom",
1000 };
1001
1002 tx.send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"rea"))))
1003 .await
1004 .unwrap();
1005 tx.send(StreamEvent::Gap).await.unwrap();
1006 tx.send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"dy\n"))))
1007 .await
1008 .unwrap();
1009 tx.send(StreamEvent::Eof).await.unwrap();
1010 drop(tx);
1011
1012 let result = os
1013 .wait_for_line(|line| line == "ready", LineParsingOptions::default())
1014 .await;
1015
1016 assert_eq!(result, WaitForLineResult::StreamClosed);
1017 }
1018
1019 #[tokio::test]
1020 #[traced_test]
1021 async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
1022 let (read_half, mut write_half) = tokio::io::duplex(64);
1023 let os = SingleSubscriberOutputStream::from_stream(
1024 read_half,
1025 "custom",
1026 BackpressureControl::DropLatestIncomingIfBufferFull,
1027 FromStreamOptions {
1028 channel_capacity: 2,
1029 ..Default::default()
1030 },
1031 );
1032
1033 let inspector = os.inspect_lines_async(
1034 |_line| async move {
1035 sleep(Duration::from_millis(100)).await;
1037 Next::Continue
1038 },
1039 LineParsingOptions::default(),
1040 );
1041
1042 #[rustfmt::skip]
1043 let producer = tokio::spawn(async move {
1044 for count in 1..=15 {
1045 write_half
1046 .write_all(format!("{count}\n").as_bytes())
1047 .await
1048 .unwrap();
1049 sleep(Duration::from_millis(25)).await;
1050 }
1051 });
1052
1053 producer.await.unwrap();
1054 inspector.wait().await.unwrap();
1055 drop(os);
1056
1057 logs_assert(|lines: &[&str]| {
1058 let lagged_logs = lines
1059 .iter()
1060 .filter(|line| line.contains("Stream reader is lagging behind lagged="))
1061 .count();
1062 if lagged_logs == 0 {
1063 return Err("Expected at least one lagged log".to_string());
1064 }
1065 Ok(())
1066 });
1067 }
1068
1069 #[tokio::test]
1070 async fn inspect_lines() {
1071 #[automock]
1072 trait LineVisitor {
1073 fn visit(&self, line: String);
1074 }
1075
1076 #[rustfmt::skip]
1077 fn configure(mock: &mut MockLineVisitor) {
1078 mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
1079 mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
1080 mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
1081 mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
1082 mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
1083 }
1084
1085 let (read_half, write_half) = tokio::io::duplex(64);
1086 let os = SingleSubscriberOutputStream::from_stream(
1087 read_half,
1088 "custom",
1089 BackpressureControl::DropLatestIncomingIfBufferFull,
1090 FromStreamOptions::default(),
1091 );
1092
1093 let mut mock = MockLineVisitor::new();
1094 configure(&mut mock);
1095
1096 let inspector = os.inspect_lines(
1097 move |line| {
1098 mock.visit(line.into_owned());
1099 Next::Continue
1100 },
1101 LineParsingOptions::default(),
1102 );
1103
1104 tokio::spawn(write_test_data(write_half)).await.unwrap();
1105
1106 inspector.cancel().await.unwrap();
1107 drop(os);
1108 }
1109
1110 #[tokio::test]
1114 #[traced_test]
1115 async fn inspect_lines_async() {
1116 let (read_half, mut write_half) = tokio::io::duplex(64);
1117 let os = SingleSubscriberOutputStream::from_stream(
1118 read_half,
1119 "custom",
1120 BackpressureControl::DropLatestIncomingIfBufferFull,
1121 FromStreamOptions {
1122 chunk_size: 32.bytes(),
1123 ..Default::default()
1124 },
1125 );
1126
1127 let seen: Vec<String> = Vec::new();
1128 let collector = os.collect_lines_async(seen, BreakOnLine, LineParsingOptions::default());
1129
1130 let _writer = tokio::spawn(async move {
1131 write_half.write_all("start\n".as_bytes()).await.unwrap();
1132 write_half.write_all("break\n".as_bytes()).await.unwrap();
1133 write_half.write_all("end\n".as_bytes()).await.unwrap();
1134
1135 loop {
1136 write_half
1137 .write_all("gibberish\n".as_bytes())
1138 .await
1139 .unwrap();
1140 tokio::time::sleep(Duration::from_millis(50)).await;
1141 }
1142 });
1143
1144 let seen = collector.wait().await.unwrap();
1145
1146 assert_that!(seen).contains_exactly(["start", "break"]);
1147 }
1148
1149 #[tokio::test]
1150 async fn collect_chunks_async_into_vec() {
1151 let (read_half, mut write_half) = tokio::io::duplex(64);
1152 let os = SingleSubscriberOutputStream::from_stream(
1153 read_half,
1154 "custom",
1155 BackpressureControl::DropLatestIncomingIfBufferFull,
1156 FromStreamOptions {
1157 chunk_size: 2.bytes(),
1158 ..Default::default()
1159 },
1160 );
1161
1162 let collector = os.collect_chunks_async(Vec::new(), ExtendChunks);
1163
1164 write_half.write_all(b"abcdef").await.unwrap();
1165 drop(write_half);
1166
1167 let seen = collector.wait().await.unwrap();
1168 assert_that!(seen).is_equal_to(b"abcdef".to_vec());
1169 }
1170
1171 #[tokio::test]
1172 async fn collect_lines_to_file() {
1173 let (read_half, write_half) = tokio::io::duplex(64);
1174 let os = SingleSubscriberOutputStream::from_stream(
1175 read_half,
1176 "custom",
1177 BackpressureControl::DropLatestIncomingIfBufferFull,
1178 FromStreamOptions {
1179 channel_capacity: 32,
1180 ..Default::default()
1181 },
1182 );
1183
1184 let temp_file = tempfile::tempfile().unwrap();
1185 let collector = os.collect_lines(
1186 temp_file,
1187 |line, temp_file| {
1188 writeln!(temp_file, "{line}").unwrap();
1189 Next::Continue
1190 },
1191 LineParsingOptions::default(),
1192 );
1193
1194 tokio::spawn(write_test_data(write_half)).await.unwrap();
1195
1196 let mut temp_file = collector.cancel().await.unwrap();
1197 temp_file.seek(SeekFrom::Start(0)).unwrap();
1198 let mut contents = String::new();
1199 temp_file.read_to_string(&mut contents).unwrap();
1200
1201 assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1202 }
1203
1204 #[tokio::test]
1205 async fn collect_lines_async_to_file() {
1206 let (read_half, write_half) = tokio::io::duplex(64);
1207 let os = SingleSubscriberOutputStream::from_stream(
1208 read_half,
1209 "custom",
1210 BackpressureControl::DropLatestIncomingIfBufferFull,
1211 FromStreamOptions {
1212 chunk_size: 32.bytes(),
1213 ..Default::default()
1214 },
1215 );
1216
1217 let temp_file = tempfile::tempfile().unwrap();
1218 let collector = os.collect_lines_async(temp_file, WriteLine, LineParsingOptions::default());
1219
1220 tokio::spawn(write_test_data(write_half)).await.unwrap();
1221
1222 let mut temp_file = collector.cancel().await.unwrap();
1223 temp_file.seek(SeekFrom::Start(0)).unwrap();
1224 let mut contents = String::new();
1225 temp_file.read_to_string(&mut contents).unwrap();
1226
1227 assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1228 }
1229
1230 #[tokio::test]
1231 async fn collect_lines_into_write_respects_requested_line_delimiter_mode() {
1232 let (read_half, write_half) = tokio::io::duplex(64);
1233 let os = SingleSubscriberOutputStream::from_stream(
1234 read_half,
1235 "custom",
1236 BackpressureControl::DropLatestIncomingIfBufferFull,
1237 FromStreamOptions::default(),
1238 );
1239
1240 let temp_file = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
1241 let collector = os.collect_lines_into_write(
1242 temp_file,
1243 LineParsingOptions::default(),
1244 LineWriteMode::AsIs,
1245 );
1246
1247 tokio::spawn(write_test_data(write_half)).await.unwrap();
1248
1249 let mut temp_file = collector.cancel().await.unwrap();
1250 temp_file.seek(SeekFrom::Start(0)).await.unwrap();
1251 let mut contents = String::new();
1252 temp_file.read_to_string(&mut contents).await.unwrap();
1253
1254 assert_that!(contents).is_equal_to("Cargo.lockCargo.tomlREADME.mdsrctarget");
1255 }
1256
1257 #[tokio::test]
1258 #[traced_test]
1259 async fn collect_chunks_into_write_mapped() {
1260 let (read_half, write_half) = tokio::io::duplex(64);
1261 let os = SingleSubscriberOutputStream::from_stream(
1262 read_half,
1263 "custom",
1264 BackpressureControl::DropLatestIncomingIfBufferFull,
1265 FromStreamOptions {
1266 chunk_size: 32.bytes(),
1267 ..Default::default()
1268 },
1269 );
1270
1271 let temp_file = tokio::fs::File::options()
1272 .create(true)
1273 .truncate(true)
1274 .write(true)
1275 .read(true)
1276 .open(std::env::temp_dir().join(
1277 "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
1278 ))
1279 .await
1280 .unwrap();
1281
1282 let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
1283 String::from_utf8_lossy(chunk.as_ref()).to_string()
1284 });
1285
1286 tokio::spawn(write_test_data(write_half)).await.unwrap();
1287
1288 let mut temp_file = collector.cancel().await.unwrap();
1289 temp_file.seek(SeekFrom::Start(0)).await.unwrap();
1290 let mut contents = String::new();
1291 temp_file.read_to_string(&mut contents).await.unwrap();
1292
1293 assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1294 }
1295
1296 #[tokio::test]
1297 #[traced_test]
1298 async fn multiple_subscribers_are_not_possible() {
1299 let (read_half, _write_half) = tokio::io::duplex(64);
1300 let os = SingleSubscriberOutputStream::from_stream(
1301 read_half,
1302 "custom",
1303 BackpressureControl::DropLatestIncomingIfBufferFull,
1304 FromStreamOptions::default(),
1305 );
1306
1307 let _inspector = os.inspect_lines(|_line| Next::Continue, LineParsingOptions::default());
1308
1309 assert_that_panic_by(move || {
1311 os.inspect_lines(|_line| Next::Continue, LineParsingOptions::default())
1312 })
1313 .has_type::<String>()
1314 .is_equal_to("Cannot create multiple consumers on SingleSubscriberOutputStream (stream: 'custom'). Only one inspector or collector can be active at a time. Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.");
1315 }
1316}