1use super::*;
2
3pub(super) const BTRFS_SUPER_MAGIC: i64 = 0x9123_683e;
4pub(super) const ZFS_SUPER_MAGIC: i64 = 0x2fc1_2fc1;
5pub(super) const FS_NOCOW_FL: u64 = 0x0080_0000;
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub(super) enum CowFilesystemKind {
9 Zfs,
10 BtrfsDataCow,
11 TestOverride,
12}
13
14pub(super) fn classify_cow_filesystem(
15 fs_type: i64,
16 mount_options: Option<&str>,
17 inode_flags: Option<u64>,
18) -> Option<CowFilesystemKind> {
19 match fs_type {
20 ZFS_SUPER_MAGIC => Some(CowFilesystemKind::Zfs),
21 BTRFS_SUPER_MAGIC => {
22 let mount_options = mount_options?;
23 if mount_options.split(',').any(|option| option == "nodatacow") {
24 return None;
25 }
26
27 let inode_flags = inode_flags?;
28 if inode_flags & FS_NOCOW_FL != 0 {
29 return None;
30 }
31
32 Some(CowFilesystemKind::BtrfsDataCow)
33 }
34 _ => None,
35 }
36}
37
38#[cfg(target_os = "linux")]
39fn linux_fstatfs_type(file: &File) -> Option<i64> {
40 use std::mem::MaybeUninit;
41 use std::os::fd::AsRawFd;
42
43 let mut stat = MaybeUninit::<libc::statfs>::uninit();
44 let rc = unsafe { libc::fstatfs(file.as_raw_fd(), stat.as_mut_ptr()) };
45 if rc != 0 {
46 return None;
47 }
48 let stat = unsafe { stat.assume_init() };
49 #[allow(clippy::unnecessary_cast)]
52 Some(stat.f_type as i64)
53}
54
55#[cfg(target_os = "linux")]
56fn linux_inode_flags(file: &File) -> Option<u64> {
57 use std::os::fd::AsRawFd;
58
59 let mut flags: libc::c_long = 0;
60 let rc = unsafe { libc::ioctl(file.as_raw_fd(), libc::FS_IOC_GETFLAGS, &mut flags) };
61 if rc != 0 {
62 return None;
63 }
64 Some(flags as u64)
65}
66
67#[cfg(target_os = "linux")]
68fn linux_mount_options_for_path(path: &Path) -> Option<String> {
69 let path = path.canonicalize().ok()?;
70 let mountinfo = std::fs::read_to_string("/proc/self/mountinfo").ok()?;
71 parse_mountinfo_options_for_path(&mountinfo, &path)
72}
73
74#[cfg(target_os = "linux")]
75pub(super) fn parse_mountinfo_options_for_path(mountinfo: &str, path: &Path) -> Option<String> {
76 let mut best: Option<(usize, String)> = None;
77
78 for line in mountinfo.lines() {
79 let fields: Vec<&str> = line.split(' ').collect();
80 if fields.len() < 10 {
81 continue;
82 }
83
84 let Some(separator) = fields.iter().position(|field| *field == "-") else {
85 continue;
86 };
87 if separator + 3 >= fields.len() || separator < 6 {
88 continue;
89 }
90
91 let mount_point = mountinfo_unescape_path(fields[4]);
92 if !path.starts_with(&mount_point) {
93 continue;
94 }
95
96 let fs_type = fields[separator + 1];
97 if fs_type != "btrfs" && fs_type != "zfs" {
98 continue;
99 }
100
101 let mount_options = fields[5];
102 let super_options = fields[separator + 3];
103 let options = format!("{mount_options},{super_options}");
104 let depth = mount_point.components().count();
105 if best
106 .as_ref()
107 .map(|(best_depth, _)| depth > *best_depth)
108 .unwrap_or(true)
109 {
110 best = Some((depth, options));
111 }
112 }
113
114 best.map(|(_, options)| options)
115}
116
/// Decodes a mountinfo path field, in which the kernel writes some bytes (space, tab,
/// newline, backslash) as a backslash followed by three octal digits.
///
/// Decoding works on raw bytes. A backslash not followed by three octal digits is kept
/// literally, and the result is built from the raw bytes, so mount points that are not
/// valid UTF-8 are preserved exactly.
#[cfg(target_os = "linux")]
fn mountinfo_unescape_path(value: &str) -> PathBuf {
    use std::ffi::OsString;
    use std::os::unix::ffi::OsStringExt;

    /// Decodes ASCII octal digits into a byte; `None` for a non-octal digit or a value
    /// above 255. Unlike `u8::from_str_radix`, a leading `+` is rejected.
    fn octal_byte(digits: &[u8]) -> Option<u8> {
        let value = digits.iter().try_fold(0u16, |acc, &digit| match digit {
            b'0'..=b'7' => Some(acc * 8 + u16::from(digit - b'0')),
            _ => None,
        })?;
        u8::try_from(value).ok()
    }

    let bytes = value.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        if bytes[i] == b'\\' {
            // Slice bytes, not the `str`: slicing the str could split a multi-byte char.
            if let Some(byte) = bytes.get(i + 1..i + 4).and_then(octal_byte) {
                out.push(byte);
                i += 4;
                continue;
            }
        }
        out.push(bytes[i]);
        i += 1;
    }

    PathBuf::from(OsString::from_vec(out))
}
138
139impl Pager {
140 pub fn open<P: AsRef<Path>>(path: P, mut config: PagerConfig) -> Result<Self, PagerError> {
142 let path = path.as_ref().to_path_buf();
143 let exists = path.exists();
144
145 if !exists && !config.create {
146 return Err(PagerError::InvalidDatabase(
147 "Database does not exist".into(),
148 ));
149 }
150
151 if !exists && config.read_only {
152 return Err(PagerError::InvalidDatabase(
153 "Cannot create read-only database".into(),
154 ));
155 }
156
157 let file = OpenOptions::new()
160 .read(true)
161 .write(!config.read_only)
162 .create(config.create && !config.read_only)
163 .open(&path)?;
164
165 #[cfg(unix)]
171 {
172 use std::os::unix::fs::MetadataExt;
173 if let Ok(meta) = file.metadata() {
174 let fs_block_size = meta.blksize();
175 if Self::page_size_misaligned_with_block(PAGE_SIZE, fs_block_size) {
176 tracing::warn!(
177 page_size = PAGE_SIZE,
178 fs_block_size,
179 path = %path.display(),
180 "database page size is not a multiple of the filesystem \
181 block size; page writes will straddle FS blocks \
182 (read-modify-write amplification). Diagnostic only — \
183 the page size is unchanged."
184 );
185 }
186 }
187 }
188
189 let lock_file = if !config.read_only {
191 let lf = OpenOptions::new().read(true).write(true).open(&path)?;
192 lf.try_lock_exclusive().map_err(|_| PagerError::Locked)?;
193 Some(lf)
194 } else {
195 let lf = OpenOptions::new().read(true).open(&path)?;
196 match lf.try_lock_shared() {
197 Ok(_) => Some(lf),
198 Err(_) => None,
199 }
200 };
201
202 let fold_dwb = crate::physical::fold_dwb_into_wal_enabled();
214 if !config.double_write && !config.read_only && !fold_dwb {
215 let skip_dwb_on_cow =
216 Self::cow_filesystem_has_atomic_page_writes(&path, &file).is_some();
217 if !skip_dwb_on_cow {
218 tracing::warn!(
219 path = %path.display(),
220 "double_write=false requested, but the data file is not proven to be on \
221 ZFS or btrfs datacow; keeping the double-write buffer enabled"
222 );
223 config.double_write = true;
224 }
225 }
226
227 let dwb_file = if config.double_write && !config.read_only && !fold_dwb {
228 let f = Self::open_dwb_file(&path)?;
229 Some(Mutex::new(f))
230 } else {
231 if fold_dwb && !config.read_only {
232 let _ = std::fs::remove_file(Self::dwb_path(&path));
233 }
234 None
235 };
236
237 let mut pager = Self {
238 path,
239 file: Mutex::new(file),
240 _lock_file: lock_file,
241 dwb_file,
242 cache: PageCache::new(config.cache_size),
243 freelist: RwLock::new(FreeList::new()),
244 header: RwLock::new(DatabaseHeader::default()),
245 config,
246 header_dirty: Mutex::new(false),
247 wal: RwLock::new(None),
248 encryption: None,
249 };
250
251 if exists {
252 pager.recover_from_dwb()?;
254 if !pager.config.double_write && !pager.config.read_only {
255 let _ = std::fs::remove_file(Self::dwb_path(&pager.path));
256 }
257 pager.load_header()?;
259 pager.bind_encryption_for_existing()?;
260 } else {
261 pager.initialize()?;
263 pager.bind_encryption_for_new()?;
264 }
265
266 Ok(pager)
267 }
268
269 pub(crate) fn page_size_misaligned_with_block(page_size: usize, fs_block_size: u64) -> bool {
276 fs_block_size != 0 && !(page_size as u64).is_multiple_of(fs_block_size)
277 }
278
279 #[cfg(test)]
280 fn cow_filesystem_test_override() -> Option<bool> {
281 match COW_ATOMIC_WRITE_TEST_OVERRIDE.load(Ordering::Relaxed) {
282 1 => Some(true),
283 2 => Some(false),
284 _ => None,
285 }
286 }
287
    /// Non-test builds have no override, so the real filesystem probe always decides.
    #[cfg(not(test))]
    fn cow_filesystem_test_override() -> Option<bool> {
        None
    }
292
293 fn cow_filesystem_has_atomic_page_writes(
294 path: &Path,
295 file: &File,
296 ) -> Option<CowFilesystemKind> {
297 if let Some(allowed) = Self::cow_filesystem_test_override() {
298 return allowed.then_some(CowFilesystemKind::TestOverride);
299 }
300 Self::probe_cow_filesystem(path, file)
301 }
302
303 #[cfg(target_os = "linux")]
304 fn probe_cow_filesystem(path: &Path, file: &File) -> Option<CowFilesystemKind> {
305 let fs_type = linux_fstatfs_type(file)?;
306 match fs_type {
307 ZFS_SUPER_MAGIC => Some(CowFilesystemKind::Zfs),
308 BTRFS_SUPER_MAGIC => {
309 let mount_options = linux_mount_options_for_path(path)?;
310 let inode_flags = linux_inode_flags(file)?;
311 classify_cow_filesystem(fs_type, Some(&mount_options), Some(inode_flags))
312 }
313 _ => None,
314 }
315 }
316
    /// Non-Linux targets have no probe. The filesystem is never proven copy-on-write, so
    /// `open` keeps the double-write buffer even when the config asks to disable it.
    #[cfg(not(target_os = "linux"))]
    fn probe_cow_filesystem(_path: &Path, _file: &File) -> Option<CowFilesystemKind> {
        None
    }
321
322 fn bind_encryption_for_existing(&mut self) -> Result<(), PagerError> {
332 if self.page_count().unwrap_or(0) == 0 {
333 return self.bind_encryption_for_new();
334 }
335 let header_page = self.read_page_no_checksum(0)?;
336 let data = header_page.as_bytes();
337 let has_marker = reddb_file::paged_encryption_marker_present(data);
338
339 let key = self.config.encryption.clone();
340 match (has_marker, key) {
341 (true, Some(key)) => {
342 let header_bytes =
343 reddb_file::paged_encryption_header_bytes(data).ok_or_else(|| {
344 PagerError::InvalidDatabase("encryption header parse failed".to_string())
345 })?;
346 let header = crate::storage::encryption::EncryptionHeader::from_bytes(header_bytes)
347 .map_err(|e| {
348 PagerError::InvalidDatabase(format!("encryption header parse failed: {e}"))
349 })?;
350 if !header.validate(&key) {
351 return Err(PagerError::InvalidKey);
352 }
353 let encryptor = crate::storage::encryption::PageEncryptor::new(key);
354 self.encryption = Some((encryptor, header));
355 Ok(())
356 }
357 (true, None) => Err(PagerError::EncryptionRequired),
358 (false, Some(_)) => Err(PagerError::PlainDatabaseRefusesKey),
359 (false, None) => Ok(()),
360 }
361 }
362
363 fn bind_encryption_for_new(&mut self) -> Result<(), PagerError> {
366 let Some(key) = self.config.encryption.clone() else {
367 return Ok(());
368 };
369 let header = crate::storage::encryption::EncryptionHeader::new(&key);
370 let encryptor = crate::storage::encryption::PageEncryptor::new(key);
371
372 if self.page_count().unwrap_or(0) > 0 {
375 let mut page = self.read_page_no_checksum(0)?;
376 let data = page.as_bytes_mut();
377 let header_bytes = header.to_bytes();
378 reddb_file::write_paged_encryption_marker_and_header(data, &header_bytes)
379 .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
380 self.write_page_no_checksum(0, page)?;
381 }
382 self.encryption = Some((encryptor, header));
383 Ok(())
384 }
385
386 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, PagerError> {
388 Self::open(path, PagerConfig::default())
389 }
390
391 fn initialize(&self) -> Result<(), PagerError> {
393 if self.config.read_only {
394 return Err(PagerError::ReadOnly);
395 }
396
397 let initial_page_count = 3;
401 let header_page = Page::new_header_page(initial_page_count);
402 self.header_write()?.page_count = initial_page_count;
403
404 self.write_page_raw(0, &header_page)?;
407 let mut metadata_page = Page::new(PageType::Header, 1);
408 metadata_page.update_checksum();
409 self.write_page_raw(1, &metadata_page)?;
410 let mut vault_page = Page::new(PageType::Vault, 2);
411 vault_page.update_checksum();
412 self.write_page_raw(2, &vault_page)?;
413
414 self.sync()?;
416
417 Ok(())
418 }
419
420 fn header_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, DatabaseHeader>, PagerError> {
422 self.header.write().map_err(|_| PagerError::LockPoisoned)
423 }
424
425 fn header_read(&self) -> Result<std::sync::RwLockReadGuard<'_, DatabaseHeader>, PagerError> {
427 self.header.read().map_err(|_| PagerError::LockPoisoned)
428 }
429
430 fn freelist_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, FreeList>, PagerError> {
432 self.freelist.write().map_err(|_| PagerError::LockPoisoned)
433 }
434
435 fn file_lock(&self) -> Result<std::sync::MutexGuard<'_, File>, PagerError> {
437 self.file.lock().map_err(|_| PagerError::LockPoisoned)
438 }
439
440 fn header_dirty_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>, PagerError> {
442 self.header_dirty
443 .lock()
444 .map_err(|_| PagerError::LockPoisoned)
445 }
446
447 fn load_header(&self) -> Result<(), PagerError> {
449 let header_page = match self.read_page_raw(0) {
451 Ok(page) => {
452 if reddb_file::database_header_magic_matches(page.as_bytes()) {
454 page
455 } else {
456 self.recover_header_from_shadow()?
458 }
459 }
460 Err(_) => self.recover_header_from_shadow()?,
461 };
462
463 let decoded_header = reddb_file::decode_database_header(header_page.as_bytes())
464 .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
465 let freelist_head = decoded_header.freelist_head;
466
467 {
468 let mut header = self.header_write()?;
469 *header = decoded_header;
470 }
471
472 {
474 let mut freelist = self.freelist_write()?;
475 *freelist = FreeList::from_header(freelist_head, 0);
476 }
477
478 Ok(())
479 }
480
    /// Serializes the in-memory header into page 0 and writes it to disk.
    ///
    /// The page-0 image comes from the first available source:
    /// 1. the cached copy;
    /// 2. disk, when the file already holds at least one full page;
    /// 3. a fresh `PageType::Header` page.
    ///
    /// After the header is encoded and the checksum updated, the page is written (and
    /// synced) to the header shadow file first, then written in place. The header-dirty
    /// flag is then cleared. The data file itself is not synced here.
    ///
    /// NOTE(review): the cached copy of page 0 (if any) is not updated with the
    /// re-encoded header — confirm nothing reads header fields through the cache.
    ///
    /// # Errors
    /// `PagerError::ReadOnly` for read-only pagers, plus lock, I/O and header-encoding
    /// errors.
    fn write_header(&self) -> Result<(), PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        // Held for the whole write, so the encoded header is a single snapshot.
        let header = self.header_read()?;

        let mut page = if let Some(cached) = self.cache.get(0) {
            cached
        } else {
            // Release the file lock before read_page_raw, which takes it again.
            let file = self.file_lock()?;
            let len = file.metadata().map(|m| m.len()).unwrap_or(0);
            drop(file);

            if len >= PAGE_SIZE as u64 {
                self.read_page_raw(0)?
            } else {
                Page::new(PageType::Header, 0)
            }
        };

        let data = page.as_bytes_mut();

        reddb_file::encode_database_header(data, &header)
            .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;

        page.update_checksum();

        // Shadow first, so a torn in-place write of page 0 can be recovered from it.
        self.write_header_shadow(&page)?;

        self.write_page_raw(0, &page)?;
        *self.header_dirty_lock()? = false;

        Ok(())
    }
525
526 fn read_page_raw(&self, page_id: u32) -> Result<Page, PagerError> {
528 let mut file = self.file_lock()?;
529 let offset = (page_id as u64) * (PAGE_SIZE as u64);
530
531 file.seek(SeekFrom::Start(offset))?;
532
533 let mut buf = [0u8; PAGE_SIZE];
534 file.read_exact(&mut buf)?;
535
536 let page = Page::from_bytes(buf);
537
538 if self.config.verify_checksums {
540 page.verify_checksum()?;
541 }
542
543 Ok(page)
544 }
545
546 fn write_page_raw(&self, page_id: u32, page: &Page) -> Result<(), PagerError> {
548 if self.config.read_only {
549 return Err(PagerError::ReadOnly);
550 }
551
552 let mut file = self.file_lock()?;
553 let offset = (page_id as u64) * (PAGE_SIZE as u64);
554
555 file.seek(SeekFrom::Start(offset))?;
556 file.write_all(page.as_bytes())?;
557
558 Ok(())
559 }
560
561 pub fn read_page(&self, page_id: u32) -> Result<Page, PagerError> {
563 if let Some(page) = self.cache.get(page_id) {
565 return Ok(page);
566 }
567
568 let page = self.read_page_raw(page_id)?;
570
571 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
573 let evicted_id = dirty_page.page_id();
575 self.write_page_raw(evicted_id, &dirty_page)?;
576 }
577
578 Ok(page)
579 }
580
581 pub fn read_page_no_checksum(&self, page_id: u32) -> Result<Page, PagerError> {
586 if let Some(page) = self.cache.get(page_id) {
588 return Ok(page);
589 }
590
591 let mut file = self.file_lock()?;
593 let offset = (page_id as u64) * (PAGE_SIZE as u64);
594
595 file.seek(SeekFrom::Start(offset))?;
596
597 let mut buf = [0u8; PAGE_SIZE];
598 file.read_exact(&mut buf)?;
599 drop(file);
600
601 let page = Page::from_bytes(buf);
602
603 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
605 let evicted_id = dirty_page.page_id();
607 self.write_page_raw(evicted_id, &dirty_page)?;
608 }
609
610 Ok(page)
611 }
612
613 pub fn write_page(&self, page_id: u32, mut page: Page) -> Result<(), PagerError> {
615 if self.config.read_only {
616 return Err(PagerError::ReadOnly);
617 }
618
619 page.update_checksum();
621
622 if let Some(dirty_page) = self.cache.insert(page_id, page) {
627 let evicted_id = dirty_page.page_id();
628 self.write_page_raw(evicted_id, &dirty_page)?;
629 }
630 self.cache.mark_dirty(page_id);
631
632 Ok(())
633 }
634
635 pub fn read_page_decrypted(&self, page_id: u32) -> Result<Page, PagerError> {
640 if page_id == 0 || self.encryption.is_none() {
641 return self.read_page(page_id);
642 }
643 let raw = self.read_page_no_checksum(page_id)?;
644 let (enc, _) = self
645 .encryption
646 .as_ref()
647 .expect("encryption presence checked above");
648 let plaintext = enc
649 .decrypt(page_id, raw.as_bytes())
650 .map_err(|e| PagerError::InvalidDatabase(format!("decrypt page {page_id}: {e}")))?;
651 let mut buf = [0u8; PAGE_SIZE];
652 let n = plaintext.len().min(PAGE_SIZE);
653 buf[..n].copy_from_slice(&plaintext[..n]);
654 Ok(Page::from_bytes(buf))
655 }
656
657 pub fn write_page_encrypted(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
662 if page_id == 0 || self.encryption.is_none() {
663 return self.write_page(page_id, page);
664 }
665 const OVERHEAD: usize = crate::storage::encryption::page_encryptor::OVERHEAD;
668 let plaintext_len = PAGE_SIZE - OVERHEAD;
669 let plaintext = &page.as_bytes()[..plaintext_len];
670 let (enc, _) = self
671 .encryption
672 .as_ref()
673 .expect("encryption presence checked above");
674 let ciphertext = enc.encrypt(page_id, plaintext);
675 debug_assert_eq!(ciphertext.len(), PAGE_SIZE);
676 let mut buf = [0u8; PAGE_SIZE];
677 buf.copy_from_slice(&ciphertext);
678 let cipher_page = Page::from_bytes(buf);
679 self.write_page_no_checksum(page_id, cipher_page)
680 }
681
682 pub fn write_page_no_checksum(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
687 if self.config.read_only {
688 return Err(PagerError::ReadOnly);
689 }
690
691 if let Some(dirty_page) = self.cache.insert(page_id, page) {
694 let evicted_id = dirty_page.page_id();
695 self.write_page_raw(evicted_id, &dirty_page)?;
696 }
697 self.cache.mark_dirty(page_id);
698
699 Ok(())
700 }
701
702 pub fn allocate_page(&self, page_type: PageType) -> Result<Page, PagerError> {
704 if self.config.read_only {
705 return Err(PagerError::ReadOnly);
706 }
707
708 let page_id = {
710 let mut freelist = self.freelist_write()?;
711 if let Some(id) = freelist.allocate() {
712 id
713 } else if freelist.trunk_head() != 0 {
714 let trunk_id = freelist.trunk_head();
715 drop(freelist);
716
717 let trunk = self.read_page(trunk_id).map_err(|e| match e {
718 PagerError::PageNotFound(_) => {
719 PagerError::InvalidDatabase("Freelist trunk missing".to_string())
720 }
721 other => other,
722 })?;
723
724 let mut freelist = self.freelist_write()?;
725 freelist
726 .load_from_trunk(&trunk)
727 .map_err(|e| PagerError::InvalidDatabase(format!("Freelist: {}", e)))?;
728 let id = freelist.allocate().ok_or_else(|| {
729 PagerError::InvalidDatabase("Freelist empty after trunk load".to_string())
730 })?;
731
732 let mut header = self.header_write()?;
733 header.freelist_head = freelist.trunk_head();
734 *self.header_dirty_lock()? = true;
735
736 id
737 } else {
738 let mut header = self.header_write()?;
740 let id = header.page_count;
741 header.page_count += 1;
742 *self.header_dirty_lock()? = true;
743 id
744 }
745 };
746
747 let page = Page::new(page_type, page_id);
748
749 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
761 let evicted_id = dirty_page.page_id();
762 self.write_page_raw(evicted_id, &dirty_page)?;
763 }
764 self.cache.mark_dirty(page_id);
765
766 Ok(page)
767 }
768
769 pub fn reserve_contig_extent(&self, n_pages: u32) -> Result<super::ExtentId, PagerError> {
771 if self.config.read_only {
772 return Err(PagerError::ReadOnly);
773 }
774 if n_pages == 0 {
775 return Err(PagerError::InvalidDatabase(
776 "contiguous extent must reserve at least one page".to_string(),
777 ));
778 }
779
780 let start_page = {
781 let mut header = self.header_write()?;
782 let start = header.page_count;
783 header.page_count = header.page_count.checked_add(n_pages).ok_or_else(|| {
784 PagerError::InvalidDatabase("contiguous extent page count overflow".to_string())
785 })?;
786 *self.header_dirty_lock()? = true;
787 start
788 };
789
790 for page_id in start_page..start_page + n_pages {
791 let mut page = Page::new(PageType::Vector, page_id);
792 page.update_checksum();
793 if let Some(dirty_page) = self.cache.insert(page_id, page) {
794 let evicted_id = dirty_page.page_id();
795 self.write_page_raw(evicted_id, &dirty_page)?;
796 }
797 self.cache.mark_dirty(page_id);
798 }
799
800 Ok(super::ExtentId {
801 start_page,
802 n_pages,
803 })
804 }
805
806 pub fn free_page(&self, page_id: u32) -> Result<(), PagerError> {
808 if self.config.read_only {
809 return Err(PagerError::ReadOnly);
810 }
811
812 self.cache.remove(page_id);
814
815 let mut freelist = self.freelist_write()?;
817 freelist.free(page_id);
818
819 *self.header_dirty_lock()? = true;
820
821 Ok(())
822 }
823
824 pub fn header(&self) -> Result<DatabaseHeader, PagerError> {
826 Ok(self.header_read()?.clone())
827 }
828
829 pub fn physical_header(&self) -> Result<PhysicalFileHeader, PagerError> {
830 Ok(self.header_read()?.physical)
831 }
832
833 pub fn update_physical_header(&self, physical: PhysicalFileHeader) -> Result<(), PagerError> {
834 if self.config.read_only {
835 return Err(PagerError::ReadOnly);
836 }
837
838 let mut header = self.header_write()?;
839 header.physical = physical;
840 *self.header_dirty_lock()? = true;
841 Ok(())
842 }
843
844 pub fn page_count(&self) -> Result<u32, PagerError> {
846 Ok(self.header_read()?.page_count)
847 }
848
849 pub fn set_wal_writer(&self, wal: Arc<Mutex<crate::storage::wal::writer::WalWriter>>) {
861 let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
862 *slot = Some(wal);
863 }
864
865 pub fn clear_wal_writer(&self) {
867 let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
868 *slot = None;
869 }
870
871 pub fn has_wal_writer(&self) -> bool {
873 self.wal.read().map(|s| s.is_some()).unwrap_or(false)
874 }
875
876 pub fn flush(&self) -> Result<(), PagerError> {
878 if self.config.read_only {
879 return Ok(());
880 }
881
882 let trunks = {
884 let mut freelist = self.freelist_write()?;
885 if freelist.is_dirty() {
886 let mut header = self.header_write()?;
887 let trunks = freelist.flush_to_trunks(0, || {
888 let id = header.page_count;
889 header.page_count += 1;
890 id
891 });
892 header.freelist_head = freelist.trunk_head();
893 *self.header_dirty_lock()? = true;
894 freelist.mark_clean();
895 trunks
896 } else {
897 Vec::new()
898 }
899 };
900
901 for trunk in trunks {
902 let page_id = trunk.page_id();
903 self.cache.insert(page_id, trunk);
904 self.cache.mark_dirty(page_id);
905 }
906
907 let dirty_pages = self.cache.flush_dirty();
909 if !dirty_pages.is_empty() {
910 let max_lsn = dirty_pages
915 .iter()
916 .filter_map(|(_, page)| page.header().ok().map(|h| h.lsn))
917 .max()
918 .unwrap_or(0);
919 if max_lsn > 0 {
920 if let Ok(slot) = self.wal.read() {
921 if let Some(wal) = slot.as_ref() {
922 let wal = Arc::clone(wal);
923 drop(slot);
927 let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
928 wal_guard.flush_until(max_lsn).map_err(PagerError::Io)?;
929 }
930 }
931 }
932 self.write_pages_through_dwb(&dirty_pages)?;
933 }
934
935 if *self.header_dirty_lock()? {
937 self.write_header()?;
938 }
939
940 Ok(())
941 }
942
943 pub fn sync(&self) -> Result<(), PagerError> {
945 self.flush()?;
946
947 let file = self.file_lock()?;
948 file.sync_all()?;
949
950 Ok(())
951 }
952
953 pub fn cache_stats(&self) -> crate::storage::engine::page_cache::CacheStats {
955 self.cache.stats()
956 }
957
958 pub fn dirty_page_count(&self) -> usize {
960 self.cache.dirty_count()
961 }
962
963 pub fn dirty_fraction(&self) -> f64 {
967 let capacity = self.cache.capacity().max(1) as f64;
968 self.cache.dirty_count() as f64 / capacity
969 }
970
971 pub fn flush_some_dirty(&self, max: usize) -> Result<usize, PagerError> {
975 if self.config.read_only || max == 0 {
976 return Ok(0);
977 }
978 let dirty_pages = self.cache.flush_some_dirty(max);
979 if dirty_pages.is_empty() {
980 return Ok(0);
981 }
982 let count = dirty_pages.len();
983 for (page_id, page) in dirty_pages {
990 self.write_page(page_id, page)?;
991 }
992 Ok(count)
993 }
994
995 pub fn path(&self) -> &Path {
997 &self.path
998 }
999
1000 pub fn is_read_only(&self) -> bool {
1002 self.config.read_only
1003 }
1004
1005 pub fn file_size(&self) -> Result<u64, PagerError> {
1007 let file = self.file_lock()?;
1008 Ok(file.metadata()?.len())
1009 }
1010
1011 pub fn prefetch_hint(&self, page_id: u32) {
1019 if let Ok(file) = self.file_lock() {
1020 let _ = crate::storage::btree::prefetch::prefetch_page(
1021 &file,
1022 page_id as u64,
1023 PAGE_SIZE as u32,
1024 );
1025 }
1026 }
1027
    /// Path of the page-0 (database header) shadow copy that belongs to `db_path`.
    fn shadow_path(db_path: &Path) -> PathBuf {
        reddb_file::layout::pager_header_shadow_path(db_path)
    }

    /// Path of the page-1 (metadata) shadow copy that belongs to `db_path`.
    fn meta_shadow_path(db_path: &Path) -> PathBuf {
        reddb_file::layout::pager_meta_shadow_path(db_path)
    }

    /// Path of the double-write buffer file that belongs to `db_path`.
    fn dwb_path(db_path: &Path) -> PathBuf {
        reddb_file::layout::pager_dwb_shadow_path(db_path)
    }
1044
1045 fn open_dwb_file(db_path: &Path) -> Result<File, PagerError> {
1050 Ok(OpenOptions::new()
1051 .read(true)
1052 .write(true)
1053 .create(true)
1054 .truncate(false)
1055 .open(Self::dwb_path(db_path))?)
1056 }
1057
1058 fn clear_dwb_file(file: &mut File) -> Result<(), PagerError> {
1060 file.set_len(0)?;
1061 file.seek(SeekFrom::Start(0))?;
1062 file.sync_all()?;
1063 Ok(())
1064 }
1065
1066 fn write_header_shadow(&self, page: &Page) -> Result<(), PagerError> {
1068 if self.config.read_only {
1069 return Ok(());
1070 }
1071 let shadow = Self::shadow_path(&self.path);
1072 let mut f = File::create(&shadow)?;
1073 f.write_all(page.as_bytes())?;
1074 f.sync_all()?;
1075 Ok(())
1076 }
1077
1078 fn recover_header_from_shadow(&self) -> Result<Page, PagerError> {
1080 let shadow = Self::shadow_path(&self.path);
1081 if !shadow.exists() {
1082 return Err(PagerError::InvalidDatabase(
1083 "Page 0 corrupted and no header shadow found".into(),
1084 ));
1085 }
1086 let mut f = File::open(&shadow)?;
1087 let mut buf = [0u8; PAGE_SIZE];
1088 f.read_exact(&mut buf)?;
1089 let page = Page::from_bytes(buf);
1090
1091 if !reddb_file::database_header_magic_matches(page.as_bytes()) {
1093 return Err(PagerError::InvalidDatabase(
1094 "Header shadow also corrupted".into(),
1095 ));
1096 }
1097
1098 if !self.config.read_only {
1100 self.write_page_raw(0, &page)?;
1101 let file = self.file_lock()?;
1102 file.sync_all()?;
1103 }
1104
1105 Ok(page)
1106 }
1107
1108 pub fn write_meta_shadow(&self, page: &Page) -> Result<(), PagerError> {
1118 if self.config.read_only {
1119 return Ok(());
1120 }
1121 let shadow = Self::meta_shadow_path(&self.path);
1122 if crate::physical::fold_pager_meta_enabled() {
1123 let _ = std::fs::remove_file(&shadow);
1126 return Ok(());
1127 }
1128 let mut f = File::create(&shadow)?;
1129 f.write_all(page.as_bytes())?;
1130 f.sync_all()?;
1131 Ok(())
1132 }
1133
1134 pub fn recover_meta_from_shadow(&self) -> Result<Page, PagerError> {
1136 let shadow = Self::meta_shadow_path(&self.path);
1137 if !shadow.exists() {
1138 return Err(PagerError::InvalidDatabase(
1139 "Page 1 corrupted and no metadata shadow found".into(),
1140 ));
1141 }
1142 let mut f = File::open(&shadow)?;
1143 let mut buf = [0u8; PAGE_SIZE];
1144 f.read_exact(&mut buf)?;
1145 let page = Page::from_bytes(buf);
1146
1147 if !self.config.read_only {
1149 self.write_page_raw(1, &page)?;
1150 let file = self.file_lock()?;
1151 file.sync_all()?;
1152 }
1153
1154 Ok(page)
1155 }
1156
1157 fn write_pages_through_dwb(&self, pages: &[(u32, Page)]) -> Result<(), PagerError> {
1164 if let Some(dwb_mutex) = &self.dwb_file {
1165 let mut dwb = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1166
1167 let buf = reddb_file::encode_paged_dwb_frame(
1168 pages
1169 .iter()
1170 .map(|(page_id, page)| (*page_id, page.as_bytes())),
1171 );
1172
1173 dwb.seek(SeekFrom::Start(0))?;
1175 dwb.write_all(&buf)?;
1176 dwb.set_len(buf.len() as u64)?;
1177 dwb.sync_all()?;
1178
1179 for (page_id, page) in pages {
1181 self.write_page_raw(*page_id, page)?;
1182 }
1183
1184 Self::clear_dwb_file(&mut dwb)?;
1186
1187 Ok(())
1188 } else {
1189 for (page_id, page) in pages {
1191 self.write_page_raw(*page_id, page)?;
1192 }
1193 Ok(())
1194 }
1195 }
1196
1197 fn recover_from_dwb(&self) -> Result<(), PagerError> {
1202 let dwb_path = Self::dwb_path(&self.path);
1203 if !dwb_path.exists() {
1204 return Ok(());
1205 }
1206
1207 if let Some(dwb_mutex) = &self.dwb_file {
1208 let mut file = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1209 return self.recover_from_dwb_file(&mut file);
1210 }
1211
1212 let mut file = OpenOptions::new().read(true).write(true).open(&dwb_path)?;
1213 self.recover_from_dwb_file(&mut file)
1214 }
1215
    /// Replays the double-write-buffer frame held in `file` into the data file, then
    /// empties the DWB.
    ///
    /// The whole DWB is read into memory and decoded. If decoding fails, the DWB is
    /// truncated and nothing is replayed. Otherwise each entry's page is written at its
    /// `page_id`. The data file is synced, and only then is the DWB truncated.
    ///
    /// NOTE(review): entries are written via `write_page_raw`, which returns
    /// `PagerError::ReadOnly` for read-only pagers — confirm read-only opens never reach
    /// this with a decodable frame.
    fn recover_from_dwb_file(&self, file: &mut File) -> Result<(), PagerError> {
        file.seek(SeekFrom::Start(0))?;
        let len = file.metadata()?.len();
        let mut buf = vec![0u8; len as usize];
        file.read_exact(&mut buf)?;

        // A frame that does not decode is discarded without replaying anything.
        let entries = match reddb_file::decode_paged_dwb_frame(&buf) {
            Ok(entries) => entries,
            Err(_) => return Self::clear_dwb_file(file),
        };

        for entry in entries {
            let page = Page::from_bytes(entry.page);
            self.write_page_raw(entry.page_id, &page)?;
        }

        // Make the replayed pages durable before the DWB copy is thrown away.
        {
            let file = self.file_lock()?;
            file.sync_all()?;
        }

        Self::clear_dwb_file(file)
    }
1241
1242 pub fn write_header_and_sync(&self) -> Result<(), PagerError> {
1244 self.write_header()?;
1245 let file = self.file_lock()?;
1246 file.sync_all()?;
1247 Ok(())
1248 }
1249
1250 pub fn set_checkpoint_in_progress(
1252 &self,
1253 in_progress: bool,
1254 target_lsn: u64,
1255 ) -> Result<(), PagerError> {
1256 let mut header = self.header_write()?;
1257 header.checkpoint_in_progress = in_progress;
1258 header.checkpoint_target_lsn = target_lsn;
1259 *self.header_dirty_lock()? = true;
1260 drop(header);
1261 self.write_header_and_sync()
1262 }
1263
1264 pub fn complete_checkpoint(&self, lsn: u64) -> Result<(), PagerError> {
1266 let mut header = self.header_write()?;
1267 header.checkpoint_lsn = lsn;
1268 header.checkpoint_in_progress = false;
1269 header.checkpoint_target_lsn = 0;
1270 *self.header_dirty_lock()? = true;
1271 drop(header);
1272 self.write_header_and_sync()
1273 }
1274}
1275
#[cfg(test)]
mod tests {
    use super::*;

    /// Temp-dir database path made unique by test name, process id and a nanosecond
    /// timestamp.
    fn temp_db_path(name: &str) -> PathBuf {
        let file_name = format!(
            "reddb-pager-{}-{}-{}.rdb",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        );
        std::env::temp_dir().join(file_name)
    }

    #[test]
    fn open_refuses_future_database_version() {
        let path = temp_db_path("future-version");
        drop(Pager::open_default(&path).unwrap());

        // Overwrite page 0 with a header claiming a newer on-disk format version.
        let mut future_header = Page::new_header_page(1);
        reddb_file::set_database_header_version(
            future_header.as_bytes_mut(),
            reddb_file::PAGE_FILE_VERSION + 1,
        )
        .unwrap();
        future_header.update_checksum();
        {
            let mut file = OpenOptions::new().write(true).open(&path).unwrap();
            file.seek(SeekFrom::Start(0)).unwrap();
            file.write_all(future_header.as_bytes()).unwrap();
            file.sync_all().unwrap();
        }

        let Err(err) = Pager::open_default(&path) else {
            panic!("future database version should be rejected");
        };
        match err {
            PagerError::InvalidDatabase(msg) => assert!(msg.contains("newer than supported")),
            other => panic!("expected InvalidDatabase, got {other:?}"),
        }

        for leftover in [
            path.clone(),
            Pager::shadow_path(&path),
            Pager::dwb_path(&path),
        ] {
            let _ = std::fs::remove_file(leftover);
        }
    }
}