1use crate::composite_store::ChangeLogEntry;
2use crate::sync_types::DdlChange;
3use contextdb_core::{AdjEntry, Error, Result, TableMeta, VectorEntry, VersionedRow};
4use contextdb_tx::WriteSet;
5use redb::{ReadableDatabase, ReadableTable, TableDefinition};
6use std::collections::HashMap;
7use std::path::Path;
8
// Fixed redb tables. The string names are part of the on-disk format; renaming any of them
// orphans existing data. All values are bincode-encoded (standard config, see `encode`).
// Relational rows are not listed here: they live in per-table `rel_{name}` tables that are
// created on demand (see `rel_table_name`).

/// Table metadata records, keyed `"table:{name}"` (see `meta_key`); values are `TableMeta`.
const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
/// Free-form config values keyed by caller-chosen strings (`flush_config_value`).
const CONFIG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("config");
/// Change-log entries keyed `"{lsn:020}:{index:06}"` (see `change_log_key`).
const CHANGE_LOG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("change_log");
/// DDL changes keyed by the zero-padded LSN `"{lsn:020}"` (see `ddl_log_key`).
const DDL_LOG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("ddl_log");
/// Edges keyed `source ‖ target ‖ edge_type` bytes; values are `AdjEntry`.
const GRAPH_FWD_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("graph_fwd");
/// The same edges keyed `target ‖ source ‖ edge_type` bytes, for reverse lookups.
const GRAPH_REV_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("graph_rev");
/// Vector entries keyed by their `row_id`; values are `VectorEntry`.
const VECTORS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("vectors");
16
/// redb-backed persistence for relational rows, graph edges, vectors, table metadata,
/// config values and the change/DDL logs.
///
/// Only the database path is stored. Every operation opens the database afresh (see
/// `with_db`). Cross-process exclusivity comes from a PID lock file at
/// `path.with_extension("lock")`, not from keeping a redb handle open.
pub struct RedbPersistence {
    /// Path of the redb database file.
    path: std::path::PathBuf,
}
20
21impl RedbPersistence {
22 pub fn create(path: &Path) -> Result<Self> {
23 let _db = redb::Database::create(path).map_err(Self::storage_error)?;
24 Self::acquire_pid_lock(path)?;
25 Ok(Self {
26 path: path.to_path_buf(),
27 })
28 }
29
30 pub fn open(path: &Path) -> Result<Self> {
31 Self::acquire_pid_lock(path)?;
32 let _db = redb::Database::open(path).map_err(Self::storage_error)?;
33 Ok(Self {
34 path: path.to_path_buf(),
35 })
36 }
37
38 pub fn close(&self) {
39 Self::release_pid_lock(&self.path);
40 }
41
42 pub fn path(&self) -> &Path {
43 &self.path
44 }
45
46 fn acquire_pid_lock(path: &Path) -> Result<()> {
49 let lock_path = path.with_extension("lock");
50 if lock_path.exists()
51 && let Ok(contents) = std::fs::read_to_string(&lock_path)
52 && let Ok(pid) = contents.trim().parse::<u32>()
53 {
54 let proc_path = format!("/proc/{}", pid);
55 if std::path::Path::new(&proc_path).exists() && pid != std::process::id() {
56 return Err(Error::Other(
57 "database is locked (another process may have it open)".to_string(),
58 ));
59 }
60 }
62 std::fs::write(&lock_path, std::process::id().to_string()).map_err(Self::storage_error)?;
63 Ok(())
64 }
65
66 fn release_pid_lock(path: &Path) {
67 let lock_path = path.with_extension("lock");
68 let _ = std::fs::remove_file(&lock_path);
69 }
70
71 pub fn flush_data(&self, ws: &WriteSet) -> Result<()> {
72 self.flush_data_with_logs(ws, &[])
73 }
74
75 pub fn flush_data_with_logs(&self, ws: &WriteSet, change_log: &[ChangeLogEntry]) -> Result<()> {
76 self.with_db(|db| {
77 let write_txn = db.begin_write().map_err(Self::storage_error)?;
78
79 for (table, row) in &ws.relational_inserts {
80 let table_name = Self::rel_table_name(table);
81 let table_def: TableDefinition<u64, &[u8]> =
82 TableDefinition::new(table_name.as_str());
83 let mut redb_table = write_txn
84 .open_table(table_def)
85 .map_err(Self::storage_error)?;
86 let encoded = Self::encode(row)?;
87 redb_table
88 .insert(row.row_id, encoded.as_slice())
89 .map_err(Self::storage_error)?;
90 }
91
92 for (table, row_id, deleted_tx) in &ws.relational_deletes {
93 let table_name = Self::rel_table_name(table);
94 let table_def: TableDefinition<u64, &[u8]> =
95 TableDefinition::new(table_name.as_str());
96 let mut redb_table = write_txn
97 .open_table(table_def)
98 .map_err(Self::storage_error)?;
99 let bytes = {
100 let existing = redb_table
101 .get(*row_id)
102 .map_err(Self::storage_error)?
103 .ok_or_else(|| Error::NotFound(format!("row {row_id} in table {table}")))?;
104 let bytes: &[u8] = existing.value();
105 bytes.to_vec()
106 };
107 let mut row: VersionedRow = Self::decode(&bytes)?;
108 row.deleted_tx = Some(*deleted_tx);
109 let encoded = Self::encode(&row)?;
110 redb_table
111 .insert(*row_id, encoded.as_slice())
112 .map_err(Self::storage_error)?;
113 }
114
115 {
116 let mut fwd_table = write_txn
117 .open_table(GRAPH_FWD_TABLE)
118 .map_err(Self::storage_error)?;
119 let mut rev_table = write_txn
120 .open_table(GRAPH_REV_TABLE)
121 .map_err(Self::storage_error)?;
122
123 for entry in &ws.adj_inserts {
124 let encoded = Self::encode(entry)?;
125 let fwd_key = Self::graph_fwd_key(entry);
126 let rev_key = Self::graph_rev_key(entry);
127 fwd_table
128 .insert(fwd_key.as_slice(), encoded.as_slice())
129 .map_err(Self::storage_error)?;
130 rev_table
131 .insert(rev_key.as_slice(), encoded.as_slice())
132 .map_err(Self::storage_error)?;
133 }
134
135 for (source, edge_type, target, deleted_tx) in &ws.adj_deletes {
136 let fwd_key = Self::graph_fwd_key_parts(source, target, edge_type);
137 let rev_key = Self::graph_rev_key_parts(source, target, edge_type);
138
139 let bytes = {
140 let fwd_existing = fwd_table
141 .get(fwd_key.as_slice())
142 .map_err(Self::storage_error)?
143 .ok_or_else(|| {
144 Error::NotFound(format!(
145 "edge {source} -[{edge_type}]-> {target} in graph_fwd"
146 ))
147 })?;
148 let bytes: &[u8] = fwd_existing.value();
149 bytes.to_vec()
150 };
151 let mut edge: AdjEntry = Self::decode(&bytes)?;
152 edge.deleted_tx = Some(*deleted_tx);
153 let encoded = Self::encode(&edge)?;
154
155 fwd_table
156 .insert(fwd_key.as_slice(), encoded.as_slice())
157 .map_err(Self::storage_error)?;
158 rev_table
159 .insert(rev_key.as_slice(), encoded.as_slice())
160 .map_err(Self::storage_error)?;
161 }
162 }
163
164 {
165 let mut vectors_table = write_txn
166 .open_table(VECTORS_TABLE)
167 .map_err(Self::storage_error)?;
168
169 for entry in &ws.vector_inserts {
170 let encoded = Self::encode(entry)?;
171 vectors_table
172 .insert(entry.row_id, encoded.as_slice())
173 .map_err(Self::storage_error)?;
174 }
175
176 for (row_id, deleted_tx) in &ws.vector_deletes {
177 let bytes = {
178 let existing = vectors_table
179 .get(*row_id)
180 .map_err(Self::storage_error)?
181 .ok_or_else(|| Error::NotFound(format!("vector row {row_id}")))?;
182 let bytes: &[u8] = existing.value();
183 bytes.to_vec()
184 };
185 let mut entry: VectorEntry = Self::decode(&bytes)?;
186 entry.deleted_tx = Some(*deleted_tx);
187 let encoded = Self::encode(&entry)?;
188 vectors_table
189 .insert(*row_id, encoded.as_slice())
190 .map_err(Self::storage_error)?;
191 }
192 }
193
194 if !change_log.is_empty() {
195 let mut table = write_txn
196 .open_table(CHANGE_LOG_TABLE)
197 .map_err(Self::storage_error)?;
198 let lsn = ws.commit_lsn.unwrap_or(0);
199 for (index, entry) in change_log.iter().enumerate() {
200 let key = Self::change_log_key(lsn, index);
201 let encoded = Self::encode(entry)?;
202 table
203 .insert(key.as_str(), encoded.as_slice())
204 .map_err(Self::storage_error)?;
205 }
206 }
207
208 write_txn.commit().map_err(Self::storage_error)?;
209 Ok(())
210 })
211 }
212
213 pub fn flush_table_meta(&self, name: &str, meta: &TableMeta) -> Result<()> {
214 self.with_db(|db| {
215 let write_txn = db.begin_write().map_err(Self::storage_error)?;
216 {
217 let mut meta_table = write_txn
218 .open_table(META_TABLE)
219 .map_err(Self::storage_error)?;
220 let key = Self::meta_key(name);
221 let encoded = Self::encode(meta)?;
222 meta_table
223 .insert(key.as_str(), encoded.as_slice())
224 .map_err(Self::storage_error)?;
225 }
226 write_txn.commit().map_err(Self::storage_error)?;
227 Ok(())
228 })
229 }
230
231 pub fn remove_table_meta(&self, name: &str) -> Result<()> {
232 self.with_db(|db| {
233 let write_txn = db.begin_write().map_err(Self::storage_error)?;
234 {
235 let mut meta_table = write_txn
236 .open_table(META_TABLE)
237 .map_err(Self::storage_error)?;
238 let key = Self::meta_key(name);
239 meta_table
240 .remove(key.as_str())
241 .map_err(Self::storage_error)?;
242 }
243 write_txn.commit().map_err(Self::storage_error)?;
244 Ok(())
245 })
246 }
247
248 pub fn flush_config_value<T: serde::Serialize>(&self, key: &str, value: &T) -> Result<()> {
249 self.with_db(|db| {
250 let write_txn = db.begin_write().map_err(Self::storage_error)?;
251 {
252 let mut config_table = write_txn
253 .open_table(CONFIG_TABLE)
254 .map_err(Self::storage_error)?;
255 let encoded = Self::encode(value)?;
256 config_table
257 .insert(key, encoded.as_slice())
258 .map_err(Self::storage_error)?;
259 }
260 write_txn.commit().map_err(Self::storage_error)?;
261 Ok(())
262 })
263 }
264
265 pub fn remove_config_value(&self, key: &str) -> Result<()> {
266 self.with_db(|db| {
267 let write_txn = db.begin_write().map_err(Self::storage_error)?;
268 {
269 let mut config_table = write_txn
270 .open_table(CONFIG_TABLE)
271 .map_err(Self::storage_error)?;
272 config_table.remove(key).map_err(Self::storage_error)?;
273 }
274 write_txn.commit().map_err(Self::storage_error)?;
275 Ok(())
276 })
277 }
278
279 pub fn append_change_log(&self, lsn: u64, entries: &[ChangeLogEntry]) -> Result<()> {
280 if entries.is_empty() {
281 return Ok(());
282 }
283 self.with_db(|db| {
284 let write_txn = db.begin_write().map_err(Self::storage_error)?;
285 {
286 let mut table = write_txn
287 .open_table(CHANGE_LOG_TABLE)
288 .map_err(Self::storage_error)?;
289 for (index, entry) in entries.iter().enumerate() {
290 let key = Self::change_log_key(lsn, index);
291 let encoded = Self::encode(entry)?;
292 table
293 .insert(key.as_str(), encoded.as_slice())
294 .map_err(Self::storage_error)?;
295 }
296 }
297 write_txn.commit().map_err(Self::storage_error)?;
298 Ok(())
299 })
300 }
301
302 pub fn append_ddl_log(&self, lsn: u64, change: &DdlChange) -> Result<()> {
303 self.with_db(|db| {
304 let write_txn = db.begin_write().map_err(Self::storage_error)?;
305 {
306 let mut table = write_txn
307 .open_table(DDL_LOG_TABLE)
308 .map_err(Self::storage_error)?;
309 let key = Self::ddl_log_key(lsn);
310 let encoded = Self::encode(change)?;
311 table
312 .insert(key.as_str(), encoded.as_slice())
313 .map_err(Self::storage_error)?;
314 }
315 write_txn.commit().map_err(Self::storage_error)?;
316 Ok(())
317 })
318 }
319
320 pub fn remove_table_data(&self, name: &str) -> Result<()> {
321 self.with_db(|db| {
322 let write_txn = db.begin_write().map_err(Self::storage_error)?;
323 let table_name = Self::rel_table_name(name);
324 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new(table_name.as_str());
325 let _ = write_txn
326 .delete_table(table_def)
327 .map_err(Self::storage_error)?;
328 write_txn.commit().map_err(Self::storage_error)?;
329 Ok(())
330 })
331 }
332
333 pub fn rewrite_table_rows(&self, name: &str, rows: &[VersionedRow]) -> Result<()> {
334 self.with_db(|db| {
335 let write_txn = db.begin_write().map_err(Self::storage_error)?;
336 {
337 let table_name = Self::rel_table_name(name);
338 let table_def: TableDefinition<u64, &[u8]> =
339 TableDefinition::new(table_name.as_str());
340 let _ = write_txn.delete_table(table_def);
341 let mut redb_table = write_txn
342 .open_table(table_def)
343 .map_err(Self::storage_error)?;
344 for row in rows {
345 let encoded = Self::encode(row)?;
346 redb_table
347 .insert(row.row_id, encoded.as_slice())
348 .map_err(Self::storage_error)?;
349 }
350 }
351 write_txn.commit().map_err(Self::storage_error)?;
352 Ok(())
353 })
354 }
355
356 pub fn rewrite_vectors(&self, vectors: &[VectorEntry]) -> Result<()> {
357 self.with_db(|db| {
358 let write_txn = db.begin_write().map_err(Self::storage_error)?;
359 let _ = write_txn.delete_table(VECTORS_TABLE);
360 {
361 let mut table = write_txn
362 .open_table(VECTORS_TABLE)
363 .map_err(Self::storage_error)?;
364 for entry in vectors {
365 let encoded = Self::encode(entry)?;
366 table
367 .insert(entry.row_id, encoded.as_slice())
368 .map_err(Self::storage_error)?;
369 }
370 }
371 write_txn.commit().map_err(Self::storage_error)?;
372 Ok(())
373 })
374 }
375
376 pub fn rewrite_graph_edges(&self, edges: &[AdjEntry]) -> Result<()> {
377 self.with_db(|db| {
378 let write_txn = db.begin_write().map_err(Self::storage_error)?;
379 let _ = write_txn.delete_table(GRAPH_FWD_TABLE);
380 let _ = write_txn.delete_table(GRAPH_REV_TABLE);
381 {
382 let mut fwd_table = write_txn
383 .open_table(GRAPH_FWD_TABLE)
384 .map_err(Self::storage_error)?;
385 let mut rev_table = write_txn
386 .open_table(GRAPH_REV_TABLE)
387 .map_err(Self::storage_error)?;
388
389 for entry in edges {
390 let encoded = Self::encode(entry)?;
391 let fwd_key = Self::graph_fwd_key(entry);
392 let rev_key = Self::graph_rev_key(entry);
393 fwd_table
394 .insert(fwd_key.as_slice(), encoded.as_slice())
395 .map_err(Self::storage_error)?;
396 rev_table
397 .insert(rev_key.as_slice(), encoded.as_slice())
398 .map_err(Self::storage_error)?;
399 }
400 }
401 write_txn.commit().map_err(Self::storage_error)?;
402 Ok(())
403 })
404 }
405
406 pub fn load_all_table_meta(&self) -> Result<HashMap<String, TableMeta>> {
407 self.with_db(|db| {
408 let read_txn = db.begin_read().map_err(Self::storage_error)?;
409 let meta_table = match read_txn.open_table(META_TABLE) {
410 Ok(table) => table,
411 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(HashMap::new()),
412 Err(err) => return Err(Self::storage_error(err)),
413 };
414
415 let mut tables = HashMap::new();
416 for entry in meta_table.iter().map_err(Self::storage_error)? {
417 let (key, value) = entry.map_err(Self::storage_error)?;
418 let key = key.value();
419 if let Some(name) = key.strip_prefix("table:") {
420 tables.insert(name.to_string(), Self::decode(value.value())?);
421 }
422 }
423 Ok(tables)
424 })
425 }
426
427 pub fn load_config_value<T: serde::de::DeserializeOwned>(
428 &self,
429 key: &str,
430 ) -> Result<Option<T>> {
431 self.with_db(|db| {
432 let read_txn = db.begin_read().map_err(Self::storage_error)?;
433 let config_table = match read_txn.open_table(CONFIG_TABLE) {
434 Ok(table) => table,
435 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
436 Err(err) => return Err(Self::storage_error(err)),
437 };
438 let value = match config_table.get(key).map_err(Self::storage_error)? {
439 Some(value) => Some(Self::decode(value.value())?),
440 None => None,
441 };
442 Ok(value)
443 })
444 }
445
446 pub fn load_relational_table(&self, name: &str) -> Result<Vec<VersionedRow>> {
447 self.with_db(|db| {
448 let read_txn = db.begin_read().map_err(Self::storage_error)?;
449 let table_name = Self::rel_table_name(name);
450 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new(table_name.as_str());
451 let table = match read_txn.open_table(table_def) {
452 Ok(table) => table,
453 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
454 Err(err) => return Err(Self::storage_error(err)),
455 };
456
457 let mut rows = Vec::new();
458 for entry in table.iter().map_err(Self::storage_error)? {
459 let (_, value) = entry.map_err(Self::storage_error)?;
460 rows.push(Self::decode(value.value())?);
461 }
462 Ok(rows)
463 })
464 }
465
466 pub fn load_all_tables(&self) -> Result<HashMap<String, Vec<VersionedRow>>> {
467 let mut all_tables = HashMap::new();
468 for name in self.load_all_table_meta()?.into_keys() {
469 let rows = self.load_relational_table(&name)?;
470 all_tables.insert(name, rows);
471 }
472 Ok(all_tables)
473 }
474
475 pub fn load_forward_edges(&self) -> Result<Vec<AdjEntry>> {
476 self.load_graph_table(GRAPH_FWD_TABLE)
477 }
478
479 pub fn load_reverse_edges(&self) -> Result<Vec<AdjEntry>> {
480 self.load_graph_table(GRAPH_REV_TABLE)
481 }
482
483 pub fn load_vectors(&self) -> Result<Vec<VectorEntry>> {
484 self.with_db(|db| {
485 let read_txn = db.begin_read().map_err(Self::storage_error)?;
486 let table = match read_txn.open_table(VECTORS_TABLE) {
487 Ok(table) => table,
488 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
489 Err(err) => return Err(Self::storage_error(err)),
490 };
491
492 let mut vectors = Vec::new();
493 for entry in table.iter().map_err(Self::storage_error)? {
494 let (_, value) = entry.map_err(Self::storage_error)?;
495 vectors.push(Self::decode(value.value())?);
496 }
497 Ok(vectors)
498 })
499 }
500
501 pub fn load_change_log(&self) -> Result<Vec<ChangeLogEntry>> {
502 self.with_db(|db| {
503 let read_txn = db.begin_read().map_err(Self::storage_error)?;
504 let table = match read_txn.open_table(CHANGE_LOG_TABLE) {
505 Ok(table) => table,
506 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
507 Err(err) => return Err(Self::storage_error(err)),
508 };
509
510 let mut entries = Vec::new();
511 for entry in table.iter().map_err(Self::storage_error)? {
512 let (_, value) = entry.map_err(Self::storage_error)?;
513 entries.push(Self::decode(value.value())?);
514 }
515 Ok(entries)
516 })
517 }
518
519 pub fn load_ddl_log(&self) -> Result<Vec<(u64, DdlChange)>> {
520 self.with_db(|db| {
521 let read_txn = db.begin_read().map_err(Self::storage_error)?;
522 let table = match read_txn.open_table(DDL_LOG_TABLE) {
523 Ok(table) => table,
524 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
525 Err(err) => return Err(Self::storage_error(err)),
526 };
527
528 let mut entries = Vec::new();
529 for entry in table.iter().map_err(Self::storage_error)? {
530 let (key, value) = entry.map_err(Self::storage_error)?;
531 let lsn = key
532 .value()
533 .parse::<u64>()
534 .map_err(|err| Error::Other(format!("invalid ddl log key: {err}")))?;
535 entries.push((lsn, Self::decode(value.value())?));
536 }
537 Ok(entries)
538 })
539 }
540
541 fn load_graph_table(&self, definition: TableDefinition<&[u8], &[u8]>) -> Result<Vec<AdjEntry>> {
542 self.with_db(|db| {
543 let read_txn = db.begin_read().map_err(Self::storage_error)?;
544 let table = match read_txn.open_table(definition) {
545 Ok(table) => table,
546 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
547 Err(err) => return Err(Self::storage_error(err)),
548 };
549
550 let mut entries = Vec::new();
551 for entry in table.iter().map_err(Self::storage_error)? {
552 let (_, value) = entry.map_err(Self::storage_error)?;
553 entries.push(Self::decode(value.value())?);
554 }
555 Ok(entries)
556 })
557 }
558
559 fn rel_table_name(name: &str) -> String {
560 format!("rel_{name}")
561 }
562
563 fn meta_key(name: &str) -> String {
564 format!("table:{name}")
565 }
566
567 fn change_log_key(lsn: u64, index: usize) -> String {
568 format!("{lsn:020}:{index:06}")
569 }
570
571 fn ddl_log_key(lsn: u64) -> String {
572 format!("{lsn:020}")
573 }
574
575 fn graph_fwd_key(entry: &AdjEntry) -> Vec<u8> {
576 Self::graph_fwd_key_parts(&entry.source, &entry.target, &entry.edge_type)
577 }
578
579 fn graph_rev_key(entry: &AdjEntry) -> Vec<u8> {
580 Self::graph_rev_key_parts(&entry.source, &entry.target, &entry.edge_type)
581 }
582
583 fn graph_fwd_key_parts(source: &uuid::Uuid, target: &uuid::Uuid, edge_type: &str) -> Vec<u8> {
584 let mut key = Vec::with_capacity(32 + edge_type.len());
585 key.extend_from_slice(source.as_bytes());
586 key.extend_from_slice(target.as_bytes());
587 key.extend_from_slice(edge_type.as_bytes());
588 key
589 }
590
591 fn graph_rev_key_parts(source: &uuid::Uuid, target: &uuid::Uuid, edge_type: &str) -> Vec<u8> {
592 let mut key = Vec::with_capacity(32 + edge_type.len());
593 key.extend_from_slice(target.as_bytes());
594 key.extend_from_slice(source.as_bytes());
595 key.extend_from_slice(edge_type.as_bytes());
596 key
597 }
598
599 fn encode<T: serde::Serialize>(value: &T) -> Result<Vec<u8>> {
600 bincode::serde::encode_to_vec(value, bincode::config::standard())
601 .map_err(|err| Error::Other(format!("bincode encode error: {err}")))
602 }
603
604 fn decode<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T> {
605 let (value, _) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())
606 .map_err(|err| Error::Other(format!("bincode decode error: {err}")))?;
607 Ok(value)
608 }
609
610 fn storage_error(err: impl std::fmt::Display) -> Error {
611 let msg = err.to_string();
612 if msg.contains("lock") || msg.contains("already open") {
613 Error::Other(format!(
614 "database is locked (another process may have it open): {msg}"
615 ))
616 } else {
617 Error::Other(format!("redb error: {msg}"))
618 }
619 }
620
621 fn with_db<T>(&self, f: impl FnOnce(&redb::Database) -> Result<T>) -> Result<T> {
622 let db = redb::Database::open(&self.path).map_err(Self::storage_error)?;
623 f(&db)
624 }
625}
626
627impl Drop for RedbPersistence {
628 fn drop(&mut self) {
629 Self::release_pid_lock(&self.path);
630 }
631}