Skip to main content

reddb_server/storage/engine/
database.rs

1//! RedDB Database Engine
2//!
3//! The main entry point for the RedDB storage engine. Integrates all components:
4//! - Pager for page I/O
5//! - WAL for durability
6//! - Transactions for ACID properties
7//! - Checkpointing for WAL management
8//! - B-tree for indexing
9//!
10//! # Usage
11//!
12//! ```rust,ignore
13//! use reddb::storage::engine::Database;
14//!
15//! // Open or create a database
16//! let db = Database::open("mydata.rdb")?;
17//!
18//! // Begin a transaction
19//! let tx = db.begin()?;
20//!
21//! // Perform operations
22//! tx.put(b"key", b"value")?;
23//!
24//! // Commit
25//! tx.commit()?;
26//!
27//! // Close (or let it drop)
28//! db.close()?;
29//! ```
30//!
31//! # File Layout
32//!
33//! ```text
34//! mydata.rdb     - Main database file (pages)
35//! mydata.rdb-wal - Write-ahead log
36//! ```
37//!
38//! # References
39//!
40//! - Turso `core/database.rs` - Database lifecycle
41//! - SQLite architecture documentation
42
43use std::io;
44use std::path::{Path, PathBuf};
45use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
46
47use super::{Page, PageType, Pager, PagerConfig};
48use crate::storage::wal::{
49    CheckpointError, CheckpointMode, CheckpointResult, Checkpointer, Transaction,
50    TransactionManager, TxError,
51};
52
53/// Database configuration
54#[derive(Debug, Clone)]
55pub struct DatabaseConfig {
56    /// Page cache size (number of pages)
57    pub cache_size: usize,
58    /// Whether to open read-only
59    pub read_only: bool,
60    /// Whether to create if not exists
61    pub create: bool,
62    /// Checkpoint mode
63    pub checkpoint_mode: CheckpointMode,
64    /// Auto-checkpoint threshold (pages)
65    /// Set to 0 to disable auto-checkpoint
66    pub auto_checkpoint_threshold: u32,
67    /// Whether to verify checksums on read
68    pub verify_checksums: bool,
69}
70
71impl Default for DatabaseConfig {
72    fn default() -> Self {
73        Self {
74            cache_size: 10_000,
75            read_only: false,
76            create: true,
77            checkpoint_mode: CheckpointMode::Full,
78            auto_checkpoint_threshold: 1000,
79            verify_checksums: true,
80        }
81    }
82}
83
84/// Database error types
85#[derive(Debug)]
86pub enum DatabaseError {
87    /// I/O error
88    Io(io::Error),
89    /// Pager error
90    Pager(String),
91    /// Internal lock was poisoned by a panic
92    LockPoisoned(&'static str),
93    /// Transaction error
94    Transaction(TxError),
95    /// Checkpoint error
96    Checkpoint(CheckpointError),
97    /// Database is read-only
98    ReadOnly,
99    /// Database is closed
100    Closed,
101}
102
103impl std::fmt::Display for DatabaseError {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        match self {
106            Self::Io(e) => write!(f, "I/O error: {}", e),
107            Self::Pager(msg) => write!(f, "Pager error: {}", msg),
108            Self::LockPoisoned(name) => write!(f, "Lock poisoned: {}", name),
109            Self::Transaction(e) => write!(f, "Transaction error: {}", e),
110            Self::Checkpoint(e) => write!(f, "Checkpoint error: {}", e),
111            Self::ReadOnly => write!(f, "Database is read-only"),
112            Self::Closed => write!(f, "Database is closed"),
113        }
114    }
115}
116
117impl std::error::Error for DatabaseError {}
118
119impl From<io::Error> for DatabaseError {
120    fn from(e: io::Error) -> Self {
121        Self::Io(e)
122    }
123}
124
125impl From<TxError> for DatabaseError {
126    fn from(e: TxError) -> Self {
127        Self::Transaction(e)
128    }
129}
130
131impl From<CheckpointError> for DatabaseError {
132    fn from(e: CheckpointError) -> Self {
133        Self::Checkpoint(e)
134    }
135}
136
/// Lifecycle state of a database handle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DbState {
    /// The handle accepts operations.
    Open,
    /// The handle has been shut down; operations that check the state
    /// are rejected.
    Closed,
}
143
144/// RedDB Database Engine
145///
146/// The main entry point for database operations. Thread-safe.
147pub struct Database {
148    /// Database file path
149    path: PathBuf,
150    /// WAL file path
151    wal_path: PathBuf,
152    /// Pager (shared)
153    pager: Arc<Pager>,
154    /// Transaction manager (shared)
155    tx_manager: Arc<TransactionManager>,
156    /// Configuration
157    config: DatabaseConfig,
158    /// Database state
159    state: RwLock<DbState>,
160    /// Pages written since last checkpoint
161    pages_since_checkpoint: RwLock<u32>,
162    /// Background writer handle (P6.T1). `None` when the database
163    /// runs in read-only mode or the spawn failed. Dropping the
164    /// handle signals the writer thread to exit at the next round.
165    #[allow(dead_code)]
166    bgwriter: Option<crate::storage::cache::bgwriter::BgWriterHandle>,
167}
168
169impl Database {
170    fn state_read(&self) -> Result<RwLockReadGuard<'_, DbState>, DatabaseError> {
171        self.state
172            .read()
173            .map_err(|_| DatabaseError::LockPoisoned("database state"))
174    }
175
176    fn state_write(&self) -> Result<RwLockWriteGuard<'_, DbState>, DatabaseError> {
177        self.state
178            .write()
179            .map_err(|_| DatabaseError::LockPoisoned("database state"))
180    }
181
182    fn pages_since_checkpoint_read(&self) -> Result<RwLockReadGuard<'_, u32>, DatabaseError> {
183        self.pages_since_checkpoint
184            .read()
185            .map_err(|_| DatabaseError::LockPoisoned("pages since checkpoint"))
186    }
187
188    fn pages_since_checkpoint_write(&self) -> Result<RwLockWriteGuard<'_, u32>, DatabaseError> {
189        self.pages_since_checkpoint
190            .write()
191            .map_err(|_| DatabaseError::LockPoisoned("pages since checkpoint"))
192    }
193
194    /// Open or create a database
195    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, DatabaseError> {
196        Self::open_with_config(path, DatabaseConfig::default())
197    }
198
199    /// Background-writer stats snapshot — `None` when the writer
200    /// isn't running (read-only mode, or spawn skipped). Exposed for
201    /// tests + introspection.
202    pub fn bgwriter_stats(&self) -> Option<crate::storage::cache::bgwriter::BgWriterStatsSnapshot> {
203        self.bgwriter.as_ref().map(|h| h.stats.snapshot())
204    }
205
206    /// Open or create a database with custom configuration
207    pub fn open_with_config<P: AsRef<Path>>(
208        path: P,
209        config: DatabaseConfig,
210    ) -> Result<Self, DatabaseError> {
211        let path = path.as_ref().to_path_buf();
212        let wal_path = path.with_extension("rdb-wal");
213
214        // Create pager config
215        let pager_config = PagerConfig {
216            cache_size: config.cache_size,
217            read_only: config.read_only,
218            create: config.create,
219            verify_checksums: config.verify_checksums,
220            double_write: true,
221            encryption: None,
222        };
223
224        // Open pager
225        let pager =
226            Pager::open(&path, pager_config).map_err(|e| DatabaseError::Pager(e.to_string()))?;
227        let pager = Arc::new(pager);
228
229        // Perform crash recovery if WAL exists
230        if wal_path.exists() && !config.read_only {
231            let recovery_result = Checkpointer::recover(&pager, &wal_path)?;
232            if recovery_result.pages_checkpointed > 0 {
233                tracing::info!(
234                    transactions = recovery_result.transactions_processed,
235                    pages = recovery_result.pages_checkpointed,
236                    "WAL recovery applied"
237                );
238            }
239        }
240
241        // Create transaction manager
242        let tx_manager = Arc::new(
243            TransactionManager::new(Arc::clone(&pager), &wal_path).map_err(DatabaseError::Io)?,
244        );
245
246        // P6.T1 — bgwriter wiring is gated behind `REDDB_BGWRITER=1`
247        // for now. The current `PagerDirtyFlusher::flush_some` calls
248        // `write_page` directly without enforcing WAL-first ordering;
249        // bench `update_single` triggered "B-tree insert error: Pager
250        // error: I/O error: failed to fill whole buffer" when the
251        // background flusher raced with the foreground commit on the
252        // same dirty page. Off by default until the flusher is
253        // taught to respect the per-page LSN gate the commit path
254        // uses (max_lsn → wal.flush(max_lsn) → write_page).
255        let bgwriter = if config.read_only
256            || !matches!(
257                std::env::var("REDDB_BGWRITER").ok().as_deref(),
258                Some("1") | Some("true") | Some("on")
259            ) {
260            None
261        } else {
262            let flusher = std::sync::Arc::new(
263                crate::storage::cache::bgwriter::PagerDirtyFlusher::new(Arc::downgrade(&pager)),
264            );
265            Some(crate::storage::cache::bgwriter::spawn(
266                flusher,
267                crate::storage::cache::bgwriter::BgWriterConfig::default(),
268            ))
269        };
270
271        Ok(Self {
272            path,
273            wal_path,
274            pager,
275            tx_manager,
276            config,
277            state: RwLock::new(DbState::Open),
278            pages_since_checkpoint: RwLock::new(0),
279            bgwriter,
280        })
281    }
282
283    /// Check if database is open
284    fn check_open(&self) -> Result<(), DatabaseError> {
285        if *self.state_read()? == DbState::Closed {
286            return Err(DatabaseError::Closed);
287        }
288        Ok(())
289    }
290
291    /// Begin a new transaction
292    pub fn begin(&self) -> Result<Transaction, DatabaseError> {
293        self.check_open()?;
294        Ok(self.tx_manager.begin()?)
295    }
296
297    /// Get a reference to the pager (for advanced operations)
298    pub fn pager(&self) -> &Arc<Pager> {
299        &self.pager
300    }
301
302    /// Get a reference to the transaction manager
303    pub fn tx_manager(&self) -> &Arc<TransactionManager> {
304        &self.tx_manager
305    }
306
307    /// Allocate a new page
308    pub fn allocate_page(&self, page_type: PageType) -> Result<Page, DatabaseError> {
309        self.check_open()?;
310        if self.config.read_only {
311            return Err(DatabaseError::ReadOnly);
312        }
313        self.pager
314            .allocate_page(page_type)
315            .map_err(|e| DatabaseError::Pager(e.to_string()))
316    }
317
318    /// Read a page
319    pub fn read_page(&self, page_id: u32) -> Result<Page, DatabaseError> {
320        self.check_open()?;
321        self.pager
322            .read_page(page_id)
323            .map_err(|e| DatabaseError::Pager(e.to_string()))
324    }
325
326    /// Perform a checkpoint
327    pub fn checkpoint(&self) -> Result<CheckpointResult, DatabaseError> {
328        self.check_open()?;
329        if self.config.read_only {
330            return Err(DatabaseError::ReadOnly);
331        }
332
333        let checkpointer = Checkpointer::new(self.config.checkpoint_mode);
334        let result = checkpointer.checkpoint(&self.pager, &self.wal_path)?;
335
336        // Reset counter
337        *self.pages_since_checkpoint_write()? = 0;
338
339        Ok(result)
340    }
341
342    /// Check if auto-checkpoint is needed and perform it
343    pub fn maybe_auto_checkpoint(&self) -> Result<Option<CheckpointResult>, DatabaseError> {
344        if self.config.auto_checkpoint_threshold == 0 {
345            return Ok(None);
346        }
347
348        let pages = *self.pages_since_checkpoint_read()?;
349        if pages >= self.config.auto_checkpoint_threshold {
350            Ok(Some(self.checkpoint()?))
351        } else {
352            Ok(None)
353        }
354    }
355
356    /// Increment pages-since-checkpoint counter
357    pub fn increment_page_count(&self, count: u32) {
358        let mut pages = self
359            .pages_since_checkpoint
360            .write()
361            .unwrap_or_else(|poisoned| poisoned.into_inner());
362        *pages = pages.saturating_add(count);
363    }
364
365    /// Sync all data to disk
366    pub fn sync(&self) -> Result<(), DatabaseError> {
367        self.check_open()?;
368        self.pager
369            .sync()
370            .map_err(|e| DatabaseError::Pager(e.to_string()))?;
371        self.tx_manager.sync_wal()?;
372        Ok(())
373    }
374
375    /// Close the database
376    ///
377    /// Performs a final checkpoint and syncs all data to disk.
378    pub fn close(self) -> Result<(), DatabaseError> {
379        // Mark as closed
380        *self.state_write()? = DbState::Closed;
381
382        // Wait for active transactions to complete
383        if self.tx_manager.has_active_transactions() {
384            tracing::warn!("closing database with active transactions");
385        }
386
387        // Final checkpoint if not read-only
388        if !self.config.read_only {
389            let checkpointer = Checkpointer::new(CheckpointMode::Truncate);
390            let _ = checkpointer.checkpoint(&self.pager, &self.wal_path);
391        }
392
393        // Sync pager
394        let _ = self.pager.sync();
395
396        Ok(())
397    }
398
399    /// Get database file path
400    pub fn path(&self) -> &Path {
401        &self.path
402    }
403
404    /// Get WAL file path
405    pub fn wal_path(&self) -> &Path {
406        &self.wal_path
407    }
408
409    /// Check if database is read-only
410    pub fn is_read_only(&self) -> bool {
411        self.config.read_only
412    }
413
414    /// Get page count
415    pub fn page_count(&self) -> u32 {
416        self.pager.page_count().unwrap_or(0)
417    }
418
419    /// Get database file size
420    pub fn file_size(&self) -> Result<u64, DatabaseError> {
421        self.pager
422            .file_size()
423            .map_err(|e| DatabaseError::Pager(e.to_string()))
424    }
425
426    /// Get cache statistics
427    pub fn cache_stats(&self) -> super::page_cache::CacheStats {
428        self.pager.cache_stats()
429    }
430}
431
432impl Drop for Database {
433    fn drop(&mut self) {
434        // Try to sync on drop
435        let state = self
436            .state
437            .read()
438            .unwrap_or_else(|poisoned| poisoned.into_inner());
439        if *state == DbState::Open {
440            drop(state);
441            let mut state = self
442                .state
443                .write()
444                .unwrap_or_else(|poisoned| poisoned.into_inner());
445            *state = DbState::Closed;
446            drop(state);
447
448            // Best-effort checkpoint and sync
449            if !self.config.read_only {
450                let checkpointer = Checkpointer::new(CheckpointMode::Full);
451                let _ = checkpointer.checkpoint(&self.pager, &self.wal_path);
452            }
453            let _ = self.pager.sync();
454        }
455    }
456}
457
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Builds a database path in the temp directory that is unique per
    /// call. The timestamp alone can repeat when tests run in parallel on
    /// a coarse clock, so the process id and a per-process counter are
    /// mixed in as well.
    fn temp_db_path() -> PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is set before the Unix epoch")
            .as_nanos();
        let sequence = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!(
            "reddb_engine_test_{}_{}_{}.rdb",
            std::process::id(),
            timestamp,
            sequence
        ))
    }

    /// Removes the database file and its WAL, ignoring missing files.
    fn cleanup(path: &Path) {
        let _ = fs::remove_file(path);
        let wal_path = path.with_extension("rdb-wal");
        let _ = fs::remove_file(wal_path);
    }

    #[test]
    fn test_database_open_create() {
        let path = temp_db_path();
        cleanup(&path);

        {
            let db = Database::open(&path).unwrap();
            assert!(!db.is_read_only());
            assert_eq!(db.page_count(), 3); // Header + reserved pages
        }

        // Should be able to reopen
        {
            let db = Database::open(&path).unwrap();
            assert_eq!(db.page_count(), 3);
        }

        cleanup(&path);
    }

    #[test]
    fn test_database_transaction() {
        let path = temp_db_path();
        cleanup(&path);

        {
            let db = Database::open(&path).unwrap();

            // Allocate a page
            let page = db.allocate_page(PageType::BTreeLeaf).unwrap();
            let page_id = page.page_id();

            // Begin transaction
            let mut tx = db.begin().unwrap();

            // Write through transaction
            let mut page = Page::new(PageType::BTreeLeaf, page_id);
            page.as_bytes_mut()[100] = 0xAB;
            tx.write_page(page_id, page).unwrap();

            // Commit
            tx.commit().unwrap();

            // Verify
            let read_page = db.read_page(page_id).unwrap();
            assert_eq!(read_page.as_bytes()[100], 0xAB);
        }

        cleanup(&path);
    }

    #[test]
    fn test_database_crash_recovery() {
        let path = temp_db_path();
        cleanup(&path);

        let page_id;

        // First session: write data but don't checkpoint explicitly
        {
            let db = Database::open(&path).unwrap();

            // Allocate a page
            let page = db.allocate_page(PageType::BTreeLeaf).unwrap();
            page_id = page.page_id();

            // Write through transaction
            let mut tx = db.begin().unwrap();
            let mut page = Page::new(PageType::BTreeLeaf, page_id);
            page.as_bytes_mut()[100] = 0xCD;
            tx.write_page(page_id, page).unwrap();
            tx.commit().unwrap();

            // Sync WAL but don't checkpoint
            db.sync().unwrap();

            // Drop without calling close.
            // NOTE(review): `Drop` still performs a best-effort checkpoint,
            // so this is not a true crash; the recovery path on reopen may
            // not be exercised by this test.
        }

        // Second session: data must be readable after reopening
        {
            let db = Database::open(&path).unwrap();

            // Data should be recovered
            let read_page = db.read_page(page_id).unwrap();
            assert_eq!(read_page.as_bytes()[100], 0xCD);
        }

        cleanup(&path);
    }

    #[test]
    fn test_database_checkpoint() {
        let path = temp_db_path();
        cleanup(&path);

        {
            let db = Database::open(&path).unwrap();

            // Allocate pages
            let page1 = db.allocate_page(PageType::BTreeLeaf).unwrap();
            let page2 = db.allocate_page(PageType::BTreeLeaf).unwrap();

            // Write through transactions
            let mut tx1 = db.begin().unwrap();
            let mut p1 = Page::new(PageType::BTreeLeaf, page1.page_id());
            p1.as_bytes_mut()[100] = 0x11;
            tx1.write_page(page1.page_id(), p1).unwrap();
            tx1.commit().unwrap();

            let mut tx2 = db.begin().unwrap();
            let mut p2 = Page::new(PageType::BTreeLeaf, page2.page_id());
            p2.as_bytes_mut()[100] = 0x22;
            tx2.write_page(page2.page_id(), p2).unwrap();
            tx2.commit().unwrap();

            // Checkpoint
            let result = db.checkpoint().unwrap();
            assert_eq!(result.transactions_processed, 2);
            assert!(result.pages_checkpointed >= 2);

            // Close properly
            db.close().unwrap();
        }

        // Reopen and verify
        {
            let db = Database::open(&path).unwrap();
            // Pages should still be there
            assert!(db.page_count() >= 3); // header + 2 data pages
        }

        cleanup(&path);
    }

    #[test]
    fn test_database_read_only() {
        let path = temp_db_path();
        cleanup(&path);

        // Create database first
        {
            let db = Database::open(&path).unwrap();
            // Only the allocation matters here; the page itself is unused.
            let _page = db.allocate_page(PageType::BTreeLeaf).unwrap();
            db.close().unwrap();
        }

        // Open read-only
        {
            let config = DatabaseConfig {
                read_only: true,
                ..Default::default()
            };
            let db = Database::open_with_config(&path, config).unwrap();
            assert!(db.is_read_only());

            // Should not be able to allocate
            assert!(db.allocate_page(PageType::BTreeLeaf).is_err());
        }

        cleanup(&path);
    }

    #[test]
    fn test_database_multiple_transactions() {
        let path = temp_db_path();
        cleanup(&path);

        {
            let db = Database::open(&path).unwrap();

            // Allocate pages
            let page1 = db.allocate_page(PageType::BTreeLeaf).unwrap();
            let page2 = db.allocate_page(PageType::BTreeLeaf).unwrap();

            // Multiple concurrent transactions (interleaved)
            let mut tx1 = db.begin().unwrap();
            let mut tx2 = db.begin().unwrap();

            // tx1 writes to page1
            let mut p1 = Page::new(PageType::BTreeLeaf, page1.page_id());
            p1.as_bytes_mut()[100] = 0x11;
            tx1.write_page(page1.page_id(), p1).unwrap();

            // tx2 writes to page2
            let mut p2 = Page::new(PageType::BTreeLeaf, page2.page_id());
            p2.as_bytes_mut()[100] = 0x22;
            tx2.write_page(page2.page_id(), p2).unwrap();

            // Commit both
            tx1.commit().unwrap();
            tx2.commit().unwrap();

            // Verify
            let r1 = db.read_page(page1.page_id()).unwrap();
            let r2 = db.read_page(page2.page_id()).unwrap();
            assert_eq!(r1.as_bytes()[100], 0x11);
            assert_eq!(r2.as_bytes()[100], 0x22);
        }

        cleanup(&path);
    }

    #[test]
    fn test_begin_returns_structured_error_when_state_lock_is_poisoned() {
        let path = temp_db_path();
        cleanup(&path);

        {
            let db = Arc::new(Database::open(&path).unwrap());
            let poison_target = Arc::clone(&db);
            // Panic while holding the write guard to poison the state lock;
            // the join result (an `Err` carrying the panic) is discarded.
            let _ = std::thread::spawn(move || {
                let _guard = poison_target
                    .state
                    .write()
                    .expect("state lock should be acquired");
                panic!("poison database state");
            })
            .join();

            match db.begin() {
                Ok(_) => panic!("begin should fail after state lock poisoning"),
                Err(err) => {
                    assert!(matches!(err, DatabaseError::LockPoisoned("database state")))
                }
            }
        }

        cleanup(&path);
    }
}