// virtual_frame/csv.rs
1//! CSV ingestion: `CsvConfig`, `CsvReader`, and `StreamingCsvProcessor`.
2//!
3//! Provides CSV parsing into `DataFrame` and streaming aggregation
4//! (sum, min/max) without materializing the full frame.
5
6use crate::column::Column;
7use crate::dataframe::{DataFrame, DataError};
8
9// ── CsvConfig ───────────────────────────────────────────────────────────────
10
/// Configuration shared by [`CsvReader`] and [`StreamingCsvProcessor`].
#[derive(Debug, Clone)]
pub struct CsvConfig {
    /// Field delimiter byte (e.g. `b'\t'` for TSV). Default: `b','`.
    pub delimiter: u8,
    /// Whether the first row is a header. When `false`, columns are
    /// named `col_0`, `col_1`, … Default: `true`.
    pub has_header: bool,
    /// Maximum number of data rows to read. `None` = read all.
    pub max_rows: Option<usize>,
    /// Trim ASCII whitespace from field values before type inference. Default: `true`.
    pub trim_whitespace: bool,
}
23
24impl Default for CsvConfig {
25    fn default() -> Self {
26        CsvConfig {
27            delimiter: b',',
28            has_header: true,
29            max_rows: None,
30            trim_whitespace: true,
31        }
32    }
33}
34
35// ── CsvReader ───────────────────────────────────────────────────────────────
36
/// Single-pass CSV parser for byte slices.
///
/// Rows and fields are referenced as sub-slices of the original input
/// while parsing (copies are made only when values are materialized into
/// owned columns). Type inference is performed from the first data row.
pub struct CsvReader {
    /// Parsing options: delimiter, header handling, row limit, trimming.
    config: CsvConfig,
}
44
/// Column type chosen by first-row inference in `CsvReader::parse`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InferredType {
    Int,
    Float,
    Bool,
    Str,
}

/// Infer the type of a single CSV field value.
///
/// Precedence: Bool (`"true"`/`"false"`/`"1"`/`"0"`), then Int (optional
/// leading `-` followed only by ASCII digits), then Float (anything the
/// `f64` parser accepts, which includes exponent notation, `inf`, and
/// `NaN`), else Str.
///
/// NOTE: `"1"`/`"0"` infer as Bool, so an integer column whose first value
/// is 0 or 1 is read as Bool — a deliberate quirk of first-row inference.
fn infer_type(s: &str) -> InferredType {
    let t = s.trim();
    if matches!(t, "true" | "false" | "1" | "0") {
        return InferredType::Bool;
    }
    // Integer: optional leading '-', then one or more ASCII digits.
    let digits = t.strip_prefix('-').unwrap_or(t);
    if !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()) {
        return InferredType::Int;
    }
    // The f64 parser accepts every "digits with one dot" form the old
    // hand-rolled scan did (plus "1e9", "inf", "NaN"), so a single parse
    // attempt replaces the previous duplicated checks.
    if t.parse::<f64>().is_ok() {
        InferredType::Float
    } else {
        InferredType::Str
    }
}
75
/// Split one CSV row into string fields on `delimiter`.
///
/// A trailing `\r` (left over from a CRLF line ending) is removed from the
/// final field only. Field bytes that are not valid UTF-8 become `""`.
fn split_fields(row: &[u8], delimiter: u8) -> Vec<&str> {
    let mut fields: Vec<&str> = row
        .split(|&b| b == delimiter)
        .map(|chunk| std::str::from_utf8(chunk).unwrap_or(""))
        .collect();
    // Only the last field can carry the CR of a CRLF row terminator.
    if let Some(last) = fields.last_mut() {
        if let Some(stripped) = last.strip_suffix('\r') {
            *last = stripped;
        }
    }
    fields
}
92
93impl CsvReader {
94    /// Create a new `CsvReader` with the given configuration.
95    pub fn new(config: CsvConfig) -> Self {
96        CsvReader { config }
97    }
98
99    /// Parse a CSV byte slice into a `DataFrame`.
100    pub fn parse(&self, input: &[u8]) -> Result<DataFrame, DataError> {
101        if input.is_empty() {
102            return Ok(DataFrame::new());
103        }
104
105        let rows: Vec<&[u8]> = input
106            .split(|&b| b == b'\n')
107            .filter(|r| !r.is_empty() && *r != b"\r")
108            .collect();
109
110        if rows.is_empty() {
111            return Ok(DataFrame::new());
112        }
113
114        let delim = self.config.delimiter;
115
116        let (header_names, data_rows) = if self.config.has_header {
117            let names: Vec<String> = split_fields(rows[0], delim)
118                .into_iter()
119                .map(|s| {
120                    if self.config.trim_whitespace {
121                        s.trim().to_string()
122                    } else {
123                        s.to_string()
124                    }
125                })
126                .collect();
127            (names, &rows[1..])
128        } else {
129            let ncols = split_fields(rows[0], delim).len();
130            let names: Vec<String> = (0..ncols).map(|i| format!("col_{}", i)).collect();
131            (names, &rows[..])
132        };
133
134        let ncols = header_names.len();
135        if ncols == 0 {
136            return Ok(DataFrame::new());
137        }
138
139        let data_rows = if let Some(max) = self.config.max_rows {
140            &data_rows[..data_rows.len().min(max)]
141        } else {
142            data_rows
143        };
144
145        if data_rows.is_empty() {
146            let columns: Vec<(String, Column)> = header_names
147                .into_iter()
148                .map(|name| (name, Column::Str(Vec::new())))
149                .collect();
150            return DataFrame::from_columns(columns);
151        }
152
153        let first_fields = split_fields(data_rows[0], delim);
154        let mut col_types: Vec<InferredType> = first_fields
155            .iter()
156            .map(|s| {
157                let s = if self.config.trim_whitespace {
158                    s.trim()
159                } else {
160                    *s
161                };
162                infer_type(s)
163            })
164            .collect();
165
166        while col_types.len() < ncols {
167            col_types.push(InferredType::Str);
168        }
169
170        let nrows = data_rows.len();
171        let mut int_bufs: Vec<Option<Vec<i64>>> = vec![None; ncols];
172        let mut float_bufs: Vec<Option<Vec<f64>>> = vec![None; ncols];
173        let mut bool_bufs: Vec<Option<Vec<bool>>> = vec![None; ncols];
174        let mut str_bufs: Vec<Option<Vec<String>>> = vec![None; ncols];
175
176        for (i, &t) in col_types.iter().enumerate() {
177            match t {
178                InferredType::Int => int_bufs[i] = Some(Vec::with_capacity(nrows)),
179                InferredType::Float => float_bufs[i] = Some(Vec::with_capacity(nrows)),
180                InferredType::Bool => bool_bufs[i] = Some(Vec::with_capacity(nrows)),
181                InferredType::Str => str_bufs[i] = Some(Vec::with_capacity(nrows)),
182            }
183        }
184
185        for &row_bytes in data_rows.iter() {
186            let fields = split_fields(row_bytes, delim);
187            for col_idx in 0..ncols {
188                let raw = if col_idx < fields.len() {
189                    fields[col_idx]
190                } else {
191                    ""
192                };
193                let s = if self.config.trim_whitespace {
194                    raw.trim()
195                } else {
196                    raw
197                };
198
199                match col_types[col_idx] {
200                    InferredType::Int => {
201                        let v = s.parse::<i64>().unwrap_or(0);
202                        int_bufs[col_idx].as_mut().unwrap().push(v);
203                    }
204                    InferredType::Float => {
205                        let v = s.parse::<f64>().unwrap_or(0.0);
206                        float_bufs[col_idx].as_mut().unwrap().push(v);
207                    }
208                    InferredType::Bool => {
209                        let v = matches!(s, "true" | "1");
210                        bool_bufs[col_idx].as_mut().unwrap().push(v);
211                    }
212                    InferredType::Str => {
213                        str_bufs[col_idx].as_mut().unwrap().push(s.to_string());
214                    }
215                }
216            }
217        }
218
219        let mut columns: Vec<(String, Column)> = Vec::with_capacity(ncols);
220        for (i, name) in header_names.into_iter().enumerate() {
221            let col = match col_types[i] {
222                InferredType::Int => Column::Int(int_bufs[i].take().unwrap()),
223                InferredType::Float => Column::Float(float_bufs[i].take().unwrap()),
224                InferredType::Bool => Column::Bool(bool_bufs[i].take().unwrap()),
225                InferredType::Str => Column::Str(str_bufs[i].take().unwrap()),
226            };
227            columns.push((name, col));
228        }
229
230        DataFrame::from_columns(columns)
231    }
232}
233
234// ── StreamingCsvProcessor ───────────────────────────────────────────────────
235
/// Streaming CSV processor that visits rows one at a time without
/// materializing the full `DataFrame`. Memory usage is O(ncols).
pub struct StreamingCsvProcessor {
    /// Parsing options shared with `CsvReader` (delimiter, header, limits).
    config: CsvConfig,
}
241
242impl StreamingCsvProcessor {
243    /// Create a new streaming CSV processor.
244    pub fn new(config: CsvConfig) -> Self {
245        StreamingCsvProcessor { config }
246    }
247
248    /// Stream through CSV, accumulating per-column sums using Kahan summation.
249    ///
250    /// Returns `(column_names, sums_per_col, row_count)`.
251    pub fn sum_columns(&self, input: &[u8]) -> Result<(Vec<String>, Vec<f64>, usize), DataError> {
252        if input.is_empty() {
253            return Ok((vec![], vec![], 0));
254        }
255
256        let rows: Vec<&[u8]> = input
257            .split(|&b| b == b'\n')
258            .filter(|r| !r.is_empty() && *r != b"\r")
259            .collect();
260
261        if rows.is_empty() {
262            return Ok((vec![], vec![], 0));
263        }
264
265        let delim = self.config.delimiter;
266        let (header_names, data_rows) = if self.config.has_header {
267            let names: Vec<String> = split_fields(rows[0], delim)
268                .into_iter()
269                .map(|s| s.trim().to_string())
270                .collect();
271            (names, &rows[1..])
272        } else {
273            let ncols = split_fields(rows[0], delim).len();
274            let names: Vec<String> = (0..ncols).map(|i| format!("col_{}", i)).collect();
275            (names, &rows[..])
276        };
277
278        let ncols = header_names.len();
279        let mut sums: Vec<f64> = vec![0.0; ncols];
280        let mut comp: Vec<f64> = vec![0.0; ncols];
281        let mut row_count = 0usize;
282
283        let data_rows = if let Some(max) = self.config.max_rows {
284            &data_rows[..data_rows.len().min(max)]
285        } else {
286            data_rows
287        };
288
289        for &row_bytes in data_rows {
290            let fields = split_fields(row_bytes, delim);
291            for col_idx in 0..ncols {
292                let s = if col_idx < fields.len() {
293                    if self.config.trim_whitespace {
294                        fields[col_idx].trim()
295                    } else {
296                        fields[col_idx]
297                    }
298                } else {
299                    ""
300                };
301                let v: f64 = s.parse().unwrap_or(0.0);
302                let y = v - comp[col_idx];
303                let t = sums[col_idx] + y;
304                comp[col_idx] = (t - sums[col_idx]) - y;
305                sums[col_idx] = t;
306            }
307            row_count += 1;
308        }
309
310        Ok((header_names, sums, row_count))
311    }
312
313    /// Stream through CSV, collecting per-column min/max without materializing.
314    ///
315    /// Returns `(column_names, mins, maxs, row_count)`.
316    pub fn minmax_columns(
317        &self,
318        input: &[u8],
319    ) -> Result<(Vec<String>, Vec<f64>, Vec<f64>, usize), DataError> {
320        if input.is_empty() {
321            return Ok((vec![], vec![], vec![], 0));
322        }
323
324        let rows: Vec<&[u8]> = input
325            .split(|&b| b == b'\n')
326            .filter(|r| !r.is_empty() && *r != b"\r")
327            .collect();
328
329        if rows.is_empty() {
330            return Ok((vec![], vec![], vec![], 0));
331        }
332
333        let delim = self.config.delimiter;
334        let (header_names, data_rows) = if self.config.has_header {
335            let names: Vec<String> = split_fields(rows[0], delim)
336                .into_iter()
337                .map(|s| s.trim().to_string())
338                .collect();
339            (names, &rows[1..])
340        } else {
341            let ncols = split_fields(rows[0], delim).len();
342            let names = (0..ncols).map(|i| format!("col_{}", i)).collect();
343            (names, &rows[..])
344        };
345
346        let ncols = header_names.len();
347        let mut mins: Vec<f64> = vec![f64::INFINITY; ncols];
348        let mut maxs: Vec<f64> = vec![f64::NEG_INFINITY; ncols];
349        let mut row_count = 0usize;
350
351        let data_rows = if let Some(max) = self.config.max_rows {
352            &data_rows[..data_rows.len().min(max)]
353        } else {
354            data_rows
355        };
356
357        for &row_bytes in data_rows {
358            let fields = split_fields(row_bytes, delim);
359            for col_idx in 0..ncols {
360                let s = if col_idx < fields.len() {
361                    fields[col_idx].trim()
362                } else {
363                    ""
364                };
365                if let Ok(v) = s.parse::<f64>() {
366                    if v < mins[col_idx] {
367                        mins[col_idx] = v;
368                    }
369                    if v > maxs[col_idx] {
370                        maxs[col_idx] = v;
371                    }
372                }
373            }
374            row_count += 1;
375        }
376
377        Ok((header_names, mins, maxs, row_count))
378    }
379}
380
381// ── Tests ───────────────────────────────────────────────────────────────────
382
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_basic_csv() {
        // Two data rows under a three-column header.
        let input = b"name,age,score\nAlice,30,9.5\nBob,25,8.1";
        let frame = CsvReader::new(CsvConfig::default()).parse(input).unwrap();
        assert_eq!(frame.nrows(), 2);
        assert_eq!(frame.ncols(), 3);
    }

    #[test]
    fn test_parse_empty() {
        // Empty input yields an empty frame.
        let frame = CsvReader::new(CsvConfig::default()).parse(b"").unwrap();
        assert_eq!(frame.nrows(), 0);
    }

    #[test]
    fn test_parse_header_only() {
        // A header with no data rows keeps the schema at zero rows.
        let frame = CsvReader::new(CsvConfig::default()).parse(b"x,y,z\n").unwrap();
        assert_eq!(frame.nrows(), 0);
        assert_eq!(frame.ncols(), 3);
    }

    #[test]
    fn test_parse_type_inference() {
        let input = b"a,b,c,d\n42,3.14,true,hello\n10,2.71,false,world";
        let frame = CsvReader::new(CsvConfig::default()).parse(input).unwrap();
        assert_eq!(frame.nrows(), 2);
        // First-row inference: a → Int, b → Float, c → Bool, d → Str.
        assert!(matches!(frame.get_column("a"), Some(Column::Int(_))));
        assert!(matches!(frame.get_column("b"), Some(Column::Float(_))));
        assert!(matches!(frame.get_column("c"), Some(Column::Bool(_))));
        assert!(matches!(frame.get_column("d"), Some(Column::Str(_))));
    }

    #[test]
    fn test_parse_tsv() {
        // Tab-separated input via a custom delimiter.
        let config = CsvConfig {
            delimiter: b'\t',
            ..Default::default()
        };
        let frame = CsvReader::new(config).parse(b"x\ty\n1\t2\n3\t4").unwrap();
        assert_eq!(frame.nrows(), 2);
        assert_eq!(frame.ncols(), 2);
    }

    #[test]
    fn test_parse_max_rows() {
        // max_rows caps the number of data rows ingested.
        let config = CsvConfig {
            max_rows: Some(3),
            ..Default::default()
        };
        let frame = CsvReader::new(config).parse(b"x\n1\n2\n3\n4\n5").unwrap();
        assert_eq!(frame.nrows(), 3);
    }

    #[test]
    fn test_streaming_sum() {
        let processor = StreamingCsvProcessor::new(CsvConfig::default());
        let (headers, sums, count) = processor
            .sum_columns(b"x,y\n1.0,2.0\n3.0,4.0\n5.0,6.0")
            .unwrap();
        assert_eq!(headers, vec!["x", "y"]);
        assert_eq!(count, 3);
        assert!((sums[0] - 9.0).abs() < 1e-10);
        assert!((sums[1] - 12.0).abs() < 1e-10);
    }

    #[test]
    fn test_streaming_minmax() {
        let processor = StreamingCsvProcessor::new(CsvConfig::default());
        let (headers, mins, maxs, count) = processor
            .minmax_columns(b"x,y\n1.0,6.0\n3.0,2.0\n5.0,4.0")
            .unwrap();
        assert_eq!(headers, vec!["x", "y"]);
        assert_eq!(count, 3);
        assert!((mins[0] - 1.0).abs() < 1e-10);
        assert!((maxs[0] - 5.0).abs() < 1e-10);
        assert!((mins[1] - 2.0).abs() < 1e-10);
        assert!((maxs[1] - 6.0).abs() < 1e-10);
    }

    #[test]
    fn test_parse_windows_line_endings() {
        // CRLF-terminated rows parse the same as LF rows.
        let frame = CsvReader::new(CsvConfig::default())
            .parse(b"a,b\r\n1,2\r\n3,4\r\n")
            .unwrap();
        assert_eq!(frame.nrows(), 2);
    }

    #[test]
    fn test_no_header() {
        // Without a header, columns are synthesized as col_0, col_1, ...
        let config = CsvConfig {
            has_header: false,
            ..Default::default()
        };
        let frame = CsvReader::new(config).parse(b"1,2,3\n4,5,6").unwrap();
        assert_eq!(frame.nrows(), 2);
        assert_eq!(frame.column_names(), vec!["col_0", "col_1", "col_2"]);
    }
}