Skip to main content

citadel_sql/
types.rs

1use std::cmp::Ordering;
2use std::fmt;
3use std::hash::{Hash, Hasher};
4use std::sync::Arc;
5
6pub use compact_str::CompactString;
7
8use crate::parser::Expr;
9
/// SQL data types.
///
/// The numeric tag of each variant is defined by `type_tag` / `from_tag` and
/// does not follow the declaration order below (for example `Blob` is tag 1
/// while `Integer` is tag 4).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    Null,
    Integer,
    Real,
    Text,
    Blob,
    Boolean,
    Time,
    Date,
    Timestamp,
    Interval,
    Json,
    Jsonb,
}
26
27impl DataType {
28    pub fn type_tag(self) -> u8 {
29        match self {
30            DataType::Null => 0,
31            DataType::Blob => 1,
32            DataType::Text => 2,
33            DataType::Boolean => 3,
34            DataType::Integer => 4,
35            DataType::Real => 5,
36            DataType::Time => 6,
37            DataType::Date => 7,
38            DataType::Timestamp => 8,
39            DataType::Interval => 9,
40            DataType::Json => 10,
41            DataType::Jsonb => 11,
42        }
43    }
44
45    pub fn from_tag(tag: u8) -> Option<Self> {
46        match tag {
47            0 => Some(DataType::Null),
48            1 => Some(DataType::Blob),
49            2 => Some(DataType::Text),
50            3 => Some(DataType::Boolean),
51            4 => Some(DataType::Integer),
52            5 => Some(DataType::Real),
53            6 => Some(DataType::Time),
54            7 => Some(DataType::Date),
55            8 => Some(DataType::Timestamp),
56            9 => Some(DataType::Interval),
57            10 => Some(DataType::Json),
58            11 => Some(DataType::Jsonb),
59            _ => None,
60        }
61    }
62}
63
64impl fmt::Display for DataType {
65    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66        match self {
67            DataType::Null => write!(f, "NULL"),
68            DataType::Integer => write!(f, "INTEGER"),
69            DataType::Real => write!(f, "REAL"),
70            DataType::Text => write!(f, "TEXT"),
71            DataType::Blob => write!(f, "BLOB"),
72            DataType::Boolean => write!(f, "BOOLEAN"),
73            DataType::Time => write!(f, "TIME"),
74            DataType::Date => write!(f, "DATE"),
75            DataType::Timestamp => write!(f, "TIMESTAMP"),
76            DataType::Interval => write!(f, "INTERVAL"),
77            DataType::Json => write!(f, "JSON"),
78            DataType::Jsonb => write!(f, "JSONB"),
79        }
80    }
81}
82
/// SQL value. Temporal epochs: days/µs since 1970-01-01 UTC.
/// `Date`/`Timestamp` reserve `i{32,64}::{MAX,MIN}` as `±infinity` sentinels.
///
/// `Value::Null` is the `Default`. Equality, hashing and ordering are
/// implemented by hand rather than derived.
#[derive(Debug, Clone, Default)]
pub enum Value {
    #[default]
    Null,
    Integer(i64),
    Real(f64),
    Text(CompactString),
    Blob(Vec<u8>),
    Boolean(bool),
    // NOTE(review): presumably µs since midnight (integer coercion accepts
    // 0..=86_400_000_000) — confirm.
    Time(i64),
    Date(i32),
    Timestamp(i64),
    Interval {
        months: i32,
        days: i32,
        micros: i64,
    },
    // JSON kept as text.
    Json(CompactString),
    // Encoded JSON bytes; converted to text through `crate::json::decode_to_text`.
    Jsonb(Arc<[u8]>),
}
105
106impl Value {
107    pub fn data_type(&self) -> DataType {
108        match self {
109            Value::Null => DataType::Null,
110            Value::Integer(_) => DataType::Integer,
111            Value::Real(_) => DataType::Real,
112            Value::Text(_) => DataType::Text,
113            Value::Blob(_) => DataType::Blob,
114            Value::Boolean(_) => DataType::Boolean,
115            Value::Time(_) => DataType::Time,
116            Value::Date(_) => DataType::Date,
117            Value::Timestamp(_) => DataType::Timestamp,
118            Value::Interval { .. } => DataType::Interval,
119            Value::Json(_) => DataType::Json,
120            Value::Jsonb(_) => DataType::Jsonb,
121        }
122    }
123
124    pub fn is_null(&self) -> bool {
125        matches!(self, Value::Null)
126    }
127
128    /// Returns true for `±infinity` sentinel values on DATE / TIMESTAMP; false otherwise.
129    pub fn is_finite_temporal(&self) -> bool {
130        match self {
131            Value::Date(d) => *d != i32::MAX && *d != i32::MIN,
132            Value::Timestamp(t) => *t != i64::MAX && *t != i64::MIN,
133            _ => true,
134        }
135    }
136
137    /// Attempt to coerce this value to the target type.
138    pub fn coerce_to(&self, target: DataType) -> Option<Value> {
139        match (self, target) {
140            (_, DataType::Null) => Some(Value::Null),
141            (Value::Null, _) => Some(Value::Null),
142            (Value::Integer(i), DataType::Integer) => Some(Value::Integer(*i)),
143            (Value::Integer(i), DataType::Real) => Some(Value::Real(*i as f64)),
144            (Value::Real(r), DataType::Real) => Some(Value::Real(*r)),
145            (Value::Real(r), DataType::Integer) => Some(Value::Integer(*r as i64)),
146            (Value::Text(s), DataType::Text) => Some(Value::Text(s.clone())),
147            (Value::Blob(b), DataType::Blob) => Some(Value::Blob(b.clone())),
148            (Value::Boolean(b), DataType::Boolean) => Some(Value::Boolean(*b)),
149            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
150            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(*i != 0)),
151            (Value::Time(t), DataType::Time) => Some(Value::Time(*t)),
152            (Value::Date(d), DataType::Date) => Some(Value::Date(*d)),
153            (Value::Timestamp(t), DataType::Timestamp) => Some(Value::Timestamp(*t)),
154            (
155                Value::Interval {
156                    months,
157                    days,
158                    micros,
159                },
160                DataType::Interval,
161            ) => Some(Value::Interval {
162                months: *months,
163                days: *days,
164                micros: *micros,
165            }),
166            _ => None,
167        }
168    }
169
170    pub fn coerce_into(self, target: DataType) -> Option<Value> {
171        if self.is_null() || target == DataType::Null {
172            return Some(Value::Null);
173        }
174        if self.data_type() == target {
175            return Some(self);
176        }
177        match (self, target) {
178            (Value::Integer(i), DataType::Real) => Some(Value::Real(i as f64)),
179            (Value::Real(r), DataType::Integer) => Some(Value::Integer(r as i64)),
180            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if b { 1 } else { 0 })),
181            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(i != 0)),
182            (Value::Text(s), DataType::Date) => {
183                crate::datetime::parse_date(&s).ok().map(Value::Date)
184            }
185            (Value::Text(s), DataType::Time) => {
186                crate::datetime::parse_time(&s).ok().map(Value::Time)
187            }
188            (Value::Text(s), DataType::Timestamp) => crate::datetime::parse_timestamp(&s)
189                .ok()
190                .map(Value::Timestamp),
191            (Value::Text(s), DataType::Interval) => {
192                crate::datetime::parse_interval(&s)
193                    .ok()
194                    .map(|(m, d, u)| Value::Interval {
195                        months: m,
196                        days: d,
197                        micros: u,
198                    })
199            }
200            // INTEGER → TIMESTAMP: Unix epoch seconds.
201            (Value::Integer(n), DataType::Timestamp) => {
202                n.checked_mul(1_000_000).map(Value::Timestamp)
203            }
204            (Value::Integer(n), DataType::Date) => {
205                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
206                    Some(Value::Date(n as i32))
207                } else {
208                    None
209                }
210            }
211            (Value::Integer(n), DataType::Time) => {
212                if (0..=86_400_000_000).contains(&n) {
213                    Some(Value::Time(n))
214                } else {
215                    None
216                }
217            }
218            (Value::Integer(n), DataType::Interval) => {
219                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
220                    Some(Value::Interval {
221                        months: 0,
222                        days: n as i32,
223                        micros: 0,
224                    })
225                } else {
226                    None
227                }
228            }
229            (Value::Timestamp(t), DataType::Integer) => Some(Value::Integer(t / 1_000_000)),
230            (Value::Date(d), DataType::Integer) => Some(Value::Integer(d as i64)),
231            (Value::Time(t), DataType::Integer) => Some(Value::Integer(t)),
232            (Value::Date(d), DataType::Timestamp) => {
233                (d as i64).checked_mul(86_400_000_000).map(Value::Timestamp)
234            }
235            (Value::Timestamp(t), DataType::Date) => {
236                // div_euclid floors correctly for negative µs (pre-1970).
237                let days = t.div_euclid(86_400_000_000);
238                if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
239                    Some(Value::Date(days as i32))
240                } else {
241                    None
242                }
243            }
244            (v, DataType::Text)
245                if matches!(
246                    v.data_type(),
247                    DataType::Date | DataType::Time | DataType::Timestamp | DataType::Interval
248                ) =>
249            {
250                Some(Value::Text(v.to_string().into()))
251            }
252            (Value::Text(s), DataType::Json) => {
253                crate::json::validate_text(&s).ok()?;
254                Some(Value::Json(s))
255            }
256            (Value::Text(s), DataType::Jsonb) => crate::json::text_to_jsonb(&s).ok(),
257            (Value::Json(s), DataType::Text) => Some(Value::Text(s)),
258            (Value::Json(s), DataType::Jsonb) => crate::json::text_to_jsonb(&s).ok(),
259            (Value::Jsonb(b), DataType::Text) => crate::json::decode_to_text(&b)
260                .ok()
261                .map(|t| Value::Text(t.into())),
262            (Value::Jsonb(b), DataType::Json) => crate::json::decode_to_text(&b)
263                .ok()
264                .map(|t| Value::Json(t.into())),
265            _ => None,
266        }
267    }
268
269    pub fn strict_coerce(&self, target: DataType) -> Option<Value> {
270        if matches!(self, Value::Null) {
271            return Some(Value::Null);
272        }
273        if self.data_type() == target {
274            return Some(self.clone());
275        }
276        match (self, target) {
277            (Value::Integer(i), DataType::Real) => {
278                if i.unsigned_abs() <= (1u64 << 53) {
279                    Some(Value::Real(*i as f64))
280                } else {
281                    None
282                }
283            }
284            (Value::Real(r), DataType::Integer) => {
285                if r.is_finite()
286                    && r.fract() == 0.0
287                    && (i64::MIN as f64..=i64::MAX as f64).contains(r)
288                {
289                    Some(Value::Integer(*r as i64))
290                } else {
291                    None
292                }
293            }
294            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
295            (Value::Integer(i), DataType::Boolean) => match i {
296                0 => Some(Value::Boolean(false)),
297                1 => Some(Value::Boolean(true)),
298                _ => None,
299            },
300            (Value::Text(s), DataType::Integer) => {
301                let trimmed = s.as_str();
302                let parsed: i64 = trimmed.parse().ok()?;
303                if parsed.to_string() == trimmed {
304                    Some(Value::Integer(parsed))
305                } else {
306                    None
307                }
308            }
309            (Value::Text(s), DataType::Real) => {
310                let trimmed = s.as_str();
311                let parsed: f64 = trimmed.parse().ok()?;
312                if parsed.is_finite() {
313                    Some(Value::Real(parsed))
314                } else {
315                    None
316                }
317            }
318            (Value::Text(_), DataType::Date)
319            | (Value::Text(_), DataType::Time)
320            | (Value::Text(_), DataType::Timestamp)
321            | (Value::Text(_), DataType::Interval)
322            | (Value::Text(_), DataType::Json)
323            | (Value::Text(_), DataType::Jsonb)
324            | (Value::Json(_), DataType::Jsonb)
325            | (Value::Json(_), DataType::Text)
326            | (Value::Jsonb(_), DataType::Json)
327            | (Value::Jsonb(_), DataType::Text) => self.clone().coerce_into(target),
328            (Value::Date(d), DataType::Timestamp) => (*d as i64)
329                .checked_mul(86_400_000_000)
330                .map(Value::Timestamp),
331            (Value::Timestamp(t), DataType::Date) => {
332                if t % 86_400_000_000 == 0 {
333                    let days = t.div_euclid(86_400_000_000);
334                    if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
335                        Some(Value::Date(days as i32))
336                    } else {
337                        None
338                    }
339                } else {
340                    None
341                }
342            }
343            _ => None,
344        }
345    }
346
347    /// Numeric ordering for Integer and Real values (promotes to f64 for mixed).
348    fn numeric_cmp(&self, other: &Value) -> Option<Ordering> {
349        match (self, other) {
350            (Value::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
351            (Value::Real(a), Value::Real(b)) => a.partial_cmp(b),
352            (Value::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
353            (Value::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
354            _ => None,
355        }
356    }
357}
358
359impl PartialEq for Value {
360    // Field-wise for Eq/Hash/Ord transitivity. SQL-level `=` on INTERVAL
361    // normalizes separately (see eval.rs).
362    fn eq(&self, other: &Self) -> bool {
363        match (self, other) {
364            (Value::Null, Value::Null) => true,
365            (Value::Integer(a), Value::Integer(b)) => a == b,
366            (Value::Real(a), Value::Real(b)) => a == b,
367            (Value::Integer(a), Value::Real(b)) => (*a as f64) == *b,
368            (Value::Real(a), Value::Integer(b)) => *a == (*b as f64),
369            (Value::Text(a), Value::Text(b)) => a == b,
370            (Value::Blob(a), Value::Blob(b)) => a == b,
371            (Value::Boolean(a), Value::Boolean(b)) => a == b,
372            (Value::Time(a), Value::Time(b)) => a == b,
373            (Value::Date(a), Value::Date(b)) => a == b,
374            (Value::Timestamp(a), Value::Timestamp(b)) => a == b,
375            (
376                Value::Interval {
377                    months: am,
378                    days: ad,
379                    micros: au,
380                },
381                Value::Interval {
382                    months: bm,
383                    days: bd,
384                    micros: bu,
385                },
386            ) => am == bm && ad == bd && au == bu,
387            (Value::Json(a), Value::Json(b)) => a == b,
388            (Value::Jsonb(a), Value::Jsonb(b)) => a == b,
389            _ => false,
390        }
391    }
392}
393
// NOTE(review): `PartialEq::eq` compares `Real` payloads with IEEE `==`, so
// `Real(NaN) != Real(NaN)`. The reflexivity promised by `Eq` therefore
// presumably relies on NaN never being stored in a `Value` — confirm.
impl Eq for Value {}
395
396impl Hash for Value {
397    fn hash<H: Hasher>(&self, state: &mut H) {
398        match self {
399            Value::Null => 0u8.hash(state),
400            Value::Integer(i) => {
401                // Hash via f64 bits so Integer(n) and Real(n.0) produce the same hash,
402                // matching the cross-type PartialEq contract.
403                1u8.hash(state);
404                (*i as f64).to_bits().hash(state);
405            }
406            Value::Real(r) => {
407                1u8.hash(state);
408                r.to_bits().hash(state);
409            }
410            Value::Text(s) => {
411                2u8.hash(state);
412                s.hash(state);
413            }
414            Value::Blob(b) => {
415                3u8.hash(state);
416                b.hash(state);
417            }
418            Value::Boolean(b) => {
419                4u8.hash(state);
420                b.hash(state);
421            }
422            Value::Time(t) => {
423                5u8.hash(state);
424                t.hash(state);
425            }
426            Value::Date(d) => {
427                6u8.hash(state);
428                d.hash(state);
429            }
430            Value::Timestamp(t) => {
431                7u8.hash(state);
432                t.hash(state);
433            }
434            Value::Interval {
435                months,
436                days,
437                micros,
438            } => {
439                8u8.hash(state);
440                months.hash(state);
441                days.hash(state);
442                micros.hash(state);
443            }
444            Value::Json(s) => {
445                9u8.hash(state);
446                s.hash(state);
447            }
448            Value::Jsonb(b) => {
449                10u8.hash(state);
450                b.hash(state);
451            }
452        }
453    }
454}
455
456impl PartialOrd for Value {
457    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
458        Some(self.cmp(other))
459    }
460}
461
462impl Ord for Value {
463    // Order: NULL < BOOLEAN < numeric < TIME < DATE < TIMESTAMP < INTERVAL < TEXT < BLOB.
464    // INTERVAL compares field-wise for trait-invariant safety; SQL-level ops normalize.
465    fn cmp(&self, other: &Self) -> Ordering {
466        match (self, other) {
467            (Value::Null, Value::Null) => Ordering::Equal,
468            (Value::Null, _) => Ordering::Less,
469            (_, Value::Null) => Ordering::Greater,
470
471            (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
472            (Value::Boolean(_), _) => Ordering::Less,
473            (_, Value::Boolean(_)) => Ordering::Greater,
474
475            (Value::Integer(_) | Value::Real(_), Value::Integer(_) | Value::Real(_)) => {
476                self.numeric_cmp(other).unwrap_or(Ordering::Equal)
477            }
478            (Value::Integer(_) | Value::Real(_), _) => Ordering::Less,
479            (_, Value::Integer(_) | Value::Real(_)) => Ordering::Greater,
480
481            (Value::Time(a), Value::Time(b)) => a.cmp(b),
482            (Value::Time(_), _) => Ordering::Less,
483            (_, Value::Time(_)) => Ordering::Greater,
484
485            (Value::Date(a), Value::Date(b)) => a.cmp(b),
486            (Value::Date(_), _) => Ordering::Less,
487            (_, Value::Date(_)) => Ordering::Greater,
488
489            (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
490            (Value::Timestamp(_), _) => Ordering::Less,
491            (_, Value::Timestamp(_)) => Ordering::Greater,
492
493            (
494                Value::Interval {
495                    months: am,
496                    days: ad,
497                    micros: au,
498                },
499                Value::Interval {
500                    months: bm,
501                    days: bd,
502                    micros: bu,
503                },
504            ) => am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu)),
505            (Value::Interval { .. }, _) => Ordering::Less,
506            (_, Value::Interval { .. }) => Ordering::Greater,
507
508            (Value::Json(a), Value::Json(b)) => a.cmp(b),
509            (Value::Json(_), _) => Ordering::Less,
510            (_, Value::Json(_)) => Ordering::Greater,
511
512            (Value::Jsonb(a), Value::Jsonb(b)) => a.as_ref().cmp(b.as_ref()),
513            (Value::Jsonb(_), _) => Ordering::Less,
514            (_, Value::Jsonb(_)) => Ordering::Greater,
515
516            (Value::Text(a), Value::Text(b)) => a.cmp(b),
517            (Value::Text(_), _) => Ordering::Less,
518            (_, Value::Text(_)) => Ordering::Greater,
519
520            (Value::Blob(a), Value::Blob(b)) => a.cmp(b),
521        }
522    }
523}
524
525impl fmt::Display for Value {
526    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
527        match self {
528            Value::Null => write!(f, "NULL"),
529            Value::Integer(i) => write!(f, "{i}"),
530            Value::Real(r) => {
531                if r.fract() == 0.0 && r.is_finite() {
532                    write!(f, "{r:.1}")
533                } else {
534                    write!(f, "{r}")
535                }
536            }
537            Value::Text(s) => write!(f, "{s}"),
538            Value::Blob(b) => write!(f, "X'{}'", hex_encode(b)),
539            Value::Boolean(b) => write!(f, "{}", if *b { "TRUE" } else { "FALSE" }),
540            Value::Time(t) => write!(f, "{}", crate::datetime::format_time(*t)),
541            Value::Date(d) => write!(f, "{}", crate::datetime::format_date(*d)),
542            Value::Timestamp(t) => write!(f, "{}", crate::datetime::format_timestamp(*t)),
543            Value::Interval {
544                months,
545                days,
546                micros,
547            } => {
548                write!(
549                    f,
550                    "{}",
551                    crate::datetime::format_interval(*months, *days, *micros)
552                )
553            }
554            Value::Json(s) => write!(f, "{s}"),
555            Value::Jsonb(b) => match crate::json::decode_to_text(b) {
556                Ok(s) => write!(f, "{s}"),
557                Err(_) => write!(f, "<invalid jsonb>"),
558            },
559        }
560    }
561}
562
/// Encodes `data` as upper-case hexadecimal, two digits per byte.
///
/// Digits are pushed straight from a lookup table, so the only allocation is
/// the pre-sized output string (no temporary `String` per byte).
fn hex_encode(data: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(data.len() * 2);
    for &byte in data {
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
    }
    out
}
570
/// Text collation used for comparing strings.
///
/// The explicit discriminants are the same numbers `from_tag` accepts.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum Collation {
    /// Plain `str` comparison.
    #[default]
    Binary = 0,
    /// ASCII case-insensitive comparison.
    NoCase = 1,
    /// Comparison that ignores trailing space characters.
    Rtrim = 2,
}
579
580impl Collation {
581    pub fn from_tag(tag: u8) -> Option<Self> {
582        match tag {
583            0 => Some(Self::Binary),
584            1 => Some(Self::NoCase),
585            2 => Some(Self::Rtrim),
586            _ => None,
587        }
588    }
589
590    pub fn from_name(name: &str) -> Option<Self> {
591        match name.to_ascii_uppercase().as_str() {
592            "BINARY" => Some(Self::Binary),
593            "NOCASE" => Some(Self::NoCase),
594            "RTRIM" => Some(Self::Rtrim),
595            _ => None,
596        }
597    }
598
599    pub fn cmp_text(self, a: &str, b: &str) -> std::cmp::Ordering {
600        match self {
601            Collation::Binary => a.cmp(b),
602            Collation::NoCase => Iterator::cmp(
603                a.chars().map(|c| c.to_ascii_lowercase()),
604                b.chars().map(|c| c.to_ascii_lowercase()),
605            ),
606            Collation::Rtrim => {
607                let la = a.trim_end_matches(' ');
608                let lb = b.trim_end_matches(' ');
609                la.cmp(lb)
610            }
611        }
612    }
613
614    pub fn eq_text(self, a: &str, b: &str) -> bool {
615        match self {
616            Collation::Binary => a == b,
617            Collation::NoCase => a.eq_ignore_ascii_case(b),
618            Collation::Rtrim => a.trim_end_matches(' ') == b.trim_end_matches(' '),
619        }
620    }
621}
622
/// Column definition.
///
/// NOTE(review): each `*_expr` / `*_sql` pair presumably holds a parsed
/// expression and the SQL text it came from — confirm against the schema
/// (de)serialization code.
#[derive(Debug, Clone)]
pub struct ColumnDef {
    pub name: String,
    pub data_type: DataType,
    pub nullable: bool,
    pub position: u16,
    // DEFAULT clause, if any.
    pub default_expr: Option<Expr>,
    pub default_sql: Option<String>,
    // Column-level CHECK constraint, if any (`TableSchema::has_checks` looks at `check_expr`).
    pub check_expr: Option<Expr>,
    pub check_sql: Option<String>,
    pub check_name: Option<String>,
    /// Display-only flag for `TIMESTAMPTZ` / `TIMETZ`; storage is i64 µs UTC.
    pub is_with_timezone: bool,
    // Generated-column definition, if any; a `GeneratedKind::Virtual` kind
    // marks the table as having virtual columns.
    pub generated_expr: Option<Expr>,
    pub generated_sql: Option<String>,
    pub generated_kind: Option<crate::parser::GeneratedKind>,
    pub collation: Collation,
}
642
/// Operator class carried by `IndexKind::Gin`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GinOpsClass {
    /// One entry per (key, value) pair; supports `@>` `?` `?|` `?&`.
    JsonbOps,
    /// One entry per hash(path‖value); supports `@>` only, ~3x smaller index.
    JsonbPathOps,
}
650
651impl GinOpsClass {
652    pub fn as_tag(self) -> u8 {
653        match self {
654            Self::JsonbOps => 0,
655            Self::JsonbPathOps => 1,
656        }
657    }
658
659    pub fn from_tag(t: u8) -> Option<Self> {
660        match t {
661            0 => Some(Self::JsonbOps),
662            1 => Some(Self::JsonbPathOps),
663            _ => None,
664        }
665    }
666}
667
/// Kind of index; `BTree` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IndexKind {
    #[default]
    BTree,
    /// GIN index with the given operator class.
    Gin(GinOpsClass),
}
674
/// Index definition stored as part of the table schema.
#[derive(Debug, Clone)]
pub struct IndexDef {
    pub name: String,
    // NOTE(review): presumably indices into the owning table's column list — confirm.
    pub columns: Vec<u16>,
    pub unique: bool,
    // Predicate as SQL text and as a parsed expression; `None` when absent.
    // NOTE(review): presumably the WHERE clause of a partial index — confirm.
    pub predicate_sql: Option<String>,
    pub predicate_expr: Option<crate::parser::Expr>,
    // NOTE(review): presumably one collation per entry of `columns` — confirm.
    pub collations: Vec<Collation>,
    pub kind: IndexKind,
}
686
/// View definition stored in the _views metadata table.
///
/// The binary form is produced by `serialize` and read back by `deserialize`.
#[derive(Debug, Clone)]
pub struct ViewDef {
    pub name: String,
    pub sql: String,
    pub column_aliases: Vec<String>,
}
694
/// Format version written as the first byte of a serialized `ViewDef`;
/// `ViewDef::deserialize` rejects input whose first byte differs.
const VIEW_DEF_VERSION: u8 = 1;
696
697impl ViewDef {
698    pub fn serialize(&self) -> Vec<u8> {
699        let mut buf = Vec::new();
700        buf.push(VIEW_DEF_VERSION);
701
702        let name_bytes = self.name.as_bytes();
703        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
704        buf.extend_from_slice(name_bytes);
705
706        let sql_bytes = self.sql.as_bytes();
707        buf.extend_from_slice(&(sql_bytes.len() as u32).to_le_bytes());
708        buf.extend_from_slice(sql_bytes);
709
710        buf.extend_from_slice(&(self.column_aliases.len() as u16).to_le_bytes());
711        for alias in &self.column_aliases {
712            let alias_bytes = alias.as_bytes();
713            buf.extend_from_slice(&(alias_bytes.len() as u16).to_le_bytes());
714            buf.extend_from_slice(alias_bytes);
715        }
716
717        buf
718    }
719
720    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
721        if data.is_empty() || data[0] != VIEW_DEF_VERSION {
722            return Err(crate::error::SqlError::InvalidValue(
723                "invalid view definition version".into(),
724            ));
725        }
726        let mut pos = 1;
727
728        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
729        pos += 2;
730        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
731        pos += name_len;
732
733        let sql_len =
734            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
735        pos += 4;
736        let sql = String::from_utf8_lossy(&data[pos..pos + sql_len]).into_owned();
737        pos += sql_len;
738
739        let alias_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
740        pos += 2;
741        let mut column_aliases = Vec::with_capacity(alias_count);
742        for _ in 0..alias_count {
743            let alias_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
744            pos += 2;
745            let alias = String::from_utf8_lossy(&data[pos..pos + alias_len]).into_owned();
746            pos += alias_len;
747            column_aliases.push(alias);
748        }
749
750        Ok(Self {
751            name,
752            sql,
753            column_aliases,
754        })
755    }
756}
757
/// Table-level CHECK constraint stored in schema.
#[derive(Debug, Clone)]
pub struct TableCheckDef {
    // Constraint name; `None` when absent.
    pub name: Option<String>,
    // NOTE(review): `expr` / `sql` are presumably the parsed condition and
    // its SQL text — confirm.
    pub expr: Expr,
    pub sql: String,
}
765
/// Foreign key definition stored in schema.
#[derive(Debug, Clone)]
pub struct ForeignKeySchemaEntry {
    // Constraint name; `None` when absent.
    pub name: Option<String>,
    // NOTE(review): presumably indices of the referencing columns in the
    // owning table — confirm.
    pub columns: Vec<u16>,
    pub foreign_table: String,
    // Referenced columns are kept by name, unlike `columns`.
    pub referred_columns: Vec<String>,
    pub on_delete: crate::parser::ReferentialAction,
    pub on_update: crate::parser::ReferentialAction,
}
776
/// Table schema stored in the _schema table.
///
/// The private `*_cache` fields are derived from the public ones by
/// `with_drops`; after changing public fields in place, call `rebuild`.
#[derive(Debug, Clone)]
pub struct TableSchema {
    pub name: String,
    pub columns: Vec<ColumnDef>,
    pub primary_key_columns: Vec<u16>,
    pub indices: Vec<IndexDef>,
    pub check_constraints: Vec<TableCheckDef>,
    pub foreign_keys: Vec<ForeignKeySchemaEntry>,
    // Bit flags; `is_strict` tests `TABLE_FLAG_STRICT`. Constructors start at 0.
    pub flags: u8,
    // `primary_key_columns` widened to `usize`.
    pk_idx_cache: Vec<usize>,
    // Indices into `columns` that are not primary-key columns, ascending.
    non_pk_idx_cache: Vec<usize>,
    /// Sorted physical slots dropped via DROP COLUMN.
    dropped_non_pk_slots: Vec<u16>,
    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
    decode_mapping_cache: Vec<usize>,
    /// Logical non-PK order -> physical encoding position.
    encoding_positions_cache: Vec<u16>,
    // True when any column has `generated_kind == Some(GeneratedKind::Virtual)`.
    has_virtual_columns_cache: bool,
}
797
798impl TableSchema {
799    pub fn new(
800        name: String,
801        columns: Vec<ColumnDef>,
802        primary_key_columns: Vec<u16>,
803        indices: Vec<IndexDef>,
804        check_constraints: Vec<TableCheckDef>,
805        foreign_keys: Vec<ForeignKeySchemaEntry>,
806    ) -> Self {
807        Self::with_drops(
808            name,
809            columns,
810            primary_key_columns,
811            indices,
812            check_constraints,
813            foreign_keys,
814            vec![],
815        )
816    }
817
818    pub fn with_drops(
819        name: String,
820        columns: Vec<ColumnDef>,
821        primary_key_columns: Vec<u16>,
822        indices: Vec<IndexDef>,
823        check_constraints: Vec<TableCheckDef>,
824        foreign_keys: Vec<ForeignKeySchemaEntry>,
825        dropped_non_pk_slots: Vec<u16>,
826    ) -> Self {
827        let pk_idx_cache: Vec<usize> = primary_key_columns.iter().map(|&i| i as usize).collect();
828        let non_pk_idx_cache: Vec<usize> = (0..columns.len())
829            .filter(|i| !primary_key_columns.contains(&(*i as u16)))
830            .collect();
831
832        let physical_count = non_pk_idx_cache.len() + dropped_non_pk_slots.len();
833        let mut decode_mapping_cache = vec![usize::MAX; physical_count];
834        let mut encoding_positions_cache = Vec::with_capacity(non_pk_idx_cache.len());
835
836        let mut drop_idx = 0;
837        let mut live_idx = 0;
838        for (phys_pos, slot) in decode_mapping_cache.iter_mut().enumerate() {
839            if drop_idx < dropped_non_pk_slots.len()
840                && dropped_non_pk_slots[drop_idx] as usize == phys_pos
841            {
842                drop_idx += 1;
843            } else {
844                *slot = non_pk_idx_cache[live_idx];
845                encoding_positions_cache.push(phys_pos as u16);
846                live_idx += 1;
847            }
848        }
849
850        let has_virtual_columns_cache = columns.iter().any(|c| {
851            matches!(
852                c.generated_kind,
853                Some(crate::parser::GeneratedKind::Virtual)
854            )
855        });
856
857        Self {
858            name,
859            columns,
860            primary_key_columns,
861            indices,
862            check_constraints,
863            foreign_keys,
864            flags: 0,
865            pk_idx_cache,
866            non_pk_idx_cache,
867            dropped_non_pk_slots,
868            decode_mapping_cache,
869            encoding_positions_cache,
870            has_virtual_columns_cache,
871        }
872    }
873
874    pub fn is_strict(&self) -> bool {
875        self.flags & TABLE_FLAG_STRICT != 0
876    }
877
878    pub fn has_virtual_columns(&self) -> bool {
879        self.has_virtual_columns_cache
880    }
881
882    /// Rebuild caches (preserving dropped slots). Use after mutating fields in place.
883    pub fn rebuild(self) -> Self {
884        let drops = self.dropped_non_pk_slots;
885        Self::with_drops(
886            self.name,
887            self.columns,
888            self.primary_key_columns,
889            self.indices,
890            self.check_constraints,
891            self.foreign_keys,
892            drops,
893        )
894    }
895
896    /// Returns true if any column-level or table-level CHECK constraints exist.
897    pub fn has_checks(&self) -> bool {
898        !self.check_constraints.is_empty() || self.columns.iter().any(|c| c.check_expr.is_some())
899    }
900
901    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
902    pub fn decode_col_mapping(&self) -> &[usize] {
903        &self.decode_mapping_cache
904    }
905
906    /// Logical non-PK order -> physical encoding position.
907    pub fn encoding_positions(&self) -> &[u16] {
908        &self.encoding_positions_cache
909    }
910
911    /// Total physical non-PK column count (live + dropped slots).
912    pub fn physical_non_pk_count(&self) -> usize {
913        self.non_pk_idx_cache.len() + self.dropped_non_pk_slots.len()
914    }
915
916    /// Physical encoding slots that have been dropped via DROP COLUMN.
917    pub fn dropped_non_pk_slots(&self) -> &[u16] {
918        &self.dropped_non_pk_slots
919    }
920
921    /// Return a new schema with the column at `drop_pos` marked as dropped.
922    pub fn without_column(&self, drop_pos: usize) -> Self {
923        let non_pk_order = self
924            .non_pk_idx_cache
925            .iter()
926            .position(|&i| i == drop_pos)
927            .expect("cannot drop PK column via without_column");
928        let physical_slot = self.encoding_positions_cache[non_pk_order];
929
930        let mut new_dropped = self.dropped_non_pk_slots.clone();
931        new_dropped.push(physical_slot);
932        new_dropped.sort();
933
934        let dropped_name = &self.columns[drop_pos].name;
935        let drop_pos_u16 = drop_pos as u16;
936
937        let mut columns: Vec<ColumnDef> = self
938            .columns
939            .iter()
940            .enumerate()
941            .filter(|(i, _)| *i != drop_pos)
942            .map(|(_, c)| {
943                let mut col = c.clone();
944                if col.position > drop_pos_u16 {
945                    col.position -= 1;
946                }
947                col
948            })
949            .collect();
950        for (i, col) in columns.iter_mut().enumerate() {
951            col.position = i as u16;
952        }
953
954        let primary_key_columns: Vec<u16> = self
955            .primary_key_columns
956            .iter()
957            .map(|&p| if p > drop_pos_u16 { p - 1 } else { p })
958            .collect();
959
960        let indices: Vec<IndexDef> = self
961            .indices
962            .iter()
963            .map(|idx| IndexDef {
964                name: idx.name.clone(),
965                columns: idx
966                    .columns
967                    .iter()
968                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
969                    .collect(),
970                unique: idx.unique,
971                predicate_sql: idx.predicate_sql.clone(),
972                predicate_expr: idx.predicate_expr.clone(),
973                collations: idx.collations.clone(),
974                kind: idx.kind,
975            })
976            .collect();
977
978        let foreign_keys: Vec<ForeignKeySchemaEntry> = self
979            .foreign_keys
980            .iter()
981            .map(|fk| ForeignKeySchemaEntry {
982                name: fk.name.clone(),
983                columns: fk
984                    .columns
985                    .iter()
986                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
987                    .collect(),
988                foreign_table: fk.foreign_table.clone(),
989                referred_columns: fk.referred_columns.clone(),
990                on_delete: fk.on_delete,
991                on_update: fk.on_update,
992            })
993            .collect();
994
995        // Filter out table-level CHECKs that reference the dropped column
996        let dropped_lower = dropped_name.to_ascii_lowercase();
997        let check_constraints: Vec<TableCheckDef> = self
998            .check_constraints
999            .iter()
1000            .filter(|c| !c.sql.to_ascii_lowercase().contains(&dropped_lower))
1001            .cloned()
1002            .collect();
1003
1004        Self::with_drops(
1005            self.name.clone(),
1006            columns,
1007            primary_key_columns,
1008            indices,
1009            check_constraints,
1010            foreign_keys,
1011            new_dropped,
1012        )
1013    }
1014}
1015
/// Format version written as the first byte of a serialized `TableSchema`.
const SCHEMA_VERSION: u8 = 9;
/// Bit of the table flags byte that marks a STRICT table.
pub const TABLE_FLAG_STRICT: u8 = 1 << 0;
1018
/// Append an optional string as a little-endian `u16` byte length followed by
/// the UTF-8 bytes. `None` is written as a zero length with no payload.
///
/// `Some("")` is therefore indistinguishable from `None` in this encoding.
///
/// The length prefix can express at most `u16::MAX` bytes. Longer strings are
/// cut at the last character boundary that fits, so the prefix always matches
/// the payload that follows. Casting the full length with `as u16` while
/// writing every byte would desynchronise everything stored after this field.
fn write_opt_string(buf: &mut Vec<u8>, s: &Option<String>) {
    let text = s.as_deref().unwrap_or("");

    let mut end = text.len().min(usize::from(u16::MAX));
    // Index 0 is always a boundary, so this terminates.
    while !text.is_char_boundary(end) {
        end -= 1;
    }

    let payload = &text.as_bytes()[..end];
    buf.extend_from_slice(&(payload.len() as u16).to_le_bytes());
    buf.extend_from_slice(payload);
}
1029
/// Read an optional string stored as a little-endian `u16` byte length followed
/// by that many bytes, advancing `pos` past everything consumed.
///
/// A zero length yields `None`. Invalid UTF-8 is replaced lossily.
///
/// # Panics
///
/// Panics if `data` ends before the length prefix or the payload is complete.
fn read_opt_string(data: &[u8], pos: &mut usize) -> Option<String> {
    let header = [data[*pos], data[*pos + 1]];
    *pos += 2;

    let len = usize::from(u16::from_le_bytes(header));
    if len == 0 {
        return None;
    }

    let start = *pos;
    let end = start + len;
    let text = String::from_utf8_lossy(&data[start..end]).into_owned();
    *pos = end;
    Some(text)
}
1041
/// Read a string stored as a little-endian `u16` byte length followed by that
/// many bytes, advancing `pos` past everything consumed.
///
/// Invalid UTF-8 is replaced lossily. A zero length yields an empty string.
///
/// # Panics
///
/// Panics if `data` ends before the length prefix or the payload is complete.
fn read_string(data: &[u8], pos: &mut usize) -> String {
    let header = [data[*pos], data[*pos + 1]];
    *pos += 2;

    let start = *pos;
    let end = start + usize::from(u16::from_le_bytes(header));
    let text = String::from_utf8_lossy(&data[start..end]).into_owned();
    *pos = end;
    text
}
1049
1050impl TableSchema {
1051    pub fn serialize(&self) -> Vec<u8> {
1052        let mut buf = Vec::new();
1053        buf.push(SCHEMA_VERSION);
1054
1055        let name_bytes = self.name.as_bytes();
1056        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
1057        buf.extend_from_slice(name_bytes);
1058
1059        buf.extend_from_slice(&(self.columns.len() as u16).to_le_bytes());
1060
1061        for col in &self.columns {
1062            let col_name = col.name.as_bytes();
1063            buf.extend_from_slice(&(col_name.len() as u16).to_le_bytes());
1064            buf.extend_from_slice(col_name);
1065            buf.push(col.data_type.type_tag());
1066            buf.push(if col.nullable { 1 } else { 0 });
1067            buf.extend_from_slice(&col.position.to_le_bytes());
1068        }
1069
1070        buf.extend_from_slice(&(self.primary_key_columns.len() as u16).to_le_bytes());
1071        for &pk_idx in &self.primary_key_columns {
1072            buf.extend_from_slice(&pk_idx.to_le_bytes());
1073        }
1074
1075        buf.extend_from_slice(&(self.indices.len() as u16).to_le_bytes());
1076        for idx in &self.indices {
1077            let idx_name = idx.name.as_bytes();
1078            buf.extend_from_slice(&(idx_name.len() as u16).to_le_bytes());
1079            buf.extend_from_slice(idx_name);
1080            buf.extend_from_slice(&(idx.columns.len() as u16).to_le_bytes());
1081            for &col_idx in &idx.columns {
1082                buf.extend_from_slice(&col_idx.to_le_bytes());
1083            }
1084            buf.push(if idx.unique { 1 } else { 0 });
1085        }
1086
1087        for col in &self.columns {
1088            let mut flags: u8 = 0;
1089            if col.default_sql.is_some() {
1090                flags |= 1;
1091            }
1092            if col.check_sql.is_some() {
1093                flags |= 2;
1094            }
1095            buf.push(flags);
1096            if let Some(ref sql) = col.default_sql {
1097                let bytes = sql.as_bytes();
1098                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1099                buf.extend_from_slice(bytes);
1100            }
1101            if let Some(ref sql) = col.check_sql {
1102                let bytes = sql.as_bytes();
1103                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1104                buf.extend_from_slice(bytes);
1105                write_opt_string(&mut buf, &col.check_name);
1106            }
1107        }
1108
1109        buf.extend_from_slice(&(self.check_constraints.len() as u16).to_le_bytes());
1110        for chk in &self.check_constraints {
1111            write_opt_string(&mut buf, &chk.name);
1112            let sql_bytes = chk.sql.as_bytes();
1113            buf.extend_from_slice(&(sql_bytes.len() as u16).to_le_bytes());
1114            buf.extend_from_slice(sql_bytes);
1115        }
1116
1117        buf.extend_from_slice(&(self.foreign_keys.len() as u16).to_le_bytes());
1118        for fk in &self.foreign_keys {
1119            write_opt_string(&mut buf, &fk.name);
1120            buf.extend_from_slice(&(fk.columns.len() as u16).to_le_bytes());
1121            for &col_idx in &fk.columns {
1122                buf.extend_from_slice(&col_idx.to_le_bytes());
1123            }
1124            let ft_bytes = fk.foreign_table.as_bytes();
1125            buf.extend_from_slice(&(ft_bytes.len() as u16).to_le_bytes());
1126            buf.extend_from_slice(ft_bytes);
1127            buf.extend_from_slice(&(fk.referred_columns.len() as u16).to_le_bytes());
1128            for rc in &fk.referred_columns {
1129                let rc_bytes = rc.as_bytes();
1130                buf.extend_from_slice(&(rc_bytes.len() as u16).to_le_bytes());
1131                buf.extend_from_slice(rc_bytes);
1132            }
1133        }
1134
1135        buf.extend_from_slice(&(self.dropped_non_pk_slots.len() as u16).to_le_bytes());
1136        for &slot in &self.dropped_non_pk_slots {
1137            buf.extend_from_slice(&slot.to_le_bytes());
1138        }
1139
1140        for col in &self.columns {
1141            let kind_tag: u8 = match col.generated_kind {
1142                None => 0,
1143                Some(crate::parser::GeneratedKind::Stored) => 1,
1144                Some(crate::parser::GeneratedKind::Virtual) => 2,
1145            };
1146            buf.push(kind_tag);
1147            if kind_tag != 0 {
1148                let sql = col.generated_sql.as_deref().unwrap_or("");
1149                let bytes = sql.as_bytes();
1150                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1151                buf.extend_from_slice(bytes);
1152            }
1153        }
1154
1155        for idx in &self.indices {
1156            match &idx.predicate_sql {
1157                Some(sql) => {
1158                    buf.push(1);
1159                    let bytes = sql.as_bytes();
1160                    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1161                    buf.extend_from_slice(bytes);
1162                }
1163                None => buf.push(0),
1164            }
1165        }
1166
1167        for fk in &self.foreign_keys {
1168            buf.push(fk.on_delete as u8);
1169            buf.push(fk.on_update as u8);
1170        }
1171
1172        for col in &self.columns {
1173            buf.push(col.collation as u8);
1174        }
1175        for idx in &self.indices {
1176            let n = idx.collations.len() as u16;
1177            buf.extend_from_slice(&n.to_le_bytes());
1178            for c in &idx.collations {
1179                buf.push(*c as u8);
1180            }
1181        }
1182        for idx in &self.indices {
1183            match idx.kind {
1184                IndexKind::BTree => buf.push(0),
1185                IndexKind::Gin(ops) => {
1186                    buf.push(1);
1187                    buf.push(ops.as_tag());
1188                }
1189            }
1190        }
1191        buf.push(self.flags);
1192
1193        buf
1194    }
1195
1196    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
1197        let mut pos = 0;
1198
1199        if data.is_empty() || !matches!(data[0], 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | SCHEMA_VERSION) {
1200            return Err(crate::error::SqlError::InvalidValue(
1201                "invalid schema version".into(),
1202            ));
1203        }
1204        let version = data[0];
1205        pos += 1;
1206
1207        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1208        pos += 2;
1209        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
1210        pos += name_len;
1211
1212        let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1213        pos += 2;
1214
1215        let mut columns = Vec::with_capacity(col_count);
1216        for _ in 0..col_count {
1217            let col_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1218            pos += 2;
1219            let col_name = String::from_utf8_lossy(&data[pos..pos + col_name_len]).into_owned();
1220            pos += col_name_len;
1221            let data_type = DataType::from_tag(data[pos]).ok_or_else(|| {
1222                crate::error::SqlError::InvalidValue("unknown data type tag".into())
1223            })?;
1224            pos += 1;
1225            let nullable = data[pos] != 0;
1226            pos += 1;
1227            let position = u16::from_le_bytes([data[pos], data[pos + 1]]);
1228            pos += 2;
1229            columns.push(ColumnDef {
1230                name: col_name,
1231                data_type,
1232                nullable,
1233                position,
1234                default_expr: None,
1235                default_sql: None,
1236                check_expr: None,
1237                check_sql: None,
1238                check_name: None,
1239                is_with_timezone: false,
1240                generated_expr: None,
1241                generated_sql: None,
1242                generated_kind: None,
1243                collation: Collation::Binary,
1244            });
1245        }
1246
1247        let pk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1248        pos += 2;
1249        let mut primary_key_columns = Vec::with_capacity(pk_count);
1250        for _ in 0..pk_count {
1251            let pk_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1252            pos += 2;
1253            primary_key_columns.push(pk_idx);
1254        }
1255
1256        let indices = if version >= 2 && pos + 2 <= data.len() {
1257            let idx_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1258            pos += 2;
1259            let mut idxs = Vec::with_capacity(idx_count);
1260            for _ in 0..idx_count {
1261                let idx_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1262                pos += 2;
1263                let idx_name = String::from_utf8_lossy(&data[pos..pos + idx_name_len]).into_owned();
1264                pos += idx_name_len;
1265                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1266                pos += 2;
1267                let mut cols = Vec::with_capacity(col_count);
1268                for _ in 0..col_count {
1269                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1270                    pos += 2;
1271                    cols.push(col_idx);
1272                }
1273                let unique = data[pos] != 0;
1274                pos += 1;
1275                idxs.push(IndexDef {
1276                    name: idx_name,
1277                    columns: cols,
1278                    unique,
1279                    predicate_sql: None,
1280                    predicate_expr: None,
1281                    collations: vec![],
1282                    kind: IndexKind::default(),
1283                });
1284            }
1285            idxs
1286        } else {
1287            vec![]
1288        };
1289
1290        let mut check_constraints = Vec::new();
1291        let mut foreign_keys = Vec::new();
1292
1293        if version >= 3 && pos < data.len() {
1294            for col in &mut columns {
1295                let flags = data[pos];
1296                pos += 1;
1297                if flags & 1 != 0 {
1298                    let sql = read_string(data, &mut pos);
1299                    col.default_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1300                        crate::error::SqlError::InvalidValue(format!(
1301                            "cannot parse DEFAULT expression: {sql}"
1302                        ))
1303                    })?);
1304                    col.default_sql = Some(sql);
1305                }
1306                if flags & 2 != 0 {
1307                    let sql = read_string(data, &mut pos);
1308                    col.check_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1309                        crate::error::SqlError::InvalidValue(format!(
1310                            "cannot parse CHECK expression: {sql}"
1311                        ))
1312                    })?);
1313                    col.check_sql = Some(sql);
1314                    col.check_name = read_opt_string(data, &mut pos);
1315                }
1316            }
1317
1318            let chk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1319            pos += 2;
1320            for _ in 0..chk_count {
1321                let name = read_opt_string(data, &mut pos);
1322                let sql = read_string(data, &mut pos);
1323                let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1324                    crate::error::SqlError::InvalidValue(format!(
1325                        "cannot parse CHECK expression: {sql}"
1326                    ))
1327                })?;
1328                check_constraints.push(TableCheckDef { name, expr, sql });
1329            }
1330
1331            let fk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1332            pos += 2;
1333            for _ in 0..fk_count {
1334                let name = read_opt_string(data, &mut pos);
1335                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1336                pos += 2;
1337                let mut cols = Vec::with_capacity(col_count);
1338                for _ in 0..col_count {
1339                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1340                    pos += 2;
1341                    cols.push(col_idx);
1342                }
1343                let foreign_table = read_string(data, &mut pos);
1344                let ref_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1345                pos += 2;
1346                let mut referred_columns = Vec::with_capacity(ref_count);
1347                for _ in 0..ref_count {
1348                    referred_columns.push(read_string(data, &mut pos));
1349                }
1350                foreign_keys.push(ForeignKeySchemaEntry {
1351                    name,
1352                    columns: cols,
1353                    foreign_table,
1354                    referred_columns,
1355                    on_delete: crate::parser::ReferentialAction::NoAction,
1356                    on_update: crate::parser::ReferentialAction::NoAction,
1357                });
1358            }
1359        }
1360        let mut dropped_non_pk_slots = Vec::new();
1361        if version >= 4 && pos + 2 <= data.len() {
1362            let slot_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1363            pos += 2;
1364            for _ in 0..slot_count {
1365                let slot = u16::from_le_bytes([data[pos], data[pos + 1]]);
1366                pos += 2;
1367                dropped_non_pk_slots.push(slot);
1368            }
1369        }
1370        if version >= 5 && pos < data.len() {
1371            for col in &mut columns {
1372                let kind_tag = data[pos];
1373                pos += 1;
1374                if kind_tag != 0 {
1375                    let len = u32::from_le_bytes([
1376                        data[pos],
1377                        data[pos + 1],
1378                        data[pos + 2],
1379                        data[pos + 3],
1380                    ]) as usize;
1381                    pos += 4;
1382                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
1383                    pos += len;
1384                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1385                        crate::error::SqlError::InvalidValue(format!(
1386                            "cannot parse GENERATED expression: {sql}"
1387                        ))
1388                    })?;
1389                    col.generated_sql = Some(sql);
1390                    col.generated_expr = Some(expr);
1391                    col.generated_kind = Some(match kind_tag {
1392                        1 => crate::parser::GeneratedKind::Stored,
1393                        2 => crate::parser::GeneratedKind::Virtual,
1394                        _ => {
1395                            return Err(crate::error::SqlError::InvalidValue(
1396                                "unknown GENERATED kind tag".into(),
1397                            ));
1398                        }
1399                    });
1400                }
1401            }
1402        }
1403        let mut indices = indices;
1404        if version >= 6 && pos < data.len() {
1405            for idx in &mut indices {
1406                let flag = data[pos];
1407                pos += 1;
1408                if flag == 1 {
1409                    let len = u32::from_le_bytes([
1410                        data[pos],
1411                        data[pos + 1],
1412                        data[pos + 2],
1413                        data[pos + 3],
1414                    ]) as usize;
1415                    pos += 4;
1416                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
1417                    pos += len;
1418                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1419                        crate::error::SqlError::InvalidValue(format!(
1420                            "cannot parse partial-index predicate: {sql}"
1421                        ))
1422                    })?;
1423                    idx.predicate_sql = Some(sql);
1424                    idx.predicate_expr = Some(expr);
1425                }
1426            }
1427            for fk in &mut foreign_keys {
1428                fk.on_delete =
1429                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
1430                        crate::error::SqlError::InvalidValue("unknown FK on_delete tag".into())
1431                    })?;
1432                pos += 1;
1433                fk.on_update =
1434                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
1435                        crate::error::SqlError::InvalidValue("unknown FK on_update tag".into())
1436                    })?;
1437                pos += 1;
1438            }
1439        }
1440
1441        let mut columns = columns;
1442        let mut indices = indices;
1443        let mut flags: u8 = 0;
1444        if version >= 7 && pos < data.len() {
1445            for col in &mut columns {
1446                col.collation = Collation::from_tag(data[pos]).ok_or_else(|| {
1447                    crate::error::SqlError::InvalidValue("unknown collation tag".into())
1448                })?;
1449                pos += 1;
1450            }
1451            for idx in &mut indices {
1452                let n = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1453                pos += 2;
1454                let mut colls = Vec::with_capacity(n);
1455                for _ in 0..n {
1456                    colls.push(Collation::from_tag(data[pos]).ok_or_else(|| {
1457                        crate::error::SqlError::InvalidValue("unknown collation tag".into())
1458                    })?);
1459                    pos += 1;
1460                }
1461                idx.collations = colls;
1462            }
1463            if version >= 9 {
1464                for idx in &mut indices {
1465                    if pos >= data.len() {
1466                        break;
1467                    }
1468                    let tag = data[pos];
1469                    pos += 1;
1470                    idx.kind = match tag {
1471                        0 => IndexKind::BTree,
1472                        1 => {
1473                            if pos >= data.len() {
1474                                return Err(crate::error::SqlError::InvalidValue(
1475                                    "GIN index missing opclass tag".into(),
1476                                ));
1477                            }
1478                            let ops = GinOpsClass::from_tag(data[pos]).ok_or_else(|| {
1479                                crate::error::SqlError::InvalidValue(
1480                                    "unknown GIN opclass tag".into(),
1481                                )
1482                            })?;
1483                            pos += 1;
1484                            IndexKind::Gin(ops)
1485                        }
1486                        _ => {
1487                            return Err(crate::error::SqlError::InvalidValue(
1488                                "unknown IndexKind tag".into(),
1489                            ));
1490                        }
1491                    };
1492                }
1493            }
1494            if pos < data.len() {
1495                flags = data[pos];
1496                pos += 1;
1497            }
1498        }
1499        let _ = pos;
1500
1501        let mut schema = Self::with_drops(
1502            name,
1503            columns,
1504            primary_key_columns,
1505            indices,
1506            check_constraints,
1507            foreign_keys,
1508            dropped_non_pk_slots,
1509        );
1510        schema.flags = flags;
1511        Ok(schema)
1512    }
1513
1514    /// Get column index by name (case-insensitive).
1515    pub fn column_index(&self, name: &str) -> Option<usize> {
1516        self.columns
1517            .iter()
1518            .position(|c| c.name.eq_ignore_ascii_case(name))
1519    }
1520
1521    /// Get indices of non-PK columns (columns stored in the B+ tree value).
1522    pub fn non_pk_indices(&self) -> &[usize] {
1523        &self.non_pk_idx_cache
1524    }
1525
1526    /// Get the PK column indices as usize.
1527    pub fn pk_indices(&self) -> &[usize] {
1528        &self.pk_idx_cache
1529    }
1530
1531    /// Get index definition by name (case-insensitive).
1532    pub fn index_by_name(&self, name: &str) -> Option<&IndexDef> {
1533        let lower = name.to_ascii_lowercase();
1534        self.indices.iter().find(|i| i.name == lower)
1535    }
1536
1537    /// Get the KV table name for an index.
1538    pub fn index_table_name(table_name: &str, index_name: &str) -> Vec<u8> {
1539        format!("__idx_{table_name}_{index_name}").into_bytes()
1540    }
1541}
1542
/// Result of executing a SQL statement.
#[derive(Debug)]
pub enum ExecutionResult {
    /// Carries a `u64` count.
    /// NOTE(review): presumably the number of rows changed by the statement; confirm against the executor.
    RowsAffected(u64),
    /// Carries the result set of a query as a [`QueryResult`].
    Query(QueryResult),
    /// Carries no payload.
    Ok,
}
1550
/// Result of a SELECT query.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// One string per result column.
    /// NOTE(review): presumably the column names or labels; confirm against the executor.
    pub columns: Vec<String>,
    /// Result rows, each a vector of `Value`s.
    /// NOTE(review): presumably each row is aligned with `columns`; confirm.
    pub rows: Vec<Vec<Value>>,
}
1557
1558#[cfg(test)]
1559#[path = "types_tests.rs"]
1560mod tests;