sql_cli/data/
recursive_where_evaluator.rs

1use crate::data::datatable::{DataTable, DataValue};
2use crate::sql::recursive_parser::{Condition, LogicalOp, SqlExpression, WhereClause};
3use anyhow::{anyhow, Result};
4use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
5use tracing::debug;
6
7/// Evaluates WHERE clauses from `recursive_parser` directly against `DataTable`
8pub struct RecursiveWhereEvaluator<'a> {
9    table: &'a DataTable,
10    case_insensitive: bool,
11    date_notation: String,
12}
13
14impl<'a> RecursiveWhereEvaluator<'a> {
15    #[must_use]
16    pub fn new(table: &'a DataTable) -> Self {
17        Self {
18            table,
19            case_insensitive: false,
20            date_notation: "us".to_string(),
21        }
22    }
23
24    #[must_use]
25    pub fn with_date_notation(table: &'a DataTable, date_notation: String) -> Self {
26        Self {
27            table,
28            case_insensitive: false,
29            date_notation,
30        }
31    }
32
33    /// Central function to parse date/datetime strings respecting `date_notation` preference
34    fn parse_datetime_with_notation(&self, s: &str) -> Result<DateTime<Utc>> {
35        // Try ISO 8601 with timezone first (unambiguous)
36        if let Ok(parsed_dt) = s.parse::<DateTime<Utc>>() {
37            return Ok(parsed_dt);
38        }
39
40        // ISO formats without timezone (unambiguous)
41        if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
42            return Ok(Utc.from_utc_datetime(&dt));
43        }
44        if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
45            return Ok(Utc.from_utc_datetime(&dt));
46        }
47        if let Ok(dt) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
48            return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
49        }
50
51        // Date notation preference for ambiguous formats
52        if self.date_notation == "european" {
53            // European formats (DD/MM/YYYY) - try first
54            // Date only formats
55            if let Ok(dt) = NaiveDate::parse_from_str(s, "%d/%m/%Y") {
56                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
57            }
58            if let Ok(dt) = NaiveDate::parse_from_str(s, "%d-%m-%Y") {
59                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
60            }
61            // With time formats
62            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%d/%m/%Y %H:%M:%S") {
63                return Ok(Utc.from_utc_datetime(&dt));
64            }
65            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%d-%m-%Y %H:%M:%S") {
66                return Ok(Utc.from_utc_datetime(&dt));
67            }
68
69            // US formats (MM/DD/YYYY) - fallback
70            // Date only formats
71            if let Ok(dt) = NaiveDate::parse_from_str(s, "%m/%d/%Y") {
72                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
73            }
74            if let Ok(dt) = NaiveDate::parse_from_str(s, "%m-%d-%Y") {
75                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
76            }
77            // With time formats
78            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%m/%d/%Y %H:%M:%S") {
79                return Ok(Utc.from_utc_datetime(&dt));
80            }
81            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%m-%d-%Y %H:%M:%S") {
82                return Ok(Utc.from_utc_datetime(&dt));
83            }
84        } else {
85            // US formats (MM/DD/YYYY) - default, try first
86            // Date only formats
87            if let Ok(dt) = NaiveDate::parse_from_str(s, "%m/%d/%Y") {
88                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
89            }
90            if let Ok(dt) = NaiveDate::parse_from_str(s, "%m-%d-%Y") {
91                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
92            }
93            // With time formats
94            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%m/%d/%Y %H:%M:%S") {
95                return Ok(Utc.from_utc_datetime(&dt));
96            }
97            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%m-%d-%Y %H:%M:%S") {
98                return Ok(Utc.from_utc_datetime(&dt));
99            }
100
101            // European formats (DD/MM/YYYY) - fallback
102            // Date only formats
103            if let Ok(dt) = NaiveDate::parse_from_str(s, "%d/%m/%Y") {
104                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
105            }
106            if let Ok(dt) = NaiveDate::parse_from_str(s, "%d-%m-%Y") {
107                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
108            }
109            // With time formats
110            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%d/%m/%Y %H:%M:%S") {
111                return Ok(Utc.from_utc_datetime(&dt));
112            }
113            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%d-%m-%Y %H:%M:%S") {
114                return Ok(Utc.from_utc_datetime(&dt));
115            }
116        }
117
118        // Excel/Windows format: DD-MMM-YYYY (e.g., 15-Jan-2024)
119        if let Ok(dt) = NaiveDate::parse_from_str(s, "%d-%b-%Y") {
120            return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
121        }
122
123        // Full month names
124        if let Ok(dt) = NaiveDate::parse_from_str(s, "%B %d, %Y") {
125            return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
126        }
127        if let Ok(dt) = NaiveDate::parse_from_str(s, "%d %B %Y") {
128            return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
129        }
130
131        // Additional formats with time variations
132        if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%Y/%m/%d %H:%M:%S") {
133            return Ok(Utc.from_utc_datetime(&dt));
134        }
135
136        Err(anyhow!(
137            "Could not parse date/datetime: '{}'. Supported formats include: YYYY-MM-DD, MM/DD/YYYY, DD/MM/YYYY (based on config), with optional HH:MM:SS",
138            s
139        ))
140    }
141
142    /// Find a column name similar to the given name using edit distance
143    fn find_similar_column(&self, name: &str) -> Option<String> {
144        let columns = self.table.column_names();
145        let mut best_match: Option<(String, usize)> = None;
146
147        for col in columns {
148            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
149            // Only suggest if distance is small (likely a typo)
150            // Allow up to 3 edits for longer names
151            let max_distance = if name.len() > 10 { 3 } else { 2 };
152            if distance <= max_distance {
153                match &best_match {
154                    None => best_match = Some((col, distance)),
155                    Some((_, best_dist)) if distance < *best_dist => {
156                        best_match = Some((col, distance));
157                    }
158                    _ => {}
159                }
160            }
161        }
162
163        best_match.map(|(name, _)| name)
164    }
165
166    /// Calculate Levenshtein edit distance between two strings
167    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
168        let len1 = s1.len();
169        let len2 = s2.len();
170        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
171
172        for i in 0..=len1 {
173            matrix[i][0] = i;
174        }
175        for j in 0..=len2 {
176            matrix[0][j] = j;
177        }
178
179        for (i, c1) in s1.chars().enumerate() {
180            for (j, c2) in s2.chars().enumerate() {
181                let cost = usize::from(c1 != c2);
182                matrix[i + 1][j + 1] = std::cmp::min(
183                    matrix[i][j + 1] + 1, // deletion
184                    std::cmp::min(
185                        matrix[i + 1][j] + 1, // insertion
186                        matrix[i][j] + cost,  // substitution
187                    ),
188                );
189            }
190        }
191
192        matrix[len1][len2]
193    }
194
195    #[must_use]
196    pub fn with_case_insensitive(table: &'a DataTable, case_insensitive: bool) -> Self {
197        Self {
198            table,
199            case_insensitive,
200            date_notation: "us".to_string(),
201        }
202    }
203
204    #[must_use]
205    pub fn with_config(
206        table: &'a DataTable,
207        case_insensitive: bool,
208        date_notation: String,
209    ) -> Self {
210        Self {
211            table,
212            case_insensitive,
213            date_notation,
214        }
215    }
216
217    /// Helper function to compare two datetime values based on the operator
218    fn compare_datetime(op: &str, left: &DateTime<Utc>, right: &DateTime<Utc>) -> bool {
219        match op {
220            "=" => left == right,
221            "!=" | "<>" => left != right,
222            ">" => left > right,
223            ">=" => left >= right,
224            "<" => left < right,
225            "<=" => left <= right,
226            _ => false,
227        }
228    }
229
230    /// Compare two DataValue instances using the given operator
231    fn compare_data_values(&self, left: &DataValue, op: &str, right: &DataValue) -> Result<bool> {
232        use crate::data::datavalue_compare::compare_datavalues;
233        use std::cmp::Ordering;
234
235        let ordering = compare_datavalues(left, right);
236        Ok(match op {
237            "=" => ordering == Ordering::Equal,
238            "!=" | "<>" => ordering != Ordering::Equal,
239            ">" => ordering == Ordering::Greater,
240            ">=" => ordering != Ordering::Less,
241            "<" => ordering == Ordering::Less,
242            "<=" => ordering != Ordering::Greater,
243            _ => return Err(anyhow!("Unsupported operator: {}", op)),
244        })
245    }
246
247    /// Evaluate the `Length()` method on a column value
248    fn evaluate_length(
249        &self,
250        object: &str,
251        row_index: usize,
252    ) -> Result<(Option<DataValue>, String)> {
253        let col_index = self.table.get_column_index(object).ok_or_else(|| {
254            let suggestion = self.find_similar_column(object);
255            match suggestion {
256                Some(similar) => {
257                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
258                }
259                None => anyhow!("Column '{}' not found", object),
260            }
261        })?;
262
263        let value = self.table.get_value(row_index, col_index);
264        let length_value = match value {
265            Some(DataValue::String(s)) => Some(DataValue::Integer(s.len() as i64)),
266            Some(DataValue::InternedString(s)) => Some(DataValue::Integer(s.len() as i64)),
267            Some(DataValue::Integer(n)) => Some(DataValue::Integer(n.to_string().len() as i64)),
268            Some(DataValue::Float(f)) => Some(DataValue::Integer(f.to_string().len() as i64)),
269            _ => Some(DataValue::Integer(0)),
270        };
271        Ok((length_value, format!("{object}.Length()")))
272    }
273
274    /// Evaluate the `IndexOf()` method on a column value
275    fn evaluate_indexof(
276        &self,
277        object: &str,
278        search_str: &str,
279        row_index: usize,
280    ) -> Result<(Option<DataValue>, String)> {
281        let col_index = self.table.get_column_index(object).ok_or_else(|| {
282            let suggestion = self.find_similar_column(object);
283            match suggestion {
284                Some(similar) => {
285                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
286                }
287                None => anyhow!("Column '{}' not found", object),
288            }
289        })?;
290
291        let value = self.table.get_value(row_index, col_index);
292        let index_value = match value {
293            Some(DataValue::String(s)) => {
294                // Case-insensitive search by default, following Contains behavior
295                let pos = s
296                    .to_lowercase()
297                    .find(&search_str.to_lowercase())
298                    .map_or(-1, |idx| idx as i64);
299                Some(DataValue::Integer(pos))
300            }
301            Some(DataValue::InternedString(s)) => {
302                let pos = s
303                    .to_lowercase()
304                    .find(&search_str.to_lowercase())
305                    .map_or(-1, |idx| idx as i64);
306                Some(DataValue::Integer(pos))
307            }
308            Some(DataValue::Integer(n)) => {
309                let str_val = n.to_string();
310                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
311                Some(DataValue::Integer(pos))
312            }
313            Some(DataValue::Float(f)) => {
314                let str_val = f.to_string();
315                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
316                Some(DataValue::Integer(pos))
317            }
318            _ => Some(DataValue::Integer(-1)), // Return -1 for not found
319        };
320
321        if row_index < 3 {
322            debug!(
323                "RecursiveWhereEvaluator: Row {} IndexOf('{}') = {:?}",
324                row_index, search_str, index_value
325            );
326        }
327        Ok((index_value, format!("{object}.IndexOf('{search_str}')")))
328    }
329
330    /// Apply the appropriate trim operation based on `trim_type`
331    fn apply_trim<'b>(s: &'b str, trim_type: &str) -> &'b str {
332        match trim_type {
333            "trim" => s.trim(),
334            "trimstart" => s.trim_start(),
335            "trimend" => s.trim_end(),
336            _ => s,
337        }
338    }
339
340    /// Evaluate trim methods (Trim, `TrimStart`, `TrimEnd`) on a column value
341    fn evaluate_trim(
342        &self,
343        object: &str,
344        row_index: usize,
345        trim_type: &str,
346    ) -> Result<(Option<DataValue>, String)> {
347        let col_index = self.table.get_column_index(object).ok_or_else(|| {
348            let suggestion = self.find_similar_column(object);
349            match suggestion {
350                Some(similar) => {
351                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
352                }
353                None => anyhow!("Column '{}' not found", object),
354            }
355        })?;
356
357        let value = self.table.get_value(row_index, col_index);
358        let trimmed_value = match value {
359            Some(DataValue::String(s)) => Some(DataValue::String(
360                Self::apply_trim(s, trim_type).to_string(),
361            )),
362            Some(DataValue::InternedString(s)) => Some(DataValue::String(
363                Self::apply_trim(s, trim_type).to_string(),
364            )),
365            Some(DataValue::Integer(n)) => {
366                let str_val = n.to_string();
367                Some(DataValue::String(
368                    Self::apply_trim(&str_val, trim_type).to_string(),
369                ))
370            }
371            Some(DataValue::Float(f)) => {
372                let str_val = f.to_string();
373                Some(DataValue::String(
374                    Self::apply_trim(&str_val, trim_type).to_string(),
375                ))
376            }
377            _ => Some(DataValue::String(String::new())),
378        };
379
380        let method_name = match trim_type {
381            "trim" => "Trim",
382            "trimstart" => "TrimStart",
383            "trimend" => "TrimEnd",
384            _ => "Trim",
385        };
386        Ok((trimmed_value, format!("{object}.{method_name}()")))
387    }
388
389    /// Evaluate a WHERE clause for a specific row
390    pub fn evaluate(&self, where_clause: &WhereClause, row_index: usize) -> Result<bool> {
391        // Only log for first few rows to avoid performance impact
392        if row_index < 3 {
393            debug!(
394                "RecursiveWhereEvaluator: evaluate() ENTRY - row {}, {} conditions, case_insensitive={}",
395                row_index,
396                where_clause.conditions.len(),
397                self.case_insensitive
398            );
399        }
400
401        if where_clause.conditions.is_empty() {
402            if row_index < 3 {
403                debug!("RecursiveWhereEvaluator: evaluate() EXIT - no conditions, returning true");
404            }
405            return Ok(true);
406        }
407
408        // Evaluate first condition
409        if row_index < 3 {
410            debug!(
411                "RecursiveWhereEvaluator: evaluate() - evaluating first condition for row {}",
412                row_index
413            );
414        }
415        let mut result = self.evaluate_condition(&where_clause.conditions[0], row_index)?;
416
417        // Apply connectors (AND/OR) with subsequent conditions
418        for i in 1..where_clause.conditions.len() {
419            let next_result = self.evaluate_condition(&where_clause.conditions[i], row_index)?;
420
421            // Use the connector from the previous condition
422            if let Some(connector) = &where_clause.conditions[i - 1].connector {
423                result = match connector {
424                    LogicalOp::And => result && next_result,
425                    LogicalOp::Or => result || next_result,
426                };
427            }
428        }
429
430        Ok(result)
431    }
432
433    fn evaluate_condition(&self, condition: &Condition, row_index: usize) -> Result<bool> {
434        // Only log first few rows to avoid performance impact
435        if row_index < 3 {
436            debug!(
437                "RecursiveWhereEvaluator: evaluate_condition() ENTRY - row {}",
438                row_index
439            );
440        }
441        let result = self.evaluate_expression(&condition.expr, row_index);
442        if row_index < 3 {
443            debug!(
444                "RecursiveWhereEvaluator: evaluate_condition() EXIT - row {}, result = {:?}",
445                row_index, result
446            );
447        }
448        result
449    }
450
451    fn evaluate_expression(&self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
452        // Only log first few rows to avoid performance impact
453        if row_index < 3 {
454            debug!(
455                "RecursiveWhereEvaluator: evaluate_expression() ENTRY - row {}, expr = {:?}",
456                row_index, expr
457            );
458        }
459
460        let result = match expr {
461            SqlExpression::BinaryOp { left, op, right } => {
462                self.evaluate_binary_op(left, op, right, row_index)
463            }
464            SqlExpression::InList { expr, values } => {
465                self.evaluate_in_list(expr, values, row_index, false)
466            }
467            SqlExpression::NotInList { expr, values } => {
468                let in_result = self.evaluate_in_list(expr, values, row_index, false)?;
469                Ok(!in_result)
470            }
471            SqlExpression::Between { expr, lower, upper } => {
472                self.evaluate_between(expr, lower, upper, row_index)
473            }
474            SqlExpression::Not { expr } => {
475                let inner_result = self.evaluate_expression(expr, row_index)?;
476                Ok(!inner_result)
477            }
478            SqlExpression::MethodCall {
479                object,
480                method,
481                args,
482            } => {
483                if row_index < 3 {
484                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found MethodCall, delegating to evaluate_method_call");
485                }
486                self.evaluate_method_call(object, method, args, row_index)
487            }
488            SqlExpression::CaseExpression {
489                when_branches,
490                else_branch,
491            } => {
492                if row_index < 3 {
493                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found CaseExpression, evaluating");
494                }
495                self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index)
496            }
497            _ => {
498                if row_index < 3 {
499                    debug!("RecursiveWhereEvaluator: evaluate_expression() - unsupported expression type, returning false");
500                }
501                Ok(false) // Default to false for unsupported expressions
502            }
503        };
504
505        if row_index < 3 {
506            debug!(
507                "RecursiveWhereEvaluator: evaluate_expression() EXIT - row {}, result = {:?}",
508                row_index, result
509            );
510        }
511        result
512    }
513
514    fn evaluate_binary_op(
515        &self,
516        left: &SqlExpression,
517        op: &str,
518        right: &SqlExpression,
519        row_index: usize,
520    ) -> Result<bool> {
521        use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
522
523        // Only log first few rows to avoid performance impact
524        if row_index < 3 {
525            debug!(
526                "RecursiveWhereEvaluator: evaluate_binary_op() ENTRY - row {}, op = '{}'",
527                row_index, op
528            );
529        }
530
531        // Handle logical operators (AND, OR) specially
532        if op.to_uppercase() == "OR" || op.to_uppercase() == "AND" {
533            let left_result = self.evaluate_expression(left, row_index)?;
534            let right_result = self.evaluate_expression(right, row_index)?;
535
536            return Ok(match op.to_uppercase().as_str() {
537                "OR" => left_result || right_result,
538                "AND" => left_result && right_result,
539                _ => unreachable!(),
540            });
541        }
542
543        // For nested binary operations (like value % 3), evaluate the left side as an expression
544        if matches!(left, SqlExpression::BinaryOp { .. }) {
545            // Use ArithmeticEvaluator to evaluate the complex expression
546            let mut evaluator =
547                ArithmeticEvaluator::with_date_notation(self.table, self.date_notation.clone());
548            let left_result = evaluator.evaluate(left, row_index)?;
549
550            // Convert right ExprValue to DataValue
551            let right_expr_value = self.extract_value(right)?;
552            let right_data_value = match right_expr_value {
553                ExprValue::String(s) => DataValue::String(s),
554                ExprValue::Number(n) => {
555                    // Check if it's an integer or float
556                    if n.fract() == 0.0 && n >= i64::MIN as f64 && n <= i64::MAX as f64 {
557                        DataValue::Integer(n as i64)
558                    } else {
559                        DataValue::Float(n)
560                    }
561                }
562                ExprValue::Boolean(b) => DataValue::Boolean(b),
563                ExprValue::DateTime(_) => {
564                    return Err(anyhow!("DateTime comparison not supported in this context"))
565                }
566                ExprValue::Null => DataValue::Null,
567            };
568
569            return self.compare_data_values(&left_result, op, &right_data_value);
570        }
571
572        // Handle left side - could be a column or a method call
573        let (cell_value, column_name) = match left {
574            SqlExpression::MethodCall {
575                object,
576                method,
577                args,
578            } => {
579                // Handle method calls that return values (like Length(), IndexOf())
580                match method.to_lowercase().as_str() {
581                    "length" => {
582                        if !args.is_empty() {
583                            return Err(anyhow::anyhow!("Length() takes no arguments"));
584                        }
585                        self.evaluate_length(object, row_index)?
586                    }
587                    "indexof" => {
588                        if args.len() != 1 {
589                            return Err(anyhow::anyhow!("IndexOf() requires exactly 1 argument"));
590                        }
591                        let search_str = self.extract_string_value(&args[0])?;
592                        self.evaluate_indexof(object, &search_str, row_index)?
593                    }
594                    "trim" => {
595                        if !args.is_empty() {
596                            return Err(anyhow::anyhow!("Trim() takes no arguments"));
597                        }
598                        self.evaluate_trim(object, row_index, "trim")?
599                    }
600                    "trimstart" => {
601                        if !args.is_empty() {
602                            return Err(anyhow::anyhow!("TrimStart() takes no arguments"));
603                        }
604                        self.evaluate_trim(object, row_index, "trimstart")?
605                    }
606                    "trimend" => {
607                        if !args.is_empty() {
608                            return Err(anyhow::anyhow!("TrimEnd() takes no arguments"));
609                        }
610                        self.evaluate_trim(object, row_index, "trimend")?
611                    }
612                    _ => {
613                        return Err(anyhow::anyhow!(
614                            "Method '{}' cannot be used in comparisons",
615                            method
616                        ));
617                    }
618                }
619            }
620            SqlExpression::BinaryOp {
621                left: _expr_left,
622                op: arith_op,
623                right: _expr_right,
624            } if matches!(arith_op.as_str(), "+" | "-" | "*" | "/") => {
625                // Handle arithmetic expressions using ArithmeticEvaluator
626                let mut evaluator =
627                    ArithmeticEvaluator::with_date_notation(self.table, self.date_notation.clone());
628                let computed_value = evaluator.evaluate(left, row_index)?;
629                if row_index < 3 {
630                    debug!(
631                        "RecursiveWhereEvaluator: evaluate_binary_op() - computed arithmetic expression = {:?}",
632                        computed_value
633                    );
634                }
635                (Some(computed_value), "computed_expression".to_string())
636            }
637            SqlExpression::FunctionCall { name, .. } => {
638                // Handle function calls using ArithmeticEvaluator
639                let mut evaluator =
640                    ArithmeticEvaluator::with_date_notation(self.table, self.date_notation.clone());
641                let computed_value = evaluator.evaluate(left, row_index)?;
642                if row_index < 3 {
643                    debug!(
644                        "RecursiveWhereEvaluator: evaluate_binary_op() - computed function {} = {:?}",
645                        name, computed_value
646                    );
647                }
648                (Some(computed_value), format!("{name}()"))
649            }
650            _ => {
651                // Regular column reference
652                let column_name = self.extract_column_name(left)?;
653                if row_index < 3 {
654                    debug!(
655                        "RecursiveWhereEvaluator: evaluate_binary_op() - column_name = '{}'",
656                        column_name
657                    );
658                }
659
660                let col_index = self.table.get_column_index(&column_name).ok_or_else(|| {
661                    let suggestion = self.find_similar_column(&column_name);
662                    match suggestion {
663                        Some(similar) => anyhow!(
664                            "Column '{}' not found. Did you mean '{}'?",
665                            column_name,
666                            similar
667                        ),
668                        None => anyhow!("Column '{}' not found", column_name),
669                    }
670                })?;
671
672                let cell_value = self.table.get_value(row_index, col_index).cloned();
673                (cell_value, column_name)
674            }
675        };
676
677        if row_index < 3 {
678            debug!(
679                "RecursiveWhereEvaluator: evaluate_binary_op() - row {} column '{}' value = {:?}",
680                row_index, column_name, cell_value
681            );
682        }
683
684        // Get comparison value from right side
685        let compare_value = self.extract_value(right)?;
686
687        // Perform comparison
688        match (cell_value, op.to_uppercase().as_str(), &compare_value) {
689            (Some(DataValue::String(ref a)), "=", ExprValue::String(b)) => {
690                // Try to parse both as dates if they look like dates
691                if let (Ok(date_a), Ok(date_b)) = (
692                    self.parse_datetime_with_notation(a),
693                    self.parse_datetime_with_notation(b),
694                ) {
695                    if row_index < 3 {
696                        debug!(
697                            "RecursiveWhereEvaluator: Date comparison (from strings) '{}' = '{}' = {}",
698                            date_a.format("%Y-%m-%d %H:%M:%S"),
699                            date_b.format("%Y-%m-%d %H:%M:%S"),
700                            date_a == date_b
701                        );
702                    }
703                    Ok(date_a == date_b)
704                } else {
705                    if row_index < 3 {
706                        debug!(
707                            "RecursiveWhereEvaluator: String comparison '{}' = '{}' (case_insensitive={})",
708                            a, b, self.case_insensitive
709                        );
710                    }
711                    if self.case_insensitive {
712                        Ok(a.to_lowercase() == b.to_lowercase())
713                    } else {
714                        Ok(a == b)
715                    }
716                }
717            }
718            (Some(DataValue::InternedString(ref a)), "=", ExprValue::String(b)) => {
719                // Try to parse both as dates if they look like dates
720                if let (Ok(date_a), Ok(date_b)) = (
721                    self.parse_datetime_with_notation(a.as_ref()),
722                    self.parse_datetime_with_notation(b),
723                ) {
724                    if row_index < 3 {
725                        debug!(
726                            "RecursiveWhereEvaluator: Date comparison (from interned strings) '{}' = '{}' = {}",
727                            date_a.format("%Y-%m-%d %H:%M:%S"),
728                            date_b.format("%Y-%m-%d %H:%M:%S"),
729                            date_a == date_b
730                        );
731                    }
732                    Ok(date_a == date_b)
733                } else {
734                    if row_index < 3 {
735                        debug!(
736                            "RecursiveWhereEvaluator: InternedString comparison '{}' = '{}' (case_insensitive={})",
737                            a, b, self.case_insensitive
738                        );
739                    }
740                    if self.case_insensitive {
741                        Ok(a.to_lowercase() == b.to_lowercase())
742                    } else {
743                        Ok(a.as_ref() == b)
744                    }
745                }
746            }
747            (Some(DataValue::String(ref a)), "!=" | "<>", ExprValue::String(b)) => {
748                // Try to parse both as dates if they look like dates
749                if let (Ok(date_a), Ok(date_b)) = (
750                    self.parse_datetime_with_notation(a),
751                    self.parse_datetime_with_notation(b),
752                ) {
753                    if row_index < 3 {
754                        debug!(
755                            "RecursiveWhereEvaluator: Date comparison (from strings) '{}' != '{}' = {}",
756                            date_a.format("%Y-%m-%d %H:%M:%S"),
757                            date_b.format("%Y-%m-%d %H:%M:%S"),
758                            date_a != date_b
759                        );
760                    }
761                    Ok(date_a != date_b)
762                } else {
763                    if row_index < 3 {
764                        debug!(
765                            "RecursiveWhereEvaluator: String comparison '{}' != '{}' (case_insensitive={})",
766                            a, b, self.case_insensitive
767                        );
768                    }
769                    if self.case_insensitive {
770                        Ok(a.to_lowercase() != b.to_lowercase())
771                    } else {
772                        Ok(a != b)
773                    }
774                }
775            }
776            (Some(DataValue::InternedString(ref a)), "!=" | "<>", ExprValue::String(b)) => {
777                // Try to parse both as dates if they look like dates
778                if let (Ok(date_a), Ok(date_b)) = (
779                    self.parse_datetime_with_notation(a.as_ref()),
780                    self.parse_datetime_with_notation(b),
781                ) {
782                    if row_index < 3 {
783                        debug!(
784                            "RecursiveWhereEvaluator: Date comparison (from interned strings) '{}' != '{}' = {}",
785                            date_a.format("%Y-%m-%d %H:%M:%S"),
786                            date_b.format("%Y-%m-%d %H:%M:%S"),
787                            date_a != date_b
788                        );
789                    }
790                    Ok(date_a != date_b)
791                } else {
792                    if row_index < 3 {
793                        debug!(
794                            "RecursiveWhereEvaluator: InternedString comparison '{}' != '{}' (case_insensitive={})",
795                            a, b, self.case_insensitive
796                        );
797                    }
798                    if self.case_insensitive {
799                        Ok(a.to_lowercase() != b.to_lowercase())
800                    } else {
801                        Ok(a.as_ref() != b)
802                    }
803                }
804            }
805            (Some(DataValue::String(ref a)), ">", ExprValue::String(b)) => {
806                // Try to parse both as dates if they look like dates
807                if let (Ok(date_a), Ok(date_b)) = (
808                    self.parse_datetime_with_notation(a),
809                    self.parse_datetime_with_notation(b),
810                ) {
811                    if row_index < 3 {
812                        debug!(
813                            "RecursiveWhereEvaluator: Date comparison (from strings) '{}' > '{}' = {}",
814                            date_a.format("%Y-%m-%d %H:%M:%S"),
815                            date_b.format("%Y-%m-%d %H:%M:%S"),
816                            date_a > date_b
817                        );
818                    }
819                    Ok(date_a > date_b)
820                } else if self.case_insensitive {
821                    Ok(a.to_lowercase() > b.to_lowercase())
822                } else {
823                    Ok(a > b)
824                }
825            }
826            (Some(DataValue::InternedString(ref a)), ">", ExprValue::String(b)) => {
827                // Try to parse both as dates if they look like dates
828                if let (Ok(date_a), Ok(date_b)) = (
829                    self.parse_datetime_with_notation(a.as_ref()),
830                    self.parse_datetime_with_notation(b),
831                ) {
832                    if row_index < 3 {
833                        debug!(
834                            "RecursiveWhereEvaluator: Date comparison (from interned strings) '{}' > '{}' = {}",
835                            date_a.format("%Y-%m-%d %H:%M:%S"),
836                            date_b.format("%Y-%m-%d %H:%M:%S"),
837                            date_a > date_b
838                        );
839                    }
840                    Ok(date_a > date_b)
841                } else if self.case_insensitive {
842                    Ok(a.to_lowercase() > b.to_lowercase())
843                } else {
844                    Ok(a.as_ref() > b)
845                }
846            }
847            (Some(DataValue::String(ref a)), ">=", ExprValue::String(b)) => {
848                // Try to parse both as dates if they look like dates
849                if let (Ok(date_a), Ok(date_b)) = (
850                    self.parse_datetime_with_notation(a),
851                    self.parse_datetime_with_notation(b),
852                ) {
853                    if row_index < 3 {
854                        debug!(
855                            "RecursiveWhereEvaluator: Date comparison (from strings) '{}' >= '{}' = {}",
856                            date_a.format("%Y-%m-%d %H:%M:%S"),
857                            date_b.format("%Y-%m-%d %H:%M:%S"),
858                            date_a >= date_b
859                        );
860                    }
861                    Ok(date_a >= date_b)
862                } else if self.case_insensitive {
863                    Ok(a.to_lowercase() >= b.to_lowercase())
864                } else {
865                    Ok(a >= b)
866                }
867            }
868            (Some(DataValue::InternedString(ref a)), ">=", ExprValue::String(b)) => {
869                // Try to parse both as dates if they look like dates
870                if let (Ok(date_a), Ok(date_b)) = (
871                    self.parse_datetime_with_notation(a.as_ref()),
872                    self.parse_datetime_with_notation(b),
873                ) {
874                    if row_index < 3 {
875                        debug!(
876                            "RecursiveWhereEvaluator: Date comparison (from interned strings) '{}' >= '{}' = {}",
877                            date_a.format("%Y-%m-%d %H:%M:%S"),
878                            date_b.format("%Y-%m-%d %H:%M:%S"),
879                            date_a >= date_b
880                        );
881                    }
882                    Ok(date_a >= date_b)
883                } else if self.case_insensitive {
884                    Ok(a.to_lowercase() >= b.to_lowercase())
885                } else {
886                    Ok(a.as_ref() >= b)
887                }
888            }
889            (Some(DataValue::String(ref a)), "<", ExprValue::String(b)) => {
890                // Try to parse both as dates if they look like dates
891                if let (Ok(date_a), Ok(date_b)) = (
892                    self.parse_datetime_with_notation(a),
893                    self.parse_datetime_with_notation(b),
894                ) {
895                    if row_index < 3 {
896                        debug!(
897                            "RecursiveWhereEvaluator: Date comparison (from strings) '{}' < '{}' = {}",
898                            date_a.format("%Y-%m-%d %H:%M:%S"),
899                            date_b.format("%Y-%m-%d %H:%M:%S"),
900                            date_a < date_b
901                        );
902                    }
903                    Ok(date_a < date_b)
904                } else if self.case_insensitive {
905                    Ok(a.to_lowercase() < b.to_lowercase())
906                } else {
907                    Ok(a < b)
908                }
909            }
910            (Some(DataValue::InternedString(ref a)), "<", ExprValue::String(b)) => {
911                // Try to parse both as dates if they look like dates
912                if let (Ok(date_a), Ok(date_b)) = (
913                    self.parse_datetime_with_notation(a.as_ref()),
914                    self.parse_datetime_with_notation(b),
915                ) {
916                    if row_index < 3 {
917                        debug!(
918                            "RecursiveWhereEvaluator: Date comparison (from interned strings) '{}' < '{}' = {}",
919                            date_a.format("%Y-%m-%d %H:%M:%S"),
920                            date_b.format("%Y-%m-%d %H:%M:%S"),
921                            date_a < date_b
922                        );
923                    }
924                    Ok(date_a < date_b)
925                } else if self.case_insensitive {
926                    Ok(a.to_lowercase() < b.to_lowercase())
927                } else {
928                    Ok(a.as_ref() < b)
929                }
930            }
931            (Some(DataValue::String(ref a)), "<=", ExprValue::String(b)) => {
932                // Try to parse both as dates if they look like dates
933                if let (Ok(date_a), Ok(date_b)) = (
934                    self.parse_datetime_with_notation(a),
935                    self.parse_datetime_with_notation(b),
936                ) {
937                    if row_index < 3 {
938                        debug!(
939                            "RecursiveWhereEvaluator: Date comparison (from strings) '{}' <= '{}' = {}",
940                            date_a.format("%Y-%m-%d %H:%M:%S"),
941                            date_b.format("%Y-%m-%d %H:%M:%S"),
942                            date_a <= date_b
943                        );
944                    }
945                    Ok(date_a <= date_b)
946                } else if self.case_insensitive {
947                    Ok(a.to_lowercase() <= b.to_lowercase())
948                } else {
949                    Ok(a <= b)
950                }
951            }
952            (Some(DataValue::InternedString(ref a)), "<=", ExprValue::String(b)) => {
953                // Try to parse both as dates if they look like dates
954                if let (Ok(date_a), Ok(date_b)) = (
955                    self.parse_datetime_with_notation(a.as_ref()),
956                    self.parse_datetime_with_notation(b),
957                ) {
958                    if row_index < 3 {
959                        debug!(
960                            "RecursiveWhereEvaluator: Date comparison (from interned strings) '{}' <= '{}' = {}",
961                            date_a.format("%Y-%m-%d %H:%M:%S"),
962                            date_b.format("%Y-%m-%d %H:%M:%S"),
963                            date_a <= date_b
964                        );
965                    }
966                    Ok(date_a <= date_b)
967                } else if self.case_insensitive {
968                    Ok(a.to_lowercase() <= b.to_lowercase())
969                } else {
970                    Ok(a.as_ref() <= b)
971                }
972            }
973
974            (Some(DataValue::Integer(a)), "=", ExprValue::Number(b)) => Ok(a as f64 == *b),
975            (Some(DataValue::Integer(a)), "!=" | "<>", ExprValue::Number(b)) => Ok(a as f64 != *b),
976            (Some(DataValue::Integer(a)), ">", ExprValue::Number(b)) => Ok(a as f64 > *b),
977            (Some(DataValue::Integer(a)), ">=", ExprValue::Number(b)) => Ok(a as f64 >= *b),
978            (Some(DataValue::Integer(a)), "<", ExprValue::Number(b)) => Ok((a as f64) < *b),
979            (Some(DataValue::Integer(a)), "<=", ExprValue::Number(b)) => Ok(a as f64 <= *b),
980
981            (Some(DataValue::Float(a)), "=", ExprValue::Number(b)) => {
982                Ok((a - b).abs() < f64::EPSILON)
983            }
984            (Some(DataValue::Float(a)), "!=" | "<>", ExprValue::Number(b)) => {
985                Ok((a - b).abs() >= f64::EPSILON)
986            }
987            (Some(DataValue::Float(a)), ">", ExprValue::Number(b)) => Ok(a > *b),
988            (Some(DataValue::Float(a)), ">=", ExprValue::Number(b)) => Ok(a >= *b),
989            (Some(DataValue::Float(a)), "<", ExprValue::Number(b)) => Ok(a < *b),
990            (Some(DataValue::Float(a)), "<=", ExprValue::Number(b)) => Ok(a <= *b),
991
992            // Boolean comparisons
993            (Some(DataValue::Boolean(a)), "=", ExprValue::Boolean(b)) => Ok(a == *b),
994            (Some(DataValue::Boolean(a)), "!=" | "<>", ExprValue::Boolean(b)) => Ok(a != *b),
995
996            // Boolean to string comparisons (for backward compatibility)
997            (Some(DataValue::Boolean(a)), "=", ExprValue::String(b)) => {
998                let bool_str = a.to_string();
999                if self.case_insensitive {
1000                    Ok(bool_str.to_lowercase() == b.to_lowercase())
1001                } else {
1002                    Ok(bool_str == *b)
1003                }
1004            }
1005            (Some(DataValue::Boolean(a)), "!=" | "<>", ExprValue::String(b)) => {
1006                let bool_str = a.to_string();
1007                if self.case_insensitive {
1008                    Ok(bool_str.to_lowercase() != b.to_lowercase())
1009                } else {
1010                    Ok(bool_str != *b)
1011                }
1012            }
1013
1014            // LIKE operator
1015            (Some(DataValue::String(ref text)), "LIKE", ExprValue::String(pattern)) => {
1016                let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
1017                let regex = regex::RegexBuilder::new(&format!("^{regex_pattern}$"))
1018                    .case_insensitive(true)
1019                    .build()
1020                    .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
1021                Ok(regex.is_match(text))
1022            }
1023            (Some(DataValue::InternedString(ref text)), "LIKE", ExprValue::String(pattern)) => {
1024                let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
1025                let regex = regex::RegexBuilder::new(&format!("^{regex_pattern}$"))
1026                    .case_insensitive(true)
1027                    .build()
1028                    .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
1029                Ok(regex.is_match(text.as_ref()))
1030            }
1031
1032            // IS NULL / IS NOT NULL (old style)
1033            (None | Some(DataValue::Null), "IS", ExprValue::Null) => Ok(true),
1034            (Some(_), "IS", ExprValue::Null) => Ok(false),
1035            (None | Some(DataValue::Null), "IS NOT", ExprValue::Null) => Ok(false),
1036            (Some(_), "IS NOT", ExprValue::Null) => Ok(true),
1037
1038            // IS NULL / IS NOT NULL (new style with combined operator)
1039            (None | Some(DataValue::Null), "IS NULL", _) => Ok(true),
1040            (Some(_), "IS NULL", _) => Ok(false),
1041            (None | Some(DataValue::Null), "IS NOT NULL", _) => Ok(false),
1042            (Some(_), "IS NOT NULL", _) => Ok(true),
1043
1044            // DateTime comparisons
1045            (Some(DataValue::String(ref date_str)), op_str, ExprValue::DateTime(dt)) => {
1046                if row_index < 3 {
1047                    debug!(
1048                        "RecursiveWhereEvaluator: DateTime comparison '{}' {} '{}' - attempting parse",
1049                        date_str,
1050                        op_str,
1051                        dt.format("%Y-%m-%d %H:%M:%S")
1052                    );
1053                }
1054
1055                // Use the central parsing function that respects date_notation
1056                if let Ok(parsed_dt) = self.parse_datetime_with_notation(date_str) {
1057                    let result = Self::compare_datetime(op_str, &parsed_dt, dt);
1058                    if row_index < 3 {
1059                        debug!(
1060                            "RecursiveWhereEvaluator: DateTime parsed successfully: '{}' {} '{}' = {}",
1061                            parsed_dt.format("%Y-%m-%d %H:%M:%S"),
1062                            op_str,
1063                            dt.format("%Y-%m-%d %H:%M:%S"),
1064                            result
1065                        );
1066                    }
1067                    Ok(result)
1068                } else {
1069                    if row_index < 3 {
1070                        debug!(
1071                            "RecursiveWhereEvaluator: DateTime parse FAILED for '{}' - no matching format",
1072                            date_str
1073                        );
1074                    }
1075                    Ok(false)
1076                }
1077            }
1078            (Some(DataValue::InternedString(ref date_str)), op_str, ExprValue::DateTime(dt)) => {
1079                if row_index < 3 {
1080                    debug!(
1081                        "RecursiveWhereEvaluator: DateTime comparison (interned) '{}' {} '{}' - attempting parse",
1082                        date_str,
1083                        op_str,
1084                        dt.format("%Y-%m-%d %H:%M:%S")
1085                    );
1086                }
1087
1088                // Use the central parsing function that respects date_notation
1089                if let Ok(parsed_dt) = self.parse_datetime_with_notation(date_str.as_ref()) {
1090                    let result = Self::compare_datetime(op_str, &parsed_dt, dt);
1091                    if row_index < 3 {
1092                        debug!(
1093                            "RecursiveWhereEvaluator: DateTime parsed successfully: '{}' {} '{}' = {}",
1094                            parsed_dt.format("%Y-%m-%d %H:%M:%S"),
1095                            op_str,
1096                            dt.format("%Y-%m-%d %H:%M:%S"),
1097                            result
1098                        );
1099                    }
1100                    Ok(result)
1101                } else {
1102                    if row_index < 3 {
1103                        debug!(
1104                            "RecursiveWhereEvaluator: DateTime parse FAILED for '{}' - no matching format",
1105                            date_str
1106                        );
1107                    }
1108                    Ok(false)
1109                }
1110            }
1111
1112            // DateTime vs DateTime comparisons (when column is already parsed as DateTime)
1113            (Some(DataValue::DateTime(ref date_str)), op_str, ExprValue::DateTime(dt)) => {
1114                if row_index < 3 {
1115                    debug!(
1116                        "RecursiveWhereEvaluator: DateTime vs DateTime comparison '{}' {} '{}' - direct comparison",
1117                        date_str, op_str, dt.format("%Y-%m-%d %H:%M:%S")
1118                    );
1119                }
1120
1121                // Use the central parsing function that respects date_notation
1122                if let Ok(parsed_dt) = self.parse_datetime_with_notation(date_str) {
1123                    let result = Self::compare_datetime(op_str, &parsed_dt, dt);
1124                    if row_index < 3 {
1125                        debug!(
1126                            "RecursiveWhereEvaluator: DateTime vs DateTime parsed successfully: '{}' {} '{}' = {}",
1127                            parsed_dt.format("%Y-%m-%d %H:%M:%S"), op_str, dt.format("%Y-%m-%d %H:%M:%S"), result
1128                        );
1129                    }
1130                    Ok(result)
1131                } else {
1132                    if row_index < 3 {
1133                        debug!(
1134                            "RecursiveWhereEvaluator: DateTime vs DateTime parse FAILED for '{}' - no matching format",
1135                            date_str
1136                        );
1137                    }
1138                    Ok(false)
1139                }
1140            }
1141
1142            _ => Ok(false),
1143        }
1144    }
1145
1146    fn evaluate_in_list(
1147        &self,
1148        expr: &SqlExpression,
1149        values: &[SqlExpression],
1150        row_index: usize,
1151        _ignore_case: bool,
1152    ) -> Result<bool> {
1153        let column_name = self.extract_column_name(expr)?;
1154        let col_index = self
1155            .table
1156            .get_column_index(&column_name)
1157            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
1158
1159        let cell_value = self.table.get_value(row_index, col_index).cloned();
1160
1161        for value_expr in values {
1162            let compare_value = self.extract_value(value_expr)?;
1163            let matches = match (cell_value.as_ref(), &compare_value) {
1164                (Some(DataValue::String(a)), ExprValue::String(b)) => {
1165                    if self.case_insensitive {
1166                        if row_index < 3 {
1167                            debug!("RecursiveWhereEvaluator: IN list string comparison '{}' in '{}' (case_insensitive={})", a, b, self.case_insensitive);
1168                        }
1169                        a.to_lowercase() == b.to_lowercase()
1170                    } else {
1171                        a == b
1172                    }
1173                }
1174                (Some(DataValue::InternedString(a)), ExprValue::String(b)) => {
1175                    if self.case_insensitive {
1176                        if row_index < 3 {
1177                            debug!("RecursiveWhereEvaluator: IN list interned string comparison '{}' in '{}' (case_insensitive={})", a, b, self.case_insensitive);
1178                        }
1179                        a.to_lowercase() == b.to_lowercase()
1180                    } else {
1181                        a.as_ref() == b
1182                    }
1183                }
1184                (Some(DataValue::Integer(a)), ExprValue::Number(b)) => *a as f64 == *b,
1185                (Some(DataValue::Float(a)), ExprValue::Number(b)) => (*a - b).abs() < f64::EPSILON,
1186                _ => false,
1187            };
1188
1189            if matches {
1190                return Ok(true);
1191            }
1192        }
1193
1194        Ok(false)
1195    }
1196
1197    fn evaluate_between(
1198        &self,
1199        expr: &SqlExpression,
1200        lower: &SqlExpression,
1201        upper: &SqlExpression,
1202        row_index: usize,
1203    ) -> Result<bool> {
1204        let column_name = self.extract_column_name(expr)?;
1205        let col_index = self
1206            .table
1207            .get_column_index(&column_name)
1208            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
1209
1210        let cell_value = self.table.get_value(row_index, col_index).cloned();
1211        let lower_value = self.extract_value(lower)?;
1212        let upper_value = self.extract_value(upper)?;
1213
1214        match (cell_value, &lower_value, &upper_value) {
1215            (Some(DataValue::Integer(n)), ExprValue::Number(l), ExprValue::Number(u)) => {
1216                Ok(n as f64 >= *l && n as f64 <= *u)
1217            }
1218            (Some(DataValue::Float(n)), ExprValue::Number(l), ExprValue::Number(u)) => {
1219                Ok(n >= *l && n <= *u)
1220            }
1221            (Some(DataValue::String(ref s)), ExprValue::String(l), ExprValue::String(u)) => {
1222                Ok(s >= l && s <= u)
1223            }
1224            (
1225                Some(DataValue::InternedString(ref s)),
1226                ExprValue::String(l),
1227                ExprValue::String(u),
1228            ) => Ok(s.as_ref() >= l && s.as_ref() <= u),
1229            _ => Ok(false),
1230        }
1231    }
1232
1233    fn evaluate_method_call(
1234        &self,
1235        object: &str,
1236        method: &str,
1237        args: &[SqlExpression],
1238        row_index: usize,
1239    ) -> Result<bool> {
1240        if row_index < 3 {
1241            debug!(
1242                "RecursiveWhereEvaluator: evaluate_method_call - object='{}', method='{}', row={}",
1243                object, method, row_index
1244            );
1245        }
1246
1247        // Get column value
1248        let col_index = self.table.get_column_index(object).ok_or_else(|| {
1249            let suggestion = self.find_similar_column(object);
1250            match suggestion {
1251                Some(similar) => {
1252                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
1253                }
1254                None => anyhow!("Column '{}' not found", object),
1255            }
1256        })?;
1257
1258        let cell_value = self.table.get_value(row_index, col_index).cloned();
1259        if row_index < 3 {
1260            debug!(
1261                "RecursiveWhereEvaluator: Row {} column '{}' value = {:?}",
1262                row_index, object, cell_value
1263            );
1264        }
1265
1266        match method.to_lowercase().as_str() {
1267            "contains" => {
1268                if args.len() != 1 {
1269                    return Err(anyhow::anyhow!("Contains requires exactly 1 argument"));
1270                }
1271                let search_str = self.extract_string_value(&args[0])?;
1272                // Pre-compute lowercase once instead of for every row
1273                let search_lower = search_str.to_lowercase();
1274
1275                // Type coercion: convert numeric values to strings for string methods
1276                match cell_value {
1277                    Some(DataValue::String(ref s)) => {
1278                        let result = s.to_lowercase().contains(&search_lower);
1279                        // Only log first few rows to avoid performance impact
1280                        if row_index < 3 {
1281                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on '{}' = {} (case-insensitive)", row_index, search_str, s, result);
1282                        }
1283                        Ok(result)
1284                    }
1285                    Some(DataValue::InternedString(ref s)) => {
1286                        let result = s.to_lowercase().contains(&search_lower);
1287                        // Only log first few rows to avoid performance impact
1288                        if row_index < 3 {
1289                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on interned '{}' = {} (case-insensitive)", row_index, search_str, s, result);
1290                        }
1291                        Ok(result)
1292                    }
1293                    Some(DataValue::Integer(n)) => {
1294                        let str_val = n.to_string();
1295                        let result = str_val.contains(&search_str);
1296                        if row_index < 3 {
1297                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on integer '{}' = {}", row_index, search_str, str_val, result);
1298                        }
1299                        Ok(result)
1300                    }
1301                    Some(DataValue::Float(f)) => {
1302                        let str_val = f.to_string();
1303                        let result = str_val.contains(&search_str);
1304                        if row_index < 3 {
1305                            debug!(
1306                                "RecursiveWhereEvaluator: Row {} contains('{}') on float '{}' = {}",
1307                                row_index, search_str, str_val, result
1308                            );
1309                        }
1310                        Ok(result)
1311                    }
1312                    Some(DataValue::Boolean(b)) => {
1313                        let str_val = b.to_string();
1314                        let result = str_val.contains(&search_str);
1315                        if row_index < 3 {
1316                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on boolean '{}' = {}", row_index, search_str, str_val, result);
1317                        }
1318                        Ok(result)
1319                    }
1320                    Some(DataValue::DateTime(dt)) => {
1321                        // DateTime columns can use string methods via coercion
1322                        let result = dt.contains(&search_str);
1323                        if row_index < 3 {
1324                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on datetime '{}' = {}", row_index, search_str, dt, result);
1325                        }
1326                        Ok(result)
1327                    }
1328                    _ => {
1329                        if row_index < 3 {
1330                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on null/empty value = false", row_index, search_str);
1331                        }
1332                        Ok(false)
1333                    }
1334                }
1335            }
1336            "startswith" => {
1337                if args.len() != 1 {
1338                    return Err(anyhow::anyhow!("StartsWith requires exactly 1 argument"));
1339                }
1340                let prefix = self.extract_string_value(&args[0])?;
1341
1342                // Type coercion: convert numeric values to strings for string methods
1343                match cell_value {
1344                    Some(DataValue::String(ref s)) => {
1345                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
1346                    }
1347                    Some(DataValue::InternedString(ref s)) => {
1348                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
1349                    }
1350                    Some(DataValue::Integer(n)) => Ok(n.to_string().starts_with(&prefix)),
1351                    Some(DataValue::Float(f)) => Ok(f.to_string().starts_with(&prefix)),
1352                    Some(DataValue::Boolean(b)) => Ok(b.to_string().starts_with(&prefix)),
1353                    Some(DataValue::DateTime(dt)) => Ok(dt.starts_with(&prefix)),
1354                    _ => Ok(false),
1355                }
1356            }
1357            "endswith" => {
1358                if args.len() != 1 {
1359                    return Err(anyhow::anyhow!("EndsWith requires exactly 1 argument"));
1360                }
1361                let suffix = self.extract_string_value(&args[0])?;
1362
1363                // Type coercion: convert numeric values to strings for string methods
1364                match cell_value {
1365                    Some(DataValue::String(ref s)) => {
1366                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
1367                    }
1368                    Some(DataValue::InternedString(ref s)) => {
1369                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
1370                    }
1371                    Some(DataValue::Integer(n)) => Ok(n.to_string().ends_with(&suffix)),
1372                    Some(DataValue::Float(f)) => Ok(f.to_string().ends_with(&suffix)),
1373                    Some(DataValue::Boolean(b)) => Ok(b.to_string().ends_with(&suffix)),
1374                    Some(DataValue::DateTime(dt)) => Ok(dt.ends_with(&suffix)),
1375                    _ => Ok(false),
1376                }
1377            }
1378            _ => Err(anyhow::anyhow!("Unsupported method: {}", method)),
1379        }
1380    }
1381
1382    fn extract_column_name(&self, expr: &SqlExpression) -> Result<String> {
1383        match expr {
1384            SqlExpression::Column(name) => Ok(name.clone()),
1385            _ => Err(anyhow::anyhow!("Expected column name, got: {:?}", expr)),
1386        }
1387    }
1388
1389    fn extract_string_value(&self, expr: &SqlExpression) -> Result<String> {
1390        match expr {
1391            SqlExpression::StringLiteral(s) => Ok(s.clone()),
1392            _ => Err(anyhow::anyhow!("Expected string literal, got: {:?}", expr)),
1393        }
1394    }
1395
1396    fn extract_value(&self, expr: &SqlExpression) -> Result<ExprValue> {
1397        match expr {
1398            SqlExpression::StringLiteral(s) => Ok(ExprValue::String(s.clone())),
1399            SqlExpression::BooleanLiteral(b) => Ok(ExprValue::Boolean(*b)),
1400            SqlExpression::NumberLiteral(n) => {
1401                if let Ok(num) = n.parse::<f64>() {
1402                    Ok(ExprValue::Number(num))
1403                } else {
1404                    Ok(ExprValue::String(n.clone()))
1405                }
1406            }
1407            SqlExpression::DateTimeConstructor {
1408                year,
1409                month,
1410                day,
1411                hour,
1412                minute,
1413                second,
1414            } => {
1415                // Create a DateTime from the constructor
1416                let naive_date = NaiveDate::from_ymd_opt(*year, *month, *day)
1417                    .ok_or_else(|| anyhow::anyhow!("Invalid date: {}-{}-{}", year, month, day))?;
1418                let naive_time = NaiveTime::from_hms_opt(
1419                    hour.unwrap_or(0),
1420                    minute.unwrap_or(0),
1421                    second.unwrap_or(0),
1422                )
1423                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
1424                let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
1425                let datetime = Utc.from_utc_datetime(&naive_datetime);
1426                Ok(ExprValue::DateTime(datetime))
1427            }
1428            SqlExpression::DateTimeToday {
1429                hour,
1430                minute,
1431                second,
1432            } => {
1433                // Get today's date with optional time
1434                let today = Local::now().date_naive();
1435                let time = NaiveTime::from_hms_opt(
1436                    hour.unwrap_or(0),
1437                    minute.unwrap_or(0),
1438                    second.unwrap_or(0),
1439                )
1440                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
1441                let naive_datetime = NaiveDateTime::new(today, time);
1442                let datetime = Utc.from_utc_datetime(&naive_datetime);
1443                Ok(ExprValue::DateTime(datetime))
1444            }
1445            _ => Ok(ExprValue::Null),
1446        }
1447    }
1448
1449    /// Evaluate a CASE expression as a boolean (for WHERE clauses)
1450    fn evaluate_case_expression_as_bool(
1451        &self,
1452        when_branches: &[crate::sql::recursive_parser::WhenBranch],
1453        else_branch: &Option<Box<SqlExpression>>,
1454        row_index: usize,
1455    ) -> Result<bool> {
1456        debug!(
1457            "RecursiveWhereEvaluator: evaluating CASE expression as bool for row {}",
1458            row_index
1459        );
1460
1461        // Evaluate each WHEN condition in order
1462        for branch in when_branches {
1463            // Evaluate the condition as a boolean
1464            let condition_result = self.evaluate_expression(&branch.condition, row_index)?;
1465
1466            if condition_result {
1467                debug!("CASE: WHEN condition matched, evaluating result expression as bool");
1468                // Evaluate the result and convert to boolean
1469                return self.evaluate_expression_as_bool(&branch.result, row_index);
1470            }
1471        }
1472
1473        // If no WHEN condition matched, evaluate ELSE clause (or return false)
1474        if let Some(else_expr) = else_branch {
1475            debug!("CASE: No WHEN matched, evaluating ELSE expression as bool");
1476            self.evaluate_expression_as_bool(else_expr, row_index)
1477        } else {
1478            debug!("CASE: No WHEN matched and no ELSE, returning false");
1479            Ok(false)
1480        }
1481    }
1482
1483    /// Helper method to evaluate any expression as a boolean
1484    fn evaluate_expression_as_bool(&self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
1485        match expr {
1486            // For expressions that naturally return booleans, use the existing evaluator
1487            SqlExpression::BinaryOp { .. }
1488            | SqlExpression::InList { .. }
1489            | SqlExpression::NotInList { .. }
1490            | SqlExpression::Between { .. }
1491            | SqlExpression::Not { .. }
1492            | SqlExpression::MethodCall { .. } => self.evaluate_expression(expr, row_index),
1493            // For CASE expressions, recurse
1494            SqlExpression::CaseExpression {
1495                when_branches,
1496                else_branch,
1497            } => self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index),
1498            // For other expressions (columns, literals), use ArithmeticEvaluator and convert
1499            _ => {
1500                // Use ArithmeticEvaluator to get the value, then convert to boolean
1501                let mut evaluator =
1502                    crate::data::arithmetic_evaluator::ArithmeticEvaluator::with_date_notation(
1503                        self.table,
1504                        self.date_notation.clone(),
1505                    );
1506                let value = evaluator.evaluate(expr, row_index)?;
1507
1508                match value {
1509                    crate::data::datatable::DataValue::Boolean(b) => Ok(b),
1510                    crate::data::datatable::DataValue::Integer(i) => Ok(i != 0),
1511                    crate::data::datatable::DataValue::Float(f) => Ok(f != 0.0),
1512                    crate::data::datatable::DataValue::Null => Ok(false),
1513                    crate::data::datatable::DataValue::String(s) => Ok(!s.is_empty()),
1514                    crate::data::datatable::DataValue::InternedString(s) => Ok(!s.is_empty()),
1515                    _ => Ok(true), // Other types are considered truthy
1516                }
1517            }
1518        }
1519    }
1520}
1521
1522enum ExprValue {
1523    String(String),
1524    Number(f64),
1525    Boolean(bool),
1526    DateTime(DateTime<Utc>),
1527    Null,
1528}