Skip to main content

citadel_sql/
eval.rs

1//! Expression evaluator with SQL three-valued logic.
2
3use rustc_hash::FxHashMap;
4
5use crate::error::{Result, SqlError};
6use crate::parser::{BinOp, Expr, UnaryOp};
7use crate::types::{ColumnDef, CompactString, DataType, Value};
8
9#[derive(Debug)]
10pub struct ColumnMap {
11    exact: FxHashMap<String, usize>,
12    short: FxHashMap<String, ShortMatch>,
13    collations: Vec<crate::types::Collation>,
14    has_non_binary_collation: bool,
15}
16
17#[derive(Clone, Debug)]
18enum ShortMatch {
19    Unique(usize),
20    Ambiguous,
21}
22
23impl Clone for ColumnMap {
24    fn clone(&self) -> Self {
25        Self {
26            exact: self.exact.clone(),
27            short: self.short.clone(),
28            collations: self.collations.clone(),
29            has_non_binary_collation: self.has_non_binary_collation,
30        }
31    }
32}
33
34impl ColumnMap {
35    pub fn new(columns: &[ColumnDef]) -> Self {
36        let mut exact = FxHashMap::with_capacity_and_hasher(columns.len() * 2, Default::default());
37        let mut short: FxHashMap<String, ShortMatch> =
38            FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
39        let mut collations = Vec::with_capacity(columns.len());
40        let mut has_non_binary_collation = false;
41
42        for (i, col) in columns.iter().enumerate() {
43            let lower = col.name.to_ascii_lowercase();
44            exact.insert(lower.clone(), i);
45
46            let unqualified = if let Some(dot) = lower.rfind('.') {
47                &lower[dot + 1..]
48            } else {
49                &lower
50            };
51            short
52                .entry(unqualified.to_string())
53                .and_modify(|e| *e = ShortMatch::Ambiguous)
54                .or_insert(ShortMatch::Unique(i));
55            collations.push(col.collation);
56            if col.collation != crate::types::Collation::Binary {
57                has_non_binary_collation = true;
58            }
59        }
60
61        Self {
62            exact,
63            short,
64            collations,
65            has_non_binary_collation,
66        }
67    }
68
69    pub(crate) fn collation_at(&self, idx: usize) -> crate::types::Collation {
70        self.collations
71            .get(idx)
72            .copied()
73            .unwrap_or(crate::types::Collation::Binary)
74    }
75
76    #[inline]
77    pub(crate) fn has_non_binary_collation(&self) -> bool {
78        self.has_non_binary_collation
79    }
80
81    pub(crate) fn resolve(&self, name: &str) -> Result<usize> {
82        if let Some(&idx) = self.exact.get(name) {
83            return Ok(idx);
84        }
85        match self.short.get(name) {
86            Some(ShortMatch::Unique(idx)) => Ok(*idx),
87            Some(ShortMatch::Ambiguous) => Err(SqlError::AmbiguousColumn(name.to_string())),
88            None => Err(SqlError::ColumnNotFound(name.to_string())),
89        }
90    }
91
92    pub(crate) fn resolve_qualified(&self, table: &str, column: &str) -> Result<usize> {
93        let qualified = format!("{table}.{column}");
94        if let Some(&idx) = self.exact.get(&qualified) {
95            return Ok(idx);
96        }
97        match self.short.get(column) {
98            Some(ShortMatch::Unique(idx)) => Ok(*idx),
99            _ => Err(SqlError::ColumnNotFound(format!("{table}.{column}"))),
100        }
101    }
102}
103
104pub struct EvalCtx<'a> {
105    pub col_map: &'a ColumnMap,
106    pub row: &'a [Value],
107    pub params: &'a [Value],
108    pub excluded: Option<ExcludedRow<'a>>,
109    pub old_new: Option<OldNewRows<'a>>,
110    pub session_tz: Option<jiff::tz::TimeZone>,
111}
112
113pub struct ExcludedRow<'a> {
114    pub col_map: &'a ColumnMap,
115    pub row: &'a [Value],
116}
117
118pub struct OldNewRows<'a> {
119    pub col_map: &'a ColumnMap,
120    pub old_row: Option<&'a [Value]>,
121    pub new_row: Option<&'a [Value]>,
122}
123
124impl<'a> EvalCtx<'a> {
125    pub fn new(col_map: &'a ColumnMap, row: &'a [Value]) -> Self {
126        Self {
127            col_map,
128            row,
129            params: &[],
130            excluded: None,
131            old_new: None,
132            session_tz: None,
133        }
134    }
135
136    pub fn with_session_tz(mut self, tz: Option<jiff::tz::TimeZone>) -> Self {
137        self.session_tz = tz;
138        self
139    }
140
141    pub fn with_params(col_map: &'a ColumnMap, row: &'a [Value], params: &'a [Value]) -> Self {
142        Self {
143            col_map,
144            row,
145            params,
146            excluded: None,
147            old_new: None,
148            session_tz: None,
149        }
150    }
151
152    pub fn with_excluded(
153        col_map: &'a ColumnMap,
154        row: &'a [Value],
155        excluded_col_map: &'a ColumnMap,
156        excluded_row: &'a [Value],
157    ) -> Self {
158        Self {
159            col_map,
160            row,
161            params: &[],
162            excluded: Some(ExcludedRow {
163                col_map: excluded_col_map,
164                row: excluded_row,
165            }),
166            old_new: None,
167            session_tz: None,
168        }
169    }
170
171    pub fn with_old_new(
172        col_map: &'a ColumnMap,
173        row: &'a [Value],
174        old_row: Option<&'a [Value]>,
175        new_row: Option<&'a [Value]>,
176    ) -> Self {
177        Self {
178            col_map,
179            row,
180            params: &[],
181            excluded: None,
182            old_new: Some(OldNewRows {
183                col_map,
184                old_row,
185                new_row,
186            }),
187            session_tz: None,
188        }
189    }
190}
191
192thread_local! {
193    static SCOPED_PARAMS: std::cell::Cell<(*const Value, usize)> =
194        const { std::cell::Cell::new((std::ptr::null(), 0)) };
195}
196
197pub fn with_scoped_params<R>(params: &[Value], f: impl FnOnce() -> R) -> R {
198    struct Guard((*const Value, usize));
199    impl Drop for Guard {
200        fn drop(&mut self) {
201            SCOPED_PARAMS.with(|slot| slot.set(self.0));
202        }
203    }
204    SCOPED_PARAMS.with(|slot| {
205        let prev = slot.get();
206        slot.set((params.as_ptr(), params.len()));
207        let _guard = Guard(prev);
208        f()
209    })
210}
211
212fn resolve_parameter(n: usize, ctx_params: &[Value]) -> Result<Value> {
213    if !ctx_params.is_empty() {
214        if n == 0 || n > ctx_params.len() {
215            return Err(SqlError::ParameterCountMismatch {
216                expected: n,
217                got: ctx_params.len(),
218            });
219        }
220        return Ok(ctx_params[n - 1].clone());
221    }
222    resolve_scoped_param(n)
223}
224
225pub fn resolve_scoped_param(n: usize) -> Result<Value> {
226    SCOPED_PARAMS.with(|slot| {
227        let (ptr, len) = slot.get();
228        if n == 0 || n > len {
229            return Err(SqlError::ParameterCountMismatch {
230                expected: n,
231                got: len,
232            });
233        }
234        // SAFETY: `with_scoped_params` keeps the slice alive for the body's run.
235        unsafe { Ok((*ptr.add(n - 1)).clone()) }
236    })
237}
238
239pub(crate) enum CompiledExpr<'a> {
240    Const(Value),
241    Slot(usize),
242    Param(usize),
243    BinaryOp {
244        left: Box<CompiledExpr<'a>>,
245        op: BinOp,
246        right: Box<CompiledExpr<'a>>,
247        collation: Option<crate::types::Collation>,
248    },
249    UnaryOp {
250        op: UnaryOp,
251        operand: Box<CompiledExpr<'a>>,
252    },
253    IsNull(Box<CompiledExpr<'a>>),
254    IsNotNull(Box<CompiledExpr<'a>>),
255    Dynamic(&'a Expr),
256}
257
258impl<'a> CompiledExpr<'a> {
259    pub(crate) fn compile(expr: &'a Expr, col_map: &ColumnMap) -> Self {
260        match expr {
261            Expr::Literal(v) => CompiledExpr::Const(v.clone()),
262            Expr::Parameter(n) => CompiledExpr::Param(*n),
263            Expr::Column(name) => match col_map.resolve(name) {
264                Ok(idx) => CompiledExpr::Slot(idx),
265                Err(_) => CompiledExpr::Dynamic(expr),
266            },
267            Expr::IsNull(e) => CompiledExpr::IsNull(Box::new(Self::compile(e, col_map))),
268            Expr::IsNotNull(e) => CompiledExpr::IsNotNull(Box::new(Self::compile(e, col_map))),
269            Expr::UnaryOp { op, expr: e } => CompiledExpr::UnaryOp {
270                op: *op,
271                operand: Box::new(Self::compile(e, col_map)),
272            },
273            Expr::BinaryOp { left, op, right } => CompiledExpr::BinaryOp {
274                collation: compile_collation(left, right, col_map),
275                left: Box::new(Self::compile(left, col_map)),
276                op: *op,
277                right: Box::new(Self::compile(right, col_map)),
278            },
279            _ => CompiledExpr::Dynamic(expr),
280        }
281    }
282
283    pub(crate) fn eval(&self, ctx: &EvalCtx) -> Result<Value> {
284        match self {
285            CompiledExpr::Const(v) => Ok(v.clone()),
286            CompiledExpr::Slot(i) => Ok(ctx.row[*i].clone()),
287            CompiledExpr::Param(n) => resolve_parameter(*n, ctx.params),
288            CompiledExpr::IsNull(e) => Ok(Value::Boolean(e.eval(ctx)?.is_null())),
289            CompiledExpr::IsNotNull(e) => Ok(Value::Boolean(!e.eval(ctx)?.is_null())),
290            CompiledExpr::UnaryOp { op, operand } => {
291                let val = operand.eval(ctx)?;
292                eval_unary_op(*op, &val)
293            }
294            CompiledExpr::BinaryOp {
295                left,
296                op,
297                right,
298                collation,
299            } => {
300                let lval = left.eval(ctx)?;
301                let rval = right.eval(ctx)?;
302                if let Some(coll) = collation {
303                    if let Some(b) = eval_text_compare(&lval, *op, &rval, *coll) {
304                        return Ok(Value::Boolean(b));
305                    }
306                }
307                eval_binary_op(&lval, *op, &rval)
308            }
309            CompiledExpr::Dynamic(e) => eval_expr(e, ctx),
310        }
311    }
312}
313
314fn compile_collation(
315    left: &Expr,
316    right: &Expr,
317    col_map: &ColumnMap,
318) -> Option<crate::types::Collation> {
319    let needs_check = col_map.has_non_binary_collation()
320        || matches!(left, Expr::Collate { .. })
321        || matches!(right, Expr::Collate { .. });
322    if !needs_check {
323        return None;
324    }
325    let coll = collation_of(left)
326        .or_else(|| collation_of(right))
327        .or_else(|| column_collation(left, col_map))
328        .or_else(|| column_collation(right, col_map))?;
329    (coll != crate::types::Collation::Binary).then_some(coll)
330}
331
332pub fn eval_expr(expr: &Expr, ctx: &EvalCtx) -> Result<Value> {
333    match expr {
334        Expr::Literal(v) => Ok(v.clone()),
335
336        Expr::Column(name) => {
337            let idx = ctx.col_map.resolve(name)?;
338            Ok(ctx.row[idx].clone())
339        }
340
341        Expr::QualifiedColumn { table, column } => {
342            if let Some(excluded) = ctx.excluded.as_ref() {
343                if table.eq_ignore_ascii_case("excluded") {
344                    let lowered = column.to_ascii_lowercase();
345                    let idx = excluded.col_map.resolve(&lowered)?;
346                    return Ok(excluded.row[idx].clone());
347                }
348            }
349            if let Some(on) = ctx.old_new.as_ref() {
350                if table.eq_ignore_ascii_case("old") {
351                    let lowered = column.to_ascii_lowercase();
352                    let idx = on.col_map.resolve(&lowered)?;
353                    return Ok(on.old_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
354                }
355                if table.eq_ignore_ascii_case("new") {
356                    let lowered = column.to_ascii_lowercase();
357                    let idx = on.col_map.resolve(&lowered)?;
358                    return Ok(on.new_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
359                }
360            }
361            // Trigger-body fallback: nested executor calls don't carry `ctx.old_new`.
362            if table.eq_ignore_ascii_case("old") || table.eq_ignore_ascii_case("new") {
363                if let Some(b) = crate::executor::triggers::current_bindings() {
364                    let lowered = column.to_ascii_lowercase();
365                    if table.eq_ignore_ascii_case("old") {
366                        let cm = ColumnMap::new(&b.old_columns);
367                        let idx = cm.resolve(&lowered)?;
368                        return Ok(b.old_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
369                    } else {
370                        let cm = ColumnMap::new(&b.new_columns);
371                        let idx = cm.resolve(&lowered)?;
372                        return Ok(b.new_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
373                    }
374                }
375            }
376            let idx = ctx.col_map.resolve_qualified(table, column)?;
377            Ok(ctx.row[idx].clone())
378        }
379
380        Expr::BinaryOp { left, op, right } => {
381            let lval = eval_expr(left, ctx)?;
382            let rval = eval_expr(right, ctx)?;
383            let needs_collation_check = ctx.col_map.has_non_binary_collation()
384                || matches!(left.as_ref(), Expr::Collate { .. })
385                || matches!(right.as_ref(), Expr::Collate { .. });
386            if needs_collation_check {
387                let coll = collation_of(left)
388                    .or_else(|| collation_of(right))
389                    .or_else(|| {
390                        column_collation(left, ctx.col_map)
391                            .or_else(|| column_collation(right, ctx.col_map))
392                    });
393                if let Some(c) = coll {
394                    if c != crate::types::Collation::Binary {
395                        if let Some(b) = eval_text_compare(&lval, *op, &rval, c) {
396                            return Ok(Value::Boolean(b));
397                        }
398                    }
399                }
400            }
401            eval_binary_op(&lval, *op, &rval)
402        }
403
404        Expr::UnaryOp { op, expr } => {
405            let val = eval_expr(expr, ctx)?;
406            eval_unary_op(*op, &val)
407        }
408
409        Expr::IsNull(e) => {
410            let val = eval_expr(e, ctx)?;
411            Ok(Value::Boolean(val.is_null()))
412        }
413
414        Expr::IsNotNull(e) => {
415            let val = eval_expr(e, ctx)?;
416            Ok(Value::Boolean(!val.is_null()))
417        }
418
419        Expr::Function { name, args, .. } => eval_scalar_function(name, args, ctx),
420
421        Expr::CountStar => Err(SqlError::Unsupported(
422            "COUNT(*) in non-aggregate context".into(),
423        )),
424
425        Expr::InList {
426            expr: e,
427            list,
428            negated,
429        } => {
430            let lhs = eval_expr(e, ctx)?;
431            eval_in_values(&lhs, list, ctx, *negated)
432        }
433
434        Expr::InSet {
435            expr: e,
436            values,
437            has_null,
438            negated,
439        } => {
440            let lhs = eval_expr(e, ctx)?;
441            eval_in_set(&lhs, values, *has_null, *negated)
442        }
443
444        Expr::Between {
445            expr: e,
446            low,
447            high,
448            negated,
449        } => {
450            let val = eval_expr(e, ctx)?;
451            let lo = eval_expr(low, ctx)?;
452            let hi = eval_expr(high, ctx)?;
453            eval_between(&val, &lo, &hi, *negated)
454        }
455
456        Expr::Like {
457            expr: e,
458            pattern,
459            escape,
460            negated,
461        } => {
462            let val = eval_expr(e, ctx)?;
463            let pat = eval_expr(pattern, ctx)?;
464            let esc = escape.as_ref().map(|e| eval_expr(e, ctx)).transpose()?;
465            eval_like(&val, &pat, esc.as_ref(), *negated)
466        }
467
468        Expr::Case {
469            operand,
470            conditions,
471            else_result,
472        } => eval_case(operand.as_deref(), conditions, else_result.as_deref(), ctx),
473
474        Expr::Coalesce(args) => {
475            for arg in args {
476                let val = eval_expr(arg, ctx)?;
477                if !val.is_null() {
478                    return Ok(val);
479                }
480            }
481            Ok(Value::Null)
482        }
483
484        Expr::Cast { expr: e, data_type } => {
485            let val = eval_expr(e, ctx)?;
486            eval_cast(&val, *data_type)
487        }
488
489        Expr::Collate { expr: e, .. } => eval_expr(e, ctx),
490
491        Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => Err(
492            SqlError::Unsupported("subquery not materialized (internal error)".into()),
493        ),
494
495        Expr::Parameter(n) => resolve_parameter(*n, ctx.params),
496
497        Expr::WindowFunction { .. } => Err(SqlError::Unsupported(
498            "window functions are only allowed in SELECT columns".into(),
499        )),
500
501        Expr::TypedNullRecord(_) => Ok(Value::Null),
502
503        Expr::ArrayLiteral(elems) => {
504            let mut out = Vec::with_capacity(elems.len());
505            for e in elems {
506                out.push(eval_expr(e, ctx)?);
507            }
508            Ok(Value::Array(std::sync::Arc::new(out)))
509        }
510
511        Expr::Quantified {
512            left,
513            op,
514            quantifier,
515            right,
516        } => eval_quantified(left, *op, *quantifier, right, ctx),
517    }
518}
519
520fn eval_quantified(
521    left: &Expr,
522    op: crate::parser::BinOp,
523    quantifier: crate::parser::Quantifier,
524    right: &crate::parser::QuantifiedRhs,
525    ctx: &EvalCtx,
526) -> Result<Value> {
527    use crate::parser::{QuantifiedRhs, Quantifier};
528    let lhs = eval_expr(left, ctx)?;
529    let elems: Vec<Value> = match right {
530        QuantifiedRhs::Array(e) => match eval_expr(e, ctx)? {
531            Value::Array(a) => (*a).clone(),
532            Value::Null => return Ok(Value::Null),
533            other => {
534                return Err(SqlError::TypeMismatch {
535                    expected: "ARRAY".into(),
536                    got: other.data_type().to_string(),
537                });
538            }
539        },
540        QuantifiedRhs::Subquery(_) => {
541            return Err(SqlError::Unsupported(
542                "ANY/ALL subquery not materialized (internal error)".into(),
543            ));
544        }
545    };
546
547    if lhs.is_null() {
548        return if elems.is_empty() {
549            match quantifier {
550                Quantifier::Any => Ok(Value::Boolean(false)),
551                Quantifier::All => Ok(Value::Boolean(true)),
552            }
553        } else {
554            Ok(Value::Null)
555        };
556    }
557
558    let mut any_unknown = false;
559    let mut any_match = false;
560    let mut any_mismatch = false;
561    for elem in &elems {
562        if elem.is_null() {
563            any_unknown = true;
564            continue;
565        }
566        let result = eval_binary_compare(&lhs, op, elem)?;
567        match result {
568            Value::Boolean(true) => any_match = true,
569            Value::Boolean(false) => any_mismatch = true,
570            Value::Null => any_unknown = true,
571            _ => {
572                return Err(SqlError::TypeMismatch {
573                    expected: "BOOLEAN".into(),
574                    got: result.data_type().to_string(),
575                });
576            }
577        }
578    }
579
580    match quantifier {
581        Quantifier::Any => {
582            if any_match {
583                Ok(Value::Boolean(true))
584            } else if any_unknown {
585                Ok(Value::Null)
586            } else {
587                Ok(Value::Boolean(false))
588            }
589        }
590        Quantifier::All => {
591            if any_mismatch {
592                Ok(Value::Boolean(false))
593            } else if any_unknown {
594                Ok(Value::Null)
595            } else {
596                Ok(Value::Boolean(true))
597            }
598        }
599    }
600}
601
602fn eval_binary_compare(left: &Value, op: crate::parser::BinOp, right: &Value) -> Result<Value> {
603    use crate::parser::BinOp;
604    if left.is_null() || right.is_null() {
605        return Ok(Value::Null);
606    }
607    let cmp = match (left, right) {
608        (Value::Text(a), Value::Text(b)) => Some(a.cmp(b)),
609        _ => left.partial_cmp(right),
610    };
611    let Some(cmp) = cmp else {
612        return Ok(Value::Null);
613    };
614    use std::cmp::Ordering;
615    let result = match op {
616        BinOp::Eq => cmp == Ordering::Equal,
617        BinOp::NotEq => cmp != Ordering::Equal,
618        BinOp::Lt => cmp == Ordering::Less,
619        BinOp::Gt => cmp == Ordering::Greater,
620        BinOp::LtEq => cmp != Ordering::Greater,
621        BinOp::GtEq => cmp != Ordering::Less,
622        _ => {
623            return Err(SqlError::Unsupported(format!(
624                "ANY/ALL comparison op {op:?}"
625            )));
626        }
627    };
628    Ok(Value::Boolean(result))
629}
630
631fn collation_of(expr: &Expr) -> Option<crate::types::Collation> {
632    match expr {
633        Expr::Collate { collation, .. } => Some(*collation),
634        _ => None,
635    }
636}
637
638fn column_collation(expr: &Expr, col_map: &ColumnMap) -> Option<crate::types::Collation> {
639    match expr {
640        Expr::Column(name) => col_map.resolve(name).ok().map(|i| col_map.collation_at(i)),
641        Expr::QualifiedColumn { table, column } => col_map
642            .resolve_qualified(table, column)
643            .ok()
644            .map(|i| col_map.collation_at(i)),
645        _ => None,
646    }
647}
648
649fn eval_text_compare(
650    left: &Value,
651    op: BinOp,
652    right: &Value,
653    coll: crate::types::Collation,
654) -> Option<bool> {
655    let (a, b) = match (left, right) {
656        (Value::Null, _) | (_, Value::Null) => return None,
657        (Value::Text(a), Value::Text(b)) => (a.as_str(), b.as_str()),
658        _ => return None,
659    };
660    let ord = coll.cmp_text(a, b);
661    Some(match op {
662        BinOp::Eq => ord == std::cmp::Ordering::Equal,
663        BinOp::NotEq => ord != std::cmp::Ordering::Equal,
664        BinOp::Lt => ord == std::cmp::Ordering::Less,
665        BinOp::Gt => ord == std::cmp::Ordering::Greater,
666        BinOp::LtEq => ord != std::cmp::Ordering::Greater,
667        BinOp::GtEq => ord != std::cmp::Ordering::Less,
668        _ => return None,
669    })
670}
671
672pub fn eval_binary_op_public(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
673    eval_binary_op(left, op, right)
674}
675
676fn eval_binary_op(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
677    match op {
678        BinOp::And => return eval_and(left, right),
679        BinOp::Or => return eval_or(left, right),
680        _ => {}
681    }
682
683    if left.is_null() || right.is_null() {
684        return Ok(Value::Null);
685    }
686
687    if let Some(res) = eval_temporal_op(left, op, right) {
688        return res;
689    }
690
691    match op {
692        BinOp::Eq => Ok(Value::Boolean(left == right)),
693        BinOp::NotEq => Ok(Value::Boolean(left != right)),
694        BinOp::Lt => Ok(Value::Boolean(left < right)),
695        BinOp::Gt => Ok(Value::Boolean(left > right)),
696        BinOp::LtEq => Ok(Value::Boolean(left <= right)),
697        BinOp::GtEq => Ok(Value::Boolean(left >= right)),
698        BinOp::Add => eval_arithmetic(left, right, i64::checked_add, |a, b| a + b),
699        BinOp::Sub => match left {
700            Value::Json(_) | Value::Jsonb(_) => crate::json::op_delete_one(left, right),
701            _ => eval_arithmetic(left, right, i64::checked_sub, |a, b| a - b),
702        },
703        BinOp::Mul => eval_arithmetic(left, right, i64::checked_mul, |a, b| a * b),
704        BinOp::Div => {
705            match right {
706                Value::Integer(0) => return Err(SqlError::DivisionByZero),
707                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
708                _ => {}
709            }
710            eval_arithmetic(left, right, i64::checked_div, |a, b| a / b)
711        }
712        BinOp::Mod => {
713            match right {
714                Value::Integer(0) => return Err(SqlError::DivisionByZero),
715                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
716                _ => {}
717            }
718            eval_arithmetic(left, right, i64::checked_rem, |a, b| a % b)
719        }
720        BinOp::Concat => match (left, right) {
721            (Value::TsVector(a), Value::TsVector(b)) => crate::fts::op_concat(a, b),
722            (Value::Json(_) | Value::Jsonb(_), _) | (_, Value::Json(_) | Value::Jsonb(_)) => {
723                crate::json::op_concat(left, right)
724            }
725            _ => {
726                let ls = value_to_text(left);
727                let rs = value_to_text(right);
728                Ok(Value::Text(format!("{ls}{rs}").into()))
729            }
730        },
731        BinOp::JsonGet
732        | BinOp::JsonGetText
733        | BinOp::JsonPath
734        | BinOp::JsonPathText
735        | BinOp::JsonContains
736        | BinOp::JsonContainedBy
737        | BinOp::JsonHasKey
738        | BinOp::JsonHasAnyKey
739        | BinOp::JsonHasAllKeys
740        | BinOp::JsonDeletePath
741        | BinOp::JsonPathExists
742        | BinOp::JsonPathMatch
743        | BinOp::JsonPathExistsTz
744        | BinOp::JsonPathMatchTz => eval_json_binary_op(left, op, right),
745        BinOp::VectorL2 => eval_vector_distance(left, right, VectorMetric::L2),
746        BinOp::VectorInner => eval_vector_distance(left, right, VectorMetric::Inner),
747        BinOp::VectorCosine => eval_vector_distance(left, right, VectorMetric::Cosine),
748        BinOp::And | BinOp::Or => unreachable!(),
749    }
750}
751
752#[cold]
753fn eval_json_binary_op(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
754    match op {
755        BinOp::JsonGet => crate::json::op_get(left, right),
756        BinOp::JsonGetText => crate::json::op_get_text(left, right),
757        BinOp::JsonPath => crate::json::op_path(left, right),
758        BinOp::JsonPathText => crate::json::op_path_text(left, right),
759        BinOp::JsonContains => crate::json::op_contains(left, right),
760        BinOp::JsonContainedBy => crate::json::op_contained_by(left, right),
761        BinOp::JsonHasKey => crate::json::op_has_key(left, right),
762        BinOp::JsonHasAnyKey => crate::json::op_has_any_key(left, right),
763        BinOp::JsonHasAllKeys => crate::json::op_has_all_keys(left, right),
764        BinOp::JsonDeletePath => crate::json::op_delete_path(left, right),
765        BinOp::JsonPathExists => crate::json::op_path_exists(left, right),
766        BinOp::JsonPathMatch => eval_at_at(left, right),
767        BinOp::JsonPathExistsTz => {
768            crate::json::fn_jsonb_path_exists_tz(&[left.clone(), right.clone()])
769        }
770        BinOp::JsonPathMatchTz => {
771            crate::json::fn_jsonb_path_match_tz(&[left.clone(), right.clone()])
772        }
773        BinOp::VectorL2 => eval_vector_distance(left, right, VectorMetric::L2),
774        BinOp::VectorInner => eval_vector_distance(left, right, VectorMetric::Inner),
775        BinOp::VectorCosine => eval_vector_distance(left, right, VectorMetric::Cosine),
776        _ => unreachable!(),
777    }
778}
779
780#[derive(Copy, Clone)]
781enum VectorMetric {
782    L2,
783    Inner,
784    Cosine,
785}
786
787fn eval_vector_distance(left: &Value, right: &Value, metric: VectorMetric) -> Result<Value> {
788    if left.is_null() || right.is_null() {
789        return Ok(Value::Null);
790    }
791    let (a, b) = match (left, right) {
792        (Value::Vector(a), Value::Vector(b)) => (a.as_ref(), b.as_ref()),
793        _ => {
794            return Err(SqlError::TypeMismatch {
795                expected: "VECTOR".into(),
796                got: format!("{} vs {}", left.data_type(), right.data_type()),
797            });
798        }
799    };
800    if a.len() != b.len() {
801        return Err(SqlError::InvalidValue(format!(
802            "vector dimension mismatch: {} vs {}",
803            a.len(),
804            b.len()
805        )));
806    }
807    let d = match metric {
808        VectorMetric::L2 => {
809            let mut sum = 0.0f64;
810            for (x, y) in a.iter().zip(b.iter()) {
811                let diff = (*x as f64) - (*y as f64);
812                sum += diff * diff;
813            }
814            sum.sqrt()
815        }
816        VectorMetric::Inner => {
817            let mut sum = 0.0f64;
818            for (x, y) in a.iter().zip(b.iter()) {
819                sum += (*x as f64) * (*y as f64);
820            }
821            -sum
822        }
823        VectorMetric::Cosine => {
824            let mut dot = 0.0f64;
825            let mut na = 0.0f64;
826            let mut nb = 0.0f64;
827            for (x, y) in a.iter().zip(b.iter()) {
828                let xf = *x as f64;
829                let yf = *y as f64;
830                dot += xf * yf;
831                na += xf * xf;
832                nb += yf * yf;
833            }
834            let denom = na.sqrt() * nb.sqrt();
835            if denom == 0.0 {
836                return Ok(Value::Null);
837            }
838            1.0 - dot / denom
839        }
840    };
841    Ok(Value::Real(d))
842}
843
844fn eval_at_at(left: &Value, right: &Value) -> Result<Value> {
845    use crate::types::DataType as D;
846    if left.is_null() || right.is_null() {
847        return Ok(Value::Null);
848    }
849    match (left.data_type(), right.data_type()) {
850        (D::Json | D::Jsonb, D::Text) => crate::json::op_path_match(left, right),
851        (D::TsVector, D::TsQuery) => match (left, right) {
852            (Value::TsVector(v), Value::TsQuery(q)) => crate::fts::op_match(v, q),
853            _ => unreachable!(),
854        },
855        (D::TsQuery, D::TsVector) => match (left, right) {
856            (Value::TsQuery(q), Value::TsVector(v)) => crate::fts::op_match(v, q),
857            _ => unreachable!(),
858        },
859        (D::Text, D::TsQuery) => {
860            let s = match left {
861                Value::Text(s) => s.as_str(),
862                _ => unreachable!(),
863            };
864            let lhs = crate::fts::fn_to_tsvector(s)?;
865            eval_at_at(&lhs, right)
866        }
867        (D::TsVector, D::Text) => {
868            let s = match right {
869                Value::Text(s) => s.as_str(),
870                _ => unreachable!(),
871            };
872            let rhs = crate::fts::fn_plainto_tsquery(s)?;
873            eval_at_at(left, &rhs)
874        }
875        (D::Text, D::Text) => {
876            let ls = match left {
877                Value::Text(s) => s.as_str(),
878                _ => unreachable!(),
879            };
880            let rs = match right {
881                Value::Text(s) => s.as_str(),
882                _ => unreachable!(),
883            };
884            let lhs = crate::fts::fn_to_tsvector(ls)?;
885            let rhs = crate::fts::fn_plainto_tsquery(rs)?;
886            eval_at_at(&lhs, &rhs)
887        }
888        (lt, rt) => Err(SqlError::TypeMismatch {
889            expected: "JSONB @@ text, tsvector @@ tsquery".into(),
890            got: format!("{lt} @@ {rt}"),
891        }),
892    }
893}
894
895/// Returns `Some` when `(left, op, right)` is a temporal operation; `None` to fall through.
896fn eval_temporal_op(left: &Value, op: BinOp, right: &Value) -> Option<Result<Value>> {
897    use crate::datetime as dt;
898    use std::cmp::Ordering;
899
900    let is_temporal = |v: &Value| {
901        matches!(
902            v,
903            Value::Date(_) | Value::Time(_) | Value::Timestamp(_) | Value::Interval { .. }
904        )
905    };
906    if !is_temporal(left) && !is_temporal(right) {
907        return None;
908    }
909    if matches!(op, BinOp::Add | BinOp::Sub)
910        && ((is_temporal(left) && matches!(right, Value::Real(_)))
911            || (matches!(left, Value::Real(_)) && is_temporal(right)))
912    {
913        return Some(Err(SqlError::TypeMismatch {
914            expected: "INTEGER or INTERVAL for date/time arithmetic (use CAST for REAL)".into(),
915            got: format!("{} and {}", left.data_type(), right.data_type()),
916        }));
917    }
918
919    match (left, op, right) {
920        (Value::Date(d), BinOp::Add, Value::Integer(n))
921        | (Value::Integer(n), BinOp::Add, Value::Date(d)) => {
922            Some(dt::add_days_to_date(*d, *n).map(Value::Date))
923        }
924        (Value::Date(d), BinOp::Sub, Value::Integer(n)) => {
925            Some(dt::add_days_to_date(*d, -*n).map(Value::Date))
926        }
927        (Value::Date(a), BinOp::Sub, Value::Date(b)) => {
928            Some(Ok(Value::Integer(*a as i64 - *b as i64)))
929        }
930        // DATE ± INTERVAL → TIMESTAMP (PG rule).
931        (
932            Value::Date(d),
933            BinOp::Add,
934            Value::Interval {
935                months,
936                days,
937                micros,
938            },
939        )
940        | (
941            Value::Interval {
942                months,
943                days,
944                micros,
945            },
946            BinOp::Add,
947            Value::Date(d),
948        ) => Some(dt::add_interval_to_date(*d, *months, *days, *micros).map(Value::Timestamp)),
949        (
950            Value::Date(d),
951            BinOp::Sub,
952            Value::Interval {
953                months,
954                days,
955                micros,
956            },
957        ) => Some(dt::add_interval_to_date(*d, -*months, -*days, -*micros).map(Value::Timestamp)),
958        (
959            Value::Timestamp(t),
960            BinOp::Add,
961            Value::Interval {
962                months,
963                days,
964                micros,
965            },
966        )
967        | (
968            Value::Interval {
969                months,
970                days,
971                micros,
972            },
973            BinOp::Add,
974            Value::Timestamp(t),
975        ) => Some(dt::add_interval_to_timestamp(*t, *months, *days, *micros).map(Value::Timestamp)),
976        (
977            Value::Timestamp(t),
978            BinOp::Sub,
979            Value::Interval {
980                months,
981                days,
982                micros,
983            },
984        ) => Some(
985            dt::add_interval_to_timestamp(*t, -*months, -*days, -*micros).map(Value::Timestamp),
986        ),
987        (Value::Timestamp(a), BinOp::Sub, Value::Timestamp(b)) => {
988            let (days, micros) = dt::subtract_timestamps(*a, *b);
989            Some(Ok(Value::Interval {
990                months: 0,
991                days,
992                micros,
993            }))
994        }
995        (
996            Value::Time(t),
997            BinOp::Add,
998            Value::Interval {
999                months,
1000                days,
1001                micros,
1002            },
1003        ) => Some(dt::add_interval_to_time(*t, *months, *days, *micros).map(Value::Time)),
1004        (
1005            Value::Time(t),
1006            BinOp::Sub,
1007            Value::Interval {
1008                months,
1009                days,
1010                micros,
1011            },
1012        ) => Some(dt::add_interval_to_time(*t, -*months, -*days, -*micros).map(Value::Time)),
1013        (Value::Time(a), BinOp::Sub, Value::Time(b)) => Some(Ok(Value::Interval {
1014            months: 0,
1015            days: 0,
1016            micros: *a - *b,
1017        })),
1018        (
1019            Value::Interval {
1020                months: am,
1021                days: ad,
1022                micros: au,
1023            },
1024            BinOp::Add,
1025            Value::Interval {
1026                months: bm,
1027                days: bd,
1028                micros: bu,
1029            },
1030        ) => Some(Ok(Value::Interval {
1031            months: am.saturating_add(*bm),
1032            days: ad.saturating_add(*bd),
1033            micros: au.saturating_add(*bu),
1034        })),
1035        (
1036            Value::Interval {
1037                months: am,
1038                days: ad,
1039                micros: au,
1040            },
1041            BinOp::Sub,
1042            Value::Interval {
1043                months: bm,
1044                days: bd,
1045                micros: bu,
1046            },
1047        ) => Some(Ok(Value::Interval {
1048            months: am.saturating_sub(*bm),
1049            days: ad.saturating_sub(*bd),
1050            micros: au.saturating_sub(*bu),
1051        })),
1052        (
1053            Value::Interval {
1054                months,
1055                days,
1056                micros,
1057            },
1058            BinOp::Mul,
1059            Value::Integer(n),
1060        )
1061        | (
1062            Value::Integer(n),
1063            BinOp::Mul,
1064            Value::Interval {
1065                months,
1066                days,
1067                micros,
1068            },
1069        ) => {
1070            let n32 = (*n).clamp(i32::MIN as i64, i32::MAX as i64) as i32;
1071            Some(Ok(Value::Interval {
1072                months: months.saturating_mul(n32),
1073                days: days.saturating_mul(n32),
1074                micros: micros.saturating_mul(*n),
1075            }))
1076        }
1077        // INTERVAL * REAL — fractional months → days, fractional days → micros (PG).
1078        (
1079            Value::Interval {
1080                months,
1081                days,
1082                micros,
1083            },
1084            BinOp::Mul,
1085            Value::Real(r),
1086        )
1087        | (
1088            Value::Real(r),
1089            BinOp::Mul,
1090            Value::Interval {
1091                months,
1092                days,
1093                micros,
1094            },
1095        ) => Some(Ok(scale_interval_by_real(*months, *days, *micros, *r))),
1096        (
1097            Value::Interval {
1098                months,
1099                days,
1100                micros,
1101            },
1102            BinOp::Div,
1103            Value::Integer(n),
1104        ) if *n != 0 => Some(Ok(Value::Interval {
1105            months: (*months as i64 / *n) as i32,
1106            days: (*days as i64 / *n) as i32,
1107            micros: *micros / *n,
1108        })),
1109        (
1110            Value::Interval {
1111                months,
1112                days,
1113                micros,
1114            },
1115            BinOp::Div,
1116            Value::Real(r),
1117        ) if *r != 0.0 => Some(Ok(scale_interval_by_real(*months, *days, *micros, 1.0 / r))),
1118        // PG-normalized INTERVAL compare: 30-day month, 24-hour day.
1119        (
1120            Value::Interval {
1121                months: am,
1122                days: ad,
1123                micros: au,
1124            },
1125            op,
1126            Value::Interval {
1127                months: bm,
1128                days: bd,
1129                micros: bu,
1130            },
1131        ) if matches!(
1132            op,
1133            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::Gt | BinOp::LtEq | BinOp::GtEq
1134        ) =>
1135        {
1136            let ord = dt::pg_normalized_interval_cmp((*am, *ad, *au), (*bm, *bd, *bu));
1137            let b = match op {
1138                BinOp::Eq => ord == Ordering::Equal,
1139                BinOp::NotEq => ord != Ordering::Equal,
1140                BinOp::Lt => ord == Ordering::Less,
1141                BinOp::Gt => ord == Ordering::Greater,
1142                BinOp::LtEq => ord != Ordering::Greater,
1143                BinOp::GtEq => ord != Ordering::Less,
1144                _ => unreachable!(),
1145            };
1146            Some(Ok(Value::Boolean(b)))
1147        }
1148        // PG rejects TIMESTAMP ± INTEGER; require CAST to INTERVAL.
1149        (Value::Timestamp(_), BinOp::Add | BinOp::Sub, Value::Integer(_))
1150        | (Value::Integer(_), BinOp::Add, Value::Timestamp(_)) => {
1151            Some(Err(SqlError::TypeMismatch {
1152                expected: "INTERVAL (use CAST or explicit unit)".into(),
1153                got: format!("{} and {}", left.data_type(), right.data_type()),
1154            }))
1155        }
1156        // Comparison with one temporal side: coerce the literal to that type.
1157        (l, op, r)
1158            if matches!(
1159                op,
1160                BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::Gt | BinOp::LtEq | BinOp::GtEq
1161            ) =>
1162        {
1163            temporal_compare(l, op, r)
1164        }
1165        _ => None,
1166    }
1167}
1168
1169/// Compares values where one side is temporal, coercing the other to match.
1170fn temporal_compare(left: &Value, op: BinOp, right: &Value) -> Option<Result<Value>> {
1171    let (a, b) = coerce_temporal_pair(left, right)?;
1172    let ord = a.cmp(&b);
1173    use std::cmp::Ordering;
1174    let result = match op {
1175        BinOp::Eq => ord == Ordering::Equal,
1176        BinOp::NotEq => ord != Ordering::Equal,
1177        BinOp::Lt => ord == Ordering::Less,
1178        BinOp::Gt => ord == Ordering::Greater,
1179        BinOp::LtEq => ord != Ordering::Greater,
1180        BinOp::GtEq => ord != Ordering::Less,
1181        _ => return None,
1182    };
1183    Some(Ok(Value::Boolean(result)))
1184}
1185
1186/// Coerces a TEXT/INTEGER (or DATE/TIMESTAMP) operand to match the temporal side.
1187fn coerce_temporal_pair(left: &Value, right: &Value) -> Option<(Value, Value)> {
1188    use crate::types::DataType;
1189    let temporal_type = |v: &Value| match v {
1190        Value::Date(_) => Some(DataType::Date),
1191        Value::Time(_) => Some(DataType::Time),
1192        Value::Timestamp(_) => Some(DataType::Timestamp),
1193        Value::Interval { .. } => Some(DataType::Interval),
1194        _ => None,
1195    };
1196    match (temporal_type(left), temporal_type(right)) {
1197        (Some(DataType::Date), Some(DataType::Timestamp))
1198        | (Some(DataType::Timestamp), Some(DataType::Date)) => Some((
1199            left.clone().coerce_into(DataType::Timestamp)?,
1200            right.clone().coerce_into(DataType::Timestamp)?,
1201        )),
1202        (Some(_), Some(_)) => None,
1203        (Some(t), None) => {
1204            let coerced = right.clone().coerce_into(t)?;
1205            Some((left.clone(), coerced))
1206        }
1207        (None, Some(t)) => {
1208            let coerced = left.clone().coerce_into(t)?;
1209            Some((coerced, right.clone()))
1210        }
1211        (None, None) => None,
1212    }
1213}
1214
1215/// PG fractional-propagation: month frac → days (×30), day frac → micros (×86.4G).
1216fn scale_interval_by_real(months: i32, days: i32, micros: i64, factor: f64) -> Value {
1217    let raw_months = months as f64 * factor;
1218    let whole_months = raw_months.trunc() as i64;
1219    let frac_months = raw_months - whole_months as f64;
1220    let months_frac_as_days = frac_months * 30.0;
1221
1222    let raw_days = days as f64 * factor + months_frac_as_days;
1223    let whole_days = raw_days.trunc() as i64;
1224    let frac_days = raw_days - whole_days as f64;
1225    let days_frac_as_micros = (frac_days * crate::datetime::MICROS_PER_DAY as f64).round() as i64;
1226
1227    let raw_micros = (micros as f64 * factor).round() as i64;
1228    let total_micros = raw_micros.saturating_add(days_frac_as_micros);
1229
1230    let clamp_i32 = |n: i64| n.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
1231    Value::Interval {
1232        months: clamp_i32(whole_months),
1233        days: clamp_i32(whole_days),
1234        micros: total_micros,
1235    }
1236}
1237
1238/// SQL three-valued AND: NULL AND false = false, NULL AND true = NULL
1239fn eval_and(left: &Value, right: &Value) -> Result<Value> {
1240    let l = to_bool_or_null(left)?;
1241    let r = to_bool_or_null(right)?;
1242    match (l, r) {
1243        (Some(false), _) | (_, Some(false)) => Ok(Value::Boolean(false)),
1244        (Some(true), Some(true)) => Ok(Value::Boolean(true)),
1245        _ => Ok(Value::Null),
1246    }
1247}
1248
1249/// SQL three-valued OR: NULL OR true = true, NULL OR false = NULL
1250fn eval_or(left: &Value, right: &Value) -> Result<Value> {
1251    let l = to_bool_or_null(left)?;
1252    let r = to_bool_or_null(right)?;
1253    match (l, r) {
1254        (Some(true), _) | (_, Some(true)) => Ok(Value::Boolean(true)),
1255        (Some(false), Some(false)) => Ok(Value::Boolean(false)),
1256        _ => Ok(Value::Null),
1257    }
1258}
1259
1260fn to_bool_or_null(val: &Value) -> Result<Option<bool>> {
1261    match val {
1262        Value::Boolean(b) => Ok(Some(*b)),
1263        Value::Null => Ok(None),
1264        Value::Integer(i) => Ok(Some(*i != 0)),
1265        _ => Err(SqlError::TypeMismatch {
1266            expected: "BOOLEAN".into(),
1267            got: format!("{}", val.data_type()),
1268        }),
1269    }
1270}
1271
1272fn eval_arithmetic(
1273    left: &Value,
1274    right: &Value,
1275    int_op: fn(i64, i64) -> Option<i64>,
1276    real_op: fn(f64, f64) -> f64,
1277) -> Result<Value> {
1278    match (left, right) {
1279        (Value::Integer(a), Value::Integer(b)) => int_op(*a, *b)
1280            .map(Value::Integer)
1281            .ok_or(SqlError::IntegerOverflow),
1282        (Value::Real(a), Value::Real(b)) => Ok(Value::Real(real_op(*a, *b))),
1283        (Value::Integer(a), Value::Real(b)) => Ok(Value::Real(real_op(*a as f64, *b))),
1284        (Value::Real(a), Value::Integer(b)) => Ok(Value::Real(real_op(*a, *b as f64))),
1285        _ => Err(SqlError::TypeMismatch {
1286            expected: "numeric".into(),
1287            got: format!("{} and {}", left.data_type(), right.data_type()),
1288        }),
1289    }
1290}
1291
1292fn eval_in_values(lhs: &Value, list: &[Expr], ctx: &EvalCtx, negated: bool) -> Result<Value> {
1293    if list.is_empty() {
1294        return Ok(Value::Boolean(negated));
1295    }
1296    if lhs.is_null() {
1297        return Ok(Value::Null);
1298    }
1299    let mut has_null = false;
1300    for item in list {
1301        let rhs = eval_expr(item, ctx)?;
1302        if rhs.is_null() {
1303            has_null = true;
1304        } else if lhs == &rhs {
1305            return Ok(Value::Boolean(!negated));
1306        }
1307    }
1308    if has_null {
1309        Ok(Value::Null)
1310    } else {
1311        Ok(Value::Boolean(negated))
1312    }
1313}
1314
1315fn eval_in_set(
1316    lhs: &Value,
1317    values: &rustc_hash::FxHashSet<Value>,
1318    has_null: bool,
1319    negated: bool,
1320) -> Result<Value> {
1321    if values.is_empty() && !has_null {
1322        return Ok(Value::Boolean(negated));
1323    }
1324    if lhs.is_null() {
1325        return Ok(Value::Null);
1326    }
1327    if values.contains(lhs) {
1328        return Ok(Value::Boolean(!negated));
1329    }
1330    if has_null {
1331        Ok(Value::Null)
1332    } else {
1333        Ok(Value::Boolean(negated))
1334    }
1335}
1336
1337fn eval_unary_op(op: UnaryOp, val: &Value) -> Result<Value> {
1338    if val.is_null() {
1339        return Ok(Value::Null);
1340    }
1341    match op {
1342        UnaryOp::Neg => match val {
1343            Value::Integer(i) => i
1344                .checked_neg()
1345                .map(Value::Integer)
1346                .ok_or(SqlError::IntegerOverflow),
1347            Value::Real(r) => Ok(Value::Real(-r)),
1348            Value::Interval {
1349                months,
1350                days,
1351                micros,
1352            } => {
1353                let m = months.checked_neg().ok_or(SqlError::IntegerOverflow)?;
1354                let d = days.checked_neg().ok_or(SqlError::IntegerOverflow)?;
1355                let u = micros.checked_neg().ok_or(SqlError::IntegerOverflow)?;
1356                Ok(Value::Interval {
1357                    months: m,
1358                    days: d,
1359                    micros: u,
1360                })
1361            }
1362            _ => Err(SqlError::TypeMismatch {
1363                expected: "numeric or INTERVAL".into(),
1364                got: format!("{}", val.data_type()),
1365            }),
1366        },
1367        UnaryOp::Not => match val {
1368            Value::Boolean(b) => Ok(Value::Boolean(!b)),
1369            Value::Integer(i) => Ok(Value::Boolean(*i == 0)),
1370            _ => Err(SqlError::TypeMismatch {
1371                expected: "BOOLEAN".into(),
1372                got: format!("{}", val.data_type()),
1373            }),
1374        },
1375    }
1376}
1377
1378fn value_to_text(val: &Value) -> String {
1379    match val {
1380        Value::Text(s) => s.to_string(),
1381        Value::Integer(i) => i.to_string(),
1382        Value::Real(r) => {
1383            if r.fract() == 0.0 && r.is_finite() {
1384                format!("{r:.1}")
1385            } else {
1386                format!("{r}")
1387            }
1388        }
1389        Value::Boolean(b) => if *b { "TRUE" } else { "FALSE" }.into(),
1390        Value::Null => String::new(),
1391        Value::Blob(b) => {
1392            let mut s = String::with_capacity(b.len() * 2);
1393            for byte in b {
1394                s.push_str(&format!("{byte:02X}"));
1395            }
1396            s
1397        }
1398        Value::Date(d) => crate::datetime::format_date(*d),
1399        Value::Time(t) => crate::datetime::format_time(*t),
1400        Value::Timestamp(t) => crate::datetime::format_timestamp(*t),
1401        Value::Interval {
1402            months,
1403            days,
1404            micros,
1405        } => crate::datetime::format_interval(*months, *days, *micros),
1406        Value::Json(s) => s.to_string(),
1407        Value::Jsonb(b) => crate::json::decode_to_text(b).unwrap_or_default(),
1408        Value::TsVector(b) => crate::fts::tsvector_display(b),
1409        Value::TsQuery(b) => crate::fts::tsquery_display(b),
1410        Value::Array(_) => val.to_string(),
1411        Value::Vector(_) => val.to_string(),
1412    }
1413}
1414
1415fn eval_between(val: &Value, low: &Value, high: &Value, negated: bool) -> Result<Value> {
1416    // BETWEEN routed through eval_binary_op so comparison logic lives in one place.
1417    let ge = eval_binary_op(val, BinOp::GtEq, low)?;
1418    let le = eval_binary_op(val, BinOp::LtEq, high)?;
1419
1420    let result = match (as_bool(&ge), as_bool(&le)) {
1421        (Some(false), _) | (_, Some(false)) => Some(false),
1422        (Some(true), Some(true)) => Some(true),
1423        _ => None,
1424    };
1425
1426    match result {
1427        Some(b) => Ok(Value::Boolean(if negated { !b } else { b })),
1428        None => Ok(Value::Null),
1429    }
1430}
1431
1432fn as_bool(v: &Value) -> Option<bool> {
1433    match v {
1434        Value::Boolean(b) => Some(*b),
1435        _ => None,
1436    }
1437}
1438
1439const MAX_LIKE_PATTERN_LEN: usize = 10_000;
1440
1441fn eval_like(val: &Value, pattern: &Value, escape: Option<&Value>, negated: bool) -> Result<Value> {
1442    if val.is_null() || pattern.is_null() {
1443        return Ok(Value::Null);
1444    }
1445    let text = match val {
1446        Value::Text(s) => s.as_str(),
1447        _ => {
1448            return Err(SqlError::TypeMismatch {
1449                expected: "TEXT".into(),
1450                got: val.data_type().to_string(),
1451            })
1452        }
1453    };
1454    let pat = match pattern {
1455        Value::Text(s) => s.as_str(),
1456        _ => {
1457            return Err(SqlError::TypeMismatch {
1458                expected: "TEXT".into(),
1459                got: pattern.data_type().to_string(),
1460            })
1461        }
1462    };
1463
1464    if pat.len() > MAX_LIKE_PATTERN_LEN {
1465        return Err(SqlError::InvalidValue(format!(
1466            "LIKE pattern too long ({} chars, max {MAX_LIKE_PATTERN_LEN})",
1467            pat.len()
1468        )));
1469    }
1470
1471    let esc_char = match escape {
1472        Some(Value::Text(s)) => {
1473            let mut chars = s.chars();
1474            let c = chars.next().ok_or_else(|| {
1475                SqlError::InvalidValue("ESCAPE must be a single character".into())
1476            })?;
1477            if chars.next().is_some() {
1478                return Err(SqlError::InvalidValue(
1479                    "ESCAPE must be a single character".into(),
1480                ));
1481            }
1482            Some(c)
1483        }
1484        Some(Value::Null) => return Ok(Value::Null),
1485        Some(_) => {
1486            return Err(SqlError::TypeMismatch {
1487                expected: "TEXT".into(),
1488                got: "non-text".into(),
1489            })
1490        }
1491        None => None,
1492    };
1493
1494    let matched = like_match(text, pat, esc_char);
1495    Ok(Value::Boolean(if negated { !matched } else { matched }))
1496}
1497
1498fn like_match(text: &str, pattern: &str, escape: Option<char>) -> bool {
1499    let t: Vec<char> = text.chars().collect();
1500    let p: Vec<char> = pattern.chars().collect();
1501    like_match_impl(&t, &p, 0, 0, escape)
1502}
1503
1504fn like_match_impl(
1505    t: &[char],
1506    p: &[char],
1507    mut ti: usize,
1508    mut pi: usize,
1509    esc: Option<char>,
1510) -> bool {
1511    let mut star_pi: Option<usize> = None;
1512    let mut star_ti: usize = 0;
1513
1514    while ti < t.len() {
1515        if pi < p.len() {
1516            if let Some(ec) = esc {
1517                if p[pi] == ec && pi + 1 < p.len() {
1518                    pi += 1;
1519                    let pc_lower = p[pi].to_ascii_lowercase();
1520                    let tc_lower = t[ti].to_ascii_lowercase();
1521                    if pc_lower == tc_lower {
1522                        pi += 1;
1523                        ti += 1;
1524                        continue;
1525                    } else if let Some(sp) = star_pi {
1526                        pi = sp + 1;
1527                        star_ti += 1;
1528                        ti = star_ti;
1529                        continue;
1530                    } else {
1531                        return false;
1532                    }
1533                }
1534            }
1535            if p[pi] == '%' {
1536                star_pi = Some(pi);
1537                star_ti = ti;
1538                pi += 1;
1539                continue;
1540            }
1541            if p[pi] == '_' {
1542                pi += 1;
1543                ti += 1;
1544                continue;
1545            }
1546            if p[pi].eq_ignore_ascii_case(&t[ti]) {
1547                pi += 1;
1548                ti += 1;
1549                continue;
1550            }
1551        }
1552        if let Some(sp) = star_pi {
1553            pi = sp + 1;
1554            star_ti += 1;
1555            ti = star_ti;
1556        } else {
1557            return false;
1558        }
1559    }
1560
1561    while pi < p.len() && p[pi] == '%' {
1562        pi += 1;
1563    }
1564    pi == p.len()
1565}
1566
1567fn eval_case(
1568    operand: Option<&Expr>,
1569    conditions: &[(Expr, Expr)],
1570    else_result: Option<&Expr>,
1571    ctx: &EvalCtx,
1572) -> Result<Value> {
1573    if let Some(op_expr) = operand {
1574        let op_val = eval_expr(op_expr, ctx)?;
1575        for (cond, result) in conditions {
1576            let cond_val = eval_expr(cond, ctx)?;
1577            if !op_val.is_null() && !cond_val.is_null() && op_val == cond_val {
1578                return eval_expr(result, ctx);
1579            }
1580        }
1581    } else {
1582        for (cond, result) in conditions {
1583            let cond_val = eval_expr(cond, ctx)?;
1584            if is_truthy(&cond_val) {
1585                return eval_expr(result, ctx);
1586            }
1587        }
1588    }
1589    match else_result {
1590        Some(e) => eval_expr(e, ctx),
1591        None => Ok(Value::Null),
1592    }
1593}
1594
1595pub(crate) fn eval_cast(val: &Value, target: DataType) -> Result<Value> {
1596    if val.is_null() {
1597        return Ok(Value::Null);
1598    }
1599    match target {
1600        DataType::Integer => match val {
1601            Value::Integer(_) => Ok(val.clone()),
1602            Value::Real(r) => Ok(Value::Integer(*r as i64)),
1603            Value::Boolean(b) => Ok(Value::Integer(if *b { 1 } else { 0 })),
1604            Value::Text(s) => s
1605                .trim()
1606                .parse::<i64>()
1607                .map(Value::Integer)
1608                .or_else(|_| s.trim().parse::<f64>().map(|f| Value::Integer(f as i64)))
1609                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to INTEGER"))),
1610            _ => Err(SqlError::InvalidValue(format!(
1611                "cannot cast {} to INTEGER",
1612                val.data_type()
1613            ))),
1614        },
1615        DataType::Real => match val {
1616            Value::Real(_) => Ok(val.clone()),
1617            Value::Integer(i) => Ok(Value::Real(*i as f64)),
1618            Value::Boolean(b) => Ok(Value::Real(if *b { 1.0 } else { 0.0 })),
1619            Value::Text(s) => s
1620                .trim()
1621                .parse::<f64>()
1622                .map(Value::Real)
1623                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to REAL"))),
1624            _ => Err(SqlError::InvalidValue(format!(
1625                "cannot cast {} to REAL",
1626                val.data_type()
1627            ))),
1628        },
1629        DataType::Text => Ok(Value::Text(value_to_text(val).into())),
1630        DataType::Boolean => match val {
1631            Value::Boolean(_) => Ok(val.clone()),
1632            Value::Integer(i) => Ok(Value::Boolean(*i != 0)),
1633            Value::Text(s) => {
1634                let lower = s.trim().to_ascii_lowercase();
1635                match lower.as_str() {
1636                    "true" | "1" | "yes" | "on" => Ok(Value::Boolean(true)),
1637                    "false" | "0" | "no" | "off" => Ok(Value::Boolean(false)),
1638                    _ => Err(SqlError::InvalidValue(format!(
1639                        "cannot cast '{s}' to BOOLEAN"
1640                    ))),
1641                }
1642            }
1643            _ => Err(SqlError::InvalidValue(format!(
1644                "cannot cast {} to BOOLEAN",
1645                val.data_type()
1646            ))),
1647        },
1648        DataType::Blob => match val {
1649            Value::Blob(_) => Ok(val.clone()),
1650            Value::Text(s) => Ok(Value::Blob(s.as_bytes().to_vec())),
1651            _ => Err(SqlError::InvalidValue(format!(
1652                "cannot cast {} to BLOB",
1653                val.data_type()
1654            ))),
1655        },
1656        DataType::Null => Ok(Value::Null),
1657        DataType::Date => val.clone().coerce_into(DataType::Date).ok_or_else(|| {
1658            SqlError::InvalidValue(format!("cannot cast {} to DATE", val.data_type()))
1659        }),
1660        DataType::Time => val.clone().coerce_into(DataType::Time).ok_or_else(|| {
1661            SqlError::InvalidValue(format!("cannot cast {} to TIME", val.data_type()))
1662        }),
1663        DataType::Timestamp => val.clone().coerce_into(DataType::Timestamp).ok_or_else(|| {
1664            SqlError::InvalidValue(format!("cannot cast {} to TIMESTAMP", val.data_type()))
1665        }),
1666        DataType::Interval => val.clone().coerce_into(DataType::Interval).ok_or_else(|| {
1667            SqlError::InvalidValue(format!("cannot cast {} to INTERVAL", val.data_type()))
1668        }),
1669        DataType::Json => val.clone().coerce_into(DataType::Json).ok_or_else(|| {
1670            SqlError::InvalidValue(format!("cannot cast {} to JSON", val.data_type()))
1671        }),
1672        DataType::Jsonb => val.clone().coerce_into(DataType::Jsonb).ok_or_else(|| {
1673            SqlError::InvalidValue(format!("cannot cast {} to JSONB", val.data_type()))
1674        }),
1675        DataType::TsVector => val.clone().coerce_into(DataType::TsVector).ok_or_else(|| {
1676            SqlError::InvalidValue(format!("cannot cast {} to TSVECTOR", val.data_type()))
1677        }),
1678        DataType::TsQuery => val.clone().coerce_into(DataType::TsQuery).ok_or_else(|| {
1679            SqlError::InvalidValue(format!("cannot cast {} to TSQUERY", val.data_type()))
1680        }),
1681        DataType::Array => val.clone().coerce_into(DataType::Array).ok_or_else(|| {
1682            SqlError::InvalidValue(format!("cannot cast {} to ARRAY", val.data_type()))
1683        }),
1684        DataType::Vector { dim } => match val {
1685            Value::Vector(v) if v.len() as u16 == dim => Ok(val.clone()),
1686            Value::Vector(_) => Err(SqlError::InvalidValue(format!(
1687                "cannot cast {} to VECTOR({dim}) (dim mismatch)",
1688                val.data_type()
1689            ))),
1690            Value::Text(s) => parse_vector_literal(s.as_str(), dim).map(Value::Vector),
1691            _ => Err(SqlError::InvalidValue(format!(
1692                "cannot cast {} to VECTOR({dim})",
1693                val.data_type()
1694            ))),
1695        },
1696    }
1697}
1698
1699fn parse_vector_literal(s: &str, expected_dim: u16) -> Result<std::sync::Arc<[f32]>> {
1700    let trimmed = s.trim();
1701    let inner = trimmed
1702        .strip_prefix('[')
1703        .and_then(|s| s.strip_suffix(']'))
1704        .unwrap_or(trimmed);
1705    let mut out: Vec<f32> = Vec::with_capacity(expected_dim as usize);
1706    for tok in inner.split(',') {
1707        let tok = tok.trim();
1708        if tok.is_empty() {
1709            continue;
1710        }
1711        let x: f32 = tok
1712            .parse()
1713            .map_err(|_| SqlError::InvalidValue(format!("invalid vector element: '{tok}'")))?;
1714        out.push(x);
1715    }
1716    if out.len() as u16 != expected_dim {
1717        return Err(SqlError::InvalidValue(format!(
1718            "vector literal has {} elements, expected {expected_dim}",
1719            out.len()
1720        )));
1721    }
1722    Ok(std::sync::Arc::from(out.into_boxed_slice()))
1723}
1724
1725/// Scalar functions whose result can differ across statements for identical
1726/// arguments (wall clock, RNG, tzdb). Single source of truth for volatility;
1727/// any function added to `eval_scalar_function` MUST be classified here.
1728pub(crate) fn is_volatile_function(name_upper: &str, argc: usize) -> bool {
1729    match name_upper {
1730        "RANDOM"
1731        | "RAND"
1732        | "NOW"
1733        | "CURRENT_TIMESTAMP"
1734        | "LOCALTIMESTAMP"
1735        | "CURRENT_DATE"
1736        | "CURRENT_TIME"
1737        | "LOCALTIME"
1738        | "CLOCK_TIMESTAMP"
1739        | "STATEMENT_TIMESTAMP"
1740        | "TRANSACTION_TIMESTAMP"
1741        | "AT_TIMEZONE" => true,
1742        // 1-arg AGE measures from the current clock; 2-arg AGE is pure.
1743        "AGE" => argc == 1,
1744        _ => false,
1745    }
1746}
1747
1748/// Name volatility plus the DATE/TIME/DATETIME clock forms (zero-arg or a
1749/// literal 'now'). A non-literal arg can reach 'now' via TEXT data at runtime;
1750/// callers needing that guarantee must exclude those shapes themselves.
1751pub(crate) fn is_volatile_function_expr(name_upper: &str, args: &[Expr]) -> bool {
1752    if is_volatile_function(name_upper, args.len()) {
1753        return true;
1754    }
1755    matches!(name_upper, "DATE" | "TIME" | "DATETIME")
1756        && match args.first() {
1757            None => true,
1758            Some(Expr::Literal(Value::Text(s))) => s.trim().eq_ignore_ascii_case("now"),
1759            Some(_) => false,
1760        }
1761}
1762
1763fn eval_scalar_function(name: &str, args: &[Expr], ctx: &EvalCtx) -> Result<Value> {
1764    let evaluated: Vec<Value> = args
1765        .iter()
1766        .map(|a| eval_expr(a, ctx))
1767        .collect::<Result<Vec<_>>>()?;
1768
1769    match name {
1770        "LENGTH" => {
1771            check_args(name, &evaluated, 1)?;
1772            match &evaluated[0] {
1773                Value::Null => Ok(Value::Null),
1774                Value::Text(s) => Ok(Value::Integer(s.chars().count() as i64)),
1775                Value::Blob(b) => Ok(Value::Integer(b.len() as i64)),
1776                Value::TsVector(b) => crate::fts::fn_length_tsvector(b),
1777                _ => Ok(Value::Integer(
1778                    value_to_text(&evaluated[0]).chars().count() as i64
1779                )),
1780            }
1781        }
1782        "UPPER" => {
1783            check_args(name, &evaluated, 1)?;
1784            match &evaluated[0] {
1785                Value::Null => Ok(Value::Null),
1786                Value::Text(s) => Ok(Value::Text(s.to_ascii_uppercase())),
1787                _ => Ok(Value::Text(
1788                    value_to_text(&evaluated[0]).to_ascii_uppercase().into(),
1789                )),
1790            }
1791        }
1792        "LOWER" => {
1793            check_args(name, &evaluated, 1)?;
1794            match &evaluated[0] {
1795                Value::Null => Ok(Value::Null),
1796                Value::Text(s) => Ok(Value::Text(s.to_ascii_lowercase())),
1797                _ => Ok(Value::Text(
1798                    value_to_text(&evaluated[0]).to_ascii_lowercase().into(),
1799                )),
1800            }
1801        }
1802        "SUBSTR" | "SUBSTRING" => {
1803            if evaluated.len() < 2 || evaluated.len() > 3 {
1804                return Err(SqlError::InvalidValue(format!(
1805                    "{name} requires 2 or 3 arguments"
1806                )));
1807            }
1808            if evaluated.iter().any(|v| v.is_null()) {
1809                return Ok(Value::Null);
1810            }
1811            let s = value_to_text(&evaluated[0]);
1812            let chars: Vec<char> = s.chars().collect();
1813            let start = match &evaluated[1] {
1814                Value::Integer(i) => *i,
1815                _ => {
1816                    return Err(SqlError::TypeMismatch {
1817                        expected: "INTEGER".into(),
1818                        got: evaluated[1].data_type().to_string(),
1819                    })
1820                }
1821            };
1822            let len = chars.len() as i64;
1823
1824            let (begin, count) = if evaluated.len() == 3 {
1825                let cnt = match &evaluated[2] {
1826                    Value::Integer(i) => *i,
1827                    _ => {
1828                        return Err(SqlError::TypeMismatch {
1829                            expected: "INTEGER".into(),
1830                            got: evaluated[2].data_type().to_string(),
1831                        })
1832                    }
1833                };
1834                if start >= 1 {
1835                    let b = (start - 1).min(len) as usize;
1836                    let c = cnt.max(0) as usize;
1837                    (b, c)
1838                } else if start == 0 {
1839                    let c = (cnt - 1).max(0) as usize;
1840                    (0usize, c)
1841                } else {
1842                    let adjusted_cnt = (cnt + start - 1).max(0) as usize;
1843                    (0usize, adjusted_cnt)
1844                }
1845            } else if start >= 1 {
1846                let b = (start - 1).min(len) as usize;
1847                (b, chars.len() - b)
1848            } else if start == 0 {
1849                (0usize, chars.len())
1850            } else {
1851                let b = (len + start).max(0) as usize;
1852                (b, chars.len() - b)
1853            };
1854
1855            let result: String = chars.iter().skip(begin).take(count).collect();
1856            Ok(Value::Text(result.into()))
1857        }
1858        "TRIM" | "LTRIM" | "RTRIM" => {
1859            if evaluated.is_empty() || evaluated.len() > 2 {
1860                return Err(SqlError::InvalidValue(format!(
1861                    "{name} requires 1 or 2 arguments"
1862                )));
1863            }
1864            if evaluated[0].is_null() {
1865                return Ok(Value::Null);
1866            }
1867            let s = value_to_text(&evaluated[0]);
1868            let trim_chars: Vec<char> = if evaluated.len() == 2 {
1869                if evaluated[1].is_null() {
1870                    return Ok(Value::Null);
1871                }
1872                value_to_text(&evaluated[1]).chars().collect()
1873            } else {
1874                vec![' ']
1875            };
1876            let result = match name {
1877                "TRIM" => s
1878                    .trim_matches(|c: char| trim_chars.contains(&c))
1879                    .to_string(),
1880                "LTRIM" => s
1881                    .trim_start_matches(|c: char| trim_chars.contains(&c))
1882                    .to_string(),
1883                "RTRIM" => s
1884                    .trim_end_matches(|c: char| trim_chars.contains(&c))
1885                    .to_string(),
1886                _ => unreachable!(),
1887            };
1888            Ok(Value::Text(result.into()))
1889        }
1890        "REPLACE" => {
1891            check_args(name, &evaluated, 3)?;
1892            if evaluated.iter().any(|v| v.is_null()) {
1893                return Ok(Value::Null);
1894            }
1895            let s = value_to_text(&evaluated[0]);
1896            let from = value_to_text(&evaluated[1]);
1897            let to = value_to_text(&evaluated[2]);
1898            if from.is_empty() {
1899                return Ok(Value::Text(s.into()));
1900            }
1901            Ok(Value::Text(s.replace(&from, &to).into()))
1902        }
1903        "INSTR" => {
1904            check_args(name, &evaluated, 2)?;
1905            if evaluated.iter().any(|v| v.is_null()) {
1906                return Ok(Value::Null);
1907            }
1908            let haystack = value_to_text(&evaluated[0]);
1909            let needle = value_to_text(&evaluated[1]);
1910            let pos = haystack
1911                .find(&needle)
1912                .map(|i| haystack[..i].chars().count() as i64 + 1)
1913                .unwrap_or(0);
1914            Ok(Value::Integer(pos))
1915        }
1916        "CONCAT" => {
1917            if evaluated.is_empty() {
1918                return Ok(Value::Text(CompactString::default()));
1919            }
1920            let mut result = String::new();
1921            for v in &evaluated {
1922                match v {
1923                    Value::Null => {}
1924                    _ => result.push_str(&value_to_text(v)),
1925                }
1926            }
1927            Ok(Value::Text(result.into()))
1928        }
1929        "ABS" => {
1930            check_args(name, &evaluated, 1)?;
1931            match &evaluated[0] {
1932                Value::Null => Ok(Value::Null),
1933                Value::Integer(i) => i
1934                    .checked_abs()
1935                    .map(Value::Integer)
1936                    .ok_or(SqlError::IntegerOverflow),
1937                Value::Real(r) => Ok(Value::Real(r.abs())),
1938                _ => Err(SqlError::TypeMismatch {
1939                    expected: "numeric".into(),
1940                    got: evaluated[0].data_type().to_string(),
1941                }),
1942            }
1943        }
1944        "ROUND" => {
1945            if evaluated.is_empty() || evaluated.len() > 2 {
1946                return Err(SqlError::InvalidValue(
1947                    "ROUND requires 1 or 2 arguments".into(),
1948                ));
1949            }
1950            if evaluated[0].is_null() {
1951                return Ok(Value::Null);
1952            }
1953            let val = match &evaluated[0] {
1954                Value::Integer(i) => *i as f64,
1955                Value::Real(r) => *r,
1956                _ => {
1957                    return Err(SqlError::TypeMismatch {
1958                        expected: "numeric".into(),
1959                        got: evaluated[0].data_type().to_string(),
1960                    })
1961                }
1962            };
1963            let places = if evaluated.len() == 2 {
1964                match &evaluated[1] {
1965                    Value::Null => return Ok(Value::Null),
1966                    Value::Integer(i) => *i,
1967                    _ => {
1968                        return Err(SqlError::TypeMismatch {
1969                            expected: "INTEGER".into(),
1970                            got: evaluated[1].data_type().to_string(),
1971                        })
1972                    }
1973                }
1974            } else {
1975                0
1976            };
1977            let factor = 10f64.powi(places as i32);
1978            let rounded = (val * factor).round() / factor;
1979            Ok(Value::Real(rounded))
1980        }
1981        "CEIL" | "CEILING" => {
1982            check_args(name, &evaluated, 1)?;
1983            match &evaluated[0] {
1984                Value::Null => Ok(Value::Null),
1985                Value::Integer(i) => Ok(Value::Integer(*i)),
1986                Value::Real(r) => Ok(Value::Integer(r.ceil() as i64)),
1987                _ => Err(SqlError::TypeMismatch {
1988                    expected: "numeric".into(),
1989                    got: evaluated[0].data_type().to_string(),
1990                }),
1991            }
1992        }
1993        "FLOOR" => {
1994            check_args(name, &evaluated, 1)?;
1995            match &evaluated[0] {
1996                Value::Null => Ok(Value::Null),
1997                Value::Integer(i) => Ok(Value::Integer(*i)),
1998                Value::Real(r) => Ok(Value::Integer(r.floor() as i64)),
1999                _ => Err(SqlError::TypeMismatch {
2000                    expected: "numeric".into(),
2001                    got: evaluated[0].data_type().to_string(),
2002                }),
2003            }
2004        }
2005        "SIGN" => {
2006            check_args(name, &evaluated, 1)?;
2007            match &evaluated[0] {
2008                Value::Null => Ok(Value::Null),
2009                Value::Integer(i) => Ok(Value::Integer(i.signum())),
2010                Value::Real(r) => {
2011                    if *r > 0.0 {
2012                        Ok(Value::Integer(1))
2013                    } else if *r < 0.0 {
2014                        Ok(Value::Integer(-1))
2015                    } else {
2016                        Ok(Value::Integer(0))
2017                    }
2018                }
2019                _ => Err(SqlError::TypeMismatch {
2020                    expected: "numeric".into(),
2021                    got: evaluated[0].data_type().to_string(),
2022                }),
2023            }
2024        }
2025        "SQRT" => {
2026            check_args(name, &evaluated, 1)?;
2027            match &evaluated[0] {
2028                Value::Null => Ok(Value::Null),
2029                Value::Integer(i) => {
2030                    if *i < 0 {
2031                        Ok(Value::Null)
2032                    } else {
2033                        Ok(Value::Real((*i as f64).sqrt()))
2034                    }
2035                }
2036                Value::Real(r) => {
2037                    if *r < 0.0 {
2038                        Ok(Value::Null)
2039                    } else {
2040                        Ok(Value::Real(r.sqrt()))
2041                    }
2042                }
2043                _ => Err(SqlError::TypeMismatch {
2044                    expected: "numeric".into(),
2045                    got: evaluated[0].data_type().to_string(),
2046                }),
2047            }
2048        }
2049        "RANDOM" => {
2050            check_args(name, &evaluated, 0)?;
2051            use std::collections::hash_map::DefaultHasher;
2052            use std::hash::{Hash, Hasher};
2053            let mut hasher = DefaultHasher::new();
2054            crate::datetime::now_micros().hash(&mut hasher);
2055            std::thread::current().id().hash(&mut hasher);
2056            let mut val = hasher.finish() as i64;
2057            if val == i64::MIN {
2058                val = i64::MAX;
2059            }
2060            Ok(Value::Integer(val))
2061        }
2062        "TYPEOF" => {
2063            check_args(name, &evaluated, 1)?;
2064            let type_name = match &evaluated[0] {
2065                Value::Null => "null",
2066                Value::Integer(_) => "integer",
2067                Value::Real(_) => "real",
2068                Value::Text(_) => "text",
2069                Value::Blob(_) => "blob",
2070                Value::Boolean(_) => "boolean",
2071                Value::Date(_) => "date",
2072                Value::Time(_) => "time",
2073                Value::Timestamp(_) => "timestamp",
2074                Value::Interval { .. } => "interval",
2075                Value::Json(_) => "json",
2076                Value::Jsonb(_) => "jsonb",
2077                Value::TsVector(_) => "tsvector",
2078                Value::TsQuery(_) => "tsquery",
2079                Value::Array(_) => "array",
2080                Value::Vector(_) => "vector",
2081            };
2082            Ok(Value::Text(type_name.into()))
2083        }
2084        "MIN" => {
2085            check_args(name, &evaluated, 2)?;
2086            if evaluated[0].is_null() {
2087                return Ok(evaluated[1].clone());
2088            }
2089            if evaluated[1].is_null() {
2090                return Ok(evaluated[0].clone());
2091            }
2092            if evaluated[0] <= evaluated[1] {
2093                Ok(evaluated[0].clone())
2094            } else {
2095                Ok(evaluated[1].clone())
2096            }
2097        }
2098        "MAX" => {
2099            check_args(name, &evaluated, 2)?;
2100            if evaluated[0].is_null() {
2101                return Ok(evaluated[1].clone());
2102            }
2103            if evaluated[1].is_null() {
2104                return Ok(evaluated[0].clone());
2105            }
2106            if evaluated[0] >= evaluated[1] {
2107                Ok(evaluated[0].clone())
2108            } else {
2109                Ok(evaluated[1].clone())
2110            }
2111        }
2112        "HEX" => {
2113            check_args(name, &evaluated, 1)?;
2114            match &evaluated[0] {
2115                Value::Null => Ok(Value::Null),
2116                Value::Blob(b) => {
2117                    let mut s = String::with_capacity(b.len() * 2);
2118                    for byte in b {
2119                        s.push_str(&format!("{byte:02X}"));
2120                    }
2121                    Ok(Value::Text(s.into()))
2122                }
2123                Value::Text(s) => {
2124                    let mut r = String::with_capacity(s.len() * 2);
2125                    for byte in s.as_bytes() {
2126                        r.push_str(&format!("{byte:02X}"));
2127                    }
2128                    Ok(Value::Text(r.into()))
2129                }
2130                _ => Ok(Value::Text(value_to_text(&evaluated[0]).into())),
2131            }
2132        }
2133        "NOW" | "CURRENT_TIMESTAMP" | "LOCALTIMESTAMP" => {
2134            check_args(name, &evaluated, 0)?;
2135            Ok(Value::Timestamp(crate::datetime::txn_or_clock_micros()))
2136        }
2137        "CURRENT_DATE" => {
2138            check_args(name, &evaluated, 0)?;
2139            Ok(Value::Date(crate::datetime::ts_to_date_floor(
2140                crate::datetime::txn_or_clock_micros(),
2141            )))
2142        }
2143        "CURRENT_TIME" | "LOCALTIME" => {
2144            check_args(name, &evaluated, 0)?;
2145            Ok(Value::Time(
2146                crate::datetime::ts_split(crate::datetime::txn_or_clock_micros()).1,
2147            ))
2148        }
2149        "CLOCK_TIMESTAMP" | "STATEMENT_TIMESTAMP" | "TRANSACTION_TIMESTAMP" => {
2150            check_args(name, &evaluated, 0)?;
2151            let ts = match name {
2152                "CLOCK_TIMESTAMP" => crate::datetime::now_micros(),
2153                _ => crate::datetime::txn_or_clock_micros(),
2154            };
2155            Ok(Value::Timestamp(ts))
2156        }
2157        "EXTRACT" | "DATE_PART" | "DATEPART" => {
2158            check_args(name, &evaluated, 2)?;
2159            let field: &str = match &evaluated[0] {
2160                Value::Null => return Ok(Value::Null),
2161                Value::Text(s) => s.as_str(),
2162                _ => {
2163                    return Err(SqlError::TypeMismatch {
2164                        expected: "TEXT field name".into(),
2165                        got: evaluated[0].data_type().to_string(),
2166                    })
2167                }
2168            };
2169            if evaluated[1].is_null() {
2170                return Ok(Value::Null);
2171            }
2172            crate::datetime::extract(field, &evaluated[1])
2173        }
2174        "DATE_TRUNC" => {
2175            if evaluated.len() < 2 || evaluated.len() > 3 {
2176                return Err(SqlError::InvalidValue(
2177                    "DATE_TRUNC requires 2 or 3 arguments".into(),
2178                ));
2179            }
2180            let unit = match &evaluated[0] {
2181                Value::Null => return Ok(Value::Null),
2182                Value::Text(s) => s.to_string(),
2183                _ => {
2184                    return Err(SqlError::TypeMismatch {
2185                        expected: "TEXT unit name".into(),
2186                        got: evaluated[0].data_type().to_string(),
2187                    })
2188                }
2189            };
2190            if evaluated[1].is_null() {
2191                return Ok(Value::Null);
2192            }
2193            // Optional tz arg: truncate in that zone, then convert back to UTC.
2194            if evaluated.len() == 3 {
2195                if let Value::Text(tz) = &evaluated[2] {
2196                    if !tz.eq_ignore_ascii_case("UTC") {
2197                        if let Value::Timestamp(ts) = &evaluated[1] {
2198                            return date_trunc_in_zone(&unit, *ts, tz);
2199                        }
2200                    }
2201                }
2202            }
2203            crate::datetime::date_trunc(&unit, &evaluated[1])
2204        }
2205        "DATE_BIN" => {
2206            check_args(name, &evaluated, 3)?;
2207            if evaluated.iter().any(|v| v.is_null()) {
2208                return Ok(Value::Null);
2209            }
2210            let stride = match &evaluated[0] {
2211                Value::Interval {
2212                    months: _,
2213                    days,
2214                    micros,
2215                } => *days as i64 * crate::datetime::MICROS_PER_DAY + *micros,
2216                _ => {
2217                    return Err(SqlError::TypeMismatch {
2218                        expected: "INTERVAL stride".into(),
2219                        got: evaluated[0].data_type().to_string(),
2220                    })
2221                }
2222            };
2223            if stride <= 0 {
2224                return Err(SqlError::InvalidValue(
2225                    "DATE_BIN stride must be positive".into(),
2226                ));
2227            }
2228            let (src, origin) = match (&evaluated[1], &evaluated[2]) {
2229                (Value::Timestamp(s), Value::Timestamp(o)) => (*s, *o),
2230                _ => {
2231                    return Err(SqlError::TypeMismatch {
2232                        expected: "TIMESTAMP, TIMESTAMP".into(),
2233                        got: format!("{}, {}", evaluated[1].data_type(), evaluated[2].data_type()),
2234                    })
2235                }
2236            };
2237            let diff = src - origin;
2238            let binned = origin + (diff.div_euclid(stride)) * stride;
2239            Ok(Value::Timestamp(binned))
2240        }
2241        "AGE" => {
2242            if evaluated.len() == 1 {
2243                if evaluated[0].is_null() {
2244                    return Ok(Value::Null);
2245                }
2246                let ts = match &evaluated[0] {
2247                    Value::Timestamp(t) => *t,
2248                    Value::Date(d) => crate::datetime::date_to_ts(*d),
2249                    _ => {
2250                        return Err(SqlError::TypeMismatch {
2251                            expected: "TIMESTAMP or DATE".into(),
2252                            got: evaluated[0].data_type().to_string(),
2253                        })
2254                    }
2255                };
2256                // Implicit reference: today at midnight UTC.
2257                let today = crate::datetime::today_days();
2258                let midnight = crate::datetime::date_to_ts(today);
2259                let (m, d, u) = crate::datetime::age(midnight, ts)?;
2260                return Ok(Value::Interval {
2261                    months: m,
2262                    days: d,
2263                    micros: u,
2264                });
2265            }
2266            check_args(name, &evaluated, 2)?;
2267            if evaluated.iter().any(|v| v.is_null()) {
2268                return Ok(Value::Null);
2269            }
2270            let a = ts_of(&evaluated[0])?;
2271            let b = ts_of(&evaluated[1])?;
2272            let (m, d, u) = crate::datetime::age(a, b)?;
2273            Ok(Value::Interval {
2274                months: m,
2275                days: d,
2276                micros: u,
2277            })
2278        }
2279        "MAKE_DATE" => {
2280            check_args(name, &evaluated, 3)?;
2281            if evaluated.iter().any(|v| v.is_null()) {
2282                return Ok(Value::Null);
2283            }
2284            let y = int_arg(&evaluated[0], "MAKE_DATE year")? as i32;
2285            let m = int_arg(&evaluated[1], "MAKE_DATE month")? as u8;
2286            let d = int_arg(&evaluated[2], "MAKE_DATE day")? as u8;
2287            crate::datetime::ymd_to_days(y, m, d)
2288                .map(Value::Date)
2289                .ok_or_else(|| SqlError::InvalidDateLiteral(format!("make_date({y}, {m}, {d})")))
2290        }
2291        "MAKE_TIME" => {
2292            check_args(name, &evaluated, 3)?;
2293            if evaluated.iter().any(|v| v.is_null()) {
2294                return Ok(Value::Null);
2295            }
2296            let h = int_arg(&evaluated[0], "MAKE_TIME hour")? as u8;
2297            let mi = int_arg(&evaluated[1], "MAKE_TIME minute")? as u8;
2298            let (s, us) = real_sec_arg(&evaluated[2])?;
2299            crate::datetime::hmsn_to_micros(h, mi, s, us)
2300                .map(Value::Time)
2301                .ok_or_else(|| SqlError::InvalidTimeLiteral(format!("make_time({h}, {mi}, ...)")))
2302        }
2303        "MAKE_TIMESTAMP" => {
2304            check_args(name, &evaluated, 6)?;
2305            if evaluated.iter().any(|v| v.is_null()) {
2306                return Ok(Value::Null);
2307            }
2308            let y = int_arg(&evaluated[0], "MAKE_TIMESTAMP year")? as i32;
2309            let mo = int_arg(&evaluated[1], "MAKE_TIMESTAMP month")? as u8;
2310            let d = int_arg(&evaluated[2], "MAKE_TIMESTAMP day")? as u8;
2311            let h = int_arg(&evaluated[3], "MAKE_TIMESTAMP hour")? as u8;
2312            let mi = int_arg(&evaluated[4], "MAKE_TIMESTAMP min")? as u8;
2313            let (s, us) = real_sec_arg(&evaluated[5])?;
2314            let days = crate::datetime::ymd_to_days(y, mo, d).ok_or_else(|| {
2315                SqlError::InvalidTimestampLiteral(format!("make_timestamp year={y}"))
2316            })?;
2317            let tmicros = crate::datetime::hmsn_to_micros(h, mi, s, us)
2318                .ok_or_else(|| SqlError::InvalidTimestampLiteral("time out of range".into()))?;
2319            Ok(Value::Timestamp(crate::datetime::ts_combine(days, tmicros)))
2320        }
2321        "MAKE_INTERVAL" => {
2322            // Positional args: years, months, weeks, days, hours, mins, secs.
2323            if evaluated.len() > 7 {
2324                return Err(SqlError::InvalidValue(
2325                    "MAKE_INTERVAL accepts at most 7 arguments".into(),
2326                ));
2327            }
2328            let mut months: i64 = 0;
2329            let mut days: i64 = 0;
2330            let mut micros: i64 = 0;
2331            for (i, v) in evaluated.iter().enumerate() {
2332                if v.is_null() {
2333                    continue;
2334                }
2335                let n = match v {
2336                    Value::Integer(n) => *n,
2337                    Value::Real(r) => *r as i64,
2338                    _ => {
2339                        return Err(SqlError::TypeMismatch {
2340                            expected: "numeric".into(),
2341                            got: v.data_type().to_string(),
2342                        })
2343                    }
2344                };
2345                match i {
2346                    0 => months = months.saturating_add(n.saturating_mul(12)),
2347                    1 => months = months.saturating_add(n),
2348                    2 => days = days.saturating_add(n.saturating_mul(7)),
2349                    3 => days = days.saturating_add(n),
2350                    4 => {
2351                        micros = micros
2352                            .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_HOUR))
2353                    }
2354                    5 => {
2355                        micros =
2356                            micros.saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_MIN))
2357                    }
2358                    6 => {
2359                        // Seconds may be fractional — also check Real.
2360                        if let Value::Real(r) = v {
2361                            micros = micros.saturating_add(
2362                                (*r * crate::datetime::MICROS_PER_SEC as f64) as i64,
2363                            );
2364                        } else {
2365                            micros = micros
2366                                .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_SEC));
2367                        }
2368                    }
2369                    _ => unreachable!(),
2370                }
2371            }
2372            Ok(Value::Interval {
2373                months: months.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
2374                days: days.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
2375                micros,
2376            })
2377        }
2378        "JUSTIFY_DAYS" => {
2379            check_args(name, &evaluated, 1)?;
2380            match &evaluated[0] {
2381                Value::Null => Ok(Value::Null),
2382                Value::Interval {
2383                    months,
2384                    days,
2385                    micros,
2386                } => {
2387                    let (m, d, u) = crate::datetime::justify_days(*months, *days, *micros);
2388                    Ok(Value::Interval {
2389                        months: m,
2390                        days: d,
2391                        micros: u,
2392                    })
2393                }
2394                other => Err(SqlError::TypeMismatch {
2395                    expected: "INTERVAL".into(),
2396                    got: other.data_type().to_string(),
2397                }),
2398            }
2399        }
2400        "JUSTIFY_HOURS" => {
2401            check_args(name, &evaluated, 1)?;
2402            match &evaluated[0] {
2403                Value::Null => Ok(Value::Null),
2404                Value::Interval {
2405                    months,
2406                    days,
2407                    micros,
2408                } => {
2409                    let (m, d, u) = crate::datetime::justify_hours(*months, *days, *micros);
2410                    Ok(Value::Interval {
2411                        months: m,
2412                        days: d,
2413                        micros: u,
2414                    })
2415                }
2416                other => Err(SqlError::TypeMismatch {
2417                    expected: "INTERVAL".into(),
2418                    got: other.data_type().to_string(),
2419                }),
2420            }
2421        }
2422        "JUSTIFY_INTERVAL" => {
2423            check_args(name, &evaluated, 1)?;
2424            match &evaluated[0] {
2425                Value::Null => Ok(Value::Null),
2426                Value::Interval {
2427                    months,
2428                    days,
2429                    micros,
2430                } => {
2431                    let (m, d, u) = crate::datetime::justify_interval(*months, *days, *micros);
2432                    Ok(Value::Interval {
2433                        months: m,
2434                        days: d,
2435                        micros: u,
2436                    })
2437                }
2438                other => Err(SqlError::TypeMismatch {
2439                    expected: "INTERVAL".into(),
2440                    got: other.data_type().to_string(),
2441                }),
2442            }
2443        }
2444        "ISFINITE" => {
2445            check_args(name, &evaluated, 1)?;
2446            if evaluated[0].is_null() {
2447                return Ok(Value::Null);
2448            }
2449            Ok(Value::Boolean(evaluated[0].is_finite_temporal()))
2450        }
2451        "DATE" => {
2452            if evaluated.is_empty() {
2453                return Err(SqlError::InvalidValue(
2454                    "DATE requires at least 1 argument".into(),
2455                ));
2456            }
2457            if evaluated[0].is_null() {
2458                return Ok(Value::Null);
2459            }
2460            let d = match &evaluated[0] {
2461                Value::Date(d) => *d,
2462                Value::Timestamp(t) => crate::datetime::ts_to_date_floor(*t),
2463                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::today_days(),
2464                Value::Text(s) => crate::datetime::parse_date(s)?,
2465                Value::Integer(n) => {
2466                    crate::datetime::ts_to_date_floor(*n * crate::datetime::MICROS_PER_SEC)
2467                }
2468                other => {
2469                    return Err(SqlError::TypeMismatch {
2470                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
2471                        got: other.data_type().to_string(),
2472                    })
2473                }
2474            };
2475            Ok(Value::Date(d))
2476        }
2477        "TIME" => {
2478            if evaluated.is_empty() {
2479                return Err(SqlError::InvalidValue(
2480                    "TIME requires at least 1 argument".into(),
2481                ));
2482            }
2483            if evaluated[0].is_null() {
2484                return Ok(Value::Null);
2485            }
2486            let t = match &evaluated[0] {
2487                Value::Time(t) => *t,
2488                Value::Timestamp(t) => crate::datetime::ts_split(*t).1,
2489                Value::Text(s) if s.eq_ignore_ascii_case("now") => {
2490                    crate::datetime::current_time_micros()
2491                }
2492                Value::Text(s) => crate::datetime::parse_time(s)?,
2493                other => {
2494                    return Err(SqlError::TypeMismatch {
2495                        expected: "TIMESTAMP, TIME, or TEXT".into(),
2496                        got: other.data_type().to_string(),
2497                    })
2498                }
2499            };
2500            Ok(Value::Time(t))
2501        }
2502        "DATETIME" => {
2503            if evaluated.is_empty() {
2504                return Err(SqlError::InvalidValue(
2505                    "DATETIME requires at least 1 argument".into(),
2506                ));
2507            }
2508            if evaluated[0].is_null() {
2509                return Ok(Value::Null);
2510            }
2511            let t = match &evaluated[0] {
2512                Value::Timestamp(t) => *t,
2513                Value::Date(d) => crate::datetime::date_to_ts(*d),
2514                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::now_micros(),
2515                Value::Text(s) => crate::datetime::parse_timestamp(s)?,
2516                Value::Integer(n) => n * crate::datetime::MICROS_PER_SEC,
2517                other => {
2518                    return Err(SqlError::TypeMismatch {
2519                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
2520                        got: other.data_type().to_string(),
2521                    })
2522                }
2523            };
2524            Ok(Value::Timestamp(t))
2525        }
2526        "STRFTIME" => {
2527            if evaluated.len() < 2 {
2528                return Err(SqlError::InvalidValue(
2529                    "STRFTIME requires format + value".into(),
2530                ));
2531            }
2532            if evaluated.iter().take(2).any(|v| v.is_null()) {
2533                return Ok(Value::Null);
2534            }
2535            let fmt = match &evaluated[0] {
2536                Value::Text(s) => s.to_string(),
2537                _ => {
2538                    return Err(SqlError::TypeMismatch {
2539                        expected: "TEXT format".into(),
2540                        got: evaluated[0].data_type().to_string(),
2541                    })
2542                }
2543            };
2544            let out = crate::datetime::strftime(&fmt, &evaluated[1])?;
2545            Ok(Value::Text(out.into()))
2546        }
2547        "JULIANDAY" => {
2548            if evaluated.is_empty() {
2549                return Err(SqlError::InvalidValue(
2550                    "JULIANDAY requires at least 1 argument".into(),
2551                ));
2552            }
2553            if evaluated[0].is_null() {
2554                return Ok(Value::Null);
2555            }
2556            let micros = ts_of(&evaluated[0])?;
2557            let (days, tmicros) = crate::datetime::ts_split(micros);
2558            // Julian Day 2440587.5 = 1970-01-01 00:00:00 UTC (Julian days start at noon).
2559            let julian =
2560                days as f64 + 2_440_587.5 + tmicros as f64 / crate::datetime::MICROS_PER_DAY as f64;
2561            Ok(Value::Real(julian))
2562        }
2563        "UNIXEPOCH" => {
2564            if evaluated.is_empty() {
2565                return Err(SqlError::InvalidValue(
2566                    "UNIXEPOCH requires at least 1 argument".into(),
2567                ));
2568            }
2569            if evaluated[0].is_null() {
2570                return Ok(Value::Null);
2571            }
2572            let micros = ts_of(&evaluated[0])?;
2573            let subsec = evaluated
2574                .get(1)
2575                .and_then(|v| {
2576                    if let Value::Text(s) = v {
2577                        Some(s.to_string())
2578                    } else {
2579                        None
2580                    }
2581                })
2582                .map(|s| s.eq_ignore_ascii_case("subsec") || s.eq_ignore_ascii_case("subsecond"))
2583                .unwrap_or(false);
2584            if subsec {
2585                Ok(Value::Real(
2586                    micros as f64 / crate::datetime::MICROS_PER_SEC as f64,
2587                ))
2588            } else {
2589                Ok(Value::Integer(micros / crate::datetime::MICROS_PER_SEC))
2590            }
2591        }
2592        "TIMEDIFF" => {
2593            check_args(name, &evaluated, 2)?;
2594            if evaluated.iter().any(|v| v.is_null()) {
2595                return Ok(Value::Null);
2596            }
2597            let a = ts_of(&evaluated[0])?;
2598            let b = ts_of(&evaluated[1])?;
2599            let (days, micros) = crate::datetime::subtract_timestamps(a, b);
2600            let sign = if days < 0 || (days == 0 && micros < 0) {
2601                "-"
2602            } else {
2603                "+"
2604            };
2605            let abs_days = days.unsigned_abs() as i64;
2606            let abs_us = micros.unsigned_abs() as i64;
2607            // PG-compat format string: "(+|-)YYYY-MM-DD HH:MM:SS.SSS", days-only.
2608            let (h, m, s, us) = crate::datetime::micros_to_hmsn(abs_us);
2609            Ok(Value::Text(
2610                format!("{sign}{abs_days:04}-00-00 {h:02}:{m:02}:{s:02}.{us:06}").into(),
2611            ))
2612        }
2613        "AT_TIMEZONE" => {
2614            check_args(name, &evaluated, 2)?;
2615            if evaluated.iter().any(|v| v.is_null()) {
2616                return Ok(Value::Null);
2617            }
2618            let ts = match &evaluated[0] {
2619                Value::Timestamp(t) => *t,
2620                Value::Date(d) => crate::datetime::date_to_ts(*d),
2621                other => {
2622                    return Err(SqlError::TypeMismatch {
2623                        expected: "TIMESTAMP or DATE".into(),
2624                        got: other.data_type().to_string(),
2625                    })
2626                }
2627            };
2628            let zone = match &evaluated[1] {
2629                Value::Text(s) => s.to_string(),
2630                _ => {
2631                    return Err(SqlError::TypeMismatch {
2632                        expected: "TEXT time zone".into(),
2633                        got: evaluated[1].data_type().to_string(),
2634                    })
2635                }
2636            };
2637            // Reject POSIX-style 'UTC+5' (ambiguous sign convention).
2638            let upper = zone.to_ascii_uppercase();
2639            if (upper.starts_with("UTC+") || upper.starts_with("UTC-")) && zone.len() > 3 {
2640                return Err(SqlError::InvalidTimezone(format!(
2641                    "'{zone}' is ambiguous — use ISO-8601 offset like '+05:00' or named zone like 'Etc/GMT-5'"
2642                )));
2643            }
2644            let formatted = crate::datetime::format_timestamp_in_zone(ts, &zone)?;
2645            Ok(Value::Text(formatted.into()))
2646        }
2647        "JSONB_TYPEOF" | "JSON_TYPEOF" => {
2648            check_args(name, &evaluated, 1)?;
2649            if evaluated[0].is_null() {
2650                return Ok(Value::Null);
2651            }
2652            crate::json::fn_typeof(&evaluated[0])
2653        }
2654        "JSONB_ARRAY_LENGTH" | "JSON_ARRAY_LENGTH" => {
2655            check_args(name, &evaluated, 1)?;
2656            if evaluated[0].is_null() {
2657                return Ok(Value::Null);
2658            }
2659            crate::json::fn_array_length(&evaluated[0])
2660        }
2661        "JSONB_OBJECT_LENGTH" | "JSON_OBJECT_LENGTH" => {
2662            check_args(name, &evaluated, 1)?;
2663            if evaluated[0].is_null() {
2664                return Ok(Value::Null);
2665            }
2666            crate::json::fn_object_length(&evaluated[0])
2667        }
2668        "JSONB_EXTRACT_PATH" | "JSON_EXTRACT_PATH" => {
2669            if evaluated.is_empty() {
2670                return Err(SqlError::InvalidValue(format!(
2671                    "{name} requires at least 1 argument"
2672                )));
2673            }
2674            if evaluated[0].is_null() {
2675                return Ok(Value::Null);
2676            }
2677            let target = if name.eq_ignore_ascii_case("JSONB_EXTRACT_PATH") {
2678                crate::types::DataType::Jsonb
2679            } else {
2680                crate::types::DataType::Json
2681            };
2682            crate::json::fn_extract_path(&evaluated, target, false)
2683        }
2684        "JSONB_EXTRACT_PATH_TEXT" | "JSON_EXTRACT_PATH_TEXT" => {
2685            if evaluated.is_empty() {
2686                return Err(SqlError::InvalidValue(format!(
2687                    "{name} requires at least 1 argument"
2688                )));
2689            }
2690            if evaluated[0].is_null() {
2691                return Ok(Value::Null);
2692            }
2693            crate::json::fn_extract_path(&evaluated, crate::types::DataType::Text, true)
2694        }
2695        "JSON_EXTRACT" => {
2696            check_args(name, &evaluated, 2)?;
2697            if evaluated[0].is_null() || evaluated[1].is_null() {
2698                return Ok(Value::Null);
2699            }
2700            crate::json::fn_sqlite_extract(&evaluated[0], &evaluated[1])
2701        }
2702        "JSON_VALID" => {
2703            check_args(name, &evaluated, 1)?;
2704            if evaluated[0].is_null() {
2705                return Ok(Value::Null);
2706            }
2707            crate::json::fn_valid(&evaluated[0])
2708        }
2709        "JSONB_STRIP_NULLS" | "JSON_STRIP_NULLS" => {
2710            check_args(name, &evaluated, 1)?;
2711            if evaluated[0].is_null() {
2712                return Ok(Value::Null);
2713            }
2714            let target = if name.eq_ignore_ascii_case("JSONB_STRIP_NULLS") {
2715                crate::types::DataType::Jsonb
2716            } else {
2717                crate::types::DataType::Json
2718            };
2719            crate::json::fn_strip_nulls(&evaluated[0], target)
2720        }
2721        "JSONB_PRETTY" | "JSON_PRETTY" => {
2722            check_args(name, &evaluated, 1)?;
2723            if evaluated[0].is_null() {
2724                return Ok(Value::Null);
2725            }
2726            crate::json::fn_pretty(&evaluated[0])
2727        }
2728        "JSONB_BUILD_OBJECT" | "JSON_BUILD_OBJECT" => {
2729            let target = if name.eq_ignore_ascii_case("JSONB_BUILD_OBJECT") {
2730                crate::types::DataType::Jsonb
2731            } else {
2732                crate::types::DataType::Json
2733            };
2734            crate::json::fn_build_object(&evaluated, target)
2735        }
2736        "JSONB_BUILD_ARRAY" | "JSON_BUILD_ARRAY" => {
2737            let target = if name.eq_ignore_ascii_case("JSONB_BUILD_ARRAY") {
2738                crate::types::DataType::Jsonb
2739            } else {
2740                crate::types::DataType::Json
2741            };
2742            crate::json::fn_build_array(&evaluated, target)
2743        }
2744        "JSONB_SET" | "JSON_SET" => {
2745            if !(3..=4).contains(&evaluated.len()) {
2746                return Err(SqlError::InvalidValue(format!(
2747                    "{name} requires 3 or 4 arguments"
2748                )));
2749            }
2750            if evaluated[0].is_null() {
2751                return Ok(Value::Null);
2752            }
2753            let target = if name.eq_ignore_ascii_case("JSONB_SET") {
2754                crate::types::DataType::Jsonb
2755            } else {
2756                crate::types::DataType::Json
2757            };
2758            let create_missing = evaluated
2759                .get(3)
2760                .map(|v| matches!(v, Value::Boolean(true)))
2761                .unwrap_or(true);
2762            crate::json::fn_set(
2763                &evaluated[0],
2764                &evaluated[1],
2765                &evaluated[2],
2766                create_missing,
2767                target,
2768            )
2769        }
2770        "JSONB_INSERT" | "JSON_INSERT" => {
2771            if !(3..=4).contains(&evaluated.len()) {
2772                return Err(SqlError::InvalidValue(format!(
2773                    "{name} requires 3 or 4 arguments"
2774                )));
2775            }
2776            if evaluated[0].is_null() {
2777                return Ok(Value::Null);
2778            }
2779            let target = if name.eq_ignore_ascii_case("JSONB_INSERT") {
2780                crate::types::DataType::Jsonb
2781            } else {
2782                crate::types::DataType::Json
2783            };
2784            let insert_after = evaluated
2785                .get(3)
2786                .map(|v| matches!(v, Value::Boolean(true)))
2787                .unwrap_or(false);
2788            crate::json::fn_insert(
2789                &evaluated[0],
2790                &evaluated[1],
2791                &evaluated[2],
2792                insert_after,
2793                target,
2794            )
2795        }
2796        "TO_JSONB" | "TO_JSON" => {
2797            check_args(name, &evaluated, 1)?;
2798            let target = if name.eq_ignore_ascii_case("TO_JSONB") {
2799                crate::types::DataType::Jsonb
2800            } else {
2801                crate::types::DataType::Json
2802            };
2803            crate::json::fn_to_json(&evaluated[0], target)
2804        }
2805        "ROW_TO_JSON" | "ROW_TO_JSONB" => {
2806            check_args(name, &evaluated, 1)?;
2807            let target = if name.eq_ignore_ascii_case("ROW_TO_JSONB") {
2808                crate::types::DataType::Jsonb
2809            } else {
2810                crate::types::DataType::Json
2811            };
2812            crate::json::fn_to_json(&evaluated[0], target)
2813        }
2814        "JSON_OBJECT" => crate::json::fn_json_object(&evaluated),
2815        "JSON_EXISTS" => {
2816            check_args(name, &evaluated, 2)?;
2817            if evaluated[0].is_null() || evaluated[1].is_null() {
2818                return Ok(Value::Null);
2819            }
2820            crate::json::fn_json_exists(&evaluated[0], &evaluated[1])
2821        }
2822        "JSON_VALUE" => {
2823            check_args(name, &evaluated, 2)?;
2824            if evaluated[0].is_null() || evaluated[1].is_null() {
2825                return Ok(Value::Null);
2826            }
2827            crate::json::fn_json_value(&evaluated[0], &evaluated[1])
2828        }
2829        "JSON_QUERY" => {
2830            check_args(name, &evaluated, 2)?;
2831            if evaluated[0].is_null() || evaluated[1].is_null() {
2832                return Ok(Value::Null);
2833            }
2834            crate::json::fn_json_query(&evaluated[0], &evaluated[1], crate::types::DataType::Jsonb)
2835        }
2836        "JSONB_PATH_EXISTS" => {
2837            if evaluated[0].is_null() || evaluated[1].is_null() {
2838                return Ok(Value::Null);
2839            }
2840            crate::json::fn_jsonb_path_exists(&evaluated)
2841        }
2842        "JSONB_PATH_MATCH" => {
2843            if evaluated[0].is_null() || evaluated[1].is_null() {
2844                return Ok(Value::Null);
2845            }
2846            crate::json::fn_jsonb_path_match(&evaluated)
2847        }
2848        "JSONB_PATH_QUERY_FIRST" => {
2849            if evaluated[0].is_null() || evaluated[1].is_null() {
2850                return Ok(Value::Null);
2851            }
2852            crate::json::fn_jsonb_path_query_first(&evaluated)
2853        }
2854        "JSONB_PATH_QUERY_ARRAY" => {
2855            if evaluated[0].is_null() || evaluated[1].is_null() {
2856                return Ok(Value::Null);
2857            }
2858            crate::json::fn_jsonb_path_query_array(&evaluated)
2859        }
2860        "JSONB_PATH_EXISTS_TZ" => {
2861            if evaluated[0].is_null() || evaluated[1].is_null() {
2862                return Ok(Value::Null);
2863            }
2864            crate::json::fn_jsonb_path_exists_tz(&evaluated)
2865        }
2866        "JSONB_PATH_MATCH_TZ" => {
2867            if evaluated[0].is_null() || evaluated[1].is_null() {
2868                return Ok(Value::Null);
2869            }
2870            crate::json::fn_jsonb_path_match_tz(&evaluated)
2871        }
2872        "JSONB_PATH_QUERY_TZ" => {
2873            if evaluated[0].is_null() || evaluated[1].is_null() {
2874                return Ok(Value::Null);
2875            }
2876            crate::json::fn_jsonb_path_query_tz(&evaluated)
2877        }
2878        "JSONB_PATH_QUERY_FIRST_TZ" => {
2879            if evaluated[0].is_null() || evaluated[1].is_null() {
2880                return Ok(Value::Null);
2881            }
2882            crate::json::fn_jsonb_path_query_first_tz(&evaluated)
2883        }
2884        "JSONB_PATH_QUERY_ARRAY_TZ" => {
2885            if evaluated[0].is_null() || evaluated[1].is_null() {
2886                return Ok(Value::Null);
2887            }
2888            crate::json::fn_jsonb_path_query_array_tz(&evaluated)
2889        }
2890        "JSONB_HAS_KEY" | "JSON_HAS_KEY" => {
2891            check_args(name, &evaluated, 2)?;
2892            if evaluated[0].is_null() || evaluated[1].is_null() {
2893                return Ok(Value::Null);
2894            }
2895            crate::json::op_has_key(&evaluated[0], &evaluated[1])
2896        }
2897        "JSONB_HAS_ANY_KEY" | "JSON_HAS_ANY_KEY" => {
2898            check_args(name, &evaluated, 2)?;
2899            if evaluated[0].is_null() || evaluated[1].is_null() {
2900                return Ok(Value::Null);
2901            }
2902            crate::json::op_has_any_key(&evaluated[0], &evaluated[1])
2903        }
2904        "JSONB_HAS_ALL_KEYS" | "JSON_HAS_ALL_KEYS" => {
2905            check_args(name, &evaluated, 2)?;
2906            if evaluated[0].is_null() || evaluated[1].is_null() {
2907                return Ok(Value::Null);
2908            }
2909            crate::json::op_has_all_keys(&evaluated[0], &evaluated[1])
2910        }
2911        "TO_TSVECTOR" => fts_to_tsvector(&evaluated),
2912        "TO_TSQUERY" => fts_to_tsquery(&evaluated),
2913        "PLAINTO_TSQUERY" => fts_plainto_tsquery(&evaluated),
2914        "PHRASETO_TSQUERY" => fts_phraseto_tsquery(&evaluated),
2915        "WEBSEARCH_TO_TSQUERY" => fts_websearch_to_tsquery(&evaluated),
2916        "TS_RANK" => fts_ts_rank(&evaluated, false),
2917        "TS_RANK_CD" => fts_ts_rank(&evaluated, true),
2918        "TS_HEADLINE" => fts_ts_headline(&evaluated),
2919        "TS_LEXIZE" => fts_ts_lexize(&evaluated),
2920        "NUMNODE" => fts_numnode(&evaluated),
2921        "SETWEIGHT" => fts_setweight(&evaluated),
2922        "STRIP" => fts_strip(&evaluated),
2923        _ => Err(SqlError::Unsupported(format!("scalar function: {name}"))),
2924    }
2925}
2926
2927fn fts_resolve_config_and_text(
2928    args: &[Value],
2929    fname: &str,
2930) -> Result<(crate::fts::TokenizerKind, String)> {
2931    if args.is_empty() || args.len() > 2 {
2932        return Err(SqlError::InvalidValue(format!(
2933            "{fname} requires 1 or 2 arguments"
2934        )));
2935    }
2936    let (config_name, text) = if args.len() == 2 {
2937        let cfg = match &args[0] {
2938            Value::Text(s) => Some(s.as_str().to_string()),
2939            v => {
2940                return Err(SqlError::TypeMismatch {
2941                    expected: "TEXT (config)".into(),
2942                    got: v.data_type().to_string(),
2943                })
2944            }
2945        };
2946        let txt = match &args[1] {
2947            Value::Text(s) => s.as_str().to_string(),
2948            v => {
2949                return Err(SqlError::TypeMismatch {
2950                    expected: "TEXT".into(),
2951                    got: v.data_type().to_string(),
2952                })
2953            }
2954        };
2955        (cfg, txt)
2956    } else {
2957        let txt = match &args[0] {
2958            Value::Text(s) => s.as_str().to_string(),
2959            v => {
2960                return Err(SqlError::TypeMismatch {
2961                    expected: "TEXT".into(),
2962                    got: v.data_type().to_string(),
2963                })
2964            }
2965        };
2966        (None, txt)
2967    };
2968    let kind = match config_name {
2969        Some(name) => crate::fts::TokenizerKind::from_name(&name)?,
2970        None => crate::fts::TokenizerKind::English,
2971    };
2972    Ok((kind, text))
2973}
2974
2975fn fts_to_tsvector(args: &[Value]) -> Result<Value> {
2976    if args.iter().any(|v| v.is_null()) {
2977        return Ok(Value::Null);
2978    }
2979    let (kind, text) = fts_resolve_config_and_text(args, "to_tsvector")?;
2980    crate::fts::fn_to_tsvector_with(kind, &text)
2981}
2982
2983fn fts_to_tsquery(args: &[Value]) -> Result<Value> {
2984    if args.iter().any(|v| v.is_null()) {
2985        return Ok(Value::Null);
2986    }
2987    let (kind, text) = fts_resolve_config_and_text(args, "to_tsquery")?;
2988    crate::fts::fn_to_tsquery_with(kind, &text)
2989}
2990
2991fn fts_plainto_tsquery(args: &[Value]) -> Result<Value> {
2992    if args.iter().any(|v| v.is_null()) {
2993        return Ok(Value::Null);
2994    }
2995    let (kind, text) = fts_resolve_config_and_text(args, "plainto_tsquery")?;
2996    crate::fts::fn_plainto_tsquery_with(kind, &text)
2997}
2998
2999fn fts_phraseto_tsquery(args: &[Value]) -> Result<Value> {
3000    if args.iter().any(|v| v.is_null()) {
3001        return Ok(Value::Null);
3002    }
3003    let (kind, text) = fts_resolve_config_and_text(args, "phraseto_tsquery")?;
3004    crate::fts::fn_phraseto_tsquery_with(kind, &text)
3005}
3006
3007fn fts_websearch_to_tsquery(args: &[Value]) -> Result<Value> {
3008    if args.iter().any(|v| v.is_null()) {
3009        return Ok(Value::Null);
3010    }
3011    let (kind, text) = fts_resolve_config_and_text(args, "websearch_to_tsquery")?;
3012    crate::fts::fn_websearch_to_tsquery_with(kind, &text)
3013}
3014
3015fn fts_ts_rank(args: &[Value], cover_density: bool) -> Result<Value> {
3016    let fname = if cover_density {
3017        "ts_rank_cd"
3018    } else {
3019        "ts_rank"
3020    };
3021    if args.len() != 2 && args.len() != 3 {
3022        return Err(SqlError::InvalidValue(format!(
3023            "{fname} requires 2 or 3 arguments"
3024        )));
3025    }
3026    if args[0].is_null() || args[1].is_null() {
3027        return Ok(Value::Null);
3028    }
3029    let tsv = match &args[0] {
3030        Value::TsVector(b) => b,
3031        v => {
3032            return Err(SqlError::TypeMismatch {
3033                expected: "TSVECTOR".into(),
3034                got: v.data_type().to_string(),
3035            })
3036        }
3037    };
3038    let tsq = match &args[1] {
3039        Value::TsQuery(b) => b,
3040        v => {
3041            return Err(SqlError::TypeMismatch {
3042                expected: "TSQUERY".into(),
3043                got: v.data_type().to_string(),
3044            })
3045        }
3046    };
3047    let norm = if args.len() == 3 {
3048        match &args[2] {
3049            Value::Integer(n) => *n,
3050            Value::Null => return Ok(Value::Null),
3051            v => {
3052                return Err(SqlError::TypeMismatch {
3053                    expected: "INTEGER (norm)".into(),
3054                    got: v.data_type().to_string(),
3055                })
3056            }
3057        }
3058    } else {
3059        0
3060    };
3061    if cover_density {
3062        crate::fts::fn_ts_rank_cd(tsv, tsq, norm)
3063    } else {
3064        crate::fts::fn_ts_rank(tsv, tsq, norm)
3065    }
3066}
3067
3068fn fts_ts_headline(args: &[Value]) -> Result<Value> {
3069    if args.len() < 2 || args.len() > 4 {
3070        return Err(SqlError::InvalidValue(
3071            "ts_headline requires 2 to 4 arguments".into(),
3072        ));
3073    }
3074    if args.iter().any(|v| v.is_null()) {
3075        return Ok(Value::Null);
3076    }
3077    let kind = if args.len() >= 3 {
3078        match &args[0] {
3079            Value::Text(s) => crate::fts::TokenizerKind::from_name(s.as_str())?,
3080            v => {
3081                return Err(SqlError::TypeMismatch {
3082                    expected: "TEXT (config)".into(),
3083                    got: v.data_type().to_string(),
3084                })
3085            }
3086        }
3087    } else {
3088        crate::fts::TokenizerKind::English
3089    };
3090    let text_idx = if args.len() >= 3 { 1 } else { 0 };
3091    let tsq_idx = if args.len() >= 3 { 2 } else { 1 };
3092    let text = match args.get(text_idx) {
3093        Some(Value::Text(s)) => s.as_str(),
3094        _ => {
3095            return Err(SqlError::TypeMismatch {
3096                expected: "TEXT".into(),
3097                got: "non-text".into(),
3098            })
3099        }
3100    };
3101    let tsq = match args.get(tsq_idx) {
3102        Some(Value::TsQuery(b)) => b.as_ref(),
3103        _ => {
3104            return Err(SqlError::TypeMismatch {
3105                expected: "TSQUERY".into(),
3106                got: "non-tsquery".into(),
3107            })
3108        }
3109    };
3110    crate::fts::fn_ts_headline_with(kind, text, tsq)
3111}
3112
3113fn fts_ts_lexize(args: &[Value]) -> Result<Value> {
3114    if args.len() != 2 {
3115        return Err(SqlError::InvalidValue(
3116            "ts_lexize requires 2 arguments (config, word)".into(),
3117        ));
3118    }
3119    if args.iter().any(|v| v.is_null()) {
3120        return Ok(Value::Null);
3121    }
3122    let kind = match &args[0] {
3123        Value::Text(s) => crate::fts::TokenizerKind::from_name(s.as_str())?,
3124        v => {
3125            return Err(SqlError::TypeMismatch {
3126                expected: "TEXT (config)".into(),
3127                got: v.data_type().to_string(),
3128            })
3129        }
3130    };
3131    let word = match &args[1] {
3132        Value::Text(s) => s.as_str(),
3133        v => {
3134            return Err(SqlError::TypeMismatch {
3135                expected: "TEXT (word)".into(),
3136                got: v.data_type().to_string(),
3137            })
3138        }
3139    };
3140    crate::fts::fn_ts_lexize_with(kind, word)
3141}
3142
3143fn fts_numnode(args: &[Value]) -> Result<Value> {
3144    check_args("numnode", args, 1)?;
3145    if args[0].is_null() {
3146        return Ok(Value::Null);
3147    }
3148    let tsq = match &args[0] {
3149        Value::TsQuery(b) => b,
3150        v => {
3151            return Err(SqlError::TypeMismatch {
3152                expected: "TSQUERY".into(),
3153                got: v.data_type().to_string(),
3154            })
3155        }
3156    };
3157    crate::fts::fn_numnode(tsq)
3158}
3159
3160fn fts_setweight(args: &[Value]) -> Result<Value> {
3161    if args.len() == 3 {
3162        return fts_setweight_selective(args);
3163    }
3164    check_args("setweight", args, 2)?;
3165    if args[0].is_null() || args[1].is_null() {
3166        return Ok(Value::Null);
3167    }
3168    let tsv = match &args[0] {
3169        Value::TsVector(b) => b,
3170        v => {
3171            return Err(SqlError::TypeMismatch {
3172                expected: "TSVECTOR".into(),
3173                got: v.data_type().to_string(),
3174            })
3175        }
3176    };
3177    let weight_text = match &args[1] {
3178        Value::Text(s) => s.as_str(),
3179        v => {
3180            return Err(SqlError::TypeMismatch {
3181                expected: "TEXT".into(),
3182                got: v.data_type().to_string(),
3183            })
3184        }
3185    };
3186    let weight = crate::fts::parse_weight_char(weight_text)?;
3187    crate::fts::fn_setweight(tsv, weight)
3188}
3189
3190fn fts_setweight_selective(args: &[Value]) -> Result<Value> {
3191    check_args("setweight", args, 3)?;
3192    if args[0].is_null() || args[1].is_null() || args[2].is_null() {
3193        return Ok(Value::Null);
3194    }
3195    let tsv = match &args[0] {
3196        Value::TsVector(b) => b,
3197        v => {
3198            return Err(SqlError::TypeMismatch {
3199                expected: "TSVECTOR".into(),
3200                got: v.data_type().to_string(),
3201            })
3202        }
3203    };
3204    let weight_text = match &args[1] {
3205        Value::Text(s) => s.as_str(),
3206        v => {
3207            return Err(SqlError::TypeMismatch {
3208                expected: "TEXT".into(),
3209                got: v.data_type().to_string(),
3210            })
3211        }
3212    };
3213    let weight = crate::fts::parse_weight_char(weight_text)?;
3214    let filter = match &args[2] {
3215        Value::Array(a) => a.as_ref().as_slice(),
3216        v => {
3217            return Err(SqlError::TypeMismatch {
3218                expected: "TEXT[]".into(),
3219                got: v.data_type().to_string(),
3220            })
3221        }
3222    };
3223    crate::fts::fn_setweight_selective(tsv, weight, filter)
3224}
3225
3226fn fts_strip(args: &[Value]) -> Result<Value> {
3227    check_args("strip", args, 1)?;
3228    if args[0].is_null() {
3229        return Ok(Value::Null);
3230    }
3231    let tsv = match &args[0] {
3232        Value::TsVector(b) => b,
3233        v => {
3234            return Err(SqlError::TypeMismatch {
3235                expected: "TSVECTOR".into(),
3236                got: v.data_type().to_string(),
3237            })
3238        }
3239    };
3240    crate::fts::fn_strip(tsv)
3241}
3242
3243/// Extract a timestamp (µs UTC) from a Value, coercing DATE → midnight.
3244fn ts_of(v: &Value) -> Result<i64> {
3245    match v {
3246        Value::Timestamp(t) => Ok(*t),
3247        Value::Date(d) => Ok(crate::datetime::date_to_ts(*d)),
3248        _ => Err(SqlError::TypeMismatch {
3249            expected: "TIMESTAMP or DATE".into(),
3250            got: v.data_type().to_string(),
3251        }),
3252    }
3253}
3254
3255fn int_arg(v: &Value, label: &str) -> Result<i64> {
3256    match v {
3257        Value::Integer(n) => Ok(*n),
3258        _ => Err(SqlError::TypeMismatch {
3259            expected: format!("INTEGER ({label})"),
3260            got: v.data_type().to_string(),
3261        }),
3262    }
3263}
3264
3265/// Extract (whole_seconds: u8, frac_micros: u32) from a numeric argument for MAKE_TIME-style calls.
3266fn real_sec_arg(v: &Value) -> Result<(u8, u32)> {
3267    match v {
3268        Value::Integer(n) => {
3269            if !(0..=60).contains(n) {
3270                return Err(SqlError::InvalidValue(format!("second out of range: {n}")));
3271            }
3272            Ok((*n as u8, 0))
3273        }
3274        Value::Real(r) => {
3275            let whole = r.trunc() as i64;
3276            if !(0..=60).contains(&whole) {
3277                return Err(SqlError::InvalidValue(format!("second out of range: {r}")));
3278            }
3279            let frac = ((r - whole as f64) * 1_000_000.0).round() as i64;
3280            Ok((whole as u8, frac.max(0) as u32))
3281        }
3282        _ => Err(SqlError::TypeMismatch {
3283            expected: "numeric seconds".into(),
3284            got: v.data_type().to_string(),
3285        }),
3286    }
3287}
3288
3289/// DATE_TRUNC with a non-UTC IANA zone: convert → truncate in that zone → convert back to UTC.
3290fn date_trunc_in_zone(unit: &str, ts_utc: i64, tz: &str) -> Result<Value> {
3291    use jiff::{tz::TimeZone, Timestamp as JTimestamp};
3292    let zone = TimeZone::get(tz).map_err(|e| SqlError::InvalidTimezone(format!("{tz}: {e}")))?;
3293    let ts = JTimestamp::from_microsecond(ts_utc)
3294        .map_err(|e| SqlError::InvalidValue(format!("ts: {e}")))?;
3295    let zoned = ts.to_zoned(zone.clone());
3296    let unit_lower = unit.to_ascii_lowercase();
3297    let rounded = match unit_lower.as_str() {
3298        "microseconds" => return Ok(Value::Timestamp(ts_utc)),
3299        "second" => zoned
3300            .start_of_day()
3301            .map_err(|e| SqlError::InvalidValue(format!("{e}")))?,
3302        _ => {
3303            let naive_ts = zoned.timestamp().as_microsecond();
3304            return crate::datetime::date_trunc(unit, &Value::Timestamp(naive_ts));
3305        }
3306    };
3307    Ok(Value::Timestamp(rounded.timestamp().as_microsecond()))
3308}
3309
3310fn check_args(name: &str, args: &[Value], expected: usize) -> Result<()> {
3311    if args.len() != expected {
3312        Err(SqlError::InvalidValue(format!(
3313            "{name} requires {expected} argument(s), got {}",
3314            args.len()
3315        )))
3316    } else {
3317        Ok(())
3318    }
3319}
3320
3321pub fn referenced_columns(expr: &Expr, columns: &[ColumnDef]) -> Vec<usize> {
3322    let mut indices = Vec::new();
3323    collect_column_refs(expr, columns, &mut indices);
3324    indices.sort_unstable();
3325    indices.dedup();
3326    indices
3327}
3328
3329fn collect_column_refs(expr: &Expr, columns: &[ColumnDef], out: &mut Vec<usize>) {
3330    match expr {
3331        Expr::Column(name) => {
3332            for (i, c) in columns.iter().enumerate() {
3333                if c.name == *name
3334                    || (c.name.len() > name.len()
3335                        && c.name.as_bytes()[c.name.len() - name.len() - 1] == b'.'
3336                        && c.name.ends_with(name.as_str()))
3337                {
3338                    out.push(i);
3339                    break;
3340                }
3341            }
3342        }
3343        Expr::QualifiedColumn { table, column } => {
3344            let mut found: Option<usize> = None;
3345            let mut bare_match: Option<usize> = None;
3346            let mut bare_count = 0usize;
3347            for (i, c) in columns.iter().enumerate() {
3348                if c.name.len() == table.len() + 1 + column.len()
3349                    && c.name.as_bytes()[table.len()] == b'.'
3350                    && c.name.starts_with(table.as_str())
3351                    && c.name.ends_with(column.as_str())
3352                {
3353                    found = Some(i);
3354                    break;
3355                }
3356                if c.name == *column {
3357                    bare_match = Some(i);
3358                    bare_count += 1;
3359                }
3360            }
3361            if let Some(idx) = found {
3362                out.push(idx);
3363            } else if bare_count == 1 {
3364                out.push(bare_match.unwrap());
3365            }
3366        }
3367        Expr::BinaryOp { left, right, .. } => {
3368            collect_column_refs(left, columns, out);
3369            collect_column_refs(right, columns, out);
3370        }
3371        Expr::UnaryOp { expr, .. } => {
3372            collect_column_refs(expr, columns, out);
3373        }
3374        Expr::IsNull(e) | Expr::IsNotNull(e) => {
3375            collect_column_refs(e, columns, out);
3376        }
3377        Expr::Function { args, .. } => {
3378            for arg in args {
3379                collect_column_refs(arg, columns, out);
3380            }
3381        }
3382        Expr::InSubquery { expr, .. } => {
3383            collect_column_refs(expr, columns, out);
3384        }
3385        Expr::InList { expr, list, .. } => {
3386            collect_column_refs(expr, columns, out);
3387            for item in list {
3388                collect_column_refs(item, columns, out);
3389            }
3390        }
3391        Expr::InSet { expr, .. } => {
3392            collect_column_refs(expr, columns, out);
3393        }
3394        Expr::Between {
3395            expr, low, high, ..
3396        } => {
3397            collect_column_refs(expr, columns, out);
3398            collect_column_refs(low, columns, out);
3399            collect_column_refs(high, columns, out);
3400        }
3401        Expr::Like {
3402            expr,
3403            pattern,
3404            escape,
3405            ..
3406        } => {
3407            collect_column_refs(expr, columns, out);
3408            collect_column_refs(pattern, columns, out);
3409            if let Some(esc) = escape {
3410                collect_column_refs(esc, columns, out);
3411            }
3412        }
3413        Expr::Case {
3414            operand,
3415            conditions,
3416            else_result,
3417        } => {
3418            if let Some(op) = operand {
3419                collect_column_refs(op, columns, out);
3420            }
3421            for (when, then) in conditions {
3422                collect_column_refs(when, columns, out);
3423                collect_column_refs(then, columns, out);
3424            }
3425            if let Some(e) = else_result {
3426                collect_column_refs(e, columns, out);
3427            }
3428        }
3429        Expr::Coalesce(args) => {
3430            for arg in args {
3431                collect_column_refs(arg, columns, out);
3432            }
3433        }
3434        Expr::Cast { expr, .. } => {
3435            collect_column_refs(expr, columns, out);
3436        }
3437        Expr::Collate { expr, .. } => {
3438            collect_column_refs(expr, columns, out);
3439        }
3440        Expr::WindowFunction { args, spec, .. } => {
3441            for arg in args {
3442                collect_column_refs(arg, columns, out);
3443            }
3444            for pb in &spec.partition_by {
3445                collect_column_refs(pb, columns, out);
3446            }
3447            for ob in &spec.order_by {
3448                collect_column_refs(&ob.expr, columns, out);
3449            }
3450        }
3451        Expr::ArrayLiteral(elems) => {
3452            for e in elems {
3453                collect_column_refs(e, columns, out);
3454            }
3455        }
3456        Expr::Quantified { left, right, .. } => {
3457            collect_column_refs(left, columns, out);
3458            if let crate::parser::QuantifiedRhs::Array(e) = right {
3459                collect_column_refs(e, columns, out);
3460            }
3461        }
3462        Expr::Literal(_)
3463        | Expr::Parameter(_)
3464        | Expr::CountStar
3465        | Expr::Exists { .. }
3466        | Expr::ScalarSubquery(_)
3467        | Expr::TypedNullRecord(_) => {}
3468    }
3469}
3470
3471pub fn is_truthy(val: &Value) -> bool {
3472    match val {
3473        Value::Boolean(b) => *b,
3474        Value::Integer(i) => *i != 0,
3475        Value::Null => false,
3476        _ => true,
3477    }
3478}
3479
3480#[cfg(test)]
3481#[path = "eval_tests.rs"]
3482mod tests;