// Source: cjc_data/csv.rs

//! CSV ingestion: `CsvConfig`, `CsvReader`, and `StreamingCsvProcessor`.
//!
//! Provides zero-copy CSV parsing into `DataFrame` and streaming aggregation
//! (sum, min/max) without materializing the full frame.
5
6use crate::{Column, DataFrame, DataError};
7
// ── CsvConfig ───────────────────────────────────────────────────────────────
9
/// Options that control how CSV input is split into rows and fields.
///
/// All fields are public, so a value can be built with a struct literal or by
/// starting from `CsvConfig::default()` and overriding individual fields.
#[derive(Debug, Clone)]
pub struct CsvConfig {
    /// Byte that separates fields within a row. Default: `b','`.
    pub delimiter: u8,
    /// When `true`, the first non-empty row supplies the column names instead
    /// of data. Default: `true`.
    pub has_header: bool,
    /// Upper bound on the number of data rows consumed (the header row is not
    /// counted). `None` means no limit. Default: `None`.
    pub max_rows: Option<usize>,
    /// When `true`, leading and trailing whitespace is removed from field
    /// values. Default: `true`.
    pub trim_whitespace: bool,
}
22
23impl Default for CsvConfig {
24    fn default() -> Self {
25        CsvConfig {
26            delimiter: b',',
27            has_header: true,
28            max_rows: None,
29            trim_whitespace: true,
30        }
31    }
32}
33
// ── CsvReader ───────────────────────────────────────────────────────────────
35
36/// Zero-copy CSV parser for byte slices.
37///
38/// # Design
39///
40/// `CsvReader` operates directly on a `&[u8]` byte slice — no file I/O, no
41/// intermediate `String` allocation per field during the scan phase. Fields
42/// are referenced as sub-slices of the original input and parsed in a single
43/// pass.
44///
45/// # Type inference
46///
47/// Each column's type is inferred from the first data row:
48/// - All digits (optionally signed, one optional `.`) → `Float`
49/// - All digits (optionally signed, no `.`) → `Int` (but stored as `Float` for
50///   numeric safety — explicit `Int` columns can be forced via `CsvConfig`)
51/// - `"true"` / `"false"` / `"1"` / `"0"` → `Bool`
52/// - Anything else → `Str`
53///
54/// # Example
55///
56/// ```rust,ignore
57/// let csv = b"name,age,score\nAlice,30,9.5\nBob,25,8.1";
58/// let df = CsvReader::new(CsvConfig::default()).parse(csv)?;
59/// assert_eq!(df.nrows(), 2);
60/// ```
61pub struct CsvReader {
62    config: CsvConfig,
63}
64
/// Column type inferred from a field of the first data row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InferredType {
    Int,
    Float,
    Bool,
    Str,
}

/// Infer the type of a single field string.
///
/// Surrounding whitespace is ignored. The rules are tried in this order:
///
/// 1. `"true"`, `"false"`, `"1"`, `"0"` → [`InferredType::Bool`]
/// 2. optional leading `-` followed only by ASCII digits, **and** the value
///    fits in an `i64` → [`InferredType::Int`]
/// 3. anything `str::parse::<f64>` accepts (decimals, a leading `+`,
///    scientific notation, `inf`, `nan`) → [`InferredType::Float`]
/// 4. otherwise → [`InferredType::Str`]
///
/// Digit strings too large for `i64` are classified as `Float` rather than
/// `Int`, so they are not later collapsed to `0` by a failed integer parse.
fn infer_type(s: &str) -> InferredType {
    let t = s.trim();

    if matches!(t, "true" | "false" | "1" | "0") {
        return InferredType::Bool;
    }

    // Integer shape: optional '-' then at least one digit, nothing else.
    // The explicit shape check keeps "+5" out of `Int` (it falls through to
    // `Float` below), and the parse guards against i64 overflow.
    let digits = t.strip_prefix('-').unwrap_or(t);
    let integer_shaped = !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit());
    if integer_shaped && t.parse::<i64>().is_ok() {
        return InferredType::Int;
    }

    // `f64::from_str` already covers plain decimals ("1.5", ".5", "5."),
    // scientific notation, and out-of-range integers, so no separate
    // hand-written decimal check is needed.
    if t.parse::<f64>().is_ok() {
        return InferredType::Float;
    }

    InferredType::Str
}
100
/// Break one row into its fields at every occurrence of `delimiter`.
///
/// The returned strings borrow from `row`. A single trailing `\r` is removed
/// from the final field only (to cope with `\r\n` line endings); earlier
/// fields are returned untouched. A field that is not valid UTF-8 is returned
/// as the empty string. An empty `row` yields one empty field.
fn split_fields<'a>(row: &'a [u8], delimiter: u8) -> Vec<&'a str> {
    let mut pieces = row.split(|&byte| byte == delimiter).peekable();
    let mut out = Vec::new();

    while let Some(piece) = pieces.next() {
        // Only the last piece of the row may carry the line-ending `\r`.
        let is_last = pieces.peek().is_none();
        let bytes = if is_last {
            piece.strip_suffix(b"\r").unwrap_or(piece)
        } else {
            piece
        };
        out.push(std::str::from_utf8(bytes).unwrap_or(""));
    }

    out
}
120
121impl CsvReader {
122    /// Create a new `CsvReader` with the given configuration.
123    pub fn new(config: CsvConfig) -> Self {
124        CsvReader { config }
125    }
126
127    /// Parse a CSV byte slice into a `DataFrame`.
128    ///
129    /// # Errors
130    /// Returns `DataError::InvalidOperation` if:
131    /// - The input is empty.
132    /// - A data row has fewer fields than the header.
133    pub fn parse(&self, input: &[u8]) -> Result<DataFrame, DataError> {
134        if input.is_empty() {
135            return Ok(DataFrame::new());
136        }
137
138        // Split on newlines, skipping empty trailing lines.
139        let rows: Vec<&[u8]> = input
140            .split(|&b| b == b'\n')
141            .filter(|r| !r.is_empty() && *r != b"\r")
142            .collect();
143
144        if rows.is_empty() {
145            return Ok(DataFrame::new());
146        }
147
148        let delim = self.config.delimiter;
149
150        // Parse header or generate column names.
151        let (header_names, data_rows) = if self.config.has_header {
152            let names: Vec<String> = split_fields(rows[0], delim)
153                .into_iter()
154                .map(|s| {
155                    if self.config.trim_whitespace {
156                        s.trim().to_string()
157                    } else {
158                        s.to_string()
159                    }
160                })
161                .collect();
162            (names, &rows[1..])
163        } else {
164            // Generate column names: col_0, col_1, ...
165            let ncols = split_fields(rows[0], delim).len();
166            let names: Vec<String> = (0..ncols).map(|i| format!("col_{}", i)).collect();
167            (names, &rows[..])
168        };
169
170        let ncols = header_names.len();
171        if ncols == 0 {
172            return Ok(DataFrame::new());
173        }
174
175        // Limit rows if configured.
176        let data_rows = if let Some(max) = self.config.max_rows {
177            &data_rows[..data_rows.len().min(max)]
178        } else {
179            data_rows
180        };
181
182        if data_rows.is_empty() {
183            // No data rows — return header-only DataFrame with empty columns.
184            let columns: Vec<(String, Column)> = header_names
185                .into_iter()
186                .map(|name| (name, Column::Str(Vec::new())))
187                .collect();
188            return DataFrame::from_columns(columns);
189        }
190
191        // Type-infer from first data row.
192        let first_fields = split_fields(data_rows[0], delim);
193        let mut col_types: Vec<InferredType> = first_fields
194            .iter()
195            .map(|s| {
196                let s = if self.config.trim_whitespace { s.trim() } else { *s };
197                infer_type(s)
198            })
199            .collect();
200
201        // Pad col_types if first row is shorter than header.
202        while col_types.len() < ncols {
203            col_types.push(InferredType::Str);
204        }
205
206        // Allocate column buffers.
207        let nrows = data_rows.len();
208        let mut int_bufs:   Vec<Option<Vec<i64>>>    = vec![None; ncols];
209        let mut float_bufs: Vec<Option<Vec<f64>>>    = vec![None; ncols];
210        let mut bool_bufs:  Vec<Option<Vec<bool>>>   = vec![None; ncols];
211        let mut str_bufs:   Vec<Option<Vec<String>>> = vec![None; ncols];
212
213        for (i, &t) in col_types.iter().enumerate() {
214            match t {
215                InferredType::Int   => int_bufs[i]   = Some(Vec::with_capacity(nrows)),
216                InferredType::Float => float_bufs[i] = Some(Vec::with_capacity(nrows)),
217                InferredType::Bool  => bool_bufs[i]  = Some(Vec::with_capacity(nrows)),
218                InferredType::Str   => str_bufs[i]   = Some(Vec::with_capacity(nrows)),
219            }
220        }
221
222        // Parse each data row.
223        for (row_idx, &row_bytes) in data_rows.iter().enumerate() {
224            let fields = split_fields(row_bytes, delim);
225            for col_idx in 0..ncols {
226                let raw = if col_idx < fields.len() {
227                    fields[col_idx]
228                } else {
229                    // Missing field: treat as empty string.
230                    ""
231                };
232                let s = if self.config.trim_whitespace { raw.trim() } else { raw };
233
234                match col_types[col_idx] {
235                    InferredType::Int => {
236                        let v = s.parse::<i64>().unwrap_or(0);
237                        int_bufs[col_idx].as_mut().unwrap().push(v);
238                    }
239                    InferredType::Float => {
240                        let v = s.parse::<f64>().unwrap_or(0.0);
241                        float_bufs[col_idx].as_mut().unwrap().push(v);
242                    }
243                    InferredType::Bool => {
244                        let v = matches!(s, "true" | "1");
245                        bool_bufs[col_idx].as_mut().unwrap().push(v);
246                    }
247                    InferredType::Str => {
248                        str_bufs[col_idx].as_mut().unwrap().push(s.to_string());
249                    }
250                }
251
252                let _ = row_idx; // suppress unused warning
253            }
254        }
255
256        // Assemble columns.
257        let mut columns: Vec<(String, Column)> = Vec::with_capacity(ncols);
258        for (i, name) in header_names.into_iter().enumerate() {
259            let col = match col_types[i] {
260                InferredType::Int   => Column::Int(int_bufs[i].take().unwrap()),
261                InferredType::Float => Column::Float(float_bufs[i].take().unwrap()),
262                InferredType::Bool  => Column::Bool(bool_bufs[i].take().unwrap()),
263                InferredType::Str   => Column::Str(str_bufs[i].take().unwrap()),
264            };
265            columns.push((name, col));
266        }
267
268        DataFrame::from_columns(columns)
269    }
270}
271
// ── StreamingCsvProcessor ───────────────────────────────────────────────────
273
274/// A streaming CSV processor that visits rows one at a time without
275/// materializing the full DataFrame.
276///
277/// Useful for large datasets where only aggregate statistics are needed.
278/// Memory usage is O(ncols) regardless of the number of rows.
279///
280/// # Example
281///
282/// ```rust,ignore
283/// let csv = b"x,y\n1.0,2.0\n3.0,4.0\n5.0,6.0";
284/// let mut proc = StreamingCsvProcessor::new(CsvConfig::default());
285/// let (headers, sums, count) = proc.sum_columns(csv)?;
286/// ```
287pub struct StreamingCsvProcessor {
288    config: CsvConfig,
289}
290
291impl StreamingCsvProcessor {
292    /// Create a new streaming CSV processor with the given configuration.
293    pub fn new(config: CsvConfig) -> Self {
294        StreamingCsvProcessor { config }
295    }
296
297    /// Stream through CSV, accumulating per-column sums using Kahan summation.
298    ///
299    /// Returns `(column_names, sums_per_col, row_count)`.
300    /// Non-numeric fields contribute `0.0` to the sum.
301    pub fn sum_columns(&self, input: &[u8]) -> Result<(Vec<String>, Vec<f64>, usize), DataError> {
302        if input.is_empty() {
303            return Ok((vec![], vec![], 0));
304        }
305
306        let rows: Vec<&[u8]> = input
307            .split(|&b| b == b'\n')
308            .filter(|r| !r.is_empty() && *r != b"\r")
309            .collect();
310
311        if rows.is_empty() {
312            return Ok((vec![], vec![], 0));
313        }
314
315        let delim = self.config.delimiter;
316        let (header_names, data_rows) = if self.config.has_header {
317            let names: Vec<String> = split_fields(rows[0], delim)
318                .into_iter()
319                .map(|s| s.trim().to_string())
320                .collect();
321            (names, &rows[1..])
322        } else {
323            let ncols = split_fields(rows[0], delim).len();
324            let names: Vec<String> = (0..ncols).map(|i| format!("col_{}", i)).collect();
325            (names, &rows[..])
326        };
327
328        let ncols = header_names.len();
329        // Kahan compensated sums per column.
330        let mut sums: Vec<f64> = vec![0.0; ncols];
331        let mut comp: Vec<f64> = vec![0.0; ncols];
332        let mut row_count = 0usize;
333
334        let data_rows = if let Some(max) = self.config.max_rows {
335            &data_rows[..data_rows.len().min(max)]
336        } else {
337            data_rows
338        };
339
340        for &row_bytes in data_rows {
341            let fields = split_fields(row_bytes, delim);
342            for col_idx in 0..ncols {
343                let s = if col_idx < fields.len() {
344                    if self.config.trim_whitespace {
345                        fields[col_idx].trim()
346                    } else {
347                        fields[col_idx]
348                    }
349                } else {
350                    ""
351                };
352                let v: f64 = s.parse().unwrap_or(0.0);
353                // Kahan step
354                let y = v - comp[col_idx];
355                let t = sums[col_idx] + y;
356                comp[col_idx] = (t - sums[col_idx]) - y;
357                sums[col_idx] = t;
358            }
359            row_count += 1;
360        }
361
362        Ok((header_names, sums, row_count))
363    }
364
365    /// Stream through CSV, collecting per-column min/max without materializing.
366    ///
367    /// Returns `(column_names, mins, maxs, row_count)`.
368    /// Non-numeric fields contribute `f64::NAN` to min/max.
369    pub fn minmax_columns(
370        &self,
371        input: &[u8],
372    ) -> Result<(Vec<String>, Vec<f64>, Vec<f64>, usize), DataError> {
373        if input.is_empty() {
374            return Ok((vec![], vec![], vec![], 0));
375        }
376
377        let rows: Vec<&[u8]> = input
378            .split(|&b| b == b'\n')
379            .filter(|r| !r.is_empty() && *r != b"\r")
380            .collect();
381
382        if rows.is_empty() {
383            return Ok((vec![], vec![], vec![], 0));
384        }
385
386        let delim = self.config.delimiter;
387        let (header_names, data_rows) = if self.config.has_header {
388            let names: Vec<String> = split_fields(rows[0], delim)
389                .into_iter()
390                .map(|s| s.trim().to_string())
391                .collect();
392            (names, &rows[1..])
393        } else {
394            let ncols = split_fields(rows[0], delim).len();
395            let names = (0..ncols).map(|i| format!("col_{}", i)).collect();
396            (names, &rows[..])
397        };
398
399        let ncols = header_names.len();
400        let mut mins: Vec<f64> = vec![f64::INFINITY; ncols];
401        let mut maxs: Vec<f64> = vec![f64::NEG_INFINITY; ncols];
402        let mut row_count = 0usize;
403
404        let data_rows = if let Some(max) = self.config.max_rows {
405            &data_rows[..data_rows.len().min(max)]
406        } else {
407            data_rows
408        };
409
410        for &row_bytes in data_rows {
411            let fields = split_fields(row_bytes, delim);
412            for col_idx in 0..ncols {
413                let s = if col_idx < fields.len() {
414                    fields[col_idx].trim()
415                } else {
416                    ""
417                };
418                if let Ok(v) = s.parse::<f64>() {
419                    if v < mins[col_idx] { mins[col_idx] = v; }
420                    if v > maxs[col_idx] { maxs[col_idx] = v; }
421                }
422            }
423            row_count += 1;
424        }
425
426        Ok((header_names, mins, maxs, row_count))
427    }
428}