Skip to main content

spg_engine/
eval.rs

//! Expression evaluator. Given a parsed `Expr`, a `Row`, and the row's column
//! schema, produce a `Value`. v0.4 implements:
//!
//! - literals
//! - column lookups (bare and qualified `t.col`)
//! - unary minus / NOT
//! - binary arithmetic, comparison, AND, OR
//! - numeric widening (`Int → BigInt → Float`) at evaluation time
//! - SQL three-valued logic for NULL:
//!     * any arithmetic / comparison op with a NULL operand → NULL
//!     * `TRUE OR NULL` → TRUE, `FALSE OR NULL` → NULL,
//!     * `FALSE AND NULL` → FALSE, `TRUE AND NULL` → NULL,
//!     * `NOT NULL` → NULL
//!
//! v0.4 deliberately did *not* implement function calls, string
//! concatenation, IS NULL / IS NOT NULL, BETWEEN, IN, etc. Later versions
//! added function calls, IS [NOT] NULL, CAST, LIKE, EXTRACT, `$N`
//! placeholders, `ARRAY[...]` constructors, array subscripts and ANY / ALL.
17
18use alloc::format;
19use alloc::string::{String, ToString};
20use alloc::vec::Vec;
21
22use spg_sql::ast::{BinOp, CastTarget, ColumnName, Expr, Literal, UnOp};
23use spg_storage::{ColumnSchema, DataType, Row, Value};
24
/// Resolution context for evaluating a single row. `table_alias` is the alias
/// (or table name) callers should accept as the qualifier on a column ref —
/// e.g. `FROM users AS u` makes `u.name` valid and rejects `other.name`.
///
/// Every field is a borrow tied to `'a`; the context owns no data itself.
#[derive(Debug, Clone)]
pub struct EvalContext<'a> {
    /// Column schema of the row being evaluated.
    pub columns: &'a [ColumnSchema],
    /// Qualifier accepted on column references (alias or table name).
    /// NOTE(review): how `None` is treated is decided by the column
    /// resolver, which is not part of this definition — confirm there.
    pub table_alias: Option<&'a str>,
    /// v6.1.1 — bound parameters for `$N` placeholders inside the
    /// expression tree. Empty for simple queries; populated by the
    /// prepared-statement Execute path with Bind values converted
    /// to `Value`. Index N (1-based per PG) hits `params[N-1]`.
    pub params: &'a [Value],
}
38
39impl<'a> EvalContext<'a> {
40    pub const fn new(columns: &'a [ColumnSchema], table_alias: Option<&'a str>) -> Self {
41        Self {
42            columns,
43            table_alias,
44            params: &[],
45        }
46    }
47
48    /// v6.1.1 — attach a parameter buffer for `$N` placeholder
49    /// resolution. The slice must outlive the context; callers
50    /// construct it from the prepared statement's Bind values.
51    #[must_use]
52    pub const fn with_params(mut self, params: &'a [Value]) -> Self {
53        self.params = params;
54        self
55    }
56}
57
/// Failure modes of expression evaluation. The `Display` impl renders
/// each variant as a one-line, human-readable message.
#[derive(Debug, Clone, PartialEq)]
pub enum EvalError {
    /// A column reference named `name` could not be resolved.
    ColumnNotFound { name: String },
    /// A qualified column reference used a table qualifier that is not
    /// accepted in the current context.
    UnknownQualifier { qualifier: String },
    /// Division by zero.
    DivisionByZero,
    /// Catch-all for operand type errors; `detail` carries the message.
    /// The evaluator also uses it for wrong argument counts, unknown
    /// function names and malformed `decode()` input.
    TypeMismatch { detail: String },
    /// v6.1.1 — `$N` reference past the number of bound parameters.
    /// Either the client sent too few in Bind, or the SQL has a
    /// placeholder the prepared statement didn't account for.
    PlaceholderOutOfRange { n: u16, bound: u16 },
}
69
70impl core::fmt::Display for EvalError {
71    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
72        match self {
73            Self::ColumnNotFound { name } => write!(f, "column not found: {name}"),
74            Self::UnknownQualifier { qualifier } => {
75                write!(f, "unknown table qualifier: {qualifier}")
76            }
77            Self::DivisionByZero => f.write_str("division by zero"),
78            Self::TypeMismatch { detail } => write!(f, "type mismatch: {detail}"),
79            Self::PlaceholderOutOfRange { n, bound } => write!(
80                f,
81                "parameter ${n} referenced but only {bound} bound by client"
82            ),
83        }
84    }
85}
86
87pub fn eval_expr(expr: &Expr, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
88    match expr {
89        Expr::Literal(l) => Ok(literal_to_value(l)),
90        Expr::Column(c) => resolve_column(c, row, ctx),
91        Expr::Placeholder(n) => {
92            let idx = usize::from(*n).saturating_sub(1);
93            ctx.params
94                .get(idx)
95                .cloned()
96                .ok_or_else(|| EvalError::PlaceholderOutOfRange {
97                    n: *n,
98                    bound: u16::try_from(ctx.params.len()).unwrap_or(u16::MAX),
99                })
100        }
101        Expr::Unary { op, expr } => {
102            let v = eval_expr(expr, row, ctx)?;
103            apply_unary(*op, v)
104        }
105        Expr::Binary { lhs, op, rhs } => {
106            let l = eval_expr(lhs, row, ctx)?;
107            let r = eval_expr(rhs, row, ctx)?;
108            apply_binary(*op, l, r)
109        }
110        Expr::Cast { expr, target } => {
111            let v = eval_expr(expr, row, ctx)?;
112            cast_value(v, *target)
113        }
114        Expr::IsNull { expr, negated } => {
115            let v = eval_expr(expr, row, ctx)?;
116            let is_null = matches!(v, Value::Null);
117            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
118        }
119        Expr::FunctionCall { name, args } => {
120            let evaluated: Result<Vec<Value>, _> =
121                args.iter().map(|a| eval_expr(a, row, ctx)).collect();
122            apply_function(name, &evaluated?)
123        }
124        Expr::Like {
125            expr,
126            pattern,
127            negated,
128        } => {
129            let v = eval_expr(expr, row, ctx)?;
130            let p = eval_expr(pattern, row, ctx)?;
131            // NULL on either side propagates to NULL — same as PG.
132            let (text, pat) = match (v, p) {
133                (Value::Null, _) | (_, Value::Null) => return Ok(Value::Null),
134                (Value::Text(a), Value::Text(b)) => (a, b),
135                (Value::Text(_), other) | (other, _) => {
136                    return Err(EvalError::TypeMismatch {
137                        detail: format!("LIKE requires text operands, got {:?}", other.data_type()),
138                    });
139                }
140            };
141            let m = like_match(&text, &pat);
142            Ok(Value::Bool(if *negated { !m } else { m }))
143        }
144        Expr::Extract { field, source } => {
145            let v = eval_expr(source, row, ctx)?;
146            extract_field(*field, &v)
147        }
148        // v4.10: subquery nodes should have been resolved into
149        // Literal / Binary-Eq-OR chains by Engine::resolve_select_subqueries
150        // before the row loop. Anything reaching here is a bug.
151        Expr::ScalarSubquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. } => {
152            Err(EvalError::TypeMismatch {
153                detail: "subquery reached row eval — engine resolver bug".into(),
154            })
155        }
156        // v4.12: window functions should have been rewritten into
157        // synthetic __win_N column references by
158        // exec_select_with_window before row eval. Anything
159        // reaching here is similarly a bug.
160        Expr::WindowFunction { .. } => Err(EvalError::TypeMismatch {
161            detail: "window function reached row eval — engine rewrite bug".into(),
162        }),
163        // v7.10.10 — `ARRAY[expr, expr, …]` constructor. Build a
164        // `Value::TextArray` where each NULL maps to `None` and
165        // each Text element maps to `Some(...)`. Non-TEXT
166        // elements get coerced via `to_string()` here so the
167        // user can write `ARRAY[1, 2, 3]` against a TEXT[]
168        // column (PG also accepts this in many positions).
169        Expr::Array(items) => {
170            let mut out: Vec<Option<String>> = Vec::with_capacity(items.len());
171            for elem in items {
172                match eval_expr(elem, row, ctx)? {
173                    Value::Null => out.push(None),
174                    Value::Text(s) => out.push(Some(s)),
175                    other => out.push(Some(value_to_text_for_array(&other))),
176                }
177            }
178            Ok(Value::TextArray(out))
179        }
180        // v7.10.12 — `arr[i]` PG-style 1-based indexing.
181        // Out-of-range indices (including i ≤ 0) return NULL.
182        Expr::ArraySubscript { target, index } => {
183            let target_v = eval_expr(target, row, ctx)?;
184            let idx_v = eval_expr(index, row, ctx)?;
185            if matches!(target_v, Value::Null) || matches!(idx_v, Value::Null) {
186                return Ok(Value::Null);
187            }
188            let Value::TextArray(items) = target_v else {
189                return Err(EvalError::TypeMismatch {
190                    detail: format!(
191                        "subscript target must be an array, got {:?}",
192                        target_v.data_type()
193                    ),
194                });
195            };
196            let i: i64 = match idx_v {
197                Value::Int(n) => i64::from(n),
198                Value::BigInt(n) => n,
199                Value::SmallInt(n) => i64::from(n),
200                other => {
201                    return Err(EvalError::TypeMismatch {
202                        detail: format!("array subscript must be integer, got {:?}", other.data_type()),
203                    });
204                }
205            };
206            if i < 1 {
207                return Ok(Value::Null);
208            }
209            let pos = (i - 1) as usize;
210            match items.get(pos) {
211                Some(Some(s)) => Ok(Value::Text(s.clone())),
212                Some(None) | None => Ok(Value::Null),
213            }
214        }
215        // v7.10.12 — `x op ANY(arr)` / `x op ALL(arr)`. PG
216        // 3VL: ANY → true if any element compares-true; NULL if
217        // no true but some NULL; false otherwise. ALL: false if
218        // any compares-false; NULL if no false but some NULL;
219        // true otherwise.
220        Expr::AnyAll {
221            expr,
222            op,
223            array,
224            is_any,
225        } => {
226            let lhs = eval_expr(expr, row, ctx)?;
227            let arr = eval_expr(array, row, ctx)?;
228            if matches!(arr, Value::Null) {
229                return Ok(Value::Null);
230            }
231            let Value::TextArray(items) = arr else {
232                return Err(EvalError::TypeMismatch {
233                    detail: format!(
234                        "ANY/ALL right-hand side must be an array, got {:?}",
235                        arr.data_type()
236                    ),
237                });
238            };
239            let mut saw_null = matches!(lhs, Value::Null);
240            let mut saw_match = false;
241            let mut saw_mismatch = false;
242            for elem in items {
243                let elem_v = match elem {
244                    Some(s) => Value::Text(s),
245                    None => {
246                        saw_null = true;
247                        continue;
248                    }
249                };
250                if matches!(lhs, Value::Null) {
251                    saw_null = true;
252                    continue;
253                }
254                match apply_binary(*op, lhs.clone(), elem_v) {
255                    Ok(Value::Bool(true)) => saw_match = true,
256                    Ok(Value::Bool(false)) => saw_mismatch = true,
257                    Ok(Value::Null) => saw_null = true,
258                    Ok(other) => {
259                        return Err(EvalError::TypeMismatch {
260                            detail: format!(
261                                "ANY/ALL comparison didn't return Bool: {:?}",
262                                other.data_type()
263                            ),
264                        });
265                    }
266                    Err(e) => return Err(e),
267                }
268            }
269            let result = if *is_any {
270                if saw_match {
271                    Value::Bool(true)
272                } else if saw_null {
273                    Value::Null
274                } else {
275                    Value::Bool(false)
276                }
277            } else if saw_mismatch {
278                Value::Bool(false)
279            } else if saw_null {
280                Value::Null
281            } else {
282                Value::Bool(true)
283            };
284            Ok(result)
285        }
286    }
287}
288
289/// v7.10.10 — best-effort text rendering for non-TEXT array
290/// elements (numbers, bools, etc.). The PG rule is that
291/// `ARRAY[1, 2]` is `int[]`, but SPG's v7.10 only models TEXT[],
292/// so we widen by stringifying. NUMERIC formatting goes through
293/// the existing canonical helpers to stay consistent with
294/// `format_numeric` / `format_date` etc.
295fn value_to_text_for_array(v: &Value) -> String {
296    match v {
297        Value::Text(s) | Value::Json(s) => s.clone(),
298        Value::Int(n) => n.to_string(),
299        Value::BigInt(n) => n.to_string(),
300        Value::SmallInt(n) => n.to_string(),
301        Value::Bool(b) => if *b { "true".into() } else { "false".into() },
302        Value::Float(x) => format!("{x}"),
303        Value::Date(d) => format_date(*d),
304        Value::Timestamp(t) => format_timestamp(*t),
305        Value::Numeric { scaled, scale } => format_numeric(*scaled, *scale),
306        _ => format!("{v:?}"),
307    }
308}
309
310/// Pull an integer component (year / month / ... / microsecond) out
311/// of a `DATE` or `TIMESTAMP`. Returns NULL on a NULL source, errors
312/// when the source isn't a calendar type.
313fn extract_field(field: spg_sql::ast::ExtractField, v: &Value) -> Result<Value, EvalError> {
314    use spg_sql::ast::ExtractField as F;
315    if matches!(v, Value::Null) {
316        return Ok(Value::Null);
317    }
318    // INTERVAL has its own decomposition — `YEAR` / `MONTH` come from
319    // the months part, the rest from the microseconds part. PG matches
320    // this convention (months is normalised modulo 12 for MONTH).
321    if let Value::Interval { months, micros } = *v {
322        let years = months / 12;
323        let mons = months % 12;
324        let secs_total = micros / 1_000_000;
325        let frac = micros % 1_000_000;
326        let result = match field {
327            F::Year => i64::from(years),
328            F::Month => i64::from(mons),
329            F::Day => micros / 86_400_000_000,
330            F::Hour => (secs_total / 3600) % 24,
331            F::Minute => (secs_total / 60) % 60,
332            F::Second => secs_total % 60,
333            F::Microsecond => (secs_total % 60) * 1_000_000 + frac,
334        };
335        return Ok(Value::BigInt(result));
336    }
337    let (days, day_micros) = match *v {
338        Value::Date(d) => (d, 0_i64),
339        Value::Timestamp(t) => {
340            let days = t.div_euclid(86_400_000_000);
341            let day_micros = t.rem_euclid(86_400_000_000);
342            (i32::try_from(days).unwrap_or(i32::MAX), day_micros)
343        }
344        _ => {
345            return Err(EvalError::TypeMismatch {
346                detail: format!(
347                    "EXTRACT requires DATE / TIMESTAMP / INTERVAL, got {:?}",
348                    v.data_type()
349                ),
350            });
351        }
352    };
353    let (y, m, d) = civil_components(days);
354    let secs = day_micros / 1_000_000;
355    let hh = secs / 3600;
356    let mm = (secs / 60) % 60;
357    let ss = secs % 60;
358    let frac = day_micros % 1_000_000;
359    let result = match field {
360        F::Year => i64::from(y),
361        F::Month => i64::from(m),
362        F::Day => i64::from(d),
363        F::Hour => hh,
364        F::Minute => mm,
365        F::Second => ss,
366        F::Microsecond => ss * 1_000_000 + frac,
367    };
368    Ok(Value::BigInt(result))
369}
370
/// Internal wrapper around the file-private `civil_from_days` so the
/// public surface area doesn't change. Returns `(year, month, day)`.
///
/// `days` is passed through unchanged; this function adds no logic of
/// its own.
/// NOTE(review): the epoch `days` is counted from is defined by
/// `civil_from_days`, which is not shown here — confirm there.
fn civil_components(days: i32) -> (i32, u32, u32) {
    civil_from_days(days)
}
376
/// SQL `LIKE` matcher. Wildcards are `%` (any run, possibly empty) and `_`
/// (exactly one char). `\` escapes the next pattern char so `\%` matches a
/// literal `%`; a trailing lone `\` matches a literal backslash. Matches the
/// whole input — no implicit anchoring needed since SQL `LIKE` is always
/// full-string.
///
/// Matching is per Unicode scalar value (`char`), not per byte.
fn like_match(text: &str, pattern: &str) -> bool {
    let text: Vec<char> = text.chars().collect();
    let pat: Vec<char> = pattern.chars().collect();
    like_match_inner(&text, 0, &pat, 0)
}

/// Matches `text[ti..]` against `pat[pi..]` in full.
///
/// Iterative matcher with a single backtrack point. On a mismatch only
/// the most recent `%` is widened by one character; earlier `%`s never
/// need revisiting, because any text they could absorb the later `%`
/// can absorb too. Worst case is O(text × pattern). Retrying every `%`
/// recursively instead grows exponentially with the number of `%`s.
fn like_match_inner(text: &[char], mut ti: usize, pat: &[char], mut pi: usize) -> bool {
    // (pattern index just past the latest `%`, text index where the
    // remainder of the pattern was last tried after that `%`).
    let mut resume: Option<(usize, usize)> = None;
    loop {
        // How far to advance in the pattern if the current element
        // consumes `text[ti]`; `None` means mismatch.
        let advance = match pat.get(pi) {
            Some('%') => {
                pi += 1;
                resume = Some((pi, ti));
                continue;
            }
            None => {
                if ti == text.len() {
                    return true;
                }
                // Pattern exhausted with text left over.
                None
            }
            Some('_') => (ti < text.len()).then_some(1),
            Some('\\') if pi + 1 < pat.len() => {
                (text.get(ti) == Some(&pat[pi + 1])).then_some(2)
            }
            Some(literal) => (text.get(ti) == Some(literal)).then_some(1),
        };
        match (advance, resume) {
            (Some(step), _) => {
                ti += 1;
                pi += step;
            }
            // Let the latest `%` swallow one more character and retry.
            (None, Some((after_pct, tried_from))) if tried_from < text.len() => {
                resume = Some((after_pct, tried_from + 1));
                pi = after_pct;
                ti = tried_from + 1;
            }
            (None, _) => return false,
        }
    }
}
431
432/// Dispatch on lowercased function name. v1.4 implements only a handful of
433/// scalar functions; aggregates land in v1.5 alongside GROUP BY.
434fn apply_function(name: &str, args: &[Value]) -> Result<Value, EvalError> {
435    match name.to_ascii_lowercase().as_str() {
436        "length" => {
437            if args.len() != 1 {
438                return Err(EvalError::TypeMismatch {
439                    detail: format!("length() takes 1 arg, got {}", args.len()),
440                });
441            }
442            match &args[0] {
443                Value::Null => Ok(Value::Null),
444                Value::Text(s) => {
445                    let n = i32::try_from(s.chars().count()).unwrap_or(i32::MAX);
446                    Ok(Value::Int(n))
447                }
448                // v7.10.4 — PG semantics: length(bytea) returns
449                // byte count (= octet_length). Without this branch
450                // mailrs's INSERT … SELECT length(body) … against a
451                // BYTEA column would type-mismatch.
452                Value::Bytes(b) => {
453                    let n = i32::try_from(b.len()).unwrap_or(i32::MAX);
454                    Ok(Value::Int(n))
455                }
456                other => Err(EvalError::TypeMismatch {
457                    detail: format!("length() needs text or bytea, got {:?}", other.data_type()),
458                }),
459            }
460        }
461        // v7.10.4 — `OCTET_LENGTH(x)` returns byte count for both
462        // TEXT (UTF-8 byte length) and BYTEA. PG-spec name; aliases
463        // to length() for bytea by design.
464        "octet_length" => {
465            if args.len() != 1 {
466                return Err(EvalError::TypeMismatch {
467                    detail: format!("octet_length() takes 1 arg, got {}", args.len()),
468                });
469            }
470            match &args[0] {
471                Value::Null => Ok(Value::Null),
472                Value::Text(s) => {
473                    let n = i32::try_from(s.len()).unwrap_or(i32::MAX);
474                    Ok(Value::Int(n))
475                }
476                Value::Bytes(b) => {
477                    let n = i32::try_from(b.len()).unwrap_or(i32::MAX);
478                    Ok(Value::Int(n))
479                }
480                other => Err(EvalError::TypeMismatch {
481                    detail: format!(
482                        "octet_length() needs text or bytea, got {:?}",
483                        other.data_type()
484                    ),
485                }),
486            }
487        }
488        "upper" => {
489            if args.len() != 1 {
490                return Err(EvalError::TypeMismatch {
491                    detail: format!("upper() takes 1 arg, got {}", args.len()),
492                });
493            }
494            match &args[0] {
495                Value::Null => Ok(Value::Null),
496                Value::Text(s) => Ok(Value::Text(s.to_uppercase())),
497                other => Err(EvalError::TypeMismatch {
498                    detail: format!("upper() needs text, got {:?}", other.data_type()),
499                }),
500            }
501        }
502        "lower" => {
503            if args.len() != 1 {
504                return Err(EvalError::TypeMismatch {
505                    detail: format!("lower() takes 1 arg, got {}", args.len()),
506                });
507            }
508            match &args[0] {
509                Value::Null => Ok(Value::Null),
510                Value::Text(s) => Ok(Value::Text(s.to_lowercase())),
511                other => Err(EvalError::TypeMismatch {
512                    detail: format!("lower() needs text, got {:?}", other.data_type()),
513                }),
514            }
515        }
516        "abs" => {
517            if args.len() != 1 {
518                return Err(EvalError::TypeMismatch {
519                    detail: format!("abs() takes 1 arg, got {}", args.len()),
520                });
521            }
522            match &args[0] {
523                Value::Null => Ok(Value::Null),
524                Value::Int(n) => Ok(Value::Int(n.wrapping_abs())),
525                Value::BigInt(n) => Ok(Value::BigInt(n.wrapping_abs())),
526                Value::Float(x) => Ok(Value::Float(x.abs())),
527                other => Err(EvalError::TypeMismatch {
528                    detail: format!("abs() needs numeric, got {:?}", other.data_type()),
529                }),
530            }
531        }
532        "coalesce" => {
533            for a in args {
534                if !matches!(a, Value::Null) {
535                    return Ok(a.clone());
536                }
537            }
538            Ok(Value::Null)
539        }
540        "date_trunc" => date_trunc(args),
541        "date_part" => date_part(args),
542        "age" => age(args),
543        "to_char" => to_char(args),
544        // v6.4.3 — encode/decode + error_on_null SQL function bundle.
545        "encode" => encode_text(args),
546        "decode" => decode_text(args),
547        "error_on_null" => error_on_null(args),
548        other => Err(EvalError::TypeMismatch {
549            detail: format!("unknown function `{other}`"),
550        }),
551    }
552}
553
554/// v6.4.3 — `encode(bytes_as_text, format)`. PG works on bytea
555/// arguments; SPG's value space treats Text as the byte container
556/// (raw UTF-8 bytes). Supported formats: base64 (PG default),
557/// base64url (RFC 4648 §5), base32hex (RFC 4648 §7 extended-hex),
558/// hex.
559fn encode_text(args: &[Value]) -> Result<Value, EvalError> {
560    if args.len() != 2 {
561        return Err(EvalError::TypeMismatch {
562            detail: format!("encode() takes 2 args, got {}", args.len()),
563        });
564    }
565    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
566        return Ok(Value::Null);
567    }
568    let bytes: &[u8] = match &args[0] {
569        Value::Text(s) => s.as_bytes(),
570        other => {
571            return Err(EvalError::TypeMismatch {
572                detail: format!(
573                    "encode() expects text bytes, got {:?}",
574                    other.data_type()
575                ),
576            });
577        }
578    };
579    let fmt = match &args[1] {
580        Value::Text(s) => s.to_ascii_lowercase(),
581        other => {
582            return Err(EvalError::TypeMismatch {
583                detail: format!(
584                    "encode() format must be text, got {:?}",
585                    other.data_type()
586                ),
587            });
588        }
589    };
590    let out = match fmt.as_str() {
591        "base64" => b64_encode(bytes, B64_STD),
592        "base64url" => b64_encode(bytes, B64_URL),
593        "base32hex" => b32hex_encode(bytes),
594        "hex" => hex_encode(bytes),
595        other => {
596            return Err(EvalError::TypeMismatch {
597                detail: format!("encode(): unknown format `{other}`"),
598            });
599        }
600    };
601    Ok(Value::Text(out))
602}
603
604/// v6.4.3 — `decode(text, format)`. Inverse of `encode`; returns
605/// Text containing the raw decoded bytes (caller may CAST to bytea
606/// equivalent if SPG adds bytea later).
607fn decode_text(args: &[Value]) -> Result<Value, EvalError> {
608    if args.len() != 2 {
609        return Err(EvalError::TypeMismatch {
610            detail: format!("decode() takes 2 args, got {}", args.len()),
611        });
612    }
613    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
614        return Ok(Value::Null);
615    }
616    let text = match &args[0] {
617        Value::Text(s) => s.as_str(),
618        other => {
619            return Err(EvalError::TypeMismatch {
620                detail: format!("decode() expects text, got {:?}", other.data_type()),
621            });
622        }
623    };
624    let fmt = match &args[1] {
625        Value::Text(s) => s.to_ascii_lowercase(),
626        other => {
627            return Err(EvalError::TypeMismatch {
628                detail: format!(
629                    "decode() format must be text, got {:?}",
630                    other.data_type()
631                ),
632            });
633        }
634    };
635    let bytes = match fmt.as_str() {
636        "base64" => b64_decode(text, B64_STD)?,
637        "base64url" => b64_decode(text, B64_URL)?,
638        "base32hex" => b32hex_decode(text)?,
639        "hex" => hex_decode(text)?,
640        other => {
641            return Err(EvalError::TypeMismatch {
642                detail: format!("decode(): unknown format `{other}`"),
643            });
644        }
645    };
646    let s = String::from_utf8(bytes).map_err(|_| EvalError::TypeMismatch {
647        detail: "decode(): result bytes are not valid UTF-8 (SPG stores raw bytes as Text)".into(),
648    })?;
649    Ok(Value::Text(s))
650}
651
652/// v6.4.3 — `error_on_null(v)`. Returns `v` unchanged if non-NULL;
653/// errors otherwise. Convenience to assert NOT NULL inside an
654/// expression without wrapping it in COALESCE + raise hacks.
655fn error_on_null(args: &[Value]) -> Result<Value, EvalError> {
656    if args.len() != 1 {
657        return Err(EvalError::TypeMismatch {
658            detail: format!("error_on_null() takes 1 arg, got {}", args.len()),
659        });
660    }
661    if matches!(args[0], Value::Null) {
662        return Err(EvalError::TypeMismatch {
663            detail: "error_on_null(): argument is NULL".into(),
664        });
665    }
666    Ok(args[0].clone())
667}
668
669// ── byte-level encoders ───────────────────────────────────────────
670
/// Standard base64 alphabet, indexed by 6-bit value; `+` and `/` are
/// the last two symbols.
const B64_STD: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
/// URL-safe base64 alphabet: identical to `B64_STD` except that the
/// last two symbols are `-` and `_`.
const B64_URL: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
/// base32hex alphabet, indexed by 5-bit value: digits `0`–`9` followed
/// by uppercase `A`–`V`.
const B32HEX_ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHIJKLMNOPQRSTUV";
676
/// Encodes `bytes` as padded base64 using the 64-symbol alphabet `alpha`.
///
/// Input is consumed in 3-byte groups, each producing four symbols. A
/// final short group is zero-filled and the unused symbol positions are
/// written as `=`.
fn b64_encode(bytes: &[u8], alpha: &[u8; 64]) -> String {
    // Symbol for the 6-bit field of `group` that starts at bit `shift`.
    let symbol = |group: u32, shift: u32| char::from(alpha[((group >> shift) & 0x3f) as usize]);
    let pack = |a: u8, b: u8, c: u8| (u32::from(a) << 16) | (u32::from(b) << 8) | u32::from(c);

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for chunk in bytes.chunks(3) {
        let (group, data_symbols) = match *chunk {
            [a, b, c] => (pack(a, b, c), 4),
            [a, b] => (pack(a, b, 0), 3),
            [a] => (pack(a, 0, 0), 2),
            // `chunks(3)` never yields an empty or longer slice.
            _ => continue,
        };
        encoded.push(symbol(group, 18));
        encoded.push(symbol(group, 12));
        encoded.push(if data_symbols > 2 { symbol(group, 6) } else { '=' });
        encoded.push(if data_symbols > 3 { symbol(group, 0) } else { '=' });
    }
    encoded
}
704
705fn b64_decode(text: &str, alpha: &[u8; 64]) -> Result<Vec<u8>, EvalError> {
706    let mut lookup = [255u8; 256];
707    for (i, &c) in alpha.iter().enumerate() {
708        lookup[c as usize] = i as u8;
709    }
710    let mut out = Vec::with_capacity(text.len() * 3 / 4);
711    let mut buf: u32 = 0;
712    let mut bits: u32 = 0;
713    for c in text.bytes() {
714        if c == b'=' {
715            break;
716        }
717        if c == b'\n' || c == b'\r' || c == b' ' {
718            continue;
719        }
720        let v = lookup[c as usize];
721        if v == 255 {
722            return Err(EvalError::TypeMismatch {
723                detail: format!("decode(base64): invalid char {:?}", c as char),
724            });
725        }
726        buf = (buf << 6) | v as u32;
727        bits += 6;
728        if bits >= 8 {
729            bits -= 8;
730            out.push(((buf >> bits) & 0xff) as u8);
731        }
732    }
733    Ok(out)
734}
735
736fn b32hex_encode(bytes: &[u8]) -> String {
737    let mut out = String::with_capacity((bytes.len() * 8 + 4) / 5);
738    let mut buf: u64 = 0;
739    let mut bits: u32 = 0;
740    for &b in bytes {
741        buf = (buf << 8) | b as u64;
742        bits += 8;
743        while bits >= 5 {
744            bits -= 5;
745            out.push(B32HEX_ALPHABET[((buf >> bits) & 0x1f) as usize] as char);
746        }
747    }
748    if bits > 0 {
749        out.push(B32HEX_ALPHABET[((buf << (5 - bits)) & 0x1f) as usize] as char);
750    }
751    // Pad to multiple of 8.
752    while out.len() % 8 != 0 {
753        out.push('=');
754    }
755    out
756}
757
758fn b32hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
759    let mut lookup = [255u8; 256];
760    for (i, &c) in B32HEX_ALPHABET.iter().enumerate() {
761        lookup[c as usize] = i as u8;
762        // base32hex is case-insensitive — also map lowercase.
763        let lower = (c as char).to_ascii_lowercase() as u8;
764        lookup[lower as usize] = i as u8;
765    }
766    let mut out = Vec::with_capacity(text.len() * 5 / 8);
767    let mut buf: u64 = 0;
768    let mut bits: u32 = 0;
769    for c in text.bytes() {
770        if c == b'=' {
771            break;
772        }
773        if c == b'\n' || c == b'\r' || c == b' ' {
774            continue;
775        }
776        let v = lookup[c as usize];
777        if v == 255 {
778            return Err(EvalError::TypeMismatch {
779                detail: format!("decode(base32hex): invalid char {:?}", c as char),
780            });
781        }
782        buf = (buf << 5) | v as u64;
783        bits += 5;
784        if bits >= 8 {
785            bits -= 8;
786            out.push(((buf >> bits) & 0xff) as u8);
787        }
788    }
789    Ok(out)
790}
791
/// Encodes `bytes` as lowercase hexadecimal, two digits per byte with
/// the high nibble first.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(bytes.len() * 2);
    encoded.extend(
        bytes
            .iter()
            .flat_map(|&byte| [byte >> 4, byte & 0xf])
            .map(|nibble| char::from(DIGITS[usize::from(nibble)])),
    );
    encoded
}
801
802fn hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
803    let trimmed = text.trim();
804    if trimmed.len() % 2 != 0 {
805        return Err(EvalError::TypeMismatch {
806            detail: "decode(hex): input length must be even".into(),
807        });
808    }
809    let mut out = Vec::with_capacity(trimmed.len() / 2);
810    let mut hi: u8 = 0;
811    for (i, c) in trimmed.bytes().enumerate() {
812        let v = match c {
813            b'0'..=b'9' => c - b'0',
814            b'a'..=b'f' => c - b'a' + 10,
815            b'A'..=b'F' => c - b'A' + 10,
816            _ => {
817                return Err(EvalError::TypeMismatch {
818                    detail: format!("decode(hex): invalid char {:?}", c as char),
819                });
820            }
821        };
822        if i % 2 == 0 {
823            hi = v;
824        } else {
825            out.push((hi << 4) | v);
826        }
827    }
828    Ok(out)
829}
830
831/// `date_part(field_text, source)` — function form of `EXTRACT(field FROM
832/// source)`. Same component dispatch (DATE / TIMESTAMP / INTERVAL) and
833/// same `BigInt` return shape; PG returns double precision but we keep the
834/// integer convention so the runner's `query I` shape works unchanged.
835fn date_part(args: &[Value]) -> Result<Value, EvalError> {
836    use spg_sql::ast::ExtractField as F;
837    if args.len() != 2 {
838        return Err(EvalError::TypeMismatch {
839            detail: format!("date_part() takes 2 args, got {}", args.len()),
840        });
841    }
842    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
843        return Ok(Value::Null);
844    }
845    let Value::Text(field_name) = &args[0] else {
846        return Err(EvalError::TypeMismatch {
847            detail: format!(
848                "date_part() needs a text field, got {:?}",
849                args[0].data_type()
850            ),
851        });
852    };
853    let field = match field_name.to_ascii_lowercase().as_str() {
854        "year" => F::Year,
855        "month" => F::Month,
856        "day" => F::Day,
857        "hour" => F::Hour,
858        "minute" => F::Minute,
859        "second" => F::Second,
860        "microsecond" | "microseconds" => F::Microsecond,
861        other => {
862            return Err(EvalError::TypeMismatch {
863                detail: format!(
864                    "unknown date_part field {other:?}; \
865                     supported: year, month, day, hour, minute, second, microsecond"
866                ),
867            });
868        }
869    };
870    extract_field(field, &args[1])
871}
872
873/// `age(t1, t2)` — return `t1 - t2` as an INTERVAL. v2.12 produces a
874/// micros-only interval (no months normalisation) because PG's
875/// month-justification rule is sensitive to the day-of-month walk and
876/// adds material complexity for marginal corpus value.
877///
878/// `age(t)` (single-arg form) is intentionally unsupported in v2.12:
879/// the dispatcher errors instead of guessing a clock source. Callers
880/// who want PG's `age(t)` semantics should write `age(CURRENT_DATE, t)`
881/// explicitly so the clock reference is visible at the SQL layer.
882fn age(args: &[Value]) -> Result<Value, EvalError> {
883    if args.is_empty() || args.len() > 2 {
884        return Err(EvalError::TypeMismatch {
885            detail: format!("age() takes 1 or 2 args, got {}", args.len()),
886        });
887    }
888    if args.iter().any(|v| matches!(v, Value::Null)) {
889        return Ok(Value::Null);
890    }
891    // Coerce to TIMESTAMP micros — DATE lifts to midnight; TIMESTAMP
892    // stays as-is; anything else errors.
893    let to_micros = |v: &Value| -> Result<i64, EvalError> {
894        match v {
895            Value::Timestamp(t) => Ok(*t),
896            Value::Date(d) => Ok(i64::from(*d) * 86_400_000_000),
897            other => Err(EvalError::TypeMismatch {
898                detail: format!("age() needs DATE or TIMESTAMP, got {:?}", other.data_type()),
899            }),
900        }
901    };
902    if args.len() == 1 {
903        return Err(EvalError::TypeMismatch {
904            detail: "single-arg age() is unsupported in v2.12 \
905                     (use age(CURRENT_DATE, t) explicitly)"
906                .into(),
907        });
908    }
909    let a = to_micros(&args[0])?;
910    let b = to_micros(&args[1])?;
911    let delta = a.checked_sub(b).ok_or(EvalError::TypeMismatch {
912        detail: "age() subtraction overflows i64 microseconds".into(),
913    })?;
914    Ok(Value::Interval {
915        months: 0,
916        micros: delta,
917    })
918}
919
920/// `to_char(value, format)` — render a DATE / TIMESTAMP through a PG
921/// format template. Supports the high-traffic placeholders:
922///   YYYY YY MM Mon Month DD HH24 HH12 MI SS MS US AM PM
923/// Unrecognised characters pass through literally so the template's
924/// punctuation ('-', ':', ' ', '/') needs no escape mechanism.
925fn to_char(args: &[Value]) -> Result<Value, EvalError> {
926    use core::fmt::Write as _;
927    if args.len() != 2 {
928        return Err(EvalError::TypeMismatch {
929            detail: format!("to_char() takes 2 args, got {}", args.len()),
930        });
931    }
932    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
933        return Ok(Value::Null);
934    }
935    let Value::Text(fmt) = &args[1] else {
936        return Err(EvalError::TypeMismatch {
937            detail: format!(
938                "to_char() needs a text format, got {:?}",
939                args[1].data_type()
940            ),
941        });
942    };
943    let (days, day_micros) = match &args[0] {
944        Value::Date(d) => (*d, 0_i64),
945        Value::Timestamp(t) => {
946            let days = t.div_euclid(86_400_000_000);
947            (
948                i32::try_from(days).unwrap_or(i32::MAX),
949                t.rem_euclid(86_400_000_000),
950            )
951        }
952        other => {
953            return Err(EvalError::TypeMismatch {
954                detail: format!(
955                    "to_char() needs DATE or TIMESTAMP, got {:?}",
956                    other.data_type()
957                ),
958            });
959        }
960    };
961    let (y, mo, d) = civil_from_days(days);
962    let secs = day_micros / 1_000_000;
963    let frac = day_micros % 1_000_000;
964    // div_euclid keeps every value non-negative — the casts below are
965    // sign-safe by construction. `secs ∈ [0, 86400)`, `frac ∈ [0,
966    // 1_000_000)`, so all three quantities fit in u32.
967    let hh24 = u32::try_from(secs / 3600).unwrap_or(0);
968    let mi = u32::try_from((secs / 60) % 60).unwrap_or(0);
969    let ss = u32::try_from(secs % 60).unwrap_or(0);
970    let hh12 = match hh24 % 12 {
971        0 => 12,
972        x => x,
973    };
974    let ampm = if hh24 < 12 { "AM" } else { "PM" };
975    let ms = u32::try_from(frac / 1_000).unwrap_or(0); // millisecond
976    let us = u32::try_from(frac).unwrap_or(0); // microsecond (0..1_000_000)
977
978    let mut out = String::with_capacity(fmt.len() + 8);
979    let bytes = fmt.as_bytes();
980    let mut i = 0;
981    // write! against a String never fails — discard the Result.
982    while i < bytes.len() {
983        // Try the longest prefixes first so "YYYY" wins over "YY".
984        let rest = &bytes[i..];
985        if rest.starts_with(b"YYYY") {
986            let _ = write!(out, "{y:04}");
987            i += 4;
988        } else if rest.starts_with(b"YY") {
989            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
990            let yy = (y.rem_euclid(100)) as u32;
991            let _ = write!(out, "{yy:02}");
992            i += 2;
993        } else if rest.starts_with(b"Month") {
994            out.push_str(MONTH_FULL[(mo - 1) as usize]);
995            i += 5;
996        } else if rest.starts_with(b"Mon") {
997            out.push_str(MONTH_ABBR[(mo - 1) as usize]);
998            i += 3;
999        } else if rest.starts_with(b"MM") {
1000            let _ = write!(out, "{mo:02}");
1001            i += 2;
1002        } else if rest.starts_with(b"DD") {
1003            let _ = write!(out, "{d:02}");
1004            i += 2;
1005        } else if rest.starts_with(b"HH24") {
1006            let _ = write!(out, "{hh24:02}");
1007            i += 4;
1008        } else if rest.starts_with(b"HH12") {
1009            let _ = write!(out, "{hh12:02}");
1010            i += 4;
1011        } else if rest.starts_with(b"MI") {
1012            let _ = write!(out, "{mi:02}");
1013            i += 2;
1014        } else if rest.starts_with(b"SS") {
1015            let _ = write!(out, "{ss:02}");
1016            i += 2;
1017        } else if rest.starts_with(b"MS") {
1018            let _ = write!(out, "{ms:03}");
1019            i += 2;
1020        } else if rest.starts_with(b"US") {
1021            let _ = write!(out, "{us:06}");
1022            i += 2;
1023        } else if rest.starts_with(b"AM") || rest.starts_with(b"PM") {
1024            out.push_str(ampm);
1025            i += 2;
1026        } else {
1027            // Pass any non-placeholder byte through verbatim.
1028            out.push(bytes[i] as char);
1029            i += 1;
1030        }
1031    }
1032    Ok(Value::Text(out))
1033}
1034
/// Full English month names, indexed by `month - 1`.
const MONTH_FULL: [&str; 12] = [
    "January", "February", "March", "April", "May", "June", "July", "August", "September",
    "October", "November", "December",
];

/// Three-letter English month abbreviations, indexed by `month - 1`.
const MONTH_ABBR: [&str; 12] = [
    "Jan",
    "Feb",
    "Mar",
    "Apr",
    "May",
    "Jun",
    "Jul",
    "Aug",
    "Sep",
    "Oct",
    "Nov",
    "Dec",
];
1052
1053/// `date_trunc(unit, timestamp)` — round a `TIMESTAMP` down to the
1054/// requested calendar boundary (year / month / day / hour / minute /
1055/// second). Returns the truncated `TIMESTAMP`. NULL on either side
1056/// propagates to NULL.
1057fn date_trunc(args: &[Value]) -> Result<Value, EvalError> {
1058    if args.len() != 2 {
1059        return Err(EvalError::TypeMismatch {
1060            detail: format!("date_trunc() takes 2 args, got {}", args.len()),
1061        });
1062    }
1063    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
1064        return Ok(Value::Null);
1065    }
1066    let Value::Text(unit) = &args[0] else {
1067        return Err(EvalError::TypeMismatch {
1068            detail: format!(
1069                "date_trunc() needs a text unit, got {:?}",
1070                args[0].data_type()
1071            ),
1072        });
1073    };
1074    // Both DATE and TIMESTAMP sources are accepted. DATE lifts to
1075    // midnight first; the result is always TIMESTAMP.
1076    let micros = match &args[1] {
1077        Value::Timestamp(t) => *t,
1078        Value::Date(d) => i64::from(*d) * 86_400_000_000,
1079        other => {
1080            return Err(EvalError::TypeMismatch {
1081                detail: format!(
1082                    "date_trunc() needs DATE or TIMESTAMP, got {:?}",
1083                    other.data_type()
1084                ),
1085            });
1086        }
1087    };
1088    let unit_lc = unit.to_ascii_lowercase();
1089    let days = micros.div_euclid(86_400_000_000);
1090    let day_micros = micros.rem_euclid(86_400_000_000);
1091    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
1092    let (y, m, _) = civil_from_days(day_i32);
1093    let truncated = match unit_lc.as_str() {
1094        "year" => i64::from(days_from_civil(y, 1, 1)) * 86_400_000_000,
1095        "month" => i64::from(days_from_civil(y, m, 1)) * 86_400_000_000,
1096        "day" => days * 86_400_000_000,
1097        "hour" => days * 86_400_000_000 + (day_micros / 3_600_000_000) * 3_600_000_000,
1098        "minute" => days * 86_400_000_000 + (day_micros / 60_000_000) * 60_000_000,
1099        "second" => days * 86_400_000_000 + (day_micros / 1_000_000) * 1_000_000,
1100        other => {
1101            return Err(EvalError::TypeMismatch {
1102                detail: format!(
1103                    "unknown date_trunc unit {other:?}; \
1104                     supported: year, month, day, hour, minute, second"
1105                ),
1106            });
1107        }
1108    };
1109    Ok(Value::Timestamp(truncated))
1110}
1111
1112/// PG-style `expr::TYPE` coercion. NULL always casts as NULL.
1113pub fn cast_value(v: Value, target: CastTarget) -> Result<Value, EvalError> {
1114    if matches!(v, Value::Null) {
1115        return Ok(Value::Null);
1116    }
1117    match target {
1118        CastTarget::Vector => cast_to_vector(v),
1119        CastTarget::Text => Ok(Value::Text(value_to_text(&v))),
1120        CastTarget::Int => cast_numeric_to_int(v),
1121        CastTarget::BigInt => cast_numeric_to_bigint(v),
1122        CastTarget::Float => cast_numeric_to_float(v),
1123        CastTarget::Bool => cast_to_bool(v),
1124        CastTarget::Date => cast_to_date(v),
1125        // TIMESTAMP and TIMESTAMPTZ have identical runtime
1126        // representation (i64 microseconds UTC).
1127        CastTarget::Timestamp | CastTarget::Timestamptz => cast_to_timestamp(v),
1128        // v7.9.25 — `expr::INTERVAL`. Currently only TEXT → Interval
1129        // is supported (the mailrs idiom: `$1::INTERVAL` where the
1130        // bound param is a string like `'7 days'`).
1131        CastTarget::Interval => cast_to_interval(v),
1132        // v7.9.25 — `::json` / `::jsonb`. Routes Text → Json
1133        // (validation is the producer's responsibility, same as
1134        // the column-INSERT path).
1135        CastTarget::Json | CastTarget::Jsonb => match v {
1136            Value::Json(s) => Ok(Value::Json(s)),
1137            Value::Text(s) => Ok(Value::Json(s)),
1138            other => Err(EvalError::TypeMismatch {
1139                detail: alloc::format!(
1140                    "::json / ::jsonb only accepts TEXT-shape inputs, got {:?}",
1141                    other.data_type()
1142                ),
1143            }),
1144        },
1145        // v7.9.26 — `::regtype` / `::regclass`. SPG has no
1146        // pg_catalog; surface a clear error.
1147        CastTarget::RegType | CastTarget::RegClass => Err(EvalError::TypeMismatch {
1148            detail:
1149                "::regtype / ::regclass not supported on SPG \
1150                 (no pg_catalog); use SHOW TABLES / spg_table_ddl instead"
1151                    .into(),
1152        }),
1153        // v7.10.11 — `::TEXT[]`. Decode PG external array form
1154        // when input is Text; pass through unchanged when it is
1155        // already TextArray. Anything else is a type mismatch.
1156        CastTarget::TextArray => match v {
1157            Value::TextArray(items) => Ok(Value::TextArray(items)),
1158            Value::Text(s) => decode_text_array_external(&s).map(Value::TextArray),
1159            other => Err(EvalError::TypeMismatch {
1160                detail: alloc::format!(
1161                    "::TEXT[] only accepts TEXT / TEXT[] inputs, got {:?}",
1162                    other.data_type()
1163                ),
1164            }),
1165        },
1166    }
1167}
1168
1169/// v7.10.11 — same decoder as `decode_text_array_literal` in
1170/// `lib.rs`, but lives here so the eval-time cast path stays
1171/// inside `spg-engine::eval`. Kept in lock-step with the engine
1172/// `coerce_value` decoder by tests.
1173fn decode_text_array_external(s: &str) -> Result<Vec<Option<String>>, EvalError> {
1174    let trimmed = s.trim();
1175    let inner = trimmed
1176        .strip_prefix('{')
1177        .and_then(|x| x.strip_suffix('}'))
1178        .ok_or_else(|| EvalError::TypeMismatch {
1179            detail: alloc::format!("TEXT[] literal {s:?} must be enclosed in '{{...}}'"),
1180        })?;
1181    let mut out: Vec<Option<String>> = Vec::new();
1182    if inner.trim().is_empty() {
1183        return Ok(out);
1184    }
1185    let bytes = inner.as_bytes();
1186    let mut i = 0;
1187    while i <= bytes.len() {
1188        while i < bytes.len() && (bytes[i] == b' ' || bytes[i] == b'\t') {
1189            i += 1;
1190        }
1191        if i < bytes.len() && bytes[i] == b'"' {
1192            i += 1;
1193            let mut buf = String::new();
1194            while i < bytes.len() && bytes[i] != b'"' {
1195                if bytes[i] == b'\\' && i + 1 < bytes.len() {
1196                    buf.push(bytes[i + 1] as char);
1197                    i += 2;
1198                } else {
1199                    buf.push(bytes[i] as char);
1200                    i += 1;
1201                }
1202            }
1203            if i >= bytes.len() {
1204                return Err(EvalError::TypeMismatch {
1205                    detail: "unterminated quoted element in TEXT[] literal".into(),
1206                });
1207            }
1208            i += 1;
1209            out.push(Some(buf));
1210        } else {
1211            let start = i;
1212            while i < bytes.len() && bytes[i] != b',' {
1213                i += 1;
1214            }
1215            let raw = inner[start..i].trim();
1216            if raw.eq_ignore_ascii_case("NULL") {
1217                out.push(None);
1218            } else {
1219                out.push(Some(raw.to_string()));
1220            }
1221        }
1222        while i < bytes.len() && (bytes[i] == b' ' || bytes[i] == b'\t') {
1223            i += 1;
1224        }
1225        if i >= bytes.len() {
1226            break;
1227        }
1228        if bytes[i] != b',' {
1229            return Err(EvalError::TypeMismatch {
1230                detail: "expected ',' between TEXT[] elements".into(),
1231            });
1232        }
1233        i += 1;
1234    }
1235    Ok(out)
1236}
1237
1238fn cast_to_interval(v: Value) -> Result<Value, EvalError> {
1239    match v {
1240        Value::Interval { months, micros } => Ok(Value::Interval { months, micros }),
1241        Value::Text(s) => {
1242            let (months, micros) = spg_sql::parser::parse_interval_text(&s)
1243                .ok_or_else(|| EvalError::TypeMismatch {
1244                    detail: alloc::format!("cannot parse {s:?} as INTERVAL"),
1245                })?;
1246            Ok(Value::Interval { months, micros })
1247        }
1248        other => Err(EvalError::TypeMismatch {
1249            detail: alloc::format!(
1250                "::INTERVAL only accepts TEXT-shape inputs, got {:?}",
1251                other.data_type()
1252            ),
1253        }),
1254    }
1255}
1256
1257fn cast_to_date(v: Value) -> Result<Value, EvalError> {
1258    match v {
1259        Value::Date(d) => Ok(Value::Date(d)),
1260        // Integer literals carry days since the Unix epoch — used by
1261        // the `CURRENT_DATE` AST rewrite to inject the wall clock.
1262        Value::Int(n) => Ok(Value::Date(n)),
1263        Value::BigInt(n) => {
1264            i32::try_from(n)
1265                .map(Value::Date)
1266                .map_err(|_| EvalError::TypeMismatch {
1267                    detail: "bigint days-since-epoch out of DATE range".into(),
1268                })
1269        }
1270        // Timestamp truncates to its day boundary.
1271        Value::Timestamp(t) => {
1272            let days = t.div_euclid(86_400_000_000);
1273            i32::try_from(days)
1274                .map(Value::Date)
1275                .map_err(|_| EvalError::TypeMismatch {
1276                    detail: "timestamp out of DATE range".into(),
1277                })
1278        }
1279        Value::Text(s) => parse_date_literal(&s)
1280            .map(Value::Date)
1281            .ok_or(EvalError::TypeMismatch {
1282                detail: format!("cannot parse {s:?} as DATE (expected YYYY-MM-DD)"),
1283            }),
1284        other => Err(EvalError::TypeMismatch {
1285            detail: format!("cannot cast {:?} to DATE", other.data_type()),
1286        }),
1287    }
1288}
1289
1290fn cast_to_timestamp(v: Value) -> Result<Value, EvalError> {
1291    match v {
1292        Value::Timestamp(t) => Ok(Value::Timestamp(t)),
1293        // Int / BigInt carry microseconds since the Unix epoch — used
1294        // by the `NOW()` / `CURRENT_TIMESTAMP` AST rewrite to inject
1295        // the wall clock as a plain integer literal.
1296        Value::Int(n) => Ok(Value::Timestamp(i64::from(n))),
1297        Value::BigInt(n) => Ok(Value::Timestamp(n)),
1298        // DATE → TIMESTAMP picks midnight on the date.
1299        Value::Date(d) => Ok(Value::Timestamp(i64::from(d) * 86_400_000_000)),
1300        Value::Text(s) => {
1301            parse_timestamp_literal(&s)
1302                .map(Value::Timestamp)
1303                .ok_or(EvalError::TypeMismatch {
1304                    detail: format!(
1305                        "cannot parse {s:?} as TIMESTAMP \
1306                     (expected YYYY-MM-DD[ HH:MM:SS[.ffffff]])"
1307                    ),
1308                })
1309        }
1310        other => Err(EvalError::TypeMismatch {
1311            detail: format!("cannot cast {:?} to TIMESTAMP", other.data_type()),
1312        }),
1313    }
1314}
1315
1316fn value_to_text(v: &Value) -> String {
1317    match v {
1318        // v7.5.0 — Value is #[non_exhaustive]; any future variant
1319        // without explicit text rendering hits the Debug fallback
1320        // at the end.
1321        Value::SmallInt(n) => format!("{n}"),
1322        Value::Int(n) => format!("{n}"),
1323        Value::BigInt(n) => format!("{n}"),
1324        Value::Float(x) => format!("{x}"),
1325        // v4.9: JSON renders identically to Text — both are raw UTF-8.
1326        Value::Text(s) | Value::Json(s) => s.clone(),
1327        Value::Bool(b) => (if *b { "true" } else { "false" }).into(),
1328        Value::Vector(v) => {
1329            let cells: Vec<String> = v.iter().map(|x| format!("{x}")).collect();
1330            format!("[{}]", cells.join(", "))
1331        }
1332        // v6.0.1: render SQ8 cells dequantised, so SELECT output
1333        // matches the pgvector wire shape clients expect. The
1334        // recall envelope already absorbs the ≤ (max-min)/255/2
1335        // dequantisation error.
1336        Value::Sq8Vector(q) => {
1337            let cells: Vec<String> = spg_storage::quantize::dequantize(q)
1338                .iter()
1339                .map(|x| format!("{x}"))
1340                .collect();
1341            format!("[{}]", cells.join(", "))
1342        }
1343        // v6.0.3: HalfVector cells dequantise bit-exactly to f32
1344        // for SELECT output.
1345        Value::HalfVector(h) => {
1346            let cells: Vec<String> = h.to_f32_vec().iter().map(|x| format!("{x}")).collect();
1347            format!("[{}]", cells.join(", "))
1348        }
1349        Value::Numeric { scaled, scale } => format_numeric(*scaled, *scale),
1350        Value::Date(d) => format_date(*d),
1351        Value::Timestamp(t) => format_timestamp(*t),
1352        Value::Interval { months, micros } => format_interval(*months, *micros),
1353        Value::Null => "NULL".into(),
1354        // v7.5.0 — #[non_exhaustive] fallback for future Value variants.
1355        _ => format!("{v:?}"),
1356    }
1357}
1358
1359/// Render a `Date` (days since epoch) as `YYYY-MM-DD`. Negative values
1360/// for pre-1970 dates render with a leading `-` on the year.
1361pub fn format_date(days: i32) -> String {
1362    let (y, m, d) = civil_from_days(days);
1363    format!("{y:04}-{m:02}-{d:02}")
1364}
1365
1366/// Render a `Timestamp` (microseconds since epoch) as
1367/// `YYYY-MM-DD HH:MM:SS[.fff...]`. Trailing-zero fractional digits are
1368/// dropped; a whole-second value has no fractional part.
1369pub fn format_timestamp(micros: i64) -> String {
1370    const MICROS_PER_DAY: i64 = 86_400_000_000;
1371    // Split into day + intra-day part with proper floor division so
1372    // negative timestamps render right too.
1373    let days = micros.div_euclid(MICROS_PER_DAY);
1374    let day_micros = micros.rem_euclid(MICROS_PER_DAY);
1375    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
1376    let (y, m, d) = civil_from_days(day_i32);
1377    let secs = day_micros / 1_000_000;
1378    let frac = day_micros % 1_000_000;
1379    let hh = secs / 3600;
1380    let mm = (secs / 60) % 60;
1381    let ss = secs % 60;
1382    if frac == 0 {
1383        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}")
1384    } else {
1385        // Strip trailing zeros from the 6-digit fractional component.
1386        let raw = format!("{frac:06}");
1387        let trimmed = raw.trim_end_matches('0');
1388        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}.{trimmed}")
1389    }
1390}
1391
/// Howard Hinnant's `civil_from_days`: map a day count relative to
/// 1970-01-01 onto a proleptic-Gregorian `(year, month, day)` triple.
///
/// The calendar is treated as 400-year eras of 146 097 days whose years
/// begin on 1 March, which places the leap day at the very end of a year.
/// Both directions of the conversion live in `eval.rs` so the engine never
/// reaches for `std` time facilities.
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
fn civil_from_days(days: i32) -> (i32, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097;

    // Re-base so that day 0 is 0000-03-01.
    let shifted = i64::from(days) + 719_468;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    // Every quantity from here on is small and non-negative, so the
    // `as u32` conversions cannot lose information.
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA) as u32;
    let year_of_era = (day_of_era.saturating_sub(day_of_era / 1460) + day_of_era / 36524
        - day_of_era / 146_096)
        / 365;
    let day_of_year =
        day_of_era.saturating_sub(365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index counted from March (0 = March … 11 = February).
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year.saturating_sub((153 * month_index + 2) / 5) + 1;
    let march_based_year = i64::from(year_of_era) + era * 400;
    // January and February belong to the following civil year.
    let (year, month) = if month_index < 10 {
        (march_based_year, month_index + 3)
    } else {
        (march_based_year + 1, month_index - 9)
    };
    (year as i32, month, day)
}
1413
/// Inverse of `civil_from_days`: convert a proleptic-Gregorian
/// `(year, month, day)` into days since 1970-01-01.
///
/// Months and days are not validated. A day of 0 is treated like day 1,
/// and a result that does not fit in `i32` falls back to `i32::MAX`.
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
pub fn days_from_civil(y: i32, m: u32, d: u32) -> i32 {
    // Work in years that start on 1 March: January and February count as
    // months 10 and 11 of the previous year.
    let (march_based_year, month_index) = if m > 2 {
        (i64::from(y), m - 3)
    } else {
        (i64::from(y) - 1, m + 9)
    };
    let era = march_based_year.div_euclid(400);
    let year_of_era = march_based_year.rem_euclid(400) as u32;
    let day_of_year = (153 * month_index + 2) / 5 + d.saturating_sub(1);
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    // 719_468 is the distance from 0000-03-01 to the Unix epoch.
    let since_epoch = era * 146_097 + i64::from(day_of_era) - 719_468;
    i32::try_from(since_epoch).unwrap_or(i32::MAX)
}
1430
1431/// Parse `YYYY-MM-DD` into a `Date` (days since Unix epoch). Returns
1432/// `None` on shape / numeric failure; the engine surfaces that as a
1433/// `TypeMismatch` with the original text included.
1434pub fn parse_date_literal(s: &str) -> Option<i32> {
1435    let bytes = s.as_bytes();
1436    if bytes.len() != 10 || bytes[4] != b'-' || bytes[7] != b'-' {
1437        return None;
1438    }
1439    let y: i32 = s[0..4].parse().ok()?;
1440    let m: u32 = s[5..7].parse().ok()?;
1441    let d: u32 = s[8..10].parse().ok()?;
1442    if !(1..=12).contains(&m) || !(1..=31).contains(&d) {
1443        return None;
1444    }
1445    Some(days_from_civil(y, m, d))
1446}
1447
1448/// Parse `YYYY-MM-DD[ HH:MM:SS[.ffffff]]` into a `Timestamp`
1449/// (microseconds since Unix epoch). The time portion is optional;
1450/// missing → midnight. The fractional portion accepts 1–6 digits and
1451/// pads with zeros to microseconds.
1452pub fn parse_timestamp_literal(s: &str) -> Option<i64> {
1453    let trimmed = s.trim();
1454    let (date_part, time_part) = match trimmed.find([' ', 'T']) {
1455        Some(i) => (&trimmed[..i], Some(&trimmed[i + 1..])),
1456        None => (trimmed, None),
1457    };
1458    let days = parse_date_literal(date_part)?;
1459    let day_micros = match time_part {
1460        None => 0,
1461        Some(t) => parse_time_of_day_micros(t)?,
1462    };
1463    Some(i64::from(days) * 86_400_000_000 + day_micros)
1464}
1465
/// Parse `HH:MM:SS[.fraction]` into microseconds since midnight.
///
/// Hours, minutes and seconds must each be exactly two ASCII digits within
/// `00..=23` / `00..=59` / `00..=59`. The optional fraction must be 1–9
/// ASCII digits; it is right-padded with zeros to microsecond precision
/// and any digits beyond the sixth are dropped. Returns `None` for
/// anything else.
///
/// All fields are validated byte-wise before use. Signed fields and
/// fractions (`+1:02:03`, `.-12345`) are therefore rejected, and a
/// non-ASCII fraction can no longer trigger a slice panic.
fn parse_time_of_day_micros(t: &str) -> Option<i64> {
    // Value of a two-ASCII-digit field, or None if either byte is not a digit.
    fn two_digits(pair: &[u8]) -> Option<i64> {
        match pair {
            [hi @ b'0'..=b'9', lo @ b'0'..=b'9'] => {
                Some(i64::from(*hi - b'0') * 10 + i64::from(*lo - b'0'))
            }
            _ => None,
        }
    }

    let (clock, fraction) = match t.split_once('.') {
        Some((clock, fraction)) => (clock, Some(fraction)),
        None => (t, None),
    };
    let bytes = clock.as_bytes();
    if bytes.len() != 8 || bytes[2] != b':' || bytes[5] != b':' {
        return None;
    }
    let hh = two_digits(&bytes[0..2])?;
    let mm = two_digits(&bytes[3..5])?;
    let ss = two_digits(&bytes[6..8])?;
    if hh >= 24 || mm >= 60 || ss >= 60 {
        return None;
    }
    let frac_micros: i64 = match fraction {
        None => 0,
        Some(f) => {
            let digits = f.as_bytes();
            if digits.is_empty() || digits.len() > 9 || !digits.iter().all(u8::is_ascii_digit) {
                return None;
            }
            // Read exactly six positions, treating missing ones as '0'.
            (0..6).fold(0_i64, |acc, i| {
                acc * 10 + digits.get(i).map_or(0, |b| i64::from(*b - b'0'))
            })
        }
    };
    Some(((hh * 3600 + mm * 60 + ss) * 1_000_000) + frac_micros)
}
1498
/// Render an `Interval { months, micros }` in a PG-ish shape modelled on
/// `psql`'s text output. Up to four space-separated parts appear in this
/// order, each only when non-zero:
///
/// - `N year(s)` and `N mon(s)` from the months component,
/// - `N day(s)` from the whole days in `micros`,
/// - `[-]HH:MM:SS[.frac]` from the sub-day remainder, with trailing zeros
///   removed from the fraction.
///
/// The unit word is singular only for exactly `+1`. An all-zero interval
/// renders as `0`.
pub fn format_interval(months: i32, micros: i64) -> String {
    const MICROS_PER_SECOND: i64 = 1_000_000;
    const MICROS_PER_DAY: i64 = 86_400 * MICROS_PER_SECOND;

    // "<n> <unit>": singular for +1 only; -1 and everything else pluralise.
    fn counted(n: i64, singular: &str, plural: &str) -> String {
        let word = if n == 1 { singular } else { plural };
        format!("{n} {word}")
    }

    let mut pieces: Vec<String> = Vec::with_capacity(4);

    let (years, mons) = (months / 12, months % 12);
    if years != 0 {
        pieces.push(counted(i64::from(years), "year", "years"));
    }
    if mons != 0 {
        pieces.push(counted(i64::from(mons), "mon", "mons"));
    }

    // Truncating division: `days` and `clock` both carry the sign of `micros`.
    let (days, clock) = (micros / MICROS_PER_DAY, micros % MICROS_PER_DAY);
    if days != 0 {
        pieces.push(counted(days, "day", "days"));
    }
    if clock != 0 {
        let sign = if clock < 0 { "-" } else { "" };
        // |clock| is below one day, so abs() cannot overflow.
        let magnitude = clock.abs();
        let secs = magnitude / MICROS_PER_SECOND;
        let frac = magnitude % MICROS_PER_SECOND;
        let mut text = format!(
            "{sign}{:02}:{:02}:{:02}",
            secs / 3600,
            secs / 60 % 60,
            secs % 60
        );
        if frac != 0 {
            let digits = format!("{frac:06}");
            text.push('.');
            text.push_str(digits.trim_end_matches('0'));
        }
        pieces.push(text);
    }

    if pieces.is_empty() {
        String::from("0")
    } else {
        pieces.join(" ")
    }
}
1552
1553/// Add `months` (signed) to a `(year, month, day)` triple using PG's
1554/// clamp-to-last-day rule (so `'2024-01-31' + 1 month` → `'2024-02-29'`).
1555fn add_months_to_civil(y: i32, m: u32, d: u32, months: i32) -> (i32, u32, u32) {
1556    let total_months = i64::from(y) * 12 + i64::from(m) - 1 + i64::from(months);
1557    let new_year = i32::try_from(total_months.div_euclid(12)).unwrap_or(i32::MAX);
1558    let new_month_zero = total_months.rem_euclid(12);
1559    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1560    let new_month = (new_month_zero as u32) + 1;
1561    let max_day = days_in_month(new_year, new_month);
1562    (new_year, new_month, d.min(max_day))
1563}
1564
/// Number of days in month `m` (1-based) of year `y`, using the proleptic
/// Gregorian leap-year rule. Any month outside 1..=12 yields 30; callers
/// are expected to normalise first.
const fn days_in_month(y: i32, m: u32) -> u32 {
    // For an "is it divisible" test, `%` and `rem_euclid` agree even when
    // the year is negative, because both give 0 exactly on multiples.
    let leap = y % 4 == 0 && (y % 100 != 0 || y % 400 == 0);
    match m {
        2 if leap => 29,
        2 => 28,
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        // April, June, September, November and the defensive fallback.
        _ => 30,
    }
}
1581
/// v7.10.9 — render a TEXT[] in PG's external array form
/// (`{a,b,NULL}`).
///
/// An element is wrapped in double quotes when it is empty, spells `NULL`
/// (case-insensitively), or contains a comma, brace, double quote,
/// backslash, or any whitespace character (space, tab, newline, carriage
/// return, vertical tab, form feed). Inside quotes, `"` and `\` are
/// escaped with a leading backslash. `None` elements are emitted as the
/// bare token `NULL`. Public so the wire layer can produce the canonical
/// text-mode encoding.
pub fn format_text_array(items: &[Option<String>]) -> String {
    /// True for characters that cannot appear in an unquoted element.
    const fn forces_quoting(c: char) -> bool {
        matches!(
            c,
            ',' | '{' | '}' | '"' | '\\' | ' ' | '\t' | '\n' | '\r' | '\x0B' | '\x0C'
        )
    }

    let mut out = String::with_capacity(2 + items.len() * 8);
    out.push('{');
    for (i, item) in items.iter().enumerate() {
        if i > 0 {
            out.push(',');
        }
        let Some(s) = item else {
            out.push_str("NULL");
            continue;
        };
        let needs_quote =
            s.is_empty() || s.eq_ignore_ascii_case("NULL") || s.chars().any(forces_quoting);
        if !needs_quote {
            out.push_str(s);
            continue;
        }
        out.push('"');
        for c in s.chars() {
            // Only the quote and the backslash need escaping inside quotes.
            if c == '"' || c == '\\' {
                out.push('\\');
            }
            out.push(c);
        }
        out.push('"');
    }
    out.push('}');
    out
}
1618
/// v7.10.4 — encode a BYTEA payload as PG hex text: the two characters
/// `\x` followed by two lowercase hex digits per byte. Public so the wire
/// layer can emit the canonical bytea-as-text representation.
pub fn format_bytea_hex(b: &[u8]) -> String {
    const DIGITS: [char; 16] = [
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f',
    ];
    let mut text = String::with_capacity(2 + 2 * b.len());
    text.push_str("\\x");
    // High nibble first, then low nibble.
    text.extend(b.iter().flat_map(|&octet| {
        [
            DIGITS[usize::from(octet >> 4)],
            DIGITS[usize::from(octet & 0x0F)],
        ]
    }));
    text
}
1632
/// Render a `Numeric { scaled, scale }` as decimal text.
///
/// With `scale == 0` the integer is printed as-is. Otherwise the digits of
/// the absolute value are left-padded with zeros to at least `scale + 1`
/// characters, so there is always one integer digit, and a `.` is inserted
/// `scale` characters from the right. Negative values get a leading `-`.
pub fn format_numeric(scaled: i128, scale: u8) -> String {
    let frac_len = usize::from(scale);
    if frac_len == 0 {
        return scaled.to_string();
    }
    // `unsigned_abs` keeps `i128::MIN` representable.
    let min_digits = frac_len + 1;
    let digits = format!("{:0>min_digits$}", scaled.unsigned_abs());
    let (whole, frac) = digits.split_at(digits.len() - frac_len);
    let sign = if scaled < 0 { "-" } else { "" };
    format!("{sign}{whole}.{frac}")
}
1664
1665fn cast_numeric_to_int(v: Value) -> Result<Value, EvalError> {
1666    match v {
1667        Value::Int(n) => Ok(Value::Int(n)),
1668        Value::BigInt(n) => i32::try_from(n)
1669            .map(Value::Int)
1670            .map_err(|_| EvalError::TypeMismatch {
1671                detail: format!("bigint {n} does not fit in int"),
1672            }),
1673        #[allow(clippy::cast_possible_truncation)]
1674        Value::Float(x) => Ok(Value::Int(x as i32)),
1675        Value::Text(s) => {
1676            s.trim()
1677                .parse::<i32>()
1678                .map(Value::Int)
1679                .map_err(|_| EvalError::TypeMismatch {
1680                    detail: format!("cannot parse {s:?} as int"),
1681                })
1682        }
1683        Value::Bool(b) => Ok(Value::Int(i32::from(b))),
1684        other => Err(EvalError::TypeMismatch {
1685            detail: format!("cannot cast {:?} to int", other.data_type()),
1686        }),
1687    }
1688}
1689
1690fn cast_numeric_to_bigint(v: Value) -> Result<Value, EvalError> {
1691    match v {
1692        Value::Int(n) => Ok(Value::BigInt(i64::from(n))),
1693        Value::BigInt(n) => Ok(Value::BigInt(n)),
1694        #[allow(clippy::cast_possible_truncation)]
1695        Value::Float(x) => Ok(Value::BigInt(x as i64)),
1696        Value::Text(s) => {
1697            s.trim()
1698                .parse::<i64>()
1699                .map(Value::BigInt)
1700                .map_err(|_| EvalError::TypeMismatch {
1701                    detail: format!("cannot parse {s:?} as bigint"),
1702                })
1703        }
1704        Value::Bool(b) => Ok(Value::BigInt(i64::from(b))),
1705        other => Err(EvalError::TypeMismatch {
1706            detail: format!("cannot cast {:?} to bigint", other.data_type()),
1707        }),
1708    }
1709}
1710
1711fn cast_numeric_to_float(v: Value) -> Result<Value, EvalError> {
1712    match v {
1713        Value::Int(n) => Ok(Value::Float(f64::from(n))),
1714        #[allow(clippy::cast_precision_loss)]
1715        Value::BigInt(n) => Ok(Value::Float(n as f64)),
1716        Value::Float(x) => Ok(Value::Float(x)),
1717        Value::Text(s) => {
1718            s.trim()
1719                .parse::<f64>()
1720                .map(Value::Float)
1721                .map_err(|_| EvalError::TypeMismatch {
1722                    detail: format!("cannot parse {s:?} as float"),
1723                })
1724        }
1725        other => Err(EvalError::TypeMismatch {
1726            detail: format!("cannot cast {:?} to float", other.data_type()),
1727        }),
1728    }
1729}
1730
1731fn cast_to_bool(v: Value) -> Result<Value, EvalError> {
1732    match v {
1733        Value::Bool(b) => Ok(Value::Bool(b)),
1734        Value::Int(n) => Ok(Value::Bool(n != 0)),
1735        Value::BigInt(n) => Ok(Value::Bool(n != 0)),
1736        Value::Text(s) => {
1737            let lo = s.trim().to_ascii_lowercase();
1738            match lo.as_str() {
1739                "true" | "t" | "yes" | "y" | "1" | "on" => Ok(Value::Bool(true)),
1740                "false" | "f" | "no" | "n" | "0" | "off" => Ok(Value::Bool(false)),
1741                _ => Err(EvalError::TypeMismatch {
1742                    detail: format!("cannot parse {s:?} as bool"),
1743                }),
1744            }
1745        }
1746        other => Err(EvalError::TypeMismatch {
1747            detail: format!("cannot cast {:?} to bool", other.data_type()),
1748        }),
1749    }
1750}
1751
1752/// Parse a `Value::Text("[1.0, 2.0, 3.0]")` into a `Value::Vector(..)`. Mirrors
1753/// pgvector's `'[..]'::vector` cast. NULL casts as NULL.
1754pub fn cast_to_vector(v: Value) -> Result<Value, EvalError> {
1755    match v {
1756        Value::Null => Ok(Value::Null),
1757        Value::Vector(v) => Ok(Value::Vector(v)),
1758        Value::Text(s) => parse_vector_text(&s)
1759            .map(Value::Vector)
1760            .ok_or(EvalError::TypeMismatch {
1761                detail: format!("cannot parse {s:?} as a vector literal"),
1762            }),
1763        other => Err(EvalError::TypeMismatch {
1764            detail: format!("::vector requires text input, got {:?}", other.data_type()),
1765        }),
1766    }
1767}
1768
/// Parse `"[1.0, 2.0, -3]"` into `Vec<f32>`.
///
/// Surrounding whitespace is ignored, both around the brackets and around
/// each element. `"[]"` yields an empty vector. Returns `None` when the
/// brackets are missing, when any element fails to parse as `f32`, or when
/// an element is not finite: `NaN`, `inf`, and literals that overflow
/// `f32` (which `f32::from_str` turns into infinity) are all rejected,
/// since pgvector does not allow such elements in a vector.
fn parse_vector_text(s: &str) -> Option<Vec<f32>> {
    let inner = s.trim().strip_prefix('[')?.strip_suffix(']')?.trim();
    if inner.is_empty() {
        return Some(Vec::new());
    }
    // Collecting into `Option<Vec<_>>` stops at the first bad element.
    inner
        .split(',')
        .map(|part| {
            part.trim()
                .parse::<f32>()
                .ok()
                .filter(|elem| elem.is_finite())
        })
        .collect()
}
1784
1785fn literal_to_value(l: &Literal) -> Value {
1786    match l {
1787        Literal::Integer(n) => {
1788            if let Ok(small) = i32::try_from(*n) {
1789                Value::Int(small)
1790            } else {
1791                Value::BigInt(*n)
1792            }
1793        }
1794        Literal::Float(x) => Value::Float(*x),
1795        Literal::String(s) => Value::Text(s.clone()),
1796        Literal::Vector(v) => Value::Vector(v.clone()),
1797        Literal::Bool(b) => Value::Bool(*b),
1798        Literal::Null => Value::Null,
1799        Literal::Interval { months, micros, .. } => Value::Interval {
1800            months: *months,
1801            micros: *micros,
1802        },
1803    }
1804}
1805
1806fn resolve_column(c: &ColumnName, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1807    if let Some(q) = &c.qualifier {
1808        // Multi-table evaluation (joins): the synthesised schema uses
1809        // composite column names "alias.column" so we look that up
1810        // directly. Falls back to the single-table case below if the
1811        // composite isn't present.
1812        let composite = alloc::format!("{q}.{name}", name = c.name);
1813        if let Some(pos) = ctx.columns.iter().position(|s| s.name == composite) {
1814            return Ok(row.values[pos].clone());
1815        }
1816        let expected = ctx.table_alias.ok_or_else(|| EvalError::UnknownQualifier {
1817            qualifier: q.clone(),
1818        })?;
1819        if q != expected {
1820            return Err(EvalError::UnknownQualifier {
1821                qualifier: q.clone(),
1822            });
1823        }
1824    }
1825    if let Some(pos) = ctx.columns.iter().position(|s| s.name == c.name) {
1826        return Ok(row.values[pos].clone());
1827    }
1828    // Bare-name fallback for joined schemas: match any single composite
1829    // column ending in ".<name>"; ambiguity is an error.
1830    let suffix = alloc::format!(".{name}", name = c.name);
1831    let mut matches = ctx
1832        .columns
1833        .iter()
1834        .enumerate()
1835        .filter(|(_, s)| s.name.ends_with(&suffix));
1836    let first = matches.next();
1837    let extra = matches.next();
1838    match (first, extra) {
1839        (Some((pos, _)), None) => Ok(row.values[pos].clone()),
1840        (Some(_), Some(_)) => Err(EvalError::TypeMismatch {
1841            detail: alloc::format!("ambiguous column reference: {}", c.name),
1842        }),
1843        _ => Err(EvalError::ColumnNotFound {
1844            name: c.name.clone(),
1845        }),
1846    }
1847}
1848
1849fn apply_unary(op: UnOp, v: Value) -> Result<Value, EvalError> {
1850    match (op, v) {
1851        (_, Value::Null) => Ok(Value::Null),
1852        (UnOp::Neg, Value::Int(n)) => {
1853            n.checked_neg()
1854                .map(Value::Int)
1855                .ok_or(EvalError::TypeMismatch {
1856                    detail: "integer overflow on unary -".into(),
1857                })
1858        }
1859        (UnOp::Neg, Value::BigInt(n)) => {
1860            n.checked_neg()
1861                .map(Value::BigInt)
1862                .ok_or(EvalError::TypeMismatch {
1863                    detail: "bigint overflow on unary -".into(),
1864                })
1865        }
1866        (UnOp::Neg, Value::Float(x)) => Ok(Value::Float(-x)),
1867        (UnOp::Neg, other) => Err(EvalError::TypeMismatch {
1868            detail: format!("unary - applied to {:?}", other.data_type()),
1869        }),
1870        (UnOp::Not, Value::Bool(b)) => Ok(Value::Bool(!b)),
1871        (UnOp::Not, other) => Err(EvalError::TypeMismatch {
1872            detail: format!("NOT applied to {:?}", other.data_type()),
1873        }),
1874    }
1875}
1876
1877/// v7.9.27b — true when two values are "not distinct" per PG:
1878/// both NULL counts as equal; otherwise reduces to regular Eq.
1879fn values_not_distinct(l: &Value, r: &Value) -> bool {
1880    match (l, r) {
1881        (Value::Null, Value::Null) => true,
1882        (Value::Null, _) | (_, Value::Null) => false,
1883        _ => l == r,
1884    }
1885}
1886
1887fn apply_binary(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
1888    // SQL three-valued logic for AND / OR with NULL is special — handle before
1889    // the general NULL-propagation rule.
1890    if let BinOp::And = op {
1891        return and_3vl(l, r);
1892    }
1893    if let BinOp::Or = op {
1894        return or_3vl(l, r);
1895    }
1896    // v7.9.27b — IS [NOT] DISTINCT FROM. NULL-safe equality:
1897    // `NULL IS NOT DISTINCT FROM NULL` → true. mailrs pg_dump.
1898    if let BinOp::IsNotDistinctFrom = op {
1899        return Ok(Value::Bool(values_not_distinct(&l, &r)));
1900    }
1901    if let BinOp::IsDistinctFrom = op {
1902        return Ok(Value::Bool(!values_not_distinct(&l, &r)));
1903    }
1904    // Everything else: any NULL operand → NULL.
1905    if l.is_null() || r.is_null() {
1906        return Ok(Value::Null);
1907    }
1908    // NUMERIC arithmetic and comparisons run in fixed-point; promote
1909    // integers to a common NUMERIC scale and stay in i128 throughout.
1910    if matches!(l, Value::Numeric { .. }) || matches!(r, Value::Numeric { .. }) {
1911        return apply_binary_numeric(op, l, r);
1912    }
1913    // Date / Timestamp arithmetic. PG semantics:
1914    //   * date + int      → date  (int is days)
1915    //   * int + date      → date
1916    //   * date - int      → date
1917    //   * date - date     → int   (days, signed)
1918    //   * timestamp - timestamp → bigint (microseconds, signed)
1919    // Other date/time math (`timestamp + int`, INTERVAL) lands later.
1920    if let Some(result) = apply_binary_calendar(op, &l, &r)? {
1921        return Ok(result);
1922    }
1923    match op {
1924        BinOp::Add => arith(l, r, i64::checked_add, |a, b| a + b, "+"),
1925        BinOp::Sub => arith(l, r, i64::checked_sub, |a, b| a - b, "-"),
1926        BinOp::Mul => arith(l, r, i64::checked_mul, |a, b| a * b, "*"),
1927        BinOp::Div => div_op(l, r),
1928        BinOp::L2Distance => l2_distance(l, r),
1929        BinOp::InnerProduct => inner_product(l, r),
1930        BinOp::CosineDistance => cosine_distance(l, r),
1931        BinOp::Concat => Ok(text_concat(&l, &r)),
1932        BinOp::JsonGet => crate::json::path_get(&l, &r, false),
1933        BinOp::JsonGetText => crate::json::path_get(&l, &r, true),
1934        BinOp::JsonGetPath => crate::json::path_walk(&l, &r, false),
1935        BinOp::JsonGetPathText => crate::json::path_walk(&l, &r, true),
1936        BinOp::JsonContains => crate::json::contains(&l, &r),
1937        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
1938            compare(op, &l, &r)
1939        }
1940        BinOp::And
1941        | BinOp::Or
1942        | BinOp::IsDistinctFrom
1943        | BinOp::IsNotDistinctFrom => unreachable!("handled above"),
1944    }
1945}
1946
1947/// Calendar arithmetic. Returns `Some(value)` when the operand pair
1948/// is a date/time combo this function understands, `None` to let the
1949/// caller fall through to the regular numeric / text paths.
1950fn apply_binary_calendar(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
1951    let int_value = |v: &Value| -> Option<i64> {
1952        match v {
1953            Value::SmallInt(n) => Some(i64::from(*n)),
1954            Value::Int(n) => Some(i64::from(*n)),
1955            Value::BigInt(n) => Some(*n),
1956            _ => None,
1957        }
1958    };
1959    // Most-specific cases first — DATE-DATE / TS-TS subtraction before
1960    // DATE-integer subtraction, otherwise the latter swallows the
1961    // former with an `int_value(Date) = None` no-op fall-through.
1962    match (l, r) {
1963        (Value::Date(a), Value::Date(b)) if op == BinOp::Sub => {
1964            return Ok(Some(Value::BigInt(i64::from(*a) - i64::from(*b))));
1965        }
1966        (Value::Timestamp(a), Value::Timestamp(b)) if op == BinOp::Sub => {
1967            let delta = a.checked_sub(*b).ok_or(EvalError::TypeMismatch {
1968                detail: "TIMESTAMP - TIMESTAMP overflows i64 microseconds".into(),
1969            })?;
1970            return Ok(Some(Value::BigInt(delta)));
1971        }
1972        _ => {}
1973    }
1974    // INTERVAL arithmetic. PG: timestamp ± interval → timestamp,
1975    // date ± interval → date (if interval is pure days/months with no
1976    // sub-day component) else timestamp, interval ± interval → interval.
1977    if let Some(out) = apply_binary_interval(op, l, r)? {
1978        return Ok(Some(out));
1979    }
1980    match (l, r) {
1981        (Value::Date(d), other) if op == BinOp::Add => {
1982            if let Some(n) = int_value(other) {
1983                let days = i64::from(*d).saturating_add(n);
1984                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
1985                    detail: "DATE + integer overflows DATE range".into(),
1986                })?;
1987                return Ok(Some(Value::Date(days32)));
1988            }
1989        }
1990        (other, Value::Date(d)) if op == BinOp::Add => {
1991            if let Some(n) = int_value(other) {
1992                let days = i64::from(*d).saturating_add(n);
1993                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
1994                    detail: "integer + DATE overflows DATE range".into(),
1995                })?;
1996                return Ok(Some(Value::Date(days32)));
1997            }
1998        }
1999        (Value::Date(d), other) if op == BinOp::Sub => {
2000            if let Some(n) = int_value(other) {
2001                let days = i64::from(*d).saturating_sub(n);
2002                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
2003                    detail: "DATE - integer overflows DATE range".into(),
2004                })?;
2005                return Ok(Some(Value::Date(days32)));
2006            }
2007        }
2008        _ => {}
2009    }
2010    Ok(None)
2011}
2012
2013/// INTERVAL-aware binary ops. Recognises:
2014///   timestamp ± interval → timestamp
2015///   date ± interval      → date (if interval is integral days/months only)
2016///                       → timestamp (if interval has sub-day micros)
2017///   interval ± interval  → interval
2018/// Commutative for `+`. Returns `None` for unrecognised operand pairs so
2019/// the caller can fall through.
2020fn apply_binary_interval(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
2021    // Normalise so the interval (if any) is always on the right for Add;
2022    // Sub stays left-handed because it isn't commutative.
2023    let (lhs, rhs, sign): (&Value, &Value, i64) = match (l, r, op) {
2024        (Value::Interval { .. }, _, BinOp::Add) => (r, l, 1),
2025        (_, Value::Interval { .. }, BinOp::Add) => (l, r, 1),
2026        (_, Value::Interval { .. }, BinOp::Sub) => (l, r, -1),
2027        _ => return Ok(None),
2028    };
2029    let Value::Interval {
2030        months: rhs_months,
2031        micros: rhs_us,
2032    } = rhs
2033    else {
2034        unreachable!("rhs guaranteed to be Interval by the match above");
2035    };
2036    let signed_months = i64::from(*rhs_months) * sign;
2037    let signed_micros = rhs_us.checked_mul(sign).ok_or(EvalError::TypeMismatch {
2038        detail: "INTERVAL micros overflows on negation".into(),
2039    })?;
2040    match lhs {
2041        Value::Timestamp(t) => Ok(Some(Value::Timestamp(add_interval_to_micros(
2042            *t,
2043            signed_months,
2044            signed_micros,
2045        )?))),
2046        Value::Date(d) => {
2047            // Date + interval stays a date when the interval has zero
2048            // sub-day microseconds; otherwise promote to TIMESTAMP at
2049            // midnight of the (months-shifted) date first.
2050            let day_aligned = signed_micros.rem_euclid(86_400_000_000) == 0;
2051            if day_aligned {
2052                let micros_per_day = 86_400_000_000_i64;
2053                let days_delta = signed_micros / micros_per_day;
2054                let shifted = shift_date_by_months(*d, signed_months)?;
2055                let new_days =
2056                    i64::from(shifted)
2057                        .checked_add(days_delta)
2058                        .ok_or(EvalError::TypeMismatch {
2059                            detail: "DATE ± INTERVAL overflows DATE range".into(),
2060                        })?;
2061                let days32 = i32::try_from(new_days).map_err(|_| EvalError::TypeMismatch {
2062                    detail: "DATE ± INTERVAL overflows DATE range".into(),
2063                })?;
2064                Ok(Some(Value::Date(days32)))
2065            } else {
2066                let base =
2067                    i64::from(*d)
2068                        .checked_mul(86_400_000_000)
2069                        .ok_or(EvalError::TypeMismatch {
2070                            detail: "DATE → TIMESTAMP lift overflows for INTERVAL math".into(),
2071                        })?;
2072                Ok(Some(Value::Timestamp(add_interval_to_micros(
2073                    base,
2074                    signed_months,
2075                    signed_micros,
2076                )?)))
2077            }
2078        }
2079        Value::Interval {
2080            months: lhs_months,
2081            micros: lhs_us,
2082        } => {
2083            let new_months = i64::from(*lhs_months)
2084                .checked_add(signed_months)
2085                .and_then(|n| i32::try_from(n).ok())
2086                .ok_or(EvalError::TypeMismatch {
2087                    detail: "INTERVAL ± INTERVAL months overflows i32".into(),
2088                })?;
2089            let new_micros = lhs_us
2090                .checked_add(signed_micros)
2091                .ok_or(EvalError::TypeMismatch {
2092                    detail: "INTERVAL ± INTERVAL micros overflows i64".into(),
2093                })?;
2094            Ok(Some(Value::Interval {
2095                months: new_months,
2096                micros: new_micros,
2097            }))
2098        }
2099        _ => Err(EvalError::TypeMismatch {
2100            detail: format!(
2101                "operator {op:?} not defined for {:?} and INTERVAL",
2102                lhs.data_type()
2103            ),
2104        }),
2105    }
2106}
2107
2108/// Shift a `Date` by a signed number of months using the PG clamp rule.
2109fn shift_date_by_months(d: i32, months: i64) -> Result<i32, EvalError> {
2110    let (y, m, day) = civil_from_days(d);
2111    let months_i32 = i32::try_from(months).map_err(|_| EvalError::TypeMismatch {
2112        detail: "INTERVAL months delta out of i32 range".into(),
2113    })?;
2114    let (ny, nm, nd) = add_months_to_civil(y, m, day, months_i32);
2115    Ok(days_from_civil(ny, nm, nd))
2116}
2117
2118/// Add (months, micros) to a `Timestamp` (microseconds since epoch).
2119/// Months part is applied through civil calendar with clamp-to-last-day;
2120/// micros part is plain i64 addition with overflow guard.
2121fn add_interval_to_micros(t: i64, months: i64, micros: i64) -> Result<i64, EvalError> {
2122    let mut out = t;
2123    if months != 0 {
2124        const MICROS_PER_DAY: i64 = 86_400_000_000;
2125        let days = out.div_euclid(MICROS_PER_DAY);
2126        let day_micros = out.rem_euclid(MICROS_PER_DAY);
2127        let day_i32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
2128            detail: "TIMESTAMP day component out of i32 range for INTERVAL months math".into(),
2129        })?;
2130        let shifted_days = shift_date_by_months(day_i32, months)?;
2131        out = i64::from(shifted_days)
2132            .checked_mul(MICROS_PER_DAY)
2133            .and_then(|n| n.checked_add(day_micros))
2134            .ok_or(EvalError::TypeMismatch {
2135                detail: "TIMESTAMP ± INTERVAL months overflows i64 microseconds".into(),
2136            })?;
2137    }
2138    out.checked_add(micros).ok_or(EvalError::TypeMismatch {
2139        detail: "TIMESTAMP ± INTERVAL micros overflows i64".into(),
2140    })
2141}
2142
2143/// Dispatch for any binary op when at least one operand is NUMERIC.
2144/// Other-side integers / floats are promoted to a NUMERIC at a common
2145/// scale; all add / sub / mul / div / compare paths stay in i128.
2146#[allow(clippy::needless_pass_by_value)] // mirrors `apply_binary`'s by-value calling convention
2147fn apply_binary_numeric(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
2148    // Float still wins — Numeric + Float coerces both to f64 and runs
2149    // through the float path. PG demotes Numeric to float in this mix
2150    // too (the documented behaviour for `numeric + double precision`).
2151    let float_path = matches!(l, Value::Float(_)) || matches!(r, Value::Float(_));
2152    if float_path {
2153        let af = as_f64(&l)?;
2154        let bf = as_f64(&r)?;
2155        return match op {
2156            BinOp::Add => Ok(Value::Float(af + bf)),
2157            BinOp::Sub => Ok(Value::Float(af - bf)),
2158            BinOp::Mul => Ok(Value::Float(af * bf)),
2159            BinOp::Div => {
2160                if bf == 0.0 {
2161                    Err(EvalError::DivisionByZero)
2162                } else {
2163                    Ok(Value::Float(af / bf))
2164                }
2165            }
2166            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
2167                let ord = af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
2168                    detail: "NaN in NUMERIC/Float comparison".into(),
2169                })?;
2170                Ok(Value::Bool(cmp_to_bool(op, ord)))
2171            }
2172            BinOp::Concat => Ok(text_concat(&l, &r)),
2173            other => Err(EvalError::TypeMismatch {
2174                detail: format!("operator {other:?} not defined for NUMERIC and Float"),
2175            }),
2176        };
2177    }
2178    // Promote integer ↔ numeric to a shared scale (max of both sides).
2179    let (a, sa) = numeric_or_widen(&l).ok_or_else(|| EvalError::TypeMismatch {
2180        detail: format!("NUMERIC op against non-numeric {:?}", l.data_type()),
2181    })?;
2182    let (b, sb) = numeric_or_widen(&r).ok_or_else(|| EvalError::TypeMismatch {
2183        detail: format!("NUMERIC op against non-numeric {:?}", r.data_type()),
2184    })?;
2185    match op {
2186        BinOp::Add | BinOp::Sub => {
2187            let target_scale = sa.max(sb);
2188            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
2189                detail: "NUMERIC overflow on rescale".into(),
2190            })?;
2191            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
2192                detail: "NUMERIC overflow on rescale".into(),
2193            })?;
2194            let r = match op {
2195                BinOp::Add => lhs.checked_add(rhs),
2196                BinOp::Sub => lhs.checked_sub(rhs),
2197                _ => unreachable!(),
2198            }
2199            .ok_or(EvalError::TypeMismatch {
2200                detail: "NUMERIC overflow on +/-".into(),
2201            })?;
2202            Ok(Value::Numeric {
2203                scaled: r,
2204                scale: target_scale,
2205            })
2206        }
2207        BinOp::Mul => {
2208            let scaled = a.checked_mul(b).ok_or(EvalError::TypeMismatch {
2209                detail: "NUMERIC overflow on *".into(),
2210            })?;
2211            Ok(Value::Numeric {
2212                scaled,
2213                scale: sa.saturating_add(sb),
2214            })
2215        }
2216        BinOp::Div => {
2217            if b == 0 {
2218                return Err(EvalError::DivisionByZero);
2219            }
2220            // Result scale: keep the wider operand's scale. Pre-scale
2221            // the numerator so the integer division retains that many
2222            // fractional digits. Round half-away-from-zero.
2223            let target_scale = sa.max(sb);
2224            // Numerator effective scale becomes sa + target_scale; we
2225            // bring it up to (target_scale + sb) so the divisor's scale
2226            // cancels cleanly.
2227            let bump = pow10_i128(target_scale.saturating_add(sb).saturating_sub(sa));
2228            let num = a.checked_mul(bump).ok_or(EvalError::TypeMismatch {
2229                detail: "NUMERIC overflow on / scaling".into(),
2230            })?;
2231            let half = if b >= 0 { b / 2 } else { -(b / 2) };
2232            let adj = if (num >= 0) == (b >= 0) {
2233                num + half
2234            } else {
2235                num - half
2236            };
2237            Ok(Value::Numeric {
2238                scaled: adj / b,
2239                scale: target_scale,
2240            })
2241        }
2242        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
2243            let target_scale = sa.max(sb);
2244            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
2245                detail: "NUMERIC overflow on rescale".into(),
2246            })?;
2247            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
2248                detail: "NUMERIC overflow on rescale".into(),
2249            })?;
2250            Ok(Value::Bool(cmp_to_bool(op, lhs.cmp(&rhs))))
2251        }
2252        BinOp::Concat => Ok(text_concat(&l, &r)),
2253        other => Err(EvalError::TypeMismatch {
2254            detail: format!("operator {other:?} not defined for NUMERIC"),
2255        }),
2256    }
2257}
2258
2259/// Express `v` as a `(scaled_i128, scale)` pair. Plain integers come
2260/// back with `scale=0`; NUMERIC keeps its own scale. Anything else
2261/// returns `None` and the caller raises a type error.
2262fn numeric_or_widen(v: &Value) -> Option<(i128, u8)> {
2263    match v {
2264        Value::Numeric { scaled, scale } => Some((*scaled, *scale)),
2265        Value::Int(n) => Some((i128::from(*n), 0)),
2266        Value::SmallInt(n) => Some((i128::from(*n), 0)),
2267        Value::BigInt(n) => Some((i128::from(*n), 0)),
2268        _ => None,
2269    }
2270}
2271
/// Re-express the fixed-point integer `scaled` (with `src` fractional
/// digits) at `dst` fractional digits.
///
/// * `dst > src` multiplies by a power of ten; `None` if the result does
///   not fit in `i128`.
/// * `dst < src` divides by a power of ten, rounding half away from zero.
///   This direction never fails: the rounding is derived from the
///   quotient and remainder, so no intermediate sum can overflow, and a
///   scale gap too large for `i128` simply rounds to zero.
/// * Zero, and `src == dst`, are returned unchanged.
fn rescale(scaled: i128, src: u8, dst: u8) -> Option<i128> {
    if src == dst || scaled == 0 {
        return Some(scaled);
    }
    // `None` here means 10^gap itself exceeds i128 (gap >= 39).
    let factor = 10_i128.checked_pow(u32::from(src.abs_diff(dst)));
    if dst > src {
        return scaled.checked_mul(factor?);
    }
    let Some(drop) = factor else {
        // |scaled| < 2^127 < 10^39 / 2, so the value rounds to zero.
        return Some(0);
    };
    let quotient = scaled / drop;
    let remainder = scaled % drop;
    // |remainder| < drop <= 10^38, so doubling stays well inside u128.
    if remainder.unsigned_abs() * 2 >= drop.unsigned_abs() {
        Some(quotient + scaled.signum())
    } else {
        Some(quotient)
    }
}
2289
/// `10^p` as an `i128`. Exponents up to 38 fit; larger ones overflow
/// `i128` with the usual integer-overflow behaviour of the build.
const fn pow10_i128(p: u8) -> i128 {
    // `u32::from` is not callable in a `const fn`; u8 → u32 is lossless.
    #[allow(clippy::cast_lossless)]
    let exponent = p as u32;
    10_i128.pow(exponent)
}
2299
2300const fn cmp_to_bool(op: BinOp, ord: core::cmp::Ordering) -> bool {
2301    use core::cmp::Ordering::{Equal, Greater, Less};
2302    match op {
2303        BinOp::Eq => matches!(ord, Equal),
2304        BinOp::NotEq => !matches!(ord, Equal),
2305        BinOp::Lt => matches!(ord, Less),
2306        BinOp::LtEq => matches!(ord, Less | Equal),
2307        BinOp::Gt => matches!(ord, Greater),
2308        BinOp::GtEq => matches!(ord, Greater | Equal),
2309        _ => false,
2310    }
2311}
2312
2313/// SQL `||` string concatenation. Operands are coerced to text via the same
2314/// rule as `::text` cast. NULL propagates (handled above; this function only
2315/// runs with non-NULL operands).
2316fn text_concat(l: &Value, r: &Value) -> Value {
2317    let a = value_to_text(l);
2318    let b = value_to_text(r);
2319    Value::Text(a + &b)
2320}
2321
2322/// pgvector inner-product `<#>`. Returns the *negative* dot product so
2323/// smaller still means more similar — same convention as pgvector.
2324fn inner_product(l: Value, r: Value) -> Result<Value, EvalError> {
2325    let (a, b) = unwrap_vec_pair(l, r, "<#>")?;
2326    let mut dot: f64 = 0.0;
2327    for (x, y) in a.iter().zip(b.iter()) {
2328        dot += f64::from(*x) * f64::from(*y);
2329    }
2330    Ok(Value::Float(-dot))
2331}
2332
2333/// pgvector cosine distance `<=>` — `1 - (a·b) / (‖a‖ ‖b‖)`. A zero-norm
2334/// operand produces NaN (matches pgvector).
2335fn cosine_distance(l: Value, r: Value) -> Result<Value, EvalError> {
2336    let (a, b) = unwrap_vec_pair(l, r, "<=>")?;
2337    let mut dot: f64 = 0.0;
2338    let mut na: f64 = 0.0;
2339    let mut nb: f64 = 0.0;
2340    for (x, y) in a.iter().zip(b.iter()) {
2341        let xf = f64::from(*x);
2342        let yf = f64::from(*y);
2343        dot += xf * yf;
2344        na += xf * xf;
2345        nb += yf * yf;
2346    }
2347    let denom = sqrt_newton(na) * sqrt_newton(nb);
2348    if denom == 0.0 {
2349        return Ok(Value::Float(f64::NAN));
2350    }
2351    Ok(Value::Float(1.0 - dot / denom))
2352}
2353
2354fn unwrap_vec_pair(l: Value, r: Value, op: &str) -> Result<(Vec<f32>, Vec<f32>), EvalError> {
2355    // v6.0.1: SQ8 cells coming through the SQL evaluator are
2356    // dequantised to f32 here so the existing scalar distance
2357    // arithmetic stays intact. HNSW kNN search continues to use
2358    // the asymmetric ADC variant inside `cell_to_query_metric_
2359    // distance` — this path only runs when a vector expression
2360    // lands in the evaluator (full-scan ORDER BY, SELECT
2361    // projection of `v <-> $1`, etc.).
2362    let to_f32 = |v: Value| -> Option<Vec<f32>> {
2363        match v {
2364            Value::Vector(a) => Some(a),
2365            Value::Sq8Vector(q) => Some(spg_storage::quantize::dequantize(&q)),
2366            // v6.0.3: bit-exact dequant for halfvec cells.
2367            Value::HalfVector(h) => Some(h.to_f32_vec()),
2368            _ => None,
2369        }
2370    };
2371    let l_ty = l.data_type();
2372    let r_ty = r.data_type();
2373    match (to_f32(l), to_f32(r)) {
2374        (Some(a), Some(b)) => {
2375            if a.len() != b.len() {
2376                return Err(EvalError::TypeMismatch {
2377                    detail: format!("vector dim mismatch in {op}: {} vs {}", a.len(), b.len()),
2378                });
2379            }
2380            Ok((a, b))
2381        }
2382        _ => Err(EvalError::TypeMismatch {
2383            detail: format!("{op} requires two vectors, got {l_ty:?} and {r_ty:?}"),
2384        }),
2385    }
2386}
2387
2388/// Numeric arithmetic with widening.
2389/// - both `Int` → `Int` (with overflow check)
2390/// - `Int` op `BigInt` (either side) → `BigInt`
2391/// - any `Float` involved → `Float`
2392fn arith(
2393    l: Value,
2394    r: Value,
2395    int_op: impl Fn(i64, i64) -> Option<i64>,
2396    float_op: impl Fn(f64, f64) -> f64,
2397    op_name: &str,
2398) -> Result<Value, EvalError> {
2399    // Widen SmallInt to Int up front so the rest of the arithmetic
2400    // table only deals with Int / BigInt / Float pairs.
2401    let widen = |v: Value| -> Value {
2402        match v {
2403            Value::SmallInt(n) => Value::Int(i32::from(n)),
2404            other => other,
2405        }
2406    };
2407    let l = widen(l);
2408    let r = widen(r);
2409    match (l, r) {
2410        (Value::Int(a), Value::Int(b)) => {
2411            let result = int_op(i64::from(a), i64::from(b)).ok_or(EvalError::TypeMismatch {
2412                detail: format!("integer overflow on {op_name}"),
2413            })?;
2414            if let Ok(small) = i32::try_from(result) {
2415                Ok(Value::Int(small))
2416            } else {
2417                Ok(Value::BigInt(result))
2418            }
2419        }
2420        (Value::Int(a), Value::BigInt(b)) | (Value::BigInt(b), Value::Int(a)) => {
2421            let result = int_op(i64::from(a), b).ok_or(EvalError::TypeMismatch {
2422                detail: format!("bigint overflow on {op_name}"),
2423            })?;
2424            Ok(Value::BigInt(result))
2425        }
2426        (Value::BigInt(a), Value::BigInt(b)) => {
2427            let result = int_op(a, b).ok_or(EvalError::TypeMismatch {
2428                detail: format!("bigint overflow on {op_name}"),
2429            })?;
2430            Ok(Value::BigInt(result))
2431        }
2432        (a, b)
2433            if a.data_type() == Some(DataType::Float) || b.data_type() == Some(DataType::Float) =>
2434        {
2435            let af = as_f64(&a)?;
2436            let bf = as_f64(&b)?;
2437            Ok(Value::Float(float_op(af, bf)))
2438        }
2439        (a, b) => Err(EvalError::TypeMismatch {
2440            detail: format!(
2441                "{op_name} applied to non-numeric: {:?} vs {:?}",
2442                a.data_type(),
2443                b.data_type()
2444            ),
2445        }),
2446    }
2447}
2448
2449/// L2 (Euclidean) distance between two vectors of equal dimension.
2450/// Returned as `Value::Float(d)` so it composes with the existing
2451/// comparison / sort plumbing. Mismatched dims or non-vector operands
2452/// raise `TypeMismatch`.
2453#[allow(clippy::many_single_char_names)] // l, r, a, b, d are the natural names
2454fn l2_distance(l: Value, r: Value) -> Result<Value, EvalError> {
2455    // v6.0.1: route both operands through `unwrap_vec_pair` so SQ8
2456    // cells dequantise on the way in. Sub-f64 precision loss is
2457    // negligible vs the dequantisation noise the SQ8 path already
2458    // ships with.
2459    let (a, b) = unwrap_vec_pair(l, r, "<->")?;
2460    let mut sum: f64 = 0.0;
2461    for (x, y) in a.iter().zip(b.iter()) {
2462        let d = f64::from(*x) - f64::from(*y);
2463        sum += d * d;
2464    }
2465    Ok(Value::Float(sqrt_newton(sum)))
2466}
2467
/// Self-built `sqrt` for `f64`, used because the engine is `no_std` and
/// cannot call the `std` implementation.
///
/// Newton-Raphson iteration, seeded by halving the exponent of `x` through
/// its bit pattern. That seed is already close to the root for normal
/// inputs, so a handful of rounds reach full precision across the whole
/// `f64` range. (A fixed number of rounds seeded with `g = x` only converges
/// for inputs within a few orders of magnitude of 1.)
///
/// Special cases:
/// - `x <= 0.0` returns `0.0` (negative inputs are clamped, not NaN);
/// - `NaN` returns `NaN`;
/// - `+∞` returns `+∞`.
fn sqrt_newton(x: f64) -> f64 {
    // Upper bound on refinement rounds; only subnormal inputs, whose seed is
    // far from the root, need more than a few.
    const MAX_ROUNDS: u32 = 64;

    if x.is_nan() {
        return x;
    }
    if x <= 0.0 {
        return 0.0;
    }
    if x == f64::INFINITY {
        return x;
    }

    // Halving the raw bits halves the biased exponent; adding half the bias
    // (1023 << 51) restores it, giving a rough estimate of sqrt(x).
    let seed = f64::from_bits((x.to_bits() >> 1) + 0x1FF8_0000_0000_0000);

    // One unconditional step puts the estimate at or above the true root
    // (arithmetic mean >= geometric mean); from there the iterates decrease
    // monotonically, so the first non-decreasing step signals convergence.
    let mut g = 0.5 * (seed + x / seed);
    for _ in 0..MAX_ROUNDS {
        let next = 0.5 * (g + x / g);
        if next >= g {
            break;
        }
        g = next;
    }
    g
}
2484
2485fn div_op(l: Value, r: Value) -> Result<Value, EvalError> {
2486    let any_float = matches!(l.data_type(), Some(DataType::Float))
2487        || matches!(r.data_type(), Some(DataType::Float));
2488    if any_float {
2489        let a = as_f64(&l)?;
2490        let b = as_f64(&r)?;
2491        if b == 0.0 {
2492            return Err(EvalError::DivisionByZero);
2493        }
2494        return Ok(Value::Float(a / b));
2495    }
2496    arith(
2497        l,
2498        r,
2499        |a, b| {
2500            if b == 0 { None } else { Some(a / b) }
2501        },
2502        |a, b| a / b,
2503        "/",
2504    )
2505    .map_err(|e| match e {
2506        // The closure returns None on b == 0; translate that into the dedicated
2507        // DivisionByZero variant instead of "integer overflow on /".
2508        EvalError::TypeMismatch { detail } if detail.contains('/') => EvalError::DivisionByZero,
2509        other => other,
2510    })
2511}
2512
2513fn as_f64(v: &Value) -> Result<f64, EvalError> {
2514    match v {
2515        Value::SmallInt(n) => Ok(f64::from(*n)),
2516        Value::Int(n) => Ok(f64::from(*n)),
2517        #[allow(clippy::cast_precision_loss)]
2518        Value::BigInt(n) => Ok(*n as f64),
2519        Value::Float(x) => Ok(*x),
2520        #[allow(clippy::cast_precision_loss)]
2521        Value::Numeric { scaled, scale } => {
2522            let mut div = 1.0_f64;
2523            for _ in 0..*scale {
2524                div *= 10.0;
2525            }
2526            Ok((*scaled as f64) / div)
2527        }
2528        other => Err(EvalError::TypeMismatch {
2529            detail: format!("cannot convert {:?} to FLOAT", other.data_type()),
2530        }),
2531    }
2532}
2533
2534fn compare(op: BinOp, l: &Value, r: &Value) -> Result<Value, EvalError> {
2535    let ord = match (l, r) {
2536        (Value::Int(a), Value::Int(b)) => i64::from(*a).cmp(&i64::from(*b)),
2537        (Value::Int(a), Value::BigInt(b)) => i64::from(*a).cmp(b),
2538        (Value::BigInt(a), Value::Int(b)) => a.cmp(&i64::from(*b)),
2539        (Value::BigInt(a), Value::BigInt(b)) => a.cmp(b),
2540        (a, b)
2541            if matches!(a.data_type(), Some(DataType::Float))
2542                || matches!(b.data_type(), Some(DataType::Float)) =>
2543        {
2544            let af = as_f64(a)?;
2545            let bf = as_f64(b)?;
2546            af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
2547                detail: "NaN in comparison".into(),
2548            })?
2549        }
2550        (Value::Text(a), Value::Text(b)) => a.cmp(b),
2551        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
2552        // Date / Timestamp compare on their integer storage repr.
2553        // Cross-domain (Date vs Timestamp) lifts the Date to the
2554        // matching midnight TIMESTAMP first.
2555        (Value::Date(a), Value::Date(b)) => a.cmp(b),
2556        (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
2557        (Value::Date(a), Value::Timestamp(b)) => (i64::from(*a) * 86_400_000_000).cmp(b),
2558        (Value::Timestamp(a), Value::Date(b)) => a.cmp(&(i64::from(*b) * 86_400_000_000)),
2559        // PG-style implicit coercion: comparing a DATE / TIMESTAMP
2560        // column against a text literal lifts the literal into the
2561        // matching domain (e.g. `day >= '2024-01-01'`).
2562        (Value::Date(a), Value::Text(b)) => {
2563            let bd = parse_date_literal(b).ok_or_else(|| EvalError::TypeMismatch {
2564                detail: format!("cannot parse {b:?} as DATE for comparison"),
2565            })?;
2566            a.cmp(&bd)
2567        }
2568        (Value::Text(a), Value::Date(b)) => {
2569            let ad = parse_date_literal(a).ok_or_else(|| EvalError::TypeMismatch {
2570                detail: format!("cannot parse {a:?} as DATE for comparison"),
2571            })?;
2572            ad.cmp(b)
2573        }
2574        (Value::Timestamp(a), Value::Text(b)) => {
2575            let bt = parse_timestamp_literal(b).ok_or_else(|| EvalError::TypeMismatch {
2576                detail: format!("cannot parse {b:?} as TIMESTAMP for comparison"),
2577            })?;
2578            a.cmp(&bt)
2579        }
2580        (Value::Text(a), Value::Timestamp(b)) => {
2581            let at = parse_timestamp_literal(a).ok_or_else(|| EvalError::TypeMismatch {
2582                detail: format!("cannot parse {a:?} as TIMESTAMP for comparison"),
2583            })?;
2584            at.cmp(b)
2585        }
2586        (a, b) => {
2587            return Err(EvalError::TypeMismatch {
2588                detail: format!(
2589                    "comparison between {:?} and {:?}",
2590                    a.data_type(),
2591                    b.data_type()
2592                ),
2593            });
2594        }
2595    };
2596    let result = match op {
2597        BinOp::Eq => ord.is_eq(),
2598        BinOp::NotEq => !ord.is_eq(),
2599        BinOp::Lt => ord.is_lt(),
2600        BinOp::LtEq => ord.is_le(),
2601        BinOp::Gt => ord.is_gt(),
2602        BinOp::GtEq => ord.is_ge(),
2603        BinOp::And
2604        | BinOp::Or
2605        | BinOp::Add
2606        | BinOp::Sub
2607        | BinOp::Mul
2608        | BinOp::Div
2609        | BinOp::L2Distance
2610        | BinOp::InnerProduct
2611        | BinOp::CosineDistance
2612        | BinOp::Concat
2613        | BinOp::JsonGet
2614        | BinOp::JsonGetText
2615        | BinOp::JsonGetPath
2616        | BinOp::JsonGetPathText
2617        | BinOp::JsonContains
2618        | BinOp::IsDistinctFrom
2619        | BinOp::IsNotDistinctFrom => {
2620            unreachable!("compare() only called with comparison ops")
2621        }
2622    };
2623    Ok(Value::Bool(result))
2624}
2625
2626// SQL three-valued AND / OR.
2627fn and_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
2628    match (l, r) {
2629        (Value::Bool(false), _) | (_, Value::Bool(false)) => Ok(Value::Bool(false)),
2630        (Value::Bool(true), Value::Bool(true)) => Ok(Value::Bool(true)),
2631        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
2632        (a, b) => Err(EvalError::TypeMismatch {
2633            detail: format!(
2634                "AND on non-boolean: {:?} and {:?}",
2635                a.data_type(),
2636                b.data_type()
2637            ),
2638        }),
2639    }
2640}
2641
2642fn or_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
2643    match (l, r) {
2644        (Value::Bool(true), _) | (_, Value::Bool(true)) => Ok(Value::Bool(true)),
2645        (Value::Bool(false), Value::Bool(false)) => Ok(Value::Bool(false)),
2646        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
2647        (a, b) => Err(EvalError::TypeMismatch {
2648            detail: format!(
2649                "OR on non-boolean: {:?} and {:?}",
2650                a.data_type(),
2651                b.data_type()
2652            ),
2653        }),
2654    }
2655}
2656
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::boxed::Box;
    use alloc::vec;
    use spg_storage::{ColumnSchema, Row};

    // ---- fixtures -------------------------------------------------------

    /// Column schema for `name` / `ty`, with the constructor's third
    /// argument fixed to `true`.
    fn col(name: &str, ty: DataType) -> ColumnSchema {
        ColumnSchema::new(name, ty, true)
    }

    /// Evaluation context over `cols` with an optional table alias.
    fn ctx<'a>(cols: &'a [ColumnSchema], alias: Option<&'a str>) -> EvalContext<'a> {
        EvalContext::new(cols, alias)
    }

    /// Integer literal expression.
    fn lit(n: i64) -> Expr {
        Expr::Literal(Literal::Integer(n))
    }

    /// NULL literal expression.
    fn null() -> Expr {
        Expr::Literal(Literal::Null)
    }

    /// Boolean literal expression.
    fn boolean(b: bool) -> Expr {
        Expr::Literal(Literal::Bool(b))
    }

    /// Unqualified column reference.
    fn col_ref(name: &str) -> Expr {
        Expr::Column(ColumnName {
            qualifier: None,
            name: name.into(),
        })
    }

    /// Column reference qualified with `qualifier`.
    fn qualified_ref(qualifier: &str, name: &str) -> Expr {
        Expr::Column(ColumnName {
            qualifier: Some(qualifier.into()),
            name: name.into(),
        })
    }

    /// Binary expression `lhs op rhs`.
    fn binary(lhs: Expr, op: BinOp, rhs: Expr) -> Expr {
        Expr::Binary {
            lhs: Box::new(lhs),
            op,
            rhs: Box::new(rhs),
        }
    }

    /// Evaluate an expression that references no columns, against an empty
    /// row and an empty schema.
    fn eval_const(e: &Expr) -> Result<Value, EvalError> {
        let row = Row::new(vec![]);
        let schema: [ColumnSchema; 0] = [];
        let context = ctx(&schema, None);
        eval_expr(e, &row, &context)
    }

    // ---- literals and column lookup -------------------------------------

    #[test]
    fn literal_evaluates_to_value() {
        assert_eq!(eval_const(&lit(42)).unwrap(), Value::Int(42));
        assert_eq!(
            eval_const(&Expr::Literal(Literal::Float(1.5))).unwrap(),
            Value::Float(1.5)
        );
        assert_eq!(eval_const(&null()).unwrap(), Value::Null);
    }

    #[test]
    fn column_lookup_unqualified() {
        let schema = vec![col("a", DataType::Int), col("b", DataType::Text)];
        let row = Row::new(vec![Value::Int(7), Value::Text("hi".into())]);
        let context = ctx(&schema, None);
        assert_eq!(
            eval_expr(&col_ref("a"), &row, &context).unwrap(),
            Value::Int(7)
        );
        assert_eq!(
            eval_expr(&col_ref("b"), &row, &context).unwrap(),
            Value::Text("hi".into())
        );
    }

    #[test]
    fn column_not_found_errors() {
        let schema = vec![col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(0)]);
        let context = ctx(&schema, None);
        let err = eval_expr(&col_ref("ghost"), &row, &context).unwrap_err();
        assert!(matches!(err, EvalError::ColumnNotFound { ref name } if name == "ghost"));
    }

    #[test]
    fn qualified_column_matches_alias() {
        let schema = vec![col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(5)]);
        let context = ctx(&schema, Some("u"));
        assert_eq!(
            eval_expr(&qualified_ref("u", "a"), &row, &context).unwrap(),
            Value::Int(5)
        );
    }

    #[test]
    fn qualified_column_unknown_alias_errors() {
        let schema = vec![col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(5)]);
        let context = ctx(&schema, Some("u"));
        let err = eval_expr(&qualified_ref("x", "a"), &row, &context).unwrap_err();
        assert!(matches!(err, EvalError::UnknownQualifier { .. }));
    }

    // ---- arithmetic and comparison --------------------------------------

    #[test]
    fn arithmetic_with_widening() {
        let sum = binary(lit(2), BinOp::Add, Expr::Literal(Literal::Float(0.5)));
        assert_eq!(eval_const(&sum).unwrap(), Value::Float(2.5));
    }

    #[test]
    fn division_by_zero_errors() {
        let quotient = binary(lit(1), BinOp::Div, lit(0));
        assert_eq!(
            eval_const(&quotient).unwrap_err(),
            EvalError::DivisionByZero
        );
    }

    #[test]
    fn comparison_returns_bool() {
        let less_than = binary(lit(1), BinOp::Lt, lit(2));
        assert_eq!(eval_const(&less_than).unwrap(), Value::Bool(true));
    }

    #[test]
    fn null_propagates_through_arithmetic() {
        let sum = binary(lit(1), BinOp::Add, null());
        assert_eq!(eval_const(&sum).unwrap(), Value::Null);
    }

    // ---- three-valued logic ---------------------------------------------

    #[test]
    fn and_three_valued_logic() {
        // FALSE AND NULL → FALSE
        let false_and_null = binary(boolean(false), BinOp::And, null());
        assert_eq!(eval_const(&false_and_null).unwrap(), Value::Bool(false));
        // TRUE AND NULL → NULL
        let true_and_null = binary(boolean(true), BinOp::And, null());
        assert_eq!(eval_const(&true_and_null).unwrap(), Value::Null);
        // TRUE AND TRUE → TRUE
        let true_and_true = binary(boolean(true), BinOp::And, boolean(true));
        assert_eq!(eval_const(&true_and_true).unwrap(), Value::Bool(true));
    }

    #[test]
    fn or_three_valued_logic() {
        // TRUE OR NULL → TRUE
        let true_or_null = binary(boolean(true), BinOp::Or, null());
        assert_eq!(eval_const(&true_or_null).unwrap(), Value::Bool(true));
        // FALSE OR NULL → NULL
        let false_or_null = binary(boolean(false), BinOp::Or, null());
        assert_eq!(eval_const(&false_or_null).unwrap(), Value::Null);
    }

    #[test]
    fn not_on_null_is_null() {
        let negated = Expr::Unary {
            op: UnOp::Not,
            expr: Box::new(null()),
        };
        assert_eq!(eval_const(&negated).unwrap(), Value::Null);
    }

    #[test]
    fn text_comparison_lexicographic() {
        let apple = Expr::Literal(Literal::String("apple".into()));
        let banana = Expr::Literal(Literal::String("banana".into()));
        let less_than = binary(apple, BinOp::Lt, banana);
        assert_eq!(eval_const(&less_than).unwrap(), Value::Bool(true));
    }

    // ---- intervals ------------------------------------------------------

    #[test]
    fn interval_format_basics() {
        assert_eq!(format_interval(0, 0), "0");
        assert_eq!(format_interval(0, 86_400_000_000), "1 day");
        assert_eq!(format_interval(0, -86_400_000_000), "-1 days");
        assert_eq!(format_interval(0, 3_600_000_000), "01:00:00");
        assert_eq!(
            format_interval(0, 86_400_000_000 + 9_000_000),
            "1 day 00:00:09"
        );
        assert_eq!(format_interval(14, 0), "1 year 2 mons");
        assert_eq!(format_interval(-1, 0), "-1 mons");
    }

    #[test]
    fn interval_add_to_timestamp_micros_part() {
        // 2024-01-01 00:00:00 + INTERVAL '1 hour' = 2024-01-01 01:00:00
        let midnight = i64::from(days_from_civil(2024, 1, 1)) * 86_400_000_000;
        let shifted = add_interval_to_micros(midnight, 0, 3_600_000_000).unwrap();
        assert_eq!(shifted, midnight + 3_600_000_000);
    }

    #[test]
    fn interval_clamp_month_end() {
        // 2024-01-31 + 1 month = 2024-02-29 (leap year).
        let start = days_from_civil(2024, 1, 31);
        let landed = civil_from_days(shift_date_by_months(start, 1).unwrap());
        assert_eq!(landed, (2024, 2, 29));
        // 2023-01-31 + 1 month = 2023-02-28 (non-leap).
        let start = days_from_civil(2023, 1, 31);
        let landed = civil_from_days(shift_date_by_months(start, 1).unwrap());
        assert_eq!(landed, (2023, 2, 28));
        // 2024-03-31 - 1 month = 2024-02-29.
        let start = days_from_civil(2024, 3, 31);
        let landed = civil_from_days(shift_date_by_months(start, -1).unwrap());
        assert_eq!(landed, (2024, 2, 29));
    }

    #[test]
    fn interval_date_plus_pure_days_stays_date() {
        // DATE + INTERVAL '7 days' must stay DATE.
        let date = Value::Date(days_from_civil(2024, 6, 1));
        let seven_days = Value::Interval {
            months: 0,
            micros: 7 * 86_400_000_000,
        };
        let sum = apply_binary_interval(BinOp::Add, &date, &seven_days)
            .unwrap()
            .unwrap();
        assert_eq!(sum, Value::Date(days_from_civil(2024, 6, 8)));
    }

    #[test]
    fn interval_date_plus_sub_day_lifts_to_timestamp() {
        // DATE + INTERVAL '1 hour' must lift to TIMESTAMP.
        let day = days_from_civil(2024, 6, 1);
        let date = Value::Date(day);
        let one_hour = Value::Interval {
            months: 0,
            micros: 3_600_000_000,
        };
        let sum = apply_binary_interval(BinOp::Add, &date, &one_hour)
            .unwrap()
            .unwrap();
        let expected = i64::from(day) * 86_400_000_000 + 3_600_000_000;
        assert_eq!(sum, Value::Timestamp(expected));
    }
}