contextdb_relational/
store.rs1use contextdb_core::{RowId, TableMeta, TableName, VersionedRow};
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6pub struct RelationalStore {
7 pub tables: RwLock<HashMap<TableName, Vec<VersionedRow>>>,
8 pub table_meta: RwLock<HashMap<TableName, TableMeta>>,
9 next_row_id: AtomicU64,
10}
11
12impl Default for RelationalStore {
13 fn default() -> Self {
14 Self::new()
15 }
16}
17
18impl RelationalStore {
19 pub fn new() -> Self {
20 Self {
21 tables: RwLock::new(HashMap::new()),
22 table_meta: RwLock::new(HashMap::new()),
23 next_row_id: AtomicU64::new(1),
24 }
25 }
26
27 pub fn new_row_id(&self) -> RowId {
28 self.next_row_id.fetch_add(1, Ordering::SeqCst)
29 }
30
31 pub fn apply_inserts(&self, inserts: Vec<(TableName, VersionedRow)>) {
32 let mut tables = self.tables.write();
33 for (table_name, row) in inserts {
34 tables.entry(table_name).or_default().push(row);
35 }
36 }
37
38 pub fn apply_deletes(&self, deletes: Vec<(TableName, RowId, u64)>) {
39 let mut tables = self.tables.write();
40 for (table_name, row_id, deleted_tx) in deletes {
41 if let Some(rows) = tables.get_mut(&table_name) {
42 for row in rows.iter_mut() {
43 if row.row_id == row_id && row.deleted_tx.is_none() {
44 row.deleted_tx = Some(deleted_tx);
45 }
46 }
47 }
48 }
49 }
50
51 pub fn create_table(&self, name: &str, meta: TableMeta) {
52 self.tables.write().entry(name.to_string()).or_default();
53 self.table_meta.write().insert(name.to_string(), meta);
54 }
55
56 pub fn insert_loaded_row(&self, name: &str, row: VersionedRow) {
57 self.tables
58 .write()
59 .entry(name.to_string())
60 .or_default()
61 .push(row);
62 }
63
64 pub fn max_row_id(&self) -> RowId {
65 self.tables
66 .read()
67 .values()
68 .flat_map(|rows| rows.iter().map(|row| row.row_id))
69 .max()
70 .unwrap_or(0)
71 }
72
73 pub fn set_next_row_id(&self, next_row_id: RowId) {
74 self.next_row_id.store(next_row_id, Ordering::SeqCst);
75 }
76
77 pub fn drop_table(&self, name: &str) {
78 self.tables.write().remove(name);
79 self.table_meta.write().remove(name);
80 }
81
82 pub fn alter_table_add_column(
83 &self,
84 table: &str,
85 col: contextdb_core::ColumnDef,
86 ) -> Result<(), String> {
87 let mut meta = self.table_meta.write();
88 let m = meta
89 .get_mut(table)
90 .ok_or_else(|| format!("table '{}' not found", table))?;
91 if m.columns.iter().any(|c| c.name == col.name) {
92 return Err(format!(
93 "column '{}' already exists in table '{}'",
94 col.name, table
95 ));
96 }
97 m.columns.push(col);
98 Ok(())
99 }
100
101 pub fn alter_table_drop_column(&self, table: &str, column: &str) -> Result<(), String> {
102 {
103 let mut meta = self.table_meta.write();
104 let m = meta
105 .get_mut(table)
106 .ok_or_else(|| format!("table '{}' not found", table))?;
107 let pos = m
108 .columns
109 .iter()
110 .position(|c| c.name == column)
111 .ok_or_else(|| {
112 format!("column '{}' does not exist in table '{}'", column, table)
113 })?;
114 if m.columns[pos].primary_key {
115 return Err(format!("cannot drop primary key column '{}'", column));
116 }
117 m.columns.remove(pos);
118 }
119 {
120 let mut tables = self.tables.write();
121 if let Some(rows) = tables.get_mut(table) {
122 for row in rows.iter_mut() {
123 row.values.remove(column);
124 }
125 }
126 }
127 Ok(())
128 }
129
130 pub fn alter_table_rename_column(
131 &self,
132 table: &str,
133 from: &str,
134 to: &str,
135 ) -> Result<(), String> {
136 {
137 let mut meta = self.table_meta.write();
138 let m = meta
139 .get_mut(table)
140 .ok_or_else(|| format!("table '{}' not found", table))?;
141 if m.columns.iter().any(|c| c.name == to) {
142 return Err(format!(
143 "column '{}' already exists in table '{}'",
144 to, table
145 ));
146 }
147 let col = m
148 .columns
149 .iter_mut()
150 .find(|c| c.name == from)
151 .ok_or_else(|| format!("column '{}' does not exist in table '{}'", from, table))?;
152 if col.primary_key {
153 return Err(format!("cannot rename primary key column '{}'", from));
154 }
155 col.name = to.to_string();
156 }
157 {
158 let mut tables = self.tables.write();
159 if let Some(rows) = tables.get_mut(table) {
160 for row in rows.iter_mut() {
161 if let Some(val) = row.values.remove(from) {
162 row.values.insert(to.to_string(), val);
163 }
164 }
165 }
166 }
167 Ok(())
168 }
169
170 pub fn is_immutable(&self, table: &str) -> bool {
171 self.table_meta
172 .read()
173 .get(table)
174 .is_some_and(|m| m.immutable)
175 }
176
177 pub fn validate_state_transition(
178 &self,
179 table: &str,
180 column: &str,
181 from: &str,
182 to: &str,
183 ) -> bool {
184 self.table_meta
185 .read()
186 .get(table)
187 .and_then(|m| m.state_machine.as_ref())
188 .filter(|sm| sm.column == column)
189 .is_none_or(|sm| {
190 sm.transitions
191 .get(from)
192 .is_some_and(|targets| targets.iter().any(|t| t == to))
193 })
194 }
195
196 pub fn table_names(&self) -> Vec<String> {
197 let mut names: Vec<_> = self.tables.read().keys().cloned().collect();
198 names.sort();
199 names
200 }
201
202 pub fn table_meta(&self, name: &str) -> Option<TableMeta> {
203 self.table_meta.read().get(name).cloned()
204 }
205}