Skip to main content

reddb_server/storage/engine/pager/
impl.rs

1use super::*;
2
/// Filesystem magic reported in `statfs.f_type` for btrfs. The Linux probe
/// compares the data file's `fstatfs` result against this value.
pub(super) const BTRFS_SUPER_MAGIC: i64 = 0x9123_683e;
/// Filesystem magic reported in `statfs.f_type` for ZFS.
pub(super) const ZFS_SUPER_MAGIC: i64 = 0x2fc1_2fc1;
/// "No copy-on-write" bit in the inode attribute flags returned by the
/// `FS_IOC_GETFLAGS` ioctl. When it is set on a btrfs file, that file is not
/// treated as copy-on-write.
pub(super) const FS_NOCOW_FL: u64 = 0x0080_0000;
6
/// Why the data file was judged to live on a copy-on-write filesystem whose
/// page writes are treated as atomic. This is the condition under which an
/// explicit `double_write = false` request is honoured.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CowFilesystemKind {
    /// `fstatfs` reported the ZFS magic.
    Zfs,
    /// btrfs, with neither the `nodatacow` mount option nor the inode's
    /// `FS_NOCOW_FL` attribute set.
    BtrfsDataCow,
    /// Forced by the test-only override hook rather than by a real probe.
    TestOverride,
}
13
14pub(super) fn classify_cow_filesystem(
15    fs_type: i64,
16    mount_options: Option<&str>,
17    inode_flags: Option<u64>,
18) -> Option<CowFilesystemKind> {
19    match fs_type {
20        ZFS_SUPER_MAGIC => Some(CowFilesystemKind::Zfs),
21        BTRFS_SUPER_MAGIC => {
22            let mount_options = mount_options?;
23            if mount_options.split(',').any(|option| option == "nodatacow") {
24                return None;
25            }
26
27            let inode_flags = inode_flags?;
28            if inode_flags & FS_NOCOW_FL != 0 {
29                return None;
30            }
31
32            Some(CowFilesystemKind::BtrfsDataCow)
33        }
34        _ => None,
35    }
36}
37
38#[cfg(target_os = "linux")]
39fn linux_fstatfs_type(file: &File) -> Option<i64> {
40    use std::mem::MaybeUninit;
41    use std::os::fd::AsRawFd;
42
43    let mut stat = MaybeUninit::<libc::statfs>::uninit();
44    let rc = unsafe { libc::fstatfs(file.as_raw_fd(), stat.as_mut_ptr()) };
45    if rc != 0 {
46        return None;
47    }
48    let stat = unsafe { stat.assume_init() };
49    // `statfs.f_type` is i32 on 32-bit glibc targets (armv7) and i64 on
50    // 64-bit ones — widen explicitly so both compile.
51    #[allow(clippy::unnecessary_cast)]
52    Some(stat.f_type as i64)
53}
54
55#[cfg(target_os = "linux")]
56fn linux_inode_flags(file: &File) -> Option<u64> {
57    use std::os::fd::AsRawFd;
58
59    let mut flags: libc::c_long = 0;
60    let rc = unsafe { libc::ioctl(file.as_raw_fd(), libc::FS_IOC_GETFLAGS, &mut flags) };
61    if rc != 0 {
62        return None;
63    }
64    Some(flags as u64)
65}
66
67#[cfg(target_os = "linux")]
68fn linux_mount_options_for_path(path: &Path) -> Option<String> {
69    let path = path.canonicalize().ok()?;
70    let mountinfo = std::fs::read_to_string("/proc/self/mountinfo").ok()?;
71    parse_mountinfo_options_for_path(&mountinfo, &path)
72}
73
74#[cfg(target_os = "linux")]
75pub(super) fn parse_mountinfo_options_for_path(mountinfo: &str, path: &Path) -> Option<String> {
76    let mut best: Option<(usize, String)> = None;
77
78    for line in mountinfo.lines() {
79        let fields: Vec<&str> = line.split(' ').collect();
80        if fields.len() < 10 {
81            continue;
82        }
83
84        let Some(separator) = fields.iter().position(|field| *field == "-") else {
85            continue;
86        };
87        if separator + 3 >= fields.len() || separator < 6 {
88            continue;
89        }
90
91        let mount_point = mountinfo_unescape_path(fields[4]);
92        if !path.starts_with(&mount_point) {
93            continue;
94        }
95
96        let fs_type = fields[separator + 1];
97        if fs_type != "btrfs" && fs_type != "zfs" {
98            continue;
99        }
100
101        let mount_options = fields[5];
102        let super_options = fields[separator + 3];
103        let options = format!("{mount_options},{super_options}");
104        let depth = mount_point.components().count();
105        if best
106            .as_ref()
107            .map(|(best_depth, _)| depth > *best_depth)
108            .unwrap_or(true)
109        {
110            best = Some((depth, options));
111        }
112    }
113
114    best.map(|(_, options)| options)
115}
116
/// Decode the `\ooo` octal escapes the kernel uses in `/proc/self/mountinfo`
/// path fields (for example, a space is written as `\040`).
///
/// A backslash is decoded only when it is followed by exactly three octal
/// digits whose value fits in a byte. Any other backslash is copied through
/// verbatim. Decoding works on raw bytes, which has three consequences:
/// - malformed input can never split a multi-byte UTF-8 character;
/// - a sign character is never accepted as part of an escape;
/// - decoded bytes are kept exactly as written instead of being lossily
///   re-encoded as UTF-8.
#[cfg(target_os = "linux")]
fn mountinfo_unescape_path(value: &str) -> PathBuf {
    use std::ffi::OsString;
    use std::os::unix::ffi::OsStringExt;

    /// Value of an `ooo` triplet, or `None` if it is not three octal digits
    /// or does not fit in a byte.
    fn decode_octal_triplet(digits: &[u8]) -> Option<u8> {
        let value = digits.iter().try_fold(0u16, |acc, &digit| match digit {
            b'0'..=b'7' => Some(acc * 8 + u16::from(digit - b'0')),
            _ => None,
        })?;
        u8::try_from(value).ok()
    }

    let bytes = value.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        if bytes[i] == b'\\' {
            // `get` yields `None` when fewer than three bytes follow, so a
            // truncated escape at the end is copied verbatim.
            if let Some(decoded) = bytes.get(i + 1..i + 4).and_then(decode_octal_triplet) {
                out.push(decoded);
                i += 4;
                continue;
            }
        }
        out.push(bytes[i]);
        i += 1;
    }

    PathBuf::from(OsString::from_vec(out))
}
138
139impl Pager {
140    /// Open or create a database file
141    pub fn open<P: AsRef<Path>>(path: P, mut config: PagerConfig) -> Result<Self, PagerError> {
142        let path = path.as_ref().to_path_buf();
143        let exists = path.exists();
144
145        if !exists && !config.create {
146            return Err(PagerError::InvalidDatabase(
147                "Database does not exist".into(),
148            ));
149        }
150
151        if !exists && config.read_only {
152            return Err(PagerError::InvalidDatabase(
153                "Cannot create read-only database".into(),
154            ));
155        }
156
157        // Open file
158        // Note: create requires write access, so disable it for read-only mode
159        let file = OpenOptions::new()
160            .read(true)
161            .write(!config.read_only)
162            .create(config.create && !config.read_only)
163            .open(&path)?;
164
165        // gh-892: diagnostic probe of the filesystem block size. If the
166        // compile-time 16 KiB PAGE_SIZE is not a multiple of the FS block
167        // size, page writes straddle FS blocks and incur read-modify-write
168        // amplification. Pure diagnostic — emitted once per open, never
169        // mutates the page size. `blksize()` is the fstat `st_blksize`.
170        #[cfg(unix)]
171        {
172            use std::os::unix::fs::MetadataExt;
173            if let Ok(meta) = file.metadata() {
174                let fs_block_size = meta.blksize();
175                if Self::page_size_misaligned_with_block(PAGE_SIZE, fs_block_size) {
176                    tracing::warn!(
177                        page_size = PAGE_SIZE,
178                        fs_block_size,
179                        path = %path.display(),
180                        "database page size is not a multiple of the filesystem \
181                         block size; page writes will straddle FS blocks \
182                         (read-modify-write amplification). Diagnostic only — \
183                         the page size is unchanged."
184                    );
185                }
186            }
187        }
188
189        // Acquire file lock (exclusive for writes, shared for read-only)
190        let lock_file = if !config.read_only {
191            let lf = OpenOptions::new().read(true).write(true).open(&path)?;
192            lf.try_lock_exclusive().map_err(|_| PagerError::Locked)?;
193            Some(lf)
194        } else {
195            let lf = OpenOptions::new().read(true).open(&path)?;
196            match lf.try_lock_shared() {
197                Ok(_) => Some(lf),
198                Err(_) => None,
199            }
200        };
201
202        // Open double-write buffer file.
203        //
204        // gh-478: when `fold_dwb_into_wal` is enabled the DWB sidecar is
205        // suppressed — torn pages are healed by replaying FullPageImage WAL
206        // records during recovery. Any pre-existing `-dwb` is removed so a
207        // flipped flag cannot leave a stale sidecar on disk.
208        //
209        // gh-895: an explicit `double_write = false` request is honored only
210        // when the already-open data file is proven to live on a filesystem
211        // with atomic CoW page writes. Unknown and non-CoW filesystems fail
212        // closed by keeping the DWB sidecar.
213        let fold_dwb = crate::physical::fold_dwb_into_wal_enabled();
214        if !config.double_write && !config.read_only && !fold_dwb {
215            let skip_dwb_on_cow =
216                Self::cow_filesystem_has_atomic_page_writes(&path, &file).is_some();
217            if !skip_dwb_on_cow {
218                tracing::warn!(
219                    path = %path.display(),
220                    "double_write=false requested, but the data file is not proven to be on \
221                     ZFS or btrfs datacow; keeping the double-write buffer enabled"
222                );
223                config.double_write = true;
224            }
225        }
226
227        let dwb_file = if config.double_write && !config.read_only && !fold_dwb {
228            let f = Self::open_dwb_file(&path)?;
229            Some(Mutex::new(f))
230        } else {
231            if fold_dwb && !config.read_only {
232                let _ = std::fs::remove_file(Self::dwb_path(&path));
233            }
234            None
235        };
236
237        let mut pager = Self {
238            path,
239            file: Mutex::new(file),
240            _lock_file: lock_file,
241            dwb_file,
242            cache: PageCache::new(config.cache_size),
243            freelist: RwLock::new(FreeList::new()),
244            header: RwLock::new(DatabaseHeader::default()),
245            config,
246            header_dirty: Mutex::new(false),
247            wal: RwLock::new(None),
248            encryption: None,
249        };
250
251        if exists {
252            // Recover from double-write buffer if needed
253            pager.recover_from_dwb()?;
254            if !pager.config.double_write && !pager.config.read_only {
255                let _ = std::fs::remove_file(Self::dwb_path(&pager.path));
256            }
257            // Load existing database (with header shadow fallback)
258            pager.load_header()?;
259            pager.bind_encryption_for_existing()?;
260        } else {
261            // Initialize new database
262            pager.initialize()?;
263            pager.bind_encryption_for_new()?;
264        }
265
266        Ok(pager)
267    }
268
269    /// gh-892 diagnostic predicate: returns `true` when the database
270    /// `page_size` is **not** an integer multiple of the filesystem's
271    /// reported block size (`st_blksize`), i.e. `page_size % fs_block_size
272    /// != 0`. A `fs_block_size` of `0` (probe unavailable / unknown) is
273    /// treated as aligned so a missing probe never produces a warning.
274    /// Pure function with no I/O so the warn decision is unit-testable.
275    pub(crate) fn page_size_misaligned_with_block(page_size: usize, fs_block_size: u64) -> bool {
276        fs_block_size != 0 && !(page_size as u64).is_multiple_of(fs_block_size)
277    }
278
279    #[cfg(test)]
280    fn cow_filesystem_test_override() -> Option<bool> {
281        match COW_ATOMIC_WRITE_TEST_OVERRIDE.load(Ordering::Relaxed) {
282            1 => Some(true),
283            2 => Some(false),
284            _ => None,
285        }
286    }
287
288    #[cfg(not(test))]
289    fn cow_filesystem_test_override() -> Option<bool> {
290        None
291    }
292
293    fn cow_filesystem_has_atomic_page_writes(
294        path: &Path,
295        file: &File,
296    ) -> Option<CowFilesystemKind> {
297        if let Some(allowed) = Self::cow_filesystem_test_override() {
298            return allowed.then_some(CowFilesystemKind::TestOverride);
299        }
300        Self::probe_cow_filesystem(path, file)
301    }
302
303    #[cfg(target_os = "linux")]
304    fn probe_cow_filesystem(path: &Path, file: &File) -> Option<CowFilesystemKind> {
305        let fs_type = linux_fstatfs_type(file)?;
306        match fs_type {
307            ZFS_SUPER_MAGIC => Some(CowFilesystemKind::Zfs),
308            BTRFS_SUPER_MAGIC => {
309                let mount_options = linux_mount_options_for_path(path)?;
310                let inode_flags = linux_inode_flags(file)?;
311                classify_cow_filesystem(fs_type, Some(&mount_options), Some(inode_flags))
312            }
313            _ => None,
314        }
315    }
316
317    #[cfg(not(target_os = "linux"))]
318    fn probe_cow_filesystem(_path: &Path, _file: &File) -> Option<CowFilesystemKind> {
319        None
320    }
321
322    /// Inspect page 0 for the `RDBE` encryption marker, then resolve
323    /// the (key, marker) matrix:
324    ///
325    /// | Marker | Key supplied | Result                              |
326    /// |--------|--------------|-------------------------------------|
327    /// | yes    | yes          | Bind encryptor; validate key        |
328    /// | yes    | no           | `EncryptionRequired` (fail closed)  |
329    /// | no     | yes          | `PlainDatabaseRefusesKey`           |
330    /// | no     | no           | Plain pager — no binding needed     |
331    fn bind_encryption_for_existing(&mut self) -> Result<(), PagerError> {
332        if self.page_count().unwrap_or(0) == 0 {
333            return self.bind_encryption_for_new();
334        }
335        let header_page = self.read_page_no_checksum(0)?;
336        let data = header_page.as_bytes();
337        let has_marker = reddb_file::paged_encryption_marker_present(data);
338
339        let key = self.config.encryption.clone();
340        match (has_marker, key) {
341            (true, Some(key)) => {
342                let header_bytes =
343                    reddb_file::paged_encryption_header_bytes(data).ok_or_else(|| {
344                        PagerError::InvalidDatabase("encryption header parse failed".to_string())
345                    })?;
346                let header = crate::storage::encryption::EncryptionHeader::from_bytes(header_bytes)
347                    .map_err(|e| {
348                        PagerError::InvalidDatabase(format!("encryption header parse failed: {e}"))
349                    })?;
350                if !header.validate(&key) {
351                    return Err(PagerError::InvalidKey);
352                }
353                let encryptor = crate::storage::encryption::PageEncryptor::new(key);
354                self.encryption = Some((encryptor, header));
355                Ok(())
356            }
357            (true, None) => Err(PagerError::EncryptionRequired),
358            (false, Some(_)) => Err(PagerError::PlainDatabaseRefusesKey),
359            (false, None) => Ok(()),
360        }
361    }
362
363    /// New DB: if a key is configured, write the marker + header to
364    /// page 0 so subsequent opens detect encryption.
365    fn bind_encryption_for_new(&mut self) -> Result<(), PagerError> {
366        let Some(key) = self.config.encryption.clone() else {
367            return Ok(());
368        };
369        let header = crate::storage::encryption::EncryptionHeader::new(&key);
370        let encryptor = crate::storage::encryption::PageEncryptor::new(key);
371
372        // Stamp the marker + header into page 0 if it's been
373        // initialised by `initialize()` already.
374        if self.page_count().unwrap_or(0) > 0 {
375            let mut page = self.read_page_no_checksum(0)?;
376            let data = page.as_bytes_mut();
377            let header_bytes = header.to_bytes();
378            reddb_file::write_paged_encryption_marker_and_header(data, &header_bytes)
379                .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
380            self.write_page_no_checksum(0, page)?;
381        }
382        self.encryption = Some((encryptor, header));
383        Ok(())
384    }
385
386    /// Open with default configuration
387    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, PagerError> {
388        Self::open(path, PagerConfig::default())
389    }
390
391    /// Initialize a new database
392    fn initialize(&self) -> Result<(), PagerError> {
393        if self.config.read_only {
394            return Err(PagerError::ReadOnly);
395        }
396
397        // Create header page. Page ids 1 and 2 are reserved so fixed
398        // metadata/vault pages cannot be handed out to normal B-tree
399        // allocation before those subsystems are initialized.
400        let initial_page_count = 3;
401        let header_page = Page::new_header_page(initial_page_count);
402        self.header_write()?.page_count = initial_page_count;
403
404        // Write header and reserved pages so any scan over 0..page_count
405        // can read every allocated page in a brand-new database.
406        self.write_page_raw(0, &header_page)?;
407        let mut metadata_page = Page::new(PageType::Header, 1);
408        metadata_page.update_checksum();
409        self.write_page_raw(1, &metadata_page)?;
410        let mut vault_page = Page::new(PageType::Vault, 2);
411        vault_page.update_checksum();
412        self.write_page_raw(2, &vault_page)?;
413
414        // Sync to disk
415        self.sync()?;
416
417        Ok(())
418    }
419
420    /// Acquire a write lock on the header RwLock, mapping poison errors.
421    fn header_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, DatabaseHeader>, PagerError> {
422        self.header.write().map_err(|_| PagerError::LockPoisoned)
423    }
424
425    /// Acquire a read lock on the header RwLock, mapping poison errors.
426    fn header_read(&self) -> Result<std::sync::RwLockReadGuard<'_, DatabaseHeader>, PagerError> {
427        self.header.read().map_err(|_| PagerError::LockPoisoned)
428    }
429
430    /// Acquire a write lock on the freelist RwLock, mapping poison errors.
431    fn freelist_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, FreeList>, PagerError> {
432        self.freelist.write().map_err(|_| PagerError::LockPoisoned)
433    }
434
435    /// Acquire a lock on the file Mutex, mapping poison errors.
436    fn file_lock(&self) -> Result<std::sync::MutexGuard<'_, File>, PagerError> {
437        self.file.lock().map_err(|_| PagerError::LockPoisoned)
438    }
439
440    /// Acquire a lock on the header_dirty Mutex, mapping poison errors.
441    fn header_dirty_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>, PagerError> {
442        self.header_dirty
443            .lock()
444            .map_err(|_| PagerError::LockPoisoned)
445    }
446
447    /// Load database header from page 0 (with shadow fallback)
448    fn load_header(&self) -> Result<(), PagerError> {
449        // Read page 0 — fall back to shadow if corrupted
450        let header_page = match self.read_page_raw(0) {
451            Ok(page) => {
452                // Verify magic bytes
453                if reddb_file::database_header_magic_matches(page.as_bytes()) {
454                    page
455                } else {
456                    // Page 0 corrupted — try shadow
457                    self.recover_header_from_shadow()?
458                }
459            }
460            Err(_) => self.recover_header_from_shadow()?,
461        };
462
463        let decoded_header = reddb_file::decode_database_header(header_page.as_bytes())
464            .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
465        let freelist_head = decoded_header.freelist_head;
466
467        {
468            let mut header = self.header_write()?;
469            *header = decoded_header;
470        }
471
472        // Initialize freelist
473        {
474            let mut freelist = self.freelist_write()?;
475            *freelist = FreeList::from_header(freelist_head, 0);
476        }
477
478        Ok(())
479    }
480
481    /// Write header page
482    ///
483    /// Note: This merges database header fields into the existing page 0 content
484    /// to preserve any additional data (like encryption headers) that may be stored there.
485    fn write_header(&self) -> Result<(), PagerError> {
486        if self.config.read_only {
487            return Err(PagerError::ReadOnly);
488        }
489
490        let header = self.header_read()?;
491
492        // Read existing page 0 to preserve any additional data (e.g., encryption header)
493        // First check cache, then fall back to disk
494        let mut page = if let Some(cached) = self.cache.get(0) {
495            cached
496        } else {
497            // Try to read from disk if file is large enough
498            let file = self.file_lock()?;
499            let len = file.metadata().map(|m| m.len()).unwrap_or(0);
500            drop(file);
501
502            if len >= PAGE_SIZE as u64 {
503                self.read_page_raw(0)?
504            } else {
505                // File is new/empty, create fresh header page
506                Page::new(PageType::Header, 0)
507            }
508        };
509
510        let data = page.as_bytes_mut();
511
512        reddb_file::encode_database_header(data, &header)
513            .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
514
515        page.update_checksum();
516
517        // Write header shadow FIRST (so it's intact if main write is interrupted)
518        self.write_header_shadow(&page)?;
519
520        self.write_page_raw(0, &page)?;
521        *self.header_dirty_lock()? = false;
522
523        Ok(())
524    }
525
526    /// Read a page from disk (bypassing cache)
527    fn read_page_raw(&self, page_id: u32) -> Result<Page, PagerError> {
528        let mut file = self.file_lock()?;
529        let offset = (page_id as u64) * (PAGE_SIZE as u64);
530
531        file.seek(SeekFrom::Start(offset))?;
532
533        let mut buf = [0u8; PAGE_SIZE];
534        file.read_exact(&mut buf)?;
535
536        let page = Page::from_bytes(buf);
537
538        // Verify checksum if configured (including page 0)
539        if self.config.verify_checksums {
540            page.verify_checksum()?;
541        }
542
543        Ok(page)
544    }
545
546    /// Write a page to disk (bypassing cache)
547    fn write_page_raw(&self, page_id: u32, page: &Page) -> Result<(), PagerError> {
548        if self.config.read_only {
549            return Err(PagerError::ReadOnly);
550        }
551
552        let mut file = self.file_lock()?;
553        let offset = (page_id as u64) * (PAGE_SIZE as u64);
554
555        file.seek(SeekFrom::Start(offset))?;
556        file.write_all(page.as_bytes())?;
557
558        Ok(())
559    }
560
561    /// Read a page (cache-aware)
562    pub fn read_page(&self, page_id: u32) -> Result<Page, PagerError> {
563        // Check cache first
564        if let Some(page) = self.cache.get(page_id) {
565            return Ok(page);
566        }
567
568        // Cache miss - read from disk
569        let page = self.read_page_raw(page_id)?;
570
571        // Add to cache
572        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
573            // Evicted page was dirty, need to write it back
574            let evicted_id = dirty_page.page_id();
575            self.write_page_raw(evicted_id, &dirty_page)?;
576        }
577
578        Ok(page)
579    }
580
581    /// Read a page without verifying checksum (for encrypted pages)
582    ///
583    /// Use this when the page content has its own integrity protection
584    /// (e.g., AES-GCM authentication tag for encrypted pages).
585    pub fn read_page_no_checksum(&self, page_id: u32) -> Result<Page, PagerError> {
586        // Check cache first
587        if let Some(page) = self.cache.get(page_id) {
588            return Ok(page);
589        }
590
591        // Cache miss - read from disk (skip checksum verification)
592        let mut file = self.file_lock()?;
593        let offset = (page_id as u64) * (PAGE_SIZE as u64);
594
595        file.seek(SeekFrom::Start(offset))?;
596
597        let mut buf = [0u8; PAGE_SIZE];
598        file.read_exact(&mut buf)?;
599        drop(file);
600
601        let page = Page::from_bytes(buf);
602
603        // Add to cache (no checksum verification)
604        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
605            // Evicted page was dirty, need to write it back
606            let evicted_id = dirty_page.page_id();
607            self.write_page_raw(evicted_id, &dirty_page)?;
608        }
609
610        Ok(page)
611    }
612
613    /// Write a page (cache-aware)
614    pub fn write_page(&self, page_id: u32, mut page: Page) -> Result<(), PagerError> {
615        if self.config.read_only {
616            return Err(PagerError::ReadOnly);
617        }
618
619        // Update checksum
620        page.update_checksum();
621
622        // Add to cache and mark dirty. If the cache had to evict a
623        // dirty entry, write it through immediately — the evicted
624        // page will never be flushed otherwise (same bug fixed in
625        // `allocate_page`).
626        if let Some(dirty_page) = self.cache.insert(page_id, page) {
627            let evicted_id = dirty_page.page_id();
628            self.write_page_raw(evicted_id, &dirty_page)?;
629        }
630        self.cache.mark_dirty(page_id);
631
632        Ok(())
633    }
634
635    /// Read a page through the configured encryptor if any. Page 0
636    /// is always returned plaintext (it carries the encryption marker
637    /// + header). Callers that want raw cipher bytes can use
638    ///   `read_page_no_checksum` directly.
639    pub fn read_page_decrypted(&self, page_id: u32) -> Result<Page, PagerError> {
640        if page_id == 0 || self.encryption.is_none() {
641            return self.read_page(page_id);
642        }
643        let raw = self.read_page_no_checksum(page_id)?;
644        let (enc, _) = self
645            .encryption
646            .as_ref()
647            .expect("encryption presence checked above");
648        let plaintext = enc
649            .decrypt(page_id, raw.as_bytes())
650            .map_err(|e| PagerError::InvalidDatabase(format!("decrypt page {page_id}: {e}")))?;
651        let mut buf = [0u8; PAGE_SIZE];
652        let n = plaintext.len().min(PAGE_SIZE);
653        buf[..n].copy_from_slice(&plaintext[..n]);
654        Ok(Page::from_bytes(buf))
655    }
656
657    /// Write a page through the configured encryptor if any. Page 0
658    /// bypasses encryption and goes through the normal checksummed
659    /// path. Encrypted pages skip the checksum update because
660    /// AES-GCM's authentication tag is the integrity guarantee.
661    pub fn write_page_encrypted(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
662        if page_id == 0 || self.encryption.is_none() {
663            return self.write_page(page_id, page);
664        }
665        const OVERHEAD: usize = 12 + 16; // nonce + GCM tag
666        let plaintext_len = PAGE_SIZE - OVERHEAD;
667        let plaintext = &page.as_bytes()[..plaintext_len];
668        let (enc, _) = self
669            .encryption
670            .as_ref()
671            .expect("encryption presence checked above");
672        let ciphertext = enc.encrypt(page_id, plaintext);
673        debug_assert_eq!(ciphertext.len(), PAGE_SIZE);
674        let mut buf = [0u8; PAGE_SIZE];
675        buf.copy_from_slice(&ciphertext);
676        let cipher_page = Page::from_bytes(buf);
677        self.write_page_no_checksum(page_id, cipher_page)
678    }
679
680    /// Write a page without updating checksum (for encrypted pages)
681    ///
682    /// Use this when the page content has its own integrity protection
683    /// (e.g., AES-GCM authentication tag for encrypted pages).
684    pub fn write_page_no_checksum(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
685        if self.config.read_only {
686            return Err(PagerError::ReadOnly);
687        }
688
689        // Add to cache and mark dirty (no checksum update). Same
690        // eviction-write-through guard as `write_page`.
691        if let Some(dirty_page) = self.cache.insert(page_id, page) {
692            let evicted_id = dirty_page.page_id();
693            self.write_page_raw(evicted_id, &dirty_page)?;
694        }
695        self.cache.mark_dirty(page_id);
696
697        Ok(())
698    }
699
700    /// Allocate a new page
701    pub fn allocate_page(&self, page_type: PageType) -> Result<Page, PagerError> {
702        if self.config.read_only {
703            return Err(PagerError::ReadOnly);
704        }
705
706        // Try to get from freelist first
707        let page_id = {
708            let mut freelist = self.freelist_write()?;
709            if let Some(id) = freelist.allocate() {
710                id
711            } else if freelist.trunk_head() != 0 {
712                let trunk_id = freelist.trunk_head();
713                drop(freelist);
714
715                let trunk = self.read_page(trunk_id).map_err(|e| match e {
716                    PagerError::PageNotFound(_) => {
717                        PagerError::InvalidDatabase("Freelist trunk missing".to_string())
718                    }
719                    other => other,
720                })?;
721
722                let mut freelist = self.freelist_write()?;
723                freelist
724                    .load_from_trunk(&trunk)
725                    .map_err(|e| PagerError::InvalidDatabase(format!("Freelist: {}", e)))?;
726                let id = freelist.allocate().ok_or_else(|| {
727                    PagerError::InvalidDatabase("Freelist empty after trunk load".to_string())
728                })?;
729
730                let mut header = self.header_write()?;
731                header.freelist_head = freelist.trunk_head();
732                *self.header_dirty_lock()? = true;
733
734                id
735            } else {
736                // No free pages, extend file
737                let mut header = self.header_write()?;
738                let id = header.page_count;
739                header.page_count += 1;
740                *self.header_dirty_lock()? = true;
741                id
742            }
743        };
744
745        let page = Page::new(page_type, page_id);
746
747        // Write to cache. The evicted page (if any) is dirty by
748        // definition — `cache.insert` only returns `Some` when it
749        // had to evict a dirty entry to make room. The previous
750        // version dropped that return value, which silently lost
751        // writes whenever a freshly-allocated page caused an LRU
752        // eviction. This shows up under heavy ingest as
753        // "B-tree insert error: Pager error: I/O error: failed to
754        // fill whole buffer" later, when something tries to read
755        // back the never-flushed page. Mirror `read_page`'s
756        // handling: write the evicted page through immediately so
757        // the on-disk image stays consistent.
758        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
759            let evicted_id = dirty_page.page_id();
760            self.write_page_raw(evicted_id, &dirty_page)?;
761        }
762        self.cache.mark_dirty(page_id);
763
764        Ok(page)
765    }
766
767    /// Reserve a contiguous extent of vector pages.
768    pub fn reserve_contig_extent(&self, n_pages: u32) -> Result<super::ExtentId, PagerError> {
769        if self.config.read_only {
770            return Err(PagerError::ReadOnly);
771        }
772        if n_pages == 0 {
773            return Err(PagerError::InvalidDatabase(
774                "contiguous extent must reserve at least one page".to_string(),
775            ));
776        }
777
778        let start_page = {
779            let mut header = self.header_write()?;
780            let start = header.page_count;
781            header.page_count = header.page_count.checked_add(n_pages).ok_or_else(|| {
782                PagerError::InvalidDatabase("contiguous extent page count overflow".to_string())
783            })?;
784            *self.header_dirty_lock()? = true;
785            start
786        };
787
788        for page_id in start_page..start_page + n_pages {
789            let mut page = Page::new(PageType::Vector, page_id);
790            page.update_checksum();
791            if let Some(dirty_page) = self.cache.insert(page_id, page) {
792                let evicted_id = dirty_page.page_id();
793                self.write_page_raw(evicted_id, &dirty_page)?;
794            }
795            self.cache.mark_dirty(page_id);
796        }
797
798        Ok(super::ExtentId {
799            start_page,
800            n_pages,
801        })
802    }
803
804    /// Free a page (return to freelist)
805    pub fn free_page(&self, page_id: u32) -> Result<(), PagerError> {
806        if self.config.read_only {
807            return Err(PagerError::ReadOnly);
808        }
809
810        // Remove from cache
811        self.cache.remove(page_id);
812
813        // Add to freelist
814        let mut freelist = self.freelist_write()?;
815        freelist.free(page_id);
816
817        *self.header_dirty_lock()? = true;
818
819        Ok(())
820    }
821
822    /// Get database header
823    pub fn header(&self) -> Result<DatabaseHeader, PagerError> {
824        Ok(self.header_read()?.clone())
825    }
826
827    pub fn physical_header(&self) -> Result<PhysicalFileHeader, PagerError> {
828        Ok(self.header_read()?.physical)
829    }
830
831    pub fn update_physical_header(&self, physical: PhysicalFileHeader) -> Result<(), PagerError> {
832        if self.config.read_only {
833            return Err(PagerError::ReadOnly);
834        }
835
836        let mut header = self.header_write()?;
837        header.physical = physical;
838        *self.header_dirty_lock()? = true;
839        Ok(())
840    }
841
842    /// Get page count
843    pub fn page_count(&self) -> Result<u32, PagerError> {
844        Ok(self.header_read()?.page_count)
845    }
846
847    /// Attach a WAL writer to enforce WAL-first flush ordering.
848    ///
849    /// After this call, [`Pager::flush`] computes the maximum
850    /// `header.lsn` over all dirty pages and calls
851    /// `WalWriter::flush_until(max_lsn)` before any page is written
852    /// to the data file. This is the postgres rule: a page on disk
853    /// implies its WAL record is already durable on disk.
854    ///
855    /// Existing call sites that construct a Pager without a WAL
856    /// keep their previous behaviour (no LSN check) — wiring is
857    /// strictly opt-in.
858    pub fn set_wal_writer(&self, wal: Arc<Mutex<crate::storage::wal::writer::WalWriter>>) {
859        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
860        *slot = Some(wal);
861    }
862
863    /// Detach the WAL writer (test / shutdown path).
864    pub fn clear_wal_writer(&self) {
865        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
866        *slot = None;
867    }
868
869    /// Has a WAL writer been attached?
870    pub fn has_wal_writer(&self) -> bool {
871        self.wal.read().map(|s| s.is_some()).unwrap_or(false)
872    }
873
874    /// Flush all dirty pages to disk
875    pub fn flush(&self) -> Result<(), PagerError> {
876        if self.config.read_only {
877            return Ok(());
878        }
879
880        // Persist freelist to trunk pages when dirty
881        let trunks = {
882            let mut freelist = self.freelist_write()?;
883            if freelist.is_dirty() {
884                let mut header = self.header_write()?;
885                let trunks = freelist.flush_to_trunks(0, || {
886                    let id = header.page_count;
887                    header.page_count += 1;
888                    id
889                });
890                header.freelist_head = freelist.trunk_head();
891                *self.header_dirty_lock()? = true;
892                freelist.mark_clean();
893                trunks
894            } else {
895                Vec::new()
896            }
897        };
898
899        for trunk in trunks {
900            let page_id = trunk.page_id();
901            self.cache.insert(page_id, trunk);
902            self.cache.mark_dirty(page_id);
903        }
904
905        // Flush dirty pages from cache (through DWB if enabled)
906        let dirty_pages = self.cache.flush_dirty();
907        if !dirty_pages.is_empty() {
908            // WAL-FIRST: ensure every WAL record describing a dirty
909            // page is durable BEFORE the page itself reaches disk.
910            // Pages with `lsn == 0` are exempt (freelist trunks, header
911            // shadow pages, anything not produced by a WAL append).
912            let max_lsn = dirty_pages
913                .iter()
914                .filter_map(|(_, page)| page.header().ok().map(|h| h.lsn))
915                .max()
916                .unwrap_or(0);
917            if max_lsn > 0 {
918                if let Ok(slot) = self.wal.read() {
919                    if let Some(wal) = slot.as_ref() {
920                        let wal = Arc::clone(wal);
921                        // Drop the read lock before taking the WAL
922                        // mutex so an unrelated reader cannot block
923                        // the flush path.
924                        drop(slot);
925                        let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
926                        wal_guard.flush_until(max_lsn).map_err(PagerError::Io)?;
927                    }
928                }
929            }
930            self.write_pages_through_dwb(&dirty_pages)?;
931        }
932
933        // Write header if dirty
934        if *self.header_dirty_lock()? {
935            self.write_header()?;
936        }
937
938        Ok(())
939    }
940
941    /// Sync file to disk (fsync)
942    pub fn sync(&self) -> Result<(), PagerError> {
943        self.flush()?;
944
945        let file = self.file_lock()?;
946        file.sync_all()?;
947
948        Ok(())
949    }
950
951    /// Get cache statistics
952    pub fn cache_stats(&self) -> crate::storage::engine::page_cache::CacheStats {
953        self.cache.stats()
954    }
955
956    /// Count dirty pages currently in the page cache.
957    pub fn dirty_page_count(&self) -> usize {
958        self.cache.dirty_count()
959    }
960
961    /// Estimated fraction of the page cache holding dirty pages.
962    /// Returned in `[0, 1]`. Used by the background writer to
963    /// decide when to kick in aggressive flushing.
964    pub fn dirty_fraction(&self) -> f64 {
965        let capacity = self.cache.capacity().max(1) as f64;
966        self.cache.dirty_count() as f64 / capacity
967    }
968
969    /// Flush up to `max` dirty pages from the cache. Returns the
970    /// number actually written. Background-writer entry point —
971    /// reuses the same persistence path as `flush()` but bounded.
972    pub fn flush_some_dirty(&self, max: usize) -> Result<usize, PagerError> {
973        if self.config.read_only || max == 0 {
974            return Ok(0);
975        }
976        let dirty_pages = self.cache.flush_some_dirty(max);
977        if dirty_pages.is_empty() {
978            return Ok(0);
979        }
980        let count = dirty_pages.len();
981        // WAL-first: every cached dirty page carries an LSN that the
982        // WAL must have already persisted. The full `flush()` path
983        // enforces this with `wal.flush(max_lsn)`; here we simply
984        // write through the pager — safe because callers only reach
985        // this path via the bgwriter, which runs asynchronously
986        // alongside normal commits that already respect WAL-first.
987        for (page_id, page) in dirty_pages {
988            self.write_page(page_id, page)?;
989        }
990        Ok(count)
991    }
992
993    /// Get database file path
994    pub fn path(&self) -> &Path {
995        &self.path
996    }
997
998    /// Check if database is read-only
999    pub fn is_read_only(&self) -> bool {
1000        self.config.read_only
1001    }
1002
1003    /// Get file size in bytes
1004    pub fn file_size(&self) -> Result<u64, PagerError> {
1005        let file = self.file_lock()?;
1006        Ok(file.metadata()?.len())
1007    }
1008
1009    /// Issue an OS-level read-ahead hint for `page_id`.
1010    ///
1011    /// A6 prefetch wire: called from `BTreeCursor::next` when the
1012    /// cursor passes 50% of the current leaf, so the kernel fetches
1013    /// the next leaf page while CPU processes the remaining half of
1014    /// the current one. Failures are silent — a missed prefetch is a
1015    /// performance miss, never a correctness bug.
1016    pub fn prefetch_hint(&self, page_id: u32) {
1017        if let Ok(file) = self.file_lock() {
1018            let _ = crate::storage::btree::prefetch::prefetch_page(
1019                &file,
1020                page_id as u64,
1021                PAGE_SIZE as u32,
1022            );
1023        }
1024    }
1025
1026    // ── Corruption defense helpers ──────────────────────────────────
1027
1028    /// Path for the header shadow file
1029    fn shadow_path(db_path: &Path) -> PathBuf {
1030        reddb_file::layout::pager_header_shadow_path(db_path)
1031    }
1032
1033    /// Path for the metadata shadow file
1034    fn meta_shadow_path(db_path: &Path) -> PathBuf {
1035        reddb_file::layout::pager_meta_shadow_path(db_path)
1036    }
1037
1038    /// Path for the double-write buffer file
1039    fn dwb_path(db_path: &Path) -> PathBuf {
1040        reddb_file::layout::pager_dwb_shadow_path(db_path)
1041    }
1042
1043    /// Open the double-write buffer file without truncating existing content.
1044    ///
1045    /// The file is intentionally preserved across restarts so recovery can
1046    /// consume any crash-leftover pages before the next write cycle clears it.
1047    fn open_dwb_file(db_path: &Path) -> Result<File, PagerError> {
1048        Ok(OpenOptions::new()
1049            .read(true)
1050            .write(true)
1051            .create(true)
1052            .truncate(false)
1053            .open(Self::dwb_path(db_path))?)
1054    }
1055
1056    /// Clear the DWB in place while preserving the file path and handle.
1057    fn clear_dwb_file(file: &mut File) -> Result<(), PagerError> {
1058        file.set_len(0)?;
1059        file.seek(SeekFrom::Start(0))?;
1060        file.sync_all()?;
1061        Ok(())
1062    }
1063
1064    /// Write a shadow copy of the header page.
1065    fn write_header_shadow(&self, page: &Page) -> Result<(), PagerError> {
1066        if self.config.read_only {
1067            return Ok(());
1068        }
1069        let shadow = Self::shadow_path(&self.path);
1070        let mut f = File::create(&shadow)?;
1071        f.write_all(page.as_bytes())?;
1072        f.sync_all()?;
1073        Ok(())
1074    }
1075
1076    /// Recover header from shadow file when page 0 is corrupted
1077    fn recover_header_from_shadow(&self) -> Result<Page, PagerError> {
1078        let shadow = Self::shadow_path(&self.path);
1079        if !shadow.exists() {
1080            return Err(PagerError::InvalidDatabase(
1081                "Page 0 corrupted and no header shadow found".into(),
1082            ));
1083        }
1084        let mut f = File::open(&shadow)?;
1085        let mut buf = [0u8; PAGE_SIZE];
1086        f.read_exact(&mut buf)?;
1087        let page = Page::from_bytes(buf);
1088
1089        // Verify shadow is valid
1090        if !reddb_file::database_header_magic_matches(page.as_bytes()) {
1091            return Err(PagerError::InvalidDatabase(
1092                "Header shadow also corrupted".into(),
1093            ));
1094        }
1095
1096        // Restore page 0 from shadow
1097        if !self.config.read_only {
1098            self.write_page_raw(0, &page)?;
1099            let file = self.file_lock()?;
1100            file.sync_all()?;
1101        }
1102
1103        Ok(page)
1104    }
1105
1106    /// Write a shadow copy of the metadata page.
1107    ///
1108    /// When the process-global `fold_pager_meta` policy is enabled (see
1109    /// [`crate::physical::fold_pager_meta_enabled`]) the shadow is suppressed:
1110    /// metadata is sourced exclusively from page 1 (plus its overflow chain).
1111    /// Any pre-existing `<data>-meta` file is also removed so a flipped flag
1112    /// cannot leave a stale shadow on disk. Reads still tolerate the sidecar
1113    /// when present so databases written before the flag flipped remain
1114    /// loadable.
1115    pub fn write_meta_shadow(&self, page: &Page) -> Result<(), PagerError> {
1116        if self.config.read_only {
1117            return Ok(());
1118        }
1119        let shadow = Self::meta_shadow_path(&self.path);
1120        if crate::physical::fold_pager_meta_enabled() {
1121            // Best-effort cleanup of any prior shadow — a missing file is not
1122            // an error condition here.
1123            let _ = std::fs::remove_file(&shadow);
1124            return Ok(());
1125        }
1126        let mut f = File::create(&shadow)?;
1127        f.write_all(page.as_bytes())?;
1128        f.sync_all()?;
1129        Ok(())
1130    }
1131
1132    /// Recover metadata page from shadow file when page 1 is corrupted
1133    pub fn recover_meta_from_shadow(&self) -> Result<Page, PagerError> {
1134        let shadow = Self::meta_shadow_path(&self.path);
1135        if !shadow.exists() {
1136            return Err(PagerError::InvalidDatabase(
1137                "Page 1 corrupted and no metadata shadow found".into(),
1138            ));
1139        }
1140        let mut f = File::open(&shadow)?;
1141        let mut buf = [0u8; PAGE_SIZE];
1142        f.read_exact(&mut buf)?;
1143        let page = Page::from_bytes(buf);
1144
1145        // Restore page 1 from shadow
1146        if !self.config.read_only {
1147            self.write_page_raw(1, &page)?;
1148            let file = self.file_lock()?;
1149            file.sync_all()?;
1150        }
1151
1152        Ok(page)
1153    }
1154
1155    /// Write pages through the double-write buffer for torn page protection.
1156    ///
1157    /// 1. Write all pages to the DWB file with a header (magic + count + checksum)
1158    /// 2. fsync the DWB
1159    /// 3. Write all pages to their final locations in the .rdb file
1160    /// 4. Truncate the DWB (marks as consumed)
1161    fn write_pages_through_dwb(&self, pages: &[(u32, Page)]) -> Result<(), PagerError> {
1162        if let Some(dwb_mutex) = &self.dwb_file {
1163            let mut dwb = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1164
1165            let buf = reddb_file::encode_paged_dwb_frame(
1166                pages
1167                    .iter()
1168                    .map(|(page_id, page)| (*page_id, page.as_bytes())),
1169            );
1170
1171            // Write DWB and fsync
1172            dwb.seek(SeekFrom::Start(0))?;
1173            dwb.write_all(&buf)?;
1174            dwb.set_len(buf.len() as u64)?;
1175            dwb.sync_all()?;
1176
1177            // Now write pages to their final locations
1178            for (page_id, page) in pages {
1179                self.write_page_raw(*page_id, page)?;
1180            }
1181
1182            // Truncate DWB to mark as consumed
1183            Self::clear_dwb_file(&mut dwb)?;
1184
1185            Ok(())
1186        } else {
1187            // DWB disabled — write directly
1188            for (page_id, page) in pages {
1189                self.write_page_raw(*page_id, page)?;
1190            }
1191            Ok(())
1192        }
1193    }
1194
1195    /// Recover from double-write buffer after a crash.
1196    ///
1197    /// If the DWB file contains valid pages, they were written before the crash
1198    /// interrupted writing to the main file. Re-apply them.
1199    fn recover_from_dwb(&self) -> Result<(), PagerError> {
1200        let dwb_path = Self::dwb_path(&self.path);
1201        if !dwb_path.exists() {
1202            return Ok(());
1203        }
1204
1205        if let Some(dwb_mutex) = &self.dwb_file {
1206            let mut file = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1207            return self.recover_from_dwb_file(&mut file);
1208        }
1209
1210        let mut file = OpenOptions::new().read(true).write(true).open(&dwb_path)?;
1211        self.recover_from_dwb_file(&mut file)
1212    }
1213
1214    fn recover_from_dwb_file(&self, file: &mut File) -> Result<(), PagerError> {
1215        file.seek(SeekFrom::Start(0))?;
1216        let len = file.metadata()?.len();
1217        let mut buf = vec![0u8; len as usize];
1218        file.read_exact(&mut buf)?;
1219
1220        let entries = match reddb_file::decode_paged_dwb_frame(&buf) {
1221            Ok(entries) => entries,
1222            Err(_) => return Self::clear_dwb_file(file),
1223        };
1224
1225        // DWB is valid — re-apply pages to main file
1226        for entry in entries {
1227            let page = Page::from_bytes(entry.page);
1228            self.write_page_raw(entry.page_id, &page)?;
1229        }
1230
1231        // Sync and clean up
1232        {
1233            let file = self.file_lock()?;
1234            file.sync_all()?;
1235        }
1236
1237        Self::clear_dwb_file(file)
1238    }
1239
1240    /// Write header and sync to disk (public for checkpointer).
1241    pub fn write_header_and_sync(&self) -> Result<(), PagerError> {
1242        self.write_header()?;
1243        let file = self.file_lock()?;
1244        file.sync_all()?;
1245        Ok(())
1246    }
1247
1248    /// Set the checkpoint_in_progress flag in the header.
1249    pub fn set_checkpoint_in_progress(
1250        &self,
1251        in_progress: bool,
1252        target_lsn: u64,
1253    ) -> Result<(), PagerError> {
1254        let mut header = self.header_write()?;
1255        header.checkpoint_in_progress = in_progress;
1256        header.checkpoint_target_lsn = target_lsn;
1257        *self.header_dirty_lock()? = true;
1258        drop(header);
1259        self.write_header_and_sync()
1260    }
1261
1262    /// Update the checkpoint LSN and clear the in-progress flag.
1263    pub fn complete_checkpoint(&self, lsn: u64) -> Result<(), PagerError> {
1264        let mut header = self.header_write()?;
1265        header.checkpoint_lsn = lsn;
1266        header.checkpoint_in_progress = false;
1267        header.checkpoint_target_lsn = 0;
1268        *self.header_dirty_lock()? = true;
1269        drop(header);
1270        self.write_header_and_sync()
1271    }
1272}
1273
#[cfg(test)]
mod tests {
    use super::*;

    fn temp_db_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "reddb-pager-{}-{}-{}.rdb",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ))
    }

    /// Deletes a test database and every sidecar file the pager may create
    /// next to it: header shadow, metadata shadow, and double-write buffer.
    /// Cleanup runs on drop, so it also happens when an assertion panics.
    /// Missing files are ignored.
    struct TempDbFiles {
        path: PathBuf,
    }

    impl Drop for TempDbFiles {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
            let _ = std::fs::remove_file(Pager::shadow_path(&self.path));
            let _ = std::fs::remove_file(Pager::meta_shadow_path(&self.path));
            let _ = std::fs::remove_file(Pager::dwb_path(&self.path));
        }
    }

    #[test]
    fn open_refuses_future_database_version() {
        let path = temp_db_path("future-version");
        let _cleanup = TempDbFiles { path: path.clone() };

        let pager = Pager::open_default(&path).unwrap();
        drop(pager);

        let mut future_header = Page::new_header_page(1);
        reddb_file::set_database_header_version(
            future_header.as_bytes_mut(),
            reddb_file::PAGE_FILE_VERSION + 1,
        )
        .unwrap();
        future_header.update_checksum();

        let mut file = OpenOptions::new().write(true).open(&path).unwrap();
        file.seek(SeekFrom::Start(0)).unwrap();
        file.write_all(future_header.as_bytes()).unwrap();
        file.sync_all().unwrap();
        drop(file);

        let err = match Pager::open_default(&path) {
            Ok(_) => panic!("future database version should be rejected"),
            Err(err) => err,
        };
        match err {
            PagerError::InvalidDatabase(msg) => {
                assert!(msg.contains("newer than supported"));
            }
            other => panic!("expected InvalidDatabase, got {other:?}"),
        }
    }
}