1use super::functions::*;
6use crate::{Error, Result};
7use std::fs::{File, OpenOptions};
8use std::io::{BufRead, BufReader, BufWriter, Write};
9use std::path::Path;
10
/// Formatting options consumed by `ConfigurableCsvWriter`.
#[derive(Debug, Clone)]
pub struct CsvWriterConfig {
    /// Character placed between adjacent fields on a line.
    pub delimiter: char,
    /// Text appended after every emitted line (header and data rows alike).
    pub line_ending: String,
    /// When `true`, header and string fields are always wrapped in double
    /// quotes with embedded quotes doubled; when `false` they are passed
    /// through `quote_field`. Numeric rows are written unquoted either way.
    pub quote_all: bool,
    /// Number of digits after the decimal point when formatting `f64` values.
    pub precision: usize,
}
/// Character-level CSV parser over a borrowed input string.
pub struct CsvParser<'a> {
    /// Complete CSV text to parse.
    pub(super) input: &'a str,
    /// Field separator recognised outside quoted sections.
    pub(super) delimiter: char,
    /// When set, records whose first field (ignoring leading whitespace)
    /// starts with this character are dropped from the output.
    pub(super) comment_prefix: Option<char>,
}
49impl<'a> CsvParser<'a> {
50 pub fn new(input: &'a str, delimiter: char) -> Self {
52 Self {
53 input,
54 delimiter,
55 comment_prefix: None,
56 }
57 }
58 pub fn with_comment_prefix(mut self, prefix: char) -> Self {
60 self.comment_prefix = Some(prefix);
61 self
62 }
63 pub fn parse_all(self) -> std::result::Result<Vec<CsvRecord>, Error> {
67 let mut records = Vec::new();
68 let mut chars = self.input.chars().peekable();
69 'outer: loop {
70 if chars.peek().is_none() {
71 break;
72 }
73 let mut fields: Vec<String> = Vec::new();
74 let mut field = String::new();
75 let mut in_quotes = false;
76 loop {
77 match chars.next() {
78 None => {
79 if in_quotes {
80 return Err(Error::Parse(
81 "unterminated quoted field at EOF".to_string(),
82 ));
83 }
84 fields.push(field);
85 break;
86 }
87 Some('"') if !in_quotes => {
88 in_quotes = true;
89 }
90 Some('"') if in_quotes => {
91 if chars.peek() == Some(&'"') {
92 chars.next();
93 field.push('"');
94 } else {
95 in_quotes = false;
96 }
97 }
98 Some('\\') if in_quotes => match chars.next() {
99 Some('n') => field.push('\n'),
100 Some('t') => field.push('\t'),
101 Some('r') => field.push('\r'),
102 Some('"') => field.push('"'),
103 Some('\\') => field.push('\\'),
104 Some(c) => {
105 field.push('\\');
106 field.push(c);
107 }
108 None => {
109 return Err(Error::Parse("trailing backslash at EOF".to_string()));
110 }
111 },
112 Some('\r') if !in_quotes => {
113 if chars.peek() == Some(&'\n') {
114 chars.next();
115 }
116 fields.push(field);
117 break;
118 }
119 Some('\n') if !in_quotes => {
120 fields.push(field);
121 break;
122 }
123 Some(c) if c == self.delimiter && !in_quotes => {
124 fields.push(field.clone());
125 field = String::new();
126 }
127 Some(c) => {
128 field.push(c);
129 }
130 }
131 }
132 if let Some(prefix) = self.comment_prefix
133 && fields
134 .first()
135 .map(|f| f.trim_start().starts_with(prefix))
136 .unwrap_or(false)
137 {
138 continue 'outer;
139 }
140 if fields.len() == 1 && fields[0].trim().is_empty() {
141 continue 'outer;
142 }
143 records.push(CsvRecord { fields });
144 }
145 Ok(records)
146 }
147}
/// One parsed CSV record, stored as its ordered list of field strings.
#[derive(Debug, Clone, PartialEq)]
pub struct CsvRecord {
    /// Field values in the order they appeared in the record.
    pub fields: Vec<String>,
}

impl CsvRecord {
    /// Number of fields held by this record.
    pub fn len(&self) -> usize {
        let Self { fields } = self;
        fields.len()
    }

    /// `true` when the record holds no fields at all.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the field at `index`, or the empty string when `index` is
    /// past the last field.
    pub fn get(&self, index: usize) -> &str {
        match self.fields.get(index) {
            Some(field) => field.as_str(),
            None => "",
        }
    }
}
/// Aggregation selector.
///
/// NOTE(review): nothing in this part of the file consumes this enum;
/// presumably it chooses how grouped values are combined when pivoting a
/// table — confirm against the code that matches on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PivotAgg {
    /// Presumably the total of the grouped values.
    Sum,
    /// Presumably the arithmetic mean of the grouped values.
    Mean,
    /// Presumably the number of grouped values.
    Count,
    /// Presumably the smallest grouped value.
    Min,
    /// Presumably the largest grouped value.
    Max,
}
182#[derive(Debug, Clone)]
187pub struct CsvTable {
188 pub headers: Vec<String>,
190 pub rows: Vec<Vec<String>>,
192}
193impl CsvTable {
194 pub fn new(headers: Vec<String>) -> Self {
196 Self {
197 headers,
198 rows: Vec::new(),
199 }
200 }
201 pub fn parse(data: &str, delimiter: char) -> std::result::Result<Self, Error> {
203 let parser = CsvParser::new(data, delimiter).with_comment_prefix('#');
204 let mut records = parser.parse_all()?;
205 if records.is_empty() {
206 return Err(Error::Parse("CSV table is empty".to_string()));
207 }
208 let header_rec = records.remove(0);
209 let headers: Vec<String> = header_rec
210 .fields
211 .iter()
212 .map(|s| s.trim().to_string())
213 .collect();
214 let ncols = headers.len();
215 let mut rows = Vec::new();
216 for rec in records {
217 let mut row: Vec<String> = rec
218 .fields
219 .into_iter()
220 .map(|s| s.trim().to_string())
221 .collect();
222 while row.len() < ncols {
223 row.push(String::new());
224 }
225 rows.push(row);
226 }
227 Ok(Self { headers, rows })
228 }
229 pub fn to_csv_string(&self, delimiter: char) -> String {
231 let mut out = String::new();
232 out.push_str(&self.headers.join(&delimiter.to_string()));
233 out.push('\n');
234 for row in &self.rows {
235 let line: Vec<String> = row.iter().map(|f| quote_field(f, delimiter)).collect();
236 out.push_str(&line.join(&delimiter.to_string()));
237 out.push('\n');
238 }
239 out
240 }
241 pub fn column_index(&self, name: &str) -> std::result::Result<usize, Error> {
243 self.headers
244 .iter()
245 .position(|h| h == name)
246 .ok_or_else(|| Error::Parse(format!("column '{}' not found", name)))
247 }
248 pub fn column_values(&self, name: &str) -> std::result::Result<Vec<&str>, Error> {
250 let idx = self.column_index(name)?;
251 Ok(self.rows.iter().map(|r| r[idx].as_str()).collect())
252 }
253 pub fn column_f64(&self, name: &str) -> std::result::Result<Vec<f64>, Error> {
255 let idx = self.column_index(name)?;
256 self.rows
257 .iter()
258 .enumerate()
259 .map(|(i, r)| {
260 let s = r[idx].trim();
261 if s.is_empty() {
262 Ok(f64::NAN)
263 } else {
264 s.parse::<f64>().map_err(|_| {
265 Error::Parse(format!("row {}: cannot parse '{}' as f64", i, s))
266 })
267 }
268 })
269 .collect()
270 }
271 pub fn row_count(&self) -> usize {
273 self.rows.len()
274 }
275 pub fn col_count(&self) -> usize {
277 self.headers.len()
278 }
279}
/// Formats rows of `f64` values as delimited text held in memory.
pub struct InMemoryCsvWriter {
    /// Column names; also fixes the number of values expected per row.
    pub(super) columns: Vec<String>,
    /// Separator placed between values and between column names.
    pub(super) delimiter: char,
    /// Digits after the decimal point for every formatted value (`new` sets 6).
    pub(super) precision: usize,
    /// Rows collected through `add_row`. Note that `write_all` formats the
    /// rows passed to it and does not read this buffer.
    pub(super) rows: Vec<Vec<f64>>,
}
287impl InMemoryCsvWriter {
288 pub fn new(columns: &[&str], delimiter: char) -> Self {
290 Self {
291 columns: columns.iter().map(|s| s.to_string()).collect(),
292 delimiter,
293 precision: 6,
294 rows: Vec::new(),
295 }
296 }
297 pub fn with_precision(mut self, precision: usize) -> Self {
299 self.precision = precision;
300 self
301 }
302 pub fn write_row(&self, values: &[f64]) -> std::result::Result<String, Error> {
304 if values.len() != self.columns.len() {
305 return Err(Error::Parse(format!(
306 "expected {} values, got {}",
307 self.columns.len(),
308 values.len()
309 )));
310 }
311 let parts: Vec<String> = values
312 .iter()
313 .map(|v| format!("{:.prec$}", v, prec = self.precision))
314 .collect();
315 Ok(parts.join(&self.delimiter.to_string()))
316 }
317 pub fn write_header(&self) -> String {
319 self.columns.join(&self.delimiter.to_string())
320 }
321 pub fn add_row(&mut self, values: Vec<f64>) {
323 self.rows.push(values);
324 }
325 pub fn write_all(&self, rows: &[Vec<f64>]) -> String {
327 let mut out = self.write_header();
328 out.push('\n');
329 for row in rows {
330 if let Ok(line) = self.write_row(row) {
331 out.push_str(&line);
332 out.push('\n');
333 }
334 }
335 out
336 }
337}
338pub struct CsvReader;
340impl CsvReader {
341 pub fn read(path: &str) -> Result<(Vec<String>, Vec<Vec<f64>>)> {
345 let file = File::open(Path::new(path))?;
346 let reader = BufReader::new(file);
347 let mut lines = reader.lines();
348 let header_line = lines
349 .next()
350 .ok_or_else(|| Error::Parse("empty CSV file".to_string()))??;
351 let headers: Vec<String> = header_line
352 .split(',')
353 .map(|s| s.trim().to_string())
354 .collect();
355 let mut rows = Vec::new();
356 for line in lines {
357 let line = line?;
358 let trimmed = line.trim();
359 if trimmed.is_empty() {
360 continue;
361 }
362 let row: std::result::Result<Vec<f64>, _> = trimmed
363 .split(',')
364 .map(|s| s.trim().parse::<f64>())
365 .collect();
366 let row = row.map_err(|e| Error::Parse(e.to_string()))?;
367 rows.push(row);
368 }
369 Ok((headers, rows))
370 }
371}
/// Column classification returned by `TypedCsvReader::column_type`, which
/// obtains it from `infer_column_type`.
///
/// NOTE(review): the inference rules live outside this part of the file; the
/// per-variant notes below are read off the variant names — confirm against
/// `infer_column_type`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnType {
    /// Presumably: values look like integers.
    Integer,
    /// Presumably: values look like floating-point numbers.
    Float,
    /// Presumably: values look like booleans.
    Boolean,
    /// Presumably: values are free-form text.
    Text,
    /// Presumably: the column has no non-empty values.
    Empty,
}
/// Reads CSV records from a file one at a time.
pub struct CsvStreamParser {
    /// Buffered handle on the file being parsed, positioned after the header line.
    pub(super) reader: BufReader<File>,
    /// Field separator handed to `split_csv_line`.
    pub(super) delimiter: char,
    /// Trimmed header names taken from the first line of the file.
    pub(super) headers: Vec<String>,
    /// Raw text of the record currently being assembled; spans several
    /// physical lines while a quoted field is still open.
    pub(super) pending: String,
    /// `true` while an odd number of `"` characters has been seen in
    /// `pending`, i.e. the record continues on the next line.
    pub(super) open_quotes: bool,
}
412impl CsvStreamParser {
413 pub fn open(path: &str, delimiter: char) -> std::result::Result<Self, Error> {
415 let file = File::open(Path::new(path))?;
416 let mut reader = BufReader::new(file);
417 let mut header_line = String::new();
418 reader.read_line(&mut header_line)?;
419 let headers = split_csv_line(
420 header_line.trim_end_matches('\n').trim_end_matches('\r'),
421 delimiter,
422 )
423 .into_iter()
424 .map(|s| s.trim().to_string())
425 .collect();
426 Ok(Self {
427 reader,
428 delimiter,
429 headers,
430 pending: String::new(),
431 open_quotes: false,
432 })
433 }
434 pub fn headers(&self) -> &[String] {
436 &self.headers
437 }
438 pub fn next_record(&mut self) -> std::result::Result<Option<CsvRecord>, Error> {
442 loop {
443 let mut line = String::new();
444 let bytes_read = self.reader.read_line(&mut line)?;
445 if bytes_read == 0 {
446 if self.pending.is_empty() {
447 return Ok(None);
448 }
449 if self.open_quotes {
450 return Err(Error::Parse("unterminated quoted field at EOF".to_string()));
451 }
452 let record = self.flush_pending()?;
453 return Ok(Some(record));
454 }
455 for ch in line.chars() {
456 if ch == '"' {
457 self.open_quotes = !self.open_quotes;
458 }
459 }
460 self.pending.push_str(&line);
461 if !self.open_quotes {
462 let record = self.flush_pending()?;
463 if record.fields.len() == 1 && record.fields[0].trim().is_empty() {
464 continue;
465 }
466 return Ok(Some(record));
467 }
468 }
469 }
470 fn flush_pending(&mut self) -> std::result::Result<CsvRecord, Error> {
471 let line = std::mem::take(&mut self.pending);
472 let trimmed = line.trim_end_matches('\n').trim_end_matches('\r');
473 let fields = split_csv_line(trimmed, self.delimiter);
474 Ok(CsvRecord {
475 fields: fields.into_iter().map(|s| s.trim().to_string()).collect(),
476 })
477 }
478}
/// Numeric view of CSV text that has been parsed fully into memory.
pub struct InMemoryCsvReader {
    /// Trimmed column names from the first non-blank, non-comment line.
    pub(super) headers: Vec<String>,
    /// Parsed rows; a cell is `None` when it was blank or not a valid `f64`.
    pub(super) rows: Vec<Vec<Option<f64>>>,
}
490impl InMemoryCsvReader {
491 pub fn parse_with_delimiter(data: &str, delim: char) -> std::result::Result<Self, Error> {
493 let mut non_empty_lines: Vec<&str> = data
494 .lines()
495 .filter(|l| {
496 let t = l.trim();
497 !t.is_empty() && !t.starts_with('#')
498 })
499 .collect();
500 if non_empty_lines.is_empty() {
501 return Err(Error::Parse("CSV input is empty".to_string()));
502 }
503 let header_line = non_empty_lines.remove(0);
504 let headers: Vec<String> = split_csv_line(header_line, delim)
505 .into_iter()
506 .map(|s| s.trim().to_string())
507 .collect();
508 if headers.is_empty() {
509 return Err(Error::Parse("no headers found".to_string()));
510 }
511 let mut rows: Vec<Vec<Option<f64>>> = Vec::new();
512 for line in &non_empty_lines {
513 let fields = split_csv_line(line, delim);
514 let parsed: Vec<Option<f64>> = fields
515 .iter()
516 .map(|f| {
517 let t = f.trim();
518 if t.is_empty() {
519 None
520 } else {
521 t.parse::<f64>().ok()
522 }
523 })
524 .collect();
525 rows.push(parsed);
526 }
527 Ok(Self { headers, rows })
528 }
529 pub fn get_column_f64(&self, name: &str) -> std::result::Result<Vec<f64>, Error> {
533 let idx = self
534 .headers
535 .iter()
536 .position(|h| h == name)
537 .ok_or_else(|| Error::Parse(format!("column '{}' not found", name)))?;
538 let col: Vec<f64> = self
539 .rows
540 .iter()
541 .map(|row| row.get(idx).copied().flatten().unwrap_or(f64::NAN))
542 .collect();
543 Ok(col)
544 }
545 pub fn get_row_count(&self) -> usize {
547 self.rows.len()
548 }
549 pub fn headers(&self) -> &[String] {
551 &self.headers
552 }
553 pub fn column_stats(&self, name: &str) -> std::result::Result<(f64, f64, f64, f64), Error> {
555 let col = self.get_column_f64(name)?;
556 let valid: Vec<f64> = col.into_iter().filter(|v| !v.is_nan()).collect();
557 if valid.is_empty() {
558 return Err(Error::Parse(format!(
559 "column '{}' has no valid numeric data",
560 name
561 )));
562 }
563 let n = valid.len() as f64;
564 let min = valid.iter().cloned().fold(f64::INFINITY, f64::min);
565 let max = valid.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
566 let mean = valid.iter().sum::<f64>() / n;
567 let variance = valid.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / n;
568 let std = variance.sqrt();
569 Ok((min, max, mean, std))
570 }
571}
572impl std::str::FromStr for InMemoryCsvReader {
573 type Err = Error;
574 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
575 Self::parse_with_delimiter(s, ',')
576 }
577}
/// Typed column access layered over a parsed `CsvTable`.
pub struct TypedCsvReader {
    /// The underlying string table that all typed accessors read from.
    pub(super) table: CsvTable,
}
597impl TypedCsvReader {
598 pub fn with_delimiter(data: &str, delimiter: char) -> std::result::Result<Self, Error> {
600 let table = CsvTable::parse(data, delimiter)?;
601 Ok(Self { table })
602 }
603 pub fn column_type(&self, name: &str) -> std::result::Result<ColumnType, Error> {
605 let idx = self.table.column_index(name)?;
606 let values: Vec<&str> = self.table.rows.iter().map(|r| r[idx].as_str()).collect();
607 Ok(infer_column_type(&values))
608 }
609 pub fn column_as_i64(&self, name: &str) -> std::result::Result<Vec<i64>, Error> {
611 let idx = self.table.column_index(name)?;
612 self.table
613 .rows
614 .iter()
615 .enumerate()
616 .map(|(i, r)| {
617 let s = r[idx].trim();
618 s.parse::<i64>()
619 .map_err(|_| Error::Parse(format!("row {}: cannot parse '{}' as i64", i, s)))
620 })
621 .collect()
622 }
623 pub fn column_as_f64(&self, name: &str) -> std::result::Result<Vec<f64>, Error> {
625 self.table.column_f64(name)
626 }
627 pub fn column_as_bool(&self, name: &str) -> std::result::Result<Vec<bool>, Error> {
631 let idx = self.table.column_index(name)?;
632 self.table
633 .rows
634 .iter()
635 .enumerate()
636 .map(|(i, r)| {
637 let s = r[idx].trim().to_lowercase();
638 match s.as_str() {
639 "true" | "1" | "yes" => Ok(true),
640 "false" | "0" | "no" => Ok(false),
641 other => Err(Error::Parse(format!(
642 "row {}: cannot parse '{}' as bool",
643 i, other
644 ))),
645 }
646 })
647 .collect()
648 }
649 pub fn table(&self) -> &CsvTable {
651 &self.table
652 }
653 pub fn headers(&self) -> &[String] {
655 &self.table.headers
656 }
657 pub fn row_count(&self) -> usize {
659 self.table.row_count()
660 }
661}
662impl std::str::FromStr for TypedCsvReader {
663 type Err = Error;
664 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
665 let table = CsvTable::parse(s, ',')?;
666 Ok(Self { table })
667 }
668}
/// Result container for a comparison between two CSV tables.
///
/// NOTE(review): the code that fills this struct is not in this part of the
/// file; the field notes below are read off the field names — confirm
/// against the diff routine.
#[derive(Debug, Clone)]
pub struct CsvDiff {
    /// Presumably rows present only in the first ("before") table.
    pub removed: Vec<Vec<String>>,
    /// Presumably rows present only in the second ("after") table.
    pub added: Vec<Vec<String>>,
    /// Rows recorded as changed, each with its key and both versions.
    pub changed: Vec<CsvChangedRow>,
}
/// Writes rows of `f64` values to a comma-separated file on disk.
pub struct CsvWriter {
    /// Buffered handle on the output file; flushed after every row.
    pub(super) writer: BufWriter<File>,
}
683impl CsvWriter {
684 pub fn new(path: &str, headers: &[&str]) -> Result<Self> {
686 let file = File::create(Path::new(path))?;
687 let mut writer = BufWriter::new(file);
688 writeln!(writer, "{}", headers.join(","))?;
689 writer.flush()?;
690 let file = OpenOptions::new().append(true).open(Path::new(path))?;
691 let writer = BufWriter::new(file);
692 Ok(Self { writer })
693 }
694 pub fn write_row(&mut self, values: &[f64]) -> Result<()> {
696 let row: Vec<String> = values.iter().map(|v| v.to_string()).collect();
697 writeln!(self.writer, "{}", row.join(","))?;
698 self.writer.flush()?;
699 Ok(())
700 }
701}
/// One entry of `CsvDiff::changed`.
///
/// NOTE(review): the code that builds these entries is not in this part of
/// the file; the field notes are read off the field names — confirm against
/// the diff routine.
#[derive(Debug, Clone)]
pub struct CsvChangedRow {
    /// Presumably the value identifying the row in both tables.
    pub key: String,
    /// Presumably the row's cells in the first ("before") table.
    pub before: Vec<String>,
    /// Presumably the row's cells in the second ("after") table.
    pub after: Vec<String>,
}
/// Builds CSV text in memory according to a `CsvWriterConfig`.
pub struct ConfigurableCsvWriter {
    /// Delimiter, line ending, quoting mode and float precision to apply.
    pub(super) config: CsvWriterConfig,
    /// Output accumulated so far; handed back by `finish`.
    pub(super) buffer: String,
}
731impl ConfigurableCsvWriter {
732 pub fn new(config: CsvWriterConfig) -> Self {
734 Self {
735 config,
736 buffer: String::new(),
737 }
738 }
739 pub fn write_header(&mut self, headers: &[&str]) {
741 let line = if self.config.quote_all {
742 headers
743 .iter()
744 .map(|h| format!("\"{}\"", h.replace('"', "\"\"")))
745 .collect::<Vec<_>>()
746 .join(&self.config.delimiter.to_string())
747 } else {
748 headers
749 .iter()
750 .map(|h| quote_field(h, self.config.delimiter))
751 .collect::<Vec<_>>()
752 .join(&self.config.delimiter.to_string())
753 };
754 self.buffer.push_str(&line);
755 self.buffer.push_str(&self.config.line_ending);
756 }
757 pub fn write_f64_row(&mut self, values: &[f64]) {
759 let prec = self.config.precision;
760 let line: Vec<String> = values
761 .iter()
762 .map(|v| format!("{:.prec$}", v, prec = prec))
763 .collect();
764 self.buffer
765 .push_str(&line.join(&self.config.delimiter.to_string()));
766 self.buffer.push_str(&self.config.line_ending);
767 }
768 pub fn write_str_row(&mut self, values: &[&str]) {
770 let delim = self.config.delimiter;
771 let line: Vec<String> = values
772 .iter()
773 .map(|v| {
774 if self.config.quote_all {
775 format!("\"{}\"", v.replace('"', "\"\""))
776 } else {
777 quote_field(v, delim)
778 }
779 })
780 .collect();
781 self.buffer
782 .push_str(&line.join(&self.config.delimiter.to_string()));
783 self.buffer.push_str(&self.config.line_ending);
784 }
785 pub fn finish(self) -> String {
787 self.buffer
788 }
789}