Skip to main content

citadel_sql/
types.rs

1use std::cmp::Ordering;
2use std::fmt;
3use std::hash::{Hash, Hasher};
4use std::sync::Arc;
5
6pub use compact_str::CompactString;
7
8use crate::parser::Expr;
9
/// Logical type of a SQL value or column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    Null,
    Integer,
    Real,
    Text,
    Blob,
    Boolean,
    Time,
    Date,
    Timestamp,
    Interval,
    Json,
    Jsonb,
    TsVector,
    TsQuery,
    Array,
}

impl DataType {
    /// Every variant, stored at the index equal to its numeric tag.
    /// Note that tag order differs from declaration order.
    const BY_TAG: [DataType; 15] = [
        Self::Null,
        Self::Blob,
        Self::Text,
        Self::Boolean,
        Self::Integer,
        Self::Real,
        Self::Time,
        Self::Date,
        Self::Timestamp,
        Self::Interval,
        Self::Json,
        Self::Jsonb,
        Self::TsVector,
        Self::TsQuery,
        Self::Array,
    ];

    /// Numeric tag (0..=14) identifying this type; the inverse of [`DataType::from_tag`].
    pub fn type_tag(self) -> u8 {
        // Kept as an exhaustive match so adding a variant is a compile error here.
        match self {
            Self::Null => 0,
            Self::Blob => 1,
            Self::Text => 2,
            Self::Boolean => 3,
            Self::Integer => 4,
            Self::Real => 5,
            Self::Time => 6,
            Self::Date => 7,
            Self::Timestamp => 8,
            Self::Interval => 9,
            Self::Json => 10,
            Self::Jsonb => 11,
            Self::TsVector => 12,
            Self::TsQuery => 13,
            Self::Array => 14,
        }
    }

    /// Looks up the type for `tag`; returns `None` for any tag above 14.
    pub fn from_tag(tag: u8) -> Option<Self> {
        Self::BY_TAG.get(usize::from(tag)).copied()
    }
}

impl fmt::Display for DataType {
    /// Writes the upper-case SQL keyword for the type.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Null => "NULL",
            Self::Integer => "INTEGER",
            Self::Real => "REAL",
            Self::Text => "TEXT",
            Self::Blob => "BLOB",
            Self::Boolean => "BOOLEAN",
            Self::Time => "TIME",
            Self::Date => "DATE",
            Self::Timestamp => "TIMESTAMP",
            Self::Interval => "INTERVAL",
            Self::Json => "JSON",
            Self::Jsonb => "JSONB",
            Self::TsVector => "TSVECTOR",
            Self::TsQuery => "TSQUERY",
            Self::Array => "ARRAY",
        };
        f.write_str(keyword)
    }
}
93
/// SQL value. Temporal epochs: days/µs since 1970-01-01 UTC.
/// `Date`/`Timestamp` reserve `i{32,64}::{MAX,MIN}` as `±infinity` sentinels.
///
/// The default value is [`Value::Null`].
#[derive(Debug, Clone, Default)]
pub enum Value {
    /// SQL `NULL`.
    #[default]
    Null,
    /// 64-bit signed integer.
    Integer(i64),
    /// 64-bit float.
    Real(f64),
    /// Text payload.
    Text(CompactString),
    /// Raw bytes; displayed as `X'..'` hex.
    Blob(Vec<u8>),
    /// Boolean; displayed as `TRUE` / `FALSE`.
    Boolean(bool),
    /// Time of day. Integer coercion accepts `0..=86_400_000_000`, so this is
    /// presumably microseconds since midnight — TODO confirm against `crate::datetime`.
    Time(i64),
    /// Days since the epoch; `i32::MAX` / `i32::MIN` are the infinity sentinels.
    Date(i32),
    /// Microseconds since the epoch; `i64::MAX` / `i64::MIN` are the infinity sentinels.
    Timestamp(i64),
    /// Interval kept as three independent fields (no normalization at this level).
    Interval {
        months: i32,
        days: i32,
        micros: i64,
    },
    /// JSON held as text.
    Json(CompactString),
    /// Encoded JSON bytes; converted back to text with `crate::json::decode_to_text`.
    Jsonb(Arc<[u8]>),
    /// Encoded full-text-search vector; rendered by `crate::fts::tsvector_display`.
    TsVector(Arc<[u8]>),
    /// Encoded full-text-search query; rendered by `crate::fts::tsquery_display`.
    TsQuery(Arc<[u8]>),
    /// Shared list of element values.
    Array(Arc<Vec<Value>>),
}
119
120impl Value {
121    pub fn data_type(&self) -> DataType {
122        match self {
123            Value::Null => DataType::Null,
124            Value::Integer(_) => DataType::Integer,
125            Value::Real(_) => DataType::Real,
126            Value::Text(_) => DataType::Text,
127            Value::Blob(_) => DataType::Blob,
128            Value::Boolean(_) => DataType::Boolean,
129            Value::Time(_) => DataType::Time,
130            Value::Date(_) => DataType::Date,
131            Value::Timestamp(_) => DataType::Timestamp,
132            Value::Interval { .. } => DataType::Interval,
133            Value::Json(_) => DataType::Json,
134            Value::Jsonb(_) => DataType::Jsonb,
135            Value::TsVector(_) => DataType::TsVector,
136            Value::TsQuery(_) => DataType::TsQuery,
137            Value::Array(_) => DataType::Array,
138        }
139    }
140
141    pub fn is_null(&self) -> bool {
142        matches!(self, Value::Null)
143    }
144
145    pub fn is_finite_temporal(&self) -> bool {
146        match self {
147            Value::Date(d) => *d != i32::MAX && *d != i32::MIN,
148            Value::Timestamp(t) => *t != i64::MAX && *t != i64::MIN,
149            _ => true,
150        }
151    }
152
153    pub fn coerce_to(&self, target: DataType) -> Option<Value> {
154        match (self, target) {
155            (_, DataType::Null) => Some(Value::Null),
156            (Value::Null, _) => Some(Value::Null),
157            (Value::Integer(i), DataType::Integer) => Some(Value::Integer(*i)),
158            (Value::Integer(i), DataType::Real) => Some(Value::Real(*i as f64)),
159            (Value::Real(r), DataType::Real) => Some(Value::Real(*r)),
160            (Value::Real(r), DataType::Integer) => Some(Value::Integer(*r as i64)),
161            (Value::Text(s), DataType::Text) => Some(Value::Text(s.clone())),
162            (Value::Blob(b), DataType::Blob) => Some(Value::Blob(b.clone())),
163            (Value::Boolean(b), DataType::Boolean) => Some(Value::Boolean(*b)),
164            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
165            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(*i != 0)),
166            (Value::Time(t), DataType::Time) => Some(Value::Time(*t)),
167            (Value::Date(d), DataType::Date) => Some(Value::Date(*d)),
168            (Value::Timestamp(t), DataType::Timestamp) => Some(Value::Timestamp(*t)),
169            (Value::TsVector(b), DataType::TsVector) => Some(Value::TsVector(b.clone())),
170            (Value::TsQuery(b), DataType::TsQuery) => Some(Value::TsQuery(b.clone())),
171            (Value::Array(a), DataType::Array) => Some(Value::Array(a.clone())),
172            (
173                Value::Interval {
174                    months,
175                    days,
176                    micros,
177                },
178                DataType::Interval,
179            ) => Some(Value::Interval {
180                months: *months,
181                days: *days,
182                micros: *micros,
183            }),
184            _ => None,
185        }
186    }
187
    /// Consuming, lenient coercion of `self` to `target`.
    ///
    /// Returns `Some(Value::Null)` when either the value or the target is NULL,
    /// returns `self` unchanged when it already has the target type, and
    /// otherwise applies the conversion table below. `None` means the pair is
    /// unsupported, the text failed to parse/validate, or the result is out of range.
    ///
    /// NOTE(review): the `Date`/`Timestamp` ±infinity sentinels are not
    /// special-cased in this function: `Date(i32::MAX)` → `Timestamp` overflows
    /// `checked_mul` and yields `None`, while `Timestamp(i64::MAX)` → `Date`
    /// yields an ordinary day count. Confirm whether callers filter with
    /// `is_finite_temporal` first.
    pub fn coerce_into(self, target: DataType) -> Option<Value> {
        if self.is_null() || target == DataType::Null {
            return Some(Value::Null);
        }
        if self.data_type() == target {
            return Some(self);
        }
        match (self, target) {
            // Numeric / boolean casts. `as` from f64 to i64 truncates and saturates.
            (Value::Integer(i), DataType::Real) => Some(Value::Real(i as f64)),
            (Value::Real(r), DataType::Integer) => Some(Value::Integer(r as i64)),
            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if b { 1 } else { 0 })),
            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(i != 0)),
            // Text → temporal: delegate to the datetime parsers; a parse error becomes `None`.
            (Value::Text(s), DataType::Date) => {
                crate::datetime::parse_date(&s).ok().map(Value::Date)
            }
            (Value::Text(s), DataType::Time) => {
                crate::datetime::parse_time(&s).ok().map(Value::Time)
            }
            (Value::Text(s), DataType::Timestamp) => crate::datetime::parse_timestamp(&s)
                .ok()
                .map(Value::Timestamp),
            (Value::Text(s), DataType::Interval) => {
                crate::datetime::parse_interval(&s)
                    .ok()
                    .map(|(m, d, u)| Value::Interval {
                        months: m,
                        days: d,
                        micros: u,
                    })
            }
            // INTEGER → TIMESTAMP: Unix epoch seconds.
            (Value::Integer(n), DataType::Timestamp) => {
                n.checked_mul(1_000_000).map(Value::Timestamp)
            }
            // INTEGER → DATE: the integer is taken as the day number and must fit in i32.
            (Value::Integer(n), DataType::Date) => {
                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
                    Some(Value::Date(n as i32))
                } else {
                    None
                }
            }
            // INTEGER → TIME: accepted only within 0..=86_400_000_000 (inclusive upper bound).
            (Value::Integer(n), DataType::Time) => {
                if (0..=86_400_000_000).contains(&n) {
                    Some(Value::Time(n))
                } else {
                    None
                }
            }
            // INTEGER → INTERVAL: the integer becomes the `days` field.
            (Value::Integer(n), DataType::Interval) => {
                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
                    Some(Value::Interval {
                        months: 0,
                        days: n as i32,
                        micros: 0,
                    })
                } else {
                    None
                }
            }
            // NOTE(review): `/` truncates toward zero, unlike the flooring `div_euclid`
            // used for TIMESTAMP → DATE below; pre-1970 values round differently.
            (Value::Timestamp(t), DataType::Integer) => Some(Value::Integer(t / 1_000_000)),
            (Value::Date(d), DataType::Integer) => Some(Value::Integer(d as i64)),
            (Value::Time(t), DataType::Integer) => Some(Value::Integer(t)),
            // DATE → TIMESTAMP: days × µs-per-day; overflow yields `None`.
            (Value::Date(d), DataType::Timestamp) => {
                (d as i64).checked_mul(86_400_000_000).map(Value::Timestamp)
            }
            (Value::Timestamp(t), DataType::Date) => {
                // div_euclid floors correctly for negative µs (pre-1970).
                let days = t.div_euclid(86_400_000_000);
                if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
                    Some(Value::Date(days as i32))
                } else {
                    None
                }
            }
            // Temporal → TEXT: uses the value's `Display` rendering.
            (v, DataType::Text)
                if matches!(
                    v.data_type(),
                    DataType::Date | DataType::Time | DataType::Timestamp | DataType::Interval
                ) =>
            {
                Some(Value::Text(v.to_string().into()))
            }
            // TEXT → JSON keeps the original text once it validates.
            (Value::Text(s), DataType::Json) => {
                crate::json::validate_text(&s).ok()?;
                Some(Value::Json(s))
            }
            (Value::Text(s), DataType::Jsonb) => crate::json::text_to_jsonb(&s).ok(),
            (Value::Json(s), DataType::Text) => Some(Value::Text(s)),
            (Value::Json(s), DataType::Jsonb) => crate::json::text_to_jsonb(&s).ok(),
            (Value::Jsonb(b), DataType::Text) => crate::json::decode_to_text(&b)
                .ok()
                .map(|t| Value::Text(t.into())),
            (Value::Jsonb(b), DataType::Json) => crate::json::decode_to_text(&b)
                .ok()
                .map(|t| Value::Json(t.into())),
            _ => None,
        }
    }
286
287    pub fn strict_coerce(&self, target: DataType) -> Option<Value> {
288        if matches!(self, Value::Null) {
289            return Some(Value::Null);
290        }
291        if self.data_type() == target {
292            return Some(self.clone());
293        }
294        match (self, target) {
295            (Value::Integer(i), DataType::Real) => {
296                if i.unsigned_abs() <= (1u64 << 53) {
297                    Some(Value::Real(*i as f64))
298                } else {
299                    None
300                }
301            }
302            (Value::Real(r), DataType::Integer) => {
303                if r.is_finite()
304                    && r.fract() == 0.0
305                    && (i64::MIN as f64..=i64::MAX as f64).contains(r)
306                {
307                    Some(Value::Integer(*r as i64))
308                } else {
309                    None
310                }
311            }
312            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
313            (Value::Integer(i), DataType::Boolean) => match i {
314                0 => Some(Value::Boolean(false)),
315                1 => Some(Value::Boolean(true)),
316                _ => None,
317            },
318            (Value::Text(s), DataType::Integer) => {
319                let trimmed = s.as_str();
320                let parsed: i64 = trimmed.parse().ok()?;
321                if parsed.to_string() == trimmed {
322                    Some(Value::Integer(parsed))
323                } else {
324                    None
325                }
326            }
327            (Value::Text(s), DataType::Real) => {
328                let trimmed = s.as_str();
329                let parsed: f64 = trimmed.parse().ok()?;
330                if parsed.is_finite() {
331                    Some(Value::Real(parsed))
332                } else {
333                    None
334                }
335            }
336            (Value::Text(_), DataType::Date)
337            | (Value::Text(_), DataType::Time)
338            | (Value::Text(_), DataType::Timestamp)
339            | (Value::Text(_), DataType::Interval)
340            | (Value::Text(_), DataType::Json)
341            | (Value::Text(_), DataType::Jsonb)
342            | (Value::Json(_), DataType::Jsonb)
343            | (Value::Json(_), DataType::Text)
344            | (Value::Jsonb(_), DataType::Json)
345            | (Value::Jsonb(_), DataType::Text) => self.clone().coerce_into(target),
346            (Value::Date(d), DataType::Timestamp) => (*d as i64)
347                .checked_mul(86_400_000_000)
348                .map(Value::Timestamp),
349            (Value::Timestamp(t), DataType::Date) => {
350                if t % 86_400_000_000 == 0 {
351                    let days = t.div_euclid(86_400_000_000);
352                    if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
353                        Some(Value::Date(days as i32))
354                    } else {
355                        None
356                    }
357                } else {
358                    None
359                }
360            }
361            _ => None,
362        }
363    }
364
365    /// Numeric ordering for Integer and Real values (promotes to f64 for mixed).
366    fn numeric_cmp(&self, other: &Value) -> Option<Ordering> {
367        match (self, other) {
368            (Value::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
369            (Value::Real(a), Value::Real(b)) => a.partial_cmp(b),
370            (Value::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
371            (Value::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
372            _ => None,
373        }
374    }
375}
376
/// Structural equality between values.
///
/// Values of different variants are unequal, with one exception: `Integer`
/// and `Real` compare across types by casting the integer to `f64`.
///
/// NOTE(review): `Real` uses `f64 ==`, so `Real(NaN) != Real(NaN)` even though
/// `Eq` is implemented below, and the integer→f64 cast makes distinct integers
/// above 2^53 compare equal to the same `Real`. Confirm callers never rely on
/// reflexivity/transitivity for those inputs.
impl PartialEq for Value {
    // Field-wise for Eq/Hash/Ord transitivity. SQL-level `=` on INTERVAL
    // normalizes separately (see eval.rs).
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Value::Null, Value::Null) => true,
            (Value::Integer(a), Value::Integer(b)) => a == b,
            (Value::Real(a), Value::Real(b)) => a == b,
            // Cross-type numeric equality goes through f64.
            (Value::Integer(a), Value::Real(b)) => (*a as f64) == *b,
            (Value::Real(a), Value::Integer(b)) => *a == (*b as f64),
            (Value::Text(a), Value::Text(b)) => a == b,
            (Value::Blob(a), Value::Blob(b)) => a == b,
            (Value::Boolean(a), Value::Boolean(b)) => a == b,
            (Value::Time(a), Value::Time(b)) => a == b,
            (Value::Date(a), Value::Date(b)) => a == b,
            (Value::Timestamp(a), Value::Timestamp(b)) => a == b,
            (
                Value::Interval {
                    months: am,
                    days: ad,
                    micros: au,
                },
                Value::Interval {
                    months: bm,
                    days: bd,
                    micros: bu,
                },
            ) => am == bm && ad == bd && au == bu,
            (Value::Json(a), Value::Json(b)) => a == b,
            (Value::Jsonb(a), Value::Jsonb(b)) => a == b,
            (Value::TsVector(a), Value::TsVector(b)) => a == b,
            (Value::TsQuery(a), Value::TsQuery(b)) => a == b,
            (Value::Array(a), Value::Array(b)) => a == b,
            // Any other pairing is a variant mismatch.
            _ => false,
        }
    }
}

// Marker impl: lets `Value` be used where full equivalence is required (e.g. `Ord`).
impl Eq for Value {}
416
417impl Hash for Value {
418    fn hash<H: Hasher>(&self, state: &mut H) {
419        match self {
420            Value::Null => 0u8.hash(state),
421            Value::Integer(i) => {
422                // Hash via f64 bits so Integer(n) and Real(n.0) produce the same hash,
423                // matching the cross-type PartialEq contract.
424                1u8.hash(state);
425                (*i as f64).to_bits().hash(state);
426            }
427            Value::Real(r) => {
428                1u8.hash(state);
429                r.to_bits().hash(state);
430            }
431            Value::Text(s) => {
432                2u8.hash(state);
433                s.hash(state);
434            }
435            Value::Blob(b) => {
436                3u8.hash(state);
437                b.hash(state);
438            }
439            Value::Boolean(b) => {
440                4u8.hash(state);
441                b.hash(state);
442            }
443            Value::Time(t) => {
444                5u8.hash(state);
445                t.hash(state);
446            }
447            Value::Date(d) => {
448                6u8.hash(state);
449                d.hash(state);
450            }
451            Value::Timestamp(t) => {
452                7u8.hash(state);
453                t.hash(state);
454            }
455            Value::Interval {
456                months,
457                days,
458                micros,
459            } => {
460                8u8.hash(state);
461                months.hash(state);
462                days.hash(state);
463                micros.hash(state);
464            }
465            Value::Json(s) => {
466                9u8.hash(state);
467                s.hash(state);
468            }
469            Value::Jsonb(b) => {
470                10u8.hash(state);
471                b.hash(state);
472            }
473            Value::TsVector(b) => {
474                11u8.hash(state);
475                b.hash(state);
476            }
477            Value::TsQuery(b) => {
478                12u8.hash(state);
479                b.hash(state);
480            }
481            Value::Array(a) => {
482                13u8.hash(state);
483                a.hash(state);
484            }
485        }
486    }
487}
488
489impl PartialOrd for Value {
490    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
491        Some(self.cmp(other))
492    }
493}
494
/// Total order across all value variants.
///
/// The match is a ladder: each rung handles one type class (same-class
/// comparison, then "this class sorts before everything remaining", then the
/// mirrored case), so the arm order defines the cross-type ordering and must
/// not be rearranged.
impl Ord for Value {
    // Order: NULL < BOOLEAN < numeric < TIME < DATE < TIMESTAMP < INTERVAL
    //        < JSON < JSONB < TSVECTOR < TSQUERY < ARRAY < TEXT < BLOB.
    // INTERVAL compares field-wise for trait-invariant safety; SQL-level ops normalize.
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (Value::Null, Value::Null) => Ordering::Equal,
            (Value::Null, _) => Ordering::Less,
            (_, Value::Null) => Ordering::Greater,

            (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
            (Value::Boolean(_), _) => Ordering::Less,
            (_, Value::Boolean(_)) => Ordering::Greater,

            // Integer and Real form a single "numeric" class.
            // NOTE(review): when `numeric_cmp` returns `None` (a NaN is involved)
            // the pair is reported as `Equal`, which disagrees with `PartialEq`
            // for NaN — confirm NaN cannot reach sorted/indexed paths.
            (Value::Integer(_) | Value::Real(_), Value::Integer(_) | Value::Real(_)) => {
                self.numeric_cmp(other).unwrap_or(Ordering::Equal)
            }
            (Value::Integer(_) | Value::Real(_), _) => Ordering::Less,
            (_, Value::Integer(_) | Value::Real(_)) => Ordering::Greater,

            (Value::Time(a), Value::Time(b)) => a.cmp(b),
            (Value::Time(_), _) => Ordering::Less,
            (_, Value::Time(_)) => Ordering::Greater,

            (Value::Date(a), Value::Date(b)) => a.cmp(b),
            (Value::Date(_), _) => Ordering::Less,
            (_, Value::Date(_)) => Ordering::Greater,

            (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
            (Value::Timestamp(_), _) => Ordering::Less,
            (_, Value::Timestamp(_)) => Ordering::Greater,

            // Lexicographic over (months, days, micros).
            (
                Value::Interval {
                    months: am,
                    days: ad,
                    micros: au,
                },
                Value::Interval {
                    months: bm,
                    days: bd,
                    micros: bu,
                },
            ) => am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu)),
            (Value::Interval { .. }, _) => Ordering::Less,
            (_, Value::Interval { .. }) => Ordering::Greater,

            // JSON compares its text; the binary-encoded types below compare raw bytes.
            (Value::Json(a), Value::Json(b)) => a.cmp(b),
            (Value::Json(_), _) => Ordering::Less,
            (_, Value::Json(_)) => Ordering::Greater,

            (Value::Jsonb(a), Value::Jsonb(b)) => a.as_ref().cmp(b.as_ref()),
            (Value::Jsonb(_), _) => Ordering::Less,
            (_, Value::Jsonb(_)) => Ordering::Greater,

            (Value::TsVector(a), Value::TsVector(b)) => a.as_ref().cmp(b.as_ref()),
            (Value::TsVector(_), _) => Ordering::Less,
            (_, Value::TsVector(_)) => Ordering::Greater,

            (Value::TsQuery(a), Value::TsQuery(b)) => a.as_ref().cmp(b.as_ref()),
            (Value::TsQuery(_), _) => Ordering::Less,
            (_, Value::TsQuery(_)) => Ordering::Greater,

            // Arrays compare element-wise using this same ordering.
            (Value::Array(a), Value::Array(b)) => a.as_ref().cmp(b.as_ref()),
            (Value::Array(_), _) => Ordering::Less,
            (_, Value::Array(_)) => Ordering::Greater,

            (Value::Text(a), Value::Text(b)) => a.cmp(b),
            (Value::Text(_), _) => Ordering::Less,
            (_, Value::Text(_)) => Ordering::Greater,

            // Only Blob vs Blob can remain at this point.
            (Value::Blob(a), Value::Blob(b)) => a.cmp(b),
        }
    }
}
569
570impl fmt::Display for Value {
571    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
572        match self {
573            Value::Null => write!(f, "NULL"),
574            Value::Integer(i) => write!(f, "{i}"),
575            Value::Real(r) => {
576                if r.fract() == 0.0 && r.is_finite() {
577                    write!(f, "{r:.1}")
578                } else {
579                    write!(f, "{r}")
580                }
581            }
582            Value::Text(s) => write!(f, "{s}"),
583            Value::Blob(b) => write!(f, "X'{}'", hex_encode(b)),
584            Value::Boolean(b) => write!(f, "{}", if *b { "TRUE" } else { "FALSE" }),
585            Value::Time(t) => write!(f, "{}", crate::datetime::format_time(*t)),
586            Value::Date(d) => write!(f, "{}", crate::datetime::format_date(*d)),
587            Value::Timestamp(t) => write!(f, "{}", crate::datetime::format_timestamp(*t)),
588            Value::Interval {
589                months,
590                days,
591                micros,
592            } => {
593                write!(
594                    f,
595                    "{}",
596                    crate::datetime::format_interval(*months, *days, *micros)
597                )
598            }
599            Value::Json(s) => write!(f, "{s}"),
600            Value::Jsonb(b) => match crate::json::decode_to_text(b) {
601                Ok(s) => write!(f, "{s}"),
602                Err(_) => write!(f, "<invalid jsonb>"),
603            },
604            Value::TsVector(b) => write!(f, "{}", crate::fts::tsvector_display(b)),
605            Value::TsQuery(b) => write!(f, "{}", crate::fts::tsquery_display(b)),
606            Value::Array(a) => {
607                write!(f, "{{")?;
608                for (i, elem) in a.iter().enumerate() {
609                    if i > 0 {
610                        write!(f, ",")?;
611                    }
612                    match elem {
613                        Value::Null => write!(f, "NULL")?,
614                        Value::Text(s) => {
615                            write!(f, "\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))?
616                        }
617                        other => write!(f, "{other}")?,
618                    }
619                }
620                write!(f, "}}")
621            }
622        }
623    }
624}
625
/// Renders `data` as upper-case hexadecimal, two digits per byte, with no
/// separators or prefix. Empty input gives an empty string.
fn hex_encode(data: &[u8]) -> String {
    // Nibble → digit lookup; avoids allocating a temporary `String` per byte
    // the way `format!` inside the loop did.
    const DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(data.len() * 2);
    for &byte in data {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0F)]));
    }
    out
}
633
/// Text collation used for comparisons. The explicit discriminants are the
/// tags accepted by [`Collation::from_tag`]. `Binary` is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum Collation {
    #[default]
    Binary = 0,
    NoCase = 1,
    Rtrim = 2,
}

impl Collation {
    /// All collations, used for tag lookup.
    const ALL: [Collation; 3] = [Self::Binary, Self::NoCase, Self::Rtrim];

    /// Drops trailing ASCII spaces only (not other whitespace).
    fn strip_trailing_spaces(text: &str) -> &str {
        text.trim_end_matches(' ')
    }

    /// Maps a tag (0, 1 or 2) back to its collation; any other tag is `None`.
    pub fn from_tag(tag: u8) -> Option<Self> {
        Self::ALL.iter().copied().find(|c| *c as u8 == tag)
    }

    /// Resolves a collation by name, ignoring ASCII case.
    pub fn from_name(name: &str) -> Option<Self> {
        if name.eq_ignore_ascii_case("BINARY") {
            Some(Self::Binary)
        } else if name.eq_ignore_ascii_case("NOCASE") {
            Some(Self::NoCase)
        } else if name.eq_ignore_ascii_case("RTRIM") {
            Some(Self::Rtrim)
        } else {
            None
        }
    }

    /// Orders `a` against `b` under this collation: byte-wise for `Binary`,
    /// per-character with ASCII letters lower-cased for `NoCase`, and byte-wise
    /// after removing trailing spaces for `Rtrim`.
    pub fn cmp_text(self, a: &str, b: &str) -> std::cmp::Ordering {
        match self {
            Self::Binary => a.cmp(b),
            Self::NoCase => {
                let lhs = a.chars().map(|ch| ch.to_ascii_lowercase());
                let rhs = b.chars().map(|ch| ch.to_ascii_lowercase());
                lhs.cmp(rhs)
            }
            Self::Rtrim => Self::strip_trailing_spaces(a).cmp(Self::strip_trailing_spaces(b)),
        }
    }

    /// Equality counterpart of [`Collation::cmp_text`].
    pub fn eq_text(self, a: &str, b: &str) -> bool {
        match self {
            Self::Binary => a == b,
            Self::NoCase => a.eq_ignore_ascii_case(b),
            Self::Rtrim => Self::strip_trailing_spaces(a) == Self::strip_trailing_spaces(b),
        }
    }
}
685
/// Definition of a single table column.
///
/// Several attributes come as pairs of a parsed expression (`*_expr`) and a
/// string (`*_sql`); presumably the string is the original SQL text of that
/// expression — TODO confirm against the catalog serialization code.
#[derive(Debug, Clone)]
pub struct ColumnDef {
    /// Column name.
    pub name: String,
    /// Declared type of the column.
    pub data_type: DataType,
    /// Whether the column is nullable.
    pub nullable: bool,
    /// Presumably the column's ordinal position within its table — TODO confirm.
    pub position: u16,
    /// `DEFAULT` expression, if any.
    pub default_expr: Option<Expr>,
    pub default_sql: Option<String>,
    /// `CHECK` constraint expression, if any, with its optional name below.
    pub check_expr: Option<Expr>,
    pub check_sql: Option<String>,
    pub check_name: Option<String>,
    /// Display-only flag for `TIMESTAMPTZ` / `TIMETZ`; storage is i64 µs UTC.
    pub is_with_timezone: bool,
    /// Generated-column expression and kind, if the column is generated.
    pub generated_expr: Option<Expr>,
    pub generated_sql: Option<String>,
    pub generated_kind: Option<crate::parser::GeneratedKind>,
    /// Collation applied to the column.
    pub collation: Collation,
}
704
/// Operator class selecting how a GIN index decomposes JSONB values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GinOpsClass {
    /// One entry per (key, value) pair; supports `@>` `?` `?|` `?&`.
    JsonbOps,
    /// One entry per hash(path‖value); supports `@>` only, ~3x smaller index.
    JsonbPathOps,
}

impl GinOpsClass {
    /// Numeric tag for this class: `JsonbOps` is 0, `JsonbPathOps` is 1.
    pub fn as_tag(self) -> u8 {
        match self {
            GinOpsClass::JsonbOps => 0,
            GinOpsClass::JsonbPathOps => 1,
        }
    }

    /// Inverse of [`GinOpsClass::as_tag`]; unknown tags give `None`.
    pub fn from_tag(t: u8) -> Option<Self> {
        // Search the variants by their own tag so the mapping is written only once.
        [GinOpsClass::JsonbOps, GinOpsClass::JsonbPathOps]
            .iter()
            .copied()
            .find(|class| class.as_tag() == t)
    }
}
729
/// Flavor of an inverted index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvertedKind {
    /// GIN index using the given operator class.
    Gin(GinOpsClass),
    /// Full-text-search index; `config_id` presumably identifies the text-search
    /// configuration — TODO confirm against `crate::fts`.
    Fts { config_id: u8 },
}
735
/// Storage structure of an index. Defaults to `BTree`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IndexKind {
    /// B-tree index (the default kind).
    #[default]
    BTree,
    /// Inverted index of the given flavor.
    Inverted(InvertedKind),
}
742
/// Definition of an index: its name, ordered key list, and options.
///
/// `IndexKey::Column` for `CREATE INDEX ON t (email)`, `IndexKey::Expr` for `LOWER(email)`.
#[derive(Debug, Clone)]
pub struct IndexDef {
    /// Index name.
    pub name: String,
    /// Key parts, in index order.
    pub keys: Vec<IndexKey>,
    /// Whether the index is unique.
    pub unique: bool,
    /// Predicate text and parsed form, if any (presumably the `WHERE` clause of a
    /// partial index — TODO confirm).
    pub predicate_sql: Option<String>,
    pub predicate_expr: Option<crate::parser::Expr>,
    /// Storage structure (B-tree or inverted).
    pub kind: IndexKind,
}
753
/// One key part of an index: either a plain column or an expression.
#[derive(Debug, Clone)]
pub enum IndexKey {
    /// A column key: `idx` is the column index, `collate` the collation for this key.
    Column {
        idx: u16,
        collate: Collation,
    },
    /// An expression key: the parsed expression plus the SQL text kept alongside it.
    Expr {
        expr: crate::parser::Expr,
        original_sql: String,
    },
}
765
766impl IndexDef {
767    /// Used by FK/UNIQUE auto-indexes; expression-key indexes go through a different path.
768    pub fn from_column_lists(
769        name: String,
770        columns: Vec<u16>,
771        collations: Vec<Collation>,
772        unique: bool,
773        predicate_sql: Option<String>,
774        predicate_expr: Option<crate::parser::Expr>,
775        kind: IndexKind,
776    ) -> Self {
777        let keys = if collations.is_empty() {
778            columns
779                .into_iter()
780                .map(|idx| IndexKey::Column {
781                    idx,
782                    collate: Collation::Binary,
783                })
784                .collect()
785        } else {
786            columns
787                .into_iter()
788                .zip(collations)
789                .map(|(idx, collate)| IndexKey::Column { idx, collate })
790                .collect()
791        };
792        Self {
793            name,
794            keys,
795            unique,
796            predicate_sql,
797            predicate_expr,
798            kind,
799        }
800    }
801
802    /// Expression keys are skipped (positions only come from `IndexKey::Column`).
803    pub fn columns_vec(&self) -> Vec<u16> {
804        self.keys
805            .iter()
806            .filter_map(|k| match k {
807                IndexKey::Column { idx, .. } => Some(*idx),
808                IndexKey::Expr { .. } => None,
809            })
810            .collect()
811    }
812
813    /// Expression keys default to Binary.
814    pub fn collations_vec(&self) -> Vec<Collation> {
815        self.keys
816            .iter()
817            .map(|k| match k {
818                IndexKey::Column { collate, .. } => *collate,
819                IndexKey::Expr { .. } => Collation::Binary,
820            })
821            .collect()
822    }
823
824    pub fn column_positions_iter(&self) -> impl Iterator<Item = u16> + '_ {
825        self.keys.iter().filter_map(|k| match k {
826            IndexKey::Column { idx, .. } => Some(*idx),
827            IndexKey::Expr { .. } => None,
828        })
829    }
830
831    pub fn collation_at(&self, i: usize) -> Collation {
832        match self.keys.get(i) {
833            Some(IndexKey::Column { collate, .. }) => *collate,
834            _ => Collation::Binary,
835        }
836    }
837
838    pub fn is_pure_column_index(&self) -> bool {
839        self.keys
840            .iter()
841            .all(|k| matches!(k, IndexKey::Column { .. }))
842    }
843}
844
/// Stored definition of a view.
#[derive(Debug, Clone)]
pub struct ViewDef {
    /// View name.
    pub name: String,
    /// SQL text of the view (presumably the defining query — TODO confirm).
    pub sql: String,
    /// Column aliases; may be empty.
    pub column_aliases: Vec<String>,
}

/// Version byte written first by `ViewDef::serialize` and required by
/// `ViewDef::deserialize`.
const VIEW_DEF_VERSION: u8 = 1;
853
854impl ViewDef {
855    pub fn serialize(&self) -> Vec<u8> {
856        let mut buf = Vec::new();
857        buf.push(VIEW_DEF_VERSION);
858
859        let name_bytes = self.name.as_bytes();
860        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
861        buf.extend_from_slice(name_bytes);
862
863        let sql_bytes = self.sql.as_bytes();
864        buf.extend_from_slice(&(sql_bytes.len() as u32).to_le_bytes());
865        buf.extend_from_slice(sql_bytes);
866
867        buf.extend_from_slice(&(self.column_aliases.len() as u16).to_le_bytes());
868        for alias in &self.column_aliases {
869            let alias_bytes = alias.as_bytes();
870            buf.extend_from_slice(&(alias_bytes.len() as u16).to_le_bytes());
871            buf.extend_from_slice(alias_bytes);
872        }
873
874        buf
875    }
876
877    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
878        if data.is_empty() || data[0] != VIEW_DEF_VERSION {
879            return Err(crate::error::SqlError::InvalidValue(
880                "invalid view definition version".into(),
881            ));
882        }
883        let mut pos = 1;
884
885        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
886        pos += 2;
887        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
888        pos += name_len;
889
890        let sql_len =
891            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
892        pos += 4;
893        let sql = String::from_utf8_lossy(&data[pos..pos + sql_len]).into_owned();
894        pos += sql_len;
895
896        let alias_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
897        pos += 2;
898        let mut column_aliases = Vec::with_capacity(alias_count);
899        for _ in 0..alias_count {
900            let alias_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
901            pos += 2;
902            let alias = String::from_utf8_lossy(&data[pos..pos + alias_len]).into_owned();
903            pos += alias_len;
904            column_aliases.push(alias);
905        }
906
907        Ok(Self {
908            name,
909            sql,
910            column_aliases,
911        })
912    }
913}
914
915/// Backing table shares the matview's name and is repopulated on REFRESH.
916#[derive(Debug, Clone)]
917pub struct MatviewDef {
918    pub name: String,
919    pub select_sql: String,
920    pub backing_table: String,
921    pub with_data: bool,
922    pub created_at_micros: i64,
923}
924
925const MATVIEW_DEF_VERSION: u8 = 1;
926
927impl MatviewDef {
928    pub fn backing_table_name(name: &str) -> String {
929        name.to_ascii_lowercase()
930    }
931
932    pub fn serialize(&self) -> Vec<u8> {
933        let mut buf = Vec::new();
934        buf.push(MATVIEW_DEF_VERSION);
935        write_short_str(&mut buf, &self.name);
936        write_long_str(&mut buf, &self.select_sql);
937        write_short_str(&mut buf, &self.backing_table);
938        buf.push(if self.with_data { 1 } else { 0 });
939        buf.extend_from_slice(&self.created_at_micros.to_le_bytes());
940        buf
941    }
942
943    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
944        if data.is_empty() || data[0] != MATVIEW_DEF_VERSION {
945            return Err(crate::error::SqlError::InvalidValue(
946                "invalid matview definition version".into(),
947            ));
948        }
949        let mut pos = 1usize;
950        let name = read_short_str(data, &mut pos);
951        let select_sql = read_long_str(data, &mut pos);
952        let backing_table = read_short_str(data, &mut pos);
953        let with_data = data[pos] != 0;
954        pos += 1;
955        let created_at_micros = i64::from_le_bytes([
956            data[pos],
957            data[pos + 1],
958            data[pos + 2],
959            data[pos + 3],
960            data[pos + 4],
961            data[pos + 5],
962            data[pos + 6],
963            data[pos + 7],
964        ]);
965        Ok(Self {
966            name,
967            select_sql,
968            backing_table,
969            with_data,
970            created_at_micros,
971        })
972    }
973}
974
975#[derive(Debug, Clone)]
976pub struct TriggerDef {
977    pub name: String,
978    pub timing: crate::parser::TriggerTiming,
979    pub events: Vec<crate::parser::TriggerEvent>,
980    pub target: String,
981    pub granularity: crate::parser::TriggerGranularity,
982    pub referencing: Option<crate::parser::TransitionTables>,
983    pub when_sql: Option<String>,
984    pub body_sql: String,
985    pub enabled: bool,
986    pub created_at_micros: i64,
987}
988
989const TRIGGER_DEF_VERSION: u8 = 1;
990
991impl TriggerDef {
992    pub fn serialize(&self) -> Vec<u8> {
993        let mut buf = Vec::new();
994        buf.push(TRIGGER_DEF_VERSION);
995
996        write_short_str(&mut buf, &self.name);
997        buf.push(match self.timing {
998            crate::parser::TriggerTiming::Before => 0,
999            crate::parser::TriggerTiming::After => 1,
1000            crate::parser::TriggerTiming::InsteadOf => 2,
1001        });
1002
1003        buf.extend_from_slice(&(self.events.len() as u16).to_le_bytes());
1004        for ev in &self.events {
1005            match ev {
1006                crate::parser::TriggerEvent::Insert => buf.push(0),
1007                crate::parser::TriggerEvent::Delete => buf.push(1),
1008                crate::parser::TriggerEvent::Update(cols) => {
1009                    buf.push(2);
1010                    buf.extend_from_slice(&(cols.len() as u16).to_le_bytes());
1011                    for c in cols {
1012                        write_short_str(&mut buf, c);
1013                    }
1014                }
1015            }
1016        }
1017
1018        write_short_str(&mut buf, &self.target);
1019        buf.push(match self.granularity {
1020            crate::parser::TriggerGranularity::ForEachRow => 0,
1021            crate::parser::TriggerGranularity::ForEachStatement => 1,
1022        });
1023
1024        match &self.referencing {
1025            None => buf.push(0),
1026            Some(r) => {
1027                buf.push(1);
1028                write_opt_string(&mut buf, &r.new_table_alias);
1029                write_opt_string(&mut buf, &r.old_table_alias);
1030            }
1031        }
1032
1033        match &self.when_sql {
1034            None => buf.push(0),
1035            Some(s) => {
1036                buf.push(1);
1037                write_long_str(&mut buf, s);
1038            }
1039        }
1040
1041        write_long_str(&mut buf, &self.body_sql);
1042        buf.push(if self.enabled { 1 } else { 0 });
1043        buf.extend_from_slice(&self.created_at_micros.to_le_bytes());
1044
1045        buf
1046    }
1047
1048    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
1049        if data.is_empty() || data[0] != TRIGGER_DEF_VERSION {
1050            return Err(crate::error::SqlError::InvalidValue(
1051                "invalid trigger definition version".into(),
1052            ));
1053        }
1054        let mut pos = 1;
1055        let name = read_short_str(data, &mut pos);
1056        let timing = match data[pos] {
1057            0 => crate::parser::TriggerTiming::Before,
1058            1 => crate::parser::TriggerTiming::After,
1059            2 => crate::parser::TriggerTiming::InsteadOf,
1060            _ => {
1061                return Err(crate::error::SqlError::InvalidValue(
1062                    "invalid trigger timing tag".into(),
1063                ))
1064            }
1065        };
1066        pos += 1;
1067
1068        let event_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1069        pos += 2;
1070        let mut events = Vec::with_capacity(event_count);
1071        for _ in 0..event_count {
1072            let tag = data[pos];
1073            pos += 1;
1074            let ev = match tag {
1075                0 => crate::parser::TriggerEvent::Insert,
1076                1 => crate::parser::TriggerEvent::Delete,
1077                2 => {
1078                    let cnt = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1079                    pos += 2;
1080                    let mut cols = Vec::with_capacity(cnt);
1081                    for _ in 0..cnt {
1082                        cols.push(read_short_str(data, &mut pos));
1083                    }
1084                    crate::parser::TriggerEvent::Update(cols)
1085                }
1086                _ => {
1087                    return Err(crate::error::SqlError::InvalidValue(
1088                        "invalid trigger event tag".into(),
1089                    ))
1090                }
1091            };
1092            events.push(ev);
1093        }
1094
1095        let target = read_short_str(data, &mut pos);
1096        let granularity = match data[pos] {
1097            0 => crate::parser::TriggerGranularity::ForEachRow,
1098            1 => crate::parser::TriggerGranularity::ForEachStatement,
1099            _ => {
1100                return Err(crate::error::SqlError::InvalidValue(
1101                    "invalid trigger granularity tag".into(),
1102                ))
1103            }
1104        };
1105        pos += 1;
1106
1107        let referencing = if data[pos] == 0 {
1108            pos += 1;
1109            None
1110        } else {
1111            pos += 1;
1112            let new_table_alias = read_opt_string(data, &mut pos);
1113            let old_table_alias = read_opt_string(data, &mut pos);
1114            Some(crate::parser::TransitionTables {
1115                new_table_alias,
1116                old_table_alias,
1117            })
1118        };
1119
1120        let when_sql = if data[pos] == 0 {
1121            pos += 1;
1122            None
1123        } else {
1124            pos += 1;
1125            Some(read_long_str(data, &mut pos))
1126        };
1127
1128        let body_sql = read_long_str(data, &mut pos);
1129        let enabled = data[pos] != 0;
1130        pos += 1;
1131        let created_at_micros = i64::from_le_bytes([
1132            data[pos],
1133            data[pos + 1],
1134            data[pos + 2],
1135            data[pos + 3],
1136            data[pos + 4],
1137            data[pos + 5],
1138            data[pos + 6],
1139            data[pos + 7],
1140        ]);
1141
1142        Ok(Self {
1143            name,
1144            timing,
1145            events,
1146            target,
1147            granularity,
1148            referencing,
1149            when_sql,
1150            body_sql,
1151            enabled,
1152            created_at_micros,
1153        })
1154    }
1155}
1156
/// Appends `s` to `buf` behind a little-endian `u16` byte-length prefix.
///
/// The length is narrowed with `as u16`, so for strings longer than
/// `u16::MAX` bytes the prefix wraps while all bytes are still written.
fn write_short_str(buf: &mut Vec<u8>, s: &str) {
    let len_prefix = (s.len() as u16).to_le_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(s.as_bytes());
}
1162
/// Reads a string stored behind a little-endian `u16` length prefix at
/// `*pos`, advancing `*pos` past it. Invalid UTF-8 is replaced lossily.
///
/// # Panics
///
/// Panics if `data` is too short for the prefix or the announced payload.
fn read_short_str(data: &[u8], pos: &mut usize) -> String {
    let header = [data[*pos], data[*pos + 1]];
    *pos += 2;
    let end = *pos + usize::from(u16::from_le_bytes(header));
    let text = String::from_utf8_lossy(&data[*pos..end]).into_owned();
    *pos = end;
    text
}
1170
/// Appends `s` to `buf` behind a little-endian `u32` byte-length prefix.
///
/// The length is narrowed with `as u32`, so it wraps for strings longer than
/// `u32::MAX` bytes.
fn write_long_str(buf: &mut Vec<u8>, s: &str) {
    let len_prefix = (s.len() as u32).to_le_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(s.as_bytes());
}
1176
/// Reads a string stored behind a little-endian `u32` length prefix at
/// `*pos`, advancing `*pos` past it. Invalid UTF-8 is replaced lossily.
///
/// # Panics
///
/// Panics if `data` is too short for the prefix or the announced payload.
fn read_long_str(data: &[u8], pos: &mut usize) -> String {
    let header = [data[*pos], data[*pos + 1], data[*pos + 2], data[*pos + 3]];
    *pos += 4;
    let end = *pos + u32::from_le_bytes(header) as usize;
    let text = String::from_utf8_lossy(&data[*pos..end]).into_owned();
    *pos = end;
    text
}
1185
/// A table-level CHECK constraint attached to a `TableSchema`.
#[derive(Debug, Clone)]
pub struct TableCheckDef {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Constraint expression.
    /// NOTE(review): presumably the parsed form of `sql` — confirm.
    pub expr: Expr,
    /// SQL text of the constraint. `TableSchema::serialize` persists this
    /// text and `name`, not `expr`.
    pub sql: String,
}
1192
/// A foreign-key constraint as stored on a `TableSchema`.
#[derive(Debug, Clone)]
pub struct ForeignKeySchemaEntry {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Positions of the constrained columns in the owning table; shifted down
    /// by `TableSchema::without_column` when an earlier column is dropped.
    pub columns: Vec<u16>,
    /// Name of the referenced table.
    pub foreign_table: String,
    /// Column names stored alongside `foreign_table`.
    /// NOTE(review): presumably the referenced columns of that table — confirm.
    pub referred_columns: Vec<String>,
    /// Action for ON DELETE; serialized as its `u8` discriminant.
    pub on_delete: crate::parser::ReferentialAction,
    /// Action for ON UPDATE; serialized as its `u8` discriminant.
    pub on_update: crate::parser::ReferentialAction,
    /// Serialized as bit `0b01` of the per-constraint flag byte.
    pub deferrable: bool,
    /// Serialized as bit `0b10` of the per-constraint flag byte.
    pub initially_deferred: bool,
}
1204
1205#[derive(Debug)]
1206pub struct TableSchema {
1207    pub name: String,
1208    pub columns: Vec<ColumnDef>,
1209    pub primary_key_columns: Vec<u16>,
1210    pub indices: Vec<IndexDef>,
1211    pub check_constraints: Vec<TableCheckDef>,
1212    pub foreign_keys: Vec<ForeignKeySchemaEntry>,
1213    pub flags: u8,
1214    pk_idx_cache: Vec<usize>,
1215    non_pk_idx_cache: Vec<usize>,
1216    /// Sorted physical slots dropped via DROP COLUMN.
1217    dropped_non_pk_slots: Vec<u16>,
1218    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
1219    decode_mapping_cache: Vec<usize>,
1220    /// Logical non-PK order -> physical encoding position.
1221    encoding_positions_cache: Vec<u16>,
1222    has_virtual_columns_cache: bool,
1223    column_map_cache: std::sync::OnceLock<crate::eval::ColumnMap>,
1224}
1225
1226impl Clone for TableSchema {
1227    fn clone(&self) -> Self {
1228        Self {
1229            name: self.name.clone(),
1230            columns: self.columns.clone(),
1231            primary_key_columns: self.primary_key_columns.clone(),
1232            indices: self.indices.clone(),
1233            check_constraints: self.check_constraints.clone(),
1234            foreign_keys: self.foreign_keys.clone(),
1235            flags: self.flags,
1236            pk_idx_cache: self.pk_idx_cache.clone(),
1237            non_pk_idx_cache: self.non_pk_idx_cache.clone(),
1238            dropped_non_pk_slots: self.dropped_non_pk_slots.clone(),
1239            decode_mapping_cache: self.decode_mapping_cache.clone(),
1240            encoding_positions_cache: self.encoding_positions_cache.clone(),
1241            has_virtual_columns_cache: self.has_virtual_columns_cache,
1242            column_map_cache: std::sync::OnceLock::new(),
1243        }
1244    }
1245}
1246
1247impl TableSchema {
1248    pub fn new(
1249        name: String,
1250        columns: Vec<ColumnDef>,
1251        primary_key_columns: Vec<u16>,
1252        indices: Vec<IndexDef>,
1253        check_constraints: Vec<TableCheckDef>,
1254        foreign_keys: Vec<ForeignKeySchemaEntry>,
1255    ) -> Self {
1256        Self::with_drops(
1257            name,
1258            columns,
1259            primary_key_columns,
1260            indices,
1261            check_constraints,
1262            foreign_keys,
1263            vec![],
1264        )
1265    }
1266
1267    pub fn with_drops(
1268        name: String,
1269        columns: Vec<ColumnDef>,
1270        primary_key_columns: Vec<u16>,
1271        indices: Vec<IndexDef>,
1272        check_constraints: Vec<TableCheckDef>,
1273        foreign_keys: Vec<ForeignKeySchemaEntry>,
1274        dropped_non_pk_slots: Vec<u16>,
1275    ) -> Self {
1276        let pk_idx_cache: Vec<usize> = primary_key_columns.iter().map(|&i| i as usize).collect();
1277        let non_pk_idx_cache: Vec<usize> = (0..columns.len())
1278            .filter(|i| !primary_key_columns.contains(&(*i as u16)))
1279            .collect();
1280
1281        let physical_count = non_pk_idx_cache.len() + dropped_non_pk_slots.len();
1282        let mut decode_mapping_cache = vec![usize::MAX; physical_count];
1283        let mut encoding_positions_cache = Vec::with_capacity(non_pk_idx_cache.len());
1284
1285        let mut drop_idx = 0;
1286        let mut live_idx = 0;
1287        for (phys_pos, slot) in decode_mapping_cache.iter_mut().enumerate() {
1288            if drop_idx < dropped_non_pk_slots.len()
1289                && dropped_non_pk_slots[drop_idx] as usize == phys_pos
1290            {
1291                drop_idx += 1;
1292            } else {
1293                *slot = non_pk_idx_cache[live_idx];
1294                encoding_positions_cache.push(phys_pos as u16);
1295                live_idx += 1;
1296            }
1297        }
1298
1299        let has_virtual_columns_cache = columns.iter().any(|c| {
1300            matches!(
1301                c.generated_kind,
1302                Some(crate::parser::GeneratedKind::Virtual)
1303            )
1304        });
1305
1306        Self {
1307            name,
1308            columns,
1309            primary_key_columns,
1310            indices,
1311            check_constraints,
1312            foreign_keys,
1313            flags: 0,
1314            pk_idx_cache,
1315            non_pk_idx_cache,
1316            dropped_non_pk_slots,
1317            decode_mapping_cache,
1318            encoding_positions_cache,
1319            has_virtual_columns_cache,
1320            column_map_cache: std::sync::OnceLock::new(),
1321        }
1322    }
1323
1324    #[inline]
1325    pub fn column_map(&self) -> &crate::eval::ColumnMap {
1326        self.column_map_cache
1327            .get_or_init(|| crate::eval::ColumnMap::new(&self.columns))
1328    }
1329
1330    pub fn is_strict(&self) -> bool {
1331        self.flags & TABLE_FLAG_STRICT != 0
1332    }
1333
1334    pub fn has_virtual_columns(&self) -> bool {
1335        self.has_virtual_columns_cache
1336    }
1337
1338    /// Rebuild caches (preserving dropped slots). Use after mutating fields in place.
1339    pub fn rebuild(self) -> Self {
1340        let drops = self.dropped_non_pk_slots;
1341        Self::with_drops(
1342            self.name,
1343            self.columns,
1344            self.primary_key_columns,
1345            self.indices,
1346            self.check_constraints,
1347            self.foreign_keys,
1348            drops,
1349        )
1350    }
1351
1352    pub fn has_checks(&self) -> bool {
1353        !self.check_constraints.is_empty() || self.columns.iter().any(|c| c.check_expr.is_some())
1354    }
1355
1356    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
1357    pub fn decode_col_mapping(&self) -> &[usize] {
1358        &self.decode_mapping_cache
1359    }
1360
1361    /// Logical non-PK order -> physical encoding position.
1362    pub fn encoding_positions(&self) -> &[u16] {
1363        &self.encoding_positions_cache
1364    }
1365
1366    /// Total physical non-PK column count (live + dropped slots).
1367    pub fn physical_non_pk_count(&self) -> usize {
1368        self.non_pk_idx_cache.len() + self.dropped_non_pk_slots.len()
1369    }
1370
1371    pub fn dropped_non_pk_slots(&self) -> &[u16] {
1372        &self.dropped_non_pk_slots
1373    }
1374
1375    pub fn without_column(&self, drop_pos: usize) -> Self {
1376        let non_pk_order = self
1377            .non_pk_idx_cache
1378            .iter()
1379            .position(|&i| i == drop_pos)
1380            .expect("cannot drop PK column via without_column");
1381        let physical_slot = self.encoding_positions_cache[non_pk_order];
1382
1383        let mut new_dropped = self.dropped_non_pk_slots.clone();
1384        new_dropped.push(physical_slot);
1385        new_dropped.sort();
1386
1387        let dropped_name = &self.columns[drop_pos].name;
1388        let drop_pos_u16 = drop_pos as u16;
1389
1390        let mut columns: Vec<ColumnDef> = self
1391            .columns
1392            .iter()
1393            .enumerate()
1394            .filter(|(i, _)| *i != drop_pos)
1395            .map(|(_, c)| {
1396                let mut col = c.clone();
1397                if col.position > drop_pos_u16 {
1398                    col.position -= 1;
1399                }
1400                col
1401            })
1402            .collect();
1403        for (i, col) in columns.iter_mut().enumerate() {
1404            col.position = i as u16;
1405        }
1406
1407        let primary_key_columns: Vec<u16> = self
1408            .primary_key_columns
1409            .iter()
1410            .map(|&p| if p > drop_pos_u16 { p - 1 } else { p })
1411            .collect();
1412
1413        let indices: Vec<IndexDef> = self
1414            .indices
1415            .iter()
1416            .map(|idx| IndexDef {
1417                name: idx.name.clone(),
1418                keys: idx
1419                    .keys
1420                    .iter()
1421                    .map(|k| match k {
1422                        IndexKey::Column { idx, collate } => IndexKey::Column {
1423                            idx: if *idx > drop_pos_u16 { *idx - 1 } else { *idx },
1424                            collate: *collate,
1425                        },
1426                        IndexKey::Expr { expr, original_sql } => IndexKey::Expr {
1427                            expr: expr.clone(),
1428                            original_sql: original_sql.clone(),
1429                        },
1430                    })
1431                    .collect(),
1432                unique: idx.unique,
1433                predicate_sql: idx.predicate_sql.clone(),
1434                predicate_expr: idx.predicate_expr.clone(),
1435                kind: idx.kind,
1436            })
1437            .collect();
1438
1439        let foreign_keys: Vec<ForeignKeySchemaEntry> = self
1440            .foreign_keys
1441            .iter()
1442            .map(|fk| ForeignKeySchemaEntry {
1443                name: fk.name.clone(),
1444                columns: fk
1445                    .columns
1446                    .iter()
1447                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
1448                    .collect(),
1449                foreign_table: fk.foreign_table.clone(),
1450                referred_columns: fk.referred_columns.clone(),
1451                on_delete: fk.on_delete,
1452                on_update: fk.on_update,
1453                deferrable: fk.deferrable,
1454                initially_deferred: fk.initially_deferred,
1455            })
1456            .collect();
1457
1458        // Filter out table-level CHECKs that reference the dropped column
1459        let dropped_lower = dropped_name.to_ascii_lowercase();
1460        let check_constraints: Vec<TableCheckDef> = self
1461            .check_constraints
1462            .iter()
1463            .filter(|c| !c.sql.to_ascii_lowercase().contains(&dropped_lower))
1464            .cloned()
1465            .collect();
1466
1467        Self::with_drops(
1468            self.name.clone(),
1469            columns,
1470            primary_key_columns,
1471            indices,
1472            check_constraints,
1473            foreign_keys,
1474            new_dropped,
1475        )
1476    }
1477}
1478
/// Schema format version that `TableSchema::serialize` writes as the first byte.
const SCHEMA_VERSION: u8 = 12;
/// Bit in `TableSchema::flags` that `TableSchema::is_strict` tests.
pub const TABLE_FLAG_STRICT: u8 = 0b0000_0001;
1481
/// Appends an optional string behind a little-endian `u16` length prefix.
///
/// `None` is written as a zero length with no payload, so `Some("")` and
/// `None` produce the same bytes. The length is narrowed with `as u16`.
fn write_opt_string(buf: &mut Vec<u8>, s: &Option<String>) {
    let payload: &[u8] = match s {
        Some(text) => text.as_bytes(),
        None => &[],
    };
    buf.extend_from_slice(&(payload.len() as u16).to_le_bytes());
    buf.extend_from_slice(payload);
}
1492
/// Reads an optional string stored behind a little-endian `u16` length
/// prefix at `*pos`, advancing `*pos` past it.
///
/// A zero length decodes as `None`. Invalid UTF-8 is replaced lossily.
///
/// # Panics
///
/// Panics if `data` is too short for the prefix or the announced payload.
fn read_opt_string(data: &[u8], pos: &mut usize) -> Option<String> {
    let header = [data[*pos], data[*pos + 1]];
    *pos += 2;
    match usize::from(u16::from_le_bytes(header)) {
        0 => None,
        len => {
            let end = *pos + len;
            let text = String::from_utf8_lossy(&data[*pos..end]).into_owned();
            *pos = end;
            Some(text)
        }
    }
}
1504
/// Reads a string stored behind a little-endian `u16` length prefix at
/// `*pos`, advancing `*pos` past it. Invalid UTF-8 is replaced lossily.
///
/// # Panics
///
/// Panics if `data` is too short for the prefix or the announced payload.
fn read_string(data: &[u8], pos: &mut usize) -> String {
    let byte_len = usize::from(u16::from_le_bytes([data[*pos], data[*pos + 1]]));
    *pos += 2;
    let payload = &data[*pos..*pos + byte_len];
    *pos += byte_len;
    String::from_utf8_lossy(payload).into_owned()
}
1512
1513impl TableSchema {
1514    pub fn serialize(&self) -> Vec<u8> {
1515        let mut buf = Vec::new();
1516        buf.push(SCHEMA_VERSION);
1517
1518        let name_bytes = self.name.as_bytes();
1519        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
1520        buf.extend_from_slice(name_bytes);
1521
1522        buf.extend_from_slice(&(self.columns.len() as u16).to_le_bytes());
1523
1524        for col in &self.columns {
1525            let col_name = col.name.as_bytes();
1526            buf.extend_from_slice(&(col_name.len() as u16).to_le_bytes());
1527            buf.extend_from_slice(col_name);
1528            buf.push(col.data_type.type_tag());
1529            buf.push(if col.nullable { 1 } else { 0 });
1530            buf.extend_from_slice(&col.position.to_le_bytes());
1531        }
1532
1533        buf.extend_from_slice(&(self.primary_key_columns.len() as u16).to_le_bytes());
1534        for &pk_idx in &self.primary_key_columns {
1535            buf.extend_from_slice(&pk_idx.to_le_bytes());
1536        }
1537
1538        buf.extend_from_slice(&(self.indices.len() as u16).to_le_bytes());
1539        for idx in &self.indices {
1540            let idx_name = idx.name.as_bytes();
1541            buf.extend_from_slice(&(idx_name.len() as u16).to_le_bytes());
1542            buf.extend_from_slice(idx_name);
1543            buf.extend_from_slice(&(idx.keys.len() as u16).to_le_bytes());
1544            for key in &idx.keys {
1545                let col_idx = match key {
1546                    IndexKey::Column { idx, .. } => *idx,
1547                    IndexKey::Expr { .. } => u16::MAX,
1548                };
1549                buf.extend_from_slice(&col_idx.to_le_bytes());
1550            }
1551            buf.push(if idx.unique { 1 } else { 0 });
1552        }
1553
1554        for col in &self.columns {
1555            let mut flags: u8 = 0;
1556            if col.default_sql.is_some() {
1557                flags |= 1;
1558            }
1559            if col.check_sql.is_some() {
1560                flags |= 2;
1561            }
1562            buf.push(flags);
1563            if let Some(ref sql) = col.default_sql {
1564                let bytes = sql.as_bytes();
1565                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1566                buf.extend_from_slice(bytes);
1567            }
1568            if let Some(ref sql) = col.check_sql {
1569                let bytes = sql.as_bytes();
1570                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1571                buf.extend_from_slice(bytes);
1572                write_opt_string(&mut buf, &col.check_name);
1573            }
1574        }
1575
1576        buf.extend_from_slice(&(self.check_constraints.len() as u16).to_le_bytes());
1577        for chk in &self.check_constraints {
1578            write_opt_string(&mut buf, &chk.name);
1579            let sql_bytes = chk.sql.as_bytes();
1580            buf.extend_from_slice(&(sql_bytes.len() as u16).to_le_bytes());
1581            buf.extend_from_slice(sql_bytes);
1582        }
1583
1584        buf.extend_from_slice(&(self.foreign_keys.len() as u16).to_le_bytes());
1585        for fk in &self.foreign_keys {
1586            write_opt_string(&mut buf, &fk.name);
1587            buf.extend_from_slice(&(fk.columns.len() as u16).to_le_bytes());
1588            for &col_idx in &fk.columns {
1589                buf.extend_from_slice(&col_idx.to_le_bytes());
1590            }
1591            let ft_bytes = fk.foreign_table.as_bytes();
1592            buf.extend_from_slice(&(ft_bytes.len() as u16).to_le_bytes());
1593            buf.extend_from_slice(ft_bytes);
1594            buf.extend_from_slice(&(fk.referred_columns.len() as u16).to_le_bytes());
1595            for rc in &fk.referred_columns {
1596                let rc_bytes = rc.as_bytes();
1597                buf.extend_from_slice(&(rc_bytes.len() as u16).to_le_bytes());
1598                buf.extend_from_slice(rc_bytes);
1599            }
1600        }
1601
1602        buf.extend_from_slice(&(self.dropped_non_pk_slots.len() as u16).to_le_bytes());
1603        for &slot in &self.dropped_non_pk_slots {
1604            buf.extend_from_slice(&slot.to_le_bytes());
1605        }
1606
1607        for col in &self.columns {
1608            let kind_tag: u8 = match col.generated_kind {
1609                None => 0,
1610                Some(crate::parser::GeneratedKind::Stored) => 1,
1611                Some(crate::parser::GeneratedKind::Virtual) => 2,
1612            };
1613            buf.push(kind_tag);
1614            if kind_tag != 0 {
1615                let sql = col.generated_sql.as_deref().unwrap_or("");
1616                let bytes = sql.as_bytes();
1617                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1618                buf.extend_from_slice(bytes);
1619            }
1620        }
1621
1622        for idx in &self.indices {
1623            match &idx.predicate_sql {
1624                Some(sql) => {
1625                    buf.push(1);
1626                    let bytes = sql.as_bytes();
1627                    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1628                    buf.extend_from_slice(bytes);
1629                }
1630                None => buf.push(0),
1631            }
1632        }
1633
1634        for fk in &self.foreign_keys {
1635            buf.push(fk.on_delete as u8);
1636            buf.push(fk.on_update as u8);
1637        }
1638
1639        for fk in &self.foreign_keys {
1640            let mut flags: u8 = 0;
1641            if fk.deferrable {
1642                flags |= 0b01;
1643            }
1644            if fk.initially_deferred {
1645                flags |= 0b10;
1646            }
1647            buf.push(flags);
1648        }
1649
1650        for col in &self.columns {
1651            buf.push(col.collation as u8);
1652        }
1653        for idx in &self.indices {
1654            let n = idx.keys.len() as u16;
1655            buf.extend_from_slice(&n.to_le_bytes());
1656            for key in &idx.keys {
1657                let c = match key {
1658                    IndexKey::Column { collate, .. } => *collate,
1659                    IndexKey::Expr { .. } => Collation::Binary,
1660                };
1661                buf.push(c as u8);
1662            }
1663        }
1664        for idx in &self.indices {
1665            match idx.kind {
1666                IndexKind::BTree => buf.push(0),
1667                IndexKind::Inverted(InvertedKind::Gin(ops)) => {
1668                    buf.push(1);
1669                    buf.push(ops.as_tag());
1670                }
1671                IndexKind::Inverted(InvertedKind::Fts { config_id }) => {
1672                    buf.push(2);
1673                    buf.push(config_id);
1674                }
1675            }
1676        }
1677        buf.push(self.flags);
1678
1679        // v12: per-index expression-key extension. Emit (position, SQL) for each Expr key.
1680        // v11 readers stop before this section; v12 readers consume it.
1681        for idx in &self.indices {
1682            let expr_count = idx
1683                .keys
1684                .iter()
1685                .filter(|k| matches!(k, IndexKey::Expr { .. }))
1686                .count() as u16;
1687            buf.extend_from_slice(&expr_count.to_le_bytes());
1688            for (pos, key) in idx.keys.iter().enumerate() {
1689                if let IndexKey::Expr { original_sql, .. } = key {
1690                    buf.extend_from_slice(&(pos as u16).to_le_bytes());
1691                    let bytes = original_sql.as_bytes();
1692                    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1693                    buf.extend_from_slice(bytes);
1694                }
1695            }
1696        }
1697
1698        buf
1699    }
1700
1701    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
1702        let mut pos = 0;
1703
1704        if data.is_empty()
1705            || !matches!(
1706                data[0],
1707                1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | SCHEMA_VERSION
1708            )
1709        {
1710            return Err(crate::error::SqlError::InvalidValue(
1711                "invalid schema version".into(),
1712            ));
1713        }
1714        let version = data[0];
1715        pos += 1;
1716
1717        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1718        pos += 2;
1719        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
1720        pos += name_len;
1721
1722        let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1723        pos += 2;
1724
1725        let mut columns = Vec::with_capacity(col_count);
1726        for _ in 0..col_count {
1727            let col_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1728            pos += 2;
1729            let col_name = String::from_utf8_lossy(&data[pos..pos + col_name_len]).into_owned();
1730            pos += col_name_len;
1731            let data_type = DataType::from_tag(data[pos]).ok_or_else(|| {
1732                crate::error::SqlError::InvalidValue("unknown data type tag".into())
1733            })?;
1734            pos += 1;
1735            let nullable = data[pos] != 0;
1736            pos += 1;
1737            let position = u16::from_le_bytes([data[pos], data[pos + 1]]);
1738            pos += 2;
1739            columns.push(ColumnDef {
1740                name: col_name,
1741                data_type,
1742                nullable,
1743                position,
1744                default_expr: None,
1745                default_sql: None,
1746                check_expr: None,
1747                check_sql: None,
1748                check_name: None,
1749                is_with_timezone: false,
1750                generated_expr: None,
1751                generated_sql: None,
1752                generated_kind: None,
1753                collation: Collation::Binary,
1754            });
1755        }
1756
1757        let pk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1758        pos += 2;
1759        let mut primary_key_columns = Vec::with_capacity(pk_count);
1760        for _ in 0..pk_count {
1761            let pk_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1762            pos += 2;
1763            primary_key_columns.push(pk_idx);
1764        }
1765
1766        let indices = if version >= 2 && pos + 2 <= data.len() {
1767            let idx_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1768            pos += 2;
1769            let mut idxs = Vec::with_capacity(idx_count);
1770            for _ in 0..idx_count {
1771                let idx_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1772                pos += 2;
1773                let idx_name = String::from_utf8_lossy(&data[pos..pos + idx_name_len]).into_owned();
1774                pos += idx_name_len;
1775                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1776                pos += 2;
1777                let mut keys: Vec<IndexKey> = Vec::with_capacity(col_count);
1778                for _ in 0..col_count {
1779                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1780                    pos += 2;
1781                    // u16::MAX marks an expression key that the v12 section will fill in below.
1782                    // For v11 indexes (no expression section), this stays as a column placeholder.
1783                    keys.push(IndexKey::Column {
1784                        idx: col_idx,
1785                        collate: Collation::Binary,
1786                    });
1787                }
1788                let unique = data[pos] != 0;
1789                pos += 1;
1790                idxs.push(IndexDef {
1791                    name: idx_name,
1792                    keys,
1793                    unique,
1794                    predicate_sql: None,
1795                    predicate_expr: None,
1796                    kind: IndexKind::default(),
1797                });
1798            }
1799            idxs
1800        } else {
1801            vec![]
1802        };
1803
1804        let mut check_constraints = Vec::new();
1805        let mut foreign_keys = Vec::new();
1806
1807        if version >= 3 && pos < data.len() {
1808            for col in &mut columns {
1809                let flags = data[pos];
1810                pos += 1;
1811                if flags & 1 != 0 {
1812                    let sql = read_string(data, &mut pos);
1813                    col.default_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1814                        crate::error::SqlError::InvalidValue(format!(
1815                            "cannot parse DEFAULT expression: {sql}"
1816                        ))
1817                    })?);
1818                    col.default_sql = Some(sql);
1819                }
1820                if flags & 2 != 0 {
1821                    let sql = read_string(data, &mut pos);
1822                    col.check_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1823                        crate::error::SqlError::InvalidValue(format!(
1824                            "cannot parse CHECK expression: {sql}"
1825                        ))
1826                    })?);
1827                    col.check_sql = Some(sql);
1828                    col.check_name = read_opt_string(data, &mut pos);
1829                }
1830            }
1831
1832            let chk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1833            pos += 2;
1834            for _ in 0..chk_count {
1835                let name = read_opt_string(data, &mut pos);
1836                let sql = read_string(data, &mut pos);
1837                let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1838                    crate::error::SqlError::InvalidValue(format!(
1839                        "cannot parse CHECK expression: {sql}"
1840                    ))
1841                })?;
1842                check_constraints.push(TableCheckDef { name, expr, sql });
1843            }
1844
1845            let fk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1846            pos += 2;
1847            for _ in 0..fk_count {
1848                let name = read_opt_string(data, &mut pos);
1849                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1850                pos += 2;
1851                let mut cols = Vec::with_capacity(col_count);
1852                for _ in 0..col_count {
1853                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1854                    pos += 2;
1855                    cols.push(col_idx);
1856                }
1857                let foreign_table = read_string(data, &mut pos);
1858                let ref_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1859                pos += 2;
1860                let mut referred_columns = Vec::with_capacity(ref_count);
1861                for _ in 0..ref_count {
1862                    referred_columns.push(read_string(data, &mut pos));
1863                }
1864                foreign_keys.push(ForeignKeySchemaEntry {
1865                    name,
1866                    columns: cols,
1867                    foreign_table,
1868                    referred_columns,
1869                    on_delete: crate::parser::ReferentialAction::NoAction,
1870                    on_update: crate::parser::ReferentialAction::NoAction,
1871                    deferrable: false,
1872                    initially_deferred: false,
1873                });
1874            }
1875        }
1876        let mut dropped_non_pk_slots = Vec::new();
1877        if version >= 4 && pos + 2 <= data.len() {
1878            let slot_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1879            pos += 2;
1880            for _ in 0..slot_count {
1881                let slot = u16::from_le_bytes([data[pos], data[pos + 1]]);
1882                pos += 2;
1883                dropped_non_pk_slots.push(slot);
1884            }
1885        }
1886        if version >= 5 && pos < data.len() {
1887            for col in &mut columns {
1888                let kind_tag = data[pos];
1889                pos += 1;
1890                if kind_tag != 0 {
1891                    let len = u32::from_le_bytes([
1892                        data[pos],
1893                        data[pos + 1],
1894                        data[pos + 2],
1895                        data[pos + 3],
1896                    ]) as usize;
1897                    pos += 4;
1898                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
1899                    pos += len;
1900                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1901                        crate::error::SqlError::InvalidValue(format!(
1902                            "cannot parse GENERATED expression: {sql}"
1903                        ))
1904                    })?;
1905                    col.generated_sql = Some(sql);
1906                    col.generated_expr = Some(expr);
1907                    col.generated_kind = Some(match kind_tag {
1908                        1 => crate::parser::GeneratedKind::Stored,
1909                        2 => crate::parser::GeneratedKind::Virtual,
1910                        _ => {
1911                            return Err(crate::error::SqlError::InvalidValue(
1912                                "unknown GENERATED kind tag".into(),
1913                            ));
1914                        }
1915                    });
1916                }
1917            }
1918        }
1919        let mut indices = indices;
1920        if version >= 6 && pos < data.len() {
1921            for idx in &mut indices {
1922                let flag = data[pos];
1923                pos += 1;
1924                if flag == 1 {
1925                    let len = u32::from_le_bytes([
1926                        data[pos],
1927                        data[pos + 1],
1928                        data[pos + 2],
1929                        data[pos + 3],
1930                    ]) as usize;
1931                    pos += 4;
1932                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
1933                    pos += len;
1934                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1935                        crate::error::SqlError::InvalidValue(format!(
1936                            "cannot parse partial-index predicate: {sql}"
1937                        ))
1938                    })?;
1939                    idx.predicate_sql = Some(sql);
1940                    idx.predicate_expr = Some(expr);
1941                }
1942            }
1943            for fk in &mut foreign_keys {
1944                fk.on_delete =
1945                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
1946                        crate::error::SqlError::InvalidValue("unknown FK on_delete tag".into())
1947                    })?;
1948                pos += 1;
1949                fk.on_update =
1950                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
1951                        crate::error::SqlError::InvalidValue("unknown FK on_update tag".into())
1952                    })?;
1953                pos += 1;
1954            }
1955            if version >= 11 {
1956                for fk in &mut foreign_keys {
1957                    if pos >= data.len() {
1958                        break;
1959                    }
1960                    let flags = data[pos];
1961                    pos += 1;
1962                    fk.deferrable = flags & 0b01 != 0;
1963                    fk.initially_deferred = flags & 0b10 != 0;
1964                }
1965            }
1966        }
1967
1968        let mut columns = columns;
1969        let mut indices = indices;
1970        let mut flags: u8 = 0;
1971        if version >= 7 && pos < data.len() {
1972            for col in &mut columns {
1973                col.collation = Collation::from_tag(data[pos]).ok_or_else(|| {
1974                    crate::error::SqlError::InvalidValue("unknown collation tag".into())
1975                })?;
1976                pos += 1;
1977            }
1978            for idx in &mut indices {
1979                let n = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1980                pos += 2;
1981                for i in 0..n {
1982                    let collate = Collation::from_tag(data[pos]).ok_or_else(|| {
1983                        crate::error::SqlError::InvalidValue("unknown collation tag".into())
1984                    })?;
1985                    pos += 1;
1986                    if let Some(IndexKey::Column { collate: c, .. }) = idx.keys.get_mut(i) {
1987                        *c = collate;
1988                    }
1989                }
1990            }
1991            if version >= 9 {
1992                for idx in &mut indices {
1993                    if pos >= data.len() {
1994                        break;
1995                    }
1996                    let tag = data[pos];
1997                    pos += 1;
1998                    idx.kind = match tag {
1999                        0 => IndexKind::BTree,
2000                        1 => {
2001                            if pos >= data.len() {
2002                                return Err(crate::error::SqlError::InvalidValue(
2003                                    "GIN index missing opclass tag".into(),
2004                                ));
2005                            }
2006                            let ops = GinOpsClass::from_tag(data[pos]).ok_or_else(|| {
2007                                crate::error::SqlError::InvalidValue(
2008                                    "unknown GIN opclass tag".into(),
2009                                )
2010                            })?;
2011                            pos += 1;
2012                            IndexKind::Inverted(InvertedKind::Gin(ops))
2013                        }
2014                        2 => {
2015                            if pos >= data.len() {
2016                                return Err(crate::error::SqlError::InvalidValue(
2017                                    "FTS index missing config_id".into(),
2018                                ));
2019                            }
2020                            let config_id = data[pos];
2021                            pos += 1;
2022                            IndexKind::Inverted(InvertedKind::Fts { config_id })
2023                        }
2024                        _ => {
2025                            return Err(crate::error::SqlError::InvalidValue(
2026                                "unknown IndexKind tag".into(),
2027                            ));
2028                        }
2029                    };
2030                }
2031            }
2032            if pos < data.len() {
2033                flags = data[pos];
2034                pos += 1;
2035            }
2036            if version >= 12 {
2037                for idx in &mut indices {
2038                    if pos + 2 > data.len() {
2039                        break;
2040                    }
2041                    let expr_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
2042                    pos += 2;
2043                    for _ in 0..expr_count {
2044                        if pos + 6 > data.len() {
2045                            return Err(crate::error::SqlError::InvalidValue(
2046                                "truncated index expression key".into(),
2047                            ));
2048                        }
2049                        let key_pos = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
2050                        pos += 2;
2051                        let sql_len = u32::from_le_bytes([
2052                            data[pos],
2053                            data[pos + 1],
2054                            data[pos + 2],
2055                            data[pos + 3],
2056                        ]) as usize;
2057                        pos += 4;
2058                        if pos + sql_len > data.len() {
2059                            return Err(crate::error::SqlError::InvalidValue(
2060                                "truncated expression-key SQL".into(),
2061                            ));
2062                        }
2063                        let sql = String::from_utf8_lossy(&data[pos..pos + sql_len]).into_owned();
2064                        pos += sql_len;
2065                        let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
2066                            crate::error::SqlError::InvalidValue(format!(
2067                                "cannot parse index expression: {sql}"
2068                            ))
2069                        })?;
2070                        if key_pos < idx.keys.len() {
2071                            idx.keys[key_pos] = IndexKey::Expr {
2072                                expr,
2073                                original_sql: sql,
2074                            };
2075                        }
2076                    }
2077                }
2078            }
2079        }
2080        let _ = pos;
2081
2082        let mut schema = Self::with_drops(
2083            name,
2084            columns,
2085            primary_key_columns,
2086            indices,
2087            check_constraints,
2088            foreign_keys,
2089            dropped_non_pk_slots,
2090        );
2091        schema.flags = flags;
2092        Ok(schema)
2093    }
2094
2095    pub fn column_index(&self, name: &str) -> Option<usize> {
2096        self.columns
2097            .iter()
2098            .position(|c| c.name.eq_ignore_ascii_case(name))
2099    }
2100
2101    pub fn non_pk_indices(&self) -> &[usize] {
2102        &self.non_pk_idx_cache
2103    }
2104
2105    pub fn pk_indices(&self) -> &[usize] {
2106        &self.pk_idx_cache
2107    }
2108
2109    pub fn index_by_name(&self, name: &str) -> Option<&IndexDef> {
2110        let lower = name.to_ascii_lowercase();
2111        self.indices.iter().find(|i| i.name == lower)
2112    }
2113
2114    pub fn index_table_name(table_name: &str, index_name: &str) -> Vec<u8> {
2115        format!("__idx_{table_name}_{index_name}").into_bytes()
2116    }
2117}
2118
/// Result value with three shapes: a `u64` count, a [`QueryResult`], or a bare `Ok`.
///
/// NOTE(review): the code that produces these variants is outside this chunk;
/// presumably this is what statement execution returns — confirm against the executor.
#[derive(Debug)]
pub enum ExecutionResult {
    /// Carries a `u64`; by its name, the number of rows a statement changed.
    RowsAffected(u64),
    /// Carries the column names and rows of a result set.
    Query(QueryResult),
    /// Carries no payload.
    Ok,
}
2125
/// A tabular result: a list of column names plus a list of rows of [`Value`]s.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names.
    /// NOTE(review): presumably in the same order as the values in each row — confirm.
    pub columns: Vec<String>,
    /// Result rows; each row is a vector of values.
    pub rows: Vec<Vec<Value>>,
}
2131
// Test-only module; its source is loaded from the sibling file `types_tests.rs`.
#[cfg(test)]
#[path = "types_tests.rs"]
mod tests;