use crate::collector::{AsyncChunkCollector, AsyncLineCollector, Collector, Sink};
use crate::inspector::Inspector;
use crate::output_stream::impls::{
impl_collect_chunks, impl_collect_chunks_async, impl_collect_chunks_into_write,
impl_collect_chunks_into_write_mapped, impl_collect_lines, impl_collect_lines_async,
impl_collect_lines_into_write, impl_collect_lines_into_write_mapped, impl_inspect_chunks,
impl_inspect_lines, impl_inspect_lines_async, visit_final_line, visit_lines,
};
use crate::output_stream::{
BackpressureControl, Chunk, FromStreamOptions, LineWriteMode, Next, OutputStream, StreamEvent,
};
use crate::{LineParsingOptions, NumBytes, WaitForLineResult};
use atomic_take::AtomicTake;
use bytes::Buf;
use std::borrow::Cow;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::task::JoinHandle;
pub struct SingleSubscriberOutputStream {
stream_reader: JoinHandle<()>,
receiver: AtomicTake<mpsc::Receiver<StreamEvent>>,
chunk_size: NumBytes,
max_channel_capacity: usize,
backpressure_control: BackpressureControl,
name: &'static str,
}
impl OutputStream for SingleSubscriberOutputStream {
fn chunk_size(&self) -> NumBytes {
self.chunk_size
}
fn channel_capacity(&self) -> usize {
self.max_channel_capacity
}
fn name(&self) -> &'static str {
self.name
}
}
impl Drop for SingleSubscriberOutputStream {
fn drop(&mut self) {
self.stream_reader.abort();
}
}
impl Debug for SingleSubscriberOutputStream {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SingleSubscriberOutputStream")
.field("output_collector", &"non-debug < JoinHandle<()> >")
.field(
"receiver",
&"non-debug < tokio::sync::mpsc::Receiver<StreamEvent> >",
)
.finish()
}
}
#[expect(
clippy::too_many_lines,
reason = "the stream reader keeps tightly coupled buffer and backpressure state together"
)]
async fn read_chunked<R: AsyncRead + Unpin + Send + 'static>(
mut read: R,
chunk_size: NumBytes,
sender: mpsc::Sender<StreamEvent>,
backpressure_control: BackpressureControl,
) {
struct AfterSend {
do_break: bool,
}
enum TrySendStatus {
Sent,
Full,
Closed,
}
fn log_if_lagged(lagged: &mut usize) {
if *lagged > 0 {
tracing::debug!(lagged = *lagged, "Stream reader is lagging behind");
*lagged = 0;
}
}
fn try_send_gap(sender: &mpsc::Sender<StreamEvent>, lagged: &mut usize) -> TrySendStatus {
match sender.try_send(StreamEvent::Gap) {
Ok(()) => {
log_if_lagged(lagged);
TrySendStatus::Sent
}
Err(TrySendError::Full(_data)) => TrySendStatus::Full,
Err(TrySendError::Closed(_data)) => TrySendStatus::Closed,
}
}
fn try_send_chunk(
chunk: Chunk,
sender: &mpsc::Sender<StreamEvent>,
lagged: &mut usize,
) -> TrySendStatus {
let event = StreamEvent::Chunk(chunk);
match sender.try_send(event) {
Ok(()) => {
log_if_lagged(lagged);
TrySendStatus::Sent
}
Err(TrySendError::Full(_data)) => {
*lagged += 1;
TrySendStatus::Full
}
Err(TrySendError::Closed(_data)) => {
TrySendStatus::Closed
}
}
}
async fn send_event(event: StreamEvent, sender: &mpsc::Sender<StreamEvent>) -> AfterSend {
match sender.send(event).await {
Ok(()) => {}
Err(_err) => {
return AfterSend { do_break: true };
}
}
AfterSend { do_break: false }
}
let mut buf = bytes::BytesMut::with_capacity(chunk_size.bytes());
let mut lagged: usize = 0;
let mut gap_pending = false;
'outer: loop {
let _ = buf.try_reclaim(chunk_size.bytes());
match read.read_buf(&mut buf).await {
Ok(bytes_read) => {
let is_eof = bytes_read == 0;
if is_eof {
match backpressure_control {
BackpressureControl::DropLatestIncomingIfBufferFull => {
if gap_pending {
let after = send_event(StreamEvent::Gap, &sender).await;
if after.do_break {
break 'outer;
}
gap_pending = false;
}
let after = send_event(StreamEvent::Eof, &sender).await;
if after.do_break {
break 'outer;
}
}
BackpressureControl::BlockUntilBufferHasSpace => {
let after = send_event(StreamEvent::Eof, &sender).await;
if after.do_break {
break 'outer;
}
}
}
} else {
while !buf.is_empty() {
let split_to = usize::min(chunk_size.bytes(), buf.len());
match backpressure_control {
BackpressureControl::DropLatestIncomingIfBufferFull => {
if gap_pending {
match try_send_gap(&sender, &mut lagged) {
TrySendStatus::Sent => {
gap_pending = false;
}
TrySendStatus::Full => {
let dropped_chunks = if chunk_size.bytes() == 0 {
buf.len()
} else {
buf.len().div_ceil(chunk_size.bytes())
};
buf.advance(buf.len());
lagged += dropped_chunks;
continue;
}
TrySendStatus::Closed => break 'outer,
}
}
let chunk = Chunk(buf.split_to(split_to).freeze());
match try_send_chunk(chunk, &sender, &mut lagged) {
TrySendStatus::Sent => {}
TrySendStatus::Full => {
gap_pending = true;
}
TrySendStatus::Closed => break 'outer,
}
}
BackpressureControl::BlockUntilBufferHasSpace => {
let event =
StreamEvent::Chunk(Chunk(buf.split_to(split_to).freeze()));
let after = send_event(event, &sender).await;
if after.do_break {
break 'outer;
}
}
}
}
}
if is_eof {
break;
}
}
Err(err) => panic!("Could not read from stream: {err}"),
}
}
}
impl SingleSubscriberOutputStream {
pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
stream: S,
stream_name: &'static str,
backpressure_control: BackpressureControl,
options: FromStreamOptions,
) -> SingleSubscriberOutputStream {
options.chunk_size.assert_non_zero("options.chunk_size");
let (tx_stdout, rx_stdout) = mpsc::channel::<StreamEvent>(options.channel_capacity);
let stream_reader = tokio::spawn(read_chunked(
stream,
options.chunk_size,
tx_stdout,
backpressure_control,
));
SingleSubscriberOutputStream {
stream_reader,
receiver: AtomicTake::new(rx_stdout),
chunk_size: options.chunk_size,
max_channel_capacity: options.channel_capacity,
backpressure_control,
name: stream_name,
}
}
pub fn backpressure_control(&self) -> BackpressureControl {
self.backpressure_control
}
fn take_receiver(&self) -> mpsc::Receiver<StreamEvent> {
self.receiver.take().unwrap_or_else(|| {
panic!(
"Cannot create multiple consumers on SingleSubscriberOutputStream (stream: '{}'). \
Only one inspector or collector can be active at a time. \
Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.",
self.name
)
})
}
}
macro_rules! handle_subscription {
($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
$loop_label: loop {
tokio::select! {
out = $receiver.recv() => {
match out {
Some(event) => {
let $chunk = event;
$body
}
None => {
break $loop_label;
}
}
}
_msg = &mut $term_rx => break $loop_label,
}
}
};
}
impl SingleSubscriberOutputStream {
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
pub fn inspect_chunks(&self, f: impl Fn(Chunk) -> Next + Send + 'static) -> Inspector {
let mut receiver = self.take_receiver();
impl_inspect_chunks!(self.name(), receiver, f, handle_subscription)
}
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
pub fn inspect_lines(
&self,
mut f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static,
options: LineParsingOptions,
) -> Inspector {
let mut receiver = self.take_receiver();
impl_inspect_lines!(self.name(), receiver, f, options, handle_subscription)
}
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
pub fn inspect_lines_async<Fut>(
&self,
mut f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static,
options: LineParsingOptions,
) -> Inspector
where
Fut: Future<Output = Next> + Send,
{
let mut receiver = self.take_receiver();
impl_inspect_lines_async!(self.name(), receiver, f, options, handle_subscription)
}
}
impl SingleSubscriberOutputStream {
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_chunks<S: Sink>(
&self,
into: S,
collect: impl Fn(Chunk, &mut S) + Send + 'static,
) -> Collector<S> {
let mut receiver = self.take_receiver();
impl_collect_chunks!(self.name(), receiver, collect, into, handle_subscription)
}
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_chunks_async<S, C>(&self, into: S, collect: C) -> Collector<S>
where
S: Sink,
C: AsyncChunkCollector<S>,
{
let mut receiver = self.take_receiver();
impl_collect_chunks_async!(self.name(), receiver, collect, into, handle_subscription)
}
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_lines<S: Sink>(
&self,
into: S,
collect: impl Fn(Cow<'_, str>, &mut S) -> Next + Send + 'static,
options: LineParsingOptions,
) -> Collector<S> {
let mut receiver = self.take_receiver();
impl_collect_lines!(
self.name(),
receiver,
collect,
options,
into,
handle_subscription
)
}
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_lines_async<S, C>(
&self,
into: S,
collect: C,
options: LineParsingOptions,
) -> Collector<S>
where
S: Sink,
C: AsyncLineCollector<S>,
{
let mut receiver = self.take_receiver();
impl_collect_lines_async!(
self.name(),
receiver,
collect,
options,
into,
handle_subscription
)
}
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
self.collect_chunks(Vec::new(), |chunk, vec| {
vec.extend_from_slice(chunk.as_ref());
})
}
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
self.collect_lines(
Vec::new(),
|line, vec| {
vec.push(line.into_owned());
Next::Continue
},
options,
)
}
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
&self,
write: W,
) -> Collector<W> {
let mut receiver = self.take_receiver();
impl_collect_chunks_into_write!(self.name(), receiver, write, handle_subscription)
}
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
&self,
write: W,
options: LineParsingOptions,
mode: LineWriteMode,
) -> Collector<W> {
let mut receiver = self.take_receiver();
impl_collect_lines_into_write!(
self.name(),
receiver,
write,
options,
mode,
handle_subscription
)
}
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_chunks_into_write_mapped<
W: Sink + AsyncWriteExt + Unpin,
B: AsRef<[u8]> + Send,
>(
&self,
write: W,
mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
) -> Collector<W> {
let mut receiver = self.take_receiver();
impl_collect_chunks_into_write_mapped!(
self.name(),
receiver,
write,
mapper,
handle_subscription
)
}
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_lines_into_write_mapped<
W: Sink + AsyncWriteExt + Unpin,
B: AsRef<[u8]> + Send,
>(
&self,
write: W,
mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + Copy + 'static,
options: LineParsingOptions,
mode: LineWriteMode,
) -> Collector<W> {
let mut receiver = self.take_receiver();
impl_collect_lines_into_write_mapped!(
self.name(),
receiver,
write,
mapper,
options,
mode,
handle_subscription
)
}
}
impl SingleSubscriberOutputStream {
async fn wait_for_line_inner(
&self,
predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
options: LineParsingOptions,
) -> WaitForLineResult {
let mut receiver = self.take_receiver();
let mut parser = crate::output_stream::LineParserState::new();
loop {
match receiver.recv().await {
Some(StreamEvent::Chunk(chunk)) => {
if visit_lines(chunk.as_ref(), &mut parser, options, |line| {
if predicate(line) {
Next::Break
} else {
Next::Continue
}
}) == Next::Break
{
return WaitForLineResult::Matched;
}
}
Some(StreamEvent::Gap) => {
parser.on_gap();
}
Some(StreamEvent::Eof) | None => {
if visit_final_line(&parser, |line| {
if predicate(line) {
Next::Break
} else {
Next::Continue
}
}) == Next::Break
{
return WaitForLineResult::Matched;
}
return WaitForLineResult::StreamClosed;
}
}
}
}
pub async fn wait_for_line(
&self,
predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
options: LineParsingOptions,
) -> WaitForLineResult {
self.wait_for_line_inner(predicate, options).await
}
pub async fn wait_for_line_with_timeout(
&self,
predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
options: LineParsingOptions,
timeout: Duration,
) -> WaitForLineResult {
tokio::time::timeout(timeout, self.wait_for_line_inner(predicate, options))
.await
.unwrap_or(WaitForLineResult::Timeout)
}
}
#[cfg(test)]
mod tests {
use crate::output_stream::Chunk;
use crate::output_stream::StreamEvent;
use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
use crate::output_stream::tests::write_test_data;
use crate::output_stream::{BackpressureControl, FromStreamOptions, LineWriteMode, Next};
use crate::single_subscriber::read_chunked;
use crate::{AsyncChunkCollector, AsyncLineCollector};
use crate::{LineParsingOptions, NumBytes, NumBytesExt, WaitForLineResult};
use assertr::prelude::*;
use atomic_take::AtomicTake;
use bytes::Bytes;
use mockall::{automock, predicate};
use std::borrow::Cow;
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing_test::traced_test;
struct BreakOnLine;
impl AsyncLineCollector<Vec<String>> for BreakOnLine {
async fn collect<'a>(&'a mut self, line: Cow<'a, str>, seen: &'a mut Vec<String>) -> Next {
if line == "break" {
seen.push(line.into_owned());
Next::Break
} else {
seen.push(line.into_owned());
Next::Continue
}
}
}
struct WriteLine;
impl AsyncLineCollector<std::fs::File> for WriteLine {
async fn collect<'a>(
&'a mut self,
line: Cow<'a, str>,
temp_file: &'a mut std::fs::File,
) -> Next {
writeln!(temp_file, "{line}").unwrap();
Next::Continue
}
}
struct ExtendChunks;
impl AsyncChunkCollector<Vec<u8>> for ExtendChunks {
async fn collect<'a>(&'a mut self, chunk: Chunk, seen: &'a mut Vec<u8>) -> Next {
seen.extend_from_slice(chunk.as_ref());
Next::Continue
}
}
#[test]
#[should_panic(expected = "options.chunk_size must be greater than zero bytes")]
fn from_stream_panics_on_zero_chunk_size() {
let _stream = SingleSubscriberOutputStream::from_stream(
tokio::io::empty(),
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions {
chunk_size: NumBytes::zero(),
..FromStreamOptions::default()
},
);
}
#[tokio::test]
#[traced_test]
async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
{
let (read_half, mut write_half) = tokio::io::duplex(64);
let (tx, mut rx) = mpsc::channel(64);
write_half.write_all(b"hello world").await.unwrap();
write_half.flush().await.unwrap();
let stream_reader = tokio::spawn(read_chunked(
read_half,
2.bytes(),
tx,
BackpressureControl::DropLatestIncomingIfBufferFull,
));
drop(write_half); stream_reader.await.unwrap();
let mut chunks = Vec::<String>::new();
while let Some(event) = rx.recv().await {
match event {
StreamEvent::Chunk(chunk) => {
chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
}
StreamEvent::Gap => {}
StreamEvent::Eof => break,
}
}
assert_that!(chunks).contains_exactly(["he", "ll", "o ", "wo", "rl", "d"]);
}
#[tokio::test]
async fn read_chunked_sends_pending_gap_before_terminal_eof() {
let read = Cursor::new(b"aabbcc".to_vec());
let (tx, mut rx) = mpsc::channel(1);
let stream_reader = tokio::spawn(read_chunked(
read,
2.bytes(),
tx,
BackpressureControl::DropLatestIncomingIfBufferFull,
));
match rx.recv().await.unwrap() {
StreamEvent::Chunk(chunk) => {
assert_that!(chunk.as_ref()).is_equal_to(b"aa".as_slice());
}
other => panic!("expected first chunk, got {other:?}"),
}
assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Gap);
assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Eof);
stream_reader.await.unwrap();
assert_that!(rx.recv().await).is_none();
}
#[tokio::test]
async fn read_chunked_sends_pending_gap_before_resumed_chunk_delivery() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let (tx, mut rx) = mpsc::channel(2);
let stream_reader = tokio::spawn(read_chunked(
read_half,
2.bytes(),
tx,
BackpressureControl::DropLatestIncomingIfBufferFull,
));
write_half.write_all(b"aabbcc").await.unwrap();
write_half.flush().await.unwrap();
sleep(Duration::from_millis(25)).await;
for expected in [b"aa".as_slice(), b"bb".as_slice()] {
match rx.recv().await.unwrap() {
StreamEvent::Chunk(chunk) => {
assert_that!(chunk.as_ref()).is_equal_to(expected);
}
other => panic!("expected buffered chunk, got {other:?}"),
}
}
write_half.write_all(b"dd").await.unwrap();
write_half.flush().await.unwrap();
drop(write_half);
assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Gap);
match rx.recv().await.unwrap() {
StreamEvent::Chunk(chunk) => {
assert_that!(chunk.as_ref()).is_equal_to(b"dd".as_slice());
}
other => panic!("expected resumed chunk, got {other:?}"),
}
assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Eof);
stream_reader.await.unwrap();
assert_that!(rx.recv().await).is_none();
}
#[tokio::test]
async fn wait_for_line_returns_matched_when_line_appears_before_eof() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions::default(),
);
let waiter = tokio::spawn(async move {
os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
.await
});
write_half.write_all(b"booting\nready\n").await.unwrap();
write_half.flush().await.unwrap();
drop(write_half);
let result = waiter.await.unwrap();
assert_eq!(result, WaitForLineResult::Matched);
}
#[tokio::test]
async fn wait_for_line_returns_stream_closed_when_stream_ends_before_match() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions::default(),
);
let waiter = tokio::spawn(async move {
os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
.await
});
write_half
.write_all(b"booting\nstill starting\n")
.await
.unwrap();
write_half.flush().await.unwrap();
drop(write_half);
let result = waiter.await.unwrap();
assert_eq!(result, WaitForLineResult::StreamClosed);
}
#[tokio::test]
async fn wait_for_line_returns_matched_for_partial_final_line_at_eof() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions::default(),
);
let waiter = tokio::spawn(async move {
os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
.await
});
write_half.write_all(b"booting\nready").await.unwrap();
write_half.flush().await.unwrap();
drop(write_half);
let result = waiter.await.unwrap();
assert_eq!(result, WaitForLineResult::Matched);
}
#[tokio::test]
async fn wait_for_line_with_timeout_returns_timeout_while_stream_stays_open() {
let (read_half, _write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions::default(),
);
let result = os
.wait_for_line_with_timeout(
|line| line.contains("ready"),
LineParsingOptions::default(),
Duration::from_millis(25),
)
.await;
assert_eq!(result, WaitForLineResult::Timeout);
}
#[tokio::test]
async fn wait_for_line_returns_stream_closed_when_stream_ends_after_writes_without_match() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions::default(),
);
write_half.write_all(b"booting\n").await.unwrap();
write_half.flush().await.unwrap();
drop(write_half);
let result = os
.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
.await;
assert_eq!(result, WaitForLineResult::StreamClosed);
}
#[tokio::test]
async fn wait_for_line_does_not_match_across_explicit_gap_event() {
let (tx, rx) = mpsc::channel::<StreamEvent>(4);
let os = SingleSubscriberOutputStream {
stream_reader: tokio::spawn(async {}),
receiver: AtomicTake::new(rx),
chunk_size: 4.bytes(),
max_channel_capacity: 4,
backpressure_control: BackpressureControl::DropLatestIncomingIfBufferFull,
name: "custom",
};
tx.send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"rea"))))
.await
.unwrap();
tx.send(StreamEvent::Gap).await.unwrap();
tx.send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"dy\n"))))
.await
.unwrap();
tx.send(StreamEvent::Eof).await.unwrap();
drop(tx);
let result = os
.wait_for_line(|line| line == "ready", LineParsingOptions::default())
.await;
assert_eq!(result, WaitForLineResult::StreamClosed);
}
#[tokio::test]
#[traced_test]
async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions {
channel_capacity: 2,
..Default::default()
},
);
let inspector = os.inspect_lines_async(
|_line| async move {
sleep(Duration::from_millis(100)).await;
Next::Continue
},
LineParsingOptions::default(),
);
#[rustfmt::skip]
let producer = tokio::spawn(async move {
for count in 1..=15 {
write_half
.write_all(format!("{count}\n").as_bytes())
.await
.unwrap();
sleep(Duration::from_millis(25)).await;
}
});
producer.await.unwrap();
inspector.wait().await.unwrap();
drop(os);
logs_assert(|lines: &[&str]| {
let lagged_logs = lines
.iter()
.filter(|line| line.contains("Stream reader is lagging behind lagged="))
.count();
if lagged_logs == 0 {
return Err("Expected at least one lagged log".to_string());
}
Ok(())
});
}
#[tokio::test]
async fn inspect_lines() {
#[automock]
trait LineVisitor {
fn visit(&self, line: String);
}
#[rustfmt::skip]
fn configure(mock: &mut MockLineVisitor) {
mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
}
let (read_half, write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions::default(),
);
let mut mock = MockLineVisitor::new();
configure(&mut mock);
let inspector = os.inspect_lines(
move |line| {
mock.visit(line.into_owned());
Next::Continue
},
LineParsingOptions::default(),
);
tokio::spawn(write_test_data(write_half)).await.unwrap();
inspector.cancel().await.unwrap();
drop(os);
}
#[tokio::test]
#[traced_test]
async fn inspect_lines_async() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions {
chunk_size: 32.bytes(),
..Default::default()
},
);
let seen: Vec<String> = Vec::new();
let collector = os.collect_lines_async(seen, BreakOnLine, LineParsingOptions::default());
let _writer = tokio::spawn(async move {
write_half.write_all("start\n".as_bytes()).await.unwrap();
write_half.write_all("break\n".as_bytes()).await.unwrap();
write_half.write_all("end\n".as_bytes()).await.unwrap();
loop {
write_half
.write_all("gibberish\n".as_bytes())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
}
});
let seen = collector.wait().await.unwrap();
assert_that!(seen).contains_exactly(["start", "break"]);
}
#[tokio::test]
async fn collect_chunks_async_into_vec() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions {
chunk_size: 2.bytes(),
..Default::default()
},
);
let collector = os.collect_chunks_async(Vec::new(), ExtendChunks);
write_half.write_all(b"abcdef").await.unwrap();
drop(write_half);
let seen = collector.wait().await.unwrap();
assert_that!(seen).is_equal_to(b"abcdef".to_vec());
}
#[tokio::test]
async fn collect_lines_to_file() {
let (read_half, write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions {
channel_capacity: 32,
..Default::default()
},
);
let temp_file = tempfile::tempfile().unwrap();
let collector = os.collect_lines(
temp_file,
|line, temp_file| {
writeln!(temp_file, "{line}").unwrap();
Next::Continue
},
LineParsingOptions::default(),
);
tokio::spawn(write_test_data(write_half)).await.unwrap();
let mut temp_file = collector.cancel().await.unwrap();
temp_file.seek(SeekFrom::Start(0)).unwrap();
let mut contents = String::new();
temp_file.read_to_string(&mut contents).unwrap();
assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
}
#[tokio::test]
async fn collect_lines_async_to_file() {
let (read_half, write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions {
chunk_size: 32.bytes(),
..Default::default()
},
);
let temp_file = tempfile::tempfile().unwrap();
let collector = os.collect_lines_async(temp_file, WriteLine, LineParsingOptions::default());
tokio::spawn(write_test_data(write_half)).await.unwrap();
let mut temp_file = collector.cancel().await.unwrap();
temp_file.seek(SeekFrom::Start(0)).unwrap();
let mut contents = String::new();
temp_file.read_to_string(&mut contents).unwrap();
assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
}
#[tokio::test]
async fn collect_lines_into_write_respects_requested_line_delimiter_mode() {
let (read_half, write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions::default(),
);
let temp_file = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
let collector = os.collect_lines_into_write(
temp_file,
LineParsingOptions::default(),
LineWriteMode::AsIs,
);
tokio::spawn(write_test_data(write_half)).await.unwrap();
let mut temp_file = collector.cancel().await.unwrap();
temp_file.seek(SeekFrom::Start(0)).await.unwrap();
let mut contents = String::new();
temp_file.read_to_string(&mut contents).await.unwrap();
assert_that!(contents).is_equal_to("Cargo.lockCargo.tomlREADME.mdsrctarget");
}
#[tokio::test]
#[traced_test]
async fn collect_chunks_into_write_mapped() {
let (read_half, write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions {
chunk_size: 32.bytes(),
..Default::default()
},
);
let temp_file = tokio::fs::File::options()
.create(true)
.truncate(true)
.write(true)
.read(true)
.open(std::env::temp_dir().join(
"tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
))
.await
.unwrap();
let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
String::from_utf8_lossy(chunk.as_ref()).to_string()
});
tokio::spawn(write_test_data(write_half)).await.unwrap();
let mut temp_file = collector.cancel().await.unwrap();
temp_file.seek(SeekFrom::Start(0)).await.unwrap();
let mut contents = String::new();
temp_file.read_to_string(&mut contents).await.unwrap();
assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
}
#[tokio::test]
#[traced_test]
async fn multiple_subscribers_are_not_possible() {
let (read_half, _write_half) = tokio::io::duplex(64);
let os = SingleSubscriberOutputStream::from_stream(
read_half,
"custom",
BackpressureControl::DropLatestIncomingIfBufferFull,
FromStreamOptions::default(),
);
let _inspector = os.inspect_lines(|_line| Next::Continue, LineParsingOptions::default());
assert_that_panic_by(move || {
os.inspect_lines(|_line| Next::Continue, LineParsingOptions::default())
})
.has_type::<String>()
.is_equal_to("Cannot create multiple consumers on SingleSubscriberOutputStream (stream: 'custom'). Only one inspector or collector can be active at a time. Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.");
}
}