Skip to main content

spg_engine/
eval.rs

1//! Expression evaluator. Given a parsed `Expr`, a `Row`, and the row's column
2//! schema, produce a `Value`. v0.4 implements:
3//!
4//! - literals
5//! - column lookups (bare and qualified `t.col`)
6//! - unary minus / NOT
7//! - binary arithmetic, comparison, AND, OR
8//! - numeric widening (`Int → BigInt → Float`) at evaluation time
9//! - SQL three-valued logic for NULL:
10//!     * any arithmetic / comparison op with a NULL operand → NULL
11//!     * `TRUE OR NULL` → TRUE, `FALSE OR NULL` → NULL,
12//!     * `FALSE AND NULL` → FALSE, `TRUE AND NULL` → NULL,
13//!     * `NOT NULL` → NULL
14//!
15//! v0.4 deliberately does *not* implement: function calls, string
16//! concatenation, IS NULL / IS NOT NULL, BETWEEN, IN, etc. Those come later.
17
18use alloc::format;
19use alloc::string::{String, ToString};
20use alloc::vec::Vec;
21
22use spg_sql::ast::{BinOp, CastTarget, ColumnName, Expr, Literal, UnOp};
23use spg_storage::{ColumnSchema, DataType, Row, Value};
24
/// Resolution context for evaluating a single row. `table_alias` is the alias
/// (or table name) callers should accept as the qualifier on a column ref —
/// e.g. `FROM users AS u` makes `u.name` valid and rejects `other.name`.
#[derive(Debug, Clone)]
pub struct EvalContext<'a> {
    /// Column schema used to resolve column references against the row.
    /// NOTE(review): presumably positionally aligned with the `Row` passed
    /// to `eval_expr` — confirm against `resolve_column`.
    pub columns: &'a [ColumnSchema],
    /// Qualifier accepted on `t.col` references (see the type-level docs).
    /// NOTE(review): the meaning of `None` is decided by `resolve_column`,
    /// which is not visible here — confirm before relying on it.
    pub table_alias: Option<&'a str>,
    /// v6.1.1 — bound parameters for `$N` placeholders inside the
    /// expression tree. Empty for simple queries; populated by the
    /// prepared-statement Execute path with Bind values converted
    /// to `Value`. Index N (1-based per PG) hits `params[N-1]`.
    pub params: &'a [Value],
}
38
39impl<'a> EvalContext<'a> {
40    pub const fn new(columns: &'a [ColumnSchema], table_alias: Option<&'a str>) -> Self {
41        Self {
42            columns,
43            table_alias,
44            params: &[],
45        }
46    }
47
48    /// v6.1.1 — attach a parameter buffer for `$N` placeholder
49    /// resolution. The slice must outlive the context; callers
50    /// construct it from the prepared statement's Bind values.
51    #[must_use]
52    pub const fn with_params(mut self, params: &'a [Value]) -> Self {
53        self.params = params;
54        self
55    }
56}
57
/// Failure modes of row-level expression evaluation.
#[derive(Debug, Clone, PartialEq)]
pub enum EvalError {
    /// A column reference named `name` could not be resolved.
    ColumnNotFound { name: String },
    /// A qualified reference used a table qualifier that is not accepted.
    UnknownQualifier { qualifier: String },
    /// Division by zero.
    DivisionByZero,
    /// Operand / argument of the wrong type; `detail` carries the
    /// human-readable explanation.
    TypeMismatch { detail: String },
    /// v6.1.1 — `$N` reference past the number of bound parameters.
    /// Either the client sent too few in Bind, or the SQL has a
    /// placeholder the prepared statement didn't account for.
    PlaceholderOutOfRange { n: u16, bound: u16 },
}

impl core::fmt::Display for EvalError {
    /// Render the error as a single-line, user-facing message.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // String-only variants are a fixed prefix followed by the payload.
        let (prefix, payload) = match self {
            Self::DivisionByZero => return f.write_str("division by zero"),
            Self::PlaceholderOutOfRange { n, bound } => {
                return write!(
                    f,
                    "parameter ${n} referenced but only {bound} bound by client"
                );
            }
            Self::ColumnNotFound { name } => ("column not found: ", name),
            Self::UnknownQualifier { qualifier } => ("unknown table qualifier: ", qualifier),
            Self::TypeMismatch { detail } => ("type mismatch: ", detail),
        };
        f.write_str(prefix)?;
        f.write_str(payload)
    }
}
86
87pub fn eval_expr(expr: &Expr, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
88    match expr {
89        Expr::Literal(l) => Ok(literal_to_value(l)),
90        Expr::Column(c) => resolve_column(c, row, ctx),
91        Expr::Placeholder(n) => {
92            let idx = usize::from(*n).saturating_sub(1);
93            ctx.params
94                .get(idx)
95                .cloned()
96                .ok_or_else(|| EvalError::PlaceholderOutOfRange {
97                    n: *n,
98                    bound: u16::try_from(ctx.params.len()).unwrap_or(u16::MAX),
99                })
100        }
101        Expr::Unary { op, expr } => {
102            let v = eval_expr(expr, row, ctx)?;
103            apply_unary(*op, v)
104        }
105        Expr::Binary { lhs, op, rhs } => {
106            let l = eval_expr(lhs, row, ctx)?;
107            let r = eval_expr(rhs, row, ctx)?;
108            apply_binary(*op, l, r)
109        }
110        Expr::Cast { expr, target } => {
111            let v = eval_expr(expr, row, ctx)?;
112            cast_value(v, *target)
113        }
114        Expr::IsNull { expr, negated } => {
115            let v = eval_expr(expr, row, ctx)?;
116            let is_null = matches!(v, Value::Null);
117            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
118        }
119        Expr::FunctionCall { name, args } => {
120            let evaluated: Result<Vec<Value>, _> =
121                args.iter().map(|a| eval_expr(a, row, ctx)).collect();
122            apply_function(name, &evaluated?)
123        }
124        Expr::Like {
125            expr,
126            pattern,
127            negated,
128        } => {
129            let v = eval_expr(expr, row, ctx)?;
130            let p = eval_expr(pattern, row, ctx)?;
131            // NULL on either side propagates to NULL — same as PG.
132            let (text, pat) = match (v, p) {
133                (Value::Null, _) | (_, Value::Null) => return Ok(Value::Null),
134                (Value::Text(a), Value::Text(b)) => (a, b),
135                (Value::Text(_), other) | (other, _) => {
136                    return Err(EvalError::TypeMismatch {
137                        detail: format!("LIKE requires text operands, got {:?}", other.data_type()),
138                    });
139                }
140            };
141            let m = like_match(&text, &pat);
142            Ok(Value::Bool(if *negated { !m } else { m }))
143        }
144        Expr::Extract { field, source } => {
145            let v = eval_expr(source, row, ctx)?;
146            extract_field(*field, &v)
147        }
148        // v4.10: subquery nodes should have been resolved into
149        // Literal / Binary-Eq-OR chains by Engine::resolve_select_subqueries
150        // before the row loop. Anything reaching here is a bug.
151        Expr::ScalarSubquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. } => {
152            Err(EvalError::TypeMismatch {
153                detail: "subquery reached row eval — engine resolver bug".into(),
154            })
155        }
156        // v4.12: window functions should have been rewritten into
157        // synthetic __win_N column references by
158        // exec_select_with_window before row eval. Anything
159        // reaching here is similarly a bug.
160        Expr::WindowFunction { .. } => Err(EvalError::TypeMismatch {
161            detail: "window function reached row eval — engine rewrite bug".into(),
162        }),
163    }
164}
165
166/// Pull an integer component (year / month / ... / microsecond) out
167/// of a `DATE` or `TIMESTAMP`. Returns NULL on a NULL source, errors
168/// when the source isn't a calendar type.
169fn extract_field(field: spg_sql::ast::ExtractField, v: &Value) -> Result<Value, EvalError> {
170    use spg_sql::ast::ExtractField as F;
171    if matches!(v, Value::Null) {
172        return Ok(Value::Null);
173    }
174    // INTERVAL has its own decomposition — `YEAR` / `MONTH` come from
175    // the months part, the rest from the microseconds part. PG matches
176    // this convention (months is normalised modulo 12 for MONTH).
177    if let Value::Interval { months, micros } = *v {
178        let years = months / 12;
179        let mons = months % 12;
180        let secs_total = micros / 1_000_000;
181        let frac = micros % 1_000_000;
182        let result = match field {
183            F::Year => i64::from(years),
184            F::Month => i64::from(mons),
185            F::Day => micros / 86_400_000_000,
186            F::Hour => (secs_total / 3600) % 24,
187            F::Minute => (secs_total / 60) % 60,
188            F::Second => secs_total % 60,
189            F::Microsecond => (secs_total % 60) * 1_000_000 + frac,
190        };
191        return Ok(Value::BigInt(result));
192    }
193    let (days, day_micros) = match *v {
194        Value::Date(d) => (d, 0_i64),
195        Value::Timestamp(t) => {
196            let days = t.div_euclid(86_400_000_000);
197            let day_micros = t.rem_euclid(86_400_000_000);
198            (i32::try_from(days).unwrap_or(i32::MAX), day_micros)
199        }
200        _ => {
201            return Err(EvalError::TypeMismatch {
202                detail: format!(
203                    "EXTRACT requires DATE / TIMESTAMP / INTERVAL, got {:?}",
204                    v.data_type()
205                ),
206            });
207        }
208    };
209    let (y, m, d) = civil_components(days);
210    let secs = day_micros / 1_000_000;
211    let hh = secs / 3600;
212    let mm = (secs / 60) % 60;
213    let ss = secs % 60;
214    let frac = day_micros % 1_000_000;
215    let result = match field {
216        F::Year => i64::from(y),
217        F::Month => i64::from(m),
218        F::Day => i64::from(d),
219        F::Hour => hh,
220        F::Minute => mm,
221        F::Second => ss,
222        F::Microsecond => ss * 1_000_000 + frac,
223    };
224    Ok(Value::BigInt(result))
225}
226
/// Internal wrapper around the file-private `civil_from_days` so the
/// public surface area doesn't change. Returns `(year, month, day)`.
///
/// `days` is forwarded unchanged. NOTE(review): presumably the same day
/// count that `Value::Date` stores — confirm against `civil_from_days`.
fn civil_components(days: i32) -> (i32, u32, u32) {
    civil_from_days(days)
}
232
/// SQL `LIKE` matcher. Wildcards are `%` (any run, possibly empty) and `_`
/// (exactly one char). `\` escapes the next pattern char so `\%` matches a
/// literal `%`; a trailing lone `\` matches a literal backslash. Matches the
/// whole input — no implicit anchoring needed since SQL `LIKE` is always
/// full-string.
///
/// Matching is per Unicode scalar value (`char`), not per byte.
fn like_match(text: &str, pattern: &str) -> bool {
    let text: Vec<char> = text.chars().collect();
    let pat: Vec<char> = pattern.chars().collect();
    like_match_inner(&text, 0, &pat, 0)
}

/// Match `pat[pi..]` against `text[ti..]` (whole remaining text).
///
/// Iterative matcher with single-point backtracking: every non-`%` pattern
/// element consumes exactly one text char, so on a mismatch it is enough to
/// return to the most recent `%` and let it absorb one more char. This bounds
/// the work at O(text × pattern); retrying every split at every `%`
/// recursively blows up on patterns such as `%a%a%a…%b`.
fn like_match_inner(text: &[char], mut ti: usize, pat: &[char], mut pi: usize) -> bool {
    // (pattern index just past the latest `%`, text index that `%` currently
    // stops at). `None` until the first `%` is seen.
    let mut resume: Option<(usize, usize)> = None;
    loop {
        // How many pattern chars the current element spans if it matches
        // `text[ti]`; `None` on mismatch.
        let advance = match pat.get(pi).copied() {
            None if ti == text.len() => return true,
            // Pattern exhausted but text remains: only a `%` can fix that.
            None => None,
            Some('%') => {
                pi += 1;
                resume = Some((pi, ti));
                continue;
            }
            Some('_') => text.get(ti).map(|_| 1),
            Some('\\') if pi + 1 < pat.len() => (text.get(ti) == Some(&pat[pi + 1])).then_some(2),
            Some(c) => (text.get(ti) == Some(&c)).then_some(1),
        };
        match (advance, resume) {
            (Some(width), _) => {
                ti += 1;
                pi += width;
            }
            // Mismatch: widen the latest `%` by one char and retry from there.
            (None, Some((after_pct, start))) if start < text.len() => {
                resume = Some((after_pct, start + 1));
                pi = after_pct;
                ti = start + 1;
            }
            _ => return false,
        }
    }
}
287
288/// Dispatch on lowercased function name. v1.4 implements only a handful of
289/// scalar functions; aggregates land in v1.5 alongside GROUP BY.
290fn apply_function(name: &str, args: &[Value]) -> Result<Value, EvalError> {
291    match name.to_ascii_lowercase().as_str() {
292        "length" => {
293            if args.len() != 1 {
294                return Err(EvalError::TypeMismatch {
295                    detail: format!("length() takes 1 arg, got {}", args.len()),
296                });
297            }
298            match &args[0] {
299                Value::Null => Ok(Value::Null),
300                Value::Text(s) => {
301                    let n = i32::try_from(s.chars().count()).unwrap_or(i32::MAX);
302                    Ok(Value::Int(n))
303                }
304                other => Err(EvalError::TypeMismatch {
305                    detail: format!("length() needs text, got {:?}", other.data_type()),
306                }),
307            }
308        }
309        "upper" => {
310            if args.len() != 1 {
311                return Err(EvalError::TypeMismatch {
312                    detail: format!("upper() takes 1 arg, got {}", args.len()),
313                });
314            }
315            match &args[0] {
316                Value::Null => Ok(Value::Null),
317                Value::Text(s) => Ok(Value::Text(s.to_uppercase())),
318                other => Err(EvalError::TypeMismatch {
319                    detail: format!("upper() needs text, got {:?}", other.data_type()),
320                }),
321            }
322        }
323        "lower" => {
324            if args.len() != 1 {
325                return Err(EvalError::TypeMismatch {
326                    detail: format!("lower() takes 1 arg, got {}", args.len()),
327                });
328            }
329            match &args[0] {
330                Value::Null => Ok(Value::Null),
331                Value::Text(s) => Ok(Value::Text(s.to_lowercase())),
332                other => Err(EvalError::TypeMismatch {
333                    detail: format!("lower() needs text, got {:?}", other.data_type()),
334                }),
335            }
336        }
337        "abs" => {
338            if args.len() != 1 {
339                return Err(EvalError::TypeMismatch {
340                    detail: format!("abs() takes 1 arg, got {}", args.len()),
341                });
342            }
343            match &args[0] {
344                Value::Null => Ok(Value::Null),
345                Value::Int(n) => Ok(Value::Int(n.wrapping_abs())),
346                Value::BigInt(n) => Ok(Value::BigInt(n.wrapping_abs())),
347                Value::Float(x) => Ok(Value::Float(x.abs())),
348                other => Err(EvalError::TypeMismatch {
349                    detail: format!("abs() needs numeric, got {:?}", other.data_type()),
350                }),
351            }
352        }
353        "coalesce" => {
354            for a in args {
355                if !matches!(a, Value::Null) {
356                    return Ok(a.clone());
357                }
358            }
359            Ok(Value::Null)
360        }
361        "date_trunc" => date_trunc(args),
362        "date_part" => date_part(args),
363        "age" => age(args),
364        "to_char" => to_char(args),
365        // v6.4.3 — encode/decode + error_on_null SQL function bundle.
366        "encode" => encode_text(args),
367        "decode" => decode_text(args),
368        "error_on_null" => error_on_null(args),
369        other => Err(EvalError::TypeMismatch {
370            detail: format!("unknown function `{other}`"),
371        }),
372    }
373}
374
375/// v6.4.3 — `encode(bytes_as_text, format)`. PG works on bytea
376/// arguments; SPG's value space treats Text as the byte container
377/// (raw UTF-8 bytes). Supported formats: base64 (PG default),
378/// base64url (RFC 4648 §5), base32hex (RFC 4648 §7 extended-hex),
379/// hex.
380fn encode_text(args: &[Value]) -> Result<Value, EvalError> {
381    if args.len() != 2 {
382        return Err(EvalError::TypeMismatch {
383            detail: format!("encode() takes 2 args, got {}", args.len()),
384        });
385    }
386    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
387        return Ok(Value::Null);
388    }
389    let bytes: &[u8] = match &args[0] {
390        Value::Text(s) => s.as_bytes(),
391        other => {
392            return Err(EvalError::TypeMismatch {
393                detail: format!(
394                    "encode() expects text bytes, got {:?}",
395                    other.data_type()
396                ),
397            });
398        }
399    };
400    let fmt = match &args[1] {
401        Value::Text(s) => s.to_ascii_lowercase(),
402        other => {
403            return Err(EvalError::TypeMismatch {
404                detail: format!(
405                    "encode() format must be text, got {:?}",
406                    other.data_type()
407                ),
408            });
409        }
410    };
411    let out = match fmt.as_str() {
412        "base64" => b64_encode(bytes, B64_STD),
413        "base64url" => b64_encode(bytes, B64_URL),
414        "base32hex" => b32hex_encode(bytes),
415        "hex" => hex_encode(bytes),
416        other => {
417            return Err(EvalError::TypeMismatch {
418                detail: format!("encode(): unknown format `{other}`"),
419            });
420        }
421    };
422    Ok(Value::Text(out))
423}
424
425/// v6.4.3 — `decode(text, format)`. Inverse of `encode`; returns
426/// Text containing the raw decoded bytes (caller may CAST to bytea
427/// equivalent if SPG adds bytea later).
428fn decode_text(args: &[Value]) -> Result<Value, EvalError> {
429    if args.len() != 2 {
430        return Err(EvalError::TypeMismatch {
431            detail: format!("decode() takes 2 args, got {}", args.len()),
432        });
433    }
434    if matches!(args[0], Value::Null) || matches!(args[1], Value::Null) {
435        return Ok(Value::Null);
436    }
437    let text = match &args[0] {
438        Value::Text(s) => s.as_str(),
439        other => {
440            return Err(EvalError::TypeMismatch {
441                detail: format!("decode() expects text, got {:?}", other.data_type()),
442            });
443        }
444    };
445    let fmt = match &args[1] {
446        Value::Text(s) => s.to_ascii_lowercase(),
447        other => {
448            return Err(EvalError::TypeMismatch {
449                detail: format!(
450                    "decode() format must be text, got {:?}",
451                    other.data_type()
452                ),
453            });
454        }
455    };
456    let bytes = match fmt.as_str() {
457        "base64" => b64_decode(text, B64_STD)?,
458        "base64url" => b64_decode(text, B64_URL)?,
459        "base32hex" => b32hex_decode(text)?,
460        "hex" => hex_decode(text)?,
461        other => {
462            return Err(EvalError::TypeMismatch {
463                detail: format!("decode(): unknown format `{other}`"),
464            });
465        }
466    };
467    let s = String::from_utf8(bytes).map_err(|_| EvalError::TypeMismatch {
468        detail: "decode(): result bytes are not valid UTF-8 (SPG stores raw bytes as Text)".into(),
469    })?;
470    Ok(Value::Text(s))
471}
472
473/// v6.4.3 — `error_on_null(v)`. Returns `v` unchanged if non-NULL;
474/// errors otherwise. Convenience to assert NOT NULL inside an
475/// expression without wrapping it in COALESCE + raise hacks.
476fn error_on_null(args: &[Value]) -> Result<Value, EvalError> {
477    if args.len() != 1 {
478        return Err(EvalError::TypeMismatch {
479            detail: format!("error_on_null() takes 1 arg, got {}", args.len()),
480        });
481    }
482    if matches!(args[0], Value::Null) {
483        return Err(EvalError::TypeMismatch {
484            detail: "error_on_null(): argument is NULL".into(),
485        });
486    }
487    Ok(args[0].clone())
488}
489
490// ── byte-level encoders ───────────────────────────────────────────
491
/// Standard base64 alphabet (`A–Z`, `a–z`, `0–9`, `+`, `/`); selected by
/// the `base64` format of `encode` / `decode`.
const B64_STD: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
/// URL-safe base64 alphabet (`-` and `_` replace `+` and `/`); selected by
/// the `base64url` format of `encode` / `decode`.
const B64_URL: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
/// Extended-hex base32 alphabet (`0–9` then `A–V`); index = 5-bit value.
const B32HEX_ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHIJKLMNOPQRSTUV";
497
/// Base64-encode `bytes` with the 64-symbol alphabet `alpha`.
///
/// Input is consumed in 3-byte groups, each producing four symbols. A final
/// short group is zero-extended and the unused output positions are filled
/// with `=` so the result length is always a multiple of four.
fn b64_encode(bytes: &[u8], alpha: &[u8; 64]) -> String {
    // Symbol for the 6-bit field of `word` starting at bit `shift`.
    let symbol = |word: u32, shift: u32| char::from(alpha[((word >> shift) & 0x3f) as usize]);

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        let first = u32::from(group[0]);
        let second = group.get(1).map_or(0, |&b| u32::from(b));
        let third = group.get(2).map_or(0, |&b| u32::from(b));
        let word = (first << 16) | (second << 8) | third;

        encoded.push(symbol(word, 18));
        encoded.push(symbol(word, 12));
        encoded.push(if group.len() > 1 { symbol(word, 6) } else { '=' });
        encoded.push(if group.len() > 2 { symbol(word, 0) } else { '=' });
    }
    encoded
}
525
526fn b64_decode(text: &str, alpha: &[u8; 64]) -> Result<Vec<u8>, EvalError> {
527    let mut lookup = [255u8; 256];
528    for (i, &c) in alpha.iter().enumerate() {
529        lookup[c as usize] = i as u8;
530    }
531    let mut out = Vec::with_capacity(text.len() * 3 / 4);
532    let mut buf: u32 = 0;
533    let mut bits: u32 = 0;
534    for c in text.bytes() {
535        if c == b'=' {
536            break;
537        }
538        if c == b'\n' || c == b'\r' || c == b' ' {
539            continue;
540        }
541        let v = lookup[c as usize];
542        if v == 255 {
543            return Err(EvalError::TypeMismatch {
544                detail: format!("decode(base64): invalid char {:?}", c as char),
545            });
546        }
547        buf = (buf << 6) | v as u32;
548        bits += 6;
549        if bits >= 8 {
550            bits -= 8;
551            out.push(((buf >> bits) & 0xff) as u8);
552        }
553    }
554    Ok(out)
555}
556
557fn b32hex_encode(bytes: &[u8]) -> String {
558    let mut out = String::with_capacity((bytes.len() * 8 + 4) / 5);
559    let mut buf: u64 = 0;
560    let mut bits: u32 = 0;
561    for &b in bytes {
562        buf = (buf << 8) | b as u64;
563        bits += 8;
564        while bits >= 5 {
565            bits -= 5;
566            out.push(B32HEX_ALPHABET[((buf >> bits) & 0x1f) as usize] as char);
567        }
568    }
569    if bits > 0 {
570        out.push(B32HEX_ALPHABET[((buf << (5 - bits)) & 0x1f) as usize] as char);
571    }
572    // Pad to multiple of 8.
573    while out.len() % 8 != 0 {
574        out.push('=');
575    }
576    out
577}
578
579fn b32hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
580    let mut lookup = [255u8; 256];
581    for (i, &c) in B32HEX_ALPHABET.iter().enumerate() {
582        lookup[c as usize] = i as u8;
583        // base32hex is case-insensitive — also map lowercase.
584        let lower = (c as char).to_ascii_lowercase() as u8;
585        lookup[lower as usize] = i as u8;
586    }
587    let mut out = Vec::with_capacity(text.len() * 5 / 8);
588    let mut buf: u64 = 0;
589    let mut bits: u32 = 0;
590    for c in text.bytes() {
591        if c == b'=' {
592            break;
593        }
594        if c == b'\n' || c == b'\r' || c == b' ' {
595            continue;
596        }
597        let v = lookup[c as usize];
598        if v == 255 {
599            return Err(EvalError::TypeMismatch {
600                detail: format!("decode(base32hex): invalid char {:?}", c as char),
601            });
602        }
603        buf = (buf << 5) | v as u64;
604        bits += 5;
605        if bits >= 8 {
606            bits -= 8;
607            out.push(((buf >> bits) & 0xff) as u8);
608        }
609    }
610    Ok(out)
611}
612
/// Render `bytes` as lowercase hexadecimal, two digits per byte (high
/// nibble first).
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut hex = String::with_capacity(bytes.len() * 2);
    hex.extend(
        bytes
            .iter()
            .flat_map(|&byte| [byte >> 4, byte & 0xf])
            .map(|nibble| char::from(DIGITS[usize::from(nibble)])),
    );
    hex
}
622
623fn hex_decode(text: &str) -> Result<Vec<u8>, EvalError> {
624    let trimmed = text.trim();
625    if trimmed.len() % 2 != 0 {
626        return Err(EvalError::TypeMismatch {
627            detail: "decode(hex): input length must be even".into(),
628        });
629    }
630    let mut out = Vec::with_capacity(trimmed.len() / 2);
631    let mut hi: u8 = 0;
632    for (i, c) in trimmed.bytes().enumerate() {
633        let v = match c {
634            b'0'..=b'9' => c - b'0',
635            b'a'..=b'f' => c - b'a' + 10,
636            b'A'..=b'F' => c - b'A' + 10,
637            _ => {
638                return Err(EvalError::TypeMismatch {
639                    detail: format!("decode(hex): invalid char {:?}", c as char),
640                });
641            }
642        };
643        if i % 2 == 0 {
644            hi = v;
645        } else {
646            out.push((hi << 4) | v);
647        }
648    }
649    Ok(out)
650}
651
652/// `date_part(field_text, source)` — function form of `EXTRACT(field FROM
653/// source)`. Same component dispatch (DATE / TIMESTAMP / INTERVAL) and
654/// same `BigInt` return shape; PG returns double precision but we keep the
655/// integer convention so the runner's `query I` shape works unchanged.
656fn date_part(args: &[Value]) -> Result<Value, EvalError> {
657    use spg_sql::ast::ExtractField as F;
658    if args.len() != 2 {
659        return Err(EvalError::TypeMismatch {
660            detail: format!("date_part() takes 2 args, got {}", args.len()),
661        });
662    }
663    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
664        return Ok(Value::Null);
665    }
666    let Value::Text(field_name) = &args[0] else {
667        return Err(EvalError::TypeMismatch {
668            detail: format!(
669                "date_part() needs a text field, got {:?}",
670                args[0].data_type()
671            ),
672        });
673    };
674    let field = match field_name.to_ascii_lowercase().as_str() {
675        "year" => F::Year,
676        "month" => F::Month,
677        "day" => F::Day,
678        "hour" => F::Hour,
679        "minute" => F::Minute,
680        "second" => F::Second,
681        "microsecond" | "microseconds" => F::Microsecond,
682        other => {
683            return Err(EvalError::TypeMismatch {
684                detail: format!(
685                    "unknown date_part field {other:?}; \
686                     supported: year, month, day, hour, minute, second, microsecond"
687                ),
688            });
689        }
690    };
691    extract_field(field, &args[1])
692}
693
694/// `age(t1, t2)` — return `t1 - t2` as an INTERVAL. v2.12 produces a
695/// micros-only interval (no months normalisation) because PG's
696/// month-justification rule is sensitive to the day-of-month walk and
697/// adds material complexity for marginal corpus value.
698///
699/// `age(t)` (single-arg form) is intentionally unsupported in v2.12:
700/// the dispatcher errors instead of guessing a clock source. Callers
701/// who want PG's `age(t)` semantics should write `age(CURRENT_DATE, t)`
702/// explicitly so the clock reference is visible at the SQL layer.
703fn age(args: &[Value]) -> Result<Value, EvalError> {
704    if args.is_empty() || args.len() > 2 {
705        return Err(EvalError::TypeMismatch {
706            detail: format!("age() takes 1 or 2 args, got {}", args.len()),
707        });
708    }
709    if args.iter().any(|v| matches!(v, Value::Null)) {
710        return Ok(Value::Null);
711    }
712    // Coerce to TIMESTAMP micros — DATE lifts to midnight; TIMESTAMP
713    // stays as-is; anything else errors.
714    let to_micros = |v: &Value| -> Result<i64, EvalError> {
715        match v {
716            Value::Timestamp(t) => Ok(*t),
717            Value::Date(d) => Ok(i64::from(*d) * 86_400_000_000),
718            other => Err(EvalError::TypeMismatch {
719                detail: format!("age() needs DATE or TIMESTAMP, got {:?}", other.data_type()),
720            }),
721        }
722    };
723    if args.len() == 1 {
724        return Err(EvalError::TypeMismatch {
725            detail: "single-arg age() is unsupported in v2.12 \
726                     (use age(CURRENT_DATE, t) explicitly)"
727                .into(),
728        });
729    }
730    let a = to_micros(&args[0])?;
731    let b = to_micros(&args[1])?;
732    let delta = a.checked_sub(b).ok_or(EvalError::TypeMismatch {
733        detail: "age() subtraction overflows i64 microseconds".into(),
734    })?;
735    Ok(Value::Interval {
736        months: 0,
737        micros: delta,
738    })
739}
740
741/// `to_char(value, format)` — render a DATE / TIMESTAMP through a PG
742/// format template. Supports the high-traffic placeholders:
743///   YYYY YY MM Mon Month DD HH24 HH12 MI SS MS US AM PM
744/// Unrecognised characters pass through literally so the template's
745/// punctuation ('-', ':', ' ', '/') needs no escape mechanism.
746fn to_char(args: &[Value]) -> Result<Value, EvalError> {
747    use core::fmt::Write as _;
748    if args.len() != 2 {
749        return Err(EvalError::TypeMismatch {
750            detail: format!("to_char() takes 2 args, got {}", args.len()),
751        });
752    }
753    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
754        return Ok(Value::Null);
755    }
756    let Value::Text(fmt) = &args[1] else {
757        return Err(EvalError::TypeMismatch {
758            detail: format!(
759                "to_char() needs a text format, got {:?}",
760                args[1].data_type()
761            ),
762        });
763    };
764    let (days, day_micros) = match &args[0] {
765        Value::Date(d) => (*d, 0_i64),
766        Value::Timestamp(t) => {
767            let days = t.div_euclid(86_400_000_000);
768            (
769                i32::try_from(days).unwrap_or(i32::MAX),
770                t.rem_euclid(86_400_000_000),
771            )
772        }
773        other => {
774            return Err(EvalError::TypeMismatch {
775                detail: format!(
776                    "to_char() needs DATE or TIMESTAMP, got {:?}",
777                    other.data_type()
778                ),
779            });
780        }
781    };
782    let (y, mo, d) = civil_from_days(days);
783    let secs = day_micros / 1_000_000;
784    let frac = day_micros % 1_000_000;
785    // div_euclid keeps every value non-negative — the casts below are
786    // sign-safe by construction. `secs ∈ [0, 86400)`, `frac ∈ [0,
787    // 1_000_000)`, so all three quantities fit in u32.
788    let hh24 = u32::try_from(secs / 3600).unwrap_or(0);
789    let mi = u32::try_from((secs / 60) % 60).unwrap_or(0);
790    let ss = u32::try_from(secs % 60).unwrap_or(0);
791    let hh12 = match hh24 % 12 {
792        0 => 12,
793        x => x,
794    };
795    let ampm = if hh24 < 12 { "AM" } else { "PM" };
796    let ms = u32::try_from(frac / 1_000).unwrap_or(0); // millisecond
797    let us = u32::try_from(frac).unwrap_or(0); // microsecond (0..1_000_000)
798
799    let mut out = String::with_capacity(fmt.len() + 8);
800    let bytes = fmt.as_bytes();
801    let mut i = 0;
802    // write! against a String never fails — discard the Result.
803    while i < bytes.len() {
804        // Try the longest prefixes first so "YYYY" wins over "YY".
805        let rest = &bytes[i..];
806        if rest.starts_with(b"YYYY") {
807            let _ = write!(out, "{y:04}");
808            i += 4;
809        } else if rest.starts_with(b"YY") {
810            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
811            let yy = (y.rem_euclid(100)) as u32;
812            let _ = write!(out, "{yy:02}");
813            i += 2;
814        } else if rest.starts_with(b"Month") {
815            out.push_str(MONTH_FULL[(mo - 1) as usize]);
816            i += 5;
817        } else if rest.starts_with(b"Mon") {
818            out.push_str(MONTH_ABBR[(mo - 1) as usize]);
819            i += 3;
820        } else if rest.starts_with(b"MM") {
821            let _ = write!(out, "{mo:02}");
822            i += 2;
823        } else if rest.starts_with(b"DD") {
824            let _ = write!(out, "{d:02}");
825            i += 2;
826        } else if rest.starts_with(b"HH24") {
827            let _ = write!(out, "{hh24:02}");
828            i += 4;
829        } else if rest.starts_with(b"HH12") {
830            let _ = write!(out, "{hh12:02}");
831            i += 4;
832        } else if rest.starts_with(b"MI") {
833            let _ = write!(out, "{mi:02}");
834            i += 2;
835        } else if rest.starts_with(b"SS") {
836            let _ = write!(out, "{ss:02}");
837            i += 2;
838        } else if rest.starts_with(b"MS") {
839            let _ = write!(out, "{ms:03}");
840            i += 2;
841        } else if rest.starts_with(b"US") {
842            let _ = write!(out, "{us:06}");
843            i += 2;
844        } else if rest.starts_with(b"AM") || rest.starts_with(b"PM") {
845            out.push_str(ampm);
846            i += 2;
847        } else {
848            // Pass any non-placeholder byte through verbatim.
849            out.push(bytes[i] as char);
850            i += 1;
851        }
852    }
853    Ok(Value::Text(out))
854}
855
/// Full English month names, indexed by `month - 1` (January first).
const MONTH_FULL: [&str; 12] = [
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
];

/// Three-letter English month abbreviations, indexed by `month - 1`.
const MONTH_ABBR: [&str; 12] = [
    "Jan", "Feb", "Mar", "Apr", "May", "Jun",
    "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
];
873
874/// `date_trunc(unit, timestamp)` — round a `TIMESTAMP` down to the
875/// requested calendar boundary (year / month / day / hour / minute /
876/// second). Returns the truncated `TIMESTAMP`. NULL on either side
877/// propagates to NULL.
878fn date_trunc(args: &[Value]) -> Result<Value, EvalError> {
879    if args.len() != 2 {
880        return Err(EvalError::TypeMismatch {
881            detail: format!("date_trunc() takes 2 args, got {}", args.len()),
882        });
883    }
884    if matches!(&args[0], Value::Null) || matches!(&args[1], Value::Null) {
885        return Ok(Value::Null);
886    }
887    let Value::Text(unit) = &args[0] else {
888        return Err(EvalError::TypeMismatch {
889            detail: format!(
890                "date_trunc() needs a text unit, got {:?}",
891                args[0].data_type()
892            ),
893        });
894    };
895    // Both DATE and TIMESTAMP sources are accepted. DATE lifts to
896    // midnight first; the result is always TIMESTAMP.
897    let micros = match &args[1] {
898        Value::Timestamp(t) => *t,
899        Value::Date(d) => i64::from(*d) * 86_400_000_000,
900        other => {
901            return Err(EvalError::TypeMismatch {
902                detail: format!(
903                    "date_trunc() needs DATE or TIMESTAMP, got {:?}",
904                    other.data_type()
905                ),
906            });
907        }
908    };
909    let unit_lc = unit.to_ascii_lowercase();
910    let days = micros.div_euclid(86_400_000_000);
911    let day_micros = micros.rem_euclid(86_400_000_000);
912    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
913    let (y, m, _) = civil_from_days(day_i32);
914    let truncated = match unit_lc.as_str() {
915        "year" => i64::from(days_from_civil(y, 1, 1)) * 86_400_000_000,
916        "month" => i64::from(days_from_civil(y, m, 1)) * 86_400_000_000,
917        "day" => days * 86_400_000_000,
918        "hour" => days * 86_400_000_000 + (day_micros / 3_600_000_000) * 3_600_000_000,
919        "minute" => days * 86_400_000_000 + (day_micros / 60_000_000) * 60_000_000,
920        "second" => days * 86_400_000_000 + (day_micros / 1_000_000) * 1_000_000,
921        other => {
922            return Err(EvalError::TypeMismatch {
923                detail: format!(
924                    "unknown date_trunc unit {other:?}; \
925                     supported: year, month, day, hour, minute, second"
926                ),
927            });
928        }
929    };
930    Ok(Value::Timestamp(truncated))
931}
932
933/// PG-style `expr::TYPE` coercion. NULL always casts as NULL.
934pub fn cast_value(v: Value, target: CastTarget) -> Result<Value, EvalError> {
935    if matches!(v, Value::Null) {
936        return Ok(Value::Null);
937    }
938    match target {
939        CastTarget::Vector => cast_to_vector(v),
940        CastTarget::Text => Ok(Value::Text(value_to_text(&v))),
941        CastTarget::Int => cast_numeric_to_int(v),
942        CastTarget::BigInt => cast_numeric_to_bigint(v),
943        CastTarget::Float => cast_numeric_to_float(v),
944        CastTarget::Bool => cast_to_bool(v),
945        CastTarget::Date => cast_to_date(v),
946        // TIMESTAMP and TIMESTAMPTZ have identical runtime
947        // representation (i64 microseconds UTC).
948        CastTarget::Timestamp | CastTarget::Timestamptz => cast_to_timestamp(v),
949        // v7.9.25 — `expr::INTERVAL`. Currently only TEXT → Interval
950        // is supported (the mailrs idiom: `$1::INTERVAL` where the
951        // bound param is a string like `'7 days'`).
952        CastTarget::Interval => cast_to_interval(v),
953        // v7.9.25 — `::json` / `::jsonb`. Routes Text → Json
954        // (validation is the producer's responsibility, same as
955        // the column-INSERT path).
956        CastTarget::Json | CastTarget::Jsonb => match v {
957            Value::Json(s) => Ok(Value::Json(s)),
958            Value::Text(s) => Ok(Value::Json(s)),
959            other => Err(EvalError::TypeMismatch {
960                detail: alloc::format!(
961                    "::json / ::jsonb only accepts TEXT-shape inputs, got {:?}",
962                    other.data_type()
963                ),
964            }),
965        },
966        // v7.9.26 — `::regtype` / `::regclass`. SPG has no
967        // pg_catalog; surface a clear error.
968        CastTarget::RegType | CastTarget::RegClass => Err(EvalError::TypeMismatch {
969            detail:
970                "::regtype / ::regclass not supported on SPG \
971                 (no pg_catalog); use SHOW TABLES / spg_table_ddl instead"
972                    .into(),
973        }),
974    }
975}
976
977fn cast_to_interval(v: Value) -> Result<Value, EvalError> {
978    match v {
979        Value::Interval { months, micros } => Ok(Value::Interval { months, micros }),
980        Value::Text(s) => {
981            let (months, micros) = spg_sql::parser::parse_interval_text(&s)
982                .ok_or_else(|| EvalError::TypeMismatch {
983                    detail: alloc::format!("cannot parse {s:?} as INTERVAL"),
984                })?;
985            Ok(Value::Interval { months, micros })
986        }
987        other => Err(EvalError::TypeMismatch {
988            detail: alloc::format!(
989                "::INTERVAL only accepts TEXT-shape inputs, got {:?}",
990                other.data_type()
991            ),
992        }),
993    }
994}
995
996fn cast_to_date(v: Value) -> Result<Value, EvalError> {
997    match v {
998        Value::Date(d) => Ok(Value::Date(d)),
999        // Integer literals carry days since the Unix epoch — used by
1000        // the `CURRENT_DATE` AST rewrite to inject the wall clock.
1001        Value::Int(n) => Ok(Value::Date(n)),
1002        Value::BigInt(n) => {
1003            i32::try_from(n)
1004                .map(Value::Date)
1005                .map_err(|_| EvalError::TypeMismatch {
1006                    detail: "bigint days-since-epoch out of DATE range".into(),
1007                })
1008        }
1009        // Timestamp truncates to its day boundary.
1010        Value::Timestamp(t) => {
1011            let days = t.div_euclid(86_400_000_000);
1012            i32::try_from(days)
1013                .map(Value::Date)
1014                .map_err(|_| EvalError::TypeMismatch {
1015                    detail: "timestamp out of DATE range".into(),
1016                })
1017        }
1018        Value::Text(s) => parse_date_literal(&s)
1019            .map(Value::Date)
1020            .ok_or(EvalError::TypeMismatch {
1021                detail: format!("cannot parse {s:?} as DATE (expected YYYY-MM-DD)"),
1022            }),
1023        other => Err(EvalError::TypeMismatch {
1024            detail: format!("cannot cast {:?} to DATE", other.data_type()),
1025        }),
1026    }
1027}
1028
1029fn cast_to_timestamp(v: Value) -> Result<Value, EvalError> {
1030    match v {
1031        Value::Timestamp(t) => Ok(Value::Timestamp(t)),
1032        // Int / BigInt carry microseconds since the Unix epoch — used
1033        // by the `NOW()` / `CURRENT_TIMESTAMP` AST rewrite to inject
1034        // the wall clock as a plain integer literal.
1035        Value::Int(n) => Ok(Value::Timestamp(i64::from(n))),
1036        Value::BigInt(n) => Ok(Value::Timestamp(n)),
1037        // DATE → TIMESTAMP picks midnight on the date.
1038        Value::Date(d) => Ok(Value::Timestamp(i64::from(d) * 86_400_000_000)),
1039        Value::Text(s) => {
1040            parse_timestamp_literal(&s)
1041                .map(Value::Timestamp)
1042                .ok_or(EvalError::TypeMismatch {
1043                    detail: format!(
1044                        "cannot parse {s:?} as TIMESTAMP \
1045                     (expected YYYY-MM-DD[ HH:MM:SS[.ffffff]])"
1046                    ),
1047                })
1048        }
1049        other => Err(EvalError::TypeMismatch {
1050            detail: format!("cannot cast {:?} to TIMESTAMP", other.data_type()),
1051        }),
1052    }
1053}
1054
1055fn value_to_text(v: &Value) -> String {
1056    match v {
1057        // v7.5.0 — Value is #[non_exhaustive]; any future variant
1058        // without explicit text rendering hits the Debug fallback
1059        // at the end.
1060        Value::SmallInt(n) => format!("{n}"),
1061        Value::Int(n) => format!("{n}"),
1062        Value::BigInt(n) => format!("{n}"),
1063        Value::Float(x) => format!("{x}"),
1064        // v4.9: JSON renders identically to Text — both are raw UTF-8.
1065        Value::Text(s) | Value::Json(s) => s.clone(),
1066        Value::Bool(b) => (if *b { "true" } else { "false" }).into(),
1067        Value::Vector(v) => {
1068            let cells: Vec<String> = v.iter().map(|x| format!("{x}")).collect();
1069            format!("[{}]", cells.join(", "))
1070        }
1071        // v6.0.1: render SQ8 cells dequantised, so SELECT output
1072        // matches the pgvector wire shape clients expect. The
1073        // recall envelope already absorbs the ≤ (max-min)/255/2
1074        // dequantisation error.
1075        Value::Sq8Vector(q) => {
1076            let cells: Vec<String> = spg_storage::quantize::dequantize(q)
1077                .iter()
1078                .map(|x| format!("{x}"))
1079                .collect();
1080            format!("[{}]", cells.join(", "))
1081        }
1082        // v6.0.3: HalfVector cells dequantise bit-exactly to f32
1083        // for SELECT output.
1084        Value::HalfVector(h) => {
1085            let cells: Vec<String> = h.to_f32_vec().iter().map(|x| format!("{x}")).collect();
1086            format!("[{}]", cells.join(", "))
1087        }
1088        Value::Numeric { scaled, scale } => format_numeric(*scaled, *scale),
1089        Value::Date(d) => format_date(*d),
1090        Value::Timestamp(t) => format_timestamp(*t),
1091        Value::Interval { months, micros } => format_interval(*months, *micros),
1092        Value::Null => "NULL".into(),
1093        // v7.5.0 — #[non_exhaustive] fallback for future Value variants.
1094        _ => format!("{v:?}"),
1095    }
1096}
1097
1098/// Render a `Date` (days since epoch) as `YYYY-MM-DD`. Negative values
1099/// for pre-1970 dates render with a leading `-` on the year.
1100pub fn format_date(days: i32) -> String {
1101    let (y, m, d) = civil_from_days(days);
1102    format!("{y:04}-{m:02}-{d:02}")
1103}
1104
1105/// Render a `Timestamp` (microseconds since epoch) as
1106/// `YYYY-MM-DD HH:MM:SS[.fff...]`. Trailing-zero fractional digits are
1107/// dropped; a whole-second value has no fractional part.
1108pub fn format_timestamp(micros: i64) -> String {
1109    const MICROS_PER_DAY: i64 = 86_400_000_000;
1110    // Split into day + intra-day part with proper floor division so
1111    // negative timestamps render right too.
1112    let days = micros.div_euclid(MICROS_PER_DAY);
1113    let day_micros = micros.rem_euclid(MICROS_PER_DAY);
1114    let day_i32 = i32::try_from(days).unwrap_or(i32::MAX);
1115    let (y, m, d) = civil_from_days(day_i32);
1116    let secs = day_micros / 1_000_000;
1117    let frac = day_micros % 1_000_000;
1118    let hh = secs / 3600;
1119    let mm = (secs / 60) % 60;
1120    let ss = secs % 60;
1121    if frac == 0 {
1122        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}")
1123    } else {
1124        // Strip trailing zeros from the 6-digit fractional component.
1125        let raw = format!("{frac:06}");
1126        let trimmed = raw.trim_end_matches('0');
1127        format!("{y:04}-{m:02}-{d:02} {hh:02}:{mm:02}:{ss:02}.{trimmed}")
1128    }
1129}
1130
/// Howard Hinnant's `civil_from_days`: maps a day count relative to the
/// Unix epoch to a proleptic-Gregorian `(year, month, day)` triple.
///
/// The calendar is processed in 400-year "eras" of 146 097 days whose
/// internal year starts on 1 March, which puts the leap day at the very end
/// of each internal year. Kept in this file so the engine needs no `std`
/// time facilities.
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
fn civil_from_days(days: i32) -> (i32, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097;

    // Re-base onto 0000-03-01 so eras line up with the leap-year cycle.
    let shifted = i64::from(days) + 719_468;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    // In [0, 146_097), so the narrowing to u32 cannot lose information.
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA) as u32;
    let year_of_era = (day_of_era.saturating_sub(day_of_era / 1460) + day_of_era / 36524
        - day_of_era / 146_096)
        / 365;
    let day_of_year =
        day_of_era.saturating_sub(365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index counted from March (0 = March, ..., 11 = February).
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year.saturating_sub((153 * month_index + 2) / 5) + 1;
    // January and February belong to the following civil year.
    let (month, year_carry) = if month_index < 10 {
        (month_index + 3, 0_i64)
    } else {
        (month_index - 9, 1_i64)
    };
    let year = i64::from(year_of_era) + era * 400 + year_carry;
    (year as i32, month, day)
}
1152
/// Inverse of `civil_from_days` — converts a proleptic-Gregorian
/// `(year, month, day)` to days since 1970-01-01.
///
/// All intermediate arithmetic runs in `i64`, so arbitrary `m` / `d` values
/// cannot overflow. A result outside the `i32` range saturates towards the
/// side it overflowed on: `i32::MIN` for dates too far in the past,
/// `i32::MAX` for dates too far in the future. A day of `0` is treated
/// like day `1`.
pub fn days_from_civil(y: i32, m: u32, d: u32) -> i32 {
    // The internal year starts on 1 March, so January and February count
    // towards the previous year.
    let y_adj = i64::from(y) - i64::from(m <= 2);
    let era = y_adj.div_euclid(400);
    let yoe = y_adj.rem_euclid(400);
    // Month index counted from March (0 = March, ..., 11 = February).
    let mp = if m > 2 {
        i64::from(m) - 3
    } else {
        i64::from(m) + 9
    };
    let doy = (153 * mp + 2) / 5 + i64::from(d.saturating_sub(1));
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
    let total = era * 146_097 + doe - 719_468;
    i32::try_from(total).unwrap_or(if total < 0 { i32::MIN } else { i32::MAX })
}
1169
1170/// Parse `YYYY-MM-DD` into a `Date` (days since Unix epoch). Returns
1171/// `None` on shape / numeric failure; the engine surfaces that as a
1172/// `TypeMismatch` with the original text included.
1173pub fn parse_date_literal(s: &str) -> Option<i32> {
1174    let bytes = s.as_bytes();
1175    if bytes.len() != 10 || bytes[4] != b'-' || bytes[7] != b'-' {
1176        return None;
1177    }
1178    let y: i32 = s[0..4].parse().ok()?;
1179    let m: u32 = s[5..7].parse().ok()?;
1180    let d: u32 = s[8..10].parse().ok()?;
1181    if !(1..=12).contains(&m) || !(1..=31).contains(&d) {
1182        return None;
1183    }
1184    Some(days_from_civil(y, m, d))
1185}
1186
1187/// Parse `YYYY-MM-DD[ HH:MM:SS[.ffffff]]` into a `Timestamp`
1188/// (microseconds since Unix epoch). The time portion is optional;
1189/// missing → midnight. The fractional portion accepts 1–6 digits and
1190/// pads with zeros to microseconds.
1191pub fn parse_timestamp_literal(s: &str) -> Option<i64> {
1192    let trimmed = s.trim();
1193    let (date_part, time_part) = match trimmed.find([' ', 'T']) {
1194        Some(i) => (&trimmed[..i], Some(&trimmed[i + 1..])),
1195        None => (trimmed, None),
1196    };
1197    let days = parse_date_literal(date_part)?;
1198    let day_micros = match time_part {
1199        None => 0,
1200        Some(t) => parse_time_of_day_micros(t)?,
1201    };
1202    Some(i64::from(days) * 86_400_000_000 + day_micros)
1203}
1204
/// Parse `HH:MM:SS[.f{1,9}]` into microseconds since midnight.
///
/// Every numeric field must consist of ASCII digits only (no sign, no
/// whitespace). Hours must be below 24 and minutes / seconds below 60. The
/// optional fraction may carry one to nine digits; it is right-padded with
/// zeros to microsecond precision and any digits past the sixth are
/// truncated, not rounded.
///
/// Returns `None` for any malformed input; never panics, including on
/// non-ASCII text.
fn parse_time_of_day_micros(t: &str) -> Option<i64> {
    let (clock, fraction) = match t.split_once('.') {
        Some((clock, fraction)) => (clock, Some(fraction)),
        None => (t, None),
    };
    let bytes = clock.as_bytes();
    if bytes.len() != 8 || bytes[2] != b':' || bytes[5] != b':' {
        return None;
    }
    // Reads the two-digit field starting at `at`; working on bytes avoids
    // both `+`/`-` acceptance by `str::parse` and any char-boundary issue.
    let two_digits = |at: usize| -> Option<i64> {
        let (tens, ones) = (bytes[at], bytes[at + 1]);
        (tens.is_ascii_digit() && ones.is_ascii_digit())
            .then(|| i64::from(tens - b'0') * 10 + i64::from(ones - b'0'))
    };
    let hh = two_digits(0)?;
    let mm = two_digits(3)?;
    let ss = two_digits(6)?;
    if hh >= 24 || mm >= 60 || ss >= 60 {
        return None;
    }
    let frac_micros = match fraction {
        None => 0,
        Some(f) => {
            let digits = f.as_bytes();
            if digits.is_empty() || digits.len() > 9 || !digits.iter().all(u8::is_ascii_digit) {
                return None;
            }
            // Take the first six digits, treating missing positions as zero.
            (0..6).fold(0_i64, |acc, i| {
                acc * 10 + digits.get(i).map_or(0, |b| i64::from(*b - b'0'))
            })
        }
    };
    Some((hh * 3600 + mm * 60 + ss) * 1_000_000 + frac_micros)
}
1237
/// Render an `Interval { months, micros }` in a PG-ish shape.
///
/// The months part yields `N year(s)` and `N mon(s)`; the microsecond part
/// yields `N day(s)` followed by `[-]HH:MM:SS[.frac]`. Zero components are
/// omitted, components are separated by single spaces, and an all-zero
/// interval renders as `0`. A unit is singular only for the value `+1`;
/// `-1` and everything else pluralise.
pub fn format_interval(months: i32, micros: i64) -> String {
    const MICROS_PER_DAY: i64 = 86_400_000_000;

    /// `"<count> <unit>"`, picking the singular label only for exactly 1.
    fn labelled(count: i64, singular: &str, plural: &str) -> String {
        let label = if count == 1 { singular } else { plural };
        format!("{count} {label}")
    }

    let mut pieces: Vec<String> = Vec::new();

    let (years, mons) = (months / 12, months % 12);
    if years != 0 {
        pieces.push(labelled(i64::from(years), "year", "years"));
    }
    if mons != 0 {
        pieces.push(labelled(i64::from(mons), "mon", "mons"));
    }

    let (days, clock) = (micros / MICROS_PER_DAY, micros % MICROS_PER_DAY);
    if days != 0 {
        pieces.push(labelled(days, "day", "days"));
    }
    if clock != 0 {
        let sign = if clock < 0 { "-" } else { "" };
        // |clock| is below one day, so taking the magnitude cannot overflow.
        let magnitude = clock.abs();
        let whole_secs = magnitude / 1_000_000;
        let sub_second = magnitude % 1_000_000;
        let mut clock_text = format!(
            "{}{:02}:{:02}:{:02}",
            sign,
            whole_secs / 3600,
            (whole_secs / 60) % 60,
            whole_secs % 60
        );
        if sub_second != 0 {
            let digits = format!("{sub_second:06}");
            clock_text.push('.');
            clock_text.push_str(digits.trim_end_matches('0'));
        }
        pieces.push(clock_text);
    }

    if pieces.is_empty() {
        String::from("0")
    } else {
        pieces.join(" ")
    }
}
1291
1292/// Add `months` (signed) to a `(year, month, day)` triple using PG's
1293/// clamp-to-last-day rule (so `'2024-01-31' + 1 month` → `'2024-02-29'`).
1294fn add_months_to_civil(y: i32, m: u32, d: u32, months: i32) -> (i32, u32, u32) {
1295    let total_months = i64::from(y) * 12 + i64::from(m) - 1 + i64::from(months);
1296    let new_year = i32::try_from(total_months.div_euclid(12)).unwrap_or(i32::MAX);
1297    let new_month_zero = total_months.rem_euclid(12);
1298    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1299    let new_month = (new_month_zero as u32) + 1;
1300    let max_day = days_in_month(new_year, new_month);
1301    (new_year, new_month, d.min(max_day))
1302}
1303
/// Number of days in month `m` of proleptic-Gregorian year `y`.
///
/// Any month outside `1..=12` gets the 30-day answer; callers normalise
/// first, this is only a defensive fallback.
const fn days_in_month(y: i32, m: u32) -> u32 {
    // Divisible by 4, except centuries that are not divisible by 400.
    let leap_year = y.rem_euclid(4) == 0 && (y.rem_euclid(100) != 0 || y.rem_euclid(400) == 0);
    match m {
        2 if leap_year => 29,
        2 => 28,
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        _ => 30,
    }
}
1320
/// Render a `Numeric { scaled, scale }` as its decimal text form.
///
/// With `scale == 0` the integer is printed as-is. Otherwise the last
/// `scale` digits of the magnitude form the fractional part (left-padded
/// with zeros when the magnitude is shorter), at least one integer digit is
/// always printed, and a negative value gets a leading `-`.
pub fn format_numeric(scaled: i128, scale: u8) -> String {
    if scale == 0 {
        return scaled.to_string();
    }
    let frac_len = usize::from(scale);
    // Zero-pad the magnitude to `scale + 1` digits so one integer digit is
    // always available to the left of the decimal point.
    let digits = format!(
        "{:0>width$}",
        scaled.unsigned_abs(),
        width = frac_len + 1
    );
    let (whole, fraction) = digits.split_at(digits.len() - frac_len);
    let sign = if scaled < 0 { "-" } else { "" };
    format!("{sign}{whole}.{fraction}")
}
1352
1353fn cast_numeric_to_int(v: Value) -> Result<Value, EvalError> {
1354    match v {
1355        Value::Int(n) => Ok(Value::Int(n)),
1356        Value::BigInt(n) => i32::try_from(n)
1357            .map(Value::Int)
1358            .map_err(|_| EvalError::TypeMismatch {
1359                detail: format!("bigint {n} does not fit in int"),
1360            }),
1361        #[allow(clippy::cast_possible_truncation)]
1362        Value::Float(x) => Ok(Value::Int(x as i32)),
1363        Value::Text(s) => {
1364            s.trim()
1365                .parse::<i32>()
1366                .map(Value::Int)
1367                .map_err(|_| EvalError::TypeMismatch {
1368                    detail: format!("cannot parse {s:?} as int"),
1369                })
1370        }
1371        Value::Bool(b) => Ok(Value::Int(i32::from(b))),
1372        other => Err(EvalError::TypeMismatch {
1373            detail: format!("cannot cast {:?} to int", other.data_type()),
1374        }),
1375    }
1376}
1377
1378fn cast_numeric_to_bigint(v: Value) -> Result<Value, EvalError> {
1379    match v {
1380        Value::Int(n) => Ok(Value::BigInt(i64::from(n))),
1381        Value::BigInt(n) => Ok(Value::BigInt(n)),
1382        #[allow(clippy::cast_possible_truncation)]
1383        Value::Float(x) => Ok(Value::BigInt(x as i64)),
1384        Value::Text(s) => {
1385            s.trim()
1386                .parse::<i64>()
1387                .map(Value::BigInt)
1388                .map_err(|_| EvalError::TypeMismatch {
1389                    detail: format!("cannot parse {s:?} as bigint"),
1390                })
1391        }
1392        Value::Bool(b) => Ok(Value::BigInt(i64::from(b))),
1393        other => Err(EvalError::TypeMismatch {
1394            detail: format!("cannot cast {:?} to bigint", other.data_type()),
1395        }),
1396    }
1397}
1398
1399fn cast_numeric_to_float(v: Value) -> Result<Value, EvalError> {
1400    match v {
1401        Value::Int(n) => Ok(Value::Float(f64::from(n))),
1402        #[allow(clippy::cast_precision_loss)]
1403        Value::BigInt(n) => Ok(Value::Float(n as f64)),
1404        Value::Float(x) => Ok(Value::Float(x)),
1405        Value::Text(s) => {
1406            s.trim()
1407                .parse::<f64>()
1408                .map(Value::Float)
1409                .map_err(|_| EvalError::TypeMismatch {
1410                    detail: format!("cannot parse {s:?} as float"),
1411                })
1412        }
1413        other => Err(EvalError::TypeMismatch {
1414            detail: format!("cannot cast {:?} to float", other.data_type()),
1415        }),
1416    }
1417}
1418
1419fn cast_to_bool(v: Value) -> Result<Value, EvalError> {
1420    match v {
1421        Value::Bool(b) => Ok(Value::Bool(b)),
1422        Value::Int(n) => Ok(Value::Bool(n != 0)),
1423        Value::BigInt(n) => Ok(Value::Bool(n != 0)),
1424        Value::Text(s) => {
1425            let lo = s.trim().to_ascii_lowercase();
1426            match lo.as_str() {
1427                "true" | "t" | "yes" | "y" | "1" | "on" => Ok(Value::Bool(true)),
1428                "false" | "f" | "no" | "n" | "0" | "off" => Ok(Value::Bool(false)),
1429                _ => Err(EvalError::TypeMismatch {
1430                    detail: format!("cannot parse {s:?} as bool"),
1431                }),
1432            }
1433        }
1434        other => Err(EvalError::TypeMismatch {
1435            detail: format!("cannot cast {:?} to bool", other.data_type()),
1436        }),
1437    }
1438}
1439
1440/// Parse a `Value::Text("[1.0, 2.0, 3.0]")` into a `Value::Vector(..)`. Mirrors
1441/// pgvector's `'[..]'::vector` cast. NULL casts as NULL.
1442pub fn cast_to_vector(v: Value) -> Result<Value, EvalError> {
1443    match v {
1444        Value::Null => Ok(Value::Null),
1445        Value::Vector(v) => Ok(Value::Vector(v)),
1446        Value::Text(s) => parse_vector_text(&s)
1447            .map(Value::Vector)
1448            .ok_or(EvalError::TypeMismatch {
1449                detail: format!("cannot parse {s:?} as a vector literal"),
1450            }),
1451        other => Err(EvalError::TypeMismatch {
1452            detail: format!("::vector requires text input, got {:?}", other.data_type()),
1453        }),
1454    }
1455}
1456
/// Parse `"[1.0, 2.0, -3]"` into `Vec<f32>`.
///
/// Whitespace around the brackets and around each cell is ignored; `[]`
/// (optionally with inner whitespace) yields an empty vector. Returns
/// `None` when the brackets are missing or any cell fails to parse as `f32`.
fn parse_vector_text(s: &str) -> Option<Vec<f32>> {
    let body = s
        .trim()
        .strip_prefix('[')?
        .strip_suffix(']')?
        .trim();
    if body.is_empty() {
        return Some(Vec::new());
    }
    // Collecting into `Option<Vec<_>>` stops at the first unparseable cell.
    body.split(',')
        .map(|cell| cell.trim().parse::<f32>().ok())
        .collect()
}
1472
1473fn literal_to_value(l: &Literal) -> Value {
1474    match l {
1475        Literal::Integer(n) => {
1476            if let Ok(small) = i32::try_from(*n) {
1477                Value::Int(small)
1478            } else {
1479                Value::BigInt(*n)
1480            }
1481        }
1482        Literal::Float(x) => Value::Float(*x),
1483        Literal::String(s) => Value::Text(s.clone()),
1484        Literal::Vector(v) => Value::Vector(v.clone()),
1485        Literal::Bool(b) => Value::Bool(*b),
1486        Literal::Null => Value::Null,
1487        Literal::Interval { months, micros, .. } => Value::Interval {
1488            months: *months,
1489            micros: *micros,
1490        },
1491    }
1492}
1493
1494fn resolve_column(c: &ColumnName, row: &Row, ctx: &EvalContext<'_>) -> Result<Value, EvalError> {
1495    if let Some(q) = &c.qualifier {
1496        // Multi-table evaluation (joins): the synthesised schema uses
1497        // composite column names "alias.column" so we look that up
1498        // directly. Falls back to the single-table case below if the
1499        // composite isn't present.
1500        let composite = alloc::format!("{q}.{name}", name = c.name);
1501        if let Some(pos) = ctx.columns.iter().position(|s| s.name == composite) {
1502            return Ok(row.values[pos].clone());
1503        }
1504        let expected = ctx.table_alias.ok_or_else(|| EvalError::UnknownQualifier {
1505            qualifier: q.clone(),
1506        })?;
1507        if q != expected {
1508            return Err(EvalError::UnknownQualifier {
1509                qualifier: q.clone(),
1510            });
1511        }
1512    }
1513    if let Some(pos) = ctx.columns.iter().position(|s| s.name == c.name) {
1514        return Ok(row.values[pos].clone());
1515    }
1516    // Bare-name fallback for joined schemas: match any single composite
1517    // column ending in ".<name>"; ambiguity is an error.
1518    let suffix = alloc::format!(".{name}", name = c.name);
1519    let mut matches = ctx
1520        .columns
1521        .iter()
1522        .enumerate()
1523        .filter(|(_, s)| s.name.ends_with(&suffix));
1524    let first = matches.next();
1525    let extra = matches.next();
1526    match (first, extra) {
1527        (Some((pos, _)), None) => Ok(row.values[pos].clone()),
1528        (Some(_), Some(_)) => Err(EvalError::TypeMismatch {
1529            detail: alloc::format!("ambiguous column reference: {}", c.name),
1530        }),
1531        _ => Err(EvalError::ColumnNotFound {
1532            name: c.name.clone(),
1533        }),
1534    }
1535}
1536
1537fn apply_unary(op: UnOp, v: Value) -> Result<Value, EvalError> {
1538    match (op, v) {
1539        (_, Value::Null) => Ok(Value::Null),
1540        (UnOp::Neg, Value::Int(n)) => {
1541            n.checked_neg()
1542                .map(Value::Int)
1543                .ok_or(EvalError::TypeMismatch {
1544                    detail: "integer overflow on unary -".into(),
1545                })
1546        }
1547        (UnOp::Neg, Value::BigInt(n)) => {
1548            n.checked_neg()
1549                .map(Value::BigInt)
1550                .ok_or(EvalError::TypeMismatch {
1551                    detail: "bigint overflow on unary -".into(),
1552                })
1553        }
1554        (UnOp::Neg, Value::Float(x)) => Ok(Value::Float(-x)),
1555        (UnOp::Neg, other) => Err(EvalError::TypeMismatch {
1556            detail: format!("unary - applied to {:?}", other.data_type()),
1557        }),
1558        (UnOp::Not, Value::Bool(b)) => Ok(Value::Bool(!b)),
1559        (UnOp::Not, other) => Err(EvalError::TypeMismatch {
1560            detail: format!("NOT applied to {:?}", other.data_type()),
1561        }),
1562    }
1563}
1564
1565/// v7.9.27b — true when two values are "not distinct" per PG:
1566/// both NULL counts as equal; otherwise reduces to regular Eq.
1567fn values_not_distinct(l: &Value, r: &Value) -> bool {
1568    match (l, r) {
1569        (Value::Null, Value::Null) => true,
1570        (Value::Null, _) | (_, Value::Null) => false,
1571        _ => l == r,
1572    }
1573}
1574
1575fn apply_binary(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
1576    // SQL three-valued logic for AND / OR with NULL is special — handle before
1577    // the general NULL-propagation rule.
1578    if let BinOp::And = op {
1579        return and_3vl(l, r);
1580    }
1581    if let BinOp::Or = op {
1582        return or_3vl(l, r);
1583    }
1584    // v7.9.27b — IS [NOT] DISTINCT FROM. NULL-safe equality:
1585    // `NULL IS NOT DISTINCT FROM NULL` → true. mailrs pg_dump.
1586    if let BinOp::IsNotDistinctFrom = op {
1587        return Ok(Value::Bool(values_not_distinct(&l, &r)));
1588    }
1589    if let BinOp::IsDistinctFrom = op {
1590        return Ok(Value::Bool(!values_not_distinct(&l, &r)));
1591    }
1592    // Everything else: any NULL operand → NULL.
1593    if l.is_null() || r.is_null() {
1594        return Ok(Value::Null);
1595    }
1596    // NUMERIC arithmetic and comparisons run in fixed-point; promote
1597    // integers to a common NUMERIC scale and stay in i128 throughout.
1598    if matches!(l, Value::Numeric { .. }) || matches!(r, Value::Numeric { .. }) {
1599        return apply_binary_numeric(op, l, r);
1600    }
1601    // Date / Timestamp arithmetic. PG semantics:
1602    //   * date + int      → date  (int is days)
1603    //   * int + date      → date
1604    //   * date - int      → date
1605    //   * date - date     → int   (days, signed)
1606    //   * timestamp - timestamp → bigint (microseconds, signed)
1607    // Other date/time math (`timestamp + int`, INTERVAL) lands later.
1608    if let Some(result) = apply_binary_calendar(op, &l, &r)? {
1609        return Ok(result);
1610    }
1611    match op {
1612        BinOp::Add => arith(l, r, i64::checked_add, |a, b| a + b, "+"),
1613        BinOp::Sub => arith(l, r, i64::checked_sub, |a, b| a - b, "-"),
1614        BinOp::Mul => arith(l, r, i64::checked_mul, |a, b| a * b, "*"),
1615        BinOp::Div => div_op(l, r),
1616        BinOp::L2Distance => l2_distance(l, r),
1617        BinOp::InnerProduct => inner_product(l, r),
1618        BinOp::CosineDistance => cosine_distance(l, r),
1619        BinOp::Concat => Ok(text_concat(&l, &r)),
1620        BinOp::JsonGet => crate::json::path_get(&l, &r, false),
1621        BinOp::JsonGetText => crate::json::path_get(&l, &r, true),
1622        BinOp::JsonGetPath => crate::json::path_walk(&l, &r, false),
1623        BinOp::JsonGetPathText => crate::json::path_walk(&l, &r, true),
1624        BinOp::JsonContains => crate::json::contains(&l, &r),
1625        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
1626            compare(op, &l, &r)
1627        }
1628        BinOp::And
1629        | BinOp::Or
1630        | BinOp::IsDistinctFrom
1631        | BinOp::IsNotDistinctFrom => unreachable!("handled above"),
1632    }
1633}
1634
1635/// Calendar arithmetic. Returns `Some(value)` when the operand pair
1636/// is a date/time combo this function understands, `None` to let the
1637/// caller fall through to the regular numeric / text paths.
1638fn apply_binary_calendar(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
1639    let int_value = |v: &Value| -> Option<i64> {
1640        match v {
1641            Value::SmallInt(n) => Some(i64::from(*n)),
1642            Value::Int(n) => Some(i64::from(*n)),
1643            Value::BigInt(n) => Some(*n),
1644            _ => None,
1645        }
1646    };
1647    // Most-specific cases first — DATE-DATE / TS-TS subtraction before
1648    // DATE-integer subtraction, otherwise the latter swallows the
1649    // former with an `int_value(Date) = None` no-op fall-through.
1650    match (l, r) {
1651        (Value::Date(a), Value::Date(b)) if op == BinOp::Sub => {
1652            return Ok(Some(Value::BigInt(i64::from(*a) - i64::from(*b))));
1653        }
1654        (Value::Timestamp(a), Value::Timestamp(b)) if op == BinOp::Sub => {
1655            let delta = a.checked_sub(*b).ok_or(EvalError::TypeMismatch {
1656                detail: "TIMESTAMP - TIMESTAMP overflows i64 microseconds".into(),
1657            })?;
1658            return Ok(Some(Value::BigInt(delta)));
1659        }
1660        _ => {}
1661    }
1662    // INTERVAL arithmetic. PG: timestamp ± interval → timestamp,
1663    // date ± interval → date (if interval is pure days/months with no
1664    // sub-day component) else timestamp, interval ± interval → interval.
1665    if let Some(out) = apply_binary_interval(op, l, r)? {
1666        return Ok(Some(out));
1667    }
1668    match (l, r) {
1669        (Value::Date(d), other) if op == BinOp::Add => {
1670            if let Some(n) = int_value(other) {
1671                let days = i64::from(*d).saturating_add(n);
1672                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
1673                    detail: "DATE + integer overflows DATE range".into(),
1674                })?;
1675                return Ok(Some(Value::Date(days32)));
1676            }
1677        }
1678        (other, Value::Date(d)) if op == BinOp::Add => {
1679            if let Some(n) = int_value(other) {
1680                let days = i64::from(*d).saturating_add(n);
1681                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
1682                    detail: "integer + DATE overflows DATE range".into(),
1683                })?;
1684                return Ok(Some(Value::Date(days32)));
1685            }
1686        }
1687        (Value::Date(d), other) if op == BinOp::Sub => {
1688            if let Some(n) = int_value(other) {
1689                let days = i64::from(*d).saturating_sub(n);
1690                let days32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
1691                    detail: "DATE - integer overflows DATE range".into(),
1692                })?;
1693                return Ok(Some(Value::Date(days32)));
1694            }
1695        }
1696        _ => {}
1697    }
1698    Ok(None)
1699}
1700
1701/// INTERVAL-aware binary ops. Recognises:
1702///   timestamp ± interval → timestamp
1703///   date ± interval      → date (if interval is integral days/months only)
1704///                       → timestamp (if interval has sub-day micros)
1705///   interval ± interval  → interval
1706/// Commutative for `+`. Returns `None` for unrecognised operand pairs so
1707/// the caller can fall through.
1708fn apply_binary_interval(op: BinOp, l: &Value, r: &Value) -> Result<Option<Value>, EvalError> {
1709    // Normalise so the interval (if any) is always on the right for Add;
1710    // Sub stays left-handed because it isn't commutative.
1711    let (lhs, rhs, sign): (&Value, &Value, i64) = match (l, r, op) {
1712        (Value::Interval { .. }, _, BinOp::Add) => (r, l, 1),
1713        (_, Value::Interval { .. }, BinOp::Add) => (l, r, 1),
1714        (_, Value::Interval { .. }, BinOp::Sub) => (l, r, -1),
1715        _ => return Ok(None),
1716    };
1717    let Value::Interval {
1718        months: rhs_months,
1719        micros: rhs_us,
1720    } = rhs
1721    else {
1722        unreachable!("rhs guaranteed to be Interval by the match above");
1723    };
1724    let signed_months = i64::from(*rhs_months) * sign;
1725    let signed_micros = rhs_us.checked_mul(sign).ok_or(EvalError::TypeMismatch {
1726        detail: "INTERVAL micros overflows on negation".into(),
1727    })?;
1728    match lhs {
1729        Value::Timestamp(t) => Ok(Some(Value::Timestamp(add_interval_to_micros(
1730            *t,
1731            signed_months,
1732            signed_micros,
1733        )?))),
1734        Value::Date(d) => {
1735            // Date + interval stays a date when the interval has zero
1736            // sub-day microseconds; otherwise promote to TIMESTAMP at
1737            // midnight of the (months-shifted) date first.
1738            let day_aligned = signed_micros.rem_euclid(86_400_000_000) == 0;
1739            if day_aligned {
1740                let micros_per_day = 86_400_000_000_i64;
1741                let days_delta = signed_micros / micros_per_day;
1742                let shifted = shift_date_by_months(*d, signed_months)?;
1743                let new_days =
1744                    i64::from(shifted)
1745                        .checked_add(days_delta)
1746                        .ok_or(EvalError::TypeMismatch {
1747                            detail: "DATE ± INTERVAL overflows DATE range".into(),
1748                        })?;
1749                let days32 = i32::try_from(new_days).map_err(|_| EvalError::TypeMismatch {
1750                    detail: "DATE ± INTERVAL overflows DATE range".into(),
1751                })?;
1752                Ok(Some(Value::Date(days32)))
1753            } else {
1754                let base =
1755                    i64::from(*d)
1756                        .checked_mul(86_400_000_000)
1757                        .ok_or(EvalError::TypeMismatch {
1758                            detail: "DATE → TIMESTAMP lift overflows for INTERVAL math".into(),
1759                        })?;
1760                Ok(Some(Value::Timestamp(add_interval_to_micros(
1761                    base,
1762                    signed_months,
1763                    signed_micros,
1764                )?)))
1765            }
1766        }
1767        Value::Interval {
1768            months: lhs_months,
1769            micros: lhs_us,
1770        } => {
1771            let new_months = i64::from(*lhs_months)
1772                .checked_add(signed_months)
1773                .and_then(|n| i32::try_from(n).ok())
1774                .ok_or(EvalError::TypeMismatch {
1775                    detail: "INTERVAL ± INTERVAL months overflows i32".into(),
1776                })?;
1777            let new_micros = lhs_us
1778                .checked_add(signed_micros)
1779                .ok_or(EvalError::TypeMismatch {
1780                    detail: "INTERVAL ± INTERVAL micros overflows i64".into(),
1781                })?;
1782            Ok(Some(Value::Interval {
1783                months: new_months,
1784                micros: new_micros,
1785            }))
1786        }
1787        _ => Err(EvalError::TypeMismatch {
1788            detail: format!(
1789                "operator {op:?} not defined for {:?} and INTERVAL",
1790                lhs.data_type()
1791            ),
1792        }),
1793    }
1794}
1795
1796/// Shift a `Date` by a signed number of months using the PG clamp rule.
1797fn shift_date_by_months(d: i32, months: i64) -> Result<i32, EvalError> {
1798    let (y, m, day) = civil_from_days(d);
1799    let months_i32 = i32::try_from(months).map_err(|_| EvalError::TypeMismatch {
1800        detail: "INTERVAL months delta out of i32 range".into(),
1801    })?;
1802    let (ny, nm, nd) = add_months_to_civil(y, m, day, months_i32);
1803    Ok(days_from_civil(ny, nm, nd))
1804}
1805
1806/// Add (months, micros) to a `Timestamp` (microseconds since epoch).
1807/// Months part is applied through civil calendar with clamp-to-last-day;
1808/// micros part is plain i64 addition with overflow guard.
1809fn add_interval_to_micros(t: i64, months: i64, micros: i64) -> Result<i64, EvalError> {
1810    let mut out = t;
1811    if months != 0 {
1812        const MICROS_PER_DAY: i64 = 86_400_000_000;
1813        let days = out.div_euclid(MICROS_PER_DAY);
1814        let day_micros = out.rem_euclid(MICROS_PER_DAY);
1815        let day_i32 = i32::try_from(days).map_err(|_| EvalError::TypeMismatch {
1816            detail: "TIMESTAMP day component out of i32 range for INTERVAL months math".into(),
1817        })?;
1818        let shifted_days = shift_date_by_months(day_i32, months)?;
1819        out = i64::from(shifted_days)
1820            .checked_mul(MICROS_PER_DAY)
1821            .and_then(|n| n.checked_add(day_micros))
1822            .ok_or(EvalError::TypeMismatch {
1823                detail: "TIMESTAMP ± INTERVAL months overflows i64 microseconds".into(),
1824            })?;
1825    }
1826    out.checked_add(micros).ok_or(EvalError::TypeMismatch {
1827        detail: "TIMESTAMP ± INTERVAL micros overflows i64".into(),
1828    })
1829}
1830
1831/// Dispatch for any binary op when at least one operand is NUMERIC.
1832/// Other-side integers / floats are promoted to a NUMERIC at a common
1833/// scale; all add / sub / mul / div / compare paths stay in i128.
1834#[allow(clippy::needless_pass_by_value)] // mirrors `apply_binary`'s by-value calling convention
1835fn apply_binary_numeric(op: BinOp, l: Value, r: Value) -> Result<Value, EvalError> {
1836    // Float still wins — Numeric + Float coerces both to f64 and runs
1837    // through the float path. PG demotes Numeric to float in this mix
1838    // too (the documented behaviour for `numeric + double precision`).
1839    let float_path = matches!(l, Value::Float(_)) || matches!(r, Value::Float(_));
1840    if float_path {
1841        let af = as_f64(&l)?;
1842        let bf = as_f64(&r)?;
1843        return match op {
1844            BinOp::Add => Ok(Value::Float(af + bf)),
1845            BinOp::Sub => Ok(Value::Float(af - bf)),
1846            BinOp::Mul => Ok(Value::Float(af * bf)),
1847            BinOp::Div => {
1848                if bf == 0.0 {
1849                    Err(EvalError::DivisionByZero)
1850                } else {
1851                    Ok(Value::Float(af / bf))
1852                }
1853            }
1854            BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
1855                let ord = af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
1856                    detail: "NaN in NUMERIC/Float comparison".into(),
1857                })?;
1858                Ok(Value::Bool(cmp_to_bool(op, ord)))
1859            }
1860            BinOp::Concat => Ok(text_concat(&l, &r)),
1861            other => Err(EvalError::TypeMismatch {
1862                detail: format!("operator {other:?} not defined for NUMERIC and Float"),
1863            }),
1864        };
1865    }
1866    // Promote integer ↔ numeric to a shared scale (max of both sides).
1867    let (a, sa) = numeric_or_widen(&l).ok_or_else(|| EvalError::TypeMismatch {
1868        detail: format!("NUMERIC op against non-numeric {:?}", l.data_type()),
1869    })?;
1870    let (b, sb) = numeric_or_widen(&r).ok_or_else(|| EvalError::TypeMismatch {
1871        detail: format!("NUMERIC op against non-numeric {:?}", r.data_type()),
1872    })?;
1873    match op {
1874        BinOp::Add | BinOp::Sub => {
1875            let target_scale = sa.max(sb);
1876            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
1877                detail: "NUMERIC overflow on rescale".into(),
1878            })?;
1879            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
1880                detail: "NUMERIC overflow on rescale".into(),
1881            })?;
1882            let r = match op {
1883                BinOp::Add => lhs.checked_add(rhs),
1884                BinOp::Sub => lhs.checked_sub(rhs),
1885                _ => unreachable!(),
1886            }
1887            .ok_or(EvalError::TypeMismatch {
1888                detail: "NUMERIC overflow on +/-".into(),
1889            })?;
1890            Ok(Value::Numeric {
1891                scaled: r,
1892                scale: target_scale,
1893            })
1894        }
1895        BinOp::Mul => {
1896            let scaled = a.checked_mul(b).ok_or(EvalError::TypeMismatch {
1897                detail: "NUMERIC overflow on *".into(),
1898            })?;
1899            Ok(Value::Numeric {
1900                scaled,
1901                scale: sa.saturating_add(sb),
1902            })
1903        }
1904        BinOp::Div => {
1905            if b == 0 {
1906                return Err(EvalError::DivisionByZero);
1907            }
1908            // Result scale: keep the wider operand's scale. Pre-scale
1909            // the numerator so the integer division retains that many
1910            // fractional digits. Round half-away-from-zero.
1911            let target_scale = sa.max(sb);
1912            // Numerator effective scale becomes sa + target_scale; we
1913            // bring it up to (target_scale + sb) so the divisor's scale
1914            // cancels cleanly.
1915            let bump = pow10_i128(target_scale.saturating_add(sb).saturating_sub(sa));
1916            let num = a.checked_mul(bump).ok_or(EvalError::TypeMismatch {
1917                detail: "NUMERIC overflow on / scaling".into(),
1918            })?;
1919            let half = if b >= 0 { b / 2 } else { -(b / 2) };
1920            let adj = if (num >= 0) == (b >= 0) {
1921                num + half
1922            } else {
1923                num - half
1924            };
1925            Ok(Value::Numeric {
1926                scaled: adj / b,
1927                scale: target_scale,
1928            })
1929        }
1930        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq => {
1931            let target_scale = sa.max(sb);
1932            let lhs = rescale(a, sa, target_scale).ok_or(EvalError::TypeMismatch {
1933                detail: "NUMERIC overflow on rescale".into(),
1934            })?;
1935            let rhs = rescale(b, sb, target_scale).ok_or(EvalError::TypeMismatch {
1936                detail: "NUMERIC overflow on rescale".into(),
1937            })?;
1938            Ok(Value::Bool(cmp_to_bool(op, lhs.cmp(&rhs))))
1939        }
1940        BinOp::Concat => Ok(text_concat(&l, &r)),
1941        other => Err(EvalError::TypeMismatch {
1942            detail: format!("operator {other:?} not defined for NUMERIC"),
1943        }),
1944    }
1945}
1946
1947/// Express `v` as a `(scaled_i128, scale)` pair. Plain integers come
1948/// back with `scale=0`; NUMERIC keeps its own scale. Anything else
1949/// returns `None` and the caller raises a type error.
1950fn numeric_or_widen(v: &Value) -> Option<(i128, u8)> {
1951    match v {
1952        Value::Numeric { scaled, scale } => Some((*scaled, *scale)),
1953        Value::Int(n) => Some((i128::from(*n), 0)),
1954        Value::SmallInt(n) => Some((i128::from(*n), 0)),
1955        Value::BigInt(n) => Some((i128::from(*n), 0)),
1956        _ => None,
1957    }
1958}
1959
/// Convert a fixed-point integer from scale `src` to scale `dst`.
///
/// * `dst == src` — returned unchanged.
/// * `dst > src`  — multiplied by `10^(dst - src)`; `None` if the result
///   does not fit in an `i128` (zero always rescales to zero).
/// * `dst < src`  — divided by `10^(src - dst)`, rounding half away from
///   zero. This direction never fails: when the divisor itself exceeds the
///   `i128` range every representable input rounds to zero.
fn rescale(scaled: i128, src: u8, dst: u8) -> Option<i128> {
    use core::cmp::Ordering;

    match dst.cmp(&src) {
        Ordering::Equal => Some(scaled),
        Ordering::Greater => {
            if scaled == 0 {
                // Zero is zero at every scale, even when the factor overflows.
                return Some(0);
            }
            let factor = 10_i128.checked_pow(u32::from(dst - src))?;
            scaled.checked_mul(factor)
        }
        Ordering::Less => {
            let Some(divisor) = 10_i128.checked_pow(u32::from(src - dst)) else {
                // 10^39 and above is more than twice |i128::MAX|, so the
                // rounded quotient is always zero.
                return Some(0);
            };
            // Quotient/remainder rounding avoids the overflow that adding
            // `divisor / 2` to `scaled` hits near the i128 limits.
            let quotient = scaled / divisor;
            let remainder = scaled % divisor;
            let round_away =
                remainder.unsigned_abs() >= divisor.unsigned_abs() - remainder.unsigned_abs();
            Some(if !round_away {
                quotient
            } else if scaled >= 0 {
                quotient + 1
            } else {
                quotient - 1
            })
        }
    }
}
1977
/// `10^p` as an `i128`, usable in `const` contexts.
///
/// `10^38` is the largest power of ten an `i128` can hold. For `p >= 39`
/// the result saturates at `i128::MAX` instead of overflowing (which would
/// panic in debug builds and wrap in release builds), so a following
/// `checked_mul` by the caller will generally report the overflow.
const fn pow10_i128(p: u8) -> i128 {
    let mut acc: i128 = 1;
    let mut i = 0;
    while i < p {
        acc = match acc.checked_mul(10) {
            Some(next) => next,
            // Further multiplications cannot bring the value back in range.
            None => return i128::MAX,
        };
        i += 1;
    }
    acc
}
1987
1988const fn cmp_to_bool(op: BinOp, ord: core::cmp::Ordering) -> bool {
1989    use core::cmp::Ordering::{Equal, Greater, Less};
1990    match op {
1991        BinOp::Eq => matches!(ord, Equal),
1992        BinOp::NotEq => !matches!(ord, Equal),
1993        BinOp::Lt => matches!(ord, Less),
1994        BinOp::LtEq => matches!(ord, Less | Equal),
1995        BinOp::Gt => matches!(ord, Greater),
1996        BinOp::GtEq => matches!(ord, Greater | Equal),
1997        _ => false,
1998    }
1999}
2000
2001/// SQL `||` string concatenation. Operands are coerced to text via the same
2002/// rule as `::text` cast. NULL propagates (handled above; this function only
2003/// runs with non-NULL operands).
2004fn text_concat(l: &Value, r: &Value) -> Value {
2005    let a = value_to_text(l);
2006    let b = value_to_text(r);
2007    Value::Text(a + &b)
2008}
2009
2010/// pgvector inner-product `<#>`. Returns the *negative* dot product so
2011/// smaller still means more similar — same convention as pgvector.
2012fn inner_product(l: Value, r: Value) -> Result<Value, EvalError> {
2013    let (a, b) = unwrap_vec_pair(l, r, "<#>")?;
2014    let mut dot: f64 = 0.0;
2015    for (x, y) in a.iter().zip(b.iter()) {
2016        dot += f64::from(*x) * f64::from(*y);
2017    }
2018    Ok(Value::Float(-dot))
2019}
2020
2021/// pgvector cosine distance `<=>` — `1 - (a·b) / (‖a‖ ‖b‖)`. A zero-norm
2022/// operand produces NaN (matches pgvector).
2023fn cosine_distance(l: Value, r: Value) -> Result<Value, EvalError> {
2024    let (a, b) = unwrap_vec_pair(l, r, "<=>")?;
2025    let mut dot: f64 = 0.0;
2026    let mut na: f64 = 0.0;
2027    let mut nb: f64 = 0.0;
2028    for (x, y) in a.iter().zip(b.iter()) {
2029        let xf = f64::from(*x);
2030        let yf = f64::from(*y);
2031        dot += xf * yf;
2032        na += xf * xf;
2033        nb += yf * yf;
2034    }
2035    let denom = sqrt_newton(na) * sqrt_newton(nb);
2036    if denom == 0.0 {
2037        return Ok(Value::Float(f64::NAN));
2038    }
2039    Ok(Value::Float(1.0 - dot / denom))
2040}
2041
2042fn unwrap_vec_pair(l: Value, r: Value, op: &str) -> Result<(Vec<f32>, Vec<f32>), EvalError> {
2043    // v6.0.1: SQ8 cells coming through the SQL evaluator are
2044    // dequantised to f32 here so the existing scalar distance
2045    // arithmetic stays intact. HNSW kNN search continues to use
2046    // the asymmetric ADC variant inside `cell_to_query_metric_
2047    // distance` — this path only runs when a vector expression
2048    // lands in the evaluator (full-scan ORDER BY, SELECT
2049    // projection of `v <-> $1`, etc.).
2050    let to_f32 = |v: Value| -> Option<Vec<f32>> {
2051        match v {
2052            Value::Vector(a) => Some(a),
2053            Value::Sq8Vector(q) => Some(spg_storage::quantize::dequantize(&q)),
2054            // v6.0.3: bit-exact dequant for halfvec cells.
2055            Value::HalfVector(h) => Some(h.to_f32_vec()),
2056            _ => None,
2057        }
2058    };
2059    let l_ty = l.data_type();
2060    let r_ty = r.data_type();
2061    match (to_f32(l), to_f32(r)) {
2062        (Some(a), Some(b)) => {
2063            if a.len() != b.len() {
2064                return Err(EvalError::TypeMismatch {
2065                    detail: format!("vector dim mismatch in {op}: {} vs {}", a.len(), b.len()),
2066                });
2067            }
2068            Ok((a, b))
2069        }
2070        _ => Err(EvalError::TypeMismatch {
2071            detail: format!("{op} requires two vectors, got {l_ty:?} and {r_ty:?}"),
2072        }),
2073    }
2074}
2075
2076/// Numeric arithmetic with widening.
2077/// - both `Int` → `Int` (with overflow check)
2078/// - `Int` op `BigInt` (either side) → `BigInt`
2079/// - any `Float` involved → `Float`
2080fn arith(
2081    l: Value,
2082    r: Value,
2083    int_op: impl Fn(i64, i64) -> Option<i64>,
2084    float_op: impl Fn(f64, f64) -> f64,
2085    op_name: &str,
2086) -> Result<Value, EvalError> {
2087    // Widen SmallInt to Int up front so the rest of the arithmetic
2088    // table only deals with Int / BigInt / Float pairs.
2089    let widen = |v: Value| -> Value {
2090        match v {
2091            Value::SmallInt(n) => Value::Int(i32::from(n)),
2092            other => other,
2093        }
2094    };
2095    let l = widen(l);
2096    let r = widen(r);
2097    match (l, r) {
2098        (Value::Int(a), Value::Int(b)) => {
2099            let result = int_op(i64::from(a), i64::from(b)).ok_or(EvalError::TypeMismatch {
2100                detail: format!("integer overflow on {op_name}"),
2101            })?;
2102            if let Ok(small) = i32::try_from(result) {
2103                Ok(Value::Int(small))
2104            } else {
2105                Ok(Value::BigInt(result))
2106            }
2107        }
2108        (Value::Int(a), Value::BigInt(b)) | (Value::BigInt(b), Value::Int(a)) => {
2109            let result = int_op(i64::from(a), b).ok_or(EvalError::TypeMismatch {
2110                detail: format!("bigint overflow on {op_name}"),
2111            })?;
2112            Ok(Value::BigInt(result))
2113        }
2114        (Value::BigInt(a), Value::BigInt(b)) => {
2115            let result = int_op(a, b).ok_or(EvalError::TypeMismatch {
2116                detail: format!("bigint overflow on {op_name}"),
2117            })?;
2118            Ok(Value::BigInt(result))
2119        }
2120        (a, b)
2121            if a.data_type() == Some(DataType::Float) || b.data_type() == Some(DataType::Float) =>
2122        {
2123            let af = as_f64(&a)?;
2124            let bf = as_f64(&b)?;
2125            Ok(Value::Float(float_op(af, bf)))
2126        }
2127        (a, b) => Err(EvalError::TypeMismatch {
2128            detail: format!(
2129                "{op_name} applied to non-numeric: {:?} vs {:?}",
2130                a.data_type(),
2131                b.data_type()
2132            ),
2133        }),
2134    }
2135}
2136
2137/// L2 (Euclidean) distance between two vectors of equal dimension.
2138/// Returned as `Value::Float(d)` so it composes with the existing
2139/// comparison / sort plumbing. Mismatched dims or non-vector operands
2140/// raise `TypeMismatch`.
2141#[allow(clippy::many_single_char_names)] // l, r, a, b, d are the natural names
2142fn l2_distance(l: Value, r: Value) -> Result<Value, EvalError> {
2143    // v6.0.1: route both operands through `unwrap_vec_pair` so SQ8
2144    // cells dequantise on the way in. Sub-f64 precision loss is
2145    // negligible vs the dequantisation noise the SQ8 path already
2146    // ships with.
2147    let (a, b) = unwrap_vec_pair(l, r, "<->")?;
2148    let mut sum: f64 = 0.0;
2149    for (x, y) in a.iter().zip(b.iter()) {
2150        let d = f64::from(*x) - f64::from(*y);
2151        sum += d * d;
2152    }
2153    Ok(Value::Float(sqrt_newton(sum)))
2154}
2155
/// Self-built `sqrt` for `f64` — `std::f64::sqrt` lives in `std`, which the
/// engine's `no_std` constraint disallows.
///
/// Newton-Raphson, seeded by halving the IEEE-754 exponent of `x` (the seed
/// is within a few percent of the root for normal inputs) and iterated
/// until the estimate stops changing. Seeding with `x` itself only halves
/// the estimate per round, so a fixed ten rounds is badly off once `x` is
/// far from 1 (e.g. `1e6` yields about 1296 instead of 1000).
///
/// Special cases: zero and negative inputs return `0.0`; NaN and +∞ are
/// returned unchanged.
fn sqrt_newton(x: f64) -> f64 {
    if x <= 0.0 {
        return 0.0;
    }
    if x.is_nan() || x == f64::INFINITY {
        return x;
    }
    // Halve the biased exponent: bits/2 + (1023 << 51) ≈ bits of sqrt(x).
    let mut g = f64::from_bits((x.to_bits() >> 1) + 0x1FF8_0000_0000_0000);
    // Normal inputs converge in about five rounds; the cap covers subnormal
    // inputs (where the seed is far off) and guards against a one-ulp
    // oscillation never hitting an exact fixed point.
    for _ in 0..64 {
        let next = 0.5 * (g + x / g);
        if next == g {
            break;
        }
        g = next;
    }
    g
}
2172
2173fn div_op(l: Value, r: Value) -> Result<Value, EvalError> {
2174    let any_float = matches!(l.data_type(), Some(DataType::Float))
2175        || matches!(r.data_type(), Some(DataType::Float));
2176    if any_float {
2177        let a = as_f64(&l)?;
2178        let b = as_f64(&r)?;
2179        if b == 0.0 {
2180            return Err(EvalError::DivisionByZero);
2181        }
2182        return Ok(Value::Float(a / b));
2183    }
2184    arith(
2185        l,
2186        r,
2187        |a, b| {
2188            if b == 0 { None } else { Some(a / b) }
2189        },
2190        |a, b| a / b,
2191        "/",
2192    )
2193    .map_err(|e| match e {
2194        // The closure returns None on b == 0; translate that into the dedicated
2195        // DivisionByZero variant instead of "integer overflow on /".
2196        EvalError::TypeMismatch { detail } if detail.contains('/') => EvalError::DivisionByZero,
2197        other => other,
2198    })
2199}
2200
2201fn as_f64(v: &Value) -> Result<f64, EvalError> {
2202    match v {
2203        Value::SmallInt(n) => Ok(f64::from(*n)),
2204        Value::Int(n) => Ok(f64::from(*n)),
2205        #[allow(clippy::cast_precision_loss)]
2206        Value::BigInt(n) => Ok(*n as f64),
2207        Value::Float(x) => Ok(*x),
2208        #[allow(clippy::cast_precision_loss)]
2209        Value::Numeric { scaled, scale } => {
2210            let mut div = 1.0_f64;
2211            for _ in 0..*scale {
2212                div *= 10.0;
2213            }
2214            Ok((*scaled as f64) / div)
2215        }
2216        other => Err(EvalError::TypeMismatch {
2217            detail: format!("cannot convert {:?} to FLOAT", other.data_type()),
2218        }),
2219    }
2220}
2221
2222fn compare(op: BinOp, l: &Value, r: &Value) -> Result<Value, EvalError> {
2223    let ord = match (l, r) {
2224        (Value::Int(a), Value::Int(b)) => i64::from(*a).cmp(&i64::from(*b)),
2225        (Value::Int(a), Value::BigInt(b)) => i64::from(*a).cmp(b),
2226        (Value::BigInt(a), Value::Int(b)) => a.cmp(&i64::from(*b)),
2227        (Value::BigInt(a), Value::BigInt(b)) => a.cmp(b),
2228        (a, b)
2229            if matches!(a.data_type(), Some(DataType::Float))
2230                || matches!(b.data_type(), Some(DataType::Float)) =>
2231        {
2232            let af = as_f64(a)?;
2233            let bf = as_f64(b)?;
2234            af.partial_cmp(&bf).ok_or(EvalError::TypeMismatch {
2235                detail: "NaN in comparison".into(),
2236            })?
2237        }
2238        (Value::Text(a), Value::Text(b)) => a.cmp(b),
2239        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
2240        // Date / Timestamp compare on their integer storage repr.
2241        // Cross-domain (Date vs Timestamp) lifts the Date to the
2242        // matching midnight TIMESTAMP first.
2243        (Value::Date(a), Value::Date(b)) => a.cmp(b),
2244        (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
2245        (Value::Date(a), Value::Timestamp(b)) => (i64::from(*a) * 86_400_000_000).cmp(b),
2246        (Value::Timestamp(a), Value::Date(b)) => a.cmp(&(i64::from(*b) * 86_400_000_000)),
2247        // PG-style implicit coercion: comparing a DATE / TIMESTAMP
2248        // column against a text literal lifts the literal into the
2249        // matching domain (e.g. `day >= '2024-01-01'`).
2250        (Value::Date(a), Value::Text(b)) => {
2251            let bd = parse_date_literal(b).ok_or_else(|| EvalError::TypeMismatch {
2252                detail: format!("cannot parse {b:?} as DATE for comparison"),
2253            })?;
2254            a.cmp(&bd)
2255        }
2256        (Value::Text(a), Value::Date(b)) => {
2257            let ad = parse_date_literal(a).ok_or_else(|| EvalError::TypeMismatch {
2258                detail: format!("cannot parse {a:?} as DATE for comparison"),
2259            })?;
2260            ad.cmp(b)
2261        }
2262        (Value::Timestamp(a), Value::Text(b)) => {
2263            let bt = parse_timestamp_literal(b).ok_or_else(|| EvalError::TypeMismatch {
2264                detail: format!("cannot parse {b:?} as TIMESTAMP for comparison"),
2265            })?;
2266            a.cmp(&bt)
2267        }
2268        (Value::Text(a), Value::Timestamp(b)) => {
2269            let at = parse_timestamp_literal(a).ok_or_else(|| EvalError::TypeMismatch {
2270                detail: format!("cannot parse {a:?} as TIMESTAMP for comparison"),
2271            })?;
2272            at.cmp(b)
2273        }
2274        (a, b) => {
2275            return Err(EvalError::TypeMismatch {
2276                detail: format!(
2277                    "comparison between {:?} and {:?}",
2278                    a.data_type(),
2279                    b.data_type()
2280                ),
2281            });
2282        }
2283    };
2284    let result = match op {
2285        BinOp::Eq => ord.is_eq(),
2286        BinOp::NotEq => !ord.is_eq(),
2287        BinOp::Lt => ord.is_lt(),
2288        BinOp::LtEq => ord.is_le(),
2289        BinOp::Gt => ord.is_gt(),
2290        BinOp::GtEq => ord.is_ge(),
2291        BinOp::And
2292        | BinOp::Or
2293        | BinOp::Add
2294        | BinOp::Sub
2295        | BinOp::Mul
2296        | BinOp::Div
2297        | BinOp::L2Distance
2298        | BinOp::InnerProduct
2299        | BinOp::CosineDistance
2300        | BinOp::Concat
2301        | BinOp::JsonGet
2302        | BinOp::JsonGetText
2303        | BinOp::JsonGetPath
2304        | BinOp::JsonGetPathText
2305        | BinOp::JsonContains
2306        | BinOp::IsDistinctFrom
2307        | BinOp::IsNotDistinctFrom => {
2308            unreachable!("compare() only called with comparison ops")
2309        }
2310    };
2311    Ok(Value::Bool(result))
2312}
2313
2314// SQL three-valued AND / OR.
2315fn and_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
2316    match (l, r) {
2317        (Value::Bool(false), _) | (_, Value::Bool(false)) => Ok(Value::Bool(false)),
2318        (Value::Bool(true), Value::Bool(true)) => Ok(Value::Bool(true)),
2319        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
2320        (a, b) => Err(EvalError::TypeMismatch {
2321            detail: format!(
2322                "AND on non-boolean: {:?} and {:?}",
2323                a.data_type(),
2324                b.data_type()
2325            ),
2326        }),
2327    }
2328}
2329
2330fn or_3vl(l: Value, r: Value) -> Result<Value, EvalError> {
2331    match (l, r) {
2332        (Value::Bool(true), _) | (_, Value::Bool(true)) => Ok(Value::Bool(true)),
2333        (Value::Bool(false), Value::Bool(false)) => Ok(Value::Bool(false)),
2334        (Value::Null, _) | (_, Value::Null) => Ok(Value::Null),
2335        (a, b) => Err(EvalError::TypeMismatch {
2336            detail: format!(
2337                "OR on non-boolean: {:?} and {:?}",
2338                a.data_type(),
2339                b.data_type()
2340            ),
2341        }),
2342    }
2343}
2344
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;
    use spg_storage::{ColumnSchema, Row};

    /// Interval/timestamp magnitudes used by the tests below, in microseconds.
    const MICROS_PER_SECOND: i64 = 1_000_000;
    const MICROS_PER_HOUR: i64 = 3_600 * MICROS_PER_SECOND;
    const MICROS_PER_DAY: i64 = 24 * MICROS_PER_HOUR;

    // ---- fixture builders -------------------------------------------------

    /// Column schema named `name` of type `ty`; the third constructor argument
    /// is always `true` (presumably "nullable" — confirm against `ColumnSchema::new`).
    fn col(name: &str, ty: DataType) -> ColumnSchema {
        ColumnSchema::new(name, ty, true)
    }

    /// Integer literal expression.
    fn int(n: i64) -> Expr {
        Expr::Literal(Literal::Integer(n))
    }

    /// Boolean literal expression.
    fn boolean(b: bool) -> Expr {
        Expr::Literal(Literal::Bool(b))
    }

    /// String literal expression.
    fn text(s: &str) -> Expr {
        Expr::Literal(Literal::String(s.into()))
    }

    /// `NULL` literal expression.
    fn null_lit() -> Expr {
        Expr::Literal(Literal::Null)
    }

    /// Column reference, optionally qualified (`qualifier.name`).
    fn column(qualifier: Option<&str>, name: &str) -> Expr {
        Expr::Column(ColumnName {
            qualifier: qualifier.map(Into::into),
            name: name.into(),
        })
    }

    /// Binary expression `lhs op rhs` with both operands boxed.
    fn binary(lhs: Expr, op: BinOp, rhs: Expr) -> Expr {
        Expr::Binary {
            lhs: alloc::boxed::Box::new(lhs),
            op,
            rhs: alloc::boxed::Box::new(rhs),
        }
    }

    /// Evaluates `expr` against `row`, resolving columns through `schema`
    /// and the optional table `alias`.
    fn eval_in(
        schema: &[ColumnSchema],
        alias: Option<&str>,
        row: &Row,
        expr: &Expr,
    ) -> Result<Value, EvalError> {
        eval_expr(expr, row, &EvalContext::new(schema, alias))
    }

    /// Evaluates an expression that needs no columns: empty schema, empty
    /// row, no alias.
    fn eval_const(expr: &Expr) -> Result<Value, EvalError> {
        eval_in(&[], None, &Row::new(vec![]), expr)
    }

    // ---- literals and column resolution -----------------------------------

    #[test]
    fn literal_evaluates_to_value() {
        assert_eq!(eval_const(&int(42)).unwrap(), Value::Int(42));
        assert_eq!(
            eval_const(&Expr::Literal(Literal::Float(1.5))).unwrap(),
            Value::Float(1.5)
        );
        assert_eq!(eval_const(&null_lit()).unwrap(), Value::Null);
    }

    #[test]
    fn column_lookup_unqualified() {
        let schema = [col("a", DataType::Int), col("b", DataType::Text)];
        let row = Row::new(vec![Value::Int(7), Value::Text("hi".into())]);

        let a = eval_in(&schema, None, &row, &column(None, "a")).unwrap();
        assert_eq!(a, Value::Int(7));

        let b = eval_in(&schema, None, &row, &column(None, "b")).unwrap();
        assert_eq!(b, Value::Text("hi".into()));
    }

    #[test]
    fn column_not_found_errors() {
        let schema = [col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(0)]);

        let err = eval_in(&schema, None, &row, &column(None, "ghost")).unwrap_err();
        assert!(matches!(err, EvalError::ColumnNotFound { ref name } if name == "ghost"));
    }

    #[test]
    fn qualified_column_matches_alias() {
        let schema = [col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(5)]);

        // The qualifier equals the context alias, so the lookup succeeds.
        let got = eval_in(&schema, Some("u"), &row, &column(Some("u"), "a")).unwrap();
        assert_eq!(got, Value::Int(5));
    }

    #[test]
    fn qualified_column_unknown_alias_errors() {
        let schema = [col("a", DataType::Int)];
        let row = Row::new(vec![Value::Int(5)]);

        // The qualifier `x` differs from the context alias `u`.
        let err = eval_in(&schema, Some("u"), &row, &column(Some("x"), "a")).unwrap_err();
        assert!(matches!(err, EvalError::UnknownQualifier { .. }));
    }

    // ---- arithmetic and comparison ----------------------------------------

    #[test]
    fn arithmetic_with_widening() {
        // Integer + float evaluates to a float.
        let sum = binary(int(2), BinOp::Add, Expr::Literal(Literal::Float(0.5)));
        assert_eq!(eval_const(&sum).unwrap(), Value::Float(2.5));
    }

    #[test]
    fn division_by_zero_errors() {
        let quotient = binary(int(1), BinOp::Div, int(0));
        assert_eq!(
            eval_const(&quotient).unwrap_err(),
            EvalError::DivisionByZero
        );
    }

    #[test]
    fn comparison_returns_bool() {
        let less = binary(int(1), BinOp::Lt, int(2));
        assert_eq!(eval_const(&less).unwrap(), Value::Bool(true));
    }

    #[test]
    fn null_propagates_through_arithmetic() {
        let sum = binary(int(1), BinOp::Add, null_lit());
        assert_eq!(eval_const(&sum).unwrap(), Value::Null);
    }

    // ---- three-valued logic -----------------------------------------------

    #[test]
    fn and_three_valued_logic() {
        let cases = [
            // FALSE AND NULL → FALSE
            (boolean(false), null_lit(), Value::Bool(false)),
            // TRUE AND NULL → NULL
            (boolean(true), null_lit(), Value::Null),
            // TRUE AND TRUE → TRUE
            (boolean(true), boolean(true), Value::Bool(true)),
        ];
        for (lhs, rhs, expected) in cases {
            let conj = binary(lhs, BinOp::And, rhs);
            assert_eq!(eval_const(&conj).unwrap(), expected);
        }
    }

    #[test]
    fn or_three_valued_logic() {
        let cases = [
            // TRUE OR NULL → TRUE
            (true, Value::Bool(true)),
            // FALSE OR NULL → NULL
            (false, Value::Null),
        ];
        for (lhs, expected) in cases {
            let disj = binary(boolean(lhs), BinOp::Or, null_lit());
            assert_eq!(eval_const(&disj).unwrap(), expected);
        }
    }

    #[test]
    fn not_on_null_is_null() {
        let negated = Expr::Unary {
            op: UnOp::Not,
            expr: alloc::boxed::Box::new(null_lit()),
        };
        assert_eq!(eval_const(&negated).unwrap(), Value::Null);
    }

    #[test]
    fn text_comparison_lexicographic() {
        let less = binary(text("apple"), BinOp::Lt, text("banana"));
        assert_eq!(eval_const(&less).unwrap(), Value::Bool(true));
    }

    // ---- intervals --------------------------------------------------------

    #[test]
    fn interval_format_basics() {
        // (months, micros, rendered text)
        let cases = [
            (0, 0, "0"),
            (0, MICROS_PER_DAY, "1 day"),
            (0, -MICROS_PER_DAY, "-1 days"),
            (0, MICROS_PER_HOUR, "01:00:00"),
            (0, MICROS_PER_DAY + 9 * MICROS_PER_SECOND, "1 day 00:00:09"),
            (14, 0, "1 year 2 mons"),
            (-1, 0, "-1 mons"),
        ];
        for (months, micros, rendered) in cases {
            assert_eq!(format_interval(months, micros), rendered);
        }
    }

    #[test]
    fn interval_add_to_timestamp_micros_part() {
        // 2024-01-01 00:00:00 + INTERVAL '1 hour' = 2024-01-01 01:00:00
        let midnight = i64::from(days_from_civil(2024, 1, 1)) * MICROS_PER_DAY;
        let shifted = add_interval_to_micros(midnight, 0, MICROS_PER_HOUR).unwrap();
        assert_eq!(shifted, midnight + MICROS_PER_HOUR);
    }

    #[test]
    fn interval_clamp_month_end() {
        // (start date, month delta, expected date)
        let cases = [
            // 2024-01-31 + 1 month = 2024-02-29 (leap year).
            ((2024, 1, 31), 1, (2024, 2, 29)),
            // 2023-01-31 + 1 month = 2023-02-28 (non-leap).
            ((2023, 1, 31), 1, (2023, 2, 28)),
            // 2024-03-31 - 1 month = 2024-02-29.
            ((2024, 3, 31), -1, (2024, 2, 29)),
        ];
        for ((year, month, day), delta, expected) in cases {
            let start = days_from_civil(year, month, day);
            let shifted = shift_date_by_months(start, delta).unwrap();
            assert_eq!(civil_from_days(shifted), expected);
        }
    }

    #[test]
    fn interval_date_plus_pure_days_stays_date() {
        // DATE + INTERVAL '7 days' must stay DATE.
        let start = Value::Date(days_from_civil(2024, 6, 1));
        let one_week = Value::Interval {
            months: 0,
            micros: 7 * MICROS_PER_DAY,
        };
        let sum = apply_binary_interval(BinOp::Add, &start, &one_week)
            .unwrap()
            .unwrap();
        assert_eq!(sum, Value::Date(days_from_civil(2024, 6, 8)));
    }

    #[test]
    fn interval_date_plus_sub_day_lifts_to_timestamp() {
        // DATE + INTERVAL '1 hour' must lift to TIMESTAMP.
        let day = days_from_civil(2024, 6, 1);
        let one_hour = Value::Interval {
            months: 0,
            micros: MICROS_PER_HOUR,
        };
        let sum = apply_binary_interval(BinOp::Add, &Value::Date(day), &one_hour)
            .unwrap()
            .unwrap();
        assert_eq!(
            sum,
            Value::Timestamp(i64::from(day) * MICROS_PER_DAY + MICROS_PER_HOUR)
        );
    }
}