Skip to main content

spg_engine/
eval.rs

1//! Expression evaluator. Given a parsed `Expr`, a `Row`, and the row's column
2//! schema, produce a `Value`. v0.4 implements:
3//!
4//! - literals
5//! - column lookups (bare and qualified `t.col`)
6//! - unary minus / NOT
7//! - binary arithmetic, comparison, AND, OR
8//! - numeric widening (`Int → BigInt → Float`) at evaluation time
9//! - SQL three-valued logic for NULL:
10//!     * any arithmetic / comparison op with a NULL operand → NULL
11//!     * `TRUE OR NULL` → TRUE, `FALSE OR NULL` → NULL,
12//!     * `FALSE AND NULL` → FALSE, `TRUE AND NULL` → NULL,
13//!     * `NOT NULL` → NULL
14//!
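//! v0.4 deliberately did *not* implement: function calls, string
//! concatenation, IS NULL / IS NOT NULL, BETWEEN, IN, etc. Later versions
//! added some of these — `eval_expr` now handles function calls and
//! IS NULL / IS NOT NULL, along with CAST, LIKE, EXTRACT, arrays, and CASE.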
17
18use alloc::boxed::Box;
19use alloc::format;
20use alloc::string::{String, ToString};
21use alloc::vec::Vec;
22
23use spg_sql::ast::{BinOp, CastTarget, ColumnName, Expr, Literal, UnOp};
24use spg_storage::{ColumnSchema, DataType, Row, TsLexeme, TsQueryAst, Value};
25
/// Resolution context for evaluating a single row. `table_alias` is the alias
/// (or table name) callers should accept as the qualifier on a column ref —
/// e.g. `FROM users AS u` makes `u.name` valid and rejects `other.name`.
///
/// Every field is a borrow tied to `'a`, so the context itself owns nothing;
/// it is built with `new` and extended with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct EvalContext<'a> {
    /// Column schema of the row being evaluated.
    // NOTE(review): presumably positionally aligned with the row's values —
    // confirm against `resolve_column`.
    pub columns: &'a [ColumnSchema],
    /// Qualifier accepted on `t.col` references (see the type-level docs).
    /// `None` when the caller supplied no alias or table name.
    pub table_alias: Option<&'a str>,
    /// v6.1.1 — bound parameters for `$N` placeholders inside the
    /// expression tree. Empty for simple queries; populated by the
    /// prepared-statement Execute path with Bind values converted
    /// to `Value`. Index N (1-based per PG) hits `params[N-1]`.
    pub params: &'a [Value],
    /// v7.12.1 — session text-search config (from `SET
    /// default_text_search_config = '<name>'`). Resolved when the
    /// engine builds an `EvalContext` and consumed by the FTS
    /// function dispatcher when `to_tsvector(text)` /
    /// `plainto_tsquery(text)` etc are called without an explicit
    /// config arg. `None` falls through to `simple`.
    pub default_text_search_config: Option<&'a str>,
}
46
47impl<'a> EvalContext<'a> {
48    pub const fn new(columns: &'a [ColumnSchema], table_alias: Option<&'a str>) -> Self {
49        Self {
50            columns,
51            table_alias,
52            params: &[],
53            default_text_search_config: None,
54        }
55    }
56
57    /// v6.1.1 — attach a parameter buffer for `$N` placeholder
58    /// resolution. The slice must outlive the context; callers
59    /// construct it from the prepared statement's Bind values.
60    #[must_use]
61    pub const fn with_params(mut self, params: &'a [Value]) -> Self {
62        self.params = params;
63        self
64    }
65
66    /// v7.12.1 — attach the session's
67    /// `default_text_search_config`. Used by the FTS function
68    /// dispatcher when no explicit config arg is given.
69    #[must_use]
70    pub const fn with_default_text_search_config(mut self, cfg: Option<&'a str>) -> Self {
71        self.default_text_search_config = cfg;
72        self
73    }
74}
75
/// Errors produced while evaluating an expression against a row.
///
/// The `Display` impl renders each variant as a short, lowercase,
/// human-readable message.
#[derive(Debug, Clone, PartialEq)]
pub enum EvalError {
    /// A column reference named a column that could not be found.
    // NOTE(review): presumably raised by `resolve_column` — confirm.
    ColumnNotFound {
        /// The column name as reported in the error message.
        name: String,
    },
    /// A qualified reference (`t.col`) used a table qualifier that is not
    /// accepted in the current context.
    UnknownQualifier {
        /// The rejected qualifier.
        qualifier: String,
    },
    /// Division by zero.
    // NOTE(review): presumably raised by the arithmetic path in
    // `apply_binary` — confirm.
    DivisionByZero,
    /// An operand had the wrong type for the operation. `eval_expr` also
    /// uses this variant for internal invariant violations, such as a
    /// subquery or window-function node reaching row evaluation.
    TypeMismatch {
        /// Free-form description of what was expected and what was found.
        detail: String,
    },
    /// v6.1.1 — `$N` reference past the number of bound parameters.
    /// Either the client sent too few in Bind, or the SQL has a
    /// placeholder the prepared statement didn't account for.
    PlaceholderOutOfRange {
        /// The placeholder number `N` as written in the SQL.
        n: u16,
        /// How many parameters were actually bound.
        bound: u16,
    },
}
96
97impl core::fmt::Display for EvalError {
98    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
99        match self {
100            Self::ColumnNotFound { name } => write!(f, "column not found: {name}"),
101            Self::UnknownQualifier { qualifier } => {
102                write!(f, "unknown table qualifier: {qualifier}")
103            }
104            Self::DivisionByZero => f.write_str("division by zero"),
105            Self::TypeMismatch { detail } => write!(f, "type mismatch: {detail}"),
106            Self::PlaceholderOutOfRange { n, bound } => write!(
107                f,
108                "parameter ${n} referenced but only {bound} bound by client"
109            ),
110        }
111    }
112}
113
114pub fn eval_expr(expr: &Expr, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
115    match expr {
116        Expr::Literal(l) => Ok(literal_to_value(l)),
117        Expr::Column(c) => resolve_column(c, row, ctx),
118        Expr::Placeholder(n) => {
119            let idx = usize::from(*n).saturating_sub(1);
120            ctx.params
121                .get(idx)
122                .cloned()
123                .ok_or_else(|| EvalError::PlaceholderOutOfRange {
124                    n: *n,
125                    bound: u16::try_from(ctx.params.len()).unwrap_or(u16::MAX),
126                })
127        }
128        Expr::Unary { op, expr } => {
129            let v = eval_expr(expr, row, ctx)?;
130            apply_unary(*op, v)
131        }
132        Expr::Binary { lhs, op, rhs } => {
133            let l = eval_expr(lhs, row, ctx)?;
134            let r = eval_expr(rhs, row, ctx)?;
135            apply_binary(*op, l, r)
136        }
137        Expr::Cast { expr, target } => {
138            let v = eval_expr(expr, row, ctx)?;
139            cast_value(v, *target)
140        }
141        Expr::IsNull { expr, negated } => {
142            let v = eval_expr(expr, row, ctx)?;
143            let is_null = matches!(v, Value::Null);
144            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
145        }
146        Expr::FunctionCall { name, args } => {
147            let evaluated: Result<Vec<Value>, _> =
148                args.iter().map(|a| eval_expr(a, row, ctx)).collect();
149            apply_function(name, &evaluated?, ctx)
150        }
151        Expr::Like {
152            expr,
153            pattern,
154            negated,
155        } => {
156            let v = eval_expr(expr, row, ctx)?;
157            let p = eval_expr(pattern, row, ctx)?;
158            // NULL on either side propagates to NULL — same as PG.
159            let (text, pat) = match (v, p) {
160                (Value::Null, _) | (_, Value::Null) => return Ok(Value::Null),
161                (Value::Text(a), Value::Text(b)) => (a, b),
162                (Value::Text(_), other) | (other, _) => {
163                    return Err(EvalError::TypeMismatch {
164                        detail: format!("LIKE requires text operands, got {:?}", other.data_type()),
165                    });
166                }
167            };
168            let m = like_match(&text, &pat);
169            Ok(Value::Bool(if *negated { !m } else { m }))
170        }
171        Expr::Extract { field, source } => {
172            let v = eval_expr(source, row, ctx)?;
173            extract_field(*field, &v)
174        }
175        // v4.10: subquery nodes should have been resolved into
176        // Literal / Binary-Eq-OR chains by Engine::resolve_select_subqueries
177        // before the row loop. Anything reaching here is a bug.
178        Expr::ScalarSubquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. } => {
179            Err(EvalError::TypeMismatch {
180                detail: "subquery reached row eval — engine resolver bug".into(),
181            })
182        }
183        // v4.12: window functions should have been rewritten into
184        // synthetic __win_N column references by
185        // exec_select_with_window before row eval. Anything
186        // reaching here is similarly a bug.
187        Expr::WindowFunction { .. } => Err(EvalError::TypeMismatch {
188            detail: "window function reached row eval — engine rewrite bug".into(),
189        }),
190        // v7.10.10 — `ARRAY[expr, expr, …]` constructor.
191        // v7.11.13 — element-type detection: all integers →
192        // IntArray (or BigIntArray when widening), any Text →
193        // TextArray. Non-TEXT non-integer elements (Bool, Float)
194        // stringify into TextArray as the safe default.
195        Expr::Array(items) => {
196            let mut materialised: Vec<Value> = Vec::with_capacity(items.len());
197            for elem in items {
198                materialised.push(eval_expr(elem, row, ctx)?);
199            }
200            let mut has_text = false;
201            let mut has_bigint = false;
202            let mut has_int = false;
203            for v in &materialised {
204                match v {
205                    Value::Null => {}
206                    Value::Int(_) | Value::SmallInt(_) => has_int = true,
207                    Value::BigInt(_) => has_bigint = true,
208                    Value::Text(_) | Value::Json(_) => has_text = true,
209                    _ => has_text = true,
210                }
211            }
212            if has_text || (!has_int && !has_bigint) {
213                let out: Vec<Option<String>> = materialised
214                    .into_iter()
215                    .map(|v| match v {
216                        Value::Null => None,
217                        Value::Text(s) | Value::Json(s) => Some(s),
218                        other => Some(value_to_text_for_array(&other)),
219                    })
220                    .collect();
221                return Ok(Value::TextArray(out));
222            }
223            if has_bigint {
224                let out: Vec<Option<i64>> = materialised
225                    .into_iter()
226                    .map(|v| match v {
227                        Value::Null => None,
228                        Value::Int(n) => Some(i64::from(n)),
229                        Value::SmallInt(n) => Some(i64::from(n)),
230                        Value::BigInt(n) => Some(n),
231                        _ => unreachable!(),
232                    })
233                    .collect();
234                return Ok(Value::BigIntArray(out));
235            }
236            let out: Vec<Option<i32>> = materialised
237                .into_iter()
238                .map(|v| match v {
239                    Value::Null => None,
240                    Value::Int(n) => Some(n),
241                    Value::SmallInt(n) => Some(i32::from(n)),
242                    _ => unreachable!(),
243                })
244                .collect();
245            Ok(Value::IntArray(out))
246        }
247        // v7.10.12 — `arr[i]` PG-style 1-based indexing.
248        // Out-of-range indices (including i ≤ 0) return NULL.
249        Expr::ArraySubscript { target, index } => {
250            let target_v = eval_expr(target, row, ctx)?;
251            let idx_v = eval_expr(index, row, ctx)?;
252            if matches!(target_v, Value::Null) || matches!(idx_v, Value::Null) {
253                return Ok(Value::Null);
254            }
255            let i: i64 = match idx_v {
256                Value::Int(n) => i64::from(n),
257                Value::BigInt(n) => n,
258                Value::SmallInt(n) => i64::from(n),
259                other => {
260                    return Err(EvalError::TypeMismatch {
261                        detail: format!(
262                            "array subscript must be integer, got {:?}",
263                            other.data_type()
264                        ),
265                    });
266                }
267            };
268            if i < 1 {
269                return Ok(Value::Null);
270            }
271            let pos = (i - 1) as usize;
272            match target_v {
273                Value::TextArray(items) => match items.get(pos) {
274                    Some(Some(s)) => Ok(Value::Text(s.clone())),
275                    Some(None) | None => Ok(Value::Null),
276                },
277                Value::IntArray(items) => match items.get(pos) {
278                    Some(Some(n)) => Ok(Value::Int(*n)),
279                    Some(None) | None => Ok(Value::Null),
280                },
281                Value::BigIntArray(items) => match items.get(pos) {
282                    Some(Some(n)) => Ok(Value::BigInt(*n)),
283                    Some(None) | None => Ok(Value::Null),
284                },
285                other => Err(EvalError::TypeMismatch {
286                    detail: format!(
287                        "subscript target must be an array, got {:?}",
288                        other.data_type()
289                    ),
290                }),
291            }
292        }
293        // v7.10.12 — `x op ANY(arr)` / `x op ALL(arr)`. PG
294        // 3VL: ANY → true if any element compares-true; NULL if
295        // no true but some NULL; false otherwise. ALL: false if
296        // any compares-false; NULL if no false but some NULL;
297        // true otherwise.
298        Expr::AnyAll {
299            expr,
300            op,
301            array,
302            is_any,
303        } => {
304            let lhs = eval_expr(expr, row, ctx)?;
305            let arr = eval_expr(array, row, ctx)?;
306            if matches!(arr, Value::Null) {
307                return Ok(Value::Null);
308            }
309            let elems: Vec<Option<Value>> = match arr {
310                Value::TextArray(items) => items.into_iter().map(|o| o.map(Value::Text)).collect(),
311                Value::IntArray(items) => items.into_iter().map(|o| o.map(Value::Int)).collect(),
312                Value::BigIntArray(items) => {
313                    items.into_iter().map(|o| o.map(Value::BigInt)).collect()
314                }
315                other => {
316                    return Err(EvalError::TypeMismatch {
317                        detail: format!(
318                            "ANY/ALL right-hand side must be an array, got {:?}",
319                            other.data_type()
320                        ),
321                    });
322                }
323            };
324            let mut saw_null = matches!(lhs, Value::Null);
325            let mut saw_match = false;
326            let mut saw_mismatch = false;
327            for elem in elems {
328                let elem_v = match elem {
329                    Some(v) => v,
330                    None => {
331                        saw_null = true;
332                        continue;
333                    }
334                };
335                if matches!(lhs, Value::Null) {
336                    saw_null = true;
337                    continue;
338                }
339                match apply_binary(*op, lhs.clone(), elem_v) {
340                    Ok(Value::Bool(true)) => saw_match = true,
341                    Ok(Value::Bool(false)) => saw_mismatch = true,
342                    Ok(Value::Null) => saw_null = true,
343                    Ok(other) => {
344                        return Err(EvalError::TypeMismatch {
345                            detail: format!(
346                                "ANY/ALL comparison didn't return Bool: {:?}",
347                                other.data_type()
348                            ),
349                        });
350                    }
351                    Err(e) => return Err(e),
352                }
353            }
354            let result = if *is_any {
355                if saw_match {
356                    Value::Bool(true)
357                } else if saw_null {
358                    Value::Null
359                } else {
360                    Value::Bool(false)
361                }
362            } else if saw_mismatch {
363                Value::Bool(false)
364            } else if saw_null {
365                Value::Null
366            } else {
367                Value::Bool(true)
368            };
369            Ok(result)
370        }
371        // v7.13.0 — CASE WHEN … END (mailrs round-5 G9).
372        // Short-circuit on the first matching branch. Searched form
373        // (operand=None) treats each branch's WHEN as a Bool
374        // predicate. Simple form (operand=Some) compares with =.
375        // ELSE on no match; NULL if no ELSE.
376        Expr::Case {
377            operand,
378            branches,
379            else_branch,
380        } => {
381            let operand_value = match operand {
382                Some(o) => Some(eval_expr(o, row, ctx)?),
383                None => None,
384            };
385            for (when_expr, then_expr) in branches {
386                let when_value = eval_expr(when_expr, row, ctx)?;
387                let matched = match &operand_value {
388                    None => matches!(when_value, Value::Bool(true)),
389                    Some(op_v) => matches!(
390                        apply_binary(spg_sql::ast::BinOp::Eq, op_v.clone(), when_value)?,
391                        Value::Bool(true)
392                    ),
393                };
394                if matched {
395                    return eval_expr(then_expr, row, ctx);
396                }
397            }
398            match else_branch {
399                Some(e) => eval_expr(e, row, ctx),
400                None => Ok(Value::Null),
401            }
402        }
403    }
404}
405
406/// v7.10.10 — best-effort text rendering for non-TEXT array
407/// elements (numbers, bools, etc.). The PG rule is that
408/// `ARRAY[1, 2]` is `int[]`, but SPG's v7.10 only models TEXT[],
409/// so we widen by stringifying. NUMERIC formatting goes through
410/// the existing canonical helpers to stay consistent with
411/// `format_numeric` / `format_date` etc.
412fn value_to_text_for_array(v: &Value) -> String {
413    match v {
414        Value::Text(s) | Value::Json(s) => s.clone(),
415        Value::Int(n) => n.to_string(),
416        Value::BigInt(n) => n.to_string(),
417        Value::SmallInt(n) => n.to_string(),
418        Value::Bool(b) => {
419            if *b {
420                "true".into()
421            } else {
422                "false".into()
423            }
424        }
425        Value::Float(x) => format!("{x}"),
426        Value::Date(d) => format_date(*d),
427        Value::Timestamp(t) => format_timestamp(*t),
428        Value::Numeric { scaled, scale } => format_numeric(*scaled, *scale),
429        _ => format!("{v:?}"),
430    }
431}
432
433/// Pull an integer component (year / month / ... / microsecond) out
434/// of a `DATE` or `TIMESTAMP`. Returns NULL on a NULL source, errors
435/// when the source isn't a calendar type.
436fn extract_field(field: spg_sql::ast::ExtractField, v: &Value) -> Result<Value, EvalError> {
437    use spg_sql::ast::ExtractField as F;
438    if matches!(v, Value::Null) {
439        return Ok(Value::Null);
440    }
441    // INTERVAL has its own decomposition — `YEAR` / `MONTH` come from
442    // the months part, the rest from the microseconds part. PG matches
443    // this convention (months is normalised modulo 12 for MONTH).
444    if let Value::Interval { months, micros } = *v {
445        let years = months / 12;
446        let mons = months % 12;
447        let secs_total = micros / 1_000_000;
448        let frac = micros % 1_000_000;
449        let result = match field {
450            F::Year => i64::from(years),
451            F::Month => i64::from(mons),
452            F::Day => micros / 86_400_000_000,
453            F::Hour => (secs_total / 3600) % 24,
454            F::Minute => (secs_total / 60) % 60,
455            F::Second => secs_total % 60,
456            F::Microsecond => (secs_total % 60) * 1_000_000 + frac,
457        };
458        return Ok(Value::BigInt(result));
459    }
460    let (days, day_micros) = match *v {
461        Value::Date(d) => (d, 0_i64),
462        Value::Timestamp(t) => {
463            let days = t.div_euclid(86_400_000_000);
464            let day_micros = t.rem_euclid(86_400_000_000);
465            (i32::try_from(days).unwrap_or(i32::MAX), day_micros)
466        }
467        _ => {
468            return Err(EvalError::TypeMismatch {
469                detail: format!(
470                    "EXTRACT requires DATE / TIMESTAMP / INTERVAL, got {:?}",
471                    v.data_type()
472                ),
473            });
474        }
475    };
476    let (y, m, d) = civil_components(days);
477    let secs = day_micros / 1_000_000;
478    let hh = secs / 3600;
479    let mm = (secs / 60) % 60;
480    let ss = secs % 60;
481    let frac = day_micros % 1_000_000;
482    let result = match field {
483        F::Year => i64::from(y),
484        F::Month => i64::from(m),
485        F::Day => i64::from(d),
486        F::Hour => hh,
487        F::Minute => mm,
488        F::Second => ss,
489        F::Microsecond => ss * 1_000_000 + frac,
490    };
491    Ok(Value::BigInt(result))
492}
493
494/// Internal wrapper around the file-private `civil_from_days` so the
495/// public surface area doesn't change. Returns `(year, month, day)`.
496fn civil_components(days: i32) -> (i32, u32, u32) {
497    civil_from_days(days)
498}
499
500/// SQL `LIKE` matcher. Wildcards are `%` (any run, possibly empty) and `_`
501/// (exactly one char). `\` escapes the next pattern char so `\%` matches a
502/// literal `%`. Matches the whole input — no implicit anchoring needed
503/// since SQL `LIKE` is always full-string.
504fn like_match(text: &str, pattern: &str) -> bool {
505    let text: Vec<char> = text.chars().collect();
506    let pat: Vec<char> = pattern.chars().collect();
507    like_match_inner(&text, 0, &pat, 0)
508}
509
/// Core of the `LIKE` matcher: reports whether `pat[pi..]` matches the
/// whole of `text[ti..]`.
///
/// Pattern tokens:
/// - `%` matches any run of characters, including an empty one;
///   consecutive `%` collapse into one.
/// - `_` matches exactly one character.
/// - `\` followed by a character matches that character literally.
///   A trailing `\` with nothing after it matches a literal backslash.
/// - Any other character matches itself.
///
/// The matcher is iterative and keeps a single backtrack point — the most
/// recent `%`. When a later token fails, that `%` absorbs one more
/// character and matching resumes right after it. Earlier `%` never need
/// revisiting: matching each segment as far left as possible leaves the
/// most text for the segments that follow. The work is bounded by
/// `text.len() * pat.len()`, whereas retrying every split recursively at
/// every `%` is exponential for patterns such as `%a%a%a…%b`.
fn like_match_inner(text: &[char], mut ti: usize, pat: &[char], mut pi: usize) -> bool {
    // (pattern index just past the latest `%` run, text index up to which
    // that `%` has absorbed characters so far).
    let mut resume: Option<(usize, usize)> = None;
    loop {
        // Try to match one pattern token against `text[ti]`. `Some(width)`
        // is the number of pattern chars the token occupies; `None` is a
        // mismatch that needs backtracking.
        let consumed = match pat.get(pi) {
            Some('%') => {
                while pat.get(pi) == Some(&'%') {
                    pi += 1;
                }
                // A trailing `%` swallows whatever text is left.
                if pi == pat.len() {
                    return true;
                }
                resume = Some((pi, ti));
                continue;
            }
            // Pattern exhausted: success only if the text is exhausted too.
            None if ti == text.len() => return true,
            None => None,
            Some('_') => (ti < text.len()).then_some(1),
            Some('\\') if pi + 1 < pat.len() => {
                (text.get(ti) == Some(&pat[pi + 1])).then_some(2)
            }
            Some(c) => (text.get(ti) == Some(c)).then_some(1),
        };
        match (consumed, resume) {
            (Some(width), _) => {
                ti += 1;
                pi += width;
            }
            // Let the latest `%` absorb one more character and retry.
            (None, Some((after_percent, absorbed))) if absorbed < text.len() => {
                resume = Some((after_percent, absorbed + 1));
                pi = after_percent;
                ti = absorbed + 1;
            }
            // No `%` to fall back on, or it has already swallowed everything.
            (None, _) => return false,
        }
    }
}
554
555/// Dispatch on lowercased function name. v1.4 implements only a handful of
556/// scalar functions; aggregates land in v1.5 alongside GROUP BY.
557fn apply_function(name: &str, args: &[Value], ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
558    match name.to_ascii_lowercase().as_str() {
559        "length" => {
560            if args.len() != 1 {
561                return Err(EvalError::TypeMismatch {
562                    detail: format!("length() takes 1 arg, got {}", args.len()),
563                });
564            }
565            match &args[0] {
566                Value::Null => Ok(Value::Null),
567                Value::Text(s) => {
568                    let n = i32::try_from(s.chars().count()).unwrap_or(i32::MAX);
569                    Ok(Value::Int(n))
570                }
571                // v7.10.4 — PG semantics: length(bytea) returns
572                // byte count (= octet_length). Without this branch
573                // mailrs's INSERT … SELECT length(body) … against a
574                // BYTEA column would type-mismatch.
575                Value::Bytes(b) => {
576                    let n = i32::try_from(b.len()).unwrap_or(i32::MAX);
577                    Ok(Value::Int(n))
578                }
579                other => Err(EvalError::TypeMismatch {
580                    detail: format!("length() needs text or bytea, got {:?}", other.data_type()),
581                }),
582            }
583        }
584        // v7.10.4 — `OCTET_LENGTH(x)` returns byte count for both
585        // TEXT (UTF-8 byte length) and BYTEA. PG-spec name; aliases
586        // to length() for bytea by design.
587        "octet_length" => {
588            if args.len() != 1 {
589                return Err(EvalError::TypeMismatch {
590                    detail: format!("octet_length() takes 1 arg, got {}", args.len()),
591                });
592            }
593            match &args[0] {
594                Value::Null => Ok(Value::Null),
595                Value::Text(s) => {
596                    let n = i32::try_from(s.len()).unwrap_or(i32::MAX);
597                    Ok(Value::Int(n))
598                }
599                Value::Bytes(b) => {
600                    let n = i32::try_from(b.len()).unwrap_or(i32::MAX);
601                    Ok(Value::Int(n))
602                }
603                other => Err(EvalError::TypeMismatch {
604                    detail: format!(
605                        "octet_length() needs text or bytea, got {:?}",
606                        other.data_type()
607                    ),
608                }),
609            }
610        }
611        // v7.11.6 — `array_length(arr, dim)` returns the element
612        // count of `arr` along dimension `dim`. v7.11 only models
613        // single-dimension arrays so dim must be 1 (otherwise NULL,
614        // matching PG semantics for unsupported dimensions). NULL
615        // array → NULL. v7.11 TEXT[] only; non-array operand is
616        // a type mismatch.
617        "array_length" => {
618            if args.len() != 2 {
619                return Err(EvalError::TypeMismatch {
620                    detail: format!("array_length() takes 2 args, got {}", args.len()),
621                });
622            }
623            if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
624                return Ok(Value::Null);
625            }
626            let len = match &args[0] {
627                Value::TextArray(items) => items.len(),
628                Value::IntArray(items) => items.len(),
629                Value::BigIntArray(items) => items.len(),
630                _ => {
631                    return Err(EvalError::TypeMismatch {
632                        detail: format!(
633                            "array_length() first arg must be an array, got {:?}",
634                            args[0].data_type()
635                        ),
636                    });
637                }
638            };
639            let dim: i64 = match args[1] {
640                Value::Int(n) => i64::from(n),
641                Value::BigInt(n) => n,
642                Value::SmallInt(n) => i64::from(n),
643                _ => {
644                    return Err(EvalError::TypeMismatch {
645                        detail: format!(
646                            "array_length() second arg must be integer, got {:?}",
647                            args[1].data_type()
648                        ),
649                    });
650                }
651            };
652            if dim != 1 {
653                return Ok(Value::Null);
654            }
655            let n = i32::try_from(len).unwrap_or(i32::MAX);
656            Ok(Value::Int(n))
657        }
658        // v7.11.6 — `array_position(arr, val)` returns 1-based
659        // index of the first element of `arr` equal to `val`, or
660        // NULL if not found. PG NULL semantics: NULL array → NULL;
661        // NULL val never matches (returns NULL if absent).
662        "array_position" => {
663            if args.len() != 2 {
664                return Err(EvalError::TypeMismatch {
665                    detail: format!("array_position() takes 2 args, got {}", args.len()),
666                });
667            }
668            if matches!(args[0], Value::Null) {
669                return Ok(Value::Null);
670            }
671            if matches!(args[1], Value::Null) {
672                return Ok(Value::Null);
673            }
674            match (&args[0], &args[1]) {
675                (Value::TextArray(items), Value::Text(needle)) => {
676                    for (idx, item) in items.iter().enumerate() {
677                        if let Some(s) = item
678                            && s == needle
679                        {
680                            return Ok(Value::Int(i32::try_from(idx + 1).unwrap_or(i32::MAX)));
681                        }
682                    }
683                    Ok(Value::Null)
684                }
685                (Value::IntArray(items), needle_v)
686                    if matches!(
687                        needle_v,
688                        Value::Int(_) | Value::SmallInt(_) | Value::BigInt(_)
689                    ) =>
690                {
691                    let needle: i64 = match *needle_v {
692                        Value::Int(n) => i64::from(n),
693                        Value::SmallInt(n) => i64::from(n),
694                        Value::BigInt(n) => n,
695                        _ => unreachable!(),
696                    };
697                    for (idx, item) in items.iter().enumerate() {
698                        if let Some(n) = item
699                            && i64::from(*n) == needle
700                        {
701                            return Ok(Value::Int(i32::try_from(idx + 1).unwrap_or(i32::MAX)));
702                        }
703                    }
704                    Ok(Value::Null)
705                }
706                (Value::BigIntArray(items), needle_v)
707                    if matches!(
708                        needle_v,
709                        Value::Int(_) | Value::SmallInt(_) | Value::BigInt(_)
710                    ) =>
711                {
712                    let needle: i64 = match *needle_v {
713                        Value::Int(n) => i64::from(n),
714                        Value::SmallInt(n) => i64::from(n),
715                        Value::BigInt(n) => n,
716                        _ => unreachable!(),
717                    };
718                    for (idx, item) in items.iter().enumerate() {
719                        if let Some(n) = item
720                            && *n == needle
721                        {
722                            return Ok(Value::Int(i32::try_from(idx + 1).unwrap_or(i32::MAX)));
723                        }
724                    }
725                    Ok(Value::Null)
726                }
727                (
728                    arr @ (Value::TextArray(_) | Value::IntArray(_) | Value::BigIntArray(_)),
729                    other,
730                ) => Err(EvalError::TypeMismatch {
731                    detail: format!(
732                        "array_position() needle type {:?} doesn't match array {:?}",
733                        other.data_type(),
734                        arr.data_type()
735                    ),
736                }),
737                (other, _) => Err(EvalError::TypeMismatch {
738                    detail: format!(
739                        "array_position() first arg must be an array, got {:?}",
740                        other.data_type()
741                    ),
742                }),
743            }
744        }
745        // v7.11.15 — `substring(s, start)` / `substring(s, start, length)`
746        // for both TEXT and BYTEA. PG semantics: `start` is 1-based;
747        // values ≤ 0 clamp into the string (i.e. effective start is
748        // adjusted so the window still begins at index 1 — but
749        // `length` is reduced by the clipped prefix). A NULL arg
750        // makes the result NULL. Out-of-range windows return an
751        // empty value, not NULL.
752        "substring" => {
753            if !matches!(args.len(), 2 | 3) {
754                return Err(EvalError::TypeMismatch {
755                    detail: format!("substring() takes 2 or 3 args, got {}", args.len()),
756                });
757            }
758            if args.iter().any(|a| matches!(a, Value::Null)) {
759                return Ok(Value::Null);
760            }
761            let start: i64 = match args[1] {
762                Value::Int(n) => i64::from(n),
763                Value::BigInt(n) => n,
764                Value::SmallInt(n) => i64::from(n),
765                _ => {
766                    return Err(EvalError::TypeMismatch {
767                        detail: format!(
768                            "substring() start must be integer, got {:?}",
769                            args[1].data_type()
770                        ),
771                    });
772                }
773            };
774            let length: Option<i64> = if args.len() == 3 {
775                match args[2] {
776                    Value::Int(n) => Some(i64::from(n)),
777                    Value::BigInt(n) => Some(n),
778                    Value::SmallInt(n) => Some(i64::from(n)),
779                    _ => {
780                        return Err(EvalError::TypeMismatch {
781                            detail: format!(
782                                "substring() length must be integer, got {:?}",
783                                args[2].data_type()
784                            ),
785                        });
786                    }
787                }
788            } else {
789                None
790            };
791            // PG: when length is given, end = start + length; if
792            // end < start the result is empty. Clip start to 1.
793            let (effective_start, effective_length): (i64, Option<i64>) = match length {
794                Some(len) => {
795                    let end = start.saturating_add(len);
796                    if end <= 1 || len < 0 {
797                        return Ok(match &args[0] {
798                            Value::Text(_) => Value::Text(String::new()),
799                            Value::Bytes(_) => Value::Bytes(Vec::new()),
800                            other => {
801                                return Err(EvalError::TypeMismatch {
802                                    detail: format!(
803                                        "substring() needs text or bytea, got {:?}",
804                                        other.data_type()
805                                    ),
806                                });
807                            }
808                        });
809                    }
810                    let eff_start = start.max(1);
811                    let eff_len = end - eff_start;
812                    (eff_start, Some(eff_len.max(0)))
813                }
814                None => (start.max(1), None),
815            };
816            match &args[0] {
817                Value::Text(s) => {
818                    // PG counts in characters (codepoints) for TEXT.
819                    let chars: Vec<char> = s.chars().collect();
820                    let skip = (effective_start - 1) as usize;
821                    if skip >= chars.len() {
822                        return Ok(Value::Text(String::new()));
823                    }
824                    let take = match effective_length {
825                        Some(n) => (n as usize).min(chars.len() - skip),
826                        None => chars.len() - skip,
827                    };
828                    Ok(Value::Text(chars[skip..skip + take].iter().collect()))
829                }
830                Value::Bytes(b) => {
831                    let skip = (effective_start - 1) as usize;
832                    if skip >= b.len() {
833                        return Ok(Value::Bytes(Vec::new()));
834                    }
835                    let take = match effective_length {
836                        Some(n) => (n as usize).min(b.len() - skip),
837                        None => b.len() - skip,
838                    };
839                    Ok(Value::Bytes(b[skip..skip + take].to_vec()))
840                }
841                other => Err(EvalError::TypeMismatch {
842                    detail: format!(
843                        "substring() needs text or bytea, got {:?}",
844                        other.data_type()
845                    ),
846                }),
847            }
848        }
849        // v7.11.15 — `position(needle, haystack)`. PG semantics:
850        // 1-based byte/char index of first occurrence, or 0 if
851        // absent. NULL on either operand → NULL. Empty needle
852        // returns 1 (PG convention). Works on TEXT (char positions)
853        // and BYTEA (byte positions). (The PG-spec syntax `position(
854        // needle IN haystack)` is not parsed in v7.11; clients must
855        // call the function-call form.)
856        "position" => {
857            if args.len() != 2 {
858                return Err(EvalError::TypeMismatch {
859                    detail: format!("position() takes 2 args, got {}", args.len()),
860                });
861            }
862            if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
863                return Ok(Value::Null);
864            }
865            match (&args[0], &args[1]) {
866                (Value::Text(needle), Value::Text(haystack)) => {
867                    if needle.is_empty() {
868                        return Ok(Value::Int(1));
869                    }
870                    // Char-based position (PG uses character count).
871                    let h_chars: Vec<char> = haystack.chars().collect();
872                    let n_chars: Vec<char> = needle.chars().collect();
873                    if n_chars.len() > h_chars.len() {
874                        return Ok(Value::Int(0));
875                    }
876                    for i in 0..=h_chars.len() - n_chars.len() {
877                        if h_chars[i..i + n_chars.len()] == n_chars[..] {
878                            return Ok(Value::Int(i32::try_from(i + 1).unwrap_or(i32::MAX)));
879                        }
880                    }
881                    Ok(Value::Int(0))
882                }
883                (Value::Bytes(needle), Value::Bytes(haystack)) => {
884                    if needle.is_empty() {
885                        return Ok(Value::Int(1));
886                    }
887                    if needle.len() > haystack.len() {
888                        return Ok(Value::Int(0));
889                    }
890                    for i in 0..=haystack.len() - needle.len() {
891                        if &haystack[i..i + needle.len()] == needle.as_slice() {
892                            return Ok(Value::Int(i32::try_from(i + 1).unwrap_or(i32::MAX)));
893                        }
894                    }
895                    Ok(Value::Int(0))
896                }
897                (a, b) => Err(EvalError::TypeMismatch {
898                    detail: format!(
899                        "position() operands must both be text or both bytea, got {:?} and {:?}",
900                        a.data_type(),
901                        b.data_type()
902                    ),
903                }),
904            }
905        }
906        "upper" => {
907            if args.len() != 1 {
908                return Err(EvalError::TypeMismatch {
909                    detail: format!("upper() takes 1 arg, got {}", args.len()),
910                });
911            }
912            match &args[0] {
913                Value::Null => Ok(Value::Null),
914                Value::Text(s) => Ok(Value::Text(s.to_uppercase())),
915                other => Err(EvalError::TypeMismatch {
916                    detail: format!("upper() needs text, got {:?}", other.data_type()),
917                }),
918            }
919        }
920        "lower" => {
921            if args.len() != 1 {
922                return Err(EvalError::TypeMismatch {
923                    detail: format!("lower() takes 1 arg, got {}", args.len()),
924                });
925            }
926            match &args[0] {
927                Value::Null => Ok(Value::Null),
928                Value::Text(s) => Ok(Value::Text(s.to_lowercase())),
929                other => Err(EvalError::TypeMismatch {
930                    detail: format!("lower() needs text, got {:?}", other.data_type()),
931                }),
932            }
933        }
934        "abs" => {
935            if args.len() != 1 {
936                return Err(EvalError::TypeMismatch {
937                    detail: format!("abs() takes 1 arg, got {}", args.len()),
938                });
939            }
940            match &args[0] {
941                Value::Null => Ok(Value::Null),
942                Value::Int(n) => Ok(Value::Int(n.wrapping_abs())),
943                Value::BigInt(n) => Ok(Value::BigInt(n.wrapping_abs())),
944                Value::Float(x) => Ok(Value::Float(x.abs())),
945                other => Err(EvalError::TypeMismatch {
946                    detail: format!("abs() needs numeric, got {:?}", other.data_type()),
947                }),
948            }
949        }
950        "coalesce" => {
951            for a in args {
952                if !matches!(a, Value::Null) {
953                    return Ok(a.clone());
954                }
955            }
956            Ok(Value::Null)
957        }
958        "date_trunc" => date_trunc(args),
959        "date_part" => date_part(args),
960        "age" => age(args),
961        "to_char" => to_char(args),
962        // v6.4.3 — encode/decode + error_on_null SQL function bundle.
963        "encode" => encode_text(args),
964        "decode" => decode_text(args),
965        "error_on_null" => error_on_null(args),
966        // v7.12.1 — PG full-text search lexer / tsquery builders.
967        // mailrs G-CRIT-3 acceptance path: `to_tsvector('english',
968        // … || ' ' || … || …)` runs end-to-end against a tsvector
969        // column with Porter stemming + standard english stopwords.
970        "to_tsvector" => fts_to_tsvector(args, ctx),
971        "plainto_tsquery" => fts_plainto_tsquery(args, ctx),
972        "phraseto_tsquery" => fts_phraseto_tsquery(args, ctx),
973        "websearch_to_tsquery" => fts_websearch_to_tsquery(args, ctx),
974        "to_tsquery" => fts_to_tsquery(args, ctx),
975        // v7.12.2 — ranking functions. mailrs's fallback search
976        // query ORDERs BY ts_rank(search_vector, q) DESC.
977        "ts_rank" => fts_ts_rank(args),
978        "ts_rank_cd" => fts_ts_rank_cd(args),
979        // v7.14.0 — PG dump preamble emits
980        // `SELECT pg_catalog.set_config('search_path', '', false);`
981        // and friends. SPG is single-schema; accept-as-no-op
982        // returning either the new value or NULL.
983        "set_config" => Ok(args.get(1).cloned().unwrap_or(Value::Null)),
984        "current_setting" => Ok(Value::Text(String::new())),
985        // PG `pg_catalog.*` discovery / cast helpers commonly
986        // emitted by ORMs probing the server. Accept-as-no-op
987        // with sensible defaults so the dump preamble doesn't
988        // fail. `pg_get_serial_sequence` returns NULL (no
989        // sequence — SPG has AUTO_INCREMENT instead).
990        "pg_get_serial_sequence" | "pg_get_constraintdef" | "pg_get_indexdef" => Ok(Value::Null),
991        "version" => Ok(Value::Text("PostgreSQL 16 (SPG-compat)".into())),
992        // pg_dump emits `nextval('seq')` after creating a
993        // sequence; SPG has no separate sequence object (the
994        // owning column carries AUTO_INCREMENT). Return NULL
995        // (PG would return the sequence value) — the value isn't
996        // used at restore time because the column has its own
997        // implicit BIGSERIAL counter.
998        "nextval" | "currval" | "lastval" => Ok(Value::Null),
999        "setval" => Ok(args.first().cloned().unwrap_or(Value::Null)),
1000        // v7.15.0 — pg_trgm: similarity, show_trgm. Match PG
1001        // semantics: similarity returns Jaccard of trigram sets;
1002        // show_trgm returns the trigram set as TEXT[]. NULL on
1003        // any NULL arg.
1004        "similarity" => {
1005            if args.len() != 2 {
1006                return Err(EvalError::TypeMismatch {
1007                    detail: format!("similarity() takes 2 args, got {}", args.len()),
1008                });
1009            }
1010            if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
1011                return Ok(Value::Null);
1012            }
1013            let a = match &args[0] {
1014                Value::Text(s) => s.as_str(),
1015                other => {
1016                    return Err(EvalError::TypeMismatch {
1017                        detail: format!(
1018                            "similarity() needs text, got {:?}",
1019                            other.data_type()
1020                        ),
1021                    });
1022                }
1023            };
1024            let b = match &args[1] {
1025                Value::Text(s) => s.as_str(),
1026                other => {
1027                    return Err(EvalError::TypeMismatch {
1028                        detail: format!(
1029                            "similarity() needs text, got {:?}",
1030                            other.data_type()
1031                        ),
1032                    });
1033                }
1034            };
1035            // PG returns REAL (f32) — we use Float (f64) and let
1036            // coerce_value narrow on assignment to a REAL column.
1037            Ok(Value::Float(spg_storage::trgm::similarity(a, b)))
1038        }
1039        "show_trgm" => {
1040            if args.len() != 1 {
1041                return Err(EvalError::TypeMismatch {
1042                    detail: format!("show_trgm() takes 1 arg, got {}", args.len()),
1043                });
1044            }
1045            if matches!(args[0], Value::Null) {
1046                return Ok(Value::Null);
1047            }
1048            let s = match &args[0] {
1049                Value::Text(s) => s.as_str(),
1050                other => {
1051                    return Err(EvalError::TypeMismatch {
1052                        detail: format!(
1053                            "show_trgm() needs text, got {:?}",
1054                            other.data_type()
1055                        ),
1056                    });
1057                }
1058            };
1059            // PG returns the trigram set sorted lexicographically.
1060            // `extract_trigrams` already returns a BTreeSet so the
1061            // order is canonical.
1062            let trigrams: Vec<Option<String>> = spg_storage::trgm::extract_trigrams(s)
1063                .into_iter()
1064                .map(Some)
1065                .collect();
1066            Ok(Value::TextArray(trigrams))
1067        }
1068        other => Err(EvalError::TypeMismatch {
1069            detail: format!("unknown function `{other}`"),
1070        }),
1071    }
1072}
1073
1074/// v7.12.2 — `ts_rank([weights,] vec, query [, norm])`. v7.12.2
1075/// supports the canonical `(vec, query)` two-arg form mailrs uses;
1076/// optional weight-array / normalisation arguments error with an
1077/// "unsupported" message rather than silently changing semantics.
1078fn fts_ts_rank(args: &[Value]) -> Result<Value, EvalError> {
1079    let (vec, query) = parse_rank_args("ts_rank", args)?;
1080    match (vec, query) {
1081        (None, _) | (_, None) => Ok(Value::Null),
1082        (Some(v), Some(q)) => Ok(Value::Float(f64::from(crate::fts::ts_rank(&v, &q)))),
1083    }
1084}
1085
1086fn fts_ts_rank_cd(args: &[Value]) -> Result<Value, EvalError> {
1087    let (vec, query) = parse_rank_args("ts_rank_cd", args)?;
1088    match (vec, query) {
1089        (None, _) | (_, None) => Ok(Value::Null),
1090        (Some(v), Some(q)) => Ok(Value::Float(f64::from(crate::fts::ts_rank_cd(&v, &q)))),
1091    }
1092}
1093
1094fn parse_rank_args(
1095    name: &str,
1096    args: &[Value],
1097) -> Result<
1098    (
1099        Option<Vec<spg_storage::TsLexeme>>,
1100        Option<spg_storage::TsQueryAst>,
1101    ),
1102    EvalError,
1103> {
1104    if args.len() != 2 {
1105        return Err(EvalError::TypeMismatch {
1106            detail: format!(
1107                "{name}() takes 2 args in v7.12.2 (weights array + normalisation flag are v7.12.x carve-out), got {}",
1108                args.len()
1109            ),
1110        });
1111    }
1112    let vec = match &args[0] {
1113        Value::Null => None,
1114        Value::TsVector(v) => Some(v.clone()),
1115        other => {
1116            return Err(EvalError::TypeMismatch {
1117                detail: format!(
1118                    "{name}() first arg must be tsvector, got {:?}",
1119                    other.data_type()
1120                ),
1121            });
1122        }
1123    };
1124    let query = match &args[1] {
1125        Value::Null => None,
1126        Value::TsQuery(q) => Some(q.clone()),
1127        other => {
1128            return Err(EvalError::TypeMismatch {
1129                detail: format!(
1130                    "{name}() second arg must be tsquery, got {:?}",
1131                    other.data_type()
1132                ),
1133            });
1134        }
1135    };
1136    Ok((vec, query))
1137}
1138
1139/// v7.12.2 — `tsvector @@ tsquery` match operator. Either
1140/// ordering accepted (PG semantics). NULL on either side → NULL.
1141/// Anything that isn't tsvector/tsquery on either side is a type
1142/// mismatch. Returns BOOL.
1143fn ts_match(l: Value, r: Value) -> Result<Value, EvalError> {
1144    let (vec, query) = match (l, r) {
1145        (Value::Null, _) | (_, Value::Null) => return Ok(Value::Null),
1146        (Value::TsVector(v), Value::TsQuery(q)) => (v, q),
1147        (Value::TsQuery(q), Value::TsVector(v)) => (v, q),
1148        (l, r) => {
1149            return Err(EvalError::TypeMismatch {
1150                detail: format!(
1151                    "@@ requires (tsvector, tsquery), got ({:?}, {:?})",
1152                    l.data_type(),
1153                    r.data_type()
1154                ),
1155            });
1156        }
1157    };
1158    Ok(Value::Bool(crate::fts::ts_query_matches(&vec, &query)))
1159}
1160
1161/// v7.12.1 — `to_tsvector([config,] text)`. With one arg the
1162/// session-resolved `default_text_search_config` is used (defaults
1163/// to `simple` when unset); with two args the first picks the
1164/// config. NULL text → NULL.
1165fn fts_to_tsvector(args: &[Value], ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1166    let (config, text) = parse_fts_args("to_tsvector", args, ctx)?;
1167    match text {
1168        None => Ok(Value::Null),
1169        Some(t) => Ok(Value::TsVector(crate::fts::to_tsvector(config, &t))),
1170    }
1171}
1172
1173fn fts_plainto_tsquery(args: &[Value], ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1174    let (config, text) = parse_fts_args("plainto_tsquery", args, ctx)?;
1175    match text {
1176        None => Ok(Value::Null),
1177        Some(t) => Ok(Value::TsQuery(crate::fts::plainto_tsquery(config, &t))),
1178    }
1179}
1180
1181fn fts_phraseto_tsquery(args: &[Value], ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1182    let (config, text) = parse_fts_args("phraseto_tsquery", args, ctx)?;
1183    match text {
1184        None => Ok(Value::Null),
1185        Some(t) => Ok(Value::TsQuery(crate::fts::phraseto_tsquery(config, &t))),
1186    }
1187}
1188
1189fn fts_websearch_to_tsquery(args: &[Value], ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1190    let (config, text) = parse_fts_args("websearch_to_tsquery", args, ctx)?;
1191    match text {
1192        None => Ok(Value::Null),
1193        Some(t) => Ok(Value::TsQuery(crate::fts::websearch_to_tsquery(config, &t))),
1194    }
1195}
1196
1197fn fts_to_tsquery(args: &[Value], ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1198    let (config, text) = parse_fts_args("to_tsquery", args, ctx)?;
1199    match text {
1200        None => Ok(Value::Null),
1201        Some(t) => Ok(Value::TsQuery(crate::fts::to_tsquery(config, &t)?)),
1202    }
1203}
1204
1205/// Parse the `(config, text)` / `(text)` argument pair shared by
1206/// all FTS builders. Returns the resolved config + the text
1207/// payload (None when text is NULL). The one-arg form pulls the
1208/// config from the session's `default_text_search_config`.
1209fn parse_fts_args(
1210    name: &str,
1211    args: &[Value],
1212    ctx: &EvalContext<'_>,
1213) -> Result<(crate::fts::TsConfig, Option<String>), EvalError> {
1214    let (config_arg, text_arg) = match args {
1215        [t] => (None, t),
1216        [c, t] => (Some(c), t),
1217        _ => {
1218            return Err(EvalError::TypeMismatch {
1219                detail: format!("{name}() takes 1 or 2 args, got {}", args.len()),
1220            });
1221        }
1222    };
1223    let config = match config_arg {
1224        None => match ctx.default_text_search_config {
1225            Some(name_str) => crate::fts::TsConfig::from_name(name_str).ok_or_else(|| {
1226                EvalError::TypeMismatch {
1227                    detail: format!(
1228                        "text search config not implemented: {name_str:?} (supported: simple, english)"
1229                    ),
1230                }
1231            })?,
1232            None => crate::fts::TsConfig::Simple,
1233        },
1234        Some(Value::Null) => return Ok((crate::fts::TsConfig::Simple, None)),
1235        Some(Value::Text(name_str)) => crate::fts::TsConfig::from_name(name_str).ok_or_else(|| {
1236            EvalError::TypeMismatch {
1237                detail: format!(
1238                    "text search config not implemented: {name_str:?} (supported: simple, english)"
1239                ),
1240            }
1241        })?,
1242        Some(other) => {
1243            return Err(EvalError::TypeMismatch {
1244                detail: format!(
1245                    "{name}() config arg must be text, got {:?}",
1246                    other.data_type()
1247                ),
1248            });
1249        }
1250    };
1251    let text = match text_arg {
1252        Value::Null => None,
1253        Value::Text(s) => Some(s.clone()),
1254        other => {
1255            return Err(EvalError::TypeMismatch {
1256                detail: format!(
1257                    "{name}() text arg must be text, got {:?}",
1258                    other.data_type()
1259                ),
1260            });
1261        }
1262    };
1263    Ok((config, text))
1264}
1265
1266/// v6.4.3 — `encode(bytes_as_text, format)`. PG works on bytea
1267/// arguments; SPG's value space treats Text as the byte container
1268/// (raw UTF-8 bytes). Supported formats: base64 (PG default),
1269/// base64url (RFC 4648 §5), base32hex (RFC 4648 §7 extended-hex),
1270/// hex.
1271fn encode_text(args: &[Value]) -> Result<Value, EvalError> {
1272    if args.len() != 2 {
1273        return Err(EvalError::TypeMismatch {
1274            detail: format!("encode() takes 2 args, got {}", args.len()),
1275        });
1276    }
1277    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
1278        return Ok(Value::Null);
1279    }
1280    let bytes: &[u8] = match &args[0] {
1281        Value::Text(s) => s.as_bytes(),
1282        other => {
1283            return Err(EvalError::TypeMismatch {
1284                detail: format!("encode() expects text bytes, got {:?}", other.data_type()),
1285            });
1286        }
1287    };
1288    let fmt = match &args[1] {
1289        Value::Text(s) => s.to_ascii_lowercase(),
1290        other => {
1291            return Err(EvalError::TypeMismatch {
1292                detail: format!("encode() format must be text, got {:?}", other.data_type()),
1293            });
1294        }
1295    };
1296    let out = match fmt.as_str() {
1297        "base64" => b64_encode(bytes, B64_STD),
1298        "base64url" => b64_encode(bytes, B64_URL),
1299        "base32hex" => b32hex_encode(bytes),
1300        "hex" => hex_encode(bytes),
1301        other => {
1302            return Err(EvalError::TypeMismatch {
1303                detail: format!("encode(): unknown format `{other}`"),
1304            });
1305        }
1306    };
1307    Ok(Value::Text(out))
1308}
1309
1310/// v6.4.3 — `decode(text, format)`. Inverse of `encode`; returns
1311/// Text containing the raw decoded bytes (caller may CAST to bytea
1312/// equivalent if SPG adds bytea later).
1313fn decode_text(args: &[Value]) -> Result<Value, EvalError> {
1314    if args.len() != 2 {
1315        return Err(EvalError::TypeMismatch {
1316            detail: format!("decode() takes 2 args, got {}", args.len()),
1317        });
1318    }
1319    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
1320        return Ok(Value::Null);
1321    }
1322    let text = match &args[0] {
1323        Value::Text(s) => s.as_str(),
1324        other => {
1325            return Err(EvalError::TypeMismatch {
1326                detail: format!("decode() expects text, got {:?}", other.data_type()),
1327            });
1328        }
1329    };
1330    let fmt = match &args[1] {
1331        Value::Text(s) => s.to_ascii_lowercase(),
1332        other => {
1333            return Err(EvalError::TypeMismatch {
1334                detail: format!("decode() format must be text, got {:?}", other.data_type()),
1335            });
1336        }
1337    };
1338    let bytes = match fmt.as_str() {
1339        "base64" => b64_decode(text, B64_STD)?,
1340        "base64url" => b64_decode(text, B64_URL)?,
1341        "base32hex" => b32hex_decode(text)?,
1342        "hex" => hex_decode(text)?,
1343        other => {
1344            return Err(EvalError::TypeMismatch {
1345                detail: format!("decode(): unknown format `{other}`"),
1346            });
1347        }
1348    };
1349    let s = String::from_utf8(bytes).map_err(|_| EvalError::TypeMismatch {
1350        detail: "decode(): result bytes are not valid UTF-8 (SPG stores raw bytes as Text)".into(),
1351    })?;
1352    Ok(Value::Text(s))
1353}
1354
1355/// v6.4.3 — `error_on_null(v)`. Returns `v` unchanged if non-NULL;
1356/// errors otherwise. Convenience to assert NOT NULL inside an
1357/// expression without wrapping it in COALESCE + raise hacks.
1358fn error_on_null(args: &[Value]) -> Result<Value, EvalError> {
1359    if args.len() != 1 {
1360        return Err(EvalError::TypeMismatch {
1361            detail: format!("error_on_null() takes 1 arg, got {}", args.len()),
1362        });
1363    }
1364    if matches!(args[0], Value::Null) {
1365        return Err(EvalError::TypeMismatch {
1366            detail: "error_on_null(): argument is NULL".into(),
1367        });
1368    }
1369    Ok(args[0].clone())
1370}
1371
1372// ── byte-level encoders ───────────────────────────────────────────
1373
/// Standard base64 alphabet (RFC 4648 §4): `A-Z`, `a-z`, `0-9`, `+`, `/`.
const B64_STD: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ\
                             abcdefghijklmnopqrstuvwxyz\
                             0123456789+/";
/// URL-safe base64 alphabet (RFC 4648 §5): as above, but ending in `-`, `_`.
const B64_URL: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ\
                             abcdefghijklmnopqrstuvwxyz\
                             0123456789-_";
/// Extended-hex base32 alphabet (RFC 4648 §7): `0-9` followed by `A-V`.
const B32HEX_ALPHABET: &[u8; 32] = b"0123456789\
                                     ABCDEFGHIJKLMNOPQRSTUV";
1377
/// Base64-encode `bytes` using the 64-symbol alphabet `alpha`.
///
/// Every group of up to three input bytes becomes four output
/// characters; a short final group is padded with `=` so the
/// output length is always a multiple of four.
fn b64_encode(bytes: &[u8], alpha: &[u8; 64]) -> String {
    // Pick the alphabet symbol for the 6-bit field at `shift`.
    let symbol = |group: u32, shift: u32| char::from(alpha[((group >> shift) & 0x3f) as usize]);

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for chunk in bytes.chunks(3) {
        // Left-align the chunk's bytes inside a 24-bit group.
        let mut group: u32 = 0;
        for (slot, &byte) in chunk.iter().enumerate() {
            group |= u32::from(byte) << (16 - 8 * slot as u32);
        }
        // k input bytes carry k + 1 symbols; the rest of the
        // four-character quantum is '=' padding.
        for position in 0..4u32 {
            if position as usize <= chunk.len() {
                encoded.push(symbol(group, 18 - 6 * position));
            } else {
                encoded.push('=');
            }
        }
    }
    encoded
}
1405
1406fn b64_decode(text: &str, alpha: &[u8; 64]) -> Result<Vec<u8>, EvalError> {
1407    let mut lookup = [255u8; 256];
1408    for (i, &c) in alpha.iter().enumerate() {
1409        lookup[c as usize] = i as u8;
1410    }
1411    let mut out = Vec::with_capacity(text.len() * 3 / 4);
1412    let mut buf: u32 = 0;
1413    let mut bits: u32 = 0;
1414    for c in text.bytes() {
1415        if c == b'=' {
1416            break;
1417        }
1418        if c == b'\n' || c == b'\r' || c == b' ' {
1419            continue;
1420        }
1421        let v = lookup[c as usize];
1422        if v == 255 {
1423            return Err(EvalError::TypeMismatch {
1424                detail: format!("decode(base64): invalid char {:?}", c as char),
1425            });
1426        }
1427        buf = (buf << 6) | v as u32;
1428        bits += 6;
1429        if bits >= 8 {
1430            bits -= 8;
1431            out.push(((buf >> bits) & 0xff) as u8);
1432        }
1433    }
1434    Ok(out)
1435}
1436
1437fn b32hex_encode(bytes: &[u8]) -> String {
1438    let mut out = String::with_capacity((bytes.len() * 8 + 4) / 5);
1439    let mut buf: u64 = 0;
1440    let mut bits: u32 = 0;
1441    for &b in bytes {
1442        buf = (buf << 8) | b as u64;
1443        bits += 8;
1444        while bits >= 5 {
1445            bits -= 5;
1446            out.push(B32HEX_ALPHABET[((buf >> bits) & 0x1f) as usize] as char);
1447        }
1448    }
1449    if bits > 0 {
1450        out.push(B32HEX_ALPHABET[((buf << (5 - bits)) & 0x1f) as usize] as char);
1451    }
1452    // Pad to multiple of 8.
1453    while out.len() % 8 != 0 {
1454        out.push('=');
1455    }
1456    out
1457}
1458
1459fn b32hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
1460    let mut lookup = [255u8; 256];
1461    for (i, &c) in B32HEX_ALPHABET.iter().enumerate() {
1462        lookup[c as usize] = i as u8;
1463        // base32hex is case-insensitive — also map lowercase.
1464        let lower = (c as char).to_ascii_lowercase() as u8;
1465        lookup[lower as usize] = i as u8;
1466    }
1467    let mut out = Vec::with_capacity(text.len() * 5 / 8);
1468    let mut buf: u64 = 0;
1469    let mut bits: u32 = 0;
1470    for c in text.bytes() {
1471        if c == b'=' {
1472            break;
1473        }
1474        if c == b'\n' || c == b'\r' || c == b' ' {
1475            continue;
1476        }
1477        let v = lookup[c as usize];
1478        if v == 255 {
1479            return Err(EvalError::TypeMismatch {
1480                detail: format!("decode(base32hex): invalid char {:?}", c as char),
1481            });
1482        }
1483        buf = (buf << 5) | v as u64;
1484        bits += 5;
1485        if bits >= 8 {
1486            bits -= 8;
1487            out.push(((buf >> bits) & 0xff) as u8);
1488        }
1489    }
1490    Ok(out)
1491}
1492
/// Render `bytes` as lowercase hexadecimal, two digits per byte
/// (high nibble first).
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(bytes.len() * 2);
    encoded.extend(
        bytes
            .iter()
            .flat_map(|&byte| [byte >> 4, byte & 0x0f])
            .map(|nibble| char::from(DIGITS[usize::from(nibble)])),
    );
    encoded
}
1502
1503fn hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
1504    let trimmed = text.trim();
1505    if trimmed.len() % 2 != 0 {
1506        return Err(EvalError::TypeMismatch {
1507            detail: "decode(hex): input length must be even".into(),
1508        });
1509    }
1510    let mut out = Vec::with_capacity(trimmed.len() / 2);
1511    let mut hi: u8 = 0;
1512    for (i, c) in trimmed.bytes().enumerate() {
1513        let v = match c {
1514            b'0'..=b'9' => c - b'0',
1515            b'a'..=b'f' => c - b'a' + 10,
1516            b'A'..=b'F' => c - b'A' + 10,
1517            _ => {
1518                return Err(EvalError::TypeMismatch {
1519                    detail: format!("decode(hex): invalid char {:?}", c as char),
1520                });
1521            }
1522        };
1523        if i % 2 == 0 {
1524            hi = v;
1525        } else {
1526            out.push((hi << 4) | v);
1527        }
1528    }
1529    Ok(out)
1530}
1531
1532/// `date_part(field_text, source)` — function form of `EXTRACT(field FROM
1533/// source)`. Same component dispatch (DATE / TIMESTAMP / INTERVAL) and
1534/// same `BigInt` return shape; PG returns double precision but we keep the
1535/// integer convention so the runner's `query I` shape works unchanged.
1536fn date_part(args: &[Value]) -> Result<Value, EvalError> {
1537    use spg_sql::ast::ExtractField as F;
1538    if args.len() != 2 {
1539        return Err(EvalError::TypeMismatch {
1540            detail: format!("date_part() takes 2 args, got {}", args.len()),
1541        });
1542    }
1543    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
1544        return Ok(Value::Null);
1545    }
1546    let Value::Text(field_name) = &args[0] else {
1547        return Err(EvalError::TypeMismatch {
1548            detail: format!(
1549                "date_part() needs a text field, got {:?}",
1550                args[0].data_type()
1551            ),
1552        });
1553    };
1554    let field = match field_name.to_ascii_lowercase().as_str() {
1555        "year" => F::Year,
1556        "month" => F::Month,
1557        "day" => F::Day,
1558        "hour" => F::Hour,
1559        "minute" => F::Minute,
1560        "second" => F::Second,
1561        "microsecond" | "microseconds" => F::Microsecond,
1562        other => {
1563            return Err(EvalError::TypeMismatch {
1564                detail: format!(
1565                    "unknown date_part field {other:?}; \
1566                     supported: year, month, day, hour, minute, second, microsecond"
1567                ),
1568            });
1569        }
1570    };
1571    extract_field(field, &args[1])
1572}
1573
1574/// `age(t1, t2)` — return `t1 - t2` as an INTERVAL. v2.12 produces a
1575/// micros-only interval (no months normalisation) because PG's
1576/// month-justification rule is sensitive to the day-of-month walk and
1577/// adds material complexity for marginal corpus value.
1578///
1579/// `age(t)` (single-arg form) is intentionally unsupported in v2.12:
1580/// the dispatcher errors instead of guessing a clock source. Callers
1581/// who want PG's `age(t)` semantics should write `age(CURRENT_DATE, t)`
1582/// explicitly so the clock reference is visible at the SQL layer.
1583fn age(args: &[Value]) -> Result<Value, EvalError> {
1584    if args.is_empty() || args.len() > 2 {
1585        return Err(EvalError::TypeMismatch {
1586            detail: format!("age() takes 1 or 2 args, got {}", args.len()),
1587        });
1588    }
1589    if args.iter().any(|v| matches!(v, Value::Null)) {
1590        return Ok(Value::Null);
1591    }
1592    // Coerce to TIMESTAMP micros — DATE lifts to midnight; TIMESTAMP
1593    // stays as-is; anything else errors.
1594    let to_micros = |v: &Value| -> Result<i64, EvalError> {
1595        match v {
1596            Value::Timestamp(t) => Ok(*t),
1597            Value::Date(d) => Ok(i64::from(*d) * 86_400_000_000),
1598            other => Err(EvalError::TypeMismatch {
1599                detail: format!("age() needs DATE or TIMESTAMP, got {:?}", other.data_type()),
1600            }),
1601        }
1602    };
1603    if args.len() == 1 {
1604        return Err(EvalError::TypeMismatch {
1605            detail: "single-arg age() is unsupported in v2.12 \
1606                     (use age(CURRENT_DATE, t) explicitly)"
1607                .into(),
1608        });
1609    }
1610    let a = to_micros(&args[0])?;
1611    let b = to_micros(&args[1])?;
1612    let delta = a.checked_sub(b).ok_or(EvalError::TypeMismatch {
1613        detail: "age() subtraction overflows i64 microseconds".into(),
1614    })?;
1615    Ok(Value::Interval {
1616        months: 0,
1617        micros: delta,
1618    })
1619}
1620
1621/// `to_char(value, format)` — render a DATE / TIMESTAMP through a PG
1622/// format template. Supports the high-traffic placeholders:
1623///   YYYY YY MM Mon Month DD HH24 HH12 MI SS MS US AM PM
1624/// Unrecognised characters pass through literally so the template's
1625/// punctuation ('-', ':', ' ', '/') needs no escape mechanism.
1626fn to_char(args: &[Value]) -> Result<Value, EvalError> {
1627    use core::fmt::Write as _;
1628    if args.len() != 2 {
1629        return Err(EvalError::TypeMismatch {
1630            detail: format!("to_char() takes 2 args, got {}", args.len()),
1631        });
1632    }
1633    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
1634        return Ok(Value::Null);
1635    }
1636    let Value::Text(fmt) = &args[1] else {
1637        return Err(EvalError::TypeMismatch {
1638            detail: format!(
1639                "to_char() needs a text format, got {:?}",
1640                args[1].data_type()
1641            ),
1642        });
1643    };
1644    let (days, day_micros) = match &args[0] {
1645        Value::Date(d) => (*d, 0_i64),
1646        Value::Timestamp(t) => {
1647            let days = t.div_euclid(86_400_000_000);
1648            (
1649                i32::try_from(days).unwrap_or(i32::MAX),
1650                t.rem_euclid(86_400_000_000),
1651            )
1652        }
1653        other => {
1654            return Err(EvalError::TypeMismatch {
1655                detail: format!(
1656                    "to_char() needs DATE or TIMESTAMP, got {:?}",
1657                    other.data_type()
1658                ),
1659            });
1660        }
1661    };
1662    let (y, mo, d) = civil_from_days(days);
1663    let secs = day_micros / 1_000_000;
1664    let frac = day_micros % 1_000_000;
1665    // div_euclid keeps every value non-negative — the casts below are
1666    // sign-safe by construction. `secs ∈ [0, 86400)`, `frac ∈ [0,
1667    // 1_000_000)`, so all three quantities fit in u32.
1668    let hh24 = u32::try_from(secs / 3600).unwrap_or(0);
1669    let mi = u32::try_from((secs / 60) % 60).unwrap_or(0);
1670    let ss = u32::try_from(secs % 60).unwrap_or(0);
1671    let hh12 = match hh24 % 12 {
1672        0 => 12,
1673        x => x,
1674    };
1675    let ampm = if hh24 < 12 { "AM" } else { "PM" };
1676    let ms = u32::try_from(frac / 1_000).unwrap_or(0); // millisecond
1677    let us = u32::try_from(frac).unwrap_or(0); // microsecond (0..1_000_000)
1678
1679    let mut out = String::with_capacity(fmt.len() + 8);
1680    let bytes = fmt.as_bytes();
1681    let mut i = 0;
1682    // write! against a String never fails — discard the Result.
1683    while i < bytes.len() {
1684        // Try the longest prefixes first so "YYYY" wins over "YY".
1685        let rest = &bytes[i..];
1686        if rest.starts_with(b"YYYY") {
1687            let _ = write!(out, "{y:04}");
1688            i += 4;
1689        } else if rest.starts_with(b"YY") {
1690            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
1691            let yy = (y.rem_euclid(100)) as u32;
1692            let _ = write!(out, "{yy:02}");
1693            i += 2;
1694        } else if rest.starts_with(b"Month") {
1695            out.push_str(MONTH_FULL[(mo - 1) as usize]);
1696            i += 5;
1697        } else if rest.starts_with(b"Mon") {
1698            out.push_str(MONTH_ABBR[(mo - 1) as usize]);
1699            i += 3;
1700        } else if rest.starts_with(b"MM") {
1701            let _ = write!(out, "{mo:02}");
1702            i += 2;
1703        } else if rest.starts_with(b"DD") {
1704            let _ = write!(out, "{d:02}");
1705            i += 2;
1706        } else if rest.starts_with(b"HH24") {
1707            let _ = write!(out, "{hh24:02}");
1708            i += 4;
1709        } else if rest.starts_with(b"HH12") {
1710            let _ = write!(out, "{hh12:02}");
1711            i += 4;
1712        } else if rest.starts_with(b"MI") {
1713            let _ = write!(out, "{mi:02}");
1714            i += 2;
1715        } else if rest.starts_with(b"SS") {
1716            let _ = write!(out, "{ss:02}");
1717            i += 2;
1718        } else if rest.starts_with(b"MS") {
1719            let _ = write!(out, "{ms:03}");
1720            i += 2;
1721        } else if rest.starts_with(b"US") {
1722            let _ = write!(out, "{us:06}");
1723            i += 2;
1724        } else if rest.starts_with(b"AM") || rest.starts_with(b"PM") {
1725            out.push_str(ampm);
1726            i += 2;
1727        } else {
1728            // Pass any non-placeholder byte through verbatim.
1729            out.push(bytes[i] as char);
1730            i += 1;
1731        }
1732    }
1733    Ok(Value::Text(out))
1734}
1735
/// Full English month names, January first. `to_char` indexes this table
/// with `month - 1` when it expands the `Month` placeholder.
const MONTH_FULL: [&str; 12] = [
    "January",
    "February",
    "March",
    "April",
    "May",
    "June",
    "July",
    "August",
    "September",
    "October",
    "November",
    "December",
];
/// Three-letter English month abbreviations, in the same order as
/// `MONTH_FULL`. `to_char` indexes this table with `month - 1` when it
/// expands the `Mon` placeholder.
const MONTH_ABBR: [&str; 12] = [
    "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
];
1753
1754/// `date_trunc(unit, timestamp)` — round a `TIMESTAMP` down to the
1755/// requested calendar boundary (year / month / day / hour / minute /
1756/// second). Returns the truncated `TIMESTAMP`. NULL on either side
1757/// propagates to NULL.
1758fn date_trunc(args: &[Value]) -> Result<Value, EvalError> {
1759    if args.len() != 2 {
1760        return Err(EvalError::TypeMismatch {
1761            detail: format!("date_trunc() takes 2 args, got {}", args.len()),
1762        });
1763    }
1764    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
1765        return Ok(Value::Null);
1766    }
1767    let Value::Text(unit) = &args[0] else {
1768        return Err(EvalError::TypeMismatch {
1769            detail: format!(
1770                "date_trunc() needs a text unit, got {:?}",
1771                args[0].data_type()
1772            ),
1773        });
1774    };
1775    // Both DATE and TIMESTAMP sources are accepted. DATE lifts to
1776    // midnight first; the result is always TIMESTAMP.
1777    let micros = match &args[1] {
1778        Value::Timestamp(t) => *t,
1779        Value::Date(d) => i64::from(*d) * 86_400_000_000,
1780        other => {
1781            return Err(EvalError::TypeMismatch {
1782                detail: format!(
1783                    "date_trunc() needs DATE or TIMESTAMP, got {:?}",
1784                    other.data_type()
1785                ),
1786            });
1787        }
1788    };
1789    let unit_lc = unit.to_ascii_lowercase();
1790    let days = micros.div_euclid(86_400_000_000);
1791    let day_micros = micros.rem_euclid(86_400_000_000);
1792    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
1793    let (y, m, _) = civil_from_days(day_i32);
1794    let truncated = match unit_lc.as_str() {
1795        "year" => i64::from(days_from_civil(y, 1, 1)) * 86_400_000_000,
1796        "month" => i64::from(days_from_civil(y, m, 1)) * 86_400_000_000,
1797        "day" => days * 86_400_000_000,
1798        "hour" => days * 86_400_000_000 + (day_micros / 3_600_000_000) * 3_600_000_000,
1799        "minute" => days * 86_400_000_000 + (day_micros / 60_000_000) * 60_000_000,
1800        "second" => days * 86_400_000_000 + (day_micros / 1_000_000) * 1_000_000,
1801        other => {
1802            return Err(EvalError::TypeMismatch {
1803                detail: format!(
1804                    "unknown date_trunc unit {other:?}; \
1805                     supported: year, month, day, hour, minute, second"
1806                ),
1807            });
1808        }
1809    };
1810    Ok(Value::Timestamp(truncated))
1811}
1812
1813/// PG-style `expr::TYPE` coercion. NULL always casts as NULL.
1814pub fn cast_value(v: Value, target: CastTarget) -> Result<Value, EvalError> {
1815    if matches!(v, Value::Null) {
1816        return Ok(Value::Null);
1817    }
1818    match target {
1819        CastTarget::Vector => cast_to_vector(v),
1820        CastTarget::Text => Ok(Value::Text(value_to_text(&v))),
1821        CastTarget::Int => cast_numeric_to_int(v),
1822        CastTarget::BigInt => cast_numeric_to_bigint(v),
1823        CastTarget::Float => cast_numeric_to_float(v),
1824        CastTarget::Bool => cast_to_bool(v),
1825        CastTarget::Date => cast_to_date(v),
1826        // TIMESTAMP and TIMESTAMPTZ have identical runtime
1827        // representation (i64 microseconds UTC).
1828        CastTarget::Timestamp | CastTarget::Timestamptz => cast_to_timestamp(v),
1829        // v7.9.25 — `expr::INTERVAL`. Currently only TEXT → Interval
1830        // is supported (the mailrs idiom: `$1::INTERVAL` where the
1831        // bound param is a string like `'7 days'`).
1832        CastTarget::Interval => cast_to_interval(v),
1833        // v7.9.25 — `::json` / `::jsonb`. Routes Text → Json
1834        // (validation is the producer's responsibility, same as
1835        // the column-INSERT path).
1836        CastTarget::Json | CastTarget::Jsonb => match v {
1837            Value::Json(s) => Ok(Value::Json(s)),
1838            Value::Text(s) => Ok(Value::Json(s)),
1839            other => Err(EvalError::TypeMismatch {
1840                detail: alloc::format!(
1841                    "::json / ::jsonb only accepts TEXT-shape inputs, got {:?}",
1842                    other.data_type()
1843                ),
1844            }),
1845        },
1846        // v7.9.26 — `::regtype` / `::regclass`. SPG has no
1847        // pg_catalog; surface a clear error.
1848        CastTarget::RegType | CastTarget::RegClass => Err(EvalError::TypeMismatch {
1849            detail: "::regtype / ::regclass not supported on SPG \
1850                 (no pg_catalog); use SHOW TABLES / spg_table_ddl instead"
1851                .into(),
1852        }),
1853        // v7.10.11 — `::TEXT[]`. Decode PG external array form
1854        // when input is Text; pass through unchanged when it is
1855        // already TextArray. Anything else is a type mismatch.
1856        CastTarget::TextArray => match v {
1857            Value::TextArray(items) => Ok(Value::TextArray(items)),
1858            Value::Text(s) => decode_text_array_external(&s).map(Value::TextArray),
1859            other => Err(EvalError::TypeMismatch {
1860                detail: alloc::format!(
1861                    "::TEXT[] only accepts TEXT / TEXT[] inputs, got {:?}",
1862                    other.data_type()
1863                ),
1864            }),
1865        },
1866        // v7.11.13 — `::INT[]` / `::BIGINT[]`. Decode PG external
1867        // form `{1,2,3}` when input is Text; widen TextArray /
1868        // IntArray as appropriate.
1869        CastTarget::IntArray => cast_to_int_array(v),
1870        CastTarget::BigIntArray => cast_to_bigint_array(v),
1871        // v7.12.0 — `::tsvector` / `::tsquery`. Decodes PG external
1872        // form when input is Text; passes through unchanged when the
1873        // input is already the target type. Other inputs are a type
1874        // mismatch. Lexer / Porter stemmer arrive in v7.12.1; the
1875        // external-form cast at v7.12.0 is the path pg_dump and
1876        // direct-literal callers use.
1877        CastTarget::TsVector => match v {
1878            Value::TsVector(items) => Ok(Value::TsVector(items)),
1879            Value::Text(s) => decode_tsvector_external(&s).map(Value::TsVector),
1880            other => Err(EvalError::TypeMismatch {
1881                detail: alloc::format!(
1882                    "::tsvector only accepts TEXT / tsvector inputs, got {:?}",
1883                    other.data_type()
1884                ),
1885            }),
1886        },
1887        CastTarget::TsQuery => match v {
1888            Value::TsQuery(ast) => Ok(Value::TsQuery(ast)),
1889            Value::Text(s) => decode_tsquery_external(&s).map(Value::TsQuery),
1890            other => Err(EvalError::TypeMismatch {
1891                detail: alloc::format!(
1892                    "::tsquery only accepts TEXT / tsquery inputs, got {:?}",
1893                    other.data_type()
1894                ),
1895            }),
1896        },
1897    }
1898}
1899
1900fn cast_to_int_array(v: Value) -> Result<Value, EvalError> {
1901    match v {
1902        Value::IntArray(items) => Ok(Value::IntArray(items)),
1903        Value::BigIntArray(items) => {
1904            let mut out: Vec<Option<i32>> = Vec::with_capacity(items.len());
1905            for item in items {
1906                match item {
1907                    None => out.push(None),
1908                    Some(n) => match i32::try_from(n) {
1909                        Ok(x) => out.push(Some(x)),
1910                        Err(_) => {
1911                            return Err(EvalError::TypeMismatch {
1912                                detail: alloc::format!("::INT[] element {n} overflows i32"),
1913                            });
1914                        }
1915                    },
1916                }
1917            }
1918            Ok(Value::IntArray(out))
1919        }
1920        Value::Text(s) => decode_int_array_external(&s).map(Value::IntArray),
1921        Value::TextArray(items) => {
1922            let mut out: Vec<Option<i32>> = Vec::with_capacity(items.len());
1923            for item in items {
1924                match item {
1925                    None => out.push(None),
1926                    Some(s) => match s.parse::<i32>() {
1927                        Ok(n) => out.push(Some(n)),
1928                        Err(_) => {
1929                            return Err(EvalError::TypeMismatch {
1930                                detail: alloc::format!("::INT[] cannot parse {s:?}"),
1931                            });
1932                        }
1933                    },
1934                }
1935            }
1936            Ok(Value::IntArray(out))
1937        }
1938        other => Err(EvalError::TypeMismatch {
1939            detail: alloc::format!("::INT[] does not accept {:?}", other.data_type()),
1940        }),
1941    }
1942}
1943
1944fn cast_to_bigint_array(v: Value) -> Result<Value, EvalError> {
1945    match v {
1946        Value::BigIntArray(items) => Ok(Value::BigIntArray(items)),
1947        Value::IntArray(items) => Ok(Value::BigIntArray(
1948            items.into_iter().map(|x| x.map(i64::from)).collect(),
1949        )),
1950        Value::Text(s) => decode_bigint_array_external(&s).map(Value::BigIntArray),
1951        Value::TextArray(items) => {
1952            let mut out: Vec<Option<i64>> = Vec::with_capacity(items.len());
1953            for item in items {
1954                match item {
1955                    None => out.push(None),
1956                    Some(s) => match s.parse::<i64>() {
1957                        Ok(n) => out.push(Some(n)),
1958                        Err(_) => {
1959                            return Err(EvalError::TypeMismatch {
1960                                detail: alloc::format!("::BIGINT[] cannot parse {s:?}"),
1961                            });
1962                        }
1963                    },
1964                }
1965            }
1966            Ok(Value::BigIntArray(out))
1967        }
1968        other => Err(EvalError::TypeMismatch {
1969            detail: alloc::format!("::BIGINT[] does not accept {:?}", other.data_type()),
1970        }),
1971    }
1972}
1973
1974fn decode_int_array_external(s: &str) -> Result<Vec<Option<i32>>, EvalError> {
1975    let trimmed = s.trim();
1976    let inner = trimmed
1977        .strip_prefix('{')
1978        .and_then(|x| x.strip_suffix('}'))
1979        .ok_or_else(|| EvalError::TypeMismatch {
1980            detail: alloc::format!("INT[] literal {s:?} must be enclosed in '{{...}}'"),
1981        })?;
1982    if inner.trim().is_empty() {
1983        return Ok(Vec::new());
1984    }
1985    inner
1986        .split(',')
1987        .map(|part| {
1988            let p = part.trim();
1989            if p.eq_ignore_ascii_case("NULL") {
1990                Ok(None)
1991            } else {
1992                p.parse::<i32>()
1993                    .map(Some)
1994                    .map_err(|_| EvalError::TypeMismatch {
1995                        detail: alloc::format!("INT[] element {p:?} is not an i32"),
1996                    })
1997            }
1998        })
1999        .collect()
2000}
2001
2002fn decode_bigint_array_external(s: &str) -> Result<Vec<Option<i64>>, EvalError> {
2003    let trimmed = s.trim();
2004    let inner = trimmed
2005        .strip_prefix('{')
2006        .and_then(|x| x.strip_suffix('}'))
2007        .ok_or_else(|| EvalError::TypeMismatch {
2008            detail: alloc::format!("BIGINT[] literal {s:?} must be enclosed in '{{...}}'"),
2009        })?;
2010    if inner.trim().is_empty() {
2011        return Ok(Vec::new());
2012    }
2013    inner
2014        .split(',')
2015        .map(|part| {
2016            let p = part.trim();
2017            if p.eq_ignore_ascii_case("NULL") {
2018                Ok(None)
2019            } else {
2020                p.parse::<i64>()
2021                    .map(Some)
2022                    .map_err(|_| EvalError::TypeMismatch {
2023                        detail: alloc::format!("BIGINT[] element {p:?} is not an i64"),
2024                    })
2025            }
2026        })
2027        .collect()
2028}
2029
2030/// v7.10.11 — same decoder as `decode_text_array_literal` in
2031/// `lib.rs`, but lives here so the eval-time cast path stays
2032/// inside `spg-engine::eval`. Kept in lock-step with the engine
2033/// `coerce_value` decoder by tests.
2034fn decode_text_array_external(s: &str) -> Result<Vec<Option<String>>, EvalError> {
2035    let trimmed = s.trim();
2036    let inner = trimmed
2037        .strip_prefix('{')
2038        .and_then(|x| x.strip_suffix('}'))
2039        .ok_or_else(|| EvalError::TypeMismatch {
2040            detail: alloc::format!("TEXT[] literal {s:?} must be enclosed in '{{...}}'"),
2041        })?;
2042    let mut out: Vec<Option<String>> = Vec::new();
2043    if inner.trim().is_empty() {
2044        return Ok(out);
2045    }
2046    let bytes = inner.as_bytes();
2047    let mut i = 0;
2048    while i <= bytes.len() {
2049        while i < bytes.len() && (bytes[i] == b' ' || bytes[i] == b'\t') {
2050            i += 1;
2051        }
2052        if i < bytes.len() && bytes[i] == b'"' {
2053            i += 1;
2054            let mut buf = String::new();
2055            while i < bytes.len() && bytes[i] != b'"' {
2056                if bytes[i] == b'\\' && i + 1 < bytes.len() {
2057                    buf.push(bytes[i + 1] as char);
2058                    i += 2;
2059                } else {
2060                    buf.push(bytes[i] as char);
2061                    i += 1;
2062                }
2063            }
2064            if i >= bytes.len() {
2065                return Err(EvalError::TypeMismatch {
2066                    detail: "unterminated quoted element in TEXT[] literal".into(),
2067                });
2068            }
2069            i += 1;
2070            out.push(Some(buf));
2071        } else {
2072            let start = i;
2073            while i < bytes.len() && bytes[i] != b',' {
2074                i += 1;
2075            }
2076            let raw = inner[start..i].trim();
2077            if raw.eq_ignore_ascii_case("NULL") {
2078                out.push(None);
2079            } else {
2080                out.push(Some(raw.to_string()));
2081            }
2082        }
2083        while i < bytes.len() && (bytes[i] == b' ' || bytes[i] == b'\t') {
2084            i += 1;
2085        }
2086        if i >= bytes.len() {
2087            break;
2088        }
2089        if bytes[i] != b',' {
2090            return Err(EvalError::TypeMismatch {
2091                detail: "expected ',' between TEXT[] elements".into(),
2092            });
2093        }
2094        i += 1;
2095    }
2096    Ok(out)
2097}
2098
2099fn cast_to_interval(v: Value) -> Result<Value, EvalError> {
2100    match v {
2101        Value::Interval { months, micros } => Ok(Value::Interval { months, micros }),
2102        Value::Text(s) => {
2103            let (months, micros) = spg_sql::parser::parse_interval_text(&s).ok_or_else(|| {
2104                EvalError::TypeMismatch {
2105                    detail: alloc::format!("cannot parse {s:?} as INTERVAL"),
2106                }
2107            })?;
2108            Ok(Value::Interval { months, micros })
2109        }
2110        other => Err(EvalError::TypeMismatch {
2111            detail: alloc::format!(
2112                "::INTERVAL only accepts TEXT-shape inputs, got {:?}",
2113                other.data_type()
2114            ),
2115        }),
2116    }
2117}
2118
2119fn cast_to_date(v: Value) -> Result<Value, EvalError> {
2120    match v {
2121        Value::Date(d) => Ok(Value::Date(d)),
2122        // Integer literals carry days since the Unix epoch — used by
2123        // the `CURRENT_DATE` AST rewrite to inject the wall clock.
2124        Value::Int(n) => Ok(Value::Date(n)),
2125        Value::BigInt(n) => {
2126            i32::try_from(n)
2127                .map(Value::Date)
2128                .map_err(|_| EvalError::TypeMismatch {
2129                    detail: "bigint days-since-epoch out of DATE range".into(),
2130                })
2131        }
2132        // Timestamp truncates to its day boundary.
2133        Value::Timestamp(t) => {
2134            let days = t.div_euclid(86_400_000_000);
2135            i32::try_from(days)
2136                .map(Value::Date)
2137                .map_err(|_| EvalError::TypeMismatch {
2138                    detail: "timestamp out of DATE range".into(),
2139                })
2140        }
2141        Value::Text(s) => parse_date_literal(&s)
2142            .map(Value::Date)
2143            .ok_or(EvalError::TypeMismatch {
2144                detail: format!("cannot parse {s:?} as DATE (expected YYYY-MM-DD)"),
2145            }),
2146        other => Err(EvalError::TypeMismatch {
2147            detail: format!("cannot cast {:?} to DATE", other.data_type()),
2148        }),
2149    }
2150}
2151
2152fn cast_to_timestamp(v: Value) -> Result<Value, EvalError> {
2153    match v {
2154        Value::Timestamp(t) => Ok(Value::Timestamp(t)),
2155        // Int / BigInt carry microseconds since the Unix epoch — used
2156        // by the `NOW()` / `CURRENT_TIMESTAMP` AST rewrite to inject
2157        // the wall clock as a plain integer literal.
2158        Value::Int(n) => Ok(Value::Timestamp(i64::from(n))),
2159        Value::BigInt(n) => Ok(Value::Timestamp(n)),
2160        // DATE → TIMESTAMP picks midnight on the date.
2161        Value::Date(d) => Ok(Value::Timestamp(i64::from(d) * 86_400_000_000)),
2162        Value::Text(s) => {
2163            parse_timestamp_literal(&s)
2164                .map(Value::Timestamp)
2165                .ok_or(EvalError::TypeMismatch {
2166                    detail: format!(
2167                        "cannot parse {s:?} as TIMESTAMP \
2168                     (expected YYYY-MM-DD[ HH:MM:SS[.ffffff]])"
2169                    ),
2170                })
2171        }
2172        other => Err(EvalError::TypeMismatch {
2173            detail: format!("cannot cast {:?} to TIMESTAMP", other.data_type()),
2174        }),
2175    }
2176}
2177
2178fn value_to_text(v: &Value) -> String {
2179    match v {
2180        // v7.5.0 — Value is #[non_exhaustive]; any future variant
2181        // without explicit text rendering hits the Debug fallback
2182        // at the end.
2183        Value::SmallInt(n) => format!("{n}"),
2184        Value::Int(n) => format!("{n}"),
2185        Value::BigInt(n) => format!("{n}"),
2186        Value::Float(x) => format!("{x}"),
2187        // v4.9: JSON renders identically to Text — both are raw UTF-8.
2188        Value::Text(s) | Value::Json(s) => s.clone(),
2189        Value::Bool(b) => (if *b { "true" } else { "false" }).into(),
2190        Value::Vector(v) => {
2191            let cells: Vec<String> = v.iter().map(|x| format!("{x}")).collect();
2192            format!("[{}]", cells.join(", "))
2193        }
2194        // v6.0.1: render SQ8 cells dequantised, so SELECT output
2195        // matches the pgvector wire shape clients expect. The
2196        // recall envelope already absorbs the ≤ (max-min)/255/2
2197        // dequantisation error.
2198        Value::Sq8Vector(q) => {
2199            let cells: Vec<String> = spg_storage::quantize::dequantize(q)
2200                .iter()
2201                .map(|x| format!("{x}"))
2202                .collect();
2203            format!("[{}]", cells.join(", "))
2204        }
2205        // v6.0.3: HalfVector cells dequantise bit-exactly to f32
2206        // for SELECT output.
2207        Value::HalfVector(h) => {
2208            let cells: Vec<String> = h.to_f32_vec().iter().map(|x| format!("{x}")).collect();
2209            format!("[{}]", cells.join(", "))
2210        }
2211        Value::Numeric { scaled, scale } => format_numeric(*scaled, *scale),
2212        Value::Date(d) => format_date(*d),
2213        Value::Timestamp(t) => format_timestamp(*t),
2214        Value::Interval { months, micros } => format_interval(*months, *micros),
2215        Value::Null => "NULL".into(),
2216        // v7.10.4 — BYTEA renders as PG hex form.
2217        Value::Bytes(b) => format_bytea_hex(b),
2218        // v7.10.9 — TEXT[] / INT[] / BIGINT[] render PG external form.
2219        Value::TextArray(items) => format_text_array(items),
2220        Value::IntArray(items) => format_int_array(items),
2221        Value::BigIntArray(items) => format_bigint_array(items),
2222        // v7.12.0 — tsvector / tsquery render PG external form.
2223        Value::TsVector(lexs) => format_tsvector(lexs),
2224        Value::TsQuery(ast) => format_tsquery(ast),
2225        // v7.5.0 — #[non_exhaustive] fallback for future Value variants.
2226        _ => format!("{v:?}"),
2227    }
2228}
2229
2230/// Render a `Date` (days since epoch) as `YYYY-MM-DD`. Negative values
2231/// for pre-1970 dates render with a leading `-` on the year.
2232pub fn format_date(days: i32) -> String {
2233    let (y, m, d) = civil_from_days(days);
2234    format!("{y:04}-{m:02}-{d:02}")
2235}
2236
2237/// Render a `Timestamp` (microseconds since epoch) as
2238/// `YYYY-MM-DD HH:MM:SS[.fff...]`. Trailing-zero fractional digits are
2239/// dropped; a whole-second value has no fractional part.
2240/// v7.15.0 — PG-canonical TIMESTAMPTZ wire format. Storage is
2241/// the same i64 microseconds UTC as TIMESTAMP, but the canonical
2242/// PG text output appends the session's UTC-offset suffix (`+00`
2243/// for the default UTC session, the form pg_dump emits). Mailrs
2244/// round-8 acceptance criterion: `SELECT col FROM tstz` should
2245/// round-trip to a literal that re-INSERTs without semantic
2246/// drift.
2247pub fn format_timestamptz(micros: i64) -> String {
2248    let base = format_timestamp(micros);
2249    let mut s = String::with_capacity(base.len() + 3);
2250    s.push_str(&base);
2251    s.push_str("+00");
2252    s
2253}
2254
2255pub fn format_timestamp(micros: i64) -> String {
2256    const MICROS_PER_DAY: i64 = 86_400_000_000;
2257    // Split into day + intra-day part with proper floor division so
2258    // negative timestamps render right too.
2259    let days = micros.div_euclid(MICROS_PER_DAY);
2260    let day_micros = micros.rem_euclid(MICROS_PER_DAY);
2261    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
2262    let (y, m, d) = civil_from_days(day_i32);
2263    let secs = day_micros / 1_000_000;
2264    let frac = day_micros % 1_000_000;
2265    let hh = secs / 3600;
2266    let mm = (secs / 60) % 60;
2267    let ss = secs % 60;
2268    if frac == 0 {
2269        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}")
2270    } else {
2271        // Strip trailing zeros from the 6-digit fractional component.
2272        let raw = format!("{frac:06}");
2273        let trimmed = raw.trim_end_matches('0');
2274        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}.{trimmed}")
2275    }
2276}
2277
/// Howard Hinnant's `civil_from_days`: turn a day count relative to the
/// Unix epoch into a proleptic-Gregorian `(year, month, day)` triple.
///
/// The day count is shifted so that day 0 is 0000-03-01, split into
/// 400-year eras of 146_097 days, and resolved inside the era with a
/// March-based month index (0 = March … 11 = February). Both
/// directions of this calendar conversion live in `eval.rs` so the
/// engine never reaches for `std` time facilities.
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
fn civil_from_days(days: i32) -> (i32, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097;
    const EPOCH_SHIFT: i64 = 719_468;

    let shifted = i64::from(days) + EPOCH_SHIFT;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    // The Euclidean remainder lies in [0, 146_097), so it — and every
    // quantity derived from it below — fits comfortably in a u32.
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA) as u32;
    let year_of_era = (day_of_era.saturating_sub(day_of_era / 1460) + day_of_era / 36524
        - day_of_era / 146_096)
        / 365;
    let day_of_year =
        day_of_era.saturating_sub(365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let march_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year.saturating_sub((153 * march_month + 2) / 5) + 1;
    // March-based indices 10 and 11 are January / February, which
    // belong to the following civil year.
    let (month, year_carry) = if march_month < 10 {
        (march_month + 3, 0i64)
    } else {
        (march_month - 9, 1i64)
    };
    let year = i64::from(year_of_era) + era * 400 + year_carry;
    (year as i32, month, day)
}
2299
/// Inverse of `civil_from_days` — converts a proleptic-Gregorian
/// `(year, month, day)` triple to days since 1970-01-01.
///
/// `m` and `d` are not validated here: values outside `1..=12` /
/// `1..=31` simply flow through the arithmetic (a day of 0 is treated
/// like day 1). All intermediate math is done in `i64`, so oversized
/// inputs cannot overflow. A result that does not fit in `i32`
/// saturates toward the bound on its own side: `i32::MIN` for dates
/// too far in the past, `i32::MAX` for dates too far in the future.
pub fn days_from_civil(y: i32, m: u32, d: u32) -> i32 {
    // January and February count as months 10 / 11 of the previous
    // March-based year.
    let y_adj = if m <= 2 {
        i64::from(y) - 1
    } else {
        i64::from(y)
    };
    let era = y_adj.div_euclid(400);
    // Year within the 400-year era, always in [0, 400).
    let yoe = y_adj - era * 400;
    let march_month = i64::from(if m > 2 { m - 3 } else { m + 9 });
    let doy = (153 * march_month + 2) / 5 + i64::from(d.saturating_sub(1));
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
    let total = era * 146_097 + doe - 719_468;
    // Saturate on the side the value actually overflowed on; a fixed
    // `i32::MAX` fallback would turn a far-past date into a far-future one.
    i32::try_from(total).unwrap_or(if total < 0 { i32::MIN } else { i32::MAX })
}
2316
2317/// Parse `YYYY-MM-DD` into a `Date` (days since Unix epoch). Returns
2318/// `None` on shape / numeric failure; the engine surfaces that as a
2319/// `TypeMismatch` with the original text included.
2320pub fn parse_date_literal(s: &str) -> Option<i32> {
2321    let bytes = s.as_bytes();
2322    if bytes.len() != 10 || bytes[4] != b'-' || bytes[7] != b'-' {
2323        return None;
2324    }
2325    let y: i32 = s[0..4].parse().ok()?;
2326    let m: u32 = s[5..7].parse().ok()?;
2327    let d: u32 = s[8..10].parse().ok()?;
2328    if !(1..=12).contains(&m) || !(1..=31).contains(&d) {
2329        return None;
2330    }
2331    Some(days_from_civil(y, m, d))
2332}
2333
2334/// Parse `YYYY-MM-DD[ HH:MM:SS[.ffffff]]` into a `Timestamp`
2335/// (microseconds since Unix epoch). The time portion is optional;
2336/// missing → midnight. The fractional portion accepts 1–6 digits and
2337/// pads with zeros to microseconds.
2338pub fn parse_timestamp_literal(s: &str) -> Option<i64> {
2339    let trimmed = s.trim();
2340    let (date_part, time_part) = match trimmed.find([' ', 'T']) {
2341        Some(i) => (&trimmed[..i], Some(&trimmed[i + 1..])),
2342        None => (trimmed, None),
2343    };
2344    let days = parse_date_literal(date_part)?;
2345    let (day_micros, tz_offset_micros) = match time_part {
2346        None => (0, 0),
2347        Some(t) => parse_time_of_day_micros(t)?,
2348    };
2349    // PG semantics: a TIMESTAMPTZ literal with an explicit offset
2350    // is normalised to UTC for storage. `'12:00:00+09'` means
2351    // 12:00:00 in a UTC+09 zone → 03:00:00 UTC → subtract the
2352    // positive offset (or add the negative one). Storage is i64
2353    // microseconds UTC for both TIMESTAMP and TIMESTAMPTZ (see
2354    // spg-storage::DataType::Timestamptz docs); the wire-level
2355    // round-trip then re-applies the session timezone on the
2356    // SELECT side when format_timestamp is asked for a TZ-aware
2357    // render.
2358    Some(i64::from(days) * 86_400_000_000 + day_micros - tz_offset_micros)
2359}
2360
2361/// v7.15.0 — Parse `HH:MM:SS[.frac][<tz>]` and return
2362/// `(day_micros, tz_offset_micros)` where `day_micros` is the
2363/// local-clock seconds-of-day in microseconds and
2364/// `tz_offset_micros` is the UTC offset (positive = east of
2365/// UTC, negative = west). Caller subtracts the offset to
2366/// normalise to UTC. PG's recognised TZ shapes after the
2367/// seconds (or frac) part:
2368///   * `+OO[:MM]` / `-OO[:MM]` — numeric offset
2369///   * `+OOMM` / `-OOMM` (no colon, less common but legal)
2370///   * ` UTC` / `UTC` / `Z` — explicit zero offset
2371/// Anything else after the seconds = parse failure (the caller
2372/// surfaces as "cannot parse … as TIMESTAMP").
2373fn parse_time_of_day_micros(t: &str) -> Option<(i64, i64)> {
2374    let t = t.trim();
2375    // Detect & strip optional TZ suffix. Anchor on the first
2376    // `+` / `-` AFTER position 8 (so the leading sign on a
2377    // negative offset can't be mistaken for an `HH:MM:SS-OO`
2378    // boundary if the time itself is somehow malformed).
2379    // ` UTC` and trailing `Z` also count as zero-offset TZ tags.
2380    let (core, tz_micros) = if let Some(rest) = t.strip_suffix('Z') {
2381        (rest, 0i64)
2382    } else if let Some(rest) = t
2383        .strip_suffix(" UTC")
2384        .or_else(|| t.strip_suffix("UTC"))
2385    {
2386        (rest, 0i64)
2387    } else if let Some((idx, sign_byte)) = find_offset_sign(t) {
2388        let suffix = &t[idx..];
2389        let micros = parse_tz_offset_suffix(suffix, sign_byte == b'+')?;
2390        (&t[..idx], micros)
2391    } else {
2392        (t, 0i64)
2393    };
2394    let (time, frac_str) = match core.split_once('.') {
2395        Some((a, b)) => (a, Some(b)),
2396        None => (core, None),
2397    };
2398    let bytes = time.as_bytes();
2399    if bytes.len() != 8 || bytes[2] != b':' || bytes[5] != b':' {
2400        return None;
2401    }
2402    let hh: i64 = time[0..2].parse().ok()?;
2403    let mm: i64 = time[3..5].parse().ok()?;
2404    let ss: i64 = time[6..8].parse().ok()?;
2405    if !(0..24).contains(&hh) || !(0..60).contains(&mm) || !(0..60).contains(&ss) {
2406        return None;
2407    }
2408    let frac_micros: i64 = match frac_str {
2409        None => 0,
2410        Some(f) => {
2411            // Pad right with zeros to 6 digits, then truncate extras.
2412            if f.is_empty() || f.len() > 9 {
2413                return None;
2414            }
2415            let mut padded = String::with_capacity(6);
2416            padded.push_str(&f[..f.len().min(6)]);
2417            while padded.len() < 6 {
2418                padded.push('0');
2419            }
2420            padded.parse().ok()?
2421        }
2422    };
2423    Some((
2424        ((hh * 3600 + mm * 60 + ss) * 1_000_000) + frac_micros,
2425        tz_micros,
2426    ))
2427}
2428
/// Locate the sign byte (`+` or `-`) that starts a numeric TZ suffix
/// on an `HH:MM:SS[.fff]` time string.
///
/// Returns the byte index together with the sign byte itself, or
/// `None` when no sign occurs. The scan begins at byte 8, i.e. after
/// the `HH:MM:SS` prefix, so nothing inside the clock portion is ever
/// matched; inputs of eight bytes or fewer always yield `None`.
fn find_offset_sign(t: &str) -> Option<(usize, u8)> {
    /// Length in bytes of the `HH:MM:SS` prefix that is never scanned.
    const CLOCK_LEN: usize = 8;

    t.as_bytes()
        .iter()
        .enumerate()
        .skip(CLOCK_LEN)
        .find(|&(_, &byte)| byte == b'+' || byte == b'-')
        .map(|(index, &byte)| (index, byte))
}
2448
/// Parse `+OO`, `+OO:MM`, `+OOMM`, `-OO`, `-OO:MM`, `-OOMM` into a
/// UTC-offset delta in microseconds.
///
/// `suffix` still includes its leading sign character, which is
/// skipped here; the direction is taken from `is_positive` (the
/// result is negated when it is `false`). Returns `None` when the
/// suffix is empty, when the hour or minute text is not made purely
/// of ASCII digits, when a colon-less body is not exactly two or four
/// bytes long, or when the hour is outside `0..=18` or the minute is
/// outside `0..60`.
fn parse_tz_offset_suffix(suffix: &str, is_positive: bool) -> Option<i64> {
    // Checked slicing throughout: this text comes straight from a
    // user-supplied literal, so it may be empty or contain multi-byte
    // characters, and a plain index slice would panic on those.
    let body = suffix.get(1..)?;
    let (hh_text, mm_text) = match body.split_once(':') {
        Some(pair) => pair,
        None => match body.len() {
            2 => (body, "0"),
            // A three-character body (`+053`) is ambiguous and PG does
            // not emit it; it falls through to the reject arm below.
            4 => (body.get(..2)?, body.get(2..)?),
            _ => return None,
        },
    };
    // Integer parsing alone would accept an extra sign (`++5:00`).
    let only_digits = |text: &str| !text.is_empty() && text.bytes().all(|b| b.is_ascii_digit());
    if !only_digits(hh_text) || !only_digits(mm_text) {
        return None;
    }
    let hh: i64 = hh_text.parse().ok()?;
    let mm: i64 = mm_text.parse().ok()?;
    if !(0..=18).contains(&hh) || !(0..60).contains(&mm) {
        return None;
    }
    let abs = (hh * 3600 + mm * 60) * 1_000_000;
    Some(if is_positive { abs } else { -abs })
}
2480
/// Render an `Interval { months, micros }` in a PG-ish text shape.
///
/// The months part becomes `N year(s)` / `N mon(s)`; the microsecond
/// part becomes `N day(s)` followed by `HH:MM:SS[.frac]` (with a
/// leading `-` when the sub-day remainder is negative and trailing
/// fractional zeros removed). Zero-valued parts are left out, parts
/// are joined with single spaces, and an all-zero interval renders
/// as `0`.
pub fn format_interval(months: i32, micros: i64) -> String {
    const MICROS_PER_SEC: i64 = 1_000_000;
    const MICROS_PER_DAY: i64 = 86_400_000_000;

    /// `"<n> <unit>"`, using the singular unit only for exactly `+1`
    /// (`-1` and every other count take the plural).
    fn counted(n: i64, singular: &str, plural: &str) -> String {
        let label = if n == 1 { singular } else { plural };
        format!("{n} {label}")
    }

    let mut pieces: Vec<String> = Vec::new();

    let (years, mons) = (i64::from(months / 12), i64::from(months % 12));
    if years != 0 {
        pieces.push(counted(years, "year", "years"));
    }
    if mons != 0 {
        pieces.push(counted(mons, "mon", "mons"));
    }

    let (days, clock) = (micros / MICROS_PER_DAY, micros % MICROS_PER_DAY);
    if days != 0 {
        pieces.push(counted(days, "day", "days"));
    }
    if clock != 0 {
        let sign = if clock < 0 { "-" } else { "" };
        // |clock| is below one day, so taking the magnitude cannot overflow.
        let magnitude = clock.abs();
        let whole_secs = magnitude / MICROS_PER_SEC;
        let frac = magnitude % MICROS_PER_SEC;
        let (hh, mm, ss) = (whole_secs / 3600, (whole_secs / 60) % 60, whole_secs % 60);
        let mut text = format!("{sign}{hh:02}:{mm:02}:{ss:02}");
        if frac != 0 {
            let digits = format!("{frac:06}");
            text.push('.');
            text.push_str(digits.trim_end_matches('0'));
        }
        pieces.push(text);
    }

    if pieces.is_empty() {
        String::from("0")
    } else {
        pieces.join(" ")
    }
}
2534
2535/// Add `months` (signed) to a `(year, month, day)` triple using PG's
2536/// clamp-to-last-day rule (so `'2024-01-31' + 1 month` → `'2024-02-29'`).
2537fn add_months_to_civil(y: i32, m: u32, d: u32, months: i32) -> (i32, u32, u32) {
2538    let total_months = i64::from(y) * 12 + i64::from(m) - 1 + i64::from(months);
2539    let new_year = i32::try_from(total_months.div_euclid(12)).unwrap_or(i32::MAX);
2540    let new_month_zero = total_months.rem_euclid(12);
2541    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
2542    let new_month = (new_month_zero as u32) + 1;
2543    let max_day = days_in_month(new_year, new_month);
2544    (new_year, new_month, d.min(max_day))
2545}
2546
/// Number of days in month `m` (1-based) of year `y` under the
/// proleptic Gregorian calendar.
///
/// February is 29 days in leap years (divisible by 4, and either not
/// divisible by 100 or divisible by 400) and 28 otherwise. Any month
/// number that is not a 31-day month or February — including
/// out-of-range values, which callers are expected to normalise
/// beforehand — gets the 30-day answer.
const fn days_in_month(y: i32, m: u32) -> u32 {
    let is_leap_year = y % 4 == 0 && (y % 100 != 0 || y % 400 == 0);
    match m {
        2 if is_leap_year => 29,
        2 => 28,
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        _ => 30,
    }
}
2563
/// v7.10.9 — render a TEXT[] in PG's external array form
/// (`{a,b,NULL}`).
///
/// NULL elements use the bare token `NULL`. A non-NULL element is
/// double-quoted — with `"` and `\` each escaped by a preceding
/// backslash — when it is empty, when it spells `NULL` in any ASCII
/// case, or when it contains a comma, a brace, a double quote, a
/// backslash, or any of the whitespace characters PG's array I/O
/// treats as such (space, tab, newline, carriage return, vertical
/// tab, form feed). Everything else is emitted verbatim. Public so
/// the wire layer can produce the canonical text-mode encoding.
pub fn format_text_array(items: &[Option<String>]) -> String {
    /// Whether `s` must be double-quoted to survive a round trip.
    fn needs_quotes(s: &str) -> bool {
        s.is_empty()
            || s.eq_ignore_ascii_case("NULL")
            || s.chars().any(|c| {
                matches!(
                    c,
                    ',' | '{' | '}' | '"' | '\\' | ' ' | '\t' | '\n' | '\r' | '\x0B' | '\x0C'
                )
            })
    }

    let mut out = String::with_capacity(2 + items.len() * 8);
    out.push('{');
    for (i, item) in items.iter().enumerate() {
        if i > 0 {
            out.push(',');
        }
        match item {
            None => out.push_str("NULL"),
            Some(s) if needs_quotes(s) => {
                out.push('"');
                for c in s.chars() {
                    if c == '"' || c == '\\' {
                        out.push('\\');
                    }
                    out.push(c);
                }
                out.push('"');
            }
            Some(s) => out.push_str(s),
        }
    }
    out.push('}');
    out
}
2601
/// v7.11.14 — render an INT[] in PG's external array form
/// (`{1,2,NULL}`).
///
/// Each element is written in decimal, NULL elements as the bare
/// token `NULL`, separated by commas with no spaces. Integers never
/// need quoting.
pub fn format_int_array(items: &[Option<i32>]) -> String {
    let cells: Vec<String> = items
        .iter()
        .map(|cell| match cell {
            Some(n) => n.to_string(),
            None => String::from("NULL"),
        })
        .collect();
    format!("{{{}}}", cells.join(","))
}
2620
/// v7.11.14 — render a BIGINT[] in PG's external array form
/// (`{1,2,NULL}`).
///
/// Each element is written in decimal, NULL elements as the bare
/// token `NULL`, separated by commas with no spaces.
pub fn format_bigint_array(items: &[Option<i64>]) -> String {
    let mut rendered = String::with_capacity(2 + items.len() * 6);
    rendered.push('{');
    // Empty before the first element, a comma before every later one.
    let mut separator = "";
    for cell in items {
        rendered.push_str(separator);
        separator = ",";
        if let Some(n) = cell {
            rendered.push_str(&n.to_string());
        } else {
            rendered.push_str("NULL");
        }
    }
    rendered.push('}');
    rendered
}
2638
2639/// v7.12.0 — render a `tsvector` in PG's external form:
2640/// `'lex':1,2A 'word':3` (single-quoted lexemes, optional
2641/// `:positions`, optional weight letter `A/B/C/D` per position).
2642/// Lexemes already arrive sorted + deduped from the engine. Used
2643/// by the wire layer (OID 3614) and by SELECT-text output.
2644pub fn format_tsvector(lexs: &[TsLexeme]) -> String {
2645    let mut out = String::with_capacity(lexs.len() * 12);
2646    for (i, l) in lexs.iter().enumerate() {
2647        if i > 0 {
2648            out.push(' ');
2649        }
2650        out.push('\'');
2651        for c in l.word.chars() {
2652            if c == '\'' {
2653                out.push('\'');
2654            }
2655            out.push(c);
2656        }
2657        out.push('\'');
2658        if !l.positions.is_empty() {
2659            for (pi, p) in l.positions.iter().enumerate() {
2660                out.push(if pi == 0 { ':' } else { ',' });
2661                out.push_str(&p.to_string());
2662            }
2663            // v7.12.0 — weight is per-lexeme (the v7.12 design
2664            // collapses PG's per-position weight into one letter).
2665            // Emit once after the last position; default `D`
2666            // (weight=0) stays implicit.
2667            match l.weight {
2668                3 => out.push('A'),
2669                2 => out.push('B'),
2670                1 => out.push('C'),
2671                _ => {}
2672            }
2673        }
2674    }
2675    out
2676}
2677
2678/// v7.12.0 — render a `tsquery` in PG's external form. Operator
2679/// precedence: `!` > `&` > `|`. Phrase distance shown as `<N>`.
2680pub fn format_tsquery(ast: &TsQueryAst) -> String {
2681    fn go(ast: &TsQueryAst, parent_prec: u8, out: &mut String) {
2682        // 0 = top, 1 = OR, 2 = AND, 3 = NOT/Phrase, 4 = atom.
2683        let (own_prec, write_self): (u8, &dyn Fn(&mut String)) = match ast {
2684            TsQueryAst::Or(_, _) => (1, &|_| {}),
2685            TsQueryAst::And(_, _) | TsQueryAst::Phrase { .. } => (2, &|_| {}),
2686            TsQueryAst::Not(_) => (3, &|_| {}),
2687            TsQueryAst::Term { .. } => (4, &|_| {}),
2688        };
2689        let need_parens = own_prec < parent_prec;
2690        if need_parens {
2691            out.push('(');
2692        }
2693        match ast {
2694            TsQueryAst::Term { word, .. } => {
2695                out.push('\'');
2696                for c in word.chars() {
2697                    if c == '\'' {
2698                        out.push('\'');
2699                    }
2700                    out.push(c);
2701                }
2702                out.push('\'');
2703            }
2704            TsQueryAst::And(a, b) => {
2705                go(a, own_prec, out);
2706                out.push_str(" & ");
2707                go(b, own_prec, out);
2708            }
2709            TsQueryAst::Or(a, b) => {
2710                go(a, own_prec, out);
2711                out.push_str(" | ");
2712                go(b, own_prec, out);
2713            }
2714            TsQueryAst::Not(x) => {
2715                out.push('!');
2716                go(x, own_prec, out);
2717            }
2718            TsQueryAst::Phrase {
2719                left,
2720                right,
2721                distance,
2722            } => {
2723                go(left, own_prec, out);
2724                out.push_str(&alloc::format!(" <{distance}> "));
2725                go(right, own_prec, out);
2726            }
2727        }
2728        write_self(out);
2729        if need_parens {
2730            out.push(')');
2731        }
2732    }
2733    let mut out = String::new();
2734    go(ast, 0, &mut out);
2735    out
2736}
2737
2738/// v7.12.0 — decode PG external form `'word':1,2A 'other':3` into
2739/// a `Vec<TsLexeme>`. Lexemes are sorted ascending by `word` (with
2740/// duplicates merged on positions) so the output matches the
2741/// engine invariant. Empty input yields an empty vector.
2742///
2743/// v7.12.0 only ships the cast-literal entry. Full `to_tsvector`
2744/// (Unicode word-split + Porter stemming + stopwords) lands in
2745/// v7.12.1.
2746pub fn decode_tsvector_external(s: &str) -> Result<Vec<TsLexeme>, EvalError> {
2747    let mut out: Vec<TsLexeme> = Vec::new();
2748    let mut i = 0;
2749    let bytes = s.as_bytes();
2750    while i < bytes.len() {
2751        while i < bytes.len() && bytes[i].is_ascii_whitespace() {
2752            i += 1;
2753        }
2754        if i >= bytes.len() {
2755            break;
2756        }
2757        // Quoted form `'word'` (with embedded `''` for a literal
2758        // single quote, mirroring PG).
2759        let word = if bytes[i] == b'\'' {
2760            i += 1;
2761            let mut w = String::new();
2762            loop {
2763                if i >= bytes.len() {
2764                    return Err(EvalError::TypeMismatch {
2765                        detail: "tsvector literal: unterminated quoted lexeme".into(),
2766                    });
2767                }
2768                let b = bytes[i];
2769                if b == b'\'' {
2770                    if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
2771                        w.push('\'');
2772                        i += 2;
2773                    } else {
2774                        i += 1;
2775                        break;
2776                    }
2777                } else {
2778                    w.push(b as char);
2779                    i += 1;
2780                }
2781            }
2782            w
2783        } else {
2784            // Bare form — read until whitespace, ':' or end.
2785            let start = i;
2786            while i < bytes.len() && !bytes[i].is_ascii_whitespace() && bytes[i] != b':' {
2787                i += 1;
2788            }
2789            core::str::from_utf8(&bytes[start..i])
2790                .map_err(|_| EvalError::TypeMismatch {
2791                    detail: "tsvector literal: non-UTF-8 lexeme".into(),
2792                })?
2793                .to_string()
2794        };
2795        if word.is_empty() {
2796            return Err(EvalError::TypeMismatch {
2797                detail: "tsvector literal: empty lexeme".into(),
2798            });
2799        }
2800        // Optional `:pos[,pos][,pos]`. Each position is u16; each
2801        // may carry a trailing weight letter A/B/C/D.
2802        let mut positions: Vec<u16> = Vec::new();
2803        let mut weight: u8 = 0;
2804        if i < bytes.len() && bytes[i] == b':' {
2805            i += 1;
2806            loop {
2807                let start = i;
2808                while i < bytes.len() && bytes[i].is_ascii_digit() {
2809                    i += 1;
2810                }
2811                if start == i {
2812                    return Err(EvalError::TypeMismatch {
2813                        detail: "tsvector literal: expected digit after ':'".into(),
2814                    });
2815                }
2816                let num: u16 = core::str::from_utf8(&bytes[start..i])
2817                    .expect("ascii digits")
2818                    .parse()
2819                    .map_err(|_| EvalError::TypeMismatch {
2820                        detail: alloc::format!(
2821                            "tsvector literal: position {} overflows u16",
2822                            core::str::from_utf8(&bytes[start..i]).unwrap_or("?")
2823                        ),
2824                    })?;
2825                positions.push(num);
2826                if i < bytes.len() {
2827                    let w = bytes[i];
2828                    if matches!(w, b'A' | b'B' | b'C' | b'D') {
2829                        weight = match w {
2830                            b'A' => 3,
2831                            b'B' => 2,
2832                            b'C' => 1,
2833                            _ => 0,
2834                        };
2835                        i += 1;
2836                    }
2837                }
2838                if i < bytes.len() && bytes[i] == b',' {
2839                    i += 1;
2840                    continue;
2841                }
2842                break;
2843            }
2844        }
2845        positions.sort_unstable();
2846        positions.dedup();
2847        // Merge into the output vector — sorted insert by word,
2848        // duplicate words merge positions.
2849        match out.binary_search_by(|l| l.word.as_str().cmp(word.as_str())) {
2850            Ok(idx) => {
2851                for p in positions {
2852                    if !out[idx].positions.contains(&p) {
2853                        out[idx].positions.push(p);
2854                    }
2855                }
2856                out[idx].positions.sort_unstable();
2857                if weight != 0 {
2858                    out[idx].weight = weight;
2859                }
2860            }
2861            Err(idx) => {
2862                out.insert(
2863                    idx,
2864                    TsLexeme {
2865                        word,
2866                        positions,
2867                        weight,
2868                    },
2869                );
2870            }
2871        }
2872    }
2873    Ok(out)
2874}
2875
2876/// v7.12.0 — decode PG external form `'foo' & 'bar' | !'baz'`
2877/// into a `TsQueryAst`. v7.12.0 supports the canonical
2878/// `to_tsquery` surface: single-quoted lexemes, `&` / `|` / `!`,
2879/// parens, and phrase `<N>`. Bare lexemes are accepted too. Full
2880/// `plainto_tsquery` / `websearch_to_tsquery` arrive in v7.12.1.
2881pub fn decode_tsquery_external(s: &str) -> Result<TsQueryAst, EvalError> {
2882    let mut p = TsQueryParser {
2883        bytes: s.as_bytes(),
2884        pos: 0,
2885    };
2886    p.skip_ws();
2887    if p.pos >= p.bytes.len() {
2888        return Err(EvalError::TypeMismatch {
2889            detail: "tsquery literal: empty".into(),
2890        });
2891    }
2892    let ast = p.parse_or()?;
2893    p.skip_ws();
2894    if p.pos < p.bytes.len() {
2895        return Err(EvalError::TypeMismatch {
2896            detail: alloc::format!("tsquery literal: trailing garbage at offset {}", p.pos),
2897        });
2898    }
2899    Ok(ast)
2900}
2901
2902struct TsQueryParser<'a> {
2903    bytes: &'a [u8],
2904    pos: usize,
2905}
2906
2907impl<'a> TsQueryParser<'a> {
2908    fn skip_ws(&mut self) {
2909        while self.pos < self.bytes.len() && self.bytes[self.pos].is_ascii_whitespace() {
2910            self.pos += 1;
2911        }
2912    }
2913    fn peek(&self) -> Option<u8> {
2914        self.bytes.get(self.pos).copied()
2915    }
2916    fn parse_or(&mut self) -> Result<TsQueryAst, EvalError> {
2917        let mut lhs = self.parse_and()?;
2918        loop {
2919            self.skip_ws();
2920            if self.peek() != Some(b'|') {
2921                return Ok(lhs);
2922            }
2923            self.pos += 1;
2924            let rhs = self.parse_and()?;
2925            lhs = TsQueryAst::Or(Box::new(lhs), Box::new(rhs));
2926        }
2927    }
2928    fn parse_and(&mut self) -> Result<TsQueryAst, EvalError> {
2929        let mut lhs = self.parse_unary()?;
2930        loop {
2931            self.skip_ws();
2932            match self.peek() {
2933                Some(b'&') => {
2934                    self.pos += 1;
2935                    let rhs = self.parse_unary()?;
2936                    lhs = TsQueryAst::And(Box::new(lhs), Box::new(rhs));
2937                }
2938                Some(b'<') => {
2939                    // Phrase distance `<N>`.
2940                    self.pos += 1;
2941                    let start = self.pos;
2942                    while self.pos < self.bytes.len() && self.bytes[self.pos].is_ascii_digit() {
2943                        self.pos += 1;
2944                    }
2945                    if start == self.pos || self.peek() != Some(b'>') {
2946                        return Err(EvalError::TypeMismatch {
2947                            detail: "tsquery literal: malformed <N> phrase operator".into(),
2948                        });
2949                    }
2950                    let n: u16 = core::str::from_utf8(&self.bytes[start..self.pos])
2951                        .expect("ascii digits")
2952                        .parse()
2953                        .map_err(|_| EvalError::TypeMismatch {
2954                            detail: "tsquery literal: phrase distance overflows u16".into(),
2955                        })?;
2956                    self.pos += 1; // consume '>'
2957                    let rhs = self.parse_unary()?;
2958                    lhs = TsQueryAst::Phrase {
2959                        left: Box::new(lhs),
2960                        right: Box::new(rhs),
2961                        distance: n,
2962                    };
2963                }
2964                _ => return Ok(lhs),
2965            }
2966        }
2967    }
2968    fn parse_unary(&mut self) -> Result<TsQueryAst, EvalError> {
2969        self.skip_ws();
2970        if self.peek() == Some(b'!') {
2971            self.pos += 1;
2972            let inner = self.parse_unary()?;
2973            return Ok(TsQueryAst::Not(Box::new(inner)));
2974        }
2975        self.parse_atom()
2976    }
2977    fn parse_atom(&mut self) -> Result<TsQueryAst, EvalError> {
2978        self.skip_ws();
2979        match self.peek() {
2980            Some(b'(') => {
2981                self.pos += 1;
2982                let inner = self.parse_or()?;
2983                self.skip_ws();
2984                if self.peek() != Some(b')') {
2985                    return Err(EvalError::TypeMismatch {
2986                        detail: "tsquery literal: missing ')'".into(),
2987                    });
2988                }
2989                self.pos += 1;
2990                Ok(inner)
2991            }
2992            Some(b'\'') => {
2993                self.pos += 1;
2994                let mut w = String::new();
2995                loop {
2996                    match self.peek() {
2997                        None => {
2998                            return Err(EvalError::TypeMismatch {
2999                                detail: "tsquery literal: unterminated quoted lexeme".into(),
3000                            });
3001                        }
3002                        Some(b'\'') => {
3003                            if self.bytes.get(self.pos + 1) == Some(&b'\'') {
3004                                w.push('\'');
3005                                self.pos += 2;
3006                            } else {
3007                                self.pos += 1;
3008                                break;
3009                            }
3010                        }
3011                        Some(b) => {
3012                            w.push(b as char);
3013                            self.pos += 1;
3014                        }
3015                    }
3016                }
3017                // Optional `:WEIGHT_MASK` (digit-mask) — v7.12.0
3018                // accepts but always stores 0 (any).
3019                self.skip_weight_suffix();
3020                Ok(TsQueryAst::Term {
3021                    word: w,
3022                    weight_mask: 0,
3023                })
3024            }
3025            Some(b) if b.is_ascii_alphanumeric() || b == b'_' => {
3026                let start = self.pos;
3027                while self.pos < self.bytes.len() {
3028                    let c = self.bytes[self.pos];
3029                    if c.is_ascii_alphanumeric() || c == b'_' {
3030                        self.pos += 1;
3031                    } else {
3032                        break;
3033                    }
3034                }
3035                let w = core::str::from_utf8(&self.bytes[start..self.pos])
3036                    .map_err(|_| EvalError::TypeMismatch {
3037                        detail: "tsquery literal: non-UTF-8 lexeme".into(),
3038                    })?
3039                    .to_string();
3040                self.skip_weight_suffix();
3041                Ok(TsQueryAst::Term {
3042                    word: w,
3043                    weight_mask: 0,
3044                })
3045            }
3046            Some(b) => Err(EvalError::TypeMismatch {
3047                detail: alloc::format!(
3048                    "tsquery literal: unexpected byte {:?} at offset {}",
3049                    b as char,
3050                    self.pos
3051                ),
3052            }),
3053            None => Err(EvalError::TypeMismatch {
3054                detail: "tsquery literal: expected term".into(),
3055            }),
3056        }
3057    }
3058    fn skip_weight_suffix(&mut self) {
3059        if self.peek() != Some(b':') {
3060            return;
3061        }
3062        self.pos += 1;
3063        while let Some(b) = self.peek() {
3064            if matches!(
3065                b,
3066                b'A' | b'B' | b'C' | b'D' | b'a' | b'b' | b'c' | b'd' | b'*'
3067            ) || b.is_ascii_digit()
3068            {
3069                self.pos += 1;
3070            } else {
3071                break;
3072            }
3073        }
3074    }
3075}
3076
/// v7.10.4 — format a BYTEA payload the way PG's hex output does:
/// a literal `\x` followed by two lowercase hex digits per byte.
/// Public so the wire layer can emit the canonical bytea-as-text form.
pub fn format_bytea_hex(b: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut text = String::with_capacity(2 + 2 * b.len());
    text.push_str("\\x");
    // High nibble first, then low nibble, for every input byte.
    text.extend(
        b.iter()
            .flat_map(|&octet| [octet >> 4, octet & 0x0F])
            .map(|nibble| char::from(DIGITS[usize::from(nibble)])),
    );
    text
}
3090
/// Produce the decimal text for a `Numeric { scaled, scale }` pair.
///
/// With `scale == 0` the integer is printed as-is. Otherwise the digits of
/// `|scaled|` are split so that exactly `scale` of them follow the decimal
/// point; when there are not enough digits the integer part is `0` and the
/// fraction is left-padded with zeros. A leading `-` is emitted whenever
/// `scaled` is negative.
pub fn format_numeric(scaled: i128, scale: u8) -> String {
    if scale == 0 {
        return scaled.to_string();
    }
    let digits = scaled.unsigned_abs().to_string();
    let frac_len = usize::from(scale);
    let mut text = String::with_capacity(digits.len() + 3);
    if scaled < 0 {
        text.push('-');
    }
    match digits.len().checked_sub(frac_len) {
        // More digits than the scale: the surplus forms the integer part.
        Some(int_len) if int_len > 0 => {
            let (whole, frac) = digits.split_at(int_len);
            text.push_str(whole);
            text.push('.');
            text.push_str(frac);
        }
        // All digits are fractional: emit "0." and zero-pad to `scale`.
        _ => {
            text.push_str("0.");
            text.extend(core::iter::repeat('0').take(frac_len.saturating_sub(digits.len())));
            text.push_str(&digits);
        }
    }
    text
}
3122
3123fn cast_numeric_to_int(v: Value) -> Result<Value, EvalError> {
3124    match v {
3125        Value::Int(n) => Ok(Value::Int(n)),
3126        Value::BigInt(n) => i32::try_from(n)
3127            .map(Value::Int)
3128            .map_err(|_| EvalError::TypeMismatch {
3129                detail: format!("bigint {n} does not fit in int"),
3130            }),
3131        #[allow(clippy::cast_possible_truncation)]
3132        Value::Float(x) => Ok(Value::Int(x as i32)),
3133        Value::Text(s) => {
3134            s.trim()
3135                .parse::<i32>()
3136                .map(Value::Int)
3137                .map_err(|_| EvalError::TypeMismatch {
3138                    detail: format!("cannot parse {s:?} as int"),
3139                })
3140        }
3141        Value::Bool(b) => Ok(Value::Int(i32::from(b))),
3142        other => Err(EvalError::TypeMismatch {
3143            detail: format!("cannot cast {:?} to int", other.data_type()),
3144        }),
3145    }
3146}
3147
3148fn cast_numeric_to_bigint(v: Value) -> Result<Value, EvalError> {
3149    match v {
3150        Value::Int(n) => Ok(Value::BigInt(i64::from(n))),
3151        Value::BigInt(n) => Ok(Value::BigInt(n)),
3152        #[allow(clippy::cast_possible_truncation)]
3153        Value::Float(x) => Ok(Value::BigInt(x as i64)),
3154        Value::Text(s) => {
3155            s.trim()
3156                .parse::<i64>()
3157                .map(Value::BigInt)
3158                .map_err(|_| EvalError::TypeMismatch {
3159                    detail: format!("cannot parse {s:?} as bigint"),
3160                })
3161        }
3162        Value::Bool(b) => Ok(Value::BigInt(i64::from(b))),
3163        other => Err(EvalError::TypeMismatch {
3164            detail: format!("cannot cast {:?} to bigint", other.data_type()),
3165        }),
3166    }
3167}
3168
3169fn cast_numeric_to_float(v: Value) -> Result<Value, EvalError> {
3170    match v {
3171        Value::Int(n) => Ok(Value::Float(f64::from(n))),
3172        #[allow(clippy::cast_precision_loss)]
3173        Value::BigInt(n) => Ok(Value::Float(n as f64)),
3174        Value::Float(x) => Ok(Value::Float(x)),
3175        Value::Text(s) => {
3176            s.trim()
3177                .parse::<f64>()
3178                .map(Value::Float)
3179                .map_err(|_| EvalError::TypeMismatch {
3180                    detail: format!("cannot parse {s:?} as float"),
3181                })
3182        }
3183        other => Err(EvalError::TypeMismatch {
3184            detail: format!("cannot cast {:?} to float", other.data_type()),
3185        }),
3186    }
3187}
3188
3189fn cast_to_bool(v: Value) -> Result<Value, EvalError> {
3190    match v {
3191        Value::Bool(b) => Ok(Value::Bool(b)),
3192        Value::Int(n) => Ok(Value::Bool(n != 0)),
3193        Value::BigInt(n) => Ok(Value::Bool(n != 0)),
3194        Value::Text(s) => {
3195            let lo = s.trim().to_ascii_lowercase();
3196            match lo.as_str() {
3197                "true" | "t" | "yes" | "y" | "1" | "on" => Ok(Value::Bool(true)),
3198                "false" | "f" | "no" | "n" | "0" | "off" => Ok(Value::Bool(false)),
3199                _ => Err(EvalError::TypeMismatch {
3200                    detail: format!("cannot parse {s:?} as bool"),
3201                }),
3202            }
3203        }
3204        other => Err(EvalError::TypeMismatch {
3205            detail: format!("cannot cast {:?} to bool", other.data_type()),
3206        }),
3207    }
3208}
3209
3210/// Parse a `Value::Text("[1.0, 2.0, 3.0]")` into a `Value::Vector(..)`. Mirrors
3211/// pgvector's `'[..]'::vector` cast. NULL casts as NULL.
3212pub fn cast_to_vector(v: Value) -> Result<Value, EvalError> {
3213    match v {
3214        Value::Null => Ok(Value::Null),
3215        Value::Vector(v) => Ok(Value::Vector(v)),
3216        Value::Text(s) => parse_vector_text(&s)
3217            .map(Value::Vector)
3218            .ok_or(EvalError::TypeMismatch {
3219                detail: format!("cannot parse {s:?} as a vector literal"),
3220            }),
3221        other => Err(EvalError::TypeMismatch {
3222            detail: format!("::vector requires text input, got {:?}", other.data_type()),
3223        }),
3224    }
3225}
3226
/// Parse `"[1.0, 2.0, -3]"` into `Vec<f32>`.
///
/// Surrounding whitespace is ignored, both around the brackets and around
/// each comma-separated element. `"[]"` (optionally with inner whitespace)
/// yields an empty vector.
///
/// Returns `None` on malformed input: missing brackets, an empty or
/// unparsable element, or an element that is not a finite number. Rust's
/// `f32` parser accepts spellings such as `nan` / `inf` and overflows
/// `1e40` to infinity; pgvector rejects NaN and infinite elements, so they
/// are rejected here as well.
fn parse_vector_text(s: &str) -> Option<Vec<f32>> {
    let inner = s.trim().strip_prefix('[')?.strip_suffix(']')?.trim();
    if inner.is_empty() {
        return Some(Vec::new());
    }
    inner
        .split(',')
        .map(|part| part.trim().parse::<f32>().ok().filter(|f| f.is_finite()))
        .collect()
}
3242
3243fn literal_to_value(l: &Literal) -> Value {
3244    match l {
3245        Literal::Integer(n) => {
3246            if let Ok(small) = i32::try_from(*n) {
3247                Value::Int(small)
3248            } else {
3249                Value::BigInt(*n)
3250            }
3251        }
3252        Literal::Float(x) => Value::Float(*x),
3253        Literal::String(s) => Value::Text(s.clone()),
3254        Literal::Vector(v) => Value::Vector(v.clone()),
3255        Literal::Bool(b) => Value::Bool(*b),
3256        Literal::Null => Value::Null,
3257        Literal::Interval { months, micros, .. } => Value::Interval {
3258            months: *months,
3259            micros: *micros,
3260        },
3261    }
3262}
3263
3264fn resolve_column(c: &ColumnName, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
3265    if let Some(q) = &c.qualifier {
3266        // Multi-table evaluation (joins): the synthesised schema uses
3267        // composite column names "alias.column" so we look that up
3268        // directly. Falls back to the single-table case below if the
3269        // composite isn't present.
3270        let composite = alloc::format!("{q}.{name}", name = c.name);
3271        if let Some(pos) = ctx.columns.iter().position(|s| s.name == composite) {
3272            return Ok(row.values[pos].clone());
3273        }
3274        let expected = ctx.table_alias.ok_or_else(|| EvalError::UnknownQualifier {
3275            qualifier: q.clone(),
3276        })?;
3277        if q != expected {
3278            return Err(EvalError::UnknownQualifier {
3279                qualifier: q.clone(),
3280            });
3281        }
3282    }
3283    if let Some(pos) = ctx.columns.iter().position(|s| s.name == c.name) {
3284        return Ok(row.values[pos].clone());
3285    }
3286    // Bare-name fallback for joined schemas: match any single composite
3287    // column ending in ".<name>"; ambiguity is an error.
3288    let suffix = alloc::format!(".{name}", name = c.name);
3289    let mut matches = ctx
3290        .columns
3291        .iter()
3292        .enumerate()
3293        .filter(|(_, s)| s.name.ends_with(&suffix));
3294    let first = matches.next();
3295    let extra = matches.next();
3296    match (first, extra) {
3297        (Some((pos, _)), None) => Ok(row.values[pos].clone()),
3298        (Some(_), Some(_)) => Err(EvalError::TypeMismatch {
3299            detail: alloc::format!("ambiguous column reference: {}", c.name),
3300        }),
3301        _ => Err(EvalError::ColumnNotFound {
3302            name: c.name.clone(),
3303        }),
3304    }
3305}
3306
3307fn apply_unary(op: UnOp, v: Value) -> Result<Value, EvalError> {
3308    match (op, v) {
3309        (_, Value::Null) => Ok(Value::Null),
3310        (UnOp::Neg, Value::Int(n)) => {
3311            n.checked_neg()
3312                .map(Value::Int)
3313                .ok_or(EvalError::TypeMismatch {
3314                    detail: "integer overflow on unary -".into(),
3315                })
3316        }
3317        (UnOp::Neg, Value::BigInt(n)) => {
3318            n.checked_neg()
3319                .map(Value::BigInt)
3320                .ok_or(EvalError::TypeMismatch {
3321                    detail: "bigint overflow on unary -".into(),
3322                })
3323        }
3324        (UnOp::Neg, Value::Float(x)) => Ok(Value::Float(-x)),
3325        (UnOp::Neg, other) => Err(EvalError::TypeMismatch {
3326            detail: format!("unary - applied to {:?}", other.data_type()),
3327        }),
3328        (UnOp::Not, Value::Bool(b)) => Ok(Value::Bool(!b)),
3329        (UnOp::Not, other) => Err(EvalError::TypeMismatch {
3330            detail: format!("NOT applied to {:?}", other.data_type()),
3331        }),
3332    }
3333}
3334
3335/// v7.9.27b — true when two values are "not distinct" per PG:
3336/// both NULL counts as equal; otherwise reduces to regular Eq.
3337fn values_not_distinct(l: &Value, r: &Value) -> bool {
3338    match (l, r) {
3339        (Value::Null, Value::Null) => true,
3340        (Value::Null, _) | (_, Value::Null) => false,
3341        _ => l == r,
3342    }
3343}
3344
3345fn apply_binary(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
3346    // SQL three-valued logic for AND / OR with NULL is special — handle before
3347    // the general NULL-propagation rule.
3348    if let BinOp::And = op {
3349        return and_3vl(l, r);
3350    }
3351    if let BinOp::Or = op {
3352        return or_3vl(l, r);
3353    }
3354    // v7.9.27b — IS [NOT] DISTINCT FROM. NULL-safe equality:
3355    // `NULL IS NOT DISTINCT FROM NULL` → true. mailrs pg_dump.
3356    if let BinOp::IsNotDistinctFrom = op {
3357        return Ok(Value::Bool(values_not_distinct(&l, &r)));
3358    }
3359    if let BinOp::IsDistinctFrom = op {
3360        return Ok(Value::Bool(!values_not_distinct(&l, &r)));
3361    }
3362    // Everything else: any NULL operand → NULL.
3363    if l.is_null() || r.is_null() {
3364        return Ok(Value::Null);
3365    }
3366    // NUMERIC arithmetic and comparisons run in fixed-point; promote
3367    // integers to a common NUMERIC scale and stay in i128 throughout.
3368    if matches!(l, Value::Numeric { .. }) || matches!(r, Value::Numeric { .. }) {
3369        return apply_binary_numeric(op, l, r);
3370    }
3371    // Date / Timestamp arithmetic. PG semantics:
3372    //   * date + int      → date  (int is days)
3373    //   * int + date      → date
3374    //   * date - int      → date
3375    //   * date - date     → int   (days, signed)
3376    //   * timestamp - timestamp → bigint (microseconds, signed)
3377    // Other date/time math (`timestamp + int`, INTERVAL) lands later.
3378    if let Some(result) = apply_binary_calendar(op, &l, &r)? {
3379        return Ok(result);
3380    }
3381    match op {
3382        BinOp::Add => arith(l, r, i64::checked_add, |a, b| a + b, "+"),
3383        BinOp::Sub => arith(l, r, i64::checked_sub, |a, b| a - b, "-"),
3384        BinOp::Mul => arith(l, r, i64::checked_mul, |a, b| a * b, "*"),
3385        BinOp::Div => div_op(l, r),
3386        BinOp::L2Distance => l2_distance(l, r),
3387        BinOp::InnerProduct => inner_product(l, r),
3388        BinOp::CosineDistance => cosine_distance(l, r),
3389        BinOp::Concat => Ok(text_concat(&l, &r)),
3390        BinOp::JsonGet => crate::json::path_get(&l, &r, false),
3391        BinOp::JsonGetText => crate::json::path_get(&l, &r, true),
3392        BinOp::JsonGetPath => crate::json::path_walk(&l, &r, false),
3393        BinOp::JsonGetPathText => crate::json::path_walk(&l, &r, true),
3394        BinOp::JsonContains => crate::json::contains(&l, &r),
3395        // v7.12.2 — `@@` match. NULL on either side → NULL; PG
3396        // accepts both orderings so we normalise.
3397        BinOp::TsMatch => ts_match(l, r),
3398        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
3399            compare(op, &l, &r)
3400        }
3401        BinOp::And | BinOp::Or | BinOp::IsDistinctFrom | BinOp::IsNotDistinctFrom => {
3402            unreachable!("handled above")
3403        }
3404    }
3405}
3406
3407/// Calendar arithmetic. Returns `Some(value)` when the operand pair
3408/// is a date/time combo this function understands, `None` to let the
3409/// caller fall through to the regular numeric / text paths.
3410fn apply_binary_calendar(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
3411    let int_value = |v: &Value| -> Option<i64> {
3412        match v {
3413            Value::SmallInt(n) => Some(i64::from(*n)),
3414            Value::Int(n) => Some(i64::from(*n)),
3415            Value::BigInt(n) => Some(*n),
3416            _ => None,
3417        }
3418    };
3419    // Most-specific cases first — DATE-DATE / TS-TS subtraction before
3420    // DATE-integer subtraction, otherwise the latter swallows the
3421    // former with an `int_value(Date) = None` no-op fall-through.
3422    match (l, r) {
3423        (Value::Date(a), Value::Date(b)) if op == BinOp::Sub => {
3424            return Ok(Some(Value::BigInt(i64::from(*a) - i64::from(*b))));
3425        }
3426        (Value::Timestamp(a), Value::Timestamp(b)) if op == BinOp::Sub => {
3427            let delta = a.checked_sub(*b).ok_or(EvalError::TypeMismatch {
3428                detail: "TIMESTAMP - TIMESTAMP overflows i64 microseconds".into(),
3429            })?;
3430            return Ok(Some(Value::BigInt(delta)));
3431        }
3432        _ => {}
3433    }
3434    // INTERVAL arithmetic. PG: timestamp ± interval → timestamp,
3435    // date ± interval → date (if interval is pure days/months with no
3436    // sub-day component) else timestamp, interval ± interval → interval.
3437    if let Some(out) = apply_binary_interval(op, l, r)? {
3438        return Ok(Some(out));
3439    }
3440    match (l, r) {
3441        (Value::Date(d), other) if op == BinOp::Add => {
3442            if let Some(n) = int_value(other) {
3443                let days = i64::from(*d).saturating_add(n);
3444                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
3445                    detail: "DATE + integer overflows DATE range".into(),
3446                })?;
3447                return Ok(Some(Value::Date(days32)));
3448            }
3449        }
3450        (other, Value::Date(d)) if op == BinOp::Add => {
3451            if let Some(n) = int_value(other) {
3452                let days = i64::from(*d).saturating_add(n);
3453                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
3454                    detail: "integer + DATE overflows DATE range".into(),
3455                })?;
3456                return Ok(Some(Value::Date(days32)));
3457            }
3458        }
3459        (Value::Date(d), other) if op == BinOp::Sub => {
3460            if let Some(n) = int_value(other) {
3461                let days = i64::from(*d).saturating_sub(n);
3462                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
3463                    detail: "DATE - integer overflows DATE range".into(),
3464                })?;
3465                return Ok(Some(Value::Date(days32)));
3466            }
3467        }
3468        _ => {}
3469    }
3470    Ok(None)
3471}
3472
3473/// INTERVAL-aware binary ops. Recognises:
3474///   timestamp ± interval → timestamp
3475///   date ± interval      → date (if interval is integral days/months only)
3476///                       → timestamp (if interval has sub-day micros)
3477///   interval ± interval  → interval
3478/// Commutative for `+`. Returns `None` for unrecognised operand pairs so
3479/// the caller can fall through.
3480fn apply_binary_interval(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
3481    // Normalise so the interval (if any) is always on the right for Add;
3482    // Sub stays left-handed because it isn't commutative.
3483    let (lhs, rhs, sign): (&Value, &Value, i64) = match (l, r, op) {
3484        (Value::Interval { .. }, _, BinOp::Add) => (r, l, 1),
3485        (_, Value::Interval { .. }, BinOp::Add) => (l, r, 1),
3486        (_, Value::Interval { .. }, BinOp::Sub) => (l, r, -1),
3487        _ => return Ok(None),
3488    };
3489    let Value::Interval {
3490        months: rhs_months,
3491        micros: rhs_us,
3492    } = rhs
3493    else {
3494        unreachable!("rhs guaranteed to be Interval by the match above");
3495    };
3496    let signed_months = i64::from(*rhs_months) * sign;
3497    let signed_micros = rhs_us.checked_mul(sign).ok_or(EvalError::TypeMismatch {
3498        detail: "INTERVAL micros overflows on negation".into(),
3499    })?;
3500    match lhs {
3501        Value::Timestamp(t) => Ok(Some(Value::Timestamp(add_interval_to_micros(
3502            *t,
3503            signed_months,
3504            signed_micros,
3505        )?))),
3506        Value::Date(d) => {
3507            // Date + interval stays a date when the interval has zero
3508            // sub-day microseconds; otherwise promote to TIMESTAMP at
3509            // midnight of the (months-shifted) date first.
3510            let day_aligned = signed_micros.rem_euclid(86_400_000_000) == 0;
3511            if day_aligned {
3512                let micros_per_day = 86_400_000_000_i64;
3513                let days_delta = signed_micros / micros_per_day;
3514                let shifted = shift_date_by_months(*d, signed_months)?;
3515                let new_days =
3516                    i64::from(shifted)
3517                        .checked_add(days_delta)
3518                        .ok_or(EvalError::TypeMismatch {
3519                            detail: "DATE ± INTERVAL overflows DATE range".into(),
3520                        })?;
3521                let days32 = i32::try_from(new_days).map_err(|_| EvalError::TypeMismatch {
3522                    detail: "DATE ± INTERVAL overflows DATE range".into(),
3523                })?;
3524                Ok(Some(Value::Date(days32)))
3525            } else {
3526                let base =
3527                    i64::from(*d)
3528                        .checked_mul(86_400_000_000)
3529                        .ok_or(EvalError::TypeMismatch {
3530                            detail: "DATE → TIMESTAMP lift overflows for INTERVAL math".into(),
3531                        })?;
3532                Ok(Some(Value::Timestamp(add_interval_to_micros(
3533                    base,
3534                    signed_months,
3535                    signed_micros,
3536                )?)))
3537            }
3538        }
3539        Value::Interval {
3540            months: lhs_months,
3541            micros: lhs_us,
3542        } => {
3543            let new_months = i64::from(*lhs_months)
3544                .checked_add(signed_months)
3545                .and_then(|n| i32::try_from(n).ok())
3546                .ok_or(EvalError::TypeMismatch {
3547                    detail: "INTERVAL ± INTERVAL months overflows i32".into(),
3548                })?;
3549            let new_micros = lhs_us
3550                .checked_add(signed_micros)
3551                .ok_or(EvalError::TypeMismatch {
3552                    detail: "INTERVAL ± INTERVAL micros overflows i64".into(),
3553                })?;
3554            Ok(Some(Value::Interval {
3555                months: new_months,
3556                micros: new_micros,
3557            }))
3558        }
3559        _ => Err(EvalError::TypeMismatch {
3560            detail: format!(
3561                "operator {op:?} not defined for {:?} and INTERVAL",
3562                lhs.data_type()
3563            ),
3564        }),
3565    }
3566}
3567
3568/// Shift a `Date` by a signed number of months using the PG clamp rule.
3569fn shift_date_by_months(d: i32, months: i64) -> Result<i32, EvalError> {
3570    let (y, m, day) = civil_from_days(d);
3571    let months_i32 = i32::try_from(months).map_err(|_| EvalError::TypeMismatch {
3572        detail: "INTERVAL months delta out of i32 range".into(),
3573    })?;
3574    let (ny, nm, nd) = add_months_to_civil(y, m, day, months_i32);
3575    Ok(days_from_civil(ny, nm, nd))
3576}
3577
3578/// Add (months, micros) to a `Timestamp` (microseconds since epoch).
3579/// Months part is applied through civil calendar with clamp-to-last-day;
3580/// micros part is plain i64 addition with overflow guard.
3581fn add_interval_to_micros(t: i64, months: i64, micros: i64) -> Result<i64, EvalError> {
3582    let mut out = t;
3583    if months != 0 {
3584        const MICROS_PER_DAY: i64 = 86_400_000_000;
3585        let days = out.div_euclid(MICROS_PER_DAY);
3586        let day_micros = out.rem_euclid(MICROS_PER_DAY);
3587        let day_i32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
3588            detail: "TIMESTAMP day component out of i32 range for INTERVAL months math".into(),
3589        })?;
3590        let shifted_days = shift_date_by_months(day_i32, months)?;
3591        out = i64::from(shifted_days)
3592            .checked_mul(MICROS_PER_DAY)
3593            .and_then(|n| n.checked_add(day_micros))
3594            .ok_or(EvalError::TypeMismatch {
3595                detail: "TIMESTAMP ± INTERVAL months overflows i64 microseconds".into(),
3596            })?;
3597    }
3598    out.checked_add(micros).ok_or(EvalError::TypeMismatch {
3599        detail: "TIMESTAMP ± INTERVAL micros overflows i64".into(),
3600    })
3601}
3602
3603/// Dispatch for any binary op when at least one operand is NUMERIC.
3604/// Other-side integers / floats are promoted to a NUMERIC at a common
3605/// scale; all add / sub / mul / div / compare paths stay in i128.
3606#[allow(clippy::needless_pass_by_value)] // mirrors `apply_binary`'s by-value calling convention
3607fn apply_binary_numeric(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
3608    // Float still wins — Numeric + Float coerces both to f64 and runs
3609    // through the float path. PG demotes Numeric to float in this mix
3610    // too (the documented behaviour for `numeric + double precision`).
3611    let float_path = matches!(l, Value::Float(_)) || matches!(r, Value::Float(_));
3612    if float_path {
3613        let af = as_f64(&l)?;
3614        let bf = as_f64(&r)?;
3615        return match op {
3616            BinOp::Add => Ok(Value::Float(af + bf)),
3617            BinOp::Sub => Ok(Value::Float(af - bf)),
3618            BinOp::Mul => Ok(Value::Float(af * bf)),
3619            BinOp::Div => {
3620                if bf == 0.0 {
3621                    Err(EvalError::DivisionByZero)
3622                } else {
3623                    Ok(Value::Float(af / bf))
3624                }
3625            }
3626            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
3627                let ord = af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
3628                    detail: "NaN in NUMERIC/Float comparison".into(),
3629                })?;
3630                Ok(Value::Bool(cmp_to_bool(op, ord)))
3631            }
3632            BinOp::Concat => Ok(text_concat(&l, &r)),
3633            other => Err(EvalError::TypeMismatch {
3634                detail: format!("operator {other:?} not defined for NUMERIC and Float"),
3635            }),
3636        };
3637    }
3638    // Promote integer ↔ numeric to a shared scale (max of both sides).
3639    let (a, sa) = numeric_or_widen(&l).ok_or_else(|| EvalError::TypeMismatch {
3640        detail: format!("NUMERIC op against non-numeric {:?}", l.data_type()),
3641    })?;
3642    let (b, sb) = numeric_or_widen(&r).ok_or_else(|| EvalError::TypeMismatch {
3643        detail: format!("NUMERIC op against non-numeric {:?}", r.data_type()),
3644    })?;
3645    match op {
3646        BinOp::Add | BinOp::Sub => {
3647            let target_scale = sa.max(sb);
3648            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
3649                detail: "NUMERIC overflow on rescale".into(),
3650            })?;
3651            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
3652                detail: "NUMERIC overflow on rescale".into(),
3653            })?;
3654            let r = match op {
3655                BinOp::Add => lhs.checked_add(rhs),
3656                BinOp::Sub => lhs.checked_sub(rhs),
3657                _ => unreachable!(),
3658            }
3659            .ok_or(EvalError::TypeMismatch {
3660                detail: "NUMERIC overflow on +/-".into(),
3661            })?;
3662            Ok(Value::Numeric {
3663                scaled: r,
3664                scale: target_scale,
3665            })
3666        }
3667        BinOp::Mul => {
3668            let scaled = a.checked_mul(b).ok_or(EvalError::TypeMismatch {
3669                detail: "NUMERIC overflow on *".into(),
3670            })?;
3671            Ok(Value::Numeric {
3672                scaled,
3673                scale: sa.saturating_add(sb),
3674            })
3675        }
3676        BinOp::Div => {
3677            if b == 0 {
3678                return Err(EvalError::DivisionByZero);
3679            }
3680            // Result scale: keep the wider operand's scale. Pre-scale
3681            // the numerator so the integer division retains that many
3682            // fractional digits. Round half-away-from-zero.
3683            let target_scale = sa.max(sb);
3684            // Numerator effective scale becomes sa + target_scale; we
3685            // bring it up to (target_scale + sb) so the divisor's scale
3686            // cancels cleanly.
3687            let bump = pow10_i128(target_scale.saturating_add(sb).saturating_sub(sa));
3688            let num = a.checked_mul(bump).ok_or(EvalError::TypeMismatch {
3689                detail: "NUMERIC overflow on / scaling".into(),
3690            })?;
3691            let half = if b >= 0 { b / 2 } else { -(b / 2) };
3692            let adj = if (num >= 0) == (b >= 0) {
3693                num + half
3694            } else {
3695                num - half
3696            };
3697            Ok(Value::Numeric {
3698                scaled: adj / b,
3699                scale: target_scale,
3700            })
3701        }
3702        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
3703            let target_scale = sa.max(sb);
3704            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
3705                detail: "NUMERIC overflow on rescale".into(),
3706            })?;
3707            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
3708                detail: "NUMERIC overflow on rescale".into(),
3709            })?;
3710            Ok(Value::Bool(cmp_to_bool(op, lhs.cmp(&rhs))))
3711        }
3712        BinOp::Concat => Ok(text_concat(&l, &r)),
3713        other => Err(EvalError::TypeMismatch {
3714            detail: format!("operator {other:?} not defined for NUMERIC"),
3715        }),
3716    }
3717}
3718
3719/// Express `v` as a `(scaled_i128, scale)` pair. Plain integers come
3720/// back with `scale=0`; NUMERIC keeps its own scale. Anything else
3721/// returns `None` and the caller raises a type error.
3722fn numeric_or_widen(v: &Value) -> Option<(i128, u8)> {
3723    match v {
3724        Value::Numeric { scaled, scale } => Some((*scaled, *scale)),
3725        Value::Int(n) => Some((i128::from(*n), 0)),
3726        Value::SmallInt(n) => Some((i128::from(*n), 0)),
3727        Value::BigInt(n) => Some((i128::from(*n), 0)),
3728        _ => None,
3729    }
3730}
3731
/// Re-express a fixed-point value at a different scale.
///
/// * `dst == src`: returned unchanged.
/// * `dst > src`: multiplied by `10^(dst - src)`; `None` if the power of
///   ten or the product does not fit in `i128`.
/// * `dst < src`: divided by `10^(src - dst)`, rounding half away from
///   zero. This direction cannot overflow, so it always returns `Some`.
///
/// Zero is representable at every scale and always comes back as `Some(0)`.
fn rescale(scaled: i128, src: u8, dst: u8) -> Option<i128> {
    if src == dst {
        return Some(scaled);
    }
    if scaled == 0 {
        return Some(0);
    }
    if dst > src {
        let factor = 10_i128.checked_pow(u32::from(dst - src))?;
        return scaled.checked_mul(factor);
    }
    let Some(drop) = 10_i128.checked_pow(u32::from(src - dst)) else {
        // 10^k no longer fits in i128, so half of it already exceeds every
        // possible |scaled| and the rounded quotient is zero.
        return Some(0);
    };
    // Round using the remainder instead of `scaled ± drop / 2`, which could
    // overflow near the i128 limits. `drop` is a power of ten >= 10, so
    // `drop / 2` is exact and |quotient| <= i128::MAX / 10 leaves room for
    // the one-step adjustment.
    let quotient = scaled / drop;
    let remainder = scaled % drop;
    if remainder.abs() >= drop / 2 {
        Some(quotient + scaled.signum())
    } else {
        Some(quotient)
    }
}
3749
/// `10^p` as an `i128`, computed by repeated multiplication so it can be
/// evaluated in const contexts.
const fn pow10_i128(p: u8) -> i128 {
    let mut remaining = p;
    let mut value: i128 = 1;
    while remaining > 0 {
        value *= 10;
        remaining -= 1;
    }
    value
}
3759
3760const fn cmp_to_bool(op: BinOp, ord: core::cmp::Ordering) -> bool {
3761    use core::cmp::Ordering::{Equal, Greater, Less};
3762    match op {
3763        BinOp::Eq => matches!(ord, Equal),
3764        BinOp::NotEq => !matches!(ord, Equal),
3765        BinOp::Lt => matches!(ord, Less),
3766        BinOp::LtEq => matches!(ord, Less | Equal),
3767        BinOp::Gt => matches!(ord, Greater),
3768        BinOp::GtEq => matches!(ord, Greater | Equal),
3769        _ => false,
3770    }
3771}
3772
3773/// SQL `||` string concatenation. Operands are coerced to text via the same
3774/// rule as `::text` cast. NULL propagates (handled above; this function only
3775/// runs with non-NULL operands).
3776fn text_concat(l: &Value, r: &Value) -> Value {
3777    // v7.11.8 — PG `||` overloads: TEXT[] || TEXT[] = concatenated array;
3778    // TEXT[] || TEXT (or TEXT || TEXT[]) prepends/appends the single
3779    // element. NULL || anything = NULL (PG semantics for arrays;
3780    // text concat treats NULL the same way after value_to_text).
3781    match (l, r) {
3782        (Value::Null, _) | (_, Value::Null) => {
3783            // PG text concat: NULL || x = NULL. Array concat: NULL || x = NULL.
3784            // Keep the legacy text path (value_to_text handles Null as ""),
3785            // but for arrays we surface real NULL to match PG.
3786            if matches!(
3787                l,
3788                Value::TextArray(_) | Value::IntArray(_) | Value::BigIntArray(_) | Value::Bytes(_)
3789            ) || matches!(
3790                r,
3791                Value::TextArray(_) | Value::IntArray(_) | Value::BigIntArray(_) | Value::Bytes(_)
3792            ) {
3793                return Value::Null;
3794            }
3795        }
3796        (Value::TextArray(a), Value::TextArray(b)) => {
3797            let mut out = a.clone();
3798            out.extend(b.iter().cloned());
3799            return Value::TextArray(out);
3800        }
3801        (Value::TextArray(a), Value::Text(s)) => {
3802            let mut out = a.clone();
3803            out.push(Some(s.clone()));
3804            return Value::TextArray(out);
3805        }
3806        (Value::Text(s), Value::TextArray(b)) => {
3807            let mut out: alloc::vec::Vec<Option<alloc::string::String>> =
3808                alloc::vec::Vec::with_capacity(1 + b.len());
3809            out.push(Some(s.clone()));
3810            out.extend(b.iter().cloned());
3811            return Value::TextArray(out);
3812        }
3813        // v7.11.13 — IntArray / BigIntArray `||` overloads. Same
3814        // PG semantics as TEXT[]: array||array concatenates, and
3815        // array||scalar appends/prepends. Mixed Int/BigInt widens
3816        // to BigIntArray.
3817        (Value::IntArray(a), Value::IntArray(b)) => {
3818            let mut out = a.clone();
3819            out.extend(b.iter().copied());
3820            return Value::IntArray(out);
3821        }
3822        (Value::IntArray(a), Value::Int(n)) => {
3823            let mut out = a.clone();
3824            out.push(Some(*n));
3825            return Value::IntArray(out);
3826        }
3827        (Value::IntArray(a), Value::SmallInt(n)) => {
3828            let mut out = a.clone();
3829            out.push(Some(i32::from(*n)));
3830            return Value::IntArray(out);
3831        }
3832        (Value::Int(n), Value::IntArray(b)) => {
3833            let mut out: alloc::vec::Vec<Option<i32>> = alloc::vec::Vec::with_capacity(1 + b.len());
3834            out.push(Some(*n));
3835            out.extend(b.iter().copied());
3836            return Value::IntArray(out);
3837        }
3838        (Value::SmallInt(n), Value::IntArray(b)) => {
3839            let mut out: alloc::vec::Vec<Option<i32>> = alloc::vec::Vec::with_capacity(1 + b.len());
3840            out.push(Some(i32::from(*n)));
3841            out.extend(b.iter().copied());
3842            return Value::IntArray(out);
3843        }
3844        (Value::BigIntArray(a), Value::BigIntArray(b)) => {
3845            let mut out = a.clone();
3846            out.extend(b.iter().copied());
3847            return Value::BigIntArray(out);
3848        }
3849        (Value::BigIntArray(a), Value::IntArray(b)) => {
3850            let mut out = a.clone();
3851            out.extend(b.iter().map(|o| o.map(i64::from)));
3852            return Value::BigIntArray(out);
3853        }
3854        (Value::IntArray(a), Value::BigIntArray(b)) => {
3855            let mut out: alloc::vec::Vec<Option<i64>> =
3856                a.iter().map(|o| o.map(i64::from)).collect();
3857            out.extend(b.iter().copied());
3858            return Value::BigIntArray(out);
3859        }
3860        (Value::BigIntArray(a), Value::BigInt(n)) => {
3861            let mut out = a.clone();
3862            out.push(Some(*n));
3863            return Value::BigIntArray(out);
3864        }
3865        (Value::BigIntArray(a), Value::Int(n)) => {
3866            let mut out = a.clone();
3867            out.push(Some(i64::from(*n)));
3868            return Value::BigIntArray(out);
3869        }
3870        (Value::BigIntArray(a), Value::SmallInt(n)) => {
3871            let mut out = a.clone();
3872            out.push(Some(i64::from(*n)));
3873            return Value::BigIntArray(out);
3874        }
3875        (Value::BigInt(n), Value::BigIntArray(b)) => {
3876            let mut out: alloc::vec::Vec<Option<i64>> = alloc::vec::Vec::with_capacity(1 + b.len());
3877            out.push(Some(*n));
3878            out.extend(b.iter().copied());
3879            return Value::BigIntArray(out);
3880        }
3881        (Value::Int(n), Value::BigIntArray(b)) => {
3882            let mut out: alloc::vec::Vec<Option<i64>> = alloc::vec::Vec::with_capacity(1 + b.len());
3883            out.push(Some(i64::from(*n)));
3884            out.extend(b.iter().copied());
3885            return Value::BigIntArray(out);
3886        }
3887        (Value::SmallInt(n), Value::BigIntArray(b)) => {
3888            let mut out: alloc::vec::Vec<Option<i64>> = alloc::vec::Vec::with_capacity(1 + b.len());
3889            out.push(Some(i64::from(*n)));
3890            out.extend(b.iter().copied());
3891            return Value::BigIntArray(out);
3892        }
3893        // v7.11.15 — BYTEA `||` is byte concatenation.
3894        (Value::Bytes(a), Value::Bytes(b)) => {
3895            let mut out = a.clone();
3896            out.extend_from_slice(b);
3897            return Value::Bytes(out);
3898        }
3899        _ => {}
3900    }
3901    let a = value_to_text(l);
3902    let b = value_to_text(r);
3903    Value::Text(a + &b)
3904}
3905
3906/// pgvector inner-product `<#>`. Returns the *negative* dot product so
3907/// smaller still means more similar — same convention as pgvector.
3908fn inner_product(l: Value, r: Value) -> Result<Value, EvalError> {
3909    let (a, b) = unwrap_vec_pair(l, r, "<#>")?;
3910    let mut dot: f64 = 0.0;
3911    for (x, y) in a.iter().zip(b.iter()) {
3912        dot += f64::from(*x) * f64::from(*y);
3913    }
3914    Ok(Value::Float(-dot))
3915}
3916
3917/// pgvector cosine distance `<=>` — `1 - (a·b) / (‖a‖ ‖b‖)`. A zero-norm
3918/// operand produces NaN (matches pgvector).
3919fn cosine_distance(l: Value, r: Value) -> Result<Value, EvalError> {
3920    let (a, b) = unwrap_vec_pair(l, r, "<=>")?;
3921    let mut dot: f64 = 0.0;
3922    let mut na: f64 = 0.0;
3923    let mut nb: f64 = 0.0;
3924    for (x, y) in a.iter().zip(b.iter()) {
3925        let xf = f64::from(*x);
3926        let yf = f64::from(*y);
3927        dot += xf * yf;
3928        na += xf * xf;
3929        nb += yf * yf;
3930    }
3931    let denom = sqrt_newton(na) * sqrt_newton(nb);
3932    if denom == 0.0 {
3933        return Ok(Value::Float(f64::NAN));
3934    }
3935    Ok(Value::Float(1.0 - dot / denom))
3936}
3937
3938fn unwrap_vec_pair(l: Value, r: Value, op: &str) -> Result<(Vec<f32>, Vec<f32>), EvalError> {
3939    // v6.0.1: SQ8 cells coming through the SQL evaluator are
3940    // dequantised to f32 here so the existing scalar distance
3941    // arithmetic stays intact. HNSW kNN search continues to use
3942    // the asymmetric ADC variant inside `cell_to_query_metric_
3943    // distance` — this path only runs when a vector expression
3944    // lands in the evaluator (full-scan ORDER BY, SELECT
3945    // projection of `v <-> $1`, etc.).
3946    let to_f32 = |v: Value| -> Option<Vec<f32>> {
3947        match v {
3948            Value::Vector(a) => Some(a),
3949            Value::Sq8Vector(q) => Some(spg_storage::quantize::dequantize(&q)),
3950            // v6.0.3: bit-exact dequant for halfvec cells.
3951            Value::HalfVector(h) => Some(h.to_f32_vec()),
3952            _ => None,
3953        }
3954    };
3955    let l_ty = l.data_type();
3956    let r_ty = r.data_type();
3957    match (to_f32(l), to_f32(r)) {
3958        (Some(a), Some(b)) => {
3959            if a.len() != b.len() {
3960                return Err(EvalError::TypeMismatch {
3961                    detail: format!("vector dim mismatch in {op}: {} vs {}", a.len(), b.len()),
3962                });
3963            }
3964            Ok((a, b))
3965        }
3966        _ => Err(EvalError::TypeMismatch {
3967            detail: format!("{op} requires two vectors, got {l_ty:?} and {r_ty:?}"),
3968        }),
3969    }
3970}
3971
3972/// Numeric arithmetic with widening.
3973/// - both `Int` → `Int` (with overflow check)
3974/// - `Int` op `BigInt` (either side) → `BigInt`
3975/// - any `Float` involved → `Float`
3976fn arith(
3977    l: Value,
3978    r: Value,
3979    int_op: impl Fn(i64, i64) -> Option<i64>,
3980    float_op: impl Fn(f64, f64) -> f64,
3981    op_name: &str,
3982) -> Result<Value, EvalError> {
3983    // Widen SmallInt to Int up front so the rest of the arithmetic
3984    // table only deals with Int / BigInt / Float pairs.
3985    let widen = |v: Value| -> Value {
3986        match v {
3987            Value::SmallInt(n) => Value::Int(i32::from(n)),
3988            other => other,
3989        }
3990    };
3991    let l = widen(l);
3992    let r = widen(r);
3993    match (l, r) {
3994        (Value::Int(a), Value::Int(b)) => {
3995            let result = int_op(i64::from(a), i64::from(b)).ok_or(EvalError::TypeMismatch {
3996                detail: format!("integer overflow on {op_name}"),
3997            })?;
3998            if let Ok(small) = i32::try_from(result) {
3999                Ok(Value::Int(small))
4000            } else {
4001                Ok(Value::BigInt(result))
4002            }
4003        }
4004        (Value::Int(a), Value::BigInt(b)) | (Value::BigInt(b), Value::Int(a)) => {
4005            let result = int_op(i64::from(a), b).ok_or(EvalError::TypeMismatch {
4006                detail: format!("bigint overflow on {op_name}"),
4007            })?;
4008            Ok(Value::BigInt(result))
4009        }
4010        (Value::BigInt(a), Value::BigInt(b)) => {
4011            let result = int_op(a, b).ok_or(EvalError::TypeMismatch {
4012                detail: format!("bigint overflow on {op_name}"),
4013            })?;
4014            Ok(Value::BigInt(result))
4015        }
4016        (a, b)
4017            if a.data_type() == Some(DataType::Float) || b.data_type() == Some(DataType::Float) =>
4018        {
4019            let af = as_f64(&a)?;
4020            let bf = as_f64(&b)?;
4021            Ok(Value::Float(float_op(af, bf)))
4022        }
4023        (a, b) => Err(EvalError::TypeMismatch {
4024            detail: format!(
4025                "{op_name} applied to non-numeric: {:?} vs {:?}",
4026                a.data_type(),
4027                b.data_type()
4028            ),
4029        }),
4030    }
4031}
4032
4033/// L2 (Euclidean) distance between two vectors of equal dimension.
4034/// Returned as `Value::Float(d)` so it composes with the existing
4035/// comparison / sort plumbing. Mismatched dims or non-vector operands
4036/// raise `TypeMismatch`.
4037#[allow(clippy::many_single_char_names)] // l, r, a, b, d are the natural names
4038fn l2_distance(l: Value, r: Value) -> Result<Value, EvalError> {
4039    // v6.0.1: route both operands through `unwrap_vec_pair` so SQ8
4040    // cells dequantise on the way in. Sub-f64 precision loss is
4041    // negligible vs the dequantisation noise the SQ8 path already
4042    // ships with.
4043    let (a, b) = unwrap_vec_pair(l, r, "<->")?;
4044    let mut sum: f64 = 0.0;
4045    for (x, y) in a.iter().zip(b.iter()) {
4046        let d = f64::from(*x) - f64::from(*y);
4047        sum += d * d;
4048    }
4049    Ok(Value::Float(sqrt_newton(sum)))
4050}
4051
/// Self-built `sqrt` for `f64` — `std::f64::sqrt` lives in `std`, which the
/// engine's `no_std` constraint disallows.
///
/// Newton-Raphson (`g ← (g + x/g) / 2`) seeded by halving the IEEE-754
/// exponent of `x`, which lands within a few percent of the true root for
/// normal inputs; the iteration then runs until it reaches a fixed point.
/// Seeding with `x` itself only halves the guess per round, so a small
/// fixed round count is badly off for inputs far from 1 (e.g. ten rounds
/// give roughly 1296 for `1e6`).
///
/// Special cases:
/// - `x <= 0.0` (including negatives and `-inf`) → `0.0`; callers only pass
///   sums of squares, so this is a clamp rather than a NaN.
/// - `NaN` and `+inf` are returned unchanged.
#[allow(clippy::float_cmp)] // exact fixed-point test is the intended stop condition
fn sqrt_newton(x: f64) -> f64 {
    /// Half of the exponent bias (1023) shifted into the exponent field;
    /// added after `bits >> 1` it re-biases the halved exponent.
    const HALF_BIAS: u64 = 0x1FF8_0000_0000_0000;
    /// Upper bound on refinement rounds. Normal inputs settle in ~5; the
    /// cap covers subnormals (poor seed) and 1-ulp oscillation.
    const MAX_ROUNDS: u32 = 64;

    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        return x;
    }
    let mut g = f64::from_bits((x.to_bits() >> 1) + HALF_BIAS);
    for _ in 0..MAX_ROUNDS {
        let next = 0.5 * (g + x / g);
        if next == g {
            break;
        }
        g = next;
    }
    g
}
4068
4069fn div_op(l: Value, r: Value) -> Result<Value, EvalError> {
4070    let any_float = matches!(l.data_type(), Some(DataType::Float))
4071        || matches!(r.data_type(), Some(DataType::Float));
4072    if any_float {
4073        let a = as_f64(&l)?;
4074        let b = as_f64(&r)?;
4075        if b == 0.0 {
4076            return Err(EvalError::DivisionByZero);
4077        }
4078        return Ok(Value::Float(a / b));
4079    }
4080    arith(
4081        l,
4082        r,
4083        |a, b| {
4084            if b == 0 { None } else { Some(a / b) }
4085        },
4086        |a, b| a / b,
4087        "/",
4088    )
4089    .map_err(|e| match e {
4090        // The closure returns None on b == 0; translate that into the dedicated
4091        // DivisionByZero variant instead of "integer overflow on /".
4092        EvalError::TypeMismatch { detail } if detail.contains('/') => EvalError::DivisionByZero,
4093        other => other,
4094    })
4095}
4096
4097fn as_f64(v: &Value) -> Result<f64, EvalError> {
4098    match v {
4099        Value::SmallInt(n) => Ok(f64::from(*n)),
4100        Value::Int(n) => Ok(f64::from(*n)),
4101        #[allow(clippy::cast_precision_loss)]
4102        Value::BigInt(n) => Ok(*n as f64),
4103        Value::Float(x) => Ok(*x),
4104        #[allow(clippy::cast_precision_loss)]
4105        Value::Numeric { scaled, scale } => {
4106            let mut div = 1.0_f64;
4107            for _ in 0..*scale {
4108                div *= 10.0;
4109            }
4110            Ok((*scaled as f64) / div)
4111        }
4112        other => Err(EvalError::TypeMismatch {
4113            detail: format!("cannot convert {:?} to FLOAT", other.data_type()),
4114        }),
4115    }
4116}
4117
4118fn compare(op: BinOp, l: &Value, r: &Value) -> Result<Value, EvalError> {
4119    let ord = match (l, r) {
4120        (Value::Int(a), Value::Int(b)) => i64::from(*a).cmp(&i64::from(*b)),
4121        (Value::Int(a), Value::BigInt(b)) => i64::from(*a).cmp(b),
4122        (Value::BigInt(a), Value::Int(b)) => a.cmp(&i64::from(*b)),
4123        (Value::BigInt(a), Value::BigInt(b)) => a.cmp(b),
4124        (a, b)
4125            if matches!(a.data_type(), Some(DataType::Float))
4126                || matches!(b.data_type(), Some(DataType::Float)) =>
4127        {
4128            let af = as_f64(a)?;
4129            let bf = as_f64(b)?;
4130            af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
4131                detail: "NaN in comparison".into(),
4132            })?
4133        }
4134        (Value::Text(a), Value::Text(b)) => a.cmp(b),
4135        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
4136        // Date / Timestamp compare on their integer storage repr.
4137        // Cross-domain (Date vs Timestamp) lifts the Date to the
4138        // matching midnight TIMESTAMP first.
4139        (Value::Date(a), Value::Date(b)) => a.cmp(b),
4140        (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
4141        (Value::Date(a), Value::Timestamp(b)) => (i64::from(*a) * 86_400_000_000).cmp(b),
4142        (Value::Timestamp(a), Value::Date(b)) => a.cmp(&(i64::from(*b) * 86_400_000_000)),
4143        // PG-style implicit coercion: comparing a DATE / TIMESTAMP
4144        // column against a text literal lifts the literal into the
4145        // matching domain (e.g. `day >= '2024-01-01'`).
4146        (Value::Date(a), Value::Text(b)) => {
4147            let bd = parse_date_literal(b).ok_or_else(|| EvalError::TypeMismatch {
4148                detail: format!("cannot parse {b:?} as DATE for comparison"),
4149            })?;
4150            a.cmp(&bd)
4151        }
4152        (Value::Text(a), Value::Date(b)) => {
4153            let ad = parse_date_literal(a).ok_or_else(|| EvalError::TypeMismatch {
4154                detail: format!("cannot parse {a:?} as DATE for comparison"),
4155            })?;
4156            ad.cmp(b)
4157        }
4158        (Value::Timestamp(a), Value::Text(b)) => {
4159            let bt = parse_timestamp_literal(b).ok_or_else(|| EvalError::TypeMismatch {
4160                detail: format!("cannot parse {b:?} as TIMESTAMP for comparison"),
4161            })?;
4162            a.cmp(&bt)
4163        }
4164        (Value::Text(a), Value::Timestamp(b)) => {
4165            let at = parse_timestamp_literal(a).ok_or_else(|| EvalError::TypeMismatch {
4166                detail: format!("cannot parse {a:?} as TIMESTAMP for comparison"),
4167            })?;
4168            at.cmp(b)
4169        }
4170        (a, b) => {
4171            return Err(EvalError::TypeMismatch {
4172                detail: format!(
4173                    "comparison between {:?} and {:?}",
4174                    a.data_type(),
4175                    b.data_type()
4176                ),
4177            });
4178        }
4179    };
4180    let result = match op {
4181        BinOp::Eq => ord.is_eq(),
4182        BinOp::NotEq => !ord.is_eq(),
4183        BinOp::Lt => ord.is_lt(),
4184        BinOp::LtEq => ord.is_le(),
4185        BinOp::Gt => ord.is_gt(),
4186        BinOp::GtEq => ord.is_ge(),
4187        BinOp::And
4188        | BinOp::Or
4189        | BinOp::Add
4190        | BinOp::Sub
4191        | BinOp::Mul
4192        | BinOp::Div
4193        | BinOp::L2Distance
4194        | BinOp::InnerProduct
4195        | BinOp::CosineDistance
4196        | BinOp::Concat
4197        | BinOp::JsonGet
4198        | BinOp::JsonGetText
4199        | BinOp::JsonGetPath
4200        | BinOp::JsonGetPathText
4201        | BinOp::JsonContains
4202        | BinOp::TsMatch
4203        | BinOp::IsDistinctFrom
4204        | BinOp::IsNotDistinctFrom => {
4205            unreachable!("compare() only called with comparison ops")
4206        }
4207    };
4208    Ok(Value::Bool(result))
4209}
4210
4211// SQL three-valued AND / OR.
4212fn and_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
4213    match (l, r) {
4214        (Value::Bool(false), _) | (_, Value::Bool(false)) => Ok(Value::Bool(false)),
4215        (Value::Bool(true), Value::Bool(true)) => Ok(Value::Bool(true)),
4216        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
4217        (a, b) => Err(EvalError::TypeMismatch {
4218            detail: format!(
4219                "AND on non-boolean: {:?} and {:?}",
4220                a.data_type(),
4221                b.data_type()
4222            ),
4223        }),
4224    }
4225}
4226
4227fn or_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
4228    match (l, r) {
4229        (Value::Bool(true), _) | (_, Value::Bool(true)) => Ok(Value::Bool(true)),
4230        (Value::Bool(false), Value::Bool(false)) => Ok(Value::Bool(false)),
4231        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
4232        (a, b) => Err(EvalError::TypeMismatch {
4233            detail: format!(
4234                "OR on non-boolean: {:?} and {:?}",
4235                a.data_type(),
4236                b.data_type()
4237            ),
4238        }),
4239    }
4240}
4241
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;
    use spg_storage::{ColumnSchema, Row};

    // ---- fixtures -------------------------------------------------------

    /// Column schema named `name` of type `ty` (third constructor argument
    /// fixed to `true`).
    fn col(name: &str, ty: DataType) -> ColumnSchema {
        ColumnSchema::new(name, ty, true)
    }

    /// Evaluation context over `cols` with an optional table alias.
    fn ctx<'a>(cols: &'a [ColumnSchema], alias: Option<&'a str>) -> EvalContext<'a> {
        EvalContext::new(cols, alias)
    }

    /// Integer literal expression.
    fn lit(n: i64) -> Expr {
        Expr::Literal(Literal::Integer(n))
    }

    /// Boolean literal expression.
    fn boolean(b: bool) -> Expr {
        Expr::Literal(Literal::Bool(b))
    }

    /// String literal expression.
    fn text(s: &'static str) -> Expr {
        Expr::Literal(Literal::String(s.into()))
    }

    /// `NULL` literal expression.
    fn null() -> Expr {
        Expr::Literal(Literal::Null)
    }

    /// Bare (unqualified) column reference.
    fn col_ref(name: &str) -> Expr {
        Expr::Column(ColumnName {
            qualifier: None,
            name: name.into(),
        })
    }

    /// Qualified column reference `qualifier.name`.
    fn qualified_ref(qualifier: &'static str, name: &str) -> Expr {
        Expr::Column(ColumnName {
            qualifier: Some(qualifier.into()),
            name: name.into(),
        })
    }

    /// Binary expression `lhs op rhs`.
    fn binary(lhs: Expr, op: BinOp, rhs: Expr) -> Expr {
        Expr::Binary {
            lhs: alloc::boxed::Box::new(lhs),
            op,
            rhs: alloc::boxed::Box::new(rhs),
        }
    }

    /// Evaluate `expr` against an empty row with an empty schema.
    fn eval_const(expr: &Expr) -> Result<Value, EvalError> {
        let no_columns: [ColumnSchema; 0] = [];
        let empty_row = Row::new(vec![]);
        eval_expr(expr, &empty_row, &ctx(&no_columns, None))
    }

    // ---- literals and column resolution ----------------------------------

    #[test]
    fn literal_evaluates_to_value() {
        assert_eq!(eval_const(&lit(42)).unwrap(), Value::Int(42));
        assert_eq!(
            eval_const(&Expr::Literal(Literal::Float(1.5))).unwrap(),
            Value::Float(1.5)
        );
        assert_eq!(eval_const(&null()).unwrap(), Value::Null);
    }

    #[test]
    fn column_lookup_unqualified() {
        let schema = vec![col("a", DataType::Int), col("b", DataType::Text)];
        let row = Row::new(vec![Value::Int(7), Value::Text("hi".into())]);
        let context = ctx(&schema, None);
        assert_eq!(
            eval_expr(&col_ref("a"), &row, &context).unwrap(),
            Value::Int(7)
        );
        assert_eq!(
            eval_expr(&col_ref("b"), &row, &context).unwrap(),
            Value::Text("hi".into())
        );
    }

    #[test]
    fn column_not_found_errors() {
        let schema = vec![col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(0)]);
        let context = ctx(&schema, None);
        let err = eval_expr(&col_ref("ghost"), &row, &context).unwrap_err();
        assert!(matches!(err, EvalError::ColumnNotFound { ref name } if name == "ghost"));
    }

    #[test]
    fn qualified_column_matches_alias() {
        let schema = vec![col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(5)]);
        let context = ctx(&schema, Some("u"));
        assert_eq!(
            eval_expr(&qualified_ref("u", "a"), &row, &context).unwrap(),
            Value::Int(5)
        );
    }

    #[test]
    fn qualified_column_unknown_alias_errors() {
        let schema = vec![col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(5)]);
        let context = ctx(&schema, Some("u"));
        let err = eval_expr(&qualified_ref("x", "a"), &row, &context).unwrap_err();
        assert!(matches!(err, EvalError::UnknownQualifier { .. }));
    }

    // ---- arithmetic, comparison, three-valued logic ----------------------

    #[test]
    fn arithmetic_with_widening() {
        let sum = binary(lit(2), BinOp::Add, Expr::Literal(Literal::Float(0.5)));
        assert_eq!(eval_const(&sum).unwrap(), Value::Float(2.5));
    }

    #[test]
    fn division_by_zero_errors() {
        let quotient = binary(lit(1), BinOp::Div, lit(0));
        assert_eq!(
            eval_const(&quotient).unwrap_err(),
            EvalError::DivisionByZero
        );
    }

    #[test]
    fn comparison_returns_bool() {
        let less = binary(lit(1), BinOp::Lt, lit(2));
        assert_eq!(eval_const(&less).unwrap(), Value::Bool(true));
    }

    #[test]
    fn null_propagates_through_arithmetic() {
        let sum = binary(lit(1), BinOp::Add, null());
        assert_eq!(eval_const(&sum).unwrap(), Value::Null);
    }

    #[test]
    fn and_three_valued_logic() {
        let and = |lhs: bool, rhs: Expr| binary(boolean(lhs), BinOp::And, rhs);
        // FALSE AND NULL → FALSE
        assert_eq!(
            eval_const(&and(false, null())).unwrap(),
            Value::Bool(false)
        );
        // TRUE AND NULL → NULL
        assert_eq!(eval_const(&and(true, null())).unwrap(), Value::Null);
        // TRUE AND TRUE → TRUE
        assert_eq!(
            eval_const(&and(true, boolean(true))).unwrap(),
            Value::Bool(true)
        );
    }

    #[test]
    fn or_three_valued_logic() {
        let or_null = |lhs: bool| binary(boolean(lhs), BinOp::Or, null());
        // TRUE OR NULL → TRUE
        assert_eq!(eval_const(&or_null(true)).unwrap(), Value::Bool(true));
        // FALSE OR NULL → NULL
        assert_eq!(eval_const(&or_null(false)).unwrap(), Value::Null);
    }

    #[test]
    fn not_on_null_is_null() {
        let negated = Expr::Unary {
            op: UnOp::Not,
            expr: alloc::boxed::Box::new(null()),
        };
        assert_eq!(eval_const(&negated).unwrap(), Value::Null);
    }

    #[test]
    fn text_comparison_lexicographic() {
        let less = binary(text("apple"), BinOp::Lt, text("banana"));
        assert_eq!(eval_const(&less).unwrap(), Value::Bool(true));
    }

    // ---- intervals --------------------------------------------------------

    #[test]
    fn interval_format_basics() {
        // ((months, micros), rendered)
        let cases = [
            ((0, 0), "0"),
            ((0, 86_400_000_000), "1 day"),
            ((0, -86_400_000_000), "-1 days"),
            ((0, 3_600_000_000), "01:00:00"),
            ((0, 86_400_000_000 + 9_000_000), "1 day 00:00:09"),
            ((14, 0), "1 year 2 mons"),
            ((-1, 0), "-1 mons"),
        ];
        for &((months, micros), rendered) in cases.iter() {
            assert_eq!(format_interval(months, micros), rendered);
        }
    }

    #[test]
    fn interval_add_to_timestamp_micros_part() {
        // 2024-01-01 00:00:00 + INTERVAL '1 hour' = 2024-01-01 01:00:00
        let midnight = i64::from(days_from_civil(2024, 1, 1)) * 86_400_000_000;
        let shifted = add_interval_to_micros(midnight, 0, 3_600_000_000).unwrap();
        assert_eq!(shifted, midnight + 3_600_000_000);
    }

    #[test]
    fn interval_clamp_month_end() {
        // (start y/m/d, month shift, expected y/m/d)
        let cases = [
            // 2024-01-31 + 1 month = 2024-02-29 (leap year).
            ((2024, 1, 31), 1, (2024, 2, 29)),
            // 2023-01-31 + 1 month = 2023-02-28 (non-leap).
            ((2023, 1, 31), 1, (2023, 2, 28)),
            // 2024-03-31 - 1 month = 2024-02-29.
            ((2024, 3, 31), -1, (2024, 2, 29)),
        ];
        for &((year, month, day), shift, expected) in cases.iter() {
            let start = days_from_civil(year, month, day);
            let shifted = shift_date_by_months(start, shift).unwrap();
            assert_eq!(civil_from_days(shifted), expected);
        }
    }

    #[test]
    fn interval_date_plus_pure_days_stays_date() {
        // DATE + INTERVAL '7 days' must stay DATE.
        let start = Value::Date(days_from_civil(2024, 6, 1));
        let one_week = Value::Interval {
            months: 0,
            micros: 7 * 86_400_000_000,
        };
        let sum = apply_binary_interval(BinOp::Add, &start, &one_week)
            .unwrap()
            .unwrap();
        assert_eq!(sum, Value::Date(days_from_civil(2024, 6, 8)));
    }

    #[test]
    fn interval_date_plus_sub_day_lifts_to_timestamp() {
        // DATE + INTERVAL '1 hour' must lift to TIMESTAMP.
        let day = days_from_civil(2024, 6, 1);
        let one_hour = Value::Interval {
            months: 0,
            micros: 3_600_000_000,
        };
        let sum = apply_binary_interval(BinOp::Add, &Value::Date(day), &one_hour)
            .unwrap()
            .unwrap();
        assert_eq!(
            sum,
            Value::Timestamp(i64::from(day) * 86_400_000_000 + 3_600_000_000)
        );
    }
}