Skip to main content

citadel_sql/
types.rs

1use std::cmp::Ordering;
2use std::fmt;
3use std::hash::{Hash, Hasher};
4
5pub use compact_str::CompactString;
6
7use crate::parser::Expr;
8
9/// SQL data types.
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum DataType {
12    Null,
13    Integer,
14    Real,
15    Text,
16    Blob,
17    Boolean,
18}
19
20impl DataType {
21    pub fn type_tag(self) -> u8 {
22        match self {
23            DataType::Null => 0,
24            DataType::Blob => 1,
25            DataType::Text => 2,
26            DataType::Boolean => 3,
27            DataType::Integer => 4,
28            DataType::Real => 5,
29        }
30    }
31
32    pub fn from_tag(tag: u8) -> Option<Self> {
33        match tag {
34            0 => Some(DataType::Null),
35            1 => Some(DataType::Blob),
36            2 => Some(DataType::Text),
37            3 => Some(DataType::Boolean),
38            4 => Some(DataType::Integer),
39            5 => Some(DataType::Real),
40            _ => None,
41        }
42    }
43}
44
45impl fmt::Display for DataType {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        match self {
48            DataType::Null => write!(f, "NULL"),
49            DataType::Integer => write!(f, "INTEGER"),
50            DataType::Real => write!(f, "REAL"),
51            DataType::Text => write!(f, "TEXT"),
52            DataType::Blob => write!(f, "BLOB"),
53            DataType::Boolean => write!(f, "BOOLEAN"),
54        }
55    }
56}
57
58/// SQL value.
59#[derive(Debug, Clone, Default)]
60pub enum Value {
61    #[default]
62    Null,
63    Integer(i64),
64    Real(f64),
65    Text(CompactString),
66    Blob(Vec<u8>),
67    Boolean(bool),
68}
69
70impl Value {
71    pub fn data_type(&self) -> DataType {
72        match self {
73            Value::Null => DataType::Null,
74            Value::Integer(_) => DataType::Integer,
75            Value::Real(_) => DataType::Real,
76            Value::Text(_) => DataType::Text,
77            Value::Blob(_) => DataType::Blob,
78            Value::Boolean(_) => DataType::Boolean,
79        }
80    }
81
82    pub fn is_null(&self) -> bool {
83        matches!(self, Value::Null)
84    }
85
86    /// Attempt to coerce this value to the target type.
87    pub fn coerce_to(&self, target: DataType) -> Option<Value> {
88        match (self, target) {
89            (_, DataType::Null) => Some(Value::Null),
90            (Value::Null, _) => Some(Value::Null),
91            (Value::Integer(i), DataType::Integer) => Some(Value::Integer(*i)),
92            (Value::Integer(i), DataType::Real) => Some(Value::Real(*i as f64)),
93            (Value::Real(r), DataType::Real) => Some(Value::Real(*r)),
94            (Value::Real(r), DataType::Integer) => Some(Value::Integer(*r as i64)),
95            (Value::Text(s), DataType::Text) => Some(Value::Text(s.clone())),
96            (Value::Blob(b), DataType::Blob) => Some(Value::Blob(b.clone())),
97            (Value::Boolean(b), DataType::Boolean) => Some(Value::Boolean(*b)),
98            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if *b { 1 } else { 0 })),
99            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(*i != 0)),
100            _ => None,
101        }
102    }
103
104    pub fn coerce_into(self, target: DataType) -> Option<Value> {
105        if self.is_null() || target == DataType::Null {
106            return Some(Value::Null);
107        }
108        if self.data_type() == target {
109            return Some(self);
110        }
111        match (self, target) {
112            (Value::Integer(i), DataType::Real) => Some(Value::Real(i as f64)),
113            (Value::Real(r), DataType::Integer) => Some(Value::Integer(r as i64)),
114            (Value::Boolean(b), DataType::Integer) => Some(Value::Integer(if b { 1 } else { 0 })),
115            (Value::Integer(i), DataType::Boolean) => Some(Value::Boolean(i != 0)),
116            _ => None,
117        }
118    }
119
120    /// Numeric ordering for Integer and Real values (promotes to f64 for mixed).
121    fn numeric_cmp(&self, other: &Value) -> Option<Ordering> {
122        match (self, other) {
123            (Value::Integer(a), Value::Integer(b)) => Some(a.cmp(b)),
124            (Value::Real(a), Value::Real(b)) => a.partial_cmp(b),
125            (Value::Integer(a), Value::Real(b)) => (*a as f64).partial_cmp(b),
126            (Value::Real(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)),
127            _ => None,
128        }
129    }
130}
131
132impl PartialEq for Value {
133    fn eq(&self, other: &Self) -> bool {
134        match (self, other) {
135            (Value::Null, Value::Null) => true,
136            (Value::Integer(a), Value::Integer(b)) => a == b,
137            (Value::Real(a), Value::Real(b)) => a == b,
138            (Value::Integer(a), Value::Real(b)) => (*a as f64) == *b,
139            (Value::Real(a), Value::Integer(b)) => *a == (*b as f64),
140            (Value::Text(a), Value::Text(b)) => a == b,
141            (Value::Blob(a), Value::Blob(b)) => a == b,
142            (Value::Boolean(a), Value::Boolean(b)) => a == b,
143            _ => false,
144        }
145    }
146}
147
148impl Eq for Value {}
149
150impl Hash for Value {
151    fn hash<H: Hasher>(&self, state: &mut H) {
152        match self {
153            Value::Null => 0u8.hash(state),
154            Value::Integer(i) => {
155                // Hash via f64 bits so Integer(n) and Real(n.0) produce the same hash,
156                // matching the cross-type PartialEq contract.
157                1u8.hash(state);
158                (*i as f64).to_bits().hash(state);
159            }
160            Value::Real(r) => {
161                1u8.hash(state);
162                r.to_bits().hash(state);
163            }
164            Value::Text(s) => {
165                2u8.hash(state);
166                s.hash(state);
167            }
168            Value::Blob(b) => {
169                3u8.hash(state);
170                b.hash(state);
171            }
172            Value::Boolean(b) => {
173                4u8.hash(state);
174                b.hash(state);
175            }
176        }
177    }
178}
179
180impl PartialOrd for Value {
181    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
182        Some(self.cmp(other))
183    }
184}
185
186impl Ord for Value {
187    fn cmp(&self, other: &Self) -> Ordering {
188        // NULL < BOOLEAN < INTEGER/REAL (numeric) < TEXT < BLOB
189        match (self, other) {
190            (Value::Null, Value::Null) => Ordering::Equal,
191            (Value::Null, _) => Ordering::Less,
192            (_, Value::Null) => Ordering::Greater,
193
194            (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
195            (Value::Boolean(_), _) => Ordering::Less,
196            (_, Value::Boolean(_)) => Ordering::Greater,
197
198            // Numeric: Integer and Real are comparable
199            (Value::Integer(_) | Value::Real(_), Value::Integer(_) | Value::Real(_)) => {
200                self.numeric_cmp(other).unwrap_or(Ordering::Equal)
201            }
202            (Value::Integer(_) | Value::Real(_), _) => Ordering::Less,
203            (_, Value::Integer(_) | Value::Real(_)) => Ordering::Greater,
204
205            (Value::Text(a), Value::Text(b)) => a.cmp(b),
206            (Value::Text(_), _) => Ordering::Less,
207            (_, Value::Text(_)) => Ordering::Greater,
208
209            (Value::Blob(a), Value::Blob(b)) => a.cmp(b),
210        }
211    }
212}
213
214impl fmt::Display for Value {
215    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216        match self {
217            Value::Null => write!(f, "NULL"),
218            Value::Integer(i) => write!(f, "{i}"),
219            Value::Real(r) => {
220                if r.fract() == 0.0 && r.is_finite() {
221                    write!(f, "{r:.1}")
222                } else {
223                    write!(f, "{r}")
224                }
225            }
226            Value::Text(s) => write!(f, "{s}"),
227            Value::Blob(b) => write!(f, "X'{}'", hex_encode(b)),
228            Value::Boolean(b) => write!(f, "{}", if *b { "TRUE" } else { "FALSE" }),
229        }
230    }
231}
232
233fn hex_encode(data: &[u8]) -> String {
234    let mut s = String::with_capacity(data.len() * 2);
235    for byte in data {
236        s.push_str(&format!("{byte:02X}"));
237    }
238    s
239}
240
241/// Column definition.
242#[derive(Debug, Clone)]
243pub struct ColumnDef {
244    pub name: String,
245    pub data_type: DataType,
246    pub nullable: bool,
247    pub position: u16,
248    pub default_expr: Option<Expr>,
249    pub default_sql: Option<String>,
250    pub check_expr: Option<Expr>,
251    pub check_sql: Option<String>,
252    pub check_name: Option<String>,
253}
254
255/// Index definition stored as part of the table schema.
256#[derive(Debug, Clone)]
257pub struct IndexDef {
258    pub name: String,
259    pub columns: Vec<u16>,
260    pub unique: bool,
261}
262
263/// View definition stored in the _views metadata table.
264#[derive(Debug, Clone)]
265pub struct ViewDef {
266    pub name: String,
267    pub sql: String,
268    pub column_aliases: Vec<String>,
269}
270
271const VIEW_DEF_VERSION: u8 = 1;
272
273impl ViewDef {
274    pub fn serialize(&self) -> Vec<u8> {
275        let mut buf = Vec::new();
276        buf.push(VIEW_DEF_VERSION);
277
278        let name_bytes = self.name.as_bytes();
279        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
280        buf.extend_from_slice(name_bytes);
281
282        let sql_bytes = self.sql.as_bytes();
283        buf.extend_from_slice(&(sql_bytes.len() as u32).to_le_bytes());
284        buf.extend_from_slice(sql_bytes);
285
286        buf.extend_from_slice(&(self.column_aliases.len() as u16).to_le_bytes());
287        for alias in &self.column_aliases {
288            let alias_bytes = alias.as_bytes();
289            buf.extend_from_slice(&(alias_bytes.len() as u16).to_le_bytes());
290            buf.extend_from_slice(alias_bytes);
291        }
292
293        buf
294    }
295
296    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
297        if data.is_empty() || data[0] != VIEW_DEF_VERSION {
298            return Err(crate::error::SqlError::InvalidValue(
299                "invalid view definition version".into(),
300            ));
301        }
302        let mut pos = 1;
303
304        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
305        pos += 2;
306        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
307        pos += name_len;
308
309        let sql_len =
310            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
311        pos += 4;
312        let sql = String::from_utf8_lossy(&data[pos..pos + sql_len]).into_owned();
313        pos += sql_len;
314
315        let alias_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
316        pos += 2;
317        let mut column_aliases = Vec::with_capacity(alias_count);
318        for _ in 0..alias_count {
319            let alias_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
320            pos += 2;
321            let alias = String::from_utf8_lossy(&data[pos..pos + alias_len]).into_owned();
322            pos += alias_len;
323            column_aliases.push(alias);
324        }
325
326        Ok(Self {
327            name,
328            sql,
329            column_aliases,
330        })
331    }
332}
333
334/// Table-level CHECK constraint stored in schema.
335#[derive(Debug, Clone)]
336pub struct TableCheckDef {
337    pub name: Option<String>,
338    pub expr: Expr,
339    pub sql: String,
340}
341
342/// Foreign key definition stored in schema.
343#[derive(Debug, Clone)]
344pub struct ForeignKeySchemaEntry {
345    pub name: Option<String>,
346    pub columns: Vec<u16>,
347    pub foreign_table: String,
348    pub referred_columns: Vec<String>,
349}
350
351/// Table schema stored in the _schema table.
352#[derive(Debug, Clone)]
353pub struct TableSchema {
354    pub name: String,
355    pub columns: Vec<ColumnDef>,
356    pub primary_key_columns: Vec<u16>,
357    pub indices: Vec<IndexDef>,
358    pub check_constraints: Vec<TableCheckDef>,
359    pub foreign_keys: Vec<ForeignKeySchemaEntry>,
360    pk_idx_cache: Vec<usize>,
361    non_pk_idx_cache: Vec<usize>,
362    /// Physical encoding slots that have been dropped (O(1) DROP COLUMN).
363    /// Sorted. Old rows still have data at these positions (skipped on decode);
364    /// new rows encode NULL there to maintain position consistency.
365    dropped_non_pk_slots: Vec<u16>,
366    /// Physical encoding position -> logical column index.
367    /// `usize::MAX` for dropped slots.
368    decode_mapping_cache: Vec<usize>,
369    /// Logical non-PK order -> physical encoding position.
370    /// `encoding_positions_cache[i]` is the physical slot for `non_pk_idx_cache[i]`.
371    encoding_positions_cache: Vec<u16>,
372}
373
374impl TableSchema {
375    pub fn new(
376        name: String,
377        columns: Vec<ColumnDef>,
378        primary_key_columns: Vec<u16>,
379        indices: Vec<IndexDef>,
380        check_constraints: Vec<TableCheckDef>,
381        foreign_keys: Vec<ForeignKeySchemaEntry>,
382    ) -> Self {
383        Self::with_drops(
384            name,
385            columns,
386            primary_key_columns,
387            indices,
388            check_constraints,
389            foreign_keys,
390            vec![],
391        )
392    }
393
394    pub fn with_drops(
395        name: String,
396        columns: Vec<ColumnDef>,
397        primary_key_columns: Vec<u16>,
398        indices: Vec<IndexDef>,
399        check_constraints: Vec<TableCheckDef>,
400        foreign_keys: Vec<ForeignKeySchemaEntry>,
401        dropped_non_pk_slots: Vec<u16>,
402    ) -> Self {
403        let pk_idx_cache: Vec<usize> = primary_key_columns.iter().map(|&i| i as usize).collect();
404        let non_pk_idx_cache: Vec<usize> = (0..columns.len())
405            .filter(|i| !primary_key_columns.contains(&(*i as u16)))
406            .collect();
407
408        let physical_count = non_pk_idx_cache.len() + dropped_non_pk_slots.len();
409        let mut decode_mapping_cache = vec![usize::MAX; physical_count];
410        let mut encoding_positions_cache = Vec::with_capacity(non_pk_idx_cache.len());
411
412        let mut drop_idx = 0;
413        let mut live_idx = 0;
414        for (phys_pos, slot) in decode_mapping_cache.iter_mut().enumerate() {
415            if drop_idx < dropped_non_pk_slots.len()
416                && dropped_non_pk_slots[drop_idx] as usize == phys_pos
417            {
418                drop_idx += 1;
419            } else {
420                *slot = non_pk_idx_cache[live_idx];
421                encoding_positions_cache.push(phys_pos as u16);
422                live_idx += 1;
423            }
424        }
425
426        Self {
427            name,
428            columns,
429            primary_key_columns,
430            indices,
431            check_constraints,
432            foreign_keys,
433            pk_idx_cache,
434            non_pk_idx_cache,
435            dropped_non_pk_slots,
436            decode_mapping_cache,
437            encoding_positions_cache,
438        }
439    }
440
441    /// Rebuild caches (preserving dropped slots). Use after mutating fields in place.
442    pub fn rebuild(self) -> Self {
443        let drops = self.dropped_non_pk_slots;
444        Self::with_drops(
445            self.name,
446            self.columns,
447            self.primary_key_columns,
448            self.indices,
449            self.check_constraints,
450            self.foreign_keys,
451            drops,
452        )
453    }
454
455    /// Returns true if any column-level or table-level CHECK constraints exist.
456    pub fn has_checks(&self) -> bool {
457        !self.check_constraints.is_empty() || self.columns.iter().any(|c| c.check_expr.is_some())
458    }
459
460    /// Physical encoding position -> logical column index mapping.
461    /// Length = physical_non_pk_count. `usize::MAX` for dropped slots.
462    pub fn decode_col_mapping(&self) -> &[usize] {
463        &self.decode_mapping_cache
464    }
465
466    /// Logical non-PK order -> physical encoding position.
467    /// `encoding_positions()[i]` is the physical slot for `non_pk_indices()[i]`.
468    pub fn encoding_positions(&self) -> &[u16] {
469        &self.encoding_positions_cache
470    }
471
472    /// Total physical non-PK column count (live + dropped slots).
473    pub fn physical_non_pk_count(&self) -> usize {
474        self.non_pk_idx_cache.len() + self.dropped_non_pk_slots.len()
475    }
476
477    /// Physical encoding slots that have been dropped via O(1) DROP COLUMN.
478    pub fn dropped_non_pk_slots(&self) -> &[u16] {
479        &self.dropped_non_pk_slots
480    }
481
482    /// Create a new schema with the column at `drop_pos` removed.
483    /// O(1): marks the physical encoding slot as dropped instead of rewriting rows.
484    /// Decrements all logical position references > drop_pos. Filters out
485    /// table-level CHECK constraints referencing the dropped column.
486    pub fn without_column(&self, drop_pos: usize) -> Self {
487        // Find physical encoding slot for the dropped column
488        let non_pk_order = self
489            .non_pk_idx_cache
490            .iter()
491            .position(|&i| i == drop_pos)
492            .expect("cannot drop PK column via without_column");
493        let physical_slot = self.encoding_positions_cache[non_pk_order];
494
495        let mut new_dropped = self.dropped_non_pk_slots.clone();
496        new_dropped.push(physical_slot);
497        new_dropped.sort();
498
499        let dropped_name = &self.columns[drop_pos].name;
500        let drop_pos_u16 = drop_pos as u16;
501
502        let mut columns: Vec<ColumnDef> = self
503            .columns
504            .iter()
505            .enumerate()
506            .filter(|(i, _)| *i != drop_pos)
507            .map(|(_, c)| {
508                let mut col = c.clone();
509                if col.position > drop_pos_u16 {
510                    col.position -= 1;
511                }
512                col
513            })
514            .collect();
515        for (i, col) in columns.iter_mut().enumerate() {
516            col.position = i as u16;
517        }
518
519        let primary_key_columns: Vec<u16> = self
520            .primary_key_columns
521            .iter()
522            .map(|&p| if p > drop_pos_u16 { p - 1 } else { p })
523            .collect();
524
525        let indices: Vec<IndexDef> = self
526            .indices
527            .iter()
528            .map(|idx| IndexDef {
529                name: idx.name.clone(),
530                columns: idx
531                    .columns
532                    .iter()
533                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
534                    .collect(),
535                unique: idx.unique,
536            })
537            .collect();
538
539        let foreign_keys: Vec<ForeignKeySchemaEntry> = self
540            .foreign_keys
541            .iter()
542            .map(|fk| ForeignKeySchemaEntry {
543                name: fk.name.clone(),
544                columns: fk
545                    .columns
546                    .iter()
547                    .map(|&c| if c > drop_pos_u16 { c - 1 } else { c })
548                    .collect(),
549                foreign_table: fk.foreign_table.clone(),
550                referred_columns: fk.referred_columns.clone(),
551            })
552            .collect();
553
554        // Filter out table-level CHECKs that reference the dropped column
555        let dropped_lower = dropped_name.to_ascii_lowercase();
556        let check_constraints: Vec<TableCheckDef> = self
557            .check_constraints
558            .iter()
559            .filter(|c| !c.sql.to_ascii_lowercase().contains(&dropped_lower))
560            .cloned()
561            .collect();
562
563        Self::with_drops(
564            self.name.clone(),
565            columns,
566            primary_key_columns,
567            indices,
568            check_constraints,
569            foreign_keys,
570            new_dropped,
571        )
572    }
573}
574
575const SCHEMA_VERSION: u8 = 4;
576
577fn write_opt_string(buf: &mut Vec<u8>, s: &Option<String>) {
578    match s {
579        Some(s) => {
580            let bytes = s.as_bytes();
581            buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
582            buf.extend_from_slice(bytes);
583        }
584        None => buf.extend_from_slice(&0u16.to_le_bytes()),
585    }
586}
587
588fn read_opt_string(data: &[u8], pos: &mut usize) -> Option<String> {
589    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
590    *pos += 2;
591    if len == 0 {
592        None
593    } else {
594        let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
595        *pos += len;
596        Some(s)
597    }
598}
599
600fn read_string(data: &[u8], pos: &mut usize) -> String {
601    let len = u16::from_le_bytes([data[*pos], data[*pos + 1]]) as usize;
602    *pos += 2;
603    let s = String::from_utf8_lossy(&data[*pos..*pos + len]).into_owned();
604    *pos += len;
605    s
606}
607
608impl TableSchema {
609    pub fn serialize(&self) -> Vec<u8> {
610        let mut buf = Vec::new();
611        buf.push(SCHEMA_VERSION);
612
613        // Table name
614        let name_bytes = self.name.as_bytes();
615        buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
616        buf.extend_from_slice(name_bytes);
617
618        // Column count
619        buf.extend_from_slice(&(self.columns.len() as u16).to_le_bytes());
620
621        // Columns (v1/v2 core fields)
622        for col in &self.columns {
623            let col_name = col.name.as_bytes();
624            buf.extend_from_slice(&(col_name.len() as u16).to_le_bytes());
625            buf.extend_from_slice(col_name);
626            buf.push(col.data_type.type_tag());
627            buf.push(if col.nullable { 1 } else { 0 });
628            buf.extend_from_slice(&col.position.to_le_bytes());
629        }
630
631        // Primary key columns
632        buf.extend_from_slice(&(self.primary_key_columns.len() as u16).to_le_bytes());
633        for &pk_idx in &self.primary_key_columns {
634            buf.extend_from_slice(&pk_idx.to_le_bytes());
635        }
636
637        // Indices (v2+)
638        buf.extend_from_slice(&(self.indices.len() as u16).to_le_bytes());
639        for idx in &self.indices {
640            let idx_name = idx.name.as_bytes();
641            buf.extend_from_slice(&(idx_name.len() as u16).to_le_bytes());
642            buf.extend_from_slice(idx_name);
643            buf.extend_from_slice(&(idx.columns.len() as u16).to_le_bytes());
644            for &col_idx in &idx.columns {
645                buf.extend_from_slice(&col_idx.to_le_bytes());
646            }
647            buf.push(if idx.unique { 1 } else { 0 });
648        }
649
650        // ── v3: per-column defaults and checks ──
651        for col in &self.columns {
652            let mut flags: u8 = 0;
653            if col.default_sql.is_some() {
654                flags |= 1;
655            }
656            if col.check_sql.is_some() {
657                flags |= 2;
658            }
659            buf.push(flags);
660            if let Some(ref sql) = col.default_sql {
661                let bytes = sql.as_bytes();
662                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
663                buf.extend_from_slice(bytes);
664            }
665            if let Some(ref sql) = col.check_sql {
666                let bytes = sql.as_bytes();
667                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
668                buf.extend_from_slice(bytes);
669                write_opt_string(&mut buf, &col.check_name);
670            }
671        }
672
673        // ── v3: table-level check constraints ──
674        buf.extend_from_slice(&(self.check_constraints.len() as u16).to_le_bytes());
675        for chk in &self.check_constraints {
676            write_opt_string(&mut buf, &chk.name);
677            let sql_bytes = chk.sql.as_bytes();
678            buf.extend_from_slice(&(sql_bytes.len() as u16).to_le_bytes());
679            buf.extend_from_slice(sql_bytes);
680        }
681
682        // ── v3: foreign keys ──
683        buf.extend_from_slice(&(self.foreign_keys.len() as u16).to_le_bytes());
684        for fk in &self.foreign_keys {
685            write_opt_string(&mut buf, &fk.name);
686            buf.extend_from_slice(&(fk.columns.len() as u16).to_le_bytes());
687            for &col_idx in &fk.columns {
688                buf.extend_from_slice(&col_idx.to_le_bytes());
689            }
690            let ft_bytes = fk.foreign_table.as_bytes();
691            buf.extend_from_slice(&(ft_bytes.len() as u16).to_le_bytes());
692            buf.extend_from_slice(ft_bytes);
693            buf.extend_from_slice(&(fk.referred_columns.len() as u16).to_le_bytes());
694            for rc in &fk.referred_columns {
695                let rc_bytes = rc.as_bytes();
696                buf.extend_from_slice(&(rc_bytes.len() as u16).to_le_bytes());
697                buf.extend_from_slice(rc_bytes);
698            }
699        }
700
701        // v4: dropped non-PK encoding slots
702        buf.extend_from_slice(&(self.dropped_non_pk_slots.len() as u16).to_le_bytes());
703        for &slot in &self.dropped_non_pk_slots {
704            buf.extend_from_slice(&slot.to_le_bytes());
705        }
706
707        buf
708    }
709
710    pub fn deserialize(data: &[u8]) -> crate::error::Result<Self> {
711        let mut pos = 0;
712
713        if data.is_empty() || !matches!(data[0], 1 | 2 | 3 | SCHEMA_VERSION) {
714            return Err(crate::error::SqlError::InvalidValue(
715                "invalid schema version".into(),
716            ));
717        }
718        let version = data[0];
719        pos += 1;
720
721        // Table name
722        let name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
723        pos += 2;
724        let name = String::from_utf8_lossy(&data[pos..pos + name_len]).into_owned();
725        pos += name_len;
726
727        // Column count
728        let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
729        pos += 2;
730
731        let mut columns = Vec::with_capacity(col_count);
732        for _ in 0..col_count {
733            let col_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
734            pos += 2;
735            let col_name = String::from_utf8_lossy(&data[pos..pos + col_name_len]).into_owned();
736            pos += col_name_len;
737            let data_type = DataType::from_tag(data[pos]).ok_or_else(|| {
738                crate::error::SqlError::InvalidValue("unknown data type tag".into())
739            })?;
740            pos += 1;
741            let nullable = data[pos] != 0;
742            pos += 1;
743            let position = u16::from_le_bytes([data[pos], data[pos + 1]]);
744            pos += 2;
745            columns.push(ColumnDef {
746                name: col_name,
747                data_type,
748                nullable,
749                position,
750                default_expr: None,
751                default_sql: None,
752                check_expr: None,
753                check_sql: None,
754                check_name: None,
755            });
756        }
757
758        // Primary key columns
759        let pk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
760        pos += 2;
761        let mut primary_key_columns = Vec::with_capacity(pk_count);
762        for _ in 0..pk_count {
763            let pk_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
764            pos += 2;
765            primary_key_columns.push(pk_idx);
766        }
767
768        // Indices (v2+)
769        let indices = if version >= 2 && pos + 2 <= data.len() {
770            let idx_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
771            pos += 2;
772            let mut idxs = Vec::with_capacity(idx_count);
773            for _ in 0..idx_count {
774                let idx_name_len = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
775                pos += 2;
776                let idx_name = String::from_utf8_lossy(&data[pos..pos + idx_name_len]).into_owned();
777                pos += idx_name_len;
778                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
779                pos += 2;
780                let mut cols = Vec::with_capacity(col_count);
781                for _ in 0..col_count {
782                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
783                    pos += 2;
784                    cols.push(col_idx);
785                }
786                let unique = data[pos] != 0;
787                pos += 1;
788                idxs.push(IndexDef {
789                    name: idx_name,
790                    columns: cols,
791                    unique,
792                });
793            }
794            idxs
795        } else {
796            vec![]
797        };
798
799        // v3: per-column defaults and checks
800        let mut check_constraints = Vec::new();
801        let mut foreign_keys = Vec::new();
802
803        if version >= 3 && pos < data.len() {
804            for col in &mut columns {
805                let flags = data[pos];
806                pos += 1;
807                if flags & 1 != 0 {
808                    let sql = read_string(data, &mut pos);
809                    col.default_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
810                        crate::error::SqlError::InvalidValue(format!(
811                            "cannot parse DEFAULT expression: {sql}"
812                        ))
813                    })?);
814                    col.default_sql = Some(sql);
815                }
816                if flags & 2 != 0 {
817                    let sql = read_string(data, &mut pos);
818                    col.check_expr = Some(crate::parser::parse_sql_expr(&sql).map_err(|_| {
819                        crate::error::SqlError::InvalidValue(format!(
820                            "cannot parse CHECK expression: {sql}"
821                        ))
822                    })?);
823                    col.check_sql = Some(sql);
824                    col.check_name = read_opt_string(data, &mut pos);
825                }
826            }
827
828            // Table-level check constraints
829            let chk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
830            pos += 2;
831            for _ in 0..chk_count {
832                let name = read_opt_string(data, &mut pos);
833                let sql = read_string(data, &mut pos);
834                let expr = crate::parser::parse_sql_expr(&sql).map_err(|_| {
835                    crate::error::SqlError::InvalidValue(format!(
836                        "cannot parse CHECK expression: {sql}"
837                    ))
838                })?;
839                check_constraints.push(TableCheckDef { name, expr, sql });
840            }
841
842            // Foreign keys
843            let fk_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
844            pos += 2;
845            for _ in 0..fk_count {
846                let name = read_opt_string(data, &mut pos);
847                let col_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
848                pos += 2;
849                let mut cols = Vec::with_capacity(col_count);
850                for _ in 0..col_count {
851                    let col_idx = u16::from_le_bytes([data[pos], data[pos + 1]]);
852                    pos += 2;
853                    cols.push(col_idx);
854                }
855                let foreign_table = read_string(data, &mut pos);
856                let ref_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
857                pos += 2;
858                let mut referred_columns = Vec::with_capacity(ref_count);
859                for _ in 0..ref_count {
860                    referred_columns.push(read_string(data, &mut pos));
861                }
862                foreign_keys.push(ForeignKeySchemaEntry {
863                    name,
864                    columns: cols,
865                    foreign_table,
866                    referred_columns,
867                });
868            }
869        }
870        // v4: dropped non-PK encoding slots
871        let mut dropped_non_pk_slots = Vec::new();
872        if version >= 4 && pos + 2 <= data.len() {
873            let slot_count = u16::from_le_bytes([data[pos], data[pos + 1]]) as usize;
874            pos += 2;
875            for _ in 0..slot_count {
876                let slot = u16::from_le_bytes([data[pos], data[pos + 1]]);
877                pos += 2;
878                dropped_non_pk_slots.push(slot);
879            }
880        }
881        let _ = pos;
882
883        Ok(Self::with_drops(
884            name,
885            columns,
886            primary_key_columns,
887            indices,
888            check_constraints,
889            foreign_keys,
890            dropped_non_pk_slots,
891        ))
892    }
893
894    /// Get column index by name (case-insensitive).
895    pub fn column_index(&self, name: &str) -> Option<usize> {
896        self.columns
897            .iter()
898            .position(|c| c.name.eq_ignore_ascii_case(name))
899    }
900
901    /// Get indices of non-PK columns (columns stored in the B+ tree value).
902    pub fn non_pk_indices(&self) -> &[usize] {
903        &self.non_pk_idx_cache
904    }
905
906    /// Get the PK column indices as usize.
907    pub fn pk_indices(&self) -> &[usize] {
908        &self.pk_idx_cache
909    }
910
911    /// Get index definition by name (case-insensitive).
912    pub fn index_by_name(&self, name: &str) -> Option<&IndexDef> {
913        let lower = name.to_ascii_lowercase();
914        self.indices.iter().find(|i| i.name == lower)
915    }
916
917    /// Get the KV table name for an index.
918    pub fn index_table_name(table_name: &str, index_name: &str) -> Vec<u8> {
919        format!("__idx_{table_name}_{index_name}").into_bytes()
920    }
921}
922
923/// Result of executing a SQL statement.
924#[derive(Debug)]
925pub enum ExecutionResult {
926    RowsAffected(u64),
927    Query(QueryResult),
928    Ok,
929}
930
931/// Result of a SELECT query.
932#[derive(Debug, Clone)]
933pub struct QueryResult {
934    pub columns: Vec<String>,
935    pub rows: Vec<Vec<Value>>,
936}
937
938#[cfg(test)]
939mod tests {
940    use super::*;
941
942    #[test]
943    fn value_ordering() {
944        assert!(Value::Null < Value::Boolean(false));
945        assert!(Value::Boolean(false) < Value::Boolean(true));
946        assert!(Value::Boolean(true) < Value::Integer(0));
947        assert!(Value::Integer(-1) < Value::Integer(0));
948        assert!(Value::Integer(0) < Value::Real(0.5));
949        assert!(Value::Real(1.0) < Value::Text("".into()));
950        assert!(Value::Text("a".into()) < Value::Text("b".into()));
951        assert!(Value::Text("z".into()) < Value::Blob(vec![]));
952        assert!(Value::Blob(vec![0]) < Value::Blob(vec![1]));
953    }
954
955    #[test]
956    fn value_numeric_mixed() {
957        assert_eq!(Value::Integer(1), Value::Real(1.0));
958        assert!(Value::Integer(1) < Value::Real(1.5));
959        assert!(Value::Real(0.5) < Value::Integer(1));
960    }
961
962    #[test]
963    fn value_display() {
964        assert_eq!(format!("{}", Value::Null), "NULL");
965        assert_eq!(format!("{}", Value::Integer(42)), "42");
966        assert_eq!(format!("{}", Value::Real(3.15)), "3.15");
967        assert_eq!(format!("{}", Value::Real(1.0)), "1.0");
968        assert_eq!(format!("{}", Value::Text("hello".into())), "hello");
969        assert_eq!(format!("{}", Value::Blob(vec![0xDE, 0xAD])), "X'DEAD'");
970        assert_eq!(format!("{}", Value::Boolean(true)), "TRUE");
971        assert_eq!(format!("{}", Value::Boolean(false)), "FALSE");
972    }
973
974    #[test]
975    fn value_coerce() {
976        assert_eq!(
977            Value::Integer(42).coerce_to(DataType::Real),
978            Some(Value::Real(42.0))
979        );
980        assert_eq!(
981            Value::Boolean(true).coerce_to(DataType::Integer),
982            Some(Value::Integer(1))
983        );
984        assert_eq!(Value::Null.coerce_to(DataType::Integer), Some(Value::Null));
985        assert_eq!(Value::Text("x".into()).coerce_to(DataType::Integer), None);
986    }
987
988    fn col(name: &str, dt: DataType, nullable: bool, pos: u16) -> ColumnDef {
989        ColumnDef {
990            name: name.into(),
991            data_type: dt,
992            nullable,
993            position: pos,
994            default_expr: None,
995            default_sql: None,
996            check_expr: None,
997            check_sql: None,
998            check_name: None,
999        }
1000    }
1001
1002    #[test]
1003    fn schema_roundtrip() {
1004        let schema = TableSchema::new(
1005            "users".into(),
1006            vec![
1007                col("id", DataType::Integer, false, 0),
1008                col("name", DataType::Text, true, 1),
1009                col("active", DataType::Boolean, false, 2),
1010            ],
1011            vec![0],
1012            vec![],
1013            vec![],
1014            vec![],
1015        );
1016
1017        let data = schema.serialize();
1018        let restored = TableSchema::deserialize(&data).unwrap();
1019
1020        assert_eq!(restored.name, "users");
1021        assert_eq!(restored.columns.len(), 3);
1022        assert_eq!(restored.columns[0].name, "id");
1023        assert_eq!(restored.columns[0].data_type, DataType::Integer);
1024        assert!(!restored.columns[0].nullable);
1025        assert_eq!(restored.columns[1].name, "name");
1026        assert_eq!(restored.columns[1].data_type, DataType::Text);
1027        assert!(restored.columns[1].nullable);
1028        assert_eq!(restored.columns[2].name, "active");
1029        assert_eq!(restored.columns[2].data_type, DataType::Boolean);
1030        assert_eq!(restored.primary_key_columns, vec![0]);
1031    }
1032
1033    #[test]
1034    fn schema_roundtrip_with_indices() {
1035        let schema = TableSchema::new(
1036            "orders".into(),
1037            vec![
1038                col("id", DataType::Integer, false, 0),
1039                col("customer", DataType::Text, false, 1),
1040                col("amount", DataType::Real, true, 2),
1041            ],
1042            vec![0],
1043            vec![
1044                IndexDef {
1045                    name: "idx_customer".into(),
1046                    columns: vec![1],
1047                    unique: false,
1048                },
1049                IndexDef {
1050                    name: "idx_amount_uniq".into(),
1051                    columns: vec![2],
1052                    unique: true,
1053                },
1054            ],
1055            vec![],
1056            vec![],
1057        );
1058
1059        let data = schema.serialize();
1060        let restored = TableSchema::deserialize(&data).unwrap();
1061
1062        assert_eq!(restored.indices.len(), 2);
1063        assert_eq!(restored.indices[0].name, "idx_customer");
1064        assert_eq!(restored.indices[0].columns, vec![1]);
1065        assert!(!restored.indices[0].unique);
1066        assert_eq!(restored.indices[1].name, "idx_amount_uniq");
1067        assert_eq!(restored.indices[1].columns, vec![2]);
1068        assert!(restored.indices[1].unique);
1069    }
1070
1071    #[test]
1072    fn schema_v1_backward_compat() {
1073        let old_schema = TableSchema::new(
1074            "test".into(),
1075            vec![col("id", DataType::Integer, false, 0)],
1076            vec![0],
1077            vec![],
1078            vec![],
1079            vec![],
1080        );
1081        let mut data = old_schema.serialize();
1082        // Patch to v1 format: replace version byte and truncate everything after PK
1083        data[0] = 1;
1084        // v1 has no indices or v3 data - truncate after PK columns
1085        // Header(1) + name_len(2) + "test"(4) + col_count(2) + col("id": name_len(2)+"id"(2)+type(1)+nullable(1)+position(2)) + pk_count(2) + pk(2)
1086        let v1_len = 1 + 2 + 4 + 2 + (2 + 2 + 1 + 1 + 2) + 2 + 2;
1087        data.truncate(v1_len);
1088
1089        let restored = TableSchema::deserialize(&data).unwrap();
1090        assert_eq!(restored.name, "test");
1091        assert!(restored.indices.is_empty());
1092        assert!(restored.check_constraints.is_empty());
1093        assert!(restored.foreign_keys.is_empty());
1094    }
1095
1096    #[test]
1097    fn schema_v2_backward_compat() {
1098        let schema = TableSchema::new(
1099            "test".into(),
1100            vec![col("id", DataType::Integer, false, 0)],
1101            vec![0],
1102            vec![],
1103            vec![],
1104            vec![],
1105        );
1106        let mut data = schema.serialize();
1107        // Patch version to 2 and truncate v3 data
1108        data[0] = 2;
1109        // v2 ends after indices section: find the v3 start and truncate
1110        // Header(1) + name_len(2) + "test"(4) + col_count(2) + col(8) + pk_count(2) + pk(2) + idx_count(2)
1111        let v2_len = 1 + 2 + 4 + 2 + 8 + 2 + 2 + 2;
1112        data.truncate(v2_len);
1113
1114        let restored = TableSchema::deserialize(&data).unwrap();
1115        assert_eq!(restored.name, "test");
1116        assert!(restored.check_constraints.is_empty());
1117        assert!(restored.foreign_keys.is_empty());
1118        assert!(restored.columns[0].default_expr.is_none());
1119        assert!(restored.columns[0].check_expr.is_none());
1120    }
1121
1122    #[test]
1123    fn schema_roundtrip_with_defaults_and_checks() {
1124        use crate::parser::parse_sql_expr;
1125
1126        let mut columns = vec![
1127            col("id", DataType::Integer, false, 0),
1128            col("val", DataType::Integer, true, 1),
1129            col("name", DataType::Text, true, 2),
1130        ];
1131        columns[1].default_sql = Some("42".into());
1132        columns[1].default_expr = Some(parse_sql_expr("42").unwrap());
1133        columns[2].check_sql = Some("LENGTH(name) > 0".into());
1134        columns[2].check_expr = Some(parse_sql_expr("LENGTH(name) > 0").unwrap());
1135        columns[2].check_name = Some("chk_name_len".into());
1136
1137        let schema = TableSchema::new(
1138            "t".into(),
1139            columns,
1140            vec![0],
1141            vec![],
1142            vec![TableCheckDef {
1143                name: Some("chk_val_pos".into()),
1144                expr: parse_sql_expr("val > 0").unwrap(),
1145                sql: "val > 0".into(),
1146            }],
1147            vec![],
1148        );
1149
1150        let data = schema.serialize();
1151        let restored = TableSchema::deserialize(&data).unwrap();
1152
1153        assert_eq!(restored.columns[1].default_sql.as_deref(), Some("42"));
1154        assert!(restored.columns[1].default_expr.is_some());
1155        assert_eq!(
1156            restored.columns[2].check_sql.as_deref(),
1157            Some("LENGTH(name) > 0")
1158        );
1159        assert!(restored.columns[2].check_expr.is_some());
1160        assert_eq!(
1161            restored.columns[2].check_name.as_deref(),
1162            Some("chk_name_len")
1163        );
1164        assert_eq!(restored.check_constraints.len(), 1);
1165        assert_eq!(
1166            restored.check_constraints[0].name.as_deref(),
1167            Some("chk_val_pos")
1168        );
1169        assert_eq!(restored.check_constraints[0].sql, "val > 0");
1170    }
1171
1172    #[test]
1173    fn schema_roundtrip_with_foreign_keys() {
1174        let schema = TableSchema::new(
1175            "orders".into(),
1176            vec![
1177                col("id", DataType::Integer, false, 0),
1178                col("user_id", DataType::Integer, false, 1),
1179            ],
1180            vec![0],
1181            vec![],
1182            vec![],
1183            vec![ForeignKeySchemaEntry {
1184                name: Some("fk_user".into()),
1185                columns: vec![1],
1186                foreign_table: "users".into(),
1187                referred_columns: vec!["id".into()],
1188            }],
1189        );
1190
1191        let data = schema.serialize();
1192        let restored = TableSchema::deserialize(&data).unwrap();
1193
1194        assert_eq!(restored.foreign_keys.len(), 1);
1195        assert_eq!(restored.foreign_keys[0].name.as_deref(), Some("fk_user"));
1196        assert_eq!(restored.foreign_keys[0].columns, vec![1]);
1197        assert_eq!(restored.foreign_keys[0].foreign_table, "users");
1198        assert_eq!(restored.foreign_keys[0].referred_columns, vec!["id"]);
1199    }
1200
1201    #[test]
1202    fn data_type_display() {
1203        assert_eq!(format!("{}", DataType::Integer), "INTEGER");
1204        assert_eq!(format!("{}", DataType::Text), "TEXT");
1205        assert_eq!(format!("{}", DataType::Boolean), "BOOLEAN");
1206    }
1207}