use crate::error::{Error, Result};
use crate::runtime::SharedInputReader;
use memchr::memchr;
use regex::bytes::Regex as BytesRegex;
use std::io::BufRead;
/// Returns the length of `buf` once every trailing `\n` byte is excluded.
///
/// Only line feeds are stripped: any other byte (including `\r`) stops the
/// scan, so `b"abc\r\n"` yields `4`. An empty or all-`\n` buffer yields `0`.
pub fn trim_end_record_bytes(buf: &[u8]) -> usize {
    buf.iter()
        .rposition(|&byte| byte != b'\n')
        .map_or(0, |last_kept| last_kept + 1)
}
/// Reads the next input record from the shared reader into `out`.
///
/// `out` and `rt_sep` are cleared first. The reading strategy is chosen from
/// `rs`, in this order:
/// * `"\n"` — line mode (`read_until_lf`);
/// * `""` — paragraph mode (`read_paragraph_record`);
/// * anything else with `regex_rs` present — regex separator;
/// * anything else — the bytes of `rs` as a literal separator.
///
/// `rt_sep` receives the separator text that terminated the record and
/// `leftover` carries bytes already read but not yet consumed between calls.
///
/// Returns whatever the selected strategy returns (`Ok(false)` when it has no
/// further record).
///
/// # Errors
/// `Error::Runtime` if the reader mutex is poisoned; any error propagated from
/// the selected strategy.
pub fn read_next_record(
    reader: &SharedInputReader,
    rs: &str,
    out: &mut Vec<u8>,
    rt_sep: &mut Vec<u8>,
    regex_rs: Option<&BytesRegex>,
    leftover: &mut Vec<u8>,
) -> Result<bool> {
    out.clear();
    rt_sep.clear();
    let mut locked = reader
        .lock()
        .map_err(|_| Error::Runtime("input reader lock poisoned".into()))?;
    let input = &mut *locked;
    match (rs, regex_rs) {
        ("\n", _) => read_until_lf(input, out, rt_sep, leftover),
        ("", _) => read_paragraph_record(input, out, rt_sep, leftover),
        (_, Some(re)) => read_until_regex_bytes(input, re, out, rt_sep, leftover),
        (_, None) => read_until_bytes(input, rs.as_bytes(), out, rt_sep, leftover),
    }
}
fn read_until_lf<R: BufRead>(
reader: &mut R,
out: &mut Vec<u8>,
rt_sep: &mut Vec<u8>,
leftover: &mut Vec<u8>,
) -> Result<bool> {
if !leftover.is_empty() {
if let Some(pos) = memchr(b'\n', leftover) {
out.extend_from_slice(&leftover[..=pos]);
leftover.drain(..=pos);
rt_sep.extend_from_slice(b"\n");
return Ok(true);
}
out.extend_from_slice(leftover);
leftover.clear();
}
let n = reader.read_until(b'\n', out).map_err(Error::Io)?;
if !out.is_empty() {
if out.last() == Some(&b'\n') {
rt_sep.extend_from_slice(b"\n");
}
return Ok(true);
}
Ok(n > 0)
}
fn read_paragraph_record<R: BufRead>(
reader: &mut R,
out: &mut Vec<u8>,
rt_sep: &mut Vec<u8>,
leftover: &mut Vec<u8>,
) -> Result<bool> {
let mut line = Vec::<u8>::new();
let mut saw_content = false;
loop {
line.clear();
if !leftover.is_empty() {
if let Some(pos) = memchr(b'\n', leftover) {
line.extend_from_slice(&leftover[..=pos]);
leftover.drain(..=pos);
} else {
line.extend_from_slice(leftover);
leftover.clear();
let _ = reader.read_until(b'\n', &mut line).map_err(Error::Io)?;
}
} else {
let n = reader.read_until(b'\n', &mut line).map_err(Error::Io)?;
if n == 0 {
let end = trim_end_record_bytes(out);
rt_sep.extend_from_slice(&out[end..]);
out.truncate(end);
return Ok(saw_content);
}
}
if line.is_empty() {
let end = trim_end_record_bytes(out);
rt_sep.extend_from_slice(&out[end..]);
out.truncate(end);
return Ok(saw_content);
}
let is_blank = line
.iter()
.all(|b| matches!(*b, b' ' | b'\t' | b'\r' | b'\n'));
if is_blank {
if saw_content {
let end = trim_end_record_bytes(out);
rt_sep.extend_from_slice(&out[end..]);
out.truncate(end);
rt_sep.extend_from_slice(&line);
let mut peek = Vec::<u8>::new();
loop {
peek.clear();
if !leftover.is_empty() {
if let Some(pos) = memchr(b'\n', leftover) {
peek.extend_from_slice(&leftover[..=pos]);
leftover.drain(..=pos);
} else {
peek.extend_from_slice(leftover);
leftover.clear();
let _ = reader.read_until(b'\n', &mut peek).map_err(Error::Io)?;
}
} else {
let n = reader.read_until(b'\n', &mut peek).map_err(Error::Io)?;
if n == 0 {
break;
}
}
let peek_blank = peek
.iter()
.all(|b| matches!(*b, b' ' | b'\t' | b'\r' | b'\n'));
if peek_blank {
rt_sep.extend_from_slice(&peek);
} else {
let mut new_leftover = peek.clone();
new_leftover.extend_from_slice(leftover);
*leftover = new_leftover;
break;
}
}
return Ok(true);
}
continue;
}
saw_content = true;
out.extend_from_slice(&line);
}
}
/// Reads one record terminated by the first match of `re`.
///
/// Input is pulled from `reader` in 4096-byte chunks and accumulated in
/// `leftover`. When a separator match is accepted, the bytes before it are
/// appended to `out`, the matched bytes to `rt_sep`, and both are removed from
/// `leftover`.
///
/// A match that reaches the very end of the buffered bytes is not accepted
/// until more input has been read or end of input is reached: the next chunk
/// may extend it (for example `-+` split across two reads), and accepting it
/// early would truncate `rt_sep` and start the next record inside the
/// separator.
///
/// At end of input without a match, any remaining bytes form the final record
/// with an empty `rt_sep`. Returns `Ok(false)` when nothing is left.
///
/// # Errors
/// `Error::Io` if reading from `reader` fails.
fn read_until_regex_bytes<R: BufRead>(
    reader: &mut R,
    re: &BytesRegex,
    out: &mut Vec<u8>,
    rt_sep: &mut Vec<u8>,
    leftover: &mut Vec<u8>,
) -> Result<bool> {
    let mut chunk = [0u8; 4096];
    let mut at_eof = false;
    loop {
        if let Some(m) = re.find(leftover) {
            let (start, end) = (m.start(), m.end());
            // Only trust a match that is followed by more bytes, or that can no
            // longer grow because the input is exhausted.
            if at_eof || end < leftover.len() {
                out.extend_from_slice(&leftover[..start]);
                rt_sep.extend_from_slice(&leftover[start..end]);
                leftover.drain(..end);
                return Ok(true);
            }
        }
        if at_eof {
            if leftover.is_empty() {
                return Ok(false);
            }
            out.append(leftover);
            return Ok(true);
        }
        let n = reader.read(&mut chunk).map_err(Error::Io)?;
        if n == 0 {
            // Re-run the search once more with `at_eof` set before giving up.
            at_eof = true;
        } else {
            leftover.extend_from_slice(&chunk[..n]);
        }
    }
}
fn read_until_bytes<R: BufRead>(
reader: &mut R,
delim: &[u8],
out: &mut Vec<u8>,
rt_sep: &mut Vec<u8>,
leftover: &mut Vec<u8>,
) -> Result<bool> {
if delim.is_empty() {
return Ok(false);
}
if delim.len() == 1 {
if !leftover.is_empty() {
if let Some(pos) = memchr(delim[0], leftover) {
out.extend_from_slice(&leftover[..pos]);
leftover.drain(..=pos);
rt_sep.push(delim[0]);
return Ok(true);
}
out.extend_from_slice(leftover);
leftover.clear();
}
let n = reader.read_until(delim[0], out).map_err(Error::Io)?;
if !out.is_empty() {
if out.last() == Some(&delim[0]) {
out.pop();
rt_sep.push(delim[0]);
}
return Ok(true);
}
return Ok(n > 0);
}
let mut chunk = [0u8; 4096];
loop {
if leftover.len() >= delim.len() {
for start in 0..=leftover.len() - delim.len() {
if &leftover[start..start + delim.len()] == delim {
out.extend_from_slice(&leftover[..start]);
rt_sep.extend_from_slice(delim);
leftover.drain(..start + delim.len());
return Ok(true);
}
}
}
let n = reader.read(&mut chunk).map_err(Error::Io)?;
if n == 0 {
if leftover.is_empty() {
return Ok(false);
}
out.extend_from_slice(leftover);
leftover.clear();
return Ok(true);
}
leftover.extend_from_slice(&chunk[..n]);
}
}
/// Splits an in-memory input buffer into record slices borrowed from `data`.
///
/// Empty `data` yields no records. Otherwise the splitter is chosen from `rs`,
/// in this order:
/// * `"\n"` — one record per line (`split_lines_unix`);
/// * `""` — paragraph mode (`split_paragraph_mmap`);
/// * anything else with `regex_rs` present — split on regex matches;
/// * anything else — split on the bytes of `rs` as a literal delimiter.
pub fn split_input_into_records<'a>(
    data: &'a [u8],
    rs: &str,
    regex_rs: Option<&BytesRegex>,
) -> Vec<&'a [u8]> {
    if data.is_empty() {
        return Vec::new();
    }
    match (rs, regex_rs) {
        ("\n", _) => split_lines_unix(data),
        ("", _) => split_paragraph_mmap(data),
        (_, Some(re)) => split_by_regex_mmap(data, re),
        (_, None) => split_by_delimiter_mmap(data, rs.as_bytes()),
    }
}
/// Splits `data` at every match of `re`, returning the slices between matches.
///
/// The bytes after the last match are always pushed as a final slice, so data
/// that ends with a separator produces a trailing empty record, and data with
/// no match at all produces a single record covering everything.
fn split_by_regex_mmap<'a>(data: &'a [u8], re: &BytesRegex) -> Vec<&'a [u8]> {
    let mut cursor = 0usize;
    let mut records: Vec<&'a [u8]> = re
        .find_iter(data)
        .map(|sep| {
            let record = &data[cursor..sep.start()];
            cursor = sep.end();
            record
        })
        .collect();
    records.push(&data[cursor..]);
    records
}
/// Splits `data` into lines on `\n`, excluding the `\n` from each slice.
///
/// A final line without a terminating `\n` is still returned, while a `\n` at
/// the very end of `data` does not produce an extra empty record. Bytes other
/// than `\n` (including `\r`) stay in the record.
fn split_lines_unix(data: &[u8]) -> Vec<&[u8]> {
    let mut lines = Vec::new();
    let mut rest = data;
    while !rest.is_empty() {
        match memchr(b'\n', rest) {
            Some(newline_at) => {
                lines.push(&rest[..newline_at]);
                rest = &rest[newline_at + 1..];
            }
            None => {
                lines.push(rest);
                break;
            }
        }
    }
    lines
}
fn split_paragraph_mmap(data: &[u8]) -> Vec<&[u8]> {
let mut out = Vec::new();
let mut start: Option<usize> = None;
let mut cur_end: usize = 0;
let mut pos = 0usize;
let len = data.len();
while pos < len {
let eol = memchr(b'\n', &data[pos..len])
.map(|i| pos + i)
.unwrap_or(len);
let line = &data[pos..eol];
let blank = line.iter().all(|b| b.is_ascii_whitespace());
if blank {
if let Some(s) = start.take() {
let trimmed_end = s + trim_end_record_bytes(&data[s..cur_end]);
out.push(&data[s..trimmed_end]);
}
} else if start.is_none() {
start = Some(pos);
cur_end = if eol < len { eol + 1 } else { eol };
} else {
cur_end = if eol < len { eol + 1 } else { eol };
}
pos = if eol < len { eol + 1 } else { len };
}
if let Some(s) = start {
let trimmed_end = s + trim_end_record_bytes(&data[s..cur_end]);
out.push(&data[s..trimmed_end]);
}
out
}
/// Splits `data` on every occurrence of the literal byte string `delim`.
///
/// The delimiter is excluded from the returned slices. A delimiter at the very
/// end of `data` does not produce a trailing empty record, whereas a delimiter
/// at the start yields a leading empty one. An empty `delim` returns `data` as
/// a single record.
fn split_by_delimiter_mmap<'a>(data: &'a [u8], delim: &[u8]) -> Vec<&'a [u8]> {
    if delim.is_empty() {
        return vec![data];
    }
    let finder = memchr::memmem::Finder::new(delim);
    let mut records = Vec::new();
    let mut rest = data;
    while !rest.is_empty() {
        match finder.find(rest) {
            Some(at) => {
                records.push(&rest[..at]);
                rest = &rest[at + delim.len()..];
            }
            None => {
                records.push(rest);
                break;
            }
        }
    }
    records
}
// Unit tests for the record helpers in this file: `trim_end_record_bytes`,
// the in-memory splitter `split_input_into_records`, and the streaming reader
// `read_next_record`.
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::SharedInputReader;
use std::io::{BufReader, Cursor, Read};
use std::sync::{Arc, Mutex};
// Wraps an in-memory copy of `data` in the shared, mutex-guarded reader type
// that `read_next_record` takes.
fn shared_reader(data: &[u8]) -> SharedInputReader {
Arc::new(Mutex::new(BufReader::new(
Box::new(Cursor::new(data.to_vec())) as Box<dyn Read + Send>,
)))
}
// Trailing `\n` bytes are excluded from the returned length; `\r` is kept.
#[test]
fn trim_end_record_bytes_strips_only_trailing_lf_not_cr() {
assert_eq!(trim_end_record_bytes(b"abc\n"), 3);
assert_eq!(trim_end_record_bytes(b"abc\r\n"), 4); assert_eq!(trim_end_record_bytes(b"abc\r\r\n"), 5); assert_eq!(trim_end_record_bytes(b"abc\n\n"), 3); }
// An empty buffer has nothing to trim.
#[test]
fn trim_end_record_bytes_empty() {
assert_eq!(trim_end_record_bytes(b""), 0);
}
// A buffer made only of `\n` trims to length zero; a leading `\r` survives.
#[test]
fn trim_end_record_bytes_only_lf_yields_zero_len_content() {
assert_eq!(trim_end_record_bytes(b"\n"), 0);
assert_eq!(trim_end_record_bytes(b"\r\n"), 1);
}
// Only the trailing run is trimmed; a `\n` inside the buffer is untouched.
#[test]
fn trim_end_record_bytes_preserves_inner_newlines() {
assert_eq!(trim_end_record_bytes(b"a\nb"), 3);
}
// Empty input produces no records regardless of the separator.
#[test]
fn split_empty_input_yields_empty_vec() {
assert!(split_input_into_records(b"", "\n", None).is_empty());
assert!(split_input_into_records(b"", "XX", None).is_empty());
}
// Line mode: the final `\n` does not create a trailing empty record.
#[test]
fn split_newline_default() {
let d = b"a\nb\n";
let r = split_input_into_records(d, "\n", None);
assert_eq!(r, vec![&b"a"[..], &b"b"[..]]);
}
// Line mode splits on `\n` only, so the `\r` of a CRLF pair stays in the record.
#[test]
fn split_newline_preserves_cr_before_lf() {
let d = b"a\r\nb\r\n";
let r = split_input_into_records(d, "\n", None);
assert_eq!(r, vec![&b"a\r"[..], &b"b\r"[..]]);
}
// Line mode: an unterminated last line is still a record.
#[test]
fn split_newline_last_record_without_newline() {
let d = b"only";
let r = split_input_into_records(d, "\n", None);
assert_eq!(r, vec![&b"only"[..]]);
}
// Paragraph mode (`rs == ""`): a blank line separates records and each
// record's trailing newline is removed.
#[test]
fn split_paragraph_mode_blank_line_separator_strips_trailing_newlines() {
let d = b"para one line\n\npara two\n";
let r = split_input_into_records(d, "", None);
assert_eq!(r.len(), 2);
assert_eq!(r[0], &b"para one line"[..]);
assert_eq!(r[1], &b"para two"[..]);
}
// Paragraph mode: blank lines before and after the content yield no records.
#[test]
fn split_paragraph_leading_blank_lines_skipped() {
let d = b"\n\nbody\n\n";
let r = split_input_into_records(d, "", None);
assert_eq!(r, vec![&b"body"[..]]);
}
// Paragraph mode: whitespace-only lines count as blank.
#[test]
fn split_paragraph_whitespace_only_input_yields_no_records() {
let d = b"\n\n \t\n";
let r = split_input_into_records(d, "", None);
assert!(r.is_empty(), "expected no paragraph records, got {r:?}");
}
// Literal two-byte separator.
#[test]
fn split_custom_rs() {
let d = b"aXXbXX";
let r = split_input_into_records(d, "XX", None);
assert_eq!(r, vec![&b"a"[..], &b"b"[..]]);
}
// Literal one-byte separator; the unterminated last field is a record.
#[test]
fn split_single_byte_literal_rs() {
let d = b"a|b|c";
let r = split_input_into_records(d, "|", None);
assert_eq!(r, vec![&b"a"[..], &b"b"[..], &b"c"[..]]);
}
// A literal separator at the very end of the data adds no empty record.
#[test]
fn split_custom_rs_ending_at_delimiter_omits_trailing_empty_mmap_chunk() {
let d = b"aXX";
let r = split_input_into_records(d, "XX", None);
assert_eq!(r, vec![&b"a"[..]]);
}
// A separator that is a single multi-byte UTF-8 character is matched as bytes.
#[test]
fn split_multibyte_literal_rs() {
let d = "α•β•γ".as_bytes();
let r = split_input_into_records(d, "•", None);
assert_eq!(r.len(), 3);
assert_eq!(r[0], "α".as_bytes());
assert_eq!(r[1], "β".as_bytes());
assert_eq!(r[2], "γ".as_bytes());
}
// Regex separator on in-memory data: note the trailing empty record when the
// data ends with a match, unlike the literal-separator case above.
#[test]
fn split_regex_rs_mmap() {
let d = b"axxbxx";
let re = BytesRegex::new("x+").unwrap();
let r = split_input_into_records(d, "x+", Some(&re));
assert_eq!(r, vec![&b"a"[..], &b"b"[..], &b""[..]]);
}
// Streaming line mode: the returned record keeps its trailing `\n`, the
// separator is reported as `\n`, and the call after the last line returns false.
#[test]
fn read_next_record_default_rs_reads_until_lf() {
let rdr = shared_reader(b"hi\nthere\n");
let mut out = Vec::new();
let mut sep = Vec::new();
let mut lo = Vec::new();
assert!(read_next_record(&rdr, "\n", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"hi\n");
assert_eq!(sep, b"\n");
out.clear();
sep.clear();
assert!(read_next_record(&rdr, "\n", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"there\n");
assert!(!read_next_record(&rdr, "\n", &mut out, &mut sep, None, &mut lo).unwrap());
}
// Streaming literal separator: a lone `x` before `XX` is record content, and
// the separator is removed from the record and reported in `sep`.
#[test]
fn read_next_record_literal_multibyte_delimiter() {
let rdr = shared_reader(b"axXXbXX");
let mut out = Vec::new();
let mut sep = Vec::new();
let mut lo = Vec::new();
assert!(read_next_record(&rdr, "XX", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"ax");
assert_eq!(sep, b"XX");
out.clear();
sep.clear();
assert!(read_next_record(&rdr, "XX", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"b");
assert_eq!(sep, b"XX");
}
// Streaming regex separator: `sep` holds the exact bytes matched each time,
// and the final unterminated record comes back with an empty separator.
#[test]
fn read_next_record_regex_rs_reads_every_record() {
let rdr = shared_reader(b"a---b--c-d");
let re = BytesRegex::new("-+").unwrap();
let mut out = Vec::new();
let mut sep = Vec::new();
let mut lo = Vec::new();
assert!(read_next_record(&rdr, "-+", &mut out, &mut sep, Some(&re), &mut lo).unwrap());
assert_eq!(out, b"a");
assert_eq!(sep, b"---");
out.clear();
sep.clear();
assert!(read_next_record(&rdr, "-+", &mut out, &mut sep, Some(&re), &mut lo).unwrap());
assert_eq!(out, b"b");
assert_eq!(sep, b"--");
out.clear();
sep.clear();
assert!(read_next_record(&rdr, "-+", &mut out, &mut sep, Some(&re), &mut lo).unwrap());
assert_eq!(out, b"c");
assert_eq!(sep, b"-");
out.clear();
sep.clear();
assert!(read_next_record(&rdr, "-+", &mut out, &mut sep, Some(&re), &mut lo).unwrap());
assert_eq!(out, b"d");
assert_eq!(sep, b"");
assert!(!read_next_record(&rdr, "-+", &mut out, &mut sep, Some(&re), &mut lo).unwrap());
}
// Streaming paragraph mode: the separator is the record's own newline plus
// the blank line that follows it.
#[test]
fn read_next_record_paragraph_mode_blank_line_boundary() {
let rdr = shared_reader(b"first para line\n\nsecond\n");
let mut out = Vec::new();
let mut sep = Vec::new();
let mut lo = Vec::new();
assert!(read_next_record(&rdr, "", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"first para line");
assert_eq!(sep, b"\n\n");
out.clear();
sep.clear();
assert!(read_next_record(&rdr, "", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"second");
assert!(!read_next_record(&rdr, "", &mut out, &mut sep, None, &mut lo).unwrap());
}
// Streaming paragraph mode: blank lines before the first record are skipped
// and trailing blank lines do not produce another record.
#[test]
fn read_next_record_paragraph_skips_leading_blanks() {
let rdr = shared_reader(b"\n\nbody\n\n");
let mut out = Vec::new();
let mut sep = Vec::new();
let mut lo = Vec::new();
assert!(read_next_record(&rdr, "", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"body");
assert!(!read_next_record(&rdr, "", &mut out, &mut sep, None, &mut lo).unwrap());
}
// Streaming single-byte separator: unlike line mode, the separator byte is
// stripped from the record.
#[test]
fn read_next_record_custom_rs_char_strips_separator_from_record() {
let rdr = shared_reader(b"a:b:c");
let mut out = Vec::new();
let mut sep = Vec::new();
let mut lo = Vec::new();
assert!(read_next_record(&rdr, ":", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"a");
assert_eq!(sep, b":");
out.clear();
sep.clear();
assert!(read_next_record(&rdr, ":", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"b");
assert_eq!(sep, b":");
out.clear();
sep.clear();
assert!(read_next_record(&rdr, ":", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"c");
assert!(!read_next_record(&rdr, ":", &mut out, &mut sep, None, &mut lo).unwrap());
}
// Streaming multi-byte literal separator: every record is returned, the last
// one with an empty separator, followed by false at end of input.
#[test]
fn read_next_record_multi_char_rs_returns_every_record() {
let rdr = shared_reader(b"a--b--c");
let mut out = Vec::new();
let mut sep = Vec::new();
let mut lo = Vec::new();
assert!(read_next_record(&rdr, "--", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"a");
assert_eq!(sep, b"--");
out.clear();
sep.clear();
assert!(read_next_record(&rdr, "--", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"b");
assert_eq!(sep, b"--");
out.clear();
sep.clear();
assert!(read_next_record(&rdr, "--", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"c");
assert_eq!(sep, b"");
assert!(!read_next_record(&rdr, "--", &mut out, &mut sep, None, &mut lo).unwrap());
}
// Streaming paragraph mode: a last paragraph with no trailing newline is
// still returned before end of input is reported.
#[test]
fn read_next_record_empty_rs_at_eof() {
let rdr = shared_reader(b"para1\n\npara2");
let mut out = Vec::new();
let mut sep = Vec::new();
let mut lo = Vec::new();
assert!(read_next_record(&rdr, "", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"para1");
out.clear();
assert!(read_next_record(&rdr, "", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, b"para2");
assert!(!read_next_record(&rdr, "", &mut out, &mut sep, None, &mut lo).unwrap());
}
// Streaming line mode: a 10000-byte line with no `\n` is returned whole.
#[test]
fn read_next_record_large_buffer() {
let data = vec![b'x'; 10000];
let rdr = shared_reader(&data);
let mut out = Vec::new();
let mut sep = Vec::new();
let mut lo = Vec::new();
assert!(read_next_record(&rdr, "\n", &mut out, &mut sep, None, &mut lo).unwrap());
assert_eq!(out, data);
}
}