// grumpydb/engine.rs
//! Storage engine: orchestrates all subsystems to provide CRUD operations.
//!
//! `GrumpyDb` is a thin wrapper over a single [`Collection`] with WAL logging.
//! All data page access goes through the collection's buffer pool
//! ([`crate::buffer::pool::BufferPool`]).
//!
//! **DEPRECATED in v5:** the type [`GrumpyDb`] is kept for one major-version
//! cycle and will be removed in v6. New code should use
//! [`crate::Database`] (with the `_default` collection if a single-collection
//! store is sufficient). All internal references in this module are wrapped
//! with `#[allow(deprecated)]` to keep the deprecation warning visible to
//! downstream consumers without spamming our own builds.
#![allow(deprecated)]

use std::path::Path;

use uuid::Uuid;

use crate::collection::Collection;
use crate::document::Document;
use crate::document::value::Value;
use crate::error::{GrumpyError, Result};
use crate::page::manager::PageManager;
use crate::wal::writer::WalWriter;

/// Default buffer pool size, in frames. At 8 KiB per page this caches
/// 256 × 8 KiB = 2 MiB of data pages.
const DEFAULT_POOL_CAPACITY: usize = 256;

28/// The main GrumpyDB storage engine.
29///
30/// Provides CRUD operations on schema-less documents identified by UUID keys.
31/// Documents are stored in page-based files with B+Tree indexing.
32/// Data pages are cached in a buffer pool for reduced disk I/O.
33///
34/// # Example
35///
36/// ```no_run
37/// use grumpydb::{GrumpyDb, Value};
38/// use uuid::Uuid;
39///
40/// let mut db = GrumpyDb::open(std::path::Path::new("./mydb")).unwrap();
41/// let key = Uuid::new_v4();
42/// db.insert(key, Value::String("hello".into())).unwrap();
43/// assert_eq!(db.get(&key).unwrap(), Some(Value::String("hello".into())));
44/// db.close().unwrap();
45/// ```
46#[deprecated(
47    since = "5.0.0",
48    note = "use `Database` (with the `_default` collection if you only need a \
49            single-collection store). `GrumpyDb` will be removed in v6."
50)]
51pub struct GrumpyDb {
52    /// The underlying collection (data pages + primary index).
53    collection: Collection,
54    /// Write-Ahead Log for durability.
55    wal: WalWriter,
56    /// Write counter for periodic checkpointing.
57    writes_since_checkpoint: u32,
58}
59
/// How many write operations may accumulate before an automatic
/// checkpoint (flush + WAL truncation) is triggered.
const CHECKPOINT_INTERVAL: u32 = 100;

/// Summary returned by a successful compaction run.
#[derive(Debug)]
pub struct CompactResult {
    /// How many live documents survived (were carried over by) compaction.
    pub documents: u64,
}

70impl GrumpyDb {
71    /// Opens or creates a database at the given directory path.
72    ///
73    /// Creates `data.db` for document storage and `primary.idx` for the B+Tree index.
74    /// Data pages are cached in a buffer pool (256 frames = 2 MiB by default).
75    /// If the files already exist, they are opened and the engine resumes.
76    pub fn open(path: &Path) -> Result<Self> {
77        Self::open_with_pool_capacity(path, DEFAULT_POOL_CAPACITY)
78    }
79
80    /// Opens a database with a custom buffer pool capacity (number of frames).
81    pub fn open_with_pool_capacity(path: &Path, pool_capacity: usize) -> Result<Self> {
82        std::fs::create_dir_all(path)?;
83
84        let data_path = path.join("data.db");
85        let index_path = path.join("primary.idx");
86        let wal_path = path.join("wal.log");
87
88        // WAL recovery happens BEFORE creating the Collection,
89        // because recovery needs two &mut PageManager references.
90        let mut wal = WalWriter::new(&wal_path)?;
91        let records = wal.read_all_records()?;
92        if !records.is_empty() {
93            let mut data_pm = PageManager::new(&data_path)?;
94            let mut index_pm = PageManager::new(&index_path)?;
95            crate::wal::recovery::recover(&records, &mut data_pm, &mut index_pm)?;
96            data_pm.sync()?;
97            index_pm.sync()?;
98            wal.log_checkpoint()?;
99            wal.truncate()?;
100        }
101
102        // Now open the Collection (wraps data.db + primary.idx in BufferPool + BTree)
103        let collection = Collection::open(path, "_default", pool_capacity)?;
104
105        Ok(Self {
106            collection,
107            wal,
108            writes_since_checkpoint: 0,
109        })
110    }
111
112    /// Inserts a document with the given UUID key.
113    ///
114    /// Returns `DuplicateKey` if the key already exists.
115    pub fn insert(&mut self, key: Uuid, value: Value) -> Result<()> {
116        let doc = Document::new(key, value);
117        let encoded = doc.encode();
118
119        let tx_id = self.wal.begin_tx();
120
121        let ((_page_id, _slot_id), records) = self.collection.insert_raw(key, &encoded)?;
122
123        // Log all page writes to WAL
124        for rec in &records {
125            self.wal
126                .log_page_write(tx_id, rec.page_id, &rec.before, &rec.after)?;
127        }
128
129        self.wal.log_commit(tx_id)?;
130        self.maybe_checkpoint()?;
131        Ok(())
132    }
133
134    /// Retrieves a document by its UUID key.
135    ///
136    /// Returns `None` if the key does not exist.
137    /// Uses the buffer pool — repeated reads of the same page hit the cache.
138    pub fn get(&mut self, key: &Uuid) -> Result<Option<Value>> {
139        let Some(raw) = self.collection.get_raw(key)? else {
140            return Ok(None);
141        };
142        let doc = Document::decode(&raw)?;
143        Ok(Some(doc.value))
144    }
145
146    /// Updates an existing document.
147    ///
148    /// Returns `KeyNotFound` if the key does not exist.
149    pub fn update(&mut self, key: &Uuid, value: Value) -> Result<()> {
150        if self.collection.get_raw(key)?.is_none() {
151            return Err(GrumpyError::KeyNotFound(*key));
152        }
153        self.delete(key)?;
154        self.insert(*key, value)?;
155        Ok(())
156    }
157
158    /// Deletes a document by its UUID key.
159    ///
160    /// Returns `KeyNotFound` if the key does not exist.
161    pub fn delete(&mut self, key: &Uuid) -> Result<()> {
162        let tx_id = self.wal.begin_tx();
163
164        let records = self.collection.delete_raw(key)?;
165
166        for rec in &records {
167            self.wal
168                .log_page_write(tx_id, rec.page_id, &rec.before, &rec.after)?;
169        }
170
171        self.wal.log_commit(tx_id)?;
172        self.maybe_checkpoint()?;
173        Ok(())
174    }
175
176    /// Scans documents in a UUID key range.
177    ///
178    /// Returns all documents whose keys fall within the given range, sorted by key.
179    pub fn scan(&mut self, range: impl std::ops::RangeBounds<Uuid>) -> Result<Vec<(Uuid, Value)>> {
180        let raw_results = self.collection.scan_raw(range)?;
181        let mut results = Vec::with_capacity(raw_results.len());
182        for (key, raw) in raw_results {
183            let doc = Document::decode(&raw)?;
184            results.push((key, doc.value));
185        }
186        Ok(results)
187    }
188
189    /// Flushes all data to disk and writes a WAL checkpoint.
190    pub fn flush(&mut self) -> Result<()> {
191        self.collection.flush()?;
192        self.wal.log_checkpoint()?;
193        self.wal.truncate()?;
194        self.writes_since_checkpoint = 0;
195        Ok(())
196    }
197
198    /// Closes the database, flushing all pending data.
199    pub fn close(mut self) -> Result<()> {
200        self.flush()
201    }
202
203    /// Returns the number of documents in the database.
204    pub fn document_count(&self) -> u64 {
205        self.collection.document_count()
206    }
207
208    /// Compacts the database: defragments data pages and rebuilds the B+Tree index.
209    pub fn compact(&mut self) -> Result<CompactResult> {
210        let docs = self.collection.compact()?;
211
212        self.wal.log_checkpoint()?;
213        self.wal.truncate()?;
214        self.writes_since_checkpoint = 0;
215
216        Ok(CompactResult { documents: docs })
217    }
218
219    /// Returns buffer pool statistics: `(read_count, write_count, cached_count, capacity)`.
220    pub fn pool_stats(&self) -> (u64, u64, usize, usize) {
221        self.collection.pool_stats()
222    }
223
224    /// Periodic checkpoint: flush + truncate WAL every N writes.
225    fn maybe_checkpoint(&mut self) -> Result<()> {
226        self.writes_since_checkpoint += 1;
227        if self.writes_since_checkpoint >= CHECKPOINT_INTERVAL {
228            self.flush()?;
229        }
230        Ok(())
231    }
232
233    /// Migrates all documents from this v1 `GrumpyDb` into a v2 `Database` collection.
234    ///
235    /// Reads every document via `scan(..)` and inserts them into the target
236    /// database/collection. The original v1 data is not modified.
237    ///
238    /// # Arguments
239    ///
240    /// * `target` — The target `Database` to insert documents into
241    /// * `collection` — The collection name within that database
242    ///
243    /// # Returns
244    ///
245    /// The number of documents migrated.
246    pub fn migrate_to_database(
247        &mut self,
248        target: &mut crate::database::Database,
249        collection: &str,
250    ) -> Result<u64> {
251        // Ensure the target collection exists
252        if !target.list_collections().contains(&collection) {
253            target.create_collection(collection)?;
254        }
255
256        let all = self.scan(..)?;
257        let count = all.len() as u64;
258
259        for (key, value) in all {
260            target.insert(collection, key, value)?;
261        }
262
263        Ok(count)
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    /// Creates a fresh database inside a temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the handle so the
    /// directory (and its files) outlives the test body; dropping the
    /// guard removes everything.
    fn setup() -> (TempDir, GrumpyDb) {
        let dir = TempDir::new().unwrap();
        let db = GrumpyDb::open(dir.path().join("testdb").as_path()).unwrap();
        (dir, db)
    }

    #[test]
    fn test_open_creates_files() {
        let dir = TempDir::new().unwrap();
        let db_path = dir.path().join("testdb");
        let db = GrumpyDb::open(&db_path).unwrap();
        // `open` must create both backing files eagerly.
        assert!(db_path.join("data.db").exists());
        assert!(db_path.join("primary.idx").exists());
        db.close().unwrap();
    }

    #[test]
    fn test_insert_and_get() {
        let (_dir, mut db) = setup();
        let key = Uuid::new_v4();
        db.insert(key, Value::String("hello".into())).unwrap();
        let val = db.get(&key).unwrap();
        assert_eq!(val, Some(Value::String("hello".into())));
    }

    #[test]
    fn test_get_nonexistent() {
        let (_dir, mut db) = setup();
        let val = db.get(&Uuid::new_v4()).unwrap();
        assert_eq!(val, None);
    }

    #[test]
    fn test_insert_duplicate_key() {
        let (_dir, mut db) = setup();
        let key = Uuid::new_v4();
        db.insert(key, Value::Integer(1)).unwrap();
        // Second insert with the same key must be rejected, not overwrite.
        let result = db.insert(key, Value::Integer(2));
        assert!(matches!(result, Err(GrumpyError::DuplicateKey(_))));
    }

    #[test]
    fn test_delete() {
        let (_dir, mut db) = setup();
        let key = Uuid::new_v4();
        db.insert(key, Value::Integer(42)).unwrap();
        db.delete(&key).unwrap();
        assert_eq!(db.get(&key).unwrap(), None);
    }

    #[test]
    fn test_delete_nonexistent() {
        let (_dir, mut db) = setup();
        let result = db.delete(&Uuid::new_v4());
        assert!(matches!(result, Err(GrumpyError::KeyNotFound(_))));
    }

    #[test]
    fn test_update() {
        let (_dir, mut db) = setup();
        let key = Uuid::new_v4();
        db.insert(key, Value::Integer(1)).unwrap();
        db.update(&key, Value::Integer(2)).unwrap();
        assert_eq!(db.get(&key).unwrap(), Some(Value::Integer(2)));
    }

    #[test]
    fn test_update_nonexistent() {
        let (_dir, mut db) = setup();
        let result = db.update(&Uuid::new_v4(), Value::Integer(1));
        assert!(matches!(result, Err(GrumpyError::KeyNotFound(_))));
    }

    #[test]
    fn test_insert_complex_document() {
        let (_dir, mut db) = setup();
        let key = Uuid::new_v4();
        // Nested object with string, integer, and array fields — exercises
        // the full encode/decode round trip.
        let value = Value::Object(BTreeMap::from([
            ("name".into(), Value::String("GrumpyDB".into())),
            ("version".into(), Value::Integer(1)),
            (
                "tags".into(),
                Value::Array(vec![
                    Value::String("db".into()),
                    Value::String("rust".into()),
                ]),
            ),
        ]));
        db.insert(key, value.clone()).unwrap();
        assert_eq!(db.get(&key).unwrap(), Some(value));
    }

    #[test]
    fn test_crud_lifecycle() {
        let (_dir, mut db) = setup();
        let key = Uuid::new_v4();

        // Create
        db.insert(key, Value::String("v1".into())).unwrap();
        assert_eq!(db.get(&key).unwrap(), Some(Value::String("v1".into())));

        // Update
        db.update(&key, Value::String("v2".into())).unwrap();
        assert_eq!(db.get(&key).unwrap(), Some(Value::String("v2".into())));

        // Delete
        db.delete(&key).unwrap();
        assert_eq!(db.get(&key).unwrap(), None);
    }

    #[test]
    fn test_multiple_inserts() {
        let (_dir, mut db) = setup();
        let mut keys = Vec::new();
        for i in 0..100 {
            let key = Uuid::from_u128(i);
            db.insert(key, Value::Integer(i as i64)).unwrap();
            keys.push(key);
        }
        for (i, key) in keys.iter().enumerate() {
            assert_eq!(db.get(key).unwrap(), Some(Value::Integer(i as i64)));
        }
    }

    #[test]
    fn test_persistence_across_reopen() {
        let dir = TempDir::new().unwrap();
        let db_path = dir.path().join("testdb");
        let key = Uuid::from_u128(42);

        // First session: write and close cleanly.
        {
            let mut db = GrumpyDb::open(&db_path).unwrap();
            db.insert(key, Value::String("persistent".into())).unwrap();
            db.close().unwrap();
        }

        // Second session: the document must still be readable.
        {
            let mut db = GrumpyDb::open(&db_path).unwrap();
            let val = db.get(&key).unwrap();
            assert_eq!(val, Some(Value::String("persistent".into())));
        }
    }

    #[test]
    fn test_scan_range() {
        let (_dir, mut db) = setup();
        for i in 0u128..20 {
            db.insert(Uuid::from_u128(i), Value::Integer(i as i64))
                .unwrap();
        }

        // Half-open range [5, 10) — end key is excluded.
        let start = Uuid::from_u128(5);
        let end = Uuid::from_u128(10);
        let results = db.scan(start..end).unwrap();

        assert_eq!(results.len(), 5);
        for (key, val) in &results {
            let i = key.as_u128();
            assert!((5..10).contains(&i));
            assert_eq!(*val, Value::Integer(i as i64));
        }
    }

    #[test]
    fn test_scan_all() {
        let (_dir, mut db) = setup();
        for i in 0u128..10 {
            db.insert(Uuid::from_u128(i), Value::Integer(i as i64))
                .unwrap();
        }

        let results = db.scan(..).unwrap();
        assert_eq!(results.len(), 10);

        // Verify sorted order
        for i in 1..results.len() {
            assert!(results[i - 1].0 < results[i].0);
        }
    }

    #[test]
    fn test_overflow_document() {
        let (_dir, mut db) = setup();
        let key = Uuid::new_v4();
        // Create a large document that will require overflow pages
        let large_string = "x".repeat(10_000);
        let value = Value::String(large_string.clone());
        db.insert(key, value).unwrap();

        let retrieved = db.get(&key).unwrap().unwrap();
        assert_eq!(retrieved, Value::String(large_string));
    }

    #[test]
    fn test_delete_overflow_document() {
        let (_dir, mut db) = setup();
        let key = Uuid::new_v4();
        let value = Value::String("x".repeat(10_000));
        db.insert(key, value).unwrap();
        db.delete(&key).unwrap();
        assert_eq!(db.get(&key).unwrap(), None);
    }

    #[test]
    fn test_buffer_pool_cache_hits() {
        let dir = TempDir::new().unwrap();
        // Small pool (4 frames) to exercise caching
        let mut db =
            GrumpyDb::open_with_pool_capacity(dir.path().join("testdb").as_path(), 4).unwrap();

        // Insert 10 documents — they'll share the current data page (cache hit)
        let mut keys = Vec::new();
        for i in 0u128..10 {
            let key = Uuid::from_u128(i);
            db.insert(key, Value::Integer(i as i64)).unwrap();
            keys.push(key);
        }

        let (reads_before, _, _, _) = db.pool_stats();

        // Re-read all 10 — the data page should be cached (0 or minimal reads)
        for key in &keys {
            assert!(db.get(key).unwrap().is_some());
        }

        let (reads_after, _, cached, capacity) = db.pool_stats();
        // With a pool, most reads should come from cache
        assert!(cached <= capacity);
        // There should be far fewer disk reads than total get() calls
        assert!(
            reads_after - reads_before <= 2,
            "expected mostly cache hits, got {} disk reads",
            reads_after - reads_before
        );
    }

    #[test]
    fn test_buffer_pool_flush_persists() {
        let dir = TempDir::new().unwrap();
        let db_path = dir.path().join("testdb");
        let key = Uuid::from_u128(99);

        // Close must flush dirty pool frames so a reopen sees the data.
        {
            let mut db = GrumpyDb::open_with_pool_capacity(&db_path, 8).unwrap();
            db.insert(key, Value::String("cached".into())).unwrap();
            db.close().unwrap();
        }

        {
            let mut db = GrumpyDb::open_with_pool_capacity(&db_path, 8).unwrap();
            let val = db.get(&key).unwrap();
            assert_eq!(val, Some(Value::String("cached".into())));
        }
    }

    #[test]
    fn test_pool_stats() {
        let (_dir, db) = setup();
        // A freshly-opened database has performed no pool I/O yet.
        let (reads, writes, cached, capacity) = db.pool_stats();
        assert_eq!(reads, 0);
        assert_eq!(writes, 0);
        assert!(cached <= capacity);
        assert_eq!(capacity, DEFAULT_POOL_CAPACITY);
    }

    #[test]
    fn test_compact_after_deletes() {
        let (_dir, mut db) = setup();

        // Insert 200 documents
        let mut keys = Vec::new();
        for i in 0u128..200 {
            let key = Uuid::from_u128(i);
            db.insert(key, Value::Integer(i as i64)).unwrap();
            keys.push(key);
        }
        assert_eq!(db.document_count(), 200);

        // Delete 100 of them
        for key in &keys[..100] {
            db.delete(key).unwrap();
        }
        assert_eq!(db.document_count(), 100);

        // Compact
        let result = db.compact().unwrap();
        assert_eq!(result.documents, 100);
        assert_eq!(db.document_count(), 100);

        // Verify surviving documents
        for key in &keys[100..] {
            let val = db.get(key).unwrap();
            assert!(val.is_some(), "key should survive compaction");
        }

        // Verify deleted documents stay deleted
        for key in &keys[..100] {
            assert_eq!(db.get(key).unwrap(), None);
        }
    }

    #[test]
    fn test_compact_with_overflow() {
        let (_dir, mut db) = setup();

        let key1 = Uuid::from_u128(1);
        let key2 = Uuid::from_u128(2);

        // key1 spans overflow pages; key2 is small and then deleted, so
        // compaction must preserve exactly one (multi-page) document.
        db.insert(key1, Value::String("x".repeat(10_000))).unwrap();
        db.insert(key2, Value::Integer(42)).unwrap();
        db.delete(&key2).unwrap();

        let result = db.compact().unwrap();
        assert_eq!(result.documents, 1);

        let val = db.get(&key1).unwrap().unwrap();
        assert_eq!(val, Value::String("x".repeat(10_000)));
    }

    #[test]
    fn test_compact_empty_db() {
        let (_dir, mut db) = setup();
        let result = db.compact().unwrap();
        assert_eq!(result.documents, 0);
    }

    #[test]
    fn test_document_count() {
        let (_dir, mut db) = setup();
        assert_eq!(db.document_count(), 0);
        let key = Uuid::new_v4();
        db.insert(key, Value::Integer(1)).unwrap();
        assert_eq!(db.document_count(), 1);
        db.delete(&key).unwrap();
        assert_eq!(db.document_count(), 0);
    }

    #[test]
    fn test_migrate_to_database() {
        let dir = TempDir::new().unwrap();
        let v1_path = dir.path().join("v1");
        let v2_path = dir.path().join("v2");

        // Create v1 database with some docs
        let mut v1 = GrumpyDb::open(&v1_path).unwrap();
        for i in 0u128..100 {
            v1.insert(
                Uuid::from_u128(i),
                Value::Object(BTreeMap::from([
                    ("name".into(), Value::String(format!("doc_{i}"))),
                    ("idx".into(), Value::Integer(i as i64)),
                ])),
            )
            .unwrap();
        }
        assert_eq!(v1.document_count(), 100);

        // Migrate to v2 database
        let mut v2 = crate::database::Database::open(&v2_path).unwrap();
        let migrated = v1.migrate_to_database(&mut v2, "imported").unwrap();
        assert_eq!(migrated, 100);

        // Verify all docs are in v2
        assert_eq!(v2.document_count("imported").unwrap(), 100);
        let val = v2.get("imported", &Uuid::from_u128(42)).unwrap().unwrap();
        assert_eq!(
            val.as_object().unwrap().get("name"),
            Some(&Value::String("doc_42".into()))
        );

        v1.close().unwrap();
        v2.close().unwrap();
    }
}