//! contextdb_relational — store.rs
//!
//! In-memory relational storage: MVCC row versions plus physical
//! secondary-index maintenance for declared and auto indexes.

1use contextdb_core::{
2    DirectedValue, IndexKey, RowId, SortDirection, TableMeta, TableName, TotalOrdAsc, TotalOrdDesc,
3    TxId, Value, VersionedRow,
4};
5use parking_lot::RwLock;
6use std::collections::{BTreeMap, HashMap};
7use std::sync::atomic::{AtomicU64, Ordering};
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct IndexEntry {
11    pub row_id: RowId,
12    pub created_tx: TxId,
13    pub deleted_tx: Option<TxId>,
14}
15
16impl IndexEntry {
17    pub fn visible_at(&self, snapshot: contextdb_core::SnapshotId) -> bool {
18        self.created_tx.0 <= snapshot.0 && self.deleted_tx.is_none_or(|tx| tx.0 > snapshot.0)
19    }
20}
21
22/// Storage for one declared index: BTreeMap keyed by `IndexKey` (direction-
23/// wrapped per column) to a Vec of posting entries with MVCC visibility.
24#[derive(Debug, Default)]
25pub struct IndexStorage {
26    pub columns: Vec<(String, SortDirection)>,
27    /// Non-unique keys map to Vec<IndexEntry>; tie-break by row_id ascending
28    /// is maintained at insert time (I18).
29    pub tree: BTreeMap<IndexKey, Vec<IndexEntry>>,
30}
31
32impl IndexStorage {
33    pub fn new(columns: Vec<(String, SortDirection)>) -> Self {
34        Self {
35            columns,
36            tree: BTreeMap::new(),
37        }
38    }
39
40    /// Number of live (non-tombstoned) postings, useful for leak detection.
41    pub fn total_entries(&self) -> u64 {
42        self.tree
43            .values()
44            .map(|v| v.iter().filter(|e| e.deleted_tx.is_none()).count() as u64)
45            .sum()
46    }
47
48    /// Total postings including tombstones (for DROP TABLE leak detection).
49    pub fn total_entries_including_tombstones(&self) -> u64 {
50        self.tree.values().map(|v| v.len() as u64).sum()
51    }
52
53    /// Insert a posting at the given key, placing it in row_id-ascending order.
54    pub fn insert_posting(&mut self, key: IndexKey, entry: IndexEntry) {
55        let vec = self.tree.entry(key).or_default();
56        let pos = vec
57            .binary_search_by(|e| e.row_id.cmp(&entry.row_id))
58            .unwrap_or_else(|i| i);
59        vec.insert(pos, entry);
60    }
61
62    /// Stamp `deleted_tx` on the posting matching `row_id` at `key`.
63    pub fn tombstone_posting(&mut self, key: &IndexKey, row_id: RowId, deleted_tx: TxId) {
64        if let Some(vec) = self.tree.get_mut(key) {
65            for entry in vec.iter_mut() {
66                if entry.row_id == row_id && entry.deleted_tx.is_none() {
67                    entry.deleted_tx = Some(deleted_tx);
68                    return;
69                }
70            }
71        }
72    }
73}
74
75pub struct RelationalStore {
76    pub tables: RwLock<HashMap<TableName, Vec<VersionedRow>>>,
77    pub table_meta: RwLock<HashMap<TableName, TableMeta>>,
78    /// (table, index_name) → IndexStorage. Lives alongside TableMeta.indexes
79    /// so readers / writers can look up the physical B-tree without needing a
80    /// second-level lock per index.
81    pub indexes: RwLock<HashMap<(TableName, String), IndexStorage>>,
82    /// Counts how many times `apply_changes`-style batch-level index-lock
83    /// acquisitions have happened. The per-row commit path does not bump this
84    /// counter; only coarse batch holders (`apply_changes` wrapping N rows) do.
85    pub index_write_lock_count: AtomicU64,
86    next_row_id: AtomicU64,
87}
88
89impl Default for RelationalStore {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl RelationalStore {
96    pub fn new() -> Self {
97        Self {
98            tables: RwLock::new(HashMap::new()),
99            table_meta: RwLock::new(HashMap::new()),
100            indexes: RwLock::new(HashMap::new()),
101            index_write_lock_count: AtomicU64::new(0),
102            next_row_id: AtomicU64::new(1),
103        }
104    }
105
106    pub fn new_row_id(&self) -> RowId {
107        RowId(self.next_row_id.fetch_add(1, Ordering::SeqCst))
108    }
109
110    /// Apply row inserts AND maintain every index (user-declared AND auto)
111    /// on the affected tables. The index update runs under the same
112    /// write-lock scope that the relational insert takes (implicitly held
113    /// by the caller's commit_mutex).
114    pub fn apply_inserts(&self, inserts: Vec<(TableName, VersionedRow)>) {
115        let mut tables = self.tables.write();
116        let mut indexes = self.indexes.write();
117        for (table_name, row) in inserts {
118            let entry = IndexEntry {
119                row_id: row.row_id,
120                created_tx: row.created_tx,
121                deleted_tx: row.deleted_tx,
122            };
123            for ((t, _), idx) in indexes.iter_mut() {
124                if t != &table_name {
125                    continue;
126                }
127                let key = index_key_for_row(&idx.columns, &row.values);
128                idx.insert_posting(key, entry.clone());
129            }
130            tables.entry(table_name).or_default().push(row);
131        }
132    }
133
134    pub fn apply_deletes(&self, deletes: Vec<(TableName, RowId, TxId)>) {
135        let mut tables = self.tables.write();
136        let mut indexes = self.indexes.write();
137        for (table_name, row_id, deleted_tx) in deletes {
138            let row_values: Option<HashMap<String, Value>> = tables
139                .get(&table_name)
140                .and_then(|rows| rows.iter().find(|r| r.row_id == row_id))
141                .map(|r| r.values.clone());
142            if let Some(values) = row_values {
143                for ((t, _), idx) in indexes.iter_mut() {
144                    if t != &table_name {
145                        continue;
146                    }
147                    let key = index_key_for_row(&idx.columns, &values);
148                    idx.tombstone_posting(&key, row_id, deleted_tx);
149                }
150            }
151            if let Some(rows) = tables.get_mut(&table_name) {
152                for row in rows.iter_mut() {
153                    if row.row_id == row_id && row.deleted_tx.is_none() {
154                        row.deleted_tx = Some(deleted_tx);
155                    }
156                }
157            }
158        }
159    }
160
161    pub fn create_table(&self, name: &str, meta: TableMeta) {
162        self.tables.write().entry(name.to_string()).or_default();
163        self.table_meta.write().insert(name.to_string(), meta);
164    }
165
166    pub fn insert_loaded_row(&self, name: &str, row: VersionedRow) {
167        // Row-load during Database::open: populate every index (user-declared
168        // + auto) so the rebuild stays in lockstep. Rows arrive in row_id
169        // ascending order, satisfying I18.
170        {
171            let mut indexes = self.indexes.write();
172            let entry = IndexEntry {
173                row_id: row.row_id,
174                created_tx: row.created_tx,
175                deleted_tx: row.deleted_tx,
176            };
177            for ((t, _), idx) in indexes.iter_mut() {
178                if t != name {
179                    continue;
180                }
181                let key = index_key_for_row(&idx.columns, &row.values);
182                idx.insert_posting(key, entry.clone());
183            }
184        }
185        self.tables
186            .write()
187            .entry(name.to_string())
188            .or_default()
189            .push(row);
190    }
191
192    pub fn max_row_id(&self) -> RowId {
193        self.tables
194            .read()
195            .values()
196            .flat_map(|rows| rows.iter().map(|row| row.row_id))
197            .max()
198            .unwrap_or(RowId(0))
199    }
200
201    pub fn set_next_row_id(&self, next_row_id: RowId) {
202        self.next_row_id.store(next_row_id.0, Ordering::SeqCst);
203    }
204
205    pub fn drop_table(&self, name: &str) {
206        self.tables.write().remove(name);
207        self.table_meta.write().remove(name);
208        // Drop all indexes whose key-table matches; releases BTreeMap storage.
209        let mut indexes = self.indexes.write();
210        indexes.retain(|(table, _), _| table != name);
211    }
212
213    /// Register a new index storage for (table, name). The IndexDecl must
214    /// already be present in the table's TableMeta (callers should
215    /// `register_index_meta` before `create_index_storage`).
216    pub fn create_index_storage(
217        &self,
218        table: &str,
219        name: &str,
220        columns: Vec<(String, SortDirection)>,
221    ) {
222        self.indexes.write().insert(
223            (table.to_string(), name.to_string()),
224            IndexStorage::new(columns),
225        );
226    }
227
228    /// Remove an index storage. Called from DROP INDEX / CASCADE.
229    pub fn drop_index_storage(&self, table: &str, name: &str) {
230        self.indexes
231            .write()
232            .remove(&(table.to_string(), name.to_string()));
233    }
234
235    /// Build (or rebuild) an index storage by scanning the current table rows.
236    /// Used after Database::open completes rebuilding TableMeta.indexes but
237    /// before the executor sees queries. Iterates in row_id-ascending order
238    /// to preserve I18 tie-break stability.
239    pub fn rebuild_index(&self, table: &str, name: &str) {
240        let columns = {
241            let indexes = self.indexes.read();
242            match indexes.get(&(table.to_string(), name.to_string())) {
243                Some(idx) => idx.columns.clone(),
244                None => return,
245            }
246        };
247        let mut rebuilt = IndexStorage::new(columns.clone());
248        let tables = self.tables.read();
249        if let Some(rows) = tables.get(table) {
250            let mut sorted: Vec<&VersionedRow> = rows.iter().collect();
251            sorted.sort_by_key(|r| r.row_id);
252            for row in sorted {
253                let key = index_key_for_row(&columns, &row.values);
254                rebuilt.insert_posting(
255                    key,
256                    IndexEntry {
257                        row_id: row.row_id,
258                        created_tx: row.created_tx,
259                        deleted_tx: row.deleted_tx,
260                    },
261                );
262            }
263        }
264        self.indexes
265            .write()
266            .insert((table.to_string(), name.to_string()), rebuilt);
267    }
268
269    /// Introspect total postings across all indexes (including tombstones).
270    /// Tests use this to confirm DROP TABLE releases index storage.
271    pub fn introspect_indexes_total_entries(&self) -> u64 {
272        self.indexes
273            .read()
274            .values()
275            .map(|s| s.total_entries_including_tombstones())
276            .sum()
277    }
278
279    /// Bump the batch-level index-write lock counter. Called once per
280    /// `apply_changes` batch to prove I14.
281    pub fn bump_index_write_lock_count(&self) {
282        self.index_write_lock_count.fetch_add(1, Ordering::SeqCst);
283    }
284
285    pub fn index_write_lock_count(&self) -> u64 {
286        self.index_write_lock_count.load(Ordering::SeqCst)
287    }
288
289    pub fn alter_table_add_column(
290        &self,
291        table: &str,
292        col: contextdb_core::ColumnDef,
293    ) -> Result<(), String> {
294        let mut meta = self.table_meta.write();
295        let m = meta
296            .get_mut(table)
297            .ok_or_else(|| format!("table '{}' not found", table))?;
298        if m.columns.iter().any(|c| c.name == col.name) {
299            return Err(format!(
300                "column '{}' already exists in table '{}'",
301                col.name, table
302            ));
303        }
304        m.columns.push(col);
305        Ok(())
306    }
307
308    pub fn alter_table_drop_column(&self, table: &str, column: &str) -> Result<(), String> {
309        {
310            let mut meta = self.table_meta.write();
311            let m = meta
312                .get_mut(table)
313                .ok_or_else(|| format!("table '{}' not found", table))?;
314            let pos = m
315                .columns
316                .iter()
317                .position(|c| c.name == column)
318                .ok_or_else(|| {
319                    format!("column '{}' does not exist in table '{}'", column, table)
320                })?;
321            if m.columns[pos].primary_key {
322                return Err(format!("cannot drop primary key column '{}'", column));
323            }
324            m.columns.remove(pos);
325        }
326        {
327            let mut tables = self.tables.write();
328            if let Some(rows) = tables.get_mut(table) {
329                for row in rows.iter_mut() {
330                    row.values.remove(column);
331                }
332            }
333        }
334        Ok(())
335    }
336
337    pub fn alter_table_rename_column(
338        &self,
339        table: &str,
340        from: &str,
341        to: &str,
342    ) -> Result<(), String> {
343        {
344            let mut meta = self.table_meta.write();
345            let m = meta
346                .get_mut(table)
347                .ok_or_else(|| format!("table '{}' not found", table))?;
348            if m.columns.iter().any(|c| c.name == to) {
349                return Err(format!(
350                    "column '{}' already exists in table '{}'",
351                    to, table
352                ));
353            }
354            let col = m
355                .columns
356                .iter_mut()
357                .find(|c| c.name == from)
358                .ok_or_else(|| format!("column '{}' does not exist in table '{}'", from, table))?;
359            if col.primary_key {
360                return Err(format!("cannot rename primary key column '{}'", from));
361            }
362            col.name = to.to_string();
363        }
364        {
365            let mut tables = self.tables.write();
366            if let Some(rows) = tables.get_mut(table) {
367                for row in rows.iter_mut() {
368                    if let Some(val) = row.values.remove(from) {
369                        row.values.insert(to.to_string(), val);
370                    }
371                }
372            }
373        }
374        Ok(())
375    }
376
377    pub fn is_immutable(&self, table: &str) -> bool {
378        self.table_meta
379            .read()
380            .get(table)
381            .is_some_and(|m| m.immutable)
382    }
383
384    pub fn validate_state_transition(
385        &self,
386        table: &str,
387        column: &str,
388        from: &str,
389        to: &str,
390    ) -> bool {
391        self.table_meta
392            .read()
393            .get(table)
394            .and_then(|m| m.state_machine.as_ref())
395            .filter(|sm| sm.column == column)
396            .is_none_or(|sm| {
397                sm.transitions
398                    .get(from)
399                    .is_some_and(|targets| targets.iter().any(|t| t == to))
400            })
401    }
402
403    pub fn table_names(&self) -> Vec<String> {
404        let mut names: Vec<_> = self.tables.read().keys().cloned().collect();
405        names.sort();
406        names
407    }
408
409    pub fn table_meta(&self, name: &str) -> Option<TableMeta> {
410        self.table_meta.read().get(name).cloned()
411    }
412}
413
414/// Build the directed IndexKey for a row's values given the index's column
415/// declaration. Missing columns map to `Value::Null` (NULL partition).
416pub fn index_key_for_row(
417    columns: &[(String, SortDirection)],
418    values: &HashMap<String, Value>,
419) -> IndexKey {
420    columns
421        .iter()
422        .map(|(col, dir)| {
423            let v = values.get(col).cloned().unwrap_or(Value::Null);
424            match dir {
425                SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(v)),
426                SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(v)),
427            }
428        })
429        .collect()
430}
431
432/// Build the directed IndexKey for a Vec of Values (one per indexed column).
433pub fn index_key_from_values(columns: &[(String, SortDirection)], values: &[Value]) -> IndexKey {
434    columns
435        .iter()
436        .zip(values.iter())
437        .map(|((_, dir), v)| match dir {
438            SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(v.clone())),
439            SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(v.clone())),
440        })
441        .collect()
442}