// citadel_sql/types.rs

1use std::cmp::Ordering;
2use std::fmt;
3use std::hash::{Hash, Hasher};
4
5pub use compact_str::CompactString;
6
7use crate::parser::Expr;
8
/// SQL data types.
///
/// There is one variant per [`Value`] variant; `Value::data_type` maps a value
/// to its `DataType`. The declaration order below is not the numeric tag
/// order: use `type_tag` / `from_tag` for the one-byte encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    /// Type reported for `Value::Null`.
    Null,
    /// Signed 64-bit integer (`Value::Integer(i64)`).
    Integer,
    /// 64-bit floating point (`Value::Real(f64)`).
    Real,
    /// Character string (`Value::Text`).
    Text,
    /// Raw bytes (`Value::Blob(Vec<u8>)`).
    Blob,
    /// Boolean (`Value::Boolean(bool)`).
    Boolean,
    /// Time value carried as an `i64` (`Value::Time`).
    /// NOTE(review): presumably microseconds since midnight — confirm in `datetime`.
    Time,
    /// Calendar date carried as an `i32` (`Value::Date`).
    Date,
    /// Timestamp carried as an `i64` (`Value::Timestamp`).
    Timestamp,
    /// Interval with separate months / days / micros fields (`Value::Interval`).
    Interval,
}
23
24impl DataType {
25    pub fn type_tag(self) -> u8 {
26        match self {
27            DataType::Null => 0,
28            DataType::Blob => 1,
29            DataType::Text => 2,
30            DataType::Boolean => 3,
31            DataType::Integer => 4,
32            DataType::Real => 5,
33            DataType::Time => 6,
34            DataType::Date => 7,
35            DataType::Timestamp => 8,
36            DataType::Interval => 9,
37        }
38    }
39
40    pub fn from_tag(tag: u8) -> Option<Self> {
41        match tag {
42            0 => Some(DataType::Null),
43            1 => Some(DataType::Blob),
44            2 => Some(DataType::Text),
45            3 => Some(DataType::Boolean),
46            4 => Some(DataType::Integer),
47            5 => Some(DataType::Real),
48            6 => Some(DataType::Time),
49            7 => Some(DataType::Date),
50            8 => Some(DataType::Timestamp),
51            9 => Some(DataType::Interval),
52            _ => None,
53        }
54    }
55}
56
57impl fmt::Display for DataType {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        match self {
60            DataType::Null => write!(f, "NULL"),
61            DataType::Integer => write!(f, "INTEGER"),
62            DataType::Real => write!(f, "REAL"),
63            DataType::Text => write!(f, "TEXT"),
64            DataType::Blob => write!(f, "BLOB"),
65            DataType::Boolean => write!(f, "BOOLEAN"),
66            DataType::Time => write!(f, "TIME"),
67            DataType::Date => write!(f, "DATE"),
68            DataType::Timestamp => write!(f, "TIMESTAMP"),
69            DataType::Interval => write!(f, "INTERVAL"),
70        }
71    }
72}
73
/// SQL value. Temporal epochs: days/µs since 1970-01-01 UTC.
/// `Date`/`Timestamp` reserve `i{32,64}::{MAX,MIN}` as `±infinity` sentinels.
///
/// `Value::Null` is the `Default`. Equality, hashing and ordering are
/// implemented by hand further down in this file rather than derived.
#[derive(Debug, Clone, Default)]
pub enum Value {
    /// SQL `NULL`.
    #[default]
    Null,
    /// Signed 64-bit integer.
    Integer(i64),
    /// 64-bit floating point number.
    Real(f64),
    /// Text, stored as a `CompactString`.
    Text(CompactString),
    /// Raw bytes.
    Blob(Vec<u8>),
    /// Boolean.
    Boolean(bool),
    /// Time value.
    /// NOTE(review): `coerce_into` accepts integers in `0..=86_400_000_000`
    /// here, which suggests microseconds since midnight — confirm.
    Time(i64),
    /// Days since 1970-01-01; `i32::MIN` / `i32::MAX` mean `-infinity` / `+infinity`.
    Date(i32),
    /// Microseconds since 1970-01-01 UTC; `i64::MIN` / `i64::MAX` mean
    /// `-infinity` / `+infinity`.
    Timestamp(i64),
    /// Interval kept as three independent fields (compared field-wise by the
    /// trait impls in this file).
    Interval {
        months: i32,
        days: i32,
        micros: i64,
    },
}
94
95impl Value {
96    pub fn data_type(&self) -> DataType {
97        match self {
98            Value::Null => DataType::Null,
99            Value::Integer(_) => DataType::Integer,
100            Value::Real(_) => DataType::Real,
101            Value::Text(_) => DataType::Text,
102            Value::Blob(_) => DataType::Blob,
103            Value::Boolean(_) => DataType::Boolean,
104            Value::Time(_) => DataType::Time,
105            Value::Date(_) => DataType::Date,
106            Value::Timestamp(_) => DataType::Timestamp,
107            Value::Interval { .. } => DataType::Interval,
108        }
109    }
110
111    pub fn is_null(&self) -> bool {
112        matches!(self, Value::Null)
113    }
114
115    /// Returns true for `±infinity` sentinel values on DATE / TIMESTAMP; false otherwise.
116    pub fn is_finite_temporal(&self) -> bool {
117        match self {
118            Value::Date(d) => *d != i32::MAX && *d != i32::MIN,
119            Value::Timestamp(t) => *t != i64::MAX && *t != i64::MIN,
120            _ => true,
121        }
122    }
123
124    /// Attempt to coerce this value to the target type.
125    pub fn coerce_to(&self, target: DataType) -> Option<Value> {
126        match (self, target) {
127            (_, DataType::Null) => Some(Value::Null),
128            (Value::Null, _) => Some(Value::Null),
129            (Value::Integer(i), DataType::Integer) => Some(Value::Integer(*i)),
130            (Value::Integer(i), DataType::Real) => Some(Value::Real(*i as f64)),
131            (Value::Real(r), DataType::Real) => Some(Value::Real(*r)),
132            (Value::Real(r), DataType::Integer) => Some(Value::Integer(*r as i64)),
133            (Value::Text(s), DataType::Text) => Some(Value::Text(s.clone())),
134            (Value::Blob(b), DataType::Blob) => Some(Value::Blob(b.clone())),
135            (Value::Boolean(b), DataType::Boolean) => Some(Value::Boolean(*b)),
136            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
137            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(*i != 0)),
138            (Value::Time(t), DataType::Time) => Some(Value::Time(*t)),
139            (Value::Date(d), DataType::Date) => Some(Value::Date(*d)),
140            (Value::Timestamp(t), DataType::Timestamp) => Some(Value::Timestamp(*t)),
141            (
142                Value::Interval {
143                    months,
144                    days,
145                    micros,
146                },
147                DataType::Interval,
148            ) => Some(Value::Interval {
149                months: *months,
150                days: *days,
151                micros: *micros,
152            }),
153            _ => None,
154        }
155    }
156
157    pub fn coerce_into(self, target: DataType) -> Option<Value> {
158        if self.is_null() || target == DataType::Null {
159            return Some(Value::Null);
160        }
161        if self.data_type() == target {
162            return Some(self);
163        }
164        match (self, target) {
165            (Value::Integer(i), DataType::Real) => Some(Value::Real(i as f64)),
166            (Value::Real(r), DataType::Integer) => Some(Value::Integer(r as i64)),
167            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if b { 1 } else { 0 })),
168            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(i != 0)),
169            (Value::Text(s), DataType::Date) => {
170                crate::datetime::parse_date(&s).ok().map(Value::Date)
171            }
172            (Value::Text(s), DataType::Time) => {
173                crate::datetime::parse_time(&s).ok().map(Value::Time)
174            }
175            (Value::Text(s), DataType::Timestamp) => crate::datetime::parse_timestamp(&s)
176                .ok()
177                .map(Value::Timestamp),
178            (Value::Text(s), DataType::Interval) => {
179                crate::datetime::parse_interval(&s)
180                    .ok()
181                    .map(|(m, d, u)| Value::Interval {
182                        months: m,
183                        days: d,
184                        micros: u,
185                    })
186            }
187            // INTEGER → TIMESTAMP: Unix epoch seconds.
188            (Value::Integer(n), DataType::Timestamp) => {
189                n.checked_mul(1_000_000).map(Value::Timestamp)
190            }
191            (Value::Integer(n), DataType::Date) => {
192                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
193                    Some(Value::Date(n as i32))
194                } else {
195                    None
196                }
197            }
198            (Value::Integer(n), DataType::Time) => {
199                if (0..=86_400_000_000).contains(&n) {
200                    Some(Value::Time(n))
201                } else {
202                    None
203                }
204            }
205            (Value::Integer(n), DataType::Interval) => {
206                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
207                    Some(Value::Interval {
208                        months: 0,
209                        days: n as i32,
210                        micros: 0,
211                    })
212                } else {
213                    None
214                }
215            }
216            (Value::Timestamp(t), DataType::Integer) => Some(Value::Integer(t / 1_000_000)),
217            (Value::Date(d), DataType::Integer) => Some(Value::Integer(d as i64)),
218            (Value::Time(t), DataType::Integer) => Some(Value::Integer(t)),
219            (Value::Date(d), DataType::Timestamp) => {
220                (d as i64).checked_mul(86_400_000_000).map(Value::Timestamp)
221            }
222            (Value::Timestamp(t), DataType::Date) => {
223                // div_euclid floors correctly for negative µs (pre-1970).
224                let days = t.div_euclid(86_400_000_000);
225                if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
226                    Some(Value::Date(days as i32))
227                } else {
228                    None
229                }
230            }
231            (v, DataType::Text)
232                if matches!(
233                    v.data_type(),
234                    DataType::Date | DataType::Time | DataType::Timestamp | DataType::Interval
235                ) =>
236            {
237                Some(Value::Text(v.to_string().into()))
238            }
239            _ => None,
240        }
241    }
242
243    pub fn strict_coerce(&self, target: DataType) -> Option<Value> {
244        if matches!(self, Value::Null) {
245            return Some(Value::Null);
246        }
247        if self.data_type() == target {
248            return Some(self.clone());
249        }
250        match (self, target) {
251            (Value::Integer(i), DataType::Real) => {
252                if i.unsigned_abs() <= (1u64 << 53) {
253                    Some(Value::Real(*i as f64))
254                } else {
255                    None
256                }
257            }
258            (Value::Real(r), DataType::Integer) => {
259                if r.is_finite()
260                    && r.fract() == 0.0
261                    && (i64::MIN as f64..=i64::MAX as f64).contains(r)
262                {
263                    Some(Value::Integer(*r as i64))
264                } else {
265                    None
266                }
267            }
268            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
269            (Value::Integer(i), DataType::Boolean) => match i {
270                0 => Some(Value::Boolean(false)),
271                1 => Some(Value::Boolean(true)),
272                _ => None,
273            },
274            (Value::Text(s), DataType::Integer) => {
275                let trimmed = s.as_str();
276                let parsed: i64 = trimmed.parse().ok()?;
277                if parsed.to_string() == trimmed {
278                    Some(Value::Integer(parsed))
279                } else {
280                    None
281                }
282            }
283            (Value::Text(s), DataType::Real) => {
284                let trimmed = s.as_str();
285                let parsed: f64 = trimmed.parse().ok()?;
286                if parsed.is_finite() {
287                    Some(Value::Real(parsed))
288                } else {
289                    None
290                }
291            }
292            (Value::Text(_), DataType::Date)
293            | (Value::Text(_), DataType::Time)
294            | (Value::Text(_), DataType::Timestamp)
295            | (Value::Text(_), DataType::Interval) => self.clone().coerce_into(target),
296            (Value::Date(d), DataType::Timestamp) => (*d as i64)
297                .checked_mul(86_400_000_000)
298                .map(Value::Timestamp),
299            (Value::Timestamp(t), DataType::Date) => {
300                if t % 86_400_000_000 == 0 {
301                    let days = t.div_euclid(86_400_000_000);
302                    if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
303                        Some(Value::Date(days as i32))
304                    } else {
305                        None
306                    }
307                } else {
308                    None
309                }
310            }
311            _ => None,
312        }
313    }
314
315    /// Numeric ordering for Integer and Real values (promotes to f64 for mixed).
316    fn numeric_cmp(&self, other: &Value) -> Option<Ordering> {
317        match (self, other) {
318            (Value::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
319            (Value::Real(a), Value::Real(b)) => a.partial_cmp(b),
320            (Value::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
321            (Value::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
322            _ => None,
323        }
324    }
325}
326
327impl PartialEq for Value {
328    // Field-wise for Eq/Hash/Ord transitivity. SQL-level `=` on INTERVAL
329    // normalizes separately (see eval.rs).
330    fn eq(&self, other: &Self) -> bool {
331        match (self, other) {
332            (Value::Null, Value::Null) => true,
333            (Value::Integer(a), Value::Integer(b)) => a == b,
334            (Value::Real(a), Value::Real(b)) => a == b,
335            (Value::Integer(a), Value::Real(b)) => (*a as f64) == *b,
336            (Value::Real(a), Value::Integer(b)) => *a == (*b as f64),
337            (Value::Text(a), Value::Text(b)) => a == b,
338            (Value::Blob(a), Value::Blob(b)) => a == b,
339            (Value::Boolean(a), Value::Boolean(b)) => a == b,
340            (Value::Time(a), Value::Time(b)) => a == b,
341            (Value::Date(a), Value::Date(b)) => a == b,
342            (Value::Timestamp(a), Value::Timestamp(b)) => a == b,
343            (
344                Value::Interval {
345                    months: am,
346                    days: ad,
347                    micros: au,
348                },
349                Value::Interval {
350                    months: bm,
351                    days: bd,
352                    micros: bu,
353                },
354            ) => am == bm && ad == bd && au == bu,
355            _ => false,
356        }
357    }
358}
359
360impl Eq for Value {}
361
362impl Hash for Value {
363    fn hash<H: Hasher>(&self, state: &mut H) {
364        match self {
365            Value::Null => 0u8.hash(state),
366            Value::Integer(i) => {
367                // Hash via f64 bits so Integer(n) and Real(n.0) produce the same hash,
368                // matching the cross-type PartialEq contract.
369                1u8.hash(state);
370                (*i as f64).to_bits().hash(state);
371            }
372            Value::Real(r) => {
373                1u8.hash(state);
374                r.to_bits().hash(state);
375            }
376            Value::Text(s) => {
377                2u8.hash(state);
378                s.hash(state);
379            }
380            Value::Blob(b) => {
381                3u8.hash(state);
382                b.hash(state);
383            }
384            Value::Boolean(b) => {
385                4u8.hash(state);
386                b.hash(state);
387            }
388            Value::Time(t) => {
389                5u8.hash(state);
390                t.hash(state);
391            }
392            Value::Date(d) => {
393                6u8.hash(state);
394                d.hash(state);
395            }
396            Value::Timestamp(t) => {
397                7u8.hash(state);
398                t.hash(state);
399            }
400            Value::Interval {
401                months,
402                days,
403                micros,
404            } => {
405                8u8.hash(state);
406                months.hash(state);
407                days.hash(state);
408                micros.hash(state);
409            }
410        }
411    }
412}
413
414impl PartialOrd for Value {
415    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
416        Some(self.cmp(other))
417    }
418}
419
420impl Ord for Value {
421    // Order: NULL < BOOLEAN < numeric < TIME < DATE < TIMESTAMP < INTERVAL < TEXT < BLOB.
422    // INTERVAL compares field-wise for trait-invariant safety; SQL-level ops normalize.
423    fn cmp(&self, other: &Self) -> Ordering {
424        match (self, other) {
425            (Value::Null, Value::Null) => Ordering::Equal,
426            (Value::Null, _) => Ordering::Less,
427            (_, Value::Null) => Ordering::Greater,
428
429            (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
430            (Value::Boolean(_), _) => Ordering::Less,
431            (_, Value::Boolean(_)) => Ordering::Greater,
432
433            (Value::Integer(_) | Value::Real(_), Value::Integer(_) | Value::Real(_)) => {
434                self.numeric_cmp(other).unwrap_or(Ordering::Equal)
435            }
436            (Value::Integer(_) | Value::Real(_), _) => Ordering::Less,
437            (_, Value::Integer(_) | Value::Real(_)) => Ordering::Greater,
438
439            (Value::Time(a), Value::Time(b)) => a.cmp(b),
440            (Value::Time(_), _) => Ordering::Less,
441            (_, Value::Time(_)) => Ordering::Greater,
442
443            (Value::Date(a), Value::Date(b)) => a.cmp(b),
444            (Value::Date(_), _) => Ordering::Less,
445            (_, Value::Date(_)) => Ordering::Greater,
446
447            (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
448            (Value::Timestamp(_), _) => Ordering::Less,
449            (_, Value::Timestamp(_)) => Ordering::Greater,
450
451            (
452                Value::Interval {
453                    months: am,
454                    days: ad,
455                    micros: au,
456                },
457                Value::Interval {
458                    months: bm,
459                    days: bd,
460                    micros: bu,
461                },
462            ) => am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu)),
463            (Value::Interval { .. }, _) => Ordering::Less,
464            (_, Value::Interval { .. }) => Ordering::Greater,
465
466            (Value::Text(a), Value::Text(b)) => a.cmp(b),
467            (Value::Text(_), _) => Ordering::Less,
468            (_, Value::Text(_)) => Ordering::Greater,
469
470            (Value::Blob(a), Value::Blob(b)) => a.cmp(b),
471        }
472    }
473}
474
475impl fmt::Display for Value {
476    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
477        match self {
478            Value::Null => write!(f, "NULL"),
479            Value::Integer(i) => write!(f, "{i}"),
480            Value::Real(r) => {
481                if r.fract() == 0.0 && r.is_finite() {
482                    write!(f, "{r:.1}")
483                } else {
484                    write!(f, "{r}")
485                }
486            }
487            Value::Text(s) => write!(f, "{s}"),
488            Value::Blob(b) => write!(f, "X'{}'", hex_encode(b)),
489            Value::Boolean(b) => write!(f, "{}", if *b { "TRUE" } else { "FALSE" }),
490            Value::Time(t) => write!(f, "{}", crate::datetime::format_time(*t)),
491            Value::Date(d) => write!(f, "{}", crate::datetime::format_date(*d)),
492            Value::Timestamp(t) => write!(f, "{}", crate::datetime::format_timestamp(*t)),
493            Value::Interval {
494                months,
495                days,
496                micros,
497            } => {
498                write!(
499                    f,
500                    "{}",
501                    crate::datetime::format_interval(*months, *days, *micros)
502                )
503            }
504        }
505    }
506}
507
/// Encodes `data` as upper-case hexadecimal, two digits per byte.
///
/// Uses a nibble lookup table so the only allocation is the output string
/// (no temporary `String` per byte).
fn hex_encode(data: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(data.len() * 2);
    for &byte in data {
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
    }
    out
}
515
/// Text collation used when comparing strings.
///
/// The explicit discriminants are the values accepted by `Collation::from_tag`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum Collation {
    /// Plain `str` comparison; the default.
    #[default]
    Binary = 0,
    /// ASCII case-insensitive comparison (non-ASCII characters are not folded).
    NoCase = 1,
    /// Like `Binary`, but trailing space characters (`' '`) are ignored.
    Rtrim = 2,
}
524
525impl Collation {
526    pub fn from_tag(tag: u8) -> Option<Self> {
527        match tag {
528            0 => Some(Self::Binary),
529            1 => Some(Self::NoCase),
530            2 => Some(Self::Rtrim),
531            _ => None,
532        }
533    }
534
535    pub fn from_name(name: &str) -> Option<Self> {
536        match name.to_ascii_uppercase().as_str() {
537            "BINARY" => Some(Self::Binary),
538            "NOCASE" => Some(Self::NoCase),
539            "RTRIM" => Some(Self::Rtrim),
540            _ => None,
541        }
542    }
543
544    pub fn cmp_text(self, a: &str, b: &str) -> std::cmp::Ordering {
545        match self {
546            Collation::Binary => a.cmp(b),
547            Collation::NoCase => Iterator::cmp(
548                a.chars().map(|c| c.to_ascii_lowercase()),
549                b.chars().map(|c| c.to_ascii_lowercase()),
550            ),
551            Collation::Rtrim => {
552                let la = a.trim_end_matches(' ');
553                let lb = b.trim_end_matches(' ');
554                la.cmp(lb)
555            }
556        }
557    }
558
559    pub fn eq_text(self, a: &str, b: &str) -> bool {
560        match self {
561            Collation::Binary => a == b,
562            Collation::NoCase => a.eq_ignore_ascii_case(b),
563            Collation::Rtrim => a.trim_end_matches(' ') == b.trim_end_matches(' '),
564        }
565    }
566}
567
/// Column definition.
///
/// Several attributes come in `*_expr` / `*_sql` pairs.
/// NOTE(review): presumably the parsed expression and its original SQL text —
/// confirm against the schema (de)serializer.
#[derive(Debug, Clone)]
pub struct ColumnDef {
    /// Column name.
    pub name: String,
    /// Declared SQL type.
    pub data_type: DataType,
    /// Whether the column may hold NULL.
    pub nullable: bool,
    /// Index of this column within `TableSchema::columns`
    /// (`TableSchema::without_column` renumbers it that way).
    pub position: u16,
    /// DEFAULT expression, if any.
    pub default_expr: Option<Expr>,
    /// SQL text for the DEFAULT expression, if any.
    pub default_sql: Option<String>,
    /// Column-level CHECK expression, if any (counted by `TableSchema::has_checks`).
    pub check_expr: Option<Expr>,
    /// SQL text for the column-level CHECK, if any.
    pub check_sql: Option<String>,
    /// Name of the column-level CHECK constraint, if one was given.
    pub check_name: Option<String>,
    /// Display-only flag for `TIMESTAMPTZ` / `TIMETZ`; storage is i64 µs UTC.
    pub is_with_timezone: bool,
    /// Expression of a generated column, if any.
    pub generated_expr: Option<Expr>,
    /// SQL text for the generated-column expression, if any.
    pub generated_sql: Option<String>,
    /// Generated-column kind; `Virtual` columns are what
    /// `TableSchema::has_virtual_columns` reports.
    pub generated_kind: Option<crate::parser::GeneratedKind>,
    /// Collation applied to this column's text.
    pub collation: Collation,
}
587
/// Index definition stored as part of the table schema.
#[derive(Debug, Clone)]
pub struct IndexDef {
    /// Index name.
    pub name: String,
    /// Indexed columns as positions into `TableSchema::columns`
    /// (shifted down by `TableSchema::without_column` when an earlier column is dropped).
    pub columns: Vec<u16>,
    /// Whether the index enforces uniqueness.
    pub unique: bool,
    /// SQL text of the partial-index predicate, if any.
    pub predicate_sql: Option<String>,
    /// Partial-index predicate expression, if any.
    pub predicate_expr: Option<crate::parser::Expr>,
    /// Collations for the index.
    /// NOTE(review): presumably one entry per element of `columns` — confirm.
    pub collations: Vec<Collation>,
}
598
/// View definition stored in the _views metadata table.
///
/// Serialized with `ViewDef::serialize` and read back with `ViewDef::deserialize`.
#[derive(Debug, Clone)]
pub struct ViewDef {
    /// View name.
    pub name: String,
    /// SQL text of the view.
    pub sql: String,
    /// Column aliases for the view; may be empty.
    pub column_aliases: Vec<String>,
}
606
607const VIEW_DEF_VERSION: u8 = 1;
608
609impl ViewDef {
610    pub fn serialize(&self) -> Vec<u8> {
611        let mut buf = Vec::new();
612        buf.push(VIEW_DEF_VERSION);
613
614        let name_bytes = self.name.as_bytes();
615        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
616        buf.extend_from_slice(name_bytes);
617
618        let sql_bytes = self.sql.as_bytes();
619        buf.extend_from_slice(&(sql_bytes.len() as u32).to_le_bytes());
620        buf.extend_from_slice(sql_bytes);
621
622        buf.extend_from_slice(&(self.column_aliases.len() as u16).to_le_bytes());
623        for alias in &self.column_aliases {
624            let alias_bytes = alias.as_bytes();
625            buf.extend_from_slice(&(alias_bytes.len() as u16).to_le_bytes());
626            buf.extend_from_slice(alias_bytes);
627        }
628
629        buf
630    }
631
632    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
633        if data.is_empty() || data[0] != VIEW_DEF_VERSION {
634            return Err(crate::error::SqlError::InvalidValue(
635                "invalid view definition version".into(),
636            ));
637        }
638        let mut pos = 1;
639
640        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
641        pos += 2;
642        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
643        pos += name_len;
644
645        let sql_len =
646            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
647        pos += 4;
648        let sql = String::from_utf8_lossy(&data[pos..pos + sql_len]).into_owned();
649        pos += sql_len;
650
651        let alias_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
652        pos += 2;
653        let mut column_aliases = Vec::with_capacity(alias_count);
654        for _ in 0..alias_count {
655            let alias_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
656            pos += 2;
657            let alias = String::from_utf8_lossy(&data[pos..pos + alias_len]).into_owned();
658            pos += alias_len;
659            column_aliases.push(alias);
660        }
661
662        Ok(Self {
663            name,
664            sql,
665            column_aliases,
666        })
667    }
668}
669
/// Table-level CHECK constraint stored in schema.
#[derive(Debug, Clone)]
pub struct TableCheckDef {
    /// Constraint name, if one was given.
    pub name: Option<String>,
    /// The CHECK expression.
    pub expr: Expr,
    /// SQL text of the CHECK expression. `TableSchema::without_column` searches
    /// this text to decide whether the constraint mentions a dropped column.
    pub sql: String,
}
677
/// Foreign key definition stored in schema.
#[derive(Debug, Clone)]
pub struct ForeignKeySchemaEntry {
    /// Constraint name, if one was given.
    pub name: Option<String>,
    /// Referencing columns as positions into this table's `TableSchema::columns`
    /// (shifted down by `TableSchema::without_column` when an earlier column is dropped).
    pub columns: Vec<u16>,
    /// Name of the referenced table.
    pub foreign_table: String,
    /// Referenced columns, by name.
    /// NOTE(review): presumably parallel to `columns` — confirm.
    pub referred_columns: Vec<String>,
    /// Action taken when a referenced row is deleted.
    pub on_delete: crate::parser::ReferentialAction,
    /// Action taken when a referenced key is updated.
    pub on_update: crate::parser::ReferentialAction,
}
688
/// Table schema stored in the _schema table.
///
/// The public fields describe the table; the private `*_cache` fields are
/// derived from them in `TableSchema::with_drops` and must be refreshed with
/// `TableSchema::rebuild` after the public fields are mutated in place.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Live (non-dropped) columns in logical order.
    pub columns: Vec<ColumnDef>,
    /// Primary-key columns as positions into `columns`.
    pub primary_key_columns: Vec<u16>,
    /// Indexes defined on the table.
    pub indices: Vec<IndexDef>,
    /// Table-level CHECK constraints.
    pub check_constraints: Vec<TableCheckDef>,
    /// Foreign keys declared on the table.
    pub foreign_keys: Vec<ForeignKeySchemaEntry>,
    /// Bit flags; see `TABLE_FLAG_STRICT`. The constructors initialise this to 0.
    pub flags: u8,
    /// `primary_key_columns` widened to `usize`.
    pk_idx_cache: Vec<usize>,
    /// Positions into `columns` of every column that is not part of the primary
    /// key, in ascending order.
    non_pk_idx_cache: Vec<usize>,
    /// Sorted physical slots dropped via DROP COLUMN.
    dropped_non_pk_slots: Vec<u16>,
    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
    decode_mapping_cache: Vec<usize>,
    /// Logical non-PK order -> physical encoding position.
    encoding_positions_cache: Vec<u16>,
    /// `true` when any column has `generated_kind == Some(GeneratedKind::Virtual)`.
    has_virtual_columns_cache: bool,
}
709
710impl TableSchema {
711    pub fn new(
712        name: String,
713        columns: Vec<ColumnDef>,
714        primary_key_columns: Vec<u16>,
715        indices: Vec<IndexDef>,
716        check_constraints: Vec<TableCheckDef>,
717        foreign_keys: Vec<ForeignKeySchemaEntry>,
718    ) -> Self {
719        Self::with_drops(
720            name,
721            columns,
722            primary_key_columns,
723            indices,
724            check_constraints,
725            foreign_keys,
726            vec![],
727        )
728    }
729
730    pub fn with_drops(
731        name: String,
732        columns: Vec<ColumnDef>,
733        primary_key_columns: Vec<u16>,
734        indices: Vec<IndexDef>,
735        check_constraints: Vec<TableCheckDef>,
736        foreign_keys: Vec<ForeignKeySchemaEntry>,
737        dropped_non_pk_slots: Vec<u16>,
738    ) -> Self {
739        let pk_idx_cache: Vec<usize> = primary_key_columns.iter().map(|&i| i as usize).collect();
740        let non_pk_idx_cache: Vec<usize> = (0..columns.len())
741            .filter(|i| !primary_key_columns.contains(&(*i as u16)))
742            .collect();
743
744        let physical_count = non_pk_idx_cache.len() + dropped_non_pk_slots.len();
745        let mut decode_mapping_cache = vec![usize::MAX; physical_count];
746        let mut encoding_positions_cache = Vec::with_capacity(non_pk_idx_cache.len());
747
748        let mut drop_idx = 0;
749        let mut live_idx = 0;
750        for (phys_pos, slot) in decode_mapping_cache.iter_mut().enumerate() {
751            if drop_idx < dropped_non_pk_slots.len()
752                && dropped_non_pk_slots[drop_idx] as usize == phys_pos
753            {
754                drop_idx += 1;
755            } else {
756                *slot = non_pk_idx_cache[live_idx];
757                encoding_positions_cache.push(phys_pos as u16);
758                live_idx += 1;
759            }
760        }
761
762        let has_virtual_columns_cache = columns.iter().any(|c| {
763            matches!(
764                c.generated_kind,
765                Some(crate::parser::GeneratedKind::Virtual)
766            )
767        });
768
769        Self {
770            name,
771            columns,
772            primary_key_columns,
773            indices,
774            check_constraints,
775            foreign_keys,
776            flags: 0,
777            pk_idx_cache,
778            non_pk_idx_cache,
779            dropped_non_pk_slots,
780            decode_mapping_cache,
781            encoding_positions_cache,
782            has_virtual_columns_cache,
783        }
784    }
785
786    pub fn is_strict(&self) -> bool {
787        self.flags & TABLE_FLAG_STRICT != 0
788    }
789
790    pub fn has_virtual_columns(&self) -> bool {
791        self.has_virtual_columns_cache
792    }
793
794    /// Rebuild caches (preserving dropped slots). Use after mutating fields in place.
795    pub fn rebuild(self) -> Self {
796        let drops = self.dropped_non_pk_slots;
797        Self::with_drops(
798            self.name,
799            self.columns,
800            self.primary_key_columns,
801            self.indices,
802            self.check_constraints,
803            self.foreign_keys,
804            drops,
805        )
806    }
807
808    /// Returns true if any column-level or table-level CHECK constraints exist.
809    pub fn has_checks(&self) -> bool {
810        !self.check_constraints.is_empty() || self.columns.iter().any(|c| c.check_expr.is_some())
811    }
812
813    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
814    pub fn decode_col_mapping(&self) -> &[usize] {
815        &self.decode_mapping_cache
816    }
817
818    /// Logical non-PK order -> physical encoding position.
819    pub fn encoding_positions(&self) -> &[u16] {
820        &self.encoding_positions_cache
821    }
822
823    /// Total physical non-PK column count (live + dropped slots).
824    pub fn physical_non_pk_count(&self) -> usize {
825        self.non_pk_idx_cache.len() + self.dropped_non_pk_slots.len()
826    }
827
828    /// Physical encoding slots that have been dropped via DROP COLUMN.
829    pub fn dropped_non_pk_slots(&self) -> &[u16] {
830        &self.dropped_non_pk_slots
831    }
832
833    /// Return a new schema with the column at `drop_pos` marked as dropped.
834    pub fn without_column(&self, drop_pos: usize) -> Self {
835        let non_pk_order = self
836            .non_pk_idx_cache
837            .iter()
838            .position(|&i| i == drop_pos)
839            .expect("cannot drop PK column via without_column");
840        let physical_slot = self.encoding_positions_cache[non_pk_order];
841
842        let mut new_dropped = self.dropped_non_pk_slots.clone();
843        new_dropped.push(physical_slot);
844        new_dropped.sort();
845
846        let dropped_name = &self.columns[drop_pos].name;
847        let drop_pos_u16 = drop_pos as u16;
848
849        let mut columns: Vec<ColumnDef> = self
850            .columns
851            .iter()
852            .enumerate()
853            .filter(|(i, _)| *i != drop_pos)
854            .map(|(_, c)| {
855                let mut col = c.clone();
856                if col.position > drop_pos_u16 {
857                    col.position -= 1;
858                }
859                col
860            })
861            .collect();
862        for (i, col) in columns.iter_mut().enumerate() {
863            col.position = i as u16;
864        }
865
866        let primary_key_columns: Vec<u16> = self
867            .primary_key_columns
868            .iter()
869            .map(|&p| if p > drop_pos_u16 { p - 1 } else { p })
870            .collect();
871
872        let indices: Vec<IndexDef> = self
873            .indices
874            .iter()
875            .map(|idx| IndexDef {
876                name: idx.name.clone(),
877                columns: idx
878                    .columns
879                    .iter()
880                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
881                    .collect(),
882                unique: idx.unique,
883                predicate_sql: idx.predicate_sql.clone(),
884                predicate_expr: idx.predicate_expr.clone(),
885                collations: idx.collations.clone(),
886            })
887            .collect();
888
889        let foreign_keys: Vec<ForeignKeySchemaEntry> = self
890            .foreign_keys
891            .iter()
892            .map(|fk| ForeignKeySchemaEntry {
893                name: fk.name.clone(),
894                columns: fk
895                    .columns
896                    .iter()
897                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
898                    .collect(),
899                foreign_table: fk.foreign_table.clone(),
900                referred_columns: fk.referred_columns.clone(),
901                on_delete: fk.on_delete,
902                on_update: fk.on_update,
903            })
904            .collect();
905
906        // Filter out table-level CHECKs that reference the dropped column
907        let dropped_lower = dropped_name.to_ascii_lowercase();
908        let check_constraints: Vec<TableCheckDef> = self
909            .check_constraints
910            .iter()
911            .filter(|c| !c.sql.to_ascii_lowercase().contains(&dropped_lower))
912            .cloned()
913            .collect();
914
915        Self::with_drops(
916            self.name.clone(),
917            columns,
918            primary_key_columns,
919            indices,
920            check_constraints,
921            foreign_keys,
922            new_dropped,
923        )
924    }
925}
926
/// Format version written as the first byte by `TableSchema::serialize`.
/// `TableSchema::deserialize` accepts versions 1 through this value.
const SCHEMA_VERSION: u8 = 7;
/// Bit 0 of a table flags byte.
/// NOTE(review): presumably marks a STRICT table and is tested against the schema's
/// `flags` field — no use is visible in this part of the file; confirm.
pub const TABLE_FLAG_STRICT: u8 = 0b0000_0001;
929
/// Append an optional string as a little-endian `u16` byte length followed by the
/// UTF-8 bytes. `None` is written as a bare zero length, so it is indistinguishable
/// from `Some("")` on the wire. The length is cast to `u16` without a range check.
fn write_opt_string(buf: &mut Vec<u8>, s: &Option<String>) {
    let payload: &[u8] = match s {
        Some(text) => text.as_bytes(),
        None => &[],
    };
    buf.extend_from_slice(&(payload.len() as u16).to_le_bytes());
    buf.extend_from_slice(payload);
}
940
/// Read a string written by `write_opt_string` starting at `*pos`, advancing `*pos`
/// past it. A zero length yields `None`; invalid UTF-8 is replaced lossily.
///
/// Panics if `data` is too short for the length prefix or the payload.
fn read_opt_string(data: &[u8], pos: &mut usize) -> Option<String> {
    let prefix = [data[*pos], data[*pos + 1]];
    *pos += 2;
    let len = usize::from(u16::from_le_bytes(prefix));
    if len == 0 {
        return None;
    }
    let end = *pos + len;
    let text = String::from_utf8_lossy(&data[*pos..end]).into_owned();
    *pos = end;
    Some(text)
}
952
/// Read a `u16`-length-prefixed (little-endian) string starting at `*pos`, advancing
/// `*pos` past it. Invalid UTF-8 is replaced lossily.
///
/// Panics if `data` is too short for the length prefix or the payload.
fn read_string(data: &[u8], pos: &mut usize) -> String {
    let prefix = [data[*pos], data[*pos + 1]];
    *pos += 2;
    let end = *pos + usize::from(u16::from_le_bytes(prefix));
    let text = String::from_utf8_lossy(&data[*pos..end]).into_owned();
    *pos = end;
    text
}
960
961impl TableSchema {
962    pub fn serialize(&self) -> Vec<u8> {
963        let mut buf = Vec::new();
964        buf.push(SCHEMA_VERSION);
965
966        let name_bytes = self.name.as_bytes();
967        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
968        buf.extend_from_slice(name_bytes);
969
970        buf.extend_from_slice(&(self.columns.len() as u16).to_le_bytes());
971
972        for col in &self.columns {
973            let col_name = col.name.as_bytes();
974            buf.extend_from_slice(&(col_name.len() as u16).to_le_bytes());
975            buf.extend_from_slice(col_name);
976            buf.push(col.data_type.type_tag());
977            buf.push(if col.nullable { 1 } else { 0 });
978            buf.extend_from_slice(&col.position.to_le_bytes());
979        }
980
981        buf.extend_from_slice(&(self.primary_key_columns.len() as u16).to_le_bytes());
982        for &pk_idx in &self.primary_key_columns {
983            buf.extend_from_slice(&pk_idx.to_le_bytes());
984        }
985
986        buf.extend_from_slice(&(self.indices.len() as u16).to_le_bytes());
987        for idx in &self.indices {
988            let idx_name = idx.name.as_bytes();
989            buf.extend_from_slice(&(idx_name.len() as u16).to_le_bytes());
990            buf.extend_from_slice(idx_name);
991            buf.extend_from_slice(&(idx.columns.len() as u16).to_le_bytes());
992            for &col_idx in &idx.columns {
993                buf.extend_from_slice(&col_idx.to_le_bytes());
994            }
995            buf.push(if idx.unique { 1 } else { 0 });
996        }
997
998        for col in &self.columns {
999            let mut flags: u8 = 0;
1000            if col.default_sql.is_some() {
1001                flags |= 1;
1002            }
1003            if col.check_sql.is_some() {
1004                flags |= 2;
1005            }
1006            buf.push(flags);
1007            if let Some(ref sql) = col.default_sql {
1008                let bytes = sql.as_bytes();
1009                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1010                buf.extend_from_slice(bytes);
1011            }
1012            if let Some(ref sql) = col.check_sql {
1013                let bytes = sql.as_bytes();
1014                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
1015                buf.extend_from_slice(bytes);
1016                write_opt_string(&mut buf, &col.check_name);
1017            }
1018        }
1019
1020        buf.extend_from_slice(&(self.check_constraints.len() as u16).to_le_bytes());
1021        for chk in &self.check_constraints {
1022            write_opt_string(&mut buf, &chk.name);
1023            let sql_bytes = chk.sql.as_bytes();
1024            buf.extend_from_slice(&(sql_bytes.len() as u16).to_le_bytes());
1025            buf.extend_from_slice(sql_bytes);
1026        }
1027
1028        buf.extend_from_slice(&(self.foreign_keys.len() as u16).to_le_bytes());
1029        for fk in &self.foreign_keys {
1030            write_opt_string(&mut buf, &fk.name);
1031            buf.extend_from_slice(&(fk.columns.len() as u16).to_le_bytes());
1032            for &col_idx in &fk.columns {
1033                buf.extend_from_slice(&col_idx.to_le_bytes());
1034            }
1035            let ft_bytes = fk.foreign_table.as_bytes();
1036            buf.extend_from_slice(&(ft_bytes.len() as u16).to_le_bytes());
1037            buf.extend_from_slice(ft_bytes);
1038            buf.extend_from_slice(&(fk.referred_columns.len() as u16).to_le_bytes());
1039            for rc in &fk.referred_columns {
1040                let rc_bytes = rc.as_bytes();
1041                buf.extend_from_slice(&(rc_bytes.len() as u16).to_le_bytes());
1042                buf.extend_from_slice(rc_bytes);
1043            }
1044        }
1045
1046        buf.extend_from_slice(&(self.dropped_non_pk_slots.len() as u16).to_le_bytes());
1047        for &slot in &self.dropped_non_pk_slots {
1048            buf.extend_from_slice(&slot.to_le_bytes());
1049        }
1050
1051        for col in &self.columns {
1052            let kind_tag: u8 = match col.generated_kind {
1053                None => 0,
1054                Some(crate::parser::GeneratedKind::Stored) => 1,
1055                Some(crate::parser::GeneratedKind::Virtual) => 2,
1056            };
1057            buf.push(kind_tag);
1058            if kind_tag != 0 {
1059                let sql = col.generated_sql.as_deref().unwrap_or("");
1060                let bytes = sql.as_bytes();
1061                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1062                buf.extend_from_slice(bytes);
1063            }
1064        }
1065
1066        for idx in &self.indices {
1067            match &idx.predicate_sql {
1068                Some(sql) => {
1069                    buf.push(1);
1070                    let bytes = sql.as_bytes();
1071                    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
1072                    buf.extend_from_slice(bytes);
1073                }
1074                None => buf.push(0),
1075            }
1076        }
1077
1078        for fk in &self.foreign_keys {
1079            buf.push(fk.on_delete as u8);
1080            buf.push(fk.on_update as u8);
1081        }
1082
1083        for col in &self.columns {
1084            buf.push(col.collation as u8);
1085        }
1086        for idx in &self.indices {
1087            let n = idx.collations.len() as u16;
1088            buf.extend_from_slice(&n.to_le_bytes());
1089            for c in &idx.collations {
1090                buf.push(*c as u8);
1091            }
1092        }
1093        buf.push(self.flags);
1094
1095        buf
1096    }
1097
1098    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
1099        let mut pos = 0;
1100
1101        if data.is_empty() || !matches!(data[0], 1 | 2 | 3 | 4 | 5 | 6 | SCHEMA_VERSION) {
1102            return Err(crate::error::SqlError::InvalidValue(
1103                "invalid schema version".into(),
1104            ));
1105        }
1106        let version = data[0];
1107        pos += 1;
1108
1109        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1110        pos += 2;
1111        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
1112        pos += name_len;
1113
1114        let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1115        pos += 2;
1116
1117        let mut columns = Vec::with_capacity(col_count);
1118        for _ in 0..col_count {
1119            let col_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1120            pos += 2;
1121            let col_name = String::from_utf8_lossy(&data[pos..pos + col_name_len]).into_owned();
1122            pos += col_name_len;
1123            let data_type = DataType::from_tag(data[pos]).ok_or_else(|| {
1124                crate::error::SqlError::InvalidValue("unknown data type tag".into())
1125            })?;
1126            pos += 1;
1127            let nullable = data[pos] != 0;
1128            pos += 1;
1129            let position = u16::from_le_bytes([data[pos], data[pos + 1]]);
1130            pos += 2;
1131            columns.push(ColumnDef {
1132                name: col_name,
1133                data_type,
1134                nullable,
1135                position,
1136                default_expr: None,
1137                default_sql: None,
1138                check_expr: None,
1139                check_sql: None,
1140                check_name: None,
1141                is_with_timezone: false,
1142                generated_expr: None,
1143                generated_sql: None,
1144                generated_kind: None,
1145                collation: Collation::Binary,
1146            });
1147        }
1148
1149        let pk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1150        pos += 2;
1151        let mut primary_key_columns = Vec::with_capacity(pk_count);
1152        for _ in 0..pk_count {
1153            let pk_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1154            pos += 2;
1155            primary_key_columns.push(pk_idx);
1156        }
1157
1158        let indices = if version >= 2 && pos + 2 <= data.len() {
1159            let idx_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1160            pos += 2;
1161            let mut idxs = Vec::with_capacity(idx_count);
1162            for _ in 0..idx_count {
1163                let idx_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1164                pos += 2;
1165                let idx_name = String::from_utf8_lossy(&data[pos..pos + idx_name_len]).into_owned();
1166                pos += idx_name_len;
1167                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1168                pos += 2;
1169                let mut cols = Vec::with_capacity(col_count);
1170                for _ in 0..col_count {
1171                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1172                    pos += 2;
1173                    cols.push(col_idx);
1174                }
1175                let unique = data[pos] != 0;
1176                pos += 1;
1177                idxs.push(IndexDef {
1178                    name: idx_name,
1179                    columns: cols,
1180                    unique,
1181                    predicate_sql: None,
1182                    predicate_expr: None,
1183                    collations: vec![],
1184                });
1185            }
1186            idxs
1187        } else {
1188            vec![]
1189        };
1190
1191        let mut check_constraints = Vec::new();
1192        let mut foreign_keys = Vec::new();
1193
1194        if version >= 3 && pos < data.len() {
1195            for col in &mut columns {
1196                let flags = data[pos];
1197                pos += 1;
1198                if flags & 1 != 0 {
1199                    let sql = read_string(data, &mut pos);
1200                    col.default_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1201                        crate::error::SqlError::InvalidValue(format!(
1202                            "cannot parse DEFAULT expression: {sql}"
1203                        ))
1204                    })?);
1205                    col.default_sql = Some(sql);
1206                }
1207                if flags & 2 != 0 {
1208                    let sql = read_string(data, &mut pos);
1209                    col.check_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1210                        crate::error::SqlError::InvalidValue(format!(
1211                            "cannot parse CHECK expression: {sql}"
1212                        ))
1213                    })?);
1214                    col.check_sql = Some(sql);
1215                    col.check_name = read_opt_string(data, &mut pos);
1216                }
1217            }
1218
1219            let chk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1220            pos += 2;
1221            for _ in 0..chk_count {
1222                let name = read_opt_string(data, &mut pos);
1223                let sql = read_string(data, &mut pos);
1224                let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1225                    crate::error::SqlError::InvalidValue(format!(
1226                        "cannot parse CHECK expression: {sql}"
1227                    ))
1228                })?;
1229                check_constraints.push(TableCheckDef { name, expr, sql });
1230            }
1231
1232            let fk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1233            pos += 2;
1234            for _ in 0..fk_count {
1235                let name = read_opt_string(data, &mut pos);
1236                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1237                pos += 2;
1238                let mut cols = Vec::with_capacity(col_count);
1239                for _ in 0..col_count {
1240                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1241                    pos += 2;
1242                    cols.push(col_idx);
1243                }
1244                let foreign_table = read_string(data, &mut pos);
1245                let ref_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1246                pos += 2;
1247                let mut referred_columns = Vec::with_capacity(ref_count);
1248                for _ in 0..ref_count {
1249                    referred_columns.push(read_string(data, &mut pos));
1250                }
1251                foreign_keys.push(ForeignKeySchemaEntry {
1252                    name,
1253                    columns: cols,
1254                    foreign_table,
1255                    referred_columns,
1256                    on_delete: crate::parser::ReferentialAction::NoAction,
1257                    on_update: crate::parser::ReferentialAction::NoAction,
1258                });
1259            }
1260        }
1261        let mut dropped_non_pk_slots = Vec::new();
1262        if version >= 4 && pos + 2 <= data.len() {
1263            let slot_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1264            pos += 2;
1265            for _ in 0..slot_count {
1266                let slot = u16::from_le_bytes([data[pos], data[pos + 1]]);
1267                pos += 2;
1268                dropped_non_pk_slots.push(slot);
1269            }
1270        }
1271        if version >= 5 && pos < data.len() {
1272            for col in &mut columns {
1273                let kind_tag = data[pos];
1274                pos += 1;
1275                if kind_tag != 0 {
1276                    let len = u32::from_le_bytes([
1277                        data[pos],
1278                        data[pos + 1],
1279                        data[pos + 2],
1280                        data[pos + 3],
1281                    ]) as usize;
1282                    pos += 4;
1283                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
1284                    pos += len;
1285                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1286                        crate::error::SqlError::InvalidValue(format!(
1287                            "cannot parse GENERATED expression: {sql}"
1288                        ))
1289                    })?;
1290                    col.generated_sql = Some(sql);
1291                    col.generated_expr = Some(expr);
1292                    col.generated_kind = Some(match kind_tag {
1293                        1 => crate::parser::GeneratedKind::Stored,
1294                        2 => crate::parser::GeneratedKind::Virtual,
1295                        _ => {
1296                            return Err(crate::error::SqlError::InvalidValue(
1297                                "unknown GENERATED kind tag".into(),
1298                            ));
1299                        }
1300                    });
1301                }
1302            }
1303        }
1304        let mut indices = indices;
1305        if version >= 6 && pos < data.len() {
1306            for idx in &mut indices {
1307                let flag = data[pos];
1308                pos += 1;
1309                if flag == 1 {
1310                    let len = u32::from_le_bytes([
1311                        data[pos],
1312                        data[pos + 1],
1313                        data[pos + 2],
1314                        data[pos + 3],
1315                    ]) as usize;
1316                    pos += 4;
1317                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
1318                    pos += len;
1319                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1320                        crate::error::SqlError::InvalidValue(format!(
1321                            "cannot parse partial-index predicate: {sql}"
1322                        ))
1323                    })?;
1324                    idx.predicate_sql = Some(sql);
1325                    idx.predicate_expr = Some(expr);
1326                }
1327            }
1328            for fk in &mut foreign_keys {
1329                fk.on_delete =
1330                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
1331                        crate::error::SqlError::InvalidValue("unknown FK on_delete tag".into())
1332                    })?;
1333                pos += 1;
1334                fk.on_update =
1335                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
1336                        crate::error::SqlError::InvalidValue("unknown FK on_update tag".into())
1337                    })?;
1338                pos += 1;
1339            }
1340        }
1341
1342        let mut columns = columns;
1343        let mut indices = indices;
1344        let mut flags: u8 = 0;
1345        if version >= 7 && pos < data.len() {
1346            for col in &mut columns {
1347                col.collation = Collation::from_tag(data[pos]).ok_or_else(|| {
1348                    crate::error::SqlError::InvalidValue("unknown collation tag".into())
1349                })?;
1350                pos += 1;
1351            }
1352            for idx in &mut indices {
1353                let n = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1354                pos += 2;
1355                let mut colls = Vec::with_capacity(n);
1356                for _ in 0..n {
1357                    colls.push(Collation::from_tag(data[pos]).ok_or_else(|| {
1358                        crate::error::SqlError::InvalidValue("unknown collation tag".into())
1359                    })?);
1360                    pos += 1;
1361                }
1362                idx.collations = colls;
1363            }
1364            if pos < data.len() {
1365                flags = data[pos];
1366                pos += 1;
1367            }
1368        }
1369        let _ = pos;
1370
1371        let mut schema = Self::with_drops(
1372            name,
1373            columns,
1374            primary_key_columns,
1375            indices,
1376            check_constraints,
1377            foreign_keys,
1378            dropped_non_pk_slots,
1379        );
1380        schema.flags = flags;
1381        Ok(schema)
1382    }
1383
1384    /// Get column index by name (case-insensitive).
1385    pub fn column_index(&self, name: &str) -> Option<usize> {
1386        self.columns
1387            .iter()
1388            .position(|c| c.name.eq_ignore_ascii_case(name))
1389    }
1390
1391    /// Get indices of non-PK columns (columns stored in the B+ tree value).
1392    pub fn non_pk_indices(&self) -> &[usize] {
1393        &self.non_pk_idx_cache
1394    }
1395
1396    /// Get the PK column indices as usize.
1397    pub fn pk_indices(&self) -> &[usize] {
1398        &self.pk_idx_cache
1399    }
1400
1401    /// Get index definition by name (case-insensitive).
1402    pub fn index_by_name(&self, name: &str) -> Option<&IndexDef> {
1403        let lower = name.to_ascii_lowercase();
1404        self.indices.iter().find(|i| i.name == lower)
1405    }
1406
1407    /// Get the KV table name for an index.
1408    pub fn index_table_name(table_name: &str, index_name: &str) -> Vec<u8> {
1409        format!("__idx_{table_name}_{index_name}").into_bytes()
1410    }
1411}
1412
/// Result of executing a SQL statement.
#[derive(Debug)]
pub enum ExecutionResult {
    /// A row count.
    /// NOTE(review): presumably the number of rows changed by a data-modifying
    /// statement — confirm against the executor.
    RowsAffected(u64),
    /// A result set; see [`QueryResult`].
    Query(QueryResult),
    /// The statement completed and carries no further payload.
    Ok,
}
1420
/// Result of a SELECT query.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names.
    /// NOTE(review): presumably one per value in each row, in the same order — confirm.
    pub columns: Vec<String>,
    /// Result rows; each row is a list of values.
    pub rows: Vec<Vec<Value>>,
}
1427
1428#[cfg(test)]
1429#[path = "types_tests.rs"]
1430mod tests;