Skip to main content

citadel_sql/
eval.rs

1//! Expression evaluator with SQL three-valued logic.
2
3use rustc_hash::FxHashMap;
4
5use crate::error::{Result, SqlError};
6use crate::parser::{BinOp, Expr, UnaryOp};
7use crate::types::{ColumnDef, CompactString, DataType, Value};
8
9pub struct ColumnMap {
10    exact: FxHashMap<String, usize>,
11    short: FxHashMap<String, ShortMatch>,
12}
13
14#[derive(Clone)]
15enum ShortMatch {
16    Unique(usize),
17    Ambiguous,
18}
19
20impl Clone for ColumnMap {
21    fn clone(&self) -> Self {
22        Self {
23            exact: self.exact.clone(),
24            short: self.short.clone(),
25        }
26    }
27}
28
29impl ColumnMap {
30    pub fn new(columns: &[ColumnDef]) -> Self {
31        let mut exact = FxHashMap::with_capacity_and_hasher(columns.len() * 2, Default::default());
32        let mut short: FxHashMap<String, ShortMatch> =
33            FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
34
35        for (i, col) in columns.iter().enumerate() {
36            let lower = col.name.to_ascii_lowercase();
37            exact.insert(lower.clone(), i);
38
39            let unqualified = if let Some(dot) = lower.rfind('.') {
40                &lower[dot + 1..]
41            } else {
42                &lower
43            };
44            short
45                .entry(unqualified.to_string())
46                .and_modify(|e| *e = ShortMatch::Ambiguous)
47                .or_insert(ShortMatch::Unique(i));
48        }
49
50        Self { exact, short }
51    }
52
53    pub(crate) fn resolve(&self, name: &str) -> Result<usize> {
54        if let Some(&idx) = self.exact.get(name) {
55            return Ok(idx);
56        }
57        match self.short.get(name) {
58            Some(ShortMatch::Unique(idx)) => Ok(*idx),
59            Some(ShortMatch::Ambiguous) => Err(SqlError::AmbiguousColumn(name.to_string())),
60            None => Err(SqlError::ColumnNotFound(name.to_string())),
61        }
62    }
63
64    pub(crate) fn resolve_qualified(&self, table: &str, column: &str) -> Result<usize> {
65        let qualified = format!("{table}.{column}");
66        if let Some(&idx) = self.exact.get(&qualified) {
67            return Ok(idx);
68        }
69        match self.short.get(column) {
70            Some(ShortMatch::Unique(idx)) => Ok(*idx),
71            _ => Err(SqlError::ColumnNotFound(format!("{table}.{column}"))),
72        }
73    }
74}
75
/// Borrowed inputs needed to evaluate an expression against a single row.
pub struct EvalCtx<'a> {
    /// Maps column names to indices into `row`.
    pub col_map: &'a ColumnMap,
    /// Values of the current row, indexed by the positions `col_map` resolves to.
    pub row: &'a [Value],
    /// Positional parameters (1-based in expressions); when empty, parameter lookup
    /// falls back to the thread-scoped parameters.
    pub params: &'a [Value],
}
81
82impl<'a> EvalCtx<'a> {
83    pub fn new(col_map: &'a ColumnMap, row: &'a [Value]) -> Self {
84        Self {
85            col_map,
86            row,
87            params: &[],
88        }
89    }
90
91    pub fn with_params(col_map: &'a ColumnMap, row: &'a [Value], params: &'a [Value]) -> Self {
92        Self {
93            col_map,
94            row,
95            params,
96        }
97    }
98}
99
thread_local! {
    // Per-thread `(pointer, length)` of the parameter slice installed by
    // `with_scoped_params`; `(null, 0)` while nothing is installed.
    static SCOPED_PARAMS: std::cell::Cell<(*const Value, usize)> =
        const { std::cell::Cell::new((std::ptr::null(), 0)) };
}
104
105/// Install positional parameters for `Expr::Parameter` resolution during `f`.
106pub fn with_scoped_params<R>(params: &[Value], f: impl FnOnce() -> R) -> R {
107    struct Guard((*const Value, usize));
108    impl Drop for Guard {
109        fn drop(&mut self) {
110            SCOPED_PARAMS.with(|slot| slot.set(self.0));
111        }
112    }
113    SCOPED_PARAMS.with(|slot| {
114        let prev = slot.get();
115        slot.set((params.as_ptr(), params.len()));
116        let _guard = Guard(prev);
117        f()
118    })
119}
120
121fn resolve_parameter(n: usize, ctx_params: &[Value]) -> Result<Value> {
122    if !ctx_params.is_empty() {
123        if n == 0 || n > ctx_params.len() {
124            return Err(SqlError::ParameterCountMismatch {
125                expected: n,
126                got: ctx_params.len(),
127            });
128        }
129        return Ok(ctx_params[n - 1].clone());
130    }
131    resolve_scoped_param(n)
132}
133
134pub fn resolve_scoped_param(n: usize) -> Result<Value> {
135    SCOPED_PARAMS.with(|slot| {
136        let (ptr, len) = slot.get();
137        if n == 0 || n > len {
138            return Err(SqlError::ParameterCountMismatch {
139                expected: n,
140                got: len,
141            });
142        }
143        // SAFETY: `with_scoped_params` keeps the slice alive for the duration of `f()`
144        // and restores the previous pointer on return. Reads only happen inside `f()`.
145        unsafe { Ok((*ptr.add(n - 1)).clone()) }
146    })
147}
148
149pub fn eval_expr(expr: &Expr, ctx: &EvalCtx) -> Result<Value> {
150    match expr {
151        Expr::Literal(v) => Ok(v.clone()),
152
153        Expr::Column(name) => {
154            let idx = ctx.col_map.resolve(name)?;
155            Ok(ctx.row[idx].clone())
156        }
157
158        Expr::QualifiedColumn { table, column } => {
159            let idx = ctx.col_map.resolve_qualified(table, column)?;
160            Ok(ctx.row[idx].clone())
161        }
162
163        Expr::BinaryOp { left, op, right } => {
164            let lval = eval_expr(left, ctx)?;
165            let rval = eval_expr(right, ctx)?;
166            eval_binary_op(&lval, *op, &rval)
167        }
168
169        Expr::UnaryOp { op, expr } => {
170            let val = eval_expr(expr, ctx)?;
171            eval_unary_op(*op, &val)
172        }
173
174        Expr::IsNull(e) => {
175            let val = eval_expr(e, ctx)?;
176            Ok(Value::Boolean(val.is_null()))
177        }
178
179        Expr::IsNotNull(e) => {
180            let val = eval_expr(e, ctx)?;
181            Ok(Value::Boolean(!val.is_null()))
182        }
183
184        Expr::Function { name, args, .. } => eval_scalar_function(name, args, ctx),
185
186        Expr::CountStar => Err(SqlError::Unsupported(
187            "COUNT(*) in non-aggregate context".into(),
188        )),
189
190        Expr::InList {
191            expr: e,
192            list,
193            negated,
194        } => {
195            let lhs = eval_expr(e, ctx)?;
196            eval_in_values(&lhs, list, ctx, *negated)
197        }
198
199        Expr::InSet {
200            expr: e,
201            values,
202            has_null,
203            negated,
204        } => {
205            let lhs = eval_expr(e, ctx)?;
206            eval_in_set(&lhs, values, *has_null, *negated)
207        }
208
209        Expr::Between {
210            expr: e,
211            low,
212            high,
213            negated,
214        } => {
215            let val = eval_expr(e, ctx)?;
216            let lo = eval_expr(low, ctx)?;
217            let hi = eval_expr(high, ctx)?;
218            eval_between(&val, &lo, &hi, *negated)
219        }
220
221        Expr::Like {
222            expr: e,
223            pattern,
224            escape,
225            negated,
226        } => {
227            let val = eval_expr(e, ctx)?;
228            let pat = eval_expr(pattern, ctx)?;
229            let esc = escape.as_ref().map(|e| eval_expr(e, ctx)).transpose()?;
230            eval_like(&val, &pat, esc.as_ref(), *negated)
231        }
232
233        Expr::Case {
234            operand,
235            conditions,
236            else_result,
237        } => eval_case(operand.as_deref(), conditions, else_result.as_deref(), ctx),
238
239        Expr::Coalesce(args) => {
240            for arg in args {
241                let val = eval_expr(arg, ctx)?;
242                if !val.is_null() {
243                    return Ok(val);
244                }
245            }
246            Ok(Value::Null)
247        }
248
249        Expr::Cast { expr: e, data_type } => {
250            let val = eval_expr(e, ctx)?;
251            eval_cast(&val, *data_type)
252        }
253
254        Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => Err(
255            SqlError::Unsupported("subquery not materialized (internal error)".into()),
256        ),
257
258        Expr::Parameter(n) => resolve_parameter(*n, ctx.params),
259
260        Expr::WindowFunction { .. } => Err(SqlError::Unsupported(
261            "window functions are only allowed in SELECT columns".into(),
262        )),
263    }
264}
265
/// Planner-level constant folding hook; shares semantics with row evaluation.
///
/// Public wrapper that forwards its arguments unchanged to the private `eval_binary_op`
/// and returns its result.
pub fn eval_binary_op_public(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
    eval_binary_op(left, op, right)
}
270
271fn eval_binary_op(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
272    match op {
273        BinOp::And => return eval_and(left, right),
274        BinOp::Or => return eval_or(left, right),
275        _ => {}
276    }
277
278    if left.is_null() || right.is_null() {
279        return Ok(Value::Null);
280    }
281
282    if let Some(res) = eval_temporal_op(left, op, right) {
283        return res;
284    }
285
286    match op {
287        BinOp::Eq => Ok(Value::Boolean(left == right)),
288        BinOp::NotEq => Ok(Value::Boolean(left != right)),
289        BinOp::Lt => Ok(Value::Boolean(left < right)),
290        BinOp::Gt => Ok(Value::Boolean(left > right)),
291        BinOp::LtEq => Ok(Value::Boolean(left <= right)),
292        BinOp::GtEq => Ok(Value::Boolean(left >= right)),
293        BinOp::Add => eval_arithmetic(left, right, i64::checked_add, |a, b| a + b),
294        BinOp::Sub => eval_arithmetic(left, right, i64::checked_sub, |a, b| a - b),
295        BinOp::Mul => eval_arithmetic(left, right, i64::checked_mul, |a, b| a * b),
296        BinOp::Div => {
297            match right {
298                Value::Integer(0) => return Err(SqlError::DivisionByZero),
299                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
300                _ => {}
301            }
302            eval_arithmetic(left, right, i64::checked_div, |a, b| a / b)
303        }
304        BinOp::Mod => {
305            match right {
306                Value::Integer(0) => return Err(SqlError::DivisionByZero),
307                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
308                _ => {}
309            }
310            eval_arithmetic(left, right, i64::checked_rem, |a, b| a % b)
311        }
312        BinOp::Concat => {
313            let ls = value_to_text(left);
314            let rs = value_to_text(right);
315            Ok(Value::Text(format!("{ls}{rs}").into()))
316        }
317        BinOp::And | BinOp::Or => unreachable!(),
318    }
319}
320
321/// Returns `Some` when `(left, op, right)` is a temporal operation; `None` to fall through.
322fn eval_temporal_op(left: &Value, op: BinOp, right: &Value) -> Option<Result<Value>> {
323    use crate::datetime as dt;
324    use std::cmp::Ordering;
325
326    let is_temporal = |v: &Value| {
327        matches!(
328            v,
329            Value::Date(_) | Value::Time(_) | Value::Timestamp(_) | Value::Interval { .. }
330        )
331    };
332    if matches!(op, BinOp::Add | BinOp::Sub)
333        && ((is_temporal(left) && matches!(right, Value::Real(_)))
334            || (matches!(left, Value::Real(_)) && is_temporal(right)))
335    {
336        return Some(Err(SqlError::TypeMismatch {
337            expected: "INTEGER or INTERVAL for date/time arithmetic (use CAST for REAL)".into(),
338            got: format!("{} and {}", left.data_type(), right.data_type()),
339        }));
340    }
341
342    match (left, op, right) {
343        (Value::Date(d), BinOp::Add, Value::Integer(n))
344        | (Value::Integer(n), BinOp::Add, Value::Date(d)) => {
345            Some(dt::add_days_to_date(*d, *n).map(Value::Date))
346        }
347        (Value::Date(d), BinOp::Sub, Value::Integer(n)) => {
348            Some(dt::add_days_to_date(*d, -*n).map(Value::Date))
349        }
350        (Value::Date(a), BinOp::Sub, Value::Date(b)) => {
351            Some(Ok(Value::Integer(*a as i64 - *b as i64)))
352        }
353        // DATE ± INTERVAL → TIMESTAMP (PG rule).
354        (
355            Value::Date(d),
356            BinOp::Add,
357            Value::Interval {
358                months,
359                days,
360                micros,
361            },
362        )
363        | (
364            Value::Interval {
365                months,
366                days,
367                micros,
368            },
369            BinOp::Add,
370            Value::Date(d),
371        ) => Some(dt::add_interval_to_date(*d, *months, *days, *micros).map(Value::Timestamp)),
372        (
373            Value::Date(d),
374            BinOp::Sub,
375            Value::Interval {
376                months,
377                days,
378                micros,
379            },
380        ) => Some(dt::add_interval_to_date(*d, -*months, -*days, -*micros).map(Value::Timestamp)),
381        (
382            Value::Timestamp(t),
383            BinOp::Add,
384            Value::Interval {
385                months,
386                days,
387                micros,
388            },
389        )
390        | (
391            Value::Interval {
392                months,
393                days,
394                micros,
395            },
396            BinOp::Add,
397            Value::Timestamp(t),
398        ) => Some(dt::add_interval_to_timestamp(*t, *months, *days, *micros).map(Value::Timestamp)),
399        (
400            Value::Timestamp(t),
401            BinOp::Sub,
402            Value::Interval {
403                months,
404                days,
405                micros,
406            },
407        ) => Some(
408            dt::add_interval_to_timestamp(*t, -*months, -*days, -*micros).map(Value::Timestamp),
409        ),
410        (Value::Timestamp(a), BinOp::Sub, Value::Timestamp(b)) => {
411            let (days, micros) = dt::subtract_timestamps(*a, *b);
412            Some(Ok(Value::Interval {
413                months: 0,
414                days,
415                micros,
416            }))
417        }
418        (
419            Value::Time(t),
420            BinOp::Add,
421            Value::Interval {
422                months,
423                days,
424                micros,
425            },
426        ) => Some(dt::add_interval_to_time(*t, *months, *days, *micros).map(Value::Time)),
427        (
428            Value::Time(t),
429            BinOp::Sub,
430            Value::Interval {
431                months,
432                days,
433                micros,
434            },
435        ) => Some(dt::add_interval_to_time(*t, -*months, -*days, -*micros).map(Value::Time)),
436        (Value::Time(a), BinOp::Sub, Value::Time(b)) => Some(Ok(Value::Interval {
437            months: 0,
438            days: 0,
439            micros: *a - *b,
440        })),
441        (
442            Value::Interval {
443                months: am,
444                days: ad,
445                micros: au,
446            },
447            BinOp::Add,
448            Value::Interval {
449                months: bm,
450                days: bd,
451                micros: bu,
452            },
453        ) => Some(Ok(Value::Interval {
454            months: am.saturating_add(*bm),
455            days: ad.saturating_add(*bd),
456            micros: au.saturating_add(*bu),
457        })),
458        (
459            Value::Interval {
460                months: am,
461                days: ad,
462                micros: au,
463            },
464            BinOp::Sub,
465            Value::Interval {
466                months: bm,
467                days: bd,
468                micros: bu,
469            },
470        ) => Some(Ok(Value::Interval {
471            months: am.saturating_sub(*bm),
472            days: ad.saturating_sub(*bd),
473            micros: au.saturating_sub(*bu),
474        })),
475        (
476            Value::Interval {
477                months,
478                days,
479                micros,
480            },
481            BinOp::Mul,
482            Value::Integer(n),
483        )
484        | (
485            Value::Integer(n),
486            BinOp::Mul,
487            Value::Interval {
488                months,
489                days,
490                micros,
491            },
492        ) => {
493            let n32 = (*n).clamp(i32::MIN as i64, i32::MAX as i64) as i32;
494            Some(Ok(Value::Interval {
495                months: months.saturating_mul(n32),
496                days: days.saturating_mul(n32),
497                micros: micros.saturating_mul(*n),
498            }))
499        }
500        // INTERVAL * REAL — fractional months → days, fractional days → micros (PG).
501        (
502            Value::Interval {
503                months,
504                days,
505                micros,
506            },
507            BinOp::Mul,
508            Value::Real(r),
509        )
510        | (
511            Value::Real(r),
512            BinOp::Mul,
513            Value::Interval {
514                months,
515                days,
516                micros,
517            },
518        ) => Some(Ok(scale_interval_by_real(*months, *days, *micros, *r))),
519        (
520            Value::Interval {
521                months,
522                days,
523                micros,
524            },
525            BinOp::Div,
526            Value::Integer(n),
527        ) if *n != 0 => Some(Ok(Value::Interval {
528            months: (*months as i64 / *n) as i32,
529            days: (*days as i64 / *n) as i32,
530            micros: *micros / *n,
531        })),
532        (
533            Value::Interval {
534                months,
535                days,
536                micros,
537            },
538            BinOp::Div,
539            Value::Real(r),
540        ) if *r != 0.0 => Some(Ok(scale_interval_by_real(*months, *days, *micros, 1.0 / r))),
541        // PG-normalized INTERVAL compare: 30-day month, 24-hour day.
542        (
543            Value::Interval {
544                months: am,
545                days: ad,
546                micros: au,
547            },
548            op,
549            Value::Interval {
550                months: bm,
551                days: bd,
552                micros: bu,
553            },
554        ) if matches!(
555            op,
556            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::Gt | BinOp::LtEq | BinOp::GtEq
557        ) =>
558        {
559            let ord = dt::pg_normalized_interval_cmp((*am, *ad, *au), (*bm, *bd, *bu));
560            let b = match op {
561                BinOp::Eq => ord == Ordering::Equal,
562                BinOp::NotEq => ord != Ordering::Equal,
563                BinOp::Lt => ord == Ordering::Less,
564                BinOp::Gt => ord == Ordering::Greater,
565                BinOp::LtEq => ord != Ordering::Greater,
566                BinOp::GtEq => ord != Ordering::Less,
567                _ => unreachable!(),
568            };
569            Some(Ok(Value::Boolean(b)))
570        }
571        // PG rejects TIMESTAMP ± INTEGER; require CAST to INTERVAL.
572        (Value::Timestamp(_), BinOp::Add | BinOp::Sub, Value::Integer(_))
573        | (Value::Integer(_), BinOp::Add, Value::Timestamp(_)) => {
574            Some(Err(SqlError::TypeMismatch {
575                expected: "INTERVAL (use CAST or explicit unit)".into(),
576                got: format!("{} and {}", left.data_type(), right.data_type()),
577            }))
578        }
579        _ => None,
580    }
581}
582
583/// PG fractional-propagation: month frac → days (×30), day frac → micros (×86.4G).
584fn scale_interval_by_real(months: i32, days: i32, micros: i64, factor: f64) -> Value {
585    let raw_months = months as f64 * factor;
586    let whole_months = raw_months.trunc() as i64;
587    let frac_months = raw_months - whole_months as f64;
588    let months_frac_as_days = frac_months * 30.0;
589
590    let raw_days = days as f64 * factor + months_frac_as_days;
591    let whole_days = raw_days.trunc() as i64;
592    let frac_days = raw_days - whole_days as f64;
593    let days_frac_as_micros = (frac_days * crate::datetime::MICROS_PER_DAY as f64).round() as i64;
594
595    let raw_micros = (micros as f64 * factor).round() as i64;
596    let total_micros = raw_micros.saturating_add(days_frac_as_micros);
597
598    let clamp_i32 = |n: i64| n.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
599    Value::Interval {
600        months: clamp_i32(whole_months),
601        days: clamp_i32(whole_days),
602        micros: total_micros,
603    }
604}
605
606/// SQL three-valued AND: NULL AND false = false, NULL AND true = NULL
607fn eval_and(left: &Value, right: &Value) -> Result<Value> {
608    let l = to_bool_or_null(left)?;
609    let r = to_bool_or_null(right)?;
610    match (l, r) {
611        (Some(false), _) | (_, Some(false)) => Ok(Value::Boolean(false)),
612        (Some(true), Some(true)) => Ok(Value::Boolean(true)),
613        _ => Ok(Value::Null),
614    }
615}
616
617/// SQL three-valued OR: NULL OR true = true, NULL OR false = NULL
618fn eval_or(left: &Value, right: &Value) -> Result<Value> {
619    let l = to_bool_or_null(left)?;
620    let r = to_bool_or_null(right)?;
621    match (l, r) {
622        (Some(true), _) | (_, Some(true)) => Ok(Value::Boolean(true)),
623        (Some(false), Some(false)) => Ok(Value::Boolean(false)),
624        _ => Ok(Value::Null),
625    }
626}
627
628fn to_bool_or_null(val: &Value) -> Result<Option<bool>> {
629    match val {
630        Value::Boolean(b) => Ok(Some(*b)),
631        Value::Null => Ok(None),
632        Value::Integer(i) => Ok(Some(*i != 0)),
633        _ => Err(SqlError::TypeMismatch {
634            expected: "BOOLEAN".into(),
635            got: format!("{}", val.data_type()),
636        }),
637    }
638}
639
640fn eval_arithmetic(
641    left: &Value,
642    right: &Value,
643    int_op: fn(i64, i64) -> Option<i64>,
644    real_op: fn(f64, f64) -> f64,
645) -> Result<Value> {
646    match (left, right) {
647        (Value::Integer(a), Value::Integer(b)) => int_op(*a, *b)
648            .map(Value::Integer)
649            .ok_or(SqlError::IntegerOverflow),
650        (Value::Real(a), Value::Real(b)) => Ok(Value::Real(real_op(*a, *b))),
651        (Value::Integer(a), Value::Real(b)) => Ok(Value::Real(real_op(*a as f64, *b))),
652        (Value::Real(a), Value::Integer(b)) => Ok(Value::Real(real_op(*a, *b as f64))),
653        _ => Err(SqlError::TypeMismatch {
654            expected: "numeric".into(),
655            got: format!("{} and {}", left.data_type(), right.data_type()),
656        }),
657    }
658}
659
660fn eval_in_values(lhs: &Value, list: &[Expr], ctx: &EvalCtx, negated: bool) -> Result<Value> {
661    if list.is_empty() {
662        return Ok(Value::Boolean(negated));
663    }
664    if lhs.is_null() {
665        return Ok(Value::Null);
666    }
667    let mut has_null = false;
668    for item in list {
669        let rhs = eval_expr(item, ctx)?;
670        if rhs.is_null() {
671            has_null = true;
672        } else if lhs == &rhs {
673            return Ok(Value::Boolean(!negated));
674        }
675    }
676    if has_null {
677        Ok(Value::Null)
678    } else {
679        Ok(Value::Boolean(negated))
680    }
681}
682
683fn eval_in_set(
684    lhs: &Value,
685    values: &std::collections::HashSet<Value>,
686    has_null: bool,
687    negated: bool,
688) -> Result<Value> {
689    if values.is_empty() && !has_null {
690        return Ok(Value::Boolean(negated));
691    }
692    if lhs.is_null() {
693        return Ok(Value::Null);
694    }
695    if values.contains(lhs) {
696        return Ok(Value::Boolean(!negated));
697    }
698    if has_null {
699        Ok(Value::Null)
700    } else {
701        Ok(Value::Boolean(negated))
702    }
703}
704
705fn eval_unary_op(op: UnaryOp, val: &Value) -> Result<Value> {
706    if val.is_null() {
707        return Ok(Value::Null);
708    }
709    match op {
710        UnaryOp::Neg => match val {
711            Value::Integer(i) => i
712                .checked_neg()
713                .map(Value::Integer)
714                .ok_or(SqlError::IntegerOverflow),
715            Value::Real(r) => Ok(Value::Real(-r)),
716            Value::Interval {
717                months,
718                days,
719                micros,
720            } => {
721                let m = months.checked_neg().ok_or(SqlError::IntegerOverflow)?;
722                let d = days.checked_neg().ok_or(SqlError::IntegerOverflow)?;
723                let u = micros.checked_neg().ok_or(SqlError::IntegerOverflow)?;
724                Ok(Value::Interval {
725                    months: m,
726                    days: d,
727                    micros: u,
728                })
729            }
730            _ => Err(SqlError::TypeMismatch {
731                expected: "numeric or INTERVAL".into(),
732                got: format!("{}", val.data_type()),
733            }),
734        },
735        UnaryOp::Not => match val {
736            Value::Boolean(b) => Ok(Value::Boolean(!b)),
737            Value::Integer(i) => Ok(Value::Boolean(*i == 0)),
738            _ => Err(SqlError::TypeMismatch {
739                expected: "BOOLEAN".into(),
740                got: format!("{}", val.data_type()),
741            }),
742        },
743    }
744}
745
746fn value_to_text(val: &Value) -> String {
747    match val {
748        Value::Text(s) => s.to_string(),
749        Value::Integer(i) => i.to_string(),
750        Value::Real(r) => {
751            if r.fract() == 0.0 && r.is_finite() {
752                format!("{r:.1}")
753            } else {
754                format!("{r}")
755            }
756        }
757        Value::Boolean(b) => if *b { "TRUE" } else { "FALSE" }.into(),
758        Value::Null => String::new(),
759        Value::Blob(b) => {
760            let mut s = String::with_capacity(b.len() * 2);
761            for byte in b {
762                s.push_str(&format!("{byte:02X}"));
763            }
764            s
765        }
766        Value::Date(d) => crate::datetime::format_date(*d),
767        Value::Time(t) => crate::datetime::format_time(*t),
768        Value::Timestamp(t) => crate::datetime::format_timestamp(*t),
769        Value::Interval {
770            months,
771            days,
772            micros,
773        } => crate::datetime::format_interval(*months, *days, *micros),
774    }
775}
776
777fn eval_between(val: &Value, low: &Value, high: &Value, negated: bool) -> Result<Value> {
778    if val.is_null() || low.is_null() || high.is_null() {
779        let ge = if val.is_null() || low.is_null() {
780            None
781        } else {
782            Some(*val >= *low)
783        };
784        let le = if val.is_null() || high.is_null() {
785            None
786        } else {
787            Some(*val <= *high)
788        };
789
790        let result = match (ge, le) {
791            (Some(false), _) | (_, Some(false)) => Some(false),
792            (Some(true), Some(true)) => Some(true),
793            _ => None,
794        };
795
796        return match result {
797            Some(b) => Ok(Value::Boolean(if negated { !b } else { b })),
798            None => Ok(Value::Null),
799        };
800    }
801
802    let in_range = *val >= *low && *val <= *high;
803    Ok(Value::Boolean(if negated { !in_range } else { in_range }))
804}
805
/// Upper bound on a LIKE pattern's `len()`; longer patterns are rejected by `eval_like`.
const MAX_LIKE_PATTERN_LEN: usize = 10_000;
807
808fn eval_like(val: &Value, pattern: &Value, escape: Option<&Value>, negated: bool) -> Result<Value> {
809    if val.is_null() || pattern.is_null() {
810        return Ok(Value::Null);
811    }
812    let text = match val {
813        Value::Text(s) => s.as_str(),
814        _ => {
815            return Err(SqlError::TypeMismatch {
816                expected: "TEXT".into(),
817                got: val.data_type().to_string(),
818            })
819        }
820    };
821    let pat = match pattern {
822        Value::Text(s) => s.as_str(),
823        _ => {
824            return Err(SqlError::TypeMismatch {
825                expected: "TEXT".into(),
826                got: pattern.data_type().to_string(),
827            })
828        }
829    };
830
831    if pat.len() > MAX_LIKE_PATTERN_LEN {
832        return Err(SqlError::InvalidValue(format!(
833            "LIKE pattern too long ({} chars, max {MAX_LIKE_PATTERN_LEN})",
834            pat.len()
835        )));
836    }
837
838    let esc_char = match escape {
839        Some(Value::Text(s)) => {
840            let mut chars = s.chars();
841            let c = chars.next().ok_or_else(|| {
842                SqlError::InvalidValue("ESCAPE must be a single character".into())
843            })?;
844            if chars.next().is_some() {
845                return Err(SqlError::InvalidValue(
846                    "ESCAPE must be a single character".into(),
847                ));
848            }
849            Some(c)
850        }
851        Some(Value::Null) => return Ok(Value::Null),
852        Some(_) => {
853            return Err(SqlError::TypeMismatch {
854                expected: "TEXT".into(),
855                got: "non-text".into(),
856            })
857        }
858        None => None,
859    };
860
861    let matched = like_match(text, pat, esc_char);
862    Ok(Value::Boolean(if negated { !matched } else { matched }))
863}
864
865fn like_match(text: &str, pattern: &str, escape: Option<char>) -> bool {
866    let t: Vec<char> = text.chars().collect();
867    let p: Vec<char> = pattern.chars().collect();
868    like_match_impl(&t, &p, 0, 0, escape)
869}
870
/// Iterative LIKE matcher with single-point backtracking.
///
/// `%` matches any run of characters, `_` matches exactly one, and everything else is
/// compared ASCII-case-insensitively. When `esc` is set, an escape character that is
/// followed by another pattern character makes that next character a literal.
/// `ti` / `pi` are the starting positions in the text `t` and the pattern `p`.
fn like_match_impl(
    t: &[char],
    p: &[char],
    mut ti: usize,
    mut pi: usize,
    esc: Option<char>,
) -> bool {
    // Most recent `%`: (its pattern index, text index its match currently extends to).
    let mut backtrack: Option<(usize, usize)> = None;

    while ti < t.len() {
        // Try to consume the current text char with the current pattern element.
        let consumed = match p.get(pi).copied() {
            None => false,
            Some(pc) if esc == Some(pc) && pi + 1 < p.len() => {
                // Escaped literal: the escape and the literal are consumed together.
                if p[pi + 1].to_ascii_lowercase() == t[ti].to_ascii_lowercase() {
                    pi += 2;
                    true
                } else {
                    false
                }
            }
            Some('%') => {
                // Remember the wildcard and first try matching it against nothing.
                backtrack = Some((pi, ti));
                pi += 1;
                continue;
            }
            Some('_') => {
                pi += 1;
                true
            }
            Some(pc) if pc.eq_ignore_ascii_case(&t[ti]) => {
                pi += 1;
                true
            }
            Some(_) => false,
        };

        if consumed {
            ti += 1;
            continue;
        }

        // Mismatch: let the last `%` swallow one more char and retry after it.
        match backtrack {
            Some((wild_pi, wild_ti)) => {
                let resume = wild_ti + 1;
                backtrack = Some((wild_pi, resume));
                pi = wild_pi + 1;
                ti = resume;
            }
            None => return false,
        }
    }

    // Text exhausted: only trailing `%` may remain in the pattern.
    let trailing_wildcards = p[pi.min(p.len())..]
        .iter()
        .take_while(|&&c| c == '%')
        .count();
    pi + trailing_wildcards == p.len()
}
933
934fn eval_case(
935    operand: Option<&Expr>,
936    conditions: &[(Expr, Expr)],
937    else_result: Option<&Expr>,
938    ctx: &EvalCtx,
939) -> Result<Value> {
940    if let Some(op_expr) = operand {
941        let op_val = eval_expr(op_expr, ctx)?;
942        for (cond, result) in conditions {
943            let cond_val = eval_expr(cond, ctx)?;
944            if !op_val.is_null() && !cond_val.is_null() && op_val == cond_val {
945                return eval_expr(result, ctx);
946            }
947        }
948    } else {
949        for (cond, result) in conditions {
950            let cond_val = eval_expr(cond, ctx)?;
951            if is_truthy(&cond_val) {
952                return eval_expr(result, ctx);
953            }
954        }
955    }
956    match else_result {
957        Some(e) => eval_expr(e, ctx),
958        None => Ok(Value::Null),
959    }
960}
961
962fn eval_cast(val: &Value, target: DataType) -> Result<Value> {
963    if val.is_null() {
964        return Ok(Value::Null);
965    }
966    match target {
967        DataType::Integer => match val {
968            Value::Integer(_) => Ok(val.clone()),
969            Value::Real(r) => Ok(Value::Integer(*r as i64)),
970            Value::Boolean(b) => Ok(Value::Integer(if *b { 1 } else { 0 })),
971            Value::Text(s) => s
972                .trim()
973                .parse::<i64>()
974                .map(Value::Integer)
975                .or_else(|_| s.trim().parse::<f64>().map(|f| Value::Integer(f as i64)))
976                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to INTEGER"))),
977            _ => Err(SqlError::InvalidValue(format!(
978                "cannot cast {} to INTEGER",
979                val.data_type()
980            ))),
981        },
982        DataType::Real => match val {
983            Value::Real(_) => Ok(val.clone()),
984            Value::Integer(i) => Ok(Value::Real(*i as f64)),
985            Value::Boolean(b) => Ok(Value::Real(if *b { 1.0 } else { 0.0 })),
986            Value::Text(s) => s
987                .trim()
988                .parse::<f64>()
989                .map(Value::Real)
990                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to REAL"))),
991            _ => Err(SqlError::InvalidValue(format!(
992                "cannot cast {} to REAL",
993                val.data_type()
994            ))),
995        },
996        DataType::Text => Ok(Value::Text(value_to_text(val).into())),
997        DataType::Boolean => match val {
998            Value::Boolean(_) => Ok(val.clone()),
999            Value::Integer(i) => Ok(Value::Boolean(*i != 0)),
1000            Value::Text(s) => {
1001                let lower = s.trim().to_ascii_lowercase();
1002                match lower.as_str() {
1003                    "true" | "1" | "yes" | "on" => Ok(Value::Boolean(true)),
1004                    "false" | "0" | "no" | "off" => Ok(Value::Boolean(false)),
1005                    _ => Err(SqlError::InvalidValue(format!(
1006                        "cannot cast '{s}' to BOOLEAN"
1007                    ))),
1008                }
1009            }
1010            _ => Err(SqlError::InvalidValue(format!(
1011                "cannot cast {} to BOOLEAN",
1012                val.data_type()
1013            ))),
1014        },
1015        DataType::Blob => match val {
1016            Value::Blob(_) => Ok(val.clone()),
1017            Value::Text(s) => Ok(Value::Blob(s.as_bytes().to_vec())),
1018            _ => Err(SqlError::InvalidValue(format!(
1019                "cannot cast {} to BLOB",
1020                val.data_type()
1021            ))),
1022        },
1023        DataType::Null => Ok(Value::Null),
1024        DataType::Date => val.clone().coerce_into(DataType::Date).ok_or_else(|| {
1025            SqlError::InvalidValue(format!("cannot cast {} to DATE", val.data_type()))
1026        }),
1027        DataType::Time => val.clone().coerce_into(DataType::Time).ok_or_else(|| {
1028            SqlError::InvalidValue(format!("cannot cast {} to TIME", val.data_type()))
1029        }),
1030        DataType::Timestamp => val.clone().coerce_into(DataType::Timestamp).ok_or_else(|| {
1031            SqlError::InvalidValue(format!("cannot cast {} to TIMESTAMP", val.data_type()))
1032        }),
1033        DataType::Interval => val.clone().coerce_into(DataType::Interval).ok_or_else(|| {
1034            SqlError::InvalidValue(format!("cannot cast {} to INTERVAL", val.data_type()))
1035        }),
1036    }
1037}
1038
1039fn eval_scalar_function(name: &str, args: &[Expr], ctx: &EvalCtx) -> Result<Value> {
1040    let evaluated: Vec<Value> = args
1041        .iter()
1042        .map(|a| eval_expr(a, ctx))
1043        .collect::<Result<Vec<_>>>()?;
1044
1045    match name {
1046        "LENGTH" => {
1047            check_args(name, &evaluated, 1)?;
1048            match &evaluated[0] {
1049                Value::Null => Ok(Value::Null),
1050                Value::Text(s) => Ok(Value::Integer(s.chars().count() as i64)),
1051                Value::Blob(b) => Ok(Value::Integer(b.len() as i64)),
1052                _ => Ok(Value::Integer(
1053                    value_to_text(&evaluated[0]).chars().count() as i64
1054                )),
1055            }
1056        }
1057        "UPPER" => {
1058            check_args(name, &evaluated, 1)?;
1059            match &evaluated[0] {
1060                Value::Null => Ok(Value::Null),
1061                Value::Text(s) => Ok(Value::Text(s.to_ascii_uppercase())),
1062                _ => Ok(Value::Text(
1063                    value_to_text(&evaluated[0]).to_ascii_uppercase().into(),
1064                )),
1065            }
1066        }
1067        "LOWER" => {
1068            check_args(name, &evaluated, 1)?;
1069            match &evaluated[0] {
1070                Value::Null => Ok(Value::Null),
1071                Value::Text(s) => Ok(Value::Text(s.to_ascii_lowercase())),
1072                _ => Ok(Value::Text(
1073                    value_to_text(&evaluated[0]).to_ascii_lowercase().into(),
1074                )),
1075            }
1076        }
1077        "SUBSTR" | "SUBSTRING" => {
1078            if evaluated.len() < 2 || evaluated.len() > 3 {
1079                return Err(SqlError::InvalidValue(format!(
1080                    "{name} requires 2 or 3 arguments"
1081                )));
1082            }
1083            if evaluated.iter().any(|v| v.is_null()) {
1084                return Ok(Value::Null);
1085            }
1086            let s = value_to_text(&evaluated[0]);
1087            let chars: Vec<char> = s.chars().collect();
1088            let start = match &evaluated[1] {
1089                Value::Integer(i) => *i,
1090                _ => {
1091                    return Err(SqlError::TypeMismatch {
1092                        expected: "INTEGER".into(),
1093                        got: evaluated[1].data_type().to_string(),
1094                    })
1095                }
1096            };
1097            let len = chars.len() as i64;
1098
1099            let (begin, count) = if evaluated.len() == 3 {
1100                let cnt = match &evaluated[2] {
1101                    Value::Integer(i) => *i,
1102                    _ => {
1103                        return Err(SqlError::TypeMismatch {
1104                            expected: "INTEGER".into(),
1105                            got: evaluated[2].data_type().to_string(),
1106                        })
1107                    }
1108                };
1109                if start >= 1 {
1110                    let b = (start - 1).min(len) as usize;
1111                    let c = cnt.max(0) as usize;
1112                    (b, c)
1113                } else if start == 0 {
1114                    let c = (cnt - 1).max(0) as usize;
1115                    (0usize, c)
1116                } else {
1117                    let adjusted_cnt = (cnt + start - 1).max(0) as usize;
1118                    (0usize, adjusted_cnt)
1119                }
1120            } else if start >= 1 {
1121                let b = (start - 1).min(len) as usize;
1122                (b, chars.len() - b)
1123            } else if start == 0 {
1124                (0usize, chars.len())
1125            } else {
1126                let b = (len + start).max(0) as usize;
1127                (b, chars.len() - b)
1128            };
1129
1130            let result: String = chars.iter().skip(begin).take(count).collect();
1131            Ok(Value::Text(result.into()))
1132        }
1133        "TRIM" | "LTRIM" | "RTRIM" => {
1134            if evaluated.is_empty() || evaluated.len() > 2 {
1135                return Err(SqlError::InvalidValue(format!(
1136                    "{name} requires 1 or 2 arguments"
1137                )));
1138            }
1139            if evaluated[0].is_null() {
1140                return Ok(Value::Null);
1141            }
1142            let s = value_to_text(&evaluated[0]);
1143            let trim_chars: Vec<char> = if evaluated.len() == 2 {
1144                if evaluated[1].is_null() {
1145                    return Ok(Value::Null);
1146                }
1147                value_to_text(&evaluated[1]).chars().collect()
1148            } else {
1149                vec![' ']
1150            };
1151            let result = match name {
1152                "TRIM" => s
1153                    .trim_matches(|c: char| trim_chars.contains(&c))
1154                    .to_string(),
1155                "LTRIM" => s
1156                    .trim_start_matches(|c: char| trim_chars.contains(&c))
1157                    .to_string(),
1158                "RTRIM" => s
1159                    .trim_end_matches(|c: char| trim_chars.contains(&c))
1160                    .to_string(),
1161                _ => unreachable!(),
1162            };
1163            Ok(Value::Text(result.into()))
1164        }
1165        "REPLACE" => {
1166            check_args(name, &evaluated, 3)?;
1167            if evaluated.iter().any(|v| v.is_null()) {
1168                return Ok(Value::Null);
1169            }
1170            let s = value_to_text(&evaluated[0]);
1171            let from = value_to_text(&evaluated[1]);
1172            let to = value_to_text(&evaluated[2]);
1173            if from.is_empty() {
1174                return Ok(Value::Text(s.into()));
1175            }
1176            Ok(Value::Text(s.replace(&from, &to).into()))
1177        }
1178        "INSTR" => {
1179            check_args(name, &evaluated, 2)?;
1180            if evaluated.iter().any(|v| v.is_null()) {
1181                return Ok(Value::Null);
1182            }
1183            let haystack = value_to_text(&evaluated[0]);
1184            let needle = value_to_text(&evaluated[1]);
1185            let pos = haystack
1186                .find(&needle)
1187                .map(|i| haystack[..i].chars().count() as i64 + 1)
1188                .unwrap_or(0);
1189            Ok(Value::Integer(pos))
1190        }
1191        "CONCAT" => {
1192            if evaluated.is_empty() {
1193                return Ok(Value::Text(CompactString::default()));
1194            }
1195            let mut result = String::new();
1196            for v in &evaluated {
1197                match v {
1198                    Value::Null => {}
1199                    _ => result.push_str(&value_to_text(v)),
1200                }
1201            }
1202            Ok(Value::Text(result.into()))
1203        }
1204        "ABS" => {
1205            check_args(name, &evaluated, 1)?;
1206            match &evaluated[0] {
1207                Value::Null => Ok(Value::Null),
1208                Value::Integer(i) => i
1209                    .checked_abs()
1210                    .map(Value::Integer)
1211                    .ok_or(SqlError::IntegerOverflow),
1212                Value::Real(r) => Ok(Value::Real(r.abs())),
1213                _ => Err(SqlError::TypeMismatch {
1214                    expected: "numeric".into(),
1215                    got: evaluated[0].data_type().to_string(),
1216                }),
1217            }
1218        }
1219        "ROUND" => {
1220            if evaluated.is_empty() || evaluated.len() > 2 {
1221                return Err(SqlError::InvalidValue(
1222                    "ROUND requires 1 or 2 arguments".into(),
1223                ));
1224            }
1225            if evaluated[0].is_null() {
1226                return Ok(Value::Null);
1227            }
1228            let val = match &evaluated[0] {
1229                Value::Integer(i) => *i as f64,
1230                Value::Real(r) => *r,
1231                _ => {
1232                    return Err(SqlError::TypeMismatch {
1233                        expected: "numeric".into(),
1234                        got: evaluated[0].data_type().to_string(),
1235                    })
1236                }
1237            };
1238            let places = if evaluated.len() == 2 {
1239                match &evaluated[1] {
1240                    Value::Null => return Ok(Value::Null),
1241                    Value::Integer(i) => *i,
1242                    _ => {
1243                        return Err(SqlError::TypeMismatch {
1244                            expected: "INTEGER".into(),
1245                            got: evaluated[1].data_type().to_string(),
1246                        })
1247                    }
1248                }
1249            } else {
1250                0
1251            };
1252            let factor = 10f64.powi(places as i32);
1253            let rounded = (val * factor).round() / factor;
1254            Ok(Value::Real(rounded))
1255        }
1256        "CEIL" | "CEILING" => {
1257            check_args(name, &evaluated, 1)?;
1258            match &evaluated[0] {
1259                Value::Null => Ok(Value::Null),
1260                Value::Integer(i) => Ok(Value::Integer(*i)),
1261                Value::Real(r) => Ok(Value::Integer(r.ceil() as i64)),
1262                _ => Err(SqlError::TypeMismatch {
1263                    expected: "numeric".into(),
1264                    got: evaluated[0].data_type().to_string(),
1265                }),
1266            }
1267        }
1268        "FLOOR" => {
1269            check_args(name, &evaluated, 1)?;
1270            match &evaluated[0] {
1271                Value::Null => Ok(Value::Null),
1272                Value::Integer(i) => Ok(Value::Integer(*i)),
1273                Value::Real(r) => Ok(Value::Integer(r.floor() as i64)),
1274                _ => Err(SqlError::TypeMismatch {
1275                    expected: "numeric".into(),
1276                    got: evaluated[0].data_type().to_string(),
1277                }),
1278            }
1279        }
1280        "SIGN" => {
1281            check_args(name, &evaluated, 1)?;
1282            match &evaluated[0] {
1283                Value::Null => Ok(Value::Null),
1284                Value::Integer(i) => Ok(Value::Integer(i.signum())),
1285                Value::Real(r) => {
1286                    if *r > 0.0 {
1287                        Ok(Value::Integer(1))
1288                    } else if *r < 0.0 {
1289                        Ok(Value::Integer(-1))
1290                    } else {
1291                        Ok(Value::Integer(0))
1292                    }
1293                }
1294                _ => Err(SqlError::TypeMismatch {
1295                    expected: "numeric".into(),
1296                    got: evaluated[0].data_type().to_string(),
1297                }),
1298            }
1299        }
1300        "SQRT" => {
1301            check_args(name, &evaluated, 1)?;
1302            match &evaluated[0] {
1303                Value::Null => Ok(Value::Null),
1304                Value::Integer(i) => {
1305                    if *i < 0 {
1306                        Ok(Value::Null)
1307                    } else {
1308                        Ok(Value::Real((*i as f64).sqrt()))
1309                    }
1310                }
1311                Value::Real(r) => {
1312                    if *r < 0.0 {
1313                        Ok(Value::Null)
1314                    } else {
1315                        Ok(Value::Real(r.sqrt()))
1316                    }
1317                }
1318                _ => Err(SqlError::TypeMismatch {
1319                    expected: "numeric".into(),
1320                    got: evaluated[0].data_type().to_string(),
1321                }),
1322            }
1323        }
1324        "RANDOM" => {
1325            check_args(name, &evaluated, 0)?;
1326            use std::collections::hash_map::DefaultHasher;
1327            use std::hash::{Hash, Hasher};
1328            use std::time::SystemTime;
1329            let mut hasher = DefaultHasher::new();
1330            SystemTime::now().hash(&mut hasher);
1331            std::thread::current().id().hash(&mut hasher);
1332            let mut val = hasher.finish() as i64;
1333            if val == i64::MIN {
1334                val = i64::MAX;
1335            }
1336            Ok(Value::Integer(val))
1337        }
1338        "TYPEOF" => {
1339            check_args(name, &evaluated, 1)?;
1340            let type_name = match &evaluated[0] {
1341                Value::Null => "null",
1342                Value::Integer(_) => "integer",
1343                Value::Real(_) => "real",
1344                Value::Text(_) => "text",
1345                Value::Blob(_) => "blob",
1346                Value::Boolean(_) => "boolean",
1347                Value::Date(_) => "date",
1348                Value::Time(_) => "time",
1349                Value::Timestamp(_) => "timestamp",
1350                Value::Interval { .. } => "interval",
1351            };
1352            Ok(Value::Text(type_name.into()))
1353        }
1354        "MIN" => {
1355            check_args(name, &evaluated, 2)?;
1356            if evaluated[0].is_null() {
1357                return Ok(evaluated[1].clone());
1358            }
1359            if evaluated[1].is_null() {
1360                return Ok(evaluated[0].clone());
1361            }
1362            if evaluated[0] <= evaluated[1] {
1363                Ok(evaluated[0].clone())
1364            } else {
1365                Ok(evaluated[1].clone())
1366            }
1367        }
1368        "MAX" => {
1369            check_args(name, &evaluated, 2)?;
1370            if evaluated[0].is_null() {
1371                return Ok(evaluated[1].clone());
1372            }
1373            if evaluated[1].is_null() {
1374                return Ok(evaluated[0].clone());
1375            }
1376            if evaluated[0] >= evaluated[1] {
1377                Ok(evaluated[0].clone())
1378            } else {
1379                Ok(evaluated[1].clone())
1380            }
1381        }
1382        "HEX" => {
1383            check_args(name, &evaluated, 1)?;
1384            match &evaluated[0] {
1385                Value::Null => Ok(Value::Null),
1386                Value::Blob(b) => {
1387                    let mut s = String::with_capacity(b.len() * 2);
1388                    for byte in b {
1389                        s.push_str(&format!("{byte:02X}"));
1390                    }
1391                    Ok(Value::Text(s.into()))
1392                }
1393                Value::Text(s) => {
1394                    let mut r = String::with_capacity(s.len() * 2);
1395                    for byte in s.as_bytes() {
1396                        r.push_str(&format!("{byte:02X}"));
1397                    }
1398                    Ok(Value::Text(r.into()))
1399                }
1400                _ => Ok(Value::Text(value_to_text(&evaluated[0]).into())),
1401            }
1402        }
1403        "NOW" | "CURRENT_TIMESTAMP" | "LOCALTIMESTAMP" => {
1404            check_args(name, &evaluated, 0)?;
1405            Ok(Value::Timestamp(crate::datetime::txn_or_clock_micros()))
1406        }
1407        "CURRENT_DATE" => {
1408            check_args(name, &evaluated, 0)?;
1409            Ok(Value::Date(crate::datetime::ts_to_date_floor(
1410                crate::datetime::txn_or_clock_micros(),
1411            )))
1412        }
1413        "CURRENT_TIME" | "LOCALTIME" => {
1414            check_args(name, &evaluated, 0)?;
1415            Ok(Value::Time(
1416                crate::datetime::ts_split(crate::datetime::txn_or_clock_micros()).1,
1417            ))
1418        }
1419        "CLOCK_TIMESTAMP" | "STATEMENT_TIMESTAMP" | "TRANSACTION_TIMESTAMP" => {
1420            check_args(name, &evaluated, 0)?;
1421            let ts = match name {
1422                "CLOCK_TIMESTAMP" => crate::datetime::now_micros(),
1423                _ => crate::datetime::txn_or_clock_micros(),
1424            };
1425            Ok(Value::Timestamp(ts))
1426        }
1427        "EXTRACT" | "DATE_PART" | "DATEPART" => {
1428            check_args(name, &evaluated, 2)?;
1429            // Borrow the field str without allocating; datetime::extract accepts &str.
1430            let field: &str = match &evaluated[0] {
1431                Value::Null => return Ok(Value::Null),
1432                Value::Text(s) => s.as_str(),
1433                _ => {
1434                    return Err(SqlError::TypeMismatch {
1435                        expected: "TEXT field name".into(),
1436                        got: evaluated[0].data_type().to_string(),
1437                    })
1438                }
1439            };
1440            if evaluated[1].is_null() {
1441                return Ok(Value::Null);
1442            }
1443            crate::datetime::extract(field, &evaluated[1])
1444        }
1445        "DATE_TRUNC" => {
1446            if evaluated.len() < 2 || evaluated.len() > 3 {
1447                return Err(SqlError::InvalidValue(
1448                    "DATE_TRUNC requires 2 or 3 arguments".into(),
1449                ));
1450            }
1451            let unit = match &evaluated[0] {
1452                Value::Null => return Ok(Value::Null),
1453                Value::Text(s) => s.to_string(),
1454                _ => {
1455                    return Err(SqlError::TypeMismatch {
1456                        expected: "TEXT unit name".into(),
1457                        got: evaluated[0].data_type().to_string(),
1458                    })
1459                }
1460            };
1461            if evaluated[1].is_null() {
1462                return Ok(Value::Null);
1463            }
1464            // Optional tz arg: truncate in that zone, then convert back to UTC.
1465            if evaluated.len() == 3 {
1466                if let Value::Text(tz) = &evaluated[2] {
1467                    if !tz.eq_ignore_ascii_case("UTC") {
1468                        if let Value::Timestamp(ts) = &evaluated[1] {
1469                            return date_trunc_in_zone(&unit, *ts, tz);
1470                        }
1471                    }
1472                }
1473            }
1474            crate::datetime::date_trunc(&unit, &evaluated[1])
1475        }
1476        "DATE_BIN" => {
1477            check_args(name, &evaluated, 3)?;
1478            if evaluated.iter().any(|v| v.is_null()) {
1479                return Ok(Value::Null);
1480            }
1481            let stride = match &evaluated[0] {
1482                Value::Interval {
1483                    months: _,
1484                    days,
1485                    micros,
1486                } => *days as i64 * crate::datetime::MICROS_PER_DAY + *micros,
1487                _ => {
1488                    return Err(SqlError::TypeMismatch {
1489                        expected: "INTERVAL stride".into(),
1490                        got: evaluated[0].data_type().to_string(),
1491                    })
1492                }
1493            };
1494            if stride <= 0 {
1495                return Err(SqlError::InvalidValue(
1496                    "DATE_BIN stride must be positive".into(),
1497                ));
1498            }
1499            let (src, origin) = match (&evaluated[1], &evaluated[2]) {
1500                (Value::Timestamp(s), Value::Timestamp(o)) => (*s, *o),
1501                _ => {
1502                    return Err(SqlError::TypeMismatch {
1503                        expected: "TIMESTAMP, TIMESTAMP".into(),
1504                        got: format!("{}, {}", evaluated[1].data_type(), evaluated[2].data_type()),
1505                    })
1506                }
1507            };
1508            let diff = src - origin;
1509            let binned = origin + (diff.div_euclid(stride)) * stride;
1510            Ok(Value::Timestamp(binned))
1511        }
1512        "AGE" => {
1513            if evaluated.len() == 1 {
1514                if evaluated[0].is_null() {
1515                    return Ok(Value::Null);
1516                }
1517                let ts = match &evaluated[0] {
1518                    Value::Timestamp(t) => *t,
1519                    Value::Date(d) => crate::datetime::date_to_ts(*d),
1520                    _ => {
1521                        return Err(SqlError::TypeMismatch {
1522                            expected: "TIMESTAMP or DATE".into(),
1523                            got: evaluated[0].data_type().to_string(),
1524                        })
1525                    }
1526                };
1527                // Implicit reference: today at midnight UTC.
1528                let today = crate::datetime::today_days();
1529                let midnight = crate::datetime::date_to_ts(today);
1530                let (m, d, u) = crate::datetime::age(midnight, ts)?;
1531                return Ok(Value::Interval {
1532                    months: m,
1533                    days: d,
1534                    micros: u,
1535                });
1536            }
1537            check_args(name, &evaluated, 2)?;
1538            if evaluated.iter().any(|v| v.is_null()) {
1539                return Ok(Value::Null);
1540            }
1541            let a = ts_of(&evaluated[0])?;
1542            let b = ts_of(&evaluated[1])?;
1543            let (m, d, u) = crate::datetime::age(a, b)?;
1544            Ok(Value::Interval {
1545                months: m,
1546                days: d,
1547                micros: u,
1548            })
1549        }
1550        "MAKE_DATE" => {
1551            check_args(name, &evaluated, 3)?;
1552            if evaluated.iter().any(|v| v.is_null()) {
1553                return Ok(Value::Null);
1554            }
1555            let y = int_arg(&evaluated[0], "MAKE_DATE year")? as i32;
1556            let m = int_arg(&evaluated[1], "MAKE_DATE month")? as u8;
1557            let d = int_arg(&evaluated[2], "MAKE_DATE day")? as u8;
1558            crate::datetime::ymd_to_days(y, m, d)
1559                .map(Value::Date)
1560                .ok_or_else(|| SqlError::InvalidDateLiteral(format!("make_date({y}, {m}, {d})")))
1561        }
1562        "MAKE_TIME" => {
1563            check_args(name, &evaluated, 3)?;
1564            if evaluated.iter().any(|v| v.is_null()) {
1565                return Ok(Value::Null);
1566            }
1567            let h = int_arg(&evaluated[0], "MAKE_TIME hour")? as u8;
1568            let mi = int_arg(&evaluated[1], "MAKE_TIME minute")? as u8;
1569            let (s, us) = real_sec_arg(&evaluated[2])?;
1570            crate::datetime::hmsn_to_micros(h, mi, s, us)
1571                .map(Value::Time)
1572                .ok_or_else(|| SqlError::InvalidTimeLiteral(format!("make_time({h}, {mi}, ...)")))
1573        }
1574        "MAKE_TIMESTAMP" => {
1575            check_args(name, &evaluated, 6)?;
1576            if evaluated.iter().any(|v| v.is_null()) {
1577                return Ok(Value::Null);
1578            }
1579            let y = int_arg(&evaluated[0], "MAKE_TIMESTAMP year")? as i32;
1580            let mo = int_arg(&evaluated[1], "MAKE_TIMESTAMP month")? as u8;
1581            let d = int_arg(&evaluated[2], "MAKE_TIMESTAMP day")? as u8;
1582            let h = int_arg(&evaluated[3], "MAKE_TIMESTAMP hour")? as u8;
1583            let mi = int_arg(&evaluated[4], "MAKE_TIMESTAMP min")? as u8;
1584            let (s, us) = real_sec_arg(&evaluated[5])?;
1585            let days = crate::datetime::ymd_to_days(y, mo, d).ok_or_else(|| {
1586                SqlError::InvalidTimestampLiteral(format!("make_timestamp year={y}"))
1587            })?;
1588            let tmicros = crate::datetime::hmsn_to_micros(h, mi, s, us)
1589                .ok_or_else(|| SqlError::InvalidTimestampLiteral("time out of range".into()))?;
1590            Ok(Value::Timestamp(crate::datetime::ts_combine(days, tmicros)))
1591        }
1592        "MAKE_INTERVAL" => {
1593            // Positional args: years, months, weeks, days, hours, mins, secs.
1594            if evaluated.len() > 7 {
1595                return Err(SqlError::InvalidValue(
1596                    "MAKE_INTERVAL accepts at most 7 arguments".into(),
1597                ));
1598            }
1599            let mut months: i64 = 0;
1600            let mut days: i64 = 0;
1601            let mut micros: i64 = 0;
1602            for (i, v) in evaluated.iter().enumerate() {
1603                if v.is_null() {
1604                    continue;
1605                }
1606                let n = match v {
1607                    Value::Integer(n) => *n,
1608                    Value::Real(r) => *r as i64,
1609                    _ => {
1610                        return Err(SqlError::TypeMismatch {
1611                            expected: "numeric".into(),
1612                            got: v.data_type().to_string(),
1613                        })
1614                    }
1615                };
1616                match i {
1617                    0 => months = months.saturating_add(n.saturating_mul(12)),
1618                    1 => months = months.saturating_add(n),
1619                    2 => days = days.saturating_add(n.saturating_mul(7)),
1620                    3 => days = days.saturating_add(n),
1621                    4 => {
1622                        micros = micros
1623                            .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_HOUR))
1624                    }
1625                    5 => {
1626                        micros =
1627                            micros.saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_MIN))
1628                    }
1629                    6 => {
1630                        // Seconds may be fractional — also check Real.
1631                        if let Value::Real(r) = v {
1632                            micros = micros.saturating_add(
1633                                (*r * crate::datetime::MICROS_PER_SEC as f64) as i64,
1634                            );
1635                        } else {
1636                            micros = micros
1637                                .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_SEC));
1638                        }
1639                    }
1640                    _ => unreachable!(),
1641                }
1642            }
1643            Ok(Value::Interval {
1644                months: months.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
1645                days: days.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
1646                micros,
1647            })
1648        }
1649        "JUSTIFY_DAYS" => {
1650            check_args(name, &evaluated, 1)?;
1651            match &evaluated[0] {
1652                Value::Null => Ok(Value::Null),
1653                Value::Interval {
1654                    months,
1655                    days,
1656                    micros,
1657                } => {
1658                    let (m, d, u) = crate::datetime::justify_days(*months, *days, *micros);
1659                    Ok(Value::Interval {
1660                        months: m,
1661                        days: d,
1662                        micros: u,
1663                    })
1664                }
1665                other => Err(SqlError::TypeMismatch {
1666                    expected: "INTERVAL".into(),
1667                    got: other.data_type().to_string(),
1668                }),
1669            }
1670        }
1671        "JUSTIFY_HOURS" => {
1672            check_args(name, &evaluated, 1)?;
1673            match &evaluated[0] {
1674                Value::Null => Ok(Value::Null),
1675                Value::Interval {
1676                    months,
1677                    days,
1678                    micros,
1679                } => {
1680                    let (m, d, u) = crate::datetime::justify_hours(*months, *days, *micros);
1681                    Ok(Value::Interval {
1682                        months: m,
1683                        days: d,
1684                        micros: u,
1685                    })
1686                }
1687                other => Err(SqlError::TypeMismatch {
1688                    expected: "INTERVAL".into(),
1689                    got: other.data_type().to_string(),
1690                }),
1691            }
1692        }
1693        "JUSTIFY_INTERVAL" => {
1694            check_args(name, &evaluated, 1)?;
1695            match &evaluated[0] {
1696                Value::Null => Ok(Value::Null),
1697                Value::Interval {
1698                    months,
1699                    days,
1700                    micros,
1701                } => {
1702                    let (m, d, u) = crate::datetime::justify_interval(*months, *days, *micros);
1703                    Ok(Value::Interval {
1704                        months: m,
1705                        days: d,
1706                        micros: u,
1707                    })
1708                }
1709                other => Err(SqlError::TypeMismatch {
1710                    expected: "INTERVAL".into(),
1711                    got: other.data_type().to_string(),
1712                }),
1713            }
1714        }
1715        "ISFINITE" => {
1716            check_args(name, &evaluated, 1)?;
1717            if evaluated[0].is_null() {
1718                return Ok(Value::Null);
1719            }
1720            Ok(Value::Boolean(evaluated[0].is_finite_temporal()))
1721        }
1722        "DATE" => {
1723            if evaluated.is_empty() {
1724                return Err(SqlError::InvalidValue(
1725                    "DATE requires at least 1 argument".into(),
1726                ));
1727            }
1728            if evaluated[0].is_null() {
1729                return Ok(Value::Null);
1730            }
1731            let d = match &evaluated[0] {
1732                Value::Date(d) => *d,
1733                Value::Timestamp(t) => crate::datetime::ts_to_date_floor(*t),
1734                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::today_days(),
1735                Value::Text(s) => crate::datetime::parse_date(s)?,
1736                Value::Integer(n) => {
1737                    crate::datetime::ts_to_date_floor(*n * crate::datetime::MICROS_PER_SEC)
1738                }
1739                other => {
1740                    return Err(SqlError::TypeMismatch {
1741                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
1742                        got: other.data_type().to_string(),
1743                    })
1744                }
1745            };
1746            Ok(Value::Date(d))
1747        }
1748        "TIME" => {
1749            if evaluated.is_empty() {
1750                return Err(SqlError::InvalidValue(
1751                    "TIME requires at least 1 argument".into(),
1752                ));
1753            }
1754            if evaluated[0].is_null() {
1755                return Ok(Value::Null);
1756            }
1757            let t = match &evaluated[0] {
1758                Value::Time(t) => *t,
1759                Value::Timestamp(t) => crate::datetime::ts_split(*t).1,
1760                Value::Text(s) if s.eq_ignore_ascii_case("now") => {
1761                    crate::datetime::current_time_micros()
1762                }
1763                Value::Text(s) => crate::datetime::parse_time(s)?,
1764                other => {
1765                    return Err(SqlError::TypeMismatch {
1766                        expected: "TIMESTAMP, TIME, or TEXT".into(),
1767                        got: other.data_type().to_string(),
1768                    })
1769                }
1770            };
1771            Ok(Value::Time(t))
1772        }
1773        "DATETIME" => {
1774            if evaluated.is_empty() {
1775                return Err(SqlError::InvalidValue(
1776                    "DATETIME requires at least 1 argument".into(),
1777                ));
1778            }
1779            if evaluated[0].is_null() {
1780                return Ok(Value::Null);
1781            }
1782            let t = match &evaluated[0] {
1783                Value::Timestamp(t) => *t,
1784                Value::Date(d) => crate::datetime::date_to_ts(*d),
1785                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::now_micros(),
1786                Value::Text(s) => crate::datetime::parse_timestamp(s)?,
1787                Value::Integer(n) => n * crate::datetime::MICROS_PER_SEC,
1788                other => {
1789                    return Err(SqlError::TypeMismatch {
1790                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
1791                        got: other.data_type().to_string(),
1792                    })
1793                }
1794            };
1795            Ok(Value::Timestamp(t))
1796        }
1797        "STRFTIME" => {
1798            if evaluated.len() < 2 {
1799                return Err(SqlError::InvalidValue(
1800                    "STRFTIME requires format + value".into(),
1801                ));
1802            }
1803            if evaluated.iter().take(2).any(|v| v.is_null()) {
1804                return Ok(Value::Null);
1805            }
1806            let fmt = match &evaluated[0] {
1807                Value::Text(s) => s.to_string(),
1808                _ => {
1809                    return Err(SqlError::TypeMismatch {
1810                        expected: "TEXT format".into(),
1811                        got: evaluated[0].data_type().to_string(),
1812                    })
1813                }
1814            };
1815            let out = crate::datetime::strftime(&fmt, &evaluated[1])?;
1816            Ok(Value::Text(out.into()))
1817        }
1818        "JULIANDAY" => {
1819            if evaluated.is_empty() {
1820                return Err(SqlError::InvalidValue(
1821                    "JULIANDAY requires at least 1 argument".into(),
1822                ));
1823            }
1824            if evaluated[0].is_null() {
1825                return Ok(Value::Null);
1826            }
1827            let micros = ts_of(&evaluated[0])?;
1828            let (days, tmicros) = crate::datetime::ts_split(micros);
1829            // Julian Day 2440587.5 = 1970-01-01 00:00:00 UTC (Julian days start at noon).
1830            let julian =
1831                days as f64 + 2_440_587.5 + tmicros as f64 / crate::datetime::MICROS_PER_DAY as f64;
1832            Ok(Value::Real(julian))
1833        }
1834        "UNIXEPOCH" => {
1835            if evaluated.is_empty() {
1836                return Err(SqlError::InvalidValue(
1837                    "UNIXEPOCH requires at least 1 argument".into(),
1838                ));
1839            }
1840            if evaluated[0].is_null() {
1841                return Ok(Value::Null);
1842            }
1843            let micros = ts_of(&evaluated[0])?;
1844            let subsec = evaluated
1845                .get(1)
1846                .and_then(|v| {
1847                    if let Value::Text(s) = v {
1848                        Some(s.to_string())
1849                    } else {
1850                        None
1851                    }
1852                })
1853                .map(|s| s.eq_ignore_ascii_case("subsec") || s.eq_ignore_ascii_case("subsecond"))
1854                .unwrap_or(false);
1855            if subsec {
1856                Ok(Value::Real(
1857                    micros as f64 / crate::datetime::MICROS_PER_SEC as f64,
1858                ))
1859            } else {
1860                Ok(Value::Integer(micros / crate::datetime::MICROS_PER_SEC))
1861            }
1862        }
1863        "TIMEDIFF" => {
1864            check_args(name, &evaluated, 2)?;
1865            if evaluated.iter().any(|v| v.is_null()) {
1866                return Ok(Value::Null);
1867            }
1868            let a = ts_of(&evaluated[0])?;
1869            let b = ts_of(&evaluated[1])?;
1870            let (days, micros) = crate::datetime::subtract_timestamps(a, b);
1871            let sign = if days < 0 || (days == 0 && micros < 0) {
1872                "-"
1873            } else {
1874                "+"
1875            };
1876            let abs_days = days.unsigned_abs() as i64;
1877            let abs_us = micros.unsigned_abs() as i64;
1878            // PG-compat format string: "(+|-)YYYY-MM-DD HH:MM:SS.SSS", days-only.
1879            let (h, m, s, us) = crate::datetime::micros_to_hmsn(abs_us);
1880            Ok(Value::Text(
1881                format!("{sign}{abs_days:04}-00-00 {h:02}:{m:02}:{s:02}.{us:06}").into(),
1882            ))
1883        }
1884        "AT_TIMEZONE" => {
1885            check_args(name, &evaluated, 2)?;
1886            if evaluated.iter().any(|v| v.is_null()) {
1887                return Ok(Value::Null);
1888            }
1889            let ts = match &evaluated[0] {
1890                Value::Timestamp(t) => *t,
1891                Value::Date(d) => crate::datetime::date_to_ts(*d),
1892                other => {
1893                    return Err(SqlError::TypeMismatch {
1894                        expected: "TIMESTAMP or DATE".into(),
1895                        got: other.data_type().to_string(),
1896                    })
1897                }
1898            };
1899            let zone = match &evaluated[1] {
1900                Value::Text(s) => s.to_string(),
1901                _ => {
1902                    return Err(SqlError::TypeMismatch {
1903                        expected: "TEXT time zone".into(),
1904                        got: evaluated[1].data_type().to_string(),
1905                    })
1906                }
1907            };
1908            // Reject POSIX-style 'UTC+5' (ambiguous sign convention).
1909            let upper = zone.to_ascii_uppercase();
1910            if (upper.starts_with("UTC+") || upper.starts_with("UTC-")) && zone.len() > 3 {
1911                return Err(SqlError::InvalidTimezone(format!(
1912                    "'{zone}' is ambiguous — use ISO-8601 offset like '+05:00' or named zone like 'Etc/GMT-5'"
1913                )));
1914            }
1915            let formatted = crate::datetime::format_timestamp_in_zone(ts, &zone)?;
1916            Ok(Value::Text(formatted.into()))
1917        }
1918        _ => Err(SqlError::Unsupported(format!("scalar function: {name}"))),
1919    }
1920}
1921
1922/// Extract a timestamp (µs UTC) from a Value, coercing DATE → midnight.
1923fn ts_of(v: &Value) -> Result<i64> {
1924    match v {
1925        Value::Timestamp(t) => Ok(*t),
1926        Value::Date(d) => Ok(crate::datetime::date_to_ts(*d)),
1927        _ => Err(SqlError::TypeMismatch {
1928            expected: "TIMESTAMP or DATE".into(),
1929            got: v.data_type().to_string(),
1930        }),
1931    }
1932}
1933
1934fn int_arg(v: &Value, label: &str) -> Result<i64> {
1935    match v {
1936        Value::Integer(n) => Ok(*n),
1937        _ => Err(SqlError::TypeMismatch {
1938            expected: format!("INTEGER ({label})"),
1939            got: v.data_type().to_string(),
1940        }),
1941    }
1942}
1943
1944/// Extract (whole_seconds: u8, frac_micros: u32) from a numeric argument for MAKE_TIME-style calls.
1945fn real_sec_arg(v: &Value) -> Result<(u8, u32)> {
1946    match v {
1947        Value::Integer(n) => {
1948            if !(0..=60).contains(n) {
1949                return Err(SqlError::InvalidValue(format!("second out of range: {n}")));
1950            }
1951            Ok((*n as u8, 0))
1952        }
1953        Value::Real(r) => {
1954            let whole = r.trunc() as i64;
1955            if !(0..=60).contains(&whole) {
1956                return Err(SqlError::InvalidValue(format!("second out of range: {r}")));
1957            }
1958            let frac = ((r - whole as f64) * 1_000_000.0).round() as i64;
1959            Ok((whole as u8, frac.max(0) as u32))
1960        }
1961        _ => Err(SqlError::TypeMismatch {
1962            expected: "numeric seconds".into(),
1963            got: v.data_type().to_string(),
1964        }),
1965    }
1966}
1967
1968/// DATE_TRUNC with a non-UTC IANA zone: convert → truncate in that zone → convert back to UTC.
1969fn date_trunc_in_zone(unit: &str, ts_utc: i64, tz: &str) -> Result<Value> {
1970    use jiff::{tz::TimeZone, Timestamp as JTimestamp};
1971    let zone = TimeZone::get(tz).map_err(|e| SqlError::InvalidTimezone(format!("{tz}: {e}")))?;
1972    let ts = JTimestamp::from_microsecond(ts_utc)
1973        .map_err(|e| SqlError::InvalidValue(format!("ts: {e}")))?;
1974    let zoned = ts.to_zoned(zone.clone());
1975    let unit_lower = unit.to_ascii_lowercase();
1976    let rounded = match unit_lower.as_str() {
1977        "microseconds" => return Ok(Value::Timestamp(ts_utc)),
1978        "second" => zoned
1979            .start_of_day()
1980            .map_err(|e| SqlError::InvalidValue(format!("{e}")))?,
1981        _ => {
1982            let naive_ts = zoned.timestamp().as_microsecond();
1983            return crate::datetime::date_trunc(unit, &Value::Timestamp(naive_ts));
1984        }
1985    };
1986    Ok(Value::Timestamp(rounded.timestamp().as_microsecond()))
1987}
1988
1989fn check_args(name: &str, args: &[Value], expected: usize) -> Result<()> {
1990    if args.len() != expected {
1991        Err(SqlError::InvalidValue(format!(
1992            "{name} requires {expected} argument(s), got {}",
1993            args.len()
1994        )))
1995    } else {
1996        Ok(())
1997    }
1998}
1999
2000pub fn referenced_columns(expr: &Expr, columns: &[ColumnDef]) -> Vec<usize> {
2001    let mut indices = Vec::new();
2002    collect_column_refs(expr, columns, &mut indices);
2003    indices.sort_unstable();
2004    indices.dedup();
2005    indices
2006}
2007
2008fn collect_column_refs(expr: &Expr, columns: &[ColumnDef], out: &mut Vec<usize>) {
2009    match expr {
2010        Expr::Column(name) => {
2011            for (i, c) in columns.iter().enumerate() {
2012                if c.name == *name || c.name.ends_with(&format!(".{name}")) {
2013                    out.push(i);
2014                    break;
2015                }
2016            }
2017        }
2018        Expr::QualifiedColumn { table, column } => {
2019            let qualified = format!("{table}.{column}");
2020            if let Some(idx) = columns.iter().position(|c| c.name == qualified) {
2021                out.push(idx);
2022            } else {
2023                let matches: Vec<usize> = columns
2024                    .iter()
2025                    .enumerate()
2026                    .filter(|(_, c)| c.name == *column)
2027                    .map(|(i, _)| i)
2028                    .collect();
2029                if matches.len() == 1 {
2030                    out.push(matches[0]);
2031                }
2032            }
2033        }
2034        Expr::BinaryOp { left, right, .. } => {
2035            collect_column_refs(left, columns, out);
2036            collect_column_refs(right, columns, out);
2037        }
2038        Expr::UnaryOp { expr, .. } => {
2039            collect_column_refs(expr, columns, out);
2040        }
2041        Expr::IsNull(e) | Expr::IsNotNull(e) => {
2042            collect_column_refs(e, columns, out);
2043        }
2044        Expr::Function { args, .. } => {
2045            for arg in args {
2046                collect_column_refs(arg, columns, out);
2047            }
2048        }
2049        Expr::InSubquery { expr, .. } => {
2050            collect_column_refs(expr, columns, out);
2051        }
2052        Expr::InList { expr, list, .. } => {
2053            collect_column_refs(expr, columns, out);
2054            for item in list {
2055                collect_column_refs(item, columns, out);
2056            }
2057        }
2058        Expr::InSet { expr, .. } => {
2059            collect_column_refs(expr, columns, out);
2060        }
2061        Expr::Between {
2062            expr, low, high, ..
2063        } => {
2064            collect_column_refs(expr, columns, out);
2065            collect_column_refs(low, columns, out);
2066            collect_column_refs(high, columns, out);
2067        }
2068        Expr::Like {
2069            expr,
2070            pattern,
2071            escape,
2072            ..
2073        } => {
2074            collect_column_refs(expr, columns, out);
2075            collect_column_refs(pattern, columns, out);
2076            if let Some(esc) = escape {
2077                collect_column_refs(esc, columns, out);
2078            }
2079        }
2080        Expr::Case {
2081            operand,
2082            conditions,
2083            else_result,
2084        } => {
2085            if let Some(op) = operand {
2086                collect_column_refs(op, columns, out);
2087            }
2088            for (when, then) in conditions {
2089                collect_column_refs(when, columns, out);
2090                collect_column_refs(then, columns, out);
2091            }
2092            if let Some(e) = else_result {
2093                collect_column_refs(e, columns, out);
2094            }
2095        }
2096        Expr::Coalesce(args) => {
2097            for arg in args {
2098                collect_column_refs(arg, columns, out);
2099            }
2100        }
2101        Expr::Cast { expr, .. } => {
2102            collect_column_refs(expr, columns, out);
2103        }
2104        Expr::WindowFunction { args, spec, .. } => {
2105            for arg in args {
2106                collect_column_refs(arg, columns, out);
2107            }
2108            for pb in &spec.partition_by {
2109                collect_column_refs(pb, columns, out);
2110            }
2111            for ob in &spec.order_by {
2112                collect_column_refs(&ob.expr, columns, out);
2113            }
2114        }
2115        Expr::Literal(_)
2116        | Expr::Parameter(_)
2117        | Expr::CountStar
2118        | Expr::Exists { .. }
2119        | Expr::ScalarSubquery(_) => {}
2120    }
2121}
2122
2123/// Check if an expression result is truthy (for WHERE/HAVING).
2124pub fn is_truthy(val: &Value) -> bool {
2125    match val {
2126        Value::Boolean(b) => *b,
2127        Value::Integer(i) => *i != 0,
2128        Value::Null => false,
2129        _ => true,
2130    }
2131}
2132
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::DataType;

    /// Build a `ColumnDef` with no default, no CHECK constraint and no time-zone flag.
    fn col(name: &str, dt: DataType, nullable: bool, pos: u16) -> ColumnDef {
        ColumnDef {
            name: name.into(),
            data_type: dt,
            nullable,
            position: pos,
            default_expr: None,
            default_sql: None,
            check_expr: None,
            check_sql: None,
            check_name: None,
            is_with_timezone: false,
        }
    }

    /// Schema shared by every test: id INTEGER, name TEXT, score REAL, active BOOLEAN.
    fn test_columns() -> Vec<ColumnDef> {
        vec![
            col("id", DataType::Integer, false, 0),
            col("name", DataType::Text, true, 1),
            col("score", DataType::Real, true, 2),
            col("active", DataType::Boolean, false, 3),
        ]
    }

    /// Fully populated row matching `test_columns`.
    fn test_row() -> Vec<Value> {
        vec![
            Value::Integer(1),
            Value::Text("Alice".into()),
            Value::Real(95.5),
            Value::Boolean(true),
        ]
    }

    /// Row matching `test_columns` whose `name` and `score` are NULL.
    fn null_row() -> Vec<Value> {
        vec![
            Value::Integer(1),
            Value::Null,
            Value::Null,
            Value::Boolean(true),
        ]
    }

    /// Evaluate `expr` against `row` using the shared test schema.
    fn eval(expr: &Expr, row: Vec<Value>) -> Result<Value> {
        let cols = test_columns();
        let cm = ColumnMap::new(&cols);
        let ctx = EvalCtx::new(&cm, &row);
        eval_expr(expr, &ctx)
    }

    /// Unqualified column reference.
    fn column(name: &str) -> Expr {
        Expr::Column(name.into())
    }

    /// Literal expression.
    fn lit(v: Value) -> Expr {
        Expr::Literal(v)
    }

    /// Binary operation node.
    fn binary(left: Expr, op: BinOp, right: Expr) -> Expr {
        Expr::BinaryOp {
            left: Box::new(left),
            op,
            right: Box::new(right),
        }
    }

    #[test]
    fn eval_literal() {
        let got = eval(&lit(Value::Integer(42)), test_row()).unwrap();
        assert_eq!(got, Value::Integer(42));
    }

    #[test]
    fn eval_column_ref() {
        let got = eval(&column("name"), test_row()).unwrap();
        assert_eq!(got, Value::Text("Alice".into()));
    }

    #[test]
    fn eval_column_case_insensitive() {
        // NOTE(review): this uses the same lowercase identifier as `eval_column_ref`,
        // so a mixed-case lookup is not actually exercised here.
        let got = eval(&column("name"), test_row()).unwrap();
        assert_eq!(got, Value::Text("Alice".into()));
    }

    #[test]
    fn eval_arithmetic_int() {
        let expr = binary(column("id"), BinOp::Add, lit(Value::Integer(10)));
        assert_eq!(eval(&expr, test_row()).unwrap(), Value::Integer(11));
    }

    #[test]
    fn eval_comparison() {
        let expr = binary(column("score"), BinOp::Gt, lit(Value::Real(90.0)));
        assert_eq!(eval(&expr, test_row()).unwrap(), Value::Boolean(true));
    }

    #[test]
    fn eval_null_propagation() {
        let expr = binary(column("name"), BinOp::Eq, lit(Value::Text("test".into())));
        assert!(eval(&expr, null_row()).unwrap().is_null());
    }

    #[test]
    fn eval_and_three_valued() {
        // NULL AND false = false
        let and_false = binary(column("name"), BinOp::And, lit(Value::Boolean(false)));
        assert_eq!(eval(&and_false, null_row()).unwrap(), Value::Boolean(false));

        // NULL AND true = NULL
        let and_true = binary(column("name"), BinOp::And, lit(Value::Boolean(true)));
        assert!(eval(&and_true, null_row()).unwrap().is_null());
    }

    #[test]
    fn eval_or_three_valued() {
        // NULL OR true = true
        let or_true = binary(column("name"), BinOp::Or, lit(Value::Boolean(true)));
        assert_eq!(eval(&or_true, null_row()).unwrap(), Value::Boolean(true));

        // NULL OR false = NULL
        let or_false = binary(column("name"), BinOp::Or, lit(Value::Boolean(false)));
        assert!(eval(&or_false, null_row()).unwrap().is_null());
    }

    #[test]
    fn eval_is_null() {
        let is_null = Expr::IsNull(Box::new(column("name")));
        assert_eq!(eval(&is_null, null_row()).unwrap(), Value::Boolean(true));

        let is_not_null = Expr::IsNotNull(Box::new(column("id")));
        assert_eq!(eval(&is_not_null, null_row()).unwrap(), Value::Boolean(true));
    }

    #[test]
    fn eval_not() {
        let expr = Expr::UnaryOp {
            op: UnaryOp::Not,
            expr: Box::new(column("active")),
        };
        assert_eq!(eval(&expr, test_row()).unwrap(), Value::Boolean(false));
    }

    #[test]
    fn eval_neg() {
        let expr = Expr::UnaryOp {
            op: UnaryOp::Neg,
            expr: Box::new(column("id")),
        };
        assert_eq!(eval(&expr, test_row()).unwrap(), Value::Integer(-1));
    }

    #[test]
    fn eval_division_by_zero() {
        let expr = binary(column("id"), BinOp::Div, lit(Value::Integer(0)));
        assert!(matches!(
            eval(&expr, test_row()),
            Err(SqlError::DivisionByZero)
        ));
    }

    #[test]
    fn eval_mixed_numeric() {
        // id (int 1) + score (real 95.5) = real 96.5
        let expr = binary(column("id"), BinOp::Add, column("score"));
        assert_eq!(eval(&expr, test_row()).unwrap(), Value::Real(96.5));
    }

    #[test]
    fn is_truthy_values() {
        assert!(is_truthy(&Value::Boolean(true)));
        assert!(!is_truthy(&Value::Boolean(false)));
        assert!(!is_truthy(&Value::Null));
        assert!(is_truthy(&Value::Integer(1)));
        assert!(!is_truthy(&Value::Integer(0)));
    }
}