Skip to main content

reddb_server/storage/engine/pager/
impl.rs

1use super::*;
2
/// Magic identifier of the double-write buffer (DWB) file: the ASCII bytes `RDDW`.
const DWB_MAGIC: [u8; 4] = *b"RDDW";
5
6impl Pager {
7    /// Open or create a database file
8    pub fn open<P: AsRef<Path>>(path: P, config: PagerConfig) -> Result<Self, PagerError> {
9        let path = path.as_ref().to_path_buf();
10        let exists = path.exists();
11
12        if !exists && !config.create {
13            return Err(PagerError::InvalidDatabase(
14                "Database does not exist".into(),
15            ));
16        }
17
18        if !exists && config.read_only {
19            return Err(PagerError::InvalidDatabase(
20                "Cannot create read-only database".into(),
21            ));
22        }
23
24        // Open file
25        // Note: create requires write access, so disable it for read-only mode
26        let file = OpenOptions::new()
27            .read(true)
28            .write(!config.read_only)
29            .create(config.create && !config.read_only)
30            .open(&path)?;
31
32        // Acquire file lock (exclusive for writes, shared for read-only)
33        let lock_file = if !config.read_only {
34            let lf = OpenOptions::new().read(true).write(true).open(&path)?;
35            lf.try_lock_exclusive().map_err(|_| PagerError::Locked)?;
36            Some(lf)
37        } else {
38            let lf = OpenOptions::new().read(true).open(&path)?;
39            match lf.try_lock_shared() {
40                Ok(_) => Some(lf),
41                Err(_) => None,
42            }
43        };
44
45        // Open double-write buffer file.
46        //
47        // gh-478: when `fold_dwb_into_wal` is enabled the DWB sidecar is
48        // suppressed — torn pages are healed by replaying FullPageImage WAL
49        // records during recovery. Any pre-existing `-dwb` is removed so a
50        // flipped flag cannot leave a stale sidecar on disk.
51        let fold_dwb = crate::physical::fold_dwb_into_wal_enabled();
52        let dwb_file = if config.double_write && !config.read_only && !fold_dwb {
53            let f = Self::open_dwb_file(&path)?;
54            Some(Mutex::new(f))
55        } else {
56            if fold_dwb && !config.read_only {
57                let _ = std::fs::remove_file(Self::dwb_path(&path));
58            }
59            None
60        };
61
62        let mut pager = Self {
63            path,
64            file: Mutex::new(file),
65            _lock_file: lock_file,
66            dwb_file,
67            cache: PageCache::new(config.cache_size),
68            freelist: RwLock::new(FreeList::new()),
69            header: RwLock::new(DatabaseHeader::default()),
70            config,
71            header_dirty: Mutex::new(false),
72            wal: RwLock::new(None),
73            encryption: None,
74        };
75
76        if exists {
77            // Recover from double-write buffer if needed
78            pager.recover_from_dwb()?;
79            // Load existing database (with header shadow fallback)
80            pager.load_header()?;
81            pager.bind_encryption_for_existing()?;
82        } else {
83            // Initialize new database
84            pager.initialize()?;
85            pager.bind_encryption_for_new()?;
86        }
87
88        Ok(pager)
89    }
90
91    /// Inspect page 0 for the `RDBE` encryption marker, then resolve
92    /// the (key, marker) matrix:
93    ///
94    /// | Marker | Key supplied | Result                              |
95    /// |--------|--------------|-------------------------------------|
96    /// | yes    | yes          | Bind encryptor; validate key        |
97    /// | yes    | no           | `EncryptionRequired` (fail closed)  |
98    /// | no     | yes          | `PlainDatabaseRefusesKey`           |
99    /// | no     | no           | Plain pager — no binding needed     |
100    fn bind_encryption_for_existing(&mut self) -> Result<(), PagerError> {
101        const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
102        const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";
103
104        if self.page_count().unwrap_or(0) == 0 {
105            return self.bind_encryption_for_new();
106        }
107        let header_page = self.read_page_no_checksum(0)?;
108        let data = header_page.as_bytes();
109        let has_marker = data.len() > ENCRYPTION_MARKER_OFFSET + 4
110            && &data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4] == ENCRYPTION_MARKER;
111
112        let key = self.config.encryption.clone();
113        match (has_marker, key) {
114            (true, Some(key)) => {
115                let header_start = ENCRYPTION_MARKER_OFFSET + 4;
116                let header =
117                    crate::storage::encryption::EncryptionHeader::from_bytes(&data[header_start..])
118                        .map_err(|e| {
119                            PagerError::InvalidDatabase(format!(
120                                "encryption header parse failed: {e}"
121                            ))
122                        })?;
123                if !header.validate(&key) {
124                    return Err(PagerError::InvalidKey);
125                }
126                let encryptor = crate::storage::encryption::PageEncryptor::new(key);
127                self.encryption = Some((encryptor, header));
128                Ok(())
129            }
130            (true, None) => Err(PagerError::EncryptionRequired),
131            (false, Some(_)) => Err(PagerError::PlainDatabaseRefusesKey),
132            (false, None) => Ok(()),
133        }
134    }
135
136    /// New DB: if a key is configured, write the marker + header to
137    /// page 0 so subsequent opens detect encryption.
138    fn bind_encryption_for_new(&mut self) -> Result<(), PagerError> {
139        const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
140        const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";
141
142        let Some(key) = self.config.encryption.clone() else {
143            return Ok(());
144        };
145        let header = crate::storage::encryption::EncryptionHeader::new(&key);
146        let encryptor = crate::storage::encryption::PageEncryptor::new(key);
147
148        // Stamp the marker + header into page 0 if it's been
149        // initialised by `initialize()` already.
150        if self.page_count().unwrap_or(0) > 0 {
151            let mut page = self.read_page_no_checksum(0)?;
152            let data = page.as_bytes_mut();
153            data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4]
154                .copy_from_slice(ENCRYPTION_MARKER);
155            let header_bytes = header.to_bytes();
156            let header_start = ENCRYPTION_MARKER_OFFSET + 4;
157            data[header_start..header_start + header_bytes.len()].copy_from_slice(&header_bytes);
158            self.write_page_no_checksum(0, page)?;
159        }
160        self.encryption = Some((encryptor, header));
161        Ok(())
162    }
163
164    /// Open with default configuration
165    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, PagerError> {
166        Self::open(path, PagerConfig::default())
167    }
168
169    /// Initialize a new database
170    fn initialize(&self) -> Result<(), PagerError> {
171        if self.config.read_only {
172            return Err(PagerError::ReadOnly);
173        }
174
175        // Create header page. Page ids 1 and 2 are reserved so fixed
176        // metadata/vault pages cannot be handed out to normal B-tree
177        // allocation before those subsystems are initialized.
178        let initial_page_count = 3;
179        let header_page = Page::new_header_page(initial_page_count);
180        self.header_write()?.page_count = initial_page_count;
181
182        // Write header and reserved pages so any scan over 0..page_count
183        // can read every allocated page in a brand-new database.
184        self.write_page_raw(0, &header_page)?;
185        let mut metadata_page = Page::new(PageType::Header, 1);
186        metadata_page.update_checksum();
187        self.write_page_raw(1, &metadata_page)?;
188        let mut vault_page = Page::new(PageType::Vault, 2);
189        vault_page.update_checksum();
190        self.write_page_raw(2, &vault_page)?;
191
192        // Sync to disk
193        self.sync()?;
194
195        Ok(())
196    }
197
198    /// Acquire a write lock on the header RwLock, mapping poison errors.
199    fn header_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, DatabaseHeader>, PagerError> {
200        self.header.write().map_err(|_| PagerError::LockPoisoned)
201    }
202
203    /// Acquire a read lock on the header RwLock, mapping poison errors.
204    fn header_read(&self) -> Result<std::sync::RwLockReadGuard<'_, DatabaseHeader>, PagerError> {
205        self.header.read().map_err(|_| PagerError::LockPoisoned)
206    }
207
208    /// Acquire a write lock on the freelist RwLock, mapping poison errors.
209    fn freelist_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, FreeList>, PagerError> {
210        self.freelist.write().map_err(|_| PagerError::LockPoisoned)
211    }
212
213    /// Acquire a lock on the file Mutex, mapping poison errors.
214    fn file_lock(&self) -> Result<std::sync::MutexGuard<'_, File>, PagerError> {
215        self.file.lock().map_err(|_| PagerError::LockPoisoned)
216    }
217
218    /// Acquire a lock on the header_dirty Mutex, mapping poison errors.
219    fn header_dirty_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>, PagerError> {
220        self.header_dirty
221            .lock()
222            .map_err(|_| PagerError::LockPoisoned)
223    }
224
225    /// Load database header from page 0 (with shadow fallback)
226    fn load_header(&self) -> Result<(), PagerError> {
227        // Read page 0 — fall back to shadow if corrupted
228        let header_page = match self.read_page_raw(0) {
229            Ok(page) => {
230                // Verify magic bytes
231                let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
232                if magic == MAGIC_BYTES {
233                    page
234                } else {
235                    // Page 0 corrupted — try shadow
236                    self.recover_header_from_shadow()?
237                }
238            }
239            Err(_) => self.recover_header_from_shadow()?,
240        };
241
242        // Read header fields
243        let data = header_page.as_bytes();
244        let version = u32::from_le_bytes([
245            data[HEADER_SIZE + 4],
246            data[HEADER_SIZE + 5],
247            data[HEADER_SIZE + 6],
248            data[HEADER_SIZE + 7],
249        ]);
250
251        let page_size = u32::from_le_bytes([
252            data[HEADER_SIZE + 8],
253            data[HEADER_SIZE + 9],
254            data[HEADER_SIZE + 10],
255            data[HEADER_SIZE + 11],
256        ]);
257
258        if page_size != PAGE_SIZE as u32 {
259            return Err(PagerError::InvalidDatabase(format!(
260                "Unsupported page size: {}",
261                page_size
262            )));
263        }
264        if version > DB_VERSION {
265            return Err(PagerError::InvalidDatabase(format!(
266                "Unsupported database version: file version {version} is newer than supported {DB_VERSION}"
267            )));
268        }
269
270        let page_count = u32::from_le_bytes([
271            data[HEADER_SIZE + 12],
272            data[HEADER_SIZE + 13],
273            data[HEADER_SIZE + 14],
274            data[HEADER_SIZE + 15],
275        ]);
276
277        let freelist_head = u32::from_le_bytes([
278            data[HEADER_SIZE + 16],
279            data[HEADER_SIZE + 17],
280            data[HEADER_SIZE + 18],
281            data[HEADER_SIZE + 19],
282        ]);
283
284        let schema_version = u32::from_le_bytes([
285            data[HEADER_SIZE + 20],
286            data[HEADER_SIZE + 21],
287            data[HEADER_SIZE + 22],
288            data[HEADER_SIZE + 23],
289        ]);
290
291        let checkpoint_lsn = u64::from_le_bytes([
292            data[HEADER_SIZE + 24],
293            data[HEADER_SIZE + 25],
294            data[HEADER_SIZE + 26],
295            data[HEADER_SIZE + 27],
296            data[HEADER_SIZE + 28],
297            data[HEADER_SIZE + 29],
298            data[HEADER_SIZE + 30],
299            data[HEADER_SIZE + 31],
300        ]);
301        let physical_format_version = u32::from_le_bytes([
302            data[HEADER_SIZE + 32],
303            data[HEADER_SIZE + 33],
304            data[HEADER_SIZE + 34],
305            data[HEADER_SIZE + 35],
306        ]);
307        let physical_sequence = u64::from_le_bytes([
308            data[HEADER_SIZE + 36],
309            data[HEADER_SIZE + 37],
310            data[HEADER_SIZE + 38],
311            data[HEADER_SIZE + 39],
312            data[HEADER_SIZE + 40],
313            data[HEADER_SIZE + 41],
314            data[HEADER_SIZE + 42],
315            data[HEADER_SIZE + 43],
316        ]);
317        let manifest_root = u64::from_le_bytes([
318            data[HEADER_SIZE + 44],
319            data[HEADER_SIZE + 45],
320            data[HEADER_SIZE + 46],
321            data[HEADER_SIZE + 47],
322            data[HEADER_SIZE + 48],
323            data[HEADER_SIZE + 49],
324            data[HEADER_SIZE + 50],
325            data[HEADER_SIZE + 51],
326        ]);
327        let manifest_oldest_root = u64::from_le_bytes([
328            data[HEADER_SIZE + 52],
329            data[HEADER_SIZE + 53],
330            data[HEADER_SIZE + 54],
331            data[HEADER_SIZE + 55],
332            data[HEADER_SIZE + 56],
333            data[HEADER_SIZE + 57],
334            data[HEADER_SIZE + 58],
335            data[HEADER_SIZE + 59],
336        ]);
337        let free_set_root = u64::from_le_bytes([
338            data[HEADER_SIZE + 60],
339            data[HEADER_SIZE + 61],
340            data[HEADER_SIZE + 62],
341            data[HEADER_SIZE + 63],
342            data[HEADER_SIZE + 64],
343            data[HEADER_SIZE + 65],
344            data[HEADER_SIZE + 66],
345            data[HEADER_SIZE + 67],
346        ]);
347        let manifest_page = u32::from_le_bytes([
348            data[HEADER_SIZE + 68],
349            data[HEADER_SIZE + 69],
350            data[HEADER_SIZE + 70],
351            data[HEADER_SIZE + 71],
352        ]);
353        let manifest_checksum = u64::from_le_bytes([
354            data[HEADER_SIZE + 72],
355            data[HEADER_SIZE + 73],
356            data[HEADER_SIZE + 74],
357            data[HEADER_SIZE + 75],
358            data[HEADER_SIZE + 76],
359            data[HEADER_SIZE + 77],
360            data[HEADER_SIZE + 78],
361            data[HEADER_SIZE + 79],
362        ]);
363        let collection_roots_page = u32::from_le_bytes([
364            data[HEADER_SIZE + 80],
365            data[HEADER_SIZE + 81],
366            data[HEADER_SIZE + 82],
367            data[HEADER_SIZE + 83],
368        ]);
369        let collection_roots_checksum = u64::from_le_bytes([
370            data[HEADER_SIZE + 84],
371            data[HEADER_SIZE + 85],
372            data[HEADER_SIZE + 86],
373            data[HEADER_SIZE + 87],
374            data[HEADER_SIZE + 88],
375            data[HEADER_SIZE + 89],
376            data[HEADER_SIZE + 90],
377            data[HEADER_SIZE + 91],
378        ]);
379        let collection_root_count = u32::from_le_bytes([
380            data[HEADER_SIZE + 92],
381            data[HEADER_SIZE + 93],
382            data[HEADER_SIZE + 94],
383            data[HEADER_SIZE + 95],
384        ]);
385        let snapshot_count = u32::from_le_bytes([
386            data[HEADER_SIZE + 96],
387            data[HEADER_SIZE + 97],
388            data[HEADER_SIZE + 98],
389            data[HEADER_SIZE + 99],
390        ]);
391        let index_count = u32::from_le_bytes([
392            data[HEADER_SIZE + 100],
393            data[HEADER_SIZE + 101],
394            data[HEADER_SIZE + 102],
395            data[HEADER_SIZE + 103],
396        ]);
397        let catalog_collection_count = u32::from_le_bytes([
398            data[HEADER_SIZE + 104],
399            data[HEADER_SIZE + 105],
400            data[HEADER_SIZE + 106],
401            data[HEADER_SIZE + 107],
402        ]);
403        let catalog_total_entities = u64::from_le_bytes([
404            data[HEADER_SIZE + 108],
405            data[HEADER_SIZE + 109],
406            data[HEADER_SIZE + 110],
407            data[HEADER_SIZE + 111],
408            data[HEADER_SIZE + 112],
409            data[HEADER_SIZE + 113],
410            data[HEADER_SIZE + 114],
411            data[HEADER_SIZE + 115],
412        ]);
413        let export_count = u32::from_le_bytes([
414            data[HEADER_SIZE + 116],
415            data[HEADER_SIZE + 117],
416            data[HEADER_SIZE + 118],
417            data[HEADER_SIZE + 119],
418        ]);
419        let graph_projection_count = u32::from_le_bytes([
420            data[HEADER_SIZE + 120],
421            data[HEADER_SIZE + 121],
422            data[HEADER_SIZE + 122],
423            data[HEADER_SIZE + 123],
424        ]);
425        let analytics_job_count = u32::from_le_bytes([
426            data[HEADER_SIZE + 124],
427            data[HEADER_SIZE + 125],
428            data[HEADER_SIZE + 126],
429            data[HEADER_SIZE + 127],
430        ]);
431        let manifest_event_count = u32::from_le_bytes([
432            data[HEADER_SIZE + 128],
433            data[HEADER_SIZE + 129],
434            data[HEADER_SIZE + 130],
435            data[HEADER_SIZE + 131],
436        ]);
437        let registry_page = u32::from_le_bytes([
438            data[HEADER_SIZE + 132],
439            data[HEADER_SIZE + 133],
440            data[HEADER_SIZE + 134],
441            data[HEADER_SIZE + 135],
442        ]);
443        let registry_checksum = u64::from_le_bytes([
444            data[HEADER_SIZE + 136],
445            data[HEADER_SIZE + 137],
446            data[HEADER_SIZE + 138],
447            data[HEADER_SIZE + 139],
448            data[HEADER_SIZE + 140],
449            data[HEADER_SIZE + 141],
450            data[HEADER_SIZE + 142],
451            data[HEADER_SIZE + 143],
452        ]);
453        let recovery_page = u32::from_le_bytes([
454            data[HEADER_SIZE + 144],
455            data[HEADER_SIZE + 145],
456            data[HEADER_SIZE + 146],
457            data[HEADER_SIZE + 147],
458        ]);
459        let recovery_checksum = u64::from_le_bytes([
460            data[HEADER_SIZE + 148],
461            data[HEADER_SIZE + 149],
462            data[HEADER_SIZE + 150],
463            data[HEADER_SIZE + 151],
464            data[HEADER_SIZE + 152],
465            data[HEADER_SIZE + 153],
466            data[HEADER_SIZE + 154],
467            data[HEADER_SIZE + 155],
468        ]);
469        let catalog_page = u32::from_le_bytes([
470            data[HEADER_SIZE + 156],
471            data[HEADER_SIZE + 157],
472            data[HEADER_SIZE + 158],
473            data[HEADER_SIZE + 159],
474        ]);
475        let catalog_checksum = u64::from_le_bytes([
476            data[HEADER_SIZE + 160],
477            data[HEADER_SIZE + 161],
478            data[HEADER_SIZE + 162],
479            data[HEADER_SIZE + 163],
480            data[HEADER_SIZE + 164],
481            data[HEADER_SIZE + 165],
482            data[HEADER_SIZE + 166],
483            data[HEADER_SIZE + 167],
484        ]);
485        let metadata_state_page = u32::from_le_bytes([
486            data[HEADER_SIZE + 168],
487            data[HEADER_SIZE + 169],
488            data[HEADER_SIZE + 170],
489            data[HEADER_SIZE + 171],
490        ]);
491        let metadata_state_checksum = u64::from_le_bytes([
492            data[HEADER_SIZE + 172],
493            data[HEADER_SIZE + 173],
494            data[HEADER_SIZE + 174],
495            data[HEADER_SIZE + 175],
496            data[HEADER_SIZE + 176],
497            data[HEADER_SIZE + 177],
498            data[HEADER_SIZE + 178],
499            data[HEADER_SIZE + 179],
500        ]);
501        let vector_artifact_page = u32::from_le_bytes([
502            data[HEADER_SIZE + 180],
503            data[HEADER_SIZE + 181],
504            data[HEADER_SIZE + 182],
505            data[HEADER_SIZE + 183],
506        ]);
507        let vector_artifact_checksum = u64::from_le_bytes([
508            data[HEADER_SIZE + 184],
509            data[HEADER_SIZE + 185],
510            data[HEADER_SIZE + 186],
511            data[HEADER_SIZE + 187],
512            data[HEADER_SIZE + 188],
513            data[HEADER_SIZE + 189],
514            data[HEADER_SIZE + 190],
515            data[HEADER_SIZE + 191],
516        ]);
517
518        // Two-phase checkpoint fields (offset 192-200)
519        let checkpoint_in_progress = data[HEADER_SIZE + 192] != 0;
520        let checkpoint_target_lsn = u64::from_le_bytes([
521            data[HEADER_SIZE + 193],
522            data[HEADER_SIZE + 194],
523            data[HEADER_SIZE + 195],
524            data[HEADER_SIZE + 196],
525            data[HEADER_SIZE + 197],
526            data[HEADER_SIZE + 198],
527            data[HEADER_SIZE + 199],
528            data[HEADER_SIZE + 200],
529        ]);
530
531        // Update header
532        {
533            let mut header = self.header_write()?;
534            header.version = version;
535            header.page_size = page_size;
536            header.page_count = page_count;
537            header.freelist_head = freelist_head;
538            header.schema_version = schema_version;
539            header.checkpoint_lsn = checkpoint_lsn;
540            header.checkpoint_in_progress = checkpoint_in_progress;
541            header.checkpoint_target_lsn = checkpoint_target_lsn;
542            header.physical = PhysicalFileHeader {
543                format_version: physical_format_version,
544                sequence: physical_sequence,
545                manifest_oldest_root,
546                manifest_root,
547                free_set_root,
548                manifest_page,
549                manifest_checksum,
550                collection_roots_page,
551                collection_roots_checksum,
552                collection_root_count,
553                snapshot_count,
554                index_count,
555                catalog_collection_count,
556                catalog_total_entities,
557                export_count,
558                graph_projection_count,
559                analytics_job_count,
560                manifest_event_count,
561                registry_page,
562                registry_checksum,
563                recovery_page,
564                recovery_checksum,
565                catalog_page,
566                catalog_checksum,
567                metadata_state_page,
568                metadata_state_checksum,
569                vector_artifact_page,
570                vector_artifact_checksum,
571            };
572        }
573
574        // Initialize freelist
575        {
576            let mut freelist = self.freelist_write()?;
577            *freelist = FreeList::from_header(freelist_head, 0);
578        }
579
580        Ok(())
581    }
582
583    /// Write header page
584    ///
585    /// Note: This merges database header fields into the existing page 0 content
586    /// to preserve any additional data (like encryption headers) that may be stored there.
587    fn write_header(&self) -> Result<(), PagerError> {
588        if self.config.read_only {
589            return Err(PagerError::ReadOnly);
590        }
591
592        let header = self.header_read()?;
593
594        // Read existing page 0 to preserve any additional data (e.g., encryption header)
595        // First check cache, then fall back to disk
596        let mut page = if let Some(cached) = self.cache.get(0) {
597            cached
598        } else {
599            // Try to read from disk if file is large enough
600            let file = self.file_lock()?;
601            let len = file.metadata().map(|m| m.len()).unwrap_or(0);
602            drop(file);
603
604            if len >= PAGE_SIZE as u64 {
605                self.read_page_raw(0)?
606            } else {
607                // File is new/empty, create fresh header page
608                Page::new(PageType::Header, 0)
609            }
610        };
611
612        let data = page.as_bytes_mut();
613
614        // Write magic
615        data[HEADER_SIZE..HEADER_SIZE + 4].copy_from_slice(&MAGIC_BYTES);
616
617        // Write fields (at fixed offsets in the DB header area)
618        data[HEADER_SIZE + 4..HEADER_SIZE + 8].copy_from_slice(&header.version.to_le_bytes());
619        data[HEADER_SIZE + 8..HEADER_SIZE + 12].copy_from_slice(&header.page_size.to_le_bytes());
620        data[HEADER_SIZE + 12..HEADER_SIZE + 16].copy_from_slice(&header.page_count.to_le_bytes());
621        data[HEADER_SIZE + 16..HEADER_SIZE + 20]
622            .copy_from_slice(&header.freelist_head.to_le_bytes());
623        data[HEADER_SIZE + 20..HEADER_SIZE + 24]
624            .copy_from_slice(&header.schema_version.to_le_bytes());
625        data[HEADER_SIZE + 24..HEADER_SIZE + 32]
626            .copy_from_slice(&header.checkpoint_lsn.to_le_bytes());
627        data[HEADER_SIZE + 32..HEADER_SIZE + 36]
628            .copy_from_slice(&header.physical.format_version.to_le_bytes());
629        data[HEADER_SIZE + 36..HEADER_SIZE + 44]
630            .copy_from_slice(&header.physical.sequence.to_le_bytes());
631        data[HEADER_SIZE + 44..HEADER_SIZE + 52]
632            .copy_from_slice(&header.physical.manifest_root.to_le_bytes());
633        data[HEADER_SIZE + 52..HEADER_SIZE + 60]
634            .copy_from_slice(&header.physical.manifest_oldest_root.to_le_bytes());
635        data[HEADER_SIZE + 60..HEADER_SIZE + 68]
636            .copy_from_slice(&header.physical.free_set_root.to_le_bytes());
637        data[HEADER_SIZE + 68..HEADER_SIZE + 72]
638            .copy_from_slice(&header.physical.manifest_page.to_le_bytes());
639        data[HEADER_SIZE + 72..HEADER_SIZE + 80]
640            .copy_from_slice(&header.physical.manifest_checksum.to_le_bytes());
641        data[HEADER_SIZE + 80..HEADER_SIZE + 84]
642            .copy_from_slice(&header.physical.collection_roots_page.to_le_bytes());
643        data[HEADER_SIZE + 84..HEADER_SIZE + 92]
644            .copy_from_slice(&header.physical.collection_roots_checksum.to_le_bytes());
645        data[HEADER_SIZE + 92..HEADER_SIZE + 96]
646            .copy_from_slice(&header.physical.collection_root_count.to_le_bytes());
647        data[HEADER_SIZE + 96..HEADER_SIZE + 100]
648            .copy_from_slice(&header.physical.snapshot_count.to_le_bytes());
649        data[HEADER_SIZE + 100..HEADER_SIZE + 104]
650            .copy_from_slice(&header.physical.index_count.to_le_bytes());
651        data[HEADER_SIZE + 104..HEADER_SIZE + 108]
652            .copy_from_slice(&header.physical.catalog_collection_count.to_le_bytes());
653        data[HEADER_SIZE + 108..HEADER_SIZE + 116]
654            .copy_from_slice(&header.physical.catalog_total_entities.to_le_bytes());
655        data[HEADER_SIZE + 116..HEADER_SIZE + 120]
656            .copy_from_slice(&header.physical.export_count.to_le_bytes());
657        data[HEADER_SIZE + 120..HEADER_SIZE + 124]
658            .copy_from_slice(&header.physical.graph_projection_count.to_le_bytes());
659        data[HEADER_SIZE + 124..HEADER_SIZE + 128]
660            .copy_from_slice(&header.physical.analytics_job_count.to_le_bytes());
661        data[HEADER_SIZE + 128..HEADER_SIZE + 132]
662            .copy_from_slice(&header.physical.manifest_event_count.to_le_bytes());
663        data[HEADER_SIZE + 132..HEADER_SIZE + 136]
664            .copy_from_slice(&header.physical.registry_page.to_le_bytes());
665        data[HEADER_SIZE + 136..HEADER_SIZE + 144]
666            .copy_from_slice(&header.physical.registry_checksum.to_le_bytes());
667        data[HEADER_SIZE + 144..HEADER_SIZE + 148]
668            .copy_from_slice(&header.physical.recovery_page.to_le_bytes());
669        data[HEADER_SIZE + 148..HEADER_SIZE + 156]
670            .copy_from_slice(&header.physical.recovery_checksum.to_le_bytes());
671        data[HEADER_SIZE + 156..HEADER_SIZE + 160]
672            .copy_from_slice(&header.physical.catalog_page.to_le_bytes());
673        data[HEADER_SIZE + 160..HEADER_SIZE + 168]
674            .copy_from_slice(&header.physical.catalog_checksum.to_le_bytes());
675        data[HEADER_SIZE + 168..HEADER_SIZE + 172]
676            .copy_from_slice(&header.physical.metadata_state_page.to_le_bytes());
677        data[HEADER_SIZE + 172..HEADER_SIZE + 180]
678            .copy_from_slice(&header.physical.metadata_state_checksum.to_le_bytes());
679        data[HEADER_SIZE + 180..HEADER_SIZE + 184]
680            .copy_from_slice(&header.physical.vector_artifact_page.to_le_bytes());
681        data[HEADER_SIZE + 184..HEADER_SIZE + 192]
682            .copy_from_slice(&header.physical.vector_artifact_checksum.to_le_bytes());
683
684        // Two-phase checkpoint fields (offset 192-200)
685        data[HEADER_SIZE + 192] = if header.checkpoint_in_progress { 1 } else { 0 };
686        data[HEADER_SIZE + 193..HEADER_SIZE + 201]
687            .copy_from_slice(&header.checkpoint_target_lsn.to_le_bytes());
688
689        page.update_checksum();
690
691        // Write header shadow FIRST (so it's intact if main write is interrupted)
692        self.write_header_shadow(&page)?;
693
694        self.write_page_raw(0, &page)?;
695        *self.header_dirty_lock()? = false;
696
697        Ok(())
698    }
699
700    /// Read a page from disk (bypassing cache)
701    fn read_page_raw(&self, page_id: u32) -> Result<Page, PagerError> {
702        let mut file = self.file_lock()?;
703        let offset = (page_id as u64) * (PAGE_SIZE as u64);
704
705        file.seek(SeekFrom::Start(offset))?;
706
707        let mut buf = [0u8; PAGE_SIZE];
708        file.read_exact(&mut buf)?;
709
710        let page = Page::from_bytes(buf);
711
712        // Verify checksum if configured (including page 0)
713        if self.config.verify_checksums {
714            page.verify_checksum()?;
715        }
716
717        Ok(page)
718    }
719
720    /// Write a page to disk (bypassing cache)
721    fn write_page_raw(&self, page_id: u32, page: &Page) -> Result<(), PagerError> {
722        if self.config.read_only {
723            return Err(PagerError::ReadOnly);
724        }
725
726        let mut file = self.file_lock()?;
727        let offset = (page_id as u64) * (PAGE_SIZE as u64);
728
729        file.seek(SeekFrom::Start(offset))?;
730        file.write_all(page.as_bytes())?;
731
732        Ok(())
733    }
734
735    /// Read a page (cache-aware)
736    pub fn read_page(&self, page_id: u32) -> Result<Page, PagerError> {
737        // Check cache first
738        if let Some(page) = self.cache.get(page_id) {
739            return Ok(page);
740        }
741
742        // Cache miss - read from disk
743        let page = self.read_page_raw(page_id)?;
744
745        // Add to cache
746        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
747            // Evicted page was dirty, need to write it back
748            let evicted_id = dirty_page.page_id();
749            self.write_page_raw(evicted_id, &dirty_page)?;
750        }
751
752        Ok(page)
753    }
754
755    /// Read a page without verifying checksum (for encrypted pages)
756    ///
757    /// Use this when the page content has its own integrity protection
758    /// (e.g., AES-GCM authentication tag for encrypted pages).
759    pub fn read_page_no_checksum(&self, page_id: u32) -> Result<Page, PagerError> {
760        // Check cache first
761        if let Some(page) = self.cache.get(page_id) {
762            return Ok(page);
763        }
764
765        // Cache miss - read from disk (skip checksum verification)
766        let mut file = self.file_lock()?;
767        let offset = (page_id as u64) * (PAGE_SIZE as u64);
768
769        file.seek(SeekFrom::Start(offset))?;
770
771        let mut buf = [0u8; PAGE_SIZE];
772        file.read_exact(&mut buf)?;
773        drop(file);
774
775        let page = Page::from_bytes(buf);
776
777        // Add to cache (no checksum verification)
778        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
779            // Evicted page was dirty, need to write it back
780            let evicted_id = dirty_page.page_id();
781            self.write_page_raw(evicted_id, &dirty_page)?;
782        }
783
784        Ok(page)
785    }
786
787    /// Write a page (cache-aware)
788    pub fn write_page(&self, page_id: u32, mut page: Page) -> Result<(), PagerError> {
789        if self.config.read_only {
790            return Err(PagerError::ReadOnly);
791        }
792
793        // Update checksum
794        page.update_checksum();
795
796        // Add to cache and mark dirty. If the cache had to evict a
797        // dirty entry, write it through immediately — the evicted
798        // page will never be flushed otherwise (same bug fixed in
799        // `allocate_page`).
800        if let Some(dirty_page) = self.cache.insert(page_id, page) {
801            let evicted_id = dirty_page.page_id();
802            self.write_page_raw(evicted_id, &dirty_page)?;
803        }
804        self.cache.mark_dirty(page_id);
805
806        Ok(())
807    }
808
809    /// Read a page through the configured encryptor if any. Page 0
810    /// is always returned plaintext (it carries the encryption marker
811    /// + header). Callers that want raw cipher bytes can use
812    ///   `read_page_no_checksum` directly.
813    pub fn read_page_decrypted(&self, page_id: u32) -> Result<Page, PagerError> {
814        if page_id == 0 || self.encryption.is_none() {
815            return self.read_page(page_id);
816        }
817        let raw = self.read_page_no_checksum(page_id)?;
818        let (enc, _) = self
819            .encryption
820            .as_ref()
821            .expect("encryption presence checked above");
822        let plaintext = enc
823            .decrypt(page_id, raw.as_bytes())
824            .map_err(|e| PagerError::InvalidDatabase(format!("decrypt page {page_id}: {e}")))?;
825        let mut buf = [0u8; PAGE_SIZE];
826        let n = plaintext.len().min(PAGE_SIZE);
827        buf[..n].copy_from_slice(&plaintext[..n]);
828        Ok(Page::from_bytes(buf))
829    }
830
831    /// Write a page through the configured encryptor if any. Page 0
832    /// bypasses encryption and goes through the normal checksummed
833    /// path. Encrypted pages skip the checksum update because
834    /// AES-GCM's authentication tag is the integrity guarantee.
835    pub fn write_page_encrypted(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
836        if page_id == 0 || self.encryption.is_none() {
837            return self.write_page(page_id, page);
838        }
839        const OVERHEAD: usize = 12 + 16; // nonce + GCM tag
840        let plaintext_len = PAGE_SIZE - OVERHEAD;
841        let plaintext = &page.as_bytes()[..plaintext_len];
842        let (enc, _) = self
843            .encryption
844            .as_ref()
845            .expect("encryption presence checked above");
846        let ciphertext = enc.encrypt(page_id, plaintext);
847        debug_assert_eq!(ciphertext.len(), PAGE_SIZE);
848        let mut buf = [0u8; PAGE_SIZE];
849        buf.copy_from_slice(&ciphertext);
850        let cipher_page = Page::from_bytes(buf);
851        self.write_page_no_checksum(page_id, cipher_page)
852    }
853
854    /// Write a page without updating checksum (for encrypted pages)
855    ///
856    /// Use this when the page content has its own integrity protection
857    /// (e.g., AES-GCM authentication tag for encrypted pages).
858    pub fn write_page_no_checksum(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
859        if self.config.read_only {
860            return Err(PagerError::ReadOnly);
861        }
862
863        // Add to cache and mark dirty (no checksum update). Same
864        // eviction-write-through guard as `write_page`.
865        if let Some(dirty_page) = self.cache.insert(page_id, page) {
866            let evicted_id = dirty_page.page_id();
867            self.write_page_raw(evicted_id, &dirty_page)?;
868        }
869        self.cache.mark_dirty(page_id);
870
871        Ok(())
872    }
873
874    /// Allocate a new page
875    pub fn allocate_page(&self, page_type: PageType) -> Result<Page, PagerError> {
876        if self.config.read_only {
877            return Err(PagerError::ReadOnly);
878        }
879
880        // Try to get from freelist first
881        let page_id = {
882            let mut freelist = self.freelist_write()?;
883            if let Some(id) = freelist.allocate() {
884                id
885            } else if freelist.trunk_head() != 0 {
886                let trunk_id = freelist.trunk_head();
887                drop(freelist);
888
889                let trunk = self.read_page(trunk_id).map_err(|e| match e {
890                    PagerError::PageNotFound(_) => {
891                        PagerError::InvalidDatabase("Freelist trunk missing".to_string())
892                    }
893                    other => other,
894                })?;
895
896                let mut freelist = self.freelist_write()?;
897                freelist
898                    .load_from_trunk(&trunk)
899                    .map_err(|e| PagerError::InvalidDatabase(format!("Freelist: {}", e)))?;
900                let id = freelist.allocate().ok_or_else(|| {
901                    PagerError::InvalidDatabase("Freelist empty after trunk load".to_string())
902                })?;
903
904                let mut header = self.header_write()?;
905                header.freelist_head = freelist.trunk_head();
906                *self.header_dirty_lock()? = true;
907
908                id
909            } else {
910                // No free pages, extend file
911                let mut header = self.header_write()?;
912                let id = header.page_count;
913                header.page_count += 1;
914                *self.header_dirty_lock()? = true;
915                id
916            }
917        };
918
919        let page = Page::new(page_type, page_id);
920
921        // Write to cache. The evicted page (if any) is dirty by
922        // definition — `cache.insert` only returns `Some` when it
923        // had to evict a dirty entry to make room. The previous
924        // version dropped that return value, which silently lost
925        // writes whenever a freshly-allocated page caused an LRU
926        // eviction. This shows up under heavy ingest as
927        // "B-tree insert error: Pager error: I/O error: failed to
928        // fill whole buffer" later, when something tries to read
929        // back the never-flushed page. Mirror `read_page`'s
930        // handling: write the evicted page through immediately so
931        // the on-disk image stays consistent.
932        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
933            let evicted_id = dirty_page.page_id();
934            self.write_page_raw(evicted_id, &dirty_page)?;
935        }
936        self.cache.mark_dirty(page_id);
937
938        Ok(page)
939    }
940
941    /// Reserve a contiguous extent of vector pages.
942    pub fn reserve_contig_extent(&self, n_pages: u32) -> Result<super::ExtentId, PagerError> {
943        if self.config.read_only {
944            return Err(PagerError::ReadOnly);
945        }
946        if n_pages == 0 {
947            return Err(PagerError::InvalidDatabase(
948                "contiguous extent must reserve at least one page".to_string(),
949            ));
950        }
951
952        let start_page = {
953            let mut header = self.header_write()?;
954            let start = header.page_count;
955            header.page_count = header.page_count.checked_add(n_pages).ok_or_else(|| {
956                PagerError::InvalidDatabase("contiguous extent page count overflow".to_string())
957            })?;
958            *self.header_dirty_lock()? = true;
959            start
960        };
961
962        for page_id in start_page..start_page + n_pages {
963            let mut page = Page::new(PageType::Vector, page_id);
964            page.update_checksum();
965            if let Some(dirty_page) = self.cache.insert(page_id, page) {
966                let evicted_id = dirty_page.page_id();
967                self.write_page_raw(evicted_id, &dirty_page)?;
968            }
969            self.cache.mark_dirty(page_id);
970        }
971
972        Ok(super::ExtentId {
973            start_page,
974            n_pages,
975        })
976    }
977
978    /// Free a page (return to freelist)
979    pub fn free_page(&self, page_id: u32) -> Result<(), PagerError> {
980        if self.config.read_only {
981            return Err(PagerError::ReadOnly);
982        }
983
984        // Remove from cache
985        self.cache.remove(page_id);
986
987        // Add to freelist
988        let mut freelist = self.freelist_write()?;
989        freelist.free(page_id);
990
991        *self.header_dirty_lock()? = true;
992
993        Ok(())
994    }
995
996    /// Get database header
997    pub fn header(&self) -> Result<DatabaseHeader, PagerError> {
998        Ok(self.header_read()?.clone())
999    }
1000
1001    pub fn physical_header(&self) -> Result<PhysicalFileHeader, PagerError> {
1002        Ok(self.header_read()?.physical)
1003    }
1004
1005    pub fn update_physical_header(&self, physical: PhysicalFileHeader) -> Result<(), PagerError> {
1006        if self.config.read_only {
1007            return Err(PagerError::ReadOnly);
1008        }
1009
1010        let mut header = self.header_write()?;
1011        header.physical = physical;
1012        *self.header_dirty_lock()? = true;
1013        Ok(())
1014    }
1015
1016    /// Get page count
1017    pub fn page_count(&self) -> Result<u32, PagerError> {
1018        Ok(self.header_read()?.page_count)
1019    }
1020
1021    /// Attach a WAL writer to enforce WAL-first flush ordering.
1022    ///
1023    /// After this call, [`Pager::flush`] computes the maximum
1024    /// `header.lsn` over all dirty pages and calls
1025    /// `WalWriter::flush_until(max_lsn)` before any page is written
1026    /// to the data file. This is the postgres rule: a page on disk
1027    /// implies its WAL record is already durable on disk.
1028    ///
1029    /// Existing call sites that construct a Pager without a WAL
1030    /// keep their previous behaviour (no LSN check) — wiring is
1031    /// strictly opt-in.
1032    pub fn set_wal_writer(&self, wal: Arc<Mutex<crate::storage::wal::writer::WalWriter>>) {
1033        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
1034        *slot = Some(wal);
1035    }
1036
1037    /// Detach the WAL writer (test / shutdown path).
1038    pub fn clear_wal_writer(&self) {
1039        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
1040        *slot = None;
1041    }
1042
1043    /// Has a WAL writer been attached?
1044    pub fn has_wal_writer(&self) -> bool {
1045        self.wal.read().map(|s| s.is_some()).unwrap_or(false)
1046    }
1047
1048    /// Flush all dirty pages to disk
1049    pub fn flush(&self) -> Result<(), PagerError> {
1050        if self.config.read_only {
1051            return Ok(());
1052        }
1053
1054        // Persist freelist to trunk pages when dirty
1055        let trunks = {
1056            let mut freelist = self.freelist_write()?;
1057            if freelist.is_dirty() {
1058                let mut header = self.header_write()?;
1059                let trunks = freelist.flush_to_trunks(0, || {
1060                    let id = header.page_count;
1061                    header.page_count += 1;
1062                    id
1063                });
1064                header.freelist_head = freelist.trunk_head();
1065                *self.header_dirty_lock()? = true;
1066                freelist.mark_clean();
1067                trunks
1068            } else {
1069                Vec::new()
1070            }
1071        };
1072
1073        for trunk in trunks {
1074            let page_id = trunk.page_id();
1075            self.cache.insert(page_id, trunk);
1076            self.cache.mark_dirty(page_id);
1077        }
1078
1079        // Flush dirty pages from cache (through DWB if enabled)
1080        let dirty_pages = self.cache.flush_dirty();
1081        if !dirty_pages.is_empty() {
1082            // WAL-FIRST: ensure every WAL record describing a dirty
1083            // page is durable BEFORE the page itself reaches disk.
1084            // Pages with `lsn == 0` are exempt (freelist trunks, header
1085            // shadow pages, anything not produced by a WAL append).
1086            let max_lsn = dirty_pages
1087                .iter()
1088                .filter_map(|(_, page)| page.header().ok().map(|h| h.lsn))
1089                .max()
1090                .unwrap_or(0);
1091            if max_lsn > 0 {
1092                if let Ok(slot) = self.wal.read() {
1093                    if let Some(wal) = slot.as_ref() {
1094                        let wal = Arc::clone(wal);
1095                        // Drop the read lock before taking the WAL
1096                        // mutex so an unrelated reader cannot block
1097                        // the flush path.
1098                        drop(slot);
1099                        let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
1100                        wal_guard.flush_until(max_lsn).map_err(PagerError::Io)?;
1101                    }
1102                }
1103            }
1104            self.write_pages_through_dwb(&dirty_pages)?;
1105        }
1106
1107        // Write header if dirty
1108        if *self.header_dirty_lock()? {
1109            self.write_header()?;
1110        }
1111
1112        Ok(())
1113    }
1114
1115    /// Sync file to disk (fsync)
1116    pub fn sync(&self) -> Result<(), PagerError> {
1117        self.flush()?;
1118
1119        let file = self.file_lock()?;
1120        file.sync_all()?;
1121
1122        Ok(())
1123    }
1124
1125    /// Get cache statistics
1126    pub fn cache_stats(&self) -> crate::storage::engine::page_cache::CacheStats {
1127        self.cache.stats()
1128    }
1129
1130    /// Count dirty pages currently in the page cache.
1131    pub fn dirty_page_count(&self) -> usize {
1132        self.cache.dirty_count()
1133    }
1134
1135    /// Estimated fraction of the page cache holding dirty pages.
1136    /// Returned in `[0, 1]`. Used by the background writer to
1137    /// decide when to kick in aggressive flushing.
1138    pub fn dirty_fraction(&self) -> f64 {
1139        let capacity = self.cache.capacity().max(1) as f64;
1140        self.cache.dirty_count() as f64 / capacity
1141    }
1142
1143    /// Flush up to `max` dirty pages from the cache. Returns the
1144    /// number actually written. Background-writer entry point —
1145    /// reuses the same persistence path as `flush()` but bounded.
1146    pub fn flush_some_dirty(&self, max: usize) -> Result<usize, PagerError> {
1147        if self.config.read_only || max == 0 {
1148            return Ok(0);
1149        }
1150        let dirty_pages = self.cache.flush_some_dirty(max);
1151        if dirty_pages.is_empty() {
1152            return Ok(0);
1153        }
1154        let count = dirty_pages.len();
1155        // WAL-first: every cached dirty page carries an LSN that the
1156        // WAL must have already persisted. The full `flush()` path
1157        // enforces this with `wal.flush(max_lsn)`; here we simply
1158        // write through the pager — safe because callers only reach
1159        // this path via the bgwriter, which runs asynchronously
1160        // alongside normal commits that already respect WAL-first.
1161        for (page_id, page) in dirty_pages {
1162            self.write_page(page_id, page)?;
1163        }
1164        Ok(count)
1165    }
1166
1167    /// Get database file path
1168    pub fn path(&self) -> &Path {
1169        &self.path
1170    }
1171
1172    /// Check if database is read-only
1173    pub fn is_read_only(&self) -> bool {
1174        self.config.read_only
1175    }
1176
1177    /// Get file size in bytes
1178    pub fn file_size(&self) -> Result<u64, PagerError> {
1179        let file = self.file_lock()?;
1180        Ok(file.metadata()?.len())
1181    }
1182
1183    /// Issue an OS-level read-ahead hint for `page_id`.
1184    ///
1185    /// A6 prefetch wire: called from `BTreeCursor::next` when the
1186    /// cursor passes 50% of the current leaf, so the kernel fetches
1187    /// the next leaf page while CPU processes the remaining half of
1188    /// the current one. Failures are silent — a missed prefetch is a
1189    /// performance miss, never a correctness bug.
1190    pub fn prefetch_hint(&self, page_id: u32) {
1191        if let Ok(file) = self.file_lock() {
1192            let _ = crate::storage::btree::prefetch::prefetch_page(
1193                &file,
1194                page_id as u64,
1195                PAGE_SIZE as u32,
1196            );
1197        }
1198    }
1199
1200    // ── Corruption defense helpers ──────────────────────────────────
1201
1202    /// Path for the header shadow file
1203    fn shadow_path(db_path: &Path) -> PathBuf {
1204        let mut p = db_path.to_path_buf().into_os_string();
1205        p.push("-hdr");
1206        PathBuf::from(p)
1207    }
1208
1209    /// Path for the metadata shadow file
1210    fn meta_shadow_path(db_path: &Path) -> PathBuf {
1211        let mut p = db_path.to_path_buf().into_os_string();
1212        p.push("-meta");
1213        PathBuf::from(p)
1214    }
1215
1216    /// Path for the double-write buffer file
1217    fn dwb_path(db_path: &Path) -> PathBuf {
1218        let mut p = db_path.to_path_buf().into_os_string();
1219        p.push("-dwb");
1220        PathBuf::from(p)
1221    }
1222
1223    /// Open the double-write buffer file without truncating existing content.
1224    ///
1225    /// The file is intentionally preserved across restarts so recovery can
1226    /// consume any crash-leftover pages before the next write cycle clears it.
1227    fn open_dwb_file(db_path: &Path) -> Result<File, PagerError> {
1228        Ok(OpenOptions::new()
1229            .read(true)
1230            .write(true)
1231            .create(true)
1232            .truncate(false)
1233            .open(Self::dwb_path(db_path))?)
1234    }
1235
1236    /// Clear the DWB in place while preserving the file path and handle.
1237    fn clear_dwb_file(file: &mut File) -> Result<(), PagerError> {
1238        file.set_len(0)?;
1239        file.seek(SeekFrom::Start(0))?;
1240        file.sync_all()?;
1241        Ok(())
1242    }
1243
1244    /// Write a shadow copy of the header page to .rdb-hdr
1245    fn write_header_shadow(&self, page: &Page) -> Result<(), PagerError> {
1246        if self.config.read_only {
1247            return Ok(());
1248        }
1249        let shadow = Self::shadow_path(&self.path);
1250        let mut f = File::create(&shadow)?;
1251        f.write_all(page.as_bytes())?;
1252        f.sync_all()?;
1253        Ok(())
1254    }
1255
1256    /// Recover header from shadow file when page 0 is corrupted
1257    fn recover_header_from_shadow(&self) -> Result<Page, PagerError> {
1258        let shadow = Self::shadow_path(&self.path);
1259        if !shadow.exists() {
1260            return Err(PagerError::InvalidDatabase(
1261                "Page 0 corrupted and no header shadow found".into(),
1262            ));
1263        }
1264        let mut f = File::open(&shadow)?;
1265        let mut buf = [0u8; PAGE_SIZE];
1266        f.read_exact(&mut buf)?;
1267        let page = Page::from_bytes(buf);
1268
1269        // Verify shadow is valid
1270        let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
1271        if magic != MAGIC_BYTES {
1272            return Err(PagerError::InvalidDatabase(
1273                "Header shadow also corrupted".into(),
1274            ));
1275        }
1276
1277        // Restore page 0 from shadow
1278        if !self.config.read_only {
1279            self.write_page_raw(0, &page)?;
1280            let file = self.file_lock()?;
1281            file.sync_all()?;
1282        }
1283
1284        Ok(page)
1285    }
1286
1287    /// Write a shadow copy of the metadata page to .rdb-meta.
1288    ///
1289    /// When the process-global `fold_pager_meta` policy is enabled (see
1290    /// [`crate::physical::fold_pager_meta_enabled`]) the shadow is suppressed:
1291    /// metadata is sourced exclusively from page 1 (plus its overflow chain).
1292    /// Any pre-existing `<data>-meta` file is also removed so a flipped flag
1293    /// cannot leave a stale shadow on disk. Reads still tolerate the sidecar
1294    /// when present so databases written before the flag flipped remain
1295    /// loadable.
1296    pub fn write_meta_shadow(&self, page: &Page) -> Result<(), PagerError> {
1297        if self.config.read_only {
1298            return Ok(());
1299        }
1300        let shadow = Self::meta_shadow_path(&self.path);
1301        if crate::physical::fold_pager_meta_enabled() {
1302            // Best-effort cleanup of any prior shadow — a missing file is not
1303            // an error condition here.
1304            let _ = std::fs::remove_file(&shadow);
1305            return Ok(());
1306        }
1307        let mut f = File::create(&shadow)?;
1308        f.write_all(page.as_bytes())?;
1309        f.sync_all()?;
1310        Ok(())
1311    }
1312
1313    /// Recover metadata page from shadow file when page 1 is corrupted
1314    pub fn recover_meta_from_shadow(&self) -> Result<Page, PagerError> {
1315        let shadow = Self::meta_shadow_path(&self.path);
1316        if !shadow.exists() {
1317            return Err(PagerError::InvalidDatabase(
1318                "Page 1 corrupted and no metadata shadow found".into(),
1319            ));
1320        }
1321        let mut f = File::open(&shadow)?;
1322        let mut buf = [0u8; PAGE_SIZE];
1323        f.read_exact(&mut buf)?;
1324        let page = Page::from_bytes(buf);
1325
1326        // Restore page 1 from shadow
1327        if !self.config.read_only {
1328            self.write_page_raw(1, &page)?;
1329            let file = self.file_lock()?;
1330            file.sync_all()?;
1331        }
1332
1333        Ok(page)
1334    }
1335
1336    /// Write pages through the double-write buffer for torn page protection.
1337    ///
1338    /// 1. Write all pages to the DWB file with a header (magic + count + checksum)
1339    /// 2. fsync the DWB
1340    /// 3. Write all pages to their final locations in the .rdb file
1341    /// 4. Truncate the DWB (marks as consumed)
1342    fn write_pages_through_dwb(&self, pages: &[(u32, Page)]) -> Result<(), PagerError> {
1343        if let Some(dwb_mutex) = &self.dwb_file {
1344            let mut dwb = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1345
1346            // Build DWB content: [magic:4][count:u32][checksum:u32][pages...]
1347            // Each page entry: [page_id:u32][page_data:4096]
1348            let entry_size = 4 + PAGE_SIZE; // page_id + data
1349            let header_len = 4 + 4 + 4; // magic + count + checksum
1350            let total = header_len + pages.len() * entry_size;
1351            let mut buf = Vec::with_capacity(total);
1352
1353            // Header
1354            buf.extend_from_slice(&DWB_MAGIC);
1355            buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
1356            buf.extend_from_slice(&[0u8; 4]); // placeholder for checksum
1357
1358            // Page entries
1359            for (page_id, page) in pages {
1360                buf.extend_from_slice(&page_id.to_le_bytes());
1361                buf.extend_from_slice(page.as_bytes());
1362            }
1363
1364            // Compute and write checksum over all data after the header
1365            let checksum = super::super::crc32::crc32(&buf[header_len..]);
1366            buf[8..12].copy_from_slice(&checksum.to_le_bytes());
1367
1368            // Write DWB and fsync
1369            dwb.seek(SeekFrom::Start(0))?;
1370            dwb.write_all(&buf)?;
1371            dwb.set_len(buf.len() as u64)?;
1372            dwb.sync_all()?;
1373
1374            // Now write pages to their final locations
1375            for (page_id, page) in pages {
1376                self.write_page_raw(*page_id, page)?;
1377            }
1378
1379            // Truncate DWB to mark as consumed
1380            Self::clear_dwb_file(&mut dwb)?;
1381
1382            Ok(())
1383        } else {
1384            // DWB disabled — write directly
1385            for (page_id, page) in pages {
1386                self.write_page_raw(*page_id, page)?;
1387            }
1388            Ok(())
1389        }
1390    }
1391
1392    /// Recover from double-write buffer after a crash.
1393    ///
1394    /// If the DWB file contains valid pages, they were written before the crash
1395    /// interrupted writing to the main file. Re-apply them.
1396    fn recover_from_dwb(&self) -> Result<(), PagerError> {
1397        let dwb_path = Self::dwb_path(&self.path);
1398        if !dwb_path.exists() {
1399            return Ok(());
1400        }
1401
1402        if let Some(dwb_mutex) = &self.dwb_file {
1403            let mut file = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1404            return self.recover_from_dwb_file(&mut file);
1405        }
1406
1407        let mut file = OpenOptions::new().read(true).write(true).open(&dwb_path)?;
1408        self.recover_from_dwb_file(&mut file)
1409    }
1410
1411    fn recover_from_dwb_file(&self, file: &mut File) -> Result<(), PagerError> {
1412        file.seek(SeekFrom::Start(0))?;
1413        let len = file.metadata()?.len();
1414        if len < 12 {
1415            // Empty or incomplete header — keep the DWB file but clear stale bytes.
1416            return Self::clear_dwb_file(file);
1417        }
1418
1419        let mut buf = vec![0u8; len as usize];
1420        file.read_exact(&mut buf)?;
1421
1422        // Verify magic
1423        if buf[0..4] != DWB_MAGIC {
1424            // Not a valid DWB — clear it in place so the same file can be reused.
1425            return Self::clear_dwb_file(file);
1426        }
1427
1428        let count = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]) as usize;
1429        let stored_checksum = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);
1430
1431        let header_len = 12;
1432        let entry_size = 4 + PAGE_SIZE;
1433        let expected_len = header_len + count * entry_size;
1434
1435        if buf.len() < expected_len {
1436            // Incomplete DWB write — discard it in place.
1437            return Self::clear_dwb_file(file);
1438        }
1439
1440        // Verify checksum
1441        let computed = super::super::crc32::crc32(&buf[header_len..expected_len]);
1442        if computed != stored_checksum {
1443            // Corrupted DWB — discard it in place.
1444            return Self::clear_dwb_file(file);
1445        }
1446
1447        // DWB is valid — re-apply pages to main file
1448        let mut offset = header_len;
1449        for _ in 0..count {
1450            let page_id = u32::from_le_bytes([
1451                buf[offset],
1452                buf[offset + 1],
1453                buf[offset + 2],
1454                buf[offset + 3],
1455            ]);
1456            offset += 4;
1457
1458            let mut page_data = [0u8; PAGE_SIZE];
1459            page_data.copy_from_slice(&buf[offset..offset + PAGE_SIZE]);
1460            offset += PAGE_SIZE;
1461
1462            let page = Page::from_bytes(page_data);
1463            self.write_page_raw(page_id, &page)?;
1464        }
1465
1466        // Sync and clean up
1467        {
1468            let file = self.file_lock()?;
1469            file.sync_all()?;
1470        }
1471
1472        Self::clear_dwb_file(file)
1473    }
1474
1475    /// Write header and sync to disk (public for checkpointer).
1476    pub fn write_header_and_sync(&self) -> Result<(), PagerError> {
1477        self.write_header()?;
1478        let file = self.file_lock()?;
1479        file.sync_all()?;
1480        Ok(())
1481    }
1482
1483    /// Set the checkpoint_in_progress flag in the header.
1484    pub fn set_checkpoint_in_progress(
1485        &self,
1486        in_progress: bool,
1487        target_lsn: u64,
1488    ) -> Result<(), PagerError> {
1489        let mut header = self.header_write()?;
1490        header.checkpoint_in_progress = in_progress;
1491        header.checkpoint_target_lsn = target_lsn;
1492        *self.header_dirty_lock()? = true;
1493        drop(header);
1494        self.write_header_and_sync()
1495    }
1496
1497    /// Update the checkpoint LSN and clear the in-progress flag.
1498    pub fn complete_checkpoint(&self, lsn: u64) -> Result<(), PagerError> {
1499        let mut header = self.header_write()?;
1500        header.checkpoint_lsn = lsn;
1501        header.checkpoint_in_progress = false;
1502        header.checkpoint_target_lsn = 0;
1503        *self.header_dirty_lock()? = true;
1504        drop(header);
1505        self.write_header_and_sync()
1506    }
1507}
1508
#[cfg(test)]
mod tests {
    use super::*;

    /// Unique scratch path per call, built from test name, pid, and
    /// wall-clock nanoseconds.
    fn temp_db_path(name: &str) -> PathBuf {
        let file_name = format!(
            "reddb-pager-{}-{}-{}.rdb",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        );
        std::env::temp_dir().join(file_name)
    }

    /// Overwrite page 0 of the file at `path` with `page` and fsync.
    fn overwrite_header_page(path: &Path, page: &Page) {
        let mut file = OpenOptions::new().write(true).open(path).unwrap();
        file.seek(SeekFrom::Start(0)).unwrap();
        file.write_all(page.as_bytes()).unwrap();
        file.sync_all().unwrap();
    }

    #[test]
    fn open_refuses_future_database_version() {
        let path = temp_db_path("future-version");
        // Create a valid database, then close it so its lock is released.
        drop(Pager::open_default(&path).unwrap());

        // Forge a header whose version field is one past what we support.
        let mut forged = Page::new_header_page(1);
        let version_field = HEADER_SIZE + 4..HEADER_SIZE + 8;
        forged.as_bytes_mut()[version_field].copy_from_slice(&(DB_VERSION + 1).to_le_bytes());
        forged.update_checksum();
        overwrite_header_page(&path, &forged);

        let err = Pager::open_default(&path)
            .err()
            .expect("future database version should be rejected");
        match err {
            PagerError::InvalidDatabase(msg) => {
                assert!(msg.contains("newer than supported"));
            }
            other => panic!("expected InvalidDatabase, got {other:?}"),
        }

        for leftover in [path.clone(), Pager::shadow_path(&path), Pager::dwb_path(&path)] {
            let _ = std::fs::remove_file(leftover);
        }
    }
}