sql_cli/data/
arithmetic_evaluator.rs

1use crate::data::datatable::{DataTable, DataValue};
2use crate::sql::aggregates::AggregateRegistry;
3use crate::sql::functions::FunctionRegistry;
4use crate::sql::recursive_parser::SqlExpression;
5use anyhow::{anyhow, Result};
6use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, TimeZone, Utc};
7use std::sync::Arc;
8use tracing::debug;
9
10/// Evaluates SQL expressions to compute DataValues (for SELECT clauses)
11/// This is different from RecursiveWhereEvaluator which returns boolean
12pub struct ArithmeticEvaluator<'a> {
13    table: &'a DataTable,
14    date_notation: String,
15    function_registry: Arc<FunctionRegistry>,
16    aggregate_registry: Arc<AggregateRegistry>,
17    visible_rows: Option<Vec<usize>>, // For aggregate functions on filtered views
18}
19
20impl<'a> ArithmeticEvaluator<'a> {
21    pub fn new(table: &'a DataTable) -> Self {
22        Self {
23            table,
24            date_notation: "us".to_string(),
25            function_registry: Arc::new(FunctionRegistry::new()),
26            aggregate_registry: Arc::new(AggregateRegistry::new()),
27            visible_rows: None,
28        }
29    }
30
31    pub fn with_date_notation(table: &'a DataTable, date_notation: String) -> Self {
32        Self {
33            table,
34            date_notation,
35            function_registry: Arc::new(FunctionRegistry::new()),
36            aggregate_registry: Arc::new(AggregateRegistry::new()),
37            visible_rows: None,
38        }
39    }
40
41    /// Set visible rows for aggregate functions (for filtered views)
42    pub fn with_visible_rows(mut self, rows: Vec<usize>) -> Self {
43        self.visible_rows = Some(rows);
44        self
45    }
46
47    pub fn with_date_notation_and_registry(
48        table: &'a DataTable,
49        date_notation: String,
50        function_registry: Arc<FunctionRegistry>,
51    ) -> Self {
52        Self {
53            table,
54            date_notation,
55            function_registry,
56            aggregate_registry: Arc::new(AggregateRegistry::new()),
57            visible_rows: None,
58        }
59    }
60
61    /// Central function to parse date/datetime strings respecting date_notation preference
62    fn parse_datetime_with_notation(&self, s: &str) -> Result<DateTime<Utc>> {
63        // ISO formats (most common and unambiguous)
64        if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
65            return Ok(Utc.from_utc_datetime(&dt));
66        }
67        if let Ok(dt) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
68            return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
69        }
70
71        // Date notation preference for ambiguous formats
72        if self.date_notation == "european" {
73            // European formats (DD/MM/YYYY) - try first
74            // Date only formats
75            if let Ok(dt) = NaiveDate::parse_from_str(s, "%d/%m/%Y") {
76                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
77            }
78            if let Ok(dt) = NaiveDate::parse_from_str(s, "%d-%m-%Y") {
79                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
80            }
81            // With time formats
82            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%d/%m/%Y %H:%M:%S") {
83                return Ok(Utc.from_utc_datetime(&dt));
84            }
85            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%d-%m-%Y %H:%M:%S") {
86                return Ok(Utc.from_utc_datetime(&dt));
87            }
88
89            // US formats (MM/DD/YYYY) - fallback
90            // Date only formats
91            if let Ok(dt) = NaiveDate::parse_from_str(s, "%m/%d/%Y") {
92                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
93            }
94            if let Ok(dt) = NaiveDate::parse_from_str(s, "%m-%d-%Y") {
95                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
96            }
97            // With time formats
98            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%m/%d/%Y %H:%M:%S") {
99                return Ok(Utc.from_utc_datetime(&dt));
100            }
101            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%m-%d-%Y %H:%M:%S") {
102                return Ok(Utc.from_utc_datetime(&dt));
103            }
104        } else {
105            // US formats (MM/DD/YYYY) - default, try first
106            // Date only formats
107            if let Ok(dt) = NaiveDate::parse_from_str(s, "%m/%d/%Y") {
108                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
109            }
110            if let Ok(dt) = NaiveDate::parse_from_str(s, "%m-%d-%Y") {
111                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
112            }
113            // With time formats
114            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%m/%d/%Y %H:%M:%S") {
115                return Ok(Utc.from_utc_datetime(&dt));
116            }
117            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%m-%d-%Y %H:%M:%S") {
118                return Ok(Utc.from_utc_datetime(&dt));
119            }
120
121            // European formats (DD/MM/YYYY) - fallback
122            // Date only formats
123            if let Ok(dt) = NaiveDate::parse_from_str(s, "%d/%m/%Y") {
124                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
125            }
126            if let Ok(dt) = NaiveDate::parse_from_str(s, "%d-%m-%Y") {
127                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
128            }
129            // With time formats
130            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%d/%m/%Y %H:%M:%S") {
131                return Ok(Utc.from_utc_datetime(&dt));
132            }
133            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%d-%m-%Y %H:%M:%S") {
134                return Ok(Utc.from_utc_datetime(&dt));
135            }
136        }
137
138        // Excel/Windows format: DD-MMM-YYYY (e.g., 15-Jan-2024)
139        if let Ok(dt) = NaiveDate::parse_from_str(s, "%d-%b-%Y") {
140            return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
141        }
142
143        // Full month names: January 15, 2024 or 15 January 2024
144        if let Ok(dt) = NaiveDate::parse_from_str(s, "%B %d, %Y") {
145            return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
146        }
147        if let Ok(dt) = NaiveDate::parse_from_str(s, "%d %B %Y") {
148            return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
149        }
150
151        // RFC3339 (e.g., 2024-01-15T10:30:00Z)
152        if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
153            return Ok(dt.with_timezone(&Utc));
154        }
155
156        // Additional formats with time variations
157        if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%Y/%m/%d %H:%M:%S") {
158            return Ok(Utc.from_utc_datetime(&dt));
159        }
160
161        Err(anyhow!(
162            "Could not parse date/datetime: '{}'. Supported formats include: YYYY-MM-DD, MM/DD/YYYY, DD/MM/YYYY (based on config), with optional HH:MM:SS",
163            s
164        ))
165    }
166
167    /// Find a column name similar to the given name using edit distance
168    fn find_similar_column(&self, name: &str) -> Option<String> {
169        let columns = self.table.column_names();
170        let mut best_match: Option<(String, usize)> = None;
171
172        for col in columns {
173            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
174            // Only suggest if distance is small (likely a typo)
175            // Allow up to 3 edits for longer names
176            let max_distance = if name.len() > 10 { 3 } else { 2 };
177            if distance <= max_distance {
178                match &best_match {
179                    None => best_match = Some((col, distance)),
180                    Some((_, best_dist)) if distance < *best_dist => {
181                        best_match = Some((col, distance));
182                    }
183                    _ => {}
184                }
185            }
186        }
187
188        best_match.map(|(name, _)| name)
189    }
190
191    /// Calculate Levenshtein edit distance between two strings
192    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
193        let len1 = s1.len();
194        let len2 = s2.len();
195        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
196
197        for i in 0..=len1 {
198            matrix[i][0] = i;
199        }
200        for j in 0..=len2 {
201            matrix[0][j] = j;
202        }
203
204        for (i, c1) in s1.chars().enumerate() {
205            for (j, c2) in s2.chars().enumerate() {
206                let cost = if c1 == c2 { 0 } else { 1 };
207                matrix[i + 1][j + 1] = std::cmp::min(
208                    matrix[i][j + 1] + 1, // deletion
209                    std::cmp::min(
210                        matrix[i + 1][j] + 1, // insertion
211                        matrix[i][j] + cost,  // substitution
212                    ),
213                );
214            }
215        }
216
217        matrix[len1][len2]
218    }
219
220    /// Evaluate an SQL expression to produce a DataValue
221    pub fn evaluate(&self, expr: &SqlExpression, row_index: usize) -> Result<DataValue> {
222        debug!(
223            "ArithmeticEvaluator: evaluating {:?} for row {}",
224            expr, row_index
225        );
226
227        match expr {
228            SqlExpression::Column(column_name) => self.evaluate_column(column_name, row_index),
229            SqlExpression::StringLiteral(s) => Ok(DataValue::String(s.clone())),
230            SqlExpression::BooleanLiteral(b) => Ok(DataValue::Boolean(*b)),
231            SqlExpression::NumberLiteral(n) => self.evaluate_number_literal(n),
232            SqlExpression::BinaryOp { left, op, right } => {
233                self.evaluate_binary_op(left, op, right, row_index)
234            }
235            SqlExpression::FunctionCall { name, args } => {
236                self.evaluate_function(name, args, row_index)
237            }
238            SqlExpression::MethodCall {
239                object,
240                method,
241                args,
242            } => self.evaluate_method_call(object, method, args, row_index),
243            SqlExpression::ChainedMethodCall { base, method, args } => {
244                // Evaluate the base expression first, then apply the method
245                let base_value = self.evaluate(base, row_index)?;
246                self.evaluate_method_on_value(&base_value, method, args, row_index)
247            }
248            SqlExpression::CaseExpression {
249                when_branches,
250                else_branch,
251            } => self.evaluate_case_expression(when_branches, else_branch, row_index),
252            _ => Err(anyhow!(
253                "Unsupported expression type for arithmetic evaluation: {:?}",
254                expr
255            )),
256        }
257    }
258
259    /// Evaluate a column reference
260    fn evaluate_column(&self, column_name: &str, row_index: usize) -> Result<DataValue> {
261        let col_index = self.table.get_column_index(column_name).ok_or_else(|| {
262            let suggestion = self.find_similar_column(column_name);
263            match suggestion {
264                Some(similar) => anyhow!(
265                    "Column '{}' not found. Did you mean '{}'?",
266                    column_name,
267                    similar
268                ),
269                None => anyhow!("Column '{}' not found", column_name),
270            }
271        })?;
272
273        if row_index >= self.table.row_count() {
274            return Err(anyhow!("Row index {} out of bounds", row_index));
275        }
276
277        let row = self
278            .table
279            .get_row(row_index)
280            .ok_or_else(|| anyhow!("Row {} not found", row_index))?;
281
282        let value = row
283            .get(col_index)
284            .ok_or_else(|| anyhow!("Column index {} out of bounds for row", col_index))?;
285
286        Ok(value.clone())
287    }
288
289    /// Evaluate a number literal (handles both integers and floats)
290    fn evaluate_number_literal(&self, number_str: &str) -> Result<DataValue> {
291        // Try to parse as integer first
292        if let Ok(int_val) = number_str.parse::<i64>() {
293            return Ok(DataValue::Integer(int_val));
294        }
295
296        // If that fails, try as float
297        if let Ok(float_val) = number_str.parse::<f64>() {
298            return Ok(DataValue::Float(float_val));
299        }
300
301        Err(anyhow!("Invalid number literal: {}", number_str))
302    }
303
304    /// Evaluate a binary operation (arithmetic)
305    fn evaluate_binary_op(
306        &self,
307        left: &SqlExpression,
308        op: &str,
309        right: &SqlExpression,
310        row_index: usize,
311    ) -> Result<DataValue> {
312        let left_val = self.evaluate(left, row_index)?;
313        let right_val = self.evaluate(right, row_index)?;
314
315        debug!(
316            "ArithmeticEvaluator: {} {} {}",
317            self.format_value(&left_val),
318            op,
319            self.format_value(&right_val)
320        );
321
322        match op {
323            "+" => self.add_values(&left_val, &right_val),
324            "-" => self.subtract_values(&left_val, &right_val),
325            "*" => self.multiply_values(&left_val, &right_val),
326            "/" => self.divide_values(&left_val, &right_val),
327            // Comparison operators (return boolean results)
328            ">" => self.compare_values(&left_val, &right_val, |a, b| a > b),
329            "<" => self.compare_values(&left_val, &right_val, |a, b| a < b),
330            ">=" => self.compare_values(&left_val, &right_val, |a, b| a >= b),
331            "<=" => self.compare_values(&left_val, &right_val, |a, b| a <= b),
332            "=" => self.compare_values(&left_val, &right_val, |a, b| a == b),
333            "!=" | "<>" => self.compare_values(&left_val, &right_val, |a, b| a != b),
334            _ => Err(anyhow!("Unsupported arithmetic operator: {}", op)),
335        }
336    }
337
338    /// Add two DataValues with type coercion
339    fn add_values(&self, left: &DataValue, right: &DataValue) -> Result<DataValue> {
340        match (left, right) {
341            (DataValue::Integer(a), DataValue::Integer(b)) => Ok(DataValue::Integer(a + b)),
342            (DataValue::Integer(a), DataValue::Float(b)) => Ok(DataValue::Float(*a as f64 + b)),
343            (DataValue::Float(a), DataValue::Integer(b)) => Ok(DataValue::Float(a + *b as f64)),
344            (DataValue::Float(a), DataValue::Float(b)) => Ok(DataValue::Float(a + b)),
345            _ => Err(anyhow!("Cannot add {:?} and {:?}", left, right)),
346        }
347    }
348
349    /// Subtract two DataValues with type coercion
350    fn subtract_values(&self, left: &DataValue, right: &DataValue) -> Result<DataValue> {
351        match (left, right) {
352            (DataValue::Integer(a), DataValue::Integer(b)) => Ok(DataValue::Integer(a - b)),
353            (DataValue::Integer(a), DataValue::Float(b)) => Ok(DataValue::Float(*a as f64 - b)),
354            (DataValue::Float(a), DataValue::Integer(b)) => Ok(DataValue::Float(a - *b as f64)),
355            (DataValue::Float(a), DataValue::Float(b)) => Ok(DataValue::Float(a - b)),
356            _ => Err(anyhow!("Cannot subtract {:?} and {:?}", left, right)),
357        }
358    }
359
360    /// Multiply two DataValues with type coercion
361    fn multiply_values(&self, left: &DataValue, right: &DataValue) -> Result<DataValue> {
362        match (left, right) {
363            (DataValue::Integer(a), DataValue::Integer(b)) => Ok(DataValue::Integer(a * b)),
364            (DataValue::Integer(a), DataValue::Float(b)) => Ok(DataValue::Float(*a as f64 * b)),
365            (DataValue::Float(a), DataValue::Integer(b)) => Ok(DataValue::Float(a * *b as f64)),
366            (DataValue::Float(a), DataValue::Float(b)) => Ok(DataValue::Float(a * b)),
367            _ => Err(anyhow!("Cannot multiply {:?} and {:?}", left, right)),
368        }
369    }
370
371    /// Divide two DataValues with type coercion
372    fn divide_values(&self, left: &DataValue, right: &DataValue) -> Result<DataValue> {
373        // Check for division by zero first
374        let is_zero = match right {
375            DataValue::Integer(0) => true,
376            DataValue::Float(f) if *f == 0.0 => true, // Only check for exact zero, not epsilon
377            _ => false,
378        };
379
380        if is_zero {
381            return Err(anyhow!("Division by zero"));
382        }
383
384        match (left, right) {
385            (DataValue::Integer(a), DataValue::Integer(b)) => {
386                // Integer division - if result is exact, keep as int, otherwise promote to float
387                if a % b == 0 {
388                    Ok(DataValue::Integer(a / b))
389                } else {
390                    Ok(DataValue::Float(*a as f64 / *b as f64))
391                }
392            }
393            (DataValue::Integer(a), DataValue::Float(b)) => Ok(DataValue::Float(*a as f64 / b)),
394            (DataValue::Float(a), DataValue::Integer(b)) => Ok(DataValue::Float(a / *b as f64)),
395            (DataValue::Float(a), DataValue::Float(b)) => Ok(DataValue::Float(a / b)),
396            _ => Err(anyhow!("Cannot divide {:?} and {:?}", left, right)),
397        }
398    }
399
400    /// Format a DataValue for debug output
401    fn format_value(&self, value: &DataValue) -> String {
402        match value {
403            DataValue::Integer(i) => i.to_string(),
404            DataValue::Float(f) => f.to_string(),
405            DataValue::String(s) => format!("'{}'", s),
406            _ => format!("{:?}", value),
407        }
408    }
409
410    /// Compare two DataValues using the provided comparison function
411    fn compare_values<F>(&self, left: &DataValue, right: &DataValue, op: F) -> Result<DataValue>
412    where
413        F: Fn(f64, f64) -> bool,
414    {
415        debug!(
416            "ArithmeticEvaluator: comparing values {:?} and {:?}",
417            left, right
418        );
419
420        let result = match (left, right) {
421            // Integer comparisons
422            (DataValue::Integer(a), DataValue::Integer(b)) => op(*a as f64, *b as f64),
423            (DataValue::Integer(a), DataValue::Float(b)) => op(*a as f64, *b),
424            (DataValue::Float(a), DataValue::Integer(b)) => op(*a, *b as f64),
425            (DataValue::Float(a), DataValue::Float(b)) => op(*a, *b),
426
427            // String comparisons (lexicographic)
428            (DataValue::String(a), DataValue::String(b)) => {
429                let a_num = a.parse::<f64>();
430                let b_num = b.parse::<f64>();
431                match (a_num, b_num) {
432                    (Ok(a_val), Ok(b_val)) => op(a_val, b_val), // Both are numbers
433                    _ => op(a.len() as f64, b.len() as f64),    // Fallback to length comparison
434                }
435            }
436            (DataValue::InternedString(a), DataValue::InternedString(b)) => {
437                let a_num = a.parse::<f64>();
438                let b_num = b.parse::<f64>();
439                match (a_num, b_num) {
440                    (Ok(a_val), Ok(b_val)) => op(a_val, b_val), // Both are numbers
441                    _ => op(a.len() as f64, b.len() as f64),    // Fallback to length comparison
442                }
443            }
444            (DataValue::String(a), DataValue::InternedString(b)) => {
445                let a_num = a.parse::<f64>();
446                let b_num = b.parse::<f64>();
447                match (a_num, b_num) {
448                    (Ok(a_val), Ok(b_val)) => op(a_val, b_val), // Both are numbers
449                    _ => op(a.len() as f64, b.len() as f64),    // Fallback to length comparison
450                }
451            }
452            (DataValue::InternedString(a), DataValue::String(b)) => {
453                let a_num = a.parse::<f64>();
454                let b_num = b.parse::<f64>();
455                match (a_num, b_num) {
456                    (Ok(a_val), Ok(b_val)) => op(a_val, b_val), // Both are numbers
457                    _ => op(a.len() as f64, b.len() as f64),    // Fallback to length comparison
458                }
459            }
460
461            // Mixed type comparisons (try to convert to numbers)
462            (DataValue::String(a), DataValue::Integer(b)) => {
463                match a.parse::<f64>() {
464                    Ok(a_val) => op(a_val, *b as f64),
465                    Err(_) => false, // String can't be compared with number
466                }
467            }
468            (DataValue::Integer(a), DataValue::String(b)) => {
469                match b.parse::<f64>() {
470                    Ok(b_val) => op(*a as f64, b_val),
471                    Err(_) => false, // String can't be compared with number
472                }
473            }
474            (DataValue::String(a), DataValue::Float(b)) => match a.parse::<f64>() {
475                Ok(a_val) => op(a_val, *b),
476                Err(_) => false,
477            },
478            (DataValue::Float(a), DataValue::String(b)) => match b.parse::<f64>() {
479                Ok(b_val) => op(*a, b_val),
480                Err(_) => false,
481            },
482
483            // NULL comparisons
484            (DataValue::Null, _) | (_, DataValue::Null) => false,
485
486            // Boolean comparisons
487            (DataValue::Boolean(a), DataValue::Boolean(b)) => {
488                op(if *a { 1.0 } else { 0.0 }, if *b { 1.0 } else { 0.0 })
489            }
490
491            _ => {
492                debug!(
493                    "ArithmeticEvaluator: unsupported comparison between {:?} and {:?}",
494                    left, right
495                );
496                false
497            }
498        };
499
500        debug!("ArithmeticEvaluator: comparison result: {}", result);
501        Ok(DataValue::Boolean(result))
502    }
503
504    /// Evaluate a function call
505    fn evaluate_function(
506        &self,
507        name: &str,
508        args: &[SqlExpression],
509        row_index: usize,
510    ) -> Result<DataValue> {
511        // Check if this is an aggregate function
512        let name_upper = name.to_uppercase();
513
514        // Handle COUNT(*) special case
515        if name_upper == "COUNT" && args.len() == 1 {
516            match &args[0] {
517                SqlExpression::Column(col) if col == "*" => {
518                    // COUNT(*) - count all rows (or visible rows if filtered)
519                    let count = if let Some(ref visible) = self.visible_rows {
520                        visible.len() as i64
521                    } else {
522                        self.table.rows.len() as i64
523                    };
524                    return Ok(DataValue::Integer(count));
525                }
526                SqlExpression::StringLiteral(s) if s == "*" => {
527                    // COUNT(*) parsed as StringLiteral
528                    let count = if let Some(ref visible) = self.visible_rows {
529                        visible.len() as i64
530                    } else {
531                        self.table.rows.len() as i64
532                    };
533                    return Ok(DataValue::Integer(count));
534                }
535                _ => {
536                    // COUNT(column) - will be handled below
537                }
538            }
539        }
540
541        // Check aggregate registry
542        if let Some(agg_func) = self.aggregate_registry.get(&name_upper) {
543            // For aggregate functions, we need to evaluate over all rows (or visible rows)
544            let mut state = agg_func.init();
545
546            // Determine which rows to process
547            let rows_to_process: Vec<usize> = if let Some(ref visible) = self.visible_rows {
548                visible.clone()
549            } else {
550                (0..self.table.rows.len()).collect()
551            };
552
553            // If no arguments (like COUNT(*)), process all rows
554            if args.is_empty()
555                || (args.len() == 1 && matches!(&args[0], SqlExpression::Column(c) if c == "*"))
556            {
557                for _ in &rows_to_process {
558                    agg_func.accumulate(&mut state, &DataValue::Integer(1))?;
559                }
560            } else {
561                // Evaluate the argument expression for each row
562                for &row_idx in &rows_to_process {
563                    let value = self.evaluate(&args[0], row_idx)?;
564                    agg_func.accumulate(&mut state, &value)?;
565                }
566            }
567
568            return Ok(agg_func.finalize(state));
569        }
570
571        // First check if this function exists in the registry
572        if let Some(func) = self.function_registry.get(name) {
573            // Evaluate all arguments first
574            let mut evaluated_args = Vec::new();
575            for arg in args {
576                evaluated_args.push(self.evaluate(arg, row_index)?);
577            }
578
579            // Call the function from the registry
580            return func.evaluate(&evaluated_args);
581        }
582
583        // If not in registry, check built-in functions that need special handling
584        // (functions that need access to table data or row context)
585        // Convert function name to uppercase for case-insensitive matching
586        match name_upper.as_str() {
587            "ROUND" => {
588                if args.is_empty() || args.len() > 2 {
589                    return Err(anyhow!("ROUND requires 1 or 2 arguments"));
590                }
591
592                // Evaluate the value to round
593                let value = self.evaluate(&args[0], row_index)?;
594
595                // Get decimal places (default to 0 if not specified)
596                let decimals = if args.len() == 2 {
597                    match self.evaluate(&args[1], row_index)? {
598                        DataValue::Integer(n) => n as i32,
599                        DataValue::Float(f) => f as i32,
600                        _ => return Err(anyhow!("ROUND precision must be a number")),
601                    }
602                } else {
603                    0
604                };
605
606                // Perform rounding
607                match value {
608                    DataValue::Integer(n) => Ok(DataValue::Integer(n)), // Already an integer
609                    DataValue::Float(f) => {
610                        if decimals >= 0 {
611                            let multiplier = 10_f64.powi(decimals);
612                            let rounded = (f * multiplier).round() / multiplier;
613                            if decimals == 0 {
614                                // Return as integer if rounding to 0 decimals
615                                Ok(DataValue::Integer(rounded as i64))
616                            } else {
617                                Ok(DataValue::Float(rounded))
618                            }
619                        } else {
620                            // Negative decimals round to left of decimal point
621                            let divisor = 10_f64.powi(-decimals);
622                            let rounded = (f / divisor).round() * divisor;
623                            Ok(DataValue::Float(rounded))
624                        }
625                    }
626                    _ => Err(anyhow!("ROUND can only be applied to numeric values")),
627                }
628            }
629            "ABS" => {
630                if args.len() != 1 {
631                    return Err(anyhow!("ABS requires exactly 1 argument"));
632                }
633
634                let value = self.evaluate(&args[0], row_index)?;
635                match value {
636                    DataValue::Integer(n) => Ok(DataValue::Integer(n.abs())),
637                    DataValue::Float(f) => Ok(DataValue::Float(f.abs())),
638                    _ => Err(anyhow!("ABS can only be applied to numeric values")),
639                }
640            }
641            "FLOOR" => {
642                if args.len() != 1 {
643                    return Err(anyhow!("FLOOR requires exactly 1 argument"));
644                }
645
646                let value = self.evaluate(&args[0], row_index)?;
647                match value {
648                    DataValue::Integer(n) => Ok(DataValue::Integer(n)),
649                    DataValue::Float(f) => Ok(DataValue::Integer(f.floor() as i64)),
650                    _ => Err(anyhow!("FLOOR can only be applied to numeric values")),
651                }
652            }
653            "CEILING" | "CEIL" => {
654                if args.len() != 1 {
655                    return Err(anyhow!("CEILING requires exactly 1 argument"));
656                }
657
658                let value = self.evaluate(&args[0], row_index)?;
659                match value {
660                    DataValue::Integer(n) => Ok(DataValue::Integer(n)),
661                    DataValue::Float(f) => Ok(DataValue::Integer(f.ceil() as i64)),
662                    _ => Err(anyhow!("CEILING can only be applied to numeric values")),
663                }
664            }
665            "CONVERT" => {
666                if args.len() != 3 {
667                    return Err(anyhow!(
668                        "CONVERT requires exactly 3 arguments: value, from_unit, to_unit"
669                    ));
670                }
671
672                // Evaluate the value
673                let value = self.evaluate(&args[0], row_index)?;
674                let numeric_value = match value {
675                    DataValue::Integer(n) => n as f64,
676                    DataValue::Float(f) => f,
677                    _ => return Err(anyhow!("CONVERT first argument must be numeric")),
678                };
679
680                // Get unit strings
681                let from_unit = match self.evaluate(&args[1], row_index)? {
682                    DataValue::String(s) => s,
683                    DataValue::InternedString(s) => s.to_string(),
684                    _ => {
685                        return Err(anyhow!(
686                            "CONVERT second argument must be a string (from_unit)"
687                        ))
688                    }
689                };
690
691                let to_unit = match self.evaluate(&args[2], row_index)? {
692                    DataValue::String(s) => s,
693                    DataValue::InternedString(s) => s.to_string(),
694                    _ => return Err(anyhow!("CONVERT third argument must be a string (to_unit)")),
695                };
696
697                // Perform conversion
698                match crate::data::unit_converter::convert_units(
699                    numeric_value,
700                    &from_unit,
701                    &to_unit,
702                ) {
703                    Ok(result) => Ok(DataValue::Float(result)),
704                    Err(e) => Err(anyhow!("Unit conversion error: {}", e)),
705                }
706            }
707            "MOD" => {
708                if args.len() != 2 {
709                    return Err(anyhow!("MOD requires exactly 2 arguments"));
710                }
711
712                let dividend = self.evaluate(&args[0], row_index)?;
713                let divisor = self.evaluate(&args[1], row_index)?;
714
715                match (&dividend, &divisor) {
716                    (DataValue::Integer(n), DataValue::Integer(d)) => {
717                        if *d == 0 {
718                            return Err(anyhow!("Division by zero in MOD"));
719                        }
720                        Ok(DataValue::Integer(n % d))
721                    }
722                    _ => {
723                        // Convert to float for mixed types
724                        let n = match dividend {
725                            DataValue::Integer(i) => i as f64,
726                            DataValue::Float(f) => f,
727                            _ => return Err(anyhow!("MOD requires numeric arguments")),
728                        };
729                        let d = match divisor {
730                            DataValue::Integer(i) => i as f64,
731                            DataValue::Float(f) => f,
732                            _ => return Err(anyhow!("MOD requires numeric arguments")),
733                        };
734                        if d == 0.0 {
735                            return Err(anyhow!("Division by zero in MOD"));
736                        }
737                        Ok(DataValue::Float(n % d))
738                    }
739                }
740            }
741            "QUOTIENT" => {
742                if args.len() != 2 {
743                    return Err(anyhow!("QUOTIENT requires exactly 2 arguments"));
744                }
745
746                let numerator = self.evaluate(&args[0], row_index)?;
747                let denominator = self.evaluate(&args[1], row_index)?;
748
749                match (&numerator, &denominator) {
750                    (DataValue::Integer(n), DataValue::Integer(d)) => {
751                        if *d == 0 {
752                            return Err(anyhow!("Division by zero in QUOTIENT"));
753                        }
754                        Ok(DataValue::Integer(n / d))
755                    }
756                    _ => {
757                        // Convert to float for mixed types
758                        let n = match numerator {
759                            DataValue::Integer(i) => i as f64,
760                            DataValue::Float(f) => f,
761                            _ => return Err(anyhow!("QUOTIENT requires numeric arguments")),
762                        };
763                        let d = match denominator {
764                            DataValue::Integer(i) => i as f64,
765                            DataValue::Float(f) => f,
766                            _ => return Err(anyhow!("QUOTIENT requires numeric arguments")),
767                        };
768                        if d == 0.0 {
769                            return Err(anyhow!("Division by zero in QUOTIENT"));
770                        }
771                        Ok(DataValue::Integer((n / d).trunc() as i64))
772                    }
773                }
774            }
775            "POWER" | "POW" => {
776                if args.len() != 2 {
777                    return Err(anyhow!("POWER requires exactly 2 arguments"));
778                }
779
780                let base = self.evaluate(&args[0], row_index)?;
781                let exponent = self.evaluate(&args[1], row_index)?;
782
783                match (&base, &exponent) {
784                    (DataValue::Integer(b), DataValue::Integer(e)) => {
785                        if *e >= 0 && *e <= i32::MAX as i64 {
786                            Ok(DataValue::Float((*b as f64).powi(*e as i32)))
787                        } else {
788                            Ok(DataValue::Float((*b as f64).powf(*e as f64)))
789                        }
790                    }
791                    _ => {
792                        // Convert to float for mixed types or floats
793                        let b = match base {
794                            DataValue::Integer(i) => i as f64,
795                            DataValue::Float(f) => f,
796                            _ => return Err(anyhow!("POWER requires numeric arguments")),
797                        };
798                        let e = match exponent {
799                            DataValue::Integer(i) => i as f64,
800                            DataValue::Float(f) => f,
801                            _ => return Err(anyhow!("POWER requires numeric arguments")),
802                        };
803                        Ok(DataValue::Float(b.powf(e)))
804                    }
805                }
806            }
807            "SQRT" => {
808                if args.len() != 1 {
809                    return Err(anyhow!("SQRT requires exactly 1 argument"));
810                }
811
812                let value = self.evaluate(&args[0], row_index)?;
813                match value {
814                    DataValue::Integer(n) => {
815                        if n < 0 {
816                            return Err(anyhow!("SQRT of negative number"));
817                        }
818                        Ok(DataValue::Float((n as f64).sqrt()))
819                    }
820                    DataValue::Float(f) => {
821                        if f < 0.0 {
822                            return Err(anyhow!("SQRT of negative number"));
823                        }
824                        Ok(DataValue::Float(f.sqrt()))
825                    }
826                    _ => Err(anyhow!("SQRT can only be applied to numeric values")),
827                }
828            }
829            "EXP" => {
830                if args.len() != 1 {
831                    return Err(anyhow!("EXP requires exactly 1 argument"));
832                }
833
834                let value = self.evaluate(&args[0], row_index)?;
835                match value {
836                    DataValue::Integer(n) => Ok(DataValue::Float((n as f64).exp())),
837                    DataValue::Float(f) => Ok(DataValue::Float(f.exp())),
838                    _ => Err(anyhow!("EXP can only be applied to numeric values")),
839                }
840            }
841            "LN" => {
842                if args.len() != 1 {
843                    return Err(anyhow!("LN requires exactly 1 argument"));
844                }
845
846                let value = self.evaluate(&args[0], row_index)?;
847                match value {
848                    DataValue::Integer(n) => {
849                        if n <= 0 {
850                            return Err(anyhow!("LN of non-positive number"));
851                        }
852                        Ok(DataValue::Float((n as f64).ln()))
853                    }
854                    DataValue::Float(f) => {
855                        if f <= 0.0 {
856                            return Err(anyhow!("LN of non-positive number"));
857                        }
858                        Ok(DataValue::Float(f.ln()))
859                    }
860                    _ => Err(anyhow!("LN can only be applied to numeric values")),
861                }
862            }
863            "LOG" | "LOG10" => {
864                if name == "LOG" && args.len() == 2 {
865                    // LOG with custom base
866                    let value = self.evaluate(&args[0], row_index)?;
867                    let base = self.evaluate(&args[1], row_index)?;
868
869                    let n = match value {
870                        DataValue::Integer(i) => i as f64,
871                        DataValue::Float(f) => f,
872                        _ => return Err(anyhow!("LOG requires numeric arguments")),
873                    };
874                    let b = match base {
875                        DataValue::Integer(i) => i as f64,
876                        DataValue::Float(f) => f,
877                        _ => return Err(anyhow!("LOG requires numeric arguments")),
878                    };
879
880                    if n <= 0.0 {
881                        return Err(anyhow!("LOG of non-positive number"));
882                    }
883                    if b <= 0.0 || b == 1.0 {
884                        return Err(anyhow!("Invalid LOG base"));
885                    }
886                    Ok(DataValue::Float(n.log(b)))
887                } else if (name == "LOG" && args.len() == 1) || name == "LOG10" {
888                    // LOG10 or LOG with default base 10
889                    if args.len() != 1 {
890                        return Err(anyhow!("{} requires exactly 1 argument", name));
891                    }
892
893                    let value = self.evaluate(&args[0], row_index)?;
894                    match value {
895                        DataValue::Integer(n) => {
896                            if n <= 0 {
897                                return Err(anyhow!("LOG10 of non-positive number"));
898                            }
899                            Ok(DataValue::Float((n as f64).log10()))
900                        }
901                        DataValue::Float(f) => {
902                            if f <= 0.0 {
903                                return Err(anyhow!("LOG10 of non-positive number"));
904                            }
905                            Ok(DataValue::Float(f.log10()))
906                        }
907                        _ => Err(anyhow!("LOG10 can only be applied to numeric values")),
908                    }
909                } else {
910                    Err(anyhow!("LOG requires 1 or 2 arguments"))
911                }
912            }
913            "DATEDIFF" => {
914                if args.len() != 3 {
915                    return Err(anyhow!(
916                        "DATEDIFF requires exactly 3 arguments: unit, date1, date2"
917                    ));
918                }
919
920                // First argument: unit (day, month, year, hour, minute, second)
921                let unit = match self.evaluate(&args[0], row_index)? {
922                    DataValue::String(s) => s.to_lowercase(),
923                    DataValue::InternedString(s) => s.to_lowercase(),
924                    _ => return Err(anyhow!("DATEDIFF unit must be a string")),
925                };
926
927                // Parse both dates using the central function
928                let date1_val = self.evaluate(&args[1], row_index)?;
929                let date1 = match date1_val {
930                    DataValue::String(ref s) | DataValue::DateTime(ref s) => {
931                        self.parse_datetime_with_notation(s)?
932                    }
933                    DataValue::InternedString(ref s) => {
934                        self.parse_datetime_with_notation(s.as_str())?
935                    }
936                    _ => return Err(anyhow!("DATEDIFF requires date/datetime values")),
937                };
938
939                let date2_val = self.evaluate(&args[2], row_index)?;
940                let date2 = match date2_val {
941                    DataValue::String(ref s) | DataValue::DateTime(ref s) => {
942                        self.parse_datetime_with_notation(s)?
943                    }
944                    DataValue::InternedString(ref s) => {
945                        self.parse_datetime_with_notation(s.as_str())?
946                    }
947                    _ => return Err(anyhow!("DATEDIFF requires date/datetime values")),
948                };
949
950                // Calculate difference based on unit
951                let diff = match unit.as_str() {
952                    "day" | "days" => {
953                        let duration = date2.signed_duration_since(date1);
954                        duration.num_days()
955                    }
956                    "month" | "months" => {
957                        // Approximate months as 30 days; integer division truncates toward zero
958                        let duration = date2.signed_duration_since(date1);
959                        duration.num_days() / 30
960                    }
961                    "year" | "years" => {
962                        // Approximate years as 365 days (leap days ignored); integer division truncates toward zero
963                        let duration = date2.signed_duration_since(date1);
964                        duration.num_days() / 365
965                    }
966                    "hour" | "hours" => {
967                        let duration = date2.signed_duration_since(date1);
968                        duration.num_hours()
969                    }
970                    "minute" | "minutes" => {
971                        let duration = date2.signed_duration_since(date1);
972                        duration.num_minutes()
973                    }
974                    "second" | "seconds" => {
975                        let duration = date2.signed_duration_since(date1);
976                        duration.num_seconds()
977                    }
978                    _ => {
979                        return Err(anyhow!(
980                        "Unknown DATEDIFF unit: {}. Use: day, month, year, hour, minute, second",
981                        unit
982                    ))
983                    }
984                };
985
986                Ok(DataValue::Integer(diff))
987            }
988            "NOW" => {
989                if !args.is_empty() {
990                    return Err(anyhow!("NOW takes no arguments"));
991                }
992                let now = Utc::now();
993                Ok(DataValue::DateTime(
994                    now.format("%Y-%m-%d %H:%M:%S").to_string(),
995                ))
996            }
997            "TODAY" => {
998                if !args.is_empty() {
999                    return Err(anyhow!("TODAY takes no arguments"));
1000                }
1001                let today = Utc::now().date_naive();
1002                Ok(DataValue::String(today.format("%Y-%m-%d").to_string()))
1003            }
1004            "DATEADD" => {
1005                if args.len() != 3 {
1006                    return Err(anyhow!(
1007                        "DATEADD requires exactly 3 arguments: unit, number, date"
1008                    ));
1009                }
1010
1011                // First argument: unit (day, month, year, hour, minute, second)
1012                let unit = match self.evaluate(&args[0], row_index)? {
1013                    DataValue::String(s) => s.to_lowercase(),
1014                    DataValue::InternedString(s) => s.to_lowercase(),
1015                    _ => return Err(anyhow!("DATEADD unit must be a string")),
1016                };
1017
1018                // Second argument: number to add (can be negative for subtraction)
1019                let amount = match self.evaluate(&args[1], row_index)? {
1020                    DataValue::Integer(i) => i,
1021                    DataValue::Float(f) => f as i64,
1022                    _ => return Err(anyhow!("DATEADD amount must be a number")),
1023                };
1024
1025                // Third argument: base date - parse using the central function
1026                let base_date_value = self.evaluate(&args[2], row_index)?;
1027                let base_date = match base_date_value {
1028                    DataValue::String(ref s) | DataValue::DateTime(ref s) => {
1029                        self.parse_datetime_with_notation(s)?
1030                    }
1031                    DataValue::InternedString(ref s) => {
1032                        self.parse_datetime_with_notation(s.as_str())?
1033                    }
1034                    _ => return Err(anyhow!("DATEADD requires date/datetime values")),
1035                };
1036
1037                // Add the specified amount based on unit
1038                let result_date = match unit.as_str() {
1039                    "day" | "days" => base_date + chrono::Duration::days(amount),
1040                    "month" | "months" => {
1041                        // For months, we need to be careful about month boundaries
1042                        let mut year = base_date.year();
1043                        let mut month = base_date.month() as i32;
1044                        let day = base_date.day();
1045
1046                        month += amount as i32;
1047
1048                        // Handle month overflow/underflow
1049                        while month > 12 {
1050                            month -= 12;
1051                            year += 1;
1052                        }
1053                        while month < 1 {
1054                            month += 12;
1055                            year -= 1;
1056                        }
1057
1058                        // Create new date, handling day overflow (e.g., Jan 31 + 1 month = Feb 28/29)
1059                        let target_date = NaiveDate::from_ymd_opt(year, month as u32, day)
1060                            .unwrap_or_else(|| {
1061                                // If day doesn't exist in target month, use the last day of that month
1062                                // Try decreasing days until we find a valid one
1063                                for test_day in (1..=day).rev() {
1064                                    if let Some(date) =
1065                                        NaiveDate::from_ymd_opt(year, month as u32, test_day)
1066                                    {
1067                                        return date;
1068                                    }
1069                                }
1070                                // This should never happen, but fallback to day 28 as safety
1071                                NaiveDate::from_ymd_opt(year, month as u32, 28).unwrap()
1072                            });
1073
1074                        Utc.from_utc_datetime(&target_date.and_time(base_date.time()))
1075                    }
1076                    "year" | "years" => {
1077                        let new_year = base_date.year() + amount as i32;
1078                        let target_date =
1079                            NaiveDate::from_ymd_opt(new_year, base_date.month(), base_date.day())
1080                                .unwrap_or_else(|| {
1081                                    // Handle Feb 29 in non-leap years
1082                                    NaiveDate::from_ymd_opt(new_year, base_date.month(), 28)
1083                                        .unwrap()
1084                                });
1085                        Utc.from_utc_datetime(&target_date.and_time(base_date.time()))
1086                    }
1087                    "hour" | "hours" => base_date + chrono::Duration::hours(amount),
1088                    "minute" | "minutes" => base_date + chrono::Duration::minutes(amount),
1089                    "second" | "seconds" => base_date + chrono::Duration::seconds(amount),
1090                    _ => {
1091                        return Err(anyhow!(
1092                            "Unknown DATEADD unit: {}. Use: day, month, year, hour, minute, second",
1093                            unit
1094                        ))
1095                    }
1096                };
1097
1098                // Return as datetime string
1099                Ok(DataValue::DateTime(
1100                    result_date.format("%Y-%m-%d %H:%M:%S").to_string(),
1101                ))
1102            }
1103            "TEXTJOIN" => {
1104                if args.len() < 3 {
1105                    return Err(anyhow!("TEXTJOIN requires at least 3 arguments: delimiter, ignore_empty, text1, [text2, ...]"));
1106                }
1107
1108                // First argument: delimiter
1109                let delimiter = match self.evaluate(&args[0], row_index)? {
1110                    DataValue::String(s) => s,
1111                    DataValue::InternedString(s) => s.to_string(),
1112                    DataValue::Integer(n) => n.to_string(),
1113                    DataValue::Float(f) => f.to_string(),
1114                    DataValue::Boolean(b) => b.to_string(),
1115                    DataValue::Null => String::new(),
1116                    _ => String::new(),
1117                };
1118
1119                // Second argument: ignore_empty, coerced to boolean: 0, 0.0, NULL, "", "0" and "false" (any case) are false; anything else is true
1120                let ignore_empty = match self.evaluate(&args[1], row_index)? {
1121                    DataValue::Integer(n) => n != 0,
1122                    DataValue::Float(f) => f != 0.0,
1123                    DataValue::Boolean(b) => b,
1124                    DataValue::String(s) => {
1125                        !s.is_empty() && s != "0" && s.to_lowercase() != "false"
1126                    }
1127                    DataValue::InternedString(s) => {
1128                        !s.is_empty() && s.as_str() != "0" && s.to_lowercase() != "false"
1129                    }
1130                    DataValue::Null => false,
1131                    _ => true,
1132                };
1133
1134                // Remaining arguments: values to join
1135                let mut values = Vec::new();
1136                for i in 2..args.len() {
1137                    let value = self.evaluate(&args[i], row_index)?;
1138                    let string_value = match value {
1139                        DataValue::String(s) => Some(s),
1140                        DataValue::InternedString(s) => Some(s.to_string()),
1141                        DataValue::Integer(n) => Some(n.to_string()),
1142                        DataValue::Float(f) => Some(f.to_string()),
1143                        DataValue::Boolean(b) => Some(b.to_string()),
1144                        DataValue::DateTime(dt) => Some(dt),
1145                        DataValue::Null => {
1146                            if ignore_empty {
1147                                None
1148                            } else {
1149                                Some(String::new())
1150                            }
1151                        }
1152                        _ => {
1153                            if ignore_empty {
1154                                None
1155                            } else {
1156                                Some(String::new())
1157                            }
1158                        }
1159                    };
1160
1161                    if let Some(s) = string_value {
1162                        if !ignore_empty || !s.is_empty() {
1163                            values.push(s);
1164                        }
1165                    }
1166                }
1167
1168                Ok(DataValue::String(values.join(&delimiter)))
1169            }
1170            "EULER" => {
1171                // Euler's number constant - no arguments
1172                if !args.is_empty() {
1173                    return Err(anyhow!("EULER() takes no arguments"));
1174                }
1175                Ok(DataValue::Float(std::f64::consts::E))
1176            }
1177            "TAU" => {
1178                // Tau constant (2*PI) - no arguments
1179                if !args.is_empty() {
1180                    return Err(anyhow!("TAU() takes no arguments"));
1181                }
1182                Ok(DataValue::Float(std::f64::consts::TAU))
1183            }
1184            "PHI" => {
1185                // Golden ratio constant - no arguments
1186                if !args.is_empty() {
1187                    return Err(anyhow!("PHI() takes no arguments"));
1188                }
1189                Ok(DataValue::Float(1.618033988749895))
1190            }
1191            "SQRT2" => {
1192                // Square root of 2 constant - no arguments
1193                if !args.is_empty() {
1194                    return Err(anyhow!("SQRT2() takes no arguments"));
1195                }
1196                Ok(DataValue::Float(std::f64::consts::SQRT_2))
1197            }
1198            "LN2" => {
1199                // Natural logarithm of 2 - no arguments
1200                if !args.is_empty() {
1201                    return Err(anyhow!("LN2() takes no arguments"));
1202                }
1203                Ok(DataValue::Float(std::f64::consts::LN_2))
1204            }
1205            "LN10" => {
1206                // Natural logarithm of 10 - no arguments
1207                if !args.is_empty() {
1208                    return Err(anyhow!("LN10() takes no arguments"));
1209                }
1210                Ok(DataValue::Float(std::f64::consts::LN_10))
1211            }
1212            // Physics Constants - Fundamental Constants
1213            "C" | "SPEED_OF_LIGHT" => {
1214                // Speed of light in vacuum (m/s)
1215                if !args.is_empty() {
1216                    return Err(anyhow!("C() takes no arguments"));
1217                }
1218                Ok(DataValue::Float(299792458.0))
1219            }
1220            "G" | "GRAVITATIONAL_CONSTANT" => {
1221                // Gravitational constant (m^3 kg^-1 s^-2)
1222                if !args.is_empty() {
1223                    return Err(anyhow!("G() takes no arguments"));
1224                }
1225                Ok(DataValue::Float(6.67430e-11))
1226            }
1227            "H" | "PLANCK" => {
1228                // Planck constant (J⋅s)
1229                if !args.is_empty() {
1230                    return Err(anyhow!("PLANCK() takes no arguments"));
1231                }
1232                Ok(DataValue::Float(6.62607015e-34))
1233            }
1234            "HBAR" => {
1235                // Reduced Planck constant ℏ = h/(2π) (J⋅s)
1236                if !args.is_empty() {
1237                    return Err(anyhow!("HBAR() takes no arguments"));
1238                }
1239                Ok(DataValue::Float(1.054571817e-34))
1240            }
1241            "K" | "BOLTZMANN" => {
1242                // Boltzmann constant (J/K)
1243                if !args.is_empty() {
1244                    return Err(anyhow!("BOLTZMANN() takes no arguments"));
1245                }
1246                Ok(DataValue::Float(1.380649e-23))
1247            }
1248            "NA" | "AVOGADRO" => {
1249                // Avogadro's number (mol^-1)
1250                if !args.is_empty() {
1251                    return Err(anyhow!("AVOGADRO() takes no arguments"));
1252                }
1253                Ok(DataValue::Float(6.02214076e23))
1254            }
1255            "R" | "GAS_CONSTANT" => {
1256                // Universal gas constant (J mol^-1 K^-1)
1257                if !args.is_empty() {
1258                    return Err(anyhow!("R() takes no arguments"));
1259                }
1260                Ok(DataValue::Float(8.314462618))
1261            }
1262            // Physics Constants - Electromagnetic
1263            "E0" | "EPSILON0" | "PERMITTIVITY" => {
1264                // Electric permittivity of vacuum (F/m)
1265                if !args.is_empty() {
1266                    return Err(anyhow!("E0() takes no arguments"));
1267                }
1268                Ok(DataValue::Float(8.8541878128e-12))
1269            }
1270            "MU0" | "PERMEABILITY" => {
1271                // Magnetic permeability of vacuum (N/A^2)
1272                if !args.is_empty() {
1273                    return Err(anyhow!("MU0() takes no arguments"));
1274                }
1275                Ok(DataValue::Float(1.25663706212e-6))
1276            }
1277            "QE" | "ELEMENTARY_CHARGE" => {
1278                // Elementary charge (C)
1279                if !args.is_empty() {
1280                    return Err(anyhow!("QE() takes no arguments"));
1281                }
1282                Ok(DataValue::Float(1.602176634e-19))
1283            }
1284            "MP" | "MASS_PROTON" => {
1285                // Proton mass (kg)
1286                if !args.is_empty() {
1287                    return Err(anyhow!("MP() takes no arguments"));
1288                }
1289                Ok(DataValue::Float(1.67262192369e-27))
1290            }
1291            "MN" | "MASS_NEUTRON" => {
1292                // Neutron mass (kg)
1293                if !args.is_empty() {
1294                    return Err(anyhow!("MN() takes no arguments"));
1295                }
1296                Ok(DataValue::Float(1.67492749804e-27))
1297            }
1298            "AMU" | "ATOMIC_MASS_UNIT" => {
1299                // Atomic mass unit (kg)
1300                if !args.is_empty() {
1301                    return Err(anyhow!("AMU() takes no arguments"));
1302                }
1303                Ok(DataValue::Float(1.66053906660e-27))
1304            }
1305            // Physics Constants - Other
1306            "ALPHA" | "FINE_STRUCTURE" => {
1307                // Fine structure constant (dimensionless)
1308                if !args.is_empty() {
1309                    return Err(anyhow!("ALPHA() takes no arguments"));
1310                }
1311                Ok(DataValue::Float(7.2973525693e-3))
1312            }
1313            "RY" | "RYDBERG" => {
1314                // Rydberg constant (m^-1)
1315                if !args.is_empty() {
1316                    return Err(anyhow!("RYDBERG() takes no arguments"));
1317                }
1318                Ok(DataValue::Float(10973731.568160))
1319            }
1320            "SIGMA" | "STEFAN_BOLTZMANN" => {
1321                // Stefan-Boltzmann constant (W m^-2 K^-4)
1322                if !args.is_empty() {
1323                    return Err(anyhow!("SIGMA() takes no arguments"));
1324                }
1325                Ok(DataValue::Float(5.670374419e-8))
1326            }
1327            // Particle Radii Constants
1328            "RE" | "RADIUS_ELECTRON" => {
1329                // Classical electron radius (m)
1330                if !args.is_empty() {
1331                    return Err(anyhow!("RE() takes no arguments"));
1332                }
1333                Ok(DataValue::Float(2.8179403262e-15))
1334            }
1335            "RP" | "RADIUS_PROTON" => {
1336                // Proton radius (m) - CODATA 2018 value
1337                if !args.is_empty() {
1338                    return Err(anyhow!("RP() takes no arguments"));
1339                }
1340                Ok(DataValue::Float(8.414e-16))
1341            }
1342            "RN" | "RADIUS_NEUTRON" => {
1343                // Neutron radius (m) - approximate
1344                if !args.is_empty() {
1345                    return Err(anyhow!("RN() takes no arguments"));
1346                }
1347                Ok(DataValue::Float(8.4e-16))
1348            }
1349            // Astronomical Constants - Solar System
1350            "MASS_SUN" | "MSUN" => {
1351                // Solar mass (kg)
1352                if !args.is_empty() {
1353                    return Err(anyhow!("MASS_SUN() takes no arguments"));
1354                }
1355                Ok(DataValue::Float(1.98892e30))
1356            }
1357            "RADIUS_SUN" | "RSUN" => {
1358                // Solar radius (m)
1359                if !args.is_empty() {
1360                    return Err(anyhow!("RADIUS_SUN() takes no arguments"));
1361                }
1362                Ok(DataValue::Float(6.96342e8))
1363            }
1364            "RADIUS_EARTH" | "REARTH" => {
1365                // Earth mean radius (m)
1366                if !args.is_empty() {
1367                    return Err(anyhow!("RADIUS_EARTH() takes no arguments"));
1368                }
1369                Ok(DataValue::Float(6.371e6))
1370            }
1371            "MASS_MOON" | "MMOON" => {
1372                // Moon mass (kg)
1373                if !args.is_empty() {
1374                    return Err(anyhow!("MASS_MOON() takes no arguments"));
1375                }
1376                Ok(DataValue::Float(7.342e22))
1377            }
1378            "RADIUS_MOON" | "RMOON" => {
1379                // Moon mean radius (m)
1380                if !args.is_empty() {
1381                    return Err(anyhow!("RADIUS_MOON() takes no arguments"));
1382                }
1383                Ok(DataValue::Float(1.7374e6))
1384            }
1385            // Planets
1386            "MASS_MERCURY" => {
1387                // Mercury mass (kg)
1388                if !args.is_empty() {
1389                    return Err(anyhow!("MASS_MERCURY() takes no arguments"));
1390                }
1391                Ok(DataValue::Float(3.3011e23))
1392            }
1393            "MASS_VENUS" => {
1394                // Venus mass (kg)
1395                if !args.is_empty() {
1396                    return Err(anyhow!("MASS_VENUS() takes no arguments"));
1397                }
1398                Ok(DataValue::Float(4.8675e24))
1399            }
1400            "MASS_MARS" => {
1401                // Mars mass (kg)
1402                if !args.is_empty() {
1403                    return Err(anyhow!("MASS_MARS() takes no arguments"));
1404                }
1405                Ok(DataValue::Float(6.4171e23))
1406            }
1407            "MASS_JUPITER" => {
1408                // Jupiter mass (kg)
1409                if !args.is_empty() {
1410                    return Err(anyhow!("MASS_JUPITER() takes no arguments"));
1411                }
1412                Ok(DataValue::Float(1.8982e27))
1413            }
1414            "MASS_SATURN" => {
1415                // Saturn mass (kg)
1416                if !args.is_empty() {
1417                    return Err(anyhow!("MASS_SATURN() takes no arguments"));
1418                }
1419                Ok(DataValue::Float(5.6834e26))
1420            }
1421            "MASS_URANUS" => {
1422                // Uranus mass (kg)
1423                if !args.is_empty() {
1424                    return Err(anyhow!("MASS_URANUS() takes no arguments"));
1425                }
1426                Ok(DataValue::Float(8.6810e25))
1427            }
1428            "MASS_NEPTUNE" => {
1429                // Neptune mass (kg)
1430                if !args.is_empty() {
1431                    return Err(anyhow!("MASS_NEPTUNE() takes no arguments"));
1432                }
1433                Ok(DataValue::Float(1.02413e26))
1434            }
1435            // Orbital distances (mean distance from Sun in meters)
1436            "DIST_MERCURY" | "AU_MERCURY" => {
1437                // Mercury mean distance from Sun (m)
1438                if !args.is_empty() {
1439                    return Err(anyhow!("DIST_MERCURY() takes no arguments"));
1440                }
1441                Ok(DataValue::Float(5.791e10))
1442            }
1443            "DIST_VENUS" | "AU_VENUS" => {
1444                // Venus mean distance from Sun (m)
1445                if !args.is_empty() {
1446                    return Err(anyhow!("DIST_VENUS() takes no arguments"));
1447                }
1448                Ok(DataValue::Float(1.082e11))
1449            }
1450            "DIST_EARTH" | "AU_EARTH" | "AU" => {
1451                // Earth mean distance from Sun = 1 AU (m)
1452                if !args.is_empty() {
1453                    return Err(anyhow!("AU() takes no arguments"));
1454                }
1455                Ok(DataValue::Float(1.495978707e11))
1456            }
1457            "DIST_MARS" | "AU_MARS" => {
1458                // Mars mean distance from Sun (m)
1459                if !args.is_empty() {
1460                    return Err(anyhow!("DIST_MARS() takes no arguments"));
1461                }
1462                Ok(DataValue::Float(2.279e11))
1463            }
1464            "DIST_JUPITER" | "AU_JUPITER" => {
1465                // Jupiter mean distance from Sun (m)
1466                if !args.is_empty() {
1467                    return Err(anyhow!("DIST_JUPITER() takes no arguments"));
1468                }
1469                Ok(DataValue::Float(7.786e11))
1470            }
1471            "DIST_SATURN" | "AU_SATURN" => {
1472                // Saturn mean distance from Sun (m)
1473                if !args.is_empty() {
1474                    return Err(anyhow!("DIST_SATURN() takes no arguments"));
1475                }
1476                Ok(DataValue::Float(1.4335e12))
1477            }
1478            "DIST_URANUS" | "AU_URANUS" => {
1479                // Uranus mean distance from Sun (m)
1480                if !args.is_empty() {
1481                    return Err(anyhow!("DIST_URANUS() takes no arguments"));
1482                }
1483                Ok(DataValue::Float(2.8725e12))
1484            }
1485            "DIST_NEPTUNE" | "AU_NEPTUNE" => {
1486                // Neptune mean distance from Sun (m)
1487                if !args.is_empty() {
1488                    return Err(anyhow!("DIST_NEPTUNE() takes no arguments"));
1489                }
1490                Ok(DataValue::Float(4.4951e12))
1491            }
1492            // Other useful astronomical constants
1493            "PARSEC" | "PC" => {
1494                // Parsec in meters
1495                if !args.is_empty() {
1496                    return Err(anyhow!("PARSEC() takes no arguments"));
1497                }
1498                Ok(DataValue::Float(3.0857e16))
1499            }
1500            "LIGHTYEAR" | "LY" => {
1501                // Light year in meters
1502                if !args.is_empty() {
1503                    return Err(anyhow!("LIGHTYEAR() takes no arguments"));
1504                }
1505                Ok(DataValue::Float(9.4607e15))
1506            }
1507            // Angle conversion functions (demonstrating easy extensibility)
1508            "DEGREES" => {
1509                // Convert radians to degrees
1510                if args.len() != 1 {
1511                    return Err(anyhow!("DEGREES requires exactly 1 argument"));
1512                }
1513                let radians = match self.evaluate(&args[0], row_index)? {
1514                    DataValue::Integer(n) => n as f64,
1515                    DataValue::Float(f) => f,
1516                    _ => return Err(anyhow!("DEGREES requires a numeric argument")),
1517                };
1518                Ok(DataValue::Float(radians * 180.0 / std::f64::consts::PI))
1519            }
1520            "RADIANS" => {
1521                // Convert degrees to radians
1522                if args.len() != 1 {
1523                    return Err(anyhow!("RADIANS requires exactly 1 argument"));
1524                }
1525                let degrees = match self.evaluate(&args[0], row_index)? {
1526                    DataValue::Integer(n) => n as f64,
1527                    DataValue::Float(f) => f,
1528                    _ => return Err(anyhow!("RADIANS requires a numeric argument")),
1529                };
1530                Ok(DataValue::Float(degrees * std::f64::consts::PI / 180.0))
1531            }
1532            "MID" => {
1533                // MID(text, start_position, length) - Excel-compatible, 1-based indexing
1534                if args.len() != 3 {
1535                    return Err(anyhow!(
1536                        "MID requires exactly 3 arguments: text, start_position, length"
1537                    ));
1538                }
1539
1540                let text = match self.evaluate(&args[0], row_index)? {
1541                    DataValue::String(s) => s,
1542                    DataValue::InternedString(s) => s.to_string(),
1543                    DataValue::Integer(n) => n.to_string(),
1544                    DataValue::Float(f) => f.to_string(),
1545                    DataValue::Null => String::new(),
1546                    _ => return Err(anyhow!("MID first argument must be convertible to text")),
1547                };
1548
1549                let start_pos = match self.evaluate(&args[1], row_index)? {
1550                    DataValue::Integer(n) => n,
1551                    DataValue::Float(f) => f as i64,
1552                    _ => return Err(anyhow!("MID start_position must be a number")),
1553                };
1554
1555                let length = match self.evaluate(&args[2], row_index)? {
1556                    DataValue::Integer(n) => n,
1557                    DataValue::Float(f) => f as i64,
1558                    _ => return Err(anyhow!("MID length must be a number")),
1559                };
1560
1561                // Excel MID uses 1-based indexing
1562                if start_pos < 1 {
1563                    return Err(anyhow!("MID start_position must be >= 1"));
1564                }
1565                if length < 0 {
1566                    return Err(anyhow!("MID length must be >= 0"));
1567                }
1568
1569                // Convert to 0-based index
1570                let start_idx = (start_pos - 1) as usize;
1571                let chars: Vec<char> = text.chars().collect();
1572
1573                // If start position is beyond string length, return empty string
1574                if start_idx >= chars.len() {
1575                    return Ok(DataValue::String(String::new()));
1576                }
1577
1578                // Extract substring
1579                let end_idx = std::cmp::min(start_idx + length as usize, chars.len());
1580                let result: String = chars[start_idx..end_idx].iter().collect();
1581
1582                Ok(DataValue::String(result))
1583            }
1584            "UPPER" => {
1585                if args.len() != 1 {
1586                    return Err(anyhow!("UPPER requires exactly 1 argument"));
1587                }
1588
1589                let text = match self.evaluate(&args[0], row_index)? {
1590                    DataValue::String(s) => s,
1591                    DataValue::InternedString(s) => s.to_string(),
1592                    DataValue::Integer(n) => n.to_string(),
1593                    DataValue::Float(f) => f.to_string(),
1594                    DataValue::Null => String::new(),
1595                    _ => return Err(anyhow!("UPPER argument must be convertible to text")),
1596                };
1597
1598                Ok(DataValue::String(text.to_uppercase()))
1599            }
1600            "LOWER" => {
1601                if args.len() != 1 {
1602                    return Err(anyhow!("LOWER requires exactly 1 argument"));
1603                }
1604
1605                let text = match self.evaluate(&args[0], row_index)? {
1606                    DataValue::String(s) => s,
1607                    DataValue::InternedString(s) => s.to_string(),
1608                    DataValue::Integer(n) => n.to_string(),
1609                    DataValue::Float(f) => f.to_string(),
1610                    DataValue::Null => String::new(),
1611                    _ => return Err(anyhow!("LOWER argument must be convertible to text")),
1612                };
1613
1614                Ok(DataValue::String(text.to_lowercase()))
1615            }
1616            "TRIM" => {
1617                if args.len() != 1 {
1618                    return Err(anyhow!("TRIM requires exactly 1 argument"));
1619                }
1620
1621                let text = match self.evaluate(&args[0], row_index)? {
1622                    DataValue::String(s) => s,
1623                    DataValue::InternedString(s) => s.to_string(),
1624                    DataValue::Integer(n) => n.to_string(),
1625                    DataValue::Float(f) => f.to_string(),
1626                    DataValue::Null => String::new(),
1627                    _ => return Err(anyhow!("TRIM argument must be convertible to text")),
1628                };
1629
1630                Ok(DataValue::String(text.trim().to_string()))
1631            }
1632            _ => Err(anyhow!("Unknown function: {}", name)),
1633        }
1634    }
1635
1636    /// Evaluate a method call on a column (e.g., column.Trim())
1637    fn evaluate_method_call(
1638        &self,
1639        object: &str,
1640        method: &str,
1641        args: &[SqlExpression],
1642        row_index: usize,
1643    ) -> Result<DataValue> {
1644        // Get column value
1645        let col_index = self.table.get_column_index(object).ok_or_else(|| {
1646            let suggestion = self.find_similar_column(object);
1647            match suggestion {
1648                Some(similar) => {
1649                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
1650                }
1651                None => anyhow!("Column '{}' not found", object),
1652            }
1653        })?;
1654
1655        let cell_value = self.table.get_value(row_index, col_index).cloned();
1656
1657        self.evaluate_method_on_value(
1658            &cell_value.unwrap_or(DataValue::Null),
1659            method,
1660            args,
1661            row_index,
1662        )
1663    }
1664
1665    /// Evaluate a method on a value
1666    fn evaluate_method_on_value(
1667        &self,
1668        value: &DataValue,
1669        method: &str,
1670        args: &[SqlExpression],
1671        row_index: usize,
1672    ) -> Result<DataValue> {
1673        match method.to_lowercase().as_str() {
1674            "trim" | "trimstart" | "trimend" => {
1675                if !args.is_empty() {
1676                    return Err(anyhow!("{} takes no arguments", method));
1677                }
1678
1679                // Convert value to string and apply trim
1680                let str_val = match value {
1681                    DataValue::String(s) => s.clone(),
1682                    DataValue::InternedString(s) => s.to_string(),
1683                    DataValue::Integer(n) => n.to_string(),
1684                    DataValue::Float(f) => f.to_string(),
1685                    DataValue::Boolean(b) => b.to_string(),
1686                    DataValue::DateTime(dt) => dt.clone(),
1687                    DataValue::Null => return Ok(DataValue::Null),
1688                };
1689
1690                let result = match method.to_lowercase().as_str() {
1691                    "trim" => str_val.trim().to_string(),
1692                    "trimstart" => str_val.trim_start().to_string(),
1693                    "trimend" => str_val.trim_end().to_string(),
1694                    _ => unreachable!(),
1695                };
1696
1697                Ok(DataValue::String(result))
1698            }
1699            "length" => {
1700                if !args.is_empty() {
1701                    return Err(anyhow!("Length takes no arguments"));
1702                }
1703
1704                // Get string length
1705                let len = match value {
1706                    DataValue::String(s) => s.len(),
1707                    DataValue::InternedString(s) => s.len(),
1708                    DataValue::Integer(n) => n.to_string().len(),
1709                    DataValue::Float(f) => f.to_string().len(),
1710                    DataValue::Boolean(b) => b.to_string().len(),
1711                    DataValue::DateTime(dt) => dt.len(),
1712                    DataValue::Null => return Ok(DataValue::Integer(0)),
1713                };
1714
1715                Ok(DataValue::Integer(len as i64))
1716            }
1717            "indexof" => {
1718                if args.len() != 1 {
1719                    return Err(anyhow!("IndexOf requires exactly 1 argument"));
1720                }
1721
1722                // Get the search string from args
1723                let search_str = match self.evaluate(&args[0], row_index)? {
1724                    DataValue::String(s) => s,
1725                    DataValue::InternedString(s) => s.to_string(),
1726                    DataValue::Integer(n) => n.to_string(),
1727                    DataValue::Float(f) => f.to_string(),
1728                    _ => return Err(anyhow!("IndexOf argument must be a string")),
1729                };
1730
1731                // Convert value to string and find index
1732                let str_val = match value {
1733                    DataValue::String(s) => s.clone(),
1734                    DataValue::InternedString(s) => s.to_string(),
1735                    DataValue::Integer(n) => n.to_string(),
1736                    DataValue::Float(f) => f.to_string(),
1737                    DataValue::Boolean(b) => b.to_string(),
1738                    DataValue::DateTime(dt) => dt.clone(),
1739                    DataValue::Null => return Ok(DataValue::Integer(-1)),
1740                };
1741
1742                let index = str_val.find(&search_str).map(|i| i as i64).unwrap_or(-1);
1743
1744                Ok(DataValue::Integer(index))
1745            }
1746            "contains" => {
1747                if args.len() != 1 {
1748                    return Err(anyhow!("Contains requires exactly 1 argument"));
1749                }
1750
1751                // Get the search string from args
1752                let search_str = match self.evaluate(&args[0], row_index)? {
1753                    DataValue::String(s) => s,
1754                    DataValue::InternedString(s) => s.to_string(),
1755                    DataValue::Integer(n) => n.to_string(),
1756                    DataValue::Float(f) => f.to_string(),
1757                    _ => return Err(anyhow!("Contains argument must be a string")),
1758                };
1759
1760                // Convert value to string and check contains
1761                let str_val = match value {
1762                    DataValue::String(s) => s.clone(),
1763                    DataValue::InternedString(s) => s.to_string(),
1764                    DataValue::Integer(n) => n.to_string(),
1765                    DataValue::Float(f) => f.to_string(),
1766                    DataValue::Boolean(b) => b.to_string(),
1767                    DataValue::DateTime(dt) => dt.clone(),
1768                    DataValue::Null => return Ok(DataValue::Boolean(false)),
1769                };
1770
1771                // Case-insensitive search
1772                let result = str_val.to_lowercase().contains(&search_str.to_lowercase());
1773                Ok(DataValue::Boolean(result))
1774            }
1775            "startswith" => {
1776                if args.len() != 1 {
1777                    return Err(anyhow!("StartsWith requires exactly 1 argument"));
1778                }
1779
1780                // Get the prefix from args
1781                let prefix = match self.evaluate(&args[0], row_index)? {
1782                    DataValue::String(s) => s,
1783                    DataValue::InternedString(s) => s.to_string(),
1784                    DataValue::Integer(n) => n.to_string(),
1785                    DataValue::Float(f) => f.to_string(),
1786                    _ => return Err(anyhow!("StartsWith argument must be a string")),
1787                };
1788
1789                // Convert value to string and check starts_with
1790                let str_val = match value {
1791                    DataValue::String(s) => s.clone(),
1792                    DataValue::InternedString(s) => s.to_string(),
1793                    DataValue::Integer(n) => n.to_string(),
1794                    DataValue::Float(f) => f.to_string(),
1795                    DataValue::Boolean(b) => b.to_string(),
1796                    DataValue::DateTime(dt) => dt.clone(),
1797                    DataValue::Null => return Ok(DataValue::Boolean(false)),
1798                };
1799
1800                // Case-insensitive check
1801                let result = str_val.to_lowercase().starts_with(&prefix.to_lowercase());
1802                Ok(DataValue::Boolean(result))
1803            }
1804            "endswith" => {
1805                if args.len() != 1 {
1806                    return Err(anyhow!("EndsWith requires exactly 1 argument"));
1807                }
1808
1809                // Get the suffix from args
1810                let suffix = match self.evaluate(&args[0], row_index)? {
1811                    DataValue::String(s) => s,
1812                    DataValue::InternedString(s) => s.to_string(),
1813                    DataValue::Integer(n) => n.to_string(),
1814                    DataValue::Float(f) => f.to_string(),
1815                    _ => return Err(anyhow!("EndsWith argument must be a string")),
1816                };
1817
1818                // Convert value to string and check ends_with
1819                let str_val = match value {
1820                    DataValue::String(s) => s.clone(),
1821                    DataValue::InternedString(s) => s.to_string(),
1822                    DataValue::Integer(n) => n.to_string(),
1823                    DataValue::Float(f) => f.to_string(),
1824                    DataValue::Boolean(b) => b.to_string(),
1825                    DataValue::DateTime(dt) => dt.clone(),
1826                    DataValue::Null => return Ok(DataValue::Boolean(false)),
1827                };
1828
1829                // Case-insensitive check
1830                let result = str_val.to_lowercase().ends_with(&suffix.to_lowercase());
1831                Ok(DataValue::Boolean(result))
1832            }
1833            _ => Err(anyhow!("Unsupported method: {}", method)),
1834        }
1835    }
1836
1837    /// Evaluate a CASE expression
1838    fn evaluate_case_expression(
1839        &self,
1840        when_branches: &[crate::sql::recursive_parser::WhenBranch],
1841        else_branch: &Option<Box<SqlExpression>>,
1842        row_index: usize,
1843    ) -> Result<DataValue> {
1844        debug!(
1845            "ArithmeticEvaluator: evaluating CASE expression for row {}",
1846            row_index
1847        );
1848
1849        // Evaluate each WHEN condition in order
1850        for branch in when_branches {
1851            // Evaluate the condition as a boolean
1852            let condition_result = self.evaluate_condition_as_bool(&branch.condition, row_index)?;
1853
1854            if condition_result {
1855                debug!("CASE: WHEN condition matched, evaluating result expression");
1856                return self.evaluate(&branch.result, row_index);
1857            }
1858        }
1859
1860        // If no WHEN condition matched, evaluate ELSE clause (or return NULL)
1861        match else_branch {
1862            Some(else_expr) => {
1863                debug!("CASE: No WHEN matched, evaluating ELSE expression");
1864                self.evaluate(else_expr, row_index)
1865            }
1866            None => {
1867                debug!("CASE: No WHEN matched and no ELSE, returning NULL");
1868                Ok(DataValue::Null)
1869            }
1870        }
1871    }
1872
1873    /// Helper method to evaluate an expression as a boolean (for CASE WHEN conditions)
1874    fn evaluate_condition_as_bool(&self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
1875        let value = self.evaluate(expr, row_index)?;
1876
1877        match value {
1878            DataValue::Boolean(b) => Ok(b),
1879            DataValue::Integer(i) => Ok(i != 0),
1880            DataValue::Float(f) => Ok(f != 0.0),
1881            DataValue::Null => Ok(false),
1882            DataValue::String(s) => Ok(!s.is_empty()),
1883            DataValue::InternedString(s) => Ok(!s.is_empty()),
1884            _ => Ok(true), // Other types are considered truthy
1885        }
1886    }
1887}
1888
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::datatable::{DataColumn, DataRow};

    /// Build a single-row table with columns `a = 10`, `b = 2.5`, `c = 4`.
    fn create_test_table() -> DataTable {
        let mut table = DataTable::new("test");
        table.add_column(DataColumn::new("a"));
        table.add_column(DataColumn::new("b"));
        table.add_column(DataColumn::new("c"));

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(10),
                DataValue::Float(2.5),
                DataValue::Integer(4),
            ]))
            .unwrap();

        table
    }

    #[test]
    fn test_evaluate_column() {
        let table = create_test_table();
        let evaluator = ArithmeticEvaluator::new(&table);

        let expr = SqlExpression::Column("a".to_string());
        let result = evaluator.evaluate(&expr, 0).unwrap();
        assert_eq!(result, DataValue::Integer(10));
    }

    #[test]
    fn test_evaluate_number_literal() {
        let table = create_test_table();
        let evaluator = ArithmeticEvaluator::new(&table);

        let expr = SqlExpression::NumberLiteral("42".to_string());
        let result = evaluator.evaluate(&expr, 0).unwrap();
        assert_eq!(result, DataValue::Integer(42));

        // Deliberately not a truncation of a well-known constant (such as pi),
        // which clippy's `approx_constant` lint rejects.
        let expr = SqlExpression::NumberLiteral("2.75".to_string());
        let result = evaluator.evaluate(&expr, 0).unwrap();
        assert_eq!(result, DataValue::Float(2.75));
    }

    #[test]
    fn test_add_values() {
        let table = create_test_table();
        let evaluator = ArithmeticEvaluator::new(&table);

        // Integer + Integer
        let result = evaluator
            .add_values(&DataValue::Integer(5), &DataValue::Integer(3))
            .unwrap();
        assert_eq!(result, DataValue::Integer(8));

        // Integer + Float
        let result = evaluator
            .add_values(&DataValue::Integer(5), &DataValue::Float(2.5))
            .unwrap();
        assert_eq!(result, DataValue::Float(7.5));
    }

    #[test]
    fn test_multiply_values() {
        let table = create_test_table();
        let evaluator = ArithmeticEvaluator::new(&table);

        // Integer * Float
        let result = evaluator
            .multiply_values(&DataValue::Integer(4), &DataValue::Float(2.5))
            .unwrap();
        assert_eq!(result, DataValue::Float(10.0));
    }

    #[test]
    fn test_divide_values() {
        let table = create_test_table();
        let evaluator = ArithmeticEvaluator::new(&table);

        // Exact division
        let result = evaluator
            .divide_values(&DataValue::Integer(10), &DataValue::Integer(2))
            .unwrap();
        assert_eq!(result, DataValue::Integer(5));

        // Non-exact division
        let result = evaluator
            .divide_values(&DataValue::Integer(10), &DataValue::Integer(3))
            .unwrap();
        assert_eq!(result, DataValue::Float(10.0 / 3.0));
    }

    #[test]
    fn test_division_by_zero() {
        let table = create_test_table();
        let evaluator = ArithmeticEvaluator::new(&table);

        let result = evaluator.divide_values(&DataValue::Integer(10), &DataValue::Integer(0));
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Division by zero"));
    }

    #[test]
    fn test_binary_op_expression() {
        let table = create_test_table();
        let evaluator = ArithmeticEvaluator::new(&table);

        // a * b where a=10, b=2.5
        let expr = SqlExpression::BinaryOp {
            left: Box::new(SqlExpression::Column("a".to_string())),
            op: "*".to_string(),
            right: Box::new(SqlExpression::Column("b".to_string())),
        };

        let result = evaluator.evaluate(&expr, 0).unwrap();
        assert_eq!(result, DataValue::Float(25.0));
    }

    #[test]
    fn test_condition_truthiness_of_number_literals() {
        let table = create_test_table();
        let evaluator = ArithmeticEvaluator::new(&table);

        // Zero is falsy, any other number is truthy.
        let zero = SqlExpression::NumberLiteral("0".to_string());
        assert!(!evaluator.evaluate_condition_as_bool(&zero, 0).unwrap());

        let non_zero = SqlExpression::NumberLiteral("42".to_string());
        assert!(evaluator.evaluate_condition_as_bool(&non_zero, 0).unwrap());
    }

    #[test]
    fn test_argument_free_string_methods() {
        let table = create_test_table();
        let evaluator = ArithmeticEvaluator::new(&table);
        let padded = DataValue::String("  hi  ".to_string());

        // Method names are matched case-insensitively.
        let result = evaluator
            .evaluate_method_on_value(&padded, "Trim", &[], 0)
            .unwrap();
        assert_eq!(result, DataValue::String("hi".to_string()));

        let result = evaluator
            .evaluate_method_on_value(&padded, "TrimStart", &[], 0)
            .unwrap();
        assert_eq!(result, DataValue::String("hi  ".to_string()));

        let result = evaluator
            .evaluate_method_on_value(&padded, "trimend", &[], 0)
            .unwrap();
        assert_eq!(result, DataValue::String("  hi".to_string()));

        let result = evaluator
            .evaluate_method_on_value(&padded, "Length", &[], 0)
            .unwrap();
        assert_eq!(result, DataValue::Integer(6));

        // NULL receivers: Trim keeps NULL, Length reports 0.
        let result = evaluator
            .evaluate_method_on_value(&DataValue::Null, "Trim", &[], 0)
            .unwrap();
        assert_eq!(result, DataValue::Null);

        let result = evaluator
            .evaluate_method_on_value(&DataValue::Null, "Length", &[], 0)
            .unwrap();
        assert_eq!(result, DataValue::Integer(0));

        // Unknown methods are rejected with a descriptive error.
        let result = evaluator.evaluate_method_on_value(&padded, "Reverse", &[], 0);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Unsupported method"));
    }
}