1use super::freelist::FreeList;
36use super::page::{Page, PageError, PageType, DB_VERSION, HEADER_SIZE, MAGIC_BYTES, PAGE_SIZE};
37use super::page_cache::PageCache;
38use crate::storage::wal::writer::WalWriter;
39use fs2::FileExt;
40use std::fs::{File, OpenOptions};
41use std::io::{Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43use std::sync::{Arc, Mutex, RwLock};
44
/// Cache size used by `PagerConfig::default()`.
///
/// NOTE(review): the unit is presumably "number of cached pages" — confirm
/// against `PageCache`.
const DEFAULT_CACHE_SIZE: usize = 10_000;
47
/// Errors reported by the pager.
///
/// `Io` and `Page` wrap an underlying error (exposed through
/// `std::error::Error::source`); the remaining variants are pager-level
/// conditions whose user-facing text lives in the `Display` impl.
#[derive(Debug)]
pub enum PagerError {
    /// An underlying `std::io::Error`.
    Io(std::io::Error),
    /// An error raised by the page layer.
    Page(PageError),
    /// The database was judged invalid; the payload is the explanation.
    InvalidDatabase(String),
    /// The database is read-only.
    ReadOnly,
    /// The page with the given id was not found.
    PageNotFound(u32),
    /// The database is locked.
    Locked,
    /// An internal lock was poisoned because another thread panicked.
    LockPoisoned,
    /// The database is encrypted but no key was supplied via
    /// `PagerConfig::encryption`.
    EncryptionRequired,
    /// A plain (unencrypted) database was opened with an encryption key.
    PlainDatabaseRefusesKey,
    /// The supplied encryption key failed validation for this database.
    InvalidKey,
}
72
/// Identifies an extent by its first page and its length in pages.
///
/// NOTE(review): presumably the pages are contiguous, i.e. the extent covers
/// `start_page .. start_page + n_pages` — confirm against the allocator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExtentId {
    /// Id of the first page of the extent.
    pub start_page: u32,
    /// Number of pages in the extent.
    pub n_pages: u32,
}
79
80impl std::fmt::Display for PagerError {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 match self {
83 Self::Io(e) => write!(f, "I/O error: {}", e),
84 Self::Page(e) => write!(f, "Page error: {}", e),
85 Self::InvalidDatabase(msg) => write!(f, "Invalid database: {}", msg),
86 Self::ReadOnly => write!(f, "Database is read-only"),
87 Self::PageNotFound(id) => write!(f, "Page {} not found", id),
88 Self::Locked => write!(f, "Database is locked"),
89 Self::LockPoisoned => write!(f, "Internal lock poisoned (concurrent thread panicked)"),
90 Self::EncryptionRequired => write!(
91 f,
92 "Database is encrypted but no key was supplied (set PagerConfig::encryption)"
93 ),
94 Self::PlainDatabaseRefusesKey => write!(
95 f,
96 "Plain (unencrypted) database opened with an encryption key — refusing"
97 ),
98 Self::InvalidKey => write!(f, "Encryption key validation failed for this database"),
99 }
100 }
101}
102
103impl std::error::Error for PagerError {
104 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
105 match self {
106 Self::Io(e) => Some(e),
107 Self::Page(e) => Some(e),
108 _ => None,
109 }
110 }
111}
112
113impl From<std::io::Error> for PagerError {
114 fn from(e: std::io::Error) -> Self {
115 Self::Io(e)
116 }
117}
118
119impl From<PageError> for PagerError {
120 fn from(e: PageError) -> Self {
121 Self::Page(e)
122 }
123}
124
/// Options passed to `Pager::open`.
///
/// The `Default` impl yields a writable, create-if-missing configuration with
/// checksum verification and double-write enabled and no encryption key.
#[derive(Debug, Clone)]
pub struct PagerConfig {
    /// Page cache size; defaults to `DEFAULT_CACHE_SIZE`.
    /// NOTE(review): presumably counted in pages — confirm.
    pub cache_size: usize,
    /// Open the database read-only (default `false`).
    pub read_only: bool,
    /// Default `true`. NOTE(review): presumably "create the database file if
    /// it does not exist" — confirm in the open path.
    pub create: bool,
    /// Default `true`. NOTE(review): presumably verifies page checksums on
    /// read — confirm.
    pub verify_checksums: bool,
    /// Default `true`. NOTE(review): presumably enables the double-write
    /// buffer (the `-dwb` sidecar file used in the tests) — confirm.
    pub double_write: bool,
    /// Optional encryption key (default `None`). The `PagerError` messages
    /// refer to this field as the place to supply the key for an encrypted
    /// database.
    pub encryption: Option<crate::storage::encryption::SecureKey>,
}
145
146impl Default for PagerConfig {
147 fn default() -> Self {
148 Self {
149 cache_size: DEFAULT_CACHE_SIZE,
150 read_only: false,
151 create: true,
152 verify_checksums: true,
153 double_write: true,
154 encryption: None,
155 }
156 }
157}
158
/// In-memory form of the database header.
///
/// Initial values for a new header come from the `Default` impl.
#[derive(Debug, Clone)]
pub struct DatabaseHeader {
    /// Format version; a default header uses `DB_VERSION`.
    pub version: u32,
    /// Page size; a default header uses `PAGE_SIZE`.
    pub page_size: u32,
    /// Number of pages; a default header starts at 1.
    pub page_count: u32,
    /// Defaults to 0. NOTE(review): presumably the page id of the first
    /// freelist page, with 0 meaning "none" — confirm.
    pub freelist_head: u32,
    /// Schema version counter; defaults to 0.
    pub schema_version: u32,
    /// Defaults to 0. NOTE(review): presumably the LSN of the last completed
    /// checkpoint — confirm.
    pub checkpoint_lsn: u64,
    /// Defaults to `false`. NOTE(review): presumably set while a checkpoint
    /// is running so a crash can be detected — confirm.
    pub checkpoint_in_progress: bool,
    /// Defaults to 0. NOTE(review): presumably the LSN an in-progress
    /// checkpoint is working towards — confirm.
    pub checkpoint_target_lsn: u64,
    /// Physical-file metadata; defaults to `PhysicalFileHeader::default()`.
    pub physical: PhysicalFileHeader,
}
181
/// Physical-file metadata embedded in [`DatabaseHeader`].
///
/// `Default` is derived, so every field starts at zero.
///
/// NOTE(review): many fields come in `*_page` / `*_checksum` pairs; these are
/// presumably the page id where a structure is stored and the checksum of its
/// contents, and the `*_count` fields presumably cache entry counts — confirm
/// against the code that serializes this header.
#[derive(Debug, Clone, Copy, Default)]
pub struct PhysicalFileHeader {
    // Versioning / sequencing.
    pub format_version: u32,
    pub sequence: u64,
    // Root identifiers.
    pub manifest_oldest_root: u64,
    pub manifest_root: u64,
    pub free_set_root: u64,
    // Manifest location.
    pub manifest_page: u32,
    pub manifest_checksum: u64,
    // Collection roots location and count.
    pub collection_roots_page: u32,
    pub collection_roots_checksum: u64,
    pub collection_root_count: u32,
    // Counters.
    pub snapshot_count: u32,
    pub index_count: u32,
    pub catalog_collection_count: u32,
    pub catalog_total_entities: u64,
    pub export_count: u32,
    pub graph_projection_count: u32,
    pub analytics_job_count: u32,
    pub manifest_event_count: u32,
    // Further page / checksum pairs.
    pub registry_page: u32,
    pub registry_checksum: u64,
    pub recovery_page: u32,
    pub recovery_checksum: u64,
    pub catalog_page: u32,
    pub catalog_checksum: u64,
    pub metadata_state_page: u32,
    pub metadata_state_checksum: u64,
    pub vector_artifact_page: u32,
    pub vector_artifact_checksum: u64,
}
214
215impl Default for DatabaseHeader {
216 fn default() -> Self {
217 Self {
218 version: DB_VERSION,
219 page_size: PAGE_SIZE as u32,
220 page_count: 1, freelist_head: 0,
222 schema_version: 0,
223 checkpoint_lsn: 0,
224 checkpoint_in_progress: false,
225 checkpoint_target_lsn: 0,
226 physical: PhysicalFileHeader::default(),
227 }
228 }
229}
230
/// The pager: owns the database file handle together with the page cache,
/// freelist, header and optional WAL / encryption state.
///
/// Methods are defined in the `pager_impl` module (`pager/impl.rs`); this
/// file only declares the struct and its `Drop` impl.
pub struct Pager {
    /// Path the pager is associated with.
    path: PathBuf,
    /// Main database file, behind a mutex.
    file: Mutex<File>,
    /// Optional extra file handle that is never read by name (underscore
    /// prefix). NOTE(review): presumably held only to keep an advisory file
    /// lock alive for the pager's lifetime — confirm.
    _lock_file: Option<File>,
    /// Optional second file handle. NOTE(review): presumably the
    /// double-write buffer (`-dwb`) file — confirm.
    dwb_file: Option<Mutex<File>>,
    /// Page cache.
    cache: PageCache,
    /// Freelist state, behind a read/write lock.
    freelist: RwLock<FreeList>,
    /// In-memory database header, behind a read/write lock.
    header: RwLock<DatabaseHeader>,
    /// Configuration stored in the pager.
    config: PagerConfig,
    /// NOTE(review): presumably `true` when `header` has changes not yet
    /// written to disk — confirm.
    header_dirty: Mutex<bool>,
    /// Optional shared WAL writer; the tests attach and detach it through
    /// `set_wal_writer` / `clear_wal_writer`.
    wal: RwLock<Option<Arc<Mutex<WalWriter>>>>,
    /// Optional encryption state: the page encryptor plus its header.
    pub(crate) encryption: Option<(
        crate::storage::encryption::PageEncryptor,
        crate::storage::encryption::EncryptionHeader,
    )>,
}
276
// Loads the `pager_impl` module from `pager/impl.rs`.
// NOTE(review): presumably this is where the `Pager` methods used by the
// tests below (`open`, `flush`, `read_page`, ...) are defined — confirm.
#[path = "pager/impl.rs"]
mod pager_impl;
/// Flushes the pager when it goes out of scope.
impl Drop for Pager {
    fn drop(&mut self) {
        // `drop` cannot return an error, so the flush result is deliberately
        // discarded; callers that need to observe failures should call
        // `flush` themselves before dropping the pager.
        let _ = self.flush();
    }
}
285
286#[cfg(test)]
287mod tests {
288 use super::*;
289 use std::fs;
290 use std::io::Write;
291
292 fn temp_db_path() -> PathBuf {
293 use std::sync::atomic::{AtomicU64, Ordering};
294 static COUNTER: AtomicU64 = AtomicU64::new(0);
295 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
296 let mut path = std::env::temp_dir();
297 path.push(format!("reddb_test_{}_{}.db", std::process::id(), id));
298 path
299 }
300
301 fn cleanup(path: &Path) {
302 let _ = fs::remove_file(path);
303 let mut hdr = path.to_path_buf().into_os_string();
305 hdr.push("-hdr");
306 let _ = fs::remove_file(&hdr);
307 let mut meta = path.to_path_buf().into_os_string();
308 meta.push("-meta");
309 let _ = fs::remove_file(&meta);
310 let mut dwb = path.to_path_buf().into_os_string();
311 dwb.push("-dwb");
312 let _ = fs::remove_file(&dwb);
313 }
314
315 fn dwb_path_for(path: &Path) -> PathBuf {
316 let mut dwb = path.to_path_buf().into_os_string();
317 dwb.push("-dwb");
318 PathBuf::from(dwb)
319 }
320
321 fn write_dwb_fixture(path: &Path, pages: &[(u32, Page)]) {
322 let entry_size = 4 + PAGE_SIZE;
323 let header_len = 12;
324 let total = header_len + pages.len() * entry_size;
325 let mut buf = Vec::with_capacity(total);
326
327 buf.extend_from_slice(&[0x52, 0x44, 0x44, 0x57]); buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
329 buf.extend_from_slice(&[0u8; 4]);
330
331 for (page_id, page) in pages {
332 let mut page = page.clone();
333 page.update_checksum();
334 buf.extend_from_slice(&page_id.to_le_bytes());
335 buf.extend_from_slice(page.as_bytes());
336 }
337
338 let checksum = crate::storage::engine::crc32::crc32(&buf[header_len..]);
339 buf[8..12].copy_from_slice(&checksum.to_le_bytes());
340
341 let dwb_path = dwb_path_for(path);
342 let mut file = fs::File::create(&dwb_path).unwrap();
343 file.write_all(&buf).unwrap();
344 file.sync_all().unwrap();
345 }
346
347 #[test]
348 fn test_pager_create_new() {
349 let path = temp_db_path();
350 cleanup(&path);
351
352 {
353 let pager = Pager::open_default(&path).unwrap();
354 assert_eq!(pager.page_count().unwrap(), 3); }
356
357 cleanup(&path);
358 }
359
360 #[test]
361 fn test_pager_reopen() {
362 let path = temp_db_path();
363 cleanup(&path);
364
365 {
367 let pager = Pager::open_default(&path).unwrap();
368
369 let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
371 assert_eq!(page.page_id(), 3);
372
373 pager.sync().unwrap();
374 }
375
376 {
378 let pager = Pager::open_default(&path).unwrap();
379 assert_eq!(pager.page_count().unwrap(), 4); }
381
382 cleanup(&path);
383 }
384
385 #[test]
386 fn test_pager_read_write() {
387 let path = temp_db_path();
388 cleanup(&path);
389
390 {
391 let pager = Pager::open_default(&path).unwrap();
392
393 let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
395 let page_id = page.page_id();
396
397 page.insert_cell(b"key", b"value").unwrap();
398 pager.write_page(page_id, page).unwrap();
399
400 let read_page = pager.read_page(page_id).unwrap();
402 let (key, value) = read_page.read_cell(0).unwrap();
403 assert_eq!(key, b"key");
404 assert_eq!(value, b"value");
405 }
406
407 cleanup(&path);
408 }
409
410 #[test]
411 fn test_pager_cache() {
412 let path = temp_db_path();
413 cleanup(&path);
414
415 {
416 let pager = Pager::open_default(&path).unwrap();
417
418 let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
420 let page_id = page.page_id();
421
422 let _ = pager.read_page(page_id).unwrap();
424
425 let _ = pager.read_page(page_id).unwrap();
427
428 let stats = pager.cache_stats();
429 assert!(stats.hits >= 1);
430 }
431
432 cleanup(&path);
433 }
434
435 #[test]
436 fn test_pager_free_page() {
437 let path = temp_db_path();
438 cleanup(&path);
439
440 {
441 let pager = Pager::open_default(&path).unwrap();
442
443 let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
445 let page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
446
447 let id1 = page1.page_id();
448 let id2 = page2.page_id();
449
450 pager.free_page(id1).unwrap();
452
453 let page3 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
455 assert_eq!(page3.page_id(), id1);
456 }
457
458 cleanup(&path);
459 }
460
461 #[test]
462 fn test_freelist_persistence() {
463 let path = temp_db_path();
464 cleanup(&path);
465
466 let freed_id;
467 {
468 let pager = Pager::open_default(&path).unwrap();
469 let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
470 let _page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
471 freed_id = page1.page_id();
472
473 pager.free_page(freed_id).unwrap();
474 pager.sync().unwrap();
475 }
476
477 {
478 let pager = Pager::open_default(&path).unwrap();
479 let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
480 assert_eq!(page.page_id(), freed_id);
481 }
482
483 cleanup(&path);
484 }
485
486 #[test]
487 fn test_pager_read_only() {
488 let path = temp_db_path();
489 cleanup(&path);
490
491 {
493 let pager = Pager::open_default(&path).unwrap();
494 pager.sync().unwrap();
495 }
496
497 {
499 let config = PagerConfig {
500 read_only: true,
501 ..Default::default()
502 };
503
504 let pager = Pager::open(&path, config).unwrap();
505 assert!(pager.is_read_only());
506
507 assert!(pager.allocate_page(PageType::BTreeLeaf).is_err());
509 }
510
511 cleanup(&path);
512 }
513
514 #[test]
515 fn test_dwb_recovery_clears_in_place_and_keeps_file_reusable() {
516 let path = temp_db_path();
517 cleanup(&path);
518
519 let config = PagerConfig {
520 double_write: true,
521 ..Default::default()
522 };
523
524 let page_id;
525 {
526 let pager = Pager::open(&path, config.clone()).unwrap();
527 let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
528 page_id = page.page_id();
529 pager.sync().unwrap();
530 }
531
532 let mut recovered_page = Page::new(PageType::BTreeLeaf, page_id);
533 recovered_page.insert_cell(b"key", b"value").unwrap();
534 write_dwb_fixture(&path, &[(page_id, recovered_page.clone())]);
535
536 let dwb_path = dwb_path_for(&path);
537 assert!(dwb_path.exists());
538 assert!(fs::metadata(&dwb_path).unwrap().len() > 0);
539
540 {
541 let pager = Pager::open(&path, config).unwrap();
542
543 let read_page = pager.read_page(page_id).unwrap();
544 let (key, value) = read_page.read_cell(0).unwrap();
545 assert_eq!(key, b"key");
546 assert_eq!(value, b"value");
547
548 assert!(dwb_path.exists());
549 assert_eq!(fs::metadata(&dwb_path).unwrap().len(), 0);
550
551 let mut updated_page = recovered_page.clone();
552 updated_page.insert_cell(b"key2", b"value2").unwrap();
553 pager.write_page(page_id, updated_page).unwrap();
554 pager.flush().unwrap();
555
556 assert!(dwb_path.exists());
557 assert_eq!(fs::metadata(&dwb_path).unwrap().len(), 0);
558 }
559
560 cleanup(&path);
561 }
562
563 #[test]
568 fn pager_starts_without_wal_writer() {
569 let path = temp_db_path();
570 let pager = Pager::open(&path, PagerConfig::default()).unwrap();
571 assert!(!pager.has_wal_writer());
572 drop(pager);
573 cleanup(&path);
574 }
575
576 #[test]
577 fn set_wal_writer_attaches_handle() {
578 use crate::storage::wal::writer::WalWriter;
579 use std::sync::{Arc, Mutex};
580
581 let db_path = temp_db_path();
582 let mut wal_path = db_path.clone();
583 wal_path.set_extension("wal");
584 let _ = fs::remove_file(&wal_path);
585
586 let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
587 let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
588 pager.set_wal_writer(Arc::clone(&wal));
589 assert!(pager.has_wal_writer());
590
591 pager.clear_wal_writer();
592 assert!(!pager.has_wal_writer());
593
594 drop(pager);
595 let _ = fs::remove_file(&wal_path);
596 cleanup(&db_path);
597 }
598
599 #[test]
600 fn flush_with_lsn_zero_pages_skips_wal_call() {
601 use crate::storage::wal::writer::WalWriter;
607 use std::sync::{Arc, Mutex};
608
609 let db_path = temp_db_path();
610 let mut wal_path = db_path.clone();
611 wal_path.set_extension("wal");
612 let _ = fs::remove_file(&wal_path);
613
614 let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
615 let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
616 let initial_durable = {
617 let g = wal.lock().unwrap();
618 g.durable_lsn()
619 };
620 pager.set_wal_writer(Arc::clone(&wal));
621
622 let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
624 page.insert_cell(b"k", b"v").unwrap();
625 pager.write_page(page.page_id(), page).unwrap();
627 pager.flush().unwrap();
628
629 let after_flush = {
632 let g = wal.lock().unwrap();
633 g.durable_lsn()
634 };
635 assert_eq!(after_flush, initial_durable);
636
637 drop(pager);
638 let _ = fs::remove_file(&wal_path);
639 cleanup(&db_path);
640 }
641
642 #[test]
643 fn flush_advances_wal_durable_when_pages_carry_lsn() {
644 use crate::storage::wal::record::WalRecord;
648 use crate::storage::wal::writer::WalWriter;
649 use std::sync::{Arc, Mutex};
650
651 let db_path = temp_db_path();
652 let mut wal_path = db_path.clone();
653 wal_path.set_extension("wal");
654 let _ = fs::remove_file(&wal_path);
655
656 let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
657 let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
658 pager.set_wal_writer(Arc::clone(&wal));
659
660 let stamped_lsn = {
662 let mut wal_guard = wal.lock().unwrap();
663 wal_guard.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
664 wal_guard.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
665 wal_guard.current_lsn()
666 };
667 let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
668 page.insert_cell(b"k", b"v").unwrap();
669 page.set_lsn(stamped_lsn);
671 pager.write_page(page.page_id(), page).unwrap();
672 pager.flush().unwrap();
673
674 let after_flush = {
676 let g = wal.lock().unwrap();
677 g.durable_lsn()
678 };
679 assert!(
680 after_flush >= stamped_lsn,
681 "after flush durable_lsn {} must be >= stamped {}",
682 after_flush,
683 stamped_lsn
684 );
685
686 drop(pager);
687 let _ = fs::remove_file(&wal_path);
688 cleanup(&db_path);
689 }
690
691 #[test]
696 fn block_size_warn_fires_for_mismatched_block_size() {
697 assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, 6000));
701 assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, 1_048_576));
703 assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, 6 * 1024));
705 }
706
707 #[test]
708 fn block_size_silent_for_divisor() {
709 assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 4096));
711 assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 16384));
712 assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 512));
713 assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 8192));
714 }
715
716 #[test]
717 fn block_size_unavailable_is_silent() {
718 assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 0));
720 }
721
722 #[test]
723 fn page_size_is_unchanged_16kib() {
724 assert_eq!(PAGE_SIZE, 16 * 1024);
726 }
727}