sql_cli/data/
recursive_where_evaluator.rs

1use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
2use crate::data::datatable::{DataTable, DataValue};
3use crate::data::evaluation_context::EvaluationContext;
4use crate::data::query_engine::ExecutionContext;
5use crate::data::value_comparisons::compare_with_op;
6use crate::sql::recursive_parser::{Condition, LogicalOp, SqlExpression, WhereClause};
7use anyhow::{anyhow, Result};
8use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
9use tracing::debug;
10
11/// Evaluates WHERE clauses from `recursive_parser` directly against `DataTable`
12pub struct RecursiveWhereEvaluator<'a, 'ctx, 'exec> {
13    table: &'a DataTable,
14    case_insensitive: bool,
15    context: Option<&'ctx mut EvaluationContext>,
16    exec_context: Option<&'exec ExecutionContext>,
17}
18
19impl<'a, 'ctx, 'exec> RecursiveWhereEvaluator<'a, 'ctx, 'exec> {
20    #[must_use]
21    pub fn new(table: &'a DataTable) -> RecursiveWhereEvaluator<'a, 'static, 'static> {
22        RecursiveWhereEvaluator {
23            table,
24            case_insensitive: false,
25            context: None,
26            exec_context: None,
27        }
28    }
29
30    /// Create evaluator with an evaluation context for caching
31    pub fn with_context(table: &'a DataTable, context: &'ctx mut EvaluationContext) -> Self {
32        let case_insensitive = context.is_case_insensitive();
33        Self {
34            table,
35            case_insensitive,
36            context: Some(context),
37            exec_context: None,
38        }
39    }
40
41    /// Create evaluator with execution context for alias resolution
42    pub fn with_exec_context(
43        table: &'a DataTable,
44        exec_context: &'exec ExecutionContext,
45        case_insensitive: bool,
46    ) -> Self {
47        Self {
48            table,
49            case_insensitive,
50            context: None,
51            exec_context: Some(exec_context),
52        }
53    }
54
55    /// Find a column name similar to the given name using edit distance
56    fn find_similar_column(&self, name: &str) -> Option<String> {
57        let columns = self.table.column_names();
58        let mut best_match: Option<(String, usize)> = None;
59
60        for col in columns {
61            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
62            // Only suggest if distance is small (likely a typo)
63            // Allow up to 3 edits for longer names
64            let max_distance = if name.len() > 10 { 3 } else { 2 };
65            if distance <= max_distance {
66                match &best_match {
67                    None => best_match = Some((col, distance)),
68                    Some((_, best_dist)) if distance < *best_dist => {
69                        best_match = Some((col, distance));
70                    }
71                    _ => {}
72                }
73            }
74        }
75
76        best_match.map(|(name, _)| name)
77    }
78
79    /// Calculate Levenshtein edit distance between two strings
80    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
81        let len1 = s1.len();
82        let len2 = s2.len();
83        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
84
85        for i in 0..=len1 {
86            matrix[i][0] = i;
87        }
88        for j in 0..=len2 {
89            matrix[0][j] = j;
90        }
91
92        for (i, c1) in s1.chars().enumerate() {
93            for (j, c2) in s2.chars().enumerate() {
94                let cost = usize::from(c1 != c2);
95                matrix[i + 1][j + 1] = std::cmp::min(
96                    matrix[i][j + 1] + 1, // deletion
97                    std::cmp::min(
98                        matrix[i + 1][j] + 1, // insertion
99                        matrix[i][j] + cost,  // substitution
100                    ),
101                );
102            }
103        }
104
105        matrix[len1][len2]
106    }
107
108    #[must_use]
109    pub fn with_case_insensitive(
110        table: &'a DataTable,
111        case_insensitive: bool,
112    ) -> RecursiveWhereEvaluator<'a, 'static, 'static> {
113        RecursiveWhereEvaluator {
114            table,
115            case_insensitive,
116            context: None,
117            exec_context: None,
118        }
119    }
120
121    #[must_use]
122    pub fn with_config(
123        table: &'a DataTable,
124        case_insensitive: bool,
125        _date_notation: String, // No longer needed since we use centralized parse_datetime
126    ) -> RecursiveWhereEvaluator<'a, 'static, 'static> {
127        RecursiveWhereEvaluator {
128            table,
129            case_insensitive,
130            context: None,
131            exec_context: None,
132        }
133    }
134
135    /// Convert ExprValue to DataValue for centralized comparison
136    fn expr_value_to_data_value(&self, expr_value: &ExprValue) -> DataValue {
137        match expr_value {
138            ExprValue::String(s) => DataValue::String(s.clone()),
139            ExprValue::Number(n) => {
140                // Check if it's an integer or float
141                if n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
142                    DataValue::Integer(*n as i64)
143                } else {
144                    DataValue::Float(*n)
145                }
146            }
147            ExprValue::Boolean(b) => DataValue::Boolean(*b),
148            ExprValue::DateTime(dt) => {
149                DataValue::DateTime(dt.format("%Y-%m-%d %H:%M:%S%.3f").to_string())
150            }
151            ExprValue::Null => DataValue::Null,
152        }
153    }
154
155    /// Evaluate the `Length()` method on a column value
156    fn evaluate_length(
157        &self,
158        object: &str,
159        row_index: usize,
160    ) -> Result<(Option<DataValue>, String)> {
161        // Handle qualified column names (table.column or alias.column)
162        let resolved_column = if object.contains('.') {
163            if let Some(dot_pos) = object.rfind('.') {
164                let col_name = &object[dot_pos + 1..];
165                col_name
166            } else {
167                object
168            }
169        } else {
170            object
171        };
172
173        let col_index = if let Some(idx) = self.table.get_column_index(resolved_column) {
174            idx
175        } else if resolved_column != object {
176            // If not found, try the original name
177            if let Some(idx) = self.table.get_column_index(object) {
178                idx
179            } else {
180                let suggestion = self.find_similar_column(resolved_column);
181                return Err(match suggestion {
182                    Some(similar) => {
183                        anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
184                    }
185                    None => anyhow!("Column '{}' not found", object),
186                });
187            }
188        } else {
189            let suggestion = self.find_similar_column(resolved_column);
190            return Err(match suggestion {
191                Some(similar) => {
192                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
193                }
194                None => anyhow!("Column '{}' not found", object),
195            });
196        };
197
198        let value = self.table.get_value(row_index, col_index);
199        let length_value = match value {
200            Some(DataValue::String(s)) => Some(DataValue::Integer(s.len() as i64)),
201            Some(DataValue::InternedString(s)) => Some(DataValue::Integer(s.len() as i64)),
202            Some(DataValue::Integer(n)) => Some(DataValue::Integer(n.to_string().len() as i64)),
203            Some(DataValue::Float(f)) => Some(DataValue::Integer(f.to_string().len() as i64)),
204            _ => Some(DataValue::Integer(0)),
205        };
206        Ok((length_value, format!("{object}.Length()")))
207    }
208
209    /// Evaluate the `IndexOf()` method on a column value
210    fn evaluate_indexof(
211        &self,
212        object: &str,
213        search_str: &str,
214        row_index: usize,
215    ) -> Result<(Option<DataValue>, String)> {
216        // Handle qualified column names (table.column or alias.column)
217        let resolved_column = if object.contains('.') {
218            if let Some(dot_pos) = object.rfind('.') {
219                let col_name = &object[dot_pos + 1..];
220                col_name
221            } else {
222                object
223            }
224        } else {
225            object
226        };
227
228        let col_index = if let Some(idx) = self.table.get_column_index(resolved_column) {
229            idx
230        } else if resolved_column != object {
231            // If not found, try the original name
232            if let Some(idx) = self.table.get_column_index(object) {
233                idx
234            } else {
235                let suggestion = self.find_similar_column(resolved_column);
236                return Err(match suggestion {
237                    Some(similar) => {
238                        anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
239                    }
240                    None => anyhow!("Column '{}' not found", object),
241                });
242            }
243        } else {
244            let suggestion = self.find_similar_column(resolved_column);
245            return Err(match suggestion {
246                Some(similar) => {
247                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
248                }
249                None => anyhow!("Column '{}' not found", object),
250            });
251        };
252
253        let value = self.table.get_value(row_index, col_index);
254        let index_value = match value {
255            Some(DataValue::String(s)) => {
256                // Case-insensitive search by default, following Contains behavior
257                let pos = s
258                    .to_lowercase()
259                    .find(&search_str.to_lowercase())
260                    .map_or(-1, |idx| idx as i64);
261                Some(DataValue::Integer(pos))
262            }
263            Some(DataValue::InternedString(s)) => {
264                let pos = s
265                    .to_lowercase()
266                    .find(&search_str.to_lowercase())
267                    .map_or(-1, |idx| idx as i64);
268                Some(DataValue::Integer(pos))
269            }
270            Some(DataValue::Integer(n)) => {
271                let str_val = n.to_string();
272                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
273                Some(DataValue::Integer(pos))
274            }
275            Some(DataValue::Float(f)) => {
276                let str_val = f.to_string();
277                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
278                Some(DataValue::Integer(pos))
279            }
280            _ => Some(DataValue::Integer(-1)), // Return -1 for not found
281        };
282
283        if row_index < 3 {
284            debug!(
285                "RecursiveWhereEvaluator: Row {} IndexOf('{}') = {:?}",
286                row_index, search_str, index_value
287            );
288        }
289        Ok((index_value, format!("{object}.IndexOf('{search_str}')")))
290    }
291
292    /// Apply the appropriate trim operation based on `trim_type`
293    fn apply_trim<'b>(s: &'b str, trim_type: &str) -> &'b str {
294        match trim_type {
295            "trim" => s.trim(),
296            "trimstart" => s.trim_start(),
297            "trimend" => s.trim_end(),
298            _ => s,
299        }
300    }
301
302    /// Evaluate trim methods (Trim, `TrimStart`, `TrimEnd`) on a column value
303    fn evaluate_trim(
304        &self,
305        object: &str,
306        row_index: usize,
307        trim_type: &str,
308    ) -> Result<(Option<DataValue>, String)> {
309        // Handle qualified column names (table.column or alias.column)
310        let resolved_column = if object.contains('.') {
311            if let Some(dot_pos) = object.rfind('.') {
312                let col_name = &object[dot_pos + 1..];
313                col_name
314            } else {
315                object
316            }
317        } else {
318            object
319        };
320
321        let col_index = if let Some(idx) = self.table.get_column_index(resolved_column) {
322            idx
323        } else if resolved_column != object {
324            // If not found, try the original name
325            if let Some(idx) = self.table.get_column_index(object) {
326                idx
327            } else {
328                let suggestion = self.find_similar_column(resolved_column);
329                return Err(match suggestion {
330                    Some(similar) => {
331                        anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
332                    }
333                    None => anyhow!("Column '{}' not found", object),
334                });
335            }
336        } else {
337            let suggestion = self.find_similar_column(resolved_column);
338            return Err(match suggestion {
339                Some(similar) => {
340                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
341                }
342                None => anyhow!("Column '{}' not found", object),
343            });
344        };
345
346        let value = self.table.get_value(row_index, col_index);
347        let trimmed_value = match value {
348            Some(DataValue::String(s)) => Some(DataValue::String(
349                Self::apply_trim(s, trim_type).to_string(),
350            )),
351            Some(DataValue::InternedString(s)) => Some(DataValue::String(
352                Self::apply_trim(s, trim_type).to_string(),
353            )),
354            Some(DataValue::Integer(n)) => {
355                let str_val = n.to_string();
356                Some(DataValue::String(
357                    Self::apply_trim(&str_val, trim_type).to_string(),
358                ))
359            }
360            Some(DataValue::Float(f)) => {
361                let str_val = f.to_string();
362                Some(DataValue::String(
363                    Self::apply_trim(&str_val, trim_type).to_string(),
364                ))
365            }
366            _ => Some(DataValue::String(String::new())),
367        };
368
369        let method_name = match trim_type {
370            "trim" => "Trim",
371            "trimstart" => "TrimStart",
372            "trimend" => "TrimEnd",
373            _ => "Trim",
374        };
375        Ok((trimmed_value, format!("{object}.{method_name}()")))
376    }
377
378    /// Evaluate a WHERE clause for a specific row
379    pub fn evaluate(&mut self, where_clause: &WhereClause, row_index: usize) -> Result<bool> {
380        // Only log for first few rows to avoid performance impact
381        if row_index < 3 {
382            debug!(
383                "RecursiveWhereEvaluator: evaluate() ENTRY - row {}, {} conditions, case_insensitive={}",
384                row_index,
385                where_clause.conditions.len(),
386                self.case_insensitive
387            );
388        }
389
390        if where_clause.conditions.is_empty() {
391            if row_index < 3 {
392                debug!("RecursiveWhereEvaluator: evaluate() EXIT - no conditions, returning true");
393            }
394            return Ok(true);
395        }
396
397        // With the new expression tree structure, we should have a single condition
398        // containing the entire WHERE clause expression tree
399        if where_clause.conditions.len() == 1 {
400            // New structure: single expression tree
401            if row_index < 3 {
402                debug!(
403                    "RecursiveWhereEvaluator: evaluate() - evaluating expression tree for row {}",
404                    row_index
405                );
406            }
407            self.evaluate_condition(&where_clause.conditions[0], row_index)
408        } else {
409            // Legacy structure: multiple conditions with connectors
410            // This path is kept for backward compatibility
411            if row_index < 3 {
412                debug!(
413                    "RecursiveWhereEvaluator: evaluate() - evaluating {} conditions with connectors for row {}",
414                    where_clause.conditions.len(),
415                    row_index
416                );
417            }
418            let mut result = self.evaluate_condition(&where_clause.conditions[0], row_index)?;
419
420            // Apply connectors (AND/OR) with subsequent conditions
421            for i in 1..where_clause.conditions.len() {
422                let next_result =
423                    self.evaluate_condition(&where_clause.conditions[i], row_index)?;
424
425                // Use the connector from the previous condition
426                if let Some(connector) = &where_clause.conditions[i - 1].connector {
427                    result = match connector {
428                        LogicalOp::And => result && next_result,
429                        LogicalOp::Or => result || next_result,
430                    };
431                }
432            }
433
434            Ok(result)
435        }
436    }
437
438    fn evaluate_condition(&mut self, condition: &Condition, row_index: usize) -> Result<bool> {
439        // Only log first few rows to avoid performance impact
440        if row_index < 3 {
441            debug!(
442                "RecursiveWhereEvaluator: evaluate_condition() ENTRY - row {}",
443                row_index
444            );
445        }
446        let result = self.evaluate_expression(&condition.expr, row_index);
447        if row_index < 3 {
448            debug!(
449                "RecursiveWhereEvaluator: evaluate_condition() EXIT - row {}, result = {:?}",
450                row_index, result
451            );
452        }
453        result
454    }
455
456    fn evaluate_expression(&mut self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
457        // Only log first few rows to avoid performance impact
458        if row_index < 3 {
459            debug!(
460                "RecursiveWhereEvaluator: evaluate_expression() ENTRY - row {}, expr = {:?}",
461                row_index, expr
462            );
463        }
464
465        let result = match expr {
466            SqlExpression::BinaryOp { left, op, right } => {
467                self.evaluate_binary_op(left, op, right, row_index)
468            }
469            SqlExpression::InList { expr, values } => {
470                self.evaluate_in_list(expr, values, row_index, false)
471            }
472            SqlExpression::NotInList { expr, values } => {
473                let in_result = self.evaluate_in_list(expr, values, row_index, false)?;
474                Ok(!in_result)
475            }
476            SqlExpression::Between { expr, lower, upper } => {
477                self.evaluate_between(expr, lower, upper, row_index)
478            }
479            SqlExpression::Not { expr } => {
480                let inner_result = self.evaluate_expression(expr, row_index)?;
481                Ok(!inner_result)
482            }
483            SqlExpression::MethodCall {
484                object,
485                method,
486                args,
487            } => {
488                if row_index < 3 {
489                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found MethodCall, delegating to evaluate_method_call");
490                }
491                self.evaluate_method_call(object, method, args, row_index)
492            }
493            SqlExpression::CaseExpression {
494                when_branches,
495                else_branch,
496            } => {
497                if row_index < 3 {
498                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found CaseExpression, evaluating");
499                }
500                self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index)
501            }
502            _ => {
503                if row_index < 3 {
504                    debug!("RecursiveWhereEvaluator: evaluate_expression() - unsupported expression type, returning false");
505                }
506                Ok(false) // Default to false for unsupported expressions
507            }
508        };
509
510        if row_index < 3 {
511            debug!(
512                "RecursiveWhereEvaluator: evaluate_expression() EXIT - row {}, result = {:?}",
513                row_index, result
514            );
515        }
516        result
517    }
518
519    fn evaluate_binary_op(
520        &mut self,
521        left: &SqlExpression,
522        op: &str,
523        right: &SqlExpression,
524        row_index: usize,
525    ) -> Result<bool> {
526        // Only log first few rows to avoid performance impact
527        if row_index < 3 {
528            debug!(
529                "RecursiveWhereEvaluator: evaluate_binary_op() ENTRY - row {}, op = '{}'",
530                row_index, op
531            );
532        }
533
534        // Handle logical operators (AND, OR) specially
535        if op.to_uppercase() == "OR" || op.to_uppercase() == "AND" {
536            let left_result = self.evaluate_expression(left, row_index)?;
537            let right_result = self.evaluate_expression(right, row_index)?;
538
539            return Ok(match op.to_uppercase().as_str() {
540                "OR" => left_result || right_result,
541                "AND" => left_result && right_result,
542                _ => unreachable!(),
543            });
544        }
545
546        // For complex expressions (arithmetic, functions), use ArithmeticEvaluator
547        if matches!(left, SqlExpression::BinaryOp { .. })
548            || matches!(left, SqlExpression::FunctionCall { .. })
549            || matches!(right, SqlExpression::BinaryOp { .. })
550            || matches!(right, SqlExpression::FunctionCall { .. })
551        {
552            let comparison_expr = SqlExpression::BinaryOp {
553                left: Box::new(left.clone()),
554                op: op.to_string(),
555                right: Box::new(right.clone()),
556            };
557
558            let mut evaluator = ArithmeticEvaluator::new(self.table);
559            let result = evaluator.evaluate(&comparison_expr, row_index)?;
560
561            // Convert the result to boolean
562            return match result {
563                DataValue::Boolean(b) => Ok(b),
564                DataValue::Null => Ok(false),
565                _ => Err(anyhow!("Comparison did not return a boolean value")),
566            };
567        }
568
569        // For simple comparisons, use the original WHERE clause logic with improved date parsing
570        // Handle left side - could be a column or a method call
571        let (cell_value, column_name) = match left {
572            SqlExpression::MethodCall {
573                object,
574                method,
575                args,
576            } => {
577                // Handle method calls that return values (like Length(), IndexOf())
578                match method.to_lowercase().as_str() {
579                    "length" => {
580                        if !args.is_empty() {
581                            return Err(anyhow::anyhow!("Length() takes no arguments"));
582                        }
583                        self.evaluate_length(object, row_index)?
584                    }
585                    "indexof" => {
586                        if args.len() != 1 {
587                            return Err(anyhow::anyhow!("IndexOf() requires exactly 1 argument"));
588                        }
589                        let search_str = self.extract_string_value(&args[0])?;
590                        self.evaluate_indexof(object, &search_str, row_index)?
591                    }
592                    "trim" => {
593                        if !args.is_empty() {
594                            return Err(anyhow::anyhow!("Trim() takes no arguments"));
595                        }
596                        self.evaluate_trim(object, row_index, "trim")?
597                    }
598                    "trimstart" => {
599                        if !args.is_empty() {
600                            return Err(anyhow::anyhow!("TrimStart() takes no arguments"));
601                        }
602                        self.evaluate_trim(object, row_index, "trimstart")?
603                    }
604                    "trimend" => {
605                        if !args.is_empty() {
606                            return Err(anyhow::anyhow!("TrimEnd() takes no arguments"));
607                        }
608                        self.evaluate_trim(object, row_index, "trimend")?
609                    }
610                    _ => {
611                        return Err(anyhow::anyhow!(
612                            "Method '{}' cannot be used in comparisons",
613                            method
614                        ));
615                    }
616                }
617            }
618            _ => {
619                // Regular column reference
620                let column_name = self.extract_column_name(left)?;
621                if row_index < 3 {
622                    debug!(
623                        "RecursiveWhereEvaluator: evaluate_binary_op() - column_name = '{}'",
624                        column_name
625                    );
626                }
627
628                let col_index = self.table.get_column_index(&column_name).ok_or_else(|| {
629                    let suggestion = self.find_similar_column(&column_name);
630                    match suggestion {
631                        Some(similar) => anyhow!(
632                            "Column '{}' not found. Did you mean '{}'?",
633                            column_name,
634                            similar
635                        ),
636                        None => anyhow!("Column '{}' not found", column_name),
637                    }
638                })?;
639
640                let cell_value = self.table.get_value(row_index, col_index).cloned();
641                (cell_value, column_name)
642            }
643        };
644
645        if row_index < 3 {
646            debug!(
647                "RecursiveWhereEvaluator: evaluate_binary_op() - row {} column '{}' value = {:?}",
648                row_index, column_name, cell_value
649            );
650        }
651
652        // Get comparison value from right side
653        let compare_value = self.extract_value(right)?;
654
655        // Handle special operators that aren't standard comparisons
656        let op_upper = op.to_uppercase();
657        match op_upper.as_str() {
658            // LIKE operator - handle specially
659            "LIKE" => {
660                let table_value = cell_value.unwrap_or(DataValue::Null);
661                let pattern = match compare_value {
662                    ExprValue::String(s) => s,
663                    _ => return Ok(false),
664                };
665
666                let text = match &table_value {
667                    DataValue::String(s) => s.as_str(),
668                    DataValue::InternedString(s) => s.as_str(),
669                    _ => return Ok(false),
670                };
671
672                // Use cached regex if context is available, otherwise compile fresh
673                if let Some(ctx) = &mut self.context {
674                    let regex = ctx
675                        .get_or_compile_like_regex(&pattern)
676                        .map_err(|e| anyhow::anyhow!("{}", e))?;
677                    Ok(regex.is_match(text))
678                } else {
679                    // Fallback to compiling regex each time (old behavior)
680                    let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
681                    let regex = regex::RegexBuilder::new(&format!("^{regex_pattern}$"))
682                        .case_insensitive(self.case_insensitive)
683                        .build()
684                        .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
685                    Ok(regex.is_match(text))
686                }
687            }
688
689            // IS NULL / IS NOT NULL
690            "IS NULL" => Ok(cell_value.is_none() || matches!(cell_value, Some(DataValue::Null))),
691            "IS NOT NULL" => {
692                Ok(cell_value.is_some() && !matches!(cell_value, Some(DataValue::Null)))
693            }
694
695            // Handle IS / IS NOT with NULL explicitly
696            "IS" if matches!(compare_value, ExprValue::Null) => {
697                Ok(cell_value.is_none() || matches!(cell_value, Some(DataValue::Null)))
698            }
699            "IS NOT" if matches!(compare_value, ExprValue::Null) => {
700                Ok(cell_value.is_some() && !matches!(cell_value, Some(DataValue::Null)))
701            }
702
703            // Standard comparison operators - use centralized logic
704            _ => {
705                let table_value = cell_value.unwrap_or(DataValue::Null);
706                let comparison_value = self.expr_value_to_data_value(&compare_value);
707
708                if row_index < 3 {
709                    debug!(
710                        "RecursiveWhereEvaluator: Using centralized comparison - table: {:?}, op: '{}', comparison: {:?}, case_insensitive: {}",
711                        table_value, op, comparison_value, self.case_insensitive
712                    );
713                }
714
715                Ok(compare_with_op(
716                    &table_value,
717                    &comparison_value,
718                    op,
719                    self.case_insensitive,
720                ))
721            }
722        }
723    }
724
725    fn evaluate_in_list(
726        &self,
727        expr: &SqlExpression,
728        values: &[SqlExpression],
729        row_index: usize,
730        _ignore_case: bool,
731    ) -> Result<bool> {
732        let column_name = self.extract_column_name(expr)?;
733        let col_index = self
734            .table
735            .get_column_index(&column_name)
736            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
737
738        let cell_value = self.table.get_value(row_index, col_index).cloned();
739
740        for value_expr in values {
741            let compare_value = self.extract_value(value_expr)?;
742            let table_value = cell_value.as_ref().unwrap_or(&DataValue::Null);
743            let comparison_value = self.expr_value_to_data_value(&compare_value);
744
745            // Use centralized comparison for equality
746            if compare_with_op(table_value, &comparison_value, "=", self.case_insensitive) {
747                return Ok(true);
748            }
749        }
750
751        Ok(false)
752    }
753
754    fn evaluate_between(
755        &self,
756        expr: &SqlExpression,
757        lower: &SqlExpression,
758        upper: &SqlExpression,
759        row_index: usize,
760    ) -> Result<bool> {
761        let column_name = self.extract_column_name(expr)?;
762        let col_index = self
763            .table
764            .get_column_index(&column_name)
765            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
766
767        let cell_value = self.table.get_value(row_index, col_index).cloned();
768        let lower_value = self.extract_value(lower)?;
769        let upper_value = self.extract_value(upper)?;
770
771        let table_value = cell_value.unwrap_or(DataValue::Null);
772        let lower_data_value = self.expr_value_to_data_value(&lower_value);
773        let upper_data_value = self.expr_value_to_data_value(&upper_value);
774
775        // Use centralized comparison for BETWEEN: value >= lower AND value <= upper
776        let ge_lower =
777            compare_with_op(&table_value, &lower_data_value, ">=", self.case_insensitive);
778        let le_upper =
779            compare_with_op(&table_value, &upper_data_value, "<=", self.case_insensitive);
780
781        Ok(ge_lower && le_upper)
782    }
783
784    fn evaluate_method_call(
785        &self,
786        object: &str,
787        method: &str,
788        args: &[SqlExpression],
789        row_index: usize,
790    ) -> Result<bool> {
791        if row_index < 3 {
792            debug!(
793                "RecursiveWhereEvaluator: evaluate_method_call - object='{}', method='{}', row={}",
794                object, method, row_index
795            );
796        }
797
798        // Get column value
799        let col_index = self.table.get_column_index(object).ok_or_else(|| {
800            let suggestion = self.find_similar_column(object);
801            match suggestion {
802                Some(similar) => {
803                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
804                }
805                None => anyhow!("Column '{}' not found", object),
806            }
807        })?;
808
809        let cell_value = self.table.get_value(row_index, col_index).cloned();
810        if row_index < 3 {
811            debug!(
812                "RecursiveWhereEvaluator: Row {} column '{}' value = {:?}",
813                row_index, object, cell_value
814            );
815        }
816
817        match method.to_lowercase().as_str() {
818            "contains" => {
819                if args.len() != 1 {
820                    return Err(anyhow::anyhow!("Contains requires exactly 1 argument"));
821                }
822                let search_str = self.extract_string_value(&args[0])?;
823                // Pre-compute lowercase once instead of for every row
824                let search_lower = search_str.to_lowercase();
825
826                // Type coercion: convert numeric values to strings for string methods
827                match cell_value {
828                    Some(DataValue::String(ref s)) => {
829                        let result = s.to_lowercase().contains(&search_lower);
830                        // Only log first few rows to avoid performance impact
831                        if row_index < 3 {
832                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on '{}' = {} (case-insensitive)", row_index, search_str, s, result);
833                        }
834                        Ok(result)
835                    }
836                    Some(DataValue::InternedString(ref s)) => {
837                        let result = s.to_lowercase().contains(&search_lower);
838                        // Only log first few rows to avoid performance impact
839                        if row_index < 3 {
840                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on interned '{}' = {} (case-insensitive)", row_index, search_str, s, result);
841                        }
842                        Ok(result)
843                    }
844                    Some(DataValue::Integer(n)) => {
845                        let str_val = n.to_string();
846                        let result = str_val.contains(&search_str);
847                        if row_index < 3 {
848                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on integer '{}' = {}", row_index, search_str, str_val, result);
849                        }
850                        Ok(result)
851                    }
852                    Some(DataValue::Float(f)) => {
853                        let str_val = f.to_string();
854                        let result = str_val.contains(&search_str);
855                        if row_index < 3 {
856                            debug!(
857                                "RecursiveWhereEvaluator: Row {} contains('{}') on float '{}' = {}",
858                                row_index, search_str, str_val, result
859                            );
860                        }
861                        Ok(result)
862                    }
863                    Some(DataValue::Boolean(b)) => {
864                        let str_val = b.to_string();
865                        let result = str_val.contains(&search_str);
866                        if row_index < 3 {
867                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on boolean '{}' = {}", row_index, search_str, str_val, result);
868                        }
869                        Ok(result)
870                    }
871                    Some(DataValue::DateTime(dt)) => {
872                        // DateTime columns can use string methods via coercion
873                        let result = dt.contains(&search_str);
874                        if row_index < 3 {
875                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on datetime '{}' = {}", row_index, search_str, dt, result);
876                        }
877                        Ok(result)
878                    }
879                    _ => {
880                        if row_index < 3 {
881                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on null/empty value = false", row_index, search_str);
882                        }
883                        Ok(false)
884                    }
885                }
886            }
887            "startswith" => {
888                if args.len() != 1 {
889                    return Err(anyhow::anyhow!("StartsWith requires exactly 1 argument"));
890                }
891                let prefix = self.extract_string_value(&args[0])?;
892
893                // Type coercion: convert numeric values to strings for string methods
894                match cell_value {
895                    Some(DataValue::String(ref s)) => {
896                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
897                    }
898                    Some(DataValue::InternedString(ref s)) => {
899                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
900                    }
901                    Some(DataValue::Integer(n)) => Ok(n.to_string().starts_with(&prefix)),
902                    Some(DataValue::Float(f)) => Ok(f.to_string().starts_with(&prefix)),
903                    Some(DataValue::Boolean(b)) => Ok(b.to_string().starts_with(&prefix)),
904                    Some(DataValue::DateTime(dt)) => Ok(dt.starts_with(&prefix)),
905                    _ => Ok(false),
906                }
907            }
908            "endswith" => {
909                if args.len() != 1 {
910                    return Err(anyhow::anyhow!("EndsWith requires exactly 1 argument"));
911                }
912                let suffix = self.extract_string_value(&args[0])?;
913
914                // Type coercion: convert numeric values to strings for string methods
915                match cell_value {
916                    Some(DataValue::String(ref s)) => {
917                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
918                    }
919                    Some(DataValue::InternedString(ref s)) => {
920                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
921                    }
922                    Some(DataValue::Integer(n)) => Ok(n.to_string().ends_with(&suffix)),
923                    Some(DataValue::Float(f)) => Ok(f.to_string().ends_with(&suffix)),
924                    Some(DataValue::Boolean(b)) => Ok(b.to_string().ends_with(&suffix)),
925                    Some(DataValue::DateTime(dt)) => Ok(dt.ends_with(&suffix)),
926                    _ => Ok(false),
927                }
928            }
929            _ => Err(anyhow::anyhow!("Unsupported method: {}", method)),
930        }
931    }
932
933    fn extract_column_name(&self, expr: &SqlExpression) -> Result<String> {
934        match expr {
935            SqlExpression::Column(column_ref) => {
936                // Use ExecutionContext for proper alias resolution if available
937                if let Some(exec_ctx) = self.exec_context {
938                    // Use the unified column resolution
939                    let col_idx = exec_ctx.resolve_column_index(self.table, column_ref)?;
940                    // Return the actual column name (not the qualified one)
941                    Ok(self.table.column_names()[col_idx].clone())
942                } else {
943                    // Fallback: Handle qualified column names the old way (for backward compatibility)
944                    if column_ref.name.contains('.') {
945                        if let Some(dot_pos) = column_ref.name.rfind('.') {
946                            Ok(column_ref.name[dot_pos + 1..].to_string())
947                        } else {
948                            Ok(column_ref.name.clone())
949                        }
950                    } else {
951                        Ok(column_ref.name.clone())
952                    }
953                }
954            }
955            _ => Err(anyhow::anyhow!("Expected column name, got: {:?}", expr)),
956        }
957    }
958
959    fn extract_string_value(&self, expr: &SqlExpression) -> Result<String> {
960        match expr {
961            SqlExpression::StringLiteral(s) => Ok(s.clone()),
962            _ => Err(anyhow::anyhow!("Expected string literal, got: {:?}", expr)),
963        }
964    }
965
966    fn extract_value(&self, expr: &SqlExpression) -> Result<ExprValue> {
967        match expr {
968            SqlExpression::StringLiteral(s) => Ok(ExprValue::String(s.clone())),
969            SqlExpression::BooleanLiteral(b) => Ok(ExprValue::Boolean(*b)),
970            SqlExpression::NumberLiteral(n) => {
971                if let Ok(num) = n.parse::<f64>() {
972                    Ok(ExprValue::Number(num))
973                } else {
974                    Ok(ExprValue::String(n.clone()))
975                }
976            }
977            SqlExpression::DateTimeConstructor {
978                year,
979                month,
980                day,
981                hour,
982                minute,
983                second,
984            } => {
985                // Create a DateTime from the constructor
986                let naive_date = NaiveDate::from_ymd_opt(*year, *month, *day)
987                    .ok_or_else(|| anyhow::anyhow!("Invalid date: {}-{}-{}", year, month, day))?;
988                let naive_time = NaiveTime::from_hms_opt(
989                    hour.unwrap_or(0),
990                    minute.unwrap_or(0),
991                    second.unwrap_or(0),
992                )
993                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
994                let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
995                let datetime = Utc.from_utc_datetime(&naive_datetime);
996                Ok(ExprValue::DateTime(datetime))
997            }
998            SqlExpression::DateTimeToday {
999                hour,
1000                minute,
1001                second,
1002            } => {
1003                // Get today's date with optional time
1004                let today = Local::now().date_naive();
1005                let time = NaiveTime::from_hms_opt(
1006                    hour.unwrap_or(0),
1007                    minute.unwrap_or(0),
1008                    second.unwrap_or(0),
1009                )
1010                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
1011                let naive_datetime = NaiveDateTime::new(today, time);
1012                let datetime = Utc.from_utc_datetime(&naive_datetime);
1013                Ok(ExprValue::DateTime(datetime))
1014            }
1015            _ => Ok(ExprValue::Null),
1016        }
1017    }
1018
1019    /// Evaluate a CASE expression as a boolean (for WHERE clauses)
1020    fn evaluate_case_expression_as_bool(
1021        &mut self,
1022        when_branches: &[crate::sql::recursive_parser::WhenBranch],
1023        else_branch: &Option<Box<SqlExpression>>,
1024        row_index: usize,
1025    ) -> Result<bool> {
1026        debug!(
1027            "RecursiveWhereEvaluator: evaluating CASE expression as bool for row {}",
1028            row_index
1029        );
1030
1031        // Evaluate each WHEN condition in order
1032        for branch in when_branches {
1033            // Evaluate the condition as a boolean
1034            let condition_result = self.evaluate_expression(&branch.condition, row_index)?;
1035
1036            if condition_result {
1037                debug!("CASE: WHEN condition matched, evaluating result expression as bool");
1038                // Evaluate the result and convert to boolean
1039                return self.evaluate_expression_as_bool(&branch.result, row_index);
1040            }
1041        }
1042
1043        // If no WHEN condition matched, evaluate ELSE clause (or return false)
1044        if let Some(else_expr) = else_branch {
1045            debug!("CASE: No WHEN matched, evaluating ELSE expression as bool");
1046            self.evaluate_expression_as_bool(else_expr, row_index)
1047        } else {
1048            debug!("CASE: No WHEN matched and no ELSE, returning false");
1049            Ok(false)
1050        }
1051    }
1052
1053    /// Helper method to evaluate any expression as a boolean
1054    fn evaluate_expression_as_bool(
1055        &mut self,
1056        expr: &SqlExpression,
1057        row_index: usize,
1058    ) -> Result<bool> {
1059        match expr {
1060            // For expressions that naturally return booleans, use the existing evaluator
1061            SqlExpression::BinaryOp { .. }
1062            | SqlExpression::InList { .. }
1063            | SqlExpression::NotInList { .. }
1064            | SqlExpression::Between { .. }
1065            | SqlExpression::Not { .. }
1066            | SqlExpression::MethodCall { .. } => self.evaluate_expression(expr, row_index),
1067            // For CASE expressions, recurse
1068            SqlExpression::CaseExpression {
1069                when_branches,
1070                else_branch,
1071            } => self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index),
1072            // For other expressions (columns, literals), use ArithmeticEvaluator and convert
1073            _ => {
1074                // Use ArithmeticEvaluator to get the value, then convert to boolean
1075                let mut evaluator =
1076                    crate::data::arithmetic_evaluator::ArithmeticEvaluator::new(self.table);
1077                let value = evaluator.evaluate(expr, row_index)?;
1078
1079                match value {
1080                    crate::data::datatable::DataValue::Boolean(b) => Ok(b),
1081                    crate::data::datatable::DataValue::Integer(i) => Ok(i != 0),
1082                    crate::data::datatable::DataValue::Float(f) => Ok(f != 0.0),
1083                    crate::data::datatable::DataValue::Null => Ok(false),
1084                    crate::data::datatable::DataValue::String(s) => Ok(!s.is_empty()),
1085                    crate::data::datatable::DataValue::InternedString(s) => Ok(!s.is_empty()),
1086                    _ => Ok(true), // Other types are considered truthy
1087                }
1088            }
1089        }
1090    }
1091}
1092
/// Literal value extracted from a WHERE-clause expression by `extract_value`,
/// before it is converted to a `DataValue` for comparison.
enum ExprValue {
    /// Text from a string literal, or a number literal that did not parse as `f64`.
    String(String),
    /// A number literal parsed as `f64`.
    Number(f64),
    /// A boolean literal.
    Boolean(bool),
    /// A timestamp built from the datetime constructor or the "today" form.
    DateTime(DateTime<Utc>),
    /// Any expression that `extract_value` does not reduce to a literal.
    Null,
}