1use crate::collector::{AsyncChunkCollector, AsyncLineCollector, Collector, Sink};
2use crate::inspector::Inspector;
3use crate::output_stream::impls::{
4 impl_collect_chunks, impl_collect_chunks_async, impl_collect_chunks_into_write,
5 impl_collect_chunks_into_write_mapped, impl_collect_lines, impl_collect_lines_async,
6 impl_collect_lines_into_write, impl_collect_lines_into_write_mapped, impl_inspect_chunks,
7 impl_inspect_lines, impl_inspect_lines_async, visit_final_line, visit_lines,
8};
9use crate::output_stream::{Chunk, FromStreamOptions, LineWriteMode, Next, StreamEvent};
10use crate::output_stream::{LineParserState, LineParsingOptions, OutputStream};
11use crate::{NumBytes, WaitForLineResult};
12use std::borrow::Cow;
13use std::fmt::{Debug, Formatter};
14use std::future::Future;
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
17use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
18use tokio::sync::broadcast;
19use tokio::sync::broadcast::error::RecvError;
20use tokio::task::JoinHandle;
21
/// An output stream that fans data read from an async source out to any number of
/// subscribers through a `tokio::sync::broadcast` channel.
///
/// A background task running `read_chunked` reads the source and publishes
/// `StreamEvent`s on the channel. Dropping this value aborts that task (see the `Drop`
/// impl).
pub struct BroadcastOutputStream {
    /// Handle of the spawned `read_chunked` task; aborted when the stream is dropped.
    stream_reader: JoinHandle<()>,

    /// Sending half of the broadcast channel. Kept so that `subscribe` can create new
    /// receivers at any time.
    sender: broadcast::Sender<StreamEvent>,

    /// Shared with the reader task, which sets `closed` once it has observed
    /// end-of-stream. `subscribe` reads it to decide whether a new subscription must
    /// synthesize its own `StreamEvent::Eof`.
    closure_state: Arc<Mutex<ClosureState>>,

    /// Chunk size this stream was created with (`FromStreamOptions::chunk_size`).
    chunk_size: NumBytes,

    /// Capacity the broadcast channel was created with
    /// (`FromStreamOptions::channel_capacity`).
    max_channel_capacity: usize,

    /// Name given to the stream in `from_stream`.
    name: &'static str,
}
53
/// State shared between the reader task and `BroadcastOutputStream::subscribe`,
/// always accessed behind a `Mutex`.
struct ClosureState {
    /// `true` once the reader task has seen end-of-stream and published
    /// `StreamEvent::Eof`.
    closed: bool,
}
57
/// A single subscriber's view of the stream: a broadcast receiver plus a flag that lets
/// subscribers created after end-of-stream still observe one terminal `Eof`.
struct Subscription {
    /// Receiving half obtained from `broadcast::Sender::subscribe`.
    receiver: broadcast::Receiver<StreamEvent>,
    /// When `true`, the next `recv` call yields a synthetic `StreamEvent::Eof` instead of
    /// reading from `receiver`. Set when the stream was already closed at subscription
    /// time.
    emit_terminal_eof: bool,
}
62
63impl Subscription {
64 async fn recv(&mut self) -> Result<StreamEvent, RecvError> {
65 if self.emit_terminal_eof {
66 self.emit_terminal_eof = false;
67 return Ok(StreamEvent::Eof);
68 }
69
70 self.receiver.recv().await
71 }
72}
73
/// Exposes the configuration this stream was created with.
impl OutputStream for BroadcastOutputStream {
    /// Chunk size taken from `FromStreamOptions::chunk_size` at construction.
    fn chunk_size(&self) -> NumBytes {
        self.chunk_size
    }

    /// Capacity the broadcast channel was created with.
    fn channel_capacity(&self) -> usize {
        self.max_channel_capacity
    }

    /// Name passed to `from_stream`.
    fn name(&self) -> &'static str {
        self.name
    }
}
87
/// Stops the background reader when the stream goes away.
impl Drop for BroadcastOutputStream {
    fn drop(&mut self) {
        // The reader task would otherwise keep running (and keep reading from the
        // source) after the stream handle is gone.
        self.stream_reader.abort();
    }
}
93
94impl Debug for BroadcastOutputStream {
95 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96 f.debug_struct("BroadcastOutputStream")
97 .field("output_collector", &"non-debug < JoinHandle<()> >")
98 .field(
99 "sender",
100 &"non-debug < tokio::sync::broadcast::Sender<StreamEvent> >",
101 )
102 .finish()
103 }
104}
105
106async fn read_chunked<B: AsyncRead + Unpin + Send + 'static>(
110 mut read: B,
111 chunk_size: NumBytes,
112 sender: broadcast::Sender<StreamEvent>,
113 closure_state: Arc<Mutex<ClosureState>>,
114) {
115 let send_chunk = move |event: StreamEvent| {
116 match sender.send(event) {
121 Ok(_received_by) => {}
122 Err(err) => {
123 tracing::debug!(
128 error = %err,
129 "No active receivers for the output chunk, dropping it"
130 );
131 }
132 }
133 };
134
135 let mut buf = bytes::BytesMut::with_capacity(chunk_size.bytes());
137 loop {
138 let _ = buf.try_reclaim(chunk_size.bytes());
139 match read.read_buf(&mut buf).await {
140 Ok(bytes_read) => {
141 let is_eof = bytes_read == 0;
142
143 if is_eof {
144 let mut state = closure_state.lock().expect("closure_state poisoned");
150 state.closed = true;
151 send_chunk(StreamEvent::Eof);
152 } else {
153 while !buf.is_empty() {
154 let split_to = usize::min(chunk_size.bytes(), buf.len());
158 send_chunk(StreamEvent::Chunk(Chunk(buf.split_to(split_to).freeze())));
167 }
168 }
169
170 if is_eof {
171 break;
172 }
173 }
174 Err(err) => panic!("Could not read from stream: {err}"),
175 }
176 }
177}
178
179impl BroadcastOutputStream {
180 pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
182 stream: S,
183 stream_name: &'static str,
184 options: FromStreamOptions,
185 ) -> BroadcastOutputStream {
186 options.chunk_size.assert_non_zero("options.chunk_size");
187
188 let (sender, receiver) = broadcast::channel::<StreamEvent>(options.channel_capacity);
189 drop(receiver);
190
191 let closure_state = Arc::new(Mutex::new(ClosureState { closed: false }));
192
193 let stream_reader = tokio::spawn(read_chunked(
194 stream,
195 options.chunk_size,
196 sender.clone(),
197 Arc::clone(&closure_state),
198 ));
199
200 BroadcastOutputStream {
201 stream_reader,
202 sender,
203 closure_state,
204 chunk_size: options.chunk_size,
205 max_channel_capacity: options.channel_capacity,
206 name: stream_name,
207 }
208 }
209
210 fn subscribe(&self) -> Subscription {
211 let (receiver, emit_terminal_eof) = {
212 let state = self.closure_state.lock().expect("closure_state poisoned");
213 let receiver = self.sender.subscribe();
214
215 (receiver, state.closed)
220 };
221
222 Subscription {
223 receiver,
224 emit_terminal_eof,
225 }
226 }
227}
228
/// Event loop shared by the inspector/collector implementations of this stream.
///
/// Arguments:
/// - `$loop_label`: label given to the generated `loop`, so that `$body` can
///   `break`/`continue` it by name.
/// - `$receiver`: a value with an async `recv()` returning
///   `Result<StreamEvent, RecvError>` (here: a `Subscription`).
/// - `$term_rx`: a future polled via `&mut`; the loop ends as soon as it completes.
///   NOTE(review): presumably the termination signal of the inspector/collector - confirm
///   against the `impl_*` macros that invoke this.
/// - `|$chunk| $body`: code run for every event, with the event bound to `$chunk`.
macro_rules! handle_subscription {
    ($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
        $loop_label: loop {
            tokio::select! {
                out = $receiver.recv() => {
                    match out {
                        Ok(event) => {
                            let $chunk = event;
                            $body
                        }
                        // No sender is left: nothing more can arrive.
                        Err(RecvError::Closed) => {
                            break $loop_label;
                        },
                        // This receiver fell behind and events were overwritten. The body
                        // is told about the loss through a `Gap` event instead of the
                        // missing data.
                        Err(RecvError::Lagged(lagged)) => {
                            tracing::warn!(lagged, "Inspector is lagging behind");
                            let $chunk = StreamEvent::Gap;
                            $body
                        }
                    }
                }
                // Termination requested; the received message itself is not used.
                _msg = &mut $term_rx => break $loop_label,
            }
        }
    };
}
259
/// Inspectors: observe the stream through a callback without collecting anything.
///
/// Each method creates its own subscription and hands it to the matching `impl_inspect_*`
/// macro (defined in `crate::output_stream::impls`, not visible here). As the `must_use`
/// notes state, the returned `Inspector` owns the internal tokio task and must be kept
/// alive for the callback to run.
impl BroadcastOutputStream {
    /// Calls `f` for chunks of output. The `Next` returned by `f` is interpreted by
    /// `impl_inspect_chunks!` (presumably continue vs. stop - confirm there).
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_chunks(&self, mut f: impl FnMut(Chunk) -> Next + Send + 'static) -> Inspector {
        let mut receiver = self.subscribe();
        impl_inspect_chunks!(self.name(), receiver, f, handle_subscription)
    }

    /// Calls `f` for lines of output, parsed according to `options`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_lines(
        &self,
        mut f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static,
        options: LineParsingOptions,
    ) -> Inspector {
        let mut receiver = self.subscribe();
        impl_inspect_lines!(self.name(), receiver, f, options, handle_subscription)
    }

    /// Like `inspect_lines`, but `f` returns a future resolving to `Next`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
    pub fn inspect_lines_async<Fut>(
        &self,
        mut f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static,
        options: LineParsingOptions,
    ) -> Inspector
    where
        Fut: Future<Output = Next> + Send,
    {
        let mut receiver = self.subscribe();
        impl_inspect_lines_async!(self.name(), receiver, f, options, handle_subscription)
    }
}
303
/// Collectors: gather the stream's output into a sink.
///
/// Each method creates its own subscription and hands it to the matching `impl_collect_*`
/// macro (defined in `crate::output_stream::impls`, not visible here). As the `must_use`
/// notes state, the returned `Collector` owns the internal tokio task and must be kept
/// alive for collection to happen.
impl BroadcastOutputStream {
    /// Collects chunks into `into`; `collect` is given each chunk together with the sink.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks<S: Sink>(
        &self,
        into: S,
        mut collect: impl FnMut(Chunk, &mut S) + Send + 'static,
    ) -> Collector<S> {
        let mut receiver = self.subscribe();
        impl_collect_chunks!(self.name(), receiver, collect, into, handle_subscription)
    }

    /// Collects chunks into `into` using an `AsyncChunkCollector`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_async<S, C>(&self, into: S, collect: C) -> Collector<S>
    where
        S: Sink,
        C: AsyncChunkCollector<S>,
    {
        let mut receiver = self.subscribe();
        impl_collect_chunks_async!(self.name(), receiver, collect, into, handle_subscription)
    }

    /// Collects lines (parsed according to `options`) into `into`; `collect` is given each
    /// line together with the sink and returns a `Next`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines<S: Sink>(
        &self,
        into: S,
        mut collect: impl FnMut(Cow<'_, str>, &mut S) -> Next + Send + 'static,
        options: LineParsingOptions,
    ) -> Collector<S> {
        let mut receiver = self.subscribe();
        impl_collect_lines!(
            self.name(),
            receiver,
            collect,
            options,
            into,
            handle_subscription
        )
    }

    /// Collects lines (parsed according to `options`) into `into` using an
    /// `AsyncLineCollector`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_async<S, C>(
        &self,
        into: S,
        collect: C,
        options: LineParsingOptions,
    ) -> Collector<S>
    where
        S: Sink,
        C: AsyncLineCollector<S>,
    {
        let mut receiver = self.subscribe();
        impl_collect_lines_async!(
            self.name(),
            receiver,
            collect,
            options,
            into,
            handle_subscription
        )
    }

    /// Convenience wrapper around `collect_chunks` that appends every chunk's bytes to a
    /// `Vec<u8>`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
        self.collect_chunks(Vec::new(), |chunk, vec| {
            vec.extend_from_slice(chunk.as_ref());
        })
    }

    /// Convenience wrapper around `collect_lines` that pushes every line, as an owned
    /// `String`, onto a `Vec<String>` and never asks to stop.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
        self.collect_lines(
            Vec::new(),
            |line, vec| {
                vec.push(line.into_owned());
                Next::Continue
            },
            options,
        )
    }

    /// Forwards chunks to the async writer `write`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
        &self,
        write: W,
    ) -> Collector<W> {
        let mut receiver = self.subscribe();
        impl_collect_chunks_into_write!(self.name(), receiver, write, handle_subscription)
    }

    /// Forwards lines (parsed according to `options`) to the async writer `write`.
    /// `mode` selects how lines are written, e.g. `LineWriteMode::AppendLf`.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
        &self,
        write: W,
        options: LineParsingOptions,
        mode: LineWriteMode,
    ) -> Collector<W> {
        let mut receiver = self.subscribe();
        impl_collect_lines_into_write!(
            self.name(),
            receiver,
            write,
            options,
            mode,
            handle_subscription
        )
    }

    /// Like `collect_chunks_into_write`, but each chunk is passed through `mapper` and the
    /// resulting bytes are what gets written.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_chunks_into_write_mapped<
        W: Sink + AsyncWriteExt + Unpin,
        B: AsRef<[u8]> + Send,
    >(
        &self,
        write: W,
        mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
    ) -> Collector<W> {
        let mut receiver = self.subscribe();
        impl_collect_chunks_into_write_mapped!(
            self.name(),
            receiver,
            write,
            mapper,
            handle_subscription
        )
    }

    /// Like `collect_lines_into_write`, but each line is passed through `mapper` and the
    /// resulting bytes are what gets written.
    #[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
    pub fn collect_lines_into_write_mapped<
        W: Sink + AsyncWriteExt + Unpin,
        B: AsRef<[u8]> + Send,
    >(
        &self,
        write: W,
        mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + Copy + 'static,
        options: LineParsingOptions,
        mode: LineWriteMode,
    ) -> Collector<W> {
        let mut receiver = self.subscribe();
        impl_collect_lines_into_write_mapped!(
            self.name(),
            receiver,
            write,
            mapper,
            options,
            mode,
            handle_subscription
        )
    }
}
536
537impl BroadcastOutputStream {
539 async fn wait_for_line_inner(
540 &self,
541 predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
542 options: LineParsingOptions,
543 ) -> WaitForLineResult {
544 let mut receiver = self.subscribe();
545 let mut parser = LineParserState::new();
546
547 loop {
548 match receiver.recv().await {
549 Ok(StreamEvent::Chunk(chunk)) => {
550 if visit_lines(chunk.as_ref(), &mut parser, options, |line| {
551 if predicate(line) {
552 Next::Break
553 } else {
554 Next::Continue
555 }
556 }) == Next::Break
557 {
558 return WaitForLineResult::Matched;
559 }
560 }
561 Ok(StreamEvent::Gap) => {
562 parser.on_gap();
563 }
564 Ok(StreamEvent::Eof) | Err(RecvError::Closed) => {
565 if visit_final_line(&parser, |line| {
566 if predicate(line) {
567 Next::Break
568 } else {
569 Next::Continue
570 }
571 }) == Next::Break
572 {
573 return WaitForLineResult::Matched;
574 }
575 return WaitForLineResult::StreamClosed;
576 }
577 Err(RecvError::Lagged(lagged)) => {
578 tracing::warn!(lagged, "Waiter is lagging behind");
579 parser.on_gap();
580 }
581 }
582 }
583 }
584
585 pub async fn wait_for_line(
596 &self,
597 predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
598 options: LineParsingOptions,
599 ) -> WaitForLineResult {
600 self.wait_for_line_inner(predicate, options).await
601 }
602
603 pub async fn wait_for_line_with_timeout(
616 &self,
617 predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
618 options: LineParsingOptions,
619 timeout: Duration,
620 ) -> WaitForLineResult {
621 tokio::time::timeout(timeout, self.wait_for_line_inner(predicate, options))
622 .await
623 .unwrap_or(WaitForLineResult::Timeout)
624 }
625}
626
/// Line-related configuration.
///
/// NOTE(review): nothing in the visible part of this file reads this type; its intended
/// consumer should be confirmed.
pub struct LineConfig {
    /// Maximum line length. Presumably counted in bytes and used to bound line
    /// buffering - TODO confirm against the code that consumes it.
    pub max_line_length: usize,
}
636
637#[cfg(test)]
638mod tests {
639 use super::{ClosureState, read_chunked};
640 use crate::WaitForLineResult;
641 use crate::output_stream::broadcast::BroadcastOutputStream;
642 use crate::output_stream::tests::write_test_data;
643 use crate::output_stream::{Chunk, StreamEvent};
644 use crate::output_stream::{FromStreamOptions, LineParsingOptions, LineWriteMode, Next};
645 use crate::{AsyncChunkCollector, AsyncLineCollector};
646 use crate::{NumBytes, NumBytesExt};
647 use assertr::prelude::*;
648 use bytes::Bytes;
649 use mockall::*;
650 use std::borrow::Cow;
651 use std::future::poll_fn;
652 use std::io::Read;
653 use std::io::Seek;
654 use std::io::SeekFrom;
655 use std::io::Write;
656 use std::pin::pin;
657 use std::sync::{Arc, Mutex};
658 use std::task::Poll;
659 use std::time::Duration;
660 use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
661 use tokio::sync::broadcast;
662 use tokio::sync::broadcast::error::RecvError;
663 use tokio::time::sleep;
664 use tracing_test::traced_test;
665
666 struct BreakOnLine;
667
668 impl AsyncLineCollector<Vec<String>> for BreakOnLine {
669 async fn collect<'a>(&'a mut self, line: Cow<'a, str>, seen: &'a mut Vec<String>) -> Next {
670 if line == "break" {
671 seen.push(line.into_owned());
672 Next::Break
673 } else {
674 seen.push(line.into_owned());
675 Next::Continue
676 }
677 }
678 }
679
    /// Async line collector that writes every line, followed by a newline, to a
    /// `std::fs::File` sink and never asks to stop.
    struct WriteLine;

    impl AsyncLineCollector<std::fs::File> for WriteLine {
        async fn collect<'a>(
            &'a mut self,
            line: Cow<'a, str>,
            temp_file: &'a mut std::fs::File,
        ) -> Next {
            // Test helper: a failed write should fail the test, hence `unwrap`.
            writeln!(temp_file, "{line}").unwrap();
            Next::Continue
        }
    }
692
    /// Async chunk collector that appends every chunk's bytes to a `Vec<u8>` and never
    /// asks to stop.
    struct ExtendChunks;

    impl AsyncChunkCollector<Vec<u8>> for ExtendChunks {
        async fn collect<'a>(&'a mut self, chunk: Chunk, seen: &'a mut Vec<u8>) -> Next {
            seen.extend_from_slice(chunk.as_ref());
            Next::Continue
        }
    }
701
    // A zero chunk size must be rejected by `from_stream` with a descriptive panic.
    // This is a plain `#[test]`: the chunk-size check runs before the reader task is
    // spawned, so no tokio runtime is needed to reach the panic.
    #[test]
    #[should_panic(expected = "options.chunk_size must be greater than zero bytes")]
    fn from_stream_panics_on_zero_chunk_size() {
        let _stream = BroadcastOutputStream::from_stream(
            tokio::io::empty(),
            "custom",
            FromStreamOptions {
                chunk_size: NumBytes::zero(),
                ..FromStreamOptions::default()
            },
        );
    }
714
    // With a 2-byte chunk size and 11 bytes already available, the very first read
    // returns more than one chunk's worth of data. The reader must keep going until the
    // real EOF and publish the data split into 2-byte chunks.
    #[tokio::test]
    #[traced_test]
    async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
    {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let (tx, mut rx) = broadcast::channel(10);

        // Write everything before the reader starts so that it is all readable at once.
        write_half.write_all(b"hello world").await.unwrap();
        write_half.flush().await.unwrap();

        let stream_reader = tokio::spawn(read_chunked(
            read_half,
            2.bytes(),
            tx,
            Arc::new(Mutex::new(ClosureState { closed: false })),
        ));

        // Closing the write half produces EOF, which lets the reader task finish.
        drop(write_half);
        stream_reader.await.unwrap();

        // `rx` existed before the task started, so every event is still buffered for it.
        let mut chunks = Vec::<String>::new();
        while let Ok(event) = rx.recv().await {
            match event {
                StreamEvent::Chunk(chunk) => {
                    chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
                }
                StreamEvent::Gap => {}
                StreamEvent::Eof => break,
            }
        }
        assert_that!(chunks).contains_exactly(["he", "ll", "o ", "wo", "rl", "d"]);
    }
753
    // A source that is closed without ever producing data yields no chunks at all.
    #[tokio::test]
    #[traced_test]
    async fn read_chunked_no_data() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let (tx, mut rx) = broadcast::channel(10);

        let stream_reader = tokio::spawn(read_chunked(
            read_half,
            2.bytes(),
            tx,
            Arc::new(Mutex::new(ClosureState { closed: false })),
        ));

        // Immediate EOF: nothing was written.
        drop(write_half);
        stream_reader.await.unwrap();

        let mut chunks = Vec::<String>::new();
        while let Ok(event) = rx.recv().await {
            match event {
                StreamEvent::Chunk(chunk) => {
                    chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
                }
                StreamEvent::Gap => {}
                StreamEvent::Eof => break,
            }
        }
        assert_that!(chunks).is_empty();
    }
782
    // A waiter that is already subscribed resolves to `Matched` once a matching line is
    // written.
    #[tokio::test]
    async fn wait_for_line_returns_matched_when_line_appears_before_eof() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let os =
            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());

        let waiter = os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default());
        let mut waiter = pin!(waiter);

        // Poll the waiter exactly once so that it subscribes before any data is written;
        // the poll result itself is irrelevant here.
        poll_fn(|cx| {
            let _ = waiter.as_mut().poll(cx);
            Poll::Ready(())
        })
        .await;

        write_half.write_all(b"booting\nready\n").await.unwrap();
        write_half.flush().await.unwrap();
        drop(write_half);

        let result = waiter.await;
        assert_that!(result).is_equal_to(WaitForLineResult::Matched);
    }
809
    // If the stream ends without any matching line, the waiter resolves to
    // `StreamClosed`.
    #[tokio::test]
    async fn wait_for_line_returns_stream_closed_when_stream_ends_before_match() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let os =
            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());

        let waiter = os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default());
        let mut waiter = pin!(waiter);

        // Poll once so that the waiter is subscribed before data is written.
        poll_fn(|cx| {
            let _ = waiter.as_mut().poll(cx);
            Poll::Ready(())
        })
        .await;

        write_half
            .write_all(b"booting\nstill starting\n")
            .await
            .unwrap();
        write_half.flush().await.unwrap();
        drop(write_half);

        let result = waiter.await;
        assert_that!(result).is_equal_to(WaitForLineResult::StreamClosed);
    }
837
    // A final line without a trailing newline must still be offered to the predicate
    // when the stream ends.
    #[tokio::test]
    async fn wait_for_line_returns_matched_for_partial_final_line_at_eof() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let os =
            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());

        let waiter = os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default());
        let mut waiter = pin!(waiter);

        // Poll once so that the waiter is subscribed before data is written.
        poll_fn(|cx| {
            let _ = waiter.as_mut().poll(cx);
            Poll::Ready(())
        })
        .await;

        // Note: "ready" is not newline-terminated.
        write_half.write_all(b"booting\nready").await.unwrap();
        write_half.flush().await.unwrap();
        drop(write_half);

        let result = waiter.await;
        assert_that!(result).is_equal_to(WaitForLineResult::Matched);
    }
862
    // With the write half kept alive and silent, the waiter can neither match nor see
    // the stream end, so it must time out.
    #[tokio::test]
    async fn wait_for_line_with_timeout_returns_timeout_while_stream_stays_open() {
        // `_write_half` is bound (not dropped) to keep the stream open for the whole test.
        let (read_half, _write_half) = tokio::io::duplex(64);
        let os =
            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());

        let result = os
            .wait_for_line_with_timeout(
                |line| line.contains("ready"),
                LineParsingOptions::default(),
                Duration::from_millis(25),
            )
            .await;

        assert_that!(result).is_equal_to(WaitForLineResult::Timeout);
    }
879
    // A waiter that subscribes only after the stream has ended must not hang: it
    // resolves to `StreamClosed`.
    #[tokio::test]
    async fn wait_for_line_returns_stream_closed_for_late_subscriber_after_eof() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let os =
            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());

        write_half.write_all(b"booting\n").await.unwrap();
        write_half.flush().await.unwrap();
        drop(write_half);

        // Wait until the reader task has recorded end-of-stream, yielding so that it gets
        // a chance to run.
        while !os
            .closure_state
            .lock()
            .expect("closure_state poisoned")
            .closed
        {
            tokio::task::yield_now().await;
        }

        let result = os
            .wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
            .await;

        assert_that!(result).is_equal_to(WaitForLineResult::StreamClosed);
    }
908
    // A subscriber created after EOF gets its own synthetic `Eof`, while a subscriber
    // that existed before keeps seeing exactly what the channel holds for it.
    #[tokio::test]
    async fn late_subscriber_after_eof_does_not_disturb_existing_subscribers() {
        let (sender, receiver) = broadcast::channel::<StreamEvent>(2);
        drop(receiver);

        // The stream is assembled by hand with a no-op reader task so that the test
        // controls every event sent on the channel.
        let closure_state = Arc::new(Mutex::new(ClosureState { closed: false }));
        let os = BroadcastOutputStream {
            stream_reader: tokio::spawn(async {}),
            sender: sender.clone(),
            closure_state: Arc::clone(&closure_state),
            chunk_size: 4.bytes(),
            max_channel_capacity: 2,
            name: "custom",
        };

        let mut existing = os.subscribe();

        // Three events into a channel of capacity 2: the oldest one is lost for
        // `existing`.
        sender
            .send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"one\n"))))
            .unwrap();
        sender
            .send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"two\n"))))
            .unwrap();
        sender.send(StreamEvent::Eof).unwrap();
        closure_state.lock().expect("closure_state poisoned").closed = true;

        let mut late = os.subscribe();
        assert_that!(late.recv().await)
            .is_ok()
            .is_equal_to(StreamEvent::Eof);

        // The existing subscriber first learns about the one overwritten event, then
        // receives the remaining chunk and the EOF.
        assert_that!(existing.recv().await)
            .is_err()
            .is_equal_to(RecvError::Lagged(1));
        let chunk = existing.recv().await.unwrap();
        assert_that!(chunk).is_equal_to(StreamEvent::Chunk(Chunk(Bytes::from_static(b"two\n"))));
        assert_that!(existing.recv().await)
            .is_ok()
            .is_equal_to(StreamEvent::Eof);
    }
949
    // A subscriber that existed before the stream closed receives the remaining data
    // first and the EOF last.
    #[tokio::test]
    async fn subscriber_created_before_closure_receives_tail_data_before_terminal_eof() {
        let (sender, receiver) = broadcast::channel::<StreamEvent>(4);
        drop(receiver);

        // Hand-assembled stream with a no-op reader task; events are sent manually.
        let closure_state = Arc::new(Mutex::new(ClosureState { closed: false }));
        let os = BroadcastOutputStream {
            stream_reader: tokio::spawn(async {}),
            sender: sender.clone(),
            closure_state: Arc::clone(&closure_state),
            chunk_size: 4.bytes(),
            max_channel_capacity: 4,
            name: "custom",
        };

        let mut subscriber = os.subscribe();

        sender
            .send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"tail\n"))))
            .unwrap();
        // Mirrors the reader task: the flag is set and the EOF is sent under the lock.
        {
            let mut state = closure_state.lock().expect("closure_state poisoned");
            state.closed = true;
            sender.send(StreamEvent::Eof).unwrap();
        }

        let chunk = subscriber.recv().await.unwrap();
        assert_that!(chunk).is_equal_to(StreamEvent::Chunk(Chunk(Bytes::from_static(b"tail\n"))));
        assert_that!(subscriber.recv().await)
            .is_ok()
            .is_equal_to(StreamEvent::Eof);
    }
982
    // Data from before and after a lag gap must not be glued together: "rea" + "dy\n"
    // with lost data in between is not the line "ready".
    #[tokio::test]
    async fn wait_for_line_does_not_match_across_lag_gap() {
        let (sender, receiver) = broadcast::channel::<StreamEvent>(2);
        drop(receiver);

        // Hand-assembled stream with a no-op reader task; events are sent manually.
        let closure_state = Arc::new(Mutex::new(ClosureState { closed: false }));
        let os = BroadcastOutputStream {
            stream_reader: tokio::spawn(async {}),
            sender: sender.clone(),
            closure_state: Arc::clone(&closure_state),
            chunk_size: 4.bytes(),
            max_channel_capacity: 2,
            name: "custom",
        };

        let waiter = os.wait_for_line(|line| line == "ready", LineParsingOptions::default());
        let mut waiter = pin!(waiter);

        // Poll once so that the waiter subscribes before anything is sent.
        poll_fn(|cx| {
            let _ = waiter.as_mut().poll(cx);
            Poll::Ready(())
        })
        .await;

        // Four events into a channel of capacity 2 while the waiter is not polled: the
        // oldest events are overwritten and the waiter observes a lag.
        sender
            .send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"rea"))))
            .unwrap();
        sender
            .send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"lost"))))
            .unwrap();
        sender
            .send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"dy\n"))))
            .unwrap();
        {
            let mut state = closure_state.lock().expect("closure_state poisoned");
            state.closed = true;
        }
        sender.send(StreamEvent::Eof).unwrap();

        assert_that!(waiter.await).is_equal_to(WaitForLineResult::StreamClosed);
    }
1024
    // With `LineWriteMode::AppendLf`, every collected line ends up in the writer followed
    // by a line feed.
    #[tokio::test]
    async fn collect_lines_into_write_appends_requested_line_delimiter() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os =
            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());

        let temp_file = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
        let collector = os.collect_lines_into_write(
            temp_file,
            LineParsingOptions::default(),
            LineWriteMode::AppendLf,
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        // Stopping the collector hands back the sink; rewind it to read what was written.
        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).await.unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).await.unwrap();

        assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }
1047
    // A consumer that is much slower than the producer, on a channel of capacity 2, must
    // fall behind; the test asserts that this is reported through the "lagging" log.
    #[tokio::test]
    #[traced_test]
    async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            "custom",
            FromStreamOptions {
                channel_capacity: 2,
                ..Default::default()
            },
        );

        // Slow consumer: 100 ms per line.
        let consumer = os.inspect_lines_async(
            |_line| async move {
                sleep(Duration::from_millis(100)).await;
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        // Fast producer: one line every 25 ms.
        #[rustfmt::skip]
        let producer = tokio::spawn(async move {
            for count in 1..=15 {
                write_half
                    .write_all(format!("{count}\n").as_bytes())
                    .await
                    .unwrap();
                sleep(Duration::from_millis(25)).await;
            }
            write_half.flush().await.unwrap();
            drop(write_half);
        });

        producer.await.unwrap();
        consumer.wait().await.unwrap();
        drop(os);

        // `logs_assert` inspects the log lines captured for this test.
        logs_assert(|lines: &[&str]| {
            let lagged_logs = lines
                .iter()
                .filter(|line| line.contains("Inspector is lagging behind lagged="))
                .count();
            if lagged_logs == 0 {
                return Err("Expected at least one lagged log".to_string());
            }
            Ok(())
        });
    }
1098
    // `inspect_lines` must call the callback exactly once for each line of the test
    // data; the mock's expectations enforce this.
    #[tokio::test]
    async fn inspect_lines() {
        #[automock]
        trait LineVisitor {
            fn visit(&self, line: String);
        }

        // One expectation per expected line, each required exactly once.
        #[rustfmt::skip]
        fn configure(mock: &mut MockLineVisitor) {
            mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
            mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
        }

        let (read_half, write_half) = tokio::io::duplex(64);
        let os =
            BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());

        let mut mock = MockLineVisitor::new();
        configure(&mut mock);

        let inspector = os.inspect_lines(
            move |line| {
                mock.visit(line.into_owned());
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        inspector.cancel().await.unwrap();
        drop(os);
    }
1135
    // An async line collector that returns `Next::Break` stops the collection: lines
    // written after "break" are not recorded, even though the writer keeps producing.
    // NOTE(review): despite its name, this test exercises `collect_lines_async`, not
    // `inspect_lines_async`.
    #[tokio::test]
    #[traced_test]
    async fn inspect_lines_async() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            "custom",
            FromStreamOptions {
                chunk_size: 32.bytes(),
                ..Default::default()
            },
        );

        let seen: Vec<String> = Vec::new();
        let collector = os.collect_lines_async(seen, BreakOnLine, LineParsingOptions::default());

        // The writer never finishes on its own; the test relies on the collector stopping
        // at "break".
        let _writer = tokio::spawn(async move {
            write_half.write_all("start\n".as_bytes()).await.unwrap();
            write_half.write_all("break\n".as_bytes()).await.unwrap();
            write_half.write_all("end\n".as_bytes()).await.unwrap();

            loop {
                write_half
                    .write_all("gibberish\n".as_bytes())
                    .await
                    .unwrap();
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        });

        let seen = collector.wait().await.unwrap();

        assert_that!(seen).contains_exactly(["start", "break"]);
    }
1173
    // Chunks collected through an async chunk collector add up to exactly the bytes
    // written, even when the data is split into 2-byte chunks.
    #[tokio::test]
    async fn collect_chunks_async_into_vec() {
        let (read_half, mut write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            "custom",
            FromStreamOptions {
                chunk_size: 2.bytes(),
                ..Default::default()
            },
        );

        let collector = os.collect_chunks_async(Vec::new(), ExtendChunks);

        write_half.write_all(b"abcdef").await.unwrap();
        // Closing the write half ends the stream, which lets `wait` return.
        drop(write_half);

        let seen = collector.wait().await.unwrap();
        assert_that!(seen).is_equal_to(b"abcdef".to_vec());
    }
1194
    // Lines collected with a synchronous callback can be written to a file sink; the
    // file must contain every line of the test data, newline-terminated.
    #[tokio::test]
    async fn collect_lines_to_file() {
        let (read_half, write_half) = tokio::io::duplex(64);
        let os = BroadcastOutputStream::from_stream(
            read_half,
            "custom",
            FromStreamOptions {
                channel_capacity: 32,
                ..Default::default()
            },
        );

        let temp_file = tempfile::tempfile().unwrap();
        let collector = os.collect_lines(
            temp_file,
            |line, temp_file| {
                writeln!(temp_file, "{line}").unwrap();
                Next::Continue
            },
            LineParsingOptions::default(),
        );

        tokio::spawn(write_test_data(write_half)).await.unwrap();

        // Stopping the collector hands back the file; rewind it to read what was written.
        let mut temp_file = collector.cancel().await.unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        let mut contents = String::new();
        temp_file.read_to_string(&mut contents).unwrap();

        assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
    }
1226
1227 #[tokio::test]
1228 async fn collect_lines_async_to_file() {
1229 let (read_half, write_half) = tokio::io::duplex(64);
1230 let os = BroadcastOutputStream::from_stream(
1231 read_half,
1232 "custom",
1233 FromStreamOptions {
1234 chunk_size: 32.bytes(),
1235 ..Default::default()
1236 },
1237 );
1238
1239 let temp_file = tempfile::tempfile().unwrap();
1240 let collector = os.collect_lines_async(temp_file, WriteLine, LineParsingOptions::default());
1241
1242 tokio::spawn(write_test_data(write_half)).await.unwrap();
1243
1244 let mut temp_file = collector.cancel().await.unwrap();
1245 temp_file.seek(SeekFrom::Start(0)).unwrap();
1246 let mut contents = String::new();
1247 temp_file.read_to_string(&mut contents).unwrap();
1248
1249 assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1250 }
1251
1252 #[tokio::test]
1253 #[traced_test]
1254 async fn collect_chunks_into_write_mapped() {
1255 let (read_half, write_half) = tokio::io::duplex(64);
1256 let os = BroadcastOutputStream::from_stream(
1257 read_half,
1258 "custom",
1259 FromStreamOptions {
1260 chunk_size: 32.bytes(),
1261 ..Default::default()
1262 },
1263 );
1264
1265 let temp_file = tokio::fs::File::options()
1266 .create(true)
1267 .truncate(true)
1268 .write(true)
1269 .read(true)
1270 .open(std::env::temp_dir().join(
1271 "tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
1272 ))
1273 .await
1274 .unwrap();
1275
1276 let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
1277 String::from_utf8_lossy(chunk.as_ref()).to_string()
1278 });
1279
1280 tokio::spawn(write_test_data(write_half)).await.unwrap();
1281
1282 let mut temp_file = collector.cancel().await.unwrap();
1283 temp_file.seek(SeekFrom::Start(0)).await.unwrap();
1284 let mut contents = String::new();
1285 temp_file.read_to_string(&mut contents).await.unwrap();
1286
1287 assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1288 }
1289
1290 #[tokio::test]
1291 #[traced_test]
1292 async fn collect_chunks_into_write_in_parallel() {
1293 let (read_half, write_half) = tokio::io::duplex(64);
1295
1296 let os = BroadcastOutputStream::from_stream(
1297 read_half,
1298 "custom",
1299 FromStreamOptions {
1300 chunk_size: 32.bytes(),
1303 channel_capacity: 2,
1304 },
1305 );
1306
1307 let file1 = tokio::fs::File::options()
1308 .create(true)
1309 .truncate(true)
1310 .write(true)
1311 .read(true)
1312 .open(
1313 std::env::temp_dir()
1314 .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_1.txt"),
1315 )
1316 .await
1317 .unwrap();
1318 let file2 = tokio::fs::File::options()
1319 .create(true)
1320 .truncate(true)
1321 .write(true)
1322 .read(true)
1323 .open(
1324 std::env::temp_dir()
1325 .join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_2.txt"),
1326 )
1327 .await
1328 .unwrap();
1329
1330 let collector1 = os.collect_chunks_into_write(file1);
1331 let collector2 = os.collect_chunks_into_write_mapped(file2, |chunk| {
1332 format!("ok-{}", String::from_utf8_lossy(chunk.as_ref()))
1333 });
1334
1335 tokio::spawn(write_test_data(write_half)).await.unwrap();
1336
1337 let mut temp_file1 = collector1.cancel().await.unwrap();
1338 temp_file1.seek(SeekFrom::Start(0)).await.unwrap();
1339 let mut contents = String::new();
1340 temp_file1.read_to_string(&mut contents).await.unwrap();
1341 assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
1342
1343 let mut temp_file2 = collector2.cancel().await.unwrap();
1344 temp_file2.seek(SeekFrom::Start(0)).await.unwrap();
1345 let mut contents = String::new();
1346 temp_file2.read_to_string(&mut contents).await.unwrap();
1347 assert_that!(contents)
1348 .is_equal_to("ok-Cargo.lock\nok-Cargo.toml\nok-README.md\nok-src\nok-target\n");
1349 }
1350}