Skip to main content

scirs2_datasets/streaming_csv/
mod.rs

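//! Chunked CSV streaming with single-pass statistics.
//!
//! This module provides a low-memory CSV reader that delivers data in
//! configurable chunks rather than loading the entire file at once, plus
//! helpers built on top of it: [`stream_csv_as_f64`] extracts numeric columns
//! and [`streaming_statistics`] computes summary statistics for one column in a
//! single pass.
//!
//! Records are read line by line, so a quoted field cannot contain a line
//! break.
//!
//! ## Example
//!
//! ```no_run
//! use scirs2_datasets::streaming_csv::{CsvStreamConfig, CsvStreamReader, streaming_statistics};
//!
//! let config = CsvStreamConfig::default();
//! let mut reader = CsvStreamReader::open("data.csv", config.clone()).expect("open");
//! while let Some(chunk) = reader.next_chunk().expect("read chunk") {
//!     println!("chunk {}: {} rows", chunk.chunk_id, chunk.rows.len());
//!     if chunk.is_last { break; }
//! }
//!
//! let stats = streaming_statistics("data.csv", &config, 0).expect("stats");
//! println!("column 0: mean = {}, variance = {}", stats.mean, stats.variance);
//! ```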
18
19use crate::error::{DatasetsError, Result};
20use std::fs::File;
21use std::io::{BufRead, BufReader, Seek, SeekFrom};
22
23// ─────────────────────────────────────────────────────────────────────────────
24// Configuration
25// ─────────────────────────────────────────────────────────────────────────────
26
/// Settings that control how [`CsvStreamReader`] splits a CSV file into chunks.
///
/// The [`Default`] configuration reads 1000 rows per chunk, expects a header
/// row, splits fields on commas and requests a single worker.
#[derive(Debug, Clone)]
pub struct CsvStreamConfig {
    /// How many data rows each chunk holds; the header line is not included.
    pub chunk_size: usize,
    /// Whether the file begins with a header row.
    pub has_header: bool,
    /// Byte that separates fields (`b','` by default).
    pub delimiter: u8,
    /// Requested number of worker threads. Reserved for future use: the
    /// current implementation ignores it.
    pub n_workers: usize,
}

impl Default for CsvStreamConfig {
    /// Comma-delimited input with a header row, 1000 rows per chunk, one worker.
    fn default() -> Self {
        CsvStreamConfig {
            delimiter: b',',
            has_header: true,
            chunk_size: 1000,
            n_workers: 1,
        }
    }
}
50
51// ─────────────────────────────────────────────────────────────────────────────
52// Output types
53// ─────────────────────────────────────────────────────────────────────────────
54
/// One batch of rows produced by [`CsvStreamReader::next_chunk`].
///
/// Derives `PartialEq`/`Eq` so chunks can be compared directly (e.g. in tests
/// or when de-duplicating work).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CsvChunk {
    /// Column headers, repeated in every chunk for convenience (empty when the
    /// reader was configured with `has_header = false`).
    pub headers: Vec<String>,
    /// Data rows in file order; each row holds one `String` per field, with
    /// enclosing double quotes removed.
    pub rows: Vec<Vec<String>>,
    /// Zero-based sequence number of this chunk.
    pub chunk_id: usize,
    /// `true` if no more data follows this chunk.
    pub is_last: bool,
}
67
/// Summary statistics for one CSV column, computed in a single streaming pass.
///
/// Derives `PartialEq` so results can be compared directly (field-wise `f64`
/// equality; a `NaN` field never compares equal).
#[derive(Debug, Clone, PartialEq)]
pub struct CsvStreamStats {
    /// Arithmetic mean (Welford online algorithm).
    pub mean: f64,
    /// Sample variance (divisor `n - 1`, Welford online algorithm); `0.0` when
    /// only a single value contributed.
    pub variance: f64,
    /// Smallest value seen.
    pub min: f64,
    /// Largest value seen.
    pub max: f64,
    /// Number of values that contributed to the statistics.
    pub n_rows: usize,
}
82
83// ─────────────────────────────────────────────────────────────────────────────
84// CsvStreamReader
85// ─────────────────────────────────────────────────────────────────────────────
86
87/// A streaming CSV reader that yields data in fixed-size chunks.
88pub struct CsvStreamReader {
89    /// Path to the CSV file.
90    pub path: String,
91    /// Configuration used by this reader.
92    pub config: CsvStreamConfig,
93    /// Byte offset of the next chunk start (after the header).
94    pub position: u64,
95    /// Identifier of the next chunk to be returned.
96    pub chunk_id: usize,
97    /// Column headers (populated on open when `has_header` is `true`).
98    headers: Vec<String>,
99    /// Byte offset immediately after the header line.
100    data_start: u64,
101    /// Number of columns (derived from header or first row).
102    n_columns: Option<usize>,
103    /// Set to `true` once we reach EOF.
104    exhausted: bool,
105}
106
107impl CsvStreamReader {
108    /// Open a CSV file for streaming.
109    pub fn open(path: &str, config: CsvStreamConfig) -> Result<Self> {
110        let file = File::open(path).map_err(DatasetsError::IoError)?;
111        let mut reader = BufReader::new(file);
112
113        let mut headers = Vec::new();
114        let mut data_start = 0u64;
115
116        if config.has_header {
117            let mut header_line = String::new();
118            let bytes_read = reader
119                .read_line(&mut header_line)
120                .map_err(DatasetsError::IoError)?;
121            if bytes_read == 0 {
122                return Err(DatasetsError::InvalidFormat(
123                    "CSV file is empty — cannot read header".into(),
124                ));
125            }
126            let line = header_line.trim_end_matches(['\n', '\r']);
127            headers = split_csv_line(line, config.delimiter);
128            data_start = bytes_read as u64;
129        }
130
131        let n_columns = if headers.is_empty() {
132            None
133        } else {
134            Some(headers.len())
135        };
136
137        Ok(Self {
138            path: path.to_owned(),
139            config,
140            position: data_start,
141            chunk_id: 0,
142            headers,
143            data_start,
144            n_columns,
145            exhausted: false,
146        })
147    }
148
149    /// Number of columns (known only after the header has been read or the
150    /// first row has been parsed).
151    pub fn n_columns(&self) -> Option<usize> {
152        self.n_columns
153    }
154
155    /// Read the next chunk of rows.
156    ///
157    /// Returns `Ok(None)` when the file is exhausted.
158    pub fn next_chunk(&mut self) -> Result<Option<CsvChunk>> {
159        if self.exhausted {
160            return Ok(None);
161        }
162
163        let file = File::open(&self.path).map_err(DatasetsError::IoError)?;
164        let mut reader = BufReader::new(file);
165        reader
166            .seek(SeekFrom::Start(self.position))
167            .map_err(DatasetsError::IoError)?;
168
169        let mut rows: Vec<Vec<String>> = Vec::with_capacity(self.config.chunk_size);
170        let mut bytes_consumed = 0u64;
171
172        for _ in 0..self.config.chunk_size {
173            let mut line = String::new();
174            let n = reader
175                .read_line(&mut line)
176                .map_err(DatasetsError::IoError)?;
177            if n == 0 {
178                self.exhausted = true;
179                break;
180            }
181            bytes_consumed += n as u64;
182            let trimmed = line.trim_end_matches(['\n', '\r']);
183            if trimmed.is_empty() {
184                // Skip blank lines.
185                continue;
186            }
187            let fields = split_csv_line(trimmed, self.config.delimiter);
188
189            // Infer column count from the first row we see.
190            if self.n_columns.is_none() {
191                self.n_columns = Some(fields.len());
192            }
193            rows.push(fields);
194        }
195
196        if rows.is_empty() {
197            return Ok(None);
198        }
199
200        self.position += bytes_consumed;
201        let chunk_id = self.chunk_id;
202        self.chunk_id += 1;
203
204        // We need to peek ahead to know whether this is the last chunk.
205        let is_last = self.exhausted || {
206            let mut peek_file = File::open(&self.path).map_err(DatasetsError::IoError)?;
207            let mut peek_reader = BufReader::new(&mut peek_file);
208            peek_reader
209                .seek(SeekFrom::Start(self.position))
210                .map_err(DatasetsError::IoError)?;
211            let mut tmp = String::new();
212            let peek_n = peek_reader
213                .read_line(&mut tmp)
214                .map_err(DatasetsError::IoError)?;
215            peek_n == 0
216        };
217        self.exhausted = is_last;
218
219        Ok(Some(CsvChunk {
220            headers: self.headers.clone(),
221            rows,
222            chunk_id,
223            is_last,
224        }))
225    }
226
227    /// Reset the reader back to the beginning of the data (after the header).
228    pub fn reset(&mut self) -> Result<()> {
229        self.position = self.data_start;
230        self.chunk_id = 0;
231        self.exhausted = false;
232        Ok(())
233    }
234}
235
236// ─────────────────────────────────────────────────────────────────────────────
237// Helpers
238// ─────────────────────────────────────────────────────────────────────────────
239
/// Split one CSV record into its fields.
///
/// A double quote toggles "quoted" mode, in which the delimiter is treated as
/// ordinary text; inside quotes a doubled quote (`""`) produces one literal
/// `"`. Quote characters themselves are never copied into the output. The
/// delimiter byte is compared as a `char`, so it should be ASCII. An empty
/// line yields a single empty field.
fn split_csv_line(line: &str, delimiter: u8) -> Vec<String> {
    let separator = char::from(delimiter);
    let mut out: Vec<String> = Vec::new();
    let mut field = String::new();
    let mut quoted = false;
    let mut iter = line.chars().peekable();

    while let Some(c) = iter.next() {
        if c == '"' {
            // `""` inside a quoted field is an escaped quote character.
            if quoted && iter.next_if_eq(&'"').is_some() {
                field.push('"');
            } else {
                quoted = !quoted;
            }
        } else if c == separator && !quoted {
            out.push(std::mem::take(&mut field));
        } else {
            field.push(c);
        }
    }

    out.push(field);
    out
}
271
272// ─────────────────────────────────────────────────────────────────────────────
273// Convenience functions
274// ─────────────────────────────────────────────────────────────────────────────
275
276/// Stream a CSV file and extract the specified columns as `f64`.
277///
278/// Rows where any requested column cannot be parsed as `f64` are silently
279/// skipped.  Column indices are 0-based.
280pub fn stream_csv_as_f64(
281    path: &str,
282    config: &CsvStreamConfig,
283    column_indices: &[usize],
284) -> Result<Vec<Vec<f64>>> {
285    let mut reader = CsvStreamReader::open(path, config.clone())?;
286    let mut result: Vec<Vec<f64>> = Vec::new();
287
288    loop {
289        match reader.next_chunk()? {
290            None => break,
291            Some(chunk) => {
292                for row in &chunk.rows {
293                    let mut vals = Vec::with_capacity(column_indices.len());
294                    let mut ok = true;
295                    for &col_idx in column_indices {
296                        match row.get(col_idx).and_then(|s| s.trim().parse::<f64>().ok()) {
297                            Some(v) => vals.push(v),
298                            None => {
299                                ok = false;
300                                break;
301                            }
302                        }
303                    }
304                    if ok {
305                        result.push(vals);
306                    }
307                }
308                if chunk.is_last {
309                    break;
310                }
311            }
312        }
313    }
314
315    Ok(result)
316}
317
318/// Compute streaming statistics for a single column using the Welford algorithm.
319///
320/// Rows where the specified column cannot be parsed as `f64` are skipped.
321pub fn streaming_statistics(
322    path: &str,
323    config: &CsvStreamConfig,
324    column: usize,
325) -> Result<CsvStreamStats> {
326    let mut reader = CsvStreamReader::open(path, config.clone())?;
327
328    let mut n: usize = 0;
329    let mut mean = 0.0f64;
330    let mut m2 = 0.0f64;
331    let mut min_val = f64::INFINITY;
332    let mut max_val = f64::NEG_INFINITY;
333
334    loop {
335        match reader.next_chunk()? {
336            None => break,
337            Some(chunk) => {
338                for row in &chunk.rows {
339                    if let Some(val_str) = row.get(column) {
340                        if let Ok(x) = val_str.trim().parse::<f64>() {
341                            n += 1;
342                            let delta = x - mean;
343                            mean += delta / n as f64;
344                            let delta2 = x - mean;
345                            m2 += delta * delta2;
346                            if x < min_val {
347                                min_val = x;
348                            }
349                            if x > max_val {
350                                max_val = x;
351                            }
352                        }
353                    }
354                }
355                if chunk.is_last {
356                    break;
357                }
358            }
359        }
360    }
361
362    if n == 0 {
363        return Err(DatasetsError::InvalidFormat(
364            "No parseable values found in the specified column".into(),
365        ));
366    }
367
368    let variance = if n > 1 { m2 / (n - 1) as f64 } else { 0.0 };
369
370    Ok(CsvStreamStats {
371        mean,
372        variance,
373        min: min_val,
374        max: max_val,
375        n_rows: n,
376    })
377}
378
379// ─────────────────────────────────────────────────────────────────────────────
380// Tests
381// ─────────────────────────────────────────────────────────────────────────────
382
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Write `content` to a new file in the system temp directory and return
    /// its path.
    ///
    /// The file name combines the process id, a per-process counter and a
    /// timestamp. The timestamp alone can repeat when tests run in parallel on
    /// platforms with a coarse clock, which would let tests overwrite and
    /// delete each other's files.
    fn make_temp_csv(content: &str) -> String {
        static COUNTER: AtomicUsize = AtomicUsize::new(0);
        let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        let path = std::env::temp_dir().join(format!(
            "scirs2_csv_test_{}_{}_{}.csv",
            std::process::id(),
            seq,
            nanos
        ));
        let mut f = File::create(&path).expect("create");
        f.write_all(content.as_bytes()).expect("write");
        path.to_string_lossy().into_owned()
    }

    #[test]
    fn test_open_and_read_header() {
        let csv = "a,b,c\n1,2,3\n4,5,6\n";
        let path = make_temp_csv(csv);
        let config = CsvStreamConfig {
            chunk_size: 10,
            ..Default::default()
        };
        let reader = CsvStreamReader::open(&path, config).expect("open");
        assert_eq!(reader.headers, vec!["a", "b", "c"]);
        assert_eq!(reader.n_columns(), Some(3));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_next_chunk_returns_rows() {
        let csv = "x,y\n1.0,2.0\n3.0,4.0\n5.0,6.0\n";
        let path = make_temp_csv(csv);
        let config = CsvStreamConfig {
            chunk_size: 10,
            ..Default::default()
        };
        let mut reader = CsvStreamReader::open(&path, config).expect("open");
        let chunk = reader.next_chunk().expect("read").expect("some");
        assert_eq!(chunk.rows.len(), 3);
        assert_eq!(chunk.chunk_id, 0);
        assert!(chunk.is_last);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_chunked_reading() {
        // 10 rows, chunk_size = 4 → chunks of 4, 4, 2
        let mut csv = "value\n".to_owned();
        for i in 0..10u32 {
            csv.push_str(&format!("{i}\n"));
        }
        let path = make_temp_csv(&csv);
        let config = CsvStreamConfig {
            chunk_size: 4,
            ..Default::default()
        };
        let mut reader = CsvStreamReader::open(&path, config).expect("open");

        let mut total_rows = 0;
        let mut n_chunks = 0;
        loop {
            match reader.next_chunk().expect("read") {
                None => break,
                Some(chunk) => {
                    total_rows += chunk.rows.len();
                    n_chunks += 1;
                    if chunk.is_last {
                        break;
                    }
                }
            }
        }
        assert_eq!(total_rows, 10);
        assert_eq!(n_chunks, 3);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_reset() {
        let csv = "val\n1\n2\n3\n";
        let path = make_temp_csv(csv);
        let config = CsvStreamConfig {
            chunk_size: 10,
            ..Default::default()
        };
        let mut reader = CsvStreamReader::open(&path, config).expect("open");
        let _first = reader.next_chunk().expect("read").expect("some");
        reader.reset().expect("reset");
        let second = reader.next_chunk().expect("read").expect("some");
        assert_eq!(second.rows.len(), 3);
        assert_eq!(second.chunk_id, 0);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_stream_csv_as_f64() {
        let csv = "a,b,c\n1.0,2.0,3.0\n4.0,5.0,6.0\n";
        let path = make_temp_csv(csv);
        let config = CsvStreamConfig::default();
        let data = stream_csv_as_f64(&path, &config, &[0, 2]).expect("ok");
        assert_eq!(data.len(), 2);
        assert!((data[0][0] - 1.0).abs() < 1e-10);
        assert!((data[0][1] - 3.0).abs() < 1e-10);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_stream_csv_as_f64_skips_unparseable_rows() {
        let csv = "a,b\n1,x\n2,3\n";
        let path = make_temp_csv(csv);
        let config = CsvStreamConfig::default();
        let data = stream_csv_as_f64(&path, &config, &[0, 1]).expect("ok");
        assert_eq!(data, vec![vec![2.0, 3.0]]);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_streaming_statistics_mean() {
        // 5 values: 1..=5, mean = 3.0
        let csv = "value\n1\n2\n3\n4\n5\n";
        let path = make_temp_csv(csv);
        let config = CsvStreamConfig::default();
        let stats = streaming_statistics(&path, &config, 0).expect("stats");
        assert!((stats.mean - 3.0).abs() < 1e-10, "mean={}", stats.mean);
        assert_eq!(stats.n_rows, 5);
        assert!((stats.min - 1.0).abs() < 1e-10);
        assert!((stats.max - 5.0).abs() < 1e-10);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_streaming_statistics_variance() {
        // Sample variance of 1..=5 is 10 / 4 = 2.5.
        let csv = "value\n1\n2\n3\n4\n5\n";
        let path = make_temp_csv(csv);
        let config = CsvStreamConfig {
            chunk_size: 2,
            ..Default::default()
        };
        let stats = streaming_statistics(&path, &config, 0).expect("stats");
        assert!((stats.variance - 2.5).abs() < 1e-10, "var={}", stats.variance);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_split_csv_line_basic() {
        let fields = split_csv_line("a,b,c", b',');
        assert_eq!(fields, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_split_csv_line_quoted() {
        let fields = split_csv_line("\"hello, world\",42", b',');
        assert_eq!(fields, vec!["hello, world", "42"]);
    }

    #[test]
    fn test_split_csv_line_escaped_quote() {
        let fields = split_csv_line("\"say \"\"hi\"\"\",val", b',');
        assert_eq!(fields, vec!["say \"hi\"", "val"]);
    }

    #[test]
    fn test_split_csv_line_empty_and_trailing_delimiter() {
        assert_eq!(split_csv_line("", b','), vec![""]);
        assert_eq!(split_csv_line("a,b,", b','), vec!["a", "b", ""]);
    }

    #[test]
    fn test_split_csv_line_custom_delimiter() {
        let fields = split_csv_line("a;\"b;c\";d", b';');
        assert_eq!(fields, vec!["a", "b;c", "d"]);
    }
}