sql_cli/data/
recursive_where_evaluator.rs

1use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
2use crate::data::datatable::{DataTable, DataValue};
3use crate::data::value_comparisons::compare_with_op;
4use crate::sql::recursive_parser::{Condition, LogicalOp, SqlExpression, WhereClause};
5use anyhow::{anyhow, Result};
6use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
7use tracing::debug;
8
9/// Evaluates WHERE clauses from `recursive_parser` directly against `DataTable`
10pub struct RecursiveWhereEvaluator<'a> {
11    table: &'a DataTable,
12    case_insensitive: bool,
13}
14
15impl<'a> RecursiveWhereEvaluator<'a> {
16    #[must_use]
17    pub fn new(table: &'a DataTable) -> Self {
18        Self {
19            table,
20            case_insensitive: false,
21        }
22    }
23
24    /// Find a column name similar to the given name using edit distance
25    fn find_similar_column(&self, name: &str) -> Option<String> {
26        let columns = self.table.column_names();
27        let mut best_match: Option<(String, usize)> = None;
28
29        for col in columns {
30            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
31            // Only suggest if distance is small (likely a typo)
32            // Allow up to 3 edits for longer names
33            let max_distance = if name.len() > 10 { 3 } else { 2 };
34            if distance <= max_distance {
35                match &best_match {
36                    None => best_match = Some((col, distance)),
37                    Some((_, best_dist)) if distance < *best_dist => {
38                        best_match = Some((col, distance));
39                    }
40                    _ => {}
41                }
42            }
43        }
44
45        best_match.map(|(name, _)| name)
46    }
47
48    /// Calculate Levenshtein edit distance between two strings
49    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
50        let len1 = s1.len();
51        let len2 = s2.len();
52        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
53
54        for i in 0..=len1 {
55            matrix[i][0] = i;
56        }
57        for j in 0..=len2 {
58            matrix[0][j] = j;
59        }
60
61        for (i, c1) in s1.chars().enumerate() {
62            for (j, c2) in s2.chars().enumerate() {
63                let cost = usize::from(c1 != c2);
64                matrix[i + 1][j + 1] = std::cmp::min(
65                    matrix[i][j + 1] + 1, // deletion
66                    std::cmp::min(
67                        matrix[i + 1][j] + 1, // insertion
68                        matrix[i][j] + cost,  // substitution
69                    ),
70                );
71            }
72        }
73
74        matrix[len1][len2]
75    }
76
77    #[must_use]
78    pub fn with_case_insensitive(table: &'a DataTable, case_insensitive: bool) -> Self {
79        Self {
80            table,
81            case_insensitive,
82        }
83    }
84
85    #[must_use]
86    pub fn with_config(
87        table: &'a DataTable,
88        case_insensitive: bool,
89        _date_notation: String, // No longer needed since we use centralized parse_datetime
90    ) -> Self {
91        Self {
92            table,
93            case_insensitive,
94        }
95    }
96
97    /// Convert ExprValue to DataValue for centralized comparison
98    fn expr_value_to_data_value(&self, expr_value: &ExprValue) -> DataValue {
99        match expr_value {
100            ExprValue::String(s) => DataValue::String(s.clone()),
101            ExprValue::Number(n) => {
102                // Check if it's an integer or float
103                if n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
104                    DataValue::Integer(*n as i64)
105                } else {
106                    DataValue::Float(*n)
107                }
108            }
109            ExprValue::Boolean(b) => DataValue::Boolean(*b),
110            ExprValue::DateTime(dt) => {
111                DataValue::DateTime(dt.format("%Y-%m-%d %H:%M:%S%.3f").to_string())
112            }
113            ExprValue::Null => DataValue::Null,
114        }
115    }
116
117    /// Evaluate the `Length()` method on a column value
118    fn evaluate_length(
119        &self,
120        object: &str,
121        row_index: usize,
122    ) -> Result<(Option<DataValue>, String)> {
123        // Handle qualified column names (table.column or alias.column)
124        let resolved_column = if object.contains('.') {
125            if let Some(dot_pos) = object.rfind('.') {
126                let col_name = &object[dot_pos + 1..];
127                col_name
128            } else {
129                object
130            }
131        } else {
132            object
133        };
134
135        let col_index = if let Some(idx) = self.table.get_column_index(resolved_column) {
136            idx
137        } else if resolved_column != object {
138            // If not found, try the original name
139            if let Some(idx) = self.table.get_column_index(object) {
140                idx
141            } else {
142                let suggestion = self.find_similar_column(resolved_column);
143                return Err(match suggestion {
144                    Some(similar) => {
145                        anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
146                    }
147                    None => anyhow!("Column '{}' not found", object),
148                });
149            }
150        } else {
151            let suggestion = self.find_similar_column(resolved_column);
152            return Err(match suggestion {
153                Some(similar) => {
154                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
155                }
156                None => anyhow!("Column '{}' not found", object),
157            });
158        };
159
160        let value = self.table.get_value(row_index, col_index);
161        let length_value = match value {
162            Some(DataValue::String(s)) => Some(DataValue::Integer(s.len() as i64)),
163            Some(DataValue::InternedString(s)) => Some(DataValue::Integer(s.len() as i64)),
164            Some(DataValue::Integer(n)) => Some(DataValue::Integer(n.to_string().len() as i64)),
165            Some(DataValue::Float(f)) => Some(DataValue::Integer(f.to_string().len() as i64)),
166            _ => Some(DataValue::Integer(0)),
167        };
168        Ok((length_value, format!("{object}.Length()")))
169    }
170
171    /// Evaluate the `IndexOf()` method on a column value
172    fn evaluate_indexof(
173        &self,
174        object: &str,
175        search_str: &str,
176        row_index: usize,
177    ) -> Result<(Option<DataValue>, String)> {
178        // Handle qualified column names (table.column or alias.column)
179        let resolved_column = if object.contains('.') {
180            if let Some(dot_pos) = object.rfind('.') {
181                let col_name = &object[dot_pos + 1..];
182                col_name
183            } else {
184                object
185            }
186        } else {
187            object
188        };
189
190        let col_index = if let Some(idx) = self.table.get_column_index(resolved_column) {
191            idx
192        } else if resolved_column != object {
193            // If not found, try the original name
194            if let Some(idx) = self.table.get_column_index(object) {
195                idx
196            } else {
197                let suggestion = self.find_similar_column(resolved_column);
198                return Err(match suggestion {
199                    Some(similar) => {
200                        anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
201                    }
202                    None => anyhow!("Column '{}' not found", object),
203                });
204            }
205        } else {
206            let suggestion = self.find_similar_column(resolved_column);
207            return Err(match suggestion {
208                Some(similar) => {
209                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
210                }
211                None => anyhow!("Column '{}' not found", object),
212            });
213        };
214
215        let value = self.table.get_value(row_index, col_index);
216        let index_value = match value {
217            Some(DataValue::String(s)) => {
218                // Case-insensitive search by default, following Contains behavior
219                let pos = s
220                    .to_lowercase()
221                    .find(&search_str.to_lowercase())
222                    .map_or(-1, |idx| idx as i64);
223                Some(DataValue::Integer(pos))
224            }
225            Some(DataValue::InternedString(s)) => {
226                let pos = s
227                    .to_lowercase()
228                    .find(&search_str.to_lowercase())
229                    .map_or(-1, |idx| idx as i64);
230                Some(DataValue::Integer(pos))
231            }
232            Some(DataValue::Integer(n)) => {
233                let str_val = n.to_string();
234                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
235                Some(DataValue::Integer(pos))
236            }
237            Some(DataValue::Float(f)) => {
238                let str_val = f.to_string();
239                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
240                Some(DataValue::Integer(pos))
241            }
242            _ => Some(DataValue::Integer(-1)), // Return -1 for not found
243        };
244
245        if row_index < 3 {
246            debug!(
247                "RecursiveWhereEvaluator: Row {} IndexOf('{}') = {:?}",
248                row_index, search_str, index_value
249            );
250        }
251        Ok((index_value, format!("{object}.IndexOf('{search_str}')")))
252    }
253
/// Trims whitespace from `s` according to `trim_type`.
///
/// `"trim"` strips both ends, `"trimstart"` the leading side and
/// `"trimend"` the trailing side. The selector is matched exactly
/// (lowercase); any other value returns `s` unchanged.
fn apply_trim<'b>(s: &'b str, trim_type: &str) -> &'b str {
    if trim_type == "trim" {
        s.trim()
    } else if trim_type == "trimstart" {
        s.trim_start()
    } else if trim_type == "trimend" {
        s.trim_end()
    } else {
        s
    }
}
263
264    /// Evaluate trim methods (Trim, `TrimStart`, `TrimEnd`) on a column value
265    fn evaluate_trim(
266        &self,
267        object: &str,
268        row_index: usize,
269        trim_type: &str,
270    ) -> Result<(Option<DataValue>, String)> {
271        // Handle qualified column names (table.column or alias.column)
272        let resolved_column = if object.contains('.') {
273            if let Some(dot_pos) = object.rfind('.') {
274                let col_name = &object[dot_pos + 1..];
275                col_name
276            } else {
277                object
278            }
279        } else {
280            object
281        };
282
283        let col_index = if let Some(idx) = self.table.get_column_index(resolved_column) {
284            idx
285        } else if resolved_column != object {
286            // If not found, try the original name
287            if let Some(idx) = self.table.get_column_index(object) {
288                idx
289            } else {
290                let suggestion = self.find_similar_column(resolved_column);
291                return Err(match suggestion {
292                    Some(similar) => {
293                        anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
294                    }
295                    None => anyhow!("Column '{}' not found", object),
296                });
297            }
298        } else {
299            let suggestion = self.find_similar_column(resolved_column);
300            return Err(match suggestion {
301                Some(similar) => {
302                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
303                }
304                None => anyhow!("Column '{}' not found", object),
305            });
306        };
307
308        let value = self.table.get_value(row_index, col_index);
309        let trimmed_value = match value {
310            Some(DataValue::String(s)) => Some(DataValue::String(
311                Self::apply_trim(s, trim_type).to_string(),
312            )),
313            Some(DataValue::InternedString(s)) => Some(DataValue::String(
314                Self::apply_trim(s, trim_type).to_string(),
315            )),
316            Some(DataValue::Integer(n)) => {
317                let str_val = n.to_string();
318                Some(DataValue::String(
319                    Self::apply_trim(&str_val, trim_type).to_string(),
320                ))
321            }
322            Some(DataValue::Float(f)) => {
323                let str_val = f.to_string();
324                Some(DataValue::String(
325                    Self::apply_trim(&str_val, trim_type).to_string(),
326                ))
327            }
328            _ => Some(DataValue::String(String::new())),
329        };
330
331        let method_name = match trim_type {
332            "trim" => "Trim",
333            "trimstart" => "TrimStart",
334            "trimend" => "TrimEnd",
335            _ => "Trim",
336        };
337        Ok((trimmed_value, format!("{object}.{method_name}()")))
338    }
339
340    /// Evaluate a WHERE clause for a specific row
341    pub fn evaluate(&self, where_clause: &WhereClause, row_index: usize) -> Result<bool> {
342        // Only log for first few rows to avoid performance impact
343        if row_index < 3 {
344            debug!(
345                "RecursiveWhereEvaluator: evaluate() ENTRY - row {}, {} conditions, case_insensitive={}",
346                row_index,
347                where_clause.conditions.len(),
348                self.case_insensitive
349            );
350        }
351
352        if where_clause.conditions.is_empty() {
353            if row_index < 3 {
354                debug!("RecursiveWhereEvaluator: evaluate() EXIT - no conditions, returning true");
355            }
356            return Ok(true);
357        }
358
359        // With the new expression tree structure, we should have a single condition
360        // containing the entire WHERE clause expression tree
361        if where_clause.conditions.len() == 1 {
362            // New structure: single expression tree
363            if row_index < 3 {
364                debug!(
365                    "RecursiveWhereEvaluator: evaluate() - evaluating expression tree for row {}",
366                    row_index
367                );
368            }
369            self.evaluate_condition(&where_clause.conditions[0], row_index)
370        } else {
371            // Legacy structure: multiple conditions with connectors
372            // This path is kept for backward compatibility
373            if row_index < 3 {
374                debug!(
375                    "RecursiveWhereEvaluator: evaluate() - evaluating {} conditions with connectors for row {}",
376                    where_clause.conditions.len(),
377                    row_index
378                );
379            }
380            let mut result = self.evaluate_condition(&where_clause.conditions[0], row_index)?;
381
382            // Apply connectors (AND/OR) with subsequent conditions
383            for i in 1..where_clause.conditions.len() {
384                let next_result =
385                    self.evaluate_condition(&where_clause.conditions[i], row_index)?;
386
387                // Use the connector from the previous condition
388                if let Some(connector) = &where_clause.conditions[i - 1].connector {
389                    result = match connector {
390                        LogicalOp::And => result && next_result,
391                        LogicalOp::Or => result || next_result,
392                    };
393                }
394            }
395
396            Ok(result)
397        }
398    }
399
400    fn evaluate_condition(&self, condition: &Condition, row_index: usize) -> Result<bool> {
401        // Only log first few rows to avoid performance impact
402        if row_index < 3 {
403            debug!(
404                "RecursiveWhereEvaluator: evaluate_condition() ENTRY - row {}",
405                row_index
406            );
407        }
408        let result = self.evaluate_expression(&condition.expr, row_index);
409        if row_index < 3 {
410            debug!(
411                "RecursiveWhereEvaluator: evaluate_condition() EXIT - row {}, result = {:?}",
412                row_index, result
413            );
414        }
415        result
416    }
417
418    fn evaluate_expression(&self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
419        // Only log first few rows to avoid performance impact
420        if row_index < 3 {
421            debug!(
422                "RecursiveWhereEvaluator: evaluate_expression() ENTRY - row {}, expr = {:?}",
423                row_index, expr
424            );
425        }
426
427        let result = match expr {
428            SqlExpression::BinaryOp { left, op, right } => {
429                self.evaluate_binary_op(left, op, right, row_index)
430            }
431            SqlExpression::InList { expr, values } => {
432                self.evaluate_in_list(expr, values, row_index, false)
433            }
434            SqlExpression::NotInList { expr, values } => {
435                let in_result = self.evaluate_in_list(expr, values, row_index, false)?;
436                Ok(!in_result)
437            }
438            SqlExpression::Between { expr, lower, upper } => {
439                self.evaluate_between(expr, lower, upper, row_index)
440            }
441            SqlExpression::Not { expr } => {
442                let inner_result = self.evaluate_expression(expr, row_index)?;
443                Ok(!inner_result)
444            }
445            SqlExpression::MethodCall {
446                object,
447                method,
448                args,
449            } => {
450                if row_index < 3 {
451                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found MethodCall, delegating to evaluate_method_call");
452                }
453                self.evaluate_method_call(object, method, args, row_index)
454            }
455            SqlExpression::CaseExpression {
456                when_branches,
457                else_branch,
458            } => {
459                if row_index < 3 {
460                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found CaseExpression, evaluating");
461                }
462                self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index)
463            }
464            _ => {
465                if row_index < 3 {
466                    debug!("RecursiveWhereEvaluator: evaluate_expression() - unsupported expression type, returning false");
467                }
468                Ok(false) // Default to false for unsupported expressions
469            }
470        };
471
472        if row_index < 3 {
473            debug!(
474                "RecursiveWhereEvaluator: evaluate_expression() EXIT - row {}, result = {:?}",
475                row_index, result
476            );
477        }
478        result
479    }
480
481    fn evaluate_binary_op(
482        &self,
483        left: &SqlExpression,
484        op: &str,
485        right: &SqlExpression,
486        row_index: usize,
487    ) -> Result<bool> {
488        // Only log first few rows to avoid performance impact
489        if row_index < 3 {
490            debug!(
491                "RecursiveWhereEvaluator: evaluate_binary_op() ENTRY - row {}, op = '{}'",
492                row_index, op
493            );
494        }
495
496        // Handle logical operators (AND, OR) specially
497        if op.to_uppercase() == "OR" || op.to_uppercase() == "AND" {
498            let left_result = self.evaluate_expression(left, row_index)?;
499            let right_result = self.evaluate_expression(right, row_index)?;
500
501            return Ok(match op.to_uppercase().as_str() {
502                "OR" => left_result || right_result,
503                "AND" => left_result && right_result,
504                _ => unreachable!(),
505            });
506        }
507
508        // For complex expressions (arithmetic, functions), use ArithmeticEvaluator
509        if matches!(left, SqlExpression::BinaryOp { .. })
510            || matches!(left, SqlExpression::FunctionCall { .. })
511            || matches!(right, SqlExpression::BinaryOp { .. })
512            || matches!(right, SqlExpression::FunctionCall { .. })
513        {
514            let comparison_expr = SqlExpression::BinaryOp {
515                left: Box::new(left.clone()),
516                op: op.to_string(),
517                right: Box::new(right.clone()),
518            };
519
520            let mut evaluator = ArithmeticEvaluator::new(self.table);
521            let result = evaluator.evaluate(&comparison_expr, row_index)?;
522
523            // Convert the result to boolean
524            return match result {
525                DataValue::Boolean(b) => Ok(b),
526                DataValue::Null => Ok(false),
527                _ => Err(anyhow!("Comparison did not return a boolean value")),
528            };
529        }
530
531        // For simple comparisons, use the original WHERE clause logic with improved date parsing
532        // Handle left side - could be a column or a method call
533        let (cell_value, column_name) = match left {
534            SqlExpression::MethodCall {
535                object,
536                method,
537                args,
538            } => {
539                // Handle method calls that return values (like Length(), IndexOf())
540                match method.to_lowercase().as_str() {
541                    "length" => {
542                        if !args.is_empty() {
543                            return Err(anyhow::anyhow!("Length() takes no arguments"));
544                        }
545                        self.evaluate_length(object, row_index)?
546                    }
547                    "indexof" => {
548                        if args.len() != 1 {
549                            return Err(anyhow::anyhow!("IndexOf() requires exactly 1 argument"));
550                        }
551                        let search_str = self.extract_string_value(&args[0])?;
552                        self.evaluate_indexof(object, &search_str, row_index)?
553                    }
554                    "trim" => {
555                        if !args.is_empty() {
556                            return Err(anyhow::anyhow!("Trim() takes no arguments"));
557                        }
558                        self.evaluate_trim(object, row_index, "trim")?
559                    }
560                    "trimstart" => {
561                        if !args.is_empty() {
562                            return Err(anyhow::anyhow!("TrimStart() takes no arguments"));
563                        }
564                        self.evaluate_trim(object, row_index, "trimstart")?
565                    }
566                    "trimend" => {
567                        if !args.is_empty() {
568                            return Err(anyhow::anyhow!("TrimEnd() takes no arguments"));
569                        }
570                        self.evaluate_trim(object, row_index, "trimend")?
571                    }
572                    _ => {
573                        return Err(anyhow::anyhow!(
574                            "Method '{}' cannot be used in comparisons",
575                            method
576                        ));
577                    }
578                }
579            }
580            _ => {
581                // Regular column reference
582                let column_name = self.extract_column_name(left)?;
583                if row_index < 3 {
584                    debug!(
585                        "RecursiveWhereEvaluator: evaluate_binary_op() - column_name = '{}'",
586                        column_name
587                    );
588                }
589
590                let col_index = self.table.get_column_index(&column_name).ok_or_else(|| {
591                    let suggestion = self.find_similar_column(&column_name);
592                    match suggestion {
593                        Some(similar) => anyhow!(
594                            "Column '{}' not found. Did you mean '{}'?",
595                            column_name,
596                            similar
597                        ),
598                        None => anyhow!("Column '{}' not found", column_name),
599                    }
600                })?;
601
602                let cell_value = self.table.get_value(row_index, col_index).cloned();
603                (cell_value, column_name)
604            }
605        };
606
607        if row_index < 3 {
608            debug!(
609                "RecursiveWhereEvaluator: evaluate_binary_op() - row {} column '{}' value = {:?}",
610                row_index, column_name, cell_value
611            );
612        }
613
614        // Get comparison value from right side
615        let compare_value = self.extract_value(right)?;
616
617        // Handle special operators that aren't standard comparisons
618        let op_upper = op.to_uppercase();
619        match op_upper.as_str() {
620            // LIKE operator - handle specially
621            "LIKE" => {
622                let table_value = cell_value.unwrap_or(DataValue::Null);
623                let pattern = match compare_value {
624                    ExprValue::String(s) => s,
625                    _ => return Ok(false),
626                };
627
628                let text = match &table_value {
629                    DataValue::String(s) => s.as_str(),
630                    DataValue::InternedString(s) => s.as_str(),
631                    _ => return Ok(false),
632                };
633
634                let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
635                let regex = regex::RegexBuilder::new(&format!("^{regex_pattern}$"))
636                    .case_insensitive(true)
637                    .build()
638                    .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
639                Ok(regex.is_match(text))
640            }
641
642            // IS NULL / IS NOT NULL
643            "IS NULL" => Ok(cell_value.is_none() || matches!(cell_value, Some(DataValue::Null))),
644            "IS NOT NULL" => {
645                Ok(cell_value.is_some() && !matches!(cell_value, Some(DataValue::Null)))
646            }
647
648            // Handle IS / IS NOT with NULL explicitly
649            "IS" if matches!(compare_value, ExprValue::Null) => {
650                Ok(cell_value.is_none() || matches!(cell_value, Some(DataValue::Null)))
651            }
652            "IS NOT" if matches!(compare_value, ExprValue::Null) => {
653                Ok(cell_value.is_some() && !matches!(cell_value, Some(DataValue::Null)))
654            }
655
656            // Standard comparison operators - use centralized logic
657            _ => {
658                let table_value = cell_value.unwrap_or(DataValue::Null);
659                let comparison_value = self.expr_value_to_data_value(&compare_value);
660
661                if row_index < 3 {
662                    debug!(
663                        "RecursiveWhereEvaluator: Using centralized comparison - table: {:?}, op: '{}', comparison: {:?}, case_insensitive: {}",
664                        table_value, op, comparison_value, self.case_insensitive
665                    );
666                }
667
668                Ok(compare_with_op(
669                    &table_value,
670                    &comparison_value,
671                    op,
672                    self.case_insensitive,
673                ))
674            }
675        }
676    }
677
678    fn evaluate_in_list(
679        &self,
680        expr: &SqlExpression,
681        values: &[SqlExpression],
682        row_index: usize,
683        _ignore_case: bool,
684    ) -> Result<bool> {
685        let column_name = self.extract_column_name(expr)?;
686        let col_index = self
687            .table
688            .get_column_index(&column_name)
689            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
690
691        let cell_value = self.table.get_value(row_index, col_index).cloned();
692
693        for value_expr in values {
694            let compare_value = self.extract_value(value_expr)?;
695            let table_value = cell_value.as_ref().unwrap_or(&DataValue::Null);
696            let comparison_value = self.expr_value_to_data_value(&compare_value);
697
698            // Use centralized comparison for equality
699            if compare_with_op(table_value, &comparison_value, "=", self.case_insensitive) {
700                return Ok(true);
701            }
702        }
703
704        Ok(false)
705    }
706
707    fn evaluate_between(
708        &self,
709        expr: &SqlExpression,
710        lower: &SqlExpression,
711        upper: &SqlExpression,
712        row_index: usize,
713    ) -> Result<bool> {
714        let column_name = self.extract_column_name(expr)?;
715        let col_index = self
716            .table
717            .get_column_index(&column_name)
718            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
719
720        let cell_value = self.table.get_value(row_index, col_index).cloned();
721        let lower_value = self.extract_value(lower)?;
722        let upper_value = self.extract_value(upper)?;
723
724        let table_value = cell_value.unwrap_or(DataValue::Null);
725        let lower_data_value = self.expr_value_to_data_value(&lower_value);
726        let upper_data_value = self.expr_value_to_data_value(&upper_value);
727
728        // Use centralized comparison for BETWEEN: value >= lower AND value <= upper
729        let ge_lower =
730            compare_with_op(&table_value, &lower_data_value, ">=", self.case_insensitive);
731        let le_upper =
732            compare_with_op(&table_value, &upper_data_value, "<=", self.case_insensitive);
733
734        Ok(ge_lower && le_upper)
735    }
736
737    fn evaluate_method_call(
738        &self,
739        object: &str,
740        method: &str,
741        args: &[SqlExpression],
742        row_index: usize,
743    ) -> Result<bool> {
744        if row_index < 3 {
745            debug!(
746                "RecursiveWhereEvaluator: evaluate_method_call - object='{}', method='{}', row={}",
747                object, method, row_index
748            );
749        }
750
751        // Get column value
752        let col_index = self.table.get_column_index(object).ok_or_else(|| {
753            let suggestion = self.find_similar_column(object);
754            match suggestion {
755                Some(similar) => {
756                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
757                }
758                None => anyhow!("Column '{}' not found", object),
759            }
760        })?;
761
762        let cell_value = self.table.get_value(row_index, col_index).cloned();
763        if row_index < 3 {
764            debug!(
765                "RecursiveWhereEvaluator: Row {} column '{}' value = {:?}",
766                row_index, object, cell_value
767            );
768        }
769
770        match method.to_lowercase().as_str() {
771            "contains" => {
772                if args.len() != 1 {
773                    return Err(anyhow::anyhow!("Contains requires exactly 1 argument"));
774                }
775                let search_str = self.extract_string_value(&args[0])?;
776                // Pre-compute lowercase once instead of for every row
777                let search_lower = search_str.to_lowercase();
778
779                // Type coercion: convert numeric values to strings for string methods
780                match cell_value {
781                    Some(DataValue::String(ref s)) => {
782                        let result = s.to_lowercase().contains(&search_lower);
783                        // Only log first few rows to avoid performance impact
784                        if row_index < 3 {
785                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on '{}' = {} (case-insensitive)", row_index, search_str, s, result);
786                        }
787                        Ok(result)
788                    }
789                    Some(DataValue::InternedString(ref s)) => {
790                        let result = s.to_lowercase().contains(&search_lower);
791                        // Only log first few rows to avoid performance impact
792                        if row_index < 3 {
793                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on interned '{}' = {} (case-insensitive)", row_index, search_str, s, result);
794                        }
795                        Ok(result)
796                    }
797                    Some(DataValue::Integer(n)) => {
798                        let str_val = n.to_string();
799                        let result = str_val.contains(&search_str);
800                        if row_index < 3 {
801                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on integer '{}' = {}", row_index, search_str, str_val, result);
802                        }
803                        Ok(result)
804                    }
805                    Some(DataValue::Float(f)) => {
806                        let str_val = f.to_string();
807                        let result = str_val.contains(&search_str);
808                        if row_index < 3 {
809                            debug!(
810                                "RecursiveWhereEvaluator: Row {} contains('{}') on float '{}' = {}",
811                                row_index, search_str, str_val, result
812                            );
813                        }
814                        Ok(result)
815                    }
816                    Some(DataValue::Boolean(b)) => {
817                        let str_val = b.to_string();
818                        let result = str_val.contains(&search_str);
819                        if row_index < 3 {
820                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on boolean '{}' = {}", row_index, search_str, str_val, result);
821                        }
822                        Ok(result)
823                    }
824                    Some(DataValue::DateTime(dt)) => {
825                        // DateTime columns can use string methods via coercion
826                        let result = dt.contains(&search_str);
827                        if row_index < 3 {
828                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on datetime '{}' = {}", row_index, search_str, dt, result);
829                        }
830                        Ok(result)
831                    }
832                    _ => {
833                        if row_index < 3 {
834                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on null/empty value = false", row_index, search_str);
835                        }
836                        Ok(false)
837                    }
838                }
839            }
840            "startswith" => {
841                if args.len() != 1 {
842                    return Err(anyhow::anyhow!("StartsWith requires exactly 1 argument"));
843                }
844                let prefix = self.extract_string_value(&args[0])?;
845
846                // Type coercion: convert numeric values to strings for string methods
847                match cell_value {
848                    Some(DataValue::String(ref s)) => {
849                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
850                    }
851                    Some(DataValue::InternedString(ref s)) => {
852                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
853                    }
854                    Some(DataValue::Integer(n)) => Ok(n.to_string().starts_with(&prefix)),
855                    Some(DataValue::Float(f)) => Ok(f.to_string().starts_with(&prefix)),
856                    Some(DataValue::Boolean(b)) => Ok(b.to_string().starts_with(&prefix)),
857                    Some(DataValue::DateTime(dt)) => Ok(dt.starts_with(&prefix)),
858                    _ => Ok(false),
859                }
860            }
861            "endswith" => {
862                if args.len() != 1 {
863                    return Err(anyhow::anyhow!("EndsWith requires exactly 1 argument"));
864                }
865                let suffix = self.extract_string_value(&args[0])?;
866
867                // Type coercion: convert numeric values to strings for string methods
868                match cell_value {
869                    Some(DataValue::String(ref s)) => {
870                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
871                    }
872                    Some(DataValue::InternedString(ref s)) => {
873                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
874                    }
875                    Some(DataValue::Integer(n)) => Ok(n.to_string().ends_with(&suffix)),
876                    Some(DataValue::Float(f)) => Ok(f.to_string().ends_with(&suffix)),
877                    Some(DataValue::Boolean(b)) => Ok(b.to_string().ends_with(&suffix)),
878                    Some(DataValue::DateTime(dt)) => Ok(dt.ends_with(&suffix)),
879                    _ => Ok(false),
880                }
881            }
882            _ => Err(anyhow::anyhow!("Unsupported method: {}", method)),
883        }
884    }
885
886    fn extract_column_name(&self, expr: &SqlExpression) -> Result<String> {
887        match expr {
888            SqlExpression::Column(name) => {
889                // Handle qualified column names
890                if name.contains('.') {
891                    if let Some(dot_pos) = name.rfind('.') {
892                        Ok(name[dot_pos + 1..].to_string())
893                    } else {
894                        Ok(name.clone())
895                    }
896                } else {
897                    Ok(name.clone())
898                }
899            }
900            _ => Err(anyhow::anyhow!("Expected column name, got: {:?}", expr)),
901        }
902    }
903
904    fn extract_string_value(&self, expr: &SqlExpression) -> Result<String> {
905        match expr {
906            SqlExpression::StringLiteral(s) => Ok(s.clone()),
907            _ => Err(anyhow::anyhow!("Expected string literal, got: {:?}", expr)),
908        }
909    }
910
911    fn extract_value(&self, expr: &SqlExpression) -> Result<ExprValue> {
912        match expr {
913            SqlExpression::StringLiteral(s) => Ok(ExprValue::String(s.clone())),
914            SqlExpression::BooleanLiteral(b) => Ok(ExprValue::Boolean(*b)),
915            SqlExpression::NumberLiteral(n) => {
916                if let Ok(num) = n.parse::<f64>() {
917                    Ok(ExprValue::Number(num))
918                } else {
919                    Ok(ExprValue::String(n.clone()))
920                }
921            }
922            SqlExpression::DateTimeConstructor {
923                year,
924                month,
925                day,
926                hour,
927                minute,
928                second,
929            } => {
930                // Create a DateTime from the constructor
931                let naive_date = NaiveDate::from_ymd_opt(*year, *month, *day)
932                    .ok_or_else(|| anyhow::anyhow!("Invalid date: {}-{}-{}", year, month, day))?;
933                let naive_time = NaiveTime::from_hms_opt(
934                    hour.unwrap_or(0),
935                    minute.unwrap_or(0),
936                    second.unwrap_or(0),
937                )
938                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
939                let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
940                let datetime = Utc.from_utc_datetime(&naive_datetime);
941                Ok(ExprValue::DateTime(datetime))
942            }
943            SqlExpression::DateTimeToday {
944                hour,
945                minute,
946                second,
947            } => {
948                // Get today's date with optional time
949                let today = Local::now().date_naive();
950                let time = NaiveTime::from_hms_opt(
951                    hour.unwrap_or(0),
952                    minute.unwrap_or(0),
953                    second.unwrap_or(0),
954                )
955                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
956                let naive_datetime = NaiveDateTime::new(today, time);
957                let datetime = Utc.from_utc_datetime(&naive_datetime);
958                Ok(ExprValue::DateTime(datetime))
959            }
960            _ => Ok(ExprValue::Null),
961        }
962    }
963
964    /// Evaluate a CASE expression as a boolean (for WHERE clauses)
965    fn evaluate_case_expression_as_bool(
966        &self,
967        when_branches: &[crate::sql::recursive_parser::WhenBranch],
968        else_branch: &Option<Box<SqlExpression>>,
969        row_index: usize,
970    ) -> Result<bool> {
971        debug!(
972            "RecursiveWhereEvaluator: evaluating CASE expression as bool for row {}",
973            row_index
974        );
975
976        // Evaluate each WHEN condition in order
977        for branch in when_branches {
978            // Evaluate the condition as a boolean
979            let condition_result = self.evaluate_expression(&branch.condition, row_index)?;
980
981            if condition_result {
982                debug!("CASE: WHEN condition matched, evaluating result expression as bool");
983                // Evaluate the result and convert to boolean
984                return self.evaluate_expression_as_bool(&branch.result, row_index);
985            }
986        }
987
988        // If no WHEN condition matched, evaluate ELSE clause (or return false)
989        if let Some(else_expr) = else_branch {
990            debug!("CASE: No WHEN matched, evaluating ELSE expression as bool");
991            self.evaluate_expression_as_bool(else_expr, row_index)
992        } else {
993            debug!("CASE: No WHEN matched and no ELSE, returning false");
994            Ok(false)
995        }
996    }
997
998    /// Helper method to evaluate any expression as a boolean
999    fn evaluate_expression_as_bool(&self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
1000        match expr {
1001            // For expressions that naturally return booleans, use the existing evaluator
1002            SqlExpression::BinaryOp { .. }
1003            | SqlExpression::InList { .. }
1004            | SqlExpression::NotInList { .. }
1005            | SqlExpression::Between { .. }
1006            | SqlExpression::Not { .. }
1007            | SqlExpression::MethodCall { .. } => self.evaluate_expression(expr, row_index),
1008            // For CASE expressions, recurse
1009            SqlExpression::CaseExpression {
1010                when_branches,
1011                else_branch,
1012            } => self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index),
1013            // For other expressions (columns, literals), use ArithmeticEvaluator and convert
1014            _ => {
1015                // Use ArithmeticEvaluator to get the value, then convert to boolean
1016                let mut evaluator =
1017                    crate::data::arithmetic_evaluator::ArithmeticEvaluator::new(self.table);
1018                let value = evaluator.evaluate(expr, row_index)?;
1019
1020                match value {
1021                    crate::data::datatable::DataValue::Boolean(b) => Ok(b),
1022                    crate::data::datatable::DataValue::Integer(i) => Ok(i != 0),
1023                    crate::data::datatable::DataValue::Float(f) => Ok(f != 0.0),
1024                    crate::data::datatable::DataValue::Null => Ok(false),
1025                    crate::data::datatable::DataValue::String(s) => Ok(!s.is_empty()),
1026                    crate::data::datatable::DataValue::InternedString(s) => Ok(!s.is_empty()),
1027                    _ => Ok(true), // Other types are considered truthy
1028                }
1029            }
1030        }
1031    }
1032}
1033
/// Literal value produced by `RecursiveWhereEvaluator::extract_value` from a
/// WHERE-clause expression.
enum ExprValue {
    /// Text of a string literal, or the raw text of a number literal that did
    /// not parse as `f64`.
    String(String),
    /// Number literal parsed as `f64`.
    Number(f64),
    /// Boolean literal.
    Boolean(bool),
    /// Timestamp built from a date/time constructor or "today" expression.
    DateTime(DateTime<Utc>),
    /// Produced for every expression that has no literal form above.
    Null,
}