use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use scirs2_core::ndarray::{Array1, Array2};
use scirs2_core::numeric::Complex64;
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::Path;

use crate::error::{IoError, Result};
#[derive(Debug, Clone)]
pub struct CsvReaderConfig {
    /// Field delimiter character.
    pub delimiter: char,
    /// Character used to quote fields.
    pub quote_char: char,
    /// Trim whitespace from parsed fields.
    pub trim: bool,
    /// Treat the first (non-skipped) row as a header row.
    pub has_header: bool,
    /// Lines starting with this character are skipped.
    pub comment_char: Option<char>,
    /// Number of rows to skip before reading headers and data.
    pub skip_rows: usize,
    /// Maximum number of data rows to read (`None` = no limit).
    pub max_rows: Option<usize>,
}

impl Default for CsvReaderConfig {
    fn default() -> Self {
        Self {
            delimiter: ',',
            quote_char: '"',
            trim: false,
            has_header: true,
            comment_char: None,
            skip_rows: 0,
            max_rows: None,
        }
    }
}

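/// Reads a CSV file into a header row and a 2-D array of string fields.
///
/// Minimal sketch of typical usage; the path `data.csv` is a hypothetical
/// example:
///
/// ```ignore
/// let config = CsvReaderConfig {
///     delimiter: ';',
///     ..Default::default()
/// };
/// let (headers, data) = read_csv("data.csv", Some(config))?;
/// println!("{} columns, {} rows", headers.len(), data.nrows());
/// ```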
#[allow(dead_code)]
pub fn read_csv<P: AsRef<Path>>(
    path: P,
    config: Option<CsvReaderConfig>,
) -> Result<(Vec<String>, Array2<String>)> {
    let config = config.unwrap_or_default();

    let file = File::open(path).map_err(|e| IoError::FileError(e.to_string()))?;
    let reader = BufReader::new(file);

    let mut lines = reader.lines();
    let mut rows = Vec::new();

    for _ in 0..config.skip_rows {
        if lines.next().is_none() {
            return Err(IoError::FormatError("Not enough rows in file".to_string()));
        }
    }

    let headers = if config.has_header {
        match lines.next() {
            Some(Ok(line)) => parse_csv_line(&line, &config),
            Some(Err(e)) => return Err(IoError::FileError(e.to_string())),
            None => return Err(IoError::FormatError("Empty file".to_string())),
        }
    } else {
        Vec::new()
    };

    let mut row_count = 0;
    for line_result in lines {
        if let Some(max) = config.max_rows {
            if row_count >= max {
                break;
            }
        }

        let line = line_result.map_err(|e| IoError::FileError(e.to_string()))?;

        if let Some(comment_char) = config.comment_char {
            if line.trim().starts_with(comment_char) {
                continue;
            }
        }

        if line.trim().is_empty() {
            continue;
        }

        let row = parse_csv_line(&line, &config);
        rows.push(row);
        row_count += 1;
    }

    if rows.is_empty() {
        return Err(IoError::FormatError("No data rows in file".to_string()));
    }

    let num_cols = rows[0].len();

    for (i, row) in rows.iter().enumerate() {
        if row.len() != num_cols {
            return Err(IoError::FormatError(format!(
                "Inconsistent number of columns: row {row_num} has {actual_cols} columns, expected {expected_cols}",
                row_num = i + 1,
                actual_cols = row.len(),
                expected_cols = num_cols
            )));
        }
    }

    let num_rows = rows.len();
    let mut data = Array2::from_elem((num_rows, num_cols), String::new());

    for (i, row) in rows.iter().enumerate() {
        for (j, value) in row.iter().enumerate() {
            data[[i, j]] = value.clone();
        }
    }

    Ok((headers, data))
}

#[allow(dead_code)]
fn parse_csv_line(line: &str, config: &CsvReaderConfig) -> Vec<String> {
    let mut fields = Vec::new();
    let mut field = String::new();
    let mut in_quotes = false;
    let mut chars = line.chars().peekable();

    while let Some(c) = chars.next() {
        if c == config.quote_char {
            if in_quotes && chars.peek() == Some(&config.quote_char) {
                // An escaped (doubled) quote inside a quoted field.
                chars.next();
                field.push(config.quote_char);
            } else {
                in_quotes = !in_quotes;
            }
        } else if c == config.delimiter && !in_quotes {
            let processed_field = if config.trim {
                field.trim().to_string()
            } else {
                field
            };
            fields.push(processed_field);
            field = String::new();
        } else {
            field.push(c);
        }
    }

    let processed_field = if config.trim {
        field.trim().to_string()
    } else {
        field
    };
    fields.push(processed_field);

    fields
}

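/// Reads a CSV file and parses every cell as `f64`.
///
/// Minimal sketch, assuming a purely numeric file at the hypothetical path
/// `measurements.csv`:
///
/// ```ignore
/// let (headers, values) = read_csv_numeric("measurements.csv", None)?;
/// let column_sums: Vec<f64> = (0..values.ncols())
///     .map(|j| values.column(j).sum())
///     .collect();
/// ```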
#[allow(dead_code)]
pub fn read_csv_numeric<P: AsRef<Path>>(
    path: P,
    config: Option<CsvReaderConfig>,
) -> Result<(Vec<String>, Array2<f64>)> {
    let (headers, string_data) = read_csv(path, config)?;

    let shape = string_data.shape();
    let mut numeric_data = Array2::<f64>::zeros((shape[0], shape[1]));

    for i in 0..shape[0] {
        for j in 0..shape[1] {
            let value = string_data[[i, j]].parse::<f64>().map_err(|_| {
                IoError::FormatError(format!(
                    "Could not convert value '{value}' at position [{row}, {col}] to number",
                    value = string_data[[i, j]],
                    row = i,
                    col = j
                ))
            })?;
            numeric_data[[i, j]] = value;
        }
    }

    Ok((headers, numeric_data))
}

#[derive(Debug, Clone)]
pub struct CsvWriterConfig {
    /// Field delimiter character.
    pub delimiter: char,
    /// Character used to quote fields.
    pub quote_char: char,
    /// Quote every field, even when quoting is not required.
    pub always_quote: bool,
    /// Quote fields containing the delimiter, the quote character, or line breaks.
    pub quote_special: bool,
    /// Write the header row (when headers are provided).
    pub write_header: bool,
    /// Line ending used to terminate rows.
    pub line_ending: LineEnding,
}

impl Default for CsvWriterConfig {
    fn default() -> Self {
        Self {
            delimiter: ',',
            quote_char: '"',
            always_quote: false,
            quote_special: true,
            write_header: true,
            line_ending: LineEnding::default(),
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineEnding {
    #[default]
    LF,
    CRLF,
}

impl LineEnding {
    fn as_str(&self) -> &'static str {
        match self {
            LineEnding::LF => "\n",
            LineEnding::CRLF => "\r\n",
        }
    }
}

#[derive(Debug, Clone)]
pub struct MissingValueOptions {
    pub values: Vec<String>,
    pub fill_value: Option<f64>,
}

impl Default for MissingValueOptions {
    fn default() -> Self {
        Self {
            values: vec![
                "NA".to_string(),
                "N/A".to_string(),
                "NaN".to_string(),
                "null".to_string(),
                "".to_string(),
            ],
            fill_value: None,
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnType {
    String,
    Integer,
    Float,
    Boolean,
    Date,
    Time,
    DateTime,
    Complex,
}

#[derive(Debug, Clone)]
pub struct ColumnSpec {
    pub index: usize,
    pub name: Option<String>,
    pub dtype: ColumnType,
    pub missing_values: Option<MissingValueOptions>,
}

#[derive(Debug, Clone)]
pub enum DataValue {
    String(String),
    Integer(i64),
    Float(f64),
    Boolean(bool),
    Date(NaiveDate),
    Time(NaiveTime),
    DateTime(NaiveDateTime),
    Complex(Complex64),
    Missing,
}

impl std::fmt::Display for DataValue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DataValue::String(s) => write!(f, "{s}"),
            DataValue::Integer(i) => write!(f, "{i}"),
            DataValue::Float(v) => write!(f, "{v}"),
            DataValue::Boolean(b) => write!(f, "{b}"),
            DataValue::Date(d) => write!(f, "{}", d.format("%Y-%m-%d")),
            DataValue::Time(t) => write!(f, "{}", t.format("%H:%M:%S%.f")),
            DataValue::DateTime(dt) => write!(f, "{}", dt.format("%Y-%m-%dT%H:%M:%S%.f")),
            DataValue::Complex(c) => {
                if c.im >= 0.0 {
                    write!(f, "{}+{}i", c.re, c.im)
                } else {
                    write!(f, "{}{}i", c.re, c.im)
                }
            }
            DataValue::Missing => write!(f, "NA"),
        }
    }
}

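/// Infers a `ColumnType` for every column of a string matrix by trying
/// progressively more specific parsers on each non-empty cell.
///
/// Minimal sketch; the literal values are made up for illustration. Note that
/// a column containing only `1`/`0` is reported as `Boolean`, since the
/// boolean check runs before the integer check:
///
/// ```ignore
/// let data = Array2::from_shape_vec(
///     (2, 2),
///     vec![
///         "1".to_string(),
///         "2.5".to_string(),
///         "0".to_string(),
///         "3.0".to_string(),
///     ],
/// )
/// .unwrap();
/// let types = detect_column_types(&data);
/// assert_eq!(types, vec![ColumnType::Boolean, ColumnType::Float]);
/// ```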
#[allow(dead_code)]
pub fn detect_column_types(data: &Array2<String>) -> Vec<ColumnType> {
    let (rows, cols) = (data.shape()[0], data.shape()[1]);

    if rows == 0 {
        return vec![ColumnType::String; cols];
    }

    let mut col_types = vec![ColumnType::String; cols];

    for col in 0..cols {
        let mut is_int = true;
        let mut is_float = true;
        let mut is_bool = true;
        let mut is_date = true;
        let mut is_time = true;
        let mut is_datetime = true;
        let mut is_complex = true;
        let mut non_empty_rows = 0;

        for row in 0..rows {
            let val = data[[row, col]].trim();

            if val.is_empty() {
                continue;
            }

            non_empty_rows += 1;

            let lower_val = val.to_lowercase();
            let is_valid_bool =
                ["true", "false", "yes", "no", "1", "0"].contains(&lower_val.as_str());
            if !is_valid_bool {
                is_bool = false;
            }

            if is_int && val.parse::<i64>().is_err() {
                is_int = false;
            }

            if is_float && val.parse::<f64>().is_err() {
                is_float = false;
            }

            if is_date && NaiveDate::parse_from_str(val, "%Y-%m-%d").is_err() {
                is_date = false;
            }

            if is_time
                && NaiveTime::parse_from_str(val, "%H:%M:%S").is_err()
                && NaiveTime::parse_from_str(val, "%H:%M:%S%.f").is_err()
            {
                is_time = false;
            }

            if is_datetime
                && NaiveDateTime::parse_from_str(val, "%Y-%m-%dT%H:%M:%S").is_err()
                && NaiveDateTime::parse_from_str(val, "%Y-%m-%d %H:%M:%S").is_err()
                && NaiveDateTime::parse_from_str(val, "%Y-%m-%dT%H:%M:%S%.f").is_err()
                && NaiveDateTime::parse_from_str(val, "%Y-%m-%d %H:%M:%S%.f").is_err()
            {
                is_datetime = false;
            }

            if is_complex {
                is_complex = parse_complex(val).is_some();
            }
        }

        if non_empty_rows < 2 {
            is_date = false;
            is_time = false;
            is_datetime = false;
            is_complex = false;
        }

        if is_bool {
            col_types[col] = ColumnType::Boolean;
        } else if is_int {
            col_types[col] = ColumnType::Integer;
        } else if is_float {
            col_types[col] = ColumnType::Float;
        } else if is_date {
            col_types[col] = ColumnType::Date;
        } else if is_time {
            col_types[col] = ColumnType::Time;
        } else if is_datetime {
            col_types[col] = ColumnType::DateTime;
        } else if is_complex {
            col_types[col] = ColumnType::Complex;
        }
    }

    col_types
}

/// Parses a complex number from either `a+bi` / `a-bi` / `bi` notation or the
/// parenthesised `(re, im)` form.
#[allow(dead_code)]
fn parse_complex(s: &str) -> Option<Complex64> {
    if s.contains('i') {
        let s = s.trim().replace(" ", "");

        let s = if s.ends_with('i') {
            &s[0..s.len() - 1]
        } else {
            return None;
        };

        let mut split_pos = None;
        let mut in_first_number = true;

        for (i, c) in s.chars().enumerate() {
            if i == 0 {
                // A leading sign belongs to the real (or only) part.
                continue;
            }

            let prev = s.as_bytes()[i - 1];
            let follows_exponent = prev == b'e' || prev == b'E';

            // A sign that is not part of an exponent separates the real and
            // imaginary parts.
            if (c == '+' || c == '-') && !follows_exponent {
                split_pos = Some((i, c));
                break;
            }

            if !c.is_ascii_digit()
                && c != '.'
                && c != 'e'
                && c != 'E'
                && !((c == '+' || c == '-') && follows_exponent)
            {
                in_first_number = false;
            }
        }

        if let Some((pos, sign)) = split_pos {
            let real_part = s[0..pos].parse::<f64>().ok()?;
            let imag_part = if sign == '+' {
                s[pos + 1..].parse::<f64>().ok()?
            } else {
                -s[pos + 1..].parse::<f64>().ok()?
            };

            Some(Complex64::new(real_part, imag_part))
        } else if in_first_number {
            // No separator found: the whole string is the imaginary part.
            Some(Complex64::new(0.0, s.parse::<f64>().ok()?))
        } else {
            None
        }
    } else if s.starts_with('(') && s.ends_with(')') && s.contains(',') {
        let contents = &s[1..s.len() - 1];
        let parts: Vec<&str> = contents.split(',').collect();

        if parts.len() == 2 {
            let real = parts[0].trim().parse::<f64>().ok()?;
            let imag = parts[1].trim().parse::<f64>().ok()?;
            Some(Complex64::new(real, imag))
        } else {
            None
        }
    } else {
        None
    }
}

#[allow(dead_code)]
fn convert_value(
    value: &str,
    col_type: ColumnType,
    missing_values: &MissingValueOptions,
) -> Result<DataValue> {
    let trimmed = value.trim();

    if missing_values
        .values
        .iter()
        .any(|mv| mv.eq_ignore_ascii_case(trimmed))
    {
        if let (Some(fill), ColumnType::Float) = (missing_values.fill_value, col_type) {
            return Ok(DataValue::Float(fill));
        }
        return Ok(DataValue::Missing);
    }

    if trimmed.is_empty() {
        return Ok(DataValue::Missing);
    }

    match col_type {
        ColumnType::String => Ok(DataValue::String(trimmed.to_string())),
        ColumnType::Integer => match trimmed.parse::<i64>() {
            Ok(val) => Ok(DataValue::Integer(val)),
            Err(_) => Err(IoError::FormatError(format!(
                "Cannot convert '{value}' to integer"
            ))),
        },
        ColumnType::Float => match trimmed.parse::<f64>() {
            Ok(val) => Ok(DataValue::Float(val)),
            Err(_) => Err(IoError::FormatError(format!(
                "Cannot convert '{value}' to float"
            ))),
        },
        ColumnType::Boolean => {
            let lower = trimmed.to_lowercase();
            match lower.as_str() {
                "true" | "yes" | "1" => Ok(DataValue::Boolean(true)),
                "false" | "no" | "0" => Ok(DataValue::Boolean(false)),
                _ => Err(IoError::FormatError(format!(
                    "Cannot convert '{value}' to boolean"
                ))),
            }
        }
        ColumnType::Date => match NaiveDate::parse_from_str(trimmed, "%Y-%m-%d") {
            Ok(date) => Ok(DataValue::Date(date)),
            Err(_) => Err(IoError::FormatError(format!(
                "Cannot convert '{value}' to date (expected YYYY-MM-DD)"
            ))),
        },
        ColumnType::Time => {
            let result = NaiveTime::parse_from_str(trimmed, "%H:%M:%S")
                .or_else(|_| NaiveTime::parse_from_str(trimmed, "%H:%M:%S%.f"));

            match result {
                Ok(time) => Ok(DataValue::Time(time)),
                Err(_) => Err(IoError::FormatError(format!(
                    "Cannot convert '{value}' to time (expected HH:MM:SS[.f])"
                ))),
            }
        }
        ColumnType::DateTime => {
            let result = NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S")
                .or_else(|_| NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%d %H:%M:%S"))
                .or_else(|_| NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S%.f"))
                .or_else(|_| NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%d %H:%M:%S%.f"));

            match result {
                Ok(dt) => Ok(DataValue::DateTime(dt)),
                Err(_) => Err(IoError::FormatError(format!(
                    "Cannot convert '{value}' to datetime (expected YYYY-MM-DD[T ]HH:MM:SS[.f])"
                ))),
            }
        }
        ColumnType::Complex => match parse_complex(trimmed) {
            Some(complex) => Ok(DataValue::Complex(complex)),
            None => Err(IoError::FormatError(format!(
                "Cannot convert '{value}' to complex number (expected a+bi or (a,b))"
            ))),
        },
    }
}

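/// Writes a 2-D array of displayable values to a CSV file.
///
/// Minimal sketch, assuming a hypothetical output path `out.csv` and a small
/// numeric matrix:
///
/// ```ignore
/// let data = Array2::from_shape_vec((2, 2), vec![1.0, 2.0, 3.0, 4.0]).unwrap();
/// let headers = vec!["a".to_string(), "b".to_string()];
/// write_csv("out.csv", &data, Some(&headers), None)?;
/// ```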
#[allow(dead_code)]
pub fn write_csv<P: AsRef<Path>, T: std::fmt::Display>(
    path: P,
    data: &Array2<T>,
    headers: Option<&Vec<String>>,
    config: Option<CsvWriterConfig>,
) -> Result<()> {
    let config = config.unwrap_or_default();

    let shape = data.shape();
    let (rows, cols) = (shape[0], shape[1]);

    if let Some(hdrs) = headers {
        if hdrs.len() != cols && config.write_header {
            return Err(IoError::FormatError(format!(
                "Header length ({}) does not match data width ({})",
                hdrs.len(),
                cols
            )));
        }
    }

    let mut file = File::create(path).map_err(|e| IoError::FileError(e.to_string()))?;

    if let Some(hdrs) = headers {
        if config.write_header {
            let header_line = format_csv_line(hdrs, &config);
            file.write_all(header_line.as_bytes())
                .map_err(|e| IoError::FileError(e.to_string()))?;
            file.write_all(config.line_ending.as_str().as_bytes())
                .map_err(|e| IoError::FileError(e.to_string()))?;
        }
    }

    for i in 0..rows {
        let row: Vec<String> = (0..cols).map(|j| data[[i, j]].to_string()).collect();

        let line = format_csv_line(&row, &config);
        file.write_all(line.as_bytes())
            .map_err(|e| IoError::FileError(e.to_string()))?;

        // Every row, including the last, is terminated with the configured
        // line ending.
        file.write_all(config.line_ending.as_str().as_bytes())
            .map_err(|e| IoError::FileError(e.to_string()))?;
    }

    Ok(())
}

#[allow(dead_code)]
fn format_csv_line(fields: &[String], config: &CsvWriterConfig) -> String {
    let mut result = String::new();

    for (i, field) in fields.iter().enumerate() {
        let need_quotes = config.always_quote
            || (config.quote_special
                && (field.contains(config.delimiter)
                    || field.contains(config.quote_char)
                    || field.contains('\n')
                    || field.contains('\r')));

        if need_quotes {
            result.push(config.quote_char);

            let escaped = field.replace(
                config.quote_char,
                &format!("{}{}", config.quote_char, config.quote_char),
            );
            result.push_str(&escaped);

            result.push(config.quote_char);
        } else {
            result.push_str(field);
        }

        if i < fields.len() - 1 {
            result.push(config.delimiter);
        }
    }

    result
}

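/// Reads a CSV file into typed `DataValue`s, either with explicitly supplied
/// column types or with types inferred by `detect_column_types`.
///
/// Minimal sketch; `records.csv` and its layout are assumptions:
///
/// ```ignore
/// let types = [ColumnType::String, ColumnType::Integer, ColumnType::Float];
/// let (headers, rows) = read_csv_typed("records.csv", None, Some(&types[..]), None)?;
/// if let DataValue::Integer(n) = &rows[0][1] {
///     println!("first count: {n}");
/// }
/// ```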
#[allow(dead_code)]
pub fn read_csv_typed<P: AsRef<Path>>(
    path: P,
    config: Option<CsvReaderConfig>,
    col_types: Option<&[ColumnType]>,
    missing_values: Option<MissingValueOptions>,
) -> Result<(Vec<String>, Vec<Vec<DataValue>>)> {
    let (headers, string_data) = read_csv(path, config)?;

    if string_data.shape()[0] == 0 || string_data.shape()[1] == 0 {
        return Ok((headers, Vec::new()));
    }

    let col_types_vec = match col_types {
        Some(types) => {
            if types.len() != string_data.shape()[1] {
                return Err(IoError::FormatError(format!(
                    "Number of column types ({}) does not match data width ({})",
                    types.len(),
                    string_data.shape()[1]
                )));
            }
            types.to_vec()
        }
        None => detect_column_types(&string_data),
    };

    let missing_opts = missing_values.unwrap_or_default();

    let mut typed_data = Vec::with_capacity(string_data.shape()[0]);

    for i in 0..string_data.shape()[0] {
        let mut row = Vec::with_capacity(string_data.shape()[1]);

        for j in 0..string_data.shape()[1] {
            let value = convert_value(&string_data[[i, j]], col_types_vec[j], &missing_opts)?;
            row.push(value);
        }

        typed_data.push(row);
    }

    Ok((headers, typed_data))
}

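/// Reads a CSV file in fixed-size chunks, invoking `callback` for each chunk.
/// Returning `false` from the callback stops reading early.
///
/// Minimal sketch that counts rows in a hypothetical `large.csv`:
///
/// ```ignore
/// let mut total_rows = 0;
/// read_csv_chunked("large.csv", None, 1000, |_headers, chunk| {
///     total_rows += chunk.nrows();
///     true // keep reading
/// })?;
/// println!("{total_rows} rows");
/// ```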
#[allow(dead_code)]
pub fn read_csv_chunked<P, F>(
    path: P,
    config: Option<CsvReaderConfig>,
    chunk_size: usize,
    mut callback: F,
) -> Result<()>
where
    P: AsRef<Path>,
    F: FnMut(&[String], &Array2<String>) -> bool,
{
    let config = config.unwrap_or_default();

    let file = File::open(path).map_err(|e| IoError::FileError(e.to_string()))?;
    let reader = BufReader::new(file);
    let mut lines = reader.lines();

    for _ in 0..config.skip_rows {
        if lines.next().is_none() {
            return Err(IoError::FormatError("Not enough rows in file".to_string()));
        }
    }

    let headers = if config.has_header {
        match lines.next() {
            Some(Ok(line)) => parse_csv_line(&line, &config),
            Some(Err(e)) => return Err(IoError::FileError(e.to_string())),
            None => return Err(IoError::FormatError("Empty file".to_string())),
        }
    } else {
        Vec::new()
    };

    let mut buffer = Vec::with_capacity(chunk_size);
    let mut num_cols = 0;

    for line_result in lines {
        let line = line_result.map_err(|e| IoError::FileError(e.to_string()))?;

        if let Some(comment_char) = config.comment_char {
            if line.trim().starts_with(comment_char) {
                continue;
            }
        }

        if line.trim().is_empty() {
            continue;
        }

        let row = parse_csv_line(&line, &config);

        if buffer.is_empty() {
            num_cols = row.len();
        } else if row.len() != num_cols {
            return Err(IoError::FormatError(format!(
                "Inconsistent number of columns: got {}, expected {}",
                row.len(),
                num_cols
            )));
        }

        buffer.push(row);

        if buffer.len() >= chunk_size
            && !process_chunk(&headers, &mut buffer, num_cols, &mut callback)?
        {
            // The callback asked to stop early.
            return Ok(());
        }
    }

    if !buffer.is_empty() {
        process_chunk(&headers, &mut buffer, num_cols, &mut callback)?;
    }

    Ok(())
}

#[allow(dead_code)]
fn process_chunk<F>(
    headers: &[String],
    buffer: &mut Vec<Vec<String>>,
    num_cols: usize,
    callback: &mut F,
) -> Result<bool>
where
    F: FnMut(&[String], &Array2<String>) -> bool,
{
    let num_rows = buffer.len();
    let mut data = Array2::<String>::from_elem((num_rows, num_cols), String::new());

    for (i, row) in buffer.iter().enumerate() {
        for (j, value) in row.iter().enumerate() {
            data[[i, j]] = value.clone();
        }
    }

    buffer.clear();

    Ok(callback(headers, &data))
}

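/// Writes rows of typed `DataValue`s to a CSV file, formatting each value with
/// its `Display` implementation.
///
/// Minimal sketch with a hypothetical output path `typed.csv`:
///
/// ```ignore
/// let rows = vec![
///     vec![DataValue::String("a".to_string()), DataValue::Integer(1)],
///     vec![DataValue::String("b".to_string()), DataValue::Missing],
/// ];
/// let headers = vec!["name".to_string(), "count".to_string()];
/// write_csv_typed("typed.csv", &rows, Some(&headers), None)?;
/// ```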
#[allow(dead_code)]
pub fn write_csv_typed<P: AsRef<Path>>(
    path: P,
    data: &[Vec<DataValue>],
    headers: Option<&Vec<String>>,
    config: Option<CsvWriterConfig>,
) -> Result<()> {
    let config = config.unwrap_or_default();

    if data.is_empty() {
        return Err(IoError::FormatError("No data provided".to_string()));
    }

    let num_cols = data[0].len();
    for (i, row) in data.iter().enumerate().skip(1) {
        if row.len() != num_cols {
            return Err(IoError::FormatError(format!(
                "Row {} has {} columns, expected {}",
                i,
                row.len(),
                num_cols
            )));
        }
    }

    if let Some(hdrs) = headers {
        if hdrs.len() != num_cols && config.write_header {
            return Err(IoError::FormatError(format!(
                "Header length ({}) does not match data width ({})",
                hdrs.len(),
                num_cols
            )));
        }
    }

    let file = File::create(path).map_err(|e| IoError::FileError(e.to_string()))?;
    let mut writer = BufWriter::new(file);

    if let Some(hdrs) = headers {
        if config.write_header {
            let header_line = format_csv_line(hdrs, &config);
            writer
                .write_all(header_line.as_bytes())
                .map_err(|e| IoError::FileError(e.to_string()))?;
            writer
                .write_all(config.line_ending.as_str().as_bytes())
                .map_err(|e| IoError::FileError(e.to_string()))?;
        }
    }

    for row in data.iter() {
        let string_row: Vec<String> = row.iter().map(|val| val.to_string()).collect();

        let line = format_csv_line(&string_row, &config);
        writer
            .write_all(line.as_bytes())
            .map_err(|e| IoError::FileError(e.to_string()))?;

        // Every row, including the last, is terminated with the configured
        // line ending.
        writer
            .write_all(config.line_ending.as_str().as_bytes())
            .map_err(|e| IoError::FileError(e.to_string()))?;
    }

    writer
        .flush()
        .map_err(|e| IoError::FileError(e.to_string()))?;

    Ok(())
}

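/// Writes a set of equal-length columns to a CSV file by assembling them into
/// a row-major matrix and delegating to `write_csv`.
///
/// Minimal sketch with hypothetical column data and output path:
///
/// ```ignore
/// let x = Array1::from(vec![1.0, 2.0, 3.0]);
/// let y = Array1::from(vec![4.0, 5.0, 6.0]);
/// let headers = vec!["x".to_string(), "y".to_string()];
/// write_csv_columns("columns.csv", &[x, y], Some(&headers), None)?;
/// ```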
#[allow(dead_code)]
pub fn write_csv_columns<P: AsRef<Path>, T: std::fmt::Display + Clone>(
    path: P,
    columns: &[Array1<T>],
    headers: Option<&Vec<String>>,
    config: Option<CsvWriterConfig>,
) -> Result<()> {
    if columns.is_empty() {
        return Err(IoError::FormatError("No columns provided".to_string()));
    }

    let num_rows = columns[0].len();
    for (i, col) in columns.iter().enumerate().skip(1) {
        if col.len() != num_rows {
            return Err(IoError::FormatError(format!(
                "Column {} has length {}, expected {}",
                i,
                col.len(),
                num_rows
            )));
        }
    }

    if let Some(hdrs) = headers {
        if hdrs.len() != columns.len() {
            return Err(IoError::FormatError(format!(
                "Header length ({}) does not match column count ({})",
                hdrs.len(),
                columns.len()
            )));
        }
    }

    let num_cols = columns.len();
    let mut data = Array2::<String>::from_elem((num_rows, num_cols), String::new());

    for (j, col) in columns.iter().enumerate() {
        for (i, val) in col.iter().enumerate() {
            data[[i, j]] = val.to_string();
        }
    }

    write_csv(path, &data, headers, config)
}

use scirs2_core::parallel_ops::*;
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone)]
pub struct StreamingCsvConfig {
    /// CSV parsing options shared with the non-streaming readers.
    pub csv_config: CsvReaderConfig,
    /// Maximum number of rows per chunk.
    pub chunk_size: usize,
    /// Requested number of worker threads for parallel processing.
    pub num_workers: usize,
    /// Capacity of the underlying read buffer, in bytes.
    pub buffer_size: usize,
    /// Approximate cap on buffered data per chunk, in bytes.
    pub memory_limit: usize,
}

impl Default for StreamingCsvConfig {
    fn default() -> Self {
        Self {
            csv_config: CsvReaderConfig::default(),
            chunk_size: 10000,
            num_workers: 0,
            buffer_size: 64 * 1024,
            memory_limit: 512 * 1024 * 1024,
        }
    }
}

pub struct StreamingCsvReader<R: BufRead> {
    reader: R,
    config: StreamingCsvConfig,
    headers: Option<Vec<String>>,
    current_line: usize,
    buffer: Vec<String>,
    finished: bool,
}

impl<R: BufRead> StreamingCsvReader<R> {
    pub fn new(reader: R, config: StreamingCsvConfig) -> Result<Self> {
        let mut reader = Self {
            reader,
            config,
            headers: None,
            current_line: 0,
            buffer: Vec::new(),
            finished: false,
        };

        if reader.config.csv_config.has_header {
            reader.read_headers()?;
        }

        Ok(reader)
    }

    fn read_headers(&mut self) -> Result<()> {
        for _ in 0..self.config.csv_config.skip_rows {
            let mut line = String::new();
            if self
                .reader
                .read_line(&mut line)
                .map_err(|e| IoError::FileError(e.to_string()))?
                == 0
            {
                return Err(IoError::FormatError("Not enough rows in file".to_string()));
            }
            self.current_line += 1;
        }

        let mut header_line = String::new();
        if self
            .reader
            .read_line(&mut header_line)
            .map_err(|e| IoError::FileError(e.to_string()))?
            == 0
        {
            return Err(IoError::FormatError("Empty file".to_string()));
        }

        self.headers = Some(parse_csv_line(header_line.trim(), &self.config.csv_config));
        self.current_line += 1;
        Ok(())
    }

    pub fn headers(&self) -> Option<&Vec<String>> {
        self.headers.as_ref()
    }

    pub fn read_chunk(&mut self) -> Result<Option<Array2<String>>> {
        if self.finished {
            return Ok(None);
        }

        self.buffer.clear();
        let mut rows_read = 0;
        let mut line = String::new();

        while rows_read < self.config.chunk_size {
            line.clear();
            let bytes_read = self
                .reader
                .read_line(&mut line)
                .map_err(|e| IoError::FileError(e.to_string()))?;

            if bytes_read == 0 {
                self.finished = true;
                break;
            }

            let trimmed_line = line.trim();

            if trimmed_line.is_empty() {
                continue;
            }

            if let Some(comment_char) = self.config.csv_config.comment_char {
                if trimmed_line.starts_with(comment_char) {
                    continue;
                }
            }

            self.buffer.push(trimmed_line.to_string());
            rows_read += 1;
            self.current_line += 1;

            if self.buffer.len() * line.len() > self.config.memory_limit {
                break;
            }
        }

        if self.buffer.is_empty() {
            return Ok(None);
        }

        let parsed_rows: Vec<Vec<String>> = self
            .buffer
            .iter()
            .map(|line| parse_csv_line(line, &self.config.csv_config))
            .collect();

        if parsed_rows.is_empty() {
            return Ok(None);
        }

        let num_cols = parsed_rows[0].len();
        for (i, row) in parsed_rows.iter().enumerate() {
            if row.len() != num_cols {
                return Err(IoError::FormatError(format!(
                    "Inconsistent columns at line {line}: got {actual}, expected {expected}",
                    line = self.current_line - self.buffer.len() + i,
                    actual = row.len(),
                    expected = num_cols
                )));
            }
        }

        let num_rows = parsed_rows.len();
        let mut data = Array2::from_elem((num_rows, num_cols), String::new());

        for (i, row) in parsed_rows.iter().enumerate() {
            for (j, value) in row.iter().enumerate() {
                data[[i, j]] = value.clone();
            }
        }

        Ok(Some(data))
    }

    pub fn current_line(&self) -> usize {
        self.current_line
    }

    pub fn is_finished(&self) -> bool {
        self.finished
    }
}

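/// Opens a file and wraps it in a `StreamingCsvReader` with the configured
/// read-buffer capacity.
///
/// Minimal sketch that iterates over chunks of a hypothetical `big.csv`:
///
/// ```ignore
/// let mut reader = streaming_reader_from_file("big.csv", StreamingCsvConfig::default())?;
/// while let Some(chunk) = reader.read_chunk()? {
///     println!("chunk with {} rows", chunk.nrows());
/// }
/// ```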
#[allow(dead_code)]
pub fn streaming_reader_from_file<P: AsRef<Path>>(
    path: P,
    config: StreamingCsvConfig,
) -> Result<StreamingCsvReader<BufReader<File>>> {
    let file = File::open(path).map_err(|e| IoError::FileError(e.to_string()))?;
    let reader = BufReader::with_capacity(config.buffer_size, file);
    StreamingCsvReader::new(reader, config)
}


#[derive(Debug, Clone)]
pub struct StreamingStats {
    pub rows_processed: usize,
    pub chunks_processed: usize,
    pub total_time_ms: f64,
    pub rows_per_second: f64,
    pub peak_memory_bytes: usize,
    pub error_count: usize,
}

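/// Streams a CSV file chunk by chunk, applying `processor` to each chunk and
/// collecting the results together with throughput statistics.
///
/// Minimal sketch that counts cells per chunk in a hypothetical `events.csv`:
///
/// ```ignore
/// let (cell_counts, stats) = process_csv_streaming(
///     "events.csv",
///     StreamingCsvConfig::default(),
///     |chunk, _headers| Ok(chunk.len()),
/// )?;
/// println!("{} rows in {:.1} ms", stats.rows_processed, stats.total_time_ms);
/// ```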
#[allow(dead_code)]
pub fn process_csv_streaming<P: AsRef<Path>, F, R>(
    path: P,
    config: StreamingCsvConfig,
    mut processor: F,
) -> Result<(Vec<R>, StreamingStats)>
where
    F: FnMut(&Array2<String>, Option<&Vec<String>>) -> Result<R> + Send + Sync,
    R: Send,
{
    let start_time = std::time::Instant::now();
    let mut reader = streaming_reader_from_file(path, config)?;

    let mut results = Vec::new();
    let mut stats = StreamingStats {
        rows_processed: 0,
        chunks_processed: 0,
        total_time_ms: 0.0,
        rows_per_second: 0.0,
        peak_memory_bytes: 0,
        error_count: 0,
    };

    let headers = reader.headers().cloned();

    while let Some(chunk) = reader.read_chunk()? {
        match processor(&chunk, headers.as_ref()) {
            Ok(result) => {
                results.push(result);
                stats.rows_processed += chunk.nrows();
                stats.chunks_processed += 1;
            }
            Err(_) => {
                stats.error_count += 1;
            }
        }

        let current_memory = chunk.len() * std::mem::size_of::<String>();
        if current_memory > stats.peak_memory_bytes {
            stats.peak_memory_bytes = current_memory;
        }
    }

    let elapsed = start_time.elapsed();
    stats.total_time_ms = elapsed.as_secs_f64() * 1000.0;
    stats.rows_per_second = stats.rows_processed as f64 / elapsed.as_secs_f64();

    Ok((results, stats))
}

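/// Like `process_csv_streaming`, but reads all chunks first and processes them
/// in parallel with a shared, immutable processor.
///
/// Minimal sketch; `events.csv` and the per-chunk computation are assumptions:
///
/// ```ignore
/// let (row_counts, stats) = process_csv_streaming_parallel(
///     "events.csv",
///     StreamingCsvConfig::default(),
///     |chunk, _headers| Ok(chunk.nrows()),
/// )?;
/// println!("processed {} chunks", stats.chunks_processed);
/// ```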
#[allow(dead_code)]
pub fn process_csv_streaming_parallel<P: AsRef<Path>, F, R>(
    path: P,
    config: StreamingCsvConfig,
    processor: F,
) -> Result<(Vec<R>, StreamingStats)>
where
    F: Fn(&Array2<String>, Option<&Vec<String>>) -> Result<R> + Send + Sync + Clone,
    R: Send + 'static,
{
    let start_time = std::time::Instant::now();
    let mut reader = streaming_reader_from_file(path, config)?;

    let headers = reader.headers().cloned();
    let mut chunks = Vec::new();

    while let Some(chunk) = reader.read_chunk()? {
        chunks.push(chunk);
    }

    if chunks.is_empty() {
        return Ok((
            Vec::new(),
            StreamingStats {
                rows_processed: 0,
                chunks_processed: 0,
                total_time_ms: 0.0,
                rows_per_second: 0.0,
                peak_memory_bytes: 0,
                error_count: 0,
            },
        ));
    }

    let error_count = Arc::new(Mutex::new(0));
    let peak_memory = Arc::new(Mutex::new(0));
    // Track rows across successfully processed chunks, mirroring the serial path.
    let rows_processed = Arc::new(Mutex::new(0usize));

    let results: Vec<R> = chunks
        .into_par_iter()
        .filter_map(|chunk| {
            let memory_usage = chunk.len() * std::mem::size_of::<String>();
            {
                let mut peak = peak_memory.lock().unwrap();
                if memory_usage > *peak {
                    *peak = memory_usage;
                }
            }

            match processor(&chunk, headers.as_ref()) {
                Ok(result) => {
                    *rows_processed.lock().unwrap() += chunk.nrows();
                    Some(result)
                }
                Err(_) => {
                    *error_count.lock().unwrap() += 1;
                    None
                }
            }
        })
        .collect();

    let elapsed = start_time.elapsed();
    let total_rows = *rows_processed.lock().unwrap();
    let stats = StreamingStats {
        rows_processed: total_rows,
        chunks_processed: results.len(),
        total_time_ms: elapsed.as_secs_f64() * 1000.0,
        rows_per_second: total_rows as f64 / elapsed.as_secs_f64(),
        peak_memory_bytes: *peak_memory.lock().unwrap(),
        error_count: *error_count.lock().unwrap(),
    };

    Ok((results, stats))
}

#[allow(dead_code)]
pub fn read_csv_numeric_streaming<P: AsRef<Path>>(
    path: P,
    config: StreamingCsvConfig,
) -> Result<(Option<Vec<String>>, Vec<Array2<f64>>, StreamingStats)> {
    // Capture the header row (if any) from the first processed chunk.
    let mut headers: Option<Vec<String>> = None;

    let (chunks, stats) = process_csv_streaming(path, config, |chunk, chunk_headers| {
        if headers.is_none() {
            headers = chunk_headers.cloned();
        }

        let shape = chunk.shape();
        let mut numeric_chunk = Array2::<f64>::zeros((shape[0], shape[1]));

        for i in 0..shape[0] {
            for j in 0..shape[1] {
                let value = chunk[[i, j]].parse::<f64>().map_err(|_| {
                    IoError::FormatError(format!(
                        "Could not convert '{value}' to number at [{row}, {col}]",
                        value = chunk[[i, j]],
                        row = i,
                        col = j
                    ))
                })?;
                numeric_chunk[[i, j]] = value;
            }
        }

        Ok(numeric_chunk)
    })?;

    Ok((headers, chunks, stats))
}

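/// Streams a CSV file and accumulates per-column summary statistics
/// (count, min, max, mean, standard deviation) over the numeric cells.
///
/// Minimal sketch for a hypothetical `sensor.csv`:
///
/// ```ignore
/// let stats = aggregate_csv_statistics("sensor.csv", StreamingCsvConfig::default())?;
/// for (i, col) in stats.iter().enumerate() {
///     println!("column {i}: mean = {:?}, std = {:?}", col.mean_value, col.std_dev);
/// }
/// ```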
#[allow(dead_code)]
pub fn aggregate_csv_statistics<P: AsRef<Path>>(
    path: P,
    config: StreamingCsvConfig,
) -> Result<Vec<ColumnStats>> {
    let mut column_stats: Vec<Option<ColumnStats>> = Vec::new();

    let _results_stats = process_csv_streaming(path, config, |chunk, _headers| {
        let shape = chunk.shape();

        if column_stats.is_empty() {
            column_stats = vec![None; shape[1]];
        }

        for col_idx in 0..shape[1] {
            let mut values = Vec::new();
            let mut non_numeric_count = 0;

            for row_idx in 0..shape[0] {
                let cell_value = &chunk[[row_idx, col_idx]];
                if let Ok(numeric_value) = cell_value.parse::<f64>() {
                    values.push(numeric_value);
                } else {
                    non_numeric_count += 1;
                }
            }

            if !values.is_empty() {
                let current_stats = ColumnStats::from_values(&values, non_numeric_count);

                match &column_stats[col_idx] {
                    None => column_stats[col_idx] = Some(current_stats),
                    Some(existing) => {
                        column_stats[col_idx] = Some(existing.merge(&current_stats));
                    }
                }
            }
        }

        Ok(())
    })?;

    Ok(column_stats.into_iter().flatten().collect())
}

#[derive(Debug, Clone)]
pub struct ColumnStats {
    pub column_index: usize,
    pub total_count: usize,
    pub non_numeric_count: usize,
    pub min_value: Option<f64>,
    pub max_value: Option<f64>,
    pub mean_value: Option<f64>,
    pub std_dev: Option<f64>,
}

impl ColumnStats {
    fn from_values(values: &[f64], non_numeric_count: usize) -> Self {
        if values.is_empty() {
            return Self {
                column_index: 0,
                total_count: non_numeric_count,
                non_numeric_count,
                min_value: None,
                max_value: None,
                mean_value: None,
                std_dev: None,
            };
        }

        let min_val = values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
        let max_val = values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
        let mean = values.iter().sum::<f64>() / values.len() as f64;

        let variance = values.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / values.len() as f64;
        let std_dev = variance.sqrt();

        Self {
            column_index: 0,
            total_count: values.len() + non_numeric_count,
            non_numeric_count,
            min_value: Some(min_val),
            max_value: Some(max_val),
            mean_value: Some(mean),
            std_dev: Some(std_dev),
        }
    }

    fn merge(&self, other: &Self) -> Self {
        let total_numeric_self = self.total_count - self.non_numeric_count;
        let total_numeric_other = other.total_count - other.non_numeric_count;
        let combined_numeric = total_numeric_self + total_numeric_other;

        if combined_numeric == 0 {
            return Self {
                column_index: self.column_index,
                total_count: self.total_count + other.total_count,
                non_numeric_count: self.non_numeric_count + other.non_numeric_count,
                min_value: None,
                max_value: None,
                mean_value: None,
                std_dev: None,
            };
        }

        let min_val = match (self.min_value, other.min_value) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (Some(a), None) => Some(a),
            (None, Some(b)) => Some(b),
            (None, None) => None,
        };

        let max_val = match (self.max_value, other.max_value) {
            (Some(a), Some(b)) => Some(a.max(b)),
            (Some(a), None) => Some(a),
            (None, Some(b)) => Some(b),
            (None, None) => None,
        };

        let combined_mean = match (self.mean_value, other.mean_value) {
            (Some(mean1), Some(mean2)) => {
                let w1 = total_numeric_self as f64;
                let w2 = total_numeric_other as f64;
                Some((mean1 * w1 + mean2 * w2) / (w1 + w2))
            }
            (Some(mean), None) => Some(mean),
            (None, Some(mean)) => Some(mean),
            (None, None) => None,
        };

        // Pool the two groups' variances, shifting each by the distance of its
        // group mean from the combined mean.
        let combined_std = match (self.std_dev, other.std_dev) {
            (Some(std1), Some(std2)) => {
                match (self.mean_value, other.mean_value, combined_mean) {
                    (Some(m1), Some(m2), Some(m)) => {
                        let n1 = total_numeric_self as f64;
                        let n2 = total_numeric_other as f64;
                        let pooled_var = (n1 * (std1 * std1 + (m1 - m).powi(2))
                            + n2 * (std2 * std2 + (m2 - m).powi(2)))
                            / (n1 + n2);
                        Some(pooled_var.sqrt())
                    }
                    _ => Some((std1 + std2) / 2.0),
                }
            }
            (Some(std), None) | (None, Some(std)) => Some(std),
            (None, None) => None,
        };

        Self {
            column_index: self.column_index,
            total_count: self.total_count + other.total_count,
            non_numeric_count: self.non_numeric_count + other.non_numeric_count,
            min_value: min_val,
            max_value: max_val,
            mean_value: combined_mean,
            std_dev: combined_std,
        }
    }
}