1use super::core::DataOperations;
4use super::types::SortOrder;
5use crate::regex_cache::where_clause_regex;
6use anyhow::Result;
7use rayon::prelude::*;
8
/// Aggregation computed over each trailing window by `rolling_column`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RollingAgg {
    /// Arithmetic mean of the numeric cells inside the window.
    Mean,
    /// Sum of the numeric cells inside the window.
    Sum,
}
14
/// One `column <operator> value` predicate parsed from a WHERE clause.
#[derive(Debug, Clone, PartialEq, Eq)]
struct QueryCondition {
    /// Index of the referenced column within the header row.
    column: usize,
    /// Comparison operator text exactly as captured from the clause.
    operator: String,
    /// Right-hand operand, with surrounding whitespace trimmed.
    value: String,
}
20
21impl DataOperations {
22 pub fn query(&self, data: &[Vec<String>], where_clause: &str) -> Result<Vec<Vec<String>>> {
24 if data.is_empty() {
25 return Ok(Vec::new());
26 }
27
28 let header = &data[0];
29 let mut result = Vec::with_capacity(data.len());
30 result.push(header.clone());
31
32 let conditions = self.parse_where_clause(where_clause, header)?;
33
34 for row in data.iter().skip(1) {
35 if self.evaluate_conditions(row, &conditions, header)? {
36 result.push(row.clone());
37 }
38 }
39
40 Ok(result)
41 }
42
43 fn parse_where_clause(&self, clause: &str, header: &[String]) -> Result<Vec<QueryCondition>> {
44 let mut conditions = Vec::new();
45 let re_pattern = where_clause_regex();
46
47 for cap in re_pattern.captures_iter(clause) {
48 let col_name = cap.get(1).map(|m| m.as_str()).unwrap_or("");
49 let op = cap.get(2).map(|m| m.as_str()).unwrap_or("=");
50 let value = cap.get(3).map(|m| m.as_str().trim()).unwrap_or("");
51
52 let col_idx = header
53 .iter()
54 .position(|h| h == col_name)
55 .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", col_name))?;
56
57 conditions.push(QueryCondition {
58 column: col_idx,
59 operator: op.to_string(),
60 value: value.to_string(),
61 });
62 }
63
64 Ok(conditions)
65 }
66
67 fn evaluate_conditions(
68 &self,
69 row: &[String],
70 conditions: &[QueryCondition],
71 _header: &[String],
72 ) -> Result<bool> {
73 for cond in conditions {
74 let cell_value = row.get(cond.column).map(|s| s.as_str()).unwrap_or("");
75 if !self.evaluate_filter_condition(cell_value, &cond.operator, &cond.value)? {
76 return Ok(false);
77 }
78 }
79 Ok(true)
80 }
81
82 pub fn mutate(
84 &self,
85 data: &mut Vec<Vec<String>>,
86 new_col_name: &str,
87 formula: &str,
88 ) -> Result<()> {
89 if data.is_empty() {
90 return Ok(());
91 }
92
93 data[0].push(new_col_name.to_string());
94 let header = data[0].clone();
95
96 for row_idx in 1..data.len() {
97 let value = self.evaluate_row_formula(formula, &data[row_idx], &header)?;
98 data[row_idx].push(value);
99 }
100
101 Ok(())
102 }
103
104 fn evaluate_row_formula(
105 &self,
106 formula: &str,
107 row: &[String],
108 header: &[String],
109 ) -> Result<String> {
110 let mut expr = formula.to_string();
111
112 for (idx, col_name) in header.iter().enumerate() {
113 if expr.contains(col_name) {
114 let val = row.get(idx).cloned().unwrap_or_default();
115 expr = expr.replace(col_name, &val);
116 }
117 }
118
119 for idx in 0..row.len() {
120 let letter = (b'A' + idx as u8) as char;
121 let pattern = format!("{}", letter);
122 if expr.contains(&pattern) {
123 let val = row.get(idx).cloned().unwrap_or_default();
124 expr = expr.replace(&pattern, &val);
125 }
126 }
127
128 if let Ok(result) = self.eval_arithmetic(&expr) {
129 return Ok(format!("{:.2}", result));
130 }
131
132 Ok(expr)
133 }
134
135 pub(crate) fn eval_arithmetic(&self, expr: &str) -> Result<f64> {
136 let expr = expr.replace(" ", "");
137
138 if let Ok(n) = expr.parse::<f64>() {
139 return Ok(n);
140 }
141
142 if let Some(pos) = expr.rfind('*') {
143 let left = self.eval_arithmetic(&expr[..pos])?;
144 let right = self.eval_arithmetic(&expr[pos + 1..])?;
145 return Ok(left * right);
146 }
147 if let Some(pos) = expr.rfind('/') {
148 let left = self.eval_arithmetic(&expr[..pos])?;
149 let right = self.eval_arithmetic(&expr[pos + 1..])?;
150 if right == 0.0 {
151 anyhow::bail!("Division by zero");
152 }
153 return Ok(left / right);
154 }
155
156 let bytes = expr.as_bytes();
157 for i in (1..bytes.len()).rev() {
158 if bytes[i] == b'+' {
159 let left = self.eval_arithmetic(&expr[..i])?;
160 let right = self.eval_arithmetic(&expr[i + 1..])?;
161 return Ok(left + right);
162 }
163 if bytes[i] == b'-' {
164 let left = self.eval_arithmetic(&expr[..i])?;
165 let right = self.eval_arithmetic(&expr[i + 1..])?;
166 return Ok(left - right);
167 }
168 }
169
170 anyhow::bail!("Cannot evaluate: {}", expr)
171 }
172
173 pub fn astype(&self, data: &mut Vec<Vec<String>>, column: usize, dtype: &str) -> Result<usize> {
175 if data.is_empty() {
176 return Ok(0);
177 }
178
179 let mut converted = 0;
180 for row in data.iter_mut().skip(1) {
181 if let Some(cell) = row.get_mut(column) {
182 let new_val = match dtype.to_lowercase().as_str() {
183 "int" | "integer" => {
184 if let Ok(f) = cell.parse::<f64>() {
185 converted += 1;
186 (f as i64).to_string()
187 } else {
188 cell.clone()
189 }
190 }
191 "float" | "double" => {
192 if let Ok(f) = cell.parse::<f64>() {
193 converted += 1;
194 format!("{:.2}", f)
195 } else {
196 cell.clone()
197 }
198 }
199 "string" | "str" => {
200 converted += 1;
201 cell.clone()
202 }
203 "bool" | "boolean" => {
204 let lower = cell.to_lowercase();
205 converted += 1;
206 if lower == "true" || lower == "1" || lower == "yes" {
207 "true".to_string()
208 } else if lower == "false" || lower == "0" || lower == "no" {
209 "false".to_string()
210 } else {
211 cell.clone()
212 }
213 }
214 _ => anyhow::bail!("Unknown type: {}. Use: int, float, string, bool", dtype),
215 };
216 *cell = new_val;
217 }
218 }
219
220 Ok(converted)
221 }
222
223 pub fn sort_by_columns(
225 &self,
226 data: &mut Vec<Vec<String>>,
227 columns: &[(usize, SortOrder)],
228 ) -> Result<()> {
229 if data.len() <= 1 || columns.is_empty() {
230 return Ok(());
231 }
232
233 let header = data.remove(0);
234
235 data.par_sort_by(|a, b| {
237 for (col, order) in columns {
238 let val_a = a.get(*col).map(|s| s.as_str()).unwrap_or("");
239 let val_b = b.get(*col).map(|s| s.as_str()).unwrap_or("");
240
241 let cmp = match (val_a.parse::<f64>(), val_b.parse::<f64>()) {
242 (Ok(num_a), Ok(num_b)) => num_a
243 .partial_cmp(&num_b)
244 .unwrap_or(std::cmp::Ordering::Equal),
245 _ => val_a.cmp(val_b),
246 };
247
248 let cmp = match order {
249 SortOrder::Ascending => cmp,
250 SortOrder::Descending => cmp.reverse(),
251 };
252
253 if cmp != std::cmp::Ordering::Equal {
254 return cmp;
255 }
256 }
257 std::cmp::Ordering::Equal
258 });
259
260 data.insert(0, header);
261 Ok(())
262 }
263
264 pub fn apply_column<F>(&self, data: &mut Vec<Vec<String>>, column: usize, f: F) -> Result<()>
266 where
267 F: Fn(&str) -> String,
268 {
269 for row in data.iter_mut().skip(1) {
270 if let Some(cell) = row.get_mut(column) {
271 *cell = f(cell);
272 }
273 }
274 Ok(())
275 }
276
277 pub fn clip(
279 &self,
280 data: &mut Vec<Vec<String>>,
281 column: usize,
282 min: Option<f64>,
283 max: Option<f64>,
284 ) -> Result<usize> {
285 let mut clipped = 0;
286
287 for row in data.iter_mut().skip(1) {
288 if let Some(cell) = row.get_mut(column) {
289 if let Ok(val) = cell.parse::<f64>() {
290 let mut new_val = val;
291 if let Some(min_val) = min {
292 if val < min_val {
293 new_val = min_val;
294 clipped += 1;
295 }
296 }
297 if let Some(max_val) = max {
298 if val > max_val {
299 new_val = max_val;
300 clipped += 1;
301 }
302 }
303 if new_val != val {
304 *cell = format!("{:.2}", new_val);
305 }
306 }
307 }
308 }
309
310 Ok(clipped)
311 }
312
313 pub fn normalize(&self, data: &mut Vec<Vec<String>>, column: usize) -> Result<()> {
315 let values: Vec<f64> = data
316 .par_iter()
317 .skip(1)
318 .filter_map(|row| row.get(column))
319 .filter_map(|s| s.parse::<f64>().ok())
320 .collect();
321
322 if values.is_empty() {
323 return Ok(());
324 }
325
326 let (min_val, max_val) = if values.len() > 1000 {
328 let (min, max) = values.par_iter().fold(
329 || (f64::INFINITY, f64::NEG_INFINITY),
330 |(acc_min, acc_max), &val| (acc_min.min(val), acc_max.max(val)),
331 ).reduce(
332 || (f64::INFINITY, f64::NEG_INFINITY),
333 |(min1, max1), (min2, max2)| (min1.min(min2), max1.max(max2)),
334 );
335 (min, max)
336 } else {
337 let min_val = values.iter().cloned().fold(f64::INFINITY, f64::min);
338 let max_val = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
339 (min_val, max_val)
340 };
341
342 let range = max_val - min_val;
343
344 if range == 0.0 {
345 return Ok(());
346 }
347
348 for row in data.iter_mut().skip(1) {
349 if let Some(cell) = row.get_mut(column) {
350 if let Ok(val) = cell.parse::<f64>() {
351 let normalized = (val - min_val) / range;
352 *cell = format!("{:.4}", normalized);
353 }
354 }
355 }
356
357 Ok(())
358 }
359
360 pub fn zscore(&self, data: &mut Vec<Vec<String>>, column: usize) -> Result<()> {
365 let values: Vec<f64> = data
366 .iter()
367 .skip(1)
368 .filter_map(|row| row.get(column))
369 .filter_map(|s| s.parse::<f64>().ok())
370 .collect();
371
372 if values.len() < 2 {
373 return Ok(());
374 }
375
376 let n = values.len() as f64;
377 let mean = values.iter().sum::<f64>() / n;
378 let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / n;
379 let std = variance.sqrt();
380
381 if std < f64::EPSILON {
382 return Ok(());
383 }
384
385 for row in data.iter_mut().skip(1) {
386 if let Some(cell) = row.get_mut(column) {
387 if let Ok(val) = cell.parse::<f64>() {
388 let z = (val - mean) / std;
389 *cell = format!("{:.6}", z);
390 }
391 }
392 }
393
394 Ok(())
395 }
396
397 pub fn rolling_mean_column(
400 &self,
401 data: &mut Vec<Vec<String>>,
402 value_col: usize,
403 window: usize,
404 new_col_name: &str,
405 ) -> Result<()> {
406 self.rolling_column(data, value_col, window, new_col_name, RollingAgg::Mean)
407 }
408
409 pub fn rolling_sum_column(
411 &self,
412 data: &mut Vec<Vec<String>>,
413 value_col: usize,
414 window: usize,
415 new_col_name: &str,
416 ) -> Result<()> {
417 self.rolling_column(data, value_col, window, new_col_name, RollingAgg::Sum)
418 }
419
420 fn rolling_column(
421 &self,
422 data: &mut Vec<Vec<String>>,
423 value_col: usize,
424 window: usize,
425 new_col_name: &str,
426 agg: RollingAgg,
427 ) -> Result<()> {
428 if window == 0 {
429 anyhow::bail!("window must be >= 1");
430 }
431 if data.is_empty() {
432 return Ok(());
433 }
434
435 let max_len = data.iter().map(|r| r.len()).max().unwrap_or(0);
436 if value_col >= max_len {
437 anyhow::bail!("column index {} out of range (max {})", value_col, max_len.saturating_sub(1));
438 }
439
440 for row in data.iter_mut() {
441 while row.len() < max_len {
442 row.push(String::new());
443 }
444 }
445
446 data[0].push(new_col_name.to_string());
447
448 for i in 1..data.len() {
449 let win_start = i.saturating_sub(window - 1).max(1);
450 let vals: Vec<f64> = (win_start..=i)
451 .filter_map(|r| data[r].get(value_col).and_then(|s| s.parse::<f64>().ok()))
452 .collect();
453
454 let cell = if vals.is_empty() {
455 String::new()
456 } else {
457 match agg {
458 RollingAgg::Mean => {
459 let m = vals.iter().sum::<f64>() / vals.len() as f64;
460 format!("{:.6}", m)
461 }
462 RollingAgg::Sum => {
463 let s: f64 = vals.iter().sum();
464 format!("{:.6}", s)
465 }
466 }
467 };
468 data[i].push(cell);
469 }
470
471 Ok(())
472 }
473
474 pub fn parse_date(
476 &self,
477 data: &mut Vec<Vec<String>>,
478 column: usize,
479 from_format: &str,
480 to_format: &str,
481 ) -> Result<usize> {
482 use chrono::NaiveDate;
483
484 let mut converted = 0;
485 for row in data.iter_mut().skip(1) {
486 if let Some(cell) = row.get_mut(column) {
487 if cell.is_empty() {
488 continue;
489 }
490 if let Ok(date) = NaiveDate::parse_from_str(cell, from_format) {
491 *cell = date.format(to_format).to_string();
492 converted += 1;
493 }
494 }
495 }
496
497 Ok(converted)
498 }
499
500 pub fn regex_filter(
502 &self,
503 data: &[Vec<String>],
504 column: usize,
505 pattern: &str,
506 ) -> Result<Vec<Vec<String>>> {
507 let re = regex::Regex::new(pattern)?;
508
509 let mut result = Vec::with_capacity(data.len());
510 result.push(data[0].clone());
511
512 for row in data.iter().skip(1) {
513 if let Some(cell) = row.get(column) {
514 if re.is_match(cell) {
515 result.push(row.clone());
516 }
517 }
518 }
519
520 Ok(result)
521 }
522
523 pub fn regex_replace(
525 &self,
526 data: &mut Vec<Vec<String>>,
527 column: usize,
528 pattern: &str,
529 replacement: &str,
530 ) -> Result<usize> {
531 let re = regex::Regex::new(pattern)?;
532
533 let mut replaced = 0;
534 for row in data.iter_mut().skip(1) {
535 if let Some(cell) = row.get_mut(column) {
536 let new_val = re.replace_all(cell, replacement).to_string();
537 if &new_val != cell {
538 *cell = new_val;
539 replaced += 1;
540 }
541 }
542 }
543
544 Ok(replaced)
545 }
546
547 pub fn extract_date_part(
549 &self,
550 data: &mut Vec<Vec<String>>,
551 column: usize,
552 part: &str,
553 new_col_name: &str,
554 date_format: &str,
555 ) -> Result<()> {
556 use chrono::{Datelike, NaiveDate};
557
558 if data.is_empty() {
559 return Ok(());
560 }
561
562 data[0].push(new_col_name.to_string());
563
564 for row in data.iter_mut().skip(1) {
565 let value = if let Some(cell) = row.get(column) {
566 if let Ok(date) = NaiveDate::parse_from_str(cell, date_format) {
567 match part.to_lowercase().as_str() {
568 "year" => date.year().to_string(),
569 "month" => date.month().to_string(),
570 "day" => date.day().to_string(),
571 "weekday" => date.weekday().to_string(),
572 "quarter" => ((date.month() - 1) / 3 + 1).to_string(),
573 "dayofyear" => date.ordinal().to_string(),
574 _ => String::new(),
575 }
576 } else {
577 String::new()
578 }
579 } else {
580 String::new()
581 };
582 row.push(value);
583 }
584
585 Ok(())
586 }
587}