sql_cli/data/
recursive_where_evaluator.rs

1use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
2use crate::data::datatable::{DataTable, DataValue};
3use crate::data::evaluation_context::EvaluationContext;
4use crate::data::value_comparisons::compare_with_op;
5use crate::sql::recursive_parser::{Condition, LogicalOp, SqlExpression, WhereClause};
6use anyhow::{anyhow, Result};
7use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
8use tracing::debug;
9
10/// Evaluates WHERE clauses from `recursive_parser` directly against `DataTable`
11pub struct RecursiveWhereEvaluator<'a, 'ctx> {
12    table: &'a DataTable,
13    case_insensitive: bool,
14    context: Option<&'ctx mut EvaluationContext>,
15}
16
17impl<'a, 'ctx> RecursiveWhereEvaluator<'a, 'ctx> {
18    #[must_use]
19    pub fn new(table: &'a DataTable) -> RecursiveWhereEvaluator<'a, 'static> {
20        RecursiveWhereEvaluator {
21            table,
22            case_insensitive: false,
23            context: None,
24        }
25    }
26
27    /// Create evaluator with an evaluation context for caching
28    pub fn with_context(table: &'a DataTable, context: &'ctx mut EvaluationContext) -> Self {
29        let case_insensitive = context.is_case_insensitive();
30        Self {
31            table,
32            case_insensitive,
33            context: Some(context),
34        }
35    }
36
37    /// Find a column name similar to the given name using edit distance
38    fn find_similar_column(&self, name: &str) -> Option<String> {
39        let columns = self.table.column_names();
40        let mut best_match: Option<(String, usize)> = None;
41
42        for col in columns {
43            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
44            // Only suggest if distance is small (likely a typo)
45            // Allow up to 3 edits for longer names
46            let max_distance = if name.len() > 10 { 3 } else { 2 };
47            if distance <= max_distance {
48                match &best_match {
49                    None => best_match = Some((col, distance)),
50                    Some((_, best_dist)) if distance < *best_dist => {
51                        best_match = Some((col, distance));
52                    }
53                    _ => {}
54                }
55            }
56        }
57
58        best_match.map(|(name, _)| name)
59    }
60
61    /// Calculate Levenshtein edit distance between two strings
62    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
63        let len1 = s1.len();
64        let len2 = s2.len();
65        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
66
67        for i in 0..=len1 {
68            matrix[i][0] = i;
69        }
70        for j in 0..=len2 {
71            matrix[0][j] = j;
72        }
73
74        for (i, c1) in s1.chars().enumerate() {
75            for (j, c2) in s2.chars().enumerate() {
76                let cost = usize::from(c1 != c2);
77                matrix[i + 1][j + 1] = std::cmp::min(
78                    matrix[i][j + 1] + 1, // deletion
79                    std::cmp::min(
80                        matrix[i + 1][j] + 1, // insertion
81                        matrix[i][j] + cost,  // substitution
82                    ),
83                );
84            }
85        }
86
87        matrix[len1][len2]
88    }
89
90    #[must_use]
91    pub fn with_case_insensitive(
92        table: &'a DataTable,
93        case_insensitive: bool,
94    ) -> RecursiveWhereEvaluator<'a, 'static> {
95        RecursiveWhereEvaluator {
96            table,
97            case_insensitive,
98            context: None,
99        }
100    }
101
102    #[must_use]
103    pub fn with_config(
104        table: &'a DataTable,
105        case_insensitive: bool,
106        _date_notation: String, // No longer needed since we use centralized parse_datetime
107    ) -> RecursiveWhereEvaluator<'a, 'static> {
108        RecursiveWhereEvaluator {
109            table,
110            case_insensitive,
111            context: None,
112        }
113    }
114
115    /// Convert ExprValue to DataValue for centralized comparison
116    fn expr_value_to_data_value(&self, expr_value: &ExprValue) -> DataValue {
117        match expr_value {
118            ExprValue::String(s) => DataValue::String(s.clone()),
119            ExprValue::Number(n) => {
120                // Check if it's an integer or float
121                if n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
122                    DataValue::Integer(*n as i64)
123                } else {
124                    DataValue::Float(*n)
125                }
126            }
127            ExprValue::Boolean(b) => DataValue::Boolean(*b),
128            ExprValue::DateTime(dt) => {
129                DataValue::DateTime(dt.format("%Y-%m-%d %H:%M:%S%.3f").to_string())
130            }
131            ExprValue::Null => DataValue::Null,
132        }
133    }
134
135    /// Evaluate the `Length()` method on a column value
136    fn evaluate_length(
137        &self,
138        object: &str,
139        row_index: usize,
140    ) -> Result<(Option<DataValue>, String)> {
141        // Handle qualified column names (table.column or alias.column)
142        let resolved_column = if object.contains('.') {
143            if let Some(dot_pos) = object.rfind('.') {
144                let col_name = &object[dot_pos + 1..];
145                col_name
146            } else {
147                object
148            }
149        } else {
150            object
151        };
152
153        let col_index = if let Some(idx) = self.table.get_column_index(resolved_column) {
154            idx
155        } else if resolved_column != object {
156            // If not found, try the original name
157            if let Some(idx) = self.table.get_column_index(object) {
158                idx
159            } else {
160                let suggestion = self.find_similar_column(resolved_column);
161                return Err(match suggestion {
162                    Some(similar) => {
163                        anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
164                    }
165                    None => anyhow!("Column '{}' not found", object),
166                });
167            }
168        } else {
169            let suggestion = self.find_similar_column(resolved_column);
170            return Err(match suggestion {
171                Some(similar) => {
172                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
173                }
174                None => anyhow!("Column '{}' not found", object),
175            });
176        };
177
178        let value = self.table.get_value(row_index, col_index);
179        let length_value = match value {
180            Some(DataValue::String(s)) => Some(DataValue::Integer(s.len() as i64)),
181            Some(DataValue::InternedString(s)) => Some(DataValue::Integer(s.len() as i64)),
182            Some(DataValue::Integer(n)) => Some(DataValue::Integer(n.to_string().len() as i64)),
183            Some(DataValue::Float(f)) => Some(DataValue::Integer(f.to_string().len() as i64)),
184            _ => Some(DataValue::Integer(0)),
185        };
186        Ok((length_value, format!("{object}.Length()")))
187    }
188
189    /// Evaluate the `IndexOf()` method on a column value
190    fn evaluate_indexof(
191        &self,
192        object: &str,
193        search_str: &str,
194        row_index: usize,
195    ) -> Result<(Option<DataValue>, String)> {
196        // Handle qualified column names (table.column or alias.column)
197        let resolved_column = if object.contains('.') {
198            if let Some(dot_pos) = object.rfind('.') {
199                let col_name = &object[dot_pos + 1..];
200                col_name
201            } else {
202                object
203            }
204        } else {
205            object
206        };
207
208        let col_index = if let Some(idx) = self.table.get_column_index(resolved_column) {
209            idx
210        } else if resolved_column != object {
211            // If not found, try the original name
212            if let Some(idx) = self.table.get_column_index(object) {
213                idx
214            } else {
215                let suggestion = self.find_similar_column(resolved_column);
216                return Err(match suggestion {
217                    Some(similar) => {
218                        anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
219                    }
220                    None => anyhow!("Column '{}' not found", object),
221                });
222            }
223        } else {
224            let suggestion = self.find_similar_column(resolved_column);
225            return Err(match suggestion {
226                Some(similar) => {
227                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
228                }
229                None => anyhow!("Column '{}' not found", object),
230            });
231        };
232
233        let value = self.table.get_value(row_index, col_index);
234        let index_value = match value {
235            Some(DataValue::String(s)) => {
236                // Case-insensitive search by default, following Contains behavior
237                let pos = s
238                    .to_lowercase()
239                    .find(&search_str.to_lowercase())
240                    .map_or(-1, |idx| idx as i64);
241                Some(DataValue::Integer(pos))
242            }
243            Some(DataValue::InternedString(s)) => {
244                let pos = s
245                    .to_lowercase()
246                    .find(&search_str.to_lowercase())
247                    .map_or(-1, |idx| idx as i64);
248                Some(DataValue::Integer(pos))
249            }
250            Some(DataValue::Integer(n)) => {
251                let str_val = n.to_string();
252                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
253                Some(DataValue::Integer(pos))
254            }
255            Some(DataValue::Float(f)) => {
256                let str_val = f.to_string();
257                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
258                Some(DataValue::Integer(pos))
259            }
260            _ => Some(DataValue::Integer(-1)), // Return -1 for not found
261        };
262
263        if row_index < 3 {
264            debug!(
265                "RecursiveWhereEvaluator: Row {} IndexOf('{}') = {:?}",
266                row_index, search_str, index_value
267            );
268        }
269        Ok((index_value, format!("{object}.IndexOf('{search_str}')")))
270    }
271
272    /// Apply the appropriate trim operation based on `trim_type`
273    fn apply_trim<'b>(s: &'b str, trim_type: &str) -> &'b str {
274        match trim_type {
275            "trim" => s.trim(),
276            "trimstart" => s.trim_start(),
277            "trimend" => s.trim_end(),
278            _ => s,
279        }
280    }
281
282    /// Evaluate trim methods (Trim, `TrimStart`, `TrimEnd`) on a column value
283    fn evaluate_trim(
284        &self,
285        object: &str,
286        row_index: usize,
287        trim_type: &str,
288    ) -> Result<(Option<DataValue>, String)> {
289        // Handle qualified column names (table.column or alias.column)
290        let resolved_column = if object.contains('.') {
291            if let Some(dot_pos) = object.rfind('.') {
292                let col_name = &object[dot_pos + 1..];
293                col_name
294            } else {
295                object
296            }
297        } else {
298            object
299        };
300
301        let col_index = if let Some(idx) = self.table.get_column_index(resolved_column) {
302            idx
303        } else if resolved_column != object {
304            // If not found, try the original name
305            if let Some(idx) = self.table.get_column_index(object) {
306                idx
307            } else {
308                let suggestion = self.find_similar_column(resolved_column);
309                return Err(match suggestion {
310                    Some(similar) => {
311                        anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
312                    }
313                    None => anyhow!("Column '{}' not found", object),
314                });
315            }
316        } else {
317            let suggestion = self.find_similar_column(resolved_column);
318            return Err(match suggestion {
319                Some(similar) => {
320                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
321                }
322                None => anyhow!("Column '{}' not found", object),
323            });
324        };
325
326        let value = self.table.get_value(row_index, col_index);
327        let trimmed_value = match value {
328            Some(DataValue::String(s)) => Some(DataValue::String(
329                Self::apply_trim(s, trim_type).to_string(),
330            )),
331            Some(DataValue::InternedString(s)) => Some(DataValue::String(
332                Self::apply_trim(s, trim_type).to_string(),
333            )),
334            Some(DataValue::Integer(n)) => {
335                let str_val = n.to_string();
336                Some(DataValue::String(
337                    Self::apply_trim(&str_val, trim_type).to_string(),
338                ))
339            }
340            Some(DataValue::Float(f)) => {
341                let str_val = f.to_string();
342                Some(DataValue::String(
343                    Self::apply_trim(&str_val, trim_type).to_string(),
344                ))
345            }
346            _ => Some(DataValue::String(String::new())),
347        };
348
349        let method_name = match trim_type {
350            "trim" => "Trim",
351            "trimstart" => "TrimStart",
352            "trimend" => "TrimEnd",
353            _ => "Trim",
354        };
355        Ok((trimmed_value, format!("{object}.{method_name}()")))
356    }
357
358    /// Evaluate a WHERE clause for a specific row
359    pub fn evaluate(&mut self, where_clause: &WhereClause, row_index: usize) -> Result<bool> {
360        // Only log for first few rows to avoid performance impact
361        if row_index < 3 {
362            debug!(
363                "RecursiveWhereEvaluator: evaluate() ENTRY - row {}, {} conditions, case_insensitive={}",
364                row_index,
365                where_clause.conditions.len(),
366                self.case_insensitive
367            );
368        }
369
370        if where_clause.conditions.is_empty() {
371            if row_index < 3 {
372                debug!("RecursiveWhereEvaluator: evaluate() EXIT - no conditions, returning true");
373            }
374            return Ok(true);
375        }
376
377        // With the new expression tree structure, we should have a single condition
378        // containing the entire WHERE clause expression tree
379        if where_clause.conditions.len() == 1 {
380            // New structure: single expression tree
381            if row_index < 3 {
382                debug!(
383                    "RecursiveWhereEvaluator: evaluate() - evaluating expression tree for row {}",
384                    row_index
385                );
386            }
387            self.evaluate_condition(&where_clause.conditions[0], row_index)
388        } else {
389            // Legacy structure: multiple conditions with connectors
390            // This path is kept for backward compatibility
391            if row_index < 3 {
392                debug!(
393                    "RecursiveWhereEvaluator: evaluate() - evaluating {} conditions with connectors for row {}",
394                    where_clause.conditions.len(),
395                    row_index
396                );
397            }
398            let mut result = self.evaluate_condition(&where_clause.conditions[0], row_index)?;
399
400            // Apply connectors (AND/OR) with subsequent conditions
401            for i in 1..where_clause.conditions.len() {
402                let next_result =
403                    self.evaluate_condition(&where_clause.conditions[i], row_index)?;
404
405                // Use the connector from the previous condition
406                if let Some(connector) = &where_clause.conditions[i - 1].connector {
407                    result = match connector {
408                        LogicalOp::And => result && next_result,
409                        LogicalOp::Or => result || next_result,
410                    };
411                }
412            }
413
414            Ok(result)
415        }
416    }
417
418    fn evaluate_condition(&mut self, condition: &Condition, row_index: usize) -> Result<bool> {
419        // Only log first few rows to avoid performance impact
420        if row_index < 3 {
421            debug!(
422                "RecursiveWhereEvaluator: evaluate_condition() ENTRY - row {}",
423                row_index
424            );
425        }
426        let result = self.evaluate_expression(&condition.expr, row_index);
427        if row_index < 3 {
428            debug!(
429                "RecursiveWhereEvaluator: evaluate_condition() EXIT - row {}, result = {:?}",
430                row_index, result
431            );
432        }
433        result
434    }
435
436    fn evaluate_expression(&mut self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
437        // Only log first few rows to avoid performance impact
438        if row_index < 3 {
439            debug!(
440                "RecursiveWhereEvaluator: evaluate_expression() ENTRY - row {}, expr = {:?}",
441                row_index, expr
442            );
443        }
444
445        let result = match expr {
446            SqlExpression::BinaryOp { left, op, right } => {
447                self.evaluate_binary_op(left, op, right, row_index)
448            }
449            SqlExpression::InList { expr, values } => {
450                self.evaluate_in_list(expr, values, row_index, false)
451            }
452            SqlExpression::NotInList { expr, values } => {
453                let in_result = self.evaluate_in_list(expr, values, row_index, false)?;
454                Ok(!in_result)
455            }
456            SqlExpression::Between { expr, lower, upper } => {
457                self.evaluate_between(expr, lower, upper, row_index)
458            }
459            SqlExpression::Not { expr } => {
460                let inner_result = self.evaluate_expression(expr, row_index)?;
461                Ok(!inner_result)
462            }
463            SqlExpression::MethodCall {
464                object,
465                method,
466                args,
467            } => {
468                if row_index < 3 {
469                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found MethodCall, delegating to evaluate_method_call");
470                }
471                self.evaluate_method_call(object, method, args, row_index)
472            }
473            SqlExpression::CaseExpression {
474                when_branches,
475                else_branch,
476            } => {
477                if row_index < 3 {
478                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found CaseExpression, evaluating");
479                }
480                self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index)
481            }
482            _ => {
483                if row_index < 3 {
484                    debug!("RecursiveWhereEvaluator: evaluate_expression() - unsupported expression type, returning false");
485                }
486                Ok(false) // Default to false for unsupported expressions
487            }
488        };
489
490        if row_index < 3 {
491            debug!(
492                "RecursiveWhereEvaluator: evaluate_expression() EXIT - row {}, result = {:?}",
493                row_index, result
494            );
495        }
496        result
497    }
498
499    fn evaluate_binary_op(
500        &mut self,
501        left: &SqlExpression,
502        op: &str,
503        right: &SqlExpression,
504        row_index: usize,
505    ) -> Result<bool> {
506        // Only log first few rows to avoid performance impact
507        if row_index < 3 {
508            debug!(
509                "RecursiveWhereEvaluator: evaluate_binary_op() ENTRY - row {}, op = '{}'",
510                row_index, op
511            );
512        }
513
514        // Handle logical operators (AND, OR) specially
515        if op.to_uppercase() == "OR" || op.to_uppercase() == "AND" {
516            let left_result = self.evaluate_expression(left, row_index)?;
517            let right_result = self.evaluate_expression(right, row_index)?;
518
519            return Ok(match op.to_uppercase().as_str() {
520                "OR" => left_result || right_result,
521                "AND" => left_result && right_result,
522                _ => unreachable!(),
523            });
524        }
525
526        // For complex expressions (arithmetic, functions), use ArithmeticEvaluator
527        if matches!(left, SqlExpression::BinaryOp { .. })
528            || matches!(left, SqlExpression::FunctionCall { .. })
529            || matches!(right, SqlExpression::BinaryOp { .. })
530            || matches!(right, SqlExpression::FunctionCall { .. })
531        {
532            let comparison_expr = SqlExpression::BinaryOp {
533                left: Box::new(left.clone()),
534                op: op.to_string(),
535                right: Box::new(right.clone()),
536            };
537
538            let mut evaluator = ArithmeticEvaluator::new(self.table);
539            let result = evaluator.evaluate(&comparison_expr, row_index)?;
540
541            // Convert the result to boolean
542            return match result {
543                DataValue::Boolean(b) => Ok(b),
544                DataValue::Null => Ok(false),
545                _ => Err(anyhow!("Comparison did not return a boolean value")),
546            };
547        }
548
549        // For simple comparisons, use the original WHERE clause logic with improved date parsing
550        // Handle left side - could be a column or a method call
551        let (cell_value, column_name) = match left {
552            SqlExpression::MethodCall {
553                object,
554                method,
555                args,
556            } => {
557                // Handle method calls that return values (like Length(), IndexOf())
558                match method.to_lowercase().as_str() {
559                    "length" => {
560                        if !args.is_empty() {
561                            return Err(anyhow::anyhow!("Length() takes no arguments"));
562                        }
563                        self.evaluate_length(object, row_index)?
564                    }
565                    "indexof" => {
566                        if args.len() != 1 {
567                            return Err(anyhow::anyhow!("IndexOf() requires exactly 1 argument"));
568                        }
569                        let search_str = self.extract_string_value(&args[0])?;
570                        self.evaluate_indexof(object, &search_str, row_index)?
571                    }
572                    "trim" => {
573                        if !args.is_empty() {
574                            return Err(anyhow::anyhow!("Trim() takes no arguments"));
575                        }
576                        self.evaluate_trim(object, row_index, "trim")?
577                    }
578                    "trimstart" => {
579                        if !args.is_empty() {
580                            return Err(anyhow::anyhow!("TrimStart() takes no arguments"));
581                        }
582                        self.evaluate_trim(object, row_index, "trimstart")?
583                    }
584                    "trimend" => {
585                        if !args.is_empty() {
586                            return Err(anyhow::anyhow!("TrimEnd() takes no arguments"));
587                        }
588                        self.evaluate_trim(object, row_index, "trimend")?
589                    }
590                    _ => {
591                        return Err(anyhow::anyhow!(
592                            "Method '{}' cannot be used in comparisons",
593                            method
594                        ));
595                    }
596                }
597            }
598            _ => {
599                // Regular column reference
600                let column_name = self.extract_column_name(left)?;
601                if row_index < 3 {
602                    debug!(
603                        "RecursiveWhereEvaluator: evaluate_binary_op() - column_name = '{}'",
604                        column_name
605                    );
606                }
607
608                let col_index = self.table.get_column_index(&column_name).ok_or_else(|| {
609                    let suggestion = self.find_similar_column(&column_name);
610                    match suggestion {
611                        Some(similar) => anyhow!(
612                            "Column '{}' not found. Did you mean '{}'?",
613                            column_name,
614                            similar
615                        ),
616                        None => anyhow!("Column '{}' not found", column_name),
617                    }
618                })?;
619
620                let cell_value = self.table.get_value(row_index, col_index).cloned();
621                (cell_value, column_name)
622            }
623        };
624
625        if row_index < 3 {
626            debug!(
627                "RecursiveWhereEvaluator: evaluate_binary_op() - row {} column '{}' value = {:?}",
628                row_index, column_name, cell_value
629            );
630        }
631
632        // Get comparison value from right side
633        let compare_value = self.extract_value(right)?;
634
635        // Handle special operators that aren't standard comparisons
636        let op_upper = op.to_uppercase();
637        match op_upper.as_str() {
638            // LIKE operator - handle specially
639            "LIKE" => {
640                let table_value = cell_value.unwrap_or(DataValue::Null);
641                let pattern = match compare_value {
642                    ExprValue::String(s) => s,
643                    _ => return Ok(false),
644                };
645
646                let text = match &table_value {
647                    DataValue::String(s) => s.as_str(),
648                    DataValue::InternedString(s) => s.as_str(),
649                    _ => return Ok(false),
650                };
651
652                // Use cached regex if context is available, otherwise compile fresh
653                if let Some(ctx) = &mut self.context {
654                    let regex = ctx
655                        .get_or_compile_like_regex(&pattern)
656                        .map_err(|e| anyhow::anyhow!("{}", e))?;
657                    Ok(regex.is_match(text))
658                } else {
659                    // Fallback to compiling regex each time (old behavior)
660                    let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
661                    let regex = regex::RegexBuilder::new(&format!("^{regex_pattern}$"))
662                        .case_insensitive(self.case_insensitive)
663                        .build()
664                        .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
665                    Ok(regex.is_match(text))
666                }
667            }
668
669            // IS NULL / IS NOT NULL
670            "IS NULL" => Ok(cell_value.is_none() || matches!(cell_value, Some(DataValue::Null))),
671            "IS NOT NULL" => {
672                Ok(cell_value.is_some() && !matches!(cell_value, Some(DataValue::Null)))
673            }
674
675            // Handle IS / IS NOT with NULL explicitly
676            "IS" if matches!(compare_value, ExprValue::Null) => {
677                Ok(cell_value.is_none() || matches!(cell_value, Some(DataValue::Null)))
678            }
679            "IS NOT" if matches!(compare_value, ExprValue::Null) => {
680                Ok(cell_value.is_some() && !matches!(cell_value, Some(DataValue::Null)))
681            }
682
683            // Standard comparison operators - use centralized logic
684            _ => {
685                let table_value = cell_value.unwrap_or(DataValue::Null);
686                let comparison_value = self.expr_value_to_data_value(&compare_value);
687
688                if row_index < 3 {
689                    debug!(
690                        "RecursiveWhereEvaluator: Using centralized comparison - table: {:?}, op: '{}', comparison: {:?}, case_insensitive: {}",
691                        table_value, op, comparison_value, self.case_insensitive
692                    );
693                }
694
695                Ok(compare_with_op(
696                    &table_value,
697                    &comparison_value,
698                    op,
699                    self.case_insensitive,
700                ))
701            }
702        }
703    }
704
705    fn evaluate_in_list(
706        &self,
707        expr: &SqlExpression,
708        values: &[SqlExpression],
709        row_index: usize,
710        _ignore_case: bool,
711    ) -> Result<bool> {
712        let column_name = self.extract_column_name(expr)?;
713        let col_index = self
714            .table
715            .get_column_index(&column_name)
716            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
717
718        let cell_value = self.table.get_value(row_index, col_index).cloned();
719
720        for value_expr in values {
721            let compare_value = self.extract_value(value_expr)?;
722            let table_value = cell_value.as_ref().unwrap_or(&DataValue::Null);
723            let comparison_value = self.expr_value_to_data_value(&compare_value);
724
725            // Use centralized comparison for equality
726            if compare_with_op(table_value, &comparison_value, "=", self.case_insensitive) {
727                return Ok(true);
728            }
729        }
730
731        Ok(false)
732    }
733
734    fn evaluate_between(
735        &self,
736        expr: &SqlExpression,
737        lower: &SqlExpression,
738        upper: &SqlExpression,
739        row_index: usize,
740    ) -> Result<bool> {
741        let column_name = self.extract_column_name(expr)?;
742        let col_index = self
743            .table
744            .get_column_index(&column_name)
745            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
746
747        let cell_value = self.table.get_value(row_index, col_index).cloned();
748        let lower_value = self.extract_value(lower)?;
749        let upper_value = self.extract_value(upper)?;
750
751        let table_value = cell_value.unwrap_or(DataValue::Null);
752        let lower_data_value = self.expr_value_to_data_value(&lower_value);
753        let upper_data_value = self.expr_value_to_data_value(&upper_value);
754
755        // Use centralized comparison for BETWEEN: value >= lower AND value <= upper
756        let ge_lower =
757            compare_with_op(&table_value, &lower_data_value, ">=", self.case_insensitive);
758        let le_upper =
759            compare_with_op(&table_value, &upper_data_value, "<=", self.case_insensitive);
760
761        Ok(ge_lower && le_upper)
762    }
763
764    fn evaluate_method_call(
765        &self,
766        object: &str,
767        method: &str,
768        args: &[SqlExpression],
769        row_index: usize,
770    ) -> Result<bool> {
771        if row_index < 3 {
772            debug!(
773                "RecursiveWhereEvaluator: evaluate_method_call - object='{}', method='{}', row={}",
774                object, method, row_index
775            );
776        }
777
778        // Get column value
779        let col_index = self.table.get_column_index(object).ok_or_else(|| {
780            let suggestion = self.find_similar_column(object);
781            match suggestion {
782                Some(similar) => {
783                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
784                }
785                None => anyhow!("Column '{}' not found", object),
786            }
787        })?;
788
789        let cell_value = self.table.get_value(row_index, col_index).cloned();
790        if row_index < 3 {
791            debug!(
792                "RecursiveWhereEvaluator: Row {} column '{}' value = {:?}",
793                row_index, object, cell_value
794            );
795        }
796
797        match method.to_lowercase().as_str() {
798            "contains" => {
799                if args.len() != 1 {
800                    return Err(anyhow::anyhow!("Contains requires exactly 1 argument"));
801                }
802                let search_str = self.extract_string_value(&args[0])?;
803                // Pre-compute lowercase once instead of for every row
804                let search_lower = search_str.to_lowercase();
805
806                // Type coercion: convert numeric values to strings for string methods
807                match cell_value {
808                    Some(DataValue::String(ref s)) => {
809                        let result = s.to_lowercase().contains(&search_lower);
810                        // Only log first few rows to avoid performance impact
811                        if row_index < 3 {
812                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on '{}' = {} (case-insensitive)", row_index, search_str, s, result);
813                        }
814                        Ok(result)
815                    }
816                    Some(DataValue::InternedString(ref s)) => {
817                        let result = s.to_lowercase().contains(&search_lower);
818                        // Only log first few rows to avoid performance impact
819                        if row_index < 3 {
820                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on interned '{}' = {} (case-insensitive)", row_index, search_str, s, result);
821                        }
822                        Ok(result)
823                    }
824                    Some(DataValue::Integer(n)) => {
825                        let str_val = n.to_string();
826                        let result = str_val.contains(&search_str);
827                        if row_index < 3 {
828                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on integer '{}' = {}", row_index, search_str, str_val, result);
829                        }
830                        Ok(result)
831                    }
832                    Some(DataValue::Float(f)) => {
833                        let str_val = f.to_string();
834                        let result = str_val.contains(&search_str);
835                        if row_index < 3 {
836                            debug!(
837                                "RecursiveWhereEvaluator: Row {} contains('{}') on float '{}' = {}",
838                                row_index, search_str, str_val, result
839                            );
840                        }
841                        Ok(result)
842                    }
843                    Some(DataValue::Boolean(b)) => {
844                        let str_val = b.to_string();
845                        let result = str_val.contains(&search_str);
846                        if row_index < 3 {
847                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on boolean '{}' = {}", row_index, search_str, str_val, result);
848                        }
849                        Ok(result)
850                    }
851                    Some(DataValue::DateTime(dt)) => {
852                        // DateTime columns can use string methods via coercion
853                        let result = dt.contains(&search_str);
854                        if row_index < 3 {
855                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on datetime '{}' = {}", row_index, search_str, dt, result);
856                        }
857                        Ok(result)
858                    }
859                    _ => {
860                        if row_index < 3 {
861                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on null/empty value = false", row_index, search_str);
862                        }
863                        Ok(false)
864                    }
865                }
866            }
867            "startswith" => {
868                if args.len() != 1 {
869                    return Err(anyhow::anyhow!("StartsWith requires exactly 1 argument"));
870                }
871                let prefix = self.extract_string_value(&args[0])?;
872
873                // Type coercion: convert numeric values to strings for string methods
874                match cell_value {
875                    Some(DataValue::String(ref s)) => {
876                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
877                    }
878                    Some(DataValue::InternedString(ref s)) => {
879                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
880                    }
881                    Some(DataValue::Integer(n)) => Ok(n.to_string().starts_with(&prefix)),
882                    Some(DataValue::Float(f)) => Ok(f.to_string().starts_with(&prefix)),
883                    Some(DataValue::Boolean(b)) => Ok(b.to_string().starts_with(&prefix)),
884                    Some(DataValue::DateTime(dt)) => Ok(dt.starts_with(&prefix)),
885                    _ => Ok(false),
886                }
887            }
888            "endswith" => {
889                if args.len() != 1 {
890                    return Err(anyhow::anyhow!("EndsWith requires exactly 1 argument"));
891                }
892                let suffix = self.extract_string_value(&args[0])?;
893
894                // Type coercion: convert numeric values to strings for string methods
895                match cell_value {
896                    Some(DataValue::String(ref s)) => {
897                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
898                    }
899                    Some(DataValue::InternedString(ref s)) => {
900                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
901                    }
902                    Some(DataValue::Integer(n)) => Ok(n.to_string().ends_with(&suffix)),
903                    Some(DataValue::Float(f)) => Ok(f.to_string().ends_with(&suffix)),
904                    Some(DataValue::Boolean(b)) => Ok(b.to_string().ends_with(&suffix)),
905                    Some(DataValue::DateTime(dt)) => Ok(dt.ends_with(&suffix)),
906                    _ => Ok(false),
907                }
908            }
909            _ => Err(anyhow::anyhow!("Unsupported method: {}", method)),
910        }
911    }
912
913    fn extract_column_name(&self, expr: &SqlExpression) -> Result<String> {
914        match expr {
915            SqlExpression::Column(column_ref) => {
916                // Handle qualified column names
917                if column_ref.name.contains('.') {
918                    if let Some(dot_pos) = column_ref.name.rfind('.') {
919                        Ok(column_ref.name[dot_pos + 1..].to_string())
920                    } else {
921                        Ok(column_ref.name.clone())
922                    }
923                } else {
924                    Ok(column_ref.name.clone())
925                }
926            }
927            _ => Err(anyhow::anyhow!("Expected column name, got: {:?}", expr)),
928        }
929    }
930
931    fn extract_string_value(&self, expr: &SqlExpression) -> Result<String> {
932        match expr {
933            SqlExpression::StringLiteral(s) => Ok(s.clone()),
934            _ => Err(anyhow::anyhow!("Expected string literal, got: {:?}", expr)),
935        }
936    }
937
938    fn extract_value(&self, expr: &SqlExpression) -> Result<ExprValue> {
939        match expr {
940            SqlExpression::StringLiteral(s) => Ok(ExprValue::String(s.clone())),
941            SqlExpression::BooleanLiteral(b) => Ok(ExprValue::Boolean(*b)),
942            SqlExpression::NumberLiteral(n) => {
943                if let Ok(num) = n.parse::<f64>() {
944                    Ok(ExprValue::Number(num))
945                } else {
946                    Ok(ExprValue::String(n.clone()))
947                }
948            }
949            SqlExpression::DateTimeConstructor {
950                year,
951                month,
952                day,
953                hour,
954                minute,
955                second,
956            } => {
957                // Create a DateTime from the constructor
958                let naive_date = NaiveDate::from_ymd_opt(*year, *month, *day)
959                    .ok_or_else(|| anyhow::anyhow!("Invalid date: {}-{}-{}", year, month, day))?;
960                let naive_time = NaiveTime::from_hms_opt(
961                    hour.unwrap_or(0),
962                    minute.unwrap_or(0),
963                    second.unwrap_or(0),
964                )
965                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
966                let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
967                let datetime = Utc.from_utc_datetime(&naive_datetime);
968                Ok(ExprValue::DateTime(datetime))
969            }
970            SqlExpression::DateTimeToday {
971                hour,
972                minute,
973                second,
974            } => {
975                // Get today's date with optional time
976                let today = Local::now().date_naive();
977                let time = NaiveTime::from_hms_opt(
978                    hour.unwrap_or(0),
979                    minute.unwrap_or(0),
980                    second.unwrap_or(0),
981                )
982                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
983                let naive_datetime = NaiveDateTime::new(today, time);
984                let datetime = Utc.from_utc_datetime(&naive_datetime);
985                Ok(ExprValue::DateTime(datetime))
986            }
987            _ => Ok(ExprValue::Null),
988        }
989    }
990
991    /// Evaluate a CASE expression as a boolean (for WHERE clauses)
992    fn evaluate_case_expression_as_bool(
993        &mut self,
994        when_branches: &[crate::sql::recursive_parser::WhenBranch],
995        else_branch: &Option<Box<SqlExpression>>,
996        row_index: usize,
997    ) -> Result<bool> {
998        debug!(
999            "RecursiveWhereEvaluator: evaluating CASE expression as bool for row {}",
1000            row_index
1001        );
1002
1003        // Evaluate each WHEN condition in order
1004        for branch in when_branches {
1005            // Evaluate the condition as a boolean
1006            let condition_result = self.evaluate_expression(&branch.condition, row_index)?;
1007
1008            if condition_result {
1009                debug!("CASE: WHEN condition matched, evaluating result expression as bool");
1010                // Evaluate the result and convert to boolean
1011                return self.evaluate_expression_as_bool(&branch.result, row_index);
1012            }
1013        }
1014
1015        // If no WHEN condition matched, evaluate ELSE clause (or return false)
1016        if let Some(else_expr) = else_branch {
1017            debug!("CASE: No WHEN matched, evaluating ELSE expression as bool");
1018            self.evaluate_expression_as_bool(else_expr, row_index)
1019        } else {
1020            debug!("CASE: No WHEN matched and no ELSE, returning false");
1021            Ok(false)
1022        }
1023    }
1024
1025    /// Helper method to evaluate any expression as a boolean
1026    fn evaluate_expression_as_bool(
1027        &mut self,
1028        expr: &SqlExpression,
1029        row_index: usize,
1030    ) -> Result<bool> {
1031        match expr {
1032            // For expressions that naturally return booleans, use the existing evaluator
1033            SqlExpression::BinaryOp { .. }
1034            | SqlExpression::InList { .. }
1035            | SqlExpression::NotInList { .. }
1036            | SqlExpression::Between { .. }
1037            | SqlExpression::Not { .. }
1038            | SqlExpression::MethodCall { .. } => self.evaluate_expression(expr, row_index),
1039            // For CASE expressions, recurse
1040            SqlExpression::CaseExpression {
1041                when_branches,
1042                else_branch,
1043            } => self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index),
1044            // For other expressions (columns, literals), use ArithmeticEvaluator and convert
1045            _ => {
1046                // Use ArithmeticEvaluator to get the value, then convert to boolean
1047                let mut evaluator =
1048                    crate::data::arithmetic_evaluator::ArithmeticEvaluator::new(self.table);
1049                let value = evaluator.evaluate(expr, row_index)?;
1050
1051                match value {
1052                    crate::data::datatable::DataValue::Boolean(b) => Ok(b),
1053                    crate::data::datatable::DataValue::Integer(i) => Ok(i != 0),
1054                    crate::data::datatable::DataValue::Float(f) => Ok(f != 0.0),
1055                    crate::data::datatable::DataValue::Null => Ok(false),
1056                    crate::data::datatable::DataValue::String(s) => Ok(!s.is_empty()),
1057                    crate::data::datatable::DataValue::InternedString(s) => Ok(!s.is_empty()),
1058                    _ => Ok(true), // Other types are considered truthy
1059                }
1060            }
1061        }
1062    }
1063}
1064
1065enum ExprValue {
1066    String(String),
1067    Number(f64),
1068    Boolean(bool),
1069    DateTime(DateTime<Utc>),
1070    Null,
1071}