1use crate::composite_store::ChangeLogEntry;
2use crate::sync_types::DdlChange;
3use contextdb_core::{
4 AdjEntry, ColumnType, Error, Lsn, Result, TableMeta, Value, VectorEntry, VectorIndexRef,
5 VectorQuantization, VersionedRow,
6};
7use contextdb_tx::WriteSet;
8use redb::{ReadableDatabase, ReadableTable, TableDefinition};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::panic::{AssertUnwindSafe, catch_unwind};
12use std::path::Path;
13
14const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
15const FORMAT_METADATA_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("metadata");
16const CONFIG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("config");
17const CHANGE_LOG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("change_log");
18const DDL_LOG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("ddl_log");
19const GRAPH_FWD_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("graph_fwd");
20const GRAPH_REV_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("graph_rev");
21const VECTORS_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("vector_entries");
22const FORMAT_VERSION_KEY: &str = "format_version";
23const CURRENT_FORMAT_VERSION: &str = "1.0.0";
24
/// Handle to a redb-backed store identified by its file path.
///
/// The handle stores only the path; the methods in this file re-open the
/// database through `with_db` for each operation.
pub struct RedbPersistence {
    /// Location of the redb database file. The lock file path is derived
    /// from it with `with_extension("lock")`.
    path: std::path::PathBuf,
}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30struct PersistedVersionedRow {
31 row_id: contextdb_core::RowId,
32 values: HashMap<String, PersistedValue>,
33 created_tx: contextdb_core::TxId,
34 deleted_tx: Option<contextdb_core::TxId>,
35 lsn: Lsn,
36 created_at: Option<contextdb_core::Wallclock>,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40enum PersistedValue {
41 Plain(Value),
42 Vector(PersistedVector),
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46struct PersistedVectorEntry {
47 index: VectorIndexRef,
48 row_id: contextdb_core::RowId,
49 vector: PersistedVector,
50 created_tx: contextdb_core::TxId,
51 deleted_tx: Option<contextdb_core::TxId>,
52 lsn: Lsn,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56enum PersistedVector {
57 F32(Vec<f32>),
58 SQ8 {
59 min: f32,
60 max: f32,
61 len: u32,
62 payload: Vec<u8>,
63 },
64 SQ4 {
65 min: f32,
66 max: f32,
67 len: u32,
68 payload: Vec<u8>,
69 },
70}
71
72impl PersistedVector {
73 fn from_f32(vector: &[f32], quantization: VectorQuantization) -> Self {
74 match quantization {
75 VectorQuantization::F32 => PersistedVector::F32(vector.to_vec()),
76 VectorQuantization::SQ8 => {
77 let (min, max) = vector_min_max(vector);
78 let range = max - min;
79 let payload = if range <= f32::EPSILON {
80 vec![0; vector.len()]
81 } else {
82 vector
83 .iter()
84 .map(|value| {
85 (((*value - min) / range) * 255.0).round().clamp(0.0, 255.0) as u8
86 })
87 .collect()
88 };
89 PersistedVector::SQ8 {
90 min,
91 max,
92 len: vector.len() as u32,
93 payload,
94 }
95 }
96 VectorQuantization::SQ4 => {
97 let (min, max) = vector_min_max(vector);
98 let range = max - min;
99 let mut payload = Vec::with_capacity(vector.len().div_ceil(2));
100 let quantized = if range <= f32::EPSILON {
101 vec![0; vector.len()]
102 } else {
103 vector
104 .iter()
105 .map(|value| {
106 (((*value - min) / range) * 15.0).round().clamp(0.0, 15.0) as u8
107 })
108 .collect::<Vec<_>>()
109 };
110 for pair in quantized.chunks(2) {
111 let hi = pair[0] & 0x0f;
112 let lo = pair.get(1).copied().unwrap_or(0) & 0x0f;
113 payload.push((hi << 4) | lo);
114 }
115 PersistedVector::SQ4 {
116 min,
117 max,
118 len: vector.len() as u32,
119 payload,
120 }
121 }
122 }
123 }
124
125 fn to_f32(&self) -> Vec<f32> {
126 match self {
127 PersistedVector::F32(vector) => vector.clone(),
128 PersistedVector::SQ8 {
129 min,
130 max,
131 len,
132 payload,
133 } => {
134 let range = *max - *min;
135 payload
136 .iter()
137 .take(*len as usize)
138 .map(|byte| {
139 if range <= f32::EPSILON {
140 *min
141 } else {
142 *min + ((*byte as f32) / 255.0) * range
143 }
144 })
145 .collect()
146 }
147 PersistedVector::SQ4 {
148 min,
149 max,
150 len,
151 payload,
152 } => {
153 let range = *max - *min;
154 let mut values = Vec::with_capacity(*len as usize);
155 for byte in payload {
156 for q in [byte >> 4, byte & 0x0f] {
157 if values.len() == *len as usize {
158 break;
159 }
160 values.push(if range <= f32::EPSILON {
161 *min
162 } else {
163 *min + ((q as f32) / 15.0) * range
164 });
165 }
166 }
167 values
168 }
169 }
170 }
171}
172
/// Returns the smallest and largest component of `vector` as `(min, max)`.
///
/// An empty slice yields `(0.0, 0.0)`. Comparisons use `f32::min` and
/// `f32::max`, so a NaN component is skipped whenever a non-NaN value is
/// available.
fn vector_min_max(vector: &[f32]) -> (f32, f32) {
    let mut components = vector.iter().copied();
    let Some(seed) = components.next() else {
        return (0.0, 0.0);
    };
    let mut lowest = seed;
    let mut highest = seed;
    for component in components {
        lowest = lowest.min(component);
        highest = highest.max(component);
    }
    (lowest, highest)
}
183
184impl RedbPersistence {
185 pub fn create(path: &Path) -> Result<Self> {
186 Self::acquire_pid_lock(path)?;
187 let result = (|| {
188 let db = redb::Database::create(path).map_err(Self::storage_error)?;
189 Self::write_format_marker(&db)?;
190 Ok(Self {
191 path: path.to_path_buf(),
192 })
193 })();
194 if result.is_err() {
195 Self::release_pid_lock(path);
196 }
197 result
198 }
199
200 pub fn open(path: &Path) -> Result<Self> {
201 Self::acquire_pid_lock(path)?;
202 let result = (|| {
203 let db = Self::open_db_checked(path)?;
204 Self::validate_format_marker(&db, path)?;
205 Ok(Self {
206 path: path.to_path_buf(),
207 })
208 })();
209 if result.is_err() {
210 Self::release_pid_lock(path);
211 }
212 result
213 }
214
215 pub fn close(&self) {
216 Self::release_pid_lock(&self.path);
217 }
218
219 pub fn path(&self) -> &Path {
220 &self.path
221 }
222
223 fn acquire_pid_lock(path: &Path) -> Result<()> {
226 let lock_path = path.with_extension("lock");
227 if lock_path.exists()
228 && let Ok(contents) = std::fs::read_to_string(&lock_path)
229 && let Ok(pid) = contents.trim().parse::<u32>()
230 {
231 let proc_path = format!("/proc/{}", pid);
232 if std::path::Path::new(&proc_path).exists() && pid != std::process::id() {
233 return Err(Error::Other(
234 "database is locked (another process may have it open)".to_string(),
235 ));
236 }
237 }
239 std::fs::write(&lock_path, std::process::id().to_string()).map_err(Self::storage_error)?;
240 Ok(())
241 }
242
/// Deletes the lock file derived from `path`.
///
/// This is best effort: a removal failure, including a lock file that does
/// not exist, is deliberately ignored.
fn release_pid_lock(path: &Path) {
    let _ = std::fs::remove_file(path.with_extension("lock"));
}
247
248 fn open_db_checked(path: &Path) -> Result<redb::Database> {
249 match catch_unwind(AssertUnwindSafe(|| redb::Database::open(path))) {
250 Ok(Ok(db)) => Ok(db),
251 Ok(Err(err)) => Err(Error::StoreCorrupted {
252 path: path.display().to_string(),
253 reason: format!("metadata/format could not be read: {err}"),
254 }),
255 Err(_) => Err(Error::StoreCorrupted {
256 path: path.display().to_string(),
257 reason: "metadata/format read panicked; store may be truncated or corrupt"
258 .to_string(),
259 }),
260 }
261 }
262
263 fn write_format_marker(db: &redb::Database) -> Result<()> {
264 let write_txn = db.begin_write().map_err(Self::storage_error)?;
265 {
266 let mut table = write_txn
267 .open_table(FORMAT_METADATA_TABLE)
268 .map_err(Self::storage_error)?;
269 let encoded = Self::encode(&CURRENT_FORMAT_VERSION.to_string())?;
270 table
271 .insert(FORMAT_VERSION_KEY, encoded.as_slice())
272 .map_err(Self::storage_error)?;
273 }
274 write_txn.commit().map_err(Self::storage_error)?;
275 Ok(())
276 }
277
278 fn validate_format_marker(db: &redb::Database, path: &Path) -> Result<()> {
279 let read_txn = db.begin_read().map_err(|err| Error::StoreCorrupted {
280 path: path.display().to_string(),
281 reason: format!("metadata read failed: {err}"),
282 })?;
283 let table = match read_txn.open_table(FORMAT_METADATA_TABLE) {
284 Ok(table) => table,
285 Err(redb::TableError::TableDoesNotExist(_)) => {
286 return Err(Error::LegacyVectorStoreDetected {
287 found_format_marker: String::new(),
288 expected_release: CURRENT_FORMAT_VERSION.to_string(),
289 });
290 }
291 Err(err) => {
292 return Err(Error::StoreCorrupted {
293 path: path.display().to_string(),
294 reason: format!("metadata table could not be read: {err}"),
295 });
296 }
297 };
298 let value = table
299 .get(FORMAT_VERSION_KEY)
300 .map_err(|err| Error::StoreCorrupted {
301 path: path.display().to_string(),
302 reason: format!("metadata format_version could not be read: {err}"),
303 })?
304 .ok_or_else(|| Error::StoreCorrupted {
305 path: path.display().to_string(),
306 reason: "metadata table is missing format_version".to_string(),
307 })?;
308 let marker: String = Self::decode(value.value()).map_err(|err| Error::StoreCorrupted {
309 path: path.display().to_string(),
310 reason: format!("metadata format_version is corrupt: {err}"),
311 })?;
312 if marker == CURRENT_FORMAT_VERSION {
313 Ok(())
314 } else {
315 Err(Error::LegacyVectorStoreDetected {
316 found_format_marker: marker,
317 expected_release: CURRENT_FORMAT_VERSION.to_string(),
318 })
319 }
320 }
321
322 pub fn flush_data(&self, ws: &WriteSet) -> Result<()> {
323 self.flush_data_with_logs(ws, &[])
324 }
325
326 pub fn flush_data_with_logs(&self, ws: &WriteSet, change_log: &[ChangeLogEntry]) -> Result<()> {
327 let table_meta = self.load_all_table_meta()?;
328 let vector_quantization = Self::vector_quantization_map(&table_meta);
329 self.with_db(|db| {
330 let write_txn = db.begin_write().map_err(Self::storage_error)?;
331
332 for (table, row) in &ws.relational_inserts {
333 let table_name = Self::rel_table_name(table);
334 let table_def: TableDefinition<u64, &[u8]> =
335 TableDefinition::new(table_name.as_str());
336 let mut redb_table = write_txn
337 .open_table(table_def)
338 .map_err(Self::storage_error)?;
339 let encoded = Self::encode_versioned_row(row, table_meta.get(table))?;
340 redb_table
341 .insert(row.row_id.0, encoded.as_slice())
342 .map_err(Self::storage_error)?;
343 }
344
345 for (table, row_id, deleted_tx) in &ws.relational_deletes {
346 let table_name = Self::rel_table_name(table);
347 let table_def: TableDefinition<u64, &[u8]> =
348 TableDefinition::new(table_name.as_str());
349 let mut redb_table = write_txn
350 .open_table(table_def)
351 .map_err(Self::storage_error)?;
352 let bytes = {
353 let existing = redb_table
354 .get(row_id.0)
355 .map_err(Self::storage_error)?
356 .ok_or_else(|| Error::NotFound(format!("row {row_id} in table {table}")))?;
357 let bytes: &[u8] = existing.value();
358 bytes.to_vec()
359 };
360 let mut row = Self::decode_versioned_row(&bytes, table_meta.get(table))?;
361 row.deleted_tx = Some(*deleted_tx);
362 let encoded = Self::encode_versioned_row(&row, table_meta.get(table))?;
363 redb_table
364 .insert(row_id.0, encoded.as_slice())
365 .map_err(Self::storage_error)?;
366 }
367
368 {
369 let mut fwd_table = write_txn
370 .open_table(GRAPH_FWD_TABLE)
371 .map_err(Self::storage_error)?;
372 let mut rev_table = write_txn
373 .open_table(GRAPH_REV_TABLE)
374 .map_err(Self::storage_error)?;
375
376 for entry in &ws.adj_inserts {
377 let encoded = Self::encode(entry)?;
378 let fwd_key = Self::graph_fwd_key(entry);
379 let rev_key = Self::graph_rev_key(entry);
380 fwd_table
381 .insert(fwd_key.as_slice(), encoded.as_slice())
382 .map_err(Self::storage_error)?;
383 rev_table
384 .insert(rev_key.as_slice(), encoded.as_slice())
385 .map_err(Self::storage_error)?;
386 }
387
388 for (source, edge_type, target, deleted_tx) in &ws.adj_deletes {
389 let fwd_key = Self::graph_fwd_key_parts(source, target, edge_type);
390 let rev_key = Self::graph_rev_key_parts(source, target, edge_type);
391
392 let bytes = {
393 let fwd_existing = fwd_table
394 .get(fwd_key.as_slice())
395 .map_err(Self::storage_error)?
396 .ok_or_else(|| {
397 Error::NotFound(format!(
398 "edge {source} -[{edge_type}]-> {target} in graph_fwd"
399 ))
400 })?;
401 let bytes: &[u8] = fwd_existing.value();
402 bytes.to_vec()
403 };
404 let mut edge: AdjEntry = Self::decode(&bytes)?;
405 edge.deleted_tx = Some(*deleted_tx);
406 let encoded = Self::encode(&edge)?;
407
408 fwd_table
409 .insert(fwd_key.as_slice(), encoded.as_slice())
410 .map_err(Self::storage_error)?;
411 rev_table
412 .insert(rev_key.as_slice(), encoded.as_slice())
413 .map_err(Self::storage_error)?;
414 }
415 }
416
417 {
418 let mut vectors_table = write_txn
419 .open_table(VECTORS_TABLE)
420 .map_err(Self::storage_error)?;
421
422 for entry in &ws.vector_inserts {
423 let quantization = vector_quantization
424 .get(&entry.index)
425 .copied()
426 .unwrap_or_default();
427 let encoded = Self::encode_vector_entry(entry, quantization)?;
428 let key = Self::vector_key(entry);
429 vectors_table
430 .insert(key.as_slice(), encoded.as_slice())
431 .map_err(Self::storage_error)?;
432 }
433
434 for (index, row_id, deleted_tx) in &ws.vector_deletes {
435 let key = Self::vector_key_parts(index, *row_id);
436 let bytes = {
437 let existing = vectors_table
438 .get(key.as_slice())
439 .map_err(Self::storage_error)?
440 .ok_or_else(|| Error::NotFound(format!("vector row {row_id}")))?;
441 let bytes: &[u8] = existing.value();
442 bytes.to_vec()
443 };
444 let mut entry = Self::decode_vector_entry(&bytes)?;
445 entry.deleted_tx = Some(*deleted_tx);
446 let quantization = vector_quantization
447 .get(&entry.index)
448 .copied()
449 .unwrap_or_default();
450 let encoded = Self::encode_vector_entry(&entry, quantization)?;
451 vectors_table
452 .insert(key.as_slice(), encoded.as_slice())
453 .map_err(Self::storage_error)?;
454 }
455
456 for (index, old_row_id, new_row_id, tx) in &ws.vector_moves {
457 let old_key = Self::vector_key_parts(index, *old_row_id);
458 let bytes = {
459 let existing = vectors_table
460 .get(old_key.as_slice())
461 .map_err(Self::storage_error)?
462 .ok_or_else(|| Error::NotFound(format!("vector row {old_row_id}")))?;
463 let bytes: &[u8] = existing.value();
464 bytes.to_vec()
465 };
466 let mut old_entry = Self::decode_vector_entry(&bytes)?;
467 old_entry.deleted_tx = Some(*tx);
468 let quantization = vector_quantization
469 .get(&old_entry.index)
470 .copied()
471 .unwrap_or_default();
472 let old_encoded = Self::encode_vector_entry(&old_entry, quantization)?;
473 vectors_table
474 .insert(old_key.as_slice(), old_encoded.as_slice())
475 .map_err(Self::storage_error)?;
476
477 let mut new_entry = old_entry;
478 new_entry.row_id = *new_row_id;
479 new_entry.created_tx = *tx;
480 new_entry.deleted_tx = None;
481 new_entry.lsn = ws.commit_lsn.unwrap_or(Lsn(0));
482 let new_key = Self::vector_key(&new_entry);
483 let new_encoded = Self::encode_vector_entry(&new_entry, quantization)?;
484 vectors_table
485 .insert(new_key.as_slice(), new_encoded.as_slice())
486 .map_err(Self::storage_error)?;
487 }
488 }
489
490 if !change_log.is_empty() {
491 let mut table = write_txn
492 .open_table(CHANGE_LOG_TABLE)
493 .map_err(Self::storage_error)?;
494 let lsn = ws.commit_lsn.unwrap_or(Lsn(0));
495 for (index, entry) in change_log.iter().enumerate() {
496 let key = Self::change_log_key(lsn, index);
497 let encoded = Self::encode(entry)?;
498 table
499 .insert(key.as_str(), encoded.as_slice())
500 .map_err(Self::storage_error)?;
501 }
502 }
503
504 write_txn.commit().map_err(Self::storage_error)?;
505 Ok(())
506 })
507 }
508
509 pub fn flush_table_meta(&self, name: &str, meta: &TableMeta) -> Result<()> {
510 self.with_db(|db| {
511 let write_txn = db.begin_write().map_err(Self::storage_error)?;
512 {
513 let mut meta_table = write_txn
514 .open_table(META_TABLE)
515 .map_err(Self::storage_error)?;
516 let key = Self::meta_key(name);
517 let encoded = Self::encode(meta)?;
518 meta_table
519 .insert(key.as_str(), encoded.as_slice())
520 .map_err(Self::storage_error)?;
521 }
522 write_txn.commit().map_err(Self::storage_error)?;
523 Ok(())
524 })
525 }
526
527 pub fn remove_table_meta(&self, name: &str) -> Result<()> {
528 self.with_db(|db| {
529 let write_txn = db.begin_write().map_err(Self::storage_error)?;
530 {
531 let mut meta_table = write_txn
532 .open_table(META_TABLE)
533 .map_err(Self::storage_error)?;
534 let key = Self::meta_key(name);
535 meta_table
536 .remove(key.as_str())
537 .map_err(Self::storage_error)?;
538 }
539 write_txn.commit().map_err(Self::storage_error)?;
540 Ok(())
541 })
542 }
543
544 pub fn flush_config_value<T: serde::Serialize>(&self, key: &str, value: &T) -> Result<()> {
545 self.with_db(|db| {
546 let write_txn = db.begin_write().map_err(Self::storage_error)?;
547 {
548 let mut config_table = write_txn
549 .open_table(CONFIG_TABLE)
550 .map_err(Self::storage_error)?;
551 let encoded = Self::encode(value)?;
552 config_table
553 .insert(key, encoded.as_slice())
554 .map_err(Self::storage_error)?;
555 }
556 write_txn.commit().map_err(Self::storage_error)?;
557 Ok(())
558 })
559 }
560
561 pub fn remove_config_value(&self, key: &str) -> Result<()> {
562 self.with_db(|db| {
563 let write_txn = db.begin_write().map_err(Self::storage_error)?;
564 {
565 let mut config_table = write_txn
566 .open_table(CONFIG_TABLE)
567 .map_err(Self::storage_error)?;
568 config_table.remove(key).map_err(Self::storage_error)?;
569 }
570 write_txn.commit().map_err(Self::storage_error)?;
571 Ok(())
572 })
573 }
574
575 pub fn append_change_log(&self, lsn: Lsn, entries: &[ChangeLogEntry]) -> Result<()> {
576 if entries.is_empty() {
577 return Ok(());
578 }
579 self.with_db(|db| {
580 let write_txn = db.begin_write().map_err(Self::storage_error)?;
581 {
582 let mut table = write_txn
583 .open_table(CHANGE_LOG_TABLE)
584 .map_err(Self::storage_error)?;
585 for (index, entry) in entries.iter().enumerate() {
586 let key = Self::change_log_key(lsn, index);
587 let encoded = Self::encode(entry)?;
588 table
589 .insert(key.as_str(), encoded.as_slice())
590 .map_err(Self::storage_error)?;
591 }
592 }
593 write_txn.commit().map_err(Self::storage_error)?;
594 Ok(())
595 })
596 }
597
598 pub fn append_ddl_log(&self, lsn: Lsn, change: &DdlChange) -> Result<()> {
599 self.with_db(|db| {
600 let write_txn = db.begin_write().map_err(Self::storage_error)?;
601 {
602 let mut table = write_txn
603 .open_table(DDL_LOG_TABLE)
604 .map_err(Self::storage_error)?;
605 let key = Self::ddl_log_key(lsn);
606 let encoded = Self::encode(change)?;
607 table
608 .insert(key.as_str(), encoded.as_slice())
609 .map_err(Self::storage_error)?;
610 }
611 write_txn.commit().map_err(Self::storage_error)?;
612 Ok(())
613 })
614 }
615
616 pub fn remove_table_data(&self, name: &str) -> Result<()> {
617 self.with_db(|db| {
618 let write_txn = db.begin_write().map_err(Self::storage_error)?;
619 let table_name = Self::rel_table_name(name);
620 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new(table_name.as_str());
621 let _ = write_txn
622 .delete_table(table_def)
623 .map_err(Self::storage_error)?;
624 write_txn.commit().map_err(Self::storage_error)?;
625 Ok(())
626 })
627 }
628
629 pub fn rewrite_table_rows(&self, name: &str, rows: &[VersionedRow]) -> Result<()> {
630 let table_meta = self.load_all_table_meta()?;
631 self.with_db(|db| {
632 let write_txn = db.begin_write().map_err(Self::storage_error)?;
633 {
634 let table_name = Self::rel_table_name(name);
635 let table_def: TableDefinition<u64, &[u8]> =
636 TableDefinition::new(table_name.as_str());
637 let _ = write_txn.delete_table(table_def);
638 let mut redb_table = write_txn
639 .open_table(table_def)
640 .map_err(Self::storage_error)?;
641 for row in rows {
642 let encoded = Self::encode_versioned_row(row, table_meta.get(name))?;
643 redb_table
644 .insert(row.row_id.0, encoded.as_slice())
645 .map_err(Self::storage_error)?;
646 }
647 }
648 write_txn.commit().map_err(Self::storage_error)?;
649 Ok(())
650 })
651 }
652
653 pub fn rewrite_vectors(&self, vectors: &[VectorEntry]) -> Result<()> {
654 let table_meta = self.load_all_table_meta()?;
655 let vector_quantization = Self::vector_quantization_map(&table_meta);
656 self.with_db(|db| {
657 let write_txn = db.begin_write().map_err(Self::storage_error)?;
658 let _ = write_txn.delete_table(VECTORS_TABLE);
659 {
660 let mut table = write_txn
661 .open_table(VECTORS_TABLE)
662 .map_err(Self::storage_error)?;
663 for entry in vectors {
664 let quantization = vector_quantization
665 .get(&entry.index)
666 .copied()
667 .unwrap_or_default();
668 let encoded = Self::encode_vector_entry(entry, quantization)?;
669 let key = Self::vector_key(entry);
670 table
671 .insert(key.as_slice(), encoded.as_slice())
672 .map_err(Self::storage_error)?;
673 }
674 }
675 write_txn.commit().map_err(Self::storage_error)?;
676 Ok(())
677 })
678 }
679
680 pub fn rewrite_graph_edges(&self, edges: &[AdjEntry]) -> Result<()> {
681 self.with_db(|db| {
682 let write_txn = db.begin_write().map_err(Self::storage_error)?;
683 let _ = write_txn.delete_table(GRAPH_FWD_TABLE);
684 let _ = write_txn.delete_table(GRAPH_REV_TABLE);
685 {
686 let mut fwd_table = write_txn
687 .open_table(GRAPH_FWD_TABLE)
688 .map_err(Self::storage_error)?;
689 let mut rev_table = write_txn
690 .open_table(GRAPH_REV_TABLE)
691 .map_err(Self::storage_error)?;
692
693 for entry in edges {
694 let encoded = Self::encode(entry)?;
695 let fwd_key = Self::graph_fwd_key(entry);
696 let rev_key = Self::graph_rev_key(entry);
697 fwd_table
698 .insert(fwd_key.as_slice(), encoded.as_slice())
699 .map_err(Self::storage_error)?;
700 rev_table
701 .insert(rev_key.as_slice(), encoded.as_slice())
702 .map_err(Self::storage_error)?;
703 }
704 }
705 write_txn.commit().map_err(Self::storage_error)?;
706 Ok(())
707 })
708 }
709
710 pub fn load_all_table_meta(&self) -> Result<HashMap<String, TableMeta>> {
711 self.with_db(|db| {
712 let read_txn = db.begin_read().map_err(Self::storage_error)?;
713 let meta_table = match read_txn.open_table(META_TABLE) {
714 Ok(table) => table,
715 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(HashMap::new()),
716 Err(err) => return Err(Self::storage_error(err)),
717 };
718
719 let mut tables = HashMap::new();
720 for entry in meta_table.iter().map_err(Self::storage_error)? {
721 let (key, value) = entry.map_err(Self::storage_error)?;
722 let key = key.value();
723 if let Some(name) = key.strip_prefix("table:") {
724 tables.insert(name.to_string(), Self::decode(value.value())?);
725 }
726 }
727 Ok(tables)
728 })
729 }
730
731 pub fn load_config_value<T: serde::de::DeserializeOwned>(
732 &self,
733 key: &str,
734 ) -> Result<Option<T>> {
735 self.with_db(|db| {
736 let read_txn = db.begin_read().map_err(Self::storage_error)?;
737 let config_table = match read_txn.open_table(CONFIG_TABLE) {
738 Ok(table) => table,
739 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
740 Err(err) => return Err(Self::storage_error(err)),
741 };
742 let value = match config_table.get(key).map_err(Self::storage_error)? {
743 Some(value) => Some(Self::decode(value.value())?),
744 None => None,
745 };
746 Ok(value)
747 })
748 }
749
750 pub fn load_relational_table(&self, name: &str) -> Result<Vec<VersionedRow>> {
751 self.with_db(|db| {
752 let read_txn = db.begin_read().map_err(Self::storage_error)?;
753 let table_meta = Self::load_table_meta_in_read_txn(&read_txn, name)?;
754 let table_name = Self::rel_table_name(name);
755 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new(table_name.as_str());
756 let table = match read_txn.open_table(table_def) {
757 Ok(table) => table,
758 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
759 Err(err) => return Err(Self::storage_error(err)),
760 };
761
762 let mut rows = Vec::new();
763 for entry in table.iter().map_err(Self::storage_error)? {
764 let (_, value) = entry.map_err(Self::storage_error)?;
765 rows.push(Self::decode_versioned_row(
766 value.value(),
767 table_meta.as_ref(),
768 )?);
769 }
770 Ok(rows)
771 })
772 }
773
774 pub fn load_all_tables(&self) -> Result<HashMap<String, Vec<VersionedRow>>> {
775 let mut all_tables = HashMap::new();
776 for name in self.load_all_table_meta()?.into_keys() {
777 let rows = self.load_relational_table(&name)?;
778 all_tables.insert(name, rows);
779 }
780 Ok(all_tables)
781 }
782
783 pub fn load_forward_edges(&self) -> Result<Vec<AdjEntry>> {
784 self.load_graph_table(GRAPH_FWD_TABLE)
785 }
786
787 pub fn load_reverse_edges(&self) -> Result<Vec<AdjEntry>> {
788 self.load_graph_table(GRAPH_REV_TABLE)
789 }
790
791 pub fn load_vectors(&self) -> Result<Vec<VectorEntry>> {
792 self.with_db(|db| {
793 let read_txn = db.begin_read().map_err(Self::storage_error)?;
794 let table = match read_txn.open_table(VECTORS_TABLE) {
795 Ok(table) => table,
796 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
797 Err(err) => return Err(Self::storage_error(err)),
798 };
799
800 let mut vectors = Vec::new();
801 for entry in table.iter().map_err(Self::storage_error)? {
802 let (_, value) = entry.map_err(Self::storage_error)?;
803 vectors.push(Self::decode_vector_entry(value.value())?);
804 }
805 Ok(vectors)
806 })
807 }
808
809 pub fn load_change_log(&self) -> Result<Vec<ChangeLogEntry>> {
810 self.with_db(|db| {
811 let read_txn = db.begin_read().map_err(Self::storage_error)?;
812 let table = match read_txn.open_table(CHANGE_LOG_TABLE) {
813 Ok(table) => table,
814 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
815 Err(err) => return Err(Self::storage_error(err)),
816 };
817
818 let mut entries = Vec::new();
819 for entry in table.iter().map_err(Self::storage_error)? {
820 let (_, value) = entry.map_err(Self::storage_error)?;
821 entries.push(Self::decode(value.value())?);
822 }
823 Ok(entries)
824 })
825 }
826
827 pub fn load_ddl_log(&self) -> Result<Vec<(Lsn, DdlChange)>> {
828 self.with_db(|db| {
829 let read_txn = db.begin_read().map_err(Self::storage_error)?;
830 let table = match read_txn.open_table(DDL_LOG_TABLE) {
831 Ok(table) => table,
832 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
833 Err(err) => return Err(Self::storage_error(err)),
834 };
835
836 let mut entries = Vec::new();
837 for entry in table.iter().map_err(Self::storage_error)? {
838 let (key, value) = entry.map_err(Self::storage_error)?;
839 let lsn = key
840 .value()
841 .parse::<u64>()
842 .map_err(|err| Error::Other(format!("invalid ddl log key: {err}")))?;
843 entries.push((Lsn(lsn), Self::decode(value.value())?));
844 }
845 Ok(entries)
846 })
847 }
848
849 fn load_graph_table(&self, definition: TableDefinition<&[u8], &[u8]>) -> Result<Vec<AdjEntry>> {
850 self.with_db(|db| {
851 let read_txn = db.begin_read().map_err(Self::storage_error)?;
852 let table = match read_txn.open_table(definition) {
853 Ok(table) => table,
854 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
855 Err(err) => return Err(Self::storage_error(err)),
856 };
857
858 let mut entries = Vec::new();
859 for entry in table.iter().map_err(Self::storage_error)? {
860 let (_, value) = entry.map_err(Self::storage_error)?;
861 entries.push(Self::decode(value.value())?);
862 }
863 Ok(entries)
864 })
865 }
866
867 fn load_table_meta_in_read_txn(
868 read_txn: &redb::ReadTransaction,
869 name: &str,
870 ) -> Result<Option<TableMeta>> {
871 let meta_table = match read_txn.open_table(META_TABLE) {
872 Ok(table) => table,
873 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
874 Err(err) => return Err(Self::storage_error(err)),
875 };
876 let key = Self::meta_key(name);
877 meta_table
878 .get(key.as_str())
879 .map_err(Self::storage_error)?
880 .map(|value| Self::decode(value.value()))
881 .transpose()
882 }
883
884 fn vector_quantization_map(
885 table_meta: &HashMap<String, TableMeta>,
886 ) -> HashMap<VectorIndexRef, VectorQuantization> {
887 let mut indexes = HashMap::new();
888 for (table, meta) in table_meta {
889 for column in &meta.columns {
890 if matches!(column.column_type, ColumnType::Vector(_)) {
891 indexes.insert(
892 VectorIndexRef::new(table.clone(), column.name.clone()),
893 column.quantization,
894 );
895 }
896 }
897 }
898 indexes
899 }
900
901 fn column_quantization(meta: Option<&TableMeta>, column_name: &str) -> VectorQuantization {
902 meta.and_then(|meta| {
903 meta.columns
904 .iter()
905 .find(|column| {
906 column.name == column_name
907 && matches!(column.column_type, ColumnType::Vector(_))
908 })
909 .map(|column| column.quantization)
910 })
911 .unwrap_or_default()
912 }
913
914 fn encode_versioned_row(row: &VersionedRow, meta: Option<&TableMeta>) -> Result<Vec<u8>> {
915 let values = row
916 .values
917 .iter()
918 .map(|(column, value)| {
919 let persisted = match value {
920 Value::Vector(vector) => {
921 let quantization = Self::column_quantization(meta, column);
922 if matches!(quantization, VectorQuantization::F32) {
923 PersistedValue::Vector(PersistedVector::from_f32(vector, quantization))
924 } else {
925 PersistedValue::Plain(Value::Null)
926 }
927 }
928 _ => PersistedValue::Plain(value.clone()),
929 };
930 (column.clone(), persisted)
931 })
932 .collect::<HashMap<_, _>>();
933 Self::encode(&PersistedVersionedRow {
934 row_id: row.row_id,
935 values,
936 created_tx: row.created_tx,
937 deleted_tx: row.deleted_tx,
938 lsn: row.lsn,
939 created_at: row.created_at,
940 })
941 }
942
943 fn decode_versioned_row(bytes: &[u8], _meta: Option<&TableMeta>) -> Result<VersionedRow> {
944 let persisted: PersistedVersionedRow = Self::decode(bytes)?;
945 let values = persisted
946 .values
947 .into_iter()
948 .map(|(column, value)| {
949 let value = match value {
950 PersistedValue::Plain(value) => value,
951 PersistedValue::Vector(vector) => Value::Vector(vector.to_f32()),
952 };
953 (column, value)
954 })
955 .collect::<HashMap<_, _>>();
956 Ok(VersionedRow {
957 row_id: persisted.row_id,
958 values,
959 created_tx: persisted.created_tx,
960 deleted_tx: persisted.deleted_tx,
961 lsn: persisted.lsn,
962 created_at: persisted.created_at,
963 })
964 }
965
966 fn encode_vector_entry(
967 entry: &VectorEntry,
968 quantization: VectorQuantization,
969 ) -> Result<Vec<u8>> {
970 Self::encode(&PersistedVectorEntry {
971 index: entry.index.clone(),
972 row_id: entry.row_id,
973 vector: PersistedVector::from_f32(&entry.vector, quantization),
974 created_tx: entry.created_tx,
975 deleted_tx: entry.deleted_tx,
976 lsn: entry.lsn,
977 })
978 }
979
980 fn decode_vector_entry(bytes: &[u8]) -> Result<VectorEntry> {
981 let persisted: PersistedVectorEntry = Self::decode(bytes)?;
982 Ok(VectorEntry {
983 index: persisted.index,
984 row_id: persisted.row_id,
985 vector: persisted.vector.to_f32(),
986 created_tx: persisted.created_tx,
987 deleted_tx: persisted.deleted_tx,
988 lsn: persisted.lsn,
989 })
990 }
991
/// Name of the redb table that stores the rows of relational table `name`:
/// the name prefixed with `rel_`.
fn rel_table_name(name: &str) -> String {
    ["rel_", name].concat()
}
995
/// Key of table `name` inside the meta table: the name prefixed with
/// `table:`.
fn meta_key(name: &str) -> String {
    let mut key = String::with_capacity("table:".len() + name.len());
    key.push_str("table:");
    key.push_str(name);
    key
}
999
1000 fn change_log_key(lsn: Lsn, index: usize) -> String {
1001 format!("{:020}:{index:06}", lsn.0)
1002 }
1003
1004 fn ddl_log_key(lsn: Lsn) -> String {
1005 format!("{:020}", lsn.0)
1006 }
1007
1008 fn graph_fwd_key(entry: &AdjEntry) -> Vec<u8> {
1009 Self::graph_fwd_key_parts(&entry.source, &entry.target, &entry.edge_type)
1010 }
1011
1012 fn graph_rev_key(entry: &AdjEntry) -> Vec<u8> {
1013 Self::graph_rev_key_parts(&entry.source, &entry.target, &entry.edge_type)
1014 }
1015
1016 fn vector_key(entry: &VectorEntry) -> Vec<u8> {
1017 Self::vector_key_parts(&entry.index, entry.row_id)
1018 }
1019
1020 fn vector_key_parts(index: &VectorIndexRef, row_id: contextdb_core::RowId) -> Vec<u8> {
1021 let mut key = Vec::with_capacity(index.table.len() + index.column.len() + 18);
1022 key.extend_from_slice(index.table.as_bytes());
1023 key.push(0);
1024 key.extend_from_slice(index.column.as_bytes());
1025 key.push(0);
1026 key.extend_from_slice(&row_id.0.to_be_bytes());
1027 key
1028 }
1029
1030 fn graph_fwd_key_parts(source: &uuid::Uuid, target: &uuid::Uuid, edge_type: &str) -> Vec<u8> {
1031 let mut key = Vec::with_capacity(32 + edge_type.len());
1032 key.extend_from_slice(source.as_bytes());
1033 key.extend_from_slice(target.as_bytes());
1034 key.extend_from_slice(edge_type.as_bytes());
1035 key
1036 }
1037
1038 fn graph_rev_key_parts(source: &uuid::Uuid, target: &uuid::Uuid, edge_type: &str) -> Vec<u8> {
1039 let mut key = Vec::with_capacity(32 + edge_type.len());
1040 key.extend_from_slice(target.as_bytes());
1041 key.extend_from_slice(source.as_bytes());
1042 key.extend_from_slice(edge_type.as_bytes());
1043 key
1044 }
1045
1046 fn encode<T: serde::Serialize>(value: &T) -> Result<Vec<u8>> {
1047 bincode::serde::encode_to_vec(value, bincode::config::standard())
1048 .map_err(|err| Error::Other(format!("bincode encode error: {err}")))
1049 }
1050
1051 fn decode<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T> {
1052 let (value, _) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())
1053 .map_err(|err| Error::Other(format!("bincode decode error: {err}")))?;
1054 Ok(value)
1055 }
1056
1057 fn storage_error(err: impl std::fmt::Display) -> Error {
1058 let msg = err.to_string();
1059 if msg.contains("lock") || msg.contains("already open") {
1060 Error::Other(format!(
1061 "database is locked (another process may have it open): {msg}"
1062 ))
1063 } else {
1064 Error::Other(format!("redb error: {msg}"))
1065 }
1066 }
1067
1068 fn with_db<T>(&self, f: impl FnOnce(&redb::Database) -> Result<T>) -> Result<T> {
1069 let db = redb::Database::open(&self.path).map_err(Self::storage_error)?;
1070 f(&db)
1071 }
1072}
1073
1074impl Drop for RedbPersistence {
1075 fn drop(&mut self) {
1076 Self::release_pid_lock(&self.path);
1077 }
1078}