1use crate::column::Column;
7use crate::dataframe::{DataFrame, DataError};
8
/// Options controlling how delimited text is split into rows and fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CsvConfig {
    /// Byte separating fields within a line (`b','` for CSV, `b'\t'` for TSV).
    pub delimiter: u8,
    /// Whether the first non-blank line holds column names. When `false`,
    /// columns are named `col_0`, `col_1`, ... instead.
    pub has_header: bool,
    /// Maximum number of data rows (header excluded) to read; `None` reads all.
    pub max_rows: Option<usize>,
    /// Whether surrounding whitespace is stripped from header names and cells.
    pub trim_whitespace: bool,
}

impl Default for CsvConfig {
    /// Comma-separated, header row present, no row limit, whitespace trimmed.
    fn default() -> Self {
        Self {
            delimiter: b',',
            has_header: true,
            max_rows: None,
            trim_whitespace: true,
        }
    }
}
34
35pub struct CsvReader {
42 config: CsvConfig,
43}
44
/// Scalar type guessed for a CSV cell or column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InferredType {
    /// A value that parses as `i64`.
    Int,
    /// A non-integer value that parses as `f64`.
    Float,
    /// One of `true`, `false`, `1` or `0`.
    Bool,
    /// Anything else, including the empty string.
    Str,
}

/// Guesses the narrowest type able to represent a single cell.
///
/// Surrounding whitespace is ignored. Checks run from most to least specific:
/// `true`/`false`/`1`/`0` are `Bool`; other values that parse as `i64` are
/// `Int`; values accepted by `f64` parsing (including exponents like `1e5` and
/// the words `inf`/`nan`) are `Float`; everything else is `Str`.
fn infer_type(s: &str) -> InferredType {
    let t = s.trim();
    if matches!(t, "true" | "false" | "1" | "0") {
        InferredType::Bool
    } else if t.parse::<i64>().is_ok() {
        // Parsing rather than scanning for digits rejects integers that do not
        // fit in i64; those would otherwise be inferred as Int and read back as 0.
        InferredType::Int
    } else if t.parse::<f64>().is_ok() {
        InferredType::Float
    } else {
        InferredType::Str
    }
}
75
/// Splits one line into its delimiter-separated fields, borrowing from `row`.
///
/// There is no quoting or escaping: every `delimiter` byte starts a new field.
/// A single trailing `\r` (left over from CRLF line endings) is removed from the
/// last field only. A field that is not valid UTF-8 is returned as `""`.
/// The result always has at least one element; an empty row yields `[""]`.
fn split_fields<'a>(row: &'a [u8], delimiter: u8) -> Vec<&'a str> {
    let mut pieces: Vec<&'a [u8]> = row.split(|&byte| byte == delimiter).collect();
    if let Some(last) = pieces.pop() {
        pieces.push(last.strip_suffix(b"\r").unwrap_or(last));
    }
    pieces
        .into_iter()
        .map(|piece| std::str::from_utf8(piece).unwrap_or(""))
        .collect()
}
92
93impl CsvReader {
94 pub fn new(config: CsvConfig) -> Self {
96 CsvReader { config }
97 }
98
99 pub fn parse(&self, input: &[u8]) -> Result<DataFrame, DataError> {
101 if input.is_empty() {
102 return Ok(DataFrame::new());
103 }
104
105 let rows: Vec<&[u8]> = input
106 .split(|&b| b == b'\n')
107 .filter(|r| !r.is_empty() && *r != b"\r")
108 .collect();
109
110 if rows.is_empty() {
111 return Ok(DataFrame::new());
112 }
113
114 let delim = self.config.delimiter;
115
116 let (header_names, data_rows) = if self.config.has_header {
117 let names: Vec<String> = split_fields(rows[0], delim)
118 .into_iter()
119 .map(|s| {
120 if self.config.trim_whitespace {
121 s.trim().to_string()
122 } else {
123 s.to_string()
124 }
125 })
126 .collect();
127 (names, &rows[1..])
128 } else {
129 let ncols = split_fields(rows[0], delim).len();
130 let names: Vec<String> = (0..ncols).map(|i| format!("col_{}", i)).collect();
131 (names, &rows[..])
132 };
133
134 let ncols = header_names.len();
135 if ncols == 0 {
136 return Ok(DataFrame::new());
137 }
138
139 let data_rows = if let Some(max) = self.config.max_rows {
140 &data_rows[..data_rows.len().min(max)]
141 } else {
142 data_rows
143 };
144
145 if data_rows.is_empty() {
146 let columns: Vec<(String, Column)> = header_names
147 .into_iter()
148 .map(|name| (name, Column::Str(Vec::new())))
149 .collect();
150 return DataFrame::from_columns(columns);
151 }
152
153 let first_fields = split_fields(data_rows[0], delim);
154 let mut col_types: Vec<InferredType> = first_fields
155 .iter()
156 .map(|s| {
157 let s = if self.config.trim_whitespace {
158 s.trim()
159 } else {
160 *s
161 };
162 infer_type(s)
163 })
164 .collect();
165
166 while col_types.len() < ncols {
167 col_types.push(InferredType::Str);
168 }
169
170 let nrows = data_rows.len();
171 let mut int_bufs: Vec<Option<Vec<i64>>> = vec![None; ncols];
172 let mut float_bufs: Vec<Option<Vec<f64>>> = vec![None; ncols];
173 let mut bool_bufs: Vec<Option<Vec<bool>>> = vec![None; ncols];
174 let mut str_bufs: Vec<Option<Vec<String>>> = vec![None; ncols];
175
176 for (i, &t) in col_types.iter().enumerate() {
177 match t {
178 InferredType::Int => int_bufs[i] = Some(Vec::with_capacity(nrows)),
179 InferredType::Float => float_bufs[i] = Some(Vec::with_capacity(nrows)),
180 InferredType::Bool => bool_bufs[i] = Some(Vec::with_capacity(nrows)),
181 InferredType::Str => str_bufs[i] = Some(Vec::with_capacity(nrows)),
182 }
183 }
184
185 for &row_bytes in data_rows.iter() {
186 let fields = split_fields(row_bytes, delim);
187 for col_idx in 0..ncols {
188 let raw = if col_idx < fields.len() {
189 fields[col_idx]
190 } else {
191 ""
192 };
193 let s = if self.config.trim_whitespace {
194 raw.trim()
195 } else {
196 raw
197 };
198
199 match col_types[col_idx] {
200 InferredType::Int => {
201 let v = s.parse::<i64>().unwrap_or(0);
202 int_bufs[col_idx].as_mut().unwrap().push(v);
203 }
204 InferredType::Float => {
205 let v = s.parse::<f64>().unwrap_or(0.0);
206 float_bufs[col_idx].as_mut().unwrap().push(v);
207 }
208 InferredType::Bool => {
209 let v = matches!(s, "true" | "1");
210 bool_bufs[col_idx].as_mut().unwrap().push(v);
211 }
212 InferredType::Str => {
213 str_bufs[col_idx].as_mut().unwrap().push(s.to_string());
214 }
215 }
216 }
217 }
218
219 let mut columns: Vec<(String, Column)> = Vec::with_capacity(ncols);
220 for (i, name) in header_names.into_iter().enumerate() {
221 let col = match col_types[i] {
222 InferredType::Int => Column::Int(int_bufs[i].take().unwrap()),
223 InferredType::Float => Column::Float(float_bufs[i].take().unwrap()),
224 InferredType::Bool => Column::Bool(bool_bufs[i].take().unwrap()),
225 InferredType::Str => Column::Str(str_bufs[i].take().unwrap()),
226 };
227 columns.push((name, col));
228 }
229
230 DataFrame::from_columns(columns)
231 }
232}
233
234pub struct StreamingCsvProcessor {
239 config: CsvConfig,
240}
241
242impl StreamingCsvProcessor {
243 pub fn new(config: CsvConfig) -> Self {
245 StreamingCsvProcessor { config }
246 }
247
248 pub fn sum_columns(&self, input: &[u8]) -> Result<(Vec<String>, Vec<f64>, usize), DataError> {
252 if input.is_empty() {
253 return Ok((vec![], vec![], 0));
254 }
255
256 let rows: Vec<&[u8]> = input
257 .split(|&b| b == b'\n')
258 .filter(|r| !r.is_empty() && *r != b"\r")
259 .collect();
260
261 if rows.is_empty() {
262 return Ok((vec![], vec![], 0));
263 }
264
265 let delim = self.config.delimiter;
266 let (header_names, data_rows) = if self.config.has_header {
267 let names: Vec<String> = split_fields(rows[0], delim)
268 .into_iter()
269 .map(|s| s.trim().to_string())
270 .collect();
271 (names, &rows[1..])
272 } else {
273 let ncols = split_fields(rows[0], delim).len();
274 let names: Vec<String> = (0..ncols).map(|i| format!("col_{}", i)).collect();
275 (names, &rows[..])
276 };
277
278 let ncols = header_names.len();
279 let mut sums: Vec<f64> = vec![0.0; ncols];
280 let mut comp: Vec<f64> = vec![0.0; ncols];
281 let mut row_count = 0usize;
282
283 let data_rows = if let Some(max) = self.config.max_rows {
284 &data_rows[..data_rows.len().min(max)]
285 } else {
286 data_rows
287 };
288
289 for &row_bytes in data_rows {
290 let fields = split_fields(row_bytes, delim);
291 for col_idx in 0..ncols {
292 let s = if col_idx < fields.len() {
293 if self.config.trim_whitespace {
294 fields[col_idx].trim()
295 } else {
296 fields[col_idx]
297 }
298 } else {
299 ""
300 };
301 let v: f64 = s.parse().unwrap_or(0.0);
302 let y = v - comp[col_idx];
303 let t = sums[col_idx] + y;
304 comp[col_idx] = (t - sums[col_idx]) - y;
305 sums[col_idx] = t;
306 }
307 row_count += 1;
308 }
309
310 Ok((header_names, sums, row_count))
311 }
312
313 pub fn minmax_columns(
317 &self,
318 input: &[u8],
319 ) -> Result<(Vec<String>, Vec<f64>, Vec<f64>, usize), DataError> {
320 if input.is_empty() {
321 return Ok((vec![], vec![], vec![], 0));
322 }
323
324 let rows: Vec<&[u8]> = input
325 .split(|&b| b == b'\n')
326 .filter(|r| !r.is_empty() && *r != b"\r")
327 .collect();
328
329 if rows.is_empty() {
330 return Ok((vec![], vec![], vec![], 0));
331 }
332
333 let delim = self.config.delimiter;
334 let (header_names, data_rows) = if self.config.has_header {
335 let names: Vec<String> = split_fields(rows[0], delim)
336 .into_iter()
337 .map(|s| s.trim().to_string())
338 .collect();
339 (names, &rows[1..])
340 } else {
341 let ncols = split_fields(rows[0], delim).len();
342 let names = (0..ncols).map(|i| format!("col_{}", i)).collect();
343 (names, &rows[..])
344 };
345
346 let ncols = header_names.len();
347 let mut mins: Vec<f64> = vec![f64::INFINITY; ncols];
348 let mut maxs: Vec<f64> = vec![f64::NEG_INFINITY; ncols];
349 let mut row_count = 0usize;
350
351 let data_rows = if let Some(max) = self.config.max_rows {
352 &data_rows[..data_rows.len().min(max)]
353 } else {
354 data_rows
355 };
356
357 for &row_bytes in data_rows {
358 let fields = split_fields(row_bytes, delim);
359 for col_idx in 0..ncols {
360 let s = if col_idx < fields.len() {
361 fields[col_idx].trim()
362 } else {
363 ""
364 };
365 if let Ok(v) = s.parse::<f64>() {
366 if v < mins[col_idx] {
367 mins[col_idx] = v;
368 }
369 if v > maxs[col_idx] {
370 maxs[col_idx] = v;
371 }
372 }
373 }
374 row_count += 1;
375 }
376
377 Ok((header_names, mins, maxs, row_count))
378 }
379}
380
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `csv` with the default configuration; panics if parsing fails.
    fn parse_default(csv: &[u8]) -> DataFrame {
        parse_with(CsvConfig::default(), csv)
    }

    /// Parses `csv` with an explicit configuration; panics if parsing fails.
    fn parse_with(config: CsvConfig, csv: &[u8]) -> DataFrame {
        CsvReader::new(config).parse(csv).unwrap()
    }

    /// Asserts that two floats agree to within 1e-10.
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < 1e-10,
            "expected {}, got {}",
            expected,
            actual
        );
    }

    #[test]
    fn test_parse_basic_csv() {
        let df = parse_default(b"name,age,score\nAlice,30,9.5\nBob,25,8.1");
        assert_eq!(df.nrows(), 2);
        assert_eq!(df.ncols(), 3);
    }

    #[test]
    fn test_parse_empty() {
        let df = parse_default(b"");
        assert_eq!(df.nrows(), 0);
    }

    #[test]
    fn test_parse_header_only() {
        let df = parse_default(b"x,y,z\n");
        assert_eq!(df.nrows(), 0);
        assert_eq!(df.ncols(), 3);
    }

    #[test]
    fn test_parse_type_inference() {
        let df = parse_default(b"a,b,c,d\n42,3.14,true,hello\n10,2.71,false,world");
        assert_eq!(df.nrows(), 2);
        assert!(matches!(df.get_column("a"), Some(Column::Int(_))));
        assert!(matches!(df.get_column("b"), Some(Column::Float(_))));
        assert!(matches!(df.get_column("c"), Some(Column::Bool(_))));
        assert!(matches!(df.get_column("d"), Some(Column::Str(_))));
    }

    #[test]
    fn test_parse_tsv() {
        let tsv = CsvConfig {
            delimiter: b'\t',
            ..Default::default()
        };
        let df = parse_with(tsv, b"x\ty\n1\t2\n3\t4");
        assert_eq!(df.nrows(), 2);
        assert_eq!(df.ncols(), 2);
    }

    #[test]
    fn test_parse_max_rows() {
        let limited = CsvConfig {
            max_rows: Some(3),
            ..Default::default()
        };
        let df = parse_with(limited, b"x\n1\n2\n3\n4\n5");
        assert_eq!(df.nrows(), 3);
    }

    #[test]
    fn test_streaming_sum() {
        let processor = StreamingCsvProcessor::new(CsvConfig::default());
        let (headers, sums, count) = processor
            .sum_columns(b"x,y\n1.0,2.0\n3.0,4.0\n5.0,6.0")
            .unwrap();
        assert_eq!(headers, vec!["x", "y"]);
        assert_eq!(count, 3);
        assert_close(sums[0], 9.0);
        assert_close(sums[1], 12.0);
    }

    #[test]
    fn test_streaming_minmax() {
        let processor = StreamingCsvProcessor::new(CsvConfig::default());
        let (headers, mins, maxs, count) = processor
            .minmax_columns(b"x,y\n1.0,6.0\n3.0,2.0\n5.0,4.0")
            .unwrap();
        assert_eq!(headers, vec!["x", "y"]);
        assert_eq!(count, 3);
        assert_close(mins[0], 1.0);
        assert_close(maxs[0], 5.0);
        assert_close(mins[1], 2.0);
        assert_close(maxs[1], 6.0);
    }

    #[test]
    fn test_parse_windows_line_endings() {
        let df = parse_default(b"a,b\r\n1,2\r\n3,4\r\n");
        assert_eq!(df.nrows(), 2);
    }

    #[test]
    fn test_no_header() {
        let headerless = CsvConfig {
            has_header: false,
            ..Default::default()
        };
        let df = parse_with(headerless, b"1,2,3\n4,5,6");
        assert_eq!(df.nrows(), 2);
        assert_eq!(df.column_names(), vec!["col_0", "col_1", "col_2"]);
    }
}