use super::{Chunker, EventLimiter, OutputWriter, WindowManager};
use crate::event::Event;
use std::collections::VecDeque;
/// Default timeout, in milliseconds (per the `_MS` suffix), for flushing buffered
/// multi-line input.
///
/// NOTE(review): not referenced in this file; presumably consumed by the caller that
/// decides when to call `Chunker::flush` on a chunker with pending data — confirm.
pub const DEFAULT_MULTILINE_FLUSH_TIMEOUT_MS: u64 = 400;
/// Pass-through chunker: every physical line is emitted immediately as its own record.
///
/// Nothing is ever buffered, so there is never anything pending or to flush.
pub struct SimpleChunker;
impl Chunker for SimpleChunker {
    /// Returns `line` unchanged as a complete record.
    fn feed_line(&mut self, line: String) -> Option<String> {
        Option::from(line)
    }

    /// Always `None`: no partial record is ever held back.
    fn flush(&mut self) -> Option<String> {
        Option::None
    }

    /// Always `false`: no partial record is ever held back.
    fn has_pending(&self) -> bool {
        false
    }
}
/// Chunker that reassembles CSV records whose quoted fields contain embedded newlines.
///
/// Physical lines are buffered while a quoted field is open. The open/closed state is
/// tracked by the parity of `"` characters seen, so an escaped `""` leaves it unchanged.
#[derive(Default)]
pub struct CsvChunker {
    /// Physical lines of the record being reassembled, joined with `\n`.
    buffer: String,
    /// Whether the buffered text currently ends inside an open quoted field.
    in_quoted_field: bool,
}
impl CsvChunker {
    /// Creates a chunker with an empty buffer and no open quoted field.
    pub fn new() -> Self {
        Default::default()
    }

    /// Appends `line` to the buffer, preceded by a `\n` separator unless the buffer is empty.
    fn push_line(&mut self, line: &str) {
        let has_content = !self.buffer.is_empty();
        if has_content {
            self.buffer.push('\n');
        }
        self.buffer += line;
    }
}
impl Chunker for CsvChunker {
    /// Feeds one physical line; returns a complete record once no quoted field is open.
    ///
    /// A line with an even number of `"` that arrives while nothing is buffered is
    /// returned as-is without touching the buffer. Otherwise the line is appended, an
    /// odd `"` count flips the open-field state, and the buffered record is handed
    /// back only when that state ends up closed.
    ///
    /// NOTE(review): a stray `"` inside an unquoted field also flips the state, so all
    /// following lines are buffered until a matching quote arrives or `flush` is called.
    fn feed_line(&mut self, line: String) -> Option<String> {
        let toggles_quote = line.matches('"').count() % 2 != 0;
        let idle = self.buffer.is_empty() && !self.in_quoted_field;
        if idle && !toggles_quote {
            // Fast path: a self-contained single-line record needs no buffering.
            return Some(line);
        }

        self.push_line(&line);
        self.in_quoted_field ^= toggles_quote;

        if self.in_quoted_field {
            return None;
        }
        Some(std::mem::take(&mut self.buffer))
    }

    /// Emits whatever partial record is buffered (e.g. an unterminated quote at EOF)
    /// and resets the quote state; returns `None` when nothing is buffered.
    fn flush(&mut self) -> Option<String> {
        if !self.has_pending() {
            return None;
        }
        self.in_quoted_field = false;
        Some(std::mem::take(&mut self.buffer))
    }

    /// Whether a partial record is currently buffered.
    fn has_pending(&self) -> bool {
        !self.buffer.is_empty()
    }
}
/// Window manager that remembers only the most recent event.
pub struct SimpleWindowManager {
    /// Latest event passed to `update`, or `None` before the first one.
    current: Option<Event>,
}
impl SimpleWindowManager {
    /// Creates a manager that has not seen any event yet.
    pub fn new() -> Self {
        SimpleWindowManager {
            current: Option::None,
        }
    }
}
impl Default for SimpleWindowManager {
    /// Equivalent to [`SimpleWindowManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl WindowManager for SimpleWindowManager {
    /// Returns the latest event as a one-element vector, or an empty vector before any update.
    fn get_window(&self) -> Vec<Event> {
        self.current.iter().cloned().collect()
    }

    /// Replaces the remembered event with a clone of `current`.
    fn update(&mut self, current: &Event) {
        let latest = current.clone();
        self.current = Some(latest);
    }
}
/// Output writer that emits each line, newline-terminated, to standard output.
pub struct StdoutWriter;
impl OutputWriter for StdoutWriter {
    /// Writes `line` followed by `\n` to stdout.
    ///
    /// # Errors
    /// Returns the underlying I/O error (e.g. `BrokenPipe` when the reading end of a
    /// pipe has exited) rather than panicking, which is what `println!` would do.
    fn write(&mut self, line: &str) -> std::io::Result<()> {
        use std::io::Write;
        writeln!(std::io::stdout().lock(), "{}", line)
    }

    /// Flushes stdout so previously written lines become visible.
    ///
    /// # Errors
    /// Propagates any I/O error from the flush.
    fn flush(&mut self) -> std::io::Result<()> {
        use std::io::Write;
        std::io::stdout().flush()
    }
}
/// Limiter that admits at most a fixed number of events.
pub struct TakeNLimiter {
    /// How many more events may still be admitted.
    remaining: usize,
}
impl TakeNLimiter {
    /// Creates a limiter whose first `limit` calls to `allow` succeed.
    pub fn new(limit: usize) -> Self {
        TakeNLimiter { remaining: limit }
    }
}
impl EventLimiter for TakeNLimiter {
    /// Consumes one unit of budget; returns `false` (without changing state) once it is spent.
    fn allow(&mut self) -> bool {
        match self.remaining.checked_sub(1) {
            Some(left) => {
                self.remaining = left;
                true
            }
            None => false,
        }
    }

    /// `true` once every slot has been consumed (immediately for a limit of 0).
    fn is_exhausted(&self) -> bool {
        self.remaining == 0
    }
}
/// Window manager that keeps the current event plus up to `window_size` previous ones.
///
/// Events are stored newest-first, so `get_window()[0]` is the most recent `update`.
pub struct SlidingWindowManager {
    /// Number of previous events retained alongside the current one.
    window_size: usize,
    /// Retained events, newest at the front.
    buffer: VecDeque<Event>,
}
impl SlidingWindowManager {
    /// Upper bound on slots reserved eagerly by `new`. Larger windows grow on demand
    /// instead of allocating their full (possibly user-supplied, huge) size up front.
    const MAX_PREALLOCATED: usize = 1024;

    /// Creates an empty window that retains `window_size` previous events in addition
    /// to the current one.
    pub fn new(window_size: usize) -> Self {
        let capacity = window_size
            .saturating_add(1)
            .min(Self::MAX_PREALLOCATED);
        Self {
            window_size,
            buffer: VecDeque::with_capacity(capacity),
        }
    }

    /// Maximum number of events held: the current one plus `window_size` previous.
    ///
    /// Saturates, so `usize::MAX` means "effectively unbounded". Plain `+ 1` would
    /// panic in debug builds and wrap to 0 in release builds, emptying every window.
    fn max_len(&self) -> usize {
        self.window_size.saturating_add(1)
    }
}
impl WindowManager for SlidingWindowManager {
    /// Returns clones of the retained events, newest first.
    fn get_window(&self) -> Vec<Event> {
        self.buffer.iter().cloned().collect()
    }

    /// Records `current` as the newest event and discards the oldest beyond the window.
    fn update(&mut self, current: &Event) {
        self.buffer.push_front(current.clone());
        // `truncate` keeps the front (newest) elements and drops from the back (oldest).
        self.buffer.truncate(self.max_len());
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Drives a fresh `CsvChunker` over `input` line by line, then flushes it,
    /// collecting every record produced.
    fn chunk_all(input: &str) -> Vec<String> {
        let mut chunker = CsvChunker::new();
        let mut out = Vec::new();
        for line in input.lines() {
            if let Some(record) = chunker.feed_line(line.to_string()) {
                out.push(record);
            }
        }
        if let Some(record) = chunker.flush() {
            out.push(record);
        }
        out
    }

    #[test]
    fn single_line_records_pass_through_unbuffered() {
        let records = chunk_all("a,b,c\nd,e,f\n");
        assert_eq!(records, vec!["a,b,c", "d,e,f"]);
    }

    #[test]
    fn quoted_field_with_embedded_newline_is_reassembled() {
        let records = chunk_all("name,note\n\"alice\",\"hello\nworld\"\n\"bob\",\"ok\"\n");
        assert_eq!(
            records,
            vec!["name,note", "\"alice\",\"hello\nworld\"", "\"bob\",\"ok\""]
        );
        assert!(records
            .iter()
            .all(|r| crate::parsers::csv::csv_record_complete(r)));
    }

    #[test]
    fn field_spanning_several_lines_is_reassembled() {
        let records = chunk_all("\"a\",\"one\ntwo\nthree\"\nx,y\n");
        assert_eq!(records, vec!["\"a\",\"one\ntwo\nthree\"", "x,y"]);
    }

    #[test]
    fn escaped_quotes_inside_a_field_do_not_close_it() {
        let records = chunk_all("\"a\",\"he said \"\"hi\"\"\nbye\"\nz\n");
        assert_eq!(records, vec!["\"a\",\"he said \"\"hi\"\"\nbye\"", "z"]);
    }

    // A blank physical line inside an open quoted field must survive as an empty line
    // in the reassembled record (exercises the separator logic in `push_line`).
    #[test]
    fn blank_line_inside_quoted_field_is_preserved() {
        let records = chunk_all("\"a\",\"x\n\ny\"\nz\n");
        assert_eq!(records, vec!["\"a\",\"x\n\ny\"", "z"]);
    }

    #[test]
    fn unterminated_quote_at_eof_is_flushed_for_the_parser_to_reject() {
        let mut chunker = CsvChunker::new();
        assert!(chunker.feed_line("\"oops,unclosed".to_string()).is_none());
        assert!(chunker.has_pending());
        let flushed = chunker.flush().expect("buffered partial record");
        assert_eq!(flushed, "\"oops,unclosed");
        assert!(!crate::parsers::csv::csv_record_complete(&flushed));
        assert!(!chunker.has_pending());
    }

    #[test]
    fn flush_on_idle_chunker_yields_nothing() {
        let mut chunker = CsvChunker::new();
        assert!(!chunker.has_pending());
        assert!(chunker.flush().is_none());
        assert_eq!(
            chunker.feed_line("a,b".to_string()).as_deref(),
            Some("a,b")
        );
        assert!(!chunker.has_pending());
        assert!(chunker.flush().is_none());
    }

    #[test]
    fn simple_chunker_passes_lines_through() {
        let mut chunker = SimpleChunker;
        assert_eq!(chunker.feed_line("x".to_string()).as_deref(), Some("x"));
        assert!(!chunker.has_pending());
        assert!(chunker.flush().is_none());
    }

    #[test]
    fn take_n_limiter_stops_after_limit() {
        let mut limiter = TakeNLimiter::new(2);
        assert!(!limiter.is_exhausted());
        assert!(limiter.allow());
        assert!(limiter.allow());
        assert!(limiter.is_exhausted());
        assert!(!limiter.allow());
        assert!(TakeNLimiter::new(0).is_exhausted());
    }
}