1use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
2use crate::data::datatable::{DataTable, DataValue};
3use crate::sql::recursive_parser::{Condition, LogicalOp, SqlExpression, WhereClause};
4use anyhow::{anyhow, Result};
5use chrono::{DateTime, Local, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
6use tracing::debug;
7
8pub struct RecursiveWhereEvaluator<'a> {
10    table: &'a DataTable,
11    case_insensitive: bool,
12    date_notation: String,
13}
14
15impl<'a> RecursiveWhereEvaluator<'a> {
16    #[must_use]
17    pub fn new(table: &'a DataTable) -> Self {
18        Self {
19            table,
20            case_insensitive: false,
21            date_notation: "us".to_string(),
22        }
23    }
24
25    #[must_use]
26    pub fn with_date_notation(table: &'a DataTable, date_notation: String) -> Self {
27        Self {
28            table,
29            case_insensitive: false,
30            date_notation,
31        }
32    }
33
34    fn parse_datetime_with_notation(&self, s: &str) -> Result<DateTime<Utc>> {
36        if let Ok(parsed_dt) = s.parse::<DateTime<Utc>>() {
38            return Ok(parsed_dt);
39        }
40
41        if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
43            return Ok(Utc.from_utc_datetime(&dt));
44        }
45        if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
46            return Ok(Utc.from_utc_datetime(&dt));
47        }
48        if let Ok(dt) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
49            return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
50        }
51
52        if self.date_notation == "european" {
54            if let Ok(dt) = NaiveDate::parse_from_str(s, "%d/%m/%Y") {
57                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
58            }
59            if let Ok(dt) = NaiveDate::parse_from_str(s, "%d-%m-%Y") {
60                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
61            }
62            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%d/%m/%Y %H:%M:%S") {
64                return Ok(Utc.from_utc_datetime(&dt));
65            }
66            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%d-%m-%Y %H:%M:%S") {
67                return Ok(Utc.from_utc_datetime(&dt));
68            }
69
70            if let Ok(dt) = NaiveDate::parse_from_str(s, "%m/%d/%Y") {
73                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
74            }
75            if let Ok(dt) = NaiveDate::parse_from_str(s, "%m-%d-%Y") {
76                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
77            }
78            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%m/%d/%Y %H:%M:%S") {
80                return Ok(Utc.from_utc_datetime(&dt));
81            }
82            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%m-%d-%Y %H:%M:%S") {
83                return Ok(Utc.from_utc_datetime(&dt));
84            }
85        } else {
86            if let Ok(dt) = NaiveDate::parse_from_str(s, "%m/%d/%Y") {
89                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
90            }
91            if let Ok(dt) = NaiveDate::parse_from_str(s, "%m-%d-%Y") {
92                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
93            }
94            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%m/%d/%Y %H:%M:%S") {
96                return Ok(Utc.from_utc_datetime(&dt));
97            }
98            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%m-%d-%Y %H:%M:%S") {
99                return Ok(Utc.from_utc_datetime(&dt));
100            }
101
102            if let Ok(dt) = NaiveDate::parse_from_str(s, "%d/%m/%Y") {
105                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
106            }
107            if let Ok(dt) = NaiveDate::parse_from_str(s, "%d-%m-%Y") {
108                return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
109            }
110            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%d/%m/%Y %H:%M:%S") {
112                return Ok(Utc.from_utc_datetime(&dt));
113            }
114            if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%d-%m-%Y %H:%M:%S") {
115                return Ok(Utc.from_utc_datetime(&dt));
116            }
117        }
118
119        if let Ok(dt) = NaiveDate::parse_from_str(s, "%d-%b-%Y") {
121            return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
122        }
123
124        if let Ok(dt) = NaiveDate::parse_from_str(s, "%B %d, %Y") {
126            return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
127        }
128        if let Ok(dt) = NaiveDate::parse_from_str(s, "%d %B %Y") {
129            return Ok(Utc.from_utc_datetime(&dt.and_hms_opt(0, 0, 0).unwrap()));
130        }
131
132        if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%Y/%m/%d %H:%M:%S") {
134            return Ok(Utc.from_utc_datetime(&dt));
135        }
136
137        Err(anyhow!(
138            "Could not parse date/datetime: '{}'. Supported formats include: YYYY-MM-DD, MM/DD/YYYY, DD/MM/YYYY (based on config), with optional HH:MM:SS",
139            s
140        ))
141    }
142
143    fn find_similar_column(&self, name: &str) -> Option<String> {
145        let columns = self.table.column_names();
146        let mut best_match: Option<(String, usize)> = None;
147
148        for col in columns {
149            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
150            let max_distance = if name.len() > 10 { 3 } else { 2 };
153            if distance <= max_distance {
154                match &best_match {
155                    None => best_match = Some((col, distance)),
156                    Some((_, best_dist)) if distance < *best_dist => {
157                        best_match = Some((col, distance));
158                    }
159                    _ => {}
160                }
161            }
162        }
163
164        best_match.map(|(name, _)| name)
165    }
166
167    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
169        let len1 = s1.len();
170        let len2 = s2.len();
171        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
172
173        for i in 0..=len1 {
174            matrix[i][0] = i;
175        }
176        for j in 0..=len2 {
177            matrix[0][j] = j;
178        }
179
180        for (i, c1) in s1.chars().enumerate() {
181            for (j, c2) in s2.chars().enumerate() {
182                let cost = usize::from(c1 != c2);
183                matrix[i + 1][j + 1] = std::cmp::min(
184                    matrix[i][j + 1] + 1, std::cmp::min(
186                        matrix[i + 1][j] + 1, matrix[i][j] + cost,  ),
189                );
190            }
191        }
192
193        matrix[len1][len2]
194    }
195
196    #[must_use]
197    pub fn with_case_insensitive(table: &'a DataTable, case_insensitive: bool) -> Self {
198        Self {
199            table,
200            case_insensitive,
201            date_notation: "us".to_string(),
202        }
203    }
204
205    #[must_use]
206    pub fn with_config(
207        table: &'a DataTable,
208        case_insensitive: bool,
209        date_notation: String,
210    ) -> Self {
211        Self {
212            table,
213            case_insensitive,
214            date_notation,
215        }
216    }
217
218    fn compare_datetime(op: &str, left: &DateTime<Utc>, right: &DateTime<Utc>) -> bool {
220        match op {
221            "=" => left == right,
222            "!=" | "<>" => left != right,
223            ">" => left > right,
224            ">=" => left >= right,
225            "<" => left < right,
226            "<=" => left <= right,
227            _ => false,
228        }
229    }
230
231    fn compare_data_values(&self, left: &DataValue, op: &str, right: &DataValue) -> Result<bool> {
233        use crate::data::datavalue_compare::compare_datavalues;
234        use std::cmp::Ordering;
235
236        let ordering = compare_datavalues(left, right);
237        Ok(match op {
238            "=" => ordering == Ordering::Equal,
239            "!=" | "<>" => ordering != Ordering::Equal,
240            ">" => ordering == Ordering::Greater,
241            ">=" => ordering != Ordering::Less,
242            "<" => ordering == Ordering::Less,
243            "<=" => ordering != Ordering::Greater,
244            _ => return Err(anyhow!("Unsupported operator: {}", op)),
245        })
246    }
247
248    fn evaluate_length(
250        &self,
251        object: &str,
252        row_index: usize,
253    ) -> Result<(Option<DataValue>, String)> {
254        let col_index = self.table.get_column_index(object).ok_or_else(|| {
255            let suggestion = self.find_similar_column(object);
256            match suggestion {
257                Some(similar) => {
258                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
259                }
260                None => anyhow!("Column '{}' not found", object),
261            }
262        })?;
263
264        let value = self.table.get_value(row_index, col_index);
265        let length_value = match value {
266            Some(DataValue::String(s)) => Some(DataValue::Integer(s.len() as i64)),
267            Some(DataValue::InternedString(s)) => Some(DataValue::Integer(s.len() as i64)),
268            Some(DataValue::Integer(n)) => Some(DataValue::Integer(n.to_string().len() as i64)),
269            Some(DataValue::Float(f)) => Some(DataValue::Integer(f.to_string().len() as i64)),
270            _ => Some(DataValue::Integer(0)),
271        };
272        Ok((length_value, format!("{object}.Length()")))
273    }
274
275    fn evaluate_indexof(
277        &self,
278        object: &str,
279        search_str: &str,
280        row_index: usize,
281    ) -> Result<(Option<DataValue>, String)> {
282        let col_index = self.table.get_column_index(object).ok_or_else(|| {
283            let suggestion = self.find_similar_column(object);
284            match suggestion {
285                Some(similar) => {
286                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
287                }
288                None => anyhow!("Column '{}' not found", object),
289            }
290        })?;
291
292        let value = self.table.get_value(row_index, col_index);
293        let index_value = match value {
294            Some(DataValue::String(s)) => {
295                let pos = s
297                    .to_lowercase()
298                    .find(&search_str.to_lowercase())
299                    .map_or(-1, |idx| idx as i64);
300                Some(DataValue::Integer(pos))
301            }
302            Some(DataValue::InternedString(s)) => {
303                let pos = s
304                    .to_lowercase()
305                    .find(&search_str.to_lowercase())
306                    .map_or(-1, |idx| idx as i64);
307                Some(DataValue::Integer(pos))
308            }
309            Some(DataValue::Integer(n)) => {
310                let str_val = n.to_string();
311                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
312                Some(DataValue::Integer(pos))
313            }
314            Some(DataValue::Float(f)) => {
315                let str_val = f.to_string();
316                let pos = str_val.find(search_str).map_or(-1, |idx| idx as i64);
317                Some(DataValue::Integer(pos))
318            }
319            _ => Some(DataValue::Integer(-1)), };
321
322        if row_index < 3 {
323            debug!(
324                "RecursiveWhereEvaluator: Row {} IndexOf('{}') = {:?}",
325                row_index, search_str, index_value
326            );
327        }
328        Ok((index_value, format!("{object}.IndexOf('{search_str}')")))
329    }
330
331    fn apply_trim<'b>(s: &'b str, trim_type: &str) -> &'b str {
333        match trim_type {
334            "trim" => s.trim(),
335            "trimstart" => s.trim_start(),
336            "trimend" => s.trim_end(),
337            _ => s,
338        }
339    }
340
341    fn evaluate_trim(
343        &self,
344        object: &str,
345        row_index: usize,
346        trim_type: &str,
347    ) -> Result<(Option<DataValue>, String)> {
348        let col_index = self.table.get_column_index(object).ok_or_else(|| {
349            let suggestion = self.find_similar_column(object);
350            match suggestion {
351                Some(similar) => {
352                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
353                }
354                None => anyhow!("Column '{}' not found", object),
355            }
356        })?;
357
358        let value = self.table.get_value(row_index, col_index);
359        let trimmed_value = match value {
360            Some(DataValue::String(s)) => Some(DataValue::String(
361                Self::apply_trim(s, trim_type).to_string(),
362            )),
363            Some(DataValue::InternedString(s)) => Some(DataValue::String(
364                Self::apply_trim(s, trim_type).to_string(),
365            )),
366            Some(DataValue::Integer(n)) => {
367                let str_val = n.to_string();
368                Some(DataValue::String(
369                    Self::apply_trim(&str_val, trim_type).to_string(),
370                ))
371            }
372            Some(DataValue::Float(f)) => {
373                let str_val = f.to_string();
374                Some(DataValue::String(
375                    Self::apply_trim(&str_val, trim_type).to_string(),
376                ))
377            }
378            _ => Some(DataValue::String(String::new())),
379        };
380
381        let method_name = match trim_type {
382            "trim" => "Trim",
383            "trimstart" => "TrimStart",
384            "trimend" => "TrimEnd",
385            _ => "Trim",
386        };
387        Ok((trimmed_value, format!("{object}.{method_name}()")))
388    }
389
390    pub fn evaluate(&self, where_clause: &WhereClause, row_index: usize) -> Result<bool> {
392        if row_index < 3 {
394            debug!(
395                "RecursiveWhereEvaluator: evaluate() ENTRY - row {}, {} conditions, case_insensitive={}",
396                row_index,
397                where_clause.conditions.len(),
398                self.case_insensitive
399            );
400        }
401
402        if where_clause.conditions.is_empty() {
403            if row_index < 3 {
404                debug!("RecursiveWhereEvaluator: evaluate() EXIT - no conditions, returning true");
405            }
406            return Ok(true);
407        }
408
409        if row_index < 3 {
411            debug!(
412                "RecursiveWhereEvaluator: evaluate() - evaluating first condition for row {}",
413                row_index
414            );
415        }
416        let mut result = self.evaluate_condition(&where_clause.conditions[0], row_index)?;
417
418        for i in 1..where_clause.conditions.len() {
420            let next_result = self.evaluate_condition(&where_clause.conditions[i], row_index)?;
421
422            if let Some(connector) = &where_clause.conditions[i - 1].connector {
424                result = match connector {
425                    LogicalOp::And => result && next_result,
426                    LogicalOp::Or => result || next_result,
427                };
428            }
429        }
430
431        Ok(result)
432    }
433
434    fn evaluate_condition(&self, condition: &Condition, row_index: usize) -> Result<bool> {
435        if row_index < 3 {
437            debug!(
438                "RecursiveWhereEvaluator: evaluate_condition() ENTRY - row {}",
439                row_index
440            );
441        }
442        let result = self.evaluate_expression(&condition.expr, row_index);
443        if row_index < 3 {
444            debug!(
445                "RecursiveWhereEvaluator: evaluate_condition() EXIT - row {}, result = {:?}",
446                row_index, result
447            );
448        }
449        result
450    }
451
452    fn evaluate_expression(&self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
453        if row_index < 3 {
455            debug!(
456                "RecursiveWhereEvaluator: evaluate_expression() ENTRY - row {}, expr = {:?}",
457                row_index, expr
458            );
459        }
460
461        let result = match expr {
462            SqlExpression::BinaryOp { left, op, right } => {
463                self.evaluate_binary_op(left, op, right, row_index)
464            }
465            SqlExpression::InList { expr, values } => {
466                self.evaluate_in_list(expr, values, row_index, false)
467            }
468            SqlExpression::NotInList { expr, values } => {
469                let in_result = self.evaluate_in_list(expr, values, row_index, false)?;
470                Ok(!in_result)
471            }
472            SqlExpression::Between { expr, lower, upper } => {
473                self.evaluate_between(expr, lower, upper, row_index)
474            }
475            SqlExpression::Not { expr } => {
476                let inner_result = self.evaluate_expression(expr, row_index)?;
477                Ok(!inner_result)
478            }
479            SqlExpression::MethodCall {
480                object,
481                method,
482                args,
483            } => {
484                if row_index < 3 {
485                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found MethodCall, delegating to evaluate_method_call");
486                }
487                self.evaluate_method_call(object, method, args, row_index)
488            }
489            SqlExpression::CaseExpression {
490                when_branches,
491                else_branch,
492            } => {
493                if row_index < 3 {
494                    debug!("RecursiveWhereEvaluator: evaluate_expression() - found CaseExpression, evaluating");
495                }
496                self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index)
497            }
498            _ => {
499                if row_index < 3 {
500                    debug!("RecursiveWhereEvaluator: evaluate_expression() - unsupported expression type, returning false");
501                }
502                Ok(false) }
504        };
505
506        if row_index < 3 {
507            debug!(
508                "RecursiveWhereEvaluator: evaluate_expression() EXIT - row {}, result = {:?}",
509                row_index, result
510            );
511        }
512        result
513    }
514
515    fn evaluate_binary_op(
516        &self,
517        left: &SqlExpression,
518        op: &str,
519        right: &SqlExpression,
520        row_index: usize,
521    ) -> Result<bool> {
522        use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
523
524        if row_index < 3 {
526            debug!(
527                "RecursiveWhereEvaluator: evaluate_binary_op() ENTRY - row {}, op = '{}'",
528                row_index, op
529            );
530        }
531
532        if op.to_uppercase() == "OR" || op.to_uppercase() == "AND" {
534            let left_result = self.evaluate_expression(left, row_index)?;
535            let right_result = self.evaluate_expression(right, row_index)?;
536
537            return Ok(match op.to_uppercase().as_str() {
538                "OR" => left_result || right_result,
539                "AND" => left_result && right_result,
540                _ => unreachable!(),
541            });
542        }
543
544        if matches!(left, SqlExpression::BinaryOp { .. }) {
546            let mut evaluator =
548                ArithmeticEvaluator::with_date_notation(self.table, self.date_notation.clone());
549            let left_result = evaluator.evaluate(left, row_index)?;
550
551            let right_expr_value = self.extract_value(right)?;
553            let right_data_value = match right_expr_value {
554                ExprValue::String(s) => DataValue::String(s),
555                ExprValue::Number(n) => {
556                    if n.fract() == 0.0 && n >= i64::MIN as f64 && n <= i64::MAX as f64 {
558                        DataValue::Integer(n as i64)
559                    } else {
560                        DataValue::Float(n)
561                    }
562                }
563                ExprValue::Boolean(b) => DataValue::Boolean(b),
564                ExprValue::DateTime(_) => {
565                    return Err(anyhow!("DateTime comparison not supported in this context"))
566                }
567                ExprValue::Null => DataValue::Null,
568            };
569
570            return self.compare_data_values(&left_result, op, &right_data_value);
571        }
572
573        let (cell_value, column_name) = match left {
575            SqlExpression::MethodCall {
576                object,
577                method,
578                args,
579            } => {
580                match method.to_lowercase().as_str() {
582                    "length" => {
583                        if !args.is_empty() {
584                            return Err(anyhow::anyhow!("Length() takes no arguments"));
585                        }
586                        self.evaluate_length(object, row_index)?
587                    }
588                    "indexof" => {
589                        if args.len() != 1 {
590                            return Err(anyhow::anyhow!("IndexOf() requires exactly 1 argument"));
591                        }
592                        let search_str = self.extract_string_value(&args[0])?;
593                        self.evaluate_indexof(object, &search_str, row_index)?
594                    }
595                    "trim" => {
596                        if !args.is_empty() {
597                            return Err(anyhow::anyhow!("Trim() takes no arguments"));
598                        }
599                        self.evaluate_trim(object, row_index, "trim")?
600                    }
601                    "trimstart" => {
602                        if !args.is_empty() {
603                            return Err(anyhow::anyhow!("TrimStart() takes no arguments"));
604                        }
605                        self.evaluate_trim(object, row_index, "trimstart")?
606                    }
607                    "trimend" => {
608                        if !args.is_empty() {
609                            return Err(anyhow::anyhow!("TrimEnd() takes no arguments"));
610                        }
611                        self.evaluate_trim(object, row_index, "trimend")?
612                    }
613                    _ => {
614                        return Err(anyhow::anyhow!(
615                            "Method '{}' cannot be used in comparisons",
616                            method
617                        ));
618                    }
619                }
620            }
621            SqlExpression::BinaryOp {
622                left: _expr_left,
623                op: arith_op,
624                right: _expr_right,
625            } if matches!(arith_op.as_str(), "+" | "-" | "*" | "/") => {
626                let mut evaluator =
628                    ArithmeticEvaluator::with_date_notation(self.table, self.date_notation.clone());
629                let computed_value = evaluator.evaluate(left, row_index)?;
630                if row_index < 3 {
631                    debug!(
632                        "RecursiveWhereEvaluator: evaluate_binary_op() - computed arithmetic expression = {:?}",
633                        computed_value
634                    );
635                }
636                (Some(computed_value), "computed_expression".to_string())
637            }
638            SqlExpression::FunctionCall { name, .. } => {
639                let mut evaluator =
641                    ArithmeticEvaluator::with_date_notation(self.table, self.date_notation.clone());
642                let computed_value = evaluator.evaluate(left, row_index)?;
643                if row_index < 3 {
644                    debug!(
645                        "RecursiveWhereEvaluator: evaluate_binary_op() - computed function {} = {:?}",
646                        name, computed_value
647                    );
648                }
649                (Some(computed_value), format!("{name}()"))
650            }
651            _ => {
652                let column_name = self.extract_column_name(left)?;
654                if row_index < 3 {
655                    debug!(
656                        "RecursiveWhereEvaluator: evaluate_binary_op() - column_name = '{}'",
657                        column_name
658                    );
659                }
660
661                let col_index = self.table.get_column_index(&column_name).ok_or_else(|| {
662                    let suggestion = self.find_similar_column(&column_name);
663                    match suggestion {
664                        Some(similar) => anyhow!(
665                            "Column '{}' not found. Did you mean '{}'?",
666                            column_name,
667                            similar
668                        ),
669                        None => anyhow!("Column '{}' not found", column_name),
670                    }
671                })?;
672
673                let cell_value = self.table.get_value(row_index, col_index).cloned();
674                (cell_value, column_name)
675            }
676        };
677
678        if row_index < 3 {
679            debug!(
680                "RecursiveWhereEvaluator: evaluate_binary_op() - row {} column '{}' value = {:?}",
681                row_index, column_name, cell_value
682            );
683        }
684
685        let compare_value = self.extract_value(right)?;
687
688        match (cell_value, op.to_uppercase().as_str(), &compare_value) {
690            (Some(DataValue::String(ref a)), "=", ExprValue::String(b)) => {
691                if let (Ok(date_a), Ok(date_b)) = (
693                    self.parse_datetime_with_notation(a),
694                    self.parse_datetime_with_notation(b),
695                ) {
696                    if row_index < 3 {
697                        debug!(
698                            "RecursiveWhereEvaluator: Date comparison (from strings) '{}' = '{}' = {}",
699                            date_a.format("%Y-%m-%d %H:%M:%S"),
700                            date_b.format("%Y-%m-%d %H:%M:%S"),
701                            date_a == date_b
702                        );
703                    }
704                    Ok(date_a == date_b)
705                } else {
706                    if row_index < 3 {
707                        debug!(
708                            "RecursiveWhereEvaluator: String comparison '{}' = '{}' (case_insensitive={})",
709                            a, b, self.case_insensitive
710                        );
711                    }
712                    if self.case_insensitive {
713                        Ok(a.to_lowercase() == b.to_lowercase())
714                    } else {
715                        Ok(a == b)
716                    }
717                }
718            }
719            (Some(DataValue::InternedString(ref a)), "=", ExprValue::String(b)) => {
720                if let (Ok(date_a), Ok(date_b)) = (
722                    self.parse_datetime_with_notation(a.as_ref()),
723                    self.parse_datetime_with_notation(b),
724                ) {
725                    if row_index < 3 {
726                        debug!(
727                            "RecursiveWhereEvaluator: Date comparison (from interned strings) '{}' = '{}' = {}",
728                            date_a.format("%Y-%m-%d %H:%M:%S"),
729                            date_b.format("%Y-%m-%d %H:%M:%S"),
730                            date_a == date_b
731                        );
732                    }
733                    Ok(date_a == date_b)
734                } else {
735                    if row_index < 3 {
736                        debug!(
737                            "RecursiveWhereEvaluator: InternedString comparison '{}' = '{}' (case_insensitive={})",
738                            a, b, self.case_insensitive
739                        );
740                    }
741                    if self.case_insensitive {
742                        Ok(a.to_lowercase() == b.to_lowercase())
743                    } else {
744                        Ok(a.as_ref() == b)
745                    }
746                }
747            }
748            (Some(DataValue::String(ref a)), "!=" | "<>", ExprValue::String(b)) => {
749                if let (Ok(date_a), Ok(date_b)) = (
751                    self.parse_datetime_with_notation(a),
752                    self.parse_datetime_with_notation(b),
753                ) {
754                    if row_index < 3 {
755                        debug!(
756                            "RecursiveWhereEvaluator: Date comparison (from strings) '{}' != '{}' = {}",
757                            date_a.format("%Y-%m-%d %H:%M:%S"),
758                            date_b.format("%Y-%m-%d %H:%M:%S"),
759                            date_a != date_b
760                        );
761                    }
762                    Ok(date_a != date_b)
763                } else {
764                    if row_index < 3 {
765                        debug!(
766                            "RecursiveWhereEvaluator: String comparison '{}' != '{}' (case_insensitive={})",
767                            a, b, self.case_insensitive
768                        );
769                    }
770                    if self.case_insensitive {
771                        Ok(a.to_lowercase() != b.to_lowercase())
772                    } else {
773                        Ok(a != b)
774                    }
775                }
776            }
777            (Some(DataValue::InternedString(ref a)), "!=" | "<>", ExprValue::String(b)) => {
778                if let (Ok(date_a), Ok(date_b)) = (
780                    self.parse_datetime_with_notation(a.as_ref()),
781                    self.parse_datetime_with_notation(b),
782                ) {
783                    if row_index < 3 {
784                        debug!(
785                            "RecursiveWhereEvaluator: Date comparison (from interned strings) '{}' != '{}' = {}",
786                            date_a.format("%Y-%m-%d %H:%M:%S"),
787                            date_b.format("%Y-%m-%d %H:%M:%S"),
788                            date_a != date_b
789                        );
790                    }
791                    Ok(date_a != date_b)
792                } else {
793                    if row_index < 3 {
794                        debug!(
795                            "RecursiveWhereEvaluator: InternedString comparison '{}' != '{}' (case_insensitive={})",
796                            a, b, self.case_insensitive
797                        );
798                    }
799                    if self.case_insensitive {
800                        Ok(a.to_lowercase() != b.to_lowercase())
801                    } else {
802                        Ok(a.as_ref() != b)
803                    }
804                }
805            }
806            (Some(DataValue::String(ref a)), ">", ExprValue::String(b)) => {
807                if let (Ok(date_a), Ok(date_b)) = (
809                    self.parse_datetime_with_notation(a),
810                    self.parse_datetime_with_notation(b),
811                ) {
812                    if row_index < 3 {
813                        debug!(
814                            "RecursiveWhereEvaluator: Date comparison (from strings) '{}' > '{}' = {}",
815                            date_a.format("%Y-%m-%d %H:%M:%S"),
816                            date_b.format("%Y-%m-%d %H:%M:%S"),
817                            date_a > date_b
818                        );
819                    }
820                    Ok(date_a > date_b)
821                } else if self.case_insensitive {
822                    Ok(a.to_lowercase() > b.to_lowercase())
823                } else {
824                    Ok(a > b)
825                }
826            }
827            (Some(DataValue::InternedString(ref a)), ">", ExprValue::String(b)) => {
828                if let (Ok(date_a), Ok(date_b)) = (
830                    self.parse_datetime_with_notation(a.as_ref()),
831                    self.parse_datetime_with_notation(b),
832                ) {
833                    if row_index < 3 {
834                        debug!(
835                            "RecursiveWhereEvaluator: Date comparison (from interned strings) '{}' > '{}' = {}",
836                            date_a.format("%Y-%m-%d %H:%M:%S"),
837                            date_b.format("%Y-%m-%d %H:%M:%S"),
838                            date_a > date_b
839                        );
840                    }
841                    Ok(date_a > date_b)
842                } else if self.case_insensitive {
843                    Ok(a.to_lowercase() > b.to_lowercase())
844                } else {
845                    Ok(a.as_ref() > b)
846                }
847            }
848            (Some(DataValue::String(ref a)), ">=", ExprValue::String(b)) => {
849                if let (Ok(date_a), Ok(date_b)) = (
851                    self.parse_datetime_with_notation(a),
852                    self.parse_datetime_with_notation(b),
853                ) {
854                    if row_index < 3 {
855                        debug!(
856                            "RecursiveWhereEvaluator: Date comparison (from strings) '{}' >= '{}' = {}",
857                            date_a.format("%Y-%m-%d %H:%M:%S"),
858                            date_b.format("%Y-%m-%d %H:%M:%S"),
859                            date_a >= date_b
860                        );
861                    }
862                    Ok(date_a >= date_b)
863                } else if self.case_insensitive {
864                    Ok(a.to_lowercase() >= b.to_lowercase())
865                } else {
866                    Ok(a >= b)
867                }
868            }
869            (Some(DataValue::InternedString(ref a)), ">=", ExprValue::String(b)) => {
870                if let (Ok(date_a), Ok(date_b)) = (
872                    self.parse_datetime_with_notation(a.as_ref()),
873                    self.parse_datetime_with_notation(b),
874                ) {
875                    if row_index < 3 {
876                        debug!(
877                            "RecursiveWhereEvaluator: Date comparison (from interned strings) '{}' >= '{}' = {}",
878                            date_a.format("%Y-%m-%d %H:%M:%S"),
879                            date_b.format("%Y-%m-%d %H:%M:%S"),
880                            date_a >= date_b
881                        );
882                    }
883                    Ok(date_a >= date_b)
884                } else if self.case_insensitive {
885                    Ok(a.to_lowercase() >= b.to_lowercase())
886                } else {
887                    Ok(a.as_ref() >= b)
888                }
889            }
890            (Some(DataValue::String(ref a)), "<", ExprValue::String(b)) => {
891                if let (Ok(date_a), Ok(date_b)) = (
893                    self.parse_datetime_with_notation(a),
894                    self.parse_datetime_with_notation(b),
895                ) {
896                    if row_index < 3 {
897                        debug!(
898                            "RecursiveWhereEvaluator: Date comparison (from strings) '{}' < '{}' = {}",
899                            date_a.format("%Y-%m-%d %H:%M:%S"),
900                            date_b.format("%Y-%m-%d %H:%M:%S"),
901                            date_a < date_b
902                        );
903                    }
904                    Ok(date_a < date_b)
905                } else if self.case_insensitive {
906                    Ok(a.to_lowercase() < b.to_lowercase())
907                } else {
908                    Ok(a < b)
909                }
910            }
911            (Some(DataValue::InternedString(ref a)), "<", ExprValue::String(b)) => {
912                if let (Ok(date_a), Ok(date_b)) = (
914                    self.parse_datetime_with_notation(a.as_ref()),
915                    self.parse_datetime_with_notation(b),
916                ) {
917                    if row_index < 3 {
918                        debug!(
919                            "RecursiveWhereEvaluator: Date comparison (from interned strings) '{}' < '{}' = {}",
920                            date_a.format("%Y-%m-%d %H:%M:%S"),
921                            date_b.format("%Y-%m-%d %H:%M:%S"),
922                            date_a < date_b
923                        );
924                    }
925                    Ok(date_a < date_b)
926                } else if self.case_insensitive {
927                    Ok(a.to_lowercase() < b.to_lowercase())
928                } else {
929                    Ok(a.as_ref() < b)
930                }
931            }
932            (Some(DataValue::String(ref a)), "<=", ExprValue::String(b)) => {
933                if let (Ok(date_a), Ok(date_b)) = (
935                    self.parse_datetime_with_notation(a),
936                    self.parse_datetime_with_notation(b),
937                ) {
938                    if row_index < 3 {
939                        debug!(
940                            "RecursiveWhereEvaluator: Date comparison (from strings) '{}' <= '{}' = {}",
941                            date_a.format("%Y-%m-%d %H:%M:%S"),
942                            date_b.format("%Y-%m-%d %H:%M:%S"),
943                            date_a <= date_b
944                        );
945                    }
946                    Ok(date_a <= date_b)
947                } else if self.case_insensitive {
948                    Ok(a.to_lowercase() <= b.to_lowercase())
949                } else {
950                    Ok(a <= b)
951                }
952            }
953            (Some(DataValue::InternedString(ref a)), "<=", ExprValue::String(b)) => {
954                if let (Ok(date_a), Ok(date_b)) = (
956                    self.parse_datetime_with_notation(a.as_ref()),
957                    self.parse_datetime_with_notation(b),
958                ) {
959                    if row_index < 3 {
960                        debug!(
961                            "RecursiveWhereEvaluator: Date comparison (from interned strings) '{}' <= '{}' = {}",
962                            date_a.format("%Y-%m-%d %H:%M:%S"),
963                            date_b.format("%Y-%m-%d %H:%M:%S"),
964                            date_a <= date_b
965                        );
966                    }
967                    Ok(date_a <= date_b)
968                } else if self.case_insensitive {
969                    Ok(a.to_lowercase() <= b.to_lowercase())
970                } else {
971                    Ok(a.as_ref() <= b)
972                }
973            }
974
975            (Some(DataValue::Integer(a)), "=", ExprValue::Number(b)) => Ok(a as f64 == *b),
976            (Some(DataValue::Integer(a)), "!=" | "<>", ExprValue::Number(b)) => Ok(a as f64 != *b),
977            (Some(DataValue::Integer(a)), ">", ExprValue::Number(b)) => Ok(a as f64 > *b),
978            (Some(DataValue::Integer(a)), ">=", ExprValue::Number(b)) => Ok(a as f64 >= *b),
979            (Some(DataValue::Integer(a)), "<", ExprValue::Number(b)) => Ok((a as f64) < *b),
980            (Some(DataValue::Integer(a)), "<=", ExprValue::Number(b)) => Ok(a as f64 <= *b),
981
982            (Some(DataValue::Float(a)), "=", ExprValue::Number(b)) => {
983                Ok((a - b).abs() < f64::EPSILON)
984            }
985            (Some(DataValue::Float(a)), "!=" | "<>", ExprValue::Number(b)) => {
986                Ok((a - b).abs() >= f64::EPSILON)
987            }
988            (Some(DataValue::Float(a)), ">", ExprValue::Number(b)) => Ok(a > *b),
989            (Some(DataValue::Float(a)), ">=", ExprValue::Number(b)) => Ok(a >= *b),
990            (Some(DataValue::Float(a)), "<", ExprValue::Number(b)) => Ok(a < *b),
991            (Some(DataValue::Float(a)), "<=", ExprValue::Number(b)) => Ok(a <= *b),
992
993            (Some(DataValue::Boolean(a)), "=", ExprValue::Boolean(b)) => Ok(a == *b),
995            (Some(DataValue::Boolean(a)), "!=" | "<>", ExprValue::Boolean(b)) => Ok(a != *b),
996
997            (Some(DataValue::Boolean(a)), "=", ExprValue::String(b)) => {
999                let bool_str = a.to_string();
1000                if self.case_insensitive {
1001                    Ok(bool_str.to_lowercase() == b.to_lowercase())
1002                } else {
1003                    Ok(bool_str == *b)
1004                }
1005            }
1006            (Some(DataValue::Boolean(a)), "!=" | "<>", ExprValue::String(b)) => {
1007                let bool_str = a.to_string();
1008                if self.case_insensitive {
1009                    Ok(bool_str.to_lowercase() != b.to_lowercase())
1010                } else {
1011                    Ok(bool_str != *b)
1012                }
1013            }
1014
1015            (Some(DataValue::String(ref text)), "LIKE", ExprValue::String(pattern)) => {
1017                let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
1018                let regex = regex::RegexBuilder::new(&format!("^{regex_pattern}$"))
1019                    .case_insensitive(true)
1020                    .build()
1021                    .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
1022                Ok(regex.is_match(text))
1023            }
1024            (Some(DataValue::InternedString(ref text)), "LIKE", ExprValue::String(pattern)) => {
1025                let regex_pattern = pattern.replace('%', ".*").replace('_', ".");
1026                let regex = regex::RegexBuilder::new(&format!("^{regex_pattern}$"))
1027                    .case_insensitive(true)
1028                    .build()
1029                    .map_err(|e| anyhow::anyhow!("Invalid LIKE pattern: {}", e))?;
1030                Ok(regex.is_match(text.as_ref()))
1031            }
1032
1033            (None | Some(DataValue::Null), "IS", ExprValue::Null) => Ok(true),
1035            (Some(_), "IS", ExprValue::Null) => Ok(false),
1036            (None | Some(DataValue::Null), "IS NOT", ExprValue::Null) => Ok(false),
1037            (Some(_), "IS NOT", ExprValue::Null) => Ok(true),
1038
1039            (None | Some(DataValue::Null), "IS NULL", _) => Ok(true),
1041            (Some(_), "IS NULL", _) => Ok(false),
1042            (None | Some(DataValue::Null), "IS NOT NULL", _) => Ok(false),
1043            (Some(_), "IS NOT NULL", _) => Ok(true),
1044
1045            (Some(DataValue::String(ref date_str)), op_str, ExprValue::DateTime(dt)) => {
1047                if row_index < 3 {
1048                    debug!(
1049                        "RecursiveWhereEvaluator: DateTime comparison '{}' {} '{}' - attempting parse",
1050                        date_str,
1051                        op_str,
1052                        dt.format("%Y-%m-%d %H:%M:%S")
1053                    );
1054                }
1055
1056                if let Ok(parsed_dt) = self.parse_datetime_with_notation(date_str) {
1058                    let result = Self::compare_datetime(op_str, &parsed_dt, dt);
1059                    if row_index < 3 {
1060                        debug!(
1061                            "RecursiveWhereEvaluator: DateTime parsed successfully: '{}' {} '{}' = {}",
1062                            parsed_dt.format("%Y-%m-%d %H:%M:%S"),
1063                            op_str,
1064                            dt.format("%Y-%m-%d %H:%M:%S"),
1065                            result
1066                        );
1067                    }
1068                    Ok(result)
1069                } else {
1070                    if row_index < 3 {
1071                        debug!(
1072                            "RecursiveWhereEvaluator: DateTime parse FAILED for '{}' - no matching format",
1073                            date_str
1074                        );
1075                    }
1076                    Ok(false)
1077                }
1078            }
1079            (Some(DataValue::InternedString(ref date_str)), op_str, ExprValue::DateTime(dt)) => {
1080                if row_index < 3 {
1081                    debug!(
1082                        "RecursiveWhereEvaluator: DateTime comparison (interned) '{}' {} '{}' - attempting parse",
1083                        date_str,
1084                        op_str,
1085                        dt.format("%Y-%m-%d %H:%M:%S")
1086                    );
1087                }
1088
1089                if let Ok(parsed_dt) = self.parse_datetime_with_notation(date_str.as_ref()) {
1091                    let result = Self::compare_datetime(op_str, &parsed_dt, dt);
1092                    if row_index < 3 {
1093                        debug!(
1094                            "RecursiveWhereEvaluator: DateTime parsed successfully: '{}' {} '{}' = {}",
1095                            parsed_dt.format("%Y-%m-%d %H:%M:%S"),
1096                            op_str,
1097                            dt.format("%Y-%m-%d %H:%M:%S"),
1098                            result
1099                        );
1100                    }
1101                    Ok(result)
1102                } else {
1103                    if row_index < 3 {
1104                        debug!(
1105                            "RecursiveWhereEvaluator: DateTime parse FAILED for '{}' - no matching format",
1106                            date_str
1107                        );
1108                    }
1109                    Ok(false)
1110                }
1111            }
1112
1113            (Some(DataValue::DateTime(ref date_str)), op_str, ExprValue::DateTime(dt)) => {
1115                if row_index < 3 {
1116                    debug!(
1117                        "RecursiveWhereEvaluator: DateTime vs DateTime comparison '{}' {} '{}' - direct comparison",
1118                        date_str, op_str, dt.format("%Y-%m-%d %H:%M:%S")
1119                    );
1120                }
1121
1122                if let Ok(parsed_dt) = self.parse_datetime_with_notation(date_str) {
1124                    let result = Self::compare_datetime(op_str, &parsed_dt, dt);
1125                    if row_index < 3 {
1126                        debug!(
1127                            "RecursiveWhereEvaluator: DateTime vs DateTime parsed successfully: '{}' {} '{}' = {}",
1128                            parsed_dt.format("%Y-%m-%d %H:%M:%S"), op_str, dt.format("%Y-%m-%d %H:%M:%S"), result
1129                        );
1130                    }
1131                    Ok(result)
1132                } else {
1133                    if row_index < 3 {
1134                        debug!(
1135                            "RecursiveWhereEvaluator: DateTime vs DateTime parse FAILED for '{}' - no matching format",
1136                            date_str
1137                        );
1138                    }
1139                    Ok(false)
1140                }
1141            }
1142
1143            _ => Ok(false),
1144        }
1145    }
1146
1147    fn evaluate_in_list(
1148        &self,
1149        expr: &SqlExpression,
1150        values: &[SqlExpression],
1151        row_index: usize,
1152        _ignore_case: bool,
1153    ) -> Result<bool> {
1154        let column_name = self.extract_column_name(expr)?;
1155        let col_index = self
1156            .table
1157            .get_column_index(&column_name)
1158            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
1159
1160        let cell_value = self.table.get_value(row_index, col_index).cloned();
1161
1162        for value_expr in values {
1163            let compare_value = self.extract_value(value_expr)?;
1164            let matches = match (cell_value.as_ref(), &compare_value) {
1165                (Some(DataValue::String(a)), ExprValue::String(b)) => {
1166                    if self.case_insensitive {
1167                        if row_index < 3 {
1168                            debug!("RecursiveWhereEvaluator: IN list string comparison '{}' in '{}' (case_insensitive={})", a, b, self.case_insensitive);
1169                        }
1170                        a.to_lowercase() == b.to_lowercase()
1171                    } else {
1172                        a == b
1173                    }
1174                }
1175                (Some(DataValue::InternedString(a)), ExprValue::String(b)) => {
1176                    if self.case_insensitive {
1177                        if row_index < 3 {
1178                            debug!("RecursiveWhereEvaluator: IN list interned string comparison '{}' in '{}' (case_insensitive={})", a, b, self.case_insensitive);
1179                        }
1180                        a.to_lowercase() == b.to_lowercase()
1181                    } else {
1182                        a.as_ref() == b
1183                    }
1184                }
1185                (Some(DataValue::Integer(a)), ExprValue::Number(b)) => *a as f64 == *b,
1186                (Some(DataValue::Float(a)), ExprValue::Number(b)) => (*a - b).abs() < f64::EPSILON,
1187                _ => false,
1188            };
1189
1190            if matches {
1191                return Ok(true);
1192            }
1193        }
1194
1195        Ok(false)
1196    }
1197
1198    fn evaluate_between(
1199        &self,
1200        expr: &SqlExpression,
1201        lower: &SqlExpression,
1202        upper: &SqlExpression,
1203        row_index: usize,
1204    ) -> Result<bool> {
1205        let column_name = self.extract_column_name(expr)?;
1206        let col_index = self
1207            .table
1208            .get_column_index(&column_name)
1209            .ok_or_else(|| anyhow::anyhow!("Column '{}' not found", column_name))?;
1210
1211        let cell_value = self.table.get_value(row_index, col_index).cloned();
1212        let lower_value = self.extract_value(lower)?;
1213        let upper_value = self.extract_value(upper)?;
1214
1215        match (cell_value, &lower_value, &upper_value) {
1216            (Some(DataValue::Integer(n)), ExprValue::Number(l), ExprValue::Number(u)) => {
1217                Ok(n as f64 >= *l && n as f64 <= *u)
1218            }
1219            (Some(DataValue::Float(n)), ExprValue::Number(l), ExprValue::Number(u)) => {
1220                Ok(n >= *l && n <= *u)
1221            }
1222            (Some(DataValue::String(ref s)), ExprValue::String(l), ExprValue::String(u)) => {
1223                Ok(s >= l && s <= u)
1224            }
1225            (
1226                Some(DataValue::InternedString(ref s)),
1227                ExprValue::String(l),
1228                ExprValue::String(u),
1229            ) => Ok(s.as_ref() >= l && s.as_ref() <= u),
1230            _ => Ok(false),
1231        }
1232    }
1233
1234    fn evaluate_method_call(
1235        &self,
1236        object: &str,
1237        method: &str,
1238        args: &[SqlExpression],
1239        row_index: usize,
1240    ) -> Result<bool> {
1241        if row_index < 3 {
1242            debug!(
1243                "RecursiveWhereEvaluator: evaluate_method_call - object='{}', method='{}', row={}",
1244                object, method, row_index
1245            );
1246        }
1247
1248        let col_index = self.table.get_column_index(object).ok_or_else(|| {
1250            let suggestion = self.find_similar_column(object);
1251            match suggestion {
1252                Some(similar) => {
1253                    anyhow!("Column '{}' not found. Did you mean '{}'?", object, similar)
1254                }
1255                None => anyhow!("Column '{}' not found", object),
1256            }
1257        })?;
1258
1259        let cell_value = self.table.get_value(row_index, col_index).cloned();
1260        if row_index < 3 {
1261            debug!(
1262                "RecursiveWhereEvaluator: Row {} column '{}' value = {:?}",
1263                row_index, object, cell_value
1264            );
1265        }
1266
1267        match method.to_lowercase().as_str() {
1268            "contains" => {
1269                if args.len() != 1 {
1270                    return Err(anyhow::anyhow!("Contains requires exactly 1 argument"));
1271                }
1272                let search_str = self.extract_string_value(&args[0])?;
1273                let search_lower = search_str.to_lowercase();
1275
1276                match cell_value {
1278                    Some(DataValue::String(ref s)) => {
1279                        let result = s.to_lowercase().contains(&search_lower);
1280                        if row_index < 3 {
1282                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on '{}' = {} (case-insensitive)", row_index, search_str, s, result);
1283                        }
1284                        Ok(result)
1285                    }
1286                    Some(DataValue::InternedString(ref s)) => {
1287                        let result = s.to_lowercase().contains(&search_lower);
1288                        if row_index < 3 {
1290                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on interned '{}' = {} (case-insensitive)", row_index, search_str, s, result);
1291                        }
1292                        Ok(result)
1293                    }
1294                    Some(DataValue::Integer(n)) => {
1295                        let str_val = n.to_string();
1296                        let result = str_val.contains(&search_str);
1297                        if row_index < 3 {
1298                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on integer '{}' = {}", row_index, search_str, str_val, result);
1299                        }
1300                        Ok(result)
1301                    }
1302                    Some(DataValue::Float(f)) => {
1303                        let str_val = f.to_string();
1304                        let result = str_val.contains(&search_str);
1305                        if row_index < 3 {
1306                            debug!(
1307                                "RecursiveWhereEvaluator: Row {} contains('{}') on float '{}' = {}",
1308                                row_index, search_str, str_val, result
1309                            );
1310                        }
1311                        Ok(result)
1312                    }
1313                    Some(DataValue::Boolean(b)) => {
1314                        let str_val = b.to_string();
1315                        let result = str_val.contains(&search_str);
1316                        if row_index < 3 {
1317                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on boolean '{}' = {}", row_index, search_str, str_val, result);
1318                        }
1319                        Ok(result)
1320                    }
1321                    Some(DataValue::DateTime(dt)) => {
1322                        let result = dt.contains(&search_str);
1324                        if row_index < 3 {
1325                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on datetime '{}' = {}", row_index, search_str, dt, result);
1326                        }
1327                        Ok(result)
1328                    }
1329                    _ => {
1330                        if row_index < 3 {
1331                            debug!("RecursiveWhereEvaluator: Row {} contains('{}') on null/empty value = false", row_index, search_str);
1332                        }
1333                        Ok(false)
1334                    }
1335                }
1336            }
1337            "startswith" => {
1338                if args.len() != 1 {
1339                    return Err(anyhow::anyhow!("StartsWith requires exactly 1 argument"));
1340                }
1341                let prefix = self.extract_string_value(&args[0])?;
1342
1343                match cell_value {
1345                    Some(DataValue::String(ref s)) => {
1346                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
1347                    }
1348                    Some(DataValue::InternedString(ref s)) => {
1349                        Ok(s.to_lowercase().starts_with(&prefix.to_lowercase()))
1350                    }
1351                    Some(DataValue::Integer(n)) => Ok(n.to_string().starts_with(&prefix)),
1352                    Some(DataValue::Float(f)) => Ok(f.to_string().starts_with(&prefix)),
1353                    Some(DataValue::Boolean(b)) => Ok(b.to_string().starts_with(&prefix)),
1354                    Some(DataValue::DateTime(dt)) => Ok(dt.starts_with(&prefix)),
1355                    _ => Ok(false),
1356                }
1357            }
1358            "endswith" => {
1359                if args.len() != 1 {
1360                    return Err(anyhow::anyhow!("EndsWith requires exactly 1 argument"));
1361                }
1362                let suffix = self.extract_string_value(&args[0])?;
1363
1364                match cell_value {
1366                    Some(DataValue::String(ref s)) => {
1367                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
1368                    }
1369                    Some(DataValue::InternedString(ref s)) => {
1370                        Ok(s.to_lowercase().ends_with(&suffix.to_lowercase()))
1371                    }
1372                    Some(DataValue::Integer(n)) => Ok(n.to_string().ends_with(&suffix)),
1373                    Some(DataValue::Float(f)) => Ok(f.to_string().ends_with(&suffix)),
1374                    Some(DataValue::Boolean(b)) => Ok(b.to_string().ends_with(&suffix)),
1375                    Some(DataValue::DateTime(dt)) => Ok(dt.ends_with(&suffix)),
1376                    _ => Ok(false),
1377                }
1378            }
1379            _ => Err(anyhow::anyhow!("Unsupported method: {}", method)),
1380        }
1381    }
1382
1383    fn extract_column_name(&self, expr: &SqlExpression) -> Result<String> {
1384        match expr {
1385            SqlExpression::Column(name) => Ok(name.clone()),
1386            _ => Err(anyhow::anyhow!("Expected column name, got: {:?}", expr)),
1387        }
1388    }
1389
1390    fn extract_string_value(&self, expr: &SqlExpression) -> Result<String> {
1391        match expr {
1392            SqlExpression::StringLiteral(s) => Ok(s.clone()),
1393            _ => Err(anyhow::anyhow!("Expected string literal, got: {:?}", expr)),
1394        }
1395    }
1396
1397    fn extract_value(&self, expr: &SqlExpression) -> Result<ExprValue> {
1398        match expr {
1399            SqlExpression::StringLiteral(s) => Ok(ExprValue::String(s.clone())),
1400            SqlExpression::BooleanLiteral(b) => Ok(ExprValue::Boolean(*b)),
1401            SqlExpression::NumberLiteral(n) => {
1402                if let Ok(num) = n.parse::<f64>() {
1403                    Ok(ExprValue::Number(num))
1404                } else {
1405                    Ok(ExprValue::String(n.clone()))
1406                }
1407            }
1408            SqlExpression::DateTimeConstructor {
1409                year,
1410                month,
1411                day,
1412                hour,
1413                minute,
1414                second,
1415            } => {
1416                let naive_date = NaiveDate::from_ymd_opt(*year, *month, *day)
1418                    .ok_or_else(|| anyhow::anyhow!("Invalid date: {}-{}-{}", year, month, day))?;
1419                let naive_time = NaiveTime::from_hms_opt(
1420                    hour.unwrap_or(0),
1421                    minute.unwrap_or(0),
1422                    second.unwrap_or(0),
1423                )
1424                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
1425                let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
1426                let datetime = Utc.from_utc_datetime(&naive_datetime);
1427                Ok(ExprValue::DateTime(datetime))
1428            }
1429            SqlExpression::DateTimeToday {
1430                hour,
1431                minute,
1432                second,
1433            } => {
1434                let today = Local::now().date_naive();
1436                let time = NaiveTime::from_hms_opt(
1437                    hour.unwrap_or(0),
1438                    minute.unwrap_or(0),
1439                    second.unwrap_or(0),
1440                )
1441                .ok_or_else(|| anyhow::anyhow!("Invalid time"))?;
1442                let naive_datetime = NaiveDateTime::new(today, time);
1443                let datetime = Utc.from_utc_datetime(&naive_datetime);
1444                Ok(ExprValue::DateTime(datetime))
1445            }
1446            _ => Ok(ExprValue::Null),
1447        }
1448    }
1449
1450    fn evaluate_case_expression_as_bool(
1452        &self,
1453        when_branches: &[crate::sql::recursive_parser::WhenBranch],
1454        else_branch: &Option<Box<SqlExpression>>,
1455        row_index: usize,
1456    ) -> Result<bool> {
1457        debug!(
1458            "RecursiveWhereEvaluator: evaluating CASE expression as bool for row {}",
1459            row_index
1460        );
1461
1462        for branch in when_branches {
1464            let condition_result = self.evaluate_expression(&branch.condition, row_index)?;
1466
1467            if condition_result {
1468                debug!("CASE: WHEN condition matched, evaluating result expression as bool");
1469                return self.evaluate_expression_as_bool(&branch.result, row_index);
1471            }
1472        }
1473
1474        if let Some(else_expr) = else_branch {
1476            debug!("CASE: No WHEN matched, evaluating ELSE expression as bool");
1477            self.evaluate_expression_as_bool(else_expr, row_index)
1478        } else {
1479            debug!("CASE: No WHEN matched and no ELSE, returning false");
1480            Ok(false)
1481        }
1482    }
1483
1484    fn evaluate_expression_as_bool(&self, expr: &SqlExpression, row_index: usize) -> Result<bool> {
1486        match expr {
1487            SqlExpression::BinaryOp { .. }
1489            | SqlExpression::InList { .. }
1490            | SqlExpression::NotInList { .. }
1491            | SqlExpression::Between { .. }
1492            | SqlExpression::Not { .. }
1493            | SqlExpression::MethodCall { .. } => self.evaluate_expression(expr, row_index),
1494            SqlExpression::CaseExpression {
1496                when_branches,
1497                else_branch,
1498            } => self.evaluate_case_expression_as_bool(when_branches, else_branch, row_index),
1499            _ => {
1501                let mut evaluator =
1503                    crate::data::arithmetic_evaluator::ArithmeticEvaluator::with_date_notation(
1504                        self.table,
1505                        self.date_notation.clone(),
1506                    );
1507                let value = evaluator.evaluate(expr, row_index)?;
1508
1509                match value {
1510                    crate::data::datatable::DataValue::Boolean(b) => Ok(b),
1511                    crate::data::datatable::DataValue::Integer(i) => Ok(i != 0),
1512                    crate::data::datatable::DataValue::Float(f) => Ok(f != 0.0),
1513                    crate::data::datatable::DataValue::Null => Ok(false),
1514                    crate::data::datatable::DataValue::String(s) => Ok(!s.is_empty()),
1515                    crate::data::datatable::DataValue::InternedString(s) => Ok(!s.is_empty()),
1516                    _ => Ok(true), }
1518            }
1519        }
1520    }
1521}
1522
1523enum ExprValue {
1524    String(String),
1525    Number(f64),
1526    Boolean(bool),
1527    DateTime(DateTime<Utc>),
1528    Null,
1529}