Skip to main content

sql_cli/data/
arithmetic_evaluator.rs

1use crate::config::global::get_date_notation;
2use crate::data::data_view::DataView;
3use crate::data::datatable::{DataTable, DataValue};
4use crate::data::value_comparisons::compare_with_op;
5use crate::sql::aggregate_functions::AggregateFunctionRegistry; // New registry
6use crate::sql::aggregates::AggregateRegistry; // Old registry (for migration)
7use crate::sql::functions::FunctionRegistry;
8use crate::sql::parser::ast::{ColumnRef, WindowSpec};
9use crate::sql::recursive_parser::SqlExpression;
10use crate::sql::window_context::WindowContext;
11use crate::sql::window_functions::{ExpressionEvaluator, WindowFunctionRegistry};
12use anyhow::{anyhow, Result};
13use std::collections::{HashMap, HashSet};
14use std::sync::Arc;
15use std::time::Instant;
16use tracing::{debug, info};
17
18/// Evaluates SQL expressions to compute `DataValues` (for SELECT clauses)
19/// This is different from `RecursiveWhereEvaluator` which returns boolean
20pub struct ArithmeticEvaluator<'a> {
21    table: &'a DataTable,
22    _date_notation: String,
23    function_registry: Arc<FunctionRegistry>,
24    aggregate_registry: Arc<AggregateRegistry>, // Old registry (being phased out)
25    new_aggregate_registry: Arc<AggregateFunctionRegistry>, // New registry
26    window_function_registry: Arc<WindowFunctionRegistry>,
27    visible_rows: Option<Vec<usize>>, // For aggregate functions on filtered views
28    window_contexts: HashMap<u64, Arc<WindowContext>>, // Cache window contexts by hash
29    table_aliases: HashMap<String, String>, // Map alias -> table name for qualified columns
30}
31
32impl<'a> ArithmeticEvaluator<'a> {
33    #[must_use]
34    pub fn new(table: &'a DataTable) -> Self {
35        Self {
36            table,
37            _date_notation: get_date_notation(),
38            function_registry: Arc::new(FunctionRegistry::new()),
39            aggregate_registry: Arc::new(AggregateRegistry::new()),
40            new_aggregate_registry: Arc::new(AggregateFunctionRegistry::new()),
41            window_function_registry: Arc::new(WindowFunctionRegistry::new()),
42            visible_rows: None,
43            window_contexts: HashMap::new(),
44            table_aliases: HashMap::new(),
45        }
46    }
47
48    #[must_use]
49    pub fn with_date_notation(table: &'a DataTable, date_notation: String) -> Self {
50        Self {
51            table,
52            _date_notation: date_notation,
53            function_registry: Arc::new(FunctionRegistry::new()),
54            aggregate_registry: Arc::new(AggregateRegistry::new()),
55            new_aggregate_registry: Arc::new(AggregateFunctionRegistry::new()),
56            window_function_registry: Arc::new(WindowFunctionRegistry::new()),
57            visible_rows: None,
58            window_contexts: HashMap::new(),
59            table_aliases: HashMap::new(),
60        }
61    }
62
63    /// Set visible rows for aggregate functions (for filtered views)
64    #[must_use]
65    pub fn with_visible_rows(mut self, rows: Vec<usize>) -> Self {
66        self.visible_rows = Some(rows);
67        self
68    }
69
70    /// Set table aliases for qualified column resolution
71    #[must_use]
72    pub fn with_table_aliases(mut self, aliases: HashMap<String, String>) -> Self {
73        self.table_aliases = aliases;
74        self
75    }
76
77    #[must_use]
78    pub fn with_date_notation_and_registry(
79        table: &'a DataTable,
80        date_notation: String,
81        function_registry: Arc<FunctionRegistry>,
82    ) -> Self {
83        Self {
84            table,
85            _date_notation: date_notation,
86            function_registry,
87            aggregate_registry: Arc::new(AggregateRegistry::new()),
88            new_aggregate_registry: Arc::new(AggregateFunctionRegistry::new()),
89            window_function_registry: Arc::new(WindowFunctionRegistry::new()),
90            visible_rows: None,
91            window_contexts: HashMap::new(),
92            table_aliases: HashMap::new(),
93        }
94    }
95
96    /// Find a column name similar to the given name using edit distance
97    fn find_similar_column(&self, name: &str) -> Option<String> {
98        let columns = self.table.column_names();
99        let mut best_match: Option<(String, usize)> = None;
100
101        for col in columns {
102            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
103            // Only suggest if distance is small (likely a typo)
104            // Allow up to 3 edits for longer names
105            let max_distance = if name.len() > 10 { 3 } else { 2 };
106            if distance <= max_distance {
107                match &best_match {
108                    None => best_match = Some((col, distance)),
109                    Some((_, best_dist)) if distance < *best_dist => {
110                        best_match = Some((col, distance));
111                    }
112                    _ => {}
113                }
114            }
115        }
116
117        best_match.map(|(name, _)| name)
118    }
119
120    /// Calculate Levenshtein edit distance between two strings
121    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
122        // Use the shared implementation from string_methods
123        crate::sql::functions::string_methods::EditDistanceFunction::calculate_edit_distance(s1, s2)
124    }
125
126    /// Evaluate an SQL expression to produce a `DataValue`
127    pub fn evaluate(&mut self, expr: &SqlExpression, row_index: usize) -> Result<DataValue> {
128        debug!(
129            "ArithmeticEvaluator: evaluating {:?} for row {}",
130            expr, row_index
131        );
132
133        match expr {
134            SqlExpression::Column(column_ref) => self.evaluate_column_ref(column_ref, row_index),
135            SqlExpression::StringLiteral(s) => Ok(DataValue::String(s.clone())),
136            SqlExpression::BooleanLiteral(b) => Ok(DataValue::Boolean(*b)),
137            SqlExpression::NumberLiteral(n) => self.evaluate_number_literal(n),
138            SqlExpression::Null => Ok(DataValue::Null),
139            SqlExpression::BinaryOp { left, op, right } => {
140                self.evaluate_binary_op(left, op, right, row_index)
141            }
142            SqlExpression::FunctionCall {
143                name,
144                args,
145                distinct,
146            } => self.evaluate_function_with_distinct(name, args, *distinct, row_index),
147            SqlExpression::WindowFunction {
148                name,
149                args,
150                window_spec,
151            } => self.evaluate_window_function(name, args, window_spec, row_index),
152            SqlExpression::MethodCall {
153                object,
154                method,
155                args,
156            } => self.evaluate_method_call(object, method, args, row_index),
157            SqlExpression::ChainedMethodCall { base, method, args } => {
158                // Evaluate the base expression first, then apply the method
159                let base_value = self.evaluate(base, row_index)?;
160                self.evaluate_method_on_value(&base_value, method, args, row_index)
161            }
162            SqlExpression::Between { expr, lower, upper } => {
163                let val = self.evaluate(expr, row_index)?;
164                let lo = self.evaluate(lower, row_index)?;
165                let hi = self.evaluate(upper, row_index)?;
166                let ge = compare_with_op(&val, &lo, ">=", false);
167                let le = compare_with_op(&val, &hi, "<=", false);
168                Ok(DataValue::Boolean(ge && le))
169            }
170            // IN / NOT IN as a value-producing expression — needed when they
171            // appear inside CASE branches or other arithmetic contexts. The
172            // WHERE path has its own evaluator; this mirrors its equality
173            // semantics via compare_with_op. After subquery rewriting, an
174            // `x IN (SELECT ...)` arrives here as an InList of literals.
175            SqlExpression::InList { expr, values } => {
176                let val = self.evaluate(expr, row_index)?;
177                for v in values {
178                    let item = self.evaluate(v, row_index)?;
179                    if compare_with_op(&val, &item, "=", false) {
180                        return Ok(DataValue::Boolean(true));
181                    }
182                }
183                Ok(DataValue::Boolean(false))
184            }
185            SqlExpression::NotInList { expr, values } => {
186                let val = self.evaluate(expr, row_index)?;
187                for v in values {
188                    let item = self.evaluate(v, row_index)?;
189                    if compare_with_op(&val, &item, "=", false) {
190                        return Ok(DataValue::Boolean(false));
191                    }
192                }
193                Ok(DataValue::Boolean(true))
194            }
195            SqlExpression::CaseExpression {
196                when_branches,
197                else_branch,
198            } => self.evaluate_case_expression(when_branches, else_branch, row_index),
199            SqlExpression::SimpleCaseExpression {
200                expr,
201                when_branches,
202                else_branch,
203            } => self.evaluate_simple_case_expression(expr, when_branches, else_branch, row_index),
204            SqlExpression::DateTimeConstructor {
205                year,
206                month,
207                day,
208                hour,
209                minute,
210                second,
211            } => self.evaluate_datetime_constructor(*year, *month, *day, *hour, *minute, *second),
212            SqlExpression::DateTimeToday {
213                hour,
214                minute,
215                second,
216            } => self.evaluate_datetime_today(*hour, *minute, *second),
217            _ => Err(anyhow!(
218                "Unsupported expression type for arithmetic evaluation: {:?}",
219                expr
220            )),
221        }
222    }
223
224    /// Evaluate a column reference with proper table scoping
225    fn evaluate_column_ref(&self, column_ref: &ColumnRef, row_index: usize) -> Result<DataValue> {
226        if let Some(table_prefix) = &column_ref.table_prefix {
227            // Resolve alias if it exists in table_aliases map
228            let actual_table = self
229                .table_aliases
230                .get(table_prefix)
231                .map(|s| s.as_str())
232                .unwrap_or(table_prefix);
233
234            // Try qualified lookup with resolved table name
235            let qualified_name = format!("{}.{}", actual_table, column_ref.name);
236
237            if let Some(col_idx) = self.table.find_column_by_qualified_name(&qualified_name) {
238                debug!(
239                    "Resolved {}.{} -> '{}' at index {}",
240                    table_prefix, column_ref.name, qualified_name, col_idx
241                );
242                return self
243                    .table
244                    .get_value(row_index, col_idx)
245                    .ok_or_else(|| anyhow!("Row {} out of bounds", row_index))
246                    .map(|v| v.clone());
247            }
248
249            // Fallback: try unqualified lookup
250            if let Some(col_idx) = self.table.get_column_index(&column_ref.name) {
251                debug!(
252                    "Resolved {}.{} -> unqualified '{}' at index {}",
253                    table_prefix, column_ref.name, column_ref.name, col_idx
254                );
255                return self
256                    .table
257                    .get_value(row_index, col_idx)
258                    .ok_or_else(|| anyhow!("Row {} out of bounds", row_index))
259                    .map(|v| v.clone());
260            }
261
262            // If not found, return error
263            Err(anyhow!(
264                "Column '{}' not found. Table '{}' may not support qualified column names",
265                qualified_name,
266                actual_table
267            ))
268        } else {
269            // Simple column name lookup
270            self.evaluate_column(&column_ref.name, row_index)
271        }
272    }
273
274    /// Evaluate a column reference
275    fn evaluate_column(&self, column_name: &str, row_index: usize) -> Result<DataValue> {
276        // First try to resolve qualified column names (table.column or alias.column)
277        let resolved_column = if column_name.contains('.') {
278            // Split on last dot to handle cases like "schema.table.column"
279            if let Some(dot_pos) = column_name.rfind('.') {
280                let _table_or_alias = &column_name[..dot_pos];
281                let col_name = &column_name[dot_pos + 1..];
282
283                // For now, just use the column name part
284                // In the future, we could validate the table/alias part
285                debug!(
286                    "Resolving qualified column: {} -> {}",
287                    column_name, col_name
288                );
289                col_name.to_string()
290            } else {
291                column_name.to_string()
292            }
293        } else {
294            column_name.to_string()
295        };
296
297        let col_index = if let Some(idx) = self.table.get_column_index(&resolved_column) {
298            idx
299        } else if resolved_column != column_name {
300            // If not found, try the original name
301            if let Some(idx) = self.table.get_column_index(column_name) {
302                idx
303            } else {
304                let suggestion = self.find_similar_column(&resolved_column);
305                return Err(match suggestion {
306                    Some(similar) => anyhow!(
307                        "Column '{}' not found. Did you mean '{}'?",
308                        column_name,
309                        similar
310                    ),
311                    None => anyhow!("Column '{}' not found", column_name),
312                });
313            }
314        } else {
315            let suggestion = self.find_similar_column(&resolved_column);
316            return Err(match suggestion {
317                Some(similar) => anyhow!(
318                    "Column '{}' not found. Did you mean '{}'?",
319                    column_name,
320                    similar
321                ),
322                None => anyhow!("Column '{}' not found", column_name),
323            });
324        };
325
326        if row_index >= self.table.row_count() {
327            return Err(anyhow!("Row index {} out of bounds", row_index));
328        }
329
330        let row = self
331            .table
332            .get_row(row_index)
333            .ok_or_else(|| anyhow!("Row {} not found", row_index))?;
334
335        let value = row
336            .get(col_index)
337            .ok_or_else(|| anyhow!("Column index {} out of bounds for row", col_index))?;
338
339        Ok(value.clone())
340    }
341
342    /// Evaluate a number literal (handles both integers and floats)
343    fn evaluate_number_literal(&self, number_str: &str) -> Result<DataValue> {
344        // Try to parse as integer first
345        if let Ok(int_val) = number_str.parse::<i64>() {
346            return Ok(DataValue::Integer(int_val));
347        }
348
349        // If that fails, try as float
350        if let Ok(float_val) = number_str.parse::<f64>() {
351            return Ok(DataValue::Float(float_val));
352        }
353
354        Err(anyhow!("Invalid number literal: {}", number_str))
355    }
356
357    /// Evaluate a binary operation (arithmetic)
358    fn evaluate_binary_op(
359        &mut self,
360        left: &SqlExpression,
361        op: &str,
362        right: &SqlExpression,
363        row_index: usize,
364    ) -> Result<DataValue> {
365        let left_val = self.evaluate(left, row_index)?;
366        let right_val = self.evaluate(right, row_index)?;
367
368        debug!(
369            "ArithmeticEvaluator: {} {} {}",
370            self.format_value(&left_val),
371            op,
372            self.format_value(&right_val)
373        );
374
375        match op {
376            "+" => self.add_values(&left_val, &right_val),
377            "-" => self.subtract_values(&left_val, &right_val),
378            "*" => self.multiply_values(&left_val, &right_val),
379            "/" => self.divide_values(&left_val, &right_val),
380            "%" => {
381                // Modulo operator - call MOD function
382                let args = vec![left.clone(), right.clone()];
383                self.evaluate_function("MOD", &args, row_index)
384            }
385            // Comparison operators (return boolean results)
386            // Use centralized comparison logic for consistency
387            ">" | "<" | ">=" | "<=" | "=" | "!=" | "<>" => {
388                let result = compare_with_op(&left_val, &right_val, op, false);
389                Ok(DataValue::Boolean(result))
390            }
391            // IS NULL / IS NOT NULL operators
392            "IS NULL" => Ok(DataValue::Boolean(matches!(left_val, DataValue::Null))),
393            "IS NOT NULL" => Ok(DataValue::Boolean(!matches!(left_val, DataValue::Null))),
394            // Logical operators
395            "AND" => {
396                let left_bool = self.to_bool(&left_val)?;
397                let right_bool = self.to_bool(&right_val)?;
398                Ok(DataValue::Boolean(left_bool && right_bool))
399            }
400            "OR" => {
401                let left_bool = self.to_bool(&left_val)?;
402                let right_bool = self.to_bool(&right_val)?;
403                Ok(DataValue::Boolean(left_bool || right_bool))
404            }
405            // LIKE operator - SQL pattern matching
406            "LIKE" => {
407                let text = self.value_to_string(&left_val);
408                let pattern = self.value_to_string(&right_val);
409                let matches = self.sql_like_match(&text, &pattern);
410                Ok(DataValue::Boolean(matches))
411            }
412            _ => Err(anyhow!("Unsupported arithmetic operator: {}", op)),
413        }
414    }
415
416    /// Add two `DataValues` with type coercion
417    fn add_values(&self, left: &DataValue, right: &DataValue) -> Result<DataValue> {
418        // NULL handling - any operation with NULL returns NULL
419        if matches!(left, DataValue::Null) || matches!(right, DataValue::Null) {
420            return Ok(DataValue::Null);
421        }
422
423        match (left, right) {
424            (DataValue::Integer(a), DataValue::Integer(b)) => Ok(DataValue::Integer(a + b)),
425            (DataValue::Integer(a), DataValue::Float(b)) => Ok(DataValue::Float(*a as f64 + b)),
426            (DataValue::Float(a), DataValue::Integer(b)) => Ok(DataValue::Float(a + *b as f64)),
427            (DataValue::Float(a), DataValue::Float(b)) => Ok(DataValue::Float(a + b)),
428            _ => Err(anyhow!("Cannot add {:?} and {:?}", left, right)),
429        }
430    }
431
432    /// Subtract two `DataValues` with type coercion
433    fn subtract_values(&self, left: &DataValue, right: &DataValue) -> Result<DataValue> {
434        // NULL handling - any operation with NULL returns NULL
435        if matches!(left, DataValue::Null) || matches!(right, DataValue::Null) {
436            return Ok(DataValue::Null);
437        }
438
439        match (left, right) {
440            (DataValue::Integer(a), DataValue::Integer(b)) => Ok(DataValue::Integer(a - b)),
441            (DataValue::Integer(a), DataValue::Float(b)) => Ok(DataValue::Float(*a as f64 - b)),
442            (DataValue::Float(a), DataValue::Integer(b)) => Ok(DataValue::Float(a - *b as f64)),
443            (DataValue::Float(a), DataValue::Float(b)) => Ok(DataValue::Float(a - b)),
444            _ => Err(anyhow!("Cannot subtract {:?} and {:?}", left, right)),
445        }
446    }
447
448    /// Multiply two `DataValues` with type coercion
449    fn multiply_values(&self, left: &DataValue, right: &DataValue) -> Result<DataValue> {
450        // NULL handling - any operation with NULL returns NULL
451        if matches!(left, DataValue::Null) || matches!(right, DataValue::Null) {
452            return Ok(DataValue::Null);
453        }
454
455        match (left, right) {
456            (DataValue::Integer(a), DataValue::Integer(b)) => Ok(DataValue::Integer(a * b)),
457            (DataValue::Integer(a), DataValue::Float(b)) => Ok(DataValue::Float(*a as f64 * b)),
458            (DataValue::Float(a), DataValue::Integer(b)) => Ok(DataValue::Float(a * *b as f64)),
459            (DataValue::Float(a), DataValue::Float(b)) => Ok(DataValue::Float(a * b)),
460            _ => Err(anyhow!("Cannot multiply {:?} and {:?}", left, right)),
461        }
462    }
463
464    /// Divide two `DataValues` with type coercion
465    fn divide_values(&self, left: &DataValue, right: &DataValue) -> Result<DataValue> {
466        // NULL handling - any operation with NULL returns NULL
467        if matches!(left, DataValue::Null) || matches!(right, DataValue::Null) {
468            return Ok(DataValue::Null);
469        }
470
471        // Check for division by zero first
472        let is_zero = match right {
473            DataValue::Integer(0) => true,
474            DataValue::Float(f) if *f == 0.0 => true, // Only check for exact zero, not epsilon
475            _ => false,
476        };
477
478        if is_zero {
479            return Err(anyhow!("Division by zero"));
480        }
481
482        match (left, right) {
483            (DataValue::Integer(a), DataValue::Integer(b)) => {
484                // Integer division - if result is exact, keep as int, otherwise promote to float
485                if a % b == 0 {
486                    Ok(DataValue::Integer(a / b))
487                } else {
488                    Ok(DataValue::Float(*a as f64 / *b as f64))
489                }
490            }
491            (DataValue::Integer(a), DataValue::Float(b)) => Ok(DataValue::Float(*a as f64 / b)),
492            (DataValue::Float(a), DataValue::Integer(b)) => Ok(DataValue::Float(a / *b as f64)),
493            (DataValue::Float(a), DataValue::Float(b)) => Ok(DataValue::Float(a / b)),
494            _ => Err(anyhow!("Cannot divide {:?} and {:?}", left, right)),
495        }
496    }
497
498    /// Format a `DataValue` for debug output
499    fn format_value(&self, value: &DataValue) -> String {
500        match value {
501            DataValue::Integer(i) => i.to_string(),
502            DataValue::Float(f) => f.to_string(),
503            DataValue::String(s) => format!("'{s}'"),
504            _ => format!("{value:?}"),
505        }
506    }
507
508    /// Convert a DataValue to boolean for logical operations
509    fn to_bool(&self, value: &DataValue) -> Result<bool> {
510        match value {
511            DataValue::Boolean(b) => Ok(*b),
512            DataValue::Integer(i) => Ok(*i != 0),
513            DataValue::Float(f) => Ok(*f != 0.0),
514            DataValue::Null => Ok(false),
515            _ => Err(anyhow!("Cannot convert {:?} to boolean", value)),
516        }
517    }
518
519    /// Convert DataValue to string for pattern matching
520    fn value_to_string(&self, value: &DataValue) -> String {
521        match value {
522            DataValue::String(s) => s.clone(),
523            DataValue::InternedString(s) => s.to_string(),
524            DataValue::Integer(i) => i.to_string(),
525            DataValue::Float(f) => f.to_string(),
526            DataValue::Boolean(b) => b.to_string(),
527            DataValue::DateTime(dt) => dt.to_string(),
528            DataValue::Vector(v) => {
529                // Format as "[x,y,z]"
530                let components: Vec<String> = v.iter().map(|f| f.to_string()).collect();
531                format!("[{}]", components.join(","))
532            }
533            DataValue::Null => String::new(),
534        }
535    }
536
537    /// SQL LIKE pattern matching
538    /// Supports % (any chars) and _ (single char)
539    fn sql_like_match(&self, text: &str, pattern: &str) -> bool {
540        let pattern_chars: Vec<char> = pattern.chars().collect();
541        let text_chars: Vec<char> = text.chars().collect();
542
543        self.like_match_recursive(&text_chars, 0, &pattern_chars, 0)
544    }
545
546    /// Recursive helper for LIKE matching
547    fn like_match_recursive(
548        &self,
549        text: &[char],
550        text_pos: usize,
551        pattern: &[char],
552        pattern_pos: usize,
553    ) -> bool {
554        // If we've consumed both text and pattern, it's a match
555        if pattern_pos >= pattern.len() {
556            return text_pos >= text.len();
557        }
558
559        // Handle % wildcard (matches zero or more characters)
560        if pattern[pattern_pos] == '%' {
561            // Try matching zero characters (skip the %)
562            if self.like_match_recursive(text, text_pos, pattern, pattern_pos + 1) {
563                return true;
564            }
565            // Try matching one or more characters
566            if text_pos < text.len() {
567                return self.like_match_recursive(text, text_pos + 1, pattern, pattern_pos);
568            }
569            return false;
570        }
571
572        // If text is consumed but pattern isn't, no match
573        if text_pos >= text.len() {
574            return false;
575        }
576
577        // Handle _ wildcard (matches exactly one character)
578        if pattern[pattern_pos] == '_' {
579            return self.like_match_recursive(text, text_pos + 1, pattern, pattern_pos + 1);
580        }
581
582        // Handle literal character match
583        if text[text_pos] == pattern[pattern_pos] {
584            return self.like_match_recursive(text, text_pos + 1, pattern, pattern_pos + 1);
585        }
586
587        false
588    }
589
590    /// Evaluate a function call
591    fn evaluate_function_with_distinct(
592        &mut self,
593        name: &str,
594        args: &[SqlExpression],
595        distinct: bool,
596        row_index: usize,
597    ) -> Result<DataValue> {
598        // If DISTINCT is specified, handle it specially for aggregate functions
599        if distinct {
600            let name_upper = name.to_uppercase();
601
602            // Check if it's an aggregate function in either registry
603            if self.aggregate_registry.is_aggregate(&name_upper)
604                || self.new_aggregate_registry.contains(&name_upper)
605            {
606                return self.evaluate_aggregate_with_distinct(&name_upper, args, row_index);
607            } else {
608                return Err(anyhow!(
609                    "DISTINCT can only be used with aggregate functions"
610                ));
611            }
612        }
613
614        // Otherwise, use the regular evaluation
615        self.evaluate_function(name, args, row_index)
616    }
617
618    fn evaluate_aggregate_with_distinct(
619        &mut self,
620        name: &str,
621        args: &[SqlExpression],
622        _row_index: usize,
623    ) -> Result<DataValue> {
624        let name_upper = name.to_uppercase();
625
626        // Check new aggregate registry first for migrated functions
627        if self.new_aggregate_registry.get(&name_upper).is_some() {
628            let rows_to_process: Vec<usize> = if let Some(ref visible) = self.visible_rows {
629                visible.clone()
630            } else {
631                (0..self.table.rows.len()).collect()
632            };
633
634            // Collect and deduplicate values for DISTINCT
635            let mut vals = Vec::new();
636            for &row_idx in &rows_to_process {
637                if !args.is_empty() {
638                    let value = self.evaluate(&args[0], row_idx)?;
639                    vals.push(value);
640                }
641            }
642
643            // Deduplicate values
644            let mut seen = HashSet::new();
645            let unique_values: Vec<_> = vals
646                .into_iter()
647                .filter(|v| {
648                    let key = format!("{:?}", v);
649                    seen.insert(key)
650                })
651                .collect();
652
653            // Get the aggregate function from the new registry
654            let agg_func = self.new_aggregate_registry.get(&name_upper).unwrap();
655            let mut state = agg_func.create_state();
656
657            // Use unique values
658            for value in &unique_values {
659                state.accumulate(value)?;
660            }
661
662            return Ok(state.finalize());
663        }
664
665        // Check old aggregate registry (DISTINCT handling)
666        if self.aggregate_registry.get(&name_upper).is_some() {
667            // Determine which rows to process first
668            let rows_to_process: Vec<usize> = if let Some(ref visible) = self.visible_rows {
669                visible.clone()
670            } else {
671                (0..self.table.rows.len()).collect()
672            };
673
674            // Special handling for STRING_AGG with separator parameter
675            if name_upper == "STRING_AGG" && args.len() >= 2 {
676                // STRING_AGG(DISTINCT column, separator)
677                let mut state = crate::sql::aggregates::AggregateState::StringAgg(
678                    // Evaluate the separator (second argument) once
679                    if args.len() >= 2 {
680                        let separator = self.evaluate(&args[1], 0)?; // Separator doesn't depend on row
681                        match separator {
682                            DataValue::String(s) => crate::sql::aggregates::StringAggState::new(&s),
683                            DataValue::InternedString(s) => {
684                                crate::sql::aggregates::StringAggState::new(&s)
685                            }
686                            _ => crate::sql::aggregates::StringAggState::new(","), // Default separator
687                        }
688                    } else {
689                        crate::sql::aggregates::StringAggState::new(",")
690                    },
691                );
692
693                // Evaluate the first argument (column) for each row and accumulate
694                // Handle DISTINCT - use a HashSet to track seen values
695                let mut seen_values = HashSet::new();
696
697                for &row_idx in &rows_to_process {
698                    let value = self.evaluate(&args[0], row_idx)?;
699
700                    // Skip if we've seen this value
701                    if !seen_values.insert(value.clone()) {
702                        continue; // Skip duplicate values
703                    }
704
705                    // Now get the aggregate function and accumulate
706                    let agg_func = self.aggregate_registry.get(&name_upper).unwrap();
707                    agg_func.accumulate(&mut state, &value)?;
708                }
709
710                // Finalize the aggregate
711                let agg_func = self.aggregate_registry.get(&name_upper).unwrap();
712                return Ok(agg_func.finalize(state));
713            }
714
715            // For other aggregates with DISTINCT
716            // Evaluate the argument expression for each row
717            let mut vals = Vec::new();
718            for &row_idx in &rows_to_process {
719                if !args.is_empty() {
720                    let value = self.evaluate(&args[0], row_idx)?;
721                    vals.push(value);
722                }
723            }
724
725            // Deduplicate values for DISTINCT
726            let mut seen = HashSet::new();
727            let mut unique_values = Vec::new();
728            for value in vals {
729                if seen.insert(value.clone()) {
730                    unique_values.push(value);
731                }
732            }
733
734            // Now get the aggregate function and process
735            let agg_func = self.aggregate_registry.get(&name_upper).unwrap();
736            let mut state = agg_func.init();
737
738            // Use unique values
739            for value in &unique_values {
740                agg_func.accumulate(&mut state, value)?;
741            }
742
743            return Ok(agg_func.finalize(state));
744        }
745
746        Err(anyhow!("Unknown aggregate function: {}", name))
747    }
748
749    fn evaluate_function(
750        &mut self,
751        name: &str,
752        args: &[SqlExpression],
753        row_index: usize,
754    ) -> Result<DataValue> {
755        // Check if this is an aggregate function
756        let name_upper = name.to_uppercase();
757
758        // Check new aggregate registry first (for migrated functions)
759        if self.new_aggregate_registry.get(&name_upper).is_some() {
760            // Use new registry for SUM
761            let rows_to_process: Vec<usize> = if let Some(ref visible) = self.visible_rows {
762                visible.clone()
763            } else {
764                (0..self.table.rows.len()).collect()
765            };
766
767            // Get the aggregate function from the new registry
768            let agg_func = self.new_aggregate_registry.get(&name_upper).unwrap();
769            let mut state = agg_func.create_state();
770
771            // Special handling for COUNT(*)
772            if name_upper == "COUNT" || name_upper == "COUNT_STAR" {
773                if args.is_empty()
774                    || (args.len() == 1
775                        && matches!(&args[0], SqlExpression::Column(col) if col.name == "*"))
776                    || (args.len() == 1
777                        && matches!(&args[0], SqlExpression::StringLiteral(s) if s == "*"))
778                {
779                    // COUNT(*) or COUNT_STAR - count all rows
780                    for _ in &rows_to_process {
781                        state.accumulate(&DataValue::Integer(1))?;
782                    }
783                } else {
784                    // COUNT(column) - count non-null values
785                    for &row_idx in &rows_to_process {
786                        let value = self.evaluate(&args[0], row_idx)?;
787                        state.accumulate(&value)?;
788                    }
789                }
790            } else {
791                // Other aggregates - evaluate arguments and accumulate
792                if !args.is_empty() {
793                    for &row_idx in &rows_to_process {
794                        let value = self.evaluate(&args[0], row_idx)?;
795                        state.accumulate(&value)?;
796                    }
797                }
798            }
799
800            return Ok(state.finalize());
801        }
802
803        // Check old aggregate registry (for non-migrated functions)
804        if self.aggregate_registry.get(&name_upper).is_some() {
805            // Determine which rows to process first
806            let rows_to_process: Vec<usize> = if let Some(ref visible) = self.visible_rows {
807                visible.clone()
808            } else {
809                (0..self.table.rows.len()).collect()
810            };
811
812            // Special handling for STRING_AGG with separator parameter
813            if name_upper == "STRING_AGG" && args.len() >= 2 {
814                // STRING_AGG(column, separator) - without DISTINCT (handled separately)
815                let mut state = crate::sql::aggregates::AggregateState::StringAgg(
816                    // Evaluate the separator (second argument) once
817                    if args.len() >= 2 {
818                        let separator = self.evaluate(&args[1], 0)?; // Separator doesn't depend on row
819                        match separator {
820                            DataValue::String(s) => crate::sql::aggregates::StringAggState::new(&s),
821                            DataValue::InternedString(s) => {
822                                crate::sql::aggregates::StringAggState::new(&s)
823                            }
824                            _ => crate::sql::aggregates::StringAggState::new(","), // Default separator
825                        }
826                    } else {
827                        crate::sql::aggregates::StringAggState::new(",")
828                    },
829                );
830
831                // Evaluate the first argument (column) for each row and accumulate
832                for &row_idx in &rows_to_process {
833                    let value = self.evaluate(&args[0], row_idx)?;
834                    // Now get the aggregate function and accumulate
835                    let agg_func = self.aggregate_registry.get(&name_upper).unwrap();
836                    agg_func.accumulate(&mut state, &value)?;
837                }
838
839                // Finalize the aggregate
840                let agg_func = self.aggregate_registry.get(&name_upper).unwrap();
841                return Ok(agg_func.finalize(state));
842            }
843
844            // Evaluate arguments first if needed (to avoid borrow issues)
845            let values = if !args.is_empty()
846                && !(args.len() == 1
847                    && matches!(&args[0], SqlExpression::Column(c) if c.name == "*"))
848            {
849                // Evaluate the argument expression for each row
850                let mut vals = Vec::new();
851                for &row_idx in &rows_to_process {
852                    let value = self.evaluate(&args[0], row_idx)?;
853                    vals.push(value);
854                }
855                Some(vals)
856            } else {
857                None
858            };
859
860            // Now get the aggregate function and process
861            let agg_func = self.aggregate_registry.get(&name_upper).unwrap();
862            let mut state = agg_func.init();
863
864            if let Some(values) = values {
865                // Use evaluated values (DISTINCT is handled in evaluate_aggregate_with_distinct)
866                for value in &values {
867                    agg_func.accumulate(&mut state, value)?;
868                }
869            } else {
870                // COUNT(*) case
871                for _ in &rows_to_process {
872                    agg_func.accumulate(&mut state, &DataValue::Integer(1))?;
873                }
874            }
875
876            return Ok(agg_func.finalize(state));
877        }
878
879        // First check if this function exists in the registry
880        if self.function_registry.get(name).is_some() {
881            // Evaluate all arguments first to avoid borrow issues
882            let mut evaluated_args = Vec::new();
883            for arg in args {
884                evaluated_args.push(self.evaluate(arg, row_index)?);
885            }
886
887            // Get the function and call it
888            let func = self.function_registry.get(name).unwrap();
889            return func.evaluate(&evaluated_args);
890        }
891
892        // If not in registry, return error for unknown function
893        Err(anyhow!("Unknown function: {}", name))
894    }
895
896    /// Get or create a WindowContext for the given specification
897    /// Public to allow pre-creation of contexts in query engine (optimization)
898    pub fn get_or_create_window_context(
899        &mut self,
900        spec: &WindowSpec,
901    ) -> Result<Arc<WindowContext>> {
902        let overall_start = Instant::now();
903
904        // Create a hash-based key for fast caching (much faster than format!("{:?}", spec))
905        let key = spec.compute_hash();
906
907        if let Some(context) = self.window_contexts.get(&key) {
908            info!(
909                "WindowContext cache hit for spec (lookup: {:.2}μs)",
910                overall_start.elapsed().as_micros()
911            );
912            return Ok(Arc::clone(context));
913        }
914
915        info!("WindowContext cache miss - creating new context");
916        let dataview_start = Instant::now();
917
918        // Create a DataView from the table (with visible rows if filtered)
919        let data_view = if let Some(ref _visible_rows) = self.visible_rows {
920            // Create a filtered view
921            let view = DataView::new(Arc::new(self.table.clone()));
922            // Apply filtering based on visible rows
923            // Note: This is a simplified approach - in production we'd need proper filtering
924            view
925        } else {
926            DataView::new(Arc::new(self.table.clone()))
927        };
928
929        info!(
930            "DataView creation took {:.2}μs",
931            dataview_start.elapsed().as_micros()
932        );
933        let context_start = Instant::now();
934
935        // Create the WindowContext with the full spec (including frame)
936        let context = WindowContext::new_with_spec(Arc::new(data_view), spec.clone())?;
937
938        info!(
939            "WindowContext::new_with_spec took {:.2}ms (rows: {})",
940            context_start.elapsed().as_secs_f64() * 1000.0,
941            self.table.row_count()
942        );
943
944        let context = Arc::new(context);
945        self.window_contexts.insert(key, Arc::clone(&context));
946
947        info!(
948            "Total WindowContext creation (cache miss) took {:.2}ms",
949            overall_start.elapsed().as_secs_f64() * 1000.0
950        );
951
952        Ok(context)
953    }
954
955    /// Evaluate a window function
956    fn evaluate_window_function(
957        &mut self,
958        name: &str,
959        args: &[SqlExpression],
960        spec: &WindowSpec,
961        row_index: usize,
962    ) -> Result<DataValue> {
963        let func_start = Instant::now();
964        let name_upper = name.to_uppercase();
965
966        // First check if this is a syntactic sugar function in the registry
967        debug!("Looking for window function {} in registry", name_upper);
968        if let Some(window_fn_arc) = self.window_function_registry.get(&name_upper) {
969            debug!("Found window function {} in registry", name_upper);
970
971            // Dereference to get the actual window function
972            let window_fn = window_fn_arc.as_ref();
973
974            // Validate arguments
975            window_fn.validate_args(args)?;
976
977            // Transform the window spec based on the function's requirements
978            let transformed_spec = window_fn.transform_window_spec(spec, args)?;
979
980            // Get or create the window context with the transformed spec
981            let context = self.get_or_create_window_context(&transformed_spec)?;
982
983            // Create an expression evaluator adapter
984            struct EvaluatorAdapter<'a, 'b> {
985                evaluator: &'a mut ArithmeticEvaluator<'b>,
986                row_index: usize,
987            }
988
989            impl<'a, 'b> ExpressionEvaluator for EvaluatorAdapter<'a, 'b> {
990                fn evaluate(
991                    &mut self,
992                    expr: &SqlExpression,
993                    row_index: usize,
994                ) -> Result<DataValue> {
995                    self.evaluator.evaluate(expr, row_index)
996                }
997            }
998
999            let mut adapter = EvaluatorAdapter {
1000                evaluator: self,
1001                row_index,
1002            };
1003
1004            let compute_start = Instant::now();
1005            // Call the window function's compute method
1006            let result = window_fn.compute(&context, row_index, args, &mut adapter);
1007
1008            info!(
1009                "{} (registry) evaluation: total={:.2}μs, compute={:.2}μs",
1010                name_upper,
1011                func_start.elapsed().as_micros(),
1012                compute_start.elapsed().as_micros()
1013            );
1014
1015            return result;
1016        }
1017
1018        // Fall back to built-in window functions
1019        let context_start = Instant::now();
1020        let context = self.get_or_create_window_context(spec)?;
1021        let context_time = context_start.elapsed();
1022
1023        let eval_start = Instant::now();
1024
1025        let result = match name_upper.as_str() {
1026            "LAG" => {
1027                // LAG(column, offset, default)
1028                if args.is_empty() {
1029                    return Err(anyhow!("LAG requires at least 1 argument"));
1030                }
1031
1032                // Get column name
1033                let column = match &args[0] {
1034                    SqlExpression::Column(col) => col.clone(),
1035                    _ => return Err(anyhow!("LAG first argument must be a column")),
1036                };
1037
1038                // Get offset (default 1)
1039                let offset = if args.len() > 1 {
1040                    match self.evaluate(&args[1], row_index)? {
1041                        DataValue::Integer(i) => i as i32,
1042                        _ => return Err(anyhow!("LAG offset must be an integer")),
1043                    }
1044                } else {
1045                    1
1046                };
1047
1048                let offset_start = Instant::now();
1049                // Get value at offset
1050                let value = context
1051                    .get_offset_value(row_index, -offset, &column.name)
1052                    .unwrap_or(DataValue::Null);
1053
1054                debug!(
1055                    "LAG offset access took {:.2}μs (offset={})",
1056                    offset_start.elapsed().as_micros(),
1057                    offset
1058                );
1059
1060                Ok(value)
1061            }
1062            "LEAD" => {
1063                // LEAD(column, offset, default)
1064                if args.is_empty() {
1065                    return Err(anyhow!("LEAD requires at least 1 argument"));
1066                }
1067
1068                // Get column name
1069                let column = match &args[0] {
1070                    SqlExpression::Column(col) => col.clone(),
1071                    _ => return Err(anyhow!("LEAD first argument must be a column")),
1072                };
1073
1074                // Get offset (default 1)
1075                let offset = if args.len() > 1 {
1076                    match self.evaluate(&args[1], row_index)? {
1077                        DataValue::Integer(i) => i as i32,
1078                        _ => return Err(anyhow!("LEAD offset must be an integer")),
1079                    }
1080                } else {
1081                    1
1082                };
1083
1084                let offset_start = Instant::now();
1085                // Get value at offset
1086                let value = context
1087                    .get_offset_value(row_index, offset, &column.name)
1088                    .unwrap_or(DataValue::Null);
1089
1090                debug!(
1091                    "LEAD offset access took {:.2}μs (offset={})",
1092                    offset_start.elapsed().as_micros(),
1093                    offset
1094                );
1095
1096                Ok(value)
1097            }
1098            "ROW_NUMBER" => {
1099                // ROW_NUMBER() - no arguments
1100                Ok(DataValue::Integer(context.get_row_number(row_index) as i64))
1101            }
1102            "RANK" => {
1103                // RANK() - no arguments
1104                Ok(DataValue::Integer(context.get_rank(row_index)))
1105            }
1106            "DENSE_RANK" => {
1107                // DENSE_RANK() - no arguments
1108                Ok(DataValue::Integer(context.get_dense_rank(row_index)))
1109            }
1110            "FIRST_VALUE" => {
1111                // FIRST_VALUE(column) OVER (... ROWS ...)
1112                if args.is_empty() {
1113                    return Err(anyhow!("FIRST_VALUE requires 1 argument"));
1114                }
1115
1116                let column = match &args[0] {
1117                    SqlExpression::Column(col) => col.clone(),
1118                    _ => return Err(anyhow!("FIRST_VALUE argument must be a column")),
1119                };
1120
1121                // Use frame-aware version if frame is specified
1122                if context.has_frame() {
1123                    Ok(context
1124                        .get_frame_first_value(row_index, &column.name)
1125                        .unwrap_or(DataValue::Null))
1126                } else {
1127                    Ok(context
1128                        .get_first_value(row_index, &column.name)
1129                        .unwrap_or(DataValue::Null))
1130                }
1131            }
1132            "LAST_VALUE" => {
1133                // LAST_VALUE(column) OVER (... ROWS ...)
1134                if args.is_empty() {
1135                    return Err(anyhow!("LAST_VALUE requires 1 argument"));
1136                }
1137
1138                let column = match &args[0] {
1139                    SqlExpression::Column(col) => col.clone(),
1140                    _ => return Err(anyhow!("LAST_VALUE argument must be a column")),
1141                };
1142
1143                // Use frame-aware version if frame is specified
1144                if context.has_frame() {
1145                    Ok(context
1146                        .get_frame_last_value(row_index, &column.name)
1147                        .unwrap_or(DataValue::Null))
1148                } else {
1149                    Ok(context
1150                        .get_last_value(row_index, &column.name)
1151                        .unwrap_or(DataValue::Null))
1152                }
1153            }
1154            "SUM" => {
1155                // SUM(column) OVER (PARTITION BY ... ROWS n PRECEDING)
1156                if args.is_empty() {
1157                    return Err(anyhow!("SUM requires 1 argument"));
1158                }
1159
1160                let column = match &args[0] {
1161                    SqlExpression::Column(col) => col.clone(),
1162                    _ => return Err(anyhow!("SUM argument must be a column")),
1163                };
1164
1165                // Use frame-aware sum if frame is specified, otherwise use partition sum
1166                if context.has_frame() {
1167                    Ok(context
1168                        .get_frame_sum(row_index, &column.name)
1169                        .unwrap_or(DataValue::Null))
1170                } else {
1171                    Ok(context
1172                        .get_partition_sum(row_index, &column.name)
1173                        .unwrap_or(DataValue::Null))
1174                }
1175            }
1176            "AVG" => {
1177                // AVG(column) OVER (PARTITION BY ... ROWS n PRECEDING)
1178                if args.is_empty() {
1179                    return Err(anyhow!("AVG requires 1 argument"));
1180                }
1181
1182                let column = match &args[0] {
1183                    SqlExpression::Column(col) => col.clone(),
1184                    _ => return Err(anyhow!("AVG argument must be a column")),
1185                };
1186
1187                // Use frame-aware avg if frame is specified, otherwise use partition avg
1188                if context.has_frame() {
1189                    Ok(context
1190                        .get_frame_avg(row_index, &column.name)
1191                        .unwrap_or(DataValue::Null))
1192                } else {
1193                    Ok(context
1194                        .get_partition_avg(row_index, &column.name)
1195                        .unwrap_or(DataValue::Null))
1196                }
1197            }
1198            "STDDEV" | "STDEV" => {
1199                // STDDEV(column) OVER (PARTITION BY ... ROWS n PRECEDING)
1200                if args.is_empty() {
1201                    return Err(anyhow!("STDDEV requires 1 argument"));
1202                }
1203
1204                let column = match &args[0] {
1205                    SqlExpression::Column(col) => col.clone(),
1206                    _ => return Err(anyhow!("STDDEV argument must be a column")),
1207                };
1208
1209                Ok(context
1210                    .get_frame_stddev(row_index, &column.name)
1211                    .unwrap_or(DataValue::Null))
1212            }
1213            "VARIANCE" | "VAR" => {
1214                // VARIANCE(column) OVER (PARTITION BY ... ROWS n PRECEDING)
1215                if args.is_empty() {
1216                    return Err(anyhow!("VARIANCE requires 1 argument"));
1217                }
1218
1219                let column = match &args[0] {
1220                    SqlExpression::Column(col) => col.clone(),
1221                    _ => return Err(anyhow!("VARIANCE argument must be a column")),
1222                };
1223
1224                Ok(context
1225                    .get_frame_variance(row_index, &column.name)
1226                    .unwrap_or(DataValue::Null))
1227            }
1228            "MIN" => {
1229                // MIN(column) OVER (PARTITION BY ... ROWS n PRECEDING)
1230                if args.is_empty() {
1231                    return Err(anyhow!("MIN requires 1 argument"));
1232                }
1233
1234                let column = match &args[0] {
1235                    SqlExpression::Column(col) => col.clone(),
1236                    _ => return Err(anyhow!("MIN argument must be a column")),
1237                };
1238
1239                let frame_rows = context.get_frame_rows(row_index);
1240                if frame_rows.is_empty() {
1241                    return Ok(DataValue::Null);
1242                }
1243
1244                let source_table = context.source();
1245                let col_idx = source_table
1246                    .get_column_index(&column.name)
1247                    .ok_or_else(|| anyhow!("Column '{}' not found", column.name))?;
1248
1249                let mut min_value: Option<DataValue> = None;
1250                for &row_idx in &frame_rows {
1251                    if let Some(value) = source_table.get_value(row_idx, col_idx) {
1252                        if !matches!(value, DataValue::Null) {
1253                            match &min_value {
1254                                None => min_value = Some(value.clone()),
1255                                Some(current_min) => {
1256                                    if value < current_min {
1257                                        min_value = Some(value.clone());
1258                                    }
1259                                }
1260                            }
1261                        }
1262                    }
1263                }
1264
1265                Ok(min_value.unwrap_or(DataValue::Null))
1266            }
1267            "MAX" => {
1268                // MAX(column) OVER (PARTITION BY ... ROWS n PRECEDING)
1269                if args.is_empty() {
1270                    return Err(anyhow!("MAX requires 1 argument"));
1271                }
1272
1273                let column = match &args[0] {
1274                    SqlExpression::Column(col) => col.clone(),
1275                    _ => return Err(anyhow!("MAX argument must be a column")),
1276                };
1277
1278                let frame_rows = context.get_frame_rows(row_index);
1279                if frame_rows.is_empty() {
1280                    return Ok(DataValue::Null);
1281                }
1282
1283                let source_table = context.source();
1284                let col_idx = source_table
1285                    .get_column_index(&column.name)
1286                    .ok_or_else(|| anyhow!("Column '{}' not found", column.name))?;
1287
1288                let mut max_value: Option<DataValue> = None;
1289                for &row_idx in &frame_rows {
1290                    if let Some(value) = source_table.get_value(row_idx, col_idx) {
1291                        if !matches!(value, DataValue::Null) {
1292                            match &max_value {
1293                                None => max_value = Some(value.clone()),
1294                                Some(current_max) => {
1295                                    if value > current_max {
1296                                        max_value = Some(value.clone());
1297                                    }
1298                                }
1299                            }
1300                        }
1301                    }
1302                }
1303
1304                Ok(max_value.unwrap_or(DataValue::Null))
1305            }
1306            "COUNT" => {
1307                // COUNT(*) or COUNT(column) OVER (PARTITION BY ... ROWS n PRECEDING)
1308                // Use frame-aware count if frame is specified, otherwise use partition count
1309
1310                if args.is_empty() {
1311                    // COUNT(*) OVER (...)
1312                    if context.has_frame() {
1313                        Ok(context
1314                            .get_frame_count(row_index, None)
1315                            .unwrap_or(DataValue::Null))
1316                    } else {
1317                        Ok(context
1318                            .get_partition_count(row_index, None)
1319                            .unwrap_or(DataValue::Null))
1320                    }
1321                } else {
1322                    // Check for COUNT(*)
1323                    let column = match &args[0] {
1324                        SqlExpression::Column(col) => {
1325                            if col.name == "*" {
1326                                // COUNT(*) - count all rows
1327                                if context.has_frame() {
1328                                    return Ok(context
1329                                        .get_frame_count(row_index, None)
1330                                        .unwrap_or(DataValue::Null));
1331                                } else {
1332                                    return Ok(context
1333                                        .get_partition_count(row_index, None)
1334                                        .unwrap_or(DataValue::Null));
1335                                }
1336                            }
1337                            col.clone()
1338                        }
1339                        SqlExpression::StringLiteral(s) if s == "*" => {
1340                            // COUNT(*) as StringLiteral
1341                            if context.has_frame() {
1342                                return Ok(context
1343                                    .get_frame_count(row_index, None)
1344                                    .unwrap_or(DataValue::Null));
1345                            } else {
1346                                return Ok(context
1347                                    .get_partition_count(row_index, None)
1348                                    .unwrap_or(DataValue::Null));
1349                            }
1350                        }
1351                        _ => return Err(anyhow!("COUNT argument must be a column or *")),
1352                    };
1353
1354                    // COUNT(column) - count non-null values
1355                    if context.has_frame() {
1356                        Ok(context
1357                            .get_frame_count(row_index, Some(&column.name))
1358                            .unwrap_or(DataValue::Null))
1359                    } else {
1360                        Ok(context
1361                            .get_partition_count(row_index, Some(&column.name))
1362                            .unwrap_or(DataValue::Null))
1363                    }
1364                }
1365            }
1366            _ => Err(anyhow!("Unknown window function: {}", name)),
1367        };
1368
1369        let eval_time = eval_start.elapsed();
1370
1371        info!(
1372            "{} (built-in) evaluation: total={:.2}μs, context={:.2}μs, eval={:.2}μs",
1373            name_upper,
1374            func_start.elapsed().as_micros(),
1375            context_time.as_micros(),
1376            eval_time.as_micros()
1377        );
1378
1379        result
1380    }
1381
1382    /// Evaluate a method call on a column (e.g., `column.Trim()`)
1383    fn evaluate_method_call(
1384        &mut self,
1385        object: &str,
1386        method: &str,
1387        args: &[SqlExpression],
1388        row_index: usize,
1389    ) -> Result<DataValue> {
1390        // Get column value
1391        let col_index = self.table.get_column_index(object).ok_or_else(|| {
1392            let suggestion = self.find_similar_column(object);
1393            match suggestion {
1394                Some(similar) => {
1395                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
1396                }
1397                None => anyhow!("Column '{}' not found", object),
1398            }
1399        })?;
1400
1401        let cell_value = self.table.get_value(row_index, col_index).cloned();
1402
1403        self.evaluate_method_on_value(
1404            &cell_value.unwrap_or(DataValue::Null),
1405            method,
1406            args,
1407            row_index,
1408        )
1409    }
1410
1411    /// Evaluate a method on a value
1412    fn evaluate_method_on_value(
1413        &mut self,
1414        value: &DataValue,
1415        method: &str,
1416        args: &[SqlExpression],
1417        row_index: usize,
1418    ) -> Result<DataValue> {
1419        // First, try to proxy the method through the function registry
1420        // Many string methods have corresponding functions (TRIM, LENGTH, CONTAINS, etc.)
1421
1422        // Map method names to function names (case-insensitive matching)
1423        let function_name = match method.to_lowercase().as_str() {
1424            "trim" => "TRIM",
1425            "trimstart" | "trimbegin" => "TRIMSTART",
1426            "trimend" => "TRIMEND",
1427            "length" | "len" => "LENGTH",
1428            "contains" => "CONTAINS",
1429            "startswith" => "STARTSWITH",
1430            "endswith" => "ENDSWITH",
1431            "indexof" => "INDEXOF",
1432            _ => method, // Try the method name as-is
1433        };
1434
1435        // Check if we have this function in the registry
1436        if self.function_registry.get(function_name).is_some() {
1437            debug!(
1438                "Proxying method '{}' through function registry as '{}'",
1439                method, function_name
1440            );
1441
1442            // Prepare arguments: receiver is the first argument, followed by method args
1443            let mut func_args = vec![value.clone()];
1444
1445            // Evaluate method arguments and add them
1446            for arg in args {
1447                func_args.push(self.evaluate(arg, row_index)?);
1448            }
1449
1450            // Get the function and call it
1451            let func = self.function_registry.get(function_name).unwrap();
1452            return func.evaluate(&func_args);
1453        }
1454
1455        // If not in registry, the method is not supported
1456        // All methods should be registered in the function registry
1457        Err(anyhow!(
1458            "Method '{}' not found. It should be registered in the function registry.",
1459            method
1460        ))
1461    }
1462
1463    /// Evaluate a CASE expression
1464    fn evaluate_case_expression(
1465        &mut self,
1466        when_branches: &[crate::sql::recursive_parser::WhenBranch],
1467        else_branch: &Option<Box<SqlExpression>>,
1468        row_index: usize,
1469    ) -> Result<DataValue> {
1470        debug!(
1471            "ArithmeticEvaluator: evaluating CASE expression for row {}",
1472            row_index
1473        );
1474
1475        // Evaluate each WHEN condition in order
1476        for branch in when_branches {
1477            // Evaluate the condition as a boolean
1478            let condition_result = self.evaluate_condition_as_bool(&branch.condition, row_index)?;
1479
1480            if condition_result {
1481                debug!("CASE: WHEN condition matched, evaluating result expression");
1482                return self.evaluate(&branch.result, row_index);
1483            }
1484        }
1485
1486        // If no WHEN condition matched, evaluate ELSE clause (or return NULL)
1487        if let Some(else_expr) = else_branch {
1488            debug!("CASE: No WHEN matched, evaluating ELSE expression");
1489            self.evaluate(else_expr, row_index)
1490        } else {
1491            debug!("CASE: No WHEN matched and no ELSE, returning NULL");
1492            Ok(DataValue::Null)
1493        }
1494    }
1495
1496    /// Evaluate a simple CASE expression
1497    fn evaluate_simple_case_expression(
1498        &mut self,
1499        expr: &Box<SqlExpression>,
1500        when_branches: &[crate::sql::parser::ast::SimpleWhenBranch],
1501        else_branch: &Option<Box<SqlExpression>>,
1502        row_index: usize,
1503    ) -> Result<DataValue> {
1504        debug!(
1505            "ArithmeticEvaluator: evaluating simple CASE expression for row {}",
1506            row_index
1507        );
1508
1509        // Evaluate the main expression once
1510        let case_value = self.evaluate(expr, row_index)?;
1511        debug!("Simple CASE: evaluated expression to {:?}", case_value);
1512
1513        // Compare against each WHEN value in order
1514        for branch in when_branches {
1515            // Evaluate the WHEN value
1516            let when_value = self.evaluate(&branch.value, row_index)?;
1517
1518            // Check for equality
1519            if self.values_equal(&case_value, &when_value)? {
1520                debug!("Simple CASE: WHEN value matched, evaluating result expression");
1521                return self.evaluate(&branch.result, row_index);
1522            }
1523        }
1524
1525        // If no WHEN value matched, evaluate ELSE clause (or return NULL)
1526        if let Some(else_expr) = else_branch {
1527            debug!("Simple CASE: No WHEN matched, evaluating ELSE expression");
1528            self.evaluate(else_expr, row_index)
1529        } else {
1530            debug!("Simple CASE: No WHEN matched and no ELSE, returning NULL");
1531            Ok(DataValue::Null)
1532        }
1533    }
1534
1535    /// Check if two DataValues are equal
1536    fn values_equal(&self, left: &DataValue, right: &DataValue) -> Result<bool> {
1537        match (left, right) {
1538            (DataValue::Null, DataValue::Null) => Ok(true),
1539            (DataValue::Null, _) | (_, DataValue::Null) => Ok(false),
1540            (DataValue::Integer(a), DataValue::Integer(b)) => Ok(a == b),
1541            (DataValue::Float(a), DataValue::Float(b)) => Ok((a - b).abs() < f64::EPSILON),
1542            (DataValue::String(a), DataValue::String(b)) => Ok(a == b),
1543            (DataValue::Boolean(a), DataValue::Boolean(b)) => Ok(a == b),
1544            (DataValue::DateTime(a), DataValue::DateTime(b)) => Ok(a == b),
1545            // Type coercion for numeric comparisons
1546            (DataValue::Integer(a), DataValue::Float(b)) => {
1547                Ok((*a as f64 - b).abs() < f64::EPSILON)
1548            }
1549            (DataValue::Float(a), DataValue::Integer(b)) => {
1550                Ok((a - *b as f64).abs() < f64::EPSILON)
1551            }
1552            _ => Ok(false),
1553        }
1554    }
1555
1556    /// Helper method to evaluate an expression as a boolean (for CASE WHEN conditions)
1557    fn evaluate_condition_as_bool(
1558        &mut self,
1559        expr: &SqlExpression,
1560        row_index: usize,
1561    ) -> Result<bool> {
1562        let value = self.evaluate(expr, row_index)?;
1563
1564        match value {
1565            DataValue::Boolean(b) => Ok(b),
1566            DataValue::Integer(i) => Ok(i != 0),
1567            DataValue::Float(f) => Ok(f != 0.0),
1568            DataValue::Null => Ok(false),
1569            DataValue::String(s) => Ok(!s.is_empty()),
1570            DataValue::InternedString(s) => Ok(!s.is_empty()),
1571            _ => Ok(true), // Other types are considered truthy
1572        }
1573    }
1574
1575    /// Evaluate a DATETIME constructor expression
1576    fn evaluate_datetime_constructor(
1577        &self,
1578        year: i32,
1579        month: u32,
1580        day: u32,
1581        hour: Option<u32>,
1582        minute: Option<u32>,
1583        second: Option<u32>,
1584    ) -> Result<DataValue> {
1585        use chrono::{NaiveDate, TimeZone, Utc};
1586
1587        // Create a NaiveDate
1588        let date = NaiveDate::from_ymd_opt(year, month, day)
1589            .ok_or_else(|| anyhow!("Invalid date: {}-{}-{}", year, month, day))?;
1590
1591        // Create datetime with provided time components or defaults
1592        let hour = hour.unwrap_or(0);
1593        let minute = minute.unwrap_or(0);
1594        let second = second.unwrap_or(0);
1595
1596        let naive_datetime = date
1597            .and_hms_opt(hour, minute, second)
1598            .ok_or_else(|| anyhow!("Invalid time: {}:{}:{}", hour, minute, second))?;
1599
1600        // Convert to UTC DateTime
1601        let datetime = Utc.from_utc_datetime(&naive_datetime);
1602
1603        // Format as string with milliseconds
1604        let datetime_str = datetime.format("%Y-%m-%d %H:%M:%S%.3f").to_string();
1605        Ok(DataValue::String(datetime_str))
1606    }
1607
1608    /// Evaluate a DATETIME.TODAY constructor expression
1609    fn evaluate_datetime_today(
1610        &self,
1611        hour: Option<u32>,
1612        minute: Option<u32>,
1613        second: Option<u32>,
1614    ) -> Result<DataValue> {
1615        use chrono::{TimeZone, Utc};
1616
1617        // Get today's date in UTC
1618        let today = Utc::now().date_naive();
1619
1620        // Create datetime with provided time components or defaults
1621        let hour = hour.unwrap_or(0);
1622        let minute = minute.unwrap_or(0);
1623        let second = second.unwrap_or(0);
1624
1625        let naive_datetime = today
1626            .and_hms_opt(hour, minute, second)
1627            .ok_or_else(|| anyhow!("Invalid time: {}:{}:{}", hour, minute, second))?;
1628
1629        // Convert to UTC DateTime
1630        let datetime = Utc.from_utc_datetime(&naive_datetime);
1631
1632        // Format as string with milliseconds
1633        let datetime_str = datetime.format("%Y-%m-%d %H:%M:%S%.3f").to_string();
1634        Ok(DataValue::String(datetime_str))
1635    }
1636}
1637
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::datatable::{DataColumn, DataRow};

    /// Single-row fixture table: a = 10 (integer), b = 2.5 (float), c = 4 (integer).
    fn create_test_table() -> DataTable {
        let mut table = DataTable::new("test");
        for name in ["a", "b", "c"] {
            table.add_column(DataColumn::new(name));
        }

        let row = DataRow::new(vec![
            DataValue::Integer(10),
            DataValue::Float(2.5),
            DataValue::Integer(4),
        ]);
        table.add_row(row).unwrap();

        table
    }

    /// Unquoted column reference expression.
    fn column(name: &str) -> SqlExpression {
        SqlExpression::Column(ColumnRef::unquoted(name.to_string()))
    }

    /// Numeric literal expression built from its source text.
    fn number(text: &str) -> SqlExpression {
        SqlExpression::NumberLiteral(text.to_string())
    }

    /// Evaluates `a BETWEEN lower AND upper` against row 0 of the fixture table.
    fn eval_a_between(lower: &str, upper: &str) -> DataValue {
        let table = create_test_table();
        let mut evaluator = ArithmeticEvaluator::new(&table);

        let expr = SqlExpression::Between {
            expr: Box::new(column("a")),
            lower: Box::new(number(lower)),
            upper: Box::new(number(upper)),
        };
        evaluator.evaluate(&expr, 0).unwrap()
    }

    #[test]
    fn test_evaluate_column() {
        let table = create_test_table();
        let mut evaluator = ArithmeticEvaluator::new(&table);

        let value = evaluator.evaluate(&column("a"), 0).unwrap();
        assert_eq!(value, DataValue::Integer(10));
    }

    #[test]
    fn test_evaluate_between_column_in_range() {
        // column 'a' is 10 — 5 <= 10 <= 20 is true
        assert_eq!(eval_a_between("5", "20"), DataValue::Boolean(true));
    }

    #[test]
    fn test_evaluate_between_column_out_of_range() {
        // column 'a' is 10 — 11 <= 10 <= 20 is false
        assert_eq!(eval_a_between("11", "20"), DataValue::Boolean(false));
    }

    #[test]
    fn test_evaluate_between_endpoints_inclusive() {
        // column 'a' is 10 — 10 <= 10 <= 10 is true (both endpoints inclusive)
        assert_eq!(eval_a_between("10", "10"), DataValue::Boolean(true));
    }

    #[test]
    fn test_evaluate_number_literal() {
        let table = create_test_table();
        let mut evaluator = ArithmeticEvaluator::new(&table);

        // Text without a fractional part is expected to evaluate to an Integer.
        let whole = evaluator.evaluate(&number("42"), 0).unwrap();
        assert_eq!(whole, DataValue::Integer(42));

        // Text with a fractional part is expected to evaluate to a Float.
        let fractional = evaluator.evaluate(&number("3.14"), 0).unwrap();
        assert_eq!(fractional, DataValue::Float(3.14));
    }

    #[test]
    fn test_add_values() {
        let table = create_test_table();
        let mut evaluator = ArithmeticEvaluator::new(&table);
        let five = DataValue::Integer(5);

        // Integer + Integer
        let int_sum = evaluator.add_values(&five, &DataValue::Integer(3)).unwrap();
        assert_eq!(int_sum, DataValue::Integer(8));

        // Integer + Float
        let mixed_sum = evaluator.add_values(&five, &DataValue::Float(2.5)).unwrap();
        assert_eq!(mixed_sum, DataValue::Float(7.5));
    }

    #[test]
    fn test_multiply_values() {
        let table = create_test_table();
        let mut evaluator = ArithmeticEvaluator::new(&table);

        // Integer * Float
        let product = evaluator
            .multiply_values(&DataValue::Integer(4), &DataValue::Float(2.5))
            .unwrap();
        assert_eq!(product, DataValue::Float(10.0));
    }

    #[test]
    fn test_divide_values() {
        let table = create_test_table();
        let mut evaluator = ArithmeticEvaluator::new(&table);
        let ten = DataValue::Integer(10);

        // Exact division
        let exact = evaluator.divide_values(&ten, &DataValue::Integer(2)).unwrap();
        assert_eq!(exact, DataValue::Integer(5));

        // Non-exact division
        let inexact = evaluator.divide_values(&ten, &DataValue::Integer(3)).unwrap();
        assert_eq!(inexact, DataValue::Float(10.0 / 3.0));
    }

    #[test]
    fn test_division_by_zero() {
        let table = create_test_table();
        let mut evaluator = ArithmeticEvaluator::new(&table);

        let outcome = evaluator.divide_values(&DataValue::Integer(10), &DataValue::Integer(0));
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Division by zero"));
    }

    #[test]
    fn test_binary_op_expression() {
        let table = create_test_table();
        let mut evaluator = ArithmeticEvaluator::new(&table);

        // a * b where a=10, b=2.5
        let expr = SqlExpression::BinaryOp {
            left: Box::new(column("a")),
            op: "*".to_string(),
            right: Box::new(column("b")),
        };

        let product = evaluator.evaluate(&expr, 0).unwrap();
        assert_eq!(product, DataValue::Float(25.0));
    }
}