sql_cli/data/
recursive_where_evaluator.rs

1use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
2use crate::data::datatable::{DataTable, DataValue};
3use crate::sql::recursive_parser::{Condition, LogicalOp, SqlExpression, WhereClause};
4use anyhow::{anyhow, Result};
5use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
6use tracing::debug;
7
8/// Evaluates WHERE clauses from recursive_parser directly against DataTable
9pub struct RecursiveWhereEvaluator<'a> {
10    table: &'a DataTable,
11    case_insensitive: bool,
12}
13
14impl<'a> RecursiveWhereEvaluator<'a> {
15    pub fn new(table: &'a DataTable) -> Self {
16        Self {
17            table,
18            case_insensitive: false,
19        }
20    }
21
22    /// Find a column name similar to the given name using edit distance
23    fn find_similar_column(&self, name: &str) -> Option<String> {
24        let columns = self.table.column_names();
25        let mut best_match: Option<(String, usize)> = None;
26
27        for col in columns {
28            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
29            // Only suggest if distance is small (likely a typo)
30            // Allow up to 3 edits for longer names
31            let max_distance = if name.len() > 10 { 3 } else { 2 };
32            if distance <= max_distance {
33                match &best_match {
34                    None => best_match = Some((col, distance)),
35                    Some((_, best_dist)) if distance < *best_dist => {
36                        best_match = Some((col, distance));
37                    }
38                    _ => {}
39                }
40            }
41        }
42
43        best_match.map(|(name, _)| name)
44    }
45
46    /// Calculate Levenshtein edit distance between two strings
47    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
48        let len1 = s1.len();
49        let len2 = s2.len();
50        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
51
52        for i in 0..=len1 {
53            matrix[i][0] = i;
54        }
55        for j in 0..=len2 {
56            matrix[0][j] = j;
57        }
58
59        for (i, c1) in s1.chars().enumerate() {
60            for (j, c2) in s2.chars().enumerate() {
61                let cost = if c1 == c2 { 0 } else { 1 };
62                matrix[i + 1][j + 1] = std::cmp::min(
63                    matrix[i][j + 1] + 1, // deletion
64                    std::cmp::min(
65                        matrix[i + 1][j] + 1, // insertion
66                        matrix[i][j] + cost,  // substitution
67                    ),
68                );
69            }
70        }
71
72        matrix[len1][len2]
73    }
74
75    pub fn with_case_insensitive(table: &'a DataTable, case_insensitive: bool) -> Self {
76        Self {
77            table,
78            case_insensitive,
79        }
80    }
81
82    /// Helper function to compare two datetime values based on the operator
83    fn compare_datetime(op: &str, left: &DateTime<Utc>, right: &DateTime<Utc>) -> bool {
84        match op {
85            "=" => left == right,
86            "!=" | "<>" => left != right,
87            ">" => left > right,
88            ">=" => left >= right,
89            "<" => left < right,
90            "<=" => left <= right,
91            _ => false,
92        }
93    }
94
95    /// Evaluate the Length() method on a column value
96    fn evaluate_length(
97        &self,
98        object: &str,
99        row_index: usize,
100    ) -> Result<(Option<DataValue>, String)> {
101        let col_index = self.table.get_column_index(object).ok_or_else(|| {
102            let suggestion = self.find_similar_column(object);
103            match suggestion {
104                Some(similar) => {
105                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
106                }
107                None => anyhow!("Column '{}' not found", object),
108            }
109        })?;
110
111        let value = self.table.get_value(row_index, col_index);
112        let length_value = match value {
113            Some(DataValue::String(s)) => Some(DataValue::Integer(s.len() as i64)),
114            Some(DataValue::InternedString(s)) => Some(DataValue::Integer(s.len() as i64)),
115            Some(DataValue::Integer(n)) => Some(DataValue::Integer(n.to_string().len() as i64)),
116            Some(DataValue::Float(f)) => Some(DataValue::Integer(f.to_string().len() as i64)),
117            _ => Some(DataValue::Integer(0)),
118        };
119        Ok((length_value, format!("{}.Length()", object)))
120    }
121
122    /// Evaluate the IndexOf() method on a column value
123    fn evaluate_indexof(
124        &self,
125        object: &str,
126        search_str: &str,
127        row_index: usize,
128    ) -> Result<(Option<DataValue>, String)> {
129        let col_index = self.table.get_column_index(object).ok_or_else(|| {
130            let suggestion = self.find_similar_column(object);
131            match suggestion {
132                Some(similar) => {
133                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
134                }
135                None => anyhow!("Column '{}' not found", object),
136            }
137        })?;
138
139        let value = self.table.get_value(row_index, col_index);
140        let index_value = match value {
141            Some(DataValue::String(s)) => {
142                // Case-insensitive search by default, following Contains behavior
143                let pos = s
144                    .to_lowercase()
145                    .find(&search_str.to_lowercase())
146                    .map(|idx| idx as i64)
147                    .unwrap_or(-1);
148                Some(DataValue::Integer(pos))
149            }
150            Some(DataValue::InternedString(s)) => {
151                let pos = s
152                    .to_lowercase()
153                    .find(&search_str.to_lowercase())
154                    .map(|idx| idx as i64)
155                    .unwrap_or(-1);
156                Some(DataValue::Integer(pos))
157            }
158            Some(DataValue::Integer(n)) => {
159                let str_val = n.to_string();
160                let pos = str_val.find(search_str).map(|idx| idx as i64).unwrap_or(-1);
161                Some(DataValue::Integer(pos))
162            }
163            Some(DataValue::Float(f)) => {
164                let str_val = f.to_string();
165                let pos = str_val.find(search_str).map(|idx| idx as i64).unwrap_or(-1);
166                Some(DataValue::Integer(pos))
167            }
168            _ => Some(DataValue::Integer(-1)), // Return -1 for not found
169        };
170
171        if row_index < 3 {
172            debug!(
173                "RecursiveWhereEvaluator: Row {} IndexOf('{}') = {:?}",
174                row_index, search_str, index_value
175            );
176        }
177        Ok((index_value, format!("{}.IndexOf('{}')", object, search_str)))
178    }
179
180    /// Apply the appropriate trim operation based on trim_type
181    fn apply_trim<'b>(s: &'b str, trim_type: &str) -> &'b str {
182        match trim_type {
183            "trim" => s.trim(),
184            "trimstart" => s.trim_start(),
185            "trimend" => s.trim_end(),
186            _ => s,
187        }
188    }
189
190    /// Evaluate trim methods (Trim, TrimStart, TrimEnd) on a column value
191    fn evaluate_trim(
192        &self,
193        object: &str,
194        row_index: usize,
195        trim_type: &str,
196    ) -> Result<(Option<DataValue>, String)> {
197        let col_index = self.table.get_column_index(object).ok_or_else(|| {
198            let suggestion = self.find_similar_column(object);
199            match suggestion {
200                Some(similar) => {
201                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
202                }
203                None => anyhow!("Column '{}' not found", object),
204            }
205        })?;
206
207        let value = self.table.get_value(row_index, col_index);
208        let trimmed_value = match value {
209            Some(DataValue::String(s)) => Some(DataValue::String(
210                Self::apply_trim(&s, trim_type).to_string(),
211            )),
212            Some(DataValue::InternedString(s)) => Some(DataValue::String(
213                Self::apply_trim(&s, trim_type).to_string(),
214            )),
215            Some(DataValue::Integer(n)) => {
216                let str_val = n.to_string();
217                Some(DataValue::String(
218                    Self::apply_trim(&str_val, trim_type).to_string(),
219                ))
220            }
221            Some(DataValue::Float(f)) => {
222                let str_val = f.to_string();
223                Some(DataValue::String(
224                    Self::apply_trim(&str_val, trim_type).to_string(),
225                ))
226            }
227            _ => Some(DataValue::String(String::new())),
228        };
229
230        let method_name = match trim_type {
231            "trim" => "Trim",
232            "trimstart" => "TrimStart",
233            "trimend" => "TrimEnd",
234            _ => "Trim",
235        };
236        Ok((trimmed_value, format!("{}.{}()", object, method_name)))
237    }
238
239    /// Evaluate a WHERE clause for a specific row
240    pub fn evaluate(&self, where_clause: &WhereClause, row_index: usize) -> Result<bool> {
241        // Only log for first few rows to avoid performance impact
242        if row_index < 3 {
243            debug!(
244                "RecursiveWhereEvaluator: evaluate() ENTRY - row {}, {} conditions, case_insensitive={}",
245                row_index,
246                where_clause.conditions.len(),
247                self.case_insensitive
248            );
249        }
250
251        if where_clause.conditions.is_empty() {
252            if row_index < 3 {
253                debug!("RecursiveWhereEvaluator: evaluate() EXIT - no conditions, returning true");
254            }
255            return Ok(true);
256        }
257
258        // Evaluate first condition
259        if row_index < 3 {
260            debug!(
261                "RecursiveWhereEvaluator: evaluate() - evaluating first condition for row {}",
262                row_index
263            );
264        }
265        let mut result = self.evaluate_condition(&where_clause.conditions[0], row_index)?;
266
267        // Apply connectors (AND/OR) with subsequent conditions
268        for i in 1..where_clause.conditions.len() {
269            let next_result = self.evaluate_condition(&where_clause.conditions[i], row_index)?;
270
271            // Use the connector from the previous condition
272            if let Some(connector) = &where_clause.conditions[i - 1].connector {
273                result = match connector {
274                    LogicalOp::And => result && next_result,
275                    LogicalOp::Or => result || next_result,
276                };
277            }
278        }
279
280        Ok(result)
281    }
282
283    fn evaluate_condition(&self, condition: &Condition, row_index: usize) -> Result<bool> {
284        // Only log first few rows to avoid performance impact
285        if row_index < 3 {
286            debug!(
287                "RecursiveWhereEvaluator: evaluate_condition() ENTRY - row {}",
288                row_index
289            );
290        }
291        let result = self.evaluate_expression(&condition.expr, row_index);
292        if row_index < 3 {
293            debug!(
294                "RecursiveWhereEvaluator: evaluate_condition() EXIT - row {}, result = {:?}",
295                row_index, result
296            );
297        }
298        result
299    }
300
301    fn evaluate_expression(&self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
302        // Only log first few rows to avoid performance impact
303        if row_index < 3 {
304            debug!(
305                "RecursiveWhereEvaluator: evaluate_expression() ENTRY - row {}, expr = {:?}",
306                row_index, expr
307            );
308        }
309
310        let result = match expr {
311            SqlExpression::BinaryOp { left, op, right } => {
312                self.evaluate_binary_op(left, op, right, row_index)
313            }
314            SqlExpression::InList { expr, values } => {
315                self.evaluate_in_list(expr, values, row_index, false)
316            }
317            SqlExpression::NotInList { expr, values } => {
318                let in_result = self.evaluate_in_list(expr, values, row_index, false)?;
319                Ok(!in_result)
320            }
321            SqlExpression::Between { expr, lower, upper } => {
322                self.evaluate_between(expr, lower, upper, row_index)
323            }
324            SqlExpression::Not { expr } => {
325                let inner_result = self.evaluate_expression(expr, row_index)?;
326                Ok(!inner_result)
327            }
328            SqlExpression::MethodCall {
329                object,
330                method,
331                args,
332            } => {
333                if row_index < 3 {
334                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found MethodCall, delegating to evaluate_method_call");
335                }
336                self.evaluate_method_call(object, method, args, row_index)
337            }
338            SqlExpression::CaseExpression {
339                when_branches,
340                else_branch,
341            } => {
342                if row_index < 3 {
343                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found CaseExpression, evaluating");
344                }
345                self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index)
346            }
347            _ => {
348                if row_index < 3 {
349                    debug!("RecursiveWhereEvaluator: evaluate_expression() - unsupported expression type, returning false");
350                }
351                Ok(false) // Default to false for unsupported expressions
352            }
353        };
354
355        if row_index < 3 {
356            debug!(
357                "RecursiveWhereEvaluator: evaluate_expression() EXIT - row {}, result = {:?}",
358                row_index, result
359            );
360        }
361        result
362    }
363
364    fn evaluate_binary_op(
365        &self,
366        left: &SqlExpression,
367        op: &str,
368        right: &SqlExpression,
369        row_index: usize,
370    ) -> Result<bool> {
371        // Only log first few rows to avoid performance impact
372        if row_index < 3 {
373            debug!(
374                "RecursiveWhereEvaluator: evaluate_binary_op() ENTRY - row {}, op = '{}'",
375                row_index, op
376            );
377        }
378
379        // Handle left side - could be a column or a method call
380        let (cell_value, column_name) = match left {
381            SqlExpression::MethodCall {
382                object,
383                method,
384                args,
385            } => {
386                // Handle method calls that return values (like Length(), IndexOf())
387                match method.to_lowercase().as_str() {
388                    "length" => {
389                        if !args.is_empty() {
390                            return Err(anyhow::anyhow!("Length() takes no arguments"));
391                        }
392                        self.evaluate_length(object, row_index)?
393                    }
394                    "indexof" => {
395                        if args.len() != 1 {
396                            return Err(anyhow::anyhow!("IndexOf() requires exactly 1 argument"));
397                        }
398                        let search_str = self.extract_string_value(&args[0])?;
399                        self.evaluate_indexof(object, &search_str, row_index)?
400                    }
401                    "trim" => {
402                        if !args.is_empty() {
403                            return Err(anyhow::anyhow!("Trim() takes no arguments"));
404                        }
405                        self.evaluate_trim(object, row_index, "trim")?
406                    }
407                    "trimstart" => {
408                        if !args.is_empty() {
409                            return Err(anyhow::anyhow!("TrimStart() takes no arguments"));
410                        }
411                        self.evaluate_trim(object, row_index, "trimstart")?
412                    }
413                    "trimend" => {
414                        if !args.is_empty() {
415                            return Err(anyhow::anyhow!("TrimEnd() takes no arguments"));
416                        }
417                        self.evaluate_trim(object, row_index, "trimend")?
418                    }
419                    _ => {
420                        return Err(anyhow::anyhow!(
421                            "Method '{}' cannot be used in comparisons",
422                            method
423                        ));
424                    }
425                }
426            }
427            SqlExpression::BinaryOp {
428                left: _expr_left,
429                op: arith_op,
430                right: _expr_right,
431            } if matches!(arith_op.as_str(), "+" | "-" | "*" | "/") => {
432                // Handle arithmetic expressions using ArithmeticEvaluator
433                let evaluator = ArithmeticEvaluator::new(self.table);
434                let computed_value = evaluator.evaluate(left, row_index)?;
435                if row_index < 3 {
436                    debug!(
437                        "RecursiveWhereEvaluator: evaluate_binary_op() - computed arithmetic expression = {:?}",
438                        computed_value
439                    );
440                }
441                (Some(computed_value), "computed_expression".to_string())
442            }
443            SqlExpression::FunctionCall { name, .. } => {
444                // Handle function calls using ArithmeticEvaluator
445                let evaluator = ArithmeticEvaluator::new(self.table);
446                let computed_value = evaluator.evaluate(left, row_index)?;
447                if row_index < 3 {
448                    debug!(
449                        "RecursiveWhereEvaluator: evaluate_binary_op() - computed function {} = {:?}",
450                        name, computed_value
451                    );
452                }
453                (Some(computed_value), format!("{}()", name))
454            }
455            _ => {
456                // Regular column reference
457                let column_name = self.extract_column_name(left)?;
458                if row_index < 3 {
459                    debug!(
460                        "RecursiveWhereEvaluator: evaluate_binary_op() - column_name = '{}'",
461                        column_name
462                    );
463                }
464
465                let col_index = self.table.get_column_index(&column_name).ok_or_else(|| {
466                    let suggestion = self.find_similar_column(&column_name);
467                    match suggestion {
468                        Some(similar) => anyhow!(
469                            "Column '{}' not found. Did you mean '{}'?",
470                            column_name,
471                            similar
472                        ),
473                        None => anyhow!("Column '{}' not found", column_name),
474                    }
475                })?;
476
477                let cell_value = self.table.get_value(row_index, col_index).cloned();
478                (cell_value, column_name)
479            }
480        };
481
482        if row_index < 3 {
483            debug!(
484                "RecursiveWhereEvaluator: evaluate_binary_op() - row {} column '{}' value = {:?}",
485                row_index, column_name, cell_value
486            );
487        }
488
489        // Get comparison value from right side
490        let compare_value = self.extract_value(right)?;
491
492        // Perform comparison
493        match (cell_value, op.to_uppercase().as_str(), &compare_value) {
494            (Some(DataValue::String(ref a)), "=", ExprValue::String(b)) => {
495                if row_index < 3 {
496                    debug!(
497                        "RecursiveWhereEvaluator: String comparison '{}' = '{}' (case_insensitive={})",
498                        a, b, self.case_insensitive
499                    );
500                }
501                if self.case_insensitive {
502                    Ok(a.to_lowercase() == b.to_lowercase())
503                } else {
504                    Ok(a == b)
505                }
506            }
507            (Some(DataValue::InternedString(ref a)), "=", ExprValue::String(b)) => {
508                if row_index < 3 {
509                    debug!(
510                        "RecursiveWhereEvaluator: InternedString comparison '{}' = '{}' (case_insensitive={})",
511                        a, b, self.case_insensitive
512                    );
513                }
514                if self.case_insensitive {
515                    Ok(a.to_lowercase() == b.to_lowercase())
516                } else {
517                    Ok(a.as_ref() == b)
518                }
519            }
520            (Some(DataValue::String(ref a)), "!=", ExprValue::String(b))
521            | (Some(DataValue::String(ref a)), "<>", ExprValue::String(b)) => {
522                if row_index < 3 {
523                    debug!(
524                        "RecursiveWhereEvaluator: String comparison '{}' != '{}' (case_insensitive={})",
525                        a, b, self.case_insensitive
526                    );
527                }
528                if self.case_insensitive {
529                    Ok(a.to_lowercase() != b.to_lowercase())
530                } else {
531                    Ok(a != b)
532                }
533            }
534            (Some(DataValue::InternedString(ref a)), "!=", ExprValue::String(b))
535            | (Some(DataValue::InternedString(ref a)), "<>", ExprValue::String(b)) => {
536                if row_index < 3 {
537                    debug!(
538                        "RecursiveWhereEvaluator: InternedString comparison '{}' != '{}' (case_insensitive={})",
539                        a, b, self.case_insensitive
540                    );
541                }
542                if self.case_insensitive {
543                    Ok(a.to_lowercase() != b.to_lowercase())
544                } else {
545                    Ok(a.as_ref() != b)
546                }
547            }
548            (Some(DataValue::String(ref a)), ">", ExprValue::String(b)) => {
549                if self.case_insensitive {
550                    Ok(a.to_lowercase() > b.to_lowercase())
551                } else {
552                    Ok(a > b)
553                }
554            }
555            (Some(DataValue::InternedString(ref a)), ">", ExprValue::String(b)) => {
556                if self.case_insensitive {
557                    Ok(a.to_lowercase() > b.to_lowercase())
558                } else {
559                    Ok(a.as_ref() > b)
560                }
561            }
562            (Some(DataValue::String(ref a)), ">=", ExprValue::String(b)) => {
563                if self.case_insensitive {
564                    Ok(a.to_lowercase() >= b.to_lowercase())
565                } else {
566                    Ok(a >= b)
567                }
568            }
569            (Some(DataValue::InternedString(ref a)), ">=", ExprValue::String(b)) => {
570                if self.case_insensitive {
571                    Ok(a.to_lowercase() >= b.to_lowercase())
572                } else {
573                    Ok(a.as_ref() >= b)
574                }
575            }
576            (Some(DataValue::String(ref a)), "<", ExprValue::String(b)) => {
577                if self.case_insensitive {
578                    Ok(a.to_lowercase() < b.to_lowercase())
579                } else {
580                    Ok(a < b)
581                }
582            }
583            (Some(DataValue::InternedString(ref a)), "<", ExprValue::String(b)) => {
584                if self.case_insensitive {
585                    Ok(a.to_lowercase() < b.to_lowercase())
586                } else {
587                    Ok(a.as_ref() < b)
588                }
589            }
590            (Some(DataValue::String(ref a)), "<=", ExprValue::String(b)) => {
591                if self.case_insensitive {
592                    Ok(a.to_lowercase() <= b.to_lowercase())
593                } else {
594                    Ok(a <= b)
595                }
596            }
597            (Some(DataValue::InternedString(ref a)), "<=", ExprValue::String(b)) => {
598                if self.case_insensitive {
599                    Ok(a.to_lowercase() <= b.to_lowercase())
600                } else {
601                    Ok(a.as_ref() <= b)
602                }
603            }
604
605            (Some(DataValue::Integer(a)), "=", ExprValue::Number(b)) => Ok(a as f64 == *b),
606            (Some(DataValue::Integer(a)), "!=", ExprValue::Number(b))
607            | (Some(DataValue::Integer(a)), "<>", ExprValue::Number(b)) => Ok(a as f64 != *b),
608            (Some(DataValue::Integer(a)), ">", ExprValue::Number(b)) => Ok(a as f64 > *b),
609            (Some(DataValue::Integer(a)), ">=", ExprValue::Number(b)) => Ok(a as f64 >= *b),
610            (Some(DataValue::Integer(a)), "<", ExprValue::Number(b)) => Ok((a as f64) < *b),
611            (Some(DataValue::Integer(a)), "<=", ExprValue::Number(b)) => Ok(a as f64 <= *b),
612
613            (Some(DataValue::Float(a)), "=", ExprValue::Number(b)) => {
614                Ok((a - b).abs() < f64::EPSILON)
615            }
616            (Some(DataValue::Float(a)), "!=", ExprValue::Number(b))
617            | (Some(DataValue::Float(a)), "<>", ExprValue::Number(b)) => {
618                Ok((a - b).abs() >= f64::EPSILON)
619            }
620            (Some(DataValue::Float(a)), ">", ExprValue::Number(b)) => Ok(a > *b),
621            (Some(DataValue::Float(a)), ">=", ExprValue::Number(b)) => Ok(a >= *b),
622            (Some(DataValue::Float(a)), "<", ExprValue::Number(b)) => Ok(a < *b),
623            (Some(DataValue::Float(a)), "<=", ExprValue::Number(b)) => Ok(a <= *b),
624
625            // LIKE operator
626            (Some(DataValue::String(ref text)), "LIKE", ExprValue::String(pattern)) => {
627                let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
628                let regex = regex::RegexBuilder::new(&format!("^{}$", regex_pattern))
629                    .case_insensitive(true)
630                    .build()
631                    .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
632                Ok(regex.is_match(text))
633            }
634            (Some(DataValue::InternedString(ref text)), "LIKE", ExprValue::String(pattern)) => {
635                let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
636                let regex = regex::RegexBuilder::new(&format!("^{}$", regex_pattern))
637                    .case_insensitive(true)
638                    .build()
639                    .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
640                Ok(regex.is_match(text.as_ref()))
641            }
642
643            // IS NULL / IS NOT NULL
644            (None, "IS", ExprValue::Null) | (Some(DataValue::Null), "IS", ExprValue::Null) => {
645                Ok(true)
646            }
647            (Some(_), "IS", ExprValue::Null) => Ok(false),
648            (None, "IS NOT", ExprValue::Null)
649            | (Some(DataValue::Null), "IS NOT", ExprValue::Null) => Ok(false),
650            (Some(_), "IS NOT", ExprValue::Null) => Ok(true),
651
652            // DateTime comparisons
653            (Some(DataValue::String(ref date_str)), op_str, ExprValue::DateTime(dt)) => {
654                if row_index < 3 {
655                    debug!(
656                        "RecursiveWhereEvaluator: DateTime comparison '{}' {} '{}' - attempting parse",
657                        date_str,
658                        op_str,
659                        dt.format("%Y-%m-%d %H:%M:%S")
660                    );
661                }
662
663                // Try to parse the string as a datetime - first try ISO 8601 with UTC
664                if let Ok(parsed_dt) = date_str.parse::<DateTime<Utc>>() {
665                    let result = Self::compare_datetime(op_str, &parsed_dt, dt);
666                    if row_index < 3 {
667                        debug!(
668                            "RecursiveWhereEvaluator: DateTime parsed as UTC: '{}' {} '{}' = {}",
669                            parsed_dt.format("%Y-%m-%d %H:%M:%S"),
670                            op_str,
671                            dt.format("%Y-%m-%d %H:%M:%S"),
672                            result
673                        );
674                    }
675                    Ok(result)
676                }
677                // Try ISO 8601 format without timezone (assume UTC)
678                else if let Ok(parsed_dt) =
679                    NaiveDateTime::parse_from_str(&date_str, "%Y-%m-%dT%H:%M:%S")
680                {
681                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
682                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
683                    if row_index < 3 {
684                        debug!(
685                            "RecursiveWhereEvaluator: DateTime parsed as ISO 8601: '{}' {} '{}' = {}",
686                            parsed_utc.format("%Y-%m-%d %H:%M:%S"),
687                            op_str,
688                            dt.format("%Y-%m-%d %H:%M:%S"),
689                            result
690                        );
691                    }
692                    Ok(result)
693                }
694                // Try standard datetime format
695                else if let Ok(parsed_dt) =
696                    NaiveDateTime::parse_from_str(&date_str, "%Y-%m-%d %H:%M:%S")
697                {
698                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
699                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
700                    if row_index < 3 {
701                        debug!(
702                            "RecursiveWhereEvaluator: DateTime parsed as standard format: '{}' {} '{}' = {}",
703                            parsed_utc.format("%Y-%m-%d %H:%M:%S"), op_str, dt.format("%Y-%m-%d %H:%M:%S"), result
704                        );
705                    }
706                    Ok(result)
707                }
708                // Try date-only format
709                else if let Ok(parsed_date) = NaiveDate::parse_from_str(&date_str, "%Y-%m-%d") {
710                    let parsed_dt =
711                        NaiveDateTime::new(parsed_date, NaiveTime::from_hms_opt(0, 0, 0).unwrap());
712                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
713                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
714                    if row_index < 3 {
715                        debug!(
716                            "RecursiveWhereEvaluator: DateTime parsed as date-only: '{}' {} '{}' = {}",
717                            parsed_utc.format("%Y-%m-%d %H:%M:%S"),
718                            op_str,
719                            dt.format("%Y-%m-%d %H:%M:%S"),
720                            result
721                        );
722                    }
723                    Ok(result)
724                } else {
725                    if row_index < 3 {
726                        debug!(
727                            "RecursiveWhereEvaluator: DateTime parse FAILED for '{}' - no matching format",
728                            date_str
729                        );
730                    }
731                    Ok(false)
732                }
733            }
734            (Some(DataValue::InternedString(ref date_str)), op_str, ExprValue::DateTime(dt)) => {
735                if row_index < 3 {
736                    debug!(
737                        "RecursiveWhereEvaluator: DateTime comparison (interned) '{}' {} '{}' - attempting parse",
738                        date_str,
739                        op_str,
740                        dt.format("%Y-%m-%d %H:%M:%S")
741                    );
742                }
743
744                // Try to parse the string as a datetime - first try ISO 8601 with UTC
745                if let Ok(parsed_dt) = date_str.parse::<DateTime<Utc>>() {
746                    let result = Self::compare_datetime(op_str, &parsed_dt, dt);
747                    if row_index < 3 {
748                        debug!(
749                            "RecursiveWhereEvaluator: DateTime parsed as UTC: '{}' {} '{}' = {}",
750                            parsed_dt.format("%Y-%m-%d %H:%M:%S"),
751                            op_str,
752                            dt.format("%Y-%m-%d %H:%M:%S"),
753                            result
754                        );
755                    }
756                    Ok(result)
757                }
758                // Try ISO 8601 format without timezone (assume UTC)
759                else if let Ok(parsed_dt) =
760                    NaiveDateTime::parse_from_str(date_str.as_ref(), "%Y-%m-%dT%H:%M:%S")
761                {
762                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
763                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
764                    if row_index < 3 {
765                        debug!(
766                            "RecursiveWhereEvaluator: DateTime parsed as ISO 8601: '{}' {} '{}' = {}",
767                            parsed_utc.format("%Y-%m-%d %H:%M:%S"),
768                            op_str,
769                            dt.format("%Y-%m-%d %H:%M:%S"),
770                            result
771                        );
772                    }
773                    Ok(result)
774                }
775                // Try standard datetime format
776                else if let Ok(parsed_dt) =
777                    NaiveDateTime::parse_from_str(date_str.as_ref(), "%Y-%m-%d %H:%M:%S")
778                {
779                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
780                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
781                    if row_index < 3 {
782                        debug!(
783                            "RecursiveWhereEvaluator: DateTime parsed as standard format: '{}' {} '{}' = {}",
784                            parsed_utc.format("%Y-%m-%d %H:%M:%S"), op_str, dt.format("%Y-%m-%d %H:%M:%S"), result
785                        );
786                    }
787                    Ok(result)
788                }
789                // Try date-only format
790                else if let Ok(parsed_date) =
791                    NaiveDate::parse_from_str(date_str.as_ref(), "%Y-%m-%d")
792                {
793                    let parsed_dt =
794                        NaiveDateTime::new(parsed_date, NaiveTime::from_hms_opt(0, 0, 0).unwrap());
795                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
796                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
797                    if row_index < 3 {
798                        debug!(
799                            "RecursiveWhereEvaluator: DateTime parsed as date-only: '{}' {} '{}' = {}",
800                            parsed_utc.format("%Y-%m-%d %H:%M:%S"),
801                            op_str,
802                            dt.format("%Y-%m-%d %H:%M:%S"),
803                            result
804                        );
805                    }
806                    Ok(result)
807                } else {
808                    if row_index < 3 {
809                        debug!(
810                            "RecursiveWhereEvaluator: DateTime parse FAILED for '{}' - no matching format",
811                            date_str
812                        );
813                    }
814                    Ok(false)
815                }
816            }
817
818            // DateTime vs DateTime comparisons (when column is already parsed as DateTime)
819            (Some(DataValue::DateTime(ref date_str)), op_str, ExprValue::DateTime(dt)) => {
820                if row_index < 3 {
821                    debug!(
822                        "RecursiveWhereEvaluator: DateTime vs DateTime comparison '{}' {} '{}' - direct comparison",
823                        date_str, op_str, dt.format("%Y-%m-%d %H:%M:%S")
824                    );
825                }
826
827                // Parse the DataValue::DateTime string to DateTime<Utc>
828                if let Ok(parsed_dt) = date_str.parse::<DateTime<Utc>>() {
829                    let result = Self::compare_datetime(op_str, &parsed_dt, dt);
830                    if row_index < 3 {
831                        debug!(
832                            "RecursiveWhereEvaluator: DateTime vs DateTime parsed successfully: '{}' {} '{}' = {}",
833                            parsed_dt.format("%Y-%m-%d %H:%M:%S"), op_str, dt.format("%Y-%m-%d %H:%M:%S"), result
834                        );
835                    }
836                    Ok(result)
837                }
838                // Try ISO 8601 format without timezone (assume UTC)
839                else if let Ok(parsed_dt) =
840                    NaiveDateTime::parse_from_str(&date_str, "%Y-%m-%dT%H:%M:%S")
841                {
842                    let parsed_utc = Utc.from_utc_datetime(&parsed_dt);
843                    let result = Self::compare_datetime(op_str, &parsed_utc, dt);
844                    if row_index < 3 {
845                        debug!(
846                            "RecursiveWhereEvaluator: DateTime vs DateTime ISO 8601: '{}' {} '{}' = {}",
847                            parsed_utc.format("%Y-%m-%d %H:%M:%S"),
848                            op_str,
849                            dt.format("%Y-%m-%d %H:%M:%S"),
850                            result
851                        );
852                    }
853                    Ok(result)
854                } else {
855                    if row_index < 3 {
856                        debug!(
857                            "RecursiveWhereEvaluator: DateTime vs DateTime parse FAILED for '{}' - no matching format",
858                            date_str
859                        );
860                    }
861                    Ok(false)
862                }
863            }
864
865            _ => Ok(false),
866        }
867    }
868
869    fn evaluate_in_list(
870        &self,
871        expr: &SqlExpression,
872        values: &[SqlExpression],
873        row_index: usize,
874        _ignore_case: bool,
875    ) -> Result<bool> {
876        let column_name = self.extract_column_name(expr)?;
877        let col_index = self
878            .table
879            .get_column_index(&column_name)
880            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
881
882        let cell_value = self.table.get_value(row_index, col_index).cloned();
883
884        for value_expr in values {
885            let compare_value = self.extract_value(value_expr)?;
886            let matches = match (cell_value.as_ref(), &compare_value) {
887                (Some(DataValue::String(a)), ExprValue::String(b)) => {
888                    if self.case_insensitive {
889                        if row_index < 3 {
890                            debug!("RecursiveWhereEvaluator: IN list string comparison '{}' in '{}' (case_insensitive={})", a, b, self.case_insensitive);
891                        }
892                        a.to_lowercase() == b.to_lowercase()
893                    } else {
894                        a == b
895                    }
896                }
897                (Some(DataValue::InternedString(a)), ExprValue::String(b)) => {
898                    if self.case_insensitive {
899                        if row_index < 3 {
900                            debug!("RecursiveWhereEvaluator: IN list interned string comparison '{}' in '{}' (case_insensitive={})", a, b, self.case_insensitive);
901                        }
902                        a.to_lowercase() == b.to_lowercase()
903                    } else {
904                        a.as_ref() == b
905                    }
906                }
907                (Some(DataValue::Integer(a)), ExprValue::Number(b)) => *a as f64 == *b,
908                (Some(DataValue::Float(a)), ExprValue::Number(b)) => (*a - b).abs() < f64::EPSILON,
909                _ => false,
910            };
911
912            if matches {
913                return Ok(true);
914            }
915        }
916
917        Ok(false)
918    }
919
920    fn evaluate_between(
921        &self,
922        expr: &SqlExpression,
923        lower: &SqlExpression,
924        upper: &SqlExpression,
925        row_index: usize,
926    ) -> Result<bool> {
927        let column_name = self.extract_column_name(expr)?;
928        let col_index = self
929            .table
930            .get_column_index(&column_name)
931            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
932
933        let cell_value = self.table.get_value(row_index, col_index).cloned();
934        let lower_value = self.extract_value(lower)?;
935        let upper_value = self.extract_value(upper)?;
936
937        match (cell_value, &lower_value, &upper_value) {
938            (Some(DataValue::Integer(n)), ExprValue::Number(l), ExprValue::Number(u)) => {
939                Ok(n as f64 >= *l && n as f64 <= *u)
940            }
941            (Some(DataValue::Float(n)), ExprValue::Number(l), ExprValue::Number(u)) => {
942                Ok(n >= *l && n <= *u)
943            }
944            (Some(DataValue::String(ref s)), ExprValue::String(l), ExprValue::String(u)) => {
945                Ok(s >= l && s <= u)
946            }
947            (
948                Some(DataValue::InternedString(ref s)),
949                ExprValue::String(l),
950                ExprValue::String(u),
951            ) => Ok(s.as_ref() >= l && s.as_ref() <= u),
952            _ => Ok(false),
953        }
954    }
955
956    fn evaluate_method_call(
957        &self,
958        object: &str,
959        method: &str,
960        args: &[SqlExpression],
961        row_index: usize,
962    ) -> Result<bool> {
963        if row_index < 3 {
964            debug!(
965                "RecursiveWhereEvaluator: evaluate_method_call - object='{}', method='{}', row={}",
966                object, method, row_index
967            );
968        }
969
970        // Get column value
971        let col_index = self.table.get_column_index(object).ok_or_else(|| {
972            let suggestion = self.find_similar_column(object);
973            match suggestion {
974                Some(similar) => {
975                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
976                }
977                None => anyhow!("Column '{}' not found", object),
978            }
979        })?;
980
981        let cell_value = self.table.get_value(row_index, col_index).cloned();
982        if row_index < 3 {
983            debug!(
984                "RecursiveWhereEvaluator: Row {} column '{}' value = {:?}",
985                row_index, object, cell_value
986            );
987        }
988
989        match method.to_lowercase().as_str() {
990            "contains" => {
991                if args.len() != 1 {
992                    return Err(anyhow::anyhow!("Contains requires exactly 1 argument"));
993                }
994                let search_str = self.extract_string_value(&args[0])?;
995                // Pre-compute lowercase once instead of for every row
996                let search_lower = search_str.to_lowercase();
997
998                // Type coercion: convert numeric values to strings for string methods
999                match cell_value {
1000                    Some(DataValue::String(ref s)) => {
1001                        let result = s.to_lowercase().contains(&search_lower);
1002                        // Only log first few rows to avoid performance impact
1003                        if row_index < 3 {
1004                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on '{}' = {} (case-insensitive)", row_index, search_str, s, result);
1005                        }
1006                        Ok(result)
1007                    }
1008                    Some(DataValue::InternedString(ref s)) => {
1009                        let result = s.to_lowercase().contains(&search_lower);
1010                        // Only log first few rows to avoid performance impact
1011                        if row_index < 3 {
1012                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on interned '{}' = {} (case-insensitive)", row_index, search_str, s, result);
1013                        }
1014                        Ok(result)
1015                    }
1016                    Some(DataValue::Integer(n)) => {
1017                        let str_val = n.to_string();
1018                        let result = str_val.contains(&search_str);
1019                        if row_index < 3 {
1020                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on integer '{}' = {}", row_index, search_str, str_val, result);
1021                        }
1022                        Ok(result)
1023                    }
1024                    Some(DataValue::Float(f)) => {
1025                        let str_val = f.to_string();
1026                        let result = str_val.contains(&search_str);
1027                        if row_index < 3 {
1028                            debug!(
1029                                "RecursiveWhereEvaluator: Row {} contains('{}') on float '{}' = {}",
1030                                row_index, search_str, str_val, result
1031                            );
1032                        }
1033                        Ok(result)
1034                    }
1035                    Some(DataValue::Boolean(b)) => {
1036                        let str_val = b.to_string();
1037                        let result = str_val.contains(&search_str);
1038                        if row_index < 3 {
1039                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on boolean '{}' = {}", row_index, search_str, str_val, result);
1040                        }
1041                        Ok(result)
1042                    }
1043                    Some(DataValue::DateTime(dt)) => {
1044                        // DateTime columns can use string methods via coercion
1045                        let result = dt.contains(&search_str);
1046                        if row_index < 3 {
1047                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on datetime '{}' = {}", row_index, search_str, dt, result);
1048                        }
1049                        Ok(result)
1050                    }
1051                    _ => {
1052                        if row_index < 3 {
1053                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on null/empty value = false", row_index, search_str);
1054                        }
1055                        Ok(false)
1056                    }
1057                }
1058            }
1059            "startswith" => {
1060                if args.len() != 1 {
1061                    return Err(anyhow::anyhow!("StartsWith requires exactly 1 argument"));
1062                }
1063                let prefix = self.extract_string_value(&args[0])?;
1064
1065                // Type coercion: convert numeric values to strings for string methods
1066                match cell_value {
1067                    Some(DataValue::String(ref s)) => {
1068                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
1069                    }
1070                    Some(DataValue::InternedString(ref s)) => {
1071                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
1072                    }
1073                    Some(DataValue::Integer(n)) => Ok(n.to_string().starts_with(&prefix)),
1074                    Some(DataValue::Float(f)) => Ok(f.to_string().starts_with(&prefix)),
1075                    Some(DataValue::Boolean(b)) => Ok(b.to_string().starts_with(&prefix)),
1076                    Some(DataValue::DateTime(dt)) => Ok(dt.starts_with(&prefix)),
1077                    _ => Ok(false),
1078                }
1079            }
1080            "endswith" => {
1081                if args.len() != 1 {
1082                    return Err(anyhow::anyhow!("EndsWith requires exactly 1 argument"));
1083                }
1084                let suffix = self.extract_string_value(&args[0])?;
1085
1086                // Type coercion: convert numeric values to strings for string methods
1087                match cell_value {
1088                    Some(DataValue::String(ref s)) => {
1089                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
1090                    }
1091                    Some(DataValue::InternedString(ref s)) => {
1092                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
1093                    }
1094                    Some(DataValue::Integer(n)) => Ok(n.to_string().ends_with(&suffix)),
1095                    Some(DataValue::Float(f)) => Ok(f.to_string().ends_with(&suffix)),
1096                    Some(DataValue::Boolean(b)) => Ok(b.to_string().ends_with(&suffix)),
1097                    Some(DataValue::DateTime(dt)) => Ok(dt.ends_with(&suffix)),
1098                    _ => Ok(false),
1099                }
1100            }
1101            _ => Err(anyhow::anyhow!("Unsupported method: {}", method)),
1102        }
1103    }
1104
1105    fn extract_column_name(&self, expr: &SqlExpression) -> Result<String> {
1106        match expr {
1107            SqlExpression::Column(name) => Ok(name.clone()),
1108            _ => Err(anyhow::anyhow!("Expected column name, got: {:?}", expr)),
1109        }
1110    }
1111
1112    fn extract_string_value(&self, expr: &SqlExpression) -> Result<String> {
1113        match expr {
1114            SqlExpression::StringLiteral(s) => Ok(s.clone()),
1115            _ => Err(anyhow::anyhow!("Expected string literal, got: {:?}", expr)),
1116        }
1117    }
1118
1119    fn extract_value(&self, expr: &SqlExpression) -> Result<ExprValue> {
1120        match expr {
1121            SqlExpression::StringLiteral(s) => Ok(ExprValue::String(s.clone())),
1122            SqlExpression::NumberLiteral(n) => {
1123                if let Ok(num) = n.parse::<f64>() {
1124                    Ok(ExprValue::Number(num))
1125                } else {
1126                    Ok(ExprValue::String(n.clone()))
1127                }
1128            }
1129            SqlExpression::DateTimeConstructor {
1130                year,
1131                month,
1132                day,
1133                hour,
1134                minute,
1135                second,
1136            } => {
1137                // Create a DateTime from the constructor
1138                let naive_date = NaiveDate::from_ymd_opt(*year, *month, *day)
1139                    .ok_or_else(|| anyhow::anyhow!("Invalid date: {}-{}-{}", year, month, day))?;
1140                let naive_time = NaiveTime::from_hms_opt(
1141                    hour.unwrap_or(0),
1142                    minute.unwrap_or(0),
1143                    second.unwrap_or(0),
1144                )
1145                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
1146                let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
1147                let datetime = Utc.from_utc_datetime(&naive_datetime);
1148                Ok(ExprValue::DateTime(datetime))
1149            }
1150            SqlExpression::DateTimeToday {
1151                hour,
1152                minute,
1153                second,
1154            } => {
1155                // Get today's date with optional time
1156                let today = Local::now().date_naive();
1157                let time = NaiveTime::from_hms_opt(
1158                    hour.unwrap_or(0),
1159                    minute.unwrap_or(0),
1160                    second.unwrap_or(0),
1161                )
1162                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
1163                let naive_datetime = NaiveDateTime::new(today, time);
1164                let datetime = Utc.from_utc_datetime(&naive_datetime);
1165                Ok(ExprValue::DateTime(datetime))
1166            }
1167            _ => Ok(ExprValue::Null),
1168        }
1169    }
1170
1171    /// Evaluate a CASE expression as a boolean (for WHERE clauses)
1172    fn evaluate_case_expression_as_bool(
1173        &self,
1174        when_branches: &[crate::sql::recursive_parser::WhenBranch],
1175        else_branch: &Option<Box<SqlExpression>>,
1176        row_index: usize,
1177    ) -> Result<bool> {
1178        debug!(
1179            "RecursiveWhereEvaluator: evaluating CASE expression as bool for row {}",
1180            row_index
1181        );
1182
1183        // Evaluate each WHEN condition in order
1184        for branch in when_branches {
1185            // Evaluate the condition as a boolean
1186            let condition_result = self.evaluate_expression(&branch.condition, row_index)?;
1187
1188            if condition_result {
1189                debug!("CASE: WHEN condition matched, evaluating result expression as bool");
1190                // Evaluate the result and convert to boolean
1191                return self.evaluate_expression_as_bool(&branch.result, row_index);
1192            }
1193        }
1194
1195        // If no WHEN condition matched, evaluate ELSE clause (or return false)
1196        match else_branch {
1197            Some(else_expr) => {
1198                debug!("CASE: No WHEN matched, evaluating ELSE expression as bool");
1199                self.evaluate_expression_as_bool(else_expr, row_index)
1200            }
1201            None => {
1202                debug!("CASE: No WHEN matched and no ELSE, returning false");
1203                Ok(false)
1204            }
1205        }
1206    }
1207
1208    /// Helper method to evaluate any expression as a boolean
1209    fn evaluate_expression_as_bool(&self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
1210        match expr {
1211            // For expressions that naturally return booleans, use the existing evaluator
1212            SqlExpression::BinaryOp { .. }
1213            | SqlExpression::InList { .. }
1214            | SqlExpression::NotInList { .. }
1215            | SqlExpression::Between { .. }
1216            | SqlExpression::Not { .. }
1217            | SqlExpression::MethodCall { .. } => self.evaluate_expression(expr, row_index),
1218            // For CASE expressions, recurse
1219            SqlExpression::CaseExpression {
1220                when_branches,
1221                else_branch,
1222            } => self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index),
1223            // For other expressions (columns, literals), use ArithmeticEvaluator and convert
1224            _ => {
1225                // Use ArithmeticEvaluator to get the value, then convert to boolean
1226                let evaluator =
1227                    crate::data::arithmetic_evaluator::ArithmeticEvaluator::new(self.table);
1228                let value = evaluator.evaluate(expr, row_index)?;
1229
1230                match value {
1231                    crate::data::datatable::DataValue::Boolean(b) => Ok(b),
1232                    crate::data::datatable::DataValue::Integer(i) => Ok(i != 0),
1233                    crate::data::datatable::DataValue::Float(f) => Ok(f != 0.0),
1234                    crate::data::datatable::DataValue::Null => Ok(false),
1235                    crate::data::datatable::DataValue::String(s) => Ok(!s.is_empty()),
1236                    crate::data::datatable::DataValue::InternedString(s) => Ok(!s.is_empty()),
1237                    _ => Ok(true), // Other types are considered truthy
1238                }
1239            }
1240        }
1241    }
1242}
1243
1244enum ExprValue {
1245    String(String),
1246    Number(f64),
1247    DateTime(DateTime<Utc>),
1248    Null,
1249}