sql_cli/data/
recursive_where_evaluator.rs

1use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
2use crate::data::datatable::{DataTable, DataValue};
3use crate::data::value_comparisons::compare_with_op;
4use crate::sql::recursive_parser::{Condition, LogicalOp, SqlExpression, WhereClause};
5use anyhow::{anyhow, Result};
6use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
7use tracing::debug;
8
9/// Evaluates WHERE clauses from `recursive_parser` directly against `DataTable`
10pub struct RecursiveWhereEvaluator<'a> {
11    table: &'a DataTable,
12    case_insensitive: bool,
13}
14
15impl<'a> RecursiveWhereEvaluator<'a> {
16    #[must_use]
17    pub fn new(table: &'a DataTable) -> Self {
18        Self {
19            table,
20            case_insensitive: false,
21        }
22    }
23
24    /// Find a column name similar to the given name using edit distance
25    fn find_similar_column(&self, name: &str) -> Option<String> {
26        let columns = self.table.column_names();
27        let mut best_match: Option<(String, usize)> = None;
28
29        for col in columns {
30            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
31            // Only suggest if distance is small (likely a typo)
32            // Allow up to 3 edits for longer names
33            let max_distance = if name.len() > 10 { 3 } else { 2 };
34            if distance <= max_distance {
35                match &best_match {
36                    None => best_match = Some((col, distance)),
37                    Some((_, best_dist)) if distance < *best_dist => {
38                        best_match = Some((col, distance));
39                    }
40                    _ => {}
41                }
42            }
43        }
44
45        best_match.map(|(name, _)| name)
46    }
47
48    /// Calculate Levenshtein edit distance between two strings
49    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
50        let len1 = s1.len();
51        let len2 = s2.len();
52        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
53
54        for i in 0..=len1 {
55            matrix[i][0] = i;
56        }
57        for j in 0..=len2 {
58            matrix[0][j] = j;
59        }
60
61        for (i, c1) in s1.chars().enumerate() {
62            for (j, c2) in s2.chars().enumerate() {
63                let cost = usize::from(c1 != c2);
64                matrix[i + 1][j + 1] = std::cmp::min(
65                    matrix[i][j + 1] + 1, // deletion
66                    std::cmp::min(
67                        matrix[i + 1][j] + 1, // insertion
68                        matrix[i][j] + cost,  // substitution
69                    ),
70                );
71            }
72        }
73
74        matrix[len1][len2]
75    }
76
77    #[must_use]
78    pub fn with_case_insensitive(table: &'a DataTable, case_insensitive: bool) -> Self {
79        Self {
80            table,
81            case_insensitive,
82        }
83    }
84
85    #[must_use]
86    pub fn with_config(
87        table: &'a DataTable,
88        case_insensitive: bool,
89        _date_notation: String, // No longer needed since we use centralized parse_datetime
90    ) -> Self {
91        Self {
92            table,
93            case_insensitive,
94        }
95    }
96
97    /// Convert ExprValue to DataValue for centralized comparison
98    fn expr_value_to_data_value(&self, expr_value: &ExprValue) -> DataValue {
99        match expr_value {
100            ExprValue::String(s) => DataValue::String(s.clone()),
101            ExprValue::Number(n) => {
102                // Check if it's an integer or float
103                if n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
104                    DataValue::Integer(*n as i64)
105                } else {
106                    DataValue::Float(*n)
107                }
108            }
109            ExprValue::Boolean(b) => DataValue::Boolean(*b),
110            ExprValue::DateTime(dt) => {
111                DataValue::DateTime(dt.format("%Y-%m-%d %H:%M:%S%.3f").to_string())
112            }
113            ExprValue::Null => DataValue::Null,
114        }
115    }
116
117    /// Evaluate the `Length()` method on a column value
118    fn evaluate_length(
119        &self,
120        object: &str,
121        row_index: usize,
122    ) -> Result<(Option<DataValue>, String)> {
123        let col_index = self.table.get_column_index(object).ok_or_else(|| {
124            let suggestion = self.find_similar_column(object);
125            match suggestion {
126                Some(similar) => {
127                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
128                }
129                None => anyhow!("Column '{}' not found", object),
130            }
131        })?;
132
133        let value = self.table.get_value(row_index, col_index);
134        let length_value = match value {
135            Some(DataValue::String(s)) => Some(DataValue::Integer(s.len() as i64)),
136            Some(DataValue::InternedString(s)) => Some(DataValue::Integer(s.len() as i64)),
137            Some(DataValue::Integer(n)) => Some(DataValue::Integer(n.to_string().len() as i64)),
138            Some(DataValue::Float(f)) => Some(DataValue::Integer(f.to_string().len() as i64)),
139            _ => Some(DataValue::Integer(0)),
140        };
141        Ok((length_value, format!("{object}.Length()")))
142    }
143
144    /// Evaluate the `IndexOf()` method on a column value
145    fn evaluate_indexof(
146        &self,
147        object: &str,
148        search_str: &str,
149        row_index: usize,
150    ) -> Result<(Option<DataValue>, String)> {
151        let col_index = self.table.get_column_index(object).ok_or_else(|| {
152            let suggestion = self.find_similar_column(object);
153            match suggestion {
154                Some(similar) => {
155                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
156                }
157                None => anyhow!("Column '{}' not found", object),
158            }
159        })?;
160
161        let value = self.table.get_value(row_index, col_index);
162        let index_value = match value {
163            Some(DataValue::String(s)) => {
164                // Case-insensitive search by default, following Contains behavior
165                let pos = s
166                    .to_lowercase()
167                    .find(&search_str.to_lowercase())
168                    .map_or(-1, |idx| idx as i64);
169                Some(DataValue::Integer(pos))
170            }
171            Some(DataValue::InternedString(s)) => {
172                let pos = s
173                    .to_lowercase()
174                    .find(&search_str.to_lowercase())
175                    .map_or(-1, |idx| idx as i64);
176                Some(DataValue::Integer(pos))
177            }
178            Some(DataValue::Integer(n)) => {
179                let str_val = n.to_string();
180                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
181                Some(DataValue::Integer(pos))
182            }
183            Some(DataValue::Float(f)) => {
184                let str_val = f.to_string();
185                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
186                Some(DataValue::Integer(pos))
187            }
188            _ => Some(DataValue::Integer(-1)), // Return -1 for not found
189        };
190
191        if row_index < 3 {
192            debug!(
193                "RecursiveWhereEvaluator: Row {} IndexOf('{}') = {:?}",
194                row_index, search_str, index_value
195            );
196        }
197        Ok((index_value, format!("{object}.IndexOf('{search_str}')")))
198    }
199
200    /// Apply the appropriate trim operation based on `trim_type`
201    fn apply_trim<'b>(s: &'b str, trim_type: &str) -> &'b str {
202        match trim_type {
203            "trim" => s.trim(),
204            "trimstart" => s.trim_start(),
205            "trimend" => s.trim_end(),
206            _ => s,
207        }
208    }
209
210    /// Evaluate trim methods (Trim, `TrimStart`, `TrimEnd`) on a column value
211    fn evaluate_trim(
212        &self,
213        object: &str,
214        row_index: usize,
215        trim_type: &str,
216    ) -> Result<(Option<DataValue>, String)> {
217        let col_index = self.table.get_column_index(object).ok_or_else(|| {
218            let suggestion = self.find_similar_column(object);
219            match suggestion {
220                Some(similar) => {
221                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
222                }
223                None => anyhow!("Column '{}' not found", object),
224            }
225        })?;
226
227        let value = self.table.get_value(row_index, col_index);
228        let trimmed_value = match value {
229            Some(DataValue::String(s)) => Some(DataValue::String(
230                Self::apply_trim(s, trim_type).to_string(),
231            )),
232            Some(DataValue::InternedString(s)) => Some(DataValue::String(
233                Self::apply_trim(s, trim_type).to_string(),
234            )),
235            Some(DataValue::Integer(n)) => {
236                let str_val = n.to_string();
237                Some(DataValue::String(
238                    Self::apply_trim(&str_val, trim_type).to_string(),
239                ))
240            }
241            Some(DataValue::Float(f)) => {
242                let str_val = f.to_string();
243                Some(DataValue::String(
244                    Self::apply_trim(&str_val, trim_type).to_string(),
245                ))
246            }
247            _ => Some(DataValue::String(String::new())),
248        };
249
250        let method_name = match trim_type {
251            "trim" => "Trim",
252            "trimstart" => "TrimStart",
253            "trimend" => "TrimEnd",
254            _ => "Trim",
255        };
256        Ok((trimmed_value, format!("{object}.{method_name}()")))
257    }
258
259    /// Evaluate a WHERE clause for a specific row
260    pub fn evaluate(&self, where_clause: &WhereClause, row_index: usize) -> Result<bool> {
261        // Only log for first few rows to avoid performance impact
262        if row_index < 3 {
263            debug!(
264                "RecursiveWhereEvaluator: evaluate() ENTRY - row {}, {} conditions, case_insensitive={}",
265                row_index,
266                where_clause.conditions.len(),
267                self.case_insensitive
268            );
269        }
270
271        if where_clause.conditions.is_empty() {
272            if row_index < 3 {
273                debug!("RecursiveWhereEvaluator: evaluate() EXIT - no conditions, returning true");
274            }
275            return Ok(true);
276        }
277
278        // With the new expression tree structure, we should have a single condition
279        // containing the entire WHERE clause expression tree
280        if where_clause.conditions.len() == 1 {
281            // New structure: single expression tree
282            if row_index < 3 {
283                debug!(
284                    "RecursiveWhereEvaluator: evaluate() - evaluating expression tree for row {}",
285                    row_index
286                );
287            }
288            self.evaluate_condition(&where_clause.conditions[0], row_index)
289        } else {
290            // Legacy structure: multiple conditions with connectors
291            // This path is kept for backward compatibility
292            if row_index < 3 {
293                debug!(
294                    "RecursiveWhereEvaluator: evaluate() - evaluating {} conditions with connectors for row {}",
295                    where_clause.conditions.len(),
296                    row_index
297                );
298            }
299            let mut result = self.evaluate_condition(&where_clause.conditions[0], row_index)?;
300
301            // Apply connectors (AND/OR) with subsequent conditions
302            for i in 1..where_clause.conditions.len() {
303                let next_result =
304                    self.evaluate_condition(&where_clause.conditions[i], row_index)?;
305
306                // Use the connector from the previous condition
307                if let Some(connector) = &where_clause.conditions[i - 1].connector {
308                    result = match connector {
309                        LogicalOp::And => result && next_result,
310                        LogicalOp::Or => result || next_result,
311                    };
312                }
313            }
314
315            Ok(result)
316        }
317    }
318
319    fn evaluate_condition(&self, condition: &Condition, row_index: usize) -> Result<bool> {
320        // Only log first few rows to avoid performance impact
321        if row_index < 3 {
322            debug!(
323                "RecursiveWhereEvaluator: evaluate_condition() ENTRY - row {}",
324                row_index
325            );
326        }
327        let result = self.evaluate_expression(&condition.expr, row_index);
328        if row_index < 3 {
329            debug!(
330                "RecursiveWhereEvaluator: evaluate_condition() EXIT - row {}, result = {:?}",
331                row_index, result
332            );
333        }
334        result
335    }
336
337    fn evaluate_expression(&self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
338        // Only log first few rows to avoid performance impact
339        if row_index < 3 {
340            debug!(
341                "RecursiveWhereEvaluator: evaluate_expression() ENTRY - row {}, expr = {:?}",
342                row_index, expr
343            );
344        }
345
346        let result = match expr {
347            SqlExpression::BinaryOp { left, op, right } => {
348                self.evaluate_binary_op(left, op, right, row_index)
349            }
350            SqlExpression::InList { expr, values } => {
351                self.evaluate_in_list(expr, values, row_index, false)
352            }
353            SqlExpression::NotInList { expr, values } => {
354                let in_result = self.evaluate_in_list(expr, values, row_index, false)?;
355                Ok(!in_result)
356            }
357            SqlExpression::Between { expr, lower, upper } => {
358                self.evaluate_between(expr, lower, upper, row_index)
359            }
360            SqlExpression::Not { expr } => {
361                let inner_result = self.evaluate_expression(expr, row_index)?;
362                Ok(!inner_result)
363            }
364            SqlExpression::MethodCall {
365                object,
366                method,
367                args,
368            } => {
369                if row_index < 3 {
370                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found MethodCall, delegating to evaluate_method_call");
371                }
372                self.evaluate_method_call(object, method, args, row_index)
373            }
374            SqlExpression::CaseExpression {
375                when_branches,
376                else_branch,
377            } => {
378                if row_index < 3 {
379                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found CaseExpression, evaluating");
380                }
381                self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index)
382            }
383            _ => {
384                if row_index < 3 {
385                    debug!("RecursiveWhereEvaluator: evaluate_expression() - unsupported expression type, returning false");
386                }
387                Ok(false) // Default to false for unsupported expressions
388            }
389        };
390
391        if row_index < 3 {
392            debug!(
393                "RecursiveWhereEvaluator: evaluate_expression() EXIT - row {}, result = {:?}",
394                row_index, result
395            );
396        }
397        result
398    }
399
400    fn evaluate_binary_op(
401        &self,
402        left: &SqlExpression,
403        op: &str,
404        right: &SqlExpression,
405        row_index: usize,
406    ) -> Result<bool> {
407        // Only log first few rows to avoid performance impact
408        if row_index < 3 {
409            debug!(
410                "RecursiveWhereEvaluator: evaluate_binary_op() ENTRY - row {}, op = '{}'",
411                row_index, op
412            );
413        }
414
415        // Handle logical operators (AND, OR) specially
416        if op.to_uppercase() == "OR" || op.to_uppercase() == "AND" {
417            let left_result = self.evaluate_expression(left, row_index)?;
418            let right_result = self.evaluate_expression(right, row_index)?;
419
420            return Ok(match op.to_uppercase().as_str() {
421                "OR" => left_result || right_result,
422                "AND" => left_result && right_result,
423                _ => unreachable!(),
424            });
425        }
426
427        // For complex expressions (arithmetic, functions), use ArithmeticEvaluator
428        if matches!(left, SqlExpression::BinaryOp { .. })
429            || matches!(left, SqlExpression::FunctionCall { .. })
430            || matches!(right, SqlExpression::BinaryOp { .. })
431            || matches!(right, SqlExpression::FunctionCall { .. })
432        {
433            let comparison_expr = SqlExpression::BinaryOp {
434                left: Box::new(left.clone()),
435                op: op.to_string(),
436                right: Box::new(right.clone()),
437            };
438
439            let mut evaluator = ArithmeticEvaluator::new(self.table);
440            let result = evaluator.evaluate(&comparison_expr, row_index)?;
441
442            // Convert the result to boolean
443            return match result {
444                DataValue::Boolean(b) => Ok(b),
445                DataValue::Null => Ok(false),
446                _ => Err(anyhow!("Comparison did not return a boolean value")),
447            };
448        }
449
450        // For simple comparisons, use the original WHERE clause logic with improved date parsing
451        // Handle left side - could be a column or a method call
452        let (cell_value, column_name) = match left {
453            SqlExpression::MethodCall {
454                object,
455                method,
456                args,
457            } => {
458                // Handle method calls that return values (like Length(), IndexOf())
459                match method.to_lowercase().as_str() {
460                    "length" => {
461                        if !args.is_empty() {
462                            return Err(anyhow::anyhow!("Length() takes no arguments"));
463                        }
464                        self.evaluate_length(object, row_index)?
465                    }
466                    "indexof" => {
467                        if args.len() != 1 {
468                            return Err(anyhow::anyhow!("IndexOf() requires exactly 1 argument"));
469                        }
470                        let search_str = self.extract_string_value(&args[0])?;
471                        self.evaluate_indexof(object, &search_str, row_index)?
472                    }
473                    "trim" => {
474                        if !args.is_empty() {
475                            return Err(anyhow::anyhow!("Trim() takes no arguments"));
476                        }
477                        self.evaluate_trim(object, row_index, "trim")?
478                    }
479                    "trimstart" => {
480                        if !args.is_empty() {
481                            return Err(anyhow::anyhow!("TrimStart() takes no arguments"));
482                        }
483                        self.evaluate_trim(object, row_index, "trimstart")?
484                    }
485                    "trimend" => {
486                        if !args.is_empty() {
487                            return Err(anyhow::anyhow!("TrimEnd() takes no arguments"));
488                        }
489                        self.evaluate_trim(object, row_index, "trimend")?
490                    }
491                    _ => {
492                        return Err(anyhow::anyhow!(
493                            "Method '{}' cannot be used in comparisons",
494                            method
495                        ));
496                    }
497                }
498            }
499            _ => {
500                // Regular column reference
501                let column_name = self.extract_column_name(left)?;
502                if row_index < 3 {
503                    debug!(
504                        "RecursiveWhereEvaluator: evaluate_binary_op() - column_name = '{}'",
505                        column_name
506                    );
507                }
508
509                let col_index = self.table.get_column_index(&column_name).ok_or_else(|| {
510                    let suggestion = self.find_similar_column(&column_name);
511                    match suggestion {
512                        Some(similar) => anyhow!(
513                            "Column '{}' not found. Did you mean '{}'?",
514                            column_name,
515                            similar
516                        ),
517                        None => anyhow!("Column '{}' not found", column_name),
518                    }
519                })?;
520
521                let cell_value = self.table.get_value(row_index, col_index).cloned();
522                (cell_value, column_name)
523            }
524        };
525
526        if row_index < 3 {
527            debug!(
528                "RecursiveWhereEvaluator: evaluate_binary_op() - row {} column '{}' value = {:?}",
529                row_index, column_name, cell_value
530            );
531        }
532
533        // Get comparison value from right side
534        let compare_value = self.extract_value(right)?;
535
536        // Handle special operators that aren't standard comparisons
537        let op_upper = op.to_uppercase();
538        match op_upper.as_str() {
539            // LIKE operator - handle specially
540            "LIKE" => {
541                let table_value = cell_value.unwrap_or(DataValue::Null);
542                let pattern = match compare_value {
543                    ExprValue::String(s) => s,
544                    _ => return Ok(false),
545                };
546
547                let text = match &table_value {
548                    DataValue::String(s) => s.as_str(),
549                    DataValue::InternedString(s) => s.as_str(),
550                    _ => return Ok(false),
551                };
552
553                let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
554                let regex = regex::RegexBuilder::new(&format!("^{regex_pattern}$"))
555                    .case_insensitive(true)
556                    .build()
557                    .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
558                Ok(regex.is_match(text))
559            }
560
561            // IS NULL / IS NOT NULL
562            "IS NULL" => Ok(cell_value.is_none() || matches!(cell_value, Some(DataValue::Null))),
563            "IS NOT NULL" => {
564                Ok(cell_value.is_some() && !matches!(cell_value, Some(DataValue::Null)))
565            }
566
567            // Handle IS / IS NOT with NULL explicitly
568            "IS" if matches!(compare_value, ExprValue::Null) => {
569                Ok(cell_value.is_none() || matches!(cell_value, Some(DataValue::Null)))
570            }
571            "IS NOT" if matches!(compare_value, ExprValue::Null) => {
572                Ok(cell_value.is_some() && !matches!(cell_value, Some(DataValue::Null)))
573            }
574
575            // Standard comparison operators - use centralized logic
576            _ => {
577                let table_value = cell_value.unwrap_or(DataValue::Null);
578                let comparison_value = self.expr_value_to_data_value(&compare_value);
579
580                if row_index < 3 {
581                    debug!(
582                        "RecursiveWhereEvaluator: Using centralized comparison - table: {:?}, op: '{}', comparison: {:?}, case_insensitive: {}",
583                        table_value, op, comparison_value, self.case_insensitive
584                    );
585                }
586
587                Ok(compare_with_op(
588                    &table_value,
589                    &comparison_value,
590                    op,
591                    self.case_insensitive,
592                ))
593            }
594        }
595    }
596
597    fn evaluate_in_list(
598        &self,
599        expr: &SqlExpression,
600        values: &[SqlExpression],
601        row_index: usize,
602        _ignore_case: bool,
603    ) -> Result<bool> {
604        let column_name = self.extract_column_name(expr)?;
605        let col_index = self
606            .table
607            .get_column_index(&column_name)
608            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
609
610        let cell_value = self.table.get_value(row_index, col_index).cloned();
611
612        for value_expr in values {
613            let compare_value = self.extract_value(value_expr)?;
614            let table_value = cell_value.as_ref().unwrap_or(&DataValue::Null);
615            let comparison_value = self.expr_value_to_data_value(&compare_value);
616
617            // Use centralized comparison for equality
618            if compare_with_op(table_value, &comparison_value, "=", self.case_insensitive) {
619                return Ok(true);
620            }
621        }
622
623        Ok(false)
624    }
625
626    fn evaluate_between(
627        &self,
628        expr: &SqlExpression,
629        lower: &SqlExpression,
630        upper: &SqlExpression,
631        row_index: usize,
632    ) -> Result<bool> {
633        let column_name = self.extract_column_name(expr)?;
634        let col_index = self
635            .table
636            .get_column_index(&column_name)
637            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
638
639        let cell_value = self.table.get_value(row_index, col_index).cloned();
640        let lower_value = self.extract_value(lower)?;
641        let upper_value = self.extract_value(upper)?;
642
643        let table_value = cell_value.unwrap_or(DataValue::Null);
644        let lower_data_value = self.expr_value_to_data_value(&lower_value);
645        let upper_data_value = self.expr_value_to_data_value(&upper_value);
646
647        // Use centralized comparison for BETWEEN: value >= lower AND value <= upper
648        let ge_lower =
649            compare_with_op(&table_value, &lower_data_value, ">=", self.case_insensitive);
650        let le_upper =
651            compare_with_op(&table_value, &upper_data_value, "<=", self.case_insensitive);
652
653        Ok(ge_lower && le_upper)
654    }
655
656    fn evaluate_method_call(
657        &self,
658        object: &str,
659        method: &str,
660        args: &[SqlExpression],
661        row_index: usize,
662    ) -> Result<bool> {
663        if row_index < 3 {
664            debug!(
665                "RecursiveWhereEvaluator: evaluate_method_call - object='{}', method='{}', row={}",
666                object, method, row_index
667            );
668        }
669
670        // Get column value
671        let col_index = self.table.get_column_index(object).ok_or_else(|| {
672            let suggestion = self.find_similar_column(object);
673            match suggestion {
674                Some(similar) => {
675                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
676                }
677                None => anyhow!("Column '{}' not found", object),
678            }
679        })?;
680
681        let cell_value = self.table.get_value(row_index, col_index).cloned();
682        if row_index < 3 {
683            debug!(
684                "RecursiveWhereEvaluator: Row {} column '{}' value = {:?}",
685                row_index, object, cell_value
686            );
687        }
688
689        match method.to_lowercase().as_str() {
690            "contains" => {
691                if args.len() != 1 {
692                    return Err(anyhow::anyhow!("Contains requires exactly 1 argument"));
693                }
694                let search_str = self.extract_string_value(&args[0])?;
695                // Pre-compute lowercase once instead of for every row
696                let search_lower = search_str.to_lowercase();
697
698                // Type coercion: convert numeric values to strings for string methods
699                match cell_value {
700                    Some(DataValue::String(ref s)) => {
701                        let result = s.to_lowercase().contains(&search_lower);
702                        // Only log first few rows to avoid performance impact
703                        if row_index < 3 {
704                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on '{}' = {} (case-insensitive)", row_index, search_str, s, result);
705                        }
706                        Ok(result)
707                    }
708                    Some(DataValue::InternedString(ref s)) => {
709                        let result = s.to_lowercase().contains(&search_lower);
710                        // Only log first few rows to avoid performance impact
711                        if row_index < 3 {
712                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on interned '{}' = {} (case-insensitive)", row_index, search_str, s, result);
713                        }
714                        Ok(result)
715                    }
716                    Some(DataValue::Integer(n)) => {
717                        let str_val = n.to_string();
718                        let result = str_val.contains(&search_str);
719                        if row_index < 3 {
720                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on integer '{}' = {}", row_index, search_str, str_val, result);
721                        }
722                        Ok(result)
723                    }
724                    Some(DataValue::Float(f)) => {
725                        let str_val = f.to_string();
726                        let result = str_val.contains(&search_str);
727                        if row_index < 3 {
728                            debug!(
729                                "RecursiveWhereEvaluator: Row {} contains('{}') on float '{}' = {}",
730                                row_index, search_str, str_val, result
731                            );
732                        }
733                        Ok(result)
734                    }
735                    Some(DataValue::Boolean(b)) => {
736                        let str_val = b.to_string();
737                        let result = str_val.contains(&search_str);
738                        if row_index < 3 {
739                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on boolean '{}' = {}", row_index, search_str, str_val, result);
740                        }
741                        Ok(result)
742                    }
743                    Some(DataValue::DateTime(dt)) => {
744                        // DateTime columns can use string methods via coercion
745                        let result = dt.contains(&search_str);
746                        if row_index < 3 {
747                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on datetime '{}' = {}", row_index, search_str, dt, result);
748                        }
749                        Ok(result)
750                    }
751                    _ => {
752                        if row_index < 3 {
753                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on null/empty value = false", row_index, search_str);
754                        }
755                        Ok(false)
756                    }
757                }
758            }
759            "startswith" => {
760                if args.len() != 1 {
761                    return Err(anyhow::anyhow!("StartsWith requires exactly 1 argument"));
762                }
763                let prefix = self.extract_string_value(&args[0])?;
764
765                // Type coercion: convert numeric values to strings for string methods
766                match cell_value {
767                    Some(DataValue::String(ref s)) => {
768                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
769                    }
770                    Some(DataValue::InternedString(ref s)) => {
771                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
772                    }
773                    Some(DataValue::Integer(n)) => Ok(n.to_string().starts_with(&prefix)),
774                    Some(DataValue::Float(f)) => Ok(f.to_string().starts_with(&prefix)),
775                    Some(DataValue::Boolean(b)) => Ok(b.to_string().starts_with(&prefix)),
776                    Some(DataValue::DateTime(dt)) => Ok(dt.starts_with(&prefix)),
777                    _ => Ok(false),
778                }
779            }
780            "endswith" => {
781                if args.len() != 1 {
782                    return Err(anyhow::anyhow!("EndsWith requires exactly 1 argument"));
783                }
784                let suffix = self.extract_string_value(&args[0])?;
785
786                // Type coercion: convert numeric values to strings for string methods
787                match cell_value {
788                    Some(DataValue::String(ref s)) => {
789                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
790                    }
791                    Some(DataValue::InternedString(ref s)) => {
792                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
793                    }
794                    Some(DataValue::Integer(n)) => Ok(n.to_string().ends_with(&suffix)),
795                    Some(DataValue::Float(f)) => Ok(f.to_string().ends_with(&suffix)),
796                    Some(DataValue::Boolean(b)) => Ok(b.to_string().ends_with(&suffix)),
797                    Some(DataValue::DateTime(dt)) => Ok(dt.ends_with(&suffix)),
798                    _ => Ok(false),
799                }
800            }
801            _ => Err(anyhow::anyhow!("Unsupported method: {}", method)),
802        }
803    }
804
805    fn extract_column_name(&self, expr: &SqlExpression) -> Result<String> {
806        match expr {
807            SqlExpression::Column(name) => Ok(name.clone()),
808            _ => Err(anyhow::anyhow!("Expected column name, got: {:?}", expr)),
809        }
810    }
811
812    fn extract_string_value(&self, expr: &SqlExpression) -> Result<String> {
813        match expr {
814            SqlExpression::StringLiteral(s) => Ok(s.clone()),
815            _ => Err(anyhow::anyhow!("Expected string literal, got: {:?}", expr)),
816        }
817    }
818
819    fn extract_value(&self, expr: &SqlExpression) -> Result<ExprValue> {
820        match expr {
821            SqlExpression::StringLiteral(s) => Ok(ExprValue::String(s.clone())),
822            SqlExpression::BooleanLiteral(b) => Ok(ExprValue::Boolean(*b)),
823            SqlExpression::NumberLiteral(n) => {
824                if let Ok(num) = n.parse::<f64>() {
825                    Ok(ExprValue::Number(num))
826                } else {
827                    Ok(ExprValue::String(n.clone()))
828                }
829            }
830            SqlExpression::DateTimeConstructor {
831                year,
832                month,
833                day,
834                hour,
835                minute,
836                second,
837            } => {
838                // Create a DateTime from the constructor
839                let naive_date = NaiveDate::from_ymd_opt(*year, *month, *day)
840                    .ok_or_else(|| anyhow::anyhow!("Invalid date: {}-{}-{}", year, month, day))?;
841                let naive_time = NaiveTime::from_hms_opt(
842                    hour.unwrap_or(0),
843                    minute.unwrap_or(0),
844                    second.unwrap_or(0),
845                )
846                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
847                let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
848                let datetime = Utc.from_utc_datetime(&naive_datetime);
849                Ok(ExprValue::DateTime(datetime))
850            }
851            SqlExpression::DateTimeToday {
852                hour,
853                minute,
854                second,
855            } => {
856                // Get today's date with optional time
857                let today = Local::now().date_naive();
858                let time = NaiveTime::from_hms_opt(
859                    hour.unwrap_or(0),
860                    minute.unwrap_or(0),
861                    second.unwrap_or(0),
862                )
863                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
864                let naive_datetime = NaiveDateTime::new(today, time);
865                let datetime = Utc.from_utc_datetime(&naive_datetime);
866                Ok(ExprValue::DateTime(datetime))
867            }
868            _ => Ok(ExprValue::Null),
869        }
870    }
871
872    /// Evaluate a CASE expression as a boolean (for WHERE clauses)
873    fn evaluate_case_expression_as_bool(
874        &self,
875        when_branches: &[crate::sql::recursive_parser::WhenBranch],
876        else_branch: &Option<Box<SqlExpression>>,
877        row_index: usize,
878    ) -> Result<bool> {
879        debug!(
880            "RecursiveWhereEvaluator: evaluating CASE expression as bool for row {}",
881            row_index
882        );
883
884        // Evaluate each WHEN condition in order
885        for branch in when_branches {
886            // Evaluate the condition as a boolean
887            let condition_result = self.evaluate_expression(&branch.condition, row_index)?;
888
889            if condition_result {
890                debug!("CASE: WHEN condition matched, evaluating result expression as bool");
891                // Evaluate the result and convert to boolean
892                return self.evaluate_expression_as_bool(&branch.result, row_index);
893            }
894        }
895
896        // If no WHEN condition matched, evaluate ELSE clause (or return false)
897        if let Some(else_expr) = else_branch {
898            debug!("CASE: No WHEN matched, evaluating ELSE expression as bool");
899            self.evaluate_expression_as_bool(else_expr, row_index)
900        } else {
901            debug!("CASE: No WHEN matched and no ELSE, returning false");
902            Ok(false)
903        }
904    }
905
906    /// Helper method to evaluate any expression as a boolean
907    fn evaluate_expression_as_bool(&self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
908        match expr {
909            // For expressions that naturally return booleans, use the existing evaluator
910            SqlExpression::BinaryOp { .. }
911            | SqlExpression::InList { .. }
912            | SqlExpression::NotInList { .. }
913            | SqlExpression::Between { .. }
914            | SqlExpression::Not { .. }
915            | SqlExpression::MethodCall { .. } => self.evaluate_expression(expr, row_index),
916            // For CASE expressions, recurse
917            SqlExpression::CaseExpression {
918                when_branches,
919                else_branch,
920            } => self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index),
921            // For other expressions (columns, literals), use ArithmeticEvaluator and convert
922            _ => {
923                // Use ArithmeticEvaluator to get the value, then convert to boolean
924                let mut evaluator =
925                    crate::data::arithmetic_evaluator::ArithmeticEvaluator::new(self.table);
926                let value = evaluator.evaluate(expr, row_index)?;
927
928                match value {
929                    crate::data::datatable::DataValue::Boolean(b) => Ok(b),
930                    crate::data::datatable::DataValue::Integer(i) => Ok(i != 0),
931                    crate::data::datatable::DataValue::Float(f) => Ok(f != 0.0),
932                    crate::data::datatable::DataValue::Null => Ok(false),
933                    crate::data::datatable::DataValue::String(s) => Ok(!s.is_empty()),
934                    crate::data::datatable::DataValue::InternedString(s) => Ok(!s.is_empty()),
935                    _ => Ok(true), // Other types are considered truthy
936                }
937            }
938        }
939    }
940}
941
942enum ExprValue {
943    String(String),
944    Number(f64),
945    Boolean(bool),
946    DateTime(DateTime<Utc>),
947    Null,
948}