use crate::event::Event;
use crate::parsers::type_conversion::{
convert_value_to_type, parse_field_with_type, FieldType, TypeMap,
};
use crate::pipeline::EventParser;
use anyhow::{Context, Result};
use csv::ReaderBuilder;
use rhai::Dynamic;
/// Reports whether `record` contains an even number of double-quote bytes.
///
/// This is a parity heuristic. An odd count means a quote was opened and never
/// closed, so the record presumably continues on a following physical line.
/// Escaped quotes (`""`) inside a quoted field contribute two bytes and so leave
/// the parity unchanged. Every `"` byte is counted, including one that appears
/// inside an unquoted field.
pub fn csv_record_complete(record: &str) -> bool {
    // Toggle an "inside quotes" flag on every quote byte; the record is
    // complete when we end outside of any quoted span.
    let left_open = record
        .as_bytes()
        .iter()
        .fold(false, |open, &byte| open ^ (byte == b'"'));
    !left_open
}
/// Line-oriented parser for delimiter-separated records (CSV or TSV).
///
/// Column names come from a header row, from caller-supplied names, or are
/// generated positionally as `c1`, `c2`, and so on.
pub struct CsvParser {
    /// Column names in column order. Empty until initialized from the first
    /// line, unless supplied at construction.
    headers: Vec<String>,
    /// Whether the input is declared to begin with a header row.
    has_headers: bool,
    /// Field separator byte (`,` for CSV, `\t` for TSV).
    delimiter: u8,
    /// Per-column type conversions, keyed by column name.
    type_map: TypeMap,
    /// When set, ragged rows are rejected. The flag is also forwarded to type
    /// conversion.
    strict: bool,
    /// Whether timestamp extraction runs on every parsed event.
    auto_timestamp: bool,
}
impl CsvParser {
    /// Shared constructor behind every public `new_*` variant.
    ///
    /// Every variant starts with an empty type map, lenient (non-strict) row
    /// handling and automatic timestamp extraction. The variants differ only in
    /// the delimiter, whether the input is declared to start with a header row,
    /// and whether column names are supplied up front.
    fn from_settings(delimiter: u8, has_headers: bool, headers: Vec<String>) -> Self {
        Self {
            delimiter,
            has_headers,
            headers,
            type_map: TypeMap::new(),
            strict: false,
            auto_timestamp: true,
        }
    }

    /// Comma-separated input whose first line is a header row.
    pub fn new_csv() -> Self {
        Self::from_settings(b',', true, Vec::new())
    }

    /// Tab-separated input whose first line is a header row.
    pub fn new_tsv() -> Self {
        Self::from_settings(b'\t', true, Vec::new())
    }

    /// Comma-separated input without a header row. Columns are named `c1`,
    /// `c2`, and so on, based on the width of the first line.
    pub fn new_csv_no_headers() -> Self {
        Self::from_settings(b',', false, Vec::new())
    }

    /// Tab-separated input without a header row. Columns are named `c1`,
    /// `c2`, and so on, based on the width of the first line.
    pub fn new_tsv_no_headers() -> Self {
        Self::from_settings(b'\t', false, Vec::new())
    }

    /// Comma-separated input declared to have a header row, with column names
    /// supplied by the caller.
    pub fn new_csv_with_headers(headers: Vec<String>) -> Self {
        Self::from_settings(b',', true, headers)
    }

    /// Tab-separated input declared to have a header row, with column names
    /// supplied by the caller.
    pub fn new_tsv_with_headers(headers: Vec<String>) -> Self {
        Self::from_settings(b'\t', true, headers)
    }

    /// Comma-separated input without a header row, using caller-supplied
    /// column names.
    pub fn new_csv_no_headers_with_columns(headers: Vec<String>) -> Self {
        Self::from_settings(b',', false, headers)
    }

    /// Tab-separated input without a header row, using caller-supplied column
    /// names.
    pub fn new_tsv_no_headers_with_columns(headers: Vec<String>) -> Self {
        Self::from_settings(b'\t', false, headers)
    }

    /// Returns a copy of the current column names (empty if not initialized).
    pub fn get_headers(&self) -> Vec<String> {
        self.headers.clone()
    }

    /// Returns a copy of the column-type map, including types picked up from
    /// header annotations.
    pub fn get_type_map(&self) -> TypeMap {
        self.type_map.clone()
    }

    /// Replaces the column-type map wholesale.
    pub fn with_type_map(mut self, type_map: TypeMap) -> Self {
        self.type_map = type_map;
        self
    }

    /// Registers column types from a whitespace-separated field spec. Each
    /// entry is parsed by `parse_field_with_type`, e.g. `status:int`. Entries
    /// that carry no type are accepted and ignored.
    ///
    /// # Errors
    /// Returns an error naming the first entry `parse_field_with_type` rejects.
    pub fn with_field_spec(mut self, field_spec: &str) -> Result<Self> {
        for spec in field_spec.split_whitespace() {
            let (field_name, field_type) = parse_field_with_type(spec)
                .map_err(|e| anyhow::anyhow!("Invalid field spec '{}': {}", spec, e))?;
            if let Some(ftype) = field_type {
                self.type_map.insert(field_name, ftype);
            }
        }
        Ok(self)
    }

    /// Enables or disables strict mode (ragged rows become errors; the flag is
    /// also passed to type conversion).
    pub fn with_strict(mut self, strict: bool) -> Self {
        self.strict = strict;
        self
    }

    /// Enables or disables timestamp extraction on parsed events.
    pub fn with_auto_timestamp(mut self, auto_timestamp: bool) -> Self {
        self.auto_timestamp = auto_timestamp;
        self
    }

    /// Establishes column names from the first line of input.
    ///
    /// Returns `Ok(true)` when `line` was consumed as a header row and must not
    /// be parsed as data. Returns `Ok(false)` when `line` must still be parsed
    /// as data. That happens for headerless input (names are generated from
    /// its width) and whenever column names are already set.
    ///
    /// NOTE(review): with caller-supplied headers this returns `Ok(false)` even
    /// when `has_headers` is true, so a header row present in the input would
    /// be treated as data. Confirm that callers skip it themselves.
    ///
    /// # Errors
    /// Propagates CSV syntax errors and rejects an empty line.
    pub fn initialize_headers_from_line(&mut self, line: &str) -> Result<bool> {
        if !self.headers.is_empty() {
            return Ok(false);
        }
        if self.has_headers {
            self.headers = self.parse_header_line(line)?;
            Ok(true)
        } else {
            self.headers = self.generate_column_names(line)?;
            Ok(false)
        }
    }

    /// Reads the first record of `line` with this parser's delimiter.
    ///
    /// Quotes are honored, row width is not enforced, and no header handling
    /// is applied. Returns `Ok(None)` when the reader yields no record.
    fn read_single_record(&self, line: &str) -> csv::Result<Option<csv::StringRecord>> {
        ReaderBuilder::new()
            .delimiter(self.delimiter)
            .has_headers(false)
            .flexible(true)
            .from_reader(line.as_bytes())
            .records()
            .next()
            .transpose()
    }

    /// Parses a header row into column names, recording any `name:type`
    /// annotations in the type map.
    fn parse_header_line(&mut self, line: &str) -> Result<Vec<String>> {
        let record = self
            .read_single_record(line)
            .context("Failed to parse CSV header line")?
            .ok_or_else(|| anyhow::anyhow!("Empty CSV header line"))?;
        let mut headers = Vec::with_capacity(record.len());
        for (index, cell) in record.iter().enumerate() {
            headers.push(self.resolve_header_cell(index, cell));
        }
        Ok(headers)
    }

    /// Turns one header cell at zero-based `index` into a column name.
    ///
    /// - `name:type` with a recognised type yields `name`. The type is recorded
    ///   unless the type map already has an entry for that column, so types
    ///   configured earlier take precedence.
    /// - A blank cell, or a blank name with a recognised type (`:int`), gets
    ///   the positional name `c{index + 1}`. An annotated blank column keeps its
    ///   type under that positional name.
    /// - Anything else, including an unrecognised `:suffix`, is used verbatim
    ///   (trimmed).
    fn resolve_header_cell(&mut self, index: usize, cell: &str) -> String {
        let raw = cell.trim();
        let mut parts = raw.splitn(2, ':');
        let field_name = parts.next().unwrap_or("").trim();
        if let Some(type_str) = parts.next().map(str::trim) {
            if let Some(ftype) = FieldType::from_str(type_str) {
                let name = if field_name.is_empty() {
                    format!("c{}", index + 1)
                } else {
                    field_name.to_string()
                };
                // Previously the type was discarded for blank names; register it
                // under the generated name so the annotation is honored.
                if !self.type_map.contains_key(name.as_str()) {
                    self.type_map.insert(name.clone(), ftype);
                }
                return name;
            }
        }
        if raw.is_empty() {
            format!("c{}", index + 1)
        } else {
            raw.to_string()
        }
    }

    /// Generates positional names `c1..=cN`, where N is the field count of
    /// `line`.
    fn generate_column_names(&self, line: &str) -> Result<Vec<String>> {
        let record = self
            .read_single_record(line)
            .context("Failed to parse CSV data line for column count")?
            .ok_or_else(|| anyhow::anyhow!("Empty CSV data line"))?;
        Ok((1..=record.len()).map(|i| format!("c{}", i)).collect())
    }

    /// Converts a raw field to a `Dynamic`, applying the column's configured
    /// type if any. Untyped columns stay strings.
    fn build_value(&self, header: &str, field: &str) -> Result<Dynamic> {
        if let Some(field_type) = self.type_map.get(header) {
            convert_value_to_type(field, field_type, self.strict).map_err(|e| {
                anyhow::anyhow!("Type conversion failed for field '{}': {}", header, e)
            })
        } else {
            Ok(Dynamic::from(field.to_string()))
        }
    }

    /// Stores the field at zero-based position `i` on `event`.
    ///
    /// Declared columns are type-converted. Overflow columns beyond the
    /// headers are stored as raw strings under `c{i + 1}`. An overflow column
    /// is dropped when that name is already a declared header, so it cannot
    /// clobber a real column.
    fn set_positional_field(&self, event: &mut Event, i: usize, field: &str) -> Result<()> {
        match self.headers.get(i) {
            Some(header) => {
                let value = self.build_value(header, field)?;
                event.set_field(header.clone(), value);
            }
            None => {
                let name = format!("c{}", i + 1);
                if !self.headers.iter().any(|h| *h == name) {
                    event.set_field(name, Dynamic::from(field.to_string()));
                }
            }
        }
        Ok(())
    }

    /// Checks a row's width against the header count.
    ///
    /// In strict mode a mismatch is an error. Otherwise the mismatch is only
    /// recorded in stats.
    fn check_row_shape(&self, field_count: usize) -> Result<()> {
        let expected = self.headers.len();
        if field_count == expected {
            return Ok(());
        }
        if self.strict {
            return Err(self.ragged_row_error(field_count));
        }
        if field_count > expected {
            // Passes the 1-based position of the first overflow column.
            // NOTE(review): confirm the stats API expects a position rather than
            // the number of extra columns (`field_count - expected`).
            crate::stats::stats_add_csv_row_extra_columns(expected + 1);
        } else {
            crate::stats::stats_add_csv_row_missing_columns();
        }
        Ok(())
    }

    /// Builds the strict-mode error for a row whose width differs from the
    /// header count, naming where that count came from.
    fn ragged_row_error(&self, field_count: usize) -> anyhow::Error {
        let origin = if self.has_headers {
            "from header"
        } else {
            "from first line"
        };
        anyhow::anyhow!(
            "Row has {} columns, expected {} ({})",
            field_count,
            self.headers.len(),
            origin
        )
    }

    /// Parses one data record into an event.
    ///
    /// Quote-free lines are split directly on the delimiter, which matches the
    /// csv crate for such input. Lines containing quotes are first checked for
    /// balanced quotes and then parsed with the csv crate.
    fn parse_data_line(&self, line: &str) -> Result<Event> {
        let has_quotes = line.as_bytes().contains(&b'"');
        // A quote-free line is trivially complete; only pay for the parity
        // count when a quote is actually present.
        if has_quotes && !csv_record_complete(line) {
            return Err(anyhow::anyhow!(
                "Unterminated quoted field: a quoted value was opened but never closed \
                 before end of input. Check for an unbalanced double quote (\")."
            ));
        }
        let (mut event, field_count) = if has_quotes {
            self.populate_from_csv_reader(line)?
        } else {
            self.populate_from_split(line)?
        };
        self.check_row_shape(field_count)?;
        if self.auto_timestamp {
            event.extract_timestamp();
        }
        Ok(event)
    }

    /// Fast path: fills an event by splitting a quote-free line on the
    /// delimiter. Returns the event and the number of fields seen.
    fn populate_from_split(&self, line: &str) -> Result<(Event, usize)> {
        let mut event = Event::with_capacity(line.to_string(), self.headers.len());
        let mut field_count = 0;
        for (i, field) in line.split(self.delimiter as char).enumerate() {
            field_count = i + 1;
            self.set_positional_field(&mut event, i, field)?;
        }
        Ok((event, field_count))
    }

    /// Slow path: fills an event from a quoted line using the csv crate.
    /// Returns the event and the number of fields seen.
    fn populate_from_csv_reader(&self, line: &str) -> Result<(Event, usize)> {
        let record = self
            .read_single_record(line)
            .context("Failed to parse CSV data line")?
            .ok_or_else(|| anyhow::anyhow!("Empty CSV record"))?;
        let mut event = Event::with_capacity(line.to_string(), record.len());
        for (i, field) in record.iter().enumerate() {
            self.set_positional_field(&mut event, i, field)?;
        }
        Ok((event, record.len()))
    }

    /// Builds an event for `line` flagged with `__skip__ = true`.
    fn create_skip_event(&self, line: &str) -> Event {
        let mut event = Event::default_with_line(line.to_string());
        event.set_field("__skip__".to_string(), Dynamic::from(true));
        event
    }
}
impl EventParser for CsvParser {
    /// Parses one input record into an [`Event`].
    ///
    /// Trailing `\n` characters, then trailing `\r` characters, are stripped
    /// first. Whitespace-only input becomes a skip event even before
    /// initialization. Any other input requires the column names to be set
    /// already; otherwise an error is returned.
    fn parse(&self, line: &str) -> Result<Event> {
        let record = line.trim_end_matches('\n').trim_end_matches('\r');
        match (record.trim().is_empty(), self.headers.is_empty()) {
            (true, _) => Ok(self.create_skip_event(record)),
            (false, true) => Err(anyhow::anyhow!(
                "CSV parser not properly initialized. This should not happen in normal usage."
            )),
            (false, false) => self.parse_data_line(record),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Reference splitter: parses `line` with the csv crate configured like the
    /// parser's quoted slow path and returns the fields as owned strings.
    fn csv_crate_fields(line: &str, delimiter: u8) -> Vec<String> {
        let mut reader = ReaderBuilder::new()
            .delimiter(delimiter)
            .has_headers(false)
            .flexible(true)
            .from_reader(line.as_bytes());
        let record = reader
            .records()
            .next()
            .expect("non-empty line")
            .expect("valid record");
        record.iter().map(|s| s.to_string()).collect()
    }

    #[test]
    fn fast_path_matches_csv_crate_for_unquoted_lines() {
        // Quote-free lines take the `str::split` fast path; each must split
        // exactly as the csv crate would.
        let cases = [
            "a,b,c",
            "Alice,25,New York",
            "a,,b",              // empty middle field
            ",a,b",              // empty leading field
            "a,b,",              // empty trailing field
            ",,",                // delimiters only
            "single",            // one field, no delimiter
            " a , b , c ",       // surrounding whitespace is preserved
            "a\\,b\\n",          // backslashes are ordinary characters
            "café,naïve,日本語", // multi-byte UTF-8
            "tab\tinside",       // a tab is data under a comma delimiter
            "1,2,3,4,5,6,7,8",   // wider row
        ];
        for &line in &cases {
            let expected = csv_crate_fields(line, b',');
            let headers: Vec<String> = (1..=expected.len()).map(|i| format!("c{i}")).collect();
            let parser =
                CsvParser::new_csv_with_headers(headers.clone()).with_auto_timestamp(false);
            let event = parser.parse(line).expect("fast path parse");
            for (i, header) in headers.iter().enumerate() {
                let got = event
                    .fields
                    .get(header.as_str())
                    .map(|v| v.to_string())
                    .unwrap_or_default();
                assert_eq!(
                    got, expected[i],
                    "mismatch on line {line:?} field {header}: fast={got:?} csv={:?}",
                    expected[i]
                );
            }
        }
    }

    #[test]
    fn fast_path_matches_csv_crate_for_unquoted_tsv() {
        let cases = ["a\tb\tc", "a\t\tb", "\ta\tb", " a \t b ", "one"];
        for &line in &cases {
            let expected = csv_crate_fields(line, b'\t');
            let headers: Vec<String> = (1..=expected.len()).map(|i| format!("c{i}")).collect();
            let parser =
                CsvParser::new_tsv_with_headers(headers.clone()).with_auto_timestamp(false);
            let event = parser.parse(line).expect("fast path parse");
            for (i, header) in headers.iter().enumerate() {
                let got = event
                    .fields
                    .get(header.as_str())
                    .map(|v| v.to_string())
                    .unwrap_or_default();
                assert_eq!(got, expected[i], "tsv mismatch on {line:?} field {header}");
            }
        }
    }

    #[test]
    fn test_csv_with_headers() {
        let mut parser = CsvParser::new_csv();
        let was_consumed = parser.initialize_headers_from_line("name,age,city").unwrap();
        assert!(was_consumed);
        let data_result = parser.parse("Alice,25,New York").unwrap();
        assert_eq!(data_result.fields.get("name").unwrap().to_string(), "Alice");
        assert_eq!(data_result.fields.get("age").unwrap().to_string(), "25");
        assert_eq!(
            data_result.fields.get("city").unwrap().to_string(),
            "New York"
        );
    }

    #[test]
    fn test_csv_no_headers() {
        let mut parser = CsvParser::new_csv_no_headers();
        let was_consumed = parser
            .initialize_headers_from_line("Alice,25,New York")
            .unwrap();
        assert!(!was_consumed);
        let data_result = parser.parse("Alice,25,New York").unwrap();
        assert_eq!(data_result.fields.get("c1").unwrap().to_string(), "Alice");
        assert_eq!(data_result.fields.get("c2").unwrap().to_string(), "25");
        assert_eq!(
            data_result.fields.get("c3").unwrap().to_string(),
            "New York"
        );
    }

    #[test]
    fn test_tsv_with_headers() {
        let mut parser = CsvParser::new_tsv();
        let was_consumed = parser
            .initialize_headers_from_line("name\tage\tcity")
            .unwrap();
        assert!(was_consumed);
        let data_result = parser.parse("Alice\t25\tNew York").unwrap();
        assert_eq!(data_result.fields.get("name").unwrap().to_string(), "Alice");
        assert_eq!(data_result.fields.get("age").unwrap().to_string(), "25");
        assert_eq!(
            data_result.fields.get("city").unwrap().to_string(),
            "New York"
        );
    }

    #[test]
    fn test_csv_with_quotes() {
        let mut parser = CsvParser::new_csv();
        parser.initialize_headers_from_line("name,message").unwrap();
        let data_result = parser.parse("\"John Smith\",\"Hello, world!\"").unwrap();
        assert_eq!(
            data_result.fields.get("name").unwrap().to_string(),
            "John Smith"
        );
        assert_eq!(
            data_result.fields.get("message").unwrap().to_string(),
            "Hello, world!"
        );
    }

    #[test]
    fn test_csv_with_escaped_quotes() {
        let mut parser = CsvParser::new_csv();
        parser.initialize_headers_from_line("name,message").unwrap();
        let data_result = parser
            .parse("\"John\",\"He said \"\"hello\"\" to me\"")
            .unwrap();
        assert_eq!(data_result.fields.get("name").unwrap().to_string(), "John");
        assert_eq!(
            data_result.fields.get("message").unwrap().to_string(),
            "He said \"hello\" to me"
        );
    }

    #[test]
    fn test_csv_empty_fields() {
        let mut parser = CsvParser::new_csv();
        parser.initialize_headers_from_line("name,age,city").unwrap();
        let data_result = parser.parse("Alice,,Boston").unwrap();
        assert_eq!(data_result.fields.get("name").unwrap().to_string(), "Alice");
        assert_eq!(data_result.fields.get("age").unwrap().to_string(), "");
        assert_eq!(
            data_result.fields.get("city").unwrap().to_string(),
            "Boston"
        );
    }

    #[test]
    fn test_csv_variable_columns() {
        // Headerless width comes from the first line; later rows may be wider
        // or narrower in lenient mode.
        let mut parser = CsvParser::new_csv_no_headers();
        parser.initialize_headers_from_line("Alice,25").unwrap();
        let data_result1 = parser.parse("Alice,25").unwrap();
        assert_eq!(data_result1.fields.get("c1").unwrap().to_string(), "Alice");
        assert_eq!(data_result1.fields.get("c2").unwrap().to_string(), "25");
        assert!(data_result1.fields.get("c3").is_none());
        let data_result2 = parser.parse("Bob,30,Engineer").unwrap();
        assert_eq!(data_result2.fields.get("c1").unwrap().to_string(), "Bob");
        assert_eq!(data_result2.fields.get("c2").unwrap().to_string(), "30");
        assert_eq!(
            data_result2.fields.get("c3").unwrap().to_string(),
            "Engineer"
        );
        let data_result3 = parser.parse("Carol").unwrap();
        assert_eq!(data_result3.fields.get("c1").unwrap().to_string(), "Carol");
        assert!(data_result3.fields.get("c2").is_none());
    }

    #[test]
    fn test_csv_headered_overflow_gets_positional_names() {
        let mut parser = CsvParser::new_csv();
        parser.initialize_headers_from_line("name,age,city").unwrap();
        let event = parser.parse("Alice,25,Boston,extra1,extra2").unwrap();
        assert_eq!(event.fields.get("name").unwrap().to_string(), "Alice");
        assert_eq!(event.fields.get("city").unwrap().to_string(), "Boston");
        assert_eq!(event.fields.get("c4").unwrap().to_string(), "extra1");
        assert_eq!(event.fields.get("c5").unwrap().to_string(), "extra2");
    }

    #[test]
    fn test_csv_overflow_quoted_slow_path() {
        let mut parser = CsvParser::new_csv();
        parser.initialize_headers_from_line("name,msg").unwrap();
        let event = parser.parse("\"Alice\",\"hello, world\",overflow").unwrap();
        assert_eq!(event.fields.get("name").unwrap().to_string(), "Alice");
        assert_eq!(event.fields.get("msg").unwrap().to_string(), "hello, world");
        assert_eq!(event.fields.get("c3").unwrap().to_string(), "overflow");
    }

    #[test]
    fn test_csv_overflow_does_not_clobber_real_header() {
        let mut parser = CsvParser::new_csv();
        parser.initialize_headers_from_line("a,c3").unwrap();
        let event = parser.parse("x,y,z").unwrap();
        assert_eq!(event.fields.get("a").unwrap().to_string(), "x");
        assert_eq!(event.fields.get("c3").unwrap().to_string(), "y");
    }

    #[test]
    fn test_csv_strict_rejects_ragged_rows() {
        let mut parser = CsvParser::new_csv();
        parser.initialize_headers_from_line("name,age,city").unwrap();
        let parser = parser.with_strict(true);
        assert!(parser.parse("Alice,25,Boston").is_ok());
        let err = parser.parse("Bob,30,Boston,extra").unwrap_err();
        assert!(
            err.to_string().contains("expected 3 (from header)"),
            "{}",
            err
        );
        // The quoted slow path enforces the same rule.
        assert!(parser.parse("\"Bob\",30,Boston,extra").is_err());
        let err = parser.parse("Carol,41").unwrap_err();
        assert!(
            err.to_string().contains("expected 3 (from header)"),
            "{}",
            err
        );
    }

    #[test]
    fn test_csv_strict_headerless_error_names_first_line_origin() {
        let mut parser = CsvParser::new_csv_no_headers();
        parser.initialize_headers_from_line("a,b").unwrap();
        let parser = parser.with_strict(true);
        let err = parser.parse("1,2,3").unwrap_err();
        assert!(
            err.to_string().contains("expected 2 (from first line)"),
            "{}",
            err
        );
    }

    #[test]
    fn csv_record_complete_counts_quote_parity() {
        assert!(csv_record_complete("a,b,c"));
        assert!(csv_record_complete("\"a\",\"b\""));
        // Escaped quotes come in pairs and keep the parity even.
        assert!(csv_record_complete("\"he said \"\"hi\"\"\""));
        assert!(!csv_record_complete("\"a\",\"unclosed"));
        assert!(!csv_record_complete("trailing\""));
    }

    #[test]
    fn test_parse_data_line_rejects_unterminated_quoted_field() {
        let mut parser = CsvParser::new_csv();
        parser.initialize_headers_from_line("name,note").unwrap();
        let err = parser.parse("\"alice\",\"hello").unwrap_err();
        assert!(
            err.to_string().contains("Unterminated quoted field"),
            "{}",
            err
        );
        let ok = parser.parse("\"alice\",\"hello world\"").unwrap();
        assert_eq!(ok.fields.get("note").unwrap().to_string(), "hello world");
    }

    #[test]
    fn test_parse_reassembled_multiline_record() {
        let mut parser = CsvParser::new_csv();
        parser.initialize_headers_from_line("name,note").unwrap();
        let event = parser.parse("\"alice\",\"hello\nworld\"").unwrap();
        assert_eq!(event.fields.get("name").unwrap().to_string(), "alice");
        assert_eq!(
            event.fields.get("note").unwrap().to_string(),
            "hello\nworld"
        );
    }

    #[test]
    fn test_csv_header_type_annotations() {
        let mut parser = CsvParser::new_csv();
        let was_consumed = parser
            .initialize_headers_from_line("name,status:int,bytes:int")
            .unwrap();
        assert!(was_consumed);
        let data_result = parser.parse("Alice,200,1234").unwrap();
        assert_eq!(data_result.fields.get("name").unwrap().to_string(), "Alice");
        assert_eq!(
            data_result.fields.get("status").unwrap().as_int().unwrap(),
            200
        );
        assert_eq!(
            data_result.fields.get("bytes").unwrap().as_int().unwrap(),
            1234
        );
        assert!(data_result.fields.get("status:int").is_none());
    }

    #[test]
    fn test_csv_blank_header_columns_get_positional_names() {
        let mut parser = CsvParser::new_csv();
        let was_consumed = parser.initialize_headers_from_line("a,,,c").unwrap();
        assert!(was_consumed);
        let event = parser.parse("1,2,3,4").unwrap();
        assert_eq!(event.fields.get("a").unwrap().to_string(), "1");
        assert_eq!(event.fields.get("c2").unwrap().to_string(), "2");
        assert_eq!(event.fields.get("c3").unwrap().to_string(), "3");
        assert_eq!(event.fields.get("c").unwrap().to_string(), "4");
        assert!(event.fields.get("").is_none());
    }

    #[test]
    fn test_csv_blank_typed_header_gets_positional_name() {
        let mut parser = CsvParser::new_csv();
        parser.initialize_headers_from_line("a,:int,c").unwrap();
        let event = parser.parse("1,2,3").unwrap();
        assert_eq!(event.fields.get("c2").unwrap().to_string(), "2");
        assert!(event.fields.get("").is_none());
    }

    #[test]
    fn test_blank_line_yields_skip_event() {
        // Whitespace-only lines are skipped before the initialization check, so
        // even an uninitialized parser accepts them.
        let parser = CsvParser::new_csv();
        let event = parser.parse("   \r\n").unwrap();
        assert!(event.fields.get("__skip__").unwrap().as_bool().unwrap());
    }

    #[test]
    fn test_uninitialized_parser_reports_error() {
        let parser = CsvParser::new_csv();
        let err = parser.parse("a,b").unwrap_err();
        assert!(
            err.to_string().contains("not properly initialized"),
            "{}",
            err
        );
    }

    #[test]
    fn test_initialize_headers_is_noop_once_set() {
        let mut parser = CsvParser::new_csv();
        assert!(parser.initialize_headers_from_line("name,age").unwrap());
        // A second call must neither consume the line nor replace the names.
        assert!(!parser.initialize_headers_from_line("other,cols").unwrap());
        assert_eq!(parser.get_headers(), vec!["name", "age"]);
    }

    #[test]
    fn test_tsv_headerless_generates_positional_headers() {
        let mut parser = CsvParser::new_tsv_no_headers();
        assert!(!parser.initialize_headers_from_line("x\ty\tz").unwrap());
        assert_eq!(parser.get_headers(), vec!["c1", "c2", "c3"]);
    }

    #[test]
    fn test_field_spec_types_named_columns() {
        let mut parser = CsvParser::new_csv().with_field_spec("age:int").unwrap();
        parser.initialize_headers_from_line("name,age").unwrap();
        let event = parser.parse("Bob,42").unwrap();
        assert_eq!(event.fields.get("name").unwrap().to_string(), "Bob");
        assert_eq!(event.fields.get("age").unwrap().as_int().unwrap(), 42);
    }
}