//! contextdb_relational/store.rs — in-memory relational row storage with
//! MVCC-versioned rows and physical secondary-index B-trees.

1use contextdb_core::{
2    DirectedValue, IndexKey, RowId, SortDirection, TableMeta, TableName, TotalOrdAsc, TotalOrdDesc,
3    TxId, Value, VersionedRow,
4};
5use parking_lot::RwLock;
6use std::collections::{BTreeMap, HashMap};
7use std::sync::atomic::{AtomicU64, Ordering};
8
/// A single index posting: one row's membership in a posting list, stamped
/// with the MVCC transaction bounds that determine snapshot visibility.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexEntry {
    // The row this posting points at.
    pub row_id: RowId,
    // Transaction that created the row version; visible when <= snapshot
    // (see `visible_at`).
    pub created_tx: TxId,
    // Transaction that tombstoned this posting, if any; the posting stays
    // visible to snapshots strictly older than it.
    pub deleted_tx: Option<TxId>,
}
15
16impl IndexEntry {
17    pub fn visible_at(&self, snapshot: contextdb_core::SnapshotId) -> bool {
18        self.created_tx.0 <= snapshot.0 && self.deleted_tx.is_none_or(|tx| tx.0 > snapshot.0)
19    }
20}
21
/// Storage for one declared index: BTreeMap keyed by `IndexKey` (direction-
/// wrapped per column) to a Vec of posting entries with MVCC visibility.
#[derive(Debug, Default)]
pub struct IndexStorage {
    // Indexed columns in declaration order, each with its sort direction;
    // used to build `IndexKey`s via `index_key_for_row`.
    pub columns: Vec<(String, SortDirection)>,
    /// Non-unique keys map to Vec<IndexEntry>; tie-break by row_id ascending
    /// is maintained at insert time (I18).
    pub tree: BTreeMap<IndexKey, Vec<IndexEntry>>,
}
31
32impl IndexStorage {
33    pub fn new(columns: Vec<(String, SortDirection)>) -> Self {
34        Self {
35            columns,
36            tree: BTreeMap::new(),
37        }
38    }
39
40    /// Number of live (non-tombstoned) postings, useful for leak detection.
41    pub fn total_entries(&self) -> u64 {
42        self.tree
43            .values()
44            .map(|v| v.iter().filter(|e| e.deleted_tx.is_none()).count() as u64)
45            .sum()
46    }
47
48    /// Total postings including tombstones (for DROP TABLE leak detection).
49    pub fn total_entries_including_tombstones(&self) -> u64 {
50        self.tree.values().map(|v| v.len() as u64).sum()
51    }
52
53    /// Insert a posting at the given key, placing it in row_id-ascending order.
54    pub fn insert_posting(&mut self, key: IndexKey, entry: IndexEntry) {
55        let vec = self.tree.entry(key).or_default();
56        let pos = vec
57            .binary_search_by(|e| e.row_id.cmp(&entry.row_id))
58            .unwrap_or_else(|i| i);
59        vec.insert(pos, entry);
60    }
61
62    /// Stamp `deleted_tx` on the posting matching `row_id` at `key`.
63    pub fn tombstone_posting(&mut self, key: &IndexKey, row_id: RowId, deleted_tx: TxId) {
64        if let Some(vec) = self.tree.get_mut(key) {
65            for entry in vec.iter_mut() {
66                if entry.row_id == row_id && entry.deleted_tx.is_none() {
67                    entry.deleted_tx = Some(deleted_tx);
68                    return;
69                }
70            }
71        }
72    }
73}
74
/// In-memory relational storage: append-only versioned rows per table plus
/// physical secondary-index B-trees, each map guarded by its own `RwLock`.
pub struct RelationalStore {
    // table name → append-only Vec of MVCC row versions.
    pub tables: RwLock<HashMap<TableName, Vec<VersionedRow>>>,
    // (table, row_id) → position in that table's Vec, giving `row_by_id`
    // O(1) lookup instead of a scan. Rows are never removed from the Vec
    // (deletes only tombstone), so recorded positions stay valid.
    row_positions: RwLock<HashMap<(TableName, RowId), usize>>,
    // table name → schema / constraint metadata.
    pub table_meta: RwLock<HashMap<TableName, TableMeta>>,
    /// (table, index_name) → IndexStorage. Lives alongside TableMeta.indexes
    /// so readers / writers can look up the physical B-tree without needing a
    /// second-level lock per index.
    pub indexes: RwLock<HashMap<(TableName, String), IndexStorage>>,
    /// Counts how many times `apply_changes`-style batch-level index-lock
    /// acquisitions have happened. The per-row commit path does not bump this
    /// counter; only coarse batch holders (`apply_changes` wrapping N rows) do.
    pub index_write_lock_count: AtomicU64,
    // Monotonic row-id allocator; starts at 1 (see `new`).
    next_row_id: AtomicU64,
}
89
impl Default for RelationalStore {
    /// Delegates to [`RelationalStore::new`]: empty maps, row ids from 1.
    fn default() -> Self {
        Self::new()
    }
}
95
impl RelationalStore {
    /// Construct an empty store. Row ids start at 1; `RowId(0)` is the
    /// "no rows" sentinel returned by `max_row_id`.
    pub fn new() -> Self {
        Self {
            tables: RwLock::new(HashMap::new()),
            row_positions: RwLock::new(HashMap::new()),
            table_meta: RwLock::new(HashMap::new()),
            indexes: RwLock::new(HashMap::new()),
            index_write_lock_count: AtomicU64::new(0),
            next_row_id: AtomicU64::new(1),
        }
    }

    /// Allocate the next row id from the monotonic atomic counter.
    pub fn new_row_id(&self) -> RowId {
        RowId(self.next_row_id.fetch_add(1, Ordering::SeqCst))
    }

    /// Apply row inserts AND maintain every index (user-declared AND auto)
    /// on the affected tables. The index update runs under the same
    /// write-lock scope that the relational insert takes (implicitly held
    /// by the caller's commit_mutex).
    pub fn apply_inserts(&self, inserts: Vec<(TableName, VersionedRow)>) {
        // All three write locks are held for the whole batch, so readers
        // never observe a row whose index postings are missing.
        let mut tables = self.tables.write();
        let mut indexes = self.indexes.write();
        let mut row_positions = self.row_positions.write();
        for (table_name, row) in inserts {
            // One posting per index on this table, sharing the row's MVCC stamps.
            let entry = IndexEntry {
                row_id: row.row_id,
                created_tx: row.created_tx,
                deleted_tx: row.deleted_tx,
            };
            for ((t, _), idx) in indexes.iter_mut() {
                if t != &table_name {
                    continue;
                }
                let key = index_key_for_row(&idx.columns, &row.values);
                idx.insert_posting(key, entry.clone());
            }
            let row_id = row.row_id;
            let rows = tables.entry(table_name.clone()).or_default();
            // Record the append position so `row_by_id` is O(1).
            row_positions.insert((table_name, row_id), rows.len());
            rows.push(row);
        }
    }

    /// Tombstone rows and their index postings at `deleted_tx`. Both the
    /// table and index locks are held for the whole batch, mirroring
    /// `apply_inserts`.
    pub fn apply_deletes(&self, deletes: Vec<(TableName, RowId, TxId)>) {
        let mut tables = self.tables.write();
        let mut indexes = self.indexes.write();
        for (table_name, row_id, deleted_tx) in deletes {
            // NOTE(review): this is a linear scan even though `row_positions`
            // could locate the row in O(1) — confirm whether deletes are rare
            // enough for this to be intentional.
            let row_values: Option<HashMap<String, Value>> = tables
                .get(&table_name)
                .and_then(|rows| rows.iter().find(|r| r.row_id == row_id))
                .map(|r| r.values.clone());
            if let Some(values) = row_values {
                for ((t, _), idx) in indexes.iter_mut() {
                    if t != &table_name {
                        continue;
                    }
                    // Recompute the key from the stored values so the posting
                    // is found under the same directed key it was inserted at.
                    let key = index_key_for_row(&idx.columns, &values);
                    idx.tombstone_posting(&key, row_id, deleted_tx);
                }
            }
            if let Some(rows) = tables.get_mut(&table_name) {
                // Stamp every still-live version carrying this row_id.
                for row in rows.iter_mut() {
                    if row.row_id == row_id && row.deleted_tx.is_none() {
                        row.deleted_tx = Some(deleted_tx);
                    }
                }
            }
        }
    }

    /// Register a table: an empty row Vec (preserving rows if the name
    /// already exists) and its metadata (overwritten if already present).
    pub fn create_table(&self, name: &str, meta: TableMeta) {
        self.tables.write().entry(name.to_string()).or_default();
        self.table_meta.write().insert(name.to_string(), meta);
    }

    pub fn insert_loaded_row(&self, name: &str, row: VersionedRow) {
        // Row-load during Database::open: populate every index (user-declared
        // + auto) so the rebuild stays in lockstep. Rows arrive in row_id
        // ascending order, satisfying I18.
        {
            // Index lock is scoped and released before the table lock below.
            let mut indexes = self.indexes.write();
            let entry = IndexEntry {
                row_id: row.row_id,
                created_tx: row.created_tx,
                deleted_tx: row.deleted_tx,
            };
            for ((t, _), idx) in indexes.iter_mut() {
                if t != name {
                    continue;
                }
                let key = index_key_for_row(&idx.columns, &row.values);
                idx.insert_posting(key, entry.clone());
            }
        }
        let mut tables = self.tables.write();
        let rows = tables.entry(name.to_string()).or_default();
        self.row_positions
            .write()
            .insert((name.to_string(), row.row_id), rows.len());
        rows.push(row);
    }

    /// O(1) point lookup via `row_positions`. Returns the row only if it is
    /// visible at `snapshot` (MVCC filter).
    pub fn row_by_id(
        &self,
        table: &str,
        row_id: RowId,
        snapshot: contextdb_core::SnapshotId,
    ) -> Option<VersionedRow> {
        let tables = self.tables.read();
        let positions = self.row_positions.read();
        let position = *positions.get(&(table.to_string(), row_id))?;
        // Position obtained; release the positions lock before cloning.
        drop(positions);
        tables
            .get(table)
            .and_then(|rows| rows.get(position))
            .filter(|row| row.row_id == row_id && row.visible_at(snapshot))
            .cloned()
    }

    /// Highest row id present across all tables, or `RowId(0)` when empty.
    /// Used to re-seed the allocator after load (see `set_next_row_id`).
    pub fn max_row_id(&self) -> RowId {
        self.tables
            .read()
            .values()
            .flat_map(|rows| rows.iter().map(|row| row.row_id))
            .max()
            .unwrap_or(RowId(0))
    }

    /// Overwrite the row-id allocator, e.g. after replaying persisted rows.
    pub fn set_next_row_id(&self, next_row_id: RowId) {
        self.next_row_id.store(next_row_id.0, Ordering::SeqCst);
    }

    /// Remove a table's rows, row-position entries, metadata, and indexes.
    pub fn drop_table(&self, name: &str) {
        self.tables.write().remove(name);
        self.row_positions
            .write()
            .retain(|(table, _), _| table != name);
        self.table_meta.write().remove(name);
        // Drop all indexes whose key-table matches; releases BTreeMap storage.
        let mut indexes = self.indexes.write();
        indexes.retain(|(table, _), _| table != name);
    }

    /// Register a new index storage for (table, name). The IndexDecl must
    /// already be present in the table's TableMeta (callers should
    /// `register_index_meta` before `create_index_storage`).
    pub fn create_index_storage(
        &self,
        table: &str,
        name: &str,
        columns: Vec<(String, SortDirection)>,
    ) {
        self.indexes.write().insert(
            (table.to_string(), name.to_string()),
            IndexStorage::new(columns),
        );
    }

    /// Remove an index storage. Called from DROP INDEX / CASCADE.
    pub fn drop_index_storage(&self, table: &str, name: &str) {
        self.indexes
            .write()
            .remove(&(table.to_string(), name.to_string()));
    }

    /// Build (or rebuild) an index storage by scanning the current table rows.
    /// Used after Database::open completes rebuilding TableMeta.indexes but
    /// before the executor sees queries. Iterates in row_id-ascending order
    /// to preserve I18 tie-break stability.
    pub fn rebuild_index(&self, table: &str, name: &str) {
        // NOTE(review): the index read lock is released before the final write
        // below, leaving a window where concurrent index mutations could be
        // lost — acceptable only under the "before the executor sees queries"
        // precondition stated above; confirm callers uphold it.
        let columns = {
            let indexes = self.indexes.read();
            match indexes.get(&(table.to_string(), name.to_string())) {
                Some(idx) => idx.columns.clone(),
                None => return,
            }
        };
        let mut rebuilt = IndexStorage::new(columns.clone());
        let tables = self.tables.read();
        if let Some(rows) = tables.get(table) {
            let mut sorted: Vec<&VersionedRow> = rows.iter().collect();
            sorted.sort_by_key(|r| r.row_id);
            for row in sorted {
                let key = index_key_for_row(&columns, &row.values);
                rebuilt.insert_posting(
                    key,
                    IndexEntry {
                        row_id: row.row_id,
                        created_tx: row.created_tx,
                        deleted_tx: row.deleted_tx,
                    },
                );
            }
        }
        self.indexes
            .write()
            .insert((table.to_string(), name.to_string()), rebuilt);
    }

    /// Introspect total postings across all indexes (including tombstones).
    /// Tests use this to confirm DROP TABLE releases index storage.
    pub fn introspect_indexes_total_entries(&self) -> u64 {
        self.indexes
            .read()
            .values()
            .map(|s| s.total_entries_including_tombstones())
            .sum()
    }

    /// Bump the batch-level index-write lock counter. Called once per
    /// `apply_changes` batch to prove I14.
    pub fn bump_index_write_lock_count(&self) {
        self.index_write_lock_count.fetch_add(1, Ordering::SeqCst);
    }

    /// Current value of the batch-level index-write lock counter.
    pub fn index_write_lock_count(&self) -> u64 {
        self.index_write_lock_count.load(Ordering::SeqCst)
    }

    /// Append a column to the table's metadata. Errors if the table is
    /// missing or the column already exists. Existing rows are untouched
    /// (the new column simply has no value in them).
    pub fn alter_table_add_column(
        &self,
        table: &str,
        col: contextdb_core::ColumnDef,
    ) -> Result<(), String> {
        let mut meta = self.table_meta.write();
        let m = meta
            .get_mut(table)
            .ok_or_else(|| format!("table '{}' not found", table))?;
        if m.columns.iter().any(|c| c.name == col.name) {
            return Err(format!(
                "column '{}' already exists in table '{}'",
                col.name, table
            ));
        }
        m.columns.push(col);
        Ok(())
    }

    /// Remove a column from metadata and strip its value from every row.
    /// Errors if the table/column is missing or the column is a primary key.
    pub fn alter_table_drop_column(&self, table: &str, column: &str) -> Result<(), String> {
        {
            // Metadata lock is scoped so it is released before the row pass.
            let mut meta = self.table_meta.write();
            let m = meta
                .get_mut(table)
                .ok_or_else(|| format!("table '{}' not found", table))?;
            let pos = m
                .columns
                .iter()
                .position(|c| c.name == column)
                .ok_or_else(|| {
                    format!("column '{}' does not exist in table '{}'", column, table)
                })?;
            if m.columns[pos].primary_key {
                return Err(format!("cannot drop primary key column '{}'", column));
            }
            m.columns.remove(pos);
        }
        {
            let mut tables = self.tables.write();
            if let Some(rows) = tables.get_mut(table) {
                for row in rows.iter_mut() {
                    row.values.remove(column);
                }
            }
        }
        Ok(())
    }

    /// Rename a column in metadata and re-key its value in every row.
    /// Errors if the table or source column is missing, the target name is
    /// taken, or the column is a primary key.
    pub fn alter_table_rename_column(
        &self,
        table: &str,
        from: &str,
        to: &str,
    ) -> Result<(), String> {
        {
            let mut meta = self.table_meta.write();
            let m = meta
                .get_mut(table)
                .ok_or_else(|| format!("table '{}' not found", table))?;
            if m.columns.iter().any(|c| c.name == to) {
                return Err(format!(
                    "column '{}' already exists in table '{}'",
                    to, table
                ));
            }
            let col = m
                .columns
                .iter_mut()
                .find(|c| c.name == from)
                .ok_or_else(|| format!("column '{}' does not exist in table '{}'", from, table))?;
            if col.primary_key {
                return Err(format!("cannot rename primary key column '{}'", from));
            }
            col.name = to.to_string();
        }
        {
            let mut tables = self.tables.write();
            if let Some(rows) = tables.get_mut(table) {
                for row in rows.iter_mut() {
                    if let Some(val) = row.values.remove(from) {
                        row.values.insert(to.to_string(), val);
                    }
                }
            }
        }
        Ok(())
    }

    /// Whether the table's metadata marks it immutable. Unknown tables
    /// report `false`.
    pub fn is_immutable(&self, table: &str) -> bool {
        self.table_meta
            .read()
            .get(table)
            .is_some_and(|m| m.immutable)
    }

    /// Check a state-machine transition `from -> to` on `column`. Returns
    /// `true` when no state machine governs that column (permissive default),
    /// otherwise only when the transition is explicitly declared.
    pub fn validate_state_transition(
        &self,
        table: &str,
        column: &str,
        from: &str,
        to: &str,
    ) -> bool {
        self.table_meta
            .read()
            .get(table)
            .and_then(|m| m.state_machine.as_ref())
            .filter(|sm| sm.column == column)
            .is_none_or(|sm| {
                sm.transitions
                    .get(from)
                    .is_some_and(|targets| targets.iter().any(|t| t == to))
            })
    }

    /// All table names, sorted ascending for deterministic output.
    pub fn table_names(&self) -> Vec<String> {
        let mut names: Vec<_> = self.tables.read().keys().cloned().collect();
        names.sort();
        names
    }

    /// Clone of the table's metadata, if the table exists.
    pub fn table_meta(&self, name: &str) -> Option<TableMeta> {
        self.table_meta.read().get(name).cloned()
    }
}
440
441/// Build the directed IndexKey for a row's values given the index's column
442/// declaration. Missing columns map to `Value::Null` (NULL partition).
443pub fn index_key_for_row(
444    columns: &[(String, SortDirection)],
445    values: &HashMap<String, Value>,
446) -> IndexKey {
447    columns
448        .iter()
449        .map(|(col, dir)| {
450            let v = values.get(col).cloned().unwrap_or(Value::Null);
451            match dir {
452                SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(v)),
453                SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(v)),
454            }
455        })
456        .collect()
457}
458
459/// Build the directed IndexKey for a Vec of Values (one per indexed column).
460pub fn index_key_from_values(columns: &[(String, SortDirection)], values: &[Value]) -> IndexKey {
461    columns
462        .iter()
463        .zip(values.iter())
464        .map(|((_, dir), v)| match dir {
465            SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(v.clone())),
466            SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(v.clone())),
467        })
468        .collect()
469}