Skip to main content

spg_storage/
lib.rs

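//! In-memory storage primitives.
//!
//! The crate started out (v0.3) intentionally simple: a flat catalog of
//! tables, each holding rows as `Vec<Value>` (positional, matching the
//! table's `TableSchema`), with no MVCC and no on-disk format. Later
//! milestones have since added an on-disk row codec and a segment format;
//! both are re-exported from this file (see the `codec` and `segment`
//! re-exports below).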
6#![no_std]
7// v3.3.2 NEON path for l2_distance_sq (aarch64 only). Scoped allow:
8// `unsafe_code = "deny"` at workspace level stays in force for every
9// other crate.
10#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15mod codec;
16pub mod fts_simple;
17pub mod halfvec;
18mod nsw;
19pub mod persistent;
20pub mod persistent_btree;
21pub mod quantize;
22pub mod row_locator;
23pub mod segment;
24mod table;
25pub mod trgm;
26
27pub use self::bloom::{BloomError, BloomFilter};
28// v7.31 monster tier-3 cut 3 — on-disk codec moved to `codec`; the
29// public dense-row surface keeps its `spg_storage::*` paths, and the
30// low-level write/read primitives stay crate-visible for the
31// `Catalog::serialize`/`deserialize` methods that remain in this file.
32pub(crate) use self::codec::*;
33pub use self::codec::{decode_row_body_dense, encode_row_body_dense, row_body_encoded_len};
34// v7.31 monster tier-3 cut 2 — HNSW algorithms moved to `nsw`; the
35// public vector-search surface keeps its `spg_storage::*` paths via
36// these re-exports, and `nsw_insert_at` stays crate-visible for the
37// `Table` insert paths in the `table` module.
38pub(crate) use self::nsw::nsw_insert_at;
39pub use self::nsw::{NswMetric, cosine_dot_norms_f32, inner_product_f32, nsw_index_on, nsw_query};
40pub use self::row_locator::{RowLocator, RowLocatorError};
41pub use self::segment::{
42    BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
43    SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
44    SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
45    wrap_v2_envelope_with_brin,
46};
47
48use alloc::boxed::Box;
49use alloc::collections::{BTreeMap, BTreeSet};
50use alloc::format;
51use alloc::string::{String, ToString};
52use alloc::sync::Arc;
53use alloc::vec::Vec;
54use core::fmt;
55
56use self::persistent::PersistentVec;
57use self::persistent_btree::PersistentBTreeMap;
58
/// In-cell encoding for `DataType::Vector`. Mirrors
/// `spg_sql::ast::VecEncoding` — kept here so storage stays
/// dep-free of `spg-sql`. The engine bridges between the two
/// at DDL-execution time.
///
/// `Display` writes `F32`, `SQ8` or `HALF` respectively.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Pre-v6 default: each cell holds a raw `Vec<f32>`.
    #[default]
    F32,
    /// v6.0.1: each cell stores `Sq8Vector { min, max, bytes: Vec<u8> }`;
    /// 4× compression vs `F32` with recall@10 ≥ 0.95 on natural
    /// embeddings (Gaussian / unit-sphere corpora).
    Sq8,
    /// v6.0.3 (DDL keyword `HALF`): each element is stored as
    /// IEEE-754 binary16; 2× compression and bit-exact dequantise.
    F16,
}

impl fmt::Display for VecEncoding {
    /// Writes the upper-case encoding name; note that `F16` renders as
    /// `HALF`, not `F16`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        })
    }
}
87
88/// Runtime type tags. `Vector { dim, encoding }` / `Varchar(max)` /
89/// `Char(size)` are parameterised; the parameter travels with both
90/// the column schema and the on-wire serialised representation.
91#[derive(Debug, Clone, Copy, PartialEq, Eq)]
92pub enum DataType {
93    /// 16-bit signed. Backed by `Value::SmallInt(i16)`; arithmetic that
94    /// would overflow surfaces as a type error at INSERT time.
95    SmallInt,
96    Int,    // 32-bit signed
97    BigInt, // 64-bit signed
98    Float,  // f64 (PG double precision)
99    Text,
100    /// `VARCHAR(n)` — same byte representation as `Text`, but INSERT
101    /// rejects values longer than `n` Unicode characters.
102    Varchar(u32),
103    /// `CHAR(n)` — same representation as `Text`, but INSERT right-pads
104    /// with U+0020 to exactly `n` Unicode characters (or rejects when
105    /// the input is already longer).
106    Char(u32),
107    Bool,
108    /// pgvector-style fixed-dimension vector. `encoding` selects
109    /// the in-cell representation (`F32` = pre-v6 raw f32 buffer;
110    /// `Sq8` = v6.0.1 8-bit scalar-quantised). The DDL grammar
111    /// surfaces encoding via the optional `USING <encoding>`
112    /// clause: `VECTOR(128) USING SQ8`.
113    Vector {
114        dim: u32,
115        encoding: VecEncoding,
116    },
117    /// `NUMERIC(precision, scale)` — exact fixed-point decimal stored as
118    /// a scaled `i128`. `precision` caps total decimal digits, `scale`
119    /// fixes digits after the decimal point. v1.12 supports up to
120    /// precision 38 (the i128-safe ceiling). `NUMERIC` and `NUMERIC(p)`
121    /// surface as `Numeric { precision: p, scale: 0 }`.
122    Numeric {
123        precision: u8,
124        scale: u8,
125    },
126    /// `DATE` — calendar date with day precision, stored as `i32` days
127    /// since the Unix epoch (1970-01-01).
128    Date,
129    /// `TIMESTAMP` (a.k.a. `MySQL` `DATETIME`) — instant with microsecond
130    /// precision, stored as `i64` microseconds since the Unix epoch.
131    Timestamp,
132    /// v7.9.2 `TIMESTAMPTZ` — bit-identical to `Timestamp` on disk
133    /// (i64 microseconds, UTC by convention). Carried as a distinct
134    /// type tag so the PG-wire layer can advertise OID 1184 (PG's
135    /// `timestamp with time zone`) and `sqlx`/`pgx`/JDBC clients
136    /// decode into their TZ-aware datetime types. The internal
137    /// semantics are unchanged: SPG never stored per-row offsets,
138    /// and neither did PG — `TIMESTAMPTZ` in PG is also UTC i64.
139    Timestamptz,
140    /// `INTERVAL` — calendar-aware span (months + microseconds). v2.11
141    /// supports INTERVAL only as a runtime intermediate (literals,
142    /// arithmetic results); on-disk encoding is rejected so this branch
143    /// can't appear in a `ColumnSchema`.
144    Interval,
145    /// v4.9: `JSON` — text-backed JSON document. We don't parse
146    /// the content (no path operators or jsonb functions yet) —
147    /// the column accepts any TEXT-compatible value and round-trips
148    /// it verbatim. PG OID 114 on the wire.
149    Json,
150    /// v7.9.0: `JSONB` — semantically identical to `Json` on
151    /// the storage side (same `Value::Json` cells, same
152    /// row codec), but advertised as PG OID 3802 on the wire
153    /// so `sqlx`-style clients that bind `jsonb` columns
154    /// decode correctly. mailrs migration blocker #3.
155    Jsonb,
156    /// v7.10.4: `BYTES` / `BYTEA` — variable-length raw binary.
157    /// Backed by `Value::Bytes(Vec<u8>)`. PG wire OID 17. Literal
158    /// forms accepted by parser/engine: PG hex form `'\xDEADBEEF'`
159    /// (case-insensitive hex pairs) and escape form
160    /// `'foo\\000bar'` (the latter decoded at coercion time when
161    /// the target column is BYTEA — TEXT columns leave the
162    /// backslash sequence verbatim).
163    Bytes,
164    /// v7.10.9: `TEXT[]` — single-dimension TEXT array. Elements
165    /// may be NULL (PG semantics). PG wire OID 1009. Literal
166    /// forms: `ARRAY['a', 'b', NULL]` and the PG external form
167    /// `'{a,b,NULL}'::TEXT[]`. Engine implements `= ANY(arr)`,
168    /// `<> ALL(arr)`, and 1-based indexing `arr[i]`. Catalog
169    /// FILE_VERSION 18+; older snapshots reject this DataType
170    /// (forward-only by design — TEXT[] columns aren't readable
171    /// on a pre-v7.10 binary).
172    TextArray,
173    /// v7.11.12: `INT[]` — single-dimension i32 array. PG wire
174    /// OID 1007 (_int4). Same `ARRAY[...]` / `'{1,2,3}'::INT[]`
175    /// literal surface as TEXT[]. Catalog FILE_VERSION 19+.
176    IntArray,
177    /// v7.11.12: `BIGINT[]` — single-dimension i64 array. PG
178    /// wire OID 1016 (_int8). Catalog FILE_VERSION 19+.
179    BigIntArray,
180    /// v7.12.0: PG `tsvector` — ordered, deduplicated set of
181    /// `(lexeme, positions, weight)` tuples. PG wire OID 3614.
182    /// Catalog FILE_VERSION 20+. Storage shape is row-codec
183    /// tag 22; the schema-agnostic `write_value` path emits tag
184    /// 18. Literal: `'foo:1 bar:2,3'::tsvector` (PG external
185    /// form). G-CRIT-3 entry — v7.12.0 only ships the type +
186    /// codec; matching `@@` lands in v7.12.2.
187    TsVector,
188    /// v7.12.0: PG `tsquery` — parse tree of lexemes joined by
189    /// `&` `|` `!` and phrase operators. PG wire OID 3615.
190    /// Catalog FILE_VERSION 20+.
191    TsQuery,
192    /// v7.17.0: PG `uuid` — 128-bit identifier stored as
193    /// `Value::Uuid([u8; 16])`. PG wire OID 2950. Canonical
194    /// text form is lowercase 8-4-4-4-12 hyphenated; input
195    /// also accepts uppercase, unhyphenated, and brace-wrapped
196    /// forms (`{xxxx…}`). Catalog FILE_VERSION 36+; tag 24 on
197    /// the dense type-tag side, tag 20 on the schema-agnostic
198    /// value side. The drop-in PG/MySQL surface for Django /
199    /// Rails / Hibernate "id UUID PRIMARY KEY DEFAULT
200    /// gen_random_uuid()" default-PK pattern.
201    Uuid,
202    /// v7.17.0 Phase 3.P0-32: PG `time` (without time zone) — i64
203    /// microseconds since 00:00:00. PG wire OID 1083. Display:
204    /// canonical zero-padded `HH:MM:SS` when fractional is zero,
205    /// `HH:MM:SS.ffffff` otherwise. Catalog FILE_VERSION 37+;
206    /// tag 25 on the dense type-tag side, tag 21 on the schema-
207    /// agnostic value side. The wall-clock-of-day half of PG's
208    /// date/time triplet (date / time / timestamp).
209    Time,
210    /// v7.17.0 Phase 3.P0-33: MySQL `YEAR` — u16 in range
211    /// 1901..=2155 plus the special zero-year sentinel 0. No
212    /// dedicated PG OID (advertised as INT4 / OID 23 on the wire
213    /// — psql renders integers, MySQL CLI renders 4-digit
214    /// zero-padded text). Display always 4 digits: `0000` for the
215    /// zero-year, `1985` / `2007` / etc otherwise. Catalog
216    /// FILE_VERSION 38+; tag 26 on the dense type-tag side, tag
217    /// 22 on the schema-agnostic value side.
218    Year,
219    /// v7.17.0 Phase 3.P0-34: PG `time with time zone` (TIMETZ) —
220    /// i64 microseconds since 00:00:00 in the local wall clock
221    /// PLUS i32 offset-from-UTC in seconds. PG wire OID 1266.
222    /// Display: `HH:MM:SS[.ffffff]±HH[:MM]` (PG `timetz_out`).
223    /// Range: offset in ±50400 seconds (±14 hours). Catalog
224    /// FILE_VERSION 39+; tag 27 on the dense type-tag side, tag
225    /// 23 on the schema-agnostic value side.
226    TimeTz,
227    /// v7.17.0 Phase 3.P0-35: PG `money` — i64 cents (locale-
228    /// independent storage). PG wire OID 790. Display: en_US
229    /// locale (`$N,NNN.CC`, negative → `-$1.23`). Input accepts
230    /// `$N.NN`, `$N,NNN.NN`, bare integer (treated as major
231    /// units), optional leading `-`. Range: full i64. Catalog
232    /// FILE_VERSION 40+; tag 28 on the dense type-tag side, tag
233    /// 24 on the schema-agnostic value side.
234    Money,
235    /// v7.17.0 Phase 3.P0-38: PG range type. The same DataType
236    /// variant covers all six builtin ranges (int4range,
237    /// int8range, numrange, tsrange, tstzrange, daterange) —
238    /// `RangeKind` pins the element type so encode / decode /
239    /// display can route off one switch. Catalog FILE_VERSION
240    /// 43+; tag 29 + a 1-byte RangeKind on the dense type-tag
241    /// side, tag 25 on the schema-agnostic value side.
242    Range(RangeKind),
243    /// v7.17.0 Phase 3.P0-39: PG `hstore` extension type — flat
244    /// `text => text` map with NULL value support. Catalog
245    /// FILE_VERSION 44+; tag 30 on the dense type-tag side, tag
246    /// 26 on the schema-agnostic value side. The contrib OID is
247    /// installation-dependent in real PG; SPG advertises it via
248    /// dynamic lookup, falling back to TEXT (OID 25) on the wire
249    /// when the installed `hstore` extension hasn't claimed an
250    /// OID yet.
251    Hstore,
252    /// v7.17.0 Phase 3.P0-40: PG `int[][]` — 2-dimensional INT
253    /// matrix. Storage: row-major Vec<Vec<Option<i32>>>. All
254    /// rows must share the same column count. Wire OID 1007
255    /// (same as INT[]; the dimension count travels in the data
256    /// header, not the OID). Catalog FILE_VERSION 45+; tag 31
257    /// on the dense type-tag side, tag 27 on the schema-agnostic
258    /// value side.
259    IntArray2D,
260    /// v7.17.0 Phase 3.P0-40: PG `bigint[][]` — 2-dimensional
261    /// BIGINT matrix. Storage / OID / tags mirror IntArray2D.
262    /// Tag 32 dense, tag 28 schema-agnostic.
263    BigIntArray2D,
264    /// v7.17.0 Phase 3.P0-40: PG `text[][]` — 2-dimensional TEXT
265    /// matrix. Storage: row-major Vec<Vec<Option<String>>>.
266    /// Tag 33 dense, tag 29 schema-agnostic.
267    TextArray2D,
268}
269
/// v7.17.0 Phase 3.P0-38 — pins the element type of a range value
/// or column. Wire OIDs: Int4=3904, Int8=3926, Num=3906,
/// Ts=3908, TsTz=3910, Date=3912.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum RangeKind {
    Int4,
    Int8,
    Num,
    Ts,
    TsTz,
    Date,
}

impl RangeKind {
    /// Single-byte tag for this kind (`Int4` = 0 … `Date` = 5).
    /// Inverse of [`RangeKind::from_tag`].
    pub const fn tag(self) -> u8 {
        match self {
            Self::Int4 => 0,
            Self::Int8 => 1,
            Self::Num => 2,
            Self::Ts => 3,
            Self::TsTz => 4,
            Self::Date => 5,
        }
    }

    /// Decodes a tag produced by [`RangeKind::tag`]. Returns `None`
    /// for any byte outside `0..=5`.
    pub const fn from_tag(t: u8) -> Option<Self> {
        match t {
            0 => Some(Self::Int4),
            1 => Some(Self::Int8),
            2 => Some(Self::Num),
            3 => Some(Self::Ts),
            4 => Some(Self::TsTz),
            5 => Some(Self::Date),
            _ => None,
        }
    }

    /// Upper-case SQL type keyword for this range kind
    /// (e.g. `INT4RANGE`, `TSTZRANGE`).
    pub const fn keyword(self) -> &'static str {
        match self {
            Self::Int4 => "INT4RANGE",
            Self::Int8 => "INT8RANGE",
            Self::Num => "NUMRANGE",
            Self::Ts => "TSRANGE",
            Self::TsTz => "TSTZRANGE",
            Self::Date => "DATERANGE",
        }
    }
}
316
317impl fmt::Display for DataType {
318    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
319        match self {
320            Self::SmallInt => f.write_str("SMALLINT"),
321            Self::Int => f.write_str("INT"),
322            Self::BigInt => f.write_str("BIGINT"),
323            Self::Float => f.write_str("FLOAT"),
324            Self::Text => f.write_str("TEXT"),
325            Self::Varchar(n) => write!(f, "VARCHAR({n})"),
326            Self::Char(n) => write!(f, "CHAR({n})"),
327            Self::Bool => f.write_str("BOOL"),
328            Self::Vector { dim, encoding } => match encoding {
329                VecEncoding::F32 => write!(f, "VECTOR({dim})"),
330                VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
331                VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
332            },
333            Self::Numeric { precision, scale } => {
334                if *scale == 0 {
335                    write!(f, "NUMERIC({precision})")
336                } else {
337                    write!(f, "NUMERIC({precision}, {scale})")
338                }
339            }
340            Self::Date => f.write_str("DATE"),
341            Self::Timestamp => f.write_str("TIMESTAMP"),
342            Self::Timestamptz => f.write_str("TIMESTAMPTZ"),
343            Self::Interval => f.write_str("INTERVAL"),
344            Self::Json => f.write_str("JSON"),
345            Self::Jsonb => f.write_str("JSONB"),
346            Self::Bytes => f.write_str("BYTEA"),
347            Self::TextArray => f.write_str("TEXT[]"),
348            Self::IntArray => f.write_str("INT[]"),
349            Self::BigIntArray => f.write_str("BIGINT[]"),
350            Self::TsVector => f.write_str("TSVECTOR"),
351            Self::TsQuery => f.write_str("TSQUERY"),
352            Self::Uuid => f.write_str("UUID"),
353            Self::Time => f.write_str("TIME"),
354            Self::Year => f.write_str("YEAR"),
355            Self::TimeTz => f.write_str("TIMETZ"),
356            Self::Money => f.write_str("MONEY"),
357            Self::Range(k) => f.write_str(k.keyword()),
358            Self::Hstore => f.write_str("HSTORE"),
359            Self::IntArray2D => f.write_str("INT[][]"),
360            Self::BigIntArray2D => f.write_str("BIGINT[][]"),
361            Self::TextArray2D => f.write_str("TEXT[][]"),
362        }
363    }
364}
365
/// v7.12.0 — one entry in a `Value::TsVector`.
///
/// v7.12.0 defaults every lexeme's weight to D; the v7.12.2 ranking
/// path consumes the weight.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TsLexeme {
    /// The (already-tokenised + stemmed in v7.12.1+) word.
    pub word: String,
    /// Strictly-ascending list of 1-based positions.
    pub positions: Vec<u16>,
    /// PG weight letter encoded as A=3, B=2, C=1, D=0.
    pub weight: u8,
}
377
/// v7.12.0 — parse tree for a PG `tsquery`. v7.12.0 ships the
/// type + codec only; the `to_tsquery` / `plainto_tsquery` lexer
/// lands in v7.12.1 and the `@@` evaluator in v7.12.2.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TsQueryAst {
    /// Single lexeme term. The `weight_mask` is the PG-style
    /// bitmask of accepted weights (`A=1<<3`, `B=1<<2`, `C=1<<1`,
    /// `D=1<<0`); `0` = any weight. v7.12.0 always sets it to 0.
    Term {
        word: String,
        weight_mask: u8,
    },
    /// Conjunction of two sub-queries (`&`).
    And(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Disjunction of two sub-queries (`|`).
    Or(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Negation of a sub-query (`!`).
    Not(Box<TsQueryAst>),
    /// `phrase <distance> phrase`. v7.12.0 only persists this; the
    /// match semantics arrive in v7.12.2 alongside `@@`.
    Phrase {
        left: Box<TsQueryAst>,
        right: Box<TsQueryAst>,
        distance: u16,
    },
}
401
402/// A row-cell value, including SQL `NULL`. `Float` uses `f64`; NaN compares
403/// non-equal to itself (PG behaviour) — `PartialEq` is derived so callers
404/// must opt into NaN-aware comparison if they need stronger guarantees.
405#[derive(Debug, Clone, PartialEq)]
406#[non_exhaustive]
407pub enum Value {
408    SmallInt(i16),
409    Int(i32),
410    BigInt(i64),
411    Float(f64),
412    Text(String),
413    Bool(bool),
414    Vector(Vec<f32>),
415    /// v6.0.1: 8-bit scalar-quantised vector cell. Lives in
416    /// columns declared `VECTOR(N) USING SQ8`. Layout per cell:
417    /// `Sq8Vector { min: f32, max: f32, bytes: Vec<u8> }` —
418    /// 4× compression vs `Vector(Vec<f32>)`. The wire layer
419    /// dequantises to `f32` on SELECT; INSERT path quantises
420    /// incoming `Vector(Vec<f32>)` cells into this variant.
421    Sq8Vector(crate::quantize::Sq8Vector),
422    /// v6.0.3: IEEE-754 binary16 vector cell. Lives in columns
423    /// declared `VECTOR(N) USING HALF`. Stores raw u16 LE bits
424    /// (2× compression vs `Vector(Vec<f32>)`). Wire / display
425    /// paths dequantise to f32 bit-exactly; INSERT path converts
426    /// incoming f32 vectors at the engine boundary.
427    HalfVector(crate::halfvec::HalfVector),
428    /// Exact fixed-point decimal. `scaled` holds the value as
429    /// `actual * 10^scale` so the storage type is always integral —
430    /// arithmetic never falls back to floating-point.
431    Numeric {
432        scaled: i128,
433        scale: u8,
434    },
435    /// Days since the Unix epoch (1970-01-01). Negative for earlier dates.
436    Date(i32),
437    /// Microseconds since the Unix epoch (1970-01-01T00:00:00Z).
438    Timestamp(i64),
439    /// Calendar span: `months` (variable-length) + `micros` (fixed-length).
440    /// Runtime-only — cannot appear in a stored row in v2.11.
441    Interval {
442        months: i32,
443        micros: i64,
444    },
445    /// v4.9 `JSON` — raw JSON text. No structural validation
446    /// happens at the storage layer; whatever the parser hands us
447    /// round-trips verbatim. Equality is byte-wise.
448    Json(String),
449    /// v7.10.4 `BYTEA` — raw binary blob. Equality is byte-wise.
450    /// Layout matches `Text`'s length-prefixed shape (`[u32 LE
451    /// len][bytes]`) under tag 18; the engine accepts PG hex
452    /// literals (`'\xDEADBEEF'`) and escape literals at the
453    /// coercion boundary.
454    Bytes(Vec<u8>),
455    /// v7.10.9 `TEXT[]` — single-dimension TEXT array with
456    /// optional NULL elements. Equality is element-wise. PG's
457    /// NULL-element comparison semantics: NULL ≠ NULL inside
458    /// arrays under `=`, so `[NULL] != [NULL]` (the engine
459    /// honours this).
460    TextArray(Vec<Option<String>>),
461    /// v7.11.12 `INT[]` — single-dimension i32 array with optional
462    /// NULL elements. Codec mirrors TextArray with i32 LE per
463    /// element instead of length-prefixed UTF-8.
464    IntArray(Vec<Option<i32>>),
465    /// v7.11.12 `BIGINT[]` — single-dimension i64 array with optional
466    /// NULL elements.
467    BigIntArray(Vec<Option<i64>>),
468    /// v7.12.0 `tsvector` — sorted-by-word, deduped lexeme set with
469    /// positions + weights. The engine enforces sort/dedup on
470    /// construction; consumers can rely on `lexemes.windows(2)`
471    /// being strictly ascending by `word`.
472    TsVector(Vec<TsLexeme>),
473    /// v7.12.0 `tsquery` — boolean / phrase parse tree over
474    /// lexemes. Engine builds via `to_tsquery` family.
475    TsQuery(TsQueryAst),
476    /// v7.17.0 `uuid` — 128-bit identifier. Stored as 16 bytes
477    /// (big-endian / network-byte order, same as RFC 4122).
478    /// Display normalises to canonical lowercase 8-4-4-4-12
479    /// hyphenated form. Equality is byte-wise.
480    Uuid([u8; 16]),
481    /// v7.17.0 Phase 3.P0-32 — PG `time` (without time zone) —
482    /// i64 microseconds since 00:00:00. Range 0..86_400_000_000.
483    /// Display: `HH:MM:SS` zero-padded, with optional `.ffffff`
484    /// suffix when fractional is non-zero.
485    Time(i64),
486    /// v7.17.0 Phase 3.P0-33 — MySQL `YEAR` — u16 in range
487    /// 1901..=2155 plus the special zero-year sentinel 0.
488    /// Display always 4 digits zero-padded (`0000` for the
489    /// sentinel; `1985`/`2007` otherwise).
490    Year(u16),
491    /// v7.17.0 Phase 3.P0-34 — PG `time with time zone` — i64
492    /// microseconds since 00:00:00 in the LOCAL wall clock PLUS
493    /// an i32 offset-from-UTC in seconds. PG preserves the
494    /// offset on output, so the wall-clock value is NOT shifted
495    /// to UTC at storage time. Offset range: ±50400 seconds
496    /// (±14 hours).
497    TimeTz {
498        us: i64,
499        offset_secs: i32,
500    },
501    /// v7.17.0 Phase 3.P0-35 — PG `money` — i64 cents
502    /// (locale-independent storage; the en_US locale renders on
503    /// display via `$N,NNN.CC`).
504    Money(i64),
505    /// v7.17.0 Phase 3.P0-39 — PG `hstore` value: flat
506    /// `text => text` map with NULL value support. Insertion
507    /// order preserved on input; duplicate keys take last-write-
508    /// wins at parse time.
509    Hstore(Vec<(String, Option<String>)>),
510    /// v7.17.0 Phase 3.P0-40 — 2D INT matrix (row-major).
511    IntArray2D(Vec<Vec<Option<i32>>>),
512    /// v7.17.0 Phase 3.P0-40 — 2D BIGINT matrix (row-major).
513    BigIntArray2D(Vec<Vec<Option<i64>>>),
514    /// v7.17.0 Phase 3.P0-40 — 2D TEXT matrix (row-major).
515    TextArray2D(Vec<Vec<Option<String>>>),
516    /// v7.17.0 Phase 3.P0-38 — PG range value. One shape covers
517    /// all six builtin range types; `kind` pins the element type
518    /// (must match the column's `DataType::Range(kind)`).
519    /// `lower` / `upper` are `None` for the unbounded sides;
520    /// `lower_inc` / `upper_inc` mirror the canonical PG
521    /// `[` / `(` / `]` / `)` bracket inclusivity. `empty=true`
522    /// supersedes all other fields (the empty range has no
523    /// bounds).
524    Range {
525        kind: RangeKind,
526        lower: Option<alloc::boxed::Box<Value>>,
527        upper: Option<alloc::boxed::Box<Value>>,
528        lower_inc: bool,
529        upper_inc: bool,
530        empty: bool,
531    },
532    Null,
533}
534
535impl Value {
536    /// Type tag, or `None` for `NULL` (unknown at value level).
537    pub fn data_type(&self) -> Option<DataType> {
538        match self {
539            Self::SmallInt(_) => Some(DataType::SmallInt),
540            Self::Int(_) => Some(DataType::Int),
541            Self::BigInt(_) => Some(DataType::BigInt),
542            Self::Float(_) => Some(DataType::Float),
543            // `Text` covers both unbounded TEXT and bounded VARCHAR/CHAR
544            // — the constraint lives on the column schema, not the value.
545            Self::Text(_) => Some(DataType::Text),
546            Self::Bool(_) => Some(DataType::Bool),
547            Self::Vector(v) => Some(DataType::Vector {
548                dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
549                encoding: VecEncoding::F32,
550            }),
551            Self::Sq8Vector(q) => Some(DataType::Vector {
552                dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
553                encoding: VecEncoding::Sq8,
554            }),
555            Self::HalfVector(h) => Some(DataType::Vector {
556                dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
557                encoding: VecEncoding::F16,
558            }),
559            // `Value::Numeric` doesn't carry its precision (the column
560            // schema does); we surface precision=0 as "unknown" and let
561            // the engine reconcile against the column type at coercion
562            // time.
563            Self::Numeric { scale, .. } => Some(DataType::Numeric {
564                precision: 0,
565                scale: *scale,
566            }),
567            Self::Date(_) => Some(DataType::Date),
568            Self::Timestamp(_) => Some(DataType::Timestamp),
569            Self::Interval { .. } => Some(DataType::Interval),
570            Self::Json(_) => Some(DataType::Json),
571            Self::Bytes(_) => Some(DataType::Bytes),
572            Self::TextArray(_) => Some(DataType::TextArray),
573            Self::IntArray(_) => Some(DataType::IntArray),
574            Self::BigIntArray(_) => Some(DataType::BigIntArray),
575            Self::TsVector(_) => Some(DataType::TsVector),
576            Self::TsQuery(_) => Some(DataType::TsQuery),
577            Self::Uuid(_) => Some(DataType::Uuid),
578            Self::Time(_) => Some(DataType::Time),
579            Self::Year(_) => Some(DataType::Year),
580            Self::TimeTz { .. } => Some(DataType::TimeTz),
581            Self::Money(_) => Some(DataType::Money),
582            Self::Range { kind, .. } => Some(DataType::Range(*kind)),
583            Self::Hstore(_) => Some(DataType::Hstore),
584            Self::IntArray2D(_) => Some(DataType::IntArray2D),
585            Self::BigIntArray2D(_) => Some(DataType::BigIntArray2D),
586            Self::TextArray2D(_) => Some(DataType::TextArray2D),
587            Self::Null => None,
588        }
589    }
590
591    pub const fn is_null(&self) -> bool {
592        matches!(self, Self::Null)
593    }
594}
595
596/// One table row — values are positional and must match
597/// `TableSchema.columns` in length and (modulo NULL) in `DataType`.
598#[derive(Debug, Clone, PartialEq)]
599pub struct Row {
600    pub values: Vec<Value>,
601}
602
603impl Row {
604    pub const fn new(values: Vec<Value>) -> Self {
605        Self { values }
606    }
607
608    pub fn len(&self) -> usize {
609        self.values.len()
610    }
611
612    pub fn is_empty(&self) -> bool {
613        self.values.is_empty()
614    }
615}
616
617#[derive(Debug, Clone, PartialEq)]
618pub struct ColumnSchema {
619    pub name: String,
620    pub ty: DataType,
621    pub nullable: bool,
622    /// Optional `DEFAULT` value, frozen at CREATE TABLE time. `None`
623    /// means "no default" (so omitted columns become NULL, or error
624    /// out when the column is NOT NULL). Literal defaults take this
625    /// path.
626    pub default: Option<Value>,
627    /// v7.9.21 — for DEFAULT expressions that need INSERT-time
628    /// evaluation (e.g. `DEFAULT now()`, `DEFAULT CURRENT_TIMESTAMP`),
629    /// the Display form of the expression. The engine re-parses
630    /// it on each INSERT default-fill, evaluates against an empty
631    /// row context, and coerces to the column type. mailrs G4.
632    /// Persisted in catalog FILE_VERSION 15+; older catalogs
633    /// deserialise with None.
634    pub runtime_default: Option<String>,
635    /// MySQL-style `AUTO_INCREMENT`. When set, an INSERT that leaves
636    /// this column unbound (or sets it to NULL) gets the next integer
637    /// computed from the column's current max + 1.
638    pub auto_increment: bool,
639    /// v7.17.0 Phase 1.4 — when the column is bound to a user-
640    /// defined ENUM type (the parser saw an unknown type ident
641    /// and the engine resolved it against `catalog.enum_types`),
642    /// this carries the enum name so INSERT/UPDATE can validate
643    /// the cell value against the enum's labels. `ty` is
644    /// `DataType::Text` in that case. Persisted in catalog
645    /// FILE_VERSION 29+; older catalogs deserialise with None.
646    pub user_enum_type: Option<String>,
647    /// v7.17.0 Phase 1.5 — when the column is bound to a user-
648    /// defined DOMAIN (the parser saw an unknown type ident and
649    /// the engine resolved it against `catalog.domain_types`),
650    /// this carries the domain name. `ty` is the domain's base
651    /// type; INSERT/UPDATE re-evaluates the domain's CHECK list
652    /// + NOT NULL against the cell value. Persisted in catalog
653    /// FILE_VERSION 30+; older catalogs deserialise with None.
654    pub user_domain_type: Option<String>,
655    /// v7.17.0 Phase 2.1 — MySQL `ON UPDATE CURRENT_TIMESTAMP`
656    /// column attribute. When `Some(expr_src)`, an UPDATE that
657    /// does NOT bind this column overrides the new value with
658    /// the engine-evaluated expression (always `now()` in
659    /// v7.17.0). Stored as Display-form source so storage
660    /// stays free of spg-sql; the engine re-parses at UPDATE
661    /// time. Persisted in catalog FILE_VERSION 32+; older
662    /// catalogs deserialise with None — preserves the existing
663    /// "silent ignore" behaviour for snapshots written before
664    /// the upgrade.
665    pub on_update_runtime: Option<String>,
666    /// v7.17.0 Phase 2.5 — text collation. Pre-2.5 SPG accepted
667    /// `COLLATE <name>` clauses but discarded the name, so a
668    /// column declared `COLLATE "case_insensitive"` (or any
669    /// MySQL `_ci` collation) still compared byte-wise — a
670    /// Tier-S silent failure where `WHERE name = 'foo'` never
671    /// matched stored `'Foo'`. This carries the parser-derived
672    /// classification so the engine's WHERE evaluator can route
673    /// text equality through a case-aware compare. `Binary` (the
674    /// default) preserves the prior byte-wise behaviour. Only
675    /// CaseInsensitive lands in the catalog appendix — Binary
676    /// columns stay implicit, keeping snapshots compact.
677    /// Persisted in catalog FILE_VERSION 34+; older catalogs
678    /// deserialise every column as `Binary`.
679    pub collation: Collation,
680    /// v7.17.0 Phase 4.4 — MySQL `UNSIGNED` modifier flag. Drives
681    /// engine-side INSERT / UPDATE range enforcement (rejects
682    /// negative values on UNSIGNED int columns). Pre-4.4 the
683    /// parser consumed and discarded the keyword silently, so
684    /// every UNSIGNED column quietly accepted negatives — a
685    /// Tier-A correctness drift. Sparse: only UNSIGNED columns
686    /// land in the catalog appendix; the default `false` keeps
687    /// snapshots compact for the common signed-int path.
688    /// Persisted in catalog FILE_VERSION 35+; older catalogs
689    /// deserialise every column as `is_unsigned = false`.
690    pub is_unsigned: bool,
691    /// v7.17.0 Phase 3.P0-36 — MySQL inline `ENUM('a','b','c')`
692    /// value list. Distinct from `user_enum_type` (which points
693    /// to a separately CREATE TYPE'd PG enum); this carries the
694    /// column-local list MySQL DDL declares inline. When `Some`,
695    /// `ty` is `DataType::Text` and INSERT/UPDATE validates the
696    /// cell value against this list. Variant ORDER is preserved
697    /// (MySQL uses it for `ORDER BY col`). Sparse: only ENUM
698    /// columns land in the catalog appendix.
699    /// Persisted in catalog FILE_VERSION 41+; older catalogs
700    /// deserialise with None — preserves silent-drop behaviour
701    /// for snapshots written before P0-36.
702    pub inline_enum_variants: Option<Vec<String>>,
703    /// v7.17.0 Phase 3.P0-37 — MySQL inline `SET('a','b','c')`
704    /// variant list. Storage is TEXT (canonical comma-joined in
705    /// definition order, de-duplicated). INSERT/UPDATE validates
706    /// every comma-separated token against this list. Sparse:
707    /// only SET columns land in the catalog appendix.
708    /// Persisted in catalog FILE_VERSION 42+; older catalogs
709    /// deserialise with None.
710    pub inline_set_variants: Option<Vec<String>>,
711}
712
/// v7.17.0 Phase 2.5 — column-level text collation. Drives the
/// engine's WHERE / GROUP BY equality routing for `Value::Text`.
///
/// Only two variants are modelled in v7.17:
///   * `Binary` — byte-wise comparison (the SPG default; matches
///     PG `COLLATE "C"` / `pg_catalog.default` and MySQL `*_bin`).
///   * `CaseInsensitive` — ASCII case-folded comparison (matches
///     PG `COLLATE "case_insensitive"` and MySQL `*_ci`
///     collations). Non-ASCII bytes still compare byte-wise; full
///     ICU folding is out of v7.17 scope.
///
/// New variants append at the end — older catalogs read missing
/// columns as `Binary`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Collation {
    Binary,
    CaseInsensitive,
}

// The default collation is `Binary`. The impl is hand-written and the
// clippy lint suppressed, exactly as before.
// NOTE(review): confirm whether `#[derive(Default)]` + `#[default]`
// would be acceptable for this crate before dropping the `allow`.
#[allow(clippy::derivable_impls)]
impl Default for Collation {
    fn default() -> Self {
        Collation::Binary
    }
}

impl Collation {
    /// Wire tag for [`Collation::Binary`], persisted in the
    /// FILE_VERSION 34+ catalog appendix. Stable: future variants
    /// append above the recognised range and unknown tags read back
    /// as `Binary` for forward-compat on rollback.
    pub const TAG_BINARY: u8 = 0;
    /// Wire tag for [`Collation::CaseInsensitive`] in the same
    /// FILE_VERSION 34+ catalog appendix.
    pub const TAG_CASE_INSENSITIVE: u8 = 1;
}
747
/// Schema of one table: its name, its column list, and the
/// table-level options and constraints that travel with the
/// catalog snapshot.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Column definitions. [`TableSchema::column_position`] returns
    /// indices into this vec.
    pub columns: Vec<ColumnSchema>,
    /// v6.7.2 — per-table hot-tier byte budget override. `None`
    /// falls through to the global `SPG_HOT_TIER_BYTES` setting;
    /// `Some(n)` overrides it for this specific table. Set via
    /// `ALTER TABLE t SET hot_tier_bytes = X`. Persisted in
    /// catalog FILE_VERSION 11+.
    pub hot_tier_bytes: Option<u64>,
    /// v7.6.1 — FOREIGN KEY constraints declared on this table.
    /// Engine maintains this in lock-step with `spg-sql`'s parser
    /// AST; the storage layer carries the on-disk shape so a
    /// catalog snapshot round-trips without external mapping.
    /// Persisted in catalog FILE_VERSION 13+. Older catalogs
    /// deserialise with an empty vec.
    pub foreign_keys: Vec<ForeignKeyConstraint>,
    /// v7.9.19 — composite UNIQUE / PRIMARY KEY constraints
    /// declared at the table level. Each entry's leading column
    /// has a BTree index (created via the constraint), and INSERT
    /// path enforces the full-tuple uniqueness via a scan keyed
    /// by the leading column. Persisted in catalog FILE_VERSION
    /// 15+. Older catalogs (≤ 14) deserialise with an empty vec.
    pub uniqueness_constraints: Vec<UniquenessConstraint>,
    /// v7.13.0 — `CHECK (<expr>)` predicates declared on this
    /// table. Both column-level inline `CHECK (…)` and
    /// table-level `CHECK (…)` fold into this list. Each entry
    /// is the AST Expr's `Display` form, re-parsed on every
    /// INSERT/UPDATE and evaluated against the candidate row.
    /// A false / NULL result rejects the mutation (PG semantics).
    /// Persisted in catalog FILE_VERSION 23+. Older catalogs
    /// deserialise with an empty vec.
    pub checks: Vec<String>,
}
782
/// v7.9.19 — composite UNIQUE / PRIMARY KEY constraint persisted
/// on the table schema (see `TableSchema::uniqueness_constraints`).
/// The leading column always has a BTree index (created at CREATE
/// TABLE time); INSERT enforcement scans that index for collisions
/// on the full column tuple.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// `true` when this constraint was declared as `PRIMARY KEY`
    /// (vs `UNIQUE`). Semantically PK implies NOT NULL on all
    /// referenced columns; the engine enforces that at CREATE
    /// TABLE time.
    pub is_primary_key: bool,
    /// Column positions on the parent table. ≥ 1 element. For
    /// single-column UNIQUE this is exactly one position; the
    /// BTree index alone enforces it.
    // NOTE(review): positions presumably index `TableSchema::columns`
    // of the owning table — confirm against the CREATE TABLE path.
    pub columns: Vec<usize>,
    /// v7.13.0 — `UNIQUE NULLS NOT DISTINCT` modifier
    /// (mailrs round-5 G10; PG 15+ surface). When `true`, two
    /// rows whose constrained columns are all NULL collide on
    /// the constraint. Default (`false`) is the SQL-standard
    /// `NULLS DISTINCT` behaviour where any NULL passes.
    /// Persisted in catalog FILE_VERSION 23+.
    pub nulls_not_distinct: bool,
}
806
/// v7.6.1 — Storage-layer mirror of `spg_sql::ast::ForeignKeyConstraint`.
/// The engine's CREATE TABLE path translates between the two; keeping
/// them separate preserves the no-deps boundary between
/// `spg-storage` and `spg-sql`.
///
/// Instances are carried on `TableSchema::foreign_keys`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
    /// Optional user-supplied constraint name (`CONSTRAINT <name>`
    /// prefix). Used by `ALTER TABLE DROP CONSTRAINT <name>` in
    /// v7.6.8; ignored by enforcement.
    pub name: Option<String>,
    /// Positions of local columns in this table's column list.
    /// Same arity as `parent_columns`.
    pub local_columns: Vec<usize>,
    /// Referenced parent table name.
    pub parent_table: String,
    /// Positions of parent columns in the parent's column list.
    /// Engine resolves these at CREATE TABLE time (after the parent
    /// schema is known) so enforcement paths can skip the name
    /// lookup on every row.
    pub parent_columns: Vec<usize>,
    /// Referential action when a parent row is deleted
    /// (see [`FkAction`]).
    pub on_delete: FkAction,
    /// Referential action when a parent row's referenced columns
    /// are updated (see [`FkAction`]).
    pub on_update: FkAction,
}
833
/// v7.6.1 — referential action tag. Mirrors `spg_sql::ast::FkAction`.
///
/// Each variant has a fixed on-disk tag byte (see [`FkAction::tag`]);
/// the tag is assigned explicitly there and does not depend on the
/// declaration order below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    /// On-disk tag `0`.
    Restrict,
    /// On-disk tag `1`.
    Cascade,
    /// On-disk tag `2`.
    SetNull,
    /// On-disk tag `3`.
    SetDefault,
    /// On-disk tag `4`.
    NoAction,
}

impl FkAction {
    /// On-disk tag byte (v13 catalog appendix) for this action.
    ///
    /// The inverse of [`FkAction::from_tag`].
    pub const fn tag(self) -> u8 {
        match self {
            FkAction::Restrict => 0,
            FkAction::Cascade => 1,
            FkAction::SetNull => 2,
            FkAction::SetDefault => 3,
            FkAction::NoAction => 4,
        }
    }

    /// Decodes an on-disk tag byte back into an action.
    ///
    /// Returns `None` for any byte outside the known `0..=4` range,
    /// leaving it to the caller to decide how to treat an unknown tag.
    pub const fn from_tag(b: u8) -> Option<Self> {
        match b {
            0 => Some(FkAction::Restrict),
            1 => Some(FkAction::Cascade),
            2 => Some(FkAction::SetNull),
            3 => Some(FkAction::SetDefault),
            4 => Some(FkAction::NoAction),
            _ => None,
        }
    }
}
866
867impl TableSchema {
868    pub fn column_position(&self, name: &str) -> Option<usize> {
869        self.columns.iter().position(|c| c.name == name)
870    }
871}
872
/// Key type accepted by secondary indices. Float / NULL / Vector values
/// can't participate in a B-tree index — `f64` is only `PartialOrd`, NULL
/// has SQL-three-valued semantics, and Vector belongs to the (future) HNSW
/// path. Index lookups on those columns fall back to full scan.
///
/// Keys are produced from cell values by [`IndexKey::from_value`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexKey {
    /// Integer key. `from_value` maps SmallInt / Int / BigInt as well
    /// as Date, Timestamp, Time, Year, TimeTz (UTC-normalised
    /// microseconds) and Money onto this variant.
    Int(i64),
    /// Text key; holds an owned copy of the cell's string.
    Text(String),
    /// Boolean key.
    Bool(bool),
    /// v7.17.0 — `Value::Uuid` index key. Comparison is byte-wise
    /// (RFC 4122 byte order) so PRIMARY KEY UUID lookups land on
    /// the same fast-path as Int / Text.
    Uuid([u8; 16]),
}
887
888impl IndexKey {
889    pub fn from_value(v: &Value) -> Option<Self> {
890        match v {
891            Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
892            Value::Int(n) => Some(Self::Int(i64::from(*n))),
893            Value::BigInt(n) => Some(Self::Int(*n)),
894            Value::Text(s) => Some(Self::Text(s.clone())),
895            Value::Bool(b) => Some(Self::Bool(*b)),
896            // Date/Timestamp use their integer storage repr as the
897            // index key — same order semantics, same comparison.
898            Value::Date(d) => Some(Self::Int(i64::from(*d))),
899            Value::Timestamp(t) => Some(Self::Int(*t)),
900            // v7.17.0: UUID indexable via byte-wise ordering. Lookup
901            // on `id = '...'::uuid` resolves through the secondary
902            // index rather than full-scan.
903            Value::Uuid(b) => Some(Self::Uuid(*b)),
904            // v7.17.0 Phase 3.P0-32: TIME indexable via i64 — same
905            // order semantics as Date/Timestamp.
906            Value::Time(us) => Some(Self::Int(*us)),
907            // v7.17.0 Phase 3.P0-33: YEAR indexable as i64 — u16
908            // widens losslessly and gives the natural calendar
909            // ordering.
910            Value::Year(y) => Some(Self::Int(i64::from(*y))),
911            // v7.17.0 Phase 3.P0-34: TIMETZ indexable by its
912            // UTC-equivalent microseconds (local wall - offset).
913            // Without normalising, two values for the same
914            // physical instant in different zones would sort
915            // wrong. Matches PG's TIMETZ index behaviour.
916            Value::TimeTz { us, offset_secs } => {
917                Some(Self::Int(us - i64::from(*offset_secs) * 1_000_000))
918            }
919            // v7.17.0 Phase 3.P0-35: MONEY indexable as i64 cents
920            // (no scaling needed — natural numeric ordering).
921            Value::Money(c) => Some(Self::Int(*c)),
922            // v7.17.0 Phase 3.P0-38: ranges are NOT indexable in
923            // v7.17.0 — they'd need a custom comparator (PG uses
924            // SP-GiST for this). Skip.
925            Value::Range { .. } => None,
926            // v7.17.0 Phase 3.P0-39: hstore is NOT indexable in
927            // v7.17.0 — map columns need GIN with bespoke ops.
928            Value::Hstore(_) => None,
929            // v7.17.0 Phase 3.P0-40: 2D arrays aren't indexable.
930            Value::IntArray2D(_) | Value::BigIntArray2D(_) | Value::TextArray2D(_) => None,
931            // Numeric isn't (yet) indexable — exact-decimal index keys
932            // would need a stable scale-normalised representation.
933            // Interval isn't index-eligible either (and can't reach this
934            // path through column storage anyway).
935            Value::Null
936            | Value::Float(_)
937            | Value::Vector(_)
938            | Value::Sq8Vector(_)
939            | Value::HalfVector(_)
940            | Value::Numeric { .. }
941            | Value::Interval { .. }
942            | Value::Json(_)
943            | Value::Bytes(_)
944            | Value::TextArray(_)
945            | Value::IntArray(_)
946            | Value::BigIntArray(_)
947            | Value::TsVector(_)
948            | Value::TsQuery(_) => None,
949        }
950    }
951}
952
/// A single-column secondary index. v2.0 carries either a B-tree map
/// (the default — used for equality / range lookups on scalar columns)
/// or a navigable-small-world graph (used for kNN over vector
/// columns). Later versions added further payload kinds; see
/// [`IndexKind`] for the full list.
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name.
    pub name: String,
    /// Position of the indexed (leading) column in the table's
    /// column list.
    pub column_position: usize,
    /// The index payload / algorithm (BTree, NSW, BRIN, GIN, …).
    pub kind: IndexKind,
    /// v6.8.0 — column positions of `INCLUDE (col1, col2, …)`
    /// non-key columns. Carries the planner's "this query is
    /// covered by the index" signal; lookup paths still resolve
    /// via the `RowLocator` to fetch the row body, but EXPLAIN
    /// surfaces the covered-scan annotation so operators can
    /// confirm the planner sees the coverage.
    ///
    /// Empty `Vec` = no `INCLUDE` clause (the legacy shape). v12
    /// catalog snapshots deserialise with an empty vec.
    pub included_columns: Vec<usize>,
    /// v6.8.1 — partial-index predicate stored as its canonical
    /// Display form (the engine re-parses it on the maintenance
    /// path). `None` = unconditional index (the legacy shape).
    /// Persisted as `[u8 has_pred][u16 LE len][bytes]` on the
    /// catalog snapshot (FILE_VERSION 12, appended after
    /// `included_columns`).
    pub partial_predicate: Option<String>,
    /// v6.8.2 — expression-index key, stored as the expression's
    /// canonical Display form. `None` = bare column-reference
    /// index (the legacy shape). Persisted alongside
    /// `partial_predicate` on the v12 catalog snapshot.
    pub expression: Option<String>,
    /// v7.9.29 — `CREATE UNIQUE INDEX …`. When true the engine
    /// rejects INSERTs whose key already appears in this index
    /// (combined with `partial_predicate` when present — only
    /// rows matching the predicate enter the uniqueness check).
    /// Catalog FILE_VERSION 16+; older snapshots deserialise
    /// with `false`. mailrs K1.
    pub is_unique: bool,
    /// v7.9.29 — extra (non-leading) column positions for
    /// multi-column indexes (`CREATE INDEX … (a, b, c)`). The
    /// planner today still only uses the leading
    /// `column_position` for index seeks, but UNIQUE INDEX
    /// enforcement walks the full tuple so partial-unique
    /// invariants like CalDAV `(calendar_id, uid,
    /// recurrence_id)` are enforced correctly. Catalog
    /// FILE_VERSION 16+; older snapshots deserialise empty.
    pub extra_column_positions: Vec<usize>,
}
1001
/// Default neighbor degree (M) for the NSW graph. Picked at construction
/// time and persisted with the index. This is the `m` an [`NswGraph`]
/// is built with when the default is used; layer 0 then gets twice
/// this budget (`m_max_0 = 2 * m`).
pub const NSW_DEFAULT_M: usize = 16;
1005
/// v5.2.2: outcome of a successful [`Catalog::freeze_oldest_to_cold`]
/// call.
///
/// The catalog state has already been mutated by the time this
/// is returned (hot rows dropped + segment registered + Cold locators
/// flipped). The caller's only remaining concern is `segment_bytes` —
/// persist them to disk under `<db>.spg/segments/seg_<id>.spg` so a
/// future restart can reload via the v5.1 `SPG_PRELOAD_COLD_SEGMENT`
/// path. (v5.3's manifest will subsume this manual step.)
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Id allocated by [`Catalog::load_segment_bytes`] for the new
    /// cold-tier segment. Stable across the call's success path.
    pub segment_id: u32,
    /// Number of rows that moved hot → cold. Equals the `max_rows`
    /// the caller asked for (the API is strict on the count).
    pub frozen_rows: usize,
    /// Hot-tier bytes reclaimed by the freeze — the
    /// [`Table::hot_bytes`] delta before vs after. Useful to feed
    /// back into the freezer's budget check on the next tick.
    pub bytes_freed: u64,
    /// Encoded segment bytes, byte-identical to what
    /// [`encode_segment`] produced. The catalog already owns a
    /// copy inside `cold_segments`; this hand-off lets the caller
    /// persist them without re-encoding.
    pub segment_bytes: Vec<u8>,
}
1031
/// v6.7.4 — read-only output of [`Catalog::prepare_freeze_slice`].
///
/// Carries every row body + key in a contiguous hot-row range,
/// already encoded and sorted by PK so the coordinator's merge
/// step is a k-way merge over already-sorted streams.
///
/// `Vec<FreezeSlice>` from N independent workers feeds
/// [`Catalog::commit_freeze_slices`], which concats + encodes the
/// merged segment + atomically swaps the catalog state.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
    /// Hot-row index range this slice covered (half-open, in the
    /// table's `rows: PersistentVec` ordering at call time). The
    /// commit step uses this to compute the union range that
    /// gets passed to [`Table::delete_rows`].
    pub row_range: core::ops::Range<usize>,
    /// `(pk_u64, encoded_row_body, IndexKey)` triples, sorted
    /// ascending by `pk_u64`. Per-slice sort happens inside
    /// `prepare_freeze_slice`; the coordinator does only a
    /// k-way merge to reach the global PK ordering
    /// [`encode_segment`] requires.
    pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
1054
/// v6.7.3 — outcome of a [`Catalog::compact_cold_segments`] call.
///
/// The catalog state has already been mutated when this is returned:
/// the merged segment is loaded into `cold_segments`, the source
/// segment slots are tombstoned (`None`), and every BTree-index
/// `RowLocator::Cold` that previously pointed at a source now
/// points at the merged segment. The caller's remaining job is to
/// persist `merged_segment_bytes` under
/// `<db>.spg/segments/seg_<merged_segment_id>.spg` and update the
/// in-memory `segment_id → path` map (remove the source ids, add
/// the merged id) so the next CHECKPOINT writes a manifest that
/// no longer lists the retired sources.
///
/// On a no-op (fewer than 2 candidate segments under the threshold),
/// `merged_segment_id` is `None` and `sources` is empty; the
/// catalog was not mutated.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Source segment ids that were merged + tombstoned. Empty on
    /// no-op.
    pub sources: Vec<u32>,
    /// Id allocated for the merged segment. `None` on no-op.
    pub merged_segment_id: Option<u32>,
    /// Encoded merged-segment bytes (empty on no-op).
    pub merged_segment_bytes: Vec<u8>,
    /// Number of rows that landed in the merged segment.
    pub merged_rows: usize,
    /// `Σ source.num_rows − merged_rows`. Rows present in source
    /// segment payloads but unreferenced by any live BTree
    /// `Cold` locator — DELETE'd-but-still-frozen rows that
    /// compaction GC'd during the merge.
    pub deleted_rows_pruned: usize,
    /// `Σ source.bytes() − merged.bytes()`. Estimate of on-disk
    /// space the merge will reclaim once the source segment files
    /// are GC'd. Saturating subtract — never negative.
    pub bytes_reclaimed_estimate: u64,
}
1090
/// Payload of an [`Index`]: which index algorithm is in use and the
/// in-memory data (if any) that backs it.
#[derive(Debug, Clone)]
pub enum IndexKind {
    /// v4.40: structural-sharing B-tree over `IndexKey`. Replaces the v0.8
    /// `BTreeMap<IndexKey, Vec<usize>>` — `Index::clone` is now an `Arc`
    /// bump regardless of index size, so `Catalog::clone` inside the
    /// v4.34 auto-commit wrap stays O(1) even for tables with secondary
    /// indices (the case that bottlenecked v4.39 at 1M rows in the
    /// sweep).
    ///
    /// v5.1: value type widened from `Vec<usize>` to `Vec<RowLocator>` so
    /// a single key can point to a mix of hot-tier rows (`RowLocator::Hot`,
    /// equivalent to the pre-v5 `usize` row index) and cold-tier rows
    /// (`RowLocator::Cold { segment_id, page_offset }`) once the v5.2
    /// freezer starts producing them. Pre-v5.2 only `Hot` entries appear
    /// — the on-disk encoding stays at `FILE_VERSION` 8 (raw u64 row index)
    /// because every locator round-trips through `RowLocator::from_legacy_v8_u64`
    /// without information loss. `FILE_VERSION` 9 with tagged encoding lands
    /// alongside the first freezer commit (v5.1 step 2b / v5.2).
    BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
    /// Navigable-small-world graph for vector kNN search.
    Nsw(NswGraph),
    /// v6.7.1 — BRIN (Block Range INdex). Pure metadata: BRIN
    /// indexes carry NO in-memory key→locator map. The (min,
    /// max) summaries live in each cold-tier segment's v2
    /// envelope sidecar; the BRIN entry in `Table.indices` only
    /// records THAT a BRIN index exists on this column so the
    /// segment encoder + planner can opt into the summary path.
    Brin {
        /// The cell type at `column_position` at CREATE INDEX time.
        /// Used by the planner to type-check WHERE-clause range
        /// predicates against the BRIN-indexed column.
        column_type: DataType,
    },
    /// v7.12.3 — GIN inverted index over a `tsvector` column.
    ///
    /// Storage shape: `lexeme word → Vec<RowLocator>`. The posting
    /// list per word is appended in row-order, so range scans are
    /// O(matching rows) once the per-word lookup is done. Multi-
    /// term queries intersect / union posting lists.
    ///
    /// `IndexKey::from_value(TsVector)` returns `None` — GIN doesn't
    /// participate in `try_index_seek` (which is BTree-equality-keyed).
    /// The engine consults this index through `try_gin_lookup` on
    /// `WHERE col @@ tsquery` predicates instead.
    ///
    /// Backed by a `PersistentBTreeMap` so `Catalog::clone` (the
    /// per-write snapshot) stays O(1) — same structural-sharing
    /// invariant as BTree.
    Gin(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
    /// v7.15.0 — `USING gin (col gin_trgm_ops)` over a `TEXT`
    /// column. Posting lists map `trigram` (PG-compatible 3-byte
    /// shingle on the lower-cased + space-padded input) to row
    /// locators. The planner uses this index to accelerate
    /// `WHERE col LIKE '…'` / `ILIKE '…'` / `similarity(col, q) >
    /// t` — every literal run of length ≥ 1 in the pattern
    /// produces a trigram set, the engine intersects the posting
    /// lists, and the LIKE / similarity predicate is re-evaluated
    /// per candidate row to filter the over-approximation.
    /// Persisted via tag-4 index payload in `FILE_VERSION` 24+.
    GinTrgm(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
    /// v7.17.0 Phase 2.2 — MySQL `FULLTEXT KEY (col)` over a
    /// `TEXT` / `VARCHAR` column. Posting lists map
    /// `tsvector('simple') lexeme` to row locators. At insert /
    /// build time the engine derives the lexemes from the cell
    /// via the same lower-case tokenisation rule as
    /// `to_tsvector('simple', ...)` — the column itself stays a
    /// plain text type on disk (mysqldump round-trips would be
    /// broken otherwise). The planner uses this index to
    /// accelerate MySQL-shape `MATCH(col) AGAINST('term')`
    /// queries by mapping them onto the existing tsquery `@@`
    /// walker. Persisted via tag-5 index payload in
    /// `FILE_VERSION` 33+.
    GinFulltext(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
}
1165
1166impl IndexKind {
1167    /// v7.31 (memory campaign, C2) — bytes this index variant holds
1168    /// resident in RAM, computed by walking its OWN structure rather
1169    /// than a parametric guess made by the engine. Replaces the old
1170    /// `spg_admin::memory_stats` inline match, which charged NSW with
1171    /// a stale `m_max_0 * 8` per node (neighbour slots are `u32` = 4 B
1172    /// since v6.1.x, and most nodes never fill `m_max_0`) and lumped
1173    /// every GIN family index into a flat 1 KiB token — a gross
1174    /// undercount for the text-heavy posting lists that dominate
1175    /// mailrs' footprint. Per-entry container overhead uses the
1176    /// 3-word (24 B on 64-bit) `Vec`/`String` header as the charge.
1177    ///
1178    /// O(index entries): operator/monitoring surface (`memory_stats` /
1179    /// `spg_memory_stats`), not a query path.
1180    #[must_use]
1181    pub fn approx_resident_bytes(&self) -> u64 {
1182        const HEADER: usize = 24; // Vec/String 3-word header on 64-bit.
1183        let loc = core::mem::size_of::<RowLocator>();
1184        match self {
1185            IndexKind::BTree(map) => {
1186                let key = core::mem::size_of::<IndexKey>();
1187                map.iter()
1188                    .map(|(_, locs)| (key + HEADER + locs.len() * loc) as u64)
1189                    .sum()
1190            }
1191            IndexKind::Nsw(g) => {
1192                // `levels` is one byte per node; each layer's adjacency
1193                // is a `Vec<u32>` per node whose actual length we walk
1194                // (the dense layer-0 list dominates, but upper layers
1195                // are sparse — the old estimate ignored that).
1196                let mut b = g.levels.len() as u64;
1197                for layer in &g.layers {
1198                    for nbrs in layer.iter() {
1199                        b += (HEADER + nbrs.len() * core::mem::size_of::<u32>()) as u64;
1200                    }
1201                }
1202                b
1203            }
1204            // BRIN carries NO in-memory key→locator map (the (min,max)
1205            // summaries live in cold-segment sidecars on disk); the
1206            // resident footprint is just the column-type token.
1207            IndexKind::Brin { .. } => core::mem::size_of::<DataType>() as u64,
1208            IndexKind::Gin(map) | IndexKind::GinTrgm(map) | IndexKind::GinFulltext(map) => map
1209                .iter()
1210                .map(|(word, postings)| {
1211                    (word.len() + HEADER + HEADER + postings.len() * loc) as u64
1212                })
1213                .sum(),
1214        }
1215    }
1216}
1217
/// Multi-layer HNSW graph (v2.13).
///
/// Each node is assigned a `top_level`; it appears in layers
/// `0..=top_level`. Higher layers are sparser, so search starts from
/// the entry at the top layer, greedy-descends to layer 0, and
/// beam-searches there. Layer 0 keeps a larger neighbour budget
/// (`m_max_0 = 2 * m` per the HNSW paper); upper layers cap at `m`.
/// The struct name stays `NswGraph` so external users / on-disk
/// callers don't have to track a rename — the algorithm changed, the
/// data slot didn't.
#[derive(Debug, Clone)]
pub struct NswGraph {
    /// Max neighbours per node on layers ≥ 1.
    pub m: usize,
    /// Max neighbours on layer 0 (the dense bottom layer). HNSW
    /// convention: `m_max_0 = 2 * m`.
    pub m_max_0: usize,
    /// Entry point — the node that sits on the topmost layer. Search
    /// always starts here. `None` while the graph is empty.
    pub entry: Option<usize>,
    /// Top layer of the entry node (== `layers.len() - 1` when populated).
    pub entry_level: u8,
    /// `levels[i]` = top layer of node `i`. Nodes whose vector cell is
    /// NULL / non-Vector have `levels[i] = 0` and no neighbour entries.
    ///
    /// v5.5.0: backed by `PersistentVec` so `NswGraph::clone` (and the
    /// `Catalog::clone` on every group-commit write that contains it) is O(1)
    /// structural-sharing instead of an O(N) element copy.
    pub levels: PersistentVec<u8>,
    /// `layers[l][i]` = neighbours of node `i` at layer `l`. Inner vec
    /// is empty when node `i` doesn't reach layer `l`.
    ///
    /// v5.5.0: the per-node middle dimension (the O(N) one) is a
    /// `PersistentVec`; the outer layer dimension stays a plain `Vec`
    /// (layer count ≤ 8, so its clone is O(1) in practice) and the inner
    /// neighbour list stays a `Vec` (bounded by `m_max_0`).
    ///
    /// v6.1.x: neighbour slot widened from `usize` (8 B on 64-bit) to
    /// `u32` (4 B). Row indices are catalog-bounded by `u32::MAX` (4G
    /// rows per table); the cast at the NSW boundary asserts this. At
    /// 1M dim-128 SQ8, layer 0 adjacency alone shrinks by ~128 MiB
    /// — the largest single contribution to the v6.0.5-measured
    /// 624 MiB ambition gap. On-disk format already used u32 LE, so
    /// this is a pure in-memory layout change; no `FILE_VERSION` bump.
    pub layers: Vec<PersistentVec<Vec<u32>>>,
}
1262
1263impl NswGraph {
1264    fn new(m: usize) -> Self {
1265        Self {
1266            m,
1267            m_max_0: m.saturating_mul(2),
1268            entry: None,
1269            entry_level: 0,
1270            levels: PersistentVec::new(),
1271            layers: alloc::vec![PersistentVec::new()],
1272        }
1273    }
1274
1275    /// Max-neighbour budget for layer `l`.
1276    pub const fn cap_for_layer(&self, layer: u8) -> usize {
1277        if layer == 0 { self.m_max_0 } else { self.m }
1278    }
1279}
1280
/// Deterministic level assignment, seeded on the row index so the same
/// insert order reproduces the same topology. Distribution is roughly
/// HNSW-flavoured with `mL ≈ 1/ln(M) ≈ 0.36` for M=16: each 4-bit
/// chunk that comes up zero promotes the node one layer (so P(level ≥
/// L) ≈ (1/16)^L).
///
/// The result is always in `0..=7`. Note that `row_idx == 0` mixes to
/// an all-zero word and therefore always lands on the maximum level.
#[allow(clippy::verbose_bit_mask)] // clippy suggests trailing_zeros(); we need an explicit MAX cap and a stable distribution shape.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7; // 7 ⇒ ~16^7 ≈ 2.7e8 expected nodes between promotions; ample.
    const NIBBLE_MASK: u64 = 0xF;

    // SplitMix-style mixer — cheap and seedable.
    let mut hash = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    hash ^= hash >> 30;
    hash = hash.wrapping_mul(0xBF58_476D_1CE4_E5B9);
    hash ^= hash >> 27;
    hash = hash.wrapping_mul(0x94D0_49BB_1331_11EB);
    hash ^= hash >> 31;

    // Count contiguous low-end zero nibbles (4-bit chunks), stopping
    // at the first non-zero one. Each zero nibble has probability
    // 1/16, mirroring HNSW's `mL ≈ 1/ln(M)` for M=16. The bounded
    // loop makes the cap explicit, including for an all-zero hash.
    for level in 0..MAX_LEVEL {
        if hash & NIBBLE_MASK != 0 {
            return level;
        }
        hash >>= 4;
    }
    MAX_LEVEL
}
1307
1308impl Index {
1309    fn new_btree(name: String, column_position: usize) -> Self {
1310        Self {
1311            name,
1312            column_position,
1313            kind: IndexKind::BTree(PersistentBTreeMap::new()),
1314            included_columns: Vec::new(),
1315            partial_predicate: None,
1316            expression: None,
1317            is_unique: false,
1318            extra_column_positions: Vec::new(),
1319        }
1320    }
1321
1322    fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
1323        Self {
1324            name,
1325            column_position,
1326            kind: IndexKind::Nsw(NswGraph::new(m)),
1327            included_columns: Vec::new(),
1328            partial_predicate: None,
1329            expression: None,
1330            is_unique: false,
1331            extra_column_positions: Vec::new(),
1332        }
1333    }
1334
1335    /// v6.7.1 — BRIN index constructor. BRIN carries no in-memory
1336    /// data; the `column_type` snapshot is used by the segment
1337    /// encoder + planner for type-checking range predicates.
1338    fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
1339        Self {
1340            name,
1341            column_position,
1342            kind: IndexKind::Brin { column_type },
1343            included_columns: Vec::new(),
1344            partial_predicate: None,
1345            expression: None,
1346            is_unique: false,
1347            extra_column_positions: Vec::new(),
1348        }
1349    }
1350
1351    /// v7.12.3 — GIN inverted-index constructor. Empty posting-list
1352    /// map; caller (typically [`Table::add_gin_index`] or
1353    /// [`Table::restore_gin_index`]) populates it from existing rows
1354    /// or from a deserialised snapshot.
1355    fn new_gin(name: String, column_position: usize) -> Self {
1356        Self {
1357            name,
1358            column_position,
1359            kind: IndexKind::Gin(PersistentBTreeMap::new()),
1360            included_columns: Vec::new(),
1361            partial_predicate: None,
1362            expression: None,
1363            is_unique: false,
1364            extra_column_positions: Vec::new(),
1365        }
1366    }
1367
1368    /// v7.15.0 — `gin_trgm_ops`-flavoured GIN constructor. Same
1369    /// shape as `new_gin` but the posting-list keys are 3-byte
1370    /// trigram shingles (`pg_trgm`-compatible) and the column
1371    /// type is `TEXT` / `VARCHAR` (not `TSVECTOR`).
1372    fn new_gin_trgm(name: String, column_position: usize) -> Self {
1373        Self {
1374            name,
1375            column_position,
1376            kind: IndexKind::GinTrgm(PersistentBTreeMap::new()),
1377            included_columns: Vec::new(),
1378            partial_predicate: None,
1379            expression: None,
1380            is_unique: false,
1381            extra_column_positions: Vec::new(),
1382        }
1383    }
1384
1385    /// v7.17.0 Phase 2.2 — MySQL `FULLTEXT KEY` GIN constructor.
1386    /// Same shape as `new_gin_trgm` but the posting-list keys
1387    /// are lower-cased word lexemes (`to_tsvector('simple', col)`
1388    /// equivalent) instead of trigrams, and the column type is
1389    /// `TEXT` / `VARCHAR` (not `TSVECTOR`).
1390    fn new_gin_fulltext(name: String, column_position: usize) -> Self {
1391        Self {
1392            name,
1393            column_position,
1394            kind: IndexKind::GinFulltext(PersistentBTreeMap::new()),
1395            included_columns: Vec::new(),
1396            partial_predicate: None,
1397            expression: None,
1398            is_unique: false,
1399            extra_column_positions: Vec::new(),
1400        }
1401    }
1402
1403    /// v7.34.4 — descending-order iterator over `(IndexKey, locators)`
1404    /// pairs for a BTree index, with O(log N) descent to the rightmost
1405    /// leaf and lazy emission thereafter. Returns an empty iterator
1406    /// for non-BTree index kinds — callers handle both uniformly.
1407    /// Used by the ORDER BY `<indexed col>` DESC + LIMIT N executor
1408    /// path: walking only the first N matches off the rightmost leaf
1409    /// avoids the per-row materialisation + partial-sort cost on
1410    /// large tables (mailrs `content_worker` at 250 k rows).
1411    pub fn iter_desc(
1412        &self,
1413    ) -> alloc::boxed::Box<dyn Iterator<Item = (&IndexKey, &alloc::vec::Vec<RowLocator>)> + '_>
1414    {
1415        match &self.kind {
1416            IndexKind::BTree(m) => alloc::boxed::Box::new(m.iter_rev()),
1417            IndexKind::Nsw(_)
1418            | IndexKind::Brin { .. }
1419            | IndexKind::Gin(_)
1420            | IndexKind::GinTrgm(_)
1421            | IndexKind::GinFulltext(_) => alloc::boxed::Box::new(core::iter::empty()),
1422        }
1423    }
1424
1425    /// v7.34.4 — ascending-order iterator over `(IndexKey, locators)`
1426    /// pairs. Mirror of `iter_desc` for ORDER BY ... ASC + LIMIT N.
1427    pub fn iter_asc(
1428        &self,
1429    ) -> alloc::boxed::Box<dyn Iterator<Item = (&IndexKey, &alloc::vec::Vec<RowLocator>)> + '_>
1430    {
1431        match &self.kind {
1432            IndexKind::BTree(m) => alloc::boxed::Box::new(m.iter()),
1433            IndexKind::Nsw(_)
1434            | IndexKind::Brin { .. }
1435            | IndexKind::Gin(_)
1436            | IndexKind::GinTrgm(_)
1437            | IndexKind::GinFulltext(_) => alloc::boxed::Box::new(core::iter::empty()),
1438        }
1439    }
1440
1441    /// Look up the locators stored under `key` (B-tree only). Returns
1442    /// an empty slice when the key is absent or the index isn't a
1443    /// BTree — callers can treat both cases uniformly.
1444    ///
1445    /// v5.1: return type widened from `&[usize]` to `&[RowLocator]`.
1446    /// Pre-v5.2 callers can read the slice and `.as_hot().unwrap()`
1447    /// each entry (no `Cold` variants exist until the freezer lands);
1448    /// post-v5.2 callers dispatch hot vs. cold per locator.
1449    pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
1450        match &self.kind {
1451            IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
1452            // BRIN / NSW / GIN / trigram-GIN / fulltext-GIN have
1453            // no IndexKey-keyed map; lookup is a no-op. GIN uses
1454            // [`Index::gin_lookup_word`] instead.
1455            IndexKind::Nsw(_)
1456            | IndexKind::Brin { .. }
1457            | IndexKind::Gin(_)
1458            | IndexKind::GinTrgm(_)
1459            | IndexKind::GinFulltext(_) => &[][..],
1460        }
1461    }
1462
1463    /// v7.12.3 — GIN posting-list lookup. Returns the row locators
1464    /// whose `tsvector` cell contains `word`. Empty when the word is
1465    /// absent from the index or this isn't a GIN index.
1466    pub fn gin_lookup_word(&self, word: &str) -> &[RowLocator] {
1467        match &self.kind {
1468            // v7.17.0 Phase 2.2 — fulltext-GIN shares the same
1469            // lexeme-keyed posting list shape as the
1470            // tsvector-typed GIN, so the same lookup applies.
1471            IndexKind::Gin(m) | IndexKind::GinFulltext(m) => {
1472                m.get(&String::from(word)).map_or(&[][..], Vec::as_slice)
1473            }
1474            IndexKind::BTree(_)
1475            | IndexKind::Nsw(_)
1476            | IndexKind::Brin { .. }
1477            | IndexKind::GinTrgm(_) => &[][..],
1478        }
1479    }
1480
1481    /// v7.15.0 — trigram-GIN posting-list lookup. Returns the row
1482    /// locators whose indexed `TEXT` cell contains the trigram
1483    /// `tri`. Empty when the trigram is absent or this isn't a
1484    /// trigram-GIN index.
1485    pub fn gin_trgm_lookup(&self, tri: &str) -> &[RowLocator] {
1486        match &self.kind {
1487            IndexKind::GinTrgm(m) => m.get(&String::from(tri)).map_or(&[][..], Vec::as_slice),
1488            IndexKind::BTree(_)
1489            | IndexKind::Nsw(_)
1490            | IndexKind::Brin { .. }
1491            | IndexKind::Gin(_)
1492            | IndexKind::GinFulltext(_) => &[][..],
1493        }
1494    }
1495
1496    /// Borrow the NSW graph (if this is an NSW index). Callers that need
1497    /// the graph for a kNN search go through here.
1498    pub const fn nsw(&self) -> Option<&NswGraph> {
1499        match &self.kind {
1500            IndexKind::Nsw(g) => Some(g),
1501            IndexKind::BTree(_)
1502            | IndexKind::Brin { .. }
1503            | IndexKind::Gin(_)
1504            | IndexKind::GinTrgm(_)
1505            | IndexKind::GinFulltext(_) => None,
1506        }
1507    }
1508
1509    /// v6.7.1 — true when this index is a BRIN (block range) index.
1510    /// Used by the segment encoder to opt into BRIN sidecar emission
1511    /// at freeze time, and by the planner to opt into page-skipping
1512    /// on range predicates.
1513    pub const fn is_brin(&self) -> bool {
1514        matches!(self.kind, IndexKind::Brin { .. })
1515    }
1516
1517    /// v7.15.0 — true when this index is a trigram GIN
1518    /// (`gin_trgm_ops`-flavoured). Used by the LIKE planner to
1519    /// opt into trigram acceleration.
1520    pub const fn is_gin_trgm(&self) -> bool {
1521        matches!(self.kind, IndexKind::GinTrgm(_))
1522    }
1523
1524    /// v7.12.3 — true when this index is a GIN inverted index.
1525    /// Used by the planner to opt into posting-list acceleration on
1526    /// `WHERE col @@ tsquery` predicates.
1527    pub const fn is_gin(&self) -> bool {
1528        matches!(self.kind, IndexKind::Gin(_))
1529    }
1530
1531    /// v7.17.0 Phase 2.2 — true when this index is a fulltext
1532    /// GIN over a TEXT / VARCHAR column (MySQL `FULLTEXT KEY`
1533    /// surface). Used by the planner to opt the FULLTEXT-indexed
1534    /// column into MATCH AGAINST acceleration.
1535    pub const fn is_gin_fulltext(&self) -> bool {
1536        matches!(self.kind, IndexKind::GinFulltext(_))
1537    }
1538}
1539
// NOTE: the paragraph below documents `Table` (declared further down in
// this file), not `RowChange`. It is kept as a plain `//` comment so that
// rustdoc does not attach it to the `RowChange` enum that follows.
//
// In-memory table: schema + a persistent row vector + secondary indices.
//
// v4.39: `rows` is a `PersistentVec` (Bitmapped Vector Trie, 32-way) so
// `Table::clone()` is `O(1)` — the whole reason for v4.39's existence is
// to make `Catalog::clone()` cheap inside the v4.34 auto-commit wrap.
//
// v5.2.1: `hot_bytes` tracks the encoded byte size of every row currently
// in `Table::rows`, summed over rows. Updated incrementally by `insert`
// (+= encoded row size), `delete_rows` (-= removed rows' encoded sizes),
// and `update_row` (-= old size, += new size). The value is what the
// v5.2 freezer reads to decide when to demote cold rows — when the
// catalog-wide sum crosses `SPG_HOT_TIER_BYTES` (default 4 GiB) the
// freezer thread wakes. v5.2.1 ships measurement only; the freezer
// itself lands in v5.2.2. Stored as `u64` so a single field clone in
// `Catalog::clone` stays at the O(1) invariant v4.39 built.
/// v7.34 (crash-recovery P0 #2) — one row-level physical redo record.
///
/// Row-level redo replaces statement-based WAL replay (which re-executes
/// each SQL through the full engine — O(records × catalog_rows), the
/// superlinear recovery hang root-caused on the mailrs crash-recovery
/// P0). A `RowChange` is the exact storage mutation the engine applied
/// (`Table::insert` / `update_row` / `delete_rows`); replaying it on a
/// catalog restored from the matching checkpoint reproduces the state
/// WITHOUT re-validating uniqueness/FK/parse/plan — O(changed rows).
///
/// Positions are physical, not key-based: `serialize`/`deserialize`
/// preserve row order exactly (rows written + read back in `self.rows`
/// order) and the mutation ops are deterministic, so the same op sequence
/// replayed from the same checkpoint reproduces the same positions. This
/// matches PostgreSQL's physical redo and supports tables with no primary
/// key. (Caveat handled at replay integration: a post-checkpoint cold-tier
/// freeze shifts hot positions and must itself be logged or fenced by a
/// checkpoint — see `row-level-redo-design`.)
///
/// On the wire ([`encode_redo_log`] / [`decode_redo_log`]) the variants
/// are tagged with the op bytes `0` (`Insert`), `1` (`Update`) and
/// `2` (`Delete`).
#[derive(Debug, Clone, PartialEq)]
pub enum RowChange {
    /// Append `row` to `table`. Carries a whole [`Row`]; only its
    /// `values` are written by [`encode_redo_log`].
    Insert { table: String, row: Row },
    /// Replace the row at physical `pos` in `table` with `new_row`.
    Update {
        table: String,
        pos: usize,
        new_row: Vec<Value>,
    },
    /// Remove the rows at the given physical `positions` from `table`.
    Delete {
        table: String,
        positions: Vec<usize>,
    },
}
1588
1589/// v7.34 (crash-recovery P0 #2) — encode a row-level redo log to bytes for
1590/// a WAL record. Self-describing: the writer's `FILE_VERSION` leads so a
1591/// later spg can decode it via the version-gated value codec. Layout:
1592/// `[u8 version][u32 count]` then per change `[u8 op][str table]` and,
1593/// per op, `Insert [u32 n][value×n]`, `Update [u32 pos][u32 n][value×n]`,
1594/// `Delete [u32 n][u32 pos×n]`. Positions are physical (u32 ≤ 4 G rows).
1595#[must_use]
1596pub fn encode_redo_log(changes: &[RowChange]) -> Vec<u8> {
1597    let mut out = Vec::new();
1598    out.push(FILE_VERSION);
1599    codec::write_u32(&mut out, changes.len() as u32);
1600    let write_values = |out: &mut Vec<u8>, vals: &[Value]| {
1601        codec::write_u32(out, vals.len() as u32);
1602        for v in vals {
1603            codec::write_value(out, v);
1604        }
1605    };
1606    for change in changes {
1607        match change {
1608            RowChange::Insert { table, row } => {
1609                out.push(0);
1610                codec::write_str(&mut out, table);
1611                write_values(&mut out, &row.values);
1612            }
1613            RowChange::Update {
1614                table,
1615                pos,
1616                new_row,
1617            } => {
1618                out.push(1);
1619                codec::write_str(&mut out, table);
1620                codec::write_u32(&mut out, *pos as u32);
1621                write_values(&mut out, new_row);
1622            }
1623            RowChange::Delete { table, positions } => {
1624                out.push(2);
1625                codec::write_str(&mut out, table);
1626                codec::write_u32(&mut out, positions.len() as u32);
1627                for p in positions {
1628                    codec::write_u32(&mut out, *p as u32);
1629                }
1630            }
1631        }
1632    }
1633    out
1634}
1635
1636/// v7.34 — decode a row-level redo log written by [`encode_redo_log`].
1637/// A truncated / corrupt buffer is a hard error (the embedding layer
1638/// frames each record with its own length + CRC; a frame that decodes
1639/// short is corruption, not a torn tail).
1640pub fn decode_redo_log(bytes: &[u8]) -> Result<Vec<RowChange>, StorageError> {
1641    let version = *bytes
1642        .first()
1643        .ok_or_else(|| StorageError::Corrupt("redo log: empty".into()))?;
1644    let mut cur = codec::Cursor::new(bytes).with_codec_version(version);
1645    let _version = cur.read_u8()?;
1646    let count = cur.read_u32()? as usize;
1647    let mut read_values = |cur: &mut codec::Cursor<'_>| -> Result<Vec<Value>, StorageError> {
1648        let n = cur.read_u32()? as usize;
1649        let mut vals = Vec::with_capacity(n);
1650        for _ in 0..n {
1651            vals.push(cur.read_value()?);
1652        }
1653        Ok(vals)
1654    };
1655    let mut changes = Vec::with_capacity(count);
1656    for _ in 0..count {
1657        let op = cur.read_u8()?;
1658        let table = cur.read_str()?;
1659        let change = match op {
1660            0 => RowChange::Insert {
1661                table,
1662                row: Row::new(read_values(&mut cur)?),
1663            },
1664            1 => {
1665                let pos = cur.read_u32()? as usize;
1666                RowChange::Update {
1667                    table,
1668                    pos,
1669                    new_row: read_values(&mut cur)?,
1670                }
1671            }
1672            2 => {
1673                let n = cur.read_u32()? as usize;
1674                let mut positions = Vec::with_capacity(n);
1675                for _ in 0..n {
1676                    positions.push(cur.read_u32()? as usize);
1677                }
1678                RowChange::Delete { table, positions }
1679            }
1680            other => {
1681                return Err(StorageError::Corrupt(alloc::format!(
1682                    "redo log: unknown op {other}"
1683                )));
1684            }
1685        };
1686        changes.push(change);
1687    }
1688    Ok(changes)
1689}
1690
/// In-memory table: schema + a persistent row vector + secondary indices.
///
/// v4.39: `rows` is a [`PersistentVec`] (Bitmapped Vector Trie, 32-way) so
/// `Table::clone()` is `O(1)` — the whole reason for v4.39's existence is
/// to make `Catalog::clone()` cheap inside the v4.34 auto-commit wrap.
///
/// v5.2.1: `hot_bytes` tracks the encoded byte size of every row currently
/// in `rows`, summed over rows. Updated incrementally by `insert`
/// (+= encoded row size), `delete_rows` (-= removed rows' encoded sizes),
/// and `update_row` (-= old size, += new size). The value is what the
/// v5.2 freezer reads to decide when to demote cold rows — when the
/// catalog-wide sum crosses `SPG_HOT_TIER_BYTES` (default 4 GiB) the
/// freezer thread wakes. v5.2.1 ships measurement only; the freezer
/// itself lands in v5.2.2. Stored as `u64` so a single field clone in
/// `Catalog::clone` stays at the O(1) invariant v4.39 built.
#[derive(Debug, Clone)]
pub struct Table {
    /// Column layout; row values are positional and match this schema.
    schema: TableSchema,
    /// Hot-tier rows, held in a persistent vector (see the struct docs).
    rows: PersistentVec<Row>,
    /// Secondary indices defined on this table.
    indices: Vec<Index>,
    /// Running total of the encoded byte size of the rows in `rows`
    /// (see the struct docs for how it is maintained).
    hot_bytes: u64,
    /// v6.7.0 — cached count of rows currently materialised in the
    /// cold tier via `RowLocator::Cold` entries across THIS table's
    /// indices. Populated by `ANALYZE` (walks every BTree index and
    /// counts Cold locators); the count survives until the next
    /// ANALYZE recomputes it. Surfaced via `spg_statistic.cold_row_count`
    /// and `spg_stat_segment.table_name`.
    ///
    /// Honest scope: this is a CACHED count, not a live one.
    /// Freezer / promote / DELETE don't currently update the cache
    /// incrementally — they invalidate it by setting the
    /// `cold_row_count_stale` flag, and the next ANALYZE re-walks.
    /// Incremental maintenance is a v6.7.x candidate if observation
    /// shows the ANALYZE walk cost dominates.
    cold_row_count: u64,
    /// v6.7.0 — set when the cached `cold_row_count` may be wrong
    /// because rows moved into / out of the cold tier since the last
    /// ANALYZE. The virtual-table surface reports the cached value
    /// regardless (operators run ANALYZE to refresh).
    cold_row_count_stale: bool,
    /// v7.34 (crash-recovery P0 #2) — row-level redo capture buffer.
    /// `None` (default, in-memory mode) captures nothing — zero overhead.
    /// `Some` (set by the engine when persistence is on, before a
    /// mutating call) makes `insert` / `update_row` / `delete_rows`
    /// record the physical [`RowChange`] they applied, which the engine
    /// drains after the statement and writes to the WAL in place of the
    /// SQL text. Transient: never serialized; a `Catalog::clone` between
    /// enable and drain copies it (cheap — empty in the steady state).
    redo_log: Option<Vec<RowChange>>,
}
1726
/// Catalog: the top-level container for tables and every other
/// catalogued object (cold segments, functions, triggers, sequences,
/// views, enum / domain types, schemas).
///
/// Tables live in an insertion-ordered `Vec<Table>` for stable iter /
/// serialize, plus a `BTreeMap<String, usize>` sidecar index so `get` /
/// `get_mut` run in O(log n) instead of the old linear scan with
/// per-element string compares.
///
/// A pure `BTreeMap<String, Table>` was tried in an interim version
/// of v3.1.2 and regressed the single-table catalog benches by ~10%
/// (the per-element `BTreeMap` overhead outweighs the lookup win
/// when n is small). The sidecar shape preserves the insertion-order
/// iteration the on-disk encoding relies on and keeps `last_mut`
/// (used by the deserialize hot path) cheap.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// Tables in insertion order; `by_name` stores indices into this
    /// vector.
    tables: Vec<Table>,
    /// `name → tables[index]`. Kept in lock-step with `tables`.
    /// `create_table` is the only write path.
    by_name: BTreeMap<String, usize>,
    /// v5.1: in-memory cold-tier segments. Side-loaded via
    /// [`Catalog::load_segment_bytes`] — they live outside the
    /// catalog snapshot (caller persists them as separate files
    /// and re-loads on boot, until v5.3's `CatalogManifest` makes
    /// that wiring automatic). `RowLocator::Cold { segment_id, .. }`
    /// indexes this `Vec`. Cleared on `Catalog::new` / fresh
    /// `deserialize`.
    ///
    /// `Arc` wrap keeps `Catalog::clone` at O(N segments) bumps
    /// (rather than O(total segment bytes) memcpy) so the v4.42
    /// group-commit pre-image rollback invariant — clone is
    /// effectively free — survives the cold-tier addition.
    ///
    /// v6.7.3 — slots became `Option<…>` so cold-segment compaction
    /// can tombstone merged sources without breaking the
    /// `segment_id = index_into_vec` contract that on-disk
    /// `RowLocator::Cold { segment_id }` already serialized.
    /// `None` slot = the segment was retired by compaction; the
    /// physical file may still be on disk (next CHECKPOINT writes
    /// a manifest that no longer lists it, and the file becomes
    /// an orphan eligible for offline cleanup).
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
    /// v7.12.4 — user-defined functions (PL/pgSQL + SQL).
    /// Keyed by function name (PG overloading is out of scope).
    /// Bodies are stored as the raw source text the parser saw
    /// between `$$ ... $$`; the engine re-parses on each
    /// invocation. This keeps `spg-storage` free of `spg-sql`
    /// dependency — same pattern as partial-index predicates.
    functions: BTreeMap<String, FunctionDef>,
    /// v7.12.4 — triggers in insertion order. Multiple triggers
    /// per table / event fire in this order (matching PG's
    /// alphabetical-by-default with insertion-stable tie-break
    /// behaviour — we just keep insertion order for now).
    triggers: Vec<TriggerDef>,
    /// v7.17.0 — catalogued SEQUENCE objects (Phase 1.1). Each
    /// `nextval(name)` reaches in here, atomically increments
    /// `last_value` / flips `is_called`, returns the new value.
    /// Persisted in catalog FILE_VERSION 26+; older catalogs
    /// deserialise with an empty map.
    sequences: BTreeMap<String, SequenceDef>,
    /// v7.17.0 — catalogued VIEW objects (Phase 1.2). Each
    /// `SELECT FROM v` at engine exec-time looks up `v` here and
    /// prepends the view body as a synthetic CTE. Persisted in
    /// catalog FILE_VERSION 27+; older catalogs deserialise with
    /// an empty map.
    views: BTreeMap<String, ViewDef>,
    /// v7.17.0 — catalogued MATERIALIZED VIEW source registry
    /// (Phase 1.3). Maps name → SELECT source. The materialised
    /// rows themselves live as a regular `Table` with the same
    /// name; REFRESH re-parses + re-executes the source against
    /// the table. Persisted in catalog FILE_VERSION 28+;
    /// older catalogs deserialise with an empty map.
    materialized_views: BTreeMap<String, String>,
    /// v7.17.0 — catalogued user-defined ENUM types (Phase 1.4).
    /// Maps name → label list. Columns reference these by name
    /// via `ColumnSchema.user_enum_type`. Persisted in catalog
    /// FILE_VERSION 29+; older catalogs deserialise with an empty
    /// map.
    enum_types: BTreeMap<String, EnumDef>,
    /// v7.17.0 — catalogued user-defined DOMAIN types (Phase 1.5).
    /// Maps name → base + CHECK constraints. Columns reference
    /// these by name via `ColumnSchema.user_domain_type`.
    /// Persisted in catalog FILE_VERSION 30+; older catalogs
    /// deserialise with an empty map.
    domain_types: BTreeMap<String, DomainDef>,
    /// v7.17.0 — schema-namespace registry (Phase 1.6). Tracks
    /// which schemas exist. `public`, `pg_catalog`, and
    /// `information_schema` are built-in and always present.
    /// Schema-qualified table references still strip the prefix
    /// at lookup time per v7.16-and-earlier — full
    /// schema-as-isolation is v7.18+ scope. Persisted in catalog
    /// FILE_VERSION 31+; older catalogs deserialise with just
    /// the built-ins.
    ///
    /// NOTE(review): `#[derive(Default)]` yields an empty set, so the
    /// built-ins are presumably recognised through
    /// [`is_builtin_schema`] rather than stored here — confirm.
    schemas: alloc::collections::BTreeSet<String>,
}
1819
/// v7.12.4 — catalogued user-defined function.
///
/// `body` is the raw source text between `$$ ... $$`; the engine
/// re-parses it on invocation. This keeps the storage codec stable
/// when the PL/pgSQL surface grows (no breaking-change risk on the
/// disk format). Every field is a plain string, so this crate needs
/// no SQL AST types to store a function.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionDef {
    /// Function name. The catalog keys its function map by name (PG
    /// overloading is out of scope).
    pub name: String,
    /// Display form of the argument list, e.g.
    /// `"(name TEXT, ts TIMESTAMP)"`. Empty `"()"` for the trigger
    /// function shape. Parser-side canonicalised before storage.
    pub args_repr: String,
    /// Display form of the return type, e.g. `"TRIGGER"` /
    /// `"INT"` / `"SETOF text"`. The engine special-cases
    /// `"TRIGGER"` (case-insensitive) to gate trigger-only
    /// semantics (NEW/OLD).
    pub returns: String,
    /// `LANGUAGE` clause, lowercased. `"plpgsql"` / `"sql"`.
    pub language: String,
    /// Source body of the function. PL/pgSQL: includes the
    /// surrounding `BEGIN ... END;`. SQL: includes the
    /// statement(s). The engine re-parses on invocation; bad
    /// bodies surface as a parse error at CALL time, not CREATE.
    pub body: String,
}
1845
/// v7.12.4 — catalogued trigger.
///
/// References its function by name; the function must exist at
/// TRIGGER creation time (forward references are deferred to
/// v7.12.5+). Timing, events and granularity are stored as
/// uppercased keyword strings, not enums.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TriggerDef {
    /// Trigger name.
    pub name: String,
    /// Watched table. Trigger is dropped when the table drops.
    pub table: String,
    /// `"BEFORE"` / `"AFTER"` / `"INSTEAD OF"`. Stored as the
    /// uppercased keyword so deserialised catalogs round-trip
    /// without canonicalisation surprises.
    pub timing: String,
    /// Each entry is one of `"INSERT"` / `"UPDATE"` / `"DELETE"`
    /// / `"TRUNCATE"`. `INSERT OR UPDATE` parses to two entries.
    pub events: Vec<String>,
    /// `"ROW"` / `"STATEMENT"`. v7.12.4 ships `"ROW"` only;
    /// `"STATEMENT"` parses and persists but the executor
    /// refuses it at trigger fire time.
    pub for_each: String,
    /// Name of the PL/pgSQL function to invoke.
    pub function: String,
    /// v7.13.0 — `UPDATE OF col, col, …` column-list filter
    /// (mailrs round-5 G7). Non-empty means the trigger fires
    /// only when at least one of these columns appears in the
    /// UPDATE's SET list. Empty = no column filter. Stored in
    /// catalog FILE_VERSION 23+; older catalogs deserialise with
    /// an empty vec.
    pub update_columns: Vec<String>,
    /// v7.16.1 — whether the trigger fires when its watched
    /// event occurs. Toggled by `ALTER TABLE … { ENABLE |
    /// DISABLE } TRIGGER …`; pg_dump --disable-triggers wraps
    /// every data block with a DISABLE/ENABLE pair so the
    /// rows already-computed in prod don't get re-rewritten.
    /// Defaults to `true` at CREATE TRIGGER time. Stored in
    /// catalog FILE_VERSION 25+; older catalogs deserialise
    /// with `enabled = true`.
    pub enabled: bool,
}
1884
/// v7.17.0 — catalogued SEQUENCE.
///
/// PG semantics: a counter object returning monotonically increasing
/// values via `nextval(name)`. `last_value` is the most recent value
/// handed out; `is_called` is false until the first `nextval`/`setval`.
/// Stored separately from tables in the catalog.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SequenceDef {
    /// Sequence name.
    pub name: String,
    /// Data type — narrows the i64 range. PG default BIGINT.
    pub data_type: SequenceDataType,
    /// Value the first `nextval` returns, i.e. while `is_called` is
    /// still false (see `last_value`).
    pub start: i64,
    /// Step between successive values — presumably PG's
    /// `INCREMENT BY`, negative for a descending sequence; confirm
    /// against the `nextval` implementation.
    pub increment: i64,
    /// Lower bound. The per-type defaults come from
    /// [`SequenceDataType::default_bounds`].
    pub min_value: i64,
    /// Upper bound. The per-type defaults come from
    /// [`SequenceDataType::default_bounds`].
    pub max_value: i64,
    /// Presumably PG's `CACHE` option — TODO confirm how (or whether)
    /// the engine uses it.
    pub cache: i64,
    /// Presumably PG's `CYCLE` option (wrap around when a bound is
    /// reached) — TODO confirm.
    pub cycle: bool,
    /// `OWNED BY` target — `(table, column)` or NONE.
    pub owned_by: Option<(String, String)>,
    /// Most recently handed-out value. Meaningless when
    /// `is_called == false`; in that case the NEXT `nextval`
    /// will return `start`.
    pub last_value: i64,
    /// False until the first `nextval`/`setval`; see `last_value`.
    pub is_called: bool,
}
1909
/// v7.17.0 — sequence integer width. Selects the default min/max
/// bounds of a sequence (see `default_bounds`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SequenceDataType {
    /// Default bounds are derived from `i16`.
    SmallInt,
    /// Default bounds are derived from `i32`.
    Int,
    /// PG's default sequence type; default bounds are presumably
    /// derived from `i64` — confirm in `default_bounds`.
    BigInt,
}
1917
/// v7.17.0 Phase 1.6 — reports whether `name` is one of the schemas
/// every Catalog understands without an explicit CREATE SCHEMA:
/// `public`, `pg_catalog` or `information_schema`. The comparison
/// ignores ASCII case. Used by [`Catalog::schema_exists`] and the
/// engine's schema-qualified lookup path.
#[must_use]
pub fn is_builtin_schema(name: &str) -> bool {
    const BUILTIN_SCHEMAS: [&str; 3] = ["public", "pg_catalog", "information_schema"];
    BUILTIN_SCHEMAS
        .iter()
        .any(|&builtin| name.eq_ignore_ascii_case(builtin))
}
1928
/// v7.17.0 — decodes the PG-canonical text form of a UUID into the
/// 16-byte network-order layout used by `Value::Uuid`.
///
/// Surrounding whitespace is trimmed first. Accepted shapes (hex digits
/// in either case):
///   * Canonical hyphenated 8-4-4-4-12 (`550e8400-e29b-41d4-a716-446655440000`)
///   * Unhyphenated 32-char hex (`550e8400e29b41d4a716446655440000`)
///   * Either form wrapped in `{ ... }`
///
/// Anything else — wrong length, non-hex characters, misplaced
/// hyphens, an unbalanced brace — yields `None`. The caller surfaces a
/// SQL error at coercion time; silently accepting garbage would mask
/// application bugs and is exactly the divergence from PG that breaks
/// the 0-change cutover promise.
#[must_use]
pub fn parse_uuid_str(input: &str) -> Option<[u8; 16]> {
    let trimmed = input.trim();
    // Braces are only stripped as a matched pair.
    let body = trimmed
        .strip_prefix('{')
        .and_then(|rest| rest.strip_suffix('}'))
        .unwrap_or(trimmed);
    let raw = body.as_bytes();

    // Gather the 32 candidate hex digits; validation happens afterwards.
    let mut digits = [0u8; 32];
    match raw.len() {
        32 => digits.copy_from_slice(raw),
        36 => {
            // Hyphens must be exactly at positions 8, 13, 18, 23.
            if [8usize, 13, 18, 23].iter().any(|&at| raw[at] != b'-') {
                return None;
            }
            digits[..8].copy_from_slice(&raw[..8]);
            digits[8..12].copy_from_slice(&raw[9..13]);
            digits[12..16].copy_from_slice(&raw[14..18]);
            digits[16..20].copy_from_slice(&raw[19..23]);
            digits[20..].copy_from_slice(&raw[24..]);
        }
        _ => return None,
    }

    // Two hex digits per output byte, high nibble first. A stray hyphen
    // or any non-ASCII byte fails here.
    let mut uuid = [0u8; 16];
    for (slot, pair) in uuid.iter_mut().zip(digits.chunks_exact(2)) {
        let hi = hex_nibble(pair[0])?;
        let lo = hex_nibble(pair[1])?;
        *slot = (hi << 4) | lo;
    }
    Some(uuid)
}

/// Value (0–15) of one ASCII hex digit in either case, or `None` for
/// any other byte.
fn hex_nibble(b: u8) -> Option<u8> {
    let value = match b {
        b'0'..=b'9' => b - b'0',
        b'a'..=b'f' => b - b'a' + 10,
        b'A'..=b'F' => b - b'A' + 10,
        _ => return None,
    };
    Some(value)
}
1989
/// v7.17.0 — formats a `Value::Uuid` payload the way a PG `text` cast
/// shows it: 32 lowercase hex digits grouped 8-4-4-4-12 with hyphens
/// (36 characters in total).
#[must_use]
pub fn format_uuid(b: &[u8; 16]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    // Byte offsets at which a new hyphen-separated group starts.
    const GROUP_STARTS: [usize; 4] = [4, 6, 8, 10];

    let mut text = String::with_capacity(36);
    for (offset, &byte) in b.iter().enumerate() {
        if GROUP_STARTS.contains(&offset) {
            text.push('-');
        }
        let high = DIGITS[usize::from(byte >> 4)];
        let low = DIGITS[usize::from(byte & 0x0f)];
        text.push(char::from(high));
        text.push(char::from(low));
    }
    text
}
2005
/// v7.17.0 Phase 1.5 — catalogued user-defined DOMAIN.
///
/// A domain is a named CHECK-constrained alias over a built-in type;
/// columns bound to it inherit the base type plus the CHECK
/// predicates + NOT NULL + DEFAULT at INSERT/UPDATE time.
/// `default` / `checks` are stored as Display-form source so
/// `spg-storage` stays free of `spg-sql` dependency — same
/// pattern as FunctionDef / ViewDef.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DomainDef {
    /// Domain name; columns refer to the domain by this name.
    pub name: String,
    /// The built-in type the domain aliases.
    pub base_type: DataType,
    /// Nullability of the domain — presumably `false` when the domain
    /// was declared NOT NULL; confirm against the CREATE DOMAIN path.
    pub nullable: bool,
    /// Display-form source of the DEFAULT expression, if one was given.
    pub default: Option<String>,
    /// Display-form source of each CHECK predicate.
    pub checks: Vec<String>,
}
2021
/// v7.17.0 Phase 1.4 — catalog entry for a user-defined ENUM type.
///
/// `labels` keeps declaration order, because PG enum ordering follows
/// the order in which labels were declared. When a column bound to
/// this enum is written by INSERT/UPDATE, the engine checks the value
/// against `labels` and rejects anything that is not a member.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EnumDef {
    /// Name of the enum type.
    pub name: String,
    /// Member labels, in declaration order.
    pub labels: Vec<String>,
}
2032
/// v7.17.0 Phase 1.2 — catalog entry for a VIEW.
///
/// The defining query is kept as source text and re-parsed by the
/// engine on every use. As with `FunctionDef`, storing text keeps
/// `spg-storage` independent of `spg-sql`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ViewDef {
    /// View name.
    pub name: String,
    /// Optional `(col, col, …)` rename list; empty when the names
    /// projected by the body are used as-is.
    pub columns: Vec<String>,
    /// SELECT source for the view. It is Display-rendered when stored,
    /// so the catalog round-trips one deterministic form no matter how
    /// the original input used whitespace or comments. It is re-parsed
    /// at SELECT-from-view time and materialised as a synthetic CTE.
    pub body: String,
}
2050
2051impl SequenceDataType {
2052    /// PG default min/max per AS clause.
2053    pub fn default_bounds(self, increment_positive: bool) -> (i64, i64) {
2054        match self {
2055            Self::SmallInt => {
2056                if increment_positive {
2057                    (1, i64::from(i16::MAX))
2058                } else {
2059                    (i64::from(i16::MIN), -1)
2060                }
2061            }
2062            Self::Int => {
2063                if increment_positive {
2064                    (1, i64::from(i32::MAX))
2065                } else {
2066                    (i64::from(i32::MIN), -1)
2067                }
2068            }
2069            Self::BigInt => {
2070                if increment_positive {
2071                    (1, i64::MAX)
2072                } else {
2073                    (i64::MIN, -1)
2074                }
2075            }
2076        }
2077    }
2078}
2079
2080impl Catalog {
2081    pub const fn new() -> Self {
2082        Self {
2083            tables: Vec::new(),
2084            by_name: BTreeMap::new(),
2085            cold_segments: Vec::new(),
2086            functions: BTreeMap::new(),
2087            triggers: Vec::new(),
2088            sequences: BTreeMap::new(),
2089            views: BTreeMap::new(),
2090            materialized_views: BTreeMap::new(),
2091            enum_types: BTreeMap::new(),
2092            domain_types: BTreeMap::new(),
2093            schemas: alloc::collections::BTreeSet::new(),
2094        }
2095    }
2096
2097    /// v7.12.4 — read-only view of catalogued user-defined
2098    /// functions. Engine callers go through here to look up the
2099    /// function body before re-parsing it for invocation.
2100    pub const fn functions(&self) -> &BTreeMap<String, FunctionDef> {
2101        &self.functions
2102    }
2103
2104    /// v7.12.4 — register a new user-defined function. With
2105    /// `or_replace = false`, errors if the name is taken. The
2106    /// engine validates the body before passing it here.
2107    pub fn create_function(
2108        &mut self,
2109        def: FunctionDef,
2110        or_replace: bool,
2111    ) -> Result<(), StorageError> {
2112        if !or_replace && self.functions.contains_key(&def.name) {
2113            return Err(StorageError::Corrupt(format!(
2114                "function {:?} already exists (drop or use CREATE OR REPLACE)",
2115                def.name
2116            )));
2117        }
2118        self.functions.insert(def.name.clone(), def);
2119        Ok(())
2120    }
2121
2122    /// v7.12.4 — remove a user-defined function by name. Returns
2123    /// `true` if a function was removed, `false` if none matched.
2124    /// Caller decides whether to surface `if_exists` semantics.
2125    pub fn drop_function(&mut self, name: &str) -> bool {
2126        self.functions.remove(name).is_some()
2127    }
2128
2129    /// v7.17.0 — read-only handle to catalogued sequences.
2130    pub const fn sequences(&self) -> &BTreeMap<String, SequenceDef> {
2131        &self.sequences
2132    }
2133
2134    /// v7.17.0 — register a new SEQUENCE. Errors if `name`
2135    /// collides with an existing sequence and `if_not_exists`
2136    /// is false.
2137    pub fn create_sequence(
2138        &mut self,
2139        def: SequenceDef,
2140        if_not_exists: bool,
2141    ) -> Result<(), StorageError> {
2142        if self.sequences.contains_key(&def.name) {
2143            if if_not_exists {
2144                return Ok(());
2145            }
2146            return Err(StorageError::Corrupt(format!(
2147                "sequence {:?} already exists",
2148                def.name
2149            )));
2150        }
2151        self.sequences.insert(def.name.clone(), def);
2152        Ok(())
2153    }
2154
2155    /// v7.17.0 — remove a SEQUENCE by name. Returns `true` if a
2156    /// sequence was removed, `false` if none matched. Caller
2157    /// surfaces IF EXISTS semantics.
2158    pub fn drop_sequence(&mut self, name: &str) -> bool {
2159        self.sequences.remove(name).is_some()
2160    }
2161
2162    /// v7.17.0 — atomic nextval. Increments `last_value` per
2163    /// `increment`, returns the new value, sets `is_called`.
2164    /// Returns an error on CYCLE-less overflow.
2165    pub fn sequence_next_value(&mut self, name: &str) -> Result<i64, StorageError> {
2166        let Some(seq) = self.sequences.get_mut(name) else {
2167            return Err(StorageError::Corrupt(format!(
2168                "sequence {name:?} does not exist"
2169            )));
2170        };
2171        // PG semantics: when !is_called (fresh sequence or
2172        // setval(_, false)), the next nextval returns the stored
2173        // `last_value`. When is_called, it advances by `increment`
2174        // and CYCLE-wraps on overflow.
2175        let candidate = if seq.is_called {
2176            let next = seq.last_value.checked_add(seq.increment).ok_or_else(|| {
2177                StorageError::Corrupt(format!("sequence {name:?} arithmetic overflow"))
2178            })?;
2179            if seq.increment > 0 {
2180                if next > seq.max_value {
2181                    if seq.cycle {
2182                        seq.min_value
2183                    } else {
2184                        return Err(StorageError::Corrupt(format!(
2185                            "sequence {name:?} reached MAXVALUE ({})",
2186                            seq.max_value
2187                        )));
2188                    }
2189                } else {
2190                    next
2191                }
2192            } else if next < seq.min_value {
2193                if seq.cycle {
2194                    seq.max_value
2195                } else {
2196                    return Err(StorageError::Corrupt(format!(
2197                        "sequence {name:?} reached MINVALUE ({})",
2198                        seq.min_value
2199                    )));
2200                }
2201            } else {
2202                next
2203            }
2204        } else {
2205            seq.last_value
2206        };
2207        seq.last_value = candidate;
2208        seq.is_called = true;
2209        Ok(candidate)
2210    }
2211
2212    /// v7.17.0 — currval. Errors if the session has never called
2213    /// nextval on this sequence (PG semantics). At the catalog
2214    /// level we approximate "session" with "is_called persisted";
2215    /// the engine session-tracking layer can wrap this for the
2216    /// strict per-session semantics later.
2217    pub fn sequence_current_value(&self, name: &str) -> Result<i64, StorageError> {
2218        let Some(seq) = self.sequences.get(name) else {
2219            return Err(StorageError::Corrupt(format!(
2220                "sequence {name:?} does not exist"
2221            )));
2222        };
2223        if !seq.is_called {
2224            return Err(StorageError::Corrupt(format!(
2225                "currval of sequence {name:?} is not yet defined in this session"
2226            )));
2227        }
2228        Ok(seq.last_value)
2229    }
2230
2231    /// v7.17.0 — setval(name, value [, is_called]). PG returns
2232    /// `value` regardless. `is_called=true` means the NEXT
2233    /// nextval will return `value + increment`; `is_called=false`
2234    /// means the next nextval will return `value`.
2235    pub fn sequence_set_value(
2236        &mut self,
2237        name: &str,
2238        value: i64,
2239        is_called: bool,
2240    ) -> Result<i64, StorageError> {
2241        let Some(seq) = self.sequences.get_mut(name) else {
2242            return Err(StorageError::Corrupt(format!(
2243                "sequence {name:?} does not exist"
2244            )));
2245        };
2246        seq.last_value = value;
2247        seq.is_called = is_called;
2248        Ok(value)
2249    }
2250
2251    /// v7.17.0 Phase 1.2 — read-only handle to catalogued views.
2252    pub const fn views(&self) -> &BTreeMap<String, ViewDef> {
2253        &self.views
2254    }
2255
2256    /// v7.17.0 Phase 1.2 — install a VIEW. `or_replace=true`
2257    /// overwrites an existing entry; `if_not_exists=true` is a
2258    /// silent no-op when the name is taken. Errors if both flags
2259    /// are off and the name collides.
2260    pub fn create_view(
2261        &mut self,
2262        def: ViewDef,
2263        or_replace: bool,
2264        if_not_exists: bool,
2265    ) -> Result<(), StorageError> {
2266        if self.views.contains_key(&def.name) {
2267            if or_replace {
2268                self.views.insert(def.name.clone(), def);
2269                return Ok(());
2270            }
2271            if if_not_exists {
2272                return Ok(());
2273            }
2274            return Err(StorageError::Corrupt(format!(
2275                "view {:?} already exists",
2276                def.name
2277            )));
2278        }
2279        // Reject name collision with tables / sequences — same
2280        // namespace per PG.
2281        if self.by_name.contains_key(&def.name) {
2282            return Err(StorageError::Corrupt(format!(
2283                "view {:?} would shadow an existing table",
2284                def.name
2285            )));
2286        }
2287        if self.sequences.contains_key(&def.name) {
2288            return Err(StorageError::Corrupt(format!(
2289                "view {:?} would shadow an existing sequence",
2290                def.name
2291            )));
2292        }
2293        self.views.insert(def.name.clone(), def);
2294        Ok(())
2295    }
2296
2297    /// v7.17.0 Phase 1.2 — remove a view by name. Returns true if
2298    /// a view was removed.
2299    pub fn drop_view(&mut self, name: &str) -> bool {
2300        self.views.remove(name).is_some()
2301    }
2302
2303    /// v7.17.0 Phase 1.3 — read-only handle to the materialised-
2304    /// view source registry. Each entry pairs with a regular
2305    /// table of the same name that holds the cached rows.
2306    pub const fn materialized_views(&self) -> &BTreeMap<String, String> {
2307        &self.materialized_views
2308    }
2309
2310    /// v7.17.0 Phase 1.3 — register a source for a materialised
2311    /// view. Caller has already created the backing table.
2312    pub fn register_materialized_view(&mut self, name: String, body: String) {
2313        self.materialized_views.insert(name, body);
2314    }
2315
2316    /// v7.17.0 Phase 1.3 — drop the source registry entry. Returns
2317    /// true if a source was unregistered. Caller separately drops
2318    /// the backing table.
2319    pub fn drop_materialized_view_source(&mut self, name: &str) -> bool {
2320        self.materialized_views.remove(name).is_some()
2321    }
2322
2323    /// v7.17.0 Phase 1.4 — read-only handle to user-defined ENUM
2324    /// catalog.
2325    pub const fn enum_types(&self) -> &BTreeMap<String, EnumDef> {
2326        &self.enum_types
2327    }
2328
2329    /// v7.17.0 Phase 1.4 — install a new ENUM type. Errors if
2330    /// `name` collides with an existing enum (no IF NOT EXISTS
2331    /// per PG semantics for CREATE TYPE).
2332    pub fn create_enum_type(&mut self, def: EnumDef) -> Result<(), StorageError> {
2333        if self.enum_types.contains_key(&def.name) {
2334            return Err(StorageError::Corrupt(format!(
2335                "type {:?} already exists",
2336                def.name
2337            )));
2338        }
2339        self.enum_types.insert(def.name.clone(), def);
2340        Ok(())
2341    }
2342
2343    /// v7.17.0 Phase 1.4 — drop an ENUM type by name. Returns
2344    /// true if a type was removed.
2345    pub fn drop_enum_type(&mut self, name: &str) -> bool {
2346        self.enum_types.remove(name).is_some()
2347    }
2348
2349    /// v7.17.0 Phase 1.5 — read-only handle to DOMAIN catalog.
2350    pub const fn domain_types(&self) -> &BTreeMap<String, DomainDef> {
2351        &self.domain_types
2352    }
2353
2354    /// v7.17.0 Phase 1.5 — install a DOMAIN. Errors on collision
2355    /// with an existing domain.
2356    pub fn create_domain_type(&mut self, def: DomainDef) -> Result<(), StorageError> {
2357        if self.domain_types.contains_key(&def.name) {
2358            return Err(StorageError::Corrupt(format!(
2359                "domain {:?} already exists",
2360                def.name
2361            )));
2362        }
2363        self.domain_types.insert(def.name.clone(), def);
2364        Ok(())
2365    }
2366
2367    /// v7.17.0 Phase 1.5 — drop a DOMAIN by name.
2368    pub fn drop_domain_type(&mut self, name: &str) -> bool {
2369        self.domain_types.remove(name).is_some()
2370    }
2371
2372    /// v7.17.0 Phase 1.6 — read-only handle to the user-created
2373    /// schema registry. Built-in schemas (`public`, `pg_catalog`,
2374    /// `information_schema`) are NOT included here; use
2375    /// [`schema_exists`](Self::schema_exists) for the full
2376    /// check.
2377    pub const fn user_schemas(&self) -> &alloc::collections::BTreeSet<String> {
2378        &self.schemas
2379    }
2380
2381    /// v7.17.0 Phase 1.6 — schema-name resolver. Returns true
2382    /// for built-in schemas + every user-CREATEd one. Used by
2383    /// CREATE SCHEMA collision checks and (future) by
2384    /// information_schema.schemata.
2385    pub fn schema_exists(&self, name: &str) -> bool {
2386        is_builtin_schema(name) || self.schemas.contains(name)
2387    }
2388
2389    /// v7.17.0 Phase 1.6 — register a new schema. Errors if the
2390    /// name already exists and `if_not_exists=false`. Built-in
2391    /// names cannot be redeclared.
2392    pub fn create_schema(&mut self, name: String, if_not_exists: bool) -> Result<(), StorageError> {
2393        if is_builtin_schema(&name) {
2394            if if_not_exists {
2395                return Ok(());
2396            }
2397            return Err(StorageError::Corrupt(format!(
2398                "schema {name:?} is built-in and cannot be redeclared"
2399            )));
2400        }
2401        if self.schemas.contains(&name) {
2402            if if_not_exists {
2403                return Ok(());
2404            }
2405            return Err(StorageError::Corrupt(format!(
2406                "schema {name:?} already exists"
2407            )));
2408        }
2409        self.schemas.insert(name);
2410        Ok(())
2411    }
2412
2413    /// v7.17.0 Phase 1.6 — drop a user-created schema. Returns
2414    /// true if a schema was removed. Built-in names always
2415    /// return false (cannot be dropped). Tables that previously
2416    /// used the schema as a prefix keep their bare name and stay
2417    /// queryable — this is the "prefix routing, not isolation"
2418    /// posture documented in v7.17 Phase 1.6.
2419    pub fn drop_schema(&mut self, name: &str) -> Result<bool, StorageError> {
2420        if is_builtin_schema(name) {
2421            return Err(StorageError::Corrupt(format!(
2422                "schema {name:?} is built-in and cannot be dropped"
2423            )));
2424        }
2425        Ok(self.schemas.remove(name))
2426    }
2427
2428    /// v7.17.0 — ALTER SEQUENCE option merge. Caller-provided
2429    /// updates overwrite the matching fields; unset fields keep
2430    /// their stored values. RESTART variants update last_value
2431    /// directly per PG: `RESTART` resets to current `start`;
2432    /// `RESTART WITH n` resets to `n`.
2433    #[allow(clippy::too_many_arguments)]
2434    pub fn alter_sequence(
2435        &mut self,
2436        name: &str,
2437        increment: Option<i64>,
2438        min_value: Option<i64>,
2439        max_value: Option<i64>,
2440        start: Option<i64>,
2441        restart: Option<Option<i64>>,
2442        cache: Option<i64>,
2443        cycle: Option<bool>,
2444        owned_by: Option<Option<(String, String)>>,
2445    ) -> Result<(), StorageError> {
2446        let Some(seq) = self.sequences.get_mut(name) else {
2447            return Err(StorageError::Corrupt(format!(
2448                "sequence {name:?} does not exist"
2449            )));
2450        };
2451        if let Some(v) = increment {
2452            seq.increment = v;
2453        }
2454        if let Some(v) = min_value {
2455            seq.min_value = v;
2456        }
2457        if let Some(v) = max_value {
2458            seq.max_value = v;
2459        }
2460        if let Some(v) = start {
2461            seq.start = v;
2462        }
2463        if let Some(restart_value) = restart {
2464            seq.last_value = restart_value.unwrap_or(seq.start);
2465            seq.is_called = false;
2466        }
2467        if let Some(v) = cache {
2468            seq.cache = v;
2469        }
2470        if let Some(v) = cycle {
2471            seq.cycle = v;
2472        }
2473        if let Some(v) = owned_by {
2474            seq.owned_by = v;
2475        }
2476        Ok(())
2477    }
2478
2479    /// v7.12.4 — read-only slice of all catalogued triggers.
2480    /// Engine row-write paths filter this by (table, event,
2481    /// timing) and fire matches in slice order.
2482    pub fn triggers(&self) -> &[TriggerDef] {
2483        &self.triggers
2484    }
2485
2486    /// v7.15.0 — mutable handle to the trigger slice for
2487    /// `ALTER TABLE … RENAME COLUMN`, which rewrites every
2488    /// `update_columns` entry that referenced the renamed
2489    /// column.
2490    pub fn triggers_mut(&mut self) -> &mut Vec<TriggerDef> {
2491        &mut self.triggers
2492    }
2493
2494    /// v7.12.4 — register a new trigger. With `or_replace = false`,
2495    /// errors when a trigger with the same name already exists on
2496    /// the same table (PG scoping rule — trigger names are
2497    /// per-table, not global). Trigger function must already
2498    /// exist in the catalog at registration time.
2499    pub fn create_trigger(
2500        &mut self,
2501        def: TriggerDef,
2502        or_replace: bool,
2503    ) -> Result<(), StorageError> {
2504        if !self.by_name.contains_key(&def.table) {
2505            return Err(StorageError::TableNotFound {
2506                name: def.table.clone(),
2507            });
2508        }
2509        if !self.functions.contains_key(&def.function) {
2510            return Err(StorageError::Corrupt(format!(
2511                "trigger {:?} references unknown function {:?}",
2512                def.name, def.function
2513            )));
2514        }
2515        let dup = self
2516            .triggers
2517            .iter()
2518            .position(|t| t.name == def.name && t.table == def.table);
2519        match (dup, or_replace) {
2520            (Some(_), false) => Err(StorageError::Corrupt(format!(
2521                "trigger {:?} already exists on table {:?}",
2522                def.name, def.table
2523            ))),
2524            (Some(i), true) => {
2525                self.triggers[i] = def;
2526                Ok(())
2527            }
2528            (None, _) => {
2529                self.triggers.push(def);
2530                Ok(())
2531            }
2532        }
2533    }
2534
2535    /// v7.12.4 — remove a trigger by `(name, table)`. Returns
2536    /// `true` if one was removed.
2537    pub fn drop_trigger(&mut self, name: &str, table: &str) -> bool {
2538        let before = self.triggers.len();
2539        self.triggers
2540            .retain(|t| !(t.name == name && t.table == table));
2541        before != self.triggers.len()
2542    }
2543
2544    pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
2545        if self.by_name.contains_key(&schema.name) {
2546            return Err(StorageError::DuplicateTable {
2547                name: schema.name.clone(),
2548            });
2549        }
2550        let idx = self.tables.len();
2551        let name = schema.name.clone();
2552        self.tables.push(Table::new(schema));
2553        self.by_name.insert(name, idx);
2554        Ok(())
2555    }
2556
2557    pub fn get(&self, name: &str) -> Option<&Table> {
2558        let idx = *self.by_name.get(name)?;
2559        self.tables.get(idx)
2560    }
2561
2562    pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
2563        let idx = *self.by_name.get(name)?;
2564        self.tables.get_mut(idx)
2565    }
2566
2567    /// v7.34 (crash-recovery P0 #2) — replay a row-level redo log onto
2568    /// this catalog (the [`RowChange`] physical-redo apply primitive that
2569    /// row-level WAL recovery will use in place of statement re-execution).
2570    /// Applies each change in order via the same `Table` mutators the
2571    /// engine used — no uniqueness/FK/parse/plan: the original execution
2572    /// already validated, replay trusts and applies. Positions are
2573    /// physical and only valid when replayed from the matching checkpoint
2574    /// baseline in original order (see [`RowChange`] docs).
2575    ///
2576    /// A change naming an absent table, or whose position is out of range,
2577    /// is a corrupt/misaligned log and surfaces as an error rather than a
2578    /// silent skip.
2579    pub fn apply_redo(&mut self, changes: &[RowChange]) -> Result<(), StorageError> {
2580        for change in changes {
2581            match change {
2582                RowChange::Insert { table, row } => {
2583                    self.table_for_redo(table)?.insert(row.clone())?;
2584                }
2585                RowChange::Update {
2586                    table,
2587                    pos,
2588                    new_row,
2589                } => {
2590                    self.table_for_redo(table)?
2591                        .update_row(*pos, new_row.clone())?;
2592                }
2593                RowChange::Delete { table, positions } => {
2594                    self.table_for_redo(table)?.delete_rows(positions);
2595                }
2596            }
2597        }
2598        Ok(())
2599    }
2600
2601    fn table_for_redo(&mut self, name: &str) -> Result<&mut Table, StorageError> {
2602        self.get_mut(name)
2603            .ok_or_else(|| StorageError::Corrupt(alloc::format!("redo: unknown table {name:?}")))
2604    }
2605
2606    /// v7.34 (crash-recovery P0 #2) — enable row-level redo capture on
2607    /// every table (the engine calls this before a mutating statement
2608    /// when persistence is on; idempotent, keeps any in-flight capture).
2609    pub fn enable_redo_all(&mut self) {
2610        for t in &mut self.tables {
2611            t.enable_redo();
2612        }
2613    }
2614
2615    /// v7.34 — drain the row-level redo captured across all tables, in
2616    /// table order then per-table apply order, and stop capturing. The
2617    /// engine calls this after a successful mutating statement and writes
2618    /// the returned [`RowChange`]s to the WAL in place of the SQL text.
2619    pub fn drain_redo(&mut self) -> Vec<RowChange> {
2620        let mut all = Vec::new();
2621        for t in &mut self.tables {
2622            all.extend(t.take_redo());
2623        }
2624        all
2625    }
2626
2627    pub fn table_count(&self) -> usize {
2628        self.tables.len()
2629    }
2630
2631    /// v7.14.0 — remove a table by name. Returns `true` when the
2632    /// table existed (and is now gone), `false` when it didn't.
2633    /// Used by `DROP TABLE` from pg_dump / mysqldump preambles
2634    /// where the dump re-creates schema and starts with
2635    /// `DROP TABLE IF EXISTS`.
2636    pub fn drop_table(&mut self, name: &str) -> bool {
2637        let Some(idx) = self.by_name.remove(name) else {
2638            return false;
2639        };
2640        // swap_remove invalidates the trailing index → rebuild
2641        // by_name for affected entries.
2642        self.tables.swap_remove(idx);
2643        // Re-stamp moved table's index slot in by_name.
2644        if idx < self.tables.len() {
2645            let moved_name = self.tables[idx].schema.name.clone();
2646            self.by_name.insert(moved_name, idx);
2647        }
2648        true
2649    }
2650
2651    /// v7.16.2 — rename a table (mailrs round-10 A.5). Updates
2652    /// the schema name, the catalog name → index map, and
2653    /// rewrites every reference dangling at the table name:
2654    ///   * every FK on every OTHER table whose `parent_table`
2655    ///     pointed at the old name now points at the new
2656    ///     name, so FK enforcement keeps working
2657    ///   * every trigger watching the table updates its `table`
2658    ///     field
2659    /// Returns `Ok` on success; `Err(StorageError::TableNotFound)`
2660    /// when the old name isn't in the catalog and
2661    /// `Err(StorageError::DuplicateTable)` when the new name is
2662    /// already taken.
2663    pub fn rename_table(&mut self, old: &str, new: &str) -> Result<(), StorageError> {
2664        if old == new {
2665            return Ok(());
2666        }
2667        if self.by_name.contains_key(new) {
2668            return Err(StorageError::Corrupt(format!(
2669                "rename_table: target name {new:?} already exists"
2670            )));
2671        }
2672        let idx = self
2673            .by_name
2674            .remove(old)
2675            .ok_or_else(|| StorageError::TableNotFound { name: old.into() })?;
2676        self.tables[idx].schema.name = new.to_string();
2677        self.by_name.insert(new.to_string(), idx);
2678        for t in &mut self.tables {
2679            for fk in &mut t.schema.foreign_keys {
2680                if fk.parent_table == old {
2681                    fk.parent_table = new.to_string();
2682                }
2683            }
2684        }
2685        for trig in &mut self.triggers {
2686            if trig.table == old {
2687                trig.table = new.to_string();
2688            }
2689        }
2690        Ok(())
2691    }
2692
2693    /// v7.16.2 — rename an index by name. Walks every table
2694    /// since the index lives on its owning table; updates the
2695    /// name in place. Errors with `IndexNotFound` when no
2696    /// index matches. mailrs round-10 A.5.
2697    pub fn rename_index(&mut self, old: &str, new: &str) -> Result<(), StorageError> {
2698        if old == new {
2699            return Ok(());
2700        }
2701        // Reject the new name if it already exists anywhere.
2702        for t in &self.tables {
2703            if t.indices.iter().any(|i| i.name == new) {
2704                return Err(StorageError::Corrupt(format!(
2705                    "rename_index: target name {new:?} already exists"
2706                )));
2707            }
2708        }
2709        for t in &mut self.tables {
2710            for i in &mut t.indices {
2711                if i.name == old {
2712                    i.name = new.to_string();
2713                    return Ok(());
2714                }
2715            }
2716        }
2717        Err(StorageError::IndexNotFound { name: old.into() })
2718    }
2719
2720    /// v7.14.0 — remove a named index across the catalog.
2721    /// Returns `true` when found + dropped.
2722    pub fn drop_named_index(&mut self, name: &str) -> bool {
2723        for t in &mut self.tables {
2724            let before = t.indices.len();
2725            t.indices.retain(|i| i.name != name);
2726            if t.indices.len() != before {
2727                return true;
2728            }
2729        }
2730        false
2731    }
2732
2733    /// Borrow-free copy of every table's name in catalog order
2734    /// (= insertion order, matching the on-disk encoding).
2735    pub fn table_names(&self) -> Vec<String> {
2736        self.tables.iter().map(|t| t.schema.name.clone()).collect()
2737    }
2738
2739    /// v5.1: register a cold-tier segment that already lives in
2740    /// memory (caller did the file read). Returns the
2741    /// `segment_id` that `RowLocator::Cold { segment_id, .. }`
2742    /// will reference — currently this is just the index into
2743    /// `cold_segments`, but treat it as an opaque token.
2744    ///
2745    /// Storage is `no_std`, so file I/O is the caller's
2746    /// responsibility — `spg-server` reads the file and forwards
2747    /// the bytes here. The bytes stay resident in the catalog
2748    /// for the life of the `Catalog`, parsed only once.
2749    pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
2750        let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
2751            StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
2752        })?;
2753        let seg = OwnedSegment::from_bytes(bytes)
2754            .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2755        self.cold_segments.push(Some(Arc::new(seg)));
2756        Ok(id)
2757    }
2758
2759    /// v6.7.3 — register a cold-tier segment at a specific id. Used
2760    /// by the spg-server manifest-boot path so segments whose
2761    /// neighbouring ids were retired by compaction still get back
2762    /// the same `segment_id` they had pre-restart (the
2763    /// `RowLocator::Cold { segment_id }` baked into the BTree-index
2764    /// snapshot persists across restart and must continue to
2765    /// resolve).
2766    ///
2767    /// Pads the Vec with `None` slots up to `target_id` if needed.
2768    /// Errors when the target slot is already occupied (would
2769    /// stomp another segment), the parse fails, or `target_id`
2770    /// exceeds `u32::MAX`.
2771    pub fn load_segment_bytes_at(
2772        &mut self,
2773        target_id: u32,
2774        bytes: Vec<u8>,
2775    ) -> Result<(), StorageError> {
2776        let seg = OwnedSegment::from_bytes(bytes)
2777            .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2778        let idx = target_id as usize;
2779        while self.cold_segments.len() <= idx {
2780            self.cold_segments.push(None);
2781        }
2782        if self.cold_segments[idx].is_some() {
2783            return Err(StorageError::Corrupt(format!(
2784                "load_segment_bytes_at: segment_id {target_id} already occupied"
2785            )));
2786        }
2787        self.cold_segments[idx] = Some(Arc::new(seg));
2788        Ok(())
2789    }
2790
2791    /// v6.7.3 — retire a cold-tier segment slot (compaction-driven).
2792    /// The physical file is the caller's concern (typically kept
2793    /// on disk until the next CHECKPOINT writes a manifest that
2794    /// no longer lists it); this just flips the in-memory slot
2795    /// to `None` so later cold lookups for `segment_id` resolve
2796    /// as "unknown" instead of returning a stale row.
2797    ///
2798    /// No-op when the slot is already `None`. Errors only when
2799    /// `segment_id` is out of bounds.
2800    pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
2801        let idx = segment_id as usize;
2802        if idx >= self.cold_segments.len() {
2803            return Err(StorageError::Corrupt(format!(
2804                "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
2805                self.cold_segments.len()
2806            )));
2807        }
2808        self.cold_segments[idx] = None;
2809        Ok(())
2810    }
2811
2812    /// Number of *active* (non-tombstoned) cold segments.
2813    #[must_use]
2814    pub fn cold_segment_count(&self) -> usize {
2815        self.cold_segments.iter().filter(|s| s.is_some()).count()
2816    }
2817
2818    /// Slot count including tombstones (= the next id the
2819    /// no-arg `load_segment_bytes` would allocate).
2820    #[must_use]
2821    pub fn cold_segment_slot_count(&self) -> usize {
2822        self.cold_segments.len()
2823    }
2824
2825    /// v6.2.7 — list every *active* cold-tier segment id known to
2826    /// this catalog (skips compaction tombstones since v6.7.3).
2827    /// Used by EXPLAIN ANALYZE to annotate scan nodes with the
2828    /// segments they could have walked.
2829    #[must_use]
2830    pub fn cold_segment_ids_global(&self) -> Vec<u32> {
2831        self.cold_segments
2832            .iter()
2833            .enumerate()
2834            .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
2835            .collect()
2836    }
2837
2838    /// v5.2.1: sum of `Table::hot_bytes` across every table. The v5.2
2839    /// freezer compares this against `SPG_HOT_TIER_BYTES` (parsed at
2840    /// server startup; default 4 GiB) and wakes when the budget is
2841    /// crossed. Pre-freezer (v5.2.1) this is measurement-only — the
2842    /// counter exposes whether the budget is being approached without
2843    /// triggering any demotion.
2844    #[must_use]
2845    pub fn hot_tier_bytes(&self) -> u64 {
2846        self.tables
2847            .iter()
2848            .map(Table::hot_bytes)
2849            .fold(0u64, u64::saturating_add)
2850    }
2851
2852    /// v5.2.2: freeze the **first** `max_rows` rows of `table_name`'s
2853    /// hot tier into a brand-new cold-tier segment. The named `BTree`
2854    /// index supplies the per-row PK (its column must be an integer
2855    /// type — v5.2.2 only supports `IndexKey::Int` PKs, matching the
2856    /// `index_key_as_u64` constraint used by the cold-tier lookup
2857    /// path). On success returns a [`FreezeReport`] with the
2858    /// freshly-allocated segment id, the count of rows that moved,
2859    /// the encoded segment bytes (so the caller can persist them to
2860    /// disk for later reload via `SPG_PRELOAD_COLD_SEGMENT`), and the
2861    /// hot-tier byte delta that was reclaimed.
2862    ///
2863    /// **Semantics**:
2864    /// 1. The first `max_rows` rows (by hot-tier position — same as
2865    ///    insertion order under v4.39 `PersistentVec`) are read.
2866    /// 2. Rows are sorted ascending by PK and serialised into a new
2867    ///    segment via [`encode_segment`].
2868    /// 3. The hot rows are dropped via [`Table::delete_rows`]; the
2869    ///    `rebuild_indices` it triggers regenerates `Hot` locators
2870    ///    for every remaining row (their positions shift down by
2871    ///    `max_rows`). Existing `Cold` locators in this index — from
2872    ///    a previous freeze — are also rebuilt **but with empty
2873    ///    payload** since rebuild reads only `self.rows`; this
2874    ///    routine re-registers them at the end of the call so the
2875    ///    user-visible state preserves all prior cold locators.
2876    /// 4. The new segment is loaded into `self.cold_segments` via
2877    ///    [`Catalog::load_segment_bytes`] (allocating a fresh
2878    ///    `segment_id`). New `Cold` locators are registered on the
2879    ///    named index — one per frozen row.
2880    ///
2881    /// **v5.2.2 limits** (relaxed in later sub-versions):
2882    /// - INSERT-only flow: subsequent UPDATE/DELETE on a frozen row
2883    ///   returns a stale-locator error (no promote-on-write until
2884    ///   v5.2.3).
2885    /// - Single-table scope: callers iterate tables themselves.
2886    /// - All-or-nothing: returns `Err` and leaves catalog unchanged
2887    ///   if any step fails before the atomic swap point.
2888    ///
2889    /// Errors:
2890    /// - [`StorageError::Corrupt`] for missing table/index, non-`BTree`
2891    ///   index, non-integer PK column, `max_rows == 0`, or
2892    ///   `max_rows > row_count`.
2893    /// - The encoder's [`SegmentError`] surfaces as `Corrupt` (the
2894    ///   only realistic source is "a single row is larger than the
2895    ///   page size"; SPG schemas don't hit it in practice).
2896    pub fn freeze_oldest_to_cold(
2897        &mut self,
2898        table_name: &str,
2899        index_name: &str,
2900        max_rows: usize,
2901    ) -> Result<FreezeReport, StorageError> {
2902        // --- validation phase: never mutates ---------------------
2903        if max_rows == 0 {
2904            return Err(StorageError::Corrupt(
2905                "freeze_oldest_to_cold: max_rows must be > 0".into(),
2906            ));
2907        }
2908        let table = self.get(table_name).ok_or_else(|| {
2909            StorageError::Corrupt(format!(
2910                "freeze_oldest_to_cold: table {table_name:?} not found"
2911            ))
2912        })?;
2913        if max_rows > table.rows.len() {
2914            return Err(StorageError::Corrupt(format!(
2915                "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
2916                table.rows.len()
2917            )));
2918        }
2919        let idx = table
2920            .indices
2921            .iter()
2922            .find(|i| i.name == index_name)
2923            .ok_or_else(|| {
2924                StorageError::Corrupt(format!(
2925                    "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
2926                ))
2927            })?;
2928        if !matches!(idx.kind, IndexKind::BTree(_)) {
2929            return Err(StorageError::Corrupt(format!(
2930                "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
2931            )));
2932        }
2933        let column_position = idx.column_position;
2934
2935        // --- segment build phase: reads only --------------------
2936        let schema = table.schema.clone();
2937        let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
2938        for row_idx in 0..max_rows {
2939            let row = table.rows.get(row_idx).expect("bounds-checked above");
2940            let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
2941                StorageError::Corrupt(format!(
2942                    "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
2943                ))
2944            })?;
2945            let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
2946                StorageError::Corrupt(format!(
2947                    "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
2948                     v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
2949                ))
2950            })?;
2951            to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
2952        }
2953        // encode_segment requires ascending u64 keys. Sort by PK
2954        // before encoding; the caller's row-position order is not
2955        // necessarily PK order (e.g. workloads that insert random
2956        // PKs).
2957        to_freeze.sort_by_key(|(k, _, _)| *k);
2958        // Reject duplicate PKs — encode_segment also rejects them
2959        // (`SegmentError::UnsortedKey`), but the resulting error
2960        // message there is misleading. Surface a clearer one.
2961        for w in to_freeze.windows(2) {
2962            if w[0].0 == w[1].0 {
2963                return Err(StorageError::Corrupt(format!(
2964                    "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
2965                    w[0].0
2966                )));
2967            }
2968        }
2969        // Snapshot the (key, locator) pairs that will be registered
2970        // post-swap. Cloning the IndexKey out before the move makes
2971        // the registration loop borrow-free.
2972        let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
2973        // Segment encode is now infallible w.r.t. ordering. Map the
2974        // `SegmentError` into a `StorageError::Corrupt` so the
2975        // public surface stays one error type.
2976        let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
2977            .into_iter()
2978            .map(|(k, body, _)| (k, body))
2979            .collect();
2980        let frozen_rows = seg_rows.len();
2981        let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
2982            .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
2983
2984        // --- atomic swap phase: mutations only past this point ---
2985        // v5.2.3 made `Table::rebuild_indices` preserve every Cold
2986        // locator across the per-table rebuild, so `delete_rows`
2987        // below no longer wipes prior-freeze cold entries. The pre-
2988        // v5.2.3 capture-then-re-register that used to live here
2989        // was removed in v5.3.1 — keeping it would double-count
2990        // every prior-frozen key's Cold locator on each subsequent
2991        // freeze.
2992        let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
2993        let positions: Vec<usize> = (0..max_rows).collect();
2994        let t_mut = self
2995            .get_mut(table_name)
2996            .expect("just validated; still present");
2997        let removed = t_mut.delete_rows(&positions);
2998        debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
2999        let bytes_after = t_mut.hot_bytes();
3000        let bytes_freed = bytes_before.saturating_sub(bytes_after);
3001
3002        let segment_id = self
3003            .load_segment_bytes(seg_bytes.clone())
3004            .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
3005        let new_cold = post_swap_keys.into_iter().map(|k| {
3006            (
3007                k,
3008                RowLocator::Cold {
3009                    segment_id,
3010                    page_offset: 0,
3011                },
3012            )
3013        });
3014        let t_mut = self.get_mut(table_name).expect("still present");
3015        t_mut.register_cold_locators(index_name, new_cold)?;
3016
3017        Ok(FreezeReport {
3018            segment_id,
3019            frozen_rows,
3020            bytes_freed,
3021            segment_bytes: seg_bytes,
3022        })
3023    }
3024
3025    /// v5.1: borrow the cold segment at `segment_id`. Used by the
3026    /// spg-server preload path to enumerate (key, locator) pairs
3027    /// after loading a segment, so it can call
3028    /// [`Table::register_cold_locators`] without re-parsing the
3029    /// bytes.
3030    #[must_use]
3031    pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
3032        self.cold_segments
3033            .get(segment_id as usize)
3034            .and_then(|s| s.as_deref())
3035    }
3036
3037    /// v5.1: resolve a single `RowLocator::Cold` to its underlying
3038    /// `Row`. Decoupled from [`Catalog::lookup_by_pk`] so callers
3039    /// iterating a multi-locator slice (e.g. the engine's index
3040    /// seek path) can dispatch per locator instead of getting back
3041    /// only the first row for a key. Returns `None` when the
3042    /// segment isn't registered, the key isn't `u64`-coercible, or
3043    /// the segment doesn't actually carry the key (bloom or page-
3044    /// index reject).
3045    pub fn resolve_cold_locator(
3046        &self,
3047        table_name: &str,
3048        segment_id: u32,
3049        key: &IndexKey,
3050    ) -> Option<Row> {
3051        let t = self.get(table_name)?;
3052        let u64_key = index_key_as_u64(key)?;
3053        let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
3054        let payload = seg.lookup(u64_key)?;
3055        let (row, _) = decode_row_body_dense(&payload, &t.schema, seg.codec_version()).ok()?;
3056        Some(row)
3057    }
3058
3059    /// v5.1: indexed PK lookup that dispatches per locator,
3060    /// returning the first matching row from either the hot tier
3061    /// (`Table::rows`) or a registered cold segment.
3062    ///
3063    /// The cold path requires the index column to be coercible to
3064    /// a `u64` (the segment's PK type) and the segment payload to
3065    /// be a [`encode_row_body_dense`]-encoded row body for the
3066    /// same schema. v5.1 ships this for BIGINT / INT / SMALLINT
3067    /// PKs; other types fall through to hot-only behavior.
3068    ///
3069    /// Returns `None` if (a) the table or index doesn't exist,
3070    /// (b) the key isn't in the index at all, or (c) the key was
3071    /// resolved to a stale locator (Hot index out of range, Cold
3072    /// segment id unknown, segment lookup miss). Does not surface
3073    /// segment-decode errors — those would indicate corrupted
3074    /// cold-tier files and should be caught at
3075    /// [`Catalog::load_segment_bytes`] time.
3076    pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
3077        let t = self.get(table)?;
3078        let idx = t.indices.iter().find(|i| i.name == index_name)?;
3079        let locators = idx.lookup_eq(key);
3080        let cold_u64_key = index_key_as_u64(key);
3081        for loc in locators {
3082            match *loc {
3083                RowLocator::Hot(i) => {
3084                    if let Some(row) = t.rows.get(i) {
3085                        return Some(row.clone());
3086                    }
3087                }
3088                RowLocator::Cold {
3089                    segment_id,
3090                    page_offset: _,
3091                } => {
3092                    let Some(u64_key) = cold_u64_key else {
3093                        // Key type not coercible to u64 — cold tier
3094                        // only handles BIGINT/INT/SMALLINT in v5.1.
3095                        continue;
3096                    };
3097                    let Some(seg) = self
3098                        .cold_segments
3099                        .get(segment_id as usize)
3100                        .and_then(|s| s.as_deref())
3101                    else {
3102                        // v6.7.3 — `None` slot = compaction
3103                        // retired this segment; the live locator
3104                        // on a freshly-compacted index points to
3105                        // the merged segment_id, so a Cold hit
3106                        // here against a tombstone means the BTree
3107                        // entry hasn't been swapped yet (mid-
3108                        // compaction reader race) or the caller is
3109                        // looking up a stale snapshot. Skip — the
3110                        // next locator in the list, if any, is
3111                        // typically the merged segment.
3112                        continue;
3113                    };
3114                    let Some(payload) = seg.lookup(u64_key) else {
3115                        continue;
3116                    };
3117                    let (row, _) =
3118                        decode_row_body_dense(&payload, &t.schema, seg.codec_version()).ok()?;
3119                    return Some(row);
3120                }
3121            }
3122        }
3123        None
3124    }
3125
3126    /// v5.2.3: promote a frozen row back to the hot tier so an
3127    /// UPDATE / DELETE can mutate it. Reads the cold-tier row body
3128    /// (decoded from its registered segment), pushes it into
3129    /// `table.rows` via [`Table::insert`] (which also adds a fresh
3130    /// `Hot(new_idx)` locator on `index_name`), then retires the
3131    /// shadowed `Cold` locator via
3132    /// [`Table::remove_cold_locators_for_key`]. The cold-tier row
3133    /// in the segment file becomes garbage — recoverable when a
3134    /// future cold-segment compaction job lands.
3135    ///
3136    /// Returns:
3137    /// - `Ok(Some(new_hot_idx))` when the key resolved through a
3138    ///   cold locator and the promote completed. `new_hot_idx` is
3139    ///   the position the row now occupies in `table.rows`.
3140    /// - `Ok(None)` when the key has no Cold locator on the index
3141    ///   (already hot, or wasn't present at all). Callers treat this
3142    ///   as "nothing to do here, fall back to the hot-only path".
3143    ///
3144    /// Errors when the table / index doesn't exist, the index isn't
3145    /// `BTree`, the cold segment is missing / can't decode the row,
3146    /// or the inferred row body fails `Table::insert` validation.
3147    pub fn promote_cold_row(
3148        &mut self,
3149        table_name: &str,
3150        index_name: &str,
3151        key: &IndexKey,
3152    ) -> Result<Option<usize>, StorageError> {
3153        let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
3154        let Some((segment_id, _page_offset)) = cold_loc else {
3155            return Ok(None);
3156        };
3157        let u64_key = index_key_as_u64(key).ok_or_else(|| {
3158            StorageError::Corrupt(
3159                "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
3160                    .into(),
3161            )
3162        })?;
3163        // Read the row body from the segment. Borrow the segment +
3164        // schema short-term so we can then take `&mut self` for the
3165        // hot-side insert.
3166        let schema = self
3167            .get(table_name)
3168            .ok_or_else(|| {
3169                StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
3170            })?
3171            .schema
3172            .clone();
3173        let seg = self
3174            .cold_segments
3175            .get(segment_id as usize)
3176            .and_then(|s| s.as_ref())
3177            .ok_or_else(|| {
3178                StorageError::Corrupt(format!(
3179                    "promote_cold_row: segment {segment_id} not registered on catalog"
3180                ))
3181            })?;
3182        let payload = seg.lookup(u64_key).ok_or_else(|| {
3183            StorageError::Corrupt(format!(
3184                "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
3185                 but the segment's bloom/page lookup didn't return a row"
3186            ))
3187        })?;
3188        let (row, _consumed) = decode_row_body_dense(&payload, &schema, seg.codec_version())?;
3189        // Insert the promoted row into the hot tier. `Table::insert`
3190        // appends to `self.rows`, adds a `Hot(new_idx)` locator to
3191        // every BTree index covering the row's keyed columns, and
3192        // increments `hot_bytes`.
3193        let t = self
3194            .get_mut(table_name)
3195            .expect("table existed at lookup time");
3196        t.insert(row)?;
3197        let new_hot_idx =
3198            t.rows.len().checked_sub(1).ok_or_else(|| {
3199                StorageError::Corrupt("promote_cold_row: empty after insert".into())
3200            })?;
3201        // The hot insert added Hot(new_idx) alongside the still-
3202        // present Cold locator. Drop the Cold entry so future
3203        // lookups return only the fresh hot row.
3204        t.remove_cold_locators_for_key(index_name, key)?;
3205        Ok(Some(new_hot_idx))
3206    }
3207
3208    /// v5.2.3: shadow a frozen row's index entry. Used by DELETE
3209    /// when the row to remove lives in a cold-tier segment — the
3210    /// row body stays in the segment file (becoming garbage) but
3211    /// every `Cold` locator for `key` on `index_name` is removed
3212    /// so PK lookups stop returning it.
3213    ///
3214    /// Returns the number of cold locators retired (0 when the key
3215    /// has no cold entries — the DELETE fell on a hot row or a
3216    /// key that was already absent). Errors when the table /
3217    /// index doesn't exist or the index isn't `BTree`.
3218    ///
3219    /// Cold-segment compaction (which merges shadowed-heavy
3220    /// segments and reclaims their disk footprint) lands in a
3221    /// later v5.x sub-version; until then, repeated UPDATE/DELETE
3222    /// of cold rows can amplify cold-segment disk usage by up to
3223    /// 1-2× — still well under typical LSM-tree shadowing because
3224    /// SPG segments are bulk-baked, not write-merged.
3225    pub fn shadow_cold_row(
3226        &mut self,
3227        table_name: &str,
3228        index_name: &str,
3229        key: &IndexKey,
3230    ) -> Result<usize, StorageError> {
3231        let t = self.get_mut(table_name).ok_or_else(|| {
3232            StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
3233        })?;
3234        t.remove_cold_locators_for_key(index_name, key)
3235    }
3236
3237    /// v6.7.4 — read-only slice preparation for the parallel
3238    /// freezer. Walks rows in `row_range`, builds the
3239    /// `(pk_u64, encoded_body, IndexKey)` triples that the
3240    /// coordinator's k-way merge consumes, sorts the slice by
3241    /// `pk_u64`, and returns a [`FreezeSlice`].
3242    ///
3243    /// Caller invariants:
3244    /// - `row_range.end <= table.rows.len()` (caller's job to
3245    ///   compute the partition).
3246    /// - All slices passed to `commit_freeze_slices` must cover a
3247    ///   contiguous half-open range `[0, total_max_rows)` with no
3248    ///   gaps and no overlaps. The coordinator validates this
3249    ///   invariant before committing.
3250    ///
3251    /// `&self`-only — multiple workers can run this concurrently
3252    /// against the same `Catalog` reference under the engine's
3253    /// write lock (workers don't mutate; the coordinator does).
3254    pub fn prepare_freeze_slice(
3255        &self,
3256        table_name: &str,
3257        index_name: &str,
3258        row_range: core::ops::Range<usize>,
3259    ) -> Result<FreezeSlice, StorageError> {
3260        let table = self.get(table_name).ok_or_else(|| {
3261            StorageError::Corrupt(format!(
3262                "prepare_freeze_slice: table {table_name:?} not found"
3263            ))
3264        })?;
3265        let idx = table
3266            .indices
3267            .iter()
3268            .find(|i| i.name == index_name)
3269            .ok_or_else(|| {
3270                StorageError::Corrupt(format!(
3271                    "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
3272                ))
3273            })?;
3274        if !matches!(idx.kind, IndexKind::BTree(_)) {
3275            return Err(StorageError::Corrupt(format!(
3276                "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
3277            )));
3278        }
3279        if row_range.end > table.rows.len() {
3280            return Err(StorageError::Corrupt(format!(
3281                "prepare_freeze_slice: row_range end {} > row_count {}",
3282                row_range.end,
3283                table.rows.len()
3284            )));
3285        }
3286        let column_position = idx.column_position;
3287        let schema = table.schema.clone();
3288        let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
3289        for row_idx in row_range.clone() {
3290            let row = table.rows.get(row_idx).expect("bounds-checked above");
3291            let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3292                StorageError::Corrupt(format!(
3293                    "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
3294                ))
3295            })?;
3296            let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3297                StorageError::Corrupt(format!(
3298                    "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
3299                     v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3300                ))
3301            })?;
3302            rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
3303        }
3304        rows.sort_by_key(|(k, _, _)| *k);
3305        Ok(FreezeSlice { row_range, rows })
3306    }
3307
3308    /// v6.7.4 — coordinator commit step. Merges N
3309    /// [`FreezeSlice`]s into one segment via the standard
3310    /// [`encode_segment`] path, atomically swaps the catalog
3311    /// state (delete the union row range + register Cold
3312    /// locators + load the segment).
3313    ///
3314    /// Validates that the slices cover a contiguous, gap-free,
3315    /// overlap-free half-open range starting at index 0 (the
3316    /// freezer always freezes "oldest first" — same semantics as
3317    /// the single-threaded [`Catalog::freeze_oldest_to_cold`]).
3318    ///
3319    /// Empty `slices` → no-op success (returns a zero-row report
3320    /// without mutating). Total row count = `Σ slice.rows.len()`.
3321    pub fn commit_freeze_slices(
3322        &mut self,
3323        table_name: &str,
3324        index_name: &str,
3325        slices: Vec<FreezeSlice>,
3326    ) -> Result<FreezeReport, StorageError> {
3327        // --- validation phase: never mutates ---------------------
3328        let table = self.get(table_name).ok_or_else(|| {
3329            StorageError::Corrupt(format!(
3330                "commit_freeze_slices: table {table_name:?} not found"
3331            ))
3332        })?;
3333        let idx = table
3334            .indices
3335            .iter()
3336            .find(|i| i.name == index_name)
3337            .ok_or_else(|| {
3338                StorageError::Corrupt(format!(
3339                    "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
3340                ))
3341            })?;
3342        if !matches!(idx.kind, IndexKind::BTree(_)) {
3343            return Err(StorageError::Corrupt(format!(
3344                "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
3345            )));
3346        }
3347        // Validate slice coverage: contiguous from 0, no gaps, no
3348        // overlaps. Allow the caller to pass slices in any order —
3349        // sort by row_range.start first.
3350        let mut ordered = slices;
3351        ordered.sort_by_key(|s| s.row_range.start);
3352        // Drop fully-empty slices that fell out of an uneven
3353        // partition; they carry no data but contribute to the
3354        // contiguity check, so keep them in line.
3355        let mut expected_start = 0usize;
3356        for s in &ordered {
3357            if s.row_range.start != expected_start {
3358                return Err(StorageError::Corrupt(format!(
3359                    "commit_freeze_slices: gap/overlap at row {}; expected start {}",
3360                    s.row_range.start, expected_start
3361                )));
3362            }
3363            expected_start = s.row_range.end;
3364        }
3365        let max_rows = expected_start;
3366        if max_rows > table.rows.len() {
3367            return Err(StorageError::Corrupt(format!(
3368                "commit_freeze_slices: total row range {} exceeds row_count {}",
3369                max_rows,
3370                table.rows.len()
3371            )));
3372        }
3373        if max_rows == 0 {
3374            return Ok(FreezeReport {
3375                segment_id: u32::MAX,
3376                frozen_rows: 0,
3377                bytes_freed: 0,
3378                segment_bytes: Vec::new(),
3379            });
3380        }
3381
3382        // --- segment build phase: reads only --------------------
3383        // K-way merge of already-sorted slices. Each slice's rows
3384        // are ascending by pk_u64; we keep a per-slice cursor and
3385        // pull the next-smallest head until every cursor drains.
3386        let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
3387        if total_rows != max_rows {
3388            return Err(StorageError::Corrupt(format!(
3389                "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
3390            )));
3391        }
3392        let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
3393        let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
3394        loop {
3395            // Pick the slice whose head row has the smallest key
3396            // and isn't yet exhausted.
3397            let mut pick: Option<usize> = None;
3398            for (i, c) in cursors.iter().enumerate() {
3399                let slice = &ordered[i];
3400                if *c >= slice.rows.len() {
3401                    continue;
3402                }
3403                match pick {
3404                    None => pick = Some(i),
3405                    Some(j) => {
3406                        if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
3407                            pick = Some(i);
3408                        }
3409                    }
3410                }
3411            }
3412            let Some(i) = pick else { break };
3413            let row = ordered[i].rows[cursors[i]].clone();
3414            cursors[i] += 1;
3415            merged.push(row);
3416        }
3417        // Reject duplicate PKs — same error as the single-threaded
3418        // path so callers get a uniform surface.
3419        for w in merged.windows(2) {
3420            if w[0].0 == w[1].0 {
3421                return Err(StorageError::Corrupt(format!(
3422                    "commit_freeze_slices: duplicate PK {} across slices",
3423                    w[0].0
3424                )));
3425            }
3426        }
3427        let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
3428        let seg_rows: Vec<(u64, Vec<u8>)> =
3429            merged.into_iter().map(|(k, body, _)| (k, body)).collect();
3430        let frozen_rows = seg_rows.len();
3431        let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3432            .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}")))?;
3433
3434        // --- atomic swap phase: mutations only past this point ---
3435        let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3436        let positions: Vec<usize> = (0..max_rows).collect();
3437        let t_mut = self
3438            .get_mut(table_name)
3439            .expect("just validated; still present");
3440        let removed = t_mut.delete_rows(&positions);
3441        debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3442        let bytes_after = t_mut.hot_bytes();
3443        let bytes_freed = bytes_before.saturating_sub(bytes_after);
3444
3445        let segment_id = self
3446            .load_segment_bytes(seg_bytes.clone())
3447            .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
3448        let new_cold = post_swap_keys.into_iter().map(|k| {
3449            (
3450                k,
3451                RowLocator::Cold {
3452                    segment_id,
3453                    page_offset: 0,
3454                },
3455            )
3456        });
3457        let t_mut = self.get_mut(table_name).expect("still present");
3458        t_mut.register_cold_locators(index_name, new_cold)?;
3459
3460        Ok(FreezeReport {
3461            segment_id,
3462            frozen_rows,
3463            bytes_freed,
3464            segment_bytes: seg_bytes,
3465        })
3466    }
3467
3468    /// v6.7.3 — compact every cold segment on `(table, index)` whose
3469    /// `OwnedSegment::bytes().len()` is below `target_segment_bytes`
3470    /// into a single larger merged segment. Rows present in source
3471    /// segment payloads but no longer referenced by any
3472    /// `RowLocator::Cold` on the index (DELETE'd + frozen rows
3473    /// retired via [`Catalog::shadow_cold_row`]) are GC'd in the
3474    /// merge.
3475    ///
3476    /// **Semantics**:
3477    /// 1. Walk the BTree index to collect every Cold locator that
3478    ///    targets a small (< threshold) segment. Each such
3479    ///    `(key, segment_id)` becomes a row in the merged segment;
3480    ///    payload is looked up from the source segment in-place.
3481    /// 2. Encode the collected rows into one new segment via
3482    ///    [`encode_segment`]; register it via
3483    ///    [`Catalog::load_segment_bytes`] (allocating a fresh
3484    ///    `merged_segment_id` at the end of `cold_segments`).
3485    /// 3. Rewrite the BTree index in one pass: every
3486    ///    `RowLocator::Cold { segment_id ∈ sources }` becomes
3487    ///    `RowLocator::Cold { segment_id = merged_id, page_offset = 0 }`.
3488    ///    Hot locators are untouched.
3489    /// 4. Tombstone every source slot via
3490    ///    [`Catalog::tombstone_segment`]. Source segment payloads
3491    ///    are no longer reachable through the catalog; the on-disk
3492    ///    files are the caller's concern.
3493    ///
3494    /// On fewer than 2 candidate segments the catalog is **not**
3495    /// mutated and a no-op report (`merged_segment_id: None`,
3496    /// `sources: []`) is returned. This is the routine case — a
3497    /// freshly-frozen table has at most 1 small segment, no merge
3498    /// possible.
3499    ///
3500    /// Atomicity: every mutating step runs after the read-only
3501    /// gather phase, so a panic before the merge encode leaves the
3502    /// catalog unchanged. The mutation block itself (load + rewrite +
3503    /// tombstone) takes only `&mut self` — callers serialise the
3504    /// engine write lock outside this function.
3505    ///
3506    /// Errors when the table / index doesn't exist, the index isn't
3507    /// `BTree`, the index column type isn't u64-coercible (cold-tier
3508    /// pre-condition), or a source segment fails its in-place
3509    /// row-body lookup (would indicate prior catalog corruption).
3510    pub fn compact_cold_segments(
3511        &mut self,
3512        table_name: &str,
3513        index_name: &str,
3514        target_segment_bytes: u64,
3515    ) -> Result<CompactReport, StorageError> {
3516        // --- validation phase ----------------------------------
3517        let t = self.get(table_name).ok_or_else(|| {
3518            StorageError::Corrupt(format!(
3519                "compact_cold_segments: table {table_name:?} not found"
3520            ))
3521        })?;
3522        let idx = t
3523            .indices
3524            .iter()
3525            .find(|i| i.name == index_name)
3526            .ok_or_else(|| {
3527                StorageError::Corrupt(format!(
3528                    "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
3529                ))
3530            })?;
3531        let map = match &idx.kind {
3532            IndexKind::BTree(m) => m,
3533            IndexKind::Nsw(_)
3534            | IndexKind::Brin { .. }
3535            | IndexKind::Gin(_)
3536            | IndexKind::GinTrgm(_)
3537            | IndexKind::GinFulltext(_) => {
3538                return Err(StorageError::Corrupt(format!(
3539                    "compact_cold_segments: index {index_name:?} is not BTree; \
3540                     compaction applies only to BTree cold-tier indices"
3541                )));
3542            }
3543        };
3544
3545        // --- gather phase --------------------------------------
3546        // Step A: every segment_id this BTree index Cold-references.
3547        let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
3548        for (_key, locators) in map.iter() {
3549            for loc in locators {
3550                if let RowLocator::Cold { segment_id, .. } = loc {
3551                    referenced_ids.insert(*segment_id);
3552                }
3553            }
3554        }
3555        // Step B: keep only the small + still-active ones.
3556        let candidate_set: BTreeSet<u32> = referenced_ids
3557            .into_iter()
3558            .filter(|id| {
3559                self.cold_segments
3560                    .get(*id as usize)
3561                    .and_then(|s| s.as_deref())
3562                    .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
3563            })
3564            .collect();
3565        if candidate_set.len() < 2 {
3566            return Ok(CompactReport {
3567                sources: Vec::new(),
3568                merged_segment_id: None,
3569                merged_segment_bytes: Vec::new(),
3570                merged_rows: 0,
3571                deleted_rows_pruned: 0,
3572                bytes_reclaimed_estimate: 0,
3573            });
3574        }
3575        // Step C: pre-count source rows for the deleted-pruned metric.
3576        let mut source_row_count: usize = 0;
3577        let mut source_byte_total: u64 = 0;
3578        for &id in &candidate_set {
3579            let seg = self.cold_segments[id as usize]
3580                .as_ref()
3581                .expect("candidate selected only when slot is Some");
3582            source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
3583            source_byte_total = source_byte_total.saturating_add(seg.bytes().len() as u64);
3584        }
3585        // Step D: collect (key, body) pairs from every live Cold
3586        // locator pointing at a candidate. dedupe by key — one
3587        // BTree key resolves to at most one cold payload (the
3588        // freezer + promote/shadow flow keeps Cold locators
3589        // unique per key).
3590        let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
3591        for (key, locators) in map.iter() {
3592            for loc in locators {
3593                let RowLocator::Cold { segment_id, .. } = loc else {
3594                    continue;
3595                };
3596                if !candidate_set.contains(segment_id) {
3597                    continue;
3598                }
3599                let u64_key = index_key_as_u64(key).ok_or_else(|| {
3600                    StorageError::Corrupt(format!(
3601                        "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
3602                         cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3603                    ))
3604                })?;
3605                let seg = self.cold_segments[*segment_id as usize]
3606                    .as_ref()
3607                    .expect("candidate slot guaranteed Some above");
3608                let payload = seg.lookup(u64_key).ok_or_else(|| {
3609                    StorageError::Corrupt(format!(
3610                        "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
3611                         at segment {segment_id} but the segment lookup missed"
3612                    ))
3613                })?;
3614                collected.insert(u64_key, (payload, key.clone()));
3615                break;
3616            }
3617        }
3618        let merged_rows = collected.len();
3619        let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
3620
3621        // Step E: encode the merged segment. `BTreeMap<u64, _>`
3622        // iteration is ascending by key, which is what
3623        // `encode_segment` requires.
3624        let seg_rows: Vec<(u64, Vec<u8>)> = collected
3625            .iter()
3626            .map(|(k, (body, _))| (*k, body.clone()))
3627            .collect();
3628        let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3629            .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: encode: {e}")))?;
3630        let merged_bytes_len = seg_bytes.len() as u64;
3631
3632        // --- atomic mutation phase ------------------------------
3633        let merged_segment_id = self
3634            .load_segment_bytes(seg_bytes.clone())
3635            .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
3636
3637        // Rewrite the BTree index: every Cold locator pointing at
3638        // a candidate source becomes a Cold locator pointing at
3639        // the merged segment. Use a flat collect-then-replace
3640        // pattern so we never hold a `&self` borrow across the
3641        // `&mut self` write.
3642        let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
3643            let t = self
3644                .get(table_name)
3645                .expect("table existed at the start of this fn");
3646            let idx = t
3647                .indices
3648                .iter()
3649                .find(|i| i.name == index_name)
3650                .expect("index existed at the start of this fn");
3651            let IndexKind::BTree(map) = &idx.kind else {
3652                unreachable!("validated above");
3653            };
3654            map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
3655        };
3656        let t_mut = self
3657            .get_mut(table_name)
3658            .expect("table existed at the start of this fn");
3659        let idx_mut = t_mut
3660            .indices
3661            .iter_mut()
3662            .find(|i| i.name == index_name)
3663            .expect("index existed at the start of this fn");
3664        let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
3665            unreachable!("validated above");
3666        };
3667        for (key, locators) in entries {
3668            let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
3669            let mut changed = false;
3670            for loc in &locators {
3671                match *loc {
3672                    RowLocator::Cold {
3673                        segment_id,
3674                        page_offset: _,
3675                    } if candidate_set.contains(&segment_id) => {
3676                        let replacement = RowLocator::Cold {
3677                            segment_id: merged_segment_id,
3678                            page_offset: 0,
3679                        };
3680                        if !new_locs.contains(&replacement) {
3681                            new_locs.push(replacement);
3682                        }
3683                        changed = true;
3684                    }
3685                    other => new_locs.push(other),
3686                }
3687            }
3688            if changed {
3689                map_mut.insert_mut(key, new_locs);
3690            }
3691        }
3692
3693        // Tombstone every source slot. Last step — failures here
3694        // would leave the segment double-referenced in both
3695        // memory + manifest, but `tombstone_segment` only errors
3696        // on out-of-bounds, which we've already validated.
3697        for &id in &candidate_set {
3698            self.tombstone_segment(id)?;
3699        }
3700
3701        let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
3702        Ok(CompactReport {
3703            sources: candidate_set.into_iter().collect(),
3704            merged_segment_id: Some(merged_segment_id),
3705            merged_segment_bytes: seg_bytes,
3706            merged_rows,
3707            deleted_rows_pruned,
3708            bytes_reclaimed_estimate,
3709        })
3710    }
3711
3712    /// Internal helper: scan `(table, index)` for a `Cold` locator
3713    /// keyed by `key`. Returns `Ok(Some((segment_id, page_offset)))`
3714    /// when found, `Ok(None)` when the key has only hot entries
3715    /// or no entries at all, `Err` on the same input-validation
3716    /// errors as the public `promote_cold_row` / `shadow_cold_row`.
3717    fn find_cold_locator(
3718        &self,
3719        table_name: &str,
3720        index_name: &str,
3721        key: &IndexKey,
3722    ) -> Result<Option<(u32, u32)>, StorageError> {
3723        let t = self.get(table_name).ok_or_else(|| {
3724            StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
3725        })?;
3726        let idx = t
3727            .indices
3728            .iter()
3729            .find(|i| i.name == index_name)
3730            .ok_or_else(|| {
3731                StorageError::Corrupt(format!(
3732                    "find_cold_locator: index {index_name:?} not found on {table_name:?}"
3733                ))
3734            })?;
3735        if !matches!(idx.kind, IndexKind::BTree(_)) {
3736            return Err(StorageError::Corrupt(format!(
3737                "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
3738            )));
3739        }
3740        for loc in idx.lookup_eq(key) {
3741            if let RowLocator::Cold {
3742                segment_id,
3743                page_offset,
3744            } = *loc
3745            {
3746                return Ok(Some((segment_id, page_offset)));
3747            }
3748        }
3749        Ok(None)
3750    }
3751}
3752
3753/// Coerce an [`IndexKey`] to the `u64` that v5.1 cold-tier
3754/// segments use as their on-disk PK. Returns `None` for keys that
3755/// aren't representable as `u64` — Text PKs need a hash mapping
3756/// the segment writer baked in (deferred to v5.2+), Bool PKs are
3757/// almost never wide enough to be sharded into a cold tier.
3758fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
3759    match key {
3760        // Reinterpret the i64 bit pattern as u64. Cold-tier segments
3761        // are sorted by this u64 view, so the chosen interpretation
3762        // only has to match between insert (bake_segment / freezer)
3763        // and lookup — using cast_unsigned keeps both sides honest
3764        // and silences clippy::cast_sign_loss.
3765        IndexKey::Int(n) => Some(n.cast_unsigned()),
3766        // Text / Bool / Uuid PKs aren't representable as u64 and so
3767        // can't participate in the u64-sorted cold-tier segment
3768        // PK layout. Same deferral story as Text — lookup falls
3769        // through the in-memory btree.
3770        IndexKey::Text(_) | IndexKey::Bool(_) | IndexKey::Uuid(_) => None,
3771    }
3772}
3773
3774#[derive(Debug, Clone, PartialEq, Eq)]
3775#[non_exhaustive]
3776pub enum StorageError {
3777    DuplicateTable {
3778        name: String,
3779    },
3780    TableNotFound {
3781        name: String,
3782    },
3783    ArityMismatch {
3784        expected: usize,
3785        actual: usize,
3786    },
3787    TypeMismatch {
3788        column: String,
3789        expected: DataType,
3790        actual: DataType,
3791        position: usize,
3792    },
3793    NullInNotNull {
3794        column: String,
3795    },
3796    /// Index with this name already exists on the table.
3797    DuplicateIndex {
3798        name: String,
3799    },
3800    /// Column referenced by an index doesn't exist on the table.
3801    ColumnNotFound {
3802        column: String,
3803    },
3804    /// On-disk format failed to parse — corrupted file, wrong magic, truncated
3805    /// payload, or unknown tag bytes.
3806    Corrupt(String),
3807    /// v6.0.4 — ALTER INDEX targeted an index name that doesn't
3808    /// exist on any table in this catalog.
3809    IndexNotFound {
3810        name: String,
3811    },
3812    /// v6.0.4 — operation requested isn't supported on this index
3813    /// kind / column type (e.g. ALTER INDEX REBUILD on a `BTree`
3814    /// index, or REBUILD WITH (encoding=…) on a non-vector column).
3815    Unsupported(String),
3816}
3817
3818impl fmt::Display for StorageError {
3819    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3820        match self {
3821            Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
3822            Self::TableNotFound { name } => write!(f, "table not found: {name}"),
3823            Self::ArityMismatch { expected, actual } => write!(
3824                f,
3825                "row arity mismatch: expected {expected} columns, got {actual}"
3826            ),
3827            Self::TypeMismatch {
3828                column,
3829                expected,
3830                actual,
3831                position,
3832            } => write!(
3833                f,
3834                "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
3835            ),
3836            Self::NullInNotNull { column } => {
3837                write!(f, "NULL value in NOT NULL column {column:?}")
3838            }
3839            Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
3840            Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
3841            Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
3842            Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
3843            Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
3844        }
3845    }
3846}
3847
3848impl ColumnSchema {
3849    pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
3850        Self {
3851            name: name.into(),
3852            ty,
3853            nullable,
3854            default: None,
3855            runtime_default: None,
3856            auto_increment: false,
3857            user_enum_type: None,
3858            user_domain_type: None,
3859            on_update_runtime: None,
3860            collation: Collation::Binary,
3861            is_unsigned: false,
3862            inline_enum_variants: None,
3863            inline_set_variants: None,
3864        }
3865    }
3866
3867    /// Builder-style helper to attach a default value to an otherwise
3868    /// plain column schema. Used by the engine when CREATE TABLE
3869    /// specifies `column TYPE DEFAULT <expr>`.
3870    #[must_use]
3871    pub fn with_default(mut self, default: Value) -> Self {
3872        self.default = Some(default);
3873        self
3874    }
3875
3876    /// v7.9.21 — builder for runtime-evaluated defaults
3877    /// (`DEFAULT now()`, `DEFAULT CURRENT_TIMESTAMP`, …).
3878    /// `expr` is the Expr's `Display` form, re-parsed by the
3879    /// engine at each INSERT.
3880    #[must_use]
3881    pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
3882        self.runtime_default = Some(expr.into());
3883        self
3884    }
3885
3886    /// Builder-style helper to mark a column as `AUTO_INCREMENT`.
3887    #[must_use]
3888    pub const fn with_auto_increment(mut self) -> Self {
3889        self.auto_increment = true;
3890        self
3891    }
3892}
3893
3894impl TableSchema {
3895    pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
3896        Self {
3897            name: name.into(),
3898            columns,
3899            hot_tier_bytes: None,
3900            foreign_keys: Vec::new(),
3901            uniqueness_constraints: Vec::new(),
3902            checks: Vec::new(),
3903        }
3904    }
3905}
3906
3907// =========================================================================
3908// Persistent binary format for the catalog.
3909//
3910// Layout (little-endian throughout):
3911//
3912//   [magic "SPGDB001" 8 bytes][version u8]
3913//   [table_count u32]
3914//   for each table:
3915//       [name_len u16][name bytes]
3916//       [col_count u16]
3917//       for each col:
3918//           [name_len u16][name bytes]
3919//           [type_tag u8 + optional payload]
3920//               1=Int 2=BigInt 3=Float 4=Text 5=Bool
3921//               6=Vector(u32 dim)
3922//               7=SmallInt
3923//               8=Varchar(u32 max)
3924//               9=Char(u32 size)
3925//               10=Numeric(u8 precision, u8 scale)
3926//               11=Date
3927//               12=Timestamp
3928//           [nullable u8]   0/1
//           [default_tag u8] 0=none 1=value (followed by [value_tag u8] + bytes)
//           [auto_increment u8] 0/1 (version 4+)
3930//       [row_count u32]
3931//       for each row, for each col, one [value_tag u8] + value bytes:
3932//           tag 0 (Null)     → no body
3933//           tag 1 (Int)      → i32 LE
3934//           tag 2 (BigInt)   → i64 LE
3935//           tag 3 (Float)    → f64 LE
3936//           tag 4 (Text)     → u16 LE len + UTF-8 bytes
3937//           tag 5 (Bool)     → u8 0/1
3938//           tag 6 (Vector)   → u32 LE dim + dim×f32 LE
3939//           tag 7 (SmallInt) → i16 LE
3940//           tag 8 (Numeric)  → i128 LE (16 bytes) + u8 scale
3941//           tag 9 (Date)     → i32 LE (days since Unix epoch)
3942//           tag 10 (Timestamp) → i64 LE (microseconds since Unix epoch)
3943//
3944// Bumped to version 3 when NUMERIC was added; to version 4 when
3945// AUTO_INCREMENT (per-column flag) + NSW index `kind` byte landed;
3946// to version 5 when DATE / TIMESTAMP were added; to version 6 when
3947// NSW graph topology started travelling on disk (v2.7); to version 7
3948// when the NSW topology became multi-layer HNSW (v2.13); to version 8
3949// when row encoding switched to schema-driven dense layout (v3.0.2 —
3950// per-row NULL bitmap + per-column fixed-width body, no per-cell type
3951// tag).
3952// =========================================================================
3953
3954const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
3955/// Current catalog snapshot format version emitted by [`Catalog::serialize`].
3956///
3957/// v9 (v5.2) extends v8 by serialising `BTree` index entries directly — every
3958/// `(IndexKey, Vec<RowLocator>)` pair travels on disk with the v5.1
3959/// `RowLocator::write_le` tag-prefixed codec. v8 `BTree` indices stored no
3960/// entries at all (the map was rebuilt from `Table::rows` on load); v9
3961/// preserves on-disk Cold locators so freezer-produced cold-tier index
3962/// entries survive a catalog snapshot round-trip. v8 readers are accepted
3963/// by version dispatch in [`Catalog::deserialize`] — every entry decodes
3964/// as `RowLocator::Hot(_)` via `add_index` rebuild, identical to v5.1
3965/// behaviour.
3966/// v6.7.2 — bumped from 10 to 11 to append per-table
3967/// `hot_tier_bytes: Option<u64>` after the per-table indices
3968/// section. v10 catalogs (v6.7.1) load with `hot_tier_bytes =
3969/// None` for every table (the deserialiser short-circuits when
/// version < 11). v11 snapshots opened by a pre-v6.7.2 binary
/// fail loudly at the version check, matching the v6.1.2 /
/// v6.1.4 / v6.2.0 / v6.7.1 envelope-bump upgrade fences.
3973///
3974/// v6.8.0 — bumped from 11 to 12: per-index
3975/// `included_columns: Vec<u16>` appended at the tail of each
3976/// index payload. v11 (= v6.7.2) catalogs load with
3977/// `included_columns = Vec::new()` for every index — same
3978/// "older readers, append-only extension" pattern as the v6.7.2
3979/// hot_tier_bytes byte.
3980/// v7.13.0 — bumped from 22 to 23. mailrs round-5 G3 / G10.
3981/// Per-table appendix gains two new sections:
3982///   * `checks: Vec<String>` — CHECK predicate sources (Display
3983///     form of the AST Expr); re-parsed on INSERT/UPDATE to
3984///     enforce against candidate rows. Same persistence pattern
3985///     as `Index::partial_predicate`.
3986///   * Per `UniquenessConstraint`: trailing `nulls_not_distinct:
3987///     u8` flag for PG 15+ `UNIQUE NULLS NOT DISTINCT (cols)`
3988///     semantics.
3989/// v22 catalogs deserialise with empty `checks` and every UC
3990/// at `nulls_not_distinct = false`.
3991/// v24 introduces:
3992///   * Index kind tag 4 = trigram-GIN (`gin_trgm_ops`-flavoured
3993///     `USING gin` over a TEXT/VARCHAR column). Payload shape is
3994///     identical to tag-3 GIN (String → Vec<RowLocator>); the
3995///     keys are PG-compatible 3-byte trigram shingles instead of
3996///     tsvector lexemes. v23 catalogs deserialise unchanged — no
3997///     v23 writer ever emitted tag 4.
3998/// v25 introduces:
3999///   * Per `TriggerDef`: trailing `enabled: u8` flag (mailrs
4000///     round-9 A.2.b — `ALTER TABLE … { ENABLE | DISABLE }
4001///     TRIGGER …`). v24 catalogs deserialise with every trigger
4002///     `enabled = true`, matching pre-v7.16.1 behaviour.
4003/// v26 introduces (v7.17.0 Phase 1.1):
4004///   * Trailing SEQUENCE catalog block after triggers. Encoded
4005///     as `u32 count` followed by per-sequence:
4006///     `name`, `data_type: u8` (0=SmallInt,1=Int,2=BigInt),
4007///     `start i64`, `increment i64`, `min_value i64`,
4008///     `max_value i64`, `cache i64`, `cycle u8`,
4009///     `owned_by_tag u8` (0=NONE, 1=Column → `table`,`column`),
4010///     `last_value i64`, `is_called u8`. v25-and-below catalogs
4011///     deserialise with an empty sequences map.
4012/// v27 introduces (v7.17.0 Phase 1.2):
4013///   * Trailing VIEW catalog block after sequences. Encoded as
4014///     `u32 count` followed by per-view:
4015///     `name`, `column_count u16`, then column names, then
4016///     `body` long-string. v26-and-below catalogs deserialise
4017///     with an empty views map.
4018/// v28 introduces (v7.17.0 Phase 1.3):
4019///   * Trailing MATERIALIZED VIEW source registry block after
4020///     views. Encoded as `u32 count` followed by per-entry:
4021///     `name`, `body` long-string. The materialised rows live
4022///     as a regular Table of the same name (already covered by
4023///     the pre-existing tables block). v27-and-below catalogs
4024///     deserialise with an empty map.
4025/// v29 introduces (v7.17.0 Phase 1.4):
4026///   * Per-table user_enum_type appendix (after the CHECK
4027///     appendix). Layout: `u16 count` followed by per-binding
4028///     `[u16 col_pos][str enum_name]`. Only columns whose
4029///     `user_enum_type` is Some land here; the catalog stays
4030///     compact for the common no-enum case.
4031///   * Trailing ENUM types catalog block after materialized
4032///     views. Encoded as `u32 count` followed by per-entry:
4033///     `name`, `u16 label_count`, then `label_count` short
4034///     strings. v28-and-below catalogs deserialise with an
4035///     empty enum_types map and every column's
4036///     `user_enum_type = None`.
4037/// v30 introduces (v7.17.0 Phase 1.5):
4038///   * Per-table user_domain_type appendix (after the
4039///     user_enum_type appendix). Same shape as the enum one.
4040///   * Trailing DOMAIN types catalog block after the enum
4041///     block. Encoded as `u32 count` followed by per-entry:
4042///     `name`, `data_type` byte, `nullable u8`,
4043///     `default_present u8` + optional default string,
4044///     `u16 check_count` then `check_count` Display-form
4045///     CHECK strings. v29-and-below catalogs deserialise with
4046///     an empty domain_types map and `user_domain_type = None`.
4047/// v31 introduces (v7.17.0 Phase 1.6):
4048///   * Trailing user-schemas block after the DOMAIN block.
4049///     Encoded as `u32 count` followed by `count` schema-name
4050///     short strings. Built-in schemas (`public`, `pg_catalog`,
4051///     `information_schema`) are NOT serialised — they're
4052///     hardcoded in `is_builtin_schema`. v30-and-below catalogs
4053///     deserialise with an empty user-schemas set.
4054/// v32 introduces (v7.17.0 Phase 2.1):
4055///   * Per-table on_update_runtime appendix (after the
4056///     user_domain_type appendix). Layout: `u16 count` followed
4057///     by per-binding `[u16 col_pos][str expr_src]`. Only
4058///     columns whose `on_update_runtime` is Some land here;
4059///     the catalog stays compact when no MySQL-shaped table
4060///     uses the attribute. v31-and-below catalogs deserialise
4061///     with every column's `on_update_runtime = None`.
4062/// v33 introduces (v7.17.0 Phase 2.2):
4063///   * Index kind tag 5 = fulltext-GIN (MySQL `FULLTEXT KEY`
4064///     surface over a TEXT / VARCHAR column). Payload shape is
4065///     identical to tag-3 / tag-4 GIN (`String → Vec<RowLocator>`);
4066///     the keys are lower-cased word lexemes (same rule as
4067///     `to_tsvector('simple', text)`). v32 catalogs deserialise
4068///     unchanged — no v32 writer ever emitted tag 5, and FULLTEXT
4069///     KEY was silently dropped pre-v7.17 so no rebuild shim is
4070///     needed for round-tripped catalogs.
4071/// v34 introduces (v7.17.0 Phase 2.5):
4072///   * Per-table collation appendix (after the on_update_runtime
4073///     appendix). Sparse layout: only columns whose `collation`
4074///     is non-Binary land here. `u16 count` then per-binding
4075///     `[u16 col_pos][u8 collation_tag]` where the tag matches
///     `Collation::TAG_*`. Snapshots written by v33-and-below
///     binaries deserialise with every column at `collation =
///     Binary`, preserving the prior byte-wise compare
4079///     semantics. Unknown tags read back as Binary too — keeps
4080///     a forward-compat path if a future v35 adds variants
4081///     and someone rolls back to a v34 reader.
4082/// v35 introduces (v7.17.0 Phase 4.4):
4083///   * Per-table is_unsigned appendix (after the collation
4084///     appendix). Sparse layout: only `is_unsigned = true`
4085///     columns land. `u16 count` then per-binding `[u16 col_pos]`.
4086///     v34-and-below catalogs deserialise every column as
4087///     `is_unsigned = false`, preserving the prior silent-
4088///     accept behaviour for negative inserts on UNSIGNED columns.
4089/// v46 introduces (v7.23, mailrs round-14):
4090///   * Escaped short-string codec — `write_str` lengths >= 0xFFFF
4091///     emit `[u16 0xFFFF][u32 real_len]` so TEXT cells (mail bodies,
4092///     document text) above 64 KiB encode instead of panicking.
4093///     One-way upgrade: v45-and-below readers reject v46 catalogs
4094///     loudly via the version gate; v46 readers decode v45 catalogs
4095///     with the plain-u16 rules (0xFFFF is a legitimate length
4096///     there).
4097/// v47 introduces (v7.27, mailrs round-21):
4098///   * Escaped lengths for the REMAINING u16-length cell payloads —
4099///     BYTEA cells, TEXT[] elements, tsvector lexemes and tsquery
4100///     terms — the same `[u16 0xFFFF][u32 real_len]` escape v46
4101///     gave short strings. Round-14 fixed TEXT and missed these;
4102///     round-21 fired the BYTEA twin during a production migration.
4103///     One-way upgrade, same posture as v46.
4104const FILE_VERSION: u8 = 47;
4105/// Oldest format version [`Catalog::deserialize`] still accepts. v8 is the
4106/// v3.0.2 dense-row layout; pre-v8 catalogs require an offline migration.
4107const MIN_SUPPORTED_FILE_VERSION: u8 = 8;
4108
// IndexKey wire format (v9; tag 3 is persisted only in v36+ catalogs):
//   tag 0 = Int  → [i64 LE]
//   tag 1 = Text → [u16 LE len + UTF-8 bytes] (via write_str / read_str)
//   tag 2 = Bool → [u8 0/1]
//   tag 3 = Uuid → [16 raw bytes, RFC 4122 byte order]
4113const INDEX_KEY_TAG_INT: u8 = 0;
4114const INDEX_KEY_TAG_TEXT: u8 = 1;
4115const INDEX_KEY_TAG_BOOL: u8 = 2;
4116/// v7.17.0 — `IndexKey::Uuid([u8; 16])`. Body = raw 16 bytes
4117/// (RFC 4122 byte order). Persisted only in FILE_VERSION 36+
4118/// catalogs.
4119const INDEX_KEY_TAG_UUID: u8 = 3;
4120
4121impl Catalog {
4122    /// Serialize the whole catalog (schema + every row) into a self-contained
4123    /// byte buffer. Format is documented above the impl block.
4124    pub fn serialize(&self) -> Vec<u8> {
4125        let mut out = Vec::with_capacity(64);
4126        out.extend_from_slice(FILE_MAGIC);
4127        out.push(FILE_VERSION);
4128        write_u32(
4129            &mut out,
4130            u32::try_from(self.tables.len()).expect("≤ 4G tables"),
4131        );
4132        for t in &self.tables {
4133            write_str(&mut out, &t.schema.name);
4134            write_u16(
4135                &mut out,
4136                u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
4137            );
4138            for c in &t.schema.columns {
4139                write_str(&mut out, &c.name);
4140                write_data_type(&mut out, c.ty);
4141                out.push(u8::from(c.nullable));
4142                match &c.default {
4143                    None => out.push(0),
4144                    Some(v) => {
4145                        out.push(1);
4146                        write_value(&mut out, v);
4147                    }
4148                }
4149                out.push(u8::from(c.auto_increment));
4150            }
4151            write_u32(
4152                &mut out,
4153                u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
4154            );
4155            // v3.0.2 dense row encoding (FILE_VERSION 8): per-row NULL
4156            // bitmap, then tightly-packed bodies. Identical wire format
4157            // as before — extracted into `encode_row_body_dense` so cold-
4158            // tier segments (v5.1+) can share the encoding.
4159            for row in &t.rows {
4160                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
4161            }
4162            // Index definitions. Per-index payload:
4163            //   [name][col_pos u16][kind u8]
4164            //     kind 0 = B-tree           (no params — rebuilt on load)
4165            //     kind 1 = NSW graph        (u16 M + serialized graph)
4166            // For NSW the graph topology travels on disk so startup
4167            // doesn't re-run the O(n²M) rebuild — see v2.7 notes.
4168            write_u16(
4169                &mut out,
4170                u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
4171            );
4172            for idx in &t.indices {
4173                write_str(&mut out, &idx.name);
4174                write_u16(
4175                    &mut out,
4176                    u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
4177                );
4178                match &idx.kind {
4179                    IndexKind::BTree(map) => {
4180                        out.push(0);
4181                        // v9: serialise the full PB map. Each entry's
4182                        // RowLocator list travels with the tag-prefixed
4183                        // codec from `row_locator::write_le`, so freezer-
4184                        // produced Cold locators survive a snapshot
4185                        // round-trip. v8 BTree wrote nothing here and
4186                        // rebuilt from rows — v9 readers tolerate v8 by
4187                        // version dispatch in `Catalog::deserialize`.
4188                        write_u32(
4189                            &mut out,
4190                            u32::try_from(map.len()).expect("≤ 4G index entries/index"),
4191                        );
4192                        for (key, locators) in map {
4193                            write_index_key(&mut out, key);
4194                            write_u32(
4195                                &mut out,
4196                                u32::try_from(locators.len()).expect("≤ 4G locators/key"),
4197                            );
4198                            for loc in locators {
4199                                loc.write_le(&mut out);
4200                            }
4201                        }
4202                    }
4203                    IndexKind::Nsw(g) => {
4204                        out.push(1);
4205                        write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
4206                        write_nsw_graph(&mut out, g);
4207                    }
4208                    IndexKind::Brin { column_type } => {
4209                        // v6.7.1 — tag byte 2 = BRIN. Payload is the
4210                        // column type code (1 byte mapping to the
4211                        // shared DataType numeric encoding); no
4212                        // further data — BRIN summaries live in
4213                        // cold segments, not the catalog.
4214                        out.push(2);
4215                        write_data_type(&mut out, *column_type);
4216                    }
4217                    IndexKind::Gin(map) => {
4218                        // v7.12.3 — tag byte 3 = GIN. Payload mirrors
4219                        // the BTree encoding but with String (lexeme
4220                        // word) keys instead of IndexKey. Tag-prefixed
4221                        // RowLocator codec so freezer-produced Cold
4222                        // locators survive snapshot round-trip.
4223                        // FILE_VERSION 21+; v20 catalogs never wrote a
4224                        // GIN index (the AM degraded to BTree fallback
4225                        // pre-v7.12.3), so no migration shim is needed.
4226                        out.push(3);
4227                        write_u32(
4228                            &mut out,
4229                            u32::try_from(map.len()).expect("≤ 4G GIN posting lists"),
4230                        );
4231                        for (word, locators) in map {
4232                            write_str(&mut out, word);
4233                            write_u32(
4234                                &mut out,
4235                                u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
4236                            );
4237                            for loc in locators {
4238                                loc.write_le(&mut out);
4239                            }
4240                        }
4241                    }
4242                    IndexKind::GinTrgm(map) => {
4243                        // v7.15.0 — tag byte 4 = GinTrgm
4244                        // (`gin_trgm_ops` GIN over a TEXT column).
4245                        // Payload shape is identical to tag-3 GIN —
4246                        // `String → Vec<RowLocator>` posting lists.
4247                        // The String keys are 3-byte trigrams instead
4248                        // of tsvector lexemes; the deserializer
4249                        // dispatches on the tag, not the key shape.
4250                        // FILE_VERSION 24+; v23 catalogs never wrote
4251                        // a trigram-GIN.
4252                        out.push(4);
4253                        write_u32(
4254                            &mut out,
4255                            u32::try_from(map.len()).expect("≤ 4G trigram-GIN posting lists"),
4256                        );
4257                        for (tri, locators) in map {
4258                            write_str(&mut out, tri);
4259                            write_u32(
4260                                &mut out,
4261                                u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
4262                            );
4263                            for loc in locators {
4264                                loc.write_le(&mut out);
4265                            }
4266                        }
4267                    }
4268                    IndexKind::GinFulltext(map) => {
4269                        // v7.17.0 Phase 2.2 — tag byte 5 =
4270                        // GinFulltext (MySQL `FULLTEXT KEY` GIN
4271                        // over a TEXT/VARCHAR column). Payload
4272                        // shape mirrors tag-3 / tag-4 GIN —
4273                        // `String → Vec<RowLocator>` posting
4274                        // lists keyed by lower-cased word
4275                        // lexemes. FILE_VERSION 33+; v32 catalogs
4276                        // never wrote a fulltext-GIN (FULLTEXT
4277                        // KEY was silently dropped pre-v7.17).
4278                        out.push(5);
4279                        write_u32(
4280                            &mut out,
4281                            u32::try_from(map.len()).expect("≤ 4G fulltext-GIN posting lists"),
4282                        );
4283                        for (lex, locators) in map {
4284                            write_str(&mut out, lex);
4285                            write_u32(
4286                                &mut out,
4287                                u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
4288                            );
4289                            for loc in locators {
4290                                loc.write_le(&mut out);
4291                            }
4292                        }
4293                    }
4294                }
4295                // v6.8.0 — included_columns appendix per index.
4296                // Layout: [u16 num_included][num × u16 column_position].
4297                // v11 readers stop before this u16 (deserialise loop
4298                // gated on version >= 12); v12+ readers always
4299                // consume it. Empty Vec serialises as a bare 0u16.
4300                write_u16(
4301                    &mut out,
4302                    u16::try_from(idx.included_columns.len()).expect("≤ 65k INCLUDE columns/index"),
4303                );
4304                for col_pos in &idx.included_columns {
4305                    write_u16(
4306                        &mut out,
4307                        u16::try_from(*col_pos).expect("≤ 65k columns/table"),
4308                    );
4309                }
4310                // v6.8.1 — partial_predicate appendix per index.
4311                // Layout: [u8 has_pred][u16 LE len][bytes (if has_pred)].
4312                // Same v12 gate as included_columns.
4313                match &idx.partial_predicate {
4314                    None => out.push(0),
4315                    Some(pred) => {
4316                        out.push(1);
4317                        write_str(&mut out, pred);
4318                    }
4319                }
4320                // v6.8.2 — expression appendix. Same shape as
4321                // partial_predicate.
4322                match &idx.expression {
4323                    None => out.push(0),
4324                    Some(expr) => {
4325                        out.push(1);
4326                        write_str(&mut out, expr);
4327                    }
4328                }
4329                // v7.9.29 — is_unique appendix (FILE_VERSION 16+).
4330                // Single byte 0/1. v15-and-below readers stop before
4331                // this byte; v16 readers always consume it. mailrs K1.
4332                out.push(u8::from(idx.is_unique));
4333                // v7.9.29 — extra_column_positions appendix.
4334                // Layout: [u16 count][count × u16 column_position].
4335                write_u16(
4336                    &mut out,
4337                    u16::try_from(idx.extra_column_positions.len())
4338                        .expect("≤ 65k extra cols / index"),
4339                );
4340                for cp in &idx.extra_column_positions {
4341                    write_u16(&mut out, u16::try_from(*cp).expect("≤ 65k columns/table"));
4342                }
4343            }
4344            // v6.7.2 — per-table hot_tier_bytes Option<u64>.
4345            // Layout: [u8 has_value][u64 LE value (if has_value)].
4346            // v10 readers stop before this byte (deserialise loop
4347            // gated on version >= 11); v11+ readers always
4348            // consume it.
4349            match t.schema.hot_tier_bytes {
4350                None => out.push(0),
4351                Some(n) => {
4352                    out.push(1);
4353                    out.extend_from_slice(&n.to_le_bytes());
4354                }
4355            }
4356            // v7.6.1 — FOREIGN KEY appendix (catalog FILE_VERSION 13+).
4357            // Layout: [u16 LE fk_count]
4358            //   per fk:
4359            //     [u8 has_name] [str name (if has_name)]
4360            //     [u16 LE local_arity] [u16 LE local_pos]*arity
4361            //     [str parent_table]
4362            //     [u16 LE parent_arity] [u16 LE parent_pos]*arity
4363            //     [u8 on_delete_tag] [u8 on_update_tag]
4364            // Older catalogs (v12 and below) skip this block entirely;
4365            // their reader stops before this byte.
4366            write_u16(
4367                &mut out,
4368                u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
4369            );
4370            for fk in &t.schema.foreign_keys {
4371                match &fk.name {
4372                    None => out.push(0),
4373                    Some(n) => {
4374                        out.push(1);
4375                        write_str(&mut out, n);
4376                    }
4377                }
4378                write_u16(
4379                    &mut out,
4380                    u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
4381                );
4382                for &p in &fk.local_columns {
4383                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
4384                }
4385                write_str(&mut out, &fk.parent_table);
4386                write_u16(
4387                    &mut out,
4388                    u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
4389                );
4390                for &p in &fk.parent_columns {
4391                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
4392                }
4393                out.push(fk.on_delete.tag());
4394                out.push(fk.on_update.tag());
4395            }
4396            // v7.9.19 — UniquenessConstraint appendix (catalog
4397            // FILE_VERSION 15+). Layout per table after the FK
4398            // block:
4399            //   [u16 count]
4400            //     per constraint:
4401            //       [u8 is_primary_key]
4402            //       [u16 arity][u16 col_pos]*arity
4403            // Older catalogs (v14 and below) skip this block.
4404            write_u16(
4405                &mut out,
4406                u16::try_from(t.schema.uniqueness_constraints.len())
4407                    .expect("≤ 65k uniqueness constraints/table"),
4408            );
4409            for uc in &t.schema.uniqueness_constraints {
4410                out.push(u8::from(uc.is_primary_key));
4411                write_u16(
4412                    &mut out,
4413                    u16::try_from(uc.columns.len()).expect("≤ 65k cols in uniqueness constraint"),
4414                );
4415                for &p in &uc.columns {
4416                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
4417                }
4418                // v7.13.0 — `nulls_not_distinct` flag
4419                // (FILE_VERSION 23+). Always written by writers at
4420                // version 23+; deserialise gates on `version >= 23`
4421                // so v22-and-below catalogs round-trip cleanly.
4422                out.push(u8::from(uc.nulls_not_distinct));
4423            }
4424            // v7.9.21 — runtime_default appendix per table.
4425            // Layout: [u16 count] then for each:
4426            //   [u16 col_pos][str expr]
4427            // Only columns whose runtime_default is Some land here;
4428            // catalog stays compact for the common literal-default
4429            // case.
4430            let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
4431            for (i, c) in t.schema.columns.iter().enumerate() {
4432                if let Some(e) = &c.runtime_default {
4433                    rt_defaults.push((i, e.as_str()));
4434                }
4435            }
4436            write_u16(
4437                &mut out,
4438                u16::try_from(rt_defaults.len()).expect("≤ 65k runtime defaults/table"),
4439            );
4440            for (pos, expr) in rt_defaults {
4441                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
4442                write_str(&mut out, expr);
4443            }
4444            // v7.13.0 — CHECK constraint appendix per table.
4445            // Layout: [u16 count] then `count` Display-form
4446            // expression strings. Re-parsed on every INSERT/UPDATE
4447            // by the engine. FILE_VERSION 23+ only; v22 readers
4448            // never reach this block because the writer also moves
4449            // to v23 in lock-step.
4450            write_u16(
4451                &mut out,
4452                u16::try_from(t.schema.checks.len()).expect("≤ 65k CHECK constraints/table"),
4453            );
4454            for c in &t.schema.checks {
4455                write_str(&mut out, c.as_str());
4456            }
4457            // v7.17.0 Phase 1.4 — per-table user_enum_type
4458            // appendix. Layout: [u16 count] then
4459            // [u16 col_pos][str enum_name] per binding. Only
4460            // columns whose user_enum_type is Some land here.
4461            let mut enum_bindings: Vec<(usize, &str)> = Vec::new();
4462            for (i, c) in t.schema.columns.iter().enumerate() {
4463                if let Some(e) = &c.user_enum_type {
4464                    enum_bindings.push((i, e.as_str()));
4465                }
4466            }
4467            write_u16(
4468                &mut out,
4469                u16::try_from(enum_bindings.len()).expect("≤ 65k enum-typed columns/table"),
4470            );
4471            for (pos, ename) in enum_bindings {
4472                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
4473                write_str(&mut out, ename);
4474            }
4475            // v7.17.0 Phase 1.5 — per-table user_domain_type
4476            // appendix. Same layout as the enum one. v29-and-
4477            // below readers stop after the enum appendix.
4478            let mut domain_bindings: Vec<(usize, &str)> = Vec::new();
4479            for (i, c) in t.schema.columns.iter().enumerate() {
4480                if let Some(d) = &c.user_domain_type {
4481                    domain_bindings.push((i, d.as_str()));
4482                }
4483            }
4484            write_u16(
4485                &mut out,
4486                u16::try_from(domain_bindings.len()).expect("≤ 65k domain-typed columns/table"),
4487            );
4488            for (pos, dname) in domain_bindings {
4489                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
4490                write_str(&mut out, dname);
4491            }
4492            // v7.17.0 Phase 2.1 — per-table on_update_runtime
4493            // appendix. Sparse: only ON UPDATE-bound columns.
4494            let mut on_update_bindings: Vec<(usize, &str)> = Vec::new();
4495            for (i, c) in t.schema.columns.iter().enumerate() {
4496                if let Some(e) = &c.on_update_runtime {
4497                    on_update_bindings.push((i, e.as_str()));
4498                }
4499            }
4500            write_u16(
4501                &mut out,
4502                u16::try_from(on_update_bindings.len()).expect("≤ 65k ON UPDATE columns/table"),
4503            );
4504            for (pos, expr_src) in on_update_bindings {
4505                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
4506                write_str(&mut out, expr_src);
4507            }
4508            // v7.17.0 Phase 2.5 — per-table collation appendix.
4509            // Sparse: only non-Binary columns land. Layout:
4510            // `[u16 count][u16 col_pos][u8 tag] × count`.
4511            let mut coll_bindings: Vec<(usize, u8)> = Vec::new();
4512            for (i, c) in t.schema.columns.iter().enumerate() {
4513                let tag = match c.collation {
4514                    Collation::Binary => continue,
4515                    Collation::CaseInsensitive => Collation::TAG_CASE_INSENSITIVE,
4516                };
4517                coll_bindings.push((i, tag));
4518            }
4519            write_u16(
4520                &mut out,
4521                u16::try_from(coll_bindings.len()).expect("≤ 65k collation bindings/table"),
4522            );
4523            for (pos, tag) in coll_bindings {
4524                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
4525                out.push(tag);
4526            }
4527            // v7.17.0 Phase 4.4 — per-table is_unsigned appendix.
4528            // Sparse: only UNSIGNED columns land. Layout:
4529            // `[u16 count][u16 col_pos] × count`.
4530            let mut unsigned_bindings: Vec<usize> = Vec::new();
4531            for (i, c) in t.schema.columns.iter().enumerate() {
4532                if c.is_unsigned {
4533                    unsigned_bindings.push(i);
4534                }
4535            }
4536            write_u16(
4537                &mut out,
4538                u16::try_from(unsigned_bindings.len()).expect("≤ 65k UNSIGNED columns/table"),
4539            );
4540            for pos in unsigned_bindings {
4541                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
4542            }
4543            // v7.17.0 Phase 3.P0-36 — per-table inline_enum_variants
4544            // appendix. Sparse: only ENUM columns land. Layout:
4545            // `[u16 count] then per binding [u16 col_pos]
4546            // [u16 variant_count] then variant strings`.
4547            // FILE_VERSION 41+; v40 readers never reach this block.
4548            let mut enum_inline_bindings: Vec<(usize, &[String])> = Vec::new();
4549            for (i, c) in t.schema.columns.iter().enumerate() {
4550                if let Some(vs) = &c.inline_enum_variants {
4551                    enum_inline_bindings.push((i, vs.as_slice()));
4552                }
4553            }
4554            write_u16(
4555                &mut out,
4556                u16::try_from(enum_inline_bindings.len()).expect("≤ 65k inline-ENUM columns/table"),
4557            );
4558            for (pos, variants) in enum_inline_bindings {
4559                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
4560                write_u16(
4561                    &mut out,
4562                    u16::try_from(variants.len()).expect("≤ 65k variants/ENUM"),
4563                );
4564                for v in variants {
4565                    write_str(&mut out, v.as_str());
4566                }
4567            }
4568            // v7.17.0 Phase 3.P0-37 — per-table inline_set_variants
4569            // appendix. Same layout as the inline ENUM block.
4570            // FILE_VERSION 42+; v41 readers never reach this block.
4571            let mut set_inline_bindings: Vec<(usize, &[String])> = Vec::new();
4572            for (i, c) in t.schema.columns.iter().enumerate() {
4573                if let Some(vs) = &c.inline_set_variants {
4574                    set_inline_bindings.push((i, vs.as_slice()));
4575                }
4576            }
4577            write_u16(
4578                &mut out,
4579                u16::try_from(set_inline_bindings.len()).expect("≤ 65k inline-SET columns/table"),
4580            );
4581            for (pos, variants) in set_inline_bindings {
4582                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
4583                write_u16(
4584                    &mut out,
4585                    u16::try_from(variants.len()).expect("≤ 65k variants/SET"),
4586                );
4587                for v in variants {
4588                    write_str(&mut out, v.as_str());
4589                }
4590            }
4591        }
4592        // v7.12.4 — catalog-wide appendix: user-defined functions
4593        // then triggers. FILE_VERSION 22+ only. v21 and earlier
4594        // readers stop after the last table; v22 readers always
4595        // consume two `u32` counts (possibly zero).
4596        //
4597        // Function entry layout:
4598        //   [str name] [str args_repr] [str returns]
4599        //   [str language] [str body]
4600        // Trigger entry layout:
4601        //   [str name] [str table] [str timing]
4602        //   [u16 event_count] (event_count × str)
4603        //   [str for_each] [str function]
4604        write_u32(
4605            &mut out,
4606            u32::try_from(self.functions.len()).expect("≤ 4G functions"),
4607        );
4608        for fd in self.functions.values() {
4609            write_str(&mut out, &fd.name);
4610            write_str(&mut out, &fd.args_repr);
4611            write_str(&mut out, &fd.returns);
4612            write_str(&mut out, &fd.language);
4613            write_str_long(&mut out, &fd.body);
4614        }
4615        write_u32(
4616            &mut out,
4617            u32::try_from(self.triggers.len()).expect("≤ 4G triggers"),
4618        );
4619        for td in &self.triggers {
4620            write_str(&mut out, &td.name);
4621            write_str(&mut out, &td.table);
4622            write_str(&mut out, &td.timing);
4623            write_u16(
4624                &mut out,
4625                u16::try_from(td.events.len()).expect("≤ 65k events / trigger"),
4626            );
4627            for ev in &td.events {
4628                write_str(&mut out, ev);
4629            }
4630            write_str(&mut out, &td.for_each);
4631            write_str(&mut out, &td.function);
4632            // v7.13.0 — `UPDATE OF cols` filter
4633            // (FILE_VERSION 23+). v22 readers omit; v23 writers
4634            // always emit (possibly zero).
4635            write_u16(
4636                &mut out,
4637                u16::try_from(td.update_columns.len()).expect("≤ 65k cols / trigger"),
4638            );
4639            for c in &td.update_columns {
4640                write_str(&mut out, c);
4641            }
4642            // v7.16.1 — TriggerDef.enabled (FILE_VERSION 25+).
4643            out.push(u8::from(td.enabled));
4644        }
4645        // v7.17.0 Phase 1.1 — SEQUENCE catalog block (FILE_VERSION 26+).
4646        write_u32(
4647            &mut out,
4648            u32::try_from(self.sequences.len()).expect("≤ 4G sequences"),
4649        );
4650        for seq in self.sequences.values() {
4651            write_str(&mut out, &seq.name);
4652            out.push(match seq.data_type {
4653                SequenceDataType::SmallInt => 0,
4654                SequenceDataType::Int => 1,
4655                SequenceDataType::BigInt => 2,
4656            });
4657            out.extend_from_slice(&seq.start.to_le_bytes());
4658            out.extend_from_slice(&seq.increment.to_le_bytes());
4659            out.extend_from_slice(&seq.min_value.to_le_bytes());
4660            out.extend_from_slice(&seq.max_value.to_le_bytes());
4661            out.extend_from_slice(&seq.cache.to_le_bytes());
4662            out.push(u8::from(seq.cycle));
4663            match &seq.owned_by {
4664                None => out.push(0),
4665                Some((table, column)) => {
4666                    out.push(1);
4667                    write_str(&mut out, table);
4668                    write_str(&mut out, column);
4669                }
4670            }
4671            out.extend_from_slice(&seq.last_value.to_le_bytes());
4672            out.push(u8::from(seq.is_called));
4673        }
4674        // v7.17.0 Phase 1.2 — VIEW catalog block (FILE_VERSION 27+).
4675        write_u32(
4676            &mut out,
4677            u32::try_from(self.views.len()).expect("≤ 4G views"),
4678        );
4679        for view in self.views.values() {
4680            write_str(&mut out, &view.name);
4681            write_u16(
4682                &mut out,
4683                u16::try_from(view.columns.len()).expect("≤ 65k cols / view"),
4684            );
4685            for c in &view.columns {
4686                write_str(&mut out, c);
4687            }
4688            write_str_long(&mut out, &view.body);
4689        }
4690        // v7.17.0 Phase 1.3 — MATERIALIZED VIEW source registry
4691        // (FILE_VERSION 28+). The backing rows live as a regular
4692        // table of the same name already in the tables block.
4693        write_u32(
4694            &mut out,
4695            u32::try_from(self.materialized_views.len()).expect("≤ 4G materialized views"),
4696        );
4697        for (name, body) in &self.materialized_views {
4698            write_str(&mut out, name);
4699            write_str_long(&mut out, body);
4700        }
4701        // v7.17.0 Phase 1.4 — ENUM types catalog block
4702        // (FILE_VERSION 29+).
4703        write_u32(
4704            &mut out,
4705            u32::try_from(self.enum_types.len()).expect("≤ 4G enum types"),
4706        );
4707        for e in self.enum_types.values() {
4708            write_str(&mut out, &e.name);
4709            write_u16(
4710                &mut out,
4711                u16::try_from(e.labels.len()).expect("≤ 65k labels / enum"),
4712            );
4713            for l in &e.labels {
4714                write_str(&mut out, l);
4715            }
4716        }
4717        // v7.17.0 Phase 1.5 — DOMAIN types catalog block
4718        // (FILE_VERSION 30+).
4719        write_u32(
4720            &mut out,
4721            u32::try_from(self.domain_types.len()).expect("≤ 4G domain types"),
4722        );
4723        for d in self.domain_types.values() {
4724            write_str(&mut out, &d.name);
4725            write_data_type(&mut out, d.base_type);
4726            out.push(u8::from(d.nullable));
4727            match &d.default {
4728                None => out.push(0),
4729                Some(s) => {
4730                    out.push(1);
4731                    write_str(&mut out, s);
4732                }
4733            }
4734            write_u16(
4735                &mut out,
4736                u16::try_from(d.checks.len()).expect("≤ 65k CHECKs / domain"),
4737            );
4738            for c in &d.checks {
4739                write_str(&mut out, c);
4740            }
4741        }
4742        // v7.17.0 Phase 1.6 — user-schemas registry
4743        // (FILE_VERSION 31+). Built-ins are hardcoded in
4744        // `is_builtin_schema` and not persisted.
4745        write_u32(
4746            &mut out,
4747            u32::try_from(self.schemas.len()).expect("≤ 4G schemas"),
4748        );
4749        for name in &self.schemas {
4750            write_str(&mut out, name);
4751        }
4752        out
4753    }
4754
4755    /// Deserialize a previously-serialized catalog. Rejects bad magic, version
4756    /// mismatch, unknown tags, truncation, and trailing bytes.
4757    pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
4758        let mut cur = Cursor::new(buf);
4759        let magic = cur.take(8)?;
4760        if magic != FILE_MAGIC {
4761            return Err(StorageError::Corrupt(format!(
4762                "bad magic: expected SPGDB001, got {magic:?}"
4763            )));
4764        }
4765        let version = cur.read_u8()?;
4766        if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
4767            return Err(StorageError::Corrupt(format!(
4768                "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
4769            )));
4770        }
4771        // v7.23/v7.27 — escape decoding is version-gated (see
4772        // STR_LEN_ESCAPE / Cursor::codec_version).
4773        cur.codec_version = version;
4774        let table_count = cur.read_u32()? as usize;
4775        let mut cat = Self::new();
4776        for _ in 0..table_count {
4777            deserialize_table(&mut cur, &mut cat, version)?;
4778        }
4779        // v7.12.4 — catalog-wide function + trigger appendix.
4780        // FILE_VERSION 22+ only; v21 and earlier catalogs stop
4781        // after the last table.
4782        if version >= 22 {
4783            let fn_count = cur.read_u32()? as usize;
4784            for _ in 0..fn_count {
4785                let name = cur.read_str()?;
4786                let args_repr = cur.read_str()?;
4787                let returns = cur.read_str()?;
4788                let language = cur.read_str()?;
4789                let body = cur.read_str_long()?;
4790                cat.functions.insert(
4791                    name.clone(),
4792                    FunctionDef {
4793                        name,
4794                        args_repr,
4795                        returns,
4796                        language,
4797                        body,
4798                    },
4799                );
4800            }
4801            let trg_count = cur.read_u32()? as usize;
4802            for _ in 0..trg_count {
4803                let name = cur.read_str()?;
4804                let table = cur.read_str()?;
4805                let timing = cur.read_str()?;
4806                let ev_count = cur.read_u16()? as usize;
4807                let mut events = Vec::with_capacity(ev_count);
4808                for _ in 0..ev_count {
4809                    events.push(cur.read_str()?);
4810                }
4811                let for_each = cur.read_str()?;
4812                let function = cur.read_str()?;
4813                // v7.13.0 — trailing `UPDATE OF cols` filter
4814                // (FILE_VERSION 23+ only; v22 catalogs omit and
4815                // deserialise with an empty vec).
4816                let update_columns = if version >= 23 {
4817                    let n = cur.read_u16()? as usize;
4818                    let mut cols = Vec::with_capacity(n);
4819                    for _ in 0..n {
4820                        cols.push(cur.read_str()?);
4821                    }
4822                    cols
4823                } else {
4824                    Vec::new()
4825                };
4826                // v7.16.1 — TriggerDef.enabled (FILE_VERSION 25+).
4827                // v24-and-below catalogs deserialise with `true`
4828                // — pre-v7.16.1 every trigger always fired.
4829                let enabled = if version >= 25 {
4830                    cur.read_u8()? != 0
4831                } else {
4832                    true
4833                };
4834                cat.triggers.push(TriggerDef {
4835                    name,
4836                    table,
4837                    timing,
4838                    events,
4839                    for_each,
4840                    function,
4841                    update_columns,
4842                    enabled,
4843                });
4844            }
4845        }
4846        // v7.17.0 Phase 1.1 — SEQUENCE block (FILE_VERSION 26+).
4847        // v25-and-below catalogs omit; we leave the map empty.
4848        if version >= 26 {
4849            let seq_count = cur.read_u32()? as usize;
4850            for _ in 0..seq_count {
4851                let name = cur.read_str()?;
4852                let data_type = match cur.read_u8()? {
4853                    0 => SequenceDataType::SmallInt,
4854                    1 => SequenceDataType::Int,
4855                    2 => SequenceDataType::BigInt,
4856                    other => {
4857                        return Err(StorageError::Corrupt(format!(
4858                            "unknown SEQUENCE data-type tag {other}"
4859                        )));
4860                    }
4861                };
4862                let start = cur.read_i64()?;
4863                let increment = cur.read_i64()?;
4864                let min_value = cur.read_i64()?;
4865                let max_value = cur.read_i64()?;
4866                let cache = cur.read_i64()?;
4867                let cycle = cur.read_u8()? != 0;
4868                let owned_by = match cur.read_u8()? {
4869                    0 => None,
4870                    1 => {
4871                        let t = cur.read_str()?;
4872                        let c = cur.read_str()?;
4873                        Some((t, c))
4874                    }
4875                    other => {
4876                        return Err(StorageError::Corrupt(format!(
4877                            "unknown SEQUENCE owned-by tag {other}"
4878                        )));
4879                    }
4880                };
4881                let last_value = cur.read_i64()?;
4882                let is_called = cur.read_u8()? != 0;
4883                cat.sequences.insert(
4884                    name.clone(),
4885                    SequenceDef {
4886                        name,
4887                        data_type,
4888                        start,
4889                        increment,
4890                        min_value,
4891                        max_value,
4892                        cache,
4893                        cycle,
4894                        owned_by,
4895                        last_value,
4896                        is_called,
4897                    },
4898                );
4899            }
4900        }
4901        // v7.17.0 Phase 1.2 — VIEW block (FILE_VERSION 27+).
4902        // v26-and-below catalogs omit; we leave the map empty.
4903        if version >= 27 {
4904            let view_count = cur.read_u32()? as usize;
4905            for _ in 0..view_count {
4906                let name = cur.read_str()?;
4907                let col_count = cur.read_u16()? as usize;
4908                let mut columns = Vec::with_capacity(col_count);
4909                for _ in 0..col_count {
4910                    columns.push(cur.read_str()?);
4911                }
4912                let body = cur.read_str_long()?;
4913                cat.views.insert(
4914                    name.clone(),
4915                    ViewDef {
4916                        name,
4917                        columns,
4918                        body,
4919                    },
4920                );
4921            }
4922        }
4923        // v7.17.0 Phase 1.3 — MATERIALIZED VIEW source registry
4924        // (FILE_VERSION 28+). v27-and-below catalogs omit.
4925        if version >= 28 {
4926            let mv_count = cur.read_u32()? as usize;
4927            for _ in 0..mv_count {
4928                let name = cur.read_str()?;
4929                let body = cur.read_str_long()?;
4930                cat.materialized_views.insert(name, body);
4931            }
4932        }
4933        // v7.17.0 Phase 1.4 — ENUM types catalog block
4934        // (FILE_VERSION 29+).
4935        if version >= 29 {
4936            let etype_count = cur.read_u32()? as usize;
4937            for _ in 0..etype_count {
4938                let name = cur.read_str()?;
4939                let label_count = cur.read_u16()? as usize;
4940                let mut labels = Vec::with_capacity(label_count);
4941                for _ in 0..label_count {
4942                    labels.push(cur.read_str()?);
4943                }
4944                cat.enum_types
4945                    .insert(name.clone(), EnumDef { name, labels });
4946            }
4947        }
4948        // v7.17.0 Phase 1.5 — DOMAIN types catalog block
4949        // (FILE_VERSION 30+).
4950        if version >= 30 {
4951            let dtype_count = cur.read_u32()? as usize;
4952            for _ in 0..dtype_count {
4953                let name = cur.read_str()?;
4954                let base_type = cur.read_data_type()?;
4955                let nullable = cur.read_u8()? != 0;
4956                let default = match cur.read_u8()? {
4957                    0 => None,
4958                    1 => Some(cur.read_str()?),
4959                    other => {
4960                        return Err(StorageError::Corrupt(format!(
4961                            "unknown DOMAIN default tag {other}"
4962                        )));
4963                    }
4964                };
4965                let check_count = cur.read_u16()? as usize;
4966                let mut checks = Vec::with_capacity(check_count);
4967                for _ in 0..check_count {
4968                    checks.push(cur.read_str()?);
4969                }
4970                cat.domain_types.insert(
4971                    name.clone(),
4972                    DomainDef {
4973                        name,
4974                        base_type,
4975                        nullable,
4976                        default,
4977                        checks,
4978                    },
4979                );
4980            }
4981        }
4982        // v7.17.0 Phase 1.6 — user-schemas registry
4983        // (FILE_VERSION 31+).
4984        if version >= 31 {
4985            let sch_count = cur.read_u32()? as usize;
4986            for _ in 0..sch_count {
4987                let name = cur.read_str()?;
4988                cat.schemas.insert(name);
4989            }
4990        }
4991        if cur.pos < buf.len() {
4992            return Err(StorageError::Corrupt(format!(
4993                "trailing bytes: {} unread",
4994                buf.len() - cur.pos
4995            )));
4996        }
4997        Ok(cat)
4998    }
4999}
5000
5001#[cfg(test)]
5002mod tests;