reddb_server/storage/engine/
database.rs1use std::io;
44use std::path::{Path, PathBuf};
45use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
46
47use super::{Page, PageType, Pager, PagerConfig};
48use crate::storage::wal::{
49 CheckpointError, CheckpointMode, CheckpointResult, Checkpointer, Transaction,
50 TransactionManager, TxError,
51};
52
53#[derive(Debug, Clone)]
55pub struct DatabaseConfig {
56 pub cache_size: usize,
58 pub read_only: bool,
60 pub create: bool,
62 pub checkpoint_mode: CheckpointMode,
64 pub auto_checkpoint_threshold: u32,
67 pub verify_checksums: bool,
69}
70
71impl Default for DatabaseConfig {
72 fn default() -> Self {
73 Self {
74 cache_size: 10_000,
75 read_only: false,
76 create: true,
77 checkpoint_mode: CheckpointMode::Full,
78 auto_checkpoint_threshold: 1000,
79 verify_checksums: true,
80 }
81 }
82}
83
84#[derive(Debug)]
86pub enum DatabaseError {
87 Io(io::Error),
89 Pager(String),
91 LockPoisoned(&'static str),
93 Transaction(TxError),
95 Checkpoint(CheckpointError),
97 ReadOnly,
99 Closed,
101}
102
103impl std::fmt::Display for DatabaseError {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 match self {
106 Self::Io(e) => write!(f, "I/O error: {}", e),
107 Self::Pager(msg) => write!(f, "Pager error: {}", msg),
108 Self::LockPoisoned(name) => write!(f, "Lock poisoned: {}", name),
109 Self::Transaction(e) => write!(f, "Transaction error: {}", e),
110 Self::Checkpoint(e) => write!(f, "Checkpoint error: {}", e),
111 Self::ReadOnly => write!(f, "Database is read-only"),
112 Self::Closed => write!(f, "Database is closed"),
113 }
114 }
115}
116
117impl std::error::Error for DatabaseError {}
118
119impl From<io::Error> for DatabaseError {
120 fn from(e: io::Error) -> Self {
121 Self::Io(e)
122 }
123}
124
125impl From<TxError> for DatabaseError {
126 fn from(e: TxError) -> Self {
127 Self::Transaction(e)
128 }
129}
130
131impl From<CheckpointError> for DatabaseError {
132 fn from(e: CheckpointError) -> Self {
133 Self::Checkpoint(e)
134 }
135}
136
137#[derive(Debug, Clone, Copy, PartialEq, Eq)]
139enum DbState {
140 Open,
141 Closed,
142}
143
144pub struct Database {
148 path: PathBuf,
150 wal_path: PathBuf,
152 pager: Arc<Pager>,
154 tx_manager: Arc<TransactionManager>,
156 config: DatabaseConfig,
158 state: RwLock<DbState>,
160 pages_since_checkpoint: RwLock<u32>,
162 #[allow(dead_code)]
166 bgwriter: Option<crate::storage::cache::bgwriter::BgWriterHandle>,
167}
168
169impl Database {
170 fn state_read(&self) -> Result<RwLockReadGuard<'_, DbState>, DatabaseError> {
171 self.state
172 .read()
173 .map_err(|_| DatabaseError::LockPoisoned("database state"))
174 }
175
176 fn state_write(&self) -> Result<RwLockWriteGuard<'_, DbState>, DatabaseError> {
177 self.state
178 .write()
179 .map_err(|_| DatabaseError::LockPoisoned("database state"))
180 }
181
182 fn pages_since_checkpoint_read(&self) -> Result<RwLockReadGuard<'_, u32>, DatabaseError> {
183 self.pages_since_checkpoint
184 .read()
185 .map_err(|_| DatabaseError::LockPoisoned("pages since checkpoint"))
186 }
187
188 fn pages_since_checkpoint_write(&self) -> Result<RwLockWriteGuard<'_, u32>, DatabaseError> {
189 self.pages_since_checkpoint
190 .write()
191 .map_err(|_| DatabaseError::LockPoisoned("pages since checkpoint"))
192 }
193
194 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, DatabaseError> {
196 Self::open_with_config(path, DatabaseConfig::default())
197 }
198
199 pub fn bgwriter_stats(&self) -> Option<crate::storage::cache::bgwriter::BgWriterStatsSnapshot> {
203 self.bgwriter.as_ref().map(|h| h.stats.snapshot())
204 }
205
206 pub fn open_with_config<P: AsRef<Path>>(
208 path: P,
209 config: DatabaseConfig,
210 ) -> Result<Self, DatabaseError> {
211 let path = path.as_ref().to_path_buf();
212 let wal_path = path.with_extension("rdb-wal");
213
214 let pager_config = PagerConfig {
216 cache_size: config.cache_size,
217 read_only: config.read_only,
218 create: config.create,
219 verify_checksums: config.verify_checksums,
220 double_write: true,
221 encryption: None,
222 };
223
224 let pager =
226 Pager::open(&path, pager_config).map_err(|e| DatabaseError::Pager(e.to_string()))?;
227 let pager = Arc::new(pager);
228
229 if wal_path.exists() && !config.read_only {
231 let recovery_result = Checkpointer::recover(&pager, &wal_path)?;
232 if recovery_result.pages_checkpointed > 0 {
233 tracing::info!(
234 transactions = recovery_result.transactions_processed,
235 pages = recovery_result.pages_checkpointed,
236 "WAL recovery applied"
237 );
238 }
239 }
240
241 let tx_manager = Arc::new(
243 TransactionManager::new(Arc::clone(&pager), &wal_path).map_err(DatabaseError::Io)?,
244 );
245
246 let bgwriter = if config.read_only
256 || !matches!(
257 std::env::var("REDDB_BGWRITER").ok().as_deref(),
258 Some("1") | Some("true") | Some("on")
259 ) {
260 None
261 } else {
262 let flusher = std::sync::Arc::new(
263 crate::storage::cache::bgwriter::PagerDirtyFlusher::new(Arc::downgrade(&pager)),
264 );
265 Some(crate::storage::cache::bgwriter::spawn(
266 flusher,
267 crate::storage::cache::bgwriter::BgWriterConfig::default(),
268 ))
269 };
270
271 Ok(Self {
272 path,
273 wal_path,
274 pager,
275 tx_manager,
276 config,
277 state: RwLock::new(DbState::Open),
278 pages_since_checkpoint: RwLock::new(0),
279 bgwriter,
280 })
281 }
282
283 fn check_open(&self) -> Result<(), DatabaseError> {
285 if *self.state_read()? == DbState::Closed {
286 return Err(DatabaseError::Closed);
287 }
288 Ok(())
289 }
290
291 pub fn begin(&self) -> Result<Transaction, DatabaseError> {
293 self.check_open()?;
294 Ok(self.tx_manager.begin()?)
295 }
296
297 pub fn pager(&self) -> &Arc<Pager> {
299 &self.pager
300 }
301
302 pub fn tx_manager(&self) -> &Arc<TransactionManager> {
304 &self.tx_manager
305 }
306
307 pub fn allocate_page(&self, page_type: PageType) -> Result<Page, DatabaseError> {
309 self.check_open()?;
310 if self.config.read_only {
311 return Err(DatabaseError::ReadOnly);
312 }
313 self.pager
314 .allocate_page(page_type)
315 .map_err(|e| DatabaseError::Pager(e.to_string()))
316 }
317
318 pub fn read_page(&self, page_id: u32) -> Result<Page, DatabaseError> {
320 self.check_open()?;
321 self.pager
322 .read_page(page_id)
323 .map_err(|e| DatabaseError::Pager(e.to_string()))
324 }
325
326 pub fn checkpoint(&self) -> Result<CheckpointResult, DatabaseError> {
328 self.check_open()?;
329 if self.config.read_only {
330 return Err(DatabaseError::ReadOnly);
331 }
332
333 let checkpointer = Checkpointer::new(self.config.checkpoint_mode);
334 let result = checkpointer.checkpoint(&self.pager, &self.wal_path)?;
335
336 *self.pages_since_checkpoint_write()? = 0;
338
339 Ok(result)
340 }
341
342 pub fn maybe_auto_checkpoint(&self) -> Result<Option<CheckpointResult>, DatabaseError> {
344 if self.config.auto_checkpoint_threshold == 0 {
345 return Ok(None);
346 }
347
348 let pages = *self.pages_since_checkpoint_read()?;
349 if pages >= self.config.auto_checkpoint_threshold {
350 Ok(Some(self.checkpoint()?))
351 } else {
352 Ok(None)
353 }
354 }
355
356 pub fn increment_page_count(&self, count: u32) {
358 let mut pages = self
359 .pages_since_checkpoint
360 .write()
361 .unwrap_or_else(|poisoned| poisoned.into_inner());
362 *pages = pages.saturating_add(count);
363 }
364
365 pub fn sync(&self) -> Result<(), DatabaseError> {
367 self.check_open()?;
368 self.pager
369 .sync()
370 .map_err(|e| DatabaseError::Pager(e.to_string()))?;
371 self.tx_manager.sync_wal()?;
372 Ok(())
373 }
374
375 pub fn close(self) -> Result<(), DatabaseError> {
379 *self.state_write()? = DbState::Closed;
381
382 if self.tx_manager.has_active_transactions() {
384 tracing::warn!("closing database with active transactions");
385 }
386
387 if !self.config.read_only {
389 let checkpointer = Checkpointer::new(CheckpointMode::Truncate);
390 let _ = checkpointer.checkpoint(&self.pager, &self.wal_path);
391 }
392
393 let _ = self.pager.sync();
395
396 Ok(())
397 }
398
399 pub fn path(&self) -> &Path {
401 &self.path
402 }
403
404 pub fn wal_path(&self) -> &Path {
406 &self.wal_path
407 }
408
409 pub fn is_read_only(&self) -> bool {
411 self.config.read_only
412 }
413
414 pub fn page_count(&self) -> u32 {
416 self.pager.page_count().unwrap_or(0)
417 }
418
419 pub fn file_size(&self) -> Result<u64, DatabaseError> {
421 self.pager
422 .file_size()
423 .map_err(|e| DatabaseError::Pager(e.to_string()))
424 }
425
426 pub fn cache_stats(&self) -> super::page_cache::CacheStats {
428 self.pager.cache_stats()
429 }
430}
431
432impl Drop for Database {
433 fn drop(&mut self) {
434 let state = self
436 .state
437 .read()
438 .unwrap_or_else(|poisoned| poisoned.into_inner());
439 if *state == DbState::Open {
440 drop(state);
441 let mut state = self
442 .state
443 .write()
444 .unwrap_or_else(|poisoned| poisoned.into_inner());
445 *state = DbState::Closed;
446 drop(state);
447
448 if !self.config.read_only {
450 let checkpointer = Checkpointer::new(CheckpointMode::Full);
451 let _ = checkpointer.checkpoint(&self.pager, &self.wal_path);
452 }
453 let _ = self.pager.sync();
454 }
455 }
456}
457
458#[cfg(test)]
459mod tests {
460 use super::*;
461 use std::fs;
462 use std::time::{SystemTime, UNIX_EPOCH};
463
464 fn temp_db_path() -> PathBuf {
465 let timestamp = SystemTime::now()
466 .duration_since(UNIX_EPOCH)
467 .unwrap()
468 .as_nanos();
469 std::env::temp_dir().join(format!("reddb_engine_test_{}.rdb", timestamp))
470 }
471
472 fn cleanup(path: &Path) {
473 let _ = fs::remove_file(path);
474 let wal_path = path.with_extension("rdb-wal");
475 let _ = fs::remove_file(wal_path);
476 }
477
478 #[test]
479 fn test_database_open_create() {
480 let path = temp_db_path();
481 cleanup(&path);
482
483 {
484 let db = Database::open(&path).unwrap();
485 assert!(!db.is_read_only());
486 assert_eq!(db.page_count(), 3); }
488
489 {
491 let db = Database::open(&path).unwrap();
492 assert_eq!(db.page_count(), 3);
493 }
494
495 cleanup(&path);
496 }
497
498 #[test]
499 fn test_database_transaction() {
500 let path = temp_db_path();
501 cleanup(&path);
502
503 {
504 let db = Database::open(&path).unwrap();
505
506 let page = db.allocate_page(PageType::BTreeLeaf).unwrap();
508 let page_id = page.page_id();
509
510 let mut tx = db.begin().unwrap();
512
513 let mut page = Page::new(PageType::BTreeLeaf, page_id);
515 page.as_bytes_mut()[100] = 0xAB;
516 tx.write_page(page_id, page).unwrap();
517
518 tx.commit().unwrap();
520
521 let read_page = db.read_page(page_id).unwrap();
523 assert_eq!(read_page.as_bytes()[100], 0xAB);
524 }
525
526 cleanup(&path);
527 }
528
529 #[test]
530 fn test_database_crash_recovery() {
531 let path = temp_db_path();
532 cleanup(&path);
533
534 let page_id;
535
536 {
538 let db = Database::open(&path).unwrap();
539
540 let page = db.allocate_page(PageType::BTreeLeaf).unwrap();
542 page_id = page.page_id();
543
544 let mut tx = db.begin().unwrap();
546 let mut page = Page::new(PageType::BTreeLeaf, page_id);
547 page.as_bytes_mut()[100] = 0xCD;
548 tx.write_page(page_id, page).unwrap();
549 tx.commit().unwrap();
550
551 db.sync().unwrap();
553
554 }
556
557 {
559 let db = Database::open(&path).unwrap();
560
561 let read_page = db.read_page(page_id).unwrap();
563 assert_eq!(read_page.as_bytes()[100], 0xCD);
564 }
565
566 cleanup(&path);
567 }
568
569 #[test]
570 fn test_database_checkpoint() {
571 let path = temp_db_path();
572 cleanup(&path);
573
574 {
575 let db = Database::open(&path).unwrap();
576
577 let page1 = db.allocate_page(PageType::BTreeLeaf).unwrap();
579 let page2 = db.allocate_page(PageType::BTreeLeaf).unwrap();
580
581 let mut tx1 = db.begin().unwrap();
583 let mut p1 = Page::new(PageType::BTreeLeaf, page1.page_id());
584 p1.as_bytes_mut()[100] = 0x11;
585 tx1.write_page(page1.page_id(), p1).unwrap();
586 tx1.commit().unwrap();
587
588 let mut tx2 = db.begin().unwrap();
589 let mut p2 = Page::new(PageType::BTreeLeaf, page2.page_id());
590 p2.as_bytes_mut()[100] = 0x22;
591 tx2.write_page(page2.page_id(), p2).unwrap();
592 tx2.commit().unwrap();
593
594 let result = db.checkpoint().unwrap();
596 assert_eq!(result.transactions_processed, 2);
597 assert!(result.pages_checkpointed >= 2);
598
599 db.close().unwrap();
601 }
602
603 {
605 let db = Database::open(&path).unwrap();
606 assert!(db.page_count() >= 3); }
609
610 cleanup(&path);
611 }
612
613 #[test]
614 fn test_database_read_only() {
615 let path = temp_db_path();
616 cleanup(&path);
617
618 {
620 let db = Database::open(&path).unwrap();
621 let page = db.allocate_page(PageType::BTreeLeaf).unwrap();
622 db.close().unwrap();
623 }
624
625 {
627 let config = DatabaseConfig {
628 read_only: true,
629 ..Default::default()
630 };
631 let db = Database::open_with_config(&path, config).unwrap();
632 assert!(db.is_read_only());
633
634 assert!(db.allocate_page(PageType::BTreeLeaf).is_err());
636 }
637
638 cleanup(&path);
639 }
640
641 #[test]
642 fn test_database_multiple_transactions() {
643 let path = temp_db_path();
644 cleanup(&path);
645
646 {
647 let db = Database::open(&path).unwrap();
648
649 let page1 = db.allocate_page(PageType::BTreeLeaf).unwrap();
651 let page2 = db.allocate_page(PageType::BTreeLeaf).unwrap();
652
653 let mut tx1 = db.begin().unwrap();
655 let mut tx2 = db.begin().unwrap();
656
657 let mut p1 = Page::new(PageType::BTreeLeaf, page1.page_id());
659 p1.as_bytes_mut()[100] = 0x11;
660 tx1.write_page(page1.page_id(), p1).unwrap();
661
662 let mut p2 = Page::new(PageType::BTreeLeaf, page2.page_id());
664 p2.as_bytes_mut()[100] = 0x22;
665 tx2.write_page(page2.page_id(), p2).unwrap();
666
667 tx1.commit().unwrap();
669 tx2.commit().unwrap();
670
671 let r1 = db.read_page(page1.page_id()).unwrap();
673 let r2 = db.read_page(page2.page_id()).unwrap();
674 assert_eq!(r1.as_bytes()[100], 0x11);
675 assert_eq!(r2.as_bytes()[100], 0x22);
676 }
677
678 cleanup(&path);
679 }
680
681 #[test]
682 fn test_begin_returns_structured_error_when_state_lock_is_poisoned() {
683 let path = temp_db_path();
684 cleanup(&path);
685
686 {
687 let db = Arc::new(Database::open(&path).unwrap());
688 let poison_target = Arc::clone(&db);
689 let _ = std::thread::spawn(move || {
690 let _guard = poison_target
691 .state
692 .write()
693 .expect("state lock should be acquired");
694 panic!("poison database state");
695 })
696 .join();
697
698 match db.begin() {
699 Ok(_) => panic!("begin should fail after state lock poisoning"),
700 Err(err) => {
701 assert!(matches!(err, DatabaseError::LockPoisoned("database state")))
702 }
703 }
704 }
705
706 cleanup(&path);
707 }
708}