Skip to main content

reddb_server/storage/engine/pager/
impl.rs

1use super::*;
2
/// Magic bytes identifying a double-write buffer (DWB) file: ASCII `RDDW`.
const DWB_MAGIC: [u8; 4] = *b"RDDW";
5
6impl Pager {
7    /// Open or create a database file
8    pub fn open<P: AsRef<Path>>(path: P, config: PagerConfig) -> Result<Self, PagerError> {
9        let path = path.as_ref().to_path_buf();
10        let exists = path.exists();
11
12        if !exists && !config.create {
13            return Err(PagerError::InvalidDatabase(
14                "Database does not exist".into(),
15            ));
16        }
17
18        if !exists && config.read_only {
19            return Err(PagerError::InvalidDatabase(
20                "Cannot create read-only database".into(),
21            ));
22        }
23
24        // Open file
25        // Note: create requires write access, so disable it for read-only mode
26        let file = OpenOptions::new()
27            .read(true)
28            .write(!config.read_only)
29            .create(config.create && !config.read_only)
30            .open(&path)?;
31
32        // Acquire file lock (exclusive for writes, shared for read-only)
33        let lock_file = if !config.read_only {
34            let lf = OpenOptions::new().read(true).write(true).open(&path)?;
35            lf.try_lock_exclusive().map_err(|_| PagerError::Locked)?;
36            Some(lf)
37        } else {
38            let lf = OpenOptions::new().read(true).open(&path)?;
39            match lf.try_lock_shared() {
40                Ok(_) => Some(lf),
41                Err(_) => None,
42            }
43        };
44
45        // Open double-write buffer file.
46        //
47        // gh-478: when `fold_dwb_into_wal` is enabled the DWB sidecar is
48        // suppressed — torn pages are healed by replaying FullPageImage WAL
49        // records during recovery. Any pre-existing `-dwb` is removed so a
50        // flipped flag cannot leave a stale sidecar on disk.
51        let fold_dwb = crate::physical::fold_dwb_into_wal_enabled();
52        let dwb_file = if config.double_write && !config.read_only && !fold_dwb {
53            let f = Self::open_dwb_file(&path)?;
54            Some(Mutex::new(f))
55        } else {
56            if fold_dwb && !config.read_only {
57                let _ = std::fs::remove_file(Self::dwb_path(&path));
58            }
59            None
60        };
61
62        let mut pager = Self {
63            path,
64            file: Mutex::new(file),
65            _lock_file: lock_file,
66            dwb_file,
67            cache: PageCache::new(config.cache_size),
68            freelist: RwLock::new(FreeList::new()),
69            header: RwLock::new(DatabaseHeader::default()),
70            config,
71            header_dirty: Mutex::new(false),
72            wal: RwLock::new(None),
73            encryption: None,
74        };
75
76        if exists {
77            // Recover from double-write buffer if needed
78            pager.recover_from_dwb()?;
79            // Load existing database (with header shadow fallback)
80            pager.load_header()?;
81            pager.bind_encryption_for_existing()?;
82        } else {
83            // Initialize new database
84            pager.initialize()?;
85            pager.bind_encryption_for_new()?;
86        }
87
88        Ok(pager)
89    }
90
91    /// Inspect page 0 for the `RDBE` encryption marker, then resolve
92    /// the (key, marker) matrix:
93    ///
94    /// | Marker | Key supplied | Result                              |
95    /// |--------|--------------|-------------------------------------|
96    /// | yes    | yes          | Bind encryptor; validate key        |
97    /// | yes    | no           | `EncryptionRequired` (fail closed)  |
98    /// | no     | yes          | `PlainDatabaseRefusesKey`           |
99    /// | no     | no           | Plain pager — no binding needed     |
100    fn bind_encryption_for_existing(&mut self) -> Result<(), PagerError> {
101        const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
102        const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";
103
104        if self.page_count().unwrap_or(0) == 0 {
105            return self.bind_encryption_for_new();
106        }
107        let header_page = self.read_page_no_checksum(0)?;
108        let data = header_page.as_bytes();
109        let has_marker = data.len() > ENCRYPTION_MARKER_OFFSET + 4
110            && &data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4] == ENCRYPTION_MARKER;
111
112        let key = self.config.encryption.clone();
113        match (has_marker, key) {
114            (true, Some(key)) => {
115                let header_start = ENCRYPTION_MARKER_OFFSET + 4;
116                let header =
117                    crate::storage::encryption::EncryptionHeader::from_bytes(&data[header_start..])
118                        .map_err(|e| {
119                            PagerError::InvalidDatabase(format!(
120                                "encryption header parse failed: {e}"
121                            ))
122                        })?;
123                if !header.validate(&key) {
124                    return Err(PagerError::InvalidKey);
125                }
126                let encryptor = crate::storage::encryption::PageEncryptor::new(key);
127                self.encryption = Some((encryptor, header));
128                Ok(())
129            }
130            (true, None) => Err(PagerError::EncryptionRequired),
131            (false, Some(_)) => Err(PagerError::PlainDatabaseRefusesKey),
132            (false, None) => Ok(()),
133        }
134    }
135
136    /// New DB: if a key is configured, write the marker + header to
137    /// page 0 so subsequent opens detect encryption.
138    fn bind_encryption_for_new(&mut self) -> Result<(), PagerError> {
139        const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
140        const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";
141
142        let Some(key) = self.config.encryption.clone() else {
143            return Ok(());
144        };
145        let header = crate::storage::encryption::EncryptionHeader::new(&key);
146        let encryptor = crate::storage::encryption::PageEncryptor::new(key);
147
148        // Stamp the marker + header into page 0 if it's been
149        // initialised by `initialize()` already.
150        if self.page_count().unwrap_or(0) > 0 {
151            let mut page = self.read_page_no_checksum(0)?;
152            let data = page.as_bytes_mut();
153            data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4]
154                .copy_from_slice(ENCRYPTION_MARKER);
155            let header_bytes = header.to_bytes();
156            let header_start = ENCRYPTION_MARKER_OFFSET + 4;
157            data[header_start..header_start + header_bytes.len()].copy_from_slice(&header_bytes);
158            self.write_page_no_checksum(0, page)?;
159        }
160        self.encryption = Some((encryptor, header));
161        Ok(())
162    }
163
164    /// Open with default configuration
165    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, PagerError> {
166        Self::open(path, PagerConfig::default())
167    }
168
169    /// Initialize a new database
170    fn initialize(&self) -> Result<(), PagerError> {
171        if self.config.read_only {
172            return Err(PagerError::ReadOnly);
173        }
174
175        // Create header page. Page ids 1 and 2 are reserved so fixed
176        // metadata/vault pages cannot be handed out to normal B-tree
177        // allocation before those subsystems are initialized.
178        let initial_page_count = 3;
179        let header_page = Page::new_header_page(initial_page_count);
180        self.header_write()?.page_count = initial_page_count;
181
182        // Write header and reserved pages so any scan over 0..page_count
183        // can read every allocated page in a brand-new database.
184        self.write_page_raw(0, &header_page)?;
185        let mut metadata_page = Page::new(PageType::Header, 1);
186        metadata_page.update_checksum();
187        self.write_page_raw(1, &metadata_page)?;
188        let mut vault_page = Page::new(PageType::Vault, 2);
189        vault_page.update_checksum();
190        self.write_page_raw(2, &vault_page)?;
191
192        // Sync to disk
193        self.sync()?;
194
195        Ok(())
196    }
197
198    /// Acquire a write lock on the header RwLock, mapping poison errors.
199    fn header_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, DatabaseHeader>, PagerError> {
200        self.header.write().map_err(|_| PagerError::LockPoisoned)
201    }
202
203    /// Acquire a read lock on the header RwLock, mapping poison errors.
204    fn header_read(&self) -> Result<std::sync::RwLockReadGuard<'_, DatabaseHeader>, PagerError> {
205        self.header.read().map_err(|_| PagerError::LockPoisoned)
206    }
207
208    /// Acquire a write lock on the freelist RwLock, mapping poison errors.
209    fn freelist_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, FreeList>, PagerError> {
210        self.freelist.write().map_err(|_| PagerError::LockPoisoned)
211    }
212
213    /// Acquire a lock on the file Mutex, mapping poison errors.
214    fn file_lock(&self) -> Result<std::sync::MutexGuard<'_, File>, PagerError> {
215        self.file.lock().map_err(|_| PagerError::LockPoisoned)
216    }
217
218    /// Acquire a lock on the header_dirty Mutex, mapping poison errors.
219    fn header_dirty_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>, PagerError> {
220        self.header_dirty
221            .lock()
222            .map_err(|_| PagerError::LockPoisoned)
223    }
224
225    /// Load database header from page 0 (with shadow fallback)
226    fn load_header(&self) -> Result<(), PagerError> {
227        // Read page 0 — fall back to shadow if corrupted
228        let header_page = match self.read_page_raw(0) {
229            Ok(page) => {
230                // Verify magic bytes
231                let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
232                if magic == MAGIC_BYTES {
233                    page
234                } else {
235                    // Page 0 corrupted — try shadow
236                    self.recover_header_from_shadow()?
237                }
238            }
239            Err(_) => self.recover_header_from_shadow()?,
240        };
241
242        // Read header fields
243        let data = header_page.as_bytes();
244        let version = u32::from_le_bytes([
245            data[HEADER_SIZE + 4],
246            data[HEADER_SIZE + 5],
247            data[HEADER_SIZE + 6],
248            data[HEADER_SIZE + 7],
249        ]);
250
251        let page_size = u32::from_le_bytes([
252            data[HEADER_SIZE + 8],
253            data[HEADER_SIZE + 9],
254            data[HEADER_SIZE + 10],
255            data[HEADER_SIZE + 11],
256        ]);
257
258        if page_size != PAGE_SIZE as u32 {
259            return Err(PagerError::InvalidDatabase(format!(
260                "Unsupported page size: {}",
261                page_size
262            )));
263        }
264        if version > DB_VERSION {
265            return Err(PagerError::InvalidDatabase(format!(
266                "Unsupported database version: file version {version} is newer than supported {DB_VERSION}"
267            )));
268        }
269
270        let page_count = u32::from_le_bytes([
271            data[HEADER_SIZE + 12],
272            data[HEADER_SIZE + 13],
273            data[HEADER_SIZE + 14],
274            data[HEADER_SIZE + 15],
275        ]);
276
277        let freelist_head = u32::from_le_bytes([
278            data[HEADER_SIZE + 16],
279            data[HEADER_SIZE + 17],
280            data[HEADER_SIZE + 18],
281            data[HEADER_SIZE + 19],
282        ]);
283
284        let schema_version = u32::from_le_bytes([
285            data[HEADER_SIZE + 20],
286            data[HEADER_SIZE + 21],
287            data[HEADER_SIZE + 22],
288            data[HEADER_SIZE + 23],
289        ]);
290
291        let checkpoint_lsn = u64::from_le_bytes([
292            data[HEADER_SIZE + 24],
293            data[HEADER_SIZE + 25],
294            data[HEADER_SIZE + 26],
295            data[HEADER_SIZE + 27],
296            data[HEADER_SIZE + 28],
297            data[HEADER_SIZE + 29],
298            data[HEADER_SIZE + 30],
299            data[HEADER_SIZE + 31],
300        ]);
301        let physical_format_version = u32::from_le_bytes([
302            data[HEADER_SIZE + 32],
303            data[HEADER_SIZE + 33],
304            data[HEADER_SIZE + 34],
305            data[HEADER_SIZE + 35],
306        ]);
307        let physical_sequence = u64::from_le_bytes([
308            data[HEADER_SIZE + 36],
309            data[HEADER_SIZE + 37],
310            data[HEADER_SIZE + 38],
311            data[HEADER_SIZE + 39],
312            data[HEADER_SIZE + 40],
313            data[HEADER_SIZE + 41],
314            data[HEADER_SIZE + 42],
315            data[HEADER_SIZE + 43],
316        ]);
317        let manifest_root = u64::from_le_bytes([
318            data[HEADER_SIZE + 44],
319            data[HEADER_SIZE + 45],
320            data[HEADER_SIZE + 46],
321            data[HEADER_SIZE + 47],
322            data[HEADER_SIZE + 48],
323            data[HEADER_SIZE + 49],
324            data[HEADER_SIZE + 50],
325            data[HEADER_SIZE + 51],
326        ]);
327        let manifest_oldest_root = u64::from_le_bytes([
328            data[HEADER_SIZE + 52],
329            data[HEADER_SIZE + 53],
330            data[HEADER_SIZE + 54],
331            data[HEADER_SIZE + 55],
332            data[HEADER_SIZE + 56],
333            data[HEADER_SIZE + 57],
334            data[HEADER_SIZE + 58],
335            data[HEADER_SIZE + 59],
336        ]);
337        let free_set_root = u64::from_le_bytes([
338            data[HEADER_SIZE + 60],
339            data[HEADER_SIZE + 61],
340            data[HEADER_SIZE + 62],
341            data[HEADER_SIZE + 63],
342            data[HEADER_SIZE + 64],
343            data[HEADER_SIZE + 65],
344            data[HEADER_SIZE + 66],
345            data[HEADER_SIZE + 67],
346        ]);
347        let manifest_page = u32::from_le_bytes([
348            data[HEADER_SIZE + 68],
349            data[HEADER_SIZE + 69],
350            data[HEADER_SIZE + 70],
351            data[HEADER_SIZE + 71],
352        ]);
353        let manifest_checksum = u64::from_le_bytes([
354            data[HEADER_SIZE + 72],
355            data[HEADER_SIZE + 73],
356            data[HEADER_SIZE + 74],
357            data[HEADER_SIZE + 75],
358            data[HEADER_SIZE + 76],
359            data[HEADER_SIZE + 77],
360            data[HEADER_SIZE + 78],
361            data[HEADER_SIZE + 79],
362        ]);
363        let collection_roots_page = u32::from_le_bytes([
364            data[HEADER_SIZE + 80],
365            data[HEADER_SIZE + 81],
366            data[HEADER_SIZE + 82],
367            data[HEADER_SIZE + 83],
368        ]);
369        let collection_roots_checksum = u64::from_le_bytes([
370            data[HEADER_SIZE + 84],
371            data[HEADER_SIZE + 85],
372            data[HEADER_SIZE + 86],
373            data[HEADER_SIZE + 87],
374            data[HEADER_SIZE + 88],
375            data[HEADER_SIZE + 89],
376            data[HEADER_SIZE + 90],
377            data[HEADER_SIZE + 91],
378        ]);
379        let collection_root_count = u32::from_le_bytes([
380            data[HEADER_SIZE + 92],
381            data[HEADER_SIZE + 93],
382            data[HEADER_SIZE + 94],
383            data[HEADER_SIZE + 95],
384        ]);
385        let snapshot_count = u32::from_le_bytes([
386            data[HEADER_SIZE + 96],
387            data[HEADER_SIZE + 97],
388            data[HEADER_SIZE + 98],
389            data[HEADER_SIZE + 99],
390        ]);
391        let index_count = u32::from_le_bytes([
392            data[HEADER_SIZE + 100],
393            data[HEADER_SIZE + 101],
394            data[HEADER_SIZE + 102],
395            data[HEADER_SIZE + 103],
396        ]);
397        let catalog_collection_count = u32::from_le_bytes([
398            data[HEADER_SIZE + 104],
399            data[HEADER_SIZE + 105],
400            data[HEADER_SIZE + 106],
401            data[HEADER_SIZE + 107],
402        ]);
403        let catalog_total_entities = u64::from_le_bytes([
404            data[HEADER_SIZE + 108],
405            data[HEADER_SIZE + 109],
406            data[HEADER_SIZE + 110],
407            data[HEADER_SIZE + 111],
408            data[HEADER_SIZE + 112],
409            data[HEADER_SIZE + 113],
410            data[HEADER_SIZE + 114],
411            data[HEADER_SIZE + 115],
412        ]);
413        let export_count = u32::from_le_bytes([
414            data[HEADER_SIZE + 116],
415            data[HEADER_SIZE + 117],
416            data[HEADER_SIZE + 118],
417            data[HEADER_SIZE + 119],
418        ]);
419        let graph_projection_count = u32::from_le_bytes([
420            data[HEADER_SIZE + 120],
421            data[HEADER_SIZE + 121],
422            data[HEADER_SIZE + 122],
423            data[HEADER_SIZE + 123],
424        ]);
425        let analytics_job_count = u32::from_le_bytes([
426            data[HEADER_SIZE + 124],
427            data[HEADER_SIZE + 125],
428            data[HEADER_SIZE + 126],
429            data[HEADER_SIZE + 127],
430        ]);
431        let manifest_event_count = u32::from_le_bytes([
432            data[HEADER_SIZE + 128],
433            data[HEADER_SIZE + 129],
434            data[HEADER_SIZE + 130],
435            data[HEADER_SIZE + 131],
436        ]);
437        let registry_page = u32::from_le_bytes([
438            data[HEADER_SIZE + 132],
439            data[HEADER_SIZE + 133],
440            data[HEADER_SIZE + 134],
441            data[HEADER_SIZE + 135],
442        ]);
443        let registry_checksum = u64::from_le_bytes([
444            data[HEADER_SIZE + 136],
445            data[HEADER_SIZE + 137],
446            data[HEADER_SIZE + 138],
447            data[HEADER_SIZE + 139],
448            data[HEADER_SIZE + 140],
449            data[HEADER_SIZE + 141],
450            data[HEADER_SIZE + 142],
451            data[HEADER_SIZE + 143],
452        ]);
453        let recovery_page = u32::from_le_bytes([
454            data[HEADER_SIZE + 144],
455            data[HEADER_SIZE + 145],
456            data[HEADER_SIZE + 146],
457            data[HEADER_SIZE + 147],
458        ]);
459        let recovery_checksum = u64::from_le_bytes([
460            data[HEADER_SIZE + 148],
461            data[HEADER_SIZE + 149],
462            data[HEADER_SIZE + 150],
463            data[HEADER_SIZE + 151],
464            data[HEADER_SIZE + 152],
465            data[HEADER_SIZE + 153],
466            data[HEADER_SIZE + 154],
467            data[HEADER_SIZE + 155],
468        ]);
469        let catalog_page = u32::from_le_bytes([
470            data[HEADER_SIZE + 156],
471            data[HEADER_SIZE + 157],
472            data[HEADER_SIZE + 158],
473            data[HEADER_SIZE + 159],
474        ]);
475        let catalog_checksum = u64::from_le_bytes([
476            data[HEADER_SIZE + 160],
477            data[HEADER_SIZE + 161],
478            data[HEADER_SIZE + 162],
479            data[HEADER_SIZE + 163],
480            data[HEADER_SIZE + 164],
481            data[HEADER_SIZE + 165],
482            data[HEADER_SIZE + 166],
483            data[HEADER_SIZE + 167],
484        ]);
485        let metadata_state_page = u32::from_le_bytes([
486            data[HEADER_SIZE + 168],
487            data[HEADER_SIZE + 169],
488            data[HEADER_SIZE + 170],
489            data[HEADER_SIZE + 171],
490        ]);
491        let metadata_state_checksum = u64::from_le_bytes([
492            data[HEADER_SIZE + 172],
493            data[HEADER_SIZE + 173],
494            data[HEADER_SIZE + 174],
495            data[HEADER_SIZE + 175],
496            data[HEADER_SIZE + 176],
497            data[HEADER_SIZE + 177],
498            data[HEADER_SIZE + 178],
499            data[HEADER_SIZE + 179],
500        ]);
501        let vector_artifact_page = u32::from_le_bytes([
502            data[HEADER_SIZE + 180],
503            data[HEADER_SIZE + 181],
504            data[HEADER_SIZE + 182],
505            data[HEADER_SIZE + 183],
506        ]);
507        let vector_artifact_checksum = u64::from_le_bytes([
508            data[HEADER_SIZE + 184],
509            data[HEADER_SIZE + 185],
510            data[HEADER_SIZE + 186],
511            data[HEADER_SIZE + 187],
512            data[HEADER_SIZE + 188],
513            data[HEADER_SIZE + 189],
514            data[HEADER_SIZE + 190],
515            data[HEADER_SIZE + 191],
516        ]);
517
518        // Two-phase checkpoint fields (offset 192-200)
519        let checkpoint_in_progress = data[HEADER_SIZE + 192] != 0;
520        let checkpoint_target_lsn = u64::from_le_bytes([
521            data[HEADER_SIZE + 193],
522            data[HEADER_SIZE + 194],
523            data[HEADER_SIZE + 195],
524            data[HEADER_SIZE + 196],
525            data[HEADER_SIZE + 197],
526            data[HEADER_SIZE + 198],
527            data[HEADER_SIZE + 199],
528            data[HEADER_SIZE + 200],
529        ]);
530
531        // Update header
532        {
533            let mut header = self.header_write()?;
534            header.version = version;
535            header.page_size = page_size;
536            header.page_count = page_count;
537            header.freelist_head = freelist_head;
538            header.schema_version = schema_version;
539            header.checkpoint_lsn = checkpoint_lsn;
540            header.checkpoint_in_progress = checkpoint_in_progress;
541            header.checkpoint_target_lsn = checkpoint_target_lsn;
542            header.physical = PhysicalFileHeader {
543                format_version: physical_format_version,
544                sequence: physical_sequence,
545                manifest_oldest_root,
546                manifest_root,
547                free_set_root,
548                manifest_page,
549                manifest_checksum,
550                collection_roots_page,
551                collection_roots_checksum,
552                collection_root_count,
553                snapshot_count,
554                index_count,
555                catalog_collection_count,
556                catalog_total_entities,
557                export_count,
558                graph_projection_count,
559                analytics_job_count,
560                manifest_event_count,
561                registry_page,
562                registry_checksum,
563                recovery_page,
564                recovery_checksum,
565                catalog_page,
566                catalog_checksum,
567                metadata_state_page,
568                metadata_state_checksum,
569                vector_artifact_page,
570                vector_artifact_checksum,
571            };
572        }
573
574        // Initialize freelist
575        {
576            let mut freelist = self.freelist_write()?;
577            *freelist = FreeList::from_header(freelist_head, 0);
578        }
579
580        Ok(())
581    }
582
583    /// Write header page
584    ///
585    /// Note: This merges database header fields into the existing page 0 content
586    /// to preserve any additional data (like encryption headers) that may be stored there.
587    fn write_header(&self) -> Result<(), PagerError> {
588        if self.config.read_only {
589            return Err(PagerError::ReadOnly);
590        }
591
592        let header = self.header_read()?;
593
594        // Read existing page 0 to preserve any additional data (e.g., encryption header)
595        // First check cache, then fall back to disk
596        let mut page = if let Some(cached) = self.cache.get(0) {
597            cached
598        } else {
599            // Try to read from disk if file is large enough
600            let file = self.file_lock()?;
601            let len = file.metadata().map(|m| m.len()).unwrap_or(0);
602            drop(file);
603
604            if len >= PAGE_SIZE as u64 {
605                self.read_page_raw(0)?
606            } else {
607                // File is new/empty, create fresh header page
608                Page::new(PageType::Header, 0)
609            }
610        };
611
612        let data = page.as_bytes_mut();
613
614        // Write magic
615        data[HEADER_SIZE..HEADER_SIZE + 4].copy_from_slice(&MAGIC_BYTES);
616
617        // Write fields (at fixed offsets in the DB header area)
618        data[HEADER_SIZE + 4..HEADER_SIZE + 8].copy_from_slice(&header.version.to_le_bytes());
619        data[HEADER_SIZE + 8..HEADER_SIZE + 12].copy_from_slice(&header.page_size.to_le_bytes());
620        data[HEADER_SIZE + 12..HEADER_SIZE + 16].copy_from_slice(&header.page_count.to_le_bytes());
621        data[HEADER_SIZE + 16..HEADER_SIZE + 20]
622            .copy_from_slice(&header.freelist_head.to_le_bytes());
623        data[HEADER_SIZE + 20..HEADER_SIZE + 24]
624            .copy_from_slice(&header.schema_version.to_le_bytes());
625        data[HEADER_SIZE + 24..HEADER_SIZE + 32]
626            .copy_from_slice(&header.checkpoint_lsn.to_le_bytes());
627        data[HEADER_SIZE + 32..HEADER_SIZE + 36]
628            .copy_from_slice(&header.physical.format_version.to_le_bytes());
629        data[HEADER_SIZE + 36..HEADER_SIZE + 44]
630            .copy_from_slice(&header.physical.sequence.to_le_bytes());
631        data[HEADER_SIZE + 44..HEADER_SIZE + 52]
632            .copy_from_slice(&header.physical.manifest_root.to_le_bytes());
633        data[HEADER_SIZE + 52..HEADER_SIZE + 60]
634            .copy_from_slice(&header.physical.manifest_oldest_root.to_le_bytes());
635        data[HEADER_SIZE + 60..HEADER_SIZE + 68]
636            .copy_from_slice(&header.physical.free_set_root.to_le_bytes());
637        data[HEADER_SIZE + 68..HEADER_SIZE + 72]
638            .copy_from_slice(&header.physical.manifest_page.to_le_bytes());
639        data[HEADER_SIZE + 72..HEADER_SIZE + 80]
640            .copy_from_slice(&header.physical.manifest_checksum.to_le_bytes());
641        data[HEADER_SIZE + 80..HEADER_SIZE + 84]
642            .copy_from_slice(&header.physical.collection_roots_page.to_le_bytes());
643        data[HEADER_SIZE + 84..HEADER_SIZE + 92]
644            .copy_from_slice(&header.physical.collection_roots_checksum.to_le_bytes());
645        data[HEADER_SIZE + 92..HEADER_SIZE + 96]
646            .copy_from_slice(&header.physical.collection_root_count.to_le_bytes());
647        data[HEADER_SIZE + 96..HEADER_SIZE + 100]
648            .copy_from_slice(&header.physical.snapshot_count.to_le_bytes());
649        data[HEADER_SIZE + 100..HEADER_SIZE + 104]
650            .copy_from_slice(&header.physical.index_count.to_le_bytes());
651        data[HEADER_SIZE + 104..HEADER_SIZE + 108]
652            .copy_from_slice(&header.physical.catalog_collection_count.to_le_bytes());
653        data[HEADER_SIZE + 108..HEADER_SIZE + 116]
654            .copy_from_slice(&header.physical.catalog_total_entities.to_le_bytes());
655        data[HEADER_SIZE + 116..HEADER_SIZE + 120]
656            .copy_from_slice(&header.physical.export_count.to_le_bytes());
657        data[HEADER_SIZE + 120..HEADER_SIZE + 124]
658            .copy_from_slice(&header.physical.graph_projection_count.to_le_bytes());
659        data[HEADER_SIZE + 124..HEADER_SIZE + 128]
660            .copy_from_slice(&header.physical.analytics_job_count.to_le_bytes());
661        data[HEADER_SIZE + 128..HEADER_SIZE + 132]
662            .copy_from_slice(&header.physical.manifest_event_count.to_le_bytes());
663        data[HEADER_SIZE + 132..HEADER_SIZE + 136]
664            .copy_from_slice(&header.physical.registry_page.to_le_bytes());
665        data[HEADER_SIZE + 136..HEADER_SIZE + 144]
666            .copy_from_slice(&header.physical.registry_checksum.to_le_bytes());
667        data[HEADER_SIZE + 144..HEADER_SIZE + 148]
668            .copy_from_slice(&header.physical.recovery_page.to_le_bytes());
669        data[HEADER_SIZE + 148..HEADER_SIZE + 156]
670            .copy_from_slice(&header.physical.recovery_checksum.to_le_bytes());
671        data[HEADER_SIZE + 156..HEADER_SIZE + 160]
672            .copy_from_slice(&header.physical.catalog_page.to_le_bytes());
673        data[HEADER_SIZE + 160..HEADER_SIZE + 168]
674            .copy_from_slice(&header.physical.catalog_checksum.to_le_bytes());
675        data[HEADER_SIZE + 168..HEADER_SIZE + 172]
676            .copy_from_slice(&header.physical.metadata_state_page.to_le_bytes());
677        data[HEADER_SIZE + 172..HEADER_SIZE + 180]
678            .copy_from_slice(&header.physical.metadata_state_checksum.to_le_bytes());
679        data[HEADER_SIZE + 180..HEADER_SIZE + 184]
680            .copy_from_slice(&header.physical.vector_artifact_page.to_le_bytes());
681        data[HEADER_SIZE + 184..HEADER_SIZE + 192]
682            .copy_from_slice(&header.physical.vector_artifact_checksum.to_le_bytes());
683
684        // Two-phase checkpoint fields (offset 192-200)
685        data[HEADER_SIZE + 192] = if header.checkpoint_in_progress { 1 } else { 0 };
686        data[HEADER_SIZE + 193..HEADER_SIZE + 201]
687            .copy_from_slice(&header.checkpoint_target_lsn.to_le_bytes());
688
689        page.update_checksum();
690
691        // Write header shadow FIRST (so it's intact if main write is interrupted)
692        self.write_header_shadow(&page)?;
693
694        self.write_page_raw(0, &page)?;
695        *self.header_dirty_lock()? = false;
696
697        Ok(())
698    }
699
700    /// Read a page from disk (bypassing cache)
701    fn read_page_raw(&self, page_id: u32) -> Result<Page, PagerError> {
702        let mut file = self.file_lock()?;
703        let offset = (page_id as u64) * (PAGE_SIZE as u64);
704
705        file.seek(SeekFrom::Start(offset))?;
706
707        let mut buf = [0u8; PAGE_SIZE];
708        file.read_exact(&mut buf)?;
709
710        let page = Page::from_bytes(buf);
711
712        // Verify checksum if configured (including page 0)
713        if self.config.verify_checksums {
714            page.verify_checksum()?;
715        }
716
717        Ok(page)
718    }
719
720    /// Write a page to disk (bypassing cache)
721    fn write_page_raw(&self, page_id: u32, page: &Page) -> Result<(), PagerError> {
722        if self.config.read_only {
723            return Err(PagerError::ReadOnly);
724        }
725
726        let mut file = self.file_lock()?;
727        let offset = (page_id as u64) * (PAGE_SIZE as u64);
728
729        file.seek(SeekFrom::Start(offset))?;
730        file.write_all(page.as_bytes())?;
731
732        Ok(())
733    }
734
735    /// Read a page (cache-aware)
736    pub fn read_page(&self, page_id: u32) -> Result<Page, PagerError> {
737        // Check cache first
738        if let Some(page) = self.cache.get(page_id) {
739            return Ok(page);
740        }
741
742        // Cache miss - read from disk
743        let page = self.read_page_raw(page_id)?;
744
745        // Add to cache
746        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
747            // Evicted page was dirty, need to write it back
748            let evicted_id = dirty_page.page_id();
749            self.write_page_raw(evicted_id, &dirty_page)?;
750        }
751
752        Ok(page)
753    }
754
755    /// Read a page without verifying checksum (for encrypted pages)
756    ///
757    /// Use this when the page content has its own integrity protection
758    /// (e.g., AES-GCM authentication tag for encrypted pages).
759    pub fn read_page_no_checksum(&self, page_id: u32) -> Result<Page, PagerError> {
760        // Check cache first
761        if let Some(page) = self.cache.get(page_id) {
762            return Ok(page);
763        }
764
765        // Cache miss - read from disk (skip checksum verification)
766        let mut file = self.file_lock()?;
767        let offset = (page_id as u64) * (PAGE_SIZE as u64);
768
769        file.seek(SeekFrom::Start(offset))?;
770
771        let mut buf = [0u8; PAGE_SIZE];
772        file.read_exact(&mut buf)?;
773        drop(file);
774
775        let page = Page::from_bytes(buf);
776
777        // Add to cache (no checksum verification)
778        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
779            // Evicted page was dirty, need to write it back
780            let evicted_id = dirty_page.page_id();
781            self.write_page_raw(evicted_id, &dirty_page)?;
782        }
783
784        Ok(page)
785    }
786
787    /// Write a page (cache-aware)
788    pub fn write_page(&self, page_id: u32, mut page: Page) -> Result<(), PagerError> {
789        if self.config.read_only {
790            return Err(PagerError::ReadOnly);
791        }
792
793        // Update checksum
794        page.update_checksum();
795
796        // Add to cache and mark dirty. If the cache had to evict a
797        // dirty entry, write it through immediately — the evicted
798        // page will never be flushed otherwise (same bug fixed in
799        // `allocate_page`).
800        if let Some(dirty_page) = self.cache.insert(page_id, page) {
801            let evicted_id = dirty_page.page_id();
802            self.write_page_raw(evicted_id, &dirty_page)?;
803        }
804        self.cache.mark_dirty(page_id);
805
806        Ok(())
807    }
808
809    /// Read a page through the configured encryptor if any. Page 0
810    /// is always returned plaintext (it carries the encryption marker
811    /// + header). Callers that want raw cipher bytes can use
812    ///   `read_page_no_checksum` directly.
813    pub fn read_page_decrypted(&self, page_id: u32) -> Result<Page, PagerError> {
814        if page_id == 0 || self.encryption.is_none() {
815            return self.read_page(page_id);
816        }
817        let raw = self.read_page_no_checksum(page_id)?;
818        let (enc, _) = self
819            .encryption
820            .as_ref()
821            .expect("encryption presence checked above");
822        let plaintext = enc
823            .decrypt(page_id, raw.as_bytes())
824            .map_err(|e| PagerError::InvalidDatabase(format!("decrypt page {page_id}: {e}")))?;
825        let mut buf = [0u8; PAGE_SIZE];
826        let n = plaintext.len().min(PAGE_SIZE);
827        buf[..n].copy_from_slice(&plaintext[..n]);
828        Ok(Page::from_bytes(buf))
829    }
830
831    /// Write a page through the configured encryptor if any. Page 0
832    /// bypasses encryption and goes through the normal checksummed
833    /// path. Encrypted pages skip the checksum update because
834    /// AES-GCM's authentication tag is the integrity guarantee.
835    pub fn write_page_encrypted(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
836        if page_id == 0 || self.encryption.is_none() {
837            return self.write_page(page_id, page);
838        }
839        const OVERHEAD: usize = 12 + 16; // nonce + GCM tag
840        let plaintext_len = PAGE_SIZE - OVERHEAD;
841        let plaintext = &page.as_bytes()[..plaintext_len];
842        let (enc, _) = self
843            .encryption
844            .as_ref()
845            .expect("encryption presence checked above");
846        let ciphertext = enc.encrypt(page_id, plaintext);
847        debug_assert_eq!(ciphertext.len(), PAGE_SIZE);
848        let mut buf = [0u8; PAGE_SIZE];
849        buf.copy_from_slice(&ciphertext);
850        let cipher_page = Page::from_bytes(buf);
851        self.write_page_no_checksum(page_id, cipher_page)
852    }
853
854    /// Write a page without updating checksum (for encrypted pages)
855    ///
856    /// Use this when the page content has its own integrity protection
857    /// (e.g., AES-GCM authentication tag for encrypted pages).
858    pub fn write_page_no_checksum(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
859        if self.config.read_only {
860            return Err(PagerError::ReadOnly);
861        }
862
863        // Add to cache and mark dirty (no checksum update). Same
864        // eviction-write-through guard as `write_page`.
865        if let Some(dirty_page) = self.cache.insert(page_id, page) {
866            let evicted_id = dirty_page.page_id();
867            self.write_page_raw(evicted_id, &dirty_page)?;
868        }
869        self.cache.mark_dirty(page_id);
870
871        Ok(())
872    }
873
    /// Allocate a new page of `page_type` and return it.
    ///
    /// The page id is taken from the first source that has one:
    /// 1. the in-memory freelist;
    /// 2. the freelist trunk page at `trunk_head()`. That page is read,
    ///    loaded into the freelist, and an id is allocated from it; the
    ///    header's `freelist_head` is then set to the freelist's new trunk
    ///    head and the header is flagged dirty;
    /// 3. the end of the file, by bumping `header.page_count`.
    ///
    /// The fresh page is inserted into the cache and marked dirty. It is not
    /// written to the data file here (only an eviction write-through can do
    /// that).
    ///
    /// # Errors
    /// - `PagerError::ReadOnly` for read-only pagers.
    /// - `PagerError::InvalidDatabase` if the trunk page is missing, fails to
    ///   load, or yields no free id after loading.
    /// - Lock and I/O errors from the underlying helpers.
    pub fn allocate_page(&self, page_type: PageType) -> Result<Page, PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        // Try to get from freelist first
        let page_id = {
            let mut freelist = self.freelist_write()?;
            if let Some(id) = freelist.allocate() {
                id
            } else if freelist.trunk_head() != 0 {
                let trunk_id = freelist.trunk_head();
                // The freelist lock is released before `read_page`, which goes
                // through the cache and may hit the data file.
                // NOTE(review): another thread can allocate or load this same
                // trunk between this drop and the re-lock below, and nothing
                // re-checks `trunk_head() == trunk_id` afterwards. Confirm that
                // callers serialize allocation, or add a re-check.
                drop(freelist);

                let trunk = self.read_page(trunk_id).map_err(|e| match e {
                    PagerError::PageNotFound(_) => {
                        PagerError::InvalidDatabase("Freelist trunk missing".to_string())
                    }
                    other => other,
                })?;

                let mut freelist = self.freelist_write()?;
                freelist
                    .load_from_trunk(&trunk)
                    .map_err(|e| PagerError::InvalidDatabase(format!("Freelist: {}", e)))?;
                let id = freelist.allocate().ok_or_else(|| {
                    PagerError::InvalidDatabase("Freelist empty after trunk load".to_string())
                })?;

                // Record the (possibly advanced) trunk head in the header so
                // the next header write persists it.
                let mut header = self.header_write()?;
                header.freelist_head = freelist.trunk_head();
                *self.header_dirty_lock()? = true;

                id
            } else {
                // No free pages, extend file
                let mut header = self.header_write()?;
                let id = header.page_count;
                header.page_count += 1;
                *self.header_dirty_lock()? = true;
                id
            }
        };

        let page = Page::new(page_type, page_id);

        // Write to cache. The evicted page (if any) is dirty by
        // definition — `cache.insert` only returns `Some` when it
        // had to evict a dirty entry to make room. The previous
        // version dropped that return value, which silently lost
        // writes whenever a freshly-allocated page caused an LRU
        // eviction. This shows up under heavy ingest as
        // "B-tree insert error: Pager error: I/O error: failed to
        // fill whole buffer" later, when something tries to read
        // back the never-flushed page. Mirror `read_page`'s
        // handling: write the evicted page through immediately so
        // the on-disk image stays consistent.
        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
            let evicted_id = dirty_page.page_id();
            self.write_page_raw(evicted_id, &dirty_page)?;
        }
        self.cache.mark_dirty(page_id);

        Ok(page)
    }
940
941    /// Free a page (return to freelist)
942    pub fn free_page(&self, page_id: u32) -> Result<(), PagerError> {
943        if self.config.read_only {
944            return Err(PagerError::ReadOnly);
945        }
946
947        // Remove from cache
948        self.cache.remove(page_id);
949
950        // Add to freelist
951        let mut freelist = self.freelist_write()?;
952        freelist.free(page_id);
953
954        *self.header_dirty_lock()? = true;
955
956        Ok(())
957    }
958
959    /// Get database header
960    pub fn header(&self) -> Result<DatabaseHeader, PagerError> {
961        Ok(self.header_read()?.clone())
962    }
963
964    pub fn physical_header(&self) -> Result<PhysicalFileHeader, PagerError> {
965        Ok(self.header_read()?.physical)
966    }
967
968    pub fn update_physical_header(&self, physical: PhysicalFileHeader) -> Result<(), PagerError> {
969        if self.config.read_only {
970            return Err(PagerError::ReadOnly);
971        }
972
973        let mut header = self.header_write()?;
974        header.physical = physical;
975        *self.header_dirty_lock()? = true;
976        Ok(())
977    }
978
979    /// Get page count
980    pub fn page_count(&self) -> Result<u32, PagerError> {
981        Ok(self.header_read()?.page_count)
982    }
983
984    /// Attach a WAL writer to enforce WAL-first flush ordering.
985    ///
986    /// After this call, [`Pager::flush`] computes the maximum
987    /// `header.lsn` over all dirty pages and calls
988    /// `WalWriter::flush_until(max_lsn)` before any page is written
989    /// to the data file. This is the postgres rule: a page on disk
990    /// implies its WAL record is already durable on disk.
991    ///
992    /// Existing call sites that construct a Pager without a WAL
993    /// keep their previous behaviour (no LSN check) — wiring is
994    /// strictly opt-in.
995    pub fn set_wal_writer(&self, wal: Arc<Mutex<crate::storage::wal::writer::WalWriter>>) {
996        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
997        *slot = Some(wal);
998    }
999
1000    /// Detach the WAL writer (test / shutdown path).
1001    pub fn clear_wal_writer(&self) {
1002        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
1003        *slot = None;
1004    }
1005
1006    /// Has a WAL writer been attached?
1007    pub fn has_wal_writer(&self) -> bool {
1008        self.wal.read().map(|s| s.is_some()).unwrap_or(false)
1009    }
1010
1011    /// Flush all dirty pages to disk
1012    pub fn flush(&self) -> Result<(), PagerError> {
1013        if self.config.read_only {
1014            return Ok(());
1015        }
1016
1017        // Persist freelist to trunk pages when dirty
1018        let trunks = {
1019            let mut freelist = self.freelist_write()?;
1020            if freelist.is_dirty() {
1021                let mut header = self.header_write()?;
1022                let trunks = freelist.flush_to_trunks(0, || {
1023                    let id = header.page_count;
1024                    header.page_count += 1;
1025                    id
1026                });
1027                header.freelist_head = freelist.trunk_head();
1028                *self.header_dirty_lock()? = true;
1029                freelist.mark_clean();
1030                trunks
1031            } else {
1032                Vec::new()
1033            }
1034        };
1035
1036        for trunk in trunks {
1037            let page_id = trunk.page_id();
1038            self.cache.insert(page_id, trunk);
1039            self.cache.mark_dirty(page_id);
1040        }
1041
1042        // Flush dirty pages from cache (through DWB if enabled)
1043        let dirty_pages = self.cache.flush_dirty();
1044        if !dirty_pages.is_empty() {
1045            // WAL-FIRST: ensure every WAL record describing a dirty
1046            // page is durable BEFORE the page itself reaches disk.
1047            // Pages with `lsn == 0` are exempt (freelist trunks, header
1048            // shadow pages, anything not produced by a WAL append).
1049            let max_lsn = dirty_pages
1050                .iter()
1051                .filter_map(|(_, page)| page.header().ok().map(|h| h.lsn))
1052                .max()
1053                .unwrap_or(0);
1054            if max_lsn > 0 {
1055                if let Ok(slot) = self.wal.read() {
1056                    if let Some(wal) = slot.as_ref() {
1057                        let wal = Arc::clone(wal);
1058                        // Drop the read lock before taking the WAL
1059                        // mutex so an unrelated reader cannot block
1060                        // the flush path.
1061                        drop(slot);
1062                        let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
1063                        wal_guard.flush_until(max_lsn).map_err(PagerError::Io)?;
1064                    }
1065                }
1066            }
1067            self.write_pages_through_dwb(&dirty_pages)?;
1068        }
1069
1070        // Write header if dirty
1071        if *self.header_dirty_lock()? {
1072            self.write_header()?;
1073        }
1074
1075        Ok(())
1076    }
1077
1078    /// Sync file to disk (fsync)
1079    pub fn sync(&self) -> Result<(), PagerError> {
1080        self.flush()?;
1081
1082        let file = self.file_lock()?;
1083        file.sync_all()?;
1084
1085        Ok(())
1086    }
1087
1088    /// Get cache statistics
1089    pub fn cache_stats(&self) -> crate::storage::engine::page_cache::CacheStats {
1090        self.cache.stats()
1091    }
1092
1093    /// Count dirty pages currently in the page cache.
1094    pub fn dirty_page_count(&self) -> usize {
1095        self.cache.dirty_count()
1096    }
1097
1098    /// Estimated fraction of the page cache holding dirty pages.
1099    /// Returned in `[0, 1]`. Used by the background writer to
1100    /// decide when to kick in aggressive flushing.
1101    pub fn dirty_fraction(&self) -> f64 {
1102        let capacity = self.cache.capacity().max(1) as f64;
1103        self.cache.dirty_count() as f64 / capacity
1104    }
1105
1106    /// Flush up to `max` dirty pages from the cache. Returns the
1107    /// number actually written. Background-writer entry point —
1108    /// reuses the same persistence path as `flush()` but bounded.
1109    pub fn flush_some_dirty(&self, max: usize) -> Result<usize, PagerError> {
1110        if self.config.read_only || max == 0 {
1111            return Ok(0);
1112        }
1113        let dirty_pages = self.cache.flush_some_dirty(max);
1114        if dirty_pages.is_empty() {
1115            return Ok(0);
1116        }
1117        let count = dirty_pages.len();
1118        // WAL-first: every cached dirty page carries an LSN that the
1119        // WAL must have already persisted. The full `flush()` path
1120        // enforces this with `wal.flush(max_lsn)`; here we simply
1121        // write through the pager — safe because callers only reach
1122        // this path via the bgwriter, which runs asynchronously
1123        // alongside normal commits that already respect WAL-first.
1124        for (page_id, page) in dirty_pages {
1125            self.write_page(page_id, page)?;
1126        }
1127        Ok(count)
1128    }
1129
1130    /// Get database file path
1131    pub fn path(&self) -> &Path {
1132        &self.path
1133    }
1134
1135    /// Check if database is read-only
1136    pub fn is_read_only(&self) -> bool {
1137        self.config.read_only
1138    }
1139
1140    /// Get file size in bytes
1141    pub fn file_size(&self) -> Result<u64, PagerError> {
1142        let file = self.file_lock()?;
1143        Ok(file.metadata()?.len())
1144    }
1145
1146    /// Issue an OS-level read-ahead hint for `page_id`.
1147    ///
1148    /// A6 prefetch wire: called from `BTreeCursor::next` when the
1149    /// cursor passes 50% of the current leaf, so the kernel fetches
1150    /// the next leaf page while CPU processes the remaining half of
1151    /// the current one. Failures are silent — a missed prefetch is a
1152    /// performance miss, never a correctness bug.
1153    pub fn prefetch_hint(&self, page_id: u32) {
1154        if let Ok(file) = self.file_lock() {
1155            let _ = crate::storage::btree::prefetch::prefetch_page(
1156                &file,
1157                page_id as u64,
1158                PAGE_SIZE as u32,
1159            );
1160        }
1161    }
1162
1163    // ── Corruption defense helpers ──────────────────────────────────
1164
1165    /// Path for the header shadow file
1166    fn shadow_path(db_path: &Path) -> PathBuf {
1167        let mut p = db_path.to_path_buf().into_os_string();
1168        p.push("-hdr");
1169        PathBuf::from(p)
1170    }
1171
1172    /// Path for the metadata shadow file
1173    fn meta_shadow_path(db_path: &Path) -> PathBuf {
1174        let mut p = db_path.to_path_buf().into_os_string();
1175        p.push("-meta");
1176        PathBuf::from(p)
1177    }
1178
1179    /// Path for the double-write buffer file
1180    fn dwb_path(db_path: &Path) -> PathBuf {
1181        let mut p = db_path.to_path_buf().into_os_string();
1182        p.push("-dwb");
1183        PathBuf::from(p)
1184    }
1185
1186    /// Open the double-write buffer file without truncating existing content.
1187    ///
1188    /// The file is intentionally preserved across restarts so recovery can
1189    /// consume any crash-leftover pages before the next write cycle clears it.
1190    fn open_dwb_file(db_path: &Path) -> Result<File, PagerError> {
1191        Ok(OpenOptions::new()
1192            .read(true)
1193            .write(true)
1194            .create(true)
1195            .truncate(false)
1196            .open(Self::dwb_path(db_path))?)
1197    }
1198
1199    /// Clear the DWB in place while preserving the file path and handle.
1200    fn clear_dwb_file(file: &mut File) -> Result<(), PagerError> {
1201        file.set_len(0)?;
1202        file.seek(SeekFrom::Start(0))?;
1203        file.sync_all()?;
1204        Ok(())
1205    }
1206
1207    /// Write a shadow copy of the header page to .rdb-hdr
1208    fn write_header_shadow(&self, page: &Page) -> Result<(), PagerError> {
1209        if self.config.read_only {
1210            return Ok(());
1211        }
1212        let shadow = Self::shadow_path(&self.path);
1213        let mut f = File::create(&shadow)?;
1214        f.write_all(page.as_bytes())?;
1215        f.sync_all()?;
1216        Ok(())
1217    }
1218
1219    /// Recover header from shadow file when page 0 is corrupted
1220    fn recover_header_from_shadow(&self) -> Result<Page, PagerError> {
1221        let shadow = Self::shadow_path(&self.path);
1222        if !shadow.exists() {
1223            return Err(PagerError::InvalidDatabase(
1224                "Page 0 corrupted and no header shadow found".into(),
1225            ));
1226        }
1227        let mut f = File::open(&shadow)?;
1228        let mut buf = [0u8; PAGE_SIZE];
1229        f.read_exact(&mut buf)?;
1230        let page = Page::from_bytes(buf);
1231
1232        // Verify shadow is valid
1233        let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
1234        if magic != MAGIC_BYTES {
1235            return Err(PagerError::InvalidDatabase(
1236                "Header shadow also corrupted".into(),
1237            ));
1238        }
1239
1240        // Restore page 0 from shadow
1241        if !self.config.read_only {
1242            self.write_page_raw(0, &page)?;
1243            let file = self.file_lock()?;
1244            file.sync_all()?;
1245        }
1246
1247        Ok(page)
1248    }
1249
1250    /// Write a shadow copy of the metadata page to .rdb-meta.
1251    ///
1252    /// When the process-global `fold_pager_meta` policy is enabled (see
1253    /// [`crate::physical::fold_pager_meta_enabled`]) the shadow is suppressed:
1254    /// metadata is sourced exclusively from page 1 (plus its overflow chain).
1255    /// Any pre-existing `<data>-meta` file is also removed so a flipped flag
1256    /// cannot leave a stale shadow on disk. Reads still tolerate the sidecar
1257    /// when present so databases written before the flag flipped remain
1258    /// loadable.
1259    pub fn write_meta_shadow(&self, page: &Page) -> Result<(), PagerError> {
1260        if self.config.read_only {
1261            return Ok(());
1262        }
1263        let shadow = Self::meta_shadow_path(&self.path);
1264        if crate::physical::fold_pager_meta_enabled() {
1265            // Best-effort cleanup of any prior shadow — a missing file is not
1266            // an error condition here.
1267            let _ = std::fs::remove_file(&shadow);
1268            return Ok(());
1269        }
1270        let mut f = File::create(&shadow)?;
1271        f.write_all(page.as_bytes())?;
1272        f.sync_all()?;
1273        Ok(())
1274    }
1275
1276    /// Recover metadata page from shadow file when page 1 is corrupted
1277    pub fn recover_meta_from_shadow(&self) -> Result<Page, PagerError> {
1278        let shadow = Self::meta_shadow_path(&self.path);
1279        if !shadow.exists() {
1280            return Err(PagerError::InvalidDatabase(
1281                "Page 1 corrupted and no metadata shadow found".into(),
1282            ));
1283        }
1284        let mut f = File::open(&shadow)?;
1285        let mut buf = [0u8; PAGE_SIZE];
1286        f.read_exact(&mut buf)?;
1287        let page = Page::from_bytes(buf);
1288
1289        // Restore page 1 from shadow
1290        if !self.config.read_only {
1291            self.write_page_raw(1, &page)?;
1292            let file = self.file_lock()?;
1293            file.sync_all()?;
1294        }
1295
1296        Ok(page)
1297    }
1298
1299    /// Write pages through the double-write buffer for torn page protection.
1300    ///
1301    /// 1. Write all pages to the DWB file with a header (magic + count + checksum)
1302    /// 2. fsync the DWB
1303    /// 3. Write all pages to their final locations in the .rdb file
1304    /// 4. Truncate the DWB (marks as consumed)
1305    fn write_pages_through_dwb(&self, pages: &[(u32, Page)]) -> Result<(), PagerError> {
1306        if let Some(dwb_mutex) = &self.dwb_file {
1307            let mut dwb = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1308
1309            // Build DWB content: [magic:4][count:u32][checksum:u32][pages...]
1310            // Each page entry: [page_id:u32][page_data:4096]
1311            let entry_size = 4 + PAGE_SIZE; // page_id + data
1312            let header_len = 4 + 4 + 4; // magic + count + checksum
1313            let total = header_len + pages.len() * entry_size;
1314            let mut buf = Vec::with_capacity(total);
1315
1316            // Header
1317            buf.extend_from_slice(&DWB_MAGIC);
1318            buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
1319            buf.extend_from_slice(&[0u8; 4]); // placeholder for checksum
1320
1321            // Page entries
1322            for (page_id, page) in pages {
1323                buf.extend_from_slice(&page_id.to_le_bytes());
1324                buf.extend_from_slice(page.as_bytes());
1325            }
1326
1327            // Compute and write checksum over all data after the header
1328            let checksum = super::super::crc32::crc32(&buf[header_len..]);
1329            buf[8..12].copy_from_slice(&checksum.to_le_bytes());
1330
1331            // Write DWB and fsync
1332            dwb.seek(SeekFrom::Start(0))?;
1333            dwb.write_all(&buf)?;
1334            dwb.set_len(buf.len() as u64)?;
1335            dwb.sync_all()?;
1336
1337            // Now write pages to their final locations
1338            for (page_id, page) in pages {
1339                self.write_page_raw(*page_id, page)?;
1340            }
1341
1342            // Truncate DWB to mark as consumed
1343            Self::clear_dwb_file(&mut dwb)?;
1344
1345            Ok(())
1346        } else {
1347            // DWB disabled — write directly
1348            for (page_id, page) in pages {
1349                self.write_page_raw(*page_id, page)?;
1350            }
1351            Ok(())
1352        }
1353    }
1354
1355    /// Recover from double-write buffer after a crash.
1356    ///
1357    /// If the DWB file contains valid pages, they were written before the crash
1358    /// interrupted writing to the main file. Re-apply them.
1359    fn recover_from_dwb(&self) -> Result<(), PagerError> {
1360        let dwb_path = Self::dwb_path(&self.path);
1361        if !dwb_path.exists() {
1362            return Ok(());
1363        }
1364
1365        if let Some(dwb_mutex) = &self.dwb_file {
1366            let mut file = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1367            return self.recover_from_dwb_file(&mut file);
1368        }
1369
1370        let mut file = OpenOptions::new().read(true).write(true).open(&dwb_path)?;
1371        self.recover_from_dwb_file(&mut file)
1372    }
1373
1374    fn recover_from_dwb_file(&self, file: &mut File) -> Result<(), PagerError> {
1375        file.seek(SeekFrom::Start(0))?;
1376        let len = file.metadata()?.len();
1377        if len < 12 {
1378            // Empty or incomplete header — keep the DWB file but clear stale bytes.
1379            return Self::clear_dwb_file(file);
1380        }
1381
1382        let mut buf = vec![0u8; len as usize];
1383        file.read_exact(&mut buf)?;
1384
1385        // Verify magic
1386        if buf[0..4] != DWB_MAGIC {
1387            // Not a valid DWB — clear it in place so the same file can be reused.
1388            return Self::clear_dwb_file(file);
1389        }
1390
1391        let count = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]) as usize;
1392        let stored_checksum = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);
1393
1394        let header_len = 12;
1395        let entry_size = 4 + PAGE_SIZE;
1396        let expected_len = header_len + count * entry_size;
1397
1398        if buf.len() < expected_len {
1399            // Incomplete DWB write — discard it in place.
1400            return Self::clear_dwb_file(file);
1401        }
1402
1403        // Verify checksum
1404        let computed = super::super::crc32::crc32(&buf[header_len..expected_len]);
1405        if computed != stored_checksum {
1406            // Corrupted DWB — discard it in place.
1407            return Self::clear_dwb_file(file);
1408        }
1409
1410        // DWB is valid — re-apply pages to main file
1411        let mut offset = header_len;
1412        for _ in 0..count {
1413            let page_id = u32::from_le_bytes([
1414                buf[offset],
1415                buf[offset + 1],
1416                buf[offset + 2],
1417                buf[offset + 3],
1418            ]);
1419            offset += 4;
1420
1421            let mut page_data = [0u8; PAGE_SIZE];
1422            page_data.copy_from_slice(&buf[offset..offset + PAGE_SIZE]);
1423            offset += PAGE_SIZE;
1424
1425            let page = Page::from_bytes(page_data);
1426            self.write_page_raw(page_id, &page)?;
1427        }
1428
1429        // Sync and clean up
1430        {
1431            let file = self.file_lock()?;
1432            file.sync_all()?;
1433        }
1434
1435        Self::clear_dwb_file(file)
1436    }
1437
1438    /// Write header and sync to disk (public for checkpointer).
1439    pub fn write_header_and_sync(&self) -> Result<(), PagerError> {
1440        self.write_header()?;
1441        let file = self.file_lock()?;
1442        file.sync_all()?;
1443        Ok(())
1444    }
1445
1446    /// Set the checkpoint_in_progress flag in the header.
1447    pub fn set_checkpoint_in_progress(
1448        &self,
1449        in_progress: bool,
1450        target_lsn: u64,
1451    ) -> Result<(), PagerError> {
1452        let mut header = self.header_write()?;
1453        header.checkpoint_in_progress = in_progress;
1454        header.checkpoint_target_lsn = target_lsn;
1455        *self.header_dirty_lock()? = true;
1456        drop(header);
1457        self.write_header_and_sync()
1458    }
1459
1460    /// Update the checkpoint LSN and clear the in-progress flag.
1461    pub fn complete_checkpoint(&self, lsn: u64) -> Result<(), PagerError> {
1462        let mut header = self.header_write()?;
1463        header.checkpoint_lsn = lsn;
1464        header.checkpoint_in_progress = false;
1465        header.checkpoint_target_lsn = 0;
1466        *self.header_dirty_lock()? = true;
1467        drop(header);
1468        self.write_header_and_sync()
1469    }
1470}
1471
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a unique temp database path from the test name, the process id,
    /// and the current time in nanoseconds.
    fn temp_db_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "reddb-pager-{}-{}-{}.rdb",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ))
    }

    /// Owns a temp database path. On drop it removes the database file and
    /// every pager sidecar (`-hdr`, `-meta`, `-dwb`), so cleanup also runs
    /// when an assertion fails part-way through a test. Missing files are
    /// ignored.
    struct TempDb(PathBuf);

    impl Drop for TempDb {
        fn drop(&mut self) {
            let path = self.0.as_path();
            for file in [
                path.to_path_buf(),
                Pager::shadow_path(path),
                Pager::meta_shadow_path(path),
                Pager::dwb_path(path),
            ] {
                let _ = std::fs::remove_file(file);
            }
        }
    }

    /// A header whose version field is newer than `DB_VERSION` must make
    /// `open` fail with an `InvalidDatabase` "newer than supported" error.
    #[test]
    fn open_refuses_future_database_version() {
        let db = TempDb(temp_db_path("future-version"));
        let path = &db.0;
        let pager = Pager::open_default(path).unwrap();
        drop(pager);

        // Overwrite page 0 with a header that has a valid checksum but whose
        // version (the 4 bytes after the magic) is one past `DB_VERSION`.
        let mut future_header = Page::new_header_page(1);
        future_header.as_bytes_mut()[HEADER_SIZE + 4..HEADER_SIZE + 8]
            .copy_from_slice(&(DB_VERSION + 1).to_le_bytes());
        future_header.update_checksum();

        let mut file = OpenOptions::new().write(true).open(path).unwrap();
        file.seek(SeekFrom::Start(0)).unwrap();
        file.write_all(future_header.as_bytes()).unwrap();
        file.sync_all().unwrap();
        drop(file);

        let err = match Pager::open_default(path) {
            Ok(_) => panic!("future database version should be rejected"),
            Err(err) => err,
        };
        match err {
            PagerError::InvalidDatabase(msg) => {
                assert!(msg.contains("newer than supported"));
            }
            other => panic!("expected InvalidDatabase, got {other:?}"),
        }
    }
}