// Source file: reddb_server/storage/engine/pager/impl.rs

1use super::*;
2
/// Magic signature identifying the double-write buffer (DWB) sidecar file:
/// the ASCII bytes `RDDW`.
const DWB_MAGIC: [u8; 4] = *b"RDDW";
5
6impl Pager {
7    /// Open or create a database file
8    pub fn open<P: AsRef<Path>>(path: P, config: PagerConfig) -> Result<Self, PagerError> {
9        let path = path.as_ref().to_path_buf();
10        let exists = path.exists();
11
12        if !exists && !config.create {
13            return Err(PagerError::InvalidDatabase(
14                "Database does not exist".into(),
15            ));
16        }
17
18        if !exists && config.read_only {
19            return Err(PagerError::InvalidDatabase(
20                "Cannot create read-only database".into(),
21            ));
22        }
23
24        // Open file
25        // Note: create requires write access, so disable it for read-only mode
26        let file = OpenOptions::new()
27            .read(true)
28            .write(!config.read_only)
29            .create(config.create && !config.read_only)
30            .open(&path)?;
31
32        // gh-892: diagnostic probe of the filesystem block size. If the
33        // compile-time 16 KiB PAGE_SIZE is not a multiple of the FS block
34        // size, page writes straddle FS blocks and incur read-modify-write
35        // amplification. Pure diagnostic — emitted once per open, never
36        // mutates the page size. `blksize()` is the fstat `st_blksize`.
37        #[cfg(unix)]
38        {
39            use std::os::unix::fs::MetadataExt;
40            if let Ok(meta) = file.metadata() {
41                let fs_block_size = meta.blksize();
42                if Self::page_size_misaligned_with_block(PAGE_SIZE, fs_block_size) {
43                    tracing::warn!(
44                        page_size = PAGE_SIZE,
45                        fs_block_size,
46                        path = %path.display(),
47                        "database page size is not a multiple of the filesystem \
48                         block size; page writes will straddle FS blocks \
49                         (read-modify-write amplification). Diagnostic only — \
50                         the page size is unchanged."
51                    );
52                }
53            }
54        }
55
56        // Acquire file lock (exclusive for writes, shared for read-only)
57        let lock_file = if !config.read_only {
58            let lf = OpenOptions::new().read(true).write(true).open(&path)?;
59            lf.try_lock_exclusive().map_err(|_| PagerError::Locked)?;
60            Some(lf)
61        } else {
62            let lf = OpenOptions::new().read(true).open(&path)?;
63            match lf.try_lock_shared() {
64                Ok(_) => Some(lf),
65                Err(_) => None,
66            }
67        };
68
69        // Open double-write buffer file.
70        //
71        // gh-478: when `fold_dwb_into_wal` is enabled the DWB sidecar is
72        // suppressed — torn pages are healed by replaying FullPageImage WAL
73        // records during recovery. Any pre-existing `-dwb` is removed so a
74        // flipped flag cannot leave a stale sidecar on disk.
75        let fold_dwb = crate::physical::fold_dwb_into_wal_enabled();
76        let dwb_file = if config.double_write && !config.read_only && !fold_dwb {
77            let f = Self::open_dwb_file(&path)?;
78            Some(Mutex::new(f))
79        } else {
80            if fold_dwb && !config.read_only {
81                let _ = std::fs::remove_file(Self::dwb_path(&path));
82            }
83            None
84        };
85
86        let mut pager = Self {
87            path,
88            file: Mutex::new(file),
89            _lock_file: lock_file,
90            dwb_file,
91            cache: PageCache::new(config.cache_size),
92            freelist: RwLock::new(FreeList::new()),
93            header: RwLock::new(DatabaseHeader::default()),
94            config,
95            header_dirty: Mutex::new(false),
96            wal: RwLock::new(None),
97            encryption: None,
98        };
99
100        if exists {
101            // Recover from double-write buffer if needed
102            pager.recover_from_dwb()?;
103            // Load existing database (with header shadow fallback)
104            pager.load_header()?;
105            pager.bind_encryption_for_existing()?;
106        } else {
107            // Initialize new database
108            pager.initialize()?;
109            pager.bind_encryption_for_new()?;
110        }
111
112        Ok(pager)
113    }
114
115    /// gh-892 diagnostic predicate: returns `true` when the database
116    /// `page_size` is **not** an integer multiple of the filesystem's
117    /// reported block size (`st_blksize`), i.e. `page_size % fs_block_size
118    /// != 0`. A `fs_block_size` of `0` (probe unavailable / unknown) is
119    /// treated as aligned so a missing probe never produces a warning.
120    /// Pure function with no I/O so the warn decision is unit-testable.
121    pub(crate) fn page_size_misaligned_with_block(page_size: usize, fs_block_size: u64) -> bool {
122        fs_block_size != 0 && !(page_size as u64).is_multiple_of(fs_block_size)
123    }
124
125    /// Inspect page 0 for the `RDBE` encryption marker, then resolve
126    /// the (key, marker) matrix:
127    ///
128    /// | Marker | Key supplied | Result                              |
129    /// |--------|--------------|-------------------------------------|
130    /// | yes    | yes          | Bind encryptor; validate key        |
131    /// | yes    | no           | `EncryptionRequired` (fail closed)  |
132    /// | no     | yes          | `PlainDatabaseRefusesKey`           |
133    /// | no     | no           | Plain pager — no binding needed     |
134    fn bind_encryption_for_existing(&mut self) -> Result<(), PagerError> {
135        const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
136        const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";
137
138        if self.page_count().unwrap_or(0) == 0 {
139            return self.bind_encryption_for_new();
140        }
141        let header_page = self.read_page_no_checksum(0)?;
142        let data = header_page.as_bytes();
143        let has_marker = data.len() > ENCRYPTION_MARKER_OFFSET + 4
144            && &data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4] == ENCRYPTION_MARKER;
145
146        let key = self.config.encryption.clone();
147        match (has_marker, key) {
148            (true, Some(key)) => {
149                let header_start = ENCRYPTION_MARKER_OFFSET + 4;
150                let header =
151                    crate::storage::encryption::EncryptionHeader::from_bytes(&data[header_start..])
152                        .map_err(|e| {
153                            PagerError::InvalidDatabase(format!(
154                                "encryption header parse failed: {e}"
155                            ))
156                        })?;
157                if !header.validate(&key) {
158                    return Err(PagerError::InvalidKey);
159                }
160                let encryptor = crate::storage::encryption::PageEncryptor::new(key);
161                self.encryption = Some((encryptor, header));
162                Ok(())
163            }
164            (true, None) => Err(PagerError::EncryptionRequired),
165            (false, Some(_)) => Err(PagerError::PlainDatabaseRefusesKey),
166            (false, None) => Ok(()),
167        }
168    }
169
170    /// New DB: if a key is configured, write the marker + header to
171    /// page 0 so subsequent opens detect encryption.
172    fn bind_encryption_for_new(&mut self) -> Result<(), PagerError> {
173        const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
174        const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";
175
176        let Some(key) = self.config.encryption.clone() else {
177            return Ok(());
178        };
179        let header = crate::storage::encryption::EncryptionHeader::new(&key);
180        let encryptor = crate::storage::encryption::PageEncryptor::new(key);
181
182        // Stamp the marker + header into page 0 if it's been
183        // initialised by `initialize()` already.
184        if self.page_count().unwrap_or(0) > 0 {
185            let mut page = self.read_page_no_checksum(0)?;
186            let data = page.as_bytes_mut();
187            data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4]
188                .copy_from_slice(ENCRYPTION_MARKER);
189            let header_bytes = header.to_bytes();
190            let header_start = ENCRYPTION_MARKER_OFFSET + 4;
191            data[header_start..header_start + header_bytes.len()].copy_from_slice(&header_bytes);
192            self.write_page_no_checksum(0, page)?;
193        }
194        self.encryption = Some((encryptor, header));
195        Ok(())
196    }
197
198    /// Open with default configuration
199    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, PagerError> {
200        Self::open(path, PagerConfig::default())
201    }
202
203    /// Initialize a new database
204    fn initialize(&self) -> Result<(), PagerError> {
205        if self.config.read_only {
206            return Err(PagerError::ReadOnly);
207        }
208
209        // Create header page. Page ids 1 and 2 are reserved so fixed
210        // metadata/vault pages cannot be handed out to normal B-tree
211        // allocation before those subsystems are initialized.
212        let initial_page_count = 3;
213        let header_page = Page::new_header_page(initial_page_count);
214        self.header_write()?.page_count = initial_page_count;
215
216        // Write header and reserved pages so any scan over 0..page_count
217        // can read every allocated page in a brand-new database.
218        self.write_page_raw(0, &header_page)?;
219        let mut metadata_page = Page::new(PageType::Header, 1);
220        metadata_page.update_checksum();
221        self.write_page_raw(1, &metadata_page)?;
222        let mut vault_page = Page::new(PageType::Vault, 2);
223        vault_page.update_checksum();
224        self.write_page_raw(2, &vault_page)?;
225
226        // Sync to disk
227        self.sync()?;
228
229        Ok(())
230    }
231
232    /// Acquire a write lock on the header RwLock, mapping poison errors.
233    fn header_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, DatabaseHeader>, PagerError> {
234        self.header.write().map_err(|_| PagerError::LockPoisoned)
235    }
236
237    /// Acquire a read lock on the header RwLock, mapping poison errors.
238    fn header_read(&self) -> Result<std::sync::RwLockReadGuard<'_, DatabaseHeader>, PagerError> {
239        self.header.read().map_err(|_| PagerError::LockPoisoned)
240    }
241
242    /// Acquire a write lock on the freelist RwLock, mapping poison errors.
243    fn freelist_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, FreeList>, PagerError> {
244        self.freelist.write().map_err(|_| PagerError::LockPoisoned)
245    }
246
247    /// Acquire a lock on the file Mutex, mapping poison errors.
248    fn file_lock(&self) -> Result<std::sync::MutexGuard<'_, File>, PagerError> {
249        self.file.lock().map_err(|_| PagerError::LockPoisoned)
250    }
251
252    /// Acquire a lock on the header_dirty Mutex, mapping poison errors.
253    fn header_dirty_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>, PagerError> {
254        self.header_dirty
255            .lock()
256            .map_err(|_| PagerError::LockPoisoned)
257    }
258
259    /// Load database header from page 0 (with shadow fallback)
260    fn load_header(&self) -> Result<(), PagerError> {
261        // Read page 0 — fall back to shadow if corrupted
262        let header_page = match self.read_page_raw(0) {
263            Ok(page) => {
264                // Verify magic bytes
265                let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
266                if magic == MAGIC_BYTES {
267                    page
268                } else {
269                    // Page 0 corrupted — try shadow
270                    self.recover_header_from_shadow()?
271                }
272            }
273            Err(_) => self.recover_header_from_shadow()?,
274        };
275
276        // Read header fields
277        let data = header_page.as_bytes();
278        let version = u32::from_le_bytes([
279            data[HEADER_SIZE + 4],
280            data[HEADER_SIZE + 5],
281            data[HEADER_SIZE + 6],
282            data[HEADER_SIZE + 7],
283        ]);
284
285        let page_size = u32::from_le_bytes([
286            data[HEADER_SIZE + 8],
287            data[HEADER_SIZE + 9],
288            data[HEADER_SIZE + 10],
289            data[HEADER_SIZE + 11],
290        ]);
291
292        if page_size != PAGE_SIZE as u32 {
293            return Err(PagerError::InvalidDatabase(format!(
294                "Unsupported page size: {}",
295                page_size
296            )));
297        }
298        if version > DB_VERSION {
299            return Err(PagerError::InvalidDatabase(format!(
300                "Unsupported database version: file version {version} is newer than supported {DB_VERSION}"
301            )));
302        }
303
304        let page_count = u32::from_le_bytes([
305            data[HEADER_SIZE + 12],
306            data[HEADER_SIZE + 13],
307            data[HEADER_SIZE + 14],
308            data[HEADER_SIZE + 15],
309        ]);
310
311        let freelist_head = u32::from_le_bytes([
312            data[HEADER_SIZE + 16],
313            data[HEADER_SIZE + 17],
314            data[HEADER_SIZE + 18],
315            data[HEADER_SIZE + 19],
316        ]);
317
318        let schema_version = u32::from_le_bytes([
319            data[HEADER_SIZE + 20],
320            data[HEADER_SIZE + 21],
321            data[HEADER_SIZE + 22],
322            data[HEADER_SIZE + 23],
323        ]);
324
325        let checkpoint_lsn = u64::from_le_bytes([
326            data[HEADER_SIZE + 24],
327            data[HEADER_SIZE + 25],
328            data[HEADER_SIZE + 26],
329            data[HEADER_SIZE + 27],
330            data[HEADER_SIZE + 28],
331            data[HEADER_SIZE + 29],
332            data[HEADER_SIZE + 30],
333            data[HEADER_SIZE + 31],
334        ]);
335        let physical_format_version = u32::from_le_bytes([
336            data[HEADER_SIZE + 32],
337            data[HEADER_SIZE + 33],
338            data[HEADER_SIZE + 34],
339            data[HEADER_SIZE + 35],
340        ]);
341        let physical_sequence = u64::from_le_bytes([
342            data[HEADER_SIZE + 36],
343            data[HEADER_SIZE + 37],
344            data[HEADER_SIZE + 38],
345            data[HEADER_SIZE + 39],
346            data[HEADER_SIZE + 40],
347            data[HEADER_SIZE + 41],
348            data[HEADER_SIZE + 42],
349            data[HEADER_SIZE + 43],
350        ]);
351        let manifest_root = u64::from_le_bytes([
352            data[HEADER_SIZE + 44],
353            data[HEADER_SIZE + 45],
354            data[HEADER_SIZE + 46],
355            data[HEADER_SIZE + 47],
356            data[HEADER_SIZE + 48],
357            data[HEADER_SIZE + 49],
358            data[HEADER_SIZE + 50],
359            data[HEADER_SIZE + 51],
360        ]);
361        let manifest_oldest_root = u64::from_le_bytes([
362            data[HEADER_SIZE + 52],
363            data[HEADER_SIZE + 53],
364            data[HEADER_SIZE + 54],
365            data[HEADER_SIZE + 55],
366            data[HEADER_SIZE + 56],
367            data[HEADER_SIZE + 57],
368            data[HEADER_SIZE + 58],
369            data[HEADER_SIZE + 59],
370        ]);
371        let free_set_root = u64::from_le_bytes([
372            data[HEADER_SIZE + 60],
373            data[HEADER_SIZE + 61],
374            data[HEADER_SIZE + 62],
375            data[HEADER_SIZE + 63],
376            data[HEADER_SIZE + 64],
377            data[HEADER_SIZE + 65],
378            data[HEADER_SIZE + 66],
379            data[HEADER_SIZE + 67],
380        ]);
381        let manifest_page = u32::from_le_bytes([
382            data[HEADER_SIZE + 68],
383            data[HEADER_SIZE + 69],
384            data[HEADER_SIZE + 70],
385            data[HEADER_SIZE + 71],
386        ]);
387        let manifest_checksum = u64::from_le_bytes([
388            data[HEADER_SIZE + 72],
389            data[HEADER_SIZE + 73],
390            data[HEADER_SIZE + 74],
391            data[HEADER_SIZE + 75],
392            data[HEADER_SIZE + 76],
393            data[HEADER_SIZE + 77],
394            data[HEADER_SIZE + 78],
395            data[HEADER_SIZE + 79],
396        ]);
397        let collection_roots_page = u32::from_le_bytes([
398            data[HEADER_SIZE + 80],
399            data[HEADER_SIZE + 81],
400            data[HEADER_SIZE + 82],
401            data[HEADER_SIZE + 83],
402        ]);
403        let collection_roots_checksum = u64::from_le_bytes([
404            data[HEADER_SIZE + 84],
405            data[HEADER_SIZE + 85],
406            data[HEADER_SIZE + 86],
407            data[HEADER_SIZE + 87],
408            data[HEADER_SIZE + 88],
409            data[HEADER_SIZE + 89],
410            data[HEADER_SIZE + 90],
411            data[HEADER_SIZE + 91],
412        ]);
413        let collection_root_count = u32::from_le_bytes([
414            data[HEADER_SIZE + 92],
415            data[HEADER_SIZE + 93],
416            data[HEADER_SIZE + 94],
417            data[HEADER_SIZE + 95],
418        ]);
419        let snapshot_count = u32::from_le_bytes([
420            data[HEADER_SIZE + 96],
421            data[HEADER_SIZE + 97],
422            data[HEADER_SIZE + 98],
423            data[HEADER_SIZE + 99],
424        ]);
425        let index_count = u32::from_le_bytes([
426            data[HEADER_SIZE + 100],
427            data[HEADER_SIZE + 101],
428            data[HEADER_SIZE + 102],
429            data[HEADER_SIZE + 103],
430        ]);
431        let catalog_collection_count = u32::from_le_bytes([
432            data[HEADER_SIZE + 104],
433            data[HEADER_SIZE + 105],
434            data[HEADER_SIZE + 106],
435            data[HEADER_SIZE + 107],
436        ]);
437        let catalog_total_entities = u64::from_le_bytes([
438            data[HEADER_SIZE + 108],
439            data[HEADER_SIZE + 109],
440            data[HEADER_SIZE + 110],
441            data[HEADER_SIZE + 111],
442            data[HEADER_SIZE + 112],
443            data[HEADER_SIZE + 113],
444            data[HEADER_SIZE + 114],
445            data[HEADER_SIZE + 115],
446        ]);
447        let export_count = u32::from_le_bytes([
448            data[HEADER_SIZE + 116],
449            data[HEADER_SIZE + 117],
450            data[HEADER_SIZE + 118],
451            data[HEADER_SIZE + 119],
452        ]);
453        let graph_projection_count = u32::from_le_bytes([
454            data[HEADER_SIZE + 120],
455            data[HEADER_SIZE + 121],
456            data[HEADER_SIZE + 122],
457            data[HEADER_SIZE + 123],
458        ]);
459        let analytics_job_count = u32::from_le_bytes([
460            data[HEADER_SIZE + 124],
461            data[HEADER_SIZE + 125],
462            data[HEADER_SIZE + 126],
463            data[HEADER_SIZE + 127],
464        ]);
465        let manifest_event_count = u32::from_le_bytes([
466            data[HEADER_SIZE + 128],
467            data[HEADER_SIZE + 129],
468            data[HEADER_SIZE + 130],
469            data[HEADER_SIZE + 131],
470        ]);
471        let registry_page = u32::from_le_bytes([
472            data[HEADER_SIZE + 132],
473            data[HEADER_SIZE + 133],
474            data[HEADER_SIZE + 134],
475            data[HEADER_SIZE + 135],
476        ]);
477        let registry_checksum = u64::from_le_bytes([
478            data[HEADER_SIZE + 136],
479            data[HEADER_SIZE + 137],
480            data[HEADER_SIZE + 138],
481            data[HEADER_SIZE + 139],
482            data[HEADER_SIZE + 140],
483            data[HEADER_SIZE + 141],
484            data[HEADER_SIZE + 142],
485            data[HEADER_SIZE + 143],
486        ]);
487        let recovery_page = u32::from_le_bytes([
488            data[HEADER_SIZE + 144],
489            data[HEADER_SIZE + 145],
490            data[HEADER_SIZE + 146],
491            data[HEADER_SIZE + 147],
492        ]);
493        let recovery_checksum = u64::from_le_bytes([
494            data[HEADER_SIZE + 148],
495            data[HEADER_SIZE + 149],
496            data[HEADER_SIZE + 150],
497            data[HEADER_SIZE + 151],
498            data[HEADER_SIZE + 152],
499            data[HEADER_SIZE + 153],
500            data[HEADER_SIZE + 154],
501            data[HEADER_SIZE + 155],
502        ]);
503        let catalog_page = u32::from_le_bytes([
504            data[HEADER_SIZE + 156],
505            data[HEADER_SIZE + 157],
506            data[HEADER_SIZE + 158],
507            data[HEADER_SIZE + 159],
508        ]);
509        let catalog_checksum = u64::from_le_bytes([
510            data[HEADER_SIZE + 160],
511            data[HEADER_SIZE + 161],
512            data[HEADER_SIZE + 162],
513            data[HEADER_SIZE + 163],
514            data[HEADER_SIZE + 164],
515            data[HEADER_SIZE + 165],
516            data[HEADER_SIZE + 166],
517            data[HEADER_SIZE + 167],
518        ]);
519        let metadata_state_page = u32::from_le_bytes([
520            data[HEADER_SIZE + 168],
521            data[HEADER_SIZE + 169],
522            data[HEADER_SIZE + 170],
523            data[HEADER_SIZE + 171],
524        ]);
525        let metadata_state_checksum = u64::from_le_bytes([
526            data[HEADER_SIZE + 172],
527            data[HEADER_SIZE + 173],
528            data[HEADER_SIZE + 174],
529            data[HEADER_SIZE + 175],
530            data[HEADER_SIZE + 176],
531            data[HEADER_SIZE + 177],
532            data[HEADER_SIZE + 178],
533            data[HEADER_SIZE + 179],
534        ]);
535        let vector_artifact_page = u32::from_le_bytes([
536            data[HEADER_SIZE + 180],
537            data[HEADER_SIZE + 181],
538            data[HEADER_SIZE + 182],
539            data[HEADER_SIZE + 183],
540        ]);
541        let vector_artifact_checksum = u64::from_le_bytes([
542            data[HEADER_SIZE + 184],
543            data[HEADER_SIZE + 185],
544            data[HEADER_SIZE + 186],
545            data[HEADER_SIZE + 187],
546            data[HEADER_SIZE + 188],
547            data[HEADER_SIZE + 189],
548            data[HEADER_SIZE + 190],
549            data[HEADER_SIZE + 191],
550        ]);
551
552        // Two-phase checkpoint fields (offset 192-200)
553        let checkpoint_in_progress = data[HEADER_SIZE + 192] != 0;
554        let checkpoint_target_lsn = u64::from_le_bytes([
555            data[HEADER_SIZE + 193],
556            data[HEADER_SIZE + 194],
557            data[HEADER_SIZE + 195],
558            data[HEADER_SIZE + 196],
559            data[HEADER_SIZE + 197],
560            data[HEADER_SIZE + 198],
561            data[HEADER_SIZE + 199],
562            data[HEADER_SIZE + 200],
563        ]);
564
565        // Update header
566        {
567            let mut header = self.header_write()?;
568            header.version = version;
569            header.page_size = page_size;
570            header.page_count = page_count;
571            header.freelist_head = freelist_head;
572            header.schema_version = schema_version;
573            header.checkpoint_lsn = checkpoint_lsn;
574            header.checkpoint_in_progress = checkpoint_in_progress;
575            header.checkpoint_target_lsn = checkpoint_target_lsn;
576            header.physical = PhysicalFileHeader {
577                format_version: physical_format_version,
578                sequence: physical_sequence,
579                manifest_oldest_root,
580                manifest_root,
581                free_set_root,
582                manifest_page,
583                manifest_checksum,
584                collection_roots_page,
585                collection_roots_checksum,
586                collection_root_count,
587                snapshot_count,
588                index_count,
589                catalog_collection_count,
590                catalog_total_entities,
591                export_count,
592                graph_projection_count,
593                analytics_job_count,
594                manifest_event_count,
595                registry_page,
596                registry_checksum,
597                recovery_page,
598                recovery_checksum,
599                catalog_page,
600                catalog_checksum,
601                metadata_state_page,
602                metadata_state_checksum,
603                vector_artifact_page,
604                vector_artifact_checksum,
605            };
606        }
607
608        // Initialize freelist
609        {
610            let mut freelist = self.freelist_write()?;
611            *freelist = FreeList::from_header(freelist_head, 0);
612        }
613
614        Ok(())
615    }
616
617    /// Write header page
618    ///
619    /// Note: This merges database header fields into the existing page 0 content
620    /// to preserve any additional data (like encryption headers) that may be stored there.
621    fn write_header(&self) -> Result<(), PagerError> {
622        if self.config.read_only {
623            return Err(PagerError::ReadOnly);
624        }
625
626        let header = self.header_read()?;
627
628        // Read existing page 0 to preserve any additional data (e.g., encryption header)
629        // First check cache, then fall back to disk
630        let mut page = if let Some(cached) = self.cache.get(0) {
631            cached
632        } else {
633            // Try to read from disk if file is large enough
634            let file = self.file_lock()?;
635            let len = file.metadata().map(|m| m.len()).unwrap_or(0);
636            drop(file);
637
638            if len >= PAGE_SIZE as u64 {
639                self.read_page_raw(0)?
640            } else {
641                // File is new/empty, create fresh header page
642                Page::new(PageType::Header, 0)
643            }
644        };
645
646        let data = page.as_bytes_mut();
647
648        // Write magic
649        data[HEADER_SIZE..HEADER_SIZE + 4].copy_from_slice(&MAGIC_BYTES);
650
651        // Write fields (at fixed offsets in the DB header area)
652        data[HEADER_SIZE + 4..HEADER_SIZE + 8].copy_from_slice(&header.version.to_le_bytes());
653        data[HEADER_SIZE + 8..HEADER_SIZE + 12].copy_from_slice(&header.page_size.to_le_bytes());
654        data[HEADER_SIZE + 12..HEADER_SIZE + 16].copy_from_slice(&header.page_count.to_le_bytes());
655        data[HEADER_SIZE + 16..HEADER_SIZE + 20]
656            .copy_from_slice(&header.freelist_head.to_le_bytes());
657        data[HEADER_SIZE + 20..HEADER_SIZE + 24]
658            .copy_from_slice(&header.schema_version.to_le_bytes());
659        data[HEADER_SIZE + 24..HEADER_SIZE + 32]
660            .copy_from_slice(&header.checkpoint_lsn.to_le_bytes());
661        data[HEADER_SIZE + 32..HEADER_SIZE + 36]
662            .copy_from_slice(&header.physical.format_version.to_le_bytes());
663        data[HEADER_SIZE + 36..HEADER_SIZE + 44]
664            .copy_from_slice(&header.physical.sequence.to_le_bytes());
665        data[HEADER_SIZE + 44..HEADER_SIZE + 52]
666            .copy_from_slice(&header.physical.manifest_root.to_le_bytes());
667        data[HEADER_SIZE + 52..HEADER_SIZE + 60]
668            .copy_from_slice(&header.physical.manifest_oldest_root.to_le_bytes());
669        data[HEADER_SIZE + 60..HEADER_SIZE + 68]
670            .copy_from_slice(&header.physical.free_set_root.to_le_bytes());
671        data[HEADER_SIZE + 68..HEADER_SIZE + 72]
672            .copy_from_slice(&header.physical.manifest_page.to_le_bytes());
673        data[HEADER_SIZE + 72..HEADER_SIZE + 80]
674            .copy_from_slice(&header.physical.manifest_checksum.to_le_bytes());
675        data[HEADER_SIZE + 80..HEADER_SIZE + 84]
676            .copy_from_slice(&header.physical.collection_roots_page.to_le_bytes());
677        data[HEADER_SIZE + 84..HEADER_SIZE + 92]
678            .copy_from_slice(&header.physical.collection_roots_checksum.to_le_bytes());
679        data[HEADER_SIZE + 92..HEADER_SIZE + 96]
680            .copy_from_slice(&header.physical.collection_root_count.to_le_bytes());
681        data[HEADER_SIZE + 96..HEADER_SIZE + 100]
682            .copy_from_slice(&header.physical.snapshot_count.to_le_bytes());
683        data[HEADER_SIZE + 100..HEADER_SIZE + 104]
684            .copy_from_slice(&header.physical.index_count.to_le_bytes());
685        data[HEADER_SIZE + 104..HEADER_SIZE + 108]
686            .copy_from_slice(&header.physical.catalog_collection_count.to_le_bytes());
687        data[HEADER_SIZE + 108..HEADER_SIZE + 116]
688            .copy_from_slice(&header.physical.catalog_total_entities.to_le_bytes());
689        data[HEADER_SIZE + 116..HEADER_SIZE + 120]
690            .copy_from_slice(&header.physical.export_count.to_le_bytes());
691        data[HEADER_SIZE + 120..HEADER_SIZE + 124]
692            .copy_from_slice(&header.physical.graph_projection_count.to_le_bytes());
693        data[HEADER_SIZE + 124..HEADER_SIZE + 128]
694            .copy_from_slice(&header.physical.analytics_job_count.to_le_bytes());
695        data[HEADER_SIZE + 128..HEADER_SIZE + 132]
696            .copy_from_slice(&header.physical.manifest_event_count.to_le_bytes());
697        data[HEADER_SIZE + 132..HEADER_SIZE + 136]
698            .copy_from_slice(&header.physical.registry_page.to_le_bytes());
699        data[HEADER_SIZE + 136..HEADER_SIZE + 144]
700            .copy_from_slice(&header.physical.registry_checksum.to_le_bytes());
701        data[HEADER_SIZE + 144..HEADER_SIZE + 148]
702            .copy_from_slice(&header.physical.recovery_page.to_le_bytes());
703        data[HEADER_SIZE + 148..HEADER_SIZE + 156]
704            .copy_from_slice(&header.physical.recovery_checksum.to_le_bytes());
705        data[HEADER_SIZE + 156..HEADER_SIZE + 160]
706            .copy_from_slice(&header.physical.catalog_page.to_le_bytes());
707        data[HEADER_SIZE + 160..HEADER_SIZE + 168]
708            .copy_from_slice(&header.physical.catalog_checksum.to_le_bytes());
709        data[HEADER_SIZE + 168..HEADER_SIZE + 172]
710            .copy_from_slice(&header.physical.metadata_state_page.to_le_bytes());
711        data[HEADER_SIZE + 172..HEADER_SIZE + 180]
712            .copy_from_slice(&header.physical.metadata_state_checksum.to_le_bytes());
713        data[HEADER_SIZE + 180..HEADER_SIZE + 184]
714            .copy_from_slice(&header.physical.vector_artifact_page.to_le_bytes());
715        data[HEADER_SIZE + 184..HEADER_SIZE + 192]
716            .copy_from_slice(&header.physical.vector_artifact_checksum.to_le_bytes());
717
718        // Two-phase checkpoint fields (offset 192-200)
719        data[HEADER_SIZE + 192] = if header.checkpoint_in_progress { 1 } else { 0 };
720        data[HEADER_SIZE + 193..HEADER_SIZE + 201]
721            .copy_from_slice(&header.checkpoint_target_lsn.to_le_bytes());
722
723        page.update_checksum();
724
725        // Write header shadow FIRST (so it's intact if main write is interrupted)
726        self.write_header_shadow(&page)?;
727
728        self.write_page_raw(0, &page)?;
729        *self.header_dirty_lock()? = false;
730
731        Ok(())
732    }
733
734    /// Read a page from disk (bypassing cache)
735    fn read_page_raw(&self, page_id: u32) -> Result<Page, PagerError> {
736        let mut file = self.file_lock()?;
737        let offset = (page_id as u64) * (PAGE_SIZE as u64);
738
739        file.seek(SeekFrom::Start(offset))?;
740
741        let mut buf = [0u8; PAGE_SIZE];
742        file.read_exact(&mut buf)?;
743
744        let page = Page::from_bytes(buf);
745
746        // Verify checksum if configured (including page 0)
747        if self.config.verify_checksums {
748            page.verify_checksum()?;
749        }
750
751        Ok(page)
752    }
753
754    /// Write a page to disk (bypassing cache)
755    fn write_page_raw(&self, page_id: u32, page: &Page) -> Result<(), PagerError> {
756        if self.config.read_only {
757            return Err(PagerError::ReadOnly);
758        }
759
760        let mut file = self.file_lock()?;
761        let offset = (page_id as u64) * (PAGE_SIZE as u64);
762
763        file.seek(SeekFrom::Start(offset))?;
764        file.write_all(page.as_bytes())?;
765
766        Ok(())
767    }
768
769    /// Read a page (cache-aware)
770    pub fn read_page(&self, page_id: u32) -> Result<Page, PagerError> {
771        // Check cache first
772        if let Some(page) = self.cache.get(page_id) {
773            return Ok(page);
774        }
775
776        // Cache miss - read from disk
777        let page = self.read_page_raw(page_id)?;
778
779        // Add to cache
780        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
781            // Evicted page was dirty, need to write it back
782            let evicted_id = dirty_page.page_id();
783            self.write_page_raw(evicted_id, &dirty_page)?;
784        }
785
786        Ok(page)
787    }
788
789    /// Read a page without verifying checksum (for encrypted pages)
790    ///
791    /// Use this when the page content has its own integrity protection
792    /// (e.g., AES-GCM authentication tag for encrypted pages).
793    pub fn read_page_no_checksum(&self, page_id: u32) -> Result<Page, PagerError> {
794        // Check cache first
795        if let Some(page) = self.cache.get(page_id) {
796            return Ok(page);
797        }
798
799        // Cache miss - read from disk (skip checksum verification)
800        let mut file = self.file_lock()?;
801        let offset = (page_id as u64) * (PAGE_SIZE as u64);
802
803        file.seek(SeekFrom::Start(offset))?;
804
805        let mut buf = [0u8; PAGE_SIZE];
806        file.read_exact(&mut buf)?;
807        drop(file);
808
809        let page = Page::from_bytes(buf);
810
811        // Add to cache (no checksum verification)
812        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
813            // Evicted page was dirty, need to write it back
814            let evicted_id = dirty_page.page_id();
815            self.write_page_raw(evicted_id, &dirty_page)?;
816        }
817
818        Ok(page)
819    }
820
821    /// Write a page (cache-aware)
822    pub fn write_page(&self, page_id: u32, mut page: Page) -> Result<(), PagerError> {
823        if self.config.read_only {
824            return Err(PagerError::ReadOnly);
825        }
826
827        // Update checksum
828        page.update_checksum();
829
830        // Add to cache and mark dirty. If the cache had to evict a
831        // dirty entry, write it through immediately — the evicted
832        // page will never be flushed otherwise (same bug fixed in
833        // `allocate_page`).
834        if let Some(dirty_page) = self.cache.insert(page_id, page) {
835            let evicted_id = dirty_page.page_id();
836            self.write_page_raw(evicted_id, &dirty_page)?;
837        }
838        self.cache.mark_dirty(page_id);
839
840        Ok(())
841    }
842
843    /// Read a page through the configured encryptor if any. Page 0
844    /// is always returned plaintext (it carries the encryption marker
845    /// + header). Callers that want raw cipher bytes can use
846    ///   `read_page_no_checksum` directly.
847    pub fn read_page_decrypted(&self, page_id: u32) -> Result<Page, PagerError> {
848        if page_id == 0 || self.encryption.is_none() {
849            return self.read_page(page_id);
850        }
851        let raw = self.read_page_no_checksum(page_id)?;
852        let (enc, _) = self
853            .encryption
854            .as_ref()
855            .expect("encryption presence checked above");
856        let plaintext = enc
857            .decrypt(page_id, raw.as_bytes())
858            .map_err(|e| PagerError::InvalidDatabase(format!("decrypt page {page_id}: {e}")))?;
859        let mut buf = [0u8; PAGE_SIZE];
860        let n = plaintext.len().min(PAGE_SIZE);
861        buf[..n].copy_from_slice(&plaintext[..n]);
862        Ok(Page::from_bytes(buf))
863    }
864
865    /// Write a page through the configured encryptor if any. Page 0
866    /// bypasses encryption and goes through the normal checksummed
867    /// path. Encrypted pages skip the checksum update because
868    /// AES-GCM's authentication tag is the integrity guarantee.
869    pub fn write_page_encrypted(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
870        if page_id == 0 || self.encryption.is_none() {
871            return self.write_page(page_id, page);
872        }
873        const OVERHEAD: usize = 12 + 16; // nonce + GCM tag
874        let plaintext_len = PAGE_SIZE - OVERHEAD;
875        let plaintext = &page.as_bytes()[..plaintext_len];
876        let (enc, _) = self
877            .encryption
878            .as_ref()
879            .expect("encryption presence checked above");
880        let ciphertext = enc.encrypt(page_id, plaintext);
881        debug_assert_eq!(ciphertext.len(), PAGE_SIZE);
882        let mut buf = [0u8; PAGE_SIZE];
883        buf.copy_from_slice(&ciphertext);
884        let cipher_page = Page::from_bytes(buf);
885        self.write_page_no_checksum(page_id, cipher_page)
886    }
887
888    /// Write a page without updating checksum (for encrypted pages)
889    ///
890    /// Use this when the page content has its own integrity protection
891    /// (e.g., AES-GCM authentication tag for encrypted pages).
892    pub fn write_page_no_checksum(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
893        if self.config.read_only {
894            return Err(PagerError::ReadOnly);
895        }
896
897        // Add to cache and mark dirty (no checksum update). Same
898        // eviction-write-through guard as `write_page`.
899        if let Some(dirty_page) = self.cache.insert(page_id, page) {
900            let evicted_id = dirty_page.page_id();
901            self.write_page_raw(evicted_id, &dirty_page)?;
902        }
903        self.cache.mark_dirty(page_id);
904
905        Ok(())
906    }
907
908    /// Allocate a new page
909    pub fn allocate_page(&self, page_type: PageType) -> Result<Page, PagerError> {
910        if self.config.read_only {
911            return Err(PagerError::ReadOnly);
912        }
913
914        // Try to get from freelist first
915        let page_id = {
916            let mut freelist = self.freelist_write()?;
917            if let Some(id) = freelist.allocate() {
918                id
919            } else if freelist.trunk_head() != 0 {
920                let trunk_id = freelist.trunk_head();
921                drop(freelist);
922
923                let trunk = self.read_page(trunk_id).map_err(|e| match e {
924                    PagerError::PageNotFound(_) => {
925                        PagerError::InvalidDatabase("Freelist trunk missing".to_string())
926                    }
927                    other => other,
928                })?;
929
930                let mut freelist = self.freelist_write()?;
931                freelist
932                    .load_from_trunk(&trunk)
933                    .map_err(|e| PagerError::InvalidDatabase(format!("Freelist: {}", e)))?;
934                let id = freelist.allocate().ok_or_else(|| {
935                    PagerError::InvalidDatabase("Freelist empty after trunk load".to_string())
936                })?;
937
938                let mut header = self.header_write()?;
939                header.freelist_head = freelist.trunk_head();
940                *self.header_dirty_lock()? = true;
941
942                id
943            } else {
944                // No free pages, extend file
945                let mut header = self.header_write()?;
946                let id = header.page_count;
947                header.page_count += 1;
948                *self.header_dirty_lock()? = true;
949                id
950            }
951        };
952
953        let page = Page::new(page_type, page_id);
954
955        // Write to cache. The evicted page (if any) is dirty by
956        // definition — `cache.insert` only returns `Some` when it
957        // had to evict a dirty entry to make room. The previous
958        // version dropped that return value, which silently lost
959        // writes whenever a freshly-allocated page caused an LRU
960        // eviction. This shows up under heavy ingest as
961        // "B-tree insert error: Pager error: I/O error: failed to
962        // fill whole buffer" later, when something tries to read
963        // back the never-flushed page. Mirror `read_page`'s
964        // handling: write the evicted page through immediately so
965        // the on-disk image stays consistent.
966        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
967            let evicted_id = dirty_page.page_id();
968            self.write_page_raw(evicted_id, &dirty_page)?;
969        }
970        self.cache.mark_dirty(page_id);
971
972        Ok(page)
973    }
974
975    /// Reserve a contiguous extent of vector pages.
976    pub fn reserve_contig_extent(&self, n_pages: u32) -> Result<super::ExtentId, PagerError> {
977        if self.config.read_only {
978            return Err(PagerError::ReadOnly);
979        }
980        if n_pages == 0 {
981            return Err(PagerError::InvalidDatabase(
982                "contiguous extent must reserve at least one page".to_string(),
983            ));
984        }
985
986        let start_page = {
987            let mut header = self.header_write()?;
988            let start = header.page_count;
989            header.page_count = header.page_count.checked_add(n_pages).ok_or_else(|| {
990                PagerError::InvalidDatabase("contiguous extent page count overflow".to_string())
991            })?;
992            *self.header_dirty_lock()? = true;
993            start
994        };
995
996        for page_id in start_page..start_page + n_pages {
997            let mut page = Page::new(PageType::Vector, page_id);
998            page.update_checksum();
999            if let Some(dirty_page) = self.cache.insert(page_id, page) {
1000                let evicted_id = dirty_page.page_id();
1001                self.write_page_raw(evicted_id, &dirty_page)?;
1002            }
1003            self.cache.mark_dirty(page_id);
1004        }
1005
1006        Ok(super::ExtentId {
1007            start_page,
1008            n_pages,
1009        })
1010    }
1011
1012    /// Free a page (return to freelist)
1013    pub fn free_page(&self, page_id: u32) -> Result<(), PagerError> {
1014        if self.config.read_only {
1015            return Err(PagerError::ReadOnly);
1016        }
1017
1018        // Remove from cache
1019        self.cache.remove(page_id);
1020
1021        // Add to freelist
1022        let mut freelist = self.freelist_write()?;
1023        freelist.free(page_id);
1024
1025        *self.header_dirty_lock()? = true;
1026
1027        Ok(())
1028    }
1029
1030    /// Get database header
1031    pub fn header(&self) -> Result<DatabaseHeader, PagerError> {
1032        Ok(self.header_read()?.clone())
1033    }
1034
1035    pub fn physical_header(&self) -> Result<PhysicalFileHeader, PagerError> {
1036        Ok(self.header_read()?.physical)
1037    }
1038
1039    pub fn update_physical_header(&self, physical: PhysicalFileHeader) -> Result<(), PagerError> {
1040        if self.config.read_only {
1041            return Err(PagerError::ReadOnly);
1042        }
1043
1044        let mut header = self.header_write()?;
1045        header.physical = physical;
1046        *self.header_dirty_lock()? = true;
1047        Ok(())
1048    }
1049
1050    /// Get page count
1051    pub fn page_count(&self) -> Result<u32, PagerError> {
1052        Ok(self.header_read()?.page_count)
1053    }
1054
1055    /// Attach a WAL writer to enforce WAL-first flush ordering.
1056    ///
1057    /// After this call, [`Pager::flush`] computes the maximum
1058    /// `header.lsn` over all dirty pages and calls
1059    /// `WalWriter::flush_until(max_lsn)` before any page is written
1060    /// to the data file. This is the postgres rule: a page on disk
1061    /// implies its WAL record is already durable on disk.
1062    ///
1063    /// Existing call sites that construct a Pager without a WAL
1064    /// keep their previous behaviour (no LSN check) — wiring is
1065    /// strictly opt-in.
1066    pub fn set_wal_writer(&self, wal: Arc<Mutex<crate::storage::wal::writer::WalWriter>>) {
1067        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
1068        *slot = Some(wal);
1069    }
1070
1071    /// Detach the WAL writer (test / shutdown path).
1072    pub fn clear_wal_writer(&self) {
1073        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
1074        *slot = None;
1075    }
1076
1077    /// Has a WAL writer been attached?
1078    pub fn has_wal_writer(&self) -> bool {
1079        self.wal.read().map(|s| s.is_some()).unwrap_or(false)
1080    }
1081
1082    /// Flush all dirty pages to disk
1083    pub fn flush(&self) -> Result<(), PagerError> {
1084        if self.config.read_only {
1085            return Ok(());
1086        }
1087
1088        // Persist freelist to trunk pages when dirty
1089        let trunks = {
1090            let mut freelist = self.freelist_write()?;
1091            if freelist.is_dirty() {
1092                let mut header = self.header_write()?;
1093                let trunks = freelist.flush_to_trunks(0, || {
1094                    let id = header.page_count;
1095                    header.page_count += 1;
1096                    id
1097                });
1098                header.freelist_head = freelist.trunk_head();
1099                *self.header_dirty_lock()? = true;
1100                freelist.mark_clean();
1101                trunks
1102            } else {
1103                Vec::new()
1104            }
1105        };
1106
1107        for trunk in trunks {
1108            let page_id = trunk.page_id();
1109            self.cache.insert(page_id, trunk);
1110            self.cache.mark_dirty(page_id);
1111        }
1112
1113        // Flush dirty pages from cache (through DWB if enabled)
1114        let dirty_pages = self.cache.flush_dirty();
1115        if !dirty_pages.is_empty() {
1116            // WAL-FIRST: ensure every WAL record describing a dirty
1117            // page is durable BEFORE the page itself reaches disk.
1118            // Pages with `lsn == 0` are exempt (freelist trunks, header
1119            // shadow pages, anything not produced by a WAL append).
1120            let max_lsn = dirty_pages
1121                .iter()
1122                .filter_map(|(_, page)| page.header().ok().map(|h| h.lsn))
1123                .max()
1124                .unwrap_or(0);
1125            if max_lsn > 0 {
1126                if let Ok(slot) = self.wal.read() {
1127                    if let Some(wal) = slot.as_ref() {
1128                        let wal = Arc::clone(wal);
1129                        // Drop the read lock before taking the WAL
1130                        // mutex so an unrelated reader cannot block
1131                        // the flush path.
1132                        drop(slot);
1133                        let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
1134                        wal_guard.flush_until(max_lsn).map_err(PagerError::Io)?;
1135                    }
1136                }
1137            }
1138            self.write_pages_through_dwb(&dirty_pages)?;
1139        }
1140
1141        // Write header if dirty
1142        if *self.header_dirty_lock()? {
1143            self.write_header()?;
1144        }
1145
1146        Ok(())
1147    }
1148
1149    /// Sync file to disk (fsync)
1150    pub fn sync(&self) -> Result<(), PagerError> {
1151        self.flush()?;
1152
1153        let file = self.file_lock()?;
1154        file.sync_all()?;
1155
1156        Ok(())
1157    }
1158
1159    /// Get cache statistics
1160    pub fn cache_stats(&self) -> crate::storage::engine::page_cache::CacheStats {
1161        self.cache.stats()
1162    }
1163
1164    /// Count dirty pages currently in the page cache.
1165    pub fn dirty_page_count(&self) -> usize {
1166        self.cache.dirty_count()
1167    }
1168
1169    /// Estimated fraction of the page cache holding dirty pages.
1170    /// Returned in `[0, 1]`. Used by the background writer to
1171    /// decide when to kick in aggressive flushing.
1172    pub fn dirty_fraction(&self) -> f64 {
1173        let capacity = self.cache.capacity().max(1) as f64;
1174        self.cache.dirty_count() as f64 / capacity
1175    }
1176
1177    /// Flush up to `max` dirty pages from the cache. Returns the
1178    /// number actually written. Background-writer entry point —
1179    /// reuses the same persistence path as `flush()` but bounded.
1180    pub fn flush_some_dirty(&self, max: usize) -> Result<usize, PagerError> {
1181        if self.config.read_only || max == 0 {
1182            return Ok(0);
1183        }
1184        let dirty_pages = self.cache.flush_some_dirty(max);
1185        if dirty_pages.is_empty() {
1186            return Ok(0);
1187        }
1188        let count = dirty_pages.len();
1189        // WAL-first: every cached dirty page carries an LSN that the
1190        // WAL must have already persisted. The full `flush()` path
1191        // enforces this with `wal.flush(max_lsn)`; here we simply
1192        // write through the pager — safe because callers only reach
1193        // this path via the bgwriter, which runs asynchronously
1194        // alongside normal commits that already respect WAL-first.
1195        for (page_id, page) in dirty_pages {
1196            self.write_page(page_id, page)?;
1197        }
1198        Ok(count)
1199    }
1200
1201    /// Get database file path
1202    pub fn path(&self) -> &Path {
1203        &self.path
1204    }
1205
1206    /// Check if database is read-only
1207    pub fn is_read_only(&self) -> bool {
1208        self.config.read_only
1209    }
1210
1211    /// Get file size in bytes
1212    pub fn file_size(&self) -> Result<u64, PagerError> {
1213        let file = self.file_lock()?;
1214        Ok(file.metadata()?.len())
1215    }
1216
1217    /// Issue an OS-level read-ahead hint for `page_id`.
1218    ///
1219    /// A6 prefetch wire: called from `BTreeCursor::next` when the
1220    /// cursor passes 50% of the current leaf, so the kernel fetches
1221    /// the next leaf page while CPU processes the remaining half of
1222    /// the current one. Failures are silent — a missed prefetch is a
1223    /// performance miss, never a correctness bug.
1224    pub fn prefetch_hint(&self, page_id: u32) {
1225        if let Ok(file) = self.file_lock() {
1226            let _ = crate::storage::btree::prefetch::prefetch_page(
1227                &file,
1228                page_id as u64,
1229                PAGE_SIZE as u32,
1230            );
1231        }
1232    }
1233
1234    // ── Corruption defense helpers ──────────────────────────────────
1235
/// Sidecar path holding the header shadow: the database path with `-hdr`
/// appended (e.g. `data.rdb` → `data.rdb-hdr`).
fn shadow_path(db_path: &Path) -> PathBuf {
    let mut name = db_path.as_os_str().to_os_string();
    name.push("-hdr");
    name.into()
}
1242
/// Sidecar path holding the metadata shadow: the database path with
/// `-meta` appended (e.g. `data.rdb` → `data.rdb-meta`).
fn meta_shadow_path(db_path: &Path) -> PathBuf {
    let mut name = db_path.as_os_str().to_os_string();
    name.push("-meta");
    name.into()
}
1249
/// Sidecar path of the double-write buffer: the database path with `-dwb`
/// appended (e.g. `data.rdb` → `data.rdb-dwb`).
fn dwb_path(db_path: &Path) -> PathBuf {
    let mut name = db_path.as_os_str().to_os_string();
    name.push("-dwb");
    name.into()
}
1256
1257    /// Open the double-write buffer file without truncating existing content.
1258    ///
1259    /// The file is intentionally preserved across restarts so recovery can
1260    /// consume any crash-leftover pages before the next write cycle clears it.
1261    fn open_dwb_file(db_path: &Path) -> Result<File, PagerError> {
1262        Ok(OpenOptions::new()
1263            .read(true)
1264            .write(true)
1265            .create(true)
1266            .truncate(false)
1267            .open(Self::dwb_path(db_path))?)
1268    }
1269
1270    /// Clear the DWB in place while preserving the file path and handle.
1271    fn clear_dwb_file(file: &mut File) -> Result<(), PagerError> {
1272        file.set_len(0)?;
1273        file.seek(SeekFrom::Start(0))?;
1274        file.sync_all()?;
1275        Ok(())
1276    }
1277
1278    /// Write a shadow copy of the header page to .rdb-hdr
1279    fn write_header_shadow(&self, page: &Page) -> Result<(), PagerError> {
1280        if self.config.read_only {
1281            return Ok(());
1282        }
1283        let shadow = Self::shadow_path(&self.path);
1284        let mut f = File::create(&shadow)?;
1285        f.write_all(page.as_bytes())?;
1286        f.sync_all()?;
1287        Ok(())
1288    }
1289
1290    /// Recover header from shadow file when page 0 is corrupted
1291    fn recover_header_from_shadow(&self) -> Result<Page, PagerError> {
1292        let shadow = Self::shadow_path(&self.path);
1293        if !shadow.exists() {
1294            return Err(PagerError::InvalidDatabase(
1295                "Page 0 corrupted and no header shadow found".into(),
1296            ));
1297        }
1298        let mut f = File::open(&shadow)?;
1299        let mut buf = [0u8; PAGE_SIZE];
1300        f.read_exact(&mut buf)?;
1301        let page = Page::from_bytes(buf);
1302
1303        // Verify shadow is valid
1304        let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
1305        if magic != MAGIC_BYTES {
1306            return Err(PagerError::InvalidDatabase(
1307                "Header shadow also corrupted".into(),
1308            ));
1309        }
1310
1311        // Restore page 0 from shadow
1312        if !self.config.read_only {
1313            self.write_page_raw(0, &page)?;
1314            let file = self.file_lock()?;
1315            file.sync_all()?;
1316        }
1317
1318        Ok(page)
1319    }
1320
1321    /// Write a shadow copy of the metadata page to .rdb-meta.
1322    ///
1323    /// When the process-global `fold_pager_meta` policy is enabled (see
1324    /// [`crate::physical::fold_pager_meta_enabled`]) the shadow is suppressed:
1325    /// metadata is sourced exclusively from page 1 (plus its overflow chain).
1326    /// Any pre-existing `<data>-meta` file is also removed so a flipped flag
1327    /// cannot leave a stale shadow on disk. Reads still tolerate the sidecar
1328    /// when present so databases written before the flag flipped remain
1329    /// loadable.
1330    pub fn write_meta_shadow(&self, page: &Page) -> Result<(), PagerError> {
1331        if self.config.read_only {
1332            return Ok(());
1333        }
1334        let shadow = Self::meta_shadow_path(&self.path);
1335        if crate::physical::fold_pager_meta_enabled() {
1336            // Best-effort cleanup of any prior shadow — a missing file is not
1337            // an error condition here.
1338            let _ = std::fs::remove_file(&shadow);
1339            return Ok(());
1340        }
1341        let mut f = File::create(&shadow)?;
1342        f.write_all(page.as_bytes())?;
1343        f.sync_all()?;
1344        Ok(())
1345    }
1346
1347    /// Recover metadata page from shadow file when page 1 is corrupted
1348    pub fn recover_meta_from_shadow(&self) -> Result<Page, PagerError> {
1349        let shadow = Self::meta_shadow_path(&self.path);
1350        if !shadow.exists() {
1351            return Err(PagerError::InvalidDatabase(
1352                "Page 1 corrupted and no metadata shadow found".into(),
1353            ));
1354        }
1355        let mut f = File::open(&shadow)?;
1356        let mut buf = [0u8; PAGE_SIZE];
1357        f.read_exact(&mut buf)?;
1358        let page = Page::from_bytes(buf);
1359
1360        // Restore page 1 from shadow
1361        if !self.config.read_only {
1362            self.write_page_raw(1, &page)?;
1363            let file = self.file_lock()?;
1364            file.sync_all()?;
1365        }
1366
1367        Ok(page)
1368    }
1369
1370    /// Write pages through the double-write buffer for torn page protection.
1371    ///
1372    /// 1. Write all pages to the DWB file with a header (magic + count + checksum)
1373    /// 2. fsync the DWB
1374    /// 3. Write all pages to their final locations in the .rdb file
1375    /// 4. Truncate the DWB (marks as consumed)
1376    fn write_pages_through_dwb(&self, pages: &[(u32, Page)]) -> Result<(), PagerError> {
1377        if let Some(dwb_mutex) = &self.dwb_file {
1378            let mut dwb = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1379
1380            // Build DWB content: [magic:4][count:u32][checksum:u32][pages...]
1381            // Each page entry: [page_id:u32][page_data:4096]
1382            let entry_size = 4 + PAGE_SIZE; // page_id + data
1383            let header_len = 4 + 4 + 4; // magic + count + checksum
1384            let total = header_len + pages.len() * entry_size;
1385            let mut buf = Vec::with_capacity(total);
1386
1387            // Header
1388            buf.extend_from_slice(&DWB_MAGIC);
1389            buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
1390            buf.extend_from_slice(&[0u8; 4]); // placeholder for checksum
1391
1392            // Page entries
1393            for (page_id, page) in pages {
1394                buf.extend_from_slice(&page_id.to_le_bytes());
1395                buf.extend_from_slice(page.as_bytes());
1396            }
1397
1398            // Compute and write checksum over all data after the header
1399            let checksum = super::super::crc32::crc32(&buf[header_len..]);
1400            buf[8..12].copy_from_slice(&checksum.to_le_bytes());
1401
1402            // Write DWB and fsync
1403            dwb.seek(SeekFrom::Start(0))?;
1404            dwb.write_all(&buf)?;
1405            dwb.set_len(buf.len() as u64)?;
1406            dwb.sync_all()?;
1407
1408            // Now write pages to their final locations
1409            for (page_id, page) in pages {
1410                self.write_page_raw(*page_id, page)?;
1411            }
1412
1413            // Truncate DWB to mark as consumed
1414            Self::clear_dwb_file(&mut dwb)?;
1415
1416            Ok(())
1417        } else {
1418            // DWB disabled — write directly
1419            for (page_id, page) in pages {
1420                self.write_page_raw(*page_id, page)?;
1421            }
1422            Ok(())
1423        }
1424    }
1425
1426    /// Recover from double-write buffer after a crash.
1427    ///
1428    /// If the DWB file contains valid pages, they were written before the crash
1429    /// interrupted writing to the main file. Re-apply them.
1430    fn recover_from_dwb(&self) -> Result<(), PagerError> {
1431        let dwb_path = Self::dwb_path(&self.path);
1432        if !dwb_path.exists() {
1433            return Ok(());
1434        }
1435
1436        if let Some(dwb_mutex) = &self.dwb_file {
1437            let mut file = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1438            return self.recover_from_dwb_file(&mut file);
1439        }
1440
1441        let mut file = OpenOptions::new().read(true).write(true).open(&dwb_path)?;
1442        self.recover_from_dwb_file(&mut file)
1443    }
1444
1445    fn recover_from_dwb_file(&self, file: &mut File) -> Result<(), PagerError> {
1446        file.seek(SeekFrom::Start(0))?;
1447        let len = file.metadata()?.len();
1448        if len < 12 {
1449            // Empty or incomplete header — keep the DWB file but clear stale bytes.
1450            return Self::clear_dwb_file(file);
1451        }
1452
1453        let mut buf = vec![0u8; len as usize];
1454        file.read_exact(&mut buf)?;
1455
1456        // Verify magic
1457        if buf[0..4] != DWB_MAGIC {
1458            // Not a valid DWB — clear it in place so the same file can be reused.
1459            return Self::clear_dwb_file(file);
1460        }
1461
1462        let count = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]) as usize;
1463        let stored_checksum = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);
1464
1465        let header_len = 12;
1466        let entry_size = 4 + PAGE_SIZE;
1467        let expected_len = header_len + count * entry_size;
1468
1469        if buf.len() < expected_len {
1470            // Incomplete DWB write — discard it in place.
1471            return Self::clear_dwb_file(file);
1472        }
1473
1474        // Verify checksum
1475        let computed = super::super::crc32::crc32(&buf[header_len..expected_len]);
1476        if computed != stored_checksum {
1477            // Corrupted DWB — discard it in place.
1478            return Self::clear_dwb_file(file);
1479        }
1480
1481        // DWB is valid — re-apply pages to main file
1482        let mut offset = header_len;
1483        for _ in 0..count {
1484            let page_id = u32::from_le_bytes([
1485                buf[offset],
1486                buf[offset + 1],
1487                buf[offset + 2],
1488                buf[offset + 3],
1489            ]);
1490            offset += 4;
1491
1492            let mut page_data = [0u8; PAGE_SIZE];
1493            page_data.copy_from_slice(&buf[offset..offset + PAGE_SIZE]);
1494            offset += PAGE_SIZE;
1495
1496            let page = Page::from_bytes(page_data);
1497            self.write_page_raw(page_id, &page)?;
1498        }
1499
1500        // Sync and clean up
1501        {
1502            let file = self.file_lock()?;
1503            file.sync_all()?;
1504        }
1505
1506        Self::clear_dwb_file(file)
1507    }
1508
1509    /// Write header and sync to disk (public for checkpointer).
1510    pub fn write_header_and_sync(&self) -> Result<(), PagerError> {
1511        self.write_header()?;
1512        let file = self.file_lock()?;
1513        file.sync_all()?;
1514        Ok(())
1515    }
1516
1517    /// Set the checkpoint_in_progress flag in the header.
1518    pub fn set_checkpoint_in_progress(
1519        &self,
1520        in_progress: bool,
1521        target_lsn: u64,
1522    ) -> Result<(), PagerError> {
1523        let mut header = self.header_write()?;
1524        header.checkpoint_in_progress = in_progress;
1525        header.checkpoint_target_lsn = target_lsn;
1526        *self.header_dirty_lock()? = true;
1527        drop(header);
1528        self.write_header_and_sync()
1529    }
1530
1531    /// Update the checkpoint LSN and clear the in-progress flag.
1532    pub fn complete_checkpoint(&self, lsn: u64) -> Result<(), PagerError> {
1533        let mut header = self.header_write()?;
1534        header.checkpoint_lsn = lsn;
1535        header.checkpoint_in_progress = false;
1536        header.checkpoint_target_lsn = 0;
1537        *self.header_dirty_lock()? = true;
1538        drop(header);
1539        self.write_header_and_sync()
1540    }
1541}
1542
#[cfg(test)]
mod tests {
    use super::*;

    /// Unique path in the system temp dir for a throwaway database.
    fn temp_db_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "reddb-pager-{}-{}-{}.rdb",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ))
    }

    /// Deletes a throwaway database and its sidecar files (`-hdr`, `-meta`,
    /// `-dwb`) when dropped. Cleanup therefore also runs when an assertion
    /// in the test panics. Removal errors are ignored.
    struct TempDbFiles(PathBuf);

    impl Drop for TempDbFiles {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
            let _ = std::fs::remove_file(Pager::shadow_path(&self.0));
            let _ = std::fs::remove_file(Pager::meta_shadow_path(&self.0));
            let _ = std::fs::remove_file(Pager::dwb_path(&self.0));
        }
    }

    #[test]
    fn open_refuses_future_database_version() {
        let path = temp_db_path("future-version");
        let _cleanup = TempDbFiles(path.clone());

        let pager = Pager::open_default(&path).unwrap();
        drop(pager);

        // Overwrite page 0 with a valid (checksummed) header that claims a
        // newer on-disk version than this build supports.
        let mut future_header = Page::new_header_page(1);
        future_header.as_bytes_mut()[HEADER_SIZE + 4..HEADER_SIZE + 8]
            .copy_from_slice(&(DB_VERSION + 1).to_le_bytes());
        future_header.update_checksum();

        let mut file = OpenOptions::new().write(true).open(&path).unwrap();
        file.seek(SeekFrom::Start(0)).unwrap();
        file.write_all(future_header.as_bytes()).unwrap();
        file.sync_all().unwrap();
        drop(file);

        let err = match Pager::open_default(&path) {
            Ok(_) => panic!("future database version should be rejected"),
            Err(err) => err,
        };
        match err {
            PagerError::InvalidDatabase(msg) => {
                assert!(msg.contains("newer than supported"));
            }
            other => panic!("expected InvalidDatabase, got {other:?}"),
        }
    }
}