use contextdb_core::{
DirectedValue, IndexKey, RowId, SortDirection, TableMeta, TableName, TotalOrdAsc, TotalOrdDesc,
TxId, Value, VersionedRow,
};
use parking_lot::RwLock;
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicU64, Ordering};
/// One posting in a secondary index: a pointer to a row version stored under some index key.
///
/// The transaction ids are copied from the row version the posting was built from, so a
/// posting can be checked for snapshot visibility without consulting the row itself.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IndexEntry {
    /// Row this posting refers to.
    pub row_id: RowId,
    /// Transaction that created the referenced row version.
    pub created_tx: TxId,
    /// Transaction that deleted the referenced row version; `None` while it is live.
    pub deleted_tx: Option<TxId>,
}
impl IndexEntry {
    /// Reports whether this posting is visible to a reader at `snapshot`.
    ///
    /// A posting is visible when it was created at or before the snapshot and has either not
    /// been deleted or was deleted strictly after the snapshot.
    pub fn visible_at(&self, snapshot: contextdb_core::SnapshotId) -> bool {
        if self.created_tx.0 > snapshot.0 {
            // Created after the snapshot was taken.
            return false;
        }
        match self.deleted_tx {
            None => true,
            Some(deleter) => deleter.0 > snapshot.0,
        }
    }
}
/// In-memory storage for a single secondary index.
#[derive(Default, Debug)]
pub struct IndexStorage {
    /// Indexed column names with their sort direction, in key order.
    pub columns: Vec<(String, SortDirection)>,
    /// Ordered map from composite index key to the postings stored under that key.
    pub tree: BTreeMap<IndexKey, Vec<IndexEntry>>,
}
impl IndexStorage {
    /// Creates an empty index over `columns` (column name and sort direction, in key order).
    pub fn new(columns: Vec<(String, SortDirection)>) -> Self {
        Self {
            columns,
            tree: BTreeMap::new(),
        }
    }

    /// Number of postings that have not been tombstoned (`deleted_tx == None`).
    ///
    /// This ignores snapshots: a posting counts as long as no delete has been recorded on it.
    pub fn total_entries(&self) -> u64 {
        self.tree
            .values()
            .map(|v| v.iter().filter(|e| e.deleted_tx.is_none()).count() as u64)
            .sum()
    }

    /// Number of postings stored, including tombstoned ones.
    pub fn total_entries_including_tombstones(&self) -> u64 {
        self.tree.values().map(|v| v.len() as u64).sum()
    }

    /// Adds `entry` to the posting list for `key`, keeping that list sorted by `row_id`.
    ///
    /// If postings with the same `row_id` already exist, the new entry is placed among them at
    /// the position returned by the binary search.
    pub fn insert_posting(&mut self, key: IndexKey, entry: IndexEntry) {
        let vec = self.tree.entry(key).or_default();
        let pos = vec
            .binary_search_by(|e| e.row_id.cmp(&entry.row_id))
            .unwrap_or_else(|i| i);
        vec.insert(pos, entry);
    }

    /// Marks the first live posting for `row_id` under `key` as deleted at `deleted_tx`.
    ///
    /// Does nothing if `key` has no postings, or if none of its postings for `row_id` is live.
    pub fn tombstone_posting(&mut self, key: &IndexKey, row_id: RowId, deleted_tx: TxId) {
        let Some(postings) = self.tree.get_mut(key) else {
            return;
        };
        // Posting lists are kept sorted by row_id (see `insert_posting`), so jump straight to
        // the first posting for this row instead of scanning the whole list. This matters for
        // low-cardinality indexes, whose posting lists can grow very large.
        let start = postings.partition_point(|e| e.row_id < row_id);
        if let Some(entry) = postings[start..]
            .iter_mut()
            .take_while(|e| e.row_id == row_id)
            .find(|e| e.deleted_tx.is_none())
        {
            entry.deleted_tx = Some(deleted_tx);
        }
    }
}
/// In-memory relational storage: versioned rows per table, table metadata, and the secondary
/// indexes built over those rows.
pub struct RelationalStore {
    /// All stored row versions for each table, in the order they were appended.
    pub tables: RwLock<HashMap<TableName, Vec<VersionedRow>>>,
    /// Metadata (columns, flags, optional state machine) for each table.
    pub table_meta: RwLock<HashMap<TableName, TableMeta>>,
    /// Index storage keyed by `(table name, index name)`.
    pub indexes: RwLock<HashMap<(TableName, String), IndexStorage>>,
    /// Counter read via `index_write_lock_count` and advanced via `bump_index_write_lock_count`.
    pub index_write_lock_count: AtomicU64,
    /// Next value handed out by `new_row_id`.
    next_row_id: AtomicU64,
}
impl Default for RelationalStore {
fn default() -> Self {
Self::new()
}
}
impl RelationalStore {
pub fn new() -> Self {
Self {
tables: RwLock::new(HashMap::new()),
table_meta: RwLock::new(HashMap::new()),
indexes: RwLock::new(HashMap::new()),
index_write_lock_count: AtomicU64::new(0),
next_row_id: AtomicU64::new(1),
}
}
pub fn new_row_id(&self) -> RowId {
RowId(self.next_row_id.fetch_add(1, Ordering::SeqCst))
}
pub fn apply_inserts(&self, inserts: Vec<(TableName, VersionedRow)>) {
let mut tables = self.tables.write();
let mut indexes = self.indexes.write();
for (table_name, row) in inserts {
let entry = IndexEntry {
row_id: row.row_id,
created_tx: row.created_tx,
deleted_tx: row.deleted_tx,
};
for ((t, _), idx) in indexes.iter_mut() {
if t != &table_name {
continue;
}
let key = index_key_for_row(&idx.columns, &row.values);
idx.insert_posting(key, entry.clone());
}
tables.entry(table_name).or_default().push(row);
}
}
pub fn apply_deletes(&self, deletes: Vec<(TableName, RowId, TxId)>) {
let mut tables = self.tables.write();
let mut indexes = self.indexes.write();
for (table_name, row_id, deleted_tx) in deletes {
let row_values: Option<HashMap<String, Value>> = tables
.get(&table_name)
.and_then(|rows| rows.iter().find(|r| r.row_id == row_id))
.map(|r| r.values.clone());
if let Some(values) = row_values {
for ((t, _), idx) in indexes.iter_mut() {
if t != &table_name {
continue;
}
let key = index_key_for_row(&idx.columns, &values);
idx.tombstone_posting(&key, row_id, deleted_tx);
}
}
if let Some(rows) = tables.get_mut(&table_name) {
for row in rows.iter_mut() {
if row.row_id == row_id && row.deleted_tx.is_none() {
row.deleted_tx = Some(deleted_tx);
}
}
}
}
}
pub fn create_table(&self, name: &str, meta: TableMeta) {
self.tables.write().entry(name.to_string()).or_default();
self.table_meta.write().insert(name.to_string(), meta);
}
pub fn insert_loaded_row(&self, name: &str, row: VersionedRow) {
{
let mut indexes = self.indexes.write();
let entry = IndexEntry {
row_id: row.row_id,
created_tx: row.created_tx,
deleted_tx: row.deleted_tx,
};
for ((t, _), idx) in indexes.iter_mut() {
if t != name {
continue;
}
let key = index_key_for_row(&idx.columns, &row.values);
idx.insert_posting(key, entry.clone());
}
}
self.tables
.write()
.entry(name.to_string())
.or_default()
.push(row);
}
pub fn max_row_id(&self) -> RowId {
self.tables
.read()
.values()
.flat_map(|rows| rows.iter().map(|row| row.row_id))
.max()
.unwrap_or(RowId(0))
}
pub fn set_next_row_id(&self, next_row_id: RowId) {
self.next_row_id.store(next_row_id.0, Ordering::SeqCst);
}
pub fn drop_table(&self, name: &str) {
self.tables.write().remove(name);
self.table_meta.write().remove(name);
let mut indexes = self.indexes.write();
indexes.retain(|(table, _), _| table != name);
}
pub fn create_index_storage(
&self,
table: &str,
name: &str,
columns: Vec<(String, SortDirection)>,
) {
self.indexes.write().insert(
(table.to_string(), name.to_string()),
IndexStorage::new(columns),
);
}
pub fn drop_index_storage(&self, table: &str, name: &str) {
self.indexes
.write()
.remove(&(table.to_string(), name.to_string()));
}
pub fn rebuild_index(&self, table: &str, name: &str) {
let columns = {
let indexes = self.indexes.read();
match indexes.get(&(table.to_string(), name.to_string())) {
Some(idx) => idx.columns.clone(),
None => return,
}
};
let mut rebuilt = IndexStorage::new(columns.clone());
let tables = self.tables.read();
if let Some(rows) = tables.get(table) {
let mut sorted: Vec<&VersionedRow> = rows.iter().collect();
sorted.sort_by_key(|r| r.row_id);
for row in sorted {
let key = index_key_for_row(&columns, &row.values);
rebuilt.insert_posting(
key,
IndexEntry {
row_id: row.row_id,
created_tx: row.created_tx,
deleted_tx: row.deleted_tx,
},
);
}
}
self.indexes
.write()
.insert((table.to_string(), name.to_string()), rebuilt);
}
pub fn introspect_indexes_total_entries(&self) -> u64 {
self.indexes
.read()
.values()
.map(|s| s.total_entries_including_tombstones())
.sum()
}
pub fn bump_index_write_lock_count(&self) {
self.index_write_lock_count.fetch_add(1, Ordering::SeqCst);
}
pub fn index_write_lock_count(&self) -> u64 {
self.index_write_lock_count.load(Ordering::SeqCst)
}
pub fn alter_table_add_column(
&self,
table: &str,
col: contextdb_core::ColumnDef,
) -> Result<(), String> {
let mut meta = self.table_meta.write();
let m = meta
.get_mut(table)
.ok_or_else(|| format!("table '{}' not found", table))?;
if m.columns.iter().any(|c| c.name == col.name) {
return Err(format!(
"column '{}' already exists in table '{}'",
col.name, table
));
}
m.columns.push(col);
Ok(())
}
pub fn alter_table_drop_column(&self, table: &str, column: &str) -> Result<(), String> {
{
let mut meta = self.table_meta.write();
let m = meta
.get_mut(table)
.ok_or_else(|| format!("table '{}' not found", table))?;
let pos = m
.columns
.iter()
.position(|c| c.name == column)
.ok_or_else(|| {
format!("column '{}' does not exist in table '{}'", column, table)
})?;
if m.columns[pos].primary_key {
return Err(format!("cannot drop primary key column '{}'", column));
}
m.columns.remove(pos);
}
{
let mut tables = self.tables.write();
if let Some(rows) = tables.get_mut(table) {
for row in rows.iter_mut() {
row.values.remove(column);
}
}
}
Ok(())
}
pub fn alter_table_rename_column(
&self,
table: &str,
from: &str,
to: &str,
) -> Result<(), String> {
{
let mut meta = self.table_meta.write();
let m = meta
.get_mut(table)
.ok_or_else(|| format!("table '{}' not found", table))?;
if m.columns.iter().any(|c| c.name == to) {
return Err(format!(
"column '{}' already exists in table '{}'",
to, table
));
}
let col = m
.columns
.iter_mut()
.find(|c| c.name == from)
.ok_or_else(|| format!("column '{}' does not exist in table '{}'", from, table))?;
if col.primary_key {
return Err(format!("cannot rename primary key column '{}'", from));
}
col.name = to.to_string();
}
{
let mut tables = self.tables.write();
if let Some(rows) = tables.get_mut(table) {
for row in rows.iter_mut() {
if let Some(val) = row.values.remove(from) {
row.values.insert(to.to_string(), val);
}
}
}
}
Ok(())
}
pub fn is_immutable(&self, table: &str) -> bool {
self.table_meta
.read()
.get(table)
.is_some_and(|m| m.immutable)
}
pub fn validate_state_transition(
&self,
table: &str,
column: &str,
from: &str,
to: &str,
) -> bool {
self.table_meta
.read()
.get(table)
.and_then(|m| m.state_machine.as_ref())
.filter(|sm| sm.column == column)
.is_none_or(|sm| {
sm.transitions
.get(from)
.is_some_and(|targets| targets.iter().any(|t| t == to))
})
}
pub fn table_names(&self) -> Vec<String> {
let mut names: Vec<_> = self.tables.read().keys().cloned().collect();
names.sort();
names
}
pub fn table_meta(&self, name: &str) -> Option<TableMeta> {
self.table_meta.read().get(name).cloned()
}
}
/// Builds the composite index key for a row.
///
/// Each indexed column is looked up by name in `values`; a missing column contributes
/// `Value::Null`. Each value is wrapped in the `Asc`/`Desc` variant matching its column's sort
/// direction, in column order.
pub fn index_key_for_row(
    columns: &[(String, SortDirection)],
    values: &HashMap<String, Value>,
) -> IndexKey {
    let directed_value = |(name, direction): &(String, SortDirection)| {
        let value = match values.get(name) {
            Some(found) => found.clone(),
            None => Value::Null,
        };
        match direction {
            SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(value)),
            SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(value)),
        }
    };
    columns.iter().map(directed_value).collect()
}
/// Builds an index key from positional `values`, pairing `values[i]` with `columns[i]`.
///
/// Column names are ignored; only each column's sort direction is used. If the two slices
/// differ in length, the key covers only the shorter of the two.
pub fn index_key_from_values(columns: &[(String, SortDirection)], values: &[Value]) -> IndexKey {
    let directions = columns.iter().map(|(_, direction)| direction);
    directions
        .zip(values)
        .map(|(direction, value)| {
            let value = value.clone();
            match direction {
                SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(value)),
                SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(value)),
            }
        })
        .collect()
}