1use crate::error::Result;
79use crate::storage::journal::{JournalRecord, JournalState, RollbackJournal};
80use crate::storage::wal::{VisibleWalState, WalFrame, WalRecord};
81use crate::storage::{
82 buffer_pool::BufferPool,
83 file_manager::{FileManager, FileManagerSnapshot},
84 Page, PageId, PagerIntegrityReport, DB_HEADER_PAGE_ID, STORAGE_METADATA_PAGE_ID,
85};
86use std::collections::{HashMap, HashSet};
87use std::ffi::OsString;
88use std::fs::{self, OpenOptions};
89use std::io::Write;
90use std::path::Path;
91use std::path::PathBuf;
92use std::sync::{Mutex, MutexGuard, OnceLock};
93
/// Strategy the pager uses to make transactions durable.
///
/// `Rollback` copies original page images into a rollback journal and writes pages
/// into the database file on commit; `Wal` routes committed changes through a
/// write-ahead log sidecar file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JournalMode {
    /// Rollback-journal mode (what a freshly constructed pager starts in).
    Rollback,
    /// Write-ahead-log mode.
    Wal,
}
99
100impl JournalMode {
101 fn parse(value: &str) -> Result<Self> {
102 match value {
103 "rollback" => Ok(Self::Rollback),
104 "wal" => Ok(Self::Wal),
105 _ => Err(crate::error::HematiteError::StorageError(format!(
106 "Unsupported pager journal mode '{}'",
107 value
108 ))),
109 }
110 }
111
112 fn as_str(self) -> &'static str {
113 match self {
114 Self::Rollback => "rollback",
115 Self::Wal => "wal",
116 }
117 }
118}
119
120#[derive(Debug, Clone)]
121struct PagerTransaction {
122 original_file_len: u64,
123 original_free_pages: Vec<PageId>,
124 original_checksums: HashMap<PageId, u32>,
125 wal_next_page_id: PageId,
126 wal_free_pages: Vec<PageId>,
127 journaled_pages: HashSet<PageId>,
128 page_records: Vec<JournalRecord>,
129}
130
131#[derive(Debug, Clone)]
132pub(crate) struct PagerSnapshot {
133 file_manager: FileManagerSnapshot,
134 buffer_pool: BufferPool,
135 dirty_pages: HashSet<PageId>,
136 page_checksums: HashMap<PageId, u32>,
137 transaction: Option<PagerTransaction>,
138}
139
/// Lock this pager instance currently holds in the process-wide lock registry.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PagerLockMode {
    /// No lock held.
    None,
    /// Shared read lock; `depth` counts nested `begin_read` calls.
    Shared { depth: usize },
    /// Write lock, held from `begin_transaction` until commit or rollback.
    Write,
}
146
/// Lock state shared by every pager in this process that uses the same database identity.
#[derive(Clone, Debug, Default)]
struct LockRegistryEntry {
    /// Number of pagers currently holding a shared lock.
    readers: usize,
    /// Whether some pager currently holds the write lock.
    writer: bool,
    /// WAL sequence numbers pinned by active readers, mapped to how many readers pin each.
    wal_reader_sequences: HashMap<u64, usize>,
}
153
154fn compact_transaction_free_pages(transaction: &mut PagerTransaction) {
155 transaction.wal_free_pages.sort_unstable();
156 transaction.wal_free_pages.dedup();
157 while let Some(&last_page_id) = transaction.wal_free_pages.last() {
158 if last_page_id + 1 != transaction.wal_next_page_id {
159 break;
160 }
161 transaction.wal_free_pages.pop();
162 transaction.wal_next_page_id = transaction.wal_next_page_id.saturating_sub(1);
163 }
164}
165
166fn lock_registry() -> &'static Mutex<HashMap<PathBuf, LockRegistryEntry>> {
167 static REGISTRY: OnceLock<Mutex<HashMap<PathBuf, LockRegistryEntry>>> = OnceLock::new();
168 REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
169}
170
171#[derive(Debug)]
172pub struct Pager {
173 file_manager: FileManager,
174 buffer_pool: BufferPool,
175 dirty_pages: HashSet<PageId>,
176 page_checksums: HashMap<PageId, u32>,
177 journal_mode: JournalMode,
178 checksum_store_path: Option<PathBuf>,
179 journal_path: Option<PathBuf>,
180 wal_path: Option<PathBuf>,
181 database_identity: Option<PathBuf>,
182 lock_mode: PagerLockMode,
183 wal_read_snapshot: Option<VisibleWalState>,
184 latest_wal_state: Option<VisibleWalState>,
185 transaction: Option<PagerTransaction>,
186 buffer_pool_capacity: usize,
187}
188
189impl Pager {
190 fn lock_registry_map(
191 &self,
192 ) -> Result<MutexGuard<'static, HashMap<PathBuf, LockRegistryEntry>>> {
193 lock_registry().lock().map_err(|_| {
194 crate::error::HematiteError::InternalError(
195 "Pager lock registry mutex is poisoned".to_string(),
196 )
197 })
198 }
199
200 fn database_identity_path(&self) -> Result<&PathBuf> {
201 self.database_identity.as_ref().ok_or_else(|| {
202 crate::error::HematiteError::InternalError(
203 "Pager database identity is not available".to_string(),
204 )
205 })
206 }
207
    /// Format version written as the first line (`version=N`) of the checksum metadata
    /// sidecar file; `load_persisted_state` rejects any other version.
    pub const CHECKSUM_METADATA_VERSION: u32 = 1;
209
210 pub fn new<P: AsRef<Path>>(path: P, cache_capacity: usize) -> Result<Self> {
211 let checksum_store_path = Some(Self::checksum_store_path(path.as_ref()));
212 let journal_path = Some(Self::journal_path(path.as_ref()));
213 let wal_path = Some(Self::wal_path(path.as_ref()));
214 let file_manager = FileManager::new(&path)?;
215 let database_identity = fs::canonicalize(path.as_ref())
216 .ok()
217 .or_else(|| Some(path.as_ref().to_path_buf()));
218 let mut pager = Self {
219 file_manager,
220 buffer_pool: BufferPool::new(cache_capacity),
221 dirty_pages: HashSet::new(),
222 page_checksums: HashMap::new(),
223 journal_mode: JournalMode::Rollback,
224 checksum_store_path,
225 journal_path,
226 wal_path,
227 database_identity,
228 lock_mode: PagerLockMode::None,
229 wal_read_snapshot: None,
230 latest_wal_state: None,
231 transaction: None,
232 buffer_pool_capacity: cache_capacity,
233 };
234 pager.recover_if_needed()?;
235 pager.load_persisted_state()?;
236 pager.load_latest_wal_state()?;
237 Ok(pager)
238 }
239
240 pub fn new_in_memory(cache_capacity: usize) -> Result<Self> {
241 let file_manager = FileManager::new_in_memory()?;
242 Ok(Self {
243 file_manager,
244 buffer_pool: BufferPool::new(cache_capacity),
245 dirty_pages: HashSet::new(),
246 page_checksums: HashMap::new(),
247 journal_mode: JournalMode::Rollback,
248 checksum_store_path: None,
249 journal_path: None,
250 wal_path: None,
251 database_identity: None,
252 lock_mode: PagerLockMode::None,
253 wal_read_snapshot: None,
254 latest_wal_state: None,
255 transaction: None,
256 buffer_pool_capacity: cache_capacity,
257 })
258 }
259
260 pub fn read_page(&mut self, page_id: PageId) -> Result<Page> {
261 if let Some(page) = self.buffer_pool.get(page_id) {
262 return Ok(page.clone());
263 }
264
265 if self.journal_mode == JournalMode::Wal {
266 if let Some(transaction) = &self.transaction {
267 if page_id >= self.file_manager.next_page_id()
268 && page_id < transaction.wal_next_page_id
269 {
270 let page = Page::new(page_id);
271 self.buffer_pool.put(page.clone());
272 return Ok(page);
273 }
274 }
275 }
276
277 if let Some(state) = self
278 .wal_read_snapshot
279 .as_ref()
280 .or(self.latest_wal_state.as_ref())
281 {
282 if let Some(data) = state.page_overrides.get(&page_id) {
283 let page = Page::from_bytes(page_id, data.clone())?;
284 if let Some(expected_checksum) = state.page_checksums.get(&page_id) {
285 let actual_checksum = Self::calculate_page_checksum(&page);
286 if actual_checksum != *expected_checksum {
287 return Err(crate::error::HematiteError::CorruptedData(format!(
288 "WAL page checksum mismatch for page {}: expected {}, got {}",
289 page_id, expected_checksum, actual_checksum
290 )));
291 }
292 }
293 self.buffer_pool.put(page.clone());
294 return Ok(page);
295 }
296 }
297
298 let page = self.file_manager.read_page(page_id)?;
299 let expected_checksum = self
300 .wal_read_snapshot
301 .as_ref()
302 .or(self.latest_wal_state.as_ref())
303 .and_then(|state| state.page_checksums.get(&page_id))
304 .or_else(|| self.page_checksums.get(&page_id));
305 if let Some(expected_checksum) = expected_checksum {
306 let actual_checksum = Self::calculate_page_checksum(&page);
307 if actual_checksum != *expected_checksum {
308 return Err(crate::error::HematiteError::CorruptedData(format!(
309 "Page checksum mismatch for page {}: expected {}, got {}",
310 page_id, expected_checksum, actual_checksum
311 )));
312 }
313 }
314 self.buffer_pool.put(page.clone());
315 Ok(page)
316 }
317
318 pub fn write_page(&mut self, page: Page) -> Result<()> {
319 let page_id = page.id;
320 self.snapshot_original_page(page_id)?;
321 if page_id != STORAGE_METADATA_PAGE_ID {
322 self.page_checksums
323 .insert(page_id, Self::calculate_page_checksum(&page));
324 }
325 self.buffer_pool.put(page);
326 self.dirty_pages.insert(page_id);
327 Ok(())
328 }
329
330 pub fn allocate_page(&mut self) -> Result<PageId> {
331 if self.journal_mode == JournalMode::Wal {
332 if let Some(transaction) = &mut self.transaction {
333 if let Some(page_id) = transaction.wal_free_pages.pop() {
334 return Ok(page_id);
335 }
336 let page_id = transaction.wal_next_page_id;
337 transaction.wal_next_page_id += 1;
338 return Ok(page_id);
339 }
340 }
341 self.file_manager.allocate_page()
342 }
343
344 pub fn deallocate_page(&mut self, page_id: PageId) -> Result<()> {
345 self.snapshot_original_page(page_id)?;
346 self.buffer_pool.remove(page_id);
347 self.dirty_pages.remove(&page_id);
348 self.page_checksums.remove(&page_id);
349 if self.journal_mode == JournalMode::Wal {
350 if let Some(transaction) = &mut self.transaction {
351 if !transaction.wal_free_pages.contains(&page_id) {
352 transaction.wal_free_pages.push(page_id);
353 }
354 compact_transaction_free_pages(transaction);
355 return Ok(());
356 }
357 self.file_manager.deallocate_page_deferred(page_id);
358 Ok(())
359 } else {
360 self.file_manager.deallocate_page(page_id)
361 }
362 }
363
364 pub fn flush(&mut self) -> Result<()> {
365 if self.journal_mode == JournalMode::Wal && self.transaction.is_some() {
366 return Err(crate::error::HematiteError::StorageError(
367 "Cannot flush pager pages directly during an active WAL transaction".to_string(),
368 ));
369 }
370
371 let dirty_ids = self.dirty_pages.iter().copied().collect::<Vec<_>>();
372 let mut metadata_page_dirty = false;
373
374 for page_id in dirty_ids.iter().copied() {
375 if page_id == STORAGE_METADATA_PAGE_ID {
376 metadata_page_dirty = true;
377 continue;
378 }
379
380 if let Some(page) = self.buffer_pool.get(page_id) {
381 self.file_manager.write_page(page)?;
382 }
383 self.dirty_pages.remove(&page_id);
384 }
385
386 if metadata_page_dirty {
388 if let Some(page) = self.buffer_pool.get(STORAGE_METADATA_PAGE_ID) {
389 self.file_manager.write_page(page)?;
390 }
391 self.dirty_pages.remove(&STORAGE_METADATA_PAGE_ID);
392 }
393 self.file_manager.flush()?;
394 self.persist_checksums()
395 }
396
397 pub fn begin_transaction(&mut self) -> Result<()> {
398 if self.transaction.is_some() {
399 return Err(crate::error::HematiteError::StorageError(
400 "Pager transaction is already active".to_string(),
401 ));
402 }
403
404 self.acquire_write_lock()?;
405
406 let transaction = PagerTransaction {
407 original_file_len: self.file_manager.file_len()?,
408 original_free_pages: self.file_manager.free_pages().to_vec(),
409 original_checksums: self.page_checksums.clone(),
410 wal_next_page_id: self.file_manager.next_page_id(),
411 wal_free_pages: self.file_manager.free_pages().to_vec(),
412 journaled_pages: HashSet::new(),
413 page_records: Vec::new(),
414 };
415 self.transaction = Some(transaction);
416 if self.journal_mode == JournalMode::Rollback {
417 self.persist_journal(JournalState::Active)?;
418 }
419 Ok(())
420 }
421
422 pub fn commit_transaction(&mut self) -> Result<()> {
423 if self.transaction.is_none() {
424 return Err(crate::error::HematiteError::StorageError(
425 "Pager transaction is not active".to_string(),
426 ));
427 }
428
429 if self.journal_mode == JournalMode::Wal {
430 self.commit_wal_transaction()?;
431 if self.can_checkpoint_wal()? {
432 self.checkpoint_wal_unlocked()?;
433 }
434 } else {
435 self.flush()?;
436 self.persist_journal(JournalState::Committed)?;
437 }
438 self.remove_journal_file()?;
439 self.transaction = None;
440 self.release_write_lock()?;
441 Ok(())
442 }
443
444 pub fn rollback_transaction(&mut self) -> Result<()> {
445 if self.transaction.is_none() {
446 return Err(crate::error::HematiteError::StorageError(
447 "Pager transaction is not active".to_string(),
448 ));
449 }
450
451 if self.journal_mode == JournalMode::Wal {
452 self.rollback_wal_transaction()?;
453 } else {
454 self.rollback_from_active_transaction()?;
455 self.remove_journal_file()?;
456 }
457 self.transaction = None;
458 self.release_write_lock()?;
459 Ok(())
460 }
461
462 pub fn transaction_active(&self) -> bool {
463 self.transaction.is_some()
464 }
465
466 pub(crate) fn snapshot(&self) -> Result<PagerSnapshot> {
467 Ok(PagerSnapshot {
468 file_manager: self.file_manager.snapshot()?,
469 buffer_pool: self.buffer_pool.clone(),
470 dirty_pages: self.dirty_pages.clone(),
471 page_checksums: self.page_checksums.clone(),
472 transaction: self.transaction.clone(),
473 })
474 }
475
476 pub(crate) fn restore_snapshot(&mut self, snapshot: PagerSnapshot) -> Result<()> {
477 self.file_manager.restore_snapshot(snapshot.file_manager)?;
478 self.buffer_pool = snapshot.buffer_pool;
479 self.dirty_pages = snapshot.dirty_pages;
480 self.page_checksums = snapshot.page_checksums;
481 self.transaction = snapshot.transaction;
482 Ok(())
483 }
484
485 pub fn begin_read(&mut self) -> Result<()> {
486 let previous_lock_mode = self.lock_mode;
487 self.acquire_shared_lock()?;
488 if let Err(err) = self.refresh_persisted_view() {
489 let _ = self.release_shared_lock();
490 return Err(err);
491 }
492 if self.journal_mode == JournalMode::Wal {
493 if matches!(previous_lock_mode, PagerLockMode::Write) {
494 return Ok(());
495 }
496 if matches!(previous_lock_mode, PagerLockMode::Shared { .. }) {
497 return Ok(());
498 }
499 let snapshot = self.snapshot_wal_visible_state()?;
500 self.register_wal_reader_sequence(snapshot.visible_sequence)?;
501 self.wal_read_snapshot = Some(snapshot);
502 }
503 Ok(())
504 }
505
506 pub fn end_read(&mut self) -> Result<()> {
507 if matches!(self.lock_mode, PagerLockMode::Shared { depth: 1 }) {
508 if let Some(snapshot) = &self.wal_read_snapshot {
509 self.unregister_wal_reader_sequence(snapshot.visible_sequence)?;
510 }
511 }
512 self.wal_read_snapshot = None;
513 self.release_shared_lock()
514 }
515
516 pub fn free_pages(&self) -> &[PageId] {
517 self.file_manager.free_pages()
518 }
519
520 pub fn set_free_pages(&mut self, free_pages: Vec<PageId>) {
521 self.file_manager.set_free_pages(free_pages);
522 }
523
524 pub fn checksum_entries(&self) -> Vec<(PageId, u32)> {
525 self.page_checksums
526 .iter()
527 .map(|(page_id, checksum)| (*page_id, *checksum))
528 .collect()
529 }
530
531 pub fn journal_mode(&self) -> JournalMode {
532 self.journal_mode
533 }
534
535 pub fn set_journal_mode(&mut self, journal_mode: JournalMode) -> Result<()> {
536 if self.transaction.is_some() {
537 return Err(crate::error::HematiteError::StorageError(
538 "Cannot change pager journal mode during an active transaction".to_string(),
539 ));
540 }
541 if self.journal_mode == journal_mode {
542 return Ok(());
543 }
544 if self.journal_mode == JournalMode::Wal && journal_mode == JournalMode::Rollback {
545 if !self.can_checkpoint_wal()? {
546 return Err(crate::error::HematiteError::StorageError(
547 "Cannot switch from WAL while readers are active".to_string(),
548 ));
549 }
550 self.checkpoint_wal_unlocked()?;
551 }
552 if journal_mode == JournalMode::Rollback {
553 self.remove_wal_file()?;
554 self.latest_wal_state = None;
555 self.wal_read_snapshot = None;
556 } else {
557 self.remove_journal_file()?;
558 }
559 self.journal_mode = journal_mode;
560 if journal_mode == JournalMode::Wal {
561 self.load_latest_wal_state()?;
562 }
563 self.persist_checksums()
564 }
565
566 pub fn checkpoint_wal(&mut self) -> Result<()> {
567 if self.journal_mode != JournalMode::Wal {
568 return Ok(());
569 }
570 if self.transaction.is_some() {
571 return Err(crate::error::HematiteError::StorageError(
572 "Cannot checkpoint WAL during an active transaction".to_string(),
573 ));
574 }
575 if !self.can_checkpoint_wal()? {
576 return Err(crate::error::HematiteError::StorageError(
577 "Cannot checkpoint WAL while readers are active".to_string(),
578 ));
579 }
580 self.checkpoint_wal_unlocked()
581 }
582
583 pub fn replace_checksums(&mut self, checksums: HashMap<PageId, u32>) {
584 self.page_checksums = checksums;
585 }
586
587 pub fn file_len(&self) -> Result<u64> {
588 self.file_manager.file_len()
589 }
590
591 pub fn allocated_page_count(&self) -> usize {
592 self.file_manager.allocated_page_count()
593 }
594
595 pub fn fragmented_free_page_count(&self) -> usize {
596 self.file_manager.fragmented_free_page_count()
597 }
598
599 pub fn trailing_free_page_count(&self) -> usize {
600 self.file_manager.trailing_free_page_count()
601 }
602
603 pub fn validate_integrity(&mut self) -> Result<PagerIntegrityReport> {
604 let (max_page_id_exclusive, logical_free_pages, logical_checksums, wal_overrides) =
605 if let Some(state) = &self.latest_wal_state {
606 let page_regions =
607 state.file_len.saturating_sub(64) / crate::storage::PAGE_SIZE as u64;
608 (
609 (page_regions as u32).max(2),
610 state.free_pages.clone(),
611 state.page_checksums.clone(),
612 state.page_overrides.clone(),
613 )
614 } else {
615 (
616 self.file_manager.next_page_id(),
617 self.file_manager.free_pages().to_vec(),
618 self.page_checksums.clone(),
619 HashMap::new(),
620 )
621 };
622
623 let mut free_pages = HashSet::new();
624
625 for &page_id in &logical_free_pages {
626 if page_id == DB_HEADER_PAGE_ID || page_id == STORAGE_METADATA_PAGE_ID {
627 return Err(crate::error::HematiteError::CorruptedData(format!(
628 "Reserved page {} cannot be marked free",
629 page_id
630 )));
631 }
632
633 if page_id >= max_page_id_exclusive {
634 return Err(crate::error::HematiteError::CorruptedData(format!(
635 "Free page {} exceeds allocated page range (next_page_id={})",
636 page_id, max_page_id_exclusive
637 )));
638 }
639
640 if !free_pages.insert(page_id) {
641 return Err(crate::error::HematiteError::CorruptedData(format!(
642 "Duplicate free page {} detected",
643 page_id
644 )));
645 }
646 }
647
648 if logical_checksums.contains_key(&STORAGE_METADATA_PAGE_ID) {
649 return Err(crate::error::HematiteError::CorruptedData(format!(
650 "Storage metadata page {} must not have pager checksum metadata",
651 STORAGE_METADATA_PAGE_ID
652 )));
653 }
654
655 let checksummed_pages = logical_checksums.into_iter().collect::<Vec<_>>();
656 let checksummed_page_count = checksummed_pages.len();
657
658 let mut verified_checksum_pages = 0usize;
659 for (page_id, expected_checksum) in checksummed_pages {
660 if page_id >= max_page_id_exclusive {
661 return Err(crate::error::HematiteError::CorruptedData(format!(
662 "Checksum entry for page {} exceeds allocated page range (next_page_id={})",
663 page_id, max_page_id_exclusive
664 )));
665 }
666
667 if free_pages.contains(&page_id) {
668 return Err(crate::error::HematiteError::CorruptedData(format!(
669 "Page {} has checksum metadata but is marked free",
670 page_id
671 )));
672 }
673
674 let page = if self.dirty_pages.contains(&page_id) {
675 self.buffer_pool.get(page_id).cloned().ok_or_else(|| {
676 crate::error::HematiteError::StorageError(format!(
677 "Dirty page {} missing from buffer pool",
678 page_id
679 ))
680 })?
681 } else if let Some(data) = wal_overrides.get(&page_id) {
682 Page::from_bytes(page_id, data.clone())?
683 } else {
684 self.file_manager.read_page(page_id)?
685 };
686
687 let actual_checksum = Self::calculate_page_checksum(&page);
688 if actual_checksum != expected_checksum {
689 return Err(crate::error::HematiteError::CorruptedData(format!(
690 "Page checksum mismatch for page {}: expected {}, got {}",
691 page_id, expected_checksum, actual_checksum
692 )));
693 }
694
695 verified_checksum_pages += 1;
696 }
697
698 Ok(PagerIntegrityReport {
699 allocated_page_count: self.file_manager.allocated_page_count(),
700 free_page_count: free_pages.len(),
701 fragmented_free_page_count: self.file_manager.fragmented_free_page_count(),
702 trailing_free_page_count: self.file_manager.trailing_free_page_count(),
703 checksummed_page_count,
704 verified_checksum_pages,
705 })
706 }
707
708 fn calculate_page_checksum(page: &Page) -> u32 {
709 let mut hash: u32 = 0x811C9DC5;
710 for byte in &page.data {
711 hash ^= u32::from(*byte);
712 hash = hash.wrapping_mul(0x01000193);
713 }
714 hash
715 }
716
717 #[cfg(test)]
718 pub(crate) fn dirty_page_count(&self) -> usize {
719 self.dirty_pages.len()
720 }
721
722 #[cfg(test)]
723 pub(crate) fn wal_snapshot_sequence(&self) -> Option<u64> {
724 self.wal_read_snapshot
725 .as_ref()
726 .map(|snapshot| snapshot.visible_sequence)
727 }
728
729 fn checksum_store_path(db_path: &Path) -> PathBuf {
730 let mut file_name = db_path
731 .file_name()
732 .map(OsString::from)
733 .unwrap_or_else(|| OsString::from("hematite.db"));
734 file_name.push(".pager_checksums");
735 match db_path.parent() {
736 Some(parent) => parent.join(file_name),
737 None => PathBuf::from(file_name),
738 }
739 }
740
741 fn acquire_shared_lock(&mut self) -> Result<()> {
742 if self.database_identity.is_none() {
743 return Ok(());
744 }
745
746 match self.lock_mode {
747 PagerLockMode::Write if self.journal_mode == JournalMode::Wal => return Ok(()),
748 PagerLockMode::Write => return Ok(()),
749 PagerLockMode::Shared { depth } => {
750 self.lock_mode = PagerLockMode::Shared { depth: depth + 1 };
751 return Ok(());
752 }
753 PagerLockMode::None => {}
754 }
755
756 let path = self.database_identity_path()?.clone();
757 let mut registry = self.lock_registry_map()?;
758 let entry = registry.entry(path).or_default();
759 if entry.writer && self.journal_mode == JournalMode::Rollback {
760 return Err(crate::error::HematiteError::StorageError(
761 "database is locked for writing".to_string(),
762 ));
763 }
764 entry.readers += 1;
765 self.lock_mode = PagerLockMode::Shared { depth: 1 };
766 Ok(())
767 }
768
769 fn release_shared_lock(&mut self) -> Result<()> {
770 let Some(path) = self.database_identity.as_ref() else {
771 return Ok(());
772 };
773
774 match self.lock_mode {
775 PagerLockMode::Write | PagerLockMode::None => return Ok(()),
776 PagerLockMode::Shared { depth } if depth > 1 => {
777 self.lock_mode = PagerLockMode::Shared { depth: depth - 1 };
778 return Ok(());
779 }
780 PagerLockMode::Shared { .. } => {}
781 }
782
783 let mut registry = self.lock_registry_map()?;
784 if let Some(entry) = registry.get_mut(path) {
785 entry.readers = entry.readers.saturating_sub(1);
786 if entry.readers == 0 && !entry.writer {
787 registry.remove(path);
788 }
789 }
790 self.lock_mode = PagerLockMode::None;
791 Ok(())
792 }
793
794 fn register_wal_reader_sequence(&self, sequence: u64) -> Result<()> {
795 let Some(path) = self.database_identity.as_ref() else {
796 return Ok(());
797 };
798 let mut registry = self.lock_registry_map()?;
799 let entry = registry.entry(path.clone()).or_default();
800 *entry.wal_reader_sequences.entry(sequence).or_insert(0) += 1;
801 Ok(())
802 }
803
804 fn unregister_wal_reader_sequence(&self, sequence: u64) -> Result<()> {
805 let Some(path) = self.database_identity.as_ref() else {
806 return Ok(());
807 };
808 let mut registry = self.lock_registry_map()?;
809 if let Some(entry) = registry.get_mut(path) {
810 if let Some(count) = entry.wal_reader_sequences.get_mut(&sequence) {
811 *count = count.saturating_sub(1);
812 if *count == 0 {
813 entry.wal_reader_sequences.remove(&sequence);
814 }
815 }
816 if entry.readers == 0 && !entry.writer && entry.wal_reader_sequences.is_empty() {
817 registry.remove(path);
818 }
819 }
820 Ok(())
821 }
822
823 fn acquire_write_lock(&mut self) -> Result<()> {
824 if self.database_identity.is_none() {
825 self.lock_mode = PagerLockMode::Write;
826 return Ok(());
827 }
828 if self.lock_mode == PagerLockMode::Write {
829 return Ok(());
830 }
831 if matches!(self.lock_mode, PagerLockMode::Shared { .. }) {
832 return Err(crate::error::HematiteError::StorageError(
833 "cannot upgrade a shared database lock to a write lock".to_string(),
834 ));
835 }
836
837 let path = self.database_identity_path()?.clone();
838 let mut registry = self.lock_registry_map()?;
839 let entry = registry.entry(path).or_default();
840 if entry.writer || (self.journal_mode == JournalMode::Rollback && entry.readers > 0) {
841 return Err(crate::error::HematiteError::StorageError(
842 "database is locked".to_string(),
843 ));
844 }
845 entry.writer = true;
846 self.lock_mode = PagerLockMode::Write;
847 Ok(())
848 }
849
850 fn release_write_lock(&mut self) -> Result<()> {
851 let Some(path) = self.database_identity.as_ref() else {
852 self.lock_mode = PagerLockMode::None;
853 return Ok(());
854 };
855 if self.lock_mode != PagerLockMode::Write {
856 return Ok(());
857 }
858
859 let mut registry = self.lock_registry_map()?;
860 if let Some(entry) = registry.get_mut(path) {
861 entry.writer = false;
862 if entry.readers == 0 {
863 registry.remove(path);
864 }
865 }
866 self.lock_mode = PagerLockMode::None;
867 Ok(())
868 }
869
870 fn journal_path(db_path: &Path) -> PathBuf {
871 let mut file_name = db_path
872 .file_name()
873 .map(OsString::from)
874 .unwrap_or_else(|| OsString::from("hematite.db"));
875 file_name.push(".journal");
876 match db_path.parent() {
877 Some(parent) => parent.join(file_name),
878 None => PathBuf::from(file_name),
879 }
880 }
881
882 fn wal_path(db_path: &Path) -> PathBuf {
883 let mut file_name = db_path
884 .file_name()
885 .map(OsString::from)
886 .unwrap_or_else(|| OsString::from("hematite.db"));
887 file_name.push(".wal");
888 match db_path.parent() {
889 Some(parent) => parent.join(file_name),
890 None => PathBuf::from(file_name),
891 }
892 }
893
894 fn load_persisted_state(&mut self) -> Result<()> {
895 let Some(path) = &self.checksum_store_path else {
896 return Ok(());
897 };
898
899 let contents = match fs::read_to_string(path) {
900 Ok(contents) => contents,
901 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
902 Err(err) => return Err(err.into()),
903 };
904
905 let mut lines = contents.lines();
906 let version = lines
907 .next()
908 .ok_or_else(|| {
909 crate::error::HematiteError::StorageError(
910 "Missing pager checksum metadata version".to_string(),
911 )
912 })?
913 .strip_prefix("version=")
914 .ok_or_else(|| {
915 crate::error::HematiteError::StorageError(
916 "Pager checksum metadata is missing version prefix".to_string(),
917 )
918 })?
919 .parse::<u32>()
920 .map_err(|_| {
921 crate::error::HematiteError::StorageError(
922 "Invalid pager checksum metadata version".to_string(),
923 )
924 })?;
925
926 if version != Self::CHECKSUM_METADATA_VERSION {
927 return Err(crate::error::HematiteError::StorageError(format!(
928 "Unsupported pager checksum metadata version: expected {}, got {}",
929 Self::CHECKSUM_METADATA_VERSION,
930 version
931 )));
932 }
933
934 let mut next_line = lines.next().ok_or_else(|| {
935 crate::error::HematiteError::StorageError(
936 "Missing pager freelist metadata count".to_string(),
937 )
938 })?;
939
940 if let Some(mode) = next_line.strip_prefix("journal_mode=") {
941 self.journal_mode = JournalMode::parse(mode)?;
942 next_line = lines.next().ok_or_else(|| {
943 crate::error::HematiteError::StorageError(
944 "Missing pager freelist metadata count".to_string(),
945 )
946 })?;
947 } else {
948 self.journal_mode = JournalMode::Rollback;
949 }
950
951 let expected_free_count = next_line
952 .strip_prefix("free_count=")
953 .ok_or_else(|| {
954 crate::error::HematiteError::StorageError(
955 "Pager freelist metadata is missing count prefix".to_string(),
956 )
957 })?
958 .parse::<usize>()
959 .map_err(|_| {
960 crate::error::HematiteError::StorageError(
961 "Invalid pager freelist metadata count".to_string(),
962 )
963 })?;
964
965 let mut free_pages = Vec::with_capacity(expected_free_count);
966 for _ in 0..expected_free_count {
967 let line = lines.next().ok_or_else(|| {
968 crate::error::HematiteError::StorageError(
969 "Pager freelist metadata ended early".to_string(),
970 )
971 })?;
972 let page_id = line
973 .strip_prefix("free|")
974 .ok_or_else(|| {
975 crate::error::HematiteError::StorageError(
976 "Invalid pager freelist metadata record".to_string(),
977 )
978 })?
979 .parse::<u32>()
980 .map(|page_id| page_id)
981 .map_err(|_| {
982 crate::error::HematiteError::StorageError(
983 "Invalid pager freelist page id".to_string(),
984 )
985 })?;
986 free_pages.push(page_id);
987 }
988
989 let expected_count = lines
990 .next()
991 .ok_or_else(|| {
992 crate::error::HematiteError::StorageError(
993 "Missing pager checksum metadata count".to_string(),
994 )
995 })?
996 .strip_prefix("checksum_count=")
997 .ok_or_else(|| {
998 crate::error::HematiteError::StorageError(
999 "Pager checksum metadata is missing count prefix".to_string(),
1000 )
1001 })?
1002 .parse::<usize>()
1003 .map_err(|_| {
1004 crate::error::HematiteError::StorageError(
1005 "Invalid pager checksum metadata count".to_string(),
1006 )
1007 })?;
1008
1009 let mut checksums = HashMap::new();
1010 for line in lines {
1011 if line.is_empty() {
1012 continue;
1013 }
1014 let payload = line.strip_prefix("checksum|").ok_or_else(|| {
1015 crate::error::HematiteError::StorageError(
1016 "Invalid pager checksum metadata record".to_string(),
1017 )
1018 })?;
1019 let parts = payload.split('|').collect::<Vec<_>>();
1020 if parts.len() != 2 {
1021 return Err(crate::error::HematiteError::StorageError(
1022 "Invalid pager checksum metadata record".to_string(),
1023 ));
1024 }
1025 let page_id = parts[0].parse::<u32>().map_err(|_| {
1026 crate::error::HematiteError::StorageError(
1027 "Invalid pager checksum page id".to_string(),
1028 )
1029 })?;
1030 let checksum = parts[1].parse::<u32>().map_err(|_| {
1031 crate::error::HematiteError::StorageError(
1032 "Invalid pager checksum value".to_string(),
1033 )
1034 })?;
1035 if checksums.insert(page_id, checksum).is_some() {
1036 return Err(crate::error::HematiteError::StorageError(format!(
1037 "Duplicate pager checksum entry for page {}",
1038 page_id
1039 )));
1040 }
1041 }
1042
1043 if checksums.len() != expected_count {
1044 return Err(crate::error::HematiteError::StorageError(format!(
1045 "Pager checksum metadata count mismatch: expected {}, got {}",
1046 expected_count,
1047 checksums.len()
1048 )));
1049 }
1050
1051 self.file_manager.set_free_pages(free_pages);
1052 self.page_checksums = checksums;
1053 Ok(())
1054 }
1055
1056 fn refresh_persisted_view(&mut self) -> Result<()> {
1057 if self.transaction.is_some() || !self.dirty_pages.is_empty() {
1058 return Ok(());
1059 }
1060
1061 self.buffer_pool = BufferPool::new(self.buffer_pool_capacity);
1062 self.load_persisted_state()?;
1063 self.load_latest_wal_state()
1064 }
1065
1066 fn snapshot_wal_visible_state(&mut self) -> Result<VisibleWalState> {
1067 if let Some(state) = &self.latest_wal_state {
1068 return Ok(state.clone());
1069 }
1070
1071 Ok(VisibleWalState {
1072 visible_sequence: 0,
1073 file_len: self.file_manager.file_len()?,
1074 free_pages: self.file_manager.free_pages().to_vec(),
1075 page_checksums: self.page_checksums.clone(),
1076 page_overrides: HashMap::new(),
1077 })
1078 }
1079
1080 fn load_latest_wal_state(&mut self) -> Result<()> {
1081 if self.journal_mode != JournalMode::Wal {
1082 self.latest_wal_state = None;
1083 return Ok(());
1084 }
1085
1086 let Some(path) = &self.wal_path else {
1087 self.latest_wal_state = None;
1088 return Ok(());
1089 };
1090
1091 self.latest_wal_state = WalRecord::load_visible_state_from_path(path)?;
1092 Ok(())
1093 }
1094
1095 fn persist_checksums(&self) -> Result<()> {
1096 let Some(path) = &self.checksum_store_path else {
1097 return Ok(());
1098 };
1099
1100 let mut entries = self
1101 .page_checksums
1102 .iter()
1103 .map(|(page_id, checksum)| (*page_id, *checksum))
1104 .collect::<Vec<_>>();
1105 entries.sort_by_key(|(page_id, _)| *page_id);
1106
1107 let mut lines = vec![
1108 format!("version={}", Self::CHECKSUM_METADATA_VERSION),
1109 format!("journal_mode={}", self.journal_mode.as_str()),
1110 format!("free_count={}", self.file_manager.free_pages().len()),
1111 ];
1112 for page_id in self.file_manager.free_pages() {
1113 lines.push(format!("free|{}", page_id));
1114 }
1115 lines.push(format!("checksum_count={}", entries.len()));
1116 for (page_id, checksum) in entries {
1117 lines.push(format!("checksum|{}|{}", page_id, checksum));
1118 }
1119
1120 fs::write(path, lines.join("\n"))?;
1121 Ok(())
1122 }
1123
1124 fn snapshot_original_page(&mut self, page_id: PageId) -> Result<()> {
1125 if self.journal_mode == JournalMode::Wal {
1126 return Ok(());
1127 }
1128
1129 let Some(transaction) = &mut self.transaction else {
1130 return Ok(());
1131 };
1132
1133 if transaction.journaled_pages.contains(&page_id) {
1134 return Ok(());
1135 }
1136
1137 let page_end = 64 + ((page_id as u64 + 1) * crate::storage::PAGE_SIZE as u64);
1138 if page_end > transaction.original_file_len {
1139 return Ok(());
1140 }
1141
1142 let page = self.file_manager.read_page(page_id)?;
1143 transaction.page_records.push(JournalRecord {
1144 page_id,
1145 data: page.data,
1146 });
1147 transaction.journaled_pages.insert(page_id);
1148 self.persist_journal(JournalState::Active)
1149 }
1150
1151 fn persist_journal(&self, state: JournalState) -> Result<()> {
1152 let Some(transaction) = &self.transaction else {
1153 return Ok(());
1154 };
1155 let Some(path) = &self.journal_path else {
1156 return Ok(());
1157 };
1158
1159 let journal = RollbackJournal {
1160 state,
1161 original_file_len: transaction.original_file_len,
1162 original_free_pages: transaction.original_free_pages.clone(),
1163 original_checksums: transaction
1164 .original_checksums
1165 .iter()
1166 .map(|(page_id, checksum)| (*page_id, *checksum))
1167 .collect(),
1168 page_records: transaction.page_records.clone(),
1169 };
1170 let bytes = journal.encode()?;
1171 let mut file = OpenOptions::new()
1172 .create(true)
1173 .write(true)
1174 .truncate(true)
1175 .open(path)?;
1176 file.write_all(&bytes)?;
1177 file.sync_all()?;
1178 Ok(())
1179 }
1180
1181 fn remove_journal_file(&self) -> Result<()> {
1182 let Some(path) = &self.journal_path else {
1183 return Ok(());
1184 };
1185 match fs::remove_file(path) {
1186 Ok(()) => Ok(()),
1187 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
1188 Err(err) => Err(err.into()),
1189 }
1190 }
1191
1192 fn remove_wal_file(&self) -> Result<()> {
1193 let Some(path) = &self.wal_path else {
1194 return Ok(());
1195 };
1196 match fs::remove_file(path) {
1197 Ok(()) => Ok(()),
1198 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
1199 Err(err) => Err(err.into()),
1200 }
1201 }
1202
1203 fn recover_if_needed(&mut self) -> Result<()> {
1204 let Some(path) = &self.journal_path else {
1205 return Ok(());
1206 };
1207 let bytes = match fs::read(path) {
1208 Ok(bytes) => bytes,
1209 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
1210 Err(err) => return Err(err.into()),
1211 };
1212
1213 let journal = RollbackJournal::decode(&bytes)?;
1214 match journal.state {
1215 JournalState::Active => {
1216 self.restore_from_journal(&journal)?;
1217 self.remove_journal_file()?;
1218 }
1219 JournalState::Committed => {
1220 self.remove_journal_file()?;
1221 }
1222 }
1223 Ok(())
1224 }
1225
1226 fn rollback_from_active_transaction(&mut self) -> Result<()> {
1227 let transaction = self.transaction.clone().ok_or_else(|| {
1228 crate::error::HematiteError::StorageError("Pager transaction is not active".to_string())
1229 })?;
1230 let journal = RollbackJournal {
1231 state: JournalState::Active,
1232 original_file_len: transaction.original_file_len,
1233 original_free_pages: transaction.original_free_pages,
1234 original_checksums: transaction
1235 .original_checksums
1236 .into_iter()
1237 .collect::<Vec<_>>(),
1238 page_records: transaction.page_records,
1239 };
1240 self.restore_from_journal(&journal)
1241 }
1242
1243 fn restore_from_journal(&mut self, journal: &RollbackJournal) -> Result<()> {
1244 self.buffer_pool = BufferPool::new(self.buffer_pool_capacity);
1245 self.dirty_pages.clear();
1246 self.file_manager
1247 .restore_file_len(journal.original_file_len)?;
1248 self.file_manager
1249 .set_free_pages(journal.original_free_pages.clone());
1250
1251 for record in &journal.page_records {
1252 let page = Page::from_bytes(record.page_id, record.data.clone())?;
1253 self.file_manager.write_page(&page)?;
1254 }
1255 self.file_manager.flush()?;
1256
1257 self.page_checksums = journal.original_checksums.iter().copied().collect();
1258 self.persist_checksums()
1259 }
1260
1261 fn rollback_wal_transaction(&mut self) -> Result<()> {
1262 let transaction = self.transaction.clone().ok_or_else(|| {
1263 crate::error::HematiteError::StorageError("Pager transaction is not active".to_string())
1264 })?;
1265 self.buffer_pool = BufferPool::new(self.buffer_pool_capacity);
1266 self.dirty_pages.clear();
1267 self.page_checksums = transaction.original_checksums;
1268 self.load_latest_wal_state()
1269 }
1270
1271 fn commit_wal_transaction(&mut self) -> Result<()> {
1272 let transaction = self.transaction.as_ref().ok_or_else(|| {
1273 crate::error::HematiteError::StorageError("Pager transaction is not active".to_string())
1274 })?;
1275 let next_sequence = self
1276 .latest_wal_state
1277 .as_ref()
1278 .map(|state| state.visible_sequence + 1)
1279 .unwrap_or(1);
1280
1281 let mut page_ids = self.dirty_pages.iter().copied().collect::<Vec<_>>();
1282 page_ids.sort_unstable();
1283
1284 let mut frames = Vec::with_capacity(page_ids.len());
1285 for page_id in page_ids {
1286 let page = self.buffer_pool.get(page_id).cloned().ok_or_else(|| {
1287 crate::error::HematiteError::StorageError(format!(
1288 "Dirty page {} missing from buffer pool",
1289 page_id
1290 ))
1291 })?;
1292 frames.push(WalFrame {
1293 page_id,
1294 data: page.data,
1295 });
1296 }
1297
1298 let mut checksums = self
1299 .page_checksums
1300 .iter()
1301 .map(|(page_id, checksum)| (*page_id, *checksum))
1302 .collect::<Vec<_>>();
1303 checksums.sort_by_key(|(page_id, _)| *page_id);
1304
1305 let record = WalRecord {
1306 sequence: next_sequence,
1307 file_len: 64 + transaction.wal_next_page_id as u64 * crate::storage::PAGE_SIZE as u64,
1308 free_pages: transaction.wal_free_pages.clone(),
1309 checksums,
1310 frames,
1311 };
1312
1313 self.append_wal_record(record)?;
1314 self.dirty_pages.clear();
1315 self.persist_checksums()
1316 }
1317
1318 fn append_wal_record(&mut self, record: WalRecord) -> Result<()> {
1319 if let Some(path) = &self.wal_path {
1320 WalRecord::append_to_path(path, &record)?;
1321 } else {
1322 self.latest_wal_state = Some(VisibleWalState {
1323 visible_sequence: record.sequence,
1324 file_len: record.file_len,
1325 free_pages: record.free_pages.clone(),
1326 page_checksums: record.checksums.iter().copied().collect(),
1327 page_overrides: record
1328 .frames
1329 .iter()
1330 .map(|frame| (frame.page_id, frame.data.clone()))
1331 .collect(),
1332 });
1333 return Ok(());
1334 }
1335
1336 self.load_latest_wal_state()
1337 }
1338
1339 fn can_checkpoint_wal(&self) -> Result<bool> {
1340 if self.database_identity.is_none() {
1341 return Ok(true);
1342 }
1343
1344 let path = self.database_identity_path()?;
1345 let registry = self.lock_registry_map()?;
1346 let Some(entry) = registry.get(path) else {
1347 return Ok(true);
1348 };
1349
1350 if entry.writer && self.lock_mode != PagerLockMode::Write {
1351 return Ok(false);
1352 }
1353 if entry.readers == 0 {
1354 return Ok(true);
1355 }
1356 let latest_sequence = self
1357 .latest_wal_state
1358 .as_ref()
1359 .map(|state| state.visible_sequence)
1360 .unwrap_or(0);
1361 Ok(entry
1362 .wal_reader_sequences
1363 .keys()
1364 .all(|sequence| *sequence == latest_sequence))
1365 }
1366
1367 fn checkpoint_wal_unlocked(&mut self) -> Result<()> {
1368 let Some(state) = self.latest_wal_state.clone() else {
1369 self.remove_wal_file()?;
1370 return Ok(());
1371 };
1372
1373 self.file_manager.restore_file_len(state.file_len)?;
1374 self.file_manager.set_free_pages(state.free_pages.clone());
1375 self.file_manager.compact_free_pages()?;
1376 for (page_id, data) in &state.page_overrides {
1377 let page = Page::from_bytes(*page_id, data.clone())?;
1378 self.file_manager.write_page(&page)?;
1379 }
1380 self.file_manager.flush()?;
1381 self.page_checksums = state.page_checksums;
1382 self.latest_wal_state = None;
1383 self.wal_read_snapshot = None;
1384 self.remove_wal_file()?;
1385 self.persist_checksums()
1386 }
1387}
1388
1389impl Drop for Pager {
1390 fn drop(&mut self) {
1391 match self.lock_mode {
1392 PagerLockMode::Write => {
1393 let _ = self.release_write_lock();
1394 }
1395 PagerLockMode::Shared { .. } => {
1396 let _ = self.release_shared_lock();
1397 }
1398 PagerLockMode::None => {}
1399 }
1400 }
1401}