sql_cli/data/
recursive_where_evaluator.rs

1use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
2use crate::data::datatable::{DataTable, DataValue};
3use crate::data::evaluation_context::EvaluationContext;
4use crate::data::query_engine::ExecutionContext;
5use crate::data::value_comparisons::compare_with_op;
6use crate::sql::recursive_parser::{Condition, LogicalOp, SqlExpression, WhereClause};
7use anyhow::{anyhow, Result};
8use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
9use tracing::debug;
10
11/// Evaluates WHERE clauses from `recursive_parser` directly against `DataTable`
12pub struct RecursiveWhereEvaluator<'a, 'ctx, 'exec> {
13    table: &'a DataTable,
14    case_insensitive: bool,
15    context: Option<&'ctx mut EvaluationContext>,
16    exec_context: Option<&'exec ExecutionContext>,
17}
18
19impl<'a, 'ctx, 'exec> RecursiveWhereEvaluator<'a, 'ctx, 'exec> {
20    #[must_use]
21    pub fn new(table: &'a DataTable) -> RecursiveWhereEvaluator<'a, 'static, 'static> {
22        RecursiveWhereEvaluator {
23            table,
24            case_insensitive: false,
25            context: None,
26            exec_context: None,
27        }
28    }
29
30    /// Create evaluator with an evaluation context for caching
31    pub fn with_context(table: &'a DataTable, context: &'ctx mut EvaluationContext) -> Self {
32        let case_insensitive = context.is_case_insensitive();
33        Self {
34            table,
35            case_insensitive,
36            context: Some(context),
37            exec_context: None,
38        }
39    }
40
41    /// Create evaluator with execution context for alias resolution
42    pub fn with_exec_context(
43        table: &'a DataTable,
44        exec_context: &'exec ExecutionContext,
45        case_insensitive: bool,
46    ) -> Self {
47        Self {
48            table,
49            case_insensitive,
50            context: None,
51            exec_context: Some(exec_context),
52        }
53    }
54
55    /// Create evaluator with both execution context (for alias resolution) and evaluation context (for regex caching)
56    pub fn with_both_contexts(
57        table: &'a DataTable,
58        context: &'ctx mut EvaluationContext,
59        exec_context: &'exec ExecutionContext,
60    ) -> Self {
61        let case_insensitive = context.is_case_insensitive();
62        Self {
63            table,
64            case_insensitive,
65            context: Some(context),
66            exec_context: Some(exec_context),
67        }
68    }
69
70    /// Find a column name similar to the given name using edit distance
71    fn find_similar_column(&self, name: &str) -> Option<String> {
72        let columns = self.table.column_names();
73        let mut best_match: Option<(String, usize)> = None;
74
75        for col in columns {
76            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
77            // Only suggest if distance is small (likely a typo)
78            // Allow up to 3 edits for longer names
79            let max_distance = if name.len() > 10 { 3 } else { 2 };
80            if distance <= max_distance {
81                match &best_match {
82                    None => best_match = Some((col, distance)),
83                    Some((_, best_dist)) if distance < *best_dist => {
84                        best_match = Some((col, distance));
85                    }
86                    _ => {}
87                }
88            }
89        }
90
91        best_match.map(|(name, _)| name)
92    }
93
94    /// Calculate Levenshtein edit distance between two strings
95    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
96        let len1 = s1.len();
97        let len2 = s2.len();
98        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
99
100        for i in 0..=len1 {
101            matrix[i][0] = i;
102        }
103        for j in 0..=len2 {
104            matrix[0][j] = j;
105        }
106
107        for (i, c1) in s1.chars().enumerate() {
108            for (j, c2) in s2.chars().enumerate() {
109                let cost = usize::from(c1 != c2);
110                matrix[i + 1][j + 1] = std::cmp::min(
111                    matrix[i][j + 1] + 1, // deletion
112                    std::cmp::min(
113                        matrix[i + 1][j] + 1, // insertion
114                        matrix[i][j] + cost,  // substitution
115                    ),
116                );
117            }
118        }
119
120        matrix[len1][len2]
121    }
122
123    #[must_use]
124    pub fn with_case_insensitive(
125        table: &'a DataTable,
126        case_insensitive: bool,
127    ) -> RecursiveWhereEvaluator<'a, 'static, 'static> {
128        RecursiveWhereEvaluator {
129            table,
130            case_insensitive,
131            context: None,
132            exec_context: None,
133        }
134    }
135
136    #[must_use]
137    pub fn with_config(
138        table: &'a DataTable,
139        case_insensitive: bool,
140        _date_notation: String, // No longer needed since we use centralized parse_datetime
141    ) -> RecursiveWhereEvaluator<'a, 'static, 'static> {
142        RecursiveWhereEvaluator {
143            table,
144            case_insensitive,
145            context: None,
146            exec_context: None,
147        }
148    }
149
150    /// Convert ExprValue to DataValue for centralized comparison
151    fn expr_value_to_data_value(&self, expr_value: &ExprValue) -> DataValue {
152        match expr_value {
153            ExprValue::String(s) => DataValue::String(s.clone()),
154            ExprValue::Number(n) => {
155                // Check if it's an integer or float
156                if n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
157                    DataValue::Integer(*n as i64)
158                } else {
159                    DataValue::Float(*n)
160                }
161            }
162            ExprValue::Boolean(b) => DataValue::Boolean(*b),
163            ExprValue::DateTime(dt) => {
164                DataValue::DateTime(dt.format("%Y-%m-%d %H:%M:%S%.3f").to_string())
165            }
166            ExprValue::Null => DataValue::Null,
167        }
168    }
169
170    /// Evaluate the `Length()` method on a column value
171    fn evaluate_length(
172        &self,
173        object: &str,
174        row_index: usize,
175    ) -> Result<(Option<DataValue>, String)> {
176        // Handle qualified column names (table.column or alias.column)
177        let resolved_column = if object.contains('.') {
178            if let Some(dot_pos) = object.rfind('.') {
179                let col_name = &object[dot_pos + 1..];
180                col_name
181            } else {
182                object
183            }
184        } else {
185            object
186        };
187
188        let col_index = if let Some(idx) = self.table.get_column_index(resolved_column) {
189            idx
190        } else if resolved_column != object {
191            // If not found, try the original name
192            if let Some(idx) = self.table.get_column_index(object) {
193                idx
194            } else {
195                let suggestion = self.find_similar_column(resolved_column);
196                return Err(match suggestion {
197                    Some(similar) => {
198                        anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
199                    }
200                    None => anyhow!("Column '{}' not found", object),
201                });
202            }
203        } else {
204            let suggestion = self.find_similar_column(resolved_column);
205            return Err(match suggestion {
206                Some(similar) => {
207                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
208                }
209                None => anyhow!("Column '{}' not found", object),
210            });
211        };
212
213        let value = self.table.get_value(row_index, col_index);
214        let length_value = match value {
215            Some(DataValue::String(s)) => Some(DataValue::Integer(s.len() as i64)),
216            Some(DataValue::InternedString(s)) => Some(DataValue::Integer(s.len() as i64)),
217            Some(DataValue::Integer(n)) => Some(DataValue::Integer(n.to_string().len() as i64)),
218            Some(DataValue::Float(f)) => Some(DataValue::Integer(f.to_string().len() as i64)),
219            _ => Some(DataValue::Integer(0)),
220        };
221        Ok((length_value, format!("{object}.Length()")))
222    }
223
224    /// Evaluate the `IndexOf()` method on a column value
225    fn evaluate_indexof(
226        &self,
227        object: &str,
228        search_str: &str,
229        row_index: usize,
230    ) -> Result<(Option<DataValue>, String)> {
231        // Handle qualified column names (table.column or alias.column)
232        let resolved_column = if object.contains('.') {
233            if let Some(dot_pos) = object.rfind('.') {
234                let col_name = &object[dot_pos + 1..];
235                col_name
236            } else {
237                object
238            }
239        } else {
240            object
241        };
242
243        let col_index = if let Some(idx) = self.table.get_column_index(resolved_column) {
244            idx
245        } else if resolved_column != object {
246            // If not found, try the original name
247            if let Some(idx) = self.table.get_column_index(object) {
248                idx
249            } else {
250                let suggestion = self.find_similar_column(resolved_column);
251                return Err(match suggestion {
252                    Some(similar) => {
253                        anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
254                    }
255                    None => anyhow!("Column '{}' not found", object),
256                });
257            }
258        } else {
259            let suggestion = self.find_similar_column(resolved_column);
260            return Err(match suggestion {
261                Some(similar) => {
262                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
263                }
264                None => anyhow!("Column '{}' not found", object),
265            });
266        };
267
268        let value = self.table.get_value(row_index, col_index);
269        let index_value = match value {
270            Some(DataValue::String(s)) => {
271                // Case-insensitive search by default, following Contains behavior
272                let pos = s
273                    .to_lowercase()
274                    .find(&search_str.to_lowercase())
275                    .map_or(-1, |idx| idx as i64);
276                Some(DataValue::Integer(pos))
277            }
278            Some(DataValue::InternedString(s)) => {
279                let pos = s
280                    .to_lowercase()
281                    .find(&search_str.to_lowercase())
282                    .map_or(-1, |idx| idx as i64);
283                Some(DataValue::Integer(pos))
284            }
285            Some(DataValue::Integer(n)) => {
286                let str_val = n.to_string();
287                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
288                Some(DataValue::Integer(pos))
289            }
290            Some(DataValue::Float(f)) => {
291                let str_val = f.to_string();
292                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
293                Some(DataValue::Integer(pos))
294            }
295            _ => Some(DataValue::Integer(-1)), // Return -1 for not found
296        };
297
298        if row_index < 3 {
299            debug!(
300                "RecursiveWhereEvaluator: Row {} IndexOf('{}') = {:?}",
301                row_index, search_str, index_value
302            );
303        }
304        Ok((index_value, format!("{object}.IndexOf('{search_str}')")))
305    }
306
307    /// Apply the appropriate trim operation based on `trim_type`
308    fn apply_trim<'b>(s: &'b str, trim_type: &str) -> &'b str {
309        match trim_type {
310            "trim" => s.trim(),
311            "trimstart" => s.trim_start(),
312            "trimend" => s.trim_end(),
313            _ => s,
314        }
315    }
316
317    /// Evaluate trim methods (Trim, `TrimStart`, `TrimEnd`) on a column value
318    fn evaluate_trim(
319        &self,
320        object: &str,
321        row_index: usize,
322        trim_type: &str,
323    ) -> Result<(Option<DataValue>, String)> {
324        // Handle qualified column names (table.column or alias.column)
325        let resolved_column = if object.contains('.') {
326            if let Some(dot_pos) = object.rfind('.') {
327                let col_name = &object[dot_pos + 1..];
328                col_name
329            } else {
330                object
331            }
332        } else {
333            object
334        };
335
336        let col_index = if let Some(idx) = self.table.get_column_index(resolved_column) {
337            idx
338        } else if resolved_column != object {
339            // If not found, try the original name
340            if let Some(idx) = self.table.get_column_index(object) {
341                idx
342            } else {
343                let suggestion = self.find_similar_column(resolved_column);
344                return Err(match suggestion {
345                    Some(similar) => {
346                        anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
347                    }
348                    None => anyhow!("Column '{}' not found", object),
349                });
350            }
351        } else {
352            let suggestion = self.find_similar_column(resolved_column);
353            return Err(match suggestion {
354                Some(similar) => {
355                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
356                }
357                None => anyhow!("Column '{}' not found", object),
358            });
359        };
360
361        let value = self.table.get_value(row_index, col_index);
362        let trimmed_value = match value {
363            Some(DataValue::String(s)) => Some(DataValue::String(
364                Self::apply_trim(s, trim_type).to_string(),
365            )),
366            Some(DataValue::InternedString(s)) => Some(DataValue::String(
367                Self::apply_trim(s, trim_type).to_string(),
368            )),
369            Some(DataValue::Integer(n)) => {
370                let str_val = n.to_string();
371                Some(DataValue::String(
372                    Self::apply_trim(&str_val, trim_type).to_string(),
373                ))
374            }
375            Some(DataValue::Float(f)) => {
376                let str_val = f.to_string();
377                Some(DataValue::String(
378                    Self::apply_trim(&str_val, trim_type).to_string(),
379                ))
380            }
381            _ => Some(DataValue::String(String::new())),
382        };
383
384        let method_name = match trim_type {
385            "trim" => "Trim",
386            "trimstart" => "TrimStart",
387            "trimend" => "TrimEnd",
388            _ => "Trim",
389        };
390        Ok((trimmed_value, format!("{object}.{method_name}()")))
391    }
392
393    /// Evaluate a WHERE clause for a specific row
394    pub fn evaluate(&mut self, where_clause: &WhereClause, row_index: usize) -> Result<bool> {
395        // Only log for first few rows to avoid performance impact
396        if row_index < 3 {
397            debug!(
398                "RecursiveWhereEvaluator: evaluate() ENTRY - row {}, {} conditions, case_insensitive={}",
399                row_index,
400                where_clause.conditions.len(),
401                self.case_insensitive
402            );
403        }
404
405        if where_clause.conditions.is_empty() {
406            if row_index < 3 {
407                debug!("RecursiveWhereEvaluator: evaluate() EXIT - no conditions, returning true");
408            }
409            return Ok(true);
410        }
411
412        // With the new expression tree structure, we should have a single condition
413        // containing the entire WHERE clause expression tree
414        if where_clause.conditions.len() == 1 {
415            // New structure: single expression tree
416            if row_index < 3 {
417                debug!(
418                    "RecursiveWhereEvaluator: evaluate() - evaluating expression tree for row {}",
419                    row_index
420                );
421            }
422            self.evaluate_condition(&where_clause.conditions[0], row_index)
423        } else {
424            // Legacy structure: multiple conditions with connectors
425            // This path is kept for backward compatibility
426            if row_index < 3 {
427                debug!(
428                    "RecursiveWhereEvaluator: evaluate() - evaluating {} conditions with connectors for row {}",
429                    where_clause.conditions.len(),
430                    row_index
431                );
432            }
433            let mut result = self.evaluate_condition(&where_clause.conditions[0], row_index)?;
434
435            // Apply connectors (AND/OR) with subsequent conditions
436            for i in 1..where_clause.conditions.len() {
437                let next_result =
438                    self.evaluate_condition(&where_clause.conditions[i], row_index)?;
439
440                // Use the connector from the previous condition
441                if let Some(connector) = &where_clause.conditions[i - 1].connector {
442                    result = match connector {
443                        LogicalOp::And => result && next_result,
444                        LogicalOp::Or => result || next_result,
445                    };
446                }
447            }
448
449            Ok(result)
450        }
451    }
452
453    fn evaluate_condition(&mut self, condition: &Condition, row_index: usize) -> Result<bool> {
454        // Only log first few rows to avoid performance impact
455        if row_index < 3 {
456            debug!(
457                "RecursiveWhereEvaluator: evaluate_condition() ENTRY - row {}",
458                row_index
459            );
460        }
461        let result = self.evaluate_expression(&condition.expr, row_index);
462        if row_index < 3 {
463            debug!(
464                "RecursiveWhereEvaluator: evaluate_condition() EXIT - row {}, result = {:?}",
465                row_index, result
466            );
467        }
468        result
469    }
470
471    fn evaluate_expression(&mut self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
472        // Only log first few rows to avoid performance impact
473        if row_index < 3 {
474            debug!(
475                "RecursiveWhereEvaluator: evaluate_expression() ENTRY - row {}, expr = {:?}",
476                row_index, expr
477            );
478        }
479
480        let result = match expr {
481            SqlExpression::BinaryOp { left, op, right } => {
482                self.evaluate_binary_op(left, op, right, row_index)
483            }
484            SqlExpression::InList { expr, values } => {
485                self.evaluate_in_list(expr, values, row_index, false)
486            }
487            SqlExpression::NotInList { expr, values } => {
488                let in_result = self.evaluate_in_list(expr, values, row_index, false)?;
489                Ok(!in_result)
490            }
491            SqlExpression::Between { expr, lower, upper } => {
492                self.evaluate_between(expr, lower, upper, row_index)
493            }
494            SqlExpression::Not { expr } => {
495                let inner_result = self.evaluate_expression(expr, row_index)?;
496                Ok(!inner_result)
497            }
498            SqlExpression::MethodCall {
499                object,
500                method,
501                args,
502            } => {
503                if row_index < 3 {
504                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found MethodCall, delegating to evaluate_method_call");
505                }
506                self.evaluate_method_call(object, method, args, row_index)
507            }
508            SqlExpression::CaseExpression {
509                when_branches,
510                else_branch,
511            } => {
512                if row_index < 3 {
513                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found CaseExpression, evaluating");
514                }
515                self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index)
516            }
517            _ => {
518                if row_index < 3 {
519                    debug!("RecursiveWhereEvaluator: evaluate_expression() - unsupported expression type, returning false");
520                }
521                Ok(false) // Default to false for unsupported expressions
522            }
523        };
524
525        if row_index < 3 {
526            debug!(
527                "RecursiveWhereEvaluator: evaluate_expression() EXIT - row {}, result = {:?}",
528                row_index, result
529            );
530        }
531        result
532    }
533
534    fn evaluate_binary_op(
535        &mut self,
536        left: &SqlExpression,
537        op: &str,
538        right: &SqlExpression,
539        row_index: usize,
540    ) -> Result<bool> {
541        // Only log first few rows to avoid performance impact
542        if row_index < 3 {
543            debug!(
544                "RecursiveWhereEvaluator: evaluate_binary_op() ENTRY - row {}, op = '{}'",
545                row_index, op
546            );
547        }
548
549        // Handle logical operators (AND, OR) specially
550        if op.to_uppercase() == "OR" || op.to_uppercase() == "AND" {
551            let left_result = self.evaluate_expression(left, row_index)?;
552            let right_result = self.evaluate_expression(right, row_index)?;
553
554            return Ok(match op.to_uppercase().as_str() {
555                "OR" => left_result || right_result,
556                "AND" => left_result && right_result,
557                _ => unreachable!(),
558            });
559        }
560
561        // For complex expressions (arithmetic, functions), use ArithmeticEvaluator
562        if matches!(left, SqlExpression::BinaryOp { .. })
563            || matches!(left, SqlExpression::FunctionCall { .. })
564            || matches!(right, SqlExpression::BinaryOp { .. })
565            || matches!(right, SqlExpression::FunctionCall { .. })
566        {
567            let comparison_expr = SqlExpression::BinaryOp {
568                left: Box::new(left.clone()),
569                op: op.to_string(),
570                right: Box::new(right.clone()),
571            };
572
573            let mut evaluator = ArithmeticEvaluator::new(self.table);
574            let result = evaluator.evaluate(&comparison_expr, row_index)?;
575
576            // Convert the result to boolean
577            return match result {
578                DataValue::Boolean(b) => Ok(b),
579                DataValue::Null => Ok(false),
580                _ => Err(anyhow!("Comparison did not return a boolean value")),
581            };
582        }
583
584        // For simple comparisons, use the original WHERE clause logic with improved date parsing
585        // Handle left side - could be a column or a method call
586        let (cell_value, column_name) = match left {
587            SqlExpression::MethodCall {
588                object,
589                method,
590                args,
591            } => {
592                // Handle method calls that return values (like Length(), IndexOf())
593                match method.to_lowercase().as_str() {
594                    "length" => {
595                        if !args.is_empty() {
596                            return Err(anyhow::anyhow!("Length() takes no arguments"));
597                        }
598                        self.evaluate_length(object, row_index)?
599                    }
600                    "indexof" => {
601                        if args.len() != 1 {
602                            return Err(anyhow::anyhow!("IndexOf() requires exactly 1 argument"));
603                        }
604                        let search_str = self.extract_string_value(&args[0])?;
605                        self.evaluate_indexof(object, &search_str, row_index)?
606                    }
607                    "trim" => {
608                        if !args.is_empty() {
609                            return Err(anyhow::anyhow!("Trim() takes no arguments"));
610                        }
611                        self.evaluate_trim(object, row_index, "trim")?
612                    }
613                    "trimstart" => {
614                        if !args.is_empty() {
615                            return Err(anyhow::anyhow!("TrimStart() takes no arguments"));
616                        }
617                        self.evaluate_trim(object, row_index, "trimstart")?
618                    }
619                    "trimend" => {
620                        if !args.is_empty() {
621                            return Err(anyhow::anyhow!("TrimEnd() takes no arguments"));
622                        }
623                        self.evaluate_trim(object, row_index, "trimend")?
624                    }
625                    _ => {
626                        return Err(anyhow::anyhow!(
627                            "Method '{}' cannot be used in comparisons",
628                            method
629                        ));
630                    }
631                }
632            }
633            _ => {
634                // Regular column reference
635                let column_name = self.extract_column_name(left)?;
636                if row_index < 3 {
637                    debug!(
638                        "RecursiveWhereEvaluator: evaluate_binary_op() - column_name = '{}'",
639                        column_name
640                    );
641                }
642
643                let col_index = self.table.get_column_index(&column_name).ok_or_else(|| {
644                    let suggestion = self.find_similar_column(&column_name);
645                    match suggestion {
646                        Some(similar) => anyhow!(
647                            "Column '{}' not found. Did you mean '{}'?",
648                            column_name,
649                            similar
650                        ),
651                        None => anyhow!("Column '{}' not found", column_name),
652                    }
653                })?;
654
655                let cell_value = self.table.get_value(row_index, col_index).cloned();
656                (cell_value, column_name)
657            }
658        };
659
660        if row_index < 3 {
661            debug!(
662                "RecursiveWhereEvaluator: evaluate_binary_op() - row {} column '{}' value = {:?}",
663                row_index, column_name, cell_value
664            );
665        }
666
667        // Get comparison value from right side
668        let compare_value = self.extract_value(right)?;
669
670        // Handle special operators that aren't standard comparisons
671        let op_upper = op.to_uppercase();
672        match op_upper.as_str() {
673            // LIKE operator - handle specially
674            "LIKE" => {
675                let table_value = cell_value.unwrap_or(DataValue::Null);
676                let pattern = match compare_value {
677                    ExprValue::String(s) => s,
678                    _ => return Ok(false),
679                };
680
681                let text = match &table_value {
682                    DataValue::String(s) => s.as_str(),
683                    DataValue::InternedString(s) => s.as_str(),
684                    _ => return Ok(false),
685                };
686
687                // Use cached regex if context is available, otherwise compile fresh
688                if let Some(ctx) = &mut self.context {
689                    let regex = ctx
690                        .get_or_compile_like_regex(&pattern)
691                        .map_err(|e| anyhow::anyhow!("{}", e))?;
692                    Ok(regex.is_match(text))
693                } else {
694                    // Fallback to compiling regex each time (old behavior)
695                    let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
696                    let regex = regex::RegexBuilder::new(&format!("^{regex_pattern}$"))
697                        .case_insensitive(self.case_insensitive)
698                        .build()
699                        .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
700                    Ok(regex.is_match(text))
701                }
702            }
703
704            // IS NULL / IS NOT NULL
705            "IS NULL" => Ok(cell_value.is_none() || matches!(cell_value, Some(DataValue::Null))),
706            "IS NOT NULL" => {
707                Ok(cell_value.is_some() && !matches!(cell_value, Some(DataValue::Null)))
708            }
709
710            // Handle IS / IS NOT with NULL explicitly
711            "IS" if matches!(compare_value, ExprValue::Null) => {
712                Ok(cell_value.is_none() || matches!(cell_value, Some(DataValue::Null)))
713            }
714            "IS NOT" if matches!(compare_value, ExprValue::Null) => {
715                Ok(cell_value.is_some() && !matches!(cell_value, Some(DataValue::Null)))
716            }
717
718            // Standard comparison operators - use centralized logic
719            _ => {
720                let table_value = cell_value.unwrap_or(DataValue::Null);
721                let comparison_value = self.expr_value_to_data_value(&compare_value);
722
723                if row_index < 3 {
724                    debug!(
725                        "RecursiveWhereEvaluator: Using centralized comparison - table: {:?}, op: '{}', comparison: {:?}, case_insensitive: {}",
726                        table_value, op, comparison_value, self.case_insensitive
727                    );
728                }
729
730                Ok(compare_with_op(
731                    &table_value,
732                    &comparison_value,
733                    op,
734                    self.case_insensitive,
735                ))
736            }
737        }
738    }
739
740    fn evaluate_in_list(
741        &self,
742        expr: &SqlExpression,
743        values: &[SqlExpression],
744        row_index: usize,
745        _ignore_case: bool,
746    ) -> Result<bool> {
747        let column_name = self.extract_column_name(expr)?;
748        let col_index = self
749            .table
750            .get_column_index(&column_name)
751            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
752
753        let cell_value = self.table.get_value(row_index, col_index).cloned();
754
755        for value_expr in values {
756            let compare_value = self.extract_value(value_expr)?;
757            let table_value = cell_value.as_ref().unwrap_or(&DataValue::Null);
758            let comparison_value = self.expr_value_to_data_value(&compare_value);
759
760            // Use centralized comparison for equality
761            if compare_with_op(table_value, &comparison_value, "=", self.case_insensitive) {
762                return Ok(true);
763            }
764        }
765
766        Ok(false)
767    }
768
769    fn evaluate_between(
770        &self,
771        expr: &SqlExpression,
772        lower: &SqlExpression,
773        upper: &SqlExpression,
774        row_index: usize,
775    ) -> Result<bool> {
776        let column_name = self.extract_column_name(expr)?;
777        let col_index = self
778            .table
779            .get_column_index(&column_name)
780            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
781
782        let cell_value = self.table.get_value(row_index, col_index).cloned();
783        let lower_value = self.extract_value(lower)?;
784        let upper_value = self.extract_value(upper)?;
785
786        let table_value = cell_value.unwrap_or(DataValue::Null);
787        let lower_data_value = self.expr_value_to_data_value(&lower_value);
788        let upper_data_value = self.expr_value_to_data_value(&upper_value);
789
790        // Use centralized comparison for BETWEEN: value >= lower AND value <= upper
791        let ge_lower =
792            compare_with_op(&table_value, &lower_data_value, ">=", self.case_insensitive);
793        let le_upper =
794            compare_with_op(&table_value, &upper_data_value, "<=", self.case_insensitive);
795
796        Ok(ge_lower && le_upper)
797    }
798
799    fn evaluate_method_call(
800        &self,
801        object: &str,
802        method: &str,
803        args: &[SqlExpression],
804        row_index: usize,
805    ) -> Result<bool> {
806        if row_index < 3 {
807            debug!(
808                "RecursiveWhereEvaluator: evaluate_method_call - object='{}', method='{}', row={}",
809                object, method, row_index
810            );
811        }
812
813        // Get column value
814        let col_index = self.table.get_column_index(object).ok_or_else(|| {
815            let suggestion = self.find_similar_column(object);
816            match suggestion {
817                Some(similar) => {
818                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
819                }
820                None => anyhow!("Column '{}' not found", object),
821            }
822        })?;
823
824        let cell_value = self.table.get_value(row_index, col_index).cloned();
825        if row_index < 3 {
826            debug!(
827                "RecursiveWhereEvaluator: Row {} column '{}' value = {:?}",
828                row_index, object, cell_value
829            );
830        }
831
832        match method.to_lowercase().as_str() {
833            "contains" => {
834                if args.len() != 1 {
835                    return Err(anyhow::anyhow!("Contains requires exactly 1 argument"));
836                }
837                let search_str = self.extract_string_value(&args[0])?;
838                // Pre-compute lowercase once instead of for every row
839                let search_lower = search_str.to_lowercase();
840
841                // Type coercion: convert numeric values to strings for string methods
842                match cell_value {
843                    Some(DataValue::String(ref s)) => {
844                        let result = s.to_lowercase().contains(&search_lower);
845                        // Only log first few rows to avoid performance impact
846                        if row_index < 3 {
847                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on '{}' = {} (case-insensitive)", row_index, search_str, s, result);
848                        }
849                        Ok(result)
850                    }
851                    Some(DataValue::InternedString(ref s)) => {
852                        let result = s.to_lowercase().contains(&search_lower);
853                        // Only log first few rows to avoid performance impact
854                        if row_index < 3 {
855                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on interned '{}' = {} (case-insensitive)", row_index, search_str, s, result);
856                        }
857                        Ok(result)
858                    }
859                    Some(DataValue::Integer(n)) => {
860                        let str_val = n.to_string();
861                        let result = str_val.contains(&search_str);
862                        if row_index < 3 {
863                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on integer '{}' = {}", row_index, search_str, str_val, result);
864                        }
865                        Ok(result)
866                    }
867                    Some(DataValue::Float(f)) => {
868                        let str_val = f.to_string();
869                        let result = str_val.contains(&search_str);
870                        if row_index < 3 {
871                            debug!(
872                                "RecursiveWhereEvaluator: Row {} contains('{}') on float '{}' = {}",
873                                row_index, search_str, str_val, result
874                            );
875                        }
876                        Ok(result)
877                    }
878                    Some(DataValue::Boolean(b)) => {
879                        let str_val = b.to_string();
880                        let result = str_val.contains(&search_str);
881                        if row_index < 3 {
882                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on boolean '{}' = {}", row_index, search_str, str_val, result);
883                        }
884                        Ok(result)
885                    }
886                    Some(DataValue::DateTime(dt)) => {
887                        // DateTime columns can use string methods via coercion
888                        let result = dt.contains(&search_str);
889                        if row_index < 3 {
890                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on datetime '{}' = {}", row_index, search_str, dt, result);
891                        }
892                        Ok(result)
893                    }
894                    _ => {
895                        if row_index < 3 {
896                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on null/empty value = false", row_index, search_str);
897                        }
898                        Ok(false)
899                    }
900                }
901            }
902            "startswith" => {
903                if args.len() != 1 {
904                    return Err(anyhow::anyhow!("StartsWith requires exactly 1 argument"));
905                }
906                let prefix = self.extract_string_value(&args[0])?;
907
908                // Type coercion: convert numeric values to strings for string methods
909                match cell_value {
910                    Some(DataValue::String(ref s)) => {
911                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
912                    }
913                    Some(DataValue::InternedString(ref s)) => {
914                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
915                    }
916                    Some(DataValue::Integer(n)) => Ok(n.to_string().starts_with(&prefix)),
917                    Some(DataValue::Float(f)) => Ok(f.to_string().starts_with(&prefix)),
918                    Some(DataValue::Boolean(b)) => Ok(b.to_string().starts_with(&prefix)),
919                    Some(DataValue::DateTime(dt)) => Ok(dt.starts_with(&prefix)),
920                    _ => Ok(false),
921                }
922            }
923            "endswith" => {
924                if args.len() != 1 {
925                    return Err(anyhow::anyhow!("EndsWith requires exactly 1 argument"));
926                }
927                let suffix = self.extract_string_value(&args[0])?;
928
929                // Type coercion: convert numeric values to strings for string methods
930                match cell_value {
931                    Some(DataValue::String(ref s)) => {
932                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
933                    }
934                    Some(DataValue::InternedString(ref s)) => {
935                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
936                    }
937                    Some(DataValue::Integer(n)) => Ok(n.to_string().ends_with(&suffix)),
938                    Some(DataValue::Float(f)) => Ok(f.to_string().ends_with(&suffix)),
939                    Some(DataValue::Boolean(b)) => Ok(b.to_string().ends_with(&suffix)),
940                    Some(DataValue::DateTime(dt)) => Ok(dt.ends_with(&suffix)),
941                    _ => Ok(false),
942                }
943            }
944            _ => Err(anyhow::anyhow!("Unsupported method: {}", method)),
945        }
946    }
947
948    fn extract_column_name(&self, expr: &SqlExpression) -> Result<String> {
949        match expr {
950            SqlExpression::Column(column_ref) => {
951                // Use ExecutionContext for proper alias resolution if available
952                if let Some(exec_ctx) = self.exec_context {
953                    // Use the unified column resolution
954                    let col_idx = exec_ctx.resolve_column_index(self.table, column_ref)?;
955                    // Return the actual column name (not the qualified one)
956                    Ok(self.table.column_names()[col_idx].clone())
957                } else {
958                    // Fallback: Handle qualified column names the old way (for backward compatibility)
959                    if column_ref.name.contains('.') {
960                        if let Some(dot_pos) = column_ref.name.rfind('.') {
961                            Ok(column_ref.name[dot_pos + 1..].to_string())
962                        } else {
963                            Ok(column_ref.name.clone())
964                        }
965                    } else {
966                        Ok(column_ref.name.clone())
967                    }
968                }
969            }
970            _ => Err(anyhow::anyhow!("Expected column name, got: {:?}", expr)),
971        }
972    }
973
974    fn extract_string_value(&self, expr: &SqlExpression) -> Result<String> {
975        match expr {
976            SqlExpression::StringLiteral(s) => Ok(s.clone()),
977            _ => Err(anyhow::anyhow!("Expected string literal, got: {:?}", expr)),
978        }
979    }
980
981    fn extract_value(&self, expr: &SqlExpression) -> Result<ExprValue> {
982        match expr {
983            SqlExpression::StringLiteral(s) => Ok(ExprValue::String(s.clone())),
984            SqlExpression::BooleanLiteral(b) => Ok(ExprValue::Boolean(*b)),
985            SqlExpression::NumberLiteral(n) => {
986                if let Ok(num) = n.parse::<f64>() {
987                    Ok(ExprValue::Number(num))
988                } else {
989                    Ok(ExprValue::String(n.clone()))
990                }
991            }
992            SqlExpression::DateTimeConstructor {
993                year,
994                month,
995                day,
996                hour,
997                minute,
998                second,
999            } => {
1000                // Create a DateTime from the constructor
1001                let naive_date = NaiveDate::from_ymd_opt(*year, *month, *day)
1002                    .ok_or_else(|| anyhow::anyhow!("Invalid date: {}-{}-{}", year, month, day))?;
1003                let naive_time = NaiveTime::from_hms_opt(
1004                    hour.unwrap_or(0),
1005                    minute.unwrap_or(0),
1006                    second.unwrap_or(0),
1007                )
1008                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
1009                let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
1010                let datetime = Utc.from_utc_datetime(&naive_datetime);
1011                Ok(ExprValue::DateTime(datetime))
1012            }
1013            SqlExpression::DateTimeToday {
1014                hour,
1015                minute,
1016                second,
1017            } => {
1018                // Get today's date with optional time
1019                let today = Local::now().date_naive();
1020                let time = NaiveTime::from_hms_opt(
1021                    hour.unwrap_or(0),
1022                    minute.unwrap_or(0),
1023                    second.unwrap_or(0),
1024                )
1025                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
1026                let naive_datetime = NaiveDateTime::new(today, time);
1027                let datetime = Utc.from_utc_datetime(&naive_datetime);
1028                Ok(ExprValue::DateTime(datetime))
1029            }
1030            _ => Ok(ExprValue::Null),
1031        }
1032    }
1033
1034    /// Evaluate a CASE expression as a boolean (for WHERE clauses)
1035    fn evaluate_case_expression_as_bool(
1036        &mut self,
1037        when_branches: &[crate::sql::recursive_parser::WhenBranch],
1038        else_branch: &Option<Box<SqlExpression>>,
1039        row_index: usize,
1040    ) -> Result<bool> {
1041        debug!(
1042            "RecursiveWhereEvaluator: evaluating CASE expression as bool for row {}",
1043            row_index
1044        );
1045
1046        // Evaluate each WHEN condition in order
1047        for branch in when_branches {
1048            // Evaluate the condition as a boolean
1049            let condition_result = self.evaluate_expression(&branch.condition, row_index)?;
1050
1051            if condition_result {
1052                debug!("CASE: WHEN condition matched, evaluating result expression as bool");
1053                // Evaluate the result and convert to boolean
1054                return self.evaluate_expression_as_bool(&branch.result, row_index);
1055            }
1056        }
1057
1058        // If no WHEN condition matched, evaluate ELSE clause (or return false)
1059        if let Some(else_expr) = else_branch {
1060            debug!("CASE: No WHEN matched, evaluating ELSE expression as bool");
1061            self.evaluate_expression_as_bool(else_expr, row_index)
1062        } else {
1063            debug!("CASE: No WHEN matched and no ELSE, returning false");
1064            Ok(false)
1065        }
1066    }
1067
1068    /// Helper method to evaluate any expression as a boolean
1069    fn evaluate_expression_as_bool(
1070        &mut self,
1071        expr: &SqlExpression,
1072        row_index: usize,
1073    ) -> Result<bool> {
1074        match expr {
1075            // For expressions that naturally return booleans, use the existing evaluator
1076            SqlExpression::BinaryOp { .. }
1077            | SqlExpression::InList { .. }
1078            | SqlExpression::NotInList { .. }
1079            | SqlExpression::Between { .. }
1080            | SqlExpression::Not { .. }
1081            | SqlExpression::MethodCall { .. } => self.evaluate_expression(expr, row_index),
1082            // For CASE expressions, recurse
1083            SqlExpression::CaseExpression {
1084                when_branches,
1085                else_branch,
1086            } => self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index),
1087            // For other expressions (columns, literals), use ArithmeticEvaluator and convert
1088            _ => {
1089                // Use ArithmeticEvaluator to get the value, then convert to boolean
1090                let mut evaluator =
1091                    crate::data::arithmetic_evaluator::ArithmeticEvaluator::new(self.table);
1092                let value = evaluator.evaluate(expr, row_index)?;
1093
1094                match value {
1095                    crate::data::datatable::DataValue::Boolean(b) => Ok(b),
1096                    crate::data::datatable::DataValue::Integer(i) => Ok(i != 0),
1097                    crate::data::datatable::DataValue::Float(f) => Ok(f != 0.0),
1098                    crate::data::datatable::DataValue::Null => Ok(false),
1099                    crate::data::datatable::DataValue::String(s) => Ok(!s.is_empty()),
1100                    crate::data::datatable::DataValue::InternedString(s) => Ok(!s.is_empty()),
1101                    _ => Ok(true), // Other types are considered truthy
1102                }
1103            }
1104        }
1105    }
1106}
1107
1108enum ExprValue {
1109    String(String),
1110    Number(f64),
1111    Boolean(bool),
1112    DateTime(DateTime<Utc>),
1113    Null,
1114}