Skip to main content

nookdb_core/
storage.rs

1//! Read and write transaction handles wrapping redb transactions.
2
3use redb::{ReadableTable, Table, TableDefinition};
4
5use crate::codec::{
6    collection_prefix_lower, collection_prefix_upper, encode_key, strip_collection_prefix,
7};
8use crate::database::{map_redb_storage_error, map_redb_table_error};
9use crate::error::NookError;
10use crate::notify::{ChangeOp, DocChange};
11
12/// All entries live in this single redb table. Collection routing is
13/// done via composite keys (see `crate::codec`).
14pub(crate) const ENTRIES: TableDefinition<&[u8], &[u8]> = TableDefinition::new("entries");
15
16/// One (key, value) pair from a `list_collection` scan. Both byte vectors.
17pub type Entry = (Vec<u8>, Vec<u8>);
18
19/// Handle exposed to read transaction callbacks. MVCC snapshot.
20///
21/// No lifetime parameter: `redb::ReadOnlyTable` is `Arc<TransactionGuard>`-backed
22/// and fully owns its data. The read-side table does not borrow the `ReadTransaction`.
23pub struct ReadTx {
24    /// `None` when the "entries" table has never been written to (fresh db).
25    table: Option<redb::ReadOnlyTable<&'static [u8], &'static [u8]>>,
26    /// `None` when the index table has never been written to (no index
27    /// maintenance has occurred yet). Opened from the same MVCC snapshot
28    /// as `table`; like `table`, the `Arc`-backed `ReadOnlyTable` fully
29    /// owns its data and does not borrow the `ReadTransaction`.
30    index_table: Option<redb::ReadOnlyTable<&'static [u8], &'static [u8]>>,
31}
32
33impl ReadTx {
34    pub(crate) fn new(txn: &redb::ReadTransaction) -> Result<Self, NookError> {
35        let table = match txn.open_table(ENTRIES) {
36            Ok(table) => Some(table),
37            Err(redb::TableError::TableDoesNotExist(_)) => None,
38            Err(e) => return Err(map_redb_table_error(e)),
39        };
40        let index_table = match txn.open_table(crate::index::engine::INDEX_ENTRIES) {
41            Ok(t) => Some(t),
42            Err(redb::TableError::TableDoesNotExist(_)) => None,
43            Err(e) => return Err(map_redb_table_error(e)),
44        };
45        Ok(Self { table, index_table })
46    }
47
48    /// Returns the value stored under `(collection, key)`, or `None`.
49    ///
50    /// # Errors
51    ///
52    /// Returns `NookError::InvalidArg` if `collection` is empty or contains
53    /// a null byte. Returns `NookError::Storage` or `NookError::Corruption`
54    /// on underlying storage failure.
55    pub fn get(&self, collection: &str, key: &[u8]) -> Result<Option<Vec<u8>>, NookError> {
56        let Some(ref table) = self.table else {
57            // Validate the collection name BEFORE the empty-table early return, so a bad collection still yields InvalidArg (not Ok(None)/Ok(empty)) even on a fresh, never-written db.
58            encode_key(collection, key)?;
59            return Ok(None);
60        };
61        let composite = encode_key(collection, key)?;
62        let guard = table
63            .get(composite.as_slice())
64            .map_err(map_redb_storage_error)?;
65        Ok(guard.map(|v| v.value().to_vec()))
66    }
67
68    /// Returns all `(key, value)` pairs in the named collection,
69    /// in lexicographic key order. May be empty.
70    ///
71    /// # Errors
72    ///
73    /// Returns `NookError::InvalidArg` if `collection` is empty or contains
74    /// a null byte. Returns `NookError::Storage` or `NookError::Corruption`
75    /// on underlying storage failure.
76    pub fn list_collection(&self, collection: &str) -> Result<Vec<Entry>, NookError> {
77        let Some(ref table) = self.table else {
78            // Validate the collection name BEFORE the empty-table early return, so a bad collection still yields InvalidArg (not Ok(None)/Ok(empty)) even on a fresh, never-written db.
79            collection_prefix_lower(collection)?;
80            return Ok(vec![]);
81        };
82        let lower = collection_prefix_lower(collection)?;
83        let upper = collection_prefix_upper(collection)?;
84        let iter = table
85            .range::<&[u8]>(lower.as_slice()..upper.as_slice())
86            .map_err(map_redb_storage_error)?;
87        let mut out = Vec::new();
88        for entry in iter {
89            let (k, v) = entry.map_err(map_redb_storage_error)?;
90            let composite: &[u8] = k.value();
91            let user_key = strip_collection_prefix(composite, collection)
92                .ok_or_else(|| NookError::Corruption {
93                    msg: format!(
94                        "composite key missing expected prefix for collection {collection:?}"
95                    ),
96                })?
97                .to_vec();
98            out.push((user_key, v.value().to_vec()));
99        }
100        Ok(out)
101    }
102
103    /// Returns every `(composite_key, value)` pair across the entire
104    /// `entries` table, in key order. Used by `crate::backup::write_backup`
105    /// to stream a full DB snapshot. The composite key encoding is opaque
106    /// here — the backup format does not decompose it (restore writes the
107    /// same composite keys back).
108    ///
109    /// # Errors
110    ///
111    /// Returns `NookError::Storage` or `NookError::Corruption` on
112    /// underlying storage failure.
113    pub fn list_entries_raw(&self) -> Result<Vec<Entry>, NookError> {
114        let Some(ref table) = self.table else {
115            return Ok(vec![]);
116        };
117        let iter = table.iter().map_err(map_redb_storage_error)?;
118        let mut out = Vec::new();
119        for entry in iter {
120            let (k, v) = entry.map_err(map_redb_storage_error)?;
121            out.push((k.value().to_vec(), v.value().to_vec()));
122        }
123        Ok(out)
124    }
125
126    /// Returns every value stored under index keys in the half-open
127    /// range `[lo, hi)` of the secondary-index table, in key order.
128    ///
129    /// Returns an empty vector when the index table has never been
130    /// written (no index maintenance has occurred yet). Used by the
131    /// index engine for equality lookups; collection/key routing is
132    /// encoded into `lo`/`hi` by `crate::index::engine`.
133    ///
134    /// # Errors
135    ///
136    /// Returns `NookError::Storage` or `NookError::Corruption` on
137    /// underlying storage failure.
138    pub fn index_range_values(&self, lo: &[u8], hi: &[u8]) -> Result<Vec<Vec<u8>>, NookError> {
139        let Some(ref t) = self.index_table else {
140            return Ok(vec![]);
141        };
142        let iter = t.range::<&[u8]>(lo..hi).map_err(map_redb_storage_error)?;
143        let mut out = Vec::new();
144        for r in iter {
145            let (_k, v) = r.map_err(map_redb_storage_error)?;
146            out.push(v.value().to_vec());
147        }
148        Ok(out)
149    }
150}
151
152/// Handle exposed to write transaction callbacks. Serializable.
153pub struct WriteTx<'tx> {
154    table: Table<'tx, &'static [u8], &'static [u8]>,
155    /// Secondary-index table, opened from the SAME write transaction as
156    /// `table` so index maintenance and document writes commit (or roll
157    /// back) atomically together. redb permits multiple tables open
158    /// concurrently within one `WriteTransaction`; each `Table` borrows
159    /// `&'tx WriteTransaction` immutably (`open_table` takes `&'txn self`).
160    index_table: Table<'tx, &'static [u8], &'static [u8]>,
161    /// Accumulates document-level changes made within this transaction.
162    /// Drained by `take_touched` after a successful commit; rollback drops
163    /// `WriteTx` without ever calling `take_touched`, so the vec is simply
164    /// discarded.
165    touched: Vec<DocChange>,
166}
167
168impl<'tx> WriteTx<'tx> {
169    pub(crate) fn new(txn: &'tx redb::WriteTransaction) -> Result<Self, NookError> {
170        let table = txn.open_table(ENTRIES).map_err(map_redb_table_error)?;
171        let index_table = txn
172            .open_table(crate::index::engine::INDEX_ENTRIES)
173            .map_err(map_redb_table_error)?;
174        Ok(Self {
175            table,
176            index_table,
177            touched: Vec::new(),
178        })
179    }
180
181    /// Drains and returns every document change accumulated so far in this
182    /// transaction. A second call on the same transaction returns an empty
183    /// `Vec`. Intended to be called by `Database::write` immediately after
184    /// a successful commit to build a [`crate::notify::CommitEvent`];
185    /// rollback simply drops `WriteTx` without calling this method.
186    pub fn take_touched(&mut self) -> Vec<DocChange> {
187        std::mem::take(&mut self.touched)
188    }
189
190    /// Inserts or overwrites `(collection, key) -> value`.
191    ///
192    /// # Errors
193    ///
194    /// Returns `NookError::InvalidArg` if `collection` is empty or contains
195    /// a null byte. Returns `NookError::Storage` on underlying storage failure.
196    pub fn put(&mut self, collection: &str, key: &[u8], value: &[u8]) -> Result<(), NookError> {
197        let composite = encode_key(collection, key)?;
198        self.table
199            .insert(composite.as_slice(), value)
200            .map_err(map_redb_storage_error)?;
201        self.touched.push(DocChange {
202            collection: collection.to_string(),
203            op: ChangeOp::Insert,
204            doc_id: key.to_vec(),
205        });
206        Ok(())
207    }
208
209    /// Same semantics as `ReadTx::get`, but reads from the current
210    /// uncommitted state of this write transaction.
211    ///
212    /// # Errors
213    ///
214    /// Returns `NookError::InvalidArg` if `collection` is empty or contains
215    /// a null byte. Returns `NookError::Storage` or `NookError::Corruption`
216    /// on underlying storage failure.
217    pub fn get(&self, collection: &str, key: &[u8]) -> Result<Option<Vec<u8>>, NookError> {
218        let composite = encode_key(collection, key)?;
219        let guard = self
220            .table
221            .get(composite.as_slice())
222            .map_err(map_redb_storage_error)?;
223        Ok(guard.map(|v| v.value().to_vec()))
224    }
225
226    /// Removes `(collection, key)`. Returns `true` if the key was
227    /// present, `false` otherwise.
228    ///
229    /// # Errors
230    ///
231    /// Returns `NookError::InvalidArg` if `collection` is empty or contains
232    /// a null byte. Returns `NookError::Storage` on underlying storage failure.
233    pub fn delete(&mut self, collection: &str, key: &[u8]) -> Result<bool, NookError> {
234        let composite = encode_key(collection, key)?;
235        let removed = self
236            .table
237            .remove(composite.as_slice())
238            .map_err(map_redb_storage_error)?;
239        let existed = removed.is_some();
240        if existed {
241            self.touched.push(DocChange {
242                collection: collection.to_string(),
243                op: ChangeOp::Delete,
244                doc_id: key.to_vec(),
245            });
246        }
247        Ok(existed)
248    }
249
250    /// Inserts (or overwrites) a raw `key -> value` pair into the
251    /// secondary-index table within this write transaction. The
252    /// composite index key layout is owned by `crate::index::engine`;
253    /// this method is layout-agnostic.
254    ///
255    /// # Errors
256    ///
257    /// Returns `NookError::Storage` on underlying storage failure.
258    pub fn index_put(&mut self, k: &[u8], v: &[u8]) -> Result<(), NookError> {
259        self.index_table
260            .insert(k, v)
261            .map_err(map_redb_storage_error)?;
262        Ok(())
263    }
264
265    /// Removes a raw key from the secondary-index table within this
266    /// write transaction. A missing key is not an error.
267    ///
268    /// # Errors
269    ///
270    /// Returns `NookError::Storage` on underlying storage failure.
271    pub fn index_delete(&mut self, k: &[u8]) -> Result<(), NookError> {
272        self.index_table.remove(k).map_err(map_redb_storage_error)?;
273        Ok(())
274    }
275
276    /// Same semantics as `ReadTx::index_range_values`, but reads from the
277    /// current uncommitted state of this write transaction. Lets the
278    /// index engine perform a unique-constraint pre-check that observes
279    /// the in-flight write (a separate read snapshot would miss prior
280    /// inserts made within the same transaction).
281    ///
282    /// Returns an empty vector when no index maintenance has occurred in
283    /// this transaction (the index table is created lazily). The
284    /// composite index key layout is owned by `crate::index::engine`;
285    /// this method is layout-agnostic.
286    ///
287    /// # Errors
288    ///
289    /// Returns `NookError::Storage` or `NookError::Corruption` on
290    /// underlying storage failure.
291    pub fn index_range_values(&self, lo: &[u8], hi: &[u8]) -> Result<Vec<Vec<u8>>, NookError> {
292        let iter = self
293            .index_table
294            .range::<&[u8]>(lo..hi)
295            .map_err(map_redb_storage_error)?;
296        let mut out = Vec::new();
297        for r in iter {
298            let (_k, v) = r.map_err(map_redb_storage_error)?;
299            out.push(v.value().to_vec());
300        }
301        Ok(out)
302    }
303
304    /// Returns true iff the `entries` table has at least one row.
305    ///
306    /// Used by backup restore to enforce the `allow_overwrite` gate.
307    ///
308    /// # Errors
309    ///
310    /// Returns `NookError::Storage` on underlying storage failure.
311    pub fn has_any_entry(&self) -> Result<bool, NookError> {
312        let mut iter = self.table.iter().map_err(map_redb_storage_error)?;
313        Ok(iter.next().is_some())
314    }
315
316    /// Removes every row from the `entries` table AND the index table.
317    ///
318    /// Used by backup restore when `allow_overwrite=true`.
319    ///
320    /// # Errors
321    ///
322    /// Returns `NookError::Storage` on underlying storage failure.
323    pub fn clear_entries(&mut self) -> Result<(), NookError> {
324        self.table
325            .retain(|_, _| false)
326            .map_err(map_redb_storage_error)?;
327        self.index_table
328            .retain(|_, _| false)
329            .map_err(map_redb_storage_error)?;
330        Ok(())
331    }
332
333    /// Inserts a raw `(composite_key, value)` pair into the `entries` table.
334    ///
335    /// Used by backup restore to replay a `.nbkp` snapshot. Does NOT touch
336    /// the index table — backup restore replays composite keys from the same
337    /// source table, and the index table is empty (cleared by `clear_entries`)
338    /// so secondary indexes will be lost after restore until reindexed.
339    ///
340    /// # Errors
341    ///
342    /// Returns `NookError::Storage` on underlying storage failure.
343    pub fn put_raw(&mut self, composite_key: &[u8], value: &[u8]) -> Result<(), NookError> {
344        self.table
345            .insert(composite_key, value)
346            .map_err(map_redb_storage_error)?;
347        Ok(())
348    }
349
350    /// Same semantics as `ReadTx::list_collection`, but reads from the
351    /// current uncommitted state of this write transaction.
352    ///
353    /// # Errors
354    ///
355    /// Returns `NookError::InvalidArg` if `collection` is empty or contains
356    /// a null byte. Returns `NookError::Storage` or `NookError::Corruption`
357    /// on underlying storage failure.
358    pub fn list_collection(&self, collection: &str) -> Result<Vec<Entry>, NookError> {
359        let lower = collection_prefix_lower(collection)?;
360        let upper = collection_prefix_upper(collection)?;
361        let iter = self
362            .table
363            .range::<&[u8]>(lower.as_slice()..upper.as_slice())
364            .map_err(map_redb_storage_error)?;
365        let mut out = Vec::new();
366        for entry in iter {
367            let (k, v) = entry.map_err(map_redb_storage_error)?;
368            let composite: &[u8] = k.value();
369            let user_key = strip_collection_prefix(composite, collection)
370                .ok_or_else(|| NookError::Corruption {
371                    msg: format!(
372                        "composite key missing expected prefix for collection {collection:?}"
373                    ),
374                })?
375                .to_vec();
376            out.push((user_key, v.value().to_vec()));
377        }
378        Ok(out)
379    }
380}
381
382#[cfg(test)]
383mod tests {
384    use crate::database::Database;
385    use crate::notify::ChangeOp;
386
387    fn fresh_db() -> (tempfile::TempDir, Database) {
388        let dir = tempfile::tempdir().unwrap();
389        let path = dir.path().join("test.db");
390        let db = Database::open(&path).unwrap();
391        (dir, db)
392    }
393
394    #[test]
395    fn write_tx_records_put_as_insert_and_real_delete_as_delete() {
396        let (_dir, db) = fresh_db();
397        let touched = db
398            .write(|tx| {
399                tx.put("users", b"u1", b"Ali")?;
400                tx.put("users", b"u2", b"Veli")?;
401                let _ = tx.delete("users", b"u2")?; // existed → recorded
402                let _ = tx.delete("users", b"ghost")?; // absent → NOT recorded
403                Ok(tx.take_touched())
404            })
405            .unwrap();
406        let summary: Vec<(&str, ChangeOp, &[u8])> = touched
407            .iter()
408            .map(|c| (c.collection.as_str(), c.op, c.doc_id.as_slice()))
409            .collect();
410        assert_eq!(
411            summary,
412            vec![
413                ("users", ChangeOp::Insert, &b"u1"[..]),
414                ("users", ChangeOp::Insert, &b"u2"[..]),
415                ("users", ChangeOp::Delete, &b"u2"[..]),
416            ]
417        );
418    }
419
420    #[test]
421    fn take_touched_drains_so_a_second_call_is_empty() {
422        let (_dir, db) = fresh_db();
423        let (first, second) = db
424            .write(|tx| {
425                tx.put("c", b"k", b"v")?;
426                let a = tx.take_touched();
427                let b = tx.take_touched();
428                Ok((a.len(), b.len()))
429            })
430            .unwrap();
431        assert_eq!((first, second), (1, 0));
432    }
433}