Skip to main content

spg_engine/
eval.rs

1//! Expression evaluator. Given a parsed `Expr`, a `Row`, and the row's column
2//! schema, produce a `Value`. v0.4 implements:
3//!
4//! - literals
5//! - column lookups (bare and qualified `t.col`)
6//! - unary minus / NOT
7//! - binary arithmetic, comparison, AND, OR
8//! - numeric widening (`Int → BigInt → Float`) at evaluation time
9//! - SQL three-valued logic for NULL:
10//!     * any arithmetic / comparison op with a NULL operand → NULL
11//!     * `TRUE OR NULL` → TRUE, `FALSE OR NULL` → NULL,
12//!     * `FALSE AND NULL` → FALSE, `TRUE AND NULL` → NULL,
13//!     * `NOT NULL` → NULL
14//!
15//! v0.4 deliberately does *not* implement: function calls, string
16//! concatenation, IS NULL / IS NOT NULL, BETWEEN, IN, etc. Those come later.
17
18use alloc::format;
19use alloc::string::{String, ToString};
20use alloc::vec::Vec;
21
22use spg_sql::ast::{BinOp, CastTarget, ColumnName, Expr, Literal, UnOp};
23use spg_storage::{ColumnSchema, DataType, Row, Value};
24
25/// Resolution context for evaluating a single row. `table_alias` is the alias
26/// (or table name) callers should accept as the qualifier on a column ref —
27/// e.g. `FROM users AS u` makes `u.name` valid and rejects `other.name`.
28#[derive(Debug, Clone)]
29pub struct EvalContext<'a> {
30    pub columns: &'a [ColumnSchema],
31    pub table_alias: Option<&'a str>,
32    /// v6.1.1 — bound parameters for `$N` placeholders inside the
33    /// expression tree. Empty for simple queries; populated by the
34    /// prepared-statement Execute path with Bind values converted
35    /// to `Value`. Index N (1-based per PG) hits `params[N-1]`.
36    pub params: &'a [Value],
37}
38
39impl<'a> EvalContext<'a> {
40    pub const fn new(columns: &'a [ColumnSchema], table_alias: Option<&'a str>) -> Self {
41        Self {
42            columns,
43            table_alias,
44            params: &[],
45        }
46    }
47
48    /// v6.1.1 — attach a parameter buffer for `$N` placeholder
49    /// resolution. The slice must outlive the context; callers
50    /// construct it from the prepared statement's Bind values.
51    #[must_use]
52    pub const fn with_params(mut self, params: &'a [Value]) -> Self {
53        self.params = params;
54        self
55    }
56}
57
/// Failures the expression evaluator can report.
#[derive(Debug, Clone, PartialEq)]
pub enum EvalError {
    /// A column reference did not resolve; `name` is the missing column.
    ColumnNotFound { name: String },
    /// A `qualifier.column` reference used a qualifier that was not accepted.
    UnknownQualifier { qualifier: String },
    /// Division by zero.
    DivisionByZero,
    /// Operand / argument problem; `detail` carries the human-readable text.
    TypeMismatch { detail: String },
    /// v6.1.1 — a `$N` placeholder points past the bound parameters:
    /// either Bind supplied too few values, or the SQL contains a
    /// placeholder the prepared statement did not account for.
    PlaceholderOutOfRange { n: u16, bound: u16 },
}

impl core::fmt::Display for EvalError {
    /// Render the error as a single-line, user-facing message.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::DivisionByZero => f.write_str("division by zero"),
            Self::ColumnNotFound { name } => {
                write!(f, "column not found: {}", name)
            }
            Self::UnknownQualifier { qualifier } => {
                write!(f, "unknown table qualifier: {}", qualifier)
            }
            Self::TypeMismatch { detail } => {
                write!(f, "type mismatch: {}", detail)
            }
            Self::PlaceholderOutOfRange { n, bound } => {
                write!(
                    f,
                    "parameter ${} referenced but only {} bound by client",
                    n, bound
                )
            }
        }
    }
}
86
87pub fn eval_expr(expr: &Expr, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
88    match expr {
89        Expr::Literal(l) => Ok(literal_to_value(l)),
90        Expr::Column(c) => resolve_column(c, row, ctx),
91        Expr::Placeholder(n) => {
92            let idx = usize::from(*n).saturating_sub(1);
93            ctx.params
94                .get(idx)
95                .cloned()
96                .ok_or_else(|| EvalError::PlaceholderOutOfRange {
97                    n: *n,
98                    bound: u16::try_from(ctx.params.len()).unwrap_or(u16::MAX),
99                })
100        }
101        Expr::Unary { op, expr } => {
102            let v = eval_expr(expr, row, ctx)?;
103            apply_unary(*op, v)
104        }
105        Expr::Binary { lhs, op, rhs } => {
106            let l = eval_expr(lhs, row, ctx)?;
107            let r = eval_expr(rhs, row, ctx)?;
108            apply_binary(*op, l, r)
109        }
110        Expr::Cast { expr, target } => {
111            let v = eval_expr(expr, row, ctx)?;
112            cast_value(v, *target)
113        }
114        Expr::IsNull { expr, negated } => {
115            let v = eval_expr(expr, row, ctx)?;
116            let is_null = matches!(v, Value::Null);
117            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
118        }
119        Expr::FunctionCall { name, args } => {
120            let evaluated: Result<Vec<Value>, _> =
121                args.iter().map(|a| eval_expr(a, row, ctx)).collect();
122            apply_function(name, &evaluated?)
123        }
124        Expr::Like {
125            expr,
126            pattern,
127            negated,
128        } => {
129            let v = eval_expr(expr, row, ctx)?;
130            let p = eval_expr(pattern, row, ctx)?;
131            // NULL on either side propagates to NULL — same as PG.
132            let (text, pat) = match (v, p) {
133                (Value::Null, _) | (_, Value::Null) => return Ok(Value::Null),
134                (Value::Text(a), Value::Text(b)) => (a, b),
135                (Value::Text(_), other) | (other, _) => {
136                    return Err(EvalError::TypeMismatch {
137                        detail: format!("LIKE requires text operands, got {:?}", other.data_type()),
138                    });
139                }
140            };
141            let m = like_match(&text, &pat);
142            Ok(Value::Bool(if *negated { !m } else { m }))
143        }
144        Expr::Extract { field, source } => {
145            let v = eval_expr(source, row, ctx)?;
146            extract_field(*field, &v)
147        }
148        // v4.10: subquery nodes should have been resolved into
149        // Literal / Binary-Eq-OR chains by Engine::resolve_select_subqueries
150        // before the row loop. Anything reaching here is a bug.
151        Expr::ScalarSubquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. } => {
152            Err(EvalError::TypeMismatch {
153                detail: "subquery reached row eval — engine resolver bug".into(),
154            })
155        }
156        // v4.12: window functions should have been rewritten into
157        // synthetic __win_N column references by
158        // exec_select_with_window before row eval. Anything
159        // reaching here is similarly a bug.
160        Expr::WindowFunction { .. } => Err(EvalError::TypeMismatch {
161            detail: "window function reached row eval — engine rewrite bug".into(),
162        }),
163    }
164}
165
166/// Pull an integer component (year / month / ... / microsecond) out
167/// of a `DATE` or `TIMESTAMP`. Returns NULL on a NULL source, errors
168/// when the source isn't a calendar type.
169fn extract_field(field: spg_sql::ast::ExtractField, v: &Value) -> Result<Value, EvalError> {
170    use spg_sql::ast::ExtractField as F;
171    if matches!(v, Value::Null) {
172        return Ok(Value::Null);
173    }
174    // INTERVAL has its own decomposition — `YEAR` / `MONTH` come from
175    // the months part, the rest from the microseconds part. PG matches
176    // this convention (months is normalised modulo 12 for MONTH).
177    if let Value::Interval { months, micros } = *v {
178        let years = months / 12;
179        let mons = months % 12;
180        let secs_total = micros / 1_000_000;
181        let frac = micros % 1_000_000;
182        let result = match field {
183            F::Year => i64::from(years),
184            F::Month => i64::from(mons),
185            F::Day => micros / 86_400_000_000,
186            F::Hour => (secs_total / 3600) % 24,
187            F::Minute => (secs_total / 60) % 60,
188            F::Second => secs_total % 60,
189            F::Microsecond => (secs_total % 60) * 1_000_000 + frac,
190        };
191        return Ok(Value::BigInt(result));
192    }
193    let (days, day_micros) = match *v {
194        Value::Date(d) => (d, 0_i64),
195        Value::Timestamp(t) => {
196            let days = t.div_euclid(86_400_000_000);
197            let day_micros = t.rem_euclid(86_400_000_000);
198            (i32::try_from(days).unwrap_or(i32::MAX), day_micros)
199        }
200        _ => {
201            return Err(EvalError::TypeMismatch {
202                detail: format!(
203                    "EXTRACT requires DATE / TIMESTAMP / INTERVAL, got {:?}",
204                    v.data_type()
205                ),
206            });
207        }
208    };
209    let (y, m, d) = civil_components(days);
210    let secs = day_micros / 1_000_000;
211    let hh = secs / 3600;
212    let mm = (secs / 60) % 60;
213    let ss = secs % 60;
214    let frac = day_micros % 1_000_000;
215    let result = match field {
216        F::Year => i64::from(y),
217        F::Month => i64::from(m),
218        F::Day => i64::from(d),
219        F::Hour => hh,
220        F::Minute => mm,
221        F::Second => ss,
222        F::Microsecond => ss * 1_000_000 + frac,
223    };
224    Ok(Value::BigInt(result))
225}
226
227/// Internal wrapper around the file-private `civil_from_days` so the
228/// public surface area doesn't change. Returns `(year, month, day)`.
229fn civil_components(days: i32) -> (i32, u32, u32) {
230    civil_from_days(days)
231}
232
233/// SQL `LIKE` matcher. Wildcards are `%` (any run, possibly empty) and `_`
234/// (exactly one char). `\` escapes the next pattern char so `\%` matches a
235/// literal `%`. Matches the whole input — no implicit anchoring needed
236/// since SQL `LIKE` is always full-string.
237fn like_match(text: &str, pattern: &str) -> bool {
238    let text: Vec<char> = text.chars().collect();
239    let pat: Vec<char> = pattern.chars().collect();
240    like_match_inner(&text, 0, &pat, 0)
241}
242
/// Core `LIKE` matcher: does `text[ti..]` match `pat[pi..]` in full?
///
/// Pattern syntax: `%` matches any run (possibly empty), `_` matches
/// exactly one character, `\` followed by a character matches that
/// character literally. A trailing lone `\` matches a literal backslash.
///
/// Runs iteratively with a single resume point (the most recent `%`)
/// instead of recursing on every possible split. Re-trying only the
/// latest `%` is sufficient because any earlier `%` already matched at
/// its leftmost feasible position and a later `%` can absorb the
/// difference. This bounds the work at roughly `text.len() * pat.len()`
/// steps, where the recursive formulation was exponential on patterns
/// such as `%a%a%a…%b`.
fn like_match_inner(text: &[char], mut ti: usize, pat: &[char], mut pi: usize) -> bool {
    // (pattern index just past the last `%` run, text index that run
    // currently hands over to the rest of the pattern).
    let mut resume: Option<(usize, usize)> = None;
    loop {
        let stepped = match pat.get(pi).copied() {
            None => {
                if ti == text.len() {
                    return true;
                }
                // Pattern exhausted with text left over.
                false
            }
            Some('%') => {
                // Collapse consecutive `%`.
                while pi < pat.len() && pat[pi] == '%' {
                    pi += 1;
                }
                if pi == pat.len() {
                    // Trailing `%` swallows whatever remains.
                    return true;
                }
                resume = Some((pi, ti));
                continue;
            }
            Some('_') => {
                if ti < text.len() {
                    ti += 1;
                    pi += 1;
                    true
                } else {
                    false
                }
            }
            Some('\\') if pi + 1 < pat.len() => {
                if text.get(ti) == Some(&pat[pi + 1]) {
                    ti += 1;
                    pi += 2;
                    true
                } else {
                    false
                }
            }
            Some(literal) => {
                if text.get(ti) == Some(&literal) {
                    ti += 1;
                    pi += 1;
                    true
                } else {
                    false
                }
            }
        };
        if stepped {
            continue;
        }
        // Mismatch: let the last `%` absorb one more character and retry
        // the remainder of the pattern from there.
        match resume {
            Some((after_percent, tried)) if tried < text.len() => {
                pi = after_percent;
                ti = tried + 1;
                resume = Some((after_percent, ti));
            }
            _ => return false,
        }
    }
}
287
288/// Dispatch on lowercased function name. v1.4 implements only a handful of
289/// scalar functions; aggregates land in v1.5 alongside GROUP BY.
290fn apply_function(name: &str, args: &[Value]) -> Result<Value, EvalError> {
291    match name.to_ascii_lowercase().as_str() {
292        "length" => {
293            if args.len() != 1 {
294                return Err(EvalError::TypeMismatch {
295                    detail: format!("length() takes 1 arg, got {}", args.len()),
296                });
297            }
298            match &args[0] {
299                Value::Null => Ok(Value::Null),
300                Value::Text(s) => {
301                    let n = i32::try_from(s.chars().count()).unwrap_or(i32::MAX);
302                    Ok(Value::Int(n))
303                }
304                // v7.10.4 — PG semantics: length(bytea) returns
305                // byte count (= octet_length). Without this branch
306                // mailrs's INSERT … SELECT length(body) … against a
307                // BYTEA column would type-mismatch.
308                Value::Bytes(b) => {
309                    let n = i32::try_from(b.len()).unwrap_or(i32::MAX);
310                    Ok(Value::Int(n))
311                }
312                other => Err(EvalError::TypeMismatch {
313                    detail: format!("length() needs text or bytea, got {:?}", other.data_type()),
314                }),
315            }
316        }
317        // v7.10.4 — `OCTET_LENGTH(x)` returns byte count for both
318        // TEXT (UTF-8 byte length) and BYTEA. PG-spec name; aliases
319        // to length() for bytea by design.
320        "octet_length" => {
321            if args.len() != 1 {
322                return Err(EvalError::TypeMismatch {
323                    detail: format!("octet_length() takes 1 arg, got {}", args.len()),
324                });
325            }
326            match &args[0] {
327                Value::Null => Ok(Value::Null),
328                Value::Text(s) => {
329                    let n = i32::try_from(s.len()).unwrap_or(i32::MAX);
330                    Ok(Value::Int(n))
331                }
332                Value::Bytes(b) => {
333                    let n = i32::try_from(b.len()).unwrap_or(i32::MAX);
334                    Ok(Value::Int(n))
335                }
336                other => Err(EvalError::TypeMismatch {
337                    detail: format!(
338                        "octet_length() needs text or bytea, got {:?}",
339                        other.data_type()
340                    ),
341                }),
342            }
343        }
344        "upper" => {
345            if args.len() != 1 {
346                return Err(EvalError::TypeMismatch {
347                    detail: format!("upper() takes 1 arg, got {}", args.len()),
348                });
349            }
350            match &args[0] {
351                Value::Null => Ok(Value::Null),
352                Value::Text(s) => Ok(Value::Text(s.to_uppercase())),
353                other => Err(EvalError::TypeMismatch {
354                    detail: format!("upper() needs text, got {:?}", other.data_type()),
355                }),
356            }
357        }
358        "lower" => {
359            if args.len() != 1 {
360                return Err(EvalError::TypeMismatch {
361                    detail: format!("lower() takes 1 arg, got {}", args.len()),
362                });
363            }
364            match &args[0] {
365                Value::Null => Ok(Value::Null),
366                Value::Text(s) => Ok(Value::Text(s.to_lowercase())),
367                other => Err(EvalError::TypeMismatch {
368                    detail: format!("lower() needs text, got {:?}", other.data_type()),
369                }),
370            }
371        }
372        "abs" => {
373            if args.len() != 1 {
374                return Err(EvalError::TypeMismatch {
375                    detail: format!("abs() takes 1 arg, got {}", args.len()),
376                });
377            }
378            match &args[0] {
379                Value::Null => Ok(Value::Null),
380                Value::Int(n) => Ok(Value::Int(n.wrapping_abs())),
381                Value::BigInt(n) => Ok(Value::BigInt(n.wrapping_abs())),
382                Value::Float(x) => Ok(Value::Float(x.abs())),
383                other => Err(EvalError::TypeMismatch {
384                    detail: format!("abs() needs numeric, got {:?}", other.data_type()),
385                }),
386            }
387        }
388        "coalesce" => {
389            for a in args {
390                if !matches!(a, Value::Null) {
391                    return Ok(a.clone());
392                }
393            }
394            Ok(Value::Null)
395        }
396        "date_trunc" => date_trunc(args),
397        "date_part" => date_part(args),
398        "age" => age(args),
399        "to_char" => to_char(args),
400        // v6.4.3 — encode/decode + error_on_null SQL function bundle.
401        "encode" => encode_text(args),
402        "decode" => decode_text(args),
403        "error_on_null" => error_on_null(args),
404        other => Err(EvalError::TypeMismatch {
405            detail: format!("unknown function `{other}`"),
406        }),
407    }
408}
409
410/// v6.4.3 — `encode(bytes_as_text, format)`. PG works on bytea
411/// arguments; SPG's value space treats Text as the byte container
412/// (raw UTF-8 bytes). Supported formats: base64 (PG default),
413/// base64url (RFC 4648 §5), base32hex (RFC 4648 §7 extended-hex),
414/// hex.
415fn encode_text(args: &[Value]) -> Result<Value, EvalError> {
416    if args.len() != 2 {
417        return Err(EvalError::TypeMismatch {
418            detail: format!("encode() takes 2 args, got {}", args.len()),
419        });
420    }
421    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
422        return Ok(Value::Null);
423    }
424    let bytes: &[u8] = match &args[0] {
425        Value::Text(s) => s.as_bytes(),
426        other => {
427            return Err(EvalError::TypeMismatch {
428                detail: format!(
429                    "encode() expects text bytes, got {:?}",
430                    other.data_type()
431                ),
432            });
433        }
434    };
435    let fmt = match &args[1] {
436        Value::Text(s) => s.to_ascii_lowercase(),
437        other => {
438            return Err(EvalError::TypeMismatch {
439                detail: format!(
440                    "encode() format must be text, got {:?}",
441                    other.data_type()
442                ),
443            });
444        }
445    };
446    let out = match fmt.as_str() {
447        "base64" => b64_encode(bytes, B64_STD),
448        "base64url" => b64_encode(bytes, B64_URL),
449        "base32hex" => b32hex_encode(bytes),
450        "hex" => hex_encode(bytes),
451        other => {
452            return Err(EvalError::TypeMismatch {
453                detail: format!("encode(): unknown format `{other}`"),
454            });
455        }
456    };
457    Ok(Value::Text(out))
458}
459
460/// v6.4.3 — `decode(text, format)`. Inverse of `encode`; returns
461/// Text containing the raw decoded bytes (caller may CAST to bytea
462/// equivalent if SPG adds bytea later).
463fn decode_text(args: &[Value]) -> Result<Value, EvalError> {
464    if args.len() != 2 {
465        return Err(EvalError::TypeMismatch {
466            detail: format!("decode() takes 2 args, got {}", args.len()),
467        });
468    }
469    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
470        return Ok(Value::Null);
471    }
472    let text = match &args[0] {
473        Value::Text(s) => s.as_str(),
474        other => {
475            return Err(EvalError::TypeMismatch {
476                detail: format!("decode() expects text, got {:?}", other.data_type()),
477            });
478        }
479    };
480    let fmt = match &args[1] {
481        Value::Text(s) => s.to_ascii_lowercase(),
482        other => {
483            return Err(EvalError::TypeMismatch {
484                detail: format!(
485                    "decode() format must be text, got {:?}",
486                    other.data_type()
487                ),
488            });
489        }
490    };
491    let bytes = match fmt.as_str() {
492        "base64" => b64_decode(text, B64_STD)?,
493        "base64url" => b64_decode(text, B64_URL)?,
494        "base32hex" => b32hex_decode(text)?,
495        "hex" => hex_decode(text)?,
496        other => {
497            return Err(EvalError::TypeMismatch {
498                detail: format!("decode(): unknown format `{other}`"),
499            });
500        }
501    };
502    let s = String::from_utf8(bytes).map_err(|_| EvalError::TypeMismatch {
503        detail: "decode(): result bytes are not valid UTF-8 (SPG stores raw bytes as Text)".into(),
504    })?;
505    Ok(Value::Text(s))
506}
507
508/// v6.4.3 — `error_on_null(v)`. Returns `v` unchanged if non-NULL;
509/// errors otherwise. Convenience to assert NOT NULL inside an
510/// expression without wrapping it in COALESCE + raise hacks.
511fn error_on_null(args: &[Value]) -> Result<Value, EvalError> {
512    if args.len() != 1 {
513        return Err(EvalError::TypeMismatch {
514            detail: format!("error_on_null() takes 1 arg, got {}", args.len()),
515        });
516    }
517    if matches!(args[0], Value::Null) {
518        return Err(EvalError::TypeMismatch {
519            detail: "error_on_null(): argument is NULL".into(),
520        });
521    }
522    Ok(args[0].clone())
523}
524
// ── byte-level encoders ───────────────────────────────────────────

/// Base64 alphabet whose last two symbols are `+` and `/`; symbol `i`
/// encodes the 6-bit value `i`.
const B64_STD: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
/// Base64 alphabet whose last two symbols are `-` and `_` (the
/// URL-safe variant); otherwise identical to `B64_STD`.
const B64_URL: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
/// Base32 "extended hex" alphabet: digits `0`-`9` followed by `A`-`V`;
/// symbol `i` encodes the 5-bit value `i`.
const B32HEX_ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHIJKLMNOPQRSTUV";
532
/// Base64-encode `bytes` using the 64-symbol table `alpha`, padding
/// the output with `=` to a multiple of four characters.
fn b64_encode(bytes: &[u8], alpha: &[u8; 64]) -> String {
    // Pick the 6-bit group of `word` that starts `shift` bits from the
    // bottom and map it through the alphabet.
    let sextet = |word: u32, shift: u32| alpha[((word >> shift) & 0x3f) as usize] as char;

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        // Missing trailing bytes of a short final group count as zero.
        let first = u32::from(group[0]);
        let second = group.get(1).map_or(0, |&b| u32::from(b));
        let third = group.get(2).map_or(0, |&b| u32::from(b));
        let word = (first << 16) | (second << 8) | third;

        encoded.push(sextet(word, 18));
        encoded.push(sextet(word, 12));
        encoded.push(if group.len() > 1 { sextet(word, 6) } else { '=' });
        encoded.push(if group.len() > 2 { sextet(word, 0) } else { '=' });
    }
    encoded
}
560
561fn b64_decode(text: &str, alpha: &[u8; 64]) -> Result<Vec<u8>, EvalError> {
562    let mut lookup = [255u8; 256];
563    for (i, &c) in alpha.iter().enumerate() {
564        lookup[c as usize] = i as u8;
565    }
566    let mut out = Vec::with_capacity(text.len() * 3 / 4);
567    let mut buf: u32 = 0;
568    let mut bits: u32 = 0;
569    for c in text.bytes() {
570        if c == b'=' {
571            break;
572        }
573        if c == b'\n' || c == b'\r' || c == b' ' {
574            continue;
575        }
576        let v = lookup[c as usize];
577        if v == 255 {
578            return Err(EvalError::TypeMismatch {
579                detail: format!("decode(base64): invalid char {:?}", c as char),
580            });
581        }
582        buf = (buf << 6) | v as u32;
583        bits += 6;
584        if bits >= 8 {
585            bits -= 8;
586            out.push(((buf >> bits) & 0xff) as u8);
587        }
588    }
589    Ok(out)
590}
591
592fn b32hex_encode(bytes: &[u8]) -> String {
593    let mut out = String::with_capacity((bytes.len() * 8 + 4) / 5);
594    let mut buf: u64 = 0;
595    let mut bits: u32 = 0;
596    for &b in bytes {
597        buf = (buf << 8) | b as u64;
598        bits += 8;
599        while bits >= 5 {
600            bits -= 5;
601            out.push(B32HEX_ALPHABET[((buf >> bits) & 0x1f) as usize] as char);
602        }
603    }
604    if bits > 0 {
605        out.push(B32HEX_ALPHABET[((buf << (5 - bits)) & 0x1f) as usize] as char);
606    }
607    // Pad to multiple of 8.
608    while out.len() % 8 != 0 {
609        out.push('=');
610    }
611    out
612}
613
614fn b32hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
615    let mut lookup = [255u8; 256];
616    for (i, &c) in B32HEX_ALPHABET.iter().enumerate() {
617        lookup[c as usize] = i as u8;
618        // base32hex is case-insensitive — also map lowercase.
619        let lower = (c as char).to_ascii_lowercase() as u8;
620        lookup[lower as usize] = i as u8;
621    }
622    let mut out = Vec::with_capacity(text.len() * 5 / 8);
623    let mut buf: u64 = 0;
624    let mut bits: u32 = 0;
625    for c in text.bytes() {
626        if c == b'=' {
627            break;
628        }
629        if c == b'\n' || c == b'\r' || c == b' ' {
630            continue;
631        }
632        let v = lookup[c as usize];
633        if v == 255 {
634            return Err(EvalError::TypeMismatch {
635                detail: format!("decode(base32hex): invalid char {:?}", c as char),
636            });
637        }
638        buf = (buf << 5) | v as u64;
639        bits += 5;
640        if bits >= 8 {
641            bits -= 8;
642            out.push(((buf >> bits) & 0xff) as u8);
643        }
644    }
645    Ok(out)
646}
647
/// Render `bytes` as lowercase hexadecimal, two digits per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut rendered = String::with_capacity(bytes.len() * 2);
    // High nibble first, then low nibble.
    for nibble in bytes.iter().flat_map(|&byte| [byte >> 4, byte & 0xf]) {
        rendered.push(char::from(DIGITS[usize::from(nibble)]));
    }
    rendered
}
657
658fn hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
659    let trimmed = text.trim();
660    if trimmed.len() % 2 != 0 {
661        return Err(EvalError::TypeMismatch {
662            detail: "decode(hex): input length must be even".into(),
663        });
664    }
665    let mut out = Vec::with_capacity(trimmed.len() / 2);
666    let mut hi: u8 = 0;
667    for (i, c) in trimmed.bytes().enumerate() {
668        let v = match c {
669            b'0'..=b'9' => c - b'0',
670            b'a'..=b'f' => c - b'a' + 10,
671            b'A'..=b'F' => c - b'A' + 10,
672            _ => {
673                return Err(EvalError::TypeMismatch {
674                    detail: format!("decode(hex): invalid char {:?}", c as char),
675                });
676            }
677        };
678        if i % 2 == 0 {
679            hi = v;
680        } else {
681            out.push((hi << 4) | v);
682        }
683    }
684    Ok(out)
685}
686
687/// `date_part(field_text, source)` — function form of `EXTRACT(field FROM
688/// source)`. Same component dispatch (DATE / TIMESTAMP / INTERVAL) and
689/// same `BigInt` return shape; PG returns double precision but we keep the
690/// integer convention so the runner's `query I` shape works unchanged.
691fn date_part(args: &[Value]) -> Result<Value, EvalError> {
692    use spg_sql::ast::ExtractField as F;
693    if args.len() != 2 {
694        return Err(EvalError::TypeMismatch {
695            detail: format!("date_part() takes 2 args, got {}", args.len()),
696        });
697    }
698    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
699        return Ok(Value::Null);
700    }
701    let Value::Text(field_name) = &args[0] else {
702        return Err(EvalError::TypeMismatch {
703            detail: format!(
704                "date_part() needs a text field, got {:?}",
705                args[0].data_type()
706            ),
707        });
708    };
709    let field = match field_name.to_ascii_lowercase().as_str() {
710        "year" => F::Year,
711        "month" => F::Month,
712        "day" => F::Day,
713        "hour" => F::Hour,
714        "minute" => F::Minute,
715        "second" => F::Second,
716        "microsecond" | "microseconds" => F::Microsecond,
717        other => {
718            return Err(EvalError::TypeMismatch {
719                detail: format!(
720                    "unknown date_part field {other:?}; \
721                     supported: year, month, day, hour, minute, second, microsecond"
722                ),
723            });
724        }
725    };
726    extract_field(field, &args[1])
727}
728
729/// `age(t1, t2)` — return `t1 - t2` as an INTERVAL. v2.12 produces a
730/// micros-only interval (no months normalisation) because PG's
731/// month-justification rule is sensitive to the day-of-month walk and
732/// adds material complexity for marginal corpus value.
733///
734/// `age(t)` (single-arg form) is intentionally unsupported in v2.12:
735/// the dispatcher errors instead of guessing a clock source. Callers
736/// who want PG's `age(t)` semantics should write `age(CURRENT_DATE, t)`
737/// explicitly so the clock reference is visible at the SQL layer.
738fn age(args: &[Value]) -> Result<Value, EvalError> {
739    if args.is_empty() || args.len() > 2 {
740        return Err(EvalError::TypeMismatch {
741            detail: format!("age() takes 1 or 2 args, got {}", args.len()),
742        });
743    }
744    if args.iter().any(|v| matches!(v, Value::Null)) {
745        return Ok(Value::Null);
746    }
747    // Coerce to TIMESTAMP micros — DATE lifts to midnight; TIMESTAMP
748    // stays as-is; anything else errors.
749    let to_micros = |v: &Value| -> Result<i64, EvalError> {
750        match v {
751            Value::Timestamp(t) => Ok(*t),
752            Value::Date(d) => Ok(i64::from(*d) * 86_400_000_000),
753            other => Err(EvalError::TypeMismatch {
754                detail: format!("age() needs DATE or TIMESTAMP, got {:?}", other.data_type()),
755            }),
756        }
757    };
758    if args.len() == 1 {
759        return Err(EvalError::TypeMismatch {
760            detail: "single-arg age() is unsupported in v2.12 \
761                     (use age(CURRENT_DATE, t) explicitly)"
762                .into(),
763        });
764    }
765    let a = to_micros(&args[0])?;
766    let b = to_micros(&args[1])?;
767    let delta = a.checked_sub(b).ok_or(EvalError::TypeMismatch {
768        detail: "age() subtraction overflows i64 microseconds".into(),
769    })?;
770    Ok(Value::Interval {
771        months: 0,
772        micros: delta,
773    })
774}
775
776/// `to_char(value, format)` — render a DATE / TIMESTAMP through a PG
777/// format template. Supports the high-traffic placeholders:
778///   YYYY YY MM Mon Month DD HH24 HH12 MI SS MS US AM PM
779/// Unrecognised characters pass through literally so the template's
780/// punctuation ('-', ':', ' ', '/') needs no escape mechanism.
781fn to_char(args: &[Value]) -> Result<Value, EvalError> {
782    use core::fmt::Write as _;
783    if args.len() != 2 {
784        return Err(EvalError::TypeMismatch {
785            detail: format!("to_char() takes 2 args, got {}", args.len()),
786        });
787    }
788    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
789        return Ok(Value::Null);
790    }
791    let Value::Text(fmt) = &args[1] else {
792        return Err(EvalError::TypeMismatch {
793            detail: format!(
794                "to_char() needs a text format, got {:?}",
795                args[1].data_type()
796            ),
797        });
798    };
799    let (days, day_micros) = match &args[0] {
800        Value::Date(d) => (*d, 0_i64),
801        Value::Timestamp(t) => {
802            let days = t.div_euclid(86_400_000_000);
803            (
804                i32::try_from(days).unwrap_or(i32::MAX),
805                t.rem_euclid(86_400_000_000),
806            )
807        }
808        other => {
809            return Err(EvalError::TypeMismatch {
810                detail: format!(
811                    "to_char() needs DATE or TIMESTAMP, got {:?}",
812                    other.data_type()
813                ),
814            });
815        }
816    };
817    let (y, mo, d) = civil_from_days(days);
818    let secs = day_micros / 1_000_000;
819    let frac = day_micros % 1_000_000;
820    // div_euclid keeps every value non-negative — the casts below are
821    // sign-safe by construction. `secs ∈ [0, 86400)`, `frac ∈ [0,
822    // 1_000_000)`, so all three quantities fit in u32.
823    let hh24 = u32::try_from(secs / 3600).unwrap_or(0);
824    let mi = u32::try_from((secs / 60) % 60).unwrap_or(0);
825    let ss = u32::try_from(secs % 60).unwrap_or(0);
826    let hh12 = match hh24 % 12 {
827        0 => 12,
828        x => x,
829    };
830    let ampm = if hh24 < 12 { "AM" } else { "PM" };
831    let ms = u32::try_from(frac / 1_000).unwrap_or(0); // millisecond
832    let us = u32::try_from(frac).unwrap_or(0); // microsecond (0..1_000_000)
833
834    let mut out = String::with_capacity(fmt.len() + 8);
835    let bytes = fmt.as_bytes();
836    let mut i = 0;
837    // write! against a String never fails — discard the Result.
838    while i < bytes.len() {
839        // Try the longest prefixes first so "YYYY" wins over "YY".
840        let rest = &bytes[i..];
841        if rest.starts_with(b"YYYY") {
842            let _ = write!(out, "{y:04}");
843            i += 4;
844        } else if rest.starts_with(b"YY") {
845            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
846            let yy = (y.rem_euclid(100)) as u32;
847            let _ = write!(out, "{yy:02}");
848            i += 2;
849        } else if rest.starts_with(b"Month") {
850            out.push_str(MONTH_FULL[(mo - 1) as usize]);
851            i += 5;
852        } else if rest.starts_with(b"Mon") {
853            out.push_str(MONTH_ABBR[(mo - 1) as usize]);
854            i += 3;
855        } else if rest.starts_with(b"MM") {
856            let _ = write!(out, "{mo:02}");
857            i += 2;
858        } else if rest.starts_with(b"DD") {
859            let _ = write!(out, "{d:02}");
860            i += 2;
861        } else if rest.starts_with(b"HH24") {
862            let _ = write!(out, "{hh24:02}");
863            i += 4;
864        } else if rest.starts_with(b"HH12") {
865            let _ = write!(out, "{hh12:02}");
866            i += 4;
867        } else if rest.starts_with(b"MI") {
868            let _ = write!(out, "{mi:02}");
869            i += 2;
870        } else if rest.starts_with(b"SS") {
871            let _ = write!(out, "{ss:02}");
872            i += 2;
873        } else if rest.starts_with(b"MS") {
874            let _ = write!(out, "{ms:03}");
875            i += 2;
876        } else if rest.starts_with(b"US") {
877            let _ = write!(out, "{us:06}");
878            i += 2;
879        } else if rest.starts_with(b"AM") || rest.starts_with(b"PM") {
880            out.push_str(ampm);
881            i += 2;
882        } else {
883            // Pass any non-placeholder byte through verbatim.
884            out.push(bytes[i] as char);
885            i += 1;
886        }
887    }
888    Ok(Value::Text(out))
889}
890
/// Full English month names, indexed by `month - 1`. `to_char` reads
/// this table when it expands the `Month` placeholder.
const MONTH_FULL: [&str; 12] = [
    "January",
    "February",
    "March",
    "April",
    "May",
    "June",
    "July",
    "August",
    "September",
    "October",
    "November",
    "December",
];
/// Three-letter English month abbreviations, indexed by `month - 1`.
/// `to_char` reads this table when it expands the `Mon` placeholder.
const MONTH_ABBR: [&str; 12] = [
    "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
];
908
909/// `date_trunc(unit, timestamp)` — round a `TIMESTAMP` down to the
910/// requested calendar boundary (year / month / day / hour / minute /
911/// second). Returns the truncated `TIMESTAMP`. NULL on either side
912/// propagates to NULL.
913fn date_trunc(args: &[Value]) -> Result<Value, EvalError> {
914    if args.len() != 2 {
915        return Err(EvalError::TypeMismatch {
916            detail: format!("date_trunc() takes 2 args, got {}", args.len()),
917        });
918    }
919    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
920        return Ok(Value::Null);
921    }
922    let Value::Text(unit) = &args[0] else {
923        return Err(EvalError::TypeMismatch {
924            detail: format!(
925                "date_trunc() needs a text unit, got {:?}",
926                args[0].data_type()
927            ),
928        });
929    };
930    // Both DATE and TIMESTAMP sources are accepted. DATE lifts to
931    // midnight first; the result is always TIMESTAMP.
932    let micros = match &args[1] {
933        Value::Timestamp(t) => *t,
934        Value::Date(d) => i64::from(*d) * 86_400_000_000,
935        other => {
936            return Err(EvalError::TypeMismatch {
937                detail: format!(
938                    "date_trunc() needs DATE or TIMESTAMP, got {:?}",
939                    other.data_type()
940                ),
941            });
942        }
943    };
944    let unit_lc = unit.to_ascii_lowercase();
945    let days = micros.div_euclid(86_400_000_000);
946    let day_micros = micros.rem_euclid(86_400_000_000);
947    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
948    let (y, m, _) = civil_from_days(day_i32);
949    let truncated = match unit_lc.as_str() {
950        "year" => i64::from(days_from_civil(y, 1, 1)) * 86_400_000_000,
951        "month" => i64::from(days_from_civil(y, m, 1)) * 86_400_000_000,
952        "day" => days * 86_400_000_000,
953        "hour" => days * 86_400_000_000 + (day_micros / 3_600_000_000) * 3_600_000_000,
954        "minute" => days * 86_400_000_000 + (day_micros / 60_000_000) * 60_000_000,
955        "second" => days * 86_400_000_000 + (day_micros / 1_000_000) * 1_000_000,
956        other => {
957            return Err(EvalError::TypeMismatch {
958                detail: format!(
959                    "unknown date_trunc unit {other:?}; \
960                     supported: year, month, day, hour, minute, second"
961                ),
962            });
963        }
964    };
965    Ok(Value::Timestamp(truncated))
966}
967
968/// PG-style `expr::TYPE` coercion. NULL always casts as NULL.
969pub fn cast_value(v: Value, target: CastTarget) -> Result<Value, EvalError> {
970    if matches!(v, Value::Null) {
971        return Ok(Value::Null);
972    }
973    match target {
974        CastTarget::Vector => cast_to_vector(v),
975        CastTarget::Text => Ok(Value::Text(value_to_text(&v))),
976        CastTarget::Int => cast_numeric_to_int(v),
977        CastTarget::BigInt => cast_numeric_to_bigint(v),
978        CastTarget::Float => cast_numeric_to_float(v),
979        CastTarget::Bool => cast_to_bool(v),
980        CastTarget::Date => cast_to_date(v),
981        // TIMESTAMP and TIMESTAMPTZ have identical runtime
982        // representation (i64 microseconds UTC).
983        CastTarget::Timestamp | CastTarget::Timestamptz => cast_to_timestamp(v),
984        // v7.9.25 — `expr::INTERVAL`. Currently only TEXT → Interval
985        // is supported (the mailrs idiom: `$1::INTERVAL` where the
986        // bound param is a string like `'7 days'`).
987        CastTarget::Interval => cast_to_interval(v),
988        // v7.9.25 — `::json` / `::jsonb`. Routes Text → Json
989        // (validation is the producer's responsibility, same as
990        // the column-INSERT path).
991        CastTarget::Json | CastTarget::Jsonb => match v {
992            Value::Json(s) => Ok(Value::Json(s)),
993            Value::Text(s) => Ok(Value::Json(s)),
994            other => Err(EvalError::TypeMismatch {
995                detail: alloc::format!(
996                    "::json / ::jsonb only accepts TEXT-shape inputs, got {:?}",
997                    other.data_type()
998                ),
999            }),
1000        },
1001        // v7.9.26 — `::regtype` / `::regclass`. SPG has no
1002        // pg_catalog; surface a clear error.
1003        CastTarget::RegType | CastTarget::RegClass => Err(EvalError::TypeMismatch {
1004            detail:
1005                "::regtype / ::regclass not supported on SPG \
1006                 (no pg_catalog); use SHOW TABLES / spg_table_ddl instead"
1007                    .into(),
1008        }),
1009    }
1010}
1011
1012fn cast_to_interval(v: Value) -> Result<Value, EvalError> {
1013    match v {
1014        Value::Interval { months, micros } => Ok(Value::Interval { months, micros }),
1015        Value::Text(s) => {
1016            let (months, micros) = spg_sql::parser::parse_interval_text(&s)
1017                .ok_or_else(|| EvalError::TypeMismatch {
1018                    detail: alloc::format!("cannot parse {s:?} as INTERVAL"),
1019                })?;
1020            Ok(Value::Interval { months, micros })
1021        }
1022        other => Err(EvalError::TypeMismatch {
1023            detail: alloc::format!(
1024                "::INTERVAL only accepts TEXT-shape inputs, got {:?}",
1025                other.data_type()
1026            ),
1027        }),
1028    }
1029}
1030
1031fn cast_to_date(v: Value) -> Result<Value, EvalError> {
1032    match v {
1033        Value::Date(d) => Ok(Value::Date(d)),
1034        // Integer literals carry days since the Unix epoch — used by
1035        // the `CURRENT_DATE` AST rewrite to inject the wall clock.
1036        Value::Int(n) => Ok(Value::Date(n)),
1037        Value::BigInt(n) => {
1038            i32::try_from(n)
1039                .map(Value::Date)
1040                .map_err(|_| EvalError::TypeMismatch {
1041                    detail: "bigint days-since-epoch out of DATE range".into(),
1042                })
1043        }
1044        // Timestamp truncates to its day boundary.
1045        Value::Timestamp(t) => {
1046            let days = t.div_euclid(86_400_000_000);
1047            i32::try_from(days)
1048                .map(Value::Date)
1049                .map_err(|_| EvalError::TypeMismatch {
1050                    detail: "timestamp out of DATE range".into(),
1051                })
1052        }
1053        Value::Text(s) => parse_date_literal(&s)
1054            .map(Value::Date)
1055            .ok_or(EvalError::TypeMismatch {
1056                detail: format!("cannot parse {s:?} as DATE (expected YYYY-MM-DD)"),
1057            }),
1058        other => Err(EvalError::TypeMismatch {
1059            detail: format!("cannot cast {:?} to DATE", other.data_type()),
1060        }),
1061    }
1062}
1063
1064fn cast_to_timestamp(v: Value) -> Result<Value, EvalError> {
1065    match v {
1066        Value::Timestamp(t) => Ok(Value::Timestamp(t)),
1067        // Int / BigInt carry microseconds since the Unix epoch — used
1068        // by the `NOW()` / `CURRENT_TIMESTAMP` AST rewrite to inject
1069        // the wall clock as a plain integer literal.
1070        Value::Int(n) => Ok(Value::Timestamp(i64::from(n))),
1071        Value::BigInt(n) => Ok(Value::Timestamp(n)),
1072        // DATE → TIMESTAMP picks midnight on the date.
1073        Value::Date(d) => Ok(Value::Timestamp(i64::from(d) * 86_400_000_000)),
1074        Value::Text(s) => {
1075            parse_timestamp_literal(&s)
1076                .map(Value::Timestamp)
1077                .ok_or(EvalError::TypeMismatch {
1078                    detail: format!(
1079                        "cannot parse {s:?} as TIMESTAMP \
1080                     (expected YYYY-MM-DD[ HH:MM:SS[.ffffff]])"
1081                    ),
1082                })
1083        }
1084        other => Err(EvalError::TypeMismatch {
1085            detail: format!("cannot cast {:?} to TIMESTAMP", other.data_type()),
1086        }),
1087    }
1088}
1089
1090fn value_to_text(v: &Value) -> String {
1091    match v {
1092        // v7.5.0 — Value is #[non_exhaustive]; any future variant
1093        // without explicit text rendering hits the Debug fallback
1094        // at the end.
1095        Value::SmallInt(n) => format!("{n}"),
1096        Value::Int(n) => format!("{n}"),
1097        Value::BigInt(n) => format!("{n}"),
1098        Value::Float(x) => format!("{x}"),
1099        // v4.9: JSON renders identically to Text — both are raw UTF-8.
1100        Value::Text(s) | Value::Json(s) => s.clone(),
1101        Value::Bool(b) => (if *b { "true" } else { "false" }).into(),
1102        Value::Vector(v) => {
1103            let cells: Vec<String> = v.iter().map(|x| format!("{x}")).collect();
1104            format!("[{}]", cells.join(", "))
1105        }
1106        // v6.0.1: render SQ8 cells dequantised, so SELECT output
1107        // matches the pgvector wire shape clients expect. The
1108        // recall envelope already absorbs the ≤ (max-min)/255/2
1109        // dequantisation error.
1110        Value::Sq8Vector(q) => {
1111            let cells: Vec<String> = spg_storage::quantize::dequantize(q)
1112                .iter()
1113                .map(|x| format!("{x}"))
1114                .collect();
1115            format!("[{}]", cells.join(", "))
1116        }
1117        // v6.0.3: HalfVector cells dequantise bit-exactly to f32
1118        // for SELECT output.
1119        Value::HalfVector(h) => {
1120            let cells: Vec<String> = h.to_f32_vec().iter().map(|x| format!("{x}")).collect();
1121            format!("[{}]", cells.join(", "))
1122        }
1123        Value::Numeric { scaled, scale } => format_numeric(*scaled, *scale),
1124        Value::Date(d) => format_date(*d),
1125        Value::Timestamp(t) => format_timestamp(*t),
1126        Value::Interval { months, micros } => format_interval(*months, *micros),
1127        Value::Null => "NULL".into(),
1128        // v7.5.0 — #[non_exhaustive] fallback for future Value variants.
1129        _ => format!("{v:?}"),
1130    }
1131}
1132
1133/// Render a `Date` (days since epoch) as `YYYY-MM-DD`. Negative values
1134/// for pre-1970 dates render with a leading `-` on the year.
1135pub fn format_date(days: i32) -> String {
1136    let (y, m, d) = civil_from_days(days);
1137    format!("{y:04}-{m:02}-{d:02}")
1138}
1139
1140/// Render a `Timestamp` (microseconds since epoch) as
1141/// `YYYY-MM-DD HH:MM:SS[.fff...]`. Trailing-zero fractional digits are
1142/// dropped; a whole-second value has no fractional part.
1143pub fn format_timestamp(micros: i64) -> String {
1144    const MICROS_PER_DAY: i64 = 86_400_000_000;
1145    // Split into day + intra-day part with proper floor division so
1146    // negative timestamps render right too.
1147    let days = micros.div_euclid(MICROS_PER_DAY);
1148    let day_micros = micros.rem_euclid(MICROS_PER_DAY);
1149    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
1150    let (y, m, d) = civil_from_days(day_i32);
1151    let secs = day_micros / 1_000_000;
1152    let frac = day_micros % 1_000_000;
1153    let hh = secs / 3600;
1154    let mm = (secs / 60) % 60;
1155    let ss = secs % 60;
1156    if frac == 0 {
1157        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}")
1158    } else {
1159        // Strip trailing zeros from the 6-digit fractional component.
1160        let raw = format!("{frac:06}");
1161        let trimmed = raw.trim_end_matches('0');
1162        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}.{trimmed}")
1163    }
1164}
1165
/// Howard Hinnant's `civil_from_days`: map a day count relative to
/// 1970-01-01 onto a proleptic-Gregorian `(year, month, day)` triple.
/// The calendar maths is implemented locally so the engine does not
/// depend on `std` time facilities.
// Every `as` cast below narrows a value already bounded by the
// algorithm (day-of-era < 146_097, year within i32 for any i32 input).
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
fn civil_from_days(days: i32) -> (i32, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097; // one 400-year Gregorian cycle

    // Re-base onto 0000-03-01 so leap days fall at the end of a "year".
    let shifted = i64::from(days) + 719_468;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA) as u32;
    let year_of_era = (day_of_era.saturating_sub(day_of_era / 1460) + day_of_era / 36_524
        - day_of_era / 146_096)
        / 365;
    let day_of_year =
        day_of_era.saturating_sub(365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index counted from March (0 = March … 11 = February).
    let march_based = (5 * day_of_year + 2) / 153;
    let day = day_of_year.saturating_sub((153 * march_based + 2) / 5) + 1;
    // January and February belong to the following civil year.
    let (month, year_bump) = if march_based < 10 {
        (march_based + 3, 0)
    } else {
        (march_based - 9, 1)
    };
    let year = i64::from(year_of_era) + era * 400 + year_bump;
    (year as i32, month, day)
}
1187
/// Inverse of `civil_from_days` — converts a proleptic-Gregorian
/// `(year, month, day)` to days since 1970-01-01.
///
/// Inputs are not validated: `d == 0` is treated like day 1, and
/// months outside 1..=12 are extrapolated by the month formula. All
/// arithmetic runs in `i64`, so no input can overflow; a result
/// outside the `i32` range saturates to `i32::MIN` or `i32::MAX`
/// according to its sign.
pub fn days_from_civil(y: i32, m: u32, d: u32) -> i32 {
    // January / February count as months 10 / 11 of the previous
    // March-based year.
    let shifted_year = if m <= 2 {
        i64::from(y) - 1
    } else {
        i64::from(y)
    };
    let era = shifted_year.div_euclid(400);
    let year_of_era = shifted_year.rem_euclid(400);
    let march_based = i64::from(if m > 2 { m - 3 } else { m + 9 });
    let day_of_year = (153 * march_based + 2) / 5 + i64::from(d.saturating_sub(1));
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    let total = era * 146_097 + day_of_era - 719_468;
    // Saturate towards the side that actually overflowed, so a date in
    // the far past never turns into one in the far future.
    i32::try_from(total).unwrap_or(if total < 0 { i32::MIN } else { i32::MAX })
}
1204
1205/// Parse `YYYY-MM-DD` into a `Date` (days since Unix epoch). Returns
1206/// `None` on shape / numeric failure; the engine surfaces that as a
1207/// `TypeMismatch` with the original text included.
1208pub fn parse_date_literal(s: &str) -> Option<i32> {
1209    let bytes = s.as_bytes();
1210    if bytes.len() != 10 || bytes[4] != b'-' || bytes[7] != b'-' {
1211        return None;
1212    }
1213    let y: i32 = s[0..4].parse().ok()?;
1214    let m: u32 = s[5..7].parse().ok()?;
1215    let d: u32 = s[8..10].parse().ok()?;
1216    if !(1..=12).contains(&m) || !(1..=31).contains(&d) {
1217        return None;
1218    }
1219    Some(days_from_civil(y, m, d))
1220}
1221
1222/// Parse `YYYY-MM-DD[ HH:MM:SS[.ffffff]]` into a `Timestamp`
1223/// (microseconds since Unix epoch). The time portion is optional;
1224/// missing → midnight. The fractional portion accepts 1–6 digits and
1225/// pads with zeros to microseconds.
1226pub fn parse_timestamp_literal(s: &str) -> Option<i64> {
1227    let trimmed = s.trim();
1228    let (date_part, time_part) = match trimmed.find([' ', 'T']) {
1229        Some(i) => (&trimmed[..i], Some(&trimmed[i + 1..])),
1230        None => (trimmed, None),
1231    };
1232    let days = parse_date_literal(date_part)?;
1233    let day_micros = match time_part {
1234        None => 0,
1235        Some(t) => parse_time_of_day_micros(t)?,
1236    };
1237    Some(i64::from(days) * 86_400_000_000 + day_micros)
1238}
1239
/// Parse `HH:MM:SS[.fraction]` into microseconds since midnight.
///
/// Each clock field must be exactly two ASCII digits (`00..=23`,
/// `00..=59`, `00..=59`). The optional fraction must be 1–9 ASCII
/// digits; the first six are used (right-padded with zeros to
/// microseconds) and any further digits are truncated. Returns `None`
/// for anything else — including signs or non-ASCII characters — and
/// never panics.
fn parse_time_of_day_micros(t: &str) -> Option<i64> {
    /// Two ASCII digits → their numeric value; anything else → `None`.
    fn two_digits(tens: u8, ones: u8) -> Option<i64> {
        if tens.is_ascii_digit() && ones.is_ascii_digit() {
            Some(i64::from(tens - b'0') * 10 + i64::from(ones - b'0'))
        } else {
            None
        }
    }

    let (clock, fraction) = match t.split_once('.') {
        Some((clock, fraction)) => (clock, Some(fraction)),
        None => (t, None),
    };
    // Exactly eight bytes shaped `dd:dd:dd`; working on bytes avoids
    // any string slicing that could hit a non-boundary.
    let &[h1, h0, b':', m1, m0, b':', s1, s0] = clock.as_bytes() else {
        return None;
    };
    let hh = two_digits(h1, h0)?;
    let mm = two_digits(m1, m0)?;
    let ss = two_digits(s1, s0)?;
    if hh >= 24 || mm >= 60 || ss >= 60 {
        return None;
    }
    let frac_micros = match fraction {
        None => 0,
        Some(f) => {
            if f.is_empty() || f.len() > 9 || !f.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            // Read six digit positions, treating missing ones as `0`;
            // digits beyond the sixth are dropped (truncation).
            let mut digits = f.bytes();
            let mut value = 0_i64;
            for _ in 0..6 {
                let digit = digits.next().map_or(0, |b| i64::from(b - b'0'));
                value = value * 10 + digit;
            }
            value
        }
    };
    Some((hh * 3600 + mm * 60 + ss) * 1_000_000 + frac_micros)
}
1272
/// Render an `Interval { months, micros }` in a PG-ish text shape:
/// years and months come from `months`, days and `HH:MM:SS[.frac]`
/// from `micros`. Zero-valued parts are left out, parts are separated
/// by a single space, and an all-zero interval renders as `0`.
pub fn format_interval(months: i32, micros: i64) -> String {
    const MICROS_PER_DAY: i64 = 86_400_000_000;

    /// `"<n> <unit>"`, or `None` when `n` is zero. Only `+1` takes the
    /// singular noun; `-1` and everything else pluralise.
    fn counted(n: i64, singular: &'static str, plural: &'static str) -> Option<String> {
        if n == 0 {
            return None;
        }
        let noun = if n == 1 { singular } else { plural };
        Some(format!("{n} {noun}"))
    }

    /// Signed sub-day remainder as `[-]HH:MM:SS[.frac]`, or `None`
    /// when it is zero.
    fn clock(remainder: i64) -> Option<String> {
        if remainder == 0 {
            return None;
        }
        let sign = if remainder < 0 { "-" } else { "" };
        let magnitude = remainder.abs();
        let seconds = magnitude / 1_000_000;
        let fraction = magnitude % 1_000_000;
        let mut text = format!(
            "{sign}{:02}:{:02}:{:02}",
            seconds / 3600,
            (seconds / 60) % 60,
            seconds % 60
        );
        if fraction != 0 {
            text.push('.');
            text.push_str(format!("{fraction:06}").trim_end_matches('0'));
        }
        Some(text)
    }

    let mut parts: Vec<String> = Vec::with_capacity(4);
    parts.extend(counted(i64::from(months / 12), "year", "years"));
    parts.extend(counted(i64::from(months % 12), "mon", "mons"));
    parts.extend(counted(micros / MICROS_PER_DAY, "day", "days"));
    parts.extend(clock(micros % MICROS_PER_DAY));

    if parts.is_empty() {
        String::from("0")
    } else {
        parts.join(" ")
    }
}
1326
1327/// Add `months` (signed) to a `(year, month, day)` triple using PG's
1328/// clamp-to-last-day rule (so `'2024-01-31' + 1 month` → `'2024-02-29'`).
1329fn add_months_to_civil(y: i32, m: u32, d: u32, months: i32) -> (i32, u32, u32) {
1330    let total_months = i64::from(y) * 12 + i64::from(m) - 1 + i64::from(months);
1331    let new_year = i32::try_from(total_months.div_euclid(12)).unwrap_or(i32::MAX);
1332    let new_month_zero = total_months.rem_euclid(12);
1333    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1334    let new_month = (new_month_zero as u32) + 1;
1335    let max_day = days_in_month(new_year, new_month);
1336    (new_year, new_month, d.min(max_day))
1337}
1338
/// Number of days in month `m` of year `y` under the proleptic
/// Gregorian leap rule. Any month value without an explicit arm —
/// April, June, September, November, and out-of-range input alike —
/// yields 30; callers are expected to normalise the month first.
const fn days_in_month(y: i32, m: u32) -> u32 {
    let leap = y.rem_euclid(4) == 0 && (y.rem_euclid(100) != 0 || y.rem_euclid(400) == 0);
    match m {
        2 => {
            if leap {
                29
            } else {
                28
            }
        }
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        _ => 30,
    }
}
1355
/// v7.10.4 — render a BYTEA payload in PG's hex output format: a `\x`
/// prefix followed by two lowercase hex digits per byte. Public so the
/// wire layer can emit the canonical bytea-as-text representation.
pub fn format_bytea_hex(b: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut hex = String::with_capacity(2 + 2 * b.len());
    hex.push_str("\\x");
    hex.extend(
        b.iter()
            .flat_map(|&byte| {
                [
                    DIGITS[usize::from(byte >> 4)],
                    DIGITS[usize::from(byte & 0x0F)],
                ]
            })
            .map(char::from),
    );
    hex
}
1369
/// Render a `Numeric { scaled, scale }` as decimal text.
///
/// With `scale == 0` the integer is printed as-is. Otherwise the
/// magnitude's digits are left-padded with zeros to at least
/// `scale + 1` characters and split so that exactly `scale` digits
/// follow the decimal point; a negative `scaled` gets a leading `-`.
pub fn format_numeric(scaled: i128, scale: u8) -> String {
    if scale == 0 {
        return scaled.to_string();
    }
    let fraction_len = usize::from(scale);
    // Padding to `scale + 1` guarantees at least one integer digit,
    // so values below 1 come out as `0.xxx`.
    let digits = format!(
        "{:0>width$}",
        scaled.unsigned_abs(),
        width = fraction_len + 1
    );
    let (whole, fraction) = digits.split_at(digits.len() - fraction_len);
    let sign = if scaled < 0 { "-" } else { "" };
    format!("{sign}{whole}.{fraction}")
}
1401
1402fn cast_numeric_to_int(v: Value) -> Result<Value, EvalError> {
1403    match v {
1404        Value::Int(n) => Ok(Value::Int(n)),
1405        Value::BigInt(n) => i32::try_from(n)
1406            .map(Value::Int)
1407            .map_err(|_| EvalError::TypeMismatch {
1408                detail: format!("bigint {n} does not fit in int"),
1409            }),
1410        #[allow(clippy::cast_possible_truncation)]
1411        Value::Float(x) => Ok(Value::Int(x as i32)),
1412        Value::Text(s) => {
1413            s.trim()
1414                .parse::<i32>()
1415                .map(Value::Int)
1416                .map_err(|_| EvalError::TypeMismatch {
1417                    detail: format!("cannot parse {s:?} as int"),
1418                })
1419        }
1420        Value::Bool(b) => Ok(Value::Int(i32::from(b))),
1421        other => Err(EvalError::TypeMismatch {
1422            detail: format!("cannot cast {:?} to int", other.data_type()),
1423        }),
1424    }
1425}
1426
1427fn cast_numeric_to_bigint(v: Value) -> Result<Value, EvalError> {
1428    match v {
1429        Value::Int(n) => Ok(Value::BigInt(i64::from(n))),
1430        Value::BigInt(n) => Ok(Value::BigInt(n)),
1431        #[allow(clippy::cast_possible_truncation)]
1432        Value::Float(x) => Ok(Value::BigInt(x as i64)),
1433        Value::Text(s) => {
1434            s.trim()
1435                .parse::<i64>()
1436                .map(Value::BigInt)
1437                .map_err(|_| EvalError::TypeMismatch {
1438                    detail: format!("cannot parse {s:?} as bigint"),
1439                })
1440        }
1441        Value::Bool(b) => Ok(Value::BigInt(i64::from(b))),
1442        other => Err(EvalError::TypeMismatch {
1443            detail: format!("cannot cast {:?} to bigint", other.data_type()),
1444        }),
1445    }
1446}
1447
1448fn cast_numeric_to_float(v: Value) -> Result<Value, EvalError> {
1449    match v {
1450        Value::Int(n) => Ok(Value::Float(f64::from(n))),
1451        #[allow(clippy::cast_precision_loss)]
1452        Value::BigInt(n) => Ok(Value::Float(n as f64)),
1453        Value::Float(x) => Ok(Value::Float(x)),
1454        Value::Text(s) => {
1455            s.trim()
1456                .parse::<f64>()
1457                .map(Value::Float)
1458                .map_err(|_| EvalError::TypeMismatch {
1459                    detail: format!("cannot parse {s:?} as float"),
1460                })
1461        }
1462        other => Err(EvalError::TypeMismatch {
1463            detail: format!("cannot cast {:?} to float", other.data_type()),
1464        }),
1465    }
1466}
1467
1468fn cast_to_bool(v: Value) -> Result<Value, EvalError> {
1469    match v {
1470        Value::Bool(b) => Ok(Value::Bool(b)),
1471        Value::Int(n) => Ok(Value::Bool(n != 0)),
1472        Value::BigInt(n) => Ok(Value::Bool(n != 0)),
1473        Value::Text(s) => {
1474            let lo = s.trim().to_ascii_lowercase();
1475            match lo.as_str() {
1476                "true" | "t" | "yes" | "y" | "1" | "on" => Ok(Value::Bool(true)),
1477                "false" | "f" | "no" | "n" | "0" | "off" => Ok(Value::Bool(false)),
1478                _ => Err(EvalError::TypeMismatch {
1479                    detail: format!("cannot parse {s:?} as bool"),
1480                }),
1481            }
1482        }
1483        other => Err(EvalError::TypeMismatch {
1484            detail: format!("cannot cast {:?} to bool", other.data_type()),
1485        }),
1486    }
1487}
1488
1489/// Parse a `Value::Text("[1.0, 2.0, 3.0]")` into a `Value::Vector(..)`. Mirrors
1490/// pgvector's `'[..]'::vector` cast. NULL casts as NULL.
1491pub fn cast_to_vector(v: Value) -> Result<Value, EvalError> {
1492    match v {
1493        Value::Null => Ok(Value::Null),
1494        Value::Vector(v) => Ok(Value::Vector(v)),
1495        Value::Text(s) => parse_vector_text(&s)
1496            .map(Value::Vector)
1497            .ok_or(EvalError::TypeMismatch {
1498                detail: format!("cannot parse {s:?} as a vector literal"),
1499            }),
1500        other => Err(EvalError::TypeMismatch {
1501            detail: format!("::vector requires text input, got {:?}", other.data_type()),
1502        }),
1503    }
1504}
1505
/// Parse a bracketed, comma-separated list such as `[1.0, 2.0, -3]`
/// into a `Vec<f32>`. Whitespace around the brackets and around each
/// cell is ignored, and `[]` yields an empty vector. Returns `None`
/// when the brackets are missing or any cell fails to parse as `f32`.
fn parse_vector_text(s: &str) -> Option<Vec<f32>> {
    let body = s.trim().strip_prefix('[')?.strip_suffix(']')?.trim();
    if body.is_empty() {
        return Some(Vec::new());
    }
    // Collecting into `Option<Vec<_>>` stops at the first bad cell.
    body.split(',')
        .map(|cell| cell.trim().parse::<f32>().ok())
        .collect()
}
1521
1522fn literal_to_value(l: &Literal) -> Value {
1523    match l {
1524        Literal::Integer(n) => {
1525            if let Ok(small) = i32::try_from(*n) {
1526                Value::Int(small)
1527            } else {
1528                Value::BigInt(*n)
1529            }
1530        }
1531        Literal::Float(x) => Value::Float(*x),
1532        Literal::String(s) => Value::Text(s.clone()),
1533        Literal::Vector(v) => Value::Vector(v.clone()),
1534        Literal::Bool(b) => Value::Bool(*b),
1535        Literal::Null => Value::Null,
1536        Literal::Interval { months, micros, .. } => Value::Interval {
1537            months: *months,
1538            micros: *micros,
1539        },
1540    }
1541}
1542
1543fn resolve_column(c: &ColumnName, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1544    if let Some(q) = &c.qualifier {
1545        // Multi-table evaluation (joins): the synthesised schema uses
1546        // composite column names "alias.column" so we look that up
1547        // directly. Falls back to the single-table case below if the
1548        // composite isn't present.
1549        let composite = alloc::format!("{q}.{name}", name = c.name);
1550        if let Some(pos) = ctx.columns.iter().position(|s| s.name == composite) {
1551            return Ok(row.values[pos].clone());
1552        }
1553        let expected = ctx.table_alias.ok_or_else(|| EvalError::UnknownQualifier {
1554            qualifier: q.clone(),
1555        })?;
1556        if q != expected {
1557            return Err(EvalError::UnknownQualifier {
1558                qualifier: q.clone(),
1559            });
1560        }
1561    }
1562    if let Some(pos) = ctx.columns.iter().position(|s| s.name == c.name) {
1563        return Ok(row.values[pos].clone());
1564    }
1565    // Bare-name fallback for joined schemas: match any single composite
1566    // column ending in ".<name>"; ambiguity is an error.
1567    let suffix = alloc::format!(".{name}", name = c.name);
1568    let mut matches = ctx
1569        .columns
1570        .iter()
1571        .enumerate()
1572        .filter(|(_, s)| s.name.ends_with(&suffix));
1573    let first = matches.next();
1574    let extra = matches.next();
1575    match (first, extra) {
1576        (Some((pos, _)), None) => Ok(row.values[pos].clone()),
1577        (Some(_), Some(_)) => Err(EvalError::TypeMismatch {
1578            detail: alloc::format!("ambiguous column reference: {}", c.name),
1579        }),
1580        _ => Err(EvalError::ColumnNotFound {
1581            name: c.name.clone(),
1582        }),
1583    }
1584}
1585
1586fn apply_unary(op: UnOp, v: Value) -> Result<Value, EvalError> {
1587    match (op, v) {
1588        (_, Value::Null) => Ok(Value::Null),
1589        (UnOp::Neg, Value::Int(n)) => {
1590            n.checked_neg()
1591                .map(Value::Int)
1592                .ok_or(EvalError::TypeMismatch {
1593                    detail: "integer overflow on unary -".into(),
1594                })
1595        }
1596        (UnOp::Neg, Value::BigInt(n)) => {
1597            n.checked_neg()
1598                .map(Value::BigInt)
1599                .ok_or(EvalError::TypeMismatch {
1600                    detail: "bigint overflow on unary -".into(),
1601                })
1602        }
1603        (UnOp::Neg, Value::Float(x)) => Ok(Value::Float(-x)),
1604        (UnOp::Neg, other) => Err(EvalError::TypeMismatch {
1605            detail: format!("unary - applied to {:?}", other.data_type()),
1606        }),
1607        (UnOp::Not, Value::Bool(b)) => Ok(Value::Bool(!b)),
1608        (UnOp::Not, other) => Err(EvalError::TypeMismatch {
1609            detail: format!("NOT applied to {:?}", other.data_type()),
1610        }),
1611    }
1612}
1613
1614/// v7.9.27b — true when two values are "not distinct" per PG:
1615/// both NULL counts as equal; otherwise reduces to regular Eq.
1616fn values_not_distinct(l: &Value, r: &Value) -> bool {
1617    match (l, r) {
1618        (Value::Null, Value::Null) => true,
1619        (Value::Null, _) | (_, Value::Null) => false,
1620        _ => l == r,
1621    }
1622}
1623
1624fn apply_binary(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
1625    // SQL three-valued logic for AND / OR with NULL is special — handle before
1626    // the general NULL-propagation rule.
1627    if let BinOp::And = op {
1628        return and_3vl(l, r);
1629    }
1630    if let BinOp::Or = op {
1631        return or_3vl(l, r);
1632    }
1633    // v7.9.27b — IS [NOT] DISTINCT FROM. NULL-safe equality:
1634    // `NULL IS NOT DISTINCT FROM NULL` → true. mailrs pg_dump.
1635    if let BinOp::IsNotDistinctFrom = op {
1636        return Ok(Value::Bool(values_not_distinct(&l, &r)));
1637    }
1638    if let BinOp::IsDistinctFrom = op {
1639        return Ok(Value::Bool(!values_not_distinct(&l, &r)));
1640    }
1641    // Everything else: any NULL operand → NULL.
1642    if l.is_null() || r.is_null() {
1643        return Ok(Value::Null);
1644    }
1645    // NUMERIC arithmetic and comparisons run in fixed-point; promote
1646    // integers to a common NUMERIC scale and stay in i128 throughout.
1647    if matches!(l, Value::Numeric { .. }) || matches!(r, Value::Numeric { .. }) {
1648        return apply_binary_numeric(op, l, r);
1649    }
1650    // Date / Timestamp arithmetic. PG semantics:
1651    //   * date + int      → date  (int is days)
1652    //   * int + date      → date
1653    //   * date - int      → date
1654    //   * date - date     → int   (days, signed)
1655    //   * timestamp - timestamp → bigint (microseconds, signed)
1656    // Other date/time math (`timestamp + int`, INTERVAL) lands later.
1657    if let Some(result) = apply_binary_calendar(op, &l, &r)? {
1658        return Ok(result);
1659    }
1660    match op {
1661        BinOp::Add => arith(l, r, i64::checked_add, |a, b| a + b, "+"),
1662        BinOp::Sub => arith(l, r, i64::checked_sub, |a, b| a - b, "-"),
1663        BinOp::Mul => arith(l, r, i64::checked_mul, |a, b| a * b, "*"),
1664        BinOp::Div => div_op(l, r),
1665        BinOp::L2Distance => l2_distance(l, r),
1666        BinOp::InnerProduct => inner_product(l, r),
1667        BinOp::CosineDistance => cosine_distance(l, r),
1668        BinOp::Concat => Ok(text_concat(&l, &r)),
1669        BinOp::JsonGet => crate::json::path_get(&l, &r, false),
1670        BinOp::JsonGetText => crate::json::path_get(&l, &r, true),
1671        BinOp::JsonGetPath => crate::json::path_walk(&l, &r, false),
1672        BinOp::JsonGetPathText => crate::json::path_walk(&l, &r, true),
1673        BinOp::JsonContains => crate::json::contains(&l, &r),
1674        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
1675            compare(op, &l, &r)
1676        }
1677        BinOp::And
1678        | BinOp::Or
1679        | BinOp::IsDistinctFrom
1680        | BinOp::IsNotDistinctFrom => unreachable!("handled above"),
1681    }
1682}
1683
1684/// Calendar arithmetic. Returns `Some(value)` when the operand pair
1685/// is a date/time combo this function understands, `None` to let the
1686/// caller fall through to the regular numeric / text paths.
1687fn apply_binary_calendar(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
1688    let int_value = |v: &Value| -> Option<i64> {
1689        match v {
1690            Value::SmallInt(n) => Some(i64::from(*n)),
1691            Value::Int(n) => Some(i64::from(*n)),
1692            Value::BigInt(n) => Some(*n),
1693            _ => None,
1694        }
1695    };
1696    // Most-specific cases first — DATE-DATE / TS-TS subtraction before
1697    // DATE-integer subtraction, otherwise the latter swallows the
1698    // former with an `int_value(Date) = None` no-op fall-through.
1699    match (l, r) {
1700        (Value::Date(a), Value::Date(b)) if op == BinOp::Sub => {
1701            return Ok(Some(Value::BigInt(i64::from(*a) - i64::from(*b))));
1702        }
1703        (Value::Timestamp(a), Value::Timestamp(b)) if op == BinOp::Sub => {
1704            let delta = a.checked_sub(*b).ok_or(EvalError::TypeMismatch {
1705                detail: "TIMESTAMP - TIMESTAMP overflows i64 microseconds".into(),
1706            })?;
1707            return Ok(Some(Value::BigInt(delta)));
1708        }
1709        _ => {}
1710    }
1711    // INTERVAL arithmetic. PG: timestamp ± interval → timestamp,
1712    // date ± interval → date (if interval is pure days/months with no
1713    // sub-day component) else timestamp, interval ± interval → interval.
1714    if let Some(out) = apply_binary_interval(op, l, r)? {
1715        return Ok(Some(out));
1716    }
1717    match (l, r) {
1718        (Value::Date(d), other) if op == BinOp::Add => {
1719            if let Some(n) = int_value(other) {
1720                let days = i64::from(*d).saturating_add(n);
1721                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
1722                    detail: "DATE + integer overflows DATE range".into(),
1723                })?;
1724                return Ok(Some(Value::Date(days32)));
1725            }
1726        }
1727        (other, Value::Date(d)) if op == BinOp::Add => {
1728            if let Some(n) = int_value(other) {
1729                let days = i64::from(*d).saturating_add(n);
1730                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
1731                    detail: "integer + DATE overflows DATE range".into(),
1732                })?;
1733                return Ok(Some(Value::Date(days32)));
1734            }
1735        }
1736        (Value::Date(d), other) if op == BinOp::Sub => {
1737            if let Some(n) = int_value(other) {
1738                let days = i64::from(*d).saturating_sub(n);
1739                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
1740                    detail: "DATE - integer overflows DATE range".into(),
1741                })?;
1742                return Ok(Some(Value::Date(days32)));
1743            }
1744        }
1745        _ => {}
1746    }
1747    Ok(None)
1748}
1749
1750/// INTERVAL-aware binary ops. Recognises:
1751///   timestamp ± interval → timestamp
1752///   date ± interval      → date (if interval is integral days/months only)
1753///                       → timestamp (if interval has sub-day micros)
1754///   interval ± interval  → interval
1755/// Commutative for `+`. Returns `None` for unrecognised operand pairs so
1756/// the caller can fall through.
1757fn apply_binary_interval(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
1758    // Normalise so the interval (if any) is always on the right for Add;
1759    // Sub stays left-handed because it isn't commutative.
1760    let (lhs, rhs, sign): (&Value, &Value, i64) = match (l, r, op) {
1761        (Value::Interval { .. }, _, BinOp::Add) => (r, l, 1),
1762        (_, Value::Interval { .. }, BinOp::Add) => (l, r, 1),
1763        (_, Value::Interval { .. }, BinOp::Sub) => (l, r, -1),
1764        _ => return Ok(None),
1765    };
1766    let Value::Interval {
1767        months: rhs_months,
1768        micros: rhs_us,
1769    } = rhs
1770    else {
1771        unreachable!("rhs guaranteed to be Interval by the match above");
1772    };
1773    let signed_months = i64::from(*rhs_months) * sign;
1774    let signed_micros = rhs_us.checked_mul(sign).ok_or(EvalError::TypeMismatch {
1775        detail: "INTERVAL micros overflows on negation".into(),
1776    })?;
1777    match lhs {
1778        Value::Timestamp(t) => Ok(Some(Value::Timestamp(add_interval_to_micros(
1779            *t,
1780            signed_months,
1781            signed_micros,
1782        )?))),
1783        Value::Date(d) => {
1784            // Date + interval stays a date when the interval has zero
1785            // sub-day microseconds; otherwise promote to TIMESTAMP at
1786            // midnight of the (months-shifted) date first.
1787            let day_aligned = signed_micros.rem_euclid(86_400_000_000) == 0;
1788            if day_aligned {
1789                let micros_per_day = 86_400_000_000_i64;
1790                let days_delta = signed_micros / micros_per_day;
1791                let shifted = shift_date_by_months(*d, signed_months)?;
1792                let new_days =
1793                    i64::from(shifted)
1794                        .checked_add(days_delta)
1795                        .ok_or(EvalError::TypeMismatch {
1796                            detail: "DATE ± INTERVAL overflows DATE range".into(),
1797                        })?;
1798                let days32 = i32::try_from(new_days).map_err(|_| EvalError::TypeMismatch {
1799                    detail: "DATE ± INTERVAL overflows DATE range".into(),
1800                })?;
1801                Ok(Some(Value::Date(days32)))
1802            } else {
1803                let base =
1804                    i64::from(*d)
1805                        .checked_mul(86_400_000_000)
1806                        .ok_or(EvalError::TypeMismatch {
1807                            detail: "DATE → TIMESTAMP lift overflows for INTERVAL math".into(),
1808                        })?;
1809                Ok(Some(Value::Timestamp(add_interval_to_micros(
1810                    base,
1811                    signed_months,
1812                    signed_micros,
1813                )?)))
1814            }
1815        }
1816        Value::Interval {
1817            months: lhs_months,
1818            micros: lhs_us,
1819        } => {
1820            let new_months = i64::from(*lhs_months)
1821                .checked_add(signed_months)
1822                .and_then(|n| i32::try_from(n).ok())
1823                .ok_or(EvalError::TypeMismatch {
1824                    detail: "INTERVAL ± INTERVAL months overflows i32".into(),
1825                })?;
1826            let new_micros = lhs_us
1827                .checked_add(signed_micros)
1828                .ok_or(EvalError::TypeMismatch {
1829                    detail: "INTERVAL ± INTERVAL micros overflows i64".into(),
1830                })?;
1831            Ok(Some(Value::Interval {
1832                months: new_months,
1833                micros: new_micros,
1834            }))
1835        }
1836        _ => Err(EvalError::TypeMismatch {
1837            detail: format!(
1838                "operator {op:?} not defined for {:?} and INTERVAL",
1839                lhs.data_type()
1840            ),
1841        }),
1842    }
1843}
1844
1845/// Shift a `Date` by a signed number of months using the PG clamp rule.
1846fn shift_date_by_months(d: i32, months: i64) -> Result<i32, EvalError> {
1847    let (y, m, day) = civil_from_days(d);
1848    let months_i32 = i32::try_from(months).map_err(|_| EvalError::TypeMismatch {
1849        detail: "INTERVAL months delta out of i32 range".into(),
1850    })?;
1851    let (ny, nm, nd) = add_months_to_civil(y, m, day, months_i32);
1852    Ok(days_from_civil(ny, nm, nd))
1853}
1854
1855/// Add (months, micros) to a `Timestamp` (microseconds since epoch).
1856/// Months part is applied through civil calendar with clamp-to-last-day;
1857/// micros part is plain i64 addition with overflow guard.
1858fn add_interval_to_micros(t: i64, months: i64, micros: i64) -> Result<i64, EvalError> {
1859    let mut out = t;
1860    if months != 0 {
1861        const MICROS_PER_DAY: i64 = 86_400_000_000;
1862        let days = out.div_euclid(MICROS_PER_DAY);
1863        let day_micros = out.rem_euclid(MICROS_PER_DAY);
1864        let day_i32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
1865            detail: "TIMESTAMP day component out of i32 range for INTERVAL months math".into(),
1866        })?;
1867        let shifted_days = shift_date_by_months(day_i32, months)?;
1868        out = i64::from(shifted_days)
1869            .checked_mul(MICROS_PER_DAY)
1870            .and_then(|n| n.checked_add(day_micros))
1871            .ok_or(EvalError::TypeMismatch {
1872                detail: "TIMESTAMP ± INTERVAL months overflows i64 microseconds".into(),
1873            })?;
1874    }
1875    out.checked_add(micros).ok_or(EvalError::TypeMismatch {
1876        detail: "TIMESTAMP ± INTERVAL micros overflows i64".into(),
1877    })
1878}
1879
1880/// Dispatch for any binary op when at least one operand is NUMERIC.
1881/// Other-side integers / floats are promoted to a NUMERIC at a common
1882/// scale; all add / sub / mul / div / compare paths stay in i128.
1883#[allow(clippy::needless_pass_by_value)] // mirrors `apply_binary`'s by-value calling convention
1884fn apply_binary_numeric(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
1885    // Float still wins — Numeric + Float coerces both to f64 and runs
1886    // through the float path. PG demotes Numeric to float in this mix
1887    // too (the documented behaviour for `numeric + double precision`).
1888    let float_path = matches!(l, Value::Float(_)) || matches!(r, Value::Float(_));
1889    if float_path {
1890        let af = as_f64(&l)?;
1891        let bf = as_f64(&r)?;
1892        return match op {
1893            BinOp::Add => Ok(Value::Float(af + bf)),
1894            BinOp::Sub => Ok(Value::Float(af - bf)),
1895            BinOp::Mul => Ok(Value::Float(af * bf)),
1896            BinOp::Div => {
1897                if bf == 0.0 {
1898                    Err(EvalError::DivisionByZero)
1899                } else {
1900                    Ok(Value::Float(af / bf))
1901                }
1902            }
1903            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
1904                let ord = af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
1905                    detail: "NaN in NUMERIC/Float comparison".into(),
1906                })?;
1907                Ok(Value::Bool(cmp_to_bool(op, ord)))
1908            }
1909            BinOp::Concat => Ok(text_concat(&l, &r)),
1910            other => Err(EvalError::TypeMismatch {
1911                detail: format!("operator {other:?} not defined for NUMERIC and Float"),
1912            }),
1913        };
1914    }
1915    // Promote integer ↔ numeric to a shared scale (max of both sides).
1916    let (a, sa) = numeric_or_widen(&l).ok_or_else(|| EvalError::TypeMismatch {
1917        detail: format!("NUMERIC op against non-numeric {:?}", l.data_type()),
1918    })?;
1919    let (b, sb) = numeric_or_widen(&r).ok_or_else(|| EvalError::TypeMismatch {
1920        detail: format!("NUMERIC op against non-numeric {:?}", r.data_type()),
1921    })?;
1922    match op {
1923        BinOp::Add | BinOp::Sub => {
1924            let target_scale = sa.max(sb);
1925            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
1926                detail: "NUMERIC overflow on rescale".into(),
1927            })?;
1928            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
1929                detail: "NUMERIC overflow on rescale".into(),
1930            })?;
1931            let r = match op {
1932                BinOp::Add => lhs.checked_add(rhs),
1933                BinOp::Sub => lhs.checked_sub(rhs),
1934                _ => unreachable!(),
1935            }
1936            .ok_or(EvalError::TypeMismatch {
1937                detail: "NUMERIC overflow on +/-".into(),
1938            })?;
1939            Ok(Value::Numeric {
1940                scaled: r,
1941                scale: target_scale,
1942            })
1943        }
1944        BinOp::Mul => {
1945            let scaled = a.checked_mul(b).ok_or(EvalError::TypeMismatch {
1946                detail: "NUMERIC overflow on *".into(),
1947            })?;
1948            Ok(Value::Numeric {
1949                scaled,
1950                scale: sa.saturating_add(sb),
1951            })
1952        }
1953        BinOp::Div => {
1954            if b == 0 {
1955                return Err(EvalError::DivisionByZero);
1956            }
1957            // Result scale: keep the wider operand's scale. Pre-scale
1958            // the numerator so the integer division retains that many
1959            // fractional digits. Round half-away-from-zero.
1960            let target_scale = sa.max(sb);
1961            // Numerator effective scale becomes sa + target_scale; we
1962            // bring it up to (target_scale + sb) so the divisor's scale
1963            // cancels cleanly.
1964            let bump = pow10_i128(target_scale.saturating_add(sb).saturating_sub(sa));
1965            let num = a.checked_mul(bump).ok_or(EvalError::TypeMismatch {
1966                detail: "NUMERIC overflow on / scaling".into(),
1967            })?;
1968            let half = if b >= 0 { b / 2 } else { -(b / 2) };
1969            let adj = if (num >= 0) == (b >= 0) {
1970                num + half
1971            } else {
1972                num - half
1973            };
1974            Ok(Value::Numeric {
1975                scaled: adj / b,
1976                scale: target_scale,
1977            })
1978        }
1979        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
1980            let target_scale = sa.max(sb);
1981            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
1982                detail: "NUMERIC overflow on rescale".into(),
1983            })?;
1984            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
1985                detail: "NUMERIC overflow on rescale".into(),
1986            })?;
1987            Ok(Value::Bool(cmp_to_bool(op, lhs.cmp(&rhs))))
1988        }
1989        BinOp::Concat => Ok(text_concat(&l, &r)),
1990        other => Err(EvalError::TypeMismatch {
1991            detail: format!("operator {other:?} not defined for NUMERIC"),
1992        }),
1993    }
1994}
1995
1996/// Express `v` as a `(scaled_i128, scale)` pair. Plain integers come
1997/// back with `scale=0`; NUMERIC keeps its own scale. Anything else
1998/// returns `None` and the caller raises a type error.
1999fn numeric_or_widen(v: &Value) -> Option<(i128, u8)> {
2000    match v {
2001        Value::Numeric { scaled, scale } => Some((*scaled, *scale)),
2002        Value::Int(n) => Some((i128::from(*n), 0)),
2003        Value::SmallInt(n) => Some((i128::from(*n), 0)),
2004        Value::BigInt(n) => Some((i128::from(*n), 0)),
2005        _ => None,
2006    }
2007}
2008
/// Re-express the fixed-point integer `scaled` (with `src` fractional
/// digits) at `dst` fractional digits.
///
/// * `dst > src` multiplies by `10^(dst - src)`; returns `None` when the
///   result does not fit in `i128`.
/// * `dst < src` divides by `10^(src - dst)`, rounding half away from
///   zero. This direction always succeeds; if the divisor itself exceeds
///   `i128` the value is necessarily smaller than half of it and rounds
///   to zero.
///
/// Every step is overflow-checked, so extreme values or scale gaps never
/// panic or wrap.
fn rescale(scaled: i128, src: u8, dst: u8) -> Option<i128> {
    if src == dst || scaled == 0 {
        return Some(scaled);
    }
    if dst > src {
        let factor = 10_i128.checked_pow(u32::from(dst - src))?;
        return scaled.checked_mul(factor);
    }
    let Some(divisor) = 10_i128.checked_pow(u32::from(src - dst)) else {
        // 10^39 and above exceed twice i128::MAX: everything rounds to 0.
        return Some(0);
    };
    let quotient = scaled / divisor;
    let remainder = scaled % divisor;
    // `divisor` is a power of ten >= 10, hence even: `divisor / 2` is the
    // exact tie point. `|remainder| < divisor`, so `abs` cannot overflow,
    // and `|quotient| <= i128::MAX / 10`, so the ±1 step cannot either.
    if remainder.abs() >= divisor / 2 {
        Some(quotient + scaled.signum())
    } else {
        Some(quotient)
    }
}
2026
/// `10^p` as an `i128`.
///
/// The result only fits for `p <= 38`; larger exponents overflow `i128`.
const fn pow10_i128(p: u8) -> i128 {
    // `u32::from` is not callable in a `const fn`; widening u8 → u32 with
    // `as` is lossless.
    10_i128.pow(p as u32)
}
2036
2037const fn cmp_to_bool(op: BinOp, ord: core::cmp::Ordering) -> bool {
2038    use core::cmp::Ordering::{Equal, Greater, Less};
2039    match op {
2040        BinOp::Eq => matches!(ord, Equal),
2041        BinOp::NotEq => !matches!(ord, Equal),
2042        BinOp::Lt => matches!(ord, Less),
2043        BinOp::LtEq => matches!(ord, Less | Equal),
2044        BinOp::Gt => matches!(ord, Greater),
2045        BinOp::GtEq => matches!(ord, Greater | Equal),
2046        _ => false,
2047    }
2048}
2049
2050/// SQL `||` string concatenation. Operands are coerced to text via the same
2051/// rule as `::text` cast. NULL propagates (handled above; this function only
2052/// runs with non-NULL operands).
2053fn text_concat(l: &Value, r: &Value) -> Value {
2054    let a = value_to_text(l);
2055    let b = value_to_text(r);
2056    Value::Text(a + &b)
2057}
2058
2059/// pgvector inner-product `<#>`. Returns the *negative* dot product so
2060/// smaller still means more similar — same convention as pgvector.
2061fn inner_product(l: Value, r: Value) -> Result<Value, EvalError> {
2062    let (a, b) = unwrap_vec_pair(l, r, "<#>")?;
2063    let mut dot: f64 = 0.0;
2064    for (x, y) in a.iter().zip(b.iter()) {
2065        dot += f64::from(*x) * f64::from(*y);
2066    }
2067    Ok(Value::Float(-dot))
2068}
2069
2070/// pgvector cosine distance `<=>` — `1 - (a·b) / (‖a‖ ‖b‖)`. A zero-norm
2071/// operand produces NaN (matches pgvector).
2072fn cosine_distance(l: Value, r: Value) -> Result<Value, EvalError> {
2073    let (a, b) = unwrap_vec_pair(l, r, "<=>")?;
2074    let mut dot: f64 = 0.0;
2075    let mut na: f64 = 0.0;
2076    let mut nb: f64 = 0.0;
2077    for (x, y) in a.iter().zip(b.iter()) {
2078        let xf = f64::from(*x);
2079        let yf = f64::from(*y);
2080        dot += xf * yf;
2081        na += xf * xf;
2082        nb += yf * yf;
2083    }
2084    let denom = sqrt_newton(na) * sqrt_newton(nb);
2085    if denom == 0.0 {
2086        return Ok(Value::Float(f64::NAN));
2087    }
2088    Ok(Value::Float(1.0 - dot / denom))
2089}
2090
2091fn unwrap_vec_pair(l: Value, r: Value, op: &str) -> Result<(Vec<f32>, Vec<f32>), EvalError> {
2092    // v6.0.1: SQ8 cells coming through the SQL evaluator are
2093    // dequantised to f32 here so the existing scalar distance
2094    // arithmetic stays intact. HNSW kNN search continues to use
2095    // the asymmetric ADC variant inside `cell_to_query_metric_
2096    // distance` — this path only runs when a vector expression
2097    // lands in the evaluator (full-scan ORDER BY, SELECT
2098    // projection of `v <-> $1`, etc.).
2099    let to_f32 = |v: Value| -> Option<Vec<f32>> {
2100        match v {
2101            Value::Vector(a) => Some(a),
2102            Value::Sq8Vector(q) => Some(spg_storage::quantize::dequantize(&q)),
2103            // v6.0.3: bit-exact dequant for halfvec cells.
2104            Value::HalfVector(h) => Some(h.to_f32_vec()),
2105            _ => None,
2106        }
2107    };
2108    let l_ty = l.data_type();
2109    let r_ty = r.data_type();
2110    match (to_f32(l), to_f32(r)) {
2111        (Some(a), Some(b)) => {
2112            if a.len() != b.len() {
2113                return Err(EvalError::TypeMismatch {
2114                    detail: format!("vector dim mismatch in {op}: {} vs {}", a.len(), b.len()),
2115                });
2116            }
2117            Ok((a, b))
2118        }
2119        _ => Err(EvalError::TypeMismatch {
2120            detail: format!("{op} requires two vectors, got {l_ty:?} and {r_ty:?}"),
2121        }),
2122    }
2123}
2124
2125/// Numeric arithmetic with widening.
2126/// - both `Int` → `Int` (with overflow check)
2127/// - `Int` op `BigInt` (either side) → `BigInt`
2128/// - any `Float` involved → `Float`
2129fn arith(
2130    l: Value,
2131    r: Value,
2132    int_op: impl Fn(i64, i64) -> Option<i64>,
2133    float_op: impl Fn(f64, f64) -> f64,
2134    op_name: &str,
2135) -> Result<Value, EvalError> {
2136    // Widen SmallInt to Int up front so the rest of the arithmetic
2137    // table only deals with Int / BigInt / Float pairs.
2138    let widen = |v: Value| -> Value {
2139        match v {
2140            Value::SmallInt(n) => Value::Int(i32::from(n)),
2141            other => other,
2142        }
2143    };
2144    let l = widen(l);
2145    let r = widen(r);
2146    match (l, r) {
2147        (Value::Int(a), Value::Int(b)) => {
2148            let result = int_op(i64::from(a), i64::from(b)).ok_or(EvalError::TypeMismatch {
2149                detail: format!("integer overflow on {op_name}"),
2150            })?;
2151            if let Ok(small) = i32::try_from(result) {
2152                Ok(Value::Int(small))
2153            } else {
2154                Ok(Value::BigInt(result))
2155            }
2156        }
2157        (Value::Int(a), Value::BigInt(b)) | (Value::BigInt(b), Value::Int(a)) => {
2158            let result = int_op(i64::from(a), b).ok_or(EvalError::TypeMismatch {
2159                detail: format!("bigint overflow on {op_name}"),
2160            })?;
2161            Ok(Value::BigInt(result))
2162        }
2163        (Value::BigInt(a), Value::BigInt(b)) => {
2164            let result = int_op(a, b).ok_or(EvalError::TypeMismatch {
2165                detail: format!("bigint overflow on {op_name}"),
2166            })?;
2167            Ok(Value::BigInt(result))
2168        }
2169        (a, b)
2170            if a.data_type() == Some(DataType::Float) || b.data_type() == Some(DataType::Float) =>
2171        {
2172            let af = as_f64(&a)?;
2173            let bf = as_f64(&b)?;
2174            Ok(Value::Float(float_op(af, bf)))
2175        }
2176        (a, b) => Err(EvalError::TypeMismatch {
2177            detail: format!(
2178                "{op_name} applied to non-numeric: {:?} vs {:?}",
2179                a.data_type(),
2180                b.data_type()
2181            ),
2182        }),
2183    }
2184}
2185
2186/// L2 (Euclidean) distance between two vectors of equal dimension.
2187/// Returned as `Value::Float(d)` so it composes with the existing
2188/// comparison / sort plumbing. Mismatched dims or non-vector operands
2189/// raise `TypeMismatch`.
2190#[allow(clippy::many_single_char_names)] // l, r, a, b, d are the natural names
2191fn l2_distance(l: Value, r: Value) -> Result<Value, EvalError> {
2192    // v6.0.1: route both operands through `unwrap_vec_pair` so SQ8
2193    // cells dequantise on the way in. Sub-f64 precision loss is
2194    // negligible vs the dequantisation noise the SQ8 path already
2195    // ships with.
2196    let (a, b) = unwrap_vec_pair(l, r, "<->")?;
2197    let mut sum: f64 = 0.0;
2198    for (x, y) in a.iter().zip(b.iter()) {
2199        let d = f64::from(*x) - f64::from(*y);
2200        sum += d * d;
2201    }
2202    Ok(Value::Float(sqrt_newton(sum)))
2203}
2204
/// Self-built `sqrt` for `f64` — `std::f64::sqrt` lives in `std`, which the
/// engine's `no_std` constraint disallows.
///
/// Newton-Raphson (`g ← (g + x/g) / 2`) seeded by halving the exponent in
/// the bit pattern of `x`. For normal inputs that seed is already within
/// a few percent of the root, so a handful of rounds reach full `f64`
/// precision at any magnitude. Seeding with `x` itself only halves the
/// error per round until the estimate is close, which is far too slow for
/// very large or very small inputs. Iteration stops once an update no
/// longer changes the estimate, with a hard cap as a safety net.
///
/// Special cases: zero and negative inputs return `0.0`; NaN and `+inf`
/// are returned unchanged.
fn sqrt_newton(x: f64) -> f64 {
    /// `1023 << 51`: re-adds half the exponent bias after the bit pattern
    /// has been shifted right by one.
    const HALF_BIAS: u64 = 0x1FF8_0000_0000_0000;
    /// Upper bound on Newton rounds. Normal inputs converge in about
    /// five; subnormal inputs start from a poorer seed and need more.
    const MAX_ROUNDS: u32 = 64;

    if x <= 0.0 {
        return 0.0;
    }
    if x.is_nan() || x == f64::INFINITY {
        return x;
    }
    // Shifting the bits right halves the biased exponent, which is a
    // cheap approximation of the square root.
    let mut g = f64::from_bits((x.to_bits() >> 1) + HALF_BIAS);
    for _ in 0..MAX_ROUNDS {
        let next = 0.5 * (g + x / g);
        if next == g {
            break;
        }
        g = next;
    }
    g
}
2221
2222fn div_op(l: Value, r: Value) -> Result<Value, EvalError> {
2223    let any_float = matches!(l.data_type(), Some(DataType::Float))
2224        || matches!(r.data_type(), Some(DataType::Float));
2225    if any_float {
2226        let a = as_f64(&l)?;
2227        let b = as_f64(&r)?;
2228        if b == 0.0 {
2229            return Err(EvalError::DivisionByZero);
2230        }
2231        return Ok(Value::Float(a / b));
2232    }
2233    arith(
2234        l,
2235        r,
2236        |a, b| {
2237            if b == 0 { None } else { Some(a / b) }
2238        },
2239        |a, b| a / b,
2240        "/",
2241    )
2242    .map_err(|e| match e {
2243        // The closure returns None on b == 0; translate that into the dedicated
2244        // DivisionByZero variant instead of "integer overflow on /".
2245        EvalError::TypeMismatch { detail } if detail.contains('/') => EvalError::DivisionByZero,
2246        other => other,
2247    })
2248}
2249
2250fn as_f64(v: &Value) -> Result<f64, EvalError> {
2251    match v {
2252        Value::SmallInt(n) => Ok(f64::from(*n)),
2253        Value::Int(n) => Ok(f64::from(*n)),
2254        #[allow(clippy::cast_precision_loss)]
2255        Value::BigInt(n) => Ok(*n as f64),
2256        Value::Float(x) => Ok(*x),
2257        #[allow(clippy::cast_precision_loss)]
2258        Value::Numeric { scaled, scale } => {
2259            let mut div = 1.0_f64;
2260            for _ in 0..*scale {
2261                div *= 10.0;
2262            }
2263            Ok((*scaled as f64) / div)
2264        }
2265        other => Err(EvalError::TypeMismatch {
2266            detail: format!("cannot convert {:?} to FLOAT", other.data_type()),
2267        }),
2268    }
2269}
2270
2271fn compare(op: BinOp, l: &Value, r: &Value) -> Result<Value, EvalError> {
2272    let ord = match (l, r) {
2273        (Value::Int(a), Value::Int(b)) => i64::from(*a).cmp(&i64::from(*b)),
2274        (Value::Int(a), Value::BigInt(b)) => i64::from(*a).cmp(b),
2275        (Value::BigInt(a), Value::Int(b)) => a.cmp(&i64::from(*b)),
2276        (Value::BigInt(a), Value::BigInt(b)) => a.cmp(b),
2277        (a, b)
2278            if matches!(a.data_type(), Some(DataType::Float))
2279                || matches!(b.data_type(), Some(DataType::Float)) =>
2280        {
2281            let af = as_f64(a)?;
2282            let bf = as_f64(b)?;
2283            af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
2284                detail: "NaN in comparison".into(),
2285            })?
2286        }
2287        (Value::Text(a), Value::Text(b)) => a.cmp(b),
2288        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
2289        // Date / Timestamp compare on their integer storage repr.
2290        // Cross-domain (Date vs Timestamp) lifts the Date to the
2291        // matching midnight TIMESTAMP first.
2292        (Value::Date(a), Value::Date(b)) => a.cmp(b),
2293        (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
2294        (Value::Date(a), Value::Timestamp(b)) => (i64::from(*a) * 86_400_000_000).cmp(b),
2295        (Value::Timestamp(a), Value::Date(b)) => a.cmp(&(i64::from(*b) * 86_400_000_000)),
2296        // PG-style implicit coercion: comparing a DATE / TIMESTAMP
2297        // column against a text literal lifts the literal into the
2298        // matching domain (e.g. `day >= '2024-01-01'`).
2299        (Value::Date(a), Value::Text(b)) => {
2300            let bd = parse_date_literal(b).ok_or_else(|| EvalError::TypeMismatch {
2301                detail: format!("cannot parse {b:?} as DATE for comparison"),
2302            })?;
2303            a.cmp(&bd)
2304        }
2305        (Value::Text(a), Value::Date(b)) => {
2306            let ad = parse_date_literal(a).ok_or_else(|| EvalError::TypeMismatch {
2307                detail: format!("cannot parse {a:?} as DATE for comparison"),
2308            })?;
2309            ad.cmp(b)
2310        }
2311        (Value::Timestamp(a), Value::Text(b)) => {
2312            let bt = parse_timestamp_literal(b).ok_or_else(|| EvalError::TypeMismatch {
2313                detail: format!("cannot parse {b:?} as TIMESTAMP for comparison"),
2314            })?;
2315            a.cmp(&bt)
2316        }
2317        (Value::Text(a), Value::Timestamp(b)) => {
2318            let at = parse_timestamp_literal(a).ok_or_else(|| EvalError::TypeMismatch {
2319                detail: format!("cannot parse {a:?} as TIMESTAMP for comparison"),
2320            })?;
2321            at.cmp(b)
2322        }
2323        (a, b) => {
2324            return Err(EvalError::TypeMismatch {
2325                detail: format!(
2326                    "comparison between {:?} and {:?}",
2327                    a.data_type(),
2328                    b.data_type()
2329                ),
2330            });
2331        }
2332    };
2333    let result = match op {
2334        BinOp::Eq => ord.is_eq(),
2335        BinOp::NotEq => !ord.is_eq(),
2336        BinOp::Lt => ord.is_lt(),
2337        BinOp::LtEq => ord.is_le(),
2338        BinOp::Gt => ord.is_gt(),
2339        BinOp::GtEq => ord.is_ge(),
2340        BinOp::And
2341        | BinOp::Or
2342        | BinOp::Add
2343        | BinOp::Sub
2344        | BinOp::Mul
2345        | BinOp::Div
2346        | BinOp::L2Distance
2347        | BinOp::InnerProduct
2348        | BinOp::CosineDistance
2349        | BinOp::Concat
2350        | BinOp::JsonGet
2351        | BinOp::JsonGetText
2352        | BinOp::JsonGetPath
2353        | BinOp::JsonGetPathText
2354        | BinOp::JsonContains
2355        | BinOp::IsDistinctFrom
2356        | BinOp::IsNotDistinctFrom => {
2357            unreachable!("compare() only called with comparison ops")
2358        }
2359    };
2360    Ok(Value::Bool(result))
2361}
2362
2363// SQL three-valued AND / OR.
2364fn and_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
2365    match (l, r) {
2366        (Value::Bool(false), _) | (_, Value::Bool(false)) => Ok(Value::Bool(false)),
2367        (Value::Bool(true), Value::Bool(true)) => Ok(Value::Bool(true)),
2368        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
2369        (a, b) => Err(EvalError::TypeMismatch {
2370            detail: format!(
2371                "AND on non-boolean: {:?} and {:?}",
2372                a.data_type(),
2373                b.data_type()
2374            ),
2375        }),
2376    }
2377}
2378
2379fn or_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
2380    match (l, r) {
2381        (Value::Bool(true), _) | (_, Value::Bool(true)) => Ok(Value::Bool(true)),
2382        (Value::Bool(false), Value::Bool(false)) => Ok(Value::Bool(false)),
2383        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
2384        (a, b) => Err(EvalError::TypeMismatch {
2385            detail: format!(
2386                "OR on non-boolean: {:?} and {:?}",
2387                a.data_type(),
2388                b.data_type()
2389            ),
2390        }),
2391    }
2392}
2393
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;
    use spg_storage::{ColumnSchema, Row};

    // ---- fixtures ---------------------------------------------------------

    /// Column schema called `name` with type `ty`; the trailing `true` is
    /// forwarded unchanged to `ColumnSchema::new`.
    fn col(name: &str, ty: DataType) -> ColumnSchema {
        ColumnSchema::new(name, ty, true)
    }

    /// Evaluation context over `cols` with an optional table alias.
    fn ctx<'a>(cols: &'a [ColumnSchema], alias: Option<&'a str>) -> EvalContext<'a> {
        EvalContext::new(cols, alias)
    }

    /// Integer literal expression.
    fn lit(n: i64) -> Expr {
        Expr::Literal(Literal::Integer(n))
    }

    /// Boolean literal expression.
    fn boolean(b: bool) -> Expr {
        Expr::Literal(Literal::Bool(b))
    }

    /// NULL literal expression.
    fn null() -> Expr {
        Expr::Literal(Literal::Null)
    }

    /// Unqualified column reference.
    fn col_ref(name: &str) -> Expr {
        Expr::Column(ColumnName {
            qualifier: None,
            name: name.into(),
        })
    }

    /// Binary expression `lhs <op> rhs` with both operands boxed.
    fn binary(lhs: Expr, op: BinOp, rhs: Expr) -> Expr {
        Expr::Binary {
            lhs: alloc::boxed::Box::new(lhs),
            op,
            rhs: alloc::boxed::Box::new(rhs),
        }
    }

    /// Evaluates `expr` against an empty row, an empty schema and no alias —
    /// the setup shared by every test that only involves literals.
    fn eval_const(expr: &Expr) -> Result<Value, EvalError> {
        let no_columns: [ColumnSchema; 0] = [];
        let empty_row = Row::new(vec![]);
        let context = ctx(&no_columns, None);
        eval_expr(expr, &empty_row, &context)
    }

    // ---- literals and column lookup ----------------------------------------

    #[test]
    fn literal_evaluates_to_value() {
        assert_eq!(eval_const(&lit(42)).unwrap(), Value::Int(42));
        assert_eq!(
            eval_const(&Expr::Literal(Literal::Float(1.5))).unwrap(),
            Value::Float(1.5)
        );
        assert_eq!(eval_const(&null()).unwrap(), Value::Null);
    }

    #[test]
    fn column_lookup_unqualified() {
        let schema = [col("a", DataType::Int), col("b", DataType::Text)];
        let row = Row::new(vec![Value::Int(7), Value::Text("hi".into())]);
        let context = ctx(&schema, None);

        assert_eq!(
            eval_expr(&col_ref("a"), &row, &context).unwrap(),
            Value::Int(7)
        );
        assert_eq!(
            eval_expr(&col_ref("b"), &row, &context).unwrap(),
            Value::Text("hi".into())
        );
    }

    #[test]
    fn column_not_found_errors() {
        let schema = [col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(0)]);
        let context = ctx(&schema, None);

        let err = eval_expr(&col_ref("ghost"), &row, &context).unwrap_err();
        assert!(matches!(err, EvalError::ColumnNotFound { ref name } if name == "ghost"));
    }

    #[test]
    fn qualified_column_matches_alias() {
        let schema = [col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(5)]);
        let context = ctx(&schema, Some("u"));

        // Qualifier equals the context's alias, so the lookup succeeds.
        let expr = Expr::Column(ColumnName {
            qualifier: Some("u".into()),
            name: "a".into(),
        });
        assert_eq!(eval_expr(&expr, &row, &context).unwrap(), Value::Int(5));
    }

    #[test]
    fn qualified_column_unknown_alias_errors() {
        let schema = [col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(5)]);
        let context = ctx(&schema, Some("u"));

        // Qualifier `x` differs from the alias `u`.
        let expr = Expr::Column(ColumnName {
            qualifier: Some("x".into()),
            name: "a".into(),
        });
        let err = eval_expr(&expr, &row, &context).unwrap_err();
        assert!(matches!(err, EvalError::UnknownQualifier { .. }));
    }

    // ---- arithmetic and comparison -----------------------------------------

    #[test]
    fn arithmetic_with_widening() {
        // Integer 2 plus float 0.5 yields a float.
        let sum = binary(lit(2), BinOp::Add, Expr::Literal(Literal::Float(0.5)));
        assert_eq!(eval_const(&sum).unwrap(), Value::Float(2.5));
    }

    #[test]
    fn division_by_zero_errors() {
        let quotient = binary(lit(1), BinOp::Div, lit(0));
        assert_eq!(
            eval_const(&quotient).unwrap_err(),
            EvalError::DivisionByZero
        );
    }

    #[test]
    fn comparison_returns_bool() {
        let less_than = binary(lit(1), BinOp::Lt, lit(2));
        assert_eq!(eval_const(&less_than).unwrap(), Value::Bool(true));
    }

    #[test]
    fn null_propagates_through_arithmetic() {
        let sum = binary(lit(1), BinOp::Add, null());
        assert_eq!(eval_const(&sum).unwrap(), Value::Null);
    }

    // ---- three-valued logic ------------------------------------------------

    #[test]
    fn and_three_valued_logic() {
        let and_expr = |lhs: bool, rhs: Expr| binary(boolean(lhs), BinOp::And, rhs);

        // FALSE AND NULL → FALSE
        assert_eq!(
            eval_const(&and_expr(false, null())).unwrap(),
            Value::Bool(false)
        );
        // TRUE AND NULL → NULL
        assert_eq!(eval_const(&and_expr(true, null())).unwrap(), Value::Null);
        // TRUE AND TRUE → TRUE
        assert_eq!(
            eval_const(&and_expr(true, boolean(true))).unwrap(),
            Value::Bool(true)
        );
    }

    #[test]
    fn or_three_valued_logic() {
        let or_null = |lhs: bool| binary(boolean(lhs), BinOp::Or, null());

        // TRUE OR NULL → TRUE
        assert_eq!(eval_const(&or_null(true)).unwrap(), Value::Bool(true));
        // FALSE OR NULL → NULL
        assert_eq!(eval_const(&or_null(false)).unwrap(), Value::Null);
    }

    #[test]
    fn not_on_null_is_null() {
        let negated = Expr::Unary {
            op: UnOp::Not,
            expr: alloc::boxed::Box::new(null()),
        };
        assert_eq!(eval_const(&negated).unwrap(), Value::Null);
    }

    #[test]
    fn text_comparison_lexicographic() {
        let text = |s: &str| Expr::Literal(Literal::String(s.into()));
        let less_than = binary(text("apple"), BinOp::Lt, text("banana"));
        assert_eq!(eval_const(&less_than).unwrap(), Value::Bool(true));
    }

    // ---- intervals ---------------------------------------------------------

    #[test]
    fn interval_format_basics() {
        // Zero interval.
        assert_eq!(format_interval(0, 0), "0");
        // Whole days, positive and negative.
        assert_eq!(format_interval(0, 86_400_000_000), "1 day");
        assert_eq!(format_interval(0, -86_400_000_000), "-1 days");
        // Sub-day part only.
        assert_eq!(format_interval(0, 3_600_000_000), "01:00:00");
        // Day part plus sub-day part.
        assert_eq!(
            format_interval(0, 86_400_000_000 + 9_000_000),
            "1 day 00:00:09"
        );
        // Month counts.
        assert_eq!(format_interval(14, 0), "1 year 2 mons");
        assert_eq!(format_interval(-1, 0), "-1 mons");
    }

    #[test]
    fn interval_add_to_timestamp_micros_part() {
        // 2024-01-01 00:00:00 + INTERVAL '1 hour' = 2024-01-01 01:00:00
        let midnight = i64::from(days_from_civil(2024, 1, 1)) * 86_400_000_000;
        let shifted = add_interval_to_micros(midnight, 0, 3_600_000_000).unwrap();
        assert_eq!(shifted, midnight + 3_600_000_000);
    }

    #[test]
    fn interval_clamp_month_end() {
        // Shifts the civil date by `delta` months and converts the result
        // back to a (year, month, day) triple.
        let shift_civil = |(y, m, d), delta| {
            let shifted = shift_date_by_months(days_from_civil(y, m, d), delta).unwrap();
            civil_from_days(shifted)
        };

        // 2024-01-31 + 1 month = 2024-02-29 (leap year).
        assert_eq!(shift_civil((2024, 1, 31), 1), (2024, 2, 29));
        // 2023-01-31 + 1 month = 2023-02-28 (non-leap).
        assert_eq!(shift_civil((2023, 1, 31), 1), (2023, 2, 28));
        // 2024-03-31 - 1 month = 2024-02-29.
        assert_eq!(shift_civil((2024, 3, 31), -1), (2024, 2, 29));
    }

    #[test]
    fn interval_date_plus_pure_days_stays_date() {
        // DATE + INTERVAL '7 days' must stay DATE.
        let start = Value::Date(days_from_civil(2024, 6, 1));
        let week = Value::Interval {
            months: 0,
            micros: 7 * 86_400_000_000,
        };

        let sum = apply_binary_interval(BinOp::Add, &start, &week)
            .unwrap()
            .unwrap();
        assert_eq!(sum, Value::Date(days_from_civil(2024, 6, 8)));
    }

    #[test]
    fn interval_date_plus_sub_day_lifts_to_timestamp() {
        // DATE + INTERVAL '1 hour' must lift to TIMESTAMP.
        let day = days_from_civil(2024, 6, 1);
        let start = Value::Date(day);
        let hour = Value::Interval {
            months: 0,
            micros: 3_600_000_000,
        };

        let sum = apply_binary_interval(BinOp::Add, &start, &hour)
            .unwrap()
            .unwrap();
        assert_eq!(
            sum,
            Value::Timestamp(i64::from(day) * 86_400_000_000 + 3_600_000_000)
        );
    }
}