1use std::path::Path;
4use std::sync::Arc;
5
6use redb::Database as RedbDatabase;
7
8use crate::error::NookError;
9use crate::notify::{CommitEvent, CommitObserver, Notifier, ObserverHandle};
10use crate::storage::{ReadTx, WriteTx};
11
/// Handle to a redb-backed database that also owns the commit notifier.
pub struct Database {
    /// Underlying redb database; visible to the rest of the crate.
    pub(crate) inner: RedbDatabase,
    /// Registry of commit observers; `write` dispatches through it after a
    /// successful commit that touched at least one collection.
    notifier: Notifier,
}
21
22impl Database {
23 pub fn open(path: impl AsRef<Path>) -> Result<Self, NookError> {
32 let path = path.as_ref();
33 if let Some(parent) = path.parent() {
34 if !parent.as_os_str().is_empty() && !parent.exists() {
35 std::fs::create_dir_all(parent)?;
36 }
37 }
38 let inner = RedbDatabase::create(path).map_err(map_redb_db_error)?;
39 Ok(Self {
40 inner,
41 notifier: Notifier::new(),
42 })
43 }
44
45 #[must_use = "dropping the ObserverHandle immediately unregisters the observer"]
48 pub fn add_observer(&self, obs: Arc<dyn CommitObserver>) -> ObserverHandle {
49 self.notifier.add_observer(obs)
50 }
51
52 pub fn read<F, R>(&self, f: F) -> Result<R, NookError>
61 where
62 F: FnOnce(&ReadTx) -> Result<R, NookError>,
63 {
64 let txn = self.inner.begin_read().map_err(map_redb_tx_error)?;
65 let tx = ReadTx::new(&txn)?;
66 f(&tx)
67 }
68
69 pub fn write<F, R>(&self, f: F) -> Result<R, NookError>
81 where
82 F: for<'tx> FnOnce(&mut WriteTx<'tx>) -> Result<R, NookError>,
83 {
84 let txn = self.inner.begin_write().map_err(map_redb_tx_error)?;
85 let (result, touched) = {
89 let mut tx = WriteTx::new(&txn)?;
90 let r = f(&mut tx);
91 let t = tx.take_touched();
92 (r, t)
93 };
94 match result {
95 Ok(value) => {
96 txn.commit().map_err(map_redb_commit_error)?;
97 if !touched.is_empty() {
98 self.notifier.dispatch(&CommitEvent::new(touched));
99 }
100 Ok(value)
101 }
102 Err(user_err) => {
103 if let Err(abort_err) = txn.abort() {
104 return Err(NookError::Transaction {
105 msg: format!("rollback failed ({abort_err}); original error: {user_err}"),
106 });
107 }
108 Err(user_err)
109 }
110 }
111 }
112}
113
114fn map_redb_db_error(e: redb::DatabaseError) -> NookError {
115 match e {
116 redb::DatabaseError::Storage(s) => map_redb_storage_error(s),
117 other => NookError::Storage(std::io::Error::other(other.to_string())),
118 }
119}
120
121pub(crate) fn map_redb_storage_error(e: redb::StorageError) -> NookError {
122 match e {
123 redb::StorageError::Io(io_err) => NookError::Storage(io_err),
124 redb::StorageError::Corrupted(msg) => NookError::Corruption { msg },
125 other => NookError::Transaction {
126 msg: other.to_string(),
127 },
128 }
129}
130
131pub(crate) fn map_redb_table_error(e: redb::TableError) -> NookError {
132 match e {
133 redb::TableError::Storage(s) => map_redb_storage_error(s),
134 other => NookError::Transaction {
135 msg: other.to_string(),
136 },
137 }
138}
139
140fn map_redb_tx_error(e: redb::TransactionError) -> NookError {
141 match e {
142 redb::TransactionError::Storage(s) => map_redb_storage_error(s),
143 other => NookError::Transaction {
144 msg: other.to_string(),
145 },
146 }
147}
148
149fn map_redb_commit_error(e: redb::CommitError) -> NookError {
150 match e {
151 redb::CommitError::Storage(s) => map_redb_storage_error(s),
152 other => NookError::Transaction {
153 msg: other.to_string(),
154 },
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::NookErrorKind;
    use crate::notify::{CommitEvent, CommitObserver};
    use std::sync::{Arc, Mutex};

    /// Opens a database named `test.db` inside a new temporary directory.
    ///
    /// The `TempDir` is returned as well so the caller can keep it alive for
    /// as long as the database is in use.
    fn fresh_db() -> (tempfile::TempDir, Database) {
        let dir = tempfile::tempdir().unwrap();
        let db = Database::open(dir.path().join("test.db")).unwrap();
        (dir, db)
    }

    /// Observer that records the touched collections of every commit event it
    /// receives, one inner `Vec` per event.
    #[derive(Default)]
    struct Spy(Mutex<Vec<Vec<String>>>);

    impl CommitObserver for Spy {
        fn on_commit(&self, ev: &CommitEvent) {
            let touched: Vec<String> = ev
                .touched_collections()
                .into_iter()
                .map(str::to_string)
                .collect();
            self.0.lock().unwrap().push(touched);
        }
    }

    // ---- open -------------------------------------------------------------

    #[test]
    fn open_creates_file_at_path() {
        let dir = tempfile::tempdir().unwrap();
        let db_file = dir.path().join("test.db");
        assert!(!db_file.exists());

        let _db = Database::open(&db_file).unwrap();

        assert!(db_file.exists());
    }

    #[test]
    fn open_creates_missing_parent_dirs() {
        let dir = tempfile::tempdir().unwrap();
        let db_file = dir.path().join("a").join("b").join("c").join("test.db");
        assert!(!db_file.exists());

        let _db = Database::open(&db_file).unwrap();

        assert!(db_file.exists());
    }

    #[test]
    fn open_existing_file_succeeds() {
        let dir = tempfile::tempdir().unwrap();
        let db_file = dir.path().join("test.db");

        // Create the file, then close the first handle before reopening.
        drop(Database::open(&db_file).unwrap());

        let _reopened = Database::open(&db_file).unwrap();
        assert!(db_file.exists());
    }

    // ---- basic reads and writes --------------------------------------------

    #[test]
    fn write_then_read_round_trips_a_value() {
        let (_dir, db) = fresh_db();
        db.write(|tx| tx.put("users", b"u1", b"Ali")).unwrap();

        let stored = db.read(|tx| tx.get("users", b"u1")).unwrap();

        assert_eq!(stored.as_deref(), Some(b"Ali".as_slice()));
    }

    #[test]
    fn read_of_missing_key_returns_none() {
        let (_dir, db) = fresh_db();

        let stored = db.read(|tx| tx.get("users", b"missing")).unwrap();

        assert_eq!(stored, None);
    }

    #[test]
    fn write_get_inside_same_tx_returns_current_value() {
        let (_dir, db) = fresh_db();

        let seen_in_tx = db
            .write(|tx| {
                tx.put("c", b"k", b"v1")?;
                tx.get("c", b"k")
            })
            .unwrap();

        assert_eq!(seen_in_tx.as_deref(), Some(b"v1".as_slice()));
    }

    #[test]
    fn delete_returns_true_when_key_existed() {
        let (_dir, db) = fresh_db();
        db.write(|tx| tx.put("c", b"k", b"v")).unwrap();

        let was_removed = db.write(|tx| tx.delete("c", b"k")).unwrap();
        assert!(was_removed);

        let stored = db.read(|tx| tx.get("c", b"k")).unwrap();
        assert_eq!(stored, None);
    }

    #[test]
    fn delete_returns_false_when_key_missing() {
        let (_dir, db) = fresh_db();

        let was_removed = db.write(|tx| tx.delete("c", b"missing")).unwrap();

        assert!(!was_removed);
    }

    #[test]
    fn list_collection_returns_all_entries_for_that_collection_only() {
        let (_dir, db) = fresh_db();
        db.write(|tx| {
            tx.put("users", b"u1", b"Ali")?;
            tx.put("users", b"u2", b"Veli")?;
            tx.put("posts", b"p1", b"Hello")?;
            Ok(())
        })
        .unwrap();

        // Sort before comparing so the assertion does not depend on the
        // order in which entries are listed.
        let mut users = db.read(|tx| tx.list_collection("users")).unwrap();
        users.sort();
        let expected_users = vec![
            (b"u1".to_vec(), b"Ali".to_vec()),
            (b"u2".to_vec(), b"Veli".to_vec()),
        ];
        assert_eq!(users, expected_users);

        let posts = db.read(|tx| tx.list_collection("posts")).unwrap();
        assert_eq!(posts, vec![(b"p1".to_vec(), b"Hello".to_vec())]);
    }

    #[test]
    fn list_collection_returns_empty_for_unknown_collection() {
        let (_dir, db) = fresh_db();

        let entries = db.read(|tx| tx.list_collection("nope")).unwrap();

        assert!(entries.is_empty());
    }

    // ---- rollback ------------------------------------------------------------

    #[test]
    fn write_rolls_back_when_callback_returns_err() {
        let (_dir, db) = fresh_db();

        let outcome = db.write(|tx| -> Result<(), NookError> {
            tx.put("c", b"k", b"v")?;
            Err(NookError::Transaction {
                msg: "user-induced rollback".into(),
            })
        });

        assert!(matches!(outcome, Err(NookError::Transaction { .. })));
        let stored = db.read(|tx| tx.get("c", b"k")).unwrap();
        assert_eq!(stored, None);
    }

    #[test]
    fn write_rolls_back_when_callback_panics() {
        let (_dir, db) = fresh_db();

        let unwind = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = db.write(|tx| -> Result<(), NookError> {
                tx.put("c", b"k", b"v")?;
                panic!("intentional panic");
            });
        }));

        assert!(unwind.is_err(), "panic should propagate out of write");
        let stored = db.read(|tx| tx.get("c", b"k")).unwrap();
        assert_eq!(stored, None, "value committed despite panic");
    }

    // ---- persistence -----------------------------------------------------------

    #[test]
    fn writes_persist_across_open_close() {
        let dir = tempfile::tempdir().unwrap();
        let db_file = dir.path().join("test.db");

        {
            let db = Database::open(&db_file).unwrap();
            db.write(|tx| tx.put("c", b"k", b"persistent")).unwrap();
        }

        let reopened = Database::open(&db_file).unwrap();
        let stored = reopened.read(|tx| tx.get("c", b"k")).unwrap();
        assert_eq!(stored.as_deref(), Some(b"persistent".as_slice()));
    }

    // ---- argument validation ---------------------------------------------------

    #[test]
    fn put_rejects_collection_with_null_byte() {
        let (_dir, db) = fresh_db();

        let err = db.write(|tx| tx.put("bad\0name", b"k", b"v")).unwrap_err();

        assert_eq!(err.kind(), NookErrorKind::InvalidArg);
    }

    #[test]
    fn put_rejects_empty_collection() {
        let (_dir, db) = fresh_db();

        let err = db.write(|tx| tx.put("", b"k", b"v")).unwrap_err();

        assert_eq!(err.kind(), NookErrorKind::InvalidArg);
    }

    // ---- isolation ---------------------------------------------------------------

    #[test]
    fn read_tx_sees_snapshot_not_later_writes() {
        let (_dir, db) = fresh_db();
        db.write(|tx| tx.put("c", b"k", b"v_old")).unwrap();

        let seen_in_read: Option<Vec<u8>> = db
            .read(|tx| {
                // Commit a newer value while this read transaction is still
                // open; the open transaction must keep seeing the old one.
                db.write(|w| w.put("c", b"k", b"v_new"))?;
                tx.get("c", b"k")
            })
            .unwrap();
        assert_eq!(seen_in_read.as_deref(), Some(b"v_old".as_slice()));

        // A read started after the commit observes the new value.
        let seen_after = db.read(|tx| tx.get("c", b"k")).unwrap();
        assert_eq!(seen_after.as_deref(), Some(b"v_new".as_slice()));
    }

    #[test]
    fn list_collection_inside_read_returns_committed_entries() {
        let (_dir, db) = fresh_db();
        db.write(|tx| {
            tx.put("c", b"k1", b"v1")?;
            tx.put("c", b"k2", b"v2")?;
            Ok(())
        })
        .unwrap();

        let entries = db.read(|tx| tx.list_collection("c")).unwrap();

        assert_eq!(entries.len(), 2);
    }

    // ---- commit notifications ------------------------------------------------------

    #[test]
    fn commit_dispatches_one_event_with_touched_collections() {
        let (_dir, db) = fresh_db();
        let spy = Arc::new(Spy::default());
        let _handle = db.add_observer(spy.clone());

        db.write(|tx| {
            tx.put("users", b"u1", b"Ali")?;
            tx.put("posts", b"p1", b"Hi")?;
            Ok(())
        })
        .unwrap();

        let events = spy.0.lock().unwrap().clone();
        let expected = vec![vec!["posts".to_string(), "users".to_string()]];
        assert_eq!(events, expected);
    }

    #[test]
    fn rollback_and_panic_never_dispatch() {
        let (_dir, db) = fresh_db();
        let spy = Arc::new(Spy::default());
        let _handle = db.add_observer(spy.clone());

        // Rolled back because the callback returns an error.
        let _ = db.write(|tx| -> Result<(), NookError> {
            tx.put("c", b"k", b"v")?;
            Err(NookError::Transaction {
                msg: "rollback".into(),
            })
        });

        // Never committed because the callback panics.
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = db.write(|tx| -> Result<(), NookError> {
                tx.put("c", b"k", b"v")?;
                panic!("boom");
            });
        }));

        assert!(
            spy.0.lock().unwrap().is_empty(),
            "no dispatch on rollback/panic"
        );
    }

    #[test]
    fn a_no_op_commit_does_not_dispatch() {
        let (_dir, db) = fresh_db();
        let spy = Arc::new(Spy::default());
        let _handle = db.add_observer(spy.clone());

        db.write(|_tx| Ok(())).unwrap();

        assert!(
            spy.0.lock().unwrap().is_empty(),
            "empty CommitEvent suppressed"
        );
    }
}