Skip to main content

citadel_sql/
eval.rs

1//! Expression evaluator with SQL three-valued logic.
2
3use rustc_hash::FxHashMap;
4
5use crate::error::{Result, SqlError};
6use crate::parser::{BinOp, Expr, UnaryOp};
7use crate::types::{ColumnDef, CompactString, DataType, Value};
8
9#[derive(Debug)]
10pub struct ColumnMap {
11    exact: FxHashMap<String, usize>,
12    short: FxHashMap<String, ShortMatch>,
13    collations: Vec<crate::types::Collation>,
14    has_non_binary_collation: bool,
15}
16
17#[derive(Clone, Debug)]
18enum ShortMatch {
19    Unique(usize),
20    Ambiguous,
21}
22
23impl Clone for ColumnMap {
24    fn clone(&self) -> Self {
25        Self {
26            exact: self.exact.clone(),
27            short: self.short.clone(),
28            collations: self.collations.clone(),
29            has_non_binary_collation: self.has_non_binary_collation,
30        }
31    }
32}
33
34impl ColumnMap {
35    pub fn new(columns: &[ColumnDef]) -> Self {
36        let mut exact = FxHashMap::with_capacity_and_hasher(columns.len() * 2, Default::default());
37        let mut short: FxHashMap<String, ShortMatch> =
38            FxHashMap::with_capacity_and_hasher(columns.len(), Default::default());
39        let mut collations = Vec::with_capacity(columns.len());
40        let mut has_non_binary_collation = false;
41
42        for (i, col) in columns.iter().enumerate() {
43            let lower = col.name.to_ascii_lowercase();
44            exact.insert(lower.clone(), i);
45
46            let unqualified = if let Some(dot) = lower.rfind('.') {
47                &lower[dot + 1..]
48            } else {
49                &lower
50            };
51            short
52                .entry(unqualified.to_string())
53                .and_modify(|e| *e = ShortMatch::Ambiguous)
54                .or_insert(ShortMatch::Unique(i));
55            collations.push(col.collation);
56            if col.collation != crate::types::Collation::Binary {
57                has_non_binary_collation = true;
58            }
59        }
60
61        Self {
62            exact,
63            short,
64            collations,
65            has_non_binary_collation,
66        }
67    }
68
69    pub(crate) fn collation_at(&self, idx: usize) -> crate::types::Collation {
70        self.collations
71            .get(idx)
72            .copied()
73            .unwrap_or(crate::types::Collation::Binary)
74    }
75
76    #[inline]
77    pub(crate) fn has_non_binary_collation(&self) -> bool {
78        self.has_non_binary_collation
79    }
80
81    pub(crate) fn resolve(&self, name: &str) -> Result<usize> {
82        if let Some(&idx) = self.exact.get(name) {
83            return Ok(idx);
84        }
85        match self.short.get(name) {
86            Some(ShortMatch::Unique(idx)) => Ok(*idx),
87            Some(ShortMatch::Ambiguous) => Err(SqlError::AmbiguousColumn(name.to_string())),
88            None => Err(SqlError::ColumnNotFound(name.to_string())),
89        }
90    }
91
92    pub(crate) fn resolve_qualified(&self, table: &str, column: &str) -> Result<usize> {
93        let qualified = format!("{table}.{column}");
94        if let Some(&idx) = self.exact.get(&qualified) {
95            return Ok(idx);
96        }
97        match self.short.get(column) {
98            Some(ShortMatch::Unique(idx)) => Ok(*idx),
99            _ => Err(SqlError::ColumnNotFound(format!("{table}.{column}"))),
100        }
101    }
102}
103
104pub struct EvalCtx<'a> {
105    pub col_map: &'a ColumnMap,
106    pub row: &'a [Value],
107    pub params: &'a [Value],
108    pub excluded: Option<ExcludedRow<'a>>,
109    pub old_new: Option<OldNewRows<'a>>,
110    pub session_tz: Option<jiff::tz::TimeZone>,
111}
112
113pub struct ExcludedRow<'a> {
114    pub col_map: &'a ColumnMap,
115    pub row: &'a [Value],
116}
117
118pub struct OldNewRows<'a> {
119    pub col_map: &'a ColumnMap,
120    pub old_row: Option<&'a [Value]>,
121    pub new_row: Option<&'a [Value]>,
122}
123
124impl<'a> EvalCtx<'a> {
125    pub fn new(col_map: &'a ColumnMap, row: &'a [Value]) -> Self {
126        Self {
127            col_map,
128            row,
129            params: &[],
130            excluded: None,
131            old_new: None,
132            session_tz: None,
133        }
134    }
135
136    pub fn with_session_tz(mut self, tz: Option<jiff::tz::TimeZone>) -> Self {
137        self.session_tz = tz;
138        self
139    }
140
141    pub fn with_params(col_map: &'a ColumnMap, row: &'a [Value], params: &'a [Value]) -> Self {
142        Self {
143            col_map,
144            row,
145            params,
146            excluded: None,
147            old_new: None,
148            session_tz: None,
149        }
150    }
151
152    pub fn with_excluded(
153        col_map: &'a ColumnMap,
154        row: &'a [Value],
155        excluded_col_map: &'a ColumnMap,
156        excluded_row: &'a [Value],
157    ) -> Self {
158        Self {
159            col_map,
160            row,
161            params: &[],
162            excluded: Some(ExcludedRow {
163                col_map: excluded_col_map,
164                row: excluded_row,
165            }),
166            old_new: None,
167            session_tz: None,
168        }
169    }
170
171    pub fn with_old_new(
172        col_map: &'a ColumnMap,
173        row: &'a [Value],
174        old_row: Option<&'a [Value]>,
175        new_row: Option<&'a [Value]>,
176    ) -> Self {
177        Self {
178            col_map,
179            row,
180            params: &[],
181            excluded: None,
182            old_new: Some(OldNewRows {
183                col_map,
184                old_row,
185                new_row,
186            }),
187            session_tz: None,
188        }
189    }
190}
191
192thread_local! {
193    static SCOPED_PARAMS: std::cell::Cell<(*const Value, usize)> =
194        const { std::cell::Cell::new((std::ptr::null(), 0)) };
195}
196
197pub fn with_scoped_params<R>(params: &[Value], f: impl FnOnce() -> R) -> R {
198    struct Guard((*const Value, usize));
199    impl Drop for Guard {
200        fn drop(&mut self) {
201            SCOPED_PARAMS.with(|slot| slot.set(self.0));
202        }
203    }
204    SCOPED_PARAMS.with(|slot| {
205        let prev = slot.get();
206        slot.set((params.as_ptr(), params.len()));
207        let _guard = Guard(prev);
208        f()
209    })
210}
211
212fn resolve_parameter(n: usize, ctx_params: &[Value]) -> Result<Value> {
213    if !ctx_params.is_empty() {
214        if n == 0 || n > ctx_params.len() {
215            return Err(SqlError::ParameterCountMismatch {
216                expected: n,
217                got: ctx_params.len(),
218            });
219        }
220        return Ok(ctx_params[n - 1].clone());
221    }
222    resolve_scoped_param(n)
223}
224
225pub fn resolve_scoped_param(n: usize) -> Result<Value> {
226    SCOPED_PARAMS.with(|slot| {
227        let (ptr, len) = slot.get();
228        if n == 0 || n > len {
229            return Err(SqlError::ParameterCountMismatch {
230                expected: n,
231                got: len,
232            });
233        }
234        // SAFETY: `with_scoped_params` keeps the slice alive for the body's run.
235        unsafe { Ok((*ptr.add(n - 1)).clone()) }
236    })
237}
238
239pub fn eval_expr(expr: &Expr, ctx: &EvalCtx) -> Result<Value> {
240    match expr {
241        Expr::Literal(v) => Ok(v.clone()),
242
243        Expr::Column(name) => {
244            let idx = ctx.col_map.resolve(name)?;
245            Ok(ctx.row[idx].clone())
246        }
247
248        Expr::QualifiedColumn { table, column } => {
249            if let Some(excluded) = ctx.excluded.as_ref() {
250                if table.eq_ignore_ascii_case("excluded") {
251                    let lowered = column.to_ascii_lowercase();
252                    let idx = excluded.col_map.resolve(&lowered)?;
253                    return Ok(excluded.row[idx].clone());
254                }
255            }
256            if let Some(on) = ctx.old_new.as_ref() {
257                if table.eq_ignore_ascii_case("old") {
258                    let lowered = column.to_ascii_lowercase();
259                    let idx = on.col_map.resolve(&lowered)?;
260                    return Ok(on.old_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
261                }
262                if table.eq_ignore_ascii_case("new") {
263                    let lowered = column.to_ascii_lowercase();
264                    let idx = on.col_map.resolve(&lowered)?;
265                    return Ok(on.new_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
266                }
267            }
268            // Trigger-body fallback: nested executor calls don't carry `ctx.old_new`.
269            if table.eq_ignore_ascii_case("old") || table.eq_ignore_ascii_case("new") {
270                if let Some(b) = crate::executor::triggers::current_bindings() {
271                    let lowered = column.to_ascii_lowercase();
272                    if table.eq_ignore_ascii_case("old") {
273                        let cm = ColumnMap::new(&b.old_columns);
274                        let idx = cm.resolve(&lowered)?;
275                        return Ok(b.old_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
276                    } else {
277                        let cm = ColumnMap::new(&b.new_columns);
278                        let idx = cm.resolve(&lowered)?;
279                        return Ok(b.new_row.map(|r| r[idx].clone()).unwrap_or(Value::Null));
280                    }
281                }
282            }
283            let idx = ctx.col_map.resolve_qualified(table, column)?;
284            Ok(ctx.row[idx].clone())
285        }
286
287        Expr::BinaryOp { left, op, right } => {
288            let lval = eval_expr(left, ctx)?;
289            let rval = eval_expr(right, ctx)?;
290            let needs_collation_check = ctx.col_map.has_non_binary_collation()
291                || matches!(left.as_ref(), Expr::Collate { .. })
292                || matches!(right.as_ref(), Expr::Collate { .. });
293            if needs_collation_check {
294                let coll = collation_of(left)
295                    .or_else(|| collation_of(right))
296                    .or_else(|| {
297                        column_collation(left, ctx).or_else(|| column_collation(right, ctx))
298                    });
299                if let Some(c) = coll {
300                    if c != crate::types::Collation::Binary {
301                        if let Some(b) = eval_text_compare(&lval, *op, &rval, c) {
302                            return Ok(Value::Boolean(b));
303                        }
304                    }
305                }
306            }
307            eval_binary_op(&lval, *op, &rval)
308        }
309
310        Expr::UnaryOp { op, expr } => {
311            let val = eval_expr(expr, ctx)?;
312            eval_unary_op(*op, &val)
313        }
314
315        Expr::IsNull(e) => {
316            let val = eval_expr(e, ctx)?;
317            Ok(Value::Boolean(val.is_null()))
318        }
319
320        Expr::IsNotNull(e) => {
321            let val = eval_expr(e, ctx)?;
322            Ok(Value::Boolean(!val.is_null()))
323        }
324
325        Expr::Function { name, args, .. } => eval_scalar_function(name, args, ctx),
326
327        Expr::CountStar => Err(SqlError::Unsupported(
328            "COUNT(*) in non-aggregate context".into(),
329        )),
330
331        Expr::InList {
332            expr: e,
333            list,
334            negated,
335        } => {
336            let lhs = eval_expr(e, ctx)?;
337            eval_in_values(&lhs, list, ctx, *negated)
338        }
339
340        Expr::InSet {
341            expr: e,
342            values,
343            has_null,
344            negated,
345        } => {
346            let lhs = eval_expr(e, ctx)?;
347            eval_in_set(&lhs, values, *has_null, *negated)
348        }
349
350        Expr::Between {
351            expr: e,
352            low,
353            high,
354            negated,
355        } => {
356            let val = eval_expr(e, ctx)?;
357            let lo = eval_expr(low, ctx)?;
358            let hi = eval_expr(high, ctx)?;
359            eval_between(&val, &lo, &hi, *negated)
360        }
361
362        Expr::Like {
363            expr: e,
364            pattern,
365            escape,
366            negated,
367        } => {
368            let val = eval_expr(e, ctx)?;
369            let pat = eval_expr(pattern, ctx)?;
370            let esc = escape.as_ref().map(|e| eval_expr(e, ctx)).transpose()?;
371            eval_like(&val, &pat, esc.as_ref(), *negated)
372        }
373
374        Expr::Case {
375            operand,
376            conditions,
377            else_result,
378        } => eval_case(operand.as_deref(), conditions, else_result.as_deref(), ctx),
379
380        Expr::Coalesce(args) => {
381            for arg in args {
382                let val = eval_expr(arg, ctx)?;
383                if !val.is_null() {
384                    return Ok(val);
385                }
386            }
387            Ok(Value::Null)
388        }
389
390        Expr::Cast { expr: e, data_type } => {
391            let val = eval_expr(e, ctx)?;
392            eval_cast(&val, *data_type)
393        }
394
395        Expr::Collate { expr: e, .. } => eval_expr(e, ctx),
396
397        Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => Err(
398            SqlError::Unsupported("subquery not materialized (internal error)".into()),
399        ),
400
401        Expr::Parameter(n) => resolve_parameter(*n, ctx.params),
402
403        Expr::WindowFunction { .. } => Err(SqlError::Unsupported(
404            "window functions are only allowed in SELECT columns".into(),
405        )),
406
407        Expr::TypedNullRecord(_) => Ok(Value::Null),
408
409        Expr::ArrayLiteral(elems) => {
410            let mut out = Vec::with_capacity(elems.len());
411            for e in elems {
412                out.push(eval_expr(e, ctx)?);
413            }
414            Ok(Value::Array(std::sync::Arc::new(out)))
415        }
416
417        Expr::Quantified {
418            left,
419            op,
420            quantifier,
421            right,
422        } => eval_quantified(left, *op, *quantifier, right, ctx),
423    }
424}
425
426fn eval_quantified(
427    left: &Expr,
428    op: crate::parser::BinOp,
429    quantifier: crate::parser::Quantifier,
430    right: &crate::parser::QuantifiedRhs,
431    ctx: &EvalCtx,
432) -> Result<Value> {
433    use crate::parser::{QuantifiedRhs, Quantifier};
434    let lhs = eval_expr(left, ctx)?;
435    let elems: Vec<Value> = match right {
436        QuantifiedRhs::Array(e) => match eval_expr(e, ctx)? {
437            Value::Array(a) => (*a).clone(),
438            Value::Null => return Ok(Value::Null),
439            other => {
440                return Err(SqlError::TypeMismatch {
441                    expected: "ARRAY".into(),
442                    got: other.data_type().to_string(),
443                });
444            }
445        },
446        QuantifiedRhs::Subquery(_) => {
447            return Err(SqlError::Unsupported(
448                "ANY/ALL subquery not materialized (internal error)".into(),
449            ));
450        }
451    };
452
453    if lhs.is_null() {
454        return if elems.is_empty() {
455            match quantifier {
456                Quantifier::Any => Ok(Value::Boolean(false)),
457                Quantifier::All => Ok(Value::Boolean(true)),
458            }
459        } else {
460            Ok(Value::Null)
461        };
462    }
463
464    let mut any_unknown = false;
465    let mut any_match = false;
466    let mut any_mismatch = false;
467    for elem in &elems {
468        if elem.is_null() {
469            any_unknown = true;
470            continue;
471        }
472        let result = eval_binary_compare(&lhs, op, elem)?;
473        match result {
474            Value::Boolean(true) => any_match = true,
475            Value::Boolean(false) => any_mismatch = true,
476            Value::Null => any_unknown = true,
477            _ => {
478                return Err(SqlError::TypeMismatch {
479                    expected: "BOOLEAN".into(),
480                    got: result.data_type().to_string(),
481                });
482            }
483        }
484    }
485
486    match quantifier {
487        Quantifier::Any => {
488            if any_match {
489                Ok(Value::Boolean(true))
490            } else if any_unknown {
491                Ok(Value::Null)
492            } else {
493                Ok(Value::Boolean(false))
494            }
495        }
496        Quantifier::All => {
497            if any_mismatch {
498                Ok(Value::Boolean(false))
499            } else if any_unknown {
500                Ok(Value::Null)
501            } else {
502                Ok(Value::Boolean(true))
503            }
504        }
505    }
506}
507
508fn eval_binary_compare(left: &Value, op: crate::parser::BinOp, right: &Value) -> Result<Value> {
509    use crate::parser::BinOp;
510    if left.is_null() || right.is_null() {
511        return Ok(Value::Null);
512    }
513    let cmp = match (left, right) {
514        (Value::Text(a), Value::Text(b)) => Some(a.cmp(b)),
515        _ => left.partial_cmp(right),
516    };
517    let Some(cmp) = cmp else {
518        return Ok(Value::Null);
519    };
520    use std::cmp::Ordering;
521    let result = match op {
522        BinOp::Eq => cmp == Ordering::Equal,
523        BinOp::NotEq => cmp != Ordering::Equal,
524        BinOp::Lt => cmp == Ordering::Less,
525        BinOp::Gt => cmp == Ordering::Greater,
526        BinOp::LtEq => cmp != Ordering::Greater,
527        BinOp::GtEq => cmp != Ordering::Less,
528        _ => {
529            return Err(SqlError::Unsupported(format!(
530                "ANY/ALL comparison op {op:?}"
531            )));
532        }
533    };
534    Ok(Value::Boolean(result))
535}
536
537fn collation_of(expr: &Expr) -> Option<crate::types::Collation> {
538    match expr {
539        Expr::Collate { collation, .. } => Some(*collation),
540        _ => None,
541    }
542}
543
544fn column_collation(expr: &Expr, ctx: &EvalCtx<'_>) -> Option<crate::types::Collation> {
545    match expr {
546        Expr::Column(name) => ctx
547            .col_map
548            .resolve(name)
549            .ok()
550            .map(|i| ctx.col_map.collation_at(i)),
551        Expr::QualifiedColumn { table, column } => ctx
552            .col_map
553            .resolve_qualified(table, column)
554            .ok()
555            .map(|i| ctx.col_map.collation_at(i)),
556        _ => None,
557    }
558}
559
560fn eval_text_compare(
561    left: &Value,
562    op: BinOp,
563    right: &Value,
564    coll: crate::types::Collation,
565) -> Option<bool> {
566    let (a, b) = match (left, right) {
567        (Value::Null, _) | (_, Value::Null) => return None,
568        (Value::Text(a), Value::Text(b)) => (a.as_str(), b.as_str()),
569        _ => return None,
570    };
571    let ord = coll.cmp_text(a, b);
572    Some(match op {
573        BinOp::Eq => ord == std::cmp::Ordering::Equal,
574        BinOp::NotEq => ord != std::cmp::Ordering::Equal,
575        BinOp::Lt => ord == std::cmp::Ordering::Less,
576        BinOp::Gt => ord == std::cmp::Ordering::Greater,
577        BinOp::LtEq => ord != std::cmp::Ordering::Greater,
578        BinOp::GtEq => ord != std::cmp::Ordering::Less,
579        _ => return None,
580    })
581}
582
583pub fn eval_binary_op_public(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
584    eval_binary_op(left, op, right)
585}
586
587fn eval_binary_op(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
588    match op {
589        BinOp::And => return eval_and(left, right),
590        BinOp::Or => return eval_or(left, right),
591        _ => {}
592    }
593
594    if left.is_null() || right.is_null() {
595        return Ok(Value::Null);
596    }
597
598    if let Some(res) = eval_temporal_op(left, op, right) {
599        return res;
600    }
601
602    match op {
603        BinOp::Eq => Ok(Value::Boolean(left == right)),
604        BinOp::NotEq => Ok(Value::Boolean(left != right)),
605        BinOp::Lt => Ok(Value::Boolean(left < right)),
606        BinOp::Gt => Ok(Value::Boolean(left > right)),
607        BinOp::LtEq => Ok(Value::Boolean(left <= right)),
608        BinOp::GtEq => Ok(Value::Boolean(left >= right)),
609        BinOp::Add => eval_arithmetic(left, right, i64::checked_add, |a, b| a + b),
610        BinOp::Sub => match left {
611            Value::Json(_) | Value::Jsonb(_) => crate::json::op_delete_one(left, right),
612            _ => eval_arithmetic(left, right, i64::checked_sub, |a, b| a - b),
613        },
614        BinOp::Mul => eval_arithmetic(left, right, i64::checked_mul, |a, b| a * b),
615        BinOp::Div => {
616            match right {
617                Value::Integer(0) => return Err(SqlError::DivisionByZero),
618                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
619                _ => {}
620            }
621            eval_arithmetic(left, right, i64::checked_div, |a, b| a / b)
622        }
623        BinOp::Mod => {
624            match right {
625                Value::Integer(0) => return Err(SqlError::DivisionByZero),
626                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
627                _ => {}
628            }
629            eval_arithmetic(left, right, i64::checked_rem, |a, b| a % b)
630        }
631        BinOp::Concat => match (left, right) {
632            (Value::TsVector(a), Value::TsVector(b)) => crate::fts::op_concat(a, b),
633            (Value::Json(_) | Value::Jsonb(_), _) | (_, Value::Json(_) | Value::Jsonb(_)) => {
634                crate::json::op_concat(left, right)
635            }
636            _ => {
637                let ls = value_to_text(left);
638                let rs = value_to_text(right);
639                Ok(Value::Text(format!("{ls}{rs}").into()))
640            }
641        },
642        BinOp::JsonGet
643        | BinOp::JsonGetText
644        | BinOp::JsonPath
645        | BinOp::JsonPathText
646        | BinOp::JsonContains
647        | BinOp::JsonContainedBy
648        | BinOp::JsonHasKey
649        | BinOp::JsonHasAnyKey
650        | BinOp::JsonHasAllKeys
651        | BinOp::JsonDeletePath
652        | BinOp::JsonPathExists
653        | BinOp::JsonPathMatch
654        | BinOp::JsonPathExistsTz
655        | BinOp::JsonPathMatchTz => eval_json_binary_op(left, op, right),
656        BinOp::VectorL2 => eval_vector_distance(left, right, VectorMetric::L2),
657        BinOp::VectorInner => eval_vector_distance(left, right, VectorMetric::Inner),
658        BinOp::VectorCosine => eval_vector_distance(left, right, VectorMetric::Cosine),
659        BinOp::And | BinOp::Or => unreachable!(),
660    }
661}
662
663#[cold]
664fn eval_json_binary_op(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
665    match op {
666        BinOp::JsonGet => crate::json::op_get(left, right),
667        BinOp::JsonGetText => crate::json::op_get_text(left, right),
668        BinOp::JsonPath => crate::json::op_path(left, right),
669        BinOp::JsonPathText => crate::json::op_path_text(left, right),
670        BinOp::JsonContains => crate::json::op_contains(left, right),
671        BinOp::JsonContainedBy => crate::json::op_contained_by(left, right),
672        BinOp::JsonHasKey => crate::json::op_has_key(left, right),
673        BinOp::JsonHasAnyKey => crate::json::op_has_any_key(left, right),
674        BinOp::JsonHasAllKeys => crate::json::op_has_all_keys(left, right),
675        BinOp::JsonDeletePath => crate::json::op_delete_path(left, right),
676        BinOp::JsonPathExists => crate::json::op_path_exists(left, right),
677        BinOp::JsonPathMatch => eval_at_at(left, right),
678        BinOp::JsonPathExistsTz => {
679            crate::json::fn_jsonb_path_exists_tz(&[left.clone(), right.clone()])
680        }
681        BinOp::JsonPathMatchTz => {
682            crate::json::fn_jsonb_path_match_tz(&[left.clone(), right.clone()])
683        }
684        BinOp::VectorL2 => eval_vector_distance(left, right, VectorMetric::L2),
685        BinOp::VectorInner => eval_vector_distance(left, right, VectorMetric::Inner),
686        BinOp::VectorCosine => eval_vector_distance(left, right, VectorMetric::Cosine),
687        _ => unreachable!(),
688    }
689}
690
691#[derive(Copy, Clone)]
692enum VectorMetric {
693    L2,
694    Inner,
695    Cosine,
696}
697
698fn eval_vector_distance(left: &Value, right: &Value, metric: VectorMetric) -> Result<Value> {
699    if left.is_null() || right.is_null() {
700        return Ok(Value::Null);
701    }
702    let (a, b) = match (left, right) {
703        (Value::Vector(a), Value::Vector(b)) => (a.as_ref(), b.as_ref()),
704        _ => {
705            return Err(SqlError::TypeMismatch {
706                expected: "VECTOR".into(),
707                got: format!("{} vs {}", left.data_type(), right.data_type()),
708            });
709        }
710    };
711    if a.len() != b.len() {
712        return Err(SqlError::InvalidValue(format!(
713            "vector dimension mismatch: {} vs {}",
714            a.len(),
715            b.len()
716        )));
717    }
718    let d = match metric {
719        VectorMetric::L2 => {
720            let mut sum = 0.0f64;
721            for (x, y) in a.iter().zip(b.iter()) {
722                let diff = (*x as f64) - (*y as f64);
723                sum += diff * diff;
724            }
725            sum.sqrt()
726        }
727        VectorMetric::Inner => {
728            let mut sum = 0.0f64;
729            for (x, y) in a.iter().zip(b.iter()) {
730                sum += (*x as f64) * (*y as f64);
731            }
732            -sum
733        }
734        VectorMetric::Cosine => {
735            let mut dot = 0.0f64;
736            let mut na = 0.0f64;
737            let mut nb = 0.0f64;
738            for (x, y) in a.iter().zip(b.iter()) {
739                let xf = *x as f64;
740                let yf = *y as f64;
741                dot += xf * yf;
742                na += xf * xf;
743                nb += yf * yf;
744            }
745            let denom = na.sqrt() * nb.sqrt();
746            if denom == 0.0 {
747                return Ok(Value::Null);
748            }
749            1.0 - dot / denom
750        }
751    };
752    Ok(Value::Real(d))
753}
754
755fn eval_at_at(left: &Value, right: &Value) -> Result<Value> {
756    use crate::types::DataType as D;
757    if left.is_null() || right.is_null() {
758        return Ok(Value::Null);
759    }
760    match (left.data_type(), right.data_type()) {
761        (D::Json | D::Jsonb, D::Text) => crate::json::op_path_match(left, right),
762        (D::TsVector, D::TsQuery) => match (left, right) {
763            (Value::TsVector(v), Value::TsQuery(q)) => crate::fts::op_match(v, q),
764            _ => unreachable!(),
765        },
766        (D::TsQuery, D::TsVector) => match (left, right) {
767            (Value::TsQuery(q), Value::TsVector(v)) => crate::fts::op_match(v, q),
768            _ => unreachable!(),
769        },
770        (D::Text, D::TsQuery) => {
771            let s = match left {
772                Value::Text(s) => s.as_str(),
773                _ => unreachable!(),
774            };
775            let lhs = crate::fts::fn_to_tsvector(s)?;
776            eval_at_at(&lhs, right)
777        }
778        (D::TsVector, D::Text) => {
779            let s = match right {
780                Value::Text(s) => s.as_str(),
781                _ => unreachable!(),
782            };
783            let rhs = crate::fts::fn_plainto_tsquery(s)?;
784            eval_at_at(left, &rhs)
785        }
786        (D::Text, D::Text) => {
787            let ls = match left {
788                Value::Text(s) => s.as_str(),
789                _ => unreachable!(),
790            };
791            let rs = match right {
792                Value::Text(s) => s.as_str(),
793                _ => unreachable!(),
794            };
795            let lhs = crate::fts::fn_to_tsvector(ls)?;
796            let rhs = crate::fts::fn_plainto_tsquery(rs)?;
797            eval_at_at(&lhs, &rhs)
798        }
799        (lt, rt) => Err(SqlError::TypeMismatch {
800            expected: "JSONB @@ text, tsvector @@ tsquery".into(),
801            got: format!("{lt} @@ {rt}"),
802        }),
803    }
804}
805
806/// Returns `Some` when `(left, op, right)` is a temporal operation; `None` to fall through.
807fn eval_temporal_op(left: &Value, op: BinOp, right: &Value) -> Option<Result<Value>> {
808    use crate::datetime as dt;
809    use std::cmp::Ordering;
810
811    let is_temporal = |v: &Value| {
812        matches!(
813            v,
814            Value::Date(_) | Value::Time(_) | Value::Timestamp(_) | Value::Interval { .. }
815        )
816    };
817    if !is_temporal(left) && !is_temporal(right) {
818        return None;
819    }
820    if matches!(op, BinOp::Add | BinOp::Sub)
821        && ((is_temporal(left) && matches!(right, Value::Real(_)))
822            || (matches!(left, Value::Real(_)) && is_temporal(right)))
823    {
824        return Some(Err(SqlError::TypeMismatch {
825            expected: "INTEGER or INTERVAL for date/time arithmetic (use CAST for REAL)".into(),
826            got: format!("{} and {}", left.data_type(), right.data_type()),
827        }));
828    }
829
830    match (left, op, right) {
831        (Value::Date(d), BinOp::Add, Value::Integer(n))
832        | (Value::Integer(n), BinOp::Add, Value::Date(d)) => {
833            Some(dt::add_days_to_date(*d, *n).map(Value::Date))
834        }
835        (Value::Date(d), BinOp::Sub, Value::Integer(n)) => {
836            Some(dt::add_days_to_date(*d, -*n).map(Value::Date))
837        }
838        (Value::Date(a), BinOp::Sub, Value::Date(b)) => {
839            Some(Ok(Value::Integer(*a as i64 - *b as i64)))
840        }
841        // DATE ± INTERVAL → TIMESTAMP (PG rule).
842        (
843            Value::Date(d),
844            BinOp::Add,
845            Value::Interval {
846                months,
847                days,
848                micros,
849            },
850        )
851        | (
852            Value::Interval {
853                months,
854                days,
855                micros,
856            },
857            BinOp::Add,
858            Value::Date(d),
859        ) => Some(dt::add_interval_to_date(*d, *months, *days, *micros).map(Value::Timestamp)),
860        (
861            Value::Date(d),
862            BinOp::Sub,
863            Value::Interval {
864                months,
865                days,
866                micros,
867            },
868        ) => Some(dt::add_interval_to_date(*d, -*months, -*days, -*micros).map(Value::Timestamp)),
869        (
870            Value::Timestamp(t),
871            BinOp::Add,
872            Value::Interval {
873                months,
874                days,
875                micros,
876            },
877        )
878        | (
879            Value::Interval {
880                months,
881                days,
882                micros,
883            },
884            BinOp::Add,
885            Value::Timestamp(t),
886        ) => Some(dt::add_interval_to_timestamp(*t, *months, *days, *micros).map(Value::Timestamp)),
887        (
888            Value::Timestamp(t),
889            BinOp::Sub,
890            Value::Interval {
891                months,
892                days,
893                micros,
894            },
895        ) => Some(
896            dt::add_interval_to_timestamp(*t, -*months, -*days, -*micros).map(Value::Timestamp),
897        ),
898        (Value::Timestamp(a), BinOp::Sub, Value::Timestamp(b)) => {
899            let (days, micros) = dt::subtract_timestamps(*a, *b);
900            Some(Ok(Value::Interval {
901                months: 0,
902                days,
903                micros,
904            }))
905        }
906        (
907            Value::Time(t),
908            BinOp::Add,
909            Value::Interval {
910                months,
911                days,
912                micros,
913            },
914        ) => Some(dt::add_interval_to_time(*t, *months, *days, *micros).map(Value::Time)),
915        (
916            Value::Time(t),
917            BinOp::Sub,
918            Value::Interval {
919                months,
920                days,
921                micros,
922            },
923        ) => Some(dt::add_interval_to_time(*t, -*months, -*days, -*micros).map(Value::Time)),
924        (Value::Time(a), BinOp::Sub, Value::Time(b)) => Some(Ok(Value::Interval {
925            months: 0,
926            days: 0,
927            micros: *a - *b,
928        })),
929        (
930            Value::Interval {
931                months: am,
932                days: ad,
933                micros: au,
934            },
935            BinOp::Add,
936            Value::Interval {
937                months: bm,
938                days: bd,
939                micros: bu,
940            },
941        ) => Some(Ok(Value::Interval {
942            months: am.saturating_add(*bm),
943            days: ad.saturating_add(*bd),
944            micros: au.saturating_add(*bu),
945        })),
946        (
947            Value::Interval {
948                months: am,
949                days: ad,
950                micros: au,
951            },
952            BinOp::Sub,
953            Value::Interval {
954                months: bm,
955                days: bd,
956                micros: bu,
957            },
958        ) => Some(Ok(Value::Interval {
959            months: am.saturating_sub(*bm),
960            days: ad.saturating_sub(*bd),
961            micros: au.saturating_sub(*bu),
962        })),
963        (
964            Value::Interval {
965                months,
966                days,
967                micros,
968            },
969            BinOp::Mul,
970            Value::Integer(n),
971        )
972        | (
973            Value::Integer(n),
974            BinOp::Mul,
975            Value::Interval {
976                months,
977                days,
978                micros,
979            },
980        ) => {
981            let n32 = (*n).clamp(i32::MIN as i64, i32::MAX as i64) as i32;
982            Some(Ok(Value::Interval {
983                months: months.saturating_mul(n32),
984                days: days.saturating_mul(n32),
985                micros: micros.saturating_mul(*n),
986            }))
987        }
988        // INTERVAL * REAL — fractional months → days, fractional days → micros (PG).
989        (
990            Value::Interval {
991                months,
992                days,
993                micros,
994            },
995            BinOp::Mul,
996            Value::Real(r),
997        )
998        | (
999            Value::Real(r),
1000            BinOp::Mul,
1001            Value::Interval {
1002                months,
1003                days,
1004                micros,
1005            },
1006        ) => Some(Ok(scale_interval_by_real(*months, *days, *micros, *r))),
1007        (
1008            Value::Interval {
1009                months,
1010                days,
1011                micros,
1012            },
1013            BinOp::Div,
1014            Value::Integer(n),
1015        ) if *n != 0 => Some(Ok(Value::Interval {
1016            months: (*months as i64 / *n) as i32,
1017            days: (*days as i64 / *n) as i32,
1018            micros: *micros / *n,
1019        })),
1020        (
1021            Value::Interval {
1022                months,
1023                days,
1024                micros,
1025            },
1026            BinOp::Div,
1027            Value::Real(r),
1028        ) if *r != 0.0 => Some(Ok(scale_interval_by_real(*months, *days, *micros, 1.0 / r))),
1029        // PG-normalized INTERVAL compare: 30-day month, 24-hour day.
1030        (
1031            Value::Interval {
1032                months: am,
1033                days: ad,
1034                micros: au,
1035            },
1036            op,
1037            Value::Interval {
1038                months: bm,
1039                days: bd,
1040                micros: bu,
1041            },
1042        ) if matches!(
1043            op,
1044            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::Gt | BinOp::LtEq | BinOp::GtEq
1045        ) =>
1046        {
1047            let ord = dt::pg_normalized_interval_cmp((*am, *ad, *au), (*bm, *bd, *bu));
1048            let b = match op {
1049                BinOp::Eq => ord == Ordering::Equal,
1050                BinOp::NotEq => ord != Ordering::Equal,
1051                BinOp::Lt => ord == Ordering::Less,
1052                BinOp::Gt => ord == Ordering::Greater,
1053                BinOp::LtEq => ord != Ordering::Greater,
1054                BinOp::GtEq => ord != Ordering::Less,
1055                _ => unreachable!(),
1056            };
1057            Some(Ok(Value::Boolean(b)))
1058        }
1059        // PG rejects TIMESTAMP ± INTEGER; require CAST to INTERVAL.
1060        (Value::Timestamp(_), BinOp::Add | BinOp::Sub, Value::Integer(_))
1061        | (Value::Integer(_), BinOp::Add, Value::Timestamp(_)) => {
1062            Some(Err(SqlError::TypeMismatch {
1063                expected: "INTERVAL (use CAST or explicit unit)".into(),
1064                got: format!("{} and {}", left.data_type(), right.data_type()),
1065            }))
1066        }
1067        // Comparison with one temporal side: coerce the literal to that type.
1068        (l, op, r)
1069            if matches!(
1070                op,
1071                BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::Gt | BinOp::LtEq | BinOp::GtEq
1072            ) =>
1073        {
1074            temporal_compare(l, op, r)
1075        }
1076        _ => None,
1077    }
1078}
1079
1080/// Compares values where one side is temporal, coercing the other to match.
1081fn temporal_compare(left: &Value, op: BinOp, right: &Value) -> Option<Result<Value>> {
1082    let (a, b) = coerce_temporal_pair(left, right)?;
1083    let ord = a.cmp(&b);
1084    use std::cmp::Ordering;
1085    let result = match op {
1086        BinOp::Eq => ord == Ordering::Equal,
1087        BinOp::NotEq => ord != Ordering::Equal,
1088        BinOp::Lt => ord == Ordering::Less,
1089        BinOp::Gt => ord == Ordering::Greater,
1090        BinOp::LtEq => ord != Ordering::Greater,
1091        BinOp::GtEq => ord != Ordering::Less,
1092        _ => return None,
1093    };
1094    Some(Ok(Value::Boolean(result)))
1095}
1096
1097/// Coerces a TEXT/INTEGER (or DATE/TIMESTAMP) operand to match the temporal side.
1098fn coerce_temporal_pair(left: &Value, right: &Value) -> Option<(Value, Value)> {
1099    use crate::types::DataType;
1100    let temporal_type = |v: &Value| match v {
1101        Value::Date(_) => Some(DataType::Date),
1102        Value::Time(_) => Some(DataType::Time),
1103        Value::Timestamp(_) => Some(DataType::Timestamp),
1104        Value::Interval { .. } => Some(DataType::Interval),
1105        _ => None,
1106    };
1107    match (temporal_type(left), temporal_type(right)) {
1108        (Some(DataType::Date), Some(DataType::Timestamp))
1109        | (Some(DataType::Timestamp), Some(DataType::Date)) => Some((
1110            left.clone().coerce_into(DataType::Timestamp)?,
1111            right.clone().coerce_into(DataType::Timestamp)?,
1112        )),
1113        (Some(_), Some(_)) => None,
1114        (Some(t), None) => {
1115            let coerced = right.clone().coerce_into(t)?;
1116            Some((left.clone(), coerced))
1117        }
1118        (None, Some(t)) => {
1119            let coerced = left.clone().coerce_into(t)?;
1120            Some((coerced, right.clone()))
1121        }
1122        (None, None) => None,
1123    }
1124}
1125
1126/// PG fractional-propagation: month frac → days (×30), day frac → micros (×86.4G).
1127fn scale_interval_by_real(months: i32, days: i32, micros: i64, factor: f64) -> Value {
1128    let raw_months = months as f64 * factor;
1129    let whole_months = raw_months.trunc() as i64;
1130    let frac_months = raw_months - whole_months as f64;
1131    let months_frac_as_days = frac_months * 30.0;
1132
1133    let raw_days = days as f64 * factor + months_frac_as_days;
1134    let whole_days = raw_days.trunc() as i64;
1135    let frac_days = raw_days - whole_days as f64;
1136    let days_frac_as_micros = (frac_days * crate::datetime::MICROS_PER_DAY as f64).round() as i64;
1137
1138    let raw_micros = (micros as f64 * factor).round() as i64;
1139    let total_micros = raw_micros.saturating_add(days_frac_as_micros);
1140
1141    let clamp_i32 = |n: i64| n.clamp(i32::MIN as i64, i32::MAX as i64) as i32;
1142    Value::Interval {
1143        months: clamp_i32(whole_months),
1144        days: clamp_i32(whole_days),
1145        micros: total_micros,
1146    }
1147}
1148
1149/// SQL three-valued AND: NULL AND false = false, NULL AND true = NULL
1150fn eval_and(left: &Value, right: &Value) -> Result<Value> {
1151    let l = to_bool_or_null(left)?;
1152    let r = to_bool_or_null(right)?;
1153    match (l, r) {
1154        (Some(false), _) | (_, Some(false)) => Ok(Value::Boolean(false)),
1155        (Some(true), Some(true)) => Ok(Value::Boolean(true)),
1156        _ => Ok(Value::Null),
1157    }
1158}
1159
1160/// SQL three-valued OR: NULL OR true = true, NULL OR false = NULL
1161fn eval_or(left: &Value, right: &Value) -> Result<Value> {
1162    let l = to_bool_or_null(left)?;
1163    let r = to_bool_or_null(right)?;
1164    match (l, r) {
1165        (Some(true), _) | (_, Some(true)) => Ok(Value::Boolean(true)),
1166        (Some(false), Some(false)) => Ok(Value::Boolean(false)),
1167        _ => Ok(Value::Null),
1168    }
1169}
1170
1171fn to_bool_or_null(val: &Value) -> Result<Option<bool>> {
1172    match val {
1173        Value::Boolean(b) => Ok(Some(*b)),
1174        Value::Null => Ok(None),
1175        Value::Integer(i) => Ok(Some(*i != 0)),
1176        _ => Err(SqlError::TypeMismatch {
1177            expected: "BOOLEAN".into(),
1178            got: format!("{}", val.data_type()),
1179        }),
1180    }
1181}
1182
1183fn eval_arithmetic(
1184    left: &Value,
1185    right: &Value,
1186    int_op: fn(i64, i64) -> Option<i64>,
1187    real_op: fn(f64, f64) -> f64,
1188) -> Result<Value> {
1189    match (left, right) {
1190        (Value::Integer(a), Value::Integer(b)) => int_op(*a, *b)
1191            .map(Value::Integer)
1192            .ok_or(SqlError::IntegerOverflow),
1193        (Value::Real(a), Value::Real(b)) => Ok(Value::Real(real_op(*a, *b))),
1194        (Value::Integer(a), Value::Real(b)) => Ok(Value::Real(real_op(*a as f64, *b))),
1195        (Value::Real(a), Value::Integer(b)) => Ok(Value::Real(real_op(*a, *b as f64))),
1196        _ => Err(SqlError::TypeMismatch {
1197            expected: "numeric".into(),
1198            got: format!("{} and {}", left.data_type(), right.data_type()),
1199        }),
1200    }
1201}
1202
1203fn eval_in_values(lhs: &Value, list: &[Expr], ctx: &EvalCtx, negated: bool) -> Result<Value> {
1204    if list.is_empty() {
1205        return Ok(Value::Boolean(negated));
1206    }
1207    if lhs.is_null() {
1208        return Ok(Value::Null);
1209    }
1210    let mut has_null = false;
1211    for item in list {
1212        let rhs = eval_expr(item, ctx)?;
1213        if rhs.is_null() {
1214            has_null = true;
1215        } else if lhs == &rhs {
1216            return Ok(Value::Boolean(!negated));
1217        }
1218    }
1219    if has_null {
1220        Ok(Value::Null)
1221    } else {
1222        Ok(Value::Boolean(negated))
1223    }
1224}
1225
1226fn eval_in_set(
1227    lhs: &Value,
1228    values: &rustc_hash::FxHashSet<Value>,
1229    has_null: bool,
1230    negated: bool,
1231) -> Result<Value> {
1232    if values.is_empty() && !has_null {
1233        return Ok(Value::Boolean(negated));
1234    }
1235    if lhs.is_null() {
1236        return Ok(Value::Null);
1237    }
1238    if values.contains(lhs) {
1239        return Ok(Value::Boolean(!negated));
1240    }
1241    if has_null {
1242        Ok(Value::Null)
1243    } else {
1244        Ok(Value::Boolean(negated))
1245    }
1246}
1247
1248fn eval_unary_op(op: UnaryOp, val: &Value) -> Result<Value> {
1249    if val.is_null() {
1250        return Ok(Value::Null);
1251    }
1252    match op {
1253        UnaryOp::Neg => match val {
1254            Value::Integer(i) => i
1255                .checked_neg()
1256                .map(Value::Integer)
1257                .ok_or(SqlError::IntegerOverflow),
1258            Value::Real(r) => Ok(Value::Real(-r)),
1259            Value::Interval {
1260                months,
1261                days,
1262                micros,
1263            } => {
1264                let m = months.checked_neg().ok_or(SqlError::IntegerOverflow)?;
1265                let d = days.checked_neg().ok_or(SqlError::IntegerOverflow)?;
1266                let u = micros.checked_neg().ok_or(SqlError::IntegerOverflow)?;
1267                Ok(Value::Interval {
1268                    months: m,
1269                    days: d,
1270                    micros: u,
1271                })
1272            }
1273            _ => Err(SqlError::TypeMismatch {
1274                expected: "numeric or INTERVAL".into(),
1275                got: format!("{}", val.data_type()),
1276            }),
1277        },
1278        UnaryOp::Not => match val {
1279            Value::Boolean(b) => Ok(Value::Boolean(!b)),
1280            Value::Integer(i) => Ok(Value::Boolean(*i == 0)),
1281            _ => Err(SqlError::TypeMismatch {
1282                expected: "BOOLEAN".into(),
1283                got: format!("{}", val.data_type()),
1284            }),
1285        },
1286    }
1287}
1288
1289fn value_to_text(val: &Value) -> String {
1290    match val {
1291        Value::Text(s) => s.to_string(),
1292        Value::Integer(i) => i.to_string(),
1293        Value::Real(r) => {
1294            if r.fract() == 0.0 && r.is_finite() {
1295                format!("{r:.1}")
1296            } else {
1297                format!("{r}")
1298            }
1299        }
1300        Value::Boolean(b) => if *b { "TRUE" } else { "FALSE" }.into(),
1301        Value::Null => String::new(),
1302        Value::Blob(b) => {
1303            let mut s = String::with_capacity(b.len() * 2);
1304            for byte in b {
1305                s.push_str(&format!("{byte:02X}"));
1306            }
1307            s
1308        }
1309        Value::Date(d) => crate::datetime::format_date(*d),
1310        Value::Time(t) => crate::datetime::format_time(*t),
1311        Value::Timestamp(t) => crate::datetime::format_timestamp(*t),
1312        Value::Interval {
1313            months,
1314            days,
1315            micros,
1316        } => crate::datetime::format_interval(*months, *days, *micros),
1317        Value::Json(s) => s.to_string(),
1318        Value::Jsonb(b) => crate::json::decode_to_text(b).unwrap_or_default(),
1319        Value::TsVector(b) => crate::fts::tsvector_display(b),
1320        Value::TsQuery(b) => crate::fts::tsquery_display(b),
1321        Value::Array(_) => val.to_string(),
1322        Value::Vector(_) => val.to_string(),
1323    }
1324}
1325
1326fn eval_between(val: &Value, low: &Value, high: &Value, negated: bool) -> Result<Value> {
1327    // BETWEEN routed through eval_binary_op so comparison logic lives in one place.
1328    let ge = eval_binary_op(val, BinOp::GtEq, low)?;
1329    let le = eval_binary_op(val, BinOp::LtEq, high)?;
1330
1331    let result = match (as_bool(&ge), as_bool(&le)) {
1332        (Some(false), _) | (_, Some(false)) => Some(false),
1333        (Some(true), Some(true)) => Some(true),
1334        _ => None,
1335    };
1336
1337    match result {
1338        Some(b) => Ok(Value::Boolean(if negated { !b } else { b })),
1339        None => Ok(Value::Null),
1340    }
1341}
1342
1343fn as_bool(v: &Value) -> Option<bool> {
1344    match v {
1345        Value::Boolean(b) => Some(*b),
1346        _ => None,
1347    }
1348}
1349
1350const MAX_LIKE_PATTERN_LEN: usize = 10_000;
1351
1352fn eval_like(val: &Value, pattern: &Value, escape: Option<&Value>, negated: bool) -> Result<Value> {
1353    if val.is_null() || pattern.is_null() {
1354        return Ok(Value::Null);
1355    }
1356    let text = match val {
1357        Value::Text(s) => s.as_str(),
1358        _ => {
1359            return Err(SqlError::TypeMismatch {
1360                expected: "TEXT".into(),
1361                got: val.data_type().to_string(),
1362            })
1363        }
1364    };
1365    let pat = match pattern {
1366        Value::Text(s) => s.as_str(),
1367        _ => {
1368            return Err(SqlError::TypeMismatch {
1369                expected: "TEXT".into(),
1370                got: pattern.data_type().to_string(),
1371            })
1372        }
1373    };
1374
1375    if pat.len() > MAX_LIKE_PATTERN_LEN {
1376        return Err(SqlError::InvalidValue(format!(
1377            "LIKE pattern too long ({} chars, max {MAX_LIKE_PATTERN_LEN})",
1378            pat.len()
1379        )));
1380    }
1381
1382    let esc_char = match escape {
1383        Some(Value::Text(s)) => {
1384            let mut chars = s.chars();
1385            let c = chars.next().ok_or_else(|| {
1386                SqlError::InvalidValue("ESCAPE must be a single character".into())
1387            })?;
1388            if chars.next().is_some() {
1389                return Err(SqlError::InvalidValue(
1390                    "ESCAPE must be a single character".into(),
1391                ));
1392            }
1393            Some(c)
1394        }
1395        Some(Value::Null) => return Ok(Value::Null),
1396        Some(_) => {
1397            return Err(SqlError::TypeMismatch {
1398                expected: "TEXT".into(),
1399                got: "non-text".into(),
1400            })
1401        }
1402        None => None,
1403    };
1404
1405    let matched = like_match(text, pat, esc_char);
1406    Ok(Value::Boolean(if negated { !matched } else { matched }))
1407}
1408
1409fn like_match(text: &str, pattern: &str, escape: Option<char>) -> bool {
1410    let t: Vec<char> = text.chars().collect();
1411    let p: Vec<char> = pattern.chars().collect();
1412    like_match_impl(&t, &p, 0, 0, escape)
1413}
1414
1415fn like_match_impl(
1416    t: &[char],
1417    p: &[char],
1418    mut ti: usize,
1419    mut pi: usize,
1420    esc: Option<char>,
1421) -> bool {
1422    let mut star_pi: Option<usize> = None;
1423    let mut star_ti: usize = 0;
1424
1425    while ti < t.len() {
1426        if pi < p.len() {
1427            if let Some(ec) = esc {
1428                if p[pi] == ec && pi + 1 < p.len() {
1429                    pi += 1;
1430                    let pc_lower = p[pi].to_ascii_lowercase();
1431                    let tc_lower = t[ti].to_ascii_lowercase();
1432                    if pc_lower == tc_lower {
1433                        pi += 1;
1434                        ti += 1;
1435                        continue;
1436                    } else if let Some(sp) = star_pi {
1437                        pi = sp + 1;
1438                        star_ti += 1;
1439                        ti = star_ti;
1440                        continue;
1441                    } else {
1442                        return false;
1443                    }
1444                }
1445            }
1446            if p[pi] == '%' {
1447                star_pi = Some(pi);
1448                star_ti = ti;
1449                pi += 1;
1450                continue;
1451            }
1452            if p[pi] == '_' {
1453                pi += 1;
1454                ti += 1;
1455                continue;
1456            }
1457            if p[pi].eq_ignore_ascii_case(&t[ti]) {
1458                pi += 1;
1459                ti += 1;
1460                continue;
1461            }
1462        }
1463        if let Some(sp) = star_pi {
1464            pi = sp + 1;
1465            star_ti += 1;
1466            ti = star_ti;
1467        } else {
1468            return false;
1469        }
1470    }
1471
1472    while pi < p.len() && p[pi] == '%' {
1473        pi += 1;
1474    }
1475    pi == p.len()
1476}
1477
1478fn eval_case(
1479    operand: Option<&Expr>,
1480    conditions: &[(Expr, Expr)],
1481    else_result: Option<&Expr>,
1482    ctx: &EvalCtx,
1483) -> Result<Value> {
1484    if let Some(op_expr) = operand {
1485        let op_val = eval_expr(op_expr, ctx)?;
1486        for (cond, result) in conditions {
1487            let cond_val = eval_expr(cond, ctx)?;
1488            if !op_val.is_null() && !cond_val.is_null() && op_val == cond_val {
1489                return eval_expr(result, ctx);
1490            }
1491        }
1492    } else {
1493        for (cond, result) in conditions {
1494            let cond_val = eval_expr(cond, ctx)?;
1495            if is_truthy(&cond_val) {
1496                return eval_expr(result, ctx);
1497            }
1498        }
1499    }
1500    match else_result {
1501        Some(e) => eval_expr(e, ctx),
1502        None => Ok(Value::Null),
1503    }
1504}
1505
1506pub(crate) fn eval_cast(val: &Value, target: DataType) -> Result<Value> {
1507    if val.is_null() {
1508        return Ok(Value::Null);
1509    }
1510    match target {
1511        DataType::Integer => match val {
1512            Value::Integer(_) => Ok(val.clone()),
1513            Value::Real(r) => Ok(Value::Integer(*r as i64)),
1514            Value::Boolean(b) => Ok(Value::Integer(if *b { 1 } else { 0 })),
1515            Value::Text(s) => s
1516                .trim()
1517                .parse::<i64>()
1518                .map(Value::Integer)
1519                .or_else(|_| s.trim().parse::<f64>().map(|f| Value::Integer(f as i64)))
1520                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to INTEGER"))),
1521            _ => Err(SqlError::InvalidValue(format!(
1522                "cannot cast {} to INTEGER",
1523                val.data_type()
1524            ))),
1525        },
1526        DataType::Real => match val {
1527            Value::Real(_) => Ok(val.clone()),
1528            Value::Integer(i) => Ok(Value::Real(*i as f64)),
1529            Value::Boolean(b) => Ok(Value::Real(if *b { 1.0 } else { 0.0 })),
1530            Value::Text(s) => s
1531                .trim()
1532                .parse::<f64>()
1533                .map(Value::Real)
1534                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to REAL"))),
1535            _ => Err(SqlError::InvalidValue(format!(
1536                "cannot cast {} to REAL",
1537                val.data_type()
1538            ))),
1539        },
1540        DataType::Text => Ok(Value::Text(value_to_text(val).into())),
1541        DataType::Boolean => match val {
1542            Value::Boolean(_) => Ok(val.clone()),
1543            Value::Integer(i) => Ok(Value::Boolean(*i != 0)),
1544            Value::Text(s) => {
1545                let lower = s.trim().to_ascii_lowercase();
1546                match lower.as_str() {
1547                    "true" | "1" | "yes" | "on" => Ok(Value::Boolean(true)),
1548                    "false" | "0" | "no" | "off" => Ok(Value::Boolean(false)),
1549                    _ => Err(SqlError::InvalidValue(format!(
1550                        "cannot cast '{s}' to BOOLEAN"
1551                    ))),
1552                }
1553            }
1554            _ => Err(SqlError::InvalidValue(format!(
1555                "cannot cast {} to BOOLEAN",
1556                val.data_type()
1557            ))),
1558        },
1559        DataType::Blob => match val {
1560            Value::Blob(_) => Ok(val.clone()),
1561            Value::Text(s) => Ok(Value::Blob(s.as_bytes().to_vec())),
1562            _ => Err(SqlError::InvalidValue(format!(
1563                "cannot cast {} to BLOB",
1564                val.data_type()
1565            ))),
1566        },
1567        DataType::Null => Ok(Value::Null),
1568        DataType::Date => val.clone().coerce_into(DataType::Date).ok_or_else(|| {
1569            SqlError::InvalidValue(format!("cannot cast {} to DATE", val.data_type()))
1570        }),
1571        DataType::Time => val.clone().coerce_into(DataType::Time).ok_or_else(|| {
1572            SqlError::InvalidValue(format!("cannot cast {} to TIME", val.data_type()))
1573        }),
1574        DataType::Timestamp => val.clone().coerce_into(DataType::Timestamp).ok_or_else(|| {
1575            SqlError::InvalidValue(format!("cannot cast {} to TIMESTAMP", val.data_type()))
1576        }),
1577        DataType::Interval => val.clone().coerce_into(DataType::Interval).ok_or_else(|| {
1578            SqlError::InvalidValue(format!("cannot cast {} to INTERVAL", val.data_type()))
1579        }),
1580        DataType::Json => val.clone().coerce_into(DataType::Json).ok_or_else(|| {
1581            SqlError::InvalidValue(format!("cannot cast {} to JSON", val.data_type()))
1582        }),
1583        DataType::Jsonb => val.clone().coerce_into(DataType::Jsonb).ok_or_else(|| {
1584            SqlError::InvalidValue(format!("cannot cast {} to JSONB", val.data_type()))
1585        }),
1586        DataType::TsVector => val.clone().coerce_into(DataType::TsVector).ok_or_else(|| {
1587            SqlError::InvalidValue(format!("cannot cast {} to TSVECTOR", val.data_type()))
1588        }),
1589        DataType::TsQuery => val.clone().coerce_into(DataType::TsQuery).ok_or_else(|| {
1590            SqlError::InvalidValue(format!("cannot cast {} to TSQUERY", val.data_type()))
1591        }),
1592        DataType::Array => val.clone().coerce_into(DataType::Array).ok_or_else(|| {
1593            SqlError::InvalidValue(format!("cannot cast {} to ARRAY", val.data_type()))
1594        }),
1595        DataType::Vector { dim } => match val {
1596            Value::Vector(v) if v.len() as u16 == dim => Ok(val.clone()),
1597            Value::Vector(_) => Err(SqlError::InvalidValue(format!(
1598                "cannot cast {} to VECTOR({dim}) (dim mismatch)",
1599                val.data_type()
1600            ))),
1601            Value::Text(s) => parse_vector_literal(s.as_str(), dim).map(Value::Vector),
1602            _ => Err(SqlError::InvalidValue(format!(
1603                "cannot cast {} to VECTOR({dim})",
1604                val.data_type()
1605            ))),
1606        },
1607    }
1608}
1609
1610fn parse_vector_literal(s: &str, expected_dim: u16) -> Result<std::sync::Arc<[f32]>> {
1611    let trimmed = s.trim();
1612    let inner = trimmed
1613        .strip_prefix('[')
1614        .and_then(|s| s.strip_suffix(']'))
1615        .unwrap_or(trimmed);
1616    let mut out: Vec<f32> = Vec::with_capacity(expected_dim as usize);
1617    for tok in inner.split(',') {
1618        let tok = tok.trim();
1619        if tok.is_empty() {
1620            continue;
1621        }
1622        let x: f32 = tok
1623            .parse()
1624            .map_err(|_| SqlError::InvalidValue(format!("invalid vector element: '{tok}'")))?;
1625        out.push(x);
1626    }
1627    if out.len() as u16 != expected_dim {
1628        return Err(SqlError::InvalidValue(format!(
1629            "vector literal has {} elements, expected {expected_dim}",
1630            out.len()
1631        )));
1632    }
1633    Ok(std::sync::Arc::from(out.into_boxed_slice()))
1634}
1635
1636fn eval_scalar_function(name: &str, args: &[Expr], ctx: &EvalCtx) -> Result<Value> {
1637    let evaluated: Vec<Value> = args
1638        .iter()
1639        .map(|a| eval_expr(a, ctx))
1640        .collect::<Result<Vec<_>>>()?;
1641
1642    match name {
1643        "LENGTH" => {
1644            check_args(name, &evaluated, 1)?;
1645            match &evaluated[0] {
1646                Value::Null => Ok(Value::Null),
1647                Value::Text(s) => Ok(Value::Integer(s.chars().count() as i64)),
1648                Value::Blob(b) => Ok(Value::Integer(b.len() as i64)),
1649                Value::TsVector(b) => crate::fts::fn_length_tsvector(b),
1650                _ => Ok(Value::Integer(
1651                    value_to_text(&evaluated[0]).chars().count() as i64
1652                )),
1653            }
1654        }
1655        "UPPER" => {
1656            check_args(name, &evaluated, 1)?;
1657            match &evaluated[0] {
1658                Value::Null => Ok(Value::Null),
1659                Value::Text(s) => Ok(Value::Text(s.to_ascii_uppercase())),
1660                _ => Ok(Value::Text(
1661                    value_to_text(&evaluated[0]).to_ascii_uppercase().into(),
1662                )),
1663            }
1664        }
1665        "LOWER" => {
1666            check_args(name, &evaluated, 1)?;
1667            match &evaluated[0] {
1668                Value::Null => Ok(Value::Null),
1669                Value::Text(s) => Ok(Value::Text(s.to_ascii_lowercase())),
1670                _ => Ok(Value::Text(
1671                    value_to_text(&evaluated[0]).to_ascii_lowercase().into(),
1672                )),
1673            }
1674        }
1675        "SUBSTR" | "SUBSTRING" => {
1676            if evaluated.len() < 2 || evaluated.len() > 3 {
1677                return Err(SqlError::InvalidValue(format!(
1678                    "{name} requires 2 or 3 arguments"
1679                )));
1680            }
1681            if evaluated.iter().any(|v| v.is_null()) {
1682                return Ok(Value::Null);
1683            }
1684            let s = value_to_text(&evaluated[0]);
1685            let chars: Vec<char> = s.chars().collect();
1686            let start = match &evaluated[1] {
1687                Value::Integer(i) => *i,
1688                _ => {
1689                    return Err(SqlError::TypeMismatch {
1690                        expected: "INTEGER".into(),
1691                        got: evaluated[1].data_type().to_string(),
1692                    })
1693                }
1694            };
1695            let len = chars.len() as i64;
1696
1697            let (begin, count) = if evaluated.len() == 3 {
1698                let cnt = match &evaluated[2] {
1699                    Value::Integer(i) => *i,
1700                    _ => {
1701                        return Err(SqlError::TypeMismatch {
1702                            expected: "INTEGER".into(),
1703                            got: evaluated[2].data_type().to_string(),
1704                        })
1705                    }
1706                };
1707                if start >= 1 {
1708                    let b = (start - 1).min(len) as usize;
1709                    let c = cnt.max(0) as usize;
1710                    (b, c)
1711                } else if start == 0 {
1712                    let c = (cnt - 1).max(0) as usize;
1713                    (0usize, c)
1714                } else {
1715                    let adjusted_cnt = (cnt + start - 1).max(0) as usize;
1716                    (0usize, adjusted_cnt)
1717                }
1718            } else if start >= 1 {
1719                let b = (start - 1).min(len) as usize;
1720                (b, chars.len() - b)
1721            } else if start == 0 {
1722                (0usize, chars.len())
1723            } else {
1724                let b = (len + start).max(0) as usize;
1725                (b, chars.len() - b)
1726            };
1727
1728            let result: String = chars.iter().skip(begin).take(count).collect();
1729            Ok(Value::Text(result.into()))
1730        }
1731        "TRIM" | "LTRIM" | "RTRIM" => {
1732            if evaluated.is_empty() || evaluated.len() > 2 {
1733                return Err(SqlError::InvalidValue(format!(
1734                    "{name} requires 1 or 2 arguments"
1735                )));
1736            }
1737            if evaluated[0].is_null() {
1738                return Ok(Value::Null);
1739            }
1740            let s = value_to_text(&evaluated[0]);
1741            let trim_chars: Vec<char> = if evaluated.len() == 2 {
1742                if evaluated[1].is_null() {
1743                    return Ok(Value::Null);
1744                }
1745                value_to_text(&evaluated[1]).chars().collect()
1746            } else {
1747                vec![' ']
1748            };
1749            let result = match name {
1750                "TRIM" => s
1751                    .trim_matches(|c: char| trim_chars.contains(&c))
1752                    .to_string(),
1753                "LTRIM" => s
1754                    .trim_start_matches(|c: char| trim_chars.contains(&c))
1755                    .to_string(),
1756                "RTRIM" => s
1757                    .trim_end_matches(|c: char| trim_chars.contains(&c))
1758                    .to_string(),
1759                _ => unreachable!(),
1760            };
1761            Ok(Value::Text(result.into()))
1762        }
1763        "REPLACE" => {
1764            check_args(name, &evaluated, 3)?;
1765            if evaluated.iter().any(|v| v.is_null()) {
1766                return Ok(Value::Null);
1767            }
1768            let s = value_to_text(&evaluated[0]);
1769            let from = value_to_text(&evaluated[1]);
1770            let to = value_to_text(&evaluated[2]);
1771            if from.is_empty() {
1772                return Ok(Value::Text(s.into()));
1773            }
1774            Ok(Value::Text(s.replace(&from, &to).into()))
1775        }
1776        "INSTR" => {
1777            check_args(name, &evaluated, 2)?;
1778            if evaluated.iter().any(|v| v.is_null()) {
1779                return Ok(Value::Null);
1780            }
1781            let haystack = value_to_text(&evaluated[0]);
1782            let needle = value_to_text(&evaluated[1]);
1783            let pos = haystack
1784                .find(&needle)
1785                .map(|i| haystack[..i].chars().count() as i64 + 1)
1786                .unwrap_or(0);
1787            Ok(Value::Integer(pos))
1788        }
1789        "CONCAT" => {
1790            if evaluated.is_empty() {
1791                return Ok(Value::Text(CompactString::default()));
1792            }
1793            let mut result = String::new();
1794            for v in &evaluated {
1795                match v {
1796                    Value::Null => {}
1797                    _ => result.push_str(&value_to_text(v)),
1798                }
1799            }
1800            Ok(Value::Text(result.into()))
1801        }
1802        "ABS" => {
1803            check_args(name, &evaluated, 1)?;
1804            match &evaluated[0] {
1805                Value::Null => Ok(Value::Null),
1806                Value::Integer(i) => i
1807                    .checked_abs()
1808                    .map(Value::Integer)
1809                    .ok_or(SqlError::IntegerOverflow),
1810                Value::Real(r) => Ok(Value::Real(r.abs())),
1811                _ => Err(SqlError::TypeMismatch {
1812                    expected: "numeric".into(),
1813                    got: evaluated[0].data_type().to_string(),
1814                }),
1815            }
1816        }
1817        "ROUND" => {
1818            if evaluated.is_empty() || evaluated.len() > 2 {
1819                return Err(SqlError::InvalidValue(
1820                    "ROUND requires 1 or 2 arguments".into(),
1821                ));
1822            }
1823            if evaluated[0].is_null() {
1824                return Ok(Value::Null);
1825            }
1826            let val = match &evaluated[0] {
1827                Value::Integer(i) => *i as f64,
1828                Value::Real(r) => *r,
1829                _ => {
1830                    return Err(SqlError::TypeMismatch {
1831                        expected: "numeric".into(),
1832                        got: evaluated[0].data_type().to_string(),
1833                    })
1834                }
1835            };
1836            let places = if evaluated.len() == 2 {
1837                match &evaluated[1] {
1838                    Value::Null => return Ok(Value::Null),
1839                    Value::Integer(i) => *i,
1840                    _ => {
1841                        return Err(SqlError::TypeMismatch {
1842                            expected: "INTEGER".into(),
1843                            got: evaluated[1].data_type().to_string(),
1844                        })
1845                    }
1846                }
1847            } else {
1848                0
1849            };
1850            let factor = 10f64.powi(places as i32);
1851            let rounded = (val * factor).round() / factor;
1852            Ok(Value::Real(rounded))
1853        }
1854        "CEIL" | "CEILING" => {
1855            check_args(name, &evaluated, 1)?;
1856            match &evaluated[0] {
1857                Value::Null => Ok(Value::Null),
1858                Value::Integer(i) => Ok(Value::Integer(*i)),
1859                Value::Real(r) => Ok(Value::Integer(r.ceil() as i64)),
1860                _ => Err(SqlError::TypeMismatch {
1861                    expected: "numeric".into(),
1862                    got: evaluated[0].data_type().to_string(),
1863                }),
1864            }
1865        }
1866        "FLOOR" => {
1867            check_args(name, &evaluated, 1)?;
1868            match &evaluated[0] {
1869                Value::Null => Ok(Value::Null),
1870                Value::Integer(i) => Ok(Value::Integer(*i)),
1871                Value::Real(r) => Ok(Value::Integer(r.floor() as i64)),
1872                _ => Err(SqlError::TypeMismatch {
1873                    expected: "numeric".into(),
1874                    got: evaluated[0].data_type().to_string(),
1875                }),
1876            }
1877        }
1878        "SIGN" => {
1879            check_args(name, &evaluated, 1)?;
1880            match &evaluated[0] {
1881                Value::Null => Ok(Value::Null),
1882                Value::Integer(i) => Ok(Value::Integer(i.signum())),
1883                Value::Real(r) => {
1884                    if *r > 0.0 {
1885                        Ok(Value::Integer(1))
1886                    } else if *r < 0.0 {
1887                        Ok(Value::Integer(-1))
1888                    } else {
1889                        Ok(Value::Integer(0))
1890                    }
1891                }
1892                _ => Err(SqlError::TypeMismatch {
1893                    expected: "numeric".into(),
1894                    got: evaluated[0].data_type().to_string(),
1895                }),
1896            }
1897        }
1898        "SQRT" => {
1899            check_args(name, &evaluated, 1)?;
1900            match &evaluated[0] {
1901                Value::Null => Ok(Value::Null),
1902                Value::Integer(i) => {
1903                    if *i < 0 {
1904                        Ok(Value::Null)
1905                    } else {
1906                        Ok(Value::Real((*i as f64).sqrt()))
1907                    }
1908                }
1909                Value::Real(r) => {
1910                    if *r < 0.0 {
1911                        Ok(Value::Null)
1912                    } else {
1913                        Ok(Value::Real(r.sqrt()))
1914                    }
1915                }
1916                _ => Err(SqlError::TypeMismatch {
1917                    expected: "numeric".into(),
1918                    got: evaluated[0].data_type().to_string(),
1919                }),
1920            }
1921        }
1922        "RANDOM" => {
1923            check_args(name, &evaluated, 0)?;
1924            use std::collections::hash_map::DefaultHasher;
1925            use std::hash::{Hash, Hasher};
1926            let mut hasher = DefaultHasher::new();
1927            crate::datetime::now_micros().hash(&mut hasher);
1928            std::thread::current().id().hash(&mut hasher);
1929            let mut val = hasher.finish() as i64;
1930            if val == i64::MIN {
1931                val = i64::MAX;
1932            }
1933            Ok(Value::Integer(val))
1934        }
1935        "TYPEOF" => {
1936            check_args(name, &evaluated, 1)?;
1937            let type_name = match &evaluated[0] {
1938                Value::Null => "null",
1939                Value::Integer(_) => "integer",
1940                Value::Real(_) => "real",
1941                Value::Text(_) => "text",
1942                Value::Blob(_) => "blob",
1943                Value::Boolean(_) => "boolean",
1944                Value::Date(_) => "date",
1945                Value::Time(_) => "time",
1946                Value::Timestamp(_) => "timestamp",
1947                Value::Interval { .. } => "interval",
1948                Value::Json(_) => "json",
1949                Value::Jsonb(_) => "jsonb",
1950                Value::TsVector(_) => "tsvector",
1951                Value::TsQuery(_) => "tsquery",
1952                Value::Array(_) => "array",
1953                Value::Vector(_) => "vector",
1954            };
1955            Ok(Value::Text(type_name.into()))
1956        }
1957        "MIN" => {
1958            check_args(name, &evaluated, 2)?;
1959            if evaluated[0].is_null() {
1960                return Ok(evaluated[1].clone());
1961            }
1962            if evaluated[1].is_null() {
1963                return Ok(evaluated[0].clone());
1964            }
1965            if evaluated[0] <= evaluated[1] {
1966                Ok(evaluated[0].clone())
1967            } else {
1968                Ok(evaluated[1].clone())
1969            }
1970        }
1971        "MAX" => {
1972            check_args(name, &evaluated, 2)?;
1973            if evaluated[0].is_null() {
1974                return Ok(evaluated[1].clone());
1975            }
1976            if evaluated[1].is_null() {
1977                return Ok(evaluated[0].clone());
1978            }
1979            if evaluated[0] >= evaluated[1] {
1980                Ok(evaluated[0].clone())
1981            } else {
1982                Ok(evaluated[1].clone())
1983            }
1984        }
1985        "HEX" => {
1986            check_args(name, &evaluated, 1)?;
1987            match &evaluated[0] {
1988                Value::Null => Ok(Value::Null),
1989                Value::Blob(b) => {
1990                    let mut s = String::with_capacity(b.len() * 2);
1991                    for byte in b {
1992                        s.push_str(&format!("{byte:02X}"));
1993                    }
1994                    Ok(Value::Text(s.into()))
1995                }
1996                Value::Text(s) => {
1997                    let mut r = String::with_capacity(s.len() * 2);
1998                    for byte in s.as_bytes() {
1999                        r.push_str(&format!("{byte:02X}"));
2000                    }
2001                    Ok(Value::Text(r.into()))
2002                }
2003                _ => Ok(Value::Text(value_to_text(&evaluated[0]).into())),
2004            }
2005        }
2006        "NOW" | "CURRENT_TIMESTAMP" | "LOCALTIMESTAMP" => {
2007            check_args(name, &evaluated, 0)?;
2008            Ok(Value::Timestamp(crate::datetime::txn_or_clock_micros()))
2009        }
2010        "CURRENT_DATE" => {
2011            check_args(name, &evaluated, 0)?;
2012            Ok(Value::Date(crate::datetime::ts_to_date_floor(
2013                crate::datetime::txn_or_clock_micros(),
2014            )))
2015        }
2016        "CURRENT_TIME" | "LOCALTIME" => {
2017            check_args(name, &evaluated, 0)?;
2018            Ok(Value::Time(
2019                crate::datetime::ts_split(crate::datetime::txn_or_clock_micros()).1,
2020            ))
2021        }
2022        "CLOCK_TIMESTAMP" | "STATEMENT_TIMESTAMP" | "TRANSACTION_TIMESTAMP" => {
2023            check_args(name, &evaluated, 0)?;
2024            let ts = match name {
2025                "CLOCK_TIMESTAMP" => crate::datetime::now_micros(),
2026                _ => crate::datetime::txn_or_clock_micros(),
2027            };
2028            Ok(Value::Timestamp(ts))
2029        }
2030        "EXTRACT" | "DATE_PART" | "DATEPART" => {
2031            check_args(name, &evaluated, 2)?;
2032            let field: &str = match &evaluated[0] {
2033                Value::Null => return Ok(Value::Null),
2034                Value::Text(s) => s.as_str(),
2035                _ => {
2036                    return Err(SqlError::TypeMismatch {
2037                        expected: "TEXT field name".into(),
2038                        got: evaluated[0].data_type().to_string(),
2039                    })
2040                }
2041            };
2042            if evaluated[1].is_null() {
2043                return Ok(Value::Null);
2044            }
2045            crate::datetime::extract(field, &evaluated[1])
2046        }
2047        "DATE_TRUNC" => {
2048            if evaluated.len() < 2 || evaluated.len() > 3 {
2049                return Err(SqlError::InvalidValue(
2050                    "DATE_TRUNC requires 2 or 3 arguments".into(),
2051                ));
2052            }
2053            let unit = match &evaluated[0] {
2054                Value::Null => return Ok(Value::Null),
2055                Value::Text(s) => s.to_string(),
2056                _ => {
2057                    return Err(SqlError::TypeMismatch {
2058                        expected: "TEXT unit name".into(),
2059                        got: evaluated[0].data_type().to_string(),
2060                    })
2061                }
2062            };
2063            if evaluated[1].is_null() {
2064                return Ok(Value::Null);
2065            }
2066            // Optional tz arg: truncate in that zone, then convert back to UTC.
2067            if evaluated.len() == 3 {
2068                if let Value::Text(tz) = &evaluated[2] {
2069                    if !tz.eq_ignore_ascii_case("UTC") {
2070                        if let Value::Timestamp(ts) = &evaluated[1] {
2071                            return date_trunc_in_zone(&unit, *ts, tz);
2072                        }
2073                    }
2074                }
2075            }
2076            crate::datetime::date_trunc(&unit, &evaluated[1])
2077        }
2078        "DATE_BIN" => {
2079            check_args(name, &evaluated, 3)?;
2080            if evaluated.iter().any(|v| v.is_null()) {
2081                return Ok(Value::Null);
2082            }
2083            let stride = match &evaluated[0] {
2084                Value::Interval {
2085                    months: _,
2086                    days,
2087                    micros,
2088                } => *days as i64 * crate::datetime::MICROS_PER_DAY + *micros,
2089                _ => {
2090                    return Err(SqlError::TypeMismatch {
2091                        expected: "INTERVAL stride".into(),
2092                        got: evaluated[0].data_type().to_string(),
2093                    })
2094                }
2095            };
2096            if stride <= 0 {
2097                return Err(SqlError::InvalidValue(
2098                    "DATE_BIN stride must be positive".into(),
2099                ));
2100            }
2101            let (src, origin) = match (&evaluated[1], &evaluated[2]) {
2102                (Value::Timestamp(s), Value::Timestamp(o)) => (*s, *o),
2103                _ => {
2104                    return Err(SqlError::TypeMismatch {
2105                        expected: "TIMESTAMP, TIMESTAMP".into(),
2106                        got: format!("{}, {}", evaluated[1].data_type(), evaluated[2].data_type()),
2107                    })
2108                }
2109            };
2110            let diff = src - origin;
2111            let binned = origin + (diff.div_euclid(stride)) * stride;
2112            Ok(Value::Timestamp(binned))
2113        }
2114        "AGE" => {
2115            if evaluated.len() == 1 {
2116                if evaluated[0].is_null() {
2117                    return Ok(Value::Null);
2118                }
2119                let ts = match &evaluated[0] {
2120                    Value::Timestamp(t) => *t,
2121                    Value::Date(d) => crate::datetime::date_to_ts(*d),
2122                    _ => {
2123                        return Err(SqlError::TypeMismatch {
2124                            expected: "TIMESTAMP or DATE".into(),
2125                            got: evaluated[0].data_type().to_string(),
2126                        })
2127                    }
2128                };
2129                // Implicit reference: today at midnight UTC.
2130                let today = crate::datetime::today_days();
2131                let midnight = crate::datetime::date_to_ts(today);
2132                let (m, d, u) = crate::datetime::age(midnight, ts)?;
2133                return Ok(Value::Interval {
2134                    months: m,
2135                    days: d,
2136                    micros: u,
2137                });
2138            }
2139            check_args(name, &evaluated, 2)?;
2140            if evaluated.iter().any(|v| v.is_null()) {
2141                return Ok(Value::Null);
2142            }
2143            let a = ts_of(&evaluated[0])?;
2144            let b = ts_of(&evaluated[1])?;
2145            let (m, d, u) = crate::datetime::age(a, b)?;
2146            Ok(Value::Interval {
2147                months: m,
2148                days: d,
2149                micros: u,
2150            })
2151        }
2152        "MAKE_DATE" => {
2153            check_args(name, &evaluated, 3)?;
2154            if evaluated.iter().any(|v| v.is_null()) {
2155                return Ok(Value::Null);
2156            }
2157            let y = int_arg(&evaluated[0], "MAKE_DATE year")? as i32;
2158            let m = int_arg(&evaluated[1], "MAKE_DATE month")? as u8;
2159            let d = int_arg(&evaluated[2], "MAKE_DATE day")? as u8;
2160            crate::datetime::ymd_to_days(y, m, d)
2161                .map(Value::Date)
2162                .ok_or_else(|| SqlError::InvalidDateLiteral(format!("make_date({y}, {m}, {d})")))
2163        }
2164        "MAKE_TIME" => {
2165            check_args(name, &evaluated, 3)?;
2166            if evaluated.iter().any(|v| v.is_null()) {
2167                return Ok(Value::Null);
2168            }
2169            let h = int_arg(&evaluated[0], "MAKE_TIME hour")? as u8;
2170            let mi = int_arg(&evaluated[1], "MAKE_TIME minute")? as u8;
2171            let (s, us) = real_sec_arg(&evaluated[2])?;
2172            crate::datetime::hmsn_to_micros(h, mi, s, us)
2173                .map(Value::Time)
2174                .ok_or_else(|| SqlError::InvalidTimeLiteral(format!("make_time({h}, {mi}, ...)")))
2175        }
2176        "MAKE_TIMESTAMP" => {
2177            check_args(name, &evaluated, 6)?;
2178            if evaluated.iter().any(|v| v.is_null()) {
2179                return Ok(Value::Null);
2180            }
2181            let y = int_arg(&evaluated[0], "MAKE_TIMESTAMP year")? as i32;
2182            let mo = int_arg(&evaluated[1], "MAKE_TIMESTAMP month")? as u8;
2183            let d = int_arg(&evaluated[2], "MAKE_TIMESTAMP day")? as u8;
2184            let h = int_arg(&evaluated[3], "MAKE_TIMESTAMP hour")? as u8;
2185            let mi = int_arg(&evaluated[4], "MAKE_TIMESTAMP min")? as u8;
2186            let (s, us) = real_sec_arg(&evaluated[5])?;
2187            let days = crate::datetime::ymd_to_days(y, mo, d).ok_or_else(|| {
2188                SqlError::InvalidTimestampLiteral(format!("make_timestamp year={y}"))
2189            })?;
2190            let tmicros = crate::datetime::hmsn_to_micros(h, mi, s, us)
2191                .ok_or_else(|| SqlError::InvalidTimestampLiteral("time out of range".into()))?;
2192            Ok(Value::Timestamp(crate::datetime::ts_combine(days, tmicros)))
2193        }
2194        "MAKE_INTERVAL" => {
2195            // Positional args: years, months, weeks, days, hours, mins, secs.
2196            if evaluated.len() > 7 {
2197                return Err(SqlError::InvalidValue(
2198                    "MAKE_INTERVAL accepts at most 7 arguments".into(),
2199                ));
2200            }
2201            let mut months: i64 = 0;
2202            let mut days: i64 = 0;
2203            let mut micros: i64 = 0;
2204            for (i, v) in evaluated.iter().enumerate() {
2205                if v.is_null() {
2206                    continue;
2207                }
2208                let n = match v {
2209                    Value::Integer(n) => *n,
2210                    Value::Real(r) => *r as i64,
2211                    _ => {
2212                        return Err(SqlError::TypeMismatch {
2213                            expected: "numeric".into(),
2214                            got: v.data_type().to_string(),
2215                        })
2216                    }
2217                };
2218                match i {
2219                    0 => months = months.saturating_add(n.saturating_mul(12)),
2220                    1 => months = months.saturating_add(n),
2221                    2 => days = days.saturating_add(n.saturating_mul(7)),
2222                    3 => days = days.saturating_add(n),
2223                    4 => {
2224                        micros = micros
2225                            .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_HOUR))
2226                    }
2227                    5 => {
2228                        micros =
2229                            micros.saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_MIN))
2230                    }
2231                    6 => {
2232                        // Seconds may be fractional — also check Real.
2233                        if let Value::Real(r) = v {
2234                            micros = micros.saturating_add(
2235                                (*r * crate::datetime::MICROS_PER_SEC as f64) as i64,
2236                            );
2237                        } else {
2238                            micros = micros
2239                                .saturating_add(n.saturating_mul(crate::datetime::MICROS_PER_SEC));
2240                        }
2241                    }
2242                    _ => unreachable!(),
2243                }
2244            }
2245            Ok(Value::Interval {
2246                months: months.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
2247                days: days.clamp(i32::MIN as i64, i32::MAX as i64) as i32,
2248                micros,
2249            })
2250        }
2251        "JUSTIFY_DAYS" => {
2252            check_args(name, &evaluated, 1)?;
2253            match &evaluated[0] {
2254                Value::Null => Ok(Value::Null),
2255                Value::Interval {
2256                    months,
2257                    days,
2258                    micros,
2259                } => {
2260                    let (m, d, u) = crate::datetime::justify_days(*months, *days, *micros);
2261                    Ok(Value::Interval {
2262                        months: m,
2263                        days: d,
2264                        micros: u,
2265                    })
2266                }
2267                other => Err(SqlError::TypeMismatch {
2268                    expected: "INTERVAL".into(),
2269                    got: other.data_type().to_string(),
2270                }),
2271            }
2272        }
2273        "JUSTIFY_HOURS" => {
2274            check_args(name, &evaluated, 1)?;
2275            match &evaluated[0] {
2276                Value::Null => Ok(Value::Null),
2277                Value::Interval {
2278                    months,
2279                    days,
2280                    micros,
2281                } => {
2282                    let (m, d, u) = crate::datetime::justify_hours(*months, *days, *micros);
2283                    Ok(Value::Interval {
2284                        months: m,
2285                        days: d,
2286                        micros: u,
2287                    })
2288                }
2289                other => Err(SqlError::TypeMismatch {
2290                    expected: "INTERVAL".into(),
2291                    got: other.data_type().to_string(),
2292                }),
2293            }
2294        }
2295        "JUSTIFY_INTERVAL" => {
2296            check_args(name, &evaluated, 1)?;
2297            match &evaluated[0] {
2298                Value::Null => Ok(Value::Null),
2299                Value::Interval {
2300                    months,
2301                    days,
2302                    micros,
2303                } => {
2304                    let (m, d, u) = crate::datetime::justify_interval(*months, *days, *micros);
2305                    Ok(Value::Interval {
2306                        months: m,
2307                        days: d,
2308                        micros: u,
2309                    })
2310                }
2311                other => Err(SqlError::TypeMismatch {
2312                    expected: "INTERVAL".into(),
2313                    got: other.data_type().to_string(),
2314                }),
2315            }
2316        }
2317        "ISFINITE" => {
2318            check_args(name, &evaluated, 1)?;
2319            if evaluated[0].is_null() {
2320                return Ok(Value::Null);
2321            }
2322            Ok(Value::Boolean(evaluated[0].is_finite_temporal()))
2323        }
2324        "DATE" => {
2325            if evaluated.is_empty() {
2326                return Err(SqlError::InvalidValue(
2327                    "DATE requires at least 1 argument".into(),
2328                ));
2329            }
2330            if evaluated[0].is_null() {
2331                return Ok(Value::Null);
2332            }
2333            let d = match &evaluated[0] {
2334                Value::Date(d) => *d,
2335                Value::Timestamp(t) => crate::datetime::ts_to_date_floor(*t),
2336                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::today_days(),
2337                Value::Text(s) => crate::datetime::parse_date(s)?,
2338                Value::Integer(n) => {
2339                    crate::datetime::ts_to_date_floor(*n * crate::datetime::MICROS_PER_SEC)
2340                }
2341                other => {
2342                    return Err(SqlError::TypeMismatch {
2343                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
2344                        got: other.data_type().to_string(),
2345                    })
2346                }
2347            };
2348            Ok(Value::Date(d))
2349        }
2350        "TIME" => {
2351            if evaluated.is_empty() {
2352                return Err(SqlError::InvalidValue(
2353                    "TIME requires at least 1 argument".into(),
2354                ));
2355            }
2356            if evaluated[0].is_null() {
2357                return Ok(Value::Null);
2358            }
2359            let t = match &evaluated[0] {
2360                Value::Time(t) => *t,
2361                Value::Timestamp(t) => crate::datetime::ts_split(*t).1,
2362                Value::Text(s) if s.eq_ignore_ascii_case("now") => {
2363                    crate::datetime::current_time_micros()
2364                }
2365                Value::Text(s) => crate::datetime::parse_time(s)?,
2366                other => {
2367                    return Err(SqlError::TypeMismatch {
2368                        expected: "TIMESTAMP, TIME, or TEXT".into(),
2369                        got: other.data_type().to_string(),
2370                    })
2371                }
2372            };
2373            Ok(Value::Time(t))
2374        }
2375        "DATETIME" => {
2376            if evaluated.is_empty() {
2377                return Err(SqlError::InvalidValue(
2378                    "DATETIME requires at least 1 argument".into(),
2379                ));
2380            }
2381            if evaluated[0].is_null() {
2382                return Ok(Value::Null);
2383            }
2384            let t = match &evaluated[0] {
2385                Value::Timestamp(t) => *t,
2386                Value::Date(d) => crate::datetime::date_to_ts(*d),
2387                Value::Text(s) if s.eq_ignore_ascii_case("now") => crate::datetime::now_micros(),
2388                Value::Text(s) => crate::datetime::parse_timestamp(s)?,
2389                Value::Integer(n) => n * crate::datetime::MICROS_PER_SEC,
2390                other => {
2391                    return Err(SqlError::TypeMismatch {
2392                        expected: "TIMESTAMP, DATE, TEXT, or INTEGER".into(),
2393                        got: other.data_type().to_string(),
2394                    })
2395                }
2396            };
2397            Ok(Value::Timestamp(t))
2398        }
2399        "STRFTIME" => {
2400            if evaluated.len() < 2 {
2401                return Err(SqlError::InvalidValue(
2402                    "STRFTIME requires format + value".into(),
2403                ));
2404            }
2405            if evaluated.iter().take(2).any(|v| v.is_null()) {
2406                return Ok(Value::Null);
2407            }
2408            let fmt = match &evaluated[0] {
2409                Value::Text(s) => s.to_string(),
2410                _ => {
2411                    return Err(SqlError::TypeMismatch {
2412                        expected: "TEXT format".into(),
2413                        got: evaluated[0].data_type().to_string(),
2414                    })
2415                }
2416            };
2417            let out = crate::datetime::strftime(&fmt, &evaluated[1])?;
2418            Ok(Value::Text(out.into()))
2419        }
2420        "JULIANDAY" => {
2421            if evaluated.is_empty() {
2422                return Err(SqlError::InvalidValue(
2423                    "JULIANDAY requires at least 1 argument".into(),
2424                ));
2425            }
2426            if evaluated[0].is_null() {
2427                return Ok(Value::Null);
2428            }
2429            let micros = ts_of(&evaluated[0])?;
2430            let (days, tmicros) = crate::datetime::ts_split(micros);
2431            // Julian Day 2440587.5 = 1970-01-01 00:00:00 UTC (Julian days start at noon).
2432            let julian =
2433                days as f64 + 2_440_587.5 + tmicros as f64 / crate::datetime::MICROS_PER_DAY as f64;
2434            Ok(Value::Real(julian))
2435        }
2436        "UNIXEPOCH" => {
2437            if evaluated.is_empty() {
2438                return Err(SqlError::InvalidValue(
2439                    "UNIXEPOCH requires at least 1 argument".into(),
2440                ));
2441            }
2442            if evaluated[0].is_null() {
2443                return Ok(Value::Null);
2444            }
2445            let micros = ts_of(&evaluated[0])?;
2446            let subsec = evaluated
2447                .get(1)
2448                .and_then(|v| {
2449                    if let Value::Text(s) = v {
2450                        Some(s.to_string())
2451                    } else {
2452                        None
2453                    }
2454                })
2455                .map(|s| s.eq_ignore_ascii_case("subsec") || s.eq_ignore_ascii_case("subsecond"))
2456                .unwrap_or(false);
2457            if subsec {
2458                Ok(Value::Real(
2459                    micros as f64 / crate::datetime::MICROS_PER_SEC as f64,
2460                ))
2461            } else {
2462                Ok(Value::Integer(micros / crate::datetime::MICROS_PER_SEC))
2463            }
2464        }
2465        "TIMEDIFF" => {
2466            check_args(name, &evaluated, 2)?;
2467            if evaluated.iter().any(|v| v.is_null()) {
2468                return Ok(Value::Null);
2469            }
2470            let a = ts_of(&evaluated[0])?;
2471            let b = ts_of(&evaluated[1])?;
2472            let (days, micros) = crate::datetime::subtract_timestamps(a, b);
2473            let sign = if days < 0 || (days == 0 && micros < 0) {
2474                "-"
2475            } else {
2476                "+"
2477            };
2478            let abs_days = days.unsigned_abs() as i64;
2479            let abs_us = micros.unsigned_abs() as i64;
2480            // PG-compat format string: "(+|-)YYYY-MM-DD HH:MM:SS.SSS", days-only.
2481            let (h, m, s, us) = crate::datetime::micros_to_hmsn(abs_us);
2482            Ok(Value::Text(
2483                format!("{sign}{abs_days:04}-00-00 {h:02}:{m:02}:{s:02}.{us:06}").into(),
2484            ))
2485        }
2486        "AT_TIMEZONE" => {
2487            check_args(name, &evaluated, 2)?;
2488            if evaluated.iter().any(|v| v.is_null()) {
2489                return Ok(Value::Null);
2490            }
2491            let ts = match &evaluated[0] {
2492                Value::Timestamp(t) => *t,
2493                Value::Date(d) => crate::datetime::date_to_ts(*d),
2494                other => {
2495                    return Err(SqlError::TypeMismatch {
2496                        expected: "TIMESTAMP or DATE".into(),
2497                        got: other.data_type().to_string(),
2498                    })
2499                }
2500            };
2501            let zone = match &evaluated[1] {
2502                Value::Text(s) => s.to_string(),
2503                _ => {
2504                    return Err(SqlError::TypeMismatch {
2505                        expected: "TEXT time zone".into(),
2506                        got: evaluated[1].data_type().to_string(),
2507                    })
2508                }
2509            };
2510            // Reject POSIX-style 'UTC+5' (ambiguous sign convention).
2511            let upper = zone.to_ascii_uppercase();
2512            if (upper.starts_with("UTC+") || upper.starts_with("UTC-")) && zone.len() > 3 {
2513                return Err(SqlError::InvalidTimezone(format!(
2514                    "'{zone}' is ambiguous — use ISO-8601 offset like '+05:00' or named zone like 'Etc/GMT-5'"
2515                )));
2516            }
2517            let formatted = crate::datetime::format_timestamp_in_zone(ts, &zone)?;
2518            Ok(Value::Text(formatted.into()))
2519        }
2520        "JSONB_TYPEOF" | "JSON_TYPEOF" => {
2521            check_args(name, &evaluated, 1)?;
2522            if evaluated[0].is_null() {
2523                return Ok(Value::Null);
2524            }
2525            crate::json::fn_typeof(&evaluated[0])
2526        }
2527        "JSONB_ARRAY_LENGTH" | "JSON_ARRAY_LENGTH" => {
2528            check_args(name, &evaluated, 1)?;
2529            if evaluated[0].is_null() {
2530                return Ok(Value::Null);
2531            }
2532            crate::json::fn_array_length(&evaluated[0])
2533        }
2534        "JSONB_OBJECT_LENGTH" | "JSON_OBJECT_LENGTH" => {
2535            check_args(name, &evaluated, 1)?;
2536            if evaluated[0].is_null() {
2537                return Ok(Value::Null);
2538            }
2539            crate::json::fn_object_length(&evaluated[0])
2540        }
2541        "JSONB_EXTRACT_PATH" | "JSON_EXTRACT_PATH" => {
2542            if evaluated.is_empty() {
2543                return Err(SqlError::InvalidValue(format!(
2544                    "{name} requires at least 1 argument"
2545                )));
2546            }
2547            if evaluated[0].is_null() {
2548                return Ok(Value::Null);
2549            }
2550            let target = if name.eq_ignore_ascii_case("JSONB_EXTRACT_PATH") {
2551                crate::types::DataType::Jsonb
2552            } else {
2553                crate::types::DataType::Json
2554            };
2555            crate::json::fn_extract_path(&evaluated, target, false)
2556        }
2557        "JSONB_EXTRACT_PATH_TEXT" | "JSON_EXTRACT_PATH_TEXT" => {
2558            if evaluated.is_empty() {
2559                return Err(SqlError::InvalidValue(format!(
2560                    "{name} requires at least 1 argument"
2561                )));
2562            }
2563            if evaluated[0].is_null() {
2564                return Ok(Value::Null);
2565            }
2566            crate::json::fn_extract_path(&evaluated, crate::types::DataType::Text, true)
2567        }
2568        "JSON_EXTRACT" => {
2569            check_args(name, &evaluated, 2)?;
2570            if evaluated[0].is_null() || evaluated[1].is_null() {
2571                return Ok(Value::Null);
2572            }
2573            crate::json::fn_sqlite_extract(&evaluated[0], &evaluated[1])
2574        }
2575        "JSON_VALID" => {
2576            check_args(name, &evaluated, 1)?;
2577            if evaluated[0].is_null() {
2578                return Ok(Value::Null);
2579            }
2580            crate::json::fn_valid(&evaluated[0])
2581        }
2582        "JSONB_STRIP_NULLS" | "JSON_STRIP_NULLS" => {
2583            check_args(name, &evaluated, 1)?;
2584            if evaluated[0].is_null() {
2585                return Ok(Value::Null);
2586            }
2587            let target = if name.eq_ignore_ascii_case("JSONB_STRIP_NULLS") {
2588                crate::types::DataType::Jsonb
2589            } else {
2590                crate::types::DataType::Json
2591            };
2592            crate::json::fn_strip_nulls(&evaluated[0], target)
2593        }
2594        "JSONB_PRETTY" | "JSON_PRETTY" => {
2595            check_args(name, &evaluated, 1)?;
2596            if evaluated[0].is_null() {
2597                return Ok(Value::Null);
2598            }
2599            crate::json::fn_pretty(&evaluated[0])
2600        }
2601        "JSONB_BUILD_OBJECT" | "JSON_BUILD_OBJECT" => {
2602            let target = if name.eq_ignore_ascii_case("JSONB_BUILD_OBJECT") {
2603                crate::types::DataType::Jsonb
2604            } else {
2605                crate::types::DataType::Json
2606            };
2607            crate::json::fn_build_object(&evaluated, target)
2608        }
2609        "JSONB_BUILD_ARRAY" | "JSON_BUILD_ARRAY" => {
2610            let target = if name.eq_ignore_ascii_case("JSONB_BUILD_ARRAY") {
2611                crate::types::DataType::Jsonb
2612            } else {
2613                crate::types::DataType::Json
2614            };
2615            crate::json::fn_build_array(&evaluated, target)
2616        }
2617        "JSONB_SET" | "JSON_SET" => {
2618            if !(3..=4).contains(&evaluated.len()) {
2619                return Err(SqlError::InvalidValue(format!(
2620                    "{name} requires 3 or 4 arguments"
2621                )));
2622            }
2623            if evaluated[0].is_null() {
2624                return Ok(Value::Null);
2625            }
2626            let target = if name.eq_ignore_ascii_case("JSONB_SET") {
2627                crate::types::DataType::Jsonb
2628            } else {
2629                crate::types::DataType::Json
2630            };
2631            let create_missing = evaluated
2632                .get(3)
2633                .map(|v| matches!(v, Value::Boolean(true)))
2634                .unwrap_or(true);
2635            crate::json::fn_set(
2636                &evaluated[0],
2637                &evaluated[1],
2638                &evaluated[2],
2639                create_missing,
2640                target,
2641            )
2642        }
2643        "JSONB_INSERT" | "JSON_INSERT" => {
2644            if !(3..=4).contains(&evaluated.len()) {
2645                return Err(SqlError::InvalidValue(format!(
2646                    "{name} requires 3 or 4 arguments"
2647                )));
2648            }
2649            if evaluated[0].is_null() {
2650                return Ok(Value::Null);
2651            }
2652            let target = if name.eq_ignore_ascii_case("JSONB_INSERT") {
2653                crate::types::DataType::Jsonb
2654            } else {
2655                crate::types::DataType::Json
2656            };
2657            let insert_after = evaluated
2658                .get(3)
2659                .map(|v| matches!(v, Value::Boolean(true)))
2660                .unwrap_or(false);
2661            crate::json::fn_insert(
2662                &evaluated[0],
2663                &evaluated[1],
2664                &evaluated[2],
2665                insert_after,
2666                target,
2667            )
2668        }
2669        "TO_JSONB" | "TO_JSON" => {
2670            check_args(name, &evaluated, 1)?;
2671            let target = if name.eq_ignore_ascii_case("TO_JSONB") {
2672                crate::types::DataType::Jsonb
2673            } else {
2674                crate::types::DataType::Json
2675            };
2676            crate::json::fn_to_json(&evaluated[0], target)
2677        }
2678        "ROW_TO_JSON" | "ROW_TO_JSONB" => {
2679            check_args(name, &evaluated, 1)?;
2680            let target = if name.eq_ignore_ascii_case("ROW_TO_JSONB") {
2681                crate::types::DataType::Jsonb
2682            } else {
2683                crate::types::DataType::Json
2684            };
2685            crate::json::fn_to_json(&evaluated[0], target)
2686        }
2687        "JSON_OBJECT" => crate::json::fn_json_object(&evaluated),
2688        "JSON_EXISTS" => {
2689            check_args(name, &evaluated, 2)?;
2690            if evaluated[0].is_null() || evaluated[1].is_null() {
2691                return Ok(Value::Null);
2692            }
2693            crate::json::fn_json_exists(&evaluated[0], &evaluated[1])
2694        }
2695        "JSON_VALUE" => {
2696            check_args(name, &evaluated, 2)?;
2697            if evaluated[0].is_null() || evaluated[1].is_null() {
2698                return Ok(Value::Null);
2699            }
2700            crate::json::fn_json_value(&evaluated[0], &evaluated[1])
2701        }
2702        "JSON_QUERY" => {
2703            check_args(name, &evaluated, 2)?;
2704            if evaluated[0].is_null() || evaluated[1].is_null() {
2705                return Ok(Value::Null);
2706            }
2707            crate::json::fn_json_query(&evaluated[0], &evaluated[1], crate::types::DataType::Jsonb)
2708        }
2709        "JSONB_PATH_EXISTS" => {
2710            if evaluated[0].is_null() || evaluated[1].is_null() {
2711                return Ok(Value::Null);
2712            }
2713            crate::json::fn_jsonb_path_exists(&evaluated)
2714        }
2715        "JSONB_PATH_MATCH" => {
2716            if evaluated[0].is_null() || evaluated[1].is_null() {
2717                return Ok(Value::Null);
2718            }
2719            crate::json::fn_jsonb_path_match(&evaluated)
2720        }
2721        "JSONB_PATH_QUERY_FIRST" => {
2722            if evaluated[0].is_null() || evaluated[1].is_null() {
2723                return Ok(Value::Null);
2724            }
2725            crate::json::fn_jsonb_path_query_first(&evaluated)
2726        }
2727        "JSONB_PATH_QUERY_ARRAY" => {
2728            if evaluated[0].is_null() || evaluated[1].is_null() {
2729                return Ok(Value::Null);
2730            }
2731            crate::json::fn_jsonb_path_query_array(&evaluated)
2732        }
2733        "JSONB_PATH_EXISTS_TZ" => {
2734            if evaluated[0].is_null() || evaluated[1].is_null() {
2735                return Ok(Value::Null);
2736            }
2737            crate::json::fn_jsonb_path_exists_tz(&evaluated)
2738        }
2739        "JSONB_PATH_MATCH_TZ" => {
2740            if evaluated[0].is_null() || evaluated[1].is_null() {
2741                return Ok(Value::Null);
2742            }
2743            crate::json::fn_jsonb_path_match_tz(&evaluated)
2744        }
2745        "JSONB_PATH_QUERY_TZ" => {
2746            if evaluated[0].is_null() || evaluated[1].is_null() {
2747                return Ok(Value::Null);
2748            }
2749            crate::json::fn_jsonb_path_query_tz(&evaluated)
2750        }
2751        "JSONB_PATH_QUERY_FIRST_TZ" => {
2752            if evaluated[0].is_null() || evaluated[1].is_null() {
2753                return Ok(Value::Null);
2754            }
2755            crate::json::fn_jsonb_path_query_first_tz(&evaluated)
2756        }
2757        "JSONB_PATH_QUERY_ARRAY_TZ" => {
2758            if evaluated[0].is_null() || evaluated[1].is_null() {
2759                return Ok(Value::Null);
2760            }
2761            crate::json::fn_jsonb_path_query_array_tz(&evaluated)
2762        }
2763        "JSONB_HAS_KEY" | "JSON_HAS_KEY" => {
2764            check_args(name, &evaluated, 2)?;
2765            if evaluated[0].is_null() || evaluated[1].is_null() {
2766                return Ok(Value::Null);
2767            }
2768            crate::json::op_has_key(&evaluated[0], &evaluated[1])
2769        }
2770        "JSONB_HAS_ANY_KEY" | "JSON_HAS_ANY_KEY" => {
2771            check_args(name, &evaluated, 2)?;
2772            if evaluated[0].is_null() || evaluated[1].is_null() {
2773                return Ok(Value::Null);
2774            }
2775            crate::json::op_has_any_key(&evaluated[0], &evaluated[1])
2776        }
2777        "JSONB_HAS_ALL_KEYS" | "JSON_HAS_ALL_KEYS" => {
2778            check_args(name, &evaluated, 2)?;
2779            if evaluated[0].is_null() || evaluated[1].is_null() {
2780                return Ok(Value::Null);
2781            }
2782            crate::json::op_has_all_keys(&evaluated[0], &evaluated[1])
2783        }
2784        "TO_TSVECTOR" => fts_to_tsvector(&evaluated),
2785        "TO_TSQUERY" => fts_to_tsquery(&evaluated),
2786        "PLAINTO_TSQUERY" => fts_plainto_tsquery(&evaluated),
2787        "PHRASETO_TSQUERY" => fts_phraseto_tsquery(&evaluated),
2788        "WEBSEARCH_TO_TSQUERY" => fts_websearch_to_tsquery(&evaluated),
2789        "TS_RANK" => fts_ts_rank(&evaluated, false),
2790        "TS_RANK_CD" => fts_ts_rank(&evaluated, true),
2791        "TS_HEADLINE" => fts_ts_headline(&evaluated),
2792        "TS_LEXIZE" => fts_ts_lexize(&evaluated),
2793        "NUMNODE" => fts_numnode(&evaluated),
2794        "SETWEIGHT" => fts_setweight(&evaluated),
2795        "STRIP" => fts_strip(&evaluated),
2796        _ => Err(SqlError::Unsupported(format!("scalar function: {name}"))),
2797    }
2798}
2799
2800fn fts_resolve_config_and_text(
2801    args: &[Value],
2802    fname: &str,
2803) -> Result<(crate::fts::TokenizerKind, String)> {
2804    if args.is_empty() || args.len() > 2 {
2805        return Err(SqlError::InvalidValue(format!(
2806            "{fname} requires 1 or 2 arguments"
2807        )));
2808    }
2809    let (config_name, text) = if args.len() == 2 {
2810        let cfg = match &args[0] {
2811            Value::Text(s) => Some(s.as_str().to_string()),
2812            v => {
2813                return Err(SqlError::TypeMismatch {
2814                    expected: "TEXT (config)".into(),
2815                    got: v.data_type().to_string(),
2816                })
2817            }
2818        };
2819        let txt = match &args[1] {
2820            Value::Text(s) => s.as_str().to_string(),
2821            v => {
2822                return Err(SqlError::TypeMismatch {
2823                    expected: "TEXT".into(),
2824                    got: v.data_type().to_string(),
2825                })
2826            }
2827        };
2828        (cfg, txt)
2829    } else {
2830        let txt = match &args[0] {
2831            Value::Text(s) => s.as_str().to_string(),
2832            v => {
2833                return Err(SqlError::TypeMismatch {
2834                    expected: "TEXT".into(),
2835                    got: v.data_type().to_string(),
2836                })
2837            }
2838        };
2839        (None, txt)
2840    };
2841    let kind = match config_name {
2842        Some(name) => crate::fts::TokenizerKind::from_name(&name)?,
2843        None => crate::fts::TokenizerKind::English,
2844    };
2845    Ok((kind, text))
2846}
2847
2848fn fts_to_tsvector(args: &[Value]) -> Result<Value> {
2849    if args.iter().any(|v| v.is_null()) {
2850        return Ok(Value::Null);
2851    }
2852    let (kind, text) = fts_resolve_config_and_text(args, "to_tsvector")?;
2853    crate::fts::fn_to_tsvector_with(kind, &text)
2854}
2855
2856fn fts_to_tsquery(args: &[Value]) -> Result<Value> {
2857    if args.iter().any(|v| v.is_null()) {
2858        return Ok(Value::Null);
2859    }
2860    let (kind, text) = fts_resolve_config_and_text(args, "to_tsquery")?;
2861    crate::fts::fn_to_tsquery_with(kind, &text)
2862}
2863
2864fn fts_plainto_tsquery(args: &[Value]) -> Result<Value> {
2865    if args.iter().any(|v| v.is_null()) {
2866        return Ok(Value::Null);
2867    }
2868    let (kind, text) = fts_resolve_config_and_text(args, "plainto_tsquery")?;
2869    crate::fts::fn_plainto_tsquery_with(kind, &text)
2870}
2871
2872fn fts_phraseto_tsquery(args: &[Value]) -> Result<Value> {
2873    if args.iter().any(|v| v.is_null()) {
2874        return Ok(Value::Null);
2875    }
2876    let (kind, text) = fts_resolve_config_and_text(args, "phraseto_tsquery")?;
2877    crate::fts::fn_phraseto_tsquery_with(kind, &text)
2878}
2879
2880fn fts_websearch_to_tsquery(args: &[Value]) -> Result<Value> {
2881    if args.iter().any(|v| v.is_null()) {
2882        return Ok(Value::Null);
2883    }
2884    let (kind, text) = fts_resolve_config_and_text(args, "websearch_to_tsquery")?;
2885    crate::fts::fn_websearch_to_tsquery_with(kind, &text)
2886}
2887
2888fn fts_ts_rank(args: &[Value], cover_density: bool) -> Result<Value> {
2889    let fname = if cover_density {
2890        "ts_rank_cd"
2891    } else {
2892        "ts_rank"
2893    };
2894    if args.len() != 2 && args.len() != 3 {
2895        return Err(SqlError::InvalidValue(format!(
2896            "{fname} requires 2 or 3 arguments"
2897        )));
2898    }
2899    if args[0].is_null() || args[1].is_null() {
2900        return Ok(Value::Null);
2901    }
2902    let tsv = match &args[0] {
2903        Value::TsVector(b) => b,
2904        v => {
2905            return Err(SqlError::TypeMismatch {
2906                expected: "TSVECTOR".into(),
2907                got: v.data_type().to_string(),
2908            })
2909        }
2910    };
2911    let tsq = match &args[1] {
2912        Value::TsQuery(b) => b,
2913        v => {
2914            return Err(SqlError::TypeMismatch {
2915                expected: "TSQUERY".into(),
2916                got: v.data_type().to_string(),
2917            })
2918        }
2919    };
2920    let norm = if args.len() == 3 {
2921        match &args[2] {
2922            Value::Integer(n) => *n,
2923            Value::Null => return Ok(Value::Null),
2924            v => {
2925                return Err(SqlError::TypeMismatch {
2926                    expected: "INTEGER (norm)".into(),
2927                    got: v.data_type().to_string(),
2928                })
2929            }
2930        }
2931    } else {
2932        0
2933    };
2934    if cover_density {
2935        crate::fts::fn_ts_rank_cd(tsv, tsq, norm)
2936    } else {
2937        crate::fts::fn_ts_rank(tsv, tsq, norm)
2938    }
2939}
2940
2941fn fts_ts_headline(args: &[Value]) -> Result<Value> {
2942    if args.len() < 2 || args.len() > 4 {
2943        return Err(SqlError::InvalidValue(
2944            "ts_headline requires 2 to 4 arguments".into(),
2945        ));
2946    }
2947    if args.iter().any(|v| v.is_null()) {
2948        return Ok(Value::Null);
2949    }
2950    let kind = if args.len() >= 3 {
2951        match &args[0] {
2952            Value::Text(s) => crate::fts::TokenizerKind::from_name(s.as_str())?,
2953            v => {
2954                return Err(SqlError::TypeMismatch {
2955                    expected: "TEXT (config)".into(),
2956                    got: v.data_type().to_string(),
2957                })
2958            }
2959        }
2960    } else {
2961        crate::fts::TokenizerKind::English
2962    };
2963    let text_idx = if args.len() >= 3 { 1 } else { 0 };
2964    let tsq_idx = if args.len() >= 3 { 2 } else { 1 };
2965    let text = match args.get(text_idx) {
2966        Some(Value::Text(s)) => s.as_str(),
2967        _ => {
2968            return Err(SqlError::TypeMismatch {
2969                expected: "TEXT".into(),
2970                got: "non-text".into(),
2971            })
2972        }
2973    };
2974    let tsq = match args.get(tsq_idx) {
2975        Some(Value::TsQuery(b)) => b.as_ref(),
2976        _ => {
2977            return Err(SqlError::TypeMismatch {
2978                expected: "TSQUERY".into(),
2979                got: "non-tsquery".into(),
2980            })
2981        }
2982    };
2983    crate::fts::fn_ts_headline_with(kind, text, tsq)
2984}
2985
2986fn fts_ts_lexize(args: &[Value]) -> Result<Value> {
2987    if args.len() != 2 {
2988        return Err(SqlError::InvalidValue(
2989            "ts_lexize requires 2 arguments (config, word)".into(),
2990        ));
2991    }
2992    if args.iter().any(|v| v.is_null()) {
2993        return Ok(Value::Null);
2994    }
2995    let kind = match &args[0] {
2996        Value::Text(s) => crate::fts::TokenizerKind::from_name(s.as_str())?,
2997        v => {
2998            return Err(SqlError::TypeMismatch {
2999                expected: "TEXT (config)".into(),
3000                got: v.data_type().to_string(),
3001            })
3002        }
3003    };
3004    let word = match &args[1] {
3005        Value::Text(s) => s.as_str(),
3006        v => {
3007            return Err(SqlError::TypeMismatch {
3008                expected: "TEXT (word)".into(),
3009                got: v.data_type().to_string(),
3010            })
3011        }
3012    };
3013    crate::fts::fn_ts_lexize_with(kind, word)
3014}
3015
3016fn fts_numnode(args: &[Value]) -> Result<Value> {
3017    check_args("numnode", args, 1)?;
3018    if args[0].is_null() {
3019        return Ok(Value::Null);
3020    }
3021    let tsq = match &args[0] {
3022        Value::TsQuery(b) => b,
3023        v => {
3024            return Err(SqlError::TypeMismatch {
3025                expected: "TSQUERY".into(),
3026                got: v.data_type().to_string(),
3027            })
3028        }
3029    };
3030    crate::fts::fn_numnode(tsq)
3031}
3032
3033fn fts_setweight(args: &[Value]) -> Result<Value> {
3034    if args.len() == 3 {
3035        return fts_setweight_selective(args);
3036    }
3037    check_args("setweight", args, 2)?;
3038    if args[0].is_null() || args[1].is_null() {
3039        return Ok(Value::Null);
3040    }
3041    let tsv = match &args[0] {
3042        Value::TsVector(b) => b,
3043        v => {
3044            return Err(SqlError::TypeMismatch {
3045                expected: "TSVECTOR".into(),
3046                got: v.data_type().to_string(),
3047            })
3048        }
3049    };
3050    let weight_text = match &args[1] {
3051        Value::Text(s) => s.as_str(),
3052        v => {
3053            return Err(SqlError::TypeMismatch {
3054                expected: "TEXT".into(),
3055                got: v.data_type().to_string(),
3056            })
3057        }
3058    };
3059    let weight = crate::fts::parse_weight_char(weight_text)?;
3060    crate::fts::fn_setweight(tsv, weight)
3061}
3062
3063fn fts_setweight_selective(args: &[Value]) -> Result<Value> {
3064    check_args("setweight", args, 3)?;
3065    if args[0].is_null() || args[1].is_null() || args[2].is_null() {
3066        return Ok(Value::Null);
3067    }
3068    let tsv = match &args[0] {
3069        Value::TsVector(b) => b,
3070        v => {
3071            return Err(SqlError::TypeMismatch {
3072                expected: "TSVECTOR".into(),
3073                got: v.data_type().to_string(),
3074            })
3075        }
3076    };
3077    let weight_text = match &args[1] {
3078        Value::Text(s) => s.as_str(),
3079        v => {
3080            return Err(SqlError::TypeMismatch {
3081                expected: "TEXT".into(),
3082                got: v.data_type().to_string(),
3083            })
3084        }
3085    };
3086    let weight = crate::fts::parse_weight_char(weight_text)?;
3087    let filter = match &args[2] {
3088        Value::Array(a) => a.as_ref().as_slice(),
3089        v => {
3090            return Err(SqlError::TypeMismatch {
3091                expected: "TEXT[]".into(),
3092                got: v.data_type().to_string(),
3093            })
3094        }
3095    };
3096    crate::fts::fn_setweight_selective(tsv, weight, filter)
3097}
3098
3099fn fts_strip(args: &[Value]) -> Result<Value> {
3100    check_args("strip", args, 1)?;
3101    if args[0].is_null() {
3102        return Ok(Value::Null);
3103    }
3104    let tsv = match &args[0] {
3105        Value::TsVector(b) => b,
3106        v => {
3107            return Err(SqlError::TypeMismatch {
3108                expected: "TSVECTOR".into(),
3109                got: v.data_type().to_string(),
3110            })
3111        }
3112    };
3113    crate::fts::fn_strip(tsv)
3114}
3115
3116/// Extract a timestamp (µs UTC) from a Value, coercing DATE → midnight.
3117fn ts_of(v: &Value) -> Result<i64> {
3118    match v {
3119        Value::Timestamp(t) => Ok(*t),
3120        Value::Date(d) => Ok(crate::datetime::date_to_ts(*d)),
3121        _ => Err(SqlError::TypeMismatch {
3122            expected: "TIMESTAMP or DATE".into(),
3123            got: v.data_type().to_string(),
3124        }),
3125    }
3126}
3127
3128fn int_arg(v: &Value, label: &str) -> Result<i64> {
3129    match v {
3130        Value::Integer(n) => Ok(*n),
3131        _ => Err(SqlError::TypeMismatch {
3132            expected: format!("INTEGER ({label})"),
3133            got: v.data_type().to_string(),
3134        }),
3135    }
3136}
3137
3138/// Extract (whole_seconds: u8, frac_micros: u32) from a numeric argument for MAKE_TIME-style calls.
3139fn real_sec_arg(v: &Value) -> Result<(u8, u32)> {
3140    match v {
3141        Value::Integer(n) => {
3142            if !(0..=60).contains(n) {
3143                return Err(SqlError::InvalidValue(format!("second out of range: {n}")));
3144            }
3145            Ok((*n as u8, 0))
3146        }
3147        Value::Real(r) => {
3148            let whole = r.trunc() as i64;
3149            if !(0..=60).contains(&whole) {
3150                return Err(SqlError::InvalidValue(format!("second out of range: {r}")));
3151            }
3152            let frac = ((r - whole as f64) * 1_000_000.0).round() as i64;
3153            Ok((whole as u8, frac.max(0) as u32))
3154        }
3155        _ => Err(SqlError::TypeMismatch {
3156            expected: "numeric seconds".into(),
3157            got: v.data_type().to_string(),
3158        }),
3159    }
3160}
3161
3162/// DATE_TRUNC with a non-UTC IANA zone: convert → truncate in that zone → convert back to UTC.
3163fn date_trunc_in_zone(unit: &str, ts_utc: i64, tz: &str) -> Result<Value> {
3164    use jiff::{tz::TimeZone, Timestamp as JTimestamp};
3165    let zone = TimeZone::get(tz).map_err(|e| SqlError::InvalidTimezone(format!("{tz}: {e}")))?;
3166    let ts = JTimestamp::from_microsecond(ts_utc)
3167        .map_err(|e| SqlError::InvalidValue(format!("ts: {e}")))?;
3168    let zoned = ts.to_zoned(zone.clone());
3169    let unit_lower = unit.to_ascii_lowercase();
3170    let rounded = match unit_lower.as_str() {
3171        "microseconds" => return Ok(Value::Timestamp(ts_utc)),
3172        "second" => zoned
3173            .start_of_day()
3174            .map_err(|e| SqlError::InvalidValue(format!("{e}")))?,
3175        _ => {
3176            let naive_ts = zoned.timestamp().as_microsecond();
3177            return crate::datetime::date_trunc(unit, &Value::Timestamp(naive_ts));
3178        }
3179    };
3180    Ok(Value::Timestamp(rounded.timestamp().as_microsecond()))
3181}
3182
3183fn check_args(name: &str, args: &[Value], expected: usize) -> Result<()> {
3184    if args.len() != expected {
3185        Err(SqlError::InvalidValue(format!(
3186            "{name} requires {expected} argument(s), got {}",
3187            args.len()
3188        )))
3189    } else {
3190        Ok(())
3191    }
3192}
3193
3194pub fn referenced_columns(expr: &Expr, columns: &[ColumnDef]) -> Vec<usize> {
3195    let mut indices = Vec::new();
3196    collect_column_refs(expr, columns, &mut indices);
3197    indices.sort_unstable();
3198    indices.dedup();
3199    indices
3200}
3201
3202fn collect_column_refs(expr: &Expr, columns: &[ColumnDef], out: &mut Vec<usize>) {
3203    match expr {
3204        Expr::Column(name) => {
3205            for (i, c) in columns.iter().enumerate() {
3206                if c.name == *name
3207                    || (c.name.len() > name.len()
3208                        && c.name.as_bytes()[c.name.len() - name.len() - 1] == b'.'
3209                        && c.name.ends_with(name.as_str()))
3210                {
3211                    out.push(i);
3212                    break;
3213                }
3214            }
3215        }
3216        Expr::QualifiedColumn { table, column } => {
3217            let mut found: Option<usize> = None;
3218            let mut bare_match: Option<usize> = None;
3219            let mut bare_count = 0usize;
3220            for (i, c) in columns.iter().enumerate() {
3221                if c.name.len() == table.len() + 1 + column.len()
3222                    && c.name.as_bytes()[table.len()] == b'.'
3223                    && c.name.starts_with(table.as_str())
3224                    && c.name.ends_with(column.as_str())
3225                {
3226                    found = Some(i);
3227                    break;
3228                }
3229                if c.name == *column {
3230                    bare_match = Some(i);
3231                    bare_count += 1;
3232                }
3233            }
3234            if let Some(idx) = found {
3235                out.push(idx);
3236            } else if bare_count == 1 {
3237                out.push(bare_match.unwrap());
3238            }
3239        }
3240        Expr::BinaryOp { left, right, .. } => {
3241            collect_column_refs(left, columns, out);
3242            collect_column_refs(right, columns, out);
3243        }
3244        Expr::UnaryOp { expr, .. } => {
3245            collect_column_refs(expr, columns, out);
3246        }
3247        Expr::IsNull(e) | Expr::IsNotNull(e) => {
3248            collect_column_refs(e, columns, out);
3249        }
3250        Expr::Function { args, .. } => {
3251            for arg in args {
3252                collect_column_refs(arg, columns, out);
3253            }
3254        }
3255        Expr::InSubquery { expr, .. } => {
3256            collect_column_refs(expr, columns, out);
3257        }
3258        Expr::InList { expr, list, .. } => {
3259            collect_column_refs(expr, columns, out);
3260            for item in list {
3261                collect_column_refs(item, columns, out);
3262            }
3263        }
3264        Expr::InSet { expr, .. } => {
3265            collect_column_refs(expr, columns, out);
3266        }
3267        Expr::Between {
3268            expr, low, high, ..
3269        } => {
3270            collect_column_refs(expr, columns, out);
3271            collect_column_refs(low, columns, out);
3272            collect_column_refs(high, columns, out);
3273        }
3274        Expr::Like {
3275            expr,
3276            pattern,
3277            escape,
3278            ..
3279        } => {
3280            collect_column_refs(expr, columns, out);
3281            collect_column_refs(pattern, columns, out);
3282            if let Some(esc) = escape {
3283                collect_column_refs(esc, columns, out);
3284            }
3285        }
3286        Expr::Case {
3287            operand,
3288            conditions,
3289            else_result,
3290        } => {
3291            if let Some(op) = operand {
3292                collect_column_refs(op, columns, out);
3293            }
3294            for (when, then) in conditions {
3295                collect_column_refs(when, columns, out);
3296                collect_column_refs(then, columns, out);
3297            }
3298            if let Some(e) = else_result {
3299                collect_column_refs(e, columns, out);
3300            }
3301        }
3302        Expr::Coalesce(args) => {
3303            for arg in args {
3304                collect_column_refs(arg, columns, out);
3305            }
3306        }
3307        Expr::Cast { expr, .. } => {
3308            collect_column_refs(expr, columns, out);
3309        }
3310        Expr::Collate { expr, .. } => {
3311            collect_column_refs(expr, columns, out);
3312        }
3313        Expr::WindowFunction { args, spec, .. } => {
3314            for arg in args {
3315                collect_column_refs(arg, columns, out);
3316            }
3317            for pb in &spec.partition_by {
3318                collect_column_refs(pb, columns, out);
3319            }
3320            for ob in &spec.order_by {
3321                collect_column_refs(&ob.expr, columns, out);
3322            }
3323        }
3324        Expr::ArrayLiteral(elems) => {
3325            for e in elems {
3326                collect_column_refs(e, columns, out);
3327            }
3328        }
3329        Expr::Quantified { left, right, .. } => {
3330            collect_column_refs(left, columns, out);
3331            if let crate::parser::QuantifiedRhs::Array(e) = right {
3332                collect_column_refs(e, columns, out);
3333            }
3334        }
3335        Expr::Literal(_)
3336        | Expr::Parameter(_)
3337        | Expr::CountStar
3338        | Expr::Exists { .. }
3339        | Expr::ScalarSubquery(_)
3340        | Expr::TypedNullRecord(_) => {}
3341    }
3342}
3343
3344pub fn is_truthy(val: &Value) -> bool {
3345    match val {
3346        Value::Boolean(b) => *b,
3347        Value::Integer(i) => *i != 0,
3348        Value::Null => false,
3349        _ => true,
3350    }
3351}
3352
3353#[cfg(test)]
3354#[path = "eval_tests.rs"]
3355mod tests;