1use super::freelist::FreeList;
36use super::page::{Page, PageError, PageType, DB_VERSION, HEADER_SIZE, MAGIC_BYTES, PAGE_SIZE};
37use super::page_cache::PageCache;
38use crate::storage::wal::writer::WalWriter;
39use fs2::FileExt;
40use std::fs::{File, OpenOptions};
41use std::io::{Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43use std::sync::{Arc, Mutex, RwLock};
44
45const DEFAULT_CACHE_SIZE: usize = 10_000;
47
48#[derive(Debug)]
50pub enum PagerError {
51 Io(std::io::Error),
53 Page(PageError),
55 InvalidDatabase(String),
57 ReadOnly,
59 PageNotFound(u32),
61 Locked,
63 LockPoisoned,
65 EncryptionRequired,
67 PlainDatabaseRefusesKey,
69 InvalidKey,
71}
72
73impl std::fmt::Display for PagerError {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 match self {
76 Self::Io(e) => write!(f, "I/O error: {}", e),
77 Self::Page(e) => write!(f, "Page error: {}", e),
78 Self::InvalidDatabase(msg) => write!(f, "Invalid database: {}", msg),
79 Self::ReadOnly => write!(f, "Database is read-only"),
80 Self::PageNotFound(id) => write!(f, "Page {} not found", id),
81 Self::Locked => write!(f, "Database is locked"),
82 Self::LockPoisoned => write!(f, "Internal lock poisoned (concurrent thread panicked)"),
83 Self::EncryptionRequired => write!(
84 f,
85 "Database is encrypted but no key was supplied (set PagerConfig::encryption)"
86 ),
87 Self::PlainDatabaseRefusesKey => write!(
88 f,
89 "Plain (unencrypted) database opened with an encryption key — refusing"
90 ),
91 Self::InvalidKey => write!(f, "Encryption key validation failed for this database"),
92 }
93 }
94}
95
96impl std::error::Error for PagerError {
97 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
98 match self {
99 Self::Io(e) => Some(e),
100 Self::Page(e) => Some(e),
101 _ => None,
102 }
103 }
104}
105
106impl From<std::io::Error> for PagerError {
107 fn from(e: std::io::Error) -> Self {
108 Self::Io(e)
109 }
110}
111
112impl From<PageError> for PagerError {
113 fn from(e: PageError) -> Self {
114 Self::Page(e)
115 }
116}
117
118#[derive(Debug, Clone)]
120pub struct PagerConfig {
121 pub cache_size: usize,
123 pub read_only: bool,
125 pub create: bool,
127 pub verify_checksums: bool,
129 pub double_write: bool,
131 pub encryption: Option<crate::storage::encryption::SecureKey>,
137}
138
139impl Default for PagerConfig {
140 fn default() -> Self {
141 Self {
142 cache_size: DEFAULT_CACHE_SIZE,
143 read_only: false,
144 create: true,
145 verify_checksums: true,
146 double_write: true,
147 encryption: None,
148 }
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct DatabaseHeader {
155 pub version: u32,
157 pub page_size: u32,
159 pub page_count: u32,
161 pub freelist_head: u32,
163 pub schema_version: u32,
165 pub checkpoint_lsn: u64,
167 pub checkpoint_in_progress: bool,
169 pub checkpoint_target_lsn: u64,
171 pub physical: PhysicalFileHeader,
173}
174
175#[derive(Debug, Clone, Copy, Default)]
177pub struct PhysicalFileHeader {
178 pub format_version: u32,
179 pub sequence: u64,
180 pub manifest_oldest_root: u64,
181 pub manifest_root: u64,
182 pub free_set_root: u64,
183 pub manifest_page: u32,
184 pub manifest_checksum: u64,
185 pub collection_roots_page: u32,
186 pub collection_roots_checksum: u64,
187 pub collection_root_count: u32,
188 pub snapshot_count: u32,
189 pub index_count: u32,
190 pub catalog_collection_count: u32,
191 pub catalog_total_entities: u64,
192 pub export_count: u32,
193 pub graph_projection_count: u32,
194 pub analytics_job_count: u32,
195 pub manifest_event_count: u32,
196 pub registry_page: u32,
197 pub registry_checksum: u64,
198 pub recovery_page: u32,
199 pub recovery_checksum: u64,
200 pub catalog_page: u32,
201 pub catalog_checksum: u64,
202 pub metadata_state_page: u32,
203 pub metadata_state_checksum: u64,
204 pub vector_artifact_page: u32,
205 pub vector_artifact_checksum: u64,
206}
207
208impl Default for DatabaseHeader {
209 fn default() -> Self {
210 Self {
211 version: DB_VERSION,
212 page_size: PAGE_SIZE as u32,
213 page_count: 1, freelist_head: 0,
215 schema_version: 0,
216 checkpoint_lsn: 0,
217 checkpoint_in_progress: false,
218 checkpoint_target_lsn: 0,
219 physical: PhysicalFileHeader::default(),
220 }
221 }
222}
223
224pub struct Pager {
228 path: PathBuf,
230 file: Mutex<File>,
232 _lock_file: Option<File>,
234 dwb_file: Option<Mutex<File>>,
236 cache: PageCache,
238 freelist: RwLock<FreeList>,
240 header: RwLock<DatabaseHeader>,
242 config: PagerConfig,
244 header_dirty: Mutex<bool>,
246 wal: RwLock<Option<Arc<Mutex<WalWriter>>>>,
258 pub(crate) encryption: Option<(
265 crate::storage::encryption::PageEncryptor,
266 crate::storage::encryption::EncryptionHeader,
267 )>,
268}
269
270#[path = "pager/impl.rs"]
271mod pager_impl;
272impl Drop for Pager {
273 fn drop(&mut self) {
274 let _ = self.flush();
276 }
277}
278
279#[cfg(test)]
280mod tests {
281 use super::*;
282 use std::fs;
283 use std::io::Write;
284
285 fn temp_db_path() -> PathBuf {
286 use std::sync::atomic::{AtomicU64, Ordering};
287 static COUNTER: AtomicU64 = AtomicU64::new(0);
288 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
289 let mut path = std::env::temp_dir();
290 path.push(format!("reddb_test_{}_{}.db", std::process::id(), id));
291 path
292 }
293
294 fn cleanup(path: &Path) {
295 let _ = fs::remove_file(path);
296 let mut hdr = path.to_path_buf().into_os_string();
298 hdr.push("-hdr");
299 let _ = fs::remove_file(&hdr);
300 let mut meta = path.to_path_buf().into_os_string();
301 meta.push("-meta");
302 let _ = fs::remove_file(&meta);
303 let mut dwb = path.to_path_buf().into_os_string();
304 dwb.push("-dwb");
305 let _ = fs::remove_file(&dwb);
306 }
307
308 fn dwb_path_for(path: &Path) -> PathBuf {
309 let mut dwb = path.to_path_buf().into_os_string();
310 dwb.push("-dwb");
311 PathBuf::from(dwb)
312 }
313
314 fn write_dwb_fixture(path: &Path, pages: &[(u32, Page)]) {
315 let entry_size = 4 + PAGE_SIZE;
316 let header_len = 12;
317 let total = header_len + pages.len() * entry_size;
318 let mut buf = Vec::with_capacity(total);
319
320 buf.extend_from_slice(&[0x52, 0x44, 0x44, 0x57]); buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
322 buf.extend_from_slice(&[0u8; 4]);
323
324 for (page_id, page) in pages {
325 let mut page = page.clone();
326 page.update_checksum();
327 buf.extend_from_slice(&page_id.to_le_bytes());
328 buf.extend_from_slice(page.as_bytes());
329 }
330
331 let checksum = crate::storage::engine::crc32::crc32(&buf[header_len..]);
332 buf[8..12].copy_from_slice(&checksum.to_le_bytes());
333
334 let dwb_path = dwb_path_for(path);
335 let mut file = fs::File::create(&dwb_path).unwrap();
336 file.write_all(&buf).unwrap();
337 file.sync_all().unwrap();
338 }
339
340 #[test]
341 fn test_pager_create_new() {
342 let path = temp_db_path();
343 cleanup(&path);
344
345 {
346 let pager = Pager::open_default(&path).unwrap();
347 assert_eq!(pager.page_count().unwrap(), 3); }
349
350 cleanup(&path);
351 }
352
353 #[test]
354 fn test_pager_reopen() {
355 let path = temp_db_path();
356 cleanup(&path);
357
358 {
360 let pager = Pager::open_default(&path).unwrap();
361
362 let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
364 assert_eq!(page.page_id(), 3);
365
366 pager.sync().unwrap();
367 }
368
369 {
371 let pager = Pager::open_default(&path).unwrap();
372 assert_eq!(pager.page_count().unwrap(), 4); }
374
375 cleanup(&path);
376 }
377
378 #[test]
379 fn test_pager_read_write() {
380 let path = temp_db_path();
381 cleanup(&path);
382
383 {
384 let pager = Pager::open_default(&path).unwrap();
385
386 let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
388 let page_id = page.page_id();
389
390 page.insert_cell(b"key", b"value").unwrap();
391 pager.write_page(page_id, page).unwrap();
392
393 let read_page = pager.read_page(page_id).unwrap();
395 let (key, value) = read_page.read_cell(0).unwrap();
396 assert_eq!(key, b"key");
397 assert_eq!(value, b"value");
398 }
399
400 cleanup(&path);
401 }
402
403 #[test]
404 fn test_pager_cache() {
405 let path = temp_db_path();
406 cleanup(&path);
407
408 {
409 let pager = Pager::open_default(&path).unwrap();
410
411 let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
413 let page_id = page.page_id();
414
415 let _ = pager.read_page(page_id).unwrap();
417
418 let _ = pager.read_page(page_id).unwrap();
420
421 let stats = pager.cache_stats();
422 assert!(stats.hits >= 1);
423 }
424
425 cleanup(&path);
426 }
427
428 #[test]
429 fn test_pager_free_page() {
430 let path = temp_db_path();
431 cleanup(&path);
432
433 {
434 let pager = Pager::open_default(&path).unwrap();
435
436 let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
438 let page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
439
440 let id1 = page1.page_id();
441 let id2 = page2.page_id();
442
443 pager.free_page(id1).unwrap();
445
446 let page3 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
448 assert_eq!(page3.page_id(), id1);
449 }
450
451 cleanup(&path);
452 }
453
454 #[test]
455 fn test_freelist_persistence() {
456 let path = temp_db_path();
457 cleanup(&path);
458
459 let freed_id;
460 {
461 let pager = Pager::open_default(&path).unwrap();
462 let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
463 let _page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
464 freed_id = page1.page_id();
465
466 pager.free_page(freed_id).unwrap();
467 pager.sync().unwrap();
468 }
469
470 {
471 let pager = Pager::open_default(&path).unwrap();
472 let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
473 assert_eq!(page.page_id(), freed_id);
474 }
475
476 cleanup(&path);
477 }
478
479 #[test]
480 fn test_pager_read_only() {
481 let path = temp_db_path();
482 cleanup(&path);
483
484 {
486 let pager = Pager::open_default(&path).unwrap();
487 pager.sync().unwrap();
488 }
489
490 {
492 let config = PagerConfig {
493 read_only: true,
494 ..Default::default()
495 };
496
497 let pager = Pager::open(&path, config).unwrap();
498 assert!(pager.is_read_only());
499
500 assert!(pager.allocate_page(PageType::BTreeLeaf).is_err());
502 }
503
504 cleanup(&path);
505 }
506
507 #[test]
508 fn test_dwb_recovery_clears_in_place_and_keeps_file_reusable() {
509 let path = temp_db_path();
510 cleanup(&path);
511
512 let config = PagerConfig {
513 double_write: true,
514 ..Default::default()
515 };
516
517 let page_id;
518 {
519 let pager = Pager::open(&path, config.clone()).unwrap();
520 let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
521 page_id = page.page_id();
522 pager.sync().unwrap();
523 }
524
525 let mut recovered_page = Page::new(PageType::BTreeLeaf, page_id);
526 recovered_page.insert_cell(b"key", b"value").unwrap();
527 write_dwb_fixture(&path, &[(page_id, recovered_page.clone())]);
528
529 let dwb_path = dwb_path_for(&path);
530 assert!(dwb_path.exists());
531 assert!(fs::metadata(&dwb_path).unwrap().len() > 0);
532
533 {
534 let pager = Pager::open(&path, config).unwrap();
535
536 let read_page = pager.read_page(page_id).unwrap();
537 let (key, value) = read_page.read_cell(0).unwrap();
538 assert_eq!(key, b"key");
539 assert_eq!(value, b"value");
540
541 assert!(dwb_path.exists());
542 assert_eq!(fs::metadata(&dwb_path).unwrap().len(), 0);
543
544 let mut updated_page = recovered_page.clone();
545 updated_page.insert_cell(b"key2", b"value2").unwrap();
546 pager.write_page(page_id, updated_page).unwrap();
547 pager.flush().unwrap();
548
549 assert!(dwb_path.exists());
550 assert_eq!(fs::metadata(&dwb_path).unwrap().len(), 0);
551 }
552
553 cleanup(&path);
554 }
555
556 #[test]
561 fn pager_starts_without_wal_writer() {
562 let path = temp_db_path();
563 let pager = Pager::open(&path, PagerConfig::default()).unwrap();
564 assert!(!pager.has_wal_writer());
565 drop(pager);
566 cleanup(&path);
567 }
568
569 #[test]
570 fn set_wal_writer_attaches_handle() {
571 use crate::storage::wal::writer::WalWriter;
572 use std::sync::{Arc, Mutex};
573
574 let db_path = temp_db_path();
575 let mut wal_path = db_path.clone();
576 wal_path.set_extension("wal");
577 let _ = fs::remove_file(&wal_path);
578
579 let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
580 let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
581 pager.set_wal_writer(Arc::clone(&wal));
582 assert!(pager.has_wal_writer());
583
584 pager.clear_wal_writer();
585 assert!(!pager.has_wal_writer());
586
587 drop(pager);
588 let _ = fs::remove_file(&wal_path);
589 cleanup(&db_path);
590 }
591
592 #[test]
593 fn flush_with_lsn_zero_pages_skips_wal_call() {
594 use crate::storage::wal::writer::WalWriter;
600 use std::sync::{Arc, Mutex};
601
602 let db_path = temp_db_path();
603 let mut wal_path = db_path.clone();
604 wal_path.set_extension("wal");
605 let _ = fs::remove_file(&wal_path);
606
607 let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
608 let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
609 let initial_durable = {
610 let g = wal.lock().unwrap();
611 g.durable_lsn()
612 };
613 pager.set_wal_writer(Arc::clone(&wal));
614
615 let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
617 page.insert_cell(b"k", b"v").unwrap();
618 pager.write_page(page.page_id(), page).unwrap();
620 pager.flush().unwrap();
621
622 let after_flush = {
625 let g = wal.lock().unwrap();
626 g.durable_lsn()
627 };
628 assert_eq!(after_flush, initial_durable);
629
630 drop(pager);
631 let _ = fs::remove_file(&wal_path);
632 cleanup(&db_path);
633 }
634
635 #[test]
636 fn flush_advances_wal_durable_when_pages_carry_lsn() {
637 use crate::storage::wal::record::WalRecord;
641 use crate::storage::wal::writer::WalWriter;
642 use std::sync::{Arc, Mutex};
643
644 let db_path = temp_db_path();
645 let mut wal_path = db_path.clone();
646 wal_path.set_extension("wal");
647 let _ = fs::remove_file(&wal_path);
648
649 let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
650 let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
651 pager.set_wal_writer(Arc::clone(&wal));
652
653 let stamped_lsn = {
655 let mut wal_guard = wal.lock().unwrap();
656 wal_guard.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
657 wal_guard.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
658 wal_guard.current_lsn()
659 };
660 let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
661 page.insert_cell(b"k", b"v").unwrap();
662 page.set_lsn(stamped_lsn);
664 pager.write_page(page.page_id(), page).unwrap();
665 pager.flush().unwrap();
666
667 let after_flush = {
669 let g = wal.lock().unwrap();
670 g.durable_lsn()
671 };
672 assert!(
673 after_flush >= stamped_lsn,
674 "after flush durable_lsn {} must be >= stamped {}",
675 after_flush,
676 stamped_lsn
677 );
678
679 drop(pager);
680 let _ = fs::remove_file(&wal_path);
681 cleanup(&db_path);
682 }
683}