
reddb_server/storage/engine/pager/impl.rs

use super::*;

/// DWB file magic: "RDDW"
const DWB_MAGIC: [u8; 4] = [0x52, 0x44, 0x44, 0x57];

impl Pager {
    /// Open or create a database file
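    ///
    /// Minimal usage sketch (illustrative; `PagerConfig::default()` is
    /// taken from `open_default` below, and the `create` flag matches the
    /// checks in the body):
    ///
    /// ```ignore
    /// let mut config = PagerConfig::default();
    /// config.create = true; // allow creating the file if it does not exist
    /// let pager = Pager::open("data.rdb", config)?;
    /// assert!(pager.page_count()? >= 3); // header page + two reserved pages
    /// ```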
    pub fn open<P: AsRef<Path>>(path: P, config: PagerConfig) -> Result<Self, PagerError> {
        let path = path.as_ref().to_path_buf();
        let exists = path.exists();

        if !exists && !config.create {
            return Err(PagerError::InvalidDatabase(
                "Database does not exist".into(),
            ));
        }

        if !exists && config.read_only {
            return Err(PagerError::InvalidDatabase(
                "Cannot create read-only database".into(),
            ));
        }

        // Open file
        // Note: create requires write access, so disable it for read-only mode
        let file = OpenOptions::new()
            .read(true)
            .write(!config.read_only)
            .create(config.create && !config.read_only)
            .open(&path)?;

        // Acquire file lock (exclusive for writes, shared for read-only)
        let lock_file = if !config.read_only {
            let lf = OpenOptions::new().read(true).write(true).open(&path)?;
            lf.try_lock_exclusive().map_err(|_| PagerError::Locked)?;
            Some(lf)
        } else {
            let lf = OpenOptions::new().read(true).open(&path)?;
            match lf.try_lock_shared() {
                Ok(_) => Some(lf),
                Err(_) => None,
            }
        };

        // Open double-write buffer file
        let dwb_file = if config.double_write && !config.read_only {
            let f = Self::open_dwb_file(&path)?;
            Some(Mutex::new(f))
        } else {
            None
        };

        let mut pager = Self {
            path,
            file: Mutex::new(file),
            _lock_file: lock_file,
            dwb_file,
            cache: PageCache::new(config.cache_size),
            freelist: RwLock::new(FreeList::new()),
            header: RwLock::new(DatabaseHeader::default()),
            config,
            header_dirty: Mutex::new(false),
            wal: RwLock::new(None),
            encryption: None,
        };

        if exists {
            // Recover from double-write buffer if needed
            pager.recover_from_dwb()?;
            // Load existing database (with header shadow fallback)
            pager.load_header()?;
            pager.bind_encryption_for_existing()?;
        } else {
            // Initialize new database
            pager.initialize()?;
            pager.bind_encryption_for_new()?;
        }

        Ok(pager)
    }

    /// Inspect page 0 for the `RDBE` encryption marker, then resolve
    /// the (key, marker) matrix:
    ///
    /// | Marker | Key supplied | Result                              |
    /// |--------|--------------|-------------------------------------|
    /// | yes    | yes          | Bind encryptor; validate key        |
    /// | yes    | no           | `EncryptionRequired` (fail closed)  |
    /// | no     | yes          | `PlainDatabaseRefusesKey`           |
    /// | no     | no           | Plain pager — no binding needed     |
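    ///
    /// Fail-closed sketch (illustrative; the `encryption` config field is
    /// the one read in the body below):
    ///
    /// ```ignore
    /// let mut cfg = PagerConfig::default();
    /// cfg.encryption = None; // encrypted file on disk, but no key supplied
    /// assert!(matches!(
    ///     Pager::open("encrypted.rdb", cfg),
    ///     Err(PagerError::EncryptionRequired)
    /// ));
    /// ```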
    fn bind_encryption_for_existing(&mut self) -> Result<(), PagerError> {
        const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
        const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";

        if self.page_count().unwrap_or(0) == 0 {
            return self.bind_encryption_for_new();
        }
        let header_page = self.read_page_no_checksum(0)?;
        let data = header_page.as_bytes();
        let has_marker = data.len() > ENCRYPTION_MARKER_OFFSET + 4
            && &data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4] == ENCRYPTION_MARKER;

        let key = self.config.encryption.clone();
        match (has_marker, key) {
            (true, Some(key)) => {
                let header_start = ENCRYPTION_MARKER_OFFSET + 4;
                let header =
                    crate::storage::encryption::EncryptionHeader::from_bytes(&data[header_start..])
                        .map_err(|e| {
                            PagerError::InvalidDatabase(format!(
                                "encryption header parse failed: {e}"
                            ))
                        })?;
                if !header.validate(&key) {
                    return Err(PagerError::InvalidKey);
                }
                let encryptor = crate::storage::encryption::PageEncryptor::new(key);
                self.encryption = Some((encryptor, header));
                Ok(())
            }
            (true, None) => Err(PagerError::EncryptionRequired),
            (false, Some(_)) => Err(PagerError::PlainDatabaseRefusesKey),
            (false, None) => Ok(()),
        }
    }

    /// New DB: if a key is configured, write the marker + header to
    /// page 0 so subsequent opens detect encryption.
    fn bind_encryption_for_new(&mut self) -> Result<(), PagerError> {
        const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
        const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";

        let Some(key) = self.config.encryption.clone() else {
            return Ok(());
        };
        let header = crate::storage::encryption::EncryptionHeader::new(&key);
        let encryptor = crate::storage::encryption::PageEncryptor::new(key);

        // Stamp the marker + header into page 0 if it has already been
        // initialized by `initialize()`.
        if self.page_count().unwrap_or(0) > 0 {
            let mut page = self.read_page_no_checksum(0)?;
            let data = page.as_bytes_mut();
            data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4]
                .copy_from_slice(ENCRYPTION_MARKER);
            let header_bytes = header.to_bytes();
            let header_start = ENCRYPTION_MARKER_OFFSET + 4;
            data[header_start..header_start + header_bytes.len()].copy_from_slice(&header_bytes);
            self.write_page_no_checksum(0, page)?;
        }
        self.encryption = Some((encryptor, header));
        Ok(())
    }

    /// Open with default configuration
    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, PagerError> {
        Self::open(path, PagerConfig::default())
    }

    /// Initialize a new database
    fn initialize(&self) -> Result<(), PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        // Create header page. Page ids 1 and 2 are reserved so fixed
        // metadata/vault pages cannot be handed out to normal B-tree
        // allocation before those subsystems are initialized.
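        //
        // Initial file layout (as written below):
        //   page 0: database header
        //   page 1: reserved metadata page (PageType::Header)
        //   page 2: reserved vault page (PageType::Vault)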
        let initial_page_count = 3;
        let header_page = Page::new_header_page(initial_page_count);
        self.header_write()?.page_count = initial_page_count;

        // Write header and reserved pages so any scan over 0..page_count
        // can read every allocated page in a brand-new database.
        self.write_page_raw(0, &header_page)?;
        let mut metadata_page = Page::new(PageType::Header, 1);
        metadata_page.update_checksum();
        self.write_page_raw(1, &metadata_page)?;
        let mut vault_page = Page::new(PageType::Vault, 2);
        vault_page.update_checksum();
        self.write_page_raw(2, &vault_page)?;

        // Sync to disk
        self.sync()?;

        Ok(())
    }

    /// Acquire a write lock on the header RwLock, mapping poison errors.
    fn header_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, DatabaseHeader>, PagerError> {
        self.header.write().map_err(|_| PagerError::LockPoisoned)
    }

    /// Acquire a read lock on the header RwLock, mapping poison errors.
    fn header_read(&self) -> Result<std::sync::RwLockReadGuard<'_, DatabaseHeader>, PagerError> {
        self.header.read().map_err(|_| PagerError::LockPoisoned)
    }

    /// Acquire a write lock on the freelist RwLock, mapping poison errors.
    fn freelist_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, FreeList>, PagerError> {
        self.freelist.write().map_err(|_| PagerError::LockPoisoned)
    }

    /// Acquire a lock on the file Mutex, mapping poison errors.
    fn file_lock(&self) -> Result<std::sync::MutexGuard<'_, File>, PagerError> {
        self.file.lock().map_err(|_| PagerError::LockPoisoned)
    }

    /// Acquire a lock on the header_dirty Mutex, mapping poison errors.
    fn header_dirty_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>, PagerError> {
        self.header_dirty
            .lock()
            .map_err(|_| PagerError::LockPoisoned)
    }

    /// Load database header from page 0 (with shadow fallback)
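    ///
    /// Field layout (little-endian, offsets relative to `HEADER_SIZE`,
    /// mirroring the reads below): +0 magic, +4 version, +8 page_size,
    /// +12 page_count, +16 freelist_head, +20 schema_version,
    /// +24 checkpoint_lsn (u64), +32..+192 physical file header fields,
    /// +192 checkpoint_in_progress (u8), +193 checkpoint_target_lsn (u64).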
    fn load_header(&self) -> Result<(), PagerError> {
        // Read page 0 — fall back to shadow if corrupted
        let header_page = match self.read_page_raw(0) {
            Ok(page) => {
                // Verify magic bytes
                let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
                if magic == MAGIC_BYTES {
                    page
                } else {
                    // Page 0 corrupted — try shadow
                    self.recover_header_from_shadow()?
                }
            }
            Err(_) => self.recover_header_from_shadow()?,
        };

        // Read header fields
        let data = header_page.as_bytes();
        let version = u32::from_le_bytes([
            data[HEADER_SIZE + 4],
            data[HEADER_SIZE + 5],
            data[HEADER_SIZE + 6],
            data[HEADER_SIZE + 7],
        ]);

        let page_size = u32::from_le_bytes([
            data[HEADER_SIZE + 8],
            data[HEADER_SIZE + 9],
            data[HEADER_SIZE + 10],
            data[HEADER_SIZE + 11],
        ]);

        if page_size != PAGE_SIZE as u32 {
            return Err(PagerError::InvalidDatabase(format!(
                "Unsupported page size: {}",
                page_size
            )));
        }
        if version > DB_VERSION {
            return Err(PagerError::InvalidDatabase(format!(
                "Unsupported database version: file version {version} is newer than supported {DB_VERSION}"
            )));
        }

        let page_count = u32::from_le_bytes([
            data[HEADER_SIZE + 12],
            data[HEADER_SIZE + 13],
            data[HEADER_SIZE + 14],
            data[HEADER_SIZE + 15],
        ]);

        let freelist_head = u32::from_le_bytes([
            data[HEADER_SIZE + 16],
            data[HEADER_SIZE + 17],
            data[HEADER_SIZE + 18],
            data[HEADER_SIZE + 19],
        ]);

        let schema_version = u32::from_le_bytes([
            data[HEADER_SIZE + 20],
            data[HEADER_SIZE + 21],
            data[HEADER_SIZE + 22],
            data[HEADER_SIZE + 23],
        ]);

        let checkpoint_lsn = u64::from_le_bytes([
            data[HEADER_SIZE + 24],
            data[HEADER_SIZE + 25],
            data[HEADER_SIZE + 26],
            data[HEADER_SIZE + 27],
            data[HEADER_SIZE + 28],
            data[HEADER_SIZE + 29],
            data[HEADER_SIZE + 30],
            data[HEADER_SIZE + 31],
        ]);
        let physical_format_version = u32::from_le_bytes([
            data[HEADER_SIZE + 32],
            data[HEADER_SIZE + 33],
            data[HEADER_SIZE + 34],
            data[HEADER_SIZE + 35],
        ]);
        let physical_sequence = u64::from_le_bytes([
            data[HEADER_SIZE + 36],
            data[HEADER_SIZE + 37],
            data[HEADER_SIZE + 38],
            data[HEADER_SIZE + 39],
            data[HEADER_SIZE + 40],
            data[HEADER_SIZE + 41],
            data[HEADER_SIZE + 42],
            data[HEADER_SIZE + 43],
        ]);
        let manifest_root = u64::from_le_bytes([
            data[HEADER_SIZE + 44],
            data[HEADER_SIZE + 45],
            data[HEADER_SIZE + 46],
            data[HEADER_SIZE + 47],
            data[HEADER_SIZE + 48],
            data[HEADER_SIZE + 49],
            data[HEADER_SIZE + 50],
            data[HEADER_SIZE + 51],
        ]);
        let manifest_oldest_root = u64::from_le_bytes([
            data[HEADER_SIZE + 52],
            data[HEADER_SIZE + 53],
            data[HEADER_SIZE + 54],
            data[HEADER_SIZE + 55],
            data[HEADER_SIZE + 56],
            data[HEADER_SIZE + 57],
            data[HEADER_SIZE + 58],
            data[HEADER_SIZE + 59],
        ]);
        let free_set_root = u64::from_le_bytes([
            data[HEADER_SIZE + 60],
            data[HEADER_SIZE + 61],
            data[HEADER_SIZE + 62],
            data[HEADER_SIZE + 63],
            data[HEADER_SIZE + 64],
            data[HEADER_SIZE + 65],
            data[HEADER_SIZE + 66],
            data[HEADER_SIZE + 67],
        ]);
        let manifest_page = u32::from_le_bytes([
            data[HEADER_SIZE + 68],
            data[HEADER_SIZE + 69],
            data[HEADER_SIZE + 70],
            data[HEADER_SIZE + 71],
        ]);
        let manifest_checksum = u64::from_le_bytes([
            data[HEADER_SIZE + 72],
            data[HEADER_SIZE + 73],
            data[HEADER_SIZE + 74],
            data[HEADER_SIZE + 75],
            data[HEADER_SIZE + 76],
            data[HEADER_SIZE + 77],
            data[HEADER_SIZE + 78],
            data[HEADER_SIZE + 79],
        ]);
        let collection_roots_page = u32::from_le_bytes([
            data[HEADER_SIZE + 80],
            data[HEADER_SIZE + 81],
            data[HEADER_SIZE + 82],
            data[HEADER_SIZE + 83],
        ]);
        let collection_roots_checksum = u64::from_le_bytes([
            data[HEADER_SIZE + 84],
            data[HEADER_SIZE + 85],
            data[HEADER_SIZE + 86],
            data[HEADER_SIZE + 87],
            data[HEADER_SIZE + 88],
            data[HEADER_SIZE + 89],
            data[HEADER_SIZE + 90],
            data[HEADER_SIZE + 91],
        ]);
        let collection_root_count = u32::from_le_bytes([
            data[HEADER_SIZE + 92],
            data[HEADER_SIZE + 93],
            data[HEADER_SIZE + 94],
            data[HEADER_SIZE + 95],
        ]);
        let snapshot_count = u32::from_le_bytes([
            data[HEADER_SIZE + 96],
            data[HEADER_SIZE + 97],
            data[HEADER_SIZE + 98],
            data[HEADER_SIZE + 99],
        ]);
        let index_count = u32::from_le_bytes([
            data[HEADER_SIZE + 100],
            data[HEADER_SIZE + 101],
            data[HEADER_SIZE + 102],
            data[HEADER_SIZE + 103],
        ]);
        let catalog_collection_count = u32::from_le_bytes([
            data[HEADER_SIZE + 104],
            data[HEADER_SIZE + 105],
            data[HEADER_SIZE + 106],
            data[HEADER_SIZE + 107],
        ]);
        let catalog_total_entities = u64::from_le_bytes([
            data[HEADER_SIZE + 108],
            data[HEADER_SIZE + 109],
            data[HEADER_SIZE + 110],
            data[HEADER_SIZE + 111],
            data[HEADER_SIZE + 112],
            data[HEADER_SIZE + 113],
            data[HEADER_SIZE + 114],
            data[HEADER_SIZE + 115],
        ]);
        let export_count = u32::from_le_bytes([
            data[HEADER_SIZE + 116],
            data[HEADER_SIZE + 117],
            data[HEADER_SIZE + 118],
            data[HEADER_SIZE + 119],
        ]);
        let graph_projection_count = u32::from_le_bytes([
            data[HEADER_SIZE + 120],
            data[HEADER_SIZE + 121],
            data[HEADER_SIZE + 122],
            data[HEADER_SIZE + 123],
        ]);
        let analytics_job_count = u32::from_le_bytes([
            data[HEADER_SIZE + 124],
            data[HEADER_SIZE + 125],
            data[HEADER_SIZE + 126],
            data[HEADER_SIZE + 127],
        ]);
        let manifest_event_count = u32::from_le_bytes([
            data[HEADER_SIZE + 128],
            data[HEADER_SIZE + 129],
            data[HEADER_SIZE + 130],
            data[HEADER_SIZE + 131],
        ]);
        let registry_page = u32::from_le_bytes([
            data[HEADER_SIZE + 132],
            data[HEADER_SIZE + 133],
            data[HEADER_SIZE + 134],
            data[HEADER_SIZE + 135],
        ]);
        let registry_checksum = u64::from_le_bytes([
            data[HEADER_SIZE + 136],
            data[HEADER_SIZE + 137],
            data[HEADER_SIZE + 138],
            data[HEADER_SIZE + 139],
            data[HEADER_SIZE + 140],
            data[HEADER_SIZE + 141],
            data[HEADER_SIZE + 142],
            data[HEADER_SIZE + 143],
        ]);
        let recovery_page = u32::from_le_bytes([
            data[HEADER_SIZE + 144],
            data[HEADER_SIZE + 145],
            data[HEADER_SIZE + 146],
            data[HEADER_SIZE + 147],
        ]);
        let recovery_checksum = u64::from_le_bytes([
            data[HEADER_SIZE + 148],
            data[HEADER_SIZE + 149],
            data[HEADER_SIZE + 150],
            data[HEADER_SIZE + 151],
            data[HEADER_SIZE + 152],
            data[HEADER_SIZE + 153],
            data[HEADER_SIZE + 154],
            data[HEADER_SIZE + 155],
        ]);
        let catalog_page = u32::from_le_bytes([
            data[HEADER_SIZE + 156],
            data[HEADER_SIZE + 157],
            data[HEADER_SIZE + 158],
            data[HEADER_SIZE + 159],
        ]);
        let catalog_checksum = u64::from_le_bytes([
            data[HEADER_SIZE + 160],
            data[HEADER_SIZE + 161],
            data[HEADER_SIZE + 162],
            data[HEADER_SIZE + 163],
            data[HEADER_SIZE + 164],
            data[HEADER_SIZE + 165],
            data[HEADER_SIZE + 166],
            data[HEADER_SIZE + 167],
        ]);
        let metadata_state_page = u32::from_le_bytes([
            data[HEADER_SIZE + 168],
            data[HEADER_SIZE + 169],
            data[HEADER_SIZE + 170],
            data[HEADER_SIZE + 171],
        ]);
        let metadata_state_checksum = u64::from_le_bytes([
            data[HEADER_SIZE + 172],
            data[HEADER_SIZE + 173],
            data[HEADER_SIZE + 174],
            data[HEADER_SIZE + 175],
            data[HEADER_SIZE + 176],
            data[HEADER_SIZE + 177],
            data[HEADER_SIZE + 178],
            data[HEADER_SIZE + 179],
        ]);
        let vector_artifact_page = u32::from_le_bytes([
            data[HEADER_SIZE + 180],
            data[HEADER_SIZE + 181],
            data[HEADER_SIZE + 182],
            data[HEADER_SIZE + 183],
        ]);
        let vector_artifact_checksum = u64::from_le_bytes([
            data[HEADER_SIZE + 184],
            data[HEADER_SIZE + 185],
            data[HEADER_SIZE + 186],
            data[HEADER_SIZE + 187],
            data[HEADER_SIZE + 188],
            data[HEADER_SIZE + 189],
            data[HEADER_SIZE + 190],
            data[HEADER_SIZE + 191],
        ]);

        // Two-phase checkpoint fields (offset 192-200)
        let checkpoint_in_progress = data[HEADER_SIZE + 192] != 0;
        let checkpoint_target_lsn = u64::from_le_bytes([
            data[HEADER_SIZE + 193],
            data[HEADER_SIZE + 194],
            data[HEADER_SIZE + 195],
            data[HEADER_SIZE + 196],
            data[HEADER_SIZE + 197],
            data[HEADER_SIZE + 198],
            data[HEADER_SIZE + 199],
            data[HEADER_SIZE + 200],
        ]);

        // Update header
        {
            let mut header = self.header_write()?;
            header.version = version;
            header.page_size = page_size;
            header.page_count = page_count;
            header.freelist_head = freelist_head;
            header.schema_version = schema_version;
            header.checkpoint_lsn = checkpoint_lsn;
            header.checkpoint_in_progress = checkpoint_in_progress;
            header.checkpoint_target_lsn = checkpoint_target_lsn;
            header.physical = PhysicalFileHeader {
                format_version: physical_format_version,
                sequence: physical_sequence,
                manifest_oldest_root,
                manifest_root,
                free_set_root,
                manifest_page,
                manifest_checksum,
                collection_roots_page,
                collection_roots_checksum,
                collection_root_count,
                snapshot_count,
                index_count,
                catalog_collection_count,
                catalog_total_entities,
                export_count,
                graph_projection_count,
                analytics_job_count,
                manifest_event_count,
                registry_page,
                registry_checksum,
                recovery_page,
                recovery_checksum,
                catalog_page,
                catalog_checksum,
                metadata_state_page,
                metadata_state_checksum,
                vector_artifact_page,
                vector_artifact_checksum,
            };
        }

        // Initialize freelist
        {
            let mut freelist = self.freelist_write()?;
            *freelist = FreeList::from_header(freelist_head, 0);
        }

        Ok(())
    }

    /// Write header page
    ///
    /// Note: This merges database header fields into the existing page 0 content
    /// to preserve any additional data (like encryption headers) that may be stored there.
    fn write_header(&self) -> Result<(), PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        let header = self.header_read()?;

        // Read existing page 0 to preserve any additional data (e.g., encryption header)
        // First check cache, then fall back to disk
        let mut page = if let Some(cached) = self.cache.get(0) {
            cached
        } else {
            // Try to read from disk if file is large enough
            let file = self.file_lock()?;
            let len = file.metadata().map(|m| m.len()).unwrap_or(0);
            drop(file);

            if len >= PAGE_SIZE as u64 {
                self.read_page_raw(0)?
            } else {
                // File is new/empty, create fresh header page
                Page::new(PageType::Header, 0)
            }
        };

        let data = page.as_bytes_mut();

        // Write magic
        data[HEADER_SIZE..HEADER_SIZE + 4].copy_from_slice(&MAGIC_BYTES);

        // Write fields (at fixed offsets in the DB header area)
        data[HEADER_SIZE + 4..HEADER_SIZE + 8].copy_from_slice(&header.version.to_le_bytes());
        data[HEADER_SIZE + 8..HEADER_SIZE + 12].copy_from_slice(&header.page_size.to_le_bytes());
        data[HEADER_SIZE + 12..HEADER_SIZE + 16].copy_from_slice(&header.page_count.to_le_bytes());
        data[HEADER_SIZE + 16..HEADER_SIZE + 20]
            .copy_from_slice(&header.freelist_head.to_le_bytes());
        data[HEADER_SIZE + 20..HEADER_SIZE + 24]
            .copy_from_slice(&header.schema_version.to_le_bytes());
        data[HEADER_SIZE + 24..HEADER_SIZE + 32]
            .copy_from_slice(&header.checkpoint_lsn.to_le_bytes());
        data[HEADER_SIZE + 32..HEADER_SIZE + 36]
            .copy_from_slice(&header.physical.format_version.to_le_bytes());
        data[HEADER_SIZE + 36..HEADER_SIZE + 44]
            .copy_from_slice(&header.physical.sequence.to_le_bytes());
        data[HEADER_SIZE + 44..HEADER_SIZE + 52]
            .copy_from_slice(&header.physical.manifest_root.to_le_bytes());
        data[HEADER_SIZE + 52..HEADER_SIZE + 60]
            .copy_from_slice(&header.physical.manifest_oldest_root.to_le_bytes());
        data[HEADER_SIZE + 60..HEADER_SIZE + 68]
            .copy_from_slice(&header.physical.free_set_root.to_le_bytes());
        data[HEADER_SIZE + 68..HEADER_SIZE + 72]
            .copy_from_slice(&header.physical.manifest_page.to_le_bytes());
        data[HEADER_SIZE + 72..HEADER_SIZE + 80]
            .copy_from_slice(&header.physical.manifest_checksum.to_le_bytes());
        data[HEADER_SIZE + 80..HEADER_SIZE + 84]
            .copy_from_slice(&header.physical.collection_roots_page.to_le_bytes());
        data[HEADER_SIZE + 84..HEADER_SIZE + 92]
            .copy_from_slice(&header.physical.collection_roots_checksum.to_le_bytes());
        data[HEADER_SIZE + 92..HEADER_SIZE + 96]
            .copy_from_slice(&header.physical.collection_root_count.to_le_bytes());
        data[HEADER_SIZE + 96..HEADER_SIZE + 100]
            .copy_from_slice(&header.physical.snapshot_count.to_le_bytes());
        data[HEADER_SIZE + 100..HEADER_SIZE + 104]
            .copy_from_slice(&header.physical.index_count.to_le_bytes());
        data[HEADER_SIZE + 104..HEADER_SIZE + 108]
            .copy_from_slice(&header.physical.catalog_collection_count.to_le_bytes());
        data[HEADER_SIZE + 108..HEADER_SIZE + 116]
            .copy_from_slice(&header.physical.catalog_total_entities.to_le_bytes());
        data[HEADER_SIZE + 116..HEADER_SIZE + 120]
            .copy_from_slice(&header.physical.export_count.to_le_bytes());
        data[HEADER_SIZE + 120..HEADER_SIZE + 124]
            .copy_from_slice(&header.physical.graph_projection_count.to_le_bytes());
        data[HEADER_SIZE + 124..HEADER_SIZE + 128]
            .copy_from_slice(&header.physical.analytics_job_count.to_le_bytes());
        data[HEADER_SIZE + 128..HEADER_SIZE + 132]
            .copy_from_slice(&header.physical.manifest_event_count.to_le_bytes());
        data[HEADER_SIZE + 132..HEADER_SIZE + 136]
            .copy_from_slice(&header.physical.registry_page.to_le_bytes());
        data[HEADER_SIZE + 136..HEADER_SIZE + 144]
            .copy_from_slice(&header.physical.registry_checksum.to_le_bytes());
        data[HEADER_SIZE + 144..HEADER_SIZE + 148]
            .copy_from_slice(&header.physical.recovery_page.to_le_bytes());
        data[HEADER_SIZE + 148..HEADER_SIZE + 156]
            .copy_from_slice(&header.physical.recovery_checksum.to_le_bytes());
        data[HEADER_SIZE + 156..HEADER_SIZE + 160]
            .copy_from_slice(&header.physical.catalog_page.to_le_bytes());
        data[HEADER_SIZE + 160..HEADER_SIZE + 168]
            .copy_from_slice(&header.physical.catalog_checksum.to_le_bytes());
        data[HEADER_SIZE + 168..HEADER_SIZE + 172]
            .copy_from_slice(&header.physical.metadata_state_page.to_le_bytes());
        data[HEADER_SIZE + 172..HEADER_SIZE + 180]
            .copy_from_slice(&header.physical.metadata_state_checksum.to_le_bytes());
        data[HEADER_SIZE + 180..HEADER_SIZE + 184]
            .copy_from_slice(&header.physical.vector_artifact_page.to_le_bytes());
        data[HEADER_SIZE + 184..HEADER_SIZE + 192]
            .copy_from_slice(&header.physical.vector_artifact_checksum.to_le_bytes());

        // Two-phase checkpoint fields (offset 192-200)
        data[HEADER_SIZE + 192] = if header.checkpoint_in_progress { 1 } else { 0 };
        data[HEADER_SIZE + 193..HEADER_SIZE + 201]
            .copy_from_slice(&header.checkpoint_target_lsn.to_le_bytes());

        page.update_checksum();

        // Write header shadow FIRST (so it's intact if main write is interrupted)
        self.write_header_shadow(&page)?;

        self.write_page_raw(0, &page)?;
        *self.header_dirty_lock()? = false;

        Ok(())
    }

    /// Read a page from disk (bypassing cache)
    fn read_page_raw(&self, page_id: u32) -> Result<Page, PagerError> {
        let mut file = self.file_lock()?;
        let offset = (page_id as u64) * (PAGE_SIZE as u64);

        file.seek(SeekFrom::Start(offset))?;

        let mut buf = [0u8; PAGE_SIZE];
        file.read_exact(&mut buf)?;

        let page = Page::from_bytes(buf);

        // Verify checksum if configured (including page 0)
        if self.config.verify_checksums {
            page.verify_checksum()?;
        }

        Ok(page)
    }

    /// Write a page to disk (bypassing cache)
    fn write_page_raw(&self, page_id: u32, page: &Page) -> Result<(), PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        let mut file = self.file_lock()?;
        let offset = (page_id as u64) * (PAGE_SIZE as u64);

        file.seek(SeekFrom::Start(offset))?;
        file.write_all(page.as_bytes())?;

        Ok(())
    }

    /// Read a page (cache-aware)
    pub fn read_page(&self, page_id: u32) -> Result<Page, PagerError> {
        // Check cache first
        if let Some(page) = self.cache.get(page_id) {
            return Ok(page);
        }

        // Cache miss - read from disk
        let page = self.read_page_raw(page_id)?;

        // Add to cache
        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
            // Evicted page was dirty, need to write it back
            let evicted_id = dirty_page.page_id();
            self.write_page_raw(evicted_id, &dirty_page)?;
        }

        Ok(page)
    }

    /// Read a page without verifying checksum (for encrypted pages)
    ///
    /// Use this when the page content has its own integrity protection
    /// (e.g., AES-GCM authentication tag for encrypted pages).
    pub fn read_page_no_checksum(&self, page_id: u32) -> Result<Page, PagerError> {
        // Check cache first
        if let Some(page) = self.cache.get(page_id) {
            return Ok(page);
        }

        // Cache miss - read from disk (skip checksum verification)
        let mut file = self.file_lock()?;
        let offset = (page_id as u64) * (PAGE_SIZE as u64);

        file.seek(SeekFrom::Start(offset))?;

        let mut buf = [0u8; PAGE_SIZE];
        file.read_exact(&mut buf)?;
        drop(file);

        let page = Page::from_bytes(buf);

        // Add to cache (no checksum verification)
        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
            // Evicted page was dirty, need to write it back
            let evicted_id = dirty_page.page_id();
            self.write_page_raw(evicted_id, &dirty_page)?;
        }

        Ok(page)
    }

    /// Write a page (cache-aware)
    pub fn write_page(&self, page_id: u32, mut page: Page) -> Result<(), PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        // Update checksum
        page.update_checksum();

        // Add to cache and mark dirty. If the cache had to evict a
        // dirty entry, write it through immediately — the evicted
        // page will never be flushed otherwise (same bug fixed in
        // `allocate_page`).
        if let Some(dirty_page) = self.cache.insert(page_id, page) {
            let evicted_id = dirty_page.page_id();
            self.write_page_raw(evicted_id, &dirty_page)?;
        }
        self.cache.mark_dirty(page_id);

        Ok(())
    }

    /// Read a page through the configured encryptor if any. Page 0
    /// is always returned plaintext (it carries the encryption marker
    /// + header). Callers that want raw cipher bytes can use
    /// `read_page_no_checksum` directly.
    pub fn read_page_decrypted(&self, page_id: u32) -> Result<Page, PagerError> {
        if page_id == 0 || self.encryption.is_none() {
            return self.read_page(page_id);
        }
        let raw = self.read_page_no_checksum(page_id)?;
        let (enc, _) = self
            .encryption
            .as_ref()
            .expect("encryption presence checked above");
        let plaintext = enc
            .decrypt(page_id, raw.as_bytes())
            .map_err(|e| PagerError::InvalidDatabase(format!("decrypt page {page_id}: {e}")))?;
        let mut buf = [0u8; PAGE_SIZE];
        let n = plaintext.len().min(PAGE_SIZE);
        buf[..n].copy_from_slice(&plaintext[..n]);
        Ok(Page::from_bytes(buf))
    }

    /// Write a page through the configured encryptor if any. Page 0
    /// bypasses encryption and goes through the normal checksummed
    /// path. Encrypted pages skip the checksum update because
    /// AES-GCM's authentication tag is the integrity guarantee.
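    ///
    /// Cipher-page layout sketch (an assumption: the exact byte order is
    /// owned by `PageEncryptor` and is not verified here): a 12-byte nonce
    /// and a 16-byte GCM tag share the physical page with `PAGE_SIZE - 28`
    /// bytes of ciphertext, which is why only that prefix of the logical
    /// page is fed to the encryptor below.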
    pub fn write_page_encrypted(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
        if page_id == 0 || self.encryption.is_none() {
            return self.write_page(page_id, page);
        }
        const OVERHEAD: usize = 12 + 16; // nonce + GCM tag
        let plaintext_len = PAGE_SIZE - OVERHEAD;
        let plaintext = &page.as_bytes()[..plaintext_len];
        let (enc, _) = self
            .encryption
            .as_ref()
            .expect("encryption presence checked above");
        let ciphertext = enc.encrypt(page_id, plaintext);
        debug_assert_eq!(ciphertext.len(), PAGE_SIZE);
        let mut buf = [0u8; PAGE_SIZE];
        buf.copy_from_slice(&ciphertext);
        let cipher_page = Page::from_bytes(buf);
        self.write_page_no_checksum(page_id, cipher_page)
    }

    /// Write a page without updating checksum (for encrypted pages)
    ///
    /// Use this when the page content has its own integrity protection
    /// (e.g., AES-GCM authentication tag for encrypted pages).
    pub fn write_page_no_checksum(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        // Add to cache and mark dirty (no checksum update). Same
        // eviction-write-through guard as `write_page`.
        if let Some(dirty_page) = self.cache.insert(page_id, page) {
            let evicted_id = dirty_page.page_id();
            self.write_page_raw(evicted_id, &dirty_page)?;
        }
        self.cache.mark_dirty(page_id);

        Ok(())
    }

    /// Allocate a new page
    pub fn allocate_page(&self, page_type: PageType) -> Result<Page, PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        // Try to get from freelist first
        let page_id = {
            let mut freelist = self.freelist_write()?;
            if let Some(id) = freelist.allocate() {
                id
            } else if freelist.trunk_head() != 0 {
                let trunk_id = freelist.trunk_head();
                drop(freelist);

                let trunk = self.read_page(trunk_id).map_err(|e| match e {
                    PagerError::PageNotFound(_) => {
                        PagerError::InvalidDatabase("Freelist trunk missing".to_string())
                    }
                    other => other,
                })?;

                let mut freelist = self.freelist_write()?;
                freelist
                    .load_from_trunk(&trunk)
                    .map_err(|e| PagerError::InvalidDatabase(format!("Freelist: {}", e)))?;
                let id = freelist.allocate().ok_or_else(|| {
                    PagerError::InvalidDatabase("Freelist empty after trunk load".to_string())
                })?;

                let mut header = self.header_write()?;
                header.freelist_head = freelist.trunk_head();
                *self.header_dirty_lock()? = true;

                id
            } else {
                // No free pages, extend file
                let mut header = self.header_write()?;
                let id = header.page_count;
                header.page_count += 1;
                *self.header_dirty_lock()? = true;
                id
            }
        };

        let page = Page::new(page_type, page_id);

        // Write to cache. The evicted page (if any) is dirty by
        // definition — `cache.insert` only returns `Some` when it
        // had to evict a dirty entry to make room. The previous
        // version dropped that return value, which silently lost
        // writes whenever a freshly-allocated page caused an LRU
        // eviction. This shows up under heavy ingest as
        // "B-tree insert error: Pager error: I/O error: failed to
        // fill whole buffer" later, when something tries to read
        // back the never-flushed page. Mirror `read_page`'s
        // handling: write the evicted page through immediately so
        // the on-disk image stays consistent.
        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
            let evicted_id = dirty_page.page_id();
            self.write_page_raw(evicted_id, &dirty_page)?;
        }
        self.cache.mark_dirty(page_id);

        Ok(page)
    }

    /// Free a page (return to freelist)
    pub fn free_page(&self, page_id: u32) -> Result<(), PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        // Remove from cache
        self.cache.remove(page_id);

        // Add to freelist
        let mut freelist = self.freelist_write()?;
        freelist.free(page_id);

        *self.header_dirty_lock()? = true;

        Ok(())
    }

    /// Get database header
    pub fn header(&self) -> Result<DatabaseHeader, PagerError> {
        Ok(self.header_read()?.clone())
    }

    pub fn physical_header(&self) -> Result<PhysicalFileHeader, PagerError> {
        Ok(self.header_read()?.physical)
    }

    pub fn update_physical_header(&self, physical: PhysicalFileHeader) -> Result<(), PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        let mut header = self.header_write()?;
        header.physical = physical;
        *self.header_dirty_lock()? = true;
        Ok(())
    }

    /// Get page count
    pub fn page_count(&self) -> Result<u32, PagerError> {
        Ok(self.header_read()?.page_count)
    }

    /// Attach a WAL writer to enforce WAL-first flush ordering.
    ///
    /// After this call, [`Pager::flush`] computes the maximum
    /// `header.lsn` over all dirty pages and calls
    /// `WalWriter::flush_until(max_lsn)` before any page is written
    /// to the data file. This is the PostgreSQL rule: a page on disk
    /// implies its WAL record is already durable on disk.
    ///
    /// Existing call sites that construct a Pager without a WAL
    /// keep their previous behaviour (no LSN check) — wiring is
    /// strictly opt-in.
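    ///
    /// Wiring sketch (illustrative; how the `WalWriter` is constructed is
    /// assumed, not shown in this file):
    ///
    /// ```ignore
    /// let wal: Arc<Mutex<WalWriter>> = make_wal_writer()?; // hypothetical helper
    /// pager.set_wal_writer(Arc::clone(&wal));
    /// pager.flush()?; // now calls wal.flush_until(max_lsn) before page writes
    /// ```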
    pub fn set_wal_writer(&self, wal: Arc<Mutex<crate::storage::wal::writer::WalWriter>>) {
        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
        *slot = Some(wal);
    }

    /// Detach the WAL writer (test / shutdown path).
    pub fn clear_wal_writer(&self) {
        let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
        *slot = None;
    }

    /// Has a WAL writer been attached?
    pub fn has_wal_writer(&self) -> bool {
        self.wal.read().map(|s| s.is_some()).unwrap_or(false)
    }

    /// Flush all dirty pages to disk
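    ///
    /// Order of operations (matching the body below): persist dirty
    /// freelist state to trunk pages, enforce WAL-first durability via
    /// `flush_until(max_lsn)`, write dirty pages through the DWB, then
    /// rewrite the header if it is dirty.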
    pub fn flush(&self) -> Result<(), PagerError> {
        if self.config.read_only {
            return Ok(());
        }

        // Persist freelist to trunk pages when dirty
        let trunks = {
            let mut freelist = self.freelist_write()?;
            if freelist.is_dirty() {
                let mut header = self.header_write()?;
                let trunks = freelist.flush_to_trunks(0, || {
                    let id = header.page_count;
                    header.page_count += 1;
                    id
                });
                header.freelist_head = freelist.trunk_head();
                *self.header_dirty_lock()? = true;
                freelist.mark_clean();
                trunks
            } else {
                Vec::new()
            }
        };

        for trunk in trunks {
            let page_id = trunk.page_id();
            self.cache.insert(page_id, trunk);
            self.cache.mark_dirty(page_id);
        }

        // Flush dirty pages from cache (through DWB if enabled)
        let dirty_pages = self.cache.flush_dirty();
        if !dirty_pages.is_empty() {
            // WAL-FIRST: ensure every WAL record describing a dirty
            // page is durable BEFORE the page itself reaches disk.
            // Pages with `lsn == 0` are exempt (freelist trunks, header
            // shadow pages, anything not produced by a WAL append).
            let max_lsn = dirty_pages
                .iter()
                .filter_map(|(_, page)| page.header().ok().map(|h| h.lsn))
                .max()
                .unwrap_or(0);
            if max_lsn > 0 {
                if let Ok(slot) = self.wal.read() {
                    if let Some(wal) = slot.as_ref() {
                        let wal = Arc::clone(wal);
                        // Drop the read lock before taking the WAL
                        // mutex so an unrelated reader cannot block
                        // the flush path.
                        drop(slot);
                        let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
                        wal_guard.flush_until(max_lsn).map_err(PagerError::Io)?;
                    }
                }
            }
            self.write_pages_through_dwb(&dirty_pages)?;
        }

        // Write header if dirty
        if *self.header_dirty_lock()? {
            self.write_header()?;
        }

        Ok(())
    }

    /// Sync file to disk (fsync)
    pub fn sync(&self) -> Result<(), PagerError> {
        self.flush()?;

        let file = self.file_lock()?;
        file.sync_all()?;

        Ok(())
    }

    /// Get cache statistics
    pub fn cache_stats(&self) -> crate::storage::engine::page_cache::CacheStats {
        self.cache.stats()
    }

    /// Count dirty pages currently in the page cache.
    pub fn dirty_page_count(&self) -> usize {
        self.cache.dirty_count()
    }

    /// Estimated fraction of the page cache holding dirty pages.
    /// Returned in `[0, 1]`. Used by the background writer to
    /// decide when to kick in aggressive flushing.
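    ///
    /// Example (sketch): a 1000-page cache holding 250 dirty pages
    /// returns `0.25`; any trigger threshold (say, 0.5) is the
    /// bgwriter's choice, not fixed here.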
    pub fn dirty_fraction(&self) -> f64 {
        let capacity = self.cache.capacity().max(1) as f64;
        self.cache.dirty_count() as f64 / capacity
    }

    /// Flush up to `max` dirty pages from the cache. Returns the
    /// number actually written. Background-writer entry point —
    /// reuses the same persistence path as `flush()` but bounded.
    pub fn flush_some_dirty(&self, max: usize) -> Result<usize, PagerError> {
        if self.config.read_only || max == 0 {
            return Ok(0);
        }
        let dirty_pages = self.cache.flush_some_dirty(max);
        if dirty_pages.is_empty() {
            return Ok(0);
        }
        let count = dirty_pages.len();
        // WAL-first: every cached dirty page carries an LSN that the
        // WAL must have already persisted. The full `flush()` path
        // enforces this with `flush_until(max_lsn)`; here we write the
        // pages straight through the DWB, since re-inserting them via
        // `write_page` would only mark them dirty again without ever
        // persisting them. Safe because callers only reach this path
        // via the bgwriter, which runs asynchronously alongside normal
        // commits that already respect WAL-first.
        self.write_pages_through_dwb(&dirty_pages)?;
        Ok(count)
    }

    /// Get database file path
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Check if database is read-only
    pub fn is_read_only(&self) -> bool {
        self.config.read_only
    }

    /// Get file size in bytes
    pub fn file_size(&self) -> Result<u64, PagerError> {
        let file = self.file_lock()?;
        Ok(file.metadata()?.len())
    }

    /// Issue an OS-level read-ahead hint for `page_id`.
    ///
    /// A6 prefetch wire: called from `BTreeCursor::next` when the
    /// cursor passes 50% of the current leaf, so the kernel fetches
    /// the next leaf page while the CPU processes the remaining half
    /// of the current one. Failures are silent — a missed prefetch is
    /// a performance miss, never a correctness bug.
    pub fn prefetch_hint(&self, page_id: u32) {
        if let Ok(file) = self.file_lock() {
            let _ = crate::storage::btree::prefetch::prefetch_page(
                &file,
                page_id as u64,
                PAGE_SIZE as u32,
            );
        }
    }

    // ── Corruption defense helpers ──────────────────────────────────

    /// Path for the header shadow file
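    ///
    /// e.g. `data.rdb` → `data.rdb-hdr`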
    fn shadow_path(db_path: &Path) -> PathBuf {
        let mut p = db_path.to_path_buf().into_os_string();
        p.push("-hdr");
        PathBuf::from(p)
    }

    /// Path for the metadata shadow file
    fn meta_shadow_path(db_path: &Path) -> PathBuf {
        let mut p = db_path.to_path_buf().into_os_string();
        p.push("-meta");
        PathBuf::from(p)
    }

    /// Path for the double-write buffer file
    fn dwb_path(db_path: &Path) -> PathBuf {
        let mut p = db_path.to_path_buf().into_os_string();
        p.push("-dwb");
        PathBuf::from(p)
    }

    /// Open the double-write buffer file without truncating existing content.
    ///
    /// The file is intentionally preserved across restarts so recovery can
    /// consume any crash-leftover pages before the next write cycle clears it.
    fn open_dwb_file(db_path: &Path) -> Result<File, PagerError> {
        Ok(OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(Self::dwb_path(db_path))?)
    }

    /// Clear the DWB in place while preserving the file path and handle.
    fn clear_dwb_file(file: &mut File) -> Result<(), PagerError> {
        file.set_len(0)?;
        file.seek(SeekFrom::Start(0))?;
        file.sync_all()?;
        Ok(())
    }

    /// Write a shadow copy of the header page to .rdb-hdr
    fn write_header_shadow(&self, page: &Page) -> Result<(), PagerError> {
        if self.config.read_only {
            return Ok(());
        }
        let shadow = Self::shadow_path(&self.path);
        let mut f = File::create(&shadow)?;
        f.write_all(page.as_bytes())?;
        f.sync_all()?;
        Ok(())
    }

    /// Recover header from shadow file when page 0 is corrupted
    fn recover_header_from_shadow(&self) -> Result<Page, PagerError> {
        let shadow = Self::shadow_path(&self.path);
        if !shadow.exists() {
            return Err(PagerError::InvalidDatabase(
                "Page 0 corrupted and no header shadow found".into(),
            ));
        }
        let mut f = File::open(&shadow)?;
        let mut buf = [0u8; PAGE_SIZE];
        f.read_exact(&mut buf)?;
        let page = Page::from_bytes(buf);

        // Verify shadow is valid
        let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
        if magic != MAGIC_BYTES {
            return Err(PagerError::InvalidDatabase(
                "Header shadow also corrupted".into(),
            ));
        }

        // Restore page 0 from shadow
        if !self.config.read_only {
            self.write_page_raw(0, &page)?;
            let file = self.file_lock()?;
            file.sync_all()?;
        }

        Ok(page)
    }

    /// Write a shadow copy of the metadata page to .rdb-meta
    pub fn write_meta_shadow(&self, page: &Page) -> Result<(), PagerError> {
        if self.config.read_only {
            return Ok(());
        }
        let shadow = Self::meta_shadow_path(&self.path);
        let mut f = File::create(&shadow)?;
        f.write_all(page.as_bytes())?;
        f.sync_all()?;
        Ok(())
    }

    /// Recover metadata page from shadow file when page 1 is corrupted
    pub fn recover_meta_from_shadow(&self) -> Result<Page, PagerError> {
        let shadow = Self::meta_shadow_path(&self.path);
        if !shadow.exists() {
            return Err(PagerError::InvalidDatabase(
                "Page 1 corrupted and no metadata shadow found".into(),
            ));
        }
        let mut f = File::open(&shadow)?;
        let mut buf = [0u8; PAGE_SIZE];
        f.read_exact(&mut buf)?;
        let page = Page::from_bytes(buf);

        // Restore page 1 from shadow
        if !self.config.read_only {
            self.write_page_raw(1, &page)?;
            let file = self.file_lock()?;
            file.sync_all()?;
        }

        Ok(page)
    }

    /// Write pages through the double-write buffer for torn page protection.
    ///
    /// 1. Write all pages to the DWB file with a header (magic + count + checksum)
    /// 2. fsync the DWB
    /// 3. Write all pages to their final locations in the .rdb file
    /// 4. Truncate the DWB (marks as consumed)
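    ///
    /// On-disk DWB layout (as built below; integers little-endian):
    /// `[magic:4][count:u32][checksum:u32]` followed by `count` entries
    /// of `[page_id:u32][page_data:PAGE_SIZE]`. The CRC32 covers every
    /// byte after the 12-byte header.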
    fn write_pages_through_dwb(&self, pages: &[(u32, Page)]) -> Result<(), PagerError> {
        if let Some(dwb_mutex) = &self.dwb_file {
            let mut dwb = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;

            // Build DWB content: [magic:4][count:u32][checksum:u32][pages...]
            // Each page entry: [page_id:u32][page_data:4096]
            let entry_size = 4 + PAGE_SIZE; // page_id + data
            let header_len = 4 + 4 + 4; // magic + count + checksum
            let total = header_len + pages.len() * entry_size;
            let mut buf = Vec::with_capacity(total);

            // Header
            buf.extend_from_slice(&DWB_MAGIC);
            buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
            buf.extend_from_slice(&[0u8; 4]); // placeholder for checksum

            // Page entries
            for (page_id, page) in pages {
                buf.extend_from_slice(&page_id.to_le_bytes());
                buf.extend_from_slice(page.as_bytes());
            }

            // Compute and write checksum over all data after the header
            let checksum = super::super::crc32::crc32(&buf[header_len..]);
            buf[8..12].copy_from_slice(&checksum.to_le_bytes());

            // Write DWB and fsync
            dwb.seek(SeekFrom::Start(0))?;
            dwb.write_all(&buf)?;
            dwb.set_len(buf.len() as u64)?;
            dwb.sync_all()?;

            // Now write pages to their final locations
            for (page_id, page) in pages {
                self.write_page_raw(*page_id, page)?;
            }

            // Truncate DWB to mark as consumed
            Self::clear_dwb_file(&mut dwb)?;

            Ok(())
        } else {
            // DWB disabled — write directly
            for (page_id, page) in pages {
                self.write_page_raw(*page_id, page)?;
            }
            Ok(())
        }
    }

    /// Recover from double-write buffer after a crash.
    ///
    /// If the DWB file contains valid pages, they were written before the crash
    /// interrupted writing to the main file. Re-apply them.
    fn recover_from_dwb(&self) -> Result<(), PagerError> {
        let dwb_path = Self::dwb_path(&self.path);
        if !dwb_path.exists() {
            return Ok(());
        }

        if let Some(dwb_mutex) = &self.dwb_file {
            let mut file = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
            return self.recover_from_dwb_file(&mut file);
        }

        let mut file = OpenOptions::new().read(true).write(true).open(&dwb_path)?;
        self.recover_from_dwb_file(&mut file)
    }

    fn recover_from_dwb_file(&self, file: &mut File) -> Result<(), PagerError> {
        file.seek(SeekFrom::Start(0))?;
        let len = file.metadata()?.len();
        if len < 12 {
            // Empty or incomplete header — keep the DWB file but clear stale bytes.
            return Self::clear_dwb_file(file);
        }

        let mut buf = vec![0u8; len as usize];
        file.read_exact(&mut buf)?;

        // Verify magic
        if buf[0..4] != DWB_MAGIC {
            // Not a valid DWB — clear it in place so the same file can be reused.
            return Self::clear_dwb_file(file);
        }

        let count = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]) as usize;
        let stored_checksum = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);

        let header_len = 12;
        let entry_size = 4 + PAGE_SIZE;
        let expected_len = header_len + count * entry_size;

        if buf.len() < expected_len {
            // Incomplete DWB write — discard it in place.
            return Self::clear_dwb_file(file);
        }

        // Verify checksum
        let computed = super::super::crc32::crc32(&buf[header_len..expected_len]);
        if computed != stored_checksum {
            // Corrupted DWB — discard it in place.
            return Self::clear_dwb_file(file);
        }

        // DWB is valid — re-apply pages to main file
        let mut offset = header_len;
        for _ in 0..count {
            let page_id = u32::from_le_bytes([
                buf[offset],
                buf[offset + 1],
                buf[offset + 2],
                buf[offset + 3],
            ]);
            offset += 4;

            let mut page_data = [0u8; PAGE_SIZE];
            page_data.copy_from_slice(&buf[offset..offset + PAGE_SIZE]);
            offset += PAGE_SIZE;

            let page = Page::from_bytes(page_data);
            self.write_page_raw(page_id, &page)?;
        }

        // Sync and clean up
        {
            let file = self.file_lock()?;
            file.sync_all()?;
        }

        Self::clear_dwb_file(file)
    }

    /// Write header and sync to disk (public for checkpointer).
    pub fn write_header_and_sync(&self) -> Result<(), PagerError> {
        self.write_header()?;
        let file = self.file_lock()?;
        file.sync_all()?;
        Ok(())
    }

    /// Set the checkpoint_in_progress flag in the header.
    pub fn set_checkpoint_in_progress(
        &self,
        in_progress: bool,
        target_lsn: u64,
    ) -> Result<(), PagerError> {
        let mut header = self.header_write()?;
        header.checkpoint_in_progress = in_progress;
        header.checkpoint_target_lsn = target_lsn;
        *self.header_dirty_lock()? = true;
        drop(header);
        self.write_header_and_sync()
    }

    /// Update the checkpoint LSN and clear the in-progress flag.
    pub fn complete_checkpoint(&self, lsn: u64) -> Result<(), PagerError> {
        let mut header = self.header_write()?;
        header.checkpoint_lsn = lsn;
        header.checkpoint_in_progress = false;
        header.checkpoint_target_lsn = 0;
        *self.header_dirty_lock()? = true;
        drop(header);
        self.write_header_and_sync()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn temp_db_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "reddb-pager-{}-{}-{}.rdb",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ))
    }

    #[test]
    fn open_refuses_future_database_version() {
        let path = temp_db_path("future-version");
        let pager = Pager::open_default(&path).unwrap();
        drop(pager);

        let mut future_header = Page::new_header_page(1);
        future_header.as_bytes_mut()[HEADER_SIZE + 4..HEADER_SIZE + 8]
            .copy_from_slice(&(DB_VERSION + 1).to_le_bytes());
        future_header.update_checksum();

        let mut file = OpenOptions::new().write(true).open(&path).unwrap();
        file.seek(SeekFrom::Start(0)).unwrap();
        file.write_all(future_header.as_bytes()).unwrap();
        file.sync_all().unwrap();
        drop(file);

        let err = match Pager::open_default(&path) {
            Ok(_) => panic!("future database version should be rejected"),
            Err(err) => err,
        };
        match err {
            PagerError::InvalidDatabase(msg) => {
                assert!(msg.contains("newer than supported"));
            }
            other => panic!("expected InvalidDatabase, got {other:?}"),
        }

        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(Pager::shadow_path(&path));
        let _ = std::fs::remove_file(Pager::dwb_path(&path));
    }
}