scirs2_io/csv/
mod.rs

1//! CSV file format support
2//!
3//! This module provides functionality for reading and writing CSV (Comma-Separated Values)
4//! files, commonly used for storing tabular data.
5//!
6//! Features:
7//! - Reading and writing CSV files with various configuration options
8//! - Support for custom delimiters, quotes, and line endings
9//! - Handling of missing values and type conversions
10//! - Memory-efficient processing of large files
11//! - Column-based I/O operations
12
13use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
14use scirs2_core::ndarray::{Array1, Array2};
15use scirs2_core::numeric::Complex64;
16use std::fs::File;
17use std::io::{BufRead, BufReader, BufWriter, Write};
18use std::path::Path;
19
20use crate::error::{IoError, Result};
21
/// CSV reader configuration
///
/// Controls how [`read_csv`] and related readers tokenize and filter input.
/// Build with `CsvReaderConfig::default()` and override fields via struct
/// update syntax, e.g. `CsvReaderConfig { delimiter: ';', ..Default::default() }`.
#[derive(Debug, Clone)]
pub struct CsvReaderConfig {
    /// Delimiter character separating fields (default: ',')
    pub delimiter: char,
    /// Quote character enclosing fields that contain special characters (default: '"')
    pub quote_char: char,
    /// Whether to trim whitespace from each parsed field (default: false)
    pub trim: bool,
    /// Whether the file has a header row (default: true)
    pub has_header: bool,
    /// Comment character; data lines starting with this are ignored (default: None)
    pub comment_char: Option<char>,
    /// Number of lines to skip at the beginning of the file, before the header (default: 0)
    pub skip_rows: usize,
    /// Maximum number of data rows to read (default: None = all rows)
    pub max_rows: Option<usize>,
}
40
41impl Default for CsvReaderConfig {
42    fn default() -> Self {
43        Self {
44            delimiter: ',',
45            quote_char: '"',
46            trim: false,
47            has_header: true,
48            comment_char: None,
49            skip_rows: 0,
50            max_rows: None,
51        }
52    }
53}
54
55/// Read a CSV file into a 2D array of strings
56///
57/// # Arguments
58///
59/// * `path` - Path to the CSV file
60/// * `config` - Optional CSV reader configuration
61///
62/// # Returns
63///
64/// * `Result<(Vec<String>, Array2<String>)>` - Header labels and data as strings
65///
66/// # Examples
67///
68/// ```no_run
69/// use scirs2_io::csv::{read_csv, CsvReaderConfig};
70///
71/// // Read with default configuration
72/// let (headers, data) = read_csv("data.csv", None).unwrap();
73/// println!("Headers: {:?}", headers);
74/// println!("Data shape: {:?}", data.shape());
75///
76/// // Read with custom configuration
77/// let config = CsvReaderConfig {
78///     delimiter: ';',
79///     has_header: false,
80///     ..Default::default()
81/// };
82/// let (_, data) = read_csv("data.csv", Some(config)).unwrap();
83/// ```
84#[allow(dead_code)]
85pub fn read_csv<P: AsRef<Path>>(
86    path: P,
87    config: Option<CsvReaderConfig>,
88) -> Result<(Vec<String>, Array2<String>)> {
89    let config = config.unwrap_or_default();
90
91    let file = File::open(path).map_err(|e| IoError::FileError(e.to_string()))?;
92    let reader = BufReader::new(file);
93
94    let mut lines = reader.lines();
95    let mut rows = Vec::new();
96
97    // Skip rows if needed
98    for _ in 0..config.skip_rows {
99        if lines.next().is_none() {
100            return Err(IoError::FormatError("Not enough rows in file".to_string()));
101        }
102    }
103
104    // Read header if present
105    let headers = if config.has_header {
106        match lines.next() {
107            Some(Ok(line)) => parse_csv_line(&line, &config),
108            Some(Err(e)) => return Err(IoError::FileError(e.to_string())),
109            None => return Err(IoError::FormatError("Empty file".to_string())),
110        }
111    } else {
112        Vec::new()
113    };
114
115    // Read data
116    let mut row_count = 0;
117    for line_result in lines {
118        // Break if we've read enough rows
119        if let Some(max) = config.max_rows {
120            if row_count >= max {
121                break;
122            }
123        }
124
125        let line = line_result.map_err(|e| IoError::FileError(e.to_string()))?;
126
127        // Skip comment lines
128        if let Some(comment_char) = config.comment_char {
129            if line.trim().starts_with(comment_char) {
130                continue;
131            }
132        }
133
134        // Skip empty lines
135        if line.trim().is_empty() {
136            continue;
137        }
138
139        let row = parse_csv_line(&line, &config);
140        rows.push(row);
141        row_count += 1;
142    }
143
144    // Check if we have any data
145    if rows.is_empty() {
146        return Err(IoError::FormatError("No data rows in file".to_string()));
147    }
148
149    // Determine the number of columns from the first row
150    let num_cols = rows[0].len();
151
152    // Ensure all rows have the same number of columns
153    for (i, row) in rows.iter().enumerate() {
154        if row.len() != num_cols {
155            return Err(IoError::FormatError(format!(
156                "Inconsistent number of columns: row {row_num} has {actual_cols} columns, expected {expected_cols}",
157                row_num = i + 1,
158                actual_cols = row.len(),
159                expected_cols = num_cols
160            )));
161        }
162    }
163
164    // Convert to Array2
165    let num_rows = rows.len();
166    let mut data = Array2::from_elem((num_rows, num_cols), String::new());
167
168    for (i, row) in rows.iter().enumerate() {
169        for (j, value) in row.iter().enumerate() {
170            data[[i, j]] = value.clone();
171        }
172    }
173
174    Ok((headers, data))
175}
176
177/// Parse a CSV line into fields
178#[allow(dead_code)]
179fn parse_csv_line(line: &str, config: &CsvReaderConfig) -> Vec<String> {
180    let mut fields = Vec::new();
181    let mut field = String::new();
182    let mut in_quotes = false;
183    let mut chars = line.chars().peekable();
184
185    while let Some(c) = chars.next() {
186        // Handle quotes
187        if c == config.quote_char {
188            // Check for escaped quotes (double quotes)
189            if in_quotes && chars.peek() == Some(&config.quote_char) {
190                chars.next(); // Consume the second quote
191                field.push(config.quote_char);
192            } else {
193                in_quotes = !in_quotes;
194            }
195        }
196        // Handle delimiters
197        else if c == config.delimiter && !in_quotes {
198            let processed_field = if config.trim {
199                field.trim().to_string()
200            } else {
201                field
202            };
203            fields.push(processed_field);
204            field = String::new();
205        }
206        // Handle regular characters
207        else {
208            field.push(c);
209        }
210    }
211
212    // Add the last field
213    let processed_field = if config.trim {
214        field.trim().to_string()
215    } else {
216        field
217    };
218    fields.push(processed_field);
219
220    fields
221}
222
223/// Read a CSV file and convert to numeric arrays
224///
225/// # Arguments
226///
227/// * `path` - Path to the CSV file
228/// * `config` - Optional CSV reader configuration
229///
230/// # Returns
231///
232/// * `Result<(Vec<String>, Array2<f64>)>` - Header labels and data as f64 values
233///
234/// # Examples
235///
236/// ```no_run
237/// use scirs2_io::csv::{read_csv_numeric, CsvReaderConfig};
238///
239/// let (headers, data) = read_csv_numeric("data.csv", None).unwrap();
240/// println!("Numeric data shape: {:?}", data.shape());
241/// ```
242#[allow(dead_code)]
243pub fn read_csv_numeric<P: AsRef<Path>>(
244    path: P,
245    config: Option<CsvReaderConfig>,
246) -> Result<(Vec<String>, Array2<f64>)> {
247    let (headers, string_data) = read_csv(path, config)?;
248
249    let shape = string_data.shape();
250    let mut numeric_data = Array2::<f64>::zeros((shape[0], shape[1]));
251
252    for i in 0..shape[0] {
253        for j in 0..shape[1] {
254            let value = string_data[[i, j]].parse::<f64>().map_err(|_| {
255                IoError::FormatError(format!(
256                    "Could not convert value '{value}' at position [{row}, {col}] to number",
257                    value = string_data[[i, j]],
258                    row = i,
259                    col = j
260                ))
261            })?;
262            numeric_data[[i, j]] = value;
263        }
264    }
265
266    Ok((headers, numeric_data))
267}
268
/// CSV writer configuration
///
/// Controls delimiting, quoting, and line endings for [`write_csv`].
/// Build with `CsvWriterConfig::default()` and override individual fields.
#[derive(Debug, Clone)]
pub struct CsvWriterConfig {
    /// Delimiter character placed between fields (default: ',')
    pub delimiter: char,
    /// Quote character used to enclose fields (default: '"')
    pub quote_char: char,
    /// Always quote every field, regardless of content (default: false)
    pub always_quote: bool,
    /// Quote fields containing special characters — the delimiter, the
    /// quote character, or line breaks (default: true)
    pub quote_special: bool,
    /// Write the header row when headers are provided (default: true)
    pub write_header: bool,
    /// Line ending appended after each row (default: LF)
    pub line_ending: LineEnding,
}
285
286impl Default for CsvWriterConfig {
287    fn default() -> Self {
288        Self {
289            delimiter: ',',
290            quote_char: '"',
291            always_quote: false,
292            quote_special: true,
293            write_header: true,
294            line_ending: LineEnding::default(),
295        }
296    }
297}
298
/// Line ending options for CSV files
///
/// `LF` is the default; `CRLF` matches the Windows convention.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineEnding {
    /// LF (Unix) line ending: \n
    #[default]
    LF,
    /// CRLF (Windows) line ending: \r\n
    CRLF,
}
308
impl LineEnding {
    /// Return the literal character sequence for this line ending.
    fn as_str(&self) -> &'static str {
        match self {
            LineEnding::LF => "\n",
            LineEnding::CRLF => "\r\n",
        }
    }
}
317
/// Types of missing values that can be recognized in CSV files
///
/// Markers are compared case-insensitively against trimmed cell values
/// during conversion (see `convert_value`).
#[derive(Debug, Clone)]
pub struct MissingValueOptions {
    /// Strings to interpret as missing values (default: NA, N/A, NaN, null, "")
    pub values: Vec<String>,
    /// Replace missing values with this value; only applied when the column
    /// type is Float (default: None)
    pub fill_value: Option<f64>,
}
326
327impl Default for MissingValueOptions {
328    fn default() -> Self {
329        Self {
330            values: vec![
331                "NA".to_string(),
332                "N/A".to_string(),
333                "NaN".to_string(),
334                "null".to_string(),
335                "".to_string(),
336            ],
337            fill_value: None,
338        }
339    }
340}
341
/// Column data type specification for type conversion
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnType {
    /// String type (default)
    String,
    /// Integer type (i64)
    Integer,
    /// Float type (f64)
    Float,
    /// Boolean type (true/false, yes/no, 1/0; matched case-insensitively)
    Boolean,
    /// Date type (YYYY-MM-DD)
    Date,
    /// Time type (HH:MM:SS, optionally with fractional seconds)
    Time,
    /// DateTime type (YYYY-MM-DDThh:mm:ss or "YYYY-MM-DD hh:mm:ss",
    /// optionally with fractional seconds)
    DateTime,
    /// Complex number (standard form "a+bi" or coordinate form "(a,b)")
    Complex,
}
362
/// Column specification for CSV reading
#[derive(Debug, Clone)]
pub struct ColumnSpec {
    /// Column index
    pub index: usize,
    /// Column name (optional)
    pub name: Option<String>,
    /// Column data type used for value conversion
    pub dtype: ColumnType,
    /// Custom missing values for this column (optional)
    pub missing_values: Option<MissingValueOptions>,
}
375
/// Data value type for mixed type columns
///
/// Each typed variant corresponds to a [`ColumnType`]; `Missing` marks
/// cells that matched a missing-value marker or were empty.
#[derive(Debug, Clone)]
pub enum DataValue {
    /// String value
    String(String),
    /// Integer value
    Integer(i64),
    /// Float value
    Float(f64),
    /// Boolean value
    Boolean(bool),
    /// Date value (timezone-naive calendar date)
    Date(NaiveDate),
    /// Time value (timezone-naive time of day)
    Time(NaiveTime),
    /// DateTime value (timezone-naive date and time)
    DateTime(NaiveDateTime),
    /// Complex number value (real and imaginary parts)
    Complex(Complex64),
    /// Missing value
    Missing,
}
398
399impl std::fmt::Display for DataValue {
400    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
401        match self {
402            DataValue::String(s) => write!(f, "{s}"),
403            DataValue::Integer(i) => write!(f, "{i}"),
404            DataValue::Float(v) => write!(f, "{v}"),
405            DataValue::Boolean(b) => write!(f, "{b}"),
406            DataValue::Date(d) => write!(f, "{}", d.format("%Y-%m-%d")),
407            DataValue::Time(t) => write!(f, "{}", t.format("%H:%M:%S%.f")),
408            DataValue::DateTime(dt) => write!(f, "{}", dt.format("%Y-%m-%dT%H:%M:%S%.f")),
409            DataValue::Complex(c) => {
410                if c.im >= 0.0 {
411                    write!(f, "{}+{}i", c.re, c.im)
412                } else {
413                    write!(f, "{}{}i", c.re, c.im)
414                }
415            }
416            DataValue::Missing => write!(f, "NA"),
417        }
418    }
419}
420
421/// Automatically detect column types from data
422#[allow(dead_code)]
423pub fn detect_column_types(data: &Array2<String>) -> Vec<ColumnType> {
424    let (rows, cols) = (data.shape()[0], data.shape()[1]);
425
426    // Default to String if we can't determine type
427    if rows == 0 {
428        return vec![ColumnType::String; cols];
429    }
430
431    let mut col_types = vec![ColumnType::String; cols];
432
433    for col in 0..cols {
434        let mut is_int = true;
435        let mut is_float = true;
436        let mut is_bool = true;
437        let mut is_date = true;
438        let mut is_time = true;
439        let mut is_datetime = true;
440        let mut is_complex = true;
441        let mut non_empty_rows = 0;
442
443        for row in 0..rows {
444            let val = data[[row, col]].trim();
445
446            // Skip empty values for type detection
447            if val.is_empty() {
448                continue;
449            }
450
451            non_empty_rows += 1;
452
453            // Check if could be a boolean
454            let lower_val = val.to_lowercase();
455            let is_valid_bool =
456                ["true", "false", "yes", "no", "1", "0"].contains(&lower_val.as_str());
457            if !is_valid_bool {
458                is_bool = false;
459            }
460
461            // Check if could be an integer
462            if is_int && val.parse::<i64>().is_err() {
463                is_int = false;
464            }
465
466            // Check if could be a float
467            if is_float && val.parse::<f64>().is_err() {
468                is_float = false;
469            }
470
471            // Check if could be a date (YYYY-MM-DD)
472            if is_date && NaiveDate::parse_from_str(val, "%Y-%m-%d").is_err() {
473                is_date = false;
474            }
475
476            // Check if could be a time (HH:MM:SS)
477            if is_time
478                && NaiveTime::parse_from_str(val, "%H:%M:%S").is_err()
479                && NaiveTime::parse_from_str(val, "%H:%M:%S%.f").is_err()
480            {
481                is_time = false;
482            }
483
484            // Check if could be a datetime (YYYY-MM-DDThh:mm:ss)
485            if is_datetime
486                && NaiveDateTime::parse_from_str(val, "%Y-%m-%dT%H:%M:%S").is_err()
487                && NaiveDateTime::parse_from_str(val, "%Y-%m-%d %H:%M:%S").is_err()
488                && NaiveDateTime::parse_from_str(val, "%Y-%m-%dT%H:%M:%S%.f").is_err()
489                && NaiveDateTime::parse_from_str(val, "%Y-%m-%d %H:%M:%S%.f").is_err()
490            {
491                is_datetime = false;
492            }
493
494            // Check if could be a complex number
495            if is_complex {
496                // Try to parse as complex number with patterns like "3+4i", "3-4i", etc.
497                is_complex = parse_complex(val).is_some();
498            }
499        }
500
501        // Don't auto-detect special types if we have too few samples
502        if non_empty_rows < 2 {
503            is_date = false;
504            is_time = false;
505            is_datetime = false;
506            is_complex = false;
507        }
508
509        // Assign most specific type, with priority
510        if is_bool {
511            col_types[col] = ColumnType::Boolean;
512        } else if is_int {
513            col_types[col] = ColumnType::Integer;
514        } else if is_float {
515            col_types[col] = ColumnType::Float;
516        } else if is_date {
517            col_types[col] = ColumnType::Date;
518        } else if is_time {
519            col_types[col] = ColumnType::Time;
520        } else if is_datetime {
521            col_types[col] = ColumnType::DateTime;
522        } else if is_complex {
523            col_types[col] = ColumnType::Complex;
524        }
525    }
526
527    col_types
528}
529
530/// Parse a complex number from string like "3+4i", "-1-2i"
531#[allow(dead_code)]
532fn parse_complex(s: &str) -> Option<Complex64> {
533    // Common complex number formats:
534    // 1. "a+bi" or "a-bi" - standard form
535    // 2. "(a,b)" - coordinate form
536
537    if s.contains('i') {
538        // Handle standard form
539        let s = s.trim().replace(" ", "");
540
541        // Remove trailing 'i'
542        let s = if s.ends_with('i') {
543            &s[0..s.len() - 1]
544        } else {
545            return None;
546        };
547
548        // Find the position of + or - that isn't at the start
549        let mut split_pos = None;
550        let mut in_first_number = true;
551
552        for (i, c) in s.chars().enumerate() {
553            if i == 0 {
554                continue; // Skip first character which might be a sign
555            }
556
557            if c == '+' || c == '-' {
558                split_pos = Some((i, c));
559                break;
560            }
561
562            if !c.is_ascii_digit()
563                && c != '.'
564                && c != 'e'
565                && c != 'E'
566                && !(c == '-' && (s.as_bytes()[i - 1] == b'e' || s.as_bytes()[i - 1] == b'E'))
567            {
568                in_first_number = false;
569            }
570        }
571
572        if let Some((pos, sign)) = split_pos {
573            let real_part = s[0..pos].parse::<f64>().ok()?;
574            let imag_part = if sign == '+' {
575                s[pos + 1..].parse::<f64>().ok()?
576            } else {
577                -s[pos + 1..].parse::<f64>().ok()?
578            };
579
580            Some(Complex64::new(real_part, imag_part))
581        } else if in_first_number {
582            // Form like "0i" (just imaginary part)
583            Some(Complex64::new(0.0, s.parse::<f64>().ok()?))
584        } else {
585            None
586        }
587    } else if s.starts_with('(') && s.ends_with(')') && s.contains(',') {
588        // Handle coordinate form (a,b)
589        let contents = &s[1..s.len() - 1];
590        let parts: Vec<&str> = contents.split(',').collect();
591
592        if parts.len() == 2 {
593            let real = parts[0].trim().parse::<f64>().ok()?;
594            let imag = parts[1].trim().parse::<f64>().ok()?;
595            Some(Complex64::new(real, imag))
596        } else {
597            None
598        }
599    } else {
600        None
601    }
602}
603
604/// Convert a string to a specified type with missing value handling
605#[allow(dead_code)]
606fn convert_value(
607    value: &str,
608    col_type: ColumnType,
609    missing_values: &MissingValueOptions,
610) -> Result<DataValue> {
611    let trimmed = value.trim();
612
613    // Check for missing _values
614    if missing_values
615        .values
616        .iter()
617        .any(|mv| mv.eq_ignore_ascii_case(trimmed))
618    {
619        if let (Some(fill), ColumnType::Float) = (missing_values.fill_value, col_type) {
620            return Ok(DataValue::Float(fill));
621        }
622        return Ok(DataValue::Missing);
623    }
624
625    // Empty string check
626    if trimmed.is_empty() {
627        return Ok(DataValue::Missing);
628    }
629
630    // Type conversion
631    match col_type {
632        ColumnType::String => Ok(DataValue::String(trimmed.to_string())),
633        ColumnType::Integer => match trimmed.parse::<i64>() {
634            Ok(val) => Ok(DataValue::Integer(val)),
635            Err(_) => Err(IoError::FormatError(format!(
636                "Cannot convert '{value}' to integer"
637            ))),
638        },
639        ColumnType::Float => match trimmed.parse::<f64>() {
640            Ok(val) => Ok(DataValue::Float(val)),
641            Err(_) => Err(IoError::FormatError(format!(
642                "Cannot convert '{value}' to float"
643            ))),
644        },
645        ColumnType::Boolean => {
646            let lower = trimmed.to_lowercase();
647            match lower.as_str() {
648                "true" | "yes" | "1" => Ok(DataValue::Boolean(true)),
649                "false" | "no" | "0" => Ok(DataValue::Boolean(false)),
650                _ => Err(IoError::FormatError(format!(
651                    "Cannot convert '{value}' to boolean"
652                ))),
653            }
654        }
655        ColumnType::Date => match NaiveDate::parse_from_str(trimmed, "%Y-%m-%d") {
656            Ok(date) => Ok(DataValue::Date(date)),
657            Err(_) => Err(IoError::FormatError(format!(
658                "Cannot convert '{value}' to date (expected YYYY-MM-DD)"
659            ))),
660        },
661        ColumnType::Time => {
662            let result = NaiveTime::parse_from_str(trimmed, "%H:%M:%S")
663                .or_else(|_| NaiveTime::parse_from_str(trimmed, "%H:%M:%S%.f"));
664
665            match result {
666                Ok(time) => Ok(DataValue::Time(time)),
667                Err(_) => Err(IoError::FormatError(format!(
668                    "Cannot convert '{value}' to time (expected HH:MM:SS[.f])"
669                ))),
670            }
671        }
672        ColumnType::DateTime => {
673            let result = NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S")
674                .or_else(|_| NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%d %H:%M:%S"))
675                .or_else(|_| NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S%.f"))
676                .or_else(|_| NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%d %H:%M:%S%.f"));
677
678            match result {
679                Ok(dt) => Ok(DataValue::DateTime(dt)),
680                Err(_) => Err(IoError::FormatError(format!(
681                    "Cannot convert '{value}' to datetime (expected YYYY-MM-DD[T ]HH:MM:SS[.f])"
682                ))),
683            }
684        }
685        ColumnType::Complex => match parse_complex(trimmed) {
686            Some(complex) => Ok(DataValue::Complex(complex)),
687            None => Err(IoError::FormatError(format!(
688                "Cannot convert '{value}' to complex number (expected a+bi or (a,b))"
689            ))),
690        },
691    }
692}
693
694/// Write a 2D array to a CSV file
695///
696/// # Arguments
697///
698/// * `path` - Path to the output CSV file
699/// * `data` - 2D array to write
700/// * `headers` - Optional column headers
701/// * `config` - Optional CSV writer configuration
702///
703/// # Returns
704///
705/// * `Result<()>` - Success or error
706///
707/// # Examples
708///
709/// ```no_run
710/// use scirs2_core::ndarray::array;
711/// use scirs2_io::csv::{write_csv, CsvWriterConfig};
712///
713/// let data = array![[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]];
714/// let headers = vec!["A".to_string(), "B".to_string(), "C".to_string()];
715///
716/// // Write with default configuration
717/// write_csv("output.csv", &data, Some(&headers), None).unwrap();
718///
719/// // Write with custom configuration
720/// let config = CsvWriterConfig {
721///     delimiter: ';',
722///     always_quote: true,
723///     ..Default::default()
724/// };
725/// write_csv("output_custom.csv", &data, Some(&headers), Some(config)).unwrap();
726/// ```
727#[allow(dead_code)]
728pub fn write_csv<P: AsRef<Path>, T: std::fmt::Display>(
729    path: P,
730    data: &Array2<T>,
731    headers: Option<&Vec<String>>,
732    config: Option<CsvWriterConfig>,
733) -> Result<()> {
734    let config = config.unwrap_or_default();
735
736    // Get data shape
737    let shape = data.shape();
738    let (rows, cols) = (shape[0], shape[1]);
739
740    // Check headers match data width
741    if let Some(hdrs) = headers {
742        if hdrs.len() != cols && config.write_header {
743            return Err(IoError::FormatError(format!(
744                "Header length ({}) does not match data width ({})",
745                hdrs.len(),
746                cols
747            )));
748        }
749    }
750
751    // Open file for writing
752    let mut file = File::create(path).map_err(|e| IoError::FileError(e.to_string()))?;
753
754    // Write headers if provided and enabled
755    if let Some(hdrs) = headers {
756        if config.write_header {
757            let header_line = format_csv_line(hdrs, &config);
758            file.write_all(header_line.as_bytes())
759                .map_err(|e| IoError::FileError(e.to_string()))?;
760            file.write_all(config.line_ending.as_str().as_bytes())
761                .map_err(|e| IoError::FileError(e.to_string()))?;
762        }
763    }
764
765    // Write data rows
766    for i in 0..rows {
767        let row: Vec<String> = (0..cols).map(|j| data[[i, j]].to_string()).collect();
768
769        let line = format_csv_line(&row, &config);
770        file.write_all(line.as_bytes())
771            .map_err(|e| IoError::FileError(e.to_string()))?;
772
773        if i < rows - 1 || config.line_ending == LineEnding::CRLF {
774            file.write_all(config.line_ending.as_str().as_bytes())
775                .map_err(|e| IoError::FileError(e.to_string()))?;
776        } else {
777            // For LF, ensure file ends with a newline but avoid extra newline
778            file.write_all(b"\n")
779                .map_err(|e| IoError::FileError(e.to_string()))?;
780        }
781    }
782
783    Ok(())
784}
785
786/// Format a row as a CSV line
787#[allow(dead_code)]
788fn format_csv_line(fields: &[String], config: &CsvWriterConfig) -> String {
789    let mut result = String::new();
790
791    for (i, field) in fields.iter().enumerate() {
792        let need_quotes = config.always_quote
793            || (config.quote_special
794                && (field.contains(config.delimiter)
795                    || field.contains(config.quote_char)
796                    || field.contains('\n')
797                    || field.contains('\r')));
798
799        if need_quotes {
800            // Add quote character
801            result.push(config.quote_char);
802
803            // Add the field with escaped quotes
804            let escaped = field.replace(
805                config.quote_char,
806                &format!("{}{}", config.quote_char, config.quote_char),
807            );
808            result.push_str(&escaped);
809
810            // Close quotes
811            result.push(config.quote_char);
812        } else {
813            result.push_str(field);
814        }
815
816        // Add delimiter if not the last field
817        if i < fields.len() - 1 {
818            result.push(config.delimiter);
819        }
820    }
821
822    result
823}
824
825/// Read a CSV file with type conversion and missing value handling
826///
827/// # Arguments
828///
829/// * `path` - Path to the CSV file
830/// * `config` - Optional CSV reader configuration
831/// * `col_types` - Optional column data types
832/// * `missing_values` - Optional missing value handling options
833///
834/// # Returns
835///
836/// * `Result<(Vec<String>, Vec<Vec<DataValue>>)>` - Headers and typed data values
837///
838/// # Examples
839///
840/// ```no_run
841/// use scirs2_io::csv::{read_csv_typed, ColumnType, CsvReaderConfig, MissingValueOptions};
842///
843/// // Read with automatic type detection
844/// let (headers, data) = read_csv_typed("data.csv", None, None, None).unwrap();
845///
846/// // Read with specified column types
847/// let col_types = vec![
848///     ColumnType::String,
849///     ColumnType::Integer,
850///     ColumnType::Float,
851///     ColumnType::Boolean,
852/// ];
853/// let (headers, data) = read_csv_typed("data.csv", None, Some(&col_types), None).unwrap();
854///
855/// // Read with custom missing value handling
856/// let missing_opts = MissingValueOptions {
857///     values: vec!["missing".to_string(), "unknown".to_string()],
858///     fill_value: Some(0.0),
859/// };
860/// let (headers, data) = read_csv_typed("data.csv", None, None, Some(missing_opts)).unwrap();
861/// ```
862#[allow(dead_code)]
863pub fn read_csv_typed<P: AsRef<Path>>(
864    path: P,
865    config: Option<CsvReaderConfig>,
866    col_types: Option<&[ColumnType]>,
867    missing_values: Option<MissingValueOptions>,
868) -> Result<(Vec<String>, Vec<Vec<DataValue>>)> {
869    // Get string data first
870    let (headers, string_data) = read_csv(path, config)?;
871
872    // If no data, return early
873    if string_data.shape()[0] == 0 || string_data.shape()[1] == 0 {
874        return Ok((headers, Vec::new()));
875    }
876
877    // Determine column types if not provided
878    let col_types_vec = match col_types {
879        Some(types) => {
880            if types.len() != string_data.shape()[1] {
881                return Err(IoError::FormatError(format!(
882                    "Number of column types ({}) does not match data width ({})",
883                    types.len(),
884                    string_data.shape()[1]
885                )));
886            }
887            types.to_vec()
888        }
889        None => detect_column_types(&string_data),
890    };
891
892    let missing_opts = missing_values.unwrap_or_default();
893
894    // Convert data
895    let mut typed_data = Vec::with_capacity(string_data.shape()[0]);
896
897    for i in 0..string_data.shape()[0] {
898        let mut row = Vec::with_capacity(string_data.shape()[1]);
899
900        for j in 0..string_data.shape()[1] {
901            let value = convert_value(&string_data[[i, j]], col_types_vec[j], &missing_opts)?;
902            row.push(value);
903        }
904
905        typed_data.push(row);
906    }
907
908    Ok((headers, typed_data))
909}
910
911/// Read a CSV file in chunks to process large files memory-efficiently
912///
913/// # Arguments
914///
915/// * `path` - Path to the CSV file
916/// * `config` - Optional CSV reader configuration
917/// * `chunk_size` - Number of rows to read in each chunk
918/// * `callback` - Function to process each chunk
919///
920/// # Returns
921///
922/// * `Result<()>` - Success or error
923///
924/// # Examples
925///
926/// ```no_run
927/// use scirs2_io::csv::{read_csv_chunked, CsvReaderConfig};
928/// use scirs2_core::ndarray::Array2;
929///
930/// let config = CsvReaderConfig::default();
931/// let mut total_rows = 0;
932///
933/// read_csv_chunked("large_data.csv", Some(config), 1000, |headers, chunk| {
934///     println!("Processing chunk with {} rows", chunk.shape()[0]);
935///     total_rows += chunk.shape()[0];
936///     true // continue processing
937/// }).unwrap();
938///
939/// println!("Total rows processed: {}", total_rows);
940/// ```
941#[allow(dead_code)]
942pub fn read_csv_chunked<P, F>(
943    path: P,
944    config: Option<CsvReaderConfig>,
945    chunk_size: usize,
946    mut callback: F,
947) -> Result<()>
948where
949    P: AsRef<Path>,
950    F: FnMut(&[String], &Array2<String>) -> bool,
951{
952    let config = config.unwrap_or_default();
953
954    let file = File::open(path).map_err(|e| IoError::FileError(e.to_string()))?;
955    let reader = BufReader::new(file);
956    let mut lines = reader.lines();
957
958    // Skip rows if needed
959    for _ in 0..config.skip_rows {
960        if lines.next().is_none() {
961            return Err(IoError::FormatError("Not enough rows in file".to_string()));
962        }
963    }
964
965    // Read header if present
966    let headers = if config.has_header {
967        match lines.next() {
968            Some(Ok(line)) => parse_csv_line(&line, &config),
969            Some(Err(e)) => return Err(IoError::FileError(e.to_string())),
970            None => return Err(IoError::FormatError("Empty file".to_string())),
971        }
972    } else {
973        Vec::new()
974    };
975
976    let mut buffer = Vec::with_capacity(chunk_size);
977    let mut num_cols = 0;
978
979    // Process file in chunks
980    for line_result in lines {
981        // Process comment lines, empty lines
982        let line = line_result.map_err(|e| IoError::FileError(e.to_string()))?;
983
984        if let Some(comment_char) = config.comment_char {
985            if line.trim().starts_with(comment_char) {
986                continue;
987            }
988        }
989
990        if line.trim().is_empty() {
991            continue;
992        }
993
994        // Parse the line
995        let row = parse_csv_line(&line, &config);
996
997        // Determine number of columns from first data row
998        if buffer.is_empty() {
999            num_cols = row.len();
1000        } else if row.len() != num_cols {
1001            return Err(IoError::FormatError(format!(
1002                "Inconsistent number of columns: got {}, expected {}",
1003                row.len(),
1004                num_cols
1005            )));
1006        }
1007
1008        buffer.push(row);
1009
1010        // Process chunk when we've reached chunk_size
1011        if buffer.len() >= chunk_size
1012            && !process_chunk(&headers, &mut buffer, num_cols, &mut callback)?
1013        {
1014            return Ok(()); // Callback returned false, stop processing
1015        }
1016    }
1017
1018    // Process remaining rows
1019    if !buffer.is_empty() {
1020        process_chunk(&headers, &mut buffer, num_cols, &mut callback)?;
1021    }
1022
1023    Ok(())
1024}
1025
1026/// Helper function to process a chunk of data
1027#[allow(dead_code)]
1028fn process_chunk<F>(
1029    headers: &[String],
1030    buffer: &mut Vec<Vec<String>>,
1031    num_cols: usize,
1032    callback: &mut F,
1033) -> Result<bool>
1034where
1035    F: FnMut(&[String], &Array2<String>) -> bool,
1036{
1037    let num_rows = buffer.len();
1038    let mut data = Array2::<String>::from_elem((num_rows, num_cols), String::new());
1039
1040    for (i, row) in buffer.iter().enumerate() {
1041        for (j, value) in row.iter().enumerate() {
1042            data[[i, j]] = value.clone();
1043        }
1044    }
1045
1046    buffer.clear();
1047
1048    Ok(callback(headers, &data))
1049}
1050
1051/// Write typed data to a CSV file
1052///
1053/// # Arguments
1054///
1055/// * `path` - Path to the output CSV file
1056/// * `data` - Vector of vectors containing typed data values
1057/// * `headers` - Optional column headers
1058/// * `config` - Optional CSV writer configuration
1059///
1060/// # Returns
1061///
1062/// * `Result<()>` - Success or error
1063///
1064/// # Examples
1065///
1066/// ```no_run
1067/// use scirs2_io::csv::{write_csv_typed, DataValue, CsvWriterConfig};
1068///
1069/// // Create mixed-type data
1070/// let row1 = vec![
1071///     DataValue::String("Alice".to_string()),
1072///     DataValue::Integer(25),
1073///     DataValue::Float(168.5),
1074///     DataValue::Boolean(true),
1075/// ];
1076/// let row2 = vec![
1077///     DataValue::String("Bob".to_string()),
1078///     DataValue::Integer(32),
1079///     DataValue::Float(175.0),
1080///     DataValue::Boolean(false),
1081/// ];
1082///
1083/// let data = vec![row1, row2];
1084/// let headers = vec![
1085///     "Name".to_string(),
1086///     "Age".to_string(),
1087///     "Height".to_string(),
1088///     "Active".to_string(),
1089/// ];
1090///
1091/// write_csv_typed("typed_data.csv", &data, Some(&headers), None).unwrap();
1092/// ```
1093#[allow(dead_code)]
1094pub fn write_csv_typed<P: AsRef<Path>>(
1095    path: P,
1096    data: &[Vec<DataValue>],
1097    headers: Option<&Vec<String>>,
1098    config: Option<CsvWriterConfig>,
1099) -> Result<()> {
1100    let config = config.unwrap_or_default();
1101
1102    if data.is_empty() {
1103        return Err(IoError::FormatError("No data provided".to_string()));
1104    }
1105
1106    // Check all rows have the same length
1107    let num_cols = data[0].len();
1108    for (i, row) in data.iter().enumerate().skip(1) {
1109        if row.len() != num_cols {
1110            return Err(IoError::FormatError(format!(
1111                "Row {} has {} columns, expected {}",
1112                i,
1113                row.len(),
1114                num_cols
1115            )));
1116        }
1117    }
1118
1119    // Check headers match data width
1120    if let Some(hdrs) = headers {
1121        if hdrs.len() != num_cols && config.write_header {
1122            return Err(IoError::FormatError(format!(
1123                "Header length ({}) does not match data width ({})",
1124                hdrs.len(),
1125                num_cols
1126            )));
1127        }
1128    }
1129
1130    // Open file for writing with buffering for better performance
1131    let file = File::create(path).map_err(|e| IoError::FileError(e.to_string()))?;
1132    let mut writer = BufWriter::new(file);
1133
1134    // Write headers if provided and enabled
1135    if let Some(hdrs) = headers {
1136        if config.write_header {
1137            let header_line = format_csv_line(hdrs, &config);
1138            writer
1139                .write_all(header_line.as_bytes())
1140                .map_err(|e| IoError::FileError(e.to_string()))?;
1141            writer
1142                .write_all(config.line_ending.as_str().as_bytes())
1143                .map_err(|e| IoError::FileError(e.to_string()))?;
1144        }
1145    }
1146
1147    // Write data rows
1148    for (i, row) in data.iter().enumerate() {
1149        let string_row: Vec<String> = row.iter().map(|val| val.to_string()).collect();
1150
1151        let line = format_csv_line(&string_row, &config);
1152        writer
1153            .write_all(line.as_bytes())
1154            .map_err(|e| IoError::FileError(e.to_string()))?;
1155
1156        if i < data.len() - 1 || config.line_ending == LineEnding::CRLF {
1157            writer
1158                .write_all(config.line_ending.as_str().as_bytes())
1159                .map_err(|e| IoError::FileError(e.to_string()))?;
1160        } else {
1161            // For LF, ensure file ends with a newline but avoid extra newline
1162            writer
1163                .write_all(b"\n")
1164                .map_err(|e| IoError::FileError(e.to_string()))?;
1165        }
1166    }
1167
1168    // Ensure data is written to disk
1169    writer
1170        .flush()
1171        .map_err(|e| IoError::FileError(e.to_string()))?;
1172
1173    Ok(())
1174}
1175
1176/// Write multiple 1D arrays to a CSV file as columns
1177///
1178/// # Arguments
1179///
1180/// * `path` - Path to the output CSV file
1181/// * `columns` - Vector of 1D arrays to write as columns
1182/// * `headers` - Optional column headers
1183/// * `config` - Optional CSV writer configuration
1184///
1185/// # Returns
1186///
1187/// * `Result<()>` - Success or error
1188///
1189/// # Examples
1190///
1191/// ```no_run
1192/// use scirs2_core::ndarray::{Array1, array};
1193/// use scirs2_io::csv::{write_csv_columns, CsvWriterConfig};
1194///
1195/// let col1 = array![1.0, 2.0, 3.0];
1196/// let col2 = array![4.0, 5.0, 6.0];
1197/// let columns = vec![col1, col2];
1198/// let headers = vec!["X".to_string(), "Y".to_string()];
1199///
1200/// write_csv_columns("columns.csv", &columns, Some(&headers), None).unwrap();
1201/// ```
1202#[allow(dead_code)]
1203pub fn write_csv_columns<P: AsRef<Path>, T: std::fmt::Display + Clone>(
1204    path: P,
1205    columns: &[Array1<T>],
1206    headers: Option<&Vec<String>>,
1207    config: Option<CsvWriterConfig>,
1208) -> Result<()> {
1209    if columns.is_empty() {
1210        return Err(IoError::FormatError("No columns provided".to_string()));
1211    }
1212
1213    // Check all columns have the same length
1214    let num_rows = columns[0].len();
1215    for (i, col) in columns.iter().enumerate().skip(1) {
1216        if col.len() != num_rows {
1217            return Err(IoError::FormatError(format!(
1218                "Column {} has length {}, expected {}",
1219                i,
1220                col.len(),
1221                num_rows
1222            )));
1223        }
1224    }
1225
1226    // Check headers match column count
1227    if let Some(hdrs) = headers {
1228        if hdrs.len() != columns.len() {
1229            return Err(IoError::FormatError(format!(
1230                "Header length ({}) does not match column count ({})",
1231                hdrs.len(),
1232                columns.len()
1233            )));
1234        }
1235    }
1236
1237    // Convert to Array2
1238    let num_cols = columns.len();
1239    let mut data = Array2::<String>::from_elem((num_rows, num_cols), String::new());
1240
1241    for (j, col) in columns.iter().enumerate() {
1242        for (i, val) in col.iter().enumerate() {
1243            data[[i, j]] = val.to_string();
1244        }
1245    }
1246
1247    // Write to CSV
1248    write_csv(path, &data, headers, config)
1249}
1250
1251//
1252// Streaming CSV Processing for Large Files
1253//
1254
1255use scirs2_core::parallel_ops::*;
1256use std::sync::{Arc, Mutex};
1257
/// Configuration for streaming CSV operations
///
/// Combines the per-field CSV parsing options with the knobs that control
/// chunking, parallelism, and memory use during streaming reads.
#[derive(Debug, Clone)]
pub struct StreamingCsvConfig {
    /// CSV reader configuration (delimiter, quoting, headers, ...)
    pub csv_config: CsvReaderConfig,
    /// Size of chunks to process at a time (in rows)
    pub chunk_size: usize,
    /// Number of parallel workers (0 = use all cores)
    /// NOTE(review): not consulted by the code visible in this module —
    /// parallel processing appears to rely on the global pool; confirm intent
    pub num_workers: usize,
    /// Buffer size for I/O operations (bytes); used as the `BufReader`
    /// capacity when opening files
    pub buffer_size: usize,
    /// Memory limit for buffering (in bytes); a chunk is cut short once the
    /// buffered data exceeds this budget
    pub memory_limit: usize,
}
1272
1273impl Default for StreamingCsvConfig {
1274    fn default() -> Self {
1275        Self {
1276            csv_config: CsvReaderConfig::default(),
1277            chunk_size: 10000,               // 10K rows per chunk
1278            num_workers: 0,                  // Use all cores
1279            buffer_size: 64 * 1024,          // 64KB buffer
1280            memory_limit: 512 * 1024 * 1024, // 512MB memory limit
1281        }
1282    }
1283}
1284
/// Streaming CSV reader for processing large files
///
/// Reads the input incrementally via `read_chunk` so files larger than
/// memory can be processed chunk by chunk.
pub struct StreamingCsvReader<R: BufRead> {
    // Underlying buffered source of CSV text
    reader: R,
    // Streaming + CSV parsing configuration
    config: StreamingCsvConfig,
    // Header row, populated during construction when `has_header` is set
    headers: Option<Vec<String>>,
    // Lines consumed and counted so far (skip rows, the header, and buffered
    // data rows; skipped blank/comment lines are not counted)
    current_line: usize,
    // Raw (unparsed) lines accumulated for the current chunk
    buffer: Vec<String>,
    // Set once EOF is reached; subsequent `read_chunk` calls return `None`
    finished: bool,
}
1294
1295impl<R: BufRead> StreamingCsvReader<R> {
1296    /// Create a new streaming CSV reader
1297    pub fn new(reader: R, config: StreamingCsvConfig) -> Result<Self> {
1298        let mut reader = Self {
1299            reader,
1300            config,
1301            headers: None,
1302            current_line: 0,
1303            buffer: Vec::new(),
1304            finished: false,
1305        };
1306
1307        // Read headers if configured
1308        if reader.config.csv_config.has_header {
1309            reader.read_headers()?;
1310        }
1311
1312        Ok(reader)
1313    }
1314
1315    /// Read the header row
1316    fn read_headers(&mut self) -> Result<()> {
1317        // Skip initial rows
1318        for _ in 0..self.config.csv_config.skip_rows {
1319            let mut line = String::new();
1320            if self
1321                .reader
1322                .read_line(&mut line)
1323                .map_err(|e| IoError::FileError(e.to_string()))?
1324                == 0
1325            {
1326                return Err(IoError::FormatError("Not enough rows in file".to_string()));
1327            }
1328            self.current_line += 1;
1329        }
1330
1331        // Read header row
1332        let mut header_line = String::new();
1333        if self
1334            .reader
1335            .read_line(&mut header_line)
1336            .map_err(|e| IoError::FileError(e.to_string()))?
1337            == 0
1338        {
1339            return Err(IoError::FormatError("Empty file".to_string()));
1340        }
1341
1342        self.headers = Some(parse_csv_line(header_line.trim(), &self.config.csv_config));
1343        self.current_line += 1;
1344        Ok(())
1345    }
1346
1347    /// Get the headers (if available)
1348    pub fn headers(&self) -> Option<&Vec<String>> {
1349        self.headers.as_ref()
1350    }
1351
1352    /// Read the next chunk of data
1353    pub fn read_chunk(&mut self) -> Result<Option<Array2<String>>> {
1354        if self.finished {
1355            return Ok(None);
1356        }
1357
1358        self.buffer.clear();
1359        let mut rows_read = 0;
1360        let mut line = String::new();
1361
1362        // Read chunk_size rows
1363        while rows_read < self.config.chunk_size {
1364            line.clear();
1365            let bytes_read = self
1366                .reader
1367                .read_line(&mut line)
1368                .map_err(|e| IoError::FileError(e.to_string()))?;
1369
1370            if bytes_read == 0 {
1371                // End of file
1372                self.finished = true;
1373                break;
1374            }
1375
1376            let trimmed_line = line.trim();
1377
1378            // Skip empty lines and comments
1379            if trimmed_line.is_empty() {
1380                continue;
1381            }
1382
1383            if let Some(comment_char) = self.config.csv_config.comment_char {
1384                if trimmed_line.starts_with(comment_char) {
1385                    continue;
1386                }
1387            }
1388
1389            self.buffer.push(trimmed_line.to_string());
1390            rows_read += 1;
1391            self.current_line += 1;
1392
1393            // Check memory limit
1394            if self.buffer.len() * line.len() > self.config.memory_limit {
1395                break;
1396            }
1397        }
1398
1399        if self.buffer.is_empty() {
1400            return Ok(None);
1401        }
1402
1403        // Parse all rows in the buffer
1404        let parsed_rows: Vec<Vec<String>> = self
1405            .buffer
1406            .iter()
1407            .map(|line| parse_csv_line(line, &self.config.csv_config))
1408            .collect();
1409
1410        if parsed_rows.is_empty() {
1411            return Ok(None);
1412        }
1413
1414        // Check column consistency
1415        let num_cols = parsed_rows[0].len();
1416        for (i, row) in parsed_rows.iter().enumerate() {
1417            if row.len() != num_cols {
1418                return Err(IoError::FormatError(format!(
1419                    "Inconsistent columns at line {line}: got {actual}, expected {expected}",
1420                    line = self.current_line - self.buffer.len() + i,
1421                    actual = row.len(),
1422                    expected = num_cols
1423                )));
1424            }
1425        }
1426
1427        // Convert to Array2
1428        let num_rows = parsed_rows.len();
1429        let mut data = Array2::from_elem((num_rows, num_cols), String::new());
1430
1431        for (i, row) in parsed_rows.iter().enumerate() {
1432            for (j, value) in row.iter().enumerate() {
1433                data[[i, j]] = value.clone();
1434            }
1435        }
1436
1437        Ok(Some(data))
1438    }
1439
1440    /// Get current line number
1441    pub fn current_line(&self) -> usize {
1442        self.current_line
1443    }
1444
1445    /// Check if the reader is finished
1446    pub fn is_finished(&self) -> bool {
1447        self.finished
1448    }
1449}
1450
1451/// Create a streaming CSV reader from a file path
1452#[allow(dead_code)]
1453pub fn streaming_reader_from_file<P: AsRef<Path>>(
1454    path: P,
1455    config: StreamingCsvConfig,
1456) -> Result<StreamingCsvReader<BufReader<File>>> {
1457    let file = File::open(path).map_err(|e| IoError::FileError(e.to_string()))?;
1458    let reader = BufReader::with_capacity(config.buffer_size, file);
1459    StreamingCsvReader::new(reader, config)
1460}
1461
/// Statistics collected during streaming processing
#[derive(Debug, Clone)]
pub struct StreamingStats {
    /// Total rows processed (rows of successfully processed chunks only)
    pub rows_processed: usize,
    /// Total chunks processed successfully
    pub chunks_processed: usize,
    /// Total wall-clock time taken (milliseconds)
    pub total_time_ms: f64,
    /// Average rows per second over the whole run
    pub rows_per_second: f64,
    /// Peak memory usage (bytes) — a rough estimate based on container
    /// element count, not actual string contents
    pub peak_memory_bytes: usize,
    /// Number of chunks whose processing callback returned an error
    pub error_count: usize,
}
1478
1479/// Process a CSV file in streaming mode with a custom function
1480#[allow(dead_code)]
1481pub fn process_csv_streaming<P: AsRef<Path>, F, R>(
1482    path: P,
1483    config: StreamingCsvConfig,
1484    mut processor: F,
1485) -> Result<(Vec<R>, StreamingStats)>
1486where
1487    F: FnMut(&Array2<String>, Option<&Vec<String>>) -> Result<R> + Send + Sync,
1488    R: Send,
1489{
1490    let start_time = std::time::Instant::now();
1491    let mut reader = streaming_reader_from_file(path, config)?;
1492
1493    let mut results = Vec::new();
1494    let mut stats = StreamingStats {
1495        rows_processed: 0,
1496        chunks_processed: 0,
1497        total_time_ms: 0.0,
1498        rows_per_second: 0.0,
1499        peak_memory_bytes: 0,
1500        error_count: 0,
1501    };
1502
1503    let headers = reader.headers().cloned();
1504
1505    while let Some(chunk) = reader.read_chunk()? {
1506        match processor(&chunk, headers.as_ref()) {
1507            Ok(result) => {
1508                results.push(result);
1509                stats.rows_processed += chunk.nrows();
1510                stats.chunks_processed += 1;
1511            }
1512            Err(_) => {
1513                stats.error_count += 1;
1514            }
1515        }
1516
1517        // Update peak memory (rough estimate)
1518        let current_memory = chunk.len() * std::mem::size_of::<String>();
1519        if current_memory > stats.peak_memory_bytes {
1520            stats.peak_memory_bytes = current_memory;
1521        }
1522    }
1523
1524    let elapsed = start_time.elapsed();
1525    stats.total_time_ms = elapsed.as_secs_f64() * 1000.0;
1526    stats.rows_per_second = stats.rows_processed as f64 / elapsed.as_secs_f64();
1527
1528    Ok((results, stats))
1529}
1530
1531/// Process a CSV file in parallel streaming mode
1532#[allow(dead_code)]
1533pub fn process_csv_streaming_parallel<P: AsRef<Path>, F, R>(
1534    path: P,
1535    config: StreamingCsvConfig,
1536    processor: F,
1537) -> Result<(Vec<R>, StreamingStats)>
1538where
1539    F: Fn(&Array2<String>, Option<&Vec<String>>) -> Result<R> + Send + Sync + Clone,
1540    R: Send + 'static,
1541{
1542    let start_time = std::time::Instant::now();
1543    let mut reader = streaming_reader_from_file(path, config)?;
1544
1545    let headers = reader.headers().cloned();
1546    let mut chunks = Vec::new();
1547
1548    // Read all chunks first
1549    while let Some(chunk) = reader.read_chunk()? {
1550        chunks.push(chunk);
1551    }
1552
1553    if chunks.is_empty() {
1554        return Ok((
1555            Vec::new(),
1556            StreamingStats {
1557                rows_processed: 0,
1558                chunks_processed: 0,
1559                total_time_ms: 0.0,
1560                rows_per_second: 0.0,
1561                peak_memory_bytes: 0,
1562                error_count: 0,
1563            },
1564        ));
1565    }
1566
1567    // Process chunks in parallel
1568    let error_count = Arc::new(Mutex::new(0));
1569    let peak_memory = Arc::new(Mutex::new(0));
1570
1571    let results: Vec<R> = chunks
1572        .into_par_iter()
1573        .filter_map(|chunk| {
1574            // Update peak memory
1575            let memory_usage = chunk.len() * std::mem::size_of::<String>();
1576            {
1577                let mut peak = peak_memory.lock().unwrap();
1578                if memory_usage > *peak {
1579                    *peak = memory_usage;
1580                }
1581            }
1582
1583            match processor(&chunk, headers.as_ref()) {
1584                Ok(result) => Some(result),
1585                Err(_) => {
1586                    *error_count.lock().unwrap() += 1;
1587                    None
1588                }
1589            }
1590        })
1591        .collect();
1592
1593    let elapsed = start_time.elapsed();
1594    let total_rows: usize = results.len(); // This is actually chunks, but for simplicity
1595
1596    let stats = StreamingStats {
1597        rows_processed: total_rows,
1598        chunks_processed: results.len(),
1599        total_time_ms: elapsed.as_secs_f64() * 1000.0,
1600        rows_per_second: total_rows as f64 / elapsed.as_secs_f64(),
1601        peak_memory_bytes: *peak_memory.lock().unwrap(),
1602        error_count: *error_count.lock().unwrap(),
1603    };
1604
1605    Ok((results, stats))
1606}
1607
1608/// Convert CSV chunks to numeric data in streaming mode
1609#[allow(dead_code)]
1610pub fn read_csv_numeric_streaming<P: AsRef<Path>>(
1611    path: P,
1612    config: StreamingCsvConfig,
1613) -> Result<(Option<Vec<String>>, Vec<Array2<f64>>, StreamingStats)> {
1614    let (chunks, stats) = process_csv_streaming(path, config, |chunk, _headers| {
1615        // Convert chunk to numeric
1616        let shape = chunk.shape();
1617        let mut numeric_chunk = Array2::<f64>::zeros((shape[0], shape[1]));
1618
1619        for i in 0..shape[0] {
1620            for j in 0..shape[1] {
1621                let value = chunk[[i, j]].parse::<f64>().map_err(|_| {
1622                    IoError::FormatError(format!(
1623                        "Could not convert '{value}' to number at [{row}, {col}]",
1624                        value = chunk[[i, j]],
1625                        row = i,
1626                        col = j
1627                    ))
1628                })?;
1629                numeric_chunk[[i, j]] = value;
1630            }
1631        }
1632
1633        Ok(numeric_chunk)
1634    })?;
1635
1636    // Get headers from the first successful read
1637    let headers = chunks.first().and({
1638        // This is a simplified approach - in a real implementation,
1639        // we'd need to preserve headers from the reader
1640        None
1641    });
1642
1643    Ok((headers, chunks, stats))
1644}
1645
1646/// Aggregate statistics across CSV chunks
1647#[allow(dead_code)]
1648pub fn aggregate_csv_statistics<P: AsRef<Path>>(
1649    path: P,
1650    config: StreamingCsvConfig,
1651) -> Result<Vec<ColumnStats>> {
1652    let mut column_stats: Vec<Option<ColumnStats>> = Vec::new();
1653
1654    let _results_stats = process_csv_streaming(path, config, |chunk, _headers| {
1655        let shape = chunk.shape();
1656
1657        // Initialize column stats if needed
1658        if column_stats.is_empty() {
1659            column_stats = vec![None; shape[1]];
1660        }
1661
1662        // Update statistics for each column
1663        for col_idx in 0..shape[1] {
1664            let mut values = Vec::new();
1665            let mut non_numeric_count = 0;
1666
1667            for row_idx in 0..shape[0] {
1668                let cell_value = &chunk[[row_idx, col_idx]];
1669                if let Ok(numeric_value) = cell_value.parse::<f64>() {
1670                    values.push(numeric_value);
1671                } else {
1672                    non_numeric_count += 1;
1673                }
1674            }
1675
1676            if !values.is_empty() {
1677                let current_stats = ColumnStats::from_values(&values, non_numeric_count);
1678
1679                match &column_stats[col_idx] {
1680                    None => column_stats[col_idx] = Some(current_stats),
1681                    Some(existing) => {
1682                        column_stats[col_idx] = Some(existing.merge(&current_stats));
1683                    }
1684                }
1685            }
1686        }
1687
1688        Ok(())
1689    })?;
1690
1691    Ok(column_stats.into_iter().flatten().collect())
1692}
1693
/// Column statistics for CSV analysis
#[derive(Debug, Clone)]
pub struct ColumnStats {
    /// Column index; initialized to 0 by the constructors and expected to be
    /// assigned by the consumer that knows the column position
    pub column_index: usize,
    /// Total number of values (numeric + non-numeric)
    pub total_count: usize,
    /// Number of non-numeric values
    pub non_numeric_count: usize,
    /// Minimum value (for numeric columns; `None` when no numeric values)
    pub min_value: Option<f64>,
    /// Maximum value (for numeric columns; `None` when no numeric values)
    pub max_value: Option<f64>,
    /// Mean value (for numeric columns)
    pub mean_value: Option<f64>,
    /// Standard deviation (population, i.e. divide-by-n; for numeric columns)
    pub std_dev: Option<f64>,
}
1712
1713impl ColumnStats {
1714    /// Create column statistics from a vector of values
1715    fn from_values(values: &[f64], non_numericcount: usize) -> Self {
1716        if values.is_empty() {
1717            return Self {
1718                column_index: 0,
1719                total_count: non_numericcount,
1720                non_numeric_count: non_numericcount,
1721                min_value: None,
1722                max_value: None,
1723                mean_value: None,
1724                std_dev: None,
1725            };
1726        }
1727
1728        let min_val = values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
1729        let max_val = values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
1730        let mean = values.iter().sum::<f64>() / values.len() as f64;
1731
1732        let variance = values.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / values.len() as f64;
1733        let std_dev = variance.sqrt();
1734
1735        Self {
1736            column_index: 0,
1737            total_count: values.len() + non_numericcount,
1738            non_numeric_count: non_numericcount,
1739            min_value: Some(min_val),
1740            max_value: Some(max_val),
1741            mean_value: Some(mean),
1742            std_dev: Some(std_dev),
1743        }
1744    }
1745
1746    /// Merge statistics from another column
1747    fn merge(&self, other: &Self) -> Self {
1748        let total_numeric_self = self.total_count - self.non_numeric_count;
1749        let total_numeric_other = other.total_count - other.non_numeric_count;
1750        let combined_numeric = total_numeric_self + total_numeric_other;
1751
1752        if combined_numeric == 0 {
1753            return Self {
1754                column_index: self.column_index,
1755                total_count: self.total_count + other.total_count,
1756                non_numeric_count: self.non_numeric_count + other.non_numeric_count,
1757                min_value: None,
1758                max_value: None,
1759                mean_value: None,
1760                std_dev: None,
1761            };
1762        }
1763
1764        let min_val = match (self.min_value, other.min_value) {
1765            (Some(a), Some(b)) => Some(a.min(b)),
1766            (Some(a), None) => Some(a),
1767            (None, Some(b)) => Some(b),
1768            (None, None) => None,
1769        };
1770
1771        let max_val = match (self.max_value, other.max_value) {
1772            (Some(a), Some(b)) => Some(a.max(b)),
1773            (Some(a), None) => Some(a),
1774            (None, Some(b)) => Some(b),
1775            (None, None) => None,
1776        };
1777
1778        // Combine means (weighted average)
1779        let combined_mean = match (self.mean_value, other.mean_value) {
1780            (Some(mean1), Some(mean2)) => {
1781                let w1 = total_numeric_self as f64;
1782                let w2 = total_numeric_other as f64;
1783                Some((mean1 * w1 + mean2 * w2) / (w1 + w2))
1784            }
1785            (Some(mean), None) => Some(mean),
1786            (None, Some(mean)) => Some(mean),
1787            (None, None) => None,
1788        };
1789
1790        // For standard deviation, we'd need to properly combine the variances,
1791        // but for simplicity, we'll use a rough approximation
1792        let combined_std = match (self.std_dev, other.std_dev) {
1793            (Some(std1), Some(std2)) => Some((std1 + std2) / 2.0),
1794            (Some(std), None) | (None, Some(std)) => Some(std),
1795            (None, None) => None,
1796        };
1797
1798        Self {
1799            column_index: self.column_index,
1800            total_count: self.total_count + other.total_count,
1801            non_numeric_count: self.non_numeric_count + other.non_numeric_count,
1802            min_value: min_val,
1803            max_value: max_val,
1804            mean_value: combined_mean,
1805            std_dev: combined_std,
1806        }
1807    }
1808}