1use async_trait::async_trait;
24use serde_json::{Map, Value, json};
25use std::io::Read;
26use std::path::Path;
27
28use crate::domain::error::{Result, ServiceError, StygianError};
29use crate::ports::data_source::{DataSourcePort, QueryParams};
30use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
31
/// Field separator used when parsing delimited text.
///
/// Every variant stands for exactly one byte. Equality is structural, so
/// `Delimiter::Custom(b',')` and `Delimiter::Comma` are distinct values even
/// though both resolve to the same separator byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Delimiter {
    /// Comma (`,`).
    Comma,
    /// Horizontal tab (`\t`).
    Tab,
    /// Vertical bar (`|`).
    Pipe,
    /// Semicolon (`;`).
    Semicolon,
    /// Any other single-byte separator.
    Custom(u8),
}
48
49impl Delimiter {
50 fn as_byte(self) -> u8 {
51 match self {
52 Self::Comma => b',',
53 Self::Tab => b'\t',
54 Self::Pipe => b'|',
55 Self::Semicolon => b';',
56 Self::Custom(b) => b,
57 }
58 }
59
60 fn from_str(s: &str) -> Self {
61 match s {
62 "tab" | "tsv" | "\t" => Self::Tab,
63 "pipe" | "|" => Self::Pipe,
64 "semicolon" | ";" => Self::Semicolon,
65 "comma" | "," => Self::Comma,
66 _ => {
67 let bytes = s.as_bytes();
69 if bytes.len() == 1 {
70 Self::Custom(bytes[0])
71 } else {
72 Self::Comma
73 }
74 }
75 }
76 }
77}
78
/// Stateless adapter that reads delimited text files and exposes each record
/// as a JSON object.
///
/// The type carries no configuration of its own: the delimiter, header
/// handling, number of rows to skip and row limit are supplied per call.
#[derive(Debug, Clone, Copy, Default)]
pub struct CsvSource;
87
88impl CsvSource {
89 fn parse_reader<R: Read>(
99 reader: R,
100 delimiter: Delimiter,
101 has_headers: bool,
102 skip: usize,
103 limit: Option<u64>,
104 ) -> Result<Vec<Value>> {
105 let mut csv_reader = csv::ReaderBuilder::new()
106 .delimiter(delimiter.as_byte())
107 .has_headers(has_headers)
108 .flexible(true)
109 .from_reader(reader);
110
111 let headers: Vec<String> = if has_headers {
112 let hdrs = csv_reader.headers().map_err(|e| {
113 StygianError::Service(ServiceError::InvalidResponse(format!(
114 "CSV header parse error: {e}"
115 )))
116 })?;
117 hdrs.iter().map(|h| strip_bom(h).to_string()).collect()
118 } else {
119 Vec::new()
120 };
121
122 let mut rows = Vec::new();
123 let mut skipped = 0;
124
125 for result in csv_reader.records() {
126 let record = result.map_err(|e| {
127 StygianError::Service(ServiceError::InvalidResponse(format!(
128 "CSV record parse error: {e}"
129 )))
130 })?;
131
132 if skipped < skip {
133 skipped += 1;
134 continue;
135 }
136
137 let row = if headers.is_empty() {
138 let mut map = Map::new();
140 for (i, field) in record.iter().enumerate() {
141 map.insert(format!("column_{i}"), Value::String(field.to_string()));
142 }
143 Value::Object(map)
144 } else {
145 let mut map = Map::new();
146 for (i, field) in record.iter().enumerate() {
147 let key = headers
148 .get(i)
149 .cloned()
150 .unwrap_or_else(|| format!("column_{i}"));
151 map.insert(key, Value::String(field.to_string()));
152 }
153 Value::Object(map)
154 };
155
156 rows.push(row);
157
158 if let Some(max) = limit
159 && rows.len() as u64 >= max
160 {
161 break;
162 }
163 }
164
165 Ok(rows)
166 }
167}
168
/// Removes a single leading byte-order mark (U+FEFF) from `s`, if present.
///
/// Only one mark at the very start is removed; the rest of the string is
/// returned untouched.
fn strip_bom(s: &str) -> &str {
    match s.strip_prefix('\u{FEFF}') {
        Some(without_mark) => without_mark,
        None => s,
    }
}
173
174fn extract_csv_params(params: &Value) -> (Delimiter, usize, Option<u64>, bool) {
176 let delimiter = params
177 .get("delimiter")
178 .and_then(|v| v.as_str())
179 .map(Delimiter::from_str)
180 .unwrap_or(Delimiter::Comma);
181
182 let skip = params.get("skip").and_then(|v| v.as_u64()).unwrap_or(0) as usize;
183
184 let limit = params.get("limit").and_then(|v| v.as_u64());
185
186 let has_headers = params
187 .get("has_headers")
188 .and_then(|v| v.as_bool())
189 .unwrap_or(true);
190
191 (delimiter, skip, limit, has_headers)
192}
193
194#[async_trait]
197impl DataSourcePort for CsvSource {
198 async fn query(&self, params: QueryParams) -> Result<Vec<Value>> {
203 let path = Path::new(¶ms.query);
204 if !path.exists() {
205 return Err(StygianError::Service(ServiceError::Unavailable(format!(
206 "CSV file not found: {}",
207 params.query
208 ))));
209 }
210
211 let extra = params.parameters.first().cloned().unwrap_or(json!({}));
212 let (delimiter, skip, _, has_headers) = extract_csv_params(&extra);
213 let limit = params.limit;
214
215 let file = std::fs::File::open(path).map_err(|e| {
216 StygianError::Service(ServiceError::Unavailable(format!(
217 "failed to open CSV file: {e}"
218 )))
219 })?;
220
221 tokio::task::spawn_blocking(move || {
223 Self::parse_reader(file, delimiter, has_headers, skip, limit)
224 })
225 .await
226 .map_err(|e| {
227 StygianError::Service(ServiceError::Unavailable(format!(
228 "CSV parse task failed: {e}"
229 )))
230 })?
231 }
232
233 async fn healthcheck(&self) -> Result<()> {
234 Ok(()) }
236
237 fn source_name(&self) -> &str {
238 "csv"
239 }
240}
241
242#[async_trait]
245impl ScrapingService for CsvSource {
246 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
254 let path = Path::new(&input.url);
255 if !path.exists() {
256 return Err(StygianError::Service(ServiceError::Unavailable(format!(
257 "CSV file not found: {}",
258 input.url
259 ))));
260 }
261
262 let (delimiter, skip, limit, has_headers) = extract_csv_params(&input.params);
263
264 let file = std::fs::File::open(path).map_err(|e| {
265 StygianError::Service(ServiceError::Unavailable(format!(
266 "failed to open CSV file: {e}"
267 )))
268 })?;
269
270 let rows = tokio::task::spawn_blocking(move || {
271 Self::parse_reader(file, delimiter, has_headers, skip, limit)
272 })
273 .await
274 .map_err(|e| {
275 StygianError::Service(ServiceError::Unavailable(format!(
276 "CSV parse task failed: {e}"
277 )))
278 })??;
279
280 let count = rows.len();
281 let data = serde_json::to_string(&rows).map_err(|e| {
282 StygianError::Service(ServiceError::InvalidResponse(format!(
283 "CSV serialization failed: {e}"
284 )))
285 })?;
286
287 Ok(ServiceOutput {
288 data,
289 metadata: json!({
290 "source": "csv",
291 "row_count": count,
292 "source_path": input.url,
293 }),
294 })
295 }
296
297 fn name(&self) -> &'static str {
298 "csv"
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Three data rows under a `name,age,city` header.
    const CSV_DATA: &str = "name,age,city\nAlice,30,NYC\nBob,25,SF\nCharlie,35,LA\n";

    /// Parses an in-memory string with `CsvSource::parse_reader`, panicking
    /// with "parse" if parsing fails.
    fn parse(
        text: &str,
        delimiter: Delimiter,
        has_headers: bool,
        skip: usize,
        limit: Option<u64>,
    ) -> Vec<Value> {
        CsvSource::parse_reader(Cursor::new(text), delimiter, has_headers, skip, limit)
            .expect("parse")
    }

    #[test]
    fn parse_csv_with_headers() {
        let parsed = parse(CSV_DATA, Delimiter::Comma, true, 0, None);
        assert_eq!(parsed.len(), 3);
        let first = &parsed[0];
        assert_eq!(first["name"], "Alice");
        assert_eq!(first["age"], "30");
        assert_eq!(first["city"], "NYC");
        assert_eq!(parsed[2]["name"], "Charlie");
    }

    #[test]
    fn parse_tsv() {
        let parsed = parse("name\tage\nAlice\t30\nBob\t25\n", Delimiter::Tab, true, 0, None);
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0]["name"], "Alice");
        assert_eq!(parsed[1]["age"], "25");
    }

    #[test]
    fn headerless_csv_generates_column_keys() {
        let parsed = parse("Alice,30,NYC\nBob,25,SF\n", Delimiter::Comma, false, 0, None);
        assert_eq!(parsed.len(), 2);
        let first = &parsed[0];
        assert_eq!(first["column_0"], "Alice");
        assert_eq!(first["column_1"], "30");
        assert_eq!(first["column_2"], "NYC");
    }

    #[test]
    fn row_limit() {
        let parsed = parse(CSV_DATA, Delimiter::Comma, true, 0, Some(2));
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0]["name"], "Alice");
        assert_eq!(parsed[1]["name"], "Bob");
    }

    #[test]
    fn skip_rows() {
        let parsed = parse(CSV_DATA, Delimiter::Comma, true, 1, None);
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0]["name"], "Bob");
        assert_eq!(parsed[1]["name"], "Charlie");
    }

    #[test]
    fn skip_and_limit() {
        let parsed = parse(CSV_DATA, Delimiter::Comma, true, 1, Some(1));
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0]["name"], "Bob");
    }

    #[test]
    fn strip_utf8_bom() {
        let parsed = parse("\u{FEFF}name,age\nAlice,30\n", Delimiter::Comma, true, 0, None);
        assert_eq!(parsed.len(), 1);
        assert!(parsed[0].get("name").is_some(), "BOM should be stripped");
    }

    #[test]
    fn pipe_delimiter() {
        let parsed = parse("a|b|c\n1|2|3\n", Delimiter::Pipe, true, 0, None);
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0]["a"], "1");
        assert_eq!(parsed[0]["b"], "2");
    }

    #[test]
    fn semicolon_delimiter() {
        let parsed = parse("x;y\n10;20\n", Delimiter::Semicolon, true, 0, None);
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0]["x"], "10");
    }

    #[test]
    fn empty_csv_returns_empty() {
        // A header row with no data rows produces no output rows.
        let parsed = parse("name,age\n", Delimiter::Comma, true, 0, None);
        assert!(parsed.is_empty());
    }

    #[test]
    fn delimiter_from_str_parsing() {
        // (input text, expected separator byte)
        let cases = [
            ("tab", b'\t'),
            ("tsv", b'\t'),
            ("pipe", b'|'),
            ("semicolon", b';'),
            ("comma", b','),
            (",", b','),
            ("unknown", b','),
        ];
        for (text, expected) in cases {
            assert_eq!(Delimiter::from_str(text).as_byte(), expected);
        }
    }

    #[tokio::test]
    async fn file_not_found_returns_error() {
        let request = QueryParams {
            query: "/tmp/nonexistent_csv_file_stygian.csv".into(),
            parameters: vec![],
            limit: None,
        };
        let outcome = CsvSource.query(request).await;
        assert!(outcome.is_err());
    }
}