Skip to main content

hematite/storage/
pager.rs

1//! Pager implementation.
2//!
3//! The pager is the only stateful component in the storage layer. It presents the rest of the
4//! system with a logical database made of fixed-size pages and hides the machinery required to
5//! make those pages durable, reusable, and transactionally visible.
6//!
7//! Main file layout:
8//!
9//! ```text
10//! byte offset 0
11//! +-------------------------------+
12//! | 64-byte file header region    |
13//! +-------------------------------+
14//! | logical page 0  | db header   |
15//! +-----------------+-------------+
16//! | logical page 1  | metadata    |
17//! +-----------------+-------------+
18//! | logical page 2+ | payload     |
19//! +-----------------+-------------+
20//! ```
21//!
22//! Core state inside the pager:
23//!
24//! ```text
25//!                  caller
26//!                    |
27//!                    v
28//!      +----------------------------------+
29//!      | read/write/allocate/deallocate    |
30//!      +----------------------------------+
31//!                    |
32//!      +-------------+--------------+
33//!      |                            |
34//!      v                            v
35//! buffer pool                 transaction state
36//! dirty page ids              original pages / WAL frames
37//! checksum cache              free-page deltas / file-len deltas
38//!      |                            |
39//!      +-------------+--------------+
40//!                    |
41//!                    v
42//!              file manager
43//! ```
44//!
45//! Commit algorithms:
46//!
47//! Rollback mode:
48//! - capture the original page image before first write;
49//! - persist the rollback journal;
50//! - flush dirty main-file pages;
51//! - finalize by deleting the journal.
52//!
53//! WAL mode:
54//! - keep page mutations local to the transaction;
55//! - append a committed record containing page frames plus pager-visible metadata;
56//! - reconstruct reader-visible state by overlaying WAL frames on the main file;
57//! - checkpoint later by copying the visible state back into the main file.
58//!
59//! Reader visibility in WAL mode:
60//!
61//! ```text
62//! main-file page bytes
63//!        +
64//! last committed WAL sequence visible to this reader
65//!        +
66//! WAL frame overrides + checksum overrides + freelist snapshot
67//!        =
68//! effective database image
69//! ```
70//!
71//! Important invariants:
72//! - page allocation and freelist state must stay consistent with both the durable file and any
73//!   in-flight transaction state;
74//! - checksum metadata is part of the durable storage model, not optional verification data;
75//! - checkpoints cannot discard page images that are still needed by an active reader snapshot;
76//! - higher layers never see partial page writes or raw filesystem ordering concerns.
77
78use crate::error::Result;
79use crate::storage::journal::{JournalRecord, JournalState, RollbackJournal};
80use crate::storage::wal::{VisibleWalState, WalFrame, WalRecord};
81use crate::storage::{
82    buffer_pool::BufferPool,
83    file_manager::{FileManager, FileManagerSnapshot},
84    Page, PageId, PagerIntegrityReport, DB_HEADER_PAGE_ID, STORAGE_METADATA_PAGE_ID,
85};
86use std::collections::{HashMap, HashSet};
87use std::ffi::OsString;
88use std::fs::{self, OpenOptions};
89use std::io::Write;
90use std::path::Path;
91use std::path::PathBuf;
92use std::sync::{Mutex, MutexGuard, OnceLock};
93
/// Durability strategy the pager uses to make transactions atomic.
///
/// The textual forms understood by `JournalMode::parse` and produced by `JournalMode::as_str`
/// are `"rollback"` and `"wal"`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JournalMode {
    /// Capture original page images in a rollback journal, write pages in place in the main
    /// file, then delete the journal once the commit is durable.
    Rollback,
    /// Append committed page frames to a write-ahead log; the main file is updated later by
    /// checkpoints.
    Wal,
}
99
100impl JournalMode {
101    fn parse(value: &str) -> Result<Self> {
102        match value {
103            "rollback" => Ok(Self::Rollback),
104            "wal" => Ok(Self::Wal),
105            _ => Err(crate::error::HematiteError::StorageError(format!(
106                "Unsupported pager journal mode '{}'",
107                value
108            ))),
109        }
110    }
111
112    fn as_str(self) -> &'static str {
113        match self {
114            Self::Rollback => "rollback",
115            Self::Wal => "wal",
116        }
117    }
118}
119
120#[derive(Debug, Clone)]
121struct PagerTransaction {
122    original_file_len: u64,
123    original_free_pages: Vec<PageId>,
124    original_checksums: HashMap<PageId, u32>,
125    wal_next_page_id: PageId,
126    wal_free_pages: Vec<PageId>,
127    journaled_pages: HashSet<PageId>,
128    page_records: Vec<JournalRecord>,
129}
130
131#[derive(Debug, Clone)]
132pub(crate) struct PagerSnapshot {
133    file_manager: FileManagerSnapshot,
134    buffer_pool: BufferPool,
135    dirty_pages: HashSet<PageId>,
136    page_checksums: HashMap<PageId, u32>,
137    transaction: Option<PagerTransaction>,
138}
139
/// Lock currently held by one pager instance.
///
/// For file-backed pagers, `Shared` and `Write` are mirrored in the process-wide lock registry.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PagerLockMode {
    /// No lock held.
    None,
    /// Shared (read) lock; `depth` counts nested `begin_read` calls.
    Shared { depth: usize },
    /// Write lock taken by `begin_transaction`.
    Write,
}
146
/// Lock bookkeeping for one database, shared by every pager in this process whose database
/// identity maps to the same registry key.
#[derive(Clone, Debug, Default)]
struct LockRegistryEntry {
    /// Number of pagers currently holding a shared lock.
    readers: usize,
    /// Whether some pager currently holds the write lock.
    writer: bool,
    /// WAL mode: visible WAL sequence -> number of readers whose snapshot is pinned to it.
    wal_reader_sequences: HashMap<u64, usize>,
}
153
154fn compact_transaction_free_pages(transaction: &mut PagerTransaction) {
155    transaction.wal_free_pages.sort_unstable();
156    transaction.wal_free_pages.dedup();
157    while let Some(&last_page_id) = transaction.wal_free_pages.last() {
158        if last_page_id + 1 != transaction.wal_next_page_id {
159            break;
160        }
161        transaction.wal_free_pages.pop();
162        transaction.wal_next_page_id = transaction.wal_next_page_id.saturating_sub(1);
163    }
164}
165
166fn lock_registry() -> &'static Mutex<HashMap<PathBuf, LockRegistryEntry>> {
167    static REGISTRY: OnceLock<Mutex<HashMap<PathBuf, LockRegistryEntry>>> = OnceLock::new();
168    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
169}
170
171#[derive(Debug)]
172pub struct Pager {
173    file_manager: FileManager,
174    buffer_pool: BufferPool,
175    dirty_pages: HashSet<PageId>,
176    page_checksums: HashMap<PageId, u32>,
177    journal_mode: JournalMode,
178    checksum_store_path: Option<PathBuf>,
179    journal_path: Option<PathBuf>,
180    wal_path: Option<PathBuf>,
181    database_identity: Option<PathBuf>,
182    lock_mode: PagerLockMode,
183    wal_read_snapshot: Option<VisibleWalState>,
184    latest_wal_state: Option<VisibleWalState>,
185    transaction: Option<PagerTransaction>,
186    buffer_pool_capacity: usize,
187}
188
189impl Pager {
190    fn lock_registry_map(
191        &self,
192    ) -> Result<MutexGuard<'static, HashMap<PathBuf, LockRegistryEntry>>> {
193        lock_registry().lock().map_err(|_| {
194            crate::error::HematiteError::InternalError(
195                "Pager lock registry mutex is poisoned".to_string(),
196            )
197        })
198    }
199
200    fn database_identity_path(&self) -> Result<&PathBuf> {
201        self.database_identity.as_ref().ok_or_else(|| {
202            crate::error::HematiteError::InternalError(
203                "Pager database identity is not available".to_string(),
204            )
205        })
206    }
207
208    pub const CHECKSUM_METADATA_VERSION: u32 = 1;
209
210    pub fn new<P: AsRef<Path>>(path: P, cache_capacity: usize) -> Result<Self> {
211        let checksum_store_path = Some(Self::checksum_store_path(path.as_ref()));
212        let journal_path = Some(Self::journal_path(path.as_ref()));
213        let wal_path = Some(Self::wal_path(path.as_ref()));
214        let file_manager = FileManager::new(&path)?;
215        let database_identity = fs::canonicalize(path.as_ref())
216            .ok()
217            .or_else(|| Some(path.as_ref().to_path_buf()));
218        let mut pager = Self {
219            file_manager,
220            buffer_pool: BufferPool::new(cache_capacity),
221            dirty_pages: HashSet::new(),
222            page_checksums: HashMap::new(),
223            journal_mode: JournalMode::Rollback,
224            checksum_store_path,
225            journal_path,
226            wal_path,
227            database_identity,
228            lock_mode: PagerLockMode::None,
229            wal_read_snapshot: None,
230            latest_wal_state: None,
231            transaction: None,
232            buffer_pool_capacity: cache_capacity,
233        };
234        pager.recover_if_needed()?;
235        pager.load_persisted_state()?;
236        pager.load_latest_wal_state()?;
237        Ok(pager)
238    }
239
240    pub fn new_in_memory(cache_capacity: usize) -> Result<Self> {
241        let file_manager = FileManager::new_in_memory()?;
242        Ok(Self {
243            file_manager,
244            buffer_pool: BufferPool::new(cache_capacity),
245            dirty_pages: HashSet::new(),
246            page_checksums: HashMap::new(),
247            journal_mode: JournalMode::Rollback,
248            checksum_store_path: None,
249            journal_path: None,
250            wal_path: None,
251            database_identity: None,
252            lock_mode: PagerLockMode::None,
253            wal_read_snapshot: None,
254            latest_wal_state: None,
255            transaction: None,
256            buffer_pool_capacity: cache_capacity,
257        })
258    }
259
260    pub fn read_page(&mut self, page_id: PageId) -> Result<Page> {
261        if let Some(page) = self.buffer_pool.get(page_id) {
262            return Ok(page.clone());
263        }
264
265        if self.journal_mode == JournalMode::Wal {
266            if let Some(transaction) = &self.transaction {
267                if page_id >= self.file_manager.next_page_id()
268                    && page_id < transaction.wal_next_page_id
269                {
270                    let page = Page::new(page_id);
271                    self.buffer_pool.put(page.clone());
272                    return Ok(page);
273                }
274            }
275        }
276
277        if let Some(state) = self
278            .wal_read_snapshot
279            .as_ref()
280            .or(self.latest_wal_state.as_ref())
281        {
282            if let Some(data) = state.page_overrides.get(&page_id) {
283                let page = Page::from_bytes(page_id, data.clone())?;
284                if let Some(expected_checksum) = state.page_checksums.get(&page_id) {
285                    let actual_checksum = Self::calculate_page_checksum(&page);
286                    if actual_checksum != *expected_checksum {
287                        return Err(crate::error::HematiteError::CorruptedData(format!(
288                            "WAL page checksum mismatch for page {}: expected {}, got {}",
289                            page_id, expected_checksum, actual_checksum
290                        )));
291                    }
292                }
293                self.buffer_pool.put(page.clone());
294                return Ok(page);
295            }
296        }
297
298        let page = self.file_manager.read_page(page_id)?;
299        let expected_checksum = self
300            .wal_read_snapshot
301            .as_ref()
302            .or(self.latest_wal_state.as_ref())
303            .and_then(|state| state.page_checksums.get(&page_id))
304            .or_else(|| self.page_checksums.get(&page_id));
305        if let Some(expected_checksum) = expected_checksum {
306            let actual_checksum = Self::calculate_page_checksum(&page);
307            if actual_checksum != *expected_checksum {
308                return Err(crate::error::HematiteError::CorruptedData(format!(
309                    "Page checksum mismatch for page {}: expected {}, got {}",
310                    page_id, expected_checksum, actual_checksum
311                )));
312            }
313        }
314        self.buffer_pool.put(page.clone());
315        Ok(page)
316    }
317
318    pub fn write_page(&mut self, page: Page) -> Result<()> {
319        let page_id = page.id;
320        self.snapshot_original_page(page_id)?;
321        if page_id != STORAGE_METADATA_PAGE_ID {
322            self.page_checksums
323                .insert(page_id, Self::calculate_page_checksum(&page));
324        }
325        self.buffer_pool.put(page);
326        self.dirty_pages.insert(page_id);
327        Ok(())
328    }
329
330    pub fn allocate_page(&mut self) -> Result<PageId> {
331        if self.journal_mode == JournalMode::Wal {
332            if let Some(transaction) = &mut self.transaction {
333                if let Some(page_id) = transaction.wal_free_pages.pop() {
334                    return Ok(page_id);
335                }
336                let page_id = transaction.wal_next_page_id;
337                transaction.wal_next_page_id += 1;
338                return Ok(page_id);
339            }
340        }
341        self.file_manager.allocate_page()
342    }
343
344    pub fn deallocate_page(&mut self, page_id: PageId) -> Result<()> {
345        self.snapshot_original_page(page_id)?;
346        self.buffer_pool.remove(page_id);
347        self.dirty_pages.remove(&page_id);
348        self.page_checksums.remove(&page_id);
349        if self.journal_mode == JournalMode::Wal {
350            if let Some(transaction) = &mut self.transaction {
351                if !transaction.wal_free_pages.contains(&page_id) {
352                    transaction.wal_free_pages.push(page_id);
353                }
354                compact_transaction_free_pages(transaction);
355                return Ok(());
356            }
357            self.file_manager.deallocate_page_deferred(page_id);
358            Ok(())
359        } else {
360            self.file_manager.deallocate_page(page_id)
361        }
362    }
363
364    pub fn flush(&mut self) -> Result<()> {
365        if self.journal_mode == JournalMode::Wal && self.transaction.is_some() {
366            return Err(crate::error::HematiteError::StorageError(
367                "Cannot flush pager pages directly during an active WAL transaction".to_string(),
368            ));
369        }
370
371        let dirty_ids = self.dirty_pages.iter().copied().collect::<Vec<_>>();
372        let mut metadata_page_dirty = false;
373
374        for page_id in dirty_ids.iter().copied() {
375            if page_id == STORAGE_METADATA_PAGE_ID {
376                metadata_page_dirty = true;
377                continue;
378            }
379
380            if let Some(page) = self.buffer_pool.get(page_id) {
381                self.file_manager.write_page(page)?;
382            }
383            self.dirty_pages.remove(&page_id);
384        }
385
386        // Metadata is written last so it cannot describe page state that has not reached disk.
387        if metadata_page_dirty {
388            if let Some(page) = self.buffer_pool.get(STORAGE_METADATA_PAGE_ID) {
389                self.file_manager.write_page(page)?;
390            }
391            self.dirty_pages.remove(&STORAGE_METADATA_PAGE_ID);
392        }
393        self.file_manager.flush()?;
394        self.persist_checksums()
395    }
396
397    pub fn begin_transaction(&mut self) -> Result<()> {
398        if self.transaction.is_some() {
399            return Err(crate::error::HematiteError::StorageError(
400                "Pager transaction is already active".to_string(),
401            ));
402        }
403
404        self.acquire_write_lock()?;
405
406        let transaction = PagerTransaction {
407            original_file_len: self.file_manager.file_len()?,
408            original_free_pages: self.file_manager.free_pages().to_vec(),
409            original_checksums: self.page_checksums.clone(),
410            wal_next_page_id: self.file_manager.next_page_id(),
411            wal_free_pages: self.file_manager.free_pages().to_vec(),
412            journaled_pages: HashSet::new(),
413            page_records: Vec::new(),
414        };
415        self.transaction = Some(transaction);
416        if self.journal_mode == JournalMode::Rollback {
417            self.persist_journal(JournalState::Active)?;
418        }
419        Ok(())
420    }
421
422    pub fn commit_transaction(&mut self) -> Result<()> {
423        if self.transaction.is_none() {
424            return Err(crate::error::HematiteError::StorageError(
425                "Pager transaction is not active".to_string(),
426            ));
427        }
428
429        if self.journal_mode == JournalMode::Wal {
430            self.commit_wal_transaction()?;
431            if self.can_checkpoint_wal()? {
432                self.checkpoint_wal_unlocked()?;
433            }
434        } else {
435            self.flush()?;
436            self.persist_journal(JournalState::Committed)?;
437        }
438        self.remove_journal_file()?;
439        self.transaction = None;
440        self.release_write_lock()?;
441        Ok(())
442    }
443
444    pub fn rollback_transaction(&mut self) -> Result<()> {
445        if self.transaction.is_none() {
446            return Err(crate::error::HematiteError::StorageError(
447                "Pager transaction is not active".to_string(),
448            ));
449        }
450
451        if self.journal_mode == JournalMode::Wal {
452            self.rollback_wal_transaction()?;
453        } else {
454            self.rollback_from_active_transaction()?;
455            self.remove_journal_file()?;
456        }
457        self.transaction = None;
458        self.release_write_lock()?;
459        Ok(())
460    }
461
462    pub fn transaction_active(&self) -> bool {
463        self.transaction.is_some()
464    }
465
466    pub(crate) fn snapshot(&self) -> Result<PagerSnapshot> {
467        Ok(PagerSnapshot {
468            file_manager: self.file_manager.snapshot()?,
469            buffer_pool: self.buffer_pool.clone(),
470            dirty_pages: self.dirty_pages.clone(),
471            page_checksums: self.page_checksums.clone(),
472            transaction: self.transaction.clone(),
473        })
474    }
475
476    pub(crate) fn restore_snapshot(&mut self, snapshot: PagerSnapshot) -> Result<()> {
477        self.file_manager.restore_snapshot(snapshot.file_manager)?;
478        self.buffer_pool = snapshot.buffer_pool;
479        self.dirty_pages = snapshot.dirty_pages;
480        self.page_checksums = snapshot.page_checksums;
481        self.transaction = snapshot.transaction;
482        Ok(())
483    }
484
485    pub fn begin_read(&mut self) -> Result<()> {
486        let previous_lock_mode = self.lock_mode;
487        self.acquire_shared_lock()?;
488        if let Err(err) = self.refresh_persisted_view() {
489            let _ = self.release_shared_lock();
490            return Err(err);
491        }
492        if self.journal_mode == JournalMode::Wal {
493            if matches!(previous_lock_mode, PagerLockMode::Write) {
494                return Ok(());
495            }
496            if matches!(previous_lock_mode, PagerLockMode::Shared { .. }) {
497                return Ok(());
498            }
499            let snapshot = self.snapshot_wal_visible_state()?;
500            self.register_wal_reader_sequence(snapshot.visible_sequence)?;
501            self.wal_read_snapshot = Some(snapshot);
502        }
503        Ok(())
504    }
505
506    pub fn end_read(&mut self) -> Result<()> {
507        if matches!(self.lock_mode, PagerLockMode::Shared { depth: 1 }) {
508            if let Some(snapshot) = &self.wal_read_snapshot {
509                self.unregister_wal_reader_sequence(snapshot.visible_sequence)?;
510            }
511        }
512        self.wal_read_snapshot = None;
513        self.release_shared_lock()
514    }
515
516    pub fn free_pages(&self) -> &[PageId] {
517        self.file_manager.free_pages()
518    }
519
520    pub fn set_free_pages(&mut self, free_pages: Vec<PageId>) {
521        self.file_manager.set_free_pages(free_pages);
522    }
523
524    pub fn checksum_entries(&self) -> Vec<(PageId, u32)> {
525        self.page_checksums
526            .iter()
527            .map(|(page_id, checksum)| (*page_id, *checksum))
528            .collect()
529    }
530
531    pub fn journal_mode(&self) -> JournalMode {
532        self.journal_mode
533    }
534
535    pub fn set_journal_mode(&mut self, journal_mode: JournalMode) -> Result<()> {
536        if self.transaction.is_some() {
537            return Err(crate::error::HematiteError::StorageError(
538                "Cannot change pager journal mode during an active transaction".to_string(),
539            ));
540        }
541        if self.journal_mode == journal_mode {
542            return Ok(());
543        }
544        if self.journal_mode == JournalMode::Wal && journal_mode == JournalMode::Rollback {
545            if !self.can_checkpoint_wal()? {
546                return Err(crate::error::HematiteError::StorageError(
547                    "Cannot switch from WAL while readers are active".to_string(),
548                ));
549            }
550            self.checkpoint_wal_unlocked()?;
551        }
552        if journal_mode == JournalMode::Rollback {
553            self.remove_wal_file()?;
554            self.latest_wal_state = None;
555            self.wal_read_snapshot = None;
556        } else {
557            self.remove_journal_file()?;
558        }
559        self.journal_mode = journal_mode;
560        if journal_mode == JournalMode::Wal {
561            self.load_latest_wal_state()?;
562        }
563        self.persist_checksums()
564    }
565
566    pub fn checkpoint_wal(&mut self) -> Result<()> {
567        if self.journal_mode != JournalMode::Wal {
568            return Ok(());
569        }
570        if self.transaction.is_some() {
571            return Err(crate::error::HematiteError::StorageError(
572                "Cannot checkpoint WAL during an active transaction".to_string(),
573            ));
574        }
575        if !self.can_checkpoint_wal()? {
576            return Err(crate::error::HematiteError::StorageError(
577                "Cannot checkpoint WAL while readers are active".to_string(),
578            ));
579        }
580        self.checkpoint_wal_unlocked()
581    }
582
583    pub fn replace_checksums(&mut self, checksums: HashMap<PageId, u32>) {
584        self.page_checksums = checksums;
585    }
586
587    pub fn file_len(&self) -> Result<u64> {
588        self.file_manager.file_len()
589    }
590
591    pub fn allocated_page_count(&self) -> usize {
592        self.file_manager.allocated_page_count()
593    }
594
595    pub fn fragmented_free_page_count(&self) -> usize {
596        self.file_manager.fragmented_free_page_count()
597    }
598
599    pub fn trailing_free_page_count(&self) -> usize {
600        self.file_manager.trailing_free_page_count()
601    }
602
603    pub fn validate_integrity(&mut self) -> Result<PagerIntegrityReport> {
604        let (max_page_id_exclusive, logical_free_pages, logical_checksums, wal_overrides) =
605            if let Some(state) = &self.latest_wal_state {
606                let page_regions =
607                    state.file_len.saturating_sub(64) / crate::storage::PAGE_SIZE as u64;
608                (
609                    (page_regions as u32).max(2),
610                    state.free_pages.clone(),
611                    state.page_checksums.clone(),
612                    state.page_overrides.clone(),
613                )
614            } else {
615                (
616                    self.file_manager.next_page_id(),
617                    self.file_manager.free_pages().to_vec(),
618                    self.page_checksums.clone(),
619                    HashMap::new(),
620                )
621            };
622
623        let mut free_pages = HashSet::new();
624
625        for &page_id in &logical_free_pages {
626            if page_id == DB_HEADER_PAGE_ID || page_id == STORAGE_METADATA_PAGE_ID {
627                return Err(crate::error::HematiteError::CorruptedData(format!(
628                    "Reserved page {} cannot be marked free",
629                    page_id
630                )));
631            }
632
633            if page_id >= max_page_id_exclusive {
634                return Err(crate::error::HematiteError::CorruptedData(format!(
635                    "Free page {} exceeds allocated page range (next_page_id={})",
636                    page_id, max_page_id_exclusive
637                )));
638            }
639
640            if !free_pages.insert(page_id) {
641                return Err(crate::error::HematiteError::CorruptedData(format!(
642                    "Duplicate free page {} detected",
643                    page_id
644                )));
645            }
646        }
647
648        if logical_checksums.contains_key(&STORAGE_METADATA_PAGE_ID) {
649            return Err(crate::error::HematiteError::CorruptedData(format!(
650                "Storage metadata page {} must not have pager checksum metadata",
651                STORAGE_METADATA_PAGE_ID
652            )));
653        }
654
655        let checksummed_pages = logical_checksums.into_iter().collect::<Vec<_>>();
656        let checksummed_page_count = checksummed_pages.len();
657
658        let mut verified_checksum_pages = 0usize;
659        for (page_id, expected_checksum) in checksummed_pages {
660            if page_id >= max_page_id_exclusive {
661                return Err(crate::error::HematiteError::CorruptedData(format!(
662                    "Checksum entry for page {} exceeds allocated page range (next_page_id={})",
663                    page_id, max_page_id_exclusive
664                )));
665            }
666
667            if free_pages.contains(&page_id) {
668                return Err(crate::error::HematiteError::CorruptedData(format!(
669                    "Page {} has checksum metadata but is marked free",
670                    page_id
671                )));
672            }
673
674            let page = if self.dirty_pages.contains(&page_id) {
675                self.buffer_pool.get(page_id).cloned().ok_or_else(|| {
676                    crate::error::HematiteError::StorageError(format!(
677                        "Dirty page {} missing from buffer pool",
678                        page_id
679                    ))
680                })?
681            } else if let Some(data) = wal_overrides.get(&page_id) {
682                Page::from_bytes(page_id, data.clone())?
683            } else {
684                self.file_manager.read_page(page_id)?
685            };
686
687            let actual_checksum = Self::calculate_page_checksum(&page);
688            if actual_checksum != expected_checksum {
689                return Err(crate::error::HematiteError::CorruptedData(format!(
690                    "Page checksum mismatch for page {}: expected {}, got {}",
691                    page_id, expected_checksum, actual_checksum
692                )));
693            }
694
695            verified_checksum_pages += 1;
696        }
697
698        Ok(PagerIntegrityReport {
699            allocated_page_count: self.file_manager.allocated_page_count(),
700            free_page_count: free_pages.len(),
701            fragmented_free_page_count: self.file_manager.fragmented_free_page_count(),
702            trailing_free_page_count: self.file_manager.trailing_free_page_count(),
703            checksummed_page_count,
704            verified_checksum_pages,
705        })
706    }
707
708    fn calculate_page_checksum(page: &Page) -> u32 {
709        let mut hash: u32 = 0x811C9DC5;
710        for byte in &page.data {
711            hash ^= u32::from(*byte);
712            hash = hash.wrapping_mul(0x01000193);
713        }
714        hash
715    }
716
717    #[cfg(test)]
718    pub(crate) fn dirty_page_count(&self) -> usize {
719        self.dirty_pages.len()
720    }
721
722    #[cfg(test)]
723    pub(crate) fn wal_snapshot_sequence(&self) -> Option<u64> {
724        self.wal_read_snapshot
725            .as_ref()
726            .map(|snapshot| snapshot.visible_sequence)
727    }
728
729    fn checksum_store_path(db_path: &Path) -> PathBuf {
730        let mut file_name = db_path
731            .file_name()
732            .map(OsString::from)
733            .unwrap_or_else(|| OsString::from("hematite.db"));
734        file_name.push(".pager_checksums");
735        match db_path.parent() {
736            Some(parent) => parent.join(file_name),
737            None => PathBuf::from(file_name),
738        }
739    }
740
741    fn acquire_shared_lock(&mut self) -> Result<()> {
742        if self.database_identity.is_none() {
743            return Ok(());
744        }
745
746        match self.lock_mode {
747            PagerLockMode::Write if self.journal_mode == JournalMode::Wal => return Ok(()),
748            PagerLockMode::Write => return Ok(()),
749            PagerLockMode::Shared { depth } => {
750                self.lock_mode = PagerLockMode::Shared { depth: depth + 1 };
751                return Ok(());
752            }
753            PagerLockMode::None => {}
754        }
755
756        let path = self.database_identity_path()?.clone();
757        let mut registry = self.lock_registry_map()?;
758        let entry = registry.entry(path).or_default();
759        if entry.writer && self.journal_mode == JournalMode::Rollback {
760            return Err(crate::error::HematiteError::StorageError(
761                "database is locked for writing".to_string(),
762            ));
763        }
764        entry.readers += 1;
765        self.lock_mode = PagerLockMode::Shared { depth: 1 };
766        Ok(())
767    }
768
769    fn release_shared_lock(&mut self) -> Result<()> {
770        let Some(path) = self.database_identity.as_ref() else {
771            return Ok(());
772        };
773
774        match self.lock_mode {
775            PagerLockMode::Write | PagerLockMode::None => return Ok(()),
776            PagerLockMode::Shared { depth } if depth > 1 => {
777                self.lock_mode = PagerLockMode::Shared { depth: depth - 1 };
778                return Ok(());
779            }
780            PagerLockMode::Shared { .. } => {}
781        }
782
783        let mut registry = self.lock_registry_map()?;
784        if let Some(entry) = registry.get_mut(path) {
785            entry.readers = entry.readers.saturating_sub(1);
786            if entry.readers == 0 && !entry.writer {
787                registry.remove(path);
788            }
789        }
790        self.lock_mode = PagerLockMode::None;
791        Ok(())
792    }
793
794    fn register_wal_reader_sequence(&self, sequence: u64) -> Result<()> {
795        let Some(path) = self.database_identity.as_ref() else {
796            return Ok(());
797        };
798        let mut registry = self.lock_registry_map()?;
799        let entry = registry.entry(path.clone()).or_default();
800        *entry.wal_reader_sequences.entry(sequence).or_insert(0) += 1;
801        Ok(())
802    }
803
804    fn unregister_wal_reader_sequence(&self, sequence: u64) -> Result<()> {
805        let Some(path) = self.database_identity.as_ref() else {
806            return Ok(());
807        };
808        let mut registry = self.lock_registry_map()?;
809        if let Some(entry) = registry.get_mut(path) {
810            if let Some(count) = entry.wal_reader_sequences.get_mut(&sequence) {
811                *count = count.saturating_sub(1);
812                if *count == 0 {
813                    entry.wal_reader_sequences.remove(&sequence);
814                }
815            }
816            if entry.readers == 0 && !entry.writer && entry.wal_reader_sequences.is_empty() {
817                registry.remove(path);
818            }
819        }
820        Ok(())
821    }
822
823    fn acquire_write_lock(&mut self) -> Result<()> {
824        if self.database_identity.is_none() {
825            self.lock_mode = PagerLockMode::Write;
826            return Ok(());
827        }
828        if self.lock_mode == PagerLockMode::Write {
829            return Ok(());
830        }
831        if matches!(self.lock_mode, PagerLockMode::Shared { .. }) {
832            return Err(crate::error::HematiteError::StorageError(
833                "cannot upgrade a shared database lock to a write lock".to_string(),
834            ));
835        }
836
837        let path = self.database_identity_path()?.clone();
838        let mut registry = self.lock_registry_map()?;
839        let entry = registry.entry(path).or_default();
840        if entry.writer || (self.journal_mode == JournalMode::Rollback && entry.readers > 0) {
841            return Err(crate::error::HematiteError::StorageError(
842                "database is locked".to_string(),
843            ));
844        }
845        entry.writer = true;
846        self.lock_mode = PagerLockMode::Write;
847        Ok(())
848    }
849
850    fn release_write_lock(&mut self) -> Result<()> {
851        let Some(path) = self.database_identity.as_ref() else {
852            self.lock_mode = PagerLockMode::None;
853            return Ok(());
854        };
855        if self.lock_mode != PagerLockMode::Write {
856            return Ok(());
857        }
858
859        let mut registry = self.lock_registry_map()?;
860        if let Some(entry) = registry.get_mut(path) {
861            entry.writer = false;
862            if entry.readers == 0 {
863                registry.remove(path);
864            }
865        }
866        self.lock_mode = PagerLockMode::None;
867        Ok(())
868    }
869
870    fn journal_path(db_path: &Path) -> PathBuf {
871        let mut file_name = db_path
872            .file_name()
873            .map(OsString::from)
874            .unwrap_or_else(|| OsString::from("hematite.db"));
875        file_name.push(".journal");
876        match db_path.parent() {
877            Some(parent) => parent.join(file_name),
878            None => PathBuf::from(file_name),
879        }
880    }
881
882    fn wal_path(db_path: &Path) -> PathBuf {
883        let mut file_name = db_path
884            .file_name()
885            .map(OsString::from)
886            .unwrap_or_else(|| OsString::from("hematite.db"));
887        file_name.push(".wal");
888        match db_path.parent() {
889            Some(parent) => parent.join(file_name),
890            None => PathBuf::from(file_name),
891        }
892    }
893
894    fn load_persisted_state(&mut self) -> Result<()> {
895        let Some(path) = &self.checksum_store_path else {
896            return Ok(());
897        };
898
899        let contents = match fs::read_to_string(path) {
900            Ok(contents) => contents,
901            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
902            Err(err) => return Err(err.into()),
903        };
904
905        let mut lines = contents.lines();
906        let version = lines
907            .next()
908            .ok_or_else(|| {
909                crate::error::HematiteError::StorageError(
910                    "Missing pager checksum metadata version".to_string(),
911                )
912            })?
913            .strip_prefix("version=")
914            .ok_or_else(|| {
915                crate::error::HematiteError::StorageError(
916                    "Pager checksum metadata is missing version prefix".to_string(),
917                )
918            })?
919            .parse::<u32>()
920            .map_err(|_| {
921                crate::error::HematiteError::StorageError(
922                    "Invalid pager checksum metadata version".to_string(),
923                )
924            })?;
925
926        if version != Self::CHECKSUM_METADATA_VERSION {
927            return Err(crate::error::HematiteError::StorageError(format!(
928                "Unsupported pager checksum metadata version: expected {}, got {}",
929                Self::CHECKSUM_METADATA_VERSION,
930                version
931            )));
932        }
933
934        let mut next_line = lines.next().ok_or_else(|| {
935            crate::error::HematiteError::StorageError(
936                "Missing pager freelist metadata count".to_string(),
937            )
938        })?;
939
940        if let Some(mode) = next_line.strip_prefix("journal_mode=") {
941            self.journal_mode = JournalMode::parse(mode)?;
942            next_line = lines.next().ok_or_else(|| {
943                crate::error::HematiteError::StorageError(
944                    "Missing pager freelist metadata count".to_string(),
945                )
946            })?;
947        } else {
948            self.journal_mode = JournalMode::Rollback;
949        }
950
951        let expected_free_count = next_line
952            .strip_prefix("free_count=")
953            .ok_or_else(|| {
954                crate::error::HematiteError::StorageError(
955                    "Pager freelist metadata is missing count prefix".to_string(),
956                )
957            })?
958            .parse::<usize>()
959            .map_err(|_| {
960                crate::error::HematiteError::StorageError(
961                    "Invalid pager freelist metadata count".to_string(),
962                )
963            })?;
964
965        let mut free_pages = Vec::with_capacity(expected_free_count);
966        for _ in 0..expected_free_count {
967            let line = lines.next().ok_or_else(|| {
968                crate::error::HematiteError::StorageError(
969                    "Pager freelist metadata ended early".to_string(),
970                )
971            })?;
972            let page_id = line
973                .strip_prefix("free|")
974                .ok_or_else(|| {
975                    crate::error::HematiteError::StorageError(
976                        "Invalid pager freelist metadata record".to_string(),
977                    )
978                })?
979                .parse::<u32>()
980                .map(|page_id| page_id)
981                .map_err(|_| {
982                    crate::error::HematiteError::StorageError(
983                        "Invalid pager freelist page id".to_string(),
984                    )
985                })?;
986            free_pages.push(page_id);
987        }
988
989        let expected_count = lines
990            .next()
991            .ok_or_else(|| {
992                crate::error::HematiteError::StorageError(
993                    "Missing pager checksum metadata count".to_string(),
994                )
995            })?
996            .strip_prefix("checksum_count=")
997            .ok_or_else(|| {
998                crate::error::HematiteError::StorageError(
999                    "Pager checksum metadata is missing count prefix".to_string(),
1000                )
1001            })?
1002            .parse::<usize>()
1003            .map_err(|_| {
1004                crate::error::HematiteError::StorageError(
1005                    "Invalid pager checksum metadata count".to_string(),
1006                )
1007            })?;
1008
1009        let mut checksums = HashMap::new();
1010        for line in lines {
1011            if line.is_empty() {
1012                continue;
1013            }
1014            let payload = line.strip_prefix("checksum|").ok_or_else(|| {
1015                crate::error::HematiteError::StorageError(
1016                    "Invalid pager checksum metadata record".to_string(),
1017                )
1018            })?;
1019            let parts = payload.split('|').collect::<Vec<_>>();
1020            if parts.len() != 2 {
1021                return Err(crate::error::HematiteError::StorageError(
1022                    "Invalid pager checksum metadata record".to_string(),
1023                ));
1024            }
1025            let page_id = parts[0].parse::<u32>().map_err(|_| {
1026                crate::error::HematiteError::StorageError(
1027                    "Invalid pager checksum page id".to_string(),
1028                )
1029            })?;
1030            let checksum = parts[1].parse::<u32>().map_err(|_| {
1031                crate::error::HematiteError::StorageError(
1032                    "Invalid pager checksum value".to_string(),
1033                )
1034            })?;
1035            if checksums.insert(page_id, checksum).is_some() {
1036                return Err(crate::error::HematiteError::StorageError(format!(
1037                    "Duplicate pager checksum entry for page {}",
1038                    page_id
1039                )));
1040            }
1041        }
1042
1043        if checksums.len() != expected_count {
1044            return Err(crate::error::HematiteError::StorageError(format!(
1045                "Pager checksum metadata count mismatch: expected {}, got {}",
1046                expected_count,
1047                checksums.len()
1048            )));
1049        }
1050
1051        self.file_manager.set_free_pages(free_pages);
1052        self.page_checksums = checksums;
1053        Ok(())
1054    }
1055
1056    fn refresh_persisted_view(&mut self) -> Result<()> {
1057        if self.transaction.is_some() || !self.dirty_pages.is_empty() {
1058            return Ok(());
1059        }
1060
1061        self.buffer_pool = BufferPool::new(self.buffer_pool_capacity);
1062        self.load_persisted_state()?;
1063        self.load_latest_wal_state()
1064    }
1065
1066    fn snapshot_wal_visible_state(&mut self) -> Result<VisibleWalState> {
1067        if let Some(state) = &self.latest_wal_state {
1068            return Ok(state.clone());
1069        }
1070
1071        Ok(VisibleWalState {
1072            visible_sequence: 0,
1073            file_len: self.file_manager.file_len()?,
1074            free_pages: self.file_manager.free_pages().to_vec(),
1075            page_checksums: self.page_checksums.clone(),
1076            page_overrides: HashMap::new(),
1077        })
1078    }
1079
1080    fn load_latest_wal_state(&mut self) -> Result<()> {
1081        if self.journal_mode != JournalMode::Wal {
1082            self.latest_wal_state = None;
1083            return Ok(());
1084        }
1085
1086        let Some(path) = &self.wal_path else {
1087            self.latest_wal_state = None;
1088            return Ok(());
1089        };
1090
1091        self.latest_wal_state = WalRecord::load_visible_state_from_path(path)?;
1092        Ok(())
1093    }
1094
1095    fn persist_checksums(&self) -> Result<()> {
1096        let Some(path) = &self.checksum_store_path else {
1097            return Ok(());
1098        };
1099
1100        let mut entries = self
1101            .page_checksums
1102            .iter()
1103            .map(|(page_id, checksum)| (*page_id, *checksum))
1104            .collect::<Vec<_>>();
1105        entries.sort_by_key(|(page_id, _)| *page_id);
1106
1107        let mut lines = vec![
1108            format!("version={}", Self::CHECKSUM_METADATA_VERSION),
1109            format!("journal_mode={}", self.journal_mode.as_str()),
1110            format!("free_count={}", self.file_manager.free_pages().len()),
1111        ];
1112        for page_id in self.file_manager.free_pages() {
1113            lines.push(format!("free|{}", page_id));
1114        }
1115        lines.push(format!("checksum_count={}", entries.len()));
1116        for (page_id, checksum) in entries {
1117            lines.push(format!("checksum|{}|{}", page_id, checksum));
1118        }
1119
1120        fs::write(path, lines.join("\n"))?;
1121        Ok(())
1122    }
1123
1124    fn snapshot_original_page(&mut self, page_id: PageId) -> Result<()> {
1125        if self.journal_mode == JournalMode::Wal {
1126            return Ok(());
1127        }
1128
1129        let Some(transaction) = &mut self.transaction else {
1130            return Ok(());
1131        };
1132
1133        if transaction.journaled_pages.contains(&page_id) {
1134            return Ok(());
1135        }
1136
1137        let page_end = 64 + ((page_id as u64 + 1) * crate::storage::PAGE_SIZE as u64);
1138        if page_end > transaction.original_file_len {
1139            return Ok(());
1140        }
1141
1142        let page = self.file_manager.read_page(page_id)?;
1143        transaction.page_records.push(JournalRecord {
1144            page_id,
1145            data: page.data,
1146        });
1147        transaction.journaled_pages.insert(page_id);
1148        self.persist_journal(JournalState::Active)
1149    }
1150
1151    fn persist_journal(&self, state: JournalState) -> Result<()> {
1152        let Some(transaction) = &self.transaction else {
1153            return Ok(());
1154        };
1155        let Some(path) = &self.journal_path else {
1156            return Ok(());
1157        };
1158
1159        let journal = RollbackJournal {
1160            state,
1161            original_file_len: transaction.original_file_len,
1162            original_free_pages: transaction.original_free_pages.clone(),
1163            original_checksums: transaction
1164                .original_checksums
1165                .iter()
1166                .map(|(page_id, checksum)| (*page_id, *checksum))
1167                .collect(),
1168            page_records: transaction.page_records.clone(),
1169        };
1170        let bytes = journal.encode()?;
1171        let mut file = OpenOptions::new()
1172            .create(true)
1173            .write(true)
1174            .truncate(true)
1175            .open(path)?;
1176        file.write_all(&bytes)?;
1177        file.sync_all()?;
1178        Ok(())
1179    }
1180
1181    fn remove_journal_file(&self) -> Result<()> {
1182        let Some(path) = &self.journal_path else {
1183            return Ok(());
1184        };
1185        match fs::remove_file(path) {
1186            Ok(()) => Ok(()),
1187            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
1188            Err(err) => Err(err.into()),
1189        }
1190    }
1191
1192    fn remove_wal_file(&self) -> Result<()> {
1193        let Some(path) = &self.wal_path else {
1194            return Ok(());
1195        };
1196        match fs::remove_file(path) {
1197            Ok(()) => Ok(()),
1198            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
1199            Err(err) => Err(err.into()),
1200        }
1201    }
1202
1203    fn recover_if_needed(&mut self) -> Result<()> {
1204        let Some(path) = &self.journal_path else {
1205            return Ok(());
1206        };
1207        let bytes = match fs::read(path) {
1208            Ok(bytes) => bytes,
1209            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
1210            Err(err) => return Err(err.into()),
1211        };
1212
1213        let journal = RollbackJournal::decode(&bytes)?;
1214        match journal.state {
1215            JournalState::Active => {
1216                self.restore_from_journal(&journal)?;
1217                self.remove_journal_file()?;
1218            }
1219            JournalState::Committed => {
1220                self.remove_journal_file()?;
1221            }
1222        }
1223        Ok(())
1224    }
1225
1226    fn rollback_from_active_transaction(&mut self) -> Result<()> {
1227        let transaction = self.transaction.clone().ok_or_else(|| {
1228            crate::error::HematiteError::StorageError("Pager transaction is not active".to_string())
1229        })?;
1230        let journal = RollbackJournal {
1231            state: JournalState::Active,
1232            original_file_len: transaction.original_file_len,
1233            original_free_pages: transaction.original_free_pages,
1234            original_checksums: transaction
1235                .original_checksums
1236                .into_iter()
1237                .collect::<Vec<_>>(),
1238            page_records: transaction.page_records,
1239        };
1240        self.restore_from_journal(&journal)
1241    }
1242
1243    fn restore_from_journal(&mut self, journal: &RollbackJournal) -> Result<()> {
1244        self.buffer_pool = BufferPool::new(self.buffer_pool_capacity);
1245        self.dirty_pages.clear();
1246        self.file_manager
1247            .restore_file_len(journal.original_file_len)?;
1248        self.file_manager
1249            .set_free_pages(journal.original_free_pages.clone());
1250
1251        for record in &journal.page_records {
1252            let page = Page::from_bytes(record.page_id, record.data.clone())?;
1253            self.file_manager.write_page(&page)?;
1254        }
1255        self.file_manager.flush()?;
1256
1257        self.page_checksums = journal.original_checksums.iter().copied().collect();
1258        self.persist_checksums()
1259    }
1260
1261    fn rollback_wal_transaction(&mut self) -> Result<()> {
1262        let transaction = self.transaction.clone().ok_or_else(|| {
1263            crate::error::HematiteError::StorageError("Pager transaction is not active".to_string())
1264        })?;
1265        self.buffer_pool = BufferPool::new(self.buffer_pool_capacity);
1266        self.dirty_pages.clear();
1267        self.page_checksums = transaction.original_checksums;
1268        self.load_latest_wal_state()
1269    }
1270
1271    fn commit_wal_transaction(&mut self) -> Result<()> {
1272        let transaction = self.transaction.as_ref().ok_or_else(|| {
1273            crate::error::HematiteError::StorageError("Pager transaction is not active".to_string())
1274        })?;
1275        let next_sequence = self
1276            .latest_wal_state
1277            .as_ref()
1278            .map(|state| state.visible_sequence + 1)
1279            .unwrap_or(1);
1280
1281        let mut page_ids = self.dirty_pages.iter().copied().collect::<Vec<_>>();
1282        page_ids.sort_unstable();
1283
1284        let mut frames = Vec::with_capacity(page_ids.len());
1285        for page_id in page_ids {
1286            let page = self.buffer_pool.get(page_id).cloned().ok_or_else(|| {
1287                crate::error::HematiteError::StorageError(format!(
1288                    "Dirty page {} missing from buffer pool",
1289                    page_id
1290                ))
1291            })?;
1292            frames.push(WalFrame {
1293                page_id,
1294                data: page.data,
1295            });
1296        }
1297
1298        let mut checksums = self
1299            .page_checksums
1300            .iter()
1301            .map(|(page_id, checksum)| (*page_id, *checksum))
1302            .collect::<Vec<_>>();
1303        checksums.sort_by_key(|(page_id, _)| *page_id);
1304
1305        let record = WalRecord {
1306            sequence: next_sequence,
1307            file_len: 64 + transaction.wal_next_page_id as u64 * crate::storage::PAGE_SIZE as u64,
1308            free_pages: transaction.wal_free_pages.clone(),
1309            checksums,
1310            frames,
1311        };
1312
1313        self.append_wal_record(record)?;
1314        self.dirty_pages.clear();
1315        self.persist_checksums()
1316    }
1317
1318    fn append_wal_record(&mut self, record: WalRecord) -> Result<()> {
1319        if let Some(path) = &self.wal_path {
1320            WalRecord::append_to_path(path, &record)?;
1321        } else {
1322            self.latest_wal_state = Some(VisibleWalState {
1323                visible_sequence: record.sequence,
1324                file_len: record.file_len,
1325                free_pages: record.free_pages.clone(),
1326                page_checksums: record.checksums.iter().copied().collect(),
1327                page_overrides: record
1328                    .frames
1329                    .iter()
1330                    .map(|frame| (frame.page_id, frame.data.clone()))
1331                    .collect(),
1332            });
1333            return Ok(());
1334        }
1335
1336        self.load_latest_wal_state()
1337    }
1338
1339    fn can_checkpoint_wal(&self) -> Result<bool> {
1340        if self.database_identity.is_none() {
1341            return Ok(true);
1342        }
1343
1344        let path = self.database_identity_path()?;
1345        let registry = self.lock_registry_map()?;
1346        let Some(entry) = registry.get(path) else {
1347            return Ok(true);
1348        };
1349
1350        if entry.writer && self.lock_mode != PagerLockMode::Write {
1351            return Ok(false);
1352        }
1353        if entry.readers == 0 {
1354            return Ok(true);
1355        }
1356        let latest_sequence = self
1357            .latest_wal_state
1358            .as_ref()
1359            .map(|state| state.visible_sequence)
1360            .unwrap_or(0);
1361        Ok(entry
1362            .wal_reader_sequences
1363            .keys()
1364            .all(|sequence| *sequence == latest_sequence))
1365    }
1366
1367    fn checkpoint_wal_unlocked(&mut self) -> Result<()> {
1368        let Some(state) = self.latest_wal_state.clone() else {
1369            self.remove_wal_file()?;
1370            return Ok(());
1371        };
1372
1373        self.file_manager.restore_file_len(state.file_len)?;
1374        self.file_manager.set_free_pages(state.free_pages.clone());
1375        self.file_manager.compact_free_pages()?;
1376        for (page_id, data) in &state.page_overrides {
1377            let page = Page::from_bytes(*page_id, data.clone())?;
1378            self.file_manager.write_page(&page)?;
1379        }
1380        self.file_manager.flush()?;
1381        self.page_checksums = state.page_checksums;
1382        self.latest_wal_state = None;
1383        self.wal_read_snapshot = None;
1384        self.remove_wal_file()?;
1385        self.persist_checksums()
1386    }
1387}
1388
1389impl Drop for Pager {
1390    fn drop(&mut self) {
1391        match self.lock_mode {
1392            PagerLockMode::Write => {
1393                let _ = self.release_write_lock();
1394            }
1395            PagerLockMode::Shared { .. } => {
1396                let _ = self.release_shared_lock();
1397            }
1398            PagerLockMode::None => {}
1399        }
1400    }
1401}