Skip to main content

citadel_sql/
eval.rs

1//! Expression evaluator with SQL three-valued logic.
2
3use rustc_hash::FxHashMap;
4
5use crate::error::{Result, SqlError};
6use crate::parser::{BinOp, Expr, UnaryOp};
7use crate::types::{ColumnDef, CompactString, DataType, Value};
8
9pub struct ColumnMap {
10    exact: FxHashMap<String, usize>,
11    short: FxHashMap<String, ShortMatch>,
12    collations: Vec<crate::types::Collation>,
13    has_non_binary_collation: bool,
14}
15
16#[derive(Clone)]
17enum ShortMatch {
18    Unique(usize),
19    Ambiguous,
20}
21
22impl Clone for ColumnMap {
23    fn clone(&self) -> Self {
24        Self {
25            exact: self.exact.clone(),
26            short: self.short.clone(),
27            collations: self.collations.clone(),
28            has_non_binary_collation: self.has_non_binary_collation,
29        }
30    }
31}
32
33impl ColumnMap {
34    pub fn new(columns: &[ColumnDef]) -> Self {
35        let mut exact = FxHashMap::with_capacity_and_hasher(columns.len() * 2, Default::default());
36        let mut short: FxHashMap<String, ShortMatch> =
37            FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
38        let mut collations = Vec::with_capacity(columns.len());
39        let mut has_non_binary_collation = false;
40
41        for (i, col) in columns.iter().enumerate() {
42            let lower = col.name.to_ascii_lowercase();
43            exact.insert(lower.clone(), i);
44
45            let unqualified = if let Some(dot) = lower.rfind('.') {
46                &lower[dot + 1..]
47            } else {
48                &lower
49            };
50            short
51                .entry(unqualified.to_string())
52                .and_modify(|e| *e = ShortMatch::Ambiguous)
53                .or_insert(ShortMatch::Unique(i));
54            collations.push(col.collation);
55            if col.collation != crate::types::Collation::Binary {
56                has_non_binary_collation = true;
57            }
58        }
59
60        Self {
61            exact,
62            short,
63            collations,
64            has_non_binary_collation,
65        }
66    }
67
68    pub(crate) fn collation_at(&self, idx: usize) -> crate::types::Collation {
69        self.collations
70            .get(idx)
71            .copied()
72            .unwrap_or(crate::types::Collation::Binary)
73    }
74
75    #[inline]
76    pub(crate) fn has_non_binary_collation(&self) -> bool {
77        self.has_non_binary_collation
78    }
79
80    pub(crate) fn resolve(&self, name: &str) -> Result<usize> {
81        if let Some(&idx) = self.exact.get(name) {
82            return Ok(idx);
83        }
84        match self.short.get(name) {
85            Some(ShortMatch::Unique(idx)) => Ok(*idx),
86            Some(ShortMatch::Ambiguous) => Err(SqlError::AmbiguousColumn(name.to_string())),
87            None => Err(SqlError::ColumnNotFound(name.to_string())),
88        }
89    }
90
91    pub(crate) fn resolve_qualified(&self, table: &str, column: &str) -> Result<usize> {
92        let qualified = format!("{table}.{column}");
93        if let Some(&idx) = self.exact.get(&qualified) {
94            return Ok(idx);
95        }
96        match self.short.get(column) {
97            Some(ShortMatch::Unique(idx)) => Ok(*idx),
98            _ => Err(SqlError::ColumnNotFound(format!("{table}.{column}"))),
99        }
100    }
101}
102
103pub struct EvalCtx<'a> {
104    pub col_map: &'a ColumnMap,
105    pub row: &'a [Value],
106    pub params: &'a [Value],
107    pub excluded: Option<ExcludedRow<'a>>,
108    pub old_new: Option<OldNewRows<'a>>,
109}
110
111pub struct ExcludedRow<'a> {
112    pub col_map: &'a ColumnMap,
113    pub row: &'a [Value],
114}
115
116pub struct OldNewRows<'a> {
117    pub col_map: &'a ColumnMap,
118    pub old_row: Option<&'a [Value]>,
119    pub new_row: Option<&'a [Value]>,
120}
121
122impl<'a> EvalCtx<'a> {
123    pub fn new(col_map: &'a ColumnMap, row: &'a [Value]) -> Self {
124        Self {
125            col_map,
126            row,
127            params: &[],
128            excluded: None,
129            old_new: None,
130        }
131    }
132
133    pub fn with_params(col_map: &'a ColumnMap, row: &'a [Value], params: &'a [Value]) -> Self {
134        Self {
135            col_map,
136            row,
137            params,
138            excluded: None,
139            old_new: None,
140        }
141    }
142
143    pub fn with_excluded(
144        col_map: &'a ColumnMap,
145        row: &'a [Value],
146        excluded_col_map: &'a ColumnMap,
147        excluded_row: &'a [Value],
148    ) -> Self {
149        Self {
150            col_map,
151            row,
152            params: &[],
153            excluded: Some(ExcludedRow {
154                col_map: excluded_col_map,
155                row: excluded_row,
156            }),
157            old_new: None,
158        }
159    }
160
161    pub fn with_old_new(
162        col_map: &'a ColumnMap,
163        row: &'a [Value],
164        old_row: Option<&'a [Value]>,
165        new_row: Option<&'a [Value]>,
166    ) -> Self {
167        Self {
168            col_map,
169            row,
170            params: &[],
171            excluded: None,
172            old_new: Some(OldNewRows {
173                col_map,
174                old_row,
175                new_row,
176            }),
177        }
178    }
179}
180
181thread_local! {
182    static SCOPED_PARAMS: std::cell::Cell<(*const Value, usize)> =
183        const { std::cell::Cell::new((std::ptr::null(), 0)) };
184}
185
186/// Install positional parameters for `Expr::Parameter` resolution during `f`.
187pub fn with_scoped_params<R>(params: &[Value], f: impl FnOnce() -> R) -> R {
188    struct Guard((*const Value, usize));
189    impl Drop for Guard {
190        fn drop(&mut self) {
191            SCOPED_PARAMS.with(|slot| slot.set(self.0));
192        }
193    }
194    SCOPED_PARAMS.with(|slot| {
195        let prev = slot.get();
196        slot.set((params.as_ptr(), params.len()));
197        let _guard = Guard(prev);
198        f()
199    })
200}
201
202fn resolve_parameter(n: usize, ctx_params: &[Value]) -> Result<Value> {
203    if !ctx_params.is_empty() {
204        if n == 0 || n > ctx_params.len() {
205            return Err(SqlError::ParameterCountMismatch {
206                expected: n,
207                got: ctx_params.len(),
208            });
209        }
210        return Ok(ctx_params[n - 1].clone());
211    }
212    resolve_scoped_param(n)
213}
214
215pub fn resolve_scoped_param(n: usize) -> Result<Value> {
216    SCOPED_PARAMS.with(|slot| {
217        let (ptr, len) = slot.get();
218        if n == 0 || n > len {
219            return Err(SqlError::ParameterCountMismatch {
220                expected: n,
221                got: len,
222            });
223        }
224        // SAFETY: `with_scoped_params` keeps the slice alive for the duration of `f()`
225        // and restores the previous pointer on return. Reads only happen inside `f()`.
226        unsafe { Ok((*ptr.add(n - 1)).clone()) }
227    })
228}
229
230pub fn eval_expr(expr: &Expr, ctx: &EvalCtx) -> Result<Value> {
231    match expr {
232        Expr::Literal(v) => Ok(v.clone()),
233
234        Expr::Column(name) => {
235            let idx = ctx.col_map.resolve(name)?;
236            Ok(ctx.row[idx].clone())
237        }
238
239        Expr::QualifiedColumn { table, column } => {
240            if let Some(excluded) = ctx.excluded.as_ref() {
241                if table.eq_ignore_ascii_case("excluded") {
242                    let lowered = column.to_ascii_lowercase();
243                    let idx = excluded.col_map.resolve(&lowered)?;
244                    return Ok(excluded.row[idx].clone());
245                }
246            }
247            if let Some(on) = ctx.old_new.as_ref() {
248                if table.eq_ignore_ascii_case("old") {
249                    let lowered = column.to_ascii_lowercase();
250                    let idx = on.col_map.resolve(&lowered)?;
251                    return Ok(on.old_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
252                }
253                if table.eq_ignore_ascii_case("new") {
254                    let lowered = column.to_ascii_lowercase();
255                    let idx = on.col_map.resolve(&lowered)?;
256                    return Ok(on.new_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
257                }
258            }
259            let idx = ctx.col_map.resolve_qualified(table, column)?;
260            Ok(ctx.row[idx].clone())
261        }
262
263        Expr::BinaryOp { left, op, right } => {
264            let lval = eval_expr(left, ctx)?;
265            let rval = eval_expr(right, ctx)?;
266            let needs_collation_check = ctx.col_map.has_non_binary_collation()
267                || matches!(left.as_ref(), Expr::Collate { .. })
268                || matches!(right.as_ref(), Expr::Collate { .. });
269            if needs_collation_check {
270                let coll = collation_of(left)
271                    .or_else(|| collation_of(right))
272                    .or_else(|| {
273                        column_collation(left, ctx).or_else(|| column_collation(right, ctx))
274                    });
275                if let Some(c) = coll {
276                    if c != crate::types::Collation::Binary {
277                        if let Some(b) = eval_text_compare(&lval, *op, &rval, c) {
278                            return Ok(Value::Boolean(b));
279                        }
280                    }
281                }
282            }
283            eval_binary_op(&lval, *op, &rval)
284        }
285
286        Expr::UnaryOp { op, expr } => {
287            let val = eval_expr(expr, ctx)?;
288            eval_unary_op(*op, &val)
289        }
290
291        Expr::IsNull(e) => {
292            let val = eval_expr(e, ctx)?;
293            Ok(Value::Boolean(val.is_null()))
294        }
295
296        Expr::IsNotNull(e) => {
297            let val = eval_expr(e, ctx)?;
298            Ok(Value::Boolean(!val.is_null()))
299        }
300
301        Expr::Function { name, args, .. } => eval_scalar_function(name, args, ctx),
302
303        Expr::CountStar => Err(SqlError::Unsupported(
304            "COUNT(*) in non-aggregate context".into(),
305        )),
306
307        Expr::InList {
308            expr: e,
309            list,
310            negated,
311        } => {
312            let lhs = eval_expr(e, ctx)?;
313            eval_in_values(&lhs, list, ctx, *negated)
314        }
315
316        Expr::InSet {
317            expr: e,
318            values,
319            has_null,
320            negated,
321        } => {
322            let lhs = eval_expr(e, ctx)?;
323            eval_in_set(&lhs, values, *has_null, *negated)
324        }
325
326        Expr::Between {
327            expr: e,
328            low,
329            high,
330            negated,
331        } => {
332            let val = eval_expr(e, ctx)?;
333            let lo = eval_expr(low, ctx)?;
334            let hi = eval_expr(high, ctx)?;
335            eval_between(&val, &lo, &hi, *negated)
336        }
337
338        Expr::Like {
339            expr: e,
340            pattern,
341            escape,
342            negated,
343        } => {
344            let val = eval_expr(e, ctx)?;
345            let pat = eval_expr(pattern, ctx)?;
346            let esc = escape.as_ref().map(|e| eval_expr(e, ctx)).transpose()?;
347            eval_like(&val, &pat, esc.as_ref(), *negated)
348        }
349
350        Expr::Case {
351            operand,
352            conditions,
353            else_result,
354        } => eval_case(operand.as_deref(), conditions, else_result.as_deref(), ctx),
355
356        Expr::Coalesce(args) => {
357            for arg in args {
358                let val = eval_expr(arg, ctx)?;
359                if !val.is_null() {
360                    return Ok(val);
361                }
362            }
363            Ok(Value::Null)
364        }
365
366        Expr::Cast { expr: e, data_type } => {
367            let val = eval_expr(e, ctx)?;
368            eval_cast(&val, *data_type)
369        }
370
371        Expr::Collate { expr: e, .. } => eval_expr(e, ctx),
372
373        Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => Err(
374            SqlError::Unsupported("subquery not materialized (internal error)".into()),
375        ),
376
377        Expr::Parameter(n) => resolve_parameter(*n, ctx.params),
378
379        Expr::WindowFunction { .. } => Err(SqlError::Unsupported(
380            "window functions are only allowed in SELECT columns".into(),
381        )),
382    }
383}
384
385/// Planner-level constant folding hook; shares semantics with row evaluation.
386fn collation_of(expr: &Expr) -> Option<crate::types::Collation> {
387    match expr {
388        Expr::Collate { collation, .. } => Some(*collation),
389        _ => None,
390    }
391}
392
393fn column_collation(expr: &Expr, ctx: &EvalCtx<'_>) -> Option<crate::types::Collation> {
394    match expr {
395        Expr::Column(name) => ctx
396            .col_map
397            .resolve(name)
398            .ok()
399            .map(|i| ctx.col_map.collation_at(i)),
400        Expr::QualifiedColumn { table, column } => ctx
401            .col_map
402            .resolve_qualified(table, column)
403            .ok()
404            .map(|i| ctx.col_map.collation_at(i)),
405        _ => None,
406    }
407}
408
409fn eval_text_compare(
410    left: &Value,
411    op: BinOp,
412    right: &Value,
413    coll: crate::types::Collation,
414) -> Option<bool> {
415    let (a, b) = match (left, right) {
416        (Value::Null, _) | (_, Value::Null) => return None,
417        (Value::Text(a), Value::Text(b)) => (a.as_str(), b.as_str()),
418        _ => return None,
419    };
420    let ord = coll.cmp_text(a, b);
421    Some(match op {
422        BinOp::Eq => ord == std::cmp::Ordering::Equal,
423        BinOp::NotEq => ord != std::cmp::Ordering::Equal,
424        BinOp::Lt => ord == std::cmp::Ordering::Less,
425        BinOp::Gt => ord == std::cmp::Ordering::Greater,
426        BinOp::LtEq => ord != std::cmp::Ordering::Greater,
427        BinOp::GtEq => ord != std::cmp::Ordering::Less,
428        _ => return None,
429    })
430}
431
/// Public entry point for applying a binary operator to two already-evaluated
/// values; forwards unchanged to the private `eval_binary_op`.
pub fn eval_binary_op_public(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
    eval_binary_op(left, op, right)
}
435
436fn eval_binary_op(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
437    match op {
438        BinOp::And => return eval_and(left, right),
439        BinOp::Or => return eval_or(left, right),
440        _ => {}
441    }
442
443    if left.is_null() || right.is_null() {
444        return Ok(Value::Null);
445    }
446
447    if let Some(res) = eval_temporal_op(left, op, right) {
448        return res;
449    }
450
451    match op {
452        BinOp::Eq => Ok(Value::Boolean(left == right)),
453        BinOp::NotEq => Ok(Value::Boolean(left != right)),
454        BinOp::Lt => Ok(Value::Boolean(left < right)),
455        BinOp::Gt => Ok(Value::Boolean(left > right)),
456        BinOp::LtEq => Ok(Value::Boolean(left <= right)),
457        BinOp::GtEq => Ok(Value::Boolean(left >= right)),
458        BinOp::Add => eval_arithmetic(left, right, i64::checked_add, |a, b| a + b),
459        BinOp::Sub => eval_arithmetic(left, right, i64::checked_sub, |a, b| a - b),
460        BinOp::Mul => eval_arithmetic(left, right, i64::checked_mul, |a, b| a * b),
461        BinOp::Div => {
462            match right {
463                Value::Integer(0) => return Err(SqlError::DivisionByZero),
464                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
465                _ => {}
466            }
467            eval_arithmetic(left, right, i64::checked_div, |a, b| a / b)
468        }
469        BinOp::Mod => {
470            match right {
471                Value::Integer(0) => return Err(SqlError::DivisionByZero),
472                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
473                _ => {}
474            }
475            eval_arithmetic(left, right, i64::checked_rem, |a, b| a % b)
476        }
477        BinOp::Concat => {
478            let ls = value_to_text(left);
479            let rs = value_to_text(right);
480            Ok(Value::Text(format!("{ls}{rs}").into()))
481        }
482        BinOp::And | BinOp::Or => unreachable!(),
483    }
484}
485
486/// Returns `Some` when `(left, op, right)` is a temporal operation; `None` to fall through.
487fn eval_temporal_op(left: &Value, op: BinOp, right: &Value) -> Option<Result<Value>> {
488    use crate::datetime as dt;
489    use std::cmp::Ordering;
490
491    let is_temporal = |v: &Value| {
492        matches!(
493            v,
494            Value::Date(_) | Value::Time(_) | Value::Timestamp(_) | Value::Interval { .. }
495        )
496    };
497    if matches!(op, BinOp::Add | BinOp::Sub)
498        && ((is_temporal(left) && matches!(right, Value::Real(_)))
499            || (matches!(left, Value::Real(_)) && is_temporal(right)))
500    {
501        return Some(Err(SqlError::TypeMismatch {
502            expected: "INTEGER or INTERVAL for date/time arithmetic (use CAST for REAL)".into(),
503            got: format!("{} and {}", left.data_type(), right.data_type()),
504        }));
505    }
506
507    match (left, op, right) {
508        (Value::Date(d), BinOp::Add, Value::Integer(n))
509        | (Value::Integer(n), BinOp::Add, Value::Date(d)) => {
510            Some(dt::add_days_to_date(*d, *n).map(Value::Date))
511        }
512        (Value::Date(d), BinOp::Sub, Value::Integer(n)) => {
513            Some(dt::add_days_to_date(*d, -*n).map(Value::Date))
514        }
515        (Value::Date(a), BinOp::Sub, Value::Date(b)) => {
516            Some(Ok(Value::Integer(*a as i64 - *b as i64)))
517        }
518        // DATE ± INTERVAL → TIMESTAMP (PG rule).
519        (
520            Value::Date(d),
521            BinOp::Add,
522            Value::Interval {
523                months,
524                days,
525                micros,
526            },
527        )
528        | (
529            Value::Interval {
530                months,
531                days,
532                micros,
533            },
534            BinOp::Add,
535            Value::Date(d),
536        ) => Some(dt::add_interval_to_date(*d, *months, *days, *micros).map(Value::Timestamp)),
537        (
538            Value::Date(d),
539            BinOp::Sub,
540            Value::Interval {
541                months,
542                days,
543                micros,
544            },
545        ) => Some(dt::add_interval_to_date(*d, -*months, -*days, -*micros).map(Value::Timestamp)),
546        (
547            Value::Timestamp(t),
548            BinOp::Add,
549            Value::Interval {
550                months,
551                days,
552                micros,
553            },
554        )
555        | (
556            Value::Interval {
557                months,
558                days,
559                micros,
560            },
561            BinOp::Add,
562            Value::Timestamp(t),
563        ) => Some(dt::add_interval_to_timestamp(*t, *months, *days, *micros).map(Value::Timestamp)),
564        (
565            Value::Timestamp(t),
566            BinOp::Sub,
567            Value::Interval {
568                months,
569                days,
570                micros,
571            },
572        ) => Some(
573            dt::add_interval_to_timestamp(*t, -*months, -*days, -*micros).map(Value::Timestamp),
574        ),
575        (Value::Timestamp(a), BinOp::Sub, Value::Timestamp(b)) => {
576            let (days, micros) = dt::subtract_timestamps(*a, *b);
577            Some(Ok(Value::Interval {
578                months: 0,
579                days,
580                micros,
581            }))
582        }
583        (
584            Value::Time(t),
585            BinOp::Add,
586            Value::Interval {
587                months,
588                days,
589                micros,
590            },
591        ) => Some(dt::add_interval_to_time(*t, *months, *days, *micros).map(Value::Time)),
592        (
593            Value::Time(t),
594            BinOp::Sub,
595            Value::Interval {
596                months,
597                days,
598                micros,
599            },
600        ) => Some(dt::add_interval_to_time(*t, -*months, -*days, -*micros).map(Value::Time)),
601        (Value::Time(a), BinOp::Sub, Value::Time(b)) => Some(Ok(Value::Interval {
602            months: 0,
603            days: 0,
604            micros: *a - *b,
605        })),
606        (
607            Value::Interval {
608                months: am,
609                days: ad,
610                micros: au,
611            },
612            BinOp::Add,
613            Value::Interval {
614                months: bm,
615                days: bd,
616                micros: bu,
617            },
618        ) => Some(Ok(Value::Interval {
619            months: am.saturating_add(*bm),
620            days: ad.saturating_add(*bd),
621            micros: au.saturating_add(*bu),
622        })),
623        (
624            Value::Interval {
625                months: am,
626                days: ad,
627                micros: au,
628            },
629            BinOp::Sub,
630            Value::Interval {
631                months: bm,
632                days: bd,
633                micros: bu,
634            },
635        ) => Some(Ok(Value::Interval {
636            months: am.saturating_sub(*bm),
637            days: ad.saturating_sub(*bd),
638            micros: au.saturating_sub(*bu),
639        })),
640        (
641            Value::Interval {
642                months,
643                days,
644                micros,
645            },
646            BinOp::Mul,
647            Value::Integer(n),
648        )
649        | (
650            Value::Integer(n),
651            BinOp::Mul,
652            Value::Interval {
653                months,
654                days,
655                micros,
656            },
657        ) => {
658            let n32 = (*n).clamp(i32::MIN as i64, i32::MAX as i64) as i32;
659            Some(Ok(Value::Interval {
660                months: months.saturating_mul(n32),
661                days: days.saturating_mul(n32),
662                micros: micros.saturating_mul(*n),
663            }))
664        }
665        // INTERVAL * REAL — fractional months → days, fractional days → micros (PG).
666        (
667            Value::Interval {
668                months,
669                days,
670                micros,
671            },
672            BinOp::Mul,
673            Value::Real(r),
674        )
675        | (
676            Value::Real(r),
677            BinOp::Mul,
678            Value::Interval {
679                months,
680                days,
681                micros,
682            },
683        ) => Some(Ok(scale_interval_by_real(*months, *days, *micros, *r))),
684        (
685            Value::Interval {
686                months,
687                days,
688                micros,
689            },
690            BinOp::Div,
691            Value::Integer(n),
692        ) if *n != 0 => Some(Ok(Value::Interval {
693            months: (*months as i64 / *n) as i32,
694            days: (*days as i64 / *n) as i32,
695            micros: *micros / *n,
696        })),
697        (
698            Value::Interval {
699                months,
700                days,
701                micros,
702            },
703            BinOp::Div,
704            Value::Real(r),
705        ) if *r != 0.0 => Some(Ok(scale_interval_by_real(*months, *days, *micros, 1.0 / r))),
706        // PG-normalized INTERVAL compare: 30-day month, 24-hour day.
707        (
708            Value::Interval {
709                months: am,
710                days: ad,
711                micros: au,
712            },
713            op,
714            Value::Interval {
715                months: bm,
716                days: bd,
717                micros: bu,
718            },
719        ) if matches!(
720            op,
721            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::Gt | BinOp::LtEq | BinOp::GtEq
722        ) =>
723        {
724            let ord = dt::pg_normalized_interval_cmp((*am, *ad, *au), (*bm, *bd, *bu));
725            let b = match op {
726                BinOp::Eq => ord == Ordering::Equal,
727                BinOp::NotEq => ord != Ordering::Equal,
728                BinOp::Lt => ord == Ordering::Less,
729                BinOp::Gt => ord == Ordering::Greater,
730                BinOp::LtEq => ord != Ordering::Greater,
731                BinOp::GtEq => ord != Ordering::Less,
732                _ => unreachable!(),
733            };
734            Some(Ok(Value::Boolean(b)))
735        }
736        // PG rejects TIMESTAMP ± INTEGER; require CAST to INTERVAL.
737        (Value::Timestamp(_), BinOp::Add | BinOp::Sub, Value::Integer(_))
738        | (Value::Integer(_), BinOp::Add, Value::Timestamp(_)) => {
739            Some(Err(SqlError::TypeMismatch {
740                expected: "INTERVAL (use CAST or explicit unit)".into(),
741                got: format!("{} and {}", left.data_type(), right.data_type()),
742            }))
743        }
744        _ => None,
745    }
746}
747
748/// PG fractional-propagation: month frac → days (×30), day frac → micros (×86.4G).
749fn scale_interval_by_real(months: i32, days: i32, micros: i64, factor: f64) -> Value {
750    let raw_months = months as f64 * factor;
751    let whole_months = raw_months.trunc() as i64;
752    let frac_months = raw_months - whole_months as f64;
753    let months_frac_as_days = frac_months * 30.0;
754
755    let raw_days = days as f64 * factor + months_frac_as_days;
756    let whole_days = raw_days.trunc() as i64;
757    let frac_days = raw_days - whole_days as f64;
758    let days_frac_as_micros = (frac_days * crate::datetime::MICROS_PER_DAY as f64).round() as i64;
759
760    let raw_micros = (micros as f64 * factor).round() as i64;
761    let total_micros = raw_micros.saturating_add(days_frac_as_micros);
762
763    let clamp_i32 = |n: i64| n.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
764    Value::Interval {
765        months: clamp_i32(whole_months),
766        days: clamp_i32(whole_days),
767        micros: total_micros,
768    }
769}
770
771/// SQL three-valued AND: NULL AND false = false, NULL AND true = NULL
772fn eval_and(left: &Value, right: &Value) -> Result<Value> {
773    let l = to_bool_or_null(left)?;
774    let r = to_bool_or_null(right)?;
775    match (l, r) {
776        (Some(false), _) | (_, Some(false)) => Ok(Value::Boolean(false)),
777        (Some(true), Some(true)) => Ok(Value::Boolean(true)),
778        _ => Ok(Value::Null),
779    }
780}
781
782/// SQL three-valued OR: NULL OR true = true, NULL OR false = NULL
783fn eval_or(left: &Value, right: &Value) -> Result<Value> {
784    let l = to_bool_or_null(left)?;
785    let r = to_bool_or_null(right)?;
786    match (l, r) {
787        (Some(true), _) | (_, Some(true)) => Ok(Value::Boolean(true)),
788        (Some(false), Some(false)) => Ok(Value::Boolean(false)),
789        _ => Ok(Value::Null),
790    }
791}
792
793fn to_bool_or_null(val: &Value) -> Result<Option<bool>> {
794    match val {
795        Value::Boolean(b) => Ok(Some(*b)),
796        Value::Null => Ok(None),
797        Value::Integer(i) => Ok(Some(*i != 0)),
798        _ => Err(SqlError::TypeMismatch {
799            expected: "BOOLEAN".into(),
800            got: format!("{}", val.data_type()),
801        }),
802    }
803}
804
805fn eval_arithmetic(
806    left: &Value,
807    right: &Value,
808    int_op: fn(i64, i64) -> Option<i64>,
809    real_op: fn(f64, f64) -> f64,
810) -> Result<Value> {
811    match (left, right) {
812        (Value::Integer(a), Value::Integer(b)) => int_op(*a, *b)
813            .map(Value::Integer)
814            .ok_or(SqlError::IntegerOverflow),
815        (Value::Real(a), Value::Real(b)) => Ok(Value::Real(real_op(*a, *b))),
816        (Value::Integer(a), Value::Real(b)) => Ok(Value::Real(real_op(*a as f64, *b))),
817        (Value::Real(a), Value::Integer(b)) => Ok(Value::Real(real_op(*a, *b as f64))),
818        _ => Err(SqlError::TypeMismatch {
819            expected: "numeric".into(),
820            got: format!("{} and {}", left.data_type(), right.data_type()),
821        }),
822    }
823}
824
825fn eval_in_values(lhs: &Value, list: &[Expr], ctx: &EvalCtx, negated: bool) -> Result<Value> {
826    if list.is_empty() {
827        return Ok(Value::Boolean(negated));
828    }
829    if lhs.is_null() {
830        return Ok(Value::Null);
831    }
832    let mut has_null = false;
833    for item in list {
834        let rhs = eval_expr(item, ctx)?;
835        if rhs.is_null() {
836            has_null = true;
837        } else if lhs == &rhs {
838            return Ok(Value::Boolean(!negated));
839        }
840    }
841    if has_null {
842        Ok(Value::Null)
843    } else {
844        Ok(Value::Boolean(negated))
845    }
846}
847
848fn eval_in_set(
849    lhs: &Value,
850    values: &rustc_hash::FxHashSet<Value>,
851    has_null: bool,
852    negated: bool,
853) -> Result<Value> {
854    if values.is_empty() && !has_null {
855        return Ok(Value::Boolean(negated));
856    }
857    if lhs.is_null() {
858        return Ok(Value::Null);
859    }
860    if values.contains(lhs) {
861        return Ok(Value::Boolean(!negated));
862    }
863    if has_null {
864        Ok(Value::Null)
865    } else {
866        Ok(Value::Boolean(negated))
867    }
868}
869
870fn eval_unary_op(op: UnaryOp, val: &Value) -> Result<Value> {
871    if val.is_null() {
872        return Ok(Value::Null);
873    }
874    match op {
875        UnaryOp::Neg => match val {
876            Value::Integer(i) => i
877                .checked_neg()
878                .map(Value::Integer)
879                .ok_or(SqlError::IntegerOverflow),
880            Value::Real(r) => Ok(Value::Real(-r)),
881            Value::Interval {
882                months,
883                days,
884                micros,
885            } => {
886                let m = months.checked_neg().ok_or(SqlError::IntegerOverflow)?;
887                let d = days.checked_neg().ok_or(SqlError::IntegerOverflow)?;
888                let u = micros.checked_neg().ok_or(SqlError::IntegerOverflow)?;
889                Ok(Value::Interval {
890                    months: m,
891                    days: d,
892                    micros: u,
893                })
894            }
895            _ => Err(SqlError::TypeMismatch {
896                expected: "numeric or INTERVAL".into(),
897                got: format!("{}", val.data_type()),
898            }),
899        },
900        UnaryOp::Not => match val {
901            Value::Boolean(b) => Ok(Value::Boolean(!b)),
902            Value::Integer(i) => Ok(Value::Boolean(*i == 0)),
903            _ => Err(SqlError::TypeMismatch {
904                expected: "BOOLEAN".into(),
905                got: format!("{}", val.data_type()),
906            }),
907        },
908    }
909}
910
911fn value_to_text(val: &Value) -> String {
912    match val {
913        Value::Text(s) => s.to_string(),
914        Value::Integer(i) => i.to_string(),
915        Value::Real(r) => {
916            if r.fract() == 0.0 && r.is_finite() {
917                format!("{r:.1}")
918            } else {
919                format!("{r}")
920            }
921        }
922        Value::Boolean(b) => if *b { "TRUE" } else { "FALSE" }.into(),
923        Value::Null => String::new(),
924        Value::Blob(b) => {
925            let mut s = String::with_capacity(b.len() * 2);
926            for byte in b {
927                s.push_str(&format!("{byte:02X}"));
928            }
929            s
930        }
931        Value::Date(d) => crate::datetime::format_date(*d),
932        Value::Time(t) => crate::datetime::format_time(*t),
933        Value::Timestamp(t) => crate::datetime::format_timestamp(*t),
934        Value::Interval {
935            months,
936            days,
937            micros,
938        } => crate::datetime::format_interval(*months, *days, *micros),
939    }
940}
941
942fn eval_between(val: &Value, low: &Value, high: &Value, negated: bool) -> Result<Value> {
943    if val.is_null() || low.is_null() || high.is_null() {
944        let ge = if val.is_null() || low.is_null() {
945            None
946        } else {
947            Some(*val >= *low)
948        };
949        let le = if val.is_null() || high.is_null() {
950            None
951        } else {
952            Some(*val <= *high)
953        };
954
955        let result = match (ge, le) {
956            (Some(false), _) | (_, Some(false)) => Some(false),
957            (Some(true), Some(true)) => Some(true),
958            _ => None,
959        };
960
961        return match result {
962            Some(b) => Ok(Value::Boolean(if negated { !b } else { b })),
963            None => Ok(Value::Null),
964        };
965    }
966
967    let in_range = *val >= *low && *val <= *high;
968    Ok(Value::Boolean(if negated { !in_range } else { in_range }))
969}
970
971const MAX_LIKE_PATTERN_LEN: usize = 10_000;
972
973fn eval_like(val: &Value, pattern: &Value, escape: Option<&Value>, negated: bool) -> Result<Value> {
974    if val.is_null() || pattern.is_null() {
975        return Ok(Value::Null);
976    }
977    let text = match val {
978        Value::Text(s) => s.as_str(),
979        _ => {
980            return Err(SqlError::TypeMismatch {
981                expected: "TEXT".into(),
982                got: val.data_type().to_string(),
983            })
984        }
985    };
986    let pat = match pattern {
987        Value::Text(s) => s.as_str(),
988        _ => {
989            return Err(SqlError::TypeMismatch {
990                expected: "TEXT".into(),
991                got: pattern.data_type().to_string(),
992            })
993        }
994    };
995
996    if pat.len() > MAX_LIKE_PATTERN_LEN {
997        return Err(SqlError::InvalidValue(format!(
998            "LIKE pattern too long ({} chars, max {MAX_LIKE_PATTERN_LEN})",
999            pat.len()
1000        )));
1001    }
1002
1003    let esc_char = match escape {
1004        Some(Value::Text(s)) => {
1005            let mut chars = s.chars();
1006            let c = chars.next().ok_or_else(|| {
1007                SqlError::InvalidValue("ESCAPE must be a single character".into())
1008            })?;
1009            if chars.next().is_some() {
1010                return Err(SqlError::InvalidValue(
1011                    "ESCAPE must be a single character".into(),
1012                ));
1013            }
1014            Some(c)
1015        }
1016        Some(Value::Null) => return Ok(Value::Null),
1017        Some(_) => {
1018            return Err(SqlError::TypeMismatch {
1019                expected: "TEXT".into(),
1020                got: "non-text".into(),
1021            })
1022        }
1023        None => None,
1024    };
1025
1026    let matched = like_match(text, pat, esc_char);
1027    Ok(Value::Boolean(if negated { !matched } else { matched }))
1028}
1029
1030fn like_match(text: &str, pattern: &str, escape: Option<char>) -> bool {
1031    let t: Vec<char> = text.chars().collect();
1032    let p: Vec<char> = pattern.chars().collect();
1033    like_match_impl(&t, &p, 0, 0, escape)
1034}
1035
/// Iterative LIKE matcher over pre-decoded characters.
///
/// Matching starts at text index `ti` and pattern index `pi`.
///
/// * `%` matches any run of characters, including an empty one.
/// * `_` matches exactly one character.
/// * Any other pattern character is compared ignoring ASCII case.
/// * When `esc` is set and is followed by another pattern character, that
///   following character is matched literally (still ignoring ASCII case).
///   An escape character at the very end of the pattern is treated like any
///   other pattern character.
///
/// Only the most recent `%` is remembered; on a mismatch it absorbs one more
/// text character and matching resumes just after it.
fn like_match_impl(
    t: &[char],
    p: &[char],
    mut ti: usize,
    mut pi: usize,
    esc: Option<char>,
) -> bool {
    // (pattern index of the latest `%`, text index where its retry resumes).
    let mut backtrack: Option<(usize, usize)> = None;

    while ti < t.len() {
        // Number of pattern characters that match `t[ti]`; `None` on mismatch.
        let mut consumed: Option<usize> = None;

        if let Some(&pc) = p.get(pi) {
            let escapes_next = esc == Some(pc) && pi + 1 < p.len();
            if escapes_next {
                if p[pi + 1].eq_ignore_ascii_case(&t[ti]) {
                    consumed = Some(2);
                }
            } else if pc == '%' {
                backtrack = Some((pi, ti));
                pi += 1;
                continue;
            } else if pc == '_' || pc.eq_ignore_ascii_case(&t[ti]) {
                consumed = Some(1);
            }
        }

        match (consumed, backtrack.as_mut()) {
            (Some(width), _) => {
                pi += width;
                ti += 1;
            }
            (None, Some((star_pi, star_ti))) => {
                // Let the remembered `%` swallow one more text character.
                *star_ti += 1;
                ti = *star_ti;
                pi = *star_pi + 1;
            }
            (None, None) => return false,
        }
    }

    // Text is exhausted: whatever is left of the pattern must be only `%`.
    p.get(pi..)
        .map_or(false, |rest| rest.iter().all(|&c| c == '%'))
}
1098
1099fn eval_case(
1100    operand: Option<&Expr>,
1101    conditions: &[(Expr, Expr)],
1102    else_result: Option<&Expr>,
1103    ctx: &EvalCtx,
1104) -> Result<Value> {
1105    if let Some(op_expr) = operand {
1106        let op_val = eval_expr(op_expr, ctx)?;
1107        for (cond, result) in conditions {
1108            let cond_val = eval_expr(cond, ctx)?;
1109            if !op_val.is_null() && !cond_val.is_null() && op_val == cond_val {
1110                return eval_expr(result, ctx);
1111            }
1112        }
1113    } else {
1114        for (cond, result) in conditions {
1115            let cond_val = eval_expr(cond, ctx)?;
1116            if is_truthy(&cond_val) {
1117                return eval_expr(result, ctx);
1118            }
1119        }
1120    }
1121    match else_result {
1122        Some(e) => eval_expr(e, ctx),
1123        None => Ok(Value::Null),
1124    }
1125}
1126
1127fn eval_cast(val: &Value, target: DataType) -> Result<Value> {
1128    if val.is_null() {
1129        return Ok(Value::Null);
1130    }
1131    match target {
1132        DataType::Integer => match val {
1133            Value::Integer(_) => Ok(val.clone()),
1134            Value::Real(r) => Ok(Value::Integer(*r as i64)),
1135            Value::Boolean(b) => Ok(Value::Integer(if *b { 1 } else { 0 })),
1136            Value::Text(s) => s
1137                .trim()
1138                .parse::<i64>()
1139                .map(Value::Integer)
1140                .or_else(|_| s.trim().parse::<f64>().map(|f| Value::Integer(f as i64)))
1141                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to INTEGER"))),
1142            _ => Err(SqlError::InvalidValue(format!(
1143                "cannot cast {} to INTEGER",
1144                val.data_type()
1145            ))),
1146        },
1147        DataType::Real => match val {
1148            Value::Real(_) => Ok(val.clone()),
1149            Value::Integer(i) => Ok(Value::Real(*i as f64)),
1150            Value::Boolean(b) => Ok(Value::Real(if *b { 1.0 } else { 0.0 })),
1151            Value::Text(s) => s
1152                .trim()
1153                .parse::<f64>()
1154                .map(Value::Real)
1155                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to REAL"))),
1156            _ => Err(SqlError::InvalidValue(format!(
1157                "cannot cast {} to REAL",
1158                val.data_type()
1159            ))),
1160        },
1161        DataType::Text => Ok(Value::Text(value_to_text(val).into())),
1162        DataType::Boolean => match val {
1163            Value::Boolean(_) => Ok(val.clone()),
1164            Value::Integer(i) => Ok(Value::Boolean(*i != 0)),
1165            Value::Text(s) => {
1166                let lower = s.trim().to_ascii_lowercase();
1167                match lower.as_str() {
1168                    "true" | "1" | "yes" | "on" => Ok(Value::Boolean(true)),
1169                    "false" | "0" | "no" | "off" => Ok(Value::Boolean(false)),
1170                    _ => Err(SqlError::InvalidValue(format!(
1171                        "cannot cast '{s}' to BOOLEAN"
1172                    ))),
1173                }
1174            }
1175            _ => Err(SqlError::InvalidValue(format!(
1176                "cannot cast {} to BOOLEAN",
1177                val.data_type()
1178            ))),
1179        },
1180        DataType::Blob => match val {
1181            Value::Blob(_) => Ok(val.clone()),
1182            Value::Text(s) => Ok(Value::Blob(s.as_bytes().to_vec())),
1183            _ => Err(SqlError::InvalidValue(format!(
1184                "cannot cast {} to BLOB",
1185                val.data_type()
1186            ))),
1187        },
1188        DataType::Null => Ok(Value::Null),
1189        DataType::Date => val.clone().coerce_into(DataType::Date).ok_or_else(|| {
1190            SqlError::InvalidValue(format!("cannot cast {} to DATE", val.data_type()))
1191        }),
1192        DataType::Time => val.clone().coerce_into(DataType::Time).ok_or_else(|| {
1193            SqlError::InvalidValue(format!("cannot cast {} to TIME", val.data_type()))
1194        }),
1195        DataType::Timestamp => val.clone().coerce_into(DataType::Timestamp).ok_or_else(|| {
1196            SqlError::InvalidValue(format!("cannot cast {} to TIMESTAMP", val.data_type()))
1197        }),
1198        DataType::Interval => val.clone().coerce_into(DataType::Interval).ok_or_else(|| {
1199            SqlError::InvalidValue(format!("cannot cast {} to INTERVAL", val.data_type()))
1200        }),
1201    }
1202}
1203
1204fn eval_scalar_function(name: &str, args: &[Expr], ctx: &EvalCtx) -> Result<Value> {
1205    let evaluated: Vec<Value> = args
1206        .iter()
1207        .map(|a| eval_expr(a, ctx))
1208        .collect::<Result<Vec<_>>>()?;
1209
1210    match name {
1211        "LENGTH" => {
1212            check_args(name, &evaluated, 1)?;
1213            match &evaluated[0] {
1214                Value::Null => Ok(Value::Null),
1215                Value::Text(s) => Ok(Value::Integer(s.chars().count() as i64)),
1216                Value::Blob(b) => Ok(Value::Integer(b.len() as i64)),
1217                _ => Ok(Value::Integer(
1218                    value_to_text(&evaluated[0]).chars().count() as i64
1219                )),
1220            }
1221        }
1222        "UPPER" => {
1223            check_args(name, &evaluated, 1)?;
1224            match &evaluated[0] {
1225                Value::Null => Ok(Value::Null),
1226                Value::Text(s) => Ok(Value::Text(s.to_ascii_uppercase())),
1227                _ => Ok(Value::Text(
1228                    value_to_text(&evaluated[0]).to_ascii_uppercase().into(),
1229                )),
1230            }
1231        }
1232        "LOWER" => {
1233            check_args(name, &evaluated, 1)?;
1234            match &evaluated[0] {
1235                Value::Null => Ok(Value::Null),
1236                Value::Text(s) => Ok(Value::Text(s.to_ascii_lowercase())),
1237                _ => Ok(Value::Text(
1238                    value_to_text(&evaluated[0]).to_ascii_lowercase().into(),
1239                )),
1240            }
1241        }
1242        "SUBSTR" | "SUBSTRING" => {
1243            if evaluated.len() < 2 || evaluated.len() > 3 {
1244                return Err(SqlError::InvalidValue(format!(
1245                    "{name} requires 2 or 3 arguments"
1246                )));
1247            }
1248            if evaluated.iter().any(|v| v.is_null()) {
1249                return Ok(Value::Null);
1250            }
1251            let s = value_to_text(&evaluated[0]);
1252            let chars: Vec<char> = s.chars().collect();
1253            let start = match &evaluated[1] {
1254                Value::Integer(i) => *i,
1255                _ => {
1256                    return Err(SqlError::TypeMismatch {
1257                        expected: "INTEGER".into(),
1258                        got: evaluated[1].data_type().to_string(),
1259                    })
1260                }
1261            };
1262            let len = chars.len() as i64;
1263
1264            let (begin, count) = if evaluated.len() == 3 {
1265                let cnt = match &evaluated[2] {
1266                    Value::Integer(i) => *i,
1267                    _ => {
1268                        return Err(SqlError::TypeMismatch {
1269                            expected: "INTEGER".into(),
1270                            got: evaluated[2].data_type().to_string(),
1271                        })
1272                    }
1273                };
1274                if start >= 1 {
1275                    let b = (start - 1).min(len) as usize;
1276                    let c = cnt.max(0) as usize;
1277                    (b, c)
1278                } else if start == 0 {
1279                    let c = (cnt - 1).max(0) as usize;
1280                    (0usize, c)
1281                } else {
1282                    let adjusted_cnt = (cnt + start - 1).max(0) as usize;
1283                    (0usize, adjusted_cnt)
1284                }
1285            } else if start >= 1 {
1286                let b = (start - 1).min(len) as usize;
1287                (b, chars.len() - b)
1288            } else if start == 0 {
1289                (0usize, chars.len())
1290            } else {
1291                let b = (len + start).max(0) as usize;
1292                (b, chars.len() - b)
1293            };
1294
1295            let result: String = chars.iter().skip(begin).take(count).collect();
1296            Ok(Value::Text(result.into()))
1297        }
1298        "TRIM" | "LTRIM" | "RTRIM" => {
1299            if evaluated.is_empty() || evaluated.len() > 2 {
1300                return Err(SqlError::InvalidValue(format!(
1301                    "{name} requires 1 or 2 arguments"
1302                )));
1303            }
1304            if evaluated[0].is_null() {
1305                return Ok(Value::Null);
1306            }
1307            let s = value_to_text(&evaluated[0]);
1308            let trim_chars: Vec<char> = if evaluated.len() == 2 {
1309                if evaluated[1].is_null() {
1310                    return Ok(Value::Null);
1311                }
1312                value_to_text(&evaluated[1]).chars().collect()
1313            } else {
1314                vec![' ']
1315            };
1316            let result = match name {
1317                "TRIM" => s
1318                    .trim_matches(|c: char| trim_chars.contains(&c))
1319                    .to_string(),
1320                "LTRIM" => s
1321                    .trim_start_matches(|c: char| trim_chars.contains(&c))
1322                    .to_string(),
1323                "RTRIM" => s
1324                    .trim_end_matches(|c: char| trim_chars.contains(&c))
1325                    .to_string(),
1326                _ => unreachable!(),
1327            };
1328            Ok(Value::Text(result.into()))
1329        }
1330        "REPLACE" => {
1331            check_args(name, &evaluated, 3)?;
1332            if evaluated.iter().any(|v| v.is_null()) {
1333                return Ok(Value::Null);
1334            }
1335            let s = value_to_text(&evaluated[0]);
1336            let from = value_to_text(&evaluated[1]);
1337            let to = value_to_text(&evaluated[2]);
1338            if from.is_empty() {
1339                return Ok(Value::Text(s.into()));
1340            }
1341            Ok(Value::Text(s.replace(&from, &to).into()))
1342        }
1343        "INSTR" => {
1344            check_args(name, &evaluated, 2)?;
1345            if evaluated.iter().any(|v| v.is_null()) {
1346                return Ok(Value::Null);
1347            }
1348            let haystack = value_to_text(&evaluated[0]);
1349            let needle = value_to_text(&evaluated[1]);
1350            let pos = haystack
1351                .find(&needle)
1352                .map(|i| haystack[..i].chars().count() as i64 + 1)
1353                .unwrap_or(0);
1354            Ok(Value::Integer(pos))
1355        }
1356        "CONCAT" => {
1357            if evaluated.is_empty() {
1358                return Ok(Value::Text(CompactString::default()));
1359            }
1360            let mut result = String::new();
1361            for v in &evaluated {
1362                match v {
1363                    Value::Null => {}
1364                    _ => result.push_str(&value_to_text(v)),
1365                }
1366            }
1367            Ok(Value::Text(result.into()))
1368        }
1369        "ABS" => {
1370            check_args(name, &evaluated, 1)?;
1371            match &evaluated[0] {
1372                Value::Null => Ok(Value::Null),
1373                Value::Integer(i) => i
1374                    .checked_abs()
1375                    .map(Value::Integer)
1376                    .ok_or(SqlError::IntegerOverflow),
1377                Value::Real(r) => Ok(Value::Real(r.abs())),
1378                _ => Err(SqlError::TypeMismatch {
1379                    expected: "numeric".into(),
1380                    got: evaluated[0].data_type().to_string(),
1381                }),
1382            }
1383        }
1384        "ROUND" => {
1385            if evaluated.is_empty() || evaluated.len() > 2 {
1386                return Err(SqlError::InvalidValue(
1387                    "ROUND requires 1 or 2 arguments".into(),
1388                ));
1389            }
1390            if evaluated[0].is_null() {
1391                return Ok(Value::Null);
1392            }
1393            let val = match &evaluated[0] {
1394                Value::Integer(i) => *i as f64,
1395                Value::Real(r) => *r,
1396                _ => {
1397                    return Err(SqlError::TypeMismatch {
1398                        expected: "numeric".into(),
1399                        got: evaluated[0].data_type().to_string(),
1400                    })
1401                }
1402            };
1403            let places = if evaluated.len() == 2 {
1404                match &evaluated[1] {
1405                    Value::Null => return Ok(Value::Null),
1406                    Value::Integer(i) => *i,
1407                    _ => {
1408                        return Err(SqlError::TypeMismatch {
1409                            expected: "INTEGER".into(),
1410                            got: evaluated[1].data_type().to_string(),
1411                        })
1412                    }
1413                }
1414            } else {
1415                0
1416            };
1417            let factor = 10f64.powi(places as i32);
1418            let rounded = (val * factor).round() / factor;
1419            Ok(Value::Real(rounded))
1420        }
1421        "CEIL" | "CEILING" => {
1422            check_args(name, &evaluated, 1)?;
1423            match &evaluated[0] {
1424                Value::Null => Ok(Value::Null),
1425                Value::Integer(i) => Ok(Value::Integer(*i)),
1426                Value::Real(r) => Ok(Value::Integer(r.ceil() as i64)),
1427                _ => Err(SqlError::TypeMismatch {
1428                    expected: "numeric".into(),
1429                    got: evaluated[0].data_type().to_string(),
1430                }),
1431            }
1432        }
1433        "FLOOR" => {
1434            check_args(name, &evaluated, 1)?;
1435            match &evaluated[0] {
1436                Value::Null => Ok(Value::Null),
1437                Value::Integer(i) => Ok(Value::Integer(*i)),
1438                Value::Real(r) => Ok(Value::Integer(r.floor() as i64)),
1439                _ => Err(SqlError::TypeMismatch {
1440                    expected: "numeric".into(),
1441                    got: evaluated[0].data_type().to_string(),
1442                }),
1443            }
1444        }
1445        "SIGN" => {
1446            check_args(name, &evaluated, 1)?;
1447            match &evaluated[0] {
1448                Value::Null => Ok(Value::Null),
1449                Value::Integer(i) => Ok(Value::Integer(i.signum())),
1450                Value::Real(r) => {
1451                    if *r > 0.0 {
1452                        Ok(Value::Integer(1))
1453                    } else if *r < 0.0 {
1454                        Ok(Value::Integer(-1))
1455                    } else {
1456                        Ok(Value::Integer(0))
1457                    }
1458                }
1459                _ => Err(SqlError::TypeMismatch {
1460                    expected: "numeric".into(),
1461                    got: evaluated[0].data_type().to_string(),
1462                }),
1463            }
1464        }
1465        "SQRT" => {
1466            check_args(name, &evaluated, 1)?;
1467            match &evaluated[0] {
1468                Value::Null => Ok(Value::Null),
1469                Value::Integer(i) => {
1470                    if *i < 0 {
1471                        Ok(Value::Null)
1472                    } else {
1473                        Ok(Value::Real((*i as f64).sqrt()))
1474                    }
1475                }
1476                Value::Real(r) => {
1477                    if *r < 0.0 {
1478                        Ok(Value::Null)
1479                    } else {
1480                        Ok(Value::Real(r.sqrt()))
1481                    }
1482                }
1483                _ => Err(SqlError::TypeMismatch {
1484                    expected: "numeric".into(),
1485                    got: evaluated[0].data_type().to_string(),
1486                }),
1487            }
1488        }
1489        "RANDOM" => {
1490            check_args(name, &evaluated, 0)?;
1491            use std::collections::hash_map::DefaultHasher;
1492            use std::hash::{Hash, Hasher};
1493            use std::time::SystemTime;
1494            let mut hasher = DefaultHasher::new();
1495            SystemTime::now().hash(&mut hasher);
1496            std::thread::current().id().hash(&mut hasher);
1497            let mut val = hasher.finish() as i64;
1498            if val == i64::MIN {
1499                val = i64::MAX;
1500            }
1501            Ok(Value::Integer(val))
1502        }
1503        "TYPEOF" => {
1504            check_args(name, &evaluated, 1)?;
1505            let type_name = match &evaluated[0] {
1506                Value::Null => "null",
1507                Value::Integer(_) => "integer",
1508                Value::Real(_) => "real",
1509                Value::Text(_) => "text",
1510                Value::Blob(_) => "blob",
1511                Value::Boolean(_) => "boolean",
1512                Value::Date(_) => "date",
1513                Value::Time(_) => "time",
1514                Value::Timestamp(_) => "timestamp",
1515                Value::Interval { .. } => "interval",
1516            };
1517            Ok(Value::Text(type_name.into()))
1518        }
1519        "MIN" => {
1520            check_args(name, &evaluated, 2)?;
1521            if evaluated[0].is_null() {
1522                return Ok(evaluated[1].clone());
1523            }
1524            if evaluated[1].is_null() {
1525                return Ok(evaluated[0].clone());
1526            }
1527            if evaluated[0] <= evaluated[1] {
1528                Ok(evaluated[0].clone())
1529            } else {
1530                Ok(evaluated[1].clone())
1531            }
1532        }
1533        "MAX" => {
1534            check_args(name, &evaluated, 2)?;
1535            if evaluated[0].is_null() {
1536                return Ok(evaluated[1].clone());
1537            }
1538            if evaluated[1].is_null() {
1539                return Ok(evaluated[0].clone());
1540            }
1541            if evaluated[0] >= evaluated[1] {
1542                Ok(evaluated[0].clone())
1543            } else {
1544                Ok(evaluated[1].clone())
1545            }
1546        }
1547        "HEX" => {
1548            check_args(name, &evaluated, 1)?;
1549            match &evaluated[0] {
1550                Value::Null => Ok(Value::Null),
1551                Value::Blob(b) => {
1552                    let mut s = String::with_capacity(b.len() * 2);
1553                    for byte in b {
1554                        s.push_str(&format!("{byte:02X}"));
1555                    }
1556                    Ok(Value::Text(s.into()))
1557                }
1558                Value::Text(s) => {
1559                    let mut r = String::with_capacity(s.len() * 2);
1560                    for byte in s.as_bytes() {
1561                        r.push_str(&format!("{byte:02X}"));
1562                    }
1563                    Ok(Value::Text(r.into()))
1564                }
1565                _ => Ok(Value::Text(value_to_text(&evaluated[0]).into())),
1566            }
1567        }
1568        "NOW" | "CURRENT_TIMESTAMP" | "LOCALTIMESTAMP" => {
1569            check_args(name, &evaluated, 0)?;
1570            Ok(Value::Timestamp(crate::datetime::txn_or_clock_micros()))
1571        }
1572        "CURRENT_DATE" => {
1573            check_args(name, &evaluated, 0)?;
1574            Ok(Value::Date(crate::datetime::ts_to_date_floor(
1575                crate::datetime::txn_or_clock_micros(),
1576            )))
1577        }
1578        "CURRENT_TIME" | "LOCALTIME" => {
1579            check_args(name, &evaluated, 0)?;
1580            Ok(Value::Time(
1581                crate::datetime::ts_split(crate::datetime::txn_or_clock_micros()).1,
1582            ))
1583        }
1584        "CLOCK_TIMESTAMP" | "STATEMENT_TIMESTAMP" | "TRANSACTION_TIMESTAMP" => {
1585            check_args(name, &evaluated, 0)?;
1586            let ts = match name {
1587                "CLOCK_TIMESTAMP" => crate::datetime::now_micros(),
1588                _ => crate::datetime::txn_or_clock_micros(),
1589            };
1590            Ok(Value::Timestamp(ts))
1591        }
1592        "EXTRACT" | "DATE_PART" | "DATEPART" => {
1593            check_args(name, &evaluated, 2)?;
1594            // Borrow the field str without allocating; datetime::extract accepts &str.
1595            let field: &str = match &evaluated[0] {
1596                Value::Null => return Ok(Value::Null),
1597                Value::Text(s) => s.as_str(),
1598                _ => {
1599                    return Err(SqlError::TypeMismatch {
1600                        expected: "TEXT field name".into(),
1601                        got: evaluated[0].data_type().to_string(),
1602                    })
1603                }
1604            };
1605            if evaluated[1].is_null() {
1606                return Ok(Value::Null);
1607            }
1608            crate::datetime::extract(field, &evaluated[1])
1609        }
1610        "DATE_TRUNC" => {
1611            if evaluated.len() < 2 || evaluated.len() > 3 {
1612                return Err(SqlError::InvalidValue(
1613                    "DATE_TRUNC requires 2 or 3 arguments".into(),
1614                ));
1615            }
1616            let unit = match &evaluated[0] {
1617                Value::Null => return Ok(Value::Null),
1618                Value::Text(s) => s.to_string(),
1619                _ => {
1620                    return Err(SqlError::TypeMismatch {
1621                        expected: "TEXT unit name".into(),
1622                        got: evaluated[0].data_type().to_string(),
1623                    })
1624                }
1625            };
1626            if evaluated[1].is_null() {
1627                return Ok(Value::Null);
1628            }
1629            // Optional tz arg: truncate in that zone, then convert back to UTC.
1630            if evaluated.len() == 3 {
1631                if let Value::Text(tz) = &evaluated[2] {
1632                    if !tz.eq_ignore_ascii_case("UTC") {
1633                        if let Value::Timestamp(ts) = &evaluated[1] {
1634                            return date_trunc_in_zone(&unit, *ts, tz);
1635                        }
1636                    }
1637                }
1638            }
1639            crate::datetime::date_trunc(&unit, &evaluated[1])
1640        }
1641        "DATE_BIN" => {
1642            check_args(name, &evaluated, 3)?;
1643            if evaluated.iter().any(|v| v.is_null()) {
1644                return Ok(Value::Null);
1645            }
1646            let stride = match &evaluated[0] {
1647                Value::Interval {
1648                    months: _,
1649                    days,
1650                    micros,
1651                } => *days as i64 * crate::datetime::MICROS_PER_DAY + *micros,
1652                _ => {
1653                    return Err(SqlError::TypeMismatch {
1654                        expected: "INTERVAL stride".into(),
1655                        got: evaluated[0].data_type().to_string(),
1656                    })
1657                }
1658            };
1659            if stride <= 0 {
1660                return Err(SqlError::InvalidValue(
1661                    "DATE_BIN stride must be positive".into(),
1662                ));
1663            }
1664            let (src, origin) = match (&evaluated[1], &evaluated[2]) {
1665                (Value::Timestamp(s), Value::Timestamp(o)) => (*s, *o),
1666                _ => {
1667                    return Err(SqlError::TypeMismatch {
1668                        expected: "TIMESTAMP, TIMESTAMP".into(),
1669                        got: format!("{}, {}", evaluated[1].data_type(), evaluated[2].data_type()),
1670                    })
1671                }
1672            };
1673            let diff = src - origin;
1674            let binned = origin + (diff.div_euclid(stride)) * stride;
1675            Ok(Value::Timestamp(binned))
1676        }
1677        "AGE" => {
1678            if evaluated.len() == 1 {
1679                if evaluated[0].is_null() {
1680                    return Ok(Value::Null);
1681                }
1682                let ts = match &evaluated[0] {
1683                    Value::Timestamp(t) => *t,
1684                    Value::Date(d) => crate::datetime::date_to_ts(*d),
1685                    _ => {
1686                        return Err(SqlError::TypeMismatch {
1687                            expected: "TIMESTAMP or DATE".into(),
1688                            got: evaluated[0].data_type().to_string(),
1689                        })
1690                    }
1691                };
1692                // Implicit reference: today at midnight UTC.
1693                let today = crate::datetime::today_days();
1694                let midnight = crate::datetime::date_to_ts(today);
1695                let (m, d, u) = crate::datetime::age(midnight, ts)?;
1696                return Ok(Value::Interval {
1697                    months: m,
1698                    days: d,
1699                    micros: u,
1700                });
1701            }
1702            check_args(name, &evaluated, 2)?;
1703            if evaluated.iter().any(|v| v.is_null()) {
1704                return Ok(Value::Null);
1705            }
1706            let a = ts_of(&evaluated[0])?;
1707            let b = ts_of(&evaluated[1])?;
1708            let (m, d, u) = crate::datetime::age(a, b)?;
1709            Ok(Value::Interval {
1710                months: m,
1711                days: d,
1712                micros: u,
1713            })
1714        }
1715        "MAKE_DATE" => {
1716            check_args(name, &evaluated, 3)?;
1717            if evaluated.iter().any(|v| v.is_null()) {
1718                return Ok(Value::Null);
1719            }
1720            let y = int_arg(&evaluated[0], "MAKE_DATE year")? as i32;
1721            let m = int_arg(&evaluated[1], "MAKE_DATE month")? as u8;
1722            let d = int_arg(&evaluated[2], "MAKE_DATE day")? as u8;
1723            crate::datetime::ymd_to_days(y, m, d)
1724                .map(Value::Date)
1725                .ok_or_else(|| SqlError::InvalidDateLiteral(format!("make_date({y}, {m}, {d})")))
1726        }
1727        "MAKE_TIME" => {
1728            check_args(name, &evaluated, 3)?;
1729            if evaluated.iter().any(|v| v.is_null()) {
1730                return Ok(Value::Null);
1731            }
1732            let h = int_arg(&evaluated[0], "MAKE_TIME hour")? as u8;
1733            let mi = int_arg(&evaluated[1], "MAKE_TIME minute")? as u8;
1734            let (s, us) = real_sec_arg(&evaluated[2])?;
1735            crate::datetime::hmsn_to_micros(h, mi, s, us)
1736                .map(Value::Time)
1737                .ok_or_else(|| SqlError::InvalidTimeLiteral(format!("make_time({h}, {mi}, ...)")))
1738        }
1739        "MAKE_TIMESTAMP" => {
1740            check_args(name, &evaluated, 6)?;
1741            if evaluated.iter().any(|v| v.is_null()) {
1742                return Ok(Value::Null);
1743            }
1744            let y = int_arg(&evaluated[0], "MAKE_TIMESTAMP year")? as i32;
1745            let mo = int_arg(&evaluated[1], "MAKE_TIMESTAMP month")? as u8;
1746            let d = int_arg(&evaluated[2], "MAKE_TIMESTAMP day")? as u8;
1747            let h = int_arg(&evaluated[3], "MAKE_TIMESTAMP hour")? as u8;
1748            let mi = int_arg(&evaluated[4], "MAKE_TIMESTAMP min")? as u8;
1749            let (s, us) = real_sec_arg(&evaluated[5])?;
1750            let days = crate::datetime::ymd_to_days(y, mo, d).ok_or_else(|| {
1751                SqlError::InvalidTimestampLiteral(format!("make_timestamp year={y}"))
1752            })?;
1753            let tmicros = crate::datetime::hmsn_to_micros(h, mi, s, us)
1754                .ok_or_else(|| SqlError::InvalidTimestampLiteral("time out of range".into()))?;
1755            Ok(Value::Timestamp(crate::datetime::ts_combine(days, tmicros)))
1756        }
1757        "MAKE_INTERVAL" => {
1758            // Positional args: years, months, weeks, days, hours, mins, secs.
1759            if evaluated.len() > 7 {
1760                return Err(SqlError::InvalidValue(
1761                    "MAKE_INTERVAL accepts at most 7 arguments".into(),
1762                ));
1763            }
1764            let mut months: i64 = 0;
1765            let mut days: i64 = 0;
1766            let mut micros: i64 = 0;
1767            for (i, v) in evaluated.iter().enumerate() {
1768                if v.is_null() {
1769                    continue;
1770                }
1771                let n = match v {
1772                    Value::Integer(n) => *n,
1773                    Value::Real(r) => *r as i64,
1774                    _ => {
1775                        return Err(SqlError::TypeMismatch {
1776                            expected: "numeric".into(),
1777                            got: v.data_type().to_string(),
1778                        })
1779                    }
1780                };
1781                match i {
1782                    0 => months = months.saturating_add(n.saturating_mul(12)),
1783                    1 => months = months.saturating_add(n),
1784                    2 => days = days.saturating_add(n.saturating_mul(7)),
1785                    3 => days = days.saturating_add(n),
1786                    4 => {
1787                        micros = micros
1788                            .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_HOUR))
1789                    }
1790                    5 => {
1791                        micros =
1792                            micros.saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_MIN))
1793                    }
1794                    6 => {
1795                        // Seconds may be fractional — also check Real.
1796                        if let Value::Real(r) = v {
1797                            micros = micros.saturating_add(
1798                                (*r * crate::datetime::MICROS_PER_SEC as f64) as i64,
1799                            );
1800                        } else {
1801                            micros = micros
1802                                .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_SEC));
1803                        }
1804                    }
1805                    _ => unreachable!(),
1806                }
1807            }
1808            Ok(Value::Interval {
1809                months: months.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
1810                days: days.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
1811                micros,
1812            })
1813        }
1814        "JUSTIFY_DAYS" => {
1815            check_args(name, &evaluated, 1)?;
1816            match &evaluated[0] {
1817                Value::Null => Ok(Value::Null),
1818                Value::Interval {
1819                    months,
1820                    days,
1821                    micros,
1822                } => {
1823                    let (m, d, u) = crate::datetime::justify_days(*months, *days, *micros);
1824                    Ok(Value::Interval {
1825                        months: m,
1826                        days: d,
1827                        micros: u,
1828                    })
1829                }
1830                other => Err(SqlError::TypeMismatch {
1831                    expected: "INTERVAL".into(),
1832                    got: other.data_type().to_string(),
1833                }),
1834            }
1835        }
1836        "JUSTIFY_HOURS" => {
1837            check_args(name, &evaluated, 1)?;
1838            match &evaluated[0] {
1839                Value::Null => Ok(Value::Null),
1840                Value::Interval {
1841                    months,
1842                    days,
1843                    micros,
1844                } => {
1845                    let (m, d, u) = crate::datetime::justify_hours(*months, *days, *micros);
1846                    Ok(Value::Interval {
1847                        months: m,
1848                        days: d,
1849                        micros: u,
1850                    })
1851                }
1852                other => Err(SqlError::TypeMismatch {
1853                    expected: "INTERVAL".into(),
1854                    got: other.data_type().to_string(),
1855                }),
1856            }
1857        }
1858        "JUSTIFY_INTERVAL" => {
1859            check_args(name, &evaluated, 1)?;
1860            match &evaluated[0] {
1861                Value::Null => Ok(Value::Null),
1862                Value::Interval {
1863                    months,
1864                    days,
1865                    micros,
1866                } => {
1867                    let (m, d, u) = crate::datetime::justify_interval(*months, *days, *micros);
1868                    Ok(Value::Interval {
1869                        months: m,
1870                        days: d,
1871                        micros: u,
1872                    })
1873                }
1874                other => Err(SqlError::TypeMismatch {
1875                    expected: "INTERVAL".into(),
1876                    got: other.data_type().to_string(),
1877                }),
1878            }
1879        }
1880        "ISFINITE" => {
1881            check_args(name, &evaluated, 1)?;
1882            if evaluated[0].is_null() {
1883                return Ok(Value::Null);
1884            }
1885            Ok(Value::Boolean(evaluated[0].is_finite_temporal()))
1886        }
1887        "DATE" => {
1888            if evaluated.is_empty() {
1889                return Err(SqlError::InvalidValue(
1890                    "DATE requires at least 1 argument".into(),
1891                ));
1892            }
1893            if evaluated[0].is_null() {
1894                return Ok(Value::Null);
1895            }
1896            let d = match &evaluated[0] {
1897                Value::Date(d) => *d,
1898                Value::Timestamp(t) => crate::datetime::ts_to_date_floor(*t),
1899                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::today_days(),
1900                Value::Text(s) => crate::datetime::parse_date(s)?,
1901                Value::Integer(n) => {
1902                    crate::datetime::ts_to_date_floor(*n * crate::datetime::MICROS_PER_SEC)
1903                }
1904                other => {
1905                    return Err(SqlError::TypeMismatch {
1906                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
1907                        got: other.data_type().to_string(),
1908                    })
1909                }
1910            };
1911            Ok(Value::Date(d))
1912        }
1913        "TIME" => {
1914            if evaluated.is_empty() {
1915                return Err(SqlError::InvalidValue(
1916                    "TIME requires at least 1 argument".into(),
1917                ));
1918            }
1919            if evaluated[0].is_null() {
1920                return Ok(Value::Null);
1921            }
1922            let t = match &evaluated[0] {
1923                Value::Time(t) => *t,
1924                Value::Timestamp(t) => crate::datetime::ts_split(*t).1,
1925                Value::Text(s) if s.eq_ignore_ascii_case("now") => {
1926                    crate::datetime::current_time_micros()
1927                }
1928                Value::Text(s) => crate::datetime::parse_time(s)?,
1929                other => {
1930                    return Err(SqlError::TypeMismatch {
1931                        expected: "TIMESTAMP, TIME, or TEXT".into(),
1932                        got: other.data_type().to_string(),
1933                    })
1934                }
1935            };
1936            Ok(Value::Time(t))
1937        }
1938        "DATETIME" => {
1939            if evaluated.is_empty() {
1940                return Err(SqlError::InvalidValue(
1941                    "DATETIME requires at least 1 argument".into(),
1942                ));
1943            }
1944            if evaluated[0].is_null() {
1945                return Ok(Value::Null);
1946            }
1947            let t = match &evaluated[0] {
1948                Value::Timestamp(t) => *t,
1949                Value::Date(d) => crate::datetime::date_to_ts(*d),
1950                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::now_micros(),
1951                Value::Text(s) => crate::datetime::parse_timestamp(s)?,
1952                Value::Integer(n) => n * crate::datetime::MICROS_PER_SEC,
1953                other => {
1954                    return Err(SqlError::TypeMismatch {
1955                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
1956                        got: other.data_type().to_string(),
1957                    })
1958                }
1959            };
1960            Ok(Value::Timestamp(t))
1961        }
1962        "STRFTIME" => {
1963            if evaluated.len() < 2 {
1964                return Err(SqlError::InvalidValue(
1965                    "STRFTIME requires format + value".into(),
1966                ));
1967            }
1968            if evaluated.iter().take(2).any(|v| v.is_null()) {
1969                return Ok(Value::Null);
1970            }
1971            let fmt = match &evaluated[0] {
1972                Value::Text(s) => s.to_string(),
1973                _ => {
1974                    return Err(SqlError::TypeMismatch {
1975                        expected: "TEXT format".into(),
1976                        got: evaluated[0].data_type().to_string(),
1977                    })
1978                }
1979            };
1980            let out = crate::datetime::strftime(&fmt, &evaluated[1])?;
1981            Ok(Value::Text(out.into()))
1982        }
1983        "JULIANDAY" => {
1984            if evaluated.is_empty() {
1985                return Err(SqlError::InvalidValue(
1986                    "JULIANDAY requires at least 1 argument".into(),
1987                ));
1988            }
1989            if evaluated[0].is_null() {
1990                return Ok(Value::Null);
1991            }
1992            let micros = ts_of(&evaluated[0])?;
1993            let (days, tmicros) = crate::datetime::ts_split(micros);
1994            // Julian Day 2440587.5 = 1970-01-01 00:00:00 UTC (Julian days start at noon).
1995            let julian =
1996                days as f64 + 2_440_587.5 + tmicros as f64 / crate::datetime::MICROS_PER_DAY as f64;
1997            Ok(Value::Real(julian))
1998        }
1999        "UNIXEPOCH" => {
2000            if evaluated.is_empty() {
2001                return Err(SqlError::InvalidValue(
2002                    "UNIXEPOCH requires at least 1 argument".into(),
2003                ));
2004            }
2005            if evaluated[0].is_null() {
2006                return Ok(Value::Null);
2007            }
2008            let micros = ts_of(&evaluated[0])?;
2009            let subsec = evaluated
2010                .get(1)
2011                .and_then(|v| {
2012                    if let Value::Text(s) = v {
2013                        Some(s.to_string())
2014                    } else {
2015                        None
2016                    }
2017                })
2018                .map(|s| s.eq_ignore_ascii_case("subsec") || s.eq_ignore_ascii_case("subsecond"))
2019                .unwrap_or(false);
2020            if subsec {
2021                Ok(Value::Real(
2022                    micros as f64 / crate::datetime::MICROS_PER_SEC as f64,
2023                ))
2024            } else {
2025                Ok(Value::Integer(micros / crate::datetime::MICROS_PER_SEC))
2026            }
2027        }
2028        "TIMEDIFF" => {
2029            check_args(name, &evaluated, 2)?;
2030            if evaluated.iter().any(|v| v.is_null()) {
2031                return Ok(Value::Null);
2032            }
2033            let a = ts_of(&evaluated[0])?;
2034            let b = ts_of(&evaluated[1])?;
2035            let (days, micros) = crate::datetime::subtract_timestamps(a, b);
2036            let sign = if days < 0 || (days == 0 && micros < 0) {
2037                "-"
2038            } else {
2039                "+"
2040            };
2041            let abs_days = days.unsigned_abs() as i64;
2042            let abs_us = micros.unsigned_abs() as i64;
2043            // PG-compat format string: "(+|-)YYYY-MM-DD HH:MM:SS.SSS", days-only.
2044            let (h, m, s, us) = crate::datetime::micros_to_hmsn(abs_us);
2045            Ok(Value::Text(
2046                format!("{sign}{abs_days:04}-00-00 {h:02}:{m:02}:{s:02}.{us:06}").into(),
2047            ))
2048        }
2049        "AT_TIMEZONE" => {
2050            check_args(name, &evaluated, 2)?;
2051            if evaluated.iter().any(|v| v.is_null()) {
2052                return Ok(Value::Null);
2053            }
2054            let ts = match &evaluated[0] {
2055                Value::Timestamp(t) => *t,
2056                Value::Date(d) => crate::datetime::date_to_ts(*d),
2057                other => {
2058                    return Err(SqlError::TypeMismatch {
2059                        expected: "TIMESTAMP or DATE".into(),
2060                        got: other.data_type().to_string(),
2061                    })
2062                }
2063            };
2064            let zone = match &evaluated[1] {
2065                Value::Text(s) => s.to_string(),
2066                _ => {
2067                    return Err(SqlError::TypeMismatch {
2068                        expected: "TEXT time zone".into(),
2069                        got: evaluated[1].data_type().to_string(),
2070                    })
2071                }
2072            };
2073            // Reject POSIX-style 'UTC+5' (ambiguous sign convention).
2074            let upper = zone.to_ascii_uppercase();
2075            if (upper.starts_with("UTC+") || upper.starts_with("UTC-")) && zone.len() > 3 {
2076                return Err(SqlError::InvalidTimezone(format!(
2077                    "'{zone}' is ambiguous — use ISO-8601 offset like '+05:00' or named zone like 'Etc/GMT-5'"
2078                )));
2079            }
2080            let formatted = crate::datetime::format_timestamp_in_zone(ts, &zone)?;
2081            Ok(Value::Text(formatted.into()))
2082        }
2083        _ => Err(SqlError::Unsupported(format!("scalar function: {name}"))),
2084    }
2085}
2086
2087/// Extract a timestamp (µs UTC) from a Value, coercing DATE → midnight.
2088fn ts_of(v: &Value) -> Result<i64> {
2089    match v {
2090        Value::Timestamp(t) => Ok(*t),
2091        Value::Date(d) => Ok(crate::datetime::date_to_ts(*d)),
2092        _ => Err(SqlError::TypeMismatch {
2093            expected: "TIMESTAMP or DATE".into(),
2094            got: v.data_type().to_string(),
2095        }),
2096    }
2097}
2098
2099fn int_arg(v: &Value, label: &str) -> Result<i64> {
2100    match v {
2101        Value::Integer(n) => Ok(*n),
2102        _ => Err(SqlError::TypeMismatch {
2103            expected: format!("INTEGER ({label})"),
2104            got: v.data_type().to_string(),
2105        }),
2106    }
2107}
2108
2109/// Extract (whole_seconds: u8, frac_micros: u32) from a numeric argument for MAKE_TIME-style calls.
2110fn real_sec_arg(v: &Value) -> Result<(u8, u32)> {
2111    match v {
2112        Value::Integer(n) => {
2113            if !(0..=60).contains(n) {
2114                return Err(SqlError::InvalidValue(format!("second out of range: {n}")));
2115            }
2116            Ok((*n as u8, 0))
2117        }
2118        Value::Real(r) => {
2119            let whole = r.trunc() as i64;
2120            if !(0..=60).contains(&whole) {
2121                return Err(SqlError::InvalidValue(format!("second out of range: {r}")));
2122            }
2123            let frac = ((r - whole as f64) * 1_000_000.0).round() as i64;
2124            Ok((whole as u8, frac.max(0) as u32))
2125        }
2126        _ => Err(SqlError::TypeMismatch {
2127            expected: "numeric seconds".into(),
2128            got: v.data_type().to_string(),
2129        }),
2130    }
2131}
2132
2133/// DATE_TRUNC with a non-UTC IANA zone: convert → truncate in that zone → convert back to UTC.
2134fn date_trunc_in_zone(unit: &str, ts_utc: i64, tz: &str) -> Result<Value> {
2135    use jiff::{tz::TimeZone, Timestamp as JTimestamp};
2136    let zone = TimeZone::get(tz).map_err(|e| SqlError::InvalidTimezone(format!("{tz}: {e}")))?;
2137    let ts = JTimestamp::from_microsecond(ts_utc)
2138        .map_err(|e| SqlError::InvalidValue(format!("ts: {e}")))?;
2139    let zoned = ts.to_zoned(zone.clone());
2140    let unit_lower = unit.to_ascii_lowercase();
2141    let rounded = match unit_lower.as_str() {
2142        "microseconds" => return Ok(Value::Timestamp(ts_utc)),
2143        "second" => zoned
2144            .start_of_day()
2145            .map_err(|e| SqlError::InvalidValue(format!("{e}")))?,
2146        _ => {
2147            let naive_ts = zoned.timestamp().as_microsecond();
2148            return crate::datetime::date_trunc(unit, &Value::Timestamp(naive_ts));
2149        }
2150    };
2151    Ok(Value::Timestamp(rounded.timestamp().as_microsecond()))
2152}
2153
2154fn check_args(name: &str, args: &[Value], expected: usize) -> Result<()> {
2155    if args.len() != expected {
2156        Err(SqlError::InvalidValue(format!(
2157            "{name} requires {expected} argument(s), got {}",
2158            args.len()
2159        )))
2160    } else {
2161        Ok(())
2162    }
2163}
2164
2165pub fn referenced_columns(expr: &Expr, columns: &[ColumnDef]) -> Vec<usize> {
2166    let mut indices = Vec::new();
2167    collect_column_refs(expr, columns, &mut indices);
2168    indices.sort_unstable();
2169    indices.dedup();
2170    indices
2171}
2172
2173fn collect_column_refs(expr: &Expr, columns: &[ColumnDef], out: &mut Vec<usize>) {
2174    match expr {
2175        Expr::Column(name) => {
2176            for (i, c) in columns.iter().enumerate() {
2177                if c.name == *name
2178                    || (c.name.len() > name.len()
2179                        && c.name.as_bytes()[c.name.len() - name.len() - 1] == b'.'
2180                        && c.name.ends_with(name.as_str()))
2181                {
2182                    out.push(i);
2183                    break;
2184                }
2185            }
2186        }
2187        Expr::QualifiedColumn { table, column } => {
2188            let mut found: Option<usize> = None;
2189            let mut bare_match: Option<usize> = None;
2190            let mut bare_count = 0usize;
2191            for (i, c) in columns.iter().enumerate() {
2192                if c.name.len() == table.len() + 1 + column.len()
2193                    && c.name.as_bytes()[table.len()] == b'.'
2194                    && c.name.starts_with(table.as_str())
2195                    && c.name.ends_with(column.as_str())
2196                {
2197                    found = Some(i);
2198                    break;
2199                }
2200                if c.name == *column {
2201                    bare_match = Some(i);
2202                    bare_count += 1;
2203                }
2204            }
2205            if let Some(idx) = found {
2206                out.push(idx);
2207            } else if bare_count == 1 {
2208                out.push(bare_match.unwrap());
2209            }
2210        }
2211        Expr::BinaryOp { left, right, .. } => {
2212            collect_column_refs(left, columns, out);
2213            collect_column_refs(right, columns, out);
2214        }
2215        Expr::UnaryOp { expr, .. } => {
2216            collect_column_refs(expr, columns, out);
2217        }
2218        Expr::IsNull(e) | Expr::IsNotNull(e) => {
2219            collect_column_refs(e, columns, out);
2220        }
2221        Expr::Function { args, .. } => {
2222            for arg in args {
2223                collect_column_refs(arg, columns, out);
2224            }
2225        }
2226        Expr::InSubquery { expr, .. } => {
2227            collect_column_refs(expr, columns, out);
2228        }
2229        Expr::InList { expr, list, .. } => {
2230            collect_column_refs(expr, columns, out);
2231            for item in list {
2232                collect_column_refs(item, columns, out);
2233            }
2234        }
2235        Expr::InSet { expr, .. } => {
2236            collect_column_refs(expr, columns, out);
2237        }
2238        Expr::Between {
2239            expr, low, high, ..
2240        } => {
2241            collect_column_refs(expr, columns, out);
2242            collect_column_refs(low, columns, out);
2243            collect_column_refs(high, columns, out);
2244        }
2245        Expr::Like {
2246            expr,
2247            pattern,
2248            escape,
2249            ..
2250        } => {
2251            collect_column_refs(expr, columns, out);
2252            collect_column_refs(pattern, columns, out);
2253            if let Some(esc) = escape {
2254                collect_column_refs(esc, columns, out);
2255            }
2256        }
2257        Expr::Case {
2258            operand,
2259            conditions,
2260            else_result,
2261        } => {
2262            if let Some(op) = operand {
2263                collect_column_refs(op, columns, out);
2264            }
2265            for (when, then) in conditions {
2266                collect_column_refs(when, columns, out);
2267                collect_column_refs(then, columns, out);
2268            }
2269            if let Some(e) = else_result {
2270                collect_column_refs(e, columns, out);
2271            }
2272        }
2273        Expr::Coalesce(args) => {
2274            for arg in args {
2275                collect_column_refs(arg, columns, out);
2276            }
2277        }
2278        Expr::Cast { expr, .. } => {
2279            collect_column_refs(expr, columns, out);
2280        }
2281        Expr::Collate { expr, .. } => {
2282            collect_column_refs(expr, columns, out);
2283        }
2284        Expr::WindowFunction { args, spec, .. } => {
2285            for arg in args {
2286                collect_column_refs(arg, columns, out);
2287            }
2288            for pb in &spec.partition_by {
2289                collect_column_refs(pb, columns, out);
2290            }
2291            for ob in &spec.order_by {
2292                collect_column_refs(&ob.expr, columns, out);
2293            }
2294        }
2295        Expr::Literal(_)
2296        | Expr::Parameter(_)
2297        | Expr::CountStar
2298        | Expr::Exists { .. }
2299        | Expr::ScalarSubquery(_) => {}
2300    }
2301}
2302
2303/// Check if an expression result is truthy (for WHERE/HAVING).
2304pub fn is_truthy(val: &Value) -> bool {
2305    match val {
2306        Value::Boolean(b) => *b,
2307        Value::Integer(i) => *i != 0,
2308        Value::Null => false,
2309        _ => true,
2310    }
2311}
2312
2313#[cfg(test)]
2314#[path = "eval_tests.rs"]
2315mod tests;