Skip to main content

spg_engine/
eval.rs

1//! Expression evaluator. Given a parsed `Expr`, a `Row`, and the row's column
2//! schema, produce a `Value`. v0.4 implements:
3//!
4//! - literals
5//! - column lookups (bare and qualified `t.col`)
6//! - unary minus / NOT
7//! - binary arithmetic, comparison, AND, OR
8//! - numeric widening (`Int → BigInt → Float`) at evaluation time
9//! - SQL three-valued logic for NULL:
10//!     * any arithmetic / comparison op with a NULL operand → NULL
11//!     * `TRUE OR NULL` → TRUE, `FALSE OR NULL` → NULL,
12//!     * `FALSE AND NULL` → FALSE, `TRUE AND NULL` → NULL,
13//!     * `NOT NULL` → NULL
14//!
15//! v0.4 deliberately does *not* implement: function calls, string
16//! concatenation, IS NULL / IS NOT NULL, BETWEEN, IN, etc. Those come later.
17
18use alloc::format;
19use alloc::string::{String, ToString};
20use alloc::vec::Vec;
21
22use spg_sql::ast::{BinOp, CastTarget, ColumnName, Expr, Literal, UnOp};
23use spg_storage::{ColumnSchema, DataType, Row, Value};
24
25/// Resolution context for evaluating a single row. `table_alias` is the alias
26/// (or table name) callers should accept as the qualifier on a column ref —
27/// e.g. `FROM users AS u` makes `u.name` valid and rejects `other.name`.
28#[derive(Debug, Clone)]
29pub struct EvalContext<'a> {
30    pub columns: &'a [ColumnSchema],
31    pub table_alias: Option<&'a str>,
32    /// v6.1.1 — bound parameters for `$N` placeholders inside the
33    /// expression tree. Empty for simple queries; populated by the
34    /// prepared-statement Execute path with Bind values converted
35    /// to `Value`. Index N (1-based per PG) hits `params[N-1]`.
36    pub params: &'a [Value],
37}
38
39impl<'a> EvalContext<'a> {
40    pub const fn new(columns: &'a [ColumnSchema], table_alias: Option<&'a str>) -> Self {
41        Self {
42            columns,
43            table_alias,
44            params: &[],
45        }
46    }
47
48    /// v6.1.1 — attach a parameter buffer for `$N` placeholder
49    /// resolution. The slice must outlive the context; callers
50    /// construct it from the prepared statement's Bind values.
51    #[must_use]
52    pub const fn with_params(mut self, params: &'a [Value]) -> Self {
53        self.params = params;
54        self
55    }
56}
57
58#[derive(Debug, Clone, PartialEq)]
59pub enum EvalError {
60    ColumnNotFound {
61        name: String,
62    },
63    UnknownQualifier {
64        qualifier: String,
65    },
66    DivisionByZero,
67    TypeMismatch {
68        detail: String,
69    },
70    /// v6.1.1 — `$N` reference past the number of bound parameters.
71    /// Either the client sent too few in Bind, or the SQL has a
72    /// placeholder the prepared statement didn't account for.
73    PlaceholderOutOfRange {
74        n: u16,
75        bound: u16,
76    },
77}
78
79impl core::fmt::Display for EvalError {
80    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
81        match self {
82            Self::ColumnNotFound { name } => write!(f, "column not found: {name}"),
83            Self::UnknownQualifier { qualifier } => {
84                write!(f, "unknown table qualifier: {qualifier}")
85            }
86            Self::DivisionByZero => f.write_str("division by zero"),
87            Self::TypeMismatch { detail } => write!(f, "type mismatch: {detail}"),
88            Self::PlaceholderOutOfRange { n, bound } => write!(
89                f,
90                "parameter ${n} referenced but only {bound} bound by client"
91            ),
92        }
93    }
94}
95
96pub fn eval_expr(expr: &Expr, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
97    match expr {
98        Expr::Literal(l) => Ok(literal_to_value(l)),
99        Expr::Column(c) => resolve_column(c, row, ctx),
100        Expr::Placeholder(n) => {
101            let idx = usize::from(*n).saturating_sub(1);
102            ctx.params
103                .get(idx)
104                .cloned()
105                .ok_or_else(|| EvalError::PlaceholderOutOfRange {
106                    n: *n,
107                    bound: u16::try_from(ctx.params.len()).unwrap_or(u16::MAX),
108                })
109        }
110        Expr::Unary { op, expr } => {
111            let v = eval_expr(expr, row, ctx)?;
112            apply_unary(*op, v)
113        }
114        Expr::Binary { lhs, op, rhs } => {
115            let l = eval_expr(lhs, row, ctx)?;
116            let r = eval_expr(rhs, row, ctx)?;
117            apply_binary(*op, l, r)
118        }
119        Expr::Cast { expr, target } => {
120            let v = eval_expr(expr, row, ctx)?;
121            cast_value(v, *target)
122        }
123        Expr::IsNull { expr, negated } => {
124            let v = eval_expr(expr, row, ctx)?;
125            let is_null = matches!(v, Value::Null);
126            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
127        }
128        Expr::FunctionCall { name, args } => {
129            let evaluated: Result<Vec<Value>, _> =
130                args.iter().map(|a| eval_expr(a, row, ctx)).collect();
131            apply_function(name, &evaluated?)
132        }
133        Expr::Like {
134            expr,
135            pattern,
136            negated,
137        } => {
138            let v = eval_expr(expr, row, ctx)?;
139            let p = eval_expr(pattern, row, ctx)?;
140            // NULL on either side propagates to NULL — same as PG.
141            let (text, pat) = match (v, p) {
142                (Value::Null, _) | (_, Value::Null) => return Ok(Value::Null),
143                (Value::Text(a), Value::Text(b)) => (a, b),
144                (Value::Text(_), other) | (other, _) => {
145                    return Err(EvalError::TypeMismatch {
146                        detail: format!("LIKE requires text operands, got {:?}", other.data_type()),
147                    });
148                }
149            };
150            let m = like_match(&text, &pat);
151            Ok(Value::Bool(if *negated { !m } else { m }))
152        }
153        Expr::Extract { field, source } => {
154            let v = eval_expr(source, row, ctx)?;
155            extract_field(*field, &v)
156        }
157        // v4.10: subquery nodes should have been resolved into
158        // Literal / Binary-Eq-OR chains by Engine::resolve_select_subqueries
159        // before the row loop. Anything reaching here is a bug.
160        Expr::ScalarSubquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. } => {
161            Err(EvalError::TypeMismatch {
162                detail: "subquery reached row eval — engine resolver bug".into(),
163            })
164        }
165        // v4.12: window functions should have been rewritten into
166        // synthetic __win_N column references by
167        // exec_select_with_window before row eval. Anything
168        // reaching here is similarly a bug.
169        Expr::WindowFunction { .. } => Err(EvalError::TypeMismatch {
170            detail: "window function reached row eval — engine rewrite bug".into(),
171        }),
172        // v7.10.10 — `ARRAY[expr, expr, …]` constructor.
173        // v7.11.13 — element-type detection: all integers →
174        // IntArray (or BigIntArray when widening), any Text →
175        // TextArray. Non-TEXT non-integer elements (Bool, Float)
176        // stringify into TextArray as the safe default.
177        Expr::Array(items) => {
178            let mut materialised: Vec<Value> = Vec::with_capacity(items.len());
179            for elem in items {
180                materialised.push(eval_expr(elem, row, ctx)?);
181            }
182            let mut has_text = false;
183            let mut has_bigint = false;
184            let mut has_int = false;
185            for v in &materialised {
186                match v {
187                    Value::Null => {}
188                    Value::Int(_) | Value::SmallInt(_) => has_int = true,
189                    Value::BigInt(_) => has_bigint = true,
190                    Value::Text(_) | Value::Json(_) => has_text = true,
191                    _ => has_text = true,
192                }
193            }
194            if has_text || (!has_int && !has_bigint) {
195                let out: Vec<Option<String>> = materialised
196                    .into_iter()
197                    .map(|v| match v {
198                        Value::Null => None,
199                        Value::Text(s) | Value::Json(s) => Some(s),
200                        other => Some(value_to_text_for_array(&other)),
201                    })
202                    .collect();
203                return Ok(Value::TextArray(out));
204            }
205            if has_bigint {
206                let out: Vec<Option<i64>> = materialised
207                    .into_iter()
208                    .map(|v| match v {
209                        Value::Null => None,
210                        Value::Int(n) => Some(i64::from(n)),
211                        Value::SmallInt(n) => Some(i64::from(n)),
212                        Value::BigInt(n) => Some(n),
213                        _ => unreachable!(),
214                    })
215                    .collect();
216                return Ok(Value::BigIntArray(out));
217            }
218            let out: Vec<Option<i32>> = materialised
219                .into_iter()
220                .map(|v| match v {
221                    Value::Null => None,
222                    Value::Int(n) => Some(n),
223                    Value::SmallInt(n) => Some(i32::from(n)),
224                    _ => unreachable!(),
225                })
226                .collect();
227            Ok(Value::IntArray(out))
228        }
229        // v7.10.12 — `arr[i]` PG-style 1-based indexing.
230        // Out-of-range indices (including i ≤ 0) return NULL.
231        Expr::ArraySubscript { target, index } => {
232            let target_v = eval_expr(target, row, ctx)?;
233            let idx_v = eval_expr(index, row, ctx)?;
234            if matches!(target_v, Value::Null) || matches!(idx_v, Value::Null) {
235                return Ok(Value::Null);
236            }
237            let i: i64 = match idx_v {
238                Value::Int(n) => i64::from(n),
239                Value::BigInt(n) => n,
240                Value::SmallInt(n) => i64::from(n),
241                other => {
242                    return Err(EvalError::TypeMismatch {
243                        detail: format!(
244                            "array subscript must be integer, got {:?}",
245                            other.data_type()
246                        ),
247                    });
248                }
249            };
250            if i < 1 {
251                return Ok(Value::Null);
252            }
253            let pos = (i - 1) as usize;
254            match target_v {
255                Value::TextArray(items) => match items.get(pos) {
256                    Some(Some(s)) => Ok(Value::Text(s.clone())),
257                    Some(None) | None => Ok(Value::Null),
258                },
259                Value::IntArray(items) => match items.get(pos) {
260                    Some(Some(n)) => Ok(Value::Int(*n)),
261                    Some(None) | None => Ok(Value::Null),
262                },
263                Value::BigIntArray(items) => match items.get(pos) {
264                    Some(Some(n)) => Ok(Value::BigInt(*n)),
265                    Some(None) | None => Ok(Value::Null),
266                },
267                other => Err(EvalError::TypeMismatch {
268                    detail: format!(
269                        "subscript target must be an array, got {:?}",
270                        other.data_type()
271                    ),
272                }),
273            }
274        }
275        // v7.10.12 — `x op ANY(arr)` / `x op ALL(arr)`. PG
276        // 3VL: ANY → true if any element compares-true; NULL if
277        // no true but some NULL; false otherwise. ALL: false if
278        // any compares-false; NULL if no false but some NULL;
279        // true otherwise.
280        Expr::AnyAll {
281            expr,
282            op,
283            array,
284            is_any,
285        } => {
286            let lhs = eval_expr(expr, row, ctx)?;
287            let arr = eval_expr(array, row, ctx)?;
288            if matches!(arr, Value::Null) {
289                return Ok(Value::Null);
290            }
291            let elems: Vec<Option<Value>> = match arr {
292                Value::TextArray(items) => items
293                    .into_iter()
294                    .map(|o| o.map(Value::Text))
295                    .collect(),
296                Value::IntArray(items) => items
297                    .into_iter()
298                    .map(|o| o.map(Value::Int))
299                    .collect(),
300                Value::BigIntArray(items) => items
301                    .into_iter()
302                    .map(|o| o.map(Value::BigInt))
303                    .collect(),
304                other => {
305                    return Err(EvalError::TypeMismatch {
306                        detail: format!(
307                            "ANY/ALL right-hand side must be an array, got {:?}",
308                            other.data_type()
309                        ),
310                    });
311                }
312            };
313            let mut saw_null = matches!(lhs, Value::Null);
314            let mut saw_match = false;
315            let mut saw_mismatch = false;
316            for elem in elems {
317                let elem_v = match elem {
318                    Some(v) => v,
319                    None => {
320                        saw_null = true;
321                        continue;
322                    }
323                };
324                if matches!(lhs, Value::Null) {
325                    saw_null = true;
326                    continue;
327                }
328                match apply_binary(*op, lhs.clone(), elem_v) {
329                    Ok(Value::Bool(true)) => saw_match = true,
330                    Ok(Value::Bool(false)) => saw_mismatch = true,
331                    Ok(Value::Null) => saw_null = true,
332                    Ok(other) => {
333                        return Err(EvalError::TypeMismatch {
334                            detail: format!(
335                                "ANY/ALL comparison didn't return Bool: {:?}",
336                                other.data_type()
337                            ),
338                        });
339                    }
340                    Err(e) => return Err(e),
341                }
342            }
343            let result = if *is_any {
344                if saw_match {
345                    Value::Bool(true)
346                } else if saw_null {
347                    Value::Null
348                } else {
349                    Value::Bool(false)
350                }
351            } else if saw_mismatch {
352                Value::Bool(false)
353            } else if saw_null {
354                Value::Null
355            } else {
356                Value::Bool(true)
357            };
358            Ok(result)
359        }
360    }
361}
362
363/// v7.10.10 — best-effort text rendering for non-TEXT array
364/// elements (numbers, bools, etc.). The PG rule is that
365/// `ARRAY[1, 2]` is `int[]`, but SPG's v7.10 only models TEXT[],
366/// so we widen by stringifying. NUMERIC formatting goes through
367/// the existing canonical helpers to stay consistent with
368/// `format_numeric` / `format_date` etc.
369fn value_to_text_for_array(v: &Value) -> String {
370    match v {
371        Value::Text(s) | Value::Json(s) => s.clone(),
372        Value::Int(n) => n.to_string(),
373        Value::BigInt(n) => n.to_string(),
374        Value::SmallInt(n) => n.to_string(),
375        Value::Bool(b) => {
376            if *b {
377                "true".into()
378            } else {
379                "false".into()
380            }
381        }
382        Value::Float(x) => format!("{x}"),
383        Value::Date(d) => format_date(*d),
384        Value::Timestamp(t) => format_timestamp(*t),
385        Value::Numeric { scaled, scale } => format_numeric(*scaled, *scale),
386        _ => format!("{v:?}"),
387    }
388}
389
390/// Pull an integer component (year / month / ... / microsecond) out
391/// of a `DATE` or `TIMESTAMP`. Returns NULL on a NULL source, errors
392/// when the source isn't a calendar type.
393fn extract_field(field: spg_sql::ast::ExtractField, v: &Value) -> Result<Value, EvalError> {
394    use spg_sql::ast::ExtractField as F;
395    if matches!(v, Value::Null) {
396        return Ok(Value::Null);
397    }
398    // INTERVAL has its own decomposition — `YEAR` / `MONTH` come from
399    // the months part, the rest from the microseconds part. PG matches
400    // this convention (months is normalised modulo 12 for MONTH).
401    if let Value::Interval { months, micros } = *v {
402        let years = months / 12;
403        let mons = months % 12;
404        let secs_total = micros / 1_000_000;
405        let frac = micros % 1_000_000;
406        let result = match field {
407            F::Year => i64::from(years),
408            F::Month => i64::from(mons),
409            F::Day => micros / 86_400_000_000,
410            F::Hour => (secs_total / 3600) % 24,
411            F::Minute => (secs_total / 60) % 60,
412            F::Second => secs_total % 60,
413            F::Microsecond => (secs_total % 60) * 1_000_000 + frac,
414        };
415        return Ok(Value::BigInt(result));
416    }
417    let (days, day_micros) = match *v {
418        Value::Date(d) => (d, 0_i64),
419        Value::Timestamp(t) => {
420            let days = t.div_euclid(86_400_000_000);
421            let day_micros = t.rem_euclid(86_400_000_000);
422            (i32::try_from(days).unwrap_or(i32::MAX), day_micros)
423        }
424        _ => {
425            return Err(EvalError::TypeMismatch {
426                detail: format!(
427                    "EXTRACT requires DATE / TIMESTAMP / INTERVAL, got {:?}",
428                    v.data_type()
429                ),
430            });
431        }
432    };
433    let (y, m, d) = civil_components(days);
434    let secs = day_micros / 1_000_000;
435    let hh = secs / 3600;
436    let mm = (secs / 60) % 60;
437    let ss = secs % 60;
438    let frac = day_micros % 1_000_000;
439    let result = match field {
440        F::Year => i64::from(y),
441        F::Month => i64::from(m),
442        F::Day => i64::from(d),
443        F::Hour => hh,
444        F::Minute => mm,
445        F::Second => ss,
446        F::Microsecond => ss * 1_000_000 + frac,
447    };
448    Ok(Value::BigInt(result))
449}
450
451/// Internal wrapper around the file-private `civil_from_days` so the
452/// public surface area doesn't change. Returns `(year, month, day)`.
453fn civil_components(days: i32) -> (i32, u32, u32) {
454    civil_from_days(days)
455}
456
457/// SQL `LIKE` matcher. Wildcards are `%` (any run, possibly empty) and `_`
458/// (exactly one char). `\` escapes the next pattern char so `\%` matches a
459/// literal `%`. Matches the whole input — no implicit anchoring needed
460/// since SQL `LIKE` is always full-string.
461fn like_match(text: &str, pattern: &str) -> bool {
462    let text: Vec<char> = text.chars().collect();
463    let pat: Vec<char> = pattern.chars().collect();
464    like_match_inner(&text, 0, &pat, 0)
465}
466
467fn like_match_inner(text: &[char], mut ti: usize, pat: &[char], mut pi: usize) -> bool {
468    while pi < pat.len() {
469        match pat[pi] {
470            '%' => {
471                // Collapse consecutive `%` and try every possible split.
472                while pi < pat.len() && pat[pi] == '%' {
473                    pi += 1;
474                }
475                if pi == pat.len() {
476                    return true;
477                }
478                for k in ti..=text.len() {
479                    if like_match_inner(text, k, pat, pi) {
480                        return true;
481                    }
482                }
483                return false;
484            }
485            '_' => {
486                if ti >= text.len() {
487                    return false;
488                }
489                ti += 1;
490                pi += 1;
491            }
492            '\\' if pi + 1 < pat.len() => {
493                let want = pat[pi + 1];
494                if ti >= text.len() || text[ti] != want {
495                    return false;
496                }
497                ti += 1;
498                pi += 2;
499            }
500            c => {
501                if ti >= text.len() || text[ti] != c {
502                    return false;
503                }
504                ti += 1;
505                pi += 1;
506            }
507        }
508    }
509    ti == text.len()
510}
511
512/// Dispatch on lowercased function name. v1.4 implements only a handful of
513/// scalar functions; aggregates land in v1.5 alongside GROUP BY.
514fn apply_function(name: &str, args: &[Value]) -> Result<Value, EvalError> {
515    match name.to_ascii_lowercase().as_str() {
516        "length" => {
517            if args.len() != 1 {
518                return Err(EvalError::TypeMismatch {
519                    detail: format!("length() takes 1 arg, got {}", args.len()),
520                });
521            }
522            match &args[0] {
523                Value::Null => Ok(Value::Null),
524                Value::Text(s) => {
525                    let n = i32::try_from(s.chars().count()).unwrap_or(i32::MAX);
526                    Ok(Value::Int(n))
527                }
528                // v7.10.4 — PG semantics: length(bytea) returns
529                // byte count (= octet_length). Without this branch
530                // mailrs's INSERT … SELECT length(body) … against a
531                // BYTEA column would type-mismatch.
532                Value::Bytes(b) => {
533                    let n = i32::try_from(b.len()).unwrap_or(i32::MAX);
534                    Ok(Value::Int(n))
535                }
536                other => Err(EvalError::TypeMismatch {
537                    detail: format!("length() needs text or bytea, got {:?}", other.data_type()),
538                }),
539            }
540        }
541        // v7.10.4 — `OCTET_LENGTH(x)` returns byte count for both
542        // TEXT (UTF-8 byte length) and BYTEA. PG-spec name; aliases
543        // to length() for bytea by design.
544        "octet_length" => {
545            if args.len() != 1 {
546                return Err(EvalError::TypeMismatch {
547                    detail: format!("octet_length() takes 1 arg, got {}", args.len()),
548                });
549            }
550            match &args[0] {
551                Value::Null => Ok(Value::Null),
552                Value::Text(s) => {
553                    let n = i32::try_from(s.len()).unwrap_or(i32::MAX);
554                    Ok(Value::Int(n))
555                }
556                Value::Bytes(b) => {
557                    let n = i32::try_from(b.len()).unwrap_or(i32::MAX);
558                    Ok(Value::Int(n))
559                }
560                other => Err(EvalError::TypeMismatch {
561                    detail: format!(
562                        "octet_length() needs text or bytea, got {:?}",
563                        other.data_type()
564                    ),
565                }),
566            }
567        }
568        // v7.11.6 — `array_length(arr, dim)` returns the element
569        // count of `arr` along dimension `dim`. v7.11 only models
570        // single-dimension arrays so dim must be 1 (otherwise NULL,
571        // matching PG semantics for unsupported dimensions). NULL
572        // array → NULL. v7.11 TEXT[] only; non-array operand is
573        // a type mismatch.
574        "array_length" => {
575            if args.len() != 2 {
576                return Err(EvalError::TypeMismatch {
577                    detail: format!("array_length() takes 2 args, got {}", args.len()),
578                });
579            }
580            if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
581                return Ok(Value::Null);
582            }
583            let len = match &args[0] {
584                Value::TextArray(items) => items.len(),
585                Value::IntArray(items) => items.len(),
586                Value::BigIntArray(items) => items.len(),
587                _ => {
588                    return Err(EvalError::TypeMismatch {
589                        detail: format!(
590                            "array_length() first arg must be an array, got {:?}",
591                            args[0].data_type()
592                        ),
593                    });
594                }
595            };
596            let dim: i64 = match args[1] {
597                Value::Int(n) => i64::from(n),
598                Value::BigInt(n) => n,
599                Value::SmallInt(n) => i64::from(n),
600                _ => {
601                    return Err(EvalError::TypeMismatch {
602                        detail: format!(
603                            "array_length() second arg must be integer, got {:?}",
604                            args[1].data_type()
605                        ),
606                    });
607                }
608            };
609            if dim != 1 {
610                return Ok(Value::Null);
611            }
612            let n = i32::try_from(len).unwrap_or(i32::MAX);
613            Ok(Value::Int(n))
614        }
615        // v7.11.6 — `array_position(arr, val)` returns 1-based
616        // index of the first element of `arr` equal to `val`, or
617        // NULL if not found. PG NULL semantics: NULL array → NULL;
618        // NULL val never matches (returns NULL if absent).
619        "array_position" => {
620            if args.len() != 2 {
621                return Err(EvalError::TypeMismatch {
622                    detail: format!("array_position() takes 2 args, got {}", args.len()),
623                });
624            }
625            if matches!(args[0], Value::Null) {
626                return Ok(Value::Null);
627            }
628            if matches!(args[1], Value::Null) {
629                return Ok(Value::Null);
630            }
631            match (&args[0], &args[1]) {
632                (Value::TextArray(items), Value::Text(needle)) => {
633                    for (idx, item) in items.iter().enumerate() {
634                        if let Some(s) = item
635                            && s == needle
636                        {
637                            return Ok(Value::Int(
638                                i32::try_from(idx + 1).unwrap_or(i32::MAX),
639                            ));
640                        }
641                    }
642                    Ok(Value::Null)
643                }
644                (Value::IntArray(items), needle_v)
645                    if matches!(
646                        needle_v,
647                        Value::Int(_) | Value::SmallInt(_) | Value::BigInt(_)
648                    ) =>
649                {
650                    let needle: i64 = match *needle_v {
651                        Value::Int(n) => i64::from(n),
652                        Value::SmallInt(n) => i64::from(n),
653                        Value::BigInt(n) => n,
654                        _ => unreachable!(),
655                    };
656                    for (idx, item) in items.iter().enumerate() {
657                        if let Some(n) = item
658                            && i64::from(*n) == needle
659                        {
660                            return Ok(Value::Int(
661                                i32::try_from(idx + 1).unwrap_or(i32::MAX),
662                            ));
663                        }
664                    }
665                    Ok(Value::Null)
666                }
667                (Value::BigIntArray(items), needle_v)
668                    if matches!(
669                        needle_v,
670                        Value::Int(_) | Value::SmallInt(_) | Value::BigInt(_)
671                    ) =>
672                {
673                    let needle: i64 = match *needle_v {
674                        Value::Int(n) => i64::from(n),
675                        Value::SmallInt(n) => i64::from(n),
676                        Value::BigInt(n) => n,
677                        _ => unreachable!(),
678                    };
679                    for (idx, item) in items.iter().enumerate() {
680                        if let Some(n) = item
681                            && *n == needle
682                        {
683                            return Ok(Value::Int(
684                                i32::try_from(idx + 1).unwrap_or(i32::MAX),
685                            ));
686                        }
687                    }
688                    Ok(Value::Null)
689                }
690                (arr @ (Value::TextArray(_) | Value::IntArray(_) | Value::BigIntArray(_)), other) => {
691                    Err(EvalError::TypeMismatch {
692                        detail: format!(
693                            "array_position() needle type {:?} doesn't match array {:?}",
694                            other.data_type(),
695                            arr.data_type()
696                        ),
697                    })
698                }
699                (other, _) => Err(EvalError::TypeMismatch {
700                    detail: format!(
701                        "array_position() first arg must be an array, got {:?}",
702                        other.data_type()
703                    ),
704                }),
705            }
706        }
707        // v7.11.15 — `substring(s, start)` / `substring(s, start, length)`
708        // for both TEXT and BYTEA. PG semantics: `start` is 1-based;
709        // values ≤ 0 clamp into the string (i.e. effective start is
710        // adjusted so the window still begins at index 1 — but
711        // `length` is reduced by the clipped prefix). A NULL arg
712        // makes the result NULL. Out-of-range windows return an
713        // empty value, not NULL.
714        "substring" => {
715            if !matches!(args.len(), 2 | 3) {
716                return Err(EvalError::TypeMismatch {
717                    detail: format!("substring() takes 2 or 3 args, got {}", args.len()),
718                });
719            }
720            if args.iter().any(|a| matches!(a, Value::Null)) {
721                return Ok(Value::Null);
722            }
723            let start: i64 = match args[1] {
724                Value::Int(n) => i64::from(n),
725                Value::BigInt(n) => n,
726                Value::SmallInt(n) => i64::from(n),
727                _ => {
728                    return Err(EvalError::TypeMismatch {
729                        detail: format!(
730                            "substring() start must be integer, got {:?}",
731                            args[1].data_type()
732                        ),
733                    });
734                }
735            };
736            let length: Option<i64> = if args.len() == 3 {
737                match args[2] {
738                    Value::Int(n) => Some(i64::from(n)),
739                    Value::BigInt(n) => Some(n),
740                    Value::SmallInt(n) => Some(i64::from(n)),
741                    _ => {
742                        return Err(EvalError::TypeMismatch {
743                            detail: format!(
744                                "substring() length must be integer, got {:?}",
745                                args[2].data_type()
746                            ),
747                        });
748                    }
749                }
750            } else {
751                None
752            };
753            // PG: when length is given, end = start + length; if
754            // end < start the result is empty. Clip start to 1.
755            let (effective_start, effective_length): (i64, Option<i64>) = match length {
756                Some(len) => {
757                    let end = start.saturating_add(len);
758                    if end <= 1 || len < 0 {
759                        return Ok(match &args[0] {
760                            Value::Text(_) => Value::Text(String::new()),
761                            Value::Bytes(_) => Value::Bytes(Vec::new()),
762                            other => {
763                                return Err(EvalError::TypeMismatch {
764                                    detail: format!(
765                                        "substring() needs text or bytea, got {:?}",
766                                        other.data_type()
767                                    ),
768                                });
769                            }
770                        });
771                    }
772                    let eff_start = start.max(1);
773                    let eff_len = end - eff_start;
774                    (eff_start, Some(eff_len.max(0)))
775                }
776                None => (start.max(1), None),
777            };
778            match &args[0] {
779                Value::Text(s) => {
780                    // PG counts in characters (codepoints) for TEXT.
781                    let chars: Vec<char> = s.chars().collect();
782                    let skip = (effective_start - 1) as usize;
783                    if skip >= chars.len() {
784                        return Ok(Value::Text(String::new()));
785                    }
786                    let take = match effective_length {
787                        Some(n) => (n as usize).min(chars.len() - skip),
788                        None => chars.len() - skip,
789                    };
790                    Ok(Value::Text(chars[skip..skip + take].iter().collect()))
791                }
792                Value::Bytes(b) => {
793                    let skip = (effective_start - 1) as usize;
794                    if skip >= b.len() {
795                        return Ok(Value::Bytes(Vec::new()));
796                    }
797                    let take = match effective_length {
798                        Some(n) => (n as usize).min(b.len() - skip),
799                        None => b.len() - skip,
800                    };
801                    Ok(Value::Bytes(b[skip..skip + take].to_vec()))
802                }
803                other => Err(EvalError::TypeMismatch {
804                    detail: format!(
805                        "substring() needs text or bytea, got {:?}",
806                        other.data_type()
807                    ),
808                }),
809            }
810        }
811        // v7.11.15 — `position(needle, haystack)`. PG semantics:
812        // 1-based byte/char index of first occurrence, or 0 if
813        // absent. NULL on either operand → NULL. Empty needle
814        // returns 1 (PG convention). Works on TEXT (char positions)
815        // and BYTEA (byte positions). (The PG-spec syntax `position(
816        // needle IN haystack)` is not parsed in v7.11; clients must
817        // call the function-call form.)
818        "position" => {
819            if args.len() != 2 {
820                return Err(EvalError::TypeMismatch {
821                    detail: format!("position() takes 2 args, got {}", args.len()),
822                });
823            }
824            if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
825                return Ok(Value::Null);
826            }
827            match (&args[0], &args[1]) {
828                (Value::Text(needle), Value::Text(haystack)) => {
829                    if needle.is_empty() {
830                        return Ok(Value::Int(1));
831                    }
832                    // Char-based position (PG uses character count).
833                    let h_chars: Vec<char> = haystack.chars().collect();
834                    let n_chars: Vec<char> = needle.chars().collect();
835                    if n_chars.len() > h_chars.len() {
836                        return Ok(Value::Int(0));
837                    }
838                    for i in 0..=h_chars.len() - n_chars.len() {
839                        if h_chars[i..i + n_chars.len()] == n_chars[..] {
840                            return Ok(Value::Int(
841                                i32::try_from(i + 1).unwrap_or(i32::MAX),
842                            ));
843                        }
844                    }
845                    Ok(Value::Int(0))
846                }
847                (Value::Bytes(needle), Value::Bytes(haystack)) => {
848                    if needle.is_empty() {
849                        return Ok(Value::Int(1));
850                    }
851                    if needle.len() > haystack.len() {
852                        return Ok(Value::Int(0));
853                    }
854                    for i in 0..=haystack.len() - needle.len() {
855                        if &haystack[i..i + needle.len()] == needle.as_slice() {
856                            return Ok(Value::Int(
857                                i32::try_from(i + 1).unwrap_or(i32::MAX),
858                            ));
859                        }
860                    }
861                    Ok(Value::Int(0))
862                }
863                (a, b) => Err(EvalError::TypeMismatch {
864                    detail: format!(
865                        "position() operands must both be text or both bytea, got {:?} and {:?}",
866                        a.data_type(),
867                        b.data_type()
868                    ),
869                }),
870            }
871        }
872        "upper" => {
873            if args.len() != 1 {
874                return Err(EvalError::TypeMismatch {
875                    detail: format!("upper() takes 1 arg, got {}", args.len()),
876                });
877            }
878            match &args[0] {
879                Value::Null => Ok(Value::Null),
880                Value::Text(s) => Ok(Value::Text(s.to_uppercase())),
881                other => Err(EvalError::TypeMismatch {
882                    detail: format!("upper() needs text, got {:?}", other.data_type()),
883                }),
884            }
885        }
886        "lower" => {
887            if args.len() != 1 {
888                return Err(EvalError::TypeMismatch {
889                    detail: format!("lower() takes 1 arg, got {}", args.len()),
890                });
891            }
892            match &args[0] {
893                Value::Null => Ok(Value::Null),
894                Value::Text(s) => Ok(Value::Text(s.to_lowercase())),
895                other => Err(EvalError::TypeMismatch {
896                    detail: format!("lower() needs text, got {:?}", other.data_type()),
897                }),
898            }
899        }
900        "abs" => {
901            if args.len() != 1 {
902                return Err(EvalError::TypeMismatch {
903                    detail: format!("abs() takes 1 arg, got {}", args.len()),
904                });
905            }
906            match &args[0] {
907                Value::Null => Ok(Value::Null),
908                Value::Int(n) => Ok(Value::Int(n.wrapping_abs())),
909                Value::BigInt(n) => Ok(Value::BigInt(n.wrapping_abs())),
910                Value::Float(x) => Ok(Value::Float(x.abs())),
911                other => Err(EvalError::TypeMismatch {
912                    detail: format!("abs() needs numeric, got {:?}", other.data_type()),
913                }),
914            }
915        }
916        "coalesce" => {
917            for a in args {
918                if !matches!(a, Value::Null) {
919                    return Ok(a.clone());
920                }
921            }
922            Ok(Value::Null)
923        }
924        "date_trunc" => date_trunc(args),
925        "date_part" => date_part(args),
926        "age" => age(args),
927        "to_char" => to_char(args),
928        // v6.4.3 — encode/decode + error_on_null SQL function bundle.
929        "encode" => encode_text(args),
930        "decode" => decode_text(args),
931        "error_on_null" => error_on_null(args),
932        other => Err(EvalError::TypeMismatch {
933            detail: format!("unknown function `{other}`"),
934        }),
935    }
936}
937
938/// v6.4.3 — `encode(bytes_as_text, format)`. PG works on bytea
939/// arguments; SPG's value space treats Text as the byte container
940/// (raw UTF-8 bytes). Supported formats: base64 (PG default),
941/// base64url (RFC 4648 §5), base32hex (RFC 4648 §7 extended-hex),
942/// hex.
943fn encode_text(args: &[Value]) -> Result<Value, EvalError> {
944    if args.len() != 2 {
945        return Err(EvalError::TypeMismatch {
946            detail: format!("encode() takes 2 args, got {}", args.len()),
947        });
948    }
949    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
950        return Ok(Value::Null);
951    }
952    let bytes: &[u8] = match &args[0] {
953        Value::Text(s) => s.as_bytes(),
954        other => {
955            return Err(EvalError::TypeMismatch {
956                detail: format!("encode() expects text bytes, got {:?}", other.data_type()),
957            });
958        }
959    };
960    let fmt = match &args[1] {
961        Value::Text(s) => s.to_ascii_lowercase(),
962        other => {
963            return Err(EvalError::TypeMismatch {
964                detail: format!("encode() format must be text, got {:?}", other.data_type()),
965            });
966        }
967    };
968    let out = match fmt.as_str() {
969        "base64" => b64_encode(bytes, B64_STD),
970        "base64url" => b64_encode(bytes, B64_URL),
971        "base32hex" => b32hex_encode(bytes),
972        "hex" => hex_encode(bytes),
973        other => {
974            return Err(EvalError::TypeMismatch {
975                detail: format!("encode(): unknown format `{other}`"),
976            });
977        }
978    };
979    Ok(Value::Text(out))
980}
981
982/// v6.4.3 — `decode(text, format)`. Inverse of `encode`; returns
983/// Text containing the raw decoded bytes (caller may CAST to bytea
984/// equivalent if SPG adds bytea later).
985fn decode_text(args: &[Value]) -> Result<Value, EvalError> {
986    if args.len() != 2 {
987        return Err(EvalError::TypeMismatch {
988            detail: format!("decode() takes 2 args, got {}", args.len()),
989        });
990    }
991    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
992        return Ok(Value::Null);
993    }
994    let text = match &args[0] {
995        Value::Text(s) => s.as_str(),
996        other => {
997            return Err(EvalError::TypeMismatch {
998                detail: format!("decode() expects text, got {:?}", other.data_type()),
999            });
1000        }
1001    };
1002    let fmt = match &args[1] {
1003        Value::Text(s) => s.to_ascii_lowercase(),
1004        other => {
1005            return Err(EvalError::TypeMismatch {
1006                detail: format!("decode() format must be text, got {:?}", other.data_type()),
1007            });
1008        }
1009    };
1010    let bytes = match fmt.as_str() {
1011        "base64" => b64_decode(text, B64_STD)?,
1012        "base64url" => b64_decode(text, B64_URL)?,
1013        "base32hex" => b32hex_decode(text)?,
1014        "hex" => hex_decode(text)?,
1015        other => {
1016            return Err(EvalError::TypeMismatch {
1017                detail: format!("decode(): unknown format `{other}`"),
1018            });
1019        }
1020    };
1021    let s = String::from_utf8(bytes).map_err(|_| EvalError::TypeMismatch {
1022        detail: "decode(): result bytes are not valid UTF-8 (SPG stores raw bytes as Text)".into(),
1023    })?;
1024    Ok(Value::Text(s))
1025}
1026
1027/// v6.4.3 — `error_on_null(v)`. Returns `v` unchanged if non-NULL;
1028/// errors otherwise. Convenience to assert NOT NULL inside an
1029/// expression without wrapping it in COALESCE + raise hacks.
1030fn error_on_null(args: &[Value]) -> Result<Value, EvalError> {
1031    if args.len() != 1 {
1032        return Err(EvalError::TypeMismatch {
1033            detail: format!("error_on_null() takes 1 arg, got {}", args.len()),
1034        });
1035    }
1036    if matches!(args[0], Value::Null) {
1037        return Err(EvalError::TypeMismatch {
1038            detail: "error_on_null(): argument is NULL".into(),
1039        });
1040    }
1041    Ok(args[0].clone())
1042}
1043
1044// ── byte-level encoders ───────────────────────────────────────────
1045
1046const B64_STD: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1047const B64_URL: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
1048const B32HEX_ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHIJKLMNOPQRSTUV";
1049
1050fn b64_encode(bytes: &[u8], alpha: &[u8; 64]) -> String {
1051    let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
1052    let mut i = 0;
1053    while i + 3 <= bytes.len() {
1054        let n = ((bytes[i] as u32) << 16) | ((bytes[i + 1] as u32) << 8) | (bytes[i + 2] as u32);
1055        out.push(alpha[((n >> 18) & 0x3f) as usize] as char);
1056        out.push(alpha[((n >> 12) & 0x3f) as usize] as char);
1057        out.push(alpha[((n >> 6) & 0x3f) as usize] as char);
1058        out.push(alpha[(n & 0x3f) as usize] as char);
1059        i += 3;
1060    }
1061    let rem = bytes.len() - i;
1062    if rem == 1 {
1063        let n = (bytes[i] as u32) << 16;
1064        out.push(alpha[((n >> 18) & 0x3f) as usize] as char);
1065        out.push(alpha[((n >> 12) & 0x3f) as usize] as char);
1066        out.push('=');
1067        out.push('=');
1068    } else if rem == 2 {
1069        let n = ((bytes[i] as u32) << 16) | ((bytes[i + 1] as u32) << 8);
1070        out.push(alpha[((n >> 18) & 0x3f) as usize] as char);
1071        out.push(alpha[((n >> 12) & 0x3f) as usize] as char);
1072        out.push(alpha[((n >> 6) & 0x3f) as usize] as char);
1073        out.push('=');
1074    }
1075    out
1076}
1077
1078fn b64_decode(text: &str, alpha: &[u8; 64]) -> Result<Vec<u8>, EvalError> {
1079    let mut lookup = [255u8; 256];
1080    for (i, &c) in alpha.iter().enumerate() {
1081        lookup[c as usize] = i as u8;
1082    }
1083    let mut out = Vec::with_capacity(text.len() * 3 / 4);
1084    let mut buf: u32 = 0;
1085    let mut bits: u32 = 0;
1086    for c in text.bytes() {
1087        if c == b'=' {
1088            break;
1089        }
1090        if c == b'\n' || c == b'\r' || c == b' ' {
1091            continue;
1092        }
1093        let v = lookup[c as usize];
1094        if v == 255 {
1095            return Err(EvalError::TypeMismatch {
1096                detail: format!("decode(base64): invalid char {:?}", c as char),
1097            });
1098        }
1099        buf = (buf << 6) | v as u32;
1100        bits += 6;
1101        if bits >= 8 {
1102            bits -= 8;
1103            out.push(((buf >> bits) & 0xff) as u8);
1104        }
1105    }
1106    Ok(out)
1107}
1108
1109fn b32hex_encode(bytes: &[u8]) -> String {
1110    let mut out = String::with_capacity((bytes.len() * 8 + 4) / 5);
1111    let mut buf: u64 = 0;
1112    let mut bits: u32 = 0;
1113    for &b in bytes {
1114        buf = (buf << 8) | b as u64;
1115        bits += 8;
1116        while bits >= 5 {
1117            bits -= 5;
1118            out.push(B32HEX_ALPHABET[((buf >> bits) & 0x1f) as usize] as char);
1119        }
1120    }
1121    if bits > 0 {
1122        out.push(B32HEX_ALPHABET[((buf << (5 - bits)) & 0x1f) as usize] as char);
1123    }
1124    // Pad to multiple of 8.
1125    while out.len() % 8 != 0 {
1126        out.push('=');
1127    }
1128    out
1129}
1130
1131fn b32hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
1132    let mut lookup = [255u8; 256];
1133    for (i, &c) in B32HEX_ALPHABET.iter().enumerate() {
1134        lookup[c as usize] = i as u8;
1135        // base32hex is case-insensitive — also map lowercase.
1136        let lower = (c as char).to_ascii_lowercase() as u8;
1137        lookup[lower as usize] = i as u8;
1138    }
1139    let mut out = Vec::with_capacity(text.len() * 5 / 8);
1140    let mut buf: u64 = 0;
1141    let mut bits: u32 = 0;
1142    for c in text.bytes() {
1143        if c == b'=' {
1144            break;
1145        }
1146        if c == b'\n' || c == b'\r' || c == b' ' {
1147            continue;
1148        }
1149        let v = lookup[c as usize];
1150        if v == 255 {
1151            return Err(EvalError::TypeMismatch {
1152                detail: format!("decode(base32hex): invalid char {:?}", c as char),
1153            });
1154        }
1155        buf = (buf << 5) | v as u64;
1156        bits += 5;
1157        if bits >= 8 {
1158            bits -= 8;
1159            out.push(((buf >> bits) & 0xff) as u8);
1160        }
1161    }
1162    Ok(out)
1163}
1164
1165fn hex_encode(bytes: &[u8]) -> String {
1166    const HEX: &[u8; 16] = b"0123456789abcdef";
1167    let mut out = String::with_capacity(bytes.len() * 2);
1168    for &b in bytes {
1169        out.push(HEX[(b >> 4) as usize] as char);
1170        out.push(HEX[(b & 0xf) as usize] as char);
1171    }
1172    out
1173}
1174
1175fn hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
1176    let trimmed = text.trim();
1177    if trimmed.len() % 2 != 0 {
1178        return Err(EvalError::TypeMismatch {
1179            detail: "decode(hex): input length must be even".into(),
1180        });
1181    }
1182    let mut out = Vec::with_capacity(trimmed.len() / 2);
1183    let mut hi: u8 = 0;
1184    for (i, c) in trimmed.bytes().enumerate() {
1185        let v = match c {
1186            b'0'..=b'9' => c - b'0',
1187            b'a'..=b'f' => c - b'a' + 10,
1188            b'A'..=b'F' => c - b'A' + 10,
1189            _ => {
1190                return Err(EvalError::TypeMismatch {
1191                    detail: format!("decode(hex): invalid char {:?}", c as char),
1192                });
1193            }
1194        };
1195        if i % 2 == 0 {
1196            hi = v;
1197        } else {
1198            out.push((hi << 4) | v);
1199        }
1200    }
1201    Ok(out)
1202}
1203
1204/// `date_part(field_text, source)` — function form of `EXTRACT(field FROM
1205/// source)`. Same component dispatch (DATE / TIMESTAMP / INTERVAL) and
1206/// same `BigInt` return shape; PG returns double precision but we keep the
1207/// integer convention so the runner's `query I` shape works unchanged.
1208fn date_part(args: &[Value]) -> Result<Value, EvalError> {
1209    use spg_sql::ast::ExtractField as F;
1210    if args.len() != 2 {
1211        return Err(EvalError::TypeMismatch {
1212            detail: format!("date_part() takes 2 args, got {}", args.len()),
1213        });
1214    }
1215    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
1216        return Ok(Value::Null);
1217    }
1218    let Value::Text(field_name) = &args[0] else {
1219        return Err(EvalError::TypeMismatch {
1220            detail: format!(
1221                "date_part() needs a text field, got {:?}",
1222                args[0].data_type()
1223            ),
1224        });
1225    };
1226    let field = match field_name.to_ascii_lowercase().as_str() {
1227        "year" => F::Year,
1228        "month" => F::Month,
1229        "day" => F::Day,
1230        "hour" => F::Hour,
1231        "minute" => F::Minute,
1232        "second" => F::Second,
1233        "microsecond" | "microseconds" => F::Microsecond,
1234        other => {
1235            return Err(EvalError::TypeMismatch {
1236                detail: format!(
1237                    "unknown date_part field {other:?}; \
1238                     supported: year, month, day, hour, minute, second, microsecond"
1239                ),
1240            });
1241        }
1242    };
1243    extract_field(field, &args[1])
1244}
1245
1246/// `age(t1, t2)` — return `t1 - t2` as an INTERVAL. v2.12 produces a
1247/// micros-only interval (no months normalisation) because PG's
1248/// month-justification rule is sensitive to the day-of-month walk and
1249/// adds material complexity for marginal corpus value.
1250///
1251/// `age(t)` (single-arg form) is intentionally unsupported in v2.12:
1252/// the dispatcher errors instead of guessing a clock source. Callers
1253/// who want PG's `age(t)` semantics should write `age(CURRENT_DATE, t)`
1254/// explicitly so the clock reference is visible at the SQL layer.
1255fn age(args: &[Value]) -> Result<Value, EvalError> {
1256    if args.is_empty() || args.len() > 2 {
1257        return Err(EvalError::TypeMismatch {
1258            detail: format!("age() takes 1 or 2 args, got {}", args.len()),
1259        });
1260    }
1261    if args.iter().any(|v| matches!(v, Value::Null)) {
1262        return Ok(Value::Null);
1263    }
1264    // Coerce to TIMESTAMP micros — DATE lifts to midnight; TIMESTAMP
1265    // stays as-is; anything else errors.
1266    let to_micros = |v: &Value| -> Result<i64, EvalError> {
1267        match v {
1268            Value::Timestamp(t) => Ok(*t),
1269            Value::Date(d) => Ok(i64::from(*d) * 86_400_000_000),
1270            other => Err(EvalError::TypeMismatch {
1271                detail: format!("age() needs DATE or TIMESTAMP, got {:?}", other.data_type()),
1272            }),
1273        }
1274    };
1275    if args.len() == 1 {
1276        return Err(EvalError::TypeMismatch {
1277            detail: "single-arg age() is unsupported in v2.12 \
1278                     (use age(CURRENT_DATE, t) explicitly)"
1279                .into(),
1280        });
1281    }
1282    let a = to_micros(&args[0])?;
1283    let b = to_micros(&args[1])?;
1284    let delta = a.checked_sub(b).ok_or(EvalError::TypeMismatch {
1285        detail: "age() subtraction overflows i64 microseconds".into(),
1286    })?;
1287    Ok(Value::Interval {
1288        months: 0,
1289        micros: delta,
1290    })
1291}
1292
1293/// `to_char(value, format)` — render a DATE / TIMESTAMP through a PG
1294/// format template. Supports the high-traffic placeholders:
1295///   YYYY YY MM Mon Month DD HH24 HH12 MI SS MS US AM PM
1296/// Unrecognised characters pass through literally so the template's
1297/// punctuation ('-', ':', ' ', '/') needs no escape mechanism.
1298fn to_char(args: &[Value]) -> Result<Value, EvalError> {
1299    use core::fmt::Write as _;
1300    if args.len() != 2 {
1301        return Err(EvalError::TypeMismatch {
1302            detail: format!("to_char() takes 2 args, got {}", args.len()),
1303        });
1304    }
1305    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
1306        return Ok(Value::Null);
1307    }
1308    let Value::Text(fmt) = &args[1] else {
1309        return Err(EvalError::TypeMismatch {
1310            detail: format!(
1311                "to_char() needs a text format, got {:?}",
1312                args[1].data_type()
1313            ),
1314        });
1315    };
1316    let (days, day_micros) = match &args[0] {
1317        Value::Date(d) => (*d, 0_i64),
1318        Value::Timestamp(t) => {
1319            let days = t.div_euclid(86_400_000_000);
1320            (
1321                i32::try_from(days).unwrap_or(i32::MAX),
1322                t.rem_euclid(86_400_000_000),
1323            )
1324        }
1325        other => {
1326            return Err(EvalError::TypeMismatch {
1327                detail: format!(
1328                    "to_char() needs DATE or TIMESTAMP, got {:?}",
1329                    other.data_type()
1330                ),
1331            });
1332        }
1333    };
1334    let (y, mo, d) = civil_from_days(days);
1335    let secs = day_micros / 1_000_000;
1336    let frac = day_micros % 1_000_000;
1337    // div_euclid keeps every value non-negative — the casts below are
1338    // sign-safe by construction. `secs ∈ [0, 86400)`, `frac ∈ [0,
1339    // 1_000_000)`, so all three quantities fit in u32.
1340    let hh24 = u32::try_from(secs / 3600).unwrap_or(0);
1341    let mi = u32::try_from((secs / 60) % 60).unwrap_or(0);
1342    let ss = u32::try_from(secs % 60).unwrap_or(0);
1343    let hh12 = match hh24 % 12 {
1344        0 => 12,
1345        x => x,
1346    };
1347    let ampm = if hh24 < 12 { "AM" } else { "PM" };
1348    let ms = u32::try_from(frac / 1_000).unwrap_or(0); // millisecond
1349    let us = u32::try_from(frac).unwrap_or(0); // microsecond (0..1_000_000)
1350
1351    let mut out = String::with_capacity(fmt.len() + 8);
1352    let bytes = fmt.as_bytes();
1353    let mut i = 0;
1354    // write! against a String never fails — discard the Result.
1355    while i < bytes.len() {
1356        // Try the longest prefixes first so "YYYY" wins over "YY".
1357        let rest = &bytes[i..];
1358        if rest.starts_with(b"YYYY") {
1359            let _ = write!(out, "{y:04}");
1360            i += 4;
1361        } else if rest.starts_with(b"YY") {
1362            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
1363            let yy = (y.rem_euclid(100)) as u32;
1364            let _ = write!(out, "{yy:02}");
1365            i += 2;
1366        } else if rest.starts_with(b"Month") {
1367            out.push_str(MONTH_FULL[(mo - 1) as usize]);
1368            i += 5;
1369        } else if rest.starts_with(b"Mon") {
1370            out.push_str(MONTH_ABBR[(mo - 1) as usize]);
1371            i += 3;
1372        } else if rest.starts_with(b"MM") {
1373            let _ = write!(out, "{mo:02}");
1374            i += 2;
1375        } else if rest.starts_with(b"DD") {
1376            let _ = write!(out, "{d:02}");
1377            i += 2;
1378        } else if rest.starts_with(b"HH24") {
1379            let _ = write!(out, "{hh24:02}");
1380            i += 4;
1381        } else if rest.starts_with(b"HH12") {
1382            let _ = write!(out, "{hh12:02}");
1383            i += 4;
1384        } else if rest.starts_with(b"MI") {
1385            let _ = write!(out, "{mi:02}");
1386            i += 2;
1387        } else if rest.starts_with(b"SS") {
1388            let _ = write!(out, "{ss:02}");
1389            i += 2;
1390        } else if rest.starts_with(b"MS") {
1391            let _ = write!(out, "{ms:03}");
1392            i += 2;
1393        } else if rest.starts_with(b"US") {
1394            let _ = write!(out, "{us:06}");
1395            i += 2;
1396        } else if rest.starts_with(b"AM") || rest.starts_with(b"PM") {
1397            out.push_str(ampm);
1398            i += 2;
1399        } else {
1400            // Pass any non-placeholder byte through verbatim.
1401            out.push(bytes[i] as char);
1402            i += 1;
1403        }
1404    }
1405    Ok(Value::Text(out))
1406}
1407
1408const MONTH_FULL: [&str; 12] = [
1409    "January",
1410    "February",
1411    "March",
1412    "April",
1413    "May",
1414    "June",
1415    "July",
1416    "August",
1417    "September",
1418    "October",
1419    "November",
1420    "December",
1421];
1422const MONTH_ABBR: [&str; 12] = [
1423    "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
1424];
1425
1426/// `date_trunc(unit, timestamp)` — round a `TIMESTAMP` down to the
1427/// requested calendar boundary (year / month / day / hour / minute /
1428/// second). Returns the truncated `TIMESTAMP`. NULL on either side
1429/// propagates to NULL.
1430fn date_trunc(args: &[Value]) -> Result<Value, EvalError> {
1431    if args.len() != 2 {
1432        return Err(EvalError::TypeMismatch {
1433            detail: format!("date_trunc() takes 2 args, got {}", args.len()),
1434        });
1435    }
1436    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
1437        return Ok(Value::Null);
1438    }
1439    let Value::Text(unit) = &args[0] else {
1440        return Err(EvalError::TypeMismatch {
1441            detail: format!(
1442                "date_trunc() needs a text unit, got {:?}",
1443                args[0].data_type()
1444            ),
1445        });
1446    };
1447    // Both DATE and TIMESTAMP sources are accepted. DATE lifts to
1448    // midnight first; the result is always TIMESTAMP.
1449    let micros = match &args[1] {
1450        Value::Timestamp(t) => *t,
1451        Value::Date(d) => i64::from(*d) * 86_400_000_000,
1452        other => {
1453            return Err(EvalError::TypeMismatch {
1454                detail: format!(
1455                    "date_trunc() needs DATE or TIMESTAMP, got {:?}",
1456                    other.data_type()
1457                ),
1458            });
1459        }
1460    };
1461    let unit_lc = unit.to_ascii_lowercase();
1462    let days = micros.div_euclid(86_400_000_000);
1463    let day_micros = micros.rem_euclid(86_400_000_000);
1464    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
1465    let (y, m, _) = civil_from_days(day_i32);
1466    let truncated = match unit_lc.as_str() {
1467        "year" => i64::from(days_from_civil(y, 1, 1)) * 86_400_000_000,
1468        "month" => i64::from(days_from_civil(y, m, 1)) * 86_400_000_000,
1469        "day" => days * 86_400_000_000,
1470        "hour" => days * 86_400_000_000 + (day_micros / 3_600_000_000) * 3_600_000_000,
1471        "minute" => days * 86_400_000_000 + (day_micros / 60_000_000) * 60_000_000,
1472        "second" => days * 86_400_000_000 + (day_micros / 1_000_000) * 1_000_000,
1473        other => {
1474            return Err(EvalError::TypeMismatch {
1475                detail: format!(
1476                    "unknown date_trunc unit {other:?}; \
1477                     supported: year, month, day, hour, minute, second"
1478                ),
1479            });
1480        }
1481    };
1482    Ok(Value::Timestamp(truncated))
1483}
1484
1485/// PG-style `expr::TYPE` coercion. NULL always casts as NULL.
1486pub fn cast_value(v: Value, target: CastTarget) -> Result<Value, EvalError> {
1487    if matches!(v, Value::Null) {
1488        return Ok(Value::Null);
1489    }
1490    match target {
1491        CastTarget::Vector => cast_to_vector(v),
1492        CastTarget::Text => Ok(Value::Text(value_to_text(&v))),
1493        CastTarget::Int => cast_numeric_to_int(v),
1494        CastTarget::BigInt => cast_numeric_to_bigint(v),
1495        CastTarget::Float => cast_numeric_to_float(v),
1496        CastTarget::Bool => cast_to_bool(v),
1497        CastTarget::Date => cast_to_date(v),
1498        // TIMESTAMP and TIMESTAMPTZ have identical runtime
1499        // representation (i64 microseconds UTC).
1500        CastTarget::Timestamp | CastTarget::Timestamptz => cast_to_timestamp(v),
1501        // v7.9.25 — `expr::INTERVAL`. Currently only TEXT → Interval
1502        // is supported (the mailrs idiom: `$1::INTERVAL` where the
1503        // bound param is a string like `'7 days'`).
1504        CastTarget::Interval => cast_to_interval(v),
1505        // v7.9.25 — `::json` / `::jsonb`. Routes Text → Json
1506        // (validation is the producer's responsibility, same as
1507        // the column-INSERT path).
1508        CastTarget::Json | CastTarget::Jsonb => match v {
1509            Value::Json(s) => Ok(Value::Json(s)),
1510            Value::Text(s) => Ok(Value::Json(s)),
1511            other => Err(EvalError::TypeMismatch {
1512                detail: alloc::format!(
1513                    "::json / ::jsonb only accepts TEXT-shape inputs, got {:?}",
1514                    other.data_type()
1515                ),
1516            }),
1517        },
1518        // v7.9.26 — `::regtype` / `::regclass`. SPG has no
1519        // pg_catalog; surface a clear error.
1520        CastTarget::RegType | CastTarget::RegClass => Err(EvalError::TypeMismatch {
1521            detail: "::regtype / ::regclass not supported on SPG \
1522                 (no pg_catalog); use SHOW TABLES / spg_table_ddl instead"
1523                .into(),
1524        }),
1525        // v7.10.11 — `::TEXT[]`. Decode PG external array form
1526        // when input is Text; pass through unchanged when it is
1527        // already TextArray. Anything else is a type mismatch.
1528        CastTarget::TextArray => match v {
1529            Value::TextArray(items) => Ok(Value::TextArray(items)),
1530            Value::Text(s) => decode_text_array_external(&s).map(Value::TextArray),
1531            other => Err(EvalError::TypeMismatch {
1532                detail: alloc::format!(
1533                    "::TEXT[] only accepts TEXT / TEXT[] inputs, got {:?}",
1534                    other.data_type()
1535                ),
1536            }),
1537        },
1538        // v7.11.13 — `::INT[]` / `::BIGINT[]`. Decode PG external
1539        // form `{1,2,3}` when input is Text; widen TextArray /
1540        // IntArray as appropriate.
1541        CastTarget::IntArray => cast_to_int_array(v),
1542        CastTarget::BigIntArray => cast_to_bigint_array(v),
1543    }
1544}
1545
1546fn cast_to_int_array(v: Value) -> Result<Value, EvalError> {
1547    match v {
1548        Value::IntArray(items) => Ok(Value::IntArray(items)),
1549        Value::BigIntArray(items) => {
1550            let mut out: Vec<Option<i32>> = Vec::with_capacity(items.len());
1551            for item in items {
1552                match item {
1553                    None => out.push(None),
1554                    Some(n) => match i32::try_from(n) {
1555                        Ok(x) => out.push(Some(x)),
1556                        Err(_) => {
1557                            return Err(EvalError::TypeMismatch {
1558                                detail: alloc::format!(
1559                                    "::INT[] element {n} overflows i32"
1560                                ),
1561                            });
1562                        }
1563                    },
1564                }
1565            }
1566            Ok(Value::IntArray(out))
1567        }
1568        Value::Text(s) => decode_int_array_external(&s).map(Value::IntArray),
1569        Value::TextArray(items) => {
1570            let mut out: Vec<Option<i32>> = Vec::with_capacity(items.len());
1571            for item in items {
1572                match item {
1573                    None => out.push(None),
1574                    Some(s) => match s.parse::<i32>() {
1575                        Ok(n) => out.push(Some(n)),
1576                        Err(_) => {
1577                            return Err(EvalError::TypeMismatch {
1578                                detail: alloc::format!("::INT[] cannot parse {s:?}"),
1579                            });
1580                        }
1581                    },
1582                }
1583            }
1584            Ok(Value::IntArray(out))
1585        }
1586        other => Err(EvalError::TypeMismatch {
1587            detail: alloc::format!(
1588                "::INT[] does not accept {:?}",
1589                other.data_type()
1590            ),
1591        }),
1592    }
1593}
1594
1595fn cast_to_bigint_array(v: Value) -> Result<Value, EvalError> {
1596    match v {
1597        Value::BigIntArray(items) => Ok(Value::BigIntArray(items)),
1598        Value::IntArray(items) => Ok(Value::BigIntArray(
1599            items
1600                .into_iter()
1601                .map(|x| x.map(i64::from))
1602                .collect(),
1603        )),
1604        Value::Text(s) => decode_bigint_array_external(&s).map(Value::BigIntArray),
1605        Value::TextArray(items) => {
1606            let mut out: Vec<Option<i64>> = Vec::with_capacity(items.len());
1607            for item in items {
1608                match item {
1609                    None => out.push(None),
1610                    Some(s) => match s.parse::<i64>() {
1611                        Ok(n) => out.push(Some(n)),
1612                        Err(_) => {
1613                            return Err(EvalError::TypeMismatch {
1614                                detail: alloc::format!("::BIGINT[] cannot parse {s:?}"),
1615                            });
1616                        }
1617                    },
1618                }
1619            }
1620            Ok(Value::BigIntArray(out))
1621        }
1622        other => Err(EvalError::TypeMismatch {
1623            detail: alloc::format!(
1624                "::BIGINT[] does not accept {:?}",
1625                other.data_type()
1626            ),
1627        }),
1628    }
1629}
1630
1631fn decode_int_array_external(s: &str) -> Result<Vec<Option<i32>>, EvalError> {
1632    let trimmed = s.trim();
1633    let inner = trimmed
1634        .strip_prefix('{')
1635        .and_then(|x| x.strip_suffix('}'))
1636        .ok_or_else(|| EvalError::TypeMismatch {
1637            detail: alloc::format!("INT[] literal {s:?} must be enclosed in '{{...}}'"),
1638        })?;
1639    if inner.trim().is_empty() {
1640        return Ok(Vec::new());
1641    }
1642    inner
1643        .split(',')
1644        .map(|part| {
1645            let p = part.trim();
1646            if p.eq_ignore_ascii_case("NULL") {
1647                Ok(None)
1648            } else {
1649                p.parse::<i32>().map(Some).map_err(|_| EvalError::TypeMismatch {
1650                    detail: alloc::format!("INT[] element {p:?} is not an i32"),
1651                })
1652            }
1653        })
1654        .collect()
1655}
1656
1657fn decode_bigint_array_external(s: &str) -> Result<Vec<Option<i64>>, EvalError> {
1658    let trimmed = s.trim();
1659    let inner = trimmed
1660        .strip_prefix('{')
1661        .and_then(|x| x.strip_suffix('}'))
1662        .ok_or_else(|| EvalError::TypeMismatch {
1663            detail: alloc::format!("BIGINT[] literal {s:?} must be enclosed in '{{...}}'"),
1664        })?;
1665    if inner.trim().is_empty() {
1666        return Ok(Vec::new());
1667    }
1668    inner
1669        .split(',')
1670        .map(|part| {
1671            let p = part.trim();
1672            if p.eq_ignore_ascii_case("NULL") {
1673                Ok(None)
1674            } else {
1675                p.parse::<i64>().map(Some).map_err(|_| EvalError::TypeMismatch {
1676                    detail: alloc::format!("BIGINT[] element {p:?} is not an i64"),
1677                })
1678            }
1679        })
1680        .collect()
1681}
1682
1683/// v7.10.11 — same decoder as `decode_text_array_literal` in
1684/// `lib.rs`, but lives here so the eval-time cast path stays
1685/// inside `spg-engine::eval`. Kept in lock-step with the engine
1686/// `coerce_value` decoder by tests.
1687fn decode_text_array_external(s: &str) -> Result<Vec<Option<String>>, EvalError> {
1688    let trimmed = s.trim();
1689    let inner = trimmed
1690        .strip_prefix('{')
1691        .and_then(|x| x.strip_suffix('}'))
1692        .ok_or_else(|| EvalError::TypeMismatch {
1693            detail: alloc::format!("TEXT[] literal {s:?} must be enclosed in '{{...}}'"),
1694        })?;
1695    let mut out: Vec<Option<String>> = Vec::new();
1696    if inner.trim().is_empty() {
1697        return Ok(out);
1698    }
1699    let bytes = inner.as_bytes();
1700    let mut i = 0;
1701    while i <= bytes.len() {
1702        while i < bytes.len() && (bytes[i] == b' ' || bytes[i] == b'\t') {
1703            i += 1;
1704        }
1705        if i < bytes.len() && bytes[i] == b'"' {
1706            i += 1;
1707            let mut buf = String::new();
1708            while i < bytes.len() && bytes[i] != b'"' {
1709                if bytes[i] == b'\\' && i + 1 < bytes.len() {
1710                    buf.push(bytes[i + 1] as char);
1711                    i += 2;
1712                } else {
1713                    buf.push(bytes[i] as char);
1714                    i += 1;
1715                }
1716            }
1717            if i >= bytes.len() {
1718                return Err(EvalError::TypeMismatch {
1719                    detail: "unterminated quoted element in TEXT[] literal".into(),
1720                });
1721            }
1722            i += 1;
1723            out.push(Some(buf));
1724        } else {
1725            let start = i;
1726            while i < bytes.len() && bytes[i] != b',' {
1727                i += 1;
1728            }
1729            let raw = inner[start..i].trim();
1730            if raw.eq_ignore_ascii_case("NULL") {
1731                out.push(None);
1732            } else {
1733                out.push(Some(raw.to_string()));
1734            }
1735        }
1736        while i < bytes.len() && (bytes[i] == b' ' || bytes[i] == b'\t') {
1737            i += 1;
1738        }
1739        if i >= bytes.len() {
1740            break;
1741        }
1742        if bytes[i] != b',' {
1743            return Err(EvalError::TypeMismatch {
1744                detail: "expected ',' between TEXT[] elements".into(),
1745            });
1746        }
1747        i += 1;
1748    }
1749    Ok(out)
1750}
1751
1752fn cast_to_interval(v: Value) -> Result<Value, EvalError> {
1753    match v {
1754        Value::Interval { months, micros } => Ok(Value::Interval { months, micros }),
1755        Value::Text(s) => {
1756            let (months, micros) = spg_sql::parser::parse_interval_text(&s).ok_or_else(|| {
1757                EvalError::TypeMismatch {
1758                    detail: alloc::format!("cannot parse {s:?} as INTERVAL"),
1759                }
1760            })?;
1761            Ok(Value::Interval { months, micros })
1762        }
1763        other => Err(EvalError::TypeMismatch {
1764            detail: alloc::format!(
1765                "::INTERVAL only accepts TEXT-shape inputs, got {:?}",
1766                other.data_type()
1767            ),
1768        }),
1769    }
1770}
1771
1772fn cast_to_date(v: Value) -> Result<Value, EvalError> {
1773    match v {
1774        Value::Date(d) => Ok(Value::Date(d)),
1775        // Integer literals carry days since the Unix epoch — used by
1776        // the `CURRENT_DATE` AST rewrite to inject the wall clock.
1777        Value::Int(n) => Ok(Value::Date(n)),
1778        Value::BigInt(n) => {
1779            i32::try_from(n)
1780                .map(Value::Date)
1781                .map_err(|_| EvalError::TypeMismatch {
1782                    detail: "bigint days-since-epoch out of DATE range".into(),
1783                })
1784        }
1785        // Timestamp truncates to its day boundary.
1786        Value::Timestamp(t) => {
1787            let days = t.div_euclid(86_400_000_000);
1788            i32::try_from(days)
1789                .map(Value::Date)
1790                .map_err(|_| EvalError::TypeMismatch {
1791                    detail: "timestamp out of DATE range".into(),
1792                })
1793        }
1794        Value::Text(s) => parse_date_literal(&s)
1795            .map(Value::Date)
1796            .ok_or(EvalError::TypeMismatch {
1797                detail: format!("cannot parse {s:?} as DATE (expected YYYY-MM-DD)"),
1798            }),
1799        other => Err(EvalError::TypeMismatch {
1800            detail: format!("cannot cast {:?} to DATE", other.data_type()),
1801        }),
1802    }
1803}
1804
1805fn cast_to_timestamp(v: Value) -> Result<Value, EvalError> {
1806    match v {
1807        Value::Timestamp(t) => Ok(Value::Timestamp(t)),
1808        // Int / BigInt carry microseconds since the Unix epoch — used
1809        // by the `NOW()` / `CURRENT_TIMESTAMP` AST rewrite to inject
1810        // the wall clock as a plain integer literal.
1811        Value::Int(n) => Ok(Value::Timestamp(i64::from(n))),
1812        Value::BigInt(n) => Ok(Value::Timestamp(n)),
1813        // DATE → TIMESTAMP picks midnight on the date.
1814        Value::Date(d) => Ok(Value::Timestamp(i64::from(d) * 86_400_000_000)),
1815        Value::Text(s) => {
1816            parse_timestamp_literal(&s)
1817                .map(Value::Timestamp)
1818                .ok_or(EvalError::TypeMismatch {
1819                    detail: format!(
1820                        "cannot parse {s:?} as TIMESTAMP \
1821                     (expected YYYY-MM-DD[ HH:MM:SS[.ffffff]])"
1822                    ),
1823                })
1824        }
1825        other => Err(EvalError::TypeMismatch {
1826            detail: format!("cannot cast {:?} to TIMESTAMP", other.data_type()),
1827        }),
1828    }
1829}
1830
1831fn value_to_text(v: &Value) -> String {
1832    match v {
1833        // v7.5.0 — Value is #[non_exhaustive]; any future variant
1834        // without explicit text rendering hits the Debug fallback
1835        // at the end.
1836        Value::SmallInt(n) => format!("{n}"),
1837        Value::Int(n) => format!("{n}"),
1838        Value::BigInt(n) => format!("{n}"),
1839        Value::Float(x) => format!("{x}"),
1840        // v4.9: JSON renders identically to Text — both are raw UTF-8.
1841        Value::Text(s) | Value::Json(s) => s.clone(),
1842        Value::Bool(b) => (if *b { "true" } else { "false" }).into(),
1843        Value::Vector(v) => {
1844            let cells: Vec<String> = v.iter().map(|x| format!("{x}")).collect();
1845            format!("[{}]", cells.join(", "))
1846        }
1847        // v6.0.1: render SQ8 cells dequantised, so SELECT output
1848        // matches the pgvector wire shape clients expect. The
1849        // recall envelope already absorbs the ≤ (max-min)/255/2
1850        // dequantisation error.
1851        Value::Sq8Vector(q) => {
1852            let cells: Vec<String> = spg_storage::quantize::dequantize(q)
1853                .iter()
1854                .map(|x| format!("{x}"))
1855                .collect();
1856            format!("[{}]", cells.join(", "))
1857        }
1858        // v6.0.3: HalfVector cells dequantise bit-exactly to f32
1859        // for SELECT output.
1860        Value::HalfVector(h) => {
1861            let cells: Vec<String> = h.to_f32_vec().iter().map(|x| format!("{x}")).collect();
1862            format!("[{}]", cells.join(", "))
1863        }
1864        Value::Numeric { scaled, scale } => format_numeric(*scaled, *scale),
1865        Value::Date(d) => format_date(*d),
1866        Value::Timestamp(t) => format_timestamp(*t),
1867        Value::Interval { months, micros } => format_interval(*months, *micros),
1868        Value::Null => "NULL".into(),
1869        // v7.5.0 — #[non_exhaustive] fallback for future Value variants.
1870        _ => format!("{v:?}"),
1871    }
1872}
1873
1874/// Render a `Date` (days since epoch) as `YYYY-MM-DD`. Negative values
1875/// for pre-1970 dates render with a leading `-` on the year.
1876pub fn format_date(days: i32) -> String {
1877    let (y, m, d) = civil_from_days(days);
1878    format!("{y:04}-{m:02}-{d:02}")
1879}
1880
1881/// Render a `Timestamp` (microseconds since epoch) as
1882/// `YYYY-MM-DD HH:MM:SS[.fff...]`. Trailing-zero fractional digits are
1883/// dropped; a whole-second value has no fractional part.
1884pub fn format_timestamp(micros: i64) -> String {
1885    const MICROS_PER_DAY: i64 = 86_400_000_000;
1886    // Split into day + intra-day part with proper floor division so
1887    // negative timestamps render right too.
1888    let days = micros.div_euclid(MICROS_PER_DAY);
1889    let day_micros = micros.rem_euclid(MICROS_PER_DAY);
1890    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
1891    let (y, m, d) = civil_from_days(day_i32);
1892    let secs = day_micros / 1_000_000;
1893    let frac = day_micros % 1_000_000;
1894    let hh = secs / 3600;
1895    let mm = (secs / 60) % 60;
1896    let ss = secs % 60;
1897    if frac == 0 {
1898        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}")
1899    } else {
1900        // Strip trailing zeros from the 6-digit fractional component.
1901        let raw = format!("{frac:06}");
1902        let trimmed = raw.trim_end_matches('0');
1903        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}.{trimmed}")
1904    }
1905}
1906
1907/// Howard Hinnant's `civil_from_days` — converts days since the Unix
1908/// epoch back to a proleptic-Gregorian (year, month, day) triple. Both
1909/// directions of this calendar conversion live in `eval.rs` so the
1910/// engine never reaches for `std` time facilities.
1911#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1912fn civil_from_days(days: i32) -> (i32, u32, u32) {
1913    let z = i64::from(days) + 719_468;
1914    let era = z.div_euclid(146_097);
1915    // doe ∈ [0, 146_097); fits in u32 with room to spare. Same for
1916    // every other quantity below — `as u32` truncations are safe by
1917    // construction.
1918    let doe = (z - era * 146_097) as u32;
1919    let yoe = (doe.saturating_sub(doe / 1460) + doe / 36524 - doe / 146_096) / 365;
1920    let y_base = i64::from(yoe) + era * 400;
1921    let doy = doe.saturating_sub(365 * yoe + yoe / 4 - yoe / 100);
1922    let mp = (5 * doy + 2) / 153;
1923    let d = doy.saturating_sub((153 * mp + 2) / 5) + 1;
1924    let m = if mp < 10 { mp + 3 } else { mp - 9 };
1925    let y = if m <= 2 { y_base + 1 } else { y_base };
1926    (y as i32, m, d)
1927}
1928
1929/// Inverse of `civil_from_days` — converts (year, month, day) to days
1930/// since 1970-01-01. Out-of-range months / days saturate.
1931#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1932pub fn days_from_civil(y: i32, m: u32, d: u32) -> i32 {
1933    let y_adj = if m <= 2 {
1934        i64::from(y) - 1
1935    } else {
1936        i64::from(y)
1937    };
1938    let era = y_adj.div_euclid(400);
1939    let yoe = (y_adj - era * 400) as u32;
1940    let doy = (153 * (if m > 2 { m - 3 } else { m + 9 }) + 2) / 5 + d.saturating_sub(1);
1941    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
1942    let total = era * 146_097 + i64::from(doe) - 719_468;
1943    i32::try_from(total).unwrap_or(i32::MAX)
1944}
1945
1946/// Parse `YYYY-MM-DD` into a `Date` (days since Unix epoch). Returns
1947/// `None` on shape / numeric failure; the engine surfaces that as a
1948/// `TypeMismatch` with the original text included.
1949pub fn parse_date_literal(s: &str) -> Option<i32> {
1950    let bytes = s.as_bytes();
1951    if bytes.len() != 10 || bytes[4] != b'-' || bytes[7] != b'-' {
1952        return None;
1953    }
1954    let y: i32 = s[0..4].parse().ok()?;
1955    let m: u32 = s[5..7].parse().ok()?;
1956    let d: u32 = s[8..10].parse().ok()?;
1957    if !(1..=12).contains(&m) || !(1..=31).contains(&d) {
1958        return None;
1959    }
1960    Some(days_from_civil(y, m, d))
1961}
1962
1963/// Parse `YYYY-MM-DD[ HH:MM:SS[.ffffff]]` into a `Timestamp`
1964/// (microseconds since Unix epoch). The time portion is optional;
1965/// missing → midnight. The fractional portion accepts 1–6 digits and
1966/// pads with zeros to microseconds.
1967pub fn parse_timestamp_literal(s: &str) -> Option<i64> {
1968    let trimmed = s.trim();
1969    let (date_part, time_part) = match trimmed.find([' ', 'T']) {
1970        Some(i) => (&trimmed[..i], Some(&trimmed[i + 1..])),
1971        None => (trimmed, None),
1972    };
1973    let days = parse_date_literal(date_part)?;
1974    let day_micros = match time_part {
1975        None => 0,
1976        Some(t) => parse_time_of_day_micros(t)?,
1977    };
1978    Some(i64::from(days) * 86_400_000_000 + day_micros)
1979}
1980
1981fn parse_time_of_day_micros(t: &str) -> Option<i64> {
1982    let (time, frac_str) = match t.split_once('.') {
1983        Some((a, b)) => (a, Some(b)),
1984        None => (t, None),
1985    };
1986    let bytes = time.as_bytes();
1987    if bytes.len() != 8 || bytes[2] != b':' || bytes[5] != b':' {
1988        return None;
1989    }
1990    let hh: i64 = time[0..2].parse().ok()?;
1991    let mm: i64 = time[3..5].parse().ok()?;
1992    let ss: i64 = time[6..8].parse().ok()?;
1993    if !(0..24).contains(&hh) || !(0..60).contains(&mm) || !(0..60).contains(&ss) {
1994        return None;
1995    }
1996    let frac_micros: i64 = match frac_str {
1997        None => 0,
1998        Some(f) => {
1999            // Pad right with zeros to 6 digits, then truncate extras.
2000            if f.is_empty() || f.len() > 9 {
2001                return None;
2002            }
2003            let mut padded = String::with_capacity(6);
2004            padded.push_str(&f[..f.len().min(6)]);
2005            while padded.len() < 6 {
2006                padded.push('0');
2007            }
2008            padded.parse().ok()?
2009        }
2010    };
2011    Some(((hh * 3600 + mm * 60 + ss) * 1_000_000) + frac_micros)
2012}
2013
2014/// Render an `Interval { months, micros }` in a PG-ish shape. The output
2015/// mirrors `psql`'s text format: years/months from the months part,
2016/// days/HH:MM:SS[.frac] from the microsecond part. Empty parts are
2017/// omitted; an all-zero interval renders as `0`.
2018pub fn format_interval(months: i32, micros: i64) -> String {
2019    const MICROS_PER_DAY: i64 = 86_400_000_000;
2020    let mut parts: Vec<String> = Vec::new();
2021    let years = months / 12;
2022    let mons = months % 12;
2023    // PG renders the unit in the singular only for `+1`; `-1` and any
2024    // other value pluralise. Helper closes over that rule.
2025    let unit = |n: i64, singular: &'static str, plural: &'static str| -> &'static str {
2026        if n == 1 { singular } else { plural }
2027    };
2028    if years != 0 {
2029        parts.push(format!(
2030            "{years} {}",
2031            unit(i64::from(years), "year", "years")
2032        ));
2033    }
2034    if mons != 0 {
2035        parts.push(format!("{mons} {}", unit(i64::from(mons), "mon", "mons")));
2036    }
2037    let days = micros / MICROS_PER_DAY;
2038    let mut rem = micros % MICROS_PER_DAY;
2039    if days != 0 {
2040        parts.push(format!("{days} {}", unit(days, "day", "days")));
2041    }
2042    if rem != 0 {
2043        let neg = rem < 0;
2044        if neg {
2045            rem = -rem;
2046        }
2047        let secs = rem / 1_000_000;
2048        let frac = rem % 1_000_000;
2049        let hh = secs / 3600;
2050        let mm = (secs / 60) % 60;
2051        let ss = secs % 60;
2052        let sign = if neg { "-" } else { "" };
2053        if frac == 0 {
2054            parts.push(format!("{sign}{hh:02}:{mm:02}:{ss:02}"));
2055        } else {
2056            let raw = format!("{frac:06}");
2057            let trimmed = raw.trim_end_matches('0');
2058            parts.push(format!("{sign}{hh:02}:{mm:02}:{ss:02}.{trimmed}"));
2059        }
2060    }
2061    if parts.is_empty() {
2062        "0".into()
2063    } else {
2064        parts.join(" ")
2065    }
2066}
2067
2068/// Add `months` (signed) to a `(year, month, day)` triple using PG's
2069/// clamp-to-last-day rule (so `'2024-01-31' + 1 month` → `'2024-02-29'`).
2070fn add_months_to_civil(y: i32, m: u32, d: u32, months: i32) -> (i32, u32, u32) {
2071    let total_months = i64::from(y) * 12 + i64::from(m) - 1 + i64::from(months);
2072    let new_year = i32::try_from(total_months.div_euclid(12)).unwrap_or(i32::MAX);
2073    let new_month_zero = total_months.rem_euclid(12);
2074    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
2075    let new_month = (new_month_zero as u32) + 1;
2076    let max_day = days_in_month(new_year, new_month);
2077    (new_year, new_month, d.min(max_day))
2078}
2079
2080const fn days_in_month(y: i32, m: u32) -> u32 {
2081    match m {
2082        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
2083        2 => {
2084            // Proleptic Gregorian leap rule.
2085            if y.rem_euclid(4) == 0 && (y.rem_euclid(100) != 0 || y.rem_euclid(400) == 0) {
2086                29
2087            } else {
2088                28
2089            }
2090        }
2091        // 4 / 6 / 9 / 11 plus any out-of-range month (callers normalise
2092        // first, but be defensive) get the 30-day fallback.
2093        _ => 30,
2094    }
2095}
2096
2097/// v7.10.9 — render a TEXT[] in PG's external array form
2098/// (`{a,b,NULL}`). Elements containing whitespace, commas,
2099/// quotes, or braces get double-quoted with `\\` / `\"` escapes.
2100/// NULL elements use the literal token `NULL`. Public so the
2101/// wire layer can produce the canonical text-mode encoding.
2102pub fn format_text_array(items: &[Option<String>]) -> String {
2103    let mut out = String::with_capacity(2 + items.len() * 8);
2104    out.push('{');
2105    for (i, item) in items.iter().enumerate() {
2106        if i > 0 {
2107            out.push(',');
2108        }
2109        match item {
2110            None => out.push_str("NULL"),
2111            Some(s) => {
2112                let needs_quote = s.is_empty()
2113                    || s.eq_ignore_ascii_case("NULL")
2114                    || s.chars()
2115                        .any(|c| matches!(c, ',' | '{' | '}' | '"' | '\\' | ' ' | '\t'));
2116                if needs_quote {
2117                    out.push('"');
2118                    for c in s.chars() {
2119                        if c == '"' || c == '\\' {
2120                            out.push('\\');
2121                        }
2122                        out.push(c);
2123                    }
2124                    out.push('"');
2125                } else {
2126                    out.push_str(s);
2127                }
2128            }
2129        }
2130    }
2131    out.push('}');
2132    out
2133}
2134
2135/// v7.11.14 — render an INT[] in PG's external array form
2136/// (`{1,2,NULL}`). Integer payloads never need quoting. NULL
2137/// elements use the literal token `NULL`.
2138pub fn format_int_array(items: &[Option<i32>]) -> String {
2139    let mut out = String::with_capacity(2 + items.len() * 4);
2140    out.push('{');
2141    for (i, item) in items.iter().enumerate() {
2142        if i > 0 {
2143            out.push(',');
2144        }
2145        match item {
2146            None => out.push_str("NULL"),
2147            Some(n) => out.push_str(&n.to_string()),
2148        }
2149    }
2150    out.push('}');
2151    out
2152}
2153
2154/// v7.11.14 — render a BIGINT[] in PG's external array form
2155/// (`{1,2,NULL}`).
2156pub fn format_bigint_array(items: &[Option<i64>]) -> String {
2157    let mut out = String::with_capacity(2 + items.len() * 6);
2158    out.push('{');
2159    for (i, item) in items.iter().enumerate() {
2160        if i > 0 {
2161            out.push(',');
2162        }
2163        match item {
2164            None => out.push_str("NULL"),
2165            Some(n) => out.push_str(&n.to_string()),
2166        }
2167    }
2168    out.push('}');
2169    out
2170}
2171
2172/// v7.10.4 — render a BYTEA payload in PG's hex output format
2173/// (`\x` prefix, lowercase hex pairs). Public so the wire layer
2174/// can emit the canonical bytea-as-text representation.
2175pub fn format_bytea_hex(b: &[u8]) -> String {
2176    let mut out = String::with_capacity(2 + 2 * b.len());
2177    out.push_str("\\x");
2178    const HEX: &[u8; 16] = b"0123456789abcdef";
2179    for byte in b {
2180        out.push(HEX[(byte >> 4) as usize] as char);
2181        out.push(HEX[(byte & 0x0F) as usize] as char);
2182    }
2183    out
2184}
2185
2186/// Render a `Numeric { scaled, scale }` as its decimal text form.
2187/// Negative `scaled` prepends `-` to the absolute value's digits; the
2188/// integer / fractional split is by character count, padding the
2189/// fractional side with leading zeros to exactly `scale` chars.
2190pub fn format_numeric(scaled: i128, scale: u8) -> String {
2191    if scale == 0 {
2192        return format!("{scaled}");
2193    }
2194    let negative = scaled < 0;
2195    let mag_str = scaled.unsigned_abs().to_string();
2196    let mag_bytes = mag_str.as_bytes();
2197    let scale_u = scale as usize;
2198    let mut out = String::with_capacity(mag_str.len() + 3);
2199    if negative {
2200        out.push('-');
2201    }
2202    if mag_bytes.len() <= scale_u {
2203        out.push('0');
2204        out.push('.');
2205        for _ in mag_bytes.len()..scale_u {
2206            out.push('0');
2207        }
2208        out.push_str(&mag_str);
2209    } else {
2210        let split = mag_bytes.len() - scale_u;
2211        out.push_str(&mag_str[..split]);
2212        out.push('.');
2213        out.push_str(&mag_str[split..]);
2214    }
2215    out
2216}
2217
2218fn cast_numeric_to_int(v: Value) -> Result<Value, EvalError> {
2219    match v {
2220        Value::Int(n) => Ok(Value::Int(n)),
2221        Value::BigInt(n) => i32::try_from(n)
2222            .map(Value::Int)
2223            .map_err(|_| EvalError::TypeMismatch {
2224                detail: format!("bigint {n} does not fit in int"),
2225            }),
2226        #[allow(clippy::cast_possible_truncation)]
2227        Value::Float(x) => Ok(Value::Int(x as i32)),
2228        Value::Text(s) => {
2229            s.trim()
2230                .parse::<i32>()
2231                .map(Value::Int)
2232                .map_err(|_| EvalError::TypeMismatch {
2233                    detail: format!("cannot parse {s:?} as int"),
2234                })
2235        }
2236        Value::Bool(b) => Ok(Value::Int(i32::from(b))),
2237        other => Err(EvalError::TypeMismatch {
2238            detail: format!("cannot cast {:?} to int", other.data_type()),
2239        }),
2240    }
2241}
2242
2243fn cast_numeric_to_bigint(v: Value) -> Result<Value, EvalError> {
2244    match v {
2245        Value::Int(n) => Ok(Value::BigInt(i64::from(n))),
2246        Value::BigInt(n) => Ok(Value::BigInt(n)),
2247        #[allow(clippy::cast_possible_truncation)]
2248        Value::Float(x) => Ok(Value::BigInt(x as i64)),
2249        Value::Text(s) => {
2250            s.trim()
2251                .parse::<i64>()
2252                .map(Value::BigInt)
2253                .map_err(|_| EvalError::TypeMismatch {
2254                    detail: format!("cannot parse {s:?} as bigint"),
2255                })
2256        }
2257        Value::Bool(b) => Ok(Value::BigInt(i64::from(b))),
2258        other => Err(EvalError::TypeMismatch {
2259            detail: format!("cannot cast {:?} to bigint", other.data_type()),
2260        }),
2261    }
2262}
2263
2264fn cast_numeric_to_float(v: Value) -> Result<Value, EvalError> {
2265    match v {
2266        Value::Int(n) => Ok(Value::Float(f64::from(n))),
2267        #[allow(clippy::cast_precision_loss)]
2268        Value::BigInt(n) => Ok(Value::Float(n as f64)),
2269        Value::Float(x) => Ok(Value::Float(x)),
2270        Value::Text(s) => {
2271            s.trim()
2272                .parse::<f64>()
2273                .map(Value::Float)
2274                .map_err(|_| EvalError::TypeMismatch {
2275                    detail: format!("cannot parse {s:?} as float"),
2276                })
2277        }
2278        other => Err(EvalError::TypeMismatch {
2279            detail: format!("cannot cast {:?} to float", other.data_type()),
2280        }),
2281    }
2282}
2283
2284fn cast_to_bool(v: Value) -> Result<Value, EvalError> {
2285    match v {
2286        Value::Bool(b) => Ok(Value::Bool(b)),
2287        Value::Int(n) => Ok(Value::Bool(n != 0)),
2288        Value::BigInt(n) => Ok(Value::Bool(n != 0)),
2289        Value::Text(s) => {
2290            let lo = s.trim().to_ascii_lowercase();
2291            match lo.as_str() {
2292                "true" | "t" | "yes" | "y" | "1" | "on" => Ok(Value::Bool(true)),
2293                "false" | "f" | "no" | "n" | "0" | "off" => Ok(Value::Bool(false)),
2294                _ => Err(EvalError::TypeMismatch {
2295                    detail: format!("cannot parse {s:?} as bool"),
2296                }),
2297            }
2298        }
2299        other => Err(EvalError::TypeMismatch {
2300            detail: format!("cannot cast {:?} to bool", other.data_type()),
2301        }),
2302    }
2303}
2304
2305/// Parse a `Value::Text("[1.0, 2.0, 3.0]")` into a `Value::Vector(..)`. Mirrors
2306/// pgvector's `'[..]'::vector` cast. NULL casts as NULL.
2307pub fn cast_to_vector(v: Value) -> Result<Value, EvalError> {
2308    match v {
2309        Value::Null => Ok(Value::Null),
2310        Value::Vector(v) => Ok(Value::Vector(v)),
2311        Value::Text(s) => parse_vector_text(&s)
2312            .map(Value::Vector)
2313            .ok_or(EvalError::TypeMismatch {
2314                detail: format!("cannot parse {s:?} as a vector literal"),
2315            }),
2316        other => Err(EvalError::TypeMismatch {
2317            detail: format!("::vector requires text input, got {:?}", other.data_type()),
2318        }),
2319    }
2320}
2321
2322/// Parse `"[1.0, 2.0, -3]"` into `Vec<f32>`. Returns `None` on malformed input.
2323fn parse_vector_text(s: &str) -> Option<Vec<f32>> {
2324    let trimmed = s.trim();
2325    let inner = trimmed.strip_prefix('[')?.strip_suffix(']')?;
2326    let trimmed_inner = inner.trim();
2327    if trimmed_inner.is_empty() {
2328        return Some(Vec::new());
2329    }
2330    let mut out = Vec::new();
2331    for part in trimmed_inner.split(',') {
2332        let f: f32 = part.trim().parse().ok()?;
2333        out.push(f);
2334    }
2335    Some(out)
2336}
2337
2338fn literal_to_value(l: &Literal) -> Value {
2339    match l {
2340        Literal::Integer(n) => {
2341            if let Ok(small) = i32::try_from(*n) {
2342                Value::Int(small)
2343            } else {
2344                Value::BigInt(*n)
2345            }
2346        }
2347        Literal::Float(x) => Value::Float(*x),
2348        Literal::String(s) => Value::Text(s.clone()),
2349        Literal::Vector(v) => Value::Vector(v.clone()),
2350        Literal::Bool(b) => Value::Bool(*b),
2351        Literal::Null => Value::Null,
2352        Literal::Interval { months, micros, .. } => Value::Interval {
2353            months: *months,
2354            micros: *micros,
2355        },
2356    }
2357}
2358
2359fn resolve_column(c: &ColumnName, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
2360    if let Some(q) = &c.qualifier {
2361        // Multi-table evaluation (joins): the synthesised schema uses
2362        // composite column names "alias.column" so we look that up
2363        // directly. Falls back to the single-table case below if the
2364        // composite isn't present.
2365        let composite = alloc::format!("{q}.{name}", name = c.name);
2366        if let Some(pos) = ctx.columns.iter().position(|s| s.name == composite) {
2367            return Ok(row.values[pos].clone());
2368        }
2369        let expected = ctx.table_alias.ok_or_else(|| EvalError::UnknownQualifier {
2370            qualifier: q.clone(),
2371        })?;
2372        if q != expected {
2373            return Err(EvalError::UnknownQualifier {
2374                qualifier: q.clone(),
2375            });
2376        }
2377    }
2378    if let Some(pos) = ctx.columns.iter().position(|s| s.name == c.name) {
2379        return Ok(row.values[pos].clone());
2380    }
2381    // Bare-name fallback for joined schemas: match any single composite
2382    // column ending in ".<name>"; ambiguity is an error.
2383    let suffix = alloc::format!(".{name}", name = c.name);
2384    let mut matches = ctx
2385        .columns
2386        .iter()
2387        .enumerate()
2388        .filter(|(_, s)| s.name.ends_with(&suffix));
2389    let first = matches.next();
2390    let extra = matches.next();
2391    match (first, extra) {
2392        (Some((pos, _)), None) => Ok(row.values[pos].clone()),
2393        (Some(_), Some(_)) => Err(EvalError::TypeMismatch {
2394            detail: alloc::format!("ambiguous column reference: {}", c.name),
2395        }),
2396        _ => Err(EvalError::ColumnNotFound {
2397            name: c.name.clone(),
2398        }),
2399    }
2400}
2401
2402fn apply_unary(op: UnOp, v: Value) -> Result<Value, EvalError> {
2403    match (op, v) {
2404        (_, Value::Null) => Ok(Value::Null),
2405        (UnOp::Neg, Value::Int(n)) => {
2406            n.checked_neg()
2407                .map(Value::Int)
2408                .ok_or(EvalError::TypeMismatch {
2409                    detail: "integer overflow on unary -".into(),
2410                })
2411        }
2412        (UnOp::Neg, Value::BigInt(n)) => {
2413            n.checked_neg()
2414                .map(Value::BigInt)
2415                .ok_or(EvalError::TypeMismatch {
2416                    detail: "bigint overflow on unary -".into(),
2417                })
2418        }
2419        (UnOp::Neg, Value::Float(x)) => Ok(Value::Float(-x)),
2420        (UnOp::Neg, other) => Err(EvalError::TypeMismatch {
2421            detail: format!("unary - applied to {:?}", other.data_type()),
2422        }),
2423        (UnOp::Not, Value::Bool(b)) => Ok(Value::Bool(!b)),
2424        (UnOp::Not, other) => Err(EvalError::TypeMismatch {
2425            detail: format!("NOT applied to {:?}", other.data_type()),
2426        }),
2427    }
2428}
2429
2430/// v7.9.27b — true when two values are "not distinct" per PG:
2431/// both NULL counts as equal; otherwise reduces to regular Eq.
2432fn values_not_distinct(l: &Value, r: &Value) -> bool {
2433    match (l, r) {
2434        (Value::Null, Value::Null) => true,
2435        (Value::Null, _) | (_, Value::Null) => false,
2436        _ => l == r,
2437    }
2438}
2439
2440fn apply_binary(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
2441    // SQL three-valued logic for AND / OR with NULL is special — handle before
2442    // the general NULL-propagation rule.
2443    if let BinOp::And = op {
2444        return and_3vl(l, r);
2445    }
2446    if let BinOp::Or = op {
2447        return or_3vl(l, r);
2448    }
2449    // v7.9.27b — IS [NOT] DISTINCT FROM. NULL-safe equality:
2450    // `NULL IS NOT DISTINCT FROM NULL` → true. mailrs pg_dump.
2451    if let BinOp::IsNotDistinctFrom = op {
2452        return Ok(Value::Bool(values_not_distinct(&l, &r)));
2453    }
2454    if let BinOp::IsDistinctFrom = op {
2455        return Ok(Value::Bool(!values_not_distinct(&l, &r)));
2456    }
2457    // Everything else: any NULL operand → NULL.
2458    if l.is_null() || r.is_null() {
2459        return Ok(Value::Null);
2460    }
2461    // NUMERIC arithmetic and comparisons run in fixed-point; promote
2462    // integers to a common NUMERIC scale and stay in i128 throughout.
2463    if matches!(l, Value::Numeric { .. }) || matches!(r, Value::Numeric { .. }) {
2464        return apply_binary_numeric(op, l, r);
2465    }
2466    // Date / Timestamp arithmetic. PG semantics:
2467    //   * date + int      → date  (int is days)
2468    //   * int + date      → date
2469    //   * date - int      → date
2470    //   * date - date     → int   (days, signed)
2471    //   * timestamp - timestamp → bigint (microseconds, signed)
2472    // Other date/time math (`timestamp + int`, INTERVAL) lands later.
2473    if let Some(result) = apply_binary_calendar(op, &l, &r)? {
2474        return Ok(result);
2475    }
2476    match op {
2477        BinOp::Add => arith(l, r, i64::checked_add, |a, b| a + b, "+"),
2478        BinOp::Sub => arith(l, r, i64::checked_sub, |a, b| a - b, "-"),
2479        BinOp::Mul => arith(l, r, i64::checked_mul, |a, b| a * b, "*"),
2480        BinOp::Div => div_op(l, r),
2481        BinOp::L2Distance => l2_distance(l, r),
2482        BinOp::InnerProduct => inner_product(l, r),
2483        BinOp::CosineDistance => cosine_distance(l, r),
2484        BinOp::Concat => Ok(text_concat(&l, &r)),
2485        BinOp::JsonGet => crate::json::path_get(&l, &r, false),
2486        BinOp::JsonGetText => crate::json::path_get(&l, &r, true),
2487        BinOp::JsonGetPath => crate::json::path_walk(&l, &r, false),
2488        BinOp::JsonGetPathText => crate::json::path_walk(&l, &r, true),
2489        BinOp::JsonContains => crate::json::contains(&l, &r),
2490        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
2491            compare(op, &l, &r)
2492        }
2493        BinOp::And | BinOp::Or | BinOp::IsDistinctFrom | BinOp::IsNotDistinctFrom => {
2494            unreachable!("handled above")
2495        }
2496    }
2497}
2498
2499/// Calendar arithmetic. Returns `Some(value)` when the operand pair
2500/// is a date/time combo this function understands, `None` to let the
2501/// caller fall through to the regular numeric / text paths.
2502fn apply_binary_calendar(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
2503    let int_value = |v: &Value| -> Option<i64> {
2504        match v {
2505            Value::SmallInt(n) => Some(i64::from(*n)),
2506            Value::Int(n) => Some(i64::from(*n)),
2507            Value::BigInt(n) => Some(*n),
2508            _ => None,
2509        }
2510    };
2511    // Most-specific cases first — DATE-DATE / TS-TS subtraction before
2512    // DATE-integer subtraction, otherwise the latter swallows the
2513    // former with an `int_value(Date) = None` no-op fall-through.
2514    match (l, r) {
2515        (Value::Date(a), Value::Date(b)) if op == BinOp::Sub => {
2516            return Ok(Some(Value::BigInt(i64::from(*a) - i64::from(*b))));
2517        }
2518        (Value::Timestamp(a), Value::Timestamp(b)) if op == BinOp::Sub => {
2519            let delta = a.checked_sub(*b).ok_or(EvalError::TypeMismatch {
2520                detail: "TIMESTAMP - TIMESTAMP overflows i64 microseconds".into(),
2521            })?;
2522            return Ok(Some(Value::BigInt(delta)));
2523        }
2524        _ => {}
2525    }
2526    // INTERVAL arithmetic. PG: timestamp ± interval → timestamp,
2527    // date ± interval → date (if interval is pure days/months with no
2528    // sub-day component) else timestamp, interval ± interval → interval.
2529    if let Some(out) = apply_binary_interval(op, l, r)? {
2530        return Ok(Some(out));
2531    }
2532    match (l, r) {
2533        (Value::Date(d), other) if op == BinOp::Add => {
2534            if let Some(n) = int_value(other) {
2535                let days = i64::from(*d).saturating_add(n);
2536                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
2537                    detail: "DATE + integer overflows DATE range".into(),
2538                })?;
2539                return Ok(Some(Value::Date(days32)));
2540            }
2541        }
2542        (other, Value::Date(d)) if op == BinOp::Add => {
2543            if let Some(n) = int_value(other) {
2544                let days = i64::from(*d).saturating_add(n);
2545                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
2546                    detail: "integer + DATE overflows DATE range".into(),
2547                })?;
2548                return Ok(Some(Value::Date(days32)));
2549            }
2550        }
2551        (Value::Date(d), other) if op == BinOp::Sub => {
2552            if let Some(n) = int_value(other) {
2553                let days = i64::from(*d).saturating_sub(n);
2554                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
2555                    detail: "DATE - integer overflows DATE range".into(),
2556                })?;
2557                return Ok(Some(Value::Date(days32)));
2558            }
2559        }
2560        _ => {}
2561    }
2562    Ok(None)
2563}
2564
2565/// INTERVAL-aware binary ops. Recognises:
2566///   timestamp ± interval → timestamp
2567///   date ± interval      → date (if interval is integral days/months only)
2568///                       → timestamp (if interval has sub-day micros)
2569///   interval ± interval  → interval
2570/// Commutative for `+`. Returns `None` for unrecognised operand pairs so
2571/// the caller can fall through.
2572fn apply_binary_interval(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
2573    // Normalise so the interval (if any) is always on the right for Add;
2574    // Sub stays left-handed because it isn't commutative.
2575    let (lhs, rhs, sign): (&Value, &Value, i64) = match (l, r, op) {
2576        (Value::Interval { .. }, _, BinOp::Add) => (r, l, 1),
2577        (_, Value::Interval { .. }, BinOp::Add) => (l, r, 1),
2578        (_, Value::Interval { .. }, BinOp::Sub) => (l, r, -1),
2579        _ => return Ok(None),
2580    };
2581    let Value::Interval {
2582        months: rhs_months,
2583        micros: rhs_us,
2584    } = rhs
2585    else {
2586        unreachable!("rhs guaranteed to be Interval by the match above");
2587    };
2588    let signed_months = i64::from(*rhs_months) * sign;
2589    let signed_micros = rhs_us.checked_mul(sign).ok_or(EvalError::TypeMismatch {
2590        detail: "INTERVAL micros overflows on negation".into(),
2591    })?;
2592    match lhs {
2593        Value::Timestamp(t) => Ok(Some(Value::Timestamp(add_interval_to_micros(
2594            *t,
2595            signed_months,
2596            signed_micros,
2597        )?))),
2598        Value::Date(d) => {
2599            // Date + interval stays a date when the interval has zero
2600            // sub-day microseconds; otherwise promote to TIMESTAMP at
2601            // midnight of the (months-shifted) date first.
2602            let day_aligned = signed_micros.rem_euclid(86_400_000_000) == 0;
2603            if day_aligned {
2604                let micros_per_day = 86_400_000_000_i64;
2605                let days_delta = signed_micros / micros_per_day;
2606                let shifted = shift_date_by_months(*d, signed_months)?;
2607                let new_days =
2608                    i64::from(shifted)
2609                        .checked_add(days_delta)
2610                        .ok_or(EvalError::TypeMismatch {
2611                            detail: "DATE ± INTERVAL overflows DATE range".into(),
2612                        })?;
2613                let days32 = i32::try_from(new_days).map_err(|_| EvalError::TypeMismatch {
2614                    detail: "DATE ± INTERVAL overflows DATE range".into(),
2615                })?;
2616                Ok(Some(Value::Date(days32)))
2617            } else {
2618                let base =
2619                    i64::from(*d)
2620                        .checked_mul(86_400_000_000)
2621                        .ok_or(EvalError::TypeMismatch {
2622                            detail: "DATE → TIMESTAMP lift overflows for INTERVAL math".into(),
2623                        })?;
2624                Ok(Some(Value::Timestamp(add_interval_to_micros(
2625                    base,
2626                    signed_months,
2627                    signed_micros,
2628                )?)))
2629            }
2630        }
2631        Value::Interval {
2632            months: lhs_months,
2633            micros: lhs_us,
2634        } => {
2635            let new_months = i64::from(*lhs_months)
2636                .checked_add(signed_months)
2637                .and_then(|n| i32::try_from(n).ok())
2638                .ok_or(EvalError::TypeMismatch {
2639                    detail: "INTERVAL ± INTERVAL months overflows i32".into(),
2640                })?;
2641            let new_micros = lhs_us
2642                .checked_add(signed_micros)
2643                .ok_or(EvalError::TypeMismatch {
2644                    detail: "INTERVAL ± INTERVAL micros overflows i64".into(),
2645                })?;
2646            Ok(Some(Value::Interval {
2647                months: new_months,
2648                micros: new_micros,
2649            }))
2650        }
2651        _ => Err(EvalError::TypeMismatch {
2652            detail: format!(
2653                "operator {op:?} not defined for {:?} and INTERVAL",
2654                lhs.data_type()
2655            ),
2656        }),
2657    }
2658}
2659
2660/// Shift a `Date` by a signed number of months using the PG clamp rule.
2661fn shift_date_by_months(d: i32, months: i64) -> Result<i32, EvalError> {
2662    let (y, m, day) = civil_from_days(d);
2663    let months_i32 = i32::try_from(months).map_err(|_| EvalError::TypeMismatch {
2664        detail: "INTERVAL months delta out of i32 range".into(),
2665    })?;
2666    let (ny, nm, nd) = add_months_to_civil(y, m, day, months_i32);
2667    Ok(days_from_civil(ny, nm, nd))
2668}
2669
2670/// Add (months, micros) to a `Timestamp` (microseconds since epoch).
2671/// Months part is applied through civil calendar with clamp-to-last-day;
2672/// micros part is plain i64 addition with overflow guard.
2673fn add_interval_to_micros(t: i64, months: i64, micros: i64) -> Result<i64, EvalError> {
2674    let mut out = t;
2675    if months != 0 {
2676        const MICROS_PER_DAY: i64 = 86_400_000_000;
2677        let days = out.div_euclid(MICROS_PER_DAY);
2678        let day_micros = out.rem_euclid(MICROS_PER_DAY);
2679        let day_i32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
2680            detail: "TIMESTAMP day component out of i32 range for INTERVAL months math".into(),
2681        })?;
2682        let shifted_days = shift_date_by_months(day_i32, months)?;
2683        out = i64::from(shifted_days)
2684            .checked_mul(MICROS_PER_DAY)
2685            .and_then(|n| n.checked_add(day_micros))
2686            .ok_or(EvalError::TypeMismatch {
2687                detail: "TIMESTAMP ± INTERVAL months overflows i64 microseconds".into(),
2688            })?;
2689    }
2690    out.checked_add(micros).ok_or(EvalError::TypeMismatch {
2691        detail: "TIMESTAMP ± INTERVAL micros overflows i64".into(),
2692    })
2693}
2694
2695/// Dispatch for any binary op when at least one operand is NUMERIC.
2696/// Other-side integers / floats are promoted to a NUMERIC at a common
2697/// scale; all add / sub / mul / div / compare paths stay in i128.
2698#[allow(clippy::needless_pass_by_value)] // mirrors `apply_binary`'s by-value calling convention
2699fn apply_binary_numeric(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
2700    // Float still wins — Numeric + Float coerces both to f64 and runs
2701    // through the float path. PG demotes Numeric to float in this mix
2702    // too (the documented behaviour for `numeric + double precision`).
2703    let float_path = matches!(l, Value::Float(_)) || matches!(r, Value::Float(_));
2704    if float_path {
2705        let af = as_f64(&l)?;
2706        let bf = as_f64(&r)?;
2707        return match op {
2708            BinOp::Add => Ok(Value::Float(af + bf)),
2709            BinOp::Sub => Ok(Value::Float(af - bf)),
2710            BinOp::Mul => Ok(Value::Float(af * bf)),
2711            BinOp::Div => {
2712                if bf == 0.0 {
2713                    Err(EvalError::DivisionByZero)
2714                } else {
2715                    Ok(Value::Float(af / bf))
2716                }
2717            }
2718            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
2719                let ord = af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
2720                    detail: "NaN in NUMERIC/Float comparison".into(),
2721                })?;
2722                Ok(Value::Bool(cmp_to_bool(op, ord)))
2723            }
2724            BinOp::Concat => Ok(text_concat(&l, &r)),
2725            other => Err(EvalError::TypeMismatch {
2726                detail: format!("operator {other:?} not defined for NUMERIC and Float"),
2727            }),
2728        };
2729    }
2730    // Promote integer ↔ numeric to a shared scale (max of both sides).
2731    let (a, sa) = numeric_or_widen(&l).ok_or_else(|| EvalError::TypeMismatch {
2732        detail: format!("NUMERIC op against non-numeric {:?}", l.data_type()),
2733    })?;
2734    let (b, sb) = numeric_or_widen(&r).ok_or_else(|| EvalError::TypeMismatch {
2735        detail: format!("NUMERIC op against non-numeric {:?}", r.data_type()),
2736    })?;
2737    match op {
2738        BinOp::Add | BinOp::Sub => {
2739            let target_scale = sa.max(sb);
2740            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
2741                detail: "NUMERIC overflow on rescale".into(),
2742            })?;
2743            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
2744                detail: "NUMERIC overflow on rescale".into(),
2745            })?;
2746            let r = match op {
2747                BinOp::Add => lhs.checked_add(rhs),
2748                BinOp::Sub => lhs.checked_sub(rhs),
2749                _ => unreachable!(),
2750            }
2751            .ok_or(EvalError::TypeMismatch {
2752                detail: "NUMERIC overflow on +/-".into(),
2753            })?;
2754            Ok(Value::Numeric {
2755                scaled: r,
2756                scale: target_scale,
2757            })
2758        }
2759        BinOp::Mul => {
2760            let scaled = a.checked_mul(b).ok_or(EvalError::TypeMismatch {
2761                detail: "NUMERIC overflow on *".into(),
2762            })?;
2763            Ok(Value::Numeric {
2764                scaled,
2765                scale: sa.saturating_add(sb),
2766            })
2767        }
2768        BinOp::Div => {
2769            if b == 0 {
2770                return Err(EvalError::DivisionByZero);
2771            }
2772            // Result scale: keep the wider operand's scale. Pre-scale
2773            // the numerator so the integer division retains that many
2774            // fractional digits. Round half-away-from-zero.
2775            let target_scale = sa.max(sb);
2776            // Numerator effective scale becomes sa + target_scale; we
2777            // bring it up to (target_scale + sb) so the divisor's scale
2778            // cancels cleanly.
2779            let bump = pow10_i128(target_scale.saturating_add(sb).saturating_sub(sa));
2780            let num = a.checked_mul(bump).ok_or(EvalError::TypeMismatch {
2781                detail: "NUMERIC overflow on / scaling".into(),
2782            })?;
2783            let half = if b >= 0 { b / 2 } else { -(b / 2) };
2784            let adj = if (num >= 0) == (b >= 0) {
2785                num + half
2786            } else {
2787                num - half
2788            };
2789            Ok(Value::Numeric {
2790                scaled: adj / b,
2791                scale: target_scale,
2792            })
2793        }
2794        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
2795            let target_scale = sa.max(sb);
2796            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
2797                detail: "NUMERIC overflow on rescale".into(),
2798            })?;
2799            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
2800                detail: "NUMERIC overflow on rescale".into(),
2801            })?;
2802            Ok(Value::Bool(cmp_to_bool(op, lhs.cmp(&rhs))))
2803        }
2804        BinOp::Concat => Ok(text_concat(&l, &r)),
2805        other => Err(EvalError::TypeMismatch {
2806            detail: format!("operator {other:?} not defined for NUMERIC"),
2807        }),
2808    }
2809}
2810
2811/// Express `v` as a `(scaled_i128, scale)` pair. Plain integers come
2812/// back with `scale=0`; NUMERIC keeps its own scale. Anything else
2813/// returns `None` and the caller raises a type error.
2814fn numeric_or_widen(v: &Value) -> Option<(i128, u8)> {
2815    match v {
2816        Value::Numeric { scaled, scale } => Some((*scaled, *scale)),
2817        Value::Int(n) => Some((i128::from(*n), 0)),
2818        Value::SmallInt(n) => Some((i128::from(*n), 0)),
2819        Value::BigInt(n) => Some((i128::from(*n), 0)),
2820        _ => None,
2821    }
2822}
2823
2824fn rescale(scaled: i128, src: u8, dst: u8) -> Option<i128> {
2825    if src == dst {
2826        return Some(scaled);
2827    }
2828    if dst > src {
2829        scaled.checked_mul(pow10_i128(dst - src))
2830    } else {
2831        let drop = pow10_i128(src - dst);
2832        let half = drop / 2;
2833        let r = if scaled >= 0 {
2834            scaled + half
2835        } else {
2836            scaled - half
2837        };
2838        Some(r / drop)
2839    }
2840}
2841
2842const fn pow10_i128(p: u8) -> i128 {
2843    let mut acc: i128 = 1;
2844    let mut i = 0;
2845    while i < p {
2846        acc *= 10;
2847        i += 1;
2848    }
2849    acc
2850}
2851
2852const fn cmp_to_bool(op: BinOp, ord: core::cmp::Ordering) -> bool {
2853    use core::cmp::Ordering::{Equal, Greater, Less};
2854    match op {
2855        BinOp::Eq => matches!(ord, Equal),
2856        BinOp::NotEq => !matches!(ord, Equal),
2857        BinOp::Lt => matches!(ord, Less),
2858        BinOp::LtEq => matches!(ord, Less | Equal),
2859        BinOp::Gt => matches!(ord, Greater),
2860        BinOp::GtEq => matches!(ord, Greater | Equal),
2861        _ => false,
2862    }
2863}
2864
2865/// SQL `||` string concatenation. Operands are coerced to text via the same
2866/// rule as `::text` cast. NULL propagates (handled above; this function only
2867/// runs with non-NULL operands).
2868fn text_concat(l: &Value, r: &Value) -> Value {
2869    // v7.11.8 — PG `||` overloads: TEXT[] || TEXT[] = concatenated array;
2870    // TEXT[] || TEXT (or TEXT || TEXT[]) prepends/appends the single
2871    // element. NULL || anything = NULL (PG semantics for arrays;
2872    // text concat treats NULL the same way after value_to_text).
2873    match (l, r) {
2874        (Value::Null, _) | (_, Value::Null) => {
2875            // PG text concat: NULL || x = NULL. Array concat: NULL || x = NULL.
2876            // Keep the legacy text path (value_to_text handles Null as ""),
2877            // but for arrays we surface real NULL to match PG.
2878            if matches!(
2879                l,
2880                Value::TextArray(_) | Value::IntArray(_) | Value::BigIntArray(_) | Value::Bytes(_)
2881            ) || matches!(
2882                r,
2883                Value::TextArray(_) | Value::IntArray(_) | Value::BigIntArray(_) | Value::Bytes(_)
2884            ) {
2885                return Value::Null;
2886            }
2887        }
2888        (Value::TextArray(a), Value::TextArray(b)) => {
2889            let mut out = a.clone();
2890            out.extend(b.iter().cloned());
2891            return Value::TextArray(out);
2892        }
2893        (Value::TextArray(a), Value::Text(s)) => {
2894            let mut out = a.clone();
2895            out.push(Some(s.clone()));
2896            return Value::TextArray(out);
2897        }
2898        (Value::Text(s), Value::TextArray(b)) => {
2899            let mut out: alloc::vec::Vec<Option<alloc::string::String>> =
2900                alloc::vec::Vec::with_capacity(1 + b.len());
2901            out.push(Some(s.clone()));
2902            out.extend(b.iter().cloned());
2903            return Value::TextArray(out);
2904        }
2905        // v7.11.13 — IntArray / BigIntArray `||` overloads. Same
2906        // PG semantics as TEXT[]: array||array concatenates, and
2907        // array||scalar appends/prepends. Mixed Int/BigInt widens
2908        // to BigIntArray.
2909        (Value::IntArray(a), Value::IntArray(b)) => {
2910            let mut out = a.clone();
2911            out.extend(b.iter().copied());
2912            return Value::IntArray(out);
2913        }
2914        (Value::IntArray(a), Value::Int(n)) => {
2915            let mut out = a.clone();
2916            out.push(Some(*n));
2917            return Value::IntArray(out);
2918        }
2919        (Value::IntArray(a), Value::SmallInt(n)) => {
2920            let mut out = a.clone();
2921            out.push(Some(i32::from(*n)));
2922            return Value::IntArray(out);
2923        }
2924        (Value::Int(n), Value::IntArray(b)) => {
2925            let mut out: alloc::vec::Vec<Option<i32>> = alloc::vec::Vec::with_capacity(1 + b.len());
2926            out.push(Some(*n));
2927            out.extend(b.iter().copied());
2928            return Value::IntArray(out);
2929        }
2930        (Value::SmallInt(n), Value::IntArray(b)) => {
2931            let mut out: alloc::vec::Vec<Option<i32>> = alloc::vec::Vec::with_capacity(1 + b.len());
2932            out.push(Some(i32::from(*n)));
2933            out.extend(b.iter().copied());
2934            return Value::IntArray(out);
2935        }
2936        (Value::BigIntArray(a), Value::BigIntArray(b)) => {
2937            let mut out = a.clone();
2938            out.extend(b.iter().copied());
2939            return Value::BigIntArray(out);
2940        }
2941        (Value::BigIntArray(a), Value::IntArray(b)) => {
2942            let mut out = a.clone();
2943            out.extend(b.iter().map(|o| o.map(i64::from)));
2944            return Value::BigIntArray(out);
2945        }
2946        (Value::IntArray(a), Value::BigIntArray(b)) => {
2947            let mut out: alloc::vec::Vec<Option<i64>> =
2948                a.iter().map(|o| o.map(i64::from)).collect();
2949            out.extend(b.iter().copied());
2950            return Value::BigIntArray(out);
2951        }
2952        (Value::BigIntArray(a), Value::BigInt(n)) => {
2953            let mut out = a.clone();
2954            out.push(Some(*n));
2955            return Value::BigIntArray(out);
2956        }
2957        (Value::BigIntArray(a), Value::Int(n)) => {
2958            let mut out = a.clone();
2959            out.push(Some(i64::from(*n)));
2960            return Value::BigIntArray(out);
2961        }
2962        (Value::BigIntArray(a), Value::SmallInt(n)) => {
2963            let mut out = a.clone();
2964            out.push(Some(i64::from(*n)));
2965            return Value::BigIntArray(out);
2966        }
2967        (Value::BigInt(n), Value::BigIntArray(b)) => {
2968            let mut out: alloc::vec::Vec<Option<i64>> = alloc::vec::Vec::with_capacity(1 + b.len());
2969            out.push(Some(*n));
2970            out.extend(b.iter().copied());
2971            return Value::BigIntArray(out);
2972        }
2973        (Value::Int(n), Value::BigIntArray(b)) => {
2974            let mut out: alloc::vec::Vec<Option<i64>> = alloc::vec::Vec::with_capacity(1 + b.len());
2975            out.push(Some(i64::from(*n)));
2976            out.extend(b.iter().copied());
2977            return Value::BigIntArray(out);
2978        }
2979        (Value::SmallInt(n), Value::BigIntArray(b)) => {
2980            let mut out: alloc::vec::Vec<Option<i64>> = alloc::vec::Vec::with_capacity(1 + b.len());
2981            out.push(Some(i64::from(*n)));
2982            out.extend(b.iter().copied());
2983            return Value::BigIntArray(out);
2984        }
2985        // v7.11.15 — BYTEA `||` is byte concatenation.
2986        (Value::Bytes(a), Value::Bytes(b)) => {
2987            let mut out = a.clone();
2988            out.extend_from_slice(b);
2989            return Value::Bytes(out);
2990        }
2991        _ => {}
2992    }
2993    let a = value_to_text(l);
2994    let b = value_to_text(r);
2995    Value::Text(a + &b)
2996}
2997
2998/// pgvector inner-product `<#>`. Returns the *negative* dot product so
2999/// smaller still means more similar — same convention as pgvector.
3000fn inner_product(l: Value, r: Value) -> Result<Value, EvalError> {
3001    let (a, b) = unwrap_vec_pair(l, r, "<#>")?;
3002    let mut dot: f64 = 0.0;
3003    for (x, y) in a.iter().zip(b.iter()) {
3004        dot += f64::from(*x) * f64::from(*y);
3005    }
3006    Ok(Value::Float(-dot))
3007}
3008
3009/// pgvector cosine distance `<=>` — `1 - (a·b) / (‖a‖ ‖b‖)`. A zero-norm
3010/// operand produces NaN (matches pgvector).
3011fn cosine_distance(l: Value, r: Value) -> Result<Value, EvalError> {
3012    let (a, b) = unwrap_vec_pair(l, r, "<=>")?;
3013    let mut dot: f64 = 0.0;
3014    let mut na: f64 = 0.0;
3015    let mut nb: f64 = 0.0;
3016    for (x, y) in a.iter().zip(b.iter()) {
3017        let xf = f64::from(*x);
3018        let yf = f64::from(*y);
3019        dot += xf * yf;
3020        na += xf * xf;
3021        nb += yf * yf;
3022    }
3023    let denom = sqrt_newton(na) * sqrt_newton(nb);
3024    if denom == 0.0 {
3025        return Ok(Value::Float(f64::NAN));
3026    }
3027    Ok(Value::Float(1.0 - dot / denom))
3028}
3029
3030fn unwrap_vec_pair(l: Value, r: Value, op: &str) -> Result<(Vec<f32>, Vec<f32>), EvalError> {
3031    // v6.0.1: SQ8 cells coming through the SQL evaluator are
3032    // dequantised to f32 here so the existing scalar distance
3033    // arithmetic stays intact. HNSW kNN search continues to use
3034    // the asymmetric ADC variant inside `cell_to_query_metric_
3035    // distance` — this path only runs when a vector expression
3036    // lands in the evaluator (full-scan ORDER BY, SELECT
3037    // projection of `v <-> $1`, etc.).
3038    let to_f32 = |v: Value| -> Option<Vec<f32>> {
3039        match v {
3040            Value::Vector(a) => Some(a),
3041            Value::Sq8Vector(q) => Some(spg_storage::quantize::dequantize(&q)),
3042            // v6.0.3: bit-exact dequant for halfvec cells.
3043            Value::HalfVector(h) => Some(h.to_f32_vec()),
3044            _ => None,
3045        }
3046    };
3047    let l_ty = l.data_type();
3048    let r_ty = r.data_type();
3049    match (to_f32(l), to_f32(r)) {
3050        (Some(a), Some(b)) => {
3051            if a.len() != b.len() {
3052                return Err(EvalError::TypeMismatch {
3053                    detail: format!("vector dim mismatch in {op}: {} vs {}", a.len(), b.len()),
3054                });
3055            }
3056            Ok((a, b))
3057        }
3058        _ => Err(EvalError::TypeMismatch {
3059            detail: format!("{op} requires two vectors, got {l_ty:?} and {r_ty:?}"),
3060        }),
3061    }
3062}
3063
3064/// Numeric arithmetic with widening.
3065/// - both `Int` → `Int` (with overflow check)
3066/// - `Int` op `BigInt` (either side) → `BigInt`
3067/// - any `Float` involved → `Float`
3068fn arith(
3069    l: Value,
3070    r: Value,
3071    int_op: impl Fn(i64, i64) -> Option<i64>,
3072    float_op: impl Fn(f64, f64) -> f64,
3073    op_name: &str,
3074) -> Result<Value, EvalError> {
3075    // Widen SmallInt to Int up front so the rest of the arithmetic
3076    // table only deals with Int / BigInt / Float pairs.
3077    let widen = |v: Value| -> Value {
3078        match v {
3079            Value::SmallInt(n) => Value::Int(i32::from(n)),
3080            other => other,
3081        }
3082    };
3083    let l = widen(l);
3084    let r = widen(r);
3085    match (l, r) {
3086        (Value::Int(a), Value::Int(b)) => {
3087            let result = int_op(i64::from(a), i64::from(b)).ok_or(EvalError::TypeMismatch {
3088                detail: format!("integer overflow on {op_name}"),
3089            })?;
3090            if let Ok(small) = i32::try_from(result) {
3091                Ok(Value::Int(small))
3092            } else {
3093                Ok(Value::BigInt(result))
3094            }
3095        }
3096        (Value::Int(a), Value::BigInt(b)) | (Value::BigInt(b), Value::Int(a)) => {
3097            let result = int_op(i64::from(a), b).ok_or(EvalError::TypeMismatch {
3098                detail: format!("bigint overflow on {op_name}"),
3099            })?;
3100            Ok(Value::BigInt(result))
3101        }
3102        (Value::BigInt(a), Value::BigInt(b)) => {
3103            let result = int_op(a, b).ok_or(EvalError::TypeMismatch {
3104                detail: format!("bigint overflow on {op_name}"),
3105            })?;
3106            Ok(Value::BigInt(result))
3107        }
3108        (a, b)
3109            if a.data_type() == Some(DataType::Float) || b.data_type() == Some(DataType::Float) =>
3110        {
3111            let af = as_f64(&a)?;
3112            let bf = as_f64(&b)?;
3113            Ok(Value::Float(float_op(af, bf)))
3114        }
3115        (a, b) => Err(EvalError::TypeMismatch {
3116            detail: format!(
3117                "{op_name} applied to non-numeric: {:?} vs {:?}",
3118                a.data_type(),
3119                b.data_type()
3120            ),
3121        }),
3122    }
3123}
3124
3125/// L2 (Euclidean) distance between two vectors of equal dimension.
3126/// Returned as `Value::Float(d)` so it composes with the existing
3127/// comparison / sort plumbing. Mismatched dims or non-vector operands
3128/// raise `TypeMismatch`.
3129#[allow(clippy::many_single_char_names)] // l, r, a, b, d are the natural names
3130fn l2_distance(l: Value, r: Value) -> Result<Value, EvalError> {
3131    // v6.0.1: route both operands through `unwrap_vec_pair` so SQ8
3132    // cells dequantise on the way in. Sub-f64 precision loss is
3133    // negligible vs the dequantisation noise the SQ8 path already
3134    // ships with.
3135    let (a, b) = unwrap_vec_pair(l, r, "<->")?;
3136    let mut sum: f64 = 0.0;
3137    for (x, y) in a.iter().zip(b.iter()) {
3138        let d = f64::from(*x) - f64::from(*y);
3139        sum += d * d;
3140    }
3141    Ok(Value::Float(sqrt_newton(sum)))
3142}
3143
3144/// Self-built `sqrt` for `f64` — `std::f64::sqrt` lives in `std`, which the
3145/// engine's `no_std` constraint disallows. Newton-Raphson with a few rounds
3146/// reaches IEEE-754 precision for the inputs we'll see (sum of squares of
3147/// f32-derived distances, always non-negative, never NaN).
3148fn sqrt_newton(x: f64) -> f64 {
3149    if x <= 0.0 {
3150        return 0.0;
3151    }
3152    let mut g = x;
3153    // 10 iterations is conservative; 6 already converges to ulp for typical
3154    // distances.
3155    for _ in 0..10 {
3156        g = 0.5 * (g + x / g);
3157    }
3158    g
3159}
3160
3161fn div_op(l: Value, r: Value) -> Result<Value, EvalError> {
3162    let any_float = matches!(l.data_type(), Some(DataType::Float))
3163        || matches!(r.data_type(), Some(DataType::Float));
3164    if any_float {
3165        let a = as_f64(&l)?;
3166        let b = as_f64(&r)?;
3167        if b == 0.0 {
3168            return Err(EvalError::DivisionByZero);
3169        }
3170        return Ok(Value::Float(a / b));
3171    }
3172    arith(
3173        l,
3174        r,
3175        |a, b| {
3176            if b == 0 { None } else { Some(a / b) }
3177        },
3178        |a, b| a / b,
3179        "/",
3180    )
3181    .map_err(|e| match e {
3182        // The closure returns None on b == 0; translate that into the dedicated
3183        // DivisionByZero variant instead of "integer overflow on /".
3184        EvalError::TypeMismatch { detail } if detail.contains('/') => EvalError::DivisionByZero,
3185        other => other,
3186    })
3187}
3188
3189fn as_f64(v: &Value) -> Result<f64, EvalError> {
3190    match v {
3191        Value::SmallInt(n) => Ok(f64::from(*n)),
3192        Value::Int(n) => Ok(f64::from(*n)),
3193        #[allow(clippy::cast_precision_loss)]
3194        Value::BigInt(n) => Ok(*n as f64),
3195        Value::Float(x) => Ok(*x),
3196        #[allow(clippy::cast_precision_loss)]
3197        Value::Numeric { scaled, scale } => {
3198            let mut div = 1.0_f64;
3199            for _ in 0..*scale {
3200                div *= 10.0;
3201            }
3202            Ok((*scaled as f64) / div)
3203        }
3204        other => Err(EvalError::TypeMismatch {
3205            detail: format!("cannot convert {:?} to FLOAT", other.data_type()),
3206        }),
3207    }
3208}
3209
3210fn compare(op: BinOp, l: &Value, r: &Value) -> Result<Value, EvalError> {
3211    let ord = match (l, r) {
3212        (Value::Int(a), Value::Int(b)) => i64::from(*a).cmp(&i64::from(*b)),
3213        (Value::Int(a), Value::BigInt(b)) => i64::from(*a).cmp(b),
3214        (Value::BigInt(a), Value::Int(b)) => a.cmp(&i64::from(*b)),
3215        (Value::BigInt(a), Value::BigInt(b)) => a.cmp(b),
3216        (a, b)
3217            if matches!(a.data_type(), Some(DataType::Float))
3218                || matches!(b.data_type(), Some(DataType::Float)) =>
3219        {
3220            let af = as_f64(a)?;
3221            let bf = as_f64(b)?;
3222            af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
3223                detail: "NaN in comparison".into(),
3224            })?
3225        }
3226        (Value::Text(a), Value::Text(b)) => a.cmp(b),
3227        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
3228        // Date / Timestamp compare on their integer storage repr.
3229        // Cross-domain (Date vs Timestamp) lifts the Date to the
3230        // matching midnight TIMESTAMP first.
3231        (Value::Date(a), Value::Date(b)) => a.cmp(b),
3232        (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
3233        (Value::Date(a), Value::Timestamp(b)) => (i64::from(*a) * 86_400_000_000).cmp(b),
3234        (Value::Timestamp(a), Value::Date(b)) => a.cmp(&(i64::from(*b) * 86_400_000_000)),
3235        // PG-style implicit coercion: comparing a DATE / TIMESTAMP
3236        // column against a text literal lifts the literal into the
3237        // matching domain (e.g. `day >= '2024-01-01'`).
3238        (Value::Date(a), Value::Text(b)) => {
3239            let bd = parse_date_literal(b).ok_or_else(|| EvalError::TypeMismatch {
3240                detail: format!("cannot parse {b:?} as DATE for comparison"),
3241            })?;
3242            a.cmp(&bd)
3243        }
3244        (Value::Text(a), Value::Date(b)) => {
3245            let ad = parse_date_literal(a).ok_or_else(|| EvalError::TypeMismatch {
3246                detail: format!("cannot parse {a:?} as DATE for comparison"),
3247            })?;
3248            ad.cmp(b)
3249        }
3250        (Value::Timestamp(a), Value::Text(b)) => {
3251            let bt = parse_timestamp_literal(b).ok_or_else(|| EvalError::TypeMismatch {
3252                detail: format!("cannot parse {b:?} as TIMESTAMP for comparison"),
3253            })?;
3254            a.cmp(&bt)
3255        }
3256        (Value::Text(a), Value::Timestamp(b)) => {
3257            let at = parse_timestamp_literal(a).ok_or_else(|| EvalError::TypeMismatch {
3258                detail: format!("cannot parse {a:?} as TIMESTAMP for comparison"),
3259            })?;
3260            at.cmp(b)
3261        }
3262        (a, b) => {
3263            return Err(EvalError::TypeMismatch {
3264                detail: format!(
3265                    "comparison between {:?} and {:?}",
3266                    a.data_type(),
3267                    b.data_type()
3268                ),
3269            });
3270        }
3271    };
3272    let result = match op {
3273        BinOp::Eq => ord.is_eq(),
3274        BinOp::NotEq => !ord.is_eq(),
3275        BinOp::Lt => ord.is_lt(),
3276        BinOp::LtEq => ord.is_le(),
3277        BinOp::Gt => ord.is_gt(),
3278        BinOp::GtEq => ord.is_ge(),
3279        BinOp::And
3280        | BinOp::Or
3281        | BinOp::Add
3282        | BinOp::Sub
3283        | BinOp::Mul
3284        | BinOp::Div
3285        | BinOp::L2Distance
3286        | BinOp::InnerProduct
3287        | BinOp::CosineDistance
3288        | BinOp::Concat
3289        | BinOp::JsonGet
3290        | BinOp::JsonGetText
3291        | BinOp::JsonGetPath
3292        | BinOp::JsonGetPathText
3293        | BinOp::JsonContains
3294        | BinOp::IsDistinctFrom
3295        | BinOp::IsNotDistinctFrom => {
3296            unreachable!("compare() only called with comparison ops")
3297        }
3298    };
3299    Ok(Value::Bool(result))
3300}
3301
3302// SQL three-valued AND / OR.
3303fn and_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
3304    match (l, r) {
3305        (Value::Bool(false), _) | (_, Value::Bool(false)) => Ok(Value::Bool(false)),
3306        (Value::Bool(true), Value::Bool(true)) => Ok(Value::Bool(true)),
3307        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
3308        (a, b) => Err(EvalError::TypeMismatch {
3309            detail: format!(
3310                "AND on non-boolean: {:?} and {:?}",
3311                a.data_type(),
3312                b.data_type()
3313            ),
3314        }),
3315    }
3316}
3317
3318fn or_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
3319    match (l, r) {
3320        (Value::Bool(true), _) | (_, Value::Bool(true)) => Ok(Value::Bool(true)),
3321        (Value::Bool(false), Value::Bool(false)) => Ok(Value::Bool(false)),
3322        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
3323        (a, b) => Err(EvalError::TypeMismatch {
3324            detail: format!(
3325                "OR on non-boolean: {:?} and {:?}",
3326                a.data_type(),
3327                b.data_type()
3328            ),
3329        }),
3330    }
3331}
3332
3333#[cfg(test)]
3334mod tests {
3335    use super::*;
3336    use alloc::vec;
3337    use spg_storage::{ColumnSchema, Row};
3338
3339    fn col(name: &str, ty: DataType) -> ColumnSchema {
3340        ColumnSchema::new(name, ty, true)
3341    }
3342
3343    fn ctx<'a>(cols: &'a [ColumnSchema], alias: Option<&'a str>) -> EvalContext<'a> {
3344        EvalContext::new(cols, alias)
3345    }
3346
3347    fn lit(n: i64) -> Expr {
3348        Expr::Literal(Literal::Integer(n))
3349    }
3350
3351    fn null() -> Expr {
3352        Expr::Literal(Literal::Null)
3353    }
3354
3355    fn col_ref(name: &str) -> Expr {
3356        Expr::Column(ColumnName {
3357            qualifier: None,
3358            name: name.into(),
3359        })
3360    }
3361
3362    #[test]
3363    fn literal_evaluates_to_value() {
3364        let r = Row::new(vec![]);
3365        let cs: [ColumnSchema; 0] = [];
3366        let c = ctx(&cs, None);
3367        assert_eq!(eval_expr(&lit(42), &r, &c).unwrap(), Value::Int(42));
3368        assert_eq!(
3369            eval_expr(&Expr::Literal(Literal::Float(1.5)), &r, &c).unwrap(),
3370            Value::Float(1.5)
3371        );
3372        assert_eq!(eval_expr(&null(), &r, &c).unwrap(), Value::Null);
3373    }
3374
3375    #[test]
3376    fn column_lookup_unqualified() {
3377        let cs = vec![col("a", DataType::Int), col("b", DataType::Text)];
3378        let r = Row::new(vec![Value::Int(7), Value::Text("hi".into())]);
3379        let c = ctx(&cs, None);
3380        assert_eq!(eval_expr(&col_ref("a"), &r, &c).unwrap(), Value::Int(7));
3381        assert_eq!(
3382            eval_expr(&col_ref("b"), &r, &c).unwrap(),
3383            Value::Text("hi".into())
3384        );
3385    }
3386
3387    #[test]
3388    fn column_not_found_errors() {
3389        let cs = vec![col("a", DataType::Int)];
3390        let r = Row::new(vec![Value::Int(0)]);
3391        let c = ctx(&cs, None);
3392        let err = eval_expr(&col_ref("ghost"), &r, &c).unwrap_err();
3393        assert!(matches!(err, EvalError::ColumnNotFound { ref name } if name == "ghost"));
3394    }
3395
3396    #[test]
3397    fn qualified_column_matches_alias() {
3398        let cs = vec![col("a", DataType::Int)];
3399        let r = Row::new(vec![Value::Int(5)]);
3400        let c = ctx(&cs, Some("u"));
3401        let qualified = Expr::Column(ColumnName {
3402            qualifier: Some("u".into()),
3403            name: "a".into(),
3404        });
3405        assert_eq!(eval_expr(&qualified, &r, &c).unwrap(), Value::Int(5));
3406    }
3407
3408    #[test]
3409    fn qualified_column_unknown_alias_errors() {
3410        let cs = vec![col("a", DataType::Int)];
3411        let r = Row::new(vec![Value::Int(5)]);
3412        let c = ctx(&cs, Some("u"));
3413        let wrong = Expr::Column(ColumnName {
3414            qualifier: Some("x".into()),
3415            name: "a".into(),
3416        });
3417        assert!(matches!(
3418            eval_expr(&wrong, &r, &c).unwrap_err(),
3419            EvalError::UnknownQualifier { .. }
3420        ));
3421    }
3422
3423    #[test]
3424    fn arithmetic_with_widening() {
3425        let r = Row::new(vec![]);
3426        let cs: [ColumnSchema; 0] = [];
3427        let c = ctx(&cs, None);
3428        let e = Expr::Binary {
3429            lhs: alloc::boxed::Box::new(lit(2)),
3430            op: BinOp::Add,
3431            rhs: alloc::boxed::Box::new(Expr::Literal(Literal::Float(0.5))),
3432        };
3433        assert_eq!(eval_expr(&e, &r, &c).unwrap(), Value::Float(2.5));
3434    }
3435
3436    #[test]
3437    fn division_by_zero_errors() {
3438        let r = Row::new(vec![]);
3439        let cs: [ColumnSchema; 0] = [];
3440        let c = ctx(&cs, None);
3441        let e = Expr::Binary {
3442            lhs: alloc::boxed::Box::new(lit(1)),
3443            op: BinOp::Div,
3444            rhs: alloc::boxed::Box::new(lit(0)),
3445        };
3446        assert_eq!(
3447            eval_expr(&e, &r, &c).unwrap_err(),
3448            EvalError::DivisionByZero
3449        );
3450    }
3451
3452    #[test]
3453    fn comparison_returns_bool() {
3454        let r = Row::new(vec![]);
3455        let cs: [ColumnSchema; 0] = [];
3456        let c = ctx(&cs, None);
3457        let e = Expr::Binary {
3458            lhs: alloc::boxed::Box::new(lit(1)),
3459            op: BinOp::Lt,
3460            rhs: alloc::boxed::Box::new(lit(2)),
3461        };
3462        assert_eq!(eval_expr(&e, &r, &c).unwrap(), Value::Bool(true));
3463    }
3464
3465    #[test]
3466    fn null_propagates_through_arithmetic() {
3467        let r = Row::new(vec![]);
3468        let cs: [ColumnSchema; 0] = [];
3469        let c = ctx(&cs, None);
3470        let e = Expr::Binary {
3471            lhs: alloc::boxed::Box::new(lit(1)),
3472            op: BinOp::Add,
3473            rhs: alloc::boxed::Box::new(null()),
3474        };
3475        assert_eq!(eval_expr(&e, &r, &c).unwrap(), Value::Null);
3476    }
3477
3478    #[test]
3479    fn and_three_valued_logic() {
3480        let r = Row::new(vec![]);
3481        let cs: [ColumnSchema; 0] = [];
3482        let c = ctx(&cs, None);
3483        let tt = |a: bool, b_null: bool| Expr::Binary {
3484            lhs: alloc::boxed::Box::new(Expr::Literal(Literal::Bool(a))),
3485            op: BinOp::And,
3486            rhs: alloc::boxed::Box::new(if b_null {
3487                null()
3488            } else {
3489                Expr::Literal(Literal::Bool(true))
3490            }),
3491        };
3492        // FALSE AND NULL → FALSE
3493        assert_eq!(
3494            eval_expr(&tt(false, true), &r, &c).unwrap(),
3495            Value::Bool(false)
3496        );
3497        // TRUE AND NULL → NULL
3498        assert_eq!(eval_expr(&tt(true, true), &r, &c).unwrap(), Value::Null);
3499        // TRUE AND TRUE → TRUE
3500        assert_eq!(
3501            eval_expr(&tt(true, false), &r, &c).unwrap(),
3502            Value::Bool(true)
3503        );
3504    }
3505
3506    #[test]
3507    fn or_three_valued_logic() {
3508        let r = Row::new(vec![]);
3509        let cs: [ColumnSchema; 0] = [];
3510        let c = ctx(&cs, None);
3511        let or_with_null = |a: bool| Expr::Binary {
3512            lhs: alloc::boxed::Box::new(Expr::Literal(Literal::Bool(a))),
3513            op: BinOp::Or,
3514            rhs: alloc::boxed::Box::new(null()),
3515        };
3516        // TRUE OR NULL → TRUE
3517        assert_eq!(
3518            eval_expr(&or_with_null(true), &r, &c).unwrap(),
3519            Value::Bool(true)
3520        );
3521        // FALSE OR NULL → NULL
3522        assert_eq!(
3523            eval_expr(&or_with_null(false), &r, &c).unwrap(),
3524            Value::Null
3525        );
3526    }
3527
3528    #[test]
3529    fn not_on_null_is_null() {
3530        let r = Row::new(vec![]);
3531        let cs: [ColumnSchema; 0] = [];
3532        let c = ctx(&cs, None);
3533        let e = Expr::Unary {
3534            op: UnOp::Not,
3535            expr: alloc::boxed::Box::new(null()),
3536        };
3537        assert_eq!(eval_expr(&e, &r, &c).unwrap(), Value::Null);
3538    }
3539
3540    #[test]
3541    fn text_comparison_lexicographic() {
3542        let r = Row::new(vec![]);
3543        let cs: [ColumnSchema; 0] = [];
3544        let c = ctx(&cs, None);
3545        let e = Expr::Binary {
3546            lhs: alloc::boxed::Box::new(Expr::Literal(Literal::String("apple".into()))),
3547            op: BinOp::Lt,
3548            rhs: alloc::boxed::Box::new(Expr::Literal(Literal::String("banana".into()))),
3549        };
3550        assert_eq!(eval_expr(&e, &r, &c).unwrap(), Value::Bool(true));
3551    }
3552
3553    #[test]
3554    fn interval_format_basics() {
3555        assert_eq!(format_interval(0, 0), "0");
3556        assert_eq!(format_interval(0, 86_400_000_000), "1 day");
3557        assert_eq!(format_interval(0, -86_400_000_000), "-1 days");
3558        assert_eq!(format_interval(0, 3_600_000_000), "01:00:00");
3559        assert_eq!(
3560            format_interval(0, 86_400_000_000 + 9_000_000),
3561            "1 day 00:00:09"
3562        );
3563        assert_eq!(format_interval(14, 0), "1 year 2 mons");
3564        assert_eq!(format_interval(-1, 0), "-1 mons");
3565    }
3566
3567    #[test]
3568    fn interval_add_to_timestamp_micros_part() {
3569        // 2024-01-01 00:00:00 + INTERVAL '1 hour' = 2024-01-01 01:00:00
3570        let ts = i64::from(days_from_civil(2024, 1, 1)) * 86_400_000_000;
3571        let r = add_interval_to_micros(ts, 0, 3_600_000_000).unwrap();
3572        let expected = ts + 3_600_000_000;
3573        assert_eq!(r, expected);
3574    }
3575
3576    #[test]
3577    fn interval_clamp_month_end() {
3578        // 2024-01-31 + 1 month = 2024-02-29 (leap year).
3579        let d = days_from_civil(2024, 1, 31);
3580        let shifted = shift_date_by_months(d, 1).unwrap();
3581        let (y, m, day) = civil_from_days(shifted);
3582        assert_eq!((y, m, day), (2024, 2, 29));
3583        // 2023-01-31 + 1 month = 2023-02-28 (non-leap).
3584        let d = days_from_civil(2023, 1, 31);
3585        let shifted = shift_date_by_months(d, 1).unwrap();
3586        let (y, m, day) = civil_from_days(shifted);
3587        assert_eq!((y, m, day), (2023, 2, 28));
3588        // 2024-03-31 - 1 month = 2024-02-29.
3589        let d = days_from_civil(2024, 3, 31);
3590        let shifted = shift_date_by_months(d, -1).unwrap();
3591        let (y, m, day) = civil_from_days(shifted);
3592        assert_eq!((y, m, day), (2024, 2, 29));
3593    }
3594
3595    #[test]
3596    fn interval_date_plus_pure_days_stays_date() {
3597        // DATE + INTERVAL '7 days' must stay DATE.
3598        let d = days_from_civil(2024, 6, 1);
3599        let lhs = Value::Date(d);
3600        let rhs = Value::Interval {
3601            months: 0,
3602            micros: 7 * 86_400_000_000,
3603        };
3604        let v = apply_binary_interval(BinOp::Add, &lhs, &rhs)
3605            .unwrap()
3606            .unwrap();
3607        let expected = days_from_civil(2024, 6, 8);
3608        assert_eq!(v, Value::Date(expected));
3609    }
3610
3611    #[test]
3612    fn interval_date_plus_sub_day_lifts_to_timestamp() {
3613        // DATE + INTERVAL '1 hour' must lift to TIMESTAMP.
3614        let d = days_from_civil(2024, 6, 1);
3615        let lhs = Value::Date(d);
3616        let rhs = Value::Interval {
3617            months: 0,
3618            micros: 3_600_000_000,
3619        };
3620        let v = apply_binary_interval(BinOp::Add, &lhs, &rhs)
3621            .unwrap()
3622            .unwrap();
3623        let expected = i64::from(d) * 86_400_000_000 + 3_600_000_000;
3624        assert_eq!(v, Value::Timestamp(expected));
3625    }
3626}