use bytes::{Buf, BytesMut};
use memchr::memchr;
use std::borrow::Cow;
/// Default chunk size: `16 * 1024` bytes (16 KiB).
///
/// Used as the `chunk_size` of [`FromStreamOptions::default`].
pub const DEFAULT_CHUNK_SIZE: NumBytes = NumBytes(16 * 1024);
/// Default channel capacity: `128`.
///
/// Used as the `channel_capacity` of [`FromStreamOptions::default`].
/// NOTE(review): the unit (presumably buffered chunks/events) is determined by the channel
/// implementation outside this file — confirm there.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 128;
pub mod broadcast;
pub(crate) mod impls;
pub mod single_subscriber;
/// Read-only accessors shared by output stream types.
///
/// NOTE(review): the implementors are not part of this file (presumably they live in the
/// `broadcast` and `single_subscriber` modules) — confirm the exact semantics there.
pub trait OutputStream {
/// Returns the chunk size of this stream as a [`NumBytes`] value.
fn chunk_size(&self) -> NumBytes;
/// Returns the channel capacity of this stream.
fn channel_capacity(&self) -> usize;
/// Returns a static name for this stream (presumably used for diagnostics — confirm).
fn name(&self) -> &'static str;
}
/// Options bundling a chunk size and a channel capacity.
///
/// NOTE(review): judging by the name these configure how an output stream is created from an
/// underlying stream; the consumers are outside this file — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FromStreamOptions {
/// Chunk size; [`Default`] yields [`DEFAULT_CHUNK_SIZE`].
pub chunk_size: NumBytes,
/// Channel capacity; [`Default`] yields [`DEFAULT_CHANNEL_CAPACITY`].
pub channel_capacity: usize,
}
impl Default for FromStreamOptions {
fn default() -> Self {
Self {
chunk_size: DEFAULT_CHUNK_SIZE,
channel_capacity: DEFAULT_CHANNEL_CAPACITY, }
}
}
/// A chunk of bytes, stored as a [`bytes::Bytes`] value.
///
/// The bytes are accessible through the `AsRef<[u8]>` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Chunk(bytes::Bytes);
impl AsRef<[u8]> for Chunk {
fn as_ref(&self) -> &[u8] {
self.0.chunk()
}
}
/// Crate-internal event describing what happened on a stream.
///
/// NOTE(review): producers and consumers of these events are outside this file; the variant
/// notes marked "presumably" are inferred from names only — confirm there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum StreamEvent {
/// A [`Chunk`] of bytes.
Chunk(Chunk),
/// Presumably: some data was lost/skipped between chunks (cf. `LineParserState::on_gap`).
Gap,
/// Presumably: the stream reached its end.
Eof,
}
/// Strategy selector for handling a full buffer.
///
/// NOTE(review): the code acting on this enum is outside this file; the variant descriptions
/// below are derived from the variant names only — confirm against the consumers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
/// Presumably: when the buffer is full, the newest incoming data is dropped.
DropLatestIncomingIfBufferFull,
/// Presumably: the producer waits until the buffer has free space again.
BlockUntilBufferHasSpace,
}
/// Control-flow signal telling a processing loop whether to go on or to stop.
///
/// Line callbacks passed to `LineParserState::visit_chunk` return this value; that method
/// stops and returns `Next::Break` as soon as a callback does.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
/// Keep processing.
Continue,
/// Stop processing.
Break,
}
/// What the line parser does with a line that reaches `LineParsingOptions::max_line_length`.
///
/// In both modes the first `max_line_length` bytes are emitted as a line.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
/// The remainder of the over-long line, up to and including the next `\n`, is discarded.
/// This is the default.
#[default]
DropAdditionalData,
/// The remainder of the over-long line is emitted as further lines of at most
/// `max_line_length` bytes each.
EmitAdditionalAsNewLines,
}
/// Selects how a line is written out.
///
/// NOTE(review): nothing in this file uses this enum; the variant descriptions are inferred
/// from the variant names only — confirm against the writers that consume it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineWriteMode {
/// Presumably: the line is written unchanged.
AsIs,
/// Presumably: a line feed (`\n`) is appended to the line when writing.
AppendLf,
}
/// Options controlling how chunks of bytes are split into lines.
///
/// The [`Default`] implementation uses a 16 kilobyte limit and
/// [`LineOverflowBehavior::default`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LineParsingOptions {
/// Maximum length of a single line in bytes. A value of zero disables the limit.
pub max_line_length: NumBytes,
/// What to do with data exceeding `max_line_length`; irrelevant when the limit is disabled.
pub overflow_behavior: LineOverflowBehavior,
}
impl Default for LineParsingOptions {
fn default() -> Self {
Self {
max_line_length: 16.kilobytes(),
overflow_behavior: LineOverflowBehavior::default(),
}
}
}
/// State of an incremental line parser that is carried from one chunk to the next.
pub(crate) struct LineParserState {
/// Bytes of the current line that have been seen but not yet emitted.
line_buffer: BytesMut,
/// While set, incoming bytes are skipped up to and including the next `\n`.
discard_until_newline: bool,
}
impl LineParserState {
    /// Creates a parser with an empty line buffer that is not discarding any input.
    pub fn new() -> Self {
        Self {
            line_buffer: BytesMut::new(),
            discard_until_newline: false,
        }
    }

    /// Resets the parser after a gap in the input.
    ///
    /// The partially buffered line is thrown away and all following bytes, up to and including
    /// the next `\n`, are skipped, so no line stitched together across the gap is ever emitted.
    pub fn on_gap(&mut self) {
        self.line_buffer.clear();
        self.discard_until_newline = true;
    }

    /// Feeds `chunk` into the parser and calls `f` once for every line completed by it.
    ///
    /// Lines are separated by `\n` (the separator is not part of the emitted line) and are
    /// decoded with [`String::from_utf8_lossy`]. Data after the last `\n` stays buffered until a
    /// later chunk completes the line or [`Self::finish`] is called.
    ///
    /// If `options.max_line_length` is non-zero, no emitted line is longer than that many bytes
    /// as long as the same options are used for every call; what happens to the excess data is
    /// decided by `options.overflow_behavior`. A limit of zero disables the length check.
    ///
    /// Returns [`Next::Break`] as soon as `f` returns [`Next::Break`] (the rest of `chunk` is
    /// not processed), otherwise [`Next::Continue`].
    pub fn visit_chunk(
        &mut self,
        mut chunk: &[u8],
        options: LineParsingOptions,
        mut f: impl FnMut(Cow<'_, str>) -> Next,
    ) -> Next {
        let max_line_length = options.max_line_length.0;
        let is_limited = max_line_length != 0;

        while !chunk.is_empty() {
            if self.discard_until_newline {
                match memchr(b'\n', chunk) {
                    Some(pos) => {
                        self.discard_until_newline = false;
                        chunk = &chunk[pos + 1..];
                    }
                    // The whole chunk belongs to the line being discarded.
                    None => return Next::Continue,
                }
                continue;
            }

            // `>=` instead of `==`: the options are passed per call, so the limit may be lower
            // than what an earlier call already buffered. Treating that as "full" keeps the
            // subtraction below from underflowing.
            if is_limited && self.line_buffer.len() >= max_line_length {
                match options.overflow_behavior {
                    LineOverflowBehavior::DropAdditionalData => {
                        // Set before invoking the callback so the rest of the over-long line is
                        // still skipped on a later call even if the callback breaks.
                        self.discard_until_newline = true;
                    }
                    LineOverflowBehavior::EmitAdditionalAsNewLines => {}
                }
                if self.emit_line(&mut f) == Next::Break {
                    return Next::Break;
                }
                continue;
            }

            let remaining_line_length = if is_limited {
                // Cannot underflow: the guard above ensures the buffer is below the limit.
                max_line_length - self.line_buffer.len()
            } else {
                chunk.len()
            };
            let scan_len = remaining_line_length.min(chunk.len());
            let scan = &chunk[..scan_len];

            if let Some(pos) = memchr(b'\n', scan) {
                let result = if self.line_buffer.is_empty() {
                    // The line lies completely within this chunk: emit it without copying.
                    f(String::from_utf8_lossy(&scan[..pos]))
                } else {
                    self.line_buffer.extend_from_slice(&scan[..pos]);
                    self.emit_line(&mut f)
                };
                if result == Next::Break {
                    return Next::Break;
                }
                chunk = &chunk[pos + 1..];
                continue;
            }

            // No newline within the allowed window: buffer it and keep going.
            self.line_buffer.extend_from_slice(scan);
            chunk = &chunk[scan_len..];

            if is_limited
                && self.line_buffer.len() >= max_line_length
                && matches!(
                    options.overflow_behavior,
                    LineOverflowBehavior::EmitAdditionalAsNewLines
                )
                && self.emit_line(&mut f) == Next::Break
            {
                return Next::Break;
            }
        }
        Next::Continue
    }

    /// Returns an iterator yielding the lines completed by `chunk` as owned `String`s.
    ///
    /// The iterator borrows this parser mutably and updates its state while being advanced.
    pub(crate) fn owned_lines<'a>(
        &'a mut self,
        chunk: &'a [u8],
        options: LineParsingOptions,
    ) -> OwnedLineReader<'a> {
        OwnedLineReader {
            parser: self,
            chunk,
            options,
        }
    }

    /// Passes the buffered, not newline-terminated rest of the input to `f`, if there is one.
    ///
    /// Nothing is passed on (and [`Next::Continue`] is returned) when the buffer is empty or
    /// when the parser is currently discarding data. The buffer is left untouched.
    pub fn finish(&self, f: impl FnOnce(Cow<'_, str>) -> Next) -> Next {
        match self.pending_line() {
            Some(bytes) => f(String::from_utf8_lossy(bytes)),
            None => Next::Continue,
        }
    }

    /// Owned variant of [`Self::finish`]: returns the buffered rest of the input, if any.
    pub(crate) fn finish_owned(&self) -> Option<String> {
        self.pending_line()
            .map(|bytes| String::from_utf8_lossy(bytes).into_owned())
    }

    /// Returns the buffered bytes that `finish`/`finish_owned` should report, if any.
    fn pending_line(&self) -> Option<&[u8]> {
        if self.discard_until_newline || self.line_buffer.is_empty() {
            None
        } else {
            Some(&self.line_buffer[..])
        }
    }

    /// Takes the buffered line out of the buffer (leaving it empty) and passes it to `f`.
    fn emit_line(&mut self, f: &mut impl FnMut(Cow<'_, str>) -> Next) -> Next {
        let line = self.line_buffer.split().freeze();
        f(String::from_utf8_lossy(&line))
    }

    /// Takes the buffered line out of the buffer (leaving it empty) and returns it as `String`.
    fn emit_owned_line(&mut self) -> String {
        let line = self.line_buffer.split().freeze();
        String::from_utf8_lossy(&line).into_owned()
    }
}
/// Iterator over the lines completed by one chunk, yielding owned `String`s.
///
/// Created by `LineParserState::owned_lines`.
pub(crate) struct OwnedLineReader<'a> {
/// Parser whose state (line buffer, discard flag) is read and updated while iterating.
parser: &'a mut LineParserState,
/// The part of the chunk that has not been consumed yet.
chunk: &'a [u8],
/// Line length limit and overflow behavior applied while iterating.
options: LineParsingOptions,
}
impl Iterator for OwnedLineReader<'_> {
    type Item = String;

    /// Returns the next line completed by the remaining chunk, or `None` once the chunk is
    /// exhausted.
    ///
    /// Lines are separated by `\n` and decoded with [`String::from_utf8_lossy`]. Data after the
    /// last `\n` is moved into the parser's line buffer so that a later chunk can complete it.
    /// A non-zero `max_line_length` caps the length of the yielded lines; the configured
    /// [`LineOverflowBehavior`] decides what happens to the excess data.
    fn next(&mut self) -> Option<Self::Item> {
        let max_line_length = self.options.max_line_length.0;
        let is_limited = max_line_length != 0;

        while !self.chunk.is_empty() {
            if self.parser.discard_until_newline {
                match memchr(b'\n', self.chunk) {
                    Some(pos) => {
                        self.parser.discard_until_newline = false;
                        self.chunk = &self.chunk[pos + 1..];
                    }
                    None => {
                        // The whole remainder belongs to the line being discarded.
                        self.chunk = &[];
                        return None;
                    }
                }
                continue;
            }

            // `>=` instead of `==`: the parser may already hold more bytes than the limit of
            // these options allows (the limit is supplied per reader). Treating that as "full"
            // keeps the subtraction below from underflowing.
            if is_limited && self.parser.line_buffer.len() >= max_line_length {
                match self.options.overflow_behavior {
                    LineOverflowBehavior::DropAdditionalData => {
                        self.parser.discard_until_newline = true;
                    }
                    LineOverflowBehavior::EmitAdditionalAsNewLines => {}
                }
                return Some(self.parser.emit_owned_line());
            }

            let remaining_line_length = if is_limited {
                // Cannot underflow: the guard above ensures the buffer is below the limit.
                max_line_length - self.parser.line_buffer.len()
            } else {
                self.chunk.len()
            };
            let scan_len = remaining_line_length.min(self.chunk.len());
            let scan = &self.chunk[..scan_len];

            if let Some(pos) = memchr(b'\n', scan) {
                self.chunk = &self.chunk[pos + 1..];
                if self.parser.line_buffer.is_empty() {
                    // The line lies completely within this chunk: no need to go through the
                    // line buffer.
                    return Some(String::from_utf8_lossy(&scan[..pos]).into_owned());
                }
                self.parser.line_buffer.extend_from_slice(&scan[..pos]);
                return Some(self.parser.emit_owned_line());
            }

            // No newline within the allowed window: buffer it and keep going.
            self.parser.line_buffer.extend_from_slice(scan);
            self.chunk = &self.chunk[scan_len..];

            if is_limited
                && self.parser.line_buffer.len() >= max_line_length
                && matches!(
                    self.options.overflow_behavior,
                    LineOverflowBehavior::EmitAdditionalAsNewLines
                )
            {
                return Some(self.parser.emit_owned_line());
            }
        }
        None
    }
}
/// A number of bytes.
///
/// Values are created through [`NumBytes::zero`] or the [`NumBytesExt`] helpers on `usize`,
/// e.g. `16.kilobytes()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumBytes(usize);

impl NumBytes {
    /// Returns a value representing zero bytes.
    #[must_use]
    pub fn zero() -> Self {
        Self(0)
    }

    /// Asserts that this value is greater than zero.
    ///
    /// # Panics
    ///
    /// Panics if the value is zero; `parameter_name` is included in the panic message.
    pub(crate) fn assert_non_zero(self, parameter_name: &str) {
        assert!(
            self.0 > 0,
            "{parameter_name} must be greater than zero bytes"
        );
    }

    /// Returns the number of bytes as a plain `usize`.
    #[must_use]
    pub fn bytes(&self) -> usize {
        self.0
    }
}

/// Conversions from a plain number into [`NumBytes`].
pub trait NumBytesExt {
    /// Interprets `self` as a number of bytes.
    fn bytes(self) -> NumBytes;
    /// Interprets `self` as a number of kilobytes (multiples of 1024 bytes).
    fn kilobytes(self) -> NumBytes;
    /// Interprets `self` as a number of megabytes (multiples of 1024 * 1024 bytes).
    fn megabytes(self) -> NumBytes;
}

impl NumBytesExt for usize {
    fn bytes(self) -> NumBytes {
        NumBytes(self)
    }

    /// Saturates at `usize::MAX` if the result does not fit into a `usize`.
    fn kilobytes(self) -> NumBytes {
        // A plain `*` would panic in debug builds and silently wrap in release builds, where
        // the wrapped result could even be zero.
        NumBytes(self.saturating_mul(1024))
    }

    /// Saturates at `usize::MAX` if the result does not fit into a `usize`.
    fn megabytes(self) -> NumBytes {
        NumBytes(self.saturating_mul(1024 * 1024))
    }
}
// Test helpers and unit tests for this module.
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::io::{AsyncWrite, AsyncWriteExt};
/// Writes five newline-terminated lines (`Cargo.lock`, `Cargo.toml`, `README.md`, `src`,
/// `target`) to `write`, sleeping 50 ms after each line.
///
/// Panics if any of the writes fails.
pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
write.write_all("README.md\n".as_bytes()).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
write.write_all("src\n".as_bytes()).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
write.write_all("target\n".as_bytes()).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
}
// Tests of `LineParserState::visit_chunk` / `finish`.
mod line_parser_state {
use crate::output_stream::LineParserState;
use crate::{LineOverflowBehavior, LineParsingOptions, Next, NumBytes, NumBytesExt};
use assertr::prelude::*;
/// Feeds `chunks` one by one into a fresh parser, then calls `finish`, and asserts that the
/// collected lines equal `expected_lines`.
///
/// If `mark_gap_before_chunk` is `Some(i)`, `on_gap` is called right before chunk `i` is fed.
/// Every `visit_chunk` call is expected to return `Next::Continue`.
fn run_test_case(
chunks: &[&[u8]],
mark_gap_before_chunk: Option<usize>,
expected_lines: &[&str],
options: LineParsingOptions,
) {
let mut parser = LineParserState::new();
let mut collected_lines = Vec::<String>::new();
for (index, chunk) in chunks.iter().enumerate() {
if mark_gap_before_chunk == Some(index) {
parser.on_gap();
}
assert_that!(parser.visit_chunk(chunk, options, |line| {
collected_lines.push(line.into_owned());
Next::Continue
}))
.is_equal_to(Next::Continue);
}
// Collect the unterminated rest of the input, if any.
let _ = parser.finish(|line| {
collected_lines.push(line.into_owned());
Next::Continue
});
let expected_lines: Vec<String> = expected_lines
.iter()
.map(std::string::ToString::to_string)
.collect();
assert_that!(collected_lines).is_equal_to(expected_lines);
}
/// Splits `data` into one chunk per byte, so multi-byte UTF-8 sequences span several chunks.
fn as_single_byte_chunks(data: &str) -> Vec<&[u8]> {
data.as_bytes().iter().map(std::slice::from_ref).collect()
}
#[test]
fn empty_chunk() {
run_test_case(&[b""], None, &[], LineParsingOptions::default());
}
// Unterminated data is only reported by `finish`.
#[test]
fn chunk_without_any_newlines() {
run_test_case(
&[b"no newlines here"],
None,
&["no newlines here"],
LineParsingOptions::default(),
);
}
#[test]
fn single_completed_line() {
run_test_case(
&[b"one line\n"],
None,
&["one line"],
LineParsingOptions::default(),
);
}
#[test]
fn multiple_completed_lines() {
run_test_case(
&[b"first line\nsecond line\nthird line\n"],
None,
&["first line", "second line", "third line"],
LineParsingOptions::default(),
);
}
#[test]
fn partial_line_at_the_end() {
run_test_case(
&[b"complete line\npartial"],
None,
&["complete line", "partial"],
LineParsingOptions::default(),
);
}
#[test]
fn initial_line_with_multiple_newlines() {
run_test_case(
&[b"previous: continuation\nmore lines\n"],
None,
&["previous: continuation", "more lines"],
LineParsingOptions::default(),
);
}
// Invalid UTF-8 bytes are expected to show up as U+FFFD replacement characters.
#[test]
fn invalid_utf8_data() {
run_test_case(
&[b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n"],
None,
&["valid utf8�(�� invalid utf8"],
LineParsingOptions::default(),
);
}
#[test]
fn rest_of_too_long_line_is_dropped() {
run_test_case(
&[b"123456789\nabcdefghi\n"],
None,
&["1234", "abcd"],
LineParsingOptions {
max_line_length: 4.bytes(),
overflow_behavior: LineOverflowBehavior::DropAdditionalData,
},
);
}
#[test]
fn rest_of_too_long_line_is_returned_as_additional_lines() {
run_test_case(
&[b"123456789\nabcdefghi\n"],
None,
&["1234", "5678", "9", "abcd", "efgh", "i"],
LineParsingOptions {
max_line_length: 4.bytes(),
overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
},
);
}
// A limit of zero disables the length check, regardless of the overflow behavior.
#[test]
fn max_line_length_of_0_disables_line_length_checks_test1() {
run_test_case(
&[b"123456789\nabcdefghi\n"],
None,
&["123456789", "abcdefghi"],
LineParsingOptions {
max_line_length: NumBytes::zero(),
overflow_behavior: LineOverflowBehavior::DropAdditionalData,
},
);
}
#[test]
fn max_line_length_of_0_disables_line_length_checks_test2() {
run_test_case(
&[b"123456789\nabcdefghi\n"],
None,
&["123456789", "abcdefghi"],
LineParsingOptions {
max_line_length: NumBytes::zero(),
overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
},
);
}
#[test]
fn leading_and_trailing_whitespace_is_preserved() {
run_test_case(
&[b" 123456789 \n abcdefghi \n"],
None,
&[" 123456789 ", " abcdefghi "],
LineParsingOptions {
max_line_length: NumBytes::zero(),
overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
},
);
}
#[test]
fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
let chunks = as_single_byte_chunks("❤️❤️❤️\n👍\n");
run_test_case(
&chunks,
None,
&["❤️❤️❤️", "👍"],
LineParsingOptions::default(),
);
}
// The over-long line spans three chunks; everything after its first four bytes is dropped.
#[test]
fn overflow_drop_additional_data_persists_across_chunks() {
run_test_case(
&[b"1234", b"5678", b"9\nok\n"],
None,
&["1234", "ok"],
LineParsingOptions {
max_line_length: 4.bytes(),
overflow_behavior: LineOverflowBehavior::DropAdditionalData,
},
);
}
// A gap before the second chunk: the buffered "rea" and the following "dy" are both dropped.
#[test]
fn gap_discards_partial_line_until_next_newline() {
run_test_case(
&[b"rea", b"dy\nnext\n"],
Some(1),
&["next"],
LineParsingOptions::default(),
);
}
}
}