Skip to main content

reddb_server/storage/engine/pager/
impl.rs

1use super::*;
2
/// Filesystem magic that identifies btrfs; compared against the `f_type`
/// value obtained from `fstatfs` when classifying the data file's filesystem.
pub(super) const BTRFS_SUPER_MAGIC: i64 = 0x9123_683e;
/// Filesystem magic that identifies ZFS; compared against the same `f_type`
/// value.
pub(super) const ZFS_SUPER_MAGIC: i64 = 0x2fc1_2fc1;
/// Inode flag bit tested against the flags read through `FS_IOC_GETFLAGS`.
/// When the bit is set the file is not treated as copy-on-write.
pub(super) const FS_NOCOW_FL: u64 = 0x0080_0000;
6
/// Reason the copy-on-write probe accepted a data file, i.e. why the pager
/// may honor a `double_write = false` request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CowFilesystemKind {
    /// The filesystem type matched `ZFS_SUPER_MAGIC`.
    Zfs,
    /// The filesystem type matched `BTRFS_SUPER_MAGIC`, the mount options do
    /// not include `nodatacow`, and the inode does not carry `FS_NOCOW_FL`.
    BtrfsDataCow,
    /// The test-only override forced the probe to report success.
    TestOverride,
}
13
14pub(super) fn classify_cow_filesystem(
15    fs_type: i64,
16    mount_options: Option<&str>,
17    inode_flags: Option<u64>,
18) -> Option<CowFilesystemKind> {
19    match fs_type {
20        ZFS_SUPER_MAGIC => Some(CowFilesystemKind::Zfs),
21        BTRFS_SUPER_MAGIC => {
22            let mount_options = mount_options?;
23            if mount_options.split(',').any(|option| option == "nodatacow") {
24                return None;
25            }
26
27            let inode_flags = inode_flags?;
28            if inode_flags & FS_NOCOW_FL != 0 {
29                return None;
30            }
31
32            Some(CowFilesystemKind::BtrfsDataCow)
33        }
34        _ => None,
35    }
36}
37
38#[cfg(target_os = "linux")]
39fn linux_fstatfs_type(file: &File) -> Option<i64> {
40    use std::mem::MaybeUninit;
41    use std::os::fd::AsRawFd;
42
43    let mut stat = MaybeUninit::<libc::statfs>::uninit();
44    let rc = unsafe { libc::fstatfs(file.as_raw_fd(), stat.as_mut_ptr()) };
45    if rc != 0 {
46        return None;
47    }
48    let stat = unsafe { stat.assume_init() };
49    // `statfs.f_type` is i32 on 32-bit glibc targets (armv7) and i64 on
50    // 64-bit ones — widen explicitly so both compile.
51    #[allow(clippy::unnecessary_cast)]
52    Some(stat.f_type as i64)
53}
54
55#[cfg(target_os = "linux")]
56fn linux_inode_flags(file: &File) -> Option<u64> {
57    use std::os::fd::AsRawFd;
58
59    let mut flags: libc::c_long = 0;
60    let rc = unsafe { libc::ioctl(file.as_raw_fd(), libc::FS_IOC_GETFLAGS, &mut flags) };
61    if rc != 0 {
62        return None;
63    }
64    Some(flags as u64)
65}
66
67#[cfg(target_os = "linux")]
68fn linux_mount_options_for_path(path: &Path) -> Option<String> {
69    let path = path.canonicalize().ok()?;
70    let mountinfo = std::fs::read_to_string("/proc/self/mountinfo").ok()?;
71    parse_mountinfo_options_for_path(&mountinfo, &path)
72}
73
74#[cfg(target_os = "linux")]
75pub(super) fn parse_mountinfo_options_for_path(mountinfo: &str, path: &Path) -> Option<String> {
76    let mut best: Option<(usize, String)> = None;
77
78    for line in mountinfo.lines() {
79        let fields: Vec<&str> = line.split(' ').collect();
80        if fields.len() < 10 {
81            continue;
82        }
83
84        let Some(separator) = fields.iter().position(|field| *field == "-") else {
85            continue;
86        };
87        if separator + 3 >= fields.len() || separator < 6 {
88            continue;
89        }
90
91        let mount_point = mountinfo_unescape_path(fields[4]);
92        if !path.starts_with(&mount_point) {
93            continue;
94        }
95
96        let fs_type = fields[separator + 1];
97        if fs_type != "btrfs" && fs_type != "zfs" {
98            continue;
99        }
100
101        let mount_options = fields[5];
102        let super_options = fields[separator + 3];
103        let options = format!("{mount_options},{super_options}");
104        let depth = mount_point.components().count();
105        if best
106            .as_ref()
107            .map(|(best_depth, _)| depth > *best_depth)
108            .unwrap_or(true)
109        {
110            best = Some((depth, options));
111        }
112    }
113
114    best.map(|(_, options)| options)
115}
116
/// Decode a path field written in the mountinfo escaping scheme, turning each
/// `\NNN` sequence (backslash plus exactly three octal digits) back into the
/// single byte it encodes.
///
/// A backslash that is not followed by three octal digits fitting in one byte
/// is copied through unchanged. The work is done on bytes, so arbitrary UTF-8
/// after a backslash cannot cause a slicing panic, and decoded bytes that are
/// not valid UTF-8 are preserved in the returned path instead of being
/// replaced.
#[cfg(target_os = "linux")]
fn mountinfo_unescape_path(value: &str) -> PathBuf {
    use std::ffi::OsString;
    use std::os::unix::ffi::OsStringExt;

    /// Interpret `digits` as an octal number; `None` if any byte is not an
    /// octal digit or the value does not fit in a `u8`.
    fn octal_byte(digits: &[u8]) -> Option<u8> {
        digits.iter().try_fold(0u8, |acc, &digit| match digit {
            b'0'..=b'7' => acc.checked_mul(8)?.checked_add(digit - b'0'),
            _ => None,
        })
    }

    let bytes = value.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        if bytes[i] == b'\\' {
            // `get` returns `None` when fewer than three bytes follow.
            if let Some(byte) = bytes.get(i + 1..i + 4).and_then(octal_byte) {
                out.push(byte);
                i += 4;
                continue;
            }
        }
        out.push(bytes[i]);
        i += 1;
    }

    PathBuf::from(OsString::from_vec(out))
}
138
139impl Pager {
    /// Open or create a database file.
    ///
    /// Steps, in order:
    /// 1. Reject a missing file when `config.create` is false or when
    ///    `config.read_only` is set.
    /// 2. Open the data file (creating it only for writable opens).
    /// 3. On unix, warn once if `PAGE_SIZE` is not a multiple of the
    ///    filesystem block size (diagnostic only).
    /// 4. Take a file lock through a second handle: exclusive for writable
    ///    opens (failure is `PagerError::Locked`), shared and best-effort for
    ///    read-only opens.
    /// 5. Decide whether the double-write buffer sidecar is used, forcing it
    ///    back on when `double_write = false` was requested but the filesystem
    ///    is not proven copy-on-write.
    /// 6. For an existing file: recover from the double-write buffer, load the
    ///    header and bind encryption. For a new file: initialize it and bind
    ///    encryption.
    ///
    /// # Errors
    ///
    /// Returns `PagerError::InvalidDatabase` for a missing file that may not
    /// be created, `PagerError::Locked` when the exclusive lock cannot be
    /// taken, and propagates I/O, recovery, header and encryption errors from
    /// the steps above.
    pub fn open<P: AsRef<Path>>(path: P, mut config: PagerConfig) -> Result<Self, PagerError> {
        let path = path.as_ref().to_path_buf();
        // Sampled once, before the file may be created below; it decides
        // between the "load existing" and "initialize new" paths at the end.
        let exists = path.exists();

        if !exists && !config.create {
            return Err(PagerError::InvalidDatabase(
                "Database does not exist".into(),
            ));
        }

        if !exists && config.read_only {
            return Err(PagerError::InvalidDatabase(
                "Cannot create read-only database".into(),
            ));
        }

        // Open file
        // Note: create requires write access, so disable it for read-only mode
        let file = OpenOptions::new()
            .read(true)
            .write(!config.read_only)
            .create(config.create && !config.read_only)
            .open(&path)?;

        // gh-892: diagnostic probe of the filesystem block size. If the
        // compile-time 16 KiB PAGE_SIZE is not a multiple of the FS block
        // size, page writes straddle FS blocks and incur read-modify-write
        // amplification. Pure diagnostic — emitted once per open, never
        // mutates the page size. `blksize()` is the fstat `st_blksize`.
        #[cfg(unix)]
        {
            use std::os::unix::fs::MetadataExt;
            if let Ok(meta) = file.metadata() {
                let fs_block_size = meta.blksize();
                if Self::page_size_misaligned_with_block(PAGE_SIZE, fs_block_size) {
                    tracing::warn!(
                        page_size = PAGE_SIZE,
                        fs_block_size,
                        path = %path.display(),
                        "database page size is not a multiple of the filesystem \
                         block size; page writes will straddle FS blocks \
                         (read-modify-write amplification). Diagnostic only — \
                         the page size is unchanged."
                    );
                }
            }
        }

        // Acquire file lock (exclusive for writes, shared for read-only).
        // The lock lives on a separate handle stored in `_lock_file`; a
        // read-only open continues without a lock if the shared lock fails.
        let lock_file = if !config.read_only {
            let lf = OpenOptions::new().read(true).write(true).open(&path)?;
            lf.try_lock_exclusive().map_err(|_| PagerError::Locked)?;
            Some(lf)
        } else {
            let lf = OpenOptions::new().read(true).open(&path)?;
            match lf.try_lock_shared() {
                Ok(_) => Some(lf),
                Err(_) => None,
            }
        };

        // Open double-write buffer file.
        //
        // gh-478: when `fold_dwb_into_wal` is enabled the DWB sidecar is
        // suppressed — torn pages are healed by replaying FullPageImage WAL
        // records during recovery. Any pre-existing `-dwb` is removed so a
        // flipped flag cannot leave a stale sidecar on disk.
        //
        // gh-895: an explicit `double_write = false` request is honored only
        // when the already-open data file is proven to live on a filesystem
        // with atomic CoW page writes. Unknown and non-CoW filesystems fail
        // closed by keeping the DWB sidecar.
        let fold_dwb = crate::physical::fold_dwb_into_wal_enabled();
        if !config.double_write && !config.read_only && !fold_dwb {
            let skip_dwb_on_cow =
                Self::cow_filesystem_has_atomic_page_writes(&path, &file).is_some();
            if !skip_dwb_on_cow {
                tracing::warn!(
                    path = %path.display(),
                    "double_write=false requested, but the data file is not proven to be on \
                     ZFS or btrfs datacow; keeping the double-write buffer enabled"
                );
                config.double_write = true;
            }
        }

        let dwb_file = if config.double_write && !config.read_only && !fold_dwb {
            let f = Self::open_dwb_file(&path)?;
            Some(Mutex::new(f))
        } else {
            // Best-effort cleanup: a failed removal is deliberately ignored.
            if fold_dwb && !config.read_only {
                let _ = std::fs::remove_file(Self::dwb_path(&path));
            }
            None
        };

        let mut pager = Self {
            path,
            file: Mutex::new(file),
            _lock_file: lock_file,
            dwb_file,
            cache: PageCache::new(config.cache_size),
            freelist: RwLock::new(FreeList::new()),
            header: RwLock::new(DatabaseHeader::default()),
            config,
            header_dirty: Mutex::new(false),
            wal: RwLock::new(None),
            encryption: None,
        };

        if exists {
            // Recover from double-write buffer if needed
            pager.recover_from_dwb()?;
            // Recovery runs first; only afterwards is a sidecar that is no
            // longer wanted removed (best-effort).
            if !pager.config.double_write && !pager.config.read_only {
                let _ = std::fs::remove_file(Self::dwb_path(&pager.path));
            }
            // Load existing database (with header shadow fallback)
            pager.load_header()?;
            pager.bind_encryption_for_existing()?;
        } else {
            // Initialize new database
            pager.initialize()?;
            pager.bind_encryption_for_new()?;
        }

        Ok(pager)
    }
268
269    /// gh-892 diagnostic predicate: returns `true` when the database
270    /// `page_size` is **not** an integer multiple of the filesystem's
271    /// reported block size (`st_blksize`), i.e. `page_size % fs_block_size
272    /// != 0`. A `fs_block_size` of `0` (probe unavailable / unknown) is
273    /// treated as aligned so a missing probe never produces a warning.
274    /// Pure function with no I/O so the warn decision is unit-testable.
275    pub(crate) fn page_size_misaligned_with_block(page_size: usize, fs_block_size: u64) -> bool {
276        fs_block_size != 0 && !(page_size as u64).is_multiple_of(fs_block_size)
277    }
278
279    #[cfg(test)]
280    fn cow_filesystem_test_override() -> Option<bool> {
281        match COW_ATOMIC_WRITE_TEST_OVERRIDE.load(Ordering::Relaxed) {
282            1 => Some(true),
283            2 => Some(false),
284            _ => None,
285        }
286    }
287
288    #[cfg(not(test))]
289    fn cow_filesystem_test_override() -> Option<bool> {
290        None
291    }
292
293    fn cow_filesystem_has_atomic_page_writes(
294        path: &Path,
295        file: &File,
296    ) -> Option<CowFilesystemKind> {
297        if let Some(allowed) = Self::cow_filesystem_test_override() {
298            return allowed.then_some(CowFilesystemKind::TestOverride);
299        }
300        Self::probe_cow_filesystem(path, file)
301    }
302
303    #[cfg(target_os = "linux")]
304    fn probe_cow_filesystem(path: &Path, file: &File) -> Option<CowFilesystemKind> {
305        let fs_type = linux_fstatfs_type(file)?;
306        match fs_type {
307            ZFS_SUPER_MAGIC => Some(CowFilesystemKind::Zfs),
308            BTRFS_SUPER_MAGIC => {
309                let mount_options = linux_mount_options_for_path(path)?;
310                let inode_flags = linux_inode_flags(file)?;
311                classify_cow_filesystem(fs_type, Some(&mount_options), Some(inode_flags))
312            }
313            _ => None,
314        }
315    }
316
317    #[cfg(not(target_os = "linux"))]
318    fn probe_cow_filesystem(_path: &Path, _file: &File) -> Option<CowFilesystemKind> {
319        None
320    }
321
322    /// Inspect page 0 for the `RDBE` encryption marker, then resolve
323    /// the (key, marker) matrix:
324    ///
325    /// | Marker | Key supplied | Result                              |
326    /// |--------|--------------|-------------------------------------|
327    /// | yes    | yes          | Bind encryptor; validate key        |
328    /// | yes    | no           | `EncryptionRequired` (fail closed)  |
329    /// | no     | yes          | `PlainDatabaseRefusesKey`           |
330    /// | no     | no           | Plain pager — no binding needed     |
331    fn bind_encryption_for_existing(&mut self) -> Result<(), PagerError> {
332        if self.page_count().unwrap_or(0) == 0 {
333            return self.bind_encryption_for_new();
334        }
335        let header_page = self.read_page_no_checksum(0)?;
336        let data = header_page.as_bytes();
337        let has_marker = reddb_file::paged_encryption_marker_present(data);
338
339        let key = self.config.encryption.clone();
340        match (has_marker, key) {
341            (true, Some(key)) => {
342                let header_bytes =
343                    reddb_file::paged_encryption_header_bytes(data).ok_or_else(|| {
344                        PagerError::InvalidDatabase("encryption header parse failed".to_string())
345                    })?;
346                let header = crate::storage::encryption::EncryptionHeader::from_bytes(header_bytes)
347                    .map_err(|e| {
348                        PagerError::InvalidDatabase(format!("encryption header parse failed: {e}"))
349                    })?;
350                if !header.validate(&key) {
351                    return Err(PagerError::InvalidKey);
352                }
353                let encryptor = crate::storage::encryption::PageEncryptor::new(key);
354                self.encryption = Some((encryptor, header));
355                Ok(())
356            }
357            (true, None) => Err(PagerError::EncryptionRequired),
358            (false, Some(_)) => Err(PagerError::PlainDatabaseRefusesKey),
359            (false, None) => Ok(()),
360        }
361    }
362
363    /// New DB: if a key is configured, write the marker + header to
364    /// page 0 so subsequent opens detect encryption.
365    fn bind_encryption_for_new(&mut self) -> Result<(), PagerError> {
366        let Some(key) = self.config.encryption.clone() else {
367            return Ok(());
368        };
369        let header = crate::storage::encryption::EncryptionHeader::new(&key);
370        let encryptor = crate::storage::encryption::PageEncryptor::new(key);
371
372        // Stamp the marker + header into page 0 if it's been
373        // initialised by `initialize()` already.
374        if self.page_count().unwrap_or(0) > 0 {
375            let mut page = self.read_page_no_checksum(0)?;
376            let data = page.as_bytes_mut();
377            let header_bytes = header.to_bytes();
378            reddb_file::write_paged_encryption_marker_and_header(data, &header_bytes)
379                .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
380            self.write_page_no_checksum(0, page)?;
381        }
382        self.encryption = Some((encryptor, header));
383        Ok(())
384    }
385
386    /// Open with default configuration
387    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, PagerError> {
388        Self::open(path, PagerConfig::default())
389    }
390
391    /// Initialize a new database
392    fn initialize(&self) -> Result<(), PagerError> {
393        if self.config.read_only {
394            return Err(PagerError::ReadOnly);
395        }
396
397        // Create header page. Page ids 1 and 2 are reserved so fixed
398        // metadata/vault pages cannot be handed out to normal B-tree
399        // allocation before those subsystems are initialized.
400        let initial_page_count = 3;
401        let header_page = Page::new_header_page(initial_page_count);
402        self.header_write()?.page_count = initial_page_count;
403
404        // Write header and reserved pages so any scan over 0..page_count
405        // can read every allocated page in a brand-new database.
406        self.write_page_raw(0, &header_page)?;
407        let mut metadata_page = Page::new(PageType::Header, 1);
408        metadata_page.update_checksum();
409        self.write_page_raw(1, &metadata_page)?;
410        let mut vault_page = Page::new(PageType::Vault, 2);
411        vault_page.update_checksum();
412        self.write_page_raw(2, &vault_page)?;
413
414        // Sync to disk
415        self.sync()?;
416
417        Ok(())
418    }
419
420    /// Acquire a write lock on the header RwLock, mapping poison errors.
421    fn header_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, DatabaseHeader>, PagerError> {
422        self.header.write().map_err(|_| PagerError::LockPoisoned)
423    }
424
425    /// Acquire a read lock on the header RwLock, mapping poison errors.
426    fn header_read(&self) -> Result<std::sync::RwLockReadGuard<'_, DatabaseHeader>, PagerError> {
427        self.header.read().map_err(|_| PagerError::LockPoisoned)
428    }
429
430    /// Acquire a write lock on the freelist RwLock, mapping poison errors.
431    fn freelist_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, FreeList>, PagerError> {
432        self.freelist.write().map_err(|_| PagerError::LockPoisoned)
433    }
434
435    /// Acquire a lock on the file Mutex, mapping poison errors.
436    fn file_lock(&self) -> Result<std::sync::MutexGuard<'_, File>, PagerError> {
437        self.file.lock().map_err(|_| PagerError::LockPoisoned)
438    }
439
440    /// Acquire a lock on the header_dirty Mutex, mapping poison errors.
441    fn header_dirty_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>, PagerError> {
442        self.header_dirty
443            .lock()
444            .map_err(|_| PagerError::LockPoisoned)
445    }
446
447    /// Load database header from page 0 (with shadow fallback)
448    fn load_header(&self) -> Result<(), PagerError> {
449        // Read page 0 — fall back to shadow if corrupted
450        let header_page = match self.read_page_raw(0) {
451            Ok(page) => {
452                // Verify magic bytes
453                if reddb_file::database_header_magic_matches(page.as_bytes()) {
454                    page
455                } else {
456                    // Page 0 corrupted — try shadow
457                    self.recover_header_from_shadow()?
458                }
459            }
460            Err(_) => self.recover_header_from_shadow()?,
461        };
462
463        let decoded_header = reddb_file::decode_database_header(header_page.as_bytes())
464            .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
465        let freelist_head = decoded_header.freelist_head;
466
467        {
468            let mut header = self.header_write()?;
469            *header = decoded_header;
470        }
471
472        // Initialize freelist
473        {
474            let mut freelist = self.freelist_write()?;
475            *freelist = FreeList::from_header(freelist_head, 0);
476        }
477
478        Ok(())
479    }
480
481    /// Write header page
482    ///
483    /// Note: This merges database header fields into the existing page 0 content
484    /// to preserve any additional data (like encryption headers) that may be stored there.
485    fn write_header(&self) -> Result<(), PagerError> {
486        if self.config.read_only {
487            return Err(PagerError::ReadOnly);
488        }
489
490        let header = self.header_read()?;
491
492        // Read existing page 0 to preserve any additional data (e.g., encryption header)
493        // First check cache, then fall back to disk
494        let mut page = if let Some(cached) = self.cache.get(0) {
495            cached
496        } else {
497            // Try to read from disk if file is large enough
498            let file = self.file_lock()?;
499            let len = file.metadata().map(|m| m.len()).unwrap_or(0);
500            drop(file);
501
502            if len >= PAGE_SIZE as u64 {
503                self.read_page_raw(0)?
504            } else {
505                // File is new/empty, create fresh header page
506                Page::new(PageType::Header, 0)
507            }
508        };
509
510        let data = page.as_bytes_mut();
511
512        reddb_file::encode_database_header(data, &header)
513            .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
514
515        page.update_checksum();
516
517        // Write header shadow FIRST (so it's intact if main write is interrupted)
518        self.write_header_shadow(&page)?;
519
520        self.write_page_raw(0, &page)?;
521        *self.header_dirty_lock()? = false;
522
523        Ok(())
524    }
525
526    /// Read a page from disk (bypassing cache)
527    fn read_page_raw(&self, page_id: u32) -> Result<Page, PagerError> {
528        let mut file = self.file_lock()?;
529        let offset = (page_id as u64) * (PAGE_SIZE as u64);
530
531        file.seek(SeekFrom::Start(offset))?;
532
533        let mut buf = [0u8; PAGE_SIZE];
534        file.read_exact(&mut buf)?;
535
536        let page = Page::from_bytes(buf);
537
538        // Verify checksum if configured (including page 0)
539        if self.config.verify_checksums {
540            page.verify_checksum()?;
541        }
542
543        Ok(page)
544    }
545
546    /// Write a page to disk (bypassing cache)
547    fn write_page_raw(&self, page_id: u32, page: &Page) -> Result<(), PagerError> {
548        if self.config.read_only {
549            return Err(PagerError::ReadOnly);
550        }
551
552        let mut file = self.file_lock()?;
553        let offset = (page_id as u64) * (PAGE_SIZE as u64);
554
555        file.seek(SeekFrom::Start(offset))?;
556        file.write_all(page.as_bytes())?;
557
558        Ok(())
559    }
560
561    /// Read a page (cache-aware)
562    pub fn read_page(&self, page_id: u32) -> Result<Page, PagerError> {
563        // Check cache first
564        if let Some(page) = self.cache.get(page_id) {
565            return Ok(page);
566        }
567
568        // Cache miss - read from disk
569        let page = self.read_page_raw(page_id)?;
570
571        // Add to cache
572        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
573            // Evicted page was dirty, need to write it back
574            let evicted_id = dirty_page.page_id();
575            self.write_page_raw(evicted_id, &dirty_page)?;
576        }
577
578        Ok(page)
579    }
580
581    /// Read a page without verifying checksum (for encrypted pages)
582    ///
583    /// Use this when the page content has its own integrity protection
584    /// (e.g., AES-GCM authentication tag for encrypted pages).
585    pub fn read_page_no_checksum(&self, page_id: u32) -> Result<Page, PagerError> {
586        // Check cache first
587        if let Some(page) = self.cache.get(page_id) {
588            return Ok(page);
589        }
590
591        // Cache miss - read from disk (skip checksum verification)
592        let mut file = self.file_lock()?;
593        let offset = (page_id as u64) * (PAGE_SIZE as u64);
594
595        file.seek(SeekFrom::Start(offset))?;
596
597        let mut buf = [0u8; PAGE_SIZE];
598        file.read_exact(&mut buf)?;
599        drop(file);
600
601        let page = Page::from_bytes(buf);
602
603        // Add to cache (no checksum verification)
604        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
605            // Evicted page was dirty, need to write it back
606            let evicted_id = dirty_page.page_id();
607            self.write_page_raw(evicted_id, &dirty_page)?;
608        }
609
610        Ok(page)
611    }
612
613    /// Write a page (cache-aware)
614    pub fn write_page(&self, page_id: u32, mut page: Page) -> Result<(), PagerError> {
615        if self.config.read_only {
616            return Err(PagerError::ReadOnly);
617        }
618
619        // Update checksum
620        page.update_checksum();
621
622        // Add to cache and mark dirty. If the cache had to evict a
623        // dirty entry, write it through immediately — the evicted
624        // page will never be flushed otherwise (same bug fixed in
625        // `allocate_page`).
626        if let Some(dirty_page) = self.cache.insert(page_id, page) {
627            let evicted_id = dirty_page.page_id();
628            self.write_page_raw(evicted_id, &dirty_page)?;
629        }
630        self.cache.mark_dirty(page_id);
631
632        Ok(())
633    }
634
635    /// Read a page through the configured encryptor if any. Page 0
636    /// is always returned plaintext (it carries the encryption marker
637    /// + header). Callers that want raw cipher bytes can use
638    ///   `read_page_no_checksum` directly.
639    pub fn read_page_decrypted(&self, page_id: u32) -> Result<Page, PagerError> {
640        if page_id == 0 || self.encryption.is_none() {
641            return self.read_page(page_id);
642        }
643        let raw = self.read_page_no_checksum(page_id)?;
644        let (enc, _) = self
645            .encryption
646            .as_ref()
647            .expect("encryption presence checked above");
648        let plaintext = enc
649            .decrypt(page_id, raw.as_bytes())
650            .map_err(|e| PagerError::InvalidDatabase(format!("decrypt page {page_id}: {e}")))?;
651        let mut buf = [0u8; PAGE_SIZE];
652        let n = plaintext.len().min(PAGE_SIZE);
653        buf[..n].copy_from_slice(&plaintext[..n]);
654        Ok(Page::from_bytes(buf))
655    }
656
657    /// Write a page through the configured encryptor if any. Page 0
658    /// bypasses encryption and goes through the normal checksummed
659    /// path. Encrypted pages skip the checksum update because
660    /// AES-GCM's authentication tag is the integrity guarantee.
661    pub fn write_page_encrypted(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
662        if page_id == 0 || self.encryption.is_none() {
663            return self.write_page(page_id, page);
664        }
665        // Canonical per-page envelope overhead (nonce + GCM tag),
666        // owned by reddb-io-crypto (#1053, ADR 0054).
667        const OVERHEAD: usize = crate::storage::encryption::page_encryptor::OVERHEAD;
668        let plaintext_len = PAGE_SIZE - OVERHEAD;
669        let plaintext = &page.as_bytes()[..plaintext_len];
670        let (enc, _) = self
671            .encryption
672            .as_ref()
673            .expect("encryption presence checked above");
674        let ciphertext = enc.encrypt(page_id, plaintext);
675        debug_assert_eq!(ciphertext.len(), PAGE_SIZE);
676        let mut buf = [0u8; PAGE_SIZE];
677        buf.copy_from_slice(&ciphertext);
678        let cipher_page = Page::from_bytes(buf);
679        self.write_page_no_checksum(page_id, cipher_page)
680    }
681
682    /// Write a page without updating checksum (for encrypted pages)
683    ///
684    /// Use this when the page content has its own integrity protection
685    /// (e.g., AES-GCM authentication tag for encrypted pages).
686    pub fn write_page_no_checksum(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
687        if self.config.read_only {
688            return Err(PagerError::ReadOnly);
689        }
690
691        // Add to cache and mark dirty (no checksum update). Same
692        // eviction-write-through guard as `write_page`.
693        if let Some(dirty_page) = self.cache.insert(page_id, page) {
694            let evicted_id = dirty_page.page_id();
695            self.write_page_raw(evicted_id, &dirty_page)?;
696        }
697        self.cache.mark_dirty(page_id);
698
699        Ok(())
700    }
701
702    /// Allocate a new page
703    pub fn allocate_page(&self, page_type: PageType) -> Result<Page, PagerError> {
704        if self.config.read_only {
705            return Err(PagerError::ReadOnly);
706        }
707
708        // Try to get from freelist first
709        let page_id = {
710            let mut freelist = self.freelist_write()?;
711            if let Some(id) = freelist.allocate() {
712                id
713            } else if freelist.trunk_head() != 0 {
714                let trunk_id = freelist.trunk_head();
715                drop(freelist);
716
717                let trunk = self.read_page(trunk_id).map_err(|e| match e {
718                    PagerError::PageNotFound(_) => {
719                        PagerError::InvalidDatabase("Freelist trunk missing".to_string())
720                    }
721                    other => other,
722                })?;
723
724                let mut freelist = self.freelist_write()?;
725                freelist
726                    .load_from_trunk(&trunk)
727                    .map_err(|e| PagerError::InvalidDatabase(format!("Freelist: {}", e)))?;
728                let id = freelist.allocate().ok_or_else(|| {
729                    PagerError::InvalidDatabase("Freelist empty after trunk load".to_string())
730                })?;
731
732                let mut header = self.header_write()?;
733                header.freelist_head = freelist.trunk_head();
734                *self.header_dirty_lock()? = true;
735
736                id
737            } else {
738                // No free pages, extend file
739                let mut header = self.header_write()?;
740                let id = header.page_count;
741                header.page_count += 1;
742                *self.header_dirty_lock()? = true;
743                id
744            }
745        };
746
747        let page = Page::new(page_type, page_id);
748
749        // Write to cache. The evicted page (if any) is dirty by
750        // definition — `cache.insert` only returns `Some` when it
751        // had to evict a dirty entry to make room. The previous
752        // version dropped that return value, which silently lost
753        // writes whenever a freshly-allocated page caused an LRU
754        // eviction. This shows up under heavy ingest as
755        // "B-tree insert error: Pager error: I/O error: failed to
756        // fill whole buffer" later, when something tries to read
757        // back the never-flushed page. Mirror `read_page`'s
758        // handling: write the evicted page through immediately so
759        // the on-disk image stays consistent.
760        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
761            let evicted_id = dirty_page.page_id();
762            self.write_page_raw(evicted_id, &dirty_page)?;
763        }
764        self.cache.mark_dirty(page_id);
765
766        Ok(page)
767    }
768
769    /// Reserve a contiguous extent of vector pages.
770    pub fn reserve_contig_extent(&self, n_pages: u32) -> Result<super::ExtentId, PagerError> {
771        if self.config.read_only {
772            return Err(PagerError::ReadOnly);
773        }
774        if n_pages == 0 {
775            return Err(PagerError::InvalidDatabase(
776                "contiguous extent must reserve at least one page".to_string(),
777            ));
778        }
779
780        let start_page = {
781            let mut header = self.header_write()?;
782            let start = header.page_count;
783            header.page_count = header.page_count.checked_add(n_pages).ok_or_else(|| {
784                PagerError::InvalidDatabase("contiguous extent page count overflow".to_string())
785            })?;
786            *self.header_dirty_lock()? = true;
787            start
788        };
789
790        for page_id in start_page..start_page + n_pages {
791            let mut page = Page::new(PageType::Vector, page_id);
792            page.update_checksum();
793            if let Some(dirty_page) = self.cache.insert(page_id, page) {
794                let evicted_id = dirty_page.page_id();
795                self.write_page_raw(evicted_id, &dirty_page)?;
796            }
797            self.cache.mark_dirty(page_id);
798        }
799
800        Ok(super::ExtentId {
801            start_page,
802            n_pages,
803        })
804    }
805
806    /// Free a page (return to freelist)
807    pub fn free_page(&self, page_id: u32) -> Result<(), PagerError> {
808        if self.config.read_only {
809            return Err(PagerError::ReadOnly);
810        }
811
812        // Remove from cache
813        self.cache.remove(page_id);
814
815        // Add to freelist
816        let mut freelist = self.freelist_write()?;
817        freelist.free(page_id);
818
819        *self.header_dirty_lock()? = true;
820
821        Ok(())
822    }
823
824    /// Get database header
825    pub fn header(&self) -> Result<DatabaseHeader, PagerError> {
826        Ok(self.header_read()?.clone())
827    }
828
829    pub fn physical_header(&self) -> Result<PhysicalFileHeader, PagerError> {
830        Ok(self.header_read()?.physical)
831    }
832
833    pub fn update_physical_header(&self, physical: PhysicalFileHeader) -> Result<(), PagerError> {
834        if self.config.read_only {
835            return Err(PagerError::ReadOnly);
836        }
837
838        let mut header = self.header_write()?;
839        header.physical = physical;
840        *self.header_dirty_lock()? = true;
841        Ok(())
842    }
843
844    /// Get page count
845    pub fn page_count(&self) -> Result<u32, PagerError> {
846        Ok(self.header_read()?.page_count)
847    }
848
849    /// Attach a WAL writer to enforce WAL-first flush ordering.
850    ///
851    /// After this call, [`Pager::flush`] computes the maximum
852    /// `header.lsn` over all dirty pages and calls
853    /// `WalWriter::flush_until(max_lsn)` before any page is written
854    /// to the data file. This is the postgres rule: a page on disk
855    /// implies its WAL record is already durable on disk.
856    ///
857    /// Existing call sites that construct a Pager without a WAL
858    /// keep their previous behaviour (no LSN check) — wiring is
859    /// strictly opt-in.
860    pub fn set_wal_writer(&self, wal: Arc<Mutex<crate::storage::wal::writer::WalWriter>>) {
861        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
862        *slot = Some(wal);
863    }
864
865    /// Detach the WAL writer (test / shutdown path).
866    pub fn clear_wal_writer(&self) {
867        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
868        *slot = None;
869    }
870
871    /// Has a WAL writer been attached?
872    pub fn has_wal_writer(&self) -> bool {
873        self.wal.read().map(|s| s.is_some()).unwrap_or(false)
874    }
875
876    /// Flush all dirty pages to disk
877    pub fn flush(&self) -> Result<(), PagerError> {
878        if self.config.read_only {
879            return Ok(());
880        }
881
882        // Persist freelist to trunk pages when dirty
883        let trunks = {
884            let mut freelist = self.freelist_write()?;
885            if freelist.is_dirty() {
886                let mut header = self.header_write()?;
887                let trunks = freelist.flush_to_trunks(0, || {
888                    let id = header.page_count;
889                    header.page_count += 1;
890                    id
891                });
892                header.freelist_head = freelist.trunk_head();
893                *self.header_dirty_lock()? = true;
894                freelist.mark_clean();
895                trunks
896            } else {
897                Vec::new()
898            }
899        };
900
901        for trunk in trunks {
902            let page_id = trunk.page_id();
903            self.cache.insert(page_id, trunk);
904            self.cache.mark_dirty(page_id);
905        }
906
907        // Flush dirty pages from cache (through DWB if enabled)
908        let dirty_pages = self.cache.flush_dirty();
909        if !dirty_pages.is_empty() {
910            // WAL-FIRST: ensure every WAL record describing a dirty
911            // page is durable BEFORE the page itself reaches disk.
912            // Pages with `lsn == 0` are exempt (freelist trunks, header
913            // shadow pages, anything not produced by a WAL append).
914            let max_lsn = dirty_pages
915                .iter()
916                .filter_map(|(_, page)| page.header().ok().map(|h| h.lsn))
917                .max()
918                .unwrap_or(0);
919            if max_lsn > 0 {
920                if let Ok(slot) = self.wal.read() {
921                    if let Some(wal) = slot.as_ref() {
922                        let wal = Arc::clone(wal);
923                        // Drop the read lock before taking the WAL
924                        // mutex so an unrelated reader cannot block
925                        // the flush path.
926                        drop(slot);
927                        let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
928                        wal_guard.flush_until(max_lsn).map_err(PagerError::Io)?;
929                    }
930                }
931            }
932            self.write_pages_through_dwb(&dirty_pages)?;
933        }
934
935        // Write header if dirty
936        if *self.header_dirty_lock()? {
937            self.write_header()?;
938        }
939
940        Ok(())
941    }
942
943    /// Sync file to disk (fsync)
944    pub fn sync(&self) -> Result<(), PagerError> {
945        self.flush()?;
946
947        let file = self.file_lock()?;
948        file.sync_all()?;
949
950        Ok(())
951    }
952
    /// Return the page cache's statistics, exactly as reported by the
    /// cache's own `stats()` accessor.
    pub fn cache_stats(&self) -> crate::storage::engine::page_cache::CacheStats {
        self.cache.stats()
    }
957
    /// Return the number of dirty pages currently held in the page cache,
    /// as reported by the cache's `dirty_count()`.
    pub fn dirty_page_count(&self) -> usize {
        self.cache.dirty_count()
    }
962
963    /// Estimated fraction of the page cache holding dirty pages.
964    /// Returned in `[0, 1]`. Used by the background writer to
965    /// decide when to kick in aggressive flushing.
966    pub fn dirty_fraction(&self) -> f64 {
967        let capacity = self.cache.capacity().max(1) as f64;
968        self.cache.dirty_count() as f64 / capacity
969    }
970
971    /// Flush up to `max` dirty pages from the cache. Returns the
972    /// number actually written. Background-writer entry point —
973    /// reuses the same persistence path as `flush()` but bounded.
974    pub fn flush_some_dirty(&self, max: usize) -> Result<usize, PagerError> {
975        if self.config.read_only || max == 0 {
976            return Ok(0);
977        }
978        let dirty_pages = self.cache.flush_some_dirty(max);
979        if dirty_pages.is_empty() {
980            return Ok(0);
981        }
982        let count = dirty_pages.len();
983        // WAL-first: every cached dirty page carries an LSN that the
984        // WAL must have already persisted. The full `flush()` path
985        // enforces this with `wal.flush(max_lsn)`; here we simply
986        // write through the pager — safe because callers only reach
987        // this path via the bgwriter, which runs asynchronously
988        // alongside normal commits that already respect WAL-first.
989        for (page_id, page) in dirty_pages {
990            self.write_page(page_id, page)?;
991        }
992        Ok(count)
993    }
994
    /// Return the database file path stored in this pager.
    pub fn path(&self) -> &Path {
        &self.path
    }
999
    /// Return the `read_only` flag from this pager's configuration.
    pub fn is_read_only(&self) -> bool {
        self.config.read_only
    }
1004
1005    /// Get file size in bytes
1006    pub fn file_size(&self) -> Result<u64, PagerError> {
1007        let file = self.file_lock()?;
1008        Ok(file.metadata()?.len())
1009    }
1010
1011    /// Issue an OS-level read-ahead hint for `page_id`.
1012    ///
1013    /// A6 prefetch wire: called from `BTreeCursor::next` when the
1014    /// cursor passes 50% of the current leaf, so the kernel fetches
1015    /// the next leaf page while CPU processes the remaining half of
1016    /// the current one. Failures are silent — a missed prefetch is a
1017    /// performance miss, never a correctness bug.
1018    pub fn prefetch_hint(&self, page_id: u32) {
1019        if let Ok(file) = self.file_lock() {
1020            let _ = crate::storage::btree::prefetch::prefetch_page(
1021                &file,
1022                page_id as u64,
1023                PAGE_SIZE as u32,
1024            );
1025        }
1026    }
1027
1028    // ── Corruption defense helpers ──────────────────────────────────
1029
    /// Path of the header shadow file for the database at `db_path`.
    ///
    /// Delegates to `reddb_file::layout::pager_header_shadow_path`, which
    /// owns the naming scheme.
    fn shadow_path(db_path: &Path) -> PathBuf {
        reddb_file::layout::pager_header_shadow_path(db_path)
    }
1034
    /// Path of the metadata shadow file for the database at `db_path`.
    ///
    /// Delegates to `reddb_file::layout::pager_meta_shadow_path`, which owns
    /// the naming scheme.
    fn meta_shadow_path(db_path: &Path) -> PathBuf {
        reddb_file::layout::pager_meta_shadow_path(db_path)
    }
1039
    /// Path of the double-write buffer file for the database at `db_path`.
    ///
    /// Delegates to `reddb_file::layout::pager_dwb_shadow_path`, which owns
    /// the naming scheme.
    fn dwb_path(db_path: &Path) -> PathBuf {
        reddb_file::layout::pager_dwb_shadow_path(db_path)
    }
1044
1045    /// Open the double-write buffer file without truncating existing content.
1046    ///
1047    /// The file is intentionally preserved across restarts so recovery can
1048    /// consume any crash-leftover pages before the next write cycle clears it.
1049    fn open_dwb_file(db_path: &Path) -> Result<File, PagerError> {
1050        Ok(OpenOptions::new()
1051            .read(true)
1052            .write(true)
1053            .create(true)
1054            .truncate(false)
1055            .open(Self::dwb_path(db_path))?)
1056    }
1057
1058    /// Clear the DWB in place while preserving the file path and handle.
1059    fn clear_dwb_file(file: &mut File) -> Result<(), PagerError> {
1060        file.set_len(0)?;
1061        file.seek(SeekFrom::Start(0))?;
1062        file.sync_all()?;
1063        Ok(())
1064    }
1065
1066    /// Write a shadow copy of the header page.
1067    fn write_header_shadow(&self, page: &Page) -> Result<(), PagerError> {
1068        if self.config.read_only {
1069            return Ok(());
1070        }
1071        let shadow = Self::shadow_path(&self.path);
1072        let mut f = File::create(&shadow)?;
1073        f.write_all(page.as_bytes())?;
1074        f.sync_all()?;
1075        Ok(())
1076    }
1077
1078    /// Recover header from shadow file when page 0 is corrupted
1079    fn recover_header_from_shadow(&self) -> Result<Page, PagerError> {
1080        let shadow = Self::shadow_path(&self.path);
1081        if !shadow.exists() {
1082            return Err(PagerError::InvalidDatabase(
1083                "Page 0 corrupted and no header shadow found".into(),
1084            ));
1085        }
1086        let mut f = File::open(&shadow)?;
1087        let mut buf = [0u8; PAGE_SIZE];
1088        f.read_exact(&mut buf)?;
1089        let page = Page::from_bytes(buf);
1090
1091        // Verify shadow is valid
1092        if !reddb_file::database_header_magic_matches(page.as_bytes()) {
1093            return Err(PagerError::InvalidDatabase(
1094                "Header shadow also corrupted".into(),
1095            ));
1096        }
1097
1098        // Restore page 0 from shadow
1099        if !self.config.read_only {
1100            self.write_page_raw(0, &page)?;
1101            let file = self.file_lock()?;
1102            file.sync_all()?;
1103        }
1104
1105        Ok(page)
1106    }
1107
1108    /// Write a shadow copy of the metadata page.
1109    ///
1110    /// When the process-global `fold_pager_meta` policy is enabled (see
1111    /// [`crate::physical::fold_pager_meta_enabled`]) the shadow is suppressed:
1112    /// metadata is sourced exclusively from page 1 (plus its overflow chain).
1113    /// Any pre-existing `<data>-meta` file is also removed so a flipped flag
1114    /// cannot leave a stale shadow on disk. Reads still tolerate the sidecar
1115    /// when present so databases written before the flag flipped remain
1116    /// loadable.
1117    pub fn write_meta_shadow(&self, page: &Page) -> Result<(), PagerError> {
1118        if self.config.read_only {
1119            return Ok(());
1120        }
1121        let shadow = Self::meta_shadow_path(&self.path);
1122        if crate::physical::fold_pager_meta_enabled() {
1123            // Best-effort cleanup of any prior shadow — a missing file is not
1124            // an error condition here.
1125            let _ = std::fs::remove_file(&shadow);
1126            return Ok(());
1127        }
1128        let mut f = File::create(&shadow)?;
1129        f.write_all(page.as_bytes())?;
1130        f.sync_all()?;
1131        Ok(())
1132    }
1133
1134    /// Recover metadata page from shadow file when page 1 is corrupted
1135    pub fn recover_meta_from_shadow(&self) -> Result<Page, PagerError> {
1136        let shadow = Self::meta_shadow_path(&self.path);
1137        if !shadow.exists() {
1138            return Err(PagerError::InvalidDatabase(
1139                "Page 1 corrupted and no metadata shadow found".into(),
1140            ));
1141        }
1142        let mut f = File::open(&shadow)?;
1143        let mut buf = [0u8; PAGE_SIZE];
1144        f.read_exact(&mut buf)?;
1145        let page = Page::from_bytes(buf);
1146
1147        // Restore page 1 from shadow
1148        if !self.config.read_only {
1149            self.write_page_raw(1, &page)?;
1150            let file = self.file_lock()?;
1151            file.sync_all()?;
1152        }
1153
1154        Ok(page)
1155    }
1156
1157    /// Write pages through the double-write buffer for torn page protection.
1158    ///
1159    /// 1. Write all pages to the DWB file with a header (magic + count + checksum)
1160    /// 2. fsync the DWB
1161    /// 3. Write all pages to their final locations in the .rdb file
1162    /// 4. Truncate the DWB (marks as consumed)
1163    fn write_pages_through_dwb(&self, pages: &[(u32, Page)]) -> Result<(), PagerError> {
1164        if let Some(dwb_mutex) = &self.dwb_file {
1165            let mut dwb = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1166
1167            let buf = reddb_file::encode_paged_dwb_frame(
1168                pages
1169                    .iter()
1170                    .map(|(page_id, page)| (*page_id, page.as_bytes())),
1171            );
1172
1173            // Write DWB and fsync
1174            dwb.seek(SeekFrom::Start(0))?;
1175            dwb.write_all(&buf)?;
1176            dwb.set_len(buf.len() as u64)?;
1177            dwb.sync_all()?;
1178
1179            // Now write pages to their final locations
1180            for (page_id, page) in pages {
1181                self.write_page_raw(*page_id, page)?;
1182            }
1183
1184            // Truncate DWB to mark as consumed
1185            Self::clear_dwb_file(&mut dwb)?;
1186
1187            Ok(())
1188        } else {
1189            // DWB disabled — write directly
1190            for (page_id, page) in pages {
1191                self.write_page_raw(*page_id, page)?;
1192            }
1193            Ok(())
1194        }
1195    }
1196
1197    /// Recover from double-write buffer after a crash.
1198    ///
1199    /// If the DWB file contains valid pages, they were written before the crash
1200    /// interrupted writing to the main file. Re-apply them.
1201    fn recover_from_dwb(&self) -> Result<(), PagerError> {
1202        let dwb_path = Self::dwb_path(&self.path);
1203        if !dwb_path.exists() {
1204            return Ok(());
1205        }
1206
1207        if let Some(dwb_mutex) = &self.dwb_file {
1208            let mut file = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1209            return self.recover_from_dwb_file(&mut file);
1210        }
1211
1212        let mut file = OpenOptions::new().read(true).write(true).open(&dwb_path)?;
1213        self.recover_from_dwb_file(&mut file)
1214    }
1215
1216    fn recover_from_dwb_file(&self, file: &mut File) -> Result<(), PagerError> {
1217        file.seek(SeekFrom::Start(0))?;
1218        let len = file.metadata()?.len();
1219        let mut buf = vec![0u8; len as usize];
1220        file.read_exact(&mut buf)?;
1221
1222        let entries = match reddb_file::decode_paged_dwb_frame(&buf) {
1223            Ok(entries) => entries,
1224            Err(_) => return Self::clear_dwb_file(file),
1225        };
1226
1227        // DWB is valid — re-apply pages to main file
1228        for entry in entries {
1229            let page = Page::from_bytes(entry.page);
1230            self.write_page_raw(entry.page_id, &page)?;
1231        }
1232
1233        // Sync and clean up
1234        {
1235            let file = self.file_lock()?;
1236            file.sync_all()?;
1237        }
1238
1239        Self::clear_dwb_file(file)
1240    }
1241
1242    /// Write header and sync to disk (public for checkpointer).
1243    pub fn write_header_and_sync(&self) -> Result<(), PagerError> {
1244        self.write_header()?;
1245        let file = self.file_lock()?;
1246        file.sync_all()?;
1247        Ok(())
1248    }
1249
1250    /// Set the checkpoint_in_progress flag in the header.
1251    pub fn set_checkpoint_in_progress(
1252        &self,
1253        in_progress: bool,
1254        target_lsn: u64,
1255    ) -> Result<(), PagerError> {
1256        let mut header = self.header_write()?;
1257        header.checkpoint_in_progress = in_progress;
1258        header.checkpoint_target_lsn = target_lsn;
1259        *self.header_dirty_lock()? = true;
1260        drop(header);
1261        self.write_header_and_sync()
1262    }
1263
1264    /// Update the checkpoint LSN and clear the in-progress flag.
1265    pub fn complete_checkpoint(&self, lsn: u64) -> Result<(), PagerError> {
1266        let mut header = self.header_write()?;
1267        header.checkpoint_lsn = lsn;
1268        header.checkpoint_in_progress = false;
1269        header.checkpoint_target_lsn = 0;
1270        *self.header_dirty_lock()? = true;
1271        drop(header);
1272        self.write_header_and_sync()
1273    }
1274}
1275
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a temp-dir database path that embeds `name`, the process id
    /// and a nanosecond timestamp so separate runs do not collide.
    fn temp_db_path(name: &str) -> PathBuf {
        let file_name = format!(
            "reddb-pager-{}-{}-{}.rdb",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        );
        std::env::temp_dir().join(file_name)
    }

    /// Opening a file whose header declares a version newer than
    /// `PAGE_FILE_VERSION` must fail with `InvalidDatabase`.
    #[test]
    fn open_refuses_future_database_version() {
        let path = temp_db_path("future-version");

        // Create a valid database, then close it again.
        drop(Pager::open_default(&path).unwrap());

        // Craft a header page that claims the next file-format version.
        let mut future_header = Page::new_header_page(1);
        reddb_file::set_database_header_version(
            future_header.as_bytes_mut(),
            reddb_file::PAGE_FILE_VERSION + 1,
        )
        .unwrap();
        future_header.update_checksum();

        // Overwrite page 0 on disk with the crafted header.
        {
            let mut file = OpenOptions::new().write(true).open(&path).unwrap();
            file.seek(SeekFrom::Start(0)).unwrap();
            file.write_all(future_header.as_bytes()).unwrap();
            file.sync_all().unwrap();
        }

        let msg = match Pager::open_default(&path) {
            Ok(_) => panic!("future database version should be rejected"),
            Err(PagerError::InvalidDatabase(msg)) => msg,
            Err(other) => panic!("expected InvalidDatabase, got {other:?}"),
        };
        assert!(msg.contains("newer than supported"));

        // Best-effort cleanup of the database and its sidecar files.
        let leftovers = [
            path.clone(),
            Pager::shadow_path(&path),
            Pager::dwb_path(&path),
        ];
        for leftover in leftovers.iter() {
            let _ = std::fs::remove_file(leftover);
        }
    }
}