Skip to main content

spg_engine/
conversions.rs

1//! Type conversions — Value/literal <-> text/bytes/special-format. The
2//! coercion entry point (`coerce_value`) plus every parser/formatter it
3//! leans on: bytea, text/2-D arrays, hstore, ranges, money, time, year,
4//! and literal->Value. Split out of `lib.rs` (v7.32 engine
5//! modularisation); a self-contained cluster (its members call each
6//! other), depending only on spg_storage/spg_sql, `eval`, and `numeric`.
7
8use alloc::string::ToString;
9use alloc::vec::Vec;
10
11use spg_sql::ast::{ColumnTypeName, Expr, Literal, UnOp, VecEncoding as SqlVecEncoding};
12use spg_storage::{ColumnSchema, DataType, StorageError, Value, VecEncoding};
13
14use crate::EngineError;
15use crate::eval::{self, EvalContext, EvalError};
16use crate::numeric::{
17    numeric_from_float, numeric_from_integer, numeric_rescale, numeric_truncate_to_integer,
18    parse_numeric_text,
19};
20
21/// v7.10.4 — decode a BYTEA literal. Accepts:
22///   * `\xDEADBEEF` (case-insensitive hex; whitespace stripped)
23///   * `Hello\000world` (backslash escape form; `\\` for literal backslash)
24///   * Anything else → raw UTF-8 bytes of the input (PG accepts this too).
25pub(crate) fn decode_bytea_literal(s: &str) -> Result<alloc::vec::Vec<u8>, &'static str> {
26    let s = s.trim();
27    if let Some(hex) = s.strip_prefix("\\x").or_else(|| s.strip_prefix("\\X")) {
28        // Hex form. Each pair of hex digits → one byte.
29        let cleaned: alloc::string::String = hex.chars().filter(|c| !c.is_whitespace()).collect();
30        if cleaned.len() % 2 != 0 {
31            return Err("odd-length hex literal");
32        }
33        let mut out = alloc::vec::Vec::with_capacity(cleaned.len() / 2);
34        let cleaned_bytes = cleaned.as_bytes();
35        for i in (0..cleaned_bytes.len()).step_by(2) {
36            let hi = hex_nibble(cleaned_bytes[i])?;
37            let lo = hex_nibble(cleaned_bytes[i + 1])?;
38            out.push((hi << 4) | lo);
39        }
40        return Ok(out);
41    }
42    // Escape form or raw. Walk char-by-char; `\\` and `\NNN` octal
43    // sequences decode; anything else is a literal byte.
44    let bytes = s.as_bytes();
45    let mut out = alloc::vec::Vec::with_capacity(bytes.len());
46    let mut i = 0;
47    while i < bytes.len() {
48        let b = bytes[i];
49        if b == b'\\' && i + 1 < bytes.len() {
50            let n = bytes[i + 1];
51            if n == b'\\' {
52                out.push(b'\\');
53                i += 2;
54                continue;
55            }
56            if n.is_ascii_digit()
57                && i + 3 < bytes.len()
58                && bytes[i + 2].is_ascii_digit()
59                && bytes[i + 3].is_ascii_digit()
60            {
61                let oct = |x: u8| (x - b'0') as u32;
62                let v = oct(n) * 64 + oct(bytes[i + 2]) * 8 + oct(bytes[i + 3]);
63                if v <= 0xFF {
64                    out.push(v as u8);
65                    i += 4;
66                    continue;
67                }
68            }
69        }
70        out.push(b);
71        i += 1;
72    }
73    Ok(out)
74}
75
/// Decode one ASCII hex digit (`0-9`, `a-f`, `A-F`) into its 4-bit value.
///
/// # Errors
/// Returns `"invalid hex digit"` for any other byte.
pub(crate) fn hex_nibble(b: u8) -> Result<u8, &'static str> {
    // `to_digit(16)` accepts exactly the ASCII hex digits; bytes >= 0x80
    // map to Latin-1 code points, none of which are digits.
    char::from(b)
        .to_digit(16)
        .map(|value| value as u8)
        .ok_or("invalid hex digit")
}
84
// v7.10.11 — the PG TEXT[] external array form (`{a,b,NULL}` with
// optional double-quoted elements) is decoded by
// `decode_text_array_literal`, defined after `array_literal_widen`.
// The engine takes a leading/trailing `{`/`}` and splits at commas.
// Quoted elements (`"hello, world"`) preserve embedded commas;
// `\\` and `\"` decode to literal backslash / quote. Plain
// unquoted `NULL` (case-insensitive) maps to `None`.
//
91/// v7.11.13 — pick the array type for `ARRAY[lit, …]` from the
92/// element values. Single-element-type rules:
93///   - all NULL / all Text → TextArray
94///   - all Int (or Int+NULL) → IntArray
95///   - any BigInt without Text → BigIntArray (widening)
96///   - any Text → TextArray (fallback; non-string elements
97///     render as text)
98pub(crate) fn array_literal_widen(items: alloc::vec::Vec<Value>) -> Value {
99    let mut has_text = false;
100    let mut has_bigint = false;
101    let mut has_int = false;
102    for v in &items {
103        match v {
104            Value::Null => {}
105            Value::Text(_) | Value::Json(_) => has_text = true,
106            Value::BigInt(_) => has_bigint = true,
107            Value::Int(_) | Value::SmallInt(_) => has_int = true,
108            _ => has_text = true,
109        }
110    }
111    if has_text || (!has_bigint && !has_int) {
112        let out: alloc::vec::Vec<Option<alloc::string::String>> = items
113            .into_iter()
114            .map(|v| match v {
115                Value::Null => None,
116                Value::Text(s) | Value::Json(s) => Some(s),
117                other => Some(alloc::format!("{other:?}")),
118            })
119            .collect();
120        return Value::TextArray(out);
121    }
122    if has_bigint {
123        let out: alloc::vec::Vec<Option<i64>> = items
124            .into_iter()
125            .map(|v| match v {
126                Value::Null => None,
127                Value::Int(n) => Some(i64::from(n)),
128                Value::SmallInt(n) => Some(i64::from(n)),
129                Value::BigInt(n) => Some(n),
130                _ => unreachable!("widen: unexpected non-integer in BigInt path"),
131            })
132            .collect();
133        return Value::BigIntArray(out);
134    }
135    let out: alloc::vec::Vec<Option<i32>> = items
136        .into_iter()
137        .map(|v| match v {
138            Value::Null => None,
139            Value::Int(n) => Some(n),
140            Value::SmallInt(n) => Some(i32::from(n)),
141            _ => unreachable!("widen: unexpected non-i32-compatible in Int path"),
142        })
143        .collect();
144    Value::IntArray(out)
145}
146
/// v7.10.11 — decode a PG TEXT[] external array literal such as
/// `{a,b,NULL}` into its elements.
///
/// The input is trimmed and must be wrapped in `{`…`}`; the inside is
/// split at commas. Rules per element:
///   * Double-quoted (`"hello, world"`): embedded commas are kept and a
///     backslash makes the next byte literal (`\\` → `\`, `\"` → `"`).
///     Non-ASCII text is preserved byte-for-byte.
///   * Unquoted: surrounding whitespace is trimmed; a bare `NULL`
///     (case-insensitive) becomes `None`, anything else `Some(text)`.
///   * An empty body (`{}` or only whitespace) yields an empty vector.
///
/// # Errors
/// Returns a static message when the braces are missing, a quoted
/// element is never closed, or something other than `,` follows a
/// quoted element.
pub(crate) fn decode_text_array_literal(
    s: &str,
) -> Result<alloc::vec::Vec<Option<alloc::string::String>>, &'static str> {
    let inner = s
        .trim()
        .strip_prefix('{')
        .and_then(|x| x.strip_suffix('}'))
        .ok_or("TEXT[] literal must be enclosed in '{...}'")?;
    let mut out: alloc::vec::Vec<Option<alloc::string::String>> = alloc::vec::Vec::new();
    if inner.trim().is_empty() {
        return Ok(out);
    }
    let bytes = inner.as_bytes();
    let is_blank = |b: u8| b == b' ' || b == b'\t';
    let mut i = 0;
    // `<=` so that a trailing comma still yields a final (empty) element.
    while i <= bytes.len() {
        // Skip leading whitespace.
        while i < bytes.len() && is_blank(bytes[i]) {
            i += 1;
        }
        if i < bytes.len() && bytes[i] == b'"' {
            // Quoted element. Collect raw bytes rather than casting each
            // byte to `char`, which would mangle multi-byte UTF-8 text.
            i += 1; // open quote
            let mut buf: alloc::vec::Vec<u8> = alloc::vec::Vec::new();
            while i < bytes.len() && bytes[i] != b'"' {
                if bytes[i] == b'\\' && i + 1 < bytes.len() {
                    buf.push(bytes[i + 1]);
                    i += 2;
                } else {
                    buf.push(bytes[i]);
                    i += 1;
                }
            }
            if i >= bytes.len() {
                return Err("unterminated quoted element");
            }
            i += 1; // close quote
            // Only ASCII backslashes were removed from valid UTF-8, so this
            // conversion is not expected to fail; the error is a safeguard.
            let text = alloc::string::String::from_utf8(buf)
                .map_err(|_| "invalid UTF-8 in quoted element")?;
            out.push(Some(text));
        } else {
            // Unquoted element — read until next comma or end.
            let start = i;
            while i < bytes.len() && bytes[i] != b',' {
                i += 1;
            }
            let raw = inner[start..i].trim();
            if raw.eq_ignore_ascii_case("NULL") {
                out.push(None);
            } else {
                out.push(Some(alloc::string::String::from(raw)));
            }
        }
        // Skip whitespace, expect comma or end.
        while i < bytes.len() && is_blank(bytes[i]) {
            i += 1;
        }
        if i >= bytes.len() {
            break;
        }
        if bytes[i] != b',' {
            return Err("expected ',' between TEXT[] elements");
        }
        i += 1;
    }
    Ok(out)
}
211
/// v7.10.11 — render a TEXT[] in the PG external array form
/// (`{a,b,NULL}`).
///
/// `None` is written as the bare token `NULL`. A `Some` element is
/// wrapped in double quotes when it is empty, spells `NULL` in any
/// case, or contains a comma, brace, double quote, backslash, space or
/// tab; inside the quotes `"` and `\` are prefixed with a backslash.
/// All other elements are written verbatim.
pub(crate) fn encode_text_array(items: &[Option<alloc::string::String>]) -> alloc::string::String {
    let mut rendered = alloc::string::String::with_capacity(2 + items.len() * 8);
    rendered.push('{');
    let mut first = true;
    for item in items {
        if !first {
            rendered.push(',');
        }
        first = false;

        let Some(text) = item else {
            rendered.push_str("NULL");
            continue;
        };

        // An element may be written bare only if nothing about it could
        // be confused with array syntax or with the NULL token.
        let is_plain = !text.is_empty()
            && !text.eq_ignore_ascii_case("NULL")
            && text
                .chars()
                .all(|c| !matches!(c, ',' | '{' | '}' | '"' | '\\' | ' ' | '\t'));
        if is_plain {
            rendered.push_str(text);
            continue;
        }

        rendered.push('"');
        for c in text.chars() {
            if matches!(c, '"' | '\\') {
                rendered.push('\\');
            }
            rendered.push(c);
        }
        rendered.push('"');
    }
    rendered.push('}');
    rendered
}
248
249/// v7.10.4 — encode BYTEA bytes in PG hex output format
250/// (`\x` prefix, lowercase hex pairs). Used by Text-side
251/// round-trip + the wire layer's text-mode encoder.
252pub(crate) fn encode_bytea_hex(b: &[u8]) -> alloc::string::String {
253    let mut out = alloc::string::String::with_capacity(2 + 2 * b.len());
254    out.push_str("\\x");
255    for byte in b {
256        let hi = byte >> 4;
257        let lo = byte & 0x0F;
258        out.push(hex_digit(hi));
259        out.push(hex_digit(lo));
260    }
261    out
262}
263
/// Map a nibble value `0..=15` to its lowercase ASCII hex digit.
/// Any value above 15 yields the placeholder `'?'`.
pub(crate) const fn hex_digit(n: u8) -> char {
    if n < 10 {
        (b'0' + n) as char
    } else if n < 16 {
        (b'a' + (n - 10)) as char
    } else {
        '?'
    }
}
271
/// v7.17.0 Phase 3.P0-39 — parse a PG `hstore` text literal into
/// a flat key→value list. Empty string → empty list. Duplicate
/// keys take last-write-wins (matches PG `hstore_in`); the entry keeps
/// the position of the key's first occurrence.
///
/// Accepted shapes (minimal subset):
///   * `'a=>1, b=>2'`            — bareword keys/values (end at
///     whitespace, `,` or `=`)
///   * `'"a"=>"1", "b"=>"2"'`    — quoted keys/values; inside quotes a
///     backslash makes the next byte literal, and non-ASCII text is
///     preserved byte-for-byte
///   * `'a=>NULL'`               — case-insensitive NULL token
///     surfaces as `None` (no quotes around NULL)
///
/// Returns None on parse failure → caller surfaces as hard error.
pub(crate) fn parse_hstore_str(
    s: &str,
) -> Option<Vec<(alloc::string::String, Option<alloc::string::String>)>> {
    /// Advance `i` past spaces, tabs and line breaks.
    fn skip_ws(bytes: &[u8], i: &mut usize) {
        while *i < bytes.len() && matches!(bytes[*i], b' ' | b'\t' | b'\n' | b'\r') {
            *i += 1;
        }
    }

    /// Read one quoted or bareword token starting at `i`, leaving `i`
    /// just past it. `None` on end of input, an empty bareword, or an
    /// unterminated quoted token.
    fn parse_token(bytes: &[u8], i: &mut usize) -> Option<alloc::string::String> {
        let raw: Vec<u8> = if *bytes.get(*i)? == b'"' {
            *i += 1;
            // Collect raw bytes: casting each byte to `char` would turn
            // multi-byte UTF-8 sequences into mojibake.
            let mut buf = Vec::new();
            loop {
                // Running off the end means the closing quote is missing.
                match *bytes.get(*i)? {
                    b'"' => {
                        *i += 1;
                        break;
                    }
                    b'\\' if *i + 1 < bytes.len() => {
                        buf.push(bytes[*i + 1]);
                        *i += 2;
                    }
                    other => {
                        buf.push(other);
                        *i += 1;
                    }
                }
            }
            buf
        } else {
            let start = *i;
            while *i < bytes.len()
                && !matches!(bytes[*i], b' ' | b'\t' | b'\n' | b'\r' | b',' | b'=')
            {
                *i += 1;
            }
            if *i == start {
                return None;
            }
            bytes[start..*i].to_vec()
        };
        alloc::string::String::from_utf8(raw).ok()
    }

    let bytes = s.as_bytes();
    let mut i = 0;
    let mut out: Vec<(alloc::string::String, Option<alloc::string::String>)> = Vec::new();
    skip_ws(bytes, &mut i);
    while i < bytes.len() {
        let key = parse_token(bytes, &mut i)?;
        skip_ws(bytes, &mut i);
        if i + 1 >= bytes.len() || bytes[i] != b'=' || bytes[i + 1] != b'>' {
            return None;
        }
        i += 2;
        skip_ws(bytes, &mut i);
        // Check for unquoted NULL token (case-insensitive); it must be
        // followed by a delimiter so that e.g. `nullable` stays a value.
        let val_token = if i + 4 <= bytes.len()
            && bytes[i..i + 4].eq_ignore_ascii_case(b"NULL")
            && (i + 4 == bytes.len() || matches!(bytes[i + 4], b' ' | b'\t' | b',' | b'\n' | b'\r'))
        {
            i += 4;
            None
        } else {
            Some(parse_token(bytes, &mut i)?)
        };
        // Replace any existing entry with the same key (last-wins).
        if let Some(pos) = out.iter().position(|(k, _)| k == &key) {
            out[pos] = (key, val_token);
        } else {
            out.push((key, val_token));
        }
        skip_ws(bytes, &mut i);
        if i >= bytes.len() {
            break;
        }
        if bytes[i] == b',' {
            i += 1;
            skip_ws(bytes, &mut i);
            continue;
        }
        return None;
    }
    Some(out)
}
369
/// v7.17.0 Phase 3.P0-39 — render a hstore as canonical PG text
/// form `"k"=>"v"` (keys and non-NULL values always quoted;
/// NULL token is bare). Pairs are joined with `", "`.
///
/// Inside the quotes, `"` and `\` are escaped with a backslash so that
/// keys or values containing them stay unambiguous and can be read
/// back by `parse_hstore_str`. An empty slice renders as an empty
/// string.
pub(crate) fn format_hstore_str(
    pairs: &[(alloc::string::String, Option<alloc::string::String>)],
) -> alloc::string::String {
    /// Append `text` wrapped in double quotes, backslash-escaping any
    /// embedded double quote or backslash.
    fn push_quoted(out: &mut alloc::string::String, text: &str) {
        out.push('"');
        for c in text.chars() {
            if c == '"' || c == '\\' {
                out.push('\\');
            }
            out.push(c);
        }
        out.push('"');
    }

    let mut out = alloc::string::String::new();
    for (i, (k, v)) in pairs.iter().enumerate() {
        if i > 0 {
            out.push_str(", ");
        }
        push_quoted(&mut out, k);
        out.push_str("=>");
        match v {
            None => out.push_str("NULL"),
            Some(val) => push_quoted(&mut out, val),
        }
    }
    out
}
395
396/// v7.17.0 Phase 3.P0-39 — pub re-export so pgwire + sqllogictest
397/// share the single hstore renderer.
398pub fn format_hstore_text(
399    pairs: &[(alloc::string::String, Option<alloc::string::String>)],
400) -> alloc::string::String {
401    format_hstore_str(pairs)
402}
403
404// ─── v7.17.0 Phase 3.P0-40 — 2D array parse + display ─────────
405
/// Split a PG external 2D-array literal `'{{a,b},{c,d}}'` into
/// per-row lists of trimmed cell tokens.
///
/// The outer braces are required; an empty outer body yields no rows.
/// Rows may be separated by commas and/or whitespace. Each row runs
/// from its `{` to the matching `}` (nested braces are balanced) and is
/// split at every comma — quoting is not interpreted here. A row whose
/// body is blank yields an empty cell list.
///
/// # Errors
/// Returns a static message when the outer braces are missing, a row
/// does not start with `{`, a row's braces are unbalanced, or rows have
/// differing cell counts.
pub(crate) fn split_2d_literal(s: &str) -> Result<Vec<Vec<alloc::string::String>>, &'static str> {
    let body = s
        .trim()
        .strip_prefix('{')
        .and_then(|x| x.strip_suffix('}'))
        .ok_or("missing outer '{...}' braces")?
        .trim();

    let mut rows: Vec<Vec<alloc::string::String>> = Vec::new();
    let mut rest = body;
    loop {
        // Drop separators (whitespace and commas) before the next row.
        rest = rest.trim_start_matches(|c: char| matches!(c, ' ' | '\t' | '\n' | '\r' | ','));
        if rest.is_empty() {
            break;
        }
        let Some(after_open) = rest.strip_prefix('{') else {
            return Err("expected '{' opening a row");
        };

        // Locate the brace that closes this row, honouring nesting.
        let mut depth = 1usize;
        let mut close_at = None;
        for (pos, b) in after_open.bytes().enumerate() {
            match b {
                b'{' => depth += 1,
                b'}' => {
                    depth -= 1;
                    if depth == 0 {
                        close_at = Some(pos);
                        break;
                    }
                }
                _ => {}
            }
        }
        let Some(close_at) = close_at else {
            return Err("unbalanced '{...}' in row");
        };

        let row_text = &after_open[..close_at];
        rest = &after_open[close_at + 1..];

        let mut cells: Vec<alloc::string::String> = Vec::new();
        if !row_text.trim().is_empty() {
            for token in row_text.split(',') {
                cells.push(alloc::string::String::from(token.trim()));
            }
        }
        rows.push(cells);
    }

    // Every row must be as wide as the first one.
    if let Some(width) = rows.first().map(Vec::len) {
        if rows.iter().any(|row| row.len() != width) {
            return Err("ragged 2D array (rows have different column counts)");
        }
    }
    Ok(rows)
}
466
467pub(crate) fn parse_int_2d_literal(s: &str) -> Result<Vec<Vec<Option<i32>>>, &'static str> {
468    let raw = split_2d_literal(s)?;
469    raw.into_iter()
470        .map(|row| {
471            row.into_iter()
472                .map(|cell| {
473                    if cell.eq_ignore_ascii_case("NULL") {
474                        Ok(None)
475                    } else {
476                        cell.parse::<i32>()
477                            .map(Some)
478                            .map_err(|_| "invalid int element")
479                    }
480                })
481                .collect()
482        })
483        .collect()
484}
485
486pub(crate) fn parse_bigint_2d_literal(s: &str) -> Result<Vec<Vec<Option<i64>>>, &'static str> {
487    let raw = split_2d_literal(s)?;
488    raw.into_iter()
489        .map(|row| {
490            row.into_iter()
491                .map(|cell| {
492                    if cell.eq_ignore_ascii_case("NULL") {
493                        Ok(None)
494                    } else {
495                        cell.parse::<i64>()
496                            .map(Some)
497                            .map_err(|_| "invalid bigint element")
498                    }
499                })
500                .collect()
501        })
502        .collect()
503}
504
505pub(crate) fn parse_text_2d_literal(
506    s: &str,
507) -> Result<Vec<Vec<Option<alloc::string::String>>>, &'static str> {
508    let raw = split_2d_literal(s)?;
509    Ok(raw
510        .into_iter()
511        .map(|row| {
512            row.into_iter()
513                .map(|cell| {
514                    if cell.eq_ignore_ascii_case("NULL") {
515                        None
516                    } else {
517                        Some(cell.trim_matches('"').to_string())
518                    }
519                })
520                .collect()
521        })
522        .collect())
523}
524
/// Render a 2D `i32` grid in PG external array form, e.g.
/// `{{1,NULL},{-3,4}}`. `None` cells are written as `NULL`; rows and
/// cells are comma-separated with no extra whitespace. An empty grid
/// renders as `{}`.
pub(crate) fn format_int_2d_text(rows: &[Vec<Option<i32>>]) -> alloc::string::String {
    let rendered_rows: Vec<alloc::string::String> = rows
        .iter()
        .map(|row| {
            let cells: Vec<alloc::string::String> = row
                .iter()
                .map(|cell| match cell {
                    Some(n) => alloc::format!("{n}"),
                    None => alloc::string::String::from("NULL"),
                })
                .collect();
            alloc::format!("{{{}}}", cells.join(","))
        })
        .collect();
    alloc::format!("{{{}}}", rendered_rows.join(","))
}
546
/// Render a 2D `i64` grid in PG external array form, e.g.
/// `{{1,NULL},{-3,4}}`. `None` cells are written as `NULL`; rows and
/// cells are comma-separated with no extra whitespace. An empty grid
/// renders as `{}`.
pub(crate) fn format_bigint_2d_text(rows: &[Vec<Option<i64>>]) -> alloc::string::String {
    let rendered_rows: Vec<alloc::string::String> = rows
        .iter()
        .map(|row| {
            let cells: Vec<alloc::string::String> = row
                .iter()
                .map(|cell| match cell {
                    Some(n) => alloc::format!("{n}"),
                    None => alloc::string::String::from("NULL"),
                })
                .collect();
            alloc::format!("{{{}}}", cells.join(","))
        })
        .collect();
    alloc::format!("{{{}}}", rendered_rows.join(","))
}
568
/// Render a 2D text grid in PG external array form, e.g.
/// `{{a,NULL},{"x,y",""}}`.
///
/// `None` cells are written as the bare token `NULL`. A `Some` cell is
/// double-quoted — with `"` and `\` backslash-escaped — when it is
/// empty, spells `NULL` in any case, or contains a comma, brace, double
/// quote, backslash, space or tab. These are the same rules the 1-D
/// `encode_text_array` uses; without them a text cell `NULL` could not
/// be told apart from SQL NULL and an empty string would disappear.
/// All other cells are written verbatim.
pub(crate) fn format_text_2d_text(
    rows: &[Vec<Option<alloc::string::String>>],
) -> alloc::string::String {
    /// Append one non-NULL element, quoting and escaping it if needed.
    fn push_element(out: &mut alloc::string::String, s: &str) {
        let needs_quote = s.is_empty()
            || s.eq_ignore_ascii_case("NULL")
            || s.chars()
                .any(|c| matches!(c, ',' | '{' | '}' | '"' | '\\' | ' ' | '\t'));
        if !needs_quote {
            out.push_str(s);
            return;
        }
        out.push('"');
        for c in s.chars() {
            if c == '"' || c == '\\' {
                out.push('\\');
            }
            out.push(c);
        }
        out.push('"');
    }

    let mut out = alloc::string::String::from("{");
    for (i, row) in rows.iter().enumerate() {
        if i > 0 {
            out.push(',');
        }
        out.push('{');
        for (j, cell) in row.iter().enumerate() {
            if j > 0 {
                out.push(',');
            }
            match cell {
                None => out.push_str("NULL"),
                Some(s) => push_element(&mut out, s),
            }
        }
        out.push('}');
    }
    out.push('}');
    out
}
592
593/// v7.17.0 Phase 3.P0-40 — pub re-exports so pgwire + sqllogictest
594/// share the single 2D-array renderer.
595pub fn format_int_2d_text_pub(rows: &[Vec<Option<i32>>]) -> alloc::string::String {
596    format_int_2d_text(rows)
597}
598pub fn format_bigint_2d_text_pub(rows: &[Vec<Option<i64>>]) -> alloc::string::String {
599    format_bigint_2d_text(rows)
600}
601pub fn format_text_2d_text_pub(
602    rows: &[Vec<Option<alloc::string::String>>],
603) -> alloc::string::String {
604    format_text_2d_text(rows)
605}
606
607/// v7.17.0 Phase 3.P0-38 — parse a PG range literal of the form
608/// `'[lo,up)'` / `'(lo,up]'` / `'[lo,up]'` / `'(lo,up)'` /
609/// `'empty'`. Lower / upper may be empty (unbounded). Returns
610/// `None` on any parse failure; caller surfaces as hard error.
611pub(crate) fn parse_range_str(s: &str, kind: spg_storage::RangeKind) -> Option<Value> {
612    let s = s.trim();
613    if s.eq_ignore_ascii_case("empty") {
614        return Some(Value::Range {
615            kind,
616            lower: None,
617            upper: None,
618            lower_inc: false,
619            upper_inc: false,
620            empty: true,
621        });
622    }
623    let bytes = s.as_bytes();
624    if bytes.len() < 3 {
625        return None;
626    }
627    let lower_inc = match bytes[0] {
628        b'[' => true,
629        b'(' => false,
630        _ => return None,
631    };
632    let upper_inc = match bytes[bytes.len() - 1] {
633        b']' => true,
634        b')' => false,
635        _ => return None,
636    };
637    let inner = &s[1..s.len() - 1];
638    let (lo_text, up_text) = inner.split_once(',')?;
639    let lower = if lo_text.is_empty() {
640        None
641    } else {
642        Some(alloc::boxed::Box::new(parse_range_element(lo_text, kind)?))
643    };
644    let upper = if up_text.is_empty() {
645        None
646    } else {
647        Some(alloc::boxed::Box::new(parse_range_element(up_text, kind)?))
648    };
649    Some(Value::Range {
650        kind,
651        lower,
652        upper,
653        lower_inc,
654        upper_inc,
655        empty: false,
656    })
657}
658
659/// v7.17.0 Phase 3.P0-38 — parse a single range bound text into
660/// the matching element Value for the RangeKind.
661pub(crate) fn parse_range_element(text: &str, kind: spg_storage::RangeKind) -> Option<Value> {
662    let text = text.trim().trim_matches('"');
663    use spg_storage::RangeKind as K;
664    match kind {
665        K::Int4 => text.parse::<i32>().ok().map(Value::Int),
666        K::Int8 => text.parse::<i64>().ok().map(Value::BigInt),
667        K::Num => {
668            // Reuse the Numeric parse via the engine's text-coercion
669            // path; bail to None on failure.
670            let dot = text.find('.');
671            let scale: u8 = dot.map_or(0, |p| (text.len() - p - 1) as u8);
672            let digits: alloc::string::String = text
673                .chars()
674                .filter(|c| *c == '-' || c.is_ascii_digit())
675                .collect();
676            let scaled: i128 = digits.parse().ok()?;
677            Some(Value::Numeric { scaled, scale })
678        }
679        K::Ts | K::TsTz => {
680            // Reuse the existing timestamp parse path. v7.17.0
681            // expects `'YYYY-MM-DD HH:MM:SS[.ffffff]'` in range
682            // bounds (TZ offset on TsTz is OOS for the initial
683            // P0-38; ship plain Timestamp shape).
684            crate::eval::parse_timestamp_literal(text).map(Value::Timestamp)
685        }
686        K::Date => crate::eval::parse_date_literal(text).map(Value::Date),
687    }
688}
689
690/// v7.17.0 Phase 3.P0-38 — render a Range value as its canonical
691/// PG text form. Re-exported via [`format_range_text`] for use
692/// from spg-server's pgwire layer.
693pub fn format_range_text(v: &Value) -> alloc::string::String {
694    format_range_str(v)
695}
696
697pub(crate) fn format_range_str(v: &Value) -> alloc::string::String {
698    let Value::Range {
699        lower,
700        upper,
701        lower_inc,
702        upper_inc,
703        empty,
704        ..
705    } = v
706    else {
707        return alloc::string::String::new();
708    };
709    if *empty {
710        return "empty".into();
711    }
712    let mut out = alloc::string::String::new();
713    out.push(if *lower_inc { '[' } else { '(' });
714    if let Some(l) = lower {
715        out.push_str(&format_range_element(l));
716    }
717    out.push(',');
718    if let Some(u) = upper {
719        out.push_str(&format_range_element(u));
720    }
721    out.push(if *upper_inc { ']' } else { ')' });
722    out
723}
724
725pub(crate) fn format_range_element(v: &Value) -> alloc::string::String {
726    match v {
727        Value::Int(n) => alloc::format!("{n}"),
728        Value::BigInt(n) => alloc::format!("{n}"),
729        Value::Date(d) => crate::eval::format_date(*d),
730        Value::Timestamp(t) => crate::eval::format_timestamp(*t),
731        Value::Numeric { scaled, scale } => crate::eval::format_numeric(*scaled, *scale),
732        other => alloc::format!("{other:?}"),
733    }
734}
735
/// v7.17.0 Phase 3.P0-35 — parse a PG `money` literal into i64
/// cents. Accepts, after trimming:
///   * Optional leading `-` (negative)
///   * Optional `$` prefix
///   * Integer portion (at least one digit) with optional `,`
///     thousands separators; comma placement is not checked
///   * Optional `.` followed by 1-2 digits (cents); 1 digit
///     auto-pads to 2 (`.5` → 50 cents).
///
/// The whole `i64` range of cents is representable, including
/// `i64::MIN` (`-92233720368547758.08`): the sign is applied before the
/// cents are combined, so the most negative value does not overflow.
///
/// Returns None on any parse failure or overflow — caller surfaces as
/// hard SQL error.
pub(crate) fn parse_money_str(s: &str) -> Option<i64> {
    let s = s.trim();
    let (neg, rest) = match s.strip_prefix('-') {
        Some(r) => (true, r.trim_start()),
        None => (false, s),
    };
    let rest = rest.strip_prefix('$').unwrap_or(rest).trim_start();
    let (int_part, frac_part) = match rest.split_once('.') {
        Some((i, f)) => (i, Some(f)),
        None => (rest, None),
    };

    // Validate the integer portion, skipping commas as we accumulate.
    let mut dollars: i64 = 0;
    let mut saw_digit = false;
    for b in int_part.bytes() {
        match b {
            b',' => {}
            b'0'..=b'9' => {
                saw_digit = true;
                dollars = dollars
                    .checked_mul(10)?
                    .checked_add(i64::from(b - b'0'))?;
            }
            _ => return None,
        }
    }
    if !saw_digit {
        return None;
    }

    let cents: i64 = match frac_part {
        None => 0,
        Some(f) => {
            if f.is_empty() || f.len() > 2 || !f.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            let value: i64 = f.parse().ok()?;
            // A single fractional digit counts tens of cents.
            if f.len() == 1 {
                value * 10
            } else {
                value
            }
        }
    };

    // Apply the sign first: |i64::MIN| is one larger than i64::MAX, so
    // building the magnitude and negating would reject the minimum.
    if neg {
        dollars.checked_neg()?.checked_mul(100)?.checked_sub(cents)
    } else {
        dollars.checked_mul(100)?.checked_add(cents)
    }
}
790
791/// v7.17.0 Phase 3.P0-34 — parse a PG `timetz` literal
792/// `HH:MM:SS[.fraction]±HH[:MM]` into (us, offset_secs).
793///
794/// The offset suffix is MANDATORY: SPG doesn't have a session TZ
795/// wired into eval, so a bare `HH:MM:SS` literal would be
796/// ambiguous. Returns None for any parse failure or out-of-range
797/// component — caller surfaces as a hard SQL error.
798///
799/// Offset range: ±14 hours (±50400 seconds), matching PG's
800/// internal limit.
801pub(crate) fn parse_timetz_str(s: &str) -> Option<(i64, i32)> {
802    let s = s.trim();
803    // Find the offset sign — scan from right since the time part
804    // never contains '+' / '-' (after the optional fractional dot
805    // it's all digits and ':').
806    let bytes = s.as_bytes();
807    let sign_pos = bytes
808        .iter()
809        .enumerate()
810        .rev()
811        .find(|&(_, &b)| b == b'+' || b == b'-')
812        .map(|(i, _)| i)?;
813    if sign_pos == 0 {
814        return None; // bare sign — no time component
815    }
816    let time_part = &s[..sign_pos];
817    let offset_part = &s[sign_pos..];
818    let us = parse_time_str(time_part)?;
819    let sign: i32 = if offset_part.starts_with('+') { 1 } else { -1 };
820    let offset_body = &offset_part[1..];
821    let (hh_str, mm_str) = match offset_body.split_once(':') {
822        Some((h, m)) => (h, m),
823        None => (offset_body, "0"),
824    };
825    let hh: i32 = hh_str.parse().ok()?;
826    let mm: i32 = mm_str.parse().ok()?;
827    if !(0..=14).contains(&hh) || !(0..=59).contains(&mm) {
828        return None;
829    }
830    let total = sign * (hh * 3600 + mm * 60);
831    if total.abs() > 50_400 {
832        return None;
833    }
834    Some((us, total))
835}
836
837/// v7.17.0 Phase 3.P0-33 — funnel an integer literal through MySQL
838/// YEAR range validation: 0 sentinel or 1901..=2155. Out-of-range
839/// surfaces as a hard SQL error (no silent truncation, mirrors PG
840/// `time_in` / `uuid_in` discipline).
841pub(crate) fn coerce_int_to_year(n: i64, col_name: &str) -> Result<Value, EngineError> {
842    if n == 0 || (1901..=2155).contains(&n) {
843        // u16::try_from cannot fail in this range; the cast also
844        // covers the 0 sentinel.
845        return Ok(Value::Year(n as u16));
846    }
847    Err(EngineError::Eval(EvalError::TypeMismatch {
848        detail: alloc::format!(
849            "year value out of range: {n} (column `{col_name}`; \
850             MySQL accepts 0 or 1901..=2155)"
851        ),
852    }))
853}
854
/// v7.17.0 Phase 3.P0-32 — parse a PG `time` literal
/// `HH:MM:SS[.fraction]` into microseconds since 00:00:00.
///
/// Accepts (after trimming surrounding whitespace):
///   * `HH:MM:SS`            — exact-second precision
///   * `HH:MM:SS.f` .. `.ffffff` — 1-6 fractional digits, right-padded
///     with zeros to microseconds
///
/// Every field must consist of ASCII digits only — a sign such as
/// `+30` is rejected even though Rust's integer parser would accept it.
///
/// Range: hour 0..=23, minute 0..=59, second 0..=59. Anything else
/// returns None — caller surfaces as a hard SQL error (no silent
/// truncation, matches PG's `time_in` behaviour).
pub(crate) fn parse_time_str(s: &str) -> Option<i64> {
    /// Parse one digits-only field and check it against `max`.
    fn field(text: &str, max: i64) -> Option<i64> {
        if text.is_empty() || !text.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        let value = i64::from(text.parse::<u32>().ok()?);
        if value > max {
            None
        } else {
            Some(value)
        }
    }

    let s = s.trim();
    let (hms, frac) = match s.split_once('.') {
        Some((h, f)) => (h, Some(f)),
        None => (s, None),
    };
    let mut parts = hms.split(':');
    let hh = field(parts.next()?, 23)?;
    let mm = field(parts.next()?, 59)?;
    let ss = field(parts.next()?, 59)?;
    if parts.next().is_some() {
        return None;
    }
    let frac_us: i64 = match frac {
        None => 0,
        Some(f) => {
            if f.is_empty() || f.len() > 6 || !f.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            // Scale up so '.5' = 500000 µsec (same as right-padding
            // the digits with zeros to six places).
            let digits: i64 = f.parse().ok()?;
            digits * 10_i64.pow(6 - f.len() as u32)
        }
    };
    Some(hh * 3_600_000_000 + mm * 60_000_000 + ss * 1_000_000 + frac_us)
}
904
905pub(crate) const fn column_type_to_data_type(t: ColumnTypeName) -> DataType {
906    match t {
907        ColumnTypeName::SmallInt => DataType::SmallInt,
908        ColumnTypeName::Int => DataType::Int,
909        ColumnTypeName::BigInt => DataType::BigInt,
910        ColumnTypeName::Float => DataType::Float,
911        ColumnTypeName::Text => DataType::Text,
912        ColumnTypeName::Varchar(n) => DataType::Varchar(n),
913        ColumnTypeName::Char(n) => DataType::Char(n),
914        ColumnTypeName::Bool => DataType::Bool,
915        ColumnTypeName::Vector { dim, encoding } => DataType::Vector {
916            dim,
917            encoding: match encoding {
918                SqlVecEncoding::F32 => VecEncoding::F32,
919                SqlVecEncoding::Sq8 => VecEncoding::Sq8,
920                SqlVecEncoding::F16 => VecEncoding::F16,
921            },
922        },
923        ColumnTypeName::Numeric(precision, scale) => DataType::Numeric { precision, scale },
924        ColumnTypeName::Date => DataType::Date,
925        ColumnTypeName::Timestamp => DataType::Timestamp,
926        ColumnTypeName::Timestamptz => DataType::Timestamptz,
927        ColumnTypeName::Json => DataType::Json,
928        ColumnTypeName::Jsonb => DataType::Jsonb,
929        ColumnTypeName::Bytes => DataType::Bytes,
930        ColumnTypeName::TextArray => DataType::TextArray,
931        ColumnTypeName::IntArray => DataType::IntArray,
932        ColumnTypeName::BigIntArray => DataType::BigIntArray,
933        ColumnTypeName::TsVector => DataType::TsVector,
934        ColumnTypeName::TsQuery => DataType::TsQuery,
935        ColumnTypeName::Uuid => DataType::Uuid,
936        ColumnTypeName::Time => DataType::Time,
937        ColumnTypeName::Year => DataType::Year,
938        ColumnTypeName::TimeTz => DataType::TimeTz,
939        ColumnTypeName::Money => DataType::Money,
940        ColumnTypeName::Range(k) => DataType::Range(match k {
941            spg_sql::ast::RangeKindAst::Int4 => spg_storage::RangeKind::Int4,
942            spg_sql::ast::RangeKindAst::Int8 => spg_storage::RangeKind::Int8,
943            spg_sql::ast::RangeKindAst::Num => spg_storage::RangeKind::Num,
944            spg_sql::ast::RangeKindAst::Ts => spg_storage::RangeKind::Ts,
945            spg_sql::ast::RangeKindAst::TsTz => spg_storage::RangeKind::TsTz,
946            spg_sql::ast::RangeKindAst::Date => spg_storage::RangeKind::Date,
947        }),
948        ColumnTypeName::Hstore => DataType::Hstore,
949        ColumnTypeName::IntArray2D => DataType::IntArray2D,
950        ColumnTypeName::BigIntArray2D => DataType::BigIntArray2D,
951        ColumnTypeName::TextArray2D => DataType::TextArray2D,
952    }
953}
954
955/// Convert an INSERT VALUES expression to a storage Value. Supports literal
956/// expressions, unary-minus over numeric literals, and pgvector-style
957/// `'[..]'::vector` cast (v1.2). Anything more complex returns `Unsupported`.
958pub(crate) fn literal_expr_to_value(expr: Expr) -> Result<Value, EngineError> {
959    match expr {
960        Expr::Literal(l) => Ok(literal_to_value(l)),
961        Expr::Cast { expr, target } => {
962            let inner_value = literal_expr_to_value(*expr)?;
963            crate::eval::cast_value(inner_value, target).map_err(EngineError::Eval)
964        }
965        Expr::Unary {
966            op: UnOp::Neg,
967            expr,
968        } => match *expr {
969            Expr::Literal(Literal::Integer(n)) => {
970                // Fold to i32 if it fits, else BigInt. Parser emits Integer(i64)
971                // — overflow on negate of i64::MIN is the one edge case.
972                let neg = n.checked_neg().ok_or_else(|| {
973                    EngineError::Unsupported("integer literal overflow on negation".into())
974                })?;
975                Ok(int_value_for(neg))
976            }
977            Expr::Literal(Literal::Float(x)) => Ok(Value::Float(-x)),
978            other => Err(EngineError::Unsupported(alloc::format!(
979                "unary minus over non-literal expression: {other:?}"
980            ))),
981        },
982        // v7.10.10 — `ARRAY[lit, lit, …]` constructor accepted at
983        // INSERT-time. Each element must reduce to a Value through
984        // `literal_expr_to_value`; NULL elements become `None`.
985        // v7.11.13 — deduce shape from element values: all Int →
986        // IntArray; any BigInt → BigIntArray (widening); any Text
987        // → TextArray. Cast targets (`ARRAY[]::INT[]`) flow through
988        // the outer Cast arm before reaching here and re-coerce.
989        Expr::Array(items) => {
990            let mut materialised: alloc::vec::Vec<Value> =
991                alloc::vec::Vec::with_capacity(items.len());
992            for elem in items {
993                materialised.push(literal_expr_to_value(elem)?);
994            }
995            Ok(array_literal_widen(materialised))
996        }
997        // Any other Expr shape — fall back to a general evaluation
998        // against an empty row + empty schema. This unblocks the
999        // app-common patterns where INSERT VALUES carries a
1000        // non-correlated function call:
1001        //   INSERT INTO t VALUES (concat('U-', 42))
1002        //   INSERT INTO t VALUES (now())
1003        //   INSERT INTO t VALUES (format('%s-%s', 'a', 'b'))
1004        // Any expression that references a column or `$N`
1005        // placeholder fails cleanly inside `eval_expr` with a
1006        // descriptive error; literals + casts + ARRAY[…] continue
1007        // to take the fast paths above so the hot INSERT path is
1008        // unchanged on the common case.
1009        other => {
1010            let empty_schema: alloc::vec::Vec<spg_storage::ColumnSchema> = alloc::vec::Vec::new();
1011            let ctx = EvalContext::new(&empty_schema, None);
1012            let empty_row = spg_storage::Row::new(alloc::vec::Vec::new());
1013            crate::eval::eval_expr(&other, &empty_row, &ctx).map_err(EngineError::Eval)
1014        }
1015    }
1016}
1017
1018pub(crate) fn literal_to_value(l: Literal) -> Value {
1019    match l {
1020        Literal::Integer(n) => int_value_for(n),
1021        Literal::Float(x) => Value::Float(x),
1022        Literal::String(s) => Value::Text(s),
1023        Literal::Bool(b) => Value::Bool(b),
1024        Literal::Null => Value::Null,
1025        Literal::Vector(v) => Value::Vector(v),
1026        Literal::TextArray(items) => Value::TextArray(items),
1027        Literal::IntArray(items) => Value::IntArray(items),
1028        Literal::BigIntArray(items) => Value::BigIntArray(items),
1029        Literal::Interval { months, micros, .. } => Value::Interval { months, micros },
1030    }
1031}
1032
1033/// Pick `Int` (`i32`) when the literal fits, else `BigInt`. `INT` vs `BIGINT`
1034/// columns will still enforce the right tag downstream — this is just the
1035/// default we synthesise from an unannotated integer literal.
1036pub(crate) fn int_value_for(n: i64) -> Value {
1037    if let Ok(small) = i32::try_from(n) {
1038        Value::Int(small)
1039    } else {
1040        Value::BigInt(n)
1041    }
1042}
1043
1044/// Widen / narrow `v` to fit `expected`. Numerics permit safe widening
1045/// (`Int → BigInt`, `Int/BigInt → Float`) and best-effort narrowing
1046/// (`BigInt → Int` succeeds only when the value fits in `i32`). Everything
1047/// else returns `TypeMismatch` carrying the column name for caller diagnostics.
1048/// `NULL` is always permitted; the nullability check happens later in storage.
1049#[allow(clippy::too_many_lines)]
1050/// v7.17.0 Phase 4.4 — reject negative integer values on UNSIGNED
1051/// columns. Called after `coerce_value` at each INSERT / UPDATE
1052/// site that has ColumnSchema context. NULL passes through (a
1053/// nullable UNSIGNED column can legitimately hold NULL).
1054pub(crate) fn check_unsigned_range(
1055    v: &Value,
1056    schema: &ColumnSchema,
1057    position: usize,
1058) -> Result<(), EngineError> {
1059    if !schema.is_unsigned {
1060        return Ok(());
1061    }
1062    let n = match v {
1063        Value::SmallInt(x) => i64::from(*x),
1064        Value::Int(x) => i64::from(*x),
1065        Value::BigInt(x) => *x,
1066        _ => return Ok(()), // non-integer cells (NULL, default) skip
1067    };
1068    if n < 0 {
1069        return Err(EngineError::Unsupported(alloc::format!(
1070            "column {:?} is UNSIGNED but got negative value {n} at position {position}",
1071            schema.name
1072        )));
1073    }
1074    Ok(())
1075}
1076
1077pub(crate) fn coerce_value(
1078    v: Value,
1079    expected: DataType,
1080    col_name: &str,
1081    position: usize,
1082) -> Result<Value, EngineError> {
1083    if v.is_null() {
1084        return Ok(Value::Null);
1085    }
1086    let actual = v.data_type().expect("non-null");
1087    if actual == expected {
1088        return Ok(v);
1089    }
1090    let coerced = match (v, expected) {
1091        (Value::Int(n), DataType::BigInt) => Some(Value::BigInt(i64::from(n))),
1092        (Value::Int(n), DataType::Float) => Some(Value::Float(f64::from(n))),
1093        (Value::Int(n), DataType::SmallInt) => i16::try_from(n).ok().map(Value::SmallInt),
1094        (Value::Int(n), DataType::Numeric { precision, scale }) => Some(numeric_from_integer(
1095            i128::from(n),
1096            precision,
1097            scale,
1098            col_name,
1099        )?),
1100        (Value::SmallInt(n), DataType::Int) => Some(Value::Int(i32::from(n))),
1101        (Value::SmallInt(n), DataType::BigInt) => Some(Value::BigInt(i64::from(n))),
1102        (Value::SmallInt(n), DataType::Float) => Some(Value::Float(f64::from(n))),
1103        (Value::SmallInt(n), DataType::Numeric { precision, scale }) => Some(numeric_from_integer(
1104            i128::from(n),
1105            precision,
1106            scale,
1107            col_name,
1108        )?),
1109        (Value::BigInt(n), DataType::Int) => i32::try_from(n).ok().map(Value::Int),
1110        (Value::BigInt(n), DataType::SmallInt) => i16::try_from(n).ok().map(Value::SmallInt),
1111        #[allow(clippy::cast_precision_loss)]
1112        (Value::BigInt(n), DataType::Float) => Some(Value::Float(n as f64)),
1113        (Value::BigInt(n), DataType::Numeric { precision, scale }) => Some(numeric_from_integer(
1114            i128::from(n),
1115            precision,
1116            scale,
1117            col_name,
1118        )?),
1119        (Value::Float(x), DataType::Numeric { precision, scale }) => {
1120            Some(numeric_from_float(x, precision, scale, col_name)?)
1121        }
1122        // v7.17.0 Phase 3.P0-67 — Text → NUMERIC. Parse a
1123        // canonical decimal text (`"-1234.56"` / `"42"` /
1124        // `"0.0001"`) into `(mantissa, source_scale)` and rescale
1125        // to the column's declared scale. Required for prepared
1126        // binds: `value_to_literal` flattens a Value::Numeric
1127        // into a TEXT literal because Literal carries no native
1128        // Numeric variant, so the placeholder substitution path
1129        // reaches coerce_value as Text → Numeric. Without this
1130        // arm the round-trip surfaces a TypeMismatch even though
1131        // the cell already left the engine as a valid Numeric.
1132        (Value::Text(s), DataType::Numeric { precision, scale }) => {
1133            let Some((mantissa, src_scale)) = parse_numeric_text(&s) else {
1134                return Err(EngineError::Eval(EvalError::TypeMismatch {
1135                    detail: alloc::format!("cannot parse {s:?} as NUMERIC for column `{col_name}`"),
1136                }));
1137            };
1138            Some(numeric_rescale(
1139                mantissa, src_scale, precision, scale, col_name,
1140            )?)
1141        }
1142        // Text → DATE / TIMESTAMP: parse canonical text forms.
1143        (Value::Text(s), DataType::Date) => {
1144            let d = eval::parse_date_literal(&s).ok_or_else(|| {
1145                EngineError::Eval(EvalError::TypeMismatch {
1146                    detail: alloc::format!("cannot parse {s:?} as DATE for column `{col_name}`"),
1147                })
1148            })?;
1149            Some(Value::Date(d))
1150        }
1151        // v7.14.0 — MySQL DEFAULT clauses quote integer / float
1152        // / boolean literals (`DEFAULT '0'`, `DEFAULT '1'`,
1153        // `DEFAULT '3.14'`, `DEFAULT 'true'`). Coerce the text
1154        // form to the column's numeric / bool type at DEFAULT-
1155        // installation time so the storage check sees a typed
1156        // value. Parse failures fall through to TypeMismatch.
1157        (Value::Text(s), DataType::SmallInt) => s.parse::<i16>().ok().map(Value::SmallInt),
1158        (Value::Text(s), DataType::Int) => s.parse::<i32>().ok().map(Value::Int),
1159        (Value::Text(s), DataType::BigInt) => s.parse::<i64>().ok().map(Value::BigInt),
1160        (Value::Text(s), DataType::Float) => s.parse::<f64>().ok().map(Value::Float),
1161        (Value::Text(s), DataType::Bool) => match s.to_ascii_lowercase().as_str() {
1162            "0" | "false" | "f" | "no" | "off" => Some(Value::Bool(false)),
1163            "1" | "true" | "t" | "yes" | "on" => Some(Value::Bool(true)),
1164            _ => None,
1165        },
1166        // v7.17.0 Phase 3.P0-46 — MySQL TINYINT(1) (which Phase 4.3
1167        // classifies as DataType::Bool) is the storage shape every
1168        // mysqldump-restored boolean column lands in. mysqldump emits
1169        // the values as integer `0` / `1` literals, so int → bool
1170        // coerce on INSERT is required for a 0-change cutover. MySQL's
1171        // rule is "any non-zero is truthy"; we follow that for all
1172        // signed int widths so the same coerce path serves an
1173        // explicit `BOOLEAN` column too.
1174        (Value::Int(n), DataType::Bool) => Some(Value::Bool(n != 0)),
1175        (Value::SmallInt(n), DataType::Bool) => Some(Value::Bool(n != 0)),
1176        (Value::BigInt(n), DataType::Bool) => Some(Value::Bool(n != 0)),
1177        // v4.9: Text ↔ JSON coercion. No structural validation —
1178        // any text literal is accepted; the responsibility for
1179        // valid JSON lies with the producer.
1180        (Value::Text(s), DataType::Json | DataType::Jsonb) => Some(Value::Json(s)),
1181        (Value::Json(s), DataType::Text) => Some(Value::Text(s)),
1182        // v7.13.3 — mailrs round-7 S10. SPG's storage represents
1183        // both JSON and JSONB on-disk as `Value::Json(String)` —
1184        // they share the underlying text payload. The cast
1185        // `'<text>'::jsonb` produces a Value::Json that needs to
1186        // satisfy a DataType::Jsonb column. Identity coerce in
1187        // both directions so JSON ↔ JSONB assignments work at all
1188        // INSERT / ALTER COLUMN TYPE / DEFAULT contexts.
1189        (Value::Json(s), DataType::Jsonb | DataType::Json) => Some(Value::Json(s)),
1190        // v7.10.4 — Text → BYTEA. Decode PG-style literal forms:
1191        //   - Hex:    `\x48656c6c6f`  (case-insensitive hex pairs)
1192        //   - Escape: `Hello\\000world`  (backslash + octal triples)
1193        //   - Plain:  any string → raw UTF-8 bytes (PG also accepts)
1194        // Errors surface as TypeMismatch so the operator gets a
1195        // clear "this literal isn't a bytea literal" hint.
1196        (Value::Text(s), DataType::Bytes) => {
1197            let bytes = decode_bytea_literal(&s).map_err(|e| {
1198                EngineError::Eval(EvalError::TypeMismatch {
1199                    detail: alloc::format!(
1200                        "cannot parse {s:?} as BYTEA for column `{col_name}`: {e}"
1201                    ),
1202                })
1203            })?;
1204            Some(Value::Bytes(bytes))
1205        }
1206        // v7.10.4 — BYTEA → Text round-trip uses the PG hex
1207        // output (lowercase, `\x` prefix). Important when a
1208        // SELECT pulls a bytea cell through a Text column path.
1209        (Value::Bytes(b), DataType::Text) => Some(Value::Text(encode_bytea_hex(&b))),
1210        // v7.17.0 — Text → UUID. PG accepts canonical hyphenated,
1211        // unhyphenated, uppercase, and `{...}`-braced forms; we
1212        // funnel all four through `spg_storage::parse_uuid_str`.
1213        // A malformed literal surfaces as a SQL TypeMismatch
1214        // rather than silently inserting garbage — `0-change
1215        // cutover` requires that an app inserting bad UUID text
1216        // sees the same hard error PG would raise.
1217        (Value::Text(s), DataType::Uuid) => match spg_storage::parse_uuid_str(&s) {
1218            Some(b) => Some(Value::Uuid(b)),
1219            None => {
1220                return Err(EngineError::Eval(EvalError::TypeMismatch {
1221                    detail: alloc::format!(
1222                        "invalid input syntax for type uuid: {s:?} (column `{col_name}`)"
1223                    ),
1224                }));
1225            }
1226        },
1227        // v7.17.0 — UUID → Text canonical 8-4-4-4-12 lowercase.
1228        // Surfaces when a SELECT plucks a uuid cell through a
1229        // Text column path (e.g. INSERT INTO log SELECT id::text
1230        // FROM other_table).
1231        (Value::Uuid(b), DataType::Text) => Some(Value::Text(spg_storage::format_uuid(&b))),
1232        // v7.17.0 Phase 3.P0-32 — Text → TIME. Accepts
1233        // `HH:MM:SS` and `HH:MM:SS.ffffff` (1-6 fractional digits).
1234        // Out-of-range hour/min/sec is a hard SQL error (no
1235        // silent truncation — same 0-change-cutover discipline
1236        // we apply to UUID).
1237        (Value::Text(s), DataType::Time) => match parse_time_str(&s) {
1238            Some(us) => Some(Value::Time(us)),
1239            None => {
1240                return Err(EngineError::Eval(EvalError::TypeMismatch {
1241                    detail: alloc::format!(
1242                        "invalid input syntax for type time: {s:?} (column `{col_name}`)"
1243                    ),
1244                }));
1245            }
1246        },
1247        // v7.17.0 Phase 3.P0-32 — TIME → Text canonical `HH:MM:SS[.ffffff]`.
1248        (Value::Time(us), DataType::Text) => Some(Value::Text(eval::format_time(us))),
1249        // v7.17.0 Phase 3.P0-33 — int / bigint → YEAR. Range
1250        // check enforces the MySQL canonical 1901..=2155 + 0
1251        // sentinel; out-of-range is a hard SQL error (no silent
1252        // truncation, mirrors P0-32 / P0-25 discipline).
1253        (Value::SmallInt(n), DataType::Year) => Some(coerce_int_to_year(i64::from(n), col_name)?),
1254        (Value::Int(n), DataType::Year) => Some(coerce_int_to_year(i64::from(n), col_name)?),
1255        (Value::BigInt(n), DataType::Year) => Some(coerce_int_to_year(n, col_name)?),
1256        // Text → YEAR. Accepts the 4-digit decimal form only;
1257        // two-digit YEAR (`'99'` → 1999) was deprecated in MySQL
1258        // 5.7 and is out of scope for v7.17.0.
1259        (Value::Text(s), DataType::Year) => match s.trim().parse::<i64>() {
1260            Ok(n) => Some(coerce_int_to_year(n, col_name)?),
1261            Err(_) => {
1262                return Err(EngineError::Eval(EvalError::TypeMismatch {
1263                    detail: alloc::format!(
1264                        "invalid input syntax for type year: {s:?} (column `{col_name}`)"
1265                    ),
1266                }));
1267            }
1268        },
1269        // YEAR → Text 4-digit zero-padded.
1270        (Value::Year(y), DataType::Text) => Some(Value::Text(alloc::format!("{y:04}"))),
1271        // v7.17.0 Phase 3.P0-34 — Text → TIMETZ. Mandatory
1272        // signed offset suffix; missing offset is a hard error
1273        // (SPG has no session TZ wired into eval, unlike PG).
1274        (Value::Text(s), DataType::TimeTz) => match parse_timetz_str(&s) {
1275            Some((us, offset_secs)) => Some(Value::TimeTz { us, offset_secs }),
1276            None => {
1277                return Err(EngineError::Eval(EvalError::TypeMismatch {
1278                    detail: alloc::format!(
1279                        "invalid input syntax for type time with time zone: \
1280                         {s:?} (column `{col_name}`)"
1281                    ),
1282                }));
1283            }
1284        },
1285        // TIMETZ → Text canonical `HH:MM:SS[.ffffff]±HH[:MM]`.
1286        (Value::TimeTz { us, offset_secs }, DataType::Text) => {
1287            Some(Value::Text(eval::format_timetz(us, offset_secs)))
1288        }
1289        // v7.17.0 Phase 3.P0-35 — Text → MONEY. Accepts `$N.NN`,
1290        // `$N,NNN.NN`, optional leading `-`. Bare numeric literals
1291        // arrive via the Int/BigInt/Float/Numeric arms below.
1292        (Value::Text(s), DataType::Money) => match parse_money_str(&s) {
1293            Some(c) => Some(Value::Money(c)),
1294            None => {
1295                return Err(EngineError::Eval(EvalError::TypeMismatch {
1296                    detail: alloc::format!(
1297                        "invalid input syntax for type money: {s:?} (column `{col_name}`)"
1298                    ),
1299                }));
1300            }
1301        },
1302        // Int / BigInt / SmallInt / Float / Numeric → MONEY.
1303        // Bare numeric literal is interpreted as a major-unit
1304        // amount (matches PG: `100`::money → $100.00 = 10000 cents).
1305        (Value::SmallInt(n), DataType::Money) => {
1306            Some(Value::Money(i64::from(n).saturating_mul(100)))
1307        }
1308        (Value::Int(n), DataType::Money) => Some(Value::Money(i64::from(n).saturating_mul(100))),
1309        (Value::BigInt(n), DataType::Money) => Some(Value::Money(n.saturating_mul(100))),
1310        (Value::Float(x), DataType::Money) => {
1311            // Round half-away-from-zero to cents (no_std — no
1312            // `f64::round`, so hand-roll via biased truncation).
1313            let scaled = x * 100.0;
1314            let cents = if scaled >= 0.0 {
1315                (scaled + 0.5) as i64
1316            } else {
1317                (scaled - 0.5) as i64
1318            };
1319            Some(Value::Money(cents))
1320        }
1321        (Value::Numeric { scaled, scale }, DataType::Money) => {
1322            // Convert exact decimal to cents (scale 2). If scale > 2,
1323            // round half-away-from-zero. If scale < 2, multiply up.
1324            let cents = if scale == 2 {
1325                scaled
1326            } else if scale < 2 {
1327                let mult = 10_i128.pow(u32::from(2 - scale));
1328                scaled.saturating_mul(mult)
1329            } else {
1330                let div = 10_i128.pow(u32::from(scale - 2));
1331                let half = div / 2;
1332                let bias = if scaled >= 0 { half } else { -half };
1333                (scaled + bias) / div
1334            };
1335            Some(Value::Money(i64::try_from(cents).unwrap_or(i64::MAX)))
1336        }
1337        // MONEY → Text canonical `$N,NNN.CC`.
1338        (Value::Money(c), DataType::Text) => Some(Value::Text(eval::format_money(c))),
1339        // v7.17.0 Phase 3.P0-38 — Text → Range. Accepts canonical
1340        // PG forms: `'empty'`, `'[a,b)'`, `'(a,b]'`, `'[a,b]'`,
1341        // `'(a,b)'`, with empty lower or upper for unbounded.
1342        (Value::Text(s), DataType::Range(kind)) => match parse_range_str(&s, kind) {
1343            Some(v) => Some(v),
1344            None => {
1345                return Err(EngineError::Eval(EvalError::TypeMismatch {
1346                    detail: alloc::format!(
1347                        "invalid input syntax for range type: {s:?} (column `{col_name}`)"
1348                    ),
1349                }));
1350            }
1351        },
1352        // Range → Text canonical form (`[a,b)`, `'empty'`, etc).
1353        (v @ Value::Range { .. }, DataType::Text) => Some(Value::Text(format_range_str(&v))),
1354        // v7.17.0 Phase 3.P0-39 — Text → Hstore.
1355        (Value::Text(s), DataType::Hstore) => match parse_hstore_str(&s) {
1356            Some(pairs) => Some(Value::Hstore(pairs)),
1357            None => {
1358                return Err(EngineError::Eval(EvalError::TypeMismatch {
1359                    detail: alloc::format!(
1360                        "invalid input syntax for type hstore: {s:?} (column `{col_name}`)"
1361                    ),
1362                }));
1363            }
1364        },
1365        // Hstore → Text canonical `"k"=>"v"` form.
1366        (Value::Hstore(pairs), DataType::Text) => Some(Value::Text(format_hstore_str(&pairs))),
1367        // v7.17.0 Phase 3.P0-40 — Text → 2D arrays via PG
1368        // external `'{{a,b},{c,d}}'` literal.
1369        (Value::Text(s), DataType::IntArray2D) => match parse_int_2d_literal(&s) {
1370            Ok(m) => Some(Value::IntArray2D(m)),
1371            Err(e) => {
1372                return Err(EngineError::Eval(EvalError::TypeMismatch {
1373                    detail: alloc::format!(
1374                        "invalid input syntax for INT[][]: {s:?} (column `{col_name}`): {e}"
1375                    ),
1376                }));
1377            }
1378        },
1379        (Value::Text(s), DataType::BigIntArray2D) => match parse_bigint_2d_literal(&s) {
1380            Ok(m) => Some(Value::BigIntArray2D(m)),
1381            Err(e) => {
1382                return Err(EngineError::Eval(EvalError::TypeMismatch {
1383                    detail: alloc::format!(
1384                        "invalid input syntax for BIGINT[][]: {s:?} (column `{col_name}`): {e}"
1385                    ),
1386                }));
1387            }
1388        },
1389        (Value::Text(s), DataType::TextArray2D) => match parse_text_2d_literal(&s) {
1390            Ok(m) => Some(Value::TextArray2D(m)),
1391            Err(e) => {
1392                return Err(EngineError::Eval(EvalError::TypeMismatch {
1393                    detail: alloc::format!(
1394                        "invalid input syntax for TEXT[][]: {s:?} (column `{col_name}`): {e}"
1395                    ),
1396                }));
1397            }
1398        },
1399        // 2D arrays → Text canonical nested form.
1400        (Value::IntArray2D(rows), DataType::Text) => Some(Value::Text(format_int_2d_text(&rows))),
1401        (Value::BigIntArray2D(rows), DataType::Text) => {
1402            Some(Value::Text(format_bigint_2d_text(&rows)))
1403        }
1404        (Value::TextArray2D(rows), DataType::Text) => Some(Value::Text(format_text_2d_text(&rows))),
1405        // v7.10.11 — Text → TEXT[]. Decode PG's external array
1406        // form `'{a,b,NULL}'`. NULL element token (case-insensitive)
1407        // is the literal `NULL`; everything else is a quoted or
1408        // unquoted text element. mailrs `'{label1,label2}'::TEXT[]`.
1409        (Value::Text(s), DataType::TextArray) => {
1410            let arr = decode_text_array_literal(&s).map_err(|e| {
1411                EngineError::Eval(EvalError::TypeMismatch {
1412                    detail: alloc::format!(
1413                        "cannot parse {s:?} as TEXT[] for column `{col_name}`: {e}"
1414                    ),
1415                })
1416            })?;
1417            Some(Value::TextArray(arr))
1418        }
1419        // v7.16.0 — Text → IntArray / BigIntArray for the
1420        // spg-sqlx Bind path. Decode the PG external form
1421        // `{1,2,3}` as a TEXT array first, then parse each
1422        // element as int. Same shape as the TextArray decode
1423        // above with an element-wise narrow.
1424        (Value::Text(s), DataType::IntArray) => {
1425            let arr = decode_text_array_literal(&s).map_err(|e| {
1426                EngineError::Eval(EvalError::TypeMismatch {
1427                    detail: alloc::format!(
1428                        "cannot parse {s:?} as INT[] for column `{col_name}`: {e}"
1429                    ),
1430                })
1431            })?;
1432            let mut out: Vec<Option<i32>> = Vec::with_capacity(arr.len());
1433            for elem in arr {
1434                match elem {
1435                    None => out.push(None),
1436                    Some(t) => {
1437                        let n: i32 = t.parse().map_err(|_| {
1438                            EngineError::Eval(EvalError::TypeMismatch {
1439                                detail: alloc::format!(
1440                                    "cannot parse {t:?} as INT element for `{col_name}`"
1441                                ),
1442                            })
1443                        })?;
1444                        out.push(Some(n));
1445                    }
1446                }
1447            }
1448            Some(Value::IntArray(out))
1449        }
1450        (Value::Text(s), DataType::BigIntArray) => {
1451            let arr = decode_text_array_literal(&s).map_err(|e| {
1452                EngineError::Eval(EvalError::TypeMismatch {
1453                    detail: alloc::format!(
1454                        "cannot parse {s:?} as BIGINT[] for column `{col_name}`: {e}"
1455                    ),
1456                })
1457            })?;
1458            let mut out: Vec<Option<i64>> = Vec::with_capacity(arr.len());
1459            for elem in arr {
1460                match elem {
1461                    None => out.push(None),
1462                    Some(t) => {
1463                        let n: i64 = t.parse().map_err(|_| {
1464                            EngineError::Eval(EvalError::TypeMismatch {
1465                                detail: alloc::format!(
1466                                    "cannot parse {t:?} as BIGINT element for `{col_name}`"
1467                                ),
1468                            })
1469                        })?;
1470                        out.push(Some(n));
1471                    }
1472                }
1473            }
1474            Some(Value::BigIntArray(out))
1475        }
1476        // v7.10.11 — TEXT[] → Text round-trip uses PG's
1477        // external array form (`{a,b,NULL}`). Lets a SELECT
1478        // pull an array column through any Text-side codepath.
1479        (Value::TextArray(items), DataType::Text) => Some(Value::Text(encode_text_array(&items))),
1480        // v7.17.0 Phase 3.P0-68 — Text → VECTOR auto-coerce.
1481        // Matches the existing Text → TsVector arm and the
1482        // `::vector` cast: PG-canonical pgvector external form
1483        // (`'[1, 2, -3]'`) becomes a typed Vector value at the
1484        // column boundary. Dim mismatch surfaces as TypeMismatch.
1485        // For SQ8 / HALF encodings we chain through the standard
1486        // quantise helpers so the storage shape matches the
1487        // declared encoding without a second coerce pass.
1488        (Value::Text(s), DataType::Vector { dim, encoding }) => {
1489            let parsed = eval::parse_vector_text(&s).ok_or_else(|| {
1490                EngineError::Eval(EvalError::TypeMismatch {
1491                    detail: alloc::format!("cannot parse {s:?} as VECTOR for column `{col_name}`"),
1492                })
1493            })?;
1494            if parsed.len() != dim as usize {
1495                return Err(EngineError::Eval(EvalError::TypeMismatch {
1496                    detail: alloc::format!(
1497                        "VECTOR({dim}) column `{col_name}` rejects literal of length {}",
1498                        parsed.len()
1499                    ),
1500                }));
1501            }
1502            Some(match encoding {
1503                VecEncoding::F32 => Value::Vector(parsed),
1504                VecEncoding::Sq8 => Value::Sq8Vector(spg_storage::quantize::quantize(&parsed)),
1505                VecEncoding::F16 => {
1506                    Value::HalfVector(spg_storage::halfvec::HalfVector::from_f32_slice(&parsed))
1507                }
1508            })
1509        }
1510        // v7.16.1 — Text → TSVECTOR auto-coerce for the
1511        // INSERT-side wire path (mailrs round-9 A.2.a). PG
1512        // implicitly promotes the TEXT literal at INSERT into a
1513        // TSVECTOR column; SPG previously rejected with a hard
1514        // type mismatch, blocking 23,276 pg_dump rows into
1515        // `messages.search_vector`. We route through the same
1516        // `decode_tsvector_external` the `::tsvector` cast
1517        // already uses, so PG-canonical forms (`'word'`,
1518        // `'word:1A,2B'`, multi-lexeme, empty `''`) all parse.
1519        (Value::Text(s), DataType::TsVector) => {
1520            let lexs = eval::decode_tsvector_external(&s).map_err(|e| {
1521                EngineError::Eval(EvalError::TypeMismatch {
1522                    detail: alloc::format!(
1523                        "cannot parse {s:?} as TSVECTOR for column `{col_name}`: {e}"
1524                    ),
1525                })
1526            })?;
1527            Some(Value::TsVector(lexs))
1528        }
1529        (Value::Text(s), DataType::Timestamp | DataType::Timestamptz) => {
1530            let t = eval::parse_timestamp_literal(&s).ok_or_else(|| {
1531                EngineError::Eval(EvalError::TypeMismatch {
1532                    detail: alloc::format!(
1533                        "cannot parse {s:?} as TIMESTAMP for column `{col_name}`"
1534                    ),
1535                })
1536            })?;
1537            Some(Value::Timestamp(t))
1538        }
1539        // DATE ↔ TIMESTAMP convertibility (DATE → midnight,
1540        // TIMESTAMP → day truncation).
1541        (Value::Date(d), DataType::Timestamp | DataType::Timestamptz) => {
1542            Some(Value::Timestamp(i64::from(d) * 86_400_000_000))
1543        }
1544        // v7.9.21 — Value::Timestamp lands in either Timestamp
1545        // or Timestamptz columns; the on-disk layout is the
1546        // same i64 microseconds UTC.
1547        (Value::Timestamp(t), DataType::Timestamptz) => Some(Value::Timestamp(t)),
1548        (Value::Timestamp(t), DataType::Date) => {
1549            let days = t.div_euclid(86_400_000_000);
1550            i32::try_from(days).ok().map(Value::Date)
1551        }
1552        (
1553            Value::Numeric {
1554                scaled,
1555                scale: src_scale,
1556            },
1557            DataType::Numeric { precision, scale },
1558        ) => Some(numeric_rescale(
1559            scaled, src_scale, precision, scale, col_name,
1560        )?),
1561        #[allow(clippy::cast_precision_loss)]
1562        (Value::Numeric { scaled, scale }, DataType::Float) => {
1563            let mut div = 1.0_f64;
1564            for _ in 0..scale {
1565                div *= 10.0;
1566            }
1567            Some(Value::Float((scaled as f64) / div))
1568        }
1569        (Value::Numeric { scaled, scale }, DataType::Int) => {
1570            let truncated = numeric_truncate_to_integer(scaled, scale);
1571            i32::try_from(truncated).ok().map(Value::Int)
1572        }
1573        (Value::Numeric { scaled, scale }, DataType::BigInt) => {
1574            let truncated = numeric_truncate_to_integer(scaled, scale);
1575            i64::try_from(truncated).ok().map(Value::BigInt)
1576        }
1577        (Value::Numeric { scaled, scale }, DataType::SmallInt) => {
1578            let truncated = numeric_truncate_to_integer(scaled, scale);
1579            i16::try_from(truncated).ok().map(Value::SmallInt)
1580        }
1581        // VARCHAR(n) enforces an upper bound on character count.
1582        (Value::Text(s), DataType::Varchar(max)) => {
1583            if u32::try_from(s.chars().count()).unwrap_or(u32::MAX) <= max {
1584                Some(Value::Text(s))
1585            } else {
1586                return Err(EngineError::Unsupported(alloc::format!(
1587                    "value for VARCHAR({max}) column `{col_name}` exceeds length: \
1588                     {} chars",
1589                    s.chars().count()
1590                )));
1591            }
1592        }
1593        // v6.0.1: f32 → SQ8 INSERT-time quantisation. Triggered
1594        // when the column declares `VECTOR(N) USING SQ8` and
1595        // the INSERT VALUES expression yields a raw f32 vector
1596        // (the normal pgvector-shape literal). Dim mismatch
1597        // falls through the `_ => None` arm and surfaces as
1598        // `TypeMismatch` with the expected SQ8 column type —
1599        // matching the F32 path's existing error.
1600        (
1601            Value::Vector(v),
1602            DataType::Vector {
1603                dim,
1604                encoding: VecEncoding::Sq8,
1605            },
1606        ) if v.len() == dim as usize => Some(Value::Sq8Vector(spg_storage::quantize::quantize(&v))),
1607        // v6.0.3: f32 → f16 INSERT-time conversion for HALF
1608        // columns. Bit-exact at the storage layer (modulo
1609        // half-precision rounding); no rerank pass needed at
1610        // search time.
1611        (
1612            Value::Vector(v),
1613            DataType::Vector {
1614                dim,
1615                encoding: VecEncoding::F16,
1616            },
1617        ) if v.len() == dim as usize => Some(Value::HalfVector(
1618            spg_storage::halfvec::HalfVector::from_f32_slice(&v),
1619        )),
1620        // CHAR(n) right-pads with U+0020 to exactly n chars; if the input
1621        // is already longer we reject (PG truncates trailing-space-only;
1622        // staying strict for v1).
1623        (Value::Text(s), DataType::Char(size)) => {
1624            let len = u32::try_from(s.chars().count()).unwrap_or(u32::MAX);
1625            if len > size {
1626                return Err(EngineError::Unsupported(alloc::format!(
1627                    "value for CHAR({size}) column `{col_name}` exceeds length: \
1628                     {len} chars"
1629                )));
1630            }
1631            let need = (size - len) as usize;
1632            let mut padded = s;
1633            padded.reserve(need);
1634            for _ in 0..need {
1635                padded.push(' ');
1636            }
1637            Some(Value::Text(padded))
1638        }
1639        _ => None,
1640    };
1641    coerced.ok_or(EngineError::Storage(StorageError::TypeMismatch {
1642        column: col_name.into(),
1643        expected,
1644        actual,
1645        position,
1646    }))
1647}