Skip to main content

citadel_sql/
types.rs

1use std::cmp::Ordering;
2use std::fmt;
3use std::hash::{Hash, Hasher};
4
5pub use compact_str::CompactString;
6
7use crate::parser::Expr;
8
/// SQL data types.
///
/// Every type has a stable one-byte tag (see [`DataType::type_tag`]) that is
/// written when a column definition is serialized, and an upper-case SQL
/// keyword used by its `Display` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    Null,
    Integer,
    Real,
    Text,
    Blob,
    Boolean,
}

impl DataType {
    /// All variants, arranged so that each one sits at the index equal to its
    /// tag. Must stay in sync with [`DataType::type_tag`].
    const TAG_ORDER: [DataType; 6] = [
        DataType::Null,
        DataType::Blob,
        DataType::Text,
        DataType::Boolean,
        DataType::Integer,
        DataType::Real,
    ];

    /// Returns the one-byte tag that identifies this type.
    pub fn type_tag(self) -> u8 {
        match self {
            DataType::Null => 0,
            DataType::Blob => 1,
            DataType::Text => 2,
            DataType::Boolean => 3,
            DataType::Integer => 4,
            DataType::Real => 5,
        }
    }

    /// Inverse of [`DataType::type_tag`].
    ///
    /// Returns `None` when `tag` does not correspond to any type.
    pub fn from_tag(tag: u8) -> Option<Self> {
        Self::TAG_ORDER.get(usize::from(tag)).copied()
    }
}

/// Formats the type as its upper-case SQL keyword (`INTEGER`, `TEXT`, ...).
impl fmt::Display for DataType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            DataType::Null => "NULL",
            DataType::Integer => "INTEGER",
            DataType::Real => "REAL",
            DataType::Text => "TEXT",
            DataType::Blob => "BLOB",
            DataType::Boolean => "BOOLEAN",
        };
        f.write_str(keyword)
    }
}
57
58/// SQL value.
59#[derive(Debug, Clone, Default)]
60pub enum Value {
61    #[default]
62    Null,
63    Integer(i64),
64    Real(f64),
65    Text(CompactString),
66    Blob(Vec<u8>),
67    Boolean(bool),
68}
69
70impl Value {
71    pub fn data_type(&self) -> DataType {
72        match self {
73            Value::Null => DataType::Null,
74            Value::Integer(_) => DataType::Integer,
75            Value::Real(_) => DataType::Real,
76            Value::Text(_) => DataType::Text,
77            Value::Blob(_) => DataType::Blob,
78            Value::Boolean(_) => DataType::Boolean,
79        }
80    }
81
82    pub fn is_null(&self) -> bool {
83        matches!(self, Value::Null)
84    }
85
86    /// Attempt to coerce this value to the target type.
87    pub fn coerce_to(&self, target: DataType) -> Option<Value> {
88        match (self, target) {
89            (_, DataType::Null) => Some(Value::Null),
90            (Value::Null, _) => Some(Value::Null),
91            (Value::Integer(i), DataType::Integer) => Some(Value::Integer(*i)),
92            (Value::Integer(i), DataType::Real) => Some(Value::Real(*i as f64)),
93            (Value::Real(r), DataType::Real) => Some(Value::Real(*r)),
94            (Value::Real(r), DataType::Integer) => Some(Value::Integer(*r as i64)),
95            (Value::Text(s), DataType::Text) => Some(Value::Text(s.clone())),
96            (Value::Blob(b), DataType::Blob) => Some(Value::Blob(b.clone())),
97            (Value::Boolean(b), DataType::Boolean) => Some(Value::Boolean(*b)),
98            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
99            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(*i != 0)),
100            _ => None,
101        }
102    }
103
104    pub fn coerce_into(self, target: DataType) -> Option<Value> {
105        if self.is_null() || target == DataType::Null {
106            return Some(Value::Null);
107        }
108        if self.data_type() == target {
109            return Some(self);
110        }
111        match (self, target) {
112            (Value::Integer(i), DataType::Real) => Some(Value::Real(i as f64)),
113            (Value::Real(r), DataType::Integer) => Some(Value::Integer(r as i64)),
114            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if b { 1 } else { 0 })),
115            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(i != 0)),
116            _ => None,
117        }
118    }
119
120    /// Numeric ordering for Integer and Real values (promotes to f64 for mixed).
121    fn numeric_cmp(&self, other: &Value) -> Option<Ordering> {
122        match (self, other) {
123            (Value::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
124            (Value::Real(a), Value::Real(b)) => a.partial_cmp(b),
125            (Value::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
126            (Value::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
127            _ => None,
128        }
129    }
130}
131
132impl PartialEq for Value {
133    fn eq(&self, other: &Self) -> bool {
134        match (self, other) {
135            (Value::Null, Value::Null) => true,
136            (Value::Integer(a), Value::Integer(b)) => a == b,
137            (Value::Real(a), Value::Real(b)) => a == b,
138            (Value::Integer(a), Value::Real(b)) => (*a as f64) == *b,
139            (Value::Real(a), Value::Integer(b)) => *a == (*b as f64),
140            (Value::Text(a), Value::Text(b)) => a == b,
141            (Value::Blob(a), Value::Blob(b)) => a == b,
142            (Value::Boolean(a), Value::Boolean(b)) => a == b,
143            _ => false,
144        }
145    }
146}
147
148impl Eq for Value {}
149
150impl Hash for Value {
151    fn hash<H: Hasher>(&self, state: &mut H) {
152        match self {
153            Value::Null => 0u8.hash(state),
154            Value::Integer(i) => {
155                // Hash via f64 bits so Integer(n) and Real(n.0) produce the same hash,
156                // matching the cross-type PartialEq contract.
157                1u8.hash(state);
158                (*i as f64).to_bits().hash(state);
159            }
160            Value::Real(r) => {
161                1u8.hash(state);
162                r.to_bits().hash(state);
163            }
164            Value::Text(s) => {
165                2u8.hash(state);
166                s.hash(state);
167            }
168            Value::Blob(b) => {
169                3u8.hash(state);
170                b.hash(state);
171            }
172            Value::Boolean(b) => {
173                4u8.hash(state);
174                b.hash(state);
175            }
176        }
177    }
178}
179
180impl PartialOrd for Value {
181    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
182        Some(self.cmp(other))
183    }
184}
185
186impl Ord for Value {
187    fn cmp(&self, other: &Self) -> Ordering {
188        // NULL < BOOLEAN < INTEGER/REAL (numeric) < TEXT < BLOB
189        match (self, other) {
190            (Value::Null, Value::Null) => Ordering::Equal,
191            (Value::Null, _) => Ordering::Less,
192            (_, Value::Null) => Ordering::Greater,
193
194            (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
195            (Value::Boolean(_), _) => Ordering::Less,
196            (_, Value::Boolean(_)) => Ordering::Greater,
197
198            // Numeric: Integer and Real are comparable
199            (Value::Integer(_) | Value::Real(_), Value::Integer(_) | Value::Real(_)) => {
200                self.numeric_cmp(other).unwrap_or(Ordering::Equal)
201            }
202            (Value::Integer(_) | Value::Real(_), _) => Ordering::Less,
203            (_, Value::Integer(_) | Value::Real(_)) => Ordering::Greater,
204
205            (Value::Text(a), Value::Text(b)) => a.cmp(b),
206            (Value::Text(_), _) => Ordering::Less,
207            (_, Value::Text(_)) => Ordering::Greater,
208
209            (Value::Blob(a), Value::Blob(b)) => a.cmp(b),
210        }
211    }
212}
213
214impl fmt::Display for Value {
215    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216        match self {
217            Value::Null => write!(f, "NULL"),
218            Value::Integer(i) => write!(f, "{i}"),
219            Value::Real(r) => {
220                if r.fract() == 0.0 && r.is_finite() {
221                    write!(f, "{r:.1}")
222                } else {
223                    write!(f, "{r}")
224                }
225            }
226            Value::Text(s) => write!(f, "{s}"),
227            Value::Blob(b) => write!(f, "X'{}'", hex_encode(b)),
228            Value::Boolean(b) => write!(f, "{}", if *b { "TRUE" } else { "FALSE" }),
229        }
230    }
231}
232
/// Encodes `data` as upper-case hexadecimal, two digits per byte.
///
/// Digits are looked up in a table and pushed straight into the output, so no
/// temporary `String` is allocated per byte.
fn hex_encode(data: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(data.len() * 2);
    for &byte in data {
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
    }
    out
}
240
241/// Column definition.
242#[derive(Debug, Clone)]
243pub struct ColumnDef {
244    pub name: String,
245    pub data_type: DataType,
246    pub nullable: bool,
247    pub position: u16,
248    pub default_expr: Option<Expr>,
249    pub default_sql: Option<String>,
250    pub check_expr: Option<Expr>,
251    pub check_sql: Option<String>,
252    pub check_name: Option<String>,
253}
254
/// Index definition stored as part of the table schema.
#[derive(Debug, Clone)]
pub struct IndexDef {
    /// Index name.
    pub name: String,
    /// Logical column indices covered by the index, in key order. They are
    /// shifted down by `TableSchema::without_column` when an earlier column
    /// is removed.
    pub columns: Vec<u16>,
    /// Uniqueness flag (persisted as a single byte).
    pub unique: bool,
}
262
263/// Table-level CHECK constraint stored in schema.
264#[derive(Debug, Clone)]
265pub struct TableCheckDef {
266    pub name: Option<String>,
267    pub expr: Expr,
268    pub sql: String,
269}
270
/// Foreign key definition stored in schema.
#[derive(Debug, Clone)]
pub struct ForeignKeySchemaEntry {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Logical indices of the referencing columns in this table. They are
    /// shifted down by `TableSchema::without_column` when an earlier column
    /// is removed.
    pub columns: Vec<u16>,
    /// Name of the referenced table.
    pub foreign_table: String,
    /// Names of the referenced columns in `foreign_table`.
    pub referred_columns: Vec<String>,
}
279
280/// Table schema stored in the _schema table.
281#[derive(Debug, Clone)]
282pub struct TableSchema {
283    pub name: String,
284    pub columns: Vec<ColumnDef>,
285    pub primary_key_columns: Vec<u16>,
286    pub indices: Vec<IndexDef>,
287    pub check_constraints: Vec<TableCheckDef>,
288    pub foreign_keys: Vec<ForeignKeySchemaEntry>,
289    pk_idx_cache: Vec<usize>,
290    non_pk_idx_cache: Vec<usize>,
291    /// Physical encoding slots that have been dropped (O(1) DROP COLUMN).
292    /// Sorted. Old rows still have data at these positions (skipped on decode);
293    /// new rows encode NULL there to maintain position consistency.
294    dropped_non_pk_slots: Vec<u16>,
295    /// Physical encoding position -> logical column index.
296    /// `usize::MAX` for dropped slots.
297    decode_mapping_cache: Vec<usize>,
298    /// Logical non-PK order -> physical encoding position.
299    /// `encoding_positions_cache[i]` is the physical slot for `non_pk_idx_cache[i]`.
300    encoding_positions_cache: Vec<u16>,
301}
302
303impl TableSchema {
304    pub fn new(
305        name: String,
306        columns: Vec<ColumnDef>,
307        primary_key_columns: Vec<u16>,
308        indices: Vec<IndexDef>,
309        check_constraints: Vec<TableCheckDef>,
310        foreign_keys: Vec<ForeignKeySchemaEntry>,
311    ) -> Self {
312        Self::with_drops(
313            name,
314            columns,
315            primary_key_columns,
316            indices,
317            check_constraints,
318            foreign_keys,
319            vec![],
320        )
321    }
322
323    pub fn with_drops(
324        name: String,
325        columns: Vec<ColumnDef>,
326        primary_key_columns: Vec<u16>,
327        indices: Vec<IndexDef>,
328        check_constraints: Vec<TableCheckDef>,
329        foreign_keys: Vec<ForeignKeySchemaEntry>,
330        dropped_non_pk_slots: Vec<u16>,
331    ) -> Self {
332        let pk_idx_cache: Vec<usize> = primary_key_columns.iter().map(|&i| i as usize).collect();
333        let non_pk_idx_cache: Vec<usize> = (0..columns.len())
334            .filter(|i| !primary_key_columns.contains(&(*i as u16)))
335            .collect();
336
337        let physical_count = non_pk_idx_cache.len() + dropped_non_pk_slots.len();
338        let mut decode_mapping_cache = vec![usize::MAX; physical_count];
339        let mut encoding_positions_cache = Vec::with_capacity(non_pk_idx_cache.len());
340
341        let mut drop_idx = 0;
342        let mut live_idx = 0;
343        for (phys_pos, slot) in decode_mapping_cache.iter_mut().enumerate() {
344            if drop_idx < dropped_non_pk_slots.len()
345                && dropped_non_pk_slots[drop_idx] as usize == phys_pos
346            {
347                drop_idx += 1;
348            } else {
349                *slot = non_pk_idx_cache[live_idx];
350                encoding_positions_cache.push(phys_pos as u16);
351                live_idx += 1;
352            }
353        }
354
355        Self {
356            name,
357            columns,
358            primary_key_columns,
359            indices,
360            check_constraints,
361            foreign_keys,
362            pk_idx_cache,
363            non_pk_idx_cache,
364            dropped_non_pk_slots,
365            decode_mapping_cache,
366            encoding_positions_cache,
367        }
368    }
369
370    /// Rebuild caches (preserving dropped slots). Use after mutating fields in place.
371    pub fn rebuild(self) -> Self {
372        let drops = self.dropped_non_pk_slots;
373        Self::with_drops(
374            self.name,
375            self.columns,
376            self.primary_key_columns,
377            self.indices,
378            self.check_constraints,
379            self.foreign_keys,
380            drops,
381        )
382    }
383
384    /// Returns true if any column-level or table-level CHECK constraints exist.
385    pub fn has_checks(&self) -> bool {
386        !self.check_constraints.is_empty() || self.columns.iter().any(|c| c.check_expr.is_some())
387    }
388
389    /// Physical encoding position -> logical column index mapping.
390    /// Length = physical_non_pk_count. `usize::MAX` for dropped slots.
391    pub fn decode_col_mapping(&self) -> &[usize] {
392        &self.decode_mapping_cache
393    }
394
395    /// Logical non-PK order -> physical encoding position.
396    /// `encoding_positions()[i]` is the physical slot for `non_pk_indices()[i]`.
397    pub fn encoding_positions(&self) -> &[u16] {
398        &self.encoding_positions_cache
399    }
400
401    /// Total physical non-PK column count (live + dropped slots).
402    pub fn physical_non_pk_count(&self) -> usize {
403        self.non_pk_idx_cache.len() + self.dropped_non_pk_slots.len()
404    }
405
406    /// Physical encoding slots that have been dropped via O(1) DROP COLUMN.
407    pub fn dropped_non_pk_slots(&self) -> &[u16] {
408        &self.dropped_non_pk_slots
409    }
410
411    /// Create a new schema with the column at `drop_pos` removed.
412    /// O(1): marks the physical encoding slot as dropped instead of rewriting rows.
413    /// Decrements all logical position references > drop_pos. Filters out
414    /// table-level CHECK constraints referencing the dropped column.
415    pub fn without_column(&self, drop_pos: usize) -> Self {
416        // Find physical encoding slot for the dropped column
417        let non_pk_order = self
418            .non_pk_idx_cache
419            .iter()
420            .position(|&i| i == drop_pos)
421            .expect("cannot drop PK column via without_column");
422        let physical_slot = self.encoding_positions_cache[non_pk_order];
423
424        let mut new_dropped = self.dropped_non_pk_slots.clone();
425        new_dropped.push(physical_slot);
426        new_dropped.sort();
427
428        let dropped_name = &self.columns[drop_pos].name;
429        let drop_pos_u16 = drop_pos as u16;
430
431        let mut columns: Vec<ColumnDef> = self
432            .columns
433            .iter()
434            .enumerate()
435            .filter(|(i, _)| *i != drop_pos)
436            .map(|(_, c)| {
437                let mut col = c.clone();
438                if col.position > drop_pos_u16 {
439                    col.position -= 1;
440                }
441                col
442            })
443            .collect();
444        for (i, col) in columns.iter_mut().enumerate() {
445            col.position = i as u16;
446        }
447
448        let primary_key_columns: Vec<u16> = self
449            .primary_key_columns
450            .iter()
451            .map(|&p| if p > drop_pos_u16 { p - 1 } else { p })
452            .collect();
453
454        let indices: Vec<IndexDef> = self
455            .indices
456            .iter()
457            .map(|idx| IndexDef {
458                name: idx.name.clone(),
459                columns: idx
460                    .columns
461                    .iter()
462                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
463                    .collect(),
464                unique: idx.unique,
465            })
466            .collect();
467
468        let foreign_keys: Vec<ForeignKeySchemaEntry> = self
469            .foreign_keys
470            .iter()
471            .map(|fk| ForeignKeySchemaEntry {
472                name: fk.name.clone(),
473                columns: fk
474                    .columns
475                    .iter()
476                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
477                    .collect(),
478                foreign_table: fk.foreign_table.clone(),
479                referred_columns: fk.referred_columns.clone(),
480            })
481            .collect();
482
483        // Filter out table-level CHECKs that reference the dropped column
484        let dropped_lower = dropped_name.to_ascii_lowercase();
485        let check_constraints: Vec<TableCheckDef> = self
486            .check_constraints
487            .iter()
488            .filter(|c| !c.sql.to_ascii_lowercase().contains(&dropped_lower))
489            .cloned()
490            .collect();
491
492        Self::with_drops(
493            self.name.clone(),
494            columns,
495            primary_key_columns,
496            indices,
497            check_constraints,
498            foreign_keys,
499            new_dropped,
500        )
501    }
502}
503
504const SCHEMA_VERSION: u8 = 4;
505
506fn write_opt_string(buf: &mut Vec<u8>, s: &Option<String>) {
507    match s {
508        Some(s) => {
509            let bytes = s.as_bytes();
510            buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
511            buf.extend_from_slice(bytes);
512        }
513        None => buf.extend_from_slice(&0u16.to_le_bytes()),
514    }
515}
516
517fn read_opt_string(data: &[u8], pos: &mut usize) -> Option<String> {
518    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
519    *pos += 2;
520    if len == 0 {
521        None
522    } else {
523        let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
524        *pos += len;
525        Some(s)
526    }
527}
528
529fn read_string(data: &[u8], pos: &mut usize) -> String {
530    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
531    *pos += 2;
532    let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
533    *pos += len;
534    s
535}
536
537impl TableSchema {
538    pub fn serialize(&self) -> Vec<u8> {
539        let mut buf = Vec::new();
540        buf.push(SCHEMA_VERSION);
541
542        // Table name
543        let name_bytes = self.name.as_bytes();
544        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
545        buf.extend_from_slice(name_bytes);
546
547        // Column count
548        buf.extend_from_slice(&(self.columns.len() as u16).to_le_bytes());
549
550        // Columns (v1/v2 core fields)
551        for col in &self.columns {
552            let col_name = col.name.as_bytes();
553            buf.extend_from_slice(&(col_name.len() as u16).to_le_bytes());
554            buf.extend_from_slice(col_name);
555            buf.push(col.data_type.type_tag());
556            buf.push(if col.nullable { 1 } else { 0 });
557            buf.extend_from_slice(&col.position.to_le_bytes());
558        }
559
560        // Primary key columns
561        buf.extend_from_slice(&(self.primary_key_columns.len() as u16).to_le_bytes());
562        for &pk_idx in &self.primary_key_columns {
563            buf.extend_from_slice(&pk_idx.to_le_bytes());
564        }
565
566        // Indices (v2+)
567        buf.extend_from_slice(&(self.indices.len() as u16).to_le_bytes());
568        for idx in &self.indices {
569            let idx_name = idx.name.as_bytes();
570            buf.extend_from_slice(&(idx_name.len() as u16).to_le_bytes());
571            buf.extend_from_slice(idx_name);
572            buf.extend_from_slice(&(idx.columns.len() as u16).to_le_bytes());
573            for &col_idx in &idx.columns {
574                buf.extend_from_slice(&col_idx.to_le_bytes());
575            }
576            buf.push(if idx.unique { 1 } else { 0 });
577        }
578
579        // ── v3: per-column defaults and checks ──
580        for col in &self.columns {
581            let mut flags: u8 = 0;
582            if col.default_sql.is_some() {
583                flags |= 1;
584            }
585            if col.check_sql.is_some() {
586                flags |= 2;
587            }
588            buf.push(flags);
589            if let Some(ref sql) = col.default_sql {
590                let bytes = sql.as_bytes();
591                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
592                buf.extend_from_slice(bytes);
593            }
594            if let Some(ref sql) = col.check_sql {
595                let bytes = sql.as_bytes();
596                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
597                buf.extend_from_slice(bytes);
598                write_opt_string(&mut buf, &col.check_name);
599            }
600        }
601
602        // ── v3: table-level check constraints ──
603        buf.extend_from_slice(&(self.check_constraints.len() as u16).to_le_bytes());
604        for chk in &self.check_constraints {
605            write_opt_string(&mut buf, &chk.name);
606            let sql_bytes = chk.sql.as_bytes();
607            buf.extend_from_slice(&(sql_bytes.len() as u16).to_le_bytes());
608            buf.extend_from_slice(sql_bytes);
609        }
610
611        // ── v3: foreign keys ──
612        buf.extend_from_slice(&(self.foreign_keys.len() as u16).to_le_bytes());
613        for fk in &self.foreign_keys {
614            write_opt_string(&mut buf, &fk.name);
615            buf.extend_from_slice(&(fk.columns.len() as u16).to_le_bytes());
616            for &col_idx in &fk.columns {
617                buf.extend_from_slice(&col_idx.to_le_bytes());
618            }
619            let ft_bytes = fk.foreign_table.as_bytes();
620            buf.extend_from_slice(&(ft_bytes.len() as u16).to_le_bytes());
621            buf.extend_from_slice(ft_bytes);
622            buf.extend_from_slice(&(fk.referred_columns.len() as u16).to_le_bytes());
623            for rc in &fk.referred_columns {
624                let rc_bytes = rc.as_bytes();
625                buf.extend_from_slice(&(rc_bytes.len() as u16).to_le_bytes());
626                buf.extend_from_slice(rc_bytes);
627            }
628        }
629
630        // v4: dropped non-PK encoding slots
631        buf.extend_from_slice(&(self.dropped_non_pk_slots.len() as u16).to_le_bytes());
632        for &slot in &self.dropped_non_pk_slots {
633            buf.extend_from_slice(&slot.to_le_bytes());
634        }
635
636        buf
637    }
638
639    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
640        let mut pos = 0;
641
642        if data.is_empty() || !matches!(data[0], 1 | 2 | 3 | SCHEMA_VERSION) {
643            return Err(crate::error::SqlError::InvalidValue(
644                "invalid schema version".into(),
645            ));
646        }
647        let version = data[0];
648        pos += 1;
649
650        // Table name
651        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
652        pos += 2;
653        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
654        pos += name_len;
655
656        // Column count
657        let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
658        pos += 2;
659
660        let mut columns = Vec::with_capacity(col_count);
661        for _ in 0..col_count {
662            let col_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
663            pos += 2;
664            let col_name = String::from_utf8_lossy(&data[pos..pos + col_name_len]).into_owned();
665            pos += col_name_len;
666            let data_type = DataType::from_tag(data[pos]).ok_or_else(|| {
667                crate::error::SqlError::InvalidValue("unknown data type tag".into())
668            })?;
669            pos += 1;
670            let nullable = data[pos] != 0;
671            pos += 1;
672            let position = u16::from_le_bytes([data[pos], data[pos + 1]]);
673            pos += 2;
674            columns.push(ColumnDef {
675                name: col_name,
676                data_type,
677                nullable,
678                position,
679                default_expr: None,
680                default_sql: None,
681                check_expr: None,
682                check_sql: None,
683                check_name: None,
684            });
685        }
686
687        // Primary key columns
688        let pk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
689        pos += 2;
690        let mut primary_key_columns = Vec::with_capacity(pk_count);
691        for _ in 0..pk_count {
692            let pk_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
693            pos += 2;
694            primary_key_columns.push(pk_idx);
695        }
696
697        // Indices (v2+)
698        let indices = if version >= 2 && pos + 2 <= data.len() {
699            let idx_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
700            pos += 2;
701            let mut idxs = Vec::with_capacity(idx_count);
702            for _ in 0..idx_count {
703                let idx_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
704                pos += 2;
705                let idx_name = String::from_utf8_lossy(&data[pos..pos + idx_name_len]).into_owned();
706                pos += idx_name_len;
707                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
708                pos += 2;
709                let mut cols = Vec::with_capacity(col_count);
710                for _ in 0..col_count {
711                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
712                    pos += 2;
713                    cols.push(col_idx);
714                }
715                let unique = data[pos] != 0;
716                pos += 1;
717                idxs.push(IndexDef {
718                    name: idx_name,
719                    columns: cols,
720                    unique,
721                });
722            }
723            idxs
724        } else {
725            vec![]
726        };
727
728        // v3: per-column defaults and checks
729        let mut check_constraints = Vec::new();
730        let mut foreign_keys = Vec::new();
731
732        if version >= 3 && pos < data.len() {
733            for col in &mut columns {
734                let flags = data[pos];
735                pos += 1;
736                if flags & 1 != 0 {
737                    let sql = read_string(data, &mut pos);
738                    col.default_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
739                        crate::error::SqlError::InvalidValue(format!(
740                            "cannot parse DEFAULT expression: {sql}"
741                        ))
742                    })?);
743                    col.default_sql = Some(sql);
744                }
745                if flags & 2 != 0 {
746                    let sql = read_string(data, &mut pos);
747                    col.check_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
748                        crate::error::SqlError::InvalidValue(format!(
749                            "cannot parse CHECK expression: {sql}"
750                        ))
751                    })?);
752                    col.check_sql = Some(sql);
753                    col.check_name = read_opt_string(data, &mut pos);
754                }
755            }
756
757            // Table-level check constraints
758            let chk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
759            pos += 2;
760            for _ in 0..chk_count {
761                let name = read_opt_string(data, &mut pos);
762                let sql = read_string(data, &mut pos);
763                let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
764                    crate::error::SqlError::InvalidValue(format!(
765                        "cannot parse CHECK expression: {sql}"
766                    ))
767                })?;
768                check_constraints.push(TableCheckDef { name, expr, sql });
769            }
770
771            // Foreign keys
772            let fk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
773            pos += 2;
774            for _ in 0..fk_count {
775                let name = read_opt_string(data, &mut pos);
776                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
777                pos += 2;
778                let mut cols = Vec::with_capacity(col_count);
779                for _ in 0..col_count {
780                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
781                    pos += 2;
782                    cols.push(col_idx);
783                }
784                let foreign_table = read_string(data, &mut pos);
785                let ref_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
786                pos += 2;
787                let mut referred_columns = Vec::with_capacity(ref_count);
788                for _ in 0..ref_count {
789                    referred_columns.push(read_string(data, &mut pos));
790                }
791                foreign_keys.push(ForeignKeySchemaEntry {
792                    name,
793                    columns: cols,
794                    foreign_table,
795                    referred_columns,
796                });
797            }
798        }
799        // v4: dropped non-PK encoding slots
800        let mut dropped_non_pk_slots = Vec::new();
801        if version >= 4 && pos + 2 <= data.len() {
802            let slot_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
803            pos += 2;
804            for _ in 0..slot_count {
805                let slot = u16::from_le_bytes([data[pos], data[pos + 1]]);
806                pos += 2;
807                dropped_non_pk_slots.push(slot);
808            }
809        }
810        let _ = pos;
811
812        Ok(Self::with_drops(
813            name,
814            columns,
815            primary_key_columns,
816            indices,
817            check_constraints,
818            foreign_keys,
819            dropped_non_pk_slots,
820        ))
821    }
822
823    /// Get column index by name (case-insensitive).
824    pub fn column_index(&self, name: &str) -> Option<usize> {
825        self.columns
826            .iter()
827            .position(|c| c.name.eq_ignore_ascii_case(name))
828    }
829
830    /// Get indices of non-PK columns (columns stored in the B+ tree value).
831    pub fn non_pk_indices(&self) -> &[usize] {
832        &self.non_pk_idx_cache
833    }
834
835    /// Get the PK column indices as usize.
836    pub fn pk_indices(&self) -> &[usize] {
837        &self.pk_idx_cache
838    }
839
840    /// Get index definition by name (case-insensitive).
841    pub fn index_by_name(&self, name: &str) -> Option<&IndexDef> {
842        let lower = name.to_ascii_lowercase();
843        self.indices.iter().find(|i| i.name == lower)
844    }
845
846    /// Get the KV table name for an index.
847    pub fn index_table_name(table_name: &str, index_name: &str) -> Vec<u8> {
848        format!("__idx_{table_name}_{index_name}").into_bytes()
849    }
850}
851
852/// Result of executing a SQL statement.
853#[derive(Debug)]
854pub enum ExecutionResult {
855    RowsAffected(u64),
856    Query(QueryResult),
857    Ok,
858}
859
860/// Result of a SELECT query.
861#[derive(Debug, Clone)]
862pub struct QueryResult {
863    pub columns: Vec<String>,
864    pub rows: Vec<Vec<Value>>,
865}
866
867#[cfg(test)]
868mod tests {
869    use super::*;
870
871    #[test]
872    fn value_ordering() {
873        assert!(Value::Null < Value::Boolean(false));
874        assert!(Value::Boolean(false) < Value::Boolean(true));
875        assert!(Value::Boolean(true) < Value::Integer(0));
876        assert!(Value::Integer(-1) < Value::Integer(0));
877        assert!(Value::Integer(0) < Value::Real(0.5));
878        assert!(Value::Real(1.0) < Value::Text("".into()));
879        assert!(Value::Text("a".into()) < Value::Text("b".into()));
880        assert!(Value::Text("z".into()) < Value::Blob(vec![]));
881        assert!(Value::Blob(vec![0]) < Value::Blob(vec![1]));
882    }
883
884    #[test]
885    fn value_numeric_mixed() {
886        assert_eq!(Value::Integer(1), Value::Real(1.0));
887        assert!(Value::Integer(1) < Value::Real(1.5));
888        assert!(Value::Real(0.5) < Value::Integer(1));
889    }
890
891    #[test]
892    fn value_display() {
893        assert_eq!(format!("{}", Value::Null), "NULL");
894        assert_eq!(format!("{}", Value::Integer(42)), "42");
895        assert_eq!(format!("{}", Value::Real(3.15)), "3.15");
896        assert_eq!(format!("{}", Value::Real(1.0)), "1.0");
897        assert_eq!(format!("{}", Value::Text("hello".into())), "hello");
898        assert_eq!(format!("{}", Value::Blob(vec![0xDE, 0xAD])), "X'DEAD'");
899        assert_eq!(format!("{}", Value::Boolean(true)), "TRUE");
900        assert_eq!(format!("{}", Value::Boolean(false)), "FALSE");
901    }
902
903    #[test]
904    fn value_coerce() {
905        assert_eq!(
906            Value::Integer(42).coerce_to(DataType::Real),
907            Some(Value::Real(42.0))
908        );
909        assert_eq!(
910            Value::Boolean(true).coerce_to(DataType::Integer),
911            Some(Value::Integer(1))
912        );
913        assert_eq!(Value::Null.coerce_to(DataType::Integer), Some(Value::Null));
914        assert_eq!(Value::Text("x".into()).coerce_to(DataType::Integer), None);
915    }
916
917    fn col(name: &str, dt: DataType, nullable: bool, pos: u16) -> ColumnDef {
918        ColumnDef {
919            name: name.into(),
920            data_type: dt,
921            nullable,
922            position: pos,
923            default_expr: None,
924            default_sql: None,
925            check_expr: None,
926            check_sql: None,
927            check_name: None,
928        }
929    }
930
931    #[test]
932    fn schema_roundtrip() {
933        let schema = TableSchema::new(
934            "users".into(),
935            vec![
936                col("id", DataType::Integer, false, 0),
937                col("name", DataType::Text, true, 1),
938                col("active", DataType::Boolean, false, 2),
939            ],
940            vec![0],
941            vec![],
942            vec![],
943            vec![],
944        );
945
946        let data = schema.serialize();
947        let restored = TableSchema::deserialize(&data).unwrap();
948
949        assert_eq!(restored.name, "users");
950        assert_eq!(restored.columns.len(), 3);
951        assert_eq!(restored.columns[0].name, "id");
952        assert_eq!(restored.columns[0].data_type, DataType::Integer);
953        assert!(!restored.columns[0].nullable);
954        assert_eq!(restored.columns[1].name, "name");
955        assert_eq!(restored.columns[1].data_type, DataType::Text);
956        assert!(restored.columns[1].nullable);
957        assert_eq!(restored.columns[2].name, "active");
958        assert_eq!(restored.columns[2].data_type, DataType::Boolean);
959        assert_eq!(restored.primary_key_columns, vec![0]);
960    }
961
962    #[test]
963    fn schema_roundtrip_with_indices() {
964        let schema = TableSchema::new(
965            "orders".into(),
966            vec![
967                col("id", DataType::Integer, false, 0),
968                col("customer", DataType::Text, false, 1),
969                col("amount", DataType::Real, true, 2),
970            ],
971            vec![0],
972            vec![
973                IndexDef {
974                    name: "idx_customer".into(),
975                    columns: vec![1],
976                    unique: false,
977                },
978                IndexDef {
979                    name: "idx_amount_uniq".into(),
980                    columns: vec![2],
981                    unique: true,
982                },
983            ],
984            vec![],
985            vec![],
986        );
987
988        let data = schema.serialize();
989        let restored = TableSchema::deserialize(&data).unwrap();
990
991        assert_eq!(restored.indices.len(), 2);
992        assert_eq!(restored.indices[0].name, "idx_customer");
993        assert_eq!(restored.indices[0].columns, vec![1]);
994        assert!(!restored.indices[0].unique);
995        assert_eq!(restored.indices[1].name, "idx_amount_uniq");
996        assert_eq!(restored.indices[1].columns, vec![2]);
997        assert!(restored.indices[1].unique);
998    }
999
1000    #[test]
1001    fn schema_v1_backward_compat() {
1002        let old_schema = TableSchema::new(
1003            "test".into(),
1004            vec![col("id", DataType::Integer, false, 0)],
1005            vec![0],
1006            vec![],
1007            vec![],
1008            vec![],
1009        );
1010        let mut data = old_schema.serialize();
1011        // Patch to v1 format: replace version byte and truncate everything after PK
1012        data[0] = 1;
1013        // v1 has no indices or v3 data - truncate after PK columns
1014        // Header(1) + name_len(2) + "test"(4) + col_count(2) + col("id": name_len(2)+"id"(2)+type(1)+nullable(1)+position(2)) + pk_count(2) + pk(2)
1015        let v1_len = 1 + 2 + 4 + 2 + (2 + 2 + 1 + 1 + 2) + 2 + 2;
1016        data.truncate(v1_len);
1017
1018        let restored = TableSchema::deserialize(&data).unwrap();
1019        assert_eq!(restored.name, "test");
1020        assert!(restored.indices.is_empty());
1021        assert!(restored.check_constraints.is_empty());
1022        assert!(restored.foreign_keys.is_empty());
1023    }
1024
1025    #[test]
1026    fn schema_v2_backward_compat() {
1027        let schema = TableSchema::new(
1028            "test".into(),
1029            vec![col("id", DataType::Integer, false, 0)],
1030            vec![0],
1031            vec![],
1032            vec![],
1033            vec![],
1034        );
1035        let mut data = schema.serialize();
1036        // Patch version to 2 and truncate v3 data
1037        data[0] = 2;
1038        // v2 ends after indices section: find the v3 start and truncate
1039        // Header(1) + name_len(2) + "test"(4) + col_count(2) + col(8) + pk_count(2) + pk(2) + idx_count(2)
1040        let v2_len = 1 + 2 + 4 + 2 + 8 + 2 + 2 + 2;
1041        data.truncate(v2_len);
1042
1043        let restored = TableSchema::deserialize(&data).unwrap();
1044        assert_eq!(restored.name, "test");
1045        assert!(restored.check_constraints.is_empty());
1046        assert!(restored.foreign_keys.is_empty());
1047        assert!(restored.columns[0].default_expr.is_none());
1048        assert!(restored.columns[0].check_expr.is_none());
1049    }
1050
1051    #[test]
1052    fn schema_roundtrip_with_defaults_and_checks() {
1053        use crate::parser::parse_sql_expr;
1054
1055        let mut columns = vec![
1056            col("id", DataType::Integer, false, 0),
1057            col("val", DataType::Integer, true, 1),
1058            col("name", DataType::Text, true, 2),
1059        ];
1060        columns[1].default_sql = Some("42".into());
1061        columns[1].default_expr = Some(parse_sql_expr("42").unwrap());
1062        columns[2].check_sql = Some("LENGTH(name) > 0".into());
1063        columns[2].check_expr = Some(parse_sql_expr("LENGTH(name) > 0").unwrap());
1064        columns[2].check_name = Some("chk_name_len".into());
1065
1066        let schema = TableSchema::new(
1067            "t".into(),
1068            columns,
1069            vec![0],
1070            vec![],
1071            vec![TableCheckDef {
1072                name: Some("chk_val_pos".into()),
1073                expr: parse_sql_expr("val > 0").unwrap(),
1074                sql: "val > 0".into(),
1075            }],
1076            vec![],
1077        );
1078
1079        let data = schema.serialize();
1080        let restored = TableSchema::deserialize(&data).unwrap();
1081
1082        assert_eq!(restored.columns[1].default_sql.as_deref(), Some("42"));
1083        assert!(restored.columns[1].default_expr.is_some());
1084        assert_eq!(
1085            restored.columns[2].check_sql.as_deref(),
1086            Some("LENGTH(name) > 0")
1087        );
1088        assert!(restored.columns[2].check_expr.is_some());
1089        assert_eq!(
1090            restored.columns[2].check_name.as_deref(),
1091            Some("chk_name_len")
1092        );
1093        assert_eq!(restored.check_constraints.len(), 1);
1094        assert_eq!(
1095            restored.check_constraints[0].name.as_deref(),
1096            Some("chk_val_pos")
1097        );
1098        assert_eq!(restored.check_constraints[0].sql, "val > 0");
1099    }
1100
1101    #[test]
1102    fn schema_roundtrip_with_foreign_keys() {
1103        let schema = TableSchema::new(
1104            "orders".into(),
1105            vec![
1106                col("id", DataType::Integer, false, 0),
1107                col("user_id", DataType::Integer, false, 1),
1108            ],
1109            vec![0],
1110            vec![],
1111            vec![],
1112            vec![ForeignKeySchemaEntry {
1113                name: Some("fk_user".into()),
1114                columns: vec![1],
1115                foreign_table: "users".into(),
1116                referred_columns: vec!["id".into()],
1117            }],
1118        );
1119
1120        let data = schema.serialize();
1121        let restored = TableSchema::deserialize(&data).unwrap();
1122
1123        assert_eq!(restored.foreign_keys.len(), 1);
1124        assert_eq!(restored.foreign_keys[0].name.as_deref(), Some("fk_user"));
1125        assert_eq!(restored.foreign_keys[0].columns, vec![1]);
1126        assert_eq!(restored.foreign_keys[0].foreign_table, "users");
1127        assert_eq!(restored.foreign_keys[0].referred_columns, vec!["id"]);
1128    }
1129
1130    #[test]
1131    fn data_type_display() {
1132        assert_eq!(format!("{}", DataType::Integer), "INTEGER");
1133        assert_eq!(format!("{}", DataType::Text), "TEXT");
1134        assert_eq!(format!("{}", DataType::Boolean), "BOOLEAN");
1135    }
1136}