Skip to main content

graphrefly_storage/
redb.rs

1//! redb-backed kv backend (M4.D — DS-14-storage Audit 4, ACID transactions).
2//!
//! [`RedbBackend`] wraps a [`redb::Database`] as a [`StorageBackend`]. Each
//! `write()` / `delete()` opens its own write transaction and commits it, so
//! every call is individually ACID. `read()` / `list()` use read transactions,
//! which run concurrently with any writer. `flush()` is a no-op because each
//! write is already durable once its transaction commits; space reclamation is
//! managed internally by redb.
9//!
10//! All keys live in a single `"graphrefly"` table (D162 — tiers already
11//! namespace via key prefixes; a single table matches the flat-kv model of
12//! [`MemoryBackend`] / [`FileBackend`]).
13//!
14//! # Thread safety
15//!
16//! `redb::Database` is `Send + Sync`. `begin_write()` serializes concurrent
17//! writers by design (MVCC — one writer at a time, concurrent readers). No
18//! additional synchronization needed on `RedbBackend`. This structurally
19//! closes the F3 deferred concern (concurrent flush race on append-log) —
20//! see `porting-deferred.md`.
21//!
22//! # Crash safety
23//!
24//! redb is crash-safe by default — committed transactions survive process
25//! crashes. Unlike [`FileBackend`]'s per-file atomic-rename, redb provides
26//! cross-key atomicity within a single transaction. The per-write-call
27//! transaction granularity (D163) means each `StorageBackend::write()` is
28//! individually atomic; cross-key atomicity (e.g., flushing multiple
29//! append-log buckets) would require holding one transaction across multiple
30//! calls, which isn't supported by the current `StorageBackend` trait.
//! Cross-key atomicity at the tier level is an M4.E concern
//! (`Graph::attach_storage` batched flush).
33//!
34//! Cargo feature: gated behind `redb-store` (default-on).
35
36use std::path::Path;
37use std::sync::Arc;
38
39use redb::{Database, ReadableTable, TableDefinition};
40
41use crate::backend::StorageBackend;
42use crate::codec::{Codec, JsonCodec};
43use crate::error::StorageError;
44use crate::memory::{
45    append_log_storage, kv_storage, snapshot_storage, AppendLogStorage, AppendLogStorageOptions,
46    KvStorage, KvStorageOptions, SnapshotStorage, SnapshotStorageOptions,
47};
48
49use serde::{de::DeserializeOwned, Serialize};
50
/// The single redb table that holds every key (D162).
///
/// Tiers namespace their entries via key prefixes
/// (e.g. `"graph/wal/00000000000000000001"`, `"snapshot:my-graph"`), so keys
/// are `&str` and values are the raw encoded bytes (`&[u8]`).
///
/// The on-disk table name is `"graphrefly"`. Changing it would make data
/// written under the old name invisible, because reads treat a missing table
/// as an empty database.
const TABLE: TableDefinition<'_, &str, &[u8]> = TableDefinition::new("graphrefly");
54
55/// Map redb errors to [`StorageError::BackendError`].
56fn map_err(e: impl std::error::Error + Send + Sync + 'static) -> StorageError {
57    StorageError::BackendError {
58        message: e.to_string(),
59        source: Some(Box::new(e)),
60    }
61}
62
63/// redb-backed [`StorageBackend`].
64///
65/// One [`redb::Database`] file per backend instance. All keys share a single
66/// `"graphrefly"` table. Per-write-call ACID transactions (D163).
67///
68/// # Example
69///
70/// ```ignore
71/// use std::sync::Arc;
72/// use graphrefly_storage::{redb_backend, snapshot_storage, SnapshotStorageOptions};
73///
74/// let backend = redb_backend("./checkpoints.redb");
75/// let tier = snapshot_storage(backend, SnapshotStorageOptions::<MyState, _>::default());
76/// tier.save(state).unwrap();
77/// ```
78pub struct RedbBackend {
79    db: Database,
80    name: String,
81}
82
83// Manual Debug because redb::Database doesn't impl Debug.
84impl std::fmt::Debug for RedbBackend {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        f.debug_struct("RedbBackend")
87            .field("name", &self.name)
88            .finish_non_exhaustive()
89    }
90}
91
92impl RedbBackend {
93    /// Open or create a redb database at `path`.
94    ///
95    /// # Errors
96    ///
97    /// Returns `StorageError::BackendError` if the database file can't be
98    /// opened or created (permissions, corrupt file, etc.).
99    pub fn new(path: impl AsRef<Path>) -> Result<Self, StorageError> {
100        let path = path.as_ref();
101        let db = Database::create(path).map_err(map_err)?;
102        let name = format!("redb:{}", path.display());
103        Ok(Self { db, name })
104    }
105
106    /// Diagnostic: the path-derived name (e.g. `"redb:./checkpoints.redb"`).
107    #[must_use]
108    pub fn name_str(&self) -> &str {
109        &self.name
110    }
111}
112
113impl StorageBackend for RedbBackend {
114    fn name(&self) -> &str {
115        &self.name
116    }
117
118    fn read(&self, key: &str) -> Result<Option<Vec<u8>>, StorageError> {
119        let txn = self.db.begin_read().map_err(map_err)?;
120        let table = match txn.open_table(TABLE) {
121            Ok(t) => t,
122            // Table doesn't exist yet → empty database, no keys.
123            Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
124            Err(e) => return Err(map_err(e)),
125        };
126        match table.get(key).map_err(map_err)? {
127            Some(guard) => Ok(Some(guard.value().to_vec())),
128            None => Ok(None),
129        }
130    }
131
132    fn write(&self, key: &str, bytes: &[u8]) -> Result<(), StorageError> {
133        let txn = self.db.begin_write().map_err(map_err)?;
134        {
135            let mut table = txn.open_table(TABLE).map_err(map_err)?;
136            table.insert(key, bytes).map_err(map_err)?;
137        }
138        txn.commit().map_err(map_err)?;
139        Ok(())
140    }
141
142    fn delete(&self, key: &str) -> Result<(), StorageError> {
143        let txn = self.db.begin_write().map_err(map_err)?;
144        {
145            let mut table = match txn.open_table(TABLE) {
146                Ok(t) => t,
147                // Table doesn't exist → nothing to delete.
148                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(()),
149                Err(e) => return Err(map_err(e)),
150            };
151            // remove() returns the old value (or None); we discard it.
152            let _ = table.remove(key).map_err(map_err)?;
153        }
154        txn.commit().map_err(map_err)?;
155        Ok(())
156    }
157
158    fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
159        let txn = self.db.begin_read().map_err(map_err)?;
160        let table = match txn.open_table(TABLE) {
161            Ok(t) => t,
162            Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
163            Err(e) => return Err(map_err(e)),
164        };
165
166        let mut keys = Vec::new();
167        if prefix.is_empty() {
168            // All keys.
169            let iter = table.iter().map_err(map_err)?;
170            for entry in iter {
171                let entry = entry.map_err(map_err)?;
172                keys.push(entry.0.value().to_string());
173            }
174        } else {
175            // Range scan: redb's B-tree stores keys in lex-ASC order.
176            // We start from `prefix` and take while the key starts with it.
177            let iter = table.range(prefix..).map_err(map_err)?;
178            for entry in iter {
179                let entry = entry.map_err(map_err)?;
180                let k = entry.0.value();
181                if !k.starts_with(prefix) {
182                    break;
183                }
184                keys.push(k.to_string());
185            }
186        }
187        Ok(keys)
188    }
189
190    fn flush(&self) -> Result<(), StorageError> {
191        // No-op: each write() commits its own transaction (D163).
192        Ok(())
193    }
194}
195
196// ── Factory ──────────────────────────────────────────────────────────────
197
198/// Open or create a redb database at `path`, returning an `Arc<RedbBackend>`.
199///
200/// Use this when sharing a single backend across multiple tiers (the paired
201/// `{ snapshot, wal }` pattern from DS-14-storage §a).
202///
203/// # Errors
204///
205/// Returns `StorageError::BackendError` on filesystem / permission / corrupt
206/// file errors.
207pub fn redb_backend(path: impl AsRef<Path>) -> Result<Arc<RedbBackend>, StorageError> {
208    Ok(Arc::new(RedbBackend::new(path)?))
209}
210
211// ── Convenience tier wrappers ────────────────────────────────────────────
212
213/// Snapshot tier over a redb backend at `path`.
214///
215/// # Errors
216///
217/// Returns `StorageError::BackendError` if the database can't be opened.
218///
219/// # Panics
220///
221/// Panics if `opts.compact_every == Some(0)`.
222pub fn redb_snapshot<T, C>(
223    path: impl AsRef<Path>,
224    opts: SnapshotStorageOptions<T, C>,
225) -> Result<SnapshotStorage<RedbBackend, T, C>, StorageError>
226where
227    T: Send + Sync + 'static,
228    C: Codec<T>,
229{
230    Ok(snapshot_storage(redb_backend(path)?, opts))
231}
232
233/// Snapshot tier over a redb backend with default options (`JsonCodec`).
234///
235/// # Errors
236///
237/// Returns `StorageError::BackendError` if the database can't be opened.
238pub fn redb_snapshot_default<T>(
239    path: impl AsRef<Path>,
240) -> Result<SnapshotStorage<RedbBackend, T, JsonCodec>, StorageError>
241where
242    T: Serialize + DeserializeOwned + Send + Sync + 'static,
243{
244    redb_snapshot(path, SnapshotStorageOptions::default())
245}
246
247/// Append-log tier over a redb backend at `path`.
248///
249/// # Errors
250///
251/// Returns `StorageError::BackendError` if the database can't be opened.
252///
253/// # Panics
254///
255/// Panics if `opts.compact_every == Some(0)`.
256pub fn redb_append_log<T, C>(
257    path: impl AsRef<Path>,
258    opts: AppendLogStorageOptions<T, C>,
259) -> Result<AppendLogStorage<RedbBackend, T, C>, StorageError>
260where
261    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
262    C: Codec<Vec<T>>,
263{
264    Ok(append_log_storage(redb_backend(path)?, opts))
265}
266
267/// Append-log tier over a redb backend with default options (`JsonCodec`).
268///
269/// # Errors
270///
271/// Returns `StorageError::BackendError` if the database can't be opened.
272pub fn redb_append_log_default<T>(
273    path: impl AsRef<Path>,
274) -> Result<AppendLogStorage<RedbBackend, T, JsonCodec>, StorageError>
275where
276    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
277{
278    redb_append_log(path, AppendLogStorageOptions::default())
279}
280
281/// KV tier over a redb backend at `path`.
282///
283/// # Errors
284///
285/// Returns `StorageError::BackendError` if the database can't be opened.
286///
287/// # Panics
288///
289/// Panics if `opts.compact_every == Some(0)`.
290pub fn redb_kv<T, C>(
291    path: impl AsRef<Path>,
292    opts: KvStorageOptions<T, C>,
293) -> Result<KvStorage<RedbBackend, T, C>, StorageError>
294where
295    T: Send + Sync + 'static,
296    C: Codec<T>,
297{
298    Ok(kv_storage(redb_backend(path)?, opts))
299}
300
301/// KV tier over a redb backend with default options (`JsonCodec`).
302///
303/// # Errors
304///
305/// Returns `StorageError::BackendError` if the database can't be opened.
306pub fn redb_kv_default<T>(
307    path: impl AsRef<Path>,
308) -> Result<KvStorage<RedbBackend, T, JsonCodec>, StorageError>
309where
310    T: Serialize + DeserializeOwned + Send + Sync + 'static,
311{
312    redb_kv(path, KvStorageOptions::default())
313}
314
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Fresh backend on a temp file. The returned `TempDir` must stay alive for
    /// the whole test, otherwise the directory is removed under the database.
    fn tmp_db() -> (TempDir, Arc<RedbBackend>) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.redb");
        let backend = redb_backend(&path).unwrap();
        (dir, backend)
    }

    #[test]
    fn read_write_round_trip() {
        let (_dir, b) = tmp_db();
        b.write("k1", b"hello").unwrap();
        assert_eq!(b.read("k1").unwrap(), Some(b"hello".to_vec()));
    }

    #[test]
    fn read_miss_returns_none() {
        let (_dir, b) = tmp_db();
        assert!(b.read("nope").unwrap().is_none());
    }

    #[test]
    fn read_from_empty_database_returns_none() {
        let (_dir, b) = tmp_db();
        // No writes at all — table doesn't exist yet.
        assert!(b.read("anything").unwrap().is_none());
    }

    #[test]
    fn delete_removes_key() {
        let (_dir, b) = tmp_db();
        b.write("k", b"v").unwrap();
        b.delete("k").unwrap();
        assert!(b.read("k").unwrap().is_none());
    }

    #[test]
    fn delete_nonexistent_key_is_ok() {
        let (_dir, b) = tmp_db();
        // Fresh database: no writes yet.
        b.delete("nope").unwrap();
        // Table exists but key doesn't.
        b.write("other", b"v").unwrap();
        b.delete("nope").unwrap();
    }

    #[test]
    fn list_lex_asc() {
        let (_dir, b) = tmp_db();
        b.write("g/10", b"a").unwrap();
        b.write("g/02", b"b").unwrap();
        b.write("g/01", b"c").unwrap();
        b.write("other", b"d").unwrap();
        let keys = b.list("g/").unwrap();
        assert_eq!(keys, vec!["g/01", "g/02", "g/10"]);
    }

    #[test]
    fn list_prefix_stops_at_range_boundary() {
        let (_dir, b) = tmp_db();
        for key in ["g", "g/", "g/a", "g0", "h"] {
            b.write(key, b"v").unwrap();
        }
        // '/' (0x2F) sorts just before '0' (0x30): the scan must stop at "g0",
        // and the bare "g" (which sorts before the prefix) must be excluded.
        assert_eq!(b.list("g/").unwrap(), vec!["g/", "g/a"]);
    }

    #[test]
    fn list_empty_prefix_returns_all_sorted() {
        let (_dir, b) = tmp_db();
        b.write("b", b"y").unwrap();
        b.write("a", b"x").unwrap();
        let keys = b.list("").unwrap();
        assert_eq!(keys, vec!["a", "b"]);
    }

    #[test]
    fn list_empty_database_returns_empty() {
        let (_dir, b) = tmp_db();
        let keys = b.list("g/").unwrap();
        assert!(keys.is_empty());
    }

    #[test]
    fn write_overwrites_existing_key() {
        let (_dir, b) = tmp_db();
        b.write("k", b"v1").unwrap();
        b.write("k", b"v2").unwrap();
        assert_eq!(b.read("k").unwrap(), Some(b"v2".to_vec()));
    }

    #[test]
    fn name_includes_path() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.redb");
        let b = RedbBackend::new(&path).unwrap();
        assert!(b.name().starts_with("redb:"));
        assert!(b.name().contains("test.redb"));
    }

    #[test]
    fn flush_is_noop() {
        let (_dir, b) = tmp_db();
        b.write("k", b"v").unwrap();
        b.flush().unwrap();
        assert_eq!(b.read("k").unwrap(), Some(b"v".to_vec()));
    }

    #[test]
    fn data_survives_reopen() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("persist.redb");
        {
            let b = RedbBackend::new(&path).unwrap();
            b.write("k1", b"durable").unwrap();
        }
        // Reopen same file.
        let b2 = RedbBackend::new(&path).unwrap();
        assert_eq!(b2.read("k1").unwrap(), Some(b"durable".to_vec()));
    }

    #[test]
    fn concurrent_readers_and_writer() {
        const WRITES: usize = 20;
        let (_dir, b) = tmp_db();
        b.write("k", b"v0").unwrap();

        // One writer and several readers actually run in parallel. Every read
        // must observe some committed value (`v0`..`v{WRITES}`), never a
        // missing or torn one. `thread::scope` joins all threads and re-raises
        // any panic before returning.
        std::thread::scope(|s| {
            s.spawn(|| {
                for i in 1..=WRITES {
                    b.write("k", format!("v{i}").as_bytes()).unwrap();
                }
            });
            for _ in 0..4 {
                s.spawn(|| {
                    for _ in 0..WRITES {
                        let bytes = b
                            .read("k")
                            .unwrap()
                            .expect("`k` was written before readers started");
                        let text = String::from_utf8(bytes).unwrap();
                        let n: usize = text.strip_prefix('v').unwrap().parse().unwrap();
                        assert!(n <= WRITES);
                    }
                });
            }
        });

        assert_eq!(
            b.read("k").unwrap(),
            Some(format!("v{}", WRITES).into_bytes())
        );
    }

    #[test]
    fn arc_factory_shares_backend() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("shared.redb");
        let b1 = redb_backend(&path).unwrap();
        // Clone the Arc — same underlying Database.
        let b2 = Arc::clone(&b1);
        b1.write("k", b"from_b1").unwrap();
        assert_eq!(b2.read("k").unwrap(), Some(b"from_b1".to_vec()));
    }
}