Skip to main content

citadel_sql/
types.rs

1use std::cmp::Ordering;
2use std::fmt;
3use std::hash::{Hash, Hasher};
4use std::sync::Arc;
5
6pub use compact_str::CompactString;
7
8use crate::parser::Expr;
9
/// SQL data types.
///
/// Every variant has a one-byte tag (see [`DataType::type_tag`] and
/// [`DataType::from_tag`]). The tag numbering is independent of the
/// declaration order of the variants below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    Null,
    Integer,
    Real,
    Text,
    Blob,
    Boolean,
    Time,
    Date,
    Timestamp,
    Interval,
    Json,
    Jsonb,
    TsVector,
    TsQuery,
}

impl DataType {
    /// Returns the one-byte tag identifying this type.
    pub fn type_tag(self) -> u8 {
        match self {
            Self::Null => 0,
            Self::Blob => 1,
            Self::Text => 2,
            Self::Boolean => 3,
            Self::Integer => 4,
            Self::Real => 5,
            Self::Time => 6,
            Self::Date => 7,
            Self::Timestamp => 8,
            Self::Interval => 9,
            Self::Json => 10,
            Self::Jsonb => 11,
            Self::TsVector => 12,
            Self::TsQuery => 13,
        }
    }

    /// Inverse of [`DataType::type_tag`]; returns `None` for an unknown tag.
    pub fn from_tag(tag: u8) -> Option<Self> {
        // Indexed by tag: the entry at position `t` is the variant whose
        // `type_tag()` is `t`.
        const BY_TAG: [DataType; 14] = [
            DataType::Null,
            DataType::Blob,
            DataType::Text,
            DataType::Boolean,
            DataType::Integer,
            DataType::Real,
            DataType::Time,
            DataType::Date,
            DataType::Timestamp,
            DataType::Interval,
            DataType::Json,
            DataType::Jsonb,
            DataType::TsVector,
            DataType::TsQuery,
        ];
        BY_TAG.get(usize::from(tag)).copied()
    }
}
69
70impl fmt::Display for DataType {
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        match self {
73            DataType::Null => write!(f, "NULL"),
74            DataType::Integer => write!(f, "INTEGER"),
75            DataType::Real => write!(f, "REAL"),
76            DataType::Text => write!(f, "TEXT"),
77            DataType::Blob => write!(f, "BLOB"),
78            DataType::Boolean => write!(f, "BOOLEAN"),
79            DataType::Time => write!(f, "TIME"),
80            DataType::Date => write!(f, "DATE"),
81            DataType::Timestamp => write!(f, "TIMESTAMP"),
82            DataType::Interval => write!(f, "INTERVAL"),
83            DataType::Json => write!(f, "JSON"),
84            DataType::Jsonb => write!(f, "JSONB"),
85            DataType::TsVector => write!(f, "TSVECTOR"),
86            DataType::TsQuery => write!(f, "TSQUERY"),
87        }
88    }
89}
90
/// SQL value. Temporal epochs: days/µs since 1970-01-01 UTC.
/// `Date`/`Timestamp` reserve `i{32,64}::{MAX,MIN}` as `±infinity` sentinels.
#[derive(Debug, Clone, Default)]
pub enum Value {
    /// SQL NULL; this is also the `Default` value.
    #[default]
    Null,
    /// 64-bit signed integer.
    Integer(i64),
    /// 64-bit floating-point number.
    Real(f64),
    /// Text string.
    Text(CompactString),
    /// Raw byte string.
    Blob(Vec<u8>),
    /// Boolean.
    Boolean(bool),
    /// Time of day as an `i64`.
    /// NOTE(review): presumably µs since midnight (integer coercion accepts
    /// `0..=86_400_000_000`) — confirm against the datetime module.
    Time(i64),
    /// Days since the epoch; `i32::MAX` / `i32::MIN` are the infinity sentinels.
    Date(i32),
    /// µs since the epoch; `i64::MAX` / `i64::MIN` are the infinity sentinels.
    Timestamp(i64),
    /// Interval kept as three independent components.
    Interval {
        months: i32,
        days: i32,
        micros: i64,
    },
    /// JSON held as text.
    Json(CompactString),
    /// JSON in an encoded byte form (rendered through `crate::json::decode_to_text`).
    Jsonb(Arc<[u8]>),
    /// Encoded bytes rendered through `crate::fts::tsvector_display`.
    /// NOTE(review): presumably a full-text-search document vector — confirm.
    TsVector(Arc<[u8]>),
    /// Encoded bytes rendered through `crate::fts::tsquery_display`.
    /// NOTE(review): presumably a full-text-search query — confirm.
    TsQuery(Arc<[u8]>),
}
115
116impl Value {
117    pub fn data_type(&self) -> DataType {
118        match self {
119            Value::Null => DataType::Null,
120            Value::Integer(_) => DataType::Integer,
121            Value::Real(_) => DataType::Real,
122            Value::Text(_) => DataType::Text,
123            Value::Blob(_) => DataType::Blob,
124            Value::Boolean(_) => DataType::Boolean,
125            Value::Time(_) => DataType::Time,
126            Value::Date(_) => DataType::Date,
127            Value::Timestamp(_) => DataType::Timestamp,
128            Value::Interval { .. } => DataType::Interval,
129            Value::Json(_) => DataType::Json,
130            Value::Jsonb(_) => DataType::Jsonb,
131            Value::TsVector(_) => DataType::TsVector,
132            Value::TsQuery(_) => DataType::TsQuery,
133        }
134    }
135
136    pub fn is_null(&self) -> bool {
137        matches!(self, Value::Null)
138    }
139
140    /// Returns true for `±infinity` sentinel values on DATE / TIMESTAMP; false otherwise.
141    pub fn is_finite_temporal(&self) -> bool {
142        match self {
143            Value::Date(d) => *d != i32::MAX && *d != i32::MIN,
144            Value::Timestamp(t) => *t != i64::MAX && *t != i64::MIN,
145            _ => true,
146        }
147    }
148
149    /// Attempt to coerce this value to the target type.
150    pub fn coerce_to(&self, target: DataType) -> Option<Value> {
151        match (self, target) {
152            (_, DataType::Null) => Some(Value::Null),
153            (Value::Null, _) => Some(Value::Null),
154            (Value::Integer(i), DataType::Integer) => Some(Value::Integer(*i)),
155            (Value::Integer(i), DataType::Real) => Some(Value::Real(*i as f64)),
156            (Value::Real(r), DataType::Real) => Some(Value::Real(*r)),
157            (Value::Real(r), DataType::Integer) => Some(Value::Integer(*r as i64)),
158            (Value::Text(s), DataType::Text) => Some(Value::Text(s.clone())),
159            (Value::Blob(b), DataType::Blob) => Some(Value::Blob(b.clone())),
160            (Value::Boolean(b), DataType::Boolean) => Some(Value::Boolean(*b)),
161            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
162            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(*i != 0)),
163            (Value::Time(t), DataType::Time) => Some(Value::Time(*t)),
164            (Value::Date(d), DataType::Date) => Some(Value::Date(*d)),
165            (Value::Timestamp(t), DataType::Timestamp) => Some(Value::Timestamp(*t)),
166            (Value::TsVector(b), DataType::TsVector) => Some(Value::TsVector(b.clone())),
167            (Value::TsQuery(b), DataType::TsQuery) => Some(Value::TsQuery(b.clone())),
168            (
169                Value::Interval {
170                    months,
171                    days,
172                    micros,
173                },
174                DataType::Interval,
175            ) => Some(Value::Interval {
176                months: *months,
177                days: *days,
178                micros: *micros,
179            }),
180            _ => None,
181        }
182    }
183
184    pub fn coerce_into(self, target: DataType) -> Option<Value> {
185        if self.is_null() || target == DataType::Null {
186            return Some(Value::Null);
187        }
188        if self.data_type() == target {
189            return Some(self);
190        }
191        match (self, target) {
192            (Value::Integer(i), DataType::Real) => Some(Value::Real(i as f64)),
193            (Value::Real(r), DataType::Integer) => Some(Value::Integer(r as i64)),
194            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if b { 1 } else { 0 })),
195            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(i != 0)),
196            (Value::Text(s), DataType::Date) => {
197                crate::datetime::parse_date(&s).ok().map(Value::Date)
198            }
199            (Value::Text(s), DataType::Time) => {
200                crate::datetime::parse_time(&s).ok().map(Value::Time)
201            }
202            (Value::Text(s), DataType::Timestamp) => crate::datetime::parse_timestamp(&s)
203                .ok()
204                .map(Value::Timestamp),
205            (Value::Text(s), DataType::Interval) => {
206                crate::datetime::parse_interval(&s)
207                    .ok()
208                    .map(|(m, d, u)| Value::Interval {
209                        months: m,
210                        days: d,
211                        micros: u,
212                    })
213            }
214            // INTEGER → TIMESTAMP: Unix epoch seconds.
215            (Value::Integer(n), DataType::Timestamp) => {
216                n.checked_mul(1_000_000).map(Value::Timestamp)
217            }
218            (Value::Integer(n), DataType::Date) => {
219                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
220                    Some(Value::Date(n as i32))
221                } else {
222                    None
223                }
224            }
225            (Value::Integer(n), DataType::Time) => {
226                if (0..=86_400_000_000).contains(&n) {
227                    Some(Value::Time(n))
228                } else {
229                    None
230                }
231            }
232            (Value::Integer(n), DataType::Interval) => {
233                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
234                    Some(Value::Interval {
235                        months: 0,
236                        days: n as i32,
237                        micros: 0,
238                    })
239                } else {
240                    None
241                }
242            }
243            (Value::Timestamp(t), DataType::Integer) => Some(Value::Integer(t / 1_000_000)),
244            (Value::Date(d), DataType::Integer) => Some(Value::Integer(d as i64)),
245            (Value::Time(t), DataType::Integer) => Some(Value::Integer(t)),
246            (Value::Date(d), DataType::Timestamp) => {
247                (d as i64).checked_mul(86_400_000_000).map(Value::Timestamp)
248            }
249            (Value::Timestamp(t), DataType::Date) => {
250                // div_euclid floors correctly for negative µs (pre-1970).
251                let days = t.div_euclid(86_400_000_000);
252                if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
253                    Some(Value::Date(days as i32))
254                } else {
255                    None
256                }
257            }
258            (v, DataType::Text)
259                if matches!(
260                    v.data_type(),
261                    DataType::Date | DataType::Time | DataType::Timestamp | DataType::Interval
262                ) =>
263            {
264                Some(Value::Text(v.to_string().into()))
265            }
266            (Value::Text(s), DataType::Json) => {
267                crate::json::validate_text(&s).ok()?;
268                Some(Value::Json(s))
269            }
270            (Value::Text(s), DataType::Jsonb) => crate::json::text_to_jsonb(&s).ok(),
271            (Value::Json(s), DataType::Text) => Some(Value::Text(s)),
272            (Value::Json(s), DataType::Jsonb) => crate::json::text_to_jsonb(&s).ok(),
273            (Value::Jsonb(b), DataType::Text) => crate::json::decode_to_text(&b)
274                .ok()
275                .map(|t| Value::Text(t.into())),
276            (Value::Jsonb(b), DataType::Json) => crate::json::decode_to_text(&b)
277                .ok()
278                .map(|t| Value::Json(t.into())),
279            _ => None,
280        }
281    }
282
283    pub fn strict_coerce(&self, target: DataType) -> Option<Value> {
284        if matches!(self, Value::Null) {
285            return Some(Value::Null);
286        }
287        if self.data_type() == target {
288            return Some(self.clone());
289        }
290        match (self, target) {
291            (Value::Integer(i), DataType::Real) => {
292                if i.unsigned_abs() <= (1u64 << 53) {
293                    Some(Value::Real(*i as f64))
294                } else {
295                    None
296                }
297            }
298            (Value::Real(r), DataType::Integer) => {
299                if r.is_finite()
300                    && r.fract() == 0.0
301                    && (i64::MIN as f64..=i64::MAX as f64).contains(r)
302                {
303                    Some(Value::Integer(*r as i64))
304                } else {
305                    None
306                }
307            }
308            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
309            (Value::Integer(i), DataType::Boolean) => match i {
310                0 => Some(Value::Boolean(false)),
311                1 => Some(Value::Boolean(true)),
312                _ => None,
313            },
314            (Value::Text(s), DataType::Integer) => {
315                let trimmed = s.as_str();
316                let parsed: i64 = trimmed.parse().ok()?;
317                if parsed.to_string() == trimmed {
318                    Some(Value::Integer(parsed))
319                } else {
320                    None
321                }
322            }
323            (Value::Text(s), DataType::Real) => {
324                let trimmed = s.as_str();
325                let parsed: f64 = trimmed.parse().ok()?;
326                if parsed.is_finite() {
327                    Some(Value::Real(parsed))
328                } else {
329                    None
330                }
331            }
332            (Value::Text(_), DataType::Date)
333            | (Value::Text(_), DataType::Time)
334            | (Value::Text(_), DataType::Timestamp)
335            | (Value::Text(_), DataType::Interval)
336            | (Value::Text(_), DataType::Json)
337            | (Value::Text(_), DataType::Jsonb)
338            | (Value::Json(_), DataType::Jsonb)
339            | (Value::Json(_), DataType::Text)
340            | (Value::Jsonb(_), DataType::Json)
341            | (Value::Jsonb(_), DataType::Text) => self.clone().coerce_into(target),
342            (Value::Date(d), DataType::Timestamp) => (*d as i64)
343                .checked_mul(86_400_000_000)
344                .map(Value::Timestamp),
345            (Value::Timestamp(t), DataType::Date) => {
346                if t % 86_400_000_000 == 0 {
347                    let days = t.div_euclid(86_400_000_000);
348                    if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
349                        Some(Value::Date(days as i32))
350                    } else {
351                        None
352                    }
353                } else {
354                    None
355                }
356            }
357            _ => None,
358        }
359    }
360
361    /// Numeric ordering for Integer and Real values (promotes to f64 for mixed).
362    fn numeric_cmp(&self, other: &Value) -> Option<Ordering> {
363        match (self, other) {
364            (Value::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
365            (Value::Real(a), Value::Real(b)) => a.partial_cmp(b),
366            (Value::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
367            (Value::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
368            _ => None,
369        }
370    }
371}
372
373impl PartialEq for Value {
374    // Field-wise for Eq/Hash/Ord transitivity. SQL-level `=` on INTERVAL
375    // normalizes separately (see eval.rs).
376    fn eq(&self, other: &Self) -> bool {
377        match (self, other) {
378            (Value::Null, Value::Null) => true,
379            (Value::Integer(a), Value::Integer(b)) => a == b,
380            (Value::Real(a), Value::Real(b)) => a == b,
381            (Value::Integer(a), Value::Real(b)) => (*a as f64) == *b,
382            (Value::Real(a), Value::Integer(b)) => *a == (*b as f64),
383            (Value::Text(a), Value::Text(b)) => a == b,
384            (Value::Blob(a), Value::Blob(b)) => a == b,
385            (Value::Boolean(a), Value::Boolean(b)) => a == b,
386            (Value::Time(a), Value::Time(b)) => a == b,
387            (Value::Date(a), Value::Date(b)) => a == b,
388            (Value::Timestamp(a), Value::Timestamp(b)) => a == b,
389            (
390                Value::Interval {
391                    months: am,
392                    days: ad,
393                    micros: au,
394                },
395                Value::Interval {
396                    months: bm,
397                    days: bd,
398                    micros: bu,
399                },
400            ) => am == bm && ad == bd && au == bu,
401            (Value::Json(a), Value::Json(b)) => a == b,
402            (Value::Jsonb(a), Value::Jsonb(b)) => a == b,
403            (Value::TsVector(a), Value::TsVector(b)) => a == b,
404            (Value::TsQuery(a), Value::TsQuery(b)) => a == b,
405            _ => false,
406        }
407    }
408}
409
410impl Eq for Value {}
411
412impl Hash for Value {
413    fn hash<H: Hasher>(&self, state: &mut H) {
414        match self {
415            Value::Null => 0u8.hash(state),
416            Value::Integer(i) => {
417                // Hash via f64 bits so Integer(n) and Real(n.0) produce the same hash,
418                // matching the cross-type PartialEq contract.
419                1u8.hash(state);
420                (*i as f64).to_bits().hash(state);
421            }
422            Value::Real(r) => {
423                1u8.hash(state);
424                r.to_bits().hash(state);
425            }
426            Value::Text(s) => {
427                2u8.hash(state);
428                s.hash(state);
429            }
430            Value::Blob(b) => {
431                3u8.hash(state);
432                b.hash(state);
433            }
434            Value::Boolean(b) => {
435                4u8.hash(state);
436                b.hash(state);
437            }
438            Value::Time(t) => {
439                5u8.hash(state);
440                t.hash(state);
441            }
442            Value::Date(d) => {
443                6u8.hash(state);
444                d.hash(state);
445            }
446            Value::Timestamp(t) => {
447                7u8.hash(state);
448                t.hash(state);
449            }
450            Value::Interval {
451                months,
452                days,
453                micros,
454            } => {
455                8u8.hash(state);
456                months.hash(state);
457                days.hash(state);
458                micros.hash(state);
459            }
460            Value::Json(s) => {
461                9u8.hash(state);
462                s.hash(state);
463            }
464            Value::Jsonb(b) => {
465                10u8.hash(state);
466                b.hash(state);
467            }
468            Value::TsVector(b) => {
469                11u8.hash(state);
470                b.hash(state);
471            }
472            Value::TsQuery(b) => {
473                12u8.hash(state);
474                b.hash(state);
475            }
476        }
477    }
478}
479
/// Delegates to the total order defined by `Ord`, so `partial_cmp` never
/// returns `None`.
impl PartialOrd for Value {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
485
impl Ord for Value {
    // Order: NULL < BOOLEAN < numeric < TIME < DATE < TIMESTAMP < INTERVAL
    //        < JSON < JSONB < TSVECTOR < TSQUERY < TEXT < BLOB.
    // INTERVAL compares field-wise for trait-invariant safety; SQL-level ops normalize.
    //
    // The arms are order-dependent: each type class is handled as a triple
    // (same-class comparison, "class on the left is Less", "class on the right
    // is Greater"), so every later arm may assume that the earlier classes have
    // already been ruled out on both sides.
    //
    // Integer and Real form one "numeric" class compared via `numeric_cmp`.
    // When that returns `None` (a NaN operand) the pair is reported as Equal.
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (Value::Null, Value::Null) => Ordering::Equal,
            (Value::Null, _) => Ordering::Less,
            (_, Value::Null) => Ordering::Greater,

            (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
            (Value::Boolean(_), _) => Ordering::Less,
            (_, Value::Boolean(_)) => Ordering::Greater,

            (Value::Integer(_) | Value::Real(_), Value::Integer(_) | Value::Real(_)) => {
                self.numeric_cmp(other).unwrap_or(Ordering::Equal)
            }
            (Value::Integer(_) | Value::Real(_), _) => Ordering::Less,
            (_, Value::Integer(_) | Value::Real(_)) => Ordering::Greater,

            (Value::Time(a), Value::Time(b)) => a.cmp(b),
            (Value::Time(_), _) => Ordering::Less,
            (_, Value::Time(_)) => Ordering::Greater,

            (Value::Date(a), Value::Date(b)) => a.cmp(b),
            (Value::Date(_), _) => Ordering::Less,
            (_, Value::Date(_)) => Ordering::Greater,

            (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
            (Value::Timestamp(_), _) => Ordering::Less,
            (_, Value::Timestamp(_)) => Ordering::Greater,

            (
                Value::Interval {
                    months: am,
                    days: ad,
                    micros: au,
                },
                Value::Interval {
                    months: bm,
                    days: bd,
                    micros: bu,
                },
            ) => am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu)),
            (Value::Interval { .. }, _) => Ordering::Less,
            (_, Value::Interval { .. }) => Ordering::Greater,

            (Value::Json(a), Value::Json(b)) => a.cmp(b),
            (Value::Json(_), _) => Ordering::Less,
            (_, Value::Json(_)) => Ordering::Greater,

            (Value::Jsonb(a), Value::Jsonb(b)) => a.as_ref().cmp(b.as_ref()),
            (Value::Jsonb(_), _) => Ordering::Less,
            (_, Value::Jsonb(_)) => Ordering::Greater,

            (Value::TsVector(a), Value::TsVector(b)) => a.as_ref().cmp(b.as_ref()),
            (Value::TsVector(_), _) => Ordering::Less,
            (_, Value::TsVector(_)) => Ordering::Greater,

            (Value::TsQuery(a), Value::TsQuery(b)) => a.as_ref().cmp(b.as_ref()),
            (Value::TsQuery(_), _) => Ordering::Less,
            (_, Value::TsQuery(_)) => Ordering::Greater,

            (Value::Text(a), Value::Text(b)) => a.cmp(b),
            (Value::Text(_), _) => Ordering::Less,
            (_, Value::Text(_)) => Ordering::Greater,

            // Only BLOB vs BLOB is left once every other class has been handled.
            (Value::Blob(a), Value::Blob(b)) => a.cmp(b),
        }
    }
}
556
557impl fmt::Display for Value {
558    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
559        match self {
560            Value::Null => write!(f, "NULL"),
561            Value::Integer(i) => write!(f, "{i}"),
562            Value::Real(r) => {
563                if r.fract() == 0.0 && r.is_finite() {
564                    write!(f, "{r:.1}")
565                } else {
566                    write!(f, "{r}")
567                }
568            }
569            Value::Text(s) => write!(f, "{s}"),
570            Value::Blob(b) => write!(f, "X'{}'", hex_encode(b)),
571            Value::Boolean(b) => write!(f, "{}", if *b { "TRUE" } else { "FALSE" }),
572            Value::Time(t) => write!(f, "{}", crate::datetime::format_time(*t)),
573            Value::Date(d) => write!(f, "{}", crate::datetime::format_date(*d)),
574            Value::Timestamp(t) => write!(f, "{}", crate::datetime::format_timestamp(*t)),
575            Value::Interval {
576                months,
577                days,
578                micros,
579            } => {
580                write!(
581                    f,
582                    "{}",
583                    crate::datetime::format_interval(*months, *days, *micros)
584                )
585            }
586            Value::Json(s) => write!(f, "{s}"),
587            Value::Jsonb(b) => match crate::json::decode_to_text(b) {
588                Ok(s) => write!(f, "{s}"),
589                Err(_) => write!(f, "<invalid jsonb>"),
590            },
591            Value::TsVector(b) => write!(f, "{}", crate::fts::tsvector_display(b)),
592            Value::TsQuery(b) => write!(f, "{}", crate::fts::tsquery_display(b)),
593        }
594    }
595}
596
/// Encodes `data` as upper-case hexadecimal, two digits per byte.
///
/// Digits are looked up in a table and pushed straight into the
/// pre-sized output, so no temporary `String` is allocated per byte.
fn hex_encode(data: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789ABCDEF";
    let mut s = String::with_capacity(data.len() * 2);
    for &byte in data {
        s.push(char::from(DIGITS[usize::from(byte >> 4)]));
        s.push(char::from(DIGITS[usize::from(byte & 0x0F)]));
    }
    s
}
604
/// Text collation. The explicit discriminants double as the `u8` tag
/// accepted by [`Collation::from_tag`]; `Binary` is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum Collation {
    /// Plain `str` comparison.
    #[default]
    Binary = 0,
    /// ASCII case-insensitive comparison.
    NoCase = 1,
    /// Comparison that ignores trailing ASCII spaces.
    Rtrim = 2,
}

impl Collation {
    /// Looks a collation up by its `u8` discriminant; `None` if unknown.
    pub fn from_tag(tag: u8) -> Option<Self> {
        [Self::Binary, Self::NoCase, Self::Rtrim]
            .iter()
            .copied()
            .find(|candidate| *candidate as u8 == tag)
    }

    /// Looks a collation up by name, ignoring ASCII case; `None` if unknown.
    pub fn from_name(name: &str) -> Option<Self> {
        const NAMES: [(&str, Collation); 3] = [
            ("BINARY", Collation::Binary),
            ("NOCASE", Collation::NoCase),
            ("RTRIM", Collation::Rtrim),
        ];
        NAMES
            .iter()
            .find(|(keyword, _)| name.eq_ignore_ascii_case(keyword))
            .map(|&(_, collation)| collation)
    }

    /// Orders `a` against `b` under this collation.
    pub fn cmp_text(self, a: &str, b: &str) -> std::cmp::Ordering {
        // Characters of `s` with ASCII letters lower-cased.
        fn folded(s: &str) -> impl Iterator<Item = char> + '_ {
            s.chars().map(|c| c.to_ascii_lowercase())
        }

        match self {
            Self::Binary => a.cmp(b),
            Self::NoCase => folded(a).cmp(folded(b)),
            Self::Rtrim => a.trim_end_matches(' ').cmp(b.trim_end_matches(' ')),
        }
    }

    /// Tests `a` and `b` for equality under this collation.
    pub fn eq_text(self, a: &str, b: &str) -> bool {
        match self {
            Self::Binary => a == b,
            Self::NoCase => a.eq_ignore_ascii_case(b),
            Self::Rtrim => {
                let (lhs, rhs) = (a.trim_end_matches(' '), b.trim_end_matches(' '));
                lhs == rhs
            }
        }
    }
}
656
/// Column definition.
///
/// Several attributes are kept twice: as a parsed `Expr` and as a `String`
/// (`*_expr` / `*_sql`).
/// NOTE(review): presumably the `*_sql` field is the source text of the
/// matching expression — confirm against the code that builds columns.
#[derive(Debug, Clone)]
pub struct ColumnDef {
    /// Column name.
    pub name: String,
    /// Declared SQL type of the column.
    pub data_type: DataType,
    /// NOTE(review): presumably `false` for NOT NULL columns — confirm.
    pub nullable: bool,
    /// NOTE(review): presumably the column's ordinal within its table — confirm.
    pub position: u16,
    /// Parsed default expression, if any.
    pub default_expr: Option<Expr>,
    /// Default expression as a string, if any.
    pub default_sql: Option<String>,
    /// Parsed column-level CHECK expression, if any.
    pub check_expr: Option<Expr>,
    /// CHECK expression as a string, if any.
    pub check_sql: Option<String>,
    /// Name attached to the CHECK, if any.
    pub check_name: Option<String>,
    /// Display-only flag for `TIMESTAMPTZ` / `TIMETZ`; storage is i64 µs UTC.
    pub is_with_timezone: bool,
    /// Parsed generated-column expression, if any.
    pub generated_expr: Option<Expr>,
    /// Generated-column expression as a string, if any.
    pub generated_sql: Option<String>,
    /// Kind of generated column (see `crate::parser::GeneratedKind`), if any.
    pub generated_kind: Option<crate::parser::GeneratedKind>,
    /// Collation attached to the column; `Collation::Binary` is the type's default.
    pub collation: Collation,
}
676
/// Operator class of a GIN index, with a one-byte tag for each class.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GinOpsClass {
    /// One entry per (key, value) pair; supports `@>` `?` `?|` `?&`.
    JsonbOps,
    /// One entry per hash(path‖value); supports `@>` only, ~3x smaller index.
    JsonbPathOps,
}

impl GinOpsClass {
    /// Returns the one-byte tag of this operator class.
    pub fn as_tag(self) -> u8 {
        match self {
            GinOpsClass::JsonbOps => 0,
            GinOpsClass::JsonbPathOps => 1,
        }
    }

    /// Inverse of [`GinOpsClass::as_tag`]; `None` for an unknown tag.
    pub fn from_tag(t: u8) -> Option<Self> {
        // Derived from `as_tag` so the two directions cannot drift apart.
        [GinOpsClass::JsonbOps, GinOpsClass::JsonbPathOps]
            .iter()
            .copied()
            .find(|class| class.as_tag() == t)
    }
}
701
/// Flavour of an inverted index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvertedKind {
    /// GIN index using the given operator class.
    Gin(GinOpsClass),
    /// NOTE(review): presumably a full-text-search index, with `config_id`
    /// selecting a text-search configuration — confirm against `crate::fts`.
    Fts { config_id: u8 },
}
707
/// Kind of index; `BTree` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IndexKind {
    /// B-tree index.
    #[default]
    BTree,
    /// Inverted index of the given flavour.
    Inverted(InvertedKind),
}
714
/// Index definition stored as part of the table schema.
#[derive(Debug, Clone)]
pub struct IndexDef {
    /// Index name.
    pub name: String,
    /// Indexed columns, as `u16` column references.
    /// NOTE(review): presumably indices into the table's column list — confirm.
    pub columns: Vec<u16>,
    /// Whether the index is declared UNIQUE.
    pub unique: bool,
    /// Index predicate as a string, if any.
    /// NOTE(review): presumably the WHERE clause of a partial index — confirm.
    pub predicate_sql: Option<String>,
    /// Parsed form of the predicate, if any.
    pub predicate_expr: Option<crate::parser::Expr>,
    /// Collations for the index.
    /// NOTE(review): presumably one entry per element of `columns` — confirm.
    pub collations: Vec<Collation>,
    /// Index kind (B-tree or inverted).
    pub kind: IndexKind,
}
726
/// View definition stored in the _views metadata table.
#[derive(Debug, Clone)]
pub struct ViewDef {
    /// View name.
    pub name: String,
    /// SQL text of the view.
    pub sql: String,
    /// Column aliases of the view; may be empty.
    pub column_aliases: Vec<String>,
}
734
/// Version byte written first by `ViewDef::serialize` and required by
/// `ViewDef::deserialize`.
const VIEW_DEF_VERSION: u8 = 1;
736
737impl ViewDef {
738    pub fn serialize(&self) -> Vec<u8> {
739        let mut buf = Vec::new();
740        buf.push(VIEW_DEF_VERSION);
741
742        let name_bytes = self.name.as_bytes();
743        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
744        buf.extend_from_slice(name_bytes);
745
746        let sql_bytes = self.sql.as_bytes();
747        buf.extend_from_slice(&(sql_bytes.len() as u32).to_le_bytes());
748        buf.extend_from_slice(sql_bytes);
749
750        buf.extend_from_slice(&(self.column_aliases.len() as u16).to_le_bytes());
751        for alias in &self.column_aliases {
752            let alias_bytes = alias.as_bytes();
753            buf.extend_from_slice(&(alias_bytes.len() as u16).to_le_bytes());
754            buf.extend_from_slice(alias_bytes);
755        }
756
757        buf
758    }
759
760    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
761        if data.is_empty() || data[0] != VIEW_DEF_VERSION {
762            return Err(crate::error::SqlError::InvalidValue(
763                "invalid view definition version".into(),
764            ));
765        }
766        let mut pos = 1;
767
768        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
769        pos += 2;
770        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
771        pos += name_len;
772
773        let sql_len =
774            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
775        pos += 4;
776        let sql = String::from_utf8_lossy(&data[pos..pos + sql_len]).into_owned();
777        pos += sql_len;
778
779        let alias_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
780        pos += 2;
781        let mut column_aliases = Vec::with_capacity(alias_count);
782        for _ in 0..alias_count {
783            let alias_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
784            pos += 2;
785            let alias = String::from_utf8_lossy(&data[pos..pos + alias_len]).into_owned();
786            pos += alias_len;
787            column_aliases.push(alias);
788        }
789
790        Ok(Self {
791            name,
792            sql,
793            column_aliases,
794        })
795    }
796}
797
/// Table-level CHECK constraint stored in schema.
#[derive(Debug, Clone)]
pub struct TableCheckDef {
    /// Constraint name, if one was given.
    pub name: Option<String>,
    /// Parsed CHECK expression.
    pub expr: Expr,
    /// CHECK expression as a string.
    pub sql: String,
}
805
/// Foreign key definition stored in schema.
#[derive(Debug, Clone)]
pub struct ForeignKeySchemaEntry {
    /// Constraint name, if one was given.
    pub name: Option<String>,
    /// Referencing columns, as `u16` column references.
    /// NOTE(review): presumably indices into the owning table's columns — confirm.
    pub columns: Vec<u16>,
    /// Name of the referenced table.
    pub foreign_table: String,
    /// Names of the referenced columns.
    pub referred_columns: Vec<String>,
    /// Referential action for deletes.
    pub on_delete: crate::parser::ReferentialAction,
    /// Referential action for updates.
    pub on_update: crate::parser::ReferentialAction,
    /// Whether the constraint is declared deferrable.
    pub deferrable: bool,
    /// Whether the constraint starts out deferred.
    pub initially_deferred: bool,
}
818
/// Table schema stored in the _schema table.
///
/// The private fields are data derived by the constructors. `Clone` is
/// implemented by hand and starts the copy with an empty `column_map_cache`.
#[derive(Debug)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Column definitions.
    pub columns: Vec<ColumnDef>,
    /// Indices into `columns` of the primary-key columns.
    pub primary_key_columns: Vec<u16>,
    /// Index definitions of the table.
    pub indices: Vec<IndexDef>,
    /// Table-level CHECK constraints.
    pub check_constraints: Vec<TableCheckDef>,
    /// Foreign-key definitions.
    pub foreign_keys: Vec<ForeignKeySchemaEntry>,
    /// Flag byte; the meaning of the bits is not visible in this part of the file.
    pub flags: u8,
    /// `primary_key_columns` widened to `usize`.
    pk_idx_cache: Vec<usize>,
    /// Indices of the columns that are not in `primary_key_columns`, ascending.
    non_pk_idx_cache: Vec<usize>,
    /// Sorted physical slots dropped via DROP COLUMN.
    dropped_non_pk_slots: Vec<u16>,
    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
    decode_mapping_cache: Vec<usize>,
    /// Logical non-PK order -> physical encoding position.
    encoding_positions_cache: Vec<u16>,
    /// NOTE(review): presumably true when any column is a virtual generated
    /// column — confirm where it is computed.
    has_virtual_columns_cache: bool,
    /// Lazily initialised `ColumnMap`.
    column_map_cache: std::sync::OnceLock<crate::eval::ColumnMap>,
}
840
841impl Clone for TableSchema {
842    fn clone(&self) -> Self {
843        Self {
844            name: self.name.clone(),
845            columns: self.columns.clone(),
846            primary_key_columns: self.primary_key_columns.clone(),
847            indices: self.indices.clone(),
848            check_constraints: self.check_constraints.clone(),
849            foreign_keys: self.foreign_keys.clone(),
850            flags: self.flags,
851            pk_idx_cache: self.pk_idx_cache.clone(),
852            non_pk_idx_cache: self.non_pk_idx_cache.clone(),
853            dropped_non_pk_slots: self.dropped_non_pk_slots.clone(),
854            decode_mapping_cache: self.decode_mapping_cache.clone(),
855            encoding_positions_cache: self.encoding_positions_cache.clone(),
856            has_virtual_columns_cache: self.has_virtual_columns_cache,
857            column_map_cache: std::sync::OnceLock::new(),
858        }
859    }
860}
861
862impl TableSchema {
863    pub fn new(
864        name: String,
865        columns: Vec<ColumnDef>,
866        primary_key_columns: Vec<u16>,
867        indices: Vec<IndexDef>,
868        check_constraints: Vec<TableCheckDef>,
869        foreign_keys: Vec<ForeignKeySchemaEntry>,
870    ) -> Self {
871        Self::with_drops(
872            name,
873            columns,
874            primary_key_columns,
875            indices,
876            check_constraints,
877            foreign_keys,
878            vec![],
879        )
880    }
881
882    pub fn with_drops(
883        name: String,
884        columns: Vec<ColumnDef>,
885        primary_key_columns: Vec<u16>,
886        indices: Vec<IndexDef>,
887        check_constraints: Vec<TableCheckDef>,
888        foreign_keys: Vec<ForeignKeySchemaEntry>,
889        dropped_non_pk_slots: Vec<u16>,
890    ) -> Self {
891        let pk_idx_cache: Vec<usize> = primary_key_columns.iter().map(|&i| i as usize).collect();
892        let non_pk_idx_cache: Vec<usize> = (0..columns.len())
893            .filter(|i| !primary_key_columns.contains(&(*i as u16)))
894            .collect();
895
896        let physical_count = non_pk_idx_cache.len() + dropped_non_pk_slots.len();
897        let mut decode_mapping_cache = vec![usize::MAX; physical_count];
898        let mut encoding_positions_cache = Vec::with_capacity(non_pk_idx_cache.len());
899
900        let mut drop_idx = 0;
901        let mut live_idx = 0;
902        for (phys_pos, slot) in decode_mapping_cache.iter_mut().enumerate() {
903            if drop_idx < dropped_non_pk_slots.len()
904                && dropped_non_pk_slots[drop_idx] as usize == phys_pos
905            {
906                drop_idx += 1;
907            } else {
908                *slot = non_pk_idx_cache[live_idx];
909                encoding_positions_cache.push(phys_pos as u16);
910                live_idx += 1;
911            }
912        }
913
914        let has_virtual_columns_cache = columns.iter().any(|c| {
915            matches!(
916                c.generated_kind,
917                Some(crate::parser::GeneratedKind::Virtual)
918            )
919        });
920
921        Self {
922            name,
923            columns,
924            primary_key_columns,
925            indices,
926            check_constraints,
927            foreign_keys,
928            flags: 0,
929            pk_idx_cache,
930            non_pk_idx_cache,
931            dropped_non_pk_slots,
932            decode_mapping_cache,
933            encoding_positions_cache,
934            has_virtual_columns_cache,
935            column_map_cache: std::sync::OnceLock::new(),
936        }
937    }
938
939    #[inline]
940    pub fn column_map(&self) -> &crate::eval::ColumnMap {
941        self.column_map_cache
942            .get_or_init(|| crate::eval::ColumnMap::new(&self.columns))
943    }
944
945    pub fn is_strict(&self) -> bool {
946        self.flags & TABLE_FLAG_STRICT != 0
947    }
948
949    pub fn has_virtual_columns(&self) -> bool {
950        self.has_virtual_columns_cache
951    }
952
953    /// Rebuild caches (preserving dropped slots). Use after mutating fields in place.
954    pub fn rebuild(self) -> Self {
955        let drops = self.dropped_non_pk_slots;
956        Self::with_drops(
957            self.name,
958            self.columns,
959            self.primary_key_columns,
960            self.indices,
961            self.check_constraints,
962            self.foreign_keys,
963            drops,
964        )
965    }
966
967    /// Returns true if any column-level or table-level CHECK constraints exist.
968    pub fn has_checks(&self) -> bool {
969        !self.check_constraints.is_empty() || self.columns.iter().any(|c| c.check_expr.is_some())
970    }
971
972    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
973    pub fn decode_col_mapping(&self) -> &[usize] {
974        &self.decode_mapping_cache
975    }
976
977    /// Logical non-PK order -> physical encoding position.
978    pub fn encoding_positions(&self) -> &[u16] {
979        &self.encoding_positions_cache
980    }
981
982    /// Total physical non-PK column count (live + dropped slots).
983    pub fn physical_non_pk_count(&self) -> usize {
984        self.non_pk_idx_cache.len() + self.dropped_non_pk_slots.len()
985    }
986
987    /// Physical encoding slots that have been dropped via DROP COLUMN.
988    pub fn dropped_non_pk_slots(&self) -> &[u16] {
989        &self.dropped_non_pk_slots
990    }
991
992    /// Return a new schema with the column at `drop_pos` marked as dropped.
993    pub fn without_column(&self, drop_pos: usize) -> Self {
994        let non_pk_order = self
995            .non_pk_idx_cache
996            .iter()
997            .position(|&i| i == drop_pos)
998            .expect("cannot drop PK column via without_column");
999        let physical_slot = self.encoding_positions_cache[non_pk_order];
1000
1001        let mut new_dropped = self.dropped_non_pk_slots.clone();
1002        new_dropped.push(physical_slot);
1003        new_dropped.sort();
1004
1005        let dropped_name = &self.columns[drop_pos].name;
1006        let drop_pos_u16 = drop_pos as u16;
1007
1008        let mut columns: Vec<ColumnDef> = self
1009            .columns
1010            .iter()
1011            .enumerate()
1012            .filter(|(i, _)| *i != drop_pos)
1013            .map(|(_, c)| {
1014                let mut col = c.clone();
1015                if col.position > drop_pos_u16 {
1016                    col.position -= 1;
1017                }
1018                col
1019            })
1020            .collect();
1021        for (i, col) in columns.iter_mut().enumerate() {
1022            col.position = i as u16;
1023        }
1024
1025        let primary_key_columns: Vec<u16> = self
1026            .primary_key_columns
1027            .iter()
1028            .map(|&p| if p > drop_pos_u16 { p - 1 } else { p })
1029            .collect();
1030
1031        let indices: Vec<IndexDef> = self
1032            .indices
1033            .iter()
1034            .map(|idx| IndexDef {
1035                name: idx.name.clone(),
1036                columns: idx
1037                    .columns
1038                    .iter()
1039                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
1040                    .collect(),
1041                unique: idx.unique,
1042                predicate_sql: idx.predicate_sql.clone(),
1043                predicate_expr: idx.predicate_expr.clone(),
1044                collations: idx.collations.clone(),
1045                kind: idx.kind,
1046            })
1047            .collect();
1048
1049        let foreign_keys: Vec<ForeignKeySchemaEntry> = self
1050            .foreign_keys
1051            .iter()
1052            .map(|fk| ForeignKeySchemaEntry {
1053                name: fk.name.clone(),
1054                columns: fk
1055                    .columns
1056                    .iter()
1057                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
1058                    .collect(),
1059                foreign_table: fk.foreign_table.clone(),
1060                referred_columns: fk.referred_columns.clone(),
1061                on_delete: fk.on_delete,
1062                on_update: fk.on_update,
1063                deferrable: fk.deferrable,
1064                initially_deferred: fk.initially_deferred,
1065            })
1066            .collect();
1067
1068        // Filter out table-level CHECKs that reference the dropped column
1069        let dropped_lower = dropped_name.to_ascii_lowercase();
1070        let check_constraints: Vec<TableCheckDef> = self
1071            .check_constraints
1072            .iter()
1073            .filter(|c| !c.sql.to_ascii_lowercase().contains(&dropped_lower))
1074            .cloned()
1075            .collect();
1076
1077        Self::with_drops(
1078            self.name.clone(),
1079            columns,
1080            primary_key_columns,
1081            indices,
1082            check_constraints,
1083            foreign_keys,
1084            new_dropped,
1085        )
1086    }
1087}
1088
/// Version byte written first by `TableSchema::serialize`; `deserialize`
/// accepts versions 1 through this value.
const SCHEMA_VERSION: u8 = 11;
/// Bit in `TableSchema::flags` tested by `TableSchema::is_strict`.
pub const TABLE_FLAG_STRICT: u8 = 0b0000_0001;
1091
/// Append an optional string as a little-endian `u16` byte length followed by
/// the UTF-8 bytes. `None` is written as length 0 with no payload, so
/// `Some("")` produces the same bytes as `None`.
fn write_opt_string(buf: &mut Vec<u8>, s: &Option<String>) {
    let payload: &[u8] = s.as_deref().map(str::as_bytes).unwrap_or_default();
    buf.extend_from_slice(&(payload.len() as u16).to_le_bytes());
    buf.extend_from_slice(payload);
}
1102
/// Read a string written by `write_opt_string`, advancing `*pos` past it.
///
/// A zero length yields `None`. Invalid UTF-8 is replaced lossily.
/// Panics if `data` is too short for the length prefix or the payload.
fn read_opt_string(data: &[u8], pos: &mut usize) -> Option<String> {
    let header = [data[*pos], data[*pos + 1]];
    *pos += 2;
    match usize::from(u16::from_le_bytes(header)) {
        0 => None,
        len => {
            let end = *pos + len;
            let text = String::from_utf8_lossy(&data[*pos..end]).into_owned();
            *pos = end;
            Some(text)
        }
    }
}
1114
/// Read a `u16`-length-prefixed string and advance `*pos` past it.
///
/// Invalid UTF-8 is replaced lossily. Panics if `data` is too short for the
/// length prefix or the payload.
fn read_string(data: &[u8], pos: &mut usize) -> String {
    let header = [data[*pos], data[*pos + 1]];
    let start = *pos + 2;
    *pos = start;
    let end = start + usize::from(u16::from_le_bytes(header));
    let text = String::from_utf8_lossy(&data[start..end]).into_owned();
    *pos = end;
    text
}
1122
1123impl TableSchema {
1124    pub fn serialize(&self) -> Vec<u8> {
1125        let mut buf = Vec::new();
1126        buf.push(SCHEMA_VERSION);
1127
1128        let name_bytes = self.name.as_bytes();
1129        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
1130        buf.extend_from_slice(name_bytes);
1131
1132        buf.extend_from_slice(&(self.columns.len() as u16).to_le_bytes());
1133
1134        for col in &self.columns {
1135            let col_name = col.name.as_bytes();
1136            buf.extend_from_slice(&(col_name.len() as u16).to_le_bytes());
1137            buf.extend_from_slice(col_name);
1138            buf.push(col.data_type.type_tag());
1139            buf.push(if col.nullable { 1 } else { 0 });
1140            buf.extend_from_slice(&col.position.to_le_bytes());
1141        }
1142
1143        buf.extend_from_slice(&(self.primary_key_columns.len() as u16).to_le_bytes());
1144        for &pk_idx in &self.primary_key_columns {
1145            buf.extend_from_slice(&pk_idx.to_le_bytes());
1146        }
1147
1148        buf.extend_from_slice(&(self.indices.len() as u16).to_le_bytes());
1149        for idx in &self.indices {
1150            let idx_name = idx.name.as_bytes();
1151            buf.extend_from_slice(&(idx_name.len() as u16).to_le_bytes());
1152            buf.extend_from_slice(idx_name);
1153            buf.extend_from_slice(&(idx.columns.len() as u16).to_le_bytes());
1154            for &col_idx in &idx.columns {
1155                buf.extend_from_slice(&col_idx.to_le_bytes());
1156            }
1157            buf.push(if idx.unique { 1 } else { 0 });
1158        }
1159
1160        for col in &self.columns {
1161            let mut flags: u8 = 0;
1162            if col.default_sql.is_some() {
1163                flags |= 1;
1164            }
1165            if col.check_sql.is_some() {
1166                flags |= 2;
1167            }
1168            buf.push(flags);
1169            if let Some(ref sql) = col.default_sql {
1170                let bytes = sql.as_bytes();
1171                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1172                buf.extend_from_slice(bytes);
1173            }
1174            if let Some(ref sql) = col.check_sql {
1175                let bytes = sql.as_bytes();
1176                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1177                buf.extend_from_slice(bytes);
1178                write_opt_string(&mut buf, &col.check_name);
1179            }
1180        }
1181
1182        buf.extend_from_slice(&(self.check_constraints.len() as u16).to_le_bytes());
1183        for chk in &self.check_constraints {
1184            write_opt_string(&mut buf, &chk.name);
1185            let sql_bytes = chk.sql.as_bytes();
1186            buf.extend_from_slice(&(sql_bytes.len() as u16).to_le_bytes());
1187            buf.extend_from_slice(sql_bytes);
1188        }
1189
1190        buf.extend_from_slice(&(self.foreign_keys.len() as u16).to_le_bytes());
1191        for fk in &self.foreign_keys {
1192            write_opt_string(&mut buf, &fk.name);
1193            buf.extend_from_slice(&(fk.columns.len() as u16).to_le_bytes());
1194            for &col_idx in &fk.columns {
1195                buf.extend_from_slice(&col_idx.to_le_bytes());
1196            }
1197            let ft_bytes = fk.foreign_table.as_bytes();
1198            buf.extend_from_slice(&(ft_bytes.len() as u16).to_le_bytes());
1199            buf.extend_from_slice(ft_bytes);
1200            buf.extend_from_slice(&(fk.referred_columns.len() as u16).to_le_bytes());
1201            for rc in &fk.referred_columns {
1202                let rc_bytes = rc.as_bytes();
1203                buf.extend_from_slice(&(rc_bytes.len() as u16).to_le_bytes());
1204                buf.extend_from_slice(rc_bytes);
1205            }
1206        }
1207
1208        buf.extend_from_slice(&(self.dropped_non_pk_slots.len() as u16).to_le_bytes());
1209        for &slot in &self.dropped_non_pk_slots {
1210            buf.extend_from_slice(&slot.to_le_bytes());
1211        }
1212
1213        for col in &self.columns {
1214            let kind_tag: u8 = match col.generated_kind {
1215                None => 0,
1216                Some(crate::parser::GeneratedKind::Stored) => 1,
1217                Some(crate::parser::GeneratedKind::Virtual) => 2,
1218            };
1219            buf.push(kind_tag);
1220            if kind_tag != 0 {
1221                let sql = col.generated_sql.as_deref().unwrap_or("");
1222                let bytes = sql.as_bytes();
1223                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1224                buf.extend_from_slice(bytes);
1225            }
1226        }
1227
1228        for idx in &self.indices {
1229            match &idx.predicate_sql {
1230                Some(sql) => {
1231                    buf.push(1);
1232                    let bytes = sql.as_bytes();
1233                    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1234                    buf.extend_from_slice(bytes);
1235                }
1236                None => buf.push(0),
1237            }
1238        }
1239
1240        for fk in &self.foreign_keys {
1241            buf.push(fk.on_delete as u8);
1242            buf.push(fk.on_update as u8);
1243        }
1244
1245        for fk in &self.foreign_keys {
1246            let mut flags: u8 = 0;
1247            if fk.deferrable {
1248                flags |= 0b01;
1249            }
1250            if fk.initially_deferred {
1251                flags |= 0b10;
1252            }
1253            buf.push(flags);
1254        }
1255
1256        for col in &self.columns {
1257            buf.push(col.collation as u8);
1258        }
1259        for idx in &self.indices {
1260            let n = idx.collations.len() as u16;
1261            buf.extend_from_slice(&n.to_le_bytes());
1262            for c in &idx.collations {
1263                buf.push(*c as u8);
1264            }
1265        }
1266        for idx in &self.indices {
1267            match idx.kind {
1268                IndexKind::BTree => buf.push(0),
1269                IndexKind::Inverted(InvertedKind::Gin(ops)) => {
1270                    buf.push(1);
1271                    buf.push(ops.as_tag());
1272                }
1273                IndexKind::Inverted(InvertedKind::Fts { config_id }) => {
1274                    buf.push(2);
1275                    buf.push(config_id);
1276                }
1277            }
1278        }
1279        buf.push(self.flags);
1280
1281        buf
1282    }
1283
1284    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
1285        let mut pos = 0;
1286
1287        if data.is_empty()
1288            || !matches!(
1289                data[0],
1290                1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | SCHEMA_VERSION
1291            )
1292        {
1293            return Err(crate::error::SqlError::InvalidValue(
1294                "invalid schema version".into(),
1295            ));
1296        }
1297        let version = data[0];
1298        pos += 1;
1299
1300        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1301        pos += 2;
1302        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
1303        pos += name_len;
1304
1305        let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1306        pos += 2;
1307
1308        let mut columns = Vec::with_capacity(col_count);
1309        for _ in 0..col_count {
1310            let col_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1311            pos += 2;
1312            let col_name = String::from_utf8_lossy(&data[pos..pos + col_name_len]).into_owned();
1313            pos += col_name_len;
1314            let data_type = DataType::from_tag(data[pos]).ok_or_else(|| {
1315                crate::error::SqlError::InvalidValue("unknown data type tag".into())
1316            })?;
1317            pos += 1;
1318            let nullable = data[pos] != 0;
1319            pos += 1;
1320            let position = u16::from_le_bytes([data[pos], data[pos + 1]]);
1321            pos += 2;
1322            columns.push(ColumnDef {
1323                name: col_name,
1324                data_type,
1325                nullable,
1326                position,
1327                default_expr: None,
1328                default_sql: None,
1329                check_expr: None,
1330                check_sql: None,
1331                check_name: None,
1332                is_with_timezone: false,
1333                generated_expr: None,
1334                generated_sql: None,
1335                generated_kind: None,
1336                collation: Collation::Binary,
1337            });
1338        }
1339
1340        let pk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1341        pos += 2;
1342        let mut primary_key_columns = Vec::with_capacity(pk_count);
1343        for _ in 0..pk_count {
1344            let pk_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1345            pos += 2;
1346            primary_key_columns.push(pk_idx);
1347        }
1348
1349        let indices = if version >= 2 && pos + 2 <= data.len() {
1350            let idx_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1351            pos += 2;
1352            let mut idxs = Vec::with_capacity(idx_count);
1353            for _ in 0..idx_count {
1354                let idx_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1355                pos += 2;
1356                let idx_name = String::from_utf8_lossy(&data[pos..pos + idx_name_len]).into_owned();
1357                pos += idx_name_len;
1358                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1359                pos += 2;
1360                let mut cols = Vec::with_capacity(col_count);
1361                for _ in 0..col_count {
1362                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1363                    pos += 2;
1364                    cols.push(col_idx);
1365                }
1366                let unique = data[pos] != 0;
1367                pos += 1;
1368                idxs.push(IndexDef {
1369                    name: idx_name,
1370                    columns: cols,
1371                    unique,
1372                    predicate_sql: None,
1373                    predicate_expr: None,
1374                    collations: vec![],
1375                    kind: IndexKind::default(),
1376                });
1377            }
1378            idxs
1379        } else {
1380            vec![]
1381        };
1382
1383        let mut check_constraints = Vec::new();
1384        let mut foreign_keys = Vec::new();
1385
1386        if version >= 3 && pos < data.len() {
1387            for col in &mut columns {
1388                let flags = data[pos];
1389                pos += 1;
1390                if flags & 1 != 0 {
1391                    let sql = read_string(data, &mut pos);
1392                    col.default_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1393                        crate::error::SqlError::InvalidValue(format!(
1394                            "cannot parse DEFAULT expression: {sql}"
1395                        ))
1396                    })?);
1397                    col.default_sql = Some(sql);
1398                }
1399                if flags & 2 != 0 {
1400                    let sql = read_string(data, &mut pos);
1401                    col.check_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1402                        crate::error::SqlError::InvalidValue(format!(
1403                            "cannot parse CHECK expression: {sql}"
1404                        ))
1405                    })?);
1406                    col.check_sql = Some(sql);
1407                    col.check_name = read_opt_string(data, &mut pos);
1408                }
1409            }
1410
1411            let chk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1412            pos += 2;
1413            for _ in 0..chk_count {
1414                let name = read_opt_string(data, &mut pos);
1415                let sql = read_string(data, &mut pos);
1416                let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1417                    crate::error::SqlError::InvalidValue(format!(
1418                        "cannot parse CHECK expression: {sql}"
1419                    ))
1420                })?;
1421                check_constraints.push(TableCheckDef { name, expr, sql });
1422            }
1423
1424            let fk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1425            pos += 2;
1426            for _ in 0..fk_count {
1427                let name = read_opt_string(data, &mut pos);
1428                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1429                pos += 2;
1430                let mut cols = Vec::with_capacity(col_count);
1431                for _ in 0..col_count {
1432                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1433                    pos += 2;
1434                    cols.push(col_idx);
1435                }
1436                let foreign_table = read_string(data, &mut pos);
1437                let ref_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1438                pos += 2;
1439                let mut referred_columns = Vec::with_capacity(ref_count);
1440                for _ in 0..ref_count {
1441                    referred_columns.push(read_string(data, &mut pos));
1442                }
1443                foreign_keys.push(ForeignKeySchemaEntry {
1444                    name,
1445                    columns: cols,
1446                    foreign_table,
1447                    referred_columns,
1448                    on_delete: crate::parser::ReferentialAction::NoAction,
1449                    on_update: crate::parser::ReferentialAction::NoAction,
1450                    deferrable: false,
1451                    initially_deferred: false,
1452                });
1453            }
1454        }
1455        let mut dropped_non_pk_slots = Vec::new();
1456        if version >= 4 && pos + 2 <= data.len() {
1457            let slot_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1458            pos += 2;
1459            for _ in 0..slot_count {
1460                let slot = u16::from_le_bytes([data[pos], data[pos + 1]]);
1461                pos += 2;
1462                dropped_non_pk_slots.push(slot);
1463            }
1464        }
1465        if version >= 5 && pos < data.len() {
1466            for col in &mut columns {
1467                let kind_tag = data[pos];
1468                pos += 1;
1469                if kind_tag != 0 {
1470                    let len = u32::from_le_bytes([
1471                        data[pos],
1472                        data[pos + 1],
1473                        data[pos + 2],
1474                        data[pos + 3],
1475                    ]) as usize;
1476                    pos += 4;
1477                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
1478                    pos += len;
1479                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1480                        crate::error::SqlError::InvalidValue(format!(
1481                            "cannot parse GENERATED expression: {sql}"
1482                        ))
1483                    })?;
1484                    col.generated_sql = Some(sql);
1485                    col.generated_expr = Some(expr);
1486                    col.generated_kind = Some(match kind_tag {
1487                        1 => crate::parser::GeneratedKind::Stored,
1488                        2 => crate::parser::GeneratedKind::Virtual,
1489                        _ => {
1490                            return Err(crate::error::SqlError::InvalidValue(
1491                                "unknown GENERATED kind tag".into(),
1492                            ));
1493                        }
1494                    });
1495                }
1496            }
1497        }
1498        let mut indices = indices;
1499        if version >= 6 && pos < data.len() {
1500            for idx in &mut indices {
1501                let flag = data[pos];
1502                pos += 1;
1503                if flag == 1 {
1504                    let len = u32::from_le_bytes([
1505                        data[pos],
1506                        data[pos + 1],
1507                        data[pos + 2],
1508                        data[pos + 3],
1509                    ]) as usize;
1510                    pos += 4;
1511                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
1512                    pos += len;
1513                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1514                        crate::error::SqlError::InvalidValue(format!(
1515                            "cannot parse partial-index predicate: {sql}"
1516                        ))
1517                    })?;
1518                    idx.predicate_sql = Some(sql);
1519                    idx.predicate_expr = Some(expr);
1520                }
1521            }
1522            for fk in &mut foreign_keys {
1523                fk.on_delete =
1524                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
1525                        crate::error::SqlError::InvalidValue("unknown FK on_delete tag".into())
1526                    })?;
1527                pos += 1;
1528                fk.on_update =
1529                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
1530                        crate::error::SqlError::InvalidValue("unknown FK on_update tag".into())
1531                    })?;
1532                pos += 1;
1533            }
1534            if version >= 11 {
1535                for fk in &mut foreign_keys {
1536                    if pos >= data.len() {
1537                        break;
1538                    }
1539                    let flags = data[pos];
1540                    pos += 1;
1541                    fk.deferrable = flags & 0b01 != 0;
1542                    fk.initially_deferred = flags & 0b10 != 0;
1543                }
1544            }
1545        }
1546
1547        let mut columns = columns;
1548        let mut indices = indices;
1549        let mut flags: u8 = 0;
1550        if version >= 7 && pos < data.len() {
1551            for col in &mut columns {
1552                col.collation = Collation::from_tag(data[pos]).ok_or_else(|| {
1553                    crate::error::SqlError::InvalidValue("unknown collation tag".into())
1554                })?;
1555                pos += 1;
1556            }
1557            for idx in &mut indices {
1558                let n = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1559                pos += 2;
1560                let mut colls = Vec::with_capacity(n);
1561                for _ in 0..n {
1562                    colls.push(Collation::from_tag(data[pos]).ok_or_else(|| {
1563                        crate::error::SqlError::InvalidValue("unknown collation tag".into())
1564                    })?);
1565                    pos += 1;
1566                }
1567                idx.collations = colls;
1568            }
1569            if version >= 9 {
1570                for idx in &mut indices {
1571                    if pos >= data.len() {
1572                        break;
1573                    }
1574                    let tag = data[pos];
1575                    pos += 1;
1576                    idx.kind = match tag {
1577                        0 => IndexKind::BTree,
1578                        1 => {
1579                            if pos >= data.len() {
1580                                return Err(crate::error::SqlError::InvalidValue(
1581                                    "GIN index missing opclass tag".into(),
1582                                ));
1583                            }
1584                            let ops = GinOpsClass::from_tag(data[pos]).ok_or_else(|| {
1585                                crate::error::SqlError::InvalidValue(
1586                                    "unknown GIN opclass tag".into(),
1587                                )
1588                            })?;
1589                            pos += 1;
1590                            IndexKind::Inverted(InvertedKind::Gin(ops))
1591                        }
1592                        2 => {
1593                            if pos >= data.len() {
1594                                return Err(crate::error::SqlError::InvalidValue(
1595                                    "FTS index missing config_id".into(),
1596                                ));
1597                            }
1598                            let config_id = data[pos];
1599                            pos += 1;
1600                            IndexKind::Inverted(InvertedKind::Fts { config_id })
1601                        }
1602                        _ => {
1603                            return Err(crate::error::SqlError::InvalidValue(
1604                                "unknown IndexKind tag".into(),
1605                            ));
1606                        }
1607                    };
1608                }
1609            }
1610            if pos < data.len() {
1611                flags = data[pos];
1612                pos += 1;
1613            }
1614        }
1615        let _ = pos;
1616
1617        let mut schema = Self::with_drops(
1618            name,
1619            columns,
1620            primary_key_columns,
1621            indices,
1622            check_constraints,
1623            foreign_keys,
1624            dropped_non_pk_slots,
1625        );
1626        schema.flags = flags;
1627        Ok(schema)
1628    }
1629
1630    /// Get column index by name (case-insensitive).
1631    pub fn column_index(&self, name: &str) -> Option<usize> {
1632        self.columns
1633            .iter()
1634            .position(|c| c.name.eq_ignore_ascii_case(name))
1635    }
1636
1637    /// Get indices of non-PK columns (columns stored in the B+ tree value).
1638    pub fn non_pk_indices(&self) -> &[usize] {
1639        &self.non_pk_idx_cache
1640    }
1641
1642    /// Get the PK column indices as usize.
1643    pub fn pk_indices(&self) -> &[usize] {
1644        &self.pk_idx_cache
1645    }
1646
1647    /// Get index definition by name (case-insensitive).
1648    pub fn index_by_name(&self, name: &str) -> Option<&IndexDef> {
1649        let lower = name.to_ascii_lowercase();
1650        self.indices.iter().find(|i| i.name == lower)
1651    }
1652
1653    /// Get the KV table name for an index.
1654    pub fn index_table_name(table_name: &str, index_name: &str) -> Vec<u8> {
1655        format!("__idx_{table_name}_{index_name}").into_bytes()
1656    }
1657}
1658
/// Result of executing a SQL statement.
#[derive(Debug)]
pub enum ExecutionResult {
    /// A row count (presumably for statements that modify rows; confirm at call sites).
    RowsAffected(u64),
    /// A result set; see [`QueryResult`].
    Query(QueryResult),
    /// Completion with no row count or result set.
    Ok,
}
1666
/// Result of a SELECT query.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column labels (presumably one per output column, in output order).
    pub columns: Vec<String>,
    /// Result rows; each inner vector holds one row's values.
    pub rows: Vec<Vec<Value>>,
}
1673
1674#[cfg(test)]
1675#[path = "types_tests.rs"]
1676mod tests;