1use polars::prelude::*;
2use regex::Regex;
3use std::collections::HashMap;
4
5use crate::error::{DtransformError, Result};
6use crate::parser::ast::*;
7
/// Interprets parsed dtransform programs: materializes pipelines into Polars
/// DataFrames and keeps named intermediate results between statements.
pub struct Executor {
    // DataFrames bound by assignment statements. Also consulted during
    // expression evaluation: variable references and lookup() read from here,
    // and a bare column name matching a variable shadows the column.
    variables: HashMap<String, DataFrame>,
}
11
12fn auto_detect_delimiter(content: &str, file_extension: Option<&str>) -> Result<(char, bool)> {
15 if file_extension == Some("tsv") {
17 let needs_trim = content.lines().take(100).any(|line| {
18 line.trim() != line || line.contains(" ")
19 });
20 return Ok(('\t', needs_trim));
21 }
22
23 if file_extension == Some("csv") {
25 let needs_trim = content.lines().take(100).any(|line| {
27 line.trim() != line || line.contains(" ")
28 });
29 return Ok((',', needs_trim));
30 }
31
32 let sample_lines: Vec<&str> = content
34 .lines()
35 .filter(|l| !l.trim().is_empty())
36 .take(100)
37 .collect();
38
39 if sample_lines.is_empty() {
40 return Err(DtransformError::InvalidOperation(
41 "File is empty or contains no data".to_string()
42 ));
43 }
44
45 let needs_trim = sample_lines.iter().any(|line| {
47 line.trim() != *line || line.contains(" ")
48 });
49
50 let detection_lines: Vec<String> = if needs_trim {
52 sample_lines.iter().map(|line| {
53 let trimmed = line.trim();
54 trimmed.split_whitespace().collect::<Vec<_>>().join(" ")
55 }).collect()
56 } else {
57 sample_lines.iter().map(|s| s.to_string()).collect()
58 };
59
60 let delimiters = [',', '\t', '|', ';', ' '];
62 let mut delimiter_counts: HashMap<char, Vec<usize>> = HashMap::new();
63
64 for line in &detection_lines {
65 for &delim in &delimiters {
66 let count = line.matches(delim).count();
67 delimiter_counts.entry(delim).or_insert_with(Vec::new).push(count);
68 }
69 }
70
71 let mut best_delimiter = None;
75 let mut best_score = 0.0;
76
77 for (&delim, counts) in &delimiter_counts {
78 let non_zero_counts: Vec<usize> = counts.iter().filter(|&&c| c > 0).copied().collect();
80
81 if non_zero_counts.is_empty() {
82 continue;
83 }
84
85 let min = *non_zero_counts.iter().min().unwrap();
87 let max = *non_zero_counts.iter().max().unwrap();
88 let avg = non_zero_counts.iter().sum::<usize>() as f64 / non_zero_counts.len() as f64;
89
90 let consistency = non_zero_counts.len() as f64 / detection_lines.len() as f64;
93
94 if min == max || (max as f64 - min as f64) / avg < 0.3 {
96 let score = avg * consistency;
97 if score > best_score {
98 best_score = score;
99 best_delimiter = Some(delim);
100 }
101 }
102 }
103
104 match best_delimiter {
105 Some(delim) => Ok((delim, needs_trim)),
106 None => {
107 let all_zero = delimiter_counts.values().all(|counts| {
110 counts.iter().all(|&c| c == 0)
111 });
112
113 if all_zero {
114 Ok((',', needs_trim))
116 } else {
117 Err(DtransformError::InvalidOperation(
119 "Could not auto-detect delimiter. The file format is ambiguous.\n\n\
120 Please specify the delimiter explicitly:\n\
121 • Comma: read('file', delimiter=',')\n\
122 • Tab: read('file', delimiter='\\t')\n\
123 • Pipe: read('file', delimiter='|')\n\
124 • Semicolon: read('file', delimiter=';')\n\
125 • Space: read('file', delimiter=' ')".to_string()
126 ))
127 }
128 }
129 }
130}
131
132impl Executor {
133 pub fn new() -> Self {
134 Self {
135 variables: HashMap::new(),
136 }
137 }
138
139 pub fn execute_program(&mut self, program: Program) -> Result<Option<DataFrame>> {
140 let mut last_result = None;
141
142 for statement in program.statements {
143 match statement {
144 Statement::Assignment { name, pipeline } => {
145 let df = self.execute_pipeline(pipeline)?;
146 self.variables.insert(name, df);
147 }
149 Statement::Pipeline(pipeline) => {
150 let df = self.execute_pipeline(pipeline)?;
151 last_result = Some(df);
152 }
153 }
154 }
155
156 Ok(last_result)
157 }
158
159 pub fn execute_statement(&mut self, statement: Statement) -> Result<Option<DataFrame>> {
160 match statement {
161 Statement::Assignment { name, pipeline } => {
162 let df = self.execute_pipeline(pipeline)?;
163 self.variables.insert(name.clone(), df.clone());
164 Ok(Some(df))
165 }
166 Statement::Pipeline(pipeline) => {
167 let df = self.execute_pipeline(pipeline)?;
168 Ok(Some(df))
169 }
170 }
171 }
172
173 pub fn execute_pipeline(&mut self, pipeline: Pipeline) -> Result<DataFrame> {
174 let mut df = match pipeline.source {
175 Some(Source::Read(read_op)) => self.execute_read(read_op)?,
176 Some(Source::Variable(var_name)) => {
177 self.variables
178 .get(&var_name)
179 .ok_or_else(|| DtransformError::VariableNotFound(var_name.clone()))?
180 .clone()
181 }
182 None => {
183 return Err(DtransformError::InvalidOperation(
184 "Pipeline must start with a data source (read() or variable)".to_string(),
185 ));
186 }
187 };
188
189 for operation in pipeline.operations {
190 df = self.execute_operation(df, operation)?;
191 }
192
193 Ok(df)
194 }
195
196 fn execute_operation(&mut self, df: DataFrame, op: Operation) -> Result<DataFrame> {
197 match op {
198 Operation::Read(read_op) => self.execute_read(read_op),
199 Operation::Variable(_var_name) => {
200 Err(DtransformError::InvalidOperation(
202 "Variable references can only be used as pipeline sources, not as operations".to_string()
203 ))
204 }
205 Operation::Write(write_op) => self.execute_write(df, write_op),
206 Operation::Select(select_op) => self.execute_select(df, select_op),
207 Operation::Filter(filter_op) => self.execute_filter(df, filter_op),
208 Operation::Mutate(mutate_op) => self.execute_mutate(df, mutate_op),
209 Operation::Rename(rename_op) => self.execute_rename(df, rename_op),
210 Operation::RenameAll(rename_all_op) => self.execute_rename_all(df, rename_all_op),
211 Operation::Sort(sort_op) => self.execute_sort(df, sort_op),
212 Operation::Take(take_op) => self.execute_take(df, take_op),
213 Operation::Skip(skip_op) => self.execute_skip(df, skip_op),
214 Operation::Slice(slice_op) => self.execute_slice(df, slice_op),
215 Operation::Drop(drop_op) => self.execute_drop(df, drop_op),
216 Operation::Distinct(distinct_op) => self.execute_distinct(df, distinct_op),
217 }
218 }
219
220 fn check_duplicate_columns(&self, df: &DataFrame) -> Result<()> {
221 use std::collections::HashSet;
222 let column_names: Vec<String> = df.get_column_names().iter().map(|s| s.to_string()).collect();
223 let mut seen = HashSet::new();
224 let mut duplicates = Vec::new();
225
226 for name in &column_names {
227 if !seen.insert(name) {
228 duplicates.push(name.clone());
229 }
230 }
231
232 if !duplicates.is_empty() {
233 return Err(DtransformError::InvalidOperation(format!(
234 "File contains duplicate column names: {}. Malformed files with repeated columns are not allowed.",
235 duplicates.join(", ")
236 )));
237 }
238
239 Ok(())
240 }
241
242 fn execute_read(&self, op: ReadOp) -> Result<DataFrame> {
243 let path = std::path::Path::new(&op.path);
244
245 let format = op.format.as_deref().or_else(|| path.extension()?.to_str());
247
248 match format {
249 Some("csv") | Some("tsv") | None => {
250 let has_header = op.header.unwrap_or(true);
251 let skip_rows = op.skip_rows.unwrap_or(0);
252
253 let (delimiter, trim_whitespace) = if op.delimiter.is_none() || op.trim_whitespace.is_none() {
255 let content = std::fs::read_to_string(path)?;
257 let (detected_delim, detected_trim) = auto_detect_delimiter(&content, format)?;
258
259 (
260 op.delimiter.unwrap_or(detected_delim),
261 op.trim_whitespace.unwrap_or(detected_trim)
262 )
263 } else {
264 (op.delimiter.unwrap(), op.trim_whitespace.unwrap())
265 };
266
267 let result = if trim_whitespace {
268 let content = std::fs::read_to_string(path)?;
270 let trimmed_content: String = content
271 .lines()
272 .map(|line| {
273 let trimmed = line.trim();
275 trimmed.split_whitespace().collect::<Vec<_>>().join(" ")
277 })
278 .collect::<Vec<_>>()
279 .join("\n");
280
281 let cursor = std::io::Cursor::new(trimmed_content.as_bytes());
282 CsvReadOptions::default()
283 .with_has_header(has_header)
284 .with_skip_rows(skip_rows)
285 .with_parse_options(
286 CsvParseOptions::default()
287 .with_separator(delimiter as u8)
288 )
289 .into_reader_with_file_handle(cursor)
290 .finish()
291 } else {
292 CsvReadOptions::default()
294 .with_has_header(has_header)
295 .with_skip_rows(skip_rows)
296 .with_parse_options(
297 CsvParseOptions::default()
298 .with_separator(delimiter as u8)
299 )
300 .try_into_reader_with_file_path(Some(path.into()))?
301 .finish()
302 };
303
304 match result {
305 Ok(df) => {
306 self.check_duplicate_columns(&df)?;
307 Ok(df)
308 },
309 Err(e) => {
310 let error_msg = e.to_string();
311 if error_msg.contains("found more fields") || error_msg.contains("Schema") {
312 Err(DtransformError::InvalidOperation(
313 format!(
314 "CSV parsing error: Rows have different numbers of fields.\n\n\
315 The auto-detected settings may be incorrect:\n\
316 • Detected delimiter: {:?}\n\
317 • Detected trim_whitespace: {}\n\n\
318 Try specifying explicitly:\n\
319 • read('{}', delimiter=' ') # space-separated\n\
320 • read('{}', delimiter='\\t') # tab-separated\n\
321 • read('{}', trim_whitespace=true)\n\
322 • read('{}', skip_rows=N) # skip header lines",
323 delimiter, trim_whitespace,
324 path.display(), path.display(), path.display(), path.display()
325 )
326 ))
327 } else {
328 Err(DtransformError::PolarsError(e))
329 }
330 }
331 }
332 }
333 Some("json") => {
334 let file = std::fs::File::open(path)?;
335 let df = JsonReader::new(file).finish()?;
336 self.check_duplicate_columns(&df)?;
337 Ok(df)
338 }
339 Some("parquet") => {
340 let file = std::fs::File::open(path)?;
341 let df = ParquetReader::new(file).finish()?;
342 self.check_duplicate_columns(&df)?;
343 Ok(df)
344 }
345 Some(_) => {
346 let has_header = op.header.unwrap_or(true);
348 let skip_rows = op.skip_rows.unwrap_or(0);
349
350 let (delimiter, trim_whitespace) = if op.delimiter.is_none() || op.trim_whitespace.is_none() {
352 let content = std::fs::read_to_string(path)?;
354 let (detected_delim, detected_trim) = auto_detect_delimiter(&content, format)?;
355
356 (
357 op.delimiter.unwrap_or(detected_delim),
358 op.trim_whitespace.unwrap_or(detected_trim)
359 )
360 } else {
361 (op.delimiter.unwrap(), op.trim_whitespace.unwrap())
362 };
363
364 let result = if trim_whitespace {
365 let content = std::fs::read_to_string(path)?;
367 let trimmed_content: String = content
368 .lines()
369 .map(|line| {
370 let trimmed = line.trim();
372 trimmed.split_whitespace().collect::<Vec<_>>().join(" ")
374 })
375 .collect::<Vec<_>>()
376 .join("\n");
377
378 let cursor = std::io::Cursor::new(trimmed_content.as_bytes());
379 CsvReadOptions::default()
380 .with_has_header(has_header)
381 .with_skip_rows(skip_rows)
382 .with_parse_options(
383 CsvParseOptions::default()
384 .with_separator(delimiter as u8)
385 )
386 .into_reader_with_file_handle(cursor)
387 .finish()
388 } else {
389 CsvReadOptions::default()
391 .with_has_header(has_header)
392 .with_skip_rows(skip_rows)
393 .with_parse_options(
394 CsvParseOptions::default()
395 .with_separator(delimiter as u8)
396 )
397 .try_into_reader_with_file_path(Some(path.into()))?
398 .finish()
399 };
400
401 match result {
402 Ok(df) => {
403 self.check_duplicate_columns(&df)?;
404 Ok(df)
405 },
406 Err(e) => {
407 let error_msg = e.to_string();
408 if error_msg.contains("found more fields") || error_msg.contains("Schema") {
409 Err(DtransformError::InvalidOperation(
410 format!(
411 "CSV parsing error: Rows have different numbers of fields.\n\n\
412 The auto-detected settings may be incorrect:\n\
413 • Detected delimiter: {:?}\n\
414 • Detected trim_whitespace: {}\n\n\
415 Try specifying explicitly:\n\
416 • read('{}', delimiter=' ') # space-separated\n\
417 • read('{}', delimiter='\\t') # tab-separated\n\
418 • read('{}', trim_whitespace=true)\n\
419 • read('{}', skip_rows=N) # skip header lines",
420 delimiter, trim_whitespace,
421 path.display(), path.display(), path.display(), path.display()
422 )
423 ))
424 } else {
425 Err(DtransformError::PolarsError(e))
426 }
427 }
428 }
429 }
430 }
431 }
432
433 fn execute_write(&self, df: DataFrame, op: WriteOp) -> Result<DataFrame> {
434 let path = std::path::Path::new(&op.path);
435 let format = op.format.as_deref().or_else(|| path.extension()?.to_str());
436
437 match format {
438 Some("csv") | Some("tsv") | None => {
439 let mut file = std::fs::File::create(path)?;
440 let delimiter = op.delimiter.unwrap_or(if format == Some("tsv") { '\t' } else { ',' });
441 let has_header = op.header.unwrap_or(true); CsvWriter::new(&mut file)
444 .with_separator(delimiter as u8)
445 .include_header(has_header)
446 .finish(&mut df.clone())?;
447 }
448 Some("json") => {
449 let mut file = std::fs::File::create(path)?;
450 JsonWriter::new(&mut file)
451 .finish(&mut df.clone())?;
452 }
453 Some("parquet") => {
454 let mut file = std::fs::File::create(path)?;
455 ParquetWriter::new(&mut file)
456 .finish(&mut df.clone())?;
457 }
458 Some(_) => {
459 let mut file = std::fs::File::create(path)?;
461 let delimiter = op.delimiter.unwrap_or(',');
462 let has_header = op.header.unwrap_or(true);
463
464 CsvWriter::new(&mut file)
465 .with_separator(delimiter as u8)
466 .include_header(has_header)
467 .finish(&mut df.clone())?;
468 }
469 }
470
471 Ok(df)
472 }
473
474 fn execute_select(&self, df: DataFrame, op: SelectOp) -> Result<DataFrame> {
475 let schema = df.schema();
476 let mut selected_columns = Vec::new();
477 let mut aliases = Vec::new();
478
479 for (selector, alias) in op.selectors {
480 let cols = self.resolve_selector(&selector, &schema, &df)?;
481
482 for col in cols {
485 selected_columns.push(col);
486 aliases.push(alias.clone());
487 }
488 }
489
490 if selected_columns.is_empty() {
491 return Err(DtransformError::InvalidOperation(
492 "No columns selected".to_string(),
493 ));
494 }
495
496 let mut result = df.select(&selected_columns)?;
497
498 for (i, alias_opt) in aliases.iter().enumerate() {
500 if let Some(alias) = alias_opt {
501 let old_name = result.get_column_names()[i].to_string();
502 result.rename(&old_name, PlSmallStr::from(alias.as_str()))?;
503 }
504 }
505
506 Ok(result)
507 }
508
509 fn resolve_selector(
510 &self,
511 selector: &ColumnSelector,
512 schema: &Schema,
513 df: &DataFrame,
514 ) -> Result<Vec<String>> {
515 match selector {
516 ColumnSelector::Name(name) => {
517 if schema.contains(name) {
518 Ok(vec![name.clone()])
519 } else {
520 Err(DtransformError::ColumnNotFound(name.clone()))
521 }
522 }
523
524 ColumnSelector::Index(idx) => {
525 let name = schema
526 .get_at_index(*idx)
527 .ok_or_else(|| {
528 DtransformError::InvalidOperation(format!("Column index {} out of bounds", idx))
529 })?
530 .0
531 .clone();
532 Ok(vec![name.as_str().to_string()])
533 }
534
535 ColumnSelector::Range(start, end) => {
536 let names: Vec<String> = schema
537 .iter()
538 .enumerate()
539 .filter(|(i, _)| i >= start && i <= end)
540 .map(|(_, (name, _))| name.as_str().to_string())
541 .collect();
542
543 if names.is_empty() {
544 return Err(DtransformError::InvalidOperation(
545 format!("Range ${}..${} is out of bounds or invalid", start + 1, end + 1)
546 ));
547 }
548
549 Ok(names)
550 }
551
552 ColumnSelector::Regex(pattern) => {
553 let re = Regex::new(pattern)?;
554 let names: Vec<String> = schema
555 .iter()
556 .filter(|(name, _)| re.is_match(name.as_str()))
557 .map(|(name, _)| name.as_str().to_string())
558 .collect();
559 Ok(names)
560 }
561
562 ColumnSelector::Type(dtypes) => {
563 let names: Vec<String> = schema
564 .iter()
565 .filter(|(_, field)| {
566 dtypes.iter().any(|dt| self.matches_dtype(dt, field))
567 })
568 .map(|(name, _)| name.as_str().to_string())
569 .collect();
570 Ok(names)
571 }
572
573 ColumnSelector::All => Ok(schema.iter().map(|(name, _)| name.as_str().to_string()).collect()),
574
575 ColumnSelector::Except(inner) => {
576 let all_cols: Vec<String> = schema.iter().map(|(name, _)| name.as_str().to_string()).collect();
577 let excluded = self.resolve_selector(inner, schema, df)?;
578 Ok(all_cols
579 .into_iter()
580 .filter(|col| !excluded.contains(col))
581 .collect())
582 }
583
584 ColumnSelector::And(left, right) => {
585 let left_cols = self.resolve_selector(left, schema, df)?;
586 let right_cols = self.resolve_selector(right, schema, df)?;
587 Ok(left_cols
588 .into_iter()
589 .filter(|col| right_cols.contains(col))
590 .collect())
591 }
592 }
593 }
594
595 fn matches_dtype(&self, dt: &crate::parser::ast::DataType, polars_dt: &polars::datatypes::DataType) -> bool {
596 use polars::datatypes::DataType as PDT;
597 use crate::parser::ast::DataType as AstDT;
598 match dt {
599 AstDT::Number => matches!(
600 polars_dt,
601 PDT::Int8
602 | PDT::Int16
603 | PDT::Int32
604 | PDT::Int64
605 | PDT::UInt8
606 | PDT::UInt16
607 | PDT::UInt32
608 | PDT::UInt64
609 | PDT::Float32
610 | PDT::Float64
611 ),
612 AstDT::String => matches!(polars_dt, PDT::String),
613 AstDT::Boolean => matches!(polars_dt, PDT::Boolean),
614 AstDT::Date => matches!(polars_dt, PDT::Date),
615 AstDT::DateTime => matches!(polars_dt, PDT::Datetime(_, _)),
616 }
617 }
618
619 fn execute_filter(&self, df: DataFrame, op: FilterOp) -> Result<DataFrame> {
620 let mask = self.evaluate_expression(&op.condition, &df)?;
621 let mask_bool = mask.bool()?;
622 Ok(df.filter(mask_bool)?)
623 }
624
625 fn execute_mutate(&self, mut df: DataFrame, op: MutateOp) -> Result<DataFrame> {
626 for assignment in op.assignments {
627 let series = self.evaluate_expression(&assignment.expression, &df)?;
628
629 let col_name = match &assignment.column {
631 AssignmentTarget::Name(name) => name.clone(),
632 AssignmentTarget::Position(pos) => {
633 let col_names = df.get_column_names();
634 if *pos == 0 || *pos > col_names.len() {
635 return Err(DtransformError::InvalidOperation(format!(
636 "DataFrame has {} columns, but ${} was specified",
637 col_names.len(), pos
638 )));
639 }
640 col_names[pos - 1].to_string()
641 }
642 };
643
644 let renamed_series = series.with_name(PlSmallStr::from(col_name.as_str()));
645 let _ = df.with_column(renamed_series)?;
646 }
647
648 Ok(df)
649 }
650
651 fn execute_rename(&self, df: DataFrame, op: RenameOp) -> Result<DataFrame> {
652 let mut result = df;
653 for (col_ref, new_name) in op.mappings {
654 let old_name = self.resolve_column_name(&col_ref, &result)?;
655 result.rename(&old_name, PlSmallStr::from(new_name.as_str()))?;
656 }
657 Ok(result)
658 }
659
660 fn execute_rename_all(&self, mut df: DataFrame, op: RenameAllOp) -> Result<DataFrame> {
661 match &op.strategy {
662 RenameStrategy::Replace { old, new } => {
663 let old_names: Vec<String> = df
664 .get_column_names()
665 .iter()
666 .map(|s| s.as_str().to_string())
667 .collect();
668
669 for old_name in old_names {
670 let new_name = old_name.replace(old, new);
671 df.rename(&old_name, PlSmallStr::from(new_name.as_str()))?;
672 }
673
674 Ok(df)
675 }
676 RenameStrategy::Sequential { prefix, start, end } => {
677 let num_cols = df.width();
678 let range_size = end - start + 1;
679
680 if range_size != num_cols {
681 return Err(DtransformError::InvalidOperation(format!(
682 "Range {}..{} ({} columns) doesn't match table width ({} columns). Use select() first.",
683 start, end, range_size, num_cols
684 )));
685 }
686
687 let old_names: Vec<String> = df
688 .get_column_names()
689 .iter()
690 .map(|s| s.as_str().to_string())
691 .collect();
692
693 for (i, old_name) in old_names.iter().enumerate() {
694 let new_name = format!("{}{}", prefix, start + i);
695 df.rename(old_name, PlSmallStr::from(new_name.as_str()))?;
696 }
697
698 Ok(df)
699 }
700 }
701 }
702
703 fn execute_sort(&self, df: DataFrame, op: SortOp) -> Result<DataFrame> {
704 let col_names: Vec<String> = op
705 .columns
706 .iter()
707 .map(|(col_ref, _)| self.resolve_column_name(col_ref, &df))
708 .collect::<Result<Vec<_>>>()?;
709
710 let descending: Vec<bool> = op
711 .columns
712 .iter()
713 .map(|(_, desc)| *desc)
714 .collect();
715
716 Ok(df.sort(col_names, SortMultipleOptions::default().with_order_descending_multi(descending))?)
717 }
718
719 fn execute_take(&self, df: DataFrame, op: TakeOp) -> Result<DataFrame> {
720 Ok(df.head(Some(op.n)))
721 }
722
723 fn execute_skip(&self, df: DataFrame, op: SkipOp) -> Result<DataFrame> {
724 let height = df.height();
725 if op.n >= height {
726 Ok(df.head(Some(0)))
727 } else {
728 Ok(df.slice(op.n as i64, height - op.n))
729 }
730 }
731
732 fn execute_slice(&self, df: DataFrame, op: SliceOp) -> Result<DataFrame> {
733 let start = op.start.min(df.height());
734 let len = (op.end.saturating_sub(start)).min(df.height() - start);
735 Ok(df.slice(start as i64, len))
736 }
737
738 fn execute_drop(&self, df: DataFrame, op: DropOp) -> Result<DataFrame> {
739 let schema = df.schema();
740 let mut columns_to_drop: Vec<String> = Vec::new();
741
742 for selector in op.columns {
744 let names = self.resolve_selector(&selector, &schema, &df)?;
745 columns_to_drop.extend(names);
746 }
747
748 let mut result = df;
750 for col_name in columns_to_drop {
751 result = result.drop(&col_name)?;
752 }
753 Ok(result)
754 }
755
756 fn execute_distinct(&self, df: DataFrame, op: DistinctOp) -> Result<DataFrame> {
757 use polars::prelude::UniqueKeepStrategy;
758
759 match op.columns {
760 None => {
762 df.unique::<Vec<String>, String>(None, UniqueKeepStrategy::First, None)
763 .map_err(DtransformError::from)
764 }
765
766 Some(ref selectors) => {
768 let schema = df.schema();
770 let mut column_names: Vec<String> = Vec::new();
771
772 for selector in selectors {
773 let names = self.resolve_selector(selector, &schema, &df)?;
774 column_names.extend(names);
775 }
776
777 df.unique::<Vec<String>, String>(
779 Some(&column_names),
780 UniqueKeepStrategy::First,
781 None
782 ).map_err(DtransformError::from)
783 }
784 }
785 }
786
787 fn resolve_column_name(&self, col_ref: &ColumnRef, df: &DataFrame) -> Result<String> {
788 match col_ref {
789 ColumnRef::Name(name) => Ok(name.clone()),
790 ColumnRef::Index(idx) => {
791 let col_names = df.get_column_names();
792 if *idx < col_names.len() {
793 Ok(col_names[*idx].to_string())
794 } else {
795 Err(DtransformError::InvalidOperation(format!(
796 "Column index {} out of bounds (table has {} columns)",
797 idx, col_names.len()
798 )))
799 }
800 }
801 ColumnRef::Position(pos) => {
802 if *pos == 0 {
804 return Err(DtransformError::InvalidOperation(
805 "Positional columns start at $1, not $0".to_string()
806 ));
807 }
808 let zero_based_idx = pos - 1;
809 let col_names = df.get_column_names();
810 if zero_based_idx < col_names.len() {
811 Ok(col_names[zero_based_idx].to_string())
812 } else {
813 Err(DtransformError::InvalidOperation(format!(
814 "Column ${} out of bounds (table has {} columns)",
815 pos, col_names.len()
816 )))
817 }
818 }
819 }
820 }
821
    /// Evaluates `expr` against `df`, producing a Series (one value per row;
    /// scalar literals are broadcast to the frame height).
    ///
    /// NOTE(review): a bare column name is first checked against stored
    /// variables — a variable with the same name shadows the column and
    /// resolves to that variable's *first* column.
    fn evaluate_expression(&self, expr: &Expression, df: &DataFrame) -> Result<Series> {
        match expr {
            // Scalar literal → constant column of the frame's height.
            Expression::Literal(lit) => self.literal_to_series(lit, df.height()),

            // List literal. The element type is decided by the FIRST element;
            // later elements of a different type are silently coerced to a
            // default (0.0 / "" / false) rather than erroring.
            Expression::List(literals) => {
                use crate::parser::ast::Literal as AstLiteral;
                if literals.is_empty() {
                    // Empty list: a zero-length Null series.
                    return Ok(Series::new_empty(PlSmallStr::from("list"), &polars::datatypes::DataType::Null));
                }
                match &literals[0] {
                    AstLiteral::Number(_) => {
                        let values: Vec<f64> = literals.iter().map(|lit| {
                            match lit {
                                AstLiteral::Number(n) => *n,
                                _ => 0.0,
                            }
                        }).collect();
                        Ok(Series::new(PlSmallStr::from("list"), values))
                    }
                    AstLiteral::String(_) => {
                        let values: Vec<String> = literals.iter().map(|lit| {
                            match lit {
                                AstLiteral::String(s) => s.clone(),
                                _ => String::new(),
                            }
                        }).collect();
                        Ok(Series::new(PlSmallStr::from("list"), values))
                    }
                    AstLiteral::Boolean(_) => {
                        let values: Vec<bool> = literals.iter().map(|lit| {
                            match lit {
                                AstLiteral::Boolean(b) => *b,
                                _ => false,
                            }
                        }).collect();
                        Ok(Series::new(PlSmallStr::from("list"), values))
                    }
                    AstLiteral::Null => {
                        Ok(Series::new_null(PlSmallStr::from("list"), literals.len()))
                    }
                }
            }

            // Column reference. A plain name matching a stored variable
            // resolves to that variable's first column; otherwise the ref is
            // resolved against `df` by name/index/position.
            Expression::Column(col_ref) => {
                if let ColumnRef::Name(name) = col_ref {
                    if let Some(var_df) = self.variables.get(name) {
                        let col = var_df.get_columns().first()
                            .ok_or_else(|| DtransformError::InvalidOperation(
                                format!("Variable '{}' has no columns", name)
                            ))?;
                        return Ok(col.as_materialized_series().clone());
                    }
                }

                let col_name = self.resolve_column_name(col_ref, df)?;
                df.column(&col_name)
                    .map(|col| col.as_materialized_series().clone())
                    .map_err(|e| DtransformError::PolarsError(e))
            }

            // Explicit variable reference: the variable's first column.
            Expression::Variable(var_name) => {
                let var_df = self.variables.get(var_name)
                    .ok_or_else(|| DtransformError::VariableNotFound(var_name.clone()))?;
                let col = var_df.get_columns().first()
                    .ok_or_else(|| DtransformError::InvalidOperation(
                        format!("Variable '{}' has no columns", var_name)
                    ))?;
                Ok(col.as_materialized_series().clone())
            }

            // Binary operator: evaluate both operands, then delegate.
            Expression::BinaryOp { left, op, right } => {
                let left_series = self.evaluate_expression(left, df)?;
                let right_series = self.evaluate_expression(right, df)?;
                self.apply_binary_op(&left_series, op, &right_series, df)
            }

            // Method call on an evaluated receiver, e.g. col.upper().
            Expression::MethodCall { object, method, args } => {
                let obj_series = self.evaluate_expression(object, df)?;
                self.apply_method(&obj_series, method, args, df)
            }

            // split(string, delimiter)[index]: splits each row's string on a
            // scalar delimiter and takes the index-th piece.
            Expression::Split { string, delimiter, index } => {
                let string_series = self.evaluate_expression(string, df)?;
                let delimiter_series = self.evaluate_expression(delimiter, df)?;

                // The delimiter must evaluate to a string; it is treated as a
                // scalar — only its first value is used.
                let delim = match delimiter_series.dtype() {
                    polars::datatypes::DataType::String => {
                        delimiter_series.str()
                            .map_err(|_| DtransformError::InvalidOperation("Delimiter must be a string".to_string()))?
                            .get(0)
                            .ok_or_else(|| DtransformError::InvalidOperation("Delimiter is null".to_string()))?
                            .to_string()
                    }
                    _ => return Err(DtransformError::InvalidOperation("Delimiter must be a string".to_string())),
                };

                let string_ca = string_series.str()
                    .map_err(|_| DtransformError::InvalidOperation("Split can only be applied to string columns".to_string()))?;

                // Row-wise split; a null input or an out-of-range index
                // produces null for that row.
                let result: Vec<Option<String>> = string_ca.into_iter().map(|opt_str| {
                    opt_str.and_then(|s| {
                        let parts: Vec<&str> = s.split(&delim).collect();
                        parts.get(*index).map(|&part| part.to_string())
                    })
                }).collect();

                Ok(Series::new(PlSmallStr::from("split"), result))
            }

            // lookup(table, key, on=..., return=...): builds an in-memory map
            // from the stored table's `on` column to its `return` column, then
            // maps each key value through it (unmatched/null keys → null).
            Expression::Lookup { table, key, on, return_field } => {
                use crate::parser::ast::LookupField;

                let lookup_df = self.variables.get(table)
                    .ok_or_else(|| DtransformError::VariableNotFound(table.clone()))?;

                // Resolve `on=` to a concrete column name (1-based positions).
                let on_col_name = match on {
                    LookupField::Name(name) => name.clone(),
                    LookupField::Position(pos) => {
                        let schema = lookup_df.schema();
                        let col_names: Vec<_> = schema.iter_names().collect();
                        if *pos == 0 || *pos > col_names.len() {
                            return Err(DtransformError::InvalidOperation(format!(
                                "Lookup table '{}' has {} columns, but on=${} was specified",
                                table, col_names.len(), pos
                            )));
                        }
                        col_names[pos - 1].to_string()
                    }
                };

                // Resolve `return=` the same way.
                let return_col_name = match return_field {
                    LookupField::Name(name) => name.clone(),
                    LookupField::Position(pos) => {
                        let schema = lookup_df.schema();
                        let col_names: Vec<_> = schema.iter_names().collect();
                        if *pos == 0 || *pos > col_names.len() {
                            return Err(DtransformError::InvalidOperation(format!(
                                "Lookup table '{}' has {} columns, but return=${} was specified",
                                table, col_names.len(), pos
                            )));
                        }
                        col_names[pos - 1].to_string()
                    }
                };

                // Validate both named columns exist before touching the data.
                if !lookup_df.schema().contains(&on_col_name) {
                    return Err(DtransformError::ColumnNotFound(format!(
                        "Lookup table '{}' does not have column '{}' (specified in on=)",
                        table, on_col_name
                    )));
                }
                if !lookup_df.schema().contains(&return_col_name) {
                    return Err(DtransformError::ColumnNotFound(format!(
                        "Lookup table '{}' does not have column '{}' (specified in return=)",
                        table, return_col_name
                    )));
                }

                let lookup_key_col = lookup_df.column(&on_col_name)
                    .map_err(|e| DtransformError::PolarsError(e))?
                    .as_materialized_series();

                let lookup_value_col = lookup_df.column(&return_col_name)
                    .map_err(|e| DtransformError::PolarsError(e))?
                    .as_materialized_series();

                let key_series = self.evaluate_expression(key, df)?;

                use std::collections::HashMap;
                use polars::datatypes::DataType;

                // Only String keys are supported; values may be String or any
                // numeric type (numerics are widened to f64 for the result).
                match (lookup_key_col.dtype(), lookup_value_col.dtype()) {
                    (DataType::String, DataType::String) => {
                        let lookup_keys = lookup_key_col.str()
                            .map_err(|_| DtransformError::TypeMismatch {
                                expected: "String".to_string(),
                                got: format!("{:?}", lookup_key_col.dtype()),
                            })?;
                        let lookup_values = lookup_value_col.str()
                            .map_err(|_| DtransformError::TypeMismatch {
                                expected: "String".to_string(),
                                got: format!("{:?}", lookup_value_col.dtype()),
                            })?;

                        // Later duplicate keys overwrite earlier ones.
                        let mut map: HashMap<String, String> = HashMap::new();
                        for i in 0..lookup_df.height() {
                            if let (Some(k), Some(v)) = (lookup_keys.get(i), lookup_values.get(i)) {
                                map.insert(k.to_string(), v.to_string());
                            }
                        }

                        let input_keys = key_series.str()
                            .map_err(|_| DtransformError::TypeMismatch {
                                expected: "String".to_string(),
                                got: format!("{:?}", key_series.dtype()),
                            })?;

                        let result: Vec<Option<String>> = input_keys.into_iter()
                            .map(|opt_key| {
                                opt_key.and_then(|k| map.get(k).cloned())
                            })
                            .collect();

                        // The output series carries the return column's name.
                        Ok(Series::new(PlSmallStr::from(return_col_name.as_str()), result))
                    }
                    (DataType::String, value_dtype) if matches!(
                        value_dtype,
                        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 |
                        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 |
                        DataType::Float32 | DataType::Float64
                    ) => {
                        let lookup_keys = lookup_key_col.str()
                            .map_err(|_| DtransformError::TypeMismatch {
                                expected: "String".to_string(),
                                got: format!("{:?}", lookup_key_col.dtype()),
                            })?;

                        // Widen any numeric value column to f64 so one map
                        // type covers all integer/float widths.
                        let lookup_values_f64 = lookup_value_col.cast(&DataType::Float64)
                            .map_err(|e| DtransformError::PolarsError(e))?;
                        let lookup_values = lookup_values_f64.f64()
                            .map_err(|_| DtransformError::InvalidOperation("Failed to cast to Float64".to_string()))?;

                        let mut map: HashMap<String, f64> = HashMap::new();
                        for i in 0..lookup_df.height() {
                            if let (Some(k), Some(v)) = (lookup_keys.get(i), lookup_values.get(i)) {
                                map.insert(k.to_string(), v);
                            }
                        }

                        let input_keys = key_series.str()
                            .map_err(|_| DtransformError::TypeMismatch {
                                expected: "String".to_string(),
                                got: format!("{:?}", key_series.dtype()),
                            })?;

                        let result: Vec<Option<f64>> = input_keys.into_iter()
                            .map(|opt_key| {
                                opt_key.and_then(|k| map.get(k).copied())
                            })
                            .collect();

                        Ok(Series::new(PlSmallStr::from(return_col_name.as_str()), result))
                    }
                    _ => {
                        Err(DtransformError::InvalidOperation(
                            format!(
                                "Unsupported lookup type combination: key={:?}, value={:?}",
                                lookup_key_col.dtype(),
                                lookup_value_col.dtype()
                            )
                        ))
                    }
                }
            }

            // replace(text, old, new): row-wise replacement. `old` may be a
            // regex literal (replace_all semantics) or a plain scalar string
            // (std substring replacement); `new` is always a scalar string.
            Expression::Replace { text, old, new } => {
                let text_series = self.evaluate_expression(text, df)?;
                let new_series = self.evaluate_expression(new, df)?;

                let text_ca = text_series.str()
                    .map_err(|_| DtransformError::InvalidOperation(
                        "replace() can only be applied to string columns".to_string()
                    ))?;

                use polars::datatypes::DataType;
                // Scalar replacement text: first value of the evaluated series.
                let new_str = match new_series.dtype() {
                    DataType::String => {
                        new_series.str()
                            .map_err(|_| DtransformError::InvalidOperation("Replacement text must be a string".to_string()))?
                            .get(0)
                            .ok_or_else(|| DtransformError::InvalidOperation("Replacement text is null".to_string()))?
                            .to_string()
                    }
                    _ => return Err(DtransformError::InvalidOperation("Replacement text must be a string".to_string())),
                };

                match old.as_ref() {
                    // Regex pattern → replace every match per row.
                    Expression::Regex(pattern) => {
                        let re = Regex::new(pattern)
                            .map_err(|e| DtransformError::InvalidOperation(
                                format!("Invalid regex pattern '{}': {}", pattern, e)
                            ))?;

                        let result: Vec<Option<String>> = text_ca.into_iter().map(|opt_str| {
                            opt_str.map(|s| re.replace_all(s, &new_str).to_string())
                        }).collect();

                        Ok(Series::new(PlSmallStr::from("replace"), result))
                    }
                    // Anything else → evaluate to a scalar string and do a
                    // literal substring replacement.
                    _ => {
                        let old_series = self.evaluate_expression(old, df)?;
                        let old_str = match old_series.dtype() {
                            DataType::String => {
                                old_series.str()
                                    .map_err(|_| DtransformError::InvalidOperation("Pattern must be a string".to_string()))?
                                    .get(0)
                                    .ok_or_else(|| DtransformError::InvalidOperation("Pattern is null".to_string()))?
                                    .to_string()
                            }
                            _ => return Err(DtransformError::InvalidOperation("Pattern must be a string".to_string())),
                        };

                        let result: Vec<Option<String>> = text_ca.into_iter().map(|opt_str| {
                            opt_str.map(|s| s.replace(&old_str, &new_str))
                        }).collect();

                        Ok(Series::new(PlSmallStr::from("replace"), result))
                    }
                }
            }

            // A bare regex literal has no series value of its own; it is only
            // meaningful as the pattern argument of replace().
            Expression::Regex(pattern) => {
                Err(DtransformError::InvalidOperation(
                    format!("Regex pattern '{}' cannot be used directly. Use it with replace() function.", pattern)
                ))
            }
        }
    }
1174
1175 fn literal_to_series(&self, lit: &crate::parser::ast::Literal, len: usize) -> Result<Series> {
1176 use crate::parser::ast::Literal as Lit;
1177 match lit {
1178 Lit::Number(n) => Ok(Series::new(PlSmallStr::from("literal"), vec![*n; len])),
1179 Lit::String(s) => Ok(Series::new(PlSmallStr::from("literal"), vec![s.as_str(); len])),
1180 Lit::Boolean(b) => Ok(Series::new(PlSmallStr::from("literal"), vec![*b; len])),
1181 Lit::Null => Ok(Series::new_null(PlSmallStr::from("literal"), len)),
1182 }
1183 }
1184
    /// Apply binary operator `op` element-wise to `left` and `right`,
    /// returning a new Series of per-row results.
    ///
    /// `_df` is accepted for signature uniformity with the other evaluator
    /// helpers but is unused here.
    ///
    /// Operator semantics, as implemented below:
    /// - `Add` on (String, String): per-row concatenation; any other dtype
    ///   pair falls through to polars' `+`.
    /// - `Sub`/`Mul`/`Div`: delegated to polars Series arithmetic.
    /// - Comparisons (`Gt`..`Neq`): polars comparisons yielding a boolean Series.
    /// - `And`/`Or`: both operands must already be boolean Series.
    /// - `In`: membership of each left value in the right-hand value list;
    ///   only String and numeric dtypes are supported.
    fn apply_binary_op(&self, left: &Series, op: &BinOp, right: &Series, _df: &DataFrame) -> Result<Series> {
        use polars::datatypes::DataType;

        let result = match op {
            BinOp::Add => {
                match (left.dtype(), right.dtype()) {
                    // String + String => row-wise concatenation.
                    (DataType::String, DataType::String) => {
                        let left_str = left.str().map_err(|_| DtransformError::TypeMismatch {
                            expected: "String".to_string(),
                            got: format!("{:?}", left.dtype()),
                        })?;
                        let right_str = right.str().map_err(|_| DtransformError::TypeMismatch {
                            expected: "String".to_string(),
                            got: format!("{:?}", right.dtype()),
                        })?;

                        // If either side is null, the concatenated value is null.
                        // NOTE(review): `zip` truncates to the shorter series — this
                        // assumes both sides have equal length (i.e. literals were
                        // broadcast upstream); confirm at call sites.
                        let result: Vec<Option<String>> = left_str.into_iter()
                            .zip(right_str.into_iter())
                            .map(|(l, r)| {
                                match (l, r) {
                                    (Some(ls), Some(rs)) => Some(format!("{}{}", ls, rs)),
                                    _ => None,
                                }
                            })
                            .collect();

                        Series::new(PlSmallStr::from("concat"), result)
                    }
                    // Everything else: defer to polars' arithmetic Add.
                    _ => (left + right)?,
                }
            }
            BinOp::Sub => (left - right)?,
            BinOp::Mul => (left * right)?,
            BinOp::Div => (left / right)?,
            // Comparisons return BooleanChunked; convert back to a Series.
            BinOp::Gt => left.gt(right)?.into_series(),
            BinOp::Lt => left.lt(right)?.into_series(),
            BinOp::Gte => left.gt_eq(right)?.into_series(),
            BinOp::Lte => left.lt_eq(right)?.into_series(),
            BinOp::Eq => left.equal(right)?.into_series(),
            BinOp::Neq => left.not_equal(right)?.into_series(),
            BinOp::And => {
                // `bool()` errors if either operand is not a boolean Series.
                let left_bool = left.bool()?;
                let right_bool = right.bool()?;
                (left_bool & right_bool).into_series()
            }
            BinOp::Or => {
                let left_bool = left.bool()?;
                let right_bool = right.bool()?;
                (left_bool | right_bool).into_series()
            }
            BinOp::In => {
                use std::collections::HashSet;
                use polars::datatypes::DataType;

                match left.dtype() {
                    DataType::String => {
                        let left_str = left.str()?;
                        let right_str = right.str()?;

                        // Hash the candidate values once, then test each left value
                        // in O(1). The set stores Option<&str>, so a null on the
                        // left matches a null in the right-hand list.
                        let right_set: HashSet<Option<&str>> = right_str.into_iter().collect();

                        let mask: BooleanChunked = left_str
                            .into_iter()
                            .map(|val| right_set.contains(&val))
                            .collect();

                        mask.into_series()
                    }
                    DataType::Int64 | DataType::Int32 | DataType::Float64 | DataType::Float32 => {
                        // Normalize both sides to f64 so mixed int/float lists compare.
                        let left_f64 = left.cast(&DataType::Float64)?;
                        let right_f64 = right.cast(&DataType::Float64)?;

                        let left_num = left_f64.f64()?;
                        let right_num = right_f64.f64()?;

                        let right_values: Vec<Option<f64>> = right_num.into_iter().collect();

                        // Linear scan per left value (membership lists are expected
                        // to be small). Equality uses an absolute f64::EPSILON
                        // tolerance, which is effectively exact comparison for
                        // magnitudes >= ~2.0. A null left value matches a null in
                        // the list.
                        let mask: BooleanChunked = left_num
                            .into_iter()
                            .map(|left_val| {
                                right_values.iter().any(|right_val| {
                                    match (left_val, right_val) {
                                        (Some(l), Some(r)) => (l - r).abs() < f64::EPSILON,
                                        (None, None) => true,
                                        _ => false,
                                    }
                                })
                            })
                            .collect();

                        mask.into_series()
                    }
                    _ => {
                        return Err(DtransformError::TypeMismatch {
                            expected: "String or Number".to_string(),
                            got: format!("{:?}", left.dtype()),
                        });
                    }
                }
            }
        };
        Ok(result)
    }
1300
1301 fn apply_method(&self, _obj: &Series, method: &str, _args: &[Expression], _df: &DataFrame) -> Result<Series> {
1302 Err(DtransformError::InvalidOperation(format!(
1305 "Method '{}' is not supported. Use function-based operations instead.\n\
1306 Example: mutate(clean = replace(text, 'old', 'new'))",
1307 method
1308 )))
1309 }
1310
1311 pub fn get_variable(&self, name: &str) -> Option<&DataFrame> {
1312 self.variables.get(name)
1313 }
1314
1315 pub fn set_variable(&mut self, name: String, df: DataFrame) {
1316 self.variables.insert(name, df);
1317 }
1318
1319 pub fn remove_variable(&mut self, name: &str) {
1320 self.variables.remove(name);
1321 }
1322
1323 pub fn list_variables(&self) -> Vec<String> {
1324 self.variables.keys().cloned().collect()
1325 }
1326
1327 pub fn get_all_variables(&self) -> HashMap<String, DataFrame> {
1328 self.variables.clone()
1329 }
1330
1331 pub fn restore_variables(&mut self, snapshot: HashMap<String, DataFrame>) {
1332 self.variables = snapshot;
1333 }
1334}