1use super::*;
2
/// Magic bytes identifying the double-write buffer file ("RDDW" in ASCII).
const DWB_MAGIC: [u8; 4] = [0x52, 0x44, 0x44, 0x57];
5
6impl Pager {
    /// Opens an existing database at `path`, or creates a new one.
    ///
    /// Behaviour is driven by `config`:
    /// - `create`: allow creating the file when it does not exist;
    /// - `read_only`: open without write access (creation is refused);
    /// - `double_write`: enable the double-write buffer for torn-write safety.
    ///
    /// # Errors
    /// `InvalidDatabase` when the file is missing and `create` is unset, or a
    /// read-only create is requested; `Locked` when another writer holds the
    /// exclusive file lock; plus any I/O, recovery, or encryption errors.
    pub fn open<P: AsRef<Path>>(path: P, config: PagerConfig) -> Result<Self, PagerError> {
        let path = path.as_ref().to_path_buf();
        let exists = path.exists();

        if !exists && !config.create {
            return Err(PagerError::InvalidDatabase(
                "Database does not exist".into(),
            ));
        }

        if !exists && config.read_only {
            return Err(PagerError::InvalidDatabase(
                "Cannot create read-only database".into(),
            ));
        }

        let file = OpenOptions::new()
            .read(true)
            .write(!config.read_only)
            .create(config.create && !config.read_only)
            .open(&path)?;

        // Advisory locking on a second handle to the same file: writers take
        // an exclusive lock and fail with `Locked` when one is already held;
        // read-only opens take a shared lock but proceed unlocked (None) when
        // the shared lock cannot be acquired.
        let lock_file = if !config.read_only {
            let lf = OpenOptions::new().read(true).write(true).open(&path)?;
            lf.try_lock_exclusive().map_err(|_| PagerError::Locked)?;
            Some(lf)
        } else {
            let lf = OpenOptions::new().read(true).open(&path)?;
            match lf.try_lock_shared() {
                Ok(_) => Some(lf),
                Err(_) => None,
            }
        };

        // The double-write buffer is only opened when we can actually write.
        let dwb_file = if config.double_write && !config.read_only {
            let f = Self::open_dwb_file(&path)?;
            Some(Mutex::new(f))
        } else {
            None
        };

        let mut pager = Self {
            path,
            file: Mutex::new(file),
            _lock_file: lock_file,
            dwb_file,
            cache: PageCache::new(config.cache_size),
            freelist: RwLock::new(FreeList::new()),
            header: RwLock::new(DatabaseHeader::default()),
            config,
            header_dirty: Mutex::new(false),
            wal: RwLock::new(None),
            encryption: None,
        };

        if exists {
            // Replay any pending double-write buffer before trusting page 0.
            pager.recover_from_dwb()?;
            pager.load_header()?;
            pager.bind_encryption_for_existing()?;
        } else {
            pager.initialize()?;
            pager.bind_encryption_for_new()?;
        }

        Ok(pager)
    }
81
82 fn bind_encryption_for_existing(&mut self) -> Result<(), PagerError> {
92 const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
93 const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";
94
95 if self.page_count().unwrap_or(0) == 0 {
96 return self.bind_encryption_for_new();
97 }
98 let header_page = self.read_page_no_checksum(0)?;
99 let data = header_page.as_bytes();
100 let has_marker = data.len() > ENCRYPTION_MARKER_OFFSET + 4
101 && &data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4] == ENCRYPTION_MARKER;
102
103 let key = self.config.encryption.clone();
104 match (has_marker, key) {
105 (true, Some(key)) => {
106 let header_start = ENCRYPTION_MARKER_OFFSET + 4;
107 let header =
108 crate::storage::encryption::EncryptionHeader::from_bytes(&data[header_start..])
109 .map_err(|e| {
110 PagerError::InvalidDatabase(format!(
111 "encryption header parse failed: {e}"
112 ))
113 })?;
114 if !header.validate(&key) {
115 return Err(PagerError::InvalidKey);
116 }
117 let encryptor = crate::storage::encryption::PageEncryptor::new(key);
118 self.encryption = Some((encryptor, header));
119 Ok(())
120 }
121 (true, None) => Err(PagerError::EncryptionRequired),
122 (false, Some(_)) => Err(PagerError::PlainDatabaseRefusesKey),
123 (false, None) => Ok(()),
124 }
125 }
126
    /// Binds page encryption for a freshly-created database.
    ///
    /// No-op when no key is configured. Otherwise a fresh `EncryptionHeader`
    /// is generated from the key and — when page 0 already exists — stamped
    /// into it as the "RDBE" marker followed by the serialized header.
    fn bind_encryption_for_new(&mut self) -> Result<(), PagerError> {
        const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
        const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";

        let Some(key) = self.config.encryption.clone() else {
            return Ok(());
        };
        let header = crate::storage::encryption::EncryptionHeader::new(&key);
        let encryptor = crate::storage::encryption::PageEncryptor::new(key);

        if self.page_count().unwrap_or(0) > 0 {
            // Stamp marker + header into page 0, bypassing checksum handling
            // (page 0 is read/written raw here).
            let mut page = self.read_page_no_checksum(0)?;
            let data = page.as_bytes_mut();
            data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_MARKER_OFFSET + 4]
                .copy_from_slice(ENCRYPTION_MARKER);
            let header_bytes = header.to_bytes();
            let header_start = ENCRYPTION_MARKER_OFFSET + 4;
            data[header_start..header_start + header_bytes.len()].copy_from_slice(&header_bytes);
            self.write_page_no_checksum(0, page)?;
        }
        self.encryption = Some((encryptor, header));
        Ok(())
    }
154
155 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, PagerError> {
157 Self::open(path, PagerConfig::default())
158 }
159
    /// Lays out a brand-new database file: page 0 (database header), page 1
    /// (metadata), page 2 (vault), then fsyncs the initial layout.
    fn initialize(&self) -> Result<(), PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        // Pages 0..=2 are reserved at creation time.
        let initial_page_count = 3;
        let header_page = Page::new_header_page(initial_page_count);
        self.header_write()?.page_count = initial_page_count;

        self.write_page_raw(0, &header_page)?;
        let mut metadata_page = Page::new(PageType::Header, 1);
        metadata_page.update_checksum();
        self.write_page_raw(1, &metadata_page)?;
        let mut vault_page = Page::new(PageType::Vault, 2);
        vault_page.update_checksum();
        self.write_page_raw(2, &vault_page)?;

        // Make the initial layout durable before first use.
        self.sync()?;

        Ok(())
    }
188
189 fn header_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, DatabaseHeader>, PagerError> {
191 self.header.write().map_err(|_| PagerError::LockPoisoned)
192 }
193
194 fn header_read(&self) -> Result<std::sync::RwLockReadGuard<'_, DatabaseHeader>, PagerError> {
196 self.header.read().map_err(|_| PagerError::LockPoisoned)
197 }
198
199 fn freelist_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, FreeList>, PagerError> {
201 self.freelist.write().map_err(|_| PagerError::LockPoisoned)
202 }
203
204 fn file_lock(&self) -> Result<std::sync::MutexGuard<'_, File>, PagerError> {
206 self.file.lock().map_err(|_| PagerError::LockPoisoned)
207 }
208
209 fn header_dirty_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>, PagerError> {
211 self.header_dirty
212 .lock()
213 .map_err(|_| PagerError::LockPoisoned)
214 }
215
216 fn load_header(&self) -> Result<(), PagerError> {
218 let header_page = match self.read_page_raw(0) {
220 Ok(page) => {
221 let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
223 if magic == MAGIC_BYTES {
224 page
225 } else {
226 self.recover_header_from_shadow()?
228 }
229 }
230 Err(_) => self.recover_header_from_shadow()?,
231 };
232
233 let data = header_page.as_bytes();
235 let version = u32::from_le_bytes([
236 data[HEADER_SIZE + 4],
237 data[HEADER_SIZE + 5],
238 data[HEADER_SIZE + 6],
239 data[HEADER_SIZE + 7],
240 ]);
241
242 let page_size = u32::from_le_bytes([
243 data[HEADER_SIZE + 8],
244 data[HEADER_SIZE + 9],
245 data[HEADER_SIZE + 10],
246 data[HEADER_SIZE + 11],
247 ]);
248
249 if page_size != PAGE_SIZE as u32 {
250 return Err(PagerError::InvalidDatabase(format!(
251 "Unsupported page size: {}",
252 page_size
253 )));
254 }
255 if version > DB_VERSION {
256 return Err(PagerError::InvalidDatabase(format!(
257 "Unsupported database version: file version {version} is newer than supported {DB_VERSION}"
258 )));
259 }
260
261 let page_count = u32::from_le_bytes([
262 data[HEADER_SIZE + 12],
263 data[HEADER_SIZE + 13],
264 data[HEADER_SIZE + 14],
265 data[HEADER_SIZE + 15],
266 ]);
267
268 let freelist_head = u32::from_le_bytes([
269 data[HEADER_SIZE + 16],
270 data[HEADER_SIZE + 17],
271 data[HEADER_SIZE + 18],
272 data[HEADER_SIZE + 19],
273 ]);
274
275 let schema_version = u32::from_le_bytes([
276 data[HEADER_SIZE + 20],
277 data[HEADER_SIZE + 21],
278 data[HEADER_SIZE + 22],
279 data[HEADER_SIZE + 23],
280 ]);
281
282 let checkpoint_lsn = u64::from_le_bytes([
283 data[HEADER_SIZE + 24],
284 data[HEADER_SIZE + 25],
285 data[HEADER_SIZE + 26],
286 data[HEADER_SIZE + 27],
287 data[HEADER_SIZE + 28],
288 data[HEADER_SIZE + 29],
289 data[HEADER_SIZE + 30],
290 data[HEADER_SIZE + 31],
291 ]);
292 let physical_format_version = u32::from_le_bytes([
293 data[HEADER_SIZE + 32],
294 data[HEADER_SIZE + 33],
295 data[HEADER_SIZE + 34],
296 data[HEADER_SIZE + 35],
297 ]);
298 let physical_sequence = u64::from_le_bytes([
299 data[HEADER_SIZE + 36],
300 data[HEADER_SIZE + 37],
301 data[HEADER_SIZE + 38],
302 data[HEADER_SIZE + 39],
303 data[HEADER_SIZE + 40],
304 data[HEADER_SIZE + 41],
305 data[HEADER_SIZE + 42],
306 data[HEADER_SIZE + 43],
307 ]);
308 let manifest_root = u64::from_le_bytes([
309 data[HEADER_SIZE + 44],
310 data[HEADER_SIZE + 45],
311 data[HEADER_SIZE + 46],
312 data[HEADER_SIZE + 47],
313 data[HEADER_SIZE + 48],
314 data[HEADER_SIZE + 49],
315 data[HEADER_SIZE + 50],
316 data[HEADER_SIZE + 51],
317 ]);
318 let manifest_oldest_root = u64::from_le_bytes([
319 data[HEADER_SIZE + 52],
320 data[HEADER_SIZE + 53],
321 data[HEADER_SIZE + 54],
322 data[HEADER_SIZE + 55],
323 data[HEADER_SIZE + 56],
324 data[HEADER_SIZE + 57],
325 data[HEADER_SIZE + 58],
326 data[HEADER_SIZE + 59],
327 ]);
328 let free_set_root = u64::from_le_bytes([
329 data[HEADER_SIZE + 60],
330 data[HEADER_SIZE + 61],
331 data[HEADER_SIZE + 62],
332 data[HEADER_SIZE + 63],
333 data[HEADER_SIZE + 64],
334 data[HEADER_SIZE + 65],
335 data[HEADER_SIZE + 66],
336 data[HEADER_SIZE + 67],
337 ]);
338 let manifest_page = u32::from_le_bytes([
339 data[HEADER_SIZE + 68],
340 data[HEADER_SIZE + 69],
341 data[HEADER_SIZE + 70],
342 data[HEADER_SIZE + 71],
343 ]);
344 let manifest_checksum = u64::from_le_bytes([
345 data[HEADER_SIZE + 72],
346 data[HEADER_SIZE + 73],
347 data[HEADER_SIZE + 74],
348 data[HEADER_SIZE + 75],
349 data[HEADER_SIZE + 76],
350 data[HEADER_SIZE + 77],
351 data[HEADER_SIZE + 78],
352 data[HEADER_SIZE + 79],
353 ]);
354 let collection_roots_page = u32::from_le_bytes([
355 data[HEADER_SIZE + 80],
356 data[HEADER_SIZE + 81],
357 data[HEADER_SIZE + 82],
358 data[HEADER_SIZE + 83],
359 ]);
360 let collection_roots_checksum = u64::from_le_bytes([
361 data[HEADER_SIZE + 84],
362 data[HEADER_SIZE + 85],
363 data[HEADER_SIZE + 86],
364 data[HEADER_SIZE + 87],
365 data[HEADER_SIZE + 88],
366 data[HEADER_SIZE + 89],
367 data[HEADER_SIZE + 90],
368 data[HEADER_SIZE + 91],
369 ]);
370 let collection_root_count = u32::from_le_bytes([
371 data[HEADER_SIZE + 92],
372 data[HEADER_SIZE + 93],
373 data[HEADER_SIZE + 94],
374 data[HEADER_SIZE + 95],
375 ]);
376 let snapshot_count = u32::from_le_bytes([
377 data[HEADER_SIZE + 96],
378 data[HEADER_SIZE + 97],
379 data[HEADER_SIZE + 98],
380 data[HEADER_SIZE + 99],
381 ]);
382 let index_count = u32::from_le_bytes([
383 data[HEADER_SIZE + 100],
384 data[HEADER_SIZE + 101],
385 data[HEADER_SIZE + 102],
386 data[HEADER_SIZE + 103],
387 ]);
388 let catalog_collection_count = u32::from_le_bytes([
389 data[HEADER_SIZE + 104],
390 data[HEADER_SIZE + 105],
391 data[HEADER_SIZE + 106],
392 data[HEADER_SIZE + 107],
393 ]);
394 let catalog_total_entities = u64::from_le_bytes([
395 data[HEADER_SIZE + 108],
396 data[HEADER_SIZE + 109],
397 data[HEADER_SIZE + 110],
398 data[HEADER_SIZE + 111],
399 data[HEADER_SIZE + 112],
400 data[HEADER_SIZE + 113],
401 data[HEADER_SIZE + 114],
402 data[HEADER_SIZE + 115],
403 ]);
404 let export_count = u32::from_le_bytes([
405 data[HEADER_SIZE + 116],
406 data[HEADER_SIZE + 117],
407 data[HEADER_SIZE + 118],
408 data[HEADER_SIZE + 119],
409 ]);
410 let graph_projection_count = u32::from_le_bytes([
411 data[HEADER_SIZE + 120],
412 data[HEADER_SIZE + 121],
413 data[HEADER_SIZE + 122],
414 data[HEADER_SIZE + 123],
415 ]);
416 let analytics_job_count = u32::from_le_bytes([
417 data[HEADER_SIZE + 124],
418 data[HEADER_SIZE + 125],
419 data[HEADER_SIZE + 126],
420 data[HEADER_SIZE + 127],
421 ]);
422 let manifest_event_count = u32::from_le_bytes([
423 data[HEADER_SIZE + 128],
424 data[HEADER_SIZE + 129],
425 data[HEADER_SIZE + 130],
426 data[HEADER_SIZE + 131],
427 ]);
428 let registry_page = u32::from_le_bytes([
429 data[HEADER_SIZE + 132],
430 data[HEADER_SIZE + 133],
431 data[HEADER_SIZE + 134],
432 data[HEADER_SIZE + 135],
433 ]);
434 let registry_checksum = u64::from_le_bytes([
435 data[HEADER_SIZE + 136],
436 data[HEADER_SIZE + 137],
437 data[HEADER_SIZE + 138],
438 data[HEADER_SIZE + 139],
439 data[HEADER_SIZE + 140],
440 data[HEADER_SIZE + 141],
441 data[HEADER_SIZE + 142],
442 data[HEADER_SIZE + 143],
443 ]);
444 let recovery_page = u32::from_le_bytes([
445 data[HEADER_SIZE + 144],
446 data[HEADER_SIZE + 145],
447 data[HEADER_SIZE + 146],
448 data[HEADER_SIZE + 147],
449 ]);
450 let recovery_checksum = u64::from_le_bytes([
451 data[HEADER_SIZE + 148],
452 data[HEADER_SIZE + 149],
453 data[HEADER_SIZE + 150],
454 data[HEADER_SIZE + 151],
455 data[HEADER_SIZE + 152],
456 data[HEADER_SIZE + 153],
457 data[HEADER_SIZE + 154],
458 data[HEADER_SIZE + 155],
459 ]);
460 let catalog_page = u32::from_le_bytes([
461 data[HEADER_SIZE + 156],
462 data[HEADER_SIZE + 157],
463 data[HEADER_SIZE + 158],
464 data[HEADER_SIZE + 159],
465 ]);
466 let catalog_checksum = u64::from_le_bytes([
467 data[HEADER_SIZE + 160],
468 data[HEADER_SIZE + 161],
469 data[HEADER_SIZE + 162],
470 data[HEADER_SIZE + 163],
471 data[HEADER_SIZE + 164],
472 data[HEADER_SIZE + 165],
473 data[HEADER_SIZE + 166],
474 data[HEADER_SIZE + 167],
475 ]);
476 let metadata_state_page = u32::from_le_bytes([
477 data[HEADER_SIZE + 168],
478 data[HEADER_SIZE + 169],
479 data[HEADER_SIZE + 170],
480 data[HEADER_SIZE + 171],
481 ]);
482 let metadata_state_checksum = u64::from_le_bytes([
483 data[HEADER_SIZE + 172],
484 data[HEADER_SIZE + 173],
485 data[HEADER_SIZE + 174],
486 data[HEADER_SIZE + 175],
487 data[HEADER_SIZE + 176],
488 data[HEADER_SIZE + 177],
489 data[HEADER_SIZE + 178],
490 data[HEADER_SIZE + 179],
491 ]);
492 let vector_artifact_page = u32::from_le_bytes([
493 data[HEADER_SIZE + 180],
494 data[HEADER_SIZE + 181],
495 data[HEADER_SIZE + 182],
496 data[HEADER_SIZE + 183],
497 ]);
498 let vector_artifact_checksum = u64::from_le_bytes([
499 data[HEADER_SIZE + 184],
500 data[HEADER_SIZE + 185],
501 data[HEADER_SIZE + 186],
502 data[HEADER_SIZE + 187],
503 data[HEADER_SIZE + 188],
504 data[HEADER_SIZE + 189],
505 data[HEADER_SIZE + 190],
506 data[HEADER_SIZE + 191],
507 ]);
508
509 let checkpoint_in_progress = data[HEADER_SIZE + 192] != 0;
511 let checkpoint_target_lsn = u64::from_le_bytes([
512 data[HEADER_SIZE + 193],
513 data[HEADER_SIZE + 194],
514 data[HEADER_SIZE + 195],
515 data[HEADER_SIZE + 196],
516 data[HEADER_SIZE + 197],
517 data[HEADER_SIZE + 198],
518 data[HEADER_SIZE + 199],
519 data[HEADER_SIZE + 200],
520 ]);
521
522 {
524 let mut header = self.header_write()?;
525 header.version = version;
526 header.page_size = page_size;
527 header.page_count = page_count;
528 header.freelist_head = freelist_head;
529 header.schema_version = schema_version;
530 header.checkpoint_lsn = checkpoint_lsn;
531 header.checkpoint_in_progress = checkpoint_in_progress;
532 header.checkpoint_target_lsn = checkpoint_target_lsn;
533 header.physical = PhysicalFileHeader {
534 format_version: physical_format_version,
535 sequence: physical_sequence,
536 manifest_oldest_root,
537 manifest_root,
538 free_set_root,
539 manifest_page,
540 manifest_checksum,
541 collection_roots_page,
542 collection_roots_checksum,
543 collection_root_count,
544 snapshot_count,
545 index_count,
546 catalog_collection_count,
547 catalog_total_entities,
548 export_count,
549 graph_projection_count,
550 analytics_job_count,
551 manifest_event_count,
552 registry_page,
553 registry_checksum,
554 recovery_page,
555 recovery_checksum,
556 catalog_page,
557 catalog_checksum,
558 metadata_state_page,
559 metadata_state_checksum,
560 vector_artifact_page,
561 vector_artifact_checksum,
562 };
563 }
564
565 {
567 let mut freelist = self.freelist_write()?;
568 *freelist = FreeList::from_header(freelist_head, 0);
569 }
570
571 Ok(())
572 }
573
574 fn write_header(&self) -> Result<(), PagerError> {
579 if self.config.read_only {
580 return Err(PagerError::ReadOnly);
581 }
582
583 let header = self.header_read()?;
584
585 let mut page = if let Some(cached) = self.cache.get(0) {
588 cached
589 } else {
590 let file = self.file_lock()?;
592 let len = file.metadata().map(|m| m.len()).unwrap_or(0);
593 drop(file);
594
595 if len >= PAGE_SIZE as u64 {
596 self.read_page_raw(0)?
597 } else {
598 Page::new(PageType::Header, 0)
600 }
601 };
602
603 let data = page.as_bytes_mut();
604
605 data[HEADER_SIZE..HEADER_SIZE + 4].copy_from_slice(&MAGIC_BYTES);
607
608 data[HEADER_SIZE + 4..HEADER_SIZE + 8].copy_from_slice(&header.version.to_le_bytes());
610 data[HEADER_SIZE + 8..HEADER_SIZE + 12].copy_from_slice(&header.page_size.to_le_bytes());
611 data[HEADER_SIZE + 12..HEADER_SIZE + 16].copy_from_slice(&header.page_count.to_le_bytes());
612 data[HEADER_SIZE + 16..HEADER_SIZE + 20]
613 .copy_from_slice(&header.freelist_head.to_le_bytes());
614 data[HEADER_SIZE + 20..HEADER_SIZE + 24]
615 .copy_from_slice(&header.schema_version.to_le_bytes());
616 data[HEADER_SIZE + 24..HEADER_SIZE + 32]
617 .copy_from_slice(&header.checkpoint_lsn.to_le_bytes());
618 data[HEADER_SIZE + 32..HEADER_SIZE + 36]
619 .copy_from_slice(&header.physical.format_version.to_le_bytes());
620 data[HEADER_SIZE + 36..HEADER_SIZE + 44]
621 .copy_from_slice(&header.physical.sequence.to_le_bytes());
622 data[HEADER_SIZE + 44..HEADER_SIZE + 52]
623 .copy_from_slice(&header.physical.manifest_root.to_le_bytes());
624 data[HEADER_SIZE + 52..HEADER_SIZE + 60]
625 .copy_from_slice(&header.physical.manifest_oldest_root.to_le_bytes());
626 data[HEADER_SIZE + 60..HEADER_SIZE + 68]
627 .copy_from_slice(&header.physical.free_set_root.to_le_bytes());
628 data[HEADER_SIZE + 68..HEADER_SIZE + 72]
629 .copy_from_slice(&header.physical.manifest_page.to_le_bytes());
630 data[HEADER_SIZE + 72..HEADER_SIZE + 80]
631 .copy_from_slice(&header.physical.manifest_checksum.to_le_bytes());
632 data[HEADER_SIZE + 80..HEADER_SIZE + 84]
633 .copy_from_slice(&header.physical.collection_roots_page.to_le_bytes());
634 data[HEADER_SIZE + 84..HEADER_SIZE + 92]
635 .copy_from_slice(&header.physical.collection_roots_checksum.to_le_bytes());
636 data[HEADER_SIZE + 92..HEADER_SIZE + 96]
637 .copy_from_slice(&header.physical.collection_root_count.to_le_bytes());
638 data[HEADER_SIZE + 96..HEADER_SIZE + 100]
639 .copy_from_slice(&header.physical.snapshot_count.to_le_bytes());
640 data[HEADER_SIZE + 100..HEADER_SIZE + 104]
641 .copy_from_slice(&header.physical.index_count.to_le_bytes());
642 data[HEADER_SIZE + 104..HEADER_SIZE + 108]
643 .copy_from_slice(&header.physical.catalog_collection_count.to_le_bytes());
644 data[HEADER_SIZE + 108..HEADER_SIZE + 116]
645 .copy_from_slice(&header.physical.catalog_total_entities.to_le_bytes());
646 data[HEADER_SIZE + 116..HEADER_SIZE + 120]
647 .copy_from_slice(&header.physical.export_count.to_le_bytes());
648 data[HEADER_SIZE + 120..HEADER_SIZE + 124]
649 .copy_from_slice(&header.physical.graph_projection_count.to_le_bytes());
650 data[HEADER_SIZE + 124..HEADER_SIZE + 128]
651 .copy_from_slice(&header.physical.analytics_job_count.to_le_bytes());
652 data[HEADER_SIZE + 128..HEADER_SIZE + 132]
653 .copy_from_slice(&header.physical.manifest_event_count.to_le_bytes());
654 data[HEADER_SIZE + 132..HEADER_SIZE + 136]
655 .copy_from_slice(&header.physical.registry_page.to_le_bytes());
656 data[HEADER_SIZE + 136..HEADER_SIZE + 144]
657 .copy_from_slice(&header.physical.registry_checksum.to_le_bytes());
658 data[HEADER_SIZE + 144..HEADER_SIZE + 148]
659 .copy_from_slice(&header.physical.recovery_page.to_le_bytes());
660 data[HEADER_SIZE + 148..HEADER_SIZE + 156]
661 .copy_from_slice(&header.physical.recovery_checksum.to_le_bytes());
662 data[HEADER_SIZE + 156..HEADER_SIZE + 160]
663 .copy_from_slice(&header.physical.catalog_page.to_le_bytes());
664 data[HEADER_SIZE + 160..HEADER_SIZE + 168]
665 .copy_from_slice(&header.physical.catalog_checksum.to_le_bytes());
666 data[HEADER_SIZE + 168..HEADER_SIZE + 172]
667 .copy_from_slice(&header.physical.metadata_state_page.to_le_bytes());
668 data[HEADER_SIZE + 172..HEADER_SIZE + 180]
669 .copy_from_slice(&header.physical.metadata_state_checksum.to_le_bytes());
670 data[HEADER_SIZE + 180..HEADER_SIZE + 184]
671 .copy_from_slice(&header.physical.vector_artifact_page.to_le_bytes());
672 data[HEADER_SIZE + 184..HEADER_SIZE + 192]
673 .copy_from_slice(&header.physical.vector_artifact_checksum.to_le_bytes());
674
675 data[HEADER_SIZE + 192] = if header.checkpoint_in_progress { 1 } else { 0 };
677 data[HEADER_SIZE + 193..HEADER_SIZE + 201]
678 .copy_from_slice(&header.checkpoint_target_lsn.to_le_bytes());
679
680 page.update_checksum();
681
682 self.write_header_shadow(&page)?;
684
685 self.write_page_raw(0, &page)?;
686 *self.header_dirty_lock()? = false;
687
688 Ok(())
689 }
690
691 fn read_page_raw(&self, page_id: u32) -> Result<Page, PagerError> {
693 let mut file = self.file_lock()?;
694 let offset = (page_id as u64) * (PAGE_SIZE as u64);
695
696 file.seek(SeekFrom::Start(offset))?;
697
698 let mut buf = [0u8; PAGE_SIZE];
699 file.read_exact(&mut buf)?;
700
701 let page = Page::from_bytes(buf);
702
703 if self.config.verify_checksums {
705 page.verify_checksum()?;
706 }
707
708 Ok(page)
709 }
710
711 fn write_page_raw(&self, page_id: u32, page: &Page) -> Result<(), PagerError> {
713 if self.config.read_only {
714 return Err(PagerError::ReadOnly);
715 }
716
717 let mut file = self.file_lock()?;
718 let offset = (page_id as u64) * (PAGE_SIZE as u64);
719
720 file.seek(SeekFrom::Start(offset))?;
721 file.write_all(page.as_bytes())?;
722
723 Ok(())
724 }
725
726 pub fn read_page(&self, page_id: u32) -> Result<Page, PagerError> {
728 if let Some(page) = self.cache.get(page_id) {
730 return Ok(page);
731 }
732
733 let page = self.read_page_raw(page_id)?;
735
736 if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
738 let evicted_id = dirty_page.page_id();
740 self.write_page_raw(evicted_id, &dirty_page)?;
741 }
742
743 Ok(page)
744 }
745
    /// Like `read_page`, but never verifies the page checksum regardless of
    /// `config.verify_checksums` — used for pages whose bytes are not in the
    /// plain checksummed format (e.g. encrypted pages, raw header access).
    pub fn read_page_no_checksum(&self, page_id: u32) -> Result<Page, PagerError> {
        if let Some(page) = self.cache.get(page_id) {
            return Ok(page);
        }

        let mut file = self.file_lock()?;
        let offset = (page_id as u64) * (PAGE_SIZE as u64);

        file.seek(SeekFrom::Start(offset))?;

        let mut buf = [0u8; PAGE_SIZE];
        file.read_exact(&mut buf)?;
        // Release the file mutex before the eviction write below re-acquires
        // it inside `write_page_raw`.
        drop(file);

        let page = Page::from_bytes(buf);

        // Inserting may evict a dirty page; persist it before returning.
        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
            let evicted_id = dirty_page.page_id();
            self.write_page_raw(evicted_id, &dirty_page)?;
        }

        Ok(page)
    }
777
778 pub fn write_page(&self, page_id: u32, mut page: Page) -> Result<(), PagerError> {
780 if self.config.read_only {
781 return Err(PagerError::ReadOnly);
782 }
783
784 page.update_checksum();
786
787 if let Some(dirty_page) = self.cache.insert(page_id, page) {
792 let evicted_id = dirty_page.page_id();
793 self.write_page_raw(evicted_id, &dirty_page)?;
794 }
795 self.cache.mark_dirty(page_id);
796
797 Ok(())
798 }
799
    /// Reads a page, decrypting it when encryption is bound.
    ///
    /// Page 0 is always stored in plaintext (it carries the encryption
    /// header itself), so it — like every read on an unencrypted database —
    /// goes through the normal `read_page` path.
    pub fn read_page_decrypted(&self, page_id: u32) -> Result<Page, PagerError> {
        if page_id == 0 || self.encryption.is_none() {
            return self.read_page(page_id);
        }
        // Ciphertext carries no valid plain checksum; skip verification.
        let raw = self.read_page_no_checksum(page_id)?;
        let (enc, _) = self
            .encryption
            .as_ref()
            .expect("encryption presence checked above");
        let plaintext = enc
            .decrypt(page_id, raw.as_bytes())
            .map_err(|e| PagerError::InvalidDatabase(format!("decrypt page {page_id}: {e}")))?;
        // Plaintext is shorter than a full page (cipher overhead); the tail
        // of the buffer stays zeroed.
        let mut buf = [0u8; PAGE_SIZE];
        let n = plaintext.len().min(PAGE_SIZE);
        buf[..n].copy_from_slice(&plaintext[..n]);
        Ok(Page::from_bytes(buf))
    }
821
    /// Writes a page, encrypting it when encryption is bound.
    ///
    /// Page 0 (and every write on an unencrypted database) goes through the
    /// plain `write_page` path.
    pub fn write_page_encrypted(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
        if page_id == 0 || self.encryption.is_none() {
            return self.write_page(page_id, page);
        }
        // Per-page space reserved for the cipher — presumably a 12-byte
        // nonce plus a 16-byte auth tag; confirm against `PageEncryptor`.
        const OVERHEAD: usize = 12 + 16;
        let plaintext_len = PAGE_SIZE - OVERHEAD;
        let plaintext = &page.as_bytes()[..plaintext_len];
        let (enc, _) = self
            .encryption
            .as_ref()
            .expect("encryption presence checked above");
        let ciphertext = enc.encrypt(page_id, plaintext);
        debug_assert_eq!(ciphertext.len(), PAGE_SIZE);
        let mut buf = [0u8; PAGE_SIZE];
        buf.copy_from_slice(&ciphertext);
        let cipher_page = Page::from_bytes(buf);
        // Ciphertext must not be re-checksummed as if it were plaintext.
        self.write_page_no_checksum(page_id, cipher_page)
    }
844
845 pub fn write_page_no_checksum(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
850 if self.config.read_only {
851 return Err(PagerError::ReadOnly);
852 }
853
854 if let Some(dirty_page) = self.cache.insert(page_id, page) {
857 let evicted_id = dirty_page.page_id();
858 self.write_page_raw(evicted_id, &dirty_page)?;
859 }
860 self.cache.mark_dirty(page_id);
861
862 Ok(())
863 }
864
    /// Allocates a fresh page of `page_type`, reusing free pages before
    /// growing the file.
    ///
    /// Allocation order: in-memory freelist → next freelist trunk page on
    /// disk → extend the file by bumping `page_count`.
    pub fn allocate_page(&self, page_type: PageType) -> Result<Page, PagerError> {
        if self.config.read_only {
            return Err(PagerError::ReadOnly);
        }

        let page_id = {
            let mut freelist = self.freelist_write()?;
            if let Some(id) = freelist.allocate() {
                id
            } else if freelist.trunk_head() != 0 {
                let trunk_id = freelist.trunk_head();
                // Release the freelist lock before page I/O (`read_page` may
                // evict and write), then re-acquire it below.
                drop(freelist);

                let trunk = self.read_page(trunk_id).map_err(|e| match e {
                    PagerError::PageNotFound(_) => {
                        PagerError::InvalidDatabase("Freelist trunk missing".to_string())
                    }
                    other => other,
                })?;

                // Refill the in-memory freelist from the trunk page.
                let mut freelist = self.freelist_write()?;
                freelist
                    .load_from_trunk(&trunk)
                    .map_err(|e| PagerError::InvalidDatabase(format!("Freelist: {}", e)))?;
                let id = freelist.allocate().ok_or_else(|| {
                    PagerError::InvalidDatabase("Freelist empty after trunk load".to_string())
                })?;

                // The trunk head changed; record it and mark the header dirty.
                let mut header = self.header_write()?;
                header.freelist_head = freelist.trunk_head();
                *self.header_dirty_lock()? = true;

                id
            } else {
                // No free pages anywhere: grow the file by one page.
                let mut header = self.header_write()?;
                let id = header.page_count;
                header.page_count += 1;
                *self.header_dirty_lock()? = true;
                id
            }
        };

        let page = Page::new(page_type, page_id);

        // Stage the new page in the cache; persist any page it evicts.
        if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
            let evicted_id = dirty_page.page_id();
            self.write_page_raw(evicted_id, &dirty_page)?;
        }
        self.cache.mark_dirty(page_id);

        Ok(page)
    }
931
932 pub fn free_page(&self, page_id: u32) -> Result<(), PagerError> {
934 if self.config.read_only {
935 return Err(PagerError::ReadOnly);
936 }
937
938 self.cache.remove(page_id);
940
941 let mut freelist = self.freelist_write()?;
943 freelist.free(page_id);
944
945 *self.header_dirty_lock()? = true;
946
947 Ok(())
948 }
949
950 pub fn header(&self) -> Result<DatabaseHeader, PagerError> {
952 Ok(self.header_read()?.clone())
953 }
954
955 pub fn physical_header(&self) -> Result<PhysicalFileHeader, PagerError> {
956 Ok(self.header_read()?.physical)
957 }
958
959 pub fn update_physical_header(&self, physical: PhysicalFileHeader) -> Result<(), PagerError> {
960 if self.config.read_only {
961 return Err(PagerError::ReadOnly);
962 }
963
964 let mut header = self.header_write()?;
965 header.physical = physical;
966 *self.header_dirty_lock()? = true;
967 Ok(())
968 }
969
970 pub fn page_count(&self) -> Result<u32, PagerError> {
972 Ok(self.header_read()?.page_count)
973 }
974
975 pub fn set_wal_writer(&self, wal: Arc<Mutex<crate::storage::wal::writer::WalWriter>>) {
987 let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
988 *slot = Some(wal);
989 }
990
991 pub fn clear_wal_writer(&self) {
993 let mut slot = self.wal.write().unwrap_or_else(|p| p.into_inner());
994 *slot = None;
995 }
996
997 pub fn has_wal_writer(&self) -> bool {
999 self.wal.read().map(|s| s.is_some()).unwrap_or(false)
1000 }
1001
    /// Flushes all pending state: serializes a dirty freelist into trunk
    /// pages, forces the WAL up to the highest dirty-page LSN, writes dirty
    /// pages through the double-write buffer, and rewrites the header.
    ///
    /// Read-only pagers have nothing dirty and return `Ok(())` immediately.
    pub fn flush(&self) -> Result<(), PagerError> {
        if self.config.read_only {
            return Ok(());
        }

        // Serialize a dirty freelist into trunk pages; new trunk page ids
        // are allocated by bumping `page_count` under the header lock.
        let trunks = {
            let mut freelist = self.freelist_write()?;
            if freelist.is_dirty() {
                let mut header = self.header_write()?;
                let trunks = freelist.flush_to_trunks(0, || {
                    let id = header.page_count;
                    header.page_count += 1;
                    id
                });
                header.freelist_head = freelist.trunk_head();
                *self.header_dirty_lock()? = true;
                freelist.mark_clean();
                trunks
            } else {
                Vec::new()
            }
        };

        // Stage the trunk pages through the cache like any other write.
        for trunk in trunks {
            let page_id = trunk.page_id();
            self.cache.insert(page_id, trunk);
            self.cache.mark_dirty(page_id);
        }

        let dirty_pages = self.cache.flush_dirty();
        if !dirty_pages.is_empty() {
            // WAL-before-data: the log must be durable up to the highest LSN
            // among the dirty pages before those pages hit the main file.
            let max_lsn = dirty_pages
                .iter()
                .filter_map(|(_, page)| page.header().ok().map(|h| h.lsn))
                .max()
                .unwrap_or(0);
            if max_lsn > 0 {
                if let Ok(slot) = self.wal.read() {
                    if let Some(wal) = slot.as_ref() {
                        let wal = Arc::clone(wal);
                        // Drop the slot guard before taking the writer lock
                        // so we never hold both at once.
                        drop(slot);
                        let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
                        wal_guard.flush_until(max_lsn).map_err(PagerError::Io)?;
                    }
                }
            }
            self.write_pages_through_dwb(&dirty_pages)?;
        }

        if *self.header_dirty_lock()? {
            self.write_header()?;
        }

        Ok(())
    }
1068
1069 pub fn sync(&self) -> Result<(), PagerError> {
1071 self.flush()?;
1072
1073 let file = self.file_lock()?;
1074 file.sync_all()?;
1075
1076 Ok(())
1077 }
1078
    /// Returns the page cache's hit/miss statistics.
    pub fn cache_stats(&self) -> crate::storage::engine::page_cache::CacheStats {
        self.cache.stats()
    }
1083
    /// Returns the number of pages currently marked dirty in the cache.
    pub fn dirty_page_count(&self) -> usize {
        self.cache.dirty_count()
    }
1088
1089 pub fn dirty_fraction(&self) -> f64 {
1093 let capacity = self.cache.capacity().max(1) as f64;
1094 self.cache.dirty_count() as f64 / capacity
1095 }
1096
1097 pub fn flush_some_dirty(&self, max: usize) -> Result<usize, PagerError> {
1101 if self.config.read_only || max == 0 {
1102 return Ok(0);
1103 }
1104 let dirty_pages = self.cache.flush_some_dirty(max);
1105 if dirty_pages.is_empty() {
1106 return Ok(0);
1107 }
1108 let count = dirty_pages.len();
1109 for (page_id, page) in dirty_pages {
1116 self.write_page(page_id, page)?;
1117 }
1118 Ok(count)
1119 }
1120
    /// Returns the filesystem path of the database file.
    pub fn path(&self) -> &Path {
        &self.path
    }
1125
    /// Reports whether this pager was opened read-only.
    pub fn is_read_only(&self) -> bool {
        self.config.read_only
    }
1130
1131 pub fn file_size(&self) -> Result<u64, PagerError> {
1133 let file = self.file_lock()?;
1134 Ok(file.metadata()?.len())
1135 }
1136
    /// Best-effort OS read-ahead hint for `page_id`; any failure (including
    /// a poisoned file lock) is silently ignored.
    pub fn prefetch_hint(&self, page_id: u32) {
        if let Ok(file) = self.file_lock() {
            // NOTE(review): `prefetch_page` is handed the page index, not a
            // byte offset — confirm it multiplies by the page size internally.
            let _ = crate::storage::btree::prefetch::prefetch_page(
                &file,
                page_id as u64,
                PAGE_SIZE as u32,
            );
        }
    }
1153
1154 fn shadow_path(db_path: &Path) -> PathBuf {
1158 let mut p = db_path.to_path_buf().into_os_string();
1159 p.push("-hdr");
1160 PathBuf::from(p)
1161 }
1162
1163 fn meta_shadow_path(db_path: &Path) -> PathBuf {
1165 let mut p = db_path.to_path_buf().into_os_string();
1166 p.push("-meta");
1167 PathBuf::from(p)
1168 }
1169
1170 fn dwb_path(db_path: &Path) -> PathBuf {
1172 let mut p = db_path.to_path_buf().into_os_string();
1173 p.push("-dwb");
1174 PathBuf::from(p)
1175 }
1176
    /// Opens the double-write buffer file next to the database, creating it
    /// if missing. Deliberately never truncates: a non-empty DWB left behind
    /// by a crash still holds pages that must be replayed.
    fn open_dwb_file(db_path: &Path) -> Result<File, PagerError> {
        Ok(OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(Self::dwb_path(db_path))?)
    }
1189
    /// Empties the double-write buffer after its pages have been applied to
    /// the main file: truncate, rewind the cursor, then fsync the truncation
    /// so a crash cannot resurrect stale DWB contents.
    fn clear_dwb_file(file: &mut File) -> Result<(), PagerError> {
        file.set_len(0)?;
        file.seek(SeekFrom::Start(0))?;
        file.sync_all()?;
        Ok(())
    }
1197
1198 fn write_header_shadow(&self, page: &Page) -> Result<(), PagerError> {
1200 if self.config.read_only {
1201 return Ok(());
1202 }
1203 let shadow = Self::shadow_path(&self.path);
1204 let mut f = File::create(&shadow)?;
1205 f.write_all(page.as_bytes())?;
1206 f.sync_all()?;
1207 Ok(())
1208 }
1209
    /// Restores page 0 from the header shadow file after the on-disk copy
    /// failed to read or had bad magic.
    ///
    /// Validates the shadow's magic bytes, then (for writable pagers) writes
    /// the recovered page back to page 0 and fsyncs.
    ///
    /// # Errors
    /// `InvalidDatabase` when no shadow exists or the shadow's magic is also
    /// wrong — i.e. both copies of the header are unusable.
    fn recover_header_from_shadow(&self) -> Result<Page, PagerError> {
        let shadow = Self::shadow_path(&self.path);
        if !shadow.exists() {
            return Err(PagerError::InvalidDatabase(
                "Page 0 corrupted and no header shadow found".into(),
            ));
        }
        let mut f = File::open(&shadow)?;
        let mut buf = [0u8; PAGE_SIZE];
        f.read_exact(&mut buf)?;
        let page = Page::from_bytes(buf);

        // The shadow must itself look like a valid header page.
        let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
        if magic != MAGIC_BYTES {
            return Err(PagerError::InvalidDatabase(
                "Header shadow also corrupted".into(),
            ));
        }

        // Heal the main file in place when we are allowed to write.
        if !self.config.read_only {
            self.write_page_raw(0, &page)?;
            let file = self.file_lock()?;
            file.sync_all()?;
        }

        Ok(page)
    }
1240
1241 pub fn write_meta_shadow(&self, page: &Page) -> Result<(), PagerError> {
1243 if self.config.read_only {
1244 return Ok(());
1245 }
1246 let shadow = Self::meta_shadow_path(&self.path);
1247 let mut f = File::create(&shadow)?;
1248 f.write_all(page.as_bytes())?;
1249 f.sync_all()?;
1250 Ok(())
1251 }
1252
1253 pub fn recover_meta_from_shadow(&self) -> Result<Page, PagerError> {
1255 let shadow = Self::meta_shadow_path(&self.path);
1256 if !shadow.exists() {
1257 return Err(PagerError::InvalidDatabase(
1258 "Page 1 corrupted and no metadata shadow found".into(),
1259 ));
1260 }
1261 let mut f = File::open(&shadow)?;
1262 let mut buf = [0u8; PAGE_SIZE];
1263 f.read_exact(&mut buf)?;
1264 let page = Page::from_bytes(buf);
1265
1266 if !self.config.read_only {
1268 self.write_page_raw(1, &page)?;
1269 let file = self.file_lock()?;
1270 file.sync_all()?;
1271 }
1272
1273 Ok(page)
1274 }
1275
1276 fn write_pages_through_dwb(&self, pages: &[(u32, Page)]) -> Result<(), PagerError> {
1283 if let Some(dwb_mutex) = &self.dwb_file {
1284 let mut dwb = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1285
1286 let entry_size = 4 + PAGE_SIZE; let header_len = 4 + 4 + 4; let total = header_len + pages.len() * entry_size;
1291 let mut buf = Vec::with_capacity(total);
1292
1293 buf.extend_from_slice(&DWB_MAGIC);
1295 buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
1296 buf.extend_from_slice(&[0u8; 4]); for (page_id, page) in pages {
1300 buf.extend_from_slice(&page_id.to_le_bytes());
1301 buf.extend_from_slice(page.as_bytes());
1302 }
1303
1304 let checksum = super::super::crc32::crc32(&buf[header_len..]);
1306 buf[8..12].copy_from_slice(&checksum.to_le_bytes());
1307
1308 dwb.seek(SeekFrom::Start(0))?;
1310 dwb.write_all(&buf)?;
1311 dwb.set_len(buf.len() as u64)?;
1312 dwb.sync_all()?;
1313
1314 for (page_id, page) in pages {
1316 self.write_page_raw(*page_id, page)?;
1317 }
1318
1319 Self::clear_dwb_file(&mut dwb)?;
1321
1322 Ok(())
1323 } else {
1324 for (page_id, page) in pages {
1326 self.write_page_raw(*page_id, page)?;
1327 }
1328 Ok(())
1329 }
1330 }
1331
1332 fn recover_from_dwb(&self) -> Result<(), PagerError> {
1337 let dwb_path = Self::dwb_path(&self.path);
1338 if !dwb_path.exists() {
1339 return Ok(());
1340 }
1341
1342 if let Some(dwb_mutex) = &self.dwb_file {
1343 let mut file = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
1344 return self.recover_from_dwb_file(&mut file);
1345 }
1346
1347 let mut file = OpenOptions::new().read(true).write(true).open(&dwb_path)?;
1348 self.recover_from_dwb_file(&mut file)
1349 }
1350
1351 fn recover_from_dwb_file(&self, file: &mut File) -> Result<(), PagerError> {
1352 file.seek(SeekFrom::Start(0))?;
1353 let len = file.metadata()?.len();
1354 if len < 12 {
1355 return Self::clear_dwb_file(file);
1357 }
1358
1359 let mut buf = vec![0u8; len as usize];
1360 file.read_exact(&mut buf)?;
1361
1362 if buf[0..4] != DWB_MAGIC {
1364 return Self::clear_dwb_file(file);
1366 }
1367
1368 let count = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]) as usize;
1369 let stored_checksum = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);
1370
1371 let header_len = 12;
1372 let entry_size = 4 + PAGE_SIZE;
1373 let expected_len = header_len + count * entry_size;
1374
1375 if buf.len() < expected_len {
1376 return Self::clear_dwb_file(file);
1378 }
1379
1380 let computed = super::super::crc32::crc32(&buf[header_len..expected_len]);
1382 if computed != stored_checksum {
1383 return Self::clear_dwb_file(file);
1385 }
1386
1387 let mut offset = header_len;
1389 for _ in 0..count {
1390 let page_id = u32::from_le_bytes([
1391 buf[offset],
1392 buf[offset + 1],
1393 buf[offset + 2],
1394 buf[offset + 3],
1395 ]);
1396 offset += 4;
1397
1398 let mut page_data = [0u8; PAGE_SIZE];
1399 page_data.copy_from_slice(&buf[offset..offset + PAGE_SIZE]);
1400 offset += PAGE_SIZE;
1401
1402 let page = Page::from_bytes(page_data);
1403 self.write_page_raw(page_id, &page)?;
1404 }
1405
1406 {
1408 let file = self.file_lock()?;
1409 file.sync_all()?;
1410 }
1411
1412 Self::clear_dwb_file(file)
1413 }
1414
1415 pub fn write_header_and_sync(&self) -> Result<(), PagerError> {
1417 self.write_header()?;
1418 let file = self.file_lock()?;
1419 file.sync_all()?;
1420 Ok(())
1421 }
1422
1423 pub fn set_checkpoint_in_progress(
1425 &self,
1426 in_progress: bool,
1427 target_lsn: u64,
1428 ) -> Result<(), PagerError> {
1429 let mut header = self.header_write()?;
1430 header.checkpoint_in_progress = in_progress;
1431 header.checkpoint_target_lsn = target_lsn;
1432 *self.header_dirty_lock()? = true;
1433 drop(header);
1434 self.write_header_and_sync()
1435 }
1436
1437 pub fn complete_checkpoint(&self, lsn: u64) -> Result<(), PagerError> {
1439 let mut header = self.header_write()?;
1440 header.checkpoint_lsn = lsn;
1441 header.checkpoint_in_progress = false;
1442 header.checkpoint_target_lsn = 0;
1443 *self.header_dirty_lock()? = true;
1444 drop(header);
1445 self.write_header_and_sync()
1446 }
1447}
1448
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a throwaway database path unique per test, process, and run
    /// (name + pid + nanosecond timestamp) under the system temp dir.
    fn temp_db_path(name: &str) -> PathBuf {
        let file_name = format!(
            "reddb-pager-{}-{}-{}.rdb",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        );
        std::env::temp_dir().join(file_name)
    }

    #[test]
    fn open_refuses_future_database_version() {
        let path = temp_db_path("future-version");

        // Create a valid database, then close it.
        drop(Pager::open_default(&path).unwrap());

        // Forge a header whose version field is one past what this build
        // supports, with a valid checksum so only the version check fires.
        let mut future_header = Page::new_header_page(1);
        future_header.as_bytes_mut()[HEADER_SIZE + 4..HEADER_SIZE + 8]
            .copy_from_slice(&(DB_VERSION + 1).to_le_bytes());
        future_header.update_checksum();

        // Overwrite page 0 on disk with the forged header.
        {
            let mut file = OpenOptions::new().write(true).open(&path).unwrap();
            file.seek(SeekFrom::Start(0)).unwrap();
            file.write_all(future_header.as_bytes()).unwrap();
            file.sync_all().unwrap();
        }

        // Re-opening must fail with InvalidDatabase mentioning the version.
        match Pager::open_default(&path) {
            Err(PagerError::InvalidDatabase(msg)) => {
                assert!(msg.contains("newer than supported"));
            }
            Err(other) => panic!("expected InvalidDatabase, got {other:?}"),
            Ok(_) => panic!("future database version should be rejected"),
        }

        // Best-effort cleanup of the database and its sidecar files.
        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_file(Pager::shadow_path(&path));
        let _ = std::fs::remove_file(Pager::dwb_path(&path));
    }
}