Skip to main content

citadel_sql/
eval.rs

1//! Expression evaluator with SQL three-valued logic.
2
3use rustc_hash::FxHashMap;
4
5use crate::error::{Result, SqlError};
6use crate::parser::{BinOp, Expr, UnaryOp};
7use crate::types::{ColumnDef, CompactString, DataType, Value};
8
9pub struct ColumnMap {
10    exact: FxHashMap<String, usize>,
11    short: FxHashMap<String, ShortMatch>,
12}
13
14#[derive(Clone)]
15enum ShortMatch {
16    Unique(usize),
17    Ambiguous,
18}
19
20impl Clone for ColumnMap {
21    fn clone(&self) -> Self {
22        Self {
23            exact: self.exact.clone(),
24            short: self.short.clone(),
25        }
26    }
27}
28
29impl ColumnMap {
30    pub fn new(columns: &[ColumnDef]) -> Self {
31        let mut exact = FxHashMap::with_capacity_and_hasher(columns.len() * 2, Default::default());
32        let mut short: FxHashMap<String, ShortMatch> =
33            FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
34
35        for (i, col) in columns.iter().enumerate() {
36            let lower = col.name.to_ascii_lowercase();
37            exact.insert(lower.clone(), i);
38
39            let unqualified = if let Some(dot) = lower.rfind('.') {
40                &lower[dot + 1..]
41            } else {
42                &lower
43            };
44            short
45                .entry(unqualified.to_string())
46                .and_modify(|e| *e = ShortMatch::Ambiguous)
47                .or_insert(ShortMatch::Unique(i));
48        }
49
50        Self { exact, short }
51    }
52
53    pub(crate) fn resolve(&self, name: &str) -> Result<usize> {
54        if let Some(&idx) = self.exact.get(name) {
55            return Ok(idx);
56        }
57        match self.short.get(name) {
58            Some(ShortMatch::Unique(idx)) => Ok(*idx),
59            Some(ShortMatch::Ambiguous) => Err(SqlError::AmbiguousColumn(name.to_string())),
60            None => Err(SqlError::ColumnNotFound(name.to_string())),
61        }
62    }
63
64    pub(crate) fn resolve_qualified(&self, table: &str, column: &str) -> Result<usize> {
65        let qualified = format!("{table}.{column}");
66        if let Some(&idx) = self.exact.get(&qualified) {
67            return Ok(idx);
68        }
69        match self.short.get(column) {
70            Some(ShortMatch::Unique(idx)) => Ok(*idx),
71            _ => Err(SqlError::ColumnNotFound(format!("{table}.{column}"))),
72        }
73    }
74}
75
76pub struct EvalCtx<'a> {
77    pub col_map: &'a ColumnMap,
78    pub row: &'a [Value],
79    pub params: &'a [Value],
80    pub excluded: Option<ExcludedRow<'a>>,
81}
82
83pub struct ExcludedRow<'a> {
84    pub col_map: &'a ColumnMap,
85    pub row: &'a [Value],
86}
87
88impl<'a> EvalCtx<'a> {
89    pub fn new(col_map: &'a ColumnMap, row: &'a [Value]) -> Self {
90        Self {
91            col_map,
92            row,
93            params: &[],
94            excluded: None,
95        }
96    }
97
98    pub fn with_params(col_map: &'a ColumnMap, row: &'a [Value], params: &'a [Value]) -> Self {
99        Self {
100            col_map,
101            row,
102            params,
103            excluded: None,
104        }
105    }
106
107    pub fn with_excluded(
108        col_map: &'a ColumnMap,
109        row: &'a [Value],
110        excluded_col_map: &'a ColumnMap,
111        excluded_row: &'a [Value],
112    ) -> Self {
113        Self {
114            col_map,
115            row,
116            params: &[],
117            excluded: Some(ExcludedRow {
118                col_map: excluded_col_map,
119                row: excluded_row,
120            }),
121        }
122    }
123}
124
125thread_local! {
126    static SCOPED_PARAMS: std::cell::Cell<(*const Value, usize)> =
127        const { std::cell::Cell::new((std::ptr::null(), 0)) };
128}
129
130/// Install positional parameters for `Expr::Parameter` resolution during `f`.
131pub fn with_scoped_params<R>(params: &[Value], f: impl FnOnce() -> R) -> R {
132    struct Guard((*const Value, usize));
133    impl Drop for Guard {
134        fn drop(&mut self) {
135            SCOPED_PARAMS.with(|slot| slot.set(self.0));
136        }
137    }
138    SCOPED_PARAMS.with(|slot| {
139        let prev = slot.get();
140        slot.set((params.as_ptr(), params.len()));
141        let _guard = Guard(prev);
142        f()
143    })
144}
145
146fn resolve_parameter(n: usize, ctx_params: &[Value]) -> Result<Value> {
147    if !ctx_params.is_empty() {
148        if n == 0 || n > ctx_params.len() {
149            return Err(SqlError::ParameterCountMismatch {
150                expected: n,
151                got: ctx_params.len(),
152            });
153        }
154        return Ok(ctx_params[n - 1].clone());
155    }
156    resolve_scoped_param(n)
157}
158
159pub fn resolve_scoped_param(n: usize) -> Result<Value> {
160    SCOPED_PARAMS.with(|slot| {
161        let (ptr, len) = slot.get();
162        if n == 0 || n > len {
163            return Err(SqlError::ParameterCountMismatch {
164                expected: n,
165                got: len,
166            });
167        }
168        // SAFETY: `with_scoped_params` keeps the slice alive for the duration of `f()`
169        // and restores the previous pointer on return. Reads only happen inside `f()`.
170        unsafe { Ok((*ptr.add(n - 1)).clone()) }
171    })
172}
173
174pub fn eval_expr(expr: &Expr, ctx: &EvalCtx) -> Result<Value> {
175    match expr {
176        Expr::Literal(v) => Ok(v.clone()),
177
178        Expr::Column(name) => {
179            let idx = ctx.col_map.resolve(name)?;
180            Ok(ctx.row[idx].clone())
181        }
182
183        Expr::QualifiedColumn { table, column } => {
184            if let Some(excluded) = ctx.excluded.as_ref() {
185                if table.eq_ignore_ascii_case("excluded") {
186                    let lowered = column.to_ascii_lowercase();
187                    let idx = excluded.col_map.resolve(&lowered)?;
188                    return Ok(excluded.row[idx].clone());
189                }
190            }
191            let idx = ctx.col_map.resolve_qualified(table, column)?;
192            Ok(ctx.row[idx].clone())
193        }
194
195        Expr::BinaryOp { left, op, right } => {
196            let lval = eval_expr(left, ctx)?;
197            let rval = eval_expr(right, ctx)?;
198            eval_binary_op(&lval, *op, &rval)
199        }
200
201        Expr::UnaryOp { op, expr } => {
202            let val = eval_expr(expr, ctx)?;
203            eval_unary_op(*op, &val)
204        }
205
206        Expr::IsNull(e) => {
207            let val = eval_expr(e, ctx)?;
208            Ok(Value::Boolean(val.is_null()))
209        }
210
211        Expr::IsNotNull(e) => {
212            let val = eval_expr(e, ctx)?;
213            Ok(Value::Boolean(!val.is_null()))
214        }
215
216        Expr::Function { name, args, .. } => eval_scalar_function(name, args, ctx),
217
218        Expr::CountStar => Err(SqlError::Unsupported(
219            "COUNT(*) in non-aggregate context".into(),
220        )),
221
222        Expr::InList {
223            expr: e,
224            list,
225            negated,
226        } => {
227            let lhs = eval_expr(e, ctx)?;
228            eval_in_values(&lhs, list, ctx, *negated)
229        }
230
231        Expr::InSet {
232            expr: e,
233            values,
234            has_null,
235            negated,
236        } => {
237            let lhs = eval_expr(e, ctx)?;
238            eval_in_set(&lhs, values, *has_null, *negated)
239        }
240
241        Expr::Between {
242            expr: e,
243            low,
244            high,
245            negated,
246        } => {
247            let val = eval_expr(e, ctx)?;
248            let lo = eval_expr(low, ctx)?;
249            let hi = eval_expr(high, ctx)?;
250            eval_between(&val, &lo, &hi, *negated)
251        }
252
253        Expr::Like {
254            expr: e,
255            pattern,
256            escape,
257            negated,
258        } => {
259            let val = eval_expr(e, ctx)?;
260            let pat = eval_expr(pattern, ctx)?;
261            let esc = escape.as_ref().map(|e| eval_expr(e, ctx)).transpose()?;
262            eval_like(&val, &pat, esc.as_ref(), *negated)
263        }
264
265        Expr::Case {
266            operand,
267            conditions,
268            else_result,
269        } => eval_case(operand.as_deref(), conditions, else_result.as_deref(), ctx),
270
271        Expr::Coalesce(args) => {
272            for arg in args {
273                let val = eval_expr(arg, ctx)?;
274                if !val.is_null() {
275                    return Ok(val);
276                }
277            }
278            Ok(Value::Null)
279        }
280
281        Expr::Cast { expr: e, data_type } => {
282            let val = eval_expr(e, ctx)?;
283            eval_cast(&val, *data_type)
284        }
285
286        Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => Err(
287            SqlError::Unsupported("subquery not materialized (internal error)".into()),
288        ),
289
290        Expr::Parameter(n) => resolve_parameter(*n, ctx.params),
291
292        Expr::WindowFunction { .. } => Err(SqlError::Unsupported(
293            "window functions are only allowed in SELECT columns".into(),
294        )),
295    }
296}
297
298/// Planner-level constant folding hook; shares semantics with row evaluation.
299pub fn eval_binary_op_public(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
300    eval_binary_op(left, op, right)
301}
302
303fn eval_binary_op(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
304    match op {
305        BinOp::And => return eval_and(left, right),
306        BinOp::Or => return eval_or(left, right),
307        _ => {}
308    }
309
310    if left.is_null() || right.is_null() {
311        return Ok(Value::Null);
312    }
313
314    if let Some(res) = eval_temporal_op(left, op, right) {
315        return res;
316    }
317
318    match op {
319        BinOp::Eq => Ok(Value::Boolean(left == right)),
320        BinOp::NotEq => Ok(Value::Boolean(left != right)),
321        BinOp::Lt => Ok(Value::Boolean(left < right)),
322        BinOp::Gt => Ok(Value::Boolean(left > right)),
323        BinOp::LtEq => Ok(Value::Boolean(left <= right)),
324        BinOp::GtEq => Ok(Value::Boolean(left >= right)),
325        BinOp::Add => eval_arithmetic(left, right, i64::checked_add, |a, b| a + b),
326        BinOp::Sub => eval_arithmetic(left, right, i64::checked_sub, |a, b| a - b),
327        BinOp::Mul => eval_arithmetic(left, right, i64::checked_mul, |a, b| a * b),
328        BinOp::Div => {
329            match right {
330                Value::Integer(0) => return Err(SqlError::DivisionByZero),
331                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
332                _ => {}
333            }
334            eval_arithmetic(left, right, i64::checked_div, |a, b| a / b)
335        }
336        BinOp::Mod => {
337            match right {
338                Value::Integer(0) => return Err(SqlError::DivisionByZero),
339                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
340                _ => {}
341            }
342            eval_arithmetic(left, right, i64::checked_rem, |a, b| a % b)
343        }
344        BinOp::Concat => {
345            let ls = value_to_text(left);
346            let rs = value_to_text(right);
347            Ok(Value::Text(format!("{ls}{rs}").into()))
348        }
349        BinOp::And | BinOp::Or => unreachable!(),
350    }
351}
352
353/// Returns `Some` when `(left, op, right)` is a temporal operation; `None` to fall through.
354fn eval_temporal_op(left: &Value, op: BinOp, right: &Value) -> Option<Result<Value>> {
355    use crate::datetime as dt;
356    use std::cmp::Ordering;
357
358    let is_temporal = |v: &Value| {
359        matches!(
360            v,
361            Value::Date(_) | Value::Time(_) | Value::Timestamp(_) | Value::Interval { .. }
362        )
363    };
364    if matches!(op, BinOp::Add | BinOp::Sub)
365        && ((is_temporal(left) && matches!(right, Value::Real(_)))
366            || (matches!(left, Value::Real(_)) && is_temporal(right)))
367    {
368        return Some(Err(SqlError::TypeMismatch {
369            expected: "INTEGER or INTERVAL for date/time arithmetic (use CAST for REAL)".into(),
370            got: format!("{} and {}", left.data_type(), right.data_type()),
371        }));
372    }
373
374    match (left, op, right) {
375        (Value::Date(d), BinOp::Add, Value::Integer(n))
376        | (Value::Integer(n), BinOp::Add, Value::Date(d)) => {
377            Some(dt::add_days_to_date(*d, *n).map(Value::Date))
378        }
379        (Value::Date(d), BinOp::Sub, Value::Integer(n)) => {
380            Some(dt::add_days_to_date(*d, -*n).map(Value::Date))
381        }
382        (Value::Date(a), BinOp::Sub, Value::Date(b)) => {
383            Some(Ok(Value::Integer(*a as i64 - *b as i64)))
384        }
385        // DATE ± INTERVAL → TIMESTAMP (PG rule).
386        (
387            Value::Date(d),
388            BinOp::Add,
389            Value::Interval {
390                months,
391                days,
392                micros,
393            },
394        )
395        | (
396            Value::Interval {
397                months,
398                days,
399                micros,
400            },
401            BinOp::Add,
402            Value::Date(d),
403        ) => Some(dt::add_interval_to_date(*d, *months, *days, *micros).map(Value::Timestamp)),
404        (
405            Value::Date(d),
406            BinOp::Sub,
407            Value::Interval {
408                months,
409                days,
410                micros,
411            },
412        ) => Some(dt::add_interval_to_date(*d, -*months, -*days, -*micros).map(Value::Timestamp)),
413        (
414            Value::Timestamp(t),
415            BinOp::Add,
416            Value::Interval {
417                months,
418                days,
419                micros,
420            },
421        )
422        | (
423            Value::Interval {
424                months,
425                days,
426                micros,
427            },
428            BinOp::Add,
429            Value::Timestamp(t),
430        ) => Some(dt::add_interval_to_timestamp(*t, *months, *days, *micros).map(Value::Timestamp)),
431        (
432            Value::Timestamp(t),
433            BinOp::Sub,
434            Value::Interval {
435                months,
436                days,
437                micros,
438            },
439        ) => Some(
440            dt::add_interval_to_timestamp(*t, -*months, -*days, -*micros).map(Value::Timestamp),
441        ),
442        (Value::Timestamp(a), BinOp::Sub, Value::Timestamp(b)) => {
443            let (days, micros) = dt::subtract_timestamps(*a, *b);
444            Some(Ok(Value::Interval {
445                months: 0,
446                days,
447                micros,
448            }))
449        }
450        (
451            Value::Time(t),
452            BinOp::Add,
453            Value::Interval {
454                months,
455                days,
456                micros,
457            },
458        ) => Some(dt::add_interval_to_time(*t, *months, *days, *micros).map(Value::Time)),
459        (
460            Value::Time(t),
461            BinOp::Sub,
462            Value::Interval {
463                months,
464                days,
465                micros,
466            },
467        ) => Some(dt::add_interval_to_time(*t, -*months, -*days, -*micros).map(Value::Time)),
468        (Value::Time(a), BinOp::Sub, Value::Time(b)) => Some(Ok(Value::Interval {
469            months: 0,
470            days: 0,
471            micros: *a - *b,
472        })),
473        (
474            Value::Interval {
475                months: am,
476                days: ad,
477                micros: au,
478            },
479            BinOp::Add,
480            Value::Interval {
481                months: bm,
482                days: bd,
483                micros: bu,
484            },
485        ) => Some(Ok(Value::Interval {
486            months: am.saturating_add(*bm),
487            days: ad.saturating_add(*bd),
488            micros: au.saturating_add(*bu),
489        })),
490        (
491            Value::Interval {
492                months: am,
493                days: ad,
494                micros: au,
495            },
496            BinOp::Sub,
497            Value::Interval {
498                months: bm,
499                days: bd,
500                micros: bu,
501            },
502        ) => Some(Ok(Value::Interval {
503            months: am.saturating_sub(*bm),
504            days: ad.saturating_sub(*bd),
505            micros: au.saturating_sub(*bu),
506        })),
507        (
508            Value::Interval {
509                months,
510                days,
511                micros,
512            },
513            BinOp::Mul,
514            Value::Integer(n),
515        )
516        | (
517            Value::Integer(n),
518            BinOp::Mul,
519            Value::Interval {
520                months,
521                days,
522                micros,
523            },
524        ) => {
525            let n32 = (*n).clamp(i32::MIN as i64, i32::MAX as i64) as i32;
526            Some(Ok(Value::Interval {
527                months: months.saturating_mul(n32),
528                days: days.saturating_mul(n32),
529                micros: micros.saturating_mul(*n),
530            }))
531        }
532        // INTERVAL * REAL — fractional months → days, fractional days → micros (PG).
533        (
534            Value::Interval {
535                months,
536                days,
537                micros,
538            },
539            BinOp::Mul,
540            Value::Real(r),
541        )
542        | (
543            Value::Real(r),
544            BinOp::Mul,
545            Value::Interval {
546                months,
547                days,
548                micros,
549            },
550        ) => Some(Ok(scale_interval_by_real(*months, *days, *micros, *r))),
551        (
552            Value::Interval {
553                months,
554                days,
555                micros,
556            },
557            BinOp::Div,
558            Value::Integer(n),
559        ) if *n != 0 => Some(Ok(Value::Interval {
560            months: (*months as i64 / *n) as i32,
561            days: (*days as i64 / *n) as i32,
562            micros: *micros / *n,
563        })),
564        (
565            Value::Interval {
566                months,
567                days,
568                micros,
569            },
570            BinOp::Div,
571            Value::Real(r),
572        ) if *r != 0.0 => Some(Ok(scale_interval_by_real(*months, *days, *micros, 1.0 / r))),
573        // PG-normalized INTERVAL compare: 30-day month, 24-hour day.
574        (
575            Value::Interval {
576                months: am,
577                days: ad,
578                micros: au,
579            },
580            op,
581            Value::Interval {
582                months: bm,
583                days: bd,
584                micros: bu,
585            },
586        ) if matches!(
587            op,
588            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::Gt | BinOp::LtEq | BinOp::GtEq
589        ) =>
590        {
591            let ord = dt::pg_normalized_interval_cmp((*am, *ad, *au), (*bm, *bd, *bu));
592            let b = match op {
593                BinOp::Eq => ord == Ordering::Equal,
594                BinOp::NotEq => ord != Ordering::Equal,
595                BinOp::Lt => ord == Ordering::Less,
596                BinOp::Gt => ord == Ordering::Greater,
597                BinOp::LtEq => ord != Ordering::Greater,
598                BinOp::GtEq => ord != Ordering::Less,
599                _ => unreachable!(),
600            };
601            Some(Ok(Value::Boolean(b)))
602        }
603        // PG rejects TIMESTAMP ± INTEGER; require CAST to INTERVAL.
604        (Value::Timestamp(_), BinOp::Add | BinOp::Sub, Value::Integer(_))
605        | (Value::Integer(_), BinOp::Add, Value::Timestamp(_)) => {
606            Some(Err(SqlError::TypeMismatch {
607                expected: "INTERVAL (use CAST or explicit unit)".into(),
608                got: format!("{} and {}", left.data_type(), right.data_type()),
609            }))
610        }
611        _ => None,
612    }
613}
614
615/// PG fractional-propagation: month frac → days (×30), day frac → micros (×86.4G).
616fn scale_interval_by_real(months: i32, days: i32, micros: i64, factor: f64) -> Value {
617    let raw_months = months as f64 * factor;
618    let whole_months = raw_months.trunc() as i64;
619    let frac_months = raw_months - whole_months as f64;
620    let months_frac_as_days = frac_months * 30.0;
621
622    let raw_days = days as f64 * factor + months_frac_as_days;
623    let whole_days = raw_days.trunc() as i64;
624    let frac_days = raw_days - whole_days as f64;
625    let days_frac_as_micros = (frac_days * crate::datetime::MICROS_PER_DAY as f64).round() as i64;
626
627    let raw_micros = (micros as f64 * factor).round() as i64;
628    let total_micros = raw_micros.saturating_add(days_frac_as_micros);
629
630    let clamp_i32 = |n: i64| n.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
631    Value::Interval {
632        months: clamp_i32(whole_months),
633        days: clamp_i32(whole_days),
634        micros: total_micros,
635    }
636}
637
638/// SQL three-valued AND: NULL AND false = false, NULL AND true = NULL
639fn eval_and(left: &Value, right: &Value) -> Result<Value> {
640    let l = to_bool_or_null(left)?;
641    let r = to_bool_or_null(right)?;
642    match (l, r) {
643        (Some(false), _) | (_, Some(false)) => Ok(Value::Boolean(false)),
644        (Some(true), Some(true)) => Ok(Value::Boolean(true)),
645        _ => Ok(Value::Null),
646    }
647}
648
649/// SQL three-valued OR: NULL OR true = true, NULL OR false = NULL
650fn eval_or(left: &Value, right: &Value) -> Result<Value> {
651    let l = to_bool_or_null(left)?;
652    let r = to_bool_or_null(right)?;
653    match (l, r) {
654        (Some(true), _) | (_, Some(true)) => Ok(Value::Boolean(true)),
655        (Some(false), Some(false)) => Ok(Value::Boolean(false)),
656        _ => Ok(Value::Null),
657    }
658}
659
660fn to_bool_or_null(val: &Value) -> Result<Option<bool>> {
661    match val {
662        Value::Boolean(b) => Ok(Some(*b)),
663        Value::Null => Ok(None),
664        Value::Integer(i) => Ok(Some(*i != 0)),
665        _ => Err(SqlError::TypeMismatch {
666            expected: "BOOLEAN".into(),
667            got: format!("{}", val.data_type()),
668        }),
669    }
670}
671
672fn eval_arithmetic(
673    left: &Value,
674    right: &Value,
675    int_op: fn(i64, i64) -> Option<i64>,
676    real_op: fn(f64, f64) -> f64,
677) -> Result<Value> {
678    match (left, right) {
679        (Value::Integer(a), Value::Integer(b)) => int_op(*a, *b)
680            .map(Value::Integer)
681            .ok_or(SqlError::IntegerOverflow),
682        (Value::Real(a), Value::Real(b)) => Ok(Value::Real(real_op(*a, *b))),
683        (Value::Integer(a), Value::Real(b)) => Ok(Value::Real(real_op(*a as f64, *b))),
684        (Value::Real(a), Value::Integer(b)) => Ok(Value::Real(real_op(*a, *b as f64))),
685        _ => Err(SqlError::TypeMismatch {
686            expected: "numeric".into(),
687            got: format!("{} and {}", left.data_type(), right.data_type()),
688        }),
689    }
690}
691
692fn eval_in_values(lhs: &Value, list: &[Expr], ctx: &EvalCtx, negated: bool) -> Result<Value> {
693    if list.is_empty() {
694        return Ok(Value::Boolean(negated));
695    }
696    if lhs.is_null() {
697        return Ok(Value::Null);
698    }
699    let mut has_null = false;
700    for item in list {
701        let rhs = eval_expr(item, ctx)?;
702        if rhs.is_null() {
703            has_null = true;
704        } else if lhs == &rhs {
705            return Ok(Value::Boolean(!negated));
706        }
707    }
708    if has_null {
709        Ok(Value::Null)
710    } else {
711        Ok(Value::Boolean(negated))
712    }
713}
714
715fn eval_in_set(
716    lhs: &Value,
717    values: &std::collections::HashSet<Value>,
718    has_null: bool,
719    negated: bool,
720) -> Result<Value> {
721    if values.is_empty() && !has_null {
722        return Ok(Value::Boolean(negated));
723    }
724    if lhs.is_null() {
725        return Ok(Value::Null);
726    }
727    if values.contains(lhs) {
728        return Ok(Value::Boolean(!negated));
729    }
730    if has_null {
731        Ok(Value::Null)
732    } else {
733        Ok(Value::Boolean(negated))
734    }
735}
736
737fn eval_unary_op(op: UnaryOp, val: &Value) -> Result<Value> {
738    if val.is_null() {
739        return Ok(Value::Null);
740    }
741    match op {
742        UnaryOp::Neg => match val {
743            Value::Integer(i) => i
744                .checked_neg()
745                .map(Value::Integer)
746                .ok_or(SqlError::IntegerOverflow),
747            Value::Real(r) => Ok(Value::Real(-r)),
748            Value::Interval {
749                months,
750                days,
751                micros,
752            } => {
753                let m = months.checked_neg().ok_or(SqlError::IntegerOverflow)?;
754                let d = days.checked_neg().ok_or(SqlError::IntegerOverflow)?;
755                let u = micros.checked_neg().ok_or(SqlError::IntegerOverflow)?;
756                Ok(Value::Interval {
757                    months: m,
758                    days: d,
759                    micros: u,
760                })
761            }
762            _ => Err(SqlError::TypeMismatch {
763                expected: "numeric or INTERVAL".into(),
764                got: format!("{}", val.data_type()),
765            }),
766        },
767        UnaryOp::Not => match val {
768            Value::Boolean(b) => Ok(Value::Boolean(!b)),
769            Value::Integer(i) => Ok(Value::Boolean(*i == 0)),
770            _ => Err(SqlError::TypeMismatch {
771                expected: "BOOLEAN".into(),
772                got: format!("{}", val.data_type()),
773            }),
774        },
775    }
776}
777
778fn value_to_text(val: &Value) -> String {
779    match val {
780        Value::Text(s) => s.to_string(),
781        Value::Integer(i) => i.to_string(),
782        Value::Real(r) => {
783            if r.fract() == 0.0 && r.is_finite() {
784                format!("{r:.1}")
785            } else {
786                format!("{r}")
787            }
788        }
789        Value::Boolean(b) => if *b { "TRUE" } else { "FALSE" }.into(),
790        Value::Null => String::new(),
791        Value::Blob(b) => {
792            let mut s = String::with_capacity(b.len() * 2);
793            for byte in b {
794                s.push_str(&format!("{byte:02X}"));
795            }
796            s
797        }
798        Value::Date(d) => crate::datetime::format_date(*d),
799        Value::Time(t) => crate::datetime::format_time(*t),
800        Value::Timestamp(t) => crate::datetime::format_timestamp(*t),
801        Value::Interval {
802            months,
803            days,
804            micros,
805        } => crate::datetime::format_interval(*months, *days, *micros),
806    }
807}
808
809fn eval_between(val: &Value, low: &Value, high: &Value, negated: bool) -> Result<Value> {
810    if val.is_null() || low.is_null() || high.is_null() {
811        let ge = if val.is_null() || low.is_null() {
812            None
813        } else {
814            Some(*val >= *low)
815        };
816        let le = if val.is_null() || high.is_null() {
817            None
818        } else {
819            Some(*val <= *high)
820        };
821
822        let result = match (ge, le) {
823            (Some(false), _) | (_, Some(false)) => Some(false),
824            (Some(true), Some(true)) => Some(true),
825            _ => None,
826        };
827
828        return match result {
829            Some(b) => Ok(Value::Boolean(if negated { !b } else { b })),
830            None => Ok(Value::Null),
831        };
832    }
833
834    let in_range = *val >= *low && *val <= *high;
835    Ok(Value::Boolean(if negated { !in_range } else { in_range }))
836}
837
838const MAX_LIKE_PATTERN_LEN: usize = 10_000;
839
840fn eval_like(val: &Value, pattern: &Value, escape: Option<&Value>, negated: bool) -> Result<Value> {
841    if val.is_null() || pattern.is_null() {
842        return Ok(Value::Null);
843    }
844    let text = match val {
845        Value::Text(s) => s.as_str(),
846        _ => {
847            return Err(SqlError::TypeMismatch {
848                expected: "TEXT".into(),
849                got: val.data_type().to_string(),
850            })
851        }
852    };
853    let pat = match pattern {
854        Value::Text(s) => s.as_str(),
855        _ => {
856            return Err(SqlError::TypeMismatch {
857                expected: "TEXT".into(),
858                got: pattern.data_type().to_string(),
859            })
860        }
861    };
862
863    if pat.len() > MAX_LIKE_PATTERN_LEN {
864        return Err(SqlError::InvalidValue(format!(
865            "LIKE pattern too long ({} chars, max {MAX_LIKE_PATTERN_LEN})",
866            pat.len()
867        )));
868    }
869
870    let esc_char = match escape {
871        Some(Value::Text(s)) => {
872            let mut chars = s.chars();
873            let c = chars.next().ok_or_else(|| {
874                SqlError::InvalidValue("ESCAPE must be a single character".into())
875            })?;
876            if chars.next().is_some() {
877                return Err(SqlError::InvalidValue(
878                    "ESCAPE must be a single character".into(),
879                ));
880            }
881            Some(c)
882        }
883        Some(Value::Null) => return Ok(Value::Null),
884        Some(_) => {
885            return Err(SqlError::TypeMismatch {
886                expected: "TEXT".into(),
887                got: "non-text".into(),
888            })
889        }
890        None => None,
891    };
892
893    let matched = like_match(text, pat, esc_char);
894    Ok(Value::Boolean(if negated { !matched } else { matched }))
895}
896
897fn like_match(text: &str, pattern: &str, escape: Option<char>) -> bool {
898    let t: Vec<char> = text.chars().collect();
899    let p: Vec<char> = pattern.chars().collect();
900    like_match_impl(&t, &p, 0, 0, escape)
901}
902
903fn like_match_impl(
904    t: &[char],
905    p: &[char],
906    mut ti: usize,
907    mut pi: usize,
908    esc: Option<char>,
909) -> bool {
910    let mut star_pi: Option<usize> = None;
911    let mut star_ti: usize = 0;
912
913    while ti < t.len() {
914        if pi < p.len() {
915            if let Some(ec) = esc {
916                if p[pi] == ec && pi + 1 < p.len() {
917                    pi += 1;
918                    let pc_lower = p[pi].to_ascii_lowercase();
919                    let tc_lower = t[ti].to_ascii_lowercase();
920                    if pc_lower == tc_lower {
921                        pi += 1;
922                        ti += 1;
923                        continue;
924                    } else if let Some(sp) = star_pi {
925                        pi = sp + 1;
926                        star_ti += 1;
927                        ti = star_ti;
928                        continue;
929                    } else {
930                        return false;
931                    }
932                }
933            }
934            if p[pi] == '%' {
935                star_pi = Some(pi);
936                star_ti = ti;
937                pi += 1;
938                continue;
939            }
940            if p[pi] == '_' {
941                pi += 1;
942                ti += 1;
943                continue;
944            }
945            if p[pi].eq_ignore_ascii_case(&t[ti]) {
946                pi += 1;
947                ti += 1;
948                continue;
949            }
950        }
951        if let Some(sp) = star_pi {
952            pi = sp + 1;
953            star_ti += 1;
954            ti = star_ti;
955        } else {
956            return false;
957        }
958    }
959
960    while pi < p.len() && p[pi] == '%' {
961        pi += 1;
962    }
963    pi == p.len()
964}
965
966fn eval_case(
967    operand: Option<&Expr>,
968    conditions: &[(Expr, Expr)],
969    else_result: Option<&Expr>,
970    ctx: &EvalCtx,
971) -> Result<Value> {
972    if let Some(op_expr) = operand {
973        let op_val = eval_expr(op_expr, ctx)?;
974        for (cond, result) in conditions {
975            let cond_val = eval_expr(cond, ctx)?;
976            if !op_val.is_null() && !cond_val.is_null() && op_val == cond_val {
977                return eval_expr(result, ctx);
978            }
979        }
980    } else {
981        for (cond, result) in conditions {
982            let cond_val = eval_expr(cond, ctx)?;
983            if is_truthy(&cond_val) {
984                return eval_expr(result, ctx);
985            }
986        }
987    }
988    match else_result {
989        Some(e) => eval_expr(e, ctx),
990        None => Ok(Value::Null),
991    }
992}
993
994fn eval_cast(val: &Value, target: DataType) -> Result<Value> {
995    if val.is_null() {
996        return Ok(Value::Null);
997    }
998    match target {
999        DataType::Integer => match val {
1000            Value::Integer(_) => Ok(val.clone()),
1001            Value::Real(r) => Ok(Value::Integer(*r as i64)),
1002            Value::Boolean(b) => Ok(Value::Integer(if *b { 1 } else { 0 })),
1003            Value::Text(s) => s
1004                .trim()
1005                .parse::<i64>()
1006                .map(Value::Integer)
1007                .or_else(|_| s.trim().parse::<f64>().map(|f| Value::Integer(f as i64)))
1008                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to INTEGER"))),
1009            _ => Err(SqlError::InvalidValue(format!(
1010                "cannot cast {} to INTEGER",
1011                val.data_type()
1012            ))),
1013        },
1014        DataType::Real => match val {
1015            Value::Real(_) => Ok(val.clone()),
1016            Value::Integer(i) => Ok(Value::Real(*i as f64)),
1017            Value::Boolean(b) => Ok(Value::Real(if *b { 1.0 } else { 0.0 })),
1018            Value::Text(s) => s
1019                .trim()
1020                .parse::<f64>()
1021                .map(Value::Real)
1022                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to REAL"))),
1023            _ => Err(SqlError::InvalidValue(format!(
1024                "cannot cast {} to REAL",
1025                val.data_type()
1026            ))),
1027        },
1028        DataType::Text => Ok(Value::Text(value_to_text(val).into())),
1029        DataType::Boolean => match val {
1030            Value::Boolean(_) => Ok(val.clone()),
1031            Value::Integer(i) => Ok(Value::Boolean(*i != 0)),
1032            Value::Text(s) => {
1033                let lower = s.trim().to_ascii_lowercase();
1034                match lower.as_str() {
1035                    "true" | "1" | "yes" | "on" => Ok(Value::Boolean(true)),
1036                    "false" | "0" | "no" | "off" => Ok(Value::Boolean(false)),
1037                    _ => Err(SqlError::InvalidValue(format!(
1038                        "cannot cast '{s}' to BOOLEAN"
1039                    ))),
1040                }
1041            }
1042            _ => Err(SqlError::InvalidValue(format!(
1043                "cannot cast {} to BOOLEAN",
1044                val.data_type()
1045            ))),
1046        },
1047        DataType::Blob => match val {
1048            Value::Blob(_) => Ok(val.clone()),
1049            Value::Text(s) => Ok(Value::Blob(s.as_bytes().to_vec())),
1050            _ => Err(SqlError::InvalidValue(format!(
1051                "cannot cast {} to BLOB",
1052                val.data_type()
1053            ))),
1054        },
1055        DataType::Null => Ok(Value::Null),
1056        DataType::Date => val.clone().coerce_into(DataType::Date).ok_or_else(|| {
1057            SqlError::InvalidValue(format!("cannot cast {} to DATE", val.data_type()))
1058        }),
1059        DataType::Time => val.clone().coerce_into(DataType::Time).ok_or_else(|| {
1060            SqlError::InvalidValue(format!("cannot cast {} to TIME", val.data_type()))
1061        }),
1062        DataType::Timestamp => val.clone().coerce_into(DataType::Timestamp).ok_or_else(|| {
1063            SqlError::InvalidValue(format!("cannot cast {} to TIMESTAMP", val.data_type()))
1064        }),
1065        DataType::Interval => val.clone().coerce_into(DataType::Interval).ok_or_else(|| {
1066            SqlError::InvalidValue(format!("cannot cast {} to INTERVAL", val.data_type()))
1067        }),
1068    }
1069}
1070
1071fn eval_scalar_function(name: &str, args: &[Expr], ctx: &EvalCtx) -> Result<Value> {
1072    let evaluated: Vec<Value> = args
1073        .iter()
1074        .map(|a| eval_expr(a, ctx))
1075        .collect::<Result<Vec<_>>>()?;
1076
1077    match name {
1078        "LENGTH" => {
1079            check_args(name, &evaluated, 1)?;
1080            match &evaluated[0] {
1081                Value::Null => Ok(Value::Null),
1082                Value::Text(s) => Ok(Value::Integer(s.chars().count() as i64)),
1083                Value::Blob(b) => Ok(Value::Integer(b.len() as i64)),
1084                _ => Ok(Value::Integer(
1085                    value_to_text(&evaluated[0]).chars().count() as i64
1086                )),
1087            }
1088        }
1089        "UPPER" => {
1090            check_args(name, &evaluated, 1)?;
1091            match &evaluated[0] {
1092                Value::Null => Ok(Value::Null),
1093                Value::Text(s) => Ok(Value::Text(s.to_ascii_uppercase())),
1094                _ => Ok(Value::Text(
1095                    value_to_text(&evaluated[0]).to_ascii_uppercase().into(),
1096                )),
1097            }
1098        }
1099        "LOWER" => {
1100            check_args(name, &evaluated, 1)?;
1101            match &evaluated[0] {
1102                Value::Null => Ok(Value::Null),
1103                Value::Text(s) => Ok(Value::Text(s.to_ascii_lowercase())),
1104                _ => Ok(Value::Text(
1105                    value_to_text(&evaluated[0]).to_ascii_lowercase().into(),
1106                )),
1107            }
1108        }
1109        "SUBSTR" | "SUBSTRING" => {
1110            if evaluated.len() < 2 || evaluated.len() > 3 {
1111                return Err(SqlError::InvalidValue(format!(
1112                    "{name} requires 2 or 3 arguments"
1113                )));
1114            }
1115            if evaluated.iter().any(|v| v.is_null()) {
1116                return Ok(Value::Null);
1117            }
1118            let s = value_to_text(&evaluated[0]);
1119            let chars: Vec<char> = s.chars().collect();
1120            let start = match &evaluated[1] {
1121                Value::Integer(i) => *i,
1122                _ => {
1123                    return Err(SqlError::TypeMismatch {
1124                        expected: "INTEGER".into(),
1125                        got: evaluated[1].data_type().to_string(),
1126                    })
1127                }
1128            };
1129            let len = chars.len() as i64;
1130
1131            let (begin, count) = if evaluated.len() == 3 {
1132                let cnt = match &evaluated[2] {
1133                    Value::Integer(i) => *i,
1134                    _ => {
1135                        return Err(SqlError::TypeMismatch {
1136                            expected: "INTEGER".into(),
1137                            got: evaluated[2].data_type().to_string(),
1138                        })
1139                    }
1140                };
1141                if start >= 1 {
1142                    let b = (start - 1).min(len) as usize;
1143                    let c = cnt.max(0) as usize;
1144                    (b, c)
1145                } else if start == 0 {
1146                    let c = (cnt - 1).max(0) as usize;
1147                    (0usize, c)
1148                } else {
1149                    let adjusted_cnt = (cnt + start - 1).max(0) as usize;
1150                    (0usize, adjusted_cnt)
1151                }
1152            } else if start >= 1 {
1153                let b = (start - 1).min(len) as usize;
1154                (b, chars.len() - b)
1155            } else if start == 0 {
1156                (0usize, chars.len())
1157            } else {
1158                let b = (len + start).max(0) as usize;
1159                (b, chars.len() - b)
1160            };
1161
1162            let result: String = chars.iter().skip(begin).take(count).collect();
1163            Ok(Value::Text(result.into()))
1164        }
1165        "TRIM" | "LTRIM" | "RTRIM" => {
1166            if evaluated.is_empty() || evaluated.len() > 2 {
1167                return Err(SqlError::InvalidValue(format!(
1168                    "{name} requires 1 or 2 arguments"
1169                )));
1170            }
1171            if evaluated[0].is_null() {
1172                return Ok(Value::Null);
1173            }
1174            let s = value_to_text(&evaluated[0]);
1175            let trim_chars: Vec<char> = if evaluated.len() == 2 {
1176                if evaluated[1].is_null() {
1177                    return Ok(Value::Null);
1178                }
1179                value_to_text(&evaluated[1]).chars().collect()
1180            } else {
1181                vec![' ']
1182            };
1183            let result = match name {
1184                "TRIM" => s
1185                    .trim_matches(|c: char| trim_chars.contains(&c))
1186                    .to_string(),
1187                "LTRIM" => s
1188                    .trim_start_matches(|c: char| trim_chars.contains(&c))
1189                    .to_string(),
1190                "RTRIM" => s
1191                    .trim_end_matches(|c: char| trim_chars.contains(&c))
1192                    .to_string(),
1193                _ => unreachable!(),
1194            };
1195            Ok(Value::Text(result.into()))
1196        }
1197        "REPLACE" => {
1198            check_args(name, &evaluated, 3)?;
1199            if evaluated.iter().any(|v| v.is_null()) {
1200                return Ok(Value::Null);
1201            }
1202            let s = value_to_text(&evaluated[0]);
1203            let from = value_to_text(&evaluated[1]);
1204            let to = value_to_text(&evaluated[2]);
1205            if from.is_empty() {
1206                return Ok(Value::Text(s.into()));
1207            }
1208            Ok(Value::Text(s.replace(&from, &to).into()))
1209        }
1210        "INSTR" => {
1211            check_args(name, &evaluated, 2)?;
1212            if evaluated.iter().any(|v| v.is_null()) {
1213                return Ok(Value::Null);
1214            }
1215            let haystack = value_to_text(&evaluated[0]);
1216            let needle = value_to_text(&evaluated[1]);
1217            let pos = haystack
1218                .find(&needle)
1219                .map(|i| haystack[..i].chars().count() as i64 + 1)
1220                .unwrap_or(0);
1221            Ok(Value::Integer(pos))
1222        }
1223        "CONCAT" => {
1224            if evaluated.is_empty() {
1225                return Ok(Value::Text(CompactString::default()));
1226            }
1227            let mut result = String::new();
1228            for v in &evaluated {
1229                match v {
1230                    Value::Null => {}
1231                    _ => result.push_str(&value_to_text(v)),
1232                }
1233            }
1234            Ok(Value::Text(result.into()))
1235        }
1236        "ABS" => {
1237            check_args(name, &evaluated, 1)?;
1238            match &evaluated[0] {
1239                Value::Null => Ok(Value::Null),
1240                Value::Integer(i) => i
1241                    .checked_abs()
1242                    .map(Value::Integer)
1243                    .ok_or(SqlError::IntegerOverflow),
1244                Value::Real(r) => Ok(Value::Real(r.abs())),
1245                _ => Err(SqlError::TypeMismatch {
1246                    expected: "numeric".into(),
1247                    got: evaluated[0].data_type().to_string(),
1248                }),
1249            }
1250        }
1251        "ROUND" => {
1252            if evaluated.is_empty() || evaluated.len() > 2 {
1253                return Err(SqlError::InvalidValue(
1254                    "ROUND requires 1 or 2 arguments".into(),
1255                ));
1256            }
1257            if evaluated[0].is_null() {
1258                return Ok(Value::Null);
1259            }
1260            let val = match &evaluated[0] {
1261                Value::Integer(i) => *i as f64,
1262                Value::Real(r) => *r,
1263                _ => {
1264                    return Err(SqlError::TypeMismatch {
1265                        expected: "numeric".into(),
1266                        got: evaluated[0].data_type().to_string(),
1267                    })
1268                }
1269            };
1270            let places = if evaluated.len() == 2 {
1271                match &evaluated[1] {
1272                    Value::Null => return Ok(Value::Null),
1273                    Value::Integer(i) => *i,
1274                    _ => {
1275                        return Err(SqlError::TypeMismatch {
1276                            expected: "INTEGER".into(),
1277                            got: evaluated[1].data_type().to_string(),
1278                        })
1279                    }
1280                }
1281            } else {
1282                0
1283            };
1284            let factor = 10f64.powi(places as i32);
1285            let rounded = (val * factor).round() / factor;
1286            Ok(Value::Real(rounded))
1287        }
1288        "CEIL" | "CEILING" => {
1289            check_args(name, &evaluated, 1)?;
1290            match &evaluated[0] {
1291                Value::Null => Ok(Value::Null),
1292                Value::Integer(i) => Ok(Value::Integer(*i)),
1293                Value::Real(r) => Ok(Value::Integer(r.ceil() as i64)),
1294                _ => Err(SqlError::TypeMismatch {
1295                    expected: "numeric".into(),
1296                    got: evaluated[0].data_type().to_string(),
1297                }),
1298            }
1299        }
1300        "FLOOR" => {
1301            check_args(name, &evaluated, 1)?;
1302            match &evaluated[0] {
1303                Value::Null => Ok(Value::Null),
1304                Value::Integer(i) => Ok(Value::Integer(*i)),
1305                Value::Real(r) => Ok(Value::Integer(r.floor() as i64)),
1306                _ => Err(SqlError::TypeMismatch {
1307                    expected: "numeric".into(),
1308                    got: evaluated[0].data_type().to_string(),
1309                }),
1310            }
1311        }
1312        "SIGN" => {
1313            check_args(name, &evaluated, 1)?;
1314            match &evaluated[0] {
1315                Value::Null => Ok(Value::Null),
1316                Value::Integer(i) => Ok(Value::Integer(i.signum())),
1317                Value::Real(r) => {
1318                    if *r > 0.0 {
1319                        Ok(Value::Integer(1))
1320                    } else if *r < 0.0 {
1321                        Ok(Value::Integer(-1))
1322                    } else {
1323                        Ok(Value::Integer(0))
1324                    }
1325                }
1326                _ => Err(SqlError::TypeMismatch {
1327                    expected: "numeric".into(),
1328                    got: evaluated[0].data_type().to_string(),
1329                }),
1330            }
1331        }
1332        "SQRT" => {
1333            check_args(name, &evaluated, 1)?;
1334            match &evaluated[0] {
1335                Value::Null => Ok(Value::Null),
1336                Value::Integer(i) => {
1337                    if *i < 0 {
1338                        Ok(Value::Null)
1339                    } else {
1340                        Ok(Value::Real((*i as f64).sqrt()))
1341                    }
1342                }
1343                Value::Real(r) => {
1344                    if *r < 0.0 {
1345                        Ok(Value::Null)
1346                    } else {
1347                        Ok(Value::Real(r.sqrt()))
1348                    }
1349                }
1350                _ => Err(SqlError::TypeMismatch {
1351                    expected: "numeric".into(),
1352                    got: evaluated[0].data_type().to_string(),
1353                }),
1354            }
1355        }
1356        "RANDOM" => {
1357            check_args(name, &evaluated, 0)?;
1358            use std::collections::hash_map::DefaultHasher;
1359            use std::hash::{Hash, Hasher};
1360            use std::time::SystemTime;
1361            let mut hasher = DefaultHasher::new();
1362            SystemTime::now().hash(&mut hasher);
1363            std::thread::current().id().hash(&mut hasher);
1364            let mut val = hasher.finish() as i64;
1365            if val == i64::MIN {
1366                val = i64::MAX;
1367            }
1368            Ok(Value::Integer(val))
1369        }
1370        "TYPEOF" => {
1371            check_args(name, &evaluated, 1)?;
1372            let type_name = match &evaluated[0] {
1373                Value::Null => "null",
1374                Value::Integer(_) => "integer",
1375                Value::Real(_) => "real",
1376                Value::Text(_) => "text",
1377                Value::Blob(_) => "blob",
1378                Value::Boolean(_) => "boolean",
1379                Value::Date(_) => "date",
1380                Value::Time(_) => "time",
1381                Value::Timestamp(_) => "timestamp",
1382                Value::Interval { .. } => "interval",
1383            };
1384            Ok(Value::Text(type_name.into()))
1385        }
1386        "MIN" => {
1387            check_args(name, &evaluated, 2)?;
1388            if evaluated[0].is_null() {
1389                return Ok(evaluated[1].clone());
1390            }
1391            if evaluated[1].is_null() {
1392                return Ok(evaluated[0].clone());
1393            }
1394            if evaluated[0] <= evaluated[1] {
1395                Ok(evaluated[0].clone())
1396            } else {
1397                Ok(evaluated[1].clone())
1398            }
1399        }
1400        "MAX" => {
1401            check_args(name, &evaluated, 2)?;
1402            if evaluated[0].is_null() {
1403                return Ok(evaluated[1].clone());
1404            }
1405            if evaluated[1].is_null() {
1406                return Ok(evaluated[0].clone());
1407            }
1408            if evaluated[0] >= evaluated[1] {
1409                Ok(evaluated[0].clone())
1410            } else {
1411                Ok(evaluated[1].clone())
1412            }
1413        }
1414        "HEX" => {
1415            check_args(name, &evaluated, 1)?;
1416            match &evaluated[0] {
1417                Value::Null => Ok(Value::Null),
1418                Value::Blob(b) => {
1419                    let mut s = String::with_capacity(b.len() * 2);
1420                    for byte in b {
1421                        s.push_str(&format!("{byte:02X}"));
1422                    }
1423                    Ok(Value::Text(s.into()))
1424                }
1425                Value::Text(s) => {
1426                    let mut r = String::with_capacity(s.len() * 2);
1427                    for byte in s.as_bytes() {
1428                        r.push_str(&format!("{byte:02X}"));
1429                    }
1430                    Ok(Value::Text(r.into()))
1431                }
1432                _ => Ok(Value::Text(value_to_text(&evaluated[0]).into())),
1433            }
1434        }
1435        "NOW" | "CURRENT_TIMESTAMP" | "LOCALTIMESTAMP" => {
1436            check_args(name, &evaluated, 0)?;
1437            Ok(Value::Timestamp(crate::datetime::txn_or_clock_micros()))
1438        }
1439        "CURRENT_DATE" => {
1440            check_args(name, &evaluated, 0)?;
1441            Ok(Value::Date(crate::datetime::ts_to_date_floor(
1442                crate::datetime::txn_or_clock_micros(),
1443            )))
1444        }
1445        "CURRENT_TIME" | "LOCALTIME" => {
1446            check_args(name, &evaluated, 0)?;
1447            Ok(Value::Time(
1448                crate::datetime::ts_split(crate::datetime::txn_or_clock_micros()).1,
1449            ))
1450        }
1451        "CLOCK_TIMESTAMP" | "STATEMENT_TIMESTAMP" | "TRANSACTION_TIMESTAMP" => {
1452            check_args(name, &evaluated, 0)?;
1453            let ts = match name {
1454                "CLOCK_TIMESTAMP" => crate::datetime::now_micros(),
1455                _ => crate::datetime::txn_or_clock_micros(),
1456            };
1457            Ok(Value::Timestamp(ts))
1458        }
1459        "EXTRACT" | "DATE_PART" | "DATEPART" => {
1460            check_args(name, &evaluated, 2)?;
1461            // Borrow the field str without allocating; datetime::extract accepts &str.
1462            let field: &str = match &evaluated[0] {
1463                Value::Null => return Ok(Value::Null),
1464                Value::Text(s) => s.as_str(),
1465                _ => {
1466                    return Err(SqlError::TypeMismatch {
1467                        expected: "TEXT field name".into(),
1468                        got: evaluated[0].data_type().to_string(),
1469                    })
1470                }
1471            };
1472            if evaluated[1].is_null() {
1473                return Ok(Value::Null);
1474            }
1475            crate::datetime::extract(field, &evaluated[1])
1476        }
1477        "DATE_TRUNC" => {
1478            if evaluated.len() < 2 || evaluated.len() > 3 {
1479                return Err(SqlError::InvalidValue(
1480                    "DATE_TRUNC requires 2 or 3 arguments".into(),
1481                ));
1482            }
1483            let unit = match &evaluated[0] {
1484                Value::Null => return Ok(Value::Null),
1485                Value::Text(s) => s.to_string(),
1486                _ => {
1487                    return Err(SqlError::TypeMismatch {
1488                        expected: "TEXT unit name".into(),
1489                        got: evaluated[0].data_type().to_string(),
1490                    })
1491                }
1492            };
1493            if evaluated[1].is_null() {
1494                return Ok(Value::Null);
1495            }
1496            // Optional tz arg: truncate in that zone, then convert back to UTC.
1497            if evaluated.len() == 3 {
1498                if let Value::Text(tz) = &evaluated[2] {
1499                    if !tz.eq_ignore_ascii_case("UTC") {
1500                        if let Value::Timestamp(ts) = &evaluated[1] {
1501                            return date_trunc_in_zone(&unit, *ts, tz);
1502                        }
1503                    }
1504                }
1505            }
1506            crate::datetime::date_trunc(&unit, &evaluated[1])
1507        }
1508        "DATE_BIN" => {
1509            check_args(name, &evaluated, 3)?;
1510            if evaluated.iter().any(|v| v.is_null()) {
1511                return Ok(Value::Null);
1512            }
1513            let stride = match &evaluated[0] {
1514                Value::Interval {
1515                    months: _,
1516                    days,
1517                    micros,
1518                } => *days as i64 * crate::datetime::MICROS_PER_DAY + *micros,
1519                _ => {
1520                    return Err(SqlError::TypeMismatch {
1521                        expected: "INTERVAL stride".into(),
1522                        got: evaluated[0].data_type().to_string(),
1523                    })
1524                }
1525            };
1526            if stride <= 0 {
1527                return Err(SqlError::InvalidValue(
1528                    "DATE_BIN stride must be positive".into(),
1529                ));
1530            }
1531            let (src, origin) = match (&evaluated[1], &evaluated[2]) {
1532                (Value::Timestamp(s), Value::Timestamp(o)) => (*s, *o),
1533                _ => {
1534                    return Err(SqlError::TypeMismatch {
1535                        expected: "TIMESTAMP, TIMESTAMP".into(),
1536                        got: format!("{}, {}", evaluated[1].data_type(), evaluated[2].data_type()),
1537                    })
1538                }
1539            };
1540            let diff = src - origin;
1541            let binned = origin + (diff.div_euclid(stride)) * stride;
1542            Ok(Value::Timestamp(binned))
1543        }
1544        "AGE" => {
1545            if evaluated.len() == 1 {
1546                if evaluated[0].is_null() {
1547                    return Ok(Value::Null);
1548                }
1549                let ts = match &evaluated[0] {
1550                    Value::Timestamp(t) => *t,
1551                    Value::Date(d) => crate::datetime::date_to_ts(*d),
1552                    _ => {
1553                        return Err(SqlError::TypeMismatch {
1554                            expected: "TIMESTAMP or DATE".into(),
1555                            got: evaluated[0].data_type().to_string(),
1556                        })
1557                    }
1558                };
1559                // Implicit reference: today at midnight UTC.
1560                let today = crate::datetime::today_days();
1561                let midnight = crate::datetime::date_to_ts(today);
1562                let (m, d, u) = crate::datetime::age(midnight, ts)?;
1563                return Ok(Value::Interval {
1564                    months: m,
1565                    days: d,
1566                    micros: u,
1567                });
1568            }
1569            check_args(name, &evaluated, 2)?;
1570            if evaluated.iter().any(|v| v.is_null()) {
1571                return Ok(Value::Null);
1572            }
1573            let a = ts_of(&evaluated[0])?;
1574            let b = ts_of(&evaluated[1])?;
1575            let (m, d, u) = crate::datetime::age(a, b)?;
1576            Ok(Value::Interval {
1577                months: m,
1578                days: d,
1579                micros: u,
1580            })
1581        }
1582        "MAKE_DATE" => {
1583            check_args(name, &evaluated, 3)?;
1584            if evaluated.iter().any(|v| v.is_null()) {
1585                return Ok(Value::Null);
1586            }
1587            let y = int_arg(&evaluated[0], "MAKE_DATE year")? as i32;
1588            let m = int_arg(&evaluated[1], "MAKE_DATE month")? as u8;
1589            let d = int_arg(&evaluated[2], "MAKE_DATE day")? as u8;
1590            crate::datetime::ymd_to_days(y, m, d)
1591                .map(Value::Date)
1592                .ok_or_else(|| SqlError::InvalidDateLiteral(format!("make_date({y}, {m}, {d})")))
1593        }
1594        "MAKE_TIME" => {
1595            check_args(name, &evaluated, 3)?;
1596            if evaluated.iter().any(|v| v.is_null()) {
1597                return Ok(Value::Null);
1598            }
1599            let h = int_arg(&evaluated[0], "MAKE_TIME hour")? as u8;
1600            let mi = int_arg(&evaluated[1], "MAKE_TIME minute")? as u8;
1601            let (s, us) = real_sec_arg(&evaluated[2])?;
1602            crate::datetime::hmsn_to_micros(h, mi, s, us)
1603                .map(Value::Time)
1604                .ok_or_else(|| SqlError::InvalidTimeLiteral(format!("make_time({h}, {mi}, ...)")))
1605        }
1606        "MAKE_TIMESTAMP" => {
1607            check_args(name, &evaluated, 6)?;
1608            if evaluated.iter().any(|v| v.is_null()) {
1609                return Ok(Value::Null);
1610            }
1611            let y = int_arg(&evaluated[0], "MAKE_TIMESTAMP year")? as i32;
1612            let mo = int_arg(&evaluated[1], "MAKE_TIMESTAMP month")? as u8;
1613            let d = int_arg(&evaluated[2], "MAKE_TIMESTAMP day")? as u8;
1614            let h = int_arg(&evaluated[3], "MAKE_TIMESTAMP hour")? as u8;
1615            let mi = int_arg(&evaluated[4], "MAKE_TIMESTAMP min")? as u8;
1616            let (s, us) = real_sec_arg(&evaluated[5])?;
1617            let days = crate::datetime::ymd_to_days(y, mo, d).ok_or_else(|| {
1618                SqlError::InvalidTimestampLiteral(format!("make_timestamp year={y}"))
1619            })?;
1620            let tmicros = crate::datetime::hmsn_to_micros(h, mi, s, us)
1621                .ok_or_else(|| SqlError::InvalidTimestampLiteral("time out of range".into()))?;
1622            Ok(Value::Timestamp(crate::datetime::ts_combine(days, tmicros)))
1623        }
1624        "MAKE_INTERVAL" => {
1625            // Positional args: years, months, weeks, days, hours, mins, secs.
1626            if evaluated.len() > 7 {
1627                return Err(SqlError::InvalidValue(
1628                    "MAKE_INTERVAL accepts at most 7 arguments".into(),
1629                ));
1630            }
1631            let mut months: i64 = 0;
1632            let mut days: i64 = 0;
1633            let mut micros: i64 = 0;
1634            for (i, v) in evaluated.iter().enumerate() {
1635                if v.is_null() {
1636                    continue;
1637                }
1638                let n = match v {
1639                    Value::Integer(n) => *n,
1640                    Value::Real(r) => *r as i64,
1641                    _ => {
1642                        return Err(SqlError::TypeMismatch {
1643                            expected: "numeric".into(),
1644                            got: v.data_type().to_string(),
1645                        })
1646                    }
1647                };
1648                match i {
1649                    0 => months = months.saturating_add(n.saturating_mul(12)),
1650                    1 => months = months.saturating_add(n),
1651                    2 => days = days.saturating_add(n.saturating_mul(7)),
1652                    3 => days = days.saturating_add(n),
1653                    4 => {
1654                        micros = micros
1655                            .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_HOUR))
1656                    }
1657                    5 => {
1658                        micros =
1659                            micros.saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_MIN))
1660                    }
1661                    6 => {
1662                        // Seconds may be fractional — also check Real.
1663                        if let Value::Real(r) = v {
1664                            micros = micros.saturating_add(
1665                                (*r * crate::datetime::MICROS_PER_SEC as f64) as i64,
1666                            );
1667                        } else {
1668                            micros = micros
1669                                .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_SEC));
1670                        }
1671                    }
1672                    _ => unreachable!(),
1673                }
1674            }
1675            Ok(Value::Interval {
1676                months: months.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
1677                days: days.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
1678                micros,
1679            })
1680        }
1681        "JUSTIFY_DAYS" => {
1682            check_args(name, &evaluated, 1)?;
1683            match &evaluated[0] {
1684                Value::Null => Ok(Value::Null),
1685                Value::Interval {
1686                    months,
1687                    days,
1688                    micros,
1689                } => {
1690                    let (m, d, u) = crate::datetime::justify_days(*months, *days, *micros);
1691                    Ok(Value::Interval {
1692                        months: m,
1693                        days: d,
1694                        micros: u,
1695                    })
1696                }
1697                other => Err(SqlError::TypeMismatch {
1698                    expected: "INTERVAL".into(),
1699                    got: other.data_type().to_string(),
1700                }),
1701            }
1702        }
1703        "JUSTIFY_HOURS" => {
1704            check_args(name, &evaluated, 1)?;
1705            match &evaluated[0] {
1706                Value::Null => Ok(Value::Null),
1707                Value::Interval {
1708                    months,
1709                    days,
1710                    micros,
1711                } => {
1712                    let (m, d, u) = crate::datetime::justify_hours(*months, *days, *micros);
1713                    Ok(Value::Interval {
1714                        months: m,
1715                        days: d,
1716                        micros: u,
1717                    })
1718                }
1719                other => Err(SqlError::TypeMismatch {
1720                    expected: "INTERVAL".into(),
1721                    got: other.data_type().to_string(),
1722                }),
1723            }
1724        }
1725        "JUSTIFY_INTERVAL" => {
1726            check_args(name, &evaluated, 1)?;
1727            match &evaluated[0] {
1728                Value::Null => Ok(Value::Null),
1729                Value::Interval {
1730                    months,
1731                    days,
1732                    micros,
1733                } => {
1734                    let (m, d, u) = crate::datetime::justify_interval(*months, *days, *micros);
1735                    Ok(Value::Interval {
1736                        months: m,
1737                        days: d,
1738                        micros: u,
1739                    })
1740                }
1741                other => Err(SqlError::TypeMismatch {
1742                    expected: "INTERVAL".into(),
1743                    got: other.data_type().to_string(),
1744                }),
1745            }
1746        }
1747        "ISFINITE" => {
1748            check_args(name, &evaluated, 1)?;
1749            if evaluated[0].is_null() {
1750                return Ok(Value::Null);
1751            }
1752            Ok(Value::Boolean(evaluated[0].is_finite_temporal()))
1753        }
1754        "DATE" => {
1755            if evaluated.is_empty() {
1756                return Err(SqlError::InvalidValue(
1757                    "DATE requires at least 1 argument".into(),
1758                ));
1759            }
1760            if evaluated[0].is_null() {
1761                return Ok(Value::Null);
1762            }
1763            let d = match &evaluated[0] {
1764                Value::Date(d) => *d,
1765                Value::Timestamp(t) => crate::datetime::ts_to_date_floor(*t),
1766                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::today_days(),
1767                Value::Text(s) => crate::datetime::parse_date(s)?,
1768                Value::Integer(n) => {
1769                    crate::datetime::ts_to_date_floor(*n * crate::datetime::MICROS_PER_SEC)
1770                }
1771                other => {
1772                    return Err(SqlError::TypeMismatch {
1773                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
1774                        got: other.data_type().to_string(),
1775                    })
1776                }
1777            };
1778            Ok(Value::Date(d))
1779        }
1780        "TIME" => {
1781            if evaluated.is_empty() {
1782                return Err(SqlError::InvalidValue(
1783                    "TIME requires at least 1 argument".into(),
1784                ));
1785            }
1786            if evaluated[0].is_null() {
1787                return Ok(Value::Null);
1788            }
1789            let t = match &evaluated[0] {
1790                Value::Time(t) => *t,
1791                Value::Timestamp(t) => crate::datetime::ts_split(*t).1,
1792                Value::Text(s) if s.eq_ignore_ascii_case("now") => {
1793                    crate::datetime::current_time_micros()
1794                }
1795                Value::Text(s) => crate::datetime::parse_time(s)?,
1796                other => {
1797                    return Err(SqlError::TypeMismatch {
1798                        expected: "TIMESTAMP, TIME, or TEXT".into(),
1799                        got: other.data_type().to_string(),
1800                    })
1801                }
1802            };
1803            Ok(Value::Time(t))
1804        }
1805        "DATETIME" => {
1806            if evaluated.is_empty() {
1807                return Err(SqlError::InvalidValue(
1808                    "DATETIME requires at least 1 argument".into(),
1809                ));
1810            }
1811            if evaluated[0].is_null() {
1812                return Ok(Value::Null);
1813            }
1814            let t = match &evaluated[0] {
1815                Value::Timestamp(t) => *t,
1816                Value::Date(d) => crate::datetime::date_to_ts(*d),
1817                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::now_micros(),
1818                Value::Text(s) => crate::datetime::parse_timestamp(s)?,
1819                Value::Integer(n) => n * crate::datetime::MICROS_PER_SEC,
1820                other => {
1821                    return Err(SqlError::TypeMismatch {
1822                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
1823                        got: other.data_type().to_string(),
1824                    })
1825                }
1826            };
1827            Ok(Value::Timestamp(t))
1828        }
1829        "STRFTIME" => {
1830            if evaluated.len() < 2 {
1831                return Err(SqlError::InvalidValue(
1832                    "STRFTIME requires format + value".into(),
1833                ));
1834            }
1835            if evaluated.iter().take(2).any(|v| v.is_null()) {
1836                return Ok(Value::Null);
1837            }
1838            let fmt = match &evaluated[0] {
1839                Value::Text(s) => s.to_string(),
1840                _ => {
1841                    return Err(SqlError::TypeMismatch {
1842                        expected: "TEXT format".into(),
1843                        got: evaluated[0].data_type().to_string(),
1844                    })
1845                }
1846            };
1847            let out = crate::datetime::strftime(&fmt, &evaluated[1])?;
1848            Ok(Value::Text(out.into()))
1849        }
1850        "JULIANDAY" => {
1851            if evaluated.is_empty() {
1852                return Err(SqlError::InvalidValue(
1853                    "JULIANDAY requires at least 1 argument".into(),
1854                ));
1855            }
1856            if evaluated[0].is_null() {
1857                return Ok(Value::Null);
1858            }
1859            let micros = ts_of(&evaluated[0])?;
1860            let (days, tmicros) = crate::datetime::ts_split(micros);
1861            // Julian Day 2440587.5 = 1970-01-01 00:00:00 UTC (Julian days start at noon).
1862            let julian =
1863                days as f64 + 2_440_587.5 + tmicros as f64 / crate::datetime::MICROS_PER_DAY as f64;
1864            Ok(Value::Real(julian))
1865        }
1866        "UNIXEPOCH" => {
1867            if evaluated.is_empty() {
1868                return Err(SqlError::InvalidValue(
1869                    "UNIXEPOCH requires at least 1 argument".into(),
1870                ));
1871            }
1872            if evaluated[0].is_null() {
1873                return Ok(Value::Null);
1874            }
1875            let micros = ts_of(&evaluated[0])?;
1876            let subsec = evaluated
1877                .get(1)
1878                .and_then(|v| {
1879                    if let Value::Text(s) = v {
1880                        Some(s.to_string())
1881                    } else {
1882                        None
1883                    }
1884                })
1885                .map(|s| s.eq_ignore_ascii_case("subsec") || s.eq_ignore_ascii_case("subsecond"))
1886                .unwrap_or(false);
1887            if subsec {
1888                Ok(Value::Real(
1889                    micros as f64 / crate::datetime::MICROS_PER_SEC as f64,
1890                ))
1891            } else {
1892                Ok(Value::Integer(micros / crate::datetime::MICROS_PER_SEC))
1893            }
1894        }
1895        "TIMEDIFF" => {
1896            check_args(name, &evaluated, 2)?;
1897            if evaluated.iter().any(|v| v.is_null()) {
1898                return Ok(Value::Null);
1899            }
1900            let a = ts_of(&evaluated[0])?;
1901            let b = ts_of(&evaluated[1])?;
1902            let (days, micros) = crate::datetime::subtract_timestamps(a, b);
1903            let sign = if days < 0 || (days == 0 && micros < 0) {
1904                "-"
1905            } else {
1906                "+"
1907            };
1908            let abs_days = days.unsigned_abs() as i64;
1909            let abs_us = micros.unsigned_abs() as i64;
1910            // PG-compat format string: "(+|-)YYYY-MM-DD HH:MM:SS.SSS", days-only.
1911            let (h, m, s, us) = crate::datetime::micros_to_hmsn(abs_us);
1912            Ok(Value::Text(
1913                format!("{sign}{abs_days:04}-00-00 {h:02}:{m:02}:{s:02}.{us:06}").into(),
1914            ))
1915        }
1916        "AT_TIMEZONE" => {
1917            check_args(name, &evaluated, 2)?;
1918            if evaluated.iter().any(|v| v.is_null()) {
1919                return Ok(Value::Null);
1920            }
1921            let ts = match &evaluated[0] {
1922                Value::Timestamp(t) => *t,
1923                Value::Date(d) => crate::datetime::date_to_ts(*d),
1924                other => {
1925                    return Err(SqlError::TypeMismatch {
1926                        expected: "TIMESTAMP or DATE".into(),
1927                        got: other.data_type().to_string(),
1928                    })
1929                }
1930            };
1931            let zone = match &evaluated[1] {
1932                Value::Text(s) => s.to_string(),
1933                _ => {
1934                    return Err(SqlError::TypeMismatch {
1935                        expected: "TEXT time zone".into(),
1936                        got: evaluated[1].data_type().to_string(),
1937                    })
1938                }
1939            };
1940            // Reject POSIX-style 'UTC+5' (ambiguous sign convention).
1941            let upper = zone.to_ascii_uppercase();
1942            if (upper.starts_with("UTC+") || upper.starts_with("UTC-")) && zone.len() > 3 {
1943                return Err(SqlError::InvalidTimezone(format!(
1944                    "'{zone}' is ambiguous — use ISO-8601 offset like '+05:00' or named zone like 'Etc/GMT-5'"
1945                )));
1946            }
1947            let formatted = crate::datetime::format_timestamp_in_zone(ts, &zone)?;
1948            Ok(Value::Text(formatted.into()))
1949        }
1950        _ => Err(SqlError::Unsupported(format!("scalar function: {name}"))),
1951    }
1952}
1953
1954/// Extract a timestamp (µs UTC) from a Value, coercing DATE → midnight.
1955fn ts_of(v: &Value) -> Result<i64> {
1956    match v {
1957        Value::Timestamp(t) => Ok(*t),
1958        Value::Date(d) => Ok(crate::datetime::date_to_ts(*d)),
1959        _ => Err(SqlError::TypeMismatch {
1960            expected: "TIMESTAMP or DATE".into(),
1961            got: v.data_type().to_string(),
1962        }),
1963    }
1964}
1965
1966fn int_arg(v: &Value, label: &str) -> Result<i64> {
1967    match v {
1968        Value::Integer(n) => Ok(*n),
1969        _ => Err(SqlError::TypeMismatch {
1970            expected: format!("INTEGER ({label})"),
1971            got: v.data_type().to_string(),
1972        }),
1973    }
1974}
1975
1976/// Extract (whole_seconds: u8, frac_micros: u32) from a numeric argument for MAKE_TIME-style calls.
1977fn real_sec_arg(v: &Value) -> Result<(u8, u32)> {
1978    match v {
1979        Value::Integer(n) => {
1980            if !(0..=60).contains(n) {
1981                return Err(SqlError::InvalidValue(format!("second out of range: {n}")));
1982            }
1983            Ok((*n as u8, 0))
1984        }
1985        Value::Real(r) => {
1986            let whole = r.trunc() as i64;
1987            if !(0..=60).contains(&whole) {
1988                return Err(SqlError::InvalidValue(format!("second out of range: {r}")));
1989            }
1990            let frac = ((r - whole as f64) * 1_000_000.0).round() as i64;
1991            Ok((whole as u8, frac.max(0) as u32))
1992        }
1993        _ => Err(SqlError::TypeMismatch {
1994            expected: "numeric seconds".into(),
1995            got: v.data_type().to_string(),
1996        }),
1997    }
1998}
1999
2000/// DATE_TRUNC with a non-UTC IANA zone: convert → truncate in that zone → convert back to UTC.
2001fn date_trunc_in_zone(unit: &str, ts_utc: i64, tz: &str) -> Result<Value> {
2002    use jiff::{tz::TimeZone, Timestamp as JTimestamp};
2003    let zone = TimeZone::get(tz).map_err(|e| SqlError::InvalidTimezone(format!("{tz}: {e}")))?;
2004    let ts = JTimestamp::from_microsecond(ts_utc)
2005        .map_err(|e| SqlError::InvalidValue(format!("ts: {e}")))?;
2006    let zoned = ts.to_zoned(zone.clone());
2007    let unit_lower = unit.to_ascii_lowercase();
2008    let rounded = match unit_lower.as_str() {
2009        "microseconds" => return Ok(Value::Timestamp(ts_utc)),
2010        "second" => zoned
2011            .start_of_day()
2012            .map_err(|e| SqlError::InvalidValue(format!("{e}")))?,
2013        _ => {
2014            let naive_ts = zoned.timestamp().as_microsecond();
2015            return crate::datetime::date_trunc(unit, &Value::Timestamp(naive_ts));
2016        }
2017    };
2018    Ok(Value::Timestamp(rounded.timestamp().as_microsecond()))
2019}
2020
2021fn check_args(name: &str, args: &[Value], expected: usize) -> Result<()> {
2022    if args.len() != expected {
2023        Err(SqlError::InvalidValue(format!(
2024            "{name} requires {expected} argument(s), got {}",
2025            args.len()
2026        )))
2027    } else {
2028        Ok(())
2029    }
2030}
2031
2032pub fn referenced_columns(expr: &Expr, columns: &[ColumnDef]) -> Vec<usize> {
2033    let mut indices = Vec::new();
2034    collect_column_refs(expr, columns, &mut indices);
2035    indices.sort_unstable();
2036    indices.dedup();
2037    indices
2038}
2039
2040fn collect_column_refs(expr: &Expr, columns: &[ColumnDef], out: &mut Vec<usize>) {
2041    match expr {
2042        Expr::Column(name) => {
2043            for (i, c) in columns.iter().enumerate() {
2044                if c.name == *name || c.name.ends_with(&format!(".{name}")) {
2045                    out.push(i);
2046                    break;
2047                }
2048            }
2049        }
2050        Expr::QualifiedColumn { table, column } => {
2051            let qualified = format!("{table}.{column}");
2052            if let Some(idx) = columns.iter().position(|c| c.name == qualified) {
2053                out.push(idx);
2054            } else {
2055                let matches: Vec<usize> = columns
2056                    .iter()
2057                    .enumerate()
2058                    .filter(|(_, c)| c.name == *column)
2059                    .map(|(i, _)| i)
2060                    .collect();
2061                if matches.len() == 1 {
2062                    out.push(matches[0]);
2063                }
2064            }
2065        }
2066        Expr::BinaryOp { left, right, .. } => {
2067            collect_column_refs(left, columns, out);
2068            collect_column_refs(right, columns, out);
2069        }
2070        Expr::UnaryOp { expr, .. } => {
2071            collect_column_refs(expr, columns, out);
2072        }
2073        Expr::IsNull(e) | Expr::IsNotNull(e) => {
2074            collect_column_refs(e, columns, out);
2075        }
2076        Expr::Function { args, .. } => {
2077            for arg in args {
2078                collect_column_refs(arg, columns, out);
2079            }
2080        }
2081        Expr::InSubquery { expr, .. } => {
2082            collect_column_refs(expr, columns, out);
2083        }
2084        Expr::InList { expr, list, .. } => {
2085            collect_column_refs(expr, columns, out);
2086            for item in list {
2087                collect_column_refs(item, columns, out);
2088            }
2089        }
2090        Expr::InSet { expr, .. } => {
2091            collect_column_refs(expr, columns, out);
2092        }
2093        Expr::Between {
2094            expr, low, high, ..
2095        } => {
2096            collect_column_refs(expr, columns, out);
2097            collect_column_refs(low, columns, out);
2098            collect_column_refs(high, columns, out);
2099        }
2100        Expr::Like {
2101            expr,
2102            pattern,
2103            escape,
2104            ..
2105        } => {
2106            collect_column_refs(expr, columns, out);
2107            collect_column_refs(pattern, columns, out);
2108            if let Some(esc) = escape {
2109                collect_column_refs(esc, columns, out);
2110            }
2111        }
2112        Expr::Case {
2113            operand,
2114            conditions,
2115            else_result,
2116        } => {
2117            if let Some(op) = operand {
2118                collect_column_refs(op, columns, out);
2119            }
2120            for (when, then) in conditions {
2121                collect_column_refs(when, columns, out);
2122                collect_column_refs(then, columns, out);
2123            }
2124            if let Some(e) = else_result {
2125                collect_column_refs(e, columns, out);
2126            }
2127        }
2128        Expr::Coalesce(args) => {
2129            for arg in args {
2130                collect_column_refs(arg, columns, out);
2131            }
2132        }
2133        Expr::Cast { expr, .. } => {
2134            collect_column_refs(expr, columns, out);
2135        }
2136        Expr::WindowFunction { args, spec, .. } => {
2137            for arg in args {
2138                collect_column_refs(arg, columns, out);
2139            }
2140            for pb in &spec.partition_by {
2141                collect_column_refs(pb, columns, out);
2142            }
2143            for ob in &spec.order_by {
2144                collect_column_refs(&ob.expr, columns, out);
2145            }
2146        }
2147        Expr::Literal(_)
2148        | Expr::Parameter(_)
2149        | Expr::CountStar
2150        | Expr::Exists { .. }
2151        | Expr::ScalarSubquery(_) => {}
2152    }
2153}
2154
2155/// Check if an expression result is truthy (for WHERE/HAVING).
2156pub fn is_truthy(val: &Value) -> bool {
2157    match val {
2158        Value::Boolean(b) => *b,
2159        Value::Integer(i) => *i != 0,
2160        Value::Null => false,
2161        _ => true,
2162    }
2163}
2164
2165#[cfg(test)]
2166mod tests {
2167    use super::*;
2168    use crate::types::DataType;
2169
2170    fn col(name: &str, dt: DataType, nullable: bool, pos: u16) -> ColumnDef {
2171        ColumnDef {
2172            name: name.into(),
2173            data_type: dt,
2174            nullable,
2175            position: pos,
2176            default_expr: None,
2177            default_sql: None,
2178            check_expr: None,
2179            check_sql: None,
2180            check_name: None,
2181            is_with_timezone: false,
2182        }
2183    }
2184
2185    fn test_columns() -> Vec<ColumnDef> {
2186        vec![
2187            col("id", DataType::Integer, false, 0),
2188            col("name", DataType::Text, true, 1),
2189            col("score", DataType::Real, true, 2),
2190            col("active", DataType::Boolean, false, 3),
2191        ]
2192    }
2193
2194    fn test_row() -> Vec<Value> {
2195        vec![
2196            Value::Integer(1),
2197            Value::Text("Alice".into()),
2198            Value::Real(95.5),
2199            Value::Boolean(true),
2200        ]
2201    }
2202
2203    #[test]
2204    fn eval_literal() {
2205        let cols = test_columns();
2206        let cm = ColumnMap::new(&cols);
2207        let row = test_row();
2208        let expr = Expr::Literal(Value::Integer(42));
2209        assert_eq!(
2210            eval_expr(&expr, &EvalCtx::new(&cm, &row)).unwrap(),
2211            Value::Integer(42)
2212        );
2213    }
2214
2215    #[test]
2216    fn eval_column_ref() {
2217        let cols = test_columns();
2218        let cm = ColumnMap::new(&cols);
2219        let row = test_row();
2220        let expr = Expr::Column("name".into());
2221        assert_eq!(
2222            eval_expr(&expr, &EvalCtx::new(&cm, &row)).unwrap(),
2223            Value::Text("Alice".into())
2224        );
2225    }
2226
2227    #[test]
2228    fn eval_column_case_insensitive() {
2229        let cols = test_columns();
2230        let cm = ColumnMap::new(&cols);
2231        let row = test_row();
2232        let expr = Expr::Column("name".into());
2233        assert_eq!(
2234            eval_expr(&expr, &EvalCtx::new(&cm, &row)).unwrap(),
2235            Value::Text("Alice".into())
2236        );
2237    }
2238
2239    #[test]
2240    fn eval_arithmetic_int() {
2241        let cols = test_columns();
2242        let cm = ColumnMap::new(&cols);
2243        let row = test_row();
2244        let expr = Expr::BinaryOp {
2245            left: Box::new(Expr::Column("id".into())),
2246            op: BinOp::Add,
2247            right: Box::new(Expr::Literal(Value::Integer(10))),
2248        };
2249        assert_eq!(
2250            eval_expr(&expr, &EvalCtx::new(&cm, &row)).unwrap(),
2251            Value::Integer(11)
2252        );
2253    }
2254
2255    #[test]
2256    fn eval_comparison() {
2257        let cols = test_columns();
2258        let cm = ColumnMap::new(&cols);
2259        let row = test_row();
2260        let expr = Expr::BinaryOp {
2261            left: Box::new(Expr::Column("score".into())),
2262            op: BinOp::Gt,
2263            right: Box::new(Expr::Literal(Value::Real(90.0))),
2264        };
2265        assert_eq!(
2266            eval_expr(&expr, &EvalCtx::new(&cm, &row)).unwrap(),
2267            Value::Boolean(true)
2268        );
2269    }
2270
2271    #[test]
2272    fn eval_null_propagation() {
2273        let cols = test_columns();
2274        let cm = ColumnMap::new(&cols);
2275        let row = vec![
2276            Value::Integer(1),
2277            Value::Null,
2278            Value::Null,
2279            Value::Boolean(true),
2280        ];
2281        let expr = Expr::BinaryOp {
2282            left: Box::new(Expr::Column("name".into())),
2283            op: BinOp::Eq,
2284            right: Box::new(Expr::Literal(Value::Text("test".into()))),
2285        };
2286        assert!(eval_expr(&expr, &EvalCtx::new(&cm, &row))
2287            .unwrap()
2288            .is_null());
2289    }
2290
2291    #[test]
2292    fn eval_and_three_valued() {
2293        let cols = test_columns();
2294        let cm = ColumnMap::new(&cols);
2295        let row = vec![
2296            Value::Integer(1),
2297            Value::Null,
2298            Value::Null,
2299            Value::Boolean(true),
2300        ];
2301
2302        // NULL AND false = false
2303        let expr = Expr::BinaryOp {
2304            left: Box::new(Expr::Column("name".into())),
2305            op: BinOp::And,
2306            right: Box::new(Expr::Literal(Value::Boolean(false))),
2307        };
2308        assert_eq!(
2309            eval_expr(&expr, &EvalCtx::new(&cm, &row)).unwrap(),
2310            Value::Boolean(false)
2311        );
2312
2313        // NULL AND true = NULL
2314        let expr = Expr::BinaryOp {
2315            left: Box::new(Expr::Column("name".into())),
2316            op: BinOp::And,
2317            right: Box::new(Expr::Literal(Value::Boolean(true))),
2318        };
2319        assert!(eval_expr(&expr, &EvalCtx::new(&cm, &row))
2320            .unwrap()
2321            .is_null());
2322    }
2323
2324    #[test]
2325    fn eval_or_three_valued() {
2326        let cols = test_columns();
2327        let cm = ColumnMap::new(&cols);
2328        let row = vec![
2329            Value::Integer(1),
2330            Value::Null,
2331            Value::Null,
2332            Value::Boolean(true),
2333        ];
2334
2335        // NULL OR true = true
2336        let expr = Expr::BinaryOp {
2337            left: Box::new(Expr::Column("name".into())),
2338            op: BinOp::Or,
2339            right: Box::new(Expr::Literal(Value::Boolean(true))),
2340        };
2341        assert_eq!(
2342            eval_expr(&expr, &EvalCtx::new(&cm, &row)).unwrap(),
2343            Value::Boolean(true)
2344        );
2345
2346        // NULL OR false = NULL
2347        let expr = Expr::BinaryOp {
2348            left: Box::new(Expr::Column("name".into())),
2349            op: BinOp::Or,
2350            right: Box::new(Expr::Literal(Value::Boolean(false))),
2351        };
2352        assert!(eval_expr(&expr, &EvalCtx::new(&cm, &row))
2353            .unwrap()
2354            .is_null());
2355    }
2356
2357    #[test]
2358    fn eval_is_null() {
2359        let cols = test_columns();
2360        let cm = ColumnMap::new(&cols);
2361        let row = vec![
2362            Value::Integer(1),
2363            Value::Null,
2364            Value::Null,
2365            Value::Boolean(true),
2366        ];
2367        let expr = Expr::IsNull(Box::new(Expr::Column("name".into())));
2368        assert_eq!(
2369            eval_expr(&expr, &EvalCtx::new(&cm, &row)).unwrap(),
2370            Value::Boolean(true)
2371        );
2372
2373        let expr = Expr::IsNotNull(Box::new(Expr::Column("id".into())));
2374        assert_eq!(
2375            eval_expr(&expr, &EvalCtx::new(&cm, &row)).unwrap(),
2376            Value::Boolean(true)
2377        );
2378    }
2379
2380    #[test]
2381    fn eval_not() {
2382        let cols = test_columns();
2383        let cm = ColumnMap::new(&cols);
2384        let row = test_row();
2385        let expr = Expr::UnaryOp {
2386            op: UnaryOp::Not,
2387            expr: Box::new(Expr::Column("active".into())),
2388        };
2389        assert_eq!(
2390            eval_expr(&expr, &EvalCtx::new(&cm, &row)).unwrap(),
2391            Value::Boolean(false)
2392        );
2393    }
2394
2395    #[test]
2396    fn eval_neg() {
2397        let cols = test_columns();
2398        let cm = ColumnMap::new(&cols);
2399        let row = test_row();
2400        let expr = Expr::UnaryOp {
2401            op: UnaryOp::Neg,
2402            expr: Box::new(Expr::Column("id".into())),
2403        };
2404        assert_eq!(
2405            eval_expr(&expr, &EvalCtx::new(&cm, &row)).unwrap(),
2406            Value::Integer(-1)
2407        );
2408    }
2409
2410    #[test]
2411    fn eval_division_by_zero() {
2412        let cols = test_columns();
2413        let cm = ColumnMap::new(&cols);
2414        let row = test_row();
2415        let expr = Expr::BinaryOp {
2416            left: Box::new(Expr::Column("id".into())),
2417            op: BinOp::Div,
2418            right: Box::new(Expr::Literal(Value::Integer(0))),
2419        };
2420        assert!(matches!(
2421            eval_expr(&expr, &EvalCtx::new(&cm, &row)),
2422            Err(SqlError::DivisionByZero)
2423        ));
2424    }
2425
2426    #[test]
2427    fn eval_mixed_numeric() {
2428        let cols = test_columns();
2429        let cm = ColumnMap::new(&cols);
2430        let row = test_row();
2431        // id (int 1) + score (real 95.5) = real 96.5
2432        let expr = Expr::BinaryOp {
2433            left: Box::new(Expr::Column("id".into())),
2434            op: BinOp::Add,
2435            right: Box::new(Expr::Column("score".into())),
2436        };
2437        assert_eq!(
2438            eval_expr(&expr, &EvalCtx::new(&cm, &row)).unwrap(),
2439            Value::Real(96.5)
2440        );
2441    }
2442
2443    #[test]
2444    fn is_truthy_values() {
2445        assert!(is_truthy(&Value::Boolean(true)));
2446        assert!(!is_truthy(&Value::Boolean(false)));
2447        assert!(!is_truthy(&Value::Null));
2448        assert!(is_truthy(&Value::Integer(1)));
2449        assert!(!is_truthy(&Value::Integer(0)));
2450    }
2451}