Skip to main content

citadel_sql/
types.rs

1use std::cmp::Ordering;
2use std::fmt;
3use std::hash::{Hash, Hasher};
4
5pub use compact_str::CompactString;
6
7use crate::parser::Expr;
8
9/// SQL data types.
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum DataType {
12    Null,
13    Integer,
14    Real,
15    Text,
16    Blob,
17    Boolean,
18    Time,
19    Date,
20    Timestamp,
21    Interval,
22}
23
24impl DataType {
25    pub fn type_tag(self) -> u8 {
26        match self {
27            DataType::Null => 0,
28            DataType::Blob => 1,
29            DataType::Text => 2,
30            DataType::Boolean => 3,
31            DataType::Integer => 4,
32            DataType::Real => 5,
33            DataType::Time => 6,
34            DataType::Date => 7,
35            DataType::Timestamp => 8,
36            DataType::Interval => 9,
37        }
38    }
39
40    pub fn from_tag(tag: u8) -> Option<Self> {
41        match tag {
42            0 => Some(DataType::Null),
43            1 => Some(DataType::Blob),
44            2 => Some(DataType::Text),
45            3 => Some(DataType::Boolean),
46            4 => Some(DataType::Integer),
47            5 => Some(DataType::Real),
48            6 => Some(DataType::Time),
49            7 => Some(DataType::Date),
50            8 => Some(DataType::Timestamp),
51            9 => Some(DataType::Interval),
52            _ => None,
53        }
54    }
55}
56
57impl fmt::Display for DataType {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        match self {
60            DataType::Null => write!(f, "NULL"),
61            DataType::Integer => write!(f, "INTEGER"),
62            DataType::Real => write!(f, "REAL"),
63            DataType::Text => write!(f, "TEXT"),
64            DataType::Blob => write!(f, "BLOB"),
65            DataType::Boolean => write!(f, "BOOLEAN"),
66            DataType::Time => write!(f, "TIME"),
67            DataType::Date => write!(f, "DATE"),
68            DataType::Timestamp => write!(f, "TIMESTAMP"),
69            DataType::Interval => write!(f, "INTERVAL"),
70        }
71    }
72}
73
74/// SQL value. Temporal epochs: days/µs since 1970-01-01 UTC.
75/// `Date`/`Timestamp` reserve `i{32,64}::{MAX,MIN}` as `±infinity` sentinels.
76#[derive(Debug, Clone, Default)]
77pub enum Value {
78    #[default]
79    Null,
80    Integer(i64),
81    Real(f64),
82    Text(CompactString),
83    Blob(Vec<u8>),
84    Boolean(bool),
85    Time(i64),
86    Date(i32),
87    Timestamp(i64),
88    Interval {
89        months: i32,
90        days: i32,
91        micros: i64,
92    },
93}
94
95impl Value {
96    pub fn data_type(&self) -> DataType {
97        match self {
98            Value::Null => DataType::Null,
99            Value::Integer(_) => DataType::Integer,
100            Value::Real(_) => DataType::Real,
101            Value::Text(_) => DataType::Text,
102            Value::Blob(_) => DataType::Blob,
103            Value::Boolean(_) => DataType::Boolean,
104            Value::Time(_) => DataType::Time,
105            Value::Date(_) => DataType::Date,
106            Value::Timestamp(_) => DataType::Timestamp,
107            Value::Interval { .. } => DataType::Interval,
108        }
109    }
110
111    pub fn is_null(&self) -> bool {
112        matches!(self, Value::Null)
113    }
114
115    /// Returns true for `±infinity` sentinel values on DATE / TIMESTAMP; false otherwise.
116    pub fn is_finite_temporal(&self) -> bool {
117        match self {
118            Value::Date(d) => *d != i32::MAX && *d != i32::MIN,
119            Value::Timestamp(t) => *t != i64::MAX && *t != i64::MIN,
120            _ => true,
121        }
122    }
123
124    /// Attempt to coerce this value to the target type.
125    pub fn coerce_to(&self, target: DataType) -> Option<Value> {
126        match (self, target) {
127            (_, DataType::Null) => Some(Value::Null),
128            (Value::Null, _) => Some(Value::Null),
129            (Value::Integer(i), DataType::Integer) => Some(Value::Integer(*i)),
130            (Value::Integer(i), DataType::Real) => Some(Value::Real(*i as f64)),
131            (Value::Real(r), DataType::Real) => Some(Value::Real(*r)),
132            (Value::Real(r), DataType::Integer) => Some(Value::Integer(*r as i64)),
133            (Value::Text(s), DataType::Text) => Some(Value::Text(s.clone())),
134            (Value::Blob(b), DataType::Blob) => Some(Value::Blob(b.clone())),
135            (Value::Boolean(b), DataType::Boolean) => Some(Value::Boolean(*b)),
136            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
137            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(*i != 0)),
138            (Value::Time(t), DataType::Time) => Some(Value::Time(*t)),
139            (Value::Date(d), DataType::Date) => Some(Value::Date(*d)),
140            (Value::Timestamp(t), DataType::Timestamp) => Some(Value::Timestamp(*t)),
141            (
142                Value::Interval {
143                    months,
144                    days,
145                    micros,
146                },
147                DataType::Interval,
148            ) => Some(Value::Interval {
149                months: *months,
150                days: *days,
151                micros: *micros,
152            }),
153            _ => None,
154        }
155    }
156
157    pub fn coerce_into(self, target: DataType) -> Option<Value> {
158        if self.is_null() || target == DataType::Null {
159            return Some(Value::Null);
160        }
161        if self.data_type() == target {
162            return Some(self);
163        }
164        match (self, target) {
165            (Value::Integer(i), DataType::Real) => Some(Value::Real(i as f64)),
166            (Value::Real(r), DataType::Integer) => Some(Value::Integer(r as i64)),
167            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if b { 1 } else { 0 })),
168            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(i != 0)),
169            (Value::Text(s), DataType::Date) => {
170                crate::datetime::parse_date(&s).ok().map(Value::Date)
171            }
172            (Value::Text(s), DataType::Time) => {
173                crate::datetime::parse_time(&s).ok().map(Value::Time)
174            }
175            (Value::Text(s), DataType::Timestamp) => crate::datetime::parse_timestamp(&s)
176                .ok()
177                .map(Value::Timestamp),
178            (Value::Text(s), DataType::Interval) => {
179                crate::datetime::parse_interval(&s)
180                    .ok()
181                    .map(|(m, d, u)| Value::Interval {
182                        months: m,
183                        days: d,
184                        micros: u,
185                    })
186            }
187            // INTEGER → TIMESTAMP: Unix epoch seconds.
188            (Value::Integer(n), DataType::Timestamp) => {
189                n.checked_mul(1_000_000).map(Value::Timestamp)
190            }
191            (Value::Integer(n), DataType::Date) => {
192                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
193                    Some(Value::Date(n as i32))
194                } else {
195                    None
196                }
197            }
198            (Value::Integer(n), DataType::Time) => {
199                if (0..=86_400_000_000).contains(&n) {
200                    Some(Value::Time(n))
201                } else {
202                    None
203                }
204            }
205            (Value::Integer(n), DataType::Interval) => {
206                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
207                    Some(Value::Interval {
208                        months: 0,
209                        days: n as i32,
210                        micros: 0,
211                    })
212                } else {
213                    None
214                }
215            }
216            (Value::Timestamp(t), DataType::Integer) => Some(Value::Integer(t / 1_000_000)),
217            (Value::Date(d), DataType::Integer) => Some(Value::Integer(d as i64)),
218            (Value::Time(t), DataType::Integer) => Some(Value::Integer(t)),
219            (Value::Date(d), DataType::Timestamp) => {
220                (d as i64).checked_mul(86_400_000_000).map(Value::Timestamp)
221            }
222            (Value::Timestamp(t), DataType::Date) => {
223                // div_euclid floors correctly for negative µs (pre-1970).
224                let days = t.div_euclid(86_400_000_000);
225                if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
226                    Some(Value::Date(days as i32))
227                } else {
228                    None
229                }
230            }
231            (v, DataType::Text)
232                if matches!(
233                    v.data_type(),
234                    DataType::Date | DataType::Time | DataType::Timestamp | DataType::Interval
235                ) =>
236            {
237                Some(Value::Text(v.to_string().into()))
238            }
239            _ => None,
240        }
241    }
242
243    /// Numeric ordering for Integer and Real values (promotes to f64 for mixed).
244    fn numeric_cmp(&self, other: &Value) -> Option<Ordering> {
245        match (self, other) {
246            (Value::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
247            (Value::Real(a), Value::Real(b)) => a.partial_cmp(b),
248            (Value::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
249            (Value::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
250            _ => None,
251        }
252    }
253}
254
255impl PartialEq for Value {
256    // Field-wise for Eq/Hash/Ord transitivity. SQL-level `=` on INTERVAL
257    // normalizes separately (see eval.rs).
258    fn eq(&self, other: &Self) -> bool {
259        match (self, other) {
260            (Value::Null, Value::Null) => true,
261            (Value::Integer(a), Value::Integer(b)) => a == b,
262            (Value::Real(a), Value::Real(b)) => a == b,
263            (Value::Integer(a), Value::Real(b)) => (*a as f64) == *b,
264            (Value::Real(a), Value::Integer(b)) => *a == (*b as f64),
265            (Value::Text(a), Value::Text(b)) => a == b,
266            (Value::Blob(a), Value::Blob(b)) => a == b,
267            (Value::Boolean(a), Value::Boolean(b)) => a == b,
268            (Value::Time(a), Value::Time(b)) => a == b,
269            (Value::Date(a), Value::Date(b)) => a == b,
270            (Value::Timestamp(a), Value::Timestamp(b)) => a == b,
271            (
272                Value::Interval {
273                    months: am,
274                    days: ad,
275                    micros: au,
276                },
277                Value::Interval {
278                    months: bm,
279                    days: bd,
280                    micros: bu,
281                },
282            ) => am == bm && ad == bd && au == bu,
283            _ => false,
284        }
285    }
286}
287
288impl Eq for Value {}
289
290impl Hash for Value {
291    fn hash<H: Hasher>(&self, state: &mut H) {
292        match self {
293            Value::Null => 0u8.hash(state),
294            Value::Integer(i) => {
295                // Hash via f64 bits so Integer(n) and Real(n.0) produce the same hash,
296                // matching the cross-type PartialEq contract.
297                1u8.hash(state);
298                (*i as f64).to_bits().hash(state);
299            }
300            Value::Real(r) => {
301                1u8.hash(state);
302                r.to_bits().hash(state);
303            }
304            Value::Text(s) => {
305                2u8.hash(state);
306                s.hash(state);
307            }
308            Value::Blob(b) => {
309                3u8.hash(state);
310                b.hash(state);
311            }
312            Value::Boolean(b) => {
313                4u8.hash(state);
314                b.hash(state);
315            }
316            Value::Time(t) => {
317                5u8.hash(state);
318                t.hash(state);
319            }
320            Value::Date(d) => {
321                6u8.hash(state);
322                d.hash(state);
323            }
324            Value::Timestamp(t) => {
325                7u8.hash(state);
326                t.hash(state);
327            }
328            Value::Interval {
329                months,
330                days,
331                micros,
332            } => {
333                8u8.hash(state);
334                months.hash(state);
335                days.hash(state);
336                micros.hash(state);
337            }
338        }
339    }
340}
341
342impl PartialOrd for Value {
343    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
344        Some(self.cmp(other))
345    }
346}
347
348impl Ord for Value {
349    // Order: NULL < BOOLEAN < numeric < TIME < DATE < TIMESTAMP < INTERVAL < TEXT < BLOB.
350    // INTERVAL compares field-wise for trait-invariant safety; SQL-level ops normalize.
351    fn cmp(&self, other: &Self) -> Ordering {
352        match (self, other) {
353            (Value::Null, Value::Null) => Ordering::Equal,
354            (Value::Null, _) => Ordering::Less,
355            (_, Value::Null) => Ordering::Greater,
356
357            (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
358            (Value::Boolean(_), _) => Ordering::Less,
359            (_, Value::Boolean(_)) => Ordering::Greater,
360
361            (Value::Integer(_) | Value::Real(_), Value::Integer(_) | Value::Real(_)) => {
362                self.numeric_cmp(other).unwrap_or(Ordering::Equal)
363            }
364            (Value::Integer(_) | Value::Real(_), _) => Ordering::Less,
365            (_, Value::Integer(_) | Value::Real(_)) => Ordering::Greater,
366
367            (Value::Time(a), Value::Time(b)) => a.cmp(b),
368            (Value::Time(_), _) => Ordering::Less,
369            (_, Value::Time(_)) => Ordering::Greater,
370
371            (Value::Date(a), Value::Date(b)) => a.cmp(b),
372            (Value::Date(_), _) => Ordering::Less,
373            (_, Value::Date(_)) => Ordering::Greater,
374
375            (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
376            (Value::Timestamp(_), _) => Ordering::Less,
377            (_, Value::Timestamp(_)) => Ordering::Greater,
378
379            (
380                Value::Interval {
381                    months: am,
382                    days: ad,
383                    micros: au,
384                },
385                Value::Interval {
386                    months: bm,
387                    days: bd,
388                    micros: bu,
389                },
390            ) => am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu)),
391            (Value::Interval { .. }, _) => Ordering::Less,
392            (_, Value::Interval { .. }) => Ordering::Greater,
393
394            (Value::Text(a), Value::Text(b)) => a.cmp(b),
395            (Value::Text(_), _) => Ordering::Less,
396            (_, Value::Text(_)) => Ordering::Greater,
397
398            (Value::Blob(a), Value::Blob(b)) => a.cmp(b),
399        }
400    }
401}
402
403impl fmt::Display for Value {
404    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405        match self {
406            Value::Null => write!(f, "NULL"),
407            Value::Integer(i) => write!(f, "{i}"),
408            Value::Real(r) => {
409                if r.fract() == 0.0 && r.is_finite() {
410                    write!(f, "{r:.1}")
411                } else {
412                    write!(f, "{r}")
413                }
414            }
415            Value::Text(s) => write!(f, "{s}"),
416            Value::Blob(b) => write!(f, "X'{}'", hex_encode(b)),
417            Value::Boolean(b) => write!(f, "{}", if *b { "TRUE" } else { "FALSE" }),
418            Value::Time(t) => write!(f, "{}", crate::datetime::format_time(*t)),
419            Value::Date(d) => write!(f, "{}", crate::datetime::format_date(*d)),
420            Value::Timestamp(t) => write!(f, "{}", crate::datetime::format_timestamp(*t)),
421            Value::Interval {
422                months,
423                days,
424                micros,
425            } => {
426                write!(
427                    f,
428                    "{}",
429                    crate::datetime::format_interval(*months, *days, *micros)
430                )
431            }
432        }
433    }
434}
435
436fn hex_encode(data: &[u8]) -> String {
437    let mut s = String::with_capacity(data.len() * 2);
438    for byte in data {
439        s.push_str(&format!("{byte:02X}"));
440    }
441    s
442}
443
444/// Column definition.
445#[derive(Debug, Clone)]
446pub struct ColumnDef {
447    pub name: String,
448    pub data_type: DataType,
449    pub nullable: bool,
450    pub position: u16,
451    pub default_expr: Option<Expr>,
452    pub default_sql: Option<String>,
453    pub check_expr: Option<Expr>,
454    pub check_sql: Option<String>,
455    pub check_name: Option<String>,
456    /// Display-only flag for `TIMESTAMPTZ` / `TIMETZ`; storage is i64 µs UTC.
457    pub is_with_timezone: bool,
458    pub generated_expr: Option<Expr>,
459    pub generated_sql: Option<String>,
460    pub generated_kind: Option<crate::parser::GeneratedKind>,
461}
462
463/// Index definition stored as part of the table schema.
464#[derive(Debug, Clone)]
465pub struct IndexDef {
466    pub name: String,
467    pub columns: Vec<u16>,
468    pub unique: bool,
469    pub predicate_sql: Option<String>,
470    pub predicate_expr: Option<crate::parser::Expr>,
471}
472
473/// View definition stored in the _views metadata table.
474#[derive(Debug, Clone)]
475pub struct ViewDef {
476    pub name: String,
477    pub sql: String,
478    pub column_aliases: Vec<String>,
479}
480
481const VIEW_DEF_VERSION: u8 = 1;
482
483impl ViewDef {
484    pub fn serialize(&self) -> Vec<u8> {
485        let mut buf = Vec::new();
486        buf.push(VIEW_DEF_VERSION);
487
488        let name_bytes = self.name.as_bytes();
489        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
490        buf.extend_from_slice(name_bytes);
491
492        let sql_bytes = self.sql.as_bytes();
493        buf.extend_from_slice(&(sql_bytes.len() as u32).to_le_bytes());
494        buf.extend_from_slice(sql_bytes);
495
496        buf.extend_from_slice(&(self.column_aliases.len() as u16).to_le_bytes());
497        for alias in &self.column_aliases {
498            let alias_bytes = alias.as_bytes();
499            buf.extend_from_slice(&(alias_bytes.len() as u16).to_le_bytes());
500            buf.extend_from_slice(alias_bytes);
501        }
502
503        buf
504    }
505
506    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
507        if data.is_empty() || data[0] != VIEW_DEF_VERSION {
508            return Err(crate::error::SqlError::InvalidValue(
509                "invalid view definition version".into(),
510            ));
511        }
512        let mut pos = 1;
513
514        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
515        pos += 2;
516        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
517        pos += name_len;
518
519        let sql_len =
520            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
521        pos += 4;
522        let sql = String::from_utf8_lossy(&data[pos..pos + sql_len]).into_owned();
523        pos += sql_len;
524
525        let alias_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
526        pos += 2;
527        let mut column_aliases = Vec::with_capacity(alias_count);
528        for _ in 0..alias_count {
529            let alias_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
530            pos += 2;
531            let alias = String::from_utf8_lossy(&data[pos..pos + alias_len]).into_owned();
532            pos += alias_len;
533            column_aliases.push(alias);
534        }
535
536        Ok(Self {
537            name,
538            sql,
539            column_aliases,
540        })
541    }
542}
543
544/// Table-level CHECK constraint stored in schema.
545#[derive(Debug, Clone)]
546pub struct TableCheckDef {
547    pub name: Option<String>,
548    pub expr: Expr,
549    pub sql: String,
550}
551
552/// Foreign key definition stored in schema.
553#[derive(Debug, Clone)]
554pub struct ForeignKeySchemaEntry {
555    pub name: Option<String>,
556    pub columns: Vec<u16>,
557    pub foreign_table: String,
558    pub referred_columns: Vec<String>,
559    pub on_delete: crate::parser::ReferentialAction,
560    pub on_update: crate::parser::ReferentialAction,
561}
562
563/// Table schema stored in the _schema table.
564#[derive(Debug, Clone)]
565pub struct TableSchema {
566    pub name: String,
567    pub columns: Vec<ColumnDef>,
568    pub primary_key_columns: Vec<u16>,
569    pub indices: Vec<IndexDef>,
570    pub check_constraints: Vec<TableCheckDef>,
571    pub foreign_keys: Vec<ForeignKeySchemaEntry>,
572    pk_idx_cache: Vec<usize>,
573    non_pk_idx_cache: Vec<usize>,
574    /// Sorted physical slots dropped via DROP COLUMN.
575    dropped_non_pk_slots: Vec<u16>,
576    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
577    decode_mapping_cache: Vec<usize>,
578    /// Logical non-PK order -> physical encoding position.
579    encoding_positions_cache: Vec<u16>,
580    has_virtual_columns_cache: bool,
581}
582
583impl TableSchema {
584    pub fn new(
585        name: String,
586        columns: Vec<ColumnDef>,
587        primary_key_columns: Vec<u16>,
588        indices: Vec<IndexDef>,
589        check_constraints: Vec<TableCheckDef>,
590        foreign_keys: Vec<ForeignKeySchemaEntry>,
591    ) -> Self {
592        Self::with_drops(
593            name,
594            columns,
595            primary_key_columns,
596            indices,
597            check_constraints,
598            foreign_keys,
599            vec![],
600        )
601    }
602
603    pub fn with_drops(
604        name: String,
605        columns: Vec<ColumnDef>,
606        primary_key_columns: Vec<u16>,
607        indices: Vec<IndexDef>,
608        check_constraints: Vec<TableCheckDef>,
609        foreign_keys: Vec<ForeignKeySchemaEntry>,
610        dropped_non_pk_slots: Vec<u16>,
611    ) -> Self {
612        let pk_idx_cache: Vec<usize> = primary_key_columns.iter().map(|&i| i as usize).collect();
613        let non_pk_idx_cache: Vec<usize> = (0..columns.len())
614            .filter(|i| !primary_key_columns.contains(&(*i as u16)))
615            .collect();
616
617        let physical_count = non_pk_idx_cache.len() + dropped_non_pk_slots.len();
618        let mut decode_mapping_cache = vec![usize::MAX; physical_count];
619        let mut encoding_positions_cache = Vec::with_capacity(non_pk_idx_cache.len());
620
621        let mut drop_idx = 0;
622        let mut live_idx = 0;
623        for (phys_pos, slot) in decode_mapping_cache.iter_mut().enumerate() {
624            if drop_idx < dropped_non_pk_slots.len()
625                && dropped_non_pk_slots[drop_idx] as usize == phys_pos
626            {
627                drop_idx += 1;
628            } else {
629                *slot = non_pk_idx_cache[live_idx];
630                encoding_positions_cache.push(phys_pos as u16);
631                live_idx += 1;
632            }
633        }
634
635        let has_virtual_columns_cache = columns.iter().any(|c| {
636            matches!(
637                c.generated_kind,
638                Some(crate::parser::GeneratedKind::Virtual)
639            )
640        });
641
642        Self {
643            name,
644            columns,
645            primary_key_columns,
646            indices,
647            check_constraints,
648            foreign_keys,
649            pk_idx_cache,
650            non_pk_idx_cache,
651            dropped_non_pk_slots,
652            decode_mapping_cache,
653            encoding_positions_cache,
654            has_virtual_columns_cache,
655        }
656    }
657
658    pub fn has_virtual_columns(&self) -> bool {
659        self.has_virtual_columns_cache
660    }
661
662    /// Rebuild caches (preserving dropped slots). Use after mutating fields in place.
663    pub fn rebuild(self) -> Self {
664        let drops = self.dropped_non_pk_slots;
665        Self::with_drops(
666            self.name,
667            self.columns,
668            self.primary_key_columns,
669            self.indices,
670            self.check_constraints,
671            self.foreign_keys,
672            drops,
673        )
674    }
675
676    /// Returns true if any column-level or table-level CHECK constraints exist.
677    pub fn has_checks(&self) -> bool {
678        !self.check_constraints.is_empty() || self.columns.iter().any(|c| c.check_expr.is_some())
679    }
680
681    /// Physical position -> logical column index. `usize::MAX` for dropped slots.
682    pub fn decode_col_mapping(&self) -> &[usize] {
683        &self.decode_mapping_cache
684    }
685
686    /// Logical non-PK order -> physical encoding position.
687    pub fn encoding_positions(&self) -> &[u16] {
688        &self.encoding_positions_cache
689    }
690
691    /// Total physical non-PK column count (live + dropped slots).
692    pub fn physical_non_pk_count(&self) -> usize {
693        self.non_pk_idx_cache.len() + self.dropped_non_pk_slots.len()
694    }
695
696    /// Physical encoding slots that have been dropped via DROP COLUMN.
697    pub fn dropped_non_pk_slots(&self) -> &[u16] {
698        &self.dropped_non_pk_slots
699    }
700
701    /// Return a new schema with the column at `drop_pos` marked as dropped.
702    pub fn without_column(&self, drop_pos: usize) -> Self {
703        let non_pk_order = self
704            .non_pk_idx_cache
705            .iter()
706            .position(|&i| i == drop_pos)
707            .expect("cannot drop PK column via without_column");
708        let physical_slot = self.encoding_positions_cache[non_pk_order];
709
710        let mut new_dropped = self.dropped_non_pk_slots.clone();
711        new_dropped.push(physical_slot);
712        new_dropped.sort();
713
714        let dropped_name = &self.columns[drop_pos].name;
715        let drop_pos_u16 = drop_pos as u16;
716
717        let mut columns: Vec<ColumnDef> = self
718            .columns
719            .iter()
720            .enumerate()
721            .filter(|(i, _)| *i != drop_pos)
722            .map(|(_, c)| {
723                let mut col = c.clone();
724                if col.position > drop_pos_u16 {
725                    col.position -= 1;
726                }
727                col
728            })
729            .collect();
730        for (i, col) in columns.iter_mut().enumerate() {
731            col.position = i as u16;
732        }
733
734        let primary_key_columns: Vec<u16> = self
735            .primary_key_columns
736            .iter()
737            .map(|&p| if p > drop_pos_u16 { p - 1 } else { p })
738            .collect();
739
740        let indices: Vec<IndexDef> = self
741            .indices
742            .iter()
743            .map(|idx| IndexDef {
744                name: idx.name.clone(),
745                columns: idx
746                    .columns
747                    .iter()
748                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
749                    .collect(),
750                unique: idx.unique,
751                predicate_sql: idx.predicate_sql.clone(),
752                predicate_expr: idx.predicate_expr.clone(),
753            })
754            .collect();
755
756        let foreign_keys: Vec<ForeignKeySchemaEntry> = self
757            .foreign_keys
758            .iter()
759            .map(|fk| ForeignKeySchemaEntry {
760                name: fk.name.clone(),
761                columns: fk
762                    .columns
763                    .iter()
764                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
765                    .collect(),
766                foreign_table: fk.foreign_table.clone(),
767                referred_columns: fk.referred_columns.clone(),
768                on_delete: fk.on_delete,
769                on_update: fk.on_update,
770            })
771            .collect();
772
773        // Filter out table-level CHECKs that reference the dropped column
774        let dropped_lower = dropped_name.to_ascii_lowercase();
775        let check_constraints: Vec<TableCheckDef> = self
776            .check_constraints
777            .iter()
778            .filter(|c| !c.sql.to_ascii_lowercase().contains(&dropped_lower))
779            .cloned()
780            .collect();
781
782        Self::with_drops(
783            self.name.clone(),
784            columns,
785            primary_key_columns,
786            indices,
787            check_constraints,
788            foreign_keys,
789            new_dropped,
790        )
791    }
792}
793
794const SCHEMA_VERSION: u8 = 6;
795
796fn write_opt_string(buf: &mut Vec<u8>, s: &Option<String>) {
797    match s {
798        Some(s) => {
799            let bytes = s.as_bytes();
800            buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
801            buf.extend_from_slice(bytes);
802        }
803        None => buf.extend_from_slice(&0u16.to_le_bytes()),
804    }
805}
806
807fn read_opt_string(data: &[u8], pos: &mut usize) -> Option<String> {
808    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
809    *pos += 2;
810    if len == 0 {
811        None
812    } else {
813        let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
814        *pos += len;
815        Some(s)
816    }
817}
818
819fn read_string(data: &[u8], pos: &mut usize) -> String {
820    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
821    *pos += 2;
822    let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
823    *pos += len;
824    s
825}
826
827impl TableSchema {
828    pub fn serialize(&self) -> Vec<u8> {
829        let mut buf = Vec::new();
830        buf.push(SCHEMA_VERSION);
831
832        let name_bytes = self.name.as_bytes();
833        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
834        buf.extend_from_slice(name_bytes);
835
836        buf.extend_from_slice(&(self.columns.len() as u16).to_le_bytes());
837
838        for col in &self.columns {
839            let col_name = col.name.as_bytes();
840            buf.extend_from_slice(&(col_name.len() as u16).to_le_bytes());
841            buf.extend_from_slice(col_name);
842            buf.push(col.data_type.type_tag());
843            buf.push(if col.nullable { 1 } else { 0 });
844            buf.extend_from_slice(&col.position.to_le_bytes());
845        }
846
847        buf.extend_from_slice(&(self.primary_key_columns.len() as u16).to_le_bytes());
848        for &pk_idx in &self.primary_key_columns {
849            buf.extend_from_slice(&pk_idx.to_le_bytes());
850        }
851
852        buf.extend_from_slice(&(self.indices.len() as u16).to_le_bytes());
853        for idx in &self.indices {
854            let idx_name = idx.name.as_bytes();
855            buf.extend_from_slice(&(idx_name.len() as u16).to_le_bytes());
856            buf.extend_from_slice(idx_name);
857            buf.extend_from_slice(&(idx.columns.len() as u16).to_le_bytes());
858            for &col_idx in &idx.columns {
859                buf.extend_from_slice(&col_idx.to_le_bytes());
860            }
861            buf.push(if idx.unique { 1 } else { 0 });
862        }
863
864        for col in &self.columns {
865            let mut flags: u8 = 0;
866            if col.default_sql.is_some() {
867                flags |= 1;
868            }
869            if col.check_sql.is_some() {
870                flags |= 2;
871            }
872            buf.push(flags);
873            if let Some(ref sql) = col.default_sql {
874                let bytes = sql.as_bytes();
875                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
876                buf.extend_from_slice(bytes);
877            }
878            if let Some(ref sql) = col.check_sql {
879                let bytes = sql.as_bytes();
880                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
881                buf.extend_from_slice(bytes);
882                write_opt_string(&mut buf, &col.check_name);
883            }
884        }
885
886        buf.extend_from_slice(&(self.check_constraints.len() as u16).to_le_bytes());
887        for chk in &self.check_constraints {
888            write_opt_string(&mut buf, &chk.name);
889            let sql_bytes = chk.sql.as_bytes();
890            buf.extend_from_slice(&(sql_bytes.len() as u16).to_le_bytes());
891            buf.extend_from_slice(sql_bytes);
892        }
893
894        buf.extend_from_slice(&(self.foreign_keys.len() as u16).to_le_bytes());
895        for fk in &self.foreign_keys {
896            write_opt_string(&mut buf, &fk.name);
897            buf.extend_from_slice(&(fk.columns.len() as u16).to_le_bytes());
898            for &col_idx in &fk.columns {
899                buf.extend_from_slice(&col_idx.to_le_bytes());
900            }
901            let ft_bytes = fk.foreign_table.as_bytes();
902            buf.extend_from_slice(&(ft_bytes.len() as u16).to_le_bytes());
903            buf.extend_from_slice(ft_bytes);
904            buf.extend_from_slice(&(fk.referred_columns.len() as u16).to_le_bytes());
905            for rc in &fk.referred_columns {
906                let rc_bytes = rc.as_bytes();
907                buf.extend_from_slice(&(rc_bytes.len() as u16).to_le_bytes());
908                buf.extend_from_slice(rc_bytes);
909            }
910        }
911
912        buf.extend_from_slice(&(self.dropped_non_pk_slots.len() as u16).to_le_bytes());
913        for &slot in &self.dropped_non_pk_slots {
914            buf.extend_from_slice(&slot.to_le_bytes());
915        }
916
917        for col in &self.columns {
918            let kind_tag: u8 = match col.generated_kind {
919                None => 0,
920                Some(crate::parser::GeneratedKind::Stored) => 1,
921                Some(crate::parser::GeneratedKind::Virtual) => 2,
922            };
923            buf.push(kind_tag);
924            if kind_tag != 0 {
925                let sql = col.generated_sql.as_deref().unwrap_or("");
926                let bytes = sql.as_bytes();
927                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
928                buf.extend_from_slice(bytes);
929            }
930        }
931
932        for idx in &self.indices {
933            match &idx.predicate_sql {
934                Some(sql) => {
935                    buf.push(1);
936                    let bytes = sql.as_bytes();
937                    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
938                    buf.extend_from_slice(bytes);
939                }
940                None => buf.push(0),
941            }
942        }
943
944        for fk in &self.foreign_keys {
945            buf.push(fk.on_delete as u8);
946            buf.push(fk.on_update as u8);
947        }
948
949        buf
950    }
951
952    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
953        let mut pos = 0;
954
955        if data.is_empty() || !matches!(data[0], 1 | 2 | 3 | 4 | 5 | SCHEMA_VERSION) {
956            return Err(crate::error::SqlError::InvalidValue(
957                "invalid schema version".into(),
958            ));
959        }
960        let version = data[0];
961        pos += 1;
962
963        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
964        pos += 2;
965        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
966        pos += name_len;
967
968        let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
969        pos += 2;
970
971        let mut columns = Vec::with_capacity(col_count);
972        for _ in 0..col_count {
973            let col_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
974            pos += 2;
975            let col_name = String::from_utf8_lossy(&data[pos..pos + col_name_len]).into_owned();
976            pos += col_name_len;
977            let data_type = DataType::from_tag(data[pos]).ok_or_else(|| {
978                crate::error::SqlError::InvalidValue("unknown data type tag".into())
979            })?;
980            pos += 1;
981            let nullable = data[pos] != 0;
982            pos += 1;
983            let position = u16::from_le_bytes([data[pos], data[pos + 1]]);
984            pos += 2;
985            columns.push(ColumnDef {
986                name: col_name,
987                data_type,
988                nullable,
989                position,
990                default_expr: None,
991                default_sql: None,
992                check_expr: None,
993                check_sql: None,
994                check_name: None,
995                is_with_timezone: false,
996                generated_expr: None,
997                generated_sql: None,
998                generated_kind: None,
999            });
1000        }
1001
1002        let pk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1003        pos += 2;
1004        let mut primary_key_columns = Vec::with_capacity(pk_count);
1005        for _ in 0..pk_count {
1006            let pk_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1007            pos += 2;
1008            primary_key_columns.push(pk_idx);
1009        }
1010
1011        let indices = if version >= 2 && pos + 2 <= data.len() {
1012            let idx_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1013            pos += 2;
1014            let mut idxs = Vec::with_capacity(idx_count);
1015            for _ in 0..idx_count {
1016                let idx_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1017                pos += 2;
1018                let idx_name = String::from_utf8_lossy(&data[pos..pos + idx_name_len]).into_owned();
1019                pos += idx_name_len;
1020                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1021                pos += 2;
1022                let mut cols = Vec::with_capacity(col_count);
1023                for _ in 0..col_count {
1024                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1025                    pos += 2;
1026                    cols.push(col_idx);
1027                }
1028                let unique = data[pos] != 0;
1029                pos += 1;
1030                idxs.push(IndexDef {
1031                    name: idx_name,
1032                    columns: cols,
1033                    unique,
1034                    predicate_sql: None,
1035                    predicate_expr: None,
1036                });
1037            }
1038            idxs
1039        } else {
1040            vec![]
1041        };
1042
1043        let mut check_constraints = Vec::new();
1044        let mut foreign_keys = Vec::new();
1045
1046        if version >= 3 && pos < data.len() {
1047            for col in &mut columns {
1048                let flags = data[pos];
1049                pos += 1;
1050                if flags & 1 != 0 {
1051                    let sql = read_string(data, &mut pos);
1052                    col.default_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1053                        crate::error::SqlError::InvalidValue(format!(
1054                            "cannot parse DEFAULT expression: {sql}"
1055                        ))
1056                    })?);
1057                    col.default_sql = Some(sql);
1058                }
1059                if flags & 2 != 0 {
1060                    let sql = read_string(data, &mut pos);
1061                    col.check_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1062                        crate::error::SqlError::InvalidValue(format!(
1063                            "cannot parse CHECK expression: {sql}"
1064                        ))
1065                    })?);
1066                    col.check_sql = Some(sql);
1067                    col.check_name = read_opt_string(data, &mut pos);
1068                }
1069            }
1070
1071            let chk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1072            pos += 2;
1073            for _ in 0..chk_count {
1074                let name = read_opt_string(data, &mut pos);
1075                let sql = read_string(data, &mut pos);
1076                let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1077                    crate::error::SqlError::InvalidValue(format!(
1078                        "cannot parse CHECK expression: {sql}"
1079                    ))
1080                })?;
1081                check_constraints.push(TableCheckDef { name, expr, sql });
1082            }
1083
1084            let fk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1085            pos += 2;
1086            for _ in 0..fk_count {
1087                let name = read_opt_string(data, &mut pos);
1088                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1089                pos += 2;
1090                let mut cols = Vec::with_capacity(col_count);
1091                for _ in 0..col_count {
1092                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1093                    pos += 2;
1094                    cols.push(col_idx);
1095                }
1096                let foreign_table = read_string(data, &mut pos);
1097                let ref_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1098                pos += 2;
1099                let mut referred_columns = Vec::with_capacity(ref_count);
1100                for _ in 0..ref_count {
1101                    referred_columns.push(read_string(data, &mut pos));
1102                }
1103                foreign_keys.push(ForeignKeySchemaEntry {
1104                    name,
1105                    columns: cols,
1106                    foreign_table,
1107                    referred_columns,
1108                    on_delete: crate::parser::ReferentialAction::NoAction,
1109                    on_update: crate::parser::ReferentialAction::NoAction,
1110                });
1111            }
1112        }
1113        let mut dropped_non_pk_slots = Vec::new();
1114        if version >= 4 && pos + 2 <= data.len() {
1115            let slot_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1116            pos += 2;
1117            for _ in 0..slot_count {
1118                let slot = u16::from_le_bytes([data[pos], data[pos + 1]]);
1119                pos += 2;
1120                dropped_non_pk_slots.push(slot);
1121            }
1122        }
1123        if version >= 5 && pos < data.len() {
1124            for col in &mut columns {
1125                let kind_tag = data[pos];
1126                pos += 1;
1127                if kind_tag != 0 {
1128                    let len = u32::from_le_bytes([
1129                        data[pos],
1130                        data[pos + 1],
1131                        data[pos + 2],
1132                        data[pos + 3],
1133                    ]) as usize;
1134                    pos += 4;
1135                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
1136                    pos += len;
1137                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1138                        crate::error::SqlError::InvalidValue(format!(
1139                            "cannot parse GENERATED expression: {sql}"
1140                        ))
1141                    })?;
1142                    col.generated_sql = Some(sql);
1143                    col.generated_expr = Some(expr);
1144                    col.generated_kind = Some(match kind_tag {
1145                        1 => crate::parser::GeneratedKind::Stored,
1146                        2 => crate::parser::GeneratedKind::Virtual,
1147                        _ => {
1148                            return Err(crate::error::SqlError::InvalidValue(
1149                                "unknown GENERATED kind tag".into(),
1150                            ));
1151                        }
1152                    });
1153                }
1154            }
1155        }
1156        let mut indices = indices;
1157        if version >= 6 && pos < data.len() {
1158            for idx in &mut indices {
1159                let flag = data[pos];
1160                pos += 1;
1161                if flag == 1 {
1162                    let len = u32::from_le_bytes([
1163                        data[pos],
1164                        data[pos + 1],
1165                        data[pos + 2],
1166                        data[pos + 3],
1167                    ]) as usize;
1168                    pos += 4;
1169                    let sql = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
1170                    pos += len;
1171                    let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1172                        crate::error::SqlError::InvalidValue(format!(
1173                            "cannot parse partial-index predicate: {sql}"
1174                        ))
1175                    })?;
1176                    idx.predicate_sql = Some(sql);
1177                    idx.predicate_expr = Some(expr);
1178                }
1179            }
1180            for fk in &mut foreign_keys {
1181                fk.on_delete =
1182                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
1183                        crate::error::SqlError::InvalidValue("unknown FK on_delete tag".into())
1184                    })?;
1185                pos += 1;
1186                fk.on_update =
1187                    crate::parser::ReferentialAction::from_tag(data[pos]).ok_or_else(|| {
1188                        crate::error::SqlError::InvalidValue("unknown FK on_update tag".into())
1189                    })?;
1190                pos += 1;
1191            }
1192        }
1193        let _ = pos;
1194
1195        Ok(Self::with_drops(
1196            name,
1197            columns,
1198            primary_key_columns,
1199            indices,
1200            check_constraints,
1201            foreign_keys,
1202            dropped_non_pk_slots,
1203        ))
1204    }
1205
1206    /// Get column index by name (case-insensitive).
1207    pub fn column_index(&self, name: &str) -> Option<usize> {
1208        self.columns
1209            .iter()
1210            .position(|c| c.name.eq_ignore_ascii_case(name))
1211    }
1212
1213    /// Get indices of non-PK columns (columns stored in the B+ tree value).
1214    pub fn non_pk_indices(&self) -> &[usize] {
1215        &self.non_pk_idx_cache
1216    }
1217
1218    /// Get the PK column indices as usize.
1219    pub fn pk_indices(&self) -> &[usize] {
1220        &self.pk_idx_cache
1221    }
1222
1223    /// Get index definition by name (case-insensitive).
1224    pub fn index_by_name(&self, name: &str) -> Option<&IndexDef> {
1225        let lower = name.to_ascii_lowercase();
1226        self.indices.iter().find(|i| i.name == lower)
1227    }
1228
1229    /// Get the KV table name for an index.
1230    pub fn index_table_name(table_name: &str, index_name: &str) -> Vec<u8> {
1231        format!("__idx_{table_name}_{index_name}").into_bytes()
1232    }
1233}
1234
1235/// Result of executing a SQL statement.
1236#[derive(Debug)]
1237pub enum ExecutionResult {
1238    RowsAffected(u64),
1239    Query(QueryResult),
1240    Ok,
1241}
1242
1243/// Result of a SELECT query.
1244#[derive(Debug, Clone)]
1245pub struct QueryResult {
1246    pub columns: Vec<String>,
1247    pub rows: Vec<Vec<Value>>,
1248}
1249
1250#[cfg(test)]
1251#[path = "types_tests.rs"]
1252mod tests;