use async_trait::async_trait;
use serde_json::{Map, Value, json};
use std::io::Read;
use std::path::Path;
use crate::domain::error::{Result, ServiceError, StygianError};
use crate::ports::data_source::{DataSourcePort, QueryParams};
use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
/// Field separator used when parsing delimited text.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Delimiter {
    /// `,` — standard CSV.
    Comma,
    /// `\t` — tab-separated values.
    Tab,
    /// `|`
    Pipe,
    /// `;`
    Semicolon,
    /// Any other single byte.
    Custom(u8),
}

impl Delimiter {
    /// Returns the byte handed to the CSV reader as its field separator.
    const fn as_byte(self) -> u8 {
        match self {
            Self::Comma => b',',
            Self::Tab => b'\t',
            Self::Pipe => b'|',
            Self::Semicolon => b';',
            Self::Custom(byte) => byte,
        }
    }

    /// Interprets a user-supplied delimiter spec.
    ///
    /// Accepts the names `"comma"`, `"tab"`/`"tsv"`, `"pipe"` and `"semicolon"`
    /// (case-sensitive), or the literal separator character. Any other
    /// single-byte string becomes [`Delimiter::Custom`]. Everything else falls
    /// back to [`Delimiter::Comma`] rather than failing. That includes empty
    /// strings, unknown names, and characters that encode to more than one
    /// UTF-8 byte.
    fn from_str(s: &str) -> Self {
        match s {
            "comma" | "," => Self::Comma,
            "tab" | "tsv" | "\t" => Self::Tab,
            "pipe" | "|" => Self::Pipe,
            "semicolon" | ";" => Self::Semicolon,
            other => match other.as_bytes() {
                [byte] => Self::Custom(*byte),
                _ => Self::Comma,
            },
        }
    }
}
/// Data source that reads rows from a local delimited-text (CSV/TSV/...) file.
///
/// Stateless: every call opens and parses the file named in its input.
#[derive(Debug, Clone, Copy, Default)]
pub struct CsvSource;
impl CsvSource {
fn parse_reader<R: Read>(
reader: R,
delimiter: Delimiter,
has_headers: bool,
skip: usize,
limit: Option<u64>,
) -> Result<Vec<Value>> {
let mut csv_reader = csv::ReaderBuilder::new()
.delimiter(delimiter.as_byte())
.has_headers(has_headers)
.flexible(true)
.from_reader(reader);
let headers: Vec<String> = if has_headers {
let hdrs = csv_reader.headers().map_err(|e| {
StygianError::Service(ServiceError::InvalidResponse(format!(
"CSV header parse error: {e}"
)))
})?;
hdrs.iter().map(|h| strip_bom(h).to_string()).collect()
} else {
Vec::new()
};
let mut rows = Vec::new();
let mut skipped = 0;
for result in csv_reader.records() {
let record = result.map_err(|e| {
StygianError::Service(ServiceError::InvalidResponse(format!(
"CSV record parse error: {e}"
)))
})?;
if skipped < skip {
skipped += 1;
continue;
}
let mut map = Map::new();
for (i, field) in record.iter().enumerate() {
let key = if headers.is_empty() {
format!("column_{i}")
} else {
headers
.get(i)
.cloned()
.unwrap_or_else(|| format!("column_{i}"))
};
map.insert(key, Value::String(field.to_string()));
}
let row = Value::Object(map);
rows.push(row);
if let Some(max) = limit
&& rows.len() as u64 >= max
{
break;
}
}
Ok(rows)
}
}
/// Returns `s` without a single leading UTF-8 byte-order mark (U+FEFF).
///
/// At most one BOM is removed. Input without a leading BOM is returned unchanged.
fn strip_bom(s: &str) -> &str {
    const BYTE_ORDER_MARK: char = '\u{FEFF}';
    let stripped = s.strip_prefix(BYTE_ORDER_MARK);
    stripped.unwrap_or(s)
}
fn extract_csv_params(params: &Value) -> (Delimiter, usize, Option<u64>, bool) {
let delimiter = params
.get("delimiter")
.and_then(serde_json::Value::as_str)
.map_or(Delimiter::Comma, Delimiter::from_str);
let skip_u64 = params
.get("skip")
.and_then(serde_json::Value::as_u64)
.unwrap_or(0);
let skip = usize::try_from(skip_u64).unwrap_or(usize::MAX);
let limit = params.get("limit").and_then(serde_json::Value::as_u64);
let has_headers = params
.get("has_headers")
.and_then(serde_json::Value::as_bool)
.unwrap_or(true);
(delimiter, skip, limit, has_headers)
}
#[async_trait]
impl DataSourcePort for CsvSource {
    /// Reads the CSV file at `params.query` and returns its rows as JSON objects.
    ///
    /// Parsing options (`delimiter`, `skip`, `has_headers`) are read from the
    /// first element of `params.parameters`, if any. A `limit` key there is
    /// ignored in favour of `params.limit`. The existence check, the file open
    /// and the parse all run inside `tokio::task::spawn_blocking`, so
    /// filesystem latency does not stall the async executor.
    ///
    /// # Errors
    ///
    /// - [`ServiceError::Unavailable`] if the file does not exist, cannot be
    ///   opened, or the blocking task fails to complete.
    /// - [`ServiceError::InvalidResponse`] if the CSV content cannot be parsed.
    async fn query(&self, params: QueryParams) -> Result<Vec<Value>> {
        let extra = params
            .parameters
            .first()
            .cloned()
            .unwrap_or_else(|| json!({}));
        let (delimiter, skip, _, has_headers) = extract_csv_params(&extra);
        let limit = params.limit;
        // Owned so the blocking closure can be `'static`.
        let source_path = params.query;

        tokio::task::spawn_blocking(move || -> Result<Vec<Value>> {
            let path = Path::new(&source_path);
            if !path.exists() {
                return Err(StygianError::Service(ServiceError::Unavailable(format!(
                    "CSV file not found: {source_path}"
                ))));
            }
            let file = std::fs::File::open(path).map_err(|e| {
                StygianError::Service(ServiceError::Unavailable(format!(
                    "failed to open CSV file: {e}"
                )))
            })?;
            Self::parse_reader(file, delimiter, has_headers, skip, limit)
        })
        .await
        .map_err(|e| {
            StygianError::Service(ServiceError::Unavailable(format!(
                "CSV parse task failed: {e}"
            )))
        })?
    }

    /// Always reports healthy: the source holds no connection or other state to probe.
    async fn healthcheck(&self) -> Result<()> {
        Ok(())
    }

    fn source_name(&self) -> &'static str {
        "csv"
    }
}
#[async_trait]
impl ScrapingService for CsvSource {
async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
let path = Path::new(&input.url);
if !path.exists() {
return Err(StygianError::Service(ServiceError::Unavailable(format!(
"CSV file not found: {}",
input.url
))));
}
let (delimiter, skip, limit, has_headers) = extract_csv_params(&input.params);
let file = std::fs::File::open(path).map_err(|e| {
StygianError::Service(ServiceError::Unavailable(format!(
"failed to open CSV file: {e}"
)))
})?;
let rows = tokio::task::spawn_blocking(move || {
Self::parse_reader(file, delimiter, has_headers, skip, limit)
})
.await
.map_err(|e| {
StygianError::Service(ServiceError::Unavailable(format!(
"CSV parse task failed: {e}"
)))
})??;
let count = rows.len();
let data = serde_json::to_string(&rows).map_err(|e| {
StygianError::Service(ServiceError::InvalidResponse(format!(
"CSV serialization failed: {e}"
)))
})?;
Ok(ServiceOutput {
data,
metadata: json!({
"source": "csv",
"row_count": count,
"source_path": input.url,
}),
})
}
fn name(&self) -> &'static str {
"csv"
}
}
#[cfg(test)]
// Tests may panic on failure and index into fixtures whose shape they control.
#[allow(clippy::expect_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::io::Cursor;

    const CSV_DATA: &str = "name,age,city\nAlice,30,NYC\nBob,25,SF\nCharlie,35,LA\n";

    #[test]
    fn parse_csv_with_headers() {
        let reader = Cursor::new(CSV_DATA);
        let rows = CsvSource::parse_reader(reader, Delimiter::Comma, true, 0, None).expect("parse");
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], "30");
        assert_eq!(rows[0]["city"], "NYC");
        assert_eq!(rows[2]["name"], "Charlie");
    }

    #[test]
    fn parse_tsv() {
        let tsv = "name\tage\nAlice\t30\nBob\t25\n";
        let reader = Cursor::new(tsv);
        let rows = CsvSource::parse_reader(reader, Delimiter::Tab, true, 0, None).expect("parse");
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[1]["age"], "25");
    }

    #[test]
    fn headerless_csv_generates_column_keys() {
        let csv = "Alice,30,NYC\nBob,25,SF\n";
        let reader = Cursor::new(csv);
        let rows =
            CsvSource::parse_reader(reader, Delimiter::Comma, false, 0, None).expect("parse");
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["column_0"], "Alice");
        assert_eq!(rows[0]["column_1"], "30");
        assert_eq!(rows[0]["column_2"], "NYC");
    }

    #[test]
    fn row_limit() {
        let reader = Cursor::new(CSV_DATA);
        let rows =
            CsvSource::parse_reader(reader, Delimiter::Comma, true, 0, Some(2)).expect("parse");
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[1]["name"], "Bob");
    }

    #[test]
    fn skip_rows() {
        let reader = Cursor::new(CSV_DATA);
        let rows = CsvSource::parse_reader(reader, Delimiter::Comma, true, 1, None).expect("parse");
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Bob");
        assert_eq!(rows[1]["name"], "Charlie");
    }

    #[test]
    fn skip_and_limit() {
        let reader = Cursor::new(CSV_DATA);
        let rows =
            CsvSource::parse_reader(reader, Delimiter::Comma, true, 1, Some(1)).expect("parse");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Bob");
    }

    #[test]
    fn skip_past_end_returns_empty() {
        let reader = Cursor::new(CSV_DATA);
        let rows =
            CsvSource::parse_reader(reader, Delimiter::Comma, true, 10, None).expect("parse");
        assert!(rows.is_empty());
    }

    #[test]
    fn strip_utf8_bom() {
        let csv = "\u{FEFF}name,age\nAlice,30\n";
        let reader = Cursor::new(csv);
        let rows = CsvSource::parse_reader(reader, Delimiter::Comma, true, 0, None).expect("parse");
        assert_eq!(rows.len(), 1);
        assert!(rows[0].get("name").is_some(), "BOM should be stripped");
    }

    #[test]
    fn pipe_delimiter() {
        let csv = "a|b|c\n1|2|3\n";
        let reader = Cursor::new(csv);
        let rows = CsvSource::parse_reader(reader, Delimiter::Pipe, true, 0, None).expect("parse");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["a"], "1");
        assert_eq!(rows[0]["b"], "2");
    }

    #[test]
    fn semicolon_delimiter() {
        let csv = "x;y\n10;20\n";
        let reader = Cursor::new(csv);
        let rows =
            CsvSource::parse_reader(reader, Delimiter::Semicolon, true, 0, None).expect("parse");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["x"], "10");
    }

    #[test]
    fn custom_single_byte_delimiter() {
        let delimiter = Delimiter::from_str("#");
        assert_eq!(delimiter.as_byte(), b'#');
        let rows = CsvSource::parse_reader(Cursor::new("a#b\n1#2\n"), delimiter, true, 0, None)
            .expect("parse");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["b"], "2");
    }

    #[test]
    fn ragged_rows_use_positional_keys_for_extra_fields() {
        let csv = "a,b\n1,2,3\nx\n";
        let rows = CsvSource::parse_reader(Cursor::new(csv), Delimiter::Comma, true, 0, None)
            .expect("parse");
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["a"], "1");
        assert_eq!(rows[0]["column_2"], "3");
        assert_eq!(rows[1]["a"], "x");
        assert!(rows[1].get("b").is_none());
    }

    #[test]
    fn duplicate_headers_keep_last_value() {
        let rows =
            CsvSource::parse_reader(Cursor::new("k,k\n1,2\n"), Delimiter::Comma, true, 0, None)
                .expect("parse");
        assert_eq!(rows[0]["k"], "2");
        assert_eq!(rows[0].as_object().expect("object row").len(), 1);
    }

    #[test]
    fn empty_csv_returns_empty() {
        let csv = "name,age\n";
        let reader = Cursor::new(csv);
        let rows = CsvSource::parse_reader(reader, Delimiter::Comma, true, 0, None).expect("parse");
        assert!(rows.is_empty());
    }

    #[test]
    fn delimiter_from_str_parsing() {
        assert_eq!(Delimiter::from_str("tab").as_byte(), b'\t');
        assert_eq!(Delimiter::from_str("tsv").as_byte(), b'\t');
        assert_eq!(Delimiter::from_str("pipe").as_byte(), b'|');
        assert_eq!(Delimiter::from_str("semicolon").as_byte(), b';');
        assert_eq!(Delimiter::from_str("comma").as_byte(), b',');
        assert_eq!(Delimiter::from_str(",").as_byte(), b',');
        assert_eq!(Delimiter::from_str("unknown").as_byte(), b',');
    }

    #[tokio::test]
    async fn file_not_found_returns_error() {
        let source = CsvSource;
        // Use the platform temp dir rather than a hard-coded `/tmp`.
        let missing = std::env::temp_dir().join("nonexistent_csv_file_stygian.csv");
        let params = QueryParams {
            query: missing.to_string_lossy().into_owned(),
            parameters: vec![],
            limit: None,
        };
        let result = source.query(params).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn query_reads_file_with_skip_and_limit() {
        let path = std::env::temp_dir().join(format!(
            "stygian_csv_query_{}.csv",
            std::process::id()
        ));
        std::fs::write(&path, CSV_DATA).expect("write fixture");
        let params = QueryParams {
            query: path.to_string_lossy().into_owned(),
            parameters: vec![json!({ "skip": 1 })],
            limit: Some(1),
        };
        let result = CsvSource.query(params).await;
        // Clean up before asserting so a failure does not leak the fixture.
        let _ = std::fs::remove_file(&path);
        let rows = result.expect("query");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Bob");
    }

    #[tokio::test]
    async fn healthcheck_is_ok() {
        assert!(CsvSource.healthcheck().await.is_ok());
    }
}