Skip to main content

spg_engine/
eval.rs

1//! Expression evaluator. Given a parsed `Expr`, a `Row`, and the row's column
2//! schema, produce a `Value`. v0.4 implements:
3//!
4//! - literals
5//! - column lookups (bare and qualified `t.col`)
6//! - unary minus / NOT
7//! - binary arithmetic, comparison, AND, OR
8//! - numeric widening (`Int → BigInt → Float`) at evaluation time
9//! - SQL three-valued logic for NULL:
10//!     * any arithmetic / comparison op with a NULL operand → NULL
11//!     * `TRUE OR NULL` → TRUE, `FALSE OR NULL` → NULL,
12//!     * `FALSE AND NULL` → FALSE, `TRUE AND NULL` → NULL,
13//!     * `NOT NULL` → NULL
14//!
15//! v0.4 deliberately does *not* implement: function calls, string
16//! concatenation, IS NULL / IS NOT NULL, BETWEEN, IN, etc. Those come later.
17
18use alloc::format;
19use alloc::string::{String, ToString};
20use alloc::vec::Vec;
21
22use spg_sql::ast::{BinOp, CastTarget, ColumnName, Expr, Literal, UnOp};
23use spg_storage::{ColumnSchema, DataType, Row, Value};
24
25/// Resolution context for evaluating a single row. `table_alias` is the alias
26/// (or table name) callers should accept as the qualifier on a column ref —
27/// e.g. `FROM users AS u` makes `u.name` valid and rejects `other.name`.
28#[derive(Debug, Clone)]
29pub struct EvalContext<'a> {
30    pub columns: &'a [ColumnSchema],
31    pub table_alias: Option<&'a str>,
32    /// v6.1.1 — bound parameters for `$N` placeholders inside the
33    /// expression tree. Empty for simple queries; populated by the
34    /// prepared-statement Execute path with Bind values converted
35    /// to `Value`. Index N (1-based per PG) hits `params[N-1]`.
36    pub params: &'a [Value],
37}
38
39impl<'a> EvalContext<'a> {
40    pub const fn new(columns: &'a [ColumnSchema], table_alias: Option<&'a str>) -> Self {
41        Self {
42            columns,
43            table_alias,
44            params: &[],
45        }
46    }
47
48    /// v6.1.1 — attach a parameter buffer for `$N` placeholder
49    /// resolution. The slice must outlive the context; callers
50    /// construct it from the prepared statement's Bind values.
51    #[must_use]
52    pub const fn with_params(mut self, params: &'a [Value]) -> Self {
53        self.params = params;
54        self
55    }
56}
57
58#[derive(Debug, Clone, PartialEq)]
59pub enum EvalError {
60    ColumnNotFound { name: String },
61    UnknownQualifier { qualifier: String },
62    DivisionByZero,
63    TypeMismatch { detail: String },
64    /// v6.1.1 — `$N` reference past the number of bound parameters.
65    /// Either the client sent too few in Bind, or the SQL has a
66    /// placeholder the prepared statement didn't account for.
67    PlaceholderOutOfRange { n: u16, bound: u16 },
68}
69
70impl core::fmt::Display for EvalError {
71    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
72        match self {
73            Self::ColumnNotFound { name } => write!(f, "column not found: {name}"),
74            Self::UnknownQualifier { qualifier } => {
75                write!(f, "unknown table qualifier: {qualifier}")
76            }
77            Self::DivisionByZero => f.write_str("division by zero"),
78            Self::TypeMismatch { detail } => write!(f, "type mismatch: {detail}"),
79            Self::PlaceholderOutOfRange { n, bound } => write!(
80                f,
81                "parameter ${n} referenced but only {bound} bound by client"
82            ),
83        }
84    }
85}
86
87pub fn eval_expr(expr: &Expr, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
88    match expr {
89        Expr::Literal(l) => Ok(literal_to_value(l)),
90        Expr::Column(c) => resolve_column(c, row, ctx),
91        Expr::Placeholder(n) => {
92            let idx = usize::from(*n).saturating_sub(1);
93            ctx.params
94                .get(idx)
95                .cloned()
96                .ok_or_else(|| EvalError::PlaceholderOutOfRange {
97                    n: *n,
98                    bound: u16::try_from(ctx.params.len()).unwrap_or(u16::MAX),
99                })
100        }
101        Expr::Unary { op, expr } => {
102            let v = eval_expr(expr, row, ctx)?;
103            apply_unary(*op, v)
104        }
105        Expr::Binary { lhs, op, rhs } => {
106            let l = eval_expr(lhs, row, ctx)?;
107            let r = eval_expr(rhs, row, ctx)?;
108            apply_binary(*op, l, r)
109        }
110        Expr::Cast { expr, target } => {
111            let v = eval_expr(expr, row, ctx)?;
112            cast_value(v, *target)
113        }
114        Expr::IsNull { expr, negated } => {
115            let v = eval_expr(expr, row, ctx)?;
116            let is_null = matches!(v, Value::Null);
117            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
118        }
119        Expr::FunctionCall { name, args } => {
120            let evaluated: Result<Vec<Value>, _> =
121                args.iter().map(|a| eval_expr(a, row, ctx)).collect();
122            apply_function(name, &evaluated?)
123        }
124        Expr::Like {
125            expr,
126            pattern,
127            negated,
128        } => {
129            let v = eval_expr(expr, row, ctx)?;
130            let p = eval_expr(pattern, row, ctx)?;
131            // NULL on either side propagates to NULL — same as PG.
132            let (text, pat) = match (v, p) {
133                (Value::Null, _) | (_, Value::Null) => return Ok(Value::Null),
134                (Value::Text(a), Value::Text(b)) => (a, b),
135                (Value::Text(_), other) | (other, _) => {
136                    return Err(EvalError::TypeMismatch {
137                        detail: format!("LIKE requires text operands, got {:?}", other.data_type()),
138                    });
139                }
140            };
141            let m = like_match(&text, &pat);
142            Ok(Value::Bool(if *negated { !m } else { m }))
143        }
144        Expr::Extract { field, source } => {
145            let v = eval_expr(source, row, ctx)?;
146            extract_field(*field, &v)
147        }
148        // v4.10: subquery nodes should have been resolved into
149        // Literal / Binary-Eq-OR chains by Engine::resolve_select_subqueries
150        // before the row loop. Anything reaching here is a bug.
151        Expr::ScalarSubquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. } => {
152            Err(EvalError::TypeMismatch {
153                detail: "subquery reached row eval — engine resolver bug".into(),
154            })
155        }
156        // v4.12: window functions should have been rewritten into
157        // synthetic __win_N column references by
158        // exec_select_with_window before row eval. Anything
159        // reaching here is similarly a bug.
160        Expr::WindowFunction { .. } => Err(EvalError::TypeMismatch {
161            detail: "window function reached row eval — engine rewrite bug".into(),
162        }),
163        // v7.10.10 — `ARRAY[expr, expr, …]` constructor. Build a
164        // `Value::TextArray` where each NULL maps to `None` and
165        // each Text element maps to `Some(...)`. Non-TEXT
166        // elements get coerced via `to_string()` here so the
167        // user can write `ARRAY[1, 2, 3]` against a TEXT[]
168        // column (PG also accepts this in many positions).
169        Expr::Array(items) => {
170            let mut out: Vec<Option<String>> = Vec::with_capacity(items.len());
171            for elem in items {
172                match eval_expr(elem, row, ctx)? {
173                    Value::Null => out.push(None),
174                    Value::Text(s) => out.push(Some(s)),
175                    other => out.push(Some(value_to_text_for_array(&other))),
176                }
177            }
178            Ok(Value::TextArray(out))
179        }
180        // v7.10.12 — `arr[i]` PG-style 1-based indexing.
181        // Out-of-range indices (including i ≤ 0) return NULL.
182        Expr::ArraySubscript { target, index } => {
183            let target_v = eval_expr(target, row, ctx)?;
184            let idx_v = eval_expr(index, row, ctx)?;
185            if matches!(target_v, Value::Null) || matches!(idx_v, Value::Null) {
186                return Ok(Value::Null);
187            }
188            let Value::TextArray(items) = target_v else {
189                return Err(EvalError::TypeMismatch {
190                    detail: format!(
191                        "subscript target must be an array, got {:?}",
192                        target_v.data_type()
193                    ),
194                });
195            };
196            let i: i64 = match idx_v {
197                Value::Int(n) => i64::from(n),
198                Value::BigInt(n) => n,
199                Value::SmallInt(n) => i64::from(n),
200                other => {
201                    return Err(EvalError::TypeMismatch {
202                        detail: format!("array subscript must be integer, got {:?}", other.data_type()),
203                    });
204                }
205            };
206            if i < 1 {
207                return Ok(Value::Null);
208            }
209            let pos = (i - 1) as usize;
210            match items.get(pos) {
211                Some(Some(s)) => Ok(Value::Text(s.clone())),
212                Some(None) | None => Ok(Value::Null),
213            }
214        }
215        // v7.10.12 — `x op ANY(arr)` / `x op ALL(arr)`. PG
216        // 3VL: ANY → true if any element compares-true; NULL if
217        // no true but some NULL; false otherwise. ALL: false if
218        // any compares-false; NULL if no false but some NULL;
219        // true otherwise.
220        Expr::AnyAll {
221            expr,
222            op,
223            array,
224            is_any,
225        } => {
226            let lhs = eval_expr(expr, row, ctx)?;
227            let arr = eval_expr(array, row, ctx)?;
228            if matches!(arr, Value::Null) {
229                return Ok(Value::Null);
230            }
231            let Value::TextArray(items) = arr else {
232                return Err(EvalError::TypeMismatch {
233                    detail: format!(
234                        "ANY/ALL right-hand side must be an array, got {:?}",
235                        arr.data_type()
236                    ),
237                });
238            };
239            let mut saw_null = matches!(lhs, Value::Null);
240            let mut saw_match = false;
241            let mut saw_mismatch = false;
242            for elem in items {
243                let elem_v = match elem {
244                    Some(s) => Value::Text(s),
245                    None => {
246                        saw_null = true;
247                        continue;
248                    }
249                };
250                if matches!(lhs, Value::Null) {
251                    saw_null = true;
252                    continue;
253                }
254                match apply_binary(*op, lhs.clone(), elem_v) {
255                    Ok(Value::Bool(true)) => saw_match = true,
256                    Ok(Value::Bool(false)) => saw_mismatch = true,
257                    Ok(Value::Null) => saw_null = true,
258                    Ok(other) => {
259                        return Err(EvalError::TypeMismatch {
260                            detail: format!(
261                                "ANY/ALL comparison didn't return Bool: {:?}",
262                                other.data_type()
263                            ),
264                        });
265                    }
266                    Err(e) => return Err(e),
267                }
268            }
269            let result = if *is_any {
270                if saw_match {
271                    Value::Bool(true)
272                } else if saw_null {
273                    Value::Null
274                } else {
275                    Value::Bool(false)
276                }
277            } else if saw_mismatch {
278                Value::Bool(false)
279            } else if saw_null {
280                Value::Null
281            } else {
282                Value::Bool(true)
283            };
284            Ok(result)
285        }
286    }
287}
288
289/// v7.10.10 — best-effort text rendering for non-TEXT array
290/// elements (numbers, bools, etc.). The PG rule is that
291/// `ARRAY[1, 2]` is `int[]`, but SPG's v7.10 only models TEXT[],
292/// so we widen by stringifying. NUMERIC formatting goes through
293/// the existing canonical helpers to stay consistent with
294/// `format_numeric` / `format_date` etc.
295fn value_to_text_for_array(v: &Value) -> String {
296    match v {
297        Value::Text(s) | Value::Json(s) => s.clone(),
298        Value::Int(n) => n.to_string(),
299        Value::BigInt(n) => n.to_string(),
300        Value::SmallInt(n) => n.to_string(),
301        Value::Bool(b) => if *b { "true".into() } else { "false".into() },
302        Value::Float(x) => format!("{x}"),
303        Value::Date(d) => format_date(*d),
304        Value::Timestamp(t) => format_timestamp(*t),
305        Value::Numeric { scaled, scale } => format_numeric(*scaled, *scale),
306        _ => format!("{v:?}"),
307    }
308}
309
310/// Pull an integer component (year / month / ... / microsecond) out
311/// of a `DATE` or `TIMESTAMP`. Returns NULL on a NULL source, errors
312/// when the source isn't a calendar type.
313fn extract_field(field: spg_sql::ast::ExtractField, v: &Value) -> Result<Value, EvalError> {
314    use spg_sql::ast::ExtractField as F;
315    if matches!(v, Value::Null) {
316        return Ok(Value::Null);
317    }
318    // INTERVAL has its own decomposition — `YEAR` / `MONTH` come from
319    // the months part, the rest from the microseconds part. PG matches
320    // this convention (months is normalised modulo 12 for MONTH).
321    if let Value::Interval { months, micros } = *v {
322        let years = months / 12;
323        let mons = months % 12;
324        let secs_total = micros / 1_000_000;
325        let frac = micros % 1_000_000;
326        let result = match field {
327            F::Year => i64::from(years),
328            F::Month => i64::from(mons),
329            F::Day => micros / 86_400_000_000,
330            F::Hour => (secs_total / 3600) % 24,
331            F::Minute => (secs_total / 60) % 60,
332            F::Second => secs_total % 60,
333            F::Microsecond => (secs_total % 60) * 1_000_000 + frac,
334        };
335        return Ok(Value::BigInt(result));
336    }
337    let (days, day_micros) = match *v {
338        Value::Date(d) => (d, 0_i64),
339        Value::Timestamp(t) => {
340            let days = t.div_euclid(86_400_000_000);
341            let day_micros = t.rem_euclid(86_400_000_000);
342            (i32::try_from(days).unwrap_or(i32::MAX), day_micros)
343        }
344        _ => {
345            return Err(EvalError::TypeMismatch {
346                detail: format!(
347                    "EXTRACT requires DATE / TIMESTAMP / INTERVAL, got {:?}",
348                    v.data_type()
349                ),
350            });
351        }
352    };
353    let (y, m, d) = civil_components(days);
354    let secs = day_micros / 1_000_000;
355    let hh = secs / 3600;
356    let mm = (secs / 60) % 60;
357    let ss = secs % 60;
358    let frac = day_micros % 1_000_000;
359    let result = match field {
360        F::Year => i64::from(y),
361        F::Month => i64::from(m),
362        F::Day => i64::from(d),
363        F::Hour => hh,
364        F::Minute => mm,
365        F::Second => ss,
366        F::Microsecond => ss * 1_000_000 + frac,
367    };
368    Ok(Value::BigInt(result))
369}
370
371/// Internal wrapper around the file-private `civil_from_days` so the
372/// public surface area doesn't change. Returns `(year, month, day)`.
373fn civil_components(days: i32) -> (i32, u32, u32) {
374    civil_from_days(days)
375}
376
377/// SQL `LIKE` matcher. Wildcards are `%` (any run, possibly empty) and `_`
378/// (exactly one char). `\` escapes the next pattern char so `\%` matches a
379/// literal `%`. Matches the whole input — no implicit anchoring needed
380/// since SQL `LIKE` is always full-string.
381fn like_match(text: &str, pattern: &str) -> bool {
382    let text: Vec<char> = text.chars().collect();
383    let pat: Vec<char> = pattern.chars().collect();
384    like_match_inner(&text, 0, &pat, 0)
385}
386
387fn like_match_inner(text: &[char], mut ti: usize, pat: &[char], mut pi: usize) -> bool {
388    while pi < pat.len() {
389        match pat[pi] {
390            '%' => {
391                // Collapse consecutive `%` and try every possible split.
392                while pi < pat.len() && pat[pi] == '%' {
393                    pi += 1;
394                }
395                if pi == pat.len() {
396                    return true;
397                }
398                for k in ti..=text.len() {
399                    if like_match_inner(text, k, pat, pi) {
400                        return true;
401                    }
402                }
403                return false;
404            }
405            '_' => {
406                if ti >= text.len() {
407                    return false;
408                }
409                ti += 1;
410                pi += 1;
411            }
412            '\\' if pi + 1 < pat.len() => {
413                let want = pat[pi + 1];
414                if ti >= text.len() || text[ti] != want {
415                    return false;
416                }
417                ti += 1;
418                pi += 2;
419            }
420            c => {
421                if ti >= text.len() || text[ti] != c {
422                    return false;
423                }
424                ti += 1;
425                pi += 1;
426            }
427        }
428    }
429    ti == text.len()
430}
431
432/// Dispatch on lowercased function name. v1.4 implements only a handful of
433/// scalar functions; aggregates land in v1.5 alongside GROUP BY.
434fn apply_function(name: &str, args: &[Value]) -> Result<Value, EvalError> {
435    match name.to_ascii_lowercase().as_str() {
436        "length" => {
437            if args.len() != 1 {
438                return Err(EvalError::TypeMismatch {
439                    detail: format!("length() takes 1 arg, got {}", args.len()),
440                });
441            }
442            match &args[0] {
443                Value::Null => Ok(Value::Null),
444                Value::Text(s) => {
445                    let n = i32::try_from(s.chars().count()).unwrap_or(i32::MAX);
446                    Ok(Value::Int(n))
447                }
448                // v7.10.4 — PG semantics: length(bytea) returns
449                // byte count (= octet_length). Without this branch
450                // mailrs's INSERT … SELECT length(body) … against a
451                // BYTEA column would type-mismatch.
452                Value::Bytes(b) => {
453                    let n = i32::try_from(b.len()).unwrap_or(i32::MAX);
454                    Ok(Value::Int(n))
455                }
456                other => Err(EvalError::TypeMismatch {
457                    detail: format!("length() needs text or bytea, got {:?}", other.data_type()),
458                }),
459            }
460        }
461        // v7.10.4 — `OCTET_LENGTH(x)` returns byte count for both
462        // TEXT (UTF-8 byte length) and BYTEA. PG-spec name; aliases
463        // to length() for bytea by design.
464        "octet_length" => {
465            if args.len() != 1 {
466                return Err(EvalError::TypeMismatch {
467                    detail: format!("octet_length() takes 1 arg, got {}", args.len()),
468                });
469            }
470            match &args[0] {
471                Value::Null => Ok(Value::Null),
472                Value::Text(s) => {
473                    let n = i32::try_from(s.len()).unwrap_or(i32::MAX);
474                    Ok(Value::Int(n))
475                }
476                Value::Bytes(b) => {
477                    let n = i32::try_from(b.len()).unwrap_or(i32::MAX);
478                    Ok(Value::Int(n))
479                }
480                other => Err(EvalError::TypeMismatch {
481                    detail: format!(
482                        "octet_length() needs text or bytea, got {:?}",
483                        other.data_type()
484                    ),
485                }),
486            }
487        }
488        // v7.11.6 — `array_length(arr, dim)` returns the element
489        // count of `arr` along dimension `dim`. v7.11 only models
490        // single-dimension arrays so dim must be 1 (otherwise NULL,
491        // matching PG semantics for unsupported dimensions). NULL
492        // array → NULL. v7.11 TEXT[] only; non-array operand is
493        // a type mismatch.
494        "array_length" => {
495            if args.len() != 2 {
496                return Err(EvalError::TypeMismatch {
497                    detail: format!("array_length() takes 2 args, got {}", args.len()),
498                });
499            }
500            if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
501                return Ok(Value::Null);
502            }
503            let Value::TextArray(items) = &args[0] else {
504                return Err(EvalError::TypeMismatch {
505                    detail: format!(
506                        "array_length() first arg must be an array, got {:?}",
507                        args[0].data_type()
508                    ),
509                });
510            };
511            let dim: i64 = match args[1] {
512                Value::Int(n) => i64::from(n),
513                Value::BigInt(n) => n,
514                Value::SmallInt(n) => i64::from(n),
515                _ => {
516                    return Err(EvalError::TypeMismatch {
517                        detail: format!(
518                            "array_length() second arg must be integer, got {:?}",
519                            args[1].data_type()
520                        ),
521                    });
522                }
523            };
524            if dim != 1 {
525                return Ok(Value::Null);
526            }
527            let n = i32::try_from(items.len()).unwrap_or(i32::MAX);
528            Ok(Value::Int(n))
529        }
530        // v7.11.6 — `array_position(arr, val)` returns 1-based
531        // index of the first element of `arr` equal to `val`, or
532        // NULL if not found. PG NULL semantics: NULL array → NULL;
533        // NULL val never matches (returns NULL if absent).
534        "array_position" => {
535            if args.len() != 2 {
536                return Err(EvalError::TypeMismatch {
537                    detail: format!("array_position() takes 2 args, got {}", args.len()),
538                });
539            }
540            if matches!(args[0], Value::Null) {
541                return Ok(Value::Null);
542            }
543            let Value::TextArray(items) = &args[0] else {
544                return Err(EvalError::TypeMismatch {
545                    detail: format!(
546                        "array_position() first arg must be an array, got {:?}",
547                        args[0].data_type()
548                    ),
549                });
550            };
551            let needle = match &args[1] {
552                Value::Text(s) => s.clone(),
553                Value::Null => return Ok(Value::Null),
554                other => {
555                    return Err(EvalError::TypeMismatch {
556                        detail: format!(
557                            "array_position() needle must be text, got {:?}",
558                            other.data_type()
559                        ),
560                    });
561                }
562            };
563            for (idx, item) in items.iter().enumerate() {
564                if let Some(s) = item
565                    && s == &needle
566                {
567                    return Ok(Value::Int(i32::try_from(idx + 1).unwrap_or(i32::MAX)));
568                }
569            }
570            Ok(Value::Null)
571        }
572        "upper" => {
573            if args.len() != 1 {
574                return Err(EvalError::TypeMismatch {
575                    detail: format!("upper() takes 1 arg, got {}", args.len()),
576                });
577            }
578            match &args[0] {
579                Value::Null => Ok(Value::Null),
580                Value::Text(s) => Ok(Value::Text(s.to_uppercase())),
581                other => Err(EvalError::TypeMismatch {
582                    detail: format!("upper() needs text, got {:?}", other.data_type()),
583                }),
584            }
585        }
586        "lower" => {
587            if args.len() != 1 {
588                return Err(EvalError::TypeMismatch {
589                    detail: format!("lower() takes 1 arg, got {}", args.len()),
590                });
591            }
592            match &args[0] {
593                Value::Null => Ok(Value::Null),
594                Value::Text(s) => Ok(Value::Text(s.to_lowercase())),
595                other => Err(EvalError::TypeMismatch {
596                    detail: format!("lower() needs text, got {:?}", other.data_type()),
597                }),
598            }
599        }
600        "abs" => {
601            if args.len() != 1 {
602                return Err(EvalError::TypeMismatch {
603                    detail: format!("abs() takes 1 arg, got {}", args.len()),
604                });
605            }
606            match &args[0] {
607                Value::Null => Ok(Value::Null),
608                Value::Int(n) => Ok(Value::Int(n.wrapping_abs())),
609                Value::BigInt(n) => Ok(Value::BigInt(n.wrapping_abs())),
610                Value::Float(x) => Ok(Value::Float(x.abs())),
611                other => Err(EvalError::TypeMismatch {
612                    detail: format!("abs() needs numeric, got {:?}", other.data_type()),
613                }),
614            }
615        }
616        "coalesce" => {
617            for a in args {
618                if !matches!(a, Value::Null) {
619                    return Ok(a.clone());
620                }
621            }
622            Ok(Value::Null)
623        }
624        "date_trunc" => date_trunc(args),
625        "date_part" => date_part(args),
626        "age" => age(args),
627        "to_char" => to_char(args),
628        // v6.4.3 — encode/decode + error_on_null SQL function bundle.
629        "encode" => encode_text(args),
630        "decode" => decode_text(args),
631        "error_on_null" => error_on_null(args),
632        other => Err(EvalError::TypeMismatch {
633            detail: format!("unknown function `{other}`"),
634        }),
635    }
636}
637
638/// v6.4.3 — `encode(bytes_as_text, format)`. PG works on bytea
639/// arguments; SPG's value space treats Text as the byte container
640/// (raw UTF-8 bytes). Supported formats: base64 (PG default),
641/// base64url (RFC 4648 §5), base32hex (RFC 4648 §7 extended-hex),
642/// hex.
643fn encode_text(args: &[Value]) -> Result<Value, EvalError> {
644    if args.len() != 2 {
645        return Err(EvalError::TypeMismatch {
646            detail: format!("encode() takes 2 args, got {}", args.len()),
647        });
648    }
649    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
650        return Ok(Value::Null);
651    }
652    let bytes: &[u8] = match &args[0] {
653        Value::Text(s) => s.as_bytes(),
654        other => {
655            return Err(EvalError::TypeMismatch {
656                detail: format!(
657                    "encode() expects text bytes, got {:?}",
658                    other.data_type()
659                ),
660            });
661        }
662    };
663    let fmt = match &args[1] {
664        Value::Text(s) => s.to_ascii_lowercase(),
665        other => {
666            return Err(EvalError::TypeMismatch {
667                detail: format!(
668                    "encode() format must be text, got {:?}",
669                    other.data_type()
670                ),
671            });
672        }
673    };
674    let out = match fmt.as_str() {
675        "base64" => b64_encode(bytes, B64_STD),
676        "base64url" => b64_encode(bytes, B64_URL),
677        "base32hex" => b32hex_encode(bytes),
678        "hex" => hex_encode(bytes),
679        other => {
680            return Err(EvalError::TypeMismatch {
681                detail: format!("encode(): unknown format `{other}`"),
682            });
683        }
684    };
685    Ok(Value::Text(out))
686}
687
688/// v6.4.3 — `decode(text, format)`. Inverse of `encode`; returns
689/// Text containing the raw decoded bytes (caller may CAST to bytea
690/// equivalent if SPG adds bytea later).
691fn decode_text(args: &[Value]) -> Result<Value, EvalError> {
692    if args.len() != 2 {
693        return Err(EvalError::TypeMismatch {
694            detail: format!("decode() takes 2 args, got {}", args.len()),
695        });
696    }
697    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
698        return Ok(Value::Null);
699    }
700    let text = match &args[0] {
701        Value::Text(s) => s.as_str(),
702        other => {
703            return Err(EvalError::TypeMismatch {
704                detail: format!("decode() expects text, got {:?}", other.data_type()),
705            });
706        }
707    };
708    let fmt = match &args[1] {
709        Value::Text(s) => s.to_ascii_lowercase(),
710        other => {
711            return Err(EvalError::TypeMismatch {
712                detail: format!(
713                    "decode() format must be text, got {:?}",
714                    other.data_type()
715                ),
716            });
717        }
718    };
719    let bytes = match fmt.as_str() {
720        "base64" => b64_decode(text, B64_STD)?,
721        "base64url" => b64_decode(text, B64_URL)?,
722        "base32hex" => b32hex_decode(text)?,
723        "hex" => hex_decode(text)?,
724        other => {
725            return Err(EvalError::TypeMismatch {
726                detail: format!("decode(): unknown format `{other}`"),
727            });
728        }
729    };
730    let s = String::from_utf8(bytes).map_err(|_| EvalError::TypeMismatch {
731        detail: "decode(): result bytes are not valid UTF-8 (SPG stores raw bytes as Text)".into(),
732    })?;
733    Ok(Value::Text(s))
734}
735
736/// v6.4.3 — `error_on_null(v)`. Returns `v` unchanged if non-NULL;
737/// errors otherwise. Convenience to assert NOT NULL inside an
738/// expression without wrapping it in COALESCE + raise hacks.
739fn error_on_null(args: &[Value]) -> Result<Value, EvalError> {
740    if args.len() != 1 {
741        return Err(EvalError::TypeMismatch {
742            detail: format!("error_on_null() takes 1 arg, got {}", args.len()),
743        });
744    }
745    if matches!(args[0], Value::Null) {
746        return Err(EvalError::TypeMismatch {
747            detail: "error_on_null(): argument is NULL".into(),
748        });
749    }
750    Ok(args[0].clone())
751}
752
753// ── byte-level encoders ───────────────────────────────────────────
754
755const B64_STD: &[u8; 64] =
756    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
757const B64_URL: &[u8; 64] =
758    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
759const B32HEX_ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHIJKLMNOPQRSTUV";
760
761fn b64_encode(bytes: &[u8], alpha: &[u8; 64]) -> String {
762    let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
763    let mut i = 0;
764    while i + 3 <= bytes.len() {
765        let n = ((bytes[i] as u32) << 16) | ((bytes[i + 1] as u32) << 8) | (bytes[i + 2] as u32);
766        out.push(alpha[((n >> 18) & 0x3f) as usize] as char);
767        out.push(alpha[((n >> 12) & 0x3f) as usize] as char);
768        out.push(alpha[((n >> 6) & 0x3f) as usize] as char);
769        out.push(alpha[(n & 0x3f) as usize] as char);
770        i += 3;
771    }
772    let rem = bytes.len() - i;
773    if rem == 1 {
774        let n = (bytes[i] as u32) << 16;
775        out.push(alpha[((n >> 18) & 0x3f) as usize] as char);
776        out.push(alpha[((n >> 12) & 0x3f) as usize] as char);
777        out.push('=');
778        out.push('=');
779    } else if rem == 2 {
780        let n = ((bytes[i] as u32) << 16) | ((bytes[i + 1] as u32) << 8);
781        out.push(alpha[((n >> 18) & 0x3f) as usize] as char);
782        out.push(alpha[((n >> 12) & 0x3f) as usize] as char);
783        out.push(alpha[((n >> 6) & 0x3f) as usize] as char);
784        out.push('=');
785    }
786    out
787}
788
789fn b64_decode(text: &str, alpha: &[u8; 64]) -> Result<Vec<u8>, EvalError> {
790    let mut lookup = [255u8; 256];
791    for (i, &c) in alpha.iter().enumerate() {
792        lookup[c as usize] = i as u8;
793    }
794    let mut out = Vec::with_capacity(text.len() * 3 / 4);
795    let mut buf: u32 = 0;
796    let mut bits: u32 = 0;
797    for c in text.bytes() {
798        if c == b'=' {
799            break;
800        }
801        if c == b'\n' || c == b'\r' || c == b' ' {
802            continue;
803        }
804        let v = lookup[c as usize];
805        if v == 255 {
806            return Err(EvalError::TypeMismatch {
807                detail: format!("decode(base64): invalid char {:?}", c as char),
808            });
809        }
810        buf = (buf << 6) | v as u32;
811        bits += 6;
812        if bits >= 8 {
813            bits -= 8;
814            out.push(((buf >> bits) & 0xff) as u8);
815        }
816    }
817    Ok(out)
818}
819
820fn b32hex_encode(bytes: &[u8]) -> String {
821    let mut out = String::with_capacity((bytes.len() * 8 + 4) / 5);
822    let mut buf: u64 = 0;
823    let mut bits: u32 = 0;
824    for &b in bytes {
825        buf = (buf << 8) | b as u64;
826        bits += 8;
827        while bits >= 5 {
828            bits -= 5;
829            out.push(B32HEX_ALPHABET[((buf >> bits) & 0x1f) as usize] as char);
830        }
831    }
832    if bits > 0 {
833        out.push(B32HEX_ALPHABET[((buf << (5 - bits)) & 0x1f) as usize] as char);
834    }
835    // Pad to multiple of 8.
836    while out.len() % 8 != 0 {
837        out.push('=');
838    }
839    out
840}
841
842fn b32hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
843    let mut lookup = [255u8; 256];
844    for (i, &c) in B32HEX_ALPHABET.iter().enumerate() {
845        lookup[c as usize] = i as u8;
846        // base32hex is case-insensitive — also map lowercase.
847        let lower = (c as char).to_ascii_lowercase() as u8;
848        lookup[lower as usize] = i as u8;
849    }
850    let mut out = Vec::with_capacity(text.len() * 5 / 8);
851    let mut buf: u64 = 0;
852    let mut bits: u32 = 0;
853    for c in text.bytes() {
854        if c == b'=' {
855            break;
856        }
857        if c == b'\n' || c == b'\r' || c == b' ' {
858            continue;
859        }
860        let v = lookup[c as usize];
861        if v == 255 {
862            return Err(EvalError::TypeMismatch {
863                detail: format!("decode(base32hex): invalid char {:?}", c as char),
864            });
865        }
866        buf = (buf << 5) | v as u64;
867        bits += 5;
868        if bits >= 8 {
869            bits -= 8;
870            out.push(((buf >> bits) & 0xff) as u8);
871        }
872    }
873    Ok(out)
874}
875
876fn hex_encode(bytes: &[u8]) -> String {
877    const HEX: &[u8; 16] = b"0123456789abcdef";
878    let mut out = String::with_capacity(bytes.len() * 2);
879    for &b in bytes {
880        out.push(HEX[(b >> 4) as usize] as char);
881        out.push(HEX[(b & 0xf) as usize] as char);
882    }
883    out
884}
885
886fn hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
887    let trimmed = text.trim();
888    if trimmed.len() % 2 != 0 {
889        return Err(EvalError::TypeMismatch {
890            detail: "decode(hex): input length must be even".into(),
891        });
892    }
893    let mut out = Vec::with_capacity(trimmed.len() / 2);
894    let mut hi: u8 = 0;
895    for (i, c) in trimmed.bytes().enumerate() {
896        let v = match c {
897            b'0'..=b'9' => c - b'0',
898            b'a'..=b'f' => c - b'a' + 10,
899            b'A'..=b'F' => c - b'A' + 10,
900            _ => {
901                return Err(EvalError::TypeMismatch {
902                    detail: format!("decode(hex): invalid char {:?}", c as char),
903                });
904            }
905        };
906        if i % 2 == 0 {
907            hi = v;
908        } else {
909            out.push((hi << 4) | v);
910        }
911    }
912    Ok(out)
913}
914
915/// `date_part(field_text, source)` — function form of `EXTRACT(field FROM
916/// source)`. Same component dispatch (DATE / TIMESTAMP / INTERVAL) and
917/// same `BigInt` return shape; PG returns double precision but we keep the
918/// integer convention so the runner's `query I` shape works unchanged.
919fn date_part(args: &[Value]) -> Result<Value, EvalError> {
920    use spg_sql::ast::ExtractField as F;
921    if args.len() != 2 {
922        return Err(EvalError::TypeMismatch {
923            detail: format!("date_part() takes 2 args, got {}", args.len()),
924        });
925    }
926    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
927        return Ok(Value::Null);
928    }
929    let Value::Text(field_name) = &args[0] else {
930        return Err(EvalError::TypeMismatch {
931            detail: format!(
932                "date_part() needs a text field, got {:?}",
933                args[0].data_type()
934            ),
935        });
936    };
937    let field = match field_name.to_ascii_lowercase().as_str() {
938        "year" => F::Year,
939        "month" => F::Month,
940        "day" => F::Day,
941        "hour" => F::Hour,
942        "minute" => F::Minute,
943        "second" => F::Second,
944        "microsecond" | "microseconds" => F::Microsecond,
945        other => {
946            return Err(EvalError::TypeMismatch {
947                detail: format!(
948                    "unknown date_part field {other:?}; \
949                     supported: year, month, day, hour, minute, second, microsecond"
950                ),
951            });
952        }
953    };
954    extract_field(field, &args[1])
955}
956
957/// `age(t1, t2)` — return `t1 - t2` as an INTERVAL. v2.12 produces a
958/// micros-only interval (no months normalisation) because PG's
959/// month-justification rule is sensitive to the day-of-month walk and
960/// adds material complexity for marginal corpus value.
961///
962/// `age(t)` (single-arg form) is intentionally unsupported in v2.12:
963/// the dispatcher errors instead of guessing a clock source. Callers
964/// who want PG's `age(t)` semantics should write `age(CURRENT_DATE, t)`
965/// explicitly so the clock reference is visible at the SQL layer.
966fn age(args: &[Value]) -> Result<Value, EvalError> {
967    if args.is_empty() || args.len() > 2 {
968        return Err(EvalError::TypeMismatch {
969            detail: format!("age() takes 1 or 2 args, got {}", args.len()),
970        });
971    }
972    if args.iter().any(|v| matches!(v, Value::Null)) {
973        return Ok(Value::Null);
974    }
975    // Coerce to TIMESTAMP micros — DATE lifts to midnight; TIMESTAMP
976    // stays as-is; anything else errors.
977    let to_micros = |v: &Value| -> Result<i64, EvalError> {
978        match v {
979            Value::Timestamp(t) => Ok(*t),
980            Value::Date(d) => Ok(i64::from(*d) * 86_400_000_000),
981            other => Err(EvalError::TypeMismatch {
982                detail: format!("age() needs DATE or TIMESTAMP, got {:?}", other.data_type()),
983            }),
984        }
985    };
986    if args.len() == 1 {
987        return Err(EvalError::TypeMismatch {
988            detail: "single-arg age() is unsupported in v2.12 \
989                     (use age(CURRENT_DATE, t) explicitly)"
990                .into(),
991        });
992    }
993    let a = to_micros(&args[0])?;
994    let b = to_micros(&args[1])?;
995    let delta = a.checked_sub(b).ok_or(EvalError::TypeMismatch {
996        detail: "age() subtraction overflows i64 microseconds".into(),
997    })?;
998    Ok(Value::Interval {
999        months: 0,
1000        micros: delta,
1001    })
1002}
1003
1004/// `to_char(value, format)` — render a DATE / TIMESTAMP through a PG
1005/// format template. Supports the high-traffic placeholders:
1006///   YYYY YY MM Mon Month DD HH24 HH12 MI SS MS US AM PM
1007/// Unrecognised characters pass through literally so the template's
1008/// punctuation ('-', ':', ' ', '/') needs no escape mechanism.
1009fn to_char(args: &[Value]) -> Result<Value, EvalError> {
1010    use core::fmt::Write as _;
1011    if args.len() != 2 {
1012        return Err(EvalError::TypeMismatch {
1013            detail: format!("to_char() takes 2 args, got {}", args.len()),
1014        });
1015    }
1016    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
1017        return Ok(Value::Null);
1018    }
1019    let Value::Text(fmt) = &args[1] else {
1020        return Err(EvalError::TypeMismatch {
1021            detail: format!(
1022                "to_char() needs a text format, got {:?}",
1023                args[1].data_type()
1024            ),
1025        });
1026    };
1027    let (days, day_micros) = match &args[0] {
1028        Value::Date(d) => (*d, 0_i64),
1029        Value::Timestamp(t) => {
1030            let days = t.div_euclid(86_400_000_000);
1031            (
1032                i32::try_from(days).unwrap_or(i32::MAX),
1033                t.rem_euclid(86_400_000_000),
1034            )
1035        }
1036        other => {
1037            return Err(EvalError::TypeMismatch {
1038                detail: format!(
1039                    "to_char() needs DATE or TIMESTAMP, got {:?}",
1040                    other.data_type()
1041                ),
1042            });
1043        }
1044    };
1045    let (y, mo, d) = civil_from_days(days);
1046    let secs = day_micros / 1_000_000;
1047    let frac = day_micros % 1_000_000;
1048    // div_euclid keeps every value non-negative — the casts below are
1049    // sign-safe by construction. `secs ∈ [0, 86400)`, `frac ∈ [0,
1050    // 1_000_000)`, so all three quantities fit in u32.
1051    let hh24 = u32::try_from(secs / 3600).unwrap_or(0);
1052    let mi = u32::try_from((secs / 60) % 60).unwrap_or(0);
1053    let ss = u32::try_from(secs % 60).unwrap_or(0);
1054    let hh12 = match hh24 % 12 {
1055        0 => 12,
1056        x => x,
1057    };
1058    let ampm = if hh24 < 12 { "AM" } else { "PM" };
1059    let ms = u32::try_from(frac / 1_000).unwrap_or(0); // millisecond
1060    let us = u32::try_from(frac).unwrap_or(0); // microsecond (0..1_000_000)
1061
1062    let mut out = String::with_capacity(fmt.len() + 8);
1063    let bytes = fmt.as_bytes();
1064    let mut i = 0;
1065    // write! against a String never fails — discard the Result.
1066    while i < bytes.len() {
1067        // Try the longest prefixes first so "YYYY" wins over "YY".
1068        let rest = &bytes[i..];
1069        if rest.starts_with(b"YYYY") {
1070            let _ = write!(out, "{y:04}");
1071            i += 4;
1072        } else if rest.starts_with(b"YY") {
1073            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
1074            let yy = (y.rem_euclid(100)) as u32;
1075            let _ = write!(out, "{yy:02}");
1076            i += 2;
1077        } else if rest.starts_with(b"Month") {
1078            out.push_str(MONTH_FULL[(mo - 1) as usize]);
1079            i += 5;
1080        } else if rest.starts_with(b"Mon") {
1081            out.push_str(MONTH_ABBR[(mo - 1) as usize]);
1082            i += 3;
1083        } else if rest.starts_with(b"MM") {
1084            let _ = write!(out, "{mo:02}");
1085            i += 2;
1086        } else if rest.starts_with(b"DD") {
1087            let _ = write!(out, "{d:02}");
1088            i += 2;
1089        } else if rest.starts_with(b"HH24") {
1090            let _ = write!(out, "{hh24:02}");
1091            i += 4;
1092        } else if rest.starts_with(b"HH12") {
1093            let _ = write!(out, "{hh12:02}");
1094            i += 4;
1095        } else if rest.starts_with(b"MI") {
1096            let _ = write!(out, "{mi:02}");
1097            i += 2;
1098        } else if rest.starts_with(b"SS") {
1099            let _ = write!(out, "{ss:02}");
1100            i += 2;
1101        } else if rest.starts_with(b"MS") {
1102            let _ = write!(out, "{ms:03}");
1103            i += 2;
1104        } else if rest.starts_with(b"US") {
1105            let _ = write!(out, "{us:06}");
1106            i += 2;
1107        } else if rest.starts_with(b"AM") || rest.starts_with(b"PM") {
1108            out.push_str(ampm);
1109            i += 2;
1110        } else {
1111            // Pass any non-placeholder byte through verbatim.
1112            out.push(bytes[i] as char);
1113            i += 1;
1114        }
1115    }
1116    Ok(Value::Text(out))
1117}
1118
1119const MONTH_FULL: [&str; 12] = [
1120    "January",
1121    "February",
1122    "March",
1123    "April",
1124    "May",
1125    "June",
1126    "July",
1127    "August",
1128    "September",
1129    "October",
1130    "November",
1131    "December",
1132];
1133const MONTH_ABBR: [&str; 12] = [
1134    "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
1135];
1136
1137/// `date_trunc(unit, timestamp)` — round a `TIMESTAMP` down to the
1138/// requested calendar boundary (year / month / day / hour / minute /
1139/// second). Returns the truncated `TIMESTAMP`. NULL on either side
1140/// propagates to NULL.
1141fn date_trunc(args: &[Value]) -> Result<Value, EvalError> {
1142    if args.len() != 2 {
1143        return Err(EvalError::TypeMismatch {
1144            detail: format!("date_trunc() takes 2 args, got {}", args.len()),
1145        });
1146    }
1147    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
1148        return Ok(Value::Null);
1149    }
1150    let Value::Text(unit) = &args[0] else {
1151        return Err(EvalError::TypeMismatch {
1152            detail: format!(
1153                "date_trunc() needs a text unit, got {:?}",
1154                args[0].data_type()
1155            ),
1156        });
1157    };
1158    // Both DATE and TIMESTAMP sources are accepted. DATE lifts to
1159    // midnight first; the result is always TIMESTAMP.
1160    let micros = match &args[1] {
1161        Value::Timestamp(t) => *t,
1162        Value::Date(d) => i64::from(*d) * 86_400_000_000,
1163        other => {
1164            return Err(EvalError::TypeMismatch {
1165                detail: format!(
1166                    "date_trunc() needs DATE or TIMESTAMP, got {:?}",
1167                    other.data_type()
1168                ),
1169            });
1170        }
1171    };
1172    let unit_lc = unit.to_ascii_lowercase();
1173    let days = micros.div_euclid(86_400_000_000);
1174    let day_micros = micros.rem_euclid(86_400_000_000);
1175    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
1176    let (y, m, _) = civil_from_days(day_i32);
1177    let truncated = match unit_lc.as_str() {
1178        "year" => i64::from(days_from_civil(y, 1, 1)) * 86_400_000_000,
1179        "month" => i64::from(days_from_civil(y, m, 1)) * 86_400_000_000,
1180        "day" => days * 86_400_000_000,
1181        "hour" => days * 86_400_000_000 + (day_micros / 3_600_000_000) * 3_600_000_000,
1182        "minute" => days * 86_400_000_000 + (day_micros / 60_000_000) * 60_000_000,
1183        "second" => days * 86_400_000_000 + (day_micros / 1_000_000) * 1_000_000,
1184        other => {
1185            return Err(EvalError::TypeMismatch {
1186                detail: format!(
1187                    "unknown date_trunc unit {other:?}; \
1188                     supported: year, month, day, hour, minute, second"
1189                ),
1190            });
1191        }
1192    };
1193    Ok(Value::Timestamp(truncated))
1194}
1195
1196/// PG-style `expr::TYPE` coercion. NULL always casts as NULL.
1197pub fn cast_value(v: Value, target: CastTarget) -> Result<Value, EvalError> {
1198    if matches!(v, Value::Null) {
1199        return Ok(Value::Null);
1200    }
1201    match target {
1202        CastTarget::Vector => cast_to_vector(v),
1203        CastTarget::Text => Ok(Value::Text(value_to_text(&v))),
1204        CastTarget::Int => cast_numeric_to_int(v),
1205        CastTarget::BigInt => cast_numeric_to_bigint(v),
1206        CastTarget::Float => cast_numeric_to_float(v),
1207        CastTarget::Bool => cast_to_bool(v),
1208        CastTarget::Date => cast_to_date(v),
1209        // TIMESTAMP and TIMESTAMPTZ have identical runtime
1210        // representation (i64 microseconds UTC).
1211        CastTarget::Timestamp | CastTarget::Timestamptz => cast_to_timestamp(v),
1212        // v7.9.25 — `expr::INTERVAL`. Currently only TEXT → Interval
1213        // is supported (the mailrs idiom: `$1::INTERVAL` where the
1214        // bound param is a string like `'7 days'`).
1215        CastTarget::Interval => cast_to_interval(v),
1216        // v7.9.25 — `::json` / `::jsonb`. Routes Text → Json
1217        // (validation is the producer's responsibility, same as
1218        // the column-INSERT path).
1219        CastTarget::Json | CastTarget::Jsonb => match v {
1220            Value::Json(s) => Ok(Value::Json(s)),
1221            Value::Text(s) => Ok(Value::Json(s)),
1222            other => Err(EvalError::TypeMismatch {
1223                detail: alloc::format!(
1224                    "::json / ::jsonb only accepts TEXT-shape inputs, got {:?}",
1225                    other.data_type()
1226                ),
1227            }),
1228        },
1229        // v7.9.26 — `::regtype` / `::regclass`. SPG has no
1230        // pg_catalog; surface a clear error.
1231        CastTarget::RegType | CastTarget::RegClass => Err(EvalError::TypeMismatch {
1232            detail:
1233                "::regtype / ::regclass not supported on SPG \
1234                 (no pg_catalog); use SHOW TABLES / spg_table_ddl instead"
1235                    .into(),
1236        }),
1237        // v7.10.11 — `::TEXT[]`. Decode PG external array form
1238        // when input is Text; pass through unchanged when it is
1239        // already TextArray. Anything else is a type mismatch.
1240        CastTarget::TextArray => match v {
1241            Value::TextArray(items) => Ok(Value::TextArray(items)),
1242            Value::Text(s) => decode_text_array_external(&s).map(Value::TextArray),
1243            other => Err(EvalError::TypeMismatch {
1244                detail: alloc::format!(
1245                    "::TEXT[] only accepts TEXT / TEXT[] inputs, got {:?}",
1246                    other.data_type()
1247                ),
1248            }),
1249        },
1250    }
1251}
1252
1253/// v7.10.11 — same decoder as `decode_text_array_literal` in
1254/// `lib.rs`, but lives here so the eval-time cast path stays
1255/// inside `spg-engine::eval`. Kept in lock-step with the engine
1256/// `coerce_value` decoder by tests.
1257fn decode_text_array_external(s: &str) -> Result<Vec<Option<String>>, EvalError> {
1258    let trimmed = s.trim();
1259    let inner = trimmed
1260        .strip_prefix('{')
1261        .and_then(|x| x.strip_suffix('}'))
1262        .ok_or_else(|| EvalError::TypeMismatch {
1263            detail: alloc::format!("TEXT[] literal {s:?} must be enclosed in '{{...}}'"),
1264        })?;
1265    let mut out: Vec<Option<String>> = Vec::new();
1266    if inner.trim().is_empty() {
1267        return Ok(out);
1268    }
1269    let bytes = inner.as_bytes();
1270    let mut i = 0;
1271    while i <= bytes.len() {
1272        while i < bytes.len() && (bytes[i] == b' ' || bytes[i] == b'\t') {
1273            i += 1;
1274        }
1275        if i < bytes.len() && bytes[i] == b'"' {
1276            i += 1;
1277            let mut buf = String::new();
1278            while i < bytes.len() && bytes[i] != b'"' {
1279                if bytes[i] == b'\\' && i + 1 < bytes.len() {
1280                    buf.push(bytes[i + 1] as char);
1281                    i += 2;
1282                } else {
1283                    buf.push(bytes[i] as char);
1284                    i += 1;
1285                }
1286            }
1287            if i >= bytes.len() {
1288                return Err(EvalError::TypeMismatch {
1289                    detail: "unterminated quoted element in TEXT[] literal".into(),
1290                });
1291            }
1292            i += 1;
1293            out.push(Some(buf));
1294        } else {
1295            let start = i;
1296            while i < bytes.len() && bytes[i] != b',' {
1297                i += 1;
1298            }
1299            let raw = inner[start..i].trim();
1300            if raw.eq_ignore_ascii_case("NULL") {
1301                out.push(None);
1302            } else {
1303                out.push(Some(raw.to_string()));
1304            }
1305        }
1306        while i < bytes.len() && (bytes[i] == b' ' || bytes[i] == b'\t') {
1307            i += 1;
1308        }
1309        if i >= bytes.len() {
1310            break;
1311        }
1312        if bytes[i] != b',' {
1313            return Err(EvalError::TypeMismatch {
1314                detail: "expected ',' between TEXT[] elements".into(),
1315            });
1316        }
1317        i += 1;
1318    }
1319    Ok(out)
1320}
1321
1322fn cast_to_interval(v: Value) -> Result<Value, EvalError> {
1323    match v {
1324        Value::Interval { months, micros } => Ok(Value::Interval { months, micros }),
1325        Value::Text(s) => {
1326            let (months, micros) = spg_sql::parser::parse_interval_text(&s)
1327                .ok_or_else(|| EvalError::TypeMismatch {
1328                    detail: alloc::format!("cannot parse {s:?} as INTERVAL"),
1329                })?;
1330            Ok(Value::Interval { months, micros })
1331        }
1332        other => Err(EvalError::TypeMismatch {
1333            detail: alloc::format!(
1334                "::INTERVAL only accepts TEXT-shape inputs, got {:?}",
1335                other.data_type()
1336            ),
1337        }),
1338    }
1339}
1340
1341fn cast_to_date(v: Value) -> Result<Value, EvalError> {
1342    match v {
1343        Value::Date(d) => Ok(Value::Date(d)),
1344        // Integer literals carry days since the Unix epoch — used by
1345        // the `CURRENT_DATE` AST rewrite to inject the wall clock.
1346        Value::Int(n) => Ok(Value::Date(n)),
1347        Value::BigInt(n) => {
1348            i32::try_from(n)
1349                .map(Value::Date)
1350                .map_err(|_| EvalError::TypeMismatch {
1351                    detail: "bigint days-since-epoch out of DATE range".into(),
1352                })
1353        }
1354        // Timestamp truncates to its day boundary.
1355        Value::Timestamp(t) => {
1356            let days = t.div_euclid(86_400_000_000);
1357            i32::try_from(days)
1358                .map(Value::Date)
1359                .map_err(|_| EvalError::TypeMismatch {
1360                    detail: "timestamp out of DATE range".into(),
1361                })
1362        }
1363        Value::Text(s) => parse_date_literal(&s)
1364            .map(Value::Date)
1365            .ok_or(EvalError::TypeMismatch {
1366                detail: format!("cannot parse {s:?} as DATE (expected YYYY-MM-DD)"),
1367            }),
1368        other => Err(EvalError::TypeMismatch {
1369            detail: format!("cannot cast {:?} to DATE", other.data_type()),
1370        }),
1371    }
1372}
1373
1374fn cast_to_timestamp(v: Value) -> Result<Value, EvalError> {
1375    match v {
1376        Value::Timestamp(t) => Ok(Value::Timestamp(t)),
1377        // Int / BigInt carry microseconds since the Unix epoch — used
1378        // by the `NOW()` / `CURRENT_TIMESTAMP` AST rewrite to inject
1379        // the wall clock as a plain integer literal.
1380        Value::Int(n) => Ok(Value::Timestamp(i64::from(n))),
1381        Value::BigInt(n) => Ok(Value::Timestamp(n)),
1382        // DATE → TIMESTAMP picks midnight on the date.
1383        Value::Date(d) => Ok(Value::Timestamp(i64::from(d) * 86_400_000_000)),
1384        Value::Text(s) => {
1385            parse_timestamp_literal(&s)
1386                .map(Value::Timestamp)
1387                .ok_or(EvalError::TypeMismatch {
1388                    detail: format!(
1389                        "cannot parse {s:?} as TIMESTAMP \
1390                     (expected YYYY-MM-DD[ HH:MM:SS[.ffffff]])"
1391                    ),
1392                })
1393        }
1394        other => Err(EvalError::TypeMismatch {
1395            detail: format!("cannot cast {:?} to TIMESTAMP", other.data_type()),
1396        }),
1397    }
1398}
1399
1400fn value_to_text(v: &Value) -> String {
1401    match v {
1402        // v7.5.0 — Value is #[non_exhaustive]; any future variant
1403        // without explicit text rendering hits the Debug fallback
1404        // at the end.
1405        Value::SmallInt(n) => format!("{n}"),
1406        Value::Int(n) => format!("{n}"),
1407        Value::BigInt(n) => format!("{n}"),
1408        Value::Float(x) => format!("{x}"),
1409        // v4.9: JSON renders identically to Text — both are raw UTF-8.
1410        Value::Text(s) | Value::Json(s) => s.clone(),
1411        Value::Bool(b) => (if *b { "true" } else { "false" }).into(),
1412        Value::Vector(v) => {
1413            let cells: Vec<String> = v.iter().map(|x| format!("{x}")).collect();
1414            format!("[{}]", cells.join(", "))
1415        }
1416        // v6.0.1: render SQ8 cells dequantised, so SELECT output
1417        // matches the pgvector wire shape clients expect. The
1418        // recall envelope already absorbs the ≤ (max-min)/255/2
1419        // dequantisation error.
1420        Value::Sq8Vector(q) => {
1421            let cells: Vec<String> = spg_storage::quantize::dequantize(q)
1422                .iter()
1423                .map(|x| format!("{x}"))
1424                .collect();
1425            format!("[{}]", cells.join(", "))
1426        }
1427        // v6.0.3: HalfVector cells dequantise bit-exactly to f32
1428        // for SELECT output.
1429        Value::HalfVector(h) => {
1430            let cells: Vec<String> = h.to_f32_vec().iter().map(|x| format!("{x}")).collect();
1431            format!("[{}]", cells.join(", "))
1432        }
1433        Value::Numeric { scaled, scale } => format_numeric(*scaled, *scale),
1434        Value::Date(d) => format_date(*d),
1435        Value::Timestamp(t) => format_timestamp(*t),
1436        Value::Interval { months, micros } => format_interval(*months, *micros),
1437        Value::Null => "NULL".into(),
1438        // v7.5.0 — #[non_exhaustive] fallback for future Value variants.
1439        _ => format!("{v:?}"),
1440    }
1441}
1442
1443/// Render a `Date` (days since epoch) as `YYYY-MM-DD`. Negative values
1444/// for pre-1970 dates render with a leading `-` on the year.
1445pub fn format_date(days: i32) -> String {
1446    let (y, m, d) = civil_from_days(days);
1447    format!("{y:04}-{m:02}-{d:02}")
1448}
1449
1450/// Render a `Timestamp` (microseconds since epoch) as
1451/// `YYYY-MM-DD HH:MM:SS[.fff...]`. Trailing-zero fractional digits are
1452/// dropped; a whole-second value has no fractional part.
1453pub fn format_timestamp(micros: i64) -> String {
1454    const MICROS_PER_DAY: i64 = 86_400_000_000;
1455    // Split into day + intra-day part with proper floor division so
1456    // negative timestamps render right too.
1457    let days = micros.div_euclid(MICROS_PER_DAY);
1458    let day_micros = micros.rem_euclid(MICROS_PER_DAY);
1459    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
1460    let (y, m, d) = civil_from_days(day_i32);
1461    let secs = day_micros / 1_000_000;
1462    let frac = day_micros % 1_000_000;
1463    let hh = secs / 3600;
1464    let mm = (secs / 60) % 60;
1465    let ss = secs % 60;
1466    if frac == 0 {
1467        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}")
1468    } else {
1469        // Strip trailing zeros from the 6-digit fractional component.
1470        let raw = format!("{frac:06}");
1471        let trimmed = raw.trim_end_matches('0');
1472        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}.{trimmed}")
1473    }
1474}
1475
1476/// Howard Hinnant's `civil_from_days` — converts days since the Unix
1477/// epoch back to a proleptic-Gregorian (year, month, day) triple. Both
1478/// directions of this calendar conversion live in `eval.rs` so the
1479/// engine never reaches for `std` time facilities.
1480#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1481fn civil_from_days(days: i32) -> (i32, u32, u32) {
1482    let z = i64::from(days) + 719_468;
1483    let era = z.div_euclid(146_097);
1484    // doe ∈ [0, 146_097); fits in u32 with room to spare. Same for
1485    // every other quantity below — `as u32` truncations are safe by
1486    // construction.
1487    let doe = (z - era * 146_097) as u32;
1488    let yoe = (doe.saturating_sub(doe / 1460) + doe / 36524 - doe / 146_096) / 365;
1489    let y_base = i64::from(yoe) + era * 400;
1490    let doy = doe.saturating_sub(365 * yoe + yoe / 4 - yoe / 100);
1491    let mp = (5 * doy + 2) / 153;
1492    let d = doy.saturating_sub((153 * mp + 2) / 5) + 1;
1493    let m = if mp < 10 { mp + 3 } else { mp - 9 };
1494    let y = if m <= 2 { y_base + 1 } else { y_base };
1495    (y as i32, m, d)
1496}
1497
1498/// Inverse of `civil_from_days` — converts (year, month, day) to days
1499/// since 1970-01-01. Out-of-range months / days saturate.
1500#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1501pub fn days_from_civil(y: i32, m: u32, d: u32) -> i32 {
1502    let y_adj = if m <= 2 {
1503        i64::from(y) - 1
1504    } else {
1505        i64::from(y)
1506    };
1507    let era = y_adj.div_euclid(400);
1508    let yoe = (y_adj - era * 400) as u32;
1509    let doy = (153 * (if m > 2 { m - 3 } else { m + 9 }) + 2) / 5 + d.saturating_sub(1);
1510    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
1511    let total = era * 146_097 + i64::from(doe) - 719_468;
1512    i32::try_from(total).unwrap_or(i32::MAX)
1513}
1514
1515/// Parse `YYYY-MM-DD` into a `Date` (days since Unix epoch). Returns
1516/// `None` on shape / numeric failure; the engine surfaces that as a
1517/// `TypeMismatch` with the original text included.
1518pub fn parse_date_literal(s: &str) -> Option<i32> {
1519    let bytes = s.as_bytes();
1520    if bytes.len() != 10 || bytes[4] != b'-' || bytes[7] != b'-' {
1521        return None;
1522    }
1523    let y: i32 = s[0..4].parse().ok()?;
1524    let m: u32 = s[5..7].parse().ok()?;
1525    let d: u32 = s[8..10].parse().ok()?;
1526    if !(1..=12).contains(&m) || !(1..=31).contains(&d) {
1527        return None;
1528    }
1529    Some(days_from_civil(y, m, d))
1530}
1531
1532/// Parse `YYYY-MM-DD[ HH:MM:SS[.ffffff]]` into a `Timestamp`
1533/// (microseconds since Unix epoch). The time portion is optional;
1534/// missing → midnight. The fractional portion accepts 1–6 digits and
1535/// pads with zeros to microseconds.
1536pub fn parse_timestamp_literal(s: &str) -> Option<i64> {
1537    let trimmed = s.trim();
1538    let (date_part, time_part) = match trimmed.find([' ', 'T']) {
1539        Some(i) => (&trimmed[..i], Some(&trimmed[i + 1..])),
1540        None => (trimmed, None),
1541    };
1542    let days = parse_date_literal(date_part)?;
1543    let day_micros = match time_part {
1544        None => 0,
1545        Some(t) => parse_time_of_day_micros(t)?,
1546    };
1547    Some(i64::from(days) * 86_400_000_000 + day_micros)
1548}
1549
1550fn parse_time_of_day_micros(t: &str) -> Option<i64> {
1551    let (time, frac_str) = match t.split_once('.') {
1552        Some((a, b)) => (a, Some(b)),
1553        None => (t, None),
1554    };
1555    let bytes = time.as_bytes();
1556    if bytes.len() != 8 || bytes[2] != b':' || bytes[5] != b':' {
1557        return None;
1558    }
1559    let hh: i64 = time[0..2].parse().ok()?;
1560    let mm: i64 = time[3..5].parse().ok()?;
1561    let ss: i64 = time[6..8].parse().ok()?;
1562    if !(0..24).contains(&hh) || !(0..60).contains(&mm) || !(0..60).contains(&ss) {
1563        return None;
1564    }
1565    let frac_micros: i64 = match frac_str {
1566        None => 0,
1567        Some(f) => {
1568            // Pad right with zeros to 6 digits, then truncate extras.
1569            if f.is_empty() || f.len() > 9 {
1570                return None;
1571            }
1572            let mut padded = String::with_capacity(6);
1573            padded.push_str(&f[..f.len().min(6)]);
1574            while padded.len() < 6 {
1575                padded.push('0');
1576            }
1577            padded.parse().ok()?
1578        }
1579    };
1580    Some(((hh * 3600 + mm * 60 + ss) * 1_000_000) + frac_micros)
1581}
1582
1583/// Render an `Interval { months, micros }` in a PG-ish shape. The output
1584/// mirrors `psql`'s text format: years/months from the months part,
1585/// days/HH:MM:SS[.frac] from the microsecond part. Empty parts are
1586/// omitted; an all-zero interval renders as `0`.
1587pub fn format_interval(months: i32, micros: i64) -> String {
1588    const MICROS_PER_DAY: i64 = 86_400_000_000;
1589    let mut parts: Vec<String> = Vec::new();
1590    let years = months / 12;
1591    let mons = months % 12;
1592    // PG renders the unit in the singular only for `+1`; `-1` and any
1593    // other value pluralise. Helper closes over that rule.
1594    let unit = |n: i64, singular: &'static str, plural: &'static str| -> &'static str {
1595        if n == 1 { singular } else { plural }
1596    };
1597    if years != 0 {
1598        parts.push(format!(
1599            "{years} {}",
1600            unit(i64::from(years), "year", "years")
1601        ));
1602    }
1603    if mons != 0 {
1604        parts.push(format!("{mons} {}", unit(i64::from(mons), "mon", "mons")));
1605    }
1606    let days = micros / MICROS_PER_DAY;
1607    let mut rem = micros % MICROS_PER_DAY;
1608    if days != 0 {
1609        parts.push(format!("{days} {}", unit(days, "day", "days")));
1610    }
1611    if rem != 0 {
1612        let neg = rem < 0;
1613        if neg {
1614            rem = -rem;
1615        }
1616        let secs = rem / 1_000_000;
1617        let frac = rem % 1_000_000;
1618        let hh = secs / 3600;
1619        let mm = (secs / 60) % 60;
1620        let ss = secs % 60;
1621        let sign = if neg { "-" } else { "" };
1622        if frac == 0 {
1623            parts.push(format!("{sign}{hh:02}:{mm:02}:{ss:02}"));
1624        } else {
1625            let raw = format!("{frac:06}");
1626            let trimmed = raw.trim_end_matches('0');
1627            parts.push(format!("{sign}{hh:02}:{mm:02}:{ss:02}.{trimmed}"));
1628        }
1629    }
1630    if parts.is_empty() {
1631        "0".into()
1632    } else {
1633        parts.join(" ")
1634    }
1635}
1636
1637/// Add `months` (signed) to a `(year, month, day)` triple using PG's
1638/// clamp-to-last-day rule (so `'2024-01-31' + 1 month` → `'2024-02-29'`).
1639fn add_months_to_civil(y: i32, m: u32, d: u32, months: i32) -> (i32, u32, u32) {
1640    let total_months = i64::from(y) * 12 + i64::from(m) - 1 + i64::from(months);
1641    let new_year = i32::try_from(total_months.div_euclid(12)).unwrap_or(i32::MAX);
1642    let new_month_zero = total_months.rem_euclid(12);
1643    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1644    let new_month = (new_month_zero as u32) + 1;
1645    let max_day = days_in_month(new_year, new_month);
1646    (new_year, new_month, d.min(max_day))
1647}
1648
1649const fn days_in_month(y: i32, m: u32) -> u32 {
1650    match m {
1651        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
1652        2 => {
1653            // Proleptic Gregorian leap rule.
1654            if y.rem_euclid(4) == 0 && (y.rem_euclid(100) != 0 || y.rem_euclid(400) == 0) {
1655                29
1656            } else {
1657                28
1658            }
1659        }
1660        // 4 / 6 / 9 / 11 plus any out-of-range month (callers normalise
1661        // first, but be defensive) get the 30-day fallback.
1662        _ => 30,
1663    }
1664}
1665
1666/// v7.10.9 — render a TEXT[] in PG's external array form
1667/// (`{a,b,NULL}`). Elements containing whitespace, commas,
1668/// quotes, or braces get double-quoted with `\\` / `\"` escapes.
1669/// NULL elements use the literal token `NULL`. Public so the
1670/// wire layer can produce the canonical text-mode encoding.
1671pub fn format_text_array(items: &[Option<String>]) -> String {
1672    let mut out = String::with_capacity(2 + items.len() * 8);
1673    out.push('{');
1674    for (i, item) in items.iter().enumerate() {
1675        if i > 0 {
1676            out.push(',');
1677        }
1678        match item {
1679            None => out.push_str("NULL"),
1680            Some(s) => {
1681                let needs_quote = s.is_empty()
1682                    || s.eq_ignore_ascii_case("NULL")
1683                    || s.chars().any(|c| matches!(c, ',' | '{' | '}' | '"' | '\\' | ' ' | '\t'));
1684                if needs_quote {
1685                    out.push('"');
1686                    for c in s.chars() {
1687                        if c == '"' || c == '\\' {
1688                            out.push('\\');
1689                        }
1690                        out.push(c);
1691                    }
1692                    out.push('"');
1693                } else {
1694                    out.push_str(s);
1695                }
1696            }
1697        }
1698    }
1699    out.push('}');
1700    out
1701}
1702
1703/// v7.10.4 — render a BYTEA payload in PG's hex output format
1704/// (`\x` prefix, lowercase hex pairs). Public so the wire layer
1705/// can emit the canonical bytea-as-text representation.
1706pub fn format_bytea_hex(b: &[u8]) -> String {
1707    let mut out = String::with_capacity(2 + 2 * b.len());
1708    out.push_str("\\x");
1709    const HEX: &[u8; 16] = b"0123456789abcdef";
1710    for byte in b {
1711        out.push(HEX[(byte >> 4) as usize] as char);
1712        out.push(HEX[(byte & 0x0F) as usize] as char);
1713    }
1714    out
1715}
1716
1717/// Render a `Numeric { scaled, scale }` as its decimal text form.
1718/// Negative `scaled` prepends `-` to the absolute value's digits; the
1719/// integer / fractional split is by character count, padding the
1720/// fractional side with leading zeros to exactly `scale` chars.
1721pub fn format_numeric(scaled: i128, scale: u8) -> String {
1722    if scale == 0 {
1723        return format!("{scaled}");
1724    }
1725    let negative = scaled < 0;
1726    let mag_str = scaled.unsigned_abs().to_string();
1727    let mag_bytes = mag_str.as_bytes();
1728    let scale_u = scale as usize;
1729    let mut out = String::with_capacity(mag_str.len() + 3);
1730    if negative {
1731        out.push('-');
1732    }
1733    if mag_bytes.len() <= scale_u {
1734        out.push('0');
1735        out.push('.');
1736        for _ in mag_bytes.len()..scale_u {
1737            out.push('0');
1738        }
1739        out.push_str(&mag_str);
1740    } else {
1741        let split = mag_bytes.len() - scale_u;
1742        out.push_str(&mag_str[..split]);
1743        out.push('.');
1744        out.push_str(&mag_str[split..]);
1745    }
1746    out
1747}
1748
1749fn cast_numeric_to_int(v: Value) -> Result<Value, EvalError> {
1750    match v {
1751        Value::Int(n) => Ok(Value::Int(n)),
1752        Value::BigInt(n) => i32::try_from(n)
1753            .map(Value::Int)
1754            .map_err(|_| EvalError::TypeMismatch {
1755                detail: format!("bigint {n} does not fit in int"),
1756            }),
1757        #[allow(clippy::cast_possible_truncation)]
1758        Value::Float(x) => Ok(Value::Int(x as i32)),
1759        Value::Text(s) => {
1760            s.trim()
1761                .parse::<i32>()
1762                .map(Value::Int)
1763                .map_err(|_| EvalError::TypeMismatch {
1764                    detail: format!("cannot parse {s:?} as int"),
1765                })
1766        }
1767        Value::Bool(b) => Ok(Value::Int(i32::from(b))),
1768        other => Err(EvalError::TypeMismatch {
1769            detail: format!("cannot cast {:?} to int", other.data_type()),
1770        }),
1771    }
1772}
1773
1774fn cast_numeric_to_bigint(v: Value) -> Result<Value, EvalError> {
1775    match v {
1776        Value::Int(n) => Ok(Value::BigInt(i64::from(n))),
1777        Value::BigInt(n) => Ok(Value::BigInt(n)),
1778        #[allow(clippy::cast_possible_truncation)]
1779        Value::Float(x) => Ok(Value::BigInt(x as i64)),
1780        Value::Text(s) => {
1781            s.trim()
1782                .parse::<i64>()
1783                .map(Value::BigInt)
1784                .map_err(|_| EvalError::TypeMismatch {
1785                    detail: format!("cannot parse {s:?} as bigint"),
1786                })
1787        }
1788        Value::Bool(b) => Ok(Value::BigInt(i64::from(b))),
1789        other => Err(EvalError::TypeMismatch {
1790            detail: format!("cannot cast {:?} to bigint", other.data_type()),
1791        }),
1792    }
1793}
1794
1795fn cast_numeric_to_float(v: Value) -> Result<Value, EvalError> {
1796    match v {
1797        Value::Int(n) => Ok(Value::Float(f64::from(n))),
1798        #[allow(clippy::cast_precision_loss)]
1799        Value::BigInt(n) => Ok(Value::Float(n as f64)),
1800        Value::Float(x) => Ok(Value::Float(x)),
1801        Value::Text(s) => {
1802            s.trim()
1803                .parse::<f64>()
1804                .map(Value::Float)
1805                .map_err(|_| EvalError::TypeMismatch {
1806                    detail: format!("cannot parse {s:?} as float"),
1807                })
1808        }
1809        other => Err(EvalError::TypeMismatch {
1810            detail: format!("cannot cast {:?} to float", other.data_type()),
1811        }),
1812    }
1813}
1814
1815fn cast_to_bool(v: Value) -> Result<Value, EvalError> {
1816    match v {
1817        Value::Bool(b) => Ok(Value::Bool(b)),
1818        Value::Int(n) => Ok(Value::Bool(n != 0)),
1819        Value::BigInt(n) => Ok(Value::Bool(n != 0)),
1820        Value::Text(s) => {
1821            let lo = s.trim().to_ascii_lowercase();
1822            match lo.as_str() {
1823                "true" | "t" | "yes" | "y" | "1" | "on" => Ok(Value::Bool(true)),
1824                "false" | "f" | "no" | "n" | "0" | "off" => Ok(Value::Bool(false)),
1825                _ => Err(EvalError::TypeMismatch {
1826                    detail: format!("cannot parse {s:?} as bool"),
1827                }),
1828            }
1829        }
1830        other => Err(EvalError::TypeMismatch {
1831            detail: format!("cannot cast {:?} to bool", other.data_type()),
1832        }),
1833    }
1834}
1835
1836/// Parse a `Value::Text("[1.0, 2.0, 3.0]")` into a `Value::Vector(..)`. Mirrors
1837/// pgvector's `'[..]'::vector` cast. NULL casts as NULL.
1838pub fn cast_to_vector(v: Value) -> Result<Value, EvalError> {
1839    match v {
1840        Value::Null => Ok(Value::Null),
1841        Value::Vector(v) => Ok(Value::Vector(v)),
1842        Value::Text(s) => parse_vector_text(&s)
1843            .map(Value::Vector)
1844            .ok_or(EvalError::TypeMismatch {
1845                detail: format!("cannot parse {s:?} as a vector literal"),
1846            }),
1847        other => Err(EvalError::TypeMismatch {
1848            detail: format!("::vector requires text input, got {:?}", other.data_type()),
1849        }),
1850    }
1851}
1852
1853/// Parse `"[1.0, 2.0, -3]"` into `Vec<f32>`. Returns `None` on malformed input.
1854fn parse_vector_text(s: &str) -> Option<Vec<f32>> {
1855    let trimmed = s.trim();
1856    let inner = trimmed.strip_prefix('[')?.strip_suffix(']')?;
1857    let trimmed_inner = inner.trim();
1858    if trimmed_inner.is_empty() {
1859        return Some(Vec::new());
1860    }
1861    let mut out = Vec::new();
1862    for part in trimmed_inner.split(',') {
1863        let f: f32 = part.trim().parse().ok()?;
1864        out.push(f);
1865    }
1866    Some(out)
1867}
1868
1869fn literal_to_value(l: &Literal) -> Value {
1870    match l {
1871        Literal::Integer(n) => {
1872            if let Ok(small) = i32::try_from(*n) {
1873                Value::Int(small)
1874            } else {
1875                Value::BigInt(*n)
1876            }
1877        }
1878        Literal::Float(x) => Value::Float(*x),
1879        Literal::String(s) => Value::Text(s.clone()),
1880        Literal::Vector(v) => Value::Vector(v.clone()),
1881        Literal::Bool(b) => Value::Bool(*b),
1882        Literal::Null => Value::Null,
1883        Literal::Interval { months, micros, .. } => Value::Interval {
1884            months: *months,
1885            micros: *micros,
1886        },
1887    }
1888}
1889
1890fn resolve_column(c: &ColumnName, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1891    if let Some(q) = &c.qualifier {
1892        // Multi-table evaluation (joins): the synthesised schema uses
1893        // composite column names "alias.column" so we look that up
1894        // directly. Falls back to the single-table case below if the
1895        // composite isn't present.
1896        let composite = alloc::format!("{q}.{name}", name = c.name);
1897        if let Some(pos) = ctx.columns.iter().position(|s| s.name == composite) {
1898            return Ok(row.values[pos].clone());
1899        }
1900        let expected = ctx.table_alias.ok_or_else(|| EvalError::UnknownQualifier {
1901            qualifier: q.clone(),
1902        })?;
1903        if q != expected {
1904            return Err(EvalError::UnknownQualifier {
1905                qualifier: q.clone(),
1906            });
1907        }
1908    }
1909    if let Some(pos) = ctx.columns.iter().position(|s| s.name == c.name) {
1910        return Ok(row.values[pos].clone());
1911    }
1912    // Bare-name fallback for joined schemas: match any single composite
1913    // column ending in ".<name>"; ambiguity is an error.
1914    let suffix = alloc::format!(".{name}", name = c.name);
1915    let mut matches = ctx
1916        .columns
1917        .iter()
1918        .enumerate()
1919        .filter(|(_, s)| s.name.ends_with(&suffix));
1920    let first = matches.next();
1921    let extra = matches.next();
1922    match (first, extra) {
1923        (Some((pos, _)), None) => Ok(row.values[pos].clone()),
1924        (Some(_), Some(_)) => Err(EvalError::TypeMismatch {
1925            detail: alloc::format!("ambiguous column reference: {}", c.name),
1926        }),
1927        _ => Err(EvalError::ColumnNotFound {
1928            name: c.name.clone(),
1929        }),
1930    }
1931}
1932
1933fn apply_unary(op: UnOp, v: Value) -> Result<Value, EvalError> {
1934    match (op, v) {
1935        (_, Value::Null) => Ok(Value::Null),
1936        (UnOp::Neg, Value::Int(n)) => {
1937            n.checked_neg()
1938                .map(Value::Int)
1939                .ok_or(EvalError::TypeMismatch {
1940                    detail: "integer overflow on unary -".into(),
1941                })
1942        }
1943        (UnOp::Neg, Value::BigInt(n)) => {
1944            n.checked_neg()
1945                .map(Value::BigInt)
1946                .ok_or(EvalError::TypeMismatch {
1947                    detail: "bigint overflow on unary -".into(),
1948                })
1949        }
1950        (UnOp::Neg, Value::Float(x)) => Ok(Value::Float(-x)),
1951        (UnOp::Neg, other) => Err(EvalError::TypeMismatch {
1952            detail: format!("unary - applied to {:?}", other.data_type()),
1953        }),
1954        (UnOp::Not, Value::Bool(b)) => Ok(Value::Bool(!b)),
1955        (UnOp::Not, other) => Err(EvalError::TypeMismatch {
1956            detail: format!("NOT applied to {:?}", other.data_type()),
1957        }),
1958    }
1959}
1960
1961/// v7.9.27b — true when two values are "not distinct" per PG:
1962/// both NULL counts as equal; otherwise reduces to regular Eq.
1963fn values_not_distinct(l: &Value, r: &Value) -> bool {
1964    match (l, r) {
1965        (Value::Null, Value::Null) => true,
1966        (Value::Null, _) | (_, Value::Null) => false,
1967        _ => l == r,
1968    }
1969}
1970
1971fn apply_binary(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
1972    // SQL three-valued logic for AND / OR with NULL is special — handle before
1973    // the general NULL-propagation rule.
1974    if let BinOp::And = op {
1975        return and_3vl(l, r);
1976    }
1977    if let BinOp::Or = op {
1978        return or_3vl(l, r);
1979    }
1980    // v7.9.27b — IS [NOT] DISTINCT FROM. NULL-safe equality:
1981    // `NULL IS NOT DISTINCT FROM NULL` → true. mailrs pg_dump.
1982    if let BinOp::IsNotDistinctFrom = op {
1983        return Ok(Value::Bool(values_not_distinct(&l, &r)));
1984    }
1985    if let BinOp::IsDistinctFrom = op {
1986        return Ok(Value::Bool(!values_not_distinct(&l, &r)));
1987    }
1988    // Everything else: any NULL operand → NULL.
1989    if l.is_null() || r.is_null() {
1990        return Ok(Value::Null);
1991    }
1992    // NUMERIC arithmetic and comparisons run in fixed-point; promote
1993    // integers to a common NUMERIC scale and stay in i128 throughout.
1994    if matches!(l, Value::Numeric { .. }) || matches!(r, Value::Numeric { .. }) {
1995        return apply_binary_numeric(op, l, r);
1996    }
1997    // Date / Timestamp arithmetic. PG semantics:
1998    //   * date + int      → date  (int is days)
1999    //   * int + date      → date
2000    //   * date - int      → date
2001    //   * date - date     → int   (days, signed)
2002    //   * timestamp - timestamp → bigint (microseconds, signed)
2003    // Other date/time math (`timestamp + int`, INTERVAL) lands later.
2004    if let Some(result) = apply_binary_calendar(op, &l, &r)? {
2005        return Ok(result);
2006    }
2007    match op {
2008        BinOp::Add => arith(l, r, i64::checked_add, |a, b| a + b, "+"),
2009        BinOp::Sub => arith(l, r, i64::checked_sub, |a, b| a - b, "-"),
2010        BinOp::Mul => arith(l, r, i64::checked_mul, |a, b| a * b, "*"),
2011        BinOp::Div => div_op(l, r),
2012        BinOp::L2Distance => l2_distance(l, r),
2013        BinOp::InnerProduct => inner_product(l, r),
2014        BinOp::CosineDistance => cosine_distance(l, r),
2015        BinOp::Concat => Ok(text_concat(&l, &r)),
2016        BinOp::JsonGet => crate::json::path_get(&l, &r, false),
2017        BinOp::JsonGetText => crate::json::path_get(&l, &r, true),
2018        BinOp::JsonGetPath => crate::json::path_walk(&l, &r, false),
2019        BinOp::JsonGetPathText => crate::json::path_walk(&l, &r, true),
2020        BinOp::JsonContains => crate::json::contains(&l, &r),
2021        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
2022            compare(op, &l, &r)
2023        }
2024        BinOp::And
2025        | BinOp::Or
2026        | BinOp::IsDistinctFrom
2027        | BinOp::IsNotDistinctFrom => unreachable!("handled above"),
2028    }
2029}
2030
2031/// Calendar arithmetic. Returns `Some(value)` when the operand pair
2032/// is a date/time combo this function understands, `None` to let the
2033/// caller fall through to the regular numeric / text paths.
2034fn apply_binary_calendar(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
2035    let int_value = |v: &Value| -> Option<i64> {
2036        match v {
2037            Value::SmallInt(n) => Some(i64::from(*n)),
2038            Value::Int(n) => Some(i64::from(*n)),
2039            Value::BigInt(n) => Some(*n),
2040            _ => None,
2041        }
2042    };
2043    // Most-specific cases first — DATE-DATE / TS-TS subtraction before
2044    // DATE-integer subtraction, otherwise the latter swallows the
2045    // former with an `int_value(Date) = None` no-op fall-through.
2046    match (l, r) {
2047        (Value::Date(a), Value::Date(b)) if op == BinOp::Sub => {
2048            return Ok(Some(Value::BigInt(i64::from(*a) - i64::from(*b))));
2049        }
2050        (Value::Timestamp(a), Value::Timestamp(b)) if op == BinOp::Sub => {
2051            let delta = a.checked_sub(*b).ok_or(EvalError::TypeMismatch {
2052                detail: "TIMESTAMP - TIMESTAMP overflows i64 microseconds".into(),
2053            })?;
2054            return Ok(Some(Value::BigInt(delta)));
2055        }
2056        _ => {}
2057    }
2058    // INTERVAL arithmetic. PG: timestamp ± interval → timestamp,
2059    // date ± interval → date (if interval is pure days/months with no
2060    // sub-day component) else timestamp, interval ± interval → interval.
2061    if let Some(out) = apply_binary_interval(op, l, r)? {
2062        return Ok(Some(out));
2063    }
2064    match (l, r) {
2065        (Value::Date(d), other) if op == BinOp::Add => {
2066            if let Some(n) = int_value(other) {
2067                let days = i64::from(*d).saturating_add(n);
2068                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
2069                    detail: "DATE + integer overflows DATE range".into(),
2070                })?;
2071                return Ok(Some(Value::Date(days32)));
2072            }
2073        }
2074        (other, Value::Date(d)) if op == BinOp::Add => {
2075            if let Some(n) = int_value(other) {
2076                let days = i64::from(*d).saturating_add(n);
2077                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
2078                    detail: "integer + DATE overflows DATE range".into(),
2079                })?;
2080                return Ok(Some(Value::Date(days32)));
2081            }
2082        }
2083        (Value::Date(d), other) if op == BinOp::Sub => {
2084            if let Some(n) = int_value(other) {
2085                let days = i64::from(*d).saturating_sub(n);
2086                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
2087                    detail: "DATE - integer overflows DATE range".into(),
2088                })?;
2089                return Ok(Some(Value::Date(days32)));
2090            }
2091        }
2092        _ => {}
2093    }
2094    Ok(None)
2095}
2096
2097/// INTERVAL-aware binary ops. Recognises:
2098///   timestamp ± interval → timestamp
2099///   date ± interval      → date (if interval is integral days/months only)
2100///                       → timestamp (if interval has sub-day micros)
2101///   interval ± interval  → interval
2102/// Commutative for `+`. Returns `None` for unrecognised operand pairs so
2103/// the caller can fall through.
2104fn apply_binary_interval(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
2105    // Normalise so the interval (if any) is always on the right for Add;
2106    // Sub stays left-handed because it isn't commutative.
2107    let (lhs, rhs, sign): (&Value, &Value, i64) = match (l, r, op) {
2108        (Value::Interval { .. }, _, BinOp::Add) => (r, l, 1),
2109        (_, Value::Interval { .. }, BinOp::Add) => (l, r, 1),
2110        (_, Value::Interval { .. }, BinOp::Sub) => (l, r, -1),
2111        _ => return Ok(None),
2112    };
2113    let Value::Interval {
2114        months: rhs_months,
2115        micros: rhs_us,
2116    } = rhs
2117    else {
2118        unreachable!("rhs guaranteed to be Interval by the match above");
2119    };
2120    let signed_months = i64::from(*rhs_months) * sign;
2121    let signed_micros = rhs_us.checked_mul(sign).ok_or(EvalError::TypeMismatch {
2122        detail: "INTERVAL micros overflows on negation".into(),
2123    })?;
2124    match lhs {
2125        Value::Timestamp(t) => Ok(Some(Value::Timestamp(add_interval_to_micros(
2126            *t,
2127            signed_months,
2128            signed_micros,
2129        )?))),
2130        Value::Date(d) => {
2131            // Date + interval stays a date when the interval has zero
2132            // sub-day microseconds; otherwise promote to TIMESTAMP at
2133            // midnight of the (months-shifted) date first.
2134            let day_aligned = signed_micros.rem_euclid(86_400_000_000) == 0;
2135            if day_aligned {
2136                let micros_per_day = 86_400_000_000_i64;
2137                let days_delta = signed_micros / micros_per_day;
2138                let shifted = shift_date_by_months(*d, signed_months)?;
2139                let new_days =
2140                    i64::from(shifted)
2141                        .checked_add(days_delta)
2142                        .ok_or(EvalError::TypeMismatch {
2143                            detail: "DATE ± INTERVAL overflows DATE range".into(),
2144                        })?;
2145                let days32 = i32::try_from(new_days).map_err(|_| EvalError::TypeMismatch {
2146                    detail: "DATE ± INTERVAL overflows DATE range".into(),
2147                })?;
2148                Ok(Some(Value::Date(days32)))
2149            } else {
2150                let base =
2151                    i64::from(*d)
2152                        .checked_mul(86_400_000_000)
2153                        .ok_or(EvalError::TypeMismatch {
2154                            detail: "DATE → TIMESTAMP lift overflows for INTERVAL math".into(),
2155                        })?;
2156                Ok(Some(Value::Timestamp(add_interval_to_micros(
2157                    base,
2158                    signed_months,
2159                    signed_micros,
2160                )?)))
2161            }
2162        }
2163        Value::Interval {
2164            months: lhs_months,
2165            micros: lhs_us,
2166        } => {
2167            let new_months = i64::from(*lhs_months)
2168                .checked_add(signed_months)
2169                .and_then(|n| i32::try_from(n).ok())
2170                .ok_or(EvalError::TypeMismatch {
2171                    detail: "INTERVAL ± INTERVAL months overflows i32".into(),
2172                })?;
2173            let new_micros = lhs_us
2174                .checked_add(signed_micros)
2175                .ok_or(EvalError::TypeMismatch {
2176                    detail: "INTERVAL ± INTERVAL micros overflows i64".into(),
2177                })?;
2178            Ok(Some(Value::Interval {
2179                months: new_months,
2180                micros: new_micros,
2181            }))
2182        }
2183        _ => Err(EvalError::TypeMismatch {
2184            detail: format!(
2185                "operator {op:?} not defined for {:?} and INTERVAL",
2186                lhs.data_type()
2187            ),
2188        }),
2189    }
2190}
2191
2192/// Shift a `Date` by a signed number of months using the PG clamp rule.
2193fn shift_date_by_months(d: i32, months: i64) -> Result<i32, EvalError> {
2194    let (y, m, day) = civil_from_days(d);
2195    let months_i32 = i32::try_from(months).map_err(|_| EvalError::TypeMismatch {
2196        detail: "INTERVAL months delta out of i32 range".into(),
2197    })?;
2198    let (ny, nm, nd) = add_months_to_civil(y, m, day, months_i32);
2199    Ok(days_from_civil(ny, nm, nd))
2200}
2201
2202/// Add (months, micros) to a `Timestamp` (microseconds since epoch).
2203/// Months part is applied through civil calendar with clamp-to-last-day;
2204/// micros part is plain i64 addition with overflow guard.
2205fn add_interval_to_micros(t: i64, months: i64, micros: i64) -> Result<i64, EvalError> {
2206    let mut out = t;
2207    if months != 0 {
2208        const MICROS_PER_DAY: i64 = 86_400_000_000;
2209        let days = out.div_euclid(MICROS_PER_DAY);
2210        let day_micros = out.rem_euclid(MICROS_PER_DAY);
2211        let day_i32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
2212            detail: "TIMESTAMP day component out of i32 range for INTERVAL months math".into(),
2213        })?;
2214        let shifted_days = shift_date_by_months(day_i32, months)?;
2215        out = i64::from(shifted_days)
2216            .checked_mul(MICROS_PER_DAY)
2217            .and_then(|n| n.checked_add(day_micros))
2218            .ok_or(EvalError::TypeMismatch {
2219                detail: "TIMESTAMP ± INTERVAL months overflows i64 microseconds".into(),
2220            })?;
2221    }
2222    out.checked_add(micros).ok_or(EvalError::TypeMismatch {
2223        detail: "TIMESTAMP ± INTERVAL micros overflows i64".into(),
2224    })
2225}
2226
2227/// Dispatch for any binary op when at least one operand is NUMERIC.
2228/// Other-side integers / floats are promoted to a NUMERIC at a common
2229/// scale; all add / sub / mul / div / compare paths stay in i128.
2230#[allow(clippy::needless_pass_by_value)] // mirrors `apply_binary`'s by-value calling convention
2231fn apply_binary_numeric(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
2232    // Float still wins — Numeric + Float coerces both to f64 and runs
2233    // through the float path. PG demotes Numeric to float in this mix
2234    // too (the documented behaviour for `numeric + double precision`).
2235    let float_path = matches!(l, Value::Float(_)) || matches!(r, Value::Float(_));
2236    if float_path {
2237        let af = as_f64(&l)?;
2238        let bf = as_f64(&r)?;
2239        return match op {
2240            BinOp::Add => Ok(Value::Float(af + bf)),
2241            BinOp::Sub => Ok(Value::Float(af - bf)),
2242            BinOp::Mul => Ok(Value::Float(af * bf)),
2243            BinOp::Div => {
2244                if bf == 0.0 {
2245                    Err(EvalError::DivisionByZero)
2246                } else {
2247                    Ok(Value::Float(af / bf))
2248                }
2249            }
2250            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
2251                let ord = af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
2252                    detail: "NaN in NUMERIC/Float comparison".into(),
2253                })?;
2254                Ok(Value::Bool(cmp_to_bool(op, ord)))
2255            }
2256            BinOp::Concat => Ok(text_concat(&l, &r)),
2257            other => Err(EvalError::TypeMismatch {
2258                detail: format!("operator {other:?} not defined for NUMERIC and Float"),
2259            }),
2260        };
2261    }
2262    // Promote integer ↔ numeric to a shared scale (max of both sides).
2263    let (a, sa) = numeric_or_widen(&l).ok_or_else(|| EvalError::TypeMismatch {
2264        detail: format!("NUMERIC op against non-numeric {:?}", l.data_type()),
2265    })?;
2266    let (b, sb) = numeric_or_widen(&r).ok_or_else(|| EvalError::TypeMismatch {
2267        detail: format!("NUMERIC op against non-numeric {:?}", r.data_type()),
2268    })?;
2269    match op {
2270        BinOp::Add | BinOp::Sub => {
2271            let target_scale = sa.max(sb);
2272            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
2273                detail: "NUMERIC overflow on rescale".into(),
2274            })?;
2275            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
2276                detail: "NUMERIC overflow on rescale".into(),
2277            })?;
2278            let r = match op {
2279                BinOp::Add => lhs.checked_add(rhs),
2280                BinOp::Sub => lhs.checked_sub(rhs),
2281                _ => unreachable!(),
2282            }
2283            .ok_or(EvalError::TypeMismatch {
2284                detail: "NUMERIC overflow on +/-".into(),
2285            })?;
2286            Ok(Value::Numeric {
2287                scaled: r,
2288                scale: target_scale,
2289            })
2290        }
2291        BinOp::Mul => {
2292            let scaled = a.checked_mul(b).ok_or(EvalError::TypeMismatch {
2293                detail: "NUMERIC overflow on *".into(),
2294            })?;
2295            Ok(Value::Numeric {
2296                scaled,
2297                scale: sa.saturating_add(sb),
2298            })
2299        }
2300        BinOp::Div => {
2301            if b == 0 {
2302                return Err(EvalError::DivisionByZero);
2303            }
2304            // Result scale: keep the wider operand's scale. Pre-scale
2305            // the numerator so the integer division retains that many
2306            // fractional digits. Round half-away-from-zero.
2307            let target_scale = sa.max(sb);
2308            // Numerator effective scale becomes sa + target_scale; we
2309            // bring it up to (target_scale + sb) so the divisor's scale
2310            // cancels cleanly.
2311            let bump = pow10_i128(target_scale.saturating_add(sb).saturating_sub(sa));
2312            let num = a.checked_mul(bump).ok_or(EvalError::TypeMismatch {
2313                detail: "NUMERIC overflow on / scaling".into(),
2314            })?;
2315            let half = if b >= 0 { b / 2 } else { -(b / 2) };
2316            let adj = if (num >= 0) == (b >= 0) {
2317                num + half
2318            } else {
2319                num - half
2320            };
2321            Ok(Value::Numeric {
2322                scaled: adj / b,
2323                scale: target_scale,
2324            })
2325        }
2326        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
2327            let target_scale = sa.max(sb);
2328            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
2329                detail: "NUMERIC overflow on rescale".into(),
2330            })?;
2331            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
2332                detail: "NUMERIC overflow on rescale".into(),
2333            })?;
2334            Ok(Value::Bool(cmp_to_bool(op, lhs.cmp(&rhs))))
2335        }
2336        BinOp::Concat => Ok(text_concat(&l, &r)),
2337        other => Err(EvalError::TypeMismatch {
2338            detail: format!("operator {other:?} not defined for NUMERIC"),
2339        }),
2340    }
2341}
2342
2343/// Express `v` as a `(scaled_i128, scale)` pair. Plain integers come
2344/// back with `scale=0`; NUMERIC keeps its own scale. Anything else
2345/// returns `None` and the caller raises a type error.
2346fn numeric_or_widen(v: &Value) -> Option<(i128, u8)> {
2347    match v {
2348        Value::Numeric { scaled, scale } => Some((*scaled, *scale)),
2349        Value::Int(n) => Some((i128::from(*n), 0)),
2350        Value::SmallInt(n) => Some((i128::from(*n), 0)),
2351        Value::BigInt(n) => Some((i128::from(*n), 0)),
2352        _ => None,
2353    }
2354}
2355
2356fn rescale(scaled: i128, src: u8, dst: u8) -> Option<i128> {
2357    if src == dst {
2358        return Some(scaled);
2359    }
2360    if dst > src {
2361        scaled.checked_mul(pow10_i128(dst - src))
2362    } else {
2363        let drop = pow10_i128(src - dst);
2364        let half = drop / 2;
2365        let r = if scaled >= 0 {
2366            scaled + half
2367        } else {
2368            scaled - half
2369        };
2370        Some(r / drop)
2371    }
2372}
2373
2374const fn pow10_i128(p: u8) -> i128 {
2375    let mut acc: i128 = 1;
2376    let mut i = 0;
2377    while i < p {
2378        acc *= 10;
2379        i += 1;
2380    }
2381    acc
2382}
2383
2384const fn cmp_to_bool(op: BinOp, ord: core::cmp::Ordering) -> bool {
2385    use core::cmp::Ordering::{Equal, Greater, Less};
2386    match op {
2387        BinOp::Eq => matches!(ord, Equal),
2388        BinOp::NotEq => !matches!(ord, Equal),
2389        BinOp::Lt => matches!(ord, Less),
2390        BinOp::LtEq => matches!(ord, Less | Equal),
2391        BinOp::Gt => matches!(ord, Greater),
2392        BinOp::GtEq => matches!(ord, Greater | Equal),
2393        _ => false,
2394    }
2395}
2396
2397/// SQL `||` string concatenation. Operands are coerced to text via the same
2398/// rule as `::text` cast. NULL propagates (handled above; this function only
2399/// runs with non-NULL operands).
2400fn text_concat(l: &Value, r: &Value) -> Value {
2401    // v7.11.8 — PG `||` overloads: TEXT[] || TEXT[] = concatenated array;
2402    // TEXT[] || TEXT (or TEXT || TEXT[]) prepends/appends the single
2403    // element. NULL || anything = NULL (PG semantics for arrays;
2404    // text concat treats NULL the same way after value_to_text).
2405    match (l, r) {
2406        (Value::Null, _) | (_, Value::Null) => {
2407            // PG text concat: NULL || x = NULL. Array concat: NULL || x = NULL.
2408            // Keep the legacy text path (value_to_text handles Null as ""),
2409            // but for arrays we surface real NULL to match PG.
2410            if matches!(l, Value::TextArray(_)) || matches!(r, Value::TextArray(_)) {
2411                return Value::Null;
2412            }
2413        }
2414        (Value::TextArray(a), Value::TextArray(b)) => {
2415            let mut out = a.clone();
2416            out.extend(b.iter().cloned());
2417            return Value::TextArray(out);
2418        }
2419        (Value::TextArray(a), Value::Text(s)) => {
2420            let mut out = a.clone();
2421            out.push(Some(s.clone()));
2422            return Value::TextArray(out);
2423        }
2424        (Value::Text(s), Value::TextArray(b)) => {
2425            let mut out: alloc::vec::Vec<Option<alloc::string::String>> =
2426                alloc::vec::Vec::with_capacity(1 + b.len());
2427            out.push(Some(s.clone()));
2428            out.extend(b.iter().cloned());
2429            return Value::TextArray(out);
2430        }
2431        _ => {}
2432    }
2433    let a = value_to_text(l);
2434    let b = value_to_text(r);
2435    Value::Text(a + &b)
2436}
2437
2438/// pgvector inner-product `<#>`. Returns the *negative* dot product so
2439/// smaller still means more similar — same convention as pgvector.
2440fn inner_product(l: Value, r: Value) -> Result<Value, EvalError> {
2441    let (a, b) = unwrap_vec_pair(l, r, "<#>")?;
2442    let mut dot: f64 = 0.0;
2443    for (x, y) in a.iter().zip(b.iter()) {
2444        dot += f64::from(*x) * f64::from(*y);
2445    }
2446    Ok(Value::Float(-dot))
2447}
2448
2449/// pgvector cosine distance `<=>` — `1 - (a·b) / (‖a‖ ‖b‖)`. A zero-norm
2450/// operand produces NaN (matches pgvector).
2451fn cosine_distance(l: Value, r: Value) -> Result<Value, EvalError> {
2452    let (a, b) = unwrap_vec_pair(l, r, "<=>")?;
2453    let mut dot: f64 = 0.0;
2454    let mut na: f64 = 0.0;
2455    let mut nb: f64 = 0.0;
2456    for (x, y) in a.iter().zip(b.iter()) {
2457        let xf = f64::from(*x);
2458        let yf = f64::from(*y);
2459        dot += xf * yf;
2460        na += xf * xf;
2461        nb += yf * yf;
2462    }
2463    let denom = sqrt_newton(na) * sqrt_newton(nb);
2464    if denom == 0.0 {
2465        return Ok(Value::Float(f64::NAN));
2466    }
2467    Ok(Value::Float(1.0 - dot / denom))
2468}
2469
2470fn unwrap_vec_pair(l: Value, r: Value, op: &str) -> Result<(Vec<f32>, Vec<f32>), EvalError> {
2471    // v6.0.1: SQ8 cells coming through the SQL evaluator are
2472    // dequantised to f32 here so the existing scalar distance
2473    // arithmetic stays intact. HNSW kNN search continues to use
2474    // the asymmetric ADC variant inside `cell_to_query_metric_
2475    // distance` — this path only runs when a vector expression
2476    // lands in the evaluator (full-scan ORDER BY, SELECT
2477    // projection of `v <-> $1`, etc.).
2478    let to_f32 = |v: Value| -> Option<Vec<f32>> {
2479        match v {
2480            Value::Vector(a) => Some(a),
2481            Value::Sq8Vector(q) => Some(spg_storage::quantize::dequantize(&q)),
2482            // v6.0.3: bit-exact dequant for halfvec cells.
2483            Value::HalfVector(h) => Some(h.to_f32_vec()),
2484            _ => None,
2485        }
2486    };
2487    let l_ty = l.data_type();
2488    let r_ty = r.data_type();
2489    match (to_f32(l), to_f32(r)) {
2490        (Some(a), Some(b)) => {
2491            if a.len() != b.len() {
2492                return Err(EvalError::TypeMismatch {
2493                    detail: format!("vector dim mismatch in {op}: {} vs {}", a.len(), b.len()),
2494                });
2495            }
2496            Ok((a, b))
2497        }
2498        _ => Err(EvalError::TypeMismatch {
2499            detail: format!("{op} requires two vectors, got {l_ty:?} and {r_ty:?}"),
2500        }),
2501    }
2502}
2503
2504/// Numeric arithmetic with widening.
2505/// - both `Int` → `Int` (with overflow check)
2506/// - `Int` op `BigInt` (either side) → `BigInt`
2507/// - any `Float` involved → `Float`
2508fn arith(
2509    l: Value,
2510    r: Value,
2511    int_op: impl Fn(i64, i64) -> Option<i64>,
2512    float_op: impl Fn(f64, f64) -> f64,
2513    op_name: &str,
2514) -> Result<Value, EvalError> {
2515    // Widen SmallInt to Int up front so the rest of the arithmetic
2516    // table only deals with Int / BigInt / Float pairs.
2517    let widen = |v: Value| -> Value {
2518        match v {
2519            Value::SmallInt(n) => Value::Int(i32::from(n)),
2520            other => other,
2521        }
2522    };
2523    let l = widen(l);
2524    let r = widen(r);
2525    match (l, r) {
2526        (Value::Int(a), Value::Int(b)) => {
2527            let result = int_op(i64::from(a), i64::from(b)).ok_or(EvalError::TypeMismatch {
2528                detail: format!("integer overflow on {op_name}"),
2529            })?;
2530            if let Ok(small) = i32::try_from(result) {
2531                Ok(Value::Int(small))
2532            } else {
2533                Ok(Value::BigInt(result))
2534            }
2535        }
2536        (Value::Int(a), Value::BigInt(b)) | (Value::BigInt(b), Value::Int(a)) => {
2537            let result = int_op(i64::from(a), b).ok_or(EvalError::TypeMismatch {
2538                detail: format!("bigint overflow on {op_name}"),
2539            })?;
2540            Ok(Value::BigInt(result))
2541        }
2542        (Value::BigInt(a), Value::BigInt(b)) => {
2543            let result = int_op(a, b).ok_or(EvalError::TypeMismatch {
2544                detail: format!("bigint overflow on {op_name}"),
2545            })?;
2546            Ok(Value::BigInt(result))
2547        }
2548        (a, b)
2549            if a.data_type() == Some(DataType::Float) || b.data_type() == Some(DataType::Float) =>
2550        {
2551            let af = as_f64(&a)?;
2552            let bf = as_f64(&b)?;
2553            Ok(Value::Float(float_op(af, bf)))
2554        }
2555        (a, b) => Err(EvalError::TypeMismatch {
2556            detail: format!(
2557                "{op_name} applied to non-numeric: {:?} vs {:?}",
2558                a.data_type(),
2559                b.data_type()
2560            ),
2561        }),
2562    }
2563}
2564
2565/// L2 (Euclidean) distance between two vectors of equal dimension.
2566/// Returned as `Value::Float(d)` so it composes with the existing
2567/// comparison / sort plumbing. Mismatched dims or non-vector operands
2568/// raise `TypeMismatch`.
2569#[allow(clippy::many_single_char_names)] // l, r, a, b, d are the natural names
2570fn l2_distance(l: Value, r: Value) -> Result<Value, EvalError> {
2571    // v6.0.1: route both operands through `unwrap_vec_pair` so SQ8
2572    // cells dequantise on the way in. Sub-f64 precision loss is
2573    // negligible vs the dequantisation noise the SQ8 path already
2574    // ships with.
2575    let (a, b) = unwrap_vec_pair(l, r, "<->")?;
2576    let mut sum: f64 = 0.0;
2577    for (x, y) in a.iter().zip(b.iter()) {
2578        let d = f64::from(*x) - f64::from(*y);
2579        sum += d * d;
2580    }
2581    Ok(Value::Float(sqrt_newton(sum)))
2582}
2583
2584/// Self-built `sqrt` for `f64` — `std::f64::sqrt` lives in `std`, which the
2585/// engine's `no_std` constraint disallows. Newton-Raphson with a few rounds
2586/// reaches IEEE-754 precision for the inputs we'll see (sum of squares of
2587/// f32-derived distances, always non-negative, never NaN).
2588fn sqrt_newton(x: f64) -> f64 {
2589    if x <= 0.0 {
2590        return 0.0;
2591    }
2592    let mut g = x;
2593    // 10 iterations is conservative; 6 already converges to ulp for typical
2594    // distances.
2595    for _ in 0..10 {
2596        g = 0.5 * (g + x / g);
2597    }
2598    g
2599}
2600
2601fn div_op(l: Value, r: Value) -> Result<Value, EvalError> {
2602    let any_float = matches!(l.data_type(), Some(DataType::Float))
2603        || matches!(r.data_type(), Some(DataType::Float));
2604    if any_float {
2605        let a = as_f64(&l)?;
2606        let b = as_f64(&r)?;
2607        if b == 0.0 {
2608            return Err(EvalError::DivisionByZero);
2609        }
2610        return Ok(Value::Float(a / b));
2611    }
2612    arith(
2613        l,
2614        r,
2615        |a, b| {
2616            if b == 0 { None } else { Some(a / b) }
2617        },
2618        |a, b| a / b,
2619        "/",
2620    )
2621    .map_err(|e| match e {
2622        // The closure returns None on b == 0; translate that into the dedicated
2623        // DivisionByZero variant instead of "integer overflow on /".
2624        EvalError::TypeMismatch { detail } if detail.contains('/') => EvalError::DivisionByZero,
2625        other => other,
2626    })
2627}
2628
2629fn as_f64(v: &Value) -> Result<f64, EvalError> {
2630    match v {
2631        Value::SmallInt(n) => Ok(f64::from(*n)),
2632        Value::Int(n) => Ok(f64::from(*n)),
2633        #[allow(clippy::cast_precision_loss)]
2634        Value::BigInt(n) => Ok(*n as f64),
2635        Value::Float(x) => Ok(*x),
2636        #[allow(clippy::cast_precision_loss)]
2637        Value::Numeric { scaled, scale } => {
2638            let mut div = 1.0_f64;
2639            for _ in 0..*scale {
2640                div *= 10.0;
2641            }
2642            Ok((*scaled as f64) / div)
2643        }
2644        other => Err(EvalError::TypeMismatch {
2645            detail: format!("cannot convert {:?} to FLOAT", other.data_type()),
2646        }),
2647    }
2648}
2649
2650fn compare(op: BinOp, l: &Value, r: &Value) -> Result<Value, EvalError> {
2651    let ord = match (l, r) {
2652        (Value::Int(a), Value::Int(b)) => i64::from(*a).cmp(&i64::from(*b)),
2653        (Value::Int(a), Value::BigInt(b)) => i64::from(*a).cmp(b),
2654        (Value::BigInt(a), Value::Int(b)) => a.cmp(&i64::from(*b)),
2655        (Value::BigInt(a), Value::BigInt(b)) => a.cmp(b),
2656        (a, b)
2657            if matches!(a.data_type(), Some(DataType::Float))
2658                || matches!(b.data_type(), Some(DataType::Float)) =>
2659        {
2660            let af = as_f64(a)?;
2661            let bf = as_f64(b)?;
2662            af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
2663                detail: "NaN in comparison".into(),
2664            })?
2665        }
2666        (Value::Text(a), Value::Text(b)) => a.cmp(b),
2667        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
2668        // Date / Timestamp compare on their integer storage repr.
2669        // Cross-domain (Date vs Timestamp) lifts the Date to the
2670        // matching midnight TIMESTAMP first.
2671        (Value::Date(a), Value::Date(b)) => a.cmp(b),
2672        (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
2673        (Value::Date(a), Value::Timestamp(b)) => (i64::from(*a) * 86_400_000_000).cmp(b),
2674        (Value::Timestamp(a), Value::Date(b)) => a.cmp(&(i64::from(*b) * 86_400_000_000)),
2675        // PG-style implicit coercion: comparing a DATE / TIMESTAMP
2676        // column against a text literal lifts the literal into the
2677        // matching domain (e.g. `day >= '2024-01-01'`).
2678        (Value::Date(a), Value::Text(b)) => {
2679            let bd = parse_date_literal(b).ok_or_else(|| EvalError::TypeMismatch {
2680                detail: format!("cannot parse {b:?} as DATE for comparison"),
2681            })?;
2682            a.cmp(&bd)
2683        }
2684        (Value::Text(a), Value::Date(b)) => {
2685            let ad = parse_date_literal(a).ok_or_else(|| EvalError::TypeMismatch {
2686                detail: format!("cannot parse {a:?} as DATE for comparison"),
2687            })?;
2688            ad.cmp(b)
2689        }
2690        (Value::Timestamp(a), Value::Text(b)) => {
2691            let bt = parse_timestamp_literal(b).ok_or_else(|| EvalError::TypeMismatch {
2692                detail: format!("cannot parse {b:?} as TIMESTAMP for comparison"),
2693            })?;
2694            a.cmp(&bt)
2695        }
2696        (Value::Text(a), Value::Timestamp(b)) => {
2697            let at = parse_timestamp_literal(a).ok_or_else(|| EvalError::TypeMismatch {
2698                detail: format!("cannot parse {a:?} as TIMESTAMP for comparison"),
2699            })?;
2700            at.cmp(b)
2701        }
2702        (a, b) => {
2703            return Err(EvalError::TypeMismatch {
2704                detail: format!(
2705                    "comparison between {:?} and {:?}",
2706                    a.data_type(),
2707                    b.data_type()
2708                ),
2709            });
2710        }
2711    };
2712    let result = match op {
2713        BinOp::Eq => ord.is_eq(),
2714        BinOp::NotEq => !ord.is_eq(),
2715        BinOp::Lt => ord.is_lt(),
2716        BinOp::LtEq => ord.is_le(),
2717        BinOp::Gt => ord.is_gt(),
2718        BinOp::GtEq => ord.is_ge(),
2719        BinOp::And
2720        | BinOp::Or
2721        | BinOp::Add
2722        | BinOp::Sub
2723        | BinOp::Mul
2724        | BinOp::Div
2725        | BinOp::L2Distance
2726        | BinOp::InnerProduct
2727        | BinOp::CosineDistance
2728        | BinOp::Concat
2729        | BinOp::JsonGet
2730        | BinOp::JsonGetText
2731        | BinOp::JsonGetPath
2732        | BinOp::JsonGetPathText
2733        | BinOp::JsonContains
2734        | BinOp::IsDistinctFrom
2735        | BinOp::IsNotDistinctFrom => {
2736            unreachable!("compare() only called with comparison ops")
2737        }
2738    };
2739    Ok(Value::Bool(result))
2740}
2741
2742// SQL three-valued AND / OR.
2743fn and_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
2744    match (l, r) {
2745        (Value::Bool(false), _) | (_, Value::Bool(false)) => Ok(Value::Bool(false)),
2746        (Value::Bool(true), Value::Bool(true)) => Ok(Value::Bool(true)),
2747        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
2748        (a, b) => Err(EvalError::TypeMismatch {
2749            detail: format!(
2750                "AND on non-boolean: {:?} and {:?}",
2751                a.data_type(),
2752                b.data_type()
2753            ),
2754        }),
2755    }
2756}
2757
2758fn or_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
2759    match (l, r) {
2760        (Value::Bool(true), _) | (_, Value::Bool(true)) => Ok(Value::Bool(true)),
2761        (Value::Bool(false), Value::Bool(false)) => Ok(Value::Bool(false)),
2762        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
2763        (a, b) => Err(EvalError::TypeMismatch {
2764            detail: format!(
2765                "OR on non-boolean: {:?} and {:?}",
2766                a.data_type(),
2767                b.data_type()
2768            ),
2769        }),
2770    }
2771}
2772
2773#[cfg(test)]
2774mod tests {
2775    use super::*;
2776    use alloc::vec;
2777    use spg_storage::{ColumnSchema, Row};
2778
2779    fn col(name: &str, ty: DataType) -> ColumnSchema {
2780        ColumnSchema::new(name, ty, true)
2781    }
2782
2783    fn ctx<'a>(cols: &'a [ColumnSchema], alias: Option<&'a str>) -> EvalContext<'a> {
2784        EvalContext::new(cols, alias)
2785    }
2786
2787    fn lit(n: i64) -> Expr {
2788        Expr::Literal(Literal::Integer(n))
2789    }
2790
2791    fn null() -> Expr {
2792        Expr::Literal(Literal::Null)
2793    }
2794
2795    fn col_ref(name: &str) -> Expr {
2796        Expr::Column(ColumnName {
2797            qualifier: None,
2798            name: name.into(),
2799        })
2800    }
2801
2802    #[test]
2803    fn literal_evaluates_to_value() {
2804        let r = Row::new(vec![]);
2805        let cs: [ColumnSchema; 0] = [];
2806        let c = ctx(&cs, None);
2807        assert_eq!(eval_expr(&lit(42), &r, &c).unwrap(), Value::Int(42));
2808        assert_eq!(
2809            eval_expr(&Expr::Literal(Literal::Float(1.5)), &r, &c).unwrap(),
2810            Value::Float(1.5)
2811        );
2812        assert_eq!(eval_expr(&null(), &r, &c).unwrap(), Value::Null);
2813    }
2814
2815    #[test]
2816    fn column_lookup_unqualified() {
2817        let cs = vec![col("a", DataType::Int), col("b", DataType::Text)];
2818        let r = Row::new(vec![Value::Int(7), Value::Text("hi".into())]);
2819        let c = ctx(&cs, None);
2820        assert_eq!(eval_expr(&col_ref("a"), &r, &c).unwrap(), Value::Int(7));
2821        assert_eq!(
2822            eval_expr(&col_ref("b"), &r, &c).unwrap(),
2823            Value::Text("hi".into())
2824        );
2825    }
2826
2827    #[test]
2828    fn column_not_found_errors() {
2829        let cs = vec![col("a", DataType::Int)];
2830        let r = Row::new(vec![Value::Int(0)]);
2831        let c = ctx(&cs, None);
2832        let err = eval_expr(&col_ref("ghost"), &r, &c).unwrap_err();
2833        assert!(matches!(err, EvalError::ColumnNotFound { ref name } if name == "ghost"));
2834    }
2835
2836    #[test]
2837    fn qualified_column_matches_alias() {
2838        let cs = vec![col("a", DataType::Int)];
2839        let r = Row::new(vec![Value::Int(5)]);
2840        let c = ctx(&cs, Some("u"));
2841        let qualified = Expr::Column(ColumnName {
2842            qualifier: Some("u".into()),
2843            name: "a".into(),
2844        });
2845        assert_eq!(eval_expr(&qualified, &r, &c).unwrap(), Value::Int(5));
2846    }
2847
2848    #[test]
2849    fn qualified_column_unknown_alias_errors() {
2850        let cs = vec![col("a", DataType::Int)];
2851        let r = Row::new(vec![Value::Int(5)]);
2852        let c = ctx(&cs, Some("u"));
2853        let wrong = Expr::Column(ColumnName {
2854            qualifier: Some("x".into()),
2855            name: "a".into(),
2856        });
2857        assert!(matches!(
2858            eval_expr(&wrong, &r, &c).unwrap_err(),
2859            EvalError::UnknownQualifier { .. }
2860        ));
2861    }
2862
2863    #[test]
2864    fn arithmetic_with_widening() {
2865        let r = Row::new(vec![]);
2866        let cs: [ColumnSchema; 0] = [];
2867        let c = ctx(&cs, None);
2868        let e = Expr::Binary {
2869            lhs: alloc::boxed::Box::new(lit(2)),
2870            op: BinOp::Add,
2871            rhs: alloc::boxed::Box::new(Expr::Literal(Literal::Float(0.5))),
2872        };
2873        assert_eq!(eval_expr(&e, &r, &c).unwrap(), Value::Float(2.5));
2874    }
2875
2876    #[test]
2877    fn division_by_zero_errors() {
2878        let r = Row::new(vec![]);
2879        let cs: [ColumnSchema; 0] = [];
2880        let c = ctx(&cs, None);
2881        let e = Expr::Binary {
2882            lhs: alloc::boxed::Box::new(lit(1)),
2883            op: BinOp::Div,
2884            rhs: alloc::boxed::Box::new(lit(0)),
2885        };
2886        assert_eq!(
2887            eval_expr(&e, &r, &c).unwrap_err(),
2888            EvalError::DivisionByZero
2889        );
2890    }
2891
2892    #[test]
2893    fn comparison_returns_bool() {
2894        let r = Row::new(vec![]);
2895        let cs: [ColumnSchema; 0] = [];
2896        let c = ctx(&cs, None);
2897        let e = Expr::Binary {
2898            lhs: alloc::boxed::Box::new(lit(1)),
2899            op: BinOp::Lt,
2900            rhs: alloc::boxed::Box::new(lit(2)),
2901        };
2902        assert_eq!(eval_expr(&e, &r, &c).unwrap(), Value::Bool(true));
2903    }
2904
2905    #[test]
2906    fn null_propagates_through_arithmetic() {
2907        let r = Row::new(vec![]);
2908        let cs: [ColumnSchema; 0] = [];
2909        let c = ctx(&cs, None);
2910        let e = Expr::Binary {
2911            lhs: alloc::boxed::Box::new(lit(1)),
2912            op: BinOp::Add,
2913            rhs: alloc::boxed::Box::new(null()),
2914        };
2915        assert_eq!(eval_expr(&e, &r, &c).unwrap(), Value::Null);
2916    }
2917
2918    #[test]
2919    fn and_three_valued_logic() {
2920        let r = Row::new(vec![]);
2921        let cs: [ColumnSchema; 0] = [];
2922        let c = ctx(&cs, None);
2923        let tt = |a: bool, b_null: bool| Expr::Binary {
2924            lhs: alloc::boxed::Box::new(Expr::Literal(Literal::Bool(a))),
2925            op: BinOp::And,
2926            rhs: alloc::boxed::Box::new(if b_null {
2927                null()
2928            } else {
2929                Expr::Literal(Literal::Bool(true))
2930            }),
2931        };
2932        // FALSE AND NULL → FALSE
2933        assert_eq!(
2934            eval_expr(&tt(false, true), &r, &c).unwrap(),
2935            Value::Bool(false)
2936        );
2937        // TRUE AND NULL → NULL
2938        assert_eq!(eval_expr(&tt(true, true), &r, &c).unwrap(), Value::Null);
2939        // TRUE AND TRUE → TRUE
2940        assert_eq!(
2941            eval_expr(&tt(true, false), &r, &c).unwrap(),
2942            Value::Bool(true)
2943        );
2944    }
2945
2946    #[test]
2947    fn or_three_valued_logic() {
2948        let r = Row::new(vec![]);
2949        let cs: [ColumnSchema; 0] = [];
2950        let c = ctx(&cs, None);
2951        let or_with_null = |a: bool| Expr::Binary {
2952            lhs: alloc::boxed::Box::new(Expr::Literal(Literal::Bool(a))),
2953            op: BinOp::Or,
2954            rhs: alloc::boxed::Box::new(null()),
2955        };
2956        // TRUE OR NULL → TRUE
2957        assert_eq!(
2958            eval_expr(&or_with_null(true), &r, &c).unwrap(),
2959            Value::Bool(true)
2960        );
2961        // FALSE OR NULL → NULL
2962        assert_eq!(
2963            eval_expr(&or_with_null(false), &r, &c).unwrap(),
2964            Value::Null
2965        );
2966    }
2967
2968    #[test]
2969    fn not_on_null_is_null() {
2970        let r = Row::new(vec![]);
2971        let cs: [ColumnSchema; 0] = [];
2972        let c = ctx(&cs, None);
2973        let e = Expr::Unary {
2974            op: UnOp::Not,
2975            expr: alloc::boxed::Box::new(null()),
2976        };
2977        assert_eq!(eval_expr(&e, &r, &c).unwrap(), Value::Null);
2978    }
2979
2980    #[test]
2981    fn text_comparison_lexicographic() {
2982        let r = Row::new(vec![]);
2983        let cs: [ColumnSchema; 0] = [];
2984        let c = ctx(&cs, None);
2985        let e = Expr::Binary {
2986            lhs: alloc::boxed::Box::new(Expr::Literal(Literal::String("apple".into()))),
2987            op: BinOp::Lt,
2988            rhs: alloc::boxed::Box::new(Expr::Literal(Literal::String("banana".into()))),
2989        };
2990        assert_eq!(eval_expr(&e, &r, &c).unwrap(), Value::Bool(true));
2991    }
2992
2993    #[test]
2994    fn interval_format_basics() {
2995        assert_eq!(format_interval(0, 0), "0");
2996        assert_eq!(format_interval(0, 86_400_000_000), "1 day");
2997        assert_eq!(format_interval(0, -86_400_000_000), "-1 days");
2998        assert_eq!(format_interval(0, 3_600_000_000), "01:00:00");
2999        assert_eq!(
3000            format_interval(0, 86_400_000_000 + 9_000_000),
3001            "1 day 00:00:09"
3002        );
3003        assert_eq!(format_interval(14, 0), "1 year 2 mons");
3004        assert_eq!(format_interval(-1, 0), "-1 mons");
3005    }
3006
3007    #[test]
3008    fn interval_add_to_timestamp_micros_part() {
3009        // 2024-01-01 00:00:00 + INTERVAL '1 hour' = 2024-01-01 01:00:00
3010        let ts = i64::from(days_from_civil(2024, 1, 1)) * 86_400_000_000;
3011        let r = add_interval_to_micros(ts, 0, 3_600_000_000).unwrap();
3012        let expected = ts + 3_600_000_000;
3013        assert_eq!(r, expected);
3014    }
3015
3016    #[test]
3017    fn interval_clamp_month_end() {
3018        // 2024-01-31 + 1 month = 2024-02-29 (leap year).
3019        let d = days_from_civil(2024, 1, 31);
3020        let shifted = shift_date_by_months(d, 1).unwrap();
3021        let (y, m, day) = civil_from_days(shifted);
3022        assert_eq!((y, m, day), (2024, 2, 29));
3023        // 2023-01-31 + 1 month = 2023-02-28 (non-leap).
3024        let d = days_from_civil(2023, 1, 31);
3025        let shifted = shift_date_by_months(d, 1).unwrap();
3026        let (y, m, day) = civil_from_days(shifted);
3027        assert_eq!((y, m, day), (2023, 2, 28));
3028        // 2024-03-31 - 1 month = 2024-02-29.
3029        let d = days_from_civil(2024, 3, 31);
3030        let shifted = shift_date_by_months(d, -1).unwrap();
3031        let (y, m, day) = civil_from_days(shifted);
3032        assert_eq!((y, m, day), (2024, 2, 29));
3033    }
3034
3035    #[test]
3036    fn interval_date_plus_pure_days_stays_date() {
3037        // DATE + INTERVAL '7 days' must stay DATE.
3038        let d = days_from_civil(2024, 6, 1);
3039        let lhs = Value::Date(d);
3040        let rhs = Value::Interval {
3041            months: 0,
3042            micros: 7 * 86_400_000_000,
3043        };
3044        let v = apply_binary_interval(BinOp::Add, &lhs, &rhs)
3045            .unwrap()
3046            .unwrap();
3047        let expected = days_from_civil(2024, 6, 8);
3048        assert_eq!(v, Value::Date(expected));
3049    }
3050
3051    #[test]
3052    fn interval_date_plus_sub_day_lifts_to_timestamp() {
3053        // DATE + INTERVAL '1 hour' must lift to TIMESTAMP.
3054        let d = days_from_civil(2024, 6, 1);
3055        let lhs = Value::Date(d);
3056        let rhs = Value::Interval {
3057            months: 0,
3058            micros: 3_600_000_000,
3059        };
3060        let v = apply_binary_interval(BinOp::Add, &lhs, &rhs)
3061            .unwrap()
3062            .unwrap();
3063        let expected = i64::from(d) * 86_400_000_000 + 3_600_000_000;
3064        assert_eq!(v, Value::Timestamp(expected));
3065    }
3066}