Skip to main content

contextdb_engine/
persistence.rs

1use crate::composite_store::ChangeLogEntry;
2use crate::sync_types::DdlChange;
3use contextdb_core::{
4    AdjEntry, ColumnType, Error, Lsn, Result, TableMeta, Value, VectorEntry, VectorIndexRef,
5    VectorQuantization, VersionedRow,
6};
7use contextdb_tx::WriteSet;
8use redb::{ReadableDatabase, ReadableTable, TableDefinition};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::panic::{AssertUnwindSafe, catch_unwind};
12use std::path::Path;
13
/// Per-table metadata; written by `flush_table_meta` under `meta_key(name)` and
/// read back by `load_all_table_meta`, which keeps keys prefixed with `table:`.
const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
/// Store-level format metadata; holds the encoded format-version marker.
const FORMAT_METADATA_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("metadata");
/// Arbitrary encoded configuration values keyed by caller-supplied strings.
const CONFIG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("config");
/// Encoded `ChangeLogEntry` records keyed by `change_log_key(lsn, index)`.
const CHANGE_LOG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("change_log");
/// Encoded `DdlChange` records keyed by `ddl_log_key(lsn)`.
const DDL_LOG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("ddl_log");
/// Encoded `AdjEntry` records keyed by the forward graph key.
const GRAPH_FWD_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("graph_fwd");
/// The same encoded `AdjEntry` records keyed by the reverse graph key.
const GRAPH_REV_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("graph_rev");
/// Encoded vector entries keyed by `vector_key`; note the redb table name is `vector_entries`.
const VECTORS_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("vector_entries");
/// Key inside `FORMAT_METADATA_TABLE` under which the format version is stored.
const FORMAT_VERSION_KEY: &str = "format_version";
/// Format version written on `create` and required (exact match) on `open`.
const CURRENT_FORMAT_VERSION: &str = "1.0.0";
24
/// Handle to a redb-backed store, identified only by its file path.
///
/// `create` and `open` open the database to initialise or validate it but do
/// not keep the `redb::Database` value; only the path is retained here.
pub struct RedbPersistence {
    /// Location of the redb file; the PID lock file is this path with a `lock` extension.
    path: std::path::PathBuf,
}
28
/// Serialized form of a versioned row.
///
/// NOTE(review): field order is presumably part of the encoded layout (depends on
/// the codec behind `encode`/`decode`) — confirm before reordering fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedVersionedRow {
    row_id: contextdb_core::RowId,
    // presumably keyed by column name; verify against `encode_versioned_row`
    values: HashMap<String, PersistedValue>,
    created_tx: contextdb_core::TxId,
    // `Some(tx)` once the row has been soft-deleted by `tx`
    deleted_tx: Option<contextdb_core::TxId>,
    lsn: Lsn,
    created_at: Option<contextdb_core::Wallclock>,
}
38
/// A single stored cell: either a core `Value` kept as-is, or a vector stored
/// through `PersistedVector` (which may be quantized).
#[derive(Debug, Clone, Serialize, Deserialize)]
enum PersistedValue {
    Plain(Value),
    Vector(PersistedVector),
}
44
/// Serialized form of one vector-index entry; the vector payload is held as a
/// `PersistedVector`, so it may be stored quantized.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedVectorEntry {
    index: VectorIndexRef,
    row_id: contextdb_core::RowId,
    vector: PersistedVector,
    created_tx: contextdb_core::TxId,
    // `Some(tx)` once the entry has been soft-deleted by `tx`
    deleted_tx: Option<contextdb_core::TxId>,
    lsn: Lsn,
}
54
/// On-disk representation of a vector, optionally scalar-quantized.
///
/// For the quantized variants, `min`/`max` are the smallest and largest
/// components of the source vector and `len` is its component count.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum PersistedVector {
    /// Components stored verbatim.
    F32(Vec<f32>),
    /// One byte per component, linearly mapped from `[min, max]` onto `0..=255`.
    SQ8 {
        min: f32,
        max: f32,
        len: u32,
        payload: Vec<u8>,
    },
    /// Two 4-bit codes per byte (first component in the high nibble), each
    /// linearly mapped from `[min, max]` onto `0..=15`.
    SQ4 {
        min: f32,
        max: f32,
        len: u32,
        payload: Vec<u8>,
    },
}
71
72impl PersistedVector {
73    fn from_f32(vector: &[f32], quantization: VectorQuantization) -> Self {
74        match quantization {
75            VectorQuantization::F32 => PersistedVector::F32(vector.to_vec()),
76            VectorQuantization::SQ8 => {
77                let (min, max) = vector_min_max(vector);
78                let range = max - min;
79                let payload = if range <= f32::EPSILON {
80                    vec![0; vector.len()]
81                } else {
82                    vector
83                        .iter()
84                        .map(|value| {
85                            (((*value - min) / range) * 255.0).round().clamp(0.0, 255.0) as u8
86                        })
87                        .collect()
88                };
89                PersistedVector::SQ8 {
90                    min,
91                    max,
92                    len: vector.len() as u32,
93                    payload,
94                }
95            }
96            VectorQuantization::SQ4 => {
97                let (min, max) = vector_min_max(vector);
98                let range = max - min;
99                let mut payload = Vec::with_capacity(vector.len().div_ceil(2));
100                let quantized = if range <= f32::EPSILON {
101                    vec![0; vector.len()]
102                } else {
103                    vector
104                        .iter()
105                        .map(|value| {
106                            (((*value - min) / range) * 15.0).round().clamp(0.0, 15.0) as u8
107                        })
108                        .collect::<Vec<_>>()
109                };
110                for pair in quantized.chunks(2) {
111                    let hi = pair[0] & 0x0f;
112                    let lo = pair.get(1).copied().unwrap_or(0) & 0x0f;
113                    payload.push((hi << 4) | lo);
114                }
115                PersistedVector::SQ4 {
116                    min,
117                    max,
118                    len: vector.len() as u32,
119                    payload,
120                }
121            }
122        }
123    }
124
125    fn to_f32(&self) -> Vec<f32> {
126        match self {
127            PersistedVector::F32(vector) => vector.clone(),
128            PersistedVector::SQ8 {
129                min,
130                max,
131                len,
132                payload,
133            } => {
134                let range = *max - *min;
135                payload
136                    .iter()
137                    .take(*len as usize)
138                    .map(|byte| {
139                        if range <= f32::EPSILON {
140                            *min
141                        } else {
142                            *min + ((*byte as f32) / 255.0) * range
143                        }
144                    })
145                    .collect()
146            }
147            PersistedVector::SQ4 {
148                min,
149                max,
150                len,
151                payload,
152            } => {
153                let range = *max - *min;
154                let mut values = Vec::with_capacity(*len as usize);
155                for byte in payload {
156                    for q in [byte >> 4, byte & 0x0f] {
157                        if values.len() == *len as usize {
158                            break;
159                        }
160                        values.push(if range <= f32::EPSILON {
161                            *min
162                        } else {
163                            *min + ((q as f32) / 15.0) * range
164                        });
165                    }
166                }
167                values
168            }
169        }
170    }
171}
172
/// Returns the `(smallest, largest)` component of `vector`, or `(0.0, 0.0)`
/// when the slice is empty. Comparison uses `f32::min` / `f32::max`.
fn vector_min_max(vector: &[f32]) -> (f32, f32) {
    let mut components = vector.iter().copied();
    let (mut lowest, mut highest) = match components.next() {
        Some(seed) => (seed, seed),
        None => return (0.0, 0.0),
    };
    for component in components {
        lowest = lowest.min(component);
        highest = highest.max(component);
    }
    (lowest, highest)
}
183
184impl RedbPersistence {
185    pub fn create(path: &Path) -> Result<Self> {
186        Self::acquire_pid_lock(path)?;
187        let result = (|| {
188            let db = redb::Database::create(path).map_err(Self::storage_error)?;
189            Self::write_format_marker(&db)?;
190            Ok(Self {
191                path: path.to_path_buf(),
192            })
193        })();
194        if result.is_err() {
195            Self::release_pid_lock(path);
196        }
197        result
198    }
199
200    pub fn open(path: &Path) -> Result<Self> {
201        Self::acquire_pid_lock(path)?;
202        let result = (|| {
203            let db = Self::open_db_checked(path)?;
204            Self::validate_format_marker(&db, path)?;
205            Ok(Self {
206                path: path.to_path_buf(),
207            })
208        })();
209        if result.is_err() {
210            Self::release_pid_lock(path);
211        }
212        result
213    }
214
215    pub fn close(&self) {
216        Self::release_pid_lock(&self.path);
217    }
218
219    pub fn path(&self) -> &Path {
220        &self.path
221    }
222
223    /// PID-based advisory lock. Writes current PID to a .lock file.
224    /// On open, checks if the .lock file exists and if the PID in it is still alive.
225    fn acquire_pid_lock(path: &Path) -> Result<()> {
226        let lock_path = path.with_extension("lock");
227        if lock_path.exists()
228            && let Ok(contents) = std::fs::read_to_string(&lock_path)
229            && let Ok(pid) = contents.trim().parse::<u32>()
230        {
231            let proc_path = format!("/proc/{}", pid);
232            if std::path::Path::new(&proc_path).exists() && pid != std::process::id() {
233                return Err(Error::Other(
234                    "database is locked (another process may have it open)".to_string(),
235                ));
236            }
237            // Either PID not running (stale lock) or same process — overwrite
238        }
239        std::fs::write(&lock_path, std::process::id().to_string()).map_err(Self::storage_error)?;
240        Ok(())
241    }
242
243    fn release_pid_lock(path: &Path) {
244        let lock_path = path.with_extension("lock");
245        let _ = std::fs::remove_file(&lock_path);
246    }
247
248    fn open_db_checked(path: &Path) -> Result<redb::Database> {
249        match catch_unwind(AssertUnwindSafe(|| redb::Database::open(path))) {
250            Ok(Ok(db)) => Ok(db),
251            Ok(Err(err)) => Err(Error::StoreCorrupted {
252                path: path.display().to_string(),
253                reason: format!("metadata/format could not be read: {err}"),
254            }),
255            Err(_) => Err(Error::StoreCorrupted {
256                path: path.display().to_string(),
257                reason: "metadata/format read panicked; store may be truncated or corrupt"
258                    .to_string(),
259            }),
260        }
261    }
262
263    fn write_format_marker(db: &redb::Database) -> Result<()> {
264        let write_txn = db.begin_write().map_err(Self::storage_error)?;
265        {
266            let mut table = write_txn
267                .open_table(FORMAT_METADATA_TABLE)
268                .map_err(Self::storage_error)?;
269            let encoded = Self::encode(&CURRENT_FORMAT_VERSION.to_string())?;
270            table
271                .insert(FORMAT_VERSION_KEY, encoded.as_slice())
272                .map_err(Self::storage_error)?;
273        }
274        write_txn.commit().map_err(Self::storage_error)?;
275        Ok(())
276    }
277
278    fn validate_format_marker(db: &redb::Database, path: &Path) -> Result<()> {
279        let read_txn = db.begin_read().map_err(|err| Error::StoreCorrupted {
280            path: path.display().to_string(),
281            reason: format!("metadata read failed: {err}"),
282        })?;
283        let table = match read_txn.open_table(FORMAT_METADATA_TABLE) {
284            Ok(table) => table,
285            Err(redb::TableError::TableDoesNotExist(_)) => {
286                return Err(Error::LegacyVectorStoreDetected {
287                    found_format_marker: String::new(),
288                    expected_release: CURRENT_FORMAT_VERSION.to_string(),
289                });
290            }
291            Err(err) => {
292                return Err(Error::StoreCorrupted {
293                    path: path.display().to_string(),
294                    reason: format!("metadata table could not be read: {err}"),
295                });
296            }
297        };
298        let value = table
299            .get(FORMAT_VERSION_KEY)
300            .map_err(|err| Error::StoreCorrupted {
301                path: path.display().to_string(),
302                reason: format!("metadata format_version could not be read: {err}"),
303            })?
304            .ok_or_else(|| Error::StoreCorrupted {
305                path: path.display().to_string(),
306                reason: "metadata table is missing format_version".to_string(),
307            })?;
308        let marker: String = Self::decode(value.value()).map_err(|err| Error::StoreCorrupted {
309            path: path.display().to_string(),
310            reason: format!("metadata format_version is corrupt: {err}"),
311        })?;
312        if marker == CURRENT_FORMAT_VERSION {
313            Ok(())
314        } else {
315            Err(Error::LegacyVectorStoreDetected {
316                found_format_marker: marker,
317                expected_release: CURRENT_FORMAT_VERSION.to_string(),
318            })
319        }
320    }
321
322    pub fn flush_data(&self, ws: &WriteSet) -> Result<()> {
323        self.flush_data_with_logs(ws, &[])
324    }
325
    /// Persists every change in `ws`, plus `change_log`, inside a single redb
    /// write transaction that is committed at the end.
    ///
    /// Table metadata is loaded up front (outside the write transaction) and is
    /// used to encode rows and to pick each vector index's quantization.
    /// Deletes are soft: the stored record is read, its `deleted_tx` is set and
    /// it is written back under the same key.
    ///
    /// # Errors
    /// Returns `Error::NotFound` when a delete or move targets a record that is
    /// not stored, and propagates storage and encode/decode errors. On any
    /// early return the transaction is dropped without being committed.
    pub fn flush_data_with_logs(&self, ws: &WriteSet, change_log: &[ChangeLogEntry]) -> Result<()> {
        let table_meta = self.load_all_table_meta()?;
        let vector_quantization = Self::vector_quantization_map(&table_meta);
        self.with_db(|db| {
            let write_txn = db.begin_write().map_err(Self::storage_error)?;

            // Relational inserts: one redb table per logical table, keyed by row id.
            for (table, row) in &ws.relational_inserts {
                let table_name = Self::rel_table_name(table);
                let table_def: TableDefinition<u64, &[u8]> =
                    TableDefinition::new(table_name.as_str());
                let mut redb_table = write_txn
                    .open_table(table_def)
                    .map_err(Self::storage_error)?;
                let encoded = Self::encode_versioned_row(row, table_meta.get(table))?;
                redb_table
                    .insert(row.row_id.0, encoded.as_slice())
                    .map_err(Self::storage_error)?;
            }

            // Relational deletes: mark the stored row as deleted and write it back.
            for (table, row_id, deleted_tx) in &ws.relational_deletes {
                let table_name = Self::rel_table_name(table);
                let table_def: TableDefinition<u64, &[u8]> =
                    TableDefinition::new(table_name.as_str());
                let mut redb_table = write_txn
                    .open_table(table_def)
                    .map_err(Self::storage_error)?;
                // Copy the bytes out so the read guard is released before the insert below.
                let bytes = {
                    let existing = redb_table
                        .get(row_id.0)
                        .map_err(Self::storage_error)?
                        .ok_or_else(|| Error::NotFound(format!("row {row_id} in table {table}")))?;
                    let bytes: &[u8] = existing.value();
                    bytes.to_vec()
                };
                let mut row = Self::decode_versioned_row(&bytes, table_meta.get(table))?;
                row.deleted_tx = Some(*deleted_tx);
                let encoded = Self::encode_versioned_row(&row, table_meta.get(table))?;
                redb_table
                    .insert(row_id.0, encoded.as_slice())
                    .map_err(Self::storage_error)?;
            }

            // Graph edges: every edge is stored twice, under a forward and a reverse key.
            {
                let mut fwd_table = write_txn
                    .open_table(GRAPH_FWD_TABLE)
                    .map_err(Self::storage_error)?;
                let mut rev_table = write_txn
                    .open_table(GRAPH_REV_TABLE)
                    .map_err(Self::storage_error)?;

                for entry in &ws.adj_inserts {
                    let encoded = Self::encode(entry)?;
                    let fwd_key = Self::graph_fwd_key(entry);
                    let rev_key = Self::graph_rev_key(entry);
                    fwd_table
                        .insert(fwd_key.as_slice(), encoded.as_slice())
                        .map_err(Self::storage_error)?;
                    rev_table
                        .insert(rev_key.as_slice(), encoded.as_slice())
                        .map_err(Self::storage_error)?;
                }

                for (source, edge_type, target, deleted_tx) in &ws.adj_deletes {
                    let fwd_key = Self::graph_fwd_key_parts(source, target, edge_type);
                    let rev_key = Self::graph_rev_key_parts(source, target, edge_type);

                    // The forward copy is the one that is read; the updated edge is
                    // then written to both tables.
                    let bytes = {
                        let fwd_existing = fwd_table
                            .get(fwd_key.as_slice())
                            .map_err(Self::storage_error)?
                            .ok_or_else(|| {
                                Error::NotFound(format!(
                                    "edge {source} -[{edge_type}]-> {target} in graph_fwd"
                                ))
                            })?;
                        let bytes: &[u8] = fwd_existing.value();
                        bytes.to_vec()
                    };
                    let mut edge: AdjEntry = Self::decode(&bytes)?;
                    edge.deleted_tx = Some(*deleted_tx);
                    let encoded = Self::encode(&edge)?;

                    fwd_table
                        .insert(fwd_key.as_slice(), encoded.as_slice())
                        .map_err(Self::storage_error)?;
                    rev_table
                        .insert(rev_key.as_slice(), encoded.as_slice())
                        .map_err(Self::storage_error)?;
                }
            }

            // Vector entries: inserts, soft deletes, then moves.
            {
                let mut vectors_table = write_txn
                    .open_table(VECTORS_TABLE)
                    .map_err(Self::storage_error)?;

                for entry in &ws.vector_inserts {
                    // Indexes without a known quantization use the type's default.
                    let quantization = vector_quantization
                        .get(&entry.index)
                        .copied()
                        .unwrap_or_default();
                    let encoded = Self::encode_vector_entry(entry, quantization)?;
                    let key = Self::vector_key(entry);
                    vectors_table
                        .insert(key.as_slice(), encoded.as_slice())
                        .map_err(Self::storage_error)?;
                }

                for (index, row_id, deleted_tx) in &ws.vector_deletes {
                    let key = Self::vector_key_parts(index, *row_id);
                    let bytes = {
                        let existing = vectors_table
                            .get(key.as_slice())
                            .map_err(Self::storage_error)?
                            .ok_or_else(|| Error::NotFound(format!("vector row {row_id}")))?;
                        let bytes: &[u8] = existing.value();
                        bytes.to_vec()
                    };
                    let mut entry = Self::decode_vector_entry(&bytes)?;
                    entry.deleted_tx = Some(*deleted_tx);
                    let quantization = vector_quantization
                        .get(&entry.index)
                        .copied()
                        .unwrap_or_default();
                    let encoded = Self::encode_vector_entry(&entry, quantization)?;
                    vectors_table
                        .insert(key.as_slice(), encoded.as_slice())
                        .map_err(Self::storage_error)?;
                }

                // A move marks the entry under the old row id as deleted and stores a
                // live copy under the new row id, both attributed to `tx`.
                for (index, old_row_id, new_row_id, tx) in &ws.vector_moves {
                    let old_key = Self::vector_key_parts(index, *old_row_id);
                    let bytes = {
                        let existing = vectors_table
                            .get(old_key.as_slice())
                            .map_err(Self::storage_error)?
                            .ok_or_else(|| Error::NotFound(format!("vector row {old_row_id}")))?;
                        let bytes: &[u8] = existing.value();
                        bytes.to_vec()
                    };
                    let mut old_entry = Self::decode_vector_entry(&bytes)?;
                    old_entry.deleted_tx = Some(*tx);
                    let quantization = vector_quantization
                        .get(&old_entry.index)
                        .copied()
                        .unwrap_or_default();
                    let old_encoded = Self::encode_vector_entry(&old_entry, quantization)?;
                    vectors_table
                        .insert(old_key.as_slice(), old_encoded.as_slice())
                        .map_err(Self::storage_error)?;

                    let mut new_entry = old_entry;
                    new_entry.row_id = *new_row_id;
                    new_entry.created_tx = *tx;
                    new_entry.deleted_tx = None;
                    // Falls back to LSN 0 when the write set carries no commit LSN.
                    new_entry.lsn = ws.commit_lsn.unwrap_or(Lsn(0));
                    let new_key = Self::vector_key(&new_entry);
                    let new_encoded = Self::encode_vector_entry(&new_entry, quantization)?;
                    vectors_table
                        .insert(new_key.as_slice(), new_encoded.as_slice())
                        .map_err(Self::storage_error)?;
                }
            }

            // Change-log entries are keyed by (commit LSN, position in `change_log`);
            // the table is only opened when there is something to write.
            if !change_log.is_empty() {
                let mut table = write_txn
                    .open_table(CHANGE_LOG_TABLE)
                    .map_err(Self::storage_error)?;
                // Falls back to LSN 0 when the write set carries no commit LSN.
                let lsn = ws.commit_lsn.unwrap_or(Lsn(0));
                for (index, entry) in change_log.iter().enumerate() {
                    let key = Self::change_log_key(lsn, index);
                    let encoded = Self::encode(entry)?;
                    table
                        .insert(key.as_str(), encoded.as_slice())
                        .map_err(Self::storage_error)?;
                }
            }

            write_txn.commit().map_err(Self::storage_error)?;
            Ok(())
        })
    }
508
509    pub fn flush_table_meta(&self, name: &str, meta: &TableMeta) -> Result<()> {
510        self.with_db(|db| {
511            let write_txn = db.begin_write().map_err(Self::storage_error)?;
512            {
513                let mut meta_table = write_txn
514                    .open_table(META_TABLE)
515                    .map_err(Self::storage_error)?;
516                let key = Self::meta_key(name);
517                let encoded = Self::encode(meta)?;
518                meta_table
519                    .insert(key.as_str(), encoded.as_slice())
520                    .map_err(Self::storage_error)?;
521            }
522            write_txn.commit().map_err(Self::storage_error)?;
523            Ok(())
524        })
525    }
526
527    pub fn remove_table_meta(&self, name: &str) -> Result<()> {
528        self.with_db(|db| {
529            let write_txn = db.begin_write().map_err(Self::storage_error)?;
530            {
531                let mut meta_table = write_txn
532                    .open_table(META_TABLE)
533                    .map_err(Self::storage_error)?;
534                let key = Self::meta_key(name);
535                meta_table
536                    .remove(key.as_str())
537                    .map_err(Self::storage_error)?;
538            }
539            write_txn.commit().map_err(Self::storage_error)?;
540            Ok(())
541        })
542    }
543
544    pub fn flush_config_value<T: serde::Serialize>(&self, key: &str, value: &T) -> Result<()> {
545        self.with_db(|db| {
546            let write_txn = db.begin_write().map_err(Self::storage_error)?;
547            {
548                let mut config_table = write_txn
549                    .open_table(CONFIG_TABLE)
550                    .map_err(Self::storage_error)?;
551                let encoded = Self::encode(value)?;
552                config_table
553                    .insert(key, encoded.as_slice())
554                    .map_err(Self::storage_error)?;
555            }
556            write_txn.commit().map_err(Self::storage_error)?;
557            Ok(())
558        })
559    }
560
561    pub fn remove_config_value(&self, key: &str) -> Result<()> {
562        self.with_db(|db| {
563            let write_txn = db.begin_write().map_err(Self::storage_error)?;
564            {
565                let mut config_table = write_txn
566                    .open_table(CONFIG_TABLE)
567                    .map_err(Self::storage_error)?;
568                config_table.remove(key).map_err(Self::storage_error)?;
569            }
570            write_txn.commit().map_err(Self::storage_error)?;
571            Ok(())
572        })
573    }
574
575    pub fn append_change_log(&self, lsn: Lsn, entries: &[ChangeLogEntry]) -> Result<()> {
576        if entries.is_empty() {
577            return Ok(());
578        }
579        self.with_db(|db| {
580            let write_txn = db.begin_write().map_err(Self::storage_error)?;
581            {
582                let mut table = write_txn
583                    .open_table(CHANGE_LOG_TABLE)
584                    .map_err(Self::storage_error)?;
585                for (index, entry) in entries.iter().enumerate() {
586                    let key = Self::change_log_key(lsn, index);
587                    let encoded = Self::encode(entry)?;
588                    table
589                        .insert(key.as_str(), encoded.as_slice())
590                        .map_err(Self::storage_error)?;
591                }
592            }
593            write_txn.commit().map_err(Self::storage_error)?;
594            Ok(())
595        })
596    }
597
598    pub fn append_ddl_log(&self, lsn: Lsn, change: &DdlChange) -> Result<()> {
599        self.with_db(|db| {
600            let write_txn = db.begin_write().map_err(Self::storage_error)?;
601            {
602                let mut table = write_txn
603                    .open_table(DDL_LOG_TABLE)
604                    .map_err(Self::storage_error)?;
605                let key = Self::ddl_log_key(lsn);
606                let encoded = Self::encode(change)?;
607                table
608                    .insert(key.as_str(), encoded.as_slice())
609                    .map_err(Self::storage_error)?;
610            }
611            write_txn.commit().map_err(Self::storage_error)?;
612            Ok(())
613        })
614    }
615
616    pub fn remove_table_data(&self, name: &str) -> Result<()> {
617        self.with_db(|db| {
618            let write_txn = db.begin_write().map_err(Self::storage_error)?;
619            let table_name = Self::rel_table_name(name);
620            let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new(table_name.as_str());
621            let _ = write_txn
622                .delete_table(table_def)
623                .map_err(Self::storage_error)?;
624            write_txn.commit().map_err(Self::storage_error)?;
625            Ok(())
626        })
627    }
628
629    pub fn rewrite_table_rows(&self, name: &str, rows: &[VersionedRow]) -> Result<()> {
630        let table_meta = self.load_all_table_meta()?;
631        self.with_db(|db| {
632            let write_txn = db.begin_write().map_err(Self::storage_error)?;
633            {
634                let table_name = Self::rel_table_name(name);
635                let table_def: TableDefinition<u64, &[u8]> =
636                    TableDefinition::new(table_name.as_str());
637                let _ = write_txn.delete_table(table_def);
638                let mut redb_table = write_txn
639                    .open_table(table_def)
640                    .map_err(Self::storage_error)?;
641                for row in rows {
642                    let encoded = Self::encode_versioned_row(row, table_meta.get(name))?;
643                    redb_table
644                        .insert(row.row_id.0, encoded.as_slice())
645                        .map_err(Self::storage_error)?;
646                }
647            }
648            write_txn.commit().map_err(Self::storage_error)?;
649            Ok(())
650        })
651    }
652
653    pub fn rewrite_vectors(&self, vectors: &[VectorEntry]) -> Result<()> {
654        let table_meta = self.load_all_table_meta()?;
655        let vector_quantization = Self::vector_quantization_map(&table_meta);
656        self.with_db(|db| {
657            let write_txn = db.begin_write().map_err(Self::storage_error)?;
658            let _ = write_txn.delete_table(VECTORS_TABLE);
659            {
660                let mut table = write_txn
661                    .open_table(VECTORS_TABLE)
662                    .map_err(Self::storage_error)?;
663                for entry in vectors {
664                    let quantization = vector_quantization
665                        .get(&entry.index)
666                        .copied()
667                        .unwrap_or_default();
668                    let encoded = Self::encode_vector_entry(entry, quantization)?;
669                    let key = Self::vector_key(entry);
670                    table
671                        .insert(key.as_slice(), encoded.as_slice())
672                        .map_err(Self::storage_error)?;
673                }
674            }
675            write_txn.commit().map_err(Self::storage_error)?;
676            Ok(())
677        })
678    }
679
680    pub fn rewrite_graph_edges(&self, edges: &[AdjEntry]) -> Result<()> {
681        self.with_db(|db| {
682            let write_txn = db.begin_write().map_err(Self::storage_error)?;
683            let _ = write_txn.delete_table(GRAPH_FWD_TABLE);
684            let _ = write_txn.delete_table(GRAPH_REV_TABLE);
685            {
686                let mut fwd_table = write_txn
687                    .open_table(GRAPH_FWD_TABLE)
688                    .map_err(Self::storage_error)?;
689                let mut rev_table = write_txn
690                    .open_table(GRAPH_REV_TABLE)
691                    .map_err(Self::storage_error)?;
692
693                for entry in edges {
694                    let encoded = Self::encode(entry)?;
695                    let fwd_key = Self::graph_fwd_key(entry);
696                    let rev_key = Self::graph_rev_key(entry);
697                    fwd_table
698                        .insert(fwd_key.as_slice(), encoded.as_slice())
699                        .map_err(Self::storage_error)?;
700                    rev_table
701                        .insert(rev_key.as_slice(), encoded.as_slice())
702                        .map_err(Self::storage_error)?;
703                }
704            }
705            write_txn.commit().map_err(Self::storage_error)?;
706            Ok(())
707        })
708    }
709
710    pub fn load_all_table_meta(&self) -> Result<HashMap<String, TableMeta>> {
711        self.with_db(|db| {
712            let read_txn = db.begin_read().map_err(Self::storage_error)?;
713            let meta_table = match read_txn.open_table(META_TABLE) {
714                Ok(table) => table,
715                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(HashMap::new()),
716                Err(err) => return Err(Self::storage_error(err)),
717            };
718
719            let mut tables = HashMap::new();
720            for entry in meta_table.iter().map_err(Self::storage_error)? {
721                let (key, value) = entry.map_err(Self::storage_error)?;
722                let key = key.value();
723                if let Some(name) = key.strip_prefix("table:") {
724                    tables.insert(name.to_string(), Self::decode(value.value())?);
725                }
726            }
727            Ok(tables)
728        })
729    }
730
731    pub fn load_config_value<T: serde::de::DeserializeOwned>(
732        &self,
733        key: &str,
734    ) -> Result<Option<T>> {
735        self.with_db(|db| {
736            let read_txn = db.begin_read().map_err(Self::storage_error)?;
737            let config_table = match read_txn.open_table(CONFIG_TABLE) {
738                Ok(table) => table,
739                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
740                Err(err) => return Err(Self::storage_error(err)),
741            };
742            let value = match config_table.get(key).map_err(Self::storage_error)? {
743                Some(value) => Some(Self::decode(value.value())?),
744                None => None,
745            };
746            Ok(value)
747        })
748    }
749
750    pub fn load_relational_table(&self, name: &str) -> Result<Vec<VersionedRow>> {
751        self.with_db(|db| {
752            let read_txn = db.begin_read().map_err(Self::storage_error)?;
753            let table_meta = Self::load_table_meta_in_read_txn(&read_txn, name)?;
754            let table_name = Self::rel_table_name(name);
755            let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new(table_name.as_str());
756            let table = match read_txn.open_table(table_def) {
757                Ok(table) => table,
758                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
759                Err(err) => return Err(Self::storage_error(err)),
760            };
761
762            let mut rows = Vec::new();
763            for entry in table.iter().map_err(Self::storage_error)? {
764                let (_, value) = entry.map_err(Self::storage_error)?;
765                rows.push(Self::decode_versioned_row(
766                    value.value(),
767                    table_meta.as_ref(),
768                )?);
769            }
770            Ok(rows)
771        })
772    }
773
774    pub fn load_all_tables(&self) -> Result<HashMap<String, Vec<VersionedRow>>> {
775        let mut all_tables = HashMap::new();
776        for name in self.load_all_table_meta()?.into_keys() {
777            let rows = self.load_relational_table(&name)?;
778            all_tables.insert(name, rows);
779        }
780        Ok(all_tables)
781    }
782
783    pub fn load_forward_edges(&self) -> Result<Vec<AdjEntry>> {
784        self.load_graph_table(GRAPH_FWD_TABLE)
785    }
786
787    pub fn load_reverse_edges(&self) -> Result<Vec<AdjEntry>> {
788        self.load_graph_table(GRAPH_REV_TABLE)
789    }
790
791    pub fn load_vectors(&self) -> Result<Vec<VectorEntry>> {
792        self.with_db(|db| {
793            let read_txn = db.begin_read().map_err(Self::storage_error)?;
794            let table = match read_txn.open_table(VECTORS_TABLE) {
795                Ok(table) => table,
796                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
797                Err(err) => return Err(Self::storage_error(err)),
798            };
799
800            let mut vectors = Vec::new();
801            for entry in table.iter().map_err(Self::storage_error)? {
802                let (_, value) = entry.map_err(Self::storage_error)?;
803                vectors.push(Self::decode_vector_entry(value.value())?);
804            }
805            Ok(vectors)
806        })
807    }
808
809    pub fn load_change_log(&self) -> Result<Vec<ChangeLogEntry>> {
810        self.with_db(|db| {
811            let read_txn = db.begin_read().map_err(Self::storage_error)?;
812            let table = match read_txn.open_table(CHANGE_LOG_TABLE) {
813                Ok(table) => table,
814                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
815                Err(err) => return Err(Self::storage_error(err)),
816            };
817
818            let mut entries = Vec::new();
819            for entry in table.iter().map_err(Self::storage_error)? {
820                let (_, value) = entry.map_err(Self::storage_error)?;
821                entries.push(Self::decode(value.value())?);
822            }
823            Ok(entries)
824        })
825    }
826
827    pub fn load_ddl_log(&self) -> Result<Vec<(Lsn, DdlChange)>> {
828        self.with_db(|db| {
829            let read_txn = db.begin_read().map_err(Self::storage_error)?;
830            let table = match read_txn.open_table(DDL_LOG_TABLE) {
831                Ok(table) => table,
832                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
833                Err(err) => return Err(Self::storage_error(err)),
834            };
835
836            let mut entries = Vec::new();
837            for entry in table.iter().map_err(Self::storage_error)? {
838                let (key, value) = entry.map_err(Self::storage_error)?;
839                let lsn = key
840                    .value()
841                    .parse::<u64>()
842                    .map_err(|err| Error::Other(format!("invalid ddl log key: {err}")))?;
843                entries.push((Lsn(lsn), Self::decode(value.value())?));
844            }
845            Ok(entries)
846        })
847    }
848
849    fn load_graph_table(&self, definition: TableDefinition<&[u8], &[u8]>) -> Result<Vec<AdjEntry>> {
850        self.with_db(|db| {
851            let read_txn = db.begin_read().map_err(Self::storage_error)?;
852            let table = match read_txn.open_table(definition) {
853                Ok(table) => table,
854                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
855                Err(err) => return Err(Self::storage_error(err)),
856            };
857
858            let mut entries = Vec::new();
859            for entry in table.iter().map_err(Self::storage_error)? {
860                let (_, value) = entry.map_err(Self::storage_error)?;
861                entries.push(Self::decode(value.value())?);
862            }
863            Ok(entries)
864        })
865    }
866
867    fn load_table_meta_in_read_txn(
868        read_txn: &redb::ReadTransaction,
869        name: &str,
870    ) -> Result<Option<TableMeta>> {
871        let meta_table = match read_txn.open_table(META_TABLE) {
872            Ok(table) => table,
873            Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
874            Err(err) => return Err(Self::storage_error(err)),
875        };
876        let key = Self::meta_key(name);
877        meta_table
878            .get(key.as_str())
879            .map_err(Self::storage_error)?
880            .map(|value| Self::decode(value.value()))
881            .transpose()
882    }
883
884    fn vector_quantization_map(
885        table_meta: &HashMap<String, TableMeta>,
886    ) -> HashMap<VectorIndexRef, VectorQuantization> {
887        let mut indexes = HashMap::new();
888        for (table, meta) in table_meta {
889            for column in &meta.columns {
890                if matches!(column.column_type, ColumnType::Vector(_)) {
891                    indexes.insert(
892                        VectorIndexRef::new(table.clone(), column.name.clone()),
893                        column.quantization,
894                    );
895                }
896            }
897        }
898        indexes
899    }
900
901    fn column_quantization(meta: Option<&TableMeta>, column_name: &str) -> VectorQuantization {
902        meta.and_then(|meta| {
903            meta.columns
904                .iter()
905                .find(|column| {
906                    column.name == column_name
907                        && matches!(column.column_type, ColumnType::Vector(_))
908                })
909                .map(|column| column.quantization)
910        })
911        .unwrap_or_default()
912    }
913
914    fn encode_versioned_row(row: &VersionedRow, meta: Option<&TableMeta>) -> Result<Vec<u8>> {
915        let values = row
916            .values
917            .iter()
918            .map(|(column, value)| {
919                let persisted = match value {
920                    Value::Vector(vector) => {
921                        let quantization = Self::column_quantization(meta, column);
922                        if matches!(quantization, VectorQuantization::F32) {
923                            PersistedValue::Vector(PersistedVector::from_f32(vector, quantization))
924                        } else {
925                            PersistedValue::Plain(Value::Null)
926                        }
927                    }
928                    _ => PersistedValue::Plain(value.clone()),
929                };
930                (column.clone(), persisted)
931            })
932            .collect::<HashMap<_, _>>();
933        Self::encode(&PersistedVersionedRow {
934            row_id: row.row_id,
935            values,
936            created_tx: row.created_tx,
937            deleted_tx: row.deleted_tx,
938            lsn: row.lsn,
939            created_at: row.created_at,
940        })
941    }
942
943    fn decode_versioned_row(bytes: &[u8], _meta: Option<&TableMeta>) -> Result<VersionedRow> {
944        let persisted: PersistedVersionedRow = Self::decode(bytes)?;
945        let values = persisted
946            .values
947            .into_iter()
948            .map(|(column, value)| {
949                let value = match value {
950                    PersistedValue::Plain(value) => value,
951                    PersistedValue::Vector(vector) => Value::Vector(vector.to_f32()),
952                };
953                (column, value)
954            })
955            .collect::<HashMap<_, _>>();
956        Ok(VersionedRow {
957            row_id: persisted.row_id,
958            values,
959            created_tx: persisted.created_tx,
960            deleted_tx: persisted.deleted_tx,
961            lsn: persisted.lsn,
962            created_at: persisted.created_at,
963        })
964    }
965
966    fn encode_vector_entry(
967        entry: &VectorEntry,
968        quantization: VectorQuantization,
969    ) -> Result<Vec<u8>> {
970        Self::encode(&PersistedVectorEntry {
971            index: entry.index.clone(),
972            row_id: entry.row_id,
973            vector: PersistedVector::from_f32(&entry.vector, quantization),
974            created_tx: entry.created_tx,
975            deleted_tx: entry.deleted_tx,
976            lsn: entry.lsn,
977        })
978    }
979
980    fn decode_vector_entry(bytes: &[u8]) -> Result<VectorEntry> {
981        let persisted: PersistedVectorEntry = Self::decode(bytes)?;
982        Ok(VectorEntry {
983            index: persisted.index,
984            row_id: persisted.row_id,
985            vector: persisted.vector.to_f32(),
986            created_tx: persisted.created_tx,
987            deleted_tx: persisted.deleted_tx,
988            lsn: persisted.lsn,
989        })
990    }
991
992    fn rel_table_name(name: &str) -> String {
993        format!("rel_{name}")
994    }
995
996    fn meta_key(name: &str) -> String {
997        format!("table:{name}")
998    }
999
1000    fn change_log_key(lsn: Lsn, index: usize) -> String {
1001        format!("{:020}:{index:06}", lsn.0)
1002    }
1003
1004    fn ddl_log_key(lsn: Lsn) -> String {
1005        format!("{:020}", lsn.0)
1006    }
1007
1008    fn graph_fwd_key(entry: &AdjEntry) -> Vec<u8> {
1009        Self::graph_fwd_key_parts(&entry.source, &entry.target, &entry.edge_type)
1010    }
1011
1012    fn graph_rev_key(entry: &AdjEntry) -> Vec<u8> {
1013        Self::graph_rev_key_parts(&entry.source, &entry.target, &entry.edge_type)
1014    }
1015
1016    fn vector_key(entry: &VectorEntry) -> Vec<u8> {
1017        Self::vector_key_parts(&entry.index, entry.row_id)
1018    }
1019
1020    fn vector_key_parts(index: &VectorIndexRef, row_id: contextdb_core::RowId) -> Vec<u8> {
1021        let mut key = Vec::with_capacity(index.table.len() + index.column.len() + 18);
1022        key.extend_from_slice(index.table.as_bytes());
1023        key.push(0);
1024        key.extend_from_slice(index.column.as_bytes());
1025        key.push(0);
1026        key.extend_from_slice(&row_id.0.to_be_bytes());
1027        key
1028    }
1029
1030    fn graph_fwd_key_parts(source: &uuid::Uuid, target: &uuid::Uuid, edge_type: &str) -> Vec<u8> {
1031        let mut key = Vec::with_capacity(32 + edge_type.len());
1032        key.extend_from_slice(source.as_bytes());
1033        key.extend_from_slice(target.as_bytes());
1034        key.extend_from_slice(edge_type.as_bytes());
1035        key
1036    }
1037
1038    fn graph_rev_key_parts(source: &uuid::Uuid, target: &uuid::Uuid, edge_type: &str) -> Vec<u8> {
1039        let mut key = Vec::with_capacity(32 + edge_type.len());
1040        key.extend_from_slice(target.as_bytes());
1041        key.extend_from_slice(source.as_bytes());
1042        key.extend_from_slice(edge_type.as_bytes());
1043        key
1044    }
1045
1046    fn encode<T: serde::Serialize>(value: &T) -> Result<Vec<u8>> {
1047        bincode::serde::encode_to_vec(value, bincode::config::standard())
1048            .map_err(|err| Error::Other(format!("bincode encode error: {err}")))
1049    }
1050
1051    fn decode<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T> {
1052        let (value, _) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())
1053            .map_err(|err| Error::Other(format!("bincode decode error: {err}")))?;
1054        Ok(value)
1055    }
1056
1057    fn storage_error(err: impl std::fmt::Display) -> Error {
1058        let msg = err.to_string();
1059        if msg.contains("lock") || msg.contains("already open") {
1060            Error::Other(format!(
1061                "database is locked (another process may have it open): {msg}"
1062            ))
1063        } else {
1064            Error::Other(format!("redb error: {msg}"))
1065        }
1066    }
1067
1068    fn with_db<T>(&self, f: impl FnOnce(&redb::Database) -> Result<T>) -> Result<T> {
1069        let db = redb::Database::open(&self.path).map_err(Self::storage_error)?;
1070        f(&db)
1071    }
1072}
1073
1074impl Drop for RedbPersistence {
1075    fn drop(&mut self) {
1076        Self::release_pid_lock(&self.path);
1077    }
1078}