contextdb-relational 0.3.2

Relational executor for contextdb — scan, insert, upsert, delete
Documentation
use contextdb_core::{RowId, TableMeta, TableName, VersionedRow};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

pub struct RelationalStore {
    pub tables: RwLock<HashMap<TableName, Vec<VersionedRow>>>,
    pub table_meta: RwLock<HashMap<TableName, TableMeta>>,
    next_row_id: AtomicU64,
}

impl Default for RelationalStore {
    fn default() -> Self {
        Self::new()
    }
}

impl RelationalStore {
    pub fn new() -> Self {
        Self {
            tables: RwLock::new(HashMap::new()),
            table_meta: RwLock::new(HashMap::new()),
            next_row_id: AtomicU64::new(1),
        }
    }

    pub fn new_row_id(&self) -> RowId {
        self.next_row_id.fetch_add(1, Ordering::SeqCst)
    }

    pub fn apply_inserts(&self, inserts: Vec<(TableName, VersionedRow)>) {
        let mut tables = self.tables.write();
        for (table_name, row) in inserts {
            tables.entry(table_name).or_default().push(row);
        }
    }

    pub fn apply_deletes(&self, deletes: Vec<(TableName, RowId, u64)>) {
        let mut tables = self.tables.write();
        for (table_name, row_id, deleted_tx) in deletes {
            if let Some(rows) = tables.get_mut(&table_name) {
                for row in rows.iter_mut() {
                    if row.row_id == row_id && row.deleted_tx.is_none() {
                        row.deleted_tx = Some(deleted_tx);
                    }
                }
            }
        }
    }

    pub fn create_table(&self, name: &str, meta: TableMeta) {
        self.tables.write().entry(name.to_string()).or_default();
        self.table_meta.write().insert(name.to_string(), meta);
    }

    pub fn insert_loaded_row(&self, name: &str, row: VersionedRow) {
        self.tables
            .write()
            .entry(name.to_string())
            .or_default()
            .push(row);
    }

    pub fn max_row_id(&self) -> RowId {
        self.tables
            .read()
            .values()
            .flat_map(|rows| rows.iter().map(|row| row.row_id))
            .max()
            .unwrap_or(0)
    }

    pub fn set_next_row_id(&self, next_row_id: RowId) {
        self.next_row_id.store(next_row_id, Ordering::SeqCst);
    }

    pub fn drop_table(&self, name: &str) {
        self.tables.write().remove(name);
        self.table_meta.write().remove(name);
    }

    pub fn alter_table_add_column(
        &self,
        table: &str,
        col: contextdb_core::ColumnDef,
    ) -> Result<(), String> {
        let mut meta = self.table_meta.write();
        let m = meta
            .get_mut(table)
            .ok_or_else(|| format!("table '{}' not found", table))?;
        if m.columns.iter().any(|c| c.name == col.name) {
            return Err(format!(
                "column '{}' already exists in table '{}'",
                col.name, table
            ));
        }
        m.columns.push(col);
        Ok(())
    }

    pub fn alter_table_drop_column(&self, table: &str, column: &str) -> Result<(), String> {
        {
            let mut meta = self.table_meta.write();
            let m = meta
                .get_mut(table)
                .ok_or_else(|| format!("table '{}' not found", table))?;
            let pos = m
                .columns
                .iter()
                .position(|c| c.name == column)
                .ok_or_else(|| {
                    format!("column '{}' does not exist in table '{}'", column, table)
                })?;
            if m.columns[pos].primary_key {
                return Err(format!("cannot drop primary key column '{}'", column));
            }
            m.columns.remove(pos);
        }
        {
            let mut tables = self.tables.write();
            if let Some(rows) = tables.get_mut(table) {
                for row in rows.iter_mut() {
                    row.values.remove(column);
                }
            }
        }
        Ok(())
    }

    pub fn alter_table_rename_column(
        &self,
        table: &str,
        from: &str,
        to: &str,
    ) -> Result<(), String> {
        {
            let mut meta = self.table_meta.write();
            let m = meta
                .get_mut(table)
                .ok_or_else(|| format!("table '{}' not found", table))?;
            if m.columns.iter().any(|c| c.name == to) {
                return Err(format!(
                    "column '{}' already exists in table '{}'",
                    to, table
                ));
            }
            let col = m
                .columns
                .iter_mut()
                .find(|c| c.name == from)
                .ok_or_else(|| format!("column '{}' does not exist in table '{}'", from, table))?;
            if col.primary_key {
                return Err(format!("cannot rename primary key column '{}'", from));
            }
            col.name = to.to_string();
        }
        {
            let mut tables = self.tables.write();
            if let Some(rows) = tables.get_mut(table) {
                for row in rows.iter_mut() {
                    if let Some(val) = row.values.remove(from) {
                        row.values.insert(to.to_string(), val);
                    }
                }
            }
        }
        Ok(())
    }

    pub fn is_immutable(&self, table: &str) -> bool {
        self.table_meta
            .read()
            .get(table)
            .is_some_and(|m| m.immutable)
    }

    pub fn validate_state_transition(
        &self,
        table: &str,
        column: &str,
        from: &str,
        to: &str,
    ) -> bool {
        self.table_meta
            .read()
            .get(table)
            .and_then(|m| m.state_machine.as_ref())
            .filter(|sm| sm.column == column)
            .is_none_or(|sm| {
                sm.transitions
                    .get(from)
                    .is_some_and(|targets| targets.iter().any(|t| t == to))
            })
    }

    pub fn table_names(&self) -> Vec<String> {
        let mut names: Vec<_> = self.tables.read().keys().cloned().collect();
        names.sort();
        names
    }

    pub fn table_meta(&self, name: &str) -> Option<TableMeta> {
        self.table_meta.read().get(name).cloned()
    }
}