1use super::freelist::FreeList;
36use super::page::{Page, PageError, PageType, PAGE_SIZE};
37use super::page_cache::PageCache;
38use crate::storage::wal::writer::WalWriter;
39use fs2::FileExt;
40use std::fs::{File, OpenOptions};
41use std::io::{Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43#[cfg(test)]
44use std::sync::atomic::{AtomicU8, Ordering};
45use std::sync::{Arc, Mutex, RwLock};
46
47pub use reddb_file::{DatabaseHeader, PhysicalFileHeader};
48
49const DEFAULT_CACHE_SIZE: usize = 10_000;
51
52#[cfg(test)]
53static COW_ATOMIC_WRITE_TEST_OVERRIDE: AtomicU8 = AtomicU8::new(0);
54
55#[derive(Debug)]
57pub enum PagerError {
58 Io(std::io::Error),
60 Page(PageError),
62 InvalidDatabase(String),
64 ReadOnly,
66 PageNotFound(u32),
68 Locked,
70 LockPoisoned,
72 EncryptionRequired,
74 PlainDatabaseRefusesKey,
76 InvalidKey,
78}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub struct ExtentId {
83 pub start_page: u32,
84 pub n_pages: u32,
85}
86
87impl std::fmt::Display for PagerError {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 match self {
90 Self::Io(e) => write!(f, "I/O error: {}", e),
91 Self::Page(e) => write!(f, "Page error: {}", e),
92 Self::InvalidDatabase(msg) => write!(f, "Invalid database: {}", msg),
93 Self::ReadOnly => write!(f, "Database is read-only"),
94 Self::PageNotFound(id) => write!(f, "Page {} not found", id),
95 Self::Locked => write!(f, "Database is locked"),
96 Self::LockPoisoned => write!(f, "Internal lock poisoned (concurrent thread panicked)"),
97 Self::EncryptionRequired => write!(
98 f,
99 "Database is encrypted but no key was supplied (set PagerConfig::encryption)"
100 ),
101 Self::PlainDatabaseRefusesKey => write!(
102 f,
103 "Plain (unencrypted) database opened with an encryption key — refusing"
104 ),
105 Self::InvalidKey => write!(f, "Encryption key validation failed for this database"),
106 }
107 }
108}
109
110impl std::error::Error for PagerError {
111 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
112 match self {
113 Self::Io(e) => Some(e),
114 Self::Page(e) => Some(e),
115 _ => None,
116 }
117 }
118}
119
120impl From<std::io::Error> for PagerError {
121 fn from(e: std::io::Error) -> Self {
122 Self::Io(e)
123 }
124}
125
126impl From<PageError> for PagerError {
127 fn from(e: PageError) -> Self {
128 Self::Page(e)
129 }
130}
131
132#[derive(Debug, Clone)]
134pub struct PagerConfig {
135 pub cache_size: usize,
137 pub read_only: bool,
139 pub create: bool,
141 pub verify_checksums: bool,
143 pub double_write: bool,
145 pub encryption: Option<crate::storage::encryption::SecureKey>,
151}
152
153impl Default for PagerConfig {
154 fn default() -> Self {
155 Self {
156 cache_size: DEFAULT_CACHE_SIZE,
157 read_only: false,
158 create: true,
159 verify_checksums: true,
160 double_write: true,
161 encryption: None,
162 }
163 }
164}
165
166pub struct Pager {
170 path: PathBuf,
172 file: Mutex<File>,
174 _lock_file: Option<File>,
176 dwb_file: Option<Mutex<File>>,
178 cache: PageCache,
180 freelist: RwLock<FreeList>,
182 header: RwLock<DatabaseHeader>,
184 config: PagerConfig,
186 header_dirty: Mutex<bool>,
188 wal: RwLock<Option<Arc<Mutex<WalWriter>>>>,
200 pub(crate) encryption: Option<(
207 crate::storage::encryption::PageEncryptor,
208 crate::storage::encryption::EncryptionHeader,
209 )>,
210}
211
212#[path = "pager/impl.rs"]
213mod pager_impl;
214impl Drop for Pager {
215 fn drop(&mut self) {
216 let _ = self.flush();
218 }
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 #[cfg(target_os = "linux")]
225 use pager_impl::parse_mountinfo_options_for_path;
226 use pager_impl::{
227 classify_cow_filesystem, CowFilesystemKind, BTRFS_SUPER_MAGIC, FS_NOCOW_FL, ZFS_SUPER_MAGIC,
228 };
229 use std::fs;
230 use std::io::Write;
231
232 fn temp_db_path() -> PathBuf {
233 use std::sync::atomic::{AtomicU64, Ordering};
234 static COUNTER: AtomicU64 = AtomicU64::new(0);
235 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
236 let mut path = std::env::temp_dir();
237 path.push(format!("reddb_test_{}_{}.db", std::process::id(), id));
238 path
239 }
240
241 fn cleanup(path: &Path) {
242 let _ = fs::remove_file(path);
243 let _ = fs::remove_file(reddb_file::layout::pager_header_shadow_path(path));
245 let _ = fs::remove_file(reddb_file::layout::pager_meta_shadow_path(path));
246 let _ = fs::remove_file(reddb_file::layout::pager_dwb_shadow_path(path));
247 }
248
249 fn dwb_path_for(path: &Path) -> PathBuf {
250 reddb_file::layout::pager_dwb_shadow_path(path)
251 }
252
253 static COW_ATOMIC_WRITE_OVERRIDE_GUARD: Mutex<()> = Mutex::new(());
254
255 struct CowAtomicWriteOverrideGuard {
256 _guard: std::sync::MutexGuard<'static, ()>,
257 }
258
259 impl Drop for CowAtomicWriteOverrideGuard {
260 fn drop(&mut self) {
261 COW_ATOMIC_WRITE_TEST_OVERRIDE.store(0, Ordering::Relaxed);
262 }
263 }
264
265 fn cow_atomic_write_override(value: bool) -> CowAtomicWriteOverrideGuard {
266 let guard = COW_ATOMIC_WRITE_OVERRIDE_GUARD
267 .lock()
268 .unwrap_or_else(|err| err.into_inner());
269 COW_ATOMIC_WRITE_TEST_OVERRIDE.store(if value { 1 } else { 2 }, Ordering::Relaxed);
270 CowAtomicWriteOverrideGuard { _guard: guard }
271 }
272
273 fn write_dwb_fixture(path: &Path, pages: &[(u32, Page)]) {
274 let pages: Vec<_> = pages
275 .iter()
276 .map(|(page_id, page)| {
277 let mut page = page.clone();
278 page.update_checksum();
279 (*page_id, page)
280 })
281 .collect();
282 let buf = reddb_file::encode_paged_dwb_frame(
283 pages
284 .iter()
285 .map(|(page_id, page)| (*page_id, page.as_bytes())),
286 );
287
288 let dwb_path = dwb_path_for(path);
289 let mut file = fs::File::create(&dwb_path).expect("create() should succeed");
290 file.write_all(&buf).expect("write_all() should succeed");
291 file.sync_all().expect("sync_all() should succeed");
292 }
293
294 fn write_page_bytes(path: &Path, page_id: u32, page: &Page) {
295 let mut file = OpenOptions::new()
296 .write(true)
297 .open(path)
298 .expect("open() should succeed");
299 file.seek(SeekFrom::Start(page_id as u64 * PAGE_SIZE as u64))
300 .expect("value is present");
301 file.write_all(page.as_bytes())
302 .expect("write_all() should succeed");
303 file.sync_all().expect("sync_all() should succeed");
304 }
305
306 fn write_torn_page_bytes(path: &Path, page_id: u32, before: &Page, after: &Page) {
307 let mut torn = *before.as_bytes();
308 torn[..PAGE_SIZE / 2].copy_from_slice(&after.as_bytes()[..PAGE_SIZE / 2]);
309
310 let mut file = OpenOptions::new()
311 .write(true)
312 .open(path)
313 .expect("open() should succeed");
314 file.seek(SeekFrom::Start(page_id as u64 * PAGE_SIZE as u64))
315 .expect("value is present");
316 file.write_all(&torn).expect("write_all() should succeed");
317 file.sync_all().expect("sync_all() should succeed");
318 }
319
320 #[test]
321 fn test_pager_create_new() {
322 let path = temp_db_path();
323 cleanup(&path);
324
325 {
326 let pager = Pager::open_default(&path).expect("open_default() should succeed");
327 assert_eq!(pager.page_count().expect("page_count() should succeed"), 3);
328 }
330
331 cleanup(&path);
332 }
333
334 #[test]
335 fn test_pager_reopen() {
336 let path = temp_db_path();
337 cleanup(&path);
338
339 {
341 let pager = Pager::open_default(&path).expect("open_default() should succeed");
342
343 let page = pager
345 .allocate_page(PageType::BTreeLeaf)
346 .expect("allocate_page() should succeed");
347 assert_eq!(page.page_id(), 3);
348
349 pager.sync().expect("sync() should succeed");
350 }
351
352 {
354 let pager = Pager::open_default(&path).expect("open_default() should succeed");
355 assert_eq!(pager.page_count().expect("page_count() should succeed"), 4);
356 }
358
359 cleanup(&path);
360 }
361
362 #[test]
363 fn test_pager_read_write() {
364 let path = temp_db_path();
365 cleanup(&path);
366
367 {
368 let pager = Pager::open_default(&path).expect("open_default() should succeed");
369
370 let mut page = pager
372 .allocate_page(PageType::BTreeLeaf)
373 .expect("allocate_page() should succeed");
374 let page_id = page.page_id();
375
376 page.insert_cell(b"key", b"value")
377 .expect("insert_cell() should succeed");
378 pager
379 .write_page(page_id, page)
380 .expect("write_page() should succeed");
381
382 let read_page = pager
384 .read_page(page_id)
385 .expect("read_page() should succeed");
386 let (key, value) = read_page.read_cell(0).expect("read_cell() should succeed");
387 assert_eq!(key, b"key");
388 assert_eq!(value, b"value");
389 }
390
391 cleanup(&path);
392 }
393
394 #[test]
395 fn test_pager_cache() {
396 let path = temp_db_path();
397 cleanup(&path);
398
399 {
400 let pager = Pager::open_default(&path).expect("open_default() should succeed");
401
402 let page = pager
404 .allocate_page(PageType::BTreeLeaf)
405 .expect("allocate_page() should succeed");
406 let page_id = page.page_id();
407
408 let _ = pager
410 .read_page(page_id)
411 .expect("read_page() should succeed");
412
413 let _ = pager
415 .read_page(page_id)
416 .expect("read_page() should succeed");
417
418 let stats = pager.cache_stats();
419 assert!(stats.hits >= 1);
420 }
421
422 cleanup(&path);
423 }
424
425 #[test]
426 fn test_pager_free_page() {
427 let path = temp_db_path();
428 cleanup(&path);
429
430 {
431 let pager = Pager::open_default(&path).expect("open_default() should succeed");
432
433 let page1 = pager
435 .allocate_page(PageType::BTreeLeaf)
436 .expect("allocate_page() should succeed");
437 let page2 = pager
438 .allocate_page(PageType::BTreeLeaf)
439 .expect("allocate_page() should succeed");
440
441 let id1 = page1.page_id();
442 let id2 = page2.page_id();
443
444 pager.free_page(id1).expect("free_page() should succeed");
446
447 let page3 = pager
449 .allocate_page(PageType::BTreeLeaf)
450 .expect("allocate_page() should succeed");
451 assert_eq!(page3.page_id(), id1);
452 }
453
454 cleanup(&path);
455 }
456
457 #[test]
458 fn test_freelist_persistence() {
459 let path = temp_db_path();
460 cleanup(&path);
461
462 let freed_id;
463 {
464 let pager = Pager::open_default(&path).expect("open_default() should succeed");
465 let page1 = pager
466 .allocate_page(PageType::BTreeLeaf)
467 .expect("allocate_page() should succeed");
468 let _page2 = pager
469 .allocate_page(PageType::BTreeLeaf)
470 .expect("allocate_page() should succeed");
471 freed_id = page1.page_id();
472
473 pager
474 .free_page(freed_id)
475 .expect("free_page() should succeed");
476 pager.sync().expect("sync() should succeed");
477 }
478
479 {
480 let pager = Pager::open_default(&path).expect("open_default() should succeed");
481 let page = pager
482 .allocate_page(PageType::BTreeLeaf)
483 .expect("allocate_page() should succeed");
484 assert_eq!(page.page_id(), freed_id);
485 }
486
487 cleanup(&path);
488 }
489
490 #[test]
491 fn test_pager_read_only() {
492 let path = temp_db_path();
493 cleanup(&path);
494
495 {
497 let pager = Pager::open_default(&path).expect("open_default() should succeed");
498 pager.sync().expect("sync() should succeed");
499 }
500
501 {
503 let config = PagerConfig {
504 read_only: true,
505 ..Default::default()
506 };
507
508 let pager = Pager::open(&path, config).expect("open() should succeed");
509 assert!(pager.is_read_only());
510
511 assert!(pager.allocate_page(PageType::BTreeLeaf).is_err());
513 }
514
515 cleanup(&path);
516 }
517
518 #[test]
519 fn test_dwb_recovery_clears_in_place_and_keeps_file_reusable() {
520 let path = temp_db_path();
521 cleanup(&path);
522
523 let config = PagerConfig {
524 double_write: true,
525 ..Default::default()
526 };
527
528 let page_id;
529 {
530 let pager = Pager::open(&path, config.clone()).expect("open() should succeed");
531 let page = pager
532 .allocate_page(PageType::BTreeLeaf)
533 .expect("allocate_page() should succeed");
534 page_id = page.page_id();
535 pager.sync().expect("sync() should succeed");
536 }
537
538 let mut recovered_page = Page::new(PageType::BTreeLeaf, page_id);
539 recovered_page
540 .insert_cell(b"key", b"value")
541 .expect("insert_cell() should succeed");
542 write_dwb_fixture(&path, &[(page_id, recovered_page.clone())]);
543
544 let dwb_path = dwb_path_for(&path);
545 assert!(dwb_path.exists());
546 assert!(
547 fs::metadata(&dwb_path)
548 .expect("metadata() should succeed")
549 .len()
550 > 0
551 );
552
553 {
554 let pager = Pager::open(&path, config).expect("open() should succeed");
555
556 let read_page = pager
557 .read_page(page_id)
558 .expect("read_page() should succeed");
559 let (key, value) = read_page.read_cell(0).expect("read_cell() should succeed");
560 assert_eq!(key, b"key");
561 assert_eq!(value, b"value");
562
563 assert!(dwb_path.exists());
564 assert_eq!(
565 fs::metadata(&dwb_path)
566 .expect("metadata() should succeed")
567 .len(),
568 0
569 );
570
571 let mut updated_page = recovered_page.clone();
572 updated_page
573 .insert_cell(b"key2", b"value2")
574 .expect("insert_cell() should succeed");
575 pager
576 .write_page(page_id, updated_page)
577 .expect("write_page() should succeed");
578 pager.flush().expect("flush() should succeed");
579
580 assert!(dwb_path.exists());
581 assert_eq!(
582 fs::metadata(&dwb_path)
583 .expect("metadata() should succeed")
584 .len(),
585 0
586 );
587 }
588
589 cleanup(&path);
590 }
591
592 #[test]
593 fn cow_probe_classification_fails_closed_for_btrfs_nodatacow() {
594 assert_eq!(
595 classify_cow_filesystem(ZFS_SUPER_MAGIC, None, None),
596 Some(CowFilesystemKind::Zfs),
597 "ZFS is always CoW"
598 );
599 assert_eq!(
600 classify_cow_filesystem(BTRFS_SUPER_MAGIC, Some("rw,relatime"), Some(0)),
601 Some(CowFilesystemKind::BtrfsDataCow),
602 "btrfs qualifies only when datacow remains enabled"
603 );
604 assert_eq!(
605 classify_cow_filesystem(BTRFS_SUPER_MAGIC, Some("rw,nodatacow"), Some(0)),
606 None,
607 "btrfs nodatacow mount option must reject DWB skip"
608 );
609 assert_eq!(
610 classify_cow_filesystem(BTRFS_SUPER_MAGIC, Some("rw"), Some(FS_NOCOW_FL)),
611 None,
612 "btrfs chattr +C / NOCOW inode flag must reject DWB skip"
613 );
614 assert_eq!(
615 classify_cow_filesystem(BTRFS_SUPER_MAGIC, Some("rw"), None),
616 None,
617 "missing btrfs inode flags are uncertain and must fail closed"
618 );
619 assert_eq!(
620 classify_cow_filesystem(BTRFS_SUPER_MAGIC, None, Some(0)),
621 None,
622 "missing btrfs mount options are uncertain and must fail closed"
623 );
624 }
625
626 #[cfg(target_os = "linux")]
627 #[test]
628 fn mountinfo_parser_uses_longest_cow_mount_and_rejects_nodatacow() {
629 let mountinfo = "\
63024 18 0:21 / / rw,relatime - ext4 /dev/root rw\n\
63135 24 0:42 /subvol /mnt/reddb rw,relatime - btrfs /dev/sdb rw,space_cache=v2\n\
63236 35 0:43 /nocow /mnt/reddb/nocow rw,relatime - btrfs /dev/sdb rw,nodatacow\n\
633";
634
635 assert_eq!(
636 parse_mountinfo_options_for_path(mountinfo, Path::new("/mnt/reddb/data.rdb"))
637 .as_deref(),
638 Some("rw,relatime,rw,space_cache=v2")
639 );
640 assert_eq!(
641 parse_mountinfo_options_for_path(mountinfo, Path::new("/mnt/reddb/nocow/data.rdb"))
642 .as_deref(),
643 Some("rw,relatime,rw,nodatacow")
644 );
645 }
646
647 #[test]
648 fn double_write_false_keeps_dwb_when_cow_probe_denies() {
649 let _override = cow_atomic_write_override(false);
650 let path = temp_db_path();
651 cleanup(&path);
652
653 {
654 let config = PagerConfig {
655 double_write: false,
656 ..Default::default()
657 };
658 let pager = Pager::open(&path, config).expect("open() should succeed");
659 let page = pager
660 .allocate_page(PageType::BTreeLeaf)
661 .expect("allocate_page() should succeed");
662 pager
663 .write_page(page.page_id(), page)
664 .expect("write_page() should succeed");
665 pager.flush().expect("flush() should succeed");
666 }
667
668 assert!(
669 dwb_path_for(&path).exists(),
670 "DWB must stay enabled when double_write=false is not proven safe"
671 );
672
673 cleanup(&path);
674 }
675
676 #[test]
677 fn double_write_false_skips_dwb_when_cow_probe_allows() {
678 let _override = cow_atomic_write_override(true);
679 let path = temp_db_path();
680 cleanup(&path);
681
682 {
683 let config = PagerConfig {
684 double_write: false,
685 ..Default::default()
686 };
687 let pager = Pager::open(&path, config).expect("open() should succeed");
688 let page = pager
689 .allocate_page(PageType::BTreeLeaf)
690 .expect("allocate_page() should succeed");
691 pager
692 .write_page(page.page_id(), page)
693 .expect("write_page() should succeed");
694 pager.flush().expect("flush() should succeed");
695 }
696
697 assert!(
698 !dwb_path_for(&path).exists(),
699 "DWB may be skipped only after the CoW probe allows it"
700 );
701
702 cleanup(&path);
703 }
704
705 #[test]
706 fn double_write_false_on_cow_replays_then_removes_existing_dwb() {
707 let _override = cow_atomic_write_override(true);
708 let path = temp_db_path();
709 cleanup(&path);
710
711 let page_id;
712 {
713 let pager = Pager::open(&path, PagerConfig::default()).expect("open() should succeed");
714 let page = pager
715 .allocate_page(PageType::BTreeLeaf)
716 .expect("allocate_page() should succeed");
717 page_id = page.page_id();
718 pager.sync().expect("sync() should succeed");
719 }
720
721 let mut recovered_page = Page::new(PageType::BTreeLeaf, page_id);
722 recovered_page
723 .insert_cell(b"key", b"value")
724 .expect("insert_cell() should succeed");
725 write_dwb_fixture(&path, &[(page_id, recovered_page)]);
726
727 {
728 let config = PagerConfig {
729 double_write: false,
730 ..Default::default()
731 };
732 let pager = Pager::open(&path, config).expect("open() should succeed");
733 let read_page = pager
734 .read_page(page_id)
735 .expect("read_page() should succeed");
736 let (key, value) = read_page.read_cell(0).expect("read_cell() should succeed");
737 assert_eq!(key, b"key");
738 assert_eq!(value, b"value");
739 }
740
741 assert!(
742 !dwb_path_for(&path).exists(),
743 "CoW DWB-skip must replay any existing DWB before removing the sidecar"
744 );
745
746 cleanup(&path);
747 }
748
749 #[test]
750 fn simulated_cow_mid_write_leaves_a_whole_consistent_page_without_dwb() {
751 let _override = cow_atomic_write_override(true);
752 let path = temp_db_path();
753 cleanup(&path);
754
755 let config = PagerConfig {
756 double_write: false,
757 ..Default::default()
758 };
759
760 let page_id;
761 let before;
762 let after;
763 {
764 let pager = Pager::open(&path, config.clone()).expect("open() should succeed");
765 let mut page = pager
766 .allocate_page(PageType::BTreeLeaf)
767 .expect("allocate_page() should succeed");
768 page_id = page.page_id();
769 page.insert_cell(b"phase", b"before")
770 .expect("insert_cell() should succeed");
771 pager
772 .write_page(page_id, page)
773 .expect("write_page() should succeed");
774 pager.sync().expect("sync() should succeed");
775 before = pager
776 .read_page(page_id)
777 .expect("read_page() should succeed");
778
779 let mut page = before.clone();
780 page.insert_cell(b"phase2", b"after")
781 .expect("insert_cell() should succeed");
782 pager
783 .write_page(page_id, page)
784 .expect("write_page() should succeed");
785 pager.flush().expect("flush() should succeed");
786 after = pager
787 .read_page(page_id)
788 .expect("read_page() should succeed");
789 }
790
791 for (whole_page, expected_cells) in [(&before, 1), (&after, 2)] {
794 write_page_bytes(&path, page_id, whole_page);
795
796 let pager = Pager::open(&path, config.clone()).expect("open() should succeed");
797 let recovered = pager
798 .read_page(page_id)
799 .expect("read_page() should succeed");
800 assert_eq!(recovered.cell_count(), expected_cells);
801 let (key, value) = recovered.read_cell(0).expect("read_cell() should succeed");
802 assert_eq!(key, b"phase");
803 assert_eq!(value, b"before");
804 if expected_cells == 2 {
805 let (key, value) = recovered.read_cell(1).expect("read_cell() should succeed");
806 assert_eq!(key, b"phase2");
807 assert_eq!(value, b"after");
808 }
809 drop(pager);
810 }
811
812 cleanup(&path);
813 }
814
815 #[test]
816 fn same_mid_write_without_cow_recovers_from_dwb() {
817 let _override = cow_atomic_write_override(false);
818 let path = temp_db_path();
819 cleanup(&path);
820
821 let config = PagerConfig {
822 double_write: false,
823 ..Default::default()
824 };
825
826 let page_id;
827 let before;
828 let after;
829 {
830 let pager = Pager::open(&path, config.clone()).expect("open() should succeed");
831 let mut page = pager
832 .allocate_page(PageType::BTreeLeaf)
833 .expect("allocate_page() should succeed");
834 page_id = page.page_id();
835 page.insert_cell(b"phase", b"before")
836 .expect("insert_cell() should succeed");
837 pager
838 .write_page(page_id, page)
839 .expect("write_page() should succeed");
840 pager.sync().expect("sync() should succeed");
841 before = pager
842 .read_page(page_id)
843 .expect("read_page() should succeed");
844
845 let mut page = before.clone();
846 page.insert_cell(b"phase2", b"after")
847 .expect("insert_cell() should succeed");
848 pager
849 .write_page(page_id, page)
850 .expect("write_page() should succeed");
851 pager.flush().expect("flush() should succeed");
852 after = pager
853 .read_page(page_id)
854 .expect("read_page() should succeed");
855 }
856
857 write_dwb_fixture(&path, &[(page_id, after.clone())]);
858 write_torn_page_bytes(&path, page_id, &before, &after);
859
860 {
861 let pager = Pager::open(&path, config).expect("open() should succeed");
862 let recovered = pager
863 .read_page(page_id)
864 .expect("read_page() should succeed");
865 assert_eq!(recovered.cell_count(), 2);
866
867 let (key, value) = recovered.read_cell(0).expect("read_cell() should succeed");
868 assert_eq!(key, b"phase");
869 assert_eq!(value, b"before");
870
871 let (key, value) = recovered.read_cell(1).expect("read_cell() should succeed");
872 assert_eq!(key, b"phase2");
873 assert_eq!(value, b"after");
874 }
875
876 assert_eq!(
877 fs::metadata(dwb_path_for(&path))
878 .expect("metadata() should succeed")
879 .len(),
880 0
881 );
882 cleanup(&path);
883 }
884
885 #[test]
890 fn pager_starts_without_wal_writer() {
891 let path = temp_db_path();
892 let pager = Pager::open(&path, PagerConfig::default()).expect("open() should succeed");
893 assert!(!pager.has_wal_writer());
894 drop(pager);
895 cleanup(&path);
896 }
897
898 #[test]
899 fn set_wal_writer_attaches_handle() {
900 use crate::storage::wal::writer::WalWriter;
901 use std::sync::{Arc, Mutex};
902
903 let db_path = temp_db_path();
904 let wal_path = reddb_file::layout::pager_legacy_wal_path(&db_path);
905 let _ = fs::remove_file(&wal_path);
906
907 let pager = Pager::open(&db_path, PagerConfig::default()).expect("open() should succeed");
908 let wal = Arc::new(Mutex::new(
909 WalWriter::open(&wal_path).expect("open() should succeed"),
910 ));
911 pager.set_wal_writer(Arc::clone(&wal));
912 assert!(pager.has_wal_writer());
913
914 pager.clear_wal_writer();
915 assert!(!pager.has_wal_writer());
916
917 drop(pager);
918 let _ = fs::remove_file(&wal_path);
919 cleanup(&db_path);
920 }
921
922 #[test]
923 fn flush_with_lsn_zero_pages_skips_wal_call() {
924 use crate::storage::wal::writer::WalWriter;
930 use std::sync::{Arc, Mutex};
931
932 let db_path = temp_db_path();
933 let wal_path = reddb_file::layout::pager_legacy_wal_path(&db_path);
934 let _ = fs::remove_file(&wal_path);
935
936 let pager = Pager::open(&db_path, PagerConfig::default()).expect("open() should succeed");
937 let wal = Arc::new(Mutex::new(
938 WalWriter::open(&wal_path).expect("open() should succeed"),
939 ));
940 let initial_durable = {
941 let g = wal.lock().expect("lock() should succeed");
942 g.durable_lsn()
943 };
944 pager.set_wal_writer(Arc::clone(&wal));
945
946 let mut page = pager
948 .allocate_page(PageType::BTreeLeaf)
949 .expect("allocate_page() should succeed");
950 page.insert_cell(b"k", b"v")
951 .expect("insert_cell() should succeed");
952 pager
954 .write_page(page.page_id(), page)
955 .expect("write_page() should succeed");
956 pager.flush().expect("flush() should succeed");
957
958 let after_flush = {
961 let g = wal.lock().expect("lock() should succeed");
962 g.durable_lsn()
963 };
964 assert_eq!(after_flush, initial_durable);
965
966 drop(pager);
967 let _ = fs::remove_file(&wal_path);
968 cleanup(&db_path);
969 }
970
971 #[test]
972 fn flush_advances_wal_durable_when_pages_carry_lsn() {
973 use crate::storage::wal::record::WalRecord;
977 use crate::storage::wal::writer::WalWriter;
978 use std::sync::{Arc, Mutex};
979
980 let db_path = temp_db_path();
981 let wal_path = reddb_file::layout::pager_legacy_wal_path(&db_path);
982 let _ = fs::remove_file(&wal_path);
983
984 let pager = Pager::open(&db_path, PagerConfig::default()).expect("open() should succeed");
985 let wal = Arc::new(Mutex::new(
986 WalWriter::open(&wal_path).expect("open() should succeed"),
987 ));
988 pager.set_wal_writer(Arc::clone(&wal));
989
990 let stamped_lsn = {
992 let mut wal_guard = wal.lock().expect("lock() should succeed");
993 wal_guard
994 .append(&WalRecord::Begin { tx_id: 1 })
995 .expect("append() should succeed");
996 wal_guard
997 .append(&WalRecord::Commit { tx_id: 1 })
998 .expect("append() should succeed");
999 wal_guard.current_lsn()
1000 };
1001 let mut page = pager
1002 .allocate_page(PageType::BTreeLeaf)
1003 .expect("allocate_page() should succeed");
1004 page.insert_cell(b"k", b"v")
1005 .expect("insert_cell() should succeed");
1006 page.set_lsn(stamped_lsn);
1008 pager
1009 .write_page(page.page_id(), page)
1010 .expect("write_page() should succeed");
1011 pager.flush().expect("flush() should succeed");
1012
1013 let after_flush = {
1015 let g = wal.lock().expect("lock() should succeed");
1016 g.durable_lsn()
1017 };
1018 assert!(
1019 after_flush >= stamped_lsn,
1020 "after flush durable_lsn {} must be >= stamped {}",
1021 after_flush,
1022 stamped_lsn
1023 );
1024
1025 drop(pager);
1026 let _ = fs::remove_file(&wal_path);
1027 cleanup(&db_path);
1028 }
1029
1030 #[test]
1035 fn block_size_warn_fires_for_mismatched_block_size() {
1036 assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, 6000));
1040 assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, 1_048_576));
1042 assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, 6 * 1024));
1044 }
1045
1046 #[test]
1047 fn block_size_silent_for_divisor() {
1048 assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 4096));
1050 assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 16384));
1051 assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 512));
1052 assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 8192));
1053 }
1054
1055 #[test]
1056 fn block_size_unavailable_is_silent() {
1057 assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, 0));
1059 }
1060
1061 #[test]
1062 fn page_size_is_unchanged_16kib() {
1063 assert_eq!(PAGE_SIZE, 16 * 1024);
1065 }
1066}