Skip to main content

citadel_sql/
eval.rs

1//! Expression evaluator with SQL three-valued logic.
2
3use rustc_hash::FxHashMap;
4
5use crate::error::{Result, SqlError};
6use crate::parser::{BinOp, Expr, UnaryOp};
7use crate::types::{ColumnDef, CompactString, DataType, Value};
8
9#[derive(Debug)]
10pub struct ColumnMap {
11    exact: FxHashMap<String, usize>,
12    short: FxHashMap<String, ShortMatch>,
13    collations: Vec<crate::types::Collation>,
14    has_non_binary_collation: bool,
15}
16
17#[derive(Clone, Debug)]
18enum ShortMatch {
19    Unique(usize),
20    Ambiguous,
21}
22
23impl Clone for ColumnMap {
24    fn clone(&self) -> Self {
25        Self {
26            exact: self.exact.clone(),
27            short: self.short.clone(),
28            collations: self.collations.clone(),
29            has_non_binary_collation: self.has_non_binary_collation,
30        }
31    }
32}
33
34impl ColumnMap {
35    pub fn new(columns: &[ColumnDef]) -> Self {
36        let mut exact = FxHashMap::with_capacity_and_hasher(columns.len() * 2, Default::default());
37        let mut short: FxHashMap<String, ShortMatch> =
38            FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
39        let mut collations = Vec::with_capacity(columns.len());
40        let mut has_non_binary_collation = false;
41
42        for (i, col) in columns.iter().enumerate() {
43            let lower = col.name.to_ascii_lowercase();
44            exact.insert(lower.clone(), i);
45
46            let unqualified = if let Some(dot) = lower.rfind('.') {
47                &lower[dot + 1..]
48            } else {
49                &lower
50            };
51            short
52                .entry(unqualified.to_string())
53                .and_modify(|e| *e = ShortMatch::Ambiguous)
54                .or_insert(ShortMatch::Unique(i));
55            collations.push(col.collation);
56            if col.collation != crate::types::Collation::Binary {
57                has_non_binary_collation = true;
58            }
59        }
60
61        Self {
62            exact,
63            short,
64            collations,
65            has_non_binary_collation,
66        }
67    }
68
69    pub(crate) fn collation_at(&self, idx: usize) -> crate::types::Collation {
70        self.collations
71            .get(idx)
72            .copied()
73            .unwrap_or(crate::types::Collation::Binary)
74    }
75
76    #[inline]
77    pub(crate) fn has_non_binary_collation(&self) -> bool {
78        self.has_non_binary_collation
79    }
80
81    pub(crate) fn resolve(&self, name: &str) -> Result<usize> {
82        if let Some(&idx) = self.exact.get(name) {
83            return Ok(idx);
84        }
85        match self.short.get(name) {
86            Some(ShortMatch::Unique(idx)) => Ok(*idx),
87            Some(ShortMatch::Ambiguous) => Err(SqlError::AmbiguousColumn(name.to_string())),
88            None => Err(SqlError::ColumnNotFound(name.to_string())),
89        }
90    }
91
92    pub(crate) fn resolve_qualified(&self, table: &str, column: &str) -> Result<usize> {
93        let qualified = format!("{table}.{column}");
94        if let Some(&idx) = self.exact.get(&qualified) {
95            return Ok(idx);
96        }
97        match self.short.get(column) {
98            Some(ShortMatch::Unique(idx)) => Ok(*idx),
99            _ => Err(SqlError::ColumnNotFound(format!("{table}.{column}"))),
100        }
101    }
102}
103
104pub struct EvalCtx<'a> {
105    pub col_map: &'a ColumnMap,
106    pub row: &'a [Value],
107    pub params: &'a [Value],
108    pub excluded: Option<ExcludedRow<'a>>,
109    pub old_new: Option<OldNewRows<'a>>,
110    pub session_tz: Option<jiff::tz::TimeZone>,
111}
112
113pub struct ExcludedRow<'a> {
114    pub col_map: &'a ColumnMap,
115    pub row: &'a [Value],
116}
117
118pub struct OldNewRows<'a> {
119    pub col_map: &'a ColumnMap,
120    pub old_row: Option<&'a [Value]>,
121    pub new_row: Option<&'a [Value]>,
122}
123
124impl<'a> EvalCtx<'a> {
125    pub fn new(col_map: &'a ColumnMap, row: &'a [Value]) -> Self {
126        Self {
127            col_map,
128            row,
129            params: &[],
130            excluded: None,
131            old_new: None,
132            session_tz: None,
133        }
134    }
135
136    pub fn with_session_tz(mut self, tz: Option<jiff::tz::TimeZone>) -> Self {
137        self.session_tz = tz;
138        self
139    }
140
141    pub fn with_params(col_map: &'a ColumnMap, row: &'a [Value], params: &'a [Value]) -> Self {
142        Self {
143            col_map,
144            row,
145            params,
146            excluded: None,
147            old_new: None,
148            session_tz: None,
149        }
150    }
151
152    pub fn with_excluded(
153        col_map: &'a ColumnMap,
154        row: &'a [Value],
155        excluded_col_map: &'a ColumnMap,
156        excluded_row: &'a [Value],
157    ) -> Self {
158        Self {
159            col_map,
160            row,
161            params: &[],
162            excluded: Some(ExcludedRow {
163                col_map: excluded_col_map,
164                row: excluded_row,
165            }),
166            old_new: None,
167            session_tz: None,
168        }
169    }
170
171    pub fn with_old_new(
172        col_map: &'a ColumnMap,
173        row: &'a [Value],
174        old_row: Option<&'a [Value]>,
175        new_row: Option<&'a [Value]>,
176    ) -> Self {
177        Self {
178            col_map,
179            row,
180            params: &[],
181            excluded: None,
182            old_new: Some(OldNewRows {
183                col_map,
184                old_row,
185                new_row,
186            }),
187            session_tz: None,
188        }
189    }
190}
191
192thread_local! {
193    static SCOPED_PARAMS: std::cell::Cell<(*const Value, usize)> =
194        const { std::cell::Cell::new((std::ptr::null(), 0)) };
195}
196
197/// Install positional parameters for `Expr::Parameter` resolution during `f`.
198pub fn with_scoped_params<R>(params: &[Value], f: impl FnOnce() -> R) -> R {
199    struct Guard((*const Value, usize));
200    impl Drop for Guard {
201        fn drop(&mut self) {
202            SCOPED_PARAMS.with(|slot| slot.set(self.0));
203        }
204    }
205    SCOPED_PARAMS.with(|slot| {
206        let prev = slot.get();
207        slot.set((params.as_ptr(), params.len()));
208        let _guard = Guard(prev);
209        f()
210    })
211}
212
213fn resolve_parameter(n: usize, ctx_params: &[Value]) -> Result<Value> {
214    if !ctx_params.is_empty() {
215        if n == 0 || n > ctx_params.len() {
216            return Err(SqlError::ParameterCountMismatch {
217                expected: n,
218                got: ctx_params.len(),
219            });
220        }
221        return Ok(ctx_params[n - 1].clone());
222    }
223    resolve_scoped_param(n)
224}
225
226pub fn resolve_scoped_param(n: usize) -> Result<Value> {
227    SCOPED_PARAMS.with(|slot| {
228        let (ptr, len) = slot.get();
229        if n == 0 || n > len {
230            return Err(SqlError::ParameterCountMismatch {
231                expected: n,
232                got: len,
233            });
234        }
235        // SAFETY: `with_scoped_params` keeps the slice alive for the duration of `f()`
236        // and restores the previous pointer on return. Reads only happen inside `f()`.
237        unsafe { Ok((*ptr.add(n - 1)).clone()) }
238    })
239}
240
241pub fn eval_expr(expr: &Expr, ctx: &EvalCtx) -> Result<Value> {
242    match expr {
243        Expr::Literal(v) => Ok(v.clone()),
244
245        Expr::Column(name) => {
246            let idx = ctx.col_map.resolve(name)?;
247            Ok(ctx.row[idx].clone())
248        }
249
250        Expr::QualifiedColumn { table, column } => {
251            if let Some(excluded) = ctx.excluded.as_ref() {
252                if table.eq_ignore_ascii_case("excluded") {
253                    let lowered = column.to_ascii_lowercase();
254                    let idx = excluded.col_map.resolve(&lowered)?;
255                    return Ok(excluded.row[idx].clone());
256                }
257            }
258            if let Some(on) = ctx.old_new.as_ref() {
259                if table.eq_ignore_ascii_case("old") {
260                    let lowered = column.to_ascii_lowercase();
261                    let idx = on.col_map.resolve(&lowered)?;
262                    return Ok(on.old_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
263                }
264                if table.eq_ignore_ascii_case("new") {
265                    let lowered = column.to_ascii_lowercase();
266                    let idx = on.col_map.resolve(&lowered)?;
267                    return Ok(on.new_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
268                }
269            }
270            let idx = ctx.col_map.resolve_qualified(table, column)?;
271            Ok(ctx.row[idx].clone())
272        }
273
274        Expr::BinaryOp { left, op, right } => {
275            let lval = eval_expr(left, ctx)?;
276            let rval = eval_expr(right, ctx)?;
277            let needs_collation_check = ctx.col_map.has_non_binary_collation()
278                || matches!(left.as_ref(), Expr::Collate { .. })
279                || matches!(right.as_ref(), Expr::Collate { .. });
280            if needs_collation_check {
281                let coll = collation_of(left)
282                    .or_else(|| collation_of(right))
283                    .or_else(|| {
284                        column_collation(left, ctx).or_else(|| column_collation(right, ctx))
285                    });
286                if let Some(c) = coll {
287                    if c != crate::types::Collation::Binary {
288                        if let Some(b) = eval_text_compare(&lval, *op, &rval, c) {
289                            return Ok(Value::Boolean(b));
290                        }
291                    }
292                }
293            }
294            eval_binary_op(&lval, *op, &rval)
295        }
296
297        Expr::UnaryOp { op, expr } => {
298            let val = eval_expr(expr, ctx)?;
299            eval_unary_op(*op, &val)
300        }
301
302        Expr::IsNull(e) => {
303            let val = eval_expr(e, ctx)?;
304            Ok(Value::Boolean(val.is_null()))
305        }
306
307        Expr::IsNotNull(e) => {
308            let val = eval_expr(e, ctx)?;
309            Ok(Value::Boolean(!val.is_null()))
310        }
311
312        Expr::Function { name, args, .. } => eval_scalar_function(name, args, ctx),
313
314        Expr::CountStar => Err(SqlError::Unsupported(
315            "COUNT(*) in non-aggregate context".into(),
316        )),
317
318        Expr::InList {
319            expr: e,
320            list,
321            negated,
322        } => {
323            let lhs = eval_expr(e, ctx)?;
324            eval_in_values(&lhs, list, ctx, *negated)
325        }
326
327        Expr::InSet {
328            expr: e,
329            values,
330            has_null,
331            negated,
332        } => {
333            let lhs = eval_expr(e, ctx)?;
334            eval_in_set(&lhs, values, *has_null, *negated)
335        }
336
337        Expr::Between {
338            expr: e,
339            low,
340            high,
341            negated,
342        } => {
343            let val = eval_expr(e, ctx)?;
344            let lo = eval_expr(low, ctx)?;
345            let hi = eval_expr(high, ctx)?;
346            eval_between(&val, &lo, &hi, *negated)
347        }
348
349        Expr::Like {
350            expr: e,
351            pattern,
352            escape,
353            negated,
354        } => {
355            let val = eval_expr(e, ctx)?;
356            let pat = eval_expr(pattern, ctx)?;
357            let esc = escape.as_ref().map(|e| eval_expr(e, ctx)).transpose()?;
358            eval_like(&val, &pat, esc.as_ref(), *negated)
359        }
360
361        Expr::Case {
362            operand,
363            conditions,
364            else_result,
365        } => eval_case(operand.as_deref(), conditions, else_result.as_deref(), ctx),
366
367        Expr::Coalesce(args) => {
368            for arg in args {
369                let val = eval_expr(arg, ctx)?;
370                if !val.is_null() {
371                    return Ok(val);
372                }
373            }
374            Ok(Value::Null)
375        }
376
377        Expr::Cast { expr: e, data_type } => {
378            let val = eval_expr(e, ctx)?;
379            eval_cast(&val, *data_type)
380        }
381
382        Expr::Collate { expr: e, .. } => eval_expr(e, ctx),
383
384        Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => Err(
385            SqlError::Unsupported("subquery not materialized (internal error)".into()),
386        ),
387
388        Expr::Parameter(n) => resolve_parameter(*n, ctx.params),
389
390        Expr::WindowFunction { .. } => Err(SqlError::Unsupported(
391            "window functions are only allowed in SELECT columns".into(),
392        )),
393
394        Expr::TypedNullRecord(_) => Ok(Value::Null),
395    }
396}
397
398/// Planner-level constant folding hook; shares semantics with row evaluation.
399fn collation_of(expr: &Expr) -> Option<crate::types::Collation> {
400    match expr {
401        Expr::Collate { collation, .. } => Some(*collation),
402        _ => None,
403    }
404}
405
406fn column_collation(expr: &Expr, ctx: &EvalCtx<'_>) -> Option<crate::types::Collation> {
407    match expr {
408        Expr::Column(name) => ctx
409            .col_map
410            .resolve(name)
411            .ok()
412            .map(|i| ctx.col_map.collation_at(i)),
413        Expr::QualifiedColumn { table, column } => ctx
414            .col_map
415            .resolve_qualified(table, column)
416            .ok()
417            .map(|i| ctx.col_map.collation_at(i)),
418        _ => None,
419    }
420}
421
422fn eval_text_compare(
423    left: &Value,
424    op: BinOp,
425    right: &Value,
426    coll: crate::types::Collation,
427) -> Option<bool> {
428    let (a, b) = match (left, right) {
429        (Value::Null, _) | (_, Value::Null) => return None,
430        (Value::Text(a), Value::Text(b)) => (a.as_str(), b.as_str()),
431        _ => return None,
432    };
433    let ord = coll.cmp_text(a, b);
434    Some(match op {
435        BinOp::Eq => ord == std::cmp::Ordering::Equal,
436        BinOp::NotEq => ord != std::cmp::Ordering::Equal,
437        BinOp::Lt => ord == std::cmp::Ordering::Less,
438        BinOp::Gt => ord == std::cmp::Ordering::Greater,
439        BinOp::LtEq => ord != std::cmp::Ordering::Greater,
440        BinOp::GtEq => ord != std::cmp::Ordering::Less,
441        _ => return None,
442    })
443}
444
445pub fn eval_binary_op_public(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
446    eval_binary_op(left, op, right)
447}
448
449fn eval_binary_op(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
450    match op {
451        BinOp::And => return eval_and(left, right),
452        BinOp::Or => return eval_or(left, right),
453        _ => {}
454    }
455
456    if left.is_null() || right.is_null() {
457        return Ok(Value::Null);
458    }
459
460    if let Some(res) = eval_temporal_op(left, op, right) {
461        return res;
462    }
463
464    match op {
465        BinOp::Eq => Ok(Value::Boolean(left == right)),
466        BinOp::NotEq => Ok(Value::Boolean(left != right)),
467        BinOp::Lt => Ok(Value::Boolean(left < right)),
468        BinOp::Gt => Ok(Value::Boolean(left > right)),
469        BinOp::LtEq => Ok(Value::Boolean(left <= right)),
470        BinOp::GtEq => Ok(Value::Boolean(left >= right)),
471        BinOp::Add => eval_arithmetic(left, right, i64::checked_add, |a, b| a + b),
472        BinOp::Sub => match left {
473            Value::Json(_) | Value::Jsonb(_) => crate::json::op_delete_one(left, right),
474            _ => eval_arithmetic(left, right, i64::checked_sub, |a, b| a - b),
475        },
476        BinOp::Mul => eval_arithmetic(left, right, i64::checked_mul, |a, b| a * b),
477        BinOp::Div => {
478            match right {
479                Value::Integer(0) => return Err(SqlError::DivisionByZero),
480                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
481                _ => {}
482            }
483            eval_arithmetic(left, right, i64::checked_div, |a, b| a / b)
484        }
485        BinOp::Mod => {
486            match right {
487                Value::Integer(0) => return Err(SqlError::DivisionByZero),
488                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
489                _ => {}
490            }
491            eval_arithmetic(left, right, i64::checked_rem, |a, b| a % b)
492        }
493        BinOp::Concat => match (left, right) {
494            (Value::Json(_) | Value::Jsonb(_), _) | (_, Value::Json(_) | Value::Jsonb(_)) => {
495                crate::json::op_concat(left, right)
496            }
497            _ => {
498                let ls = value_to_text(left);
499                let rs = value_to_text(right);
500                Ok(Value::Text(format!("{ls}{rs}").into()))
501            }
502        },
503        BinOp::JsonGet
504        | BinOp::JsonGetText
505        | BinOp::JsonPath
506        | BinOp::JsonPathText
507        | BinOp::JsonContains
508        | BinOp::JsonContainedBy
509        | BinOp::JsonHasKey
510        | BinOp::JsonHasAnyKey
511        | BinOp::JsonHasAllKeys
512        | BinOp::JsonDeletePath
513        | BinOp::JsonPathExists
514        | BinOp::JsonPathMatch => eval_json_binary_op(left, op, right),
515        BinOp::And | BinOp::Or => unreachable!(),
516    }
517}
518
519#[cold]
520fn eval_json_binary_op(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
521    match op {
522        BinOp::JsonGet => crate::json::op_get(left, right),
523        BinOp::JsonGetText => crate::json::op_get_text(left, right),
524        BinOp::JsonPath => crate::json::op_path(left, right),
525        BinOp::JsonPathText => crate::json::op_path_text(left, right),
526        BinOp::JsonContains => crate::json::op_contains(left, right),
527        BinOp::JsonContainedBy => crate::json::op_contained_by(left, right),
528        BinOp::JsonHasKey => crate::json::op_has_key(left, right),
529        BinOp::JsonHasAnyKey => crate::json::op_has_any_key(left, right),
530        BinOp::JsonHasAllKeys => crate::json::op_has_all_keys(left, right),
531        BinOp::JsonDeletePath => crate::json::op_delete_path(left, right),
532        BinOp::JsonPathExists => crate::json::op_path_exists(left, right),
533        BinOp::JsonPathMatch => eval_at_at(left, right),
534        _ => unreachable!(),
535    }
536}
537
538fn eval_at_at(left: &Value, right: &Value) -> Result<Value> {
539    use crate::types::DataType as D;
540    if left.is_null() || right.is_null() {
541        return Ok(Value::Null);
542    }
543    match (left.data_type(), right.data_type()) {
544        (D::Json | D::Jsonb, D::Text) => crate::json::op_path_match(left, right),
545        (D::TsVector, D::TsQuery) => match (left, right) {
546            (Value::TsVector(v), Value::TsQuery(q)) => crate::fts::op_match(v, q),
547            _ => unreachable!(),
548        },
549        (D::TsQuery, D::TsVector) => match (left, right) {
550            (Value::TsQuery(q), Value::TsVector(v)) => crate::fts::op_match(v, q),
551            _ => unreachable!(),
552        },
553        (D::Text, D::TsQuery) => {
554            let s = match left {
555                Value::Text(s) => s.as_str(),
556                _ => unreachable!(),
557            };
558            let lhs = crate::fts::fn_to_tsvector(s)?;
559            eval_at_at(&lhs, right)
560        }
561        (D::TsVector, D::Text) => {
562            let s = match right {
563                Value::Text(s) => s.as_str(),
564                _ => unreachable!(),
565            };
566            let rhs = crate::fts::fn_plainto_tsquery(s)?;
567            eval_at_at(left, &rhs)
568        }
569        (D::Text, D::Text) => {
570            let ls = match left {
571                Value::Text(s) => s.as_str(),
572                _ => unreachable!(),
573            };
574            let rs = match right {
575                Value::Text(s) => s.as_str(),
576                _ => unreachable!(),
577            };
578            let lhs = crate::fts::fn_to_tsvector(ls)?;
579            let rhs = crate::fts::fn_plainto_tsquery(rs)?;
580            eval_at_at(&lhs, &rhs)
581        }
582        (lt, rt) => Err(SqlError::TypeMismatch {
583            expected: "JSONB @@ text, tsvector @@ tsquery".into(),
584            got: format!("{lt} @@ {rt}"),
585        }),
586    }
587}
588
589/// Returns `Some` when `(left, op, right)` is a temporal operation; `None` to fall through.
590fn eval_temporal_op(left: &Value, op: BinOp, right: &Value) -> Option<Result<Value>> {
591    use crate::datetime as dt;
592    use std::cmp::Ordering;
593
594    let is_temporal = |v: &Value| {
595        matches!(
596            v,
597            Value::Date(_) | Value::Time(_) | Value::Timestamp(_) | Value::Interval { .. }
598        )
599    };
600    if !is_temporal(left) && !is_temporal(right) {
601        return None;
602    }
603    if matches!(op, BinOp::Add | BinOp::Sub)
604        && ((is_temporal(left) && matches!(right, Value::Real(_)))
605            || (matches!(left, Value::Real(_)) && is_temporal(right)))
606    {
607        return Some(Err(SqlError::TypeMismatch {
608            expected: "INTEGER or INTERVAL for date/time arithmetic (use CAST for REAL)".into(),
609            got: format!("{} and {}", left.data_type(), right.data_type()),
610        }));
611    }
612
613    match (left, op, right) {
614        (Value::Date(d), BinOp::Add, Value::Integer(n))
615        | (Value::Integer(n), BinOp::Add, Value::Date(d)) => {
616            Some(dt::add_days_to_date(*d, *n).map(Value::Date))
617        }
618        (Value::Date(d), BinOp::Sub, Value::Integer(n)) => {
619            Some(dt::add_days_to_date(*d, -*n).map(Value::Date))
620        }
621        (Value::Date(a), BinOp::Sub, Value::Date(b)) => {
622            Some(Ok(Value::Integer(*a as i64 - *b as i64)))
623        }
624        // DATE ± INTERVAL → TIMESTAMP (PG rule).
625        (
626            Value::Date(d),
627            BinOp::Add,
628            Value::Interval {
629                months,
630                days,
631                micros,
632            },
633        )
634        | (
635            Value::Interval {
636                months,
637                days,
638                micros,
639            },
640            BinOp::Add,
641            Value::Date(d),
642        ) => Some(dt::add_interval_to_date(*d, *months, *days, *micros).map(Value::Timestamp)),
643        (
644            Value::Date(d),
645            BinOp::Sub,
646            Value::Interval {
647                months,
648                days,
649                micros,
650            },
651        ) => Some(dt::add_interval_to_date(*d, -*months, -*days, -*micros).map(Value::Timestamp)),
652        (
653            Value::Timestamp(t),
654            BinOp::Add,
655            Value::Interval {
656                months,
657                days,
658                micros,
659            },
660        )
661        | (
662            Value::Interval {
663                months,
664                days,
665                micros,
666            },
667            BinOp::Add,
668            Value::Timestamp(t),
669        ) => Some(dt::add_interval_to_timestamp(*t, *months, *days, *micros).map(Value::Timestamp)),
670        (
671            Value::Timestamp(t),
672            BinOp::Sub,
673            Value::Interval {
674                months,
675                days,
676                micros,
677            },
678        ) => Some(
679            dt::add_interval_to_timestamp(*t, -*months, -*days, -*micros).map(Value::Timestamp),
680        ),
681        (Value::Timestamp(a), BinOp::Sub, Value::Timestamp(b)) => {
682            let (days, micros) = dt::subtract_timestamps(*a, *b);
683            Some(Ok(Value::Interval {
684                months: 0,
685                days,
686                micros,
687            }))
688        }
689        (
690            Value::Time(t),
691            BinOp::Add,
692            Value::Interval {
693                months,
694                days,
695                micros,
696            },
697        ) => Some(dt::add_interval_to_time(*t, *months, *days, *micros).map(Value::Time)),
698        (
699            Value::Time(t),
700            BinOp::Sub,
701            Value::Interval {
702                months,
703                days,
704                micros,
705            },
706        ) => Some(dt::add_interval_to_time(*t, -*months, -*days, -*micros).map(Value::Time)),
707        (Value::Time(a), BinOp::Sub, Value::Time(b)) => Some(Ok(Value::Interval {
708            months: 0,
709            days: 0,
710            micros: *a - *b,
711        })),
712        (
713            Value::Interval {
714                months: am,
715                days: ad,
716                micros: au,
717            },
718            BinOp::Add,
719            Value::Interval {
720                months: bm,
721                days: bd,
722                micros: bu,
723            },
724        ) => Some(Ok(Value::Interval {
725            months: am.saturating_add(*bm),
726            days: ad.saturating_add(*bd),
727            micros: au.saturating_add(*bu),
728        })),
729        (
730            Value::Interval {
731                months: am,
732                days: ad,
733                micros: au,
734            },
735            BinOp::Sub,
736            Value::Interval {
737                months: bm,
738                days: bd,
739                micros: bu,
740            },
741        ) => Some(Ok(Value::Interval {
742            months: am.saturating_sub(*bm),
743            days: ad.saturating_sub(*bd),
744            micros: au.saturating_sub(*bu),
745        })),
746        (
747            Value::Interval {
748                months,
749                days,
750                micros,
751            },
752            BinOp::Mul,
753            Value::Integer(n),
754        )
755        | (
756            Value::Integer(n),
757            BinOp::Mul,
758            Value::Interval {
759                months,
760                days,
761                micros,
762            },
763        ) => {
764            let n32 = (*n).clamp(i32::MIN as i64, i32::MAX as i64) as i32;
765            Some(Ok(Value::Interval {
766                months: months.saturating_mul(n32),
767                days: days.saturating_mul(n32),
768                micros: micros.saturating_mul(*n),
769            }))
770        }
771        // INTERVAL * REAL — fractional months → days, fractional days → micros (PG).
772        (
773            Value::Interval {
774                months,
775                days,
776                micros,
777            },
778            BinOp::Mul,
779            Value::Real(r),
780        )
781        | (
782            Value::Real(r),
783            BinOp::Mul,
784            Value::Interval {
785                months,
786                days,
787                micros,
788            },
789        ) => Some(Ok(scale_interval_by_real(*months, *days, *micros, *r))),
790        (
791            Value::Interval {
792                months,
793                days,
794                micros,
795            },
796            BinOp::Div,
797            Value::Integer(n),
798        ) if *n != 0 => Some(Ok(Value::Interval {
799            months: (*months as i64 / *n) as i32,
800            days: (*days as i64 / *n) as i32,
801            micros: *micros / *n,
802        })),
803        (
804            Value::Interval {
805                months,
806                days,
807                micros,
808            },
809            BinOp::Div,
810            Value::Real(r),
811        ) if *r != 0.0 => Some(Ok(scale_interval_by_real(*months, *days, *micros, 1.0 / r))),
812        // PG-normalized INTERVAL compare: 30-day month, 24-hour day.
813        (
814            Value::Interval {
815                months: am,
816                days: ad,
817                micros: au,
818            },
819            op,
820            Value::Interval {
821                months: bm,
822                days: bd,
823                micros: bu,
824            },
825        ) if matches!(
826            op,
827            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::Gt | BinOp::LtEq | BinOp::GtEq
828        ) =>
829        {
830            let ord = dt::pg_normalized_interval_cmp((*am, *ad, *au), (*bm, *bd, *bu));
831            let b = match op {
832                BinOp::Eq => ord == Ordering::Equal,
833                BinOp::NotEq => ord != Ordering::Equal,
834                BinOp::Lt => ord == Ordering::Less,
835                BinOp::Gt => ord == Ordering::Greater,
836                BinOp::LtEq => ord != Ordering::Greater,
837                BinOp::GtEq => ord != Ordering::Less,
838                _ => unreachable!(),
839            };
840            Some(Ok(Value::Boolean(b)))
841        }
842        // PG rejects TIMESTAMP ± INTEGER; require CAST to INTERVAL.
843        (Value::Timestamp(_), BinOp::Add | BinOp::Sub, Value::Integer(_))
844        | (Value::Integer(_), BinOp::Add, Value::Timestamp(_)) => {
845            Some(Err(SqlError::TypeMismatch {
846                expected: "INTERVAL (use CAST or explicit unit)".into(),
847                got: format!("{} and {}", left.data_type(), right.data_type()),
848            }))
849        }
850        _ => None,
851    }
852}
853
854/// PG fractional-propagation: month frac → days (×30), day frac → micros (×86.4G).
855fn scale_interval_by_real(months: i32, days: i32, micros: i64, factor: f64) -> Value {
856    let raw_months = months as f64 * factor;
857    let whole_months = raw_months.trunc() as i64;
858    let frac_months = raw_months - whole_months as f64;
859    let months_frac_as_days = frac_months * 30.0;
860
861    let raw_days = days as f64 * factor + months_frac_as_days;
862    let whole_days = raw_days.trunc() as i64;
863    let frac_days = raw_days - whole_days as f64;
864    let days_frac_as_micros = (frac_days * crate::datetime::MICROS_PER_DAY as f64).round() as i64;
865
866    let raw_micros = (micros as f64 * factor).round() as i64;
867    let total_micros = raw_micros.saturating_add(days_frac_as_micros);
868
869    let clamp_i32 = |n: i64| n.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
870    Value::Interval {
871        months: clamp_i32(whole_months),
872        days: clamp_i32(whole_days),
873        micros: total_micros,
874    }
875}
876
877/// SQL three-valued AND: NULL AND false = false, NULL AND true = NULL
878fn eval_and(left: &Value, right: &Value) -> Result<Value> {
879    let l = to_bool_or_null(left)?;
880    let r = to_bool_or_null(right)?;
881    match (l, r) {
882        (Some(false), _) | (_, Some(false)) => Ok(Value::Boolean(false)),
883        (Some(true), Some(true)) => Ok(Value::Boolean(true)),
884        _ => Ok(Value::Null),
885    }
886}
887
888/// SQL three-valued OR: NULL OR true = true, NULL OR false = NULL
889fn eval_or(left: &Value, right: &Value) -> Result<Value> {
890    let l = to_bool_or_null(left)?;
891    let r = to_bool_or_null(right)?;
892    match (l, r) {
893        (Some(true), _) | (_, Some(true)) => Ok(Value::Boolean(true)),
894        (Some(false), Some(false)) => Ok(Value::Boolean(false)),
895        _ => Ok(Value::Null),
896    }
897}
898
899fn to_bool_or_null(val: &Value) -> Result<Option<bool>> {
900    match val {
901        Value::Boolean(b) => Ok(Some(*b)),
902        Value::Null => Ok(None),
903        Value::Integer(i) => Ok(Some(*i != 0)),
904        _ => Err(SqlError::TypeMismatch {
905            expected: "BOOLEAN".into(),
906            got: format!("{}", val.data_type()),
907        }),
908    }
909}
910
911fn eval_arithmetic(
912    left: &Value,
913    right: &Value,
914    int_op: fn(i64, i64) -> Option<i64>,
915    real_op: fn(f64, f64) -> f64,
916) -> Result<Value> {
917    match (left, right) {
918        (Value::Integer(a), Value::Integer(b)) => int_op(*a, *b)
919            .map(Value::Integer)
920            .ok_or(SqlError::IntegerOverflow),
921        (Value::Real(a), Value::Real(b)) => Ok(Value::Real(real_op(*a, *b))),
922        (Value::Integer(a), Value::Real(b)) => Ok(Value::Real(real_op(*a as f64, *b))),
923        (Value::Real(a), Value::Integer(b)) => Ok(Value::Real(real_op(*a, *b as f64))),
924        _ => Err(SqlError::TypeMismatch {
925            expected: "numeric".into(),
926            got: format!("{} and {}", left.data_type(), right.data_type()),
927        }),
928    }
929}
930
931fn eval_in_values(lhs: &Value, list: &[Expr], ctx: &EvalCtx, negated: bool) -> Result<Value> {
932    if list.is_empty() {
933        return Ok(Value::Boolean(negated));
934    }
935    if lhs.is_null() {
936        return Ok(Value::Null);
937    }
938    let mut has_null = false;
939    for item in list {
940        let rhs = eval_expr(item, ctx)?;
941        if rhs.is_null() {
942            has_null = true;
943        } else if lhs == &rhs {
944            return Ok(Value::Boolean(!negated));
945        }
946    }
947    if has_null {
948        Ok(Value::Null)
949    } else {
950        Ok(Value::Boolean(negated))
951    }
952}
953
954fn eval_in_set(
955    lhs: &Value,
956    values: &rustc_hash::FxHashSet<Value>,
957    has_null: bool,
958    negated: bool,
959) -> Result<Value> {
960    if values.is_empty() && !has_null {
961        return Ok(Value::Boolean(negated));
962    }
963    if lhs.is_null() {
964        return Ok(Value::Null);
965    }
966    if values.contains(lhs) {
967        return Ok(Value::Boolean(!negated));
968    }
969    if has_null {
970        Ok(Value::Null)
971    } else {
972        Ok(Value::Boolean(negated))
973    }
974}
975
976fn eval_unary_op(op: UnaryOp, val: &Value) -> Result<Value> {
977    if val.is_null() {
978        return Ok(Value::Null);
979    }
980    match op {
981        UnaryOp::Neg => match val {
982            Value::Integer(i) => i
983                .checked_neg()
984                .map(Value::Integer)
985                .ok_or(SqlError::IntegerOverflow),
986            Value::Real(r) => Ok(Value::Real(-r)),
987            Value::Interval {
988                months,
989                days,
990                micros,
991            } => {
992                let m = months.checked_neg().ok_or(SqlError::IntegerOverflow)?;
993                let d = days.checked_neg().ok_or(SqlError::IntegerOverflow)?;
994                let u = micros.checked_neg().ok_or(SqlError::IntegerOverflow)?;
995                Ok(Value::Interval {
996                    months: m,
997                    days: d,
998                    micros: u,
999                })
1000            }
1001            _ => Err(SqlError::TypeMismatch {
1002                expected: "numeric or INTERVAL".into(),
1003                got: format!("{}", val.data_type()),
1004            }),
1005        },
1006        UnaryOp::Not => match val {
1007            Value::Boolean(b) => Ok(Value::Boolean(!b)),
1008            Value::Integer(i) => Ok(Value::Boolean(*i == 0)),
1009            _ => Err(SqlError::TypeMismatch {
1010                expected: "BOOLEAN".into(),
1011                got: format!("{}", val.data_type()),
1012            }),
1013        },
1014    }
1015}
1016
1017fn value_to_text(val: &Value) -> String {
1018    match val {
1019        Value::Text(s) => s.to_string(),
1020        Value::Integer(i) => i.to_string(),
1021        Value::Real(r) => {
1022            if r.fract() == 0.0 && r.is_finite() {
1023                format!("{r:.1}")
1024            } else {
1025                format!("{r}")
1026            }
1027        }
1028        Value::Boolean(b) => if *b { "TRUE" } else { "FALSE" }.into(),
1029        Value::Null => String::new(),
1030        Value::Blob(b) => {
1031            let mut s = String::with_capacity(b.len() * 2);
1032            for byte in b {
1033                s.push_str(&format!("{byte:02X}"));
1034            }
1035            s
1036        }
1037        Value::Date(d) => crate::datetime::format_date(*d),
1038        Value::Time(t) => crate::datetime::format_time(*t),
1039        Value::Timestamp(t) => crate::datetime::format_timestamp(*t),
1040        Value::Interval {
1041            months,
1042            days,
1043            micros,
1044        } => crate::datetime::format_interval(*months, *days, *micros),
1045        Value::Json(s) => s.to_string(),
1046        Value::Jsonb(b) => crate::json::decode_to_text(b).unwrap_or_default(),
1047        Value::TsVector(b) => crate::fts::tsvector_display(b),
1048        Value::TsQuery(b) => crate::fts::tsquery_display(b),
1049    }
1050}
1051
1052fn eval_between(val: &Value, low: &Value, high: &Value, negated: bool) -> Result<Value> {
1053    if val.is_null() || low.is_null() || high.is_null() {
1054        let ge = if val.is_null() || low.is_null() {
1055            None
1056        } else {
1057            Some(*val >= *low)
1058        };
1059        let le = if val.is_null() || high.is_null() {
1060            None
1061        } else {
1062            Some(*val <= *high)
1063        };
1064
1065        let result = match (ge, le) {
1066            (Some(false), _) | (_, Some(false)) => Some(false),
1067            (Some(true), Some(true)) => Some(true),
1068            _ => None,
1069        };
1070
1071        return match result {
1072            Some(b) => Ok(Value::Boolean(if negated { !b } else { b })),
1073            None => Ok(Value::Null),
1074        };
1075    }
1076
1077    let in_range = *val >= *low && *val <= *high;
1078    Ok(Value::Boolean(if negated { !in_range } else { in_range }))
1079}
1080
1081const MAX_LIKE_PATTERN_LEN: usize = 10_000;
1082
1083fn eval_like(val: &Value, pattern: &Value, escape: Option<&Value>, negated: bool) -> Result<Value> {
1084    if val.is_null() || pattern.is_null() {
1085        return Ok(Value::Null);
1086    }
1087    let text = match val {
1088        Value::Text(s) => s.as_str(),
1089        _ => {
1090            return Err(SqlError::TypeMismatch {
1091                expected: "TEXT".into(),
1092                got: val.data_type().to_string(),
1093            })
1094        }
1095    };
1096    let pat = match pattern {
1097        Value::Text(s) => s.as_str(),
1098        _ => {
1099            return Err(SqlError::TypeMismatch {
1100                expected: "TEXT".into(),
1101                got: pattern.data_type().to_string(),
1102            })
1103        }
1104    };
1105
1106    if pat.len() > MAX_LIKE_PATTERN_LEN {
1107        return Err(SqlError::InvalidValue(format!(
1108            "LIKE pattern too long ({} chars, max {MAX_LIKE_PATTERN_LEN})",
1109            pat.len()
1110        )));
1111    }
1112
1113    let esc_char = match escape {
1114        Some(Value::Text(s)) => {
1115            let mut chars = s.chars();
1116            let c = chars.next().ok_or_else(|| {
1117                SqlError::InvalidValue("ESCAPE must be a single character".into())
1118            })?;
1119            if chars.next().is_some() {
1120                return Err(SqlError::InvalidValue(
1121                    "ESCAPE must be a single character".into(),
1122                ));
1123            }
1124            Some(c)
1125        }
1126        Some(Value::Null) => return Ok(Value::Null),
1127        Some(_) => {
1128            return Err(SqlError::TypeMismatch {
1129                expected: "TEXT".into(),
1130                got: "non-text".into(),
1131            })
1132        }
1133        None => None,
1134    };
1135
1136    let matched = like_match(text, pat, esc_char);
1137    Ok(Value::Boolean(if negated { !matched } else { matched }))
1138}
1139
1140fn like_match(text: &str, pattern: &str, escape: Option<char>) -> bool {
1141    let t: Vec<char> = text.chars().collect();
1142    let p: Vec<char> = pattern.chars().collect();
1143    like_match_impl(&t, &p, 0, 0, escape)
1144}
1145
1146fn like_match_impl(
1147    t: &[char],
1148    p: &[char],
1149    mut ti: usize,
1150    mut pi: usize,
1151    esc: Option<char>,
1152) -> bool {
1153    let mut star_pi: Option<usize> = None;
1154    let mut star_ti: usize = 0;
1155
1156    while ti < t.len() {
1157        if pi < p.len() {
1158            if let Some(ec) = esc {
1159                if p[pi] == ec && pi + 1 < p.len() {
1160                    pi += 1;
1161                    let pc_lower = p[pi].to_ascii_lowercase();
1162                    let tc_lower = t[ti].to_ascii_lowercase();
1163                    if pc_lower == tc_lower {
1164                        pi += 1;
1165                        ti += 1;
1166                        continue;
1167                    } else if let Some(sp) = star_pi {
1168                        pi = sp + 1;
1169                        star_ti += 1;
1170                        ti = star_ti;
1171                        continue;
1172                    } else {
1173                        return false;
1174                    }
1175                }
1176            }
1177            if p[pi] == '%' {
1178                star_pi = Some(pi);
1179                star_ti = ti;
1180                pi += 1;
1181                continue;
1182            }
1183            if p[pi] == '_' {
1184                pi += 1;
1185                ti += 1;
1186                continue;
1187            }
1188            if p[pi].eq_ignore_ascii_case(&t[ti]) {
1189                pi += 1;
1190                ti += 1;
1191                continue;
1192            }
1193        }
1194        if let Some(sp) = star_pi {
1195            pi = sp + 1;
1196            star_ti += 1;
1197            ti = star_ti;
1198        } else {
1199            return false;
1200        }
1201    }
1202
1203    while pi < p.len() && p[pi] == '%' {
1204        pi += 1;
1205    }
1206    pi == p.len()
1207}
1208
1209fn eval_case(
1210    operand: Option<&Expr>,
1211    conditions: &[(Expr, Expr)],
1212    else_result: Option<&Expr>,
1213    ctx: &EvalCtx,
1214) -> Result<Value> {
1215    if let Some(op_expr) = operand {
1216        let op_val = eval_expr(op_expr, ctx)?;
1217        for (cond, result) in conditions {
1218            let cond_val = eval_expr(cond, ctx)?;
1219            if !op_val.is_null() && !cond_val.is_null() && op_val == cond_val {
1220                return eval_expr(result, ctx);
1221            }
1222        }
1223    } else {
1224        for (cond, result) in conditions {
1225            let cond_val = eval_expr(cond, ctx)?;
1226            if is_truthy(&cond_val) {
1227                return eval_expr(result, ctx);
1228            }
1229        }
1230    }
1231    match else_result {
1232        Some(e) => eval_expr(e, ctx),
1233        None => Ok(Value::Null),
1234    }
1235}
1236
1237pub(crate) fn eval_cast(val: &Value, target: DataType) -> Result<Value> {
1238    if val.is_null() {
1239        return Ok(Value::Null);
1240    }
1241    match target {
1242        DataType::Integer => match val {
1243            Value::Integer(_) => Ok(val.clone()),
1244            Value::Real(r) => Ok(Value::Integer(*r as i64)),
1245            Value::Boolean(b) => Ok(Value::Integer(if *b { 1 } else { 0 })),
1246            Value::Text(s) => s
1247                .trim()
1248                .parse::<i64>()
1249                .map(Value::Integer)
1250                .or_else(|_| s.trim().parse::<f64>().map(|f| Value::Integer(f as i64)))
1251                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to INTEGER"))),
1252            _ => Err(SqlError::InvalidValue(format!(
1253                "cannot cast {} to INTEGER",
1254                val.data_type()
1255            ))),
1256        },
1257        DataType::Real => match val {
1258            Value::Real(_) => Ok(val.clone()),
1259            Value::Integer(i) => Ok(Value::Real(*i as f64)),
1260            Value::Boolean(b) => Ok(Value::Real(if *b { 1.0 } else { 0.0 })),
1261            Value::Text(s) => s
1262                .trim()
1263                .parse::<f64>()
1264                .map(Value::Real)
1265                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to REAL"))),
1266            _ => Err(SqlError::InvalidValue(format!(
1267                "cannot cast {} to REAL",
1268                val.data_type()
1269            ))),
1270        },
1271        DataType::Text => Ok(Value::Text(value_to_text(val).into())),
1272        DataType::Boolean => match val {
1273            Value::Boolean(_) => Ok(val.clone()),
1274            Value::Integer(i) => Ok(Value::Boolean(*i != 0)),
1275            Value::Text(s) => {
1276                let lower = s.trim().to_ascii_lowercase();
1277                match lower.as_str() {
1278                    "true" | "1" | "yes" | "on" => Ok(Value::Boolean(true)),
1279                    "false" | "0" | "no" | "off" => Ok(Value::Boolean(false)),
1280                    _ => Err(SqlError::InvalidValue(format!(
1281                        "cannot cast '{s}' to BOOLEAN"
1282                    ))),
1283                }
1284            }
1285            _ => Err(SqlError::InvalidValue(format!(
1286                "cannot cast {} to BOOLEAN",
1287                val.data_type()
1288            ))),
1289        },
1290        DataType::Blob => match val {
1291            Value::Blob(_) => Ok(val.clone()),
1292            Value::Text(s) => Ok(Value::Blob(s.as_bytes().to_vec())),
1293            _ => Err(SqlError::InvalidValue(format!(
1294                "cannot cast {} to BLOB",
1295                val.data_type()
1296            ))),
1297        },
1298        DataType::Null => Ok(Value::Null),
1299        DataType::Date => val.clone().coerce_into(DataType::Date).ok_or_else(|| {
1300            SqlError::InvalidValue(format!("cannot cast {} to DATE", val.data_type()))
1301        }),
1302        DataType::Time => val.clone().coerce_into(DataType::Time).ok_or_else(|| {
1303            SqlError::InvalidValue(format!("cannot cast {} to TIME", val.data_type()))
1304        }),
1305        DataType::Timestamp => val.clone().coerce_into(DataType::Timestamp).ok_or_else(|| {
1306            SqlError::InvalidValue(format!("cannot cast {} to TIMESTAMP", val.data_type()))
1307        }),
1308        DataType::Interval => val.clone().coerce_into(DataType::Interval).ok_or_else(|| {
1309            SqlError::InvalidValue(format!("cannot cast {} to INTERVAL", val.data_type()))
1310        }),
1311        DataType::Json => val.clone().coerce_into(DataType::Json).ok_or_else(|| {
1312            SqlError::InvalidValue(format!("cannot cast {} to JSON", val.data_type()))
1313        }),
1314        DataType::Jsonb => val.clone().coerce_into(DataType::Jsonb).ok_or_else(|| {
1315            SqlError::InvalidValue(format!("cannot cast {} to JSONB", val.data_type()))
1316        }),
1317        DataType::TsVector => val.clone().coerce_into(DataType::TsVector).ok_or_else(|| {
1318            SqlError::InvalidValue(format!("cannot cast {} to TSVECTOR", val.data_type()))
1319        }),
1320        DataType::TsQuery => val.clone().coerce_into(DataType::TsQuery).ok_or_else(|| {
1321            SqlError::InvalidValue(format!("cannot cast {} to TSQUERY", val.data_type()))
1322        }),
1323    }
1324}
1325
1326fn eval_scalar_function(name: &str, args: &[Expr], ctx: &EvalCtx) -> Result<Value> {
1327    let evaluated: Vec<Value> = args
1328        .iter()
1329        .map(|a| eval_expr(a, ctx))
1330        .collect::<Result<Vec<_>>>()?;
1331
1332    match name {
1333        "LENGTH" => {
1334            check_args(name, &evaluated, 1)?;
1335            match &evaluated[0] {
1336                Value::Null => Ok(Value::Null),
1337                Value::Text(s) => Ok(Value::Integer(s.chars().count() as i64)),
1338                Value::Blob(b) => Ok(Value::Integer(b.len() as i64)),
1339                Value::TsVector(b) => crate::fts::fn_length_tsvector(b),
1340                _ => Ok(Value::Integer(
1341                    value_to_text(&evaluated[0]).chars().count() as i64
1342                )),
1343            }
1344        }
1345        "UPPER" => {
1346            check_args(name, &evaluated, 1)?;
1347            match &evaluated[0] {
1348                Value::Null => Ok(Value::Null),
1349                Value::Text(s) => Ok(Value::Text(s.to_ascii_uppercase())),
1350                _ => Ok(Value::Text(
1351                    value_to_text(&evaluated[0]).to_ascii_uppercase().into(),
1352                )),
1353            }
1354        }
1355        "LOWER" => {
1356            check_args(name, &evaluated, 1)?;
1357            match &evaluated[0] {
1358                Value::Null => Ok(Value::Null),
1359                Value::Text(s) => Ok(Value::Text(s.to_ascii_lowercase())),
1360                _ => Ok(Value::Text(
1361                    value_to_text(&evaluated[0]).to_ascii_lowercase().into(),
1362                )),
1363            }
1364        }
1365        "SUBSTR" | "SUBSTRING" => {
1366            if evaluated.len() < 2 || evaluated.len() > 3 {
1367                return Err(SqlError::InvalidValue(format!(
1368                    "{name} requires 2 or 3 arguments"
1369                )));
1370            }
1371            if evaluated.iter().any(|v| v.is_null()) {
1372                return Ok(Value::Null);
1373            }
1374            let s = value_to_text(&evaluated[0]);
1375            let chars: Vec<char> = s.chars().collect();
1376            let start = match &evaluated[1] {
1377                Value::Integer(i) => *i,
1378                _ => {
1379                    return Err(SqlError::TypeMismatch {
1380                        expected: "INTEGER".into(),
1381                        got: evaluated[1].data_type().to_string(),
1382                    })
1383                }
1384            };
1385            let len = chars.len() as i64;
1386
1387            let (begin, count) = if evaluated.len() == 3 {
1388                let cnt = match &evaluated[2] {
1389                    Value::Integer(i) => *i,
1390                    _ => {
1391                        return Err(SqlError::TypeMismatch {
1392                            expected: "INTEGER".into(),
1393                            got: evaluated[2].data_type().to_string(),
1394                        })
1395                    }
1396                };
1397                if start >= 1 {
1398                    let b = (start - 1).min(len) as usize;
1399                    let c = cnt.max(0) as usize;
1400                    (b, c)
1401                } else if start == 0 {
1402                    let c = (cnt - 1).max(0) as usize;
1403                    (0usize, c)
1404                } else {
1405                    let adjusted_cnt = (cnt + start - 1).max(0) as usize;
1406                    (0usize, adjusted_cnt)
1407                }
1408            } else if start >= 1 {
1409                let b = (start - 1).min(len) as usize;
1410                (b, chars.len() - b)
1411            } else if start == 0 {
1412                (0usize, chars.len())
1413            } else {
1414                let b = (len + start).max(0) as usize;
1415                (b, chars.len() - b)
1416            };
1417
1418            let result: String = chars.iter().skip(begin).take(count).collect();
1419            Ok(Value::Text(result.into()))
1420        }
1421        "TRIM" | "LTRIM" | "RTRIM" => {
1422            if evaluated.is_empty() || evaluated.len() > 2 {
1423                return Err(SqlError::InvalidValue(format!(
1424                    "{name} requires 1 or 2 arguments"
1425                )));
1426            }
1427            if evaluated[0].is_null() {
1428                return Ok(Value::Null);
1429            }
1430            let s = value_to_text(&evaluated[0]);
1431            let trim_chars: Vec<char> = if evaluated.len() == 2 {
1432                if evaluated[1].is_null() {
1433                    return Ok(Value::Null);
1434                }
1435                value_to_text(&evaluated[1]).chars().collect()
1436            } else {
1437                vec![' ']
1438            };
1439            let result = match name {
1440                "TRIM" => s
1441                    .trim_matches(|c: char| trim_chars.contains(&c))
1442                    .to_string(),
1443                "LTRIM" => s
1444                    .trim_start_matches(|c: char| trim_chars.contains(&c))
1445                    .to_string(),
1446                "RTRIM" => s
1447                    .trim_end_matches(|c: char| trim_chars.contains(&c))
1448                    .to_string(),
1449                _ => unreachable!(),
1450            };
1451            Ok(Value::Text(result.into()))
1452        }
1453        "REPLACE" => {
1454            check_args(name, &evaluated, 3)?;
1455            if evaluated.iter().any(|v| v.is_null()) {
1456                return Ok(Value::Null);
1457            }
1458            let s = value_to_text(&evaluated[0]);
1459            let from = value_to_text(&evaluated[1]);
1460            let to = value_to_text(&evaluated[2]);
1461            if from.is_empty() {
1462                return Ok(Value::Text(s.into()));
1463            }
1464            Ok(Value::Text(s.replace(&from, &to).into()))
1465        }
1466        "INSTR" => {
1467            check_args(name, &evaluated, 2)?;
1468            if evaluated.iter().any(|v| v.is_null()) {
1469                return Ok(Value::Null);
1470            }
1471            let haystack = value_to_text(&evaluated[0]);
1472            let needle = value_to_text(&evaluated[1]);
1473            let pos = haystack
1474                .find(&needle)
1475                .map(|i| haystack[..i].chars().count() as i64 + 1)
1476                .unwrap_or(0);
1477            Ok(Value::Integer(pos))
1478        }
1479        "CONCAT" => {
1480            if evaluated.is_empty() {
1481                return Ok(Value::Text(CompactString::default()));
1482            }
1483            let mut result = String::new();
1484            for v in &evaluated {
1485                match v {
1486                    Value::Null => {}
1487                    _ => result.push_str(&value_to_text(v)),
1488                }
1489            }
1490            Ok(Value::Text(result.into()))
1491        }
1492        "ABS" => {
1493            check_args(name, &evaluated, 1)?;
1494            match &evaluated[0] {
1495                Value::Null => Ok(Value::Null),
1496                Value::Integer(i) => i
1497                    .checked_abs()
1498                    .map(Value::Integer)
1499                    .ok_or(SqlError::IntegerOverflow),
1500                Value::Real(r) => Ok(Value::Real(r.abs())),
1501                _ => Err(SqlError::TypeMismatch {
1502                    expected: "numeric".into(),
1503                    got: evaluated[0].data_type().to_string(),
1504                }),
1505            }
1506        }
1507        "ROUND" => {
1508            if evaluated.is_empty() || evaluated.len() > 2 {
1509                return Err(SqlError::InvalidValue(
1510                    "ROUND requires 1 or 2 arguments".into(),
1511                ));
1512            }
1513            if evaluated[0].is_null() {
1514                return Ok(Value::Null);
1515            }
1516            let val = match &evaluated[0] {
1517                Value::Integer(i) => *i as f64,
1518                Value::Real(r) => *r,
1519                _ => {
1520                    return Err(SqlError::TypeMismatch {
1521                        expected: "numeric".into(),
1522                        got: evaluated[0].data_type().to_string(),
1523                    })
1524                }
1525            };
1526            let places = if evaluated.len() == 2 {
1527                match &evaluated[1] {
1528                    Value::Null => return Ok(Value::Null),
1529                    Value::Integer(i) => *i,
1530                    _ => {
1531                        return Err(SqlError::TypeMismatch {
1532                            expected: "INTEGER".into(),
1533                            got: evaluated[1].data_type().to_string(),
1534                        })
1535                    }
1536                }
1537            } else {
1538                0
1539            };
1540            let factor = 10f64.powi(places as i32);
1541            let rounded = (val * factor).round() / factor;
1542            Ok(Value::Real(rounded))
1543        }
1544        "CEIL" | "CEILING" => {
1545            check_args(name, &evaluated, 1)?;
1546            match &evaluated[0] {
1547                Value::Null => Ok(Value::Null),
1548                Value::Integer(i) => Ok(Value::Integer(*i)),
1549                Value::Real(r) => Ok(Value::Integer(r.ceil() as i64)),
1550                _ => Err(SqlError::TypeMismatch {
1551                    expected: "numeric".into(),
1552                    got: evaluated[0].data_type().to_string(),
1553                }),
1554            }
1555        }
1556        "FLOOR" => {
1557            check_args(name, &evaluated, 1)?;
1558            match &evaluated[0] {
1559                Value::Null => Ok(Value::Null),
1560                Value::Integer(i) => Ok(Value::Integer(*i)),
1561                Value::Real(r) => Ok(Value::Integer(r.floor() as i64)),
1562                _ => Err(SqlError::TypeMismatch {
1563                    expected: "numeric".into(),
1564                    got: evaluated[0].data_type().to_string(),
1565                }),
1566            }
1567        }
1568        "SIGN" => {
1569            check_args(name, &evaluated, 1)?;
1570            match &evaluated[0] {
1571                Value::Null => Ok(Value::Null),
1572                Value::Integer(i) => Ok(Value::Integer(i.signum())),
1573                Value::Real(r) => {
1574                    if *r > 0.0 {
1575                        Ok(Value::Integer(1))
1576                    } else if *r < 0.0 {
1577                        Ok(Value::Integer(-1))
1578                    } else {
1579                        Ok(Value::Integer(0))
1580                    }
1581                }
1582                _ => Err(SqlError::TypeMismatch {
1583                    expected: "numeric".into(),
1584                    got: evaluated[0].data_type().to_string(),
1585                }),
1586            }
1587        }
1588        "SQRT" => {
1589            check_args(name, &evaluated, 1)?;
1590            match &evaluated[0] {
1591                Value::Null => Ok(Value::Null),
1592                Value::Integer(i) => {
1593                    if *i < 0 {
1594                        Ok(Value::Null)
1595                    } else {
1596                        Ok(Value::Real((*i as f64).sqrt()))
1597                    }
1598                }
1599                Value::Real(r) => {
1600                    if *r < 0.0 {
1601                        Ok(Value::Null)
1602                    } else {
1603                        Ok(Value::Real(r.sqrt()))
1604                    }
1605                }
1606                _ => Err(SqlError::TypeMismatch {
1607                    expected: "numeric".into(),
1608                    got: evaluated[0].data_type().to_string(),
1609                }),
1610            }
1611        }
1612        "RANDOM" => {
1613            check_args(name, &evaluated, 0)?;
1614            use std::collections::hash_map::DefaultHasher;
1615            use std::hash::{Hash, Hasher};
1616            use std::time::SystemTime;
1617            let mut hasher = DefaultHasher::new();
1618            SystemTime::now().hash(&mut hasher);
1619            std::thread::current().id().hash(&mut hasher);
1620            let mut val = hasher.finish() as i64;
1621            if val == i64::MIN {
1622                val = i64::MAX;
1623            }
1624            Ok(Value::Integer(val))
1625        }
1626        "TYPEOF" => {
1627            check_args(name, &evaluated, 1)?;
1628            let type_name = match &evaluated[0] {
1629                Value::Null => "null",
1630                Value::Integer(_) => "integer",
1631                Value::Real(_) => "real",
1632                Value::Text(_) => "text",
1633                Value::Blob(_) => "blob",
1634                Value::Boolean(_) => "boolean",
1635                Value::Date(_) => "date",
1636                Value::Time(_) => "time",
1637                Value::Timestamp(_) => "timestamp",
1638                Value::Interval { .. } => "interval",
1639                Value::Json(_) => "json",
1640                Value::Jsonb(_) => "jsonb",
1641                Value::TsVector(_) => "tsvector",
1642                Value::TsQuery(_) => "tsquery",
1643            };
1644            Ok(Value::Text(type_name.into()))
1645        }
1646        "MIN" => {
1647            check_args(name, &evaluated, 2)?;
1648            if evaluated[0].is_null() {
1649                return Ok(evaluated[1].clone());
1650            }
1651            if evaluated[1].is_null() {
1652                return Ok(evaluated[0].clone());
1653            }
1654            if evaluated[0] <= evaluated[1] {
1655                Ok(evaluated[0].clone())
1656            } else {
1657                Ok(evaluated[1].clone())
1658            }
1659        }
1660        "MAX" => {
1661            check_args(name, &evaluated, 2)?;
1662            if evaluated[0].is_null() {
1663                return Ok(evaluated[1].clone());
1664            }
1665            if evaluated[1].is_null() {
1666                return Ok(evaluated[0].clone());
1667            }
1668            if evaluated[0] >= evaluated[1] {
1669                Ok(evaluated[0].clone())
1670            } else {
1671                Ok(evaluated[1].clone())
1672            }
1673        }
1674        "HEX" => {
1675            check_args(name, &evaluated, 1)?;
1676            match &evaluated[0] {
1677                Value::Null => Ok(Value::Null),
1678                Value::Blob(b) => {
1679                    let mut s = String::with_capacity(b.len() * 2);
1680                    for byte in b {
1681                        s.push_str(&format!("{byte:02X}"));
1682                    }
1683                    Ok(Value::Text(s.into()))
1684                }
1685                Value::Text(s) => {
1686                    let mut r = String::with_capacity(s.len() * 2);
1687                    for byte in s.as_bytes() {
1688                        r.push_str(&format!("{byte:02X}"));
1689                    }
1690                    Ok(Value::Text(r.into()))
1691                }
1692                _ => Ok(Value::Text(value_to_text(&evaluated[0]).into())),
1693            }
1694        }
1695        "NOW" | "CURRENT_TIMESTAMP" | "LOCALTIMESTAMP" => {
1696            check_args(name, &evaluated, 0)?;
1697            Ok(Value::Timestamp(crate::datetime::txn_or_clock_micros()))
1698        }
1699        "CURRENT_DATE" => {
1700            check_args(name, &evaluated, 0)?;
1701            Ok(Value::Date(crate::datetime::ts_to_date_floor(
1702                crate::datetime::txn_or_clock_micros(),
1703            )))
1704        }
1705        "CURRENT_TIME" | "LOCALTIME" => {
1706            check_args(name, &evaluated, 0)?;
1707            Ok(Value::Time(
1708                crate::datetime::ts_split(crate::datetime::txn_or_clock_micros()).1,
1709            ))
1710        }
1711        "CLOCK_TIMESTAMP" | "STATEMENT_TIMESTAMP" | "TRANSACTION_TIMESTAMP" => {
1712            check_args(name, &evaluated, 0)?;
1713            let ts = match name {
1714                "CLOCK_TIMESTAMP" => crate::datetime::now_micros(),
1715                _ => crate::datetime::txn_or_clock_micros(),
1716            };
1717            Ok(Value::Timestamp(ts))
1718        }
1719        "EXTRACT" | "DATE_PART" | "DATEPART" => {
1720            check_args(name, &evaluated, 2)?;
1721            // Borrow the field str without allocating; datetime::extract accepts &str.
1722            let field: &str = match &evaluated[0] {
1723                Value::Null => return Ok(Value::Null),
1724                Value::Text(s) => s.as_str(),
1725                _ => {
1726                    return Err(SqlError::TypeMismatch {
1727                        expected: "TEXT field name".into(),
1728                        got: evaluated[0].data_type().to_string(),
1729                    })
1730                }
1731            };
1732            if evaluated[1].is_null() {
1733                return Ok(Value::Null);
1734            }
1735            crate::datetime::extract(field, &evaluated[1])
1736        }
1737        "DATE_TRUNC" => {
1738            if evaluated.len() < 2 || evaluated.len() > 3 {
1739                return Err(SqlError::InvalidValue(
1740                    "DATE_TRUNC requires 2 or 3 arguments".into(),
1741                ));
1742            }
1743            let unit = match &evaluated[0] {
1744                Value::Null => return Ok(Value::Null),
1745                Value::Text(s) => s.to_string(),
1746                _ => {
1747                    return Err(SqlError::TypeMismatch {
1748                        expected: "TEXT unit name".into(),
1749                        got: evaluated[0].data_type().to_string(),
1750                    })
1751                }
1752            };
1753            if evaluated[1].is_null() {
1754                return Ok(Value::Null);
1755            }
1756            // Optional tz arg: truncate in that zone, then convert back to UTC.
1757            if evaluated.len() == 3 {
1758                if let Value::Text(tz) = &evaluated[2] {
1759                    if !tz.eq_ignore_ascii_case("UTC") {
1760                        if let Value::Timestamp(ts) = &evaluated[1] {
1761                            return date_trunc_in_zone(&unit, *ts, tz);
1762                        }
1763                    }
1764                }
1765            }
1766            crate::datetime::date_trunc(&unit, &evaluated[1])
1767        }
1768        "DATE_BIN" => {
1769            check_args(name, &evaluated, 3)?;
1770            if evaluated.iter().any(|v| v.is_null()) {
1771                return Ok(Value::Null);
1772            }
1773            let stride = match &evaluated[0] {
1774                Value::Interval {
1775                    months: _,
1776                    days,
1777                    micros,
1778                } => *days as i64 * crate::datetime::MICROS_PER_DAY + *micros,
1779                _ => {
1780                    return Err(SqlError::TypeMismatch {
1781                        expected: "INTERVAL stride".into(),
1782                        got: evaluated[0].data_type().to_string(),
1783                    })
1784                }
1785            };
1786            if stride <= 0 {
1787                return Err(SqlError::InvalidValue(
1788                    "DATE_BIN stride must be positive".into(),
1789                ));
1790            }
1791            let (src, origin) = match (&evaluated[1], &evaluated[2]) {
1792                (Value::Timestamp(s), Value::Timestamp(o)) => (*s, *o),
1793                _ => {
1794                    return Err(SqlError::TypeMismatch {
1795                        expected: "TIMESTAMP, TIMESTAMP".into(),
1796                        got: format!("{}, {}", evaluated[1].data_type(), evaluated[2].data_type()),
1797                    })
1798                }
1799            };
1800            let diff = src - origin;
1801            let binned = origin + (diff.div_euclid(stride)) * stride;
1802            Ok(Value::Timestamp(binned))
1803        }
1804        "AGE" => {
1805            if evaluated.len() == 1 {
1806                if evaluated[0].is_null() {
1807                    return Ok(Value::Null);
1808                }
1809                let ts = match &evaluated[0] {
1810                    Value::Timestamp(t) => *t,
1811                    Value::Date(d) => crate::datetime::date_to_ts(*d),
1812                    _ => {
1813                        return Err(SqlError::TypeMismatch {
1814                            expected: "TIMESTAMP or DATE".into(),
1815                            got: evaluated[0].data_type().to_string(),
1816                        })
1817                    }
1818                };
1819                // Implicit reference: today at midnight UTC.
1820                let today = crate::datetime::today_days();
1821                let midnight = crate::datetime::date_to_ts(today);
1822                let (m, d, u) = crate::datetime::age(midnight, ts)?;
1823                return Ok(Value::Interval {
1824                    months: m,
1825                    days: d,
1826                    micros: u,
1827                });
1828            }
1829            check_args(name, &evaluated, 2)?;
1830            if evaluated.iter().any(|v| v.is_null()) {
1831                return Ok(Value::Null);
1832            }
1833            let a = ts_of(&evaluated[0])?;
1834            let b = ts_of(&evaluated[1])?;
1835            let (m, d, u) = crate::datetime::age(a, b)?;
1836            Ok(Value::Interval {
1837                months: m,
1838                days: d,
1839                micros: u,
1840            })
1841        }
1842        "MAKE_DATE" => {
1843            check_args(name, &evaluated, 3)?;
1844            if evaluated.iter().any(|v| v.is_null()) {
1845                return Ok(Value::Null);
1846            }
1847            let y = int_arg(&evaluated[0], "MAKE_DATE year")? as i32;
1848            let m = int_arg(&evaluated[1], "MAKE_DATE month")? as u8;
1849            let d = int_arg(&evaluated[2], "MAKE_DATE day")? as u8;
1850            crate::datetime::ymd_to_days(y, m, d)
1851                .map(Value::Date)
1852                .ok_or_else(|| SqlError::InvalidDateLiteral(format!("make_date({y}, {m}, {d})")))
1853        }
1854        "MAKE_TIME" => {
1855            check_args(name, &evaluated, 3)?;
1856            if evaluated.iter().any(|v| v.is_null()) {
1857                return Ok(Value::Null);
1858            }
1859            let h = int_arg(&evaluated[0], "MAKE_TIME hour")? as u8;
1860            let mi = int_arg(&evaluated[1], "MAKE_TIME minute")? as u8;
1861            let (s, us) = real_sec_arg(&evaluated[2])?;
1862            crate::datetime::hmsn_to_micros(h, mi, s, us)
1863                .map(Value::Time)
1864                .ok_or_else(|| SqlError::InvalidTimeLiteral(format!("make_time({h}, {mi}, ...)")))
1865        }
1866        "MAKE_TIMESTAMP" => {
1867            check_args(name, &evaluated, 6)?;
1868            if evaluated.iter().any(|v| v.is_null()) {
1869                return Ok(Value::Null);
1870            }
1871            let y = int_arg(&evaluated[0], "MAKE_TIMESTAMP year")? as i32;
1872            let mo = int_arg(&evaluated[1], "MAKE_TIMESTAMP month")? as u8;
1873            let d = int_arg(&evaluated[2], "MAKE_TIMESTAMP day")? as u8;
1874            let h = int_arg(&evaluated[3], "MAKE_TIMESTAMP hour")? as u8;
1875            let mi = int_arg(&evaluated[4], "MAKE_TIMESTAMP min")? as u8;
1876            let (s, us) = real_sec_arg(&evaluated[5])?;
1877            let days = crate::datetime::ymd_to_days(y, mo, d).ok_or_else(|| {
1878                SqlError::InvalidTimestampLiteral(format!("make_timestamp year={y}"))
1879            })?;
1880            let tmicros = crate::datetime::hmsn_to_micros(h, mi, s, us)
1881                .ok_or_else(|| SqlError::InvalidTimestampLiteral("time out of range".into()))?;
1882            Ok(Value::Timestamp(crate::datetime::ts_combine(days, tmicros)))
1883        }
1884        "MAKE_INTERVAL" => {
1885            // Positional args: years, months, weeks, days, hours, mins, secs.
1886            if evaluated.len() > 7 {
1887                return Err(SqlError::InvalidValue(
1888                    "MAKE_INTERVAL accepts at most 7 arguments".into(),
1889                ));
1890            }
1891            let mut months: i64 = 0;
1892            let mut days: i64 = 0;
1893            let mut micros: i64 = 0;
1894            for (i, v) in evaluated.iter().enumerate() {
1895                if v.is_null() {
1896                    continue;
1897                }
1898                let n = match v {
1899                    Value::Integer(n) => *n,
1900                    Value::Real(r) => *r as i64,
1901                    _ => {
1902                        return Err(SqlError::TypeMismatch {
1903                            expected: "numeric".into(),
1904                            got: v.data_type().to_string(),
1905                        })
1906                    }
1907                };
1908                match i {
1909                    0 => months = months.saturating_add(n.saturating_mul(12)),
1910                    1 => months = months.saturating_add(n),
1911                    2 => days = days.saturating_add(n.saturating_mul(7)),
1912                    3 => days = days.saturating_add(n),
1913                    4 => {
1914                        micros = micros
1915                            .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_HOUR))
1916                    }
1917                    5 => {
1918                        micros =
1919                            micros.saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_MIN))
1920                    }
1921                    6 => {
1922                        // Seconds may be fractional — also check Real.
1923                        if let Value::Real(r) = v {
1924                            micros = micros.saturating_add(
1925                                (*r * crate::datetime::MICROS_PER_SEC as f64) as i64,
1926                            );
1927                        } else {
1928                            micros = micros
1929                                .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_SEC));
1930                        }
1931                    }
1932                    _ => unreachable!(),
1933                }
1934            }
1935            Ok(Value::Interval {
1936                months: months.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
1937                days: days.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
1938                micros,
1939            })
1940        }
1941        "JUSTIFY_DAYS" => {
1942            check_args(name, &evaluated, 1)?;
1943            match &evaluated[0] {
1944                Value::Null => Ok(Value::Null),
1945                Value::Interval {
1946                    months,
1947                    days,
1948                    micros,
1949                } => {
1950                    let (m, d, u) = crate::datetime::justify_days(*months, *days, *micros);
1951                    Ok(Value::Interval {
1952                        months: m,
1953                        days: d,
1954                        micros: u,
1955                    })
1956                }
1957                other => Err(SqlError::TypeMismatch {
1958                    expected: "INTERVAL".into(),
1959                    got: other.data_type().to_string(),
1960                }),
1961            }
1962        }
1963        "JUSTIFY_HOURS" => {
1964            check_args(name, &evaluated, 1)?;
1965            match &evaluated[0] {
1966                Value::Null => Ok(Value::Null),
1967                Value::Interval {
1968                    months,
1969                    days,
1970                    micros,
1971                } => {
1972                    let (m, d, u) = crate::datetime::justify_hours(*months, *days, *micros);
1973                    Ok(Value::Interval {
1974                        months: m,
1975                        days: d,
1976                        micros: u,
1977                    })
1978                }
1979                other => Err(SqlError::TypeMismatch {
1980                    expected: "INTERVAL".into(),
1981                    got: other.data_type().to_string(),
1982                }),
1983            }
1984        }
1985        "JUSTIFY_INTERVAL" => {
1986            check_args(name, &evaluated, 1)?;
1987            match &evaluated[0] {
1988                Value::Null => Ok(Value::Null),
1989                Value::Interval {
1990                    months,
1991                    days,
1992                    micros,
1993                } => {
1994                    let (m, d, u) = crate::datetime::justify_interval(*months, *days, *micros);
1995                    Ok(Value::Interval {
1996                        months: m,
1997                        days: d,
1998                        micros: u,
1999                    })
2000                }
2001                other => Err(SqlError::TypeMismatch {
2002                    expected: "INTERVAL".into(),
2003                    got: other.data_type().to_string(),
2004                }),
2005            }
2006        }
2007        "ISFINITE" => {
2008            check_args(name, &evaluated, 1)?;
2009            if evaluated[0].is_null() {
2010                return Ok(Value::Null);
2011            }
2012            Ok(Value::Boolean(evaluated[0].is_finite_temporal()))
2013        }
2014        "DATE" => {
2015            if evaluated.is_empty() {
2016                return Err(SqlError::InvalidValue(
2017                    "DATE requires at least 1 argument".into(),
2018                ));
2019            }
2020            if evaluated[0].is_null() {
2021                return Ok(Value::Null);
2022            }
2023            let d = match &evaluated[0] {
2024                Value::Date(d) => *d,
2025                Value::Timestamp(t) => crate::datetime::ts_to_date_floor(*t),
2026                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::today_days(),
2027                Value::Text(s) => crate::datetime::parse_date(s)?,
2028                Value::Integer(n) => {
2029                    crate::datetime::ts_to_date_floor(*n * crate::datetime::MICROS_PER_SEC)
2030                }
2031                other => {
2032                    return Err(SqlError::TypeMismatch {
2033                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
2034                        got: other.data_type().to_string(),
2035                    })
2036                }
2037            };
2038            Ok(Value::Date(d))
2039        }
2040        "TIME" => {
2041            if evaluated.is_empty() {
2042                return Err(SqlError::InvalidValue(
2043                    "TIME requires at least 1 argument".into(),
2044                ));
2045            }
2046            if evaluated[0].is_null() {
2047                return Ok(Value::Null);
2048            }
2049            let t = match &evaluated[0] {
2050                Value::Time(t) => *t,
2051                Value::Timestamp(t) => crate::datetime::ts_split(*t).1,
2052                Value::Text(s) if s.eq_ignore_ascii_case("now") => {
2053                    crate::datetime::current_time_micros()
2054                }
2055                Value::Text(s) => crate::datetime::parse_time(s)?,
2056                other => {
2057                    return Err(SqlError::TypeMismatch {
2058                        expected: "TIMESTAMP, TIME, or TEXT".into(),
2059                        got: other.data_type().to_string(),
2060                    })
2061                }
2062            };
2063            Ok(Value::Time(t))
2064        }
2065        "DATETIME" => {
2066            if evaluated.is_empty() {
2067                return Err(SqlError::InvalidValue(
2068                    "DATETIME requires at least 1 argument".into(),
2069                ));
2070            }
2071            if evaluated[0].is_null() {
2072                return Ok(Value::Null);
2073            }
2074            let t = match &evaluated[0] {
2075                Value::Timestamp(t) => *t,
2076                Value::Date(d) => crate::datetime::date_to_ts(*d),
2077                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::now_micros(),
2078                Value::Text(s) => crate::datetime::parse_timestamp(s)?,
2079                Value::Integer(n) => n * crate::datetime::MICROS_PER_SEC,
2080                other => {
2081                    return Err(SqlError::TypeMismatch {
2082                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
2083                        got: other.data_type().to_string(),
2084                    })
2085                }
2086            };
2087            Ok(Value::Timestamp(t))
2088        }
2089        "STRFTIME" => {
2090            if evaluated.len() < 2 {
2091                return Err(SqlError::InvalidValue(
2092                    "STRFTIME requires format + value".into(),
2093                ));
2094            }
2095            if evaluated.iter().take(2).any(|v| v.is_null()) {
2096                return Ok(Value::Null);
2097            }
2098            let fmt = match &evaluated[0] {
2099                Value::Text(s) => s.to_string(),
2100                _ => {
2101                    return Err(SqlError::TypeMismatch {
2102                        expected: "TEXT format".into(),
2103                        got: evaluated[0].data_type().to_string(),
2104                    })
2105                }
2106            };
2107            let out = crate::datetime::strftime(&fmt, &evaluated[1])?;
2108            Ok(Value::Text(out.into()))
2109        }
2110        "JULIANDAY" => {
2111            if evaluated.is_empty() {
2112                return Err(SqlError::InvalidValue(
2113                    "JULIANDAY requires at least 1 argument".into(),
2114                ));
2115            }
2116            if evaluated[0].is_null() {
2117                return Ok(Value::Null);
2118            }
2119            let micros = ts_of(&evaluated[0])?;
2120            let (days, tmicros) = crate::datetime::ts_split(micros);
2121            // Julian Day 2440587.5 = 1970-01-01 00:00:00 UTC (Julian days start at noon).
2122            let julian =
2123                days as f64 + 2_440_587.5 + tmicros as f64 / crate::datetime::MICROS_PER_DAY as f64;
2124            Ok(Value::Real(julian))
2125        }
2126        "UNIXEPOCH" => {
2127            if evaluated.is_empty() {
2128                return Err(SqlError::InvalidValue(
2129                    "UNIXEPOCH requires at least 1 argument".into(),
2130                ));
2131            }
2132            if evaluated[0].is_null() {
2133                return Ok(Value::Null);
2134            }
2135            let micros = ts_of(&evaluated[0])?;
2136            let subsec = evaluated
2137                .get(1)
2138                .and_then(|v| {
2139                    if let Value::Text(s) = v {
2140                        Some(s.to_string())
2141                    } else {
2142                        None
2143                    }
2144                })
2145                .map(|s| s.eq_ignore_ascii_case("subsec") || s.eq_ignore_ascii_case("subsecond"))
2146                .unwrap_or(false);
2147            if subsec {
2148                Ok(Value::Real(
2149                    micros as f64 / crate::datetime::MICROS_PER_SEC as f64,
2150                ))
2151            } else {
2152                Ok(Value::Integer(micros / crate::datetime::MICROS_PER_SEC))
2153            }
2154        }
2155        "TIMEDIFF" => {
2156            check_args(name, &evaluated, 2)?;
2157            if evaluated.iter().any(|v| v.is_null()) {
2158                return Ok(Value::Null);
2159            }
2160            let a = ts_of(&evaluated[0])?;
2161            let b = ts_of(&evaluated[1])?;
2162            let (days, micros) = crate::datetime::subtract_timestamps(a, b);
2163            let sign = if days < 0 || (days == 0 && micros < 0) {
2164                "-"
2165            } else {
2166                "+"
2167            };
2168            let abs_days = days.unsigned_abs() as i64;
2169            let abs_us = micros.unsigned_abs() as i64;
2170            // PG-compat format string: "(+|-)YYYY-MM-DD HH:MM:SS.SSS", days-only.
2171            let (h, m, s, us) = crate::datetime::micros_to_hmsn(abs_us);
2172            Ok(Value::Text(
2173                format!("{sign}{abs_days:04}-00-00 {h:02}:{m:02}:{s:02}.{us:06}").into(),
2174            ))
2175        }
2176        "AT_TIMEZONE" => {
2177            check_args(name, &evaluated, 2)?;
2178            if evaluated.iter().any(|v| v.is_null()) {
2179                return Ok(Value::Null);
2180            }
2181            let ts = match &evaluated[0] {
2182                Value::Timestamp(t) => *t,
2183                Value::Date(d) => crate::datetime::date_to_ts(*d),
2184                other => {
2185                    return Err(SqlError::TypeMismatch {
2186                        expected: "TIMESTAMP or DATE".into(),
2187                        got: other.data_type().to_string(),
2188                    })
2189                }
2190            };
2191            let zone = match &evaluated[1] {
2192                Value::Text(s) => s.to_string(),
2193                _ => {
2194                    return Err(SqlError::TypeMismatch {
2195                        expected: "TEXT time zone".into(),
2196                        got: evaluated[1].data_type().to_string(),
2197                    })
2198                }
2199            };
2200            // Reject POSIX-style 'UTC+5' (ambiguous sign convention).
2201            let upper = zone.to_ascii_uppercase();
2202            if (upper.starts_with("UTC+") || upper.starts_with("UTC-")) && zone.len() > 3 {
2203                return Err(SqlError::InvalidTimezone(format!(
2204                    "'{zone}' is ambiguous — use ISO-8601 offset like '+05:00' or named zone like 'Etc/GMT-5'"
2205                )));
2206            }
2207            let formatted = crate::datetime::format_timestamp_in_zone(ts, &zone)?;
2208            Ok(Value::Text(formatted.into()))
2209        }
2210        "JSONB_TYPEOF" | "JSON_TYPEOF" => {
2211            check_args(name, &evaluated, 1)?;
2212            if evaluated[0].is_null() {
2213                return Ok(Value::Null);
2214            }
2215            crate::json::fn_typeof(&evaluated[0])
2216        }
2217        "JSONB_ARRAY_LENGTH" | "JSON_ARRAY_LENGTH" => {
2218            check_args(name, &evaluated, 1)?;
2219            if evaluated[0].is_null() {
2220                return Ok(Value::Null);
2221            }
2222            crate::json::fn_array_length(&evaluated[0])
2223        }
2224        "JSONB_OBJECT_LENGTH" | "JSON_OBJECT_LENGTH" => {
2225            check_args(name, &evaluated, 1)?;
2226            if evaluated[0].is_null() {
2227                return Ok(Value::Null);
2228            }
2229            crate::json::fn_object_length(&evaluated[0])
2230        }
2231        "JSONB_EXTRACT_PATH" | "JSON_EXTRACT_PATH" => {
2232            if evaluated.is_empty() {
2233                return Err(SqlError::InvalidValue(format!(
2234                    "{name} requires at least 1 argument"
2235                )));
2236            }
2237            if evaluated[0].is_null() {
2238                return Ok(Value::Null);
2239            }
2240            let target = if name.eq_ignore_ascii_case("JSONB_EXTRACT_PATH") {
2241                crate::types::DataType::Jsonb
2242            } else {
2243                crate::types::DataType::Json
2244            };
2245            crate::json::fn_extract_path(&evaluated, target, false)
2246        }
2247        "JSONB_EXTRACT_PATH_TEXT" | "JSON_EXTRACT_PATH_TEXT" => {
2248            if evaluated.is_empty() {
2249                return Err(SqlError::InvalidValue(format!(
2250                    "{name} requires at least 1 argument"
2251                )));
2252            }
2253            if evaluated[0].is_null() {
2254                return Ok(Value::Null);
2255            }
2256            crate::json::fn_extract_path(&evaluated, crate::types::DataType::Text, true)
2257        }
2258        "JSON_EXTRACT" => {
2259            check_args(name, &evaluated, 2)?;
2260            if evaluated[0].is_null() || evaluated[1].is_null() {
2261                return Ok(Value::Null);
2262            }
2263            crate::json::fn_sqlite_extract(&evaluated[0], &evaluated[1])
2264        }
2265        "JSON_VALID" => {
2266            check_args(name, &evaluated, 1)?;
2267            if evaluated[0].is_null() {
2268                return Ok(Value::Null);
2269            }
2270            crate::json::fn_valid(&evaluated[0])
2271        }
2272        "JSONB_STRIP_NULLS" | "JSON_STRIP_NULLS" => {
2273            check_args(name, &evaluated, 1)?;
2274            if evaluated[0].is_null() {
2275                return Ok(Value::Null);
2276            }
2277            let target = if name.eq_ignore_ascii_case("JSONB_STRIP_NULLS") {
2278                crate::types::DataType::Jsonb
2279            } else {
2280                crate::types::DataType::Json
2281            };
2282            crate::json::fn_strip_nulls(&evaluated[0], target)
2283        }
2284        "JSONB_PRETTY" | "JSON_PRETTY" => {
2285            check_args(name, &evaluated, 1)?;
2286            if evaluated[0].is_null() {
2287                return Ok(Value::Null);
2288            }
2289            crate::json::fn_pretty(&evaluated[0])
2290        }
2291        "JSONB_BUILD_OBJECT" | "JSON_BUILD_OBJECT" => {
2292            let target = if name.eq_ignore_ascii_case("JSONB_BUILD_OBJECT") {
2293                crate::types::DataType::Jsonb
2294            } else {
2295                crate::types::DataType::Json
2296            };
2297            crate::json::fn_build_object(&evaluated, target)
2298        }
2299        "JSONB_BUILD_ARRAY" | "JSON_BUILD_ARRAY" => {
2300            let target = if name.eq_ignore_ascii_case("JSONB_BUILD_ARRAY") {
2301                crate::types::DataType::Jsonb
2302            } else {
2303                crate::types::DataType::Json
2304            };
2305            crate::json::fn_build_array(&evaluated, target)
2306        }
2307        "JSONB_SET" | "JSON_SET" => {
2308            if !(3..=4).contains(&evaluated.len()) {
2309                return Err(SqlError::InvalidValue(format!(
2310                    "{name} requires 3 or 4 arguments"
2311                )));
2312            }
2313            if evaluated[0].is_null() {
2314                return Ok(Value::Null);
2315            }
2316            let target = if name.eq_ignore_ascii_case("JSONB_SET") {
2317                crate::types::DataType::Jsonb
2318            } else {
2319                crate::types::DataType::Json
2320            };
2321            let create_missing = evaluated
2322                .get(3)
2323                .map(|v| matches!(v, Value::Boolean(true)))
2324                .unwrap_or(true);
2325            crate::json::fn_set(
2326                &evaluated[0],
2327                &evaluated[1],
2328                &evaluated[2],
2329                create_missing,
2330                target,
2331            )
2332        }
2333        "JSONB_INSERT" | "JSON_INSERT" => {
2334            if !(3..=4).contains(&evaluated.len()) {
2335                return Err(SqlError::InvalidValue(format!(
2336                    "{name} requires 3 or 4 arguments"
2337                )));
2338            }
2339            if evaluated[0].is_null() {
2340                return Ok(Value::Null);
2341            }
2342            let target = if name.eq_ignore_ascii_case("JSONB_INSERT") {
2343                crate::types::DataType::Jsonb
2344            } else {
2345                crate::types::DataType::Json
2346            };
2347            let insert_after = evaluated
2348                .get(3)
2349                .map(|v| matches!(v, Value::Boolean(true)))
2350                .unwrap_or(false);
2351            crate::json::fn_insert(
2352                &evaluated[0],
2353                &evaluated[1],
2354                &evaluated[2],
2355                insert_after,
2356                target,
2357            )
2358        }
2359        "TO_JSONB" | "TO_JSON" => {
2360            check_args(name, &evaluated, 1)?;
2361            let target = if name.eq_ignore_ascii_case("TO_JSONB") {
2362                crate::types::DataType::Jsonb
2363            } else {
2364                crate::types::DataType::Json
2365            };
2366            crate::json::fn_to_json(&evaluated[0], target)
2367        }
2368        "ROW_TO_JSON" | "ROW_TO_JSONB" => {
2369            check_args(name, &evaluated, 1)?;
2370            let target = if name.eq_ignore_ascii_case("ROW_TO_JSONB") {
2371                crate::types::DataType::Jsonb
2372            } else {
2373                crate::types::DataType::Json
2374            };
2375            crate::json::fn_to_json(&evaluated[0], target)
2376        }
2377        "JSON_OBJECT" => crate::json::fn_json_object(&evaluated),
2378        "JSON_EXISTS" => {
2379            check_args(name, &evaluated, 2)?;
2380            if evaluated[0].is_null() || evaluated[1].is_null() {
2381                return Ok(Value::Null);
2382            }
2383            crate::json::fn_json_exists(&evaluated[0], &evaluated[1])
2384        }
2385        "JSON_VALUE" => {
2386            check_args(name, &evaluated, 2)?;
2387            if evaluated[0].is_null() || evaluated[1].is_null() {
2388                return Ok(Value::Null);
2389            }
2390            crate::json::fn_json_value(&evaluated[0], &evaluated[1])
2391        }
2392        "JSON_QUERY" => {
2393            check_args(name, &evaluated, 2)?;
2394            if evaluated[0].is_null() || evaluated[1].is_null() {
2395                return Ok(Value::Null);
2396            }
2397            crate::json::fn_json_query(&evaluated[0], &evaluated[1], crate::types::DataType::Jsonb)
2398        }
2399        "JSONB_PATH_EXISTS" => {
2400            if evaluated[0].is_null() || evaluated[1].is_null() {
2401                return Ok(Value::Null);
2402            }
2403            crate::json::fn_jsonb_path_exists(&evaluated)
2404        }
2405        "JSONB_PATH_MATCH" => {
2406            if evaluated[0].is_null() || evaluated[1].is_null() {
2407                return Ok(Value::Null);
2408            }
2409            crate::json::fn_jsonb_path_match(&evaluated)
2410        }
2411        "JSONB_PATH_QUERY_FIRST" => {
2412            if evaluated[0].is_null() || evaluated[1].is_null() {
2413                return Ok(Value::Null);
2414            }
2415            crate::json::fn_jsonb_path_query_first(&evaluated)
2416        }
2417        "JSONB_PATH_QUERY_ARRAY" => {
2418            if evaluated[0].is_null() || evaluated[1].is_null() {
2419                return Ok(Value::Null);
2420            }
2421            crate::json::fn_jsonb_path_query_array(&evaluated)
2422        }
2423        "JSONB_PATH_EXISTS_TZ" => {
2424            if evaluated[0].is_null() || evaluated[1].is_null() {
2425                return Ok(Value::Null);
2426            }
2427            crate::json::fn_jsonb_path_exists_tz(&evaluated)
2428        }
2429        "JSONB_PATH_MATCH_TZ" => {
2430            if evaluated[0].is_null() || evaluated[1].is_null() {
2431                return Ok(Value::Null);
2432            }
2433            crate::json::fn_jsonb_path_match_tz(&evaluated)
2434        }
2435        "JSONB_PATH_QUERY_TZ" => {
2436            if evaluated[0].is_null() || evaluated[1].is_null() {
2437                return Ok(Value::Null);
2438            }
2439            crate::json::fn_jsonb_path_query_tz(&evaluated)
2440        }
2441        "JSONB_PATH_QUERY_FIRST_TZ" => {
2442            if evaluated[0].is_null() || evaluated[1].is_null() {
2443                return Ok(Value::Null);
2444            }
2445            crate::json::fn_jsonb_path_query_first_tz(&evaluated)
2446        }
2447        "JSONB_PATH_QUERY_ARRAY_TZ" => {
2448            if evaluated[0].is_null() || evaluated[1].is_null() {
2449                return Ok(Value::Null);
2450            }
2451            crate::json::fn_jsonb_path_query_array_tz(&evaluated)
2452        }
2453        "JSONB_HAS_KEY" | "JSON_HAS_KEY" => {
2454            check_args(name, &evaluated, 2)?;
2455            if evaluated[0].is_null() || evaluated[1].is_null() {
2456                return Ok(Value::Null);
2457            }
2458            crate::json::op_has_key(&evaluated[0], &evaluated[1])
2459        }
2460        "JSONB_HAS_ANY_KEY" | "JSON_HAS_ANY_KEY" => {
2461            check_args(name, &evaluated, 2)?;
2462            if evaluated[0].is_null() || evaluated[1].is_null() {
2463                return Ok(Value::Null);
2464            }
2465            crate::json::op_has_any_key(&evaluated[0], &evaluated[1])
2466        }
2467        "JSONB_HAS_ALL_KEYS" | "JSON_HAS_ALL_KEYS" => {
2468            check_args(name, &evaluated, 2)?;
2469            if evaluated[0].is_null() || evaluated[1].is_null() {
2470                return Ok(Value::Null);
2471            }
2472            crate::json::op_has_all_keys(&evaluated[0], &evaluated[1])
2473        }
2474        "TO_TSVECTOR" => fts_to_tsvector(&evaluated),
2475        "TO_TSQUERY" => fts_to_tsquery(&evaluated),
2476        "PLAINTO_TSQUERY" => fts_plainto_tsquery(&evaluated),
2477        "PHRASETO_TSQUERY" => fts_phraseto_tsquery(&evaluated),
2478        "WEBSEARCH_TO_TSQUERY" => fts_websearch_to_tsquery(&evaluated),
2479        "TS_RANK" => fts_ts_rank(&evaluated, false),
2480        "TS_RANK_CD" => fts_ts_rank(&evaluated, true),
2481        "TS_HEADLINE" => fts_ts_headline(&evaluated),
2482        "TS_LEXIZE" => fts_ts_lexize(&evaluated),
2483        "NUMNODE" => fts_numnode(&evaluated),
2484        "SETWEIGHT" => fts_setweight(&evaluated),
2485        _ => Err(SqlError::Unsupported(format!("scalar function: {name}"))),
2486    }
2487}
2488
2489fn fts_resolve_config_and_text(
2490    args: &[Value],
2491    fname: &str,
2492) -> Result<(crate::fts::TokenizerKind, String)> {
2493    if args.is_empty() || args.len() > 2 {
2494        return Err(SqlError::InvalidValue(format!(
2495            "{fname} requires 1 or 2 arguments"
2496        )));
2497    }
2498    let (config_name, text) = if args.len() == 2 {
2499        let cfg = match &args[0] {
2500            Value::Text(s) => Some(s.as_str().to_string()),
2501            v => {
2502                return Err(SqlError::TypeMismatch {
2503                    expected: "TEXT (config)".into(),
2504                    got: v.data_type().to_string(),
2505                })
2506            }
2507        };
2508        let txt = match &args[1] {
2509            Value::Text(s) => s.as_str().to_string(),
2510            v => {
2511                return Err(SqlError::TypeMismatch {
2512                    expected: "TEXT".into(),
2513                    got: v.data_type().to_string(),
2514                })
2515            }
2516        };
2517        (cfg, txt)
2518    } else {
2519        let txt = match &args[0] {
2520            Value::Text(s) => s.as_str().to_string(),
2521            v => {
2522                return Err(SqlError::TypeMismatch {
2523                    expected: "TEXT".into(),
2524                    got: v.data_type().to_string(),
2525                })
2526            }
2527        };
2528        (None, txt)
2529    };
2530    let kind = match config_name {
2531        Some(name) => crate::fts::TokenizerKind::from_name(&name)?,
2532        None => crate::fts::TokenizerKind::English,
2533    };
2534    Ok((kind, text))
2535}
2536
2537fn fts_to_tsvector(args: &[Value]) -> Result<Value> {
2538    if args.iter().any(|v| v.is_null()) {
2539        return Ok(Value::Null);
2540    }
2541    let (kind, text) = fts_resolve_config_and_text(args, "to_tsvector")?;
2542    crate::fts::fn_to_tsvector_with(kind, &text)
2543}
2544
2545fn fts_to_tsquery(args: &[Value]) -> Result<Value> {
2546    if args.iter().any(|v| v.is_null()) {
2547        return Ok(Value::Null);
2548    }
2549    let (kind, text) = fts_resolve_config_and_text(args, "to_tsquery")?;
2550    crate::fts::fn_to_tsquery_with(kind, &text)
2551}
2552
2553fn fts_plainto_tsquery(args: &[Value]) -> Result<Value> {
2554    if args.iter().any(|v| v.is_null()) {
2555        return Ok(Value::Null);
2556    }
2557    let (kind, text) = fts_resolve_config_and_text(args, "plainto_tsquery")?;
2558    crate::fts::fn_plainto_tsquery_with(kind, &text)
2559}
2560
2561fn fts_phraseto_tsquery(args: &[Value]) -> Result<Value> {
2562    if args.iter().any(|v| v.is_null()) {
2563        return Ok(Value::Null);
2564    }
2565    let (kind, text) = fts_resolve_config_and_text(args, "phraseto_tsquery")?;
2566    crate::fts::fn_phraseto_tsquery_with(kind, &text)
2567}
2568
2569fn fts_websearch_to_tsquery(args: &[Value]) -> Result<Value> {
2570    if args.iter().any(|v| v.is_null()) {
2571        return Ok(Value::Null);
2572    }
2573    let (kind, text) = fts_resolve_config_and_text(args, "websearch_to_tsquery")?;
2574    crate::fts::fn_websearch_to_tsquery_with(kind, &text)
2575}
2576
2577fn fts_ts_rank(args: &[Value], cover_density: bool) -> Result<Value> {
2578    let fname = if cover_density {
2579        "ts_rank_cd"
2580    } else {
2581        "ts_rank"
2582    };
2583    if args.len() != 2 && args.len() != 3 {
2584        return Err(SqlError::InvalidValue(format!(
2585            "{fname} requires 2 or 3 arguments"
2586        )));
2587    }
2588    if args[0].is_null() || args[1].is_null() {
2589        return Ok(Value::Null);
2590    }
2591    let tsv = match &args[0] {
2592        Value::TsVector(b) => b,
2593        v => {
2594            return Err(SqlError::TypeMismatch {
2595                expected: "TSVECTOR".into(),
2596                got: v.data_type().to_string(),
2597            })
2598        }
2599    };
2600    let tsq = match &args[1] {
2601        Value::TsQuery(b) => b,
2602        v => {
2603            return Err(SqlError::TypeMismatch {
2604                expected: "TSQUERY".into(),
2605                got: v.data_type().to_string(),
2606            })
2607        }
2608    };
2609    let norm = if args.len() == 3 {
2610        match &args[2] {
2611            Value::Integer(n) => *n,
2612            Value::Null => return Ok(Value::Null),
2613            v => {
2614                return Err(SqlError::TypeMismatch {
2615                    expected: "INTEGER (norm)".into(),
2616                    got: v.data_type().to_string(),
2617                })
2618            }
2619        }
2620    } else {
2621        0
2622    };
2623    if cover_density {
2624        crate::fts::fn_ts_rank_cd(tsv, tsq, norm)
2625    } else {
2626        crate::fts::fn_ts_rank(tsv, tsq, norm)
2627    }
2628}
2629
2630fn fts_ts_headline(args: &[Value]) -> Result<Value> {
2631    if args.len() < 2 || args.len() > 4 {
2632        return Err(SqlError::InvalidValue(
2633            "ts_headline requires 2 to 4 arguments".into(),
2634        ));
2635    }
2636    if args.iter().any(|v| v.is_null()) {
2637        return Ok(Value::Null);
2638    }
2639    let kind = if args.len() >= 3 {
2640        match &args[0] {
2641            Value::Text(s) => crate::fts::TokenizerKind::from_name(s.as_str())?,
2642            v => {
2643                return Err(SqlError::TypeMismatch {
2644                    expected: "TEXT (config)".into(),
2645                    got: v.data_type().to_string(),
2646                })
2647            }
2648        }
2649    } else {
2650        crate::fts::TokenizerKind::English
2651    };
2652    let text_idx = if args.len() >= 3 { 1 } else { 0 };
2653    let tsq_idx = if args.len() >= 3 { 2 } else { 1 };
2654    let text = match args.get(text_idx) {
2655        Some(Value::Text(s)) => s.as_str(),
2656        _ => {
2657            return Err(SqlError::TypeMismatch {
2658                expected: "TEXT".into(),
2659                got: "non-text".into(),
2660            })
2661        }
2662    };
2663    let tsq = match args.get(tsq_idx) {
2664        Some(Value::TsQuery(b)) => b.as_ref(),
2665        _ => {
2666            return Err(SqlError::TypeMismatch {
2667                expected: "TSQUERY".into(),
2668                got: "non-tsquery".into(),
2669            })
2670        }
2671    };
2672    crate::fts::fn_ts_headline_with(kind, text, tsq)
2673}
2674
2675fn fts_ts_lexize(args: &[Value]) -> Result<Value> {
2676    if args.len() != 2 {
2677        return Err(SqlError::InvalidValue(
2678            "ts_lexize requires 2 arguments (config, word)".into(),
2679        ));
2680    }
2681    if args.iter().any(|v| v.is_null()) {
2682        return Ok(Value::Null);
2683    }
2684    let kind = match &args[0] {
2685        Value::Text(s) => crate::fts::TokenizerKind::from_name(s.as_str())?,
2686        v => {
2687            return Err(SqlError::TypeMismatch {
2688                expected: "TEXT (config)".into(),
2689                got: v.data_type().to_string(),
2690            })
2691        }
2692    };
2693    let word = match &args[1] {
2694        Value::Text(s) => s.as_str(),
2695        v => {
2696            return Err(SqlError::TypeMismatch {
2697                expected: "TEXT (word)".into(),
2698                got: v.data_type().to_string(),
2699            })
2700        }
2701    };
2702    crate::fts::fn_ts_lexize_with(kind, word)
2703}
2704
2705fn fts_numnode(args: &[Value]) -> Result<Value> {
2706    check_args("numnode", args, 1)?;
2707    if args[0].is_null() {
2708        return Ok(Value::Null);
2709    }
2710    let tsq = match &args[0] {
2711        Value::TsQuery(b) => b,
2712        v => {
2713            return Err(SqlError::TypeMismatch {
2714                expected: "TSQUERY".into(),
2715                got: v.data_type().to_string(),
2716            })
2717        }
2718    };
2719    crate::fts::fn_numnode(tsq)
2720}
2721
2722fn fts_setweight(args: &[Value]) -> Result<Value> {
2723    if args.len() == 3 {
2724        return Err(SqlError::Unsupported(
2725            "setweight(tsvector, char, text[]) — 3-arg selective form deferred to v0.17".into(),
2726        ));
2727    }
2728    check_args("setweight", args, 2)?;
2729    if args[0].is_null() || args[1].is_null() {
2730        return Ok(Value::Null);
2731    }
2732    let tsv = match &args[0] {
2733        Value::TsVector(b) => b,
2734        v => {
2735            return Err(SqlError::TypeMismatch {
2736                expected: "TSVECTOR".into(),
2737                got: v.data_type().to_string(),
2738            })
2739        }
2740    };
2741    let weight_text = match &args[1] {
2742        Value::Text(s) => s.as_str(),
2743        v => {
2744            return Err(SqlError::TypeMismatch {
2745                expected: "TEXT".into(),
2746                got: v.data_type().to_string(),
2747            })
2748        }
2749    };
2750    let weight = crate::fts::parse_weight_char(weight_text)?;
2751    crate::fts::fn_setweight(tsv, weight)
2752}
2753
2754/// Extract a timestamp (µs UTC) from a Value, coercing DATE → midnight.
2755fn ts_of(v: &Value) -> Result<i64> {
2756    match v {
2757        Value::Timestamp(t) => Ok(*t),
2758        Value::Date(d) => Ok(crate::datetime::date_to_ts(*d)),
2759        _ => Err(SqlError::TypeMismatch {
2760            expected: "TIMESTAMP or DATE".into(),
2761            got: v.data_type().to_string(),
2762        }),
2763    }
2764}
2765
2766fn int_arg(v: &Value, label: &str) -> Result<i64> {
2767    match v {
2768        Value::Integer(n) => Ok(*n),
2769        _ => Err(SqlError::TypeMismatch {
2770            expected: format!("INTEGER ({label})"),
2771            got: v.data_type().to_string(),
2772        }),
2773    }
2774}
2775
2776/// Extract (whole_seconds: u8, frac_micros: u32) from a numeric argument for MAKE_TIME-style calls.
2777fn real_sec_arg(v: &Value) -> Result<(u8, u32)> {
2778    match v {
2779        Value::Integer(n) => {
2780            if !(0..=60).contains(n) {
2781                return Err(SqlError::InvalidValue(format!("second out of range: {n}")));
2782            }
2783            Ok((*n as u8, 0))
2784        }
2785        Value::Real(r) => {
2786            let whole = r.trunc() as i64;
2787            if !(0..=60).contains(&whole) {
2788                return Err(SqlError::InvalidValue(format!("second out of range: {r}")));
2789            }
2790            let frac = ((r - whole as f64) * 1_000_000.0).round() as i64;
2791            Ok((whole as u8, frac.max(0) as u32))
2792        }
2793        _ => Err(SqlError::TypeMismatch {
2794            expected: "numeric seconds".into(),
2795            got: v.data_type().to_string(),
2796        }),
2797    }
2798}
2799
2800/// DATE_TRUNC with a non-UTC IANA zone: convert → truncate in that zone → convert back to UTC.
2801fn date_trunc_in_zone(unit: &str, ts_utc: i64, tz: &str) -> Result<Value> {
2802    use jiff::{tz::TimeZone, Timestamp as JTimestamp};
2803    let zone = TimeZone::get(tz).map_err(|e| SqlError::InvalidTimezone(format!("{tz}: {e}")))?;
2804    let ts = JTimestamp::from_microsecond(ts_utc)
2805        .map_err(|e| SqlError::InvalidValue(format!("ts: {e}")))?;
2806    let zoned = ts.to_zoned(zone.clone());
2807    let unit_lower = unit.to_ascii_lowercase();
2808    let rounded = match unit_lower.as_str() {
2809        "microseconds" => return Ok(Value::Timestamp(ts_utc)),
2810        "second" => zoned
2811            .start_of_day()
2812            .map_err(|e| SqlError::InvalidValue(format!("{e}")))?,
2813        _ => {
2814            let naive_ts = zoned.timestamp().as_microsecond();
2815            return crate::datetime::date_trunc(unit, &Value::Timestamp(naive_ts));
2816        }
2817    };
2818    Ok(Value::Timestamp(rounded.timestamp().as_microsecond()))
2819}
2820
2821fn check_args(name: &str, args: &[Value], expected: usize) -> Result<()> {
2822    if args.len() != expected {
2823        Err(SqlError::InvalidValue(format!(
2824            "{name} requires {expected} argument(s), got {}",
2825            args.len()
2826        )))
2827    } else {
2828        Ok(())
2829    }
2830}
2831
2832pub fn referenced_columns(expr: &Expr, columns: &[ColumnDef]) -> Vec<usize> {
2833    let mut indices = Vec::new();
2834    collect_column_refs(expr, columns, &mut indices);
2835    indices.sort_unstable();
2836    indices.dedup();
2837    indices
2838}
2839
2840fn collect_column_refs(expr: &Expr, columns: &[ColumnDef], out: &mut Vec<usize>) {
2841    match expr {
2842        Expr::Column(name) => {
2843            for (i, c) in columns.iter().enumerate() {
2844                if c.name == *name
2845                    || (c.name.len() > name.len()
2846                        && c.name.as_bytes()[c.name.len() - name.len() - 1] == b'.'
2847                        && c.name.ends_with(name.as_str()))
2848                {
2849                    out.push(i);
2850                    break;
2851                }
2852            }
2853        }
2854        Expr::QualifiedColumn { table, column } => {
2855            let mut found: Option<usize> = None;
2856            let mut bare_match: Option<usize> = None;
2857            let mut bare_count = 0usize;
2858            for (i, c) in columns.iter().enumerate() {
2859                if c.name.len() == table.len() + 1 + column.len()
2860                    && c.name.as_bytes()[table.len()] == b'.'
2861                    && c.name.starts_with(table.as_str())
2862                    && c.name.ends_with(column.as_str())
2863                {
2864                    found = Some(i);
2865                    break;
2866                }
2867                if c.name == *column {
2868                    bare_match = Some(i);
2869                    bare_count += 1;
2870                }
2871            }
2872            if let Some(idx) = found {
2873                out.push(idx);
2874            } else if bare_count == 1 {
2875                out.push(bare_match.unwrap());
2876            }
2877        }
2878        Expr::BinaryOp { left, right, .. } => {
2879            collect_column_refs(left, columns, out);
2880            collect_column_refs(right, columns, out);
2881        }
2882        Expr::UnaryOp { expr, .. } => {
2883            collect_column_refs(expr, columns, out);
2884        }
2885        Expr::IsNull(e) | Expr::IsNotNull(e) => {
2886            collect_column_refs(e, columns, out);
2887        }
2888        Expr::Function { args, .. } => {
2889            for arg in args {
2890                collect_column_refs(arg, columns, out);
2891            }
2892        }
2893        Expr::InSubquery { expr, .. } => {
2894            collect_column_refs(expr, columns, out);
2895        }
2896        Expr::InList { expr, list, .. } => {
2897            collect_column_refs(expr, columns, out);
2898            for item in list {
2899                collect_column_refs(item, columns, out);
2900            }
2901        }
2902        Expr::InSet { expr, .. } => {
2903            collect_column_refs(expr, columns, out);
2904        }
2905        Expr::Between {
2906            expr, low, high, ..
2907        } => {
2908            collect_column_refs(expr, columns, out);
2909            collect_column_refs(low, columns, out);
2910            collect_column_refs(high, columns, out);
2911        }
2912        Expr::Like {
2913            expr,
2914            pattern,
2915            escape,
2916            ..
2917        } => {
2918            collect_column_refs(expr, columns, out);
2919            collect_column_refs(pattern, columns, out);
2920            if let Some(esc) = escape {
2921                collect_column_refs(esc, columns, out);
2922            }
2923        }
2924        Expr::Case {
2925            operand,
2926            conditions,
2927            else_result,
2928        } => {
2929            if let Some(op) = operand {
2930                collect_column_refs(op, columns, out);
2931            }
2932            for (when, then) in conditions {
2933                collect_column_refs(when, columns, out);
2934                collect_column_refs(then, columns, out);
2935            }
2936            if let Some(e) = else_result {
2937                collect_column_refs(e, columns, out);
2938            }
2939        }
2940        Expr::Coalesce(args) => {
2941            for arg in args {
2942                collect_column_refs(arg, columns, out);
2943            }
2944        }
2945        Expr::Cast { expr, .. } => {
2946            collect_column_refs(expr, columns, out);
2947        }
2948        Expr::Collate { expr, .. } => {
2949            collect_column_refs(expr, columns, out);
2950        }
2951        Expr::WindowFunction { args, spec, .. } => {
2952            for arg in args {
2953                collect_column_refs(arg, columns, out);
2954            }
2955            for pb in &spec.partition_by {
2956                collect_column_refs(pb, columns, out);
2957            }
2958            for ob in &spec.order_by {
2959                collect_column_refs(&ob.expr, columns, out);
2960            }
2961        }
2962        Expr::Literal(_)
2963        | Expr::Parameter(_)
2964        | Expr::CountStar
2965        | Expr::Exists { .. }
2966        | Expr::ScalarSubquery(_)
2967        | Expr::TypedNullRecord(_) => {}
2968    }
2969}
2970
2971/// Check if an expression result is truthy (for WHERE/HAVING).
2972pub fn is_truthy(val: &Value) -> bool {
2973    match val {
2974        Value::Boolean(b) => *b,
2975        Value::Integer(i) => *i != 0,
2976        Value::Null => false,
2977        _ => true,
2978    }
2979}
2980
2981#[cfg(test)]
2982#[path = "eval_tests.rs"]
2983mod tests;