Skip to main content

citadel_sql/
types.rs

1use std::cmp::Ordering;
2use std::fmt;
3use std::hash::{Hash, Hasher};
4use std::sync::Arc;
5
6pub use compact_str::CompactString;
7
8use crate::parser::Expr;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum DataType {
12    Null,
13    Integer,
14    Real,
15    Text,
16    Blob,
17    Boolean,
18    Time,
19    Date,
20    Timestamp,
21    Interval,
22    Json,
23    Jsonb,
24    TsVector,
25    TsQuery,
26    Array,
27    Vector { dim: u16 },
28}
29
30impl DataType {
31    pub fn type_tag(self) -> u8 {
32        match self {
33            DataType::Null => 0,
34            DataType::Blob => 1,
35            DataType::Text => 2,
36            DataType::Boolean => 3,
37            DataType::Integer => 4,
38            DataType::Real => 5,
39            DataType::Time => 6,
40            DataType::Date => 7,
41            DataType::Timestamp => 8,
42            DataType::Interval => 9,
43            DataType::Json => 10,
44            DataType::Jsonb => 11,
45            DataType::TsVector => 12,
46            DataType::TsQuery => 13,
47            DataType::Array => 14,
48            DataType::Vector { .. } => 15,
49        }
50    }
51
52    /// Vector returns a `dim: 0` sentinel; the real dim is read at the schema layer.
53    pub fn from_tag(tag: u8) -> Option<Self> {
54        match tag {
55            0 => Some(DataType::Null),
56            1 => Some(DataType::Blob),
57            2 => Some(DataType::Text),
58            3 => Some(DataType::Boolean),
59            4 => Some(DataType::Integer),
60            5 => Some(DataType::Real),
61            6 => Some(DataType::Time),
62            7 => Some(DataType::Date),
63            8 => Some(DataType::Timestamp),
64            9 => Some(DataType::Interval),
65            10 => Some(DataType::Json),
66            11 => Some(DataType::Jsonb),
67            12 => Some(DataType::TsVector),
68            13 => Some(DataType::TsQuery),
69            14 => Some(DataType::Array),
70            15 => Some(DataType::Vector { dim: 0 }),
71            _ => None,
72        }
73    }
74}
75
76impl fmt::Display for DataType {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        match self {
79            DataType::Null => write!(f, "NULL"),
80            DataType::Integer => write!(f, "INTEGER"),
81            DataType::Real => write!(f, "REAL"),
82            DataType::Text => write!(f, "TEXT"),
83            DataType::Blob => write!(f, "BLOB"),
84            DataType::Boolean => write!(f, "BOOLEAN"),
85            DataType::Time => write!(f, "TIME"),
86            DataType::Date => write!(f, "DATE"),
87            DataType::Timestamp => write!(f, "TIMESTAMP"),
88            DataType::Interval => write!(f, "INTERVAL"),
89            DataType::Json => write!(f, "JSON"),
90            DataType::Jsonb => write!(f, "JSONB"),
91            DataType::TsVector => write!(f, "TSVECTOR"),
92            DataType::TsQuery => write!(f, "TSQUERY"),
93            DataType::Array => write!(f, "ARRAY"),
94            DataType::Vector { dim } => write!(f, "VECTOR({dim})"),
95        }
96    }
97}
98
99/// SQL value. Temporal epochs: days/µs since 1970-01-01 UTC.
100/// `Date`/`Timestamp` reserve `i{32,64}::{MAX,MIN}` as `±infinity` sentinels.
101#[derive(Debug, Clone, Default)]
102pub enum Value {
103    #[default]
104    Null,
105    Integer(i64),
106    Real(f64),
107    Text(CompactString),
108    Blob(Vec<u8>),
109    Boolean(bool),
110    Time(i64),
111    Date(i32),
112    Timestamp(i64),
113    Interval {
114        months: i32,
115        days: i32,
116        micros: i64,
117    },
118    Json(CompactString),
119    Jsonb(Arc<[u8]>),
120    TsVector(Arc<[u8]>),
121    TsQuery(Arc<[u8]>),
122    Array(Arc<Vec<Value>>),
123    Vector(Arc<[f32]>),
124}
125
126impl Value {
127    pub fn data_type(&self) -> DataType {
128        match self {
129            Value::Null => DataType::Null,
130            Value::Integer(_) => DataType::Integer,
131            Value::Real(_) => DataType::Real,
132            Value::Text(_) => DataType::Text,
133            Value::Blob(_) => DataType::Blob,
134            Value::Boolean(_) => DataType::Boolean,
135            Value::Time(_) => DataType::Time,
136            Value::Date(_) => DataType::Date,
137            Value::Timestamp(_) => DataType::Timestamp,
138            Value::Interval { .. } => DataType::Interval,
139            Value::Json(_) => DataType::Json,
140            Value::Jsonb(_) => DataType::Jsonb,
141            Value::TsVector(_) => DataType::TsVector,
142            Value::TsQuery(_) => DataType::TsQuery,
143            Value::Array(_) => DataType::Array,
144            Value::Vector(v) => DataType::Vector {
145                dim: v.len() as u16,
146            },
147        }
148    }
149
150    pub fn is_null(&self) -> bool {
151        matches!(self, Value::Null)
152    }
153
154    pub fn is_finite_temporal(&self) -> bool {
155        match self {
156            Value::Date(d) => *d != i32::MAX && *d != i32::MIN,
157            Value::Timestamp(t) => *t != i64::MAX && *t != i64::MIN,
158            _ => true,
159        }
160    }
161
162    pub fn coerce_to(&self, target: DataType) -> Option<Value> {
163        match (self, target) {
164            (_, DataType::Null) => Some(Value::Null),
165            (Value::Null, _) => Some(Value::Null),
166            (Value::Integer(i), DataType::Integer) => Some(Value::Integer(*i)),
167            (Value::Integer(i), DataType::Real) => Some(Value::Real(*i as f64)),
168            (Value::Real(r), DataType::Real) => Some(Value::Real(*r)),
169            (Value::Real(r), DataType::Integer) => Some(Value::Integer(*r as i64)),
170            (Value::Text(s), DataType::Text) => Some(Value::Text(s.clone())),
171            (Value::Blob(b), DataType::Blob) => Some(Value::Blob(b.clone())),
172            (Value::Boolean(b), DataType::Boolean) => Some(Value::Boolean(*b)),
173            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
174            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(*i != 0)),
175            (Value::Time(t), DataType::Time) => Some(Value::Time(*t)),
176            (Value::Date(d), DataType::Date) => Some(Value::Date(*d)),
177            (Value::Timestamp(t), DataType::Timestamp) => Some(Value::Timestamp(*t)),
178            (Value::TsVector(b), DataType::TsVector) => Some(Value::TsVector(b.clone())),
179            (Value::TsQuery(b), DataType::TsQuery) => Some(Value::TsQuery(b.clone())),
180            (Value::Array(a), DataType::Array) => Some(Value::Array(a.clone())),
181            (
182                Value::Interval {
183                    months,
184                    days,
185                    micros,
186                },
187                DataType::Interval,
188            ) => Some(Value::Interval {
189                months: *months,
190                days: *days,
191                micros: *micros,
192            }),
193            _ => None,
194        }
195    }
196
197    pub fn coerce_into(self, target: DataType) -> Option<Value> {
198        if self.is_null() || target == DataType::Null {
199            return Some(Value::Null);
200        }
201        if self.data_type() == target {
202            return Some(self);
203        }
204        match (self, target) {
205            (Value::Integer(i), DataType::Real) => Some(Value::Real(i as f64)),
206            (Value::Real(r), DataType::Integer) => Some(Value::Integer(r as i64)),
207            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if b { 1 } else { 0 })),
208            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(i != 0)),
209            (Value::Text(s), DataType::Date) => {
210                crate::datetime::parse_date(&s).ok().map(Value::Date)
211            }
212            (Value::Text(s), DataType::Time) => {
213                crate::datetime::parse_time(&s).ok().map(Value::Time)
214            }
215            (Value::Text(s), DataType::Timestamp) => crate::datetime::parse_timestamp(&s)
216                .ok()
217                .map(Value::Timestamp),
218            (Value::Text(s), DataType::Interval) => {
219                crate::datetime::parse_interval(&s)
220                    .ok()
221                    .map(|(m, d, u)| Value::Interval {
222                        months: m,
223                        days: d,
224                        micros: u,
225                    })
226            }
227            // INTEGER → TIMESTAMP: Unix epoch seconds.
228            (Value::Integer(n), DataType::Timestamp) => {
229                n.checked_mul(1_000_000).map(Value::Timestamp)
230            }
231            (Value::Integer(n), DataType::Date) => {
232                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
233                    Some(Value::Date(n as i32))
234                } else {
235                    None
236                }
237            }
238            (Value::Integer(n), DataType::Time) => {
239                if (0..=86_400_000_000).contains(&n) {
240                    Some(Value::Time(n))
241                } else {
242                    None
243                }
244            }
245            (Value::Integer(n), DataType::Interval) => {
246                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
247                    Some(Value::Interval {
248                        months: 0,
249                        days: n as i32,
250                        micros: 0,
251                    })
252                } else {
253                    None
254                }
255            }
256            (Value::Timestamp(t), DataType::Integer) => Some(Value::Integer(t / 1_000_000)),
257            (Value::Date(d), DataType::Integer) => Some(Value::Integer(d as i64)),
258            (Value::Time(t), DataType::Integer) => Some(Value::Integer(t)),
259            (Value::Date(d), DataType::Timestamp) => {
260                (d as i64).checked_mul(86_400_000_000).map(Value::Timestamp)
261            }
262            (Value::Timestamp(t), DataType::Date) => {
263                // div_euclid floors correctly for negative µs (pre-1970).
264                let days = t.div_euclid(86_400_000_000);
265                if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
266                    Some(Value::Date(days as i32))
267                } else {
268                    None
269                }
270            }
271            (v, DataType::Text)
272                if matches!(
273                    v.data_type(),
274                    DataType::Date | DataType::Time | DataType::Timestamp | DataType::Interval
275                ) =>
276            {
277                Some(Value::Text(v.to_string().into()))
278            }
279            (Value::Text(s), DataType::Json) => {
280                crate::json::validate_text(&s).ok()?;
281                Some(Value::Json(s))
282            }
283            (Value::Text(s), DataType::Jsonb) => crate::json::text_to_jsonb(&s).ok(),
284            (Value::Json(s), DataType::Text) => Some(Value::Text(s)),
285            (Value::Json(s), DataType::Jsonb) => crate::json::text_to_jsonb(&s).ok(),
286            (Value::Jsonb(b), DataType::Text) => crate::json::decode_to_text(&b)
287                .ok()
288                .map(|t| Value::Text(t.into())),
289            (Value::Jsonb(b), DataType::Json) => crate::json::decode_to_text(&b)
290                .ok()
291                .map(|t| Value::Json(t.into())),
292            _ => None,
293        }
294    }
295
296    pub fn strict_coerce(&self, target: DataType) -> Option<Value> {
297        if matches!(self, Value::Null) {
298            return Some(Value::Null);
299        }
300        if self.data_type() == target {
301            return Some(self.clone());
302        }
303        match (self, target) {
304            (Value::Integer(i), DataType::Real) => {
305                if i.unsigned_abs() <= (1u64 << 53) {
306                    Some(Value::Real(*i as f64))
307                } else {
308                    None
309                }
310            }
311            (Value::Real(r), DataType::Integer) => {
312                if r.is_finite()
313                    && r.fract() == 0.0
314                    && (i64::MIN as f64..=i64::MAX as f64).contains(r)
315                {
316                    Some(Value::Integer(*r as i64))
317                } else {
318                    None
319                }
320            }
321            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
322            (Value::Integer(i), DataType::Boolean) => match i {
323                0 => Some(Value::Boolean(false)),
324                1 => Some(Value::Boolean(true)),
325                _ => None,
326            },
327            (Value::Text(s), DataType::Integer) => {
328                let trimmed = s.as_str();
329                let parsed: i64 = trimmed.parse().ok()?;
330                if parsed.to_string() == trimmed {
331                    Some(Value::Integer(parsed))
332                } else {
333                    None
334                }
335            }
336            (Value::Text(s), DataType::Real) => {
337                let trimmed = s.as_str();
338                let parsed: f64 = trimmed.parse().ok()?;
339                if parsed.is_finite() {
340                    Some(Value::Real(parsed))
341                } else {
342                    None
343                }
344            }
345            (Value::Text(_), DataType::Date)
346            | (Value::Text(_), DataType::Time)
347            | (Value::Text(_), DataType::Timestamp)
348            | (Value::Text(_), DataType::Interval)
349            | (Value::Text(_), DataType::Json)
350            | (Value::Text(_), DataType::Jsonb)
351            | (Value::Json(_), DataType::Jsonb)
352            | (Value::Json(_), DataType::Text)
353            | (Value::Jsonb(_), DataType::Json)
354            | (Value::Jsonb(_), DataType::Text) => self.clone().coerce_into(target),
355            (Value::Date(d), DataType::Timestamp) => (*d as i64)
356                .checked_mul(86_400_000_000)
357                .map(Value::Timestamp),
358            (Value::Timestamp(t), DataType::Date) => {
359                if t % 86_400_000_000 == 0 {
360                    let days = t.div_euclid(86_400_000_000);
361                    if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
362                        Some(Value::Date(days as i32))
363                    } else {
364                        None
365                    }
366                } else {
367                    None
368                }
369            }
370            _ => None,
371        }
372    }
373
374    /// Numeric ordering for Integer and Real values (promotes to f64 for mixed).
375    fn numeric_cmp(&self, other: &Value) -> Option<Ordering> {
376        match (self, other) {
377            (Value::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
378            (Value::Real(a), Value::Real(b)) => a.partial_cmp(b),
379            (Value::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
380            (Value::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
381            _ => None,
382        }
383    }
384}
385
386impl PartialEq for Value {
387    // Field-wise for Eq/Hash/Ord transitivity. SQL-level `=` on INTERVAL
388    // normalizes separately (see eval.rs).
389    fn eq(&self, other: &Self) -> bool {
390        match (self, other) {
391            (Value::Null, Value::Null) => true,
392            (Value::Integer(a), Value::Integer(b)) => a == b,
393            (Value::Real(a), Value::Real(b)) => a == b,
394            (Value::Integer(a), Value::Real(b)) => (*a as f64) == *b,
395            (Value::Real(a), Value::Integer(b)) => *a == (*b as f64),
396            (Value::Text(a), Value::Text(b)) => a == b,
397            (Value::Blob(a), Value::Blob(b)) => a == b,
398            (Value::Boolean(a), Value::Boolean(b)) => a == b,
399            (Value::Time(a), Value::Time(b)) => a == b,
400            (Value::Date(a), Value::Date(b)) => a == b,
401            (Value::Timestamp(a), Value::Timestamp(b)) => a == b,
402            (
403                Value::Interval {
404                    months: am,
405                    days: ad,
406                    micros: au,
407                },
408                Value::Interval {
409                    months: bm,
410                    days: bd,
411                    micros: bu,
412                },
413            ) => am == bm && ad == bd && au == bu,
414            (Value::Json(a), Value::Json(b)) => a == b,
415            (Value::Jsonb(a), Value::Jsonb(b)) => a == b,
416            (Value::TsVector(a), Value::TsVector(b)) => a == b,
417            (Value::TsQuery(a), Value::TsQuery(b)) => a == b,
418            (Value::Array(a), Value::Array(b)) => a == b,
419            _ => false,
420        }
421    }
422}
423
424impl Eq for Value {}
425
426impl Hash for Value {
427    fn hash<H: Hasher>(&self, state: &mut H) {
428        match self {
429            Value::Null => 0u8.hash(state),
430            Value::Integer(i) => {
431                // Hash via f64 bits so Integer(n) and Real(n.0) produce the same hash,
432                // matching the cross-type PartialEq contract.
433                1u8.hash(state);
434                (*i as f64).to_bits().hash(state);
435            }
436            Value::Real(r) => {
437                1u8.hash(state);
438                r.to_bits().hash(state);
439            }
440            Value::Text(s) => {
441                2u8.hash(state);
442                s.hash(state);
443            }
444            Value::Blob(b) => {
445                3u8.hash(state);
446                b.hash(state);
447            }
448            Value::Boolean(b) => {
449                4u8.hash(state);
450                b.hash(state);
451            }
452            Value::Time(t) => {
453                5u8.hash(state);
454                t.hash(state);
455            }
456            Value::Date(d) => {
457                6u8.hash(state);
458                d.hash(state);
459            }
460            Value::Timestamp(t) => {
461                7u8.hash(state);
462                t.hash(state);
463            }
464            Value::Interval {
465                months,
466                days,
467                micros,
468            } => {
469                8u8.hash(state);
470                months.hash(state);
471                days.hash(state);
472                micros.hash(state);
473            }
474            Value::Json(s) => {
475                9u8.hash(state);
476                s.hash(state);
477            }
478            Value::Jsonb(b) => {
479                10u8.hash(state);
480                b.hash(state);
481            }
482            Value::TsVector(b) => {
483                11u8.hash(state);
484                b.hash(state);
485            }
486            Value::TsQuery(b) => {
487                12u8.hash(state);
488                b.hash(state);
489            }
490            Value::Array(a) => {
491                13u8.hash(state);
492                a.hash(state);
493            }
494            Value::Vector(v) => {
495                14u8.hash(state);
496                v.len().hash(state);
497                for &x in v.iter() {
498                    x.to_bits().hash(state);
499                }
500            }
501        }
502    }
503}
504
505impl PartialOrd for Value {
506    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
507        Some(self.cmp(other))
508    }
509}
510
511impl Ord for Value {
512    // Order: NULL < BOOLEAN < numeric < TIME < DATE < TIMESTAMP < INTERVAL < TEXT < BLOB.
513    // INTERVAL compares field-wise for trait-invariant safety; SQL-level ops normalize.
514    fn cmp(&self, other: &Self) -> Ordering {
515        match (self, other) {
516            (Value::Null, Value::Null) => Ordering::Equal,
517            (Value::Null, _) => Ordering::Less,
518            (_, Value::Null) => Ordering::Greater,
519
520            (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
521            (Value::Boolean(_), _) => Ordering::Less,
522            (_, Value::Boolean(_)) => Ordering::Greater,
523
524            (Value::Integer(_) | Value::Real(_), Value::Integer(_) | Value::Real(_)) => {
525                self.numeric_cmp(other).unwrap_or(Ordering::Equal)
526            }
527            (Value::Integer(_) | Value::Real(_), _) => Ordering::Less,
528            (_, Value::Integer(_) | Value::Real(_)) => Ordering::Greater,
529
530            (Value::Time(a), Value::Time(b)) => a.cmp(b),
531            (Value::Time(_), _) => Ordering::Less,
532            (_, Value::Time(_)) => Ordering::Greater,
533
534            (Value::Date(a), Value::Date(b)) => a.cmp(b),
535            (Value::Date(_), _) => Ordering::Less,
536            (_, Value::Date(_)) => Ordering::Greater,
537
538            (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
539            (Value::Timestamp(_), _) => Ordering::Less,
540            (_, Value::Timestamp(_)) => Ordering::Greater,
541
542            (
543                Value::Interval {
544                    months: am,
545                    days: ad,
546                    micros: au,
547                },
548                Value::Interval {
549                    months: bm,
550                    days: bd,
551                    micros: bu,
552                },
553            ) => am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu)),
554            (Value::Interval { .. }, _) => Ordering::Less,
555            (_, Value::Interval { .. }) => Ordering::Greater,
556
557            (Value::Json(a), Value::Json(b)) => a.cmp(b),
558            (Value::Json(_), _) => Ordering::Less,
559            (_, Value::Json(_)) => Ordering::Greater,
560
561            (Value::Jsonb(a), Value::Jsonb(b)) => a.as_ref().cmp(b.as_ref()),
562            (Value::Jsonb(_), _) => Ordering::Less,
563            (_, Value::Jsonb(_)) => Ordering::Greater,
564
565            (Value::TsVector(a), Value::TsVector(b)) => a.as_ref().cmp(b.as_ref()),
566            (Value::TsVector(_), _) => Ordering::Less,
567            (_, Value::TsVector(_)) => Ordering::Greater,
568
569            (Value::TsQuery(a), Value::TsQuery(b)) => a.as_ref().cmp(b.as_ref()),
570            (Value::TsQuery(_), _) => Ordering::Less,
571            (_, Value::TsQuery(_)) => Ordering::Greater,
572
573            (Value::Array(a), Value::Array(b)) => a.as_ref().cmp(b.as_ref()),
574            (Value::Array(_), _) => Ordering::Less,
575            (_, Value::Array(_)) => Ordering::Greater,
576
577            (Value::Vector(a), Value::Vector(b)) => a.len().cmp(&b.len()).then_with(|| {
578                for (x, y) in a.iter().zip(b.iter()) {
579                    let ord = x.total_cmp(y);
580                    if ord != Ordering::Equal {
581                        return ord;
582                    }
583                }
584                Ordering::Equal
585            }),
586            (Value::Vector(_), _) => Ordering::Less,
587            (_, Value::Vector(_)) => Ordering::Greater,
588
589            (Value::Text(a), Value::Text(b)) => a.cmp(b),
590            (Value::Text(_), _) => Ordering::Less,
591            (_, Value::Text(_)) => Ordering::Greater,
592
593            (Value::Blob(a), Value::Blob(b)) => a.cmp(b),
594        }
595    }
596}
597
598impl fmt::Display for Value {
599    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
600        match self {
601            Value::Null => write!(f, "NULL"),
602            Value::Integer(i) => write!(f, "{i}"),
603            Value::Real(r) => {
604                if r.fract() == 0.0 && r.is_finite() {
605                    write!(f, "{r:.1}")
606                } else {
607                    write!(f, "{r}")
608                }
609            }
610            Value::Text(s) => write!(f, "{s}"),
611            Value::Blob(b) => write!(f, "X'{}'", hex_encode(b)),
612            Value::Boolean(b) => write!(f, "{}", if *b { "TRUE" } else { "FALSE" }),
613            Value::Time(t) => write!(f, "{}", crate::datetime::format_time(*t)),
614            Value::Date(d) => write!(f, "{}", crate::datetime::format_date(*d)),
615            Value::Timestamp(t) => write!(f, "{}", crate::datetime::format_timestamp(*t)),
616            Value::Interval {
617                months,
618                days,
619                micros,
620            } => {
621                write!(
622                    f,
623                    "{}",
624                    crate::datetime::format_interval(*months, *days, *micros)
625                )
626            }
627            Value::Json(s) => write!(f, "{s}"),
628            Value::Jsonb(b) => match crate::json::decode_to_text(b) {
629                Ok(s) => write!(f, "{s}"),
630                Err(_) => write!(f, "<invalid jsonb>"),
631            },
632            Value::TsVector(b) => write!(f, "{}", crate::fts::tsvector_display(b)),
633            Value::TsQuery(b) => write!(f, "{}", crate::fts::tsquery_display(b)),
634            Value::Array(a) => {
635                write!(f, "{{")?;
636                for (i, elem) in a.iter().enumerate() {
637                    if i > 0 {
638                        write!(f, ",")?;
639                    }
640                    match elem {
641                        Value::Null => write!(f, "NULL")?,
642                        Value::Text(s) => {
643                            write!(f, "\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))?
644                        }
645                        other => write!(f, "{other}")?,
646                    }
647                }
648                write!(f, "}}")
649            }
650            Value::Vector(v) => {
651                write!(f, "[")?;
652                for (i, &x) in v.iter().enumerate() {
653                    if i > 0 {
654                        write!(f, ",")?;
655                    }
656                    write!(f, "{x}")?;
657                }
658                write!(f, "]")
659            }
660        }
661    }
662}
663
664fn hex_encode(data: &[u8]) -> String {
665    let mut s = String::with_capacity(data.len() * 2);
666    for byte in data {
667        s.push_str(&format!("{byte:02X}"));
668    }
669    s
670}
671
672#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
673#[repr(u8)]
674pub enum Collation {
675    #[default]
676    Binary = 0,
677    NoCase = 1,
678    Rtrim = 2,
679}
680
681impl Collation {
682    pub fn from_tag(tag: u8) -> Option<Self> {
683        match tag {
684            0 => Some(Self::Binary),
685            1 => Some(Self::NoCase),
686            2 => Some(Self::Rtrim),
687            _ => None,
688        }
689    }
690
691    pub fn from_name(name: &str) -> Option<Self> {
692        match name.to_ascii_uppercase().as_str() {
693            "BINARY" => Some(Self::Binary),
694            "NOCASE" => Some(Self::NoCase),
695            "RTRIM" => Some(Self::Rtrim),
696            _ => None,
697        }
698    }
699
700    pub fn cmp_text(self, a: &str, b: &str) -> std::cmp::Ordering {
701        match self {
702            Collation::Binary => a.cmp(b),
703            Collation::NoCase => Iterator::cmp(
704                a.chars().map(|c| c.to_ascii_lowercase()),
705                b.chars().map(|c| c.to_ascii_lowercase()),
706            ),
707            Collation::Rtrim => {
708                let la = a.trim_end_matches(' ');
709                let lb = b.trim_end_matches(' ');
710                la.cmp(lb)
711            }
712        }
713    }
714
715    pub fn eq_text(self, a: &str, b: &str) -> bool {
716        match self {
717            Collation::Binary => a == b,
718            Collation::NoCase => a.eq_ignore_ascii_case(b),
719            Collation::Rtrim => a.trim_end_matches(' ') == b.trim_end_matches(' '),
720        }
721    }
722}
723
724#[derive(Debug, Clone)]
725pub struct ColumnDef {
726    pub name: String,
727    pub data_type: DataType,
728    pub nullable: bool,
729    pub position: u16,
730    pub default_expr: Option<Expr>,
731    pub default_sql: Option<String>,
732    pub check_expr: Option<Expr>,
733    pub check_sql: Option<String>,
734    pub check_name: Option<String>,
735    /// Display-only flag for `TIMESTAMPTZ` / `TIMETZ`; storage is i64 µs UTC.
736    pub is_with_timezone: bool,
737    pub generated_expr: Option<Expr>,
738    pub generated_sql: Option<String>,
739    pub generated_kind: Option<crate::parser::GeneratedKind>,
740    pub collation: Collation,
741}
742
743#[derive(Debug, Clone, Copy, PartialEq, Eq)]
744pub enum GinOpsClass {
745    /// One entry per (key, value) pair; supports `@>` `?` `?|` `?&`.
746    JsonbOps,
747    /// One entry per hash(path‖value); supports `@>` only, ~3x smaller index.
748    JsonbPathOps,
749}
750
751impl GinOpsClass {
752    pub fn as_tag(self) -> u8 {
753        match self {
754            Self::JsonbOps => 0,
755            Self::JsonbPathOps => 1,
756        }
757    }
758
759    pub fn from_tag(t: u8) -> Option<Self> {
760        match t {
761            0 => Some(Self::JsonbOps),
762            1 => Some(Self::JsonbPathOps),
763            _ => None,
764        }
765    }
766}
767
768#[derive(Debug, Clone, Copy, PartialEq, Eq)]
769pub enum InvertedKind {
770    Gin(GinOpsClass),
771    Fts { config_id: u8 },
772    Ann { metric: AnnMetric },
773}
774
775#[derive(Debug, Clone, Copy, PartialEq, Eq)]
776pub enum AnnMetric {
777    L2,
778    Inner,
779    Cosine,
780}
781
782impl AnnMetric {
783    pub fn as_tag(self) -> u8 {
784        match self {
785            Self::L2 => 0,
786            Self::Inner => 1,
787            Self::Cosine => 2,
788        }
789    }
790
791    pub fn from_tag(t: u8) -> Option<Self> {
792        match t {
793            0 => Some(Self::L2),
794            1 => Some(Self::Inner),
795            2 => Some(Self::Cosine),
796            _ => None,
797        }
798    }
799}
800
801#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
802pub enum IndexKind {
803    #[default]
804    BTree,
805    Inverted(InvertedKind),
806}
807
808/// `IndexKey::Column` for `CREATE INDEX ON t (email)`, `IndexKey::Expr` for `LOWER(email)`.
809#[derive(Debug, Clone)]
810pub struct IndexDef {
811    pub name: String,
812    pub keys: Vec<IndexKey>,
813    pub unique: bool,
814    pub predicate_sql: Option<String>,
815    pub predicate_expr: Option<crate::parser::Expr>,
816    pub kind: IndexKind,
817    /// ANN-only: schema column indices pushed into the PRISM cell filter.
818    /// Empty for every other index kind.
819    pub ann_filter_cols: Vec<u16>,
820}
821
822#[derive(Debug, Clone)]
823pub enum IndexKey {
824    Column {
825        idx: u16,
826        collate: Collation,
827    },
828    Expr {
829        expr: crate::parser::Expr,
830        original_sql: String,
831    },
832}
833
834impl IndexDef {
835    /// Used by FK/UNIQUE auto-indexes; expression-key indexes go through a different path.
836    pub fn from_column_lists(
837        name: String,
838        columns: Vec<u16>,
839        collations: Vec<Collation>,
840        unique: bool,
841        predicate_sql: Option<String>,
842        predicate_expr: Option<crate::parser::Expr>,
843        kind: IndexKind,
844    ) -> Self {
845        let keys = if collations.is_empty() {
846            columns
847                .into_iter()
848                .map(|idx| IndexKey::Column {
849                    idx,
850                    collate: Collation::Binary,
851                })
852                .collect()
853        } else {
854            columns
855                .into_iter()
856                .zip(collations)
857                .map(|(idx, collate)| IndexKey::Column { idx, collate })
858                .collect()
859        };
860        Self {
861            name,
862            keys,
863            unique,
864            predicate_sql,
865            predicate_expr,
866            kind,
867            ann_filter_cols: Vec::new(),
868        }
869    }
870
871    /// Expression keys are skipped (positions only come from `IndexKey::Column`).
872    pub fn columns_vec(&self) -> Vec<u16> {
873        self.keys
874            .iter()
875            .filter_map(|k| match k {
876                IndexKey::Column { idx, .. } => Some(*idx),
877                IndexKey::Expr { .. } => None,
878            })
879            .collect()
880    }
881
882    /// Expression keys default to Binary.
883    pub fn collations_vec(&self) -> Vec<Collation> {
884        self.keys
885            .iter()
886            .map(|k| match k {
887                IndexKey::Column { collate, .. } => *collate,
888                IndexKey::Expr { .. } => Collation::Binary,
889            })
890            .collect()
891    }
892
893    pub fn column_positions_iter(&self) -> impl Iterator<Item = u16> + '_ {
894        self.keys.iter().filter_map(|k| match k {
895            IndexKey::Column { idx, .. } => Some(*idx),
896            IndexKey::Expr { .. } => None,
897        })
898    }
899
900    pub fn collation_at(&self, i: usize) -> Collation {
901        match self.keys.get(i) {
902            Some(IndexKey::Column { collate, .. }) => *collate,
903            _ => Collation::Binary,
904        }
905    }
906
907    pub fn is_pure_column_index(&self) -> bool {
908        self.keys
909            .iter()
910            .all(|k| matches!(k, IndexKey::Column { .. }))
911    }
912}
913
914#[derive(Debug, Clone)]
915pub struct ViewDef {
916    pub name: String,
917    pub sql: String,
918    pub column_aliases: Vec<String>,
919}
920
921const VIEW_DEF_VERSION: u8 = 1;
922
923impl ViewDef {
924    pub fn serialize(&self) -> Vec<u8> {
925        let mut buf = Vec::new();
926        buf.push(VIEW_DEF_VERSION);
927
928        let name_bytes = self.name.as_bytes();
929        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
930        buf.extend_from_slice(name_bytes);
931
932        let sql_bytes = self.sql.as_bytes();
933        buf.extend_from_slice(&(sql_bytes.len() as u32).to_le_bytes());
934        buf.extend_from_slice(sql_bytes);
935
936        buf.extend_from_slice(&(self.column_aliases.len() as u16).to_le_bytes());
937        for alias in &self.column_aliases {
938            let alias_bytes = alias.as_bytes();
939            buf.extend_from_slice(&(alias_bytes.len() as u16).to_le_bytes());
940            buf.extend_from_slice(alias_bytes);
941        }
942
943        buf
944    }
945
946    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
947        if data.is_empty() || data[0] != VIEW_DEF_VERSION {
948            return Err(crate::error::SqlError::InvalidValue(
949                "invalid view definition version".into(),
950            ));
951        }
952        let mut pos = 1;
953
954        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
955        pos += 2;
956        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
957        pos += name_len;
958
959        let sql_len =
960            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
961        pos += 4;
962        let sql = String::from_utf8_lossy(&data[pos..pos + sql_len]).into_owned();
963        pos += sql_len;
964
965        let alias_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
966        pos += 2;
967        let mut column_aliases = Vec::with_capacity(alias_count);
968        for _ in 0..alias_count {
969            let alias_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
970            pos += 2;
971            let alias = String::from_utf8_lossy(&data[pos..pos + alias_len]).into_owned();
972            pos += alias_len;
973            column_aliases.push(alias);
974        }
975
976        Ok(Self {
977            name,
978            sql,
979            column_aliases,
980        })
981    }
982}
983
984/// Backing table shares the matview's name and is repopulated on REFRESH.
985#[derive(Debug, Clone)]
986pub struct MatviewDef {
987    pub name: String,
988    pub select_sql: String,
989    pub backing_table: String,
990    pub with_data: bool,
991    pub created_at_micros: i64,
992}
993
994const MATVIEW_DEF_VERSION: u8 = 1;
995
996impl MatviewDef {
997    pub fn backing_table_name(name: &str) -> String {
998        name.to_ascii_lowercase()
999    }
1000
1001    pub fn serialize(&self) -> Vec<u8> {
1002        let mut buf = Vec::new();
1003        buf.push(MATVIEW_DEF_VERSION);
1004        write_short_str(&mut buf, &self.name);
1005        write_long_str(&mut buf, &self.select_sql);
1006        write_short_str(&mut buf, &self.backing_table);
1007        buf.push(if self.with_data { 1 } else { 0 });
1008        buf.extend_from_slice(&self.created_at_micros.to_le_bytes());
1009        buf
1010    }
1011
1012    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
1013        if data.is_empty() || data[0] != MATVIEW_DEF_VERSION {
1014            return Err(crate::error::SqlError::InvalidValue(
1015                "invalid matview definition version".into(),
1016            ));
1017        }
1018        let mut pos = 1usize;
1019        let name = read_short_str(data, &mut pos);
1020        let select_sql = read_long_str(data, &mut pos);
1021        let backing_table = read_short_str(data, &mut pos);
1022        let with_data = data[pos] != 0;
1023        pos += 1;
1024        let created_at_micros = i64::from_le_bytes([
1025            data[pos],
1026            data[pos + 1],
1027            data[pos + 2],
1028            data[pos + 3],
1029            data[pos + 4],
1030            data[pos + 5],
1031            data[pos + 6],
1032            data[pos + 7],
1033        ]);
1034        Ok(Self {
1035            name,
1036            select_sql,
1037            backing_table,
1038            with_data,
1039            created_at_micros,
1040        })
1041    }
1042}
1043
1044#[derive(Debug, Clone)]
1045pub struct TriggerDef {
1046    pub name: String,
1047    pub timing: crate::parser::TriggerTiming,
1048    pub events: Vec<crate::parser::TriggerEvent>,
1049    pub target: String,
1050    pub granularity: crate::parser::TriggerGranularity,
1051    pub referencing: Option<crate::parser::TransitionTables>,
1052    pub when_sql: Option<String>,
1053    pub body_sql: String,
1054    pub enabled: bool,
1055    pub created_at_micros: i64,
1056}
1057
1058const TRIGGER_DEF_VERSION: u8 = 1;
1059
1060impl TriggerDef {
1061    pub fn serialize(&self) -> Vec<u8> {
1062        let mut buf = Vec::new();
1063        buf.push(TRIGGER_DEF_VERSION);
1064
1065        write_short_str(&mut buf, &self.name);
1066        buf.push(match self.timing {
1067            crate::parser::TriggerTiming::Before => 0,
1068            crate::parser::TriggerTiming::After => 1,
1069            crate::parser::TriggerTiming::InsteadOf => 2,
1070        });
1071
1072        buf.extend_from_slice(&(self.events.len() as u16).to_le_bytes());
1073        for ev in &self.events {
1074            match ev {
1075                crate::parser::TriggerEvent::Insert => buf.push(0),
1076                crate::parser::TriggerEvent::Delete => buf.push(1),
1077                crate::parser::TriggerEvent::Update(cols) => {
1078                    buf.push(2);
1079                    buf.extend_from_slice(&(cols.len() as u16).to_le_bytes());
1080                    for c in cols {
1081                        write_short_str(&mut buf, c);
1082                    }
1083                }
1084            }
1085        }
1086
1087        write_short_str(&mut buf, &self.target);
1088        buf.push(match self.granularity {
1089            crate::parser::TriggerGranularity::ForEachRow => 0,
1090            crate::parser::TriggerGranularity::ForEachStatement => 1,
1091        });
1092
1093        match &self.referencing {
1094            None => buf.push(0),
1095            Some(r) => {
1096                buf.push(1);
1097                write_opt_string(&mut buf, &r.new_table_alias);
1098                write_opt_string(&mut buf, &r.old_table_alias);
1099            }
1100        }
1101
1102        match &self.when_sql {
1103            None => buf.push(0),
1104            Some(s) => {
1105                buf.push(1);
1106                write_long_str(&mut buf, s);
1107            }
1108        }
1109
1110        write_long_str(&mut buf, &self.body_sql);
1111        buf.push(if self.enabled { 1 } else { 0 });
1112        buf.extend_from_slice(&self.created_at_micros.to_le_bytes());
1113
1114        buf
1115    }
1116
1117    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
1118        if data.is_empty() || data[0] != TRIGGER_DEF_VERSION {
1119            return Err(crate::error::SqlError::InvalidValue(
1120                "invalid trigger definition version".into(),
1121            ));
1122        }
1123        let mut pos = 1;
1124        let name = read_short_str(data, &mut pos);
1125        let timing = match data[pos] {
1126            0 => crate::parser::TriggerTiming::Before,
1127            1 => crate::parser::TriggerTiming::After,
1128            2 => crate::parser::TriggerTiming::InsteadOf,
1129            _ => {
1130                return Err(crate::error::SqlError::InvalidValue(
1131                    "invalid trigger timing tag".into(),
1132                ))
1133            }
1134        };
1135        pos += 1;
1136
1137        let event_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1138        pos += 2;
1139        let mut events = Vec::with_capacity(event_count);
1140        for _ in 0..event_count {
1141            let tag = data[pos];
1142            pos += 1;
1143            let ev = match tag {
1144                0 => crate::parser::TriggerEvent::Insert,
1145                1 => crate::parser::TriggerEvent::Delete,
1146                2 => {
1147                    let cnt = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1148                    pos += 2;
1149                    let mut cols = Vec::with_capacity(cnt);
1150                    for _ in 0..cnt {
1151                        cols.push(read_short_str(data, &mut pos));
1152                    }
1153                    crate::parser::TriggerEvent::Update(cols)
1154                }
1155                _ => {
1156                    return Err(crate::error::SqlError::InvalidValue(
1157                        "invalid trigger event tag".into(),
1158                    ))
1159                }
1160            };
1161            events.push(ev);
1162        }
1163
1164        let target = read_short_str(data, &mut pos);
1165        let granularity = match data[pos] {
1166            0 => crate::parser::TriggerGranularity::ForEachRow,
1167            1 => crate::parser::TriggerGranularity::ForEachStatement,
1168            _ => {
1169                return Err(crate::error::SqlError::InvalidValue(
1170                    "invalid trigger granularity tag".into(),
1171                ))
1172            }
1173        };
1174        pos += 1;
1175
1176        let referencing = if data[pos] == 0 {
1177            pos += 1;
1178            None
1179        } else {
1180            pos += 1;
1181            let new_table_alias = read_opt_string(data, &mut pos);
1182            let old_table_alias = read_opt_string(data, &mut pos);
1183            Some(crate::parser::TransitionTables {
1184                new_table_alias,
1185                old_table_alias,
1186            })
1187        };
1188
1189        let when_sql = if data[pos] == 0 {
1190            pos += 1;
1191            None
1192        } else {
1193            pos += 1;
1194            Some(read_long_str(data, &mut pos))
1195        };
1196
1197        let body_sql = read_long_str(data, &mut pos);
1198        let enabled = data[pos] != 0;
1199        pos += 1;
1200        let created_at_micros = i64::from_le_bytes([
1201            data[pos],
1202            data[pos + 1],
1203            data[pos + 2],
1204            data[pos + 3],
1205            data[pos + 4],
1206            data[pos + 5],
1207            data[pos + 6],
1208            data[pos + 7],
1209        ]);
1210
1211        Ok(Self {
1212            name,
1213            timing,
1214            events,
1215            target,
1216            granularity,
1217            referencing,
1218            when_sql,
1219            body_sql,
1220            enabled,
1221            created_at_micros,
1222        })
1223    }
1224}
1225
1226fn write_short_str(buf: &mut Vec<u8>, s: &str) {
1227    let bytes = s.as_bytes();
1228    buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1229    buf.extend_from_slice(bytes);
1230}
1231
1232fn read_short_str(data: &[u8], pos: &mut usize) -> String {
1233    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
1234    *pos += 2;
1235    let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
1236    *pos += len;
1237    s
1238}
1239
1240fn write_long_str(buf: &mut Vec<u8>, s: &str) {
1241    let bytes = s.as_bytes();
1242    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1243    buf.extend_from_slice(bytes);
1244}
1245
1246fn read_long_str(data: &[u8], pos: &mut usize) -> String {
1247    let len =
1248        u32::from_le_bytes([data[*pos], data[*pos + 1], data[*pos + 2], data[*pos + 3]]) as usize;
1249    *pos += 4;
1250    let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
1251    *pos += len;
1252    s
1253}
1254
1255#[derive(Debug, Clone)]
1256pub struct TableCheckDef {
1257    pub name: Option<String>,
1258    pub expr: Expr,
1259    pub sql: String,
1260}
1261
1262#[derive(Debug, Clone)]
1263pub struct ForeignKeySchemaEntry {
1264    pub name: Option<String>,
1265    pub columns: Vec<u16>,
1266    pub foreign_table: String,
1267    pub referred_columns: Vec<String>,
1268    pub on_delete: crate::parser::ReferentialAction,
1269    pub on_update: crate::parser::ReferentialAction,
1270    pub deferrable: bool,
1271    pub initially_deferred: bool,
1272}
1273
1274#[derive(Debug)]
1275pub struct TableSchema {
1276    pub name: String,
1277    pub columns: Vec<ColumnDef>,
1278    pub primary_key_columns: Vec<u16>,
1279    pub indices: Vec<IndexDef>,
1280    pub check_constraints: Vec<TableCheckDef>,
1281    pub foreign_keys: Vec<ForeignKeySchemaEntry>,
1282    pub flags: u8,
1283    pk_idx_cache: Vec<usize>,
1284    non_pk_idx_cache: Vec<usize>,
1285    /// Sorted physical slots dropped via DROP COLUMN.
1286    dropped_non_pk_slots: Vec<u16>,
1287    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
1288    decode_mapping_cache: Vec<usize>,
1289    /// Logical non-PK order -> physical encoding position.
1290    encoding_positions_cache: Vec<u16>,
1291    has_virtual_columns_cache: bool,
1292    column_map_cache: std::sync::OnceLock<crate::eval::ColumnMap>,
1293}
1294
1295impl Clone for TableSchema {
1296    fn clone(&self) -> Self {
1297        Self {
1298            name: self.name.clone(),
1299            columns: self.columns.clone(),
1300            primary_key_columns: self.primary_key_columns.clone(),
1301            indices: self.indices.clone(),
1302            check_constraints: self.check_constraints.clone(),
1303            foreign_keys: self.foreign_keys.clone(),
1304            flags: self.flags,
1305            pk_idx_cache: self.pk_idx_cache.clone(),
1306            non_pk_idx_cache: self.non_pk_idx_cache.clone(),
1307            dropped_non_pk_slots: self.dropped_non_pk_slots.clone(),
1308            decode_mapping_cache: self.decode_mapping_cache.clone(),
1309            encoding_positions_cache: self.encoding_positions_cache.clone(),
1310            has_virtual_columns_cache: self.has_virtual_columns_cache,
1311            column_map_cache: std::sync::OnceLock::new(),
1312        }
1313    }
1314}
1315
1316impl TableSchema {
1317    pub fn new(
1318        name: String,
1319        columns: Vec<ColumnDef>,
1320        primary_key_columns: Vec<u16>,
1321        indices: Vec<IndexDef>,
1322        check_constraints: Vec<TableCheckDef>,
1323        foreign_keys: Vec<ForeignKeySchemaEntry>,
1324    ) -> Self {
1325        Self::with_drops(
1326            name,
1327            columns,
1328            primary_key_columns,
1329            indices,
1330            check_constraints,
1331            foreign_keys,
1332            vec![],
1333        )
1334    }
1335
1336    pub fn with_drops(
1337        name: String,
1338        columns: Vec<ColumnDef>,
1339        primary_key_columns: Vec<u16>,
1340        indices: Vec<IndexDef>,
1341        check_constraints: Vec<TableCheckDef>,
1342        foreign_keys: Vec<ForeignKeySchemaEntry>,
1343        dropped_non_pk_slots: Vec<u16>,
1344    ) -> Self {
1345        let pk_idx_cache: Vec<usize> = primary_key_columns.iter().map(|&i| i as usize).collect();
1346        let non_pk_idx_cache: Vec<usize> = (0..columns.len())
1347            .filter(|i| !primary_key_columns.contains(&(*i as u16)))
1348            .collect();
1349
1350        let physical_count = non_pk_idx_cache.len() + dropped_non_pk_slots.len();
1351        let mut decode_mapping_cache = vec![usize::MAX; physical_count];
1352        let mut encoding_positions_cache = Vec::with_capacity(non_pk_idx_cache.len());
1353
1354        let mut drop_idx = 0;
1355        let mut live_idx = 0;
1356        for (phys_pos, slot) in decode_mapping_cache.iter_mut().enumerate() {
1357            if drop_idx < dropped_non_pk_slots.len()
1358                && dropped_non_pk_slots[drop_idx] as usize == phys_pos
1359            {
1360                drop_idx += 1;
1361            } else {
1362                *slot = non_pk_idx_cache[live_idx];
1363                encoding_positions_cache.push(phys_pos as u16);
1364                live_idx += 1;
1365            }
1366        }
1367
1368        let has_virtual_columns_cache = columns.iter().any(|c| {
1369            matches!(
1370                c.generated_kind,
1371                Some(crate::parser::GeneratedKind::Virtual)
1372            )
1373        });
1374
1375        Self {
1376            name,
1377            columns,
1378            primary_key_columns,
1379            indices,
1380            check_constraints,
1381            foreign_keys,
1382            flags: 0,
1383            pk_idx_cache,
1384            non_pk_idx_cache,
1385            dropped_non_pk_slots,
1386            decode_mapping_cache,
1387            encoding_positions_cache,
1388            has_virtual_columns_cache,
1389            column_map_cache: std::sync::OnceLock::new(),
1390        }
1391    }
1392
1393    #[inline]
1394    pub fn column_map(&self) -> &crate::eval::ColumnMap {
1395        self.column_map_cache
1396            .get_or_init(|| crate::eval::ColumnMap::new(&self.columns))
1397    }
1398
1399    pub fn is_strict(&self) -> bool {
1400        self.flags & TABLE_FLAG_STRICT != 0
1401    }
1402
1403    pub fn has_virtual_columns(&self) -> bool {
1404        self.has_virtual_columns_cache
1405    }
1406
1407    /// Rebuild caches (preserving dropped slots). Use after mutating fields in place.
1408    pub fn rebuild(self) -> Self {
1409        let drops = self.dropped_non_pk_slots;
1410        Self::with_drops(
1411            self.name,
1412            self.columns,
1413            self.primary_key_columns,
1414            self.indices,
1415            self.check_constraints,
1416            self.foreign_keys,
1417            drops,
1418        )
1419    }
1420
1421    pub fn has_checks(&self) -> bool {
1422        !self.check_constraints.is_empty() || self.columns.iter().any(|c| c.check_expr.is_some())
1423    }
1424
1425    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
1426    pub fn decode_col_mapping(&self) -> &[usize] {
1427        &self.decode_mapping_cache
1428    }
1429
1430    /// Logical non-PK order -> physical encoding position.
1431    pub fn encoding_positions(&self) -> &[u16] {
1432        &self.encoding_positions_cache
1433    }
1434
1435    /// Total physical non-PK column count (live + dropped slots).
1436    pub fn physical_non_pk_count(&self) -> usize {
1437        self.non_pk_idx_cache.len() + self.dropped_non_pk_slots.len()
1438    }
1439
1440    pub fn dropped_non_pk_slots(&self) -> &[u16] {
1441        &self.dropped_non_pk_slots
1442    }
1443
1444    pub fn without_column(&self, drop_pos: usize) -> Self {
1445        let non_pk_order = self
1446            .non_pk_idx_cache
1447            .iter()
1448            .position(|&i| i == drop_pos)
1449            .expect("cannot drop PK column via without_column");
1450        let physical_slot = self.encoding_positions_cache[non_pk_order];
1451
1452        let mut new_dropped = self.dropped_non_pk_slots.clone();
1453        new_dropped.push(physical_slot);
1454        new_dropped.sort();
1455
1456        let dropped_name = &self.columns[drop_pos].name;
1457        let drop_pos_u16 = drop_pos as u16;
1458
1459        let mut columns: Vec<ColumnDef> = self
1460            .columns
1461            .iter()
1462            .enumerate()
1463            .filter(|(i, _)| *i != drop_pos)
1464            .map(|(_, c)| {
1465                let mut col = c.clone();
1466                if col.position > drop_pos_u16 {
1467                    col.position -= 1;
1468                }
1469                col
1470            })
1471            .collect();
1472        for (i, col) in columns.iter_mut().enumerate() {
1473            col.position = i as u16;
1474        }
1475
1476        let primary_key_columns: Vec<u16> = self
1477            .primary_key_columns
1478            .iter()
1479            .map(|&p| if p > drop_pos_u16 { p - 1 } else { p })
1480            .collect();
1481
1482        let indices: Vec<IndexDef> = self
1483            .indices
1484            .iter()
1485            .map(|idx| IndexDef {
1486                name: idx.name.clone(),
1487                keys: idx
1488                    .keys
1489                    .iter()
1490                    .map(|k| match k {
1491                        IndexKey::Column { idx, collate } => IndexKey::Column {
1492                            idx: if *idx > drop_pos_u16 { *idx - 1 } else { *idx },
1493                            collate: *collate,
1494                        },
1495                        IndexKey::Expr { expr, original_sql } => IndexKey::Expr {
1496                            expr: expr.clone(),
1497                            original_sql: original_sql.clone(),
1498                        },
1499                    })
1500                    .collect(),
1501                unique: idx.unique,
1502                predicate_sql: idx.predicate_sql.clone(),
1503                predicate_expr: idx.predicate_expr.clone(),
1504                kind: idx.kind,
1505                ann_filter_cols: idx
1506                    .ann_filter_cols
1507                    .iter()
1508                    .filter(|&&p| p != drop_pos_u16)
1509                    .map(|&p| if p > drop_pos_u16 { p - 1 } else { p })
1510                    .collect(),
1511            })
1512            .collect();
1513
1514        let foreign_keys: Vec<ForeignKeySchemaEntry> = self
1515            .foreign_keys
1516            .iter()
1517            .map(|fk| ForeignKeySchemaEntry {
1518                name: fk.name.clone(),
1519                columns: fk
1520                    .columns
1521                    .iter()
1522                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
1523                    .collect(),
1524                foreign_table: fk.foreign_table.clone(),
1525                referred_columns: fk.referred_columns.clone(),
1526                on_delete: fk.on_delete,
1527                on_update: fk.on_update,
1528                deferrable: fk.deferrable,
1529                initially_deferred: fk.initially_deferred,
1530            })
1531            .collect();
1532
1533        // Filter out table-level CHECKs that reference the dropped column
1534        let dropped_lower = dropped_name.to_ascii_lowercase();
1535        let check_constraints: Vec<TableCheckDef> = self
1536            .check_constraints
1537            .iter()
1538            .filter(|c| !c.sql.to_ascii_lowercase().contains(&dropped_lower))
1539            .cloned()
1540            .collect();
1541
1542        Self::with_drops(
1543            self.name.clone(),
1544            columns,
1545            primary_key_columns,
1546            indices,
1547            check_constraints,
1548            foreign_keys,
1549            new_dropped,
1550        )
1551    }
1552}
1553
1554const SCHEMA_VERSION: u8 = 14;
1555pub const TABLE_FLAG_STRICT: u8 = 0b0000_0001;
1556
1557fn write_opt_string(buf: &mut Vec<u8>, s: &Option<String>) {
1558    match s {
1559        Some(s) => {
1560            let bytes = s.as_bytes();
1561            buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1562            buf.extend_from_slice(bytes);
1563        }
1564        None => buf.extend_from_slice(&0u16.to_le_bytes()),
1565    }
1566}
1567
1568fn read_opt_string(data: &[u8], pos: &mut usize) -> Option<String> {
1569    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
1570    *pos += 2;
1571    if len == 0 {
1572        None
1573    } else {
1574        let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
1575        *pos += len;
1576        Some(s)
1577    }
1578}
1579
1580fn read_string(data: &[u8], pos: &mut usize) -> String {
1581    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
1582    *pos += 2;
1583    let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
1584    *pos += len;
1585    s
1586}
1587
1588impl TableSchema {
1589    pub fn serialize(&self) -> Vec<u8> {
1590        let mut buf = Vec::new();
1591        buf.push(SCHEMA_VERSION);
1592
1593        let name_bytes = self.name.as_bytes();
1594        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
1595        buf.extend_from_slice(name_bytes);
1596
1597        buf.extend_from_slice(&(self.columns.len() as u16).to_le_bytes());
1598
1599        for col in &self.columns {
1600            let col_name = col.name.as_bytes();
1601            buf.extend_from_slice(&(col_name.len() as u16).to_le_bytes());
1602            buf.extend_from_slice(col_name);
1603            buf.push(col.data_type.type_tag());
1604            if let DataType::Vector { dim } = col.data_type {
1605                buf.extend_from_slice(&dim.to_le_bytes());
1606            }
1607            buf.push(if col.nullable { 1 } else { 0 });
1608            buf.extend_from_slice(&col.position.to_le_bytes());
1609        }
1610
1611        buf.extend_from_slice(&(self.primary_key_columns.len() as u16).to_le_bytes());
1612        for &pk_idx in &self.primary_key_columns {
1613            buf.extend_from_slice(&pk_idx.to_le_bytes());
1614        }
1615
1616        buf.extend_from_slice(&(self.indices.len() as u16).to_le_bytes());
1617        for idx in &self.indices {
1618            let idx_name = idx.name.as_bytes();
1619            buf.extend_from_slice(&(idx_name.len() as u16).to_le_bytes());
1620            buf.extend_from_slice(idx_name);
1621            buf.extend_from_slice(&(idx.keys.len() as u16).to_le_bytes());
1622            for key in &idx.keys {
1623                let col_idx = match key {
1624                    IndexKey::Column { idx, .. } => *idx,
1625                    IndexKey::Expr { .. } => u16::MAX,
1626                };
1627                buf.extend_from_slice(&col_idx.to_le_bytes());
1628            }
1629            buf.push(if idx.unique { 1 } else { 0 });
1630        }
1631
1632        for col in &self.columns {
1633            let mut flags: u8 = 0;
1634            if col.default_sql.is_some() {
1635                flags |= 1;
1636            }
1637            if col.check_sql.is_some() {
1638                flags |= 2;
1639            }
1640            buf.push(flags);
1641            if let Some(ref sql) = col.default_sql {
1642                let bytes = sql.as_bytes();
1643                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1644                buf.extend_from_slice(bytes);
1645            }
1646            if let Some(ref sql) = col.check_sql {
1647                let bytes = sql.as_bytes();
1648                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1649                buf.extend_from_slice(bytes);
1650                write_opt_string(&mut buf, &col.check_name);
1651            }
1652        }
1653
1654        buf.extend_from_slice(&(self.check_constraints.len() as u16).to_le_bytes());
1655        for chk in &self.check_constraints {
1656            write_opt_string(&mut buf, &chk.name);
1657            let sql_bytes = chk.sql.as_bytes();
1658            buf.extend_from_slice(&(sql_bytes.len() as u16).to_le_bytes());
1659            buf.extend_from_slice(sql_bytes);
1660        }
1661
1662        buf.extend_from_slice(&(self.foreign_keys.len() as u16).to_le_bytes());
1663        for fk in &self.foreign_keys {
1664            write_opt_string(&mut buf, &fk.name);
1665            buf.extend_from_slice(&(fk.columns.len() as u16).to_le_bytes());
1666            for &col_idx in &fk.columns {
1667                buf.extend_from_slice(&col_idx.to_le_bytes());
1668            }
1669            let ft_bytes = fk.foreign_table.as_bytes();
1670            buf.extend_from_slice(&(ft_bytes.len() as u16).to_le_bytes());
1671            buf.extend_from_slice(ft_bytes);
1672            buf.extend_from_slice(&(fk.referred_columns.len() as u16).to_le_bytes());
1673            for rc in &fk.referred_columns {
1674                let rc_bytes = rc.as_bytes();
1675                buf.extend_from_slice(&(rc_bytes.len() as u16).to_le_bytes());
1676                buf.extend_from_slice(rc_bytes);
1677            }
1678        }
1679
1680        buf.extend_from_slice(&(self.dropped_non_pk_slots.len() as u16).to_le_bytes());
1681        for &slot in &self.dropped_non_pk_slots {
1682            buf.extend_from_slice(&slot.to_le_bytes());
1683        }
1684
1685        for col in &self.columns {
1686            let kind_tag: u8 = match col.generated_kind {
1687                None => 0,
1688                Some(crate::parser::GeneratedKind::Stored) => 1,
1689                Some(crate::parser::GeneratedKind::Virtual) => 2,
1690            };
1691            buf.push(kind_tag);
1692            if kind_tag != 0 {
1693                let sql = col.generated_sql.as_deref().unwrap_or("");
1694                let bytes = sql.as_bytes();
1695                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1696                buf.extend_from_slice(bytes);
1697            }
1698        }
1699
1700        for idx in &self.indices {
1701            match &idx.predicate_sql {
1702                Some(sql) => {
1703                    buf.push(1);
1704                    let bytes = sql.as_bytes();
1705                    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1706                    buf.extend_from_slice(bytes);
1707                }
1708                None => buf.push(0),
1709            }
1710        }
1711
1712        for fk in &self.foreign_keys {
1713            buf.push(fk.on_delete as u8);
1714            buf.push(fk.on_update as u8);
1715        }
1716
1717        for fk in &self.foreign_keys {
1718            let mut flags: u8 = 0;
1719            if fk.deferrable {
1720                flags |= 0b01;
1721            }
1722            if fk.initially_deferred {
1723                flags |= 0b10;
1724            }
1725            buf.push(flags);
1726        }
1727
1728        for col in &self.columns {
1729            buf.push(col.collation as u8);
1730        }
1731        for idx in &self.indices {
1732            let n = idx.keys.len() as u16;
1733            buf.extend_from_slice(&n.to_le_bytes());
1734            for key in &idx.keys {
1735                let c = match key {
1736                    IndexKey::Column { collate, .. } => *collate,
1737                    IndexKey::Expr { .. } => Collation::Binary,
1738                };
1739                buf.push(c as u8);
1740            }
1741        }
1742        for idx in &self.indices {
1743            match idx.kind {
1744                IndexKind::BTree => buf.push(0),
1745                IndexKind::Inverted(InvertedKind::Gin(ops)) => {
1746                    buf.push(1);
1747                    buf.push(ops.as_tag());
1748                }
1749                IndexKind::Inverted(InvertedKind::Fts { config_id }) => {
1750                    buf.push(2);
1751                    buf.push(config_id);
1752                }
1753                IndexKind::Inverted(InvertedKind::Ann { metric }) => {
1754                    buf.push(3);
1755                    buf.push(metric.as_tag());
1756                }
1757            }
1758        }
1759        buf.push(self.flags);
1760
1761        // v12: per-index expression-key extension. Emit (position, SQL) for each Expr key.
1762        // v11 readers stop before this section; v12 readers consume it.
1763        for idx in &self.indices {
1764            let expr_count = idx
1765                .keys
1766                .iter()
1767                .filter(|k| matches!(k, IndexKey::Expr { .. }))
1768                .count() as u16;
1769            buf.extend_from_slice(&expr_count.to_le_bytes());
1770            for (pos, key) in idx.keys.iter().enumerate() {
1771                if let IndexKey::Expr { original_sql, .. } = key {
1772                    buf.extend_from_slice(&(pos as u16).to_le_bytes());
1773                    let bytes = original_sql.as_bytes();
1774                    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1775                    buf.extend_from_slice(bytes);
1776                }
1777            }
1778        }
1779
1780        // v14: per-index ANN filter columns.
1781        for idx in &self.indices {
1782            buf.extend_from_slice(&(idx.ann_filter_cols.len() as u16).to_le_bytes());
1783            for &col in &idx.ann_filter_cols {
1784                buf.extend_from_slice(&col.to_le_bytes());
1785            }
1786        }
1787
1788        buf
1789    }
1790
1791    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
1792        let mut pos = 0;
1793
1794        if data.is_empty()
1795            || !matches!(
1796                data[0],
1797                1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | SCHEMA_VERSION
1798            )
1799        {
1800            return Err(crate::error::SqlError::InvalidValue(
1801                "invalid schema version".into(),
1802            ));
1803        }
1804        let version = data[0];
1805        pos += 1;
1806
1807        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1808        pos += 2;
1809        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
1810        pos += name_len;
1811
1812        let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1813        pos += 2;
1814
1815        let mut columns = Vec::with_capacity(col_count);
1816        for _ in 0..col_count {
1817            let col_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1818            pos += 2;
1819            let col_name = String::from_utf8_lossy(&data[pos..pos + col_name_len]).into_owned();
1820            pos += col_name_len;
1821            let tag = data[pos];
1822            pos += 1;
1823            let data_type = if tag == 15 {
1824                let dim = u16::from_le_bytes([data[pos], data[pos + 1]]);
1825                pos += 2;
1826                DataType::Vector { dim }
1827            } else {
1828                DataType::from_tag(tag).ok_or_else(|| {
1829                    crate::error::SqlError::InvalidValue("unknown data type tag".into())
1830                })?
1831            };
1832            let nullable = data[pos] != 0;
1833            pos += 1;
1834            let position = u16::from_le_bytes([data[pos], data[pos + 1]]);
1835            pos += 2;
1836            columns.push(ColumnDef {
1837                name: col_name,
1838                data_type,
1839                nullable,
1840                position,
1841                default_expr: None,
1842                default_sql: None,
1843                check_expr: None,
1844                check_sql: None,
1845                check_name: None,
1846                is_with_timezone: false,
1847                generated_expr: None,
1848                generated_sql: None,
1849                generated_kind: None,
1850                collation: Collation::Binary,
1851            });
1852        }
1853
1854        let pk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1855        pos += 2;
1856        let mut primary_key_columns = Vec::with_capacity(pk_count);
1857        for _ in 0..pk_count {
1858            let pk_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1859            pos += 2;
1860            primary_key_columns.push(pk_idx);
1861        }
1862
1863        let indices = if version >= 2 && pos + 2 <= data.len() {
1864            let idx_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1865            pos += 2;
1866            let mut idxs = Vec::with_capacity(idx_count);
1867            for _ in 0..idx_count {
1868                let idx_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1869                pos += 2;
1870                let idx_name = String::from_utf8_lossy(&data[pos..pos + idx_name_len]).into_owned();
1871                pos += idx_name_len;
1872                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1873                pos += 2;
1874                let mut keys: Vec<IndexKey> = Vec::with_capacity(col_count);
1875                for _ in 0..col_count {
1876                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1877                    pos += 2;
1878                    // u16::MAX marks an expression key that the v12 section will fill in below.
1879                    // For v11 indexes (no expression section), this stays as a column placeholder.
1880                    keys.push(IndexKey::Column {
1881                        idx: col_idx,
1882                        collate: Collation::Binary,
1883                    });
1884                }
1885                let unique = data[pos] != 0;
1886                pos += 1;
1887                idxs.push(IndexDef {
1888                    name: idx_name,
1889                    keys,
1890                    unique,
1891                    predicate_sql: None,
1892                    predicate_expr: None,
1893                    kind: IndexKind::default(),
1894                    ann_filter_cols: Vec::new(),
1895                });
1896            }
1897            idxs
1898        } else {
1899            vec![]
1900        };
1901
1902        let mut check_constraints = Vec::new();
1903        let mut foreign_keys = Vec::new();
1904
1905        if version >= 3 && pos < data.len() {
1906            for col in &mut columns {
1907                let flags = data[pos];
1908                pos += 1;
1909                if flags & 1 != 0 {
1910                    let sql = read_string(data, &mut pos);
1911                    col.default_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1912                        crate::error::SqlError::InvalidValue(format!(
1913                            "cannot parse DEFAULT expression: {sql}"
1914                        ))
1915                    })?);
1916                    col.default_sql = Some(sql);
1917                }
1918                if flags & 2 != 0 {
1919                    let sql = read_string(data, &mut pos);
1920                    col.check_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1921                        crate::error::SqlError::InvalidValue(format!(
1922                            "cannot parse CHECK expression: {sql}"
1923                        ))
1924                    })?);
1925                    col.check_sql = Some(sql);
1926                    col.check_name = read_opt_string(data, &mut pos);
1927                }
1928            }
1929
1930            let chk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1931            pos += 2;
1932            for _ in 0..chk_count {
1933                let name = read_opt_string(data, &mut pos);
1934                let sql = read_string(data, &mut pos);
1935                let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1936                    crate::error::SqlError::InvalidValue(format!(
1937                        "cannot parse CHECK expression: {sql}"
1938                    ))
1939                })?;
1940                check_constraints.push(TableCheckDef { name, expr, sql });
1941            }
1942
1943            let fk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1944            pos += 2;
1945            for _ in 0..fk_count {
1946                let name = read_opt_string(data, &mut pos);
1947                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1948                pos += 2;
1949                let mut cols = Vec::with_capacity(col_count);
1950                for _ in 0..col_count {
1951                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1952                    pos += 2;
1953                    cols.push(col_idx);
1954                }
1955                let foreign_table = read_string(data, &mut pos);
1956                let ref_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1957                pos += 2;
1958                let mut referred_columns = Vec::with_capacity(ref_count);
1959                for _ in 0..ref_count {
1960                    referred_columns.push(read_string(data, &mut pos));
1961                }
1962                foreign_keys.push(ForeignKeySchemaEntry {
1963                    name,
1964                    columns: cols,
1965                    foreign_table,
1966                    referred_columns,
1967                    on_delete: crate::parser::ReferentialAction::NoAction,
1968                    on_update: crate::parser::ReferentialAction::NoAction,
1969                    deferrable: false,
1970                    initially_deferred: false,
1971                });
1972            }
1973        }
1974        let mut dropped_non_pk_slots = Vec::new();
1975        if version >= 4 && pos + 2 <= data.len() {
1976            let slot_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1977            pos += 2;
1978            for _ in 0..slot_count {
1979                let slot = u16::from_le_bytes([data[pos], data[pos + 1]]);
1980                pos += 2;
1981                dropped_non_pk_slots.push(slot);
1982            }
1983        }
1984        if version >= 5 && pos < data.len() {
1985            for col in &mut columns {
1986                let kind_tag = data[pos];
1987                pos += 1;
1988                if kind_tag != 0 {
1989                    let len = u32::from_le_bytes([
1990                        data[pos],
1991                        data[pos + 1],
1992                        data[pos + 2],
1993                        data[pos + 3],
1994                    ]) as usize;
1995                    pos += 4;
1996                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
1997                    pos += len;
1998                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1999                        crate::error::SqlError::InvalidValue(format!(
2000                            "cannot parse GENERATED expression: {sql}"
2001                        ))
2002                    })?;
2003                    col.generated_sql = Some(sql);
2004                    col.generated_expr = Some(expr);
2005                    col.generated_kind = Some(match kind_tag {
2006                        1 => crate::parser::GeneratedKind::Stored,
2007                        2 => crate::parser::GeneratedKind::Virtual,
2008                        _ => {
2009                            return Err(crate::error::SqlError::InvalidValue(
2010                                "unknown GENERATED kind tag".into(),
2011                            ));
2012                        }
2013                    });
2014                }
2015            }
2016        }
2017        let mut indices = indices;
2018        if version >= 6 && pos < data.len() {
2019            for idx in &mut indices {
2020                let flag = data[pos];
2021                pos += 1;
2022                if flag == 1 {
2023                    let len = u32::from_le_bytes([
2024                        data[pos],
2025                        data[pos + 1],
2026                        data[pos + 2],
2027                        data[pos + 3],
2028                    ]) as usize;
2029                    pos += 4;
2030                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
2031                    pos += len;
2032                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
2033                        crate::error::SqlError::InvalidValue(format!(
2034                            "cannot parse partial-index predicate: {sql}"
2035                        ))
2036                    })?;
2037                    idx.predicate_sql = Some(sql);
2038                    idx.predicate_expr = Some(expr);
2039                }
2040            }
2041            for fk in &mut foreign_keys {
2042                fk.on_delete =
2043                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
2044                        crate::error::SqlError::InvalidValue("unknown FK on_delete tag".into())
2045                    })?;
2046                pos += 1;
2047                fk.on_update =
2048                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
2049                        crate::error::SqlError::InvalidValue("unknown FK on_update tag".into())
2050                    })?;
2051                pos += 1;
2052            }
2053            if version >= 11 {
2054                for fk in &mut foreign_keys {
2055                    if pos >= data.len() {
2056                        break;
2057                    }
2058                    let flags = data[pos];
2059                    pos += 1;
2060                    fk.deferrable = flags & 0b01 != 0;
2061                    fk.initially_deferred = flags & 0b10 != 0;
2062                }
2063            }
2064        }
2065
2066        let mut columns = columns;
2067        let mut indices = indices;
2068        let mut flags: u8 = 0;
2069        if version >= 7 && pos < data.len() {
2070            for col in &mut columns {
2071                col.collation = Collation::from_tag(data[pos]).ok_or_else(|| {
2072                    crate::error::SqlError::InvalidValue("unknown collation tag".into())
2073                })?;
2074                pos += 1;
2075            }
2076            for idx in &mut indices {
2077                let n = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
2078                pos += 2;
2079                for i in 0..n {
2080                    let collate = Collation::from_tag(data[pos]).ok_or_else(|| {
2081                        crate::error::SqlError::InvalidValue("unknown collation tag".into())
2082                    })?;
2083                    pos += 1;
2084                    if let Some(IndexKey::Column { collate: c, .. }) = idx.keys.get_mut(i) {
2085                        *c = collate;
2086                    }
2087                }
2088            }
2089            if version >= 9 {
2090                for idx in &mut indices {
2091                    if pos >= data.len() {
2092                        break;
2093                    }
2094                    let tag = data[pos];
2095                    pos += 1;
2096                    idx.kind = match tag {
2097                        0 => IndexKind::BTree,
2098                        1 => {
2099                            if pos >= data.len() {
2100                                return Err(crate::error::SqlError::InvalidValue(
2101                                    "GIN index missing opclass tag".into(),
2102                                ));
2103                            }
2104                            let ops = GinOpsClass::from_tag(data[pos]).ok_or_else(|| {
2105                                crate::error::SqlError::InvalidValue(
2106                                    "unknown GIN opclass tag".into(),
2107                                )
2108                            })?;
2109                            pos += 1;
2110                            IndexKind::Inverted(InvertedKind::Gin(ops))
2111                        }
2112                        2 => {
2113                            if pos >= data.len() {
2114                                return Err(crate::error::SqlError::InvalidValue(
2115                                    "FTS index missing config_id".into(),
2116                                ));
2117                            }
2118                            let config_id = data[pos];
2119                            pos += 1;
2120                            IndexKind::Inverted(InvertedKind::Fts { config_id })
2121                        }
2122                        3 => {
2123                            if pos >= data.len() {
2124                                return Err(crate::error::SqlError::InvalidValue(
2125                                    "ANN index missing metric tag".into(),
2126                                ));
2127                            }
2128                            let metric = AnnMetric::from_tag(data[pos]).ok_or_else(|| {
2129                                crate::error::SqlError::InvalidValue(
2130                                    "unknown ANN metric tag".into(),
2131                                )
2132                            })?;
2133                            pos += 1;
2134                            IndexKind::Inverted(InvertedKind::Ann { metric })
2135                        }
2136                        _ => {
2137                            return Err(crate::error::SqlError::InvalidValue(
2138                                "unknown IndexKind tag".into(),
2139                            ));
2140                        }
2141                    };
2142                }
2143            }
2144            if pos < data.len() {
2145                flags = data[pos];
2146                pos += 1;
2147            }
2148            if version >= 12 {
2149                for idx in &mut indices {
2150                    if pos + 2 > data.len() {
2151                        break;
2152                    }
2153                    let expr_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
2154                    pos += 2;
2155                    for _ in 0..expr_count {
2156                        if pos + 6 > data.len() {
2157                            return Err(crate::error::SqlError::InvalidValue(
2158                                "truncated index expression key".into(),
2159                            ));
2160                        }
2161                        let key_pos = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
2162                        pos += 2;
2163                        let sql_len = u32::from_le_bytes([
2164                            data[pos],
2165                            data[pos + 1],
2166                            data[pos + 2],
2167                            data[pos + 3],
2168                        ]) as usize;
2169                        pos += 4;
2170                        if pos + sql_len > data.len() {
2171                            return Err(crate::error::SqlError::InvalidValue(
2172                                "truncated expression-key SQL".into(),
2173                            ));
2174                        }
2175                        let sql = String::from_utf8_lossy(&data[pos..pos + sql_len]).into_owned();
2176                        pos += sql_len;
2177                        let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
2178                            crate::error::SqlError::InvalidValue(format!(
2179                                "cannot parse index expression: {sql}"
2180                            ))
2181                        })?;
2182                        if key_pos < idx.keys.len() {
2183                            idx.keys[key_pos] = IndexKey::Expr {
2184                                expr,
2185                                original_sql: sql,
2186                            };
2187                        }
2188                    }
2189                }
2190            }
2191            if version >= 14 {
2192                for idx in &mut indices {
2193                    if pos + 2 > data.len() {
2194                        break;
2195                    }
2196                    let fcount = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
2197                    pos += 2;
2198                    let mut fcols = Vec::with_capacity(fcount);
2199                    for _ in 0..fcount {
2200                        if pos + 2 > data.len() {
2201                            return Err(crate::error::SqlError::InvalidValue(
2202                                "truncated ANN filter columns".into(),
2203                            ));
2204                        }
2205                        fcols.push(u16::from_le_bytes([data[pos], data[pos + 1]]));
2206                        pos += 2;
2207                    }
2208                    idx.ann_filter_cols = fcols;
2209                }
2210            }
2211        }
2212        let _ = pos;
2213
2214        let mut schema = Self::with_drops(
2215            name,
2216            columns,
2217            primary_key_columns,
2218            indices,
2219            check_constraints,
2220            foreign_keys,
2221            dropped_non_pk_slots,
2222        );
2223        schema.flags = flags;
2224        Ok(schema)
2225    }
2226
2227    pub fn column_index(&self, name: &str) -> Option<usize> {
2228        self.columns
2229            .iter()
2230            .position(|c| c.name.eq_ignore_ascii_case(name))
2231    }
2232
2233    pub fn non_pk_indices(&self) -> &[usize] {
2234        &self.non_pk_idx_cache
2235    }
2236
2237    pub fn pk_indices(&self) -> &[usize] {
2238        &self.pk_idx_cache
2239    }
2240
2241    pub fn index_by_name(&self, name: &str) -> Option<&IndexDef> {
2242        let lower = name.to_ascii_lowercase();
2243        self.indices.iter().find(|i| i.name == lower)
2244    }
2245
2246    pub fn index_table_name(table_name: &str, index_name: &str) -> Vec<u8> {
2247        format!("__idx_{table_name}_{index_name}").into_bytes()
2248    }
2249}
2250
2251#[derive(Debug)]
2252pub enum ExecutionResult {
2253    RowsAffected(u64),
2254    Query(QueryResult),
2255    Ok,
2256}
2257
2258#[derive(Debug, Clone)]
2259pub struct QueryResult {
2260    pub columns: Vec<String>,
2261    pub rows: Vec<Vec<Value>>,
2262}
2263
2264#[cfg(test)]
2265#[path = "types_tests.rs"]
2266mod tests;