1use super::*;
2
/// Four-byte magic tag `0x52 0x44 0x44 0x57`, i.e. the ASCII bytes `RDDW`.
const DWB_MAGIC: [u8; 4] = *b"RDDW";
5
6impl Pager {
7 pub fn open<P: AsRef<Path>>(path: P, config: PagerConfig) -> Result<Self, PagerError> {
9 let path = path.as_ref().to_path_buf();
10 let exists = path.exists();
11
12 if !exists && !config.create {
13 return Err(PagerError::InvalidDatabase(
14 "Database does not exist".into(),
15 ));
16 }
17
18 if !exists && config.read_only {
19 return Err(PagerError::InvalidDatabase(
20 "Cannot create read-only database".into(),
21 ));
22 }
23
24 let file = OpenOptions::new()
27 .read(true)
28 .write(!config.read_only)
29 .create(config.create && !config.read_only)
30 .open(&path)?;
31
32 let lock_file = if !config.read_only {
34 let lf = OpenOptions::new().read(true).write(true).open(&path)?;
35 lf.try_lock_exclusive().map_err(|_| PagerError::Locked)?;
36 Some(lf)
37 } else {
38 let lf = OpenOptions::new().read(true).open(&path)?;
39 match lf.try_lock_shared() {
40 Ok(_) => Some(lf),
41 Err(_) => None,
42 }
43 };
44
45 let fold_dwb = crate::physical::fold_dwb_into_wal_enabled();
52 let dwb_file = if config.double_write && !config.read_only && !fold_dwb {
53 let f = Self::open_dwb_file(&path)?;
54 Some(Mutex::new(f))
55 } else {
56 if fold_dwb && !config.read_only {
57 let _ = std::fs::remove_file(Self::dwb_path(&path));
58 }
59 None
60 };
61
62 let mut pager = Self {
63 path,
64 file: Mutex::new(file),
65 _lock_file: lock_file,
66 dwb_file,
67 cache: PageCache::new(config.cache_size),
68 freelist: RwLock::new(FreeList::new()),
69 header: RwLock::new(DatabaseHeader::default()),
70 config,
71 header_dirty: Mutex::new(false),
72 wal: RwLock::new(None),
73 encryption: None,
74 };
75
76 if exists {
77 pager.recover_from_dwb()?;
79 pager.load_header()?;
81 pager.bind_encryption_for_existing()?;
82 } else {
83 pager.initialize()?;
85 pager.bind_encryption_for_new()?;
86 }
87
88 Ok(pager)
89 }
90
91 fn bind_encryption_for_existing(&mut self) -> Result<(), PagerError> {
101 const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
102 const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";
103
104 if self.page_count().unwrap_or(0) == 0 {
105 return self.bind_encryption_for_new();
106 }
107 let header_page = self.read_page_no_checksum(0)?;
108 let data = header_page.as_bytes();
109 let has_marker = data.len() > ENCRYPTION_MARKER_OFFSET + 4
110 && &data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4] == ENCRYPTION_MARKER;
111
112 let key = self.config.encryption.clone();
113 match (has_marker, key) {
114 (true, Some(key)) => {
115 let header_start = ENCRYPTION_MARKER_OFFSET + 4;
116 let header =
117 crate::storage::encryption::EncryptionHeader::from_bytes(&data[header_start..])
118 .map_err(|e| {
119 PagerError::InvalidDatabase(format!(
120 "encryption header parse failed: {e}"
121 ))
122 })?;
123 if !header.validate(&key) {
124 return Err(PagerError::InvalidKey);
125 }
126 let encryptor = crate::storage::encryption::PageEncryptor::new(key);
127 self.encryption = Some((encryptor, header));
128 Ok(())
129 }
130 (true, None) => Err(PagerError::EncryptionRequired),
131 (false, Some(_)) => Err(PagerError::PlainDatabaseRefusesKey),
132 (false, None) => Ok(()),
133 }
134 }
135
136 fn bind_encryption_for_new(&mut self) -> Result<(), PagerError> {
139 const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
140 const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";
141
142 let Some(key) = self.config.encryption.clone() else {
143 return Ok(());
144 };
145 let header = crate::storage::encryption::EncryptionHeader::new(&key);
146 let encryptor = crate::storage::encryption::PageEncryptor::new(key);
147
148 if self.page_count().unwrap_or(0) > 0 {
151 let mut page = self.read_page_no_checksum(0)?;
152 let data = page.as_bytes_mut();
153 data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4]
154 .copy_from_slice(ENCRYPTION_MARKER);
155 let header_bytes = header.to_bytes();
156 let header_start = ENCRYPTION_MARKER_OFFSET + 4;
157 data[header_start..header_start + header_bytes.len()].copy_from_slice(&header_bytes);
158 self.write_page_no_checksum(0, page)?;
159 }
160 self.encryption = Some((encryptor, header));
161 Ok(())
162 }
163
164 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, PagerError> {
166 Self::open(path, PagerConfig::default())
167 }
168
169 fn initialize(&self) -> Result<(), PagerError> {
171 if self.config.read_only {
172 return Err(PagerError::ReadOnly);
173 }
174
175 let initial_page_count = 3;
179 let header_page = Page::new_header_page(initial_page_count);
180 self.header_write()?.page_count = initial_page_count;
181
182 self.write_page_raw(0, &header_page)?;
185 let mut metadata_page = Page::new(PageType::Header, 1);
186 metadata_page.update_checksum();
187 self.write_page_raw(1, &metadata_page)?;
188 let mut vault_page = Page::new(PageType::Vault, 2);
189 vault_page.update_checksum();
190 self.write_page_raw(2, &vault_page)?;
191
192 self.sync()?;
194
195 Ok(())
196 }
197
198 fn header_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, DatabaseHeader>, PagerError> {
200 self.header.write().map_err(|_| PagerError::LockPoisoned)
201 }
202
203 fn header_read(&self) -> Result<std::sync::RwLockReadGuard<'_, DatabaseHeader>, PagerError> {
205 self.header.read().map_err(|_| PagerError::LockPoisoned)
206 }
207
208 fn freelist_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, FreeList>, PagerError> {
210 self.freelist.write().map_err(|_| PagerError::LockPoisoned)
211 }
212
213 fn file_lock(&self) -> Result<std::sync::MutexGuard<'_, File>, PagerError> {
215 self.file.lock().map_err(|_| PagerError::LockPoisoned)
216 }
217
218 fn header_dirty_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>, PagerError> {
220 self.header_dirty
221 .lock()
222 .map_err(|_| PagerError::LockPoisoned)
223 }
224
225 fn load_header(&self) -> Result<(), PagerError> {
227 let header_page = match self.read_page_raw(0) {
229 Ok(page) => {
230 let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
232 if magic == MAGIC_BYTES {
233 page
234 } else {
235 self.recover_header_from_shadow()?
237 }
238 }
239 Err(_) => self.recover_header_from_shadow()?,
240 };
241
242 let data = header_page.as_bytes();
244 let version = u32::from_le_bytes([
245 data[HEADER_SIZE + 4],
246 data[HEADER_SIZE + 5],
247 data[HEADER_SIZE + 6],
248 data[HEADER_SIZE + 7],
249 ]);
250
251 let page_size = u32::from_le_bytes([
252 data[HEADER_SIZE + 8],
253 data[HEADER_SIZE + 9],
254 data[HEADER_SIZE + 10],
255 data[HEADER_SIZE + 11],
256 ]);
257
258 if page_size != PAGE_SIZE as u32 {
259 return Err(PagerError::InvalidDatabase(format!(
260 "Unsupported page size: {}",
261 page_size
262 )));
263 }
264 if version > DB_VERSION {
265 return Err(PagerError::InvalidDatabase(format!(
266 "Unsupported database version: file version {version} is newer than supported {DB_VERSION}"
267 )));
268 }
269
270 let page_count = u32::from_le_bytes([
271 data[HEADER_SIZE + 12],
272 data[HEADER_SIZE + 13],
273 data[HEADER_SIZE + 14],
274 data[HEADER_SIZE + 15],
275 ]);
276
277 let freelist_head = u32::from_le_bytes([
278 data[HEADER_SIZE + 16],
279 data[HEADER_SIZE + 17],
280 data[HEADER_SIZE + 18],
281 data[HEADER_SIZE + 19],
282 ]);
283
284 let schema_version = u32::from_le_bytes([
285 data[HEADER_SIZE + 20],
286 data[HEADER_SIZE + 21],
287 data[HEADER_SIZE + 22],
288 data[HEADER_SIZE + 23],
289 ]);
290
291 let checkpoint_lsn = u64::from_le_bytes([
292 data[HEADER_SIZE + 24],
293 data[HEADER_SIZE + 25],
294 data[HEADER_SIZE + 26],
295 data[HEADER_SIZE + 27],
296 data[HEADER_SIZE + 28],
297 data[HEADER_SIZE + 29],
298 data[HEADER_SIZE + 30],
299 data[HEADER_SIZE + 31],
300 ]);
301 let physical_format_version = u32::from_le_bytes([
302 data[HEADER_SIZE + 32],
303 data[HEADER_SIZE + 33],
304 data[HEADER_SIZE + 34],
305 data[HEADER_SIZE + 35],
306 ]);
307 let physical_sequence = u64::from_le_bytes([
308 data[HEADER_SIZE + 36],
309 data[HEADER_SIZE + 37],
310 data[HEADER_SIZE + 38],
311 data[HEADER_SIZE + 39],
312 data[HEADER_SIZE + 40],
313 data[HEADER_SIZE + 41],
314 data[HEADER_SIZE + 42],
315 data[HEADER_SIZE + 43],
316 ]);
317 let manifest_root = u64::from_le_bytes([
318 data[HEADER_SIZE + 44],
319 data[HEADER_SIZE + 45],
320 data[HEADER_SIZE + 46],
321 data[HEADER_SIZE + 47],
322 data[HEADER_SIZE + 48],
323 data[HEADER_SIZE + 49],
324 data[HEADER_SIZE + 50],
325 data[HEADER_SIZE + 51],
326 ]);
327 let manifest_oldest_root = u64::from_le_bytes([
328 data[HEADER_SIZE + 52],
329 data[HEADER_SIZE + 53],
330 data[HEADER_SIZE + 54],
331 data[HEADER_SIZE + 55],
332 data[HEADER_SIZE + 56],
333 data[HEADER_SIZE + 57],
334 data[HEADER_SIZE + 58],
335 data[HEADER_SIZE + 59],
336 ]);
337 let free_set_root = u64::from_le_bytes([
338 data[HEADER_SIZE + 60],
339 data[HEADER_SIZE + 61],
340 data[HEADER_SIZE + 62],
341 data[HEADER_SIZE + 63],
342 data[HEADER_SIZE + 64],
343 data[HEADER_SIZE + 65],
344 data[HEADER_SIZE + 66],
345 data[HEADER_SIZE + 67],
346 ]);
347 let manifest_page = u32::from_le_bytes([
348 data[HEADER_SIZE + 68],
349 data[HEADER_SIZE + 69],
350 data[HEADER_SIZE + 70],
351 data[HEADER_SIZE + 71],
352 ]);
353 let manifest_checksum = u64::from_le_bytes([
354 data[HEADER_SIZE + 72],
355 data[HEADER_SIZE + 73],
356 data[HEADER_SIZE + 74],
357 data[HEADER_SIZE + 75],
358 data[HEADER_SIZE + 76],
359 data[HEADER_SIZE + 77],
360 data[HEADER_SIZE + 78],
361 data[HEADER_SIZE + 79],
362 ]);
363 let collection_roots_page = u32::from_le_bytes([
364 data[HEADER_SIZE + 80],
365 data[HEADER_SIZE + 81],
366 data[HEADER_SIZE + 82],
367 data[HEADER_SIZE + 83],
368 ]);
369 let collection_roots_checksum = u64::from_le_bytes([
370 data[HEADER_SIZE + 84],
371 data[HEADER_SIZE + 85],
372 data[HEADER_SIZE + 86],
373 data[HEADER_SIZE + 87],
374 data[HEADER_SIZE + 88],
375 data[HEADER_SIZE + 89],
376 data[HEADER_SIZE + 90],
377 data[HEADER_SIZE + 91],
378 ]);
379 let collection_root_count = u32::from_le_bytes([
380 data[HEADER_SIZE + 92],
381 data[HEADER_SIZE + 93],
382 data[HEADER_SIZE + 94],
383 data[HEADER_SIZE + 95],
384 ]);
385 let snapshot_count = u32::from_le_bytes([
386 data[HEADER_SIZE + 96],
387 data[HEADER_SIZE + 97],
388 data[HEADER_SIZE + 98],
389 data[HEADER_SIZE + 99],
390 ]);
391 let index_count = u32::from_le_bytes([
392 data[HEADER_SIZE + 100],
393 data[HEADER_SIZE + 101],
394 data[HEADER_SIZE + 102],
395 data[HEADER_SIZE + 103],
396 ]);
397 let catalog_collection_count = u32::from_le_bytes([
398 data[HEADER_SIZE + 104],
399 data[HEADER_SIZE + 105],
400 data[HEADER_SIZE + 106],
401 data[HEADER_SIZE + 107],
402 ]);
403 let catalog_total_entities = u64::from_le_bytes([
404 data[HEADER_SIZE + 108],
405 data[HEADER_SIZE + 109],
406 data[HEADER_SIZE + 110],
407 data[HEADER_SIZE + 111],
408 data[HEADER_SIZE + 112],
409 data[HEADER_SIZE + 113],
410 data[HEADER_SIZE + 114],
411 data[HEADER_SIZE + 115],
412 ]);
413 let export_count = u32::from_le_bytes([
414 data[HEADER_SIZE + 116],
415 data[HEADER_SIZE + 117],
416 data[HEADER_SIZE + 118],
417 data[HEADER_SIZE + 119],
418 ]);
419 let graph_projection_count = u32::from_le_bytes([
420 data[HEADER_SIZE + 120],
421 data[HEADER_SIZE + 121],
422 data[HEADER_SIZE + 122],
423 data[HEADER_SIZE + 123],
424 ]);
425 let analytics_job_count = u32::from_le_bytes([
426 data[HEADER_SIZE + 124],
427 data[HEADER_SIZE + 125],
428 data[HEADER_SIZE + 126],
429 data[HEADER_SIZE + 127],
430 ]);
431 let manifest_event_count = u32::from_le_bytes([
432 data[HEADER_SIZE + 128],
433 data[HEADER_SIZE + 129],
434 data[HEADER_SIZE + 130],
435 data[HEADER_SIZE + 131],
436 ]);
437 let registry_page = u32::from_le_bytes([
438 data[HEADER_SIZE + 132],
439 data[HEADER_SIZE + 133],
440 data[HEADER_SIZE + 134],
441 data[HEADER_SIZE + 135],
442 ]);
443 let registry_checksum = u64::from_le_bytes([
444 data[HEADER_SIZE + 136],
445 data[HEADER_SIZE + 137],
446 data[HEADER_SIZE + 138],
447 data[HEADER_SIZE + 139],
448 data[HEADER_SIZE + 140],
449 data[HEADER_SIZE + 141],
450 data[HEADER_SIZE + 142],
451 data[HEADER_SIZE + 143],
452 ]);
453 let recovery_page = u32::from_le_bytes([
454 data[HEADER_SIZE + 144],
455 data[HEADER_SIZE + 145],
456 data[HEADER_SIZE + 146],
457 data[HEADER_SIZE + 147],
458 ]);
459 let recovery_checksum = u64::from_le_bytes([
460 data[HEADER_SIZE + 148],
461 data[HEADER_SIZE + 149],
462 data[HEADER_SIZE + 150],
463 data[HEADER_SIZE + 151],
464 data[HEADER_SIZE + 152],
465 data[HEADER_SIZE + 153],
466 data[HEADER_SIZE + 154],
467 data[HEADER_SIZE + 155],
468 ]);
469 let catalog_page = u32::from_le_bytes([
470 data[HEADER_SIZE + 156],
471 data[HEADER_SIZE + 157],
472 data[HEADER_SIZE + 158],
473 data[HEADER_SIZE + 159],
474 ]);
475 let catalog_checksum = u64::from_le_bytes([
476 data[HEADER_SIZE + 160],
477 data[HEADER_SIZE + 161],
478 data[HEADER_SIZE + 162],
479 data[HEADER_SIZE + 163],
480 data[HEADER_SIZE + 164],
481 data[HEADER_SIZE + 165],
482 data[HEADER_SIZE + 166],
483 data[HEADER_SIZE + 167],
484 ]);
485 let metadata_state_page = u32::from_le_bytes([
486 data[HEADER_SIZE + 168],
487 data[HEADER_SIZE + 169],
488 data[HEADER_SIZE + 170],
489 data[HEADER_SIZE + 171],
490 ]);
491 let metadata_state_checksum = u64::from_le_bytes([
492 data[HEADER_SIZE + 172],
493 data[HEADER_SIZE + 173],
494 data[HEADER_SIZE + 174],
495 data[HEADER_SIZE + 175],
496 data[HEADER_SIZE + 176],
497 data[HEADER_SIZE + 177],
498 data[HEADER_SIZE + 178],
499 data[HEADER_SIZE + 179],
500 ]);
501 let vector_artifact_page = u32::from_le_bytes([
502 data[HEADER_SIZE + 180],
503 data[HEADER_SIZE + 181],
504 data[HEADER_SIZE + 182],
505 data[HEADER_SIZE + 183],
506 ]);
507 let vector_artifact_checksum = u64::from_le_bytes([
508 data[HEADER_SIZE + 184],
509 data[HEADER_SIZE + 185],
510 data[HEADER_SIZE + 186],
511 data[HEADER_SIZE + 187],
512 data[HEADER_SIZE + 188],
513 data[HEADER_SIZE + 189],
514 data[HEADER_SIZE + 190],
515 data[HEADER_SIZE + 191],
516 ]);
517
518 let checkpoint_in_progress = data[HEADER_SIZE + 192] != 0;
520 let checkpoint_target_lsn = u64::from_le_bytes([
521 data[HEADER_SIZE + 193],
522 data[HEADER_SIZE + 194],
523 data[HEADER_SIZE + 195],
524 data[HEADER_SIZE + 196],
525 data[HEADER_SIZE + 197],
526 data[HEADER_SIZE + 198],
527 data[HEADER_SIZE + 199],
528 data[HEADER_SIZE + 200],
529 ]);
530
531 {
533 let mut header = self.header_write()?;
534 header.version = version;
535 header.page_size = page_size;
536 header.page_count = page_count;
537 header.freelist_head = freelist_head;
538 header.schema_version = schema_version;
539 header.checkpoint_lsn = checkpoint_lsn;
540 header.checkpoint_in_progress = checkpoint_in_progress;
541 header.checkpoint_target_lsn = checkpoint_target_lsn;
542 header.physical = PhysicalFileHeader {
543 format_version: physical_format_version,
544 sequence: physical_sequence,
545 manifest_oldest_root,
546 manifest_root,
547 free_set_root,
548 manifest_page,
549 manifest_checksum,
550 collection_roots_page,
551 collection_roots_checksum,
552 collection_root_count,
553 snapshot_count,
554 index_count,
555 catalog_collection_count,
556 catalog_total_entities,
557 export_count,
558 graph_projection_count,
559 analytics_job_count,
560 manifest_event_count,
561 registry_page,
562 registry_checksum,
563 recovery_page,
564 recovery_checksum,
565 catalog_page,
566 catalog_checksum,
567 metadata_state_page,
568 metadata_state_checksum,
569 vector_artifact_page,
570 vector_artifact_checksum,
571 };
572 }
573
574 {
576 let mut freelist = self.freelist_write()?;
577 *freelist = FreeList::from_header(freelist_head, 0);
578 }
579
580 Ok(())
581 }
582
583 fn write_header(&self) -> Result<(), PagerError> {
588 if self.config.read_only {
589 return Err(PagerError::ReadOnly);
590 }
591
592 let header = self.header_read()?;
593
594 let mut page = if let Some(cached) = self.cache.get(0) {
597 cached
598 } else {
599 let file = self.file_lock()?;
601 let len = file.metadata().map(|m| m.len()).unwrap_or(0);
602 drop(file);
603
604 if len >= PAGE_SIZE as u64 {
605 self.read_page_raw(0)?
606 } else {
607 Page::new(PageType::Header, 0)
609 }
610 };
611
612 let data = page.as_bytes_mut();
613
614 data[HEADER_SIZE..HEADER_SIZE + 4].copy_from_slice(&MAGIC_BYTES);
616
617 data[HEADER_SIZE + 4..HEADER_SIZE + 8].copy_from_slice(&header.version.to_le_bytes());
619 data[HEADER_SIZE + 8..HEADER_SIZE + 12].copy_from_slice(&header.page_size.to_le_bytes());
620 data[HEADER_SIZE + 12..HEADER_SIZE + 16].copy_from_slice(&header.page_count.to_le_bytes());
621 data[HEADER_SIZE + 16..HEADER_SIZE + 20]
622 .copy_from_slice(&header.freelist_head.to_le_bytes());
623 data[HEADER_SIZE + 20..HEADER_SIZE + 24]
624 .copy_from_slice(&header.schema_version.to_le_bytes());
625 data[HEADER_SIZE + 24..HEADER_SIZE + 32]
626 .copy_from_slice(&header.checkpoint_lsn.to_le_bytes());
627 data[HEADER_SIZE + 32..HEADER_SIZE + 36]
628 .copy_from_slice(&header.physical.format_version.to_le_bytes());
629 data[HEADER_SIZE + 36..HEADER_SIZE + 44]
630 .copy_from_slice(&header.physical.sequence.to_le_bytes());
631 data[HEADER_SIZE + 44..HEADER_SIZE + 52]
632 .copy_from_slice(&header.physical.manifest_root.to_le_bytes());
633 data[HEADER_SIZE + 52..HEADER_SIZE + 60]
634 .copy_from_slice(&header.physical.manifest_oldest_root.to_le_bytes());
635 data[HEADER_SIZE + 60..HEADER_SIZE + 68]
636 .copy_from_slice(&header.physical.free_set_root.to_le_bytes());
637 data[HEADER_SIZE + 68..HEADER_SIZE + 72]
638 .copy_from_slice(&header.physical.manifest_page.to_le_bytes());
639 data[HEADER_SIZE + 72..HEADER_SIZE + 80]
640 .copy_from_slice(&header.physical.manifest_checksum.to_le_bytes());
641 data[HEADER_SIZE + 80..HEADER_SIZE + 84]
642 .copy_from_slice(&header.physical.collection_roots_page.to_le_bytes());
643 data[HEADER_SIZE + 84..HEADER_SIZE + 92]
644 .copy_from_slice(&header.physical.collection_roots_checksum.to_le_bytes());
645 data[HEADER_SIZE + 92..HEADER_SIZE + 96]
646 .copy_from_slice(&header.physical.collection_root_count.to_le_bytes());
647 data[HEADER_SIZE + 96..HEADER_SIZE + 100]
648 .copy_from_slice(&header.physical.snapshot_count.to_le_bytes());
649 data[HEADER_SIZE + 100..HEADER_SIZE + 104]
650 .copy_from_slice(&header.physical.index_count.to_le_bytes());
651 data[HEADER_SIZE + 104..HEADER_SIZE + 108]
652 .copy_from_slice(&header.physical.catalog_collection_count.to_le_bytes());
653 data[HEADER_SIZE + 108..HEADER_SIZE + 116]
654 .copy_from_slice(&header.physical.catalog_total_entities.to_le_bytes());
655 data[HEADER_SIZE + 116..HEADER_SIZE + 120]
656 .copy_from_slice(&header.physical.export_count.to_le_bytes());
657 data[HEADER_SIZE + 120..HEADER_SIZE + 124]
658 .copy_from_slice(&header.physical.graph_projection_count.to_le_bytes());
659 data[HEADER_SIZE + 124..HEADER_SIZE + 128]
660 .copy_from_slice(&header.physical.analytics_job_count.to_le_bytes());
661 data[HEADER_SIZE + 128..HEADER_SIZE + 132]
662 .copy_from_slice(&header.physical.manifest_event_count.to_le_bytes());
663 data[HEADER_SIZE + 132..HEADER_SIZE + 136]
664 .copy_from_slice(&header.physical.registry_page.to_le_bytes());
665 data[HEADER_SIZE + 136..HEADER_SIZE + 144]
666 .copy_from_slice(&header.physical.registry_checksum.to_le_bytes());
667 data[HEADER_SIZE + 144..HEADER_SIZE + 148]
668 .copy_from_slice(&header.physical.recovery_page.to_le_bytes());
669 data[HEADER_SIZE + 148..HEADER_SIZE + 156]
670 .copy_from_slice(&header.physical.recovery_checksum.to_le_bytes());
671 data[HEADER_SIZE + 156..HEADER_SIZE + 160]
672 .copy_from_slice(&header.physical.catalog_page.to_le_bytes());
673 data[HEADER_SIZE + 160..HEADER_SIZE + 168]
674 .copy_from_slice(&header.physical.catalog_checksum.to_le_bytes());
675 data[HEADER_SIZE + 168..HEADER_SIZE + 172]
676 .copy_from_slice(&header.physical.metadata_state_page.to_le_bytes());
677 data[HEADER_SIZE + 172..HEADER_SIZE + 180]
678 .copy_from_slice(&header.physical.metadata_state_checksum.to_le_bytes());
679 data[HEADER_SIZE + 180..HEADER_SIZE + 184]
680 .copy_from_slice(&header.physical.vector_artifact_page.to_le_bytes());
681 data[HEADER_SIZE + 184..HEADER_SIZE + 192]
682 .copy_from_slice(&header.physical.vector_artifact_checksum.to_le_bytes());
683
684 data[HEADER_SIZE + 192] = if header.checkpoint_in_progress { 1 } else { 0 };
686 data[HEADER_SIZE + 193..HEADER_SIZE + 201]
687 .copy_from_slice(&header.checkpoint_target_lsn.to_le_bytes());
688
689 page.update_checksum();
690
691 self.write_header_shadow(&page)?;
693
694 self.write_page_raw(0, &page)?;
695 *self.header_dirty_lock()? = false;
696
697 Ok(())
698 }
699
700 fn read_page_raw(&self, page_id: u32) -> Result<Page, PagerError> {
702 let mut file = self.file_lock()?;
703 let offset = (page_id as u64) * (PAGE_SIZE as u64);
704
705 file.seek(SeekFrom::Start(offset))?;
706
707 let mut buf = [0u8; PAGE_SIZE];
708 file.read_exact(&mut buf)?;
709
710 let page = Page::from_bytes(buf);
711
712 if self.config.verify_checksums {
714 page.verify_checksum()?;
715 }
716
717 Ok(page)
718 }
719
720 fn write_page_raw(&self, page_id: u32, page: &Page) -> Result<(), PagerError> {
722 if self.config.read_only {
723 return Err(PagerError::ReadOnly);
724 }
725
726 let mut file = self.file_lock()?;
727 let offset = (page_id as u64) * (PAGE_SIZE as u64);
728
729 file.seek(SeekFrom::Start(offset))?;
730 file.write_all(page.as_bytes())?;
731
732 Ok(())
733 }
734
735 pub fn read_page(&self, page_id: u32) -> Result<Page, PagerError> {
737 if let Some(page) = self.cache.get(page_id) {
739 return Ok(page);
740 }
741
742 let page = self.read_page_raw(page_id)?;
744
745 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
747 let evicted_id = dirty_page.page_id();
749 self.write_page_raw(evicted_id, &dirty_page)?;
750 }
751
752 Ok(page)
753 }
754
755 pub fn read_page_no_checksum(&self, page_id: u32) -> Result<Page, PagerError> {
760 if let Some(page) = self.cache.get(page_id) {
762 return Ok(page);
763 }
764
765 let mut file = self.file_lock()?;
767 let offset = (page_id as u64) * (PAGE_SIZE as u64);
768
769 file.seek(SeekFrom::Start(offset))?;
770
771 let mut buf = [0u8; PAGE_SIZE];
772 file.read_exact(&mut buf)?;
773 drop(file);
774
775 let page = Page::from_bytes(buf);
776
777 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
779 let evicted_id = dirty_page.page_id();
781 self.write_page_raw(evicted_id, &dirty_page)?;
782 }
783
784 Ok(page)
785 }
786
787 pub fn write_page(&self, page_id: u32, mut page: Page) -> Result<(), PagerError> {
789 if self.config.read_only {
790 return Err(PagerError::ReadOnly);
791 }
792
793 page.update_checksum();
795
796 if let Some(dirty_page) = self.cache.insert(page_id, page) {
801 let evicted_id = dirty_page.page_id();
802 self.write_page_raw(evicted_id, &dirty_page)?;
803 }
804 self.cache.mark_dirty(page_id);
805
806 Ok(())
807 }
808
809 pub fn read_page_decrypted(&self, page_id: u32) -> Result<Page, PagerError> {
814 if page_id == 0 || self.encryption.is_none() {
815 return self.read_page(page_id);
816 }
817 let raw = self.read_page_no_checksum(page_id)?;
818 let (enc, _) = self
819 .encryption
820 .as_ref()
821 .expect("encryption presence checked above");
822 let plaintext = enc
823 .decrypt(page_id, raw.as_bytes())
824 .map_err(|e| PagerError::InvalidDatabase(format!("decrypt page {page_id}: {e}")))?;
825 let mut buf = [0u8; PAGE_SIZE];
826 let n = plaintext.len().min(PAGE_SIZE);
827 buf[..n].copy_from_slice(&plaintext[..n]);
828 Ok(Page::from_bytes(buf))
829 }
830
831 pub fn write_page_encrypted(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
836 if page_id == 0 || self.encryption.is_none() {
837 return self.write_page(page_id, page);
838 }
839 const OVERHEAD: usize = 12 + 16; let plaintext_len = PAGE_SIZE - OVERHEAD;
841 let plaintext = &page.as_bytes()[..plaintext_len];
842 let (enc, _) = self
843 .encryption
844 .as_ref()
845 .expect("encryption presence checked above");
846 let ciphertext = enc.encrypt(page_id, plaintext);
847 debug_assert_eq!(ciphertext.len(), PAGE_SIZE);
848 let mut buf = [0u8; PAGE_SIZE];
849 buf.copy_from_slice(&ciphertext);
850 let cipher_page = Page::from_bytes(buf);
851 self.write_page_no_checksum(page_id, cipher_page)
852 }
853
854 pub fn write_page_no_checksum(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
859 if self.config.read_only {
860 return Err(PagerError::ReadOnly);
861 }
862
863 if let Some(dirty_page) = self.cache.insert(page_id, page) {
866 let evicted_id = dirty_page.page_id();
867 self.write_page_raw(evicted_id, &dirty_page)?;
868 }
869 self.cache.mark_dirty(page_id);
870
871 Ok(())
872 }
873
874 pub fn allocate_page(&self, page_type: PageType) -> Result<Page, PagerError> {
876 if self.config.read_only {
877 return Err(PagerError::ReadOnly);
878 }
879
880 let page_id = {
882 let mut freelist = self.freelist_write()?;
883 if let Some(id) = freelist.allocate() {
884 id
885 } else if freelist.trunk_head() != 0 {
886 let trunk_id = freelist.trunk_head();
887 drop(freelist);
888
889 let trunk = self.read_page(trunk_id).map_err(|e| match e {
890 PagerError::PageNotFound(_) => {
891 PagerError::InvalidDatabase("Freelist trunk missing".to_string())
892 }
893 other => other,
894 })?;
895
896 let mut freelist = self.freelist_write()?;
897 freelist
898 .load_from_trunk(&trunk)
899 .map_err(|e| PagerError::InvalidDatabase(format!("Freelist: {}", e)))?;
900 let id = freelist.allocate().ok_or_else(|| {
901 PagerError::InvalidDatabase("Freelist empty after trunk load".to_string())
902 })?;
903
904 let mut header = self.header_write()?;
905 header.freelist_head = freelist.trunk_head();
906 *self.header_dirty_lock()? = true;
907
908 id
909 } else {
910 let mut header = self.header_write()?;
912 let id = header.page_count;
913 header.page_count += 1;
914 *self.header_dirty_lock()? = true;
915 id
916 }
917 };
918
919 let page = Page::new(page_type, page_id);
920
921 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
933 let evicted_id = dirty_page.page_id();
934 self.write_page_raw(evicted_id, &dirty_page)?;
935 }
936 self.cache.mark_dirty(page_id);
937
938 Ok(page)
939 }
940
941 pub fn reserve_contig_extent(&self, n_pages: u32) -> Result<super::ExtentId, PagerError> {
943 if self.config.read_only {
944 return Err(PagerError::ReadOnly);
945 }
946 if n_pages == 0 {
947 return Err(PagerError::InvalidDatabase(
948 "contiguous extent must reserve at least one page".to_string(),
949 ));
950 }
951
952 let start_page = {
953 let mut header = self.header_write()?;
954 let start = header.page_count;
955 header.page_count = header.page_count.checked_add(n_pages).ok_or_else(|| {
956 PagerError::InvalidDatabase("contiguous extent page count overflow".to_string())
957 })?;
958 *self.header_dirty_lock()? = true;
959 start
960 };
961
962 for page_id in start_page..start_page + n_pages {
963 let mut page = Page::new(PageType::Vector, page_id);
964 page.update_checksum();
965 if let Some(dirty_page) = self.cache.insert(page_id, page) {
966 let evicted_id = dirty_page.page_id();
967 self.write_page_raw(evicted_id, &dirty_page)?;
968 }
969 self.cache.mark_dirty(page_id);
970 }
971
972 Ok(super::ExtentId {
973 start_page,
974 n_pages,
975 })
976 }
977
978 pub fn free_page(&self, page_id: u32) -> Result<(), PagerError> {
980 if self.config.read_only {
981 return Err(PagerError::ReadOnly);
982 }
983
984 self.cache.remove(page_id);
986
987 let mut freelist = self.freelist_write()?;
989 freelist.free(page_id);
990
991 *self.header_dirty_lock()? = true;
992
993 Ok(())
994 }
995
996 pub fn header(&self) -> Result<DatabaseHeader, PagerError> {
998 Ok(self.header_read()?.clone())
999 }
1000
1001 pub fn physical_header(&self) -> Result<PhysicalFileHeader, PagerError> {
1002 Ok(self.header_read()?.physical)
1003 }
1004
1005 pub fn update_physical_header(&self, physical: PhysicalFileHeader) -> Result<(), PagerError> {
1006 if self.config.read_only {
1007 return Err(PagerError::ReadOnly);
1008 }
1009
1010 let mut header = self.header_write()?;
1011 header.physical = physical;
1012 *self.header_dirty_lock()? = true;
1013 Ok(())
1014 }
1015
1016 pub fn page_count(&self) -> Result<u32, PagerError> {
1018 Ok(self.header_read()?.page_count)
1019 }
1020
1021 pub fn set_wal_writer(&self, wal: Arc<Mutex<crate::storage::wal::writer::WalWriter>>) {
1033 let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
1034 *slot = Some(wal);
1035 }
1036
1037 pub fn clear_wal_writer(&self) {
1039 let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
1040 *slot = None;
1041 }
1042
1043 pub fn has_wal_writer(&self) -> bool {
1045 self.wal.read().map(|s| s.is_some()).unwrap_or(false)
1046 }
1047
1048 pub fn flush(&self) -> Result<(), PagerError> {
1050 if self.config.read_only {
1051 return Ok(());
1052 }
1053
1054 let trunks = {
1056 let mut freelist = self.freelist_write()?;
1057 if freelist.is_dirty() {
1058 let mut header = self.header_write()?;
1059 let trunks = freelist.flush_to_trunks(0, || {
1060 let id = header.page_count;
1061 header.page_count += 1;
1062 id
1063 });
1064 header.freelist_head = freelist.trunk_head();
1065 *self.header_dirty_lock()? = true;
1066 freelist.mark_clean();
1067 trunks
1068 } else {
1069 Vec::new()
1070 }
1071 };
1072
1073 for trunk in trunks {
1074 let page_id = trunk.page_id();
1075 self.cache.insert(page_id, trunk);
1076 self.cache.mark_dirty(page_id);
1077 }
1078
1079 let dirty_pages = self.cache.flush_dirty();
1081 if !dirty_pages.is_empty() {
1082 let max_lsn = dirty_pages
1087 .iter()
1088 .filter_map(|(_, page)| page.header().ok().map(|h| h.lsn))
1089 .max()
1090 .unwrap_or(0);
1091 if max_lsn > 0 {
1092 if let Ok(slot) = self.wal.read() {
1093 if let Some(wal) = slot.as_ref() {
1094 let wal = Arc::clone(wal);
1095 drop(slot);
1099 let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
1100 wal_guard.flush_until(max_lsn).map_err(PagerError::Io)?;
1101 }
1102 }
1103 }
1104 self.write_pages_through_dwb(&dirty_pages)?;
1105 }
1106
1107 if *self.header_dirty_lock()? {
1109 self.write_header()?;
1110 }
1111
1112 Ok(())
1113 }
1114
1115 pub fn sync(&self) -> Result<(), PagerError> {
1117 self.flush()?;
1118
1119 let file = self.file_lock()?;
1120 file.sync_all()?;
1121
1122 Ok(())
1123 }
1124
1125 pub fn cache_stats(&self) -> crate::storage::engine::page_cache::CacheStats {
1127 self.cache.stats()
1128 }
1129
1130 pub fn dirty_page_count(&self) -> usize {
1132 self.cache.dirty_count()
1133 }
1134
1135 pub fn dirty_fraction(&self) -> f64 {
1139 let capacity = self.cache.capacity().max(1) as f64;
1140 self.cache.dirty_count() as f64 / capacity
1141 }
1142
1143 pub fn flush_some_dirty(&self, max: usize) -> Result<usize, PagerError> {
1147 if self.config.read_only || max == 0 {
1148 return Ok(0);
1149 }
1150 let dirty_pages = self.cache.flush_some_dirty(max);
1151 if dirty_pages.is_empty() {
1152 return Ok(0);
1153 }
1154 let count = dirty_pages.len();
1155 for (page_id, page) in dirty_pages {
1162 self.write_page(page_id, page)?;
1163 }
1164 Ok(count)
1165 }
1166
1167 pub fn path(&self) -> &Path {
1169 &self.path
1170 }
1171
1172 pub fn is_read_only(&self) -> bool {
1174 self.config.read_only
1175 }
1176
1177 pub fn file_size(&self) -> Result<u64, PagerError> {
1179 let file = self.file_lock()?;
1180 Ok(file.metadata()?.len())
1181 }
1182
1183 pub fn prefetch_hint(&self, page_id: u32) {
1191 if let Ok(file) = self.file_lock() {
1192 let _ = crate::storage::btree::prefetch::prefetch_page(
1193 &file,
1194 page_id as u64,
1195 PAGE_SIZE as u32,
1196 );
1197 }
1198 }
1199
1200 fn shadow_path(db_path: &Path) -> PathBuf {
1204 let mut p = db_path.to_path_buf().into_os_string();
1205 p.push("-hdr");
1206 PathBuf::from(p)
1207 }
1208
1209 fn meta_shadow_path(db_path: &Path) -> PathBuf {
1211 let mut p = db_path.to_path_buf().into_os_string();
1212 p.push("-meta");
1213 PathBuf::from(p)
1214 }
1215
1216 fn dwb_path(db_path: &Path) -> PathBuf {
1218 let mut p = db_path.to_path_buf().into_os_string();
1219 p.push("-dwb");
1220 PathBuf::from(p)
1221 }
1222
1223 fn open_dwb_file(db_path: &Path) -> Result<File, PagerError> {
1228 Ok(OpenOptions::new()
1229 .read(true)
1230 .write(true)
1231 .create(true)
1232 .truncate(false)
1233 .open(Self::dwb_path(db_path))?)
1234 }
1235
1236 fn clear_dwb_file(file: &mut File) -> Result<(), PagerError> {
1238 file.set_len(0)?;
1239 file.seek(SeekFrom::Start(0))?;
1240 file.sync_all()?;
1241 Ok(())
1242 }
1243
1244 fn write_header_shadow(&self, page: &Page) -> Result<(), PagerError> {
1246 if self.config.read_only {
1247 return Ok(());
1248 }
1249 let shadow = Self::shadow_path(&self.path);
1250 let mut f = File::create(&shadow)?;
1251 f.write_all(page.as_bytes())?;
1252 f.sync_all()?;
1253 Ok(())
1254 }
1255
1256 fn recover_header_from_shadow(&self) -> Result<Page, PagerError> {
1258 let shadow = Self::shadow_path(&self.path);
1259 if !shadow.exists() {
1260 return Err(PagerError::InvalidDatabase(
1261 "Page 0 corrupted and no header shadow found".into(),
1262 ));
1263 }
1264 let mut f = File::open(&shadow)?;
1265 let mut buf = [0u8; PAGE_SIZE];
1266 f.read_exact(&mut buf)?;
1267 let page = Page::from_bytes(buf);
1268
1269 let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
1271 if magic != MAGIC_BYTES {
1272 return Err(PagerError::InvalidDatabase(
1273 "Header shadow also corrupted".into(),
1274 ));
1275 }
1276
1277 if !self.config.read_only {
1279 self.write_page_raw(0, &page)?;
1280 let file = self.file_lock()?;
1281 file.sync_all()?;
1282 }
1283
1284 Ok(page)
1285 }
1286
1287 pub fn write_meta_shadow(&self, page: &Page) -> Result<(), PagerError> {
1297 if self.config.read_only {
1298 return Ok(());
1299 }
1300 let shadow = Self::meta_shadow_path(&self.path);
1301 if crate::physical::fold_pager_meta_enabled() {
1302 let _ = std::fs::remove_file(&shadow);
1305 return Ok(());
1306 }
1307 let mut f = File::create(&shadow)?;
1308 f.write_all(page.as_bytes())?;
1309 f.sync_all()?;
1310 Ok(())
1311 }
1312
1313 pub fn recover_meta_from_shadow(&self) -> Result<Page, PagerError> {
1315 let shadow = Self::meta_shadow_path(&self.path);
1316 if !shadow.exists() {
1317 return Err(PagerError::InvalidDatabase(
1318 "Page 1 corrupted and no metadata shadow found".into(),
1319 ));
1320 }
1321 let mut f = File::open(&shadow)?;
1322 let mut buf = [0u8; PAGE_SIZE];
1323 f.read_exact(&mut buf)?;
1324 let page = Page::from_bytes(buf);
1325
1326 if !self.config.read_only {
1328 self.write_page_raw(1, &page)?;
1329 let file = self.file_lock()?;
1330 file.sync_all()?;
1331 }
1332
1333 Ok(page)
1334 }
1335
1336 fn write_pages_through_dwb(&self, pages: &[(u32, Page)]) -> Result<(), PagerError> {
1343 if let Some(dwb_mutex) = &self.dwb_file {
1344 let mut dwb = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1345
1346 let entry_size = 4 + PAGE_SIZE; let header_len = 4 + 4 + 4; let total = header_len + pages.len() * entry_size;
1351 let mut buf = Vec::with_capacity(total);
1352
1353 buf.extend_from_slice(&DWB_MAGIC);
1355 buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
1356 buf.extend_from_slice(&[0u8; 4]); for (page_id, page) in pages {
1360 buf.extend_from_slice(&page_id.to_le_bytes());
1361 buf.extend_from_slice(page.as_bytes());
1362 }
1363
1364 let checksum = super::super::crc32::crc32(&buf[header_len..]);
1366 buf[8..12].copy_from_slice(&checksum.to_le_bytes());
1367
1368 dwb.seek(SeekFrom::Start(0))?;
1370 dwb.write_all(&buf)?;
1371 dwb.set_len(buf.len() as u64)?;
1372 dwb.sync_all()?;
1373
1374 for (page_id, page) in pages {
1376 self.write_page_raw(*page_id, page)?;
1377 }
1378
1379 Self::clear_dwb_file(&mut dwb)?;
1381
1382 Ok(())
1383 } else {
1384 for (page_id, page) in pages {
1386 self.write_page_raw(*page_id, page)?;
1387 }
1388 Ok(())
1389 }
1390 }
1391
1392 fn recover_from_dwb(&self) -> Result<(), PagerError> {
1397 let dwb_path = Self::dwb_path(&self.path);
1398 if !dwb_path.exists() {
1399 return Ok(());
1400 }
1401
1402 if let Some(dwb_mutex) = &self.dwb_file {
1403 let mut file = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1404 return self.recover_from_dwb_file(&mut file);
1405 }
1406
1407 let mut file = OpenOptions::new().read(true).write(true).open(&dwb_path)?;
1408 self.recover_from_dwb_file(&mut file)
1409 }
1410
1411 fn recover_from_dwb_file(&self, file: &mut File) -> Result<(), PagerError> {
1412 file.seek(SeekFrom::Start(0))?;
1413 let len = file.metadata()?.len();
1414 if len < 12 {
1415 return Self::clear_dwb_file(file);
1417 }
1418
1419 let mut buf = vec![0u8; len as usize];
1420 file.read_exact(&mut buf)?;
1421
1422 if buf[0..4] != DWB_MAGIC {
1424 return Self::clear_dwb_file(file);
1426 }
1427
1428 let count = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]) as usize;
1429 let stored_checksum = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);
1430
1431 let header_len = 12;
1432 let entry_size = 4 + PAGE_SIZE;
1433 let expected_len = header_len + count * entry_size;
1434
1435 if buf.len() < expected_len {
1436 return Self::clear_dwb_file(file);
1438 }
1439
1440 let computed = super::super::crc32::crc32(&buf[header_len..expected_len]);
1442 if computed != stored_checksum {
1443 return Self::clear_dwb_file(file);
1445 }
1446
1447 let mut offset = header_len;
1449 for _ in 0..count {
1450 let page_id = u32::from_le_bytes([
1451 buf[offset],
1452 buf[offset + 1],
1453 buf[offset + 2],
1454 buf[offset + 3],
1455 ]);
1456 offset += 4;
1457
1458 let mut page_data = [0u8; PAGE_SIZE];
1459 page_data.copy_from_slice(&buf[offset..offset + PAGE_SIZE]);
1460 offset += PAGE_SIZE;
1461
1462 let page = Page::from_bytes(page_data);
1463 self.write_page_raw(page_id, &page)?;
1464 }
1465
1466 {
1468 let file = self.file_lock()?;
1469 file.sync_all()?;
1470 }
1471
1472 Self::clear_dwb_file(file)
1473 }
1474
1475 pub fn write_header_and_sync(&self) -> Result<(), PagerError> {
1477 self.write_header()?;
1478 let file = self.file_lock()?;
1479 file.sync_all()?;
1480 Ok(())
1481 }
1482
1483 pub fn set_checkpoint_in_progress(
1485 &self,
1486 in_progress: bool,
1487 target_lsn: u64,
1488 ) -> Result<(), PagerError> {
1489 let mut header = self.header_write()?;
1490 header.checkpoint_in_progress = in_progress;
1491 header.checkpoint_target_lsn = target_lsn;
1492 *self.header_dirty_lock()? = true;
1493 drop(header);
1494 self.write_header_and_sync()
1495 }
1496
1497 pub fn complete_checkpoint(&self, lsn: u64) -> Result<(), PagerError> {
1499 let mut header = self.header_write()?;
1500 header.checkpoint_lsn = lsn;
1501 header.checkpoint_in_progress = false;
1502 header.checkpoint_target_lsn = 0;
1503 *self.header_dirty_lock()? = true;
1504 drop(header);
1505 self.write_header_and_sync()
1506 }
1507}
1508
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a database path in the system temp directory. The name embeds
    /// the test label, the process id, and a nanosecond timestamp so that
    /// runs do not collide.
    fn temp_db_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "reddb-pager-{}-{}-{}.rdb",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ))
    }

    /// A database whose header declares `DB_VERSION + 1` must be rejected
    /// with an `InvalidDatabase` error mentioning "newer than supported".
    #[test]
    fn open_refuses_future_database_version() {
        let path = temp_db_path("future-version");
        let pager = Pager::open_default(&path).unwrap();
        drop(pager);

        // Forge a header page whose version field is one past the supported
        // version, with a valid checksum so only the version check can fail.
        let mut future_header = Page::new_header_page(1);
        future_header.as_bytes_mut()[HEADER_SIZE + 4..HEADER_SIZE + 8]
            .copy_from_slice(&(DB_VERSION + 1).to_le_bytes());
        future_header.update_checksum();

        // Overwrite page 0 of the database file with the forged header.
        let mut file = OpenOptions::new().write(true).open(&path).unwrap();
        file.seek(SeekFrom::Start(0)).unwrap();
        file.write_all(future_header.as_bytes()).unwrap();
        file.sync_all().unwrap();
        drop(file);

        let err = match Pager::open_default(&path) {
            Ok(_) => panic!("future database version should be rejected"),
            Err(err) => err,
        };
        match err {
            PagerError::InvalidDatabase(msg) => {
                assert!(msg.contains("newer than supported"));
            }
            other => panic!("expected InvalidDatabase, got {other:?}"),
        }

        // Best-effort cleanup of the database and every sidecar file the
        // pager may have created next to it.
        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(Pager::shadow_path(&path));
        let _ = std::fs::remove_file(Pager::meta_shadow_path(&path));
        let _ = std::fs::remove_file(Pager::dwb_path(&path));
    }
}