1use contextdb_core::{
2 DirectedValue, IndexKey, RowId, SortDirection, TableMeta, TableName, TotalOrdAsc, TotalOrdDesc,
3 TxId, Value, VersionedRow,
4};
5use parking_lot::RwLock;
6use std::collections::{BTreeMap, HashMap};
7use std::sync::atomic::{AtomicU64, Ordering};
8
/// A single posting stored under an index key: which row it refers to and the
/// transaction range over which the posting exists.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexEntry {
    /// Id of the row this posting refers to.
    pub row_id: RowId,
    /// Transaction that created the posting.
    pub created_tx: TxId,
    /// Transaction that tombstoned the posting, or `None` while it is live.
    pub deleted_tx: Option<TxId>,
}
15
16impl IndexEntry {
17 pub fn visible_at(&self, snapshot: contextdb_core::SnapshotId) -> bool {
18 self.created_tx.0 <= snapshot.0 && self.deleted_tx.is_none_or(|tx| tx.0 > snapshot.0)
19 }
20}
21
/// In-memory storage for one index: the indexed columns and an ordered map
/// from index key to the postings recorded under that key.
#[derive(Debug, Default)]
pub struct IndexStorage {
    /// Indexed column names paired with the sort direction used when building keys.
    pub columns: Vec<(String, SortDirection)>,
    /// Postings grouped by index key. `insert_posting` places each new posting
    /// at the slot found by a binary search on `row_id`.
    pub tree: BTreeMap<IndexKey, Vec<IndexEntry>>,
}
31
32impl IndexStorage {
33 pub fn new(columns: Vec<(String, SortDirection)>) -> Self {
34 Self {
35 columns,
36 tree: BTreeMap::new(),
37 }
38 }
39
40 pub fn total_entries(&self) -> u64 {
42 self.tree
43 .values()
44 .map(|v| v.iter().filter(|e| e.deleted_tx.is_none()).count() as u64)
45 .sum()
46 }
47
48 pub fn total_entries_including_tombstones(&self) -> u64 {
50 self.tree.values().map(|v| v.len() as u64).sum()
51 }
52
53 pub fn insert_posting(&mut self, key: IndexKey, entry: IndexEntry) {
55 let vec = self.tree.entry(key).or_default();
56 let pos = vec
57 .binary_search_by(|e| e.row_id.cmp(&entry.row_id))
58 .unwrap_or_else(|i| i);
59 vec.insert(pos, entry);
60 }
61
62 pub fn tombstone_posting(&mut self, key: &IndexKey, row_id: RowId, deleted_tx: TxId) {
64 if let Some(vec) = self.tree.get_mut(key) {
65 for entry in vec.iter_mut() {
66 if entry.row_id == row_id && entry.deleted_tx.is_none() {
67 entry.deleted_tx = Some(deleted_tx);
68 return;
69 }
70 }
71 }
72 }
73}
74
/// In-memory relational storage: versioned rows per table, table metadata,
/// and secondary index storages, each guarded by its own lock.
pub struct RelationalStore {
    /// Row versions per table, in the order they were appended.
    pub tables: RwLock<HashMap<TableName, Vec<VersionedRow>>>,
    /// Position within the table's row vector last recorded for each
    /// `(table, row_id)` pair; consulted by `row_by_id`.
    row_positions: RwLock<HashMap<(TableName, RowId), usize>>,
    /// Metadata for each table, keyed by table name.
    pub table_meta: RwLock<HashMap<TableName, TableMeta>>,
    /// Index storages keyed by `(table name, index name)`.
    pub indexes: RwLock<HashMap<(TableName, String), IndexStorage>>,
    /// Counter incremented only through `bump_index_write_lock_count`.
    // NOTE(review): presumably tracks index write-lock acquisitions made by
    // callers; nothing in this type bumps it automatically — confirm intent.
    pub index_write_lock_count: AtomicU64,
    /// Next row id handed out by `new_row_id`.
    next_row_id: AtomicU64,
}
89
90impl Default for RelationalStore {
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
96impl RelationalStore {
97 pub fn new() -> Self {
98 Self {
99 tables: RwLock::new(HashMap::new()),
100 row_positions: RwLock::new(HashMap::new()),
101 table_meta: RwLock::new(HashMap::new()),
102 indexes: RwLock::new(HashMap::new()),
103 index_write_lock_count: AtomicU64::new(0),
104 next_row_id: AtomicU64::new(1),
105 }
106 }
107
108 pub fn new_row_id(&self) -> RowId {
109 RowId(self.next_row_id.fetch_add(1, Ordering::SeqCst))
110 }
111
112 pub fn apply_inserts(&self, inserts: Vec<(TableName, VersionedRow)>) {
117 let mut tables = self.tables.write();
118 let mut indexes = self.indexes.write();
119 let mut row_positions = self.row_positions.write();
120 for (table_name, row) in inserts {
121 let entry = IndexEntry {
122 row_id: row.row_id,
123 created_tx: row.created_tx,
124 deleted_tx: row.deleted_tx,
125 };
126 for ((t, _), idx) in indexes.iter_mut() {
127 if t != &table_name {
128 continue;
129 }
130 let key = index_key_for_row(&idx.columns, &row.values);
131 idx.insert_posting(key, entry.clone());
132 }
133 let row_id = row.row_id;
134 let rows = tables.entry(table_name.clone()).or_default();
135 row_positions.insert((table_name, row_id), rows.len());
136 rows.push(row);
137 }
138 }
139
140 pub fn apply_deletes(&self, deletes: Vec<(TableName, RowId, TxId)>) {
141 let mut tables = self.tables.write();
142 let mut indexes = self.indexes.write();
143 for (table_name, row_id, deleted_tx) in deletes {
144 let row_values: Option<HashMap<String, Value>> = tables
145 .get(&table_name)
146 .and_then(|rows| rows.iter().find(|r| r.row_id == row_id))
147 .map(|r| r.values.clone());
148 if let Some(values) = row_values {
149 for ((t, _), idx) in indexes.iter_mut() {
150 if t != &table_name {
151 continue;
152 }
153 let key = index_key_for_row(&idx.columns, &values);
154 idx.tombstone_posting(&key, row_id, deleted_tx);
155 }
156 }
157 if let Some(rows) = tables.get_mut(&table_name) {
158 for row in rows.iter_mut() {
159 if row.row_id == row_id && row.deleted_tx.is_none() {
160 row.deleted_tx = Some(deleted_tx);
161 }
162 }
163 }
164 }
165 }
166
167 pub fn create_table(&self, name: &str, meta: TableMeta) {
168 self.tables.write().entry(name.to_string()).or_default();
169 self.table_meta.write().insert(name.to_string(), meta);
170 }
171
172 pub fn insert_loaded_row(&self, name: &str, row: VersionedRow) {
173 {
177 let mut indexes = self.indexes.write();
178 let entry = IndexEntry {
179 row_id: row.row_id,
180 created_tx: row.created_tx,
181 deleted_tx: row.deleted_tx,
182 };
183 for ((t, _), idx) in indexes.iter_mut() {
184 if t != name {
185 continue;
186 }
187 let key = index_key_for_row(&idx.columns, &row.values);
188 idx.insert_posting(key, entry.clone());
189 }
190 }
191 let mut tables = self.tables.write();
192 let rows = tables.entry(name.to_string()).or_default();
193 self.row_positions
194 .write()
195 .insert((name.to_string(), row.row_id), rows.len());
196 rows.push(row);
197 }
198
199 pub fn row_by_id(
200 &self,
201 table: &str,
202 row_id: RowId,
203 snapshot: contextdb_core::SnapshotId,
204 ) -> Option<VersionedRow> {
205 let tables = self.tables.read();
206 let positions = self.row_positions.read();
207 let position = *positions.get(&(table.to_string(), row_id))?;
208 drop(positions);
209 tables
210 .get(table)
211 .and_then(|rows| rows.get(position))
212 .filter(|row| row.row_id == row_id && row.visible_at(snapshot))
213 .cloned()
214 }
215
216 pub fn max_row_id(&self) -> RowId {
217 self.tables
218 .read()
219 .values()
220 .flat_map(|rows| rows.iter().map(|row| row.row_id))
221 .max()
222 .unwrap_or(RowId(0))
223 }
224
225 pub fn set_next_row_id(&self, next_row_id: RowId) {
226 self.next_row_id.store(next_row_id.0, Ordering::SeqCst);
227 }
228
229 pub fn drop_table(&self, name: &str) {
230 self.tables.write().remove(name);
231 self.row_positions
232 .write()
233 .retain(|(table, _), _| table != name);
234 self.table_meta.write().remove(name);
235 let mut indexes = self.indexes.write();
237 indexes.retain(|(table, _), _| table != name);
238 }
239
240 pub fn create_index_storage(
244 &self,
245 table: &str,
246 name: &str,
247 columns: Vec<(String, SortDirection)>,
248 ) {
249 self.indexes.write().insert(
250 (table.to_string(), name.to_string()),
251 IndexStorage::new(columns),
252 );
253 }
254
255 pub fn drop_index_storage(&self, table: &str, name: &str) {
257 self.indexes
258 .write()
259 .remove(&(table.to_string(), name.to_string()));
260 }
261
262 pub fn rebuild_index(&self, table: &str, name: &str) {
267 let columns = {
268 let indexes = self.indexes.read();
269 match indexes.get(&(table.to_string(), name.to_string())) {
270 Some(idx) => idx.columns.clone(),
271 None => return,
272 }
273 };
274 let mut rebuilt = IndexStorage::new(columns.clone());
275 let tables = self.tables.read();
276 if let Some(rows) = tables.get(table) {
277 let mut sorted: Vec<&VersionedRow> = rows.iter().collect();
278 sorted.sort_by_key(|r| r.row_id);
279 for row in sorted {
280 let key = index_key_for_row(&columns, &row.values);
281 rebuilt.insert_posting(
282 key,
283 IndexEntry {
284 row_id: row.row_id,
285 created_tx: row.created_tx,
286 deleted_tx: row.deleted_tx,
287 },
288 );
289 }
290 }
291 self.indexes
292 .write()
293 .insert((table.to_string(), name.to_string()), rebuilt);
294 }
295
296 pub fn introspect_indexes_total_entries(&self) -> u64 {
299 self.indexes
300 .read()
301 .values()
302 .map(|s| s.total_entries_including_tombstones())
303 .sum()
304 }
305
306 pub fn bump_index_write_lock_count(&self) {
309 self.index_write_lock_count.fetch_add(1, Ordering::SeqCst);
310 }
311
312 pub fn index_write_lock_count(&self) -> u64 {
313 self.index_write_lock_count.load(Ordering::SeqCst)
314 }
315
316 pub fn alter_table_add_column(
317 &self,
318 table: &str,
319 col: contextdb_core::ColumnDef,
320 ) -> Result<(), String> {
321 let mut meta = self.table_meta.write();
322 let m = meta
323 .get_mut(table)
324 .ok_or_else(|| format!("table '{}' not found", table))?;
325 if m.columns.iter().any(|c| c.name == col.name) {
326 return Err(format!(
327 "column '{}' already exists in table '{}'",
328 col.name, table
329 ));
330 }
331 m.columns.push(col);
332 Ok(())
333 }
334
335 pub fn alter_table_drop_column(&self, table: &str, column: &str) -> Result<(), String> {
336 {
337 let mut meta = self.table_meta.write();
338 let m = meta
339 .get_mut(table)
340 .ok_or_else(|| format!("table '{}' not found", table))?;
341 let pos = m
342 .columns
343 .iter()
344 .position(|c| c.name == column)
345 .ok_or_else(|| {
346 format!("column '{}' does not exist in table '{}'", column, table)
347 })?;
348 if m.columns[pos].primary_key {
349 return Err(format!("cannot drop primary key column '{}'", column));
350 }
351 m.columns.remove(pos);
352 }
353 {
354 let mut tables = self.tables.write();
355 if let Some(rows) = tables.get_mut(table) {
356 for row in rows.iter_mut() {
357 row.values.remove(column);
358 }
359 }
360 }
361 Ok(())
362 }
363
364 pub fn alter_table_rename_column(
365 &self,
366 table: &str,
367 from: &str,
368 to: &str,
369 ) -> Result<(), String> {
370 {
371 let mut meta = self.table_meta.write();
372 let m = meta
373 .get_mut(table)
374 .ok_or_else(|| format!("table '{}' not found", table))?;
375 if m.columns.iter().any(|c| c.name == to) {
376 return Err(format!(
377 "column '{}' already exists in table '{}'",
378 to, table
379 ));
380 }
381 let col = m
382 .columns
383 .iter_mut()
384 .find(|c| c.name == from)
385 .ok_or_else(|| format!("column '{}' does not exist in table '{}'", from, table))?;
386 if col.primary_key {
387 return Err(format!("cannot rename primary key column '{}'", from));
388 }
389 col.name = to.to_string();
390 }
391 {
392 let mut tables = self.tables.write();
393 if let Some(rows) = tables.get_mut(table) {
394 for row in rows.iter_mut() {
395 if let Some(val) = row.values.remove(from) {
396 row.values.insert(to.to_string(), val);
397 }
398 }
399 }
400 }
401 Ok(())
402 }
403
404 pub fn is_immutable(&self, table: &str) -> bool {
405 self.table_meta
406 .read()
407 .get(table)
408 .is_some_and(|m| m.immutable)
409 }
410
411 pub fn validate_state_transition(
412 &self,
413 table: &str,
414 column: &str,
415 from: &str,
416 to: &str,
417 ) -> bool {
418 self.table_meta
419 .read()
420 .get(table)
421 .and_then(|m| m.state_machine.as_ref())
422 .filter(|sm| sm.column == column)
423 .is_none_or(|sm| {
424 sm.transitions
425 .get(from)
426 .is_some_and(|targets| targets.iter().any(|t| t == to))
427 })
428 }
429
430 pub fn table_names(&self) -> Vec<String> {
431 let mut names: Vec<_> = self.tables.read().keys().cloned().collect();
432 names.sort();
433 names
434 }
435
436 pub fn table_meta(&self, name: &str) -> Option<TableMeta> {
437 self.table_meta.read().get(name).cloned()
438 }
439}
440
441pub fn index_key_for_row(
444 columns: &[(String, SortDirection)],
445 values: &HashMap<String, Value>,
446) -> IndexKey {
447 columns
448 .iter()
449 .map(|(col, dir)| {
450 let v = values.get(col).cloned().unwrap_or(Value::Null);
451 match dir {
452 SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(v)),
453 SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(v)),
454 }
455 })
456 .collect()
457}
458
459pub fn index_key_from_values(columns: &[(String, SortDirection)], values: &[Value]) -> IndexKey {
461 columns
462 .iter()
463 .zip(values.iter())
464 .map(|((_, dir), v)| match dir {
465 SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(v.clone())),
466 SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(v.clone())),
467 })
468 .collect()
469}