Skip to main content

spg_engine/
eval.rs

1//! Expression evaluator. Given a parsed `Expr`, a `Row`, and the row's column
2//! schema, produce a `Value`. v0.4 implements:
3//!
4//! - literals
5//! - column lookups (bare and qualified `t.col`)
6//! - unary minus / NOT
7//! - binary arithmetic, comparison, AND, OR
8//! - numeric widening (`Int → BigInt → Float`) at evaluation time
9//! - SQL three-valued logic for NULL:
10//!     * any arithmetic / comparison op with a NULL operand → NULL
11//!     * `TRUE OR NULL` → TRUE, `FALSE OR NULL` → NULL,
12//!     * `FALSE AND NULL` → FALSE, `TRUE AND NULL` → NULL,
13//!     * `NOT NULL` → NULL
14//!
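//! v0.4 deliberately did *not* implement function calls, string
//! concatenation, IS NULL / IS NOT NULL, BETWEEN, IN, etc. Later versions
//! have since added several of these to this module: `$N` placeholders,
//! CAST, IS [NOT] NULL, scalar function calls, LIKE, EXTRACT, `ARRAY[...]`
//! constructors, array subscripts, and `op ANY/ALL (array)`.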
17
18use alloc::boxed::Box;
19use alloc::format;
20use alloc::string::{String, ToString};
21use alloc::vec::Vec;
22
23use spg_sql::ast::{BinOp, CastTarget, ColumnName, Expr, Literal, UnOp};
24use spg_storage::{ColumnSchema, DataType, Row, TsLexeme, TsQueryAst, Value};
25
/// Resolution context for evaluating a single row. `table_alias` is the alias
/// (or table name) callers should accept as the qualifier on a column ref —
/// e.g. `FROM users AS u` makes `u.name` valid and rejects `other.name`.
///
/// Every field is a borrow with lifetime `'a`; the context owns nothing.
#[derive(Debug, Clone)]
pub struct EvalContext<'a> {
    /// Column schema of the row being evaluated; column references are
    /// resolved against it.
    pub columns: &'a [ColumnSchema],
    /// Alias (or table name) accepted as the qualifier in `t.col`
    /// references.
    // NOTE(review): the meaning of `None` is decided by `resolve_column`,
    // which is not visible here — confirm before relying on it.
    pub table_alias: Option<&'a str>,
    /// v6.1.1 — bound parameters for `$N` placeholders inside the
    /// expression tree. Empty for simple queries; populated by the
    /// prepared-statement Execute path with Bind values converted
    /// to `Value`. Index N (1-based per PG) hits `params[N-1]`.
    pub params: &'a [Value],
    /// v7.12.1 — session text-search config (from `SET
    /// default_text_search_config = '<name>'`). Resolved when the
    /// engine builds an `EvalContext` and consumed by the FTS
    /// function dispatcher when `to_tsvector(text)` /
    /// `plainto_tsquery(text)` etc are called without an explicit
    /// config arg. `None` falls through to `simple`.
    pub default_text_search_config: Option<&'a str>,
}
46
47impl<'a> EvalContext<'a> {
48    pub const fn new(columns: &'a [ColumnSchema], table_alias: Option<&'a str>) -> Self {
49        Self {
50            columns,
51            table_alias,
52            params: &[],
53            default_text_search_config: None,
54        }
55    }
56
57    /// v6.1.1 — attach a parameter buffer for `$N` placeholder
58    /// resolution. The slice must outlive the context; callers
59    /// construct it from the prepared statement's Bind values.
60    #[must_use]
61    pub const fn with_params(mut self, params: &'a [Value]) -> Self {
62        self.params = params;
63        self
64    }
65
66    /// v7.12.1 — attach the session's
67    /// `default_text_search_config`. Used by the FTS function
68    /// dispatcher when no explicit config arg is given.
69    #[must_use]
70    pub const fn with_default_text_search_config(mut self, cfg: Option<&'a str>) -> Self {
71        self.default_text_search_config = cfg;
72        self
73    }
74}
75
/// Errors reported while evaluating an expression against a row.
///
/// Every payload is `Eq` (`String`, `u16`), so the enum derives `Eq` in
/// addition to `PartialEq`; this lets callers use it wherever full
/// equivalence is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EvalError {
    /// A column reference named `name` could not be resolved.
    ColumnNotFound {
        name: String,
    },
    /// A qualified reference used a table qualifier the context does not
    /// accept.
    UnknownQualifier {
        qualifier: String,
    },
    /// Division by zero.
    DivisionByZero,
    /// An operand, argument, or argument count was not acceptable for the
    /// operation; `detail` carries the human-readable explanation. Also
    /// used for nodes that should never reach row evaluation (unresolved
    /// subqueries, window functions).
    TypeMismatch {
        detail: String,
    },
    /// v6.1.1 — `$N` reference past the number of bound parameters.
    /// Either the client sent too few in Bind, or the SQL has a
    /// placeholder the prepared statement didn't account for.
    /// `n` is the placeholder number, `bound` the number of parameters
    /// actually bound.
    PlaceholderOutOfRange {
        n: u16,
        bound: u16,
    },
}
96
97impl core::fmt::Display for EvalError {
98    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
99        match self {
100            Self::ColumnNotFound { name } => write!(f, "column not found: {name}"),
101            Self::UnknownQualifier { qualifier } => {
102                write!(f, "unknown table qualifier: {qualifier}")
103            }
104            Self::DivisionByZero => f.write_str("division by zero"),
105            Self::TypeMismatch { detail } => write!(f, "type mismatch: {detail}"),
106            Self::PlaceholderOutOfRange { n, bound } => write!(
107                f,
108                "parameter ${n} referenced but only {bound} bound by client"
109            ),
110        }
111    }
112}
113
114pub fn eval_expr(expr: &Expr, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
115    match expr {
116        Expr::Literal(l) => Ok(literal_to_value(l)),
117        Expr::Column(c) => resolve_column(c, row, ctx),
118        Expr::Placeholder(n) => {
119            let idx = usize::from(*n).saturating_sub(1);
120            ctx.params
121                .get(idx)
122                .cloned()
123                .ok_or_else(|| EvalError::PlaceholderOutOfRange {
124                    n: *n,
125                    bound: u16::try_from(ctx.params.len()).unwrap_or(u16::MAX),
126                })
127        }
128        Expr::Unary { op, expr } => {
129            let v = eval_expr(expr, row, ctx)?;
130            apply_unary(*op, v)
131        }
132        Expr::Binary { lhs, op, rhs } => {
133            let l = eval_expr(lhs, row, ctx)?;
134            let r = eval_expr(rhs, row, ctx)?;
135            apply_binary(*op, l, r)
136        }
137        Expr::Cast { expr, target } => {
138            let v = eval_expr(expr, row, ctx)?;
139            cast_value(v, *target)
140        }
141        Expr::IsNull { expr, negated } => {
142            let v = eval_expr(expr, row, ctx)?;
143            let is_null = matches!(v, Value::Null);
144            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
145        }
146        Expr::FunctionCall { name, args } => {
147            let evaluated: Result<Vec<Value>, _> =
148                args.iter().map(|a| eval_expr(a, row, ctx)).collect();
149            apply_function(name, &evaluated?, ctx)
150        }
151        Expr::Like {
152            expr,
153            pattern,
154            negated,
155        } => {
156            let v = eval_expr(expr, row, ctx)?;
157            let p = eval_expr(pattern, row, ctx)?;
158            // NULL on either side propagates to NULL — same as PG.
159            let (text, pat) = match (v, p) {
160                (Value::Null, _) | (_, Value::Null) => return Ok(Value::Null),
161                (Value::Text(a), Value::Text(b)) => (a, b),
162                (Value::Text(_), other) | (other, _) => {
163                    return Err(EvalError::TypeMismatch {
164                        detail: format!("LIKE requires text operands, got {:?}", other.data_type()),
165                    });
166                }
167            };
168            let m = like_match(&text, &pat);
169            Ok(Value::Bool(if *negated { !m } else { m }))
170        }
171        Expr::Extract { field, source } => {
172            let v = eval_expr(source, row, ctx)?;
173            extract_field(*field, &v)
174        }
175        // v4.10: subquery nodes should have been resolved into
176        // Literal / Binary-Eq-OR chains by Engine::resolve_select_subqueries
177        // before the row loop. Anything reaching here is a bug.
178        Expr::ScalarSubquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. } => {
179            Err(EvalError::TypeMismatch {
180                detail: "subquery reached row eval — engine resolver bug".into(),
181            })
182        }
183        // v4.12: window functions should have been rewritten into
184        // synthetic __win_N column references by
185        // exec_select_with_window before row eval. Anything
186        // reaching here is similarly a bug.
187        Expr::WindowFunction { .. } => Err(EvalError::TypeMismatch {
188            detail: "window function reached row eval — engine rewrite bug".into(),
189        }),
190        // v7.10.10 — `ARRAY[expr, expr, …]` constructor.
191        // v7.11.13 — element-type detection: all integers →
192        // IntArray (or BigIntArray when widening), any Text →
193        // TextArray. Non-TEXT non-integer elements (Bool, Float)
194        // stringify into TextArray as the safe default.
195        Expr::Array(items) => {
196            let mut materialised: Vec<Value> = Vec::with_capacity(items.len());
197            for elem in items {
198                materialised.push(eval_expr(elem, row, ctx)?);
199            }
200            let mut has_text = false;
201            let mut has_bigint = false;
202            let mut has_int = false;
203            for v in &materialised {
204                match v {
205                    Value::Null => {}
206                    Value::Int(_) | Value::SmallInt(_) => has_int = true,
207                    Value::BigInt(_) => has_bigint = true,
208                    Value::Text(_) | Value::Json(_) => has_text = true,
209                    _ => has_text = true,
210                }
211            }
212            if has_text || (!has_int && !has_bigint) {
213                let out: Vec<Option<String>> = materialised
214                    .into_iter()
215                    .map(|v| match v {
216                        Value::Null => None,
217                        Value::Text(s) | Value::Json(s) => Some(s),
218                        other => Some(value_to_text_for_array(&other)),
219                    })
220                    .collect();
221                return Ok(Value::TextArray(out));
222            }
223            if has_bigint {
224                let out: Vec<Option<i64>> = materialised
225                    .into_iter()
226                    .map(|v| match v {
227                        Value::Null => None,
228                        Value::Int(n) => Some(i64::from(n)),
229                        Value::SmallInt(n) => Some(i64::from(n)),
230                        Value::BigInt(n) => Some(n),
231                        _ => unreachable!(),
232                    })
233                    .collect();
234                return Ok(Value::BigIntArray(out));
235            }
236            let out: Vec<Option<i32>> = materialised
237                .into_iter()
238                .map(|v| match v {
239                    Value::Null => None,
240                    Value::Int(n) => Some(n),
241                    Value::SmallInt(n) => Some(i32::from(n)),
242                    _ => unreachable!(),
243                })
244                .collect();
245            Ok(Value::IntArray(out))
246        }
247        // v7.10.12 — `arr[i]` PG-style 1-based indexing.
248        // Out-of-range indices (including i ≤ 0) return NULL.
249        Expr::ArraySubscript { target, index } => {
250            let target_v = eval_expr(target, row, ctx)?;
251            let idx_v = eval_expr(index, row, ctx)?;
252            if matches!(target_v, Value::Null) || matches!(idx_v, Value::Null) {
253                return Ok(Value::Null);
254            }
255            let i: i64 = match idx_v {
256                Value::Int(n) => i64::from(n),
257                Value::BigInt(n) => n,
258                Value::SmallInt(n) => i64::from(n),
259                other => {
260                    return Err(EvalError::TypeMismatch {
261                        detail: format!(
262                            "array subscript must be integer, got {:?}",
263                            other.data_type()
264                        ),
265                    });
266                }
267            };
268            if i < 1 {
269                return Ok(Value::Null);
270            }
271            let pos = (i - 1) as usize;
272            match target_v {
273                Value::TextArray(items) => match items.get(pos) {
274                    Some(Some(s)) => Ok(Value::Text(s.clone())),
275                    Some(None) | None => Ok(Value::Null),
276                },
277                Value::IntArray(items) => match items.get(pos) {
278                    Some(Some(n)) => Ok(Value::Int(*n)),
279                    Some(None) | None => Ok(Value::Null),
280                },
281                Value::BigIntArray(items) => match items.get(pos) {
282                    Some(Some(n)) => Ok(Value::BigInt(*n)),
283                    Some(None) | None => Ok(Value::Null),
284                },
285                other => Err(EvalError::TypeMismatch {
286                    detail: format!(
287                        "subscript target must be an array, got {:?}",
288                        other.data_type()
289                    ),
290                }),
291            }
292        }
293        // v7.10.12 — `x op ANY(arr)` / `x op ALL(arr)`. PG
294        // 3VL: ANY → true if any element compares-true; NULL if
295        // no true but some NULL; false otherwise. ALL: false if
296        // any compares-false; NULL if no false but some NULL;
297        // true otherwise.
298        Expr::AnyAll {
299            expr,
300            op,
301            array,
302            is_any,
303        } => {
304            let lhs = eval_expr(expr, row, ctx)?;
305            let arr = eval_expr(array, row, ctx)?;
306            if matches!(arr, Value::Null) {
307                return Ok(Value::Null);
308            }
309            let elems: Vec<Option<Value>> = match arr {
310                Value::TextArray(items) => items.into_iter().map(|o| o.map(Value::Text)).collect(),
311                Value::IntArray(items) => items.into_iter().map(|o| o.map(Value::Int)).collect(),
312                Value::BigIntArray(items) => {
313                    items.into_iter().map(|o| o.map(Value::BigInt)).collect()
314                }
315                other => {
316                    return Err(EvalError::TypeMismatch {
317                        detail: format!(
318                            "ANY/ALL right-hand side must be an array, got {:?}",
319                            other.data_type()
320                        ),
321                    });
322                }
323            };
324            let mut saw_null = matches!(lhs, Value::Null);
325            let mut saw_match = false;
326            let mut saw_mismatch = false;
327            for elem in elems {
328                let elem_v = match elem {
329                    Some(v) => v,
330                    None => {
331                        saw_null = true;
332                        continue;
333                    }
334                };
335                if matches!(lhs, Value::Null) {
336                    saw_null = true;
337                    continue;
338                }
339                match apply_binary(*op, lhs.clone(), elem_v) {
340                    Ok(Value::Bool(true)) => saw_match = true,
341                    Ok(Value::Bool(false)) => saw_mismatch = true,
342                    Ok(Value::Null) => saw_null = true,
343                    Ok(other) => {
344                        return Err(EvalError::TypeMismatch {
345                            detail: format!(
346                                "ANY/ALL comparison didn't return Bool: {:?}",
347                                other.data_type()
348                            ),
349                        });
350                    }
351                    Err(e) => return Err(e),
352                }
353            }
354            let result = if *is_any {
355                if saw_match {
356                    Value::Bool(true)
357                } else if saw_null {
358                    Value::Null
359                } else {
360                    Value::Bool(false)
361                }
362            } else if saw_mismatch {
363                Value::Bool(false)
364            } else if saw_null {
365                Value::Null
366            } else {
367                Value::Bool(true)
368            };
369            Ok(result)
370        }
371    }
372}
373
374/// v7.10.10 — best-effort text rendering for non-TEXT array
375/// elements (numbers, bools, etc.). The PG rule is that
376/// `ARRAY[1, 2]` is `int[]`, but SPG's v7.10 only models TEXT[],
377/// so we widen by stringifying. NUMERIC formatting goes through
378/// the existing canonical helpers to stay consistent with
379/// `format_numeric` / `format_date` etc.
380fn value_to_text_for_array(v: &Value) -> String {
381    match v {
382        Value::Text(s) | Value::Json(s) => s.clone(),
383        Value::Int(n) => n.to_string(),
384        Value::BigInt(n) => n.to_string(),
385        Value::SmallInt(n) => n.to_string(),
386        Value::Bool(b) => {
387            if *b {
388                "true".into()
389            } else {
390                "false".into()
391            }
392        }
393        Value::Float(x) => format!("{x}"),
394        Value::Date(d) => format_date(*d),
395        Value::Timestamp(t) => format_timestamp(*t),
396        Value::Numeric { scaled, scale } => format_numeric(*scaled, *scale),
397        _ => format!("{v:?}"),
398    }
399}
400
401/// Pull an integer component (year / month / ... / microsecond) out
402/// of a `DATE` or `TIMESTAMP`. Returns NULL on a NULL source, errors
403/// when the source isn't a calendar type.
404fn extract_field(field: spg_sql::ast::ExtractField, v: &Value) -> Result<Value, EvalError> {
405    use spg_sql::ast::ExtractField as F;
406    if matches!(v, Value::Null) {
407        return Ok(Value::Null);
408    }
409    // INTERVAL has its own decomposition — `YEAR` / `MONTH` come from
410    // the months part, the rest from the microseconds part. PG matches
411    // this convention (months is normalised modulo 12 for MONTH).
412    if let Value::Interval { months, micros } = *v {
413        let years = months / 12;
414        let mons = months % 12;
415        let secs_total = micros / 1_000_000;
416        let frac = micros % 1_000_000;
417        let result = match field {
418            F::Year => i64::from(years),
419            F::Month => i64::from(mons),
420            F::Day => micros / 86_400_000_000,
421            F::Hour => (secs_total / 3600) % 24,
422            F::Minute => (secs_total / 60) % 60,
423            F::Second => secs_total % 60,
424            F::Microsecond => (secs_total % 60) * 1_000_000 + frac,
425        };
426        return Ok(Value::BigInt(result));
427    }
428    let (days, day_micros) = match *v {
429        Value::Date(d) => (d, 0_i64),
430        Value::Timestamp(t) => {
431            let days = t.div_euclid(86_400_000_000);
432            let day_micros = t.rem_euclid(86_400_000_000);
433            (i32::try_from(days).unwrap_or(i32::MAX), day_micros)
434        }
435        _ => {
436            return Err(EvalError::TypeMismatch {
437                detail: format!(
438                    "EXTRACT requires DATE / TIMESTAMP / INTERVAL, got {:?}",
439                    v.data_type()
440                ),
441            });
442        }
443    };
444    let (y, m, d) = civil_components(days);
445    let secs = day_micros / 1_000_000;
446    let hh = secs / 3600;
447    let mm = (secs / 60) % 60;
448    let ss = secs % 60;
449    let frac = day_micros % 1_000_000;
450    let result = match field {
451        F::Year => i64::from(y),
452        F::Month => i64::from(m),
453        F::Day => i64::from(d),
454        F::Hour => hh,
455        F::Minute => mm,
456        F::Second => ss,
457        F::Microsecond => ss * 1_000_000 + frac,
458    };
459    Ok(Value::BigInt(result))
460}
461
462/// Internal wrapper around the file-private `civil_from_days` so the
463/// public surface area doesn't change. Returns `(year, month, day)`.
464fn civil_components(days: i32) -> (i32, u32, u32) {
465    civil_from_days(days)
466}
467
/// SQL `LIKE` matcher. Wildcards are `%` (any run, possibly empty) and `_`
/// (exactly one char). `\` escapes the next pattern char so `\%` matches a
/// literal `%`; a `\` that is the last pattern char matches a literal
/// backslash. Matches the whole input — no implicit anchoring needed
/// since SQL `LIKE` is always full-string.
///
/// Matching is per Unicode scalar value (`char`), not per byte.
fn like_match(text: &str, pattern: &str) -> bool {
    let text: Vec<char> = text.chars().collect();
    let pat: Vec<char> = pattern.chars().collect();
    like_match_inner(&text, 0, &pat, 0)
}

/// Matches `pat[pi..]` against the whole of `text[ti..]`.
///
/// Iterative, single-resume-point wildcard matching: only the most recent
/// `%` run is ever revisited. That is sufficient because every other
/// pattern element consumes exactly one text char, so taking the earliest
/// possible match for each `%`-delimited segment never rules out a later
/// one. Running time is O(text × pattern); the earlier recursive version
/// retried every split at every `%` and was exponential on patterns such
/// as `%a%a%a…b`.
fn like_match_inner(text: &[char], mut ti: usize, pat: &[char], mut pi: usize) -> bool {
    // (pattern index just past the latest `%` run, text index where the
    // remainder of the pattern is currently being tried).
    let mut resume: Option<(usize, usize)> = None;

    loop {
        if pi < pat.len() && pat[pi] == '%' {
            // Collapse consecutive `%`.
            while pi < pat.len() && pat[pi] == '%' {
                pi += 1;
            }
            // A trailing `%` swallows whatever text is left.
            if pi == pat.len() {
                return true;
            }
            resume = Some((pi, ti));
        }

        // Number of pattern chars consumed if the element at `pi` matches
        // `text[ti]`; `None` means a mismatch at this alignment.
        let step = if pi >= pat.len() {
            if ti == text.len() {
                return true;
            }
            None
        } else if ti >= text.len() {
            None
        } else {
            match pat[pi] {
                '_' => Some(1),
                '\\' if pi + 1 < pat.len() => (text[ti] == pat[pi + 1]).then_some(2),
                c => (text[ti] == c).then_some(1),
            }
        };

        match (step, resume) {
            (Some(width), _) => {
                ti += 1;
                pi += width;
            }
            // Mismatch: let the latest `%` absorb one more char and retry
            // the remainder of the pattern from there.
            (None, Some((after_run, tried_at))) if tried_at < text.len() => {
                let next = tried_at + 1;
                resume = Some((after_run, next));
                pi = after_run;
                ti = next;
            }
            (None, _) => return false,
        }
    }
}
522
523/// Dispatch on lowercased function name. v1.4 implements only a handful of
524/// scalar functions; aggregates land in v1.5 alongside GROUP BY.
525fn apply_function(name: &str, args: &[Value], ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
526    match name.to_ascii_lowercase().as_str() {
527        "length" => {
528            if args.len() != 1 {
529                return Err(EvalError::TypeMismatch {
530                    detail: format!("length() takes 1 arg, got {}", args.len()),
531                });
532            }
533            match &args[0] {
534                Value::Null => Ok(Value::Null),
535                Value::Text(s) => {
536                    let n = i32::try_from(s.chars().count()).unwrap_or(i32::MAX);
537                    Ok(Value::Int(n))
538                }
539                // v7.10.4 — PG semantics: length(bytea) returns
540                // byte count (= octet_length). Without this branch
541                // mailrs's INSERT … SELECT length(body) … against a
542                // BYTEA column would type-mismatch.
543                Value::Bytes(b) => {
544                    let n = i32::try_from(b.len()).unwrap_or(i32::MAX);
545                    Ok(Value::Int(n))
546                }
547                other => Err(EvalError::TypeMismatch {
548                    detail: format!("length() needs text or bytea, got {:?}", other.data_type()),
549                }),
550            }
551        }
552        // v7.10.4 — `OCTET_LENGTH(x)` returns byte count for both
553        // TEXT (UTF-8 byte length) and BYTEA. PG-spec name; aliases
554        // to length() for bytea by design.
555        "octet_length" => {
556            if args.len() != 1 {
557                return Err(EvalError::TypeMismatch {
558                    detail: format!("octet_length() takes 1 arg, got {}", args.len()),
559                });
560            }
561            match &args[0] {
562                Value::Null => Ok(Value::Null),
563                Value::Text(s) => {
564                    let n = i32::try_from(s.len()).unwrap_or(i32::MAX);
565                    Ok(Value::Int(n))
566                }
567                Value::Bytes(b) => {
568                    let n = i32::try_from(b.len()).unwrap_or(i32::MAX);
569                    Ok(Value::Int(n))
570                }
571                other => Err(EvalError::TypeMismatch {
572                    detail: format!(
573                        "octet_length() needs text or bytea, got {:?}",
574                        other.data_type()
575                    ),
576                }),
577            }
578        }
579        // v7.11.6 — `array_length(arr, dim)` returns the element
580        // count of `arr` along dimension `dim`. v7.11 only models
581        // single-dimension arrays so dim must be 1 (otherwise NULL,
582        // matching PG semantics for unsupported dimensions). NULL
583        // array → NULL. v7.11 TEXT[] only; non-array operand is
584        // a type mismatch.
585        "array_length" => {
586            if args.len() != 2 {
587                return Err(EvalError::TypeMismatch {
588                    detail: format!("array_length() takes 2 args, got {}", args.len()),
589                });
590            }
591            if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
592                return Ok(Value::Null);
593            }
594            let len = match &args[0] {
595                Value::TextArray(items) => items.len(),
596                Value::IntArray(items) => items.len(),
597                Value::BigIntArray(items) => items.len(),
598                _ => {
599                    return Err(EvalError::TypeMismatch {
600                        detail: format!(
601                            "array_length() first arg must be an array, got {:?}",
602                            args[0].data_type()
603                        ),
604                    });
605                }
606            };
607            let dim: i64 = match args[1] {
608                Value::Int(n) => i64::from(n),
609                Value::BigInt(n) => n,
610                Value::SmallInt(n) => i64::from(n),
611                _ => {
612                    return Err(EvalError::TypeMismatch {
613                        detail: format!(
614                            "array_length() second arg must be integer, got {:?}",
615                            args[1].data_type()
616                        ),
617                    });
618                }
619            };
620            if dim != 1 {
621                return Ok(Value::Null);
622            }
623            let n = i32::try_from(len).unwrap_or(i32::MAX);
624            Ok(Value::Int(n))
625        }
626        // v7.11.6 — `array_position(arr, val)` returns 1-based
627        // index of the first element of `arr` equal to `val`, or
628        // NULL if not found. PG NULL semantics: NULL array → NULL;
629        // NULL val never matches (returns NULL if absent).
630        "array_position" => {
631            if args.len() != 2 {
632                return Err(EvalError::TypeMismatch {
633                    detail: format!("array_position() takes 2 args, got {}", args.len()),
634                });
635            }
636            if matches!(args[0], Value::Null) {
637                return Ok(Value::Null);
638            }
639            if matches!(args[1], Value::Null) {
640                return Ok(Value::Null);
641            }
642            match (&args[0], &args[1]) {
643                (Value::TextArray(items), Value::Text(needle)) => {
644                    for (idx, item) in items.iter().enumerate() {
645                        if let Some(s) = item
646                            && s == needle
647                        {
648                            return Ok(Value::Int(i32::try_from(idx + 1).unwrap_or(i32::MAX)));
649                        }
650                    }
651                    Ok(Value::Null)
652                }
653                (Value::IntArray(items), needle_v)
654                    if matches!(
655                        needle_v,
656                        Value::Int(_) | Value::SmallInt(_) | Value::BigInt(_)
657                    ) =>
658                {
659                    let needle: i64 = match *needle_v {
660                        Value::Int(n) => i64::from(n),
661                        Value::SmallInt(n) => i64::from(n),
662                        Value::BigInt(n) => n,
663                        _ => unreachable!(),
664                    };
665                    for (idx, item) in items.iter().enumerate() {
666                        if let Some(n) = item
667                            && i64::from(*n) == needle
668                        {
669                            return Ok(Value::Int(i32::try_from(idx + 1).unwrap_or(i32::MAX)));
670                        }
671                    }
672                    Ok(Value::Null)
673                }
674                (Value::BigIntArray(items), needle_v)
675                    if matches!(
676                        needle_v,
677                        Value::Int(_) | Value::SmallInt(_) | Value::BigInt(_)
678                    ) =>
679                {
680                    let needle: i64 = match *needle_v {
681                        Value::Int(n) => i64::from(n),
682                        Value::SmallInt(n) => i64::from(n),
683                        Value::BigInt(n) => n,
684                        _ => unreachable!(),
685                    };
686                    for (idx, item) in items.iter().enumerate() {
687                        if let Some(n) = item
688                            && *n == needle
689                        {
690                            return Ok(Value::Int(i32::try_from(idx + 1).unwrap_or(i32::MAX)));
691                        }
692                    }
693                    Ok(Value::Null)
694                }
695                (
696                    arr @ (Value::TextArray(_) | Value::IntArray(_) | Value::BigIntArray(_)),
697                    other,
698                ) => Err(EvalError::TypeMismatch {
699                    detail: format!(
700                        "array_position() needle type {:?} doesn't match array {:?}",
701                        other.data_type(),
702                        arr.data_type()
703                    ),
704                }),
705                (other, _) => Err(EvalError::TypeMismatch {
706                    detail: format!(
707                        "array_position() first arg must be an array, got {:?}",
708                        other.data_type()
709                    ),
710                }),
711            }
712        }
713        // v7.11.15 — `substring(s, start)` / `substring(s, start, length)`
714        // for both TEXT and BYTEA. PG semantics: `start` is 1-based;
715        // values ≤ 0 clamp into the string (i.e. effective start is
716        // adjusted so the window still begins at index 1 — but
717        // `length` is reduced by the clipped prefix). A NULL arg
718        // makes the result NULL. Out-of-range windows return an
719        // empty value, not NULL.
720        "substring" => {
721            if !matches!(args.len(), 2 | 3) {
722                return Err(EvalError::TypeMismatch {
723                    detail: format!("substring() takes 2 or 3 args, got {}", args.len()),
724                });
725            }
726            if args.iter().any(|a| matches!(a, Value::Null)) {
727                return Ok(Value::Null);
728            }
729            let start: i64 = match args[1] {
730                Value::Int(n) => i64::from(n),
731                Value::BigInt(n) => n,
732                Value::SmallInt(n) => i64::from(n),
733                _ => {
734                    return Err(EvalError::TypeMismatch {
735                        detail: format!(
736                            "substring() start must be integer, got {:?}",
737                            args[1].data_type()
738                        ),
739                    });
740                }
741            };
742            let length: Option<i64> = if args.len() == 3 {
743                match args[2] {
744                    Value::Int(n) => Some(i64::from(n)),
745                    Value::BigInt(n) => Some(n),
746                    Value::SmallInt(n) => Some(i64::from(n)),
747                    _ => {
748                        return Err(EvalError::TypeMismatch {
749                            detail: format!(
750                                "substring() length must be integer, got {:?}",
751                                args[2].data_type()
752                            ),
753                        });
754                    }
755                }
756            } else {
757                None
758            };
759            // PG: when length is given, end = start + length; if
760            // end < start the result is empty. Clip start to 1.
761            let (effective_start, effective_length): (i64, Option<i64>) = match length {
762                Some(len) => {
763                    let end = start.saturating_add(len);
764                    if end <= 1 || len < 0 {
765                        return Ok(match &args[0] {
766                            Value::Text(_) => Value::Text(String::new()),
767                            Value::Bytes(_) => Value::Bytes(Vec::new()),
768                            other => {
769                                return Err(EvalError::TypeMismatch {
770                                    detail: format!(
771                                        "substring() needs text or bytea, got {:?}",
772                                        other.data_type()
773                                    ),
774                                });
775                            }
776                        });
777                    }
778                    let eff_start = start.max(1);
779                    let eff_len = end - eff_start;
780                    (eff_start, Some(eff_len.max(0)))
781                }
782                None => (start.max(1), None),
783            };
784            match &args[0] {
785                Value::Text(s) => {
786                    // PG counts in characters (codepoints) for TEXT.
787                    let chars: Vec<char> = s.chars().collect();
788                    let skip = (effective_start - 1) as usize;
789                    if skip >= chars.len() {
790                        return Ok(Value::Text(String::new()));
791                    }
792                    let take = match effective_length {
793                        Some(n) => (n as usize).min(chars.len() - skip),
794                        None => chars.len() - skip,
795                    };
796                    Ok(Value::Text(chars[skip..skip + take].iter().collect()))
797                }
798                Value::Bytes(b) => {
799                    let skip = (effective_start - 1) as usize;
800                    if skip >= b.len() {
801                        return Ok(Value::Bytes(Vec::new()));
802                    }
803                    let take = match effective_length {
804                        Some(n) => (n as usize).min(b.len() - skip),
805                        None => b.len() - skip,
806                    };
807                    Ok(Value::Bytes(b[skip..skip + take].to_vec()))
808                }
809                other => Err(EvalError::TypeMismatch {
810                    detail: format!(
811                        "substring() needs text or bytea, got {:?}",
812                        other.data_type()
813                    ),
814                }),
815            }
816        }
817        // v7.11.15 — `position(needle, haystack)`. PG semantics:
818        // 1-based byte/char index of first occurrence, or 0 if
819        // absent. NULL on either operand → NULL. Empty needle
820        // returns 1 (PG convention). Works on TEXT (char positions)
821        // and BYTEA (byte positions). (The PG-spec syntax `position(
822        // needle IN haystack)` is not parsed in v7.11; clients must
823        // call the function-call form.)
824        "position" => {
825            if args.len() != 2 {
826                return Err(EvalError::TypeMismatch {
827                    detail: format!("position() takes 2 args, got {}", args.len()),
828                });
829            }
830            if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
831                return Ok(Value::Null);
832            }
833            match (&args[0], &args[1]) {
834                (Value::Text(needle), Value::Text(haystack)) => {
835                    if needle.is_empty() {
836                        return Ok(Value::Int(1));
837                    }
838                    // Char-based position (PG uses character count).
839                    let h_chars: Vec<char> = haystack.chars().collect();
840                    let n_chars: Vec<char> = needle.chars().collect();
841                    if n_chars.len() > h_chars.len() {
842                        return Ok(Value::Int(0));
843                    }
844                    for i in 0..=h_chars.len() - n_chars.len() {
845                        if h_chars[i..i + n_chars.len()] == n_chars[..] {
846                            return Ok(Value::Int(i32::try_from(i + 1).unwrap_or(i32::MAX)));
847                        }
848                    }
849                    Ok(Value::Int(0))
850                }
851                (Value::Bytes(needle), Value::Bytes(haystack)) => {
852                    if needle.is_empty() {
853                        return Ok(Value::Int(1));
854                    }
855                    if needle.len() > haystack.len() {
856                        return Ok(Value::Int(0));
857                    }
858                    for i in 0..=haystack.len() - needle.len() {
859                        if &haystack[i..i + needle.len()] == needle.as_slice() {
860                            return Ok(Value::Int(i32::try_from(i + 1).unwrap_or(i32::MAX)));
861                        }
862                    }
863                    Ok(Value::Int(0))
864                }
865                (a, b) => Err(EvalError::TypeMismatch {
866                    detail: format!(
867                        "position() operands must both be text or both bytea, got {:?} and {:?}",
868                        a.data_type(),
869                        b.data_type()
870                    ),
871                }),
872            }
873        }
874        "upper" => {
875            if args.len() != 1 {
876                return Err(EvalError::TypeMismatch {
877                    detail: format!("upper() takes 1 arg, got {}", args.len()),
878                });
879            }
880            match &args[0] {
881                Value::Null => Ok(Value::Null),
882                Value::Text(s) => Ok(Value::Text(s.to_uppercase())),
883                other => Err(EvalError::TypeMismatch {
884                    detail: format!("upper() needs text, got {:?}", other.data_type()),
885                }),
886            }
887        }
888        "lower" => {
889            if args.len() != 1 {
890                return Err(EvalError::TypeMismatch {
891                    detail: format!("lower() takes 1 arg, got {}", args.len()),
892                });
893            }
894            match &args[0] {
895                Value::Null => Ok(Value::Null),
896                Value::Text(s) => Ok(Value::Text(s.to_lowercase())),
897                other => Err(EvalError::TypeMismatch {
898                    detail: format!("lower() needs text, got {:?}", other.data_type()),
899                }),
900            }
901        }
902        "abs" => {
903            if args.len() != 1 {
904                return Err(EvalError::TypeMismatch {
905                    detail: format!("abs() takes 1 arg, got {}", args.len()),
906                });
907            }
908            match &args[0] {
909                Value::Null => Ok(Value::Null),
910                Value::Int(n) => Ok(Value::Int(n.wrapping_abs())),
911                Value::BigInt(n) => Ok(Value::BigInt(n.wrapping_abs())),
912                Value::Float(x) => Ok(Value::Float(x.abs())),
913                other => Err(EvalError::TypeMismatch {
914                    detail: format!("abs() needs numeric, got {:?}", other.data_type()),
915                }),
916            }
917        }
918        "coalesce" => {
919            for a in args {
920                if !matches!(a, Value::Null) {
921                    return Ok(a.clone());
922                }
923            }
924            Ok(Value::Null)
925        }
926        "date_trunc" => date_trunc(args),
927        "date_part" => date_part(args),
928        "age" => age(args),
929        "to_char" => to_char(args),
930        // v6.4.3 — encode/decode + error_on_null SQL function bundle.
931        "encode" => encode_text(args),
932        "decode" => decode_text(args),
933        "error_on_null" => error_on_null(args),
934        // v7.12.1 — PG full-text search lexer / tsquery builders.
935        // mailrs G-CRIT-3 acceptance path: `to_tsvector('english',
936        // … || ' ' || … || …)` runs end-to-end against a tsvector
937        // column with Porter stemming + standard english stopwords.
938        "to_tsvector" => fts_to_tsvector(args, ctx),
939        "plainto_tsquery" => fts_plainto_tsquery(args, ctx),
940        "phraseto_tsquery" => fts_phraseto_tsquery(args, ctx),
941        "websearch_to_tsquery" => fts_websearch_to_tsquery(args, ctx),
942        "to_tsquery" => fts_to_tsquery(args, ctx),
943        // v7.12.2 — ranking functions. mailrs's fallback search
944        // query ORDERs BY ts_rank(search_vector, q) DESC.
945        "ts_rank" => fts_ts_rank(args),
946        "ts_rank_cd" => fts_ts_rank_cd(args),
947        other => Err(EvalError::TypeMismatch {
948            detail: format!("unknown function `{other}`"),
949        }),
950    }
951}
952
953/// v7.12.2 — `ts_rank([weights,] vec, query [, norm])`. v7.12.2
954/// supports the canonical `(vec, query)` two-arg form mailrs uses;
955/// optional weight-array / normalisation arguments error with an
956/// "unsupported" message rather than silently changing semantics.
957fn fts_ts_rank(args: &[Value]) -> Result<Value, EvalError> {
958    let (vec, query) = parse_rank_args("ts_rank", args)?;
959    match (vec, query) {
960        (None, _) | (_, None) => Ok(Value::Null),
961        (Some(v), Some(q)) => Ok(Value::Float(f64::from(crate::fts::ts_rank(&v, &q)))),
962    }
963}
964
965fn fts_ts_rank_cd(args: &[Value]) -> Result<Value, EvalError> {
966    let (vec, query) = parse_rank_args("ts_rank_cd", args)?;
967    match (vec, query) {
968        (None, _) | (_, None) => Ok(Value::Null),
969        (Some(v), Some(q)) => Ok(Value::Float(f64::from(crate::fts::ts_rank_cd(&v, &q)))),
970    }
971}
972
973fn parse_rank_args(
974    name: &str,
975    args: &[Value],
976) -> Result<
977    (
978        Option<Vec<spg_storage::TsLexeme>>,
979        Option<spg_storage::TsQueryAst>,
980    ),
981    EvalError,
982> {
983    if args.len() != 2 {
984        return Err(EvalError::TypeMismatch {
985            detail: format!(
986                "{name}() takes 2 args in v7.12.2 (weights array + normalisation flag are v7.12.x carve-out), got {}",
987                args.len()
988            ),
989        });
990    }
991    let vec = match &args[0] {
992        Value::Null => None,
993        Value::TsVector(v) => Some(v.clone()),
994        other => {
995            return Err(EvalError::TypeMismatch {
996                detail: format!(
997                    "{name}() first arg must be tsvector, got {:?}",
998                    other.data_type()
999                ),
1000            });
1001        }
1002    };
1003    let query = match &args[1] {
1004        Value::Null => None,
1005        Value::TsQuery(q) => Some(q.clone()),
1006        other => {
1007            return Err(EvalError::TypeMismatch {
1008                detail: format!(
1009                    "{name}() second arg must be tsquery, got {:?}",
1010                    other.data_type()
1011                ),
1012            });
1013        }
1014    };
1015    Ok((vec, query))
1016}
1017
1018/// v7.12.2 — `tsvector @@ tsquery` match operator. Either
1019/// ordering accepted (PG semantics). NULL on either side → NULL.
1020/// Anything that isn't tsvector/tsquery on either side is a type
1021/// mismatch. Returns BOOL.
1022fn ts_match(l: Value, r: Value) -> Result<Value, EvalError> {
1023    let (vec, query) = match (l, r) {
1024        (Value::Null, _) | (_, Value::Null) => return Ok(Value::Null),
1025        (Value::TsVector(v), Value::TsQuery(q)) => (v, q),
1026        (Value::TsQuery(q), Value::TsVector(v)) => (v, q),
1027        (l, r) => {
1028            return Err(EvalError::TypeMismatch {
1029                detail: format!(
1030                    "@@ requires (tsvector, tsquery), got ({:?}, {:?})",
1031                    l.data_type(),
1032                    r.data_type()
1033                ),
1034            });
1035        }
1036    };
1037    Ok(Value::Bool(crate::fts::ts_query_matches(&vec, &query)))
1038}
1039
1040/// v7.12.1 — `to_tsvector([config,] text)`. With one arg the
1041/// session-resolved `default_text_search_config` is used (defaults
1042/// to `simple` when unset); with two args the first picks the
1043/// config. NULL text → NULL.
1044fn fts_to_tsvector(args: &[Value], ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1045    let (config, text) = parse_fts_args("to_tsvector", args, ctx)?;
1046    match text {
1047        None => Ok(Value::Null),
1048        Some(t) => Ok(Value::TsVector(crate::fts::to_tsvector(config, &t))),
1049    }
1050}
1051
1052fn fts_plainto_tsquery(args: &[Value], ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1053    let (config, text) = parse_fts_args("plainto_tsquery", args, ctx)?;
1054    match text {
1055        None => Ok(Value::Null),
1056        Some(t) => Ok(Value::TsQuery(crate::fts::plainto_tsquery(config, &t))),
1057    }
1058}
1059
1060fn fts_phraseto_tsquery(args: &[Value], ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1061    let (config, text) = parse_fts_args("phraseto_tsquery", args, ctx)?;
1062    match text {
1063        None => Ok(Value::Null),
1064        Some(t) => Ok(Value::TsQuery(crate::fts::phraseto_tsquery(config, &t))),
1065    }
1066}
1067
1068fn fts_websearch_to_tsquery(args: &[Value], ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1069    let (config, text) = parse_fts_args("websearch_to_tsquery", args, ctx)?;
1070    match text {
1071        None => Ok(Value::Null),
1072        Some(t) => Ok(Value::TsQuery(crate::fts::websearch_to_tsquery(config, &t))),
1073    }
1074}
1075
1076fn fts_to_tsquery(args: &[Value], ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1077    let (config, text) = parse_fts_args("to_tsquery", args, ctx)?;
1078    match text {
1079        None => Ok(Value::Null),
1080        Some(t) => Ok(Value::TsQuery(crate::fts::to_tsquery(config, &t)?)),
1081    }
1082}
1083
1084/// Parse the `(config, text)` / `(text)` argument pair shared by
1085/// all FTS builders. Returns the resolved config + the text
1086/// payload (None when text is NULL). The one-arg form pulls the
1087/// config from the session's `default_text_search_config`.
1088fn parse_fts_args(
1089    name: &str,
1090    args: &[Value],
1091    ctx: &EvalContext<'_>,
1092) -> Result<(crate::fts::TsConfig, Option<String>), EvalError> {
1093    let (config_arg, text_arg) = match args {
1094        [t] => (None, t),
1095        [c, t] => (Some(c), t),
1096        _ => {
1097            return Err(EvalError::TypeMismatch {
1098                detail: format!("{name}() takes 1 or 2 args, got {}", args.len()),
1099            });
1100        }
1101    };
1102    let config = match config_arg {
1103        None => match ctx.default_text_search_config {
1104            Some(name_str) => crate::fts::TsConfig::from_name(name_str).ok_or_else(|| {
1105                EvalError::TypeMismatch {
1106                    detail: format!(
1107                        "text search config not implemented: {name_str:?} (supported: simple, english)"
1108                    ),
1109                }
1110            })?,
1111            None => crate::fts::TsConfig::Simple,
1112        },
1113        Some(Value::Null) => return Ok((crate::fts::TsConfig::Simple, None)),
1114        Some(Value::Text(name_str)) => crate::fts::TsConfig::from_name(name_str).ok_or_else(|| {
1115            EvalError::TypeMismatch {
1116                detail: format!(
1117                    "text search config not implemented: {name_str:?} (supported: simple, english)"
1118                ),
1119            }
1120        })?,
1121        Some(other) => {
1122            return Err(EvalError::TypeMismatch {
1123                detail: format!(
1124                    "{name}() config arg must be text, got {:?}",
1125                    other.data_type()
1126                ),
1127            });
1128        }
1129    };
1130    let text = match text_arg {
1131        Value::Null => None,
1132        Value::Text(s) => Some(s.clone()),
1133        other => {
1134            return Err(EvalError::TypeMismatch {
1135                detail: format!(
1136                    "{name}() text arg must be text, got {:?}",
1137                    other.data_type()
1138                ),
1139            });
1140        }
1141    };
1142    Ok((config, text))
1143}
1144
1145/// v6.4.3 — `encode(bytes_as_text, format)`. PG works on bytea
1146/// arguments; SPG's value space treats Text as the byte container
1147/// (raw UTF-8 bytes). Supported formats: base64 (PG default),
1148/// base64url (RFC 4648 §5), base32hex (RFC 4648 §7 extended-hex),
1149/// hex.
1150fn encode_text(args: &[Value]) -> Result<Value, EvalError> {
1151    if args.len() != 2 {
1152        return Err(EvalError::TypeMismatch {
1153            detail: format!("encode() takes 2 args, got {}", args.len()),
1154        });
1155    }
1156    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
1157        return Ok(Value::Null);
1158    }
1159    let bytes: &[u8] = match &args[0] {
1160        Value::Text(s) => s.as_bytes(),
1161        other => {
1162            return Err(EvalError::TypeMismatch {
1163                detail: format!("encode() expects text bytes, got {:?}", other.data_type()),
1164            });
1165        }
1166    };
1167    let fmt = match &args[1] {
1168        Value::Text(s) => s.to_ascii_lowercase(),
1169        other => {
1170            return Err(EvalError::TypeMismatch {
1171                detail: format!("encode() format must be text, got {:?}", other.data_type()),
1172            });
1173        }
1174    };
1175    let out = match fmt.as_str() {
1176        "base64" => b64_encode(bytes, B64_STD),
1177        "base64url" => b64_encode(bytes, B64_URL),
1178        "base32hex" => b32hex_encode(bytes),
1179        "hex" => hex_encode(bytes),
1180        other => {
1181            return Err(EvalError::TypeMismatch {
1182                detail: format!("encode(): unknown format `{other}`"),
1183            });
1184        }
1185    };
1186    Ok(Value::Text(out))
1187}
1188
1189/// v6.4.3 — `decode(text, format)`. Inverse of `encode`; returns
1190/// Text containing the raw decoded bytes (caller may CAST to bytea
1191/// equivalent if SPG adds bytea later).
1192fn decode_text(args: &[Value]) -> Result<Value, EvalError> {
1193    if args.len() != 2 {
1194        return Err(EvalError::TypeMismatch {
1195            detail: format!("decode() takes 2 args, got {}", args.len()),
1196        });
1197    }
1198    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
1199        return Ok(Value::Null);
1200    }
1201    let text = match &args[0] {
1202        Value::Text(s) => s.as_str(),
1203        other => {
1204            return Err(EvalError::TypeMismatch {
1205                detail: format!("decode() expects text, got {:?}", other.data_type()),
1206            });
1207        }
1208    };
1209    let fmt = match &args[1] {
1210        Value::Text(s) => s.to_ascii_lowercase(),
1211        other => {
1212            return Err(EvalError::TypeMismatch {
1213                detail: format!("decode() format must be text, got {:?}", other.data_type()),
1214            });
1215        }
1216    };
1217    let bytes = match fmt.as_str() {
1218        "base64" => b64_decode(text, B64_STD)?,
1219        "base64url" => b64_decode(text, B64_URL)?,
1220        "base32hex" => b32hex_decode(text)?,
1221        "hex" => hex_decode(text)?,
1222        other => {
1223            return Err(EvalError::TypeMismatch {
1224                detail: format!("decode(): unknown format `{other}`"),
1225            });
1226        }
1227    };
1228    let s = String::from_utf8(bytes).map_err(|_| EvalError::TypeMismatch {
1229        detail: "decode(): result bytes are not valid UTF-8 (SPG stores raw bytes as Text)".into(),
1230    })?;
1231    Ok(Value::Text(s))
1232}
1233
1234/// v6.4.3 — `error_on_null(v)`. Returns `v` unchanged if non-NULL;
1235/// errors otherwise. Convenience to assert NOT NULL inside an
1236/// expression without wrapping it in COALESCE + raise hacks.
1237fn error_on_null(args: &[Value]) -> Result<Value, EvalError> {
1238    if args.len() != 1 {
1239        return Err(EvalError::TypeMismatch {
1240            detail: format!("error_on_null() takes 1 arg, got {}", args.len()),
1241        });
1242    }
1243    if matches!(args[0], Value::Null) {
1244        return Err(EvalError::TypeMismatch {
1245            detail: "error_on_null(): argument is NULL".into(),
1246        });
1247    }
1248    Ok(args[0].clone())
1249}
1250
// ── byte-level encoders ───────────────────────────────────────────

/// Standard base64 alphabet: `A-Z`, `a-z`, `0-9`, then `+` and `/` for
/// the values 62 and 63. Used for the `base64` format of `encode` /
/// `decode`.
const B64_STD: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
/// URL-safe base64 alphabet: identical to `B64_STD` except that values
/// 62 and 63 map to `-` and `_`. Used for the `base64url` format.
const B64_URL: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
/// Base32 "extended hex" alphabet: `0-9` followed by `A-V`, upper-case.
/// Used for the `base32hex` format.
const B32HEX_ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHIJKLMNOPQRSTUV";
1256
/// Base64-encode `bytes` using the 64-symbol alphabet `alpha`.
///
/// Input is consumed in groups of up to three bytes; each group emits
/// exactly four output characters, with `=` filling the positions a
/// short final group cannot supply. Empty input yields an empty string.
fn b64_encode(bytes: &[u8], alpha: &[u8; 64]) -> String {
    // Pick the 6-bit field starting `shift` bits up and map it to a symbol.
    let sextet = |word: u32, shift: u32| alpha[((word >> shift) & 0x3f) as usize] as char;

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        // Left-align the group in a 24-bit word; missing bytes read as 0.
        let b0 = u32::from(group[0]);
        let b1 = group.get(1).map_or(0, |&b| u32::from(b));
        let b2 = group.get(2).map_or(0, |&b| u32::from(b));
        let word = (b0 << 16) | (b1 << 8) | b2;

        encoded.push(sextet(word, 18));
        encoded.push(sextet(word, 12));
        match group.len() {
            3 => {
                encoded.push(sextet(word, 6));
                encoded.push(sextet(word, 0));
            }
            2 => {
                encoded.push(sextet(word, 6));
                encoded.push('=');
            }
            _ => {
                encoded.push('=');
                encoded.push('=');
            }
        }
    }
    encoded
}
1284
1285fn b64_decode(text: &str, alpha: &[u8; 64]) -> Result<Vec<u8>, EvalError> {
1286    let mut lookup = [255u8; 256];
1287    for (i, &c) in alpha.iter().enumerate() {
1288        lookup[c as usize] = i as u8;
1289    }
1290    let mut out = Vec::with_capacity(text.len() * 3 / 4);
1291    let mut buf: u32 = 0;
1292    let mut bits: u32 = 0;
1293    for c in text.bytes() {
1294        if c == b'=' {
1295            break;
1296        }
1297        if c == b'\n' || c == b'\r' || c == b' ' {
1298            continue;
1299        }
1300        let v = lookup[c as usize];
1301        if v == 255 {
1302            return Err(EvalError::TypeMismatch {
1303                detail: format!("decode(base64): invalid char {:?}", c as char),
1304            });
1305        }
1306        buf = (buf << 6) | v as u32;
1307        bits += 6;
1308        if bits >= 8 {
1309            bits -= 8;
1310            out.push(((buf >> bits) & 0xff) as u8);
1311        }
1312    }
1313    Ok(out)
1314}
1315
1316fn b32hex_encode(bytes: &[u8]) -> String {
1317    let mut out = String::with_capacity((bytes.len() * 8 + 4) / 5);
1318    let mut buf: u64 = 0;
1319    let mut bits: u32 = 0;
1320    for &b in bytes {
1321        buf = (buf << 8) | b as u64;
1322        bits += 8;
1323        while bits >= 5 {
1324            bits -= 5;
1325            out.push(B32HEX_ALPHABET[((buf >> bits) & 0x1f) as usize] as char);
1326        }
1327    }
1328    if bits > 0 {
1329        out.push(B32HEX_ALPHABET[((buf << (5 - bits)) & 0x1f) as usize] as char);
1330    }
1331    // Pad to multiple of 8.
1332    while out.len() % 8 != 0 {
1333        out.push('=');
1334    }
1335    out
1336}
1337
1338fn b32hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
1339    let mut lookup = [255u8; 256];
1340    for (i, &c) in B32HEX_ALPHABET.iter().enumerate() {
1341        lookup[c as usize] = i as u8;
1342        // base32hex is case-insensitive — also map lowercase.
1343        let lower = (c as char).to_ascii_lowercase() as u8;
1344        lookup[lower as usize] = i as u8;
1345    }
1346    let mut out = Vec::with_capacity(text.len() * 5 / 8);
1347    let mut buf: u64 = 0;
1348    let mut bits: u32 = 0;
1349    for c in text.bytes() {
1350        if c == b'=' {
1351            break;
1352        }
1353        if c == b'\n' || c == b'\r' || c == b' ' {
1354            continue;
1355        }
1356        let v = lookup[c as usize];
1357        if v == 255 {
1358            return Err(EvalError::TypeMismatch {
1359                detail: format!("decode(base32hex): invalid char {:?}", c as char),
1360            });
1361        }
1362        buf = (buf << 5) | v as u64;
1363        bits += 5;
1364        if bits >= 8 {
1365            bits -= 8;
1366            out.push(((buf >> bits) & 0xff) as u8);
1367        }
1368    }
1369    Ok(out)
1370}
1371
/// Render `bytes` as lower-case hexadecimal, two digits per byte with
/// the high nibble first. Empty input yields an empty string.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut acc, &byte| {
            acc.push(char::from(DIGITS[usize::from(byte >> 4)]));
            acc.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
            acc
        })
}
1381
1382fn hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
1383    let trimmed = text.trim();
1384    if trimmed.len() % 2 != 0 {
1385        return Err(EvalError::TypeMismatch {
1386            detail: "decode(hex): input length must be even".into(),
1387        });
1388    }
1389    let mut out = Vec::with_capacity(trimmed.len() / 2);
1390    let mut hi: u8 = 0;
1391    for (i, c) in trimmed.bytes().enumerate() {
1392        let v = match c {
1393            b'0'..=b'9' => c - b'0',
1394            b'a'..=b'f' => c - b'a' + 10,
1395            b'A'..=b'F' => c - b'A' + 10,
1396            _ => {
1397                return Err(EvalError::TypeMismatch {
1398                    detail: format!("decode(hex): invalid char {:?}", c as char),
1399                });
1400            }
1401        };
1402        if i % 2 == 0 {
1403            hi = v;
1404        } else {
1405            out.push((hi << 4) | v);
1406        }
1407    }
1408    Ok(out)
1409}
1410
1411/// `date_part(field_text, source)` — function form of `EXTRACT(field FROM
1412/// source)`. Same component dispatch (DATE / TIMESTAMP / INTERVAL) and
1413/// same `BigInt` return shape; PG returns double precision but we keep the
1414/// integer convention so the runner's `query I` shape works unchanged.
1415fn date_part(args: &[Value]) -> Result<Value, EvalError> {
1416    use spg_sql::ast::ExtractField as F;
1417    if args.len() != 2 {
1418        return Err(EvalError::TypeMismatch {
1419            detail: format!("date_part() takes 2 args, got {}", args.len()),
1420        });
1421    }
1422    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
1423        return Ok(Value::Null);
1424    }
1425    let Value::Text(field_name) = &args[0] else {
1426        return Err(EvalError::TypeMismatch {
1427            detail: format!(
1428                "date_part() needs a text field, got {:?}",
1429                args[0].data_type()
1430            ),
1431        });
1432    };
1433    let field = match field_name.to_ascii_lowercase().as_str() {
1434        "year" => F::Year,
1435        "month" => F::Month,
1436        "day" => F::Day,
1437        "hour" => F::Hour,
1438        "minute" => F::Minute,
1439        "second" => F::Second,
1440        "microsecond" | "microseconds" => F::Microsecond,
1441        other => {
1442            return Err(EvalError::TypeMismatch {
1443                detail: format!(
1444                    "unknown date_part field {other:?}; \
1445                     supported: year, month, day, hour, minute, second, microsecond"
1446                ),
1447            });
1448        }
1449    };
1450    extract_field(field, &args[1])
1451}
1452
1453/// `age(t1, t2)` — return `t1 - t2` as an INTERVAL. v2.12 produces a
1454/// micros-only interval (no months normalisation) because PG's
1455/// month-justification rule is sensitive to the day-of-month walk and
1456/// adds material complexity for marginal corpus value.
1457///
1458/// `age(t)` (single-arg form) is intentionally unsupported in v2.12:
1459/// the dispatcher errors instead of guessing a clock source. Callers
1460/// who want PG's `age(t)` semantics should write `age(CURRENT_DATE, t)`
1461/// explicitly so the clock reference is visible at the SQL layer.
1462fn age(args: &[Value]) -> Result<Value, EvalError> {
1463    if args.is_empty() || args.len() > 2 {
1464        return Err(EvalError::TypeMismatch {
1465            detail: format!("age() takes 1 or 2 args, got {}", args.len()),
1466        });
1467    }
1468    if args.iter().any(|v| matches!(v, Value::Null)) {
1469        return Ok(Value::Null);
1470    }
1471    // Coerce to TIMESTAMP micros — DATE lifts to midnight; TIMESTAMP
1472    // stays as-is; anything else errors.
1473    let to_micros = |v: &Value| -> Result<i64, EvalError> {
1474        match v {
1475            Value::Timestamp(t) => Ok(*t),
1476            Value::Date(d) => Ok(i64::from(*d) * 86_400_000_000),
1477            other => Err(EvalError::TypeMismatch {
1478                detail: format!("age() needs DATE or TIMESTAMP, got {:?}", other.data_type()),
1479            }),
1480        }
1481    };
1482    if args.len() == 1 {
1483        return Err(EvalError::TypeMismatch {
1484            detail: "single-arg age() is unsupported in v2.12 \
1485                     (use age(CURRENT_DATE, t) explicitly)"
1486                .into(),
1487        });
1488    }
1489    let a = to_micros(&args[0])?;
1490    let b = to_micros(&args[1])?;
1491    let delta = a.checked_sub(b).ok_or(EvalError::TypeMismatch {
1492        detail: "age() subtraction overflows i64 microseconds".into(),
1493    })?;
1494    Ok(Value::Interval {
1495        months: 0,
1496        micros: delta,
1497    })
1498}
1499
1500/// `to_char(value, format)` — render a DATE / TIMESTAMP through a PG
1501/// format template. Supports the high-traffic placeholders:
1502///   YYYY YY MM Mon Month DD HH24 HH12 MI SS MS US AM PM
1503/// Unrecognised characters pass through literally so the template's
1504/// punctuation ('-', ':', ' ', '/') needs no escape mechanism.
1505fn to_char(args: &[Value]) -> Result<Value, EvalError> {
1506    use core::fmt::Write as _;
1507    if args.len() != 2 {
1508        return Err(EvalError::TypeMismatch {
1509            detail: format!("to_char() takes 2 args, got {}", args.len()),
1510        });
1511    }
1512    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
1513        return Ok(Value::Null);
1514    }
1515    let Value::Text(fmt) = &args[1] else {
1516        return Err(EvalError::TypeMismatch {
1517            detail: format!(
1518                "to_char() needs a text format, got {:?}",
1519                args[1].data_type()
1520            ),
1521        });
1522    };
1523    let (days, day_micros) = match &args[0] {
1524        Value::Date(d) => (*d, 0_i64),
1525        Value::Timestamp(t) => {
1526            let days = t.div_euclid(86_400_000_000);
1527            (
1528                i32::try_from(days).unwrap_or(i32::MAX),
1529                t.rem_euclid(86_400_000_000),
1530            )
1531        }
1532        other => {
1533            return Err(EvalError::TypeMismatch {
1534                detail: format!(
1535                    "to_char() needs DATE or TIMESTAMP, got {:?}",
1536                    other.data_type()
1537                ),
1538            });
1539        }
1540    };
1541    let (y, mo, d) = civil_from_days(days);
1542    let secs = day_micros / 1_000_000;
1543    let frac = day_micros % 1_000_000;
1544    // div_euclid keeps every value non-negative — the casts below are
1545    // sign-safe by construction. `secs ∈ [0, 86400)`, `frac ∈ [0,
1546    // 1_000_000)`, so all three quantities fit in u32.
1547    let hh24 = u32::try_from(secs / 3600).unwrap_or(0);
1548    let mi = u32::try_from((secs / 60) % 60).unwrap_or(0);
1549    let ss = u32::try_from(secs % 60).unwrap_or(0);
1550    let hh12 = match hh24 % 12 {
1551        0 => 12,
1552        x => x,
1553    };
1554    let ampm = if hh24 < 12 { "AM" } else { "PM" };
1555    let ms = u32::try_from(frac / 1_000).unwrap_or(0); // millisecond
1556    let us = u32::try_from(frac).unwrap_or(0); // microsecond (0..1_000_000)
1557
1558    let mut out = String::with_capacity(fmt.len() + 8);
1559    let bytes = fmt.as_bytes();
1560    let mut i = 0;
1561    // write! against a String never fails — discard the Result.
1562    while i < bytes.len() {
1563        // Try the longest prefixes first so "YYYY" wins over "YY".
1564        let rest = &bytes[i..];
1565        if rest.starts_with(b"YYYY") {
1566            let _ = write!(out, "{y:04}");
1567            i += 4;
1568        } else if rest.starts_with(b"YY") {
1569            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
1570            let yy = (y.rem_euclid(100)) as u32;
1571            let _ = write!(out, "{yy:02}");
1572            i += 2;
1573        } else if rest.starts_with(b"Month") {
1574            out.push_str(MONTH_FULL[(mo - 1) as usize]);
1575            i += 5;
1576        } else if rest.starts_with(b"Mon") {
1577            out.push_str(MONTH_ABBR[(mo - 1) as usize]);
1578            i += 3;
1579        } else if rest.starts_with(b"MM") {
1580            let _ = write!(out, "{mo:02}");
1581            i += 2;
1582        } else if rest.starts_with(b"DD") {
1583            let _ = write!(out, "{d:02}");
1584            i += 2;
1585        } else if rest.starts_with(b"HH24") {
1586            let _ = write!(out, "{hh24:02}");
1587            i += 4;
1588        } else if rest.starts_with(b"HH12") {
1589            let _ = write!(out, "{hh12:02}");
1590            i += 4;
1591        } else if rest.starts_with(b"MI") {
1592            let _ = write!(out, "{mi:02}");
1593            i += 2;
1594        } else if rest.starts_with(b"SS") {
1595            let _ = write!(out, "{ss:02}");
1596            i += 2;
1597        } else if rest.starts_with(b"MS") {
1598            let _ = write!(out, "{ms:03}");
1599            i += 2;
1600        } else if rest.starts_with(b"US") {
1601            let _ = write!(out, "{us:06}");
1602            i += 2;
1603        } else if rest.starts_with(b"AM") || rest.starts_with(b"PM") {
1604            out.push_str(ampm);
1605            i += 2;
1606        } else {
1607            // Pass any non-placeholder byte through verbatim.
1608            out.push(bytes[i] as char);
1609            i += 1;
1610        }
1611    }
1612    Ok(Value::Text(out))
1613}
1614
1615const MONTH_FULL: [&str; 12] = [
1616    "January",
1617    "February",
1618    "March",
1619    "April",
1620    "May",
1621    "June",
1622    "July",
1623    "August",
1624    "September",
1625    "October",
1626    "November",
1627    "December",
1628];
1629const MONTH_ABBR: [&str; 12] = [
1630    "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
1631];
1632
1633/// `date_trunc(unit, timestamp)` — round a `TIMESTAMP` down to the
1634/// requested calendar boundary (year / month / day / hour / minute /
1635/// second). Returns the truncated `TIMESTAMP`. NULL on either side
1636/// propagates to NULL.
1637fn date_trunc(args: &[Value]) -> Result<Value, EvalError> {
1638    if args.len() != 2 {
1639        return Err(EvalError::TypeMismatch {
1640            detail: format!("date_trunc() takes 2 args, got {}", args.len()),
1641        });
1642    }
1643    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
1644        return Ok(Value::Null);
1645    }
1646    let Value::Text(unit) = &args[0] else {
1647        return Err(EvalError::TypeMismatch {
1648            detail: format!(
1649                "date_trunc() needs a text unit, got {:?}",
1650                args[0].data_type()
1651            ),
1652        });
1653    };
1654    // Both DATE and TIMESTAMP sources are accepted. DATE lifts to
1655    // midnight first; the result is always TIMESTAMP.
1656    let micros = match &args[1] {
1657        Value::Timestamp(t) => *t,
1658        Value::Date(d) => i64::from(*d) * 86_400_000_000,
1659        other => {
1660            return Err(EvalError::TypeMismatch {
1661                detail: format!(
1662                    "date_trunc() needs DATE or TIMESTAMP, got {:?}",
1663                    other.data_type()
1664                ),
1665            });
1666        }
1667    };
1668    let unit_lc = unit.to_ascii_lowercase();
1669    let days = micros.div_euclid(86_400_000_000);
1670    let day_micros = micros.rem_euclid(86_400_000_000);
1671    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
1672    let (y, m, _) = civil_from_days(day_i32);
1673    let truncated = match unit_lc.as_str() {
1674        "year" => i64::from(days_from_civil(y, 1, 1)) * 86_400_000_000,
1675        "month" => i64::from(days_from_civil(y, m, 1)) * 86_400_000_000,
1676        "day" => days * 86_400_000_000,
1677        "hour" => days * 86_400_000_000 + (day_micros / 3_600_000_000) * 3_600_000_000,
1678        "minute" => days * 86_400_000_000 + (day_micros / 60_000_000) * 60_000_000,
1679        "second" => days * 86_400_000_000 + (day_micros / 1_000_000) * 1_000_000,
1680        other => {
1681            return Err(EvalError::TypeMismatch {
1682                detail: format!(
1683                    "unknown date_trunc unit {other:?}; \
1684                     supported: year, month, day, hour, minute, second"
1685                ),
1686            });
1687        }
1688    };
1689    Ok(Value::Timestamp(truncated))
1690}
1691
1692/// PG-style `expr::TYPE` coercion. NULL always casts as NULL.
1693pub fn cast_value(v: Value, target: CastTarget) -> Result<Value, EvalError> {
1694    if matches!(v, Value::Null) {
1695        return Ok(Value::Null);
1696    }
1697    match target {
1698        CastTarget::Vector => cast_to_vector(v),
1699        CastTarget::Text => Ok(Value::Text(value_to_text(&v))),
1700        CastTarget::Int => cast_numeric_to_int(v),
1701        CastTarget::BigInt => cast_numeric_to_bigint(v),
1702        CastTarget::Float => cast_numeric_to_float(v),
1703        CastTarget::Bool => cast_to_bool(v),
1704        CastTarget::Date => cast_to_date(v),
1705        // TIMESTAMP and TIMESTAMPTZ have identical runtime
1706        // representation (i64 microseconds UTC).
1707        CastTarget::Timestamp | CastTarget::Timestamptz => cast_to_timestamp(v),
1708        // v7.9.25 — `expr::INTERVAL`. Currently only TEXT → Interval
1709        // is supported (the mailrs idiom: `$1::INTERVAL` where the
1710        // bound param is a string like `'7 days'`).
1711        CastTarget::Interval => cast_to_interval(v),
1712        // v7.9.25 — `::json` / `::jsonb`. Routes Text → Json
1713        // (validation is the producer's responsibility, same as
1714        // the column-INSERT path).
1715        CastTarget::Json | CastTarget::Jsonb => match v {
1716            Value::Json(s) => Ok(Value::Json(s)),
1717            Value::Text(s) => Ok(Value::Json(s)),
1718            other => Err(EvalError::TypeMismatch {
1719                detail: alloc::format!(
1720                    "::json / ::jsonb only accepts TEXT-shape inputs, got {:?}",
1721                    other.data_type()
1722                ),
1723            }),
1724        },
1725        // v7.9.26 — `::regtype` / `::regclass`. SPG has no
1726        // pg_catalog; surface a clear error.
1727        CastTarget::RegType | CastTarget::RegClass => Err(EvalError::TypeMismatch {
1728            detail: "::regtype / ::regclass not supported on SPG \
1729                 (no pg_catalog); use SHOW TABLES / spg_table_ddl instead"
1730                .into(),
1731        }),
1732        // v7.10.11 — `::TEXT[]`. Decode PG external array form
1733        // when input is Text; pass through unchanged when it is
1734        // already TextArray. Anything else is a type mismatch.
1735        CastTarget::TextArray => match v {
1736            Value::TextArray(items) => Ok(Value::TextArray(items)),
1737            Value::Text(s) => decode_text_array_external(&s).map(Value::TextArray),
1738            other => Err(EvalError::TypeMismatch {
1739                detail: alloc::format!(
1740                    "::TEXT[] only accepts TEXT / TEXT[] inputs, got {:?}",
1741                    other.data_type()
1742                ),
1743            }),
1744        },
1745        // v7.11.13 — `::INT[]` / `::BIGINT[]`. Decode PG external
1746        // form `{1,2,3}` when input is Text; widen TextArray /
1747        // IntArray as appropriate.
1748        CastTarget::IntArray => cast_to_int_array(v),
1749        CastTarget::BigIntArray => cast_to_bigint_array(v),
1750        // v7.12.0 — `::tsvector` / `::tsquery`. Decodes PG external
1751        // form when input is Text; passes through unchanged when the
1752        // input is already the target type. Other inputs are a type
1753        // mismatch. Lexer / Porter stemmer arrive in v7.12.1; the
1754        // external-form cast at v7.12.0 is the path pg_dump and
1755        // direct-literal callers use.
1756        CastTarget::TsVector => match v {
1757            Value::TsVector(items) => Ok(Value::TsVector(items)),
1758            Value::Text(s) => decode_tsvector_external(&s).map(Value::TsVector),
1759            other => Err(EvalError::TypeMismatch {
1760                detail: alloc::format!(
1761                    "::tsvector only accepts TEXT / tsvector inputs, got {:?}",
1762                    other.data_type()
1763                ),
1764            }),
1765        },
1766        CastTarget::TsQuery => match v {
1767            Value::TsQuery(ast) => Ok(Value::TsQuery(ast)),
1768            Value::Text(s) => decode_tsquery_external(&s).map(Value::TsQuery),
1769            other => Err(EvalError::TypeMismatch {
1770                detail: alloc::format!(
1771                    "::tsquery only accepts TEXT / tsquery inputs, got {:?}",
1772                    other.data_type()
1773                ),
1774            }),
1775        },
1776    }
1777}
1778
1779fn cast_to_int_array(v: Value) -> Result<Value, EvalError> {
1780    match v {
1781        Value::IntArray(items) => Ok(Value::IntArray(items)),
1782        Value::BigIntArray(items) => {
1783            let mut out: Vec<Option<i32>> = Vec::with_capacity(items.len());
1784            for item in items {
1785                match item {
1786                    None => out.push(None),
1787                    Some(n) => match i32::try_from(n) {
1788                        Ok(x) => out.push(Some(x)),
1789                        Err(_) => {
1790                            return Err(EvalError::TypeMismatch {
1791                                detail: alloc::format!("::INT[] element {n} overflows i32"),
1792                            });
1793                        }
1794                    },
1795                }
1796            }
1797            Ok(Value::IntArray(out))
1798        }
1799        Value::Text(s) => decode_int_array_external(&s).map(Value::IntArray),
1800        Value::TextArray(items) => {
1801            let mut out: Vec<Option<i32>> = Vec::with_capacity(items.len());
1802            for item in items {
1803                match item {
1804                    None => out.push(None),
1805                    Some(s) => match s.parse::<i32>() {
1806                        Ok(n) => out.push(Some(n)),
1807                        Err(_) => {
1808                            return Err(EvalError::TypeMismatch {
1809                                detail: alloc::format!("::INT[] cannot parse {s:?}"),
1810                            });
1811                        }
1812                    },
1813                }
1814            }
1815            Ok(Value::IntArray(out))
1816        }
1817        other => Err(EvalError::TypeMismatch {
1818            detail: alloc::format!("::INT[] does not accept {:?}", other.data_type()),
1819        }),
1820    }
1821}
1822
1823fn cast_to_bigint_array(v: Value) -> Result<Value, EvalError> {
1824    match v {
1825        Value::BigIntArray(items) => Ok(Value::BigIntArray(items)),
1826        Value::IntArray(items) => Ok(Value::BigIntArray(
1827            items.into_iter().map(|x| x.map(i64::from)).collect(),
1828        )),
1829        Value::Text(s) => decode_bigint_array_external(&s).map(Value::BigIntArray),
1830        Value::TextArray(items) => {
1831            let mut out: Vec<Option<i64>> = Vec::with_capacity(items.len());
1832            for item in items {
1833                match item {
1834                    None => out.push(None),
1835                    Some(s) => match s.parse::<i64>() {
1836                        Ok(n) => out.push(Some(n)),
1837                        Err(_) => {
1838                            return Err(EvalError::TypeMismatch {
1839                                detail: alloc::format!("::BIGINT[] cannot parse {s:?}"),
1840                            });
1841                        }
1842                    },
1843                }
1844            }
1845            Ok(Value::BigIntArray(out))
1846        }
1847        other => Err(EvalError::TypeMismatch {
1848            detail: alloc::format!("::BIGINT[] does not accept {:?}", other.data_type()),
1849        }),
1850    }
1851}
1852
1853fn decode_int_array_external(s: &str) -> Result<Vec<Option<i32>>, EvalError> {
1854    let trimmed = s.trim();
1855    let inner = trimmed
1856        .strip_prefix('{')
1857        .and_then(|x| x.strip_suffix('}'))
1858        .ok_or_else(|| EvalError::TypeMismatch {
1859            detail: alloc::format!("INT[] literal {s:?} must be enclosed in '{{...}}'"),
1860        })?;
1861    if inner.trim().is_empty() {
1862        return Ok(Vec::new());
1863    }
1864    inner
1865        .split(',')
1866        .map(|part| {
1867            let p = part.trim();
1868            if p.eq_ignore_ascii_case("NULL") {
1869                Ok(None)
1870            } else {
1871                p.parse::<i32>()
1872                    .map(Some)
1873                    .map_err(|_| EvalError::TypeMismatch {
1874                        detail: alloc::format!("INT[] element {p:?} is not an i32"),
1875                    })
1876            }
1877        })
1878        .collect()
1879}
1880
1881fn decode_bigint_array_external(s: &str) -> Result<Vec<Option<i64>>, EvalError> {
1882    let trimmed = s.trim();
1883    let inner = trimmed
1884        .strip_prefix('{')
1885        .and_then(|x| x.strip_suffix('}'))
1886        .ok_or_else(|| EvalError::TypeMismatch {
1887            detail: alloc::format!("BIGINT[] literal {s:?} must be enclosed in '{{...}}'"),
1888        })?;
1889    if inner.trim().is_empty() {
1890        return Ok(Vec::new());
1891    }
1892    inner
1893        .split(',')
1894        .map(|part| {
1895            let p = part.trim();
1896            if p.eq_ignore_ascii_case("NULL") {
1897                Ok(None)
1898            } else {
1899                p.parse::<i64>()
1900                    .map(Some)
1901                    .map_err(|_| EvalError::TypeMismatch {
1902                        detail: alloc::format!("BIGINT[] element {p:?} is not an i64"),
1903                    })
1904            }
1905        })
1906        .collect()
1907}
1908
1909/// v7.10.11 — same decoder as `decode_text_array_literal` in
1910/// `lib.rs`, but lives here so the eval-time cast path stays
1911/// inside `spg-engine::eval`. Kept in lock-step with the engine
1912/// `coerce_value` decoder by tests.
1913fn decode_text_array_external(s: &str) -> Result<Vec<Option<String>>, EvalError> {
1914    let trimmed = s.trim();
1915    let inner = trimmed
1916        .strip_prefix('{')
1917        .and_then(|x| x.strip_suffix('}'))
1918        .ok_or_else(|| EvalError::TypeMismatch {
1919            detail: alloc::format!("TEXT[] literal {s:?} must be enclosed in '{{...}}'"),
1920        })?;
1921    let mut out: Vec<Option<String>> = Vec::new();
1922    if inner.trim().is_empty() {
1923        return Ok(out);
1924    }
1925    let bytes = inner.as_bytes();
1926    let mut i = 0;
1927    while i <= bytes.len() {
1928        while i < bytes.len() && (bytes[i] == b' ' || bytes[i] == b'\t') {
1929            i += 1;
1930        }
1931        if i < bytes.len() && bytes[i] == b'"' {
1932            i += 1;
1933            let mut buf = String::new();
1934            while i < bytes.len() && bytes[i] != b'"' {
1935                if bytes[i] == b'\\' && i + 1 < bytes.len() {
1936                    buf.push(bytes[i + 1] as char);
1937                    i += 2;
1938                } else {
1939                    buf.push(bytes[i] as char);
1940                    i += 1;
1941                }
1942            }
1943            if i >= bytes.len() {
1944                return Err(EvalError::TypeMismatch {
1945                    detail: "unterminated quoted element in TEXT[] literal".into(),
1946                });
1947            }
1948            i += 1;
1949            out.push(Some(buf));
1950        } else {
1951            let start = i;
1952            while i < bytes.len() && bytes[i] != b',' {
1953                i += 1;
1954            }
1955            let raw = inner[start..i].trim();
1956            if raw.eq_ignore_ascii_case("NULL") {
1957                out.push(None);
1958            } else {
1959                out.push(Some(raw.to_string()));
1960            }
1961        }
1962        while i < bytes.len() && (bytes[i] == b' ' || bytes[i] == b'\t') {
1963            i += 1;
1964        }
1965        if i >= bytes.len() {
1966            break;
1967        }
1968        if bytes[i] != b',' {
1969            return Err(EvalError::TypeMismatch {
1970                detail: "expected ',' between TEXT[] elements".into(),
1971            });
1972        }
1973        i += 1;
1974    }
1975    Ok(out)
1976}
1977
1978fn cast_to_interval(v: Value) -> Result<Value, EvalError> {
1979    match v {
1980        Value::Interval { months, micros } => Ok(Value::Interval { months, micros }),
1981        Value::Text(s) => {
1982            let (months, micros) = spg_sql::parser::parse_interval_text(&s).ok_or_else(|| {
1983                EvalError::TypeMismatch {
1984                    detail: alloc::format!("cannot parse {s:?} as INTERVAL"),
1985                }
1986            })?;
1987            Ok(Value::Interval { months, micros })
1988        }
1989        other => Err(EvalError::TypeMismatch {
1990            detail: alloc::format!(
1991                "::INTERVAL only accepts TEXT-shape inputs, got {:?}",
1992                other.data_type()
1993            ),
1994        }),
1995    }
1996}
1997
1998fn cast_to_date(v: Value) -> Result<Value, EvalError> {
1999    match v {
2000        Value::Date(d) => Ok(Value::Date(d)),
2001        // Integer literals carry days since the Unix epoch — used by
2002        // the `CURRENT_DATE` AST rewrite to inject the wall clock.
2003        Value::Int(n) => Ok(Value::Date(n)),
2004        Value::BigInt(n) => {
2005            i32::try_from(n)
2006                .map(Value::Date)
2007                .map_err(|_| EvalError::TypeMismatch {
2008                    detail: "bigint days-since-epoch out of DATE range".into(),
2009                })
2010        }
2011        // Timestamp truncates to its day boundary.
2012        Value::Timestamp(t) => {
2013            let days = t.div_euclid(86_400_000_000);
2014            i32::try_from(days)
2015                .map(Value::Date)
2016                .map_err(|_| EvalError::TypeMismatch {
2017                    detail: "timestamp out of DATE range".into(),
2018                })
2019        }
2020        Value::Text(s) => parse_date_literal(&s)
2021            .map(Value::Date)
2022            .ok_or(EvalError::TypeMismatch {
2023                detail: format!("cannot parse {s:?} as DATE (expected YYYY-MM-DD)"),
2024            }),
2025        other => Err(EvalError::TypeMismatch {
2026            detail: format!("cannot cast {:?} to DATE", other.data_type()),
2027        }),
2028    }
2029}
2030
2031fn cast_to_timestamp(v: Value) -> Result<Value, EvalError> {
2032    match v {
2033        Value::Timestamp(t) => Ok(Value::Timestamp(t)),
2034        // Int / BigInt carry microseconds since the Unix epoch — used
2035        // by the `NOW()` / `CURRENT_TIMESTAMP` AST rewrite to inject
2036        // the wall clock as a plain integer literal.
2037        Value::Int(n) => Ok(Value::Timestamp(i64::from(n))),
2038        Value::BigInt(n) => Ok(Value::Timestamp(n)),
2039        // DATE → TIMESTAMP picks midnight on the date.
2040        Value::Date(d) => Ok(Value::Timestamp(i64::from(d) * 86_400_000_000)),
2041        Value::Text(s) => {
2042            parse_timestamp_literal(&s)
2043                .map(Value::Timestamp)
2044                .ok_or(EvalError::TypeMismatch {
2045                    detail: format!(
2046                        "cannot parse {s:?} as TIMESTAMP \
2047                     (expected YYYY-MM-DD[ HH:MM:SS[.ffffff]])"
2048                    ),
2049                })
2050        }
2051        other => Err(EvalError::TypeMismatch {
2052            detail: format!("cannot cast {:?} to TIMESTAMP", other.data_type()),
2053        }),
2054    }
2055}
2056
2057fn value_to_text(v: &Value) -> String {
2058    match v {
2059        // v7.5.0 — Value is #[non_exhaustive]; any future variant
2060        // without explicit text rendering hits the Debug fallback
2061        // at the end.
2062        Value::SmallInt(n) => format!("{n}"),
2063        Value::Int(n) => format!("{n}"),
2064        Value::BigInt(n) => format!("{n}"),
2065        Value::Float(x) => format!("{x}"),
2066        // v4.9: JSON renders identically to Text — both are raw UTF-8.
2067        Value::Text(s) | Value::Json(s) => s.clone(),
2068        Value::Bool(b) => (if *b { "true" } else { "false" }).into(),
2069        Value::Vector(v) => {
2070            let cells: Vec<String> = v.iter().map(|x| format!("{x}")).collect();
2071            format!("[{}]", cells.join(", "))
2072        }
2073        // v6.0.1: render SQ8 cells dequantised, so SELECT output
2074        // matches the pgvector wire shape clients expect. The
2075        // recall envelope already absorbs the ≤ (max-min)/255/2
2076        // dequantisation error.
2077        Value::Sq8Vector(q) => {
2078            let cells: Vec<String> = spg_storage::quantize::dequantize(q)
2079                .iter()
2080                .map(|x| format!("{x}"))
2081                .collect();
2082            format!("[{}]", cells.join(", "))
2083        }
2084        // v6.0.3: HalfVector cells dequantise bit-exactly to f32
2085        // for SELECT output.
2086        Value::HalfVector(h) => {
2087            let cells: Vec<String> = h.to_f32_vec().iter().map(|x| format!("{x}")).collect();
2088            format!("[{}]", cells.join(", "))
2089        }
2090        Value::Numeric { scaled, scale } => format_numeric(*scaled, *scale),
2091        Value::Date(d) => format_date(*d),
2092        Value::Timestamp(t) => format_timestamp(*t),
2093        Value::Interval { months, micros } => format_interval(*months, *micros),
2094        Value::Null => "NULL".into(),
2095        // v7.10.4 — BYTEA renders as PG hex form.
2096        Value::Bytes(b) => format_bytea_hex(b),
2097        // v7.10.9 — TEXT[] / INT[] / BIGINT[] render PG external form.
2098        Value::TextArray(items) => format_text_array(items),
2099        Value::IntArray(items) => format_int_array(items),
2100        Value::BigIntArray(items) => format_bigint_array(items),
2101        // v7.12.0 — tsvector / tsquery render PG external form.
2102        Value::TsVector(lexs) => format_tsvector(lexs),
2103        Value::TsQuery(ast) => format_tsquery(ast),
2104        // v7.5.0 — #[non_exhaustive] fallback for future Value variants.
2105        _ => format!("{v:?}"),
2106    }
2107}
2108
2109/// Render a `Date` (days since epoch) as `YYYY-MM-DD`. Negative values
2110/// for pre-1970 dates render with a leading `-` on the year.
2111pub fn format_date(days: i32) -> String {
2112    let (y, m, d) = civil_from_days(days);
2113    format!("{y:04}-{m:02}-{d:02}")
2114}
2115
2116/// Render a `Timestamp` (microseconds since epoch) as
2117/// `YYYY-MM-DD HH:MM:SS[.fff...]`. Trailing-zero fractional digits are
2118/// dropped; a whole-second value has no fractional part.
2119pub fn format_timestamp(micros: i64) -> String {
2120    const MICROS_PER_DAY: i64 = 86_400_000_000;
2121    // Split into day + intra-day part with proper floor division so
2122    // negative timestamps render right too.
2123    let days = micros.div_euclid(MICROS_PER_DAY);
2124    let day_micros = micros.rem_euclid(MICROS_PER_DAY);
2125    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
2126    let (y, m, d) = civil_from_days(day_i32);
2127    let secs = day_micros / 1_000_000;
2128    let frac = day_micros % 1_000_000;
2129    let hh = secs / 3600;
2130    let mm = (secs / 60) % 60;
2131    let ss = secs % 60;
2132    if frac == 0 {
2133        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}")
2134    } else {
2135        // Strip trailing zeros from the 6-digit fractional component.
2136        let raw = format!("{frac:06}");
2137        let trimmed = raw.trim_end_matches('0');
2138        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}.{trimmed}")
2139    }
2140}
2141
/// Howard Hinnant's `civil_from_days` — maps a day count relative to
/// 1970-01-01 onto a proleptic-Gregorian `(year, month, day)` triple.
/// The calendar maths is done here by hand so the engine never reaches
/// for `std` time facilities.
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
fn civil_from_days(days: i32) -> (i32, u32, u32) {
    // One 400-year Gregorian cycle ("era") is exactly this many days.
    const DAYS_PER_ERA: i64 = 146_097;
    // Days from 0000-03-01 (the algorithm's internal origin) to 1970-01-01.
    const EPOCH_SHIFT: i64 = 719_468;

    let shifted = i64::from(days) + EPOCH_SHIFT;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    // day_of_era ∈ [0, 146_097), so this and every derived quantity
    // fits comfortably in u32.
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA) as u32;
    let year_of_era = (day_of_era.saturating_sub(day_of_era / 1460) + day_of_era / 36524
        - day_of_era / 146_096)
        / 365;
    let day_of_year =
        day_of_era.saturating_sub(365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index counted from March (0 = March … 11 = February).
    let march_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year.saturating_sub((153 * march_month + 2) / 5) + 1;

    let march_year = i64::from(year_of_era) + era * 400;
    // January and February belong to the following civil year.
    let (month, year) = if march_month < 10 {
        (march_month + 3, march_year)
    } else {
        (march_month - 9, march_year + 1)
    };
    (year as i32, month, day)
}
2163
/// Inverse of `civil_from_days` — converts (year, month, day) to days
/// since 1970-01-01 in the proleptic Gregorian calendar.
///
/// Inputs are not validated: day 0 is treated as day 1, and
/// out-of-range months / days simply extend past the nominal calendar
/// position. All arithmetic is done in i64, so no input can overflow;
/// a result outside the i32 range saturates to `i32::MIN` /
/// `i32::MAX` according to its sign.
pub fn days_from_civil(y: i32, m: u32, d: u32) -> i32 {
    // The algorithm's year starts on 1 March, so January and February
    // count as months 10 and 11 of the previous year.
    let year = if m <= 2 {
        i64::from(y) - 1
    } else {
        i64::from(y)
    };
    let era = year.div_euclid(400);
    let yoe = year.rem_euclid(400); // year of era, [0, 399]
    let march_month = if m > 2 {
        i64::from(m) - 3
    } else {
        i64::from(m) + 9
    };
    let doy = (153 * march_month + 2) / 5 + i64::from(d.saturating_sub(1));
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
    let total = era * 146_097 + doe - 719_468;
    // Saturate towards the side the true value lies on.
    i32::try_from(total).unwrap_or(if total < 0 { i32::MIN } else { i32::MAX })
}
2180
2181/// Parse `YYYY-MM-DD` into a `Date` (days since Unix epoch). Returns
2182/// `None` on shape / numeric failure; the engine surfaces that as a
2183/// `TypeMismatch` with the original text included.
2184pub fn parse_date_literal(s: &str) -> Option<i32> {
2185    let bytes = s.as_bytes();
2186    if bytes.len() != 10 || bytes[4] != b'-' || bytes[7] != b'-' {
2187        return None;
2188    }
2189    let y: i32 = s[0..4].parse().ok()?;
2190    let m: u32 = s[5..7].parse().ok()?;
2191    let d: u32 = s[8..10].parse().ok()?;
2192    if !(1..=12).contains(&m) || !(1..=31).contains(&d) {
2193        return None;
2194    }
2195    Some(days_from_civil(y, m, d))
2196}
2197
2198/// Parse `YYYY-MM-DD[ HH:MM:SS[.ffffff]]` into a `Timestamp`
2199/// (microseconds since Unix epoch). The time portion is optional;
2200/// missing → midnight. The fractional portion accepts 1–6 digits and
2201/// pads with zeros to microseconds.
2202pub fn parse_timestamp_literal(s: &str) -> Option<i64> {
2203    let trimmed = s.trim();
2204    let (date_part, time_part) = match trimmed.find([' ', 'T']) {
2205        Some(i) => (&trimmed[..i], Some(&trimmed[i + 1..])),
2206        None => (trimmed, None),
2207    };
2208    let days = parse_date_literal(date_part)?;
2209    let day_micros = match time_part {
2210        None => 0,
2211        Some(t) => parse_time_of_day_micros(t)?,
2212    };
2213    Some(i64::from(days) * 86_400_000_000 + day_micros)
2214}
2215
/// Parse `HH:MM:SS[.fraction]` into microseconds since midnight.
///
/// Hours, minutes and seconds must each be exactly two ASCII digits,
/// within `00..=23`, `00..=59` and `00..=59` respectively. The optional
/// fraction must be 1–9 ASCII digits; it is right-padded with zeros to
/// microsecond precision and digits beyond the sixth are dropped
/// (truncation, not rounding). Any other shape — including signs or
/// non-ASCII characters — returns `None`.
fn parse_time_of_day_micros(t: &str) -> Option<i64> {
    // Exactly two ASCII digits → their numeric value.
    fn two_digits(pair: &[u8]) -> Option<i64> {
        match pair {
            [hi, lo] if hi.is_ascii_digit() && lo.is_ascii_digit() => {
                Some(i64::from(hi - b'0') * 10 + i64::from(lo - b'0'))
            }
            _ => None,
        }
    }

    let (time, frac_str) = match t.split_once('.') {
        Some((clock, frac)) => (clock, Some(frac)),
        None => (t, None),
    };
    let bytes = time.as_bytes();
    if bytes.len() != 8 || bytes[2] != b':' || bytes[5] != b':' {
        return None;
    }
    let hh = two_digits(&bytes[0..2])?;
    let mm = two_digits(&bytes[3..5])?;
    let ss = two_digits(&bytes[6..8])?;
    if hh >= 24 || mm >= 60 || ss >= 60 {
        return None;
    }
    let frac_micros: i64 = match frac_str {
        None => 0,
        Some(f) => {
            // Validating every byte up front rules out signs and
            // multi-byte characters before any digit is consumed.
            if f.is_empty() || f.len() > 9 || !f.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            // Consume up to six digits, padding with zeros on the right.
            let mut digits = f.bytes();
            let mut micros = 0_i64;
            for _ in 0..6 {
                let digit = digits.next().map_or(0, |b| i64::from(b - b'0'));
                micros = micros * 10 + digit;
            }
            micros
        }
    };
    Some((hh * 3600 + mm * 60 + ss) * 1_000_000 + frac_micros)
}
2248
/// Render an `Interval { months, micros }` in a PG-ish shape that
/// mirrors `psql`'s text format. The months part contributes
/// `N year(s)` and `N mon(s)`; the microsecond part contributes
/// `N day(s)` and a signed `HH:MM:SS[.frac]` clock. Zero-valued parts
/// are left out, parts are joined with single spaces, and an all-zero
/// interval renders as `0`.
pub fn format_interval(months: i32, micros: i64) -> String {
    const MICROS_PER_DAY: i64 = 86_400_000_000;
    const MICROS_PER_SEC: i64 = 1_000_000;

    // PG uses the singular unit only for exactly `+1`; `-1` and every
    // other count take the plural.
    fn unit_name(count: i64, singular: &'static str, plural: &'static str) -> &'static str {
        if count == 1 {
            singular
        } else {
            plural
        }
    }

    let counted_parts = [
        (i64::from(months / 12), "year", "years"),
        (i64::from(months % 12), "mon", "mons"),
        (micros / MICROS_PER_DAY, "day", "days"),
    ];
    let mut pieces: Vec<String> = Vec::new();
    for (count, singular, plural) in counted_parts {
        if count != 0 {
            pieces.push(format!("{count} {}", unit_name(count, singular, plural)));
        }
    }

    // Sub-day remainder; truncating `%` keeps the sign of `micros`.
    let clock = micros % MICROS_PER_DAY;
    if clock != 0 {
        let sign = if clock < 0 { "-" } else { "" };
        // |clock| < MICROS_PER_DAY, so taking the magnitude cannot overflow.
        let magnitude = clock.abs();
        let (secs, frac) = (magnitude / MICROS_PER_SEC, magnitude % MICROS_PER_SEC);
        let (hh, mm, ss) = (secs / 3600, (secs / 60) % 60, secs % 60);
        let mut text = format!("{sign}{hh:02}:{mm:02}:{ss:02}");
        if frac != 0 {
            // Six zero-padded digits, then drop the trailing zeros.
            let digits = format!("{frac:06}");
            text.push('.');
            text.push_str(digits.trim_end_matches('0'));
        }
        pieces.push(text);
    }

    if pieces.is_empty() {
        String::from("0")
    } else {
        pieces.join(" ")
    }
}
2302
2303/// Add `months` (signed) to a `(year, month, day)` triple using PG's
2304/// clamp-to-last-day rule (so `'2024-01-31' + 1 month` → `'2024-02-29'`).
2305fn add_months_to_civil(y: i32, m: u32, d: u32, months: i32) -> (i32, u32, u32) {
2306    let total_months = i64::from(y) * 12 + i64::from(m) - 1 + i64::from(months);
2307    let new_year = i32::try_from(total_months.div_euclid(12)).unwrap_or(i32::MAX);
2308    let new_month_zero = total_months.rem_euclid(12);
2309    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
2310    let new_month = (new_month_zero as u32) + 1;
2311    let max_day = days_in_month(new_year, new_month);
2312    (new_year, new_month, d.min(max_day))
2313}
2314
/// Number of days in month `m` (1-based) of year `y` under the
/// proleptic Gregorian calendar. Months outside `1..=12` are not
/// rejected; they fall into the 30-day bucket alongside April, June,
/// September and November.
const fn days_in_month(y: i32, m: u32) -> u32 {
    let leap_year =
        y.rem_euclid(4) == 0 && (y.rem_euclid(100) != 0 || y.rem_euclid(400) == 0);
    if m == 2 {
        return if leap_year { 29 } else { 28 };
    }
    if matches!(m, 1 | 3 | 5 | 7 | 8 | 10 | 12) {
        31
    } else {
        30
    }
}
2331
/// v7.10.9 — render a TEXT[] in PG's external array form
/// (`{a,b,NULL}`). Elements that are empty, spell `NULL` (in any
/// case), or contain whitespace, commas, quotes, backslashes, or
/// braces get double-quoted with `\\` / `\"` escapes. NULL elements
/// use the literal token `NULL`. Public so the wire layer can produce
/// the canonical text-mode encoding.
pub fn format_text_array(items: &[Option<String>]) -> String {
    /// Characters that force an element to be quoted. The whitespace
    /// set covers space, tab, newline, carriage return, vertical tab
    /// and form feed — not just space and tab — so that elements with
    /// leading/trailing line breaks survive a round trip.
    fn forces_quote(c: char) -> bool {
        matches!(
            c,
            ',' | '{' | '}' | '"' | '\\' | ' ' | '\t' | '\n' | '\r' | '\x0B' | '\x0C'
        )
    }

    let mut out = String::with_capacity(2 + items.len() * 8);
    out.push('{');
    for (i, item) in items.iter().enumerate() {
        if i > 0 {
            out.push(',');
        }
        match item {
            None => out.push_str("NULL"),
            Some(s) => {
                let needs_quote = s.is_empty()
                    || s.eq_ignore_ascii_case("NULL")
                    || s.chars().any(forces_quote);
                if needs_quote {
                    out.push('"');
                    for c in s.chars() {
                        // Only the quote and the backslash need an
                        // escape inside a quoted element.
                        if c == '"' || c == '\\' {
                            out.push('\\');
                        }
                        out.push(c);
                    }
                    out.push('"');
                } else {
                    out.push_str(s);
                }
            }
        }
    }
    out.push('}');
    out
}
2369
/// v7.11.14 — produce the PG external array text for an INT[]
/// (`{1,2,NULL}`). Integers are written bare (no quoting is ever
/// required) and a missing element is spelled `NULL`.
pub fn format_int_array(items: &[Option<i32>]) -> String {
    let cells: Vec<String> = items
        .iter()
        .map(|slot| match slot {
            Some(n) => n.to_string(),
            None => String::from("NULL"),
        })
        .collect();
    let mut rendered = String::with_capacity(2 + items.len() * 4);
    rendered.push('{');
    rendered.push_str(&cells.join(","));
    rendered.push('}');
    rendered
}
2388
/// v7.11.14 — produce the PG external array text for a BIGINT[]
/// (`{1,2,NULL}`). Elements are comma-separated with no padding; a
/// missing element is spelled `NULL`.
pub fn format_bigint_array(items: &[Option<i64>]) -> String {
    let mut rendered = String::with_capacity(2 + items.len() * 6);
    rendered.push('{');
    let mut separator = "";
    for slot in items {
        rendered.push_str(separator);
        separator = ",";
        if let Some(n) = slot {
            rendered.push_str(&n.to_string());
        } else {
            rendered.push_str("NULL");
        }
    }
    rendered.push('}');
    rendered
}
2406
2407/// v7.12.0 — render a `tsvector` in PG's external form:
2408/// `'lex':1,2A 'word':3` (single-quoted lexemes, optional
2409/// `:positions`, optional weight letter `A/B/C/D` per position).
2410/// Lexemes already arrive sorted + deduped from the engine. Used
2411/// by the wire layer (OID 3614) and by SELECT-text output.
2412pub fn format_tsvector(lexs: &[TsLexeme]) -> String {
2413    let mut out = String::with_capacity(lexs.len() * 12);
2414    for (i, l) in lexs.iter().enumerate() {
2415        if i > 0 {
2416            out.push(' ');
2417        }
2418        out.push('\'');
2419        for c in l.word.chars() {
2420            if c == '\'' {
2421                out.push('\'');
2422            }
2423            out.push(c);
2424        }
2425        out.push('\'');
2426        if !l.positions.is_empty() {
2427            for (pi, p) in l.positions.iter().enumerate() {
2428                out.push(if pi == 0 { ':' } else { ',' });
2429                out.push_str(&p.to_string());
2430            }
2431            // v7.12.0 — weight is per-lexeme (the v7.12 design
2432            // collapses PG's per-position weight into one letter).
2433            // Emit once after the last position; default `D`
2434            // (weight=0) stays implicit.
2435            match l.weight {
2436                3 => out.push('A'),
2437                2 => out.push('B'),
2438                1 => out.push('C'),
2439                _ => {}
2440            }
2441        }
2442    }
2443    out
2444}
2445
2446/// v7.12.0 — render a `tsquery` in PG's external form. Operator
2447/// precedence: `!` > `&` > `|`. Phrase distance shown as `<N>`.
2448pub fn format_tsquery(ast: &TsQueryAst) -> String {
2449    fn go(ast: &TsQueryAst, parent_prec: u8, out: &mut String) {
2450        // 0 = top, 1 = OR, 2 = AND, 3 = NOT/Phrase, 4 = atom.
2451        let (own_prec, write_self): (u8, &dyn Fn(&mut String)) = match ast {
2452            TsQueryAst::Or(_, _) => (1, &|_| {}),
2453            TsQueryAst::And(_, _) | TsQueryAst::Phrase { .. } => (2, &|_| {}),
2454            TsQueryAst::Not(_) => (3, &|_| {}),
2455            TsQueryAst::Term { .. } => (4, &|_| {}),
2456        };
2457        let need_parens = own_prec < parent_prec;
2458        if need_parens {
2459            out.push('(');
2460        }
2461        match ast {
2462            TsQueryAst::Term { word, .. } => {
2463                out.push('\'');
2464                for c in word.chars() {
2465                    if c == '\'' {
2466                        out.push('\'');
2467                    }
2468                    out.push(c);
2469                }
2470                out.push('\'');
2471            }
2472            TsQueryAst::And(a, b) => {
2473                go(a, own_prec, out);
2474                out.push_str(" & ");
2475                go(b, own_prec, out);
2476            }
2477            TsQueryAst::Or(a, b) => {
2478                go(a, own_prec, out);
2479                out.push_str(" | ");
2480                go(b, own_prec, out);
2481            }
2482            TsQueryAst::Not(x) => {
2483                out.push('!');
2484                go(x, own_prec, out);
2485            }
2486            TsQueryAst::Phrase {
2487                left,
2488                right,
2489                distance,
2490            } => {
2491                go(left, own_prec, out);
2492                out.push_str(&alloc::format!(" <{distance}> "));
2493                go(right, own_prec, out);
2494            }
2495        }
2496        write_self(out);
2497        if need_parens {
2498            out.push(')');
2499        }
2500    }
2501    let mut out = String::new();
2502    go(ast, 0, &mut out);
2503    out
2504}
2505
2506/// v7.12.0 — decode PG external form `'word':1,2A 'other':3` into
2507/// a `Vec<TsLexeme>`. Lexemes are sorted ascending by `word` (with
2508/// duplicates merged on positions) so the output matches the
2509/// engine invariant. Empty input yields an empty vector.
2510///
2511/// v7.12.0 only ships the cast-literal entry. Full `to_tsvector`
2512/// (Unicode word-split + Porter stemming + stopwords) lands in
2513/// v7.12.1.
2514pub fn decode_tsvector_external(s: &str) -> Result<Vec<TsLexeme>, EvalError> {
2515    let mut out: Vec<TsLexeme> = Vec::new();
2516    let mut i = 0;
2517    let bytes = s.as_bytes();
2518    while i < bytes.len() {
2519        while i < bytes.len() && bytes[i].is_ascii_whitespace() {
2520            i += 1;
2521        }
2522        if i >= bytes.len() {
2523            break;
2524        }
2525        // Quoted form `'word'` (with embedded `''` for a literal
2526        // single quote, mirroring PG).
2527        let word = if bytes[i] == b'\'' {
2528            i += 1;
2529            let mut w = String::new();
2530            loop {
2531                if i >= bytes.len() {
2532                    return Err(EvalError::TypeMismatch {
2533                        detail: "tsvector literal: unterminated quoted lexeme".into(),
2534                    });
2535                }
2536                let b = bytes[i];
2537                if b == b'\'' {
2538                    if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
2539                        w.push('\'');
2540                        i += 2;
2541                    } else {
2542                        i += 1;
2543                        break;
2544                    }
2545                } else {
2546                    w.push(b as char);
2547                    i += 1;
2548                }
2549            }
2550            w
2551        } else {
2552            // Bare form — read until whitespace, ':' or end.
2553            let start = i;
2554            while i < bytes.len() && !bytes[i].is_ascii_whitespace() && bytes[i] != b':' {
2555                i += 1;
2556            }
2557            core::str::from_utf8(&bytes[start..i])
2558                .map_err(|_| EvalError::TypeMismatch {
2559                    detail: "tsvector literal: non-UTF-8 lexeme".into(),
2560                })?
2561                .to_string()
2562        };
2563        if word.is_empty() {
2564            return Err(EvalError::TypeMismatch {
2565                detail: "tsvector literal: empty lexeme".into(),
2566            });
2567        }
2568        // Optional `:pos[,pos][,pos]`. Each position is u16; each
2569        // may carry a trailing weight letter A/B/C/D.
2570        let mut positions: Vec<u16> = Vec::new();
2571        let mut weight: u8 = 0;
2572        if i < bytes.len() && bytes[i] == b':' {
2573            i += 1;
2574            loop {
2575                let start = i;
2576                while i < bytes.len() && bytes[i].is_ascii_digit() {
2577                    i += 1;
2578                }
2579                if start == i {
2580                    return Err(EvalError::TypeMismatch {
2581                        detail: "tsvector literal: expected digit after ':'".into(),
2582                    });
2583                }
2584                let num: u16 = core::str::from_utf8(&bytes[start..i])
2585                    .expect("ascii digits")
2586                    .parse()
2587                    .map_err(|_| EvalError::TypeMismatch {
2588                        detail: alloc::format!(
2589                            "tsvector literal: position {} overflows u16",
2590                            core::str::from_utf8(&bytes[start..i]).unwrap_or("?")
2591                        ),
2592                    })?;
2593                positions.push(num);
2594                if i < bytes.len() {
2595                    let w = bytes[i];
2596                    if matches!(w, b'A' | b'B' | b'C' | b'D') {
2597                        weight = match w {
2598                            b'A' => 3,
2599                            b'B' => 2,
2600                            b'C' => 1,
2601                            _ => 0,
2602                        };
2603                        i += 1;
2604                    }
2605                }
2606                if i < bytes.len() && bytes[i] == b',' {
2607                    i += 1;
2608                    continue;
2609                }
2610                break;
2611            }
2612        }
2613        positions.sort_unstable();
2614        positions.dedup();
2615        // Merge into the output vector — sorted insert by word,
2616        // duplicate words merge positions.
2617        match out.binary_search_by(|l| l.word.as_str().cmp(word.as_str())) {
2618            Ok(idx) => {
2619                for p in positions {
2620                    if !out[idx].positions.contains(&p) {
2621                        out[idx].positions.push(p);
2622                    }
2623                }
2624                out[idx].positions.sort_unstable();
2625                if weight != 0 {
2626                    out[idx].weight = weight;
2627                }
2628            }
2629            Err(idx) => {
2630                out.insert(
2631                    idx,
2632                    TsLexeme {
2633                        word,
2634                        positions,
2635                        weight,
2636                    },
2637                );
2638            }
2639        }
2640    }
2641    Ok(out)
2642}
2643
2644/// v7.12.0 — decode PG external form `'foo' & 'bar' | !'baz'`
2645/// into a `TsQueryAst`. v7.12.0 supports the canonical
2646/// `to_tsquery` surface: single-quoted lexemes, `&` / `|` / `!`,
2647/// parens, and phrase `<N>`. Bare lexemes are accepted too. Full
2648/// `plainto_tsquery` / `websearch_to_tsquery` arrive in v7.12.1.
2649pub fn decode_tsquery_external(s: &str) -> Result<TsQueryAst, EvalError> {
2650    let mut p = TsQueryParser {
2651        bytes: s.as_bytes(),
2652        pos: 0,
2653    };
2654    p.skip_ws();
2655    if p.pos >= p.bytes.len() {
2656        return Err(EvalError::TypeMismatch {
2657            detail: "tsquery literal: empty".into(),
2658        });
2659    }
2660    let ast = p.parse_or()?;
2661    p.skip_ws();
2662    if p.pos < p.bytes.len() {
2663        return Err(EvalError::TypeMismatch {
2664            detail: alloc::format!("tsquery literal: trailing garbage at offset {}", p.pos),
2665        });
2666    }
2667    Ok(ast)
2668}
2669
/// Cursor state for the hand-written tsquery parser: the input as raw
/// bytes plus the offset of the next byte to read.
struct TsQueryParser<'a> {
    /// Raw bytes of the literal being parsed.
    bytes: &'a [u8],
    /// Index into `bytes` of the next unread byte.
    pos: usize,
}
2674
2675impl<'a> TsQueryParser<'a> {
2676    fn skip_ws(&mut self) {
2677        while self.pos < self.bytes.len() && self.bytes[self.pos].is_ascii_whitespace() {
2678            self.pos += 1;
2679        }
2680    }
2681    fn peek(&self) -> Option<u8> {
2682        self.bytes.get(self.pos).copied()
2683    }
2684    fn parse_or(&mut self) -> Result<TsQueryAst, EvalError> {
2685        let mut lhs = self.parse_and()?;
2686        loop {
2687            self.skip_ws();
2688            if self.peek() != Some(b'|') {
2689                return Ok(lhs);
2690            }
2691            self.pos += 1;
2692            let rhs = self.parse_and()?;
2693            lhs = TsQueryAst::Or(Box::new(lhs), Box::new(rhs));
2694        }
2695    }
2696    fn parse_and(&mut self) -> Result<TsQueryAst, EvalError> {
2697        let mut lhs = self.parse_unary()?;
2698        loop {
2699            self.skip_ws();
2700            match self.peek() {
2701                Some(b'&') => {
2702                    self.pos += 1;
2703                    let rhs = self.parse_unary()?;
2704                    lhs = TsQueryAst::And(Box::new(lhs), Box::new(rhs));
2705                }
2706                Some(b'<') => {
2707                    // Phrase distance `<N>`.
2708                    self.pos += 1;
2709                    let start = self.pos;
2710                    while self.pos < self.bytes.len() && self.bytes[self.pos].is_ascii_digit() {
2711                        self.pos += 1;
2712                    }
2713                    if start == self.pos || self.peek() != Some(b'>') {
2714                        return Err(EvalError::TypeMismatch {
2715                            detail: "tsquery literal: malformed <N> phrase operator".into(),
2716                        });
2717                    }
2718                    let n: u16 = core::str::from_utf8(&self.bytes[start..self.pos])
2719                        .expect("ascii digits")
2720                        .parse()
2721                        .map_err(|_| EvalError::TypeMismatch {
2722                            detail: "tsquery literal: phrase distance overflows u16".into(),
2723                        })?;
2724                    self.pos += 1; // consume '>'
2725                    let rhs = self.parse_unary()?;
2726                    lhs = TsQueryAst::Phrase {
2727                        left: Box::new(lhs),
2728                        right: Box::new(rhs),
2729                        distance: n,
2730                    };
2731                }
2732                _ => return Ok(lhs),
2733            }
2734        }
2735    }
2736    fn parse_unary(&mut self) -> Result<TsQueryAst, EvalError> {
2737        self.skip_ws();
2738        if self.peek() == Some(b'!') {
2739            self.pos += 1;
2740            let inner = self.parse_unary()?;
2741            return Ok(TsQueryAst::Not(Box::new(inner)));
2742        }
2743        self.parse_atom()
2744    }
2745    fn parse_atom(&mut self) -> Result<TsQueryAst, EvalError> {
2746        self.skip_ws();
2747        match self.peek() {
2748            Some(b'(') => {
2749                self.pos += 1;
2750                let inner = self.parse_or()?;
2751                self.skip_ws();
2752                if self.peek() != Some(b')') {
2753                    return Err(EvalError::TypeMismatch {
2754                        detail: "tsquery literal: missing ')'".into(),
2755                    });
2756                }
2757                self.pos += 1;
2758                Ok(inner)
2759            }
2760            Some(b'\'') => {
2761                self.pos += 1;
2762                let mut w = String::new();
2763                loop {
2764                    match self.peek() {
2765                        None => {
2766                            return Err(EvalError::TypeMismatch {
2767                                detail: "tsquery literal: unterminated quoted lexeme".into(),
2768                            });
2769                        }
2770                        Some(b'\'') => {
2771                            if self.bytes.get(self.pos + 1) == Some(&b'\'') {
2772                                w.push('\'');
2773                                self.pos += 2;
2774                            } else {
2775                                self.pos += 1;
2776                                break;
2777                            }
2778                        }
2779                        Some(b) => {
2780                            w.push(b as char);
2781                            self.pos += 1;
2782                        }
2783                    }
2784                }
2785                // Optional `:WEIGHT_MASK` (digit-mask) — v7.12.0
2786                // accepts but always stores 0 (any).
2787                self.skip_weight_suffix();
2788                Ok(TsQueryAst::Term {
2789                    word: w,
2790                    weight_mask: 0,
2791                })
2792            }
2793            Some(b) if b.is_ascii_alphanumeric() || b == b'_' => {
2794                let start = self.pos;
2795                while self.pos < self.bytes.len() {
2796                    let c = self.bytes[self.pos];
2797                    if c.is_ascii_alphanumeric() || c == b'_' {
2798                        self.pos += 1;
2799                    } else {
2800                        break;
2801                    }
2802                }
2803                let w = core::str::from_utf8(&self.bytes[start..self.pos])
2804                    .map_err(|_| EvalError::TypeMismatch {
2805                        detail: "tsquery literal: non-UTF-8 lexeme".into(),
2806                    })?
2807                    .to_string();
2808                self.skip_weight_suffix();
2809                Ok(TsQueryAst::Term {
2810                    word: w,
2811                    weight_mask: 0,
2812                })
2813            }
2814            Some(b) => Err(EvalError::TypeMismatch {
2815                detail: alloc::format!(
2816                    "tsquery literal: unexpected byte {:?} at offset {}",
2817                    b as char,
2818                    self.pos
2819                ),
2820            }),
2821            None => Err(EvalError::TypeMismatch {
2822                detail: "tsquery literal: expected term".into(),
2823            }),
2824        }
2825    }
2826    fn skip_weight_suffix(&mut self) {
2827        if self.peek() != Some(b':') {
2828            return;
2829        }
2830        self.pos += 1;
2831        while let Some(b) = self.peek() {
2832            if matches!(
2833                b,
2834                b'A' | b'B' | b'C' | b'D' | b'a' | b'b' | b'c' | b'd' | b'*'
2835            ) || b.is_ascii_digit()
2836            {
2837                self.pos += 1;
2838            } else {
2839                break;
2840            }
2841        }
2842    }
2843}
2844
/// v7.10.4 — produce PG's hex text output for a BYTEA value: the
/// two characters `\x` followed by two lowercase hex digits per
/// input byte. Public so the wire layer can emit the canonical
/// bytea-as-text representation.
pub fn format_bytea_hex(b: &[u8]) -> String {
    const DIGITS: [char; 16] = [
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f',
    ];
    let mut hex = String::with_capacity(2 + 2 * b.len());
    hex.push('\\');
    hex.push('x');
    for &octet in b {
        let high = usize::from(octet >> 4);
        let low = usize::from(octet & 0x0F);
        hex.push(DIGITS[high]);
        hex.push(DIGITS[low]);
    }
    hex
}
2858
/// Turn a `Numeric { scaled, scale }` into decimal text. With
/// `scale == 0` the integer is printed as-is. Otherwise the digits of
/// `|scaled|` are split so that exactly `scale` of them follow the
/// decimal point, left-padding with zeros when there are too few
/// (which also guarantees a leading `0` before the point); a `-` is
/// prefixed for negative values.
pub fn format_numeric(scaled: i128, scale: u8) -> String {
    if scale == 0 {
        return scaled.to_string();
    }
    let frac_len = usize::from(scale);
    let digits = scaled.unsigned_abs().to_string();
    // Ensure at least one digit remains on the integer side.
    let padded = if digits.len() <= frac_len {
        format!("{digits:0>width$}", width = frac_len + 1)
    } else {
        digits
    };
    let (int_part, frac_part) = padded.split_at(padded.len() - frac_len);
    let sign = if scaled < 0 { "-" } else { "" };
    format!("{sign}{int_part}.{frac_part}")
}
2890
2891fn cast_numeric_to_int(v: Value) -> Result<Value, EvalError> {
2892    match v {
2893        Value::Int(n) => Ok(Value::Int(n)),
2894        Value::BigInt(n) => i32::try_from(n)
2895            .map(Value::Int)
2896            .map_err(|_| EvalError::TypeMismatch {
2897                detail: format!("bigint {n} does not fit in int"),
2898            }),
2899        #[allow(clippy::cast_possible_truncation)]
2900        Value::Float(x) => Ok(Value::Int(x as i32)),
2901        Value::Text(s) => {
2902            s.trim()
2903                .parse::<i32>()
2904                .map(Value::Int)
2905                .map_err(|_| EvalError::TypeMismatch {
2906                    detail: format!("cannot parse {s:?} as int"),
2907                })
2908        }
2909        Value::Bool(b) => Ok(Value::Int(i32::from(b))),
2910        other => Err(EvalError::TypeMismatch {
2911            detail: format!("cannot cast {:?} to int", other.data_type()),
2912        }),
2913    }
2914}
2915
2916fn cast_numeric_to_bigint(v: Value) -> Result<Value, EvalError> {
2917    match v {
2918        Value::Int(n) => Ok(Value::BigInt(i64::from(n))),
2919        Value::BigInt(n) => Ok(Value::BigInt(n)),
2920        #[allow(clippy::cast_possible_truncation)]
2921        Value::Float(x) => Ok(Value::BigInt(x as i64)),
2922        Value::Text(s) => {
2923            s.trim()
2924                .parse::<i64>()
2925                .map(Value::BigInt)
2926                .map_err(|_| EvalError::TypeMismatch {
2927                    detail: format!("cannot parse {s:?} as bigint"),
2928                })
2929        }
2930        Value::Bool(b) => Ok(Value::BigInt(i64::from(b))),
2931        other => Err(EvalError::TypeMismatch {
2932            detail: format!("cannot cast {:?} to bigint", other.data_type()),
2933        }),
2934    }
2935}
2936
2937fn cast_numeric_to_float(v: Value) -> Result<Value, EvalError> {
2938    match v {
2939        Value::Int(n) => Ok(Value::Float(f64::from(n))),
2940        #[allow(clippy::cast_precision_loss)]
2941        Value::BigInt(n) => Ok(Value::Float(n as f64)),
2942        Value::Float(x) => Ok(Value::Float(x)),
2943        Value::Text(s) => {
2944            s.trim()
2945                .parse::<f64>()
2946                .map(Value::Float)
2947                .map_err(|_| EvalError::TypeMismatch {
2948                    detail: format!("cannot parse {s:?} as float"),
2949                })
2950        }
2951        other => Err(EvalError::TypeMismatch {
2952            detail: format!("cannot cast {:?} to float", other.data_type()),
2953        }),
2954    }
2955}
2956
2957fn cast_to_bool(v: Value) -> Result<Value, EvalError> {
2958    match v {
2959        Value::Bool(b) => Ok(Value::Bool(b)),
2960        Value::Int(n) => Ok(Value::Bool(n != 0)),
2961        Value::BigInt(n) => Ok(Value::Bool(n != 0)),
2962        Value::Text(s) => {
2963            let lo = s.trim().to_ascii_lowercase();
2964            match lo.as_str() {
2965                "true" | "t" | "yes" | "y" | "1" | "on" => Ok(Value::Bool(true)),
2966                "false" | "f" | "no" | "n" | "0" | "off" => Ok(Value::Bool(false)),
2967                _ => Err(EvalError::TypeMismatch {
2968                    detail: format!("cannot parse {s:?} as bool"),
2969                }),
2970            }
2971        }
2972        other => Err(EvalError::TypeMismatch {
2973            detail: format!("cannot cast {:?} to bool", other.data_type()),
2974        }),
2975    }
2976}
2977
2978/// Parse a `Value::Text("[1.0, 2.0, 3.0]")` into a `Value::Vector(..)`. Mirrors
2979/// pgvector's `'[..]'::vector` cast. NULL casts as NULL.
2980pub fn cast_to_vector(v: Value) -> Result<Value, EvalError> {
2981    match v {
2982        Value::Null => Ok(Value::Null),
2983        Value::Vector(v) => Ok(Value::Vector(v)),
2984        Value::Text(s) => parse_vector_text(&s)
2985            .map(Value::Vector)
2986            .ok_or(EvalError::TypeMismatch {
2987                detail: format!("cannot parse {s:?} as a vector literal"),
2988            }),
2989        other => Err(EvalError::TypeMismatch {
2990            detail: format!("::vector requires text input, got {:?}", other.data_type()),
2991        }),
2992    }
2993}
2994
/// Parse `"[1.0, 2.0, -3]"` into `Vec<f32>`. Returns `None` on malformed input.
///
/// Surrounding whitespace is ignored, both around the brackets and
/// around each comma-separated element. `"[]"` yields an empty
/// vector. Elements that parse to a non-finite value (`nan`, `inf`,
/// or a literal too large for `f32`) are rejected.
fn parse_vector_text(s: &str) -> Option<Vec<f32>> {
    let inner = s.trim().strip_prefix('[')?.strip_suffix(']')?.trim();
    if inner.is_empty() {
        return Some(Vec::new());
    }
    inner
        .split(',')
        .map(|part| {
            part.trim()
                .parse::<f32>()
                .ok()
                // `f32::from_str` accepts "nan" / "inf" and rounds
                // overflowing literals to infinity.
                .filter(|value| value.is_finite())
        })
        .collect()
}
3010
3011fn literal_to_value(l: &Literal) -> Value {
3012    match l {
3013        Literal::Integer(n) => {
3014            if let Ok(small) = i32::try_from(*n) {
3015                Value::Int(small)
3016            } else {
3017                Value::BigInt(*n)
3018            }
3019        }
3020        Literal::Float(x) => Value::Float(*x),
3021        Literal::String(s) => Value::Text(s.clone()),
3022        Literal::Vector(v) => Value::Vector(v.clone()),
3023        Literal::Bool(b) => Value::Bool(*b),
3024        Literal::Null => Value::Null,
3025        Literal::Interval { months, micros, .. } => Value::Interval {
3026            months: *months,
3027            micros: *micros,
3028        },
3029    }
3030}
3031
3032fn resolve_column(c: &ColumnName, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
3033    if let Some(q) = &c.qualifier {
3034        // Multi-table evaluation (joins): the synthesised schema uses
3035        // composite column names "alias.column" so we look that up
3036        // directly. Falls back to the single-table case below if the
3037        // composite isn't present.
3038        let composite = alloc::format!("{q}.{name}", name = c.name);
3039        if let Some(pos) = ctx.columns.iter().position(|s| s.name == composite) {
3040            return Ok(row.values[pos].clone());
3041        }
3042        let expected = ctx.table_alias.ok_or_else(|| EvalError::UnknownQualifier {
3043            qualifier: q.clone(),
3044        })?;
3045        if q != expected {
3046            return Err(EvalError::UnknownQualifier {
3047                qualifier: q.clone(),
3048            });
3049        }
3050    }
3051    if let Some(pos) = ctx.columns.iter().position(|s| s.name == c.name) {
3052        return Ok(row.values[pos].clone());
3053    }
3054    // Bare-name fallback for joined schemas: match any single composite
3055    // column ending in ".<name>"; ambiguity is an error.
3056    let suffix = alloc::format!(".{name}", name = c.name);
3057    let mut matches = ctx
3058        .columns
3059        .iter()
3060        .enumerate()
3061        .filter(|(_, s)| s.name.ends_with(&suffix));
3062    let first = matches.next();
3063    let extra = matches.next();
3064    match (first, extra) {
3065        (Some((pos, _)), None) => Ok(row.values[pos].clone()),
3066        (Some(_), Some(_)) => Err(EvalError::TypeMismatch {
3067            detail: alloc::format!("ambiguous column reference: {}", c.name),
3068        }),
3069        _ => Err(EvalError::ColumnNotFound {
3070            name: c.name.clone(),
3071        }),
3072    }
3073}
3074
3075fn apply_unary(op: UnOp, v: Value) -> Result<Value, EvalError> {
3076    match (op, v) {
3077        (_, Value::Null) => Ok(Value::Null),
3078        (UnOp::Neg, Value::Int(n)) => {
3079            n.checked_neg()
3080                .map(Value::Int)
3081                .ok_or(EvalError::TypeMismatch {
3082                    detail: "integer overflow on unary -".into(),
3083                })
3084        }
3085        (UnOp::Neg, Value::BigInt(n)) => {
3086            n.checked_neg()
3087                .map(Value::BigInt)
3088                .ok_or(EvalError::TypeMismatch {
3089                    detail: "bigint overflow on unary -".into(),
3090                })
3091        }
3092        (UnOp::Neg, Value::Float(x)) => Ok(Value::Float(-x)),
3093        (UnOp::Neg, other) => Err(EvalError::TypeMismatch {
3094            detail: format!("unary - applied to {:?}", other.data_type()),
3095        }),
3096        (UnOp::Not, Value::Bool(b)) => Ok(Value::Bool(!b)),
3097        (UnOp::Not, other) => Err(EvalError::TypeMismatch {
3098            detail: format!("NOT applied to {:?}", other.data_type()),
3099        }),
3100    }
3101}
3102
3103/// v7.9.27b — true when two values are "not distinct" per PG:
3104/// both NULL counts as equal; otherwise reduces to regular Eq.
3105fn values_not_distinct(l: &Value, r: &Value) -> bool {
3106    match (l, r) {
3107        (Value::Null, Value::Null) => true,
3108        (Value::Null, _) | (_, Value::Null) => false,
3109        _ => l == r,
3110    }
3111}
3112
3113fn apply_binary(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
3114    // SQL three-valued logic for AND / OR with NULL is special — handle before
3115    // the general NULL-propagation rule.
3116    if let BinOp::And = op {
3117        return and_3vl(l, r);
3118    }
3119    if let BinOp::Or = op {
3120        return or_3vl(l, r);
3121    }
3122    // v7.9.27b — IS [NOT] DISTINCT FROM. NULL-safe equality:
3123    // `NULL IS NOT DISTINCT FROM NULL` → true. mailrs pg_dump.
3124    if let BinOp::IsNotDistinctFrom = op {
3125        return Ok(Value::Bool(values_not_distinct(&l, &r)));
3126    }
3127    if let BinOp::IsDistinctFrom = op {
3128        return Ok(Value::Bool(!values_not_distinct(&l, &r)));
3129    }
3130    // Everything else: any NULL operand → NULL.
3131    if l.is_null() || r.is_null() {
3132        return Ok(Value::Null);
3133    }
3134    // NUMERIC arithmetic and comparisons run in fixed-point; promote
3135    // integers to a common NUMERIC scale and stay in i128 throughout.
3136    if matches!(l, Value::Numeric { .. }) || matches!(r, Value::Numeric { .. }) {
3137        return apply_binary_numeric(op, l, r);
3138    }
3139    // Date / Timestamp arithmetic. PG semantics:
3140    //   * date + int      → date  (int is days)
3141    //   * int + date      → date
3142    //   * date - int      → date
3143    //   * date - date     → int   (days, signed)
3144    //   * timestamp - timestamp → bigint (microseconds, signed)
3145    // Other date/time math (`timestamp + int`, INTERVAL) lands later.
3146    if let Some(result) = apply_binary_calendar(op, &l, &r)? {
3147        return Ok(result);
3148    }
3149    match op {
3150        BinOp::Add => arith(l, r, i64::checked_add, |a, b| a + b, "+"),
3151        BinOp::Sub => arith(l, r, i64::checked_sub, |a, b| a - b, "-"),
3152        BinOp::Mul => arith(l, r, i64::checked_mul, |a, b| a * b, "*"),
3153        BinOp::Div => div_op(l, r),
3154        BinOp::L2Distance => l2_distance(l, r),
3155        BinOp::InnerProduct => inner_product(l, r),
3156        BinOp::CosineDistance => cosine_distance(l, r),
3157        BinOp::Concat => Ok(text_concat(&l, &r)),
3158        BinOp::JsonGet => crate::json::path_get(&l, &r, false),
3159        BinOp::JsonGetText => crate::json::path_get(&l, &r, true),
3160        BinOp::JsonGetPath => crate::json::path_walk(&l, &r, false),
3161        BinOp::JsonGetPathText => crate::json::path_walk(&l, &r, true),
3162        BinOp::JsonContains => crate::json::contains(&l, &r),
3163        // v7.12.2 — `@@` match. NULL on either side → NULL; PG
3164        // accepts both orderings so we normalise.
3165        BinOp::TsMatch => ts_match(l, r),
3166        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
3167            compare(op, &l, &r)
3168        }
3169        BinOp::And | BinOp::Or | BinOp::IsDistinctFrom | BinOp::IsNotDistinctFrom => {
3170            unreachable!("handled above")
3171        }
3172    }
3173}
3174
3175/// Calendar arithmetic. Returns `Some(value)` when the operand pair
3176/// is a date/time combo this function understands, `None` to let the
3177/// caller fall through to the regular numeric / text paths.
3178fn apply_binary_calendar(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
3179    let int_value = |v: &Value| -> Option<i64> {
3180        match v {
3181            Value::SmallInt(n) => Some(i64::from(*n)),
3182            Value::Int(n) => Some(i64::from(*n)),
3183            Value::BigInt(n) => Some(*n),
3184            _ => None,
3185        }
3186    };
3187    // Most-specific cases first — DATE-DATE / TS-TS subtraction before
3188    // DATE-integer subtraction, otherwise the latter swallows the
3189    // former with an `int_value(Date) = None` no-op fall-through.
3190    match (l, r) {
3191        (Value::Date(a), Value::Date(b)) if op == BinOp::Sub => {
3192            return Ok(Some(Value::BigInt(i64::from(*a) - i64::from(*b))));
3193        }
3194        (Value::Timestamp(a), Value::Timestamp(b)) if op == BinOp::Sub => {
3195            let delta = a.checked_sub(*b).ok_or(EvalError::TypeMismatch {
3196                detail: "TIMESTAMP - TIMESTAMP overflows i64 microseconds".into(),
3197            })?;
3198            return Ok(Some(Value::BigInt(delta)));
3199        }
3200        _ => {}
3201    }
3202    // INTERVAL arithmetic. PG: timestamp ± interval → timestamp,
3203    // date ± interval → date (if interval is pure days/months with no
3204    // sub-day component) else timestamp, interval ± interval → interval.
3205    if let Some(out) = apply_binary_interval(op, l, r)? {
3206        return Ok(Some(out));
3207    }
3208    match (l, r) {
3209        (Value::Date(d), other) if op == BinOp::Add => {
3210            if let Some(n) = int_value(other) {
3211                let days = i64::from(*d).saturating_add(n);
3212                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
3213                    detail: "DATE + integer overflows DATE range".into(),
3214                })?;
3215                return Ok(Some(Value::Date(days32)));
3216            }
3217        }
3218        (other, Value::Date(d)) if op == BinOp::Add => {
3219            if let Some(n) = int_value(other) {
3220                let days = i64::from(*d).saturating_add(n);
3221                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
3222                    detail: "integer + DATE overflows DATE range".into(),
3223                })?;
3224                return Ok(Some(Value::Date(days32)));
3225            }
3226        }
3227        (Value::Date(d), other) if op == BinOp::Sub => {
3228            if let Some(n) = int_value(other) {
3229                let days = i64::from(*d).saturating_sub(n);
3230                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
3231                    detail: "DATE - integer overflows DATE range".into(),
3232                })?;
3233                return Ok(Some(Value::Date(days32)));
3234            }
3235        }
3236        _ => {}
3237    }
3238    Ok(None)
3239}
3240
3241/// INTERVAL-aware binary ops. Recognises:
3242///   timestamp ± interval → timestamp
3243///   date ± interval      → date (if interval is integral days/months only)
3244///                       → timestamp (if interval has sub-day micros)
3245///   interval ± interval  → interval
3246/// Commutative for `+`. Returns `None` for unrecognised operand pairs so
3247/// the caller can fall through.
3248fn apply_binary_interval(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
3249    // Normalise so the interval (if any) is always on the right for Add;
3250    // Sub stays left-handed because it isn't commutative.
3251    let (lhs, rhs, sign): (&Value, &Value, i64) = match (l, r, op) {
3252        (Value::Interval { .. }, _, BinOp::Add) => (r, l, 1),
3253        (_, Value::Interval { .. }, BinOp::Add) => (l, r, 1),
3254        (_, Value::Interval { .. }, BinOp::Sub) => (l, r, -1),
3255        _ => return Ok(None),
3256    };
3257    let Value::Interval {
3258        months: rhs_months,
3259        micros: rhs_us,
3260    } = rhs
3261    else {
3262        unreachable!("rhs guaranteed to be Interval by the match above");
3263    };
3264    let signed_months = i64::from(*rhs_months) * sign;
3265    let signed_micros = rhs_us.checked_mul(sign).ok_or(EvalError::TypeMismatch {
3266        detail: "INTERVAL micros overflows on negation".into(),
3267    })?;
3268    match lhs {
3269        Value::Timestamp(t) => Ok(Some(Value::Timestamp(add_interval_to_micros(
3270            *t,
3271            signed_months,
3272            signed_micros,
3273        )?))),
3274        Value::Date(d) => {
3275            // Date + interval stays a date when the interval has zero
3276            // sub-day microseconds; otherwise promote to TIMESTAMP at
3277            // midnight of the (months-shifted) date first.
3278            let day_aligned = signed_micros.rem_euclid(86_400_000_000) == 0;
3279            if day_aligned {
3280                let micros_per_day = 86_400_000_000_i64;
3281                let days_delta = signed_micros / micros_per_day;
3282                let shifted = shift_date_by_months(*d, signed_months)?;
3283                let new_days =
3284                    i64::from(shifted)
3285                        .checked_add(days_delta)
3286                        .ok_or(EvalError::TypeMismatch {
3287                            detail: "DATE ± INTERVAL overflows DATE range".into(),
3288                        })?;
3289                let days32 = i32::try_from(new_days).map_err(|_| EvalError::TypeMismatch {
3290                    detail: "DATE ± INTERVAL overflows DATE range".into(),
3291                })?;
3292                Ok(Some(Value::Date(days32)))
3293            } else {
3294                let base =
3295                    i64::from(*d)
3296                        .checked_mul(86_400_000_000)
3297                        .ok_or(EvalError::TypeMismatch {
3298                            detail: "DATE → TIMESTAMP lift overflows for INTERVAL math".into(),
3299                        })?;
3300                Ok(Some(Value::Timestamp(add_interval_to_micros(
3301                    base,
3302                    signed_months,
3303                    signed_micros,
3304                )?)))
3305            }
3306        }
3307        Value::Interval {
3308            months: lhs_months,
3309            micros: lhs_us,
3310        } => {
3311            let new_months = i64::from(*lhs_months)
3312                .checked_add(signed_months)
3313                .and_then(|n| i32::try_from(n).ok())
3314                .ok_or(EvalError::TypeMismatch {
3315                    detail: "INTERVAL ± INTERVAL months overflows i32".into(),
3316                })?;
3317            let new_micros = lhs_us
3318                .checked_add(signed_micros)
3319                .ok_or(EvalError::TypeMismatch {
3320                    detail: "INTERVAL ± INTERVAL micros overflows i64".into(),
3321                })?;
3322            Ok(Some(Value::Interval {
3323                months: new_months,
3324                micros: new_micros,
3325            }))
3326        }
3327        _ => Err(EvalError::TypeMismatch {
3328            detail: format!(
3329                "operator {op:?} not defined for {:?} and INTERVAL",
3330                lhs.data_type()
3331            ),
3332        }),
3333    }
3334}
3335
3336/// Shift a `Date` by a signed number of months using the PG clamp rule.
3337fn shift_date_by_months(d: i32, months: i64) -> Result<i32, EvalError> {
3338    let (y, m, day) = civil_from_days(d);
3339    let months_i32 = i32::try_from(months).map_err(|_| EvalError::TypeMismatch {
3340        detail: "INTERVAL months delta out of i32 range".into(),
3341    })?;
3342    let (ny, nm, nd) = add_months_to_civil(y, m, day, months_i32);
3343    Ok(days_from_civil(ny, nm, nd))
3344}
3345
3346/// Add (months, micros) to a `Timestamp` (microseconds since epoch).
3347/// Months part is applied through civil calendar with clamp-to-last-day;
3348/// micros part is plain i64 addition with overflow guard.
3349fn add_interval_to_micros(t: i64, months: i64, micros: i64) -> Result<i64, EvalError> {
3350    let mut out = t;
3351    if months != 0 {
3352        const MICROS_PER_DAY: i64 = 86_400_000_000;
3353        let days = out.div_euclid(MICROS_PER_DAY);
3354        let day_micros = out.rem_euclid(MICROS_PER_DAY);
3355        let day_i32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
3356            detail: "TIMESTAMP day component out of i32 range for INTERVAL months math".into(),
3357        })?;
3358        let shifted_days = shift_date_by_months(day_i32, months)?;
3359        out = i64::from(shifted_days)
3360            .checked_mul(MICROS_PER_DAY)
3361            .and_then(|n| n.checked_add(day_micros))
3362            .ok_or(EvalError::TypeMismatch {
3363                detail: "TIMESTAMP ± INTERVAL months overflows i64 microseconds".into(),
3364            })?;
3365    }
3366    out.checked_add(micros).ok_or(EvalError::TypeMismatch {
3367        detail: "TIMESTAMP ± INTERVAL micros overflows i64".into(),
3368    })
3369}
3370
3371/// Dispatch for any binary op when at least one operand is NUMERIC.
3372/// Other-side integers / floats are promoted to a NUMERIC at a common
3373/// scale; all add / sub / mul / div / compare paths stay in i128.
3374#[allow(clippy::needless_pass_by_value)] // mirrors `apply_binary`'s by-value calling convention
3375fn apply_binary_numeric(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
3376    // Float still wins — Numeric + Float coerces both to f64 and runs
3377    // through the float path. PG demotes Numeric to float in this mix
3378    // too (the documented behaviour for `numeric + double precision`).
3379    let float_path = matches!(l, Value::Float(_)) || matches!(r, Value::Float(_));
3380    if float_path {
3381        let af = as_f64(&l)?;
3382        let bf = as_f64(&r)?;
3383        return match op {
3384            BinOp::Add => Ok(Value::Float(af + bf)),
3385            BinOp::Sub => Ok(Value::Float(af - bf)),
3386            BinOp::Mul => Ok(Value::Float(af * bf)),
3387            BinOp::Div => {
3388                if bf == 0.0 {
3389                    Err(EvalError::DivisionByZero)
3390                } else {
3391                    Ok(Value::Float(af / bf))
3392                }
3393            }
3394            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
3395                let ord = af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
3396                    detail: "NaN in NUMERIC/Float comparison".into(),
3397                })?;
3398                Ok(Value::Bool(cmp_to_bool(op, ord)))
3399            }
3400            BinOp::Concat => Ok(text_concat(&l, &r)),
3401            other => Err(EvalError::TypeMismatch {
3402                detail: format!("operator {other:?} not defined for NUMERIC and Float"),
3403            }),
3404        };
3405    }
3406    // Promote integer ↔ numeric to a shared scale (max of both sides).
3407    let (a, sa) = numeric_or_widen(&l).ok_or_else(|| EvalError::TypeMismatch {
3408        detail: format!("NUMERIC op against non-numeric {:?}", l.data_type()),
3409    })?;
3410    let (b, sb) = numeric_or_widen(&r).ok_or_else(|| EvalError::TypeMismatch {
3411        detail: format!("NUMERIC op against non-numeric {:?}", r.data_type()),
3412    })?;
3413    match op {
3414        BinOp::Add | BinOp::Sub => {
3415            let target_scale = sa.max(sb);
3416            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
3417                detail: "NUMERIC overflow on rescale".into(),
3418            })?;
3419            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
3420                detail: "NUMERIC overflow on rescale".into(),
3421            })?;
3422            let r = match op {
3423                BinOp::Add => lhs.checked_add(rhs),
3424                BinOp::Sub => lhs.checked_sub(rhs),
3425                _ => unreachable!(),
3426            }
3427            .ok_or(EvalError::TypeMismatch {
3428                detail: "NUMERIC overflow on +/-".into(),
3429            })?;
3430            Ok(Value::Numeric {
3431                scaled: r,
3432                scale: target_scale,
3433            })
3434        }
3435        BinOp::Mul => {
3436            let scaled = a.checked_mul(b).ok_or(EvalError::TypeMismatch {
3437                detail: "NUMERIC overflow on *".into(),
3438            })?;
3439            Ok(Value::Numeric {
3440                scaled,
3441                scale: sa.saturating_add(sb),
3442            })
3443        }
3444        BinOp::Div => {
3445            if b == 0 {
3446                return Err(EvalError::DivisionByZero);
3447            }
3448            // Result scale: keep the wider operand's scale. Pre-scale
3449            // the numerator so the integer division retains that many
3450            // fractional digits. Round half-away-from-zero.
3451            let target_scale = sa.max(sb);
3452            // Numerator effective scale becomes sa + target_scale; we
3453            // bring it up to (target_scale + sb) so the divisor's scale
3454            // cancels cleanly.
3455            let bump = pow10_i128(target_scale.saturating_add(sb).saturating_sub(sa));
3456            let num = a.checked_mul(bump).ok_or(EvalError::TypeMismatch {
3457                detail: "NUMERIC overflow on / scaling".into(),
3458            })?;
3459            let half = if b >= 0 { b / 2 } else { -(b / 2) };
3460            let adj = if (num >= 0) == (b >= 0) {
3461                num + half
3462            } else {
3463                num - half
3464            };
3465            Ok(Value::Numeric {
3466                scaled: adj / b,
3467                scale: target_scale,
3468            })
3469        }
3470        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
3471            let target_scale = sa.max(sb);
3472            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
3473                detail: "NUMERIC overflow on rescale".into(),
3474            })?;
3475            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
3476                detail: "NUMERIC overflow on rescale".into(),
3477            })?;
3478            Ok(Value::Bool(cmp_to_bool(op, lhs.cmp(&rhs))))
3479        }
3480        BinOp::Concat => Ok(text_concat(&l, &r)),
3481        other => Err(EvalError::TypeMismatch {
3482            detail: format!("operator {other:?} not defined for NUMERIC"),
3483        }),
3484    }
3485}
3486
3487/// Express `v` as a `(scaled_i128, scale)` pair. Plain integers come
3488/// back with `scale=0`; NUMERIC keeps its own scale. Anything else
3489/// returns `None` and the caller raises a type error.
3490fn numeric_or_widen(v: &Value) -> Option<(i128, u8)> {
3491    match v {
3492        Value::Numeric { scaled, scale } => Some((*scaled, *scale)),
3493        Value::Int(n) => Some((i128::from(*n), 0)),
3494        Value::SmallInt(n) => Some((i128::from(*n), 0)),
3495        Value::BigInt(n) => Some((i128::from(*n), 0)),
3496        _ => None,
3497    }
3498}
3499
/// Re-express the fixed-point integer `scaled` (currently at `src` decimal
/// places) at `dst` decimal places.
///
/// - Scaling up multiplies by `10^(dst - src)`; returns `None` when the
///   result does not fit in `i128`.
/// - Scaling down divides by `10^(src - dst)`, rounding half away from
///   zero. This direction never fails.
/// - Zero is zero at every scale.
fn rescale(scaled: i128, src: u8, dst: u8) -> Option<i128> {
    if src == dst || scaled == 0 {
        return Some(scaled);
    }
    if dst > src {
        // Both the power of ten and the product are overflow-checked, so a
        // scale gap of 39+ digits is reported instead of panicking/wrapping.
        let factor = 10_i128.checked_pow(u32::from(dst - src))?;
        return scaled.checked_mul(factor);
    }
    let Some(divisor) = 10_i128.checked_pow(u32::from(src - dst)) else {
        // The divisor exceeds i128::MAX, so |scaled| is below half of it
        // and the rounded quotient is zero.
        return Some(0);
    };
    // Work from quotient and remainder rather than `scaled ± divisor / 2`,
    // which can overflow for values near the i128 limits. `divisor` is an
    // even power of ten here, so `divisor / 2` is exactly one half.
    let quotient = scaled / divisor;
    let remainder = scaled % divisor;
    if remainder.abs() >= divisor / 2 {
        // |quotient| <= i128::MAX / 10, so stepping by one cannot overflow.
        Some(quotient + scaled.signum())
    } else {
        Some(quotient)
    }
}
3517
/// `10^p` as an `i128`.
///
/// Exponents above 38 do not fit in `i128`; as with any plain integer
/// arithmetic, that overflow panics when overflow checks are enabled.
const fn pow10_i128(p: u8) -> i128 {
    // `as` is a lossless u8 → u32 widening; `u32::from` is not callable in
    // a `const fn`.
    10_i128.pow(p as u32)
}
3527
3528const fn cmp_to_bool(op: BinOp, ord: core::cmp::Ordering) -> bool {
3529    use core::cmp::Ordering::{Equal, Greater, Less};
3530    match op {
3531        BinOp::Eq => matches!(ord, Equal),
3532        BinOp::NotEq => !matches!(ord, Equal),
3533        BinOp::Lt => matches!(ord, Less),
3534        BinOp::LtEq => matches!(ord, Less | Equal),
3535        BinOp::Gt => matches!(ord, Greater),
3536        BinOp::GtEq => matches!(ord, Greater | Equal),
3537        _ => false,
3538    }
3539}
3540
3541/// SQL `||` string concatenation. Operands are coerced to text via the same
3542/// rule as `::text` cast. NULL propagates (handled above; this function only
3543/// runs with non-NULL operands).
3544fn text_concat(l: &Value, r: &Value) -> Value {
3545    // v7.11.8 — PG `||` overloads: TEXT[] || TEXT[] = concatenated array;
3546    // TEXT[] || TEXT (or TEXT || TEXT[]) prepends/appends the single
3547    // element. NULL || anything = NULL (PG semantics for arrays;
3548    // text concat treats NULL the same way after value_to_text).
3549    match (l, r) {
3550        (Value::Null, _) | (_, Value::Null) => {
3551            // PG text concat: NULL || x = NULL. Array concat: NULL || x = NULL.
3552            // Keep the legacy text path (value_to_text handles Null as ""),
3553            // but for arrays we surface real NULL to match PG.
3554            if matches!(
3555                l,
3556                Value::TextArray(_) | Value::IntArray(_) | Value::BigIntArray(_) | Value::Bytes(_)
3557            ) || matches!(
3558                r,
3559                Value::TextArray(_) | Value::IntArray(_) | Value::BigIntArray(_) | Value::Bytes(_)
3560            ) {
3561                return Value::Null;
3562            }
3563        }
3564        (Value::TextArray(a), Value::TextArray(b)) => {
3565            let mut out = a.clone();
3566            out.extend(b.iter().cloned());
3567            return Value::TextArray(out);
3568        }
3569        (Value::TextArray(a), Value::Text(s)) => {
3570            let mut out = a.clone();
3571            out.push(Some(s.clone()));
3572            return Value::TextArray(out);
3573        }
3574        (Value::Text(s), Value::TextArray(b)) => {
3575            let mut out: alloc::vec::Vec<Option<alloc::string::String>> =
3576                alloc::vec::Vec::with_capacity(1 + b.len());
3577            out.push(Some(s.clone()));
3578            out.extend(b.iter().cloned());
3579            return Value::TextArray(out);
3580        }
3581        // v7.11.13 — IntArray / BigIntArray `||` overloads. Same
3582        // PG semantics as TEXT[]: array||array concatenates, and
3583        // array||scalar appends/prepends. Mixed Int/BigInt widens
3584        // to BigIntArray.
3585        (Value::IntArray(a), Value::IntArray(b)) => {
3586            let mut out = a.clone();
3587            out.extend(b.iter().copied());
3588            return Value::IntArray(out);
3589        }
3590        (Value::IntArray(a), Value::Int(n)) => {
3591            let mut out = a.clone();
3592            out.push(Some(*n));
3593            return Value::IntArray(out);
3594        }
3595        (Value::IntArray(a), Value::SmallInt(n)) => {
3596            let mut out = a.clone();
3597            out.push(Some(i32::from(*n)));
3598            return Value::IntArray(out);
3599        }
3600        (Value::Int(n), Value::IntArray(b)) => {
3601            let mut out: alloc::vec::Vec<Option<i32>> = alloc::vec::Vec::with_capacity(1 + b.len());
3602            out.push(Some(*n));
3603            out.extend(b.iter().copied());
3604            return Value::IntArray(out);
3605        }
3606        (Value::SmallInt(n), Value::IntArray(b)) => {
3607            let mut out: alloc::vec::Vec<Option<i32>> = alloc::vec::Vec::with_capacity(1 + b.len());
3608            out.push(Some(i32::from(*n)));
3609            out.extend(b.iter().copied());
3610            return Value::IntArray(out);
3611        }
3612        (Value::BigIntArray(a), Value::BigIntArray(b)) => {
3613            let mut out = a.clone();
3614            out.extend(b.iter().copied());
3615            return Value::BigIntArray(out);
3616        }
3617        (Value::BigIntArray(a), Value::IntArray(b)) => {
3618            let mut out = a.clone();
3619            out.extend(b.iter().map(|o| o.map(i64::from)));
3620            return Value::BigIntArray(out);
3621        }
3622        (Value::IntArray(a), Value::BigIntArray(b)) => {
3623            let mut out: alloc::vec::Vec<Option<i64>> =
3624                a.iter().map(|o| o.map(i64::from)).collect();
3625            out.extend(b.iter().copied());
3626            return Value::BigIntArray(out);
3627        }
3628        (Value::BigIntArray(a), Value::BigInt(n)) => {
3629            let mut out = a.clone();
3630            out.push(Some(*n));
3631            return Value::BigIntArray(out);
3632        }
3633        (Value::BigIntArray(a), Value::Int(n)) => {
3634            let mut out = a.clone();
3635            out.push(Some(i64::from(*n)));
3636            return Value::BigIntArray(out);
3637        }
3638        (Value::BigIntArray(a), Value::SmallInt(n)) => {
3639            let mut out = a.clone();
3640            out.push(Some(i64::from(*n)));
3641            return Value::BigIntArray(out);
3642        }
3643        (Value::BigInt(n), Value::BigIntArray(b)) => {
3644            let mut out: alloc::vec::Vec<Option<i64>> = alloc::vec::Vec::with_capacity(1 + b.len());
3645            out.push(Some(*n));
3646            out.extend(b.iter().copied());
3647            return Value::BigIntArray(out);
3648        }
3649        (Value::Int(n), Value::BigIntArray(b)) => {
3650            let mut out: alloc::vec::Vec<Option<i64>> = alloc::vec::Vec::with_capacity(1 + b.len());
3651            out.push(Some(i64::from(*n)));
3652            out.extend(b.iter().copied());
3653            return Value::BigIntArray(out);
3654        }
3655        (Value::SmallInt(n), Value::BigIntArray(b)) => {
3656            let mut out: alloc::vec::Vec<Option<i64>> = alloc::vec::Vec::with_capacity(1 + b.len());
3657            out.push(Some(i64::from(*n)));
3658            out.extend(b.iter().copied());
3659            return Value::BigIntArray(out);
3660        }
3661        // v7.11.15 — BYTEA `||` is byte concatenation.
3662        (Value::Bytes(a), Value::Bytes(b)) => {
3663            let mut out = a.clone();
3664            out.extend_from_slice(b);
3665            return Value::Bytes(out);
3666        }
3667        _ => {}
3668    }
3669    let a = value_to_text(l);
3670    let b = value_to_text(r);
3671    Value::Text(a + &b)
3672}
3673
3674/// pgvector inner-product `<#>`. Returns the *negative* dot product so
3675/// smaller still means more similar — same convention as pgvector.
3676fn inner_product(l: Value, r: Value) -> Result<Value, EvalError> {
3677    let (a, b) = unwrap_vec_pair(l, r, "<#>")?;
3678    let mut dot: f64 = 0.0;
3679    for (x, y) in a.iter().zip(b.iter()) {
3680        dot += f64::from(*x) * f64::from(*y);
3681    }
3682    Ok(Value::Float(-dot))
3683}
3684
3685/// pgvector cosine distance `<=>` — `1 - (a·b) / (‖a‖ ‖b‖)`. A zero-norm
3686/// operand produces NaN (matches pgvector).
3687fn cosine_distance(l: Value, r: Value) -> Result<Value, EvalError> {
3688    let (a, b) = unwrap_vec_pair(l, r, "<=>")?;
3689    let mut dot: f64 = 0.0;
3690    let mut na: f64 = 0.0;
3691    let mut nb: f64 = 0.0;
3692    for (x, y) in a.iter().zip(b.iter()) {
3693        let xf = f64::from(*x);
3694        let yf = f64::from(*y);
3695        dot += xf * yf;
3696        na += xf * xf;
3697        nb += yf * yf;
3698    }
3699    let denom = sqrt_newton(na) * sqrt_newton(nb);
3700    if denom == 0.0 {
3701        return Ok(Value::Float(f64::NAN));
3702    }
3703    Ok(Value::Float(1.0 - dot / denom))
3704}
3705
3706fn unwrap_vec_pair(l: Value, r: Value, op: &str) -> Result<(Vec<f32>, Vec<f32>), EvalError> {
3707    // v6.0.1: SQ8 cells coming through the SQL evaluator are
3708    // dequantised to f32 here so the existing scalar distance
3709    // arithmetic stays intact. HNSW kNN search continues to use
3710    // the asymmetric ADC variant inside `cell_to_query_metric_
3711    // distance` — this path only runs when a vector expression
3712    // lands in the evaluator (full-scan ORDER BY, SELECT
3713    // projection of `v <-> $1`, etc.).
3714    let to_f32 = |v: Value| -> Option<Vec<f32>> {
3715        match v {
3716            Value::Vector(a) => Some(a),
3717            Value::Sq8Vector(q) => Some(spg_storage::quantize::dequantize(&q)),
3718            // v6.0.3: bit-exact dequant for halfvec cells.
3719            Value::HalfVector(h) => Some(h.to_f32_vec()),
3720            _ => None,
3721        }
3722    };
3723    let l_ty = l.data_type();
3724    let r_ty = r.data_type();
3725    match (to_f32(l), to_f32(r)) {
3726        (Some(a), Some(b)) => {
3727            if a.len() != b.len() {
3728                return Err(EvalError::TypeMismatch {
3729                    detail: format!("vector dim mismatch in {op}: {} vs {}", a.len(), b.len()),
3730                });
3731            }
3732            Ok((a, b))
3733        }
3734        _ => Err(EvalError::TypeMismatch {
3735            detail: format!("{op} requires two vectors, got {l_ty:?} and {r_ty:?}"),
3736        }),
3737    }
3738}
3739
3740/// Numeric arithmetic with widening.
3741/// - both `Int` → `Int` (with overflow check)
3742/// - `Int` op `BigInt` (either side) → `BigInt`
3743/// - any `Float` involved → `Float`
3744fn arith(
3745    l: Value,
3746    r: Value,
3747    int_op: impl Fn(i64, i64) -> Option<i64>,
3748    float_op: impl Fn(f64, f64) -> f64,
3749    op_name: &str,
3750) -> Result<Value, EvalError> {
3751    // Widen SmallInt to Int up front so the rest of the arithmetic
3752    // table only deals with Int / BigInt / Float pairs.
3753    let widen = |v: Value| -> Value {
3754        match v {
3755            Value::SmallInt(n) => Value::Int(i32::from(n)),
3756            other => other,
3757        }
3758    };
3759    let l = widen(l);
3760    let r = widen(r);
3761    match (l, r) {
3762        (Value::Int(a), Value::Int(b)) => {
3763            let result = int_op(i64::from(a), i64::from(b)).ok_or(EvalError::TypeMismatch {
3764                detail: format!("integer overflow on {op_name}"),
3765            })?;
3766            if let Ok(small) = i32::try_from(result) {
3767                Ok(Value::Int(small))
3768            } else {
3769                Ok(Value::BigInt(result))
3770            }
3771        }
3772        (Value::Int(a), Value::BigInt(b)) | (Value::BigInt(b), Value::Int(a)) => {
3773            let result = int_op(i64::from(a), b).ok_or(EvalError::TypeMismatch {
3774                detail: format!("bigint overflow on {op_name}"),
3775            })?;
3776            Ok(Value::BigInt(result))
3777        }
3778        (Value::BigInt(a), Value::BigInt(b)) => {
3779            let result = int_op(a, b).ok_or(EvalError::TypeMismatch {
3780                detail: format!("bigint overflow on {op_name}"),
3781            })?;
3782            Ok(Value::BigInt(result))
3783        }
3784        (a, b)
3785            if a.data_type() == Some(DataType::Float) || b.data_type() == Some(DataType::Float) =>
3786        {
3787            let af = as_f64(&a)?;
3788            let bf = as_f64(&b)?;
3789            Ok(Value::Float(float_op(af, bf)))
3790        }
3791        (a, b) => Err(EvalError::TypeMismatch {
3792            detail: format!(
3793                "{op_name} applied to non-numeric: {:?} vs {:?}",
3794                a.data_type(),
3795                b.data_type()
3796            ),
3797        }),
3798    }
3799}
3800
3801/// L2 (Euclidean) distance between two vectors of equal dimension.
3802/// Returned as `Value::Float(d)` so it composes with the existing
3803/// comparison / sort plumbing. Mismatched dims or non-vector operands
3804/// raise `TypeMismatch`.
3805#[allow(clippy::many_single_char_names)] // l, r, a, b, d are the natural names
3806fn l2_distance(l: Value, r: Value) -> Result<Value, EvalError> {
3807    // v6.0.1: route both operands through `unwrap_vec_pair` so SQ8
3808    // cells dequantise on the way in. Sub-f64 precision loss is
3809    // negligible vs the dequantisation noise the SQ8 path already
3810    // ships with.
3811    let (a, b) = unwrap_vec_pair(l, r, "<->")?;
3812    let mut sum: f64 = 0.0;
3813    for (x, y) in a.iter().zip(b.iter()) {
3814        let d = f64::from(*x) - f64::from(*y);
3815        sum += d * d;
3816    }
3817    Ok(Value::Float(sqrt_newton(sum)))
3818}
3819
/// Self-built `sqrt` for `f64` — `std::f64::sqrt` lives in `std`, which the
/// engine's `no_std` constraint disallows.
///
/// Newton-Raphson (`g ← (g + x/g) / 2`) seeded by halving the IEEE-754
/// exponent of `x`, then iterated until the estimate stops decreasing. The
/// seed is within a few percent of the root for normal inputs, so convergence
/// takes a handful of rounds regardless of magnitude; the iteration cap only
/// matters for subnormal inputs, where the seed is cruder.
///
/// Returns `0.0` for any `x <= 0.0` (including negative values), and passes
/// `+inf` / `NaN` through unchanged.
fn sqrt_newton(x: f64) -> f64 {
    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        // +inf or NaN: iterating would only manufacture NaN via inf / inf.
        return x;
    }
    // Halve the biased exponent and re-add half the bias (1023 << 51).
    // Starting from `g = x` instead would merely halve the guess each round
    // while far from the root, so a fixed small round count is badly wrong
    // for large or tiny inputs.
    let seed = f64::from_bits((x.to_bits() >> 1) + 0x1FF8_0000_0000_0000);
    // After one step the estimate sits at or above the true root (AM-GM), so
    // from here on the sequence is non-increasing until it converges.
    let mut g = 0.5 * (seed + x / seed);
    for _ in 0..64 {
        let next = 0.5 * (g + x / g);
        if next >= g {
            break;
        }
        g = next;
    }
    g
}
3836
3837fn div_op(l: Value, r: Value) -> Result<Value, EvalError> {
3838    let any_float = matches!(l.data_type(), Some(DataType::Float))
3839        || matches!(r.data_type(), Some(DataType::Float));
3840    if any_float {
3841        let a = as_f64(&l)?;
3842        let b = as_f64(&r)?;
3843        if b == 0.0 {
3844            return Err(EvalError::DivisionByZero);
3845        }
3846        return Ok(Value::Float(a / b));
3847    }
3848    arith(
3849        l,
3850        r,
3851        |a, b| {
3852            if b == 0 { None } else { Some(a / b) }
3853        },
3854        |a, b| a / b,
3855        "/",
3856    )
3857    .map_err(|e| match e {
3858        // The closure returns None on b == 0; translate that into the dedicated
3859        // DivisionByZero variant instead of "integer overflow on /".
3860        EvalError::TypeMismatch { detail } if detail.contains('/') => EvalError::DivisionByZero,
3861        other => other,
3862    })
3863}
3864
3865fn as_f64(v: &Value) -> Result<f64, EvalError> {
3866    match v {
3867        Value::SmallInt(n) => Ok(f64::from(*n)),
3868        Value::Int(n) => Ok(f64::from(*n)),
3869        #[allow(clippy::cast_precision_loss)]
3870        Value::BigInt(n) => Ok(*n as f64),
3871        Value::Float(x) => Ok(*x),
3872        #[allow(clippy::cast_precision_loss)]
3873        Value::Numeric { scaled, scale } => {
3874            let mut div = 1.0_f64;
3875            for _ in 0..*scale {
3876                div *= 10.0;
3877            }
3878            Ok((*scaled as f64) / div)
3879        }
3880        other => Err(EvalError::TypeMismatch {
3881            detail: format!("cannot convert {:?} to FLOAT", other.data_type()),
3882        }),
3883    }
3884}
3885
3886fn compare(op: BinOp, l: &Value, r: &Value) -> Result<Value, EvalError> {
3887    let ord = match (l, r) {
3888        (Value::Int(a), Value::Int(b)) => i64::from(*a).cmp(&i64::from(*b)),
3889        (Value::Int(a), Value::BigInt(b)) => i64::from(*a).cmp(b),
3890        (Value::BigInt(a), Value::Int(b)) => a.cmp(&i64::from(*b)),
3891        (Value::BigInt(a), Value::BigInt(b)) => a.cmp(b),
3892        (a, b)
3893            if matches!(a.data_type(), Some(DataType::Float))
3894                || matches!(b.data_type(), Some(DataType::Float)) =>
3895        {
3896            let af = as_f64(a)?;
3897            let bf = as_f64(b)?;
3898            af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
3899                detail: "NaN in comparison".into(),
3900            })?
3901        }
3902        (Value::Text(a), Value::Text(b)) => a.cmp(b),
3903        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
3904        // Date / Timestamp compare on their integer storage repr.
3905        // Cross-domain (Date vs Timestamp) lifts the Date to the
3906        // matching midnight TIMESTAMP first.
3907        (Value::Date(a), Value::Date(b)) => a.cmp(b),
3908        (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
3909        (Value::Date(a), Value::Timestamp(b)) => (i64::from(*a) * 86_400_000_000).cmp(b),
3910        (Value::Timestamp(a), Value::Date(b)) => a.cmp(&(i64::from(*b) * 86_400_000_000)),
3911        // PG-style implicit coercion: comparing a DATE / TIMESTAMP
3912        // column against a text literal lifts the literal into the
3913        // matching domain (e.g. `day >= '2024-01-01'`).
3914        (Value::Date(a), Value::Text(b)) => {
3915            let bd = parse_date_literal(b).ok_or_else(|| EvalError::TypeMismatch {
3916                detail: format!("cannot parse {b:?} as DATE for comparison"),
3917            })?;
3918            a.cmp(&bd)
3919        }
3920        (Value::Text(a), Value::Date(b)) => {
3921            let ad = parse_date_literal(a).ok_or_else(|| EvalError::TypeMismatch {
3922                detail: format!("cannot parse {a:?} as DATE for comparison"),
3923            })?;
3924            ad.cmp(b)
3925        }
3926        (Value::Timestamp(a), Value::Text(b)) => {
3927            let bt = parse_timestamp_literal(b).ok_or_else(|| EvalError::TypeMismatch {
3928                detail: format!("cannot parse {b:?} as TIMESTAMP for comparison"),
3929            })?;
3930            a.cmp(&bt)
3931        }
3932        (Value::Text(a), Value::Timestamp(b)) => {
3933            let at = parse_timestamp_literal(a).ok_or_else(|| EvalError::TypeMismatch {
3934                detail: format!("cannot parse {a:?} as TIMESTAMP for comparison"),
3935            })?;
3936            at.cmp(b)
3937        }
3938        (a, b) => {
3939            return Err(EvalError::TypeMismatch {
3940                detail: format!(
3941                    "comparison between {:?} and {:?}",
3942                    a.data_type(),
3943                    b.data_type()
3944                ),
3945            });
3946        }
3947    };
3948    let result = match op {
3949        BinOp::Eq => ord.is_eq(),
3950        BinOp::NotEq => !ord.is_eq(),
3951        BinOp::Lt => ord.is_lt(),
3952        BinOp::LtEq => ord.is_le(),
3953        BinOp::Gt => ord.is_gt(),
3954        BinOp::GtEq => ord.is_ge(),
3955        BinOp::And
3956        | BinOp::Or
3957        | BinOp::Add
3958        | BinOp::Sub
3959        | BinOp::Mul
3960        | BinOp::Div
3961        | BinOp::L2Distance
3962        | BinOp::InnerProduct
3963        | BinOp::CosineDistance
3964        | BinOp::Concat
3965        | BinOp::JsonGet
3966        | BinOp::JsonGetText
3967        | BinOp::JsonGetPath
3968        | BinOp::JsonGetPathText
3969        | BinOp::JsonContains
3970        | BinOp::TsMatch
3971        | BinOp::IsDistinctFrom
3972        | BinOp::IsNotDistinctFrom => {
3973            unreachable!("compare() only called with comparison ops")
3974        }
3975    };
3976    Ok(Value::Bool(result))
3977}
3978
3979// SQL three-valued AND / OR.
3980fn and_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
3981    match (l, r) {
3982        (Value::Bool(false), _) | (_, Value::Bool(false)) => Ok(Value::Bool(false)),
3983        (Value::Bool(true), Value::Bool(true)) => Ok(Value::Bool(true)),
3984        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
3985        (a, b) => Err(EvalError::TypeMismatch {
3986            detail: format!(
3987                "AND on non-boolean: {:?} and {:?}",
3988                a.data_type(),
3989                b.data_type()
3990            ),
3991        }),
3992    }
3993}
3994
3995fn or_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
3996    match (l, r) {
3997        (Value::Bool(true), _) | (_, Value::Bool(true)) => Ok(Value::Bool(true)),
3998        (Value::Bool(false), Value::Bool(false)) => Ok(Value::Bool(false)),
3999        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
4000        (a, b) => Err(EvalError::TypeMismatch {
4001            detail: format!(
4002                "OR on non-boolean: {:?} and {:?}",
4003                a.data_type(),
4004                b.data_type()
4005            ),
4006        }),
4007    }
4008}
4009
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;
    use spg_storage::{ColumnSchema, Row};

    // ---- expression / schema builders -------------------------------------

    /// Column schema built with `ColumnSchema::new(name, ty, true)`.
    fn col(name: &str, ty: DataType) -> ColumnSchema {
        ColumnSchema::new(name, ty, true)
    }

    /// Evaluation context over `cols` with an optional table alias.
    fn ctx<'a>(cols: &'a [ColumnSchema], alias: Option<&'a str>) -> EvalContext<'a> {
        EvalContext::new(cols, alias)
    }

    /// Integer literal expression.
    fn lit(n: i64) -> Expr {
        Expr::Literal(Literal::Integer(n))
    }

    /// `NULL` literal expression.
    fn null() -> Expr {
        Expr::Literal(Literal::Null)
    }

    /// Boolean literal expression.
    fn boolean(b: bool) -> Expr {
        Expr::Literal(Literal::Bool(b))
    }

    /// Unqualified column reference.
    fn col_ref(name: &str) -> Expr {
        Expr::Column(ColumnName {
            qualifier: None,
            name: name.into(),
        })
    }

    /// Qualified column reference, `qualifier.name`.
    fn qualified_ref(qualifier: &'static str, name: &str) -> Expr {
        Expr::Column(ColumnName {
            qualifier: Some(qualifier.into()),
            name: name.into(),
        })
    }

    /// Binary expression `lhs op rhs`.
    fn binary(lhs: Expr, op: BinOp, rhs: Expr) -> Expr {
        Expr::Binary {
            lhs: alloc::boxed::Box::new(lhs),
            op,
            rhs: alloc::boxed::Box::new(rhs),
        }
    }

    /// Evaluates an expression that needs no columns: empty row, empty
    /// schema, no alias.
    fn eval_const(expr: &Expr) -> Result<Value, EvalError> {
        let row = Row::new(vec![]);
        let schema: [ColumnSchema; 0] = [];
        let context = ctx(&schema, None);
        eval_expr(expr, &row, &context)
    }

    // ---- literals and column resolution -----------------------------------

    #[test]
    fn literal_evaluates_to_value() {
        assert_eq!(eval_const(&lit(42)).unwrap(), Value::Int(42));
        assert_eq!(
            eval_const(&Expr::Literal(Literal::Float(1.5))).unwrap(),
            Value::Float(1.5)
        );
        assert_eq!(eval_const(&null()).unwrap(), Value::Null);
    }

    #[test]
    fn column_lookup_unqualified() {
        let schema = vec![col("a", DataType::Int), col("b", DataType::Text)];
        let row = Row::new(vec![Value::Int(7), Value::Text("hi".into())]);
        let context = ctx(&schema, None);

        let a = eval_expr(&col_ref("a"), &row, &context).unwrap();
        assert_eq!(a, Value::Int(7));

        let b = eval_expr(&col_ref("b"), &row, &context).unwrap();
        assert_eq!(b, Value::Text("hi".into()));
    }

    #[test]
    fn column_not_found_errors() {
        let schema = vec![col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(0)]);
        let context = ctx(&schema, None);

        let err = eval_expr(&col_ref("ghost"), &row, &context).unwrap_err();
        assert!(matches!(err, EvalError::ColumnNotFound { ref name } if name == "ghost"));
    }

    #[test]
    fn qualified_column_matches_alias() {
        let schema = vec![col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(5)]);
        let context = ctx(&schema, Some("u"));

        let value = eval_expr(&qualified_ref("u", "a"), &row, &context).unwrap();
        assert_eq!(value, Value::Int(5));
    }

    #[test]
    fn qualified_column_unknown_alias_errors() {
        let schema = vec![col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(5)]);
        let context = ctx(&schema, Some("u"));

        let err = eval_expr(&qualified_ref("x", "a"), &row, &context).unwrap_err();
        assert!(matches!(err, EvalError::UnknownQualifier { .. }));
    }

    // ---- arithmetic and comparison ----------------------------------------

    #[test]
    fn arithmetic_with_widening() {
        let sum = binary(lit(2), BinOp::Add, Expr::Literal(Literal::Float(0.5)));
        assert_eq!(eval_const(&sum).unwrap(), Value::Float(2.5));
    }

    #[test]
    fn division_by_zero_errors() {
        let quotient = binary(lit(1), BinOp::Div, lit(0));
        assert_eq!(
            eval_const(&quotient).unwrap_err(),
            EvalError::DivisionByZero
        );
    }

    #[test]
    fn comparison_returns_bool() {
        let less = binary(lit(1), BinOp::Lt, lit(2));
        assert_eq!(eval_const(&less).unwrap(), Value::Bool(true));
    }

    #[test]
    fn null_propagates_through_arithmetic() {
        let sum = binary(lit(1), BinOp::Add, null());
        assert_eq!(eval_const(&sum).unwrap(), Value::Null);
    }

    // ---- three-valued logic -----------------------------------------------

    #[test]
    fn and_three_valued_logic() {
        let cases = [
            // FALSE AND NULL → FALSE
            (boolean(false), null(), Value::Bool(false)),
            // TRUE AND NULL → NULL
            (boolean(true), null(), Value::Null),
            // TRUE AND TRUE → TRUE
            (boolean(true), boolean(true), Value::Bool(true)),
        ];
        for (lhs, rhs, expected) in cases {
            let conj = binary(lhs, BinOp::And, rhs);
            assert_eq!(eval_const(&conj).unwrap(), expected);
        }
    }

    #[test]
    fn or_three_valued_logic() {
        let cases = [
            // TRUE OR NULL → TRUE
            (true, Value::Bool(true)),
            // FALSE OR NULL → NULL
            (false, Value::Null),
        ];
        for (lhs, expected) in cases {
            let disj = binary(boolean(lhs), BinOp::Or, null());
            assert_eq!(eval_const(&disj).unwrap(), expected);
        }
    }

    #[test]
    fn not_on_null_is_null() {
        let negated = Expr::Unary {
            op: UnOp::Not,
            expr: alloc::boxed::Box::new(null()),
        };
        assert_eq!(eval_const(&negated).unwrap(), Value::Null);
    }

    #[test]
    fn text_comparison_lexicographic() {
        let less = binary(
            Expr::Literal(Literal::String("apple".into())),
            BinOp::Lt,
            Expr::Literal(Literal::String("banana".into())),
        );
        assert_eq!(eval_const(&less).unwrap(), Value::Bool(true));
    }

    // ---- intervals --------------------------------------------------------

    #[test]
    fn interval_format_basics() {
        assert_eq!(format_interval(0, 0), "0");
        assert_eq!(format_interval(0, 86_400_000_000), "1 day");
        assert_eq!(format_interval(0, -86_400_000_000), "-1 days");
        assert_eq!(format_interval(0, 3_600_000_000), "01:00:00");
        assert_eq!(
            format_interval(0, 86_400_000_000 + 9_000_000),
            "1 day 00:00:09"
        );
        assert_eq!(format_interval(14, 0), "1 year 2 mons");
        assert_eq!(format_interval(-1, 0), "-1 mons");
    }

    #[test]
    fn interval_add_to_timestamp_micros_part() {
        // 2024-01-01 00:00:00 + INTERVAL '1 hour' = 2024-01-01 01:00:00
        let midnight = i64::from(days_from_civil(2024, 1, 1)) * 86_400_000_000;
        let shifted = add_interval_to_micros(midnight, 0, 3_600_000_000).unwrap();
        assert_eq!(shifted, midnight + 3_600_000_000);
    }

    #[test]
    fn interval_clamp_month_end() {
        // Shift a civil date by whole months and read the result back as
        // (year, month, day).
        let shifted_civil = |y, m, d, months| {
            let start = days_from_civil(y, m, d);
            civil_from_days(shift_date_by_months(start, months).unwrap())
        };
        // 2024-01-31 + 1 month = 2024-02-29 (leap year).
        assert_eq!(shifted_civil(2024, 1, 31, 1), (2024, 2, 29));
        // 2023-01-31 + 1 month = 2023-02-28 (non-leap).
        assert_eq!(shifted_civil(2023, 1, 31, 1), (2023, 2, 28));
        // 2024-03-31 - 1 month = 2024-02-29.
        assert_eq!(shifted_civil(2024, 3, 31, -1), (2024, 2, 29));
    }

    #[test]
    fn interval_date_plus_pure_days_stays_date() {
        // DATE + INTERVAL '7 days' must stay DATE.
        let start = Value::Date(days_from_civil(2024, 6, 1));
        let week = Value::Interval {
            months: 0,
            micros: 7 * 86_400_000_000,
        };
        let sum = apply_binary_interval(BinOp::Add, &start, &week)
            .unwrap()
            .unwrap();
        assert_eq!(sum, Value::Date(days_from_civil(2024, 6, 8)));
    }

    #[test]
    fn interval_date_plus_sub_day_lifts_to_timestamp() {
        // DATE + INTERVAL '1 hour' must lift to TIMESTAMP.
        let day = days_from_civil(2024, 6, 1);
        let start = Value::Date(day);
        let hour = Value::Interval {
            months: 0,
            micros: 3_600_000_000,
        };
        let sum = apply_binary_interval(BinOp::Add, &start, &hour)
            .unwrap()
            .unwrap();
        let expected = i64::from(day) * 86_400_000_000 + 3_600_000_000;
        assert_eq!(sum, Value::Timestamp(expected));
    }
}