#![allow(dead_code)]
use crate::event::Event;
use crate::parsers::type_conversion::{convert_value_to_type, parse_field_with_type, TypeMap};
use crate::pipeline::EventParser;
use anyhow::{Context, Result};
use csv::ReaderBuilder;
use rhai::Dynamic;
pub struct CsvParser {
delimiter: u8,
has_headers: bool,
headers: Vec<String>,
type_map: TypeMap,
strict: bool,
}
impl CsvParser {
pub fn new_csv() -> Self {
Self {
delimiter: b',',
has_headers: true,
headers: Vec::new(),
type_map: TypeMap::new(),
strict: false,
}
}
pub fn new_tsv() -> Self {
Self {
delimiter: b'\t',
has_headers: true,
headers: Vec::new(),
type_map: TypeMap::new(),
strict: false,
}
}
pub fn new_csv_no_headers() -> Self {
Self {
delimiter: b',',
has_headers: false,
headers: Vec::new(),
type_map: TypeMap::new(),
strict: false,
}
}
pub fn new_tsv_no_headers() -> Self {
Self {
delimiter: b'\t',
has_headers: false,
headers: Vec::new(),
type_map: TypeMap::new(),
strict: false,
}
}
pub fn new_csv_with_headers(headers: Vec<String>) -> Self {
Self {
delimiter: b',',
has_headers: true,
headers,
type_map: TypeMap::new(),
strict: false,
}
}
pub fn new_tsv_with_headers(headers: Vec<String>) -> Self {
Self {
delimiter: b'\t',
has_headers: true,
headers,
type_map: TypeMap::new(),
strict: false,
}
}
pub fn new_csv_no_headers_with_columns(headers: Vec<String>) -> Self {
Self {
delimiter: b',',
has_headers: false,
headers,
type_map: TypeMap::new(),
strict: false,
}
}
pub fn new_tsv_no_headers_with_columns(headers: Vec<String>) -> Self {
Self {
delimiter: b'\t',
has_headers: false,
headers,
type_map: TypeMap::new(),
strict: false,
}
}
pub fn get_headers(&self) -> Vec<String> {
self.headers.clone()
}
pub fn with_field_spec(mut self, field_spec: &str) -> Result<Self> {
for spec in field_spec.split_whitespace() {
let (field_name, field_type) = parse_field_with_type(spec)
.map_err(|e| anyhow::anyhow!("Invalid field spec '{}': {}", spec, e))?;
if let Some(ftype) = field_type {
self.type_map.insert(field_name, ftype);
}
}
Ok(self)
}
pub fn with_strict(mut self, strict: bool) -> Self {
self.strict = strict;
self
}
pub fn initialize_headers_from_line(&mut self, line: &str) -> Result<bool> {
if !self.headers.is_empty() {
return Ok(false);
}
if self.has_headers {
self.headers = self.parse_header_line(line)?;
Ok(true) } else {
self.headers = self.generate_column_names(line)?;
Ok(false) }
}
fn parse_header_line(&self, line: &str) -> Result<Vec<String>> {
let mut reader = ReaderBuilder::new()
.delimiter(self.delimiter)
.has_headers(false)
.flexible(true)
.from_reader(line.as_bytes());
if let Some(result) = reader.records().next() {
let record = result.context("Failed to parse CSV header line")?;
let headers: Vec<String> = record.iter().map(|s| s.trim().to_string()).collect();
Ok(headers)
} else {
Err(anyhow::anyhow!("Empty CSV header line"))
}
}
fn generate_column_names(&self, line: &str) -> Result<Vec<String>> {
let mut reader = ReaderBuilder::new()
.delimiter(self.delimiter)
.has_headers(false)
.flexible(true)
.from_reader(line.as_bytes());
if let Some(result) = reader.records().next() {
let record = result.context("Failed to parse CSV data line for column count")?;
let headers: Vec<String> = (1..=record.len()).map(|i| format!("c{}", i)).collect();
Ok(headers)
} else {
Err(anyhow::anyhow!("Empty CSV data line"))
}
}
fn parse_data_line(&self, line: &str) -> Result<Event> {
let mut reader = ReaderBuilder::new()
.delimiter(self.delimiter)
.has_headers(false)
.flexible(true)
.from_reader(line.as_bytes());
if let Some(result) = reader.records().next() {
let record = result.context("Failed to parse CSV data line")?;
let mut event = Event::with_capacity(line.to_string(), record.len());
for (i, field) in record.iter().enumerate() {
if let Some(header) = self.headers.get(i) {
let value = if let Some(field_type) = self.type_map.get(header) {
match convert_value_to_type(field, field_type, self.strict) {
Ok(converted) => converted,
Err(e) => {
if self.strict {
return Err(anyhow::anyhow!(
"Type conversion failed for field '{}': {}",
header,
e
));
} else {
Dynamic::from(field.to_string())
}
}
}
} else {
Dynamic::from(field.to_string())
};
event.set_field(header.clone(), value);
}
}
event.extract_timestamp();
Ok(event)
} else {
Err(anyhow::anyhow!("Empty CSV record"))
}
}
fn create_skip_event(&self, line: &str) -> Event {
let mut event = Event::default_with_line(line.to_string());
event.set_field("__skip__".to_string(), Dynamic::from(true));
event
}
}
impl EventParser for CsvParser {
fn parse(&self, line: &str) -> Result<Event> {
let line = line.trim_end_matches('\n').trim_end_matches('\r');
if line.trim().is_empty() {
return Ok(self.create_skip_event(line));
}
if self.headers.is_empty() {
return Err(anyhow::anyhow!(
"CSV parser not properly initialized. This should not happen in normal usage."
));
}
self.parse_data_line(line)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_csv_with_headers() {
let mut parser = CsvParser::new_csv();
let header_line = "name,age,city";
let was_consumed = parser.initialize_headers_from_line(header_line).unwrap();
assert!(was_consumed);
let data_result = parser.parse("Alice,25,New York").unwrap();
assert_eq!(data_result.fields.get("name").unwrap().to_string(), "Alice");
assert_eq!(data_result.fields.get("age").unwrap().to_string(), "25");
assert_eq!(
data_result.fields.get("city").unwrap().to_string(),
"New York"
);
}
#[test]
fn test_csv_no_headers() {
let mut parser = CsvParser::new_csv_no_headers();
let data_line = "Alice,25,New York";
let was_consumed = parser.initialize_headers_from_line(data_line).unwrap();
assert!(!was_consumed);
let data_result = parser.parse("Alice,25,New York").unwrap();
assert_eq!(data_result.fields.get("c1").unwrap().to_string(), "Alice");
assert_eq!(data_result.fields.get("c2").unwrap().to_string(), "25");
assert_eq!(
data_result.fields.get("c3").unwrap().to_string(),
"New York"
);
}
#[test]
fn test_tsv_with_headers() {
let mut parser = CsvParser::new_tsv();
let header_line = "name\tage\tcity";
let was_consumed = parser.initialize_headers_from_line(header_line).unwrap();
assert!(was_consumed);
let data_result = parser.parse("Alice\t25\tNew York").unwrap();
assert_eq!(data_result.fields.get("name").unwrap().to_string(), "Alice");
assert_eq!(data_result.fields.get("age").unwrap().to_string(), "25");
assert_eq!(
data_result.fields.get("city").unwrap().to_string(),
"New York"
);
}
#[test]
fn test_csv_with_quotes() {
let mut parser = CsvParser::new_csv();
let _ = parser.initialize_headers_from_line("name,message").unwrap();
let data_result = parser.parse("\"John Smith\",\"Hello, world!\"").unwrap();
assert_eq!(
data_result.fields.get("name").unwrap().to_string(),
"John Smith"
);
assert_eq!(
data_result.fields.get("message").unwrap().to_string(),
"Hello, world!"
);
}
#[test]
fn test_csv_with_escaped_quotes() {
let mut parser = CsvParser::new_csv();
let _ = parser.initialize_headers_from_line("name,message").unwrap();
let data_result = parser
.parse("\"John\",\"He said \"\"hello\"\" to me\"")
.unwrap();
assert_eq!(data_result.fields.get("name").unwrap().to_string(), "John");
assert_eq!(
data_result.fields.get("message").unwrap().to_string(),
"He said \"hello\" to me"
);
}
#[test]
fn test_csv_empty_fields() {
let mut parser = CsvParser::new_csv();
let _ = parser
.initialize_headers_from_line("name,age,city")
.unwrap();
let data_result = parser.parse("Alice,,Boston").unwrap();
assert_eq!(data_result.fields.get("name").unwrap().to_string(), "Alice");
assert_eq!(data_result.fields.get("age").unwrap().to_string(), "");
assert_eq!(
data_result.fields.get("city").unwrap().to_string(),
"Boston"
);
}
#[test]
fn test_csv_variable_columns() {
let mut parser = CsvParser::new_csv_no_headers();
let first_line = "Alice,25";
let _ = parser.initialize_headers_from_line(first_line).unwrap();
let data_result1 = parser.parse("Alice,25").unwrap();
assert_eq!(data_result1.fields.get("c1").unwrap().to_string(), "Alice");
assert_eq!(data_result1.fields.get("c2").unwrap().to_string(), "25");
assert!(data_result1.fields.get("c3").is_none());
let data_result2 = parser.parse("Bob,30,Engineer").unwrap();
assert_eq!(data_result2.fields.get("c1").unwrap().to_string(), "Bob");
assert_eq!(data_result2.fields.get("c2").unwrap().to_string(), "30");
assert!(data_result2.fields.get("c3").is_none());
}
}