Skip to main content

sqlrite/sql/db/
table.rs

1use crate::error::{Result, SQLRiteError};
2use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
3use crate::sql::fts::PostingList;
4use crate::sql::hnsw::{DistanceMetric, HnswIndex};
5use crate::sql::parser::create::{CreateQuery, ParsedColumn};
6use std::collections::{BTreeMap, HashMap};
7use std::fmt;
8use std::sync::{Arc, Mutex};
9
10use prettytable::{Cell as PrintCell, Row as PrintRow, Table as PrintTable};
11
/// SQLRite data types.
///
/// Modelled after SQLite's storage classes and type affinities; see
/// [Datatypes In SQLite Version 3](https://www.sqlite.org/datatype3.html).
///
/// `Vector(dim)` is the Phase 7a addition — a fixed-dimension dense f32
/// array. The dimension is part of the type so a `VECTOR(384)` column
/// rejects `[0.1, 0.2, 0.3]` at INSERT time as a clean type error
/// rather than silently storing the wrong shape.
///
/// Every payload is plain data (`usize`), so the type is `Eq` and `Hash`
/// in addition to `PartialEq` and can be used as a map / set key.
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub enum DataType {
    /// Integer column.
    Integer,
    /// Text column.
    Text,
    /// Floating-point column.
    Real,
    /// Boolean column.
    Bool,
    /// Dense f32 vector of fixed dimension. The `usize` is the column's
    /// declared dimension; every value stored in the column must have
    /// exactly that many elements.
    Vector(usize),
    /// Phase 7e — JSON column. Stored as canonical UTF-8 text (matches
    /// SQLite's JSON1 extension), validated at INSERT time. The
    /// `json_extract` family of functions parses on demand and returns
    /// either a primitive `Value` (Integer / Real / Text / Bool / Null)
    /// or a Text value carrying the JSON-encoded sub-object/array.
    /// Q3 originally specified `bincoded serde_json::Value`, but bincode
    /// was removed from the engine in Phase 3c — see the scope-correction
    /// note in `docs/phase-7-plan.md` for the rationale on switching to
    /// text storage.
    Json,
    /// The explicit "no type" marker (wire string `"none"`).
    None,
    /// Produced when a wire string cannot be recognised as any of the
    /// types above.
    Invalid,
}
43
44impl DataType {
45    /// Constructs a `DataType` from the wire string the parser produces.
46    /// Pre-Phase-7 the strings were one-of `"integer" | "text" | "real" |
47    /// "bool" | "none"`. Phase 7a adds `"vector(N)"` (case-insensitive,
48    /// N a positive integer) for the new vector column type — encoded
49    /// in-band so we don't have to plumb a richer type through the
50    /// existing string-based ParsedColumn pipeline.
51    pub fn new(cmd: String) -> DataType {
52        let lower = cmd.to_lowercase();
53        match lower.as_str() {
54            "integer" => DataType::Integer,
55            "text" => DataType::Text,
56            "real" => DataType::Real,
57            "bool" => DataType::Bool,
58            "json" => DataType::Json,
59            "none" => DataType::None,
60            other if other.starts_with("vector(") && other.ends_with(')') => {
61                // Strip the `vector(` prefix and trailing `)`, parse what's
62                // left as a positive integer dimension. Anything else is
63                // Invalid — surfaces a clean error at CREATE TABLE time.
64                let inside = &other["vector(".len()..other.len() - 1];
65                match inside.trim().parse::<usize>() {
66                    Ok(dim) if dim > 0 => DataType::Vector(dim),
67                    _ => {
68                        eprintln!("Invalid VECTOR dimension in {cmd}");
69                        DataType::Invalid
70                    }
71                }
72            }
73            _ => {
74                eprintln!("Invalid data type given {}", cmd);
75                DataType::Invalid
76            }
77        }
78    }
79
80    /// Inverse of `new` — returns the canonical lowercased wire string
81    /// for this DataType. Used by the parser to round-trip
82    /// `VECTOR(N)` → `DataType::Vector(N)` → `"vector(N)"` into
83    /// `ParsedColumn::datatype` so the rest of the pipeline keeps
84    /// working with strings.
85    pub fn to_wire_string(&self) -> String {
86        match self {
87            DataType::Integer => "Integer".to_string(),
88            DataType::Text => "Text".to_string(),
89            DataType::Real => "Real".to_string(),
90            DataType::Bool => "Bool".to_string(),
91            DataType::Vector(dim) => format!("vector({dim})"),
92            DataType::Json => "Json".to_string(),
93            DataType::None => "None".to_string(),
94            DataType::Invalid => "Invalid".to_string(),
95        }
96    }
97}
98
99impl fmt::Display for DataType {
100    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
101        match self {
102            DataType::Integer => f.write_str("Integer"),
103            DataType::Text => f.write_str("Text"),
104            DataType::Real => f.write_str("Real"),
105            DataType::Bool => f.write_str("Boolean"),
106            DataType::Vector(dim) => write!(f, "Vector({dim})"),
107            DataType::Json => f.write_str("Json"),
108            DataType::None => f.write_str("None"),
109            DataType::Invalid => f.write_str("Invalid"),
110        }
111    }
112}
113
/// In-memory representation of one SQL table: its schema, its
/// per-column row storage, and every index attached to it.
///
/// `rows` is `Arc<Mutex<...>>` rather than `Rc<RefCell<...>>` so `Table`
/// (and by extension `Database`) is `Send + Sync` — the Tauri desktop
/// app holds the engine in shared state behind a `Mutex<Database>`, and
/// Tauri's state container requires its contents to be thread-safe.
///
/// `Table` deliberately does not derive `Clone`: a derived clone would
/// share the `Arc` around `rows` between both copies. Use
/// `Table::deep_clone` to obtain an independent copy.
#[derive(Debug)]
pub struct Table {
    /// Name of the table
    pub tb_name: String,
    /// Schema for each column, in declaration order.
    pub columns: Vec<Column>,
    /// Per-column row storage, keyed by column name. Each typed `Row`
    /// variant wraps a `BTreeMap` keyed by rowid; columns whose declared
    /// type resolves to `DataType::Invalid` / `DataType::None` get
    /// `Row::None`, and JSON columns reuse `Row::Text`. A NULL cell is
    /// represented by the rowid being absent from that column's map, so
    /// the keysets of two columns only coincide on rows where both hold a
    /// non-NULL value.
    pub rows: Arc<Mutex<HashMap<String, Row>>>,
    /// Secondary indexes on this table (Phase 3e). One auto-created entry
    /// per UNIQUE or PRIMARY KEY column of type Integer or Text; explicit
    /// `CREATE INDEX` statements add more. Looking up an index: by column
    /// name via `Table::index_for_column`, or by index name via
    /// `Table::index_by_name`.
    pub secondary_indexes: Vec<SecondaryIndex>,
    /// HNSW indexes on VECTOR columns (Phase 7d.2). Maintained in lockstep
    /// with row storage on INSERT (incremental); rebuilt on open from the
    /// persisted CREATE INDEX SQL. The graph itself is NOT yet persisted —
    /// see Phase 7d.3 for cell-encoded graph storage.
    pub hnsw_indexes: Vec<HnswIndexEntry>,
    /// FTS inverted indexes on TEXT columns (Phase 8b). Maintained in
    /// lockstep with row storage on INSERT (incremental); DELETE / UPDATE
    /// flag `needs_rebuild` and the next save rebuilds from current rows.
    /// The posting lists themselves are NOT yet persisted — Phase 8c
    /// wires the cell-encoded `KIND_FTS_POSTING` storage.
    pub fts_indexes: Vec<FtsIndexEntry>,
    /// ROWID of most recent insert. Starts at 0 for a freshly created
    /// table.
    pub last_rowid: i64,
    /// PRIMARY KEY column name, or the sentinel string "-1" if the table
    /// has no PRIMARY KEY.
    pub primary_key: String,
}
152
/// One HNSW index attached to a table. The distance metric is fixed
/// at CREATE INDEX time via `USING hnsw (col) WITH (metric = '<m>')`
/// (`l2` / `cosine` / `dot`); omitting the WITH clause defaults to L2,
/// matching the pre-SQLR-28 behaviour for round-tripping older
/// `sqlrite_master` rows that didn't carry a metric.
///
/// Derives `Clone` so that a table snapshot can own an independent copy
/// of the graph.
#[derive(Debug, Clone)]
pub struct HnswIndexEntry {
    /// User-supplied name from `CREATE INDEX <name> …`. Unique across
    /// both `secondary_indexes` and `hnsw_indexes` on a given table.
    /// NOTE(review): `FtsIndexEntry::name` documents uniqueness across
    /// `fts_indexes` as well — confirm whether that also applies here.
    pub name: String,
    /// The VECTOR column this index covers.
    pub column_name: String,
    /// Distance metric the graph was built for. The optimizer's HNSW
    /// shortcut only fires when the query's `vec_distance_*` function
    /// matches this metric — picking a non-matching distance falls
    /// through to brute-force, since the graph topology is metric-
    /// specific (an L2-pruned graph isn't a valid cosine search graph
    /// in general, and vice versa).
    pub metric: DistanceMetric,
    /// The graph itself.
    pub index: HnswIndex,
    /// Phase 7d.3 — true iff a DELETE or UPDATE-on-vector-col has
    /// invalidated the graph since the last rebuild. INSERT maintains
    /// the graph incrementally and leaves this false. The next save
    /// rebuilds dirty indexes from current rows before serializing.
    pub needs_rebuild: bool,
}
180
/// One FTS index attached to a table (Phase 8b). The inverted index
/// itself is a [`PostingList`]; metadata (name, column, dirty flag)
/// lives here. Mirrors [`HnswIndexEntry`] — minus the distance metric,
/// which has no FTS counterpart — so the rebuild-on-save and
/// DELETE/UPDATE invalidation paths can use one pattern across both
/// index families.
#[derive(Debug, Clone)]
pub struct FtsIndexEntry {
    /// User-supplied name from `CREATE INDEX <name> … USING fts(<col>)`.
    /// Unique across `secondary_indexes`, `hnsw_indexes`, and
    /// `fts_indexes` on a given table.
    pub name: String,
    /// The TEXT column this index covers.
    pub column_name: String,
    /// The inverted index + per-doc length cache.
    pub index: PostingList,
    /// True iff a DELETE or UPDATE-on-text-col has invalidated the
    /// posting lists since the last rebuild. INSERT maintains the
    /// index incrementally and leaves this false. The next save
    /// rebuilds dirty indexes from current rows before serializing
    /// (mirrors HNSW's Q7 strategy).
    pub needs_rebuild: bool,
}
203
204impl Table {
205    pub fn new(create_query: CreateQuery) -> Self {
206        let table_name = create_query.table_name;
207        let mut primary_key: String = String::from("-1");
208        let columns = create_query.columns;
209
210        let mut table_cols: Vec<Column> = vec![];
211        let table_rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
212        let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
213        for col in &columns {
214            let col_name = &col.name;
215            if col.is_pk {
216                primary_key = col_name.to_string();
217            }
218            table_cols.push(Column::with_default(
219                col_name.to_string(),
220                col.datatype.to_string(),
221                col.is_pk,
222                col.not_null,
223                col.is_unique,
224                col.default.clone(),
225            ));
226
227            let dt = DataType::new(col.datatype.to_string());
228            let row_storage = match &dt {
229                DataType::Integer => Row::Integer(BTreeMap::new()),
230                DataType::Real => Row::Real(BTreeMap::new()),
231                DataType::Text => Row::Text(BTreeMap::new()),
232                DataType::Bool => Row::Bool(BTreeMap::new()),
233                // The dimension is enforced at INSERT time against the
234                // column's declared DataType::Vector(dim). The Row variant
235                // itself doesn't carry the dim — every stored Vec<f32>
236                // already has it via .len().
237                DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
238                // Phase 7e — JSON columns reuse Text storage (with
239                // INSERT-time validation that the bytes parse as JSON).
240                // No new Row variant; json_extract / json_type / etc.
241                // re-parse from text on demand. See `docs/phase-7-plan.md`
242                // Q3's scope-correction note for the storage choice.
243                DataType::Json => Row::Text(BTreeMap::new()),
244                DataType::Invalid | DataType::None => Row::None,
245            };
246            table_rows
247                .lock()
248                .expect("Table row storage mutex poisoned")
249                .insert(col.name.to_string(), row_storage);
250
251            // Auto-create an index for every UNIQUE / PRIMARY KEY column,
252            // but only for types we know how to index. Real / Bool / Vector
253            // UNIQUE columns fall back to the linear scan path in
254            // validate_unique_constraint — same behavior as before 3e.
255            // (Vector UNIQUE is unusual; the linear-scan path will work
256            // via Value::Vector PartialEq, just at O(N) cost.)
257            if (col.is_pk || col.is_unique) && matches!(dt, DataType::Integer | DataType::Text) {
258                let name = SecondaryIndex::auto_name(&table_name, &col.name);
259                match SecondaryIndex::new(
260                    name,
261                    table_name.clone(),
262                    col.name.clone(),
263                    &dt,
264                    true,
265                    IndexOrigin::Auto,
266                ) {
267                    Ok(idx) => secondary_indexes.push(idx),
268                    Err(_) => {
269                        // Unreachable given the matches! guard above, but
270                        // the builder returns Result so we keep the arm.
271                    }
272                }
273            }
274        }
275
276        Table {
277            tb_name: table_name,
278            columns: table_cols,
279            rows: table_rows,
280            secondary_indexes,
281            // HNSW indexes only land via explicit CREATE INDEX … USING hnsw
282            // statements (Phase 7d.2); never auto-created at CREATE TABLE
283            // time, because there's no UNIQUE-style constraint that
284            // implies a vector index.
285            hnsw_indexes: Vec::new(),
286            // Same story for FTS indexes — explicit `CREATE INDEX … USING
287            // fts(<col>)` only (Phase 8b).
288            fts_indexes: Vec::new(),
289            last_rowid: 0,
290            primary_key,
291        }
292    }
293
294    /// Deep-clones a `Table` for transaction snapshots (Phase 4f).
295    ///
296    /// The normal `Clone` derive would shallow-clone the `Arc<Mutex<_>>`
297    /// wrapping our row storage, leaving both copies sharing the same
298    /// inner map — mutating the snapshot would corrupt the live table
299    /// and vice versa. Instead we lock, clone the inner `HashMap`, and
300    /// wrap it in a fresh `Arc<Mutex<_>>`. Columns and indexes derive
301    /// `Clone` directly (all their fields are plain data).
302    pub fn deep_clone(&self) -> Self {
303        let cloned_rows: HashMap<String, Row> = {
304            let guard = self.rows.lock().expect("row mutex poisoned");
305            guard.clone()
306        };
307        Table {
308            tb_name: self.tb_name.clone(),
309            columns: self.columns.clone(),
310            rows: Arc::new(Mutex::new(cloned_rows)),
311            secondary_indexes: self.secondary_indexes.clone(),
312            // HnswIndexEntry derives Clone, so the snapshot owns its own
313            // graph copy. Phase 4f's snapshot-rollback semantics require
314            // the snapshot to be fully decoupled from live state.
315            hnsw_indexes: self.hnsw_indexes.clone(),
316            // Same fully-decoupled clone for FTS indexes (Phase 8b).
317            fts_indexes: self.fts_indexes.clone(),
318            last_rowid: self.last_rowid,
319            primary_key: self.primary_key.clone(),
320        }
321    }
322
323    /// Finds an auto- or explicit-index entry for a given column. Returns
324    /// `None` if the column isn't indexed.
325    pub fn index_for_column(&self, column: &str) -> Option<&SecondaryIndex> {
326        self.secondary_indexes
327            .iter()
328            .find(|i| i.column_name == column)
329    }
330
331    fn index_for_column_mut(&mut self, column: &str) -> Option<&mut SecondaryIndex> {
332        self.secondary_indexes
333            .iter_mut()
334            .find(|i| i.column_name == column)
335    }
336
337    /// Finds a secondary index by its own name (e.g., `sqlrite_autoindex_users_email`
338    /// or a user-provided CREATE INDEX name). Used by DROP INDEX and the
339    /// rename helpers below.
340    pub fn index_by_name(&self, name: &str) -> Option<&SecondaryIndex> {
341        self.secondary_indexes.iter().find(|i| i.name == name)
342    }
343
344    /// Renames a column in place. Updates row storage, the `Column`
345    /// metadata, every secondary / HNSW / FTS index whose `column_name`
346    /// matches, the `primary_key` pointer if the renamed column is the
347    /// PK, and any auto-index name that embedded the old column name.
348    ///
349    /// Caller-side validation (table existence, source-column existence
350    /// at the surface level, IF EXISTS) lives in the executor; this
351    /// method enforces the column-level invariants that have to be
352    /// checked under the `Table` borrow anyway.
353    pub fn rename_column(&mut self, old: &str, new: &str) -> Result<()> {
354        if !self.columns.iter().any(|c| c.column_name == old) {
355            return Err(SQLRiteError::General(format!(
356                "column '{old}' does not exist in table '{}'",
357                self.tb_name
358            )));
359        }
360        if old != new && self.columns.iter().any(|c| c.column_name == new) {
361            return Err(SQLRiteError::General(format!(
362                "column '{new}' already exists in table '{}'",
363                self.tb_name
364            )));
365        }
366        if old == new {
367            return Ok(());
368        }
369
370        for col in self.columns.iter_mut() {
371            if col.column_name == old {
372                col.column_name = new.to_string();
373            }
374        }
375
376        // Re-key the per-column row map.
377        {
378            let mut rows = self.rows.lock().expect("rows mutex poisoned");
379            if let Some(storage) = rows.remove(old) {
380                rows.insert(new.to_string(), storage);
381            }
382        }
383
384        if self.primary_key == old {
385            self.primary_key = new.to_string();
386        }
387
388        let table_name = self.tb_name.clone();
389        for idx in self.secondary_indexes.iter_mut() {
390            if idx.column_name == old {
391                idx.column_name = new.to_string();
392                if idx.origin == IndexOrigin::Auto
393                    && idx.name == SecondaryIndex::auto_name(&table_name, old)
394                {
395                    idx.name = SecondaryIndex::auto_name(&table_name, new);
396                }
397            }
398        }
399        for entry in self.hnsw_indexes.iter_mut() {
400            if entry.column_name == old {
401                entry.column_name = new.to_string();
402            }
403        }
404        for entry in self.fts_indexes.iter_mut() {
405            if entry.column_name == old {
406                entry.column_name = new.to_string();
407            }
408        }
409
410        Ok(())
411    }
412
413    /// Appends a new column to this table from a parsed column spec.
414    /// The new column's row storage is allocated empty; existing rowids
415    /// read NULL for the new column unless `parsed.default` is set, in
416    /// which case those rowids are backfilled with the default value.
417    ///
418    /// Rejects PK / UNIQUE on the added column (would require
419    /// backfill-with-uniqueness-check against existing rows). Rejects
420    /// NOT NULL without DEFAULT on a non-empty table — same rule SQLite
421    /// applies, and necessary because we have no other backfill source.
422    pub fn add_column(&mut self, parsed: ParsedColumn) -> Result<()> {
423        if self.contains_column(parsed.name.clone()) {
424            return Err(SQLRiteError::General(format!(
425                "column '{}' already exists in table '{}'",
426                parsed.name, self.tb_name
427            )));
428        }
429        if parsed.is_pk {
430            return Err(SQLRiteError::General(
431                "cannot ADD COLUMN with PRIMARY KEY constraint on existing table".to_string(),
432            ));
433        }
434        if parsed.is_unique {
435            return Err(SQLRiteError::General(
436                "cannot ADD COLUMN with UNIQUE constraint on existing table".to_string(),
437            ));
438        }
439        let table_has_rows = self
440            .columns
441            .first()
442            .map(|c| {
443                self.rows
444                    .lock()
445                    .expect("rows mutex poisoned")
446                    .get(&c.column_name)
447                    .map(|r| r.rowids().len())
448                    .unwrap_or(0)
449                    > 0
450            })
451            .unwrap_or(false);
452        if parsed.not_null && parsed.default.is_none() && table_has_rows {
453            return Err(SQLRiteError::General(format!(
454                "cannot ADD COLUMN '{}' NOT NULL without DEFAULT to a non-empty table",
455                parsed.name
456            )));
457        }
458
459        let new_column = Column::with_default(
460            parsed.name.clone(),
461            parsed.datatype.clone(),
462            parsed.is_pk,
463            parsed.not_null,
464            parsed.is_unique,
465            parsed.default.clone(),
466        );
467
468        // Allocate empty row storage for the new column. Mirrors the
469        // dispatch in `Table::new` so the new column behaves identically
470        // to one declared at CREATE TABLE time.
471        let row_storage = match &new_column.datatype {
472            DataType::Integer => Row::Integer(BTreeMap::new()),
473            DataType::Real => Row::Real(BTreeMap::new()),
474            DataType::Text => Row::Text(BTreeMap::new()),
475            DataType::Bool => Row::Bool(BTreeMap::new()),
476            DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
477            DataType::Json => Row::Text(BTreeMap::new()),
478            DataType::Invalid | DataType::None => Row::None,
479        };
480        {
481            let mut rows = self.rows.lock().expect("rows mutex poisoned");
482            rows.insert(parsed.name.clone(), row_storage);
483        }
484
485        // Backfill existing rowids with the default value, if any.
486        // NULL defaults are a no-op — a missing key in the BTreeMap reads
487        // as NULL anyway. Type mismatches were caught at `parse_one_column`
488        // time when the DEFAULT was evaluated against the declared
489        // datatype; reaching the `_` arm here would indicate a bug.
490        if let Some(default) = &parsed.default {
491            let existing_rowids = self.rowids();
492            let mut rows = self.rows.lock().expect("rows mutex poisoned");
493            let storage = rows.get_mut(&parsed.name).expect("just inserted");
494            match (storage, default) {
495                (Row::Integer(tree), Value::Integer(v)) => {
496                    let v32 = *v as i32;
497                    for rowid in existing_rowids {
498                        tree.insert(rowid, v32);
499                    }
500                }
501                (Row::Real(tree), Value::Real(v)) => {
502                    let v32 = *v as f32;
503                    for rowid in existing_rowids {
504                        tree.insert(rowid, v32);
505                    }
506                }
507                (Row::Text(tree), Value::Text(v)) => {
508                    for rowid in existing_rowids {
509                        tree.insert(rowid, v.clone());
510                    }
511                }
512                (Row::Bool(tree), Value::Bool(v)) => {
513                    for rowid in existing_rowids {
514                        tree.insert(rowid, *v);
515                    }
516                }
517                (_, Value::Null) => {} // no-op
518                (storage_ref, _) => {
519                    return Err(SQLRiteError::Internal(format!(
520                        "DEFAULT type does not match column storage for '{}': storage variant {:?}, default {:?}",
521                        parsed.name,
522                        std::mem::discriminant(storage_ref),
523                        default
524                    )));
525                }
526            }
527        }
528
529        self.columns.push(new_column);
530        Ok(())
531    }
532
533    /// Removes a column from this table. Refuses to drop the PRIMARY KEY
534    /// column or the only remaining column. Cascades to every index
535    /// (auto, explicit, HNSW, FTS) that referenced the column.
536    pub fn drop_column(&mut self, name: &str) -> Result<()> {
537        if !self.contains_column(name.to_string()) {
538            return Err(SQLRiteError::General(format!(
539                "column '{name}' does not exist in table '{}'",
540                self.tb_name
541            )));
542        }
543        if self.primary_key == name {
544            return Err(SQLRiteError::General(format!(
545                "cannot drop primary key column '{name}'"
546            )));
547        }
548        if self.columns.len() == 1 {
549            return Err(SQLRiteError::General(format!(
550                "cannot drop the only column of table '{}'",
551                self.tb_name
552            )));
553        }
554
555        self.columns.retain(|c| c.column_name != name);
556        {
557            let mut rows = self.rows.lock().expect("rows mutex poisoned");
558            rows.remove(name);
559        }
560        self.secondary_indexes.retain(|i| i.column_name != name);
561        self.hnsw_indexes.retain(|i| i.column_name != name);
562        self.fts_indexes.retain(|i| i.column_name != name);
563
564        Ok(())
565    }
566
567    /// Returns a `bool` informing if a `Column` with a specific name exists or not
568    ///
569    pub fn contains_column(&self, column: String) -> bool {
570        self.columns.iter().any(|col| col.column_name == column)
571    }
572
573    /// Returns the list of column names in declaration order.
574    pub fn column_names(&self) -> Vec<String> {
575        self.columns.iter().map(|c| c.column_name.clone()).collect()
576    }
577
578    /// Returns all rowids currently stored in the table, in ascending order.
579    /// Every column's BTreeMap has the same keyset, so we just read from the first column.
580    pub fn rowids(&self) -> Vec<i64> {
581        let Some(first) = self.columns.first() else {
582            return vec![];
583        };
584        let rows = self.rows.lock().expect("rows mutex poisoned");
585        rows.get(&first.column_name)
586            .map(|r| r.rowids())
587            .unwrap_or_default()
588    }
589
590    /// Reads a single cell at `(column, rowid)`.
591    pub fn get_value(&self, column: &str, rowid: i64) -> Option<Value> {
592        let rows = self.rows.lock().expect("rows mutex poisoned");
593        rows.get(column).and_then(|r| r.get(rowid))
594    }
595
596    /// Removes the row identified by `rowid` from every column's storage and
597    /// from every secondary index entry.
598    pub fn delete_row(&mut self, rowid: i64) {
599        // Snapshot the values we're about to delete so we can strip them
600        // from secondary indexes by (value, rowid) before the row storage
601        // disappears.
602        let per_column_values: Vec<(String, Option<Value>)> = self
603            .columns
604            .iter()
605            .map(|c| (c.column_name.clone(), self.get_value(&c.column_name, rowid)))
606            .collect();
607
608        // Remove from row storage.
609        {
610            let rows_clone = Arc::clone(&self.rows);
611            let mut row_data = rows_clone.lock().expect("rows mutex poisoned");
612            for col in &self.columns {
613                if let Some(r) = row_data.get_mut(&col.column_name) {
614                    match r {
615                        Row::Integer(m) => {
616                            m.remove(&rowid);
617                        }
618                        Row::Text(m) => {
619                            m.remove(&rowid);
620                        }
621                        Row::Real(m) => {
622                            m.remove(&rowid);
623                        }
624                        Row::Bool(m) => {
625                            m.remove(&rowid);
626                        }
627                        Row::Vector(m) => {
628                            m.remove(&rowid);
629                        }
630                        Row::None => {}
631                    }
632                }
633            }
634        }
635
636        // Strip secondary-index entries. Non-indexed columns just don't
637        // show up in secondary_indexes and are no-ops here.
638        for (col_name, value) in per_column_values {
639            if let Some(idx) = self.index_for_column_mut(&col_name) {
640                if let Some(v) = value {
641                    idx.remove(&v, rowid);
642                }
643            }
644        }
645    }
646
647    /// Replays a single row at `rowid` when loading a table from disk. Takes
648    /// one typed value per column (in declaration order); `None` means the
649    /// stored cell carried a NULL for that column. Unlike `insert_row` this
650    /// trusts the on-disk state and does *not* re-check UNIQUE — we're
651    /// rebuilding a state that was already consistent when it was saved.
652    pub fn restore_row(&mut self, rowid: i64, values: Vec<Option<Value>>) -> Result<()> {
653        if values.len() != self.columns.len() {
654            return Err(SQLRiteError::Internal(format!(
655                "cell has {} values but table '{}' has {} columns",
656                values.len(),
657                self.tb_name,
658                self.columns.len()
659            )));
660        }
661
662        let column_names: Vec<String> =
663            self.columns.iter().map(|c| c.column_name.clone()).collect();
664
665        for (i, value) in values.into_iter().enumerate() {
666            let col_name = &column_names[i];
667
668            // Write into the per-column row storage first (scoped borrow so
669            // the secondary-index update below doesn't fight over `self`).
670            {
671                let rows_clone = Arc::clone(&self.rows);
672                let mut row_data = rows_clone.lock().expect("rows mutex poisoned");
673                let cell = row_data.get_mut(col_name).ok_or_else(|| {
674                    SQLRiteError::Internal(format!("Row storage missing for column '{col_name}'"))
675                })?;
676
677                match (cell, &value) {
678                    // SQL NULL: leave the per-column BTreeMap entry
679                    // absent. `Row::*::get` returns `None` for missing
680                    // rowids, which `Table::get_value` relays and the
681                    // executor's `Identifier` arm renders as
682                    // `Value::Null`. Mirrors `insert_row`'s NULL path.
683                    (_, None) => { /* nothing to insert */ }
684                    (Row::Integer(map), Some(Value::Integer(v))) => {
685                        map.insert(rowid, *v as i32);
686                    }
687                    (Row::Text(map), Some(Value::Text(s))) => {
688                        map.insert(rowid, s.clone());
689                    }
690                    (Row::Real(map), Some(Value::Real(v))) => {
691                        map.insert(rowid, *v as f32);
692                    }
693                    (Row::Bool(map), Some(Value::Bool(v))) => {
694                        map.insert(rowid, *v);
695                    }
696                    (Row::Vector(map), Some(Value::Vector(v))) => {
697                        map.insert(rowid, v.clone());
698                    }
699                    (row, v) => {
700                        return Err(SQLRiteError::Internal(format!(
701                            "Type mismatch restoring column '{col_name}': storage {row:?} vs value {v:?}"
702                        )));
703                    }
704                }
705            }
706
707            // Maintain the secondary index (if any). NULL values are skipped
708            // by `insert`, matching the "NULL is not indexed" convention.
709            if let Some(v) = &value {
710                if let Some(idx) = self.index_for_column_mut(col_name) {
711                    idx.insert(v, rowid)?;
712                }
713            }
714        }
715
716        if rowid > self.last_rowid {
717            self.last_rowid = rowid;
718        }
719        Ok(())
720    }
721
722    /// Extracts a row as an ordered `Vec<Option<Value>>` matching the column
723    /// declaration order. Returns `None` entries for columns that hold NULL.
724    /// Used by `save_database` to turn a table's in-memory state into cells.
725    pub fn extract_row(&self, rowid: i64) -> Vec<Option<Value>> {
726        self.columns
727            .iter()
728            .map(|c| match self.get_value(&c.column_name, rowid) {
729                Some(Value::Null) => None,
730                Some(v) => Some(v),
731                None => None,
732            })
733            .collect()
734    }
735
736    /// Overwrites the cell at `(column, rowid)` with `new_val`. Enforces the
737    /// column's datatype and UNIQUE constraint, and updates any secondary
738    /// index.
739    ///
740    /// Returns `Err` if the column doesn't exist, the value type is incompatible,
741    /// or writing would violate UNIQUE.
742    pub fn set_value(&mut self, column: &str, rowid: i64, new_val: Value) -> Result<()> {
743        let col_index = self
744            .columns
745            .iter()
746            .position(|c| c.column_name == column)
747            .ok_or_else(|| SQLRiteError::General(format!("Column '{column}' not found")))?;
748
749        // No-op write — keep storage exactly the same.
750        let current = self.get_value(column, rowid);
751        if current.as_ref() == Some(&new_val) {
752            return Ok(());
753        }
754
755        // Enforce UNIQUE. Prefer an O(log N) index probe if we have one;
756        // fall back to a full column scan otherwise (Real/Bool UNIQUE
757        // columns, which don't get auto-indexed).
758        if self.columns[col_index].is_unique && !matches!(new_val, Value::Null) {
759            if let Some(idx) = self.index_for_column(column) {
760                for other in idx.lookup(&new_val) {
761                    if other != rowid {
762                        return Err(SQLRiteError::General(format!(
763                            "UNIQUE constraint violated for column '{column}'"
764                        )));
765                    }
766                }
767            } else {
768                for other in self.rowids() {
769                    if other == rowid {
770                        continue;
771                    }
772                    if self.get_value(column, other).as_ref() == Some(&new_val) {
773                        return Err(SQLRiteError::General(format!(
774                            "UNIQUE constraint violated for column '{column}'"
775                        )));
776                    }
777                }
778            }
779        }
780
781        // Drop the old index entry before writing the new value, so the
782        // post-write index insert doesn't clash with the previous state.
783        if let Some(old) = current {
784            if let Some(idx) = self.index_for_column_mut(column) {
785                idx.remove(&old, rowid);
786            }
787        }
788
789        // Write into the column's Row, type-checking against the declared DataType.
790        let declared = &self.columns[col_index].datatype;
791        {
792            let rows_clone = Arc::clone(&self.rows);
793            let mut row_data = rows_clone.lock().expect("rows mutex poisoned");
794            let cell = row_data.get_mut(column).ok_or_else(|| {
795                SQLRiteError::Internal(format!("Row storage missing for column '{column}'"))
796            })?;
797
798            match (cell, &new_val, declared) {
799                (Row::Integer(m), Value::Integer(v), _) => {
800                    m.insert(rowid, *v as i32);
801                }
802                (Row::Real(m), Value::Real(v), _) => {
803                    m.insert(rowid, *v as f32);
804                }
805                (Row::Real(m), Value::Integer(v), _) => {
806                    m.insert(rowid, *v as f32);
807                }
808                (Row::Text(m), Value::Text(v), dt) => {
809                    // Phase 7e — UPDATE on a JSON column also validates
810                    // the new text is well-formed JSON, mirroring INSERT.
811                    if matches!(dt, DataType::Json) {
812                        if let Err(e) = serde_json::from_str::<serde_json::Value>(v) {
813                            return Err(SQLRiteError::General(format!(
814                                "Type mismatch: expected JSON for column '{column}', got '{v}': {e}"
815                            )));
816                        }
817                    }
818                    m.insert(rowid, v.clone());
819                }
820                (Row::Bool(m), Value::Bool(v), _) => {
821                    m.insert(rowid, *v);
822                }
823                (Row::Vector(m), Value::Vector(v), DataType::Vector(declared_dim)) => {
824                    if v.len() != *declared_dim {
825                        return Err(SQLRiteError::General(format!(
826                            "Vector dimension mismatch for column '{column}': declared {declared_dim}, got {}",
827                            v.len()
828                        )));
829                    }
830                    m.insert(rowid, v.clone());
831                }
832                // NULL writes: store the sentinel "Null" string for Text; for other
833                // types we leave storage as-is since those BTreeMaps can't hold NULL today.
834                (Row::Text(m), Value::Null, _) => {
835                    m.insert(rowid, "Null".to_string());
836                }
837                (_, new, dt) => {
838                    return Err(SQLRiteError::General(format!(
839                        "Type mismatch: cannot assign {} to column '{column}' of type {dt}",
840                        new.to_display_string()
841                    )));
842                }
843            }
844        }
845
846        // Maintain the secondary index, if any. NULL values are skipped by
847        // insert per convention.
848        if !matches!(new_val, Value::Null) {
849            if let Some(idx) = self.index_for_column_mut(column) {
850                idx.insert(&new_val, rowid)?;
851            }
852        }
853
854        Ok(())
855    }
856
857    /// Returns an immutable reference of `sql::db::table::Column` if the table contains a
858    /// column with the specified key as a column name.
859    ///
860    #[allow(dead_code)]
861    pub fn get_column(&mut self, column_name: String) -> Result<&Column> {
862        if let Some(column) = self
863            .columns
864            .iter()
865            .filter(|c| c.column_name == column_name)
866            .collect::<Vec<&Column>>()
867            .first()
868        {
869            Ok(column)
870        } else {
871            Err(SQLRiteError::General(String::from("Column not found.")))
872        }
873    }
874
875    /// Validates if columns and values being inserted violate the UNIQUE constraint.
876    /// PRIMARY KEY columns are automatically UNIQUE. Uses the corresponding
877    /// secondary index when one exists (O(log N) lookup); falls back to a
878    /// linear scan for indexable-but-not-indexed situations (e.g. a Real
879    /// UNIQUE column — Real isn't in the auto-indexed set).
880    pub fn validate_unique_constraint(
881        &mut self,
882        cols: &Vec<String>,
883        values: &Vec<Option<Value>>,
884    ) -> Result<()> {
885        for (idx, name) in cols.iter().enumerate() {
886            let column = self
887                .columns
888                .iter()
889                .find(|c| &c.column_name == name)
890                .ok_or_else(|| SQLRiteError::General(format!("Column '{name}' not found")))?;
891            if !column.is_unique {
892                continue;
893            }
894            let datatype = &column.datatype;
895
896            // Standard SQL UNIQUE allows multiple NULLs — skip the check.
897            let supplied = match &values[idx] {
898                None => continue,
899                Some(v) => v,
900            };
901
902            // Type-check the supplied Value against the column's declared
903            // datatype. Same shape as the dispatch in `insert_row`: an
904            // INTEGER column accepts Value::Integer; REAL accepts Real or
905            // widens Integer; TEXT/JSON accepts Text; BOOL accepts Bool;
906            // VECTOR accepts Vector with a matching dimension. Anything
907            // else short-circuits the insert with the same error message
908            // `insert_row` would emit for the same input.
909            let parsed: Value = match (datatype, supplied) {
910                (DataType::Integer, Value::Integer(n)) => Value::Integer(*n),
911                (DataType::Integer, other) => {
912                    return Err(SQLRiteError::General(format!(
913                        "Type mismatch: expected INTEGER for column '{name}', got '{}'",
914                        other.to_display_string()
915                    )));
916                }
917                (DataType::Text, Value::Text(s)) => Value::Text(s.clone()),
918                (DataType::Text, other) => {
919                    return Err(SQLRiteError::General(format!(
920                        "Type mismatch: expected TEXT for column '{name}', got '{}'",
921                        other.to_display_string()
922                    )));
923                }
924                (DataType::Real, Value::Real(f)) => Value::Real(*f),
925                (DataType::Real, Value::Integer(n)) => Value::Real(*n as f64),
926                (DataType::Real, other) => {
927                    return Err(SQLRiteError::General(format!(
928                        "Type mismatch: expected REAL for column '{name}', got '{}'",
929                        other.to_display_string()
930                    )));
931                }
932                (DataType::Bool, Value::Bool(b)) => Value::Bool(*b),
933                (DataType::Bool, other) => {
934                    return Err(SQLRiteError::General(format!(
935                        "Type mismatch: expected BOOL for column '{name}', got '{}'",
936                        other.to_display_string()
937                    )));
938                }
939                (DataType::Vector(declared_dim), Value::Vector(parsed_vec)) => {
940                    if parsed_vec.len() != *declared_dim {
941                        return Err(SQLRiteError::General(format!(
942                            "Vector dimension mismatch for column '{name}': declared {declared_dim}, got {}",
943                            parsed_vec.len()
944                        )));
945                    }
946                    Value::Vector(parsed_vec.clone())
947                }
948                (DataType::Vector(_), other) => {
949                    return Err(SQLRiteError::General(format!(
950                        "Type mismatch: expected VECTOR for column '{name}', got '{}'",
951                        other.to_display_string()
952                    )));
953                }
954                (DataType::Json, Value::Text(s)) => {
955                    // JSON values stored as Text. UNIQUE on a JSON column
956                    // compares the canonical text representation
957                    // verbatim — `{"a": 1}` and `{"a":1}` are distinct.
958                    // Document this if anyone actually requests UNIQUE
959                    // JSON; for MVP, treat-as-text is fine.
960                    Value::Text(s.clone())
961                }
962                (DataType::Json, other) => {
963                    return Err(SQLRiteError::General(format!(
964                        "Type mismatch: expected JSON for column '{name}', got '{}'",
965                        other.to_display_string()
966                    )));
967                }
968                (DataType::None | DataType::Invalid, _) => {
969                    return Err(SQLRiteError::Internal(format!(
970                        "column '{name}' has an unsupported datatype"
971                    )));
972                }
973            };
974
975            if let Some(secondary) = self.index_for_column(name) {
976                if secondary.would_violate_unique(&parsed) {
977                    return Err(SQLRiteError::General(format!(
978                        "UNIQUE constraint violated for column '{name}': value '{}' already exists",
979                        parsed.to_display_string()
980                    )));
981                }
982            } else {
983                // No secondary index (Real / Bool UNIQUE). Linear scan.
984                for other in self.rowids() {
985                    if self.get_value(name, other).as_ref() == Some(&parsed) {
986                        return Err(SQLRiteError::General(format!(
987                            "UNIQUE constraint violated for column '{name}': value '{}' already exists",
988                            parsed.to_display_string()
989                        )));
990                    }
991                }
992            }
993        }
994        Ok(())
995    }
996
997    /// Inserts all VALUES in its approprieta COLUMNS, using the ROWID an embedded INDEX on all ROWS
998    /// Every `Table` keeps track of the `last_rowid` in order to facilitate what the next one would be.
999    /// One limitation of this data structure is that we can only have one write transaction at a time, otherwise
1000    /// we could have a race condition on the last_rowid.
1001    ///
1002    /// Since we are loosely modeling after SQLite, this is also a limitation of SQLite (allowing only one write transcation at a time),
1003    /// So we are good. :)
1004    ///
1005    /// Returns `Err` (leaving the table unchanged) when the user supplies an
1006    /// incompatibly-typed value — no more panics on bad input.
1007    pub fn insert_row(&mut self, cols: &Vec<String>, values: &Vec<Option<Value>>) -> Result<()> {
1008        let mut next_rowid = self.last_rowid + 1;
1009
1010        // Auto-assign INTEGER PRIMARY KEY when the user omits it; otherwise
1011        // adopt the supplied value as the new rowid.
1012        if self.primary_key != "-1" {
1013            if !cols.iter().any(|col| col == &self.primary_key) {
1014                // Write the auto-assigned PK into row storage, then sync
1015                // the secondary index.
1016                let val = next_rowid as i32;
1017                let wrote_integer = {
1018                    let rows_clone = Arc::clone(&self.rows);
1019                    let mut row_data = rows_clone.lock().expect("rows mutex poisoned");
1020                    let table_col_data = row_data.get_mut(&self.primary_key).ok_or_else(|| {
1021                        SQLRiteError::Internal(format!(
1022                            "Row storage missing for primary key column '{}'",
1023                            self.primary_key
1024                        ))
1025                    })?;
1026                    match table_col_data {
1027                        Row::Integer(tree) => {
1028                            tree.insert(next_rowid, val);
1029                            true
1030                        }
1031                        _ => false, // non-integer PK: auto-assign is a no-op
1032                    }
1033                };
1034                if wrote_integer {
1035                    let pk = self.primary_key.clone();
1036                    if let Some(idx) = self.index_for_column_mut(&pk) {
1037                        idx.insert(&Value::Integer(val as i64), next_rowid)?;
1038                    }
1039                }
1040            } else {
1041                for i in 0..cols.len() {
1042                    if cols[i] == self.primary_key {
1043                        next_rowid = match &values[i] {
1044                            Some(Value::Integer(n)) => *n,
1045                            None => {
1046                                return Err(SQLRiteError::General(format!(
1047                                    "Type mismatch: PRIMARY KEY column '{}' cannot be NULL",
1048                                    self.primary_key
1049                                )));
1050                            }
1051                            Some(other) => {
1052                                return Err(SQLRiteError::General(format!(
1053                                    "Type mismatch: PRIMARY KEY column '{}' expects INTEGER, got '{}'",
1054                                    self.primary_key,
1055                                    other.to_display_string()
1056                                )));
1057                            }
1058                        };
1059                    }
1060                }
1061            }
1062        }
1063
1064        // For every table column, either pick the supplied value or pad with NULL
1065        // so that every column's BTreeMap keeps the same rowid keyset.
1066        let column_names = self
1067            .columns
1068            .iter()
1069            .map(|col| col.column_name.to_string())
1070            .collect::<Vec<String>>();
1071        let mut j: usize = 0;
1072        for i in 0..column_names.len() {
1073            // `None` means SQL NULL: leave the column's BTreeMap entry
1074            // absent so reads come back as Value::Null via the missing-
1075            // rowid path.
1076            let mut val: Option<Value> = None;
1077            let key = &column_names[i];
1078            let mut column_supplied = false;
1079
1080            if let Some(supplied_key) = cols.get(j) {
1081                if supplied_key == &column_names[i] {
1082                    val = values[j].clone();
1083                    column_supplied = true;
1084                    j += 1;
1085                } else if self.primary_key == column_names[i] {
1086                    // PK already stored in the auto-assign branch above.
1087                    continue;
1088                }
1089            } else if self.primary_key == column_names[i] {
1090                continue;
1091            }
1092
1093            // Column was omitted from the INSERT column list. Substitute its
1094            // DEFAULT literal if one was declared at CREATE TABLE time;
1095            // otherwise it stays as None. SQLite semantics: an *explicit*
1096            // NULL is preserved as NULL — the default only fires for
1097            // omitted columns. `DEFAULT NULL` is treated as no default.
1098            if !column_supplied {
1099                val = self.columns[i]
1100                    .default
1101                    .clone()
1102                    .filter(|v| !matches!(v, Value::Null));
1103            }
1104
1105            // Step 1: write into row storage and compute the typed Value
1106            // we'll hand to the secondary index (if any).
1107            let typed_value: Option<Value> = {
1108                let rows_clone = Arc::clone(&self.rows);
1109                let mut row_data = rows_clone.lock().expect("rows mutex poisoned");
1110                let table_col_data = row_data.get_mut(key).ok_or_else(|| {
1111                    SQLRiteError::Internal(format!("Row storage missing for column '{key}'"))
1112                })?;
1113
1114                match (table_col_data, &val) {
1115                    // SQL NULL: leave the BTreeMap entry absent. Indexes are
1116                    // skipped (Step 2 below short-circuits on None).
1117                    (_, None) => None,
1118
1119                    (Row::Integer(tree), Some(Value::Integer(n))) => {
1120                        tree.insert(next_rowid, *n as i32);
1121                        Some(Value::Integer(*n))
1122                    }
1123                    (Row::Integer(_), Some(other)) => {
1124                        return Err(SQLRiteError::General(format!(
1125                            "Type mismatch: expected INTEGER for column '{key}', got '{}'",
1126                            other.to_display_string()
1127                        )));
1128                    }
1129
1130                    (Row::Text(tree), Some(Value::Text(s))) => {
1131                        // Phase 7e — JSON columns share Row::Text storage.
1132                        // Validate the value parses as JSON before storing;
1133                        // otherwise we'd happily write `not-json-at-all`
1134                        // and only fail when json_extract tried to parse
1135                        // it later.
1136                        if matches!(self.columns[i].datatype, DataType::Json) {
1137                            if let Err(e) = serde_json::from_str::<serde_json::Value>(s) {
1138                                return Err(SQLRiteError::General(format!(
1139                                    "Type mismatch: expected JSON for column '{key}', got '{s}': {e}"
1140                                )));
1141                            }
1142                        }
1143                        tree.insert(next_rowid, s.clone());
1144                        Some(Value::Text(s.clone()))
1145                    }
1146                    (Row::Text(_), Some(other)) => {
1147                        let label = if matches!(self.columns[i].datatype, DataType::Json) {
1148                            "JSON"
1149                        } else {
1150                            "TEXT"
1151                        };
1152                        return Err(SQLRiteError::General(format!(
1153                            "Type mismatch: expected {label} for column '{key}', got '{}'",
1154                            other.to_display_string()
1155                        )));
1156                    }
1157
1158                    (Row::Real(tree), Some(Value::Real(f))) => {
1159                        let f32_val = *f as f32;
1160                        tree.insert(next_rowid, f32_val);
1161                        Some(Value::Real(*f))
1162                    }
1163                    // Allow integer literals to widen into REAL columns
1164                    // (matches the previous string-parse behavior where
1165                    // `INSERT … VALUES (42)` into a REAL column worked).
1166                    (Row::Real(tree), Some(Value::Integer(n))) => {
1167                        let f32_val = *n as f32;
1168                        tree.insert(next_rowid, f32_val);
1169                        Some(Value::Real(*n as f64))
1170                    }
1171                    (Row::Real(_), Some(other)) => {
1172                        return Err(SQLRiteError::General(format!(
1173                            "Type mismatch: expected REAL for column '{key}', got '{}'",
1174                            other.to_display_string()
1175                        )));
1176                    }
1177
1178                    (Row::Bool(tree), Some(Value::Bool(b))) => {
1179                        tree.insert(next_rowid, *b);
1180                        Some(Value::Bool(*b))
1181                    }
1182                    (Row::Bool(_), Some(other)) => {
1183                        return Err(SQLRiteError::General(format!(
1184                            "Type mismatch: expected BOOL for column '{key}', got '{}'",
1185                            other.to_display_string()
1186                        )));
1187                    }
1188
1189                    (Row::Vector(tree), Some(Value::Vector(parsed))) => {
1190                        // The parser already turned a bracket-array literal
1191                        // into a typed Value::Vector. We still need to
1192                        // dim-check against the column's declared
1193                        // DataType::Vector(N).
1194                        let declared_dim = match &self.columns[i].datatype {
1195                            DataType::Vector(d) => *d,
1196                            other => {
1197                                return Err(SQLRiteError::Internal(format!(
1198                                    "Row::Vector storage on non-Vector column '{key}' (declared as {other})"
1199                                )));
1200                            }
1201                        };
1202                        if parsed.len() != declared_dim {
1203                            return Err(SQLRiteError::General(format!(
1204                                "Vector dimension mismatch for column '{key}': declared {declared_dim}, got {}",
1205                                parsed.len()
1206                            )));
1207                        }
1208                        tree.insert(next_rowid, parsed.clone());
1209                        Some(Value::Vector(parsed.clone()))
1210                    }
1211                    (Row::Vector(_), Some(other)) => {
1212                        return Err(SQLRiteError::General(format!(
1213                            "Type mismatch: expected VECTOR for column '{key}', got '{}'",
1214                            other.to_display_string()
1215                        )));
1216                    }
1217
1218                    (Row::None, _) => {
1219                        return Err(SQLRiteError::Internal(format!(
1220                            "Column '{key}' has no row storage"
1221                        )));
1222                    }
1223                }
1224            };
1225
1226            // Step 2: maintain the secondary index (if any). insert() is a
1227            // no-op for Value::Null and cheap for other value kinds.
1228            if let Some(v) = typed_value.clone() {
1229                if let Some(idx) = self.index_for_column_mut(key) {
1230                    idx.insert(&v, next_rowid)?;
1231                }
1232            }
1233
1234            // Step 3 (Phase 7d.2): maintain any HNSW indexes on this column.
1235            // The HNSW algorithm needs access to other rows' vectors when
1236            // wiring up neighbor edges, so build a get_vec closure that
1237            // pulls from the table's row storage (which we *just* updated
1238            // with the new value).
1239            if let Some(Value::Vector(new_vec)) = &typed_value {
1240                self.maintain_hnsw_on_insert(key, next_rowid, new_vec);
1241            }
1242
1243            // Step 4 (Phase 8b): maintain any FTS indexes on this column.
1244            // Cheap incremental update — PostingList::insert tokenizes
1245            // the value and adds postings under the new rowid. DELETE
1246            // and UPDATE take the rebuild-on-save path instead (Q7).
1247            if let Some(Value::Text(text)) = &typed_value {
1248                self.maintain_fts_on_insert(key, next_rowid, text);
1249            }
1250        }
1251        self.last_rowid = next_rowid;
1252        Ok(())
1253    }
1254
1255    /// After a row insert, push the new (rowid, vector) into every HNSW
1256    /// index whose column matches `column`. Split out of `insert_row` so
1257    /// the borrowing dance — we need both `&self.rows` (read other
1258    /// vectors) and `&mut self.hnsw_indexes` (insert into the graph) —
1259    /// stays localized.
1260    fn maintain_hnsw_on_insert(&mut self, column: &str, rowid: i64, new_vec: &[f32]) {
1261        // Snapshot the current vector storage so the get_vec closure
1262        // doesn't fight with `&mut self.hnsw_indexes`. For a typical
1263        // HNSW insert we touch ef_construction × log(N) other vectors,
1264        // so the snapshot cost is small relative to the graph wiring.
1265        let mut vec_snapshot: HashMap<i64, Vec<f32>> = HashMap::new();
1266        {
1267            let row_data = self.rows.lock().expect("rows mutex poisoned");
1268            if let Some(Row::Vector(map)) = row_data.get(column) {
1269                for (id, v) in map.iter() {
1270                    vec_snapshot.insert(*id, v.clone());
1271                }
1272            }
1273        }
1274        // The new row was just written into row storage — make sure the
1275        // snapshot reflects it (it should, but defensive).
1276        vec_snapshot.insert(rowid, new_vec.to_vec());
1277
1278        for entry in &mut self.hnsw_indexes {
1279            if entry.column_name == column {
1280                entry.index.insert(rowid, new_vec, |id| {
1281                    vec_snapshot.get(&id).cloned().unwrap_or_default()
1282                });
1283            }
1284        }
1285    }
1286
1287    /// After a row insert, push the new (rowid, text) into every FTS
1288    /// index whose column matches `column`. Phase 8b.
1289    ///
1290    /// Mirrors [`Self::maintain_hnsw_on_insert`] but the FTS index is
1291    /// self-contained — `PostingList::insert` only needs the new doc's
1292    /// text, not the rest of the corpus, so there's no snapshot dance.
1293    fn maintain_fts_on_insert(&mut self, column: &str, rowid: i64, text: &str) {
1294        for entry in &mut self.fts_indexes {
1295            if entry.column_name == column {
1296                entry.index.insert(rowid, text);
1297            }
1298        }
1299    }
1300
1301    /// Print the table schema to standard output in a pretty formatted way.
1302    ///
1303    /// # Example
1304    ///
1305    /// ```text
1306    /// let table = Table::new(payload);
1307    /// table.print_table_schema();
1308    ///
1309    /// Prints to standard output:
1310    ///    +-------------+-----------+-------------+--------+----------+
1311    ///    | Column Name | Data Type | PRIMARY KEY | UNIQUE | NOT NULL |
1312    ///    +-------------+-----------+-------------+--------+----------+
1313    ///    | id          | Integer   | true        | true   | true     |
1314    ///    +-------------+-----------+-------------+--------+----------+
1315    ///    | name        | Text      | false       | true   | false    |
1316    ///    +-------------+-----------+-------------+--------+----------+
1317    ///    | email       | Text      | false       | false  | false    |
1318    ///    +-------------+-----------+-------------+--------+----------+
1319    /// ```
1320    ///
1321    pub fn print_table_schema(&self) -> Result<usize> {
1322        let mut table = PrintTable::new();
1323        table.add_row(row![
1324            "Column Name",
1325            "Data Type",
1326            "PRIMARY KEY",
1327            "UNIQUE",
1328            "NOT NULL"
1329        ]);
1330
1331        for col in &self.columns {
1332            table.add_row(row![
1333                col.column_name,
1334                col.datatype,
1335                col.is_pk,
1336                col.is_unique,
1337                col.not_null
1338            ]);
1339        }
1340
1341        table.printstd();
1342        Ok(table.len() * 2 + 1)
1343    }
1344
1345    /// Print the table data to standard output in a pretty formatted way.
1346    ///
1347    /// # Example
1348    ///
1349    /// ```text
1350    /// let db_table = db.get_table_mut(table_name.to_string()).unwrap();
1351    /// db_table.print_table_data();
1352    ///
1353    /// Prints to standard output:
1354    ///     +----+---------+------------------------+
1355    ///     | id | name    | email                  |
1356    ///     +----+---------+------------------------+
1357    ///     | 1  | "Jack"  | "jack@mail.com"        |
1358    ///     +----+---------+------------------------+
1359    ///     | 10 | "Bob"   | "bob@main.com"         |
1360    ///     +----+---------+------------------------+
1361    ///     | 11 | "Bill"  | "bill@main.com"        |
1362    ///     +----+---------+------------------------+
1363    /// ```
1364    ///
1365    pub fn print_table_data(&self) {
1366        let mut print_table = PrintTable::new();
1367
1368        let column_names = self
1369            .columns
1370            .iter()
1371            .map(|col| col.column_name.to_string())
1372            .collect::<Vec<String>>();
1373
1374        let header_row = PrintRow::new(
1375            column_names
1376                .iter()
1377                .map(|col| PrintCell::new(col))
1378                .collect::<Vec<PrintCell>>(),
1379        );
1380
1381        let rows_clone = Arc::clone(&self.rows);
1382        let row_data = rows_clone.lock().expect("rows mutex poisoned");
1383        let first_col_data = row_data
1384            .get(&self.columns.first().unwrap().column_name)
1385            .unwrap();
1386        let num_rows = first_col_data.count();
1387        let mut print_table_rows: Vec<PrintRow> = vec![PrintRow::new(vec![]); num_rows];
1388
1389        for col_name in &column_names {
1390            let col_val = row_data
1391                .get(col_name)
1392                .expect("Can't find any rows with the given column");
1393            let columns: Vec<String> = col_val.get_serialized_col_data();
1394
1395            for i in 0..num_rows {
1396                if let Some(cell) = &columns.get(i) {
1397                    print_table_rows[i].add_cell(PrintCell::new(cell));
1398                } else {
1399                    print_table_rows[i].add_cell(PrintCell::new(""));
1400                }
1401            }
1402        }
1403
1404        print_table.add_row(header_row);
1405        for row in print_table_rows {
1406            print_table.add_row(row);
1407        }
1408
1409        print_table.printstd();
1410    }
1411}
1412
1413/// The schema for each SQL column in every table.
1414///
1415/// Per-column index state moved to `Table::secondary_indexes` in Phase 3e —
1416/// a single `Column` describes the declared schema (name, type, constraints)
1417/// and nothing more.
1418#[derive(PartialEq, Debug, Clone)]
1419pub struct Column {
1420    pub column_name: String,
1421    pub datatype: DataType,
1422    pub is_pk: bool,
1423    pub not_null: bool,
1424    pub is_unique: bool,
1425    /// Literal value to substitute when this column is omitted from an
1426    /// INSERT. Restricted to literal expressions at CREATE TABLE time.
1427    /// `None` means "no DEFAULT declared"; an INSERT that omits the column
1428    /// gets `Value::Null` instead.
1429    pub default: Option<Value>,
1430}
1431
1432impl Column {
1433    /// Builds a `Column` without a `DEFAULT` clause. Existing call sites
1434    /// (catalog-table setup, test fixtures) keep working unchanged.
1435    pub fn new(
1436        name: String,
1437        datatype: String,
1438        is_pk: bool,
1439        not_null: bool,
1440        is_unique: bool,
1441    ) -> Self {
1442        Self::with_default(name, datatype, is_pk, not_null, is_unique, None)
1443    }
1444
1445    /// Builds a `Column` with an optional `DEFAULT` literal. Used by the
1446    /// CREATE TABLE / `parse_create_sql` paths that propagate user-supplied
1447    /// defaults from `ParsedColumn`.
1448    pub fn with_default(
1449        name: String,
1450        datatype: String,
1451        is_pk: bool,
1452        not_null: bool,
1453        is_unique: bool,
1454        default: Option<Value>,
1455    ) -> Self {
1456        let dt = DataType::new(datatype);
1457        Column {
1458            column_name: name,
1459            datatype: dt,
1460            is_pk,
1461            not_null,
1462            is_unique,
1463            default,
1464        }
1465    }
1466}
1467
1468/// The schema for each SQL row in every table is represented in memory
1469/// by following structure
1470///
1471/// This is an enum representing each of the available types organized in a BTreeMap
1472/// data structure, using the ROWID and key and each corresponding type as value
1473#[derive(PartialEq, Debug, Clone)]
1474pub enum Row {
1475    Integer(BTreeMap<i64, i32>),
1476    Text(BTreeMap<i64, String>),
1477    Real(BTreeMap<i64, f32>),
1478    Bool(BTreeMap<i64, bool>),
1479    /// Phase 7a: dense f32 vector storage. Each `Vec<f32>` should have
1480    /// length matching the column's declared `DataType::Vector(dim)`,
1481    /// enforced at INSERT time. The Row variant doesn't carry the dim —
1482    /// it lives in the column metadata.
1483    Vector(BTreeMap<i64, Vec<f32>>),
1484    None,
1485}
1486
1487impl Row {
1488    fn get_serialized_col_data(&self) -> Vec<String> {
1489        match self {
1490            Row::Integer(cd) => cd.values().map(|v| v.to_string()).collect(),
1491            Row::Real(cd) => cd.values().map(|v| v.to_string()).collect(),
1492            Row::Text(cd) => cd.values().map(|v| v.to_string()).collect(),
1493            Row::Bool(cd) => cd.values().map(|v| v.to_string()).collect(),
1494            Row::Vector(cd) => cd.values().map(format_vector_for_display).collect(),
1495            Row::None => panic!("Found None in columns"),
1496        }
1497    }
1498
1499    fn count(&self) -> usize {
1500        match self {
1501            Row::Integer(cd) => cd.len(),
1502            Row::Real(cd) => cd.len(),
1503            Row::Text(cd) => cd.len(),
1504            Row::Bool(cd) => cd.len(),
1505            Row::Vector(cd) => cd.len(),
1506            Row::None => panic!("Found None in columns"),
1507        }
1508    }
1509
1510    /// Every column's BTreeMap is keyed by ROWID. All columns share the same keyset
1511    /// after an INSERT (missing columns are padded), so any column's keys are a valid
1512    /// iteration of the table's rowids.
1513    pub fn rowids(&self) -> Vec<i64> {
1514        match self {
1515            Row::Integer(m) => m.keys().copied().collect(),
1516            Row::Text(m) => m.keys().copied().collect(),
1517            Row::Real(m) => m.keys().copied().collect(),
1518            Row::Bool(m) => m.keys().copied().collect(),
1519            Row::Vector(m) => m.keys().copied().collect(),
1520            Row::None => vec![],
1521        }
1522    }
1523
1524    pub fn get(&self, rowid: i64) -> Option<Value> {
1525        match self {
1526            Row::Integer(m) => m.get(&rowid).map(|v| Value::Integer(i64::from(*v))),
1527            // INSERT stores the literal string "Null" in Text columns that were omitted
1528            // from the query — re-map that back to a real NULL on read.
1529            Row::Text(m) => m.get(&rowid).map(|v| {
1530                if v == "Null" {
1531                    Value::Null
1532                } else {
1533                    Value::Text(v.clone())
1534                }
1535            }),
1536            Row::Real(m) => m.get(&rowid).map(|v| Value::Real(f64::from(*v))),
1537            Row::Bool(m) => m.get(&rowid).map(|v| Value::Bool(*v)),
1538            Row::Vector(m) => m.get(&rowid).map(|v| Value::Vector(v.clone())),
1539            Row::None => None,
1540        }
1541    }
1542}
1543
/// Formats a vector for human display. Shared by
/// `Row::get_serialized_col_data` (the REPL's print-table path) and
/// `Value::to_display_string`.
///
/// Output looks like `[0.1, 0.2, 0.3]`: JSON-like, with each element
/// rendered through `f32`'s `Display` impl and separated by `", "`. An
/// empty vector renders as `[]`. High-dimensional vectors (e.g. 384
/// elements) produce one long line; truncation with an ellipsis is a future
/// polish (see Phase 7 plan, "What this proposal does NOT commit to").
///
/// The parameter stays `&Vec<f32>` rather than `&[f32]` because the
/// function is passed by name to `Iterator::map` over `&Vec<f32>` items.
fn format_vector_for_display(v: &Vec<f32>) -> String {
    // `f32`'s Display picks the minimal round-trip representation, so
    // 0.1f32 renders as "0.1" rather than "0.10000000149011612".
    let elements: Vec<String> = v.iter().map(f32::to_string).collect();
    format!("[{}]", elements.join(", "))
}
1565
1566/// Runtime value produced by query execution. Separate from the on-disk `Row` enum
1567/// so the executor can carry typed values (including NULL) across operators.
1568#[derive(Debug, Clone, PartialEq)]
1569pub enum Value {
1570    Integer(i64),
1571    Text(String),
1572    Real(f64),
1573    Bool(bool),
1574    /// Phase 7a: dense f32 vector as a runtime value. Carries its own
1575    /// dimension implicitly via `Vec::len`; the column it's being
1576    /// assigned to has a declared `DataType::Vector(N)` that's checked
1577    /// at INSERT/UPDATE time.
1578    Vector(Vec<f32>),
1579    Null,
1580}
1581
1582impl Value {
1583    pub fn to_display_string(&self) -> String {
1584        match self {
1585            Value::Integer(v) => v.to_string(),
1586            Value::Text(s) => s.clone(),
1587            Value::Real(f) => f.to_string(),
1588            Value::Bool(b) => b.to_string(),
1589            Value::Vector(v) => format_vector_for_display(v),
1590            Value::Null => String::from("NULL"),
1591        }
1592    }
1593}
1594
1595/// Parse a bracket-array literal like `"[0.1, 0.2, 0.3]"` (or `"[1, 2, 3]"`)
1596/// into a `Vec<f32>`. The parser/insert pipeline stores vector literals as
1597/// strings in `InsertQuery::rows` (a `Vec<Vec<String>>`); this helper is
1598/// the inverse — turn the string back into a typed vector at the boundary
1599/// where we actually need element-typed data.
1600///
1601/// Accepts:
1602/// - `[]` → empty vector (caller's dimension check rejects it for VECTOR(N≥1))
1603/// - `[0.1, 0.2, 0.3]` → standard float syntax
1604/// - `[1, 2, 3]` → integers, coerced to f32 (matches `VALUES (1, 2)` for
1605///   `REAL` columns; we widen ints to floats automatically)
1606/// - whitespace tolerated everywhere (Python/JSON/pgvector convention)
1607///
1608/// Rejects with a descriptive message:
1609/// - missing `[` / `]`
1610/// - non-numeric elements (`['foo', 0.1]`)
1611/// - NaN / Inf literals (we accept them via `f32::from_str` but caller can
1612///   reject if undesired — for now we let them through; HNSW etc. will
1613///   reject NaN at index time)
1614pub fn parse_vector_literal(s: &str) -> Result<Vec<f32>> {
1615    let trimmed = s.trim();
1616    if !trimmed.starts_with('[') || !trimmed.ends_with(']') {
1617        return Err(SQLRiteError::General(format!(
1618            "expected bracket-array literal `[...]`, got `{s}`"
1619        )));
1620    }
1621    let inner = &trimmed[1..trimmed.len() - 1].trim();
1622    if inner.is_empty() {
1623        return Ok(Vec::new());
1624    }
1625    let mut out = Vec::new();
1626    for (i, part) in inner.split(',').enumerate() {
1627        let element = part.trim();
1628        let parsed: f32 = element.parse().map_err(|_| {
1629            SQLRiteError::General(format!("vector element {i} (`{element}`) is not a number"))
1630        })?;
1631        out.push(parsed);
1632    }
1633    Ok(out)
1634}
1635
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::dialect::SqlriteDialect;
    use sqlparser::parser::Parser;

    /// Parses `query_statement` — expected to hold exactly one
    /// `CREATE TABLE` — and builds the in-memory `Table` for it.
    fn table_from_sql(query_statement: &str) -> Table {
        let dialect = SqlriteDialect::new();
        let mut ast = Parser::parse_sql(&dialect, query_statement).unwrap();
        if ast.len() > 1 {
            panic!("Expected a single query statement, but there are more then 1.")
        }
        let query = ast.pop().unwrap();
        let create_query = CreateQuery::new(&query).unwrap();
        Table::new(create_query)
    }

    #[test]
    fn datatype_display_trait_test() {
        let expectations = [
            (DataType::Integer, "Integer"),
            (DataType::Text, "Text"),
            (DataType::Real, "Real"),
            (DataType::Bool, "Boolean"),
            (DataType::Vector(384), "Vector(384)"),
            (DataType::None, "None"),
            (DataType::Invalid, "Invalid"),
        ];

        for (datatype, rendered) in expectations {
            assert_eq!(datatype.to_string(), rendered);
        }
    }

    // -----------------------------------------------------------------
    // Phase 7a — VECTOR(N) column type
    // -----------------------------------------------------------------

    #[test]
    fn datatype_new_parses_vector_dim() {
        let cases = [
            // Standard cases.
            ("vector(1)", 1),
            ("vector(384)", 384),
            ("vector(1536)", 1536),
            // Case-insensitive on the keyword.
            ("VECTOR(384)", 384),
            // Whitespace inside parens tolerated (the create-parser strips
            // it, but the string-based round-trip in DataType::new is the
            // one place we don't fully control input formatting).
            ("vector( 64 )", 64),
        ];

        for (wire, dim) in cases {
            assert_eq!(DataType::new(wire.to_string()), DataType::Vector(dim));
        }
    }

    #[test]
    fn datatype_new_rejects_bad_vector_strings() {
        let rejected = [
            // dim = 0 is rejected (Q2: VECTOR(N≥1)).
            "vector(0)",
            // Non-numeric dim.
            "vector(abc)",
            // Empty parens.
            "vector()",
            // Negative dim wouldn't even parse as usize, so falls into Invalid.
            "vector(-3)",
        ];

        for wire in rejected {
            assert_eq!(DataType::new(wire.to_string()), DataType::Invalid);
        }
    }

    #[test]
    fn datatype_to_wire_string_round_trips_vector() {
        let wire = DataType::Vector(384).to_wire_string();
        assert_eq!(wire, "vector(384)");
        // Feeding the wire form back through DataType::new must be lossless —
        // this is the round-trip the ParsedColumn pipeline relies on.
        assert_eq!(DataType::new(wire), DataType::Vector(384));
    }

    #[test]
    fn parse_vector_literal_accepts_floats() {
        let parsed = parse_vector_literal("[0.1, 0.2, 0.3]").expect("parse");
        assert_eq!(parsed, vec![0.1f32, 0.2, 0.3]);
    }

    #[test]
    fn parse_vector_literal_accepts_ints_widening_to_f32() {
        let parsed = parse_vector_literal("[1, 2, 3]").expect("parse");
        assert_eq!(parsed, vec![1.0f32, 2.0, 3.0]);
    }

    #[test]
    fn parse_vector_literal_handles_negatives_and_whitespace() {
        let parsed = parse_vector_literal("[ -1.5 ,  2.0,  -3.5 ]").expect("parse");
        assert_eq!(parsed, vec![-1.5f32, 2.0, -3.5]);
    }

    #[test]
    fn parse_vector_literal_empty_brackets_is_empty_vec() {
        let parsed = parse_vector_literal("[]").expect("parse");
        assert!(parsed.is_empty());
    }

    #[test]
    fn parse_vector_literal_rejects_non_bracketed() {
        let malformed = [
            "0.1, 0.2",
            "(0.1, 0.2)",
            "[0.1, 0.2", // missing ]
            "0.1, 0.2]", // missing [
        ];

        for input in malformed {
            assert!(parse_vector_literal(input).is_err());
        }
    }

    #[test]
    fn parse_vector_literal_rejects_non_numeric_elements() {
        let msg = parse_vector_literal("[1.0, 'foo', 3.0]")
            .unwrap_err()
            .to_string();
        assert!(
            msg.contains("vector element 1") && msg.contains("'foo'"),
            "error message should pinpoint the bad element: got `{msg}`"
        );
    }

    #[test]
    fn value_vector_display_format() {
        let populated = Value::Vector(vec![0.1, 0.2, 0.3]);
        assert_eq!(populated.to_display_string(), "[0.1, 0.2, 0.3]");

        // Empty vector displays as `[]`.
        assert_eq!(Value::Vector(vec![]).to_display_string(), "[]");
    }

    #[test]
    fn create_new_table_test() {
        let query_statement = "CREATE TABLE contacts (
            id INTEGER PRIMARY KEY,
            first_name TEXT NOT NULL,
            last_name TEXT NOT NULl,
            email TEXT NOT NULL UNIQUE,
            active BOOL,
            score REAL
        );";
        let table = table_from_sql(query_statement);

        assert_eq!(table.columns.len(), 6);
        assert_eq!(table.last_rowid, 0);

        // The `id` column must exist, be the primary key and be an integer.
        let id_column = table
            .columns
            .iter()
            .find(|c| c.column_name == "id")
            .expect("column not found");
        assert!(id_column.is_pk);
        assert_eq!(id_column.datatype, DataType::Integer);
    }

    #[test]
    fn print_table_schema_test() {
        let query_statement = "CREATE TABLE contacts (
            id INTEGER PRIMARY KEY,
            first_name TEXT NOT NULL,
            last_name TEXT NOT NULl
        );";
        let table = table_from_sql(query_statement);

        // Header + 3 columns = 4 rows, i.e. 4 * 2 + 1 reported lines.
        assert_eq!(table.print_table_schema(), Ok(9));
    }
}