Skip to main content

mnem_backend_redb/
lib.rs

1//! # mnem-backend-redb
2//!
3//! Production [`Blockstore`] + [`OpHeadsStore`] backed by
4//! [`redb`](https://github.com/cberner/redb) - a pure-Rust embedded
5//! ACID key-value store.
6//!
7//! A single `.redb` file holds two tables:
8//!
9//! - `objects`  - `CID bytes → object bytes`. Every Node, Edge, Tree
10//!   chunk, Commit, View, Operation is a row.
11//! - `op_heads` - set-of-CIDs modelled as `CID bytes → ()` with
12//!   presence-as-truth. The current op-heads set is the key-set of
13//!   this table.
14//!
15//! Writes are atomic per-call: each `put` / `delete` / `update` opens a
16//! write transaction, mutates, and commits (which fsyncs). No
17//! cross-call batching yet; a future refinement can expose a "batch
18//! mode" that holds a transaction open across multiple puts to reduce
19//! fsync overhead on the `mnem_core::repo::Transaction::commit` hot
20//! path.
21//!
22//! ## Concurrency
23//!
24//! redb: single-writer, many-reader per database file. Within a
25//! process, `Arc<Database>` is safe to share across threads - redb
26//! serialises writers internally. Across processes, redb's
27//! filesystem-locking protects the file format; concurrent write
28//! transactions from two processes will serialise or one will fail.
29//!
30//! ## Durability
31//!
32//! `tx.commit()` in redb calls `fsync` on the database file. The Simple
33//! backend's atomic-rename story is effectively what redb does
34//! internally for each commit - one transaction = one durable
35//! state transition.
36//!
37//! ## Backend choice notes
38//!
39//! redb is the production embedded backend for exactly these reasons:
40//! pure Rust, small dep tree, mmap reads, ACID writes, single-file
41//! persistence. Note redb majors break on-disk format (1 → 2 → 3 → 4).
42//! We pin the major in the crate's Cargo dependency and document the
43//! migration path when we bump.
44
45#![forbid(unsafe_code)]
46#![deny(missing_docs)]
47
48pub mod blockstore;
49pub mod knn_edges_store;
50pub mod op_heads;
51
52use std::path::{Path, PathBuf};
53use std::sync::Arc;
54
55use mnem_core::error::{Error, StoreError};
56use mnem_core::store::{Blockstore, OpHeadsStore};
57use redb::{Database, TableDefinition};
58
59pub use blockstore::RedbBlockstore;
60pub use knn_edges_store::{KNN_EDGES_TABLE, load_knn_edges, store_knn_edges};
61pub use op_heads::RedbOpHeadsStore;
62
63/// Objects table: CID bytes → object bytes.
64pub(crate) const OBJECTS_TABLE: TableDefinition<'_, &[u8], &[u8]> =
65    TableDefinition::new("mnem_objects");
66
67/// Op-heads table: CID bytes → unit (presence is the truth).
68pub(crate) const OP_HEADS_TABLE: TableDefinition<'_, &[u8], ()> =
69    TableDefinition::new("mnem_op_heads");
70
71/// Map a redb error into an [`StoreError`] that `mnem-core` can consume.
72pub(crate) fn redb_err<E: std::fmt::Display>(e: E) -> StoreError {
73    StoreError::Io(format!("redb: {e}"))
74}
75
76/// Open or create the redb database at `path` and return wired-up
77/// blockstore + op-heads stores.
78///
79/// The database file is created if absent; parent directories are
80/// created if needed. Tables are declared on the first write so
81/// subsequent opens see a consistent schema.
82///
83/// # Errors
84///
85/// Returns filesystem / redb errors on failure.
86pub fn open_or_init(
87    path: impl AsRef<Path>,
88) -> Result<(Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>, PathBuf), Error> {
89    let path = path.as_ref().to_owned();
90    if let Some(parent) = path.parent()
91        && !parent.as_os_str().is_empty()
92    {
93        std::fs::create_dir_all(parent)
94            .map_err(|e| StoreError::Io(format!("create parent dir: {e}")))?;
95    }
96
97    let db = Database::create(&path).map_err(redb_err)?;
98    // Force-create the tables so a fresh reopen sees them.
99    let tx = db.begin_write().map_err(redb_err)?;
100    {
101        let _ = tx.open_table(OBJECTS_TABLE).map_err(redb_err)?;
102        let _ = tx.open_table(OP_HEADS_TABLE).map_err(redb_err)?;
103    }
104    tx.commit().map_err(redb_err)?;
105
106    let db = Arc::new(db);
107    let bs: Arc<dyn Blockstore> = Arc::new(RedbBlockstore::new(db.clone()));
108    let ohs: Arc<dyn OpHeadsStore> = Arc::new(RedbOpHeadsStore::new(db));
109    Ok((bs, ohs, path))
110}
111
#[cfg(test)]
mod tests {
    use super::*;
    use mnem_core::repo::ReadonlyRepo;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Per-process sequence number that keeps scratch file names unique
    /// when several tests run in the same process.
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Scratch database path that deletes its file when dropped, so test
    /// runs (passing or panicking) do not pile up `.redb` files in the
    /// system temp directory.
    ///
    /// Declare it before any store handles in a test: locals drop in
    /// reverse declaration order, so the database is closed before the
    /// file is removed.
    struct TmpDb(PathBuf);

    impl AsRef<Path> for TmpDb {
        fn as_ref(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TmpDb {
        fn drop(&mut self) {
            // Best effort: the file may never have been created.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Build a unique scratch path under the temp dir from `name`, the
    /// process id and a counter, clearing any stale file left at that
    /// path by an earlier run.
    fn tmp_file(name: &str) -> TmpDb {
        let path = std::env::temp_dir().join(format!(
            "mnem-redb-{name}-{}-{}.redb",
            std::process::id(),
            COUNTER.fetch_add(1, Ordering::Relaxed)
        ));
        let _ = std::fs::remove_file(&path);
        TmpDb(path)
    }

    /// Opening a path that does not exist yet creates the database file.
    #[test]
    fn init_creates_file() {
        let p = tmp_file("init");
        let (_, _, file) = open_or_init(&p).unwrap();
        assert!(file.exists());
    }

    /// Re-opening an existing database repeatedly must keep succeeding.
    #[test]
    fn init_is_idempotent() {
        let p = tmp_file("idem");
        // `let _ =` drops the handles at once, closing the database
        // before the next open.
        let _ = open_or_init(&p).unwrap();
        let _ = open_or_init(&p).unwrap();
        let _ = open_or_init(&p).unwrap();
    }

    /// A committed operation is still the current op after every handle
    /// is dropped and the database is reopened.
    #[test]
    fn full_repo_persists_across_reopens() {
        let p = tmp_file("persist");
        let op_at_close = {
            let (bs, ohs, _) = open_or_init(&p).unwrap();
            let repo = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
            let mut tx = repo.start_transaction();
            let alice = mnem_core::objects::Node::new(mnem_core::id::NodeId::new_v7(), "Person");
            let alice_id = alice.id;
            tx.add_node(&alice).unwrap();
            let r1 = tx.commit("a@example.org", "add Alice").unwrap();
            assert!(r1.lookup_node(&alice_id).unwrap().is_some());
            r1.op_id().clone()
        };

        // All handles dropped. Reopen.
        {
            let (bs, ohs, _) = open_or_init(&p).unwrap();
            let repo = ReadonlyRepo::open(bs, ohs).unwrap();
            assert_eq!(*repo.op_id(), op_at_close);
        }
    }
}