scirs2_datasets/streaming_csv/
mod.rs1use crate::error::{DatasetsError, Result};
20use std::fs::File;
21use std::io::{BufRead, BufReader, Seek, SeekFrom};
22
/// Settings that control how a CSV file is read in chunks.
///
/// `PartialEq`/`Eq` are derived so configurations can be compared directly,
/// e.g. in tests or when deciding whether a reader must be reopened.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CsvStreamConfig {
    /// Upper bound on the number of rows returned in a single chunk.
    pub chunk_size: usize,
    /// When `true`, the first line of the file is parsed as the header row
    /// instead of being returned as data.
    pub has_header: bool,
    /// Byte that separates fields on a line (for example `b','`).
    pub delimiter: u8,
    /// Requested worker count.
    ///
    /// NOTE(review): nothing in this module reads this field; presumably it is
    /// reserved for parallel readers elsewhere — confirm.
    pub n_workers: usize,
}
39
40impl Default for CsvStreamConfig {
41 fn default() -> Self {
42 Self {
43 chunk_size: 1000,
44 has_header: true,
45 delimiter: b',',
46 n_workers: 1,
47 }
48 }
49}
50
/// One batch of rows produced by the streaming reader.
///
/// `PartialEq`/`Eq` are derived so whole chunks can be compared in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CsvChunk {
    /// Column names that accompany the rows; empty when no header row was parsed.
    pub headers: Vec<String>,
    /// The data rows of this chunk, each one already split into string fields.
    pub rows: Vec<Vec<String>>,
    /// Zero-based sequence number of this chunk within the current pass over the file.
    pub chunk_id: usize,
    /// `true` when the reader determined that no further chunk follows this one.
    pub is_last: bool,
}
67
/// Summary statistics of a single numeric CSV column.
///
/// `PartialEq` is derived so results can be compared as a whole (`Eq` is not
/// available because the fields are floating point).
#[derive(Debug, Clone, PartialEq)]
pub struct CsvStreamStats {
    /// Arithmetic mean of the values that were included.
    pub mean: f64,
    /// Sample variance (divisor `n - 1`); `0.0` when only one value was included.
    pub variance: f64,
    /// Smallest included value.
    pub min: f64,
    /// Largest included value.
    pub max: f64,
    /// Number of values that contributed to the statistics. Cells that could
    /// not be used are not counted, so this may be smaller than the number of
    /// rows in the file.
    pub n_rows: usize,
}
82
83pub struct CsvStreamReader {
89 pub path: String,
91 pub config: CsvStreamConfig,
93 pub position: u64,
95 pub chunk_id: usize,
97 headers: Vec<String>,
99 data_start: u64,
101 n_columns: Option<usize>,
103 exhausted: bool,
105}
106
107impl CsvStreamReader {
108 pub fn open(path: &str, config: CsvStreamConfig) -> Result<Self> {
110 let file = File::open(path).map_err(DatasetsError::IoError)?;
111 let mut reader = BufReader::new(file);
112
113 let mut headers = Vec::new();
114 let mut data_start = 0u64;
115
116 if config.has_header {
117 let mut header_line = String::new();
118 let bytes_read = reader
119 .read_line(&mut header_line)
120 .map_err(DatasetsError::IoError)?;
121 if bytes_read == 0 {
122 return Err(DatasetsError::InvalidFormat(
123 "CSV file is empty — cannot read header".into(),
124 ));
125 }
126 let line = header_line.trim_end_matches(['\n', '\r']);
127 headers = split_csv_line(line, config.delimiter);
128 data_start = bytes_read as u64;
129 }
130
131 let n_columns = if headers.is_empty() {
132 None
133 } else {
134 Some(headers.len())
135 };
136
137 Ok(Self {
138 path: path.to_owned(),
139 config,
140 position: data_start,
141 chunk_id: 0,
142 headers,
143 data_start,
144 n_columns,
145 exhausted: false,
146 })
147 }
148
149 pub fn n_columns(&self) -> Option<usize> {
152 self.n_columns
153 }
154
155 pub fn next_chunk(&mut self) -> Result<Option<CsvChunk>> {
159 if self.exhausted {
160 return Ok(None);
161 }
162
163 let file = File::open(&self.path).map_err(DatasetsError::IoError)?;
164 let mut reader = BufReader::new(file);
165 reader
166 .seek(SeekFrom::Start(self.position))
167 .map_err(DatasetsError::IoError)?;
168
169 let mut rows: Vec<Vec<String>> = Vec::with_capacity(self.config.chunk_size);
170 let mut bytes_consumed = 0u64;
171
172 for _ in 0..self.config.chunk_size {
173 let mut line = String::new();
174 let n = reader
175 .read_line(&mut line)
176 .map_err(DatasetsError::IoError)?;
177 if n == 0 {
178 self.exhausted = true;
179 break;
180 }
181 bytes_consumed += n as u64;
182 let trimmed = line.trim_end_matches(['\n', '\r']);
183 if trimmed.is_empty() {
184 continue;
186 }
187 let fields = split_csv_line(trimmed, self.config.delimiter);
188
189 if self.n_columns.is_none() {
191 self.n_columns = Some(fields.len());
192 }
193 rows.push(fields);
194 }
195
196 if rows.is_empty() {
197 return Ok(None);
198 }
199
200 self.position += bytes_consumed;
201 let chunk_id = self.chunk_id;
202 self.chunk_id += 1;
203
204 let is_last = self.exhausted || {
206 let mut peek_file = File::open(&self.path).map_err(DatasetsError::IoError)?;
207 let mut peek_reader = BufReader::new(&mut peek_file);
208 peek_reader
209 .seek(SeekFrom::Start(self.position))
210 .map_err(DatasetsError::IoError)?;
211 let mut tmp = String::new();
212 let peek_n = peek_reader
213 .read_line(&mut tmp)
214 .map_err(DatasetsError::IoError)?;
215 peek_n == 0
216 };
217 self.exhausted = is_last;
218
219 Ok(Some(CsvChunk {
220 headers: self.headers.clone(),
221 rows,
222 chunk_id,
223 is_last,
224 }))
225 }
226
227 pub fn reset(&mut self) -> Result<()> {
229 self.position = self.data_start;
230 self.chunk_id = 0;
231 self.exhausted = false;
232 Ok(())
233 }
234}
235
/// Splits one CSV line (without its line terminator) into fields.
///
/// Quoting rules implemented here:
/// * a `"` toggles quoted mode and is not part of the field value;
/// * inside quotes, `""` produces a single literal `"`;
/// * the delimiter only separates fields outside of quotes.
///
/// At least one field is always returned; an empty line yields a single empty
/// field and a trailing delimiter yields a trailing empty field.
fn split_csv_line(line: &str, delimiter: u8) -> Vec<String> {
    let delim = char::from(delimiter);
    let mut fields = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false;

    let mut chars = line.chars().peekable();
    while let Some(ch) = chars.next() {
        match ch {
            '"' => {
                if in_quotes && chars.peek() == Some(&'"') {
                    // Escaped quote: consume the second `"` and keep one.
                    chars.next();
                    current.push('"');
                } else {
                    in_quotes = !in_quotes;
                }
            }
            c if c == delim && !in_quotes => {
                // Move the finished field out instead of cloning and clearing it.
                fields.push(std::mem::take(&mut current));
            }
            other => current.push(other),
        }
    }
    fields.push(current);
    fields
}
271
272pub fn stream_csv_as_f64(
281 path: &str,
282 config: &CsvStreamConfig,
283 column_indices: &[usize],
284) -> Result<Vec<Vec<f64>>> {
285 let mut reader = CsvStreamReader::open(path, config.clone())?;
286 let mut result: Vec<Vec<f64>> = Vec::new();
287
288 loop {
289 match reader.next_chunk()? {
290 None => break,
291 Some(chunk) => {
292 for row in &chunk.rows {
293 let mut vals = Vec::with_capacity(column_indices.len());
294 let mut ok = true;
295 for &col_idx in column_indices {
296 match row.get(col_idx).and_then(|s| s.trim().parse::<f64>().ok()) {
297 Some(v) => vals.push(v),
298 None => {
299 ok = false;
300 break;
301 }
302 }
303 }
304 if ok {
305 result.push(vals);
306 }
307 }
308 if chunk.is_last {
309 break;
310 }
311 }
312 }
313 }
314
315 Ok(result)
316}
317
318pub fn streaming_statistics(
322 path: &str,
323 config: &CsvStreamConfig,
324 column: usize,
325) -> Result<CsvStreamStats> {
326 let mut reader = CsvStreamReader::open(path, config.clone())?;
327
328 let mut n: usize = 0;
329 let mut mean = 0.0f64;
330 let mut m2 = 0.0f64;
331 let mut min_val = f64::INFINITY;
332 let mut max_val = f64::NEG_INFINITY;
333
334 loop {
335 match reader.next_chunk()? {
336 None => break,
337 Some(chunk) => {
338 for row in &chunk.rows {
339 if let Some(val_str) = row.get(column) {
340 if let Ok(x) = val_str.trim().parse::<f64>() {
341 n += 1;
342 let delta = x - mean;
343 mean += delta / n as f64;
344 let delta2 = x - mean;
345 m2 += delta * delta2;
346 if x < min_val {
347 min_val = x;
348 }
349 if x > max_val {
350 max_val = x;
351 }
352 }
353 }
354 }
355 if chunk.is_last {
356 break;
357 }
358 }
359 }
360 }
361
362 if n == 0 {
363 return Err(DatasetsError::InvalidFormat(
364 "No parseable values found in the specified column".into(),
365 ));
366 }
367
368 let variance = if n > 1 { m2 / (n - 1) as f64 } else { 0.0 };
369
370 Ok(CsvStreamStats {
371 mean,
372 variance,
373 min: min_val,
374 max: max_val,
375 n_rows: n,
376 })
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Writes `content` to a fresh file in the system temp directory and
    /// returns its path.
    ///
    /// The file name combines the process id, a timestamp and a per-process
    /// counter. A timestamp alone is not unique: tests run in parallel and, on
    /// a coarse clock, two of them could be handed the same path.
    fn make_temp_csv(content: &str) -> String {
        static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
        // Relaxed is sufficient: only the uniqueness of the value matters.
        let unique = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();

        let path = std::env::temp_dir().join(format!(
            "scirs2_csv_test_{}_{}_{}.csv",
            std::process::id(),
            nanos,
            unique
        ));
        let mut f = File::create(&path).expect("create");
        f.write_all(content.as_bytes()).expect("write");
        path.to_string_lossy().into_owned()
    }

    #[test]
    fn test_open_and_read_header() {
        let csv = "a,b,c\n1,2,3\n4,5,6\n";
        let path = make_temp_csv(csv);
        let config = CsvStreamConfig {
            chunk_size: 10,
            ..Default::default()
        };
        let reader = CsvStreamReader::open(&path, config).expect("open");
        assert_eq!(reader.headers, vec!["a", "b", "c"]);
        assert_eq!(reader.n_columns(), Some(3));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_next_chunk_returns_rows() {
        let csv = "x,y\n1.0,2.0\n3.0,4.0\n5.0,6.0\n";
        let path = make_temp_csv(csv);
        let config = CsvStreamConfig {
            chunk_size: 10,
            ..Default::default()
        };
        let mut reader = CsvStreamReader::open(&path, config).expect("open");
        let chunk = reader.next_chunk().expect("read").expect("some");
        assert_eq!(chunk.rows.len(), 3);
        assert_eq!(chunk.chunk_id, 0);
        assert!(chunk.is_last);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_chunked_reading() {
        // Ten data rows read four at a time: chunks of 4, 4 and 2.
        let mut csv = "value\n".to_owned();
        for i in 0..10u32 {
            csv.push_str(&format!("{i}\n"));
        }
        let path = make_temp_csv(&csv);
        let config = CsvStreamConfig {
            chunk_size: 4,
            ..Default::default()
        };
        let mut reader = CsvStreamReader::open(&path, config).expect("open");

        let mut total_rows = 0;
        let mut n_chunks = 0;
        loop {
            match reader.next_chunk().expect("read") {
                None => break,
                Some(chunk) => {
                    total_rows += chunk.rows.len();
                    n_chunks += 1;
                    if chunk.is_last {
                        break;
                    }
                }
            }
        }
        assert_eq!(total_rows, 10);
        assert_eq!(n_chunks, 3);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_reset() {
        let csv = "val\n1\n2\n3\n";
        let path = make_temp_csv(csv);
        let config = CsvStreamConfig {
            chunk_size: 10,
            ..Default::default()
        };
        let mut reader = CsvStreamReader::open(&path, config).expect("open");
        let _first = reader.next_chunk().expect("read").expect("some");
        reader.reset().expect("reset");
        let second = reader.next_chunk().expect("read").expect("some");
        assert_eq!(second.rows.len(), 3);
        assert_eq!(second.chunk_id, 0);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_stream_csv_as_f64() {
        let csv = "a,b,c\n1.0,2.0,3.0\n4.0,5.0,6.0\n";
        let path = make_temp_csv(csv);
        let config = CsvStreamConfig::default();
        let data = stream_csv_as_f64(&path, &config, &[0, 2]).expect("ok");
        assert_eq!(data.len(), 2);
        assert!((data[0][0] - 1.0).abs() < 1e-10);
        assert!((data[0][1] - 3.0).abs() < 1e-10);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_streaming_statistics_mean() {
        // Values 1..=5: mean 3, min 1, max 5.
        let csv = "value\n1\n2\n3\n4\n5\n";
        let path = make_temp_csv(csv);
        let config = CsvStreamConfig::default();
        let stats = streaming_statistics(&path, &config, 0).expect("stats");
        assert!((stats.mean - 3.0).abs() < 1e-10, "mean={}", stats.mean);
        assert_eq!(stats.n_rows, 5);
        assert!((stats.min - 1.0).abs() < 1e-10);
        assert!((stats.max - 5.0).abs() < 1e-10);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_split_csv_line_basic() {
        let fields = split_csv_line("a,b,c", b',');
        assert_eq!(fields, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_split_csv_line_quoted() {
        let fields = split_csv_line("\"hello, world\",42", b',');
        assert_eq!(fields, vec!["hello, world", "42"]);
    }

    #[test]
    fn test_split_csv_line_escaped_quote() {
        let fields = split_csv_line("\"say \"\"hi\"\"\",val", b',');
        assert_eq!(fields, vec!["say \"hi\"", "val"]);
    }
}