use crate::collector::{AsyncChunkCollector, AsyncLineCollector, Collector, Sink};
use crate::inspector::Inspector;
use crate::output_stream::impls::{
impl_collect_chunks, impl_collect_chunks_async, impl_collect_chunks_into_write,
impl_collect_chunks_into_write_mapped, impl_collect_lines, impl_collect_lines_async,
impl_collect_lines_into_write, impl_collect_lines_into_write_mapped, impl_inspect_chunks,
impl_inspect_lines, impl_inspect_lines_async, visit_final_line, visit_lines,
};
use crate::output_stream::{Chunk, FromStreamOptions, LineWriteMode, Next, StreamEvent};
use crate::output_stream::{LineParserState, LineParsingOptions, OutputStream};
use crate::{NumBytes, WaitForLineResult};
use std::borrow::Cow;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::task::JoinHandle;
/// Output stream that publishes the data of a single source to any number of
/// consumers through a `tokio::sync::broadcast` channel.
///
/// A background task running `read_chunked` reads the source and sends
/// `StreamEvent`s; each inspector, collector, or waiter created from this
/// stream obtains its own subscription to that channel.
pub struct BroadcastOutputStream {
/// Handle of the background reader task; aborted when the stream is dropped.
stream_reader: JoinHandle<()>,
/// Sender side of the broadcast channel, kept so new receivers can be subscribed.
sender: broadcast::Sender<StreamEvent>,
/// Shared flag recording whether the reader task has reached end-of-stream.
closure_state: Arc<Mutex<ClosureState>>,
/// Chunk size the stream was configured with; returned by `chunk_size()`.
chunk_size: NumBytes,
/// Capacity the broadcast channel was created with; returned by `channel_capacity()`.
max_channel_capacity: usize,
/// Name given to the stream at construction; returned by `name()`.
name: &'static str,
}
/// Closure status shared between the reader task and `BroadcastOutputStream::subscribe`.
struct ClosureState {
/// Set to `true` by the reader task immediately before it broadcasts `StreamEvent::Eof`.
closed: bool,
}
/// A single consumer's handle on the broadcast channel.
struct Subscription {
/// Receiver created from the stream's broadcast sender.
receiver: broadcast::Receiver<StreamEvent>,
/// When `true`, the next `recv` call returns a synthetic `StreamEvent::Eof`
/// instead of reading from `receiver`. Set for subscriptions created after the
/// stream was already marked closed.
emit_terminal_eof: bool,
}
impl Subscription {
    /// Receives the next event for this subscription.
    ///
    /// If the subscription was created after the stream had already closed, the
    /// first call yields a synthetic `StreamEvent::Eof` (once); every other call
    /// forwards to the underlying broadcast receiver.
    async fn recv(&mut self) -> Result<StreamEvent, RecvError> {
        // `take` reads the flag and resets it to `false` in one step, so the
        // synthetic EOF is delivered at most once.
        let pending_terminal_eof = std::mem::take(&mut self.emit_terminal_eof);
        if pending_terminal_eof {
            Ok(StreamEvent::Eof)
        } else {
            self.receiver.recv().await
        }
    }
}
// Exposes the configuration values stored on the stream at construction time.
impl OutputStream for BroadcastOutputStream {
// Returns the chunk size the stream was created with.
fn chunk_size(&self) -> NumBytes {
self.chunk_size
}
// Returns the capacity the broadcast channel was created with.
fn channel_capacity(&self) -> usize {
self.max_channel_capacity
}
// Returns the name given to the stream at construction.
fn name(&self) -> &'static str {
self.name
}
}
// Stops the background reader task when the stream goes away.
impl Drop for BroadcastOutputStream {
fn drop(&mut self) {
// Abort rather than await: `drop` is synchronous and cannot wait for the task.
self.stream_reader.abort();
}
}
impl Debug for BroadcastOutputStream {
    /// Formats the stream for diagnostics.
    ///
    /// The reader task handle and the broadcast sender are rendered as
    /// placeholders. The entry for the task handle is labelled with the actual
    /// field name (`stream_reader`), and the stream name is included so several
    /// streams can be told apart in logs. The remaining fields are omitted, which
    /// `finish_non_exhaustive` makes visible in the output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BroadcastOutputStream")
            .field("name", &self.name)
            .field("stream_reader", &"non-debug < JoinHandle<()> >")
            .field(
                "sender",
                &"non-debug < tokio::sync::broadcast::Sender<StreamEvent> >",
            )
            .finish_non_exhaustive()
    }
}
async fn read_chunked<B: AsyncRead + Unpin + Send + 'static>(
mut read: B,
chunk_size: NumBytes,
sender: broadcast::Sender<StreamEvent>,
closure_state: Arc<Mutex<ClosureState>>,
) {
let send_chunk = move |event: StreamEvent| {
match sender.send(event) {
Ok(_received_by) => {}
Err(err) => {
tracing::debug!(
error = %err,
"No active receivers for the output chunk, dropping it"
);
}
}
};
let mut buf = bytes::BytesMut::with_capacity(chunk_size.bytes());
loop {
let _ = buf.try_reclaim(chunk_size.bytes());
match read.read_buf(&mut buf).await {
Ok(bytes_read) => {
let is_eof = bytes_read == 0;
if is_eof {
let mut state = closure_state.lock().expect("closure_state poisoned");
state.closed = true;
send_chunk(StreamEvent::Eof);
} else {
while !buf.is_empty() {
let split_to = usize::min(chunk_size.bytes(), buf.len());
send_chunk(StreamEvent::Chunk(Chunk(buf.split_to(split_to).freeze())));
}
}
if is_eof {
break;
}
}
Err(err) => panic!("Could not read from stream: {err}"),
}
}
}
impl BroadcastOutputStream {
    /// Creates a broadcast output stream that reads from `stream`.
    ///
    /// Spawns a tokio task running `read_chunked`, which publishes the data of
    /// `stream` on a broadcast channel with `options.channel_capacity` slots.
    ///
    /// # Panics
    ///
    /// Panics via `assert_non_zero` when `options.chunk_size` is zero.
    pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
        stream: S,
        stream_name: &'static str,
        options: FromStreamOptions,
    ) -> BroadcastOutputStream {
        let chunk_size = options.chunk_size;
        let channel_capacity = options.channel_capacity;
        chunk_size.assert_non_zero("options.chunk_size");

        // Only the sender is kept; receivers are created per subscription.
        let (sender, initial_receiver) = broadcast::channel::<StreamEvent>(channel_capacity);
        drop(initial_receiver);

        let closure_state = Arc::new(Mutex::new(ClosureState { closed: false }));
        let reader_future = read_chunked(
            stream,
            chunk_size,
            sender.clone(),
            Arc::clone(&closure_state),
        );
        let stream_reader = tokio::spawn(reader_future);

        BroadcastOutputStream {
            stream_reader,
            sender,
            closure_state,
            chunk_size,
            max_channel_capacity: channel_capacity,
            name: stream_name,
        }
    }

    /// Creates a new subscription to the broadcast channel.
    ///
    /// The receiver is created while the closure lock is held. If the stream is
    /// already marked closed at that point, the subscription is flagged to emit a
    /// synthetic `StreamEvent::Eof` on its first `recv`.
    fn subscribe(&self) -> Subscription {
        let state = self.closure_state.lock().expect("closure_state poisoned");
        let receiver = self.sender.subscribe();
        let emit_terminal_eof = state.closed;
        drop(state);

        Subscription {
            receiver,
            emit_terminal_eof,
        }
    }
}
// Expands to a labelled loop that drives a subscription until the channel is
// closed or a termination signal arrives.
//
// Arguments:
// - `$loop_label`: label given to the generated loop; the macro's own exits `break` to it.
// - `$receiver`: expression with an async `recv()` returning `Result<StreamEvent, RecvError>`.
// - `$term_rx`: future polled through `&mut`; the loop ends as soon as it completes.
// - `|$chunk| $body`: code run for every event, with the event bound to `$chunk`.
//
// `RecvError::Closed` ends the loop. `RecvError::Lagged` logs a warning and runs
// `$body` with a `StreamEvent::Gap` in place of the missed events.
macro_rules! handle_subscription {
($loop_label:tt, $receiver:expr, $term_rx:expr, |$chunk:ident| $body:block) => {
$loop_label: loop {
tokio::select! {
out = $receiver.recv() => {
match out {
Ok(event) => {
let $chunk = event;
$body
}
Err(RecvError::Closed) => {
break $loop_label;
},
Err(RecvError::Lagged(lagged)) => {
// The receiver fell behind and events were overwritten; report the hole to the body.
tracing::warn!(lagged, "Inspector is lagging behind");
let $chunk = StreamEvent::Gap;
$body
}
}
}
_msg = &mut $term_rx => break $loop_label,
}
}
};
}
// Inspectors: each method creates its own subscription and hands it, together
// with the callback, to the matching `impl_inspect_*` macro from
// `crate::output_stream::impls`.
impl BroadcastOutputStream {
/// Inspects the stream chunk by chunk, calling `f` with each `Chunk`.
///
/// The returned `Inspector` must be kept alive; see the `must_use` message.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
pub fn inspect_chunks(&self, mut f: impl FnMut(Chunk) -> Next + Send + 'static) -> Inspector {
let mut receiver = self.subscribe();
impl_inspect_chunks!(self.name(), receiver, f, handle_subscription)
}
/// Inspects the stream line by line, calling `f` with each line parsed
/// according to `options`.
///
/// The returned `Inspector` must be kept alive; see the `must_use` message.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
pub fn inspect_lines(
&self,
mut f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static,
options: LineParsingOptions,
) -> Inspector {
let mut receiver = self.subscribe();
impl_inspect_lines!(self.name(), receiver, f, options, handle_subscription)
}
/// Like `inspect_lines`, but `f` returns a future resolving to `Next`.
///
/// The returned `Inspector` must be kept alive; see the `must_use` message.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the inspector effectively dies immediately. You can safely do a `let _inspector = ...` binding to ignore the typical 'unused' warning."]
pub fn inspect_lines_async<Fut>(
&self,
mut f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static,
options: LineParsingOptions,
) -> Inspector
where
Fut: Future<Output = Next> + Send,
{
let mut receiver = self.subscribe();
impl_inspect_lines_async!(self.name(), receiver, f, options, handle_subscription)
}
}
// Collectors: each method creates its own subscription and hands it, together
// with the sink and callback, to the matching `impl_collect_*` macro from
// `crate::output_stream::impls`.
impl BroadcastOutputStream {
/// Collects chunks into the sink `into`, calling `collect` with each `Chunk`
/// and a mutable reference to the sink.
///
/// The returned `Collector` must be kept alive; see the `must_use` message.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_chunks<S: Sink>(
&self,
into: S,
mut collect: impl FnMut(Chunk, &mut S) + Send + 'static,
) -> Collector<S> {
let mut receiver = self.subscribe();
impl_collect_chunks!(self.name(), receiver, collect, into, handle_subscription)
}
/// Collects chunks into the sink `into` using an `AsyncChunkCollector`.
///
/// The returned `Collector` must be kept alive; see the `must_use` message.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_chunks_async<S, C>(&self, into: S, collect: C) -> Collector<S>
where
S: Sink,
C: AsyncChunkCollector<S>,
{
let mut receiver = self.subscribe();
impl_collect_chunks_async!(self.name(), receiver, collect, into, handle_subscription)
}
/// Collects lines (parsed according to `options`) into the sink `into`,
/// calling `collect` with each line and a mutable reference to the sink.
///
/// The returned `Collector` must be kept alive; see the `must_use` message.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_lines<S: Sink>(
&self,
into: S,
mut collect: impl FnMut(Cow<'_, str>, &mut S) -> Next + Send + 'static,
options: LineParsingOptions,
) -> Collector<S> {
let mut receiver = self.subscribe();
impl_collect_lines!(
self.name(),
receiver,
collect,
options,
into,
handle_subscription
)
}
/// Collects lines (parsed according to `options`) into the sink `into` using
/// an `AsyncLineCollector`.
///
/// The returned `Collector` must be kept alive; see the `must_use` message.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_lines_async<S, C>(
&self,
into: S,
collect: C,
options: LineParsingOptions,
) -> Collector<S>
where
S: Sink,
C: AsyncLineCollector<S>,
{
let mut receiver = self.subscribe();
impl_collect_lines_async!(
self.name(),
receiver,
collect,
options,
into,
handle_subscription
)
}
/// Collects the bytes of every chunk into a single `Vec<u8>`.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_chunks_into_vec(&self) -> Collector<Vec<u8>> {
self.collect_chunks(Vec::new(), |chunk, vec| {
vec.extend_from_slice(chunk.as_ref());
})
}
/// Collects every line (parsed according to `options`) as an owned `String`
/// into a `Vec<String>`; the callback never stops collection early.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_lines_into_vec(&self, options: LineParsingOptions) -> Collector<Vec<String>> {
self.collect_lines(
Vec::new(),
|line, vec| {
vec.push(line.into_owned());
Next::Continue
},
options,
)
}
/// Collects chunks into the async writer `write`.
///
/// The returned `Collector` must be kept alive; see the `must_use` message.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_chunks_into_write<W: Sink + AsyncWriteExt + Unpin>(
&self,
write: W,
) -> Collector<W> {
let mut receiver = self.subscribe();
impl_collect_chunks_into_write!(self.name(), receiver, write, handle_subscription)
}
/// Collects lines (parsed according to `options`) into the async writer
/// `write`; `mode` selects how lines are written (see `LineWriteMode`).
///
/// The returned `Collector` must be kept alive; see the `must_use` message.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_lines_into_write<W: Sink + AsyncWriteExt + Unpin>(
&self,
write: W,
options: LineParsingOptions,
mode: LineWriteMode,
) -> Collector<W> {
let mut receiver = self.subscribe();
impl_collect_lines_into_write!(
self.name(),
receiver,
write,
options,
mode,
handle_subscription
)
}
/// Like `collect_chunks_into_write`, but each chunk is first passed through
/// `mapper`, whose result supplies the bytes handed to the writer.
///
/// The returned `Collector` must be kept alive; see the `must_use` message.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_chunks_into_write_mapped<
W: Sink + AsyncWriteExt + Unpin,
B: AsRef<[u8]> + Send,
>(
&self,
write: W,
mapper: impl Fn(Chunk) -> B + Send + Sync + Copy + 'static,
) -> Collector<W> {
let mut receiver = self.subscribe();
impl_collect_chunks_into_write_mapped!(
self.name(),
receiver,
write,
mapper,
handle_subscription
)
}
/// Like `collect_lines_into_write`, but each line is first passed through
/// `mapper`, whose result supplies the bytes handed to the writer.
///
/// The returned `Collector` must be kept alive; see the `must_use` message.
#[must_use = "If not at least assigned to a variable, the return value will be dropped immediately, which in turn drops the internal tokio task, meaning that your callback is never called and the collector effectively dies immediately. You can safely do a `let _collector = ...` binding to ignore the typical 'unused' warning."]
pub fn collect_lines_into_write_mapped<
W: Sink + AsyncWriteExt + Unpin,
B: AsRef<[u8]> + Send,
>(
&self,
write: W,
mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + Copy + 'static,
options: LineParsingOptions,
mode: LineWriteMode,
) -> Collector<W> {
let mut receiver = self.subscribe();
impl_collect_lines_into_write_mapped!(
self.name(),
receiver,
write,
mapper,
options,
mode,
handle_subscription
)
}
}
impl BroadcastOutputStream {
async fn wait_for_line_inner(
&self,
predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
options: LineParsingOptions,
) -> WaitForLineResult {
let mut receiver = self.subscribe();
let mut parser = LineParserState::new();
loop {
match receiver.recv().await {
Ok(StreamEvent::Chunk(chunk)) => {
if visit_lines(chunk.as_ref(), &mut parser, options, |line| {
if predicate(line) {
Next::Break
} else {
Next::Continue
}
}) == Next::Break
{
return WaitForLineResult::Matched;
}
}
Ok(StreamEvent::Gap) => {
parser.on_gap();
}
Ok(StreamEvent::Eof) | Err(RecvError::Closed) => {
if visit_final_line(&parser, |line| {
if predicate(line) {
Next::Break
} else {
Next::Continue
}
}) == Next::Break
{
return WaitForLineResult::Matched;
}
return WaitForLineResult::StreamClosed;
}
Err(RecvError::Lagged(lagged)) => {
tracing::warn!(lagged, "Waiter is lagging behind");
parser.on_gap();
}
}
}
}
pub async fn wait_for_line(
&self,
predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
options: LineParsingOptions,
) -> WaitForLineResult {
self.wait_for_line_inner(predicate, options).await
}
pub async fn wait_for_line_with_timeout(
&self,
predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
options: LineParsingOptions,
timeout: Duration,
) -> WaitForLineResult {
tokio::time::timeout(timeout, self.wait_for_line_inner(predicate, options))
.await
.unwrap_or(WaitForLineResult::Timeout)
}
}
/// Line-related configuration.
///
/// NOTE(review): nothing in this file reads this type; the exact semantics of
/// `max_line_length` (bytes vs. characters, truncation vs. rejection) are
/// defined by its consumers — confirm there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LineConfig {
    /// Maximum length of a single line.
    pub max_line_length: usize,
}
#[cfg(test)]
mod tests {
use super::{ClosureState, read_chunked};
use crate::WaitForLineResult;
use crate::output_stream::broadcast::BroadcastOutputStream;
use crate::output_stream::tests::write_test_data;
use crate::output_stream::{Chunk, StreamEvent};
use crate::output_stream::{FromStreamOptions, LineParsingOptions, LineWriteMode, Next};
use crate::{AsyncChunkCollector, AsyncLineCollector};
use crate::{NumBytes, NumBytesExt};
use assertr::prelude::*;
use bytes::Bytes;
use mockall::*;
use std::borrow::Cow;
use std::future::poll_fn;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::time::sleep;
use tracing_test::traced_test;
struct BreakOnLine;
impl AsyncLineCollector<Vec<String>> for BreakOnLine {
async fn collect<'a>(&'a mut self, line: Cow<'a, str>, seen: &'a mut Vec<String>) -> Next {
if line == "break" {
seen.push(line.into_owned());
Next::Break
} else {
seen.push(line.into_owned());
Next::Continue
}
}
}
// Test collector that writes every line, followed by a newline, into a std file.
struct WriteLine;
impl AsyncLineCollector<std::fs::File> for WriteLine {
async fn collect<'a>(
&'a mut self,
line: Cow<'a, str>,
temp_file: &'a mut std::fs::File,
) -> Next {
writeln!(temp_file, "{line}").unwrap();
Next::Continue
}
}
// Test collector that appends the bytes of every chunk to a `Vec<u8>`.
struct ExtendChunks;
impl AsyncChunkCollector<Vec<u8>> for ExtendChunks {
async fn collect<'a>(&'a mut self, chunk: Chunk, seen: &'a mut Vec<u8>) -> Next {
seen.extend_from_slice(chunk.as_ref());
Next::Continue
}
}
#[test]
#[should_panic(expected = "options.chunk_size must be greater than zero bytes")]
fn from_stream_panics_on_zero_chunk_size() {
let _stream = BroadcastOutputStream::from_stream(
tokio::io::empty(),
"custom",
FromStreamOptions {
chunk_size: NumBytes::zero(),
..FromStreamOptions::default()
},
);
}
#[tokio::test]
#[traced_test]
async fn read_chunked_does_not_terminate_when_first_read_can_fill_the_entire_bytes_mut_buffer()
{
let (read_half, mut write_half) = tokio::io::duplex(64);
let (tx, mut rx) = broadcast::channel(10);
write_half.write_all(b"hello world").await.unwrap();
write_half.flush().await.unwrap();
let stream_reader = tokio::spawn(read_chunked(
read_half,
2.bytes(),
tx,
Arc::new(Mutex::new(ClosureState { closed: false })),
));
drop(write_half); stream_reader.await.unwrap();
let mut chunks = Vec::<String>::new();
while let Ok(event) = rx.recv().await {
match event {
StreamEvent::Chunk(chunk) => {
chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
}
StreamEvent::Gap => {}
StreamEvent::Eof => break,
}
}
assert_that!(chunks).contains_exactly(["he", "ll", "o ", "wo", "rl", "d"]);
}
#[tokio::test]
#[traced_test]
async fn read_chunked_no_data() {
let (read_half, write_half) = tokio::io::duplex(64);
let (tx, mut rx) = broadcast::channel(10);
let stream_reader = tokio::spawn(read_chunked(
read_half,
2.bytes(),
tx,
Arc::new(Mutex::new(ClosureState { closed: false })),
));
drop(write_half); stream_reader.await.unwrap();
let mut chunks = Vec::<String>::new();
while let Ok(event) = rx.recv().await {
match event {
StreamEvent::Chunk(chunk) => {
chunks.push(String::from_utf8_lossy(chunk.as_ref()).to_string());
}
StreamEvent::Gap => {}
StreamEvent::Eof => break,
}
}
assert_that!(chunks).is_empty();
}
// A waiter that is already subscribed sees a matching line written before EOF.
#[tokio::test]
async fn wait_for_line_returns_matched_when_line_appears_before_eof() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os =
BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
let waiter = os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default());
let mut waiter = pin!(waiter);
// Poll the waiter exactly once so it subscribes before any data is written.
poll_fn(|cx| {
let _ = waiter.as_mut().poll(cx);
Poll::Ready(())
})
.await;
write_half.write_all(b"booting\nready\n").await.unwrap();
write_half.flush().await.unwrap();
drop(write_half);
let result = waiter.await;
assert_that!(result).is_equal_to(WaitForLineResult::Matched);
}
// If the stream ends without a matching line, the waiter reports `StreamClosed`.
#[tokio::test]
async fn wait_for_line_returns_stream_closed_when_stream_ends_before_match() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os =
BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
let waiter = os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default());
let mut waiter = pin!(waiter);
// Poll the waiter exactly once so it subscribes before any data is written.
poll_fn(|cx| {
let _ = waiter.as_mut().poll(cx);
Poll::Ready(())
})
.await;
write_half
.write_all(b"booting\nstill starting\n")
.await
.unwrap();
write_half.flush().await.unwrap();
drop(write_half);
let result = waiter.await;
assert_that!(result).is_equal_to(WaitForLineResult::StreamClosed);
}
// The last line has no trailing newline; it must still be matched at EOF.
#[tokio::test]
async fn wait_for_line_returns_matched_for_partial_final_line_at_eof() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os =
BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
let waiter = os.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default());
let mut waiter = pin!(waiter);
// Poll the waiter exactly once so it subscribes before any data is written.
poll_fn(|cx| {
let _ = waiter.as_mut().poll(cx);
Poll::Ready(())
})
.await;
write_half.write_all(b"booting\nready").await.unwrap();
write_half.flush().await.unwrap();
drop(write_half);
let result = waiter.await;
assert_that!(result).is_equal_to(WaitForLineResult::Matched);
}
#[tokio::test]
async fn wait_for_line_with_timeout_returns_timeout_while_stream_stays_open() {
let (read_half, _write_half) = tokio::io::duplex(64);
let os =
BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
let result = os
.wait_for_line_with_timeout(
|line| line.contains("ready"),
LineParsingOptions::default(),
Duration::from_millis(25),
)
.await;
assert_that!(result).is_equal_to(WaitForLineResult::Timeout);
}
#[tokio::test]
async fn wait_for_line_returns_stream_closed_for_late_subscriber_after_eof() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os =
BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
write_half.write_all(b"booting\n").await.unwrap();
write_half.flush().await.unwrap();
drop(write_half);
while !os
.closure_state
.lock()
.expect("closure_state poisoned")
.closed
{
tokio::task::yield_now().await;
}
let result = os
.wait_for_line(|line| line.contains("ready"), LineParsingOptions::default())
.await;
assert_that!(result).is_equal_to(WaitForLineResult::StreamClosed);
}
// A subscriber created after closure gets a synthetic EOF of its own, while a
// subscriber that existed before keeps reading the real channel content.
#[tokio::test]
async fn late_subscriber_after_eof_does_not_disturb_existing_subscribers() {
let (sender, receiver) = broadcast::channel::<StreamEvent>(2);
drop(receiver);
let closure_state = Arc::new(Mutex::new(ClosureState { closed: false }));
// Built by hand with a no-op reader task so the test controls every event.
let os = BroadcastOutputStream {
stream_reader: tokio::spawn(async {}),
sender: sender.clone(),
closure_state: Arc::clone(&closure_state),
chunk_size: 4.bytes(),
max_channel_capacity: 2,
name: "custom",
};
let mut existing = os.subscribe();
// Three events into a capacity-2 channel: the oldest one is overwritten.
sender
.send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"one\n"))))
.unwrap();
sender
.send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"two\n"))))
.unwrap();
sender.send(StreamEvent::Eof).unwrap();
closure_state.lock().expect("closure_state poisoned").closed = true;
let mut late = os.subscribe();
assert_that!(late.recv().await)
.is_ok()
.is_equal_to(StreamEvent::Eof);
// The existing subscriber first learns that it missed one event...
assert_that!(existing.recv().await)
.is_err()
.is_equal_to(RecvError::Lagged(1));
// ...and then receives the retained events in order.
let chunk = existing.recv().await.unwrap();
assert_that!(chunk).is_equal_to(StreamEvent::Chunk(Chunk(Bytes::from_static(b"two\n"))));
assert_that!(existing.recv().await)
.is_ok()
.is_equal_to(StreamEvent::Eof);
}
// A subscriber created before closure receives the remaining data first and
// the EOF event last.
#[tokio::test]
async fn subscriber_created_before_closure_receives_tail_data_before_terminal_eof() {
let (sender, receiver) = broadcast::channel::<StreamEvent>(4);
drop(receiver);
let closure_state = Arc::new(Mutex::new(ClosureState { closed: false }));
// Built by hand with a no-op reader task so the test controls every event.
let os = BroadcastOutputStream {
stream_reader: tokio::spawn(async {}),
sender: sender.clone(),
closure_state: Arc::clone(&closure_state),
chunk_size: 4.bytes(),
max_channel_capacity: 4,
name: "custom",
};
let mut subscriber = os.subscribe();
sender
.send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"tail\n"))))
.unwrap();
{
// Same sequence as the reader task: set `closed` and send EOF under the lock.
let mut state = closure_state.lock().expect("closure_state poisoned");
state.closed = true;
sender.send(StreamEvent::Eof).unwrap();
}
let chunk = subscriber.recv().await.unwrap();
assert_that!(chunk).is_equal_to(StreamEvent::Chunk(Chunk(Bytes::from_static(b"tail\n"))));
assert_that!(subscriber.recv().await)
.is_ok()
.is_equal_to(StreamEvent::Eof);
}
// Fragments on either side of lost data must not be stitched together into a
// line that was never actually observed ("rea" + "dy" == "ready").
#[tokio::test]
async fn wait_for_line_does_not_match_across_lag_gap() {
let (sender, receiver) = broadcast::channel::<StreamEvent>(2);
drop(receiver);
let closure_state = Arc::new(Mutex::new(ClosureState { closed: false }));
// Built by hand with a no-op reader task so the test controls every event.
let os = BroadcastOutputStream {
stream_reader: tokio::spawn(async {}),
sender: sender.clone(),
closure_state: Arc::clone(&closure_state),
chunk_size: 4.bytes(),
max_channel_capacity: 2,
name: "custom",
};
let waiter = os.wait_for_line(|line| line == "ready", LineParsingOptions::default());
let mut waiter = pin!(waiter);
// Poll the waiter exactly once so it subscribes before any event is sent.
poll_fn(|cx| {
let _ = waiter.as_mut().poll(cx);
Poll::Ready(())
})
.await;
// Four events are sent into a capacity-2 channel before the waiter is polled
// again, so the waiter lags and misses the oldest events.
sender
.send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"rea"))))
.unwrap();
sender
.send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"lost"))))
.unwrap();
sender
.send(StreamEvent::Chunk(Chunk(Bytes::from_static(b"dy\n"))))
.unwrap();
{
let mut state = closure_state.lock().expect("closure_state poisoned");
state.closed = true;
}
sender.send(StreamEvent::Eof).unwrap();
assert_that!(waiter.await).is_equal_to(WaitForLineResult::StreamClosed);
}
#[tokio::test]
async fn collect_lines_into_write_appends_requested_line_delimiter() {
let (read_half, write_half) = tokio::io::duplex(64);
let os =
BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
let temp_file = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
let collector = os.collect_lines_into_write(
temp_file,
LineParsingOptions::default(),
LineWriteMode::AppendLf,
);
tokio::spawn(write_test_data(write_half)).await.unwrap();
let mut temp_file = collector.cancel().await.unwrap();
temp_file.seek(SeekFrom::Start(0)).await.unwrap();
let mut contents = String::new();
temp_file.read_to_string(&mut contents).await.unwrap();
assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
}
// A consumer that is slower than the producer on a tiny channel must lag, and
// that lag must be reported through the "lagging behind" warning log.
#[tokio::test]
#[traced_test]
async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os = BroadcastOutputStream::from_stream(
read_half,
"custom",
FromStreamOptions {
channel_capacity: 2,
..Default::default()
},
);
// The consumer needs 100ms per line...
let consumer = os.inspect_lines_async(
|_line| async move {
sleep(Duration::from_millis(100)).await;
Next::Continue
},
LineParsingOptions::default(),
);
// ...while the producer emits a line every 25ms.
#[rustfmt::skip]
let producer = tokio::spawn(async move {
for count in 1..=15 {
write_half
.write_all(format!("{count}\n").as_bytes())
.await
.unwrap();
sleep(Duration::from_millis(25)).await;
}
write_half.flush().await.unwrap();
drop(write_half);
});
producer.await.unwrap();
consumer.wait().await.unwrap();
drop(os);
logs_assert(|lines: &[&str]| {
let lagged_logs = lines
.iter()
.filter(|line| line.contains("Inspector is lagging behind lagged="))
.count();
if lagged_logs == 0 {
return Err("Expected at least one lagged log".to_string());
}
Ok(())
});
}
// Every line of the test data must be handed to the inspector callback exactly once.
#[tokio::test]
async fn inspect_lines() {
#[automock]
trait LineVisitor {
fn visit(&self, line: String);
}
// One expectation per line of the test data, each required exactly once.
#[rustfmt::skip]
fn configure(mock: &mut MockLineVisitor) {
mock.expect_visit().with(predicate::eq("Cargo.lock".to_string())).times(1).return_const(());
mock.expect_visit().with(predicate::eq("Cargo.toml".to_string())).times(1).return_const(());
mock.expect_visit().with(predicate::eq("README.md".to_string())).times(1).return_const(());
mock.expect_visit().with(predicate::eq("src".to_string())).times(1).return_const(());
mock.expect_visit().with(predicate::eq("target".to_string())).times(1).return_const(());
}
let (read_half, write_half) = tokio::io::duplex(64);
let os =
BroadcastOutputStream::from_stream(read_half, "custom", FromStreamOptions::default());
let mut mock = MockLineVisitor::new();
configure(&mut mock);
let inspector = os.inspect_lines(
move |line| {
mock.visit(line.into_owned());
Next::Continue
},
LineParsingOptions::default(),
);
tokio::spawn(write_test_data(write_half)).await.unwrap();
inspector.cancel().await.unwrap();
drop(os);
}
// NOTE(review): despite its name, this test exercises `collect_lines_async`
// with the `BreakOnLine` collector, not `inspect_lines_async`.
// It verifies that collection stops at the line on which the collector
// returns `Next::Break`, even though the writer keeps producing data.
#[tokio::test]
#[traced_test]
async fn inspect_lines_async() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os = BroadcastOutputStream::from_stream(
read_half,
"custom",
FromStreamOptions {
chunk_size: 32.bytes(),
..Default::default()
},
);
let seen: Vec<String> = Vec::new();
let collector = os.collect_lines_async(seen, BreakOnLine, LineParsingOptions::default());
// The writer never finishes on its own; the collector has to stop by itself.
let _writer = tokio::spawn(async move {
write_half.write_all("start\n".as_bytes()).await.unwrap();
write_half.write_all("break\n".as_bytes()).await.unwrap();
write_half.write_all("end\n".as_bytes()).await.unwrap();
loop {
write_half
.write_all("gibberish\n".as_bytes())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
}
});
let seen = collector.wait().await.unwrap();
assert_that!(seen).contains_exactly(["start", "break"]);
}
#[tokio::test]
async fn collect_chunks_async_into_vec() {
let (read_half, mut write_half) = tokio::io::duplex(64);
let os = BroadcastOutputStream::from_stream(
read_half,
"custom",
FromStreamOptions {
chunk_size: 2.bytes(),
..Default::default()
},
);
let collector = os.collect_chunks_async(Vec::new(), ExtendChunks);
write_half.write_all(b"abcdef").await.unwrap();
drop(write_half);
let seen = collector.wait().await.unwrap();
assert_that!(seen).is_equal_to(b"abcdef".to_vec());
}
#[tokio::test]
async fn collect_lines_to_file() {
let (read_half, write_half) = tokio::io::duplex(64);
let os = BroadcastOutputStream::from_stream(
read_half,
"custom",
FromStreamOptions {
channel_capacity: 32,
..Default::default()
},
);
let temp_file = tempfile::tempfile().unwrap();
let collector = os.collect_lines(
temp_file,
|line, temp_file| {
writeln!(temp_file, "{line}").unwrap();
Next::Continue
},
LineParsingOptions::default(),
);
tokio::spawn(write_test_data(write_half)).await.unwrap();
let mut temp_file = collector.cancel().await.unwrap();
temp_file.seek(SeekFrom::Start(0)).unwrap();
let mut contents = String::new();
temp_file.read_to_string(&mut contents).unwrap();
assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
}
#[tokio::test]
async fn collect_lines_async_to_file() {
let (read_half, write_half) = tokio::io::duplex(64);
let os = BroadcastOutputStream::from_stream(
read_half,
"custom",
FromStreamOptions {
chunk_size: 32.bytes(),
..Default::default()
},
);
let temp_file = tempfile::tempfile().unwrap();
let collector = os.collect_lines_async(temp_file, WriteLine, LineParsingOptions::default());
tokio::spawn(write_test_data(write_half)).await.unwrap();
let mut temp_file = collector.cancel().await.unwrap();
temp_file.seek(SeekFrom::Start(0)).unwrap();
let mut contents = String::new();
temp_file.read_to_string(&mut contents).unwrap();
assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
}
#[tokio::test]
#[traced_test]
async fn collect_chunks_into_write_mapped() {
let (read_half, write_half) = tokio::io::duplex(64);
let os = BroadcastOutputStream::from_stream(
read_half,
"custom",
FromStreamOptions {
chunk_size: 32.bytes(),
..Default::default()
},
);
let temp_file = tokio::fs::File::options()
.create(true)
.truncate(true)
.write(true)
.read(true)
.open(std::env::temp_dir().join(
"tokio_process_tools_test_single_subscriber_collect_chunks_into_write_mapped.txt",
))
.await
.unwrap();
let collector = os.collect_chunks_into_write_mapped(temp_file, |chunk| {
String::from_utf8_lossy(chunk.as_ref()).to_string()
});
tokio::spawn(write_test_data(write_half)).await.unwrap();
let mut temp_file = collector.cancel().await.unwrap();
temp_file.seek(SeekFrom::Start(0)).await.unwrap();
let mut contents = String::new();
temp_file.read_to_string(&mut contents).await.unwrap();
assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
}
#[tokio::test]
#[traced_test]
async fn collect_chunks_into_write_in_parallel() {
let (read_half, write_half) = tokio::io::duplex(64);
let os = BroadcastOutputStream::from_stream(
read_half,
"custom",
FromStreamOptions {
chunk_size: 32.bytes(),
channel_capacity: 2,
},
);
let file1 = tokio::fs::File::options()
.create(true)
.truncate(true)
.write(true)
.read(true)
.open(
std::env::temp_dir()
.join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_1.txt"),
)
.await
.unwrap();
let file2 = tokio::fs::File::options()
.create(true)
.truncate(true)
.write(true)
.read(true)
.open(
std::env::temp_dir()
.join("tokio_process_tools_test_broadcast_stream_collect_chunks_into_write_in_parallel_2.txt"),
)
.await
.unwrap();
let collector1 = os.collect_chunks_into_write(file1);
let collector2 = os.collect_chunks_into_write_mapped(file2, |chunk| {
format!("ok-{}", String::from_utf8_lossy(chunk.as_ref()))
});
tokio::spawn(write_test_data(write_half)).await.unwrap();
let mut temp_file1 = collector1.cancel().await.unwrap();
temp_file1.seek(SeekFrom::Start(0)).await.unwrap();
let mut contents = String::new();
temp_file1.read_to_string(&mut contents).await.unwrap();
assert_that!(contents).is_equal_to("Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n");
let mut temp_file2 = collector2.cancel().await.unwrap();
temp_file2.seek(SeekFrom::Start(0)).await.unwrap();
let mut contents = String::new();
temp_file2.read_to_string(&mut contents).await.unwrap();
assert_that!(contents)
.is_equal_to("ok-Cargo.lock\nok-Cargo.toml\nok-README.md\nok-src\nok-target\n");
}
}