use std::io::{BufRead, BufWriter, Write};
use crate::error::{Error, Result};
use crate::framing::{OnError, Stats};
use crate::io::Input;
/// Settings that control how [`run`] copies records from its input to its sink.
#[derive(Clone, Debug)]
pub struct Config {
    /// Largest record length, in bytes, accepted without consulting the
    /// oversize policy. `run` compares this against the byte count returned
    /// by `read_until`, which includes the terminating `\n` when present.
    pub max_line: u64,
    /// Policy handed to `Stats::apply_oversize_policy` for any record longer
    /// than `max_line`.
    pub on_error: OnError,
    /// When `Some(cap)`, `run` stops once `records_out >= cap`; `None` means
    /// no cap is applied.
    pub sample: Option<u64>,
    /// When `true`, `run` appends a `\n` after an emitted record that does
    /// not already end with one.
    pub ensure_trailing_newline: bool,
    /// Optional `(rank, world_size)` pair. When `world_size > 1`, `run` emits
    /// only records whose zero-based index modulo `world_size` equals `rank`.
    /// `None` is treated as `(0, 1)`, i.e. no filtering.
    pub partition: Option<(u32, u32)>,
}
impl Default for Config {
fn default() -> Self {
Self {
max_line: 16 * 1024 * 1024,
on_error: OnError::Skip,
sample: None,
ensure_trailing_newline: true,
partition: None,
}
}
}
/// Capacity, in bytes, of the `BufWriter` that `run` wraps around its sink (2 MiB).
pub const DEFAULT_WRITER_CAPACITY: usize = 2 << 20;
/// Copies newline-delimited records from `input` to `sink`, applying the
/// oversize policy, partition filter, and sample cap described by `cfg`.
///
/// Records are read with `read_until(b'\n', ..)`, so a record's length
/// includes its terminating newline when one is present. For each record:
///
/// 1. `records_in` and `bytes_in` are updated; a record without a trailing
///    newline sets `had_trailing_partial`.
/// 2. If the record is longer than `cfg.max_line`,
///    `Stats::apply_oversize_policy` decides whether it is kept, dropped, or
///    turns into an error.
/// 3. If `cfg.partition` is `Some((rank, world_size))` with `world_size > 1`,
///    the record is emitted only when its zero-based index modulo
///    `world_size` equals `rank`. The index counts every record read,
///    including ones dropped by the oversize policy.
/// 4. Surviving records are written verbatim; when
///    `cfg.ensure_trailing_newline` is set, a missing final `\n` is appended.
///    `bytes_out` and `records_out` reflect what was written.
///
/// When `cfg.sample` is `Some(cap)`, reading stops as soon as `cap` records
/// have been emitted; `Some(0)` therefore emits nothing.
///
/// The sink is wrapped in a `BufWriter` of `DEFAULT_WRITER_CAPACITY` bytes
/// and flushed before returning.
///
/// # Errors
///
/// * Whatever `input.reject_if_compressed()` reports.
///   NOTE(review): the exact rejection criteria live in `Input`; confirm they
///   agree with the feature-gated gzip test in this file.
/// * `Error::Io` when reading from `input`, writing to `sink`, or the final
///   flush fails.
/// * Any error returned by `Stats::apply_oversize_policy`.
pub fn run(mut input: Input, sink: impl Write, cfg: &Config) -> Result<Stats> {
    input.reject_if_compressed()?;

    let mut writer = BufWriter::with_capacity(DEFAULT_WRITER_CAPACITY, sink);
    let mut stats = Stats::default();
    // One buffer reused for every record to avoid a per-line allocation.
    let mut line: Vec<u8> = Vec::with_capacity(8 * 1024);
    let mut byte_offset: u64 = 0;
    // Zero-based index of the next record, kept in 64 bits so the partition
    // test never works on a truncated value.
    let mut record_index: u64 = 0;
    // Only keep a partition when it actually filters; widening to u64 is
    // lossless and lets the modulo run on the full record index.
    let partition = cfg
        .partition
        .filter(|&(_, world_size)| world_size > 1)
        .map(|(rank, world_size)| (u64::from(rank), u64::from(world_size)));

    loop {
        // Checked before reading so that a cap of zero emits nothing and no
        // further input is consumed once the cap has been reached.
        if cfg.sample.is_some_and(|cap| stats.records_out >= cap) {
            break;
        }

        line.clear();
        let n = input.read_until(b'\n', &mut line).map_err(Error::Io)?;
        if n == 0 {
            break;
        }
        let len = n as u64;
        let this_offset = byte_offset;
        let this_index = record_index;
        byte_offset += len;
        record_index += 1;
        stats.records_in += 1;
        stats.bytes_in += len;

        let has_newline = line.last() == Some(&b'\n');
        if !has_newline {
            stats.had_trailing_partial = true;
        }

        if len > cfg.max_line {
            let keep =
                stats.apply_oversize_policy(cfg.on_error, this_offset, len, cfg.max_line)?;
            if !keep {
                continue;
            }
        }

        if partition.is_some_and(|(rank, world_size)| this_index % world_size != rank) {
            continue;
        }

        writer.write_all(&line).map_err(Error::Io)?;
        let mut written = line.len() as u64;
        if !has_newline && cfg.ensure_trailing_newline {
            writer.write_all(b"\n").map_err(Error::Io)?;
            written += 1;
        }
        stats.bytes_out += written;
        stats.records_out += 1;
    }

    // Flush explicitly: dropping a BufWriter would swallow a write error.
    writer.flush().map_err(Error::Io)?;
    Ok(stats)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::Input;

    /// Builds an `Input` over a static byte slice, passing the slice length
    /// as the second argument to `Input::from_reader`.
    fn input_of(bytes: &'static [u8]) -> Input {
        let len = bytes.len() as u64;
        Input::from_reader(Box::new(bytes), Some(len), None).unwrap()
    }

    /// Runs `run` over `bytes` with `cfg`, unwraps the result, and returns the
    /// stats together with everything written to the sink.
    fn run_collect(bytes: &'static [u8], cfg: &Config) -> (Stats, Vec<u8>) {
        let mut sink = Vec::new();
        let stats = run(input_of(bytes), &mut sink, cfg).unwrap();
        (stats, sink)
    }

    #[test]
    fn trivial_three_lines() {
        let (stats, out) = run_collect(b"{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n", &Config::default());
        assert_eq!(stats.records_in, 3);
        assert_eq!(stats.records_out, 3);
        assert_eq!(out, b"{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n");
    }

    #[test]
    fn missing_trailing_newline_is_patched() {
        let (stats, out) = run_collect(b"first\nsecond", &Config::default());
        assert_eq!(stats.records_in, 2);
        assert_eq!(stats.records_out, 2);
        assert!(stats.had_trailing_partial);
        assert_eq!(out, b"first\nsecond\n");
    }

    #[test]
    fn empty_input_emits_nothing() {
        let (stats, out) = run_collect(b"", &Config::default());
        assert_eq!(stats.records_in, 0);
        assert_eq!(stats.records_out, 0);
        assert_eq!(stats.bytes_out, 0);
        assert!(out.is_empty());
    }

    #[test]
    fn sample_caps_output() {
        let capped = Config {
            sample: Some(3),
            ..Config::default()
        };
        let (stats, out) = run_collect(b"a\nb\nc\nd\ne\n", &capped);
        assert_eq!(stats.records_out, 3);
        assert_eq!(out, b"a\nb\nc\n");
    }

    #[test]
    fn oversized_skip_drops_and_continues() {
        let skipping = Config {
            max_line: 10,
            on_error: OnError::Skip,
            ..Config::default()
        };
        let (stats, out) = run_collect(b"ok\nWAY_TOO_LONG\nalso_ok\n", &skipping);
        assert_eq!(stats.records_in, 3);
        assert_eq!(stats.records_out, 2);
        assert_eq!(stats.oversized_skipped, 1);
        assert_eq!(out, b"ok\nalso_ok\n");
    }

    #[test]
    fn oversized_fail_errors() {
        let failing = Config {
            max_line: 5,
            on_error: OnError::Fail,
            ..Config::default()
        };
        let mut sink = Vec::new();
        let err = run(input_of(b"ok\nWAY_TOO_LONG\n"), &mut sink, &failing).unwrap_err();
        assert!(matches!(err, Error::OversizedRecord { .. }));
    }

    #[test]
    fn oversized_passthrough_emits_anyway() {
        let passthrough = Config {
            max_line: 4,
            on_error: OnError::Passthrough,
            ..Config::default()
        };
        let (stats, out) = run_collect(b"ok\nWAY_TOO_LONG\nfin\n", &passthrough);
        assert_eq!(stats.records_out, 3);
        assert_eq!(stats.oversized_passthrough, 1);
        assert_eq!(out, b"ok\nWAY_TOO_LONG\nfin\n");
    }

    #[cfg(feature = "gzip")]
    #[test]
    fn accepts_gzip_input_transparently() {
        use flate2::Compression;
        use flate2::write::GzEncoder;
        use std::io::Write;

        let original = b"one\ntwo\nthree\n";
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(original).unwrap();
        let compressed = encoder.finish().unwrap();

        let reader = Box::new(std::io::Cursor::new(compressed));
        let inp = crate::io::Input::from_reader(reader, None, None).unwrap();
        let mut out = Vec::new();
        let stats = run(inp, &mut out, &Config::default()).unwrap();
        assert_eq!(stats.records_in, 3);
        assert_eq!(out, original);
    }

    #[test]
    fn handles_crlf_by_default_preserving_cr() {
        let (stats, out) = run_collect(b"one\r\ntwo\r\n", &Config::default());
        assert_eq!(stats.records_in, 2);
        assert_eq!(out, b"one\r\ntwo\r\n");
    }

    #[test]
    fn embedded_nuls_pass_through() {
        let (stats, out) = run_collect(b"before\0after\nnext\n", &Config::default());
        assert_eq!(stats.records_in, 2);
        assert_eq!(out, b"before\0after\nnext\n");
    }

    #[test]
    fn zero_byte_file_is_empty_output() {
        let (_, out) = run_collect(b"", &Config::default());
        assert_eq!(out, b"");
    }

    #[test]
    fn single_line_no_trailing_newline() {
        let (stats, out) = run_collect(b"solo", &Config::default());
        assert_eq!(stats.records_in, 1);
        assert_eq!(out, b"solo\n");
    }
}