1use super::*;
2
/// Four-byte magic tag: the ASCII bytes `RDDW` (`0x52 0x44 0x44 0x57`).
///
/// NOTE(review): presumably identifies the double-write buffer file; no use
/// of this constant is visible in this part of the file — confirm.
const DWB_MAGIC: [u8; 4] = *b"RDDW";
5
6impl Pager {
7 pub fn open<P: AsRef<Path>>(path: P, config: PagerConfig) -> Result<Self, PagerError> {
9 let path = path.as_ref().to_path_buf();
10 let exists = path.exists();
11
12 if !exists && !config.create {
13 return Err(PagerError::InvalidDatabase(
14 "Database does not exist".into(),
15 ));
16 }
17
18 if !exists && config.read_only {
19 return Err(PagerError::InvalidDatabase(
20 "Cannot create read-only database".into(),
21 ));
22 }
23
24 let file = OpenOptions::new()
27 .read(true)
28 .write(!config.read_only)
29 .create(config.create && !config.read_only)
30 .open(&path)?;
31
32 #[cfg(unix)]
38 {
39 use std::os::unix::fs::MetadataExt;
40 if let Ok(meta) = file.metadata() {
41 let fs_block_size = meta.blksize();
42 if Self::page_size_misaligned_with_block(PAGE_SIZE, fs_block_size) {
43 tracing::warn!(
44 page_size = PAGE_SIZE,
45 fs_block_size,
46 path = %path.display(),
47 "database page size is not a multiple of the filesystem \
48 block size; page writes will straddle FS blocks \
49 (read-modify-write amplification). Diagnostic only — \
50 the page size is unchanged."
51 );
52 }
53 }
54 }
55
56 let lock_file = if !config.read_only {
58 let lf = OpenOptions::new().read(true).write(true).open(&path)?;
59 lf.try_lock_exclusive().map_err(|_| PagerError::Locked)?;
60 Some(lf)
61 } else {
62 let lf = OpenOptions::new().read(true).open(&path)?;
63 match lf.try_lock_shared() {
64 Ok(_) => Some(lf),
65 Err(_) => None,
66 }
67 };
68
69 let fold_dwb = crate::physical::fold_dwb_into_wal_enabled();
76 let dwb_file = if config.double_write && !config.read_only && !fold_dwb {
77 let f = Self::open_dwb_file(&path)?;
78 Some(Mutex::new(f))
79 } else {
80 if fold_dwb && !config.read_only {
81 let _ = std::fs::remove_file(Self::dwb_path(&path));
82 }
83 None
84 };
85
86 let mut pager = Self {
87 path,
88 file: Mutex::new(file),
89 _lock_file: lock_file,
90 dwb_file,
91 cache: PageCache::new(config.cache_size),
92 freelist: RwLock::new(FreeList::new()),
93 header: RwLock::new(DatabaseHeader::default()),
94 config,
95 header_dirty: Mutex::new(false),
96 wal: RwLock::new(None),
97 encryption: None,
98 };
99
100 if exists {
101 pager.recover_from_dwb()?;
103 pager.load_header()?;
105 pager.bind_encryption_for_existing()?;
106 } else {
107 pager.initialize()?;
109 pager.bind_encryption_for_new()?;
110 }
111
112 Ok(pager)
113 }
114
/// Reports whether `page_size` is NOT a whole multiple of `fs_block_size`.
///
/// A block size of `0` (unknown) is never reported as misaligned.
pub(crate) fn page_size_misaligned_with_block(page_size: usize, fs_block_size: u64) -> bool {
    match fs_block_size {
        0 => false,
        block => !(page_size as u64).is_multiple_of(block),
    }
}
124
125 fn bind_encryption_for_existing(&mut self) -> Result<(), PagerError> {
135 const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
136 const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";
137
138 if self.page_count().unwrap_or(0) == 0 {
139 return self.bind_encryption_for_new();
140 }
141 let header_page = self.read_page_no_checksum(0)?;
142 let data = header_page.as_bytes();
143 let has_marker = data.len() > ENCRYPTION_MARKER_OFFSET + 4
144 && &data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4] == ENCRYPTION_MARKER;
145
146 let key = self.config.encryption.clone();
147 match (has_marker, key) {
148 (true, Some(key)) => {
149 let header_start = ENCRYPTION_MARKER_OFFSET + 4;
150 let header =
151 crate::storage::encryption::EncryptionHeader::from_bytes(&data[header_start..])
152 .map_err(|e| {
153 PagerError::InvalidDatabase(format!(
154 "encryption header parse failed: {e}"
155 ))
156 })?;
157 if !header.validate(&key) {
158 return Err(PagerError::InvalidKey);
159 }
160 let encryptor = crate::storage::encryption::PageEncryptor::new(key);
161 self.encryption = Some((encryptor, header));
162 Ok(())
163 }
164 (true, None) => Err(PagerError::EncryptionRequired),
165 (false, Some(_)) => Err(PagerError::PlainDatabaseRefusesKey),
166 (false, None) => Ok(()),
167 }
168 }
169
170 fn bind_encryption_for_new(&mut self) -> Result<(), PagerError> {
173 const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
174 const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";
175
176 let Some(key) = self.config.encryption.clone() else {
177 return Ok(());
178 };
179 let header = crate::storage::encryption::EncryptionHeader::new(&key);
180 let encryptor = crate::storage::encryption::PageEncryptor::new(key);
181
182 if self.page_count().unwrap_or(0) > 0 {
185 let mut page = self.read_page_no_checksum(0)?;
186 let data = page.as_bytes_mut();
187 data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4]
188 .copy_from_slice(ENCRYPTION_MARKER);
189 let header_bytes = header.to_bytes();
190 let header_start = ENCRYPTION_MARKER_OFFSET + 4;
191 data[header_start..header_start + header_bytes.len()].copy_from_slice(&header_bytes);
192 self.write_page_no_checksum(0, page)?;
193 }
194 self.encryption = Some((encryptor, header));
195 Ok(())
196 }
197
198 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, PagerError> {
200 Self::open(path, PagerConfig::default())
201 }
202
203 fn initialize(&self) -> Result<(), PagerError> {
205 if self.config.read_only {
206 return Err(PagerError::ReadOnly);
207 }
208
209 let initial_page_count = 3;
213 let header_page = Page::new_header_page(initial_page_count);
214 self.header_write()?.page_count = initial_page_count;
215
216 self.write_page_raw(0, &header_page)?;
219 let mut metadata_page = Page::new(PageType::Header, 1);
220 metadata_page.update_checksum();
221 self.write_page_raw(1, &metadata_page)?;
222 let mut vault_page = Page::new(PageType::Vault, 2);
223 vault_page.update_checksum();
224 self.write_page_raw(2, &vault_page)?;
225
226 self.sync()?;
228
229 Ok(())
230 }
231
232 fn header_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, DatabaseHeader>, PagerError> {
234 self.header.write().map_err(|_| PagerError::LockPoisoned)
235 }
236
237 fn header_read(&self) -> Result<std::sync::RwLockReadGuard<'_, DatabaseHeader>, PagerError> {
239 self.header.read().map_err(|_| PagerError::LockPoisoned)
240 }
241
242 fn freelist_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, FreeList>, PagerError> {
244 self.freelist.write().map_err(|_| PagerError::LockPoisoned)
245 }
246
247 fn file_lock(&self) -> Result<std::sync::MutexGuard<'_, File>, PagerError> {
249 self.file.lock().map_err(|_| PagerError::LockPoisoned)
250 }
251
252 fn header_dirty_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>, PagerError> {
254 self.header_dirty
255 .lock()
256 .map_err(|_| PagerError::LockPoisoned)
257 }
258
259 fn load_header(&self) -> Result<(), PagerError> {
261 let header_page = match self.read_page_raw(0) {
263 Ok(page) => {
264 let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
266 if magic == MAGIC_BYTES {
267 page
268 } else {
269 self.recover_header_from_shadow()?
271 }
272 }
273 Err(_) => self.recover_header_from_shadow()?,
274 };
275
276 let data = header_page.as_bytes();
278 let version = u32::from_le_bytes([
279 data[HEADER_SIZE + 4],
280 data[HEADER_SIZE + 5],
281 data[HEADER_SIZE + 6],
282 data[HEADER_SIZE + 7],
283 ]);
284
285 let page_size = u32::from_le_bytes([
286 data[HEADER_SIZE + 8],
287 data[HEADER_SIZE + 9],
288 data[HEADER_SIZE + 10],
289 data[HEADER_SIZE + 11],
290 ]);
291
292 if page_size != PAGE_SIZE as u32 {
293 return Err(PagerError::InvalidDatabase(format!(
294 "Unsupported page size: {}",
295 page_size
296 )));
297 }
298 if version > DB_VERSION {
299 return Err(PagerError::InvalidDatabase(format!(
300 "Unsupported database version: file version {version} is newer than supported {DB_VERSION}"
301 )));
302 }
303
304 let page_count = u32::from_le_bytes([
305 data[HEADER_SIZE + 12],
306 data[HEADER_SIZE + 13],
307 data[HEADER_SIZE + 14],
308 data[HEADER_SIZE + 15],
309 ]);
310
311 let freelist_head = u32::from_le_bytes([
312 data[HEADER_SIZE + 16],
313 data[HEADER_SIZE + 17],
314 data[HEADER_SIZE + 18],
315 data[HEADER_SIZE + 19],
316 ]);
317
318 let schema_version = u32::from_le_bytes([
319 data[HEADER_SIZE + 20],
320 data[HEADER_SIZE + 21],
321 data[HEADER_SIZE + 22],
322 data[HEADER_SIZE + 23],
323 ]);
324
325 let checkpoint_lsn = u64::from_le_bytes([
326 data[HEADER_SIZE + 24],
327 data[HEADER_SIZE + 25],
328 data[HEADER_SIZE + 26],
329 data[HEADER_SIZE + 27],
330 data[HEADER_SIZE + 28],
331 data[HEADER_SIZE + 29],
332 data[HEADER_SIZE + 30],
333 data[HEADER_SIZE + 31],
334 ]);
335 let physical_format_version = u32::from_le_bytes([
336 data[HEADER_SIZE + 32],
337 data[HEADER_SIZE + 33],
338 data[HEADER_SIZE + 34],
339 data[HEADER_SIZE + 35],
340 ]);
341 let physical_sequence = u64::from_le_bytes([
342 data[HEADER_SIZE + 36],
343 data[HEADER_SIZE + 37],
344 data[HEADER_SIZE + 38],
345 data[HEADER_SIZE + 39],
346 data[HEADER_SIZE + 40],
347 data[HEADER_SIZE + 41],
348 data[HEADER_SIZE + 42],
349 data[HEADER_SIZE + 43],
350 ]);
351 let manifest_root = u64::from_le_bytes([
352 data[HEADER_SIZE + 44],
353 data[HEADER_SIZE + 45],
354 data[HEADER_SIZE + 46],
355 data[HEADER_SIZE + 47],
356 data[HEADER_SIZE + 48],
357 data[HEADER_SIZE + 49],
358 data[HEADER_SIZE + 50],
359 data[HEADER_SIZE + 51],
360 ]);
361 let manifest_oldest_root = u64::from_le_bytes([
362 data[HEADER_SIZE + 52],
363 data[HEADER_SIZE + 53],
364 data[HEADER_SIZE + 54],
365 data[HEADER_SIZE + 55],
366 data[HEADER_SIZE + 56],
367 data[HEADER_SIZE + 57],
368 data[HEADER_SIZE + 58],
369 data[HEADER_SIZE + 59],
370 ]);
371 let free_set_root = u64::from_le_bytes([
372 data[HEADER_SIZE + 60],
373 data[HEADER_SIZE + 61],
374 data[HEADER_SIZE + 62],
375 data[HEADER_SIZE + 63],
376 data[HEADER_SIZE + 64],
377 data[HEADER_SIZE + 65],
378 data[HEADER_SIZE + 66],
379 data[HEADER_SIZE + 67],
380 ]);
381 let manifest_page = u32::from_le_bytes([
382 data[HEADER_SIZE + 68],
383 data[HEADER_SIZE + 69],
384 data[HEADER_SIZE + 70],
385 data[HEADER_SIZE + 71],
386 ]);
387 let manifest_checksum = u64::from_le_bytes([
388 data[HEADER_SIZE + 72],
389 data[HEADER_SIZE + 73],
390 data[HEADER_SIZE + 74],
391 data[HEADER_SIZE + 75],
392 data[HEADER_SIZE + 76],
393 data[HEADER_SIZE + 77],
394 data[HEADER_SIZE + 78],
395 data[HEADER_SIZE + 79],
396 ]);
397 let collection_roots_page = u32::from_le_bytes([
398 data[HEADER_SIZE + 80],
399 data[HEADER_SIZE + 81],
400 data[HEADER_SIZE + 82],
401 data[HEADER_SIZE + 83],
402 ]);
403 let collection_roots_checksum = u64::from_le_bytes([
404 data[HEADER_SIZE + 84],
405 data[HEADER_SIZE + 85],
406 data[HEADER_SIZE + 86],
407 data[HEADER_SIZE + 87],
408 data[HEADER_SIZE + 88],
409 data[HEADER_SIZE + 89],
410 data[HEADER_SIZE + 90],
411 data[HEADER_SIZE + 91],
412 ]);
413 let collection_root_count = u32::from_le_bytes([
414 data[HEADER_SIZE + 92],
415 data[HEADER_SIZE + 93],
416 data[HEADER_SIZE + 94],
417 data[HEADER_SIZE + 95],
418 ]);
419 let snapshot_count = u32::from_le_bytes([
420 data[HEADER_SIZE + 96],
421 data[HEADER_SIZE + 97],
422 data[HEADER_SIZE + 98],
423 data[HEADER_SIZE + 99],
424 ]);
425 let index_count = u32::from_le_bytes([
426 data[HEADER_SIZE + 100],
427 data[HEADER_SIZE + 101],
428 data[HEADER_SIZE + 102],
429 data[HEADER_SIZE + 103],
430 ]);
431 let catalog_collection_count = u32::from_le_bytes([
432 data[HEADER_SIZE + 104],
433 data[HEADER_SIZE + 105],
434 data[HEADER_SIZE + 106],
435 data[HEADER_SIZE + 107],
436 ]);
437 let catalog_total_entities = u64::from_le_bytes([
438 data[HEADER_SIZE + 108],
439 data[HEADER_SIZE + 109],
440 data[HEADER_SIZE + 110],
441 data[HEADER_SIZE + 111],
442 data[HEADER_SIZE + 112],
443 data[HEADER_SIZE + 113],
444 data[HEADER_SIZE + 114],
445 data[HEADER_SIZE + 115],
446 ]);
447 let export_count = u32::from_le_bytes([
448 data[HEADER_SIZE + 116],
449 data[HEADER_SIZE + 117],
450 data[HEADER_SIZE + 118],
451 data[HEADER_SIZE + 119],
452 ]);
453 let graph_projection_count = u32::from_le_bytes([
454 data[HEADER_SIZE + 120],
455 data[HEADER_SIZE + 121],
456 data[HEADER_SIZE + 122],
457 data[HEADER_SIZE + 123],
458 ]);
459 let analytics_job_count = u32::from_le_bytes([
460 data[HEADER_SIZE + 124],
461 data[HEADER_SIZE + 125],
462 data[HEADER_SIZE + 126],
463 data[HEADER_SIZE + 127],
464 ]);
465 let manifest_event_count = u32::from_le_bytes([
466 data[HEADER_SIZE + 128],
467 data[HEADER_SIZE + 129],
468 data[HEADER_SIZE + 130],
469 data[HEADER_SIZE + 131],
470 ]);
471 let registry_page = u32::from_le_bytes([
472 data[HEADER_SIZE + 132],
473 data[HEADER_SIZE + 133],
474 data[HEADER_SIZE + 134],
475 data[HEADER_SIZE + 135],
476 ]);
477 let registry_checksum = u64::from_le_bytes([
478 data[HEADER_SIZE + 136],
479 data[HEADER_SIZE + 137],
480 data[HEADER_SIZE + 138],
481 data[HEADER_SIZE + 139],
482 data[HEADER_SIZE + 140],
483 data[HEADER_SIZE + 141],
484 data[HEADER_SIZE + 142],
485 data[HEADER_SIZE + 143],
486 ]);
487 let recovery_page = u32::from_le_bytes([
488 data[HEADER_SIZE + 144],
489 data[HEADER_SIZE + 145],
490 data[HEADER_SIZE + 146],
491 data[HEADER_SIZE + 147],
492 ]);
493 let recovery_checksum = u64::from_le_bytes([
494 data[HEADER_SIZE + 148],
495 data[HEADER_SIZE + 149],
496 data[HEADER_SIZE + 150],
497 data[HEADER_SIZE + 151],
498 data[HEADER_SIZE + 152],
499 data[HEADER_SIZE + 153],
500 data[HEADER_SIZE + 154],
501 data[HEADER_SIZE + 155],
502 ]);
503 let catalog_page = u32::from_le_bytes([
504 data[HEADER_SIZE + 156],
505 data[HEADER_SIZE + 157],
506 data[HEADER_SIZE + 158],
507 data[HEADER_SIZE + 159],
508 ]);
509 let catalog_checksum = u64::from_le_bytes([
510 data[HEADER_SIZE + 160],
511 data[HEADER_SIZE + 161],
512 data[HEADER_SIZE + 162],
513 data[HEADER_SIZE + 163],
514 data[HEADER_SIZE + 164],
515 data[HEADER_SIZE + 165],
516 data[HEADER_SIZE + 166],
517 data[HEADER_SIZE + 167],
518 ]);
519 let metadata_state_page = u32::from_le_bytes([
520 data[HEADER_SIZE + 168],
521 data[HEADER_SIZE + 169],
522 data[HEADER_SIZE + 170],
523 data[HEADER_SIZE + 171],
524 ]);
525 let metadata_state_checksum = u64::from_le_bytes([
526 data[HEADER_SIZE + 172],
527 data[HEADER_SIZE + 173],
528 data[HEADER_SIZE + 174],
529 data[HEADER_SIZE + 175],
530 data[HEADER_SIZE + 176],
531 data[HEADER_SIZE + 177],
532 data[HEADER_SIZE + 178],
533 data[HEADER_SIZE + 179],
534 ]);
535 let vector_artifact_page = u32::from_le_bytes([
536 data[HEADER_SIZE + 180],
537 data[HEADER_SIZE + 181],
538 data[HEADER_SIZE + 182],
539 data[HEADER_SIZE + 183],
540 ]);
541 let vector_artifact_checksum = u64::from_le_bytes([
542 data[HEADER_SIZE + 184],
543 data[HEADER_SIZE + 185],
544 data[HEADER_SIZE + 186],
545 data[HEADER_SIZE + 187],
546 data[HEADER_SIZE + 188],
547 data[HEADER_SIZE + 189],
548 data[HEADER_SIZE + 190],
549 data[HEADER_SIZE + 191],
550 ]);
551
552 let checkpoint_in_progress = data[HEADER_SIZE + 192] != 0;
554 let checkpoint_target_lsn = u64::from_le_bytes([
555 data[HEADER_SIZE + 193],
556 data[HEADER_SIZE + 194],
557 data[HEADER_SIZE + 195],
558 data[HEADER_SIZE + 196],
559 data[HEADER_SIZE + 197],
560 data[HEADER_SIZE + 198],
561 data[HEADER_SIZE + 199],
562 data[HEADER_SIZE + 200],
563 ]);
564
565 {
567 let mut header = self.header_write()?;
568 header.version = version;
569 header.page_size = page_size;
570 header.page_count = page_count;
571 header.freelist_head = freelist_head;
572 header.schema_version = schema_version;
573 header.checkpoint_lsn = checkpoint_lsn;
574 header.checkpoint_in_progress = checkpoint_in_progress;
575 header.checkpoint_target_lsn = checkpoint_target_lsn;
576 header.physical = PhysicalFileHeader {
577 format_version: physical_format_version,
578 sequence: physical_sequence,
579 manifest_oldest_root,
580 manifest_root,
581 free_set_root,
582 manifest_page,
583 manifest_checksum,
584 collection_roots_page,
585 collection_roots_checksum,
586 collection_root_count,
587 snapshot_count,
588 index_count,
589 catalog_collection_count,
590 catalog_total_entities,
591 export_count,
592 graph_projection_count,
593 analytics_job_count,
594 manifest_event_count,
595 registry_page,
596 registry_checksum,
597 recovery_page,
598 recovery_checksum,
599 catalog_page,
600 catalog_checksum,
601 metadata_state_page,
602 metadata_state_checksum,
603 vector_artifact_page,
604 vector_artifact_checksum,
605 };
606 }
607
608 {
610 let mut freelist = self.freelist_write()?;
611 *freelist = FreeList::from_header(freelist_head, 0);
612 }
613
614 Ok(())
615 }
616
617 fn write_header(&self) -> Result<(), PagerError> {
622 if self.config.read_only {
623 return Err(PagerError::ReadOnly);
624 }
625
626 let header = self.header_read()?;
627
628 let mut page = if let Some(cached) = self.cache.get(0) {
631 cached
632 } else {
633 let file = self.file_lock()?;
635 let len = file.metadata().map(|m| m.len()).unwrap_or(0);
636 drop(file);
637
638 if len >= PAGE_SIZE as u64 {
639 self.read_page_raw(0)?
640 } else {
641 Page::new(PageType::Header, 0)
643 }
644 };
645
646 let data = page.as_bytes_mut();
647
648 data[HEADER_SIZE..HEADER_SIZE + 4].copy_from_slice(&MAGIC_BYTES);
650
651 data[HEADER_SIZE + 4..HEADER_SIZE + 8].copy_from_slice(&header.version.to_le_bytes());
653 data[HEADER_SIZE + 8..HEADER_SIZE + 12].copy_from_slice(&header.page_size.to_le_bytes());
654 data[HEADER_SIZE + 12..HEADER_SIZE + 16].copy_from_slice(&header.page_count.to_le_bytes());
655 data[HEADER_SIZE + 16..HEADER_SIZE + 20]
656 .copy_from_slice(&header.freelist_head.to_le_bytes());
657 data[HEADER_SIZE + 20..HEADER_SIZE + 24]
658 .copy_from_slice(&header.schema_version.to_le_bytes());
659 data[HEADER_SIZE + 24..HEADER_SIZE + 32]
660 .copy_from_slice(&header.checkpoint_lsn.to_le_bytes());
661 data[HEADER_SIZE + 32..HEADER_SIZE + 36]
662 .copy_from_slice(&header.physical.format_version.to_le_bytes());
663 data[HEADER_SIZE + 36..HEADER_SIZE + 44]
664 .copy_from_slice(&header.physical.sequence.to_le_bytes());
665 data[HEADER_SIZE + 44..HEADER_SIZE + 52]
666 .copy_from_slice(&header.physical.manifest_root.to_le_bytes());
667 data[HEADER_SIZE + 52..HEADER_SIZE + 60]
668 .copy_from_slice(&header.physical.manifest_oldest_root.to_le_bytes());
669 data[HEADER_SIZE + 60..HEADER_SIZE + 68]
670 .copy_from_slice(&header.physical.free_set_root.to_le_bytes());
671 data[HEADER_SIZE + 68..HEADER_SIZE + 72]
672 .copy_from_slice(&header.physical.manifest_page.to_le_bytes());
673 data[HEADER_SIZE + 72..HEADER_SIZE + 80]
674 .copy_from_slice(&header.physical.manifest_checksum.to_le_bytes());
675 data[HEADER_SIZE + 80..HEADER_SIZE + 84]
676 .copy_from_slice(&header.physical.collection_roots_page.to_le_bytes());
677 data[HEADER_SIZE + 84..HEADER_SIZE + 92]
678 .copy_from_slice(&header.physical.collection_roots_checksum.to_le_bytes());
679 data[HEADER_SIZE + 92..HEADER_SIZE + 96]
680 .copy_from_slice(&header.physical.collection_root_count.to_le_bytes());
681 data[HEADER_SIZE + 96..HEADER_SIZE + 100]
682 .copy_from_slice(&header.physical.snapshot_count.to_le_bytes());
683 data[HEADER_SIZE + 100..HEADER_SIZE + 104]
684 .copy_from_slice(&header.physical.index_count.to_le_bytes());
685 data[HEADER_SIZE + 104..HEADER_SIZE + 108]
686 .copy_from_slice(&header.physical.catalog_collection_count.to_le_bytes());
687 data[HEADER_SIZE + 108..HEADER_SIZE + 116]
688 .copy_from_slice(&header.physical.catalog_total_entities.to_le_bytes());
689 data[HEADER_SIZE + 116..HEADER_SIZE + 120]
690 .copy_from_slice(&header.physical.export_count.to_le_bytes());
691 data[HEADER_SIZE + 120..HEADER_SIZE + 124]
692 .copy_from_slice(&header.physical.graph_projection_count.to_le_bytes());
693 data[HEADER_SIZE + 124..HEADER_SIZE + 128]
694 .copy_from_slice(&header.physical.analytics_job_count.to_le_bytes());
695 data[HEADER_SIZE + 128..HEADER_SIZE + 132]
696 .copy_from_slice(&header.physical.manifest_event_count.to_le_bytes());
697 data[HEADER_SIZE + 132..HEADER_SIZE + 136]
698 .copy_from_slice(&header.physical.registry_page.to_le_bytes());
699 data[HEADER_SIZE + 136..HEADER_SIZE + 144]
700 .copy_from_slice(&header.physical.registry_checksum.to_le_bytes());
701 data[HEADER_SIZE + 144..HEADER_SIZE + 148]
702 .copy_from_slice(&header.physical.recovery_page.to_le_bytes());
703 data[HEADER_SIZE + 148..HEADER_SIZE + 156]
704 .copy_from_slice(&header.physical.recovery_checksum.to_le_bytes());
705 data[HEADER_SIZE + 156..HEADER_SIZE + 160]
706 .copy_from_slice(&header.physical.catalog_page.to_le_bytes());
707 data[HEADER_SIZE + 160..HEADER_SIZE + 168]
708 .copy_from_slice(&header.physical.catalog_checksum.to_le_bytes());
709 data[HEADER_SIZE + 168..HEADER_SIZE + 172]
710 .copy_from_slice(&header.physical.metadata_state_page.to_le_bytes());
711 data[HEADER_SIZE + 172..HEADER_SIZE + 180]
712 .copy_from_slice(&header.physical.metadata_state_checksum.to_le_bytes());
713 data[HEADER_SIZE + 180..HEADER_SIZE + 184]
714 .copy_from_slice(&header.physical.vector_artifact_page.to_le_bytes());
715 data[HEADER_SIZE + 184..HEADER_SIZE + 192]
716 .copy_from_slice(&header.physical.vector_artifact_checksum.to_le_bytes());
717
718 data[HEADER_SIZE + 192] = if header.checkpoint_in_progress { 1 } else { 0 };
720 data[HEADER_SIZE + 193..HEADER_SIZE + 201]
721 .copy_from_slice(&header.checkpoint_target_lsn.to_le_bytes());
722
723 page.update_checksum();
724
725 self.write_header_shadow(&page)?;
727
728 self.write_page_raw(0, &page)?;
729 *self.header_dirty_lock()? = false;
730
731 Ok(())
732 }
733
734 fn read_page_raw(&self, page_id: u32) -> Result<Page, PagerError> {
736 let mut file = self.file_lock()?;
737 let offset = (page_id as u64) * (PAGE_SIZE as u64);
738
739 file.seek(SeekFrom::Start(offset))?;
740
741 let mut buf = [0u8; PAGE_SIZE];
742 file.read_exact(&mut buf)?;
743
744 let page = Page::from_bytes(buf);
745
746 if self.config.verify_checksums {
748 page.verify_checksum()?;
749 }
750
751 Ok(page)
752 }
753
754 fn write_page_raw(&self, page_id: u32, page: &Page) -> Result<(), PagerError> {
756 if self.config.read_only {
757 return Err(PagerError::ReadOnly);
758 }
759
760 let mut file = self.file_lock()?;
761 let offset = (page_id as u64) * (PAGE_SIZE as u64);
762
763 file.seek(SeekFrom::Start(offset))?;
764 file.write_all(page.as_bytes())?;
765
766 Ok(())
767 }
768
769 pub fn read_page(&self, page_id: u32) -> Result<Page, PagerError> {
771 if let Some(page) = self.cache.get(page_id) {
773 return Ok(page);
774 }
775
776 let page = self.read_page_raw(page_id)?;
778
779 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
781 let evicted_id = dirty_page.page_id();
783 self.write_page_raw(evicted_id, &dirty_page)?;
784 }
785
786 Ok(page)
787 }
788
789 pub fn read_page_no_checksum(&self, page_id: u32) -> Result<Page, PagerError> {
794 if let Some(page) = self.cache.get(page_id) {
796 return Ok(page);
797 }
798
799 let mut file = self.file_lock()?;
801 let offset = (page_id as u64) * (PAGE_SIZE as u64);
802
803 file.seek(SeekFrom::Start(offset))?;
804
805 let mut buf = [0u8; PAGE_SIZE];
806 file.read_exact(&mut buf)?;
807 drop(file);
808
809 let page = Page::from_bytes(buf);
810
811 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
813 let evicted_id = dirty_page.page_id();
815 self.write_page_raw(evicted_id, &dirty_page)?;
816 }
817
818 Ok(page)
819 }
820
821 pub fn write_page(&self, page_id: u32, mut page: Page) -> Result<(), PagerError> {
823 if self.config.read_only {
824 return Err(PagerError::ReadOnly);
825 }
826
827 page.update_checksum();
829
830 if let Some(dirty_page) = self.cache.insert(page_id, page) {
835 let evicted_id = dirty_page.page_id();
836 self.write_page_raw(evicted_id, &dirty_page)?;
837 }
838 self.cache.mark_dirty(page_id);
839
840 Ok(())
841 }
842
843 pub fn read_page_decrypted(&self, page_id: u32) -> Result<Page, PagerError> {
848 if page_id == 0 || self.encryption.is_none() {
849 return self.read_page(page_id);
850 }
851 let raw = self.read_page_no_checksum(page_id)?;
852 let (enc, _) = self
853 .encryption
854 .as_ref()
855 .expect("encryption presence checked above");
856 let plaintext = enc
857 .decrypt(page_id, raw.as_bytes())
858 .map_err(|e| PagerError::InvalidDatabase(format!("decrypt page {page_id}: {e}")))?;
859 let mut buf = [0u8; PAGE_SIZE];
860 let n = plaintext.len().min(PAGE_SIZE);
861 buf[..n].copy_from_slice(&plaintext[..n]);
862 Ok(Page::from_bytes(buf))
863 }
864
865 pub fn write_page_encrypted(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
870 if page_id == 0 || self.encryption.is_none() {
871 return self.write_page(page_id, page);
872 }
873 const OVERHEAD: usize = 12 + 16; let plaintext_len = PAGE_SIZE - OVERHEAD;
875 let plaintext = &page.as_bytes()[..plaintext_len];
876 let (enc, _) = self
877 .encryption
878 .as_ref()
879 .expect("encryption presence checked above");
880 let ciphertext = enc.encrypt(page_id, plaintext);
881 debug_assert_eq!(ciphertext.len(), PAGE_SIZE);
882 let mut buf = [0u8; PAGE_SIZE];
883 buf.copy_from_slice(&ciphertext);
884 let cipher_page = Page::from_bytes(buf);
885 self.write_page_no_checksum(page_id, cipher_page)
886 }
887
888 pub fn write_page_no_checksum(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
893 if self.config.read_only {
894 return Err(PagerError::ReadOnly);
895 }
896
897 if let Some(dirty_page) = self.cache.insert(page_id, page) {
900 let evicted_id = dirty_page.page_id();
901 self.write_page_raw(evicted_id, &dirty_page)?;
902 }
903 self.cache.mark_dirty(page_id);
904
905 Ok(())
906 }
907
908 pub fn allocate_page(&self, page_type: PageType) -> Result<Page, PagerError> {
910 if self.config.read_only {
911 return Err(PagerError::ReadOnly);
912 }
913
914 let page_id = {
916 let mut freelist = self.freelist_write()?;
917 if let Some(id) = freelist.allocate() {
918 id
919 } else if freelist.trunk_head() != 0 {
920 let trunk_id = freelist.trunk_head();
921 drop(freelist);
922
923 let trunk = self.read_page(trunk_id).map_err(|e| match e {
924 PagerError::PageNotFound(_) => {
925 PagerError::InvalidDatabase("Freelist trunk missing".to_string())
926 }
927 other => other,
928 })?;
929
930 let mut freelist = self.freelist_write()?;
931 freelist
932 .load_from_trunk(&trunk)
933 .map_err(|e| PagerError::InvalidDatabase(format!("Freelist: {}", e)))?;
934 let id = freelist.allocate().ok_or_else(|| {
935 PagerError::InvalidDatabase("Freelist empty after trunk load".to_string())
936 })?;
937
938 let mut header = self.header_write()?;
939 header.freelist_head = freelist.trunk_head();
940 *self.header_dirty_lock()? = true;
941
942 id
943 } else {
944 let mut header = self.header_write()?;
946 let id = header.page_count;
947 header.page_count += 1;
948 *self.header_dirty_lock()? = true;
949 id
950 }
951 };
952
953 let page = Page::new(page_type, page_id);
954
955 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
967 let evicted_id = dirty_page.page_id();
968 self.write_page_raw(evicted_id, &dirty_page)?;
969 }
970 self.cache.mark_dirty(page_id);
971
972 Ok(page)
973 }
974
975 pub fn reserve_contig_extent(&self, n_pages: u32) -> Result<super::ExtentId, PagerError> {
977 if self.config.read_only {
978 return Err(PagerError::ReadOnly);
979 }
980 if n_pages == 0 {
981 return Err(PagerError::InvalidDatabase(
982 "contiguous extent must reserve at least one page".to_string(),
983 ));
984 }
985
986 let start_page = {
987 let mut header = self.header_write()?;
988 let start = header.page_count;
989 header.page_count = header.page_count.checked_add(n_pages).ok_or_else(|| {
990 PagerError::InvalidDatabase("contiguous extent page count overflow".to_string())
991 })?;
992 *self.header_dirty_lock()? = true;
993 start
994 };
995
996 for page_id in start_page..start_page + n_pages {
997 let mut page = Page::new(PageType::Vector, page_id);
998 page.update_checksum();
999 if let Some(dirty_page) = self.cache.insert(page_id, page) {
1000 let evicted_id = dirty_page.page_id();
1001 self.write_page_raw(evicted_id, &dirty_page)?;
1002 }
1003 self.cache.mark_dirty(page_id);
1004 }
1005
1006 Ok(super::ExtentId {
1007 start_page,
1008 n_pages,
1009 })
1010 }
1011
1012 pub fn free_page(&self, page_id: u32) -> Result<(), PagerError> {
1014 if self.config.read_only {
1015 return Err(PagerError::ReadOnly);
1016 }
1017
1018 self.cache.remove(page_id);
1020
1021 let mut freelist = self.freelist_write()?;
1023 freelist.free(page_id);
1024
1025 *self.header_dirty_lock()? = true;
1026
1027 Ok(())
1028 }
1029
1030 pub fn header(&self) -> Result<DatabaseHeader, PagerError> {
1032 Ok(self.header_read()?.clone())
1033 }
1034
1035 pub fn physical_header(&self) -> Result<PhysicalFileHeader, PagerError> {
1036 Ok(self.header_read()?.physical)
1037 }
1038
1039 pub fn update_physical_header(&self, physical: PhysicalFileHeader) -> Result<(), PagerError> {
1040 if self.config.read_only {
1041 return Err(PagerError::ReadOnly);
1042 }
1043
1044 let mut header = self.header_write()?;
1045 header.physical = physical;
1046 *self.header_dirty_lock()? = true;
1047 Ok(())
1048 }
1049
1050 pub fn page_count(&self) -> Result<u32, PagerError> {
1052 Ok(self.header_read()?.page_count)
1053 }
1054
1055 pub fn set_wal_writer(&self, wal: Arc<Mutex<crate::storage::wal::writer::WalWriter>>) {
1067 let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
1068 *slot = Some(wal);
1069 }
1070
1071 pub fn clear_wal_writer(&self) {
1073 let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
1074 *slot = None;
1075 }
1076
1077 pub fn has_wal_writer(&self) -> bool {
1079 self.wal.read().map(|s| s.is_some()).unwrap_or(false)
1080 }
1081
1082 pub fn flush(&self) -> Result<(), PagerError> {
1084 if self.config.read_only {
1085 return Ok(());
1086 }
1087
1088 let trunks = {
1090 let mut freelist = self.freelist_write()?;
1091 if freelist.is_dirty() {
1092 let mut header = self.header_write()?;
1093 let trunks = freelist.flush_to_trunks(0, || {
1094 let id = header.page_count;
1095 header.page_count += 1;
1096 id
1097 });
1098 header.freelist_head = freelist.trunk_head();
1099 *self.header_dirty_lock()? = true;
1100 freelist.mark_clean();
1101 trunks
1102 } else {
1103 Vec::new()
1104 }
1105 };
1106
1107 for trunk in trunks {
1108 let page_id = trunk.page_id();
1109 self.cache.insert(page_id, trunk);
1110 self.cache.mark_dirty(page_id);
1111 }
1112
1113 let dirty_pages = self.cache.flush_dirty();
1115 if !dirty_pages.is_empty() {
1116 let max_lsn = dirty_pages
1121 .iter()
1122 .filter_map(|(_, page)| page.header().ok().map(|h| h.lsn))
1123 .max()
1124 .unwrap_or(0);
1125 if max_lsn > 0 {
1126 if let Ok(slot) = self.wal.read() {
1127 if let Some(wal) = slot.as_ref() {
1128 let wal = Arc::clone(wal);
1129 drop(slot);
1133 let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
1134 wal_guard.flush_until(max_lsn).map_err(PagerError::Io)?;
1135 }
1136 }
1137 }
1138 self.write_pages_through_dwb(&dirty_pages)?;
1139 }
1140
1141 if *self.header_dirty_lock()? {
1143 self.write_header()?;
1144 }
1145
1146 Ok(())
1147 }
1148
1149 pub fn sync(&self) -> Result<(), PagerError> {
1151 self.flush()?;
1152
1153 let file = self.file_lock()?;
1154 file.sync_all()?;
1155
1156 Ok(())
1157 }
1158
/// Returns the page cache's statistics, as reported by the cache itself.
pub fn cache_stats(&self) -> crate::storage::engine::page_cache::CacheStats {
    self.cache.stats()
}
1163
/// Returns the number of dirty pages, as reported by the page cache.
pub fn dirty_page_count(&self) -> usize {
    self.cache.dirty_count()
}
1168
1169 pub fn dirty_fraction(&self) -> f64 {
1173 let capacity = self.cache.capacity().max(1) as f64;
1174 self.cache.dirty_count() as f64 / capacity
1175 }
1176
1177 pub fn flush_some_dirty(&self, max: usize) -> Result<usize, PagerError> {
1181 if self.config.read_only || max == 0 {
1182 return Ok(0);
1183 }
1184 let dirty_pages = self.cache.flush_some_dirty(max);
1185 if dirty_pages.is_empty() {
1186 return Ok(0);
1187 }
1188 let count = dirty_pages.len();
1189 for (page_id, page) in dirty_pages {
1196 self.write_page(page_id, page)?;
1197 }
1198 Ok(count)
1199 }
1200
1201 pub fn path(&self) -> &Path {
1203 &self.path
1204 }
1205
/// Returns `true` when the pager was configured as read-only.
pub fn is_read_only(&self) -> bool {
    self.config.read_only
}
1210
1211 pub fn file_size(&self) -> Result<u64, PagerError> {
1213 let file = self.file_lock()?;
1214 Ok(file.metadata()?.len())
1215 }
1216
1217 pub fn prefetch_hint(&self, page_id: u32) {
1225 if let Ok(file) = self.file_lock() {
1226 let _ = crate::storage::btree::prefetch::prefetch_page(
1227 &file,
1228 page_id as u64,
1229 PAGE_SIZE as u32,
1230 );
1231 }
1232 }
1233
/// Builds the header-shadow path: the database path with `-hdr` appended
/// to its final component (no separator or extension handling).
fn shadow_path(db_path: &Path) -> PathBuf {
    let mut raw = db_path.as_os_str().to_os_string();
    raw.push("-hdr");
    raw.into()
}
1242
/// Builds the metadata-shadow path: the database path with `-meta`
/// appended verbatim.
fn meta_shadow_path(db_path: &Path) -> PathBuf {
    let mut name = db_path.as_os_str().to_os_string();
    name.push("-meta");
    PathBuf::from(name)
}
1249
/// Builds the double-write-buffer path: the database path with `-dwb`
/// appended verbatim.
fn dwb_path(db_path: &Path) -> PathBuf {
    let mut os_name = db_path.as_os_str().to_os_string();
    os_name.push("-dwb");
    os_name.into()
}
1256
1257 fn open_dwb_file(db_path: &Path) -> Result<File, PagerError> {
1262 Ok(OpenOptions::new()
1263 .read(true)
1264 .write(true)
1265 .create(true)
1266 .truncate(false)
1267 .open(Self::dwb_path(db_path))?)
1268 }
1269
1270 fn clear_dwb_file(file: &mut File) -> Result<(), PagerError> {
1272 file.set_len(0)?;
1273 file.seek(SeekFrom::Start(0))?;
1274 file.sync_all()?;
1275 Ok(())
1276 }
1277
1278 fn write_header_shadow(&self, page: &Page) -> Result<(), PagerError> {
1280 if self.config.read_only {
1281 return Ok(());
1282 }
1283 let shadow = Self::shadow_path(&self.path);
1284 let mut f = File::create(&shadow)?;
1285 f.write_all(page.as_bytes())?;
1286 f.sync_all()?;
1287 Ok(())
1288 }
1289
1290 fn recover_header_from_shadow(&self) -> Result<Page, PagerError> {
1292 let shadow = Self::shadow_path(&self.path);
1293 if !shadow.exists() {
1294 return Err(PagerError::InvalidDatabase(
1295 "Page 0 corrupted and no header shadow found".into(),
1296 ));
1297 }
1298 let mut f = File::open(&shadow)?;
1299 let mut buf = [0u8; PAGE_SIZE];
1300 f.read_exact(&mut buf)?;
1301 let page = Page::from_bytes(buf);
1302
1303 let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
1305 if magic != MAGIC_BYTES {
1306 return Err(PagerError::InvalidDatabase(
1307 "Header shadow also corrupted".into(),
1308 ));
1309 }
1310
1311 if !self.config.read_only {
1313 self.write_page_raw(0, &page)?;
1314 let file = self.file_lock()?;
1315 file.sync_all()?;
1316 }
1317
1318 Ok(page)
1319 }
1320
1321 pub fn write_meta_shadow(&self, page: &Page) -> Result<(), PagerError> {
1331 if self.config.read_only {
1332 return Ok(());
1333 }
1334 let shadow = Self::meta_shadow_path(&self.path);
1335 if crate::physical::fold_pager_meta_enabled() {
1336 let _ = std::fs::remove_file(&shadow);
1339 return Ok(());
1340 }
1341 let mut f = File::create(&shadow)?;
1342 f.write_all(page.as_bytes())?;
1343 f.sync_all()?;
1344 Ok(())
1345 }
1346
1347 pub fn recover_meta_from_shadow(&self) -> Result<Page, PagerError> {
1349 let shadow = Self::meta_shadow_path(&self.path);
1350 if !shadow.exists() {
1351 return Err(PagerError::InvalidDatabase(
1352 "Page 1 corrupted and no metadata shadow found".into(),
1353 ));
1354 }
1355 let mut f = File::open(&shadow)?;
1356 let mut buf = [0u8; PAGE_SIZE];
1357 f.read_exact(&mut buf)?;
1358 let page = Page::from_bytes(buf);
1359
1360 if !self.config.read_only {
1362 self.write_page_raw(1, &page)?;
1363 let file = self.file_lock()?;
1364 file.sync_all()?;
1365 }
1366
1367 Ok(page)
1368 }
1369
1370 fn write_pages_through_dwb(&self, pages: &[(u32, Page)]) -> Result<(), PagerError> {
1377 if let Some(dwb_mutex) = &self.dwb_file {
1378 let mut dwb = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1379
1380 let entry_size = 4 + PAGE_SIZE; let header_len = 4 + 4 + 4; let total = header_len + pages.len() * entry_size;
1385 let mut buf = Vec::with_capacity(total);
1386
1387 buf.extend_from_slice(&DWB_MAGIC);
1389 buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
1390 buf.extend_from_slice(&[0u8; 4]); for (page_id, page) in pages {
1394 buf.extend_from_slice(&page_id.to_le_bytes());
1395 buf.extend_from_slice(page.as_bytes());
1396 }
1397
1398 let checksum = super::super::crc32::crc32(&buf[header_len..]);
1400 buf[8..12].copy_from_slice(&checksum.to_le_bytes());
1401
1402 dwb.seek(SeekFrom::Start(0))?;
1404 dwb.write_all(&buf)?;
1405 dwb.set_len(buf.len() as u64)?;
1406 dwb.sync_all()?;
1407
1408 for (page_id, page) in pages {
1410 self.write_page_raw(*page_id, page)?;
1411 }
1412
1413 Self::clear_dwb_file(&mut dwb)?;
1415
1416 Ok(())
1417 } else {
1418 for (page_id, page) in pages {
1420 self.write_page_raw(*page_id, page)?;
1421 }
1422 Ok(())
1423 }
1424 }
1425
1426 fn recover_from_dwb(&self) -> Result<(), PagerError> {
1431 let dwb_path = Self::dwb_path(&self.path);
1432 if !dwb_path.exists() {
1433 return Ok(());
1434 }
1435
1436 if let Some(dwb_mutex) = &self.dwb_file {
1437 let mut file = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1438 return self.recover_from_dwb_file(&mut file);
1439 }
1440
1441 let mut file = OpenOptions::new().read(true).write(true).open(&dwb_path)?;
1442 self.recover_from_dwb_file(&mut file)
1443 }
1444
1445 fn recover_from_dwb_file(&self, file: &mut File) -> Result<(), PagerError> {
1446 file.seek(SeekFrom::Start(0))?;
1447 let len = file.metadata()?.len();
1448 if len < 12 {
1449 return Self::clear_dwb_file(file);
1451 }
1452
1453 let mut buf = vec![0u8; len as usize];
1454 file.read_exact(&mut buf)?;
1455
1456 if buf[0..4] != DWB_MAGIC {
1458 return Self::clear_dwb_file(file);
1460 }
1461
1462 let count = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]) as usize;
1463 let stored_checksum = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);
1464
1465 let header_len = 12;
1466 let entry_size = 4 + PAGE_SIZE;
1467 let expected_len = header_len + count * entry_size;
1468
1469 if buf.len() < expected_len {
1470 return Self::clear_dwb_file(file);
1472 }
1473
1474 let computed = super::super::crc32::crc32(&buf[header_len..expected_len]);
1476 if computed != stored_checksum {
1477 return Self::clear_dwb_file(file);
1479 }
1480
1481 let mut offset = header_len;
1483 for _ in 0..count {
1484 let page_id = u32::from_le_bytes([
1485 buf[offset],
1486 buf[offset + 1],
1487 buf[offset + 2],
1488 buf[offset + 3],
1489 ]);
1490 offset += 4;
1491
1492 let mut page_data = [0u8; PAGE_SIZE];
1493 page_data.copy_from_slice(&buf[offset..offset + PAGE_SIZE]);
1494 offset += PAGE_SIZE;
1495
1496 let page = Page::from_bytes(page_data);
1497 self.write_page_raw(page_id, &page)?;
1498 }
1499
1500 {
1502 let file = self.file_lock()?;
1503 file.sync_all()?;
1504 }
1505
1506 Self::clear_dwb_file(file)
1507 }
1508
1509 pub fn write_header_and_sync(&self) -> Result<(), PagerError> {
1511 self.write_header()?;
1512 let file = self.file_lock()?;
1513 file.sync_all()?;
1514 Ok(())
1515 }
1516
1517 pub fn set_checkpoint_in_progress(
1519 &self,
1520 in_progress: bool,
1521 target_lsn: u64,
1522 ) -> Result<(), PagerError> {
1523 let mut header = self.header_write()?;
1524 header.checkpoint_in_progress = in_progress;
1525 header.checkpoint_target_lsn = target_lsn;
1526 *self.header_dirty_lock()? = true;
1527 drop(header);
1528 self.write_header_and_sync()
1529 }
1530
1531 pub fn complete_checkpoint(&self, lsn: u64) -> Result<(), PagerError> {
1533 let mut header = self.header_write()?;
1534 header.checkpoint_lsn = lsn;
1535 header.checkpoint_in_progress = false;
1536 header.checkpoint_target_lsn = 0;
1537 *self.header_dirty_lock()? = true;
1538 drop(header);
1539 self.write_header_and_sync()
1540 }
1541}
1542
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a database path under the system temp directory from the
    /// test name, the process id and a nanosecond timestamp.
    fn temp_db_path(name: &str) -> PathBuf {
        let file_name = format!(
            "reddb-pager-{}-{}-{}.rdb",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        );
        std::env::temp_dir().join(file_name)
    }

    /// Opening a database whose header carries `DB_VERSION + 1` must fail
    /// with `InvalidDatabase` mentioning "newer than supported".
    #[test]
    fn open_refuses_future_database_version() {
        let path = temp_db_path("future-version");

        // Create a database, then close it again.
        drop(Pager::open_default(&path).unwrap());

        // Forge a header page whose version field is one past the supported version.
        let mut future_header = Page::new_header_page(1);
        let version_field = (HEADER_SIZE + 4)..(HEADER_SIZE + 8);
        future_header.as_bytes_mut()[version_field]
            .copy_from_slice(&(DB_VERSION + 1).to_le_bytes());
        future_header.update_checksum();

        // Overwrite the start of the file with the forged header.
        {
            let mut file = OpenOptions::new().write(true).open(&path).unwrap();
            file.seek(SeekFrom::Start(0)).unwrap();
            file.write_all(future_header.as_bytes()).unwrap();
            file.sync_all().unwrap();
        }

        match Pager::open_default(&path) {
            Ok(_) => panic!("future database version should be rejected"),
            Err(PagerError::InvalidDatabase(msg)) => {
                assert!(msg.contains("newer than supported"));
            }
            Err(other) => panic!("expected InvalidDatabase, got {other:?}"),
        }

        // Best-effort cleanup of the database and its sidecar files.
        for leftover in [
            path.clone(),
            Pager::shadow_path(&path),
            Pager::dwb_path(&path),
        ] {
            let _ = std::fs::remove_file(leftover);
        }
    }
}