1use super::*;
2
/// Filesystem magic number that `classify_cow_filesystem` treats as btrfs.
pub(super) const BTRFS_SUPER_MAGIC: i64 = 0x9123_683e;
/// Filesystem magic number that `classify_cow_filesystem` treats as ZFS.
pub(super) const ZFS_SUPER_MAGIC: i64 = 0x2fc1_2fc1;
/// Inode flag bit; when it is set in the flags handed to `classify_cow_filesystem`, a btrfs
/// file is not classified as copy-on-write.
pub(super) const FS_NOCOW_FL: u64 = 0x0080_0000;
6
/// Why a data file is considered to live on a copy-on-write filesystem.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CowFilesystemKind {
    /// The filesystem magic matched `ZFS_SUPER_MAGIC`.
    Zfs,
    /// btrfs with no `nodatacow` mount option and without `FS_NOCOW_FL` on the inode.
    BtrfsDataCow,
    /// Forced by the test-only override read in `Pager::cow_filesystem_has_atomic_page_writes`.
    TestOverride,
}
13
14pub(super) fn classify_cow_filesystem(
15 fs_type: i64,
16 mount_options: Option<&str>,
17 inode_flags: Option<u64>,
18) -> Option<CowFilesystemKind> {
19 match fs_type {
20 ZFS_SUPER_MAGIC => Some(CowFilesystemKind::Zfs),
21 BTRFS_SUPER_MAGIC => {
22 let mount_options = mount_options?;
23 if mount_options.split(',').any(|option| option == "nodatacow") {
24 return None;
25 }
26
27 let inode_flags = inode_flags?;
28 if inode_flags & FS_NOCOW_FL != 0 {
29 return None;
30 }
31
32 Some(CowFilesystemKind::BtrfsDataCow)
33 }
34 _ => None,
35 }
36}
37
38#[cfg(target_os = "linux")]
39fn linux_fstatfs_type(file: &File) -> Option<i64> {
40 use std::mem::MaybeUninit;
41 use std::os::fd::AsRawFd;
42
43 let mut stat = MaybeUninit::<libc::statfs>::uninit();
44 let rc = unsafe { libc::fstatfs(file.as_raw_fd(), stat.as_mut_ptr()) };
45 if rc != 0 {
46 return None;
47 }
48 let stat = unsafe { stat.assume_init() };
49 #[allow(clippy::unnecessary_cast)]
52 Some(stat.f_type as i64)
53}
54
55#[cfg(target_os = "linux")]
56fn linux_inode_flags(file: &File) -> Option<u64> {
57 use std::os::fd::AsRawFd;
58
59 let mut flags: libc::c_long = 0;
60 let rc = unsafe { libc::ioctl(file.as_raw_fd(), libc::FS_IOC_GETFLAGS, &mut flags) };
61 if rc != 0 {
62 return None;
63 }
64 Some(flags as u64)
65}
66
/// Looks up the mount options that apply to `path` in `/proc/self/mountinfo`.
///
/// The path is canonicalized first. Returns `None` when canonicalization or reading the
/// mountinfo file fails, or when `parse_mountinfo_options_for_path` finds no matching entry.
#[cfg(target_os = "linux")]
fn linux_mount_options_for_path(path: &Path) -> Option<String> {
    let path = path.canonicalize().ok()?;
    let mountinfo = std::fs::read_to_string("/proc/self/mountinfo").ok()?;
    parse_mountinfo_options_for_path(&mountinfo, &path)
}
73
74#[cfg(target_os = "linux")]
75pub(super) fn parse_mountinfo_options_for_path(mountinfo: &str, path: &Path) -> Option<String> {
76 let mut best: Option<(usize, String)> = None;
77
78 for line in mountinfo.lines() {
79 let fields: Vec<&str> = line.split(' ').collect();
80 if fields.len() < 10 {
81 continue;
82 }
83
84 let Some(separator) = fields.iter().position(|field| *field == "-") else {
85 continue;
86 };
87 if separator + 3 >= fields.len() || separator < 6 {
88 continue;
89 }
90
91 let mount_point = mountinfo_unescape_path(fields[4]);
92 if !path.starts_with(&mount_point) {
93 continue;
94 }
95
96 let fs_type = fields[separator + 1];
97 if fs_type != "btrfs" && fs_type != "zfs" {
98 continue;
99 }
100
101 let mount_options = fields[5];
102 let super_options = fields[separator + 3];
103 let options = format!("{mount_options},{super_options}");
104 let depth = mount_point.components().count();
105 if best
106 .as_ref()
107 .map(|(best_depth, _)| depth > *best_depth)
108 .unwrap_or(true)
109 {
110 best = Some((depth, options));
111 }
112 }
113
114 best.map(|(_, options)| options)
115}
116
/// Decodes a path field from `/proc/<pid>/mountinfo`.
///
/// A backslash followed by exactly three octal digits whose value fits in one byte
/// (for example `\040` for a space) is replaced by that byte. Any other backslash is
/// copied through unchanged.
///
/// The work is done on raw bytes, so input in which a backslash is followed by multi-byte
/// characters cannot cause a slicing panic, a sign character is never accepted as part of
/// an escape, and decoded bytes that are not valid UTF-8 are preserved in the returned
/// path instead of being replaced.
#[cfg(target_os = "linux")]
fn mountinfo_unescape_path(value: &str) -> PathBuf {
    use std::os::unix::ffi::OsStringExt;

    let bytes = value.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        if bytes[i] == b'\\' {
            // `get` returns `None` when fewer than three bytes follow the backslash.
            if let Some(&[a, b, c]) = bytes.get(i + 1..i + 4) {
                let is_octal = |digit: u8| (b'0'..=b'7').contains(&digit);
                if is_octal(a) && is_octal(b) && is_octal(c) {
                    let decoded = u32::from(a - b'0') * 64
                        + u32::from(b - b'0') * 8
                        + u32::from(c - b'0');
                    // Values above 0o377 do not fit in a byte; keep them literal.
                    if let Ok(byte) = u8::try_from(decoded) {
                        out.push(byte);
                        i += 4;
                        continue;
                    }
                }
            }
        }
        out.push(bytes[i]);
        i += 1;
    }

    PathBuf::from(std::ffi::OsString::from_vec(out))
}
138
139impl Pager {
    /// Opens (or creates) the database file at `path` and returns a ready-to-use pager.
    ///
    /// Steps, in order:
    /// 1. Reject a missing file unless `config.create` is set, and reject creating a
    ///    database in read-only mode.
    /// 2. Open the data file; on Unix, log a warning when `PAGE_SIZE` is not a multiple of
    ///    the filesystem block size (diagnostic only).
    /// 3. Take a lock through a second handle: exclusive for writers, shared and
    ///    best-effort for readers.
    /// 4. Decide whether the double-write buffer (DWB) is used. A request to disable it is
    ///    honoured only when the file is classified as being on a copy-on-write filesystem.
    /// 5. For an existing file, replay the DWB, load the header and bind encryption; for a
    ///    new file, write the initial pages and bind encryption.
    ///
    /// # Errors
    ///
    /// * `PagerError::InvalidDatabase` when the file is missing and may not be created.
    /// * `PagerError::Locked` when the exclusive lock cannot be taken in writable mode.
    /// * Any error from opening files, DWB recovery, header loading, initialization or
    ///   encryption binding.
    pub fn open<P: AsRef<Path>>(path: P, mut config: PagerConfig) -> Result<Self, PagerError> {
        let path = path.as_ref().to_path_buf();
        let exists = path.exists();

        if !exists && !config.create {
            return Err(PagerError::InvalidDatabase(
                "Database does not exist".into(),
            ));
        }

        if !exists && config.read_only {
            return Err(PagerError::InvalidDatabase(
                "Cannot create read-only database".into(),
            ));
        }

        // Readers never request write access or file creation.
        let file = OpenOptions::new()
            .read(true)
            .write(!config.read_only)
            .create(config.create && !config.read_only)
            .open(&path)?;

        #[cfg(unix)]
        {
            use std::os::unix::fs::MetadataExt;
            // Metadata failures are ignored: this check only produces a log line.
            if let Ok(meta) = file.metadata() {
                let fs_block_size = meta.blksize();
                if Self::page_size_misaligned_with_block(PAGE_SIZE, fs_block_size) {
                    tracing::warn!(
                        page_size = PAGE_SIZE,
                        fs_block_size,
                        path = %path.display(),
                        "database page size is not a multiple of the filesystem \
                         block size; page writes will straddle FS blocks \
                         (read-modify-write amplification). Diagnostic only — \
                         the page size is unchanged."
                    );
                }
            }
        }

        // The lock lives on its own handle, which is stored in the pager.
        let lock_file = if !config.read_only {
            let lf = OpenOptions::new().read(true).write(true).open(&path)?;
            lf.try_lock_exclusive().map_err(|_| PagerError::Locked)?;
            Some(lf)
        } else {
            let lf = OpenOptions::new().read(true).open(&path)?;
            // A reader that cannot take the shared lock proceeds without one.
            match lf.try_lock_shared() {
                Ok(_) => Some(lf),
                Err(_) => None,
            }
        };

        let fold_dwb = crate::physical::fold_dwb_into_wal_enabled();
        // `double_write = false` is only accepted when the CoW probe succeeds;
        // otherwise the DWB is switched back on.
        if !config.double_write && !config.read_only && !fold_dwb {
            let skip_dwb_on_cow =
                Self::cow_filesystem_has_atomic_page_writes(&path, &file).is_some();
            if !skip_dwb_on_cow {
                tracing::warn!(
                    path = %path.display(),
                    "double_write=false requested, but the data file is not proven to be on \
                     ZFS or btrfs datacow; keeping the double-write buffer enabled"
                );
                config.double_write = true;
            }
        }

        let dwb_file = if config.double_write && !config.read_only && !fold_dwb {
            let f = Self::open_dwb_file(&path)?;
            Some(Mutex::new(f))
        } else {
            // With the DWB folded into the WAL, a leftover DWB file is removed
            // (best effort; errors are ignored).
            if fold_dwb && !config.read_only {
                let _ = std::fs::remove_file(Self::dwb_path(&path));
            }
            None
        };

        let mut pager = Self {
            path,
            file: Mutex::new(file),
            _lock_file: lock_file,
            dwb_file,
            cache: PageCache::new(config.cache_size),
            freelist: RwLock::new(FreeList::new()),
            header: RwLock::new(DatabaseHeader::default()),
            config,
            header_dirty: Mutex::new(false),
            wal: RwLock::new(None),
            encryption: None,
        };

        if exists {
            // Replay the DWB before anything reads the data file.
            // NOTE(review): in read-only mode this still opens an existing DWB file for
            // writing and may fail with `ReadOnly` when it holds entries - confirm intended.
            pager.recover_from_dwb()?;
            // Once recovery has run, a writer that does not use the DWB drops the file.
            if !pager.config.double_write && !pager.config.read_only {
                let _ = std::fs::remove_file(Self::dwb_path(&pager.path));
            }
            pager.load_header()?;
            pager.bind_encryption_for_existing()?;
        } else {
            pager.initialize()?;
            pager.bind_encryption_for_new()?;
        }

        Ok(pager)
    }
268
269 pub(crate) fn page_size_misaligned_with_block(page_size: usize, fs_block_size: u64) -> bool {
276 fs_block_size != 0 && !(page_size as u64).is_multiple_of(fs_block_size)
277 }
278
    /// Test builds only: reads the forced CoW probe result from
    /// `COW_ATOMIC_WRITE_TEST_OVERRIDE`.
    ///
    /// `1` forces "copy-on-write", `2` forces "not copy-on-write", and any other value
    /// means no override is active.
    #[cfg(test)]
    fn cow_filesystem_test_override() -> Option<bool> {
        match COW_ATOMIC_WRITE_TEST_OVERRIDE.load(Ordering::Relaxed) {
            1 => Some(true),
            2 => Some(false),
            _ => None,
        }
    }
287
    /// Non-test builds: there is never an override, so the real probe always runs.
    #[cfg(not(test))]
    fn cow_filesystem_test_override() -> Option<bool> {
        None
    }
292
293 fn cow_filesystem_has_atomic_page_writes(
294 path: &Path,
295 file: &File,
296 ) -> Option<CowFilesystemKind> {
297 if let Some(allowed) = Self::cow_filesystem_test_override() {
298 return allowed.then_some(CowFilesystemKind::TestOverride);
299 }
300 Self::probe_cow_filesystem(path, file)
301 }
302
303 #[cfg(target_os = "linux")]
304 fn probe_cow_filesystem(path: &Path, file: &File) -> Option<CowFilesystemKind> {
305 let fs_type = linux_fstatfs_type(file)?;
306 match fs_type {
307 ZFS_SUPER_MAGIC => Some(CowFilesystemKind::Zfs),
308 BTRFS_SUPER_MAGIC => {
309 let mount_options = linux_mount_options_for_path(path)?;
310 let inode_flags = linux_inode_flags(file)?;
311 classify_cow_filesystem(fs_type, Some(&mount_options), Some(inode_flags))
312 }
313 _ => None,
314 }
315 }
316
    /// Non-Linux fallback: no probe is implemented, so no filesystem is classified as
    /// copy-on-write.
    #[cfg(not(target_os = "linux"))]
    fn probe_cow_filesystem(_path: &Path, _file: &File) -> Option<CowFilesystemKind> {
        None
    }
321
322 fn bind_encryption_for_existing(&mut self) -> Result<(), PagerError> {
332 if self.page_count().unwrap_or(0) == 0 {
333 return self.bind_encryption_for_new();
334 }
335 let header_page = self.read_page_no_checksum(0)?;
336 let data = header_page.as_bytes();
337 let has_marker = reddb_file::paged_encryption_marker_present(data);
338
339 let key = self.config.encryption.clone();
340 match (has_marker, key) {
341 (true, Some(key)) => {
342 let header_bytes =
343 reddb_file::paged_encryption_header_bytes(data).ok_or_else(|| {
344 PagerError::InvalidDatabase("encryption header parse failed".to_string())
345 })?;
346 let header = crate::storage::encryption::EncryptionHeader::from_bytes(header_bytes)
347 .map_err(|e| {
348 PagerError::InvalidDatabase(format!("encryption header parse failed: {e}"))
349 })?;
350 if !header.validate(&key) {
351 return Err(PagerError::InvalidKey);
352 }
353 let encryptor = crate::storage::encryption::PageEncryptor::new(key);
354 self.encryption = Some((encryptor, header));
355 Ok(())
356 }
357 (true, None) => Err(PagerError::EncryptionRequired),
358 (false, Some(_)) => Err(PagerError::PlainDatabaseRefusesKey),
359 (false, None) => Ok(()),
360 }
361 }
362
363 fn bind_encryption_for_new(&mut self) -> Result<(), PagerError> {
366 let Some(key) = self.config.encryption.clone() else {
367 return Ok(());
368 };
369 let header = crate::storage::encryption::EncryptionHeader::new(&key);
370 let encryptor = crate::storage::encryption::PageEncryptor::new(key);
371
372 if self.page_count().unwrap_or(0) > 0 {
375 let mut page = self.read_page_no_checksum(0)?;
376 let data = page.as_bytes_mut();
377 let header_bytes = header.to_bytes();
378 reddb_file::write_paged_encryption_marker_and_header(data, &header_bytes)
379 .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
380 self.write_page_no_checksum(0, page)?;
381 }
382 self.encryption = Some((encryptor, header));
383 Ok(())
384 }
385
    /// Opens the database at `path` with `PagerConfig::default()`.
    ///
    /// See `Pager::open` for the behaviour and the possible errors.
    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, PagerError> {
        Self::open(path, PagerConfig::default())
    }
390
391 fn initialize(&self) -> Result<(), PagerError> {
393 if self.config.read_only {
394 return Err(PagerError::ReadOnly);
395 }
396
397 let initial_page_count = 3;
401 let header_page = Page::new_header_page(initial_page_count);
402 self.header_write()?.page_count = initial_page_count;
403
404 self.write_page_raw(0, &header_page)?;
407 let mut metadata_page = Page::new(PageType::Header, 1);
408 metadata_page.update_checksum();
409 self.write_page_raw(1, &metadata_page)?;
410 let mut vault_page = Page::new(PageType::Vault, 2);
411 vault_page.update_checksum();
412 self.write_page_raw(2, &vault_page)?;
413
414 self.sync()?;
416
417 Ok(())
418 }
419
    /// Takes the write lock on the in-memory header, mapping a poisoned lock to
    /// `PagerError::LockPoisoned`.
    fn header_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, DatabaseHeader>, PagerError> {
        self.header.write().map_err(|_| PagerError::LockPoisoned)
    }
424
    /// Takes the read lock on the in-memory header, mapping a poisoned lock to
    /// `PagerError::LockPoisoned`.
    fn header_read(&self) -> Result<std::sync::RwLockReadGuard<'_, DatabaseHeader>, PagerError> {
        self.header.read().map_err(|_| PagerError::LockPoisoned)
    }
429
    /// Takes the write lock on the free list, mapping a poisoned lock to
    /// `PagerError::LockPoisoned`.
    fn freelist_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, FreeList>, PagerError> {
        self.freelist.write().map_err(|_| PagerError::LockPoisoned)
    }
434
    /// Locks the data-file handle, mapping a poisoned mutex to `PagerError::LockPoisoned`.
    fn file_lock(&self) -> Result<std::sync::MutexGuard<'_, File>, PagerError> {
        self.file.lock().map_err(|_| PagerError::LockPoisoned)
    }
439
    /// Locks the "header needs writing" flag, mapping a poisoned mutex to
    /// `PagerError::LockPoisoned`.
    fn header_dirty_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>, PagerError> {
        self.header_dirty
            .lock()
            .map_err(|_| PagerError::LockPoisoned)
    }
446
447 fn load_header(&self) -> Result<(), PagerError> {
449 let header_page = match self.read_page_raw(0) {
451 Ok(page) => {
452 if reddb_file::database_header_magic_matches(page.as_bytes()) {
454 page
455 } else {
456 self.recover_header_from_shadow()?
458 }
459 }
460 Err(_) => self.recover_header_from_shadow()?,
461 };
462
463 let decoded_header = reddb_file::decode_database_header(header_page.as_bytes())
464 .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
465 let freelist_head = decoded_header.freelist_head;
466
467 {
468 let mut header = self.header_write()?;
469 *header = decoded_header;
470 }
471
472 {
474 let mut freelist = self.freelist_write()?;
475 *freelist = FreeList::from_header(freelist_head, 0);
476 }
477
478 Ok(())
479 }
480
    /// Encodes the in-memory header into page 0 and writes it to disk.
    ///
    /// The page to encode into is the cached copy of page 0 when there is one, otherwise
    /// the on-disk page when the file holds at least one page, otherwise a new header
    /// page. The header shadow is written before page 0 itself, and the header-dirty flag
    /// is cleared afterwards.
    ///
    /// Returns `PagerError::ReadOnly` for a read-only pager. This method does not sync the
    /// data file.
    fn write_header(&self) -> Result<(), PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        // The read guard is held for the rest of the function.
        let header = self.header_read()?;

        let mut page = if let Some(cached) = self.cache.get(0) {
            cached
        } else {
            let file = self.file_lock()?;
            let len = file.metadata().map(|m| m.len()).unwrap_or(0);
            // Release the file lock first: `read_page_raw` takes it again.
            drop(file);

            if len >= PAGE_SIZE as u64 {
                self.read_page_raw(0)?
            } else {
                Page::new(PageType::Header, 0)
            }
        };

        let data = page.as_bytes_mut();

        reddb_file::encode_database_header(data, &header)
            .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;

        page.update_checksum();

        // Shadow first, then the real page.
        self.write_header_shadow(&page)?;

        self.write_page_raw(0, &page)?;
        *self.header_dirty_lock()? = false;

        Ok(())
    }
525
526 fn read_page_raw(&self, page_id: u32) -> Result<Page, PagerError> {
528 let mut file = self.file_lock()?;
529 let offset = (page_id as u64) * (PAGE_SIZE as u64);
530
531 file.seek(SeekFrom::Start(offset))?;
532
533 let mut buf = [0u8; PAGE_SIZE];
534 file.read_exact(&mut buf)?;
535
536 let page = Page::from_bytes(buf);
537
538 if self.config.verify_checksums {
540 page.verify_checksum()?;
541 }
542
543 Ok(page)
544 }
545
546 fn write_page_raw(&self, page_id: u32, page: &Page) -> Result<(), PagerError> {
548 if self.config.read_only {
549 return Err(PagerError::ReadOnly);
550 }
551
552 let mut file = self.file_lock()?;
553 let offset = (page_id as u64) * (PAGE_SIZE as u64);
554
555 file.seek(SeekFrom::Start(offset))?;
556 file.write_all(page.as_bytes())?;
557
558 Ok(())
559 }
560
561 pub fn read_page(&self, page_id: u32) -> Result<Page, PagerError> {
563 if let Some(page) = self.cache.get(page_id) {
565 return Ok(page);
566 }
567
568 let page = self.read_page_raw(page_id)?;
570
571 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
573 let evicted_id = dirty_page.page_id();
575 self.write_page_raw(evicted_id, &dirty_page)?;
576 }
577
578 Ok(page)
579 }
580
581 pub fn read_page_no_checksum(&self, page_id: u32) -> Result<Page, PagerError> {
586 if let Some(page) = self.cache.get(page_id) {
588 return Ok(page);
589 }
590
591 let mut file = self.file_lock()?;
593 let offset = (page_id as u64) * (PAGE_SIZE as u64);
594
595 file.seek(SeekFrom::Start(offset))?;
596
597 let mut buf = [0u8; PAGE_SIZE];
598 file.read_exact(&mut buf)?;
599 drop(file);
600
601 let page = Page::from_bytes(buf);
602
603 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
605 let evicted_id = dirty_page.page_id();
607 self.write_page_raw(evicted_id, &dirty_page)?;
608 }
609
610 Ok(page)
611 }
612
613 pub fn write_page(&self, page_id: u32, mut page: Page) -> Result<(), PagerError> {
615 if self.config.read_only {
616 return Err(PagerError::ReadOnly);
617 }
618
619 page.update_checksum();
621
622 if let Some(dirty_page) = self.cache.insert(page_id, page) {
627 let evicted_id = dirty_page.page_id();
628 self.write_page_raw(evicted_id, &dirty_page)?;
629 }
630 self.cache.mark_dirty(page_id);
631
632 Ok(())
633 }
634
635 pub fn read_page_decrypted(&self, page_id: u32) -> Result<Page, PagerError> {
640 if page_id == 0 || self.encryption.is_none() {
641 return self.read_page(page_id);
642 }
643 let raw = self.read_page_no_checksum(page_id)?;
644 let (enc, _) = self
645 .encryption
646 .as_ref()
647 .expect("encryption presence checked above");
648 let plaintext = enc
649 .decrypt(page_id, raw.as_bytes())
650 .map_err(|e| PagerError::InvalidDatabase(format!("decrypt page {page_id}: {e}")))?;
651 let mut buf = [0u8; PAGE_SIZE];
652 let n = plaintext.len().min(PAGE_SIZE);
653 buf[..n].copy_from_slice(&plaintext[..n]);
654 Ok(Page::from_bytes(buf))
655 }
656
657 pub fn write_page_encrypted(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
662 if page_id == 0 || self.encryption.is_none() {
663 return self.write_page(page_id, page);
664 }
665 const OVERHEAD: usize = 12 + 16; let plaintext_len = PAGE_SIZE - OVERHEAD;
667 let plaintext = &page.as_bytes()[..plaintext_len];
668 let (enc, _) = self
669 .encryption
670 .as_ref()
671 .expect("encryption presence checked above");
672 let ciphertext = enc.encrypt(page_id, plaintext);
673 debug_assert_eq!(ciphertext.len(), PAGE_SIZE);
674 let mut buf = [0u8; PAGE_SIZE];
675 buf.copy_from_slice(&ciphertext);
676 let cipher_page = Page::from_bytes(buf);
677 self.write_page_no_checksum(page_id, cipher_page)
678 }
679
680 pub fn write_page_no_checksum(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
685 if self.config.read_only {
686 return Err(PagerError::ReadOnly);
687 }
688
689 if let Some(dirty_page) = self.cache.insert(page_id, page) {
692 let evicted_id = dirty_page.page_id();
693 self.write_page_raw(evicted_id, &dirty_page)?;
694 }
695 self.cache.mark_dirty(page_id);
696
697 Ok(())
698 }
699
700 pub fn allocate_page(&self, page_type: PageType) -> Result<Page, PagerError> {
702 if self.config.read_only {
703 return Err(PagerError::ReadOnly);
704 }
705
706 let page_id = {
708 let mut freelist = self.freelist_write()?;
709 if let Some(id) = freelist.allocate() {
710 id
711 } else if freelist.trunk_head() != 0 {
712 let trunk_id = freelist.trunk_head();
713 drop(freelist);
714
715 let trunk = self.read_page(trunk_id).map_err(|e| match e {
716 PagerError::PageNotFound(_) => {
717 PagerError::InvalidDatabase("Freelist trunk missing".to_string())
718 }
719 other => other,
720 })?;
721
722 let mut freelist = self.freelist_write()?;
723 freelist
724 .load_from_trunk(&trunk)
725 .map_err(|e| PagerError::InvalidDatabase(format!("Freelist: {}", e)))?;
726 let id = freelist.allocate().ok_or_else(|| {
727 PagerError::InvalidDatabase("Freelist empty after trunk load".to_string())
728 })?;
729
730 let mut header = self.header_write()?;
731 header.freelist_head = freelist.trunk_head();
732 *self.header_dirty_lock()? = true;
733
734 id
735 } else {
736 let mut header = self.header_write()?;
738 let id = header.page_count;
739 header.page_count += 1;
740 *self.header_dirty_lock()? = true;
741 id
742 }
743 };
744
745 let page = Page::new(page_type, page_id);
746
747 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
759 let evicted_id = dirty_page.page_id();
760 self.write_page_raw(evicted_id, &dirty_page)?;
761 }
762 self.cache.mark_dirty(page_id);
763
764 Ok(page)
765 }
766
767 pub fn reserve_contig_extent(&self, n_pages: u32) -> Result<super::ExtentId, PagerError> {
769 if self.config.read_only {
770 return Err(PagerError::ReadOnly);
771 }
772 if n_pages == 0 {
773 return Err(PagerError::InvalidDatabase(
774 "contiguous extent must reserve at least one page".to_string(),
775 ));
776 }
777
778 let start_page = {
779 let mut header = self.header_write()?;
780 let start = header.page_count;
781 header.page_count = header.page_count.checked_add(n_pages).ok_or_else(|| {
782 PagerError::InvalidDatabase("contiguous extent page count overflow".to_string())
783 })?;
784 *self.header_dirty_lock()? = true;
785 start
786 };
787
788 for page_id in start_page..start_page + n_pages {
789 let mut page = Page::new(PageType::Vector, page_id);
790 page.update_checksum();
791 if let Some(dirty_page) = self.cache.insert(page_id, page) {
792 let evicted_id = dirty_page.page_id();
793 self.write_page_raw(evicted_id, &dirty_page)?;
794 }
795 self.cache.mark_dirty(page_id);
796 }
797
798 Ok(super::ExtentId {
799 start_page,
800 n_pages,
801 })
802 }
803
804 pub fn free_page(&self, page_id: u32) -> Result<(), PagerError> {
806 if self.config.read_only {
807 return Err(PagerError::ReadOnly);
808 }
809
810 self.cache.remove(page_id);
812
813 let mut freelist = self.freelist_write()?;
815 freelist.free(page_id);
816
817 *self.header_dirty_lock()? = true;
818
819 Ok(())
820 }
821
    /// Returns a clone of the in-memory database header.
    pub fn header(&self) -> Result<DatabaseHeader, PagerError> {
        Ok(self.header_read()?.clone())
    }
826
    /// Returns the `physical` section of the in-memory database header.
    pub fn physical_header(&self) -> Result<PhysicalFileHeader, PagerError> {
        Ok(self.header_read()?.physical)
    }
830
831 pub fn update_physical_header(&self, physical: PhysicalFileHeader) -> Result<(), PagerError> {
832 if self.config.read_only {
833 return Err(PagerError::ReadOnly);
834 }
835
836 let mut header = self.header_write()?;
837 header.physical = physical;
838 *self.header_dirty_lock()? = true;
839 Ok(())
840 }
841
    /// Returns the page count recorded in the in-memory header.
    pub fn page_count(&self) -> Result<u32, PagerError> {
        Ok(self.header_read()?.page_count)
    }
846
    /// Installs the WAL writer that `flush` consults before writing dirty pages.
    ///
    /// A poisoned lock is recovered rather than treated as an error.
    pub fn set_wal_writer(&self, wal: Arc<Mutex<crate::storage::wal::writer::WalWriter>>) {
        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
        *slot = Some(wal);
    }
862
    /// Removes the installed WAL writer, if any.
    ///
    /// A poisoned lock is recovered rather than treated as an error.
    pub fn clear_wal_writer(&self) {
        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
        *slot = None;
    }
868
869 pub fn has_wal_writer(&self) -> bool {
871 self.wal.read().map(|s| s.is_some()).unwrap_or(false)
872 }
873
874 pub fn flush(&self) -> Result<(), PagerError> {
876 if self.config.read_only {
877 return Ok(());
878 }
879
880 let trunks = {
882 let mut freelist = self.freelist_write()?;
883 if freelist.is_dirty() {
884 let mut header = self.header_write()?;
885 let trunks = freelist.flush_to_trunks(0, || {
886 let id = header.page_count;
887 header.page_count += 1;
888 id
889 });
890 header.freelist_head = freelist.trunk_head();
891 *self.header_dirty_lock()? = true;
892 freelist.mark_clean();
893 trunks
894 } else {
895 Vec::new()
896 }
897 };
898
899 for trunk in trunks {
900 let page_id = trunk.page_id();
901 self.cache.insert(page_id, trunk);
902 self.cache.mark_dirty(page_id);
903 }
904
905 let dirty_pages = self.cache.flush_dirty();
907 if !dirty_pages.is_empty() {
908 let max_lsn = dirty_pages
913 .iter()
914 .filter_map(|(_, page)| page.header().ok().map(|h| h.lsn))
915 .max()
916 .unwrap_or(0);
917 if max_lsn > 0 {
918 if let Ok(slot) = self.wal.read() {
919 if let Some(wal) = slot.as_ref() {
920 let wal = Arc::clone(wal);
921 drop(slot);
925 let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
926 wal_guard.flush_until(max_lsn).map_err(PagerError::Io)?;
927 }
928 }
929 }
930 self.write_pages_through_dwb(&dirty_pages)?;
931 }
932
933 if *self.header_dirty_lock()? {
935 self.write_header()?;
936 }
937
938 Ok(())
939 }
940
941 pub fn sync(&self) -> Result<(), PagerError> {
943 self.flush()?;
944
945 let file = self.file_lock()?;
946 file.sync_all()?;
947
948 Ok(())
949 }
950
    /// Returns the page cache's statistics.
    pub fn cache_stats(&self) -> crate::storage::engine::page_cache::CacheStats {
        self.cache.stats()
    }
955
    /// Returns the number of dirty pages reported by the page cache.
    pub fn dirty_page_count(&self) -> usize {
        self.cache.dirty_count()
    }
960
961 pub fn dirty_fraction(&self) -> f64 {
965 let capacity = self.cache.capacity().max(1) as f64;
966 self.cache.dirty_count() as f64 / capacity
967 }
968
969 pub fn flush_some_dirty(&self, max: usize) -> Result<usize, PagerError> {
973 if self.config.read_only || max == 0 {
974 return Ok(0);
975 }
976 let dirty_pages = self.cache.flush_some_dirty(max);
977 if dirty_pages.is_empty() {
978 return Ok(0);
979 }
980 let count = dirty_pages.len();
981 for (page_id, page) in dirty_pages {
988 self.write_page(page_id, page)?;
989 }
990 Ok(count)
991 }
992
    /// Returns the path of the data file this pager was opened with.
    pub fn path(&self) -> &Path {
        &self.path
    }
997
    /// Reports whether the pager was configured read-only.
    pub fn is_read_only(&self) -> bool {
        self.config.read_only
    }
1002
1003 pub fn file_size(&self) -> Result<u64, PagerError> {
1005 let file = self.file_lock()?;
1006 Ok(file.metadata()?.len())
1007 }
1008
1009 pub fn prefetch_hint(&self, page_id: u32) {
1017 if let Ok(file) = self.file_lock() {
1018 let _ = crate::storage::btree::prefetch::prefetch_page(
1019 &file,
1020 page_id as u64,
1021 PAGE_SIZE as u32,
1022 );
1023 }
1024 }
1025
    /// Path of the header shadow file that belongs to `db_path`.
    fn shadow_path(db_path: &Path) -> PathBuf {
        reddb_file::layout::pager_header_shadow_path(db_path)
    }
1032
    /// Path of the metadata shadow file that belongs to `db_path`.
    fn meta_shadow_path(db_path: &Path) -> PathBuf {
        reddb_file::layout::pager_meta_shadow_path(db_path)
    }
1037
    /// Path of the double-write buffer file that belongs to `db_path`.
    fn dwb_path(db_path: &Path) -> PathBuf {
        reddb_file::layout::pager_dwb_shadow_path(db_path)
    }
1042
1043 fn open_dwb_file(db_path: &Path) -> Result<File, PagerError> {
1048 Ok(OpenOptions::new()
1049 .read(true)
1050 .write(true)
1051 .create(true)
1052 .truncate(false)
1053 .open(Self::dwb_path(db_path))?)
1054 }
1055
1056 fn clear_dwb_file(file: &mut File) -> Result<(), PagerError> {
1058 file.set_len(0)?;
1059 file.seek(SeekFrom::Start(0))?;
1060 file.sync_all()?;
1061 Ok(())
1062 }
1063
1064 fn write_header_shadow(&self, page: &Page) -> Result<(), PagerError> {
1066 if self.config.read_only {
1067 return Ok(());
1068 }
1069 let shadow = Self::shadow_path(&self.path);
1070 let mut f = File::create(&shadow)?;
1071 f.write_all(page.as_bytes())?;
1072 f.sync_all()?;
1073 Ok(())
1074 }
1075
1076 fn recover_header_from_shadow(&self) -> Result<Page, PagerError> {
1078 let shadow = Self::shadow_path(&self.path);
1079 if !shadow.exists() {
1080 return Err(PagerError::InvalidDatabase(
1081 "Page 0 corrupted and no header shadow found".into(),
1082 ));
1083 }
1084 let mut f = File::open(&shadow)?;
1085 let mut buf = [0u8; PAGE_SIZE];
1086 f.read_exact(&mut buf)?;
1087 let page = Page::from_bytes(buf);
1088
1089 if !reddb_file::database_header_magic_matches(page.as_bytes()) {
1091 return Err(PagerError::InvalidDatabase(
1092 "Header shadow also corrupted".into(),
1093 ));
1094 }
1095
1096 if !self.config.read_only {
1098 self.write_page_raw(0, &page)?;
1099 let file = self.file_lock()?;
1100 file.sync_all()?;
1101 }
1102
1103 Ok(page)
1104 }
1105
1106 pub fn write_meta_shadow(&self, page: &Page) -> Result<(), PagerError> {
1116 if self.config.read_only {
1117 return Ok(());
1118 }
1119 let shadow = Self::meta_shadow_path(&self.path);
1120 if crate::physical::fold_pager_meta_enabled() {
1121 let _ = std::fs::remove_file(&shadow);
1124 return Ok(());
1125 }
1126 let mut f = File::create(&shadow)?;
1127 f.write_all(page.as_bytes())?;
1128 f.sync_all()?;
1129 Ok(())
1130 }
1131
1132 pub fn recover_meta_from_shadow(&self) -> Result<Page, PagerError> {
1134 let shadow = Self::meta_shadow_path(&self.path);
1135 if !shadow.exists() {
1136 return Err(PagerError::InvalidDatabase(
1137 "Page 1 corrupted and no metadata shadow found".into(),
1138 ));
1139 }
1140 let mut f = File::open(&shadow)?;
1141 let mut buf = [0u8; PAGE_SIZE];
1142 f.read_exact(&mut buf)?;
1143 let page = Page::from_bytes(buf);
1144
1145 if !self.config.read_only {
1147 self.write_page_raw(1, &page)?;
1148 let file = self.file_lock()?;
1149 file.sync_all()?;
1150 }
1151
1152 Ok(page)
1153 }
1154
1155 fn write_pages_through_dwb(&self, pages: &[(u32, Page)]) -> Result<(), PagerError> {
1162 if let Some(dwb_mutex) = &self.dwb_file {
1163 let mut dwb = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1164
1165 let buf = reddb_file::encode_paged_dwb_frame(
1166 pages
1167 .iter()
1168 .map(|(page_id, page)| (*page_id, page.as_bytes())),
1169 );
1170
1171 dwb.seek(SeekFrom::Start(0))?;
1173 dwb.write_all(&buf)?;
1174 dwb.set_len(buf.len() as u64)?;
1175 dwb.sync_all()?;
1176
1177 for (page_id, page) in pages {
1179 self.write_page_raw(*page_id, page)?;
1180 }
1181
1182 Self::clear_dwb_file(&mut dwb)?;
1184
1185 Ok(())
1186 } else {
1187 for (page_id, page) in pages {
1189 self.write_page_raw(*page_id, page)?;
1190 }
1191 Ok(())
1192 }
1193 }
1194
1195 fn recover_from_dwb(&self) -> Result<(), PagerError> {
1200 let dwb_path = Self::dwb_path(&self.path);
1201 if !dwb_path.exists() {
1202 return Ok(());
1203 }
1204
1205 if let Some(dwb_mutex) = &self.dwb_file {
1206 let mut file = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1207 return self.recover_from_dwb_file(&mut file);
1208 }
1209
1210 let mut file = OpenOptions::new().read(true).write(true).open(&dwb_path)?;
1211 self.recover_from_dwb_file(&mut file)
1212 }
1213
1214 fn recover_from_dwb_file(&self, file: &mut File) -> Result<(), PagerError> {
1215 file.seek(SeekFrom::Start(0))?;
1216 let len = file.metadata()?.len();
1217 let mut buf = vec![0u8; len as usize];
1218 file.read_exact(&mut buf)?;
1219
1220 let entries = match reddb_file::decode_paged_dwb_frame(&buf) {
1221 Ok(entries) => entries,
1222 Err(_) => return Self::clear_dwb_file(file),
1223 };
1224
1225 for entry in entries {
1227 let page = Page::from_bytes(entry.page);
1228 self.write_page_raw(entry.page_id, &page)?;
1229 }
1230
1231 {
1233 let file = self.file_lock()?;
1234 file.sync_all()?;
1235 }
1236
1237 Self::clear_dwb_file(file)
1238 }
1239
1240 pub fn write_header_and_sync(&self) -> Result<(), PagerError> {
1242 self.write_header()?;
1243 let file = self.file_lock()?;
1244 file.sync_all()?;
1245 Ok(())
1246 }
1247
1248 pub fn set_checkpoint_in_progress(
1250 &self,
1251 in_progress: bool,
1252 target_lsn: u64,
1253 ) -> Result<(), PagerError> {
1254 let mut header = self.header_write()?;
1255 header.checkpoint_in_progress = in_progress;
1256 header.checkpoint_target_lsn = target_lsn;
1257 *self.header_dirty_lock()? = true;
1258 drop(header);
1259 self.write_header_and_sync()
1260 }
1261
1262 pub fn complete_checkpoint(&self, lsn: u64) -> Result<(), PagerError> {
1264 let mut header = self.header_write()?;
1265 header.checkpoint_lsn = lsn;
1266 header.checkpoint_in_progress = false;
1267 header.checkpoint_target_lsn = 0;
1268 *self.header_dirty_lock()? = true;
1269 drop(header);
1270 self.write_header_and_sync()
1271 }
1272}
1273
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a database path in the system temp directory from `name`, the process id and
    /// the current time in nanoseconds.
    fn temp_db_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "reddb-pager-{}-{}-{}.rdb",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ))
    }

    /// A header whose version is one above `PAGE_FILE_VERSION` must be rejected on open.
    #[test]
    fn open_refuses_future_database_version() {
        // Create a valid database first, then close it.
        let path = temp_db_path("future-version");
        let pager = Pager::open_default(&path).unwrap();
        drop(pager);

        // Build a header page that claims a newer file version.
        let mut future_header = Page::new_header_page(1);
        reddb_file::set_database_header_version(
            future_header.as_bytes_mut(),
            reddb_file::PAGE_FILE_VERSION + 1,
        )
        .unwrap();
        future_header.update_checksum();

        // Overwrite page 0 on disk with it.
        let mut file = OpenOptions::new().write(true).open(&path).unwrap();
        file.seek(SeekFrom::Start(0)).unwrap();
        file.write_all(future_header.as_bytes()).unwrap();
        file.sync_all().unwrap();
        drop(file);

        let err = match Pager::open_default(&path) {
            Ok(_) => panic!("future database version should be rejected"),
            Err(err) => err,
        };
        match err {
            PagerError::InvalidDatabase(msg) => {
                assert!(msg.contains("newer than supported"));
            }
            other => panic!("expected InvalidDatabase, got {other:?}"),
        }

        // Best-effort cleanup of the database and its side files.
        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(Pager::shadow_path(&path));
        let _ = std::fs::remove_file(Pager::dwb_path(&path));
    }
}