Skip to main content

cjc_data/
csv.rs

1//! CSV ingestion: `CsvConfig`, `CsvReader`, and `StreamingCsvProcessor`.
2//!
3//! Provides zero-copy CSV parsing into `DataFrame` and streaming aggregation
4//! (sum, min/max) without materializing the full frame.
5
6use crate::{Column, DataFrame, DataError};
7
8// ── CsvConfig ───────────────────────────────────────────────────────────────
9
/// Configuration shared by `CsvReader` and `StreamingCsvProcessor`.
///
/// Construct it with a struct literal or start from [`Default::default`] and
/// override individual fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CsvConfig {
    /// Byte that separates fields within a row. Default: `b','`.
    pub delimiter: u8,
    /// Whether the first non-empty row holds column names. When `false`,
    /// names are generated as `col_0`, `col_1`, ... Default: `true`.
    pub has_header: bool,
    /// Maximum number of data rows to read (the header row is not counted).
    /// `None` = read all. Default: `None`.
    pub max_rows: Option<usize>,
    /// Trim leading/trailing whitespace (via `str::trim`) from field values
    /// before they are type-inferred or parsed. Default: `true`.
    pub trim_whitespace: bool,
}
22
23impl Default for CsvConfig {
24    fn default() -> Self {
25        CsvConfig {
26            delimiter: b',',
27            has_header: true,
28            max_rows: None,
29            trim_whitespace: true,
30        }
31    }
32}
33
34// ── CsvReader ───────────────────────────────────────────────────────────────
35
36/// Zero-copy CSV parser for byte slices.
37///
38/// # Design
39///
40/// `CsvReader` operates directly on a `&[u8]` byte slice — no file I/O, no
41/// intermediate `String` allocation per field during the scan phase. Fields
42/// are referenced as sub-slices of the original input and parsed in a single
43/// pass.
44///
45/// # Type inference
46///
47/// Each column's type is inferred from the first data row:
48/// - All digits (optionally signed, one optional `.`) → `Float`
49/// - All digits (optionally signed, no `.`) → `Int` (but stored as `Float` for
50///   numeric safety — explicit `Int` columns can be forced via `CsvConfig`)
51/// - `"true"` / `"false"` / `"1"` / `"0"` → `Bool`
52/// - Anything else → `Str`
53///
54/// # Example
55///
56/// ```rust,ignore
57/// let csv = b"name,age,score\nAlice,30,9.5\nBob,25,8.1";
58/// let df = CsvReader::new(CsvConfig::default()).parse(csv)?;
59/// assert_eq!(df.nrows(), 2);
60/// ```
61pub struct CsvReader {
62    config: CsvConfig,
63}
64
/// Column type inferred from the first data row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InferredType {
    Int,
    Float,
    Bool,
    Str,
}

/// Infer the type of a single field string.
///
/// Surrounding whitespace is ignored. Rules are applied in order:
/// 1. `"true"`, `"false"`, `"1"`, `"0"` → `Bool`.
/// 2. Optional leading `-` followed only by ASCII digits → `Int`, provided
///    the value fits in an `i64`; otherwise `Float`, so an oversized integer
///    is not later replaced by a parse-failure default.
/// 3. Anything `f64::from_str` accepts (decimals, scientific notation, and
///    also `inf` / `nan` spellings) → `Float`.
/// 4. Everything else, including the empty string → `Str`.
fn infer_type(s: &str) -> InferredType {
    let t = s.trim();
    if matches!(t, "true" | "false" | "1" | "0") {
        return InferredType::Bool;
    }

    // Integer shape: optional leading '-', then at least one ASCII digit.
    let digits = t.strip_prefix('-').unwrap_or(t);
    if !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()) {
        // A digit run can still overflow i64; such values are only
        // representable (approximately) as f64.
        return if t.parse::<i64>().is_ok() {
            InferredType::Int
        } else {
            InferredType::Float
        };
    }

    // `f64::from_str` covers plain decimals ("1.5", "1.", ".5") as well as
    // scientific notation ("1.5e-3"), so no manual dot counting is needed.
    if t.parse::<f64>().is_ok() {
        return InferredType::Float;
    }

    InferredType::Str
}
100
/// Break one row into its fields at every occurrence of `delimiter`.
///
/// The returned `&str`s borrow from `row`. A single trailing `\r` on the
/// final field (left over from a `\r\n` line ending) is removed. A field
/// whose bytes are not valid UTF-8 is returned as the empty string.
fn split_fields<'a>(row: &'a [u8], delimiter: u8) -> Vec<&'a str> {
    let mut segments: Vec<&'a [u8]> = row.split(|&byte| byte == delimiter).collect();

    // Only the last segment can carry the carriage return of a CRLF ending.
    if let Some(last) = segments.last_mut() {
        let tail: &'a [u8] = *last;
        *last = tail.strip_suffix(b"\r").unwrap_or(tail);
    }

    segments
        .into_iter()
        .map(|segment| std::str::from_utf8(segment).unwrap_or(""))
        .collect()
}
120
121impl CsvReader {
122    /// Create a new `CsvReader` with the given configuration.
123    pub fn new(config: CsvConfig) -> Self {
124        CsvReader { config }
125    }
126
127    /// Parse a CSV byte slice into a `DataFrame`.
128    ///
129    /// # Errors
130    /// Returns `DataError::InvalidOperation` if:
131    /// - The input is empty.
132    /// - A data row has fewer fields than the header.
133    pub fn parse(&self, input: &[u8]) -> Result<DataFrame, DataError> {
134        if input.is_empty() {
135            return Ok(DataFrame::new());
136        }
137
138        // Split on newlines, skipping empty trailing lines.
139        let rows: Vec<&[u8]> = input
140            .split(|&b| b == b'\n')
141            .filter(|r| !r.is_empty() && *r != b"\r")
142            .collect();
143
144        if rows.is_empty() {
145            return Ok(DataFrame::new());
146        }
147
148        let delim = self.config.delimiter;
149
150        // Parse header or generate column names.
151        let (header_names, data_rows) = if self.config.has_header {
152            let names: Vec<String> = split_fields(rows[0], delim)
153                .into_iter()
154                .map(|s| {
155                    if self.config.trim_whitespace {
156                        s.trim().to_string()
157                    } else {
158                        s.to_string()
159                    }
160                })
161                .collect();
162            (names, &rows[1..])
163        } else {
164            // Generate column names: col_0, col_1, ...
165            let ncols = split_fields(rows[0], delim).len();
166            let names: Vec<String> = (0..ncols).map(|i| format!("col_{}", i)).collect();
167            (names, &rows[..])
168        };
169
170        let ncols = header_names.len();
171        if ncols == 0 {
172            return Ok(DataFrame::new());
173        }
174
175        // Limit rows if configured.
176        let data_rows = if let Some(max) = self.config.max_rows {
177            &data_rows[..data_rows.len().min(max)]
178        } else {
179            data_rows
180        };
181
182        if data_rows.is_empty() {
183            // No data rows — return header-only DataFrame with empty columns.
184            let columns: Vec<(String, Column)> = header_names
185                .into_iter()
186                .map(|name| (name, Column::Str(Vec::new())))
187                .collect();
188            return DataFrame::from_columns(columns);
189        }
190
191        // Type-infer from first data row.
192        let first_fields = split_fields(data_rows[0], delim);
193        let mut col_types: Vec<InferredType> = first_fields
194            .iter()
195            .map(|s| {
196                let s = if self.config.trim_whitespace { s.trim() } else { *s };
197                infer_type(s)
198            })
199            .collect();
200
201        // Pad col_types if first row is shorter than header.
202        while col_types.len() < ncols {
203            col_types.push(InferredType::Str);
204        }
205
206        // Allocate column buffers.
207        let nrows = data_rows.len();
208        let mut int_bufs:   Vec<Option<Vec<i64>>>    = vec![None; ncols];
209        let mut float_bufs: Vec<Option<Vec<f64>>>    = vec![None; ncols];
210        let mut bool_bufs:  Vec<Option<Vec<bool>>>   = vec![None; ncols];
211        let mut str_bufs:   Vec<Option<Vec<String>>> = vec![None; ncols];
212
213        for (i, &t) in col_types.iter().enumerate() {
214            match t {
215                InferredType::Int   => int_bufs[i]   = Some(Vec::with_capacity(nrows)),
216                InferredType::Float => float_bufs[i] = Some(Vec::with_capacity(nrows)),
217                InferredType::Bool  => bool_bufs[i]  = Some(Vec::with_capacity(nrows)),
218                InferredType::Str   => str_bufs[i]   = Some(Vec::with_capacity(nrows)),
219            }
220        }
221
222        // Parse each data row.
223        for (row_idx, &row_bytes) in data_rows.iter().enumerate() {
224            let fields = split_fields(row_bytes, delim);
225            for col_idx in 0..ncols {
226                let raw = if col_idx < fields.len() {
227                    fields[col_idx]
228                } else {
229                    // Missing field: treat as empty string.
230                    ""
231                };
232                let s = if self.config.trim_whitespace { raw.trim() } else { raw };
233
234                match col_types[col_idx] {
235                    InferredType::Int => {
236                        let v = s.parse::<i64>().unwrap_or(0);
237                        int_bufs[col_idx].as_mut().unwrap().push(v);
238                    }
239                    InferredType::Float => {
240                        let v = s.parse::<f64>().unwrap_or(0.0);
241                        float_bufs[col_idx].as_mut().unwrap().push(v);
242                    }
243                    InferredType::Bool => {
244                        let v = matches!(s, "true" | "1");
245                        bool_bufs[col_idx].as_mut().unwrap().push(v);
246                    }
247                    InferredType::Str => {
248                        str_bufs[col_idx].as_mut().unwrap().push(s.to_string());
249                    }
250                }
251
252                let _ = row_idx; // suppress unused warning
253            }
254        }
255
256        // Assemble columns.
257        let mut columns: Vec<(String, Column)> = Vec::with_capacity(ncols);
258        for (i, name) in header_names.into_iter().enumerate() {
259            let col = match col_types[i] {
260                InferredType::Int   => Column::Int(int_bufs[i].take().unwrap()),
261                InferredType::Float => Column::Float(float_bufs[i].take().unwrap()),
262                InferredType::Bool  => Column::Bool(bool_bufs[i].take().unwrap()),
263                InferredType::Str   => Column::Str(str_bufs[i].take().unwrap()),
264            };
265            columns.push((name, col));
266        }
267
268        DataFrame::from_columns(columns)
269    }
270}
271
272// ── StreamingCsvProcessor ───────────────────────────────────────────────────
273
274/// A streaming CSV processor that visits rows one at a time without
275/// materializing the full DataFrame.
276///
277/// Useful for large datasets where only aggregate statistics are needed.
278/// Memory usage is O(ncols) regardless of the number of rows.
279///
280/// # Example
281///
282/// ```rust,ignore
283/// let csv = b"x,y\n1.0,2.0\n3.0,4.0\n5.0,6.0";
284/// let mut proc = StreamingCsvProcessor::new(CsvConfig::default());
285/// let (headers, sums, count) = proc.sum_columns(csv)?;
286/// ```
287pub struct StreamingCsvProcessor {
288    config: CsvConfig,
289}
290
291impl StreamingCsvProcessor {
292    pub fn new(config: CsvConfig) -> Self {
293        StreamingCsvProcessor { config }
294    }
295
296    /// Stream through CSV, accumulating per-column sums using Kahan summation.
297    ///
298    /// Returns `(column_names, sums_per_col, row_count)`.
299    /// Non-numeric fields contribute `0.0` to the sum.
300    pub fn sum_columns(&self, input: &[u8]) -> Result<(Vec<String>, Vec<f64>, usize), DataError> {
301        if input.is_empty() {
302            return Ok((vec![], vec![], 0));
303        }
304
305        let rows: Vec<&[u8]> = input
306            .split(|&b| b == b'\n')
307            .filter(|r| !r.is_empty() && *r != b"\r")
308            .collect();
309
310        if rows.is_empty() {
311            return Ok((vec![], vec![], 0));
312        }
313
314        let delim = self.config.delimiter;
315        let (header_names, data_rows) = if self.config.has_header {
316            let names: Vec<String> = split_fields(rows[0], delim)
317                .into_iter()
318                .map(|s| s.trim().to_string())
319                .collect();
320            (names, &rows[1..])
321        } else {
322            let ncols = split_fields(rows[0], delim).len();
323            let names: Vec<String> = (0..ncols).map(|i| format!("col_{}", i)).collect();
324            (names, &rows[..])
325        };
326
327        let ncols = header_names.len();
328        // Kahan compensated sums per column.
329        let mut sums: Vec<f64> = vec![0.0; ncols];
330        let mut comp: Vec<f64> = vec![0.0; ncols];
331        let mut row_count = 0usize;
332
333        let data_rows = if let Some(max) = self.config.max_rows {
334            &data_rows[..data_rows.len().min(max)]
335        } else {
336            data_rows
337        };
338
339        for &row_bytes in data_rows {
340            let fields = split_fields(row_bytes, delim);
341            for col_idx in 0..ncols {
342                let s = if col_idx < fields.len() {
343                    if self.config.trim_whitespace {
344                        fields[col_idx].trim()
345                    } else {
346                        fields[col_idx]
347                    }
348                } else {
349                    ""
350                };
351                let v: f64 = s.parse().unwrap_or(0.0);
352                // Kahan step
353                let y = v - comp[col_idx];
354                let t = sums[col_idx] + y;
355                comp[col_idx] = (t - sums[col_idx]) - y;
356                sums[col_idx] = t;
357            }
358            row_count += 1;
359        }
360
361        Ok((header_names, sums, row_count))
362    }
363
364    /// Stream through CSV, collecting per-column min/max without materializing.
365    ///
366    /// Returns `(column_names, mins, maxs, row_count)`.
367    /// Non-numeric fields contribute `f64::NAN` to min/max.
368    pub fn minmax_columns(
369        &self,
370        input: &[u8],
371    ) -> Result<(Vec<String>, Vec<f64>, Vec<f64>, usize), DataError> {
372        if input.is_empty() {
373            return Ok((vec![], vec![], vec![], 0));
374        }
375
376        let rows: Vec<&[u8]> = input
377            .split(|&b| b == b'\n')
378            .filter(|r| !r.is_empty() && *r != b"\r")
379            .collect();
380
381        if rows.is_empty() {
382            return Ok((vec![], vec![], vec![], 0));
383        }
384
385        let delim = self.config.delimiter;
386        let (header_names, data_rows) = if self.config.has_header {
387            let names: Vec<String> = split_fields(rows[0], delim)
388                .into_iter()
389                .map(|s| s.trim().to_string())
390                .collect();
391            (names, &rows[1..])
392        } else {
393            let ncols = split_fields(rows[0], delim).len();
394            let names = (0..ncols).map(|i| format!("col_{}", i)).collect();
395            (names, &rows[..])
396        };
397
398        let ncols = header_names.len();
399        let mut mins: Vec<f64> = vec![f64::INFINITY; ncols];
400        let mut maxs: Vec<f64> = vec![f64::NEG_INFINITY; ncols];
401        let mut row_count = 0usize;
402
403        let data_rows = if let Some(max) = self.config.max_rows {
404            &data_rows[..data_rows.len().min(max)]
405        } else {
406            data_rows
407        };
408
409        for &row_bytes in data_rows {
410            let fields = split_fields(row_bytes, delim);
411            for col_idx in 0..ncols {
412                let s = if col_idx < fields.len() {
413                    fields[col_idx].trim()
414                } else {
415                    ""
416                };
417                if let Ok(v) = s.parse::<f64>() {
418                    if v < mins[col_idx] { mins[col_idx] = v; }
419                    if v > maxs[col_idx] { maxs[col_idx] = v; }
420                }
421            }
422            row_count += 1;
423        }
424
425        Ok((header_names, mins, maxs, row_count))
426    }
427}