1use contextdb_core::{
2 DirectedValue, IndexKey, RowId, SortDirection, TableMeta, TableName, TotalOrdAsc, TotalOrdDesc,
3 TxId, Value, VersionedRow,
4};
5use parking_lot::RwLock;
6use std::collections::{BTreeMap, HashMap};
7use std::sync::atomic::{AtomicU64, Ordering};
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct IndexEntry {
11 pub row_id: RowId,
12 pub created_tx: TxId,
13 pub deleted_tx: Option<TxId>,
14}
15
16impl IndexEntry {
17 pub fn visible_at(&self, snapshot: contextdb_core::SnapshotId) -> bool {
18 self.created_tx.0 <= snapshot.0 && self.deleted_tx.is_none_or(|tx| tx.0 > snapshot.0)
19 }
20}
21
22#[derive(Debug, Default)]
25pub struct IndexStorage {
26 pub columns: Vec<(String, SortDirection)>,
27 pub tree: BTreeMap<IndexKey, Vec<IndexEntry>>,
30}
31
32impl IndexStorage {
33 pub fn new(columns: Vec<(String, SortDirection)>) -> Self {
34 Self {
35 columns,
36 tree: BTreeMap::new(),
37 }
38 }
39
40 pub fn total_entries(&self) -> u64 {
42 self.tree
43 .values()
44 .map(|v| v.iter().filter(|e| e.deleted_tx.is_none()).count() as u64)
45 .sum()
46 }
47
48 pub fn total_entries_including_tombstones(&self) -> u64 {
50 self.tree.values().map(|v| v.len() as u64).sum()
51 }
52
53 pub fn insert_posting(&mut self, key: IndexKey, entry: IndexEntry) {
55 let vec = self.tree.entry(key).or_default();
56 let pos = vec
57 .binary_search_by(|e| e.row_id.cmp(&entry.row_id))
58 .unwrap_or_else(|i| i);
59 vec.insert(pos, entry);
60 }
61
62 pub fn tombstone_posting(&mut self, key: &IndexKey, row_id: RowId, deleted_tx: TxId) {
64 if let Some(vec) = self.tree.get_mut(key) {
65 for entry in vec.iter_mut() {
66 if entry.row_id == row_id && entry.deleted_tx.is_none() {
67 entry.deleted_tx = Some(deleted_tx);
68 return;
69 }
70 }
71 }
72 }
73}
74
75pub struct RelationalStore {
76 pub tables: RwLock<HashMap<TableName, Vec<VersionedRow>>>,
77 pub table_meta: RwLock<HashMap<TableName, TableMeta>>,
78 pub indexes: RwLock<HashMap<(TableName, String), IndexStorage>>,
82 pub index_write_lock_count: AtomicU64,
86 next_row_id: AtomicU64,
87}
88
89impl Default for RelationalStore {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95impl RelationalStore {
96 pub fn new() -> Self {
97 Self {
98 tables: RwLock::new(HashMap::new()),
99 table_meta: RwLock::new(HashMap::new()),
100 indexes: RwLock::new(HashMap::new()),
101 index_write_lock_count: AtomicU64::new(0),
102 next_row_id: AtomicU64::new(1),
103 }
104 }
105
106 pub fn new_row_id(&self) -> RowId {
107 RowId(self.next_row_id.fetch_add(1, Ordering::SeqCst))
108 }
109
110 pub fn apply_inserts(&self, inserts: Vec<(TableName, VersionedRow)>) {
115 let mut tables = self.tables.write();
116 let mut indexes = self.indexes.write();
117 for (table_name, row) in inserts {
118 let entry = IndexEntry {
119 row_id: row.row_id,
120 created_tx: row.created_tx,
121 deleted_tx: row.deleted_tx,
122 };
123 for ((t, _), idx) in indexes.iter_mut() {
124 if t != &table_name {
125 continue;
126 }
127 let key = index_key_for_row(&idx.columns, &row.values);
128 idx.insert_posting(key, entry.clone());
129 }
130 tables.entry(table_name).or_default().push(row);
131 }
132 }
133
134 pub fn apply_deletes(&self, deletes: Vec<(TableName, RowId, TxId)>) {
135 let mut tables = self.tables.write();
136 let mut indexes = self.indexes.write();
137 for (table_name, row_id, deleted_tx) in deletes {
138 let row_values: Option<HashMap<String, Value>> = tables
139 .get(&table_name)
140 .and_then(|rows| rows.iter().find(|r| r.row_id == row_id))
141 .map(|r| r.values.clone());
142 if let Some(values) = row_values {
143 for ((t, _), idx) in indexes.iter_mut() {
144 if t != &table_name {
145 continue;
146 }
147 let key = index_key_for_row(&idx.columns, &values);
148 idx.tombstone_posting(&key, row_id, deleted_tx);
149 }
150 }
151 if let Some(rows) = tables.get_mut(&table_name) {
152 for row in rows.iter_mut() {
153 if row.row_id == row_id && row.deleted_tx.is_none() {
154 row.deleted_tx = Some(deleted_tx);
155 }
156 }
157 }
158 }
159 }
160
161 pub fn create_table(&self, name: &str, meta: TableMeta) {
162 self.tables.write().entry(name.to_string()).or_default();
163 self.table_meta.write().insert(name.to_string(), meta);
164 }
165
166 pub fn insert_loaded_row(&self, name: &str, row: VersionedRow) {
167 {
171 let mut indexes = self.indexes.write();
172 let entry = IndexEntry {
173 row_id: row.row_id,
174 created_tx: row.created_tx,
175 deleted_tx: row.deleted_tx,
176 };
177 for ((t, _), idx) in indexes.iter_mut() {
178 if t != name {
179 continue;
180 }
181 let key = index_key_for_row(&idx.columns, &row.values);
182 idx.insert_posting(key, entry.clone());
183 }
184 }
185 self.tables
186 .write()
187 .entry(name.to_string())
188 .or_default()
189 .push(row);
190 }
191
192 pub fn max_row_id(&self) -> RowId {
193 self.tables
194 .read()
195 .values()
196 .flat_map(|rows| rows.iter().map(|row| row.row_id))
197 .max()
198 .unwrap_or(RowId(0))
199 }
200
201 pub fn set_next_row_id(&self, next_row_id: RowId) {
202 self.next_row_id.store(next_row_id.0, Ordering::SeqCst);
203 }
204
205 pub fn drop_table(&self, name: &str) {
206 self.tables.write().remove(name);
207 self.table_meta.write().remove(name);
208 let mut indexes = self.indexes.write();
210 indexes.retain(|(table, _), _| table != name);
211 }
212
213 pub fn create_index_storage(
217 &self,
218 table: &str,
219 name: &str,
220 columns: Vec<(String, SortDirection)>,
221 ) {
222 self.indexes.write().insert(
223 (table.to_string(), name.to_string()),
224 IndexStorage::new(columns),
225 );
226 }
227
228 pub fn drop_index_storage(&self, table: &str, name: &str) {
230 self.indexes
231 .write()
232 .remove(&(table.to_string(), name.to_string()));
233 }
234
235 pub fn rebuild_index(&self, table: &str, name: &str) {
240 let columns = {
241 let indexes = self.indexes.read();
242 match indexes.get(&(table.to_string(), name.to_string())) {
243 Some(idx) => idx.columns.clone(),
244 None => return,
245 }
246 };
247 let mut rebuilt = IndexStorage::new(columns.clone());
248 let tables = self.tables.read();
249 if let Some(rows) = tables.get(table) {
250 let mut sorted: Vec<&VersionedRow> = rows.iter().collect();
251 sorted.sort_by_key(|r| r.row_id);
252 for row in sorted {
253 let key = index_key_for_row(&columns, &row.values);
254 rebuilt.insert_posting(
255 key,
256 IndexEntry {
257 row_id: row.row_id,
258 created_tx: row.created_tx,
259 deleted_tx: row.deleted_tx,
260 },
261 );
262 }
263 }
264 self.indexes
265 .write()
266 .insert((table.to_string(), name.to_string()), rebuilt);
267 }
268
269 pub fn introspect_indexes_total_entries(&self) -> u64 {
272 self.indexes
273 .read()
274 .values()
275 .map(|s| s.total_entries_including_tombstones())
276 .sum()
277 }
278
279 pub fn bump_index_write_lock_count(&self) {
282 self.index_write_lock_count.fetch_add(1, Ordering::SeqCst);
283 }
284
285 pub fn index_write_lock_count(&self) -> u64 {
286 self.index_write_lock_count.load(Ordering::SeqCst)
287 }
288
289 pub fn alter_table_add_column(
290 &self,
291 table: &str,
292 col: contextdb_core::ColumnDef,
293 ) -> Result<(), String> {
294 let mut meta = self.table_meta.write();
295 let m = meta
296 .get_mut(table)
297 .ok_or_else(|| format!("table '{}' not found", table))?;
298 if m.columns.iter().any(|c| c.name == col.name) {
299 return Err(format!(
300 "column '{}' already exists in table '{}'",
301 col.name, table
302 ));
303 }
304 m.columns.push(col);
305 Ok(())
306 }
307
308 pub fn alter_table_drop_column(&self, table: &str, column: &str) -> Result<(), String> {
309 {
310 let mut meta = self.table_meta.write();
311 let m = meta
312 .get_mut(table)
313 .ok_or_else(|| format!("table '{}' not found", table))?;
314 let pos = m
315 .columns
316 .iter()
317 .position(|c| c.name == column)
318 .ok_or_else(|| {
319 format!("column '{}' does not exist in table '{}'", column, table)
320 })?;
321 if m.columns[pos].primary_key {
322 return Err(format!("cannot drop primary key column '{}'", column));
323 }
324 m.columns.remove(pos);
325 }
326 {
327 let mut tables = self.tables.write();
328 if let Some(rows) = tables.get_mut(table) {
329 for row in rows.iter_mut() {
330 row.values.remove(column);
331 }
332 }
333 }
334 Ok(())
335 }
336
337 pub fn alter_table_rename_column(
338 &self,
339 table: &str,
340 from: &str,
341 to: &str,
342 ) -> Result<(), String> {
343 {
344 let mut meta = self.table_meta.write();
345 let m = meta
346 .get_mut(table)
347 .ok_or_else(|| format!("table '{}' not found", table))?;
348 if m.columns.iter().any(|c| c.name == to) {
349 return Err(format!(
350 "column '{}' already exists in table '{}'",
351 to, table
352 ));
353 }
354 let col = m
355 .columns
356 .iter_mut()
357 .find(|c| c.name == from)
358 .ok_or_else(|| format!("column '{}' does not exist in table '{}'", from, table))?;
359 if col.primary_key {
360 return Err(format!("cannot rename primary key column '{}'", from));
361 }
362 col.name = to.to_string();
363 }
364 {
365 let mut tables = self.tables.write();
366 if let Some(rows) = tables.get_mut(table) {
367 for row in rows.iter_mut() {
368 if let Some(val) = row.values.remove(from) {
369 row.values.insert(to.to_string(), val);
370 }
371 }
372 }
373 }
374 Ok(())
375 }
376
377 pub fn is_immutable(&self, table: &str) -> bool {
378 self.table_meta
379 .read()
380 .get(table)
381 .is_some_and(|m| m.immutable)
382 }
383
384 pub fn validate_state_transition(
385 &self,
386 table: &str,
387 column: &str,
388 from: &str,
389 to: &str,
390 ) -> bool {
391 self.table_meta
392 .read()
393 .get(table)
394 .and_then(|m| m.state_machine.as_ref())
395 .filter(|sm| sm.column == column)
396 .is_none_or(|sm| {
397 sm.transitions
398 .get(from)
399 .is_some_and(|targets| targets.iter().any(|t| t == to))
400 })
401 }
402
403 pub fn table_names(&self) -> Vec<String> {
404 let mut names: Vec<_> = self.tables.read().keys().cloned().collect();
405 names.sort();
406 names
407 }
408
409 pub fn table_meta(&self, name: &str) -> Option<TableMeta> {
410 self.table_meta.read().get(name).cloned()
411 }
412}
413
414pub fn index_key_for_row(
417 columns: &[(String, SortDirection)],
418 values: &HashMap<String, Value>,
419) -> IndexKey {
420 columns
421 .iter()
422 .map(|(col, dir)| {
423 let v = values.get(col).cloned().unwrap_or(Value::Null);
424 match dir {
425 SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(v)),
426 SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(v)),
427 }
428 })
429 .collect()
430}
431
432pub fn index_key_from_values(columns: &[(String, SortDirection)], values: &[Value]) -> IndexKey {
434 columns
435 .iter()
436 .zip(values.iter())
437 .map(|((_, dir), v)| match dir {
438 SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(v.clone())),
439 SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(v.clone())),
440 })
441 .collect()
442}