Skip to main content

citadel_sql/
eval.rs

1//! Expression evaluator with SQL three-valued logic.
2
3use rustc_hash::FxHashMap;
4
5use crate::error::{Result, SqlError};
6use crate::parser::{BinOp, Expr, UnaryOp};
7use crate::types::{ColumnDef, CompactString, DataType, Value};
8
9pub struct ColumnMap {
10    exact: FxHashMap<String, usize>,
11    short: FxHashMap<String, ShortMatch>,
12    collations: Vec<crate::types::Collation>,
13    has_non_binary_collation: bool,
14}
15
16#[derive(Clone)]
17enum ShortMatch {
18    Unique(usize),
19    Ambiguous,
20}
21
22impl Clone for ColumnMap {
23    fn clone(&self) -> Self {
24        Self {
25            exact: self.exact.clone(),
26            short: self.short.clone(),
27            collations: self.collations.clone(),
28            has_non_binary_collation: self.has_non_binary_collation,
29        }
30    }
31}
32
33impl ColumnMap {
34    pub fn new(columns: &[ColumnDef]) -> Self {
35        let mut exact = FxHashMap::with_capacity_and_hasher(columns.len() * 2, Default::default());
36        let mut short: FxHashMap<String, ShortMatch> =
37            FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
38        let mut collations = Vec::with_capacity(columns.len());
39        let mut has_non_binary_collation = false;
40
41        for (i, col) in columns.iter().enumerate() {
42            let lower = col.name.to_ascii_lowercase();
43            exact.insert(lower.clone(), i);
44
45            let unqualified = if let Some(dot) = lower.rfind('.') {
46                &lower[dot + 1..]
47            } else {
48                &lower
49            };
50            short
51                .entry(unqualified.to_string())
52                .and_modify(|e| *e = ShortMatch::Ambiguous)
53                .or_insert(ShortMatch::Unique(i));
54            collations.push(col.collation);
55            if col.collation != crate::types::Collation::Binary {
56                has_non_binary_collation = true;
57            }
58        }
59
60        Self {
61            exact,
62            short,
63            collations,
64            has_non_binary_collation,
65        }
66    }
67
68    pub(crate) fn collation_at(&self, idx: usize) -> crate::types::Collation {
69        self.collations
70            .get(idx)
71            .copied()
72            .unwrap_or(crate::types::Collation::Binary)
73    }
74
75    #[inline]
76    pub(crate) fn has_non_binary_collation(&self) -> bool {
77        self.has_non_binary_collation
78    }
79
80    pub(crate) fn resolve(&self, name: &str) -> Result<usize> {
81        if let Some(&idx) = self.exact.get(name) {
82            return Ok(idx);
83        }
84        match self.short.get(name) {
85            Some(ShortMatch::Unique(idx)) => Ok(*idx),
86            Some(ShortMatch::Ambiguous) => Err(SqlError::AmbiguousColumn(name.to_string())),
87            None => Err(SqlError::ColumnNotFound(name.to_string())),
88        }
89    }
90
91    pub(crate) fn resolve_qualified(&self, table: &str, column: &str) -> Result<usize> {
92        let qualified = format!("{table}.{column}");
93        if let Some(&idx) = self.exact.get(&qualified) {
94            return Ok(idx);
95        }
96        match self.short.get(column) {
97            Some(ShortMatch::Unique(idx)) => Ok(*idx),
98            _ => Err(SqlError::ColumnNotFound(format!("{table}.{column}"))),
99        }
100    }
101}
102
103pub struct EvalCtx<'a> {
104    pub col_map: &'a ColumnMap,
105    pub row: &'a [Value],
106    pub params: &'a [Value],
107    pub excluded: Option<ExcludedRow<'a>>,
108    pub old_new: Option<OldNewRows<'a>>,
109}
110
111pub struct ExcludedRow<'a> {
112    pub col_map: &'a ColumnMap,
113    pub row: &'a [Value],
114}
115
116pub struct OldNewRows<'a> {
117    pub col_map: &'a ColumnMap,
118    pub old_row: Option<&'a [Value]>,
119    pub new_row: Option<&'a [Value]>,
120}
121
122impl<'a> EvalCtx<'a> {
123    pub fn new(col_map: &'a ColumnMap, row: &'a [Value]) -> Self {
124        Self {
125            col_map,
126            row,
127            params: &[],
128            excluded: None,
129            old_new: None,
130        }
131    }
132
133    pub fn with_params(col_map: &'a ColumnMap, row: &'a [Value], params: &'a [Value]) -> Self {
134        Self {
135            col_map,
136            row,
137            params,
138            excluded: None,
139            old_new: None,
140        }
141    }
142
143    pub fn with_excluded(
144        col_map: &'a ColumnMap,
145        row: &'a [Value],
146        excluded_col_map: &'a ColumnMap,
147        excluded_row: &'a [Value],
148    ) -> Self {
149        Self {
150            col_map,
151            row,
152            params: &[],
153            excluded: Some(ExcludedRow {
154                col_map: excluded_col_map,
155                row: excluded_row,
156            }),
157            old_new: None,
158        }
159    }
160
161    pub fn with_old_new(
162        col_map: &'a ColumnMap,
163        row: &'a [Value],
164        old_row: Option<&'a [Value]>,
165        new_row: Option<&'a [Value]>,
166    ) -> Self {
167        Self {
168            col_map,
169            row,
170            params: &[],
171            excluded: None,
172            old_new: Some(OldNewRows {
173                col_map,
174                old_row,
175                new_row,
176            }),
177        }
178    }
179}
180
181thread_local! {
182    static SCOPED_PARAMS: std::cell::Cell<(*const Value, usize)> =
183        const { std::cell::Cell::new((std::ptr::null(), 0)) };
184}
185
186/// Install positional parameters for `Expr::Parameter` resolution during `f`.
187pub fn with_scoped_params<R>(params: &[Value], f: impl FnOnce() -> R) -> R {
188    struct Guard((*const Value, usize));
189    impl Drop for Guard {
190        fn drop(&mut self) {
191            SCOPED_PARAMS.with(|slot| slot.set(self.0));
192        }
193    }
194    SCOPED_PARAMS.with(|slot| {
195        let prev = slot.get();
196        slot.set((params.as_ptr(), params.len()));
197        let _guard = Guard(prev);
198        f()
199    })
200}
201
202fn resolve_parameter(n: usize, ctx_params: &[Value]) -> Result<Value> {
203    if !ctx_params.is_empty() {
204        if n == 0 || n > ctx_params.len() {
205            return Err(SqlError::ParameterCountMismatch {
206                expected: n,
207                got: ctx_params.len(),
208            });
209        }
210        return Ok(ctx_params[n - 1].clone());
211    }
212    resolve_scoped_param(n)
213}
214
215pub fn resolve_scoped_param(n: usize) -> Result<Value> {
216    SCOPED_PARAMS.with(|slot| {
217        let (ptr, len) = slot.get();
218        if n == 0 || n > len {
219            return Err(SqlError::ParameterCountMismatch {
220                expected: n,
221                got: len,
222            });
223        }
224        // SAFETY: `with_scoped_params` keeps the slice alive for the duration of `f()`
225        // and restores the previous pointer on return. Reads only happen inside `f()`.
226        unsafe { Ok((*ptr.add(n - 1)).clone()) }
227    })
228}
229
230pub fn eval_expr(expr: &Expr, ctx: &EvalCtx) -> Result<Value> {
231    match expr {
232        Expr::Literal(v) => Ok(v.clone()),
233
234        Expr::Column(name) => {
235            let idx = ctx.col_map.resolve(name)?;
236            Ok(ctx.row[idx].clone())
237        }
238
239        Expr::QualifiedColumn { table, column } => {
240            if let Some(excluded) = ctx.excluded.as_ref() {
241                if table.eq_ignore_ascii_case("excluded") {
242                    let lowered = column.to_ascii_lowercase();
243                    let idx = excluded.col_map.resolve(&lowered)?;
244                    return Ok(excluded.row[idx].clone());
245                }
246            }
247            if let Some(on) = ctx.old_new.as_ref() {
248                if table.eq_ignore_ascii_case("old") {
249                    let lowered = column.to_ascii_lowercase();
250                    let idx = on.col_map.resolve(&lowered)?;
251                    return Ok(on.old_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
252                }
253                if table.eq_ignore_ascii_case("new") {
254                    let lowered = column.to_ascii_lowercase();
255                    let idx = on.col_map.resolve(&lowered)?;
256                    return Ok(on.new_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
257                }
258            }
259            let idx = ctx.col_map.resolve_qualified(table, column)?;
260            Ok(ctx.row[idx].clone())
261        }
262
263        Expr::BinaryOp { left, op, right } => {
264            let lval = eval_expr(left, ctx)?;
265            let rval = eval_expr(right, ctx)?;
266            let needs_collation_check = ctx.col_map.has_non_binary_collation()
267                || matches!(left.as_ref(), Expr::Collate { .. })
268                || matches!(right.as_ref(), Expr::Collate { .. });
269            if needs_collation_check {
270                let coll = collation_of(left)
271                    .or_else(|| collation_of(right))
272                    .or_else(|| {
273                        column_collation(left, ctx).or_else(|| column_collation(right, ctx))
274                    });
275                if let Some(c) = coll {
276                    if c != crate::types::Collation::Binary {
277                        if let Some(b) = eval_text_compare(&lval, *op, &rval, c) {
278                            return Ok(Value::Boolean(b));
279                        }
280                    }
281                }
282            }
283            eval_binary_op(&lval, *op, &rval)
284        }
285
286        Expr::UnaryOp { op, expr } => {
287            let val = eval_expr(expr, ctx)?;
288            eval_unary_op(*op, &val)
289        }
290
291        Expr::IsNull(e) => {
292            let val = eval_expr(e, ctx)?;
293            Ok(Value::Boolean(val.is_null()))
294        }
295
296        Expr::IsNotNull(e) => {
297            let val = eval_expr(e, ctx)?;
298            Ok(Value::Boolean(!val.is_null()))
299        }
300
301        Expr::Function { name, args, .. } => eval_scalar_function(name, args, ctx),
302
303        Expr::CountStar => Err(SqlError::Unsupported(
304            "COUNT(*) in non-aggregate context".into(),
305        )),
306
307        Expr::InList {
308            expr: e,
309            list,
310            negated,
311        } => {
312            let lhs = eval_expr(e, ctx)?;
313            eval_in_values(&lhs, list, ctx, *negated)
314        }
315
316        Expr::InSet {
317            expr: e,
318            values,
319            has_null,
320            negated,
321        } => {
322            let lhs = eval_expr(e, ctx)?;
323            eval_in_set(&lhs, values, *has_null, *negated)
324        }
325
326        Expr::Between {
327            expr: e,
328            low,
329            high,
330            negated,
331        } => {
332            let val = eval_expr(e, ctx)?;
333            let lo = eval_expr(low, ctx)?;
334            let hi = eval_expr(high, ctx)?;
335            eval_between(&val, &lo, &hi, *negated)
336        }
337
338        Expr::Like {
339            expr: e,
340            pattern,
341            escape,
342            negated,
343        } => {
344            let val = eval_expr(e, ctx)?;
345            let pat = eval_expr(pattern, ctx)?;
346            let esc = escape.as_ref().map(|e| eval_expr(e, ctx)).transpose()?;
347            eval_like(&val, &pat, esc.as_ref(), *negated)
348        }
349
350        Expr::Case {
351            operand,
352            conditions,
353            else_result,
354        } => eval_case(operand.as_deref(), conditions, else_result.as_deref(), ctx),
355
356        Expr::Coalesce(args) => {
357            for arg in args {
358                let val = eval_expr(arg, ctx)?;
359                if !val.is_null() {
360                    return Ok(val);
361                }
362            }
363            Ok(Value::Null)
364        }
365
366        Expr::Cast { expr: e, data_type } => {
367            let val = eval_expr(e, ctx)?;
368            eval_cast(&val, *data_type)
369        }
370
371        Expr::Collate { expr: e, .. } => eval_expr(e, ctx),
372
373        Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => Err(
374            SqlError::Unsupported("subquery not materialized (internal error)".into()),
375        ),
376
377        Expr::Parameter(n) => resolve_parameter(*n, ctx.params),
378
379        Expr::WindowFunction { .. } => Err(SqlError::Unsupported(
380            "window functions are only allowed in SELECT columns".into(),
381        )),
382    }
383}
384
385/// Planner-level constant folding hook; shares semantics with row evaluation.
386fn collation_of(expr: &Expr) -> Option<crate::types::Collation> {
387    match expr {
388        Expr::Collate { collation, .. } => Some(*collation),
389        _ => None,
390    }
391}
392
393fn column_collation(expr: &Expr, ctx: &EvalCtx<'_>) -> Option<crate::types::Collation> {
394    match expr {
395        Expr::Column(name) => ctx
396            .col_map
397            .resolve(name)
398            .ok()
399            .map(|i| ctx.col_map.collation_at(i)),
400        Expr::QualifiedColumn { table, column } => ctx
401            .col_map
402            .resolve_qualified(table, column)
403            .ok()
404            .map(|i| ctx.col_map.collation_at(i)),
405        _ => None,
406    }
407}
408
409fn eval_text_compare(
410    left: &Value,
411    op: BinOp,
412    right: &Value,
413    coll: crate::types::Collation,
414) -> Option<bool> {
415    let (a, b) = match (left, right) {
416        (Value::Null, _) | (_, Value::Null) => return None,
417        (Value::Text(a), Value::Text(b)) => (a.as_str(), b.as_str()),
418        _ => return None,
419    };
420    let ord = coll.cmp_text(a, b);
421    Some(match op {
422        BinOp::Eq => ord == std::cmp::Ordering::Equal,
423        BinOp::NotEq => ord != std::cmp::Ordering::Equal,
424        BinOp::Lt => ord == std::cmp::Ordering::Less,
425        BinOp::Gt => ord == std::cmp::Ordering::Greater,
426        BinOp::LtEq => ord != std::cmp::Ordering::Greater,
427        BinOp::GtEq => ord != std::cmp::Ordering::Less,
428        _ => return None,
429    })
430}
431
432pub fn eval_binary_op_public(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
433    eval_binary_op(left, op, right)
434}
435
436fn eval_binary_op(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
437    match op {
438        BinOp::And => return eval_and(left, right),
439        BinOp::Or => return eval_or(left, right),
440        _ => {}
441    }
442
443    if left.is_null() || right.is_null() {
444        return Ok(Value::Null);
445    }
446
447    if let Some(res) = eval_temporal_op(left, op, right) {
448        return res;
449    }
450
451    match op {
452        BinOp::Eq => Ok(Value::Boolean(left == right)),
453        BinOp::NotEq => Ok(Value::Boolean(left != right)),
454        BinOp::Lt => Ok(Value::Boolean(left < right)),
455        BinOp::Gt => Ok(Value::Boolean(left > right)),
456        BinOp::LtEq => Ok(Value::Boolean(left <= right)),
457        BinOp::GtEq => Ok(Value::Boolean(left >= right)),
458        BinOp::Add => eval_arithmetic(left, right, i64::checked_add, |a, b| a + b),
459        BinOp::Sub => match left {
460            Value::Json(_) | Value::Jsonb(_) => crate::json::op_delete_one(left, right),
461            _ => eval_arithmetic(left, right, i64::checked_sub, |a, b| a - b),
462        },
463        BinOp::Mul => eval_arithmetic(left, right, i64::checked_mul, |a, b| a * b),
464        BinOp::Div => {
465            match right {
466                Value::Integer(0) => return Err(SqlError::DivisionByZero),
467                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
468                _ => {}
469            }
470            eval_arithmetic(left, right, i64::checked_div, |a, b| a / b)
471        }
472        BinOp::Mod => {
473            match right {
474                Value::Integer(0) => return Err(SqlError::DivisionByZero),
475                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
476                _ => {}
477            }
478            eval_arithmetic(left, right, i64::checked_rem, |a, b| a % b)
479        }
480        BinOp::Concat => match (left, right) {
481            (Value::Json(_) | Value::Jsonb(_), _) | (_, Value::Json(_) | Value::Jsonb(_)) => {
482                crate::json::op_concat(left, right)
483            }
484            _ => {
485                let ls = value_to_text(left);
486                let rs = value_to_text(right);
487                Ok(Value::Text(format!("{ls}{rs}").into()))
488            }
489        },
490        BinOp::JsonGet
491        | BinOp::JsonGetText
492        | BinOp::JsonPath
493        | BinOp::JsonPathText
494        | BinOp::JsonContains
495        | BinOp::JsonContainedBy
496        | BinOp::JsonHasKey
497        | BinOp::JsonHasAnyKey
498        | BinOp::JsonHasAllKeys
499        | BinOp::JsonDeletePath
500        | BinOp::JsonPathExists
501        | BinOp::JsonPathMatch => eval_json_binary_op(left, op, right),
502        BinOp::And | BinOp::Or => unreachable!(),
503    }
504}
505
506#[cold]
507fn eval_json_binary_op(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
508    match op {
509        BinOp::JsonGet => crate::json::op_get(left, right),
510        BinOp::JsonGetText => crate::json::op_get_text(left, right),
511        BinOp::JsonPath => crate::json::op_path(left, right),
512        BinOp::JsonPathText => crate::json::op_path_text(left, right),
513        BinOp::JsonContains => crate::json::op_contains(left, right),
514        BinOp::JsonContainedBy => crate::json::op_contained_by(left, right),
515        BinOp::JsonHasKey => crate::json::op_has_key(left, right),
516        BinOp::JsonHasAnyKey => crate::json::op_has_any_key(left, right),
517        BinOp::JsonHasAllKeys => crate::json::op_has_all_keys(left, right),
518        BinOp::JsonDeletePath => crate::json::op_delete_path(left, right),
519        BinOp::JsonPathExists => crate::json::op_path_exists(left, right),
520        BinOp::JsonPathMatch => crate::json::op_path_match(left, right),
521        _ => unreachable!(),
522    }
523}
524
525/// Returns `Some` when `(left, op, right)` is a temporal operation; `None` to fall through.
526fn eval_temporal_op(left: &Value, op: BinOp, right: &Value) -> Option<Result<Value>> {
527    use crate::datetime as dt;
528    use std::cmp::Ordering;
529
530    let is_temporal = |v: &Value| {
531        matches!(
532            v,
533            Value::Date(_) | Value::Time(_) | Value::Timestamp(_) | Value::Interval { .. }
534        )
535    };
536    if !is_temporal(left) && !is_temporal(right) {
537        return None;
538    }
539    if matches!(op, BinOp::Add | BinOp::Sub)
540        && ((is_temporal(left) && matches!(right, Value::Real(_)))
541            || (matches!(left, Value::Real(_)) && is_temporal(right)))
542    {
543        return Some(Err(SqlError::TypeMismatch {
544            expected: "INTEGER or INTERVAL for date/time arithmetic (use CAST for REAL)".into(),
545            got: format!("{} and {}", left.data_type(), right.data_type()),
546        }));
547    }
548
549    match (left, op, right) {
550        (Value::Date(d), BinOp::Add, Value::Integer(n))
551        | (Value::Integer(n), BinOp::Add, Value::Date(d)) => {
552            Some(dt::add_days_to_date(*d, *n).map(Value::Date))
553        }
554        (Value::Date(d), BinOp::Sub, Value::Integer(n)) => {
555            Some(dt::add_days_to_date(*d, -*n).map(Value::Date))
556        }
557        (Value::Date(a), BinOp::Sub, Value::Date(b)) => {
558            Some(Ok(Value::Integer(*a as i64 - *b as i64)))
559        }
560        // DATE ± INTERVAL → TIMESTAMP (PG rule).
561        (
562            Value::Date(d),
563            BinOp::Add,
564            Value::Interval {
565                months,
566                days,
567                micros,
568            },
569        )
570        | (
571            Value::Interval {
572                months,
573                days,
574                micros,
575            },
576            BinOp::Add,
577            Value::Date(d),
578        ) => Some(dt::add_interval_to_date(*d, *months, *days, *micros).map(Value::Timestamp)),
579        (
580            Value::Date(d),
581            BinOp::Sub,
582            Value::Interval {
583                months,
584                days,
585                micros,
586            },
587        ) => Some(dt::add_interval_to_date(*d, -*months, -*days, -*micros).map(Value::Timestamp)),
588        (
589            Value::Timestamp(t),
590            BinOp::Add,
591            Value::Interval {
592                months,
593                days,
594                micros,
595            },
596        )
597        | (
598            Value::Interval {
599                months,
600                days,
601                micros,
602            },
603            BinOp::Add,
604            Value::Timestamp(t),
605        ) => Some(dt::add_interval_to_timestamp(*t, *months, *days, *micros).map(Value::Timestamp)),
606        (
607            Value::Timestamp(t),
608            BinOp::Sub,
609            Value::Interval {
610                months,
611                days,
612                micros,
613            },
614        ) => Some(
615            dt::add_interval_to_timestamp(*t, -*months, -*days, -*micros).map(Value::Timestamp),
616        ),
617        (Value::Timestamp(a), BinOp::Sub, Value::Timestamp(b)) => {
618            let (days, micros) = dt::subtract_timestamps(*a, *b);
619            Some(Ok(Value::Interval {
620                months: 0,
621                days,
622                micros,
623            }))
624        }
625        (
626            Value::Time(t),
627            BinOp::Add,
628            Value::Interval {
629                months,
630                days,
631                micros,
632            },
633        ) => Some(dt::add_interval_to_time(*t, *months, *days, *micros).map(Value::Time)),
634        (
635            Value::Time(t),
636            BinOp::Sub,
637            Value::Interval {
638                months,
639                days,
640                micros,
641            },
642        ) => Some(dt::add_interval_to_time(*t, -*months, -*days, -*micros).map(Value::Time)),
643        (Value::Time(a), BinOp::Sub, Value::Time(b)) => Some(Ok(Value::Interval {
644            months: 0,
645            days: 0,
646            micros: *a - *b,
647        })),
648        (
649            Value::Interval {
650                months: am,
651                days: ad,
652                micros: au,
653            },
654            BinOp::Add,
655            Value::Interval {
656                months: bm,
657                days: bd,
658                micros: bu,
659            },
660        ) => Some(Ok(Value::Interval {
661            months: am.saturating_add(*bm),
662            days: ad.saturating_add(*bd),
663            micros: au.saturating_add(*bu),
664        })),
665        (
666            Value::Interval {
667                months: am,
668                days: ad,
669                micros: au,
670            },
671            BinOp::Sub,
672            Value::Interval {
673                months: bm,
674                days: bd,
675                micros: bu,
676            },
677        ) => Some(Ok(Value::Interval {
678            months: am.saturating_sub(*bm),
679            days: ad.saturating_sub(*bd),
680            micros: au.saturating_sub(*bu),
681        })),
682        (
683            Value::Interval {
684                months,
685                days,
686                micros,
687            },
688            BinOp::Mul,
689            Value::Integer(n),
690        )
691        | (
692            Value::Integer(n),
693            BinOp::Mul,
694            Value::Interval {
695                months,
696                days,
697                micros,
698            },
699        ) => {
700            let n32 = (*n).clamp(i32::MIN as i64, i32::MAX as i64) as i32;
701            Some(Ok(Value::Interval {
702                months: months.saturating_mul(n32),
703                days: days.saturating_mul(n32),
704                micros: micros.saturating_mul(*n),
705            }))
706        }
707        // INTERVAL * REAL — fractional months → days, fractional days → micros (PG).
708        (
709            Value::Interval {
710                months,
711                days,
712                micros,
713            },
714            BinOp::Mul,
715            Value::Real(r),
716        )
717        | (
718            Value::Real(r),
719            BinOp::Mul,
720            Value::Interval {
721                months,
722                days,
723                micros,
724            },
725        ) => Some(Ok(scale_interval_by_real(*months, *days, *micros, *r))),
726        (
727            Value::Interval {
728                months,
729                days,
730                micros,
731            },
732            BinOp::Div,
733            Value::Integer(n),
734        ) if *n != 0 => Some(Ok(Value::Interval {
735            months: (*months as i64 / *n) as i32,
736            days: (*days as i64 / *n) as i32,
737            micros: *micros / *n,
738        })),
739        (
740            Value::Interval {
741                months,
742                days,
743                micros,
744            },
745            BinOp::Div,
746            Value::Real(r),
747        ) if *r != 0.0 => Some(Ok(scale_interval_by_real(*months, *days, *micros, 1.0 / r))),
748        // PG-normalized INTERVAL compare: 30-day month, 24-hour day.
749        (
750            Value::Interval {
751                months: am,
752                days: ad,
753                micros: au,
754            },
755            op,
756            Value::Interval {
757                months: bm,
758                days: bd,
759                micros: bu,
760            },
761        ) if matches!(
762            op,
763            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::Gt | BinOp::LtEq | BinOp::GtEq
764        ) =>
765        {
766            let ord = dt::pg_normalized_interval_cmp((*am, *ad, *au), (*bm, *bd, *bu));
767            let b = match op {
768                BinOp::Eq => ord == Ordering::Equal,
769                BinOp::NotEq => ord != Ordering::Equal,
770                BinOp::Lt => ord == Ordering::Less,
771                BinOp::Gt => ord == Ordering::Greater,
772                BinOp::LtEq => ord != Ordering::Greater,
773                BinOp::GtEq => ord != Ordering::Less,
774                _ => unreachable!(),
775            };
776            Some(Ok(Value::Boolean(b)))
777        }
778        // PG rejects TIMESTAMP ± INTEGER; require CAST to INTERVAL.
779        (Value::Timestamp(_), BinOp::Add | BinOp::Sub, Value::Integer(_))
780        | (Value::Integer(_), BinOp::Add, Value::Timestamp(_)) => {
781            Some(Err(SqlError::TypeMismatch {
782                expected: "INTERVAL (use CAST or explicit unit)".into(),
783                got: format!("{} and {}", left.data_type(), right.data_type()),
784            }))
785        }
786        _ => None,
787    }
788}
789
790/// PG fractional-propagation: month frac → days (×30), day frac → micros (×86.4G).
791fn scale_interval_by_real(months: i32, days: i32, micros: i64, factor: f64) -> Value {
792    let raw_months = months as f64 * factor;
793    let whole_months = raw_months.trunc() as i64;
794    let frac_months = raw_months - whole_months as f64;
795    let months_frac_as_days = frac_months * 30.0;
796
797    let raw_days = days as f64 * factor + months_frac_as_days;
798    let whole_days = raw_days.trunc() as i64;
799    let frac_days = raw_days - whole_days as f64;
800    let days_frac_as_micros = (frac_days * crate::datetime::MICROS_PER_DAY as f64).round() as i64;
801
802    let raw_micros = (micros as f64 * factor).round() as i64;
803    let total_micros = raw_micros.saturating_add(days_frac_as_micros);
804
805    let clamp_i32 = |n: i64| n.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
806    Value::Interval {
807        months: clamp_i32(whole_months),
808        days: clamp_i32(whole_days),
809        micros: total_micros,
810    }
811}
812
813/// SQL three-valued AND: NULL AND false = false, NULL AND true = NULL
814fn eval_and(left: &Value, right: &Value) -> Result<Value> {
815    let l = to_bool_or_null(left)?;
816    let r = to_bool_or_null(right)?;
817    match (l, r) {
818        (Some(false), _) | (_, Some(false)) => Ok(Value::Boolean(false)),
819        (Some(true), Some(true)) => Ok(Value::Boolean(true)),
820        _ => Ok(Value::Null),
821    }
822}
823
824/// SQL three-valued OR: NULL OR true = true, NULL OR false = NULL
825fn eval_or(left: &Value, right: &Value) -> Result<Value> {
826    let l = to_bool_or_null(left)?;
827    let r = to_bool_or_null(right)?;
828    match (l, r) {
829        (Some(true), _) | (_, Some(true)) => Ok(Value::Boolean(true)),
830        (Some(false), Some(false)) => Ok(Value::Boolean(false)),
831        _ => Ok(Value::Null),
832    }
833}
834
835fn to_bool_or_null(val: &Value) -> Result<Option<bool>> {
836    match val {
837        Value::Boolean(b) => Ok(Some(*b)),
838        Value::Null => Ok(None),
839        Value::Integer(i) => Ok(Some(*i != 0)),
840        _ => Err(SqlError::TypeMismatch {
841            expected: "BOOLEAN".into(),
842            got: format!("{}", val.data_type()),
843        }),
844    }
845}
846
847fn eval_arithmetic(
848    left: &Value,
849    right: &Value,
850    int_op: fn(i64, i64) -> Option<i64>,
851    real_op: fn(f64, f64) -> f64,
852) -> Result<Value> {
853    match (left, right) {
854        (Value::Integer(a), Value::Integer(b)) => int_op(*a, *b)
855            .map(Value::Integer)
856            .ok_or(SqlError::IntegerOverflow),
857        (Value::Real(a), Value::Real(b)) => Ok(Value::Real(real_op(*a, *b))),
858        (Value::Integer(a), Value::Real(b)) => Ok(Value::Real(real_op(*a as f64, *b))),
859        (Value::Real(a), Value::Integer(b)) => Ok(Value::Real(real_op(*a, *b as f64))),
860        _ => Err(SqlError::TypeMismatch {
861            expected: "numeric".into(),
862            got: format!("{} and {}", left.data_type(), right.data_type()),
863        }),
864    }
865}
866
867fn eval_in_values(lhs: &Value, list: &[Expr], ctx: &EvalCtx, negated: bool) -> Result<Value> {
868    if list.is_empty() {
869        return Ok(Value::Boolean(negated));
870    }
871    if lhs.is_null() {
872        return Ok(Value::Null);
873    }
874    let mut has_null = false;
875    for item in list {
876        let rhs = eval_expr(item, ctx)?;
877        if rhs.is_null() {
878            has_null = true;
879        } else if lhs == &rhs {
880            return Ok(Value::Boolean(!negated));
881        }
882    }
883    if has_null {
884        Ok(Value::Null)
885    } else {
886        Ok(Value::Boolean(negated))
887    }
888}
889
890fn eval_in_set(
891    lhs: &Value,
892    values: &rustc_hash::FxHashSet<Value>,
893    has_null: bool,
894    negated: bool,
895) -> Result<Value> {
896    if values.is_empty() && !has_null {
897        return Ok(Value::Boolean(negated));
898    }
899    if lhs.is_null() {
900        return Ok(Value::Null);
901    }
902    if values.contains(lhs) {
903        return Ok(Value::Boolean(!negated));
904    }
905    if has_null {
906        Ok(Value::Null)
907    } else {
908        Ok(Value::Boolean(negated))
909    }
910}
911
912fn eval_unary_op(op: UnaryOp, val: &Value) -> Result<Value> {
913    if val.is_null() {
914        return Ok(Value::Null);
915    }
916    match op {
917        UnaryOp::Neg => match val {
918            Value::Integer(i) => i
919                .checked_neg()
920                .map(Value::Integer)
921                .ok_or(SqlError::IntegerOverflow),
922            Value::Real(r) => Ok(Value::Real(-r)),
923            Value::Interval {
924                months,
925                days,
926                micros,
927            } => {
928                let m = months.checked_neg().ok_or(SqlError::IntegerOverflow)?;
929                let d = days.checked_neg().ok_or(SqlError::IntegerOverflow)?;
930                let u = micros.checked_neg().ok_or(SqlError::IntegerOverflow)?;
931                Ok(Value::Interval {
932                    months: m,
933                    days: d,
934                    micros: u,
935                })
936            }
937            _ => Err(SqlError::TypeMismatch {
938                expected: "numeric or INTERVAL".into(),
939                got: format!("{}", val.data_type()),
940            }),
941        },
942        UnaryOp::Not => match val {
943            Value::Boolean(b) => Ok(Value::Boolean(!b)),
944            Value::Integer(i) => Ok(Value::Boolean(*i == 0)),
945            _ => Err(SqlError::TypeMismatch {
946                expected: "BOOLEAN".into(),
947                got: format!("{}", val.data_type()),
948            }),
949        },
950    }
951}
952
953fn value_to_text(val: &Value) -> String {
954    match val {
955        Value::Text(s) => s.to_string(),
956        Value::Integer(i) => i.to_string(),
957        Value::Real(r) => {
958            if r.fract() == 0.0 && r.is_finite() {
959                format!("{r:.1}")
960            } else {
961                format!("{r}")
962            }
963        }
964        Value::Boolean(b) => if *b { "TRUE" } else { "FALSE" }.into(),
965        Value::Null => String::new(),
966        Value::Blob(b) => {
967            let mut s = String::with_capacity(b.len() * 2);
968            for byte in b {
969                s.push_str(&format!("{byte:02X}"));
970            }
971            s
972        }
973        Value::Date(d) => crate::datetime::format_date(*d),
974        Value::Time(t) => crate::datetime::format_time(*t),
975        Value::Timestamp(t) => crate::datetime::format_timestamp(*t),
976        Value::Interval {
977            months,
978            days,
979            micros,
980        } => crate::datetime::format_interval(*months, *days, *micros),
981        Value::Json(s) => s.to_string(),
982        Value::Jsonb(b) => crate::json::decode_to_text(b).unwrap_or_default(),
983    }
984}
985
986fn eval_between(val: &Value, low: &Value, high: &Value, negated: bool) -> Result<Value> {
987    if val.is_null() || low.is_null() || high.is_null() {
988        let ge = if val.is_null() || low.is_null() {
989            None
990        } else {
991            Some(*val >= *low)
992        };
993        let le = if val.is_null() || high.is_null() {
994            None
995        } else {
996            Some(*val <= *high)
997        };
998
999        let result = match (ge, le) {
1000            (Some(false), _) | (_, Some(false)) => Some(false),
1001            (Some(true), Some(true)) => Some(true),
1002            _ => None,
1003        };
1004
1005        return match result {
1006            Some(b) => Ok(Value::Boolean(if negated { !b } else { b })),
1007            None => Ok(Value::Null),
1008        };
1009    }
1010
1011    let in_range = *val >= *low && *val <= *high;
1012    Ok(Value::Boolean(if negated { !in_range } else { in_range }))
1013}
1014
1015const MAX_LIKE_PATTERN_LEN: usize = 10_000;
1016
1017fn eval_like(val: &Value, pattern: &Value, escape: Option<&Value>, negated: bool) -> Result<Value> {
1018    if val.is_null() || pattern.is_null() {
1019        return Ok(Value::Null);
1020    }
1021    let text = match val {
1022        Value::Text(s) => s.as_str(),
1023        _ => {
1024            return Err(SqlError::TypeMismatch {
1025                expected: "TEXT".into(),
1026                got: val.data_type().to_string(),
1027            })
1028        }
1029    };
1030    let pat = match pattern {
1031        Value::Text(s) => s.as_str(),
1032        _ => {
1033            return Err(SqlError::TypeMismatch {
1034                expected: "TEXT".into(),
1035                got: pattern.data_type().to_string(),
1036            })
1037        }
1038    };
1039
1040    if pat.len() > MAX_LIKE_PATTERN_LEN {
1041        return Err(SqlError::InvalidValue(format!(
1042            "LIKE pattern too long ({} chars, max {MAX_LIKE_PATTERN_LEN})",
1043            pat.len()
1044        )));
1045    }
1046
1047    let esc_char = match escape {
1048        Some(Value::Text(s)) => {
1049            let mut chars = s.chars();
1050            let c = chars.next().ok_or_else(|| {
1051                SqlError::InvalidValue("ESCAPE must be a single character".into())
1052            })?;
1053            if chars.next().is_some() {
1054                return Err(SqlError::InvalidValue(
1055                    "ESCAPE must be a single character".into(),
1056                ));
1057            }
1058            Some(c)
1059        }
1060        Some(Value::Null) => return Ok(Value::Null),
1061        Some(_) => {
1062            return Err(SqlError::TypeMismatch {
1063                expected: "TEXT".into(),
1064                got: "non-text".into(),
1065            })
1066        }
1067        None => None,
1068    };
1069
1070    let matched = like_match(text, pat, esc_char);
1071    Ok(Value::Boolean(if negated { !matched } else { matched }))
1072}
1073
1074fn like_match(text: &str, pattern: &str, escape: Option<char>) -> bool {
1075    let t: Vec<char> = text.chars().collect();
1076    let p: Vec<char> = pattern.chars().collect();
1077    like_match_impl(&t, &p, 0, 0, escape)
1078}
1079
1080fn like_match_impl(
1081    t: &[char],
1082    p: &[char],
1083    mut ti: usize,
1084    mut pi: usize,
1085    esc: Option<char>,
1086) -> bool {
1087    let mut star_pi: Option<usize> = None;
1088    let mut star_ti: usize = 0;
1089
1090    while ti < t.len() {
1091        if pi < p.len() {
1092            if let Some(ec) = esc {
1093                if p[pi] == ec && pi + 1 < p.len() {
1094                    pi += 1;
1095                    let pc_lower = p[pi].to_ascii_lowercase();
1096                    let tc_lower = t[ti].to_ascii_lowercase();
1097                    if pc_lower == tc_lower {
1098                        pi += 1;
1099                        ti += 1;
1100                        continue;
1101                    } else if let Some(sp) = star_pi {
1102                        pi = sp + 1;
1103                        star_ti += 1;
1104                        ti = star_ti;
1105                        continue;
1106                    } else {
1107                        return false;
1108                    }
1109                }
1110            }
1111            if p[pi] == '%' {
1112                star_pi = Some(pi);
1113                star_ti = ti;
1114                pi += 1;
1115                continue;
1116            }
1117            if p[pi] == '_' {
1118                pi += 1;
1119                ti += 1;
1120                continue;
1121            }
1122            if p[pi].eq_ignore_ascii_case(&t[ti]) {
1123                pi += 1;
1124                ti += 1;
1125                continue;
1126            }
1127        }
1128        if let Some(sp) = star_pi {
1129            pi = sp + 1;
1130            star_ti += 1;
1131            ti = star_ti;
1132        } else {
1133            return false;
1134        }
1135    }
1136
1137    while pi < p.len() && p[pi] == '%' {
1138        pi += 1;
1139    }
1140    pi == p.len()
1141}
1142
1143fn eval_case(
1144    operand: Option<&Expr>,
1145    conditions: &[(Expr, Expr)],
1146    else_result: Option<&Expr>,
1147    ctx: &EvalCtx,
1148) -> Result<Value> {
1149    if let Some(op_expr) = operand {
1150        let op_val = eval_expr(op_expr, ctx)?;
1151        for (cond, result) in conditions {
1152            let cond_val = eval_expr(cond, ctx)?;
1153            if !op_val.is_null() && !cond_val.is_null() && op_val == cond_val {
1154                return eval_expr(result, ctx);
1155            }
1156        }
1157    } else {
1158        for (cond, result) in conditions {
1159            let cond_val = eval_expr(cond, ctx)?;
1160            if is_truthy(&cond_val) {
1161                return eval_expr(result, ctx);
1162            }
1163        }
1164    }
1165    match else_result {
1166        Some(e) => eval_expr(e, ctx),
1167        None => Ok(Value::Null),
1168    }
1169}
1170
1171fn eval_cast(val: &Value, target: DataType) -> Result<Value> {
1172    if val.is_null() {
1173        return Ok(Value::Null);
1174    }
1175    match target {
1176        DataType::Integer => match val {
1177            Value::Integer(_) => Ok(val.clone()),
1178            Value::Real(r) => Ok(Value::Integer(*r as i64)),
1179            Value::Boolean(b) => Ok(Value::Integer(if *b { 1 } else { 0 })),
1180            Value::Text(s) => s
1181                .trim()
1182                .parse::<i64>()
1183                .map(Value::Integer)
1184                .or_else(|_| s.trim().parse::<f64>().map(|f| Value::Integer(f as i64)))
1185                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to INTEGER"))),
1186            _ => Err(SqlError::InvalidValue(format!(
1187                "cannot cast {} to INTEGER",
1188                val.data_type()
1189            ))),
1190        },
1191        DataType::Real => match val {
1192            Value::Real(_) => Ok(val.clone()),
1193            Value::Integer(i) => Ok(Value::Real(*i as f64)),
1194            Value::Boolean(b) => Ok(Value::Real(if *b { 1.0 } else { 0.0 })),
1195            Value::Text(s) => s
1196                .trim()
1197                .parse::<f64>()
1198                .map(Value::Real)
1199                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to REAL"))),
1200            _ => Err(SqlError::InvalidValue(format!(
1201                "cannot cast {} to REAL",
1202                val.data_type()
1203            ))),
1204        },
1205        DataType::Text => Ok(Value::Text(value_to_text(val).into())),
1206        DataType::Boolean => match val {
1207            Value::Boolean(_) => Ok(val.clone()),
1208            Value::Integer(i) => Ok(Value::Boolean(*i != 0)),
1209            Value::Text(s) => {
1210                let lower = s.trim().to_ascii_lowercase();
1211                match lower.as_str() {
1212                    "true" | "1" | "yes" | "on" => Ok(Value::Boolean(true)),
1213                    "false" | "0" | "no" | "off" => Ok(Value::Boolean(false)),
1214                    _ => Err(SqlError::InvalidValue(format!(
1215                        "cannot cast '{s}' to BOOLEAN"
1216                    ))),
1217                }
1218            }
1219            _ => Err(SqlError::InvalidValue(format!(
1220                "cannot cast {} to BOOLEAN",
1221                val.data_type()
1222            ))),
1223        },
1224        DataType::Blob => match val {
1225            Value::Blob(_) => Ok(val.clone()),
1226            Value::Text(s) => Ok(Value::Blob(s.as_bytes().to_vec())),
1227            _ => Err(SqlError::InvalidValue(format!(
1228                "cannot cast {} to BLOB",
1229                val.data_type()
1230            ))),
1231        },
1232        DataType::Null => Ok(Value::Null),
1233        DataType::Date => val.clone().coerce_into(DataType::Date).ok_or_else(|| {
1234            SqlError::InvalidValue(format!("cannot cast {} to DATE", val.data_type()))
1235        }),
1236        DataType::Time => val.clone().coerce_into(DataType::Time).ok_or_else(|| {
1237            SqlError::InvalidValue(format!("cannot cast {} to TIME", val.data_type()))
1238        }),
1239        DataType::Timestamp => val.clone().coerce_into(DataType::Timestamp).ok_or_else(|| {
1240            SqlError::InvalidValue(format!("cannot cast {} to TIMESTAMP", val.data_type()))
1241        }),
1242        DataType::Interval => val.clone().coerce_into(DataType::Interval).ok_or_else(|| {
1243            SqlError::InvalidValue(format!("cannot cast {} to INTERVAL", val.data_type()))
1244        }),
1245        DataType::Json => val.clone().coerce_into(DataType::Json).ok_or_else(|| {
1246            SqlError::InvalidValue(format!("cannot cast {} to JSON", val.data_type()))
1247        }),
1248        DataType::Jsonb => val.clone().coerce_into(DataType::Jsonb).ok_or_else(|| {
1249            SqlError::InvalidValue(format!("cannot cast {} to JSONB", val.data_type()))
1250        }),
1251    }
1252}
1253
1254fn eval_scalar_function(name: &str, args: &[Expr], ctx: &EvalCtx) -> Result<Value> {
1255    let evaluated: Vec<Value> = args
1256        .iter()
1257        .map(|a| eval_expr(a, ctx))
1258        .collect::<Result<Vec<_>>>()?;
1259
1260    match name {
1261        "LENGTH" => {
1262            check_args(name, &evaluated, 1)?;
1263            match &evaluated[0] {
1264                Value::Null => Ok(Value::Null),
1265                Value::Text(s) => Ok(Value::Integer(s.chars().count() as i64)),
1266                Value::Blob(b) => Ok(Value::Integer(b.len() as i64)),
1267                _ => Ok(Value::Integer(
1268                    value_to_text(&evaluated[0]).chars().count() as i64
1269                )),
1270            }
1271        }
1272        "UPPER" => {
1273            check_args(name, &evaluated, 1)?;
1274            match &evaluated[0] {
1275                Value::Null => Ok(Value::Null),
1276                Value::Text(s) => Ok(Value::Text(s.to_ascii_uppercase())),
1277                _ => Ok(Value::Text(
1278                    value_to_text(&evaluated[0]).to_ascii_uppercase().into(),
1279                )),
1280            }
1281        }
1282        "LOWER" => {
1283            check_args(name, &evaluated, 1)?;
1284            match &evaluated[0] {
1285                Value::Null => Ok(Value::Null),
1286                Value::Text(s) => Ok(Value::Text(s.to_ascii_lowercase())),
1287                _ => Ok(Value::Text(
1288                    value_to_text(&evaluated[0]).to_ascii_lowercase().into(),
1289                )),
1290            }
1291        }
1292        "SUBSTR" | "SUBSTRING" => {
1293            if evaluated.len() < 2 || evaluated.len() > 3 {
1294                return Err(SqlError::InvalidValue(format!(
1295                    "{name} requires 2 or 3 arguments"
1296                )));
1297            }
1298            if evaluated.iter().any(|v| v.is_null()) {
1299                return Ok(Value::Null);
1300            }
1301            let s = value_to_text(&evaluated[0]);
1302            let chars: Vec<char> = s.chars().collect();
1303            let start = match &evaluated[1] {
1304                Value::Integer(i) => *i,
1305                _ => {
1306                    return Err(SqlError::TypeMismatch {
1307                        expected: "INTEGER".into(),
1308                        got: evaluated[1].data_type().to_string(),
1309                    })
1310                }
1311            };
1312            let len = chars.len() as i64;
1313
1314            let (begin, count) = if evaluated.len() == 3 {
1315                let cnt = match &evaluated[2] {
1316                    Value::Integer(i) => *i,
1317                    _ => {
1318                        return Err(SqlError::TypeMismatch {
1319                            expected: "INTEGER".into(),
1320                            got: evaluated[2].data_type().to_string(),
1321                        })
1322                    }
1323                };
1324                if start >= 1 {
1325                    let b = (start - 1).min(len) as usize;
1326                    let c = cnt.max(0) as usize;
1327                    (b, c)
1328                } else if start == 0 {
1329                    let c = (cnt - 1).max(0) as usize;
1330                    (0usize, c)
1331                } else {
1332                    let adjusted_cnt = (cnt + start - 1).max(0) as usize;
1333                    (0usize, adjusted_cnt)
1334                }
1335            } else if start >= 1 {
1336                let b = (start - 1).min(len) as usize;
1337                (b, chars.len() - b)
1338            } else if start == 0 {
1339                (0usize, chars.len())
1340            } else {
1341                let b = (len + start).max(0) as usize;
1342                (b, chars.len() - b)
1343            };
1344
1345            let result: String = chars.iter().skip(begin).take(count).collect();
1346            Ok(Value::Text(result.into()))
1347        }
1348        "TRIM" | "LTRIM" | "RTRIM" => {
1349            if evaluated.is_empty() || evaluated.len() > 2 {
1350                return Err(SqlError::InvalidValue(format!(
1351                    "{name} requires 1 or 2 arguments"
1352                )));
1353            }
1354            if evaluated[0].is_null() {
1355                return Ok(Value::Null);
1356            }
1357            let s = value_to_text(&evaluated[0]);
1358            let trim_chars: Vec<char> = if evaluated.len() == 2 {
1359                if evaluated[1].is_null() {
1360                    return Ok(Value::Null);
1361                }
1362                value_to_text(&evaluated[1]).chars().collect()
1363            } else {
1364                vec![' ']
1365            };
1366            let result = match name {
1367                "TRIM" => s
1368                    .trim_matches(|c: char| trim_chars.contains(&c))
1369                    .to_string(),
1370                "LTRIM" => s
1371                    .trim_start_matches(|c: char| trim_chars.contains(&c))
1372                    .to_string(),
1373                "RTRIM" => s
1374                    .trim_end_matches(|c: char| trim_chars.contains(&c))
1375                    .to_string(),
1376                _ => unreachable!(),
1377            };
1378            Ok(Value::Text(result.into()))
1379        }
1380        "REPLACE" => {
1381            check_args(name, &evaluated, 3)?;
1382            if evaluated.iter().any(|v| v.is_null()) {
1383                return Ok(Value::Null);
1384            }
1385            let s = value_to_text(&evaluated[0]);
1386            let from = value_to_text(&evaluated[1]);
1387            let to = value_to_text(&evaluated[2]);
1388            if from.is_empty() {
1389                return Ok(Value::Text(s.into()));
1390            }
1391            Ok(Value::Text(s.replace(&from, &to).into()))
1392        }
1393        "INSTR" => {
1394            check_args(name, &evaluated, 2)?;
1395            if evaluated.iter().any(|v| v.is_null()) {
1396                return Ok(Value::Null);
1397            }
1398            let haystack = value_to_text(&evaluated[0]);
1399            let needle = value_to_text(&evaluated[1]);
1400            let pos = haystack
1401                .find(&needle)
1402                .map(|i| haystack[..i].chars().count() as i64 + 1)
1403                .unwrap_or(0);
1404            Ok(Value::Integer(pos))
1405        }
1406        "CONCAT" => {
1407            if evaluated.is_empty() {
1408                return Ok(Value::Text(CompactString::default()));
1409            }
1410            let mut result = String::new();
1411            for v in &evaluated {
1412                match v {
1413                    Value::Null => {}
1414                    _ => result.push_str(&value_to_text(v)),
1415                }
1416            }
1417            Ok(Value::Text(result.into()))
1418        }
1419        "ABS" => {
1420            check_args(name, &evaluated, 1)?;
1421            match &evaluated[0] {
1422                Value::Null => Ok(Value::Null),
1423                Value::Integer(i) => i
1424                    .checked_abs()
1425                    .map(Value::Integer)
1426                    .ok_or(SqlError::IntegerOverflow),
1427                Value::Real(r) => Ok(Value::Real(r.abs())),
1428                _ => Err(SqlError::TypeMismatch {
1429                    expected: "numeric".into(),
1430                    got: evaluated[0].data_type().to_string(),
1431                }),
1432            }
1433        }
1434        "ROUND" => {
1435            if evaluated.is_empty() || evaluated.len() > 2 {
1436                return Err(SqlError::InvalidValue(
1437                    "ROUND requires 1 or 2 arguments".into(),
1438                ));
1439            }
1440            if evaluated[0].is_null() {
1441                return Ok(Value::Null);
1442            }
1443            let val = match &evaluated[0] {
1444                Value::Integer(i) => *i as f64,
1445                Value::Real(r) => *r,
1446                _ => {
1447                    return Err(SqlError::TypeMismatch {
1448                        expected: "numeric".into(),
1449                        got: evaluated[0].data_type().to_string(),
1450                    })
1451                }
1452            };
1453            let places = if evaluated.len() == 2 {
1454                match &evaluated[1] {
1455                    Value::Null => return Ok(Value::Null),
1456                    Value::Integer(i) => *i,
1457                    _ => {
1458                        return Err(SqlError::TypeMismatch {
1459                            expected: "INTEGER".into(),
1460                            got: evaluated[1].data_type().to_string(),
1461                        })
1462                    }
1463                }
1464            } else {
1465                0
1466            };
1467            let factor = 10f64.powi(places as i32);
1468            let rounded = (val * factor).round() / factor;
1469            Ok(Value::Real(rounded))
1470        }
1471        "CEIL" | "CEILING" => {
1472            check_args(name, &evaluated, 1)?;
1473            match &evaluated[0] {
1474                Value::Null => Ok(Value::Null),
1475                Value::Integer(i) => Ok(Value::Integer(*i)),
1476                Value::Real(r) => Ok(Value::Integer(r.ceil() as i64)),
1477                _ => Err(SqlError::TypeMismatch {
1478                    expected: "numeric".into(),
1479                    got: evaluated[0].data_type().to_string(),
1480                }),
1481            }
1482        }
1483        "FLOOR" => {
1484            check_args(name, &evaluated, 1)?;
1485            match &evaluated[0] {
1486                Value::Null => Ok(Value::Null),
1487                Value::Integer(i) => Ok(Value::Integer(*i)),
1488                Value::Real(r) => Ok(Value::Integer(r.floor() as i64)),
1489                _ => Err(SqlError::TypeMismatch {
1490                    expected: "numeric".into(),
1491                    got: evaluated[0].data_type().to_string(),
1492                }),
1493            }
1494        }
1495        "SIGN" => {
1496            check_args(name, &evaluated, 1)?;
1497            match &evaluated[0] {
1498                Value::Null => Ok(Value::Null),
1499                Value::Integer(i) => Ok(Value::Integer(i.signum())),
1500                Value::Real(r) => {
1501                    if *r > 0.0 {
1502                        Ok(Value::Integer(1))
1503                    } else if *r < 0.0 {
1504                        Ok(Value::Integer(-1))
1505                    } else {
1506                        Ok(Value::Integer(0))
1507                    }
1508                }
1509                _ => Err(SqlError::TypeMismatch {
1510                    expected: "numeric".into(),
1511                    got: evaluated[0].data_type().to_string(),
1512                }),
1513            }
1514        }
1515        "SQRT" => {
1516            check_args(name, &evaluated, 1)?;
1517            match &evaluated[0] {
1518                Value::Null => Ok(Value::Null),
1519                Value::Integer(i) => {
1520                    if *i < 0 {
1521                        Ok(Value::Null)
1522                    } else {
1523                        Ok(Value::Real((*i as f64).sqrt()))
1524                    }
1525                }
1526                Value::Real(r) => {
1527                    if *r < 0.0 {
1528                        Ok(Value::Null)
1529                    } else {
1530                        Ok(Value::Real(r.sqrt()))
1531                    }
1532                }
1533                _ => Err(SqlError::TypeMismatch {
1534                    expected: "numeric".into(),
1535                    got: evaluated[0].data_type().to_string(),
1536                }),
1537            }
1538        }
1539        "RANDOM" => {
1540            check_args(name, &evaluated, 0)?;
1541            use std::collections::hash_map::DefaultHasher;
1542            use std::hash::{Hash, Hasher};
1543            use std::time::SystemTime;
1544            let mut hasher = DefaultHasher::new();
1545            SystemTime::now().hash(&mut hasher);
1546            std::thread::current().id().hash(&mut hasher);
1547            let mut val = hasher.finish() as i64;
1548            if val == i64::MIN {
1549                val = i64::MAX;
1550            }
1551            Ok(Value::Integer(val))
1552        }
1553        "TYPEOF" => {
1554            check_args(name, &evaluated, 1)?;
1555            let type_name = match &evaluated[0] {
1556                Value::Null => "null",
1557                Value::Integer(_) => "integer",
1558                Value::Real(_) => "real",
1559                Value::Text(_) => "text",
1560                Value::Blob(_) => "blob",
1561                Value::Boolean(_) => "boolean",
1562                Value::Date(_) => "date",
1563                Value::Time(_) => "time",
1564                Value::Timestamp(_) => "timestamp",
1565                Value::Interval { .. } => "interval",
1566                Value::Json(_) => "json",
1567                Value::Jsonb(_) => "jsonb",
1568            };
1569            Ok(Value::Text(type_name.into()))
1570        }
1571        "MIN" => {
1572            check_args(name, &evaluated, 2)?;
1573            if evaluated[0].is_null() {
1574                return Ok(evaluated[1].clone());
1575            }
1576            if evaluated[1].is_null() {
1577                return Ok(evaluated[0].clone());
1578            }
1579            if evaluated[0] <= evaluated[1] {
1580                Ok(evaluated[0].clone())
1581            } else {
1582                Ok(evaluated[1].clone())
1583            }
1584        }
1585        "MAX" => {
1586            check_args(name, &evaluated, 2)?;
1587            if evaluated[0].is_null() {
1588                return Ok(evaluated[1].clone());
1589            }
1590            if evaluated[1].is_null() {
1591                return Ok(evaluated[0].clone());
1592            }
1593            if evaluated[0] >= evaluated[1] {
1594                Ok(evaluated[0].clone())
1595            } else {
1596                Ok(evaluated[1].clone())
1597            }
1598        }
1599        "HEX" => {
1600            check_args(name, &evaluated, 1)?;
1601            match &evaluated[0] {
1602                Value::Null => Ok(Value::Null),
1603                Value::Blob(b) => {
1604                    let mut s = String::with_capacity(b.len() * 2);
1605                    for byte in b {
1606                        s.push_str(&format!("{byte:02X}"));
1607                    }
1608                    Ok(Value::Text(s.into()))
1609                }
1610                Value::Text(s) => {
1611                    let mut r = String::with_capacity(s.len() * 2);
1612                    for byte in s.as_bytes() {
1613                        r.push_str(&format!("{byte:02X}"));
1614                    }
1615                    Ok(Value::Text(r.into()))
1616                }
1617                _ => Ok(Value::Text(value_to_text(&evaluated[0]).into())),
1618            }
1619        }
1620        "NOW" | "CURRENT_TIMESTAMP" | "LOCALTIMESTAMP" => {
1621            check_args(name, &evaluated, 0)?;
1622            Ok(Value::Timestamp(crate::datetime::txn_or_clock_micros()))
1623        }
1624        "CURRENT_DATE" => {
1625            check_args(name, &evaluated, 0)?;
1626            Ok(Value::Date(crate::datetime::ts_to_date_floor(
1627                crate::datetime::txn_or_clock_micros(),
1628            )))
1629        }
1630        "CURRENT_TIME" | "LOCALTIME" => {
1631            check_args(name, &evaluated, 0)?;
1632            Ok(Value::Time(
1633                crate::datetime::ts_split(crate::datetime::txn_or_clock_micros()).1,
1634            ))
1635        }
1636        "CLOCK_TIMESTAMP" | "STATEMENT_TIMESTAMP" | "TRANSACTION_TIMESTAMP" => {
1637            check_args(name, &evaluated, 0)?;
1638            let ts = match name {
1639                "CLOCK_TIMESTAMP" => crate::datetime::now_micros(),
1640                _ => crate::datetime::txn_or_clock_micros(),
1641            };
1642            Ok(Value::Timestamp(ts))
1643        }
1644        "EXTRACT" | "DATE_PART" | "DATEPART" => {
1645            check_args(name, &evaluated, 2)?;
1646            // Borrow the field str without allocating; datetime::extract accepts &str.
1647            let field: &str = match &evaluated[0] {
1648                Value::Null => return Ok(Value::Null),
1649                Value::Text(s) => s.as_str(),
1650                _ => {
1651                    return Err(SqlError::TypeMismatch {
1652                        expected: "TEXT field name".into(),
1653                        got: evaluated[0].data_type().to_string(),
1654                    })
1655                }
1656            };
1657            if evaluated[1].is_null() {
1658                return Ok(Value::Null);
1659            }
1660            crate::datetime::extract(field, &evaluated[1])
1661        }
1662        "DATE_TRUNC" => {
1663            if evaluated.len() < 2 || evaluated.len() > 3 {
1664                return Err(SqlError::InvalidValue(
1665                    "DATE_TRUNC requires 2 or 3 arguments".into(),
1666                ));
1667            }
1668            let unit = match &evaluated[0] {
1669                Value::Null => return Ok(Value::Null),
1670                Value::Text(s) => s.to_string(),
1671                _ => {
1672                    return Err(SqlError::TypeMismatch {
1673                        expected: "TEXT unit name".into(),
1674                        got: evaluated[0].data_type().to_string(),
1675                    })
1676                }
1677            };
1678            if evaluated[1].is_null() {
1679                return Ok(Value::Null);
1680            }
1681            // Optional tz arg: truncate in that zone, then convert back to UTC.
1682            if evaluated.len() == 3 {
1683                if let Value::Text(tz) = &evaluated[2] {
1684                    if !tz.eq_ignore_ascii_case("UTC") {
1685                        if let Value::Timestamp(ts) = &evaluated[1] {
1686                            return date_trunc_in_zone(&unit, *ts, tz);
1687                        }
1688                    }
1689                }
1690            }
1691            crate::datetime::date_trunc(&unit, &evaluated[1])
1692        }
1693        "DATE_BIN" => {
1694            check_args(name, &evaluated, 3)?;
1695            if evaluated.iter().any(|v| v.is_null()) {
1696                return Ok(Value::Null);
1697            }
1698            let stride = match &evaluated[0] {
1699                Value::Interval {
1700                    months: _,
1701                    days,
1702                    micros,
1703                } => *days as i64 * crate::datetime::MICROS_PER_DAY + *micros,
1704                _ => {
1705                    return Err(SqlError::TypeMismatch {
1706                        expected: "INTERVAL stride".into(),
1707                        got: evaluated[0].data_type().to_string(),
1708                    })
1709                }
1710            };
1711            if stride <= 0 {
1712                return Err(SqlError::InvalidValue(
1713                    "DATE_BIN stride must be positive".into(),
1714                ));
1715            }
1716            let (src, origin) = match (&evaluated[1], &evaluated[2]) {
1717                (Value::Timestamp(s), Value::Timestamp(o)) => (*s, *o),
1718                _ => {
1719                    return Err(SqlError::TypeMismatch {
1720                        expected: "TIMESTAMP, TIMESTAMP".into(),
1721                        got: format!("{}, {}", evaluated[1].data_type(), evaluated[2].data_type()),
1722                    })
1723                }
1724            };
1725            let diff = src - origin;
1726            let binned = origin + (diff.div_euclid(stride)) * stride;
1727            Ok(Value::Timestamp(binned))
1728        }
1729        "AGE" => {
1730            if evaluated.len() == 1 {
1731                if evaluated[0].is_null() {
1732                    return Ok(Value::Null);
1733                }
1734                let ts = match &evaluated[0] {
1735                    Value::Timestamp(t) => *t,
1736                    Value::Date(d) => crate::datetime::date_to_ts(*d),
1737                    _ => {
1738                        return Err(SqlError::TypeMismatch {
1739                            expected: "TIMESTAMP or DATE".into(),
1740                            got: evaluated[0].data_type().to_string(),
1741                        })
1742                    }
1743                };
1744                // Implicit reference: today at midnight UTC.
1745                let today = crate::datetime::today_days();
1746                let midnight = crate::datetime::date_to_ts(today);
1747                let (m, d, u) = crate::datetime::age(midnight, ts)?;
1748                return Ok(Value::Interval {
1749                    months: m,
1750                    days: d,
1751                    micros: u,
1752                });
1753            }
1754            check_args(name, &evaluated, 2)?;
1755            if evaluated.iter().any(|v| v.is_null()) {
1756                return Ok(Value::Null);
1757            }
1758            let a = ts_of(&evaluated[0])?;
1759            let b = ts_of(&evaluated[1])?;
1760            let (m, d, u) = crate::datetime::age(a, b)?;
1761            Ok(Value::Interval {
1762                months: m,
1763                days: d,
1764                micros: u,
1765            })
1766        }
1767        "MAKE_DATE" => {
1768            check_args(name, &evaluated, 3)?;
1769            if evaluated.iter().any(|v| v.is_null()) {
1770                return Ok(Value::Null);
1771            }
1772            let y = int_arg(&evaluated[0], "MAKE_DATE year")? as i32;
1773            let m = int_arg(&evaluated[1], "MAKE_DATE month")? as u8;
1774            let d = int_arg(&evaluated[2], "MAKE_DATE day")? as u8;
1775            crate::datetime::ymd_to_days(y, m, d)
1776                .map(Value::Date)
1777                .ok_or_else(|| SqlError::InvalidDateLiteral(format!("make_date({y}, {m}, {d})")))
1778        }
1779        "MAKE_TIME" => {
1780            check_args(name, &evaluated, 3)?;
1781            if evaluated.iter().any(|v| v.is_null()) {
1782                return Ok(Value::Null);
1783            }
1784            let h = int_arg(&evaluated[0], "MAKE_TIME hour")? as u8;
1785            let mi = int_arg(&evaluated[1], "MAKE_TIME minute")? as u8;
1786            let (s, us) = real_sec_arg(&evaluated[2])?;
1787            crate::datetime::hmsn_to_micros(h, mi, s, us)
1788                .map(Value::Time)
1789                .ok_or_else(|| SqlError::InvalidTimeLiteral(format!("make_time({h}, {mi}, ...)")))
1790        }
1791        "MAKE_TIMESTAMP" => {
1792            check_args(name, &evaluated, 6)?;
1793            if evaluated.iter().any(|v| v.is_null()) {
1794                return Ok(Value::Null);
1795            }
1796            let y = int_arg(&evaluated[0], "MAKE_TIMESTAMP year")? as i32;
1797            let mo = int_arg(&evaluated[1], "MAKE_TIMESTAMP month")? as u8;
1798            let d = int_arg(&evaluated[2], "MAKE_TIMESTAMP day")? as u8;
1799            let h = int_arg(&evaluated[3], "MAKE_TIMESTAMP hour")? as u8;
1800            let mi = int_arg(&evaluated[4], "MAKE_TIMESTAMP min")? as u8;
1801            let (s, us) = real_sec_arg(&evaluated[5])?;
1802            let days = crate::datetime::ymd_to_days(y, mo, d).ok_or_else(|| {
1803                SqlError::InvalidTimestampLiteral(format!("make_timestamp year={y}"))
1804            })?;
1805            let tmicros = crate::datetime::hmsn_to_micros(h, mi, s, us)
1806                .ok_or_else(|| SqlError::InvalidTimestampLiteral("time out of range".into()))?;
1807            Ok(Value::Timestamp(crate::datetime::ts_combine(days, tmicros)))
1808        }
1809        "MAKE_INTERVAL" => {
1810            // Positional args: years, months, weeks, days, hours, mins, secs.
1811            if evaluated.len() > 7 {
1812                return Err(SqlError::InvalidValue(
1813                    "MAKE_INTERVAL accepts at most 7 arguments".into(),
1814                ));
1815            }
1816            let mut months: i64 = 0;
1817            let mut days: i64 = 0;
1818            let mut micros: i64 = 0;
1819            for (i, v) in evaluated.iter().enumerate() {
1820                if v.is_null() {
1821                    continue;
1822                }
1823                let n = match v {
1824                    Value::Integer(n) => *n,
1825                    Value::Real(r) => *r as i64,
1826                    _ => {
1827                        return Err(SqlError::TypeMismatch {
1828                            expected: "numeric".into(),
1829                            got: v.data_type().to_string(),
1830                        })
1831                    }
1832                };
1833                match i {
1834                    0 => months = months.saturating_add(n.saturating_mul(12)),
1835                    1 => months = months.saturating_add(n),
1836                    2 => days = days.saturating_add(n.saturating_mul(7)),
1837                    3 => days = days.saturating_add(n),
1838                    4 => {
1839                        micros = micros
1840                            .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_HOUR))
1841                    }
1842                    5 => {
1843                        micros =
1844                            micros.saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_MIN))
1845                    }
1846                    6 => {
1847                        // Seconds may be fractional — also check Real.
1848                        if let Value::Real(r) = v {
1849                            micros = micros.saturating_add(
1850                                (*r * crate::datetime::MICROS_PER_SEC as f64) as i64,
1851                            );
1852                        } else {
1853                            micros = micros
1854                                .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_SEC));
1855                        }
1856                    }
1857                    _ => unreachable!(),
1858                }
1859            }
1860            Ok(Value::Interval {
1861                months: months.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
1862                days: days.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
1863                micros,
1864            })
1865        }
1866        "JUSTIFY_DAYS" => {
1867            check_args(name, &evaluated, 1)?;
1868            match &evaluated[0] {
1869                Value::Null => Ok(Value::Null),
1870                Value::Interval {
1871                    months,
1872                    days,
1873                    micros,
1874                } => {
1875                    let (m, d, u) = crate::datetime::justify_days(*months, *days, *micros);
1876                    Ok(Value::Interval {
1877                        months: m,
1878                        days: d,
1879                        micros: u,
1880                    })
1881                }
1882                other => Err(SqlError::TypeMismatch {
1883                    expected: "INTERVAL".into(),
1884                    got: other.data_type().to_string(),
1885                }),
1886            }
1887        }
1888        "JUSTIFY_HOURS" => {
1889            check_args(name, &evaluated, 1)?;
1890            match &evaluated[0] {
1891                Value::Null => Ok(Value::Null),
1892                Value::Interval {
1893                    months,
1894                    days,
1895                    micros,
1896                } => {
1897                    let (m, d, u) = crate::datetime::justify_hours(*months, *days, *micros);
1898                    Ok(Value::Interval {
1899                        months: m,
1900                        days: d,
1901                        micros: u,
1902                    })
1903                }
1904                other => Err(SqlError::TypeMismatch {
1905                    expected: "INTERVAL".into(),
1906                    got: other.data_type().to_string(),
1907                }),
1908            }
1909        }
1910        "JUSTIFY_INTERVAL" => {
1911            check_args(name, &evaluated, 1)?;
1912            match &evaluated[0] {
1913                Value::Null => Ok(Value::Null),
1914                Value::Interval {
1915                    months,
1916                    days,
1917                    micros,
1918                } => {
1919                    let (m, d, u) = crate::datetime::justify_interval(*months, *days, *micros);
1920                    Ok(Value::Interval {
1921                        months: m,
1922                        days: d,
1923                        micros: u,
1924                    })
1925                }
1926                other => Err(SqlError::TypeMismatch {
1927                    expected: "INTERVAL".into(),
1928                    got: other.data_type().to_string(),
1929                }),
1930            }
1931        }
1932        "ISFINITE" => {
1933            check_args(name, &evaluated, 1)?;
1934            if evaluated[0].is_null() {
1935                return Ok(Value::Null);
1936            }
1937            Ok(Value::Boolean(evaluated[0].is_finite_temporal()))
1938        }
1939        "DATE" => {
1940            if evaluated.is_empty() {
1941                return Err(SqlError::InvalidValue(
1942                    "DATE requires at least 1 argument".into(),
1943                ));
1944            }
1945            if evaluated[0].is_null() {
1946                return Ok(Value::Null);
1947            }
1948            let d = match &evaluated[0] {
1949                Value::Date(d) => *d,
1950                Value::Timestamp(t) => crate::datetime::ts_to_date_floor(*t),
1951                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::today_days(),
1952                Value::Text(s) => crate::datetime::parse_date(s)?,
1953                Value::Integer(n) => {
1954                    crate::datetime::ts_to_date_floor(*n * crate::datetime::MICROS_PER_SEC)
1955                }
1956                other => {
1957                    return Err(SqlError::TypeMismatch {
1958                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
1959                        got: other.data_type().to_string(),
1960                    })
1961                }
1962            };
1963            Ok(Value::Date(d))
1964        }
1965        "TIME" => {
1966            if evaluated.is_empty() {
1967                return Err(SqlError::InvalidValue(
1968                    "TIME requires at least 1 argument".into(),
1969                ));
1970            }
1971            if evaluated[0].is_null() {
1972                return Ok(Value::Null);
1973            }
1974            let t = match &evaluated[0] {
1975                Value::Time(t) => *t,
1976                Value::Timestamp(t) => crate::datetime::ts_split(*t).1,
1977                Value::Text(s) if s.eq_ignore_ascii_case("now") => {
1978                    crate::datetime::current_time_micros()
1979                }
1980                Value::Text(s) => crate::datetime::parse_time(s)?,
1981                other => {
1982                    return Err(SqlError::TypeMismatch {
1983                        expected: "TIMESTAMP, TIME, or TEXT".into(),
1984                        got: other.data_type().to_string(),
1985                    })
1986                }
1987            };
1988            Ok(Value::Time(t))
1989        }
1990        "DATETIME" => {
1991            if evaluated.is_empty() {
1992                return Err(SqlError::InvalidValue(
1993                    "DATETIME requires at least 1 argument".into(),
1994                ));
1995            }
1996            if evaluated[0].is_null() {
1997                return Ok(Value::Null);
1998            }
1999            let t = match &evaluated[0] {
2000                Value::Timestamp(t) => *t,
2001                Value::Date(d) => crate::datetime::date_to_ts(*d),
2002                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::now_micros(),
2003                Value::Text(s) => crate::datetime::parse_timestamp(s)?,
2004                Value::Integer(n) => n * crate::datetime::MICROS_PER_SEC,
2005                other => {
2006                    return Err(SqlError::TypeMismatch {
2007                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
2008                        got: other.data_type().to_string(),
2009                    })
2010                }
2011            };
2012            Ok(Value::Timestamp(t))
2013        }
2014        "STRFTIME" => {
2015            if evaluated.len() < 2 {
2016                return Err(SqlError::InvalidValue(
2017                    "STRFTIME requires format + value".into(),
2018                ));
2019            }
2020            if evaluated.iter().take(2).any(|v| v.is_null()) {
2021                return Ok(Value::Null);
2022            }
2023            let fmt = match &evaluated[0] {
2024                Value::Text(s) => s.to_string(),
2025                _ => {
2026                    return Err(SqlError::TypeMismatch {
2027                        expected: "TEXT format".into(),
2028                        got: evaluated[0].data_type().to_string(),
2029                    })
2030                }
2031            };
2032            let out = crate::datetime::strftime(&fmt, &evaluated[1])?;
2033            Ok(Value::Text(out.into()))
2034        }
2035        "JULIANDAY" => {
2036            if evaluated.is_empty() {
2037                return Err(SqlError::InvalidValue(
2038                    "JULIANDAY requires at least 1 argument".into(),
2039                ));
2040            }
2041            if evaluated[0].is_null() {
2042                return Ok(Value::Null);
2043            }
2044            let micros = ts_of(&evaluated[0])?;
2045            let (days, tmicros) = crate::datetime::ts_split(micros);
2046            // Julian Day 2440587.5 = 1970-01-01 00:00:00 UTC (Julian days start at noon).
2047            let julian =
2048                days as f64 + 2_440_587.5 + tmicros as f64 / crate::datetime::MICROS_PER_DAY as f64;
2049            Ok(Value::Real(julian))
2050        }
2051        "UNIXEPOCH" => {
2052            if evaluated.is_empty() {
2053                return Err(SqlError::InvalidValue(
2054                    "UNIXEPOCH requires at least 1 argument".into(),
2055                ));
2056            }
2057            if evaluated[0].is_null() {
2058                return Ok(Value::Null);
2059            }
2060            let micros = ts_of(&evaluated[0])?;
2061            let subsec = evaluated
2062                .get(1)
2063                .and_then(|v| {
2064                    if let Value::Text(s) = v {
2065                        Some(s.to_string())
2066                    } else {
2067                        None
2068                    }
2069                })
2070                .map(|s| s.eq_ignore_ascii_case("subsec") || s.eq_ignore_ascii_case("subsecond"))
2071                .unwrap_or(false);
2072            if subsec {
2073                Ok(Value::Real(
2074                    micros as f64 / crate::datetime::MICROS_PER_SEC as f64,
2075                ))
2076            } else {
2077                Ok(Value::Integer(micros / crate::datetime::MICROS_PER_SEC))
2078            }
2079        }
2080        "TIMEDIFF" => {
2081            check_args(name, &evaluated, 2)?;
2082            if evaluated.iter().any(|v| v.is_null()) {
2083                return Ok(Value::Null);
2084            }
2085            let a = ts_of(&evaluated[0])?;
2086            let b = ts_of(&evaluated[1])?;
2087            let (days, micros) = crate::datetime::subtract_timestamps(a, b);
2088            let sign = if days < 0 || (days == 0 && micros < 0) {
2089                "-"
2090            } else {
2091                "+"
2092            };
2093            let abs_days = days.unsigned_abs() as i64;
2094            let abs_us = micros.unsigned_abs() as i64;
2095            // PG-compat format string: "(+|-)YYYY-MM-DD HH:MM:SS.SSS", days-only.
2096            let (h, m, s, us) = crate::datetime::micros_to_hmsn(abs_us);
2097            Ok(Value::Text(
2098                format!("{sign}{abs_days:04}-00-00 {h:02}:{m:02}:{s:02}.{us:06}").into(),
2099            ))
2100        }
2101        "AT_TIMEZONE" => {
2102            check_args(name, &evaluated, 2)?;
2103            if evaluated.iter().any(|v| v.is_null()) {
2104                return Ok(Value::Null);
2105            }
2106            let ts = match &evaluated[0] {
2107                Value::Timestamp(t) => *t,
2108                Value::Date(d) => crate::datetime::date_to_ts(*d),
2109                other => {
2110                    return Err(SqlError::TypeMismatch {
2111                        expected: "TIMESTAMP or DATE".into(),
2112                        got: other.data_type().to_string(),
2113                    })
2114                }
2115            };
2116            let zone = match &evaluated[1] {
2117                Value::Text(s) => s.to_string(),
2118                _ => {
2119                    return Err(SqlError::TypeMismatch {
2120                        expected: "TEXT time zone".into(),
2121                        got: evaluated[1].data_type().to_string(),
2122                    })
2123                }
2124            };
2125            // Reject POSIX-style 'UTC+5' (ambiguous sign convention).
2126            let upper = zone.to_ascii_uppercase();
2127            if (upper.starts_with("UTC+") || upper.starts_with("UTC-")) && zone.len() > 3 {
2128                return Err(SqlError::InvalidTimezone(format!(
2129                    "'{zone}' is ambiguous — use ISO-8601 offset like '+05:00' or named zone like 'Etc/GMT-5'"
2130                )));
2131            }
2132            let formatted = crate::datetime::format_timestamp_in_zone(ts, &zone)?;
2133            Ok(Value::Text(formatted.into()))
2134        }
2135        "JSONB_TYPEOF" | "JSON_TYPEOF" => {
2136            check_args(name, &evaluated, 1)?;
2137            if evaluated[0].is_null() {
2138                return Ok(Value::Null);
2139            }
2140            crate::json::fn_typeof(&evaluated[0])
2141        }
2142        "JSONB_ARRAY_LENGTH" | "JSON_ARRAY_LENGTH" => {
2143            check_args(name, &evaluated, 1)?;
2144            if evaluated[0].is_null() {
2145                return Ok(Value::Null);
2146            }
2147            crate::json::fn_array_length(&evaluated[0])
2148        }
2149        "JSONB_OBJECT_LENGTH" | "JSON_OBJECT_LENGTH" => {
2150            check_args(name, &evaluated, 1)?;
2151            if evaluated[0].is_null() {
2152                return Ok(Value::Null);
2153            }
2154            crate::json::fn_object_length(&evaluated[0])
2155        }
2156        "JSONB_EXTRACT_PATH" | "JSON_EXTRACT_PATH" => {
2157            if evaluated.is_empty() {
2158                return Err(SqlError::InvalidValue(format!(
2159                    "{name} requires at least 1 argument"
2160                )));
2161            }
2162            if evaluated[0].is_null() {
2163                return Ok(Value::Null);
2164            }
2165            let target = if name.eq_ignore_ascii_case("JSONB_EXTRACT_PATH") {
2166                crate::types::DataType::Jsonb
2167            } else {
2168                crate::types::DataType::Json
2169            };
2170            crate::json::fn_extract_path(&evaluated, target, false)
2171        }
2172        "JSONB_EXTRACT_PATH_TEXT" | "JSON_EXTRACT_PATH_TEXT" => {
2173            if evaluated.is_empty() {
2174                return Err(SqlError::InvalidValue(format!(
2175                    "{name} requires at least 1 argument"
2176                )));
2177            }
2178            if evaluated[0].is_null() {
2179                return Ok(Value::Null);
2180            }
2181            crate::json::fn_extract_path(&evaluated, crate::types::DataType::Text, true)
2182        }
2183        "JSON_EXTRACT" => {
2184            check_args(name, &evaluated, 2)?;
2185            if evaluated[0].is_null() || evaluated[1].is_null() {
2186                return Ok(Value::Null);
2187            }
2188            crate::json::fn_sqlite_extract(&evaluated[0], &evaluated[1])
2189        }
2190        "JSON_VALID" => {
2191            check_args(name, &evaluated, 1)?;
2192            if evaluated[0].is_null() {
2193                return Ok(Value::Null);
2194            }
2195            crate::json::fn_valid(&evaluated[0])
2196        }
2197        "JSONB_STRIP_NULLS" | "JSON_STRIP_NULLS" => {
2198            check_args(name, &evaluated, 1)?;
2199            if evaluated[0].is_null() {
2200                return Ok(Value::Null);
2201            }
2202            let target = if name.eq_ignore_ascii_case("JSONB_STRIP_NULLS") {
2203                crate::types::DataType::Jsonb
2204            } else {
2205                crate::types::DataType::Json
2206            };
2207            crate::json::fn_strip_nulls(&evaluated[0], target)
2208        }
2209        "JSONB_PRETTY" | "JSON_PRETTY" => {
2210            check_args(name, &evaluated, 1)?;
2211            if evaluated[0].is_null() {
2212                return Ok(Value::Null);
2213            }
2214            crate::json::fn_pretty(&evaluated[0])
2215        }
2216        "JSONB_BUILD_OBJECT" | "JSON_BUILD_OBJECT" => {
2217            let target = if name.eq_ignore_ascii_case("JSONB_BUILD_OBJECT") {
2218                crate::types::DataType::Jsonb
2219            } else {
2220                crate::types::DataType::Json
2221            };
2222            crate::json::fn_build_object(&evaluated, target)
2223        }
2224        "JSONB_BUILD_ARRAY" | "JSON_BUILD_ARRAY" => {
2225            let target = if name.eq_ignore_ascii_case("JSONB_BUILD_ARRAY") {
2226                crate::types::DataType::Jsonb
2227            } else {
2228                crate::types::DataType::Json
2229            };
2230            crate::json::fn_build_array(&evaluated, target)
2231        }
2232        "JSONB_SET" | "JSON_SET" => {
2233            if !(3..=4).contains(&evaluated.len()) {
2234                return Err(SqlError::InvalidValue(format!(
2235                    "{name} requires 3 or 4 arguments"
2236                )));
2237            }
2238            if evaluated[0].is_null() {
2239                return Ok(Value::Null);
2240            }
2241            let target = if name.eq_ignore_ascii_case("JSONB_SET") {
2242                crate::types::DataType::Jsonb
2243            } else {
2244                crate::types::DataType::Json
2245            };
2246            let create_missing = evaluated
2247                .get(3)
2248                .map(|v| matches!(v, Value::Boolean(true)))
2249                .unwrap_or(true);
2250            crate::json::fn_set(
2251                &evaluated[0],
2252                &evaluated[1],
2253                &evaluated[2],
2254                create_missing,
2255                target,
2256            )
2257        }
2258        "JSONB_INSERT" | "JSON_INSERT" => {
2259            if !(3..=4).contains(&evaluated.len()) {
2260                return Err(SqlError::InvalidValue(format!(
2261                    "{name} requires 3 or 4 arguments"
2262                )));
2263            }
2264            if evaluated[0].is_null() {
2265                return Ok(Value::Null);
2266            }
2267            let target = if name.eq_ignore_ascii_case("JSONB_INSERT") {
2268                crate::types::DataType::Jsonb
2269            } else {
2270                crate::types::DataType::Json
2271            };
2272            let insert_after = evaluated
2273                .get(3)
2274                .map(|v| matches!(v, Value::Boolean(true)))
2275                .unwrap_or(false);
2276            crate::json::fn_insert(
2277                &evaluated[0],
2278                &evaluated[1],
2279                &evaluated[2],
2280                insert_after,
2281                target,
2282            )
2283        }
2284        "TO_JSONB" | "TO_JSON" => {
2285            check_args(name, &evaluated, 1)?;
2286            let target = if name.eq_ignore_ascii_case("TO_JSONB") {
2287                crate::types::DataType::Jsonb
2288            } else {
2289                crate::types::DataType::Json
2290            };
2291            crate::json::fn_to_json(&evaluated[0], target)
2292        }
2293        "ROW_TO_JSON" | "ROW_TO_JSONB" => {
2294            check_args(name, &evaluated, 1)?;
2295            let target = if name.eq_ignore_ascii_case("ROW_TO_JSONB") {
2296                crate::types::DataType::Jsonb
2297            } else {
2298                crate::types::DataType::Json
2299            };
2300            crate::json::fn_to_json(&evaluated[0], target)
2301        }
2302        "JSON_OBJECT" => crate::json::fn_json_object(&evaluated),
2303        "JSON_EXISTS" => {
2304            check_args(name, &evaluated, 2)?;
2305            if evaluated[0].is_null() || evaluated[1].is_null() {
2306                return Ok(Value::Null);
2307            }
2308            crate::json::fn_json_exists(&evaluated[0], &evaluated[1])
2309        }
2310        "JSON_VALUE" => {
2311            check_args(name, &evaluated, 2)?;
2312            if evaluated[0].is_null() || evaluated[1].is_null() {
2313                return Ok(Value::Null);
2314            }
2315            crate::json::fn_json_value(&evaluated[0], &evaluated[1])
2316        }
2317        "JSON_QUERY" => {
2318            check_args(name, &evaluated, 2)?;
2319            if evaluated[0].is_null() || evaluated[1].is_null() {
2320                return Ok(Value::Null);
2321            }
2322            crate::json::fn_json_query(&evaluated[0], &evaluated[1], crate::types::DataType::Jsonb)
2323        }
2324        "JSONB_HAS_KEY" | "JSON_HAS_KEY" => {
2325            check_args(name, &evaluated, 2)?;
2326            if evaluated[0].is_null() || evaluated[1].is_null() {
2327                return Ok(Value::Null);
2328            }
2329            crate::json::op_has_key(&evaluated[0], &evaluated[1])
2330        }
2331        "JSONB_HAS_ANY_KEY" | "JSON_HAS_ANY_KEY" => {
2332            check_args(name, &evaluated, 2)?;
2333            if evaluated[0].is_null() || evaluated[1].is_null() {
2334                return Ok(Value::Null);
2335            }
2336            crate::json::op_has_any_key(&evaluated[0], &evaluated[1])
2337        }
2338        "JSONB_HAS_ALL_KEYS" | "JSON_HAS_ALL_KEYS" => {
2339            check_args(name, &evaluated, 2)?;
2340            if evaluated[0].is_null() || evaluated[1].is_null() {
2341                return Ok(Value::Null);
2342            }
2343            crate::json::op_has_all_keys(&evaluated[0], &evaluated[1])
2344        }
2345        _ => Err(SqlError::Unsupported(format!("scalar function: {name}"))),
2346    }
2347}
2348
2349/// Extract a timestamp (µs UTC) from a Value, coercing DATE → midnight.
2350fn ts_of(v: &Value) -> Result<i64> {
2351    match v {
2352        Value::Timestamp(t) => Ok(*t),
2353        Value::Date(d) => Ok(crate::datetime::date_to_ts(*d)),
2354        _ => Err(SqlError::TypeMismatch {
2355            expected: "TIMESTAMP or DATE".into(),
2356            got: v.data_type().to_string(),
2357        }),
2358    }
2359}
2360
2361fn int_arg(v: &Value, label: &str) -> Result<i64> {
2362    match v {
2363        Value::Integer(n) => Ok(*n),
2364        _ => Err(SqlError::TypeMismatch {
2365            expected: format!("INTEGER ({label})"),
2366            got: v.data_type().to_string(),
2367        }),
2368    }
2369}
2370
2371/// Extract (whole_seconds: u8, frac_micros: u32) from a numeric argument for MAKE_TIME-style calls.
2372fn real_sec_arg(v: &Value) -> Result<(u8, u32)> {
2373    match v {
2374        Value::Integer(n) => {
2375            if !(0..=60).contains(n) {
2376                return Err(SqlError::InvalidValue(format!("second out of range: {n}")));
2377            }
2378            Ok((*n as u8, 0))
2379        }
2380        Value::Real(r) => {
2381            let whole = r.trunc() as i64;
2382            if !(0..=60).contains(&whole) {
2383                return Err(SqlError::InvalidValue(format!("second out of range: {r}")));
2384            }
2385            let frac = ((r - whole as f64) * 1_000_000.0).round() as i64;
2386            Ok((whole as u8, frac.max(0) as u32))
2387        }
2388        _ => Err(SqlError::TypeMismatch {
2389            expected: "numeric seconds".into(),
2390            got: v.data_type().to_string(),
2391        }),
2392    }
2393}
2394
2395/// DATE_TRUNC with a non-UTC IANA zone: convert → truncate in that zone → convert back to UTC.
2396fn date_trunc_in_zone(unit: &str, ts_utc: i64, tz: &str) -> Result<Value> {
2397    use jiff::{tz::TimeZone, Timestamp as JTimestamp};
2398    let zone = TimeZone::get(tz).map_err(|e| SqlError::InvalidTimezone(format!("{tz}: {e}")))?;
2399    let ts = JTimestamp::from_microsecond(ts_utc)
2400        .map_err(|e| SqlError::InvalidValue(format!("ts: {e}")))?;
2401    let zoned = ts.to_zoned(zone.clone());
2402    let unit_lower = unit.to_ascii_lowercase();
2403    let rounded = match unit_lower.as_str() {
2404        "microseconds" => return Ok(Value::Timestamp(ts_utc)),
2405        "second" => zoned
2406            .start_of_day()
2407            .map_err(|e| SqlError::InvalidValue(format!("{e}")))?,
2408        _ => {
2409            let naive_ts = zoned.timestamp().as_microsecond();
2410            return crate::datetime::date_trunc(unit, &Value::Timestamp(naive_ts));
2411        }
2412    };
2413    Ok(Value::Timestamp(rounded.timestamp().as_microsecond()))
2414}
2415
2416fn check_args(name: &str, args: &[Value], expected: usize) -> Result<()> {
2417    if args.len() != expected {
2418        Err(SqlError::InvalidValue(format!(
2419            "{name} requires {expected} argument(s), got {}",
2420            args.len()
2421        )))
2422    } else {
2423        Ok(())
2424    }
2425}
2426
2427pub fn referenced_columns(expr: &Expr, columns: &[ColumnDef]) -> Vec<usize> {
2428    let mut indices = Vec::new();
2429    collect_column_refs(expr, columns, &mut indices);
2430    indices.sort_unstable();
2431    indices.dedup();
2432    indices
2433}
2434
2435fn collect_column_refs(expr: &Expr, columns: &[ColumnDef], out: &mut Vec<usize>) {
2436    match expr {
2437        Expr::Column(name) => {
2438            for (i, c) in columns.iter().enumerate() {
2439                if c.name == *name
2440                    || (c.name.len() > name.len()
2441                        && c.name.as_bytes()[c.name.len() - name.len() - 1] == b'.'
2442                        && c.name.ends_with(name.as_str()))
2443                {
2444                    out.push(i);
2445                    break;
2446                }
2447            }
2448        }
2449        Expr::QualifiedColumn { table, column } => {
2450            let mut found: Option<usize> = None;
2451            let mut bare_match: Option<usize> = None;
2452            let mut bare_count = 0usize;
2453            for (i, c) in columns.iter().enumerate() {
2454                if c.name.len() == table.len() + 1 + column.len()
2455                    && c.name.as_bytes()[table.len()] == b'.'
2456                    && c.name.starts_with(table.as_str())
2457                    && c.name.ends_with(column.as_str())
2458                {
2459                    found = Some(i);
2460                    break;
2461                }
2462                if c.name == *column {
2463                    bare_match = Some(i);
2464                    bare_count += 1;
2465                }
2466            }
2467            if let Some(idx) = found {
2468                out.push(idx);
2469            } else if bare_count == 1 {
2470                out.push(bare_match.unwrap());
2471            }
2472        }
2473        Expr::BinaryOp { left, right, .. } => {
2474            collect_column_refs(left, columns, out);
2475            collect_column_refs(right, columns, out);
2476        }
2477        Expr::UnaryOp { expr, .. } => {
2478            collect_column_refs(expr, columns, out);
2479        }
2480        Expr::IsNull(e) | Expr::IsNotNull(e) => {
2481            collect_column_refs(e, columns, out);
2482        }
2483        Expr::Function { args, .. } => {
2484            for arg in args {
2485                collect_column_refs(arg, columns, out);
2486            }
2487        }
2488        Expr::InSubquery { expr, .. } => {
2489            collect_column_refs(expr, columns, out);
2490        }
2491        Expr::InList { expr, list, .. } => {
2492            collect_column_refs(expr, columns, out);
2493            for item in list {
2494                collect_column_refs(item, columns, out);
2495            }
2496        }
2497        Expr::InSet { expr, .. } => {
2498            collect_column_refs(expr, columns, out);
2499        }
2500        Expr::Between {
2501            expr, low, high, ..
2502        } => {
2503            collect_column_refs(expr, columns, out);
2504            collect_column_refs(low, columns, out);
2505            collect_column_refs(high, columns, out);
2506        }
2507        Expr::Like {
2508            expr,
2509            pattern,
2510            escape,
2511            ..
2512        } => {
2513            collect_column_refs(expr, columns, out);
2514            collect_column_refs(pattern, columns, out);
2515            if let Some(esc) = escape {
2516                collect_column_refs(esc, columns, out);
2517            }
2518        }
2519        Expr::Case {
2520            operand,
2521            conditions,
2522            else_result,
2523        } => {
2524            if let Some(op) = operand {
2525                collect_column_refs(op, columns, out);
2526            }
2527            for (when, then) in conditions {
2528                collect_column_refs(when, columns, out);
2529                collect_column_refs(then, columns, out);
2530            }
2531            if let Some(e) = else_result {
2532                collect_column_refs(e, columns, out);
2533            }
2534        }
2535        Expr::Coalesce(args) => {
2536            for arg in args {
2537                collect_column_refs(arg, columns, out);
2538            }
2539        }
2540        Expr::Cast { expr, .. } => {
2541            collect_column_refs(expr, columns, out);
2542        }
2543        Expr::Collate { expr, .. } => {
2544            collect_column_refs(expr, columns, out);
2545        }
2546        Expr::WindowFunction { args, spec, .. } => {
2547            for arg in args {
2548                collect_column_refs(arg, columns, out);
2549            }
2550            for pb in &spec.partition_by {
2551                collect_column_refs(pb, columns, out);
2552            }
2553            for ob in &spec.order_by {
2554                collect_column_refs(&ob.expr, columns, out);
2555            }
2556        }
2557        Expr::Literal(_)
2558        | Expr::Parameter(_)
2559        | Expr::CountStar
2560        | Expr::Exists { .. }
2561        | Expr::ScalarSubquery(_) => {}
2562    }
2563}
2564
2565/// Check if an expression result is truthy (for WHERE/HAVING).
2566pub fn is_truthy(val: &Value) -> bool {
2567    match val {
2568        Value::Boolean(b) => *b,
2569        Value::Integer(i) => *i != 0,
2570        Value::Null => false,
2571        _ => true,
2572    }
2573}
2574
2575#[cfg(test)]
2576#[path = "eval_tests.rs"]
2577mod tests;