use super::state::{Shared, append_event};
use crate::StreamReadError;
use crate::output_stream::config::StreamConfig;
use crate::output_stream::event::{Chunk, StreamEvent};
use crate::output_stream::policy::{Delivery, Replay};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::task::JoinHandle;
pub(super) struct FanoutReplayBackend<D, R>
where
D: Delivery,
R: Replay,
{
pub(super) stream_reader: JoinHandle<()>,
pub(super) shared: Arc<Shared>,
pub(super) options: StreamConfig<D, R>,
pub(super) name: &'static str,
}
pub(super) fn new_fanout_backend<S, D, R>(
stream: S,
stream_name: &'static str,
options: StreamConfig<D, R>,
) -> FanoutReplayBackend<D, R>
where
S: AsyncRead + Unpin + Send + 'static,
D: Delivery,
R: Replay,
{
let shared = Arc::new(Shared::new());
let stream_reader = tokio::spawn(read_chunked_shared(
stream,
Arc::clone(&shared),
options,
stream_name,
));
FanoutReplayBackend {
stream_reader,
shared,
options,
name: stream_name,
}
}
async fn read_chunked_shared<S, D, R>(
mut read: S,
shared: Arc<Shared>,
options: StreamConfig<D, R>,
stream_name: &'static str,
) where
S: AsyncRead + Unpin + Send + 'static,
D: Delivery,
R: Replay,
{
let read_chunk_size = options.read_chunk_size;
let mut buf = bytes::BytesMut::with_capacity(read_chunk_size.bytes());
loop {
let _ = buf.try_reclaim(read_chunk_size.bytes());
match read.read_buf(&mut buf).await {
Ok(bytes_read) => {
shared.record_bytes_ingested(bytes_read);
let is_eof = bytes_read == 0;
if is_eof {
append_event(&shared, options, StreamEvent::Eof).await;
break;
}
while !buf.is_empty() {
let split_to = usize::min(read_chunk_size.bytes(), buf.len());
let event = StreamEvent::Chunk(Chunk(buf.split_to(split_to).freeze()));
append_event(&shared, options, event).await;
}
}
Err(err) => {
let err = StreamReadError::new(stream_name, err);
tracing::warn!(error = %err, "Could not read from stream");
append_event(&shared, options, StreamEvent::ReadError(err)).await;
break;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::output_stream::event::tests::StreamEventAssertions;
use crate::{NumBytesExt, StreamConfig};
use assertr::prelude::*;
use std::io::Cursor;
fn replay_all_options() -> StreamConfig<crate::BestEffortDelivery, crate::ReplayEnabled> {
StreamConfig::builder()
.best_effort_delivery()
.replay_all()
.read_chunk_size(2.bytes())
.max_buffered_chunks(8)
.build()
}
#[tokio::test]
async fn fanout_reader_splits_input_into_configured_chunks_and_eof() {
let shared = Arc::new(Shared::new());
let options = replay_all_options();
read_chunked_shared(
Cursor::new(b"abcdef".to_vec()),
Arc::clone(&shared),
options,
"custom",
)
.await;
let state = shared.state.lock().expect("broadcast state poisoned");
assert_that!(state.replay.len()).is_equal_to(3);
assert_that!(&state.replay[0].event)
.is_chunk()
.is_equal_to(b"ab");
assert_that!(&state.replay[1].event)
.is_chunk()
.is_equal_to(b"cd");
assert_that!(&state.replay[2].event)
.is_chunk()
.is_equal_to(b"ef");
assert_that!(state.terminal.as_ref().map(|event| &event.event))
.is_some()
.is_equal_to(&StreamEvent::Eof);
}
}