use super::super::BroadcastOutputStream;
use super::common::{
best_effort_options_with, line_collection_options, reliable_options, wait_for_bytes_ingested,
};
use crate::output_stream::event::StreamEvent;
use crate::output_stream::event::tests::StreamEventAssertions;
use crate::{
BestEffortDelivery, LineParsingOptions, Next, NumBytesExt, ReplayEnabled, ReplayRetention,
StreamConfig, WaitForLineResult,
};
use assertr::prelude::*;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::io::AsyncWriteExt;
/// Builds a best-effort, replay-enabled stream configuration for the tests in this module.
///
/// Delegates to `best_effort_options_with`, always passing `ReplayRetention::All` and a
/// `1.bytes()` size argument (presumably the read chunk size — the tests below receive
/// single-byte chunks), together with the caller-supplied `max_buffered_chunks`.
fn best_effort_replay_options(
    max_buffered_chunks: usize,
) -> StreamConfig<BestEffortDelivery, ReplayEnabled> {
    let retention = ReplayRetention::All;
    let one_byte = 1.bytes();
    best_effort_options_with(retention, one_byte, max_buffered_chunks)
}
/// Asserts that `maybe_event` is present, is a chunk event, and that the chunk compares equal
/// to `expected`.
fn assert_recv_chunk(maybe_event: Option<StreamEvent>, expected: &[u8]) {
    // The assertion chain is kept as one expression so every intermediate stays alive for
    // the duration of the check.
    assert_that!(maybe_event).is_some().is_chunk().is_equal_to(expected);
}
/// With `ReplayRetention::All`, a waiter created only after the startup line has been
/// ingested still matches that line.
#[tokio::test]
async fn late_subscriber_receives_startup_line_with_replay_all() {
    let (reader, mut writer) = tokio::io::duplex(64);
    let config = reliable_options(ReplayRetention::All);
    let stream = BroadcastOutputStream::from_stream(reader, "custom", config);

    // Write the startup line and wait for its 6 bytes before creating the waiter.
    writer.write_all(b"ready\n").await.unwrap();
    writer.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 6).await;

    let timeout = Duration::from_secs(1);
    let parsing = LineParsingOptions::default();
    let waiter = stream.wait_for_line(timeout, |line| line == "ready", parsing);
    let outcome = waiter.await;

    assert_that!(outcome).is_equal_to(Ok(WaitForLineResult::Matched));
}
/// After `seal_replay`, a newly created waiter does not see the line that was written before
/// the seal and therefore times out.
#[tokio::test]
async fn seal_replay_drops_old_history_for_future_subscribers() {
    let (reader, mut writer) = tokio::io::duplex(64);
    let config = reliable_options(ReplayRetention::All);
    let stream = BroadcastOutputStream::from_stream(reader, "custom", config);

    // Produce the pre-seal history (4 bytes) and wait for it to be ingested.
    writer.write_all(b"old\n").await.unwrap();
    writer.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 4).await;

    stream.seal_replay();

    // Short timeout: the expected outcome is that the old line is never delivered.
    let timeout = Duration::from_millis(50);
    let parsing = LineParsingOptions::default();
    let outcome = stream
        .wait_for_line(timeout, |line| line == "old", parsing)
        .await;

    assert_that!(outcome).is_equal_to(Ok(WaitForLineResult::Timeout));
}
/// A waiter created after `seal_replay` matches a line that is written after the waiter was
/// created.
#[tokio::test]
async fn subscriber_created_after_seal_starts_live_by_default() {
    let (reader, mut writer) = tokio::io::duplex(64);
    let config = reliable_options(ReplayRetention::All);
    let stream = BroadcastOutputStream::from_stream(reader, "custom", config);

    // Pre-seal history that the waiter is not looking for.
    writer.write_all(b"old\n").await.unwrap();
    writer.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 4).await;

    stream.seal_replay();

    // The waiter must exist before the live line is written.
    let timeout = Duration::from_secs(1);
    let parsing = LineParsingOptions::default();
    let live_waiter = stream.wait_for_line(timeout, |line| line == "live", parsing);

    writer.write_all(b"live\n").await.unwrap();
    writer.flush().await.unwrap();

    let outcome = live_waiter.await;
    assert_that!(outcome).is_equal_to(Ok(WaitForLineResult::Matched));
}
/// Even while a subscriber created before the write is still alive, a waiter created after
/// `seal_replay` times out looking for the pre-seal line.
#[tokio::test]
async fn explicit_replay_after_seal_ignores_history_retained_for_active_subscribers() {
    let (reader, mut writer) = tokio::io::duplex(64);
    let config = reliable_options(ReplayRetention::All);
    let stream = BroadcastOutputStream::from_stream(reader, "custom", config);

    // Held (never read) for the whole test so an active subscriber exists across the seal.
    let _active_subscriber = stream.subscribe();

    writer.write_all(b"old\n").await.unwrap();
    writer.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 4).await;

    stream.seal_replay();

    // Short timeout: the expected outcome is that the old line is never delivered.
    let timeout = Duration::from_millis(50);
    let parsing = LineParsingOptions::default();
    let outcome = stream
        .wait_for_line(timeout, |line| line == "old", parsing)
        .await;

    assert_that!(outcome).is_equal_to(Ok(WaitForLineResult::Timeout));
}
/// A line collector started before the write still yields the written line, even though
/// `seal_replay` is called before the collector's result is awaited.
#[tokio::test]
async fn active_subscribers_still_receive_unread_tail_data_after_seal() {
    let (reader, mut writer) = tokio::io::duplex(64);
    let config = reliable_options(ReplayRetention::All);
    let stream = BroadcastOutputStream::from_stream(reader, "custom", config);

    let parsing = LineParsingOptions::default();
    let line_collector = stream.collect_lines_into_vec(parsing, line_collection_options());

    // Write the tail line, then close the write side of the duplex.
    writer.write_all(b"tail\n").await.unwrap();
    drop(writer);
    wait_for_bytes_ingested(&stream, 5).await;

    stream.seal_replay();

    let collected = line_collector.wait().await.unwrap();
    assert_that!(collected.lines().iter().map(String::as_str)).contains_exactly(["tail"]);
}
/// A waiter created after the startup line was ingested, but before `seal_replay`, still
/// matches that line; the line inspector registered up front records exactly that one line.
#[tokio::test]
async fn waiter_created_before_seal_can_match_replayed_startup_line() {
    let (reader, mut writer) = tokio::io::duplex(64);
    let config = reliable_options(ReplayRetention::LastBytes(1.megabytes()));
    let stream = BroadcastOutputStream::from_stream(reader, "custom", config);

    // Shared log of every line the inspector callback observes.
    let logs: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&logs);
    let _logger = stream.inspect_lines(
        move |line| {
            sink.lock().unwrap().push(line.into_owned());
            Next::Continue
        },
        LineParsingOptions::default(),
    );

    writer.write_all(b"ready\n").await.unwrap();
    writer.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 6).await;

    // Create the waiter first, seal afterwards, and only then await the waiter.
    let timeout = Duration::from_secs(1);
    let ready_waiter =
        stream.wait_for_line(timeout, |line| line == "ready", LineParsingOptions::default());
    stream.seal_replay();

    let outcome = ready_waiter.await;
    assert_that!(outcome).is_equal_to(Ok(WaitForLineResult::Matched));

    let observed = logs.lock().unwrap().clone();
    assert_that!(observed).contains_exactly(["ready"]);
}
/// With `best_effort_replay_options(2)`, a subscriber that does not read while `abcde` is
/// ingested observes a `Gap`, then the chunk `e`, then `Eof`.
#[tokio::test]
async fn slow_best_effort_replay_subscriber_observes_gap_then_newer_live_data() {
    let (reader, mut writer) = tokio::io::duplex(64);
    let config = best_effort_replay_options(2);
    let stream = BroadcastOutputStream::from_stream(reader, "custom", config);

    // Subscribe up front but read nothing until all input has been ingested.
    let mut slow_subscriber = stream.subscribe();

    writer.write_all(b"abcde").await.unwrap();
    writer.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 5).await;
    drop(writer);

    let first = slow_subscriber.recv().await;
    assert_that!(first).is_some().is_equal_to(StreamEvent::Gap);

    let second = slow_subscriber.recv().await;
    assert_recv_chunk(second, b"e");

    let third = slow_subscriber.recv().await;
    assert_that!(third).is_some().is_equal_to(StreamEvent::Eof);
}
/// With `best_effort_replay_options(1)`, a subscriber that reads only after `ab` was ingested
/// and the writer was dropped observes a `Gap` followed directly by `Eof`.
#[tokio::test]
async fn best_effort_replay_delivers_terminal_after_pending_gap() {
    let (reader, mut writer) = tokio::io::duplex(64);
    let config = best_effort_replay_options(1);
    let stream = BroadcastOutputStream::from_stream(reader, "custom", config);

    let mut lagging_subscriber = stream.subscribe();

    // Here the writer is dropped before waiting for ingestion.
    writer.write_all(b"ab").await.unwrap();
    drop(writer);
    wait_for_bytes_ingested(&stream, 2).await;

    let first = lagging_subscriber.recv().await;
    assert_that!(first).is_some().is_equal_to(StreamEvent::Gap);

    let second = lagging_subscriber.recv().await;
    assert_that!(second).is_some().is_equal_to(StreamEvent::Eof);
}
/// With `best_effort_replay_options(1)` and an earlier subscriber that never reads, a
/// subscriber created after `abc` was ingested still receives the chunks `a`, `b` and `c`
/// in order.
#[tokio::test]
async fn late_best_effort_replay_subscriber_receives_retained_history_after_active_overflow() {
    let (reader, mut writer) = tokio::io::duplex(64);
    let config = best_effort_replay_options(1);
    let stream = BroadcastOutputStream::from_stream(reader, "custom", config);

    // Kept alive but never polled for the whole test.
    let _slow_active_subscriber = stream.subscribe();

    writer.write_all(b"abc").await.unwrap();
    writer.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 3).await;

    let mut latecomer = stream.subscribe();

    let first = latecomer.recv().await;
    assert_recv_chunk(first, b"a");

    let second = latecomer.recv().await;
    assert_recv_chunk(second, b"b");

    let third = latecomer.recv().await;
    assert_recv_chunk(third, b"c");
}