Skip to main content

stygian_graph/adapters/
csv_source.rs

//! CSV / TSV [`DataSourcePort`](crate::ports::data_source::DataSourcePort) and [`ScrapingService`](crate::ports::ScrapingService) adapter.
//!
//! Reads structured data from CSV or TSV files, returning each row as a JSON
//! object keyed by the column names from the header row. When a file is read
//! without a header row, keys are generated as `column_0`, `column_1`, and so on.
//!
//! # Example
//!
//! ```no_run
//! use stygian_graph::adapters::csv_source::CsvSource;
//! use stygian_graph::ports::data_source::{DataSourcePort, QueryParams};
//!
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let source = CsvSource::default();
//! let params = QueryParams {
//!     query: "/data/users.csv".into(),
//!     parameters: vec![],
//!     limit: Some(100),
//! };
//! let rows = source.query(params).await.unwrap();
//! # });
//! ```
22
23use async_trait::async_trait;
24use serde_json::{Map, Value, json};
25use std::io::Read;
26use std::path::Path;
27
28use crate::domain::error::{Result, ServiceError, StygianError};
29use crate::ports::data_source::{DataSourcePort, QueryParams};
30use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
31
32// ─── Configuration ────────────────────────────────────────────────────────────
33
/// Field separator used when parsing delimited text.
///
/// Each variant maps to exactly one byte (see `as_byte`), which is what the
/// CSV reader is configured with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Delimiter {
    /// Comma (default CSV)
    Comma,
    /// Tab (TSV)
    Tab,
    /// Pipe
    Pipe,
    /// Semicolon
    Semicolon,
    /// Custom byte
    Custom(u8),
}

impl Delimiter {
    /// Returns the single byte that separates fields for this delimiter.
    fn as_byte(self) -> u8 {
        match self {
            Self::Comma => b',',
            Self::Tab => b'\t',
            Self::Pipe => b'|',
            Self::Semicolon => b';',
            Self::Custom(b) => b,
        }
    }

    /// Parses a delimiter from its name or from the literal separator character.
    ///
    /// Recognised names are `"comma"`, `"tab"` / `"tsv"`, `"pipe"` and
    /// `"semicolon"`, matched without regard to ASCII case so that `"TSV"` or
    /// `"Tab"` are not silently treated as comma. Any other input that is
    /// exactly one byte long becomes [`Delimiter::Custom`] with that byte
    /// unchanged. Everything else (empty strings, unknown names, multi-byte
    /// characters) falls back to [`Delimiter::Comma`].
    fn from_str(s: &str) -> Self {
        let named = |name: &str| s.eq_ignore_ascii_case(name);

        if named("tab") || named("tsv") || s == "\t" {
            Self::Tab
        } else if named("pipe") || s == "|" {
            Self::Pipe
        } else if named("semicolon") || s == ";" {
            Self::Semicolon
        } else if named("comma") || s == "," {
            Self::Comma
        } else {
            // A single byte is taken verbatim as a custom separator; the
            // original case of the byte is preserved.
            match s.as_bytes() {
                &[byte] => Self::Custom(byte),
                _ => Self::Comma,
            }
        }
    }
}
78
79// ─── Adapter ──────────────────────────────────────────────────────────────────
80
/// CSV / TSV data source adapter.
///
/// Records are read one at a time from the underlying reader rather than
/// loading the whole file up front; the parsed rows are collected in memory
/// before being returned. Supports configurable delimiters and optional
/// headers.
///
/// The type carries no state, so it is cheap to copy and share.
#[derive(Debug, Clone, Copy, Default)]
pub struct CsvSource;
87
88impl CsvSource {
89    /// Parse CSV data from a reader, returning JSON rows.
90    ///
91    /// # Arguments
92    ///
93    /// * `reader` — any `Read` source
94    /// * `delimiter` — field separator byte
95    /// * `has_headers` — whether the first row contains column names
96    /// * `skip` — number of data rows to skip
97    /// * `limit` — maximum number of data rows to return
98    fn parse_reader<R: Read>(
99        reader: R,
100        delimiter: Delimiter,
101        has_headers: bool,
102        skip: usize,
103        limit: Option<u64>,
104    ) -> Result<Vec<Value>> {
105        let mut csv_reader = csv::ReaderBuilder::new()
106            .delimiter(delimiter.as_byte())
107            .has_headers(has_headers)
108            .flexible(true)
109            .from_reader(reader);
110
111        let headers: Vec<String> = if has_headers {
112            let hdrs = csv_reader.headers().map_err(|e| {
113                StygianError::Service(ServiceError::InvalidResponse(format!(
114                    "CSV header parse error: {e}"
115                )))
116            })?;
117            hdrs.iter().map(|h| strip_bom(h).to_string()).collect()
118        } else {
119            Vec::new()
120        };
121
122        let mut rows = Vec::new();
123        let mut skipped = 0;
124
125        for result in csv_reader.records() {
126            let record = result.map_err(|e| {
127                StygianError::Service(ServiceError::InvalidResponse(format!(
128                    "CSV record parse error: {e}"
129                )))
130            })?;
131
132            if skipped < skip {
133                skipped += 1;
134                continue;
135            }
136
137            let row = if headers.is_empty() {
138                // Generate column_0, column_1, ... keys
139                let mut map = Map::new();
140                for (i, field) in record.iter().enumerate() {
141                    map.insert(format!("column_{i}"), Value::String(field.to_string()));
142                }
143                Value::Object(map)
144            } else {
145                let mut map = Map::new();
146                for (i, field) in record.iter().enumerate() {
147                    let key = headers
148                        .get(i)
149                        .cloned()
150                        .unwrap_or_else(|| format!("column_{i}"));
151                    map.insert(key, Value::String(field.to_string()));
152                }
153                Value::Object(map)
154            };
155
156            rows.push(row);
157
158            if let Some(max) = limit
159                && rows.len() as u64 >= max
160            {
161                break;
162            }
163        }
164
165        Ok(rows)
166    }
167}
168
/// Remove a single leading byte-order mark (U+FEFF) from `s`, if present.
///
/// Only the first occurrence at the very start is dropped; the rest of the
/// string is returned untouched.
fn strip_bom(s: &str) -> &str {
    const BOM: char = '\u{FEFF}';

    match s.strip_prefix(BOM) {
        Some(without_bom) => without_bom,
        None => s,
    }
}
173
174/// Extract delimiter, skip, limit, and has_headers from params.
175fn extract_csv_params(params: &Value) -> (Delimiter, usize, Option<u64>, bool) {
176    let delimiter = params
177        .get("delimiter")
178        .and_then(|v| v.as_str())
179        .map(Delimiter::from_str)
180        .unwrap_or(Delimiter::Comma);
181
182    let skip = params.get("skip").and_then(|v| v.as_u64()).unwrap_or(0) as usize;
183
184    let limit = params.get("limit").and_then(|v| v.as_u64());
185
186    let has_headers = params
187        .get("has_headers")
188        .and_then(|v| v.as_bool())
189        .unwrap_or(true);
190
191    (delimiter, skip, limit, has_headers)
192}
193
194// ─── DataSourcePort ───────────────────────────────────────────────────────────
195
196#[async_trait]
197impl DataSourcePort for CsvSource {
198    /// Query a CSV file.
199    ///
200    /// `params.query` is the file path. `params.parameters[0]` can be a JSON
201    /// object with `delimiter`, `skip`, `has_headers` keys.
202    async fn query(&self, params: QueryParams) -> Result<Vec<Value>> {
203        let path = Path::new(&params.query);
204        if !path.exists() {
205            return Err(StygianError::Service(ServiceError::Unavailable(format!(
206                "CSV file not found: {}",
207                params.query
208            ))));
209        }
210
211        let extra = params.parameters.first().cloned().unwrap_or(json!({}));
212        let (delimiter, skip, _, has_headers) = extract_csv_params(&extra);
213        let limit = params.limit;
214
215        let file = std::fs::File::open(path).map_err(|e| {
216            StygianError::Service(ServiceError::Unavailable(format!(
217                "failed to open CSV file: {e}"
218            )))
219        })?;
220
221        // Offload blocking I/O to a spawn_blocking task
222        tokio::task::spawn_blocking(move || {
223            Self::parse_reader(file, delimiter, has_headers, skip, limit)
224        })
225        .await
226        .map_err(|e| {
227            StygianError::Service(ServiceError::Unavailable(format!(
228                "CSV parse task failed: {e}"
229            )))
230        })?
231    }
232
233    async fn healthcheck(&self) -> Result<()> {
234        Ok(()) // File-based — always "healthy"
235    }
236
237    fn source_name(&self) -> &str {
238        "csv"
239    }
240}
241
242// ─── ScrapingService ──────────────────────────────────────────────────────────
243
244#[async_trait]
245impl ScrapingService for CsvSource {
246    /// Parse a CSV file, returning rows as JSON array.
247    ///
248    /// `input.url` is the file path. `input.params` can contain:
249    /// * `delimiter` — "comma", "tab", "pipe", "semicolon", or single char
250    /// * `skip` — number of data rows to skip
251    /// * `limit` — max rows to return
252    /// * `has_headers` — boolean (default true)
253    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
254        let path = Path::new(&input.url);
255        if !path.exists() {
256            return Err(StygianError::Service(ServiceError::Unavailable(format!(
257                "CSV file not found: {}",
258                input.url
259            ))));
260        }
261
262        let (delimiter, skip, limit, has_headers) = extract_csv_params(&input.params);
263
264        let file = std::fs::File::open(path).map_err(|e| {
265            StygianError::Service(ServiceError::Unavailable(format!(
266                "failed to open CSV file: {e}"
267            )))
268        })?;
269
270        let rows = tokio::task::spawn_blocking(move || {
271            Self::parse_reader(file, delimiter, has_headers, skip, limit)
272        })
273        .await
274        .map_err(|e| {
275            StygianError::Service(ServiceError::Unavailable(format!(
276                "CSV parse task failed: {e}"
277            )))
278        })??;
279
280        let count = rows.len();
281        let data = serde_json::to_string(&rows).map_err(|e| {
282            StygianError::Service(ServiceError::InvalidResponse(format!(
283                "CSV serialization failed: {e}"
284            )))
285        })?;
286
287        Ok(ServiceOutput {
288            data,
289            metadata: json!({
290                "source": "csv",
291                "row_count": count,
292                "source_path": input.url,
293            }),
294        })
295    }
296
297    fn name(&self) -> &'static str {
298        "csv"
299    }
300}
301
302// ─── Tests ────────────────────────────────────────────────────────────────────
303
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Three data rows under a `name,age,city` header.
    const CSV_DATA: &str = "name,age,city\nAlice,30,NYC\nBob,25,SF\nCharlie,35,LA\n";

    /// Runs `parse_reader` over an in-memory string and unwraps the rows.
    fn parse(
        text: &str,
        delimiter: Delimiter,
        has_headers: bool,
        skip: usize,
        limit: Option<u64>,
    ) -> Vec<Value> {
        CsvSource::parse_reader(Cursor::new(text), delimiter, has_headers, skip, limit)
            .expect("parse")
    }

    #[test]
    fn parse_csv_with_headers() {
        let parsed = parse(CSV_DATA, Delimiter::Comma, true, 0, None);
        assert_eq!(parsed.len(), 3);

        let first = &parsed[0];
        assert_eq!(first["name"], "Alice");
        assert_eq!(first["age"], "30");
        assert_eq!(first["city"], "NYC");

        assert_eq!(parsed[2]["name"], "Charlie");
    }

    #[test]
    fn parse_tsv() {
        let parsed = parse("name\tage\nAlice\t30\nBob\t25\n", Delimiter::Tab, true, 0, None);
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0]["name"], "Alice");
        assert_eq!(parsed[1]["age"], "25");
    }

    #[test]
    fn headerless_csv_generates_column_keys() {
        let parsed = parse("Alice,30,NYC\nBob,25,SF\n", Delimiter::Comma, false, 0, None);
        assert_eq!(parsed.len(), 2);

        let first = &parsed[0];
        assert_eq!(first["column_0"], "Alice");
        assert_eq!(first["column_1"], "30");
        assert_eq!(first["column_2"], "NYC");
    }

    #[test]
    fn row_limit() {
        let parsed = parse(CSV_DATA, Delimiter::Comma, true, 0, Some(2));
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0]["name"], "Alice");
        assert_eq!(parsed[1]["name"], "Bob");
    }

    #[test]
    fn skip_rows() {
        let parsed = parse(CSV_DATA, Delimiter::Comma, true, 1, None);
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0]["name"], "Bob");
        assert_eq!(parsed[1]["name"], "Charlie");
    }

    #[test]
    fn skip_and_limit() {
        let parsed = parse(CSV_DATA, Delimiter::Comma, true, 1, Some(1));
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0]["name"], "Bob");
    }

    #[test]
    fn strip_utf8_bom() {
        let parsed = parse("\u{FEFF}name,age\nAlice,30\n", Delimiter::Comma, true, 0, None);
        assert_eq!(parsed.len(), 1);
        // The header key must be the bare column name, without the BOM.
        assert!(parsed[0].get("name").is_some(), "BOM should be stripped");
    }

    #[test]
    fn pipe_delimiter() {
        let parsed = parse("a|b|c\n1|2|3\n", Delimiter::Pipe, true, 0, None);
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0]["a"], "1");
        assert_eq!(parsed[0]["b"], "2");
    }

    #[test]
    fn semicolon_delimiter() {
        let parsed = parse("x;y\n10;20\n", Delimiter::Semicolon, true, 0, None);
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0]["x"], "10");
    }

    #[test]
    fn empty_csv_returns_empty() {
        // Header row only: no data rows to return.
        let parsed = parse("name,age\n", Delimiter::Comma, true, 0, None);
        assert!(parsed.is_empty());
    }

    #[test]
    fn delimiter_from_str_parsing() {
        // (input, expected separator byte); the last entry exercises the default.
        let cases: [(&str, u8); 7] = [
            ("tab", b'\t'),
            ("tsv", b'\t'),
            ("pipe", b'|'),
            ("semicolon", b';'),
            ("comma", b','),
            (",", b','),
            ("unknown", b','),
        ];

        for (input, expected) in cases {
            assert_eq!(Delimiter::from_str(input).as_byte(), expected);
        }
    }

    #[tokio::test]
    async fn file_not_found_returns_error() {
        let missing = QueryParams {
            query: "/tmp/nonexistent_csv_file_stygian.csv".into(),
            parameters: vec![],
            limit: None,
        };

        let outcome = CsvSource.query(missing).await;
        assert!(outcome.is_err());
    }
}