// nookdb_core/database.rs

1//! `Database` — top-level handle owning the redb file lock.
2
3use std::path::Path;
4use std::sync::Arc;
5
6use redb::Database as RedbDatabase;
7
8use crate::error::NookError;
9use crate::notify::{CommitEvent, CommitObserver, Notifier, ObserverHandle};
10use crate::storage::{ReadTx, WriteTx};
11
12/// Owning handle to a Nook database file.
13///
14/// Holds the OS-level file lock for the duration of its lifetime. Drop
15/// releases the lock. There is no explicit `close()` method on this type;
16/// the NAPI binding implements its own closure semantics on top.
17pub struct Database {
18    pub(crate) inner: RedbDatabase,
19    notifier: Notifier,
20}
21
22impl Database {
23    /// Opens the database at `path`, creating it (and any missing parent
24    /// directories) if necessary.
25    ///
26    /// # Errors
27    ///
28    /// Returns `NookError::Storage` if the file or parent directories cannot
29    /// be created, or if the database file cannot be opened or is locked by
30    /// another process.
31    pub fn open(path: impl AsRef<Path>) -> Result<Self, NookError> {
32        let path = path.as_ref();
33        if let Some(parent) = path.parent() {
34            if !parent.as_os_str().is_empty() && !parent.exists() {
35                std::fs::create_dir_all(parent)?;
36            }
37        }
38        let inner = RedbDatabase::create(path).map_err(map_redb_db_error)?;
39        Ok(Self {
40            inner,
41            notifier: Notifier::new(),
42        })
43    }
44
45    /// Registers a commit observer (stable extension seam, M3).
46    /// The returned handle unregisters on drop (RAII).
47    #[must_use = "dropping the ObserverHandle immediately unregisters the observer"]
48    pub fn add_observer(&self, obs: Arc<dyn CommitObserver>) -> ObserverHandle {
49        self.notifier.add_observer(obs)
50    }
51
52    /// Runs `f` inside a read transaction (MVCC snapshot). Read
53    /// transactions never block writers and may run in parallel with
54    /// each other.
55    ///
56    /// # Errors
57    ///
58    /// Returns `NookError::Transaction` if the read transaction cannot be
59    /// started, or propagates any error returned by `f`.
60    pub fn read<F, R>(&self, f: F) -> Result<R, NookError>
61    where
62        F: FnOnce(&ReadTx) -> Result<R, NookError>,
63    {
64        let txn = self.inner.begin_read().map_err(map_redb_tx_error)?;
65        let tx = ReadTx::new(&txn)?;
66        f(&tx)
67    }
68
69    /// Runs `f` inside a write transaction. The transaction commits on
70    /// `Ok` and rolls back on `Err` or panic.
71    ///
72    /// Write transactions are serializable: only one runs at a time.
73    ///
74    /// # Errors
75    ///
76    /// Returns `NookError::Transaction` if the write transaction cannot be
77    /// started or committed, or propagates any error returned by `f`.
78    /// Returns `NookError::Storage` or `NookError::Corruption` on
79    /// underlying storage failure.
80    pub fn write<F, R>(&self, f: F) -> Result<R, NookError>
81    where
82        F: for<'tx> FnOnce(&mut WriteTx<'tx>) -> Result<R, NookError>,
83    {
84        let txn = self.inner.begin_write().map_err(map_redb_tx_error)?;
85        // Scope-drop `tx` (whose Tables immutably borrow `&txn`) BEFORE
86        // `txn.commit()`/`txn.abort()` consume `txn` by value; `take_touched()`
87        // moves the change-set out first so it outlives the drop.
88        let (result, touched) = {
89            let mut tx = WriteTx::new(&txn)?;
90            let r = f(&mut tx);
91            let t = tx.take_touched();
92            (r, t)
93        };
94        match result {
95            Ok(value) => {
96                txn.commit().map_err(map_redb_commit_error)?;
97                if !touched.is_empty() {
98                    self.notifier.dispatch(&CommitEvent::new(touched));
99                }
100                Ok(value)
101            }
102            Err(user_err) => {
103                if let Err(abort_err) = txn.abort() {
104                    return Err(NookError::Transaction {
105                        msg: format!("rollback failed ({abort_err}); original error: {user_err}"),
106                    });
107                }
108                Err(user_err)
109            }
110        }
111    }
112}
113
114fn map_redb_db_error(e: redb::DatabaseError) -> NookError {
115    match e {
116        redb::DatabaseError::Storage(s) => map_redb_storage_error(s),
117        other => NookError::Storage(std::io::Error::other(other.to_string())),
118    }
119}
120
121pub(crate) fn map_redb_storage_error(e: redb::StorageError) -> NookError {
122    match e {
123        redb::StorageError::Io(io_err) => NookError::Storage(io_err),
124        redb::StorageError::Corrupted(msg) => NookError::Corruption { msg },
125        other => NookError::Transaction {
126            msg: other.to_string(),
127        },
128    }
129}
130
131pub(crate) fn map_redb_table_error(e: redb::TableError) -> NookError {
132    match e {
133        redb::TableError::Storage(s) => map_redb_storage_error(s),
134        other => NookError::Transaction {
135            msg: other.to_string(),
136        },
137    }
138}
139
140fn map_redb_tx_error(e: redb::TransactionError) -> NookError {
141    match e {
142        redb::TransactionError::Storage(s) => map_redb_storage_error(s),
143        other => NookError::Transaction {
144            msg: other.to_string(),
145        },
146    }
147}
148
149fn map_redb_commit_error(e: redb::CommitError) -> NookError {
150    match e {
151        redb::CommitError::Storage(s) => map_redb_storage_error(s),
152        other => NookError::Transaction {
153            msg: other.to_string(),
154        },
155    }
156}
157
#[cfg(test)]
mod tests {
    //! Unit tests for `Database`: open/create, read/write round trips,
    //! rollback semantics, persistence, input validation, snapshot
    //! isolation, and post-commit observer dispatch.

    use super::*;
    use crate::error::NookErrorKind;
    use crate::notify::{CommitEvent, CommitObserver};
    use std::sync::{Arc, Mutex};

    /// Creates a database in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the handle so the directory stays
    /// alive for the duration of the test.
    fn fresh_db() -> (tempfile::TempDir, Database) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        let db = Database::open(&path).unwrap();
        (dir, db)
    }

    // ---- open() tests from Task 2 ----

    #[test]
    fn open_creates_file_at_path() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        assert!(!path.exists());
        let _db = Database::open(&path).unwrap();
        assert!(path.exists());
    }

    #[test]
    fn open_creates_missing_parent_dirs() {
        let dir = tempfile::tempdir().unwrap();
        let nested = dir.path().join("a").join("b").join("c").join("test.db");
        assert!(!nested.exists());
        let _db = Database::open(&nested).unwrap();
        assert!(nested.exists());
    }

    #[test]
    fn open_existing_file_succeeds() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        {
            // First handle is dropped at the end of this scope, releasing
            // the file lock before the second open.
            let _db = Database::open(&path).unwrap();
        }
        let _db2 = Database::open(&path).unwrap();
        assert!(path.exists());
    }

    // ---- write + read happy path ----

    #[test]
    fn write_then_read_round_trips_a_value() {
        let (_dir, db) = fresh_db();
        db.write(|tx| tx.put("users", b"u1", b"Ali")).unwrap();
        let got = db.read(|tx| tx.get("users", b"u1")).unwrap();
        assert_eq!(got.as_deref(), Some(&b"Ali"[..]));
    }

    #[test]
    fn read_of_missing_key_returns_none() {
        let (_dir, db) = fresh_db();
        let got = db.read(|tx| tx.get("users", b"missing")).unwrap();
        assert_eq!(got, None);
    }

    #[test]
    fn write_get_inside_same_tx_returns_current_value() {
        let (_dir, db) = fresh_db();
        let observed = db
            .write(|tx| {
                tx.put("c", b"k", b"v1")?;
                tx.get("c", b"k")
            })
            .unwrap();
        assert_eq!(observed.as_deref(), Some(&b"v1"[..]));
    }

    #[test]
    fn delete_returns_true_when_key_existed() {
        let (_dir, db) = fresh_db();
        db.write(|tx| tx.put("c", b"k", b"v")).unwrap();
        let removed = db.write(|tx| tx.delete("c", b"k")).unwrap();
        assert!(removed);
        let got = db.read(|tx| tx.get("c", b"k")).unwrap();
        assert_eq!(got, None);
    }

    #[test]
    fn delete_returns_false_when_key_missing() {
        let (_dir, db) = fresh_db();
        let removed = db.write(|tx| tx.delete("c", b"missing")).unwrap();
        assert!(!removed);
    }

    #[test]
    fn list_collection_returns_all_entries_for_that_collection_only() {
        let (_dir, db) = fresh_db();
        db.write(|tx| {
            tx.put("users", b"u1", b"Ali")?;
            tx.put("users", b"u2", b"Veli")?;
            tx.put("posts", b"p1", b"Hello")?;
            Ok(())
        })
        .unwrap();
        let mut users = db.read(|tx| tx.list_collection("users")).unwrap();
        // Sort so the assertion does not depend on listing order.
        users.sort();
        assert_eq!(
            users,
            vec![
                (b"u1".to_vec(), b"Ali".to_vec()),
                (b"u2".to_vec(), b"Veli".to_vec()),
            ]
        );
        let posts = db.read(|tx| tx.list_collection("posts")).unwrap();
        assert_eq!(posts, vec![(b"p1".to_vec(), b"Hello".to_vec())]);
    }

    #[test]
    fn list_collection_returns_empty_for_unknown_collection() {
        let (_dir, db) = fresh_db();
        let entries = db.read(|tx| tx.list_collection("nope")).unwrap();
        assert!(entries.is_empty());
    }

    // ---- rollback semantics ----

    #[test]
    fn write_rolls_back_when_callback_returns_err() {
        let (_dir, db) = fresh_db();
        let result = db.write(|tx| -> Result<(), NookError> {
            tx.put("c", b"k", b"v")?;
            Err(NookError::Transaction {
                msg: "user-induced rollback".into(),
            })
        });
        assert!(matches!(result, Err(NookError::Transaction { .. })));
        let got = db.read(|tx| tx.get("c", b"k")).unwrap();
        assert_eq!(got, None);
    }

    #[test]
    fn write_rolls_back_when_callback_panics() {
        let (_dir, db) = fresh_db();
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = db.write(|tx| -> Result<(), NookError> {
                tx.put("c", b"k", b"v")?;
                panic!("intentional panic");
            });
        }));
        assert!(result.is_err(), "panic should propagate out of write");
        let got = db.read(|tx| tx.get("c", b"k")).unwrap();
        assert_eq!(got, None, "value committed despite panic");
    }

    // ---- persistence ----

    #[test]
    fn writes_persist_across_open_close() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        {
            let db = Database::open(&path).unwrap();
            db.write(|tx| tx.put("c", b"k", b"persistent")).unwrap();
        }
        {
            let db = Database::open(&path).unwrap();
            let got = db.read(|tx| tx.get("c", b"k")).unwrap();
            assert_eq!(got.as_deref(), Some(&b"persistent"[..]));
        }
    }

    // ---- input validation ----

    #[test]
    fn put_rejects_collection_with_null_byte() {
        let (_dir, db) = fresh_db();
        let err = db.write(|tx| tx.put("bad\0name", b"k", b"v")).unwrap_err();
        assert_eq!(err.kind(), NookErrorKind::InvalidArg);
    }

    #[test]
    fn put_rejects_empty_collection() {
        let (_dir, db) = fresh_db();
        let err = db.write(|tx| tx.put("", b"k", b"v")).unwrap_err();
        assert_eq!(err.kind(), NookErrorKind::InvalidArg);
    }

    // ---- read-tx isolation (MVCC snapshot) ----

    #[test]
    fn read_tx_sees_snapshot_not_later_writes() {
        let (_dir, db) = fresh_db();
        db.write(|tx| tx.put("c", b"k", b"v_old")).unwrap();
        let (before, after): (Option<Vec<u8>>, Option<Vec<u8>>) = db
            .read(|tx| {
                let before = tx.get("c", b"k")?;
                // Commit a newer value while this read snapshot is still
                // open; the snapshot must keep returning the old value.
                db.write(|w| w.put("c", b"k", b"v_new"))?;
                let after = tx.get("c", b"k")?;
                Ok((before, after))
            })
            .unwrap();
        assert_eq!(before.as_deref(), Some(&b"v_old"[..]));
        assert_eq!(
            after.as_deref(),
            Some(&b"v_old"[..]),
            "open snapshot observed a later commit"
        );
        // A read started after the commit sees the new value.
        let latest = db.read(|tx| tx.get("c", b"k")).unwrap();
        assert_eq!(latest.as_deref(), Some(&b"v_new"[..]));
    }

    #[test]
    fn list_collection_inside_read_returns_committed_entries() {
        let (_dir, db) = fresh_db();
        db.write(|tx| {
            tx.put("c", b"k1", b"v1")?;
            tx.put("c", b"k2", b"v2")?;
            Ok(())
        })
        .unwrap();
        let entries = db.read(|tx| tx.list_collection("c")).unwrap();
        assert_eq!(entries.len(), 2);
    }

    // ---- post-commit dispatch ----

    /// Observer that records, for each received event, the list of touched
    /// collection names.
    #[derive(Default)]
    struct Spy(Mutex<Vec<Vec<String>>>); // one Vec<collection> per received event
    impl CommitObserver for Spy {
        fn on_commit(&self, ev: &CommitEvent) {
            self.0.lock().unwrap().push(
                ev.touched_collections()
                    .into_iter()
                    .map(str::to_string)
                    .collect(),
            );
        }
    }

    #[test]
    fn commit_dispatches_one_event_with_touched_collections() {
        let (_dir, db) = fresh_db();
        let spy = Arc::new(Spy::default());
        // Keep the handle alive; dropping it would unregister the observer.
        let _h = db.add_observer(spy.clone());
        db.write(|tx| {
            tx.put("users", b"u1", b"Ali")?;
            tx.put("posts", b"p1", b"Hi")?;
            Ok(())
        })
        .unwrap();
        let got = spy.0.lock().unwrap().clone();
        assert_eq!(got, vec![vec!["posts".to_string(), "users".to_string()]]);
    }

    #[test]
    fn rollback_and_panic_never_dispatch() {
        let (_dir, db) = fresh_db();
        let spy = Arc::new(Spy::default());
        let _h = db.add_observer(spy.clone());

        let _ = db.write(|tx| -> Result<(), NookError> {
            tx.put("c", b"k", b"v")?;
            Err(NookError::Transaction {
                msg: "rollback".into(),
            })
        });
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = db.write(|tx| -> Result<(), NookError> {
                tx.put("c", b"k", b"v")?;
                panic!("boom");
            });
        }));
        assert!(
            spy.0.lock().unwrap().is_empty(),
            "no dispatch on rollback/panic"
        );
    }

    #[test]
    fn a_no_op_commit_does_not_dispatch() {
        let (_dir, db) = fresh_db();
        let spy = Arc::new(Spy::default());
        let _h = db.add_observer(spy.clone());
        db.write(|_tx| Ok(())).unwrap(); // committed but touched nothing
        assert!(
            spy.0.lock().unwrap().is_empty(),
            "empty CommitEvent suppressed"
        );
    }
}
432            spy.0.lock().unwrap().is_empty(),
433            "empty CommitEvent suppressed"
434        );
435    }
436}