Skip to main content

citadel_sql/
types.rs

1use std::cmp::Ordering;
2use std::fmt;
3use std::hash::{Hash, Hasher};
4use std::sync::Arc;
5
6pub use compact_str::CompactString;
7
8use crate::parser::Expr;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum DataType {
12    Null,
13    Integer,
14    Real,
15    Text,
16    Blob,
17    Boolean,
18    Time,
19    Date,
20    Timestamp,
21    Interval,
22    Json,
23    Jsonb,
24    TsVector,
25    TsQuery,
26    Array,
27    Vector { dim: u16 },
28}
29
30impl DataType {
31    pub fn type_tag(self) -> u8 {
32        match self {
33            DataType::Null => 0,
34            DataType::Blob => 1,
35            DataType::Text => 2,
36            DataType::Boolean => 3,
37            DataType::Integer => 4,
38            DataType::Real => 5,
39            DataType::Time => 6,
40            DataType::Date => 7,
41            DataType::Timestamp => 8,
42            DataType::Interval => 9,
43            DataType::Json => 10,
44            DataType::Jsonb => 11,
45            DataType::TsVector => 12,
46            DataType::TsQuery => 13,
47            DataType::Array => 14,
48            DataType::Vector { .. } => 15,
49        }
50    }
51
52    /// Vector returns a `dim: 0` sentinel; the real dim is read at the schema layer.
53    pub fn from_tag(tag: u8) -> Option<Self> {
54        match tag {
55            0 => Some(DataType::Null),
56            1 => Some(DataType::Blob),
57            2 => Some(DataType::Text),
58            3 => Some(DataType::Boolean),
59            4 => Some(DataType::Integer),
60            5 => Some(DataType::Real),
61            6 => Some(DataType::Time),
62            7 => Some(DataType::Date),
63            8 => Some(DataType::Timestamp),
64            9 => Some(DataType::Interval),
65            10 => Some(DataType::Json),
66            11 => Some(DataType::Jsonb),
67            12 => Some(DataType::TsVector),
68            13 => Some(DataType::TsQuery),
69            14 => Some(DataType::Array),
70            15 => Some(DataType::Vector { dim: 0 }),
71            _ => None,
72        }
73    }
74}
75
76impl fmt::Display for DataType {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        match self {
79            DataType::Null => write!(f, "NULL"),
80            DataType::Integer => write!(f, "INTEGER"),
81            DataType::Real => write!(f, "REAL"),
82            DataType::Text => write!(f, "TEXT"),
83            DataType::Blob => write!(f, "BLOB"),
84            DataType::Boolean => write!(f, "BOOLEAN"),
85            DataType::Time => write!(f, "TIME"),
86            DataType::Date => write!(f, "DATE"),
87            DataType::Timestamp => write!(f, "TIMESTAMP"),
88            DataType::Interval => write!(f, "INTERVAL"),
89            DataType::Json => write!(f, "JSON"),
90            DataType::Jsonb => write!(f, "JSONB"),
91            DataType::TsVector => write!(f, "TSVECTOR"),
92            DataType::TsQuery => write!(f, "TSQUERY"),
93            DataType::Array => write!(f, "ARRAY"),
94            DataType::Vector { dim } => write!(f, "VECTOR({dim})"),
95        }
96    }
97}
98
99/// SQL value. Temporal epochs: days/µs since 1970-01-01 UTC.
100/// `Date`/`Timestamp` reserve `i{32,64}::{MAX,MIN}` as `±infinity` sentinels.
101#[derive(Debug, Clone, Default)]
102pub enum Value {
103    #[default]
104    Null,
105    Integer(i64),
106    Real(f64),
107    Text(CompactString),
108    Blob(Vec<u8>),
109    Boolean(bool),
110    Time(i64),
111    Date(i32),
112    Timestamp(i64),
113    Interval {
114        months: i32,
115        days: i32,
116        micros: i64,
117    },
118    Json(CompactString),
119    Jsonb(Arc<[u8]>),
120    TsVector(Arc<[u8]>),
121    TsQuery(Arc<[u8]>),
122    Array(Arc<Vec<Value>>),
123    Vector(Arc<[f32]>),
124}
125
126impl Value {
127    pub fn data_type(&self) -> DataType {
128        match self {
129            Value::Null => DataType::Null,
130            Value::Integer(_) => DataType::Integer,
131            Value::Real(_) => DataType::Real,
132            Value::Text(_) => DataType::Text,
133            Value::Blob(_) => DataType::Blob,
134            Value::Boolean(_) => DataType::Boolean,
135            Value::Time(_) => DataType::Time,
136            Value::Date(_) => DataType::Date,
137            Value::Timestamp(_) => DataType::Timestamp,
138            Value::Interval { .. } => DataType::Interval,
139            Value::Json(_) => DataType::Json,
140            Value::Jsonb(_) => DataType::Jsonb,
141            Value::TsVector(_) => DataType::TsVector,
142            Value::TsQuery(_) => DataType::TsQuery,
143            Value::Array(_) => DataType::Array,
144            Value::Vector(v) => DataType::Vector {
145                dim: v.len() as u16,
146            },
147        }
148    }
149
150    pub fn is_null(&self) -> bool {
151        matches!(self, Value::Null)
152    }
153
154    pub fn is_finite_temporal(&self) -> bool {
155        match self {
156            Value::Date(d) => *d != i32::MAX && *d != i32::MIN,
157            Value::Timestamp(t) => *t != i64::MAX && *t != i64::MIN,
158            _ => true,
159        }
160    }
161
162    pub fn coerce_to(&self, target: DataType) -> Option<Value> {
163        match (self, target) {
164            (_, DataType::Null) => Some(Value::Null),
165            (Value::Null, _) => Some(Value::Null),
166            (Value::Integer(i), DataType::Integer) => Some(Value::Integer(*i)),
167            (Value::Integer(i), DataType::Real) => Some(Value::Real(*i as f64)),
168            (Value::Real(r), DataType::Real) => Some(Value::Real(*r)),
169            (Value::Real(r), DataType::Integer) => Some(Value::Integer(*r as i64)),
170            (Value::Text(s), DataType::Text) => Some(Value::Text(s.clone())),
171            (Value::Blob(b), DataType::Blob) => Some(Value::Blob(b.clone())),
172            (Value::Boolean(b), DataType::Boolean) => Some(Value::Boolean(*b)),
173            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
174            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(*i != 0)),
175            (Value::Time(t), DataType::Time) => Some(Value::Time(*t)),
176            (Value::Date(d), DataType::Date) => Some(Value::Date(*d)),
177            (Value::Timestamp(t), DataType::Timestamp) => Some(Value::Timestamp(*t)),
178            (Value::TsVector(b), DataType::TsVector) => Some(Value::TsVector(b.clone())),
179            (Value::TsQuery(b), DataType::TsQuery) => Some(Value::TsQuery(b.clone())),
180            (Value::Array(a), DataType::Array) => Some(Value::Array(a.clone())),
181            (
182                Value::Interval {
183                    months,
184                    days,
185                    micros,
186                },
187                DataType::Interval,
188            ) => Some(Value::Interval {
189                months: *months,
190                days: *days,
191                micros: *micros,
192            }),
193            _ => None,
194        }
195    }
196
197    pub fn coerce_into(self, target: DataType) -> Option<Value> {
198        if self.is_null() || target == DataType::Null {
199            return Some(Value::Null);
200        }
201        if self.data_type() == target {
202            return Some(self);
203        }
204        match (self, target) {
205            (Value::Integer(i), DataType::Real) => Some(Value::Real(i as f64)),
206            (Value::Real(r), DataType::Integer) => Some(Value::Integer(r as i64)),
207            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if b { 1 } else { 0 })),
208            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(i != 0)),
209            (Value::Text(s), DataType::Date) => {
210                crate::datetime::parse_date(&s).ok().map(Value::Date)
211            }
212            (Value::Text(s), DataType::Time) => {
213                crate::datetime::parse_time(&s).ok().map(Value::Time)
214            }
215            (Value::Text(s), DataType::Timestamp) => crate::datetime::parse_timestamp(&s)
216                .ok()
217                .map(Value::Timestamp),
218            (Value::Text(s), DataType::Interval) => {
219                crate::datetime::parse_interval(&s)
220                    .ok()
221                    .map(|(m, d, u)| Value::Interval {
222                        months: m,
223                        days: d,
224                        micros: u,
225                    })
226            }
227            // INTEGER → TIMESTAMP: Unix epoch seconds.
228            (Value::Integer(n), DataType::Timestamp) => {
229                n.checked_mul(1_000_000).map(Value::Timestamp)
230            }
231            (Value::Integer(n), DataType::Date) => {
232                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
233                    Some(Value::Date(n as i32))
234                } else {
235                    None
236                }
237            }
238            (Value::Integer(n), DataType::Time) => {
239                if (0..=86_400_000_000).contains(&n) {
240                    Some(Value::Time(n))
241                } else {
242                    None
243                }
244            }
245            (Value::Integer(n), DataType::Interval) => {
246                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
247                    Some(Value::Interval {
248                        months: 0,
249                        days: n as i32,
250                        micros: 0,
251                    })
252                } else {
253                    None
254                }
255            }
256            (Value::Timestamp(t), DataType::Integer) => Some(Value::Integer(t / 1_000_000)),
257            (Value::Date(d), DataType::Integer) => Some(Value::Integer(d as i64)),
258            (Value::Time(t), DataType::Integer) => Some(Value::Integer(t)),
259            (Value::Date(d), DataType::Timestamp) => {
260                (d as i64).checked_mul(86_400_000_000).map(Value::Timestamp)
261            }
262            (Value::Timestamp(t), DataType::Date) => {
263                // div_euclid floors correctly for negative µs (pre-1970).
264                let days = t.div_euclid(86_400_000_000);
265                if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
266                    Some(Value::Date(days as i32))
267                } else {
268                    None
269                }
270            }
271            (v, DataType::Text)
272                if matches!(
273                    v.data_type(),
274                    DataType::Date | DataType::Time | DataType::Timestamp | DataType::Interval
275                ) =>
276            {
277                Some(Value::Text(v.to_string().into()))
278            }
279            (Value::Text(s), DataType::Json) => {
280                crate::json::validate_text(&s).ok()?;
281                Some(Value::Json(s))
282            }
283            (Value::Text(s), DataType::Jsonb) => crate::json::text_to_jsonb(&s).ok(),
284            (Value::Json(s), DataType::Text) => Some(Value::Text(s)),
285            (Value::Json(s), DataType::Jsonb) => crate::json::text_to_jsonb(&s).ok(),
286            (Value::Jsonb(b), DataType::Text) => crate::json::decode_to_text(&b)
287                .ok()
288                .map(|t| Value::Text(t.into())),
289            (Value::Jsonb(b), DataType::Json) => crate::json::decode_to_text(&b)
290                .ok()
291                .map(|t| Value::Json(t.into())),
292            _ => None,
293        }
294    }
295
296    pub fn strict_coerce(&self, target: DataType) -> Option<Value> {
297        if matches!(self, Value::Null) {
298            return Some(Value::Null);
299        }
300        if self.data_type() == target {
301            return Some(self.clone());
302        }
303        match (self, target) {
304            (Value::Integer(i), DataType::Real) => {
305                if i.unsigned_abs() <= (1u64 << 53) {
306                    Some(Value::Real(*i as f64))
307                } else {
308                    None
309                }
310            }
311            (Value::Real(r), DataType::Integer) => {
312                if r.is_finite()
313                    && r.fract() == 0.0
314                    && (i64::MIN as f64..=i64::MAX as f64).contains(r)
315                {
316                    Some(Value::Integer(*r as i64))
317                } else {
318                    None
319                }
320            }
321            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
322            (Value::Integer(i), DataType::Boolean) => match i {
323                0 => Some(Value::Boolean(false)),
324                1 => Some(Value::Boolean(true)),
325                _ => None,
326            },
327            (Value::Text(s), DataType::Integer) => {
328                let trimmed = s.as_str();
329                let parsed: i64 = trimmed.parse().ok()?;
330                if parsed.to_string() == trimmed {
331                    Some(Value::Integer(parsed))
332                } else {
333                    None
334                }
335            }
336            (Value::Text(s), DataType::Real) => {
337                let trimmed = s.as_str();
338                let parsed: f64 = trimmed.parse().ok()?;
339                if parsed.is_finite() {
340                    Some(Value::Real(parsed))
341                } else {
342                    None
343                }
344            }
345            (Value::Text(_), DataType::Date)
346            | (Value::Text(_), DataType::Time)
347            | (Value::Text(_), DataType::Timestamp)
348            | (Value::Text(_), DataType::Interval)
349            | (Value::Text(_), DataType::Json)
350            | (Value::Text(_), DataType::Jsonb)
351            | (Value::Json(_), DataType::Jsonb)
352            | (Value::Json(_), DataType::Text)
353            | (Value::Jsonb(_), DataType::Json)
354            | (Value::Jsonb(_), DataType::Text) => self.clone().coerce_into(target),
355            (Value::Date(d), DataType::Timestamp) => (*d as i64)
356                .checked_mul(86_400_000_000)
357                .map(Value::Timestamp),
358            (Value::Timestamp(t), DataType::Date) => {
359                if t % 86_400_000_000 == 0 {
360                    let days = t.div_euclid(86_400_000_000);
361                    if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
362                        Some(Value::Date(days as i32))
363                    } else {
364                        None
365                    }
366                } else {
367                    None
368                }
369            }
370            _ => None,
371        }
372    }
373
374    /// Numeric ordering for Integer and Real values (promotes to f64 for mixed).
375    fn numeric_cmp(&self, other: &Value) -> Option<Ordering> {
376        match (self, other) {
377            (Value::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
378            (Value::Real(a), Value::Real(b)) => a.partial_cmp(b),
379            (Value::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
380            (Value::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
381            _ => None,
382        }
383    }
384}
385
386impl PartialEq for Value {
387    // Field-wise for Eq/Hash/Ord transitivity. SQL-level `=` on INTERVAL
388    // normalizes separately (see eval.rs).
389    fn eq(&self, other: &Self) -> bool {
390        match (self, other) {
391            (Value::Null, Value::Null) => true,
392            (Value::Integer(a), Value::Integer(b)) => a == b,
393            (Value::Real(a), Value::Real(b)) => a == b,
394            (Value::Integer(a), Value::Real(b)) => (*a as f64) == *b,
395            (Value::Real(a), Value::Integer(b)) => *a == (*b as f64),
396            (Value::Text(a), Value::Text(b)) => a == b,
397            (Value::Blob(a), Value::Blob(b)) => a == b,
398            (Value::Boolean(a), Value::Boolean(b)) => a == b,
399            (Value::Time(a), Value::Time(b)) => a == b,
400            (Value::Date(a), Value::Date(b)) => a == b,
401            (Value::Timestamp(a), Value::Timestamp(b)) => a == b,
402            (
403                Value::Interval {
404                    months: am,
405                    days: ad,
406                    micros: au,
407                },
408                Value::Interval {
409                    months: bm,
410                    days: bd,
411                    micros: bu,
412                },
413            ) => am == bm && ad == bd && au == bu,
414            (Value::Json(a), Value::Json(b)) => a == b,
415            (Value::Jsonb(a), Value::Jsonb(b)) => a == b,
416            (Value::TsVector(a), Value::TsVector(b)) => a == b,
417            (Value::TsQuery(a), Value::TsQuery(b)) => a == b,
418            (Value::Array(a), Value::Array(b)) => a == b,
419            _ => false,
420        }
421    }
422}
423
424impl Eq for Value {}
425
426impl Hash for Value {
427    fn hash<H: Hasher>(&self, state: &mut H) {
428        match self {
429            Value::Null => 0u8.hash(state),
430            Value::Integer(i) => {
431                // Hash via f64 bits so Integer(n) and Real(n.0) produce the same hash,
432                // matching the cross-type PartialEq contract.
433                1u8.hash(state);
434                (*i as f64).to_bits().hash(state);
435            }
436            Value::Real(r) => {
437                1u8.hash(state);
438                r.to_bits().hash(state);
439            }
440            Value::Text(s) => {
441                2u8.hash(state);
442                s.hash(state);
443            }
444            Value::Blob(b) => {
445                3u8.hash(state);
446                b.hash(state);
447            }
448            Value::Boolean(b) => {
449                4u8.hash(state);
450                b.hash(state);
451            }
452            Value::Time(t) => {
453                5u8.hash(state);
454                t.hash(state);
455            }
456            Value::Date(d) => {
457                6u8.hash(state);
458                d.hash(state);
459            }
460            Value::Timestamp(t) => {
461                7u8.hash(state);
462                t.hash(state);
463            }
464            Value::Interval {
465                months,
466                days,
467                micros,
468            } => {
469                8u8.hash(state);
470                months.hash(state);
471                days.hash(state);
472                micros.hash(state);
473            }
474            Value::Json(s) => {
475                9u8.hash(state);
476                s.hash(state);
477            }
478            Value::Jsonb(b) => {
479                10u8.hash(state);
480                b.hash(state);
481            }
482            Value::TsVector(b) => {
483                11u8.hash(state);
484                b.hash(state);
485            }
486            Value::TsQuery(b) => {
487                12u8.hash(state);
488                b.hash(state);
489            }
490            Value::Array(a) => {
491                13u8.hash(state);
492                a.hash(state);
493            }
494            Value::Vector(v) => {
495                14u8.hash(state);
496                v.len().hash(state);
497                for &x in v.iter() {
498                    x.to_bits().hash(state);
499                }
500            }
501        }
502    }
503}
504
505impl PartialOrd for Value {
506    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
507        Some(self.cmp(other))
508    }
509}
510
511impl Ord for Value {
512    // Order: NULL < BOOLEAN < numeric < TIME < DATE < TIMESTAMP < INTERVAL < TEXT < BLOB.
513    // INTERVAL compares field-wise for trait-invariant safety; SQL-level ops normalize.
514    fn cmp(&self, other: &Self) -> Ordering {
515        match (self, other) {
516            (Value::Null, Value::Null) => Ordering::Equal,
517            (Value::Null, _) => Ordering::Less,
518            (_, Value::Null) => Ordering::Greater,
519
520            (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
521            (Value::Boolean(_), _) => Ordering::Less,
522            (_, Value::Boolean(_)) => Ordering::Greater,
523
524            (Value::Integer(_) | Value::Real(_), Value::Integer(_) | Value::Real(_)) => {
525                self.numeric_cmp(other).unwrap_or(Ordering::Equal)
526            }
527            (Value::Integer(_) | Value::Real(_), _) => Ordering::Less,
528            (_, Value::Integer(_) | Value::Real(_)) => Ordering::Greater,
529
530            (Value::Time(a), Value::Time(b)) => a.cmp(b),
531            (Value::Time(_), _) => Ordering::Less,
532            (_, Value::Time(_)) => Ordering::Greater,
533
534            (Value::Date(a), Value::Date(b)) => a.cmp(b),
535            (Value::Date(_), _) => Ordering::Less,
536            (_, Value::Date(_)) => Ordering::Greater,
537
538            (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
539            (Value::Timestamp(_), _) => Ordering::Less,
540            (_, Value::Timestamp(_)) => Ordering::Greater,
541
542            (
543                Value::Interval {
544                    months: am,
545                    days: ad,
546                    micros: au,
547                },
548                Value::Interval {
549                    months: bm,
550                    days: bd,
551                    micros: bu,
552                },
553            ) => am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu)),
554            (Value::Interval { .. }, _) => Ordering::Less,
555            (_, Value::Interval { .. }) => Ordering::Greater,
556
557            (Value::Json(a), Value::Json(b)) => a.cmp(b),
558            (Value::Json(_), _) => Ordering::Less,
559            (_, Value::Json(_)) => Ordering::Greater,
560
561            (Value::Jsonb(a), Value::Jsonb(b)) => a.as_ref().cmp(b.as_ref()),
562            (Value::Jsonb(_), _) => Ordering::Less,
563            (_, Value::Jsonb(_)) => Ordering::Greater,
564
565            (Value::TsVector(a), Value::TsVector(b)) => a.as_ref().cmp(b.as_ref()),
566            (Value::TsVector(_), _) => Ordering::Less,
567            (_, Value::TsVector(_)) => Ordering::Greater,
568
569            (Value::TsQuery(a), Value::TsQuery(b)) => a.as_ref().cmp(b.as_ref()),
570            (Value::TsQuery(_), _) => Ordering::Less,
571            (_, Value::TsQuery(_)) => Ordering::Greater,
572
573            (Value::Array(a), Value::Array(b)) => a.as_ref().cmp(b.as_ref()),
574            (Value::Array(_), _) => Ordering::Less,
575            (_, Value::Array(_)) => Ordering::Greater,
576
577            (Value::Vector(a), Value::Vector(b)) => a.len().cmp(&b.len()).then_with(|| {
578                for (x, y) in a.iter().zip(b.iter()) {
579                    let ord = x.total_cmp(y);
580                    if ord != Ordering::Equal {
581                        return ord;
582                    }
583                }
584                Ordering::Equal
585            }),
586            (Value::Vector(_), _) => Ordering::Less,
587            (_, Value::Vector(_)) => Ordering::Greater,
588
589            (Value::Text(a), Value::Text(b)) => a.cmp(b),
590            (Value::Text(_), _) => Ordering::Less,
591            (_, Value::Text(_)) => Ordering::Greater,
592
593            (Value::Blob(a), Value::Blob(b)) => a.cmp(b),
594        }
595    }
596}
597
598impl fmt::Display for Value {
599    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
600        match self {
601            Value::Null => write!(f, "NULL"),
602            Value::Integer(i) => write!(f, "{i}"),
603            Value::Real(r) => {
604                if r.fract() == 0.0 && r.is_finite() {
605                    write!(f, "{r:.1}")
606                } else {
607                    write!(f, "{r}")
608                }
609            }
610            Value::Text(s) => write!(f, "{s}"),
611            Value::Blob(b) => write!(f, "X'{}'", hex_encode(b)),
612            Value::Boolean(b) => write!(f, "{}", if *b { "TRUE" } else { "FALSE" }),
613            Value::Time(t) => write!(f, "{}", crate::datetime::format_time(*t)),
614            Value::Date(d) => write!(f, "{}", crate::datetime::format_date(*d)),
615            Value::Timestamp(t) => write!(f, "{}", crate::datetime::format_timestamp(*t)),
616            Value::Interval {
617                months,
618                days,
619                micros,
620            } => {
621                write!(
622                    f,
623                    "{}",
624                    crate::datetime::format_interval(*months, *days, *micros)
625                )
626            }
627            Value::Json(s) => write!(f, "{s}"),
628            Value::Jsonb(b) => match crate::json::decode_to_text(b) {
629                Ok(s) => write!(f, "{s}"),
630                Err(_) => write!(f, "<invalid jsonb>"),
631            },
632            Value::TsVector(b) => write!(f, "{}", crate::fts::tsvector_display(b)),
633            Value::TsQuery(b) => write!(f, "{}", crate::fts::tsquery_display(b)),
634            Value::Array(a) => {
635                write!(f, "{{")?;
636                for (i, elem) in a.iter().enumerate() {
637                    if i > 0 {
638                        write!(f, ",")?;
639                    }
640                    match elem {
641                        Value::Null => write!(f, "NULL")?,
642                        Value::Text(s) => {
643                            write!(f, "\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))?
644                        }
645                        other => write!(f, "{other}")?,
646                    }
647                }
648                write!(f, "}}")
649            }
650            Value::Vector(v) => {
651                write!(f, "[")?;
652                for (i, &x) in v.iter().enumerate() {
653                    if i > 0 {
654                        write!(f, ",")?;
655                    }
656                    write!(f, "{x}")?;
657                }
658                write!(f, "]")
659            }
660        }
661    }
662}
663
664fn hex_encode(data: &[u8]) -> String {
665    let mut s = String::with_capacity(data.len() * 2);
666    for byte in data {
667        s.push_str(&format!("{byte:02X}"));
668    }
669    s
670}
671
672#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
673#[repr(u8)]
674pub enum Collation {
675    #[default]
676    Binary = 0,
677    NoCase = 1,
678    Rtrim = 2,
679}
680
681impl Collation {
682    pub fn from_tag(tag: u8) -> Option<Self> {
683        match tag {
684            0 => Some(Self::Binary),
685            1 => Some(Self::NoCase),
686            2 => Some(Self::Rtrim),
687            _ => None,
688        }
689    }
690
691    pub fn from_name(name: &str) -> Option<Self> {
692        match name.to_ascii_uppercase().as_str() {
693            "BINARY" => Some(Self::Binary),
694            "NOCASE" => Some(Self::NoCase),
695            "RTRIM" => Some(Self::Rtrim),
696            _ => None,
697        }
698    }
699
700    pub fn cmp_text(self, a: &str, b: &str) -> std::cmp::Ordering {
701        match self {
702            Collation::Binary => a.cmp(b),
703            Collation::NoCase => Iterator::cmp(
704                a.chars().map(|c| c.to_ascii_lowercase()),
705                b.chars().map(|c| c.to_ascii_lowercase()),
706            ),
707            Collation::Rtrim => {
708                let la = a.trim_end_matches(' ');
709                let lb = b.trim_end_matches(' ');
710                la.cmp(lb)
711            }
712        }
713    }
714
715    pub fn eq_text(self, a: &str, b: &str) -> bool {
716        match self {
717            Collation::Binary => a == b,
718            Collation::NoCase => a.eq_ignore_ascii_case(b),
719            Collation::Rtrim => a.trim_end_matches(' ') == b.trim_end_matches(' '),
720        }
721    }
722}
723
724#[derive(Debug, Clone)]
725pub struct ColumnDef {
726    pub name: String,
727    pub data_type: DataType,
728    pub nullable: bool,
729    pub position: u16,
730    pub default_expr: Option<Expr>,
731    pub default_sql: Option<String>,
732    pub check_expr: Option<Expr>,
733    pub check_sql: Option<String>,
734    pub check_name: Option<String>,
735    /// Display-only flag for `TIMESTAMPTZ` / `TIMETZ`; storage is i64 µs UTC.
736    pub is_with_timezone: bool,
737    pub generated_expr: Option<Expr>,
738    pub generated_sql: Option<String>,
739    pub generated_kind: Option<crate::parser::GeneratedKind>,
740    pub collation: Collation,
741}
742
743#[derive(Debug, Clone, Copy, PartialEq, Eq)]
744pub enum GinOpsClass {
745    /// One entry per (key, value) pair; supports `@>` `?` `?|` `?&`.
746    JsonbOps,
747    /// One entry per hash(path‖value); supports `@>` only, ~3x smaller index.
748    JsonbPathOps,
749}
750
751impl GinOpsClass {
752    pub fn as_tag(self) -> u8 {
753        match self {
754            Self::JsonbOps => 0,
755            Self::JsonbPathOps => 1,
756        }
757    }
758
759    pub fn from_tag(t: u8) -> Option<Self> {
760        match t {
761            0 => Some(Self::JsonbOps),
762            1 => Some(Self::JsonbPathOps),
763            _ => None,
764        }
765    }
766}
767
768#[derive(Debug, Clone, Copy, PartialEq, Eq)]
769pub enum InvertedKind {
770    Gin(GinOpsClass),
771    Fts { config_id: u8 },
772    Ann { metric: AnnMetric },
773}
774
775#[derive(Debug, Clone, Copy, PartialEq, Eq)]
776pub enum AnnMetric {
777    L2,
778    Inner,
779    Cosine,
780}
781
782impl AnnMetric {
783    pub fn as_tag(self) -> u8 {
784        match self {
785            Self::L2 => 0,
786            Self::Inner => 1,
787            Self::Cosine => 2,
788        }
789    }
790
791    pub fn from_tag(t: u8) -> Option<Self> {
792        match t {
793            0 => Some(Self::L2),
794            1 => Some(Self::Inner),
795            2 => Some(Self::Cosine),
796            _ => None,
797        }
798    }
799}
800
801#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
802pub enum IndexKind {
803    #[default]
804    BTree,
805    Inverted(InvertedKind),
806}
807
808/// `IndexKey::Column` for `CREATE INDEX ON t (email)`, `IndexKey::Expr` for `LOWER(email)`.
809#[derive(Debug, Clone)]
810pub struct IndexDef {
811    pub name: String,
812    pub keys: Vec<IndexKey>,
813    pub unique: bool,
814    pub predicate_sql: Option<String>,
815    pub predicate_expr: Option<crate::parser::Expr>,
816    pub kind: IndexKind,
817    /// ANN-only: schema column indices pushed into the PRISM cell filter.
818    /// Empty for every other index kind.
819    pub ann_filter_cols: Vec<u16>,
820}
821
822#[derive(Debug, Clone)]
823pub enum IndexKey {
824    Column {
825        idx: u16,
826        collate: Collation,
827    },
828    Expr {
829        expr: crate::parser::Expr,
830        original_sql: String,
831    },
832}
833
834impl IndexDef {
835    /// Used by FK/UNIQUE auto-indexes; expression-key indexes go through a different path.
836    pub fn from_column_lists(
837        name: String,
838        columns: Vec<u16>,
839        collations: Vec<Collation>,
840        unique: bool,
841        predicate_sql: Option<String>,
842        predicate_expr: Option<crate::parser::Expr>,
843        kind: IndexKind,
844    ) -> Self {
845        let keys = if collations.is_empty() {
846            columns
847                .into_iter()
848                .map(|idx| IndexKey::Column {
849                    idx,
850                    collate: Collation::Binary,
851                })
852                .collect()
853        } else {
854            columns
855                .into_iter()
856                .zip(collations)
857                .map(|(idx, collate)| IndexKey::Column { idx, collate })
858                .collect()
859        };
860        Self {
861            name,
862            keys,
863            unique,
864            predicate_sql,
865            predicate_expr,
866            kind,
867            ann_filter_cols: Vec::new(),
868        }
869    }
870
871    /// Expression keys are skipped (positions only come from `IndexKey::Column`).
872    pub fn columns_vec(&self) -> Vec<u16> {
873        self.keys
874            .iter()
875            .filter_map(|k| match k {
876                IndexKey::Column { idx, .. } => Some(*idx),
877                IndexKey::Expr { .. } => None,
878            })
879            .collect()
880    }
881
882    /// Expression keys default to Binary.
883    pub fn collations_vec(&self) -> Vec<Collation> {
884        self.keys
885            .iter()
886            .map(|k| match k {
887                IndexKey::Column { collate, .. } => *collate,
888                IndexKey::Expr { .. } => Collation::Binary,
889            })
890            .collect()
891    }
892
893    pub fn column_positions_iter(&self) -> impl Iterator<Item = u16> + '_ {
894        self.keys.iter().filter_map(|k| match k {
895            IndexKey::Column { idx, .. } => Some(*idx),
896            IndexKey::Expr { .. } => None,
897        })
898    }
899
900    pub fn collation_at(&self, i: usize) -> Collation {
901        match self.keys.get(i) {
902            Some(IndexKey::Column { collate, .. }) => *collate,
903            _ => Collation::Binary,
904        }
905    }
906
907    pub fn is_pure_column_index(&self) -> bool {
908        self.keys
909            .iter()
910            .all(|k| matches!(k, IndexKey::Column { .. }))
911    }
912}
913
914#[derive(Debug, Clone)]
915pub struct ViewDef {
916    pub name: String,
917    pub sql: String,
918    pub column_aliases: Vec<String>,
919}
920
921const VIEW_DEF_VERSION: u8 = 1;
922
923impl ViewDef {
924    pub fn serialize(&self) -> Vec<u8> {
925        let mut buf = Vec::new();
926        buf.push(VIEW_DEF_VERSION);
927
928        let name_bytes = self.name.as_bytes();
929        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
930        buf.extend_from_slice(name_bytes);
931
932        let sql_bytes = self.sql.as_bytes();
933        buf.extend_from_slice(&(sql_bytes.len() as u32).to_le_bytes());
934        buf.extend_from_slice(sql_bytes);
935
936        buf.extend_from_slice(&(self.column_aliases.len() as u16).to_le_bytes());
937        for alias in &self.column_aliases {
938            let alias_bytes = alias.as_bytes();
939            buf.extend_from_slice(&(alias_bytes.len() as u16).to_le_bytes());
940            buf.extend_from_slice(alias_bytes);
941        }
942
943        buf
944    }
945
946    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
947        if data.is_empty() || data[0] != VIEW_DEF_VERSION {
948            return Err(crate::error::SqlError::InvalidValue(
949                "invalid view definition version".into(),
950            ));
951        }
952        let mut pos = 1;
953
954        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
955        pos += 2;
956        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
957        pos += name_len;
958
959        let sql_len =
960            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
961        pos += 4;
962        let sql = String::from_utf8_lossy(&data[pos..pos + sql_len]).into_owned();
963        pos += sql_len;
964
965        let alias_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
966        pos += 2;
967        let mut column_aliases = Vec::with_capacity(alias_count);
968        for _ in 0..alias_count {
969            let alias_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
970            pos += 2;
971            let alias = String::from_utf8_lossy(&data[pos..pos + alias_len]).into_owned();
972            pos += alias_len;
973            column_aliases.push(alias);
974        }
975
976        Ok(Self {
977            name,
978            sql,
979            column_aliases,
980        })
981    }
982}
983
984/// Backing table shares the matview's name and is repopulated on REFRESH.
985#[derive(Debug, Clone)]
986pub struct MatviewDef {
987    pub name: String,
988    pub select_sql: String,
989    pub backing_table: String,
990    pub with_data: bool,
991    pub created_at_micros: i64,
992}
993
994const MATVIEW_DEF_VERSION: u8 = 1;
995
996impl MatviewDef {
997    pub fn backing_table_name(name: &str) -> String {
998        name.to_ascii_lowercase()
999    }
1000
1001    pub fn serialize(&self) -> Vec<u8> {
1002        let mut buf = Vec::new();
1003        buf.push(MATVIEW_DEF_VERSION);
1004        write_short_str(&mut buf, &self.name);
1005        write_long_str(&mut buf, &self.select_sql);
1006        write_short_str(&mut buf, &self.backing_table);
1007        buf.push(if self.with_data { 1 } else { 0 });
1008        buf.extend_from_slice(&self.created_at_micros.to_le_bytes());
1009        buf
1010    }
1011
1012    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
1013        if data.is_empty() || data[0] != MATVIEW_DEF_VERSION {
1014            return Err(crate::error::SqlError::InvalidValue(
1015                "invalid matview definition version".into(),
1016            ));
1017        }
1018        let mut pos = 1usize;
1019        let name = read_short_str(data, &mut pos);
1020        let select_sql = read_long_str(data, &mut pos);
1021        let backing_table = read_short_str(data, &mut pos);
1022        let with_data = data[pos] != 0;
1023        pos += 1;
1024        let created_at_micros = i64::from_le_bytes([
1025            data[pos],
1026            data[pos + 1],
1027            data[pos + 2],
1028            data[pos + 3],
1029            data[pos + 4],
1030            data[pos + 5],
1031            data[pos + 6],
1032            data[pos + 7],
1033        ]);
1034        Ok(Self {
1035            name,
1036            select_sql,
1037            backing_table,
1038            with_data,
1039            created_at_micros,
1040        })
1041    }
1042}
1043
1044#[derive(Debug, Clone)]
1045pub struct TriggerDef {
1046    pub name: String,
1047    pub timing: crate::parser::TriggerTiming,
1048    pub events: Vec<crate::parser::TriggerEvent>,
1049    pub target: String,
1050    pub granularity: crate::parser::TriggerGranularity,
1051    pub referencing: Option<crate::parser::TransitionTables>,
1052    pub when_sql: Option<String>,
1053    pub body_sql: String,
1054    pub enabled: bool,
1055    pub created_at_micros: i64,
1056}
1057
1058const TRIGGER_DEF_VERSION: u8 = 1;
1059
1060impl TriggerDef {
1061    pub fn serialize(&self) -> Vec<u8> {
1062        let mut buf = Vec::new();
1063        buf.push(TRIGGER_DEF_VERSION);
1064
1065        write_short_str(&mut buf, &self.name);
1066        buf.push(match self.timing {
1067            crate::parser::TriggerTiming::Before => 0,
1068            crate::parser::TriggerTiming::After => 1,
1069            crate::parser::TriggerTiming::InsteadOf => 2,
1070        });
1071
1072        buf.extend_from_slice(&(self.events.len() as u16).to_le_bytes());
1073        for ev in &self.events {
1074            match ev {
1075                crate::parser::TriggerEvent::Insert => buf.push(0),
1076                crate::parser::TriggerEvent::Delete => buf.push(1),
1077                crate::parser::TriggerEvent::Update(cols) => {
1078                    buf.push(2);
1079                    buf.extend_from_slice(&(cols.len() as u16).to_le_bytes());
1080                    for c in cols {
1081                        write_short_str(&mut buf, c);
1082                    }
1083                }
1084            }
1085        }
1086
1087        write_short_str(&mut buf, &self.target);
1088        buf.push(match self.granularity {
1089            crate::parser::TriggerGranularity::ForEachRow => 0,
1090            crate::parser::TriggerGranularity::ForEachStatement => 1,
1091        });
1092
1093        match &self.referencing {
1094            None => buf.push(0),
1095            Some(r) => {
1096                buf.push(1);
1097                write_opt_string(&mut buf, &r.new_table_alias);
1098                write_opt_string(&mut buf, &r.old_table_alias);
1099            }
1100        }
1101
1102        match &self.when_sql {
1103            None => buf.push(0),
1104            Some(s) => {
1105                buf.push(1);
1106                write_long_str(&mut buf, s);
1107            }
1108        }
1109
1110        write_long_str(&mut buf, &self.body_sql);
1111        buf.push(if self.enabled { 1 } else { 0 });
1112        buf.extend_from_slice(&self.created_at_micros.to_le_bytes());
1113
1114        buf
1115    }
1116
1117    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
1118        if data.is_empty() || data[0] != TRIGGER_DEF_VERSION {
1119            return Err(crate::error::SqlError::InvalidValue(
1120                "invalid trigger definition version".into(),
1121            ));
1122        }
1123        let mut pos = 1;
1124        let name = read_short_str(data, &mut pos);
1125        let timing = match data[pos] {
1126            0 => crate::parser::TriggerTiming::Before,
1127            1 => crate::parser::TriggerTiming::After,
1128            2 => crate::parser::TriggerTiming::InsteadOf,
1129            _ => {
1130                return Err(crate::error::SqlError::InvalidValue(
1131                    "invalid trigger timing tag".into(),
1132                ))
1133            }
1134        };
1135        pos += 1;
1136
1137        let event_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1138        pos += 2;
1139        let mut events = Vec::with_capacity(event_count);
1140        for _ in 0..event_count {
1141            let tag = data[pos];
1142            pos += 1;
1143            let ev = match tag {
1144                0 => crate::parser::TriggerEvent::Insert,
1145                1 => crate::parser::TriggerEvent::Delete,
1146                2 => {
1147                    let cnt = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1148                    pos += 2;
1149                    let mut cols = Vec::with_capacity(cnt);
1150                    for _ in 0..cnt {
1151                        cols.push(read_short_str(data, &mut pos));
1152                    }
1153                    crate::parser::TriggerEvent::Update(cols)
1154                }
1155                _ => {
1156                    return Err(crate::error::SqlError::InvalidValue(
1157                        "invalid trigger event tag".into(),
1158                    ))
1159                }
1160            };
1161            events.push(ev);
1162        }
1163
1164        let target = read_short_str(data, &mut pos);
1165        let granularity = match data[pos] {
1166            0 => crate::parser::TriggerGranularity::ForEachRow,
1167            1 => crate::parser::TriggerGranularity::ForEachStatement,
1168            _ => {
1169                return Err(crate::error::SqlError::InvalidValue(
1170                    "invalid trigger granularity tag".into(),
1171                ))
1172            }
1173        };
1174        pos += 1;
1175
1176        let referencing = if data[pos] == 0 {
1177            pos += 1;
1178            None
1179        } else {
1180            pos += 1;
1181            let new_table_alias = read_opt_string(data, &mut pos);
1182            let old_table_alias = read_opt_string(data, &mut pos);
1183            Some(crate::parser::TransitionTables {
1184                new_table_alias,
1185                old_table_alias,
1186            })
1187        };
1188
1189        let when_sql = if data[pos] == 0 {
1190            pos += 1;
1191            None
1192        } else {
1193            pos += 1;
1194            Some(read_long_str(data, &mut pos))
1195        };
1196
1197        let body_sql = read_long_str(data, &mut pos);
1198        let enabled = data[pos] != 0;
1199        pos += 1;
1200        let created_at_micros = i64::from_le_bytes([
1201            data[pos],
1202            data[pos + 1],
1203            data[pos + 2],
1204            data[pos + 3],
1205            data[pos + 4],
1206            data[pos + 5],
1207            data[pos + 6],
1208            data[pos + 7],
1209        ]);
1210
1211        Ok(Self {
1212            name,
1213            timing,
1214            events,
1215            target,
1216            granularity,
1217            referencing,
1218            when_sql,
1219            body_sql,
1220            enabled,
1221            created_at_micros,
1222        })
1223    }
1224}
1225
1226fn write_short_str(buf: &mut Vec<u8>, s: &str) {
1227    let bytes = s.as_bytes();
1228    buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1229    buf.extend_from_slice(bytes);
1230}
1231
1232fn read_short_str(data: &[u8], pos: &mut usize) -> String {
1233    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
1234    *pos += 2;
1235    let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
1236    *pos += len;
1237    s
1238}
1239
1240fn write_long_str(buf: &mut Vec<u8>, s: &str) {
1241    let bytes = s.as_bytes();
1242    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1243    buf.extend_from_slice(bytes);
1244}
1245
1246fn read_long_str(data: &[u8], pos: &mut usize) -> String {
1247    let len =
1248        u32::from_le_bytes([data[*pos], data[*pos + 1], data[*pos + 2], data[*pos + 3]]) as usize;
1249    *pos += 4;
1250    let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
1251    *pos += len;
1252    s
1253}
1254
1255#[derive(Debug, Clone)]
1256pub struct TableCheckDef {
1257    pub name: Option<String>,
1258    pub expr: Expr,
1259    pub sql: String,
1260}
1261
1262#[derive(Debug, Clone)]
1263pub struct ForeignKeySchemaEntry {
1264    pub name: Option<String>,
1265    pub columns: Vec<u16>,
1266    pub foreign_table: String,
1267    pub referred_columns: Vec<String>,
1268    pub on_delete: crate::parser::ReferentialAction,
1269    pub on_update: crate::parser::ReferentialAction,
1270    pub deferrable: bool,
1271    pub initially_deferred: bool,
1272}
1273
1274#[derive(Debug)]
1275pub struct TableSchema {
1276    pub name: String,
1277    pub columns: Vec<ColumnDef>,
1278    pub primary_key_columns: Vec<u16>,
1279    pub indices: Vec<IndexDef>,
1280    pub check_constraints: Vec<TableCheckDef>,
1281    pub foreign_keys: Vec<ForeignKeySchemaEntry>,
1282    pub flags: u8,
1283    pk_idx_cache: Vec<usize>,
1284    non_pk_idx_cache: Vec<usize>,
1285    /// Sorted physical slots dropped via DROP COLUMN.
1286    dropped_non_pk_slots: Vec<u16>,
1287    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
1288    decode_mapping_cache: Vec<usize>,
1289    /// Logical non-PK order -> physical encoding position.
1290    encoding_positions_cache: Vec<u16>,
1291    has_virtual_columns_cache: bool,
1292    column_map_cache: std::sync::OnceLock<crate::eval::ColumnMap>,
1293}
1294
1295impl Clone for TableSchema {
1296    fn clone(&self) -> Self {
1297        Self {
1298            name: self.name.clone(),
1299            columns: self.columns.clone(),
1300            primary_key_columns: self.primary_key_columns.clone(),
1301            indices: self.indices.clone(),
1302            check_constraints: self.check_constraints.clone(),
1303            foreign_keys: self.foreign_keys.clone(),
1304            flags: self.flags,
1305            pk_idx_cache: self.pk_idx_cache.clone(),
1306            non_pk_idx_cache: self.non_pk_idx_cache.clone(),
1307            dropped_non_pk_slots: self.dropped_non_pk_slots.clone(),
1308            decode_mapping_cache: self.decode_mapping_cache.clone(),
1309            encoding_positions_cache: self.encoding_positions_cache.clone(),
1310            has_virtual_columns_cache: self.has_virtual_columns_cache,
1311            column_map_cache: std::sync::OnceLock::new(),
1312        }
1313    }
1314}
1315
1316impl TableSchema {
1317    pub fn new(
1318        name: String,
1319        columns: Vec<ColumnDef>,
1320        primary_key_columns: Vec<u16>,
1321        indices: Vec<IndexDef>,
1322        check_constraints: Vec<TableCheckDef>,
1323        foreign_keys: Vec<ForeignKeySchemaEntry>,
1324    ) -> Self {
1325        Self::with_drops(
1326            name,
1327            columns,
1328            primary_key_columns,
1329            indices,
1330            check_constraints,
1331            foreign_keys,
1332            vec![],
1333        )
1334    }
1335
1336    pub fn with_drops(
1337        name: String,
1338        columns: Vec<ColumnDef>,
1339        primary_key_columns: Vec<u16>,
1340        indices: Vec<IndexDef>,
1341        check_constraints: Vec<TableCheckDef>,
1342        foreign_keys: Vec<ForeignKeySchemaEntry>,
1343        dropped_non_pk_slots: Vec<u16>,
1344    ) -> Self {
1345        let pk_idx_cache: Vec<usize> = primary_key_columns.iter().map(|&i| i as usize).collect();
1346        let non_pk_idx_cache: Vec<usize> = (0..columns.len())
1347            .filter(|i| !primary_key_columns.contains(&(*i as u16)))
1348            .collect();
1349
1350        let physical_count = non_pk_idx_cache.len() + dropped_non_pk_slots.len();
1351        let mut decode_mapping_cache = vec![usize::MAX; physical_count];
1352        let mut encoding_positions_cache = Vec::with_capacity(non_pk_idx_cache.len());
1353
1354        let mut drop_idx = 0;
1355        let mut live_idx = 0;
1356        for (phys_pos, slot) in decode_mapping_cache.iter_mut().enumerate() {
1357            if drop_idx < dropped_non_pk_slots.len()
1358                && dropped_non_pk_slots[drop_idx] as usize == phys_pos
1359            {
1360                drop_idx += 1;
1361            } else {
1362                *slot = non_pk_idx_cache[live_idx];
1363                encoding_positions_cache.push(phys_pos as u16);
1364                live_idx += 1;
1365            }
1366        }
1367
1368        let has_virtual_columns_cache = columns.iter().any(|c| {
1369            matches!(
1370                c.generated_kind,
1371                Some(crate::parser::GeneratedKind::Virtual)
1372            )
1373        });
1374
1375        Self {
1376            name,
1377            columns,
1378            primary_key_columns,
1379            indices,
1380            check_constraints,
1381            foreign_keys,
1382            flags: 0,
1383            pk_idx_cache,
1384            non_pk_idx_cache,
1385            dropped_non_pk_slots,
1386            decode_mapping_cache,
1387            encoding_positions_cache,
1388            has_virtual_columns_cache,
1389            column_map_cache: std::sync::OnceLock::new(),
1390        }
1391    }
1392
1393    #[inline]
1394    pub fn column_map(&self) -> &crate::eval::ColumnMap {
1395        self.column_map_cache
1396            .get_or_init(|| crate::eval::ColumnMap::new(&self.columns))
1397    }
1398
1399    pub fn is_strict(&self) -> bool {
1400        self.flags & TABLE_FLAG_STRICT != 0
1401    }
1402
1403    pub fn has_virtual_columns(&self) -> bool {
1404        self.has_virtual_columns_cache
1405    }
1406
1407    /// Rebuild caches (preserving dropped slots). Use after mutating fields in place.
1408    pub fn rebuild(self) -> Self {
1409        let drops = self.dropped_non_pk_slots;
1410        Self::with_drops(
1411            self.name,
1412            self.columns,
1413            self.primary_key_columns,
1414            self.indices,
1415            self.check_constraints,
1416            self.foreign_keys,
1417            drops,
1418        )
1419    }
1420
1421    pub fn has_checks(&self) -> bool {
1422        !self.check_constraints.is_empty() || self.columns.iter().any(|c| c.check_expr.is_some())
1423    }
1424
1425    /// Only an ANN index owns a persisted segment; non-ANN tables skip the purge.
1426    pub fn has_ann_index(&self) -> bool {
1427        self.indices
1428            .iter()
1429            .any(|ix| matches!(ix.kind, IndexKind::Inverted(InvertedKind::Ann { .. })))
1430    }
1431
1432    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
1433    pub fn decode_col_mapping(&self) -> &[usize] {
1434        &self.decode_mapping_cache
1435    }
1436
1437    /// Logical non-PK order -> physical encoding position.
1438    pub fn encoding_positions(&self) -> &[u16] {
1439        &self.encoding_positions_cache
1440    }
1441
1442    /// Total physical non-PK column count (live + dropped slots).
1443    pub fn physical_non_pk_count(&self) -> usize {
1444        self.non_pk_idx_cache.len() + self.dropped_non_pk_slots.len()
1445    }
1446
1447    pub fn dropped_non_pk_slots(&self) -> &[u16] {
1448        &self.dropped_non_pk_slots
1449    }
1450
1451    pub fn without_column(&self, drop_pos: usize) -> Self {
1452        let non_pk_order = self
1453            .non_pk_idx_cache
1454            .iter()
1455            .position(|&i| i == drop_pos)
1456            .expect("cannot drop PK column via without_column");
1457        let physical_slot = self.encoding_positions_cache[non_pk_order];
1458
1459        let mut new_dropped = self.dropped_non_pk_slots.clone();
1460        new_dropped.push(physical_slot);
1461        new_dropped.sort();
1462
1463        let dropped_name = &self.columns[drop_pos].name;
1464        let drop_pos_u16 = drop_pos as u16;
1465
1466        let mut columns: Vec<ColumnDef> = self
1467            .columns
1468            .iter()
1469            .enumerate()
1470            .filter(|(i, _)| *i != drop_pos)
1471            .map(|(_, c)| {
1472                let mut col = c.clone();
1473                if col.position > drop_pos_u16 {
1474                    col.position -= 1;
1475                }
1476                col
1477            })
1478            .collect();
1479        for (i, col) in columns.iter_mut().enumerate() {
1480            col.position = i as u16;
1481        }
1482
1483        let primary_key_columns: Vec<u16> = self
1484            .primary_key_columns
1485            .iter()
1486            .map(|&p| if p > drop_pos_u16 { p - 1 } else { p })
1487            .collect();
1488
1489        let indices: Vec<IndexDef> = self
1490            .indices
1491            .iter()
1492            .map(|idx| IndexDef {
1493                name: idx.name.clone(),
1494                keys: idx
1495                    .keys
1496                    .iter()
1497                    .map(|k| match k {
1498                        IndexKey::Column { idx, collate } => IndexKey::Column {
1499                            idx: if *idx > drop_pos_u16 { *idx - 1 } else { *idx },
1500                            collate: *collate,
1501                        },
1502                        IndexKey::Expr { expr, original_sql } => IndexKey::Expr {
1503                            expr: expr.clone(),
1504                            original_sql: original_sql.clone(),
1505                        },
1506                    })
1507                    .collect(),
1508                unique: idx.unique,
1509                predicate_sql: idx.predicate_sql.clone(),
1510                predicate_expr: idx.predicate_expr.clone(),
1511                kind: idx.kind,
1512                ann_filter_cols: idx
1513                    .ann_filter_cols
1514                    .iter()
1515                    .filter(|&&p| p != drop_pos_u16)
1516                    .map(|&p| if p > drop_pos_u16 { p - 1 } else { p })
1517                    .collect(),
1518            })
1519            .collect();
1520
1521        let foreign_keys: Vec<ForeignKeySchemaEntry> = self
1522            .foreign_keys
1523            .iter()
1524            .map(|fk| ForeignKeySchemaEntry {
1525                name: fk.name.clone(),
1526                columns: fk
1527                    .columns
1528                    .iter()
1529                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
1530                    .collect(),
1531                foreign_table: fk.foreign_table.clone(),
1532                referred_columns: fk.referred_columns.clone(),
1533                on_delete: fk.on_delete,
1534                on_update: fk.on_update,
1535                deferrable: fk.deferrable,
1536                initially_deferred: fk.initially_deferred,
1537            })
1538            .collect();
1539
1540        // Filter out table-level CHECKs that reference the dropped column
1541        let dropped_lower = dropped_name.to_ascii_lowercase();
1542        let check_constraints: Vec<TableCheckDef> = self
1543            .check_constraints
1544            .iter()
1545            .filter(|c| !c.sql.to_ascii_lowercase().contains(&dropped_lower))
1546            .cloned()
1547            .collect();
1548
1549        Self::with_drops(
1550            self.name.clone(),
1551            columns,
1552            primary_key_columns,
1553            indices,
1554            check_constraints,
1555            foreign_keys,
1556            new_dropped,
1557        )
1558    }
1559}
1560
1561const SCHEMA_VERSION: u8 = 14;
1562pub const TABLE_FLAG_STRICT: u8 = 0b0000_0001;
1563
1564fn write_opt_string(buf: &mut Vec<u8>, s: &Option<String>) {
1565    match s {
1566        Some(s) => {
1567            let bytes = s.as_bytes();
1568            buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1569            buf.extend_from_slice(bytes);
1570        }
1571        None => buf.extend_from_slice(&0u16.to_le_bytes()),
1572    }
1573}
1574
1575fn read_opt_string(data: &[u8], pos: &mut usize) -> Option<String> {
1576    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
1577    *pos += 2;
1578    if len == 0 {
1579        None
1580    } else {
1581        let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
1582        *pos += len;
1583        Some(s)
1584    }
1585}
1586
1587fn read_string(data: &[u8], pos: &mut usize) -> String {
1588    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
1589    *pos += 2;
1590    let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
1591    *pos += len;
1592    s
1593}
1594
1595impl TableSchema {
1596    pub fn serialize(&self) -> Vec<u8> {
1597        let mut buf = Vec::new();
1598        buf.push(SCHEMA_VERSION);
1599
1600        let name_bytes = self.name.as_bytes();
1601        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
1602        buf.extend_from_slice(name_bytes);
1603
1604        buf.extend_from_slice(&(self.columns.len() as u16).to_le_bytes());
1605
1606        for col in &self.columns {
1607            let col_name = col.name.as_bytes();
1608            buf.extend_from_slice(&(col_name.len() as u16).to_le_bytes());
1609            buf.extend_from_slice(col_name);
1610            buf.push(col.data_type.type_tag());
1611            if let DataType::Vector { dim } = col.data_type {
1612                buf.extend_from_slice(&dim.to_le_bytes());
1613            }
1614            buf.push(if col.nullable { 1 } else { 0 });
1615            buf.extend_from_slice(&col.position.to_le_bytes());
1616        }
1617
1618        buf.extend_from_slice(&(self.primary_key_columns.len() as u16).to_le_bytes());
1619        for &pk_idx in &self.primary_key_columns {
1620            buf.extend_from_slice(&pk_idx.to_le_bytes());
1621        }
1622
1623        buf.extend_from_slice(&(self.indices.len() as u16).to_le_bytes());
1624        for idx in &self.indices {
1625            let idx_name = idx.name.as_bytes();
1626            buf.extend_from_slice(&(idx_name.len() as u16).to_le_bytes());
1627            buf.extend_from_slice(idx_name);
1628            buf.extend_from_slice(&(idx.keys.len() as u16).to_le_bytes());
1629            for key in &idx.keys {
1630                let col_idx = match key {
1631                    IndexKey::Column { idx, .. } => *idx,
1632                    IndexKey::Expr { .. } => u16::MAX,
1633                };
1634                buf.extend_from_slice(&col_idx.to_le_bytes());
1635            }
1636            buf.push(if idx.unique { 1 } else { 0 });
1637        }
1638
1639        for col in &self.columns {
1640            let mut flags: u8 = 0;
1641            if col.default_sql.is_some() {
1642                flags |= 1;
1643            }
1644            if col.check_sql.is_some() {
1645                flags |= 2;
1646            }
1647            buf.push(flags);
1648            if let Some(ref sql) = col.default_sql {
1649                let bytes = sql.as_bytes();
1650                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1651                buf.extend_from_slice(bytes);
1652            }
1653            if let Some(ref sql) = col.check_sql {
1654                let bytes = sql.as_bytes();
1655                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1656                buf.extend_from_slice(bytes);
1657                write_opt_string(&mut buf, &col.check_name);
1658            }
1659        }
1660
1661        buf.extend_from_slice(&(self.check_constraints.len() as u16).to_le_bytes());
1662        for chk in &self.check_constraints {
1663            write_opt_string(&mut buf, &chk.name);
1664            let sql_bytes = chk.sql.as_bytes();
1665            buf.extend_from_slice(&(sql_bytes.len() as u16).to_le_bytes());
1666            buf.extend_from_slice(sql_bytes);
1667        }
1668
1669        buf.extend_from_slice(&(self.foreign_keys.len() as u16).to_le_bytes());
1670        for fk in &self.foreign_keys {
1671            write_opt_string(&mut buf, &fk.name);
1672            buf.extend_from_slice(&(fk.columns.len() as u16).to_le_bytes());
1673            for &col_idx in &fk.columns {
1674                buf.extend_from_slice(&col_idx.to_le_bytes());
1675            }
1676            let ft_bytes = fk.foreign_table.as_bytes();
1677            buf.extend_from_slice(&(ft_bytes.len() as u16).to_le_bytes());
1678            buf.extend_from_slice(ft_bytes);
1679            buf.extend_from_slice(&(fk.referred_columns.len() as u16).to_le_bytes());
1680            for rc in &fk.referred_columns {
1681                let rc_bytes = rc.as_bytes();
1682                buf.extend_from_slice(&(rc_bytes.len() as u16).to_le_bytes());
1683                buf.extend_from_slice(rc_bytes);
1684            }
1685        }
1686
1687        buf.extend_from_slice(&(self.dropped_non_pk_slots.len() as u16).to_le_bytes());
1688        for &slot in &self.dropped_non_pk_slots {
1689            buf.extend_from_slice(&slot.to_le_bytes());
1690        }
1691
1692        for col in &self.columns {
1693            let kind_tag: u8 = match col.generated_kind {
1694                None => 0,
1695                Some(crate::parser::GeneratedKind::Stored) => 1,
1696                Some(crate::parser::GeneratedKind::Virtual) => 2,
1697            };
1698            buf.push(kind_tag);
1699            if kind_tag != 0 {
1700                let sql = col.generated_sql.as_deref().unwrap_or("");
1701                let bytes = sql.as_bytes();
1702                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1703                buf.extend_from_slice(bytes);
1704            }
1705        }
1706
1707        for idx in &self.indices {
1708            match &idx.predicate_sql {
1709                Some(sql) => {
1710                    buf.push(1);
1711                    let bytes = sql.as_bytes();
1712                    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1713                    buf.extend_from_slice(bytes);
1714                }
1715                None => buf.push(0),
1716            }
1717        }
1718
1719        for fk in &self.foreign_keys {
1720            buf.push(fk.on_delete as u8);
1721            buf.push(fk.on_update as u8);
1722        }
1723
1724        for fk in &self.foreign_keys {
1725            let mut flags: u8 = 0;
1726            if fk.deferrable {
1727                flags |= 0b01;
1728            }
1729            if fk.initially_deferred {
1730                flags |= 0b10;
1731            }
1732            buf.push(flags);
1733        }
1734
1735        for col in &self.columns {
1736            buf.push(col.collation as u8);
1737        }
1738        for idx in &self.indices {
1739            let n = idx.keys.len() as u16;
1740            buf.extend_from_slice(&n.to_le_bytes());
1741            for key in &idx.keys {
1742                let c = match key {
1743                    IndexKey::Column { collate, .. } => *collate,
1744                    IndexKey::Expr { .. } => Collation::Binary,
1745                };
1746                buf.push(c as u8);
1747            }
1748        }
1749        for idx in &self.indices {
1750            match idx.kind {
1751                IndexKind::BTree => buf.push(0),
1752                IndexKind::Inverted(InvertedKind::Gin(ops)) => {
1753                    buf.push(1);
1754                    buf.push(ops.as_tag());
1755                }
1756                IndexKind::Inverted(InvertedKind::Fts { config_id }) => {
1757                    buf.push(2);
1758                    buf.push(config_id);
1759                }
1760                IndexKind::Inverted(InvertedKind::Ann { metric }) => {
1761                    buf.push(3);
1762                    buf.push(metric.as_tag());
1763                }
1764            }
1765        }
1766        buf.push(self.flags);
1767
1768        // v12: per-index expression-key extension. Emit (position, SQL) for each Expr key.
1769        // v11 readers stop before this section; v12 readers consume it.
1770        for idx in &self.indices {
1771            let expr_count = idx
1772                .keys
1773                .iter()
1774                .filter(|k| matches!(k, IndexKey::Expr { .. }))
1775                .count() as u16;
1776            buf.extend_from_slice(&expr_count.to_le_bytes());
1777            for (pos, key) in idx.keys.iter().enumerate() {
1778                if let IndexKey::Expr { original_sql, .. } = key {
1779                    buf.extend_from_slice(&(pos as u16).to_le_bytes());
1780                    let bytes = original_sql.as_bytes();
1781                    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1782                    buf.extend_from_slice(bytes);
1783                }
1784            }
1785        }
1786
1787        // v14: per-index ANN filter columns.
1788        for idx in &self.indices {
1789            buf.extend_from_slice(&(idx.ann_filter_cols.len() as u16).to_le_bytes());
1790            for &col in &idx.ann_filter_cols {
1791                buf.extend_from_slice(&col.to_le_bytes());
1792            }
1793        }
1794
1795        buf
1796    }
1797
1798    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
1799        let mut pos = 0;
1800
1801        if data.is_empty()
1802            || !matches!(
1803                data[0],
1804                1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | SCHEMA_VERSION
1805            )
1806        {
1807            return Err(crate::error::SqlError::InvalidValue(
1808                "invalid schema version".into(),
1809            ));
1810        }
1811        let version = data[0];
1812        pos += 1;
1813
1814        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1815        pos += 2;
1816        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
1817        pos += name_len;
1818
1819        let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1820        pos += 2;
1821
1822        let mut columns = Vec::with_capacity(col_count);
1823        for _ in 0..col_count {
1824            let col_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1825            pos += 2;
1826            let col_name = String::from_utf8_lossy(&data[pos..pos + col_name_len]).into_owned();
1827            pos += col_name_len;
1828            let tag = data[pos];
1829            pos += 1;
1830            let data_type = if tag == 15 {
1831                let dim = u16::from_le_bytes([data[pos], data[pos + 1]]);
1832                pos += 2;
1833                DataType::Vector { dim }
1834            } else {
1835                DataType::from_tag(tag).ok_or_else(|| {
1836                    crate::error::SqlError::InvalidValue("unknown data type tag".into())
1837                })?
1838            };
1839            let nullable = data[pos] != 0;
1840            pos += 1;
1841            let position = u16::from_le_bytes([data[pos], data[pos + 1]]);
1842            pos += 2;
1843            columns.push(ColumnDef {
1844                name: col_name,
1845                data_type,
1846                nullable,
1847                position,
1848                default_expr: None,
1849                default_sql: None,
1850                check_expr: None,
1851                check_sql: None,
1852                check_name: None,
1853                is_with_timezone: false,
1854                generated_expr: None,
1855                generated_sql: None,
1856                generated_kind: None,
1857                collation: Collation::Binary,
1858            });
1859        }
1860
1861        let pk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1862        pos += 2;
1863        let mut primary_key_columns = Vec::with_capacity(pk_count);
1864        for _ in 0..pk_count {
1865            let pk_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1866            pos += 2;
1867            primary_key_columns.push(pk_idx);
1868        }
1869
1870        let indices = if version >= 2 && pos + 2 <= data.len() {
1871            let idx_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1872            pos += 2;
1873            let mut idxs = Vec::with_capacity(idx_count);
1874            for _ in 0..idx_count {
1875                let idx_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1876                pos += 2;
1877                let idx_name = String::from_utf8_lossy(&data[pos..pos + idx_name_len]).into_owned();
1878                pos += idx_name_len;
1879                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1880                pos += 2;
1881                let mut keys: Vec<IndexKey> = Vec::with_capacity(col_count);
1882                for _ in 0..col_count {
1883                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1884                    pos += 2;
1885                    // u16::MAX marks an expression key that the v12 section will fill in below.
1886                    // For v11 indexes (no expression section), this stays as a column placeholder.
1887                    keys.push(IndexKey::Column {
1888                        idx: col_idx,
1889                        collate: Collation::Binary,
1890                    });
1891                }
1892                let unique = data[pos] != 0;
1893                pos += 1;
1894                idxs.push(IndexDef {
1895                    name: idx_name,
1896                    keys,
1897                    unique,
1898                    predicate_sql: None,
1899                    predicate_expr: None,
1900                    kind: IndexKind::default(),
1901                    ann_filter_cols: Vec::new(),
1902                });
1903            }
1904            idxs
1905        } else {
1906            vec![]
1907        };
1908
1909        let mut check_constraints = Vec::new();
1910        let mut foreign_keys = Vec::new();
1911
1912        if version >= 3 && pos < data.len() {
1913            for col in &mut columns {
1914                let flags = data[pos];
1915                pos += 1;
1916                if flags & 1 != 0 {
1917                    let sql = read_string(data, &mut pos);
1918                    col.default_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1919                        crate::error::SqlError::InvalidValue(format!(
1920                            "cannot parse DEFAULT expression: {sql}"
1921                        ))
1922                    })?);
1923                    col.default_sql = Some(sql);
1924                }
1925                if flags & 2 != 0 {
1926                    let sql = read_string(data, &mut pos);
1927                    col.check_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1928                        crate::error::SqlError::InvalidValue(format!(
1929                            "cannot parse CHECK expression: {sql}"
1930                        ))
1931                    })?);
1932                    col.check_sql = Some(sql);
1933                    col.check_name = read_opt_string(data, &mut pos);
1934                }
1935            }
1936
1937            let chk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1938            pos += 2;
1939            for _ in 0..chk_count {
1940                let name = read_opt_string(data, &mut pos);
1941                let sql = read_string(data, &mut pos);
1942                let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1943                    crate::error::SqlError::InvalidValue(format!(
1944                        "cannot parse CHECK expression: {sql}"
1945                    ))
1946                })?;
1947                check_constraints.push(TableCheckDef { name, expr, sql });
1948            }
1949
1950            let fk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1951            pos += 2;
1952            for _ in 0..fk_count {
1953                let name = read_opt_string(data, &mut pos);
1954                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1955                pos += 2;
1956                let mut cols = Vec::with_capacity(col_count);
1957                for _ in 0..col_count {
1958                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1959                    pos += 2;
1960                    cols.push(col_idx);
1961                }
1962                let foreign_table = read_string(data, &mut pos);
1963                let ref_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1964                pos += 2;
1965                let mut referred_columns = Vec::with_capacity(ref_count);
1966                for _ in 0..ref_count {
1967                    referred_columns.push(read_string(data, &mut pos));
1968                }
1969                foreign_keys.push(ForeignKeySchemaEntry {
1970                    name,
1971                    columns: cols,
1972                    foreign_table,
1973                    referred_columns,
1974                    on_delete: crate::parser::ReferentialAction::NoAction,
1975                    on_update: crate::parser::ReferentialAction::NoAction,
1976                    deferrable: false,
1977                    initially_deferred: false,
1978                });
1979            }
1980        }
1981        let mut dropped_non_pk_slots = Vec::new();
1982        if version >= 4 && pos + 2 <= data.len() {
1983            let slot_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1984            pos += 2;
1985            for _ in 0..slot_count {
1986                let slot = u16::from_le_bytes([data[pos], data[pos + 1]]);
1987                pos += 2;
1988                dropped_non_pk_slots.push(slot);
1989            }
1990        }
1991        if version >= 5 && pos < data.len() {
1992            for col in &mut columns {
1993                let kind_tag = data[pos];
1994                pos += 1;
1995                if kind_tag != 0 {
1996                    let len = u32::from_le_bytes([
1997                        data[pos],
1998                        data[pos + 1],
1999                        data[pos + 2],
2000                        data[pos + 3],
2001                    ]) as usize;
2002                    pos += 4;
2003                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
2004                    pos += len;
2005                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
2006                        crate::error::SqlError::InvalidValue(format!(
2007                            "cannot parse GENERATED expression: {sql}"
2008                        ))
2009                    })?;
2010                    col.generated_sql = Some(sql);
2011                    col.generated_expr = Some(expr);
2012                    col.generated_kind = Some(match kind_tag {
2013                        1 => crate::parser::GeneratedKind::Stored,
2014                        2 => crate::parser::GeneratedKind::Virtual,
2015                        _ => {
2016                            return Err(crate::error::SqlError::InvalidValue(
2017                                "unknown GENERATED kind tag".into(),
2018                            ));
2019                        }
2020                    });
2021                }
2022            }
2023        }
2024        let mut indices = indices;
2025        if version >= 6 && pos < data.len() {
2026            for idx in &mut indices {
2027                let flag = data[pos];
2028                pos += 1;
2029                if flag == 1 {
2030                    let len = u32::from_le_bytes([
2031                        data[pos],
2032                        data[pos + 1],
2033                        data[pos + 2],
2034                        data[pos + 3],
2035                    ]) as usize;
2036                    pos += 4;
2037                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
2038                    pos += len;
2039                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
2040                        crate::error::SqlError::InvalidValue(format!(
2041                            "cannot parse partial-index predicate: {sql}"
2042                        ))
2043                    })?;
2044                    idx.predicate_sql = Some(sql);
2045                    idx.predicate_expr = Some(expr);
2046                }
2047            }
2048            for fk in &mut foreign_keys {
2049                fk.on_delete =
2050                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
2051                        crate::error::SqlError::InvalidValue("unknown FK on_delete tag".into())
2052                    })?;
2053                pos += 1;
2054                fk.on_update =
2055                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
2056                        crate::error::SqlError::InvalidValue("unknown FK on_update tag".into())
2057                    })?;
2058                pos += 1;
2059            }
2060            if version >= 11 {
2061                for fk in &mut foreign_keys {
2062                    if pos >= data.len() {
2063                        break;
2064                    }
2065                    let flags = data[pos];
2066                    pos += 1;
2067                    fk.deferrable = flags & 0b01 != 0;
2068                    fk.initially_deferred = flags & 0b10 != 0;
2069                }
2070            }
2071        }
2072
2073        let mut columns = columns;
2074        let mut indices = indices;
2075        let mut flags: u8 = 0;
2076        if version >= 7 && pos < data.len() {
2077            for col in &mut columns {
2078                col.collation = Collation::from_tag(data[pos]).ok_or_else(|| {
2079                    crate::error::SqlError::InvalidValue("unknown collation tag".into())
2080                })?;
2081                pos += 1;
2082            }
2083            for idx in &mut indices {
2084                let n = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
2085                pos += 2;
2086                for i in 0..n {
2087                    let collate = Collation::from_tag(data[pos]).ok_or_else(|| {
2088                        crate::error::SqlError::InvalidValue("unknown collation tag".into())
2089                    })?;
2090                    pos += 1;
2091                    if let Some(IndexKey::Column { collate: c, .. }) = idx.keys.get_mut(i) {
2092                        *c = collate;
2093                    }
2094                }
2095            }
2096            if version >= 9 {
2097                for idx in &mut indices {
2098                    if pos >= data.len() {
2099                        break;
2100                    }
2101                    let tag = data[pos];
2102                    pos += 1;
2103                    idx.kind = match tag {
2104                        0 => IndexKind::BTree,
2105                        1 => {
2106                            if pos >= data.len() {
2107                                return Err(crate::error::SqlError::InvalidValue(
2108                                    "GIN index missing opclass tag".into(),
2109                                ));
2110                            }
2111                            let ops = GinOpsClass::from_tag(data[pos]).ok_or_else(|| {
2112                                crate::error::SqlError::InvalidValue(
2113                                    "unknown GIN opclass tag".into(),
2114                                )
2115                            })?;
2116                            pos += 1;
2117                            IndexKind::Inverted(InvertedKind::Gin(ops))
2118                        }
2119                        2 => {
2120                            if pos >= data.len() {
2121                                return Err(crate::error::SqlError::InvalidValue(
2122                                    "FTS index missing config_id".into(),
2123                                ));
2124                            }
2125                            let config_id = data[pos];
2126                            pos += 1;
2127                            IndexKind::Inverted(InvertedKind::Fts { config_id })
2128                        }
2129                        3 => {
2130                            if pos >= data.len() {
2131                                return Err(crate::error::SqlError::InvalidValue(
2132                                    "ANN index missing metric tag".into(),
2133                                ));
2134                            }
2135                            let metric = AnnMetric::from_tag(data[pos]).ok_or_else(|| {
2136                                crate::error::SqlError::InvalidValue(
2137                                    "unknown ANN metric tag".into(),
2138                                )
2139                            })?;
2140                            pos += 1;
2141                            IndexKind::Inverted(InvertedKind::Ann { metric })
2142                        }
2143                        _ => {
2144                            return Err(crate::error::SqlError::InvalidValue(
2145                                "unknown IndexKind tag".into(),
2146                            ));
2147                        }
2148                    };
2149                }
2150            }
2151            if pos < data.len() {
2152                flags = data[pos];
2153                pos += 1;
2154            }
2155            if version >= 12 {
2156                for idx in &mut indices {
2157                    if pos + 2 > data.len() {
2158                        break;
2159                    }
2160                    let expr_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
2161                    pos += 2;
2162                    for _ in 0..expr_count {
2163                        if pos + 6 > data.len() {
2164                            return Err(crate::error::SqlError::InvalidValue(
2165                                "truncated index expression key".into(),
2166                            ));
2167                        }
2168                        let key_pos = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
2169                        pos += 2;
2170                        let sql_len = u32::from_le_bytes([
2171                            data[pos],
2172                            data[pos + 1],
2173                            data[pos + 2],
2174                            data[pos + 3],
2175                        ]) as usize;
2176                        pos += 4;
2177                        if pos + sql_len > data.len() {
2178                            return Err(crate::error::SqlError::InvalidValue(
2179                                "truncated expression-key SQL".into(),
2180                            ));
2181                        }
2182                        let sql = String::from_utf8_lossy(&data[pos..pos + sql_len]).into_owned();
2183                        pos += sql_len;
2184                        let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
2185                            crate::error::SqlError::InvalidValue(format!(
2186                                "cannot parse index expression: {sql}"
2187                            ))
2188                        })?;
2189                        if key_pos < idx.keys.len() {
2190                            idx.keys[key_pos] = IndexKey::Expr {
2191                                expr,
2192                                original_sql: sql,
2193                            };
2194                        }
2195                    }
2196                }
2197            }
2198            if version >= 14 {
2199                for idx in &mut indices {
2200                    if pos + 2 > data.len() {
2201                        break;
2202                    }
2203                    let fcount = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
2204                    pos += 2;
2205                    let mut fcols = Vec::with_capacity(fcount);
2206                    for _ in 0..fcount {
2207                        if pos + 2 > data.len() {
2208                            return Err(crate::error::SqlError::InvalidValue(
2209                                "truncated ANN filter columns".into(),
2210                            ));
2211                        }
2212                        fcols.push(u16::from_le_bytes([data[pos], data[pos + 1]]));
2213                        pos += 2;
2214                    }
2215                    idx.ann_filter_cols = fcols;
2216                }
2217            }
2218        }
2219        let _ = pos;
2220
2221        let mut schema = Self::with_drops(
2222            name,
2223            columns,
2224            primary_key_columns,
2225            indices,
2226            check_constraints,
2227            foreign_keys,
2228            dropped_non_pk_slots,
2229        );
2230        schema.flags = flags;
2231        Ok(schema)
2232    }
2233
2234    pub fn column_index(&self, name: &str) -> Option<usize> {
2235        self.columns
2236            .iter()
2237            .position(|c| c.name.eq_ignore_ascii_case(name))
2238    }
2239
2240    pub fn non_pk_indices(&self) -> &[usize] {
2241        &self.non_pk_idx_cache
2242    }
2243
2244    pub fn pk_indices(&self) -> &[usize] {
2245        &self.pk_idx_cache
2246    }
2247
2248    pub fn index_by_name(&self, name: &str) -> Option<&IndexDef> {
2249        let lower = name.to_ascii_lowercase();
2250        self.indices.iter().find(|i| i.name == lower)
2251    }
2252
2253    pub fn index_table_name(table_name: &str, index_name: &str) -> Vec<u8> {
2254        format!("__idx_{table_name}_{index_name}").into_bytes()
2255    }
2256}
2257
2258#[derive(Debug)]
2259pub enum ExecutionResult {
2260    RowsAffected(u64),
2261    Query(QueryResult),
2262    Ok,
2263}
2264
2265#[derive(Debug, Clone)]
2266pub struct QueryResult {
2267    pub columns: Vec<String>,
2268    pub rows: Vec<Vec<Value>>,
2269}
2270
2271#[cfg(test)]
2272#[path = "types_tests.rs"]
2273mod tests;