sql_cli/data/
recursive_where_evaluator.rs

1use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
2use crate::data::datatable::{DataTable, DataValue};
3use crate::sql::recursive_parser::{Condition, LogicalOp, SqlExpression, WhereClause};
4use anyhow::{anyhow, Result};
5use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
6use tracing::debug;
7
/// Evaluates WHERE clauses from recursive_parser directly against DataTable
pub struct RecursiveWhereEvaluator<'a> {
    /// Table whose columns and row values the WHERE clause is evaluated against.
    table: &'a DataTable,
    /// When true, the string comparison operators (`=`, `!=`, `<`, ...) lowercase
    /// both operands before comparing.
    case_insensitive: bool,
}
13
14impl<'a> RecursiveWhereEvaluator<'a> {
15    pub fn new(table: &'a DataTable) -> Self {
16        Self {
17            table,
18            case_insensitive: false,
19        }
20    }
21
22    /// Find a column name similar to the given name using edit distance
23    fn find_similar_column(&self, name: &str) -> Option<String> {
24        let columns = self.table.column_names();
25        let mut best_match: Option<(String, usize)> = None;
26
27        for col in columns {
28            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
29            // Only suggest if distance is small (likely a typo)
30            // Allow up to 3 edits for longer names
31            let max_distance = if name.len() > 10 { 3 } else { 2 };
32            if distance <= max_distance {
33                match &best_match {
34                    None => best_match = Some((col, distance)),
35                    Some((_, best_dist)) if distance < *best_dist => {
36                        best_match = Some((col, distance));
37                    }
38                    _ => {}
39                }
40            }
41        }
42
43        best_match.map(|(name, _)| name)
44    }
45
46    /// Calculate Levenshtein edit distance between two strings
47    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
48        let len1 = s1.len();
49        let len2 = s2.len();
50        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
51
52        for i in 0..=len1 {
53            matrix[i][0] = i;
54        }
55        for j in 0..=len2 {
56            matrix[0][j] = j;
57        }
58
59        for (i, c1) in s1.chars().enumerate() {
60            for (j, c2) in s2.chars().enumerate() {
61                let cost = if c1 == c2 { 0 } else { 1 };
62                matrix[i + 1][j + 1] = std::cmp::min(
63                    matrix[i][j + 1] + 1, // deletion
64                    std::cmp::min(
65                        matrix[i + 1][j] + 1, // insertion
66                        matrix[i][j] + cost,  // substitution
67                    ),
68                );
69            }
70        }
71
72        matrix[len1][len2]
73    }
74
75    pub fn with_case_insensitive(table: &'a DataTable, case_insensitive: bool) -> Self {
76        Self {
77            table,
78            case_insensitive,
79        }
80    }
81
82    /// Helper function to compare two datetime values based on the operator
83    fn compare_datetime(op: &str, left: &DateTime<Utc>, right: &DateTime<Utc>) -> bool {
84        match op {
85            "=" => left == right,
86            "!=" | "<>" => left != right,
87            ">" => left > right,
88            ">=" => left >= right,
89            "<" => left < right,
90            "<=" => left <= right,
91            _ => false,
92        }
93    }
94
95    /// Evaluate the Length() method on a column value
96    fn evaluate_length(
97        &self,
98        object: &str,
99        row_index: usize,
100    ) -> Result<(Option<DataValue>, String)> {
101        let col_index = self.table.get_column_index(object).ok_or_else(|| {
102            let suggestion = self.find_similar_column(object);
103            match suggestion {
104                Some(similar) => {
105                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
106                }
107                None => anyhow!("Column '{}' not found", object),
108            }
109        })?;
110
111        let value = self.table.get_value(row_index, col_index);
112        let length_value = match value {
113            Some(DataValue::String(s)) => Some(DataValue::Integer(s.len() as i64)),
114            Some(DataValue::InternedString(s)) => Some(DataValue::Integer(s.len() as i64)),
115            Some(DataValue::Integer(n)) => Some(DataValue::Integer(n.to_string().len() as i64)),
116            Some(DataValue::Float(f)) => Some(DataValue::Integer(f.to_string().len() as i64)),
117            _ => Some(DataValue::Integer(0)),
118        };
119        Ok((length_value, format!("{}.Length()", object)))
120    }
121
122    /// Evaluate the IndexOf() method on a column value
123    fn evaluate_indexof(
124        &self,
125        object: &str,
126        search_str: &str,
127        row_index: usize,
128    ) -> Result<(Option<DataValue>, String)> {
129        let col_index = self.table.get_column_index(object).ok_or_else(|| {
130            let suggestion = self.find_similar_column(object);
131            match suggestion {
132                Some(similar) => {
133                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
134                }
135                None => anyhow!("Column '{}' not found", object),
136            }
137        })?;
138
139        let value = self.table.get_value(row_index, col_index);
140        let index_value = match value {
141            Some(DataValue::String(s)) => {
142                // Case-insensitive search by default, following Contains behavior
143                let pos = s
144                    .to_lowercase()
145                    .find(&search_str.to_lowercase())
146                    .map(|idx| idx as i64)
147                    .unwrap_or(-1);
148                Some(DataValue::Integer(pos))
149            }
150            Some(DataValue::InternedString(s)) => {
151                let pos = s
152                    .to_lowercase()
153                    .find(&search_str.to_lowercase())
154                    .map(|idx| idx as i64)
155                    .unwrap_or(-1);
156                Some(DataValue::Integer(pos))
157            }
158            Some(DataValue::Integer(n)) => {
159                let str_val = n.to_string();
160                let pos = str_val.find(search_str).map(|idx| idx as i64).unwrap_or(-1);
161                Some(DataValue::Integer(pos))
162            }
163            Some(DataValue::Float(f)) => {
164                let str_val = f.to_string();
165                let pos = str_val.find(search_str).map(|idx| idx as i64).unwrap_or(-1);
166                Some(DataValue::Integer(pos))
167            }
168            _ => Some(DataValue::Integer(-1)), // Return -1 for not found
169        };
170
171        if row_index < 3 {
172            debug!(
173                "RecursiveWhereEvaluator: Row {} IndexOf('{}') = {:?}",
174                row_index, search_str, index_value
175            );
176        }
177        Ok((index_value, format!("{}.IndexOf('{}')", object, search_str)))
178    }
179
180    /// Apply the appropriate trim operation based on trim_type
181    fn apply_trim<'b>(s: &'b str, trim_type: &str) -> &'b str {
182        match trim_type {
183            "trim" => s.trim(),
184            "trimstart" => s.trim_start(),
185            "trimend" => s.trim_end(),
186            _ => s,
187        }
188    }
189
190    /// Evaluate trim methods (Trim, TrimStart, TrimEnd) on a column value
191    fn evaluate_trim(
192        &self,
193        object: &str,
194        row_index: usize,
195        trim_type: &str,
196    ) -> Result<(Option<DataValue>, String)> {
197        let col_index = self.table.get_column_index(object).ok_or_else(|| {
198            let suggestion = self.find_similar_column(object);
199            match suggestion {
200                Some(similar) => {
201                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
202                }
203                None => anyhow!("Column '{}' not found", object),
204            }
205        })?;
206
207        let value = self.table.get_value(row_index, col_index);
208        let trimmed_value = match value {
209            Some(DataValue::String(s)) => Some(DataValue::String(
210                Self::apply_trim(&s, trim_type).to_string(),
211            )),
212            Some(DataValue::InternedString(s)) => Some(DataValue::String(
213                Self::apply_trim(&s, trim_type).to_string(),
214            )),
215            Some(DataValue::Integer(n)) => {
216                let str_val = n.to_string();
217                Some(DataValue::String(
218                    Self::apply_trim(&str_val, trim_type).to_string(),
219                ))
220            }
221            Some(DataValue::Float(f)) => {
222                let str_val = f.to_string();
223                Some(DataValue::String(
224                    Self::apply_trim(&str_val, trim_type).to_string(),
225                ))
226            }
227            _ => Some(DataValue::String(String::new())),
228        };
229
230        let method_name = match trim_type {
231            "trim" => "Trim",
232            "trimstart" => "TrimStart",
233            "trimend" => "TrimEnd",
234            _ => "Trim",
235        };
236        Ok((trimmed_value, format!("{}.{}()", object, method_name)))
237    }
238
239    /// Evaluate a WHERE clause for a specific row
240    pub fn evaluate(&self, where_clause: &WhereClause, row_index: usize) -> Result<bool> {
241        // Only log for first few rows to avoid performance impact
242        if row_index < 3 {
243            debug!(
244                "RecursiveWhereEvaluator: evaluate() ENTRY - row {}, {} conditions, case_insensitive={}",
245                row_index,
246                where_clause.conditions.len(),
247                self.case_insensitive
248            );
249        }
250
251        if where_clause.conditions.is_empty() {
252            if row_index < 3 {
253                debug!("RecursiveWhereEvaluator: evaluate() EXIT - no conditions, returning true");
254            }
255            return Ok(true);
256        }
257
258        // Evaluate first condition
259        if row_index < 3 {
260            debug!(
261                "RecursiveWhereEvaluator: evaluate() - evaluating first condition for row {}",
262                row_index
263            );
264        }
265        let mut result = self.evaluate_condition(&where_clause.conditions[0], row_index)?;
266
267        // Apply connectors (AND/OR) with subsequent conditions
268        for i in 1..where_clause.conditions.len() {
269            let next_result = self.evaluate_condition(&where_clause.conditions[i], row_index)?;
270
271            // Use the connector from the previous condition
272            if let Some(connector) = &where_clause.conditions[i - 1].connector {
273                result = match connector {
274                    LogicalOp::And => result && next_result,
275                    LogicalOp::Or => result || next_result,
276                };
277            }
278        }
279
280        Ok(result)
281    }
282
283    fn evaluate_condition(&self, condition: &Condition, row_index: usize) -> Result<bool> {
284        // Only log first few rows to avoid performance impact
285        if row_index < 3 {
286            debug!(
287                "RecursiveWhereEvaluator: evaluate_condition() ENTRY - row {}",
288                row_index
289            );
290        }
291        let result = self.evaluate_expression(&condition.expr, row_index);
292        if row_index < 3 {
293            debug!(
294                "RecursiveWhereEvaluator: evaluate_condition() EXIT - row {}, result = {:?}",
295                row_index, result
296            );
297        }
298        result
299    }
300
301    fn evaluate_expression(&self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
302        // Only log first few rows to avoid performance impact
303        if row_index < 3 {
304            debug!(
305                "RecursiveWhereEvaluator: evaluate_expression() ENTRY - row {}, expr = {:?}",
306                row_index, expr
307            );
308        }
309
310        let result = match expr {
311            SqlExpression::BinaryOp { left, op, right } => {
312                self.evaluate_binary_op(left, op, right, row_index)
313            }
314            SqlExpression::InList { expr, values } => {
315                self.evaluate_in_list(expr, values, row_index, false)
316            }
317            SqlExpression::NotInList { expr, values } => {
318                let in_result = self.evaluate_in_list(expr, values, row_index, false)?;
319                Ok(!in_result)
320            }
321            SqlExpression::Between { expr, lower, upper } => {
322                self.evaluate_between(expr, lower, upper, row_index)
323            }
324            SqlExpression::Not { expr } => {
325                let inner_result = self.evaluate_expression(expr, row_index)?;
326                Ok(!inner_result)
327            }
328            SqlExpression::MethodCall {
329                object,
330                method,
331                args,
332            } => {
333                if row_index < 3 {
334                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found MethodCall, delegating to evaluate_method_call");
335                }
336                self.evaluate_method_call(object, method, args, row_index)
337            }
338            _ => {
339                if row_index < 3 {
340                    debug!("RecursiveWhereEvaluator: evaluate_expression() - unsupported expression type, returning false");
341                }
342                Ok(false) // Default to false for unsupported expressions
343            }
344        };
345
346        if row_index < 3 {
347            debug!(
348                "RecursiveWhereEvaluator: evaluate_expression() EXIT - row {}, result = {:?}",
349                row_index, result
350            );
351        }
352        result
353    }
354
355    fn evaluate_binary_op(
356        &self,
357        left: &SqlExpression,
358        op: &str,
359        right: &SqlExpression,
360        row_index: usize,
361    ) -> Result<bool> {
362        // Only log first few rows to avoid performance impact
363        if row_index < 3 {
364            debug!(
365                "RecursiveWhereEvaluator: evaluate_binary_op() ENTRY - row {}, op = '{}'",
366                row_index, op
367            );
368        }
369
370        // Handle left side - could be a column or a method call
371        let (cell_value, column_name) = match left {
372            SqlExpression::MethodCall {
373                object,
374                method,
375                args,
376            } => {
377                // Handle method calls that return values (like Length(), IndexOf())
378                match method.to_lowercase().as_str() {
379                    "length" => {
380                        if !args.is_empty() {
381                            return Err(anyhow::anyhow!("Length() takes no arguments"));
382                        }
383                        self.evaluate_length(object, row_index)?
384                    }
385                    "indexof" => {
386                        if args.len() != 1 {
387                            return Err(anyhow::anyhow!("IndexOf() requires exactly 1 argument"));
388                        }
389                        let search_str = self.extract_string_value(&args[0])?;
390                        self.evaluate_indexof(object, &search_str, row_index)?
391                    }
392                    "trim" => {
393                        if !args.is_empty() {
394                            return Err(anyhow::anyhow!("Trim() takes no arguments"));
395                        }
396                        self.evaluate_trim(object, row_index, "trim")?
397                    }
398                    "trimstart" => {
399                        if !args.is_empty() {
400                            return Err(anyhow::anyhow!("TrimStart() takes no arguments"));
401                        }
402                        self.evaluate_trim(object, row_index, "trimstart")?
403                    }
404                    "trimend" => {
405                        if !args.is_empty() {
406                            return Err(anyhow::anyhow!("TrimEnd() takes no arguments"));
407                        }
408                        self.evaluate_trim(object, row_index, "trimend")?
409                    }
410                    _ => {
411                        return Err(anyhow::anyhow!(
412                            "Method '{}' cannot be used in comparisons",
413                            method
414                        ));
415                    }
416                }
417            }
418            SqlExpression::BinaryOp {
419                left: _expr_left,
420                op: arith_op,
421                right: _expr_right,
422            } if matches!(arith_op.as_str(), "+" | "-" | "*" | "/") => {
423                // Handle arithmetic expressions using ArithmeticEvaluator
424                let evaluator = ArithmeticEvaluator::new(self.table);
425                let computed_value = evaluator.evaluate(left, row_index)?;
426                if row_index < 3 {
427                    debug!(
428                        "RecursiveWhereEvaluator: evaluate_binary_op() - computed arithmetic expression = {:?}",
429                        computed_value
430                    );
431                }
432                (Some(computed_value), "computed_expression".to_string())
433            }
434            SqlExpression::FunctionCall { name, .. } => {
435                // Handle function calls using ArithmeticEvaluator
436                let evaluator = ArithmeticEvaluator::new(self.table);
437                let computed_value = evaluator.evaluate(left, row_index)?;
438                if row_index < 3 {
439                    debug!(
440                        "RecursiveWhereEvaluator: evaluate_binary_op() - computed function {} = {:?}",
441                        name, computed_value
442                    );
443                }
444                (Some(computed_value), format!("{}()", name))
445            }
446            _ => {
447                // Regular column reference
448                let column_name = self.extract_column_name(left)?;
449                if row_index < 3 {
450                    debug!(
451                        "RecursiveWhereEvaluator: evaluate_binary_op() - column_name = '{}'",
452                        column_name
453                    );
454                }
455
456                let col_index = self.table.get_column_index(&column_name).ok_or_else(|| {
457                    let suggestion = self.find_similar_column(&column_name);
458                    match suggestion {
459                        Some(similar) => anyhow!(
460                            "Column '{}' not found. Did you mean '{}'?",
461                            column_name,
462                            similar
463                        ),
464                        None => anyhow!("Column '{}' not found", column_name),
465                    }
466                })?;
467
468                let cell_value = self.table.get_value(row_index, col_index).cloned();
469                (cell_value, column_name)
470            }
471        };
472
473        if row_index < 3 {
474            debug!(
475                "RecursiveWhereEvaluator: evaluate_binary_op() - row {} column '{}' value = {:?}",
476                row_index, column_name, cell_value
477            );
478        }
479
480        // Get comparison value from right side
481        let compare_value = self.extract_value(right)?;
482
483        // Perform comparison
484        match (cell_value, op.to_uppercase().as_str(), &compare_value) {
485            (Some(DataValue::String(ref a)), "=", ExprValue::String(b)) => {
486                if row_index < 3 {
487                    debug!(
488                        "RecursiveWhereEvaluator: String comparison '{}' = '{}' (case_insensitive={})",
489                        a, b, self.case_insensitive
490                    );
491                }
492                if self.case_insensitive {
493                    Ok(a.to_lowercase() == b.to_lowercase())
494                } else {
495                    Ok(a == b)
496                }
497            }
498            (Some(DataValue::InternedString(ref a)), "=", ExprValue::String(b)) => {
499                if row_index < 3 {
500                    debug!(
501                        "RecursiveWhereEvaluator: InternedString comparison '{}' = '{}' (case_insensitive={})",
502                        a, b, self.case_insensitive
503                    );
504                }
505                if self.case_insensitive {
506                    Ok(a.to_lowercase() == b.to_lowercase())
507                } else {
508                    Ok(a.as_ref() == b)
509                }
510            }
511            (Some(DataValue::String(ref a)), "!=", ExprValue::String(b))
512            | (Some(DataValue::String(ref a)), "<>", ExprValue::String(b)) => {
513                if row_index < 3 {
514                    debug!(
515                        "RecursiveWhereEvaluator: String comparison '{}' != '{}' (case_insensitive={})",
516                        a, b, self.case_insensitive
517                    );
518                }
519                if self.case_insensitive {
520                    Ok(a.to_lowercase() != b.to_lowercase())
521                } else {
522                    Ok(a != b)
523                }
524            }
525            (Some(DataValue::InternedString(ref a)), "!=", ExprValue::String(b))
526            | (Some(DataValue::InternedString(ref a)), "<>", ExprValue::String(b)) => {
527                if row_index < 3 {
528                    debug!(
529                        "RecursiveWhereEvaluator: InternedString comparison '{}' != '{}' (case_insensitive={})",
530                        a, b, self.case_insensitive
531                    );
532                }
533                if self.case_insensitive {
534                    Ok(a.to_lowercase() != b.to_lowercase())
535                } else {
536                    Ok(a.as_ref() != b)
537                }
538            }
539            (Some(DataValue::String(ref a)), ">", ExprValue::String(b)) => {
540                if self.case_insensitive {
541                    Ok(a.to_lowercase() > b.to_lowercase())
542                } else {
543                    Ok(a > b)
544                }
545            }
546            (Some(DataValue::InternedString(ref a)), ">", ExprValue::String(b)) => {
547                if self.case_insensitive {
548                    Ok(a.to_lowercase() > b.to_lowercase())
549                } else {
550                    Ok(a.as_ref() > b)
551                }
552            }
553            (Some(DataValue::String(ref a)), ">=", ExprValue::String(b)) => {
554                if self.case_insensitive {
555                    Ok(a.to_lowercase() >= b.to_lowercase())
556                } else {
557                    Ok(a >= b)
558                }
559            }
560            (Some(DataValue::InternedString(ref a)), ">=", ExprValue::String(b)) => {
561                if self.case_insensitive {
562                    Ok(a.to_lowercase() >= b.to_lowercase())
563                } else {
564                    Ok(a.as_ref() >= b)
565                }
566            }
567            (Some(DataValue::String(ref a)), "<", ExprValue::String(b)) => {
568                if self.case_insensitive {
569                    Ok(a.to_lowercase() < b.to_lowercase())
570                } else {
571                    Ok(a < b)
572                }
573            }
574            (Some(DataValue::InternedString(ref a)), "<", ExprValue::String(b)) => {
575                if self.case_insensitive {
576                    Ok(a.to_lowercase() < b.to_lowercase())
577                } else {
578                    Ok(a.as_ref() < b)
579                }
580            }
581            (Some(DataValue::String(ref a)), "<=", ExprValue::String(b)) => {
582                if self.case_insensitive {
583                    Ok(a.to_lowercase() <= b.to_lowercase())
584                } else {
585                    Ok(a <= b)
586                }
587            }
588            (Some(DataValue::InternedString(ref a)), "<=", ExprValue::String(b)) => {
589                if self.case_insensitive {
590                    Ok(a.to_lowercase() <= b.to_lowercase())
591                } else {
592                    Ok(a.as_ref() <= b)
593                }
594            }
595
596            (Some(DataValue::Integer(a)), "=", ExprValue::Number(b)) => Ok(a as f64 == *b),
597            (Some(DataValue::Integer(a)), "!=", ExprValue::Number(b))
598            | (Some(DataValue::Integer(a)), "<>", ExprValue::Number(b)) => Ok(a as f64 != *b),
599            (Some(DataValue::Integer(a)), ">", ExprValue::Number(b)) => Ok(a as f64 > *b),
600            (Some(DataValue::Integer(a)), ">=", ExprValue::Number(b)) => Ok(a as f64 >= *b),
601            (Some(DataValue::Integer(a)), "<", ExprValue::Number(b)) => Ok((a as f64) < *b),
602            (Some(DataValue::Integer(a)), "<=", ExprValue::Number(b)) => Ok(a as f64 <= *b),
603
604            (Some(DataValue::Float(a)), "=", ExprValue::Number(b)) => {
605                Ok((a - b).abs() < f64::EPSILON)
606            }
607            (Some(DataValue::Float(a)), "!=", ExprValue::Number(b))
608            | (Some(DataValue::Float(a)), "<>", ExprValue::Number(b)) => {
609                Ok((a - b).abs() >= f64::EPSILON)
610            }
611            (Some(DataValue::Float(a)), ">", ExprValue::Number(b)) => Ok(a > *b),
612            (Some(DataValue::Float(a)), ">=", ExprValue::Number(b)) => Ok(a >= *b),
613            (Some(DataValue::Float(a)), "<", ExprValue::Number(b)) => Ok(a < *b),
614            (Some(DataValue::Float(a)), "<=", ExprValue::Number(b)) => Ok(a <= *b),
615
616            // LIKE operator
617            (Some(DataValue::String(ref text)), "LIKE", ExprValue::String(pattern)) => {
618                let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
619                let regex = regex::RegexBuilder::new(&format!("^{}$", regex_pattern))
620                    .case_insensitive(true)
621                    .build()
622                    .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
623                Ok(regex.is_match(text))
624            }
625            (Some(DataValue::InternedString(ref text)), "LIKE", ExprValue::String(pattern)) => {
626                let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
627                let regex = regex::RegexBuilder::new(&format!("^{}$", regex_pattern))
628                    .case_insensitive(true)
629                    .build()
630                    .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
631                Ok(regex.is_match(text.as_ref()))
632            }
633
634            // IS NULL / IS NOT NULL
635            (None, "IS", ExprValue::Null) | (Some(DataValue::Null), "IS", ExprValue::Null) => {
636                Ok(true)
637            }
638            (Some(_), "IS", ExprValue::Null) => Ok(false),
639            (None, "IS NOT", ExprValue::Null)
640            | (Some(DataValue::Null), "IS NOT", ExprValue::Null) => Ok(false),
641            (Some(_), "IS NOT", ExprValue::Null) => Ok(true),
642
643            // DateTime comparisons
644            (Some(DataValue::String(ref date_str)), op_str, ExprValue::DateTime(dt)) => {
645                if row_index < 3 {
646                    debug!(
647                        "RecursiveWhereEvaluator: DateTime comparison '{}' {} '{}' - attempting parse",
648                        date_str,
649                        op_str,
650                        dt.format("%Y-%m-%d %H:%M:%S")
651                    );
652                }
653
654                // Try to parse the string as a datetime - first try ISO 8601 with UTC
655                if let Ok(parsed_dt) = date_str.parse::<DateTime<Utc>>() {
656                    let result = Self::compare_datetime(op_str, &parsed_dt, dt);
657                    if row_index < 3 {
658                        debug!(
659                            "RecursiveWhereEvaluator: DateTime parsed as UTC: '{}' {} '{}' = {}",
660                            parsed_dt.format("%Y-%m-%d %H:%M:%S"),
661                            op_str,
662                            dt.format("%Y-%m-%d %H:%M:%S"),
663                            result
664                        );
665                    }
666                    Ok(result)
667                }
668                // Try ISO 8601 format without timezone (assume UTC)
669                else if let Ok(parsed_dt) =
670                    NaiveDateTime::parse_from_str(&date_str, "%Y-%m-%dT%H:%M:%S")
671                {
672                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
673                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
674                    if row_index < 3 {
675                        debug!(
676                            "RecursiveWhereEvaluator: DateTime parsed as ISO 8601: '{}' {} '{}' = {}",
677                            parsed_utc.format("%Y-%m-%d %H:%M:%S"),
678                            op_str,
679                            dt.format("%Y-%m-%d %H:%M:%S"),
680                            result
681                        );
682                    }
683                    Ok(result)
684                }
685                // Try standard datetime format
686                else if let Ok(parsed_dt) =
687                    NaiveDateTime::parse_from_str(&date_str, "%Y-%m-%d %H:%M:%S")
688                {
689                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
690                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
691                    if row_index < 3 {
692                        debug!(
693                            "RecursiveWhereEvaluator: DateTime parsed as standard format: '{}' {} '{}' = {}",
694                            parsed_utc.format("%Y-%m-%d %H:%M:%S"), op_str, dt.format("%Y-%m-%d %H:%M:%S"), result
695                        );
696                    }
697                    Ok(result)
698                }
699                // Try date-only format
700                else if let Ok(parsed_date) = NaiveDate::parse_from_str(&date_str, "%Y-%m-%d") {
701                    let parsed_dt =
702                        NaiveDateTime::new(parsed_date, NaiveTime::from_hms_opt(0, 0, 0).unwrap());
703                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
704                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
705                    if row_index < 3 {
706                        debug!(
707                            "RecursiveWhereEvaluator: DateTime parsed as date-only: '{}' {} '{}' = {}",
708                            parsed_utc.format("%Y-%m-%d %H:%M:%S"),
709                            op_str,
710                            dt.format("%Y-%m-%d %H:%M:%S"),
711                            result
712                        );
713                    }
714                    Ok(result)
715                } else {
716                    if row_index < 3 {
717                        debug!(
718                            "RecursiveWhereEvaluator: DateTime parse FAILED for '{}' - no matching format",
719                            date_str
720                        );
721                    }
722                    Ok(false)
723                }
724            }
725            (Some(DataValue::InternedString(ref date_str)), op_str, ExprValue::DateTime(dt)) => {
726                if row_index < 3 {
727                    debug!(
728                        "RecursiveWhereEvaluator: DateTime comparison (interned) '{}' {} '{}' - attempting parse",
729                        date_str,
730                        op_str,
731                        dt.format("%Y-%m-%d %H:%M:%S")
732                    );
733                }
734
735                // Try to parse the string as a datetime - first try ISO 8601 with UTC
736                if let Ok(parsed_dt) = date_str.parse::<DateTime<Utc>>() {
737                    let result = Self::compare_datetime(op_str, &parsed_dt, dt);
738                    if row_index < 3 {
739                        debug!(
740                            "RecursiveWhereEvaluator: DateTime parsed as UTC: '{}' {} '{}' = {}",
741                            parsed_dt.format("%Y-%m-%d %H:%M:%S"),
742                            op_str,
743                            dt.format("%Y-%m-%d %H:%M:%S"),
744                            result
745                        );
746                    }
747                    Ok(result)
748                }
749                // Try ISO 8601 format without timezone (assume UTC)
750                else if let Ok(parsed_dt) =
751                    NaiveDateTime::parse_from_str(date_str.as_ref(), "%Y-%m-%dT%H:%M:%S")
752                {
753                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
754                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
755                    if row_index < 3 {
756                        debug!(
757                            "RecursiveWhereEvaluator: DateTime parsed as ISO 8601: '{}' {} '{}' = {}",
758                            parsed_utc.format("%Y-%m-%d %H:%M:%S"),
759                            op_str,
760                            dt.format("%Y-%m-%d %H:%M:%S"),
761                            result
762                        );
763                    }
764                    Ok(result)
765                }
766                // Try standard datetime format
767                else if let Ok(parsed_dt) =
768                    NaiveDateTime::parse_from_str(date_str.as_ref(), "%Y-%m-%d %H:%M:%S")
769                {
770                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
771                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
772                    if row_index < 3 {
773                        debug!(
774                            "RecursiveWhereEvaluator: DateTime parsed as standard format: '{}' {} '{}' = {}",
775                            parsed_utc.format("%Y-%m-%d %H:%M:%S"), op_str, dt.format("%Y-%m-%d %H:%M:%S"), result
776                        );
777                    }
778                    Ok(result)
779                }
780                // Try date-only format
781                else if let Ok(parsed_date) =
782                    NaiveDate::parse_from_str(date_str.as_ref(), "%Y-%m-%d")
783                {
784                    let parsed_dt =
785                        NaiveDateTime::new(parsed_date, NaiveTime::from_hms_opt(0, 0, 0).unwrap());
786                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
787                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
788                    if row_index < 3 {
789                        debug!(
790                            "RecursiveWhereEvaluator: DateTime parsed as date-only: '{}' {} '{}' = {}",
791                            parsed_utc.format("%Y-%m-%d %H:%M:%S"),
792                            op_str,
793                            dt.format("%Y-%m-%d %H:%M:%S"),
794                            result
795                        );
796                    }
797                    Ok(result)
798                } else {
799                    if row_index < 3 {
800                        debug!(
801                            "RecursiveWhereEvaluator: DateTime parse FAILED for '{}' - no matching format",
802                            date_str
803                        );
804                    }
805                    Ok(false)
806                }
807            }
808
809            // DateTime vs DateTime comparisons (when column is already parsed as DateTime)
810            (Some(DataValue::DateTime(ref date_str)), op_str, ExprValue::DateTime(dt)) => {
811                if row_index < 3 {
812                    debug!(
813                        "RecursiveWhereEvaluator: DateTime vs DateTime comparison '{}' {} '{}' - direct comparison",
814                        date_str, op_str, dt.format("%Y-%m-%d %H:%M:%S")
815                    );
816                }
817
818                // Parse the DataValue::DateTime string to DateTime<Utc>
819                if let Ok(parsed_dt) = date_str.parse::<DateTime<Utc>>() {
820                    let result = Self::compare_datetime(op_str, &parsed_dt, dt);
821                    if row_index < 3 {
822                        debug!(
823                            "RecursiveWhereEvaluator: DateTime vs DateTime parsed successfully: '{}' {} '{}' = {}",
824                            parsed_dt.format("%Y-%m-%d %H:%M:%S"), op_str, dt.format("%Y-%m-%d %H:%M:%S"), result
825                        );
826                    }
827                    Ok(result)
828                }
829                // Try ISO 8601 format without timezone (assume UTC)
830                else if let Ok(parsed_dt) =
831                    NaiveDateTime::parse_from_str(&date_str, "%Y-%m-%dT%H:%M:%S")
832                {
833                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
834                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
835                    if row_index < 3 {
836                        debug!(
837                            "RecursiveWhereEvaluator: DateTime vs DateTime ISO 8601: '{}' {} '{}' = {}",
838                            parsed_utc.format("%Y-%m-%d %H:%M:%S"),
839                            op_str,
840                            dt.format("%Y-%m-%d %H:%M:%S"),
841                            result
842                        );
843                    }
844                    Ok(result)
845                } else {
846                    if row_index < 3 {
847                        debug!(
848                            "RecursiveWhereEvaluator: DateTime vs DateTime parse FAILED for '{}' - no matching format",
849                            date_str
850                        );
851                    }
852                    Ok(false)
853                }
854            }
855
856            _ => Ok(false),
857        }
858    }
859
860    fn evaluate_in_list(
861        &self,
862        expr: &SqlExpression,
863        values: &[SqlExpression],
864        row_index: usize,
865        _ignore_case: bool,
866    ) -> Result<bool> {
867        let column_name = self.extract_column_name(expr)?;
868        let col_index = self
869            .table
870            .get_column_index(&column_name)
871            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
872
873        let cell_value = self.table.get_value(row_index, col_index).cloned();
874
875        for value_expr in values {
876            let compare_value = self.extract_value(value_expr)?;
877            let matches = match (cell_value.as_ref(), &compare_value) {
878                (Some(DataValue::String(a)), ExprValue::String(b)) => {
879                    if self.case_insensitive {
880                        if row_index < 3 {
881                            debug!("RecursiveWhereEvaluator: IN list string comparison '{}' in '{}' (case_insensitive={})", a, b, self.case_insensitive);
882                        }
883                        a.to_lowercase() == b.to_lowercase()
884                    } else {
885                        a == b
886                    }
887                }
888                (Some(DataValue::InternedString(a)), ExprValue::String(b)) => {
889                    if self.case_insensitive {
890                        if row_index < 3 {
891                            debug!("RecursiveWhereEvaluator: IN list interned string comparison '{}' in '{}' (case_insensitive={})", a, b, self.case_insensitive);
892                        }
893                        a.to_lowercase() == b.to_lowercase()
894                    } else {
895                        a.as_ref() == b
896                    }
897                }
898                (Some(DataValue::Integer(a)), ExprValue::Number(b)) => *a as f64 == *b,
899                (Some(DataValue::Float(a)), ExprValue::Number(b)) => (*a - b).abs() < f64::EPSILON,
900                _ => false,
901            };
902
903            if matches {
904                return Ok(true);
905            }
906        }
907
908        Ok(false)
909    }
910
911    fn evaluate_between(
912        &self,
913        expr: &SqlExpression,
914        lower: &SqlExpression,
915        upper: &SqlExpression,
916        row_index: usize,
917    ) -> Result<bool> {
918        let column_name = self.extract_column_name(expr)?;
919        let col_index = self
920            .table
921            .get_column_index(&column_name)
922            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
923
924        let cell_value = self.table.get_value(row_index, col_index).cloned();
925        let lower_value = self.extract_value(lower)?;
926        let upper_value = self.extract_value(upper)?;
927
928        match (cell_value, &lower_value, &upper_value) {
929            (Some(DataValue::Integer(n)), ExprValue::Number(l), ExprValue::Number(u)) => {
930                Ok(n as f64 >= *l && n as f64 <= *u)
931            }
932            (Some(DataValue::Float(n)), ExprValue::Number(l), ExprValue::Number(u)) => {
933                Ok(n >= *l && n <= *u)
934            }
935            (Some(DataValue::String(ref s)), ExprValue::String(l), ExprValue::String(u)) => {
936                Ok(s >= l && s <= u)
937            }
938            (
939                Some(DataValue::InternedString(ref s)),
940                ExprValue::String(l),
941                ExprValue::String(u),
942            ) => Ok(s.as_ref() >= l && s.as_ref() <= u),
943            _ => Ok(false),
944        }
945    }
946
947    fn evaluate_method_call(
948        &self,
949        object: &str,
950        method: &str,
951        args: &[SqlExpression],
952        row_index: usize,
953    ) -> Result<bool> {
954        if row_index < 3 {
955            debug!(
956                "RecursiveWhereEvaluator: evaluate_method_call - object='{}', method='{}', row={}",
957                object, method, row_index
958            );
959        }
960
961        // Get column value
962        let col_index = self.table.get_column_index(object).ok_or_else(|| {
963            let suggestion = self.find_similar_column(object);
964            match suggestion {
965                Some(similar) => {
966                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
967                }
968                None => anyhow!("Column '{}' not found", object),
969            }
970        })?;
971
972        let cell_value = self.table.get_value(row_index, col_index).cloned();
973        if row_index < 3 {
974            debug!(
975                "RecursiveWhereEvaluator: Row {} column '{}' value = {:?}",
976                row_index, object, cell_value
977            );
978        }
979
980        match method.to_lowercase().as_str() {
981            "contains" => {
982                if args.len() != 1 {
983                    return Err(anyhow::anyhow!("Contains requires exactly 1 argument"));
984                }
985                let search_str = self.extract_string_value(&args[0])?;
986                // Pre-compute lowercase once instead of for every row
987                let search_lower = search_str.to_lowercase();
988
989                // Type coercion: convert numeric values to strings for string methods
990                match cell_value {
991                    Some(DataValue::String(ref s)) => {
992                        let result = s.to_lowercase().contains(&search_lower);
993                        // Only log first few rows to avoid performance impact
994                        if row_index < 3 {
995                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on '{}' = {} (case-insensitive)", row_index, search_str, s, result);
996                        }
997                        Ok(result)
998                    }
999                    Some(DataValue::InternedString(ref s)) => {
1000                        let result = s.to_lowercase().contains(&search_lower);
1001                        // Only log first few rows to avoid performance impact
1002                        if row_index < 3 {
1003                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on interned '{}' = {} (case-insensitive)", row_index, search_str, s, result);
1004                        }
1005                        Ok(result)
1006                    }
1007                    Some(DataValue::Integer(n)) => {
1008                        let str_val = n.to_string();
1009                        let result = str_val.contains(&search_str);
1010                        if row_index < 3 {
1011                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on integer '{}' = {}", row_index, search_str, str_val, result);
1012                        }
1013                        Ok(result)
1014                    }
1015                    Some(DataValue::Float(f)) => {
1016                        let str_val = f.to_string();
1017                        let result = str_val.contains(&search_str);
1018                        if row_index < 3 {
1019                            debug!(
1020                                "RecursiveWhereEvaluator: Row {} contains('{}') on float '{}' = {}",
1021                                row_index, search_str, str_val, result
1022                            );
1023                        }
1024                        Ok(result)
1025                    }
1026                    Some(DataValue::Boolean(b)) => {
1027                        let str_val = b.to_string();
1028                        let result = str_val.contains(&search_str);
1029                        if row_index < 3 {
1030                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on boolean '{}' = {}", row_index, search_str, str_val, result);
1031                        }
1032                        Ok(result)
1033                    }
1034                    Some(DataValue::DateTime(dt)) => {
1035                        // DateTime columns can use string methods via coercion
1036                        let result = dt.contains(&search_str);
1037                        if row_index < 3 {
1038                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on datetime '{}' = {}", row_index, search_str, dt, result);
1039                        }
1040                        Ok(result)
1041                    }
1042                    _ => {
1043                        if row_index < 3 {
1044                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on null/empty value = false", row_index, search_str);
1045                        }
1046                        Ok(false)
1047                    }
1048                }
1049            }
1050            "startswith" => {
1051                if args.len() != 1 {
1052                    return Err(anyhow::anyhow!("StartsWith requires exactly 1 argument"));
1053                }
1054                let prefix = self.extract_string_value(&args[0])?;
1055
1056                // Type coercion: convert numeric values to strings for string methods
1057                match cell_value {
1058                    Some(DataValue::String(ref s)) => {
1059                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
1060                    }
1061                    Some(DataValue::InternedString(ref s)) => {
1062                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
1063                    }
1064                    Some(DataValue::Integer(n)) => Ok(n.to_string().starts_with(&prefix)),
1065                    Some(DataValue::Float(f)) => Ok(f.to_string().starts_with(&prefix)),
1066                    Some(DataValue::Boolean(b)) => Ok(b.to_string().starts_with(&prefix)),
1067                    Some(DataValue::DateTime(dt)) => Ok(dt.starts_with(&prefix)),
1068                    _ => Ok(false),
1069                }
1070            }
1071            "endswith" => {
1072                if args.len() != 1 {
1073                    return Err(anyhow::anyhow!("EndsWith requires exactly 1 argument"));
1074                }
1075                let suffix = self.extract_string_value(&args[0])?;
1076
1077                // Type coercion: convert numeric values to strings for string methods
1078                match cell_value {
1079                    Some(DataValue::String(ref s)) => {
1080                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
1081                    }
1082                    Some(DataValue::InternedString(ref s)) => {
1083                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
1084                    }
1085                    Some(DataValue::Integer(n)) => Ok(n.to_string().ends_with(&suffix)),
1086                    Some(DataValue::Float(f)) => Ok(f.to_string().ends_with(&suffix)),
1087                    Some(DataValue::Boolean(b)) => Ok(b.to_string().ends_with(&suffix)),
1088                    Some(DataValue::DateTime(dt)) => Ok(dt.ends_with(&suffix)),
1089                    _ => Ok(false),
1090                }
1091            }
1092            _ => Err(anyhow::anyhow!("Unsupported method: {}", method)),
1093        }
1094    }
1095
1096    fn extract_column_name(&self, expr: &SqlExpression) -> Result<String> {
1097        match expr {
1098            SqlExpression::Column(name) => Ok(name.clone()),
1099            _ => Err(anyhow::anyhow!("Expected column name, got: {:?}", expr)),
1100        }
1101    }
1102
1103    fn extract_string_value(&self, expr: &SqlExpression) -> Result<String> {
1104        match expr {
1105            SqlExpression::StringLiteral(s) => Ok(s.clone()),
1106            _ => Err(anyhow::anyhow!("Expected string literal, got: {:?}", expr)),
1107        }
1108    }
1109
1110    fn extract_value(&self, expr: &SqlExpression) -> Result<ExprValue> {
1111        match expr {
1112            SqlExpression::StringLiteral(s) => Ok(ExprValue::String(s.clone())),
1113            SqlExpression::NumberLiteral(n) => {
1114                if let Ok(num) = n.parse::<f64>() {
1115                    Ok(ExprValue::Number(num))
1116                } else {
1117                    Ok(ExprValue::String(n.clone()))
1118                }
1119            }
1120            SqlExpression::DateTimeConstructor {
1121                year,
1122                month,
1123                day,
1124                hour,
1125                minute,
1126                second,
1127            } => {
1128                // Create a DateTime from the constructor
1129                let naive_date = NaiveDate::from_ymd_opt(*year, *month, *day)
1130                    .ok_or_else(|| anyhow::anyhow!("Invalid date: {}-{}-{}", year, month, day))?;
1131                let naive_time = NaiveTime::from_hms_opt(
1132                    hour.unwrap_or(0),
1133                    minute.unwrap_or(0),
1134                    second.unwrap_or(0),
1135                )
1136                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
1137                let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
1138                let datetime = Utc.from_utc_datetime(&naive_datetime);
1139                Ok(ExprValue::DateTime(datetime))
1140            }
1141            SqlExpression::DateTimeToday {
1142                hour,
1143                minute,
1144                second,
1145            } => {
1146                // Get today's date with optional time
1147                let today = Local::now().date_naive();
1148                let time = NaiveTime::from_hms_opt(
1149                    hour.unwrap_or(0),
1150                    minute.unwrap_or(0),
1151                    second.unwrap_or(0),
1152                )
1153                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
1154                let naive_datetime = NaiveDateTime::new(today, time);
1155                let datetime = Utc.from_utc_datetime(&naive_datetime);
1156                Ok(ExprValue::DateTime(datetime))
1157            }
1158            _ => Ok(ExprValue::Null),
1159        }
1160    }
1161}
1162
1163enum ExprValue {
1164    String(String),
1165    Number(f64),
1166    DateTime(DateTime<Utc>),
1167    Null,
1168}