Skip to main content

contextdb_relational/
store.rs

1use contextdb_core::{RowId, TableMeta, TableName, VersionedRow};
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6pub struct RelationalStore {
7    pub tables: RwLock<HashMap<TableName, Vec<VersionedRow>>>,
8    pub table_meta: RwLock<HashMap<TableName, TableMeta>>,
9    next_row_id: AtomicU64,
10}
11
12impl Default for RelationalStore {
13    fn default() -> Self {
14        Self::new()
15    }
16}
17
18impl RelationalStore {
19    pub fn new() -> Self {
20        Self {
21            tables: RwLock::new(HashMap::new()),
22            table_meta: RwLock::new(HashMap::new()),
23            next_row_id: AtomicU64::new(1),
24        }
25    }
26
27    pub fn new_row_id(&self) -> RowId {
28        self.next_row_id.fetch_add(1, Ordering::SeqCst)
29    }
30
31    pub fn apply_inserts(&self, inserts: Vec<(TableName, VersionedRow)>) {
32        let mut tables = self.tables.write();
33        for (table_name, row) in inserts {
34            tables.entry(table_name).or_default().push(row);
35        }
36    }
37
38    pub fn apply_deletes(&self, deletes: Vec<(TableName, RowId, u64)>) {
39        let mut tables = self.tables.write();
40        for (table_name, row_id, deleted_tx) in deletes {
41            if let Some(rows) = tables.get_mut(&table_name) {
42                for row in rows.iter_mut() {
43                    if row.row_id == row_id && row.deleted_tx.is_none() {
44                        row.deleted_tx = Some(deleted_tx);
45                    }
46                }
47            }
48        }
49    }
50
51    pub fn create_table(&self, name: &str, meta: TableMeta) {
52        self.tables.write().entry(name.to_string()).or_default();
53        self.table_meta.write().insert(name.to_string(), meta);
54    }
55
56    pub fn insert_loaded_row(&self, name: &str, row: VersionedRow) {
57        self.tables
58            .write()
59            .entry(name.to_string())
60            .or_default()
61            .push(row);
62    }
63
64    pub fn max_row_id(&self) -> RowId {
65        self.tables
66            .read()
67            .values()
68            .flat_map(|rows| rows.iter().map(|row| row.row_id))
69            .max()
70            .unwrap_or(0)
71    }
72
73    pub fn set_next_row_id(&self, next_row_id: RowId) {
74        self.next_row_id.store(next_row_id, Ordering::SeqCst);
75    }
76
77    pub fn drop_table(&self, name: &str) {
78        self.tables.write().remove(name);
79        self.table_meta.write().remove(name);
80    }
81
82    pub fn alter_table_add_column(
83        &self,
84        table: &str,
85        col: contextdb_core::ColumnDef,
86    ) -> Result<(), String> {
87        let mut meta = self.table_meta.write();
88        let m = meta
89            .get_mut(table)
90            .ok_or_else(|| format!("table '{}' not found", table))?;
91        if m.columns.iter().any(|c| c.name == col.name) {
92            return Err(format!(
93                "column '{}' already exists in table '{}'",
94                col.name, table
95            ));
96        }
97        m.columns.push(col);
98        Ok(())
99    }
100
101    pub fn alter_table_drop_column(&self, table: &str, column: &str) -> Result<(), String> {
102        {
103            let mut meta = self.table_meta.write();
104            let m = meta
105                .get_mut(table)
106                .ok_or_else(|| format!("table '{}' not found", table))?;
107            let pos = m
108                .columns
109                .iter()
110                .position(|c| c.name == column)
111                .ok_or_else(|| {
112                    format!("column '{}' does not exist in table '{}'", column, table)
113                })?;
114            if m.columns[pos].primary_key {
115                return Err(format!("cannot drop primary key column '{}'", column));
116            }
117            m.columns.remove(pos);
118        }
119        {
120            let mut tables = self.tables.write();
121            if let Some(rows) = tables.get_mut(table) {
122                for row in rows.iter_mut() {
123                    row.values.remove(column);
124                }
125            }
126        }
127        Ok(())
128    }
129
130    pub fn alter_table_rename_column(
131        &self,
132        table: &str,
133        from: &str,
134        to: &str,
135    ) -> Result<(), String> {
136        {
137            let mut meta = self.table_meta.write();
138            let m = meta
139                .get_mut(table)
140                .ok_or_else(|| format!("table '{}' not found", table))?;
141            if m.columns.iter().any(|c| c.name == to) {
142                return Err(format!(
143                    "column '{}' already exists in table '{}'",
144                    to, table
145                ));
146            }
147            let col = m
148                .columns
149                .iter_mut()
150                .find(|c| c.name == from)
151                .ok_or_else(|| format!("column '{}' does not exist in table '{}'", from, table))?;
152            if col.primary_key {
153                return Err(format!("cannot rename primary key column '{}'", from));
154            }
155            col.name = to.to_string();
156        }
157        {
158            let mut tables = self.tables.write();
159            if let Some(rows) = tables.get_mut(table) {
160                for row in rows.iter_mut() {
161                    if let Some(val) = row.values.remove(from) {
162                        row.values.insert(to.to_string(), val);
163                    }
164                }
165            }
166        }
167        Ok(())
168    }
169
170    pub fn is_immutable(&self, table: &str) -> bool {
171        self.table_meta
172            .read()
173            .get(table)
174            .is_some_and(|m| m.immutable)
175    }
176
177    pub fn validate_state_transition(
178        &self,
179        table: &str,
180        column: &str,
181        from: &str,
182        to: &str,
183    ) -> bool {
184        self.table_meta
185            .read()
186            .get(table)
187            .and_then(|m| m.state_machine.as_ref())
188            .filter(|sm| sm.column == column)
189            .is_none_or(|sm| {
190                sm.transitions
191                    .get(from)
192                    .is_some_and(|targets| targets.iter().any(|t| t == to))
193            })
194    }
195
196    pub fn table_names(&self) -> Vec<String> {
197        let mut names: Vec<_> = self.tables.read().keys().cloned().collect();
198        names.sort();
199        names
200    }
201
202    pub fn table_meta(&self, name: &str) -> Option<TableMeta> {
203        self.table_meta.read().get(name).cloned()
204    }
205}