Skip to main content

citadel_sql/
types.rs

1use std::cmp::Ordering;
2use std::fmt;
3use std::hash::{Hash, Hasher};
4
5pub use compact_str::CompactString;
6
7use crate::parser::Expr;
8
9/// SQL data types.
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum DataType {
12    Null,
13    Integer,
14    Real,
15    Text,
16    Blob,
17    Boolean,
18    Time,
19    Date,
20    Timestamp,
21    Interval,
22}
23
24impl DataType {
25    pub fn type_tag(self) -> u8 {
26        match self {
27            DataType::Null => 0,
28            DataType::Blob => 1,
29            DataType::Text => 2,
30            DataType::Boolean => 3,
31            DataType::Integer => 4,
32            DataType::Real => 5,
33            DataType::Time => 6,
34            DataType::Date => 7,
35            DataType::Timestamp => 8,
36            DataType::Interval => 9,
37        }
38    }
39
40    pub fn from_tag(tag: u8) -> Option<Self> {
41        match tag {
42            0 => Some(DataType::Null),
43            1 => Some(DataType::Blob),
44            2 => Some(DataType::Text),
45            3 => Some(DataType::Boolean),
46            4 => Some(DataType::Integer),
47            5 => Some(DataType::Real),
48            6 => Some(DataType::Time),
49            7 => Some(DataType::Date),
50            8 => Some(DataType::Timestamp),
51            9 => Some(DataType::Interval),
52            _ => None,
53        }
54    }
55}
56
57impl fmt::Display for DataType {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        match self {
60            DataType::Null => write!(f, "NULL"),
61            DataType::Integer => write!(f, "INTEGER"),
62            DataType::Real => write!(f, "REAL"),
63            DataType::Text => write!(f, "TEXT"),
64            DataType::Blob => write!(f, "BLOB"),
65            DataType::Boolean => write!(f, "BOOLEAN"),
66            DataType::Time => write!(f, "TIME"),
67            DataType::Date => write!(f, "DATE"),
68            DataType::Timestamp => write!(f, "TIMESTAMP"),
69            DataType::Interval => write!(f, "INTERVAL"),
70        }
71    }
72}
73
74/// SQL value. Temporal epochs: days/µs since 1970-01-01 UTC.
75/// `Date`/`Timestamp` reserve `i{32,64}::{MAX,MIN}` as `±infinity` sentinels.
76#[derive(Debug, Clone, Default)]
77pub enum Value {
78    #[default]
79    Null,
80    Integer(i64),
81    Real(f64),
82    Text(CompactString),
83    Blob(Vec<u8>),
84    Boolean(bool),
85    Time(i64),
86    Date(i32),
87    Timestamp(i64),
88    Interval {
89        months: i32,
90        days: i32,
91        micros: i64,
92    },
93}
94
95impl Value {
96    pub fn data_type(&self) -> DataType {
97        match self {
98            Value::Null => DataType::Null,
99            Value::Integer(_) => DataType::Integer,
100            Value::Real(_) => DataType::Real,
101            Value::Text(_) => DataType::Text,
102            Value::Blob(_) => DataType::Blob,
103            Value::Boolean(_) => DataType::Boolean,
104            Value::Time(_) => DataType::Time,
105            Value::Date(_) => DataType::Date,
106            Value::Timestamp(_) => DataType::Timestamp,
107            Value::Interval { .. } => DataType::Interval,
108        }
109    }
110
111    pub fn is_null(&self) -> bool {
112        matches!(self, Value::Null)
113    }
114
115    /// Returns true for `±infinity` sentinel values on DATE / TIMESTAMP; false otherwise.
116    pub fn is_finite_temporal(&self) -> bool {
117        match self {
118            Value::Date(d) => *d != i32::MAX && *d != i32::MIN,
119            Value::Timestamp(t) => *t != i64::MAX && *t != i64::MIN,
120            _ => true,
121        }
122    }
123
124    /// Attempt to coerce this value to the target type.
125    pub fn coerce_to(&self, target: DataType) -> Option<Value> {
126        match (self, target) {
127            (_, DataType::Null) => Some(Value::Null),
128            (Value::Null, _) => Some(Value::Null),
129            (Value::Integer(i), DataType::Integer) => Some(Value::Integer(*i)),
130            (Value::Integer(i), DataType::Real) => Some(Value::Real(*i as f64)),
131            (Value::Real(r), DataType::Real) => Some(Value::Real(*r)),
132            (Value::Real(r), DataType::Integer) => Some(Value::Integer(*r as i64)),
133            (Value::Text(s), DataType::Text) => Some(Value::Text(s.clone())),
134            (Value::Blob(b), DataType::Blob) => Some(Value::Blob(b.clone())),
135            (Value::Boolean(b), DataType::Boolean) => Some(Value::Boolean(*b)),
136            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
137            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(*i != 0)),
138            (Value::Time(t), DataType::Time) => Some(Value::Time(*t)),
139            (Value::Date(d), DataType::Date) => Some(Value::Date(*d)),
140            (Value::Timestamp(t), DataType::Timestamp) => Some(Value::Timestamp(*t)),
141            (
142                Value::Interval {
143                    months,
144                    days,
145                    micros,
146                },
147                DataType::Interval,
148            ) => Some(Value::Interval {
149                months: *months,
150                days: *days,
151                micros: *micros,
152            }),
153            _ => None,
154        }
155    }
156
157    pub fn coerce_into(self, target: DataType) -> Option<Value> {
158        if self.is_null() || target == DataType::Null {
159            return Some(Value::Null);
160        }
161        if self.data_type() == target {
162            return Some(self);
163        }
164        match (self, target) {
165            (Value::Integer(i), DataType::Real) => Some(Value::Real(i as f64)),
166            (Value::Real(r), DataType::Integer) => Some(Value::Integer(r as i64)),
167            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if b { 1 } else { 0 })),
168            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(i != 0)),
169            (Value::Text(s), DataType::Date) => {
170                crate::datetime::parse_date(&s).ok().map(Value::Date)
171            }
172            (Value::Text(s), DataType::Time) => {
173                crate::datetime::parse_time(&s).ok().map(Value::Time)
174            }
175            (Value::Text(s), DataType::Timestamp) => crate::datetime::parse_timestamp(&s)
176                .ok()
177                .map(Value::Timestamp),
178            (Value::Text(s), DataType::Interval) => {
179                crate::datetime::parse_interval(&s)
180                    .ok()
181                    .map(|(m, d, u)| Value::Interval {
182                        months: m,
183                        days: d,
184                        micros: u,
185                    })
186            }
187            // INTEGER → TIMESTAMP: Unix epoch seconds.
188            (Value::Integer(n), DataType::Timestamp) => {
189                n.checked_mul(1_000_000).map(Value::Timestamp)
190            }
191            (Value::Integer(n), DataType::Date) => {
192                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
193                    Some(Value::Date(n as i32))
194                } else {
195                    None
196                }
197            }
198            (Value::Integer(n), DataType::Time) => {
199                if (0..=86_400_000_000).contains(&n) {
200                    Some(Value::Time(n))
201                } else {
202                    None
203                }
204            }
205            (Value::Integer(n), DataType::Interval) => {
206                if n >= i32::MIN as i64 && n <= i32::MAX as i64 {
207                    Some(Value::Interval {
208                        months: 0,
209                        days: n as i32,
210                        micros: 0,
211                    })
212                } else {
213                    None
214                }
215            }
216            (Value::Timestamp(t), DataType::Integer) => Some(Value::Integer(t / 1_000_000)),
217            (Value::Date(d), DataType::Integer) => Some(Value::Integer(d as i64)),
218            (Value::Time(t), DataType::Integer) => Some(Value::Integer(t)),
219            (Value::Date(d), DataType::Timestamp) => {
220                (d as i64).checked_mul(86_400_000_000).map(Value::Timestamp)
221            }
222            (Value::Timestamp(t), DataType::Date) => {
223                // div_euclid floors correctly for negative µs (pre-1970).
224                let days = t.div_euclid(86_400_000_000);
225                if days >= i32::MIN as i64 && days <= i32::MAX as i64 {
226                    Some(Value::Date(days as i32))
227                } else {
228                    None
229                }
230            }
231            (v, DataType::Text)
232                if matches!(
233                    v.data_type(),
234                    DataType::Date | DataType::Time | DataType::Timestamp | DataType::Interval
235                ) =>
236            {
237                Some(Value::Text(v.to_string().into()))
238            }
239            _ => None,
240        }
241    }
242
243    /// Numeric ordering for Integer and Real values (promotes to f64 for mixed).
244    fn numeric_cmp(&self, other: &Value) -> Option<Ordering> {
245        match (self, other) {
246            (Value::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
247            (Value::Real(a), Value::Real(b)) => a.partial_cmp(b),
248            (Value::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
249            (Value::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
250            _ => None,
251        }
252    }
253}
254
255impl PartialEq for Value {
256    // Field-wise for Eq/Hash/Ord transitivity. SQL-level `=` on INTERVAL
257    // normalizes separately (see eval.rs).
258    fn eq(&self, other: &Self) -> bool {
259        match (self, other) {
260            (Value::Null, Value::Null) => true,
261            (Value::Integer(a), Value::Integer(b)) => a == b,
262            (Value::Real(a), Value::Real(b)) => a == b,
263            (Value::Integer(a), Value::Real(b)) => (*a as f64) == *b,
264            (Value::Real(a), Value::Integer(b)) => *a == (*b as f64),
265            (Value::Text(a), Value::Text(b)) => a == b,
266            (Value::Blob(a), Value::Blob(b)) => a == b,
267            (Value::Boolean(a), Value::Boolean(b)) => a == b,
268            (Value::Time(a), Value::Time(b)) => a == b,
269            (Value::Date(a), Value::Date(b)) => a == b,
270            (Value::Timestamp(a), Value::Timestamp(b)) => a == b,
271            (
272                Value::Interval {
273                    months: am,
274                    days: ad,
275                    micros: au,
276                },
277                Value::Interval {
278                    months: bm,
279                    days: bd,
280                    micros: bu,
281                },
282            ) => am == bm && ad == bd && au == bu,
283            _ => false,
284        }
285    }
286}
287
288impl Eq for Value {}
289
290impl Hash for Value {
291    fn hash<H: Hasher>(&self, state: &mut H) {
292        match self {
293            Value::Null => 0u8.hash(state),
294            Value::Integer(i) => {
295                // Hash via f64 bits so Integer(n) and Real(n.0) produce the same hash,
296                // matching the cross-type PartialEq contract.
297                1u8.hash(state);
298                (*i as f64).to_bits().hash(state);
299            }
300            Value::Real(r) => {
301                1u8.hash(state);
302                r.to_bits().hash(state);
303            }
304            Value::Text(s) => {
305                2u8.hash(state);
306                s.hash(state);
307            }
308            Value::Blob(b) => {
309                3u8.hash(state);
310                b.hash(state);
311            }
312            Value::Boolean(b) => {
313                4u8.hash(state);
314                b.hash(state);
315            }
316            Value::Time(t) => {
317                5u8.hash(state);
318                t.hash(state);
319            }
320            Value::Date(d) => {
321                6u8.hash(state);
322                d.hash(state);
323            }
324            Value::Timestamp(t) => {
325                7u8.hash(state);
326                t.hash(state);
327            }
328            Value::Interval {
329                months,
330                days,
331                micros,
332            } => {
333                8u8.hash(state);
334                months.hash(state);
335                days.hash(state);
336                micros.hash(state);
337            }
338        }
339    }
340}
341
342impl PartialOrd for Value {
343    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
344        Some(self.cmp(other))
345    }
346}
347
348impl Ord for Value {
349    // Order: NULL < BOOLEAN < numeric < TIME < DATE < TIMESTAMP < INTERVAL < TEXT < BLOB.
350    // INTERVAL compares field-wise for trait-invariant safety; SQL-level ops normalize.
351    fn cmp(&self, other: &Self) -> Ordering {
352        match (self, other) {
353            (Value::Null, Value::Null) => Ordering::Equal,
354            (Value::Null, _) => Ordering::Less,
355            (_, Value::Null) => Ordering::Greater,
356
357            (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
358            (Value::Boolean(_), _) => Ordering::Less,
359            (_, Value::Boolean(_)) => Ordering::Greater,
360
361            (Value::Integer(_) | Value::Real(_), Value::Integer(_) | Value::Real(_)) => {
362                self.numeric_cmp(other).unwrap_or(Ordering::Equal)
363            }
364            (Value::Integer(_) | Value::Real(_), _) => Ordering::Less,
365            (_, Value::Integer(_) | Value::Real(_)) => Ordering::Greater,
366
367            (Value::Time(a), Value::Time(b)) => a.cmp(b),
368            (Value::Time(_), _) => Ordering::Less,
369            (_, Value::Time(_)) => Ordering::Greater,
370
371            (Value::Date(a), Value::Date(b)) => a.cmp(b),
372            (Value::Date(_), _) => Ordering::Less,
373            (_, Value::Date(_)) => Ordering::Greater,
374
375            (Value::Timestamp(a), Value::Timestamp(b)) => a.cmp(b),
376            (Value::Timestamp(_), _) => Ordering::Less,
377            (_, Value::Timestamp(_)) => Ordering::Greater,
378
379            (
380                Value::Interval {
381                    months: am,
382                    days: ad,
383                    micros: au,
384                },
385                Value::Interval {
386                    months: bm,
387                    days: bd,
388                    micros: bu,
389                },
390            ) => am.cmp(bm).then(ad.cmp(bd)).then(au.cmp(bu)),
391            (Value::Interval { .. }, _) => Ordering::Less,
392            (_, Value::Interval { .. }) => Ordering::Greater,
393
394            (Value::Text(a), Value::Text(b)) => a.cmp(b),
395            (Value::Text(_), _) => Ordering::Less,
396            (_, Value::Text(_)) => Ordering::Greater,
397
398            (Value::Blob(a), Value::Blob(b)) => a.cmp(b),
399        }
400    }
401}
402
403impl fmt::Display for Value {
404    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405        match self {
406            Value::Null => write!(f, "NULL"),
407            Value::Integer(i) => write!(f, "{i}"),
408            Value::Real(r) => {
409                if r.fract() == 0.0 && r.is_finite() {
410                    write!(f, "{r:.1}")
411                } else {
412                    write!(f, "{r}")
413                }
414            }
415            Value::Text(s) => write!(f, "{s}"),
416            Value::Blob(b) => write!(f, "X'{}'", hex_encode(b)),
417            Value::Boolean(b) => write!(f, "{}", if *b { "TRUE" } else { "FALSE" }),
418            Value::Time(t) => write!(f, "{}", crate::datetime::format_time(*t)),
419            Value::Date(d) => write!(f, "{}", crate::datetime::format_date(*d)),
420            Value::Timestamp(t) => write!(f, "{}", crate::datetime::format_timestamp(*t)),
421            Value::Interval {
422                months,
423                days,
424                micros,
425            } => {
426                write!(
427                    f,
428                    "{}",
429                    crate::datetime::format_interval(*months, *days, *micros)
430                )
431            }
432        }
433    }
434}
435
436fn hex_encode(data: &[u8]) -> String {
437    let mut s = String::with_capacity(data.len() * 2);
438    for byte in data {
439        s.push_str(&format!("{byte:02X}"));
440    }
441    s
442}
443
444/// Column definition.
445#[derive(Debug, Clone)]
446pub struct ColumnDef {
447    pub name: String,
448    pub data_type: DataType,
449    pub nullable: bool,
450    pub position: u16,
451    pub default_expr: Option<Expr>,
452    pub default_sql: Option<String>,
453    pub check_expr: Option<Expr>,
454    pub check_sql: Option<String>,
455    pub check_name: Option<String>,
456    /// Display-only flag for `TIMESTAMPTZ` / `TIMETZ`; storage is i64 µs UTC.
457    pub is_with_timezone: bool,
458}
459
460/// Index definition stored as part of the table schema.
461#[derive(Debug, Clone)]
462pub struct IndexDef {
463    pub name: String,
464    pub columns: Vec<u16>,
465    pub unique: bool,
466}
467
468/// View definition stored in the _views metadata table.
469#[derive(Debug, Clone)]
470pub struct ViewDef {
471    pub name: String,
472    pub sql: String,
473    pub column_aliases: Vec<String>,
474}
475
476const VIEW_DEF_VERSION: u8 = 1;
477
478impl ViewDef {
479    pub fn serialize(&self) -> Vec<u8> {
480        let mut buf = Vec::new();
481        buf.push(VIEW_DEF_VERSION);
482
483        let name_bytes = self.name.as_bytes();
484        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
485        buf.extend_from_slice(name_bytes);
486
487        let sql_bytes = self.sql.as_bytes();
488        buf.extend_from_slice(&(sql_bytes.len() as u32).to_le_bytes());
489        buf.extend_from_slice(sql_bytes);
490
491        buf.extend_from_slice(&(self.column_aliases.len() as u16).to_le_bytes());
492        for alias in &self.column_aliases {
493            let alias_bytes = alias.as_bytes();
494            buf.extend_from_slice(&(alias_bytes.len() as u16).to_le_bytes());
495            buf.extend_from_slice(alias_bytes);
496        }
497
498        buf
499    }
500
501    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
502        if data.is_empty() || data[0] != VIEW_DEF_VERSION {
503            return Err(crate::error::SqlError::InvalidValue(
504                "invalid view definition version".into(),
505            ));
506        }
507        let mut pos = 1;
508
509        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
510        pos += 2;
511        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
512        pos += name_len;
513
514        let sql_len =
515            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
516        pos += 4;
517        let sql = String::from_utf8_lossy(&data[pos..pos + sql_len]).into_owned();
518        pos += sql_len;
519
520        let alias_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
521        pos += 2;
522        let mut column_aliases = Vec::with_capacity(alias_count);
523        for _ in 0..alias_count {
524            let alias_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
525            pos += 2;
526            let alias = String::from_utf8_lossy(&data[pos..pos + alias_len]).into_owned();
527            pos += alias_len;
528            column_aliases.push(alias);
529        }
530
531        Ok(Self {
532            name,
533            sql,
534            column_aliases,
535        })
536    }
537}
538
539/// Table-level CHECK constraint stored in schema.
540#[derive(Debug, Clone)]
541pub struct TableCheckDef {
542    pub name: Option<String>,
543    pub expr: Expr,
544    pub sql: String,
545}
546
547/// Foreign key definition stored in schema.
548#[derive(Debug, Clone)]
549pub struct ForeignKeySchemaEntry {
550    pub name: Option<String>,
551    pub columns: Vec<u16>,
552    pub foreign_table: String,
553    pub referred_columns: Vec<String>,
554}
555
556/// Table schema stored in the _schema table.
557#[derive(Debug, Clone)]
558pub struct TableSchema {
559    pub name: String,
560    pub columns: Vec<ColumnDef>,
561    pub primary_key_columns: Vec<u16>,
562    pub indices: Vec<IndexDef>,
563    pub check_constraints: Vec<TableCheckDef>,
564    pub foreign_keys: Vec<ForeignKeySchemaEntry>,
565    pk_idx_cache: Vec<usize>,
566    non_pk_idx_cache: Vec<usize>,
567    /// Physical encoding slots dropped via DROP COLUMN. Sorted.
568    /// Old rows have data at these positions (skipped on decode);
569    /// new rows encode NULL to maintain position consistency.
570    dropped_non_pk_slots: Vec<u16>,
571    /// Physical encoding position -> logical column index.
572    /// `usize::MAX` for dropped slots.
573    decode_mapping_cache: Vec<usize>,
574    /// Logical non-PK order -> physical encoding position.
575    /// `encoding_positions_cache[i]` is the physical slot for `non_pk_idx_cache[i]`.
576    encoding_positions_cache: Vec<u16>,
577}
578
579impl TableSchema {
580    pub fn new(
581        name: String,
582        columns: Vec<ColumnDef>,
583        primary_key_columns: Vec<u16>,
584        indices: Vec<IndexDef>,
585        check_constraints: Vec<TableCheckDef>,
586        foreign_keys: Vec<ForeignKeySchemaEntry>,
587    ) -> Self {
588        Self::with_drops(
589            name,
590            columns,
591            primary_key_columns,
592            indices,
593            check_constraints,
594            foreign_keys,
595            vec![],
596        )
597    }
598
599    pub fn with_drops(
600        name: String,
601        columns: Vec<ColumnDef>,
602        primary_key_columns: Vec<u16>,
603        indices: Vec<IndexDef>,
604        check_constraints: Vec<TableCheckDef>,
605        foreign_keys: Vec<ForeignKeySchemaEntry>,
606        dropped_non_pk_slots: Vec<u16>,
607    ) -> Self {
608        let pk_idx_cache: Vec<usize> = primary_key_columns.iter().map(|&i| i as usize).collect();
609        let non_pk_idx_cache: Vec<usize> = (0..columns.len())
610            .filter(|i| !primary_key_columns.contains(&(*i as u16)))
611            .collect();
612
613        let physical_count = non_pk_idx_cache.len() + dropped_non_pk_slots.len();
614        let mut decode_mapping_cache = vec![usize::MAX; physical_count];
615        let mut encoding_positions_cache = Vec::with_capacity(non_pk_idx_cache.len());
616
617        let mut drop_idx = 0;
618        let mut live_idx = 0;
619        for (phys_pos, slot) in decode_mapping_cache.iter_mut().enumerate() {
620            if drop_idx < dropped_non_pk_slots.len()
621                && dropped_non_pk_slots[drop_idx] as usize == phys_pos
622            {
623                drop_idx += 1;
624            } else {
625                *slot = non_pk_idx_cache[live_idx];
626                encoding_positions_cache.push(phys_pos as u16);
627                live_idx += 1;
628            }
629        }
630
631        Self {
632            name,
633            columns,
634            primary_key_columns,
635            indices,
636            check_constraints,
637            foreign_keys,
638            pk_idx_cache,
639            non_pk_idx_cache,
640            dropped_non_pk_slots,
641            decode_mapping_cache,
642            encoding_positions_cache,
643        }
644    }
645
646    /// Rebuild caches (preserving dropped slots). Use after mutating fields in place.
647    pub fn rebuild(self) -> Self {
648        let drops = self.dropped_non_pk_slots;
649        Self::with_drops(
650            self.name,
651            self.columns,
652            self.primary_key_columns,
653            self.indices,
654            self.check_constraints,
655            self.foreign_keys,
656            drops,
657        )
658    }
659
660    /// Returns true if any column-level or table-level CHECK constraints exist.
661    pub fn has_checks(&self) -> bool {
662        !self.check_constraints.is_empty() || self.columns.iter().any(|c| c.check_expr.is_some())
663    }
664
665    /// Physical encoding position -> logical column index mapping.
666    /// Length = physical_non_pk_count. `usize::MAX` for dropped slots.
667    pub fn decode_col_mapping(&self) -> &[usize] {
668        &self.decode_mapping_cache
669    }
670
671    /// Logical non-PK order -> physical encoding position.
672    /// `encoding_positions()[i]` is the physical slot for `non_pk_indices()[i]`.
673    pub fn encoding_positions(&self) -> &[u16] {
674        &self.encoding_positions_cache
675    }
676
677    /// Total physical non-PK column count (live + dropped slots).
678    pub fn physical_non_pk_count(&self) -> usize {
679        self.non_pk_idx_cache.len() + self.dropped_non_pk_slots.len()
680    }
681
682    /// Physical encoding slots that have been dropped via DROP COLUMN.
683    pub fn dropped_non_pk_slots(&self) -> &[u16] {
684        &self.dropped_non_pk_slots
685    }
686
687    /// Return a new schema with the column at `drop_pos` marked as dropped.
688    /// Row data is not rewritten; decode skips the slot. Logical positions
689    /// above `drop_pos` shift down; table-level CHECKs referencing it are
690    /// filtered out.
691    pub fn without_column(&self, drop_pos: usize) -> Self {
692        let non_pk_order = self
693            .non_pk_idx_cache
694            .iter()
695            .position(|&i| i == drop_pos)
696            .expect("cannot drop PK column via without_column");
697        let physical_slot = self.encoding_positions_cache[non_pk_order];
698
699        let mut new_dropped = self.dropped_non_pk_slots.clone();
700        new_dropped.push(physical_slot);
701        new_dropped.sort();
702
703        let dropped_name = &self.columns[drop_pos].name;
704        let drop_pos_u16 = drop_pos as u16;
705
706        let mut columns: Vec<ColumnDef> = self
707            .columns
708            .iter()
709            .enumerate()
710            .filter(|(i, _)| *i != drop_pos)
711            .map(|(_, c)| {
712                let mut col = c.clone();
713                if col.position > drop_pos_u16 {
714                    col.position -= 1;
715                }
716                col
717            })
718            .collect();
719        for (i, col) in columns.iter_mut().enumerate() {
720            col.position = i as u16;
721        }
722
723        let primary_key_columns: Vec<u16> = self
724            .primary_key_columns
725            .iter()
726            .map(|&p| if p > drop_pos_u16 { p - 1 } else { p })
727            .collect();
728
729        let indices: Vec<IndexDef> = self
730            .indices
731            .iter()
732            .map(|idx| IndexDef {
733                name: idx.name.clone(),
734                columns: idx
735                    .columns
736                    .iter()
737                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
738                    .collect(),
739                unique: idx.unique,
740            })
741            .collect();
742
743        let foreign_keys: Vec<ForeignKeySchemaEntry> = self
744            .foreign_keys
745            .iter()
746            .map(|fk| ForeignKeySchemaEntry {
747                name: fk.name.clone(),
748                columns: fk
749                    .columns
750                    .iter()
751                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
752                    .collect(),
753                foreign_table: fk.foreign_table.clone(),
754                referred_columns: fk.referred_columns.clone(),
755            })
756            .collect();
757
758        // Filter out table-level CHECKs that reference the dropped column
759        let dropped_lower = dropped_name.to_ascii_lowercase();
760        let check_constraints: Vec<TableCheckDef> = self
761            .check_constraints
762            .iter()
763            .filter(|c| !c.sql.to_ascii_lowercase().contains(&dropped_lower))
764            .cloned()
765            .collect();
766
767        Self::with_drops(
768            self.name.clone(),
769            columns,
770            primary_key_columns,
771            indices,
772            check_constraints,
773            foreign_keys,
774            new_dropped,
775        )
776    }
777}
778
779const SCHEMA_VERSION: u8 = 4;
780
781fn write_opt_string(buf: &mut Vec<u8>, s: &Option<String>) {
782    match s {
783        Some(s) => {
784            let bytes = s.as_bytes();
785            buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
786            buf.extend_from_slice(bytes);
787        }
788        None => buf.extend_from_slice(&0u16.to_le_bytes()),
789    }
790}
791
792fn read_opt_string(data: &[u8], pos: &mut usize) -> Option<String> {
793    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
794    *pos += 2;
795    if len == 0 {
796        None
797    } else {
798        let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
799        *pos += len;
800        Some(s)
801    }
802}
803
804fn read_string(data: &[u8], pos: &mut usize) -> String {
805    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
806    *pos += 2;
807    let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
808    *pos += len;
809    s
810}
811
812impl TableSchema {
813    pub fn serialize(&self) -> Vec<u8> {
814        let mut buf = Vec::new();
815        buf.push(SCHEMA_VERSION);
816
817        let name_bytes = self.name.as_bytes();
818        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
819        buf.extend_from_slice(name_bytes);
820
821        buf.extend_from_slice(&(self.columns.len() as u16).to_le_bytes());
822
823        for col in &self.columns {
824            let col_name = col.name.as_bytes();
825            buf.extend_from_slice(&(col_name.len() as u16).to_le_bytes());
826            buf.extend_from_slice(col_name);
827            buf.push(col.data_type.type_tag());
828            buf.push(if col.nullable { 1 } else { 0 });
829            buf.extend_from_slice(&col.position.to_le_bytes());
830        }
831
832        buf.extend_from_slice(&(self.primary_key_columns.len() as u16).to_le_bytes());
833        for &pk_idx in &self.primary_key_columns {
834            buf.extend_from_slice(&pk_idx.to_le_bytes());
835        }
836
837        buf.extend_from_slice(&(self.indices.len() as u16).to_le_bytes());
838        for idx in &self.indices {
839            let idx_name = idx.name.as_bytes();
840            buf.extend_from_slice(&(idx_name.len() as u16).to_le_bytes());
841            buf.extend_from_slice(idx_name);
842            buf.extend_from_slice(&(idx.columns.len() as u16).to_le_bytes());
843            for &col_idx in &idx.columns {
844                buf.extend_from_slice(&col_idx.to_le_bytes());
845            }
846            buf.push(if idx.unique { 1 } else { 0 });
847        }
848
849        for col in &self.columns {
850            let mut flags: u8 = 0;
851            if col.default_sql.is_some() {
852                flags |= 1;
853            }
854            if col.check_sql.is_some() {
855                flags |= 2;
856            }
857            buf.push(flags);
858            if let Some(ref sql) = col.default_sql {
859                let bytes = sql.as_bytes();
860                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
861                buf.extend_from_slice(bytes);
862            }
863            if let Some(ref sql) = col.check_sql {
864                let bytes = sql.as_bytes();
865                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
866                buf.extend_from_slice(bytes);
867                write_opt_string(&mut buf, &col.check_name);
868            }
869        }
870
871        buf.extend_from_slice(&(self.check_constraints.len() as u16).to_le_bytes());
872        for chk in &self.check_constraints {
873            write_opt_string(&mut buf, &chk.name);
874            let sql_bytes = chk.sql.as_bytes();
875            buf.extend_from_slice(&(sql_bytes.len() as u16).to_le_bytes());
876            buf.extend_from_slice(sql_bytes);
877        }
878
879        buf.extend_from_slice(&(self.foreign_keys.len() as u16).to_le_bytes());
880        for fk in &self.foreign_keys {
881            write_opt_string(&mut buf, &fk.name);
882            buf.extend_from_slice(&(fk.columns.len() as u16).to_le_bytes());
883            for &col_idx in &fk.columns {
884                buf.extend_from_slice(&col_idx.to_le_bytes());
885            }
886            let ft_bytes = fk.foreign_table.as_bytes();
887            buf.extend_from_slice(&(ft_bytes.len() as u16).to_le_bytes());
888            buf.extend_from_slice(ft_bytes);
889            buf.extend_from_slice(&(fk.referred_columns.len() as u16).to_le_bytes());
890            for rc in &fk.referred_columns {
891                let rc_bytes = rc.as_bytes();
892                buf.extend_from_slice(&(rc_bytes.len() as u16).to_le_bytes());
893                buf.extend_from_slice(rc_bytes);
894            }
895        }
896
897        buf.extend_from_slice(&(self.dropped_non_pk_slots.len() as u16).to_le_bytes());
898        for &slot in &self.dropped_non_pk_slots {
899            buf.extend_from_slice(&slot.to_le_bytes());
900        }
901
902        buf
903    }
904
905    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
906        let mut pos = 0;
907
908        if data.is_empty() || !matches!(data[0], 1 | 2 | 3 | SCHEMA_VERSION) {
909            return Err(crate::error::SqlError::InvalidValue(
910                "invalid schema version".into(),
911            ));
912        }
913        let version = data[0];
914        pos += 1;
915
916        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
917        pos += 2;
918        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
919        pos += name_len;
920
921        let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
922        pos += 2;
923
924        let mut columns = Vec::with_capacity(col_count);
925        for _ in 0..col_count {
926            let col_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
927            pos += 2;
928            let col_name = String::from_utf8_lossy(&data[pos..pos + col_name_len]).into_owned();
929            pos += col_name_len;
930            let data_type = DataType::from_tag(data[pos]).ok_or_else(|| {
931                crate::error::SqlError::InvalidValue("unknown data type tag".into())
932            })?;
933            pos += 1;
934            let nullable = data[pos] != 0;
935            pos += 1;
936            let position = u16::from_le_bytes([data[pos], data[pos + 1]]);
937            pos += 2;
938            columns.push(ColumnDef {
939                name: col_name,
940                data_type,
941                nullable,
942                position,
943                default_expr: None,
944                default_sql: None,
945                check_expr: None,
946                check_sql: None,
947                check_name: None,
948                is_with_timezone: false,
949            });
950        }
951
952        let pk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
953        pos += 2;
954        let mut primary_key_columns = Vec::with_capacity(pk_count);
955        for _ in 0..pk_count {
956            let pk_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
957            pos += 2;
958            primary_key_columns.push(pk_idx);
959        }
960
961        let indices = if version >= 2 && pos + 2 <= data.len() {
962            let idx_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
963            pos += 2;
964            let mut idxs = Vec::with_capacity(idx_count);
965            for _ in 0..idx_count {
966                let idx_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
967                pos += 2;
968                let idx_name = String::from_utf8_lossy(&data[pos..pos + idx_name_len]).into_owned();
969                pos += idx_name_len;
970                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
971                pos += 2;
972                let mut cols = Vec::with_capacity(col_count);
973                for _ in 0..col_count {
974                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
975                    pos += 2;
976                    cols.push(col_idx);
977                }
978                let unique = data[pos] != 0;
979                pos += 1;
980                idxs.push(IndexDef {
981                    name: idx_name,
982                    columns: cols,
983                    unique,
984                });
985            }
986            idxs
987        } else {
988            vec![]
989        };
990
991        let mut check_constraints = Vec::new();
992        let mut foreign_keys = Vec::new();
993
994        if version >= 3 && pos < data.len() {
995            for col in &mut columns {
996                let flags = data[pos];
997                pos += 1;
998                if flags & 1 != 0 {
999                    let sql = read_string(data, &mut pos);
1000                    col.default_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1001                        crate::error::SqlError::InvalidValue(format!(
1002                            "cannot parse DEFAULT expression: {sql}"
1003                        ))
1004                    })?);
1005                    col.default_sql = Some(sql);
1006                }
1007                if flags & 2 != 0 {
1008                    let sql = read_string(data, &mut pos);
1009                    col.check_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
1010                        crate::error::SqlError::InvalidValue(format!(
1011                            "cannot parse CHECK expression: {sql}"
1012                        ))
1013                    })?);
1014                    col.check_sql = Some(sql);
1015                    col.check_name = read_opt_string(data, &mut pos);
1016                }
1017            }
1018
1019            let chk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1020            pos += 2;
1021            for _ in 0..chk_count {
1022                let name = read_opt_string(data, &mut pos);
1023                let sql = read_string(data, &mut pos);
1024                let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
1025                    crate::error::SqlError::InvalidValue(format!(
1026                        "cannot parse CHECK expression: {sql}"
1027                    ))
1028                })?;
1029                check_constraints.push(TableCheckDef { name, expr, sql });
1030            }
1031
1032            let fk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1033            pos += 2;
1034            for _ in 0..fk_count {
1035                let name = read_opt_string(data, &mut pos);
1036                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1037                pos += 2;
1038                let mut cols = Vec::with_capacity(col_count);
1039                for _ in 0..col_count {
1040                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
1041                    pos += 2;
1042                    cols.push(col_idx);
1043                }
1044                let foreign_table = read_string(data, &mut pos);
1045                let ref_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1046                pos += 2;
1047                let mut referred_columns = Vec::with_capacity(ref_count);
1048                for _ in 0..ref_count {
1049                    referred_columns.push(read_string(data, &mut pos));
1050                }
1051                foreign_keys.push(ForeignKeySchemaEntry {
1052                    name,
1053                    columns: cols,
1054                    foreign_table,
1055                    referred_columns,
1056                });
1057            }
1058        }
1059        let mut dropped_non_pk_slots = Vec::new();
1060        if version >= 4 && pos + 2 <= data.len() {
1061            let slot_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
1062            pos += 2;
1063            for _ in 0..slot_count {
1064                let slot = u16::from_le_bytes([data[pos], data[pos + 1]]);
1065                pos += 2;
1066                dropped_non_pk_slots.push(slot);
1067            }
1068        }
1069        let _ = pos;
1070
1071        Ok(Self::with_drops(
1072            name,
1073            columns,
1074            primary_key_columns,
1075            indices,
1076            check_constraints,
1077            foreign_keys,
1078            dropped_non_pk_slots,
1079        ))
1080    }
1081
1082    /// Get column index by name (case-insensitive).
1083    pub fn column_index(&self, name: &str) -> Option<usize> {
1084        self.columns
1085            .iter()
1086            .position(|c| c.name.eq_ignore_ascii_case(name))
1087    }
1088
1089    /// Get indices of non-PK columns (columns stored in the B+ tree value).
1090    pub fn non_pk_indices(&self) -> &[usize] {
1091        &self.non_pk_idx_cache
1092    }
1093
1094    /// Get the PK column indices as usize.
1095    pub fn pk_indices(&self) -> &[usize] {
1096        &self.pk_idx_cache
1097    }
1098
1099    /// Get index definition by name (case-insensitive).
1100    pub fn index_by_name(&self, name: &str) -> Option<&IndexDef> {
1101        let lower = name.to_ascii_lowercase();
1102        self.indices.iter().find(|i| i.name == lower)
1103    }
1104
1105    /// Get the KV table name for an index.
1106    pub fn index_table_name(table_name: &str, index_name: &str) -> Vec<u8> {
1107        format!("__idx_{table_name}_{index_name}").into_bytes()
1108    }
1109}
1110
1111/// Result of executing a SQL statement.
1112#[derive(Debug)]
1113pub enum ExecutionResult {
1114    RowsAffected(u64),
1115    Query(QueryResult),
1116    Ok,
1117}
1118
1119/// Result of a SELECT query.
1120#[derive(Debug, Clone)]
1121pub struct QueryResult {
1122    pub columns: Vec<String>,
1123    pub rows: Vec<Vec<Value>>,
1124}
1125
1126#[cfg(test)]
1127mod tests {
1128    use super::*;
1129
1130    #[test]
1131    fn value_ordering() {
1132        assert!(Value::Null < Value::Boolean(false));
1133        assert!(Value::Boolean(false) < Value::Boolean(true));
1134        assert!(Value::Boolean(true) < Value::Integer(0));
1135        assert!(Value::Integer(-1) < Value::Integer(0));
1136        assert!(Value::Integer(0) < Value::Real(0.5));
1137        assert!(Value::Real(1.0) < Value::Text("".into()));
1138        assert!(Value::Text("a".into()) < Value::Text("b".into()));
1139        assert!(Value::Text("z".into()) < Value::Blob(vec![]));
1140        assert!(Value::Blob(vec![0]) < Value::Blob(vec![1]));
1141    }
1142
1143    #[test]
1144    fn value_numeric_mixed() {
1145        assert_eq!(Value::Integer(1), Value::Real(1.0));
1146        assert!(Value::Integer(1) < Value::Real(1.5));
1147        assert!(Value::Real(0.5) < Value::Integer(1));
1148    }
1149
1150    #[test]
1151    fn value_display() {
1152        assert_eq!(format!("{}", Value::Null), "NULL");
1153        assert_eq!(format!("{}", Value::Integer(42)), "42");
1154        assert_eq!(format!("{}", Value::Real(3.15)), "3.15");
1155        assert_eq!(format!("{}", Value::Real(1.0)), "1.0");
1156        assert_eq!(format!("{}", Value::Text("hello".into())), "hello");
1157        assert_eq!(format!("{}", Value::Blob(vec![0xDE, 0xAD])), "X'DEAD'");
1158        assert_eq!(format!("{}", Value::Boolean(true)), "TRUE");
1159        assert_eq!(format!("{}", Value::Boolean(false)), "FALSE");
1160    }
1161
1162    #[test]
1163    fn value_coerce() {
1164        assert_eq!(
1165            Value::Integer(42).coerce_to(DataType::Real),
1166            Some(Value::Real(42.0))
1167        );
1168        assert_eq!(
1169            Value::Boolean(true).coerce_to(DataType::Integer),
1170            Some(Value::Integer(1))
1171        );
1172        assert_eq!(Value::Null.coerce_to(DataType::Integer), Some(Value::Null));
1173        assert_eq!(Value::Text("x".into()).coerce_to(DataType::Integer), None);
1174    }
1175
1176    fn col(name: &str, dt: DataType, nullable: bool, pos: u16) -> ColumnDef {
1177        ColumnDef {
1178            name: name.into(),
1179            data_type: dt,
1180            nullable,
1181            position: pos,
1182            default_expr: None,
1183            default_sql: None,
1184            check_expr: None,
1185            check_sql: None,
1186            check_name: None,
1187            is_with_timezone: false,
1188        }
1189    }
1190
1191    #[test]
1192    fn schema_roundtrip() {
1193        let schema = TableSchema::new(
1194            "users".into(),
1195            vec![
1196                col("id", DataType::Integer, false, 0),
1197                col("name", DataType::Text, true, 1),
1198                col("active", DataType::Boolean, false, 2),
1199            ],
1200            vec![0],
1201            vec![],
1202            vec![],
1203            vec![],
1204        );
1205
1206        let data = schema.serialize();
1207        let restored = TableSchema::deserialize(&data).unwrap();
1208
1209        assert_eq!(restored.name, "users");
1210        assert_eq!(restored.columns.len(), 3);
1211        assert_eq!(restored.columns[0].name, "id");
1212        assert_eq!(restored.columns[0].data_type, DataType::Integer);
1213        assert!(!restored.columns[0].nullable);
1214        assert_eq!(restored.columns[1].name, "name");
1215        assert_eq!(restored.columns[1].data_type, DataType::Text);
1216        assert!(restored.columns[1].nullable);
1217        assert_eq!(restored.columns[2].name, "active");
1218        assert_eq!(restored.columns[2].data_type, DataType::Boolean);
1219        assert_eq!(restored.primary_key_columns, vec![0]);
1220    }
1221
1222    #[test]
1223    fn schema_roundtrip_with_indices() {
1224        let schema = TableSchema::new(
1225            "orders".into(),
1226            vec![
1227                col("id", DataType::Integer, false, 0),
1228                col("customer", DataType::Text, false, 1),
1229                col("amount", DataType::Real, true, 2),
1230            ],
1231            vec![0],
1232            vec![
1233                IndexDef {
1234                    name: "idx_customer".into(),
1235                    columns: vec![1],
1236                    unique: false,
1237                },
1238                IndexDef {
1239                    name: "idx_amount_uniq".into(),
1240                    columns: vec![2],
1241                    unique: true,
1242                },
1243            ],
1244            vec![],
1245            vec![],
1246        );
1247
1248        let data = schema.serialize();
1249        let restored = TableSchema::deserialize(&data).unwrap();
1250
1251        assert_eq!(restored.indices.len(), 2);
1252        assert_eq!(restored.indices[0].name, "idx_customer");
1253        assert_eq!(restored.indices[0].columns, vec![1]);
1254        assert!(!restored.indices[0].unique);
1255        assert_eq!(restored.indices[1].name, "idx_amount_uniq");
1256        assert_eq!(restored.indices[1].columns, vec![2]);
1257        assert!(restored.indices[1].unique);
1258    }
1259
1260    #[test]
1261    fn schema_v1_backward_compat() {
1262        let old_schema = TableSchema::new(
1263            "test".into(),
1264            vec![col("id", DataType::Integer, false, 0)],
1265            vec![0],
1266            vec![],
1267            vec![],
1268            vec![],
1269        );
1270        let mut data = old_schema.serialize();
1271        // Patch to v1 format: replace version byte and truncate everything after PK
1272        data[0] = 1;
1273        // v1 has no indices or v3 data - truncate after PK columns
1274        // Header(1) + name_len(2) + "test"(4) + col_count(2) + col("id": name_len(2)+"id"(2)+type(1)+nullable(1)+position(2)) + pk_count(2) + pk(2)
1275        let v1_len = 1 + 2 + 4 + 2 + (2 + 2 + 1 + 1 + 2) + 2 + 2;
1276        data.truncate(v1_len);
1277
1278        let restored = TableSchema::deserialize(&data).unwrap();
1279        assert_eq!(restored.name, "test");
1280        assert!(restored.indices.is_empty());
1281        assert!(restored.check_constraints.is_empty());
1282        assert!(restored.foreign_keys.is_empty());
1283    }
1284
1285    #[test]
1286    fn schema_v2_backward_compat() {
1287        let schema = TableSchema::new(
1288            "test".into(),
1289            vec![col("id", DataType::Integer, false, 0)],
1290            vec![0],
1291            vec![],
1292            vec![],
1293            vec![],
1294        );
1295        let mut data = schema.serialize();
1296        // Patch version to 2 and truncate v3 data
1297        data[0] = 2;
1298        // v2 ends after indices section: find the v3 start and truncate
1299        // Header(1) + name_len(2) + "test"(4) + col_count(2) + col(8) + pk_count(2) + pk(2) + idx_count(2)
1300        let v2_len = 1 + 2 + 4 + 2 + 8 + 2 + 2 + 2;
1301        data.truncate(v2_len);
1302
1303        let restored = TableSchema::deserialize(&data).unwrap();
1304        assert_eq!(restored.name, "test");
1305        assert!(restored.check_constraints.is_empty());
1306        assert!(restored.foreign_keys.is_empty());
1307        assert!(restored.columns[0].default_expr.is_none());
1308        assert!(restored.columns[0].check_expr.is_none());
1309    }
1310
1311    #[test]
1312    fn schema_roundtrip_with_defaults_and_checks() {
1313        use crate::parser::parse_sql_expr;
1314
1315        let mut columns = vec![
1316            col("id", DataType::Integer, false, 0),
1317            col("val", DataType::Integer, true, 1),
1318            col("name", DataType::Text, true, 2),
1319        ];
1320        columns[1].default_sql = Some("42".into());
1321        columns[1].default_expr = Some(parse_sql_expr("42").unwrap());
1322        columns[2].check_sql = Some("LENGTH(name) > 0".into());
1323        columns[2].check_expr = Some(parse_sql_expr("LENGTH(name) > 0").unwrap());
1324        columns[2].check_name = Some("chk_name_len".into());
1325
1326        let schema = TableSchema::new(
1327            "t".into(),
1328            columns,
1329            vec![0],
1330            vec![],
1331            vec![TableCheckDef {
1332                name: Some("chk_val_pos".into()),
1333                expr: parse_sql_expr("val > 0").unwrap(),
1334                sql: "val > 0".into(),
1335            }],
1336            vec![],
1337        );
1338
1339        let data = schema.serialize();
1340        let restored = TableSchema::deserialize(&data).unwrap();
1341
1342        assert_eq!(restored.columns[1].default_sql.as_deref(), Some("42"));
1343        assert!(restored.columns[1].default_expr.is_some());
1344        assert_eq!(
1345            restored.columns[2].check_sql.as_deref(),
1346            Some("LENGTH(name) > 0")
1347        );
1348        assert!(restored.columns[2].check_expr.is_some());
1349        assert_eq!(
1350            restored.columns[2].check_name.as_deref(),
1351            Some("chk_name_len")
1352        );
1353        assert_eq!(restored.check_constraints.len(), 1);
1354        assert_eq!(
1355            restored.check_constraints[0].name.as_deref(),
1356            Some("chk_val_pos")
1357        );
1358        assert_eq!(restored.check_constraints[0].sql, "val > 0");
1359    }
1360
1361    #[test]
1362    fn schema_roundtrip_with_foreign_keys() {
1363        let schema = TableSchema::new(
1364            "orders".into(),
1365            vec![
1366                col("id", DataType::Integer, false, 0),
1367                col("user_id", DataType::Integer, false, 1),
1368            ],
1369            vec![0],
1370            vec![],
1371            vec![],
1372            vec![ForeignKeySchemaEntry {
1373                name: Some("fk_user".into()),
1374                columns: vec![1],
1375                foreign_table: "users".into(),
1376                referred_columns: vec!["id".into()],
1377            }],
1378        );
1379
1380        let data = schema.serialize();
1381        let restored = TableSchema::deserialize(&data).unwrap();
1382
1383        assert_eq!(restored.foreign_keys.len(), 1);
1384        assert_eq!(restored.foreign_keys[0].name.as_deref(), Some("fk_user"));
1385        assert_eq!(restored.foreign_keys[0].columns, vec![1]);
1386        assert_eq!(restored.foreign_keys[0].foreign_table, "users");
1387        assert_eq!(restored.foreign_keys[0].referred_columns, vec!["id"]);
1388    }
1389
1390    #[test]
1391    fn data_type_display() {
1392        assert_eq!(format!("{}", DataType::Integer), "INTEGER");
1393        assert_eq!(format!("{}", DataType::Text), "TEXT");
1394        assert_eq!(format!("{}", DataType::Boolean), "BOOLEAN");
1395    }
1396}