use crate::event::Event;
use crate::parsers::type_conversion::{
convert_value_to_type, parse_field_with_type, FieldType, TypeMap,
};
use crate::pipeline::EventParser;
use anyhow::{Context, Result};
use csv::ReaderBuilder;
use rhai::Dynamic;
/// Line-oriented parser for comma- or tab-separated input that turns each data line into an
/// [`Event`] whose fields are named after the columns.
///
/// Column names come from a header row, from names supplied at construction, or are generated
/// as `c1`, `c2`, ... from the field count of the first line.
pub struct CsvParser {
/// Field separator byte handed to the `csv` reader (`b','` or `b'\t'` in the constructors).
delimiter: u8,
/// Whether the first input line is a header row rather than data.
has_headers: bool,
/// Column names: the i-th field of a data line is stored under `headers[i]`; fields past the
/// last name are dropped. Empty until initialized or supplied.
headers: Vec<String>,
/// Per-column target types; columns without an entry are stored as strings.
type_map: TypeMap,
/// Passed through to `convert_value_to_type`; presumably makes conversion failures hard
/// errors instead of fallbacks — TODO confirm against `type_conversion`.
strict: bool,
/// When true, `Event::extract_timestamp` is called on every parsed data event.
auto_timestamp: bool,
}
impl CsvParser {
pub fn new_csv() -> Self {
Self {
delimiter: b',',
has_headers: true,
headers: Vec::new(),
type_map: TypeMap::new(),
strict: false,
auto_timestamp: true,
}
}
pub fn new_tsv() -> Self {
Self {
delimiter: b'\t',
has_headers: true,
headers: Vec::new(),
type_map: TypeMap::new(),
strict: false,
auto_timestamp: true,
}
}
pub fn new_csv_no_headers() -> Self {
Self {
delimiter: b',',
has_headers: false,
headers: Vec::new(),
type_map: TypeMap::new(),
strict: false,
auto_timestamp: true,
}
}
pub fn new_tsv_no_headers() -> Self {
Self {
delimiter: b'\t',
has_headers: false,
headers: Vec::new(),
type_map: TypeMap::new(),
strict: false,
auto_timestamp: true,
}
}
pub fn new_csv_with_headers(headers: Vec<String>) -> Self {
Self {
delimiter: b',',
has_headers: true,
headers,
type_map: TypeMap::new(),
strict: false,
auto_timestamp: true,
}
}
pub fn new_tsv_with_headers(headers: Vec<String>) -> Self {
Self {
delimiter: b'\t',
has_headers: true,
headers,
type_map: TypeMap::new(),
strict: false,
auto_timestamp: true,
}
}
pub fn new_csv_no_headers_with_columns(headers: Vec<String>) -> Self {
Self {
delimiter: b',',
has_headers: false,
headers,
type_map: TypeMap::new(),
strict: false,
auto_timestamp: true,
}
}
pub fn new_tsv_no_headers_with_columns(headers: Vec<String>) -> Self {
Self {
delimiter: b'\t',
has_headers: false,
headers,
type_map: TypeMap::new(),
strict: false,
auto_timestamp: true,
}
}
pub fn get_headers(&self) -> Vec<String> {
self.headers.clone()
}
pub fn get_type_map(&self) -> TypeMap {
self.type_map.clone()
}
pub fn with_type_map(mut self, type_map: TypeMap) -> Self {
self.type_map = type_map;
self
}
pub fn with_field_spec(mut self, field_spec: &str) -> Result<Self> {
for spec in field_spec.split_whitespace() {
let (field_name, field_type) = parse_field_with_type(spec)
.map_err(|e| anyhow::anyhow!("Invalid field spec '{}': {}", spec, e))?;
if let Some(ftype) = field_type {
self.type_map.insert(field_name, ftype);
}
}
Ok(self)
}
pub fn with_strict(mut self, strict: bool) -> Self {
self.strict = strict;
self
}
pub fn with_auto_timestamp(mut self, auto_timestamp: bool) -> Self {
self.auto_timestamp = auto_timestamp;
self
}
pub fn initialize_headers_from_line(&mut self, line: &str) -> Result<bool> {
if !self.headers.is_empty() {
return Ok(false);
}
if self.has_headers {
self.headers = self.parse_header_line(line)?;
Ok(true) } else {
self.headers = self.generate_column_names(line)?;
Ok(false) }
}
fn parse_header_line(&mut self, line: &str) -> Result<Vec<String>> {
let mut reader = ReaderBuilder::new()
.delimiter(self.delimiter)
.has_headers(false)
.flexible(true)
.from_reader(line.as_bytes());
if let Some(result) = reader.records().next() {
let record = result.context("Failed to parse CSV header line")?;
let headers: Vec<String> = record
.iter()
.map(|s| {
let raw = s.trim();
let mut parts = raw.splitn(2, ':');
let field_name = parts.next().unwrap_or("").trim();
let field_type = parts.next().map(str::trim);
if let Some(type_str) = field_type {
if let Some(ftype) = FieldType::from_str(type_str) {
if !self.type_map.contains_key(field_name) {
self.type_map.insert(field_name.to_string(), ftype);
}
return field_name.to_string();
}
}
raw.to_string()
})
.collect();
Ok(headers)
} else {
Err(anyhow::anyhow!("Empty CSV header line"))
}
}
fn generate_column_names(&self, line: &str) -> Result<Vec<String>> {
let mut reader = ReaderBuilder::new()
.delimiter(self.delimiter)
.has_headers(false)
.flexible(true)
.from_reader(line.as_bytes());
if let Some(result) = reader.records().next() {
let record = result.context("Failed to parse CSV data line for column count")?;
let headers: Vec<String> = (1..=record.len()).map(|i| format!("c{}", i)).collect();
Ok(headers)
} else {
Err(anyhow::anyhow!("Empty CSV data line"))
}
}
fn parse_data_line(&self, line: &str) -> Result<Event> {
let mut reader = ReaderBuilder::new()
.delimiter(self.delimiter)
.has_headers(false)
.flexible(true)
.from_reader(line.as_bytes());
if let Some(result) = reader.records().next() {
let record = result.context("Failed to parse CSV data line")?;
let mut event = Event::with_capacity(line.to_string(), record.len());
for (i, field) in record.iter().enumerate() {
if let Some(header) = self.headers.get(i) {
let value = if let Some(field_type) = self.type_map.get(header) {
convert_value_to_type(field, field_type, self.strict).map_err(|e| {
anyhow::anyhow!("Type conversion failed for field '{}': {}", header, e)
})?
} else {
Dynamic::from(field.to_string())
};
event.set_field(header.clone(), value);
}
}
if self.auto_timestamp {
event.extract_timestamp();
}
Ok(event)
} else {
Err(anyhow::anyhow!("Empty CSV record"))
}
}
fn create_skip_event(&self, line: &str) -> Event {
let mut event = Event::default_with_line(line.to_string());
event.set_field("__skip__".to_string(), Dynamic::from(true));
event
}
}
impl EventParser for CsvParser {
    /// Parses one input line into an [`Event`].
    ///
    /// Steps:
    /// 1. Trailing `\n` characters, then trailing `\r` characters, are removed.
    /// 2. A line that is then empty or whitespace-only yields a skip event flagged `__skip__`.
    ///    This happens even if column names are not yet known.
    /// 3. Any other line is parsed as data.
    ///
    /// # Errors
    /// Fails if column names have not been initialized, or if the data line cannot be parsed
    /// or type-converted.
    fn parse(&self, line: &str) -> Result<Event> {
        let content = line.trim_end_matches('\n').trim_end_matches('\r');
        let blank = content.trim().is_empty();
        match (blank, self.headers.is_empty()) {
            (true, _) => Ok(self.create_skip_event(content)),
            (false, true) => Err(anyhow::anyhow!(
                "CSV parser not properly initialized. This should not happen in normal usage."
            )),
            (false, false) => self.parse_data_line(content),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_csv_with_headers() {
        let mut parser = CsvParser::new_csv();
        let header_line = "name,age,city";
        let was_consumed = parser.initialize_headers_from_line(header_line).unwrap();
        assert!(was_consumed);
        let data_result = parser.parse("Alice,25,New York").unwrap();
        assert_eq!(data_result.fields.get("name").unwrap().to_string(), "Alice");
        assert_eq!(data_result.fields.get("age").unwrap().to_string(), "25");
        assert_eq!(
            data_result.fields.get("city").unwrap().to_string(),
            "New York"
        );
    }

    #[test]
    fn test_csv_no_headers() {
        let mut parser = CsvParser::new_csv_no_headers();
        let data_line = "Alice,25,New York";
        let was_consumed = parser.initialize_headers_from_line(data_line).unwrap();
        assert!(!was_consumed);
        let data_result = parser.parse("Alice,25,New York").unwrap();
        assert_eq!(data_result.fields.get("c1").unwrap().to_string(), "Alice");
        assert_eq!(data_result.fields.get("c2").unwrap().to_string(), "25");
        assert_eq!(
            data_result.fields.get("c3").unwrap().to_string(),
            "New York"
        );
    }

    #[test]
    fn test_tsv_with_headers() {
        let mut parser = CsvParser::new_tsv();
        let header_line = "name\tage\tcity";
        let was_consumed = parser.initialize_headers_from_line(header_line).unwrap();
        assert!(was_consumed);
        let data_result = parser.parse("Alice\t25\tNew York").unwrap();
        assert_eq!(data_result.fields.get("name").unwrap().to_string(), "Alice");
        assert_eq!(data_result.fields.get("age").unwrap().to_string(), "25");
        assert_eq!(
            data_result.fields.get("city").unwrap().to_string(),
            "New York"
        );
    }

    #[test]
    fn test_csv_with_quotes() {
        let mut parser = CsvParser::new_csv();
        let _ = parser.initialize_headers_from_line("name,message").unwrap();
        let data_result = parser.parse("\"John Smith\",\"Hello, world!\"").unwrap();
        assert_eq!(
            data_result.fields.get("name").unwrap().to_string(),
            "John Smith"
        );
        assert_eq!(
            data_result.fields.get("message").unwrap().to_string(),
            "Hello, world!"
        );
    }

    #[test]
    fn test_csv_with_escaped_quotes() {
        let mut parser = CsvParser::new_csv();
        let _ = parser.initialize_headers_from_line("name,message").unwrap();
        let data_result = parser
            .parse("\"John\",\"He said \"\"hello\"\" to me\"")
            .unwrap();
        assert_eq!(data_result.fields.get("name").unwrap().to_string(), "John");
        assert_eq!(
            data_result.fields.get("message").unwrap().to_string(),
            "He said \"hello\" to me"
        );
    }

    #[test]
    fn test_csv_empty_fields() {
        let mut parser = CsvParser::new_csv();
        let _ = parser
            .initialize_headers_from_line("name,age,city")
            .unwrap();
        let data_result = parser.parse("Alice,,Boston").unwrap();
        assert_eq!(data_result.fields.get("name").unwrap().to_string(), "Alice");
        assert_eq!(data_result.fields.get("age").unwrap().to_string(), "");
        assert_eq!(
            data_result.fields.get("city").unwrap().to_string(),
            "Boston"
        );
    }

    #[test]
    fn test_csv_variable_columns() {
        // Column count is fixed by the first line; extra fields on later lines are dropped.
        let mut parser = CsvParser::new_csv_no_headers();
        let first_line = "Alice,25";
        let _ = parser.initialize_headers_from_line(first_line).unwrap();
        let data_result1 = parser.parse("Alice,25").unwrap();
        assert_eq!(data_result1.fields.get("c1").unwrap().to_string(), "Alice");
        assert_eq!(data_result1.fields.get("c2").unwrap().to_string(), "25");
        assert!(data_result1.fields.get("c3").is_none());
        let data_result2 = parser.parse("Bob,30,Engineer").unwrap();
        assert_eq!(data_result2.fields.get("c1").unwrap().to_string(), "Bob");
        assert_eq!(data_result2.fields.get("c2").unwrap().to_string(), "30");
        assert!(data_result2.fields.get("c3").is_none());
    }

    #[test]
    fn test_csv_header_type_annotations() {
        let mut parser = CsvParser::new_csv();
        let header_line = "name,status:int,bytes:int";
        let was_consumed = parser.initialize_headers_from_line(header_line).unwrap();
        assert!(was_consumed);
        let data_result = parser.parse("Alice,200,1234").unwrap();
        assert_eq!(data_result.fields.get("name").unwrap().to_string(), "Alice");
        assert_eq!(
            data_result.fields.get("status").unwrap().as_int().unwrap(),
            200
        );
        assert_eq!(
            data_result.fields.get("bytes").unwrap().as_int().unwrap(),
            1234
        );
        assert!(data_result.fields.get("status:int").is_none());
    }

    #[test]
    fn test_blank_line_yields_skip_event() {
        // Blank lines short-circuit before the initialization check.
        let parser = CsvParser::new_csv();
        let data_result = parser.parse("   \r\n").unwrap();
        assert!(data_result
            .fields
            .get("__skip__")
            .unwrap()
            .as_bool()
            .unwrap());
    }

    #[test]
    fn test_parse_before_initialization_is_error() {
        let parser = CsvParser::new_csv();
        assert!(parser.parse("Alice,25").is_err());
    }

    #[test]
    fn test_crlf_line_endings_are_stripped() {
        let mut parser = CsvParser::new_csv();
        let _ = parser.initialize_headers_from_line("name,age\r\n").unwrap();
        let data_result = parser.parse("Alice,25\r\n").unwrap();
        assert_eq!(data_result.fields.get("name").unwrap().to_string(), "Alice");
        assert_eq!(data_result.fields.get("age").unwrap().to_string(), "25");
    }

    #[test]
    fn test_supplied_headers_are_kept() {
        let mut parser =
            CsvParser::new_csv_with_headers(vec!["x".to_string(), "y".to_string()]);
        let was_consumed = parser.initialize_headers_from_line("1,2").unwrap();
        assert!(!was_consumed);
        assert_eq!(
            parser.get_headers(),
            vec!["x".to_string(), "y".to_string()]
        );
        let data_result = parser.parse("1,2").unwrap();
        assert_eq!(data_result.fields.get("x").unwrap().to_string(), "1");
        assert_eq!(data_result.fields.get("y").unwrap().to_string(), "2");
    }

    #[test]
    fn test_tsv_no_headers_generates_column_names() {
        let mut parser = CsvParser::new_tsv_no_headers();
        let was_consumed = parser.initialize_headers_from_line("a\tb").unwrap();
        assert!(!was_consumed);
        assert_eq!(
            parser.get_headers(),
            vec!["c1".to_string(), "c2".to_string()]
        );
    }
}