// stygian_graph/adapters/csv_source.rs
//! CSV / TSV [`DataSourcePort`](crate::ports::data_source::DataSourcePort) and [`ScrapingService`](crate::ports::ScrapingService) adapter.
//!
//! Reads structured data from CSV or TSV files, returning rows as JSON objects
//! keyed by the column names from the header row (or by generated `column_N`
//! keys when headers are disabled). All field values are returned as JSON
//! strings.
//!
//! # Example
//!
//! ```no_run
//! use stygian_graph::adapters::csv_source::CsvSource;
//! use stygian_graph::ports::data_source::{DataSourcePort, QueryParams};
//!
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let source = CsvSource::default();
//! let params = QueryParams {
//!     query: "/data/users.csv".into(),
//!     parameters: vec![],
//!     limit: Some(100),
//! };
//! let rows = source.query(params).await.unwrap();
//! # });
//! ```

23use async_trait::async_trait;
24use serde_json::{Map, Value, json};
25use std::io::Read;
26use std::path::Path;
27
28use crate::domain::error::{Result, ServiceError, StygianError};
29use crate::ports::data_source::{DataSourcePort, QueryParams};
30use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
31
// ─── Configuration ────────────────────────────────────────────────────────────

/// Field separator used when parsing delimited text.
///
/// The default is [`Delimiter::Comma`]. Every variant corresponds to exactly one
/// separator byte.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum Delimiter {
    /// Comma (default CSV)
    #[default]
    Comma,
    /// Tab (TSV)
    Tab,
    /// Pipe
    Pipe,
    /// Semicolon
    Semicolon,
    /// Custom byte
    Custom(u8),
}

49impl Delimiter {
50    const fn as_byte(self) -> u8 {
51        match self {
52            Self::Comma => b',',
53            Self::Tab => b'\t',
54            Self::Pipe => b'|',
55            Self::Semicolon => b';',
56            Self::Custom(b) => b,
57        }
58    }
59
60    fn from_str(s: &str) -> Self {
61        match s {
62            "tab" | "tsv" | "\t" => Self::Tab,
63            "pipe" | "|" => Self::Pipe,
64            "semicolon" | ";" => Self::Semicolon,
65            "comma" | "," => Self::Comma,
66            _ => {
67                // Try single-char custom delimiter
68                let bytes = s.as_bytes();
69                if bytes.len() == 1 {
70                    Self::Custom(bytes.first().copied().unwrap_or(b','))
71                } else {
72                    Self::Comma
73                }
74            }
75        }
76    }
77}
78
// ─── Adapter ──────────────────────────────────────────────────────────────────

/// CSV / TSV data source adapter.
///
/// Parses files record by record with the `csv` crate instead of first reading
/// the whole file into a string. The rows it returns are still collected in
/// memory. Supports configurable delimiters and optional header rows.
///
/// The adapter holds no state, so it is cheap to copy.
#[derive(Debug, Default, Clone, Copy)]
pub struct CsvSource;

88impl CsvSource {
89    /// Parse CSV data from a reader, returning JSON rows.
90    ///
91    /// # Arguments
92    ///
93    /// * `reader` — any `Read` source
94    /// * `delimiter` — field separator byte
95    /// * `has_headers` — whether the first row contains column names
96    /// * `skip` — number of data rows to skip
97    /// * `limit` — maximum number of data rows to return
98    fn parse_reader<R: Read>(
99        reader: R,
100        delimiter: Delimiter,
101        has_headers: bool,
102        skip: usize,
103        limit: Option<u64>,
104    ) -> Result<Vec<Value>> {
105        let mut csv_reader = csv::ReaderBuilder::new()
106            .delimiter(delimiter.as_byte())
107            .has_headers(has_headers)
108            .flexible(true)
109            .from_reader(reader);
110
111        let headers: Vec<String> = if has_headers {
112            let hdrs = csv_reader.headers().map_err(|e| {
113                StygianError::Service(ServiceError::InvalidResponse(format!(
114                    "CSV header parse error: {e}"
115                )))
116            })?;
117            hdrs.iter().map(|h| strip_bom(h).to_string()).collect()
118        } else {
119            Vec::new()
120        };
121
122        let mut rows = Vec::new();
123        let mut skipped = 0;
124
125        for result in csv_reader.records() {
126            let record = result.map_err(|e| {
127                StygianError::Service(ServiceError::InvalidResponse(format!(
128                    "CSV record parse error: {e}"
129                )))
130            })?;
131
132            if skipped < skip {
133                skipped += 1;
134                continue;
135            }
136
137            let mut map = Map::new();
138            for (i, field) in record.iter().enumerate() {
139                let key = if headers.is_empty() {
140                    format!("column_{i}")
141                } else {
142                    headers
143                        .get(i)
144                        .cloned()
145                        .unwrap_or_else(|| format!("column_{i}"))
146                };
147                map.insert(key, Value::String(field.to_string()));
148            }
149            let row = Value::Object(map);
150
151            rows.push(row);
152
153            if let Some(max) = limit
154                && rows.len() as u64 >= max
155            {
156                break;
157            }
158        }
159
160        Ok(rows)
161    }
162}
163
/// Remove a single leading UTF-8 byte-order mark (U+FEFF), if present.
///
/// Only the first character is inspected. A BOM appearing later in the string,
/// or a second consecutive BOM, is left untouched.
fn strip_bom(s: &str) -> &str {
    let mut chars = s.chars();
    match chars.next() {
        Some('\u{FEFF}') => chars.as_str(),
        _ => s,
    }
}

169/// Extract `delimiter`, `skip`, `limit`, and `has_headers` from params.
170fn extract_csv_params(params: &Value) -> (Delimiter, usize, Option<u64>, bool) {
171    let delimiter = params
172        .get("delimiter")
173        .and_then(serde_json::Value::as_str)
174        .map_or(Delimiter::Comma, Delimiter::from_str);
175
176    let skip_u64 = params
177        .get("skip")
178        .and_then(serde_json::Value::as_u64)
179        .unwrap_or(0);
180    let skip = usize::try_from(skip_u64).unwrap_or(usize::MAX);
181
182    let limit = params.get("limit").and_then(serde_json::Value::as_u64);
183
184    let has_headers = params
185        .get("has_headers")
186        .and_then(serde_json::Value::as_bool)
187        .unwrap_or(true);
188
189    (delimiter, skip, limit, has_headers)
190}
191
// ─── DataSourcePort ───────────────────────────────────────────────────────────

194#[async_trait]
195impl DataSourcePort for CsvSource {
196    /// Query a CSV file.
197    ///
198    /// `params.query` is the file path. `params.parameters[0]` can be a JSON
199    /// object with `delimiter`, `skip`, `has_headers` keys.
200    async fn query(&self, params: QueryParams) -> Result<Vec<Value>> {
201        let path = Path::new(&params.query);
202        if !path.exists() {
203            return Err(StygianError::Service(ServiceError::Unavailable(format!(
204                "CSV file not found: {}",
205                params.query
206            ))));
207        }
208
209        let extra = params
210            .parameters
211            .first()
212            .cloned()
213            .unwrap_or_else(|| json!({}));
214        let (delimiter, skip, _, has_headers) = extract_csv_params(&extra);
215        let limit = params.limit;
216
217        let file = std::fs::File::open(path).map_err(|e| {
218            StygianError::Service(ServiceError::Unavailable(format!(
219                "failed to open CSV file: {e}"
220            )))
221        })?;
222
223        // Offload blocking I/O to a spawn_blocking task
224        tokio::task::spawn_blocking(move || {
225            Self::parse_reader(file, delimiter, has_headers, skip, limit)
226        })
227        .await
228        .map_err(|e| {
229            StygianError::Service(ServiceError::Unavailable(format!(
230                "CSV parse task failed: {e}"
231            )))
232        })?
233    }
234
235    async fn healthcheck(&self) -> Result<()> {
236        Ok(()) // File-based — always "healthy"
237    }
238
239    fn source_name(&self) -> &'static str {
240        "csv"
241    }
242}
243
// ─── ScrapingService ──────────────────────────────────────────────────────────

246#[async_trait]
247impl ScrapingService for CsvSource {
248    /// Parse a CSV file, returning rows as JSON array.
249    ///
250    /// `input.url` is the file path. `input.params` can contain:
251    /// * `delimiter` — "comma", "tab", "pipe", "semicolon", or single char
252    /// * `skip` — number of data rows to skip
253    /// * `limit` — max rows to return
254    /// * `has_headers` — boolean (default true)
255    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
256        let path = Path::new(&input.url);
257        if !path.exists() {
258            return Err(StygianError::Service(ServiceError::Unavailable(format!(
259                "CSV file not found: {}",
260                input.url
261            ))));
262        }
263
264        let (delimiter, skip, limit, has_headers) = extract_csv_params(&input.params);
265
266        let file = std::fs::File::open(path).map_err(|e| {
267            StygianError::Service(ServiceError::Unavailable(format!(
268                "failed to open CSV file: {e}"
269            )))
270        })?;
271
272        let rows = tokio::task::spawn_blocking(move || {
273            Self::parse_reader(file, delimiter, has_headers, skip, limit)
274        })
275        .await
276        .map_err(|e| {
277            StygianError::Service(ServiceError::Unavailable(format!(
278                "CSV parse task failed: {e}"
279            )))
280        })??;
281
282        let count = rows.len();
283        let data = serde_json::to_string(&rows).map_err(|e| {
284            StygianError::Service(ServiceError::InvalidResponse(format!(
285                "CSV serialization failed: {e}"
286            )))
287        })?;
288
289        Ok(ServiceOutput {
290            data,
291            metadata: json!({
292                "source": "csv",
293                "row_count": count,
294                "source_path": input.url,
295            }),
296        })
297    }
298
299    fn name(&self) -> &'static str {
300        "csv"
301    }
302}
303
// ─── Tests ────────────────────────────────────────────────────────────────────

#[cfg(test)]
#[allow(clippy::expect_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Three-row sample with a header line, shared by several tests.
    const PEOPLE: &str = "name,age,city\nAlice,30,NYC\nBob,25,SF\nCharlie,35,LA\n";

    /// Run `CsvSource::parse_reader` over an in-memory string, panicking on error.
    fn parse(
        text: &str,
        delimiter: Delimiter,
        has_headers: bool,
        skip: usize,
        limit: Option<u64>,
    ) -> Vec<Value> {
        CsvSource::parse_reader(Cursor::new(text), delimiter, has_headers, skip, limit)
            .expect("parse")
    }

    #[test]
    fn parse_csv_with_headers() {
        let rows = parse(PEOPLE, Delimiter::Comma, true, 0, None);
        assert_eq!(rows.len(), 3);
        let first = &rows[0];
        assert_eq!(first["name"], "Alice");
        assert_eq!(first["age"], "30");
        assert_eq!(first["city"], "NYC");
        assert_eq!(rows[2]["name"], "Charlie");
    }

    #[test]
    fn parse_tsv() {
        let rows = parse("name\tage\nAlice\t30\nBob\t25\n", Delimiter::Tab, true, 0, None);
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[1]["age"], "25");
    }

    #[test]
    fn headerless_csv_generates_column_keys() {
        let rows = parse("Alice,30,NYC\nBob,25,SF\n", Delimiter::Comma, false, 0, None);
        assert_eq!(rows.len(), 2);
        let first = &rows[0];
        assert_eq!(first["column_0"], "Alice");
        assert_eq!(first["column_1"], "30");
        assert_eq!(first["column_2"], "NYC");
    }

    #[test]
    fn row_limit() {
        let rows = parse(PEOPLE, Delimiter::Comma, true, 0, Some(2));
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[1]["name"], "Bob");
    }

    #[test]
    fn skip_rows() {
        let rows = parse(PEOPLE, Delimiter::Comma, true, 1, None);
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Bob");
        assert_eq!(rows[1]["name"], "Charlie");
    }

    #[test]
    fn skip_and_limit() {
        let rows = parse(PEOPLE, Delimiter::Comma, true, 1, Some(1));
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Bob");
    }

    #[test]
    fn strip_utf8_bom() {
        let rows = parse("\u{FEFF}name,age\nAlice,30\n", Delimiter::Comma, true, 0, None);
        assert_eq!(rows.len(), 1);
        // The first header key must come through without the BOM attached.
        assert!(rows[0].get("name").is_some(), "BOM should be stripped");
    }

    #[test]
    fn pipe_delimiter() {
        let rows = parse("a|b|c\n1|2|3\n", Delimiter::Pipe, true, 0, None);
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["a"], "1");
        assert_eq!(rows[0]["b"], "2");
    }

    #[test]
    fn semicolon_delimiter() {
        let rows = parse("x;y\n10;20\n", Delimiter::Semicolon, true, 0, None);
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["x"], "10");
    }

    #[test]
    fn empty_csv_returns_empty() {
        let rows = parse("name,age\n", Delimiter::Comma, true, 0, None);
        assert!(rows.is_empty());
    }

    #[test]
    fn delimiter_from_str_parsing() {
        let cases = [
            ("tab", b'\t'),
            ("tsv", b'\t'),
            ("pipe", b'|'),
            ("semicolon", b';'),
            ("comma", b','),
            (",", b','),
            ("unknown", b','), // unrecognised multi-byte names fall back to comma
        ];
        for (name, expected) in cases {
            assert_eq!(
                Delimiter::from_str(name).as_byte(),
                expected,
                "delimiter {name:?}"
            );
        }
    }

    #[tokio::test]
    async fn file_not_found_returns_error() {
        let source = CsvSource;
        let params = QueryParams {
            query: "/tmp/nonexistent_csv_file_stygian.csv".into(),
            parameters: vec![],
            limit: None,
        };
        assert!(source.query(params).await.is_err());
    }
}