1use async_trait::async_trait;
24use serde_json::{Map, Value, json};
25use std::io::Read;
26use std::path::Path;
27
28use crate::domain::error::{Result, ServiceError, StygianError};
29use crate::ports::data_source::{DataSourcePort, QueryParams};
30use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
31
/// Field separator used when parsing delimited text.
///
/// Equality is structural: `Custom(b',')` and `Comma` compare unequal even
/// though both resolve to the same byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Delimiter {
    /// Comma (`,`).
    Comma,
    /// Horizontal tab.
    Tab,
    /// Pipe (`|`).
    Pipe,
    /// Semicolon (`;`).
    Semicolon,
    /// Any other single-byte separator.
    Custom(u8),
}

impl Delimiter {
    /// Returns the byte this delimiter stands for.
    const fn as_byte(self) -> u8 {
        match self {
            Self::Comma => b',',
            Self::Tab => b'\t',
            Self::Pipe => b'|',
            Self::Semicolon => b';',
            Self::Custom(b) => b,
        }
    }

    /// Parses a delimiter from a name (`"tab"`, `"tsv"`, `"pipe"`,
    /// `"semicolon"`, `"comma"`) or from the literal separator itself.
    ///
    /// Any other input that is exactly one byte long becomes
    /// [`Delimiter::Custom`]. Everything else (empty strings, unknown names,
    /// multi-byte characters) falls back to [`Delimiter::Comma`]. Matching is
    /// case-sensitive.
    fn from_str(s: &str) -> Self {
        match s {
            "tab" | "tsv" | "\t" => Self::Tab,
            "pipe" | "|" => Self::Pipe,
            "semicolon" | ";" => Self::Semicolon,
            "comma" | "," => Self::Comma,
            other => match other.as_bytes() {
                // Exactly one byte: use it verbatim as the separator.
                &[byte] => Self::Custom(byte),
                _ => Self::Comma,
            },
        }
    }
}
78
/// Stateless handle for reading delimiter-separated files.
///
/// The type carries no data; all parsing options are supplied per call.
#[derive(Debug, Clone, Default)]
pub struct CsvSource;
87
88impl CsvSource {
89 fn parse_reader<R: Read>(
99 reader: R,
100 delimiter: Delimiter,
101 has_headers: bool,
102 skip: usize,
103 limit: Option<u64>,
104 ) -> Result<Vec<Value>> {
105 let mut csv_reader = csv::ReaderBuilder::new()
106 .delimiter(delimiter.as_byte())
107 .has_headers(has_headers)
108 .flexible(true)
109 .from_reader(reader);
110
111 let headers: Vec<String> = if has_headers {
112 let hdrs = csv_reader.headers().map_err(|e| {
113 StygianError::Service(ServiceError::InvalidResponse(format!(
114 "CSV header parse error: {e}"
115 )))
116 })?;
117 hdrs.iter().map(|h| strip_bom(h).to_string()).collect()
118 } else {
119 Vec::new()
120 };
121
122 let mut rows = Vec::new();
123 let mut skipped = 0;
124
125 for result in csv_reader.records() {
126 let record = result.map_err(|e| {
127 StygianError::Service(ServiceError::InvalidResponse(format!(
128 "CSV record parse error: {e}"
129 )))
130 })?;
131
132 if skipped < skip {
133 skipped += 1;
134 continue;
135 }
136
137 let mut map = Map::new();
138 for (i, field) in record.iter().enumerate() {
139 let key = if headers.is_empty() {
140 format!("column_{i}")
141 } else {
142 headers
143 .get(i)
144 .cloned()
145 .unwrap_or_else(|| format!("column_{i}"))
146 };
147 map.insert(key, Value::String(field.to_string()));
148 }
149 let row = Value::Object(map);
150
151 rows.push(row);
152
153 if let Some(max) = limit
154 && rows.len() as u64 >= max
155 {
156 break;
157 }
158 }
159
160 Ok(rows)
161 }
162}
163
/// Returns `s` without a single leading U+FEFF (byte-order mark).
///
/// Only one leading mark is removed; strings that do not start with it are
/// returned unchanged.
fn strip_bom(s: &str) -> &str {
    const BOM: char = '\u{FEFF}';
    s.strip_prefix(BOM).unwrap_or(s)
}
168
169fn extract_csv_params(params: &Value) -> (Delimiter, usize, Option<u64>, bool) {
171 let delimiter = params
172 .get("delimiter")
173 .and_then(serde_json::Value::as_str)
174 .map_or(Delimiter::Comma, Delimiter::from_str);
175
176 let skip_u64 = params
177 .get("skip")
178 .and_then(serde_json::Value::as_u64)
179 .unwrap_or(0);
180 let skip = usize::try_from(skip_u64).unwrap_or(usize::MAX);
181
182 let limit = params.get("limit").and_then(serde_json::Value::as_u64);
183
184 let has_headers = params
185 .get("has_headers")
186 .and_then(serde_json::Value::as_bool)
187 .unwrap_or(true);
188
189 (delimiter, skip, limit, has_headers)
190}
191
192#[async_trait]
195impl DataSourcePort for CsvSource {
196 async fn query(&self, params: QueryParams) -> Result<Vec<Value>> {
201 let path = Path::new(¶ms.query);
202 if !path.exists() {
203 return Err(StygianError::Service(ServiceError::Unavailable(format!(
204 "CSV file not found: {}",
205 params.query
206 ))));
207 }
208
209 let extra = params
210 .parameters
211 .first()
212 .cloned()
213 .unwrap_or_else(|| json!({}));
214 let (delimiter, skip, _, has_headers) = extract_csv_params(&extra);
215 let limit = params.limit;
216
217 let file = std::fs::File::open(path).map_err(|e| {
218 StygianError::Service(ServiceError::Unavailable(format!(
219 "failed to open CSV file: {e}"
220 )))
221 })?;
222
223 tokio::task::spawn_blocking(move || {
225 Self::parse_reader(file, delimiter, has_headers, skip, limit)
226 })
227 .await
228 .map_err(|e| {
229 StygianError::Service(ServiceError::Unavailable(format!(
230 "CSV parse task failed: {e}"
231 )))
232 })?
233 }
234
235 async fn healthcheck(&self) -> Result<()> {
236 Ok(()) }
238
239 fn source_name(&self) -> &'static str {
240 "csv"
241 }
242}
243
244#[async_trait]
247impl ScrapingService for CsvSource {
248 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
256 let path = Path::new(&input.url);
257 if !path.exists() {
258 return Err(StygianError::Service(ServiceError::Unavailable(format!(
259 "CSV file not found: {}",
260 input.url
261 ))));
262 }
263
264 let (delimiter, skip, limit, has_headers) = extract_csv_params(&input.params);
265
266 let file = std::fs::File::open(path).map_err(|e| {
267 StygianError::Service(ServiceError::Unavailable(format!(
268 "failed to open CSV file: {e}"
269 )))
270 })?;
271
272 let rows = tokio::task::spawn_blocking(move || {
273 Self::parse_reader(file, delimiter, has_headers, skip, limit)
274 })
275 .await
276 .map_err(|e| {
277 StygianError::Service(ServiceError::Unavailable(format!(
278 "CSV parse task failed: {e}"
279 )))
280 })??;
281
282 let count = rows.len();
283 let data = serde_json::to_string(&rows).map_err(|e| {
284 StygianError::Service(ServiceError::InvalidResponse(format!(
285 "CSV serialization failed: {e}"
286 )))
287 })?;
288
289 Ok(ServiceOutput {
290 data,
291 metadata: json!({
292 "source": "csv",
293 "row_count": count,
294 "source_path": input.url,
295 }),
296 })
297 }
298
299 fn name(&self) -> &'static str {
300 "csv"
301 }
302}
303
#[cfg(test)]
// Tests may panic on failure and index into parsed rows directly.
#[allow(clippy::expect_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Three data rows under a `name,age,city` header.
    const CSV_DATA: &str = "name,age,city\nAlice,30,NYC\nBob,25,SF\nCharlie,35,LA\n";

    /// Runs `parse_reader` over an in-memory string, panicking on failure.
    fn parse(
        input: &str,
        delimiter: Delimiter,
        has_headers: bool,
        skip: usize,
        limit: Option<u64>,
    ) -> Vec<Value> {
        CsvSource::parse_reader(Cursor::new(input), delimiter, has_headers, skip, limit)
            .expect("parse")
    }

    #[test]
    fn parse_csv_with_headers() {
        let rows = parse(CSV_DATA, Delimiter::Comma, true, 0, None);
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], "30");
        assert_eq!(rows[0]["city"], "NYC");
        assert_eq!(rows[2]["name"], "Charlie");
    }

    #[test]
    fn parse_tsv() {
        let rows = parse("name\tage\nAlice\t30\nBob\t25\n", Delimiter::Tab, true, 0, None);
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[1]["age"], "25");
    }

    #[test]
    fn headerless_csv_generates_column_keys() {
        let rows = parse("Alice,30,NYC\nBob,25,SF\n", Delimiter::Comma, false, 0, None);
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["column_0"], "Alice");
        assert_eq!(rows[0]["column_1"], "30");
        assert_eq!(rows[0]["column_2"], "NYC");
    }

    #[test]
    fn row_limit() {
        let rows = parse(CSV_DATA, Delimiter::Comma, true, 0, Some(2));
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[1]["name"], "Bob");
    }

    #[test]
    fn skip_rows() {
        let rows = parse(CSV_DATA, Delimiter::Comma, true, 1, None);
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Bob");
        assert_eq!(rows[1]["name"], "Charlie");
    }

    #[test]
    fn skip_and_limit() {
        let rows = parse(CSV_DATA, Delimiter::Comma, true, 1, Some(1));
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Bob");
    }

    #[test]
    fn strip_utf8_bom() {
        let rows = parse("\u{FEFF}name,age\nAlice,30\n", Delimiter::Comma, true, 0, None);
        assert_eq!(rows.len(), 1);
        assert!(rows[0].get("name").is_some(), "BOM should be stripped");
    }

    #[test]
    fn pipe_delimiter() {
        let rows = parse("a|b|c\n1|2|3\n", Delimiter::Pipe, true, 0, None);
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["a"], "1");
        assert_eq!(rows[0]["b"], "2");
    }

    #[test]
    fn semicolon_delimiter() {
        let rows = parse("x;y\n10;20\n", Delimiter::Semicolon, true, 0, None);
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["x"], "10");
    }

    #[test]
    fn empty_csv_returns_empty() {
        let rows = parse("name,age\n", Delimiter::Comma, true, 0, None);
        assert!(rows.is_empty());
    }

    #[test]
    fn delimiter_from_str_parsing() {
        // The last entry exercises the fallback for unrecognised names.
        let cases = [
            ("tab", b'\t'),
            ("tsv", b'\t'),
            ("pipe", b'|'),
            ("semicolon", b';'),
            ("comma", b','),
            (",", b','),
            ("unknown", b','),
        ];
        for (spec, expected) in cases {
            assert_eq!(Delimiter::from_str(spec).as_byte(), expected);
        }
    }

    #[tokio::test]
    async fn file_not_found_returns_error() {
        let params = QueryParams {
            query: "/tmp/nonexistent_csv_file_stygian.csv".into(),
            parameters: vec![],
            limit: None,
        };
        let outcome = CsvSource.query(params).await;
        assert!(outcome.is_err());
    }
}