1use super::freelist::FreeList;
36use super::page::{Page, PageError, PageType, PAGE_SIZE};
37use super::page_cache::PageCache;
38use crate::storage::wal::writer::WalWriter;
39use fs2::FileExt;
40use std::fs::{File, OpenOptions};
41use std::io::{Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43#[cfg(test)]
44use std::sync::atomic::{AtomicU8, Ordering};
45use std::sync::{Arc, Mutex, RwLock};
46
47pub use reddb_file::{DatabaseHeader, PhysicalFileHeader};
48
/// Value used for `PagerConfig::cache_size` by `PagerConfig::default()`.
/// NOTE(review): the unit is presumably "pages held by `PageCache`", not bytes — confirm.
const DEFAULT_CACHE_SIZE: usize = 10_000;
51
/// Test-only override slot written by the `cow_atomic_write_override` test helper:
/// `0` = no override (the value the helper's guard restores on drop),
/// `1` = helper called with `true`, `2` = helper called with `false`.
/// NOTE(review): the code that reads this value is outside this view
/// (presumably the CoW probe in `pager/impl.rs`) — confirm.
#[cfg(test)]
static COW_ATOMIC_WRITE_TEST_OVERRIDE: AtomicU8 = AtomicU8::new(0);
54
/// Errors reported by the pager.
///
/// The `Display` impl in this file gives the user-facing text for each variant;
/// only `Io` and `Page` expose an underlying `source()`.
#[derive(Debug)]
pub enum PagerError {
    /// An underlying I/O failure.
    Io(std::io::Error),
    /// An error raised by the page layer.
    Page(PageError),
    /// The database is invalid; the string is the detail appended to the message.
    InvalidDatabase(String),
    /// The database is read-only.
    ReadOnly,
    /// The page with the given id was not found.
    PageNotFound(u32),
    /// The database is locked.
    Locked,
    /// An internal lock was poisoned because a concurrent thread panicked.
    LockPoisoned,
    /// The database is encrypted but no key was supplied through `PagerConfig::encryption`.
    EncryptionRequired,
    /// A plain (unencrypted) database was opened with an encryption key.
    PlainDatabaseRefusesKey,
    /// Encryption key validation failed for this database.
    InvalidKey,
}
79
/// Identifies an extent by its `start_page` and `n_pages`.
/// NOTE(review): presumably a contiguous run of `n_pages` pages beginning at
/// `start_page` — confirm against the code that allocates extents.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExtentId {
    /// Page id stored as the start of the extent.
    pub start_page: u32,
    /// Page count stored for the extent.
    pub n_pages: u32,
}
86
87impl std::fmt::Display for PagerError {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 match self {
90 Self::Io(e) => write!(f, "I/O error: {}", e),
91 Self::Page(e) => write!(f, "Page error: {}", e),
92 Self::InvalidDatabase(msg) => write!(f, "Invalid database: {}", msg),
93 Self::ReadOnly => write!(f, "Database is read-only"),
94 Self::PageNotFound(id) => write!(f, "Page {} not found", id),
95 Self::Locked => write!(f, "Database is locked"),
96 Self::LockPoisoned => write!(f, "Internal lock poisoned (concurrent thread panicked)"),
97 Self::EncryptionRequired => write!(
98 f,
99 "Database is encrypted but no key was supplied (set PagerConfig::encryption)"
100 ),
101 Self::PlainDatabaseRefusesKey => write!(
102 f,
103 "Plain (unencrypted) database opened with an encryption key — refusing"
104 ),
105 Self::InvalidKey => write!(f, "Encryption key validation failed for this database"),
106 }
107 }
108}
109
110impl std::error::Error for PagerError {
111 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
112 match self {
113 Self::Io(e) => Some(e),
114 Self::Page(e) => Some(e),
115 _ => None,
116 }
117 }
118}
119
120impl From<std::io::Error> for PagerError {
121 fn from(e: std::io::Error) -> Self {
122 Self::Io(e)
123 }
124}
125
126impl From<PageError> for PagerError {
127 fn from(e: PageError) -> Self {
128 Self::Page(e)
129 }
130}
131
/// Options passed to `Pager::open`. `PagerConfig::default()` supplies the
/// default value for every field.
#[derive(Debug, Clone)]
pub struct PagerConfig {
    /// Size of the page cache (defaults to `DEFAULT_CACHE_SIZE`).
    /// NOTE(review): unit is presumably pages — confirm against `PageCache`.
    pub cache_size: usize,
    /// Open the database read-only (default `false`).
    pub read_only: bool,
    /// Default `true`.
    /// NOTE(review): presumably "create the database file if it is missing" — confirm in `pager/impl.rs`.
    pub create: bool,
    /// Default `true`.
    /// NOTE(review): presumably enables page checksum verification — confirm in `pager/impl.rs`.
    pub verify_checksums: bool,
    /// Request the double-write buffer (DWB); default `true`.
    /// NOTE(review): the tests in this file expect `false` to be honoured only
    /// when the copy-on-write probe allows skipping the DWB — confirm in `pager/impl.rs`.
    pub double_write: bool,
    /// Optional encryption key (default `None`). The `EncryptionRequired` error
    /// message points users at this field when an encrypted database is opened without a key.
    pub encryption: Option<crate::storage::encryption::SecureKey>,
}
152
153impl Default for PagerConfig {
154 fn default() -> Self {
155 Self {
156 cache_size: DEFAULT_CACHE_SIZE,
157 read_only: false,
158 create: true,
159 verify_checksums: true,
160 double_write: true,
161 encryption: None,
162 }
163 }
164}
165
/// The pager: owns the database file handle, the page cache, the freelist and
/// the database header. Dropping a `Pager` attempts a final `flush()`.
///
/// NOTE(review): the methods are not visible in this file (presumably defined in
/// `pager/impl.rs`), so the per-field notes below are limited to what the types show.
pub struct Pager {
    /// Path associated with this pager. NOTE(review): presumably the main database file — confirm.
    path: PathBuf,
    /// Database file handle, guarded by a mutex.
    file: Mutex<File>,
    /// Optional file handle that is kept alive but never read by name (underscore prefix).
    /// NOTE(review): presumably holds the cross-process lock taken via `fs2::FileExt` — confirm.
    _lock_file: Option<File>,
    /// Optional double-write-buffer file handle, guarded by a mutex.
    dwb_file: Option<Mutex<File>>,
    /// In-memory page cache.
    cache: PageCache,
    /// Freelist, behind a reader-writer lock.
    freelist: RwLock<FreeList>,
    /// Database header, behind a reader-writer lock.
    header: RwLock<DatabaseHeader>,
    /// Configuration value stored with the pager.
    config: PagerConfig,
    /// Dirty flag for the header. NOTE(review): presumably "header must be rewritten on flush" — confirm.
    header_dirty: Mutex<bool>,
    /// Optional shared WAL writer; the tests attach and detach it through
    /// `set_wal_writer` / `clear_wal_writer` and query it with `has_wal_writer`.
    wal: RwLock<Option<Arc<Mutex<WalWriter>>>>,
    /// Optional page encryptor paired with its encryption header.
    /// NOTE(review): presumably `None` for plain (unencrypted) databases — confirm.
    pub(crate) encryption: Option<(
        crate::storage::encryption::PageEncryptor,
        crate::storage::encryption::EncryptionHeader,
    )>,
}
211
// Private submodule loaded from `pager/impl.rs`. The test module in this file
// imports `classify_cow_filesystem`, `CowFilesystemKind` and related constants from it.
// NOTE(review): presumably it also holds the inherent `impl Pager` methods — confirm.
#[path = "pager/impl.rs"]
mod pager_impl;
impl Drop for Pager {
    /// Attempts a final `flush()` when the pager goes out of scope.
    fn drop(&mut self) {
        // Best effort: `drop` cannot return an error, so a failed flush is
        // deliberately discarded here. Callers that need to observe flush
        // failures should call `flush()` themselves before dropping.
        let _ = self.flush();
    }
}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 #[cfg(target_os = "linux")]
225 use pager_impl::parse_mountinfo_options_for_path;
226 use pager_impl::{
227 classify_cow_filesystem, CowFilesystemKind, BTRFS_SUPER_MAGIC, FS_NOCOW_FL, ZFS_SUPER_MAGIC,
228 };
229 use std::fs;
230 use std::io::Write;
231
/// Returns a fresh database path inside the system temp directory.
///
/// The file name combines the current process id with a process-wide counter,
/// so every call within one test run yields a distinct path.
fn temp_db_path() -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_ID: AtomicU64 = AtomicU64::new(0);

    let sequence = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    let file_name = format!("reddb_test_{}_{}.db", std::process::id(), sequence);
    std::env::temp_dir().join(file_name)
}
240
241 fn cleanup(path: &Path) {
242 let _ = fs::remove_file(path);
243 let _ = fs::remove_file(reddb_file::layout::pager_header_shadow_path(path));
245 let _ = fs::remove_file(reddb_file::layout::pager_meta_shadow_path(path));
246 let _ = fs::remove_file(reddb_file::layout::pager_dwb_shadow_path(path));
247 }
248
249 fn dwb_path_for(path: &Path) -> PathBuf {
250 reddb_file::layout::pager_dwb_shadow_path(path)
251 }
252
// Serialises tests that set the process-wide `COW_ATOMIC_WRITE_TEST_OVERRIDE`,
// so tests running in parallel cannot observe each other's override.
static COW_ATOMIC_WRITE_OVERRIDE_GUARD: Mutex<()> = Mutex::new(());

/// Guard returned by `cow_atomic_write_override`. It holds the serialising
/// mutex for as long as it lives and resets the override when dropped.
struct CowAtomicWriteOverrideGuard {
    // Held only for its lifetime; never read.
    _guard: std::sync::MutexGuard<'static, ()>,
}

impl Drop for CowAtomicWriteOverrideGuard {
    fn drop(&mut self) {
        // Restore "no override" (0). This body runs before the `_guard` field
        // is dropped, so the reset happens while the mutex is still held.
        COW_ATOMIC_WRITE_TEST_OVERRIDE.store(0, Ordering::Relaxed);
    }
}
264
265 fn cow_atomic_write_override(value: bool) -> CowAtomicWriteOverrideGuard {
266 let guard = COW_ATOMIC_WRITE_OVERRIDE_GUARD
267 .lock()
268 .unwrap_or_else(|err| err.into_inner());
269 COW_ATOMIC_WRITE_TEST_OVERRIDE.store(if value { 1 } else { 2 }, Ordering::Relaxed);
270 CowAtomicWriteOverrideGuard { _guard: guard }
271 }
272
273 fn write_dwb_fixture(path: &Path, pages: &[(u32, Page)]) {
274 let pages: Vec<_> = pages
275 .iter()
276 .map(|(page_id, page)| {
277 let mut page = page.clone();
278 page.update_checksum();
279 (*page_id, page)
280 })
281 .collect();
282 let buf = reddb_file::encode_paged_dwb_frame(
283 pages
284 .iter()
285 .map(|(page_id, page)| (*page_id, page.as_bytes())),
286 );
287
288 let dwb_path = dwb_path_for(path);
289 let mut file = fs::File::create(&dwb_path).unwrap();
290 file.write_all(&buf).unwrap();
291 file.sync_all().unwrap();
292 }
293
294 fn write_page_bytes(path: &Path, page_id: u32, page: &Page) {
295 let mut file = OpenOptions::new().write(true).open(path).unwrap();
296 file.seek(SeekFrom::Start(page_id as u64 * PAGE_SIZE as u64))
297 .unwrap();
298 file.write_all(page.as_bytes()).unwrap();
299 file.sync_all().unwrap();
300 }
301
302 fn write_torn_page_bytes(path: &Path, page_id: u32, before: &Page, after: &Page) {
303 let mut torn = *before.as_bytes();
304 torn[..PAGE_SIZE / 2].copy_from_slice(&after.as_bytes()[..PAGE_SIZE / 2]);
305
306 let mut file = OpenOptions::new().write(true).open(path).unwrap();
307 file.seek(SeekFrom::Start(page_id as u64 * PAGE_SIZE as u64))
308 .unwrap();
309 file.write_all(&torn).unwrap();
310 file.sync_all().unwrap();
311 }
312
/// A freshly created database reports three pages.
#[test]
fn test_pager_create_new() {
    let db_path = temp_db_path();
    cleanup(&db_path);

    let fresh = Pager::open_default(&db_path).unwrap();
    let pages_after_create = fresh.page_count().unwrap();
    assert_eq!(pages_after_create, 3);
    // Close the pager before deleting its files.
    drop(fresh);

    cleanup(&db_path);
}
325
/// A page allocated and synced in one session is still counted after reopening.
#[test]
fn test_pager_reopen() {
    let db_path = temp_db_path();
    cleanup(&db_path);

    // Session 1: allocate one page (expected id 3) and sync.
    {
        let writer = Pager::open_default(&db_path).unwrap();
        let allocated = writer.allocate_page(PageType::BTreeLeaf).unwrap();
        let allocated_id = allocated.page_id();
        assert_eq!(allocated_id, 3);
        writer.sync().unwrap();
    }

    // Session 2: the page count now includes the allocated page.
    {
        let reopened = Pager::open_default(&db_path).unwrap();
        let pages_after_reopen = reopened.page_count().unwrap();
        assert_eq!(pages_after_reopen, 4);
    }

    cleanup(&db_path);
}
350
/// A cell written through `write_page` is returned by `read_page`.
#[test]
fn test_pager_read_write() {
    let db_path = temp_db_path();
    cleanup(&db_path);

    {
        let pager = Pager::open_default(&db_path).unwrap();

        // Write one cell into a newly allocated leaf.
        let mut leaf = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let leaf_id = leaf.page_id();
        leaf.insert_cell(b"key", b"value").unwrap();
        pager.write_page(leaf_id, leaf).unwrap();

        // Read it back and compare both halves of the cell.
        let stored = pager.read_page(leaf_id).unwrap();
        let cell = stored.read_cell(0).unwrap();
        assert_eq!(cell.0, b"key");
        assert_eq!(cell.1, b"value");
    }

    cleanup(&db_path);
}
375
/// Reading the same page twice registers at least one cache hit.
#[test]
fn test_pager_cache() {
    let db_path = temp_db_path();
    cleanup(&db_path);

    {
        let pager = Pager::open_default(&db_path).unwrap();

        let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page_id = page.page_id();

        // Two consecutive reads of the same page.
        for _ in 0..2 {
            let _ = pager.read_page(page_id).unwrap();
        }

        let hit_count = pager.cache_stats().hits;
        assert!(hit_count >= 1);
    }

    cleanup(&db_path);
}
400
/// A freed page id is handed out again by the next allocation.
#[test]
fn test_pager_free_page() {
    let path = temp_db_path();
    cleanup(&path);

    {
        let pager = Pager::open_default(&path).unwrap();

        let page1 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        // A second page is allocated after the one that will be freed; only
        // its allocation matters, so it is bound with a leading underscore
        // (the former `id2` local was never read and triggered an
        // `unused_variables` warning).
        let _page2 = pager.allocate_page(PageType::BTreeLeaf).unwrap();

        let id1 = page1.page_id();

        pager.free_page(id1).unwrap();

        // The next allocation must reuse the freed id.
        let page3 = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        assert_eq!(page3.page_id(), id1);
    }

    cleanup(&path);
}
426
/// A page freed and synced in one session is reused by the first allocation
/// of the next session.
#[test]
fn test_freelist_persistence() {
    let db_path = temp_db_path();
    cleanup(&db_path);

    // Session 1: allocate two pages, free the first, sync.
    let freed_id = {
        let pager = Pager::open_default(&db_path).unwrap();
        let victim = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let _keeper = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let victim_id = victim.page_id();

        pager.free_page(victim_id).unwrap();
        pager.sync().unwrap();
        victim_id
    };

    // Session 2: the freed id comes back.
    {
        let pager = Pager::open_default(&db_path).unwrap();
        let reused = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        assert_eq!(reused.page_id(), freed_id);
    }

    cleanup(&db_path);
}
451
/// A pager opened with `read_only: true` reports read-only and refuses to allocate.
#[test]
fn test_pager_read_only() {
    let db_path = temp_db_path();
    cleanup(&db_path);

    // Create the database first so there is something to open read-only.
    {
        let creator = Pager::open_default(&db_path).unwrap();
        creator.sync().unwrap();
    }

    {
        let read_only_config = PagerConfig {
            read_only: true,
            ..PagerConfig::default()
        };
        let pager = Pager::open(&db_path, read_only_config).unwrap();
        assert!(pager.is_read_only());

        let allocation = pager.allocate_page(PageType::BTreeLeaf);
        assert!(allocation.is_err());
    }

    cleanup(&db_path);
}
479
/// Opening a database with a populated DWB sidecar must apply the DWB page,
/// leave the sidecar present but empty, and leave it empty again after a
/// later write + flush.
#[test]
fn test_dwb_recovery_clears_in_place_and_keeps_file_reusable() {
    let path = temp_db_path();
    cleanup(&path);

    let config = PagerConfig {
        double_write: true,
        ..Default::default()
    };

    // Session 1: allocate a page so the id exists in the main file.
    let page_id;
    {
        let pager = Pager::open(&path, config.clone()).unwrap();
        let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        page_id = page.page_id();
        pager.sync().unwrap();
    }

    // Hand-craft a DWB sidecar holding a version of the page with one cell.
    let mut recovered_page = Page::new(PageType::BTreeLeaf, page_id);
    recovered_page.insert_cell(b"key", b"value").unwrap();
    write_dwb_fixture(&path, &[(page_id, recovered_page.clone())]);

    // Sanity check: the fixture exists and is non-empty before reopening.
    let dwb_path = dwb_path_for(&path);
    assert!(dwb_path.exists());
    assert!(fs::metadata(&dwb_path).unwrap().len() > 0);

    {
        // Session 2: reopening must surface the page stored in the DWB.
        let pager = Pager::open(&path, config).unwrap();

        let read_page = pager.read_page(page_id).unwrap();
        let (key, value) = read_page.read_cell(0).unwrap();
        assert_eq!(key, b"key");
        assert_eq!(value, b"value");

        // The sidecar is kept but has length zero after recovery.
        assert!(dwb_path.exists());
        assert_eq!(fs::metadata(&dwb_path).unwrap().len(), 0);

        // A further write + flush must also leave the sidecar present and empty.
        let mut updated_page = recovered_page.clone();
        updated_page.insert_cell(b"key2", b"value2").unwrap();
        pager.write_page(page_id, updated_page).unwrap();
        pager.flush().unwrap();

        assert!(dwb_path.exists());
        assert_eq!(fs::metadata(&dwb_path).unwrap().len(), 0);
    }

    cleanup(&path);
}
528
/// Table-driven check of `classify_cow_filesystem`: ZFS always classifies as
/// CoW, btrfs only with datacow mount options and clear inode flags, and any
/// missing btrfs input yields `None`.
#[test]
fn cow_probe_classification_fails_closed_for_btrfs_nodatacow() {
    // (filesystem magic, mount options, inode flags, expected result, rationale)
    let cases = [
        (
            ZFS_SUPER_MAGIC,
            None,
            None,
            Some(CowFilesystemKind::Zfs),
            "ZFS is always CoW",
        ),
        (
            BTRFS_SUPER_MAGIC,
            Some("rw,relatime"),
            Some(0),
            Some(CowFilesystemKind::BtrfsDataCow),
            "btrfs qualifies only when datacow remains enabled",
        ),
        (
            BTRFS_SUPER_MAGIC,
            Some("rw,nodatacow"),
            Some(0),
            None,
            "btrfs nodatacow mount option must reject DWB skip",
        ),
        (
            BTRFS_SUPER_MAGIC,
            Some("rw"),
            Some(FS_NOCOW_FL),
            None,
            "btrfs chattr +C / NOCOW inode flag must reject DWB skip",
        ),
        (
            BTRFS_SUPER_MAGIC,
            Some("rw"),
            None,
            None,
            "missing btrfs inode flags are uncertain and must fail closed",
        ),
        (
            BTRFS_SUPER_MAGIC,
            None,
            Some(0),
            None,
            "missing btrfs mount options are uncertain and must fail closed",
        ),
    ];

    for (magic, mount_options, inode_flags, expected, rationale) in cases {
        assert_eq!(
            classify_cow_filesystem(magic, mount_options, inode_flags),
            expected,
            "{}",
            rationale
        );
    }
}
562
/// For a path under nested mounts, the parser must return the options of the
/// deepest matching mount point, including a nested `nodatacow` btrfs mount.
#[cfg(target_os = "linux")]
#[test]
fn mountinfo_parser_uses_longest_cow_mount_and_rejects_nodatacow() {
    // Three mounts: the ext4 root, a btrfs mount, and a nodatacow btrfs mount nested inside it.
    let mountinfo = "\
24 18 0:21 / / rw,relatime - ext4 /dev/root rw\n\
35 24 0:42 /subvol /mnt/reddb rw,relatime - btrfs /dev/sdb rw,space_cache=v2\n\
36 35 0:43 /nocow /mnt/reddb/nocow rw,relatime - btrfs /dev/sdb rw,nodatacow\n\
";

    let datacow_options =
        parse_mountinfo_options_for_path(mountinfo, Path::new("/mnt/reddb/data.rdb"));
    assert_eq!(
        datacow_options.as_deref(),
        Some("rw,relatime,rw,space_cache=v2")
    );

    let nodatacow_options =
        parse_mountinfo_options_for_path(mountinfo, Path::new("/mnt/reddb/nocow/data.rdb"));
    assert_eq!(
        nodatacow_options.as_deref(),
        Some("rw,relatime,rw,nodatacow")
    );
}
583
/// With the CoW override forced to "deny", `double_write: false` must still
/// leave a DWB sidecar on disk after a write + flush.
#[test]
fn double_write_false_keeps_dwb_when_cow_probe_denies() {
    // Keep the guard alive for the whole test; dropping it resets the override.
    let _probe_denies = cow_atomic_write_override(false);
    let db_path = temp_db_path();
    cleanup(&db_path);

    {
        let no_dwb_requested = PagerConfig {
            double_write: false,
            ..PagerConfig::default()
        };
        let pager = Pager::open(&db_path, no_dwb_requested).unwrap();
        let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page_id = page.page_id();
        pager.write_page(page_id, page).unwrap();
        pager.flush().unwrap();
    }

    let sidecar_present = dwb_path_for(&db_path).exists();
    assert!(
        sidecar_present,
        "DWB must stay enabled when double_write=false is not proven safe"
    );

    cleanup(&db_path);
}
608
/// With the CoW override forced to "allow", `double_write: false` must leave
/// no DWB sidecar on disk after a write + flush.
#[test]
fn double_write_false_skips_dwb_when_cow_probe_allows() {
    // Keep the guard alive for the whole test; dropping it resets the override.
    let _probe_allows = cow_atomic_write_override(true);
    let db_path = temp_db_path();
    cleanup(&db_path);

    {
        let no_dwb_requested = PagerConfig {
            double_write: false,
            ..PagerConfig::default()
        };
        let pager = Pager::open(&db_path, no_dwb_requested).unwrap();
        let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let page_id = page.page_id();
        pager.write_page(page_id, page).unwrap();
        pager.flush().unwrap();
    }

    let sidecar_present = dwb_path_for(&db_path).exists();
    assert!(
        !sidecar_present,
        "DWB may be skipped only after the CoW probe allows it"
    );

    cleanup(&db_path);
}
633
/// When DWB skipping is allowed but a DWB sidecar already exists, opening the
/// database must first apply the sidecar's page and then remove the sidecar.
#[test]
fn double_write_false_on_cow_replays_then_removes_existing_dwb() {
    // Keep the guard alive for the whole test; dropping it resets the override.
    let _probe_allows = cow_atomic_write_override(true);
    let db_path = temp_db_path();
    cleanup(&db_path);

    // Session 1 (default config): allocate the page that the DWB will target.
    let page_id = {
        let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
        let page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        let allocated_id = page.page_id();
        pager.sync().unwrap();
        allocated_id
    };

    // Plant a sidecar holding a one-cell version of that page.
    let mut pending = Page::new(PageType::BTreeLeaf, page_id);
    pending.insert_cell(b"key", b"value").unwrap();
    write_dwb_fixture(&db_path, &[(page_id, pending)]);

    // Session 2 (double_write disabled): the sidecar's content must be visible.
    {
        let no_dwb_requested = PagerConfig {
            double_write: false,
            ..PagerConfig::default()
        };
        let pager = Pager::open(&db_path, no_dwb_requested).unwrap();
        let replayed = pager.read_page(page_id).unwrap();
        let cell = replayed.read_cell(0).unwrap();
        assert_eq!(cell.0, b"key");
        assert_eq!(cell.1, b"value");
    }

    let sidecar_present = dwb_path_for(&db_path).exists();
    assert!(
        !sidecar_present,
        "CoW DWB-skip must replay any existing DWB before removing the sidecar"
    );

    cleanup(&db_path);
}
671
/// Simulates a crash on a copy-on-write filesystem, where the on-disk page is
/// always one complete image (old or new), and checks that either image reads
/// back correctly with the DWB skipped.
#[test]
fn simulated_cow_mid_write_leaves_a_whole_consistent_page_without_dwb() {
    // Force the CoW override to "allow" for the duration of the test.
    let _override = cow_atomic_write_override(true);
    let path = temp_db_path();
    cleanup(&path);

    let config = PagerConfig {
        double_write: false,
        ..Default::default()
    };

    let page_id;
    let before;
    let after;
    {
        // Capture the page image with one cell (`before`) ...
        let pager = Pager::open(&path, config.clone()).unwrap();
        let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        page_id = page.page_id();
        page.insert_cell(b"phase", b"before").unwrap();
        pager.write_page(page_id, page).unwrap();
        pager.sync().unwrap();
        before = pager.read_page(page_id).unwrap();

        // ... and the image with two cells (`after`).
        let mut page = before.clone();
        page.insert_cell(b"phase2", b"after").unwrap();
        pager.write_page(page_id, page).unwrap();
        pager.flush().unwrap();
        after = pager.read_page(page_id).unwrap();
    }

    // For each whole image: write it straight into the file (the pager is
    // closed at this point), reopen, and verify the expected cells.
    for (whole_page, expected_cells) in [(&before, 1), (&after, 2)] {
        write_page_bytes(&path, page_id, whole_page);

        let pager = Pager::open(&path, config.clone()).unwrap();
        let recovered = pager.read_page(page_id).unwrap();
        assert_eq!(recovered.cell_count(), expected_cells);
        let (key, value) = recovered.read_cell(0).unwrap();
        assert_eq!(key, b"phase");
        assert_eq!(value, b"before");
        if expected_cells == 2 {
            let (key, value) = recovered.read_cell(1).unwrap();
            assert_eq!(key, b"phase2");
            assert_eq!(value, b"after");
        }
        // Close the pager before the next iteration rewrites the file.
        drop(pager);
    }

    cleanup(&path);
}
723
/// Counterpart to the CoW simulation: with the CoW override forced to "deny",
/// a torn page in the main file must be repaired from the DWB sidecar on open.
#[test]
fn same_mid_write_without_cow_recovers_from_dwb() {
    // Force the CoW override to "deny" for the duration of the test.
    let _override = cow_atomic_write_override(false);
    let path = temp_db_path();
    cleanup(&path);

    let config = PagerConfig {
        double_write: false,
        ..Default::default()
    };

    let page_id;
    let before;
    let after;
    {
        // Capture the page image with one cell (`before`) ...
        let pager = Pager::open(&path, config.clone()).unwrap();
        let mut page = pager.allocate_page(PageType::BTreeLeaf).unwrap();
        page_id = page.page_id();
        page.insert_cell(b"phase", b"before").unwrap();
        pager.write_page(page_id, page).unwrap();
        pager.sync().unwrap();
        before = pager.read_page(page_id).unwrap();

        // ... and the image with two cells (`after`).
        let mut page = before.clone();
        page.insert_cell(b"phase2", b"after").unwrap();
        pager.write_page(page_id, page).unwrap();
        pager.flush().unwrap();
        after = pager.read_page(page_id).unwrap();
    }

    // Crash simulation: the DWB holds the full new image while the main file
    // holds a page whose first half is new and second half is old.
    write_dwb_fixture(&path, &[(page_id, after.clone())]);
    write_torn_page_bytes(&path, page_id, &before, &after);

    {
        // Reopening must yield the complete two-cell page.
        let pager = Pager::open(&path, config).unwrap();
        let recovered = pager.read_page(page_id).unwrap();
        assert_eq!(recovered.cell_count(), 2);

        let (key, value) = recovered.read_cell(0).unwrap();
        assert_eq!(key, b"phase");
        assert_eq!(value, b"before");

        let (key, value) = recovered.read_cell(1).unwrap();
        assert_eq!(key, b"phase2");
        assert_eq!(value, b"after");
    }

    // The sidecar must still exist (metadata lookup succeeds) and be empty.
    assert_eq!(fs::metadata(dwb_path_for(&path)).unwrap().len(), 0);
    cleanup(&path);
}
774
/// A newly opened pager has no WAL writer attached.
#[test]
fn pager_starts_without_wal_writer() {
    let db_path = temp_db_path();

    {
        let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
        let has_wal = pager.has_wal_writer();
        assert!(!has_wal);
    }

    cleanup(&db_path);
}
787
/// `set_wal_writer` attaches a WAL handle and `clear_wal_writer` detaches it,
/// as reported by `has_wal_writer`.
#[test]
fn set_wal_writer_attaches_handle() {
    use crate::storage::wal::writer::WalWriter;
    use std::sync::{Arc, Mutex};

    let db_path = temp_db_path();
    let wal_path = reddb_file::layout::pager_legacy_wal_path(&db_path);
    // Start without a leftover WAL file; a missing file is fine.
    let _ = fs::remove_file(&wal_path);

    let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
    let wal_handle = {
        let writer = WalWriter::open(&wal_path).unwrap();
        Arc::new(Mutex::new(writer))
    };

    pager.set_wal_writer(Arc::clone(&wal_handle));
    let attached = pager.has_wal_writer();
    assert!(attached);

    pager.clear_wal_writer();
    let still_attached = pager.has_wal_writer();
    assert!(!still_attached);

    // Close the pager before removing its files.
    drop(pager);
    let _ = fs::remove_file(&wal_path);
    cleanup(&db_path);
}
809
/// Flushing a page that was never stamped with an LSN must leave the WAL's
/// durable LSN unchanged.
#[test]
fn flush_with_lsn_zero_pages_skips_wal_call() {
    use crate::storage::wal::writer::WalWriter;
    use std::sync::{Arc, Mutex};

    let db_path = temp_db_path();
    let wal_path = reddb_file::layout::pager_legacy_wal_path(&db_path);
    // Start without a leftover WAL file; a missing file is fine.
    let _ = fs::remove_file(&wal_path);

    let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
    let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
    // Snapshot the durable LSN before the pager sees the WAL.
    let durable_before = wal.lock().unwrap().durable_lsn();
    pager.set_wal_writer(Arc::clone(&wal));

    // Write a page without calling `set_lsn`, then flush.
    let mut leaf = pager.allocate_page(PageType::BTreeLeaf).unwrap();
    leaf.insert_cell(b"k", b"v").unwrap();
    let leaf_id = leaf.page_id();
    pager.write_page(leaf_id, leaf).unwrap();
    pager.flush().unwrap();

    let durable_after = wal.lock().unwrap().durable_lsn();
    assert_eq!(durable_after, durable_before);

    // Close the pager before removing its files.
    drop(pager);
    let _ = fs::remove_file(&wal_path);
    cleanup(&db_path);
}
851
/// Flushing a page stamped with an LSN must bring the WAL's durable LSN up to
/// at least that LSN.
#[test]
fn flush_advances_wal_durable_when_pages_carry_lsn() {
    use crate::storage::wal::record::WalRecord;
    use crate::storage::wal::writer::WalWriter;
    use std::sync::{Arc, Mutex};

    let db_path = temp_db_path();
    let wal_path = reddb_file::layout::pager_legacy_wal_path(&db_path);
    // Start without a leftover WAL file; a missing file is fine.
    let _ = fs::remove_file(&wal_path);

    let pager = Pager::open(&db_path, PagerConfig::default()).unwrap();
    let wal = Arc::new(Mutex::new(WalWriter::open(&wal_path).unwrap()));
    pager.set_wal_writer(Arc::clone(&wal));

    // Append a begin/commit pair and remember the LSN the WAL reports afterwards.
    let stamped_lsn = {
        let mut log = wal.lock().unwrap();
        for record in [
            WalRecord::Begin { tx_id: 1 },
            WalRecord::Commit { tx_id: 1 },
        ] {
            log.append(&record).unwrap();
        }
        log.current_lsn()
    };

    // Stamp a page with that LSN, write it, and flush.
    let mut leaf = pager.allocate_page(PageType::BTreeLeaf).unwrap();
    leaf.insert_cell(b"k", b"v").unwrap();
    leaf.set_lsn(stamped_lsn);
    let leaf_id = leaf.page_id();
    pager.write_page(leaf_id, leaf).unwrap();
    pager.flush().unwrap();

    let durable_after = wal.lock().unwrap().durable_lsn();
    assert!(
        durable_after >= stamped_lsn,
        "after flush durable_lsn {} must be >= stamped {}",
        durable_after,
        stamped_lsn
    );

    // Close the pager before removing its files.
    drop(pager);
    let _ = fs::remove_file(&wal_path);
    cleanup(&db_path);
}
899
/// These block sizes must all be reported as misaligned with `PAGE_SIZE`.
#[test]
fn block_size_warn_fires_for_mismatched_block_size() {
    for block_size in [6000, 1_048_576, 6 * 1024] {
        assert!(Pager::page_size_misaligned_with_block(PAGE_SIZE, block_size));
    }
}
915
/// These block sizes must not be reported as misaligned with `PAGE_SIZE`.
#[test]
fn block_size_silent_for_divisor() {
    for block_size in [4096, 16384, 512, 8192] {
        assert!(!Pager::page_size_misaligned_with_block(PAGE_SIZE, block_size));
    }
}
924
/// A block size of `0` must not be reported as misaligned.
#[test]
fn block_size_unavailable_is_silent() {
    let misaligned = Pager::page_size_misaligned_with_block(PAGE_SIZE, 0);
    assert!(!misaligned);
}
930
/// Pins `PAGE_SIZE` at 16 KiB so an accidental change fails loudly.
#[test]
fn page_size_is_unchanged_16kib() {
    let sixteen_kib = 16 * 1024;
    assert_eq!(PAGE_SIZE, sixteen_kib);
}
936}