1use crate::{Column, DataFrame, DataError};
7
/// Options shared by the CSV readers and aggregators in this module.
#[derive(Debug, Clone)]
pub struct CsvConfig {
    /// Byte that separates fields within a row.
    pub delimiter: u8,
    /// When `true`, the first non-blank row supplies the column names;
    /// otherwise names of the form `col_<index>` are generated.
    pub has_header: bool,
    /// Upper bound on the number of data rows consumed (`None` = no limit).
    pub max_rows: Option<usize>,
    /// When `true`, surrounding whitespace is removed from header names and
    /// cell values before they are used.
    pub trim_whitespace: bool,
}

impl Default for CsvConfig {
    /// Comma-delimited input with a header row, no row limit, and trimming
    /// enabled.
    fn default() -> Self {
        Self {
            trim_whitespace: true,
            max_rows: None,
            has_header: true,
            delimiter: b',',
        }
    }
}
33
/// CSV reader; holds the [`CsvConfig`] that its `parse` method consults.
pub struct CsvReader {
    // Parsing options supplied to `CsvReader::new`.
    config: CsvConfig,
}
64
/// Value category guessed for a single CSV cell by `infer_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InferredType {
    /// Optional leading `-` followed by ASCII digits, representable as `i64`.
    Int,
    /// Any other text accepted by `f64::from_str` (decimals, exponents,
    /// `inf`, `nan`, and integers too large for `i64`).
    Float,
    /// Exactly `true`, `false`, `1` or `0`.
    Bool,
    /// Everything else, including the empty string.
    Str,
}

/// Classifies the text of one cell.
///
/// Leading and trailing whitespace is ignored. Checks run in this order:
/// boolean literals, integers, floats, and finally `Str` as the fallback.
///
/// Note that the bare digits `1` and `0` are reported as `Bool`, not `Int`.
fn infer_type(s: &str) -> InferredType {
    let t = s.trim();

    if matches!(t, "true" | "false" | "1" | "0") {
        return InferredType::Bool;
    }

    // Only an optional '-' followed by ASCII digits is treated as integral
    // syntax; a leading '+' is left to the float check below.
    let digits = t.strip_prefix('-').unwrap_or(t);
    let looks_integral = !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit());

    // The text must also fit in an i64. Otherwise a later `parse::<i64>()`
    // would fail, so out-of-range integers fall through and become `Float`.
    if looks_integral && t.parse::<i64>().is_ok() {
        return InferredType::Int;
    }

    // `f64::from_str` already accepts forms such as "1.", ".5" and "-.5",
    // so no separate decimal-point scan is needed.
    if t.parse::<f64>().is_ok() {
        return InferredType::Float;
    }

    InferredType::Str
}
100
/// Splits one row into its fields at every occurrence of `delimiter`.
///
/// A single trailing `\r` is removed from the last field only, so rows taken
/// from CRLF-terminated input do not carry the carriage return. A field that
/// is not valid UTF-8 is returned as the empty string. The result always
/// contains at least one field; quoting is not interpreted.
fn split_fields<'a>(row: &'a [u8], delimiter: u8) -> Vec<&'a str> {
    let mut segments: Vec<&'a [u8]> = row.split(|&byte| byte == delimiter).collect();

    // Only the final segment can hold the line's trailing carriage return.
    if let Some(tail) = segments.pop() {
        segments.push(tail.strip_suffix(b"\r").unwrap_or(tail));
    }

    segments
        .into_iter()
        .map(|bytes| std::str::from_utf8(bytes).unwrap_or(""))
        .collect()
}
120
121impl CsvReader {
122 pub fn new(config: CsvConfig) -> Self {
124 CsvReader { config }
125 }
126
127 pub fn parse(&self, input: &[u8]) -> Result<DataFrame, DataError> {
134 if input.is_empty() {
135 return Ok(DataFrame::new());
136 }
137
138 let rows: Vec<&[u8]> = input
140 .split(|&b| b == b'\n')
141 .filter(|r| !r.is_empty() && *r != b"\r")
142 .collect();
143
144 if rows.is_empty() {
145 return Ok(DataFrame::new());
146 }
147
148 let delim = self.config.delimiter;
149
150 let (header_names, data_rows) = if self.config.has_header {
152 let names: Vec<String> = split_fields(rows[0], delim)
153 .into_iter()
154 .map(|s| {
155 if self.config.trim_whitespace {
156 s.trim().to_string()
157 } else {
158 s.to_string()
159 }
160 })
161 .collect();
162 (names, &rows[1..])
163 } else {
164 let ncols = split_fields(rows[0], delim).len();
166 let names: Vec<String> = (0..ncols).map(|i| format!("col_{}", i)).collect();
167 (names, &rows[..])
168 };
169
170 let ncols = header_names.len();
171 if ncols == 0 {
172 return Ok(DataFrame::new());
173 }
174
175 let data_rows = if let Some(max) = self.config.max_rows {
177 &data_rows[..data_rows.len().min(max)]
178 } else {
179 data_rows
180 };
181
182 if data_rows.is_empty() {
183 let columns: Vec<(String, Column)> = header_names
185 .into_iter()
186 .map(|name| (name, Column::Str(Vec::new())))
187 .collect();
188 return DataFrame::from_columns(columns);
189 }
190
191 let first_fields = split_fields(data_rows[0], delim);
193 let mut col_types: Vec<InferredType> = first_fields
194 .iter()
195 .map(|s| {
196 let s = if self.config.trim_whitespace { s.trim() } else { *s };
197 infer_type(s)
198 })
199 .collect();
200
201 while col_types.len() < ncols {
203 col_types.push(InferredType::Str);
204 }
205
206 let nrows = data_rows.len();
208 let mut int_bufs: Vec<Option<Vec<i64>>> = vec![None; ncols];
209 let mut float_bufs: Vec<Option<Vec<f64>>> = vec![None; ncols];
210 let mut bool_bufs: Vec<Option<Vec<bool>>> = vec![None; ncols];
211 let mut str_bufs: Vec<Option<Vec<String>>> = vec![None; ncols];
212
213 for (i, &t) in col_types.iter().enumerate() {
214 match t {
215 InferredType::Int => int_bufs[i] = Some(Vec::with_capacity(nrows)),
216 InferredType::Float => float_bufs[i] = Some(Vec::with_capacity(nrows)),
217 InferredType::Bool => bool_bufs[i] = Some(Vec::with_capacity(nrows)),
218 InferredType::Str => str_bufs[i] = Some(Vec::with_capacity(nrows)),
219 }
220 }
221
222 for (row_idx, &row_bytes) in data_rows.iter().enumerate() {
224 let fields = split_fields(row_bytes, delim);
225 for col_idx in 0..ncols {
226 let raw = if col_idx < fields.len() {
227 fields[col_idx]
228 } else {
229 ""
231 };
232 let s = if self.config.trim_whitespace { raw.trim() } else { raw };
233
234 match col_types[col_idx] {
235 InferredType::Int => {
236 let v = s.parse::<i64>().unwrap_or(0);
237 int_bufs[col_idx].as_mut().unwrap().push(v);
238 }
239 InferredType::Float => {
240 let v = s.parse::<f64>().unwrap_or(0.0);
241 float_bufs[col_idx].as_mut().unwrap().push(v);
242 }
243 InferredType::Bool => {
244 let v = matches!(s, "true" | "1");
245 bool_bufs[col_idx].as_mut().unwrap().push(v);
246 }
247 InferredType::Str => {
248 str_bufs[col_idx].as_mut().unwrap().push(s.to_string());
249 }
250 }
251
252 let _ = row_idx; }
254 }
255
256 let mut columns: Vec<(String, Column)> = Vec::with_capacity(ncols);
258 for (i, name) in header_names.into_iter().enumerate() {
259 let col = match col_types[i] {
260 InferredType::Int => Column::Int(int_bufs[i].take().unwrap()),
261 InferredType::Float => Column::Float(float_bufs[i].take().unwrap()),
262 InferredType::Bool => Column::Bool(bool_bufs[i].take().unwrap()),
263 InferredType::Str => Column::Str(str_bufs[i].take().unwrap()),
264 };
265 columns.push((name, col));
266 }
267
268 DataFrame::from_columns(columns)
269 }
270}
271
/// Computes per-column aggregates (sums, minima/maxima) over CSV bytes,
/// using the stored [`CsvConfig`].
pub struct StreamingCsvProcessor {
    // Parsing options supplied to `StreamingCsvProcessor::new`.
    config: CsvConfig,
}
290
291impl StreamingCsvProcessor {
292 pub fn new(config: CsvConfig) -> Self {
294 StreamingCsvProcessor { config }
295 }
296
297 pub fn sum_columns(&self, input: &[u8]) -> Result<(Vec<String>, Vec<f64>, usize), DataError> {
302 if input.is_empty() {
303 return Ok((vec![], vec![], 0));
304 }
305
306 let rows: Vec<&[u8]> = input
307 .split(|&b| b == b'\n')
308 .filter(|r| !r.is_empty() && *r != b"\r")
309 .collect();
310
311 if rows.is_empty() {
312 return Ok((vec![], vec![], 0));
313 }
314
315 let delim = self.config.delimiter;
316 let (header_names, data_rows) = if self.config.has_header {
317 let names: Vec<String> = split_fields(rows[0], delim)
318 .into_iter()
319 .map(|s| s.trim().to_string())
320 .collect();
321 (names, &rows[1..])
322 } else {
323 let ncols = split_fields(rows[0], delim).len();
324 let names: Vec<String> = (0..ncols).map(|i| format!("col_{}", i)).collect();
325 (names, &rows[..])
326 };
327
328 let ncols = header_names.len();
329 let mut sums: Vec<f64> = vec![0.0; ncols];
331 let mut comp: Vec<f64> = vec![0.0; ncols];
332 let mut row_count = 0usize;
333
334 let data_rows = if let Some(max) = self.config.max_rows {
335 &data_rows[..data_rows.len().min(max)]
336 } else {
337 data_rows
338 };
339
340 for &row_bytes in data_rows {
341 let fields = split_fields(row_bytes, delim);
342 for col_idx in 0..ncols {
343 let s = if col_idx < fields.len() {
344 if self.config.trim_whitespace {
345 fields[col_idx].trim()
346 } else {
347 fields[col_idx]
348 }
349 } else {
350 ""
351 };
352 let v: f64 = s.parse().unwrap_or(0.0);
353 let y = v - comp[col_idx];
355 let t = sums[col_idx] + y;
356 comp[col_idx] = (t - sums[col_idx]) - y;
357 sums[col_idx] = t;
358 }
359 row_count += 1;
360 }
361
362 Ok((header_names, sums, row_count))
363 }
364
365 pub fn minmax_columns(
370 &self,
371 input: &[u8],
372 ) -> Result<(Vec<String>, Vec<f64>, Vec<f64>, usize), DataError> {
373 if input.is_empty() {
374 return Ok((vec![], vec![], vec![], 0));
375 }
376
377 let rows: Vec<&[u8]> = input
378 .split(|&b| b == b'\n')
379 .filter(|r| !r.is_empty() && *r != b"\r")
380 .collect();
381
382 if rows.is_empty() {
383 return Ok((vec![], vec![], vec![], 0));
384 }
385
386 let delim = self.config.delimiter;
387 let (header_names, data_rows) = if self.config.has_header {
388 let names: Vec<String> = split_fields(rows[0], delim)
389 .into_iter()
390 .map(|s| s.trim().to_string())
391 .collect();
392 (names, &rows[1..])
393 } else {
394 let ncols = split_fields(rows[0], delim).len();
395 let names = (0..ncols).map(|i| format!("col_{}", i)).collect();
396 (names, &rows[..])
397 };
398
399 let ncols = header_names.len();
400 let mut mins: Vec<f64> = vec![f64::INFINITY; ncols];
401 let mut maxs: Vec<f64> = vec![f64::NEG_INFINITY; ncols];
402 let mut row_count = 0usize;
403
404 let data_rows = if let Some(max) = self.config.max_rows {
405 &data_rows[..data_rows.len().min(max)]
406 } else {
407 data_rows
408 };
409
410 for &row_bytes in data_rows {
411 let fields = split_fields(row_bytes, delim);
412 for col_idx in 0..ncols {
413 let s = if col_idx < fields.len() {
414 fields[col_idx].trim()
415 } else {
416 ""
417 };
418 if let Ok(v) = s.parse::<f64>() {
419 if v < mins[col_idx] { mins[col_idx] = v; }
420 if v > maxs[col_idx] { maxs[col_idx] = v; }
421 }
422 }
423 row_count += 1;
424 }
425
426 Ok((header_names, mins, maxs, row_count))
427 }
428}