use super::options::{LineParsingOptions, assert_max_line_length_non_zero};
use super::parser::LineParser;
use crate::output_stream::Next;
use crate::output_stream::event::Chunk;
use crate::output_stream::visitor::{AsyncStreamVisitor, StreamVisitor};
use std::borrow::Cow;
use std::future::Future;
/// Synchronous consumer of the lines produced by [`LineAdapter`].
///
/// The adapter calls [`LineSink::on_line`] once per parsed line, in stream order.
/// Returning [`Next::Break`] from it tells the adapter to stop processing the
/// current chunk.
pub trait LineSink: Send + 'static {
    /// Final value handed back by [`LineSink::into_output`].
    type Output: Send + 'static;

    /// Handles one parsed line (without its terminator).
    ///
    /// Return [`Next::Continue`] to keep receiving lines or [`Next::Break`] to stop.
    fn on_line(&mut self, line: Cow<'_, str>) -> Next;

    /// Called after the adapter's parser has dropped its partial line because of a gap.
    ///
    /// Does nothing by default.
    fn on_gap(&mut self) {}

    /// Called once at end of stream, after any unterminated trailing line was delivered.
    ///
    /// Does nothing by default.
    fn on_eof(&mut self) {}

    /// Consumes the sink and yields its result.
    fn into_output(self) -> Self::Output;
}
/// Asynchronous consumer of the lines produced by [`LineAdapter`].
///
/// The adapter awaits [`AsyncLineSink::on_line`] once per parsed line, in stream
/// order. The returned futures must be `Send`.
pub trait AsyncLineSink: Send + 'static {
    /// Final value handed back by [`AsyncLineSink::into_output`].
    type Output: Send + 'static;

    /// Handles one parsed line (without its terminator).
    ///
    /// The future resolves to [`Next::Continue`] to keep receiving lines, or to
    /// [`Next::Break`] to stop. The line may borrow for as long as `self` is borrowed.
    fn on_line<'a>(&'a mut self, line: Cow<'a, str>) -> impl Future<Output = Next> + Send + 'a;

    /// Synchronously notified after the adapter's parser dropped its partial line
    /// because of a gap.
    ///
    /// Does nothing by default.
    fn on_gap(&mut self) {}

    /// Awaited once at end of stream, after any unterminated trailing line was delivered.
    ///
    /// Resolves immediately by default.
    fn on_eof(&mut self) -> impl Future<Output = ()> + Send + '_ {
        async {}
    }

    /// Consumes the sink and yields its result.
    fn into_output(self) -> Self::Output;
}
/// Bridges a line-oriented sink ([`LineSink`] or [`AsyncLineSink`]) to the
/// chunk-level stream visitors.
///
/// Incoming byte chunks are run through a [`LineParser`], and every line it yields
/// is forwarded to the wrapped sink.
pub struct LineAdapter<S> {
    /// Incremental parser; keeps an unfinished line buffered between chunks.
    parser: LineParser,
    /// Options passed to the parser on every `next_line` call.
    options: LineParsingOptions,
    /// The wrapped sink receiving parsed lines.
    inner: S,
}

impl<S> LineAdapter<S> {
    /// Creates an adapter that parses lines according to `options` and forwards them
    /// to `inner`.
    ///
    /// # Panics
    ///
    /// Panics if `options.max_line_length` is zero.
    pub fn new(options: LineParsingOptions, inner: S) -> Self {
        // Reject misconfiguration at construction time rather than on the first chunk.
        assert_max_line_length_non_zero(&options);

        let parser = LineParser::new();
        Self {
            parser,
            options,
            inner,
        }
    }
}
/// Synchronous bridge: every chunk is split into lines, and each line is handed to
/// the inner [`LineSink`] without an intermediate copy.
impl<S: LineSink> StreamVisitor for LineAdapter<S> {
    type Output = S::Output;

    /// Parses `chunk` and forwards each complete line to the sink, in order.
    ///
    /// Bytes of an unfinished line stay buffered in the parser for the next chunk.
    /// Returns [`Next::Break`] as soon as the sink does, which stops parsing this
    /// chunk. Otherwise returns [`Next::Continue`].
    fn on_chunk(&mut self, chunk: Chunk) -> Next {
        // Split the borrow so the parser and the sink can be used side by side.
        let Self {
            parser: line_parser,
            options: parse_opts,
            inner: sink,
        } = self;

        let mut remaining: &[u8] = chunk.as_ref();
        loop {
            let Some(line) = line_parser.next_line(&mut remaining, *parse_opts) else {
                break Next::Continue;
            };
            if sink.on_line(line) == Next::Break {
                break Next::Break;
            }
        }
    }

    /// Discards the parser's partial line, then forwards the gap to the sink.
    fn on_gap(&mut self) {
        let Self { parser, inner, .. } = self;
        parser.on_gap();
        inner.on_gap();
    }

    /// Delivers any unterminated trailing line, then signals end of stream to the sink.
    fn on_eof(&mut self) {
        let Self { parser, inner, .. } = self;
        if let Some(trailing) = parser.finish() {
            // Nothing follows EOF, so the sink's continue/break verdict is irrelevant here.
            let _ = inner.on_line(trailing);
        }
        inner.on_eof();
    }

    /// Consumes the adapter and returns the sink's output.
    fn into_output(self) -> Self::Output {
        let Self { inner, .. } = self;
        inner.into_output()
    }
}
/// Asynchronous bridge: every chunk is split into lines, and the inner
/// [`AsyncLineSink`] is awaited once per line.
impl<S: AsyncLineSink> AsyncStreamVisitor for LineAdapter<S> {
    type Output = S::Output;

    /// Parses `chunk` and awaits `inner.on_line` for each complete line, in order.
    ///
    /// Returns `Next::Break` as soon as the sink does. Otherwise returns
    /// `Next::Continue` once the chunk yields no further complete line.
    async fn on_chunk(&mut self, chunk: Chunk) -> Next {
        // Split the borrow so the parser and the sink can be used side by side.
        let Self {
            parser,
            options,
            inner,
        } = self;
        let mut bytes: &[u8] = chunk.as_ref();
        loop {
            // NOTE(review): this looks deliberate:
            // - the line is copied into an owned `String`;
            // - it is extracted in its own `let` statement (not a `while let`
            //   scrutinee).
            // As a result, no borrow of `parser`/`bytes` and no scrutinee temporary
            // is alive across the `.await` below. Presumably this keeps the future
            // `Send`/borrowck-clean. Confirm before switching to a borrowed `Cow`.
            let line = match parser.next_line(&mut bytes, *options) {
                Some(line) => line.into_owned(),
                None => return Next::Continue,
            };
            if inner.on_line(Cow::Owned(line)).await == Next::Break {
                return Next::Break;
            }
        }
    }

    /// Discards the parser's partial line, then forwards the gap to the sink.
    fn on_gap(&mut self) {
        self.parser.on_gap();
        self.inner.on_gap();
    }

    /// Delivers any unterminated trailing line, then awaits the sink's EOF hook.
    async fn on_eof(&mut self) {
        // Materialise the trailing line in a separate statement so the parser borrow
        // ends before the sink is awaited (same reasoning as in `on_chunk`).
        let trailing = self.parser.finish().map(Cow::into_owned);
        if let Some(line) = trailing {
            // Nothing follows EOF, so the sink's continue/break verdict is ignored.
            let _ = self.inner.on_line(Cow::Owned(line)).await;
        }
        self.inner.on_eof().await;
    }

    /// Consumes the adapter and returns the sink's output.
    fn into_output(self) -> Self::Output {
        self.inner.into_output()
    }
}
#[cfg(test)]
mod tests {
    use super::super::options::LineOverflowBehavior;
    use super::*;
    use crate::NumBytesExt;
    use crate::output_stream::consumer::{spawn_consumer_async, spawn_consumer_sync};
    use crate::output_stream::event::StreamEvent;
    use crate::output_stream::event::tests::event_receiver;
    use assertr::prelude::*;
    use bytes::Bytes;
    use std::sync::{Arc, Mutex};

    /// Sync sink that records every received line into a shared vector.
    struct CollectingSink {
        seen: Arc<Mutex<Vec<String>>>,
    }

    impl LineSink for CollectingSink {
        type Output = ();

        fn on_line(&mut self, line: Cow<'_, str>) -> Next {
            self.seen.lock().unwrap().push(line.into_owned());
            Next::Continue
        }

        fn into_output(self) -> Self::Output {}
    }

    /// Async counterpart of `CollectingSink`.
    struct CollectingAsyncSink {
        seen: Arc<Mutex<Vec<String>>>,
    }

    impl AsyncLineSink for CollectingAsyncSink {
        type Output = ();

        async fn on_line(&mut self, line: Cow<'_, str>) -> Next {
            self.seen.lock().unwrap().push(line.into_owned());
            Next::Continue
        }

        fn into_output(self) -> Self::Output {}
    }

    mod sync {
        use super::*;

        #[test]
        #[should_panic(expected = "LineParsingOptions::max_line_length must be greater than zero")]
        fn new_panics_when_max_line_length_is_zero() {
            let _ = LineAdapter::new(
                LineParsingOptions {
                    max_line_length: 0.bytes(),
                    overflow_behavior: LineOverflowBehavior::default(),
                    buffer_compaction_threshold: None,
                },
                CollectingSink {
                    seen: Arc::new(Mutex::new(Vec::new())),
                },
            );
        }

        #[tokio::test]
        async fn flushes_trailing_unterminated_line_at_eof() {
            let seen = Arc::new(Mutex::new(Vec::<String>::new()));
            let consumer = spawn_consumer_sync(
                "custom",
                event_receiver(vec![
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"first\nsec"))),
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"ond\nthird"))),
                    StreamEvent::Eof,
                ])
                .await,
                LineAdapter::new(
                    LineParsingOptions::default(),
                    CollectingSink {
                        seen: Arc::clone(&seen),
                    },
                ),
            );
            consumer.wait().await.unwrap();
            assert_that!(seen.lock().unwrap().clone())
                .contains_exactly(["first", "second", "third"]);
        }

        #[tokio::test]
        async fn gap_discards_partial_line() {
            let seen = Arc::new(Mutex::new(Vec::<String>::new()));
            let consumer = spawn_consumer_sync(
                "custom",
                event_receiver(vec![
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"par"))),
                    StreamEvent::Gap,
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"tial\nclean\n"))),
                    StreamEvent::Eof,
                ])
                .await,
                LineAdapter::new(
                    LineParsingOptions::default(),
                    CollectingSink {
                        seen: Arc::clone(&seen),
                    },
                ),
            );
            consumer.wait().await.unwrap();
            assert_that!(seen.lock().unwrap().clone()).contains_exactly(["clean"]);
        }

        #[tokio::test]
        async fn break_from_inner_stops_parsing_immediately() {
            struct StopAtSecondLine {
                seen: Arc<Mutex<Vec<String>>>,
                count: usize,
            }

            impl LineSink for StopAtSecondLine {
                type Output = ();

                fn on_line(&mut self, line: Cow<'_, str>) -> Next {
                    self.count += 1;
                    self.seen.lock().unwrap().push(line.into_owned());
                    if self.count == 2 {
                        Next::Break
                    } else {
                        Next::Continue
                    }
                }

                fn into_output(self) -> Self::Output {}
            }

            let seen = Arc::new(Mutex::new(Vec::<String>::new()));
            let consumer = spawn_consumer_sync(
                "custom",
                event_receiver(vec![
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"a\nb\nc\nd\n"))),
                    StreamEvent::Eof,
                ])
                .await,
                LineAdapter::new(
                    LineParsingOptions::default(),
                    StopAtSecondLine {
                        seen: Arc::clone(&seen),
                        count: 0,
                    },
                ),
            );
            consumer.wait().await.unwrap();
            assert_that!(seen.lock().unwrap().clone()).contains_exactly(["a", "b"]);
        }
    }

    mod r#async {
        use super::*;

        #[test]
        #[should_panic(expected = "LineParsingOptions::max_line_length must be greater than zero")]
        fn new_panics_when_max_line_length_is_zero() {
            let _ = LineAdapter::new(
                LineParsingOptions {
                    max_line_length: 0.bytes(),
                    overflow_behavior: LineOverflowBehavior::default(),
                    buffer_compaction_threshold: None,
                },
                CollectingAsyncSink {
                    seen: Arc::new(Mutex::new(Vec::new())),
                },
            );
        }

        #[tokio::test]
        async fn flushes_trailing_unterminated_line_at_eof() {
            let seen = Arc::new(Mutex::new(Vec::<String>::new()));
            let consumer = spawn_consumer_async(
                "custom",
                event_receiver(vec![
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"first\ntail"))),
                    StreamEvent::Eof,
                ])
                .await,
                LineAdapter::new(
                    LineParsingOptions::default(),
                    CollectingAsyncSink {
                        seen: Arc::clone(&seen),
                    },
                ),
            );
            consumer.wait().await.unwrap();
            assert_that!(seen.lock().unwrap().clone()).contains_exactly(["first", "tail"]);
        }

        /// Mirrors the sync test: a gap must drop the partially buffered line.
        #[tokio::test]
        async fn gap_discards_partial_line() {
            let seen = Arc::new(Mutex::new(Vec::<String>::new()));
            let consumer = spawn_consumer_async(
                "custom",
                event_receiver(vec![
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"par"))),
                    StreamEvent::Gap,
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"tial\nclean\n"))),
                    StreamEvent::Eof,
                ])
                .await,
                LineAdapter::new(
                    LineParsingOptions::default(),
                    CollectingAsyncSink {
                        seen: Arc::clone(&seen),
                    },
                ),
            );
            consumer.wait().await.unwrap();
            assert_that!(seen.lock().unwrap().clone()).contains_exactly(["clean"]);
        }

        /// Mirrors the sync test: `Next::Break` from the async sink stops parsing
        /// the rest of the chunk.
        #[tokio::test]
        async fn break_from_inner_stops_parsing_immediately() {
            struct StopAtSecondLine {
                seen: Arc<Mutex<Vec<String>>>,
                count: usize,
            }

            impl AsyncLineSink for StopAtSecondLine {
                type Output = ();

                async fn on_line(&mut self, line: Cow<'_, str>) -> Next {
                    self.count += 1;
                    self.seen.lock().unwrap().push(line.into_owned());
                    if self.count == 2 {
                        Next::Break
                    } else {
                        Next::Continue
                    }
                }

                fn into_output(self) -> Self::Output {}
            }

            let seen = Arc::new(Mutex::new(Vec::<String>::new()));
            let consumer = spawn_consumer_async(
                "custom",
                event_receiver(vec![
                    StreamEvent::Chunk(Chunk(Bytes::from_static(b"a\nb\nc\nd\n"))),
                    StreamEvent::Eof,
                ])
                .await,
                LineAdapter::new(
                    LineParsingOptions::default(),
                    StopAtSecondLine {
                        seen: Arc::clone(&seen),
                        count: 0,
                    },
                ),
            );
            consumer.wait().await.unwrap();
            assert_that!(seen.lock().unwrap().clone()).contains_exactly(["a", "b"]);
        }
    }
}