Skip to main content

sqlrite/sql/db/
table.rs

1use crate::error::{Result, SQLRiteError};
2use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
3use crate::sql::parser::create::CreateQuery;
4use std::collections::{BTreeMap, HashMap};
5use std::fmt;
6use std::sync::{Arc, Mutex};
7
8use prettytable::{Cell as PrintCell, Row as PrintRow, Table as PrintTable};
9
/// SQLRite data types.
///
/// Modeled after SQLite's storage classes and type affinities; see
/// [Datatypes In SQLite Version 3](https://www.sqlite.org/datatype3.html).
#[derive(PartialEq, Debug, Clone)]
pub enum DataType {
    /// Produced by `DataType::new` for the type name `integer`.
    Integer,
    /// Produced by `DataType::new` for the type name `text`.
    Text,
    /// Produced by `DataType::new` for the type name `real`.
    Real,
    /// Produced by `DataType::new` for the type name `bool`; displayed as `Boolean`.
    Bool,
    /// Produced by `DataType::new` for the type name `none`.
    None,
    /// Fallback produced by `DataType::new` for any unrecognized type name.
    Invalid,
}
22
23impl DataType {
24    pub fn new(cmd: String) -> DataType {
25        match cmd.to_lowercase().as_ref() {
26            "integer" => DataType::Integer,
27            "text" => DataType::Text,
28            "real" => DataType::Real,
29            "bool" => DataType::Bool,
30            "none" => DataType::None,
31            _ => {
32                eprintln!("Invalid data type given {}", cmd);
33                DataType::Invalid
34            }
35        }
36    }
37}
38
39impl fmt::Display for DataType {
40    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
41        match *self {
42            DataType::Integer => f.write_str("Integer"),
43            DataType::Text => f.write_str("Text"),
44            DataType::Real => f.write_str("Real"),
45            DataType::Bool => f.write_str("Boolean"),
46            DataType::None => f.write_str("None"),
47            DataType::Invalid => f.write_str("Invalid"),
48        }
49    }
50}
51
/// In-memory representation of a single SQL table: its schema, its row
/// data (stored column by column), and its secondary indexes.
///
/// `rows` is `Arc<Mutex<...>>` rather than `Rc<RefCell<...>>` so `Table`
/// (and by extension `Database`) is `Send + Sync` — the Tauri desktop
/// app holds the engine in shared state behind a `Mutex<Database>`, and
/// Tauri's state container requires its contents to be thread-safe.
#[derive(Debug)]
pub struct Table {
    /// Name of the table.
    pub tb_name: String,
    /// Schema for each column, in declaration order.
    pub columns: Vec<Column>,
    /// Per-column row storage, keyed by column name. Every column's
    /// `Row::T(BTreeMap)` is keyed by rowid, so all columns are expected
    /// to share the same keyset after each write.
    pub rows: Arc<Mutex<HashMap<String, Row>>>,
    /// Secondary indexes on this table (Phase 3e). One auto-created entry
    /// per UNIQUE or PRIMARY KEY column of an indexable type (Integer or
    /// Text); explicit `CREATE INDEX` statements add more. Look one up by
    /// column via `Table::index_for_column`, or by index name via
    /// `Table::index_by_name`.
    pub secondary_indexes: Vec<SecondaryIndex>,
    /// ROWID of the most recent insert; `0` for a freshly created table.
    pub last_rowid: i64,
    /// PRIMARY KEY column name, or the sentinel `"-1"` if the table has no
    /// PRIMARY KEY.
    pub primary_key: String,
}
79
80impl Table {
81    pub fn new(create_query: CreateQuery) -> Self {
82        let table_name = create_query.table_name;
83        let mut primary_key: String = String::from("-1");
84        let columns = create_query.columns;
85
86        let mut table_cols: Vec<Column> = vec![];
87        let table_rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
88        let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
89        for col in &columns {
90            let col_name = &col.name;
91            if col.is_pk {
92                primary_key = col_name.to_string();
93            }
94            table_cols.push(Column::new(
95                col_name.to_string(),
96                col.datatype.to_string(),
97                col.is_pk,
98                col.not_null,
99                col.is_unique,
100            ));
101
102            let dt = DataType::new(col.datatype.to_string());
103            let row_storage = match dt {
104                DataType::Integer => Row::Integer(BTreeMap::new()),
105                DataType::Real => Row::Real(BTreeMap::new()),
106                DataType::Text => Row::Text(BTreeMap::new()),
107                DataType::Bool => Row::Bool(BTreeMap::new()),
108                DataType::Invalid | DataType::None => Row::None,
109            };
110            table_rows
111                .lock()
112                .expect("Table row storage mutex poisoned")
113                .insert(col.name.to_string(), row_storage);
114
115            // Auto-create an index for every UNIQUE / PRIMARY KEY column,
116            // but only for types we know how to index. Real / Bool UNIQUE
117            // columns fall back to the linear scan path in
118            // validate_unique_constraint — same behavior as before 3e.
119            if (col.is_pk || col.is_unique) && matches!(dt, DataType::Integer | DataType::Text) {
120                let name = SecondaryIndex::auto_name(&table_name, &col.name);
121                match SecondaryIndex::new(
122                    name,
123                    table_name.clone(),
124                    col.name.clone(),
125                    &dt,
126                    true,
127                    IndexOrigin::Auto,
128                ) {
129                    Ok(idx) => secondary_indexes.push(idx),
130                    Err(_) => {
131                        // Unreachable given the matches! guard above, but
132                        // the builder returns Result so we keep the arm.
133                    }
134                }
135            }
136        }
137
138        Table {
139            tb_name: table_name,
140            columns: table_cols,
141            rows: table_rows,
142            secondary_indexes,
143            last_rowid: 0,
144            primary_key,
145        }
146    }
147
148    /// Deep-clones a `Table` for transaction snapshots (Phase 4f).
149    ///
150    /// The normal `Clone` derive would shallow-clone the `Arc<Mutex<_>>`
151    /// wrapping our row storage, leaving both copies sharing the same
152    /// inner map — mutating the snapshot would corrupt the live table
153    /// and vice versa. Instead we lock, clone the inner `HashMap`, and
154    /// wrap it in a fresh `Arc<Mutex<_>>`. Columns and indexes derive
155    /// `Clone` directly (all their fields are plain data).
156    pub fn deep_clone(&self) -> Self {
157        let cloned_rows: HashMap<String, Row> = {
158            let guard = self.rows.lock().expect("row mutex poisoned");
159            guard.clone()
160        };
161        Table {
162            tb_name: self.tb_name.clone(),
163            columns: self.columns.clone(),
164            rows: Arc::new(Mutex::new(cloned_rows)),
165            secondary_indexes: self.secondary_indexes.clone(),
166            last_rowid: self.last_rowid,
167            primary_key: self.primary_key.clone(),
168        }
169    }
170
171    /// Finds an auto- or explicit-index entry for a given column. Returns
172    /// `None` if the column isn't indexed.
173    pub fn index_for_column(&self, column: &str) -> Option<&SecondaryIndex> {
174        self.secondary_indexes
175            .iter()
176            .find(|i| i.column_name == column)
177    }
178
179    fn index_for_column_mut(&mut self, column: &str) -> Option<&mut SecondaryIndex> {
180        self.secondary_indexes
181            .iter_mut()
182            .find(|i| i.column_name == column)
183    }
184
185    /// Finds a secondary index by its own name (e.g., `sqlrite_autoindex_users_email`
186    /// or a user-provided CREATE INDEX name). Used by Phase 3e.2 to look up
187    /// explicit indexes when DROP INDEX lands.
188    #[allow(dead_code)]
189    pub fn index_by_name(&self, name: &str) -> Option<&SecondaryIndex> {
190        self.secondary_indexes.iter().find(|i| i.name == name)
191    }
192
193    /// Returns a `bool` informing if a `Column` with a specific name exists or not
194    ///
195    pub fn contains_column(&self, column: String) -> bool {
196        self.columns.iter().any(|col| col.column_name == column)
197    }
198
199    /// Returns the list of column names in declaration order.
200    pub fn column_names(&self) -> Vec<String> {
201        self.columns.iter().map(|c| c.column_name.clone()).collect()
202    }
203
204    /// Returns all rowids currently stored in the table, in ascending order.
205    /// Every column's BTreeMap has the same keyset, so we just read from the first column.
206    pub fn rowids(&self) -> Vec<i64> {
207        let Some(first) = self.columns.first() else {
208            return vec![];
209        };
210        let rows = self.rows.lock().expect("rows mutex poisoned");
211        rows.get(&first.column_name)
212            .map(|r| r.rowids())
213            .unwrap_or_default()
214    }
215
216    /// Reads a single cell at `(column, rowid)`.
217    pub fn get_value(&self, column: &str, rowid: i64) -> Option<Value> {
218        let rows = self.rows.lock().expect("rows mutex poisoned");
219        rows.get(column).and_then(|r| r.get(rowid))
220    }
221
222    /// Removes the row identified by `rowid` from every column's storage and
223    /// from every secondary index entry.
224    pub fn delete_row(&mut self, rowid: i64) {
225        // Snapshot the values we're about to delete so we can strip them
226        // from secondary indexes by (value, rowid) before the row storage
227        // disappears.
228        let per_column_values: Vec<(String, Option<Value>)> = self
229            .columns
230            .iter()
231            .map(|c| (c.column_name.clone(), self.get_value(&c.column_name, rowid)))
232            .collect();
233
234        // Remove from row storage.
235        {
236            let rows_clone = Arc::clone(&self.rows);
237            let mut row_data = rows_clone.lock().expect("rows mutex poisoned");
238            for col in &self.columns {
239                if let Some(r) = row_data.get_mut(&col.column_name) {
240                    match r {
241                        Row::Integer(m) => {
242                            m.remove(&rowid);
243                        }
244                        Row::Text(m) => {
245                            m.remove(&rowid);
246                        }
247                        Row::Real(m) => {
248                            m.remove(&rowid);
249                        }
250                        Row::Bool(m) => {
251                            m.remove(&rowid);
252                        }
253                        Row::None => {}
254                    }
255                }
256            }
257        }
258
259        // Strip secondary-index entries. Non-indexed columns just don't
260        // show up in secondary_indexes and are no-ops here.
261        for (col_name, value) in per_column_values {
262            if let Some(idx) = self.index_for_column_mut(&col_name) {
263                if let Some(v) = value {
264                    idx.remove(&v, rowid);
265                }
266            }
267        }
268    }
269
270    /// Replays a single row at `rowid` when loading a table from disk. Takes
271    /// one typed value per column (in declaration order); `None` means the
272    /// stored cell carried a NULL for that column. Unlike `insert_row` this
273    /// trusts the on-disk state and does *not* re-check UNIQUE — we're
274    /// rebuilding a state that was already consistent when it was saved.
275    pub fn restore_row(&mut self, rowid: i64, values: Vec<Option<Value>>) -> Result<()> {
276        if values.len() != self.columns.len() {
277            return Err(SQLRiteError::Internal(format!(
278                "cell has {} values but table '{}' has {} columns",
279                values.len(),
280                self.tb_name,
281                self.columns.len()
282            )));
283        }
284
285        let column_names: Vec<String> =
286            self.columns.iter().map(|c| c.column_name.clone()).collect();
287
288        for (i, value) in values.into_iter().enumerate() {
289            let col_name = &column_names[i];
290
291            // Write into the per-column row storage first (scoped borrow so
292            // the secondary-index update below doesn't fight over `self`).
293            {
294                let rows_clone = Arc::clone(&self.rows);
295                let mut row_data = rows_clone.lock().expect("rows mutex poisoned");
296                let cell = row_data.get_mut(col_name).ok_or_else(|| {
297                    SQLRiteError::Internal(format!("Row storage missing for column '{col_name}'"))
298                })?;
299
300                match (cell, &value) {
301                    (Row::Integer(map), Some(Value::Integer(v))) => {
302                        map.insert(rowid, *v as i32);
303                    }
304                    (Row::Integer(_), None) => {
305                        return Err(SQLRiteError::Internal(format!(
306                            "Integer column '{col_name}' cannot store NULL — corrupt cell?"
307                        )));
308                    }
309                    (Row::Text(map), Some(Value::Text(s))) => {
310                        map.insert(rowid, s.clone());
311                    }
312                    (Row::Text(map), None) => {
313                        // Matches the on-insert convention: NULL in Text
314                        // storage is represented by the literal "Null"
315                        // sentinel and not added to the index.
316                        map.insert(rowid, "Null".to_string());
317                    }
318                    (Row::Real(map), Some(Value::Real(v))) => {
319                        map.insert(rowid, *v as f32);
320                    }
321                    (Row::Real(_), None) => {
322                        return Err(SQLRiteError::Internal(format!(
323                            "Real column '{col_name}' cannot store NULL — corrupt cell?"
324                        )));
325                    }
326                    (Row::Bool(map), Some(Value::Bool(v))) => {
327                        map.insert(rowid, *v);
328                    }
329                    (Row::Bool(_), None) => {
330                        return Err(SQLRiteError::Internal(format!(
331                            "Bool column '{col_name}' cannot store NULL — corrupt cell?"
332                        )));
333                    }
334                    (row, v) => {
335                        return Err(SQLRiteError::Internal(format!(
336                            "Type mismatch restoring column '{col_name}': storage {row:?} vs value {v:?}"
337                        )));
338                    }
339                }
340            }
341
342            // Maintain the secondary index (if any). NULL values are skipped
343            // by `insert`, matching the "NULL is not indexed" convention.
344            if let Some(v) = &value {
345                if let Some(idx) = self.index_for_column_mut(col_name) {
346                    idx.insert(v, rowid)?;
347                }
348            }
349        }
350
351        if rowid > self.last_rowid {
352            self.last_rowid = rowid;
353        }
354        Ok(())
355    }
356
357    /// Extracts a row as an ordered `Vec<Option<Value>>` matching the column
358    /// declaration order. Returns `None` entries for columns that hold NULL.
359    /// Used by `save_database` to turn a table's in-memory state into cells.
360    pub fn extract_row(&self, rowid: i64) -> Vec<Option<Value>> {
361        self.columns
362            .iter()
363            .map(|c| match self.get_value(&c.column_name, rowid) {
364                Some(Value::Null) => None,
365                Some(v) => Some(v),
366                None => None,
367            })
368            .collect()
369    }
370
371    /// Overwrites the cell at `(column, rowid)` with `new_val`. Enforces the
372    /// column's datatype and UNIQUE constraint, and updates any secondary
373    /// index.
374    ///
375    /// Returns `Err` if the column doesn't exist, the value type is incompatible,
376    /// or writing would violate UNIQUE.
377    pub fn set_value(&mut self, column: &str, rowid: i64, new_val: Value) -> Result<()> {
378        let col_index = self
379            .columns
380            .iter()
381            .position(|c| c.column_name == column)
382            .ok_or_else(|| SQLRiteError::General(format!("Column '{column}' not found")))?;
383
384        // No-op write — keep storage exactly the same.
385        let current = self.get_value(column, rowid);
386        if current.as_ref() == Some(&new_val) {
387            return Ok(());
388        }
389
390        // Enforce UNIQUE. Prefer an O(log N) index probe if we have one;
391        // fall back to a full column scan otherwise (Real/Bool UNIQUE
392        // columns, which don't get auto-indexed).
393        if self.columns[col_index].is_unique && !matches!(new_val, Value::Null) {
394            if let Some(idx) = self.index_for_column(column) {
395                for other in idx.lookup(&new_val) {
396                    if other != rowid {
397                        return Err(SQLRiteError::General(format!(
398                            "UNIQUE constraint violated for column '{column}'"
399                        )));
400                    }
401                }
402            } else {
403                for other in self.rowids() {
404                    if other == rowid {
405                        continue;
406                    }
407                    if self.get_value(column, other).as_ref() == Some(&new_val) {
408                        return Err(SQLRiteError::General(format!(
409                            "UNIQUE constraint violated for column '{column}'"
410                        )));
411                    }
412                }
413            }
414        }
415
416        // Drop the old index entry before writing the new value, so the
417        // post-write index insert doesn't clash with the previous state.
418        if let Some(old) = current {
419            if let Some(idx) = self.index_for_column_mut(column) {
420                idx.remove(&old, rowid);
421            }
422        }
423
424        // Write into the column's Row, type-checking against the declared DataType.
425        let declared = &self.columns[col_index].datatype;
426        {
427            let rows_clone = Arc::clone(&self.rows);
428            let mut row_data = rows_clone.lock().expect("rows mutex poisoned");
429            let cell = row_data.get_mut(column).ok_or_else(|| {
430                SQLRiteError::Internal(format!("Row storage missing for column '{column}'"))
431            })?;
432
433            match (cell, &new_val, declared) {
434                (Row::Integer(m), Value::Integer(v), _) => {
435                    m.insert(rowid, *v as i32);
436                }
437                (Row::Real(m), Value::Real(v), _) => {
438                    m.insert(rowid, *v as f32);
439                }
440                (Row::Real(m), Value::Integer(v), _) => {
441                    m.insert(rowid, *v as f32);
442                }
443                (Row::Text(m), Value::Text(v), _) => {
444                    m.insert(rowid, v.clone());
445                }
446                (Row::Bool(m), Value::Bool(v), _) => {
447                    m.insert(rowid, *v);
448                }
449                // NULL writes: store the sentinel "Null" string for Text; for other
450                // types we leave storage as-is since those BTreeMaps can't hold NULL today.
451                (Row::Text(m), Value::Null, _) => {
452                    m.insert(rowid, "Null".to_string());
453                }
454                (_, new, dt) => {
455                    return Err(SQLRiteError::General(format!(
456                        "Type mismatch: cannot assign {} to column '{column}' of type {dt}",
457                        new.to_display_string()
458                    )));
459                }
460            }
461        }
462
463        // Maintain the secondary index, if any. NULL values are skipped by
464        // insert per convention.
465        if !matches!(new_val, Value::Null) {
466            if let Some(idx) = self.index_for_column_mut(column) {
467                idx.insert(&new_val, rowid)?;
468            }
469        }
470
471        Ok(())
472    }
473
474    /// Returns an immutable reference of `sql::db::table::Column` if the table contains a
475    /// column with the specified key as a column name.
476    ///
477    #[allow(dead_code)]
478    pub fn get_column(&mut self, column_name: String) -> Result<&Column> {
479        if let Some(column) = self
480            .columns
481            .iter()
482            .filter(|c| c.column_name == column_name)
483            .collect::<Vec<&Column>>()
484            .first()
485        {
486            Ok(column)
487        } else {
488            Err(SQLRiteError::General(String::from("Column not found.")))
489        }
490    }
491
492    /// Validates if columns and values being inserted violate the UNIQUE constraint.
493    /// PRIMARY KEY columns are automatically UNIQUE. Uses the corresponding
494    /// secondary index when one exists (O(log N) lookup); falls back to a
495    /// linear scan for indexable-but-not-indexed situations (e.g. a Real
496    /// UNIQUE column — Real isn't in the auto-indexed set).
497    pub fn validate_unique_constraint(
498        &mut self,
499        cols: &Vec<String>,
500        values: &Vec<String>,
501    ) -> Result<()> {
502        for (idx, name) in cols.iter().enumerate() {
503            let column = self
504                .columns
505                .iter()
506                .find(|c| &c.column_name == name)
507                .ok_or_else(|| SQLRiteError::General(format!("Column '{name}' not found")))?;
508            if !column.is_unique {
509                continue;
510            }
511            let datatype = &column.datatype;
512            let val = &values[idx];
513
514            // Parse the string value into a runtime Value according to the
515            // declared column type. If parsing fails the caller's insert
516            // would also fail with the same error; surface it here so we
517            // don't emit a misleading "unique OK" on bad input.
518            let parsed = match datatype {
519                DataType::Integer => val.parse::<i64>().map(Value::Integer).map_err(|_| {
520                    SQLRiteError::General(format!(
521                        "Type mismatch: expected INTEGER for column '{name}', got '{val}'"
522                    ))
523                })?,
524                DataType::Text => Value::Text(val.clone()),
525                DataType::Real => val.parse::<f64>().map(Value::Real).map_err(|_| {
526                    SQLRiteError::General(format!(
527                        "Type mismatch: expected REAL for column '{name}', got '{val}'"
528                    ))
529                })?,
530                DataType::Bool => val.parse::<bool>().map(Value::Bool).map_err(|_| {
531                    SQLRiteError::General(format!(
532                        "Type mismatch: expected BOOL for column '{name}', got '{val}'"
533                    ))
534                })?,
535                DataType::None | DataType::Invalid => {
536                    return Err(SQLRiteError::Internal(format!(
537                        "column '{name}' has an unsupported datatype"
538                    )));
539                }
540            };
541
542            if let Some(secondary) = self.index_for_column(name) {
543                if secondary.would_violate_unique(&parsed) {
544                    return Err(SQLRiteError::General(format!(
545                        "UNIQUE constraint violated for column '{name}': value '{val}' already exists"
546                    )));
547                }
548            } else {
549                // No secondary index (Real / Bool UNIQUE). Linear scan.
550                for other in self.rowids() {
551                    if self.get_value(name, other).as_ref() == Some(&parsed) {
552                        return Err(SQLRiteError::General(format!(
553                            "UNIQUE constraint violated for column '{name}': value '{val}' already exists"
554                        )));
555                    }
556                }
557            }
558        }
559        Ok(())
560    }
561
    /// Inserts `values` into their corresponding `cols` as one new row, keyed by a
    /// rowid shared across every column's storage.
    ///
    /// Every `Table` keeps track of `last_rowid` so the next rowid can be derived from it.
    /// One limitation of this data structure is that we can only have one write transaction
    /// at a time, otherwise we could have a race condition on `last_rowid`.
    ///
    /// Since we are loosely modeling after SQLite, this is also a limitation of SQLite
    /// (allowing only one write transaction at a time), so we are good. :)
    ///
    /// `cols[j]` is paired with `values[j]`. Supplied columns are consumed in order while
    /// walking the table's columns in declaration order, so `cols` is expected to follow
    /// declaration order. Columns that are not supplied are padded with the string `"Null"`.
    ///
    /// # Errors
    ///
    /// Returns `SQLRiteError::General` when a value cannot be parsed as its column's type
    /// (no panic on bad input), and `SQLRiteError::Internal` when a column has no row
    /// storage. Errors from secondary-index inserts are propagated.
    ///
    /// NOTE(review): an error does not roll back earlier writes. The auto-assigned PRIMARY
    /// KEY and any columns processed before the failing one have already been stored, so
    /// the table is not guaranteed to be unchanged on `Err`.
    ///
    /// # Panics
    ///
    /// Panics if the row-storage mutex is poisoned, or if `values` is shorter than `cols`.
    pub fn insert_row(&mut self, cols: &Vec<String>, values: &Vec<String>) -> Result<()> {
        let mut next_rowid = self.last_rowid + 1;

        // Auto-assign INTEGER PRIMARY KEY when the user omits it; otherwise
        // adopt the supplied value as the new rowid.
        if self.primary_key != "-1" {
            if !cols.iter().any(|col| col == &self.primary_key) {
                // Write the auto-assigned PK into row storage, then sync
                // the secondary index.
                // NOTE(review): `as i32` wraps if the rowid ever exceeds i32::MAX.
                let val = next_rowid as i32;
                let wrote_integer = {
                    let rows_clone = Arc::clone(&self.rows);
                    let mut row_data = rows_clone.lock().expect("rows mutex poisoned");
                    let table_col_data = row_data.get_mut(&self.primary_key).ok_or_else(|| {
                        SQLRiteError::Internal(format!(
                            "Row storage missing for primary key column '{}'",
                            self.primary_key
                        ))
                    })?;
                    match table_col_data {
                        Row::Integer(tree) => {
                            tree.insert(next_rowid, val);
                            true
                        }
                        _ => false, // non-integer PK: auto-assign is a no-op
                    }
                };
                if wrote_integer {
                    // Cloned so the mutable index lookup doesn't overlap a borrow of `self.primary_key`.
                    let pk = self.primary_key.clone();
                    if let Some(idx) = self.index_for_column_mut(&pk) {
                        idx.insert(&Value::Integer(val as i64), next_rowid)?;
                    }
                }
            } else {
                // The PK was supplied: its value becomes the rowid for this row.
                for i in 0..cols.len() {
                    if cols[i] == self.primary_key {
                        let val = &values[i];
                        next_rowid = val.parse::<i64>().map_err(|_| {
                            SQLRiteError::General(format!(
                                "Type mismatch: PRIMARY KEY column '{}' expects INTEGER, got '{val}'",
                                self.primary_key
                            ))
                        })?;
                    }
                }
            }
        }

        // For every table column, either pick the supplied value or pad with NULL
        // so that every column's BTreeMap keeps the same rowid keyset.
        let column_names = self
            .columns
            .iter()
            .map(|col| col.column_name.to_string())
            .collect::<Vec<String>>();
        // `j` is the cursor into the supplied `cols` / `values`; it only advances
        // when the supplied column matches the table column being visited.
        let mut j: usize = 0;
        for i in 0..column_names.len() {
            let mut val = String::from("Null");
            let key = &column_names[i];

            if let Some(supplied_key) = cols.get(j) {
                if supplied_key == &column_names[i] {
                    val = values[j].to_string();
                    j += 1;
                } else if self.primary_key == column_names[i] {
                    // PK already stored in the auto-assign branch above.
                    continue;
                }
            } else if self.primary_key == column_names[i] {
                // Supplied columns are exhausted and this is the PK: skip it for
                // the same reason as above.
                continue;
            }

            // Step 1: write into row storage and compute the typed Value
            // we'll hand to the secondary index (if any).
            let typed_value: Option<Value> = {
                let rows_clone = Arc::clone(&self.rows);
                let mut row_data = rows_clone.lock().expect("rows mutex poisoned");
                let table_col_data = row_data.get_mut(key).ok_or_else(|| {
                    SQLRiteError::Internal(format!("Row storage missing for column '{key}'"))
                })?;

                match table_col_data {
                    Row::Integer(tree) => {
                        // An omitted Integer column arrives here as "Null" and
                        // fails this parse, i.e. it is reported as a type mismatch.
                        let parsed = val.parse::<i32>().map_err(|_| {
                            SQLRiteError::General(format!(
                                "Type mismatch: expected INTEGER for column '{key}', got '{val}'"
                            ))
                        })?;
                        tree.insert(next_rowid, parsed);
                        Some(Value::Integer(parsed as i64))
                    }
                    Row::Text(tree) => {
                        tree.insert(next_rowid, val.to_string());
                        // "Null" sentinel stays out of the index — it isn't a
                        // real user value.
                        if val != "Null" {
                            Some(Value::Text(val.to_string()))
                        } else {
                            None
                        }
                    }
                    Row::Real(tree) => {
                        // Stored as f32; widened to f64 only for the index value.
                        let parsed = val.parse::<f32>().map_err(|_| {
                            SQLRiteError::General(format!(
                                "Type mismatch: expected REAL for column '{key}', got '{val}'"
                            ))
                        })?;
                        tree.insert(next_rowid, parsed);
                        Some(Value::Real(parsed as f64))
                    }
                    Row::Bool(tree) => {
                        let parsed = val.parse::<bool>().map_err(|_| {
                            SQLRiteError::General(format!(
                                "Type mismatch: expected BOOL for column '{key}', got '{val}'"
                            ))
                        })?;
                        tree.insert(next_rowid, parsed);
                        Some(Value::Bool(parsed))
                    }
                    Row::None => {
                        return Err(SQLRiteError::Internal(format!(
                            "Column '{key}' has no row storage"
                        )));
                    }
                }
            };

            // Step 2: maintain the secondary index (if any). The "Null"
            // sentinel produced `None` above and is therefore never indexed.
            if let Some(v) = typed_value {
                if let Some(idx) = self.index_for_column_mut(key) {
                    idx.insert(&v, next_rowid)?;
                }
            }
        }
        self.last_rowid = next_rowid;
        Ok(())
    }
710
711    /// Print the table schema to standard output in a pretty formatted way.
712    ///
713    /// # Example
714    ///
715    /// ```text
716    /// let table = Table::new(payload);
717    /// table.print_table_schema();
718    ///
719    /// Prints to standard output:
720    ///    +-------------+-----------+-------------+--------+----------+
721    ///    | Column Name | Data Type | PRIMARY KEY | UNIQUE | NOT NULL |
722    ///    +-------------+-----------+-------------+--------+----------+
723    ///    | id          | Integer   | true        | true   | true     |
724    ///    +-------------+-----------+-------------+--------+----------+
725    ///    | name        | Text      | false       | true   | false    |
726    ///    +-------------+-----------+-------------+--------+----------+
727    ///    | email       | Text      | false       | false  | false    |
728    ///    +-------------+-----------+-------------+--------+----------+
729    /// ```
730    ///
731    pub fn print_table_schema(&self) -> Result<usize> {
732        let mut table = PrintTable::new();
733        table.add_row(row![
734            "Column Name",
735            "Data Type",
736            "PRIMARY KEY",
737            "UNIQUE",
738            "NOT NULL"
739        ]);
740
741        for col in &self.columns {
742            table.add_row(row![
743                col.column_name,
744                col.datatype,
745                col.is_pk,
746                col.is_unique,
747                col.not_null
748            ]);
749        }
750
751        table.printstd();
752        Ok(table.len() * 2 + 1)
753    }
754
755    /// Print the table data to standard output in a pretty formatted way.
756    ///
757    /// # Example
758    ///
759    /// ```text
760    /// let db_table = db.get_table_mut(table_name.to_string()).unwrap();
761    /// db_table.print_table_data();
762    ///
763    /// Prints to standard output:
764    ///     +----+---------+------------------------+
765    ///     | id | name    | email                  |
766    ///     +----+---------+------------------------+
767    ///     | 1  | "Jack"  | "jack@mail.com"        |
768    ///     +----+---------+------------------------+
769    ///     | 10 | "Bob"   | "bob@main.com"         |
770    ///     +----+---------+------------------------+
771    ///     | 11 | "Bill"  | "bill@main.com"        |
772    ///     +----+---------+------------------------+
773    /// ```
774    ///
775    pub fn print_table_data(&self) {
776        let mut print_table = PrintTable::new();
777
778        let column_names = self
779            .columns
780            .iter()
781            .map(|col| col.column_name.to_string())
782            .collect::<Vec<String>>();
783
784        let header_row = PrintRow::new(
785            column_names
786                .iter()
787                .map(|col| PrintCell::new(col))
788                .collect::<Vec<PrintCell>>(),
789        );
790
791        let rows_clone = Arc::clone(&self.rows);
792        let row_data = rows_clone.lock().expect("rows mutex poisoned");
793        let first_col_data = row_data
794            .get(&self.columns.first().unwrap().column_name)
795            .unwrap();
796        let num_rows = first_col_data.count();
797        let mut print_table_rows: Vec<PrintRow> = vec![PrintRow::new(vec![]); num_rows];
798
799        for col_name in &column_names {
800            let col_val = row_data
801                .get(col_name)
802                .expect("Can't find any rows with the given column");
803            let columns: Vec<String> = col_val.get_serialized_col_data();
804
805            for i in 0..num_rows {
806                if let Some(cell) = &columns.get(i) {
807                    print_table_rows[i].add_cell(PrintCell::new(cell));
808                } else {
809                    print_table_rows[i].add_cell(PrintCell::new(""));
810                }
811            }
812        }
813
814        print_table.add_row(header_row);
815        for row in print_table_rows {
816            print_table.add_row(row);
817        }
818
819        print_table.printstd();
820    }
821}
822
823/// The schema for each SQL column in every table.
824///
825/// Per-column index state moved to `Table::secondary_indexes` in Phase 3e —
826/// a single `Column` describes the declared schema (name, type, constraints)
827/// and nothing more.
828#[derive(PartialEq, Debug, Clone)]
829pub struct Column {
830    pub column_name: String,
831    pub datatype: DataType,
832    pub is_pk: bool,
833    pub not_null: bool,
834    pub is_unique: bool,
835}
836
837impl Column {
838    pub fn new(
839        name: String,
840        datatype: String,
841        is_pk: bool,
842        not_null: bool,
843        is_unique: bool,
844    ) -> Self {
845        let dt = DataType::new(datatype);
846        Column {
847            column_name: name,
848            datatype: dt,
849            is_pk,
850            not_null,
851            is_unique,
852        }
853    }
854}
855
856/// The schema for each SQL row in every table is represented in memory
857/// by following structure
858///
859/// This is an enum representing each of the available types organized in a BTreeMap
860/// data structure, using the ROWID and key and each corresponding type as value
861#[derive(PartialEq, Debug, Clone)]
862pub enum Row {
863    Integer(BTreeMap<i64, i32>),
864    Text(BTreeMap<i64, String>),
865    Real(BTreeMap<i64, f32>),
866    Bool(BTreeMap<i64, bool>),
867    None,
868}
869
870impl Row {
871    fn get_serialized_col_data(&self) -> Vec<String> {
872        match self {
873            Row::Integer(cd) => cd.values().map(|v| v.to_string()).collect(),
874            Row::Real(cd) => cd.values().map(|v| v.to_string()).collect(),
875            Row::Text(cd) => cd.values().map(|v| v.to_string()).collect(),
876            Row::Bool(cd) => cd.values().map(|v| v.to_string()).collect(),
877            Row::None => panic!("Found None in columns"),
878        }
879    }
880
881    fn count(&self) -> usize {
882        match self {
883            Row::Integer(cd) => cd.len(),
884            Row::Real(cd) => cd.len(),
885            Row::Text(cd) => cd.len(),
886            Row::Bool(cd) => cd.len(),
887            Row::None => panic!("Found None in columns"),
888        }
889    }
890
891    /// Every column's BTreeMap is keyed by ROWID. All columns share the same keyset
892    /// after an INSERT (missing columns are padded), so any column's keys are a valid
893    /// iteration of the table's rowids.
894    pub fn rowids(&self) -> Vec<i64> {
895        match self {
896            Row::Integer(m) => m.keys().copied().collect(),
897            Row::Text(m) => m.keys().copied().collect(),
898            Row::Real(m) => m.keys().copied().collect(),
899            Row::Bool(m) => m.keys().copied().collect(),
900            Row::None => vec![],
901        }
902    }
903
904    pub fn get(&self, rowid: i64) -> Option<Value> {
905        match self {
906            Row::Integer(m) => m.get(&rowid).map(|v| Value::Integer(i64::from(*v))),
907            // INSERT stores the literal string "Null" in Text columns that were omitted
908            // from the query — re-map that back to a real NULL on read.
909            Row::Text(m) => m.get(&rowid).map(|v| {
910                if v == "Null" {
911                    Value::Null
912                } else {
913                    Value::Text(v.clone())
914                }
915            }),
916            Row::Real(m) => m.get(&rowid).map(|v| Value::Real(f64::from(*v))),
917            Row::Bool(m) => m.get(&rowid).map(|v| Value::Bool(*v)),
918            Row::None => None,
919        }
920    }
921}
922
/// A typed value as it flows through query execution.
///
/// Kept apart from the `Row` storage enum so the executor can pass typed
/// values, NULL included, from one operator to the next.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Integer(i64),
    Text(String),
    Real(f64),
    Bool(bool),
    Null,
}

impl Value {
    /// Renders the value as text for display.
    ///
    /// Text is returned unchanged, `Null` becomes the string `NULL`, and the
    /// remaining variants use their standard `to_string` formatting.
    pub fn to_display_string(&self) -> String {
        match self {
            Self::Null => "NULL".to_owned(),
            Self::Text(text) => text.to_owned(),
            Self::Bool(flag) => flag.to_string(),
            Self::Integer(number) => number.to_string(),
            Self::Real(number) => number.to_string(),
        }
    }
}
945
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::dialect::SQLiteDialect;
    use sqlparser::parser::Parser;

    /// Parses `sql` with the SQLite dialect and builds a `Table` from the
    /// single statement it contains. Panics when parsing fails, when there is
    /// more than one statement, or when there is none.
    fn table_from_sql(sql: &str) -> Table {
        let mut statements = Parser::parse_sql(&SQLiteDialect {}, sql).unwrap();
        if statements.len() > 1 {
            panic!("Expected a single query statement, but there are more then 1.")
        }
        let statement = statements.pop().unwrap();

        Table::new(CreateQuery::new(&statement).unwrap())
    }

    /// Every `DataType` variant renders to its expected display name.
    #[test]
    fn datatype_display_trait_test() {
        let expectations = [
            (DataType::Integer, "Integer"),
            (DataType::Text, "Text"),
            (DataType::Real, "Real"),
            (DataType::Bool, "Boolean"),
            (DataType::None, "None"),
            (DataType::Invalid, "Invalid"),
        ];

        for (datatype, rendered) in expectations.iter() {
            assert_eq!(datatype.to_string(), *rendered);
        }
    }

    /// A freshly created table exposes all declared columns, starts at rowid
    /// zero and marks `id` as an integer primary key.
    #[test]
    fn create_new_table_test() {
        let sql = "CREATE TABLE contacts (
            id INTEGER PRIMARY KEY,
            first_name TEXT NOT NULL,
            last_name TEXT NOT NULl,
            email TEXT NOT NULL UNIQUE,
            active BOOL,
            score REAL
        );";
        let table = table_from_sql(sql);

        assert_eq!(table.columns.len(), 6);
        assert_eq!(table.last_rowid, 0);

        match table.columns.iter().find(|c| c.column_name == "id") {
            Some(column) => {
                assert!(column.is_pk);
                assert_eq!(column.datatype, DataType::Integer);
            }
            None => panic!("column not found"),
        }
    }

    /// Printing the schema of a three-column table reports nine lines.
    #[test]
    fn print_table_schema_test() {
        let sql = "CREATE TABLE contacts (
            id INTEGER PRIMARY KEY,
            first_name TEXT NOT NULL,
            last_name TEXT NOT NULl
        );";
        let table = table_from_sql(sql);

        assert_eq!(table.print_table_schema(), Ok(9));
    }
}