nookdb_core/storage.rs
1//! Read and write transaction handles wrapping redb transactions.
2
3use redb::{ReadableTable, Table, TableDefinition};
4
5use crate::codec::{
6 collection_prefix_lower, collection_prefix_upper, encode_key, strip_collection_prefix,
7};
8use crate::database::{map_redb_storage_error, map_redb_table_error};
9use crate::error::NookError;
10use crate::notify::{ChangeOp, DocChange};
11
/// The single redb table that stores every entry.
///
/// All collections share this table; collection routing is done via
/// composite keys (see `crate::codec`). Both keys and values are raw bytes.
pub(crate) const ENTRIES: TableDefinition<&[u8], &[u8]> = TableDefinition::new("entries");
15
/// One `(key, value)` pair returned by a table scan. Both are owned byte vectors.
///
/// For `list_collection` the key is the user key with the collection
/// prefix stripped; for `ReadTx::list_entries_raw` it is the full
/// composite key exactly as stored.
pub type Entry = (Vec<u8>, Vec<u8>);
18
/// Handle exposed to read transaction callbacks. MVCC snapshot.
///
/// Holds the (optional) `entries` table and the (optional) secondary-index
/// table, both opened from the same `redb::ReadTransaction` in `ReadTx::new`.
///
/// No lifetime parameter: `redb::ReadOnlyTable` is `Arc<TransactionGuard>`-backed
/// and fully owns its data. The read-side table does not borrow the `ReadTransaction`.
pub struct ReadTx {
    /// `None` when the "entries" table has never been written to (fresh db).
    /// Read methods treat `None` as an empty table.
    table: Option<redb::ReadOnlyTable<&'static [u8], &'static [u8]>>,
    /// `None` when the index table has never been written to (no index
    /// maintenance has occurred yet). Opened from the same MVCC snapshot
    /// as `table`; like `table`, the `Arc`-backed `ReadOnlyTable` fully
    /// owns its data and does not borrow the `ReadTransaction`.
    index_table: Option<redb::ReadOnlyTable<&'static [u8], &'static [u8]>>,
}
32
33impl ReadTx {
34 pub(crate) fn new(txn: &redb::ReadTransaction) -> Result<Self, NookError> {
35 let table = match txn.open_table(ENTRIES) {
36 Ok(table) => Some(table),
37 Err(redb::TableError::TableDoesNotExist(_)) => None,
38 Err(e) => return Err(map_redb_table_error(e)),
39 };
40 let index_table = match txn.open_table(crate::index::engine::INDEX_ENTRIES) {
41 Ok(t) => Some(t),
42 Err(redb::TableError::TableDoesNotExist(_)) => None,
43 Err(e) => return Err(map_redb_table_error(e)),
44 };
45 Ok(Self { table, index_table })
46 }
47
48 /// Returns the value stored under `(collection, key)`, or `None`.
49 ///
50 /// # Errors
51 ///
52 /// Returns `NookError::InvalidArg` if `collection` is empty or contains
53 /// a null byte. Returns `NookError::Storage` or `NookError::Corruption`
54 /// on underlying storage failure.
55 pub fn get(&self, collection: &str, key: &[u8]) -> Result<Option<Vec<u8>>, NookError> {
56 let Some(ref table) = self.table else {
57 // Validate the collection name BEFORE the empty-table early return, so a bad collection still yields InvalidArg (not Ok(None)/Ok(empty)) even on a fresh, never-written db.
58 encode_key(collection, key)?;
59 return Ok(None);
60 };
61 let composite = encode_key(collection, key)?;
62 let guard = table
63 .get(composite.as_slice())
64 .map_err(map_redb_storage_error)?;
65 Ok(guard.map(|v| v.value().to_vec()))
66 }
67
68 /// Returns all `(key, value)` pairs in the named collection,
69 /// in lexicographic key order. May be empty.
70 ///
71 /// # Errors
72 ///
73 /// Returns `NookError::InvalidArg` if `collection` is empty or contains
74 /// a null byte. Returns `NookError::Storage` or `NookError::Corruption`
75 /// on underlying storage failure.
76 pub fn list_collection(&self, collection: &str) -> Result<Vec<Entry>, NookError> {
77 let Some(ref table) = self.table else {
78 // Validate the collection name BEFORE the empty-table early return, so a bad collection still yields InvalidArg (not Ok(None)/Ok(empty)) even on a fresh, never-written db.
79 collection_prefix_lower(collection)?;
80 return Ok(vec![]);
81 };
82 let lower = collection_prefix_lower(collection)?;
83 let upper = collection_prefix_upper(collection)?;
84 let iter = table
85 .range::<&[u8]>(lower.as_slice()..upper.as_slice())
86 .map_err(map_redb_storage_error)?;
87 let mut out = Vec::new();
88 for entry in iter {
89 let (k, v) = entry.map_err(map_redb_storage_error)?;
90 let composite: &[u8] = k.value();
91 let user_key = strip_collection_prefix(composite, collection)
92 .ok_or_else(|| NookError::Corruption {
93 msg: format!(
94 "composite key missing expected prefix for collection {collection:?}"
95 ),
96 })?
97 .to_vec();
98 out.push((user_key, v.value().to_vec()));
99 }
100 Ok(out)
101 }
102
103 /// Returns every `(composite_key, value)` pair across the entire
104 /// `entries` table, in key order. Used by `crate::backup::write_backup`
105 /// to stream a full DB snapshot. The composite key encoding is opaque
106 /// here — the backup format does not decompose it (restore writes the
107 /// same composite keys back).
108 ///
109 /// # Errors
110 ///
111 /// Returns `NookError::Storage` or `NookError::Corruption` on
112 /// underlying storage failure.
113 pub fn list_entries_raw(&self) -> Result<Vec<Entry>, NookError> {
114 let Some(ref table) = self.table else {
115 return Ok(vec![]);
116 };
117 let iter = table.iter().map_err(map_redb_storage_error)?;
118 let mut out = Vec::new();
119 for entry in iter {
120 let (k, v) = entry.map_err(map_redb_storage_error)?;
121 out.push((k.value().to_vec(), v.value().to_vec()));
122 }
123 Ok(out)
124 }
125
126 /// Returns every value stored under index keys in the half-open
127 /// range `[lo, hi)` of the secondary-index table, in key order.
128 ///
129 /// Returns an empty vector when the index table has never been
130 /// written (no index maintenance has occurred yet). Used by the
131 /// index engine for equality lookups; collection/key routing is
132 /// encoded into `lo`/`hi` by `crate::index::engine`.
133 ///
134 /// # Errors
135 ///
136 /// Returns `NookError::Storage` or `NookError::Corruption` on
137 /// underlying storage failure.
138 pub fn index_range_values(&self, lo: &[u8], hi: &[u8]) -> Result<Vec<Vec<u8>>, NookError> {
139 let Some(ref t) = self.index_table else {
140 return Ok(vec![]);
141 };
142 let iter = t.range::<&[u8]>(lo..hi).map_err(map_redb_storage_error)?;
143 let mut out = Vec::new();
144 for r in iter {
145 let (_k, v) = r.map_err(map_redb_storage_error)?;
146 out.push(v.value().to_vec());
147 }
148 Ok(out)
149 }
150}
151
/// Handle exposed to write transaction callbacks. Serializable.
///
/// Bundles the `entries` table, the secondary-index table, and the list of
/// document changes recorded by `put`/`delete` during the transaction.
/// Both tables borrow the underlying `redb::WriteTransaction` for `'tx`.
pub struct WriteTx<'tx> {
    /// The `entries` table holding every document under its composite key.
    table: Table<'tx, &'static [u8], &'static [u8]>,
    /// Secondary-index table, opened from the SAME write transaction as
    /// `table` so index maintenance and document writes commit (or roll
    /// back) atomically together. redb permits multiple tables open
    /// concurrently within one `WriteTransaction`; each `Table` borrows
    /// `&'tx WriteTransaction` immutably (`open_table` takes `&'txn self`).
    index_table: Table<'tx, &'static [u8], &'static [u8]>,
    /// Accumulates document-level changes made within this transaction.
    /// Drained by `take_touched` after a successful commit; rollback drops
    /// `WriteTx` without ever calling `take_touched`, so the vec is simply
    /// discarded.
    touched: Vec<DocChange>,
}
167
168impl<'tx> WriteTx<'tx> {
169 pub(crate) fn new(txn: &'tx redb::WriteTransaction) -> Result<Self, NookError> {
170 let table = txn.open_table(ENTRIES).map_err(map_redb_table_error)?;
171 let index_table = txn
172 .open_table(crate::index::engine::INDEX_ENTRIES)
173 .map_err(map_redb_table_error)?;
174 Ok(Self {
175 table,
176 index_table,
177 touched: Vec::new(),
178 })
179 }
180
181 /// Drains and returns every document change accumulated so far in this
182 /// transaction. A second call on the same transaction returns an empty
183 /// `Vec`. Intended to be called by `Database::write` immediately after
184 /// a successful commit to build a [`crate::notify::CommitEvent`];
185 /// rollback simply drops `WriteTx` without calling this method.
186 pub fn take_touched(&mut self) -> Vec<DocChange> {
187 std::mem::take(&mut self.touched)
188 }
189
190 /// Inserts or overwrites `(collection, key) -> value`.
191 ///
192 /// # Errors
193 ///
194 /// Returns `NookError::InvalidArg` if `collection` is empty or contains
195 /// a null byte. Returns `NookError::Storage` on underlying storage failure.
196 pub fn put(&mut self, collection: &str, key: &[u8], value: &[u8]) -> Result<(), NookError> {
197 let composite = encode_key(collection, key)?;
198 self.table
199 .insert(composite.as_slice(), value)
200 .map_err(map_redb_storage_error)?;
201 self.touched.push(DocChange {
202 collection: collection.to_string(),
203 op: ChangeOp::Insert,
204 doc_id: key.to_vec(),
205 });
206 Ok(())
207 }
208
209 /// Same semantics as `ReadTx::get`, but reads from the current
210 /// uncommitted state of this write transaction.
211 ///
212 /// # Errors
213 ///
214 /// Returns `NookError::InvalidArg` if `collection` is empty or contains
215 /// a null byte. Returns `NookError::Storage` or `NookError::Corruption`
216 /// on underlying storage failure.
217 pub fn get(&self, collection: &str, key: &[u8]) -> Result<Option<Vec<u8>>, NookError> {
218 let composite = encode_key(collection, key)?;
219 let guard = self
220 .table
221 .get(composite.as_slice())
222 .map_err(map_redb_storage_error)?;
223 Ok(guard.map(|v| v.value().to_vec()))
224 }
225
226 /// Removes `(collection, key)`. Returns `true` if the key was
227 /// present, `false` otherwise.
228 ///
229 /// # Errors
230 ///
231 /// Returns `NookError::InvalidArg` if `collection` is empty or contains
232 /// a null byte. Returns `NookError::Storage` on underlying storage failure.
233 pub fn delete(&mut self, collection: &str, key: &[u8]) -> Result<bool, NookError> {
234 let composite = encode_key(collection, key)?;
235 let removed = self
236 .table
237 .remove(composite.as_slice())
238 .map_err(map_redb_storage_error)?;
239 let existed = removed.is_some();
240 if existed {
241 self.touched.push(DocChange {
242 collection: collection.to_string(),
243 op: ChangeOp::Delete,
244 doc_id: key.to_vec(),
245 });
246 }
247 Ok(existed)
248 }
249
250 /// Inserts (or overwrites) a raw `key -> value` pair into the
251 /// secondary-index table within this write transaction. The
252 /// composite index key layout is owned by `crate::index::engine`;
253 /// this method is layout-agnostic.
254 ///
255 /// # Errors
256 ///
257 /// Returns `NookError::Storage` on underlying storage failure.
258 pub fn index_put(&mut self, k: &[u8], v: &[u8]) -> Result<(), NookError> {
259 self.index_table
260 .insert(k, v)
261 .map_err(map_redb_storage_error)?;
262 Ok(())
263 }
264
265 /// Removes a raw key from the secondary-index table within this
266 /// write transaction. A missing key is not an error.
267 ///
268 /// # Errors
269 ///
270 /// Returns `NookError::Storage` on underlying storage failure.
271 pub fn index_delete(&mut self, k: &[u8]) -> Result<(), NookError> {
272 self.index_table.remove(k).map_err(map_redb_storage_error)?;
273 Ok(())
274 }
275
276 /// Same semantics as `ReadTx::index_range_values`, but reads from the
277 /// current uncommitted state of this write transaction. Lets the
278 /// index engine perform a unique-constraint pre-check that observes
279 /// the in-flight write (a separate read snapshot would miss prior
280 /// inserts made within the same transaction).
281 ///
282 /// Returns an empty vector when no index maintenance has occurred in
283 /// this transaction (the index table is created lazily). The
284 /// composite index key layout is owned by `crate::index::engine`;
285 /// this method is layout-agnostic.
286 ///
287 /// # Errors
288 ///
289 /// Returns `NookError::Storage` or `NookError::Corruption` on
290 /// underlying storage failure.
291 pub fn index_range_values(&self, lo: &[u8], hi: &[u8]) -> Result<Vec<Vec<u8>>, NookError> {
292 let iter = self
293 .index_table
294 .range::<&[u8]>(lo..hi)
295 .map_err(map_redb_storage_error)?;
296 let mut out = Vec::new();
297 for r in iter {
298 let (_k, v) = r.map_err(map_redb_storage_error)?;
299 out.push(v.value().to_vec());
300 }
301 Ok(out)
302 }
303
304 /// Returns true iff the `entries` table has at least one row.
305 ///
306 /// Used by backup restore to enforce the `allow_overwrite` gate.
307 ///
308 /// # Errors
309 ///
310 /// Returns `NookError::Storage` on underlying storage failure.
311 pub fn has_any_entry(&self) -> Result<bool, NookError> {
312 let mut iter = self.table.iter().map_err(map_redb_storage_error)?;
313 Ok(iter.next().is_some())
314 }
315
316 /// Removes every row from the `entries` table AND the index table.
317 ///
318 /// Used by backup restore when `allow_overwrite=true`.
319 ///
320 /// # Errors
321 ///
322 /// Returns `NookError::Storage` on underlying storage failure.
323 pub fn clear_entries(&mut self) -> Result<(), NookError> {
324 self.table
325 .retain(|_, _| false)
326 .map_err(map_redb_storage_error)?;
327 self.index_table
328 .retain(|_, _| false)
329 .map_err(map_redb_storage_error)?;
330 Ok(())
331 }
332
333 /// Inserts a raw `(composite_key, value)` pair into the `entries` table.
334 ///
335 /// Used by backup restore to replay a `.nbkp` snapshot. Does NOT touch
336 /// the index table — backup restore replays composite keys from the same
337 /// source table, and the index table is empty (cleared by `clear_entries`)
338 /// so secondary indexes will be lost after restore until reindexed.
339 ///
340 /// # Errors
341 ///
342 /// Returns `NookError::Storage` on underlying storage failure.
343 pub fn put_raw(&mut self, composite_key: &[u8], value: &[u8]) -> Result<(), NookError> {
344 self.table
345 .insert(composite_key, value)
346 .map_err(map_redb_storage_error)?;
347 Ok(())
348 }
349
350 /// Same semantics as `ReadTx::list_collection`, but reads from the
351 /// current uncommitted state of this write transaction.
352 ///
353 /// # Errors
354 ///
355 /// Returns `NookError::InvalidArg` if `collection` is empty or contains
356 /// a null byte. Returns `NookError::Storage` or `NookError::Corruption`
357 /// on underlying storage failure.
358 pub fn list_collection(&self, collection: &str) -> Result<Vec<Entry>, NookError> {
359 let lower = collection_prefix_lower(collection)?;
360 let upper = collection_prefix_upper(collection)?;
361 let iter = self
362 .table
363 .range::<&[u8]>(lower.as_slice()..upper.as_slice())
364 .map_err(map_redb_storage_error)?;
365 let mut out = Vec::new();
366 for entry in iter {
367 let (k, v) = entry.map_err(map_redb_storage_error)?;
368 let composite: &[u8] = k.value();
369 let user_key = strip_collection_prefix(composite, collection)
370 .ok_or_else(|| NookError::Corruption {
371 msg: format!(
372 "composite key missing expected prefix for collection {collection:?}"
373 ),
374 })?
375 .to_vec();
376 out.push((user_key, v.value().to_vec()));
377 }
378 Ok(out)
379 }
380}
381
#[cfg(test)]
mod tests {
    use crate::database::Database;
    use crate::notify::ChangeOp;

    /// Opens a brand-new database file inside a temporary directory. The
    /// `TempDir` guard is handed back so the caller keeps the directory
    /// alive for the duration of the test.
    fn fresh_db() -> (tempfile::TempDir, Database) {
        let tmp = tempfile::tempdir().unwrap();
        let db = Database::open(&tmp.path().join("test.db")).unwrap();
        (tmp, db)
    }

    /// `put` is recorded as `Insert`; `delete` is recorded as `Delete`
    /// only when the key actually existed.
    #[test]
    fn write_tx_records_put_as_insert_and_real_delete_as_delete() {
        let (_dir, db) = fresh_db();

        let changes = db
            .write(|tx| {
                tx.put("users", b"u1", b"Ali")?;
                tx.put("users", b"u2", b"Veli")?;
                // "u2" exists at this point, so its removal is recorded.
                tx.delete("users", b"u2")?;
                // "ghost" was never written, so nothing is recorded for it.
                tx.delete("users", b"ghost")?;
                Ok(tx.take_touched())
            })
            .unwrap();

        let actual: Vec<(&str, ChangeOp, &[u8])> = changes
            .iter()
            .map(|change| {
                (
                    change.collection.as_str(),
                    change.op,
                    change.doc_id.as_slice(),
                )
            })
            .collect();
        let expected: Vec<(&str, ChangeOp, &[u8])> = vec![
            ("users", ChangeOp::Insert, &b"u1"[..]),
            ("users", ChangeOp::Insert, &b"u2"[..]),
            ("users", ChangeOp::Delete, &b"u2"[..]),
        ];
        assert_eq!(actual, expected);
    }

    /// The first `take_touched` returns the recorded change; the second
    /// finds nothing left.
    #[test]
    fn take_touched_drains_so_a_second_call_is_empty() {
        let (_dir, db) = fresh_db();

        let drained_counts = db
            .write(|tx| {
                tx.put("c", b"k", b"v")?;
                let first_drain = tx.take_touched().len();
                let second_drain = tx.take_touched().len();
                Ok((first_drain, second_drain))
            })
            .unwrap();

        assert_eq!(drained_counts, (1, 0));
    }
}
433}