1use dsq_functions::BuiltinRegistry;
14use dsq_parser::{BinaryOperator, Expr, FilterParser, Literal, ObjectEntry, UnaryOperator};
15use dsq_shared::ops::*;
16use dsq_shared::value::{is_truthy, Value};
17use dsq_shared::Result;
18use polars::prelude::*;
19use std::collections::HashMap;
20use std::sync::Arc;
21
22fn value_from_any_value(any_val: AnyValue) -> Result<Value> {
24 match any_val {
25 AnyValue::Null => Ok(Value::Null),
26 AnyValue::Boolean(b) => Ok(Value::Bool(b)),
27 AnyValue::Int8(i) => Ok(Value::Int(i as i64)),
28 AnyValue::Int16(i) => Ok(Value::Int(i as i64)),
29 AnyValue::Int32(i) => Ok(Value::Int(i as i64)),
30 AnyValue::Int64(i) => Ok(Value::Int(i)),
31 AnyValue::UInt8(i) => Ok(Value::Int(i as i64)),
32 AnyValue::UInt16(i) => Ok(Value::Int(i as i64)),
33 AnyValue::UInt32(i) => Ok(Value::Int(i as i64)),
34 AnyValue::UInt64(i) => Ok(Value::Int(i as i64)),
35 AnyValue::Float32(f) => Ok(Value::Float(f as f64)),
36 AnyValue::Float64(f) => Ok(Value::Float(f)),
37 AnyValue::String(s) => Ok(Value::String(s.to_string())),
38 _ => Ok(Value::String(any_val.to_string())),
39 }
40}
41
42fn values_to_series(name: &str, values: &[Value]) -> Result<Series> {
44 if values.is_empty() {
45 return Ok(Series::new_empty(name.into(), &DataType::Null));
46 }
47
48 let dtype = values
50 .iter()
51 .find(|v| !matches!(v, Value::Null))
52 .map(|v| match v {
53 Value::Bool(_) => DataType::Boolean,
54 Value::Int(_) => DataType::Int64,
55 Value::Float(_) => DataType::Float64,
56 Value::String(_) => DataType::String,
57 _ => DataType::Null,
58 })
59 .unwrap_or(DataType::Null);
60
61 match dtype {
62 DataType::Boolean => {
63 let vec: Vec<Option<bool>> = values
64 .iter()
65 .map(|v| match v {
66 Value::Bool(b) => Some(*b),
67 Value::Null => None,
68 _ => None,
69 })
70 .collect();
71 Ok(Series::new(name.into(), vec))
72 }
73 DataType::Int64 => {
74 let vec: Vec<Option<i64>> = values
75 .iter()
76 .map(|v| match v {
77 Value::Int(i) => Some(*i),
78 Value::Null => None,
79 _ => None,
80 })
81 .collect();
82 Ok(Series::new(name.into(), vec))
83 }
84 DataType::Float64 => {
85 let vec: Vec<Option<f64>> = values
86 .iter()
87 .map(|v| match v {
88 Value::Float(f) => Some(*f),
89 Value::Int(i) => Some(*i as f64),
90 Value::Null => None,
91 _ => None,
92 })
93 .collect();
94 Ok(Series::new(name.into(), vec))
95 }
96 DataType::String => {
97 let vec: Vec<Option<&str>> = values
98 .iter()
99 .map(|v| match v {
100 Value::String(s) => Some(s.as_str()),
101 Value::Null => None,
102 _ => None,
103 })
104 .collect();
105 Ok(Series::new(name.into(), vec))
106 }
107 _ => Ok(Series::new_null(name.into(), values.len())),
108 }
109}
110
111#[allow(dead_code)]
113fn compare_values_for_sorting(a: &Value, b: &Value) -> std::cmp::Ordering {
114 match (a, b) {
115 (Value::Null, Value::Null) => std::cmp::Ordering::Equal,
116 (Value::Null, _) => std::cmp::Ordering::Less,
117 (_, Value::Null) => std::cmp::Ordering::Greater,
118 (Value::Bool(a_val), Value::Bool(b_val)) => a_val.cmp(b_val),
119 (Value::Int(a_val), Value::Int(b_val)) => a_val.cmp(b_val),
120 (Value::Float(a_val), Value::Float(b_val)) => a_val
121 .partial_cmp(b_val)
122 .unwrap_or(std::cmp::Ordering::Equal),
123 (Value::String(a_val), Value::String(b_val)) => a_val.cmp(b_val),
124 (Value::Int(a_val), Value::Float(b_val)) => (*a_val as f64)
125 .partial_cmp(b_val)
126 .unwrap_or(std::cmp::Ordering::Equal),
127 (Value::Float(a_val), Value::Int(b_val)) => a_val
128 .partial_cmp(&(*b_val as f64))
129 .unwrap_or(std::cmp::Ordering::Equal),
130 (Value::Array(a_arr), Value::Array(b_arr)) => {
131 let len_cmp = a_arr.len().cmp(&b_arr.len());
132 if len_cmp != std::cmp::Ordering::Equal {
133 len_cmp
134 } else {
135 for (a_item, b_item) in a_arr.iter().zip(b_arr.iter()) {
136 let item_cmp = compare_values_for_sorting(a_item, b_item);
137 if item_cmp != std::cmp::Ordering::Equal {
138 return item_cmp;
139 }
140 }
141 std::cmp::Ordering::Equal
142 }
143 }
144 (Value::Object(a_obj), Value::Object(b_obj)) => {
145 let a_keys: Vec<&String> = a_obj.keys().collect();
146 let b_keys: Vec<&String> = b_obj.keys().collect();
147 let keys_cmp = a_keys.len().cmp(&b_keys.len());
148 if keys_cmp != std::cmp::Ordering::Equal {
149 keys_cmp
150 } else {
151 for (a_key, b_key) in a_keys.iter().zip(b_keys.iter()) {
152 let key_cmp = a_key.cmp(b_key);
153 if key_cmp != std::cmp::Ordering::Equal {
154 return key_cmp;
155 }
156 let a_val = a_obj.get(*a_key).unwrap();
157 let b_val = b_obj.get(*b_key).unwrap();
158 let val_cmp = compare_values_for_sorting(a_val, b_val);
159 if val_cmp != std::cmp::Ordering::Equal {
160 return val_cmp;
161 }
162 }
163 std::cmp::Ordering::Equal
164 }
165 }
166 (Value::Bool(_), Value::Int(_)) => std::cmp::Ordering::Less,
168 (Value::Bool(_), Value::Float(_)) => std::cmp::Ordering::Less,
169 (Value::Bool(_), Value::String(_)) => std::cmp::Ordering::Less,
170 (Value::Bool(_), Value::Array(_)) => std::cmp::Ordering::Less,
171 (Value::Bool(_), Value::Object(_)) => std::cmp::Ordering::Less,
172 (Value::Int(_), Value::Bool(_)) => std::cmp::Ordering::Greater,
173 (Value::Int(_), Value::String(_)) => std::cmp::Ordering::Less,
174 (Value::Int(_), Value::Array(_)) => std::cmp::Ordering::Less,
175 (Value::Int(_), Value::Object(_)) => std::cmp::Ordering::Less,
176 (Value::Float(_), Value::Bool(_)) => std::cmp::Ordering::Greater,
177 (Value::Float(_), Value::String(_)) => std::cmp::Ordering::Less,
178 (Value::Float(_), Value::Array(_)) => std::cmp::Ordering::Less,
179 (Value::Float(_), Value::Object(_)) => std::cmp::Ordering::Less,
180 (Value::String(_), Value::Bool(_)) => std::cmp::Ordering::Greater,
181 (Value::String(_), Value::Int(_)) => std::cmp::Ordering::Greater,
182 (Value::String(_), Value::Float(_)) => std::cmp::Ordering::Greater,
183 (Value::String(_), Value::Array(_)) => std::cmp::Ordering::Less,
184 (Value::String(_), Value::Object(_)) => std::cmp::Ordering::Less,
185 (Value::Array(_), Value::Bool(_)) => std::cmp::Ordering::Greater,
186 (Value::Array(_), Value::Int(_)) => std::cmp::Ordering::Greater,
187 (Value::Array(_), Value::Float(_)) => std::cmp::Ordering::Greater,
188 (Value::Array(_), Value::String(_)) => std::cmp::Ordering::Greater,
189 (Value::Array(_), Value::Object(_)) => std::cmp::Ordering::Less,
190 (Value::Object(_), Value::Bool(_)) => std::cmp::Ordering::Greater,
191 (Value::Object(_), Value::Int(_)) => std::cmp::Ordering::Greater,
192 (Value::Object(_), Value::Float(_)) => std::cmp::Ordering::Greater,
193 (Value::Object(_), Value::String(_)) => std::cmp::Ordering::Greater,
194 (Value::Object(_), Value::Array(_)) => std::cmp::Ordering::Greater,
195 _ => std::cmp::Ordering::Equal, }
197}
198
/// Compiles parsed dsq filter expressions into [`CompiledFilter`]s.
///
/// Create one with `FilterCompiler::new` and adjust it with the `with_*`
/// builder methods.
pub struct FilterCompiler {
    /// Builtin function registry. A clone of this `Arc` is handed to the
    /// `FunctionCallOperation`s the compiler emits.
    builtins: Arc<BuiltinRegistry>,
    /// Requested optimization level (`Basic` by default).
    optimization_level: OptimizationLevel,
    /// DataFrame optimization flag (`true` by default).
    dataframe_optimizations: bool,
    /// Maximum expression nesting depth `compile_expr` accepts before it
    /// returns an error (1000 by default).
    max_recursion_depth: usize,
}
210
/// Optimization level a [`FilterCompiler`] can be configured with
/// (`FilterCompiler::new` uses `Basic`).
///
/// NOTE(review): the compiler code in this section stores the level but does
/// not branch on it; confirm what each level enables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OptimizationLevel {
    /// Presumably the lowest level (no optimizations) — confirm.
    None,
    /// Default level used by `FilterCompiler::new`.
    Basic,
    /// Presumably the most aggressive level — confirm.
    Advanced,
}
221
/// Output of compiling an expression: its operations plus metadata gathered
/// from all sub-expressions.
pub struct CompiledFilter {
    /// Operations making up the filter, in the order they are chained
    /// (e.g. a base expression's operations followed by a field access).
    pub operations: Vec<Box<dyn Operation + Send + Sync>>,
    /// Variables collected from sub-expressions (children's maps are merged).
    pub variables: HashMap<String, Value>,
    /// Function definitions collected from sub-expressions.
    pub functions: HashMap<String, FunctionDef>,
    /// `true` if any sub-expression reported that it needs lazy evaluation.
    pub requires_lazy: bool,
    /// Rough cost estimate: each compile step adds a small constant to the
    /// sum of its children's complexities.
    pub complexity: usize,
}
235
/// Mutable state threaded through one compilation run.
#[derive(Debug, Clone)]
pub struct CompilationContext {
    /// Current expression nesting depth; `FilterCompiler::compile_expr`
    /// increments it on entry and decrements it on exit.
    pub depth: usize,
    /// Depth limit stored in the context (1000 by default).
    /// NOTE(review): `compile_expr` checks against the compiler's own
    /// `max_recursion_depth`, not this field.
    pub max_depth: usize,
    /// Variables known during compilation.
    pub variables: HashMap<String, Value>,
    /// Functions known during compilation; `compile_str` pre-populates it
    /// with every registered builtin.
    pub functions: HashMap<String, FunctionDef>,
}
253
254impl Default for CompilationContext {
255 fn default() -> Self {
256 Self::new()
257 }
258}
259
260impl CompilationContext {
261 pub fn new() -> Self {
263 Self {
264 depth: 0,
265 max_depth: 1000,
266 variables: HashMap::new(),
267 functions: HashMap::new(),
268 }
269 }
270
271 pub fn with_max_depth(max_depth: usize) -> Self {
273 Self {
274 depth: 0,
275 max_depth,
276 variables: HashMap::new(),
277 functions: HashMap::new(),
278 }
279 }
280}
281
/// A named function known to the compiler or to a `FilterContext`.
///
/// Note: the derived `Clone` panics when `body` is `FunctionBody::Compiled`
/// (see `impl Clone for FunctionBody`).
#[derive(Debug, Clone)]
pub struct FunctionDef {
    /// Name the function is registered and looked up under.
    pub name: String,

    /// Parameter names. For non-builtin bodies, each call binds them as
    /// variables to the corresponding arguments. Builtins are registered with
    /// an empty list.
    pub parameters: Vec<String>,

    /// The function's implementation.
    pub body: FunctionBody,

    /// Recursion flag. NOTE(review): not read in this section; builtins are
    /// registered with `false`.
    pub is_recursive: bool,
}
297
/// Implementation of a [`FunctionDef`].
pub enum FunctionBody {
    /// Pre-compiled operations, applied in sequence to the context's current
    /// input when the function is called.
    Compiled(Vec<Box<dyn Operation + Send + Sync>>),

    /// Textual body (presumably source text — confirm). Executing this
    /// variant is not implemented yet; calls return an error.
    Ast(String),
    /// A builtin, invoked directly with the call arguments.
    Builtin(BuiltinFunction),
}
309
310impl Clone for FunctionBody {
311 fn clone(&self) -> Self {
312 match self {
313 FunctionBody::Compiled(_) => panic!("Cannot clone Compiled FunctionBody"),
314 FunctionBody::Ast(s) => FunctionBody::Ast(s.clone()),
315 FunctionBody::Builtin(f) => FunctionBody::Builtin(f.clone()),
316 }
317 }
318}
319
320impl std::fmt::Debug for FunctionBody {
321 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322 match self {
323 FunctionBody::Compiled(ops) => write!(f, "Compiled({} operations)", ops.len()),
324 FunctionBody::Ast(ast) => write!(f, "Ast({ast})"),
325 FunctionBody::Builtin(_) => write!(f, "Builtin"),
326 }
327 }
328}
329
/// Builtin function type re-exported from `dsq_functions`; in this file it is
/// called with a `&[Value]` argument slice and its result is returned as the
/// call's `Result<Value>`.
pub type BuiltinFunction = dsq_functions::BuiltinFunction;
332
/// Runtime state used while evaluating filters: variable bindings,
/// user-defined functions, builtins and a call stack for recursion limits.
///
/// Note: cloning a context clones its `FunctionDef`s, which panics if any of
/// them has a `FunctionBody::Compiled` body.
#[derive(Debug, Clone)]
pub struct FilterContext {
    /// Variable bindings; saved and restored around user function calls.
    variables: HashMap<String, Value>,

    /// User-defined functions; consulted before builtins.
    functions: HashMap<String, FunctionDef>,

    /// Builtin function registry.
    builtins: Arc<BuiltinRegistry>,

    /// One frame per in-progress user function call.
    call_stack: Vec<StackFrame>,

    /// Call-stack depth at which `call_function` refuses further calls
    /// (1000 by default).
    max_recursion_depth: usize,

    /// Debug flag, set and queried via `set_debug_mode` / `is_debug_mode`.
    debug_mode: bool,

    /// Current input value; compiled user functions start from it (`Null`
    /// when unset).
    current_input: Option<Value>,

    /// Error handling mode (`Strict` by default).
    error_mode: ErrorMode,
}
360
/// One entry of a `FilterContext` call stack.
#[derive(Debug, Clone)]
pub struct StackFrame {
    /// Name of the called function.
    pub name: String,

    /// Context input at the time of the call (`Null` if none was set).
    pub input: Value,

    /// Source location of the call, if known. Frames pushed in this section
    /// always use `None`.
    pub location: Option<Location>,
}
373
/// A position in filter source text.
#[derive(Debug, Clone)]
pub struct Location {
    /// Line number (presumably 1-based — confirm with the producer).
    pub line: usize,

    /// Column number (presumably 1-based — confirm with the producer).
    pub column: usize,

    /// Optional source text associated with this location.
    pub source: Option<String>,
}
386
/// Error handling mode of a `FilterContext` (`Strict` by default).
///
/// NOTE(review): the variants are not interpreted in this section; the
/// descriptions below are inferred from the names and should be confirmed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorMode {
    /// Presumably: stop at the first error.
    Strict,

    /// Presumably: record errors and keep going.
    Collect,

    /// Presumably: discard errors.
    Ignore,
}
399
400impl Default for FilterContext {
401 fn default() -> Self {
402 Self::new()
403 }
404}
405
406impl FilterContext {
407 pub fn new() -> Self {
409 Self {
410 variables: HashMap::new(),
411 functions: HashMap::new(),
412 builtins: Arc::new(BuiltinRegistry::new()),
413 call_stack: Vec::new(),
414 max_recursion_depth: 1000,
415 debug_mode: false,
416 current_input: None,
417 error_mode: ErrorMode::Strict,
418 }
419 }
420
421 pub fn set_variable(&mut self, name: impl Into<String>, value: Value) {
423 self.variables.insert(name.into(), value);
424 }
425
426 pub fn set_functions(&mut self, functions: HashMap<String, FunctionDef>) {
428 self.functions = functions;
429 }
430
431 pub fn get_variable(&self, name: &str) -> Option<&Value> {
433 self.variables.get(name)
434 }
435
436 pub fn has_variable(&self, name: &str) -> bool {
438 self.variables.contains_key(name)
439 }
440
441 pub fn has_function(&self, name: &str) -> bool {
443 self.functions.contains_key(name) || self.builtins.has_function(name)
444 }
445
446 pub fn call_function(&mut self, name: &str, args: &[Value]) -> Result<Value> {
448 if self.call_stack.len() >= self.max_recursion_depth {
450 return Err(dsq_shared::error::operation_error(format!(
451 "Maximum recursion depth of {} exceeded in function '{}'",
452 self.max_recursion_depth, name
453 )));
454 }
455
456 if let Some(func_def) = self.functions.get(name).cloned() {
458 self.call_user_function(&func_def, args)
459 } else if self.builtins.has_function(name) {
460 self.builtins.call_function(name, args)
461 } else {
462 Err(dsq_shared::error::operation_error(format!(
463 "function '{name}'"
464 )))
465 }
466 }
467
468 fn call_user_function(&mut self, func_def: &FunctionDef, args: &[Value]) -> Result<Value> {
470 if !matches!(func_def.body, FunctionBody::Builtin(_))
472 && args.len() != func_def.parameters.len()
473 {
474 return Err(dsq_shared::error::operation_error(format!(
475 "Expected {} arguments, got {}",
476 func_def.parameters.len(),
477 args.len()
478 )));
479 }
480
481 let saved_vars = self.variables.clone();
483
484 for (param, arg) in func_def.parameters.iter().zip(args.iter()) {
486 self.set_variable(param.clone(), arg.clone());
487 }
488
489 let frame = StackFrame {
491 name: func_def.name.clone(),
492 input: self.current_input.clone().unwrap_or(Value::Null),
493 location: None,
494 };
495 self.call_stack.push(frame);
496
497 let result = match &func_def.body {
498 FunctionBody::Compiled(ops) => {
499 let mut current_value = self.current_input.clone().unwrap_or(Value::Null);
501 for op in ops {
502 current_value = op.apply(¤t_value)?;
503 }
504 Ok(current_value)
505 }
506 FunctionBody::Ast(_ast) => {
507 Err(dsq_shared::error::operation_error(
509 "AST execution not yet implemented",
510 ))
511 }
512 FunctionBody::Builtin(builtin_fn) => builtin_fn(args),
513 };
514
515 self.call_stack.pop();
517 self.variables = saved_vars;
518
519 result
520 }
521
522 pub fn set_input(&mut self, value: Value) {
524 self.current_input = Some(value);
525 }
526
527 pub fn get_input(&self) -> Option<&Value> {
529 self.current_input.as_ref()
530 }
531
532 pub fn recursion_depth(&self) -> usize {
534 self.call_stack.len()
535 }
536
537 pub fn set_debug_mode(&mut self, debug: bool) {
539 self.debug_mode = debug;
540 }
541
542 pub fn is_debug_mode(&self) -> bool {
544 self.debug_mode
545 }
546
547 pub fn set_error_mode(&mut self, mode: ErrorMode) {
549 self.error_mode = mode;
550 }
551
552 pub fn error_mode(&self) -> ErrorMode {
554 self.error_mode
555 }
556}
557
558impl dsq_shared::ops::Context for FilterContext {
559 fn get_variable(&self, name: &str) -> Option<&Value> {
560 self.variables.get(name)
561 }
562
563 fn set_variable(&mut self, name: &str, value: Value) {
564 self.variables.insert(name.to_string(), value);
565 }
566
567 fn as_any(&self) -> &dyn std::any::Any {
568 self
569 }
570
571 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
572 self
573 }
574}
575
576impl Default for FilterCompiler {
577 fn default() -> Self {
578 Self::new()
579 }
580}
581
582impl FilterCompiler {
583 pub fn new() -> Self {
585 Self {
586 builtins: Arc::new(BuiltinRegistry::new()),
587 optimization_level: OptimizationLevel::Basic,
588 dataframe_optimizations: true,
589 max_recursion_depth: 1000,
590 }
591 }
592
593 pub fn with_optimization_level(mut self, level: OptimizationLevel) -> Self {
595 self.optimization_level = level;
596 self
597 }
598
599 pub fn with_dataframe_optimizations(mut self, enabled: bool) -> Self {
601 self.dataframe_optimizations = enabled;
602 self
603 }
604
605 pub fn with_max_recursion_depth(mut self, depth: usize) -> Self {
607 self.max_recursion_depth = depth;
608 self
609 }
610
611 pub fn compile_str(&self, filter: &str) -> Result<CompiledFilter> {
613 let parser = FilterParser::new();
615 let parsed = parser
616 .parse(filter)
617 .map_err(|e| dsq_shared::error::operation_error(format!("{e}")))?;
618
619 let mut ctx = CompilationContext::new();
621 for name in self.builtins.function_names() {
622 if let Some(func) = self.builtins.get_function(&name) {
623 ctx.functions.insert(
624 name.clone(),
625 FunctionDef {
626 name: name.clone(),
627 parameters: vec![], body: FunctionBody::Builtin(func),
629 is_recursive: false,
630 },
631 );
632 }
633 }
634
635 self.compile_expr(&parsed.expr, &mut ctx)
636 }
637
638 pub fn compile_expr(
640 &self,
641 expr: &Expr,
642 ctx: &mut CompilationContext,
643 ) -> Result<CompiledFilter> {
644 if ctx.depth >= self.max_recursion_depth {
645 return Err(dsq_shared::error::operation_error(format!(
646 "Maximum compilation depth {} exceeded",
647 self.max_recursion_depth
648 )));
649 }
650
651 ctx.depth += 1;
652 let result = self.compile_expr_inner(expr, ctx);
653 ctx.depth -= 1;
654 result
655 }
656
657 fn compile_expr_inner(
659 &self,
660 expr: &Expr,
661 ctx: &mut CompilationContext,
662 ) -> Result<CompiledFilter> {
663 match expr {
664 Expr::Identity => {
665 Ok(CompiledFilter {
667 operations: vec![Box::new(IdentityOperation)],
668 variables: HashMap::new(),
669 functions: HashMap::new(),
670 requires_lazy: false,
671 complexity: 1,
672 })
673 }
674 Expr::FieldAccess { base, fields } => {
675 self.compile_field_access(base, fields, ctx)
677 }
678 Expr::ArrayAccess { array, index } => {
679 self.compile_array_access(array, index, ctx)
681 }
682 Expr::ArraySlice { array, start, end } => {
683 self.compile_array_slice(
685 array,
686 start.as_ref().map(|v| &**v),
687 end.as_ref().map(|v| &**v),
688 ctx,
689 )
690 }
691 Expr::ArrayIteration(array) => {
692 self.compile_array_iteration(array, ctx)
694 }
695 Expr::FunctionCall { name, args } => {
696 self.compile_function_call(name, args, ctx)
698 }
699 Expr::BinaryOp { left, op, right } => {
700 self.compile_binary_op(left, op, right, ctx)
702 }
703 Expr::UnaryOp { op, expr } => {
704 self.compile_unary_op(op, expr, ctx)
706 }
707 Expr::Assignment { target, value, op } => {
708 self.compile_assignment(target, value, op, ctx)
710 }
711 Expr::Object { pairs } => {
712 self.compile_object(pairs, ctx)
714 }
715 Expr::Array(elements) => {
716 self.compile_array(elements, ctx)
718 }
719 Expr::Literal(lit) => {
720 self.compile_literal(lit)
722 }
723 Expr::Identifier(name) => {
724 self.compile_identifier(name)
726 }
727 Expr::Paren(inner) => {
728 self.compile_expr(inner, ctx)
730 }
731 Expr::Pipeline(exprs) => {
732 self.compile_pipeline(exprs, ctx)
734 }
735 Expr::Sequence(exprs) => {
736 self.compile_sequence(exprs, ctx)
738 }
739 Expr::Variable(name) => {
740 Ok(CompiledFilter {
742 operations: vec![Box::new(VariableOperation::new(name.clone()))],
743 variables: HashMap::new(),
744 functions: HashMap::new(),
745 requires_lazy: false,
746 complexity: 1,
747 })
748 }
749
750 Expr::If {
751 condition,
752 then_branch,
753 else_branch,
754 } => {
755 self.compile_if(condition, then_branch, else_branch, ctx)
757 }
758 }
759 }
760
761 fn compile_field_access(
763 &self,
764 base: &Expr,
765 fields: &[String],
766 ctx: &mut CompilationContext,
767 ) -> Result<CompiledFilter> {
768 let base_filter = self.compile_expr(base, ctx)?;
769
770 let mut operations = base_filter.operations;
771 let complexity = base_filter.complexity + fields.len();
772
773 if !fields.is_empty() {
776 operations.push(Box::new(FieldAccessOperation::with_fields(fields.to_vec())));
777 }
778
779 Ok(CompiledFilter {
780 operations,
781 variables: base_filter.variables,
782 functions: base_filter.functions,
783 requires_lazy: base_filter.requires_lazy,
784 complexity,
785 })
786 }
787
788 fn compile_field_access_literal(
790 &self,
791 base: &Expr,
792 field_name: &str,
793 ctx: &mut CompilationContext,
794 ) -> Result<CompiledFilter> {
795 let base_filter = self.compile_expr(base, ctx)?;
796
797 let mut operations = base_filter.operations;
798 operations.push(Box::new(FieldAccessOperation::new(field_name.to_string())));
799
800 Ok(CompiledFilter {
801 operations,
802 variables: base_filter.variables,
803 functions: base_filter.functions,
804 requires_lazy: base_filter.requires_lazy,
805 complexity: base_filter.complexity + 1,
806 })
807 }
808
809 fn compile_array_access(
811 &self,
812 array: &Expr,
813 index: &Expr,
814 ctx: &mut CompilationContext,
815 ) -> Result<CompiledFilter> {
816 if let Expr::Literal(Literal::String(field_name)) = index {
818 return self.compile_field_access_literal(array, field_name, ctx);
819 }
820
821 let array_filter = self.compile_expr(array, ctx)?;
822 let index_filter = self.compile_expr(index, ctx)?;
823
824 let mut variables = array_filter.variables;
825 variables.extend(index_filter.variables);
826
827 let mut functions = array_filter.functions;
828 functions.extend(index_filter.functions);
829
830 let operations: Vec<Box<dyn Operation + Send + Sync>> =
831 vec![Box::new(IndexOperation::new(index_filter.operations))];
832 let mut all_operations = array_filter.operations;
833 all_operations.extend(operations);
834
835 Ok(CompiledFilter {
836 operations: all_operations,
837 variables,
838 functions,
839 requires_lazy: array_filter.requires_lazy || index_filter.requires_lazy,
840 complexity: array_filter.complexity + index_filter.complexity + 2,
841 })
842 }
843
844 fn compile_array_slice(
846 &self,
847 array: &Expr,
848 start: Option<&Expr>,
849 end: Option<&Expr>,
850 ctx: &mut CompilationContext,
851 ) -> Result<CompiledFilter> {
852 let array_filter = self.compile_expr(array, ctx)?;
853
854 let start_ops = if let Some(start_expr) = start {
855 Some(self.compile_expr(start_expr, ctx)?.operations)
856 } else {
857 None
858 };
859
860 let end_ops = if let Some(end_expr) = end {
861 Some(self.compile_expr(end_expr, ctx)?.operations)
862 } else {
863 None
864 };
865
866 let operations: Vec<Box<dyn Operation + Send + Sync>> =
867 vec![Box::new(SliceOperation::new(start_ops, end_ops))];
868 let mut all_operations = array_filter.operations;
869 all_operations.extend(operations);
870
871 Ok(CompiledFilter {
872 operations: all_operations,
873 variables: array_filter.variables,
874 functions: array_filter.functions,
875 requires_lazy: array_filter.requires_lazy,
876 complexity: array_filter.complexity + 3,
877 })
878 }
879
880 fn compile_array_iteration(
882 &self,
883 array: &Expr,
884 ctx: &mut CompilationContext,
885 ) -> Result<CompiledFilter> {
886 let array_filter = self.compile_expr(array, ctx)?;
887
888 let operations: Vec<Box<dyn Operation + Send + Sync>> = vec![Box::new(IterateOperation)];
889 let mut all_operations = array_filter.operations;
890 all_operations.extend(operations);
891
892 Ok(CompiledFilter {
893 operations: all_operations,
894 variables: array_filter.variables,
895 functions: array_filter.functions,
896 requires_lazy: array_filter.requires_lazy,
897 complexity: array_filter.complexity + 3,
898 })
899 }
900
901 fn compile_binary_op(
903 &self,
904 left: &Expr,
905 op: &BinaryOperator,
906 right: &Expr,
907 ctx: &mut CompilationContext,
908 ) -> Result<CompiledFilter> {
909 let left_filter = self.compile_expr(left, ctx)?;
910 let right_filter = self.compile_expr(right, ctx)?;
911
912 let mut variables = left_filter.variables;
913 variables.extend(right_filter.variables);
914
915 let mut functions = left_filter.functions;
916 functions.extend(right_filter.functions);
917
918 let operation: Box<dyn Operation + Send + Sync> = match op {
919 BinaryOperator::Add => Box::new(AddOperation::new(
920 left_filter.operations,
921 right_filter.operations,
922 )),
923 BinaryOperator::Sub => Box::new(SubOperation::new(
924 left_filter.operations,
925 right_filter.operations,
926 )),
927 BinaryOperator::Mul => Box::new(MulOperation::new(
928 left_filter.operations,
929 right_filter.operations,
930 )),
931 BinaryOperator::Div => Box::new(DivOperation::new(
932 left_filter.operations,
933 right_filter.operations,
934 )),
935 BinaryOperator::Eq => Box::new(EqOperation::new(
936 left_filter.operations,
937 right_filter.operations,
938 )),
939 BinaryOperator::Ne => Box::new(NeOperation::new(
940 left_filter.operations,
941 right_filter.operations,
942 )),
943 BinaryOperator::Lt => Box::new(LtOperation::new(
944 left_filter.operations,
945 right_filter.operations,
946 )),
947 BinaryOperator::Le => Box::new(LeOperation::new(
948 left_filter.operations,
949 right_filter.operations,
950 )),
951 BinaryOperator::Gt => Box::new(GtOperation::new(
952 left_filter.operations,
953 right_filter.operations,
954 )),
955 BinaryOperator::Ge => Box::new(GeOperation::new(
956 left_filter.operations,
957 right_filter.operations,
958 )),
959 BinaryOperator::And => Box::new(AndOperation::new(
960 left_filter.operations,
961 right_filter.operations,
962 )),
963 BinaryOperator::Or => Box::new(OrOperation::new(
964 left_filter.operations,
965 right_filter.operations,
966 )),
967 };
968
969 Ok(CompiledFilter {
970 operations: vec![operation],
971 variables,
972 functions,
973 requires_lazy: left_filter.requires_lazy || right_filter.requires_lazy,
974 complexity: left_filter.complexity + right_filter.complexity + 2,
975 })
976 }
977
978 fn compile_unary_op(
980 &self,
981 op: &UnaryOperator,
982 expr: &Expr,
983 ctx: &mut CompilationContext,
984 ) -> Result<CompiledFilter> {
985 let expr_filter = self.compile_expr(expr, ctx)?;
986
987 let operation: Box<dyn Operation + Send + Sync> = match op {
988 UnaryOperator::Not => Box::new(NegationOperation::new(expr_filter.operations)),
989 UnaryOperator::Del => Box::new(DelOperation::new(expr_filter.operations)),
990 };
991
992 Ok(CompiledFilter {
993 operations: vec![operation],
994 variables: expr_filter.variables,
995 functions: expr_filter.functions,
996 requires_lazy: expr_filter.requires_lazy,
997 complexity: expr_filter.complexity + 1,
998 })
999 }
1000
1001 fn compile_if(
1003 &self,
1004 condition: &Expr,
1005 then_branch: &Expr,
1006 else_branch: &Expr,
1007 ctx: &mut CompilationContext,
1008 ) -> Result<CompiledFilter> {
1009 let condition_filter = self.compile_expr(condition, ctx)?;
1010 let then_filter = self.compile_expr(then_branch, ctx)?;
1011 let else_filter = self.compile_expr(else_branch, ctx)?;
1012
1013 let mut variables = condition_filter.variables;
1014 variables.extend(then_filter.variables);
1015 variables.extend(else_filter.variables);
1016
1017 let mut functions = condition_filter.functions;
1018 functions.extend(then_filter.functions);
1019 functions.extend(else_filter.functions);
1020
1021 let operation = Box::new(IfOperation::new(
1022 condition_filter.operations,
1023 then_filter.operations,
1024 else_filter.operations,
1025 ));
1026
1027 Ok(CompiledFilter {
1028 operations: vec![operation],
1029 variables,
1030 functions,
1031 requires_lazy: condition_filter.requires_lazy
1032 || then_filter.requires_lazy
1033 || else_filter.requires_lazy,
1034 complexity: condition_filter.complexity
1035 + then_filter.complexity
1036 + else_filter.complexity
1037 + 3,
1038 })
1039 }
1040
1041 fn compile_function_call(
1043 &self,
1044 name: &str,
1045 args: &[Expr],
1046 ctx: &mut CompilationContext,
1047 ) -> Result<CompiledFilter> {
1048 if name == "join" && args.len() == 2 {
1050 if let (
1051 Expr::Literal(Literal::String(file_path)),
1052 Expr::BinaryOp {
1053 left,
1054 op: BinaryOperator::Eq,
1055 right,
1056 },
1057 ) = (&args[0], &args[1])
1058 {
1059 if let (
1061 Expr::FieldAccess {
1062 base: left_base,
1063 fields: left_fields,
1064 },
1065 Expr::FieldAccess {
1066 base: right_base,
1067 fields: right_fields,
1068 },
1069 ) = (&**left, &**right)
1070 {
1071 if matches!(**left_base, Expr::Identity)
1072 && matches!(**right_base, Expr::Identity)
1073 && left_fields.len() == 1
1074 && right_fields.len() == 1
1075 {
1076 let left_key = left_fields[0].clone();
1077 let right_key = right_fields[0].clone();
1078 let operation = Box::new(JoinFromFileOperation::new(
1079 file_path.clone(),
1080 left_key,
1081 right_key,
1082 ));
1083 return Ok(CompiledFilter {
1084 operations: vec![operation],
1085 variables: HashMap::new(),
1086 functions: HashMap::new(),
1087 requires_lazy: false,
1088 complexity: 10, });
1090 }
1091 }
1092 }
1093 }
1094
1095 let mut arg_filters = Vec::new();
1097 let mut variables = HashMap::new();
1098 let mut functions = HashMap::new();
1099 let mut complexity = 1; let mut requires_lazy = false;
1101
1102 for arg in args {
1103 let arg_filter = self.compile_expr(arg, ctx)?;
1104 variables.extend(arg_filter.variables);
1105 functions.extend(arg_filter.functions);
1106 complexity += arg_filter.complexity;
1107 requires_lazy |= arg_filter.requires_lazy;
1108 arg_filters.push(arg_filter.operations);
1109 }
1110
1111 let operation = Box::new(FunctionCallOperation::new(
1112 name.to_string(),
1113 arg_filters,
1114 self.builtins.clone(),
1115 ));
1116
1117 Ok(CompiledFilter {
1118 operations: vec![operation],
1119 variables,
1120 functions,
1121 requires_lazy,
1122 complexity,
1123 })
1124 }
1125
1126 fn compile_assignment(
1128 &self,
1129 target: &Expr,
1130 value: &Expr,
1131 op: &dsq_parser::AssignmentOperator,
1132 ctx: &mut CompilationContext,
1133 ) -> Result<CompiledFilter> {
1134 let shared_op = match op {
1135 dsq_parser::AssignmentOperator::AddAssign => AssignmentOperator::AddAssign,
1136 dsq_parser::AssignmentOperator::UpdateAssign => AssignmentOperator::UpdateAssign,
1137 };
1138
1139 match shared_op {
1140 AssignmentOperator::AddAssign => {
1141 let target_filter = self.compile_expr(target, ctx)?;
1143 let value_filter = self.compile_expr(value, ctx)?;
1144
1145 let mut variables = target_filter.variables;
1146 variables.extend(value_filter.variables);
1147
1148 let mut functions = target_filter.functions;
1149 functions.extend(value_filter.functions);
1150
1151 let operation = Box::new(AssignmentOperation::new(
1152 target_filter.operations,
1153 AssignmentOperator::AddAssign,
1154 value_filter.operations,
1155 ));
1156
1157 Ok(CompiledFilter {
1158 operations: vec![operation],
1159 variables,
1160 functions,
1161 requires_lazy: target_filter.requires_lazy || value_filter.requires_lazy,
1162 complexity: target_filter.complexity + value_filter.complexity + 3,
1163 })
1164 }
1165 AssignmentOperator::UpdateAssign => {
1166 let target_filter = self.compile_expr(target, ctx)?;
1168 let value_filter = self.compile_expr(value, ctx)?;
1169
1170 let mut variables = target_filter.variables;
1171 variables.extend(value_filter.variables);
1172
1173 let mut functions = target_filter.functions;
1174 functions.extend(value_filter.functions);
1175
1176 let operation = Box::new(AssignmentOperation::new(
1177 target_filter.operations,
1178 AssignmentOperator::UpdateAssign,
1179 value_filter.operations,
1180 ));
1181
1182 Ok(CompiledFilter {
1183 operations: vec![operation],
1184 variables,
1185 functions,
1186 requires_lazy: target_filter.requires_lazy || value_filter.requires_lazy,
1187 complexity: target_filter.complexity + value_filter.complexity + 3,
1188 })
1189 }
1190 }
1191 }
1192
1193 fn compile_object(
1195 &self,
1196 pairs: &[ObjectEntry],
1197 ctx: &mut CompilationContext,
1198 ) -> Result<CompiledFilter> {
1199 let mut field_operations = Vec::new();
1200 let mut variables = HashMap::new();
1201 let mut functions = HashMap::new();
1202 let mut complexity = 2; let mut requires_lazy = false;
1204
1205 for pair in pairs {
1206 match pair {
1207 ObjectEntry::KeyValue { key, value } => {
1208 let key_filter = self.compile_literal(&Literal::String(key.clone()))?;
1209 variables.extend(key_filter.variables);
1210 functions.extend(key_filter.functions);
1211 complexity += key_filter.complexity;
1212 requires_lazy |= key_filter.requires_lazy;
1213
1214 let value_filter = self.compile_expr(value, ctx)?;
1215 variables.extend(value_filter.variables);
1216 functions.extend(value_filter.functions);
1217 complexity += value_filter.complexity;
1218 requires_lazy |= value_filter.requires_lazy;
1219
1220 let key_op = key_filter
1221 .operations
1222 .into_iter()
1223 .next()
1224 .unwrap_or_else(|| Box::new(IdentityOperation));
1225 let value_ops = value_filter.operations;
1226 field_operations.push((key_op, Some(value_ops)));
1227 }
1228 ObjectEntry::Shorthand(key) => {
1229 let key_filter = self.compile_literal(&Literal::String(key.clone()))?;
1230 variables.extend(key_filter.variables);
1231 functions.extend(key_filter.functions);
1232 complexity += key_filter.complexity;
1233 requires_lazy |= key_filter.requires_lazy;
1234
1235 let value_expr = Expr::FieldAccess {
1237 base: Box::new(Expr::Identity),
1238 fields: vec![key.clone()],
1239 };
1240 let value_filter = self.compile_expr(&value_expr, ctx)?;
1241 variables.extend(value_filter.variables);
1242 functions.extend(value_filter.functions);
1243 complexity += value_filter.complexity;
1244 requires_lazy |= value_filter.requires_lazy;
1245
1246 let key_op = key_filter
1247 .operations
1248 .into_iter()
1249 .next()
1250 .unwrap_or_else(|| Box::new(IdentityOperation));
1251 let value_ops = value_filter.operations;
1252 field_operations.push((key_op, Some(value_ops)));
1253 }
1254 }
1255 }
1256
1257 Ok(CompiledFilter {
1258 operations: vec![Box::new(ObjectConstructOperation::new(field_operations))],
1259 variables,
1260 functions,
1261 requires_lazy,
1262 complexity,
1263 })
1264 }
1265
1266 fn compile_array(
1268 &self,
1269 elements: &[Expr],
1270 ctx: &mut CompilationContext,
1271 ) -> Result<CompiledFilter> {
1272 let mut element_filters = Vec::new();
1273 let mut variables = HashMap::new();
1274 let mut functions = HashMap::new();
1275 let mut complexity = 1;
1276 let mut requires_lazy = false;
1277
1278 for element in elements {
1279 let element_filter = self.compile_expr(element, ctx)?;
1280 variables.extend(element_filter.variables);
1281 functions.extend(element_filter.functions);
1282 complexity += element_filter.complexity;
1283 requires_lazy |= element_filter.requires_lazy;
1284 element_filters.push(element_filter.operations);
1285 }
1286
1287 let mut all_element_ops = Vec::new();
1288 for element_ops in element_filters {
1289 all_element_ops.extend(element_ops);
1290 }
1291 let operations: Vec<Box<dyn Operation + Send + Sync>> =
1292 vec![Box::new(ArrayConstructOperation::new(all_element_ops))];
1293
1294 Ok(CompiledFilter {
1295 operations,
1296 variables,
1297 functions,
1298 requires_lazy,
1299 complexity,
1300 })
1301 }
1302
1303 fn compile_literal(&self, lit: &Literal) -> Result<CompiledFilter> {
1305 let value = match lit {
1306 Literal::Int(i) => Value::Int(*i),
1307 Literal::BigInt(bi) => Value::BigInt(bi.clone()),
1308 Literal::Float(f) => Value::Float(*f),
1309 Literal::String(s) => Value::String(s.clone()),
1310 Literal::Bool(b) => Value::Bool(*b),
1311 Literal::Null => Value::Null,
1312 };
1313
1314 Ok(CompiledFilter {
1315 operations: vec![Box::new(LiteralOperation::new(value))],
1316 variables: HashMap::new(),
1317 functions: HashMap::new(),
1318 requires_lazy: false,
1319 complexity: 1,
1320 })
1321 }
1322
1323 fn compile_identifier(&self, name: &str) -> Result<CompiledFilter> {
1325 if self.builtins.has_function(name) {
1327 Ok(CompiledFilter {
1328 operations: vec![Box::new(FunctionCallOperation::new(
1329 name.to_string(),
1330 vec![], self.builtins.clone(),
1332 ))],
1333 variables: HashMap::new(),
1334 functions: HashMap::new(),
1335 requires_lazy: false,
1336 complexity: 1,
1337 })
1338 } else {
1339 Ok(CompiledFilter {
1340 operations: vec![Box::new(VariableOperation::new(name.to_string()))],
1341 variables: HashMap::new(),
1342 functions: HashMap::new(),
1343 requires_lazy: false,
1344 complexity: 1,
1345 })
1346 }
1347 }
1348
1349 fn compile_pipeline(
1351 &self,
1352 exprs: &[Expr],
1353 ctx: &mut CompilationContext,
1354 ) -> Result<CompiledFilter> {
1355 if exprs.is_empty() {
1356 return Ok(CompiledFilter {
1357 operations: vec![Box::new(IdentityOperation)],
1358 variables: HashMap::new(),
1359 functions: HashMap::new(),
1360 requires_lazy: false,
1361 complexity: 1,
1362 });
1363 }
1364
1365 if exprs.len() == 1 {
1366 return self.compile_expr(&exprs[0], ctx);
1367 }
1368
1369 let all_maps = exprs.iter().all(|expr| {
1371 matches!(expr, Expr::FunctionCall { name, args } if name == "map" && args.len() == 1)
1372 });
1373
1374 if all_maps {
1375 let map_args: Vec<Expr> = exprs
1377 .iter()
1378 .filter_map(|expr| {
1379 if let Expr::FunctionCall { name, args } = expr {
1380 if name == "map" && args.len() == 1 {
1381 Some(args[0].clone())
1382 } else {
1383 None
1384 }
1385 } else {
1386 None
1387 }
1388 })
1389 .collect();
1390
1391 let combined_expr = Expr::FunctionCall {
1392 name: "map".to_string(),
1393 args: vec![Expr::Pipeline(map_args)],
1394 };
1395 return self.compile_expr(&combined_expr, ctx);
1396 }
1397
1398 let mut all_operations = Vec::new();
1400 let mut variables = HashMap::new();
1401 let mut functions = HashMap::new();
1402 let mut complexity = 0;
1403 let mut requires_lazy = false;
1404
1405 for expr in exprs {
1406 let filter = self.compile_expr(expr, ctx)?;
1407 all_operations.extend(filter.operations);
1408 variables.extend(filter.variables);
1409 functions.extend(filter.functions);
1410 complexity += filter.complexity;
1411 requires_lazy |= filter.requires_lazy;
1412 }
1413
1414 Ok(CompiledFilter {
1415 operations: all_operations,
1416 variables,
1417 functions,
1418 requires_lazy,
1419 complexity: complexity + (exprs.len() - 1), })
1421 }
1422
1423 fn compile_sequence(
1425 &self,
1426 exprs: &[Expr],
1427 ctx: &mut CompilationContext,
1428 ) -> Result<CompiledFilter> {
1429 if exprs.is_empty() {
1430 return Ok(CompiledFilter {
1431 operations: vec![Box::new(IdentityOperation)],
1432 variables: HashMap::new(),
1433 functions: HashMap::new(),
1434 requires_lazy: false,
1435 complexity: 1,
1436 });
1437 }
1438
1439 if exprs.len() == 1 {
1440 return self.compile_expr(&exprs[0], ctx);
1441 }
1442
1443 let mut expr_operations = Vec::new();
1445 let mut variables = HashMap::new();
1446 let mut functions = HashMap::new();
1447 let mut complexity = 0;
1448 let mut requires_lazy = false;
1449
1450 for expr in exprs {
1451 let filter = self.compile_expr(expr, ctx)?;
1452 expr_operations.push(filter.operations);
1453 variables.extend(filter.variables);
1454 functions.extend(filter.functions);
1455 complexity += filter.complexity;
1456 requires_lazy |= filter.requires_lazy;
1457 }
1458
1459 let operations: Vec<Box<dyn Operation + Send + Sync>> =
1460 vec![Box::new(SequenceOperation::new(expr_operations))];
1461
1462 Ok(CompiledFilter {
1463 operations,
1464 variables,
1465 functions,
1466 requires_lazy,
1467 complexity: complexity + (exprs.len() - 1), })
1469 }
1470}
1471
1472struct FunctionCallOperation {
1473 name: String,
1474 arg_ops: Vec<Vec<Box<dyn Operation + Send + Sync>>>,
1475 builtins: Arc<BuiltinRegistry>,
1476}
1477
1478impl FunctionCallOperation {
1479 fn new(
1480 name: String,
1481 arg_ops: Vec<Vec<Box<dyn Operation + Send + Sync>>>,
1482 builtins: Arc<BuiltinRegistry>,
1483 ) -> Self {
1484 Self {
1485 name,
1486 arg_ops,
1487 builtins,
1488 }
1489 }
1490}
1491
1492impl Operation for FunctionCallOperation {
1493 fn apply(&self, value: &Value) -> Result<Value> {
1494 let mut context = None;
1495 self.apply_with_context(value, &mut context)
1496 }
1497
1498 fn apply_with_context(
1499 &self,
1500 value: &Value,
1501 context: &mut Option<&mut dyn dsq_shared::ops::Context>,
1502 ) -> Result<Value> {
1503 match self.name.as_str() {
1504 "add" | "head" | "tail" | "limit" | "url_set_protocol" | "replace"
1505 | "spaces_to_tabs" | "tabs_to_spaces" | "contains" => {
1506 let mut arg_values = vec![value.clone()];
1508
1509 for arg_ops in &self.arg_ops {
1511 let mut arg_val = value.clone();
1512 for op in arg_ops {
1513 arg_val = op.apply(&arg_val)?;
1514 }
1515 arg_values.push(arg_val);
1516 }
1517
1518 self.builtins.call_function(&self.name, &arg_values)
1520 }
1521 "length" | "len" => {
1522 if self.arg_ops.is_empty() {
1523 match value {
1524 Value::Array(arr) => Ok(Value::Int(arr.len() as i64)),
1525 Value::String(s) => Ok(Value::Int(s.chars().count() as i64)),
1526 Value::Object(obj) => Ok(Value::Int(obj.len() as i64)),
1527 Value::DataFrame(df) => Ok(Value::Int(df.height() as i64)),
1528 Value::Series(s) => Ok(Value::Int(s.len() as i64)),
1529 Value::Null => Ok(Value::Int(0)),
1530 _ => Ok(Value::Int(1)),
1531 }
1532 } else if self.arg_ops.len() == 1 {
1533 let mut arg_val = value.clone();
1534 for op in &self.arg_ops[0] {
1535 arg_val = op.apply_with_context(&arg_val, context)?;
1536 }
1537 match arg_val {
1538 Value::Array(arr) => Ok(Value::Int(arr.len() as i64)),
1539 Value::String(s) => Ok(Value::Int(s.chars().count() as i64)),
1540 Value::Object(obj) => Ok(Value::Int(obj.len() as i64)),
1541 Value::DataFrame(df) => Ok(Value::Int(df.height() as i64)),
1542 Value::Series(s) => Ok(Value::Int(s.len() as i64)),
1543 Value::Null => Ok(Value::Int(0)),
1544 _ => Ok(Value::Int(1)),
1545 }
1546 } else {
1547 Err(dsq_shared::error::operation_error(
1548 "length() expects 0 or 1 argument",
1549 ))
1550 }
1551 }
1552
1553 "keys" => match value {
1554 Value::Object(obj) => {
1555 let keys: Vec<Value> = obj.keys().map(|k| Value::String(k.clone())).collect();
1556 Ok(Value::Array(keys))
1557 }
1558 _ => Err(dsq_shared::error::operation_error(
1559 "keys() requires an object",
1560 )),
1561 },
1562 "values" => match value {
1563 Value::Object(obj) => {
1564 let values: Vec<Value> = obj.values().cloned().collect();
1565 Ok(Value::Array(values))
1566 }
1567 _ => Err(dsq_shared::error::operation_error(
1568 "values() requires an object",
1569 )),
1570 },
1571 "group_by" => {
1572 if self.arg_ops.len() != 1 {
1573 return Err(dsq_shared::error::operation_error(
1574 "group_by() expects 1 argument",
1575 ));
1576 }
1577
1578 match value {
1579 Value::Array(arr) => {
1580 let mut groups: HashMap<String, Vec<Value>> = HashMap::new();
1581 for item in arr {
1582 let mut key_value = item.clone();
1583 for op in &self.arg_ops[0] {
1584 key_value = op.apply_with_context(&key_value, context)?;
1585 }
1586 let key = match key_value {
1587 Value::String(s) => s,
1588 Value::Int(i) => i.to_string(),
1589 Value::Float(f) => f.to_string(),
1590 Value::Bool(b) => b.to_string(),
1591 _ => "".to_string(),
1592 };
1593 groups.entry(key).or_default().push(item.clone());
1594 }
1595 let result: Vec<Value> = groups.into_values().map(Value::Array).collect();
1596 Ok(Value::Array(result))
1597 }
1598 Value::DataFrame(df) => {
1599 let mut groups: HashMap<String, Vec<usize>> = HashMap::new();
1602 for i in 0..df.height() {
1603 let mut row_obj = HashMap::new();
1605 for col_name in df.get_column_names() {
1606 if let Ok(s) = df.column(col_name) {
1607 if let Ok(val) = s.get(i) {
1608 let value =
1609 value_from_any_value(val).unwrap_or(Value::Null);
1610 row_obj.insert(col_name.to_string(), value);
1611 }
1612 }
1613 }
1614 let row_value = Value::Object(row_obj);
1615 let mut key_value = row_value.clone();
1616 for op in &self.arg_ops[0] {
1617 key_value = op.apply_with_context(&key_value, context)?;
1618 }
1619 let key = match key_value {
1620 Value::String(s) => s,
1621 Value::Int(i) => i.to_string(),
1622 Value::Float(f) => f.to_string(),
1623 Value::Bool(b) => b.to_string(),
1624 _ => "".to_string(),
1625 };
1626 groups.entry(key).or_default().push(i);
1627 }
1628 let mut result = Vec::new();
1629 for (_key, indices) in groups {
1630 let mut group = Vec::new();
1631 for &i in &indices {
1632 let mut row_obj = HashMap::new();
1633 for col_name in df.get_column_names() {
1634 if let Ok(s) = df.column(col_name) {
1635 if let Ok(val) = s.get(i) {
1636 let value =
1637 value_from_any_value(val).unwrap_or(Value::Null);
1638 row_obj.insert(col_name.to_string(), value);
1639 }
1640 }
1641 }
1642 group.push(Value::Object(row_obj));
1643 }
1644 result.push(Value::Array(group));
1645 }
1646 Ok(Value::Array(result))
1647 }
1648 _ => Err(dsq_shared::error::operation_error(
1649 "group_by() requires an array or DataFrame",
1650 )),
1651 }
1652 }
1653 "reverse" => {
1654 if !self.arg_ops.is_empty() {
1655 return Err(dsq_shared::error::operation_error(
1656 "reverse() expects no arguments",
1657 ));
1658 }
1659 self.builtins
1660 .call_function("reverse", std::slice::from_ref(value))
1661 }
1662 "sort_by" => {
1663 if self.arg_ops.len() != 1 {
1664 return Err(dsq_shared::error::operation_error(
1665 "sort_by() expects 1 argument",
1666 ));
1667 }
1668
1669 let mut key_values = Vec::new();
1671 match value {
1672 Value::Array(arr) => {
1673 for item in arr {
1674 let mut key_value = item.clone();
1675 for op in &self.arg_ops[0] {
1676 key_value = op.apply_with_context(&key_value, context)?;
1677 }
1678 key_values.push(key_value);
1679 }
1680 self.builtins
1681 .call_function("sort_by", &[value.clone(), Value::Array(key_values)])
1682 }
1683 Value::DataFrame(df) => {
1684 for i in 0..df.height() {
1686 let mut row_obj = HashMap::new();
1687 for col_name in df.get_column_names() {
1688 if let Ok(s) = df.column(col_name) {
1689 if let Ok(val) = s.get(i) {
1690 let value =
1691 value_from_any_value(val).unwrap_or(Value::Null);
1692 row_obj.insert(col_name.to_string(), value);
1693 }
1694 }
1695 }
1696 let row_value = Value::Object(row_obj);
1697 let mut key_value = row_value;
1698 for op in &self.arg_ops[0] {
1699 key_value = op.apply_with_context(&key_value, context)?;
1700 }
1701 key_values.push(key_value);
1702 }
1703 self.builtins
1704 .call_function("sort_by", &[value.clone(), Value::Array(key_values)])
1705 }
1706 _ => Err(dsq_shared::error::operation_error(
1707 "sort_by() requires an array or DataFrame",
1708 )),
1709 }
1710 }
1711 "min_by" => {
1712 if self.arg_ops.len() != 1 {
1713 return Err(dsq_shared::error::operation_error(
1714 "min_by() expects 1 argument",
1715 ));
1716 }
1717
1718 let mut key_values = Vec::new();
1720 match value {
1721 Value::Array(arr) => {
1722 for item in arr {
1723 let mut key_value = item.clone();
1724 for op in &self.arg_ops[0] {
1725 key_value = op.apply_with_context(&key_value, context)?;
1726 }
1727 key_values.push(key_value);
1728 }
1729 self.builtins
1730 .call_function("min_by", &[value.clone(), Value::Array(key_values)])
1731 }
1732 Value::DataFrame(df) => {
1733 for i in 0..df.height() {
1735 let mut row_obj = HashMap::new();
1736 for col_name in df.get_column_names() {
1737 if let Ok(s) = df.column(col_name) {
1738 if let Ok(val) = s.get(i) {
1739 let value =
1740 value_from_any_value(val).unwrap_or(Value::Null);
1741 row_obj.insert(col_name.to_string(), value);
1742 }
1743 }
1744 }
1745 let row_value = Value::Object(row_obj);
1746 let mut key_value = row_value;
1747 for op in &self.arg_ops[0] {
1748 key_value = op.apply_with_context(&key_value, context)?;
1749 }
1750 key_values.push(key_value);
1751 }
1752 self.builtins
1753 .call_function("min_by", &[value.clone(), Value::Array(key_values)])
1754 }
1755 _ => Err(dsq_shared::error::operation_error(
1756 "min_by() requires an array or DataFrame",
1757 )),
1758 }
1759 }
1760 "max_by" => {
1761 if self.arg_ops.len() != 1 {
1762 return Err(dsq_shared::error::operation_error(
1763 "max_by() expects 1 argument",
1764 ));
1765 }
1766
1767 let mut key_values = Vec::new();
1769 match value {
1770 Value::Array(arr) => {
1771 for item in arr {
1772 let mut key_value = item.clone();
1773 for op in &self.arg_ops[0] {
1774 key_value = op.apply_with_context(&key_value, context)?;
1775 }
1776 key_values.push(key_value);
1777 }
1778 self.builtins
1779 .call_function("max_by", &[value.clone(), Value::Array(key_values)])
1780 }
1781 Value::DataFrame(df) => {
1782 for i in 0..df.height() {
1784 let mut row_obj = HashMap::new();
1785 for col_name in df.get_column_names() {
1786 if let Ok(s) = df.column(col_name) {
1787 if let Ok(val) = s.get(i) {
1788 let value =
1789 value_from_any_value(val).unwrap_or(Value::Null);
1790 row_obj.insert(col_name.to_string(), value);
1791 }
1792 }
1793 }
1794 let row_value = Value::Object(row_obj);
1795 let mut key_value = row_value;
1796 for op in &self.arg_ops[0] {
1797 key_value = op.apply_with_context(&key_value, context)?;
1798 }
1799 key_values.push(key_value);
1800 }
1801 self.builtins
1802 .call_function("max_by", &[value.clone(), Value::Array(key_values)])
1803 }
1804 _ => Err(dsq_shared::error::operation_error(
1805 "max_by() requires an array or DataFrame",
1806 )),
1807 }
1808 }
1809 "map" => {
1810 if self.arg_ops.len() != 1 {
1811 return Err(dsq_shared::error::operation_error(
1812 "map() expects 1 argument",
1813 ));
1814 }
1815
1816 match value {
1817 Value::Array(arr) => {
1818 let mut result = Vec::new();
1819 let is_identity = self.arg_ops[0].len() == 1
1821 && self.arg_ops[0][0]
1822 .as_any()
1823 .is::<dsq_shared::ops::IdentityOperation>();
1824 for item in arr {
1825 let mut transformed = item.clone();
1826 for op in &self.arg_ops[0] {
1827 transformed = op.apply_with_context(&transformed, context)?;
1828 }
1829 if !matches!(transformed, Value::Null) || is_identity {
1831 result.push(transformed);
1832 }
1833 }
1834 Ok(Value::Array(result))
1835 }
1836 Value::DataFrame(df) => {
1837 let mut results = Vec::new();
1841 for i in 0..df.height() {
1842 let mut row_obj = HashMap::new();
1844 for col_name in df.get_column_names() {
1845 let series = df.column(col_name).map_err(|e| {
1846 dsq_shared::error::operation_error(format!(
1847 "Failed to get column: {}",
1848 e
1849 ))
1850 })?;
1851 let any_val = series.get(i).map_err(|e| {
1852 dsq_shared::error::operation_error(format!(
1853 "Failed to get value: {}",
1854 e
1855 ))
1856 })?;
1857 let val = value_from_any_value(any_val).unwrap_or(Value::Null);
1858 row_obj.insert(col_name.to_string(), val);
1859 }
1860 let row_value = Value::Object(row_obj);
1861
1862 let mut ctx = dsq_shared::ops::SimpleContext {
1863 value: row_value.clone(),
1864 };
1865 let mut transformed = row_value;
1866 for op in &self.arg_ops[0] {
1867 transformed =
1868 op.apply_with_context(&transformed, &mut Some(&mut ctx))?;
1869 }
1870 if !matches!(transformed, Value::Null) {
1872 results.push(transformed);
1873 }
1874 }
1875 Ok(Value::Array(results))
1876 }
1877 Value::Series(series) => {
1878 let mut results = Vec::new();
1879 for i in 0..series.len() {
1880 let any_val = series.get(i).map_err(|e| {
1881 dsq_shared::error::operation_error(format!(
1882 "Failed to get value: {e}"
1883 ))
1884 })?;
1885 let val = value_from_any_value(any_val)?;
1886 let mut ctx = dsq_shared::ops::SimpleContext { value: val.clone() };
1887 let mut transformed = val;
1888 for op in &self.arg_ops[0] {
1889 transformed =
1890 op.apply_with_context(&transformed, &mut Some(&mut ctx))?;
1891 }
1892 if !matches!(transformed, Value::Null) {
1893 results.push(transformed);
1894 }
1895 }
1896 Ok(Value::Array(results))
1897 }
1898 _ => {
1899 let mut transformed = value.clone();
1900 for op in &self.arg_ops[0] {
1901 transformed = op.apply_with_context(&transformed, context)?;
1902 }
1903 Ok(transformed)
1904 }
1905 }
1906 }
1907 "filter" => {
1908 if self.arg_ops.len() != 1 {
1909 return Err(dsq_shared::error::operation_error(
1910 "filter() expects 1 argument",
1911 ));
1912 }
1913
1914 match value {
1915 Value::Array(arr) => {
1916 let mut result = Vec::new();
1917 for item in arr {
1918 let mut predicate_result = item.clone();
1919 for op in &self.arg_ops[0] {
1920 predicate_result =
1921 op.apply_with_context(&predicate_result, context)?;
1922 }
1923 if dsq_shared::value::is_truthy(&predicate_result) {
1924 result.push(item.clone());
1925 }
1926 }
1927 Ok(Value::Array(result))
1928 }
1929 Value::DataFrame(df) => {
1930 let mut mask_values = Vec::new();
1932 for i in 0..df.height() {
1933 let mut row_obj = HashMap::new();
1935 for col_name in df.get_column_names() {
1936 let series = df.column(col_name).map_err(|e| {
1937 dsq_shared::error::operation_error(format!(
1938 "Failed to get column: {e}"
1939 ))
1940 })?;
1941 let any_val = series.get(i).map_err(|e| {
1942 dsq_shared::error::operation_error(format!(
1943 "Failed to get value: {e}"
1944 ))
1945 })?;
1946 let val = value_from_any_value(any_val).unwrap_or(Value::Null);
1947 row_obj.insert(col_name.to_string(), val);
1948 }
1949 let row_value = Value::Object(row_obj);
1950
1951 let mut predicate_result = row_value;
1952 for op in &self.arg_ops[0] {
1953 predicate_result =
1954 op.apply_with_context(&predicate_result, context)?;
1955 }
1956 mask_values.push(dsq_shared::value::is_truthy(&predicate_result));
1957 }
1958
1959 let mask_series = Series::new("mask".into(), mask_values);
1960 let boolean_chunked = mask_series.bool().map_err(|e| {
1961 dsq_shared::error::operation_error(format!(
1962 "Failed to create boolean mask: {}",
1963 e
1964 ))
1965 })?;
1966 let filtered_df = df.filter(boolean_chunked).map_err(|e| {
1967 dsq_shared::error::operation_error(format!(
1968 "Failed to filter DataFrame: {}",
1969 e
1970 ))
1971 })?;
1972 Ok(Value::DataFrame(filtered_df))
1973 }
1974 _ => Err(dsq_shared::error::operation_error(
1975 "filter() requires an array or DataFrame",
1976 )),
1977 }
1978 }
1979 "transform_values" => {
1980 if self.arg_ops.len() != 2 {
1981 return Err(dsq_shared::error::operation_error(
1982 "transform_values() expects 2 arguments",
1983 ));
1984 }
1985
1986 let mut collection_val = value.clone();
1988 for op in &self.arg_ops[0] {
1989 collection_val = op.apply_with_context(&collection_val, context)?;
1990 }
1991
1992 match collection_val {
1993 Value::Array(arr) => {
1994 let mut result = Vec::new();
1995 for item in arr {
1996 let mut transformed = item.clone();
1997 for op in &self.arg_ops[1] {
1998 transformed = op.apply_with_context(&transformed, context)?;
1999 }
2000 result.push(transformed);
2001 }
2002 Ok(Value::Array(result))
2003 }
2004 Value::DataFrame(_) => Err(dsq_shared::error::operation_error(
2005 "transform_values() on DataFrame not implemented",
2006 )),
2007 _ => Err(dsq_shared::error::operation_error(
2008 "transform_values() requires an array",
2009 )),
2010 }
2011 }
2012 "map_values" => {
2013 if self.arg_ops.len() != 1 {
2014 return Err(dsq_shared::error::operation_error(
2015 "map_values() expects 1 argument",
2016 ));
2017 }
2018
2019 match value {
2020 Value::Object(obj) => {
2021 let mut result = HashMap::new();
2022 for (key, val) in obj {
2023 let mut transformed = val.clone();
2024 for op in &self.arg_ops[0] {
2025 transformed = op.apply_with_context(&transformed, context)?;
2026 }
2027 result.insert(key.clone(), transformed);
2028 }
2029 Ok(Value::Object(result))
2030 }
2031 Value::Array(arr) => {
2032 let mut result = Vec::new();
2033 for val in arr {
2034 let mut transformed = val.clone();
2035 for op in &self.arg_ops[0] {
2036 transformed = op.apply_with_context(&transformed, context)?;
2037 }
2038 result.push(transformed);
2039 }
2040 Ok(Value::Array(result))
2041 }
2042 Value::DataFrame(df) => {
2043 let mut new_series = Vec::new();
2045 for col_name in df.get_column_names() {
2046 if let Ok(series) = df.column(col_name) {
2047 let mut values = Vec::new();
2049 for i in 0..series.len() {
2050 if let Ok(val) = series.get(i) {
2051 let value =
2052 value_from_any_value(val).unwrap_or(Value::Null);
2053 let mut transformed = value;
2054 for op in &self.arg_ops[0] {
2055 transformed =
2056 op.apply_with_context(&transformed, context)?;
2057 }
2058 values.push(transformed);
2059 }
2060 }
2061 let new_series_data = values_to_series(col_name, &values)?;
2063 new_series.push(new_series_data);
2064 }
2065 }
2066 let columns: Vec<_> = new_series.into_iter().map(|s| s.into()).collect();
2067 match DataFrame::new(columns) {
2068 Ok(new_df) => Ok(Value::DataFrame(new_df)),
2069 Err(e) => Err(dsq_shared::error::operation_error(format!(
2070 "map_values() failed on DataFrame: {}",
2071 e
2072 ))),
2073 }
2074 }
2075 Value::Series(series) => {
2076 let mut values = Vec::new();
2077 for i in 0..series.len() {
2078 if let Ok(val) = series.get(i) {
2079 let value = value_from_any_value(val).unwrap_or(Value::Null);
2080 let mut transformed = value;
2081 for op in &self.arg_ops[0] {
2082 transformed = op.apply_with_context(&transformed, context)?;
2083 }
2084 values.push(transformed);
2085 }
2086 }
2087 let new_series = values_to_series("transformed", &values)?;
2088 Ok(Value::Series(new_series))
2089 }
2090 _ => Err(dsq_shared::error::operation_error(
2091 "map_values() requires an object, array, DataFrame, or Series",
2092 )),
2093 }
2094 }
2095 "select" => {
2096 if self.arg_ops.len() != 1 {
2097 return Err(dsq_shared::error::operation_error(
2098 "select() expects 1 argument",
2099 ));
2100 }
2101 match &value {
2102 Value::Array(arr) => {
2103 let mut filtered = Vec::new();
2104 for item in arr {
2105 let mut condition_val = item.clone();
2106 for op in &self.arg_ops[0] {
2107 condition_val = op.apply_with_context(&condition_val, context)?;
2108 }
2109 if is_truthy(&condition_val) {
2110 filtered.push(item.clone());
2111 }
2112 }
2113 Ok(Value::Array(filtered))
2114 }
2115 Value::DataFrame(df) => {
2116 let mut mask_values = Vec::new();
2118 for i in 0..df.height() {
2119 let mut row_obj = HashMap::new();
2121 for col_name in df.get_column_names() {
2122 let series = df.column(col_name).map_err(|e| {
2123 dsq_shared::error::operation_error(format!(
2124 "Failed to get column: {}",
2125 e
2126 ))
2127 })?;
2128 let any_val = series.get(i).map_err(|e| {
2129 dsq_shared::error::operation_error(format!(
2130 "Failed to get value: {}",
2131 e
2132 ))
2133 })?;
2134 let val = value_from_any_value(any_val).unwrap_or(Value::Null);
2135 row_obj.insert(col_name.to_string(), val);
2136 }
2137 let row_value = Value::Object(row_obj);
2138
2139 let mut condition_val = row_value;
2140 for op in &self.arg_ops[0] {
2141 condition_val = op.apply_with_context(&condition_val, context)?;
2142 }
2143 mask_values.push(is_truthy(&condition_val));
2144 }
2145
2146 let mask_series = Series::new("mask".into(), mask_values);
2147 let boolean_chunked = mask_series.bool().map_err(|e| {
2148 dsq_shared::error::operation_error(format!(
2149 "Failed to create boolean mask: {}",
2150 e
2151 ))
2152 })?;
2153 let filtered_df = df.filter(boolean_chunked).map_err(|e| {
2154 dsq_shared::error::operation_error(format!(
2155 "Failed to filter DataFrame: {}",
2156 e
2157 ))
2158 })?;
2159 Ok(Value::DataFrame(filtered_df))
2160 }
2161 _ => {
2162 let mut condition_val = value.clone();
2163 for op in &self.arg_ops[0] {
2164 condition_val = op.apply_with_context(&condition_val, context)?;
2165 }
2166 if is_truthy(&condition_val) {
2167 Ok(value.clone())
2168 } else {
2169 Ok(Value::Null)
2170 }
2171 }
2172 }
2173 }
2174 "ceil" => {
2175 if self.arg_ops.is_empty() {
2176 self.builtins
2177 .call_function("ceil", std::slice::from_ref(value))
2178 } else if self.arg_ops.len() == 1 {
2179 let mut arg_val = value.clone();
2180 for op in &self.arg_ops[0] {
2181 arg_val = op.apply_with_context(&arg_val, context)?;
2182 }
2183 self.builtins.call_function("ceil", &[arg_val])
2184 } else {
2185 Err(dsq_shared::error::operation_error(
2186 "ceil() expects 0 or 1 argument",
2187 ))
2188 }
2189 }
2190
2191 "today" | "now" => {
2192 if !self.arg_ops.is_empty() {
2194 return Err(dsq_shared::error::operation_error(format!(
2195 "{}() expects no arguments",
2196 self.name
2197 )));
2198 }
2199 self.builtins.call_function(&self.name, &[])
2200 }
2201 _ => {
2202 if self.name == "iferror" {
2204 if self.arg_ops.len() != 2 {
2205 return Err(dsq_shared::error::operation_error(
2206 "iferror() expects 2 arguments",
2207 ));
2208 }
2209
2210 let first_result: Result<Value> = (|| {
2212 let mut arg_val = value.clone();
2213 for op in &self.arg_ops[0] {
2214 arg_val = op.apply(&arg_val)?;
2215 }
2216 Ok(arg_val)
2217 })();
2218
2219 match first_result {
2220 Ok(val) => Ok(val),
2221 Err(_) => {
2222 let mut arg_val = value.clone();
2224 for op in &self.arg_ops[1] {
2225 arg_val = op.apply(&arg_val)?;
2226 }
2227 Ok(arg_val)
2228 }
2229 }
2230 } else if self.name == "filter" {
2231 if self.arg_ops.len() != 1 {
2233 return Err(dsq_shared::error::operation_error(
2234 "filter() expects 1 argument",
2235 ));
2236 }
2237 let arg_ops = &self.arg_ops[0];
2238 match value {
2239 Value::Array(arr) => {
2240 let mut result = Vec::new();
2241 for item in arr {
2242 let mut condition = item.clone();
2243 for op in arg_ops {
2244 condition = op.apply(&condition)?;
2245 }
2246 if is_truthy(&condition) {
2247 result.push(item.clone());
2248 }
2249 }
2250 Ok(Value::Array(result))
2251 }
2252 Value::DataFrame(df) => {
2253 let mut mask = Vec::new();
2255 for i in 0..df.height() {
2256 let mut row_obj = std::collections::HashMap::new();
2258 for col_name in df.get_column_names() {
2259 if let Ok(series) = df.column(col_name) {
2260 if let Ok(val) = series.get(i) {
2261 let value =
2262 value_from_any_value(val).unwrap_or(Value::Null);
2263 row_obj.insert(col_name.to_string(), value);
2264 }
2265 }
2266 }
2267 let row_value = Value::Object(row_obj);
2268 let mut condition = row_value;
2269 for op in arg_ops {
2270 condition = op.apply(&condition)?;
2271 }
2272 mask.push(is_truthy(&condition));
2273 }
2274 let mask_chunked =
2275 polars::prelude::BooleanChunked::from_slice("".into(), &mask);
2276 match df.filter(&mask_chunked) {
2277 Ok(filtered_df) => Ok(Value::DataFrame(filtered_df)),
2278 Err(e) => Err(dsq_shared::error::operation_error(format!(
2279 "filter() failed to filter DataFrame: {}",
2280 e
2281 ))),
2282 }
2283 }
2284 _ => {
2285 let mut condition = value.clone();
2287 for op in arg_ops {
2288 condition = op.apply(&condition)?;
2289 }
2290 if is_truthy(&condition) {
2291 Ok(value.clone())
2292 } else {
2293 Ok(Value::Null)
2294 }
2295 }
2296 }
2297 } else if self.builtins.has_function(&self.name) {
2298 let mut arg_values = Vec::new();
2300 if self.arg_ops.is_empty() {
2301 arg_values.push(value.clone());
2303 } else {
2304 for arg_ops in &self.arg_ops {
2306 let mut arg_val = value.clone();
2307 for op in arg_ops {
2308 arg_val = op.apply(&arg_val)?;
2309 }
2310 arg_values.push(arg_val);
2311 }
2312 }
2313
2314 self.builtins.call_function(&self.name, &arg_values)
2316 } else {
2317 Err(dsq_shared::error::operation_error(format!(
2318 "Unknown function: {}",
2319 self.name
2320 )))
2321 }
2322 }
2323 }
2324 }
2325
2326 fn description(&self) -> String {
2327 format!("function call: {}", self.name)
2328 }
2329
2330 fn as_any(&self) -> &dyn std::any::Any {
2331 self
2332 }
2333}
2334
2335pub struct VariableOperation {
2336 pub name: String,
2337}
2338
2339impl VariableOperation {
2340 pub fn new(name: String) -> Self {
2341 Self { name }
2342 }
2343}
2344
2345impl Operation for VariableOperation {
2346 fn apply(&self, _value: &Value) -> Result<Value> {
2347 Err(dsq_shared::error::operation_error(format!(
2349 "Variable '{}' requires context",
2350 self.name
2351 )))
2352 }
2353
2354 fn apply_with_context(
2355 &self,
2356 value: &Value,
2357 context: &mut Option<&mut dyn dsq_shared::ops::Context>,
2358 ) -> Result<Value> {
2359 if let Some(ctx) = context {
2360 if let Some(var_value) = ctx.get_variable(&self.name) {
2361 Ok(var_value.clone())
2362 } else {
2363 if let Some(filter_ctx) = ctx.as_any_mut().downcast_mut::<FilterContext>() {
2365 if filter_ctx.has_function(&self.name) {
2366 filter_ctx.call_function(&self.name, std::slice::from_ref(value))
2368 } else {
2369 Err(dsq_shared::error::operation_error(format!(
2370 "Variable '{}' not found",
2371 self.name
2372 )))
2373 }
2374 } else {
2375 Err(dsq_shared::error::operation_error(format!(
2376 "Variable '{}' not found",
2377 self.name
2378 )))
2379 }
2380 }
2381 } else {
2382 Err(dsq_shared::error::operation_error(format!(
2383 "Variable '{}' requires context",
2384 self.name
2385 )))
2386 }
2387 }
2388
2389 fn description(&self) -> String {
2390 format!("variable: {}", self.name)
2391 }
2392
2393 fn as_any(&self) -> &dyn std::any::Any {
2394 self
2395 }
2396}
2397
2398pub struct AssignAddOperation {
2399 pub target_ops: Vec<Box<dyn Operation + Send + Sync>>,
2400 pub value_ops: Vec<Box<dyn Operation + Send + Sync>>,
2401}
2402
2403impl AssignAddOperation {
2404 pub fn new(
2405 target_ops: Vec<Box<dyn Operation + Send + Sync>>,
2406 value_ops: Vec<Box<dyn Operation + Send + Sync>>,
2407 ) -> Self {
2408 Self {
2409 target_ops,
2410 value_ops,
2411 }
2412 }
2413}
2414
2415impl Operation for AssignAddOperation {
2416 fn apply(&self, value: &Value) -> Result<Value> {
2417 let mut target_val = value.clone();
2420 for op in &self.target_ops {
2421 target_val = op.apply(&target_val)?;
2422 }
2423
2424 let mut add_val = value.clone();
2425 for op in &self.value_ops {
2426 add_val = op.apply(&add_val)?;
2427 }
2428
2429 add_values(&target_val, &add_val)
2430 }
2431
2432 fn description(&self) -> String {
2433 "assign add".to_string()
2434 }
2435
2436 fn as_any(&self) -> &dyn std::any::Any {
2437 self
2438 }
2439}
2440
2441pub struct AssignUpdateOperation {
2442 pub target_ops: Vec<Box<dyn Operation + Send + Sync>>,
2443 pub value_ops: Vec<Box<dyn Operation + Send + Sync>>,
2444}
2445
2446impl AssignUpdateOperation {
2447 pub fn new(
2448 target_ops: Vec<Box<dyn Operation + Send + Sync>>,
2449 value_ops: Vec<Box<dyn Operation + Send + Sync>>,
2450 ) -> Self {
2451 Self {
2452 target_ops,
2453 value_ops,
2454 }
2455 }
2456}
2457
2458impl Operation for AssignUpdateOperation {
2459 fn apply(&self, value: &Value) -> Result<Value> {
2460 if let Value::Object(ref obj) = value {
2462 if self.target_ops.len() == 1 {
2464 if let Some(field_op) = self.target_ops.first() {
2465 if let Some(field_access) =
2467 field_op.as_any().downcast_ref::<FieldAccessOperation>()
2468 {
2469 let field_name = &field_access.fields[0];
2470
2471 let mut value_val = value.clone();
2473 for op in &self.value_ops {
2474 value_val = op.apply(&value_val)?;
2475 }
2476
2477 let mut new_obj = obj.clone();
2479 new_obj.insert(field_name.clone(), value_val);
2480 return Ok(Value::Object(new_obj));
2481 }
2482 }
2483 }
2484 }
2485
2486 let mut value_val = value.clone();
2488 for op in &self.value_ops {
2489 value_val = op.apply(&value_val)?;
2490 }
2491
2492 Ok(value_val)
2493 }
2494
2495 fn description(&self) -> String {
2496 "assign update".to_string()
2497 }
2498
2499 fn as_any(&self) -> &dyn std::any::Any {
2500 self
2501 }
2502}
2503
2504pub struct SliceOperation {
2505 pub start_ops: Option<Vec<Box<dyn Operation + Send + Sync>>>,
2506 pub end_ops: Option<Vec<Box<dyn Operation + Send + Sync>>>,
2507}
2508
2509impl SliceOperation {
2510 pub fn new(
2511 start_ops: Option<Vec<Box<dyn Operation + Send + Sync>>>,
2512 end_ops: Option<Vec<Box<dyn Operation + Send + Sync>>>,
2513 ) -> Self {
2514 Self { start_ops, end_ops }
2515 }
2516}
2517
2518impl Operation for SliceOperation {
2519 fn apply(&self, value: &Value) -> Result<Value> {
2520 let start = if let Some(ref ops) = self.start_ops {
2521 let mut start_val = value.clone();
2522 for op in ops {
2523 start_val = op.apply(&start_val)?;
2524 }
2525 match start_val {
2526 Value::Int(i) => Some(i as usize),
2527 _ => {
2528 return Err(dsq_shared::error::operation_error(
2529 "Slice start must be an integer",
2530 ));
2531 }
2532 }
2533 } else {
2534 None
2535 };
2536
2537 let end = if let Some(ref ops) = self.end_ops {
2538 let mut end_val = value.clone();
2539 for op in ops {
2540 end_val = op.apply(&end_val)?;
2541 }
2542 match end_val {
2543 Value::Int(i) => Some(i as usize),
2544 _ => {
2545 return Err(dsq_shared::error::operation_error(
2546 "Slice end must be an integer",
2547 ));
2548 }
2549 }
2550 } else {
2551 None
2552 };
2553
2554 match value {
2555 Value::Array(arr) => {
2556 let start_idx = start.unwrap_or(0);
2557 let end_idx = end.unwrap_or(arr.len());
2558 Ok(Value::Array(arr[start_idx..end_idx].to_vec()))
2559 }
2560 Value::DataFrame(df) => {
2561 let start_idx = start.unwrap_or(0) as i64;
2562 let end_idx = end.unwrap_or(df.height());
2563 let length = (end_idx as i64 - start_idx) as usize;
2564 Ok(Value::DataFrame(df.slice(start_idx, length)))
2565 }
2566 _ => Err(dsq_shared::error::operation_error(
2567 "Slice operation requires array or DataFrame",
2568 )),
2569 }
2570 }
2571
2572 fn description(&self) -> String {
2573 "array slice".to_string()
2574 }
2575
2576 fn as_any(&self) -> &dyn std::any::Any {
2577 self
2578 }
2579}
2580
2581pub struct AssignFieldAddOperation {
2582 pub field: String,
2583 pub value_ops: Vec<Box<dyn Operation + Send + Sync>>,
2584}
2585
2586impl AssignFieldAddOperation {
2587 pub fn new(field: String, value_ops: Vec<Box<dyn Operation + Send + Sync>>) -> Self {
2588 Self { field, value_ops }
2589 }
2590}
2591
2592impl Operation for AssignFieldAddOperation {
2593 fn apply(&self, value: &Value) -> Result<Value> {
2594 let mut add_val = value.clone();
2595 for op in &self.value_ops {
2596 add_val = op.apply(&add_val)?;
2597 }
2598
2599 match value {
2600 Value::Object(obj) => {
2601 let mut new_obj = obj.clone();
2602 let current_val = obj.get(&self.field).cloned().unwrap_or(Value::Null);
2603 let new_val = add_values(¤t_val, &add_val)?;
2604 new_obj.insert(self.field.clone(), new_val);
2605 Ok(Value::Object(new_obj))
2606 }
2607 _ => Err(dsq_shared::error::operation_error(
2608 "Assignment requires an object",
2609 )),
2610 }
2611 }
2612
2613 fn description(&self) -> String {
2614 format!("assign add to field {}", self.field)
2615 }
2616
2617 fn as_any(&self) -> &dyn std::any::Any {
2618 self
2619 }
2620}
2621
2622fn add_values(a: &Value, b: &Value) -> Result<Value> {
2624 match (a, b) {
2625 (Value::Int(a), Value::Int(b)) => Ok(Value::Int(a + b)),
2626 (Value::Float(a), Value::Float(b)) => Ok(Value::Float(a + b)),
2627 (Value::Int(a), Value::Float(b)) => Ok(Value::Float(*a as f64 + b)),
2628 (Value::Float(a), Value::Int(b)) => Ok(Value::Float(a + *b as f64)),
2629 (Value::String(a), Value::String(b)) => Ok(Value::String(format!("{}{}", a, b))),
2630 _ => Err(dsq_shared::error::operation_error("Cannot add these types")),
2631 }
2632}
2633
#[cfg(test)]
mod tests {
    use super::*;
    use dsq_shared::value::Value;
    use std::collections::HashMap;

    #[test]
    fn test_filter_context_new() {
        let ctx = FilterContext::new();

        assert!(ctx.variables.is_empty());
        assert!(ctx.functions.is_empty());
        assert!(ctx.call_stack.is_empty());
        assert_eq!(ctx.max_recursion_depth, 1000);
        assert!(!ctx.debug_mode);
        assert!(ctx.current_input.is_none());
        assert_eq!(ctx.error_mode, ErrorMode::Strict);
    }

    #[test]
    fn test_filter_context_variable_operations() {
        let mut ctx = FilterContext::new();

        // A variable that has been set is visible.
        let value = Value::Int(42);
        ctx.set_variable("test_var", value.clone());
        assert!(ctx.has_variable("test_var"));
        assert_eq!(ctx.get_variable("test_var"), Some(&value));

        // Unknown names are absent.
        assert!(!ctx.has_variable("nonexistent"));
        assert_eq!(ctx.get_variable("nonexistent"), None);

        // Setting the same name again overwrites the previous value.
        let new_value = Value::String("hello".to_string());
        ctx.set_variable("test_var", new_value.clone());
        assert_eq!(ctx.get_variable("test_var"), Some(&new_value));
    }

    #[test]
    fn test_filter_context_function_operations() {
        let mut ctx = FilterContext::new();

        // Builtins are available without registering anything.
        assert!(ctx.has_function("length"));
        assert!(ctx.has_function("add"));
        assert!(!ctx.has_function("nonexistent_function"));

        let func_def = FunctionDef {
            name: "test_func".to_string(),
            parameters: vec!["x".to_string()],
            body: FunctionBody::Ast("x * 2".to_string()),
            is_recursive: false,
        };
        ctx.set_functions(HashMap::from([("test_func".to_string(), func_def)]));
        assert!(ctx.has_function("test_func"));
    }

    #[test]
    fn test_filter_context_call_function_builtin() {
        let mut ctx = FilterContext::new();

        // `length` of a two-element array is 2.
        let result = ctx.call_function(
            "length",
            &[Value::Array(vec![Value::Int(1), Value::Int(2)])],
        );
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Value::Int(2));

        // `length` rejects being called with two arguments.
        let result = ctx.call_function("length", &[Value::Int(1), Value::Int(2)]);
        assert!(result.is_err());
    }

    #[test]
    fn test_filter_context_call_user_function() {
        let mut ctx = FilterContext::new();

        let func_def = FunctionDef {
            name: "identity".to_string(),
            parameters: vec!["x".to_string()],
            body: FunctionBody::Ast(".".to_string()),
            is_recursive: false,
        };

        ctx.set_functions(HashMap::from([("identity".to_string(), func_def)]));

        // AST-bodied user functions are not executed yet, and the error says so.
        let result = ctx.call_function("identity", &[Value::String("test".to_string())]);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("AST execution not yet implemented"));
    }

    #[test]
    fn test_filter_context_call_function_wrong_args() {
        let mut ctx = FilterContext::new();

        let func_def = FunctionDef {
            name: "test_func".to_string(),
            parameters: vec!["a".to_string(), "b".to_string()],
            body: FunctionBody::Ast("a + b".to_string()),
            is_recursive: false,
        };

        ctx.set_functions(HashMap::from([("test_func".to_string(), func_def)]));

        // Calling a two-parameter function with one argument is an arity error.
        let result = ctx.call_function("test_func", &[Value::Int(1)]);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Expected 2 arguments"));
    }

    #[test]
    fn test_filter_context_input_operations() {
        let mut ctx = FilterContext::new();

        assert_eq!(ctx.get_input(), None);

        let value = Value::Object(HashMap::from([("key".to_string(), Value::Int(123))]));
        ctx.set_input(value.clone());
        assert_eq!(ctx.get_input(), Some(&value));
    }

    #[test]
    fn test_filter_context_recursion_depth() {
        let mut ctx = FilterContext::new();

        assert_eq!(ctx.recursion_depth(), 0);

        // Recursion depth tracks the number of frames on the call stack.
        ctx.call_stack.push(StackFrame {
            name: "test1".to_string(),
            input: Value::Null,
            location: None,
        });
        assert_eq!(ctx.recursion_depth(), 1);

        ctx.call_stack.push(StackFrame {
            name: "test2".to_string(),
            input: Value::Null,
            location: None,
        });
        assert_eq!(ctx.recursion_depth(), 2);

        ctx.call_stack.pop();
        assert_eq!(ctx.recursion_depth(), 1);
    }

    #[test]
    fn test_filter_context_debug_mode() {
        let mut ctx = FilterContext::new();

        assert!(!ctx.is_debug_mode());

        ctx.set_debug_mode(true);
        assert!(ctx.is_debug_mode());

        ctx.set_debug_mode(false);
        assert!(!ctx.is_debug_mode());
    }

    #[test]
    fn test_filter_context_error_mode() {
        let mut ctx = FilterContext::new();

        assert_eq!(ctx.error_mode(), ErrorMode::Strict);

        ctx.set_error_mode(ErrorMode::Collect);
        assert_eq!(ctx.error_mode(), ErrorMode::Collect);

        ctx.set_error_mode(ErrorMode::Ignore);
        assert_eq!(ctx.error_mode(), ErrorMode::Ignore);
    }

    #[test]
    fn test_filter_context_as_context_trait() {
        let mut ctx = FilterContext::new();

        let value = Value::Float(std::f64::consts::PI);
        ctx.set_variable("pi", value.clone());

        assert_eq!(ctx.get_variable("pi"), Some(&value));
        assert_eq!(ctx.get_variable("nonexistent"), None);

        let new_value = Value::String("hello".to_string());
        ctx.set_variable("greeting", new_value.clone());
        assert_eq!(ctx.get_variable("greeting"), Some(&new_value));
    }

    #[test]
    fn test_filter_context_max_recursion_prevention() {
        let mut ctx = FilterContext::new();
        ctx.max_recursion_depth = 2;

        let func_def = FunctionDef {
            name: "recursive_func".to_string(),
            parameters: vec![],
            body: FunctionBody::Ast("recursive_func".to_string()),
            is_recursive: true,
        };

        ctx.set_functions(HashMap::from([("recursive_func".to_string(), func_def)]));

        // Even with a tiny recursion limit, the call currently fails because AST bodies
        // are not executed yet.
        let result = ctx.call_function("recursive_func", &[]);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("AST execution not yet implemented"));
    }

    #[test]
    fn test_filter_context_builtin_function_call() {
        let mut ctx = FilterContext::new();

        // Whether `add` accepts three scalar arguments depends on the builtin's arity
        // handling, so this only checks that the call returns instead of panicking.
        // (The previous `is_ok() || is_err()` assertion was always true.)
        let _ = ctx.call_function("add", &[Value::Int(1), Value::Int(2), Value::Int(3)]);
    }

    #[test]
    fn test_filter_context_unknown_function() {
        let mut ctx = FilterContext::new();

        let result = ctx.call_function("unknown_function", &[]);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("function 'unknown_function'"));
    }

    #[test]
    fn test_compilation_context_new() {
        let ctx = CompilationContext::new();

        assert_eq!(ctx.depth, 0);
        assert_eq!(ctx.max_depth, 1000);
        assert!(ctx.variables.is_empty());
        assert!(ctx.functions.is_empty());
    }

    #[test]
    fn test_compilation_context_with_max_depth() {
        let ctx = CompilationContext::with_max_depth(500);

        assert_eq!(ctx.depth, 0);
        assert_eq!(ctx.max_depth, 500);
        assert!(ctx.variables.is_empty());
        assert!(ctx.functions.is_empty());
    }

    #[test]
    fn test_error_mode_variants() {
        // Pins the discriminant values of each variant.
        assert_eq!(ErrorMode::Strict as u8, 0);
        assert_eq!(ErrorMode::Collect as u8, 1);
        assert_eq!(ErrorMode::Ignore as u8, 2);
    }

    #[test]
    fn test_function_def_creation() {
        let func_def = FunctionDef {
            name: "test".to_string(),
            parameters: vec!["a".to_string(), "b".to_string()],
            body: FunctionBody::Ast("a + b".to_string()),
            is_recursive: false,
        };

        assert_eq!(func_def.name, "test");
        assert_eq!(func_def.parameters.len(), 2);
        assert!(!func_def.is_recursive);
    }

    #[test]
    fn test_function_body_clone() {
        let body = FunctionBody::Ast("test".to_string());
        let cloned = body.clone();

        match cloned {
            FunctionBody::Ast(s) => assert_eq!(s, "test"),
            _ => panic!("Expected Ast variant"),
        }
    }

    #[test]
    fn test_stack_frame_creation() {
        let frame = StackFrame {
            name: "test_func".to_string(),
            input: Value::Int(42),
            location: Some(Location {
                line: 10,
                column: 5,
                source: Some("test.rs".to_string()),
            }),
        };

        assert_eq!(frame.name, "test_func");
        assert_eq!(frame.input, Value::Int(42));
        assert!(frame.location.is_some());
    }

    #[test]
    fn test_location_creation() {
        let loc = Location {
            line: 1,
            column: 1,
            source: None,
        };

        assert_eq!(loc.line, 1);
        assert_eq!(loc.column, 1);
        assert!(loc.source.is_none());
    }
}