Skip to main content

limbo_core/storage/
pager.rs

1use crate::fast_lock::SpinLock;
2use crate::result::LimboResult;
3use crate::storage::btree::BTreePageInner;
4use crate::storage::buffer_pool::BufferPool;
5use crate::storage::database::DatabaseStorage;
6use crate::storage::sqlite3_ondisk::{
7    self, DatabaseHeader, PageContent, PageType, DATABASE_HEADER_PAGE_ID, DATABASE_HEADER_SIZE,
8};
9use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus};
10use crate::types::CursorResult;
11use crate::Completion;
12use crate::{Buffer, LimboError, Result};
13use parking_lot::RwLock;
14use std::cell::{Cell, RefCell, UnsafeCell};
15use std::collections::HashSet;
16use std::rc::Rc;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::Arc;
19use tracing::trace;
20
21use super::btree::BTreePage;
22use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey};
23use super::wal::{CheckpointMode, CheckpointStatus};
24
25#[cfg(not(feature = "omit_autovacuum"))]
26use {crate::io::Buffer as IoBuffer, ptrmap::*};
27
28pub struct PageInner {
29    pub flags: AtomicUsize,
30    pub contents: Option<PageContent>,
31    pub id: usize,
32}
33
34#[derive(Debug)]
35pub struct Page {
36    pub inner: UnsafeCell<PageInner>,
37}
38
39// Concurrency control of pages will be handled by the pager, we won't wrap Page with RwLock
40// because that is bad bad.
41pub type PageRef = Arc<Page>;
42
43/// Page is up-to-date.
44const PAGE_UPTODATE: usize = 0b001;
45/// Page is locked for I/O to prevent concurrent access.
46const PAGE_LOCKED: usize = 0b010;
47/// Page had an I/O error.
48const PAGE_ERROR: usize = 0b100;
49/// Page is dirty. Flush needed.
50const PAGE_DIRTY: usize = 0b1000;
51/// Page's contents are loaded in memory.
52const PAGE_LOADED: usize = 0b10000;
53
54impl Page {
55    pub fn new(id: usize) -> Self {
56        Self {
57            inner: UnsafeCell::new(PageInner {
58                flags: AtomicUsize::new(0),
59                contents: None,
60                id,
61            }),
62        }
63    }
64
65    #[allow(clippy::mut_from_ref)]
66    pub fn get(&self) -> &mut PageInner {
67        unsafe { &mut *self.inner.get() }
68    }
69
70    pub fn get_contents(&self) -> &mut PageContent {
71        self.get().contents.as_mut().unwrap()
72    }
73
74    pub fn is_uptodate(&self) -> bool {
75        self.get().flags.load(Ordering::SeqCst) & PAGE_UPTODATE != 0
76    }
77
78    pub fn set_uptodate(&self) {
79        self.get().flags.fetch_or(PAGE_UPTODATE, Ordering::SeqCst);
80    }
81
82    pub fn clear_uptodate(&self) {
83        self.get().flags.fetch_and(!PAGE_UPTODATE, Ordering::SeqCst);
84    }
85
86    pub fn is_locked(&self) -> bool {
87        self.get().flags.load(Ordering::SeqCst) & PAGE_LOCKED != 0
88    }
89
90    pub fn set_locked(&self) {
91        self.get().flags.fetch_or(PAGE_LOCKED, Ordering::SeqCst);
92    }
93
94    pub fn clear_locked(&self) {
95        self.get().flags.fetch_and(!PAGE_LOCKED, Ordering::SeqCst);
96    }
97
98    pub fn is_error(&self) -> bool {
99        self.get().flags.load(Ordering::SeqCst) & PAGE_ERROR != 0
100    }
101
102    pub fn set_error(&self) {
103        self.get().flags.fetch_or(PAGE_ERROR, Ordering::SeqCst);
104    }
105
106    pub fn clear_error(&self) {
107        self.get().flags.fetch_and(!PAGE_ERROR, Ordering::SeqCst);
108    }
109
110    pub fn is_dirty(&self) -> bool {
111        self.get().flags.load(Ordering::SeqCst) & PAGE_DIRTY != 0
112    }
113
114    pub fn set_dirty(&self) {
115        tracing::debug!("set_dirty(page={})", self.get().id);
116        self.get().flags.fetch_or(PAGE_DIRTY, Ordering::SeqCst);
117    }
118
119    pub fn clear_dirty(&self) {
120        tracing::debug!("clear_dirty(page={})", self.get().id);
121        self.get().flags.fetch_and(!PAGE_DIRTY, Ordering::SeqCst);
122    }
123
124    pub fn is_loaded(&self) -> bool {
125        self.get().flags.load(Ordering::SeqCst) & PAGE_LOADED != 0
126    }
127
128    pub fn set_loaded(&self) {
129        self.get().flags.fetch_or(PAGE_LOADED, Ordering::SeqCst);
130    }
131
132    pub fn clear_loaded(&self) {
133        tracing::debug!("clear loaded {}", self.get().id);
134        self.get().flags.fetch_and(!PAGE_LOADED, Ordering::SeqCst);
135    }
136
137    pub fn is_index(&self) -> bool {
138        match self.get_contents().page_type() {
139            PageType::IndexLeaf | PageType::IndexInterior => true,
140            PageType::TableLeaf | PageType::TableInterior => false,
141        }
142    }
143}
144
145#[derive(Clone, Copy, Debug)]
146/// The state of the current pager cache flush.
147enum FlushState {
148    /// Idle.
149    Start,
150    /// Waiting for all in-flight writes to the on-disk WAL to complete.
151    WaitAppendFrames,
152    /// Fsync the on-disk WAL.
153    SyncWal,
154    /// Checkpoint the WAL to the database file (if needed).
155    Checkpoint,
156    /// Fsync the database file.
157    SyncDbFile,
158    /// Waiting for the database file to be fsynced.
159    WaitSyncDbFile,
160}
161
162#[derive(Clone, Debug, Copy)]
163enum CheckpointState {
164    Checkpoint,
165    SyncDbFile,
166    WaitSyncDbFile,
167    CheckpointDone,
168}
169
170/// The mode of allocating a btree page.
171pub enum BtreePageAllocMode {
172    /// Allocate any btree page
173    Any,
174    /// Allocate a specific page number, typically used for root page allocation
175    Exact(u32),
176    /// Allocate a page number less than or equal to the parameter
177    Le(u32),
178}
179
180/// This will keep track of the state of current cache flush in order to not repeat work
181struct FlushInfo {
182    state: FlushState,
183    /// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync.
184    in_flight_writes: Rc<RefCell<usize>>,
185}
186
187/// Track the state of the auto-vacuum mode.
188#[derive(Clone, Copy, Debug)]
189pub enum AutoVacuumMode {
190    None,
191    Full,
192    Incremental,
193}
194
195/// Durability level set by `PRAGMA synchronous`.
196///
197/// Mirrors SQLite's synchronous settings. Controls when fsync is issued to the
198/// WAL and the database file.
199#[derive(Clone, Copy, Debug, PartialEq, Eq)]
200pub enum SynchronousMode {
201    /// `0` — never fsync. Fastest, least durable (data may be lost on OS crash).
202    Off,
203    /// `1` — fsync the WAL at checkpoint time only. The default.
204    Normal,
205    /// `2` — fsync the WAL on every commit AND fsync the DB file after checkpoint.
206    Full,
207    /// `3` — like FULL with extra fsync of the directory containing the rollback
208    /// journal; for WAL mode this behaves like FULL for our purposes.
209    Extra,
210}
211
212impl SynchronousMode {
213    /// Numeric form used by `PRAGMA synchronous` (and SQLite's on-the-wire value).
214    pub fn as_i64(self) -> i64 {
215        match self {
216            SynchronousMode::Off => 0,
217            SynchronousMode::Normal => 1,
218            SynchronousMode::Full => 2,
219            SynchronousMode::Extra => 3,
220        }
221    }
222}
223
224/// A snapshot of WAL state captured at SAVEPOINT open time.
225///
226/// Used by ROLLBACK TO SAVEPOINT to restore the WAL (and thus the database)
227/// to the exact state it was in when the savepoint was created.
228#[derive(Debug, Clone)]
229pub struct SavepointFrame {
230    /// The savepoint name (compared case-insensitively per SQLite spec).
231    pub name: String,
232    /// `shared.max_frame` at the time the savepoint was opened.
233    pub wal_max_frame: u64,
234    /// `shared.last_checksum` at the time the savepoint was opened.
235    pub wal_checksum: (u32, u32),
236    /// True when this savepoint implicitly started the write transaction
237    /// (i.e., it was opened in autocommit mode).  RELEASE of an owner
238    /// savepoint commits the transaction.
239    pub is_transaction_owner: bool,
240}
241
242/// The pager interface implements the persistence layer by providing access
243/// to pages of the database file, including caching, concurrency control, and
244/// transaction management.
245pub struct Pager {
246    /// Source of the database pages.
247    pub db_file: Arc<dyn DatabaseStorage>,
248    /// The write-ahead log (WAL) for the database.
249    wal: Rc<RefCell<dyn Wal>>,
250    /// A page cache for the database.
251    page_cache: Arc<RwLock<DumbLruPageCache>>,
252    /// Buffer pool for temporary data storage.
253    buffer_pool: Rc<BufferPool>,
254    /// I/O interface for input/output operations.
255    pub io: Arc<dyn crate::io::IO>,
256    dirty_pages: Rc<RefCell<HashSet<usize>>>,
257    pub db_header: Arc<SpinLock<DatabaseHeader>>,
258
259    flush_info: RefCell<FlushInfo>,
260    checkpoint_state: RefCell<CheckpointState>,
261    checkpoint_inflight: Rc<RefCell<usize>>,
262    syncing: Rc<RefCell<bool>>,
263    auto_vacuum_mode: RefCell<AutoVacuumMode>,
264    /// Durability level controlled by `PRAGMA synchronous`. Defaults to NORMAL.
265    synchronous_mode: Cell<SynchronousMode>,
266    /// Stack of active savepoints for SAVEPOINT / ROLLBACK TO / RELEASE.
267    pub savepoints: RefCell<Vec<SavepointFrame>>,
268}
269
270#[derive(Debug, Copy, Clone)]
271/// The status of the current cache flush.
272/// A Done state means that the WAL was committed to disk and fsynced,
273/// plus potentially checkpointed to the DB (and the DB then fsynced).
274pub enum PagerCacheflushStatus {
275    Done(PagerCacheflushResult),
276    IO,
277}
278
279#[derive(Debug, Copy, Clone)]
280pub enum PagerCacheflushResult {
281    /// The WAL was written to disk and fsynced.
282    WalWritten,
283    /// The WAL was written, fsynced, and a checkpoint was performed.
284    /// The database file was then also fsynced.
285    Checkpointed(CheckpointResult),
286}
287
288impl Pager {
289    /// Begins opening a database by reading the database header.
290    pub fn begin_open(db_file: Arc<dyn DatabaseStorage>) -> Result<Arc<SpinLock<DatabaseHeader>>> {
291        sqlite3_ondisk::begin_read_database_header(db_file)
292    }
293
294    /// Completes opening a database by initializing the Pager with the database header.
295    pub fn finish_open(
296        db_header_ref: Arc<SpinLock<DatabaseHeader>>,
297        db_file: Arc<dyn DatabaseStorage>,
298        wal: Rc<RefCell<dyn Wal>>,
299        io: Arc<dyn crate::io::IO>,
300        page_cache: Arc<RwLock<DumbLruPageCache>>,
301        buffer_pool: Rc<BufferPool>,
302    ) -> Result<Self> {
303        Ok(Self {
304            db_file,
305            wal,
306            page_cache,
307            io,
308            dirty_pages: Rc::new(RefCell::new(HashSet::new())),
309            db_header: db_header_ref.clone(),
310            flush_info: RefCell::new(FlushInfo {
311                state: FlushState::Start,
312                in_flight_writes: Rc::new(RefCell::new(0)),
313            }),
314            syncing: Rc::new(RefCell::new(false)),
315            checkpoint_state: RefCell::new(CheckpointState::Checkpoint),
316            checkpoint_inflight: Rc::new(RefCell::new(0)),
317            buffer_pool,
318            auto_vacuum_mode: RefCell::new(AutoVacuumMode::None),
319            synchronous_mode: Cell::new(SynchronousMode::Normal),
320            savepoints: RefCell::new(Vec::new()),
321        })
322    }
323
324    pub fn get_auto_vacuum_mode(&self) -> AutoVacuumMode {
325        *self.auto_vacuum_mode.borrow()
326    }
327
328    pub fn set_auto_vacuum_mode(&self, mode: AutoVacuumMode) {
329        *self.auto_vacuum_mode.borrow_mut() = mode;
330    }
331
332    pub fn get_synchronous_mode(&self) -> SynchronousMode {
333        self.synchronous_mode.get()
334    }
335
336    pub fn set_synchronous_mode(&self, mode: SynchronousMode) {
337        self.synchronous_mode.set(mode);
338    }
339
340    /// Retrieves the pointer map entry for a given database page.
341    /// `target_page_num` (1-indexed) is the page whose entry is sought.
342    /// Returns `Ok(None)` if the page is not supposed to have a ptrmap entry (e.g. header, or a ptrmap page itself).
343    #[cfg(not(feature = "omit_autovacuum"))]
344    pub fn ptrmap_get(&self, target_page_num: u32) -> Result<CursorResult<Option<PtrmapEntry>>> {
345        tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
346        let configured_page_size = self.db_header.lock().get_page_size() as usize;
347
348        if target_page_num < FIRST_PTRMAP_PAGE_NO
349            || is_ptrmap_page(target_page_num, configured_page_size)
350        {
351            return Ok(CursorResult::Ok(None));
352        }
353
354        let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size);
355        let offset_in_ptrmap_page =
356            get_ptrmap_offset_in_page(target_page_num, ptrmap_pg_no, configured_page_size)?;
357        tracing::trace!(
358            "ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}",
359            target_page_num,
360            ptrmap_pg_no
361        );
362
363        let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?;
364        if ptrmap_page.is_locked() {
365            return Ok(CursorResult::IO);
366        }
367        if !ptrmap_page.is_loaded() {
368            return Ok(CursorResult::IO);
369        }
370        let ptrmap_page_inner = ptrmap_page.get();
371
372        let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() {
373            Some(content) => content,
374            None => {
375                return Err(LimboError::InternalError(format!(
376                    "Ptrmap page {} content not loaded",
377                    ptrmap_pg_no
378                )))
379            }
380        };
381
382        let page_buffer_guard: std::cell::Ref<IoBuffer> = page_content.buffer.borrow();
383        let full_buffer_slice: &[u8] = page_buffer_guard.as_slice();
384
385        // Ptrmap pages are not page 1, so their internal offset within their buffer should be 0.
386        // The actual page data starts at page_content.offset within the full_buffer_slice.
387        if ptrmap_pg_no != 1 && page_content.offset != 0 {
388            return Err(LimboError::Corrupt(format!(
389                "Ptrmap page {} has unexpected internal offset {}",
390                ptrmap_pg_no, page_content.offset
391            )));
392        }
393        let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..];
394        let actual_data_length = ptrmap_page_data_slice.len();
395
396        // Check if the calculated offset for the entry is within the bounds of the actual page data length.
397        if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length {
398            return Err(LimboError::InternalError(format!(
399                "Ptrmap offset {} + entry size {} out of bounds for page {} (actual data len {})",
400                offset_in_ptrmap_page, PTRMAP_ENTRY_SIZE, ptrmap_pg_no, actual_data_length
401            )));
402        }
403
404        let entry_slice = &ptrmap_page_data_slice
405            [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE];
406        match PtrmapEntry::deserialize(entry_slice) {
407            Some(entry) => Ok(CursorResult::Ok(Some(entry))),
408            None => Err(LimboError::Corrupt(format!(
409                "Failed to deserialize ptrmap entry for page {} from ptrmap page {}",
410                target_page_num, ptrmap_pg_no
411            ))),
412        }
413    }
414
415    /// Writes or updates the pointer map entry for a given database page.
416    /// `db_page_no_to_update` (1-indexed) is the page whose entry is to be set.
417    /// `entry_type` and `parent_page_no` define the new entry.
418    #[cfg(not(feature = "omit_autovacuum"))]
419    pub fn ptrmap_put(
420        &self,
421        db_page_no_to_update: u32,
422        entry_type: PtrmapType,
423        parent_page_no: u32,
424    ) -> Result<CursorResult<()>> {
425        tracing::trace!(
426            "ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {})",
427            db_page_no_to_update,
428            entry_type,
429            parent_page_no
430        );
431
432        let page_size = self.db_header.lock().get_page_size() as usize;
433
434        if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO
435            || is_ptrmap_page(db_page_no_to_update, page_size)
436        {
437            return Err(LimboError::InternalError(format!(
438                "Cannot set ptrmap entry for page {}: it's a header/ptrmap page or invalid.",
439                db_page_no_to_update
440            )));
441        }
442
443        let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(db_page_no_to_update, page_size);
444        let offset_in_ptrmap_page =
445            get_ptrmap_offset_in_page(db_page_no_to_update, ptrmap_pg_no, page_size)?;
446        tracing::trace!(
447            "ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {}) = ptrmap_pg_no = {}, offset_in_ptrmap_page = {}",
448            db_page_no_to_update,
449            entry_type,
450            parent_page_no,
451            ptrmap_pg_no,
452            offset_in_ptrmap_page
453        );
454
455        let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?;
456        if ptrmap_page.is_locked() {
457            return Ok(CursorResult::IO);
458        }
459        if !ptrmap_page.is_loaded() {
460            return Ok(CursorResult::IO);
461        }
462        let ptrmap_page_inner = ptrmap_page.get();
463
464        let page_content = match ptrmap_page_inner.contents.as_ref() {
465            Some(content) => content,
466            None => {
467                return Err(LimboError::InternalError(format!(
468                    "Ptrmap page {} content not loaded",
469                    ptrmap_pg_no
470                )))
471            }
472        };
473
474        let mut page_buffer_guard = page_content.buffer.borrow_mut();
475        let full_buffer_slice = page_buffer_guard.as_mut_slice();
476
477        if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > full_buffer_slice.len() {
478            return Err(LimboError::InternalError(format!(
479                "Ptrmap offset {} + entry size {} out of bounds for page {} (actual data len {})",
480                offset_in_ptrmap_page,
481                PTRMAP_ENTRY_SIZE,
482                ptrmap_pg_no,
483                full_buffer_slice.len()
484            )));
485        }
486
487        let entry = PtrmapEntry {
488            entry_type,
489            parent_page_no,
490        };
491        entry.serialize(
492            &mut full_buffer_slice
493                [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE],
494        )?;
495
496        ptrmap_page.set_dirty();
497        self.add_dirty(ptrmap_pg_no as usize);
498        Ok(CursorResult::Ok(()))
499    }
500
501    /// This method is used to allocate a new root page for a btree, both for tables and indexes
502    /// FIXME: handle no room in page cache
503    pub fn btree_create(&self, flags: &CreateBTreeFlags) -> Result<CursorResult<u32>> {
504        let page_type = match flags {
505            _ if flags.is_table() => PageType::TableLeaf,
506            _ if flags.is_index() => PageType::IndexLeaf,
507            _ => unreachable!("Invalid flags state"),
508        };
509        #[cfg(feature = "omit_autovacuum")]
510        {
511            let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any);
512            let page_id = page.get().get().id;
513            return Ok(CursorResult::Ok(page_id as u32));
514        }
515
516        //  If autovacuum is enabled, we need to allocate a new page number that is greater than the largest root page number
517        #[cfg(not(feature = "omit_autovacuum"))]
518        {
519            let auto_vacuum_mode = self.auto_vacuum_mode.borrow();
520            match *auto_vacuum_mode {
521                AutoVacuumMode::None => {
522                    let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any);
523                    let page_id = page.get().get().id;
524                    return Ok(CursorResult::Ok(page_id as u32));
525                }
526                AutoVacuumMode::Full => {
527                    let mut root_page_num = self.db_header.lock().vacuum_mode_largest_root_page;
528                    assert!(root_page_num > 0); //  Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled
529                    root_page_num += 1;
530                    assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); //  can never be less than 2 because we have already incremented
531
532                    while is_ptrmap_page(
533                        root_page_num,
534                        self.db_header.lock().get_page_size() as usize,
535                    ) {
536                        root_page_num += 1;
537                    }
538                    assert!(root_page_num >= 3); //  the very first root page is page 3
539
540                    //  root_page_num here is the desired root page
541                    let page = self.do_allocate_page(
542                        page_type,
543                        0,
544                        BtreePageAllocMode::Exact(root_page_num),
545                    );
546                    let allocated_page_id = page.get().get().id as u32;
547                    if allocated_page_id != root_page_num {
548                        //  TODO(Zaid): Handle swapping the allocated page with the desired root page
549                    }
550
551                    //  TODO(Zaid): Update the header metadata to reflect the new root page number
552
553                    //  For now map allocated_page_id since we are not swapping it with root_page_num
554                    match self.ptrmap_put(allocated_page_id, PtrmapType::RootPage, 0)? {
555                        CursorResult::Ok(_) => Ok(CursorResult::Ok(allocated_page_id as u32)),
556                        CursorResult::IO => Ok(CursorResult::IO),
557                    }
558                }
559                AutoVacuumMode::Incremental => {
560                    unimplemented!()
561                }
562            }
563        }
564    }
565
566    /// Allocate a new overflow page.
567    /// This is done when a cell overflows and new space is needed.
568    // FIXME: handle no room in page cache
569    pub fn allocate_overflow_page(&self) -> PageRef {
570        let page = self.allocate_page().unwrap();
571        tracing::debug!("Pager::allocate_overflow_page(id={})", page.get().id);
572
573        // setup overflow page
574        let contents = page.get().contents.as_mut().unwrap();
575        let buf = contents.as_ptr();
576        buf.fill(0);
577
578        page
579    }
580
581    /// Allocate a new page to the btree via the pager.
582    /// This marks the page as dirty and writes the page header.
583    // FIXME: handle no room in page cache
584    pub fn do_allocate_page(
585        &self,
586        page_type: PageType,
587        offset: usize,
588        _alloc_mode: BtreePageAllocMode,
589    ) -> BTreePage {
590        let page = self.allocate_page().unwrap();
591        let page = Arc::new(BTreePageInner {
592            page: RefCell::new(page),
593        });
594        crate::btree_init_page(&page, page_type, offset, self.usable_space() as u16);
595        tracing::debug!(
596            "do_allocate_page(id={}, page_type={:?})",
597            page.get().get().id,
598            page.get().get_contents().page_type()
599        );
600        page
601    }
602
603    /// The "usable size" of a database page is the page size specified by the 2-byte integer at offset 16
604    /// in the header, minus the "reserved" space size recorded in the 1-byte integer at offset 20 in the header.
605    /// The usable size of a page might be an odd number. However, the usable size is not allowed to be less than 480.
606    /// In other words, if the page size is 512, then the reserved space size cannot exceed 32.
607    pub fn usable_space(&self) -> usize {
608        let db_header = self.db_header.lock();
609        (db_header.get_page_size() - db_header.reserved_space as u32) as usize
610    }
611
612    #[inline(always)]
613    pub fn begin_read_tx(&self) -> Result<LimboResult> {
614        self.wal.borrow_mut().begin_read_tx()
615    }
616
617    #[inline(always)]
618    pub fn begin_write_tx(&self) -> Result<LimboResult> {
619        self.wal.borrow_mut().begin_write_tx()
620    }
621
622    pub fn end_tx(&self) -> Result<PagerCacheflushStatus> {
623        let cacheflush_status = self.cacheflush()?;
624        return match cacheflush_status {
625            PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO),
626            PagerCacheflushStatus::Done(_) => {
627                self.wal.borrow().end_write_tx()?;
628                self.wal.borrow().end_read_tx()?;
629                // Clear any remaining savepoints after the transaction commits.
630                self.savepoints.borrow_mut().clear();
631                Ok(cacheflush_status)
632            }
633        };
634    }
635
636    pub fn end_read_tx(&self) -> Result<()> {
637        self.wal.borrow().end_read_tx()?;
638        Ok(())
639    }
640
641    /// Reads a page from the database.
642    pub fn read_page(&self, page_idx: usize) -> Result<PageRef, LimboError> {
643        tracing::trace!("read_page(page_idx = {})", page_idx);
644        let mut page_cache = self.page_cache.write();
645        let page_key = PageCacheKey::new(page_idx);
646        if let Some(page) = page_cache.get(&page_key) {
647            tracing::trace!("read_page(page_idx = {}) = cached", page_idx);
648            return Ok(page.clone());
649        }
650        let page = Arc::new(Page::new(page_idx));
651        page.set_locked();
652
653        if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? {
654            self.wal
655                .borrow()
656                .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
657            {
658                page.set_uptodate();
659            }
660            // TODO(pere) should probably first insert to page cache, and if successful,
661            // read frame or page
662            match page_cache.insert(page_key, page.clone()) {
663                Ok(_) => {}
664                Err(CacheError::Full) => return Err(LimboError::CacheFull),
665                Err(CacheError::KeyExists) => {
666                    unreachable!("Page should not exist in cache after get() miss")
667                }
668                Err(e) => {
669                    return Err(LimboError::InternalError(format!(
670                        "Failed to insert page into cache: {:?}",
671                        e
672                    )))
673                }
674            }
675            return Ok(page);
676        }
677
678        sqlite3_ondisk::begin_read_page(
679            self.db_file.clone(),
680            self.buffer_pool.clone(),
681            page.clone(),
682            page_idx,
683        )?;
684        match page_cache.insert(page_key, page.clone()) {
685            Ok(_) => {}
686            Err(CacheError::Full) => return Err(LimboError::CacheFull),
687            Err(CacheError::KeyExists) => {
688                unreachable!("Page should not exist in cache after get() miss")
689            }
690            Err(e) => {
691                return Err(LimboError::InternalError(format!(
692                    "Failed to insert page into cache: {:?}",
693                    e
694                )))
695            }
696        }
697        Ok(page)
698    }
699
700    /// Writes the database header.
701    pub fn write_database_header(&self, header: &DatabaseHeader) -> Result<()> {
702        let header_page = self.read_page(DATABASE_HEADER_PAGE_ID)?;
703        while header_page.is_locked() {
704            // FIXME: we should never run io here!
705            self.io.run_once()?;
706        }
707        header_page.set_dirty();
708        self.add_dirty(DATABASE_HEADER_PAGE_ID);
709
710        let contents = header_page.get().contents.as_ref().unwrap();
711        contents.write_database_header(&header);
712
713        Ok(())
714    }
715
716    /// Refresh the shared in-memory database header from the write-ahead log.
717    ///
718    /// The header occupies the first 100 bytes of page 1. At open time the
719    /// header is read straight from the main database file (see
720    /// [`Pager::begin_open`] / `sqlite3_ondisk::begin_read_database_header`),
721    /// which deliberately bypasses the WAL. Every *other* page is read through
722    /// [`Pager::read_page`], which consults the WAL via
723    /// [`crate::storage::wal::Wal::find_frame`]. That asymmetry means a page-1
724    /// change that has been committed to the WAL but not yet checkpointed back
725    /// into the main file (the normal state after a non-checkpointing close)
726    /// is invisible to the header — so cookies written via `PRAGMA
727    /// application_id` / `PRAGMA user_version` reset to their pre-write value on
728    /// reopen even though they are durably recorded in the WAL.
729    ///
730    /// This routine closes that gap the way SQLite itself resolves the header:
731    /// if the WAL holds a frame for page 1, page 1 is read through the
732    /// WAL-aware pager path and the shared header is re-decoded from that
733    /// frame. When the WAL has no page-1 frame the header read from the main
734    /// file at open time is already authoritative and the call is a no-op.
735    ///
736    /// It is intended to be invoked once, immediately after the shared WAL has
737    /// been opened/recovered, before any connection observes the header.
738    pub fn refresh_header_from_wal(&self) -> Result<()> {
739        // A read transaction publishes the recovered `max_frame` to this
740        // connection's WAL view so `find_frame` can see frames written by a
741        // previous process/connection. Without it `find_frame` would observe a
742        // zero read mark and miss the page-1 frame entirely.
743        if let LimboResult::Busy = self.begin_read_tx()? {
744            // Someone else holds the relevant lock; the header from the main
745            // file remains a valid (if possibly stale) view and a later
746            // transaction will resolve page 1 through the WAL as usual.
747            return Ok(());
748        }
749
750        let result = self.refresh_header_from_wal_inner();
751
752        // Always release the read transaction, even if the refresh failed.
753        self.end_read_tx()?;
754        result
755    }
756
757    fn refresh_header_from_wal_inner(&self) -> Result<()> {
758        let has_wal_frame = self
759            .wal
760            .borrow()
761            .find_frame(DATABASE_HEADER_PAGE_ID as u64)?
762            .is_some();
763        if !has_wal_frame {
764            // No newer page-1 frame in the WAL: the header read from the main
765            // database file at open time is authoritative.
766            return Ok(());
767        }
768
769        // Resolve page 1 through the WAL-aware read path.
770        let header_page = self.read_page(DATABASE_HEADER_PAGE_ID)?;
771        while header_page.is_locked() {
772            // FIXME: we should never run io here! (mirrors write_database_header)
773            self.io.run_once()?;
774        }
775
776        let page = header_page.get();
777        let contents = page.contents.as_ref().ok_or_else(|| {
778            LimboError::InternalError(
779                "page 1 contents missing while refreshing database header from WAL".to_string(),
780            )
781        })?;
782        let buf = contents.as_ptr();
783        sqlite3_ondisk::parse_database_header_into(&buf[..DATABASE_HEADER_SIZE], &self.db_header)?;
784
785        // The page now lives in this throwaway pager's cache with a frame that
786        // belongs to the recovered WAL view. Drop it so the first real
787        // connection re-reads page 1 under its own read transaction rather than
788        // reusing a cache entry populated outside a proper transaction scope.
789        self.clear_page_cache();
790        Ok(())
791    }
792
793    /// Changes the size of the page cache.
794    pub fn change_page_cache_size(&self, capacity: usize) -> Result<CacheResizeResult> {
795        let mut page_cache = self.page_cache.write();
796        Ok(page_cache.resize(capacity))
797    }
798
799    pub fn add_dirty(&self, page_id: usize) {
800        // TODO: check duplicates?
801        let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages);
802        dirty_pages.insert(page_id);
803    }
804
805    pub fn wal_frame_count(&self) -> Result<u64> {
806        Ok(self.wal.borrow().get_max_frame_in_wal())
807    }
808
809    /// Flush dirty pages to disk.
810    /// In the base case, it will write the dirty pages to the WAL and then fsync the WAL.
811    /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
812    /// the database file and then fsync the database file.
813    pub fn cacheflush(&self) -> Result<PagerCacheflushStatus> {
814        let mut checkpoint_result = CheckpointResult::default();
815        loop {
816            let state = self.flush_info.borrow().state;
817            trace!("cacheflush {:?}", state);
818            match state {
819                FlushState::Start => {
820                    let db_size = self.db_header.lock().database_size;
821                    for page_id in self.dirty_pages.borrow().iter() {
822                        let mut cache = self.page_cache.write();
823                        let page_key = PageCacheKey::new(*page_id);
824                        let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
825                        let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
826                        trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
827                        self.wal.borrow_mut().append_frame(
828                            page.clone(),
829                            db_size,
830                            self.flush_info.borrow().in_flight_writes.clone(),
831                        )?;
832                        page.clear_dirty();
833                    }
834                    // This is okay assuming we use shared cache by default.
835                    {
836                        let mut cache = self.page_cache.write();
837                        cache.clear().unwrap();
838                    }
839                    self.dirty_pages.borrow_mut().clear();
840                    self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames;
841                    return Ok(PagerCacheflushStatus::IO);
842                }
843                FlushState::WaitAppendFrames => {
844                    let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
845                    if in_flight == 0 {
846                        self.flush_info.borrow_mut().state = FlushState::SyncWal;
847                    } else {
848                        return Ok(PagerCacheflushStatus::IO);
849                    }
850                }
851                FlushState::SyncWal => {
852                    if self.synchronous_mode.get() != SynchronousMode::Off
853                        && WalFsyncStatus::IO == self.wal.borrow_mut().sync()?
854                    {
855                        return Ok(PagerCacheflushStatus::IO);
856                    }
857
858                    if !self.wal.borrow().should_checkpoint() {
859                        self.flush_info.borrow_mut().state = FlushState::Start;
860                        return Ok(PagerCacheflushStatus::Done(
861                            PagerCacheflushResult::WalWritten,
862                        ));
863                    }
864                    self.flush_info.borrow_mut().state = FlushState::Checkpoint;
865                }
866                FlushState::Checkpoint => {
867                    match self.checkpoint()? {
868                        CheckpointStatus::Done(res) => {
869                            checkpoint_result = res;
870                            self.flush_info.borrow_mut().state = FlushState::SyncDbFile;
871                        }
872                        CheckpointStatus::IO => return Ok(PagerCacheflushStatus::IO),
873                    };
874                }
875                FlushState::SyncDbFile => {
876                    if self.synchronous_mode.get() == SynchronousMode::Off {
877                        self.flush_info.borrow_mut().state = FlushState::Start;
878                        break;
879                    }
880                    sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
881                    self.flush_info.borrow_mut().state = FlushState::WaitSyncDbFile;
882                }
883                FlushState::WaitSyncDbFile => {
884                    if *self.syncing.borrow() {
885                        return Ok(PagerCacheflushStatus::IO);
886                    } else {
887                        self.flush_info.borrow_mut().state = FlushState::Start;
888                        break;
889                    }
890                }
891            }
892        }
893        Ok(PagerCacheflushStatus::Done(
894            PagerCacheflushResult::Checkpointed(checkpoint_result),
895        ))
896    }
897
898    pub fn wal_get_frame(
899        &self,
900        frame_no: u32,
901        p_frame: *mut u8,
902        frame_len: u32,
903    ) -> Result<Arc<Completion>> {
904        let wal = self.wal.borrow();
905        return wal.read_frame_raw(
906            frame_no.into(),
907            self.buffer_pool.clone(),
908            p_frame,
909            frame_len,
910        );
911    }
912
913    pub fn checkpoint(&self) -> Result<CheckpointStatus> {
914        let mut checkpoint_result = CheckpointResult::default();
915        loop {
916            let state = *self.checkpoint_state.borrow();
917            trace!("pager_checkpoint(state={:?})", state);
918            match state {
919                CheckpointState::Checkpoint => {
920                    let in_flight = self.checkpoint_inflight.clone();
921                    match self.wal.borrow_mut().checkpoint(
922                        self,
923                        in_flight,
924                        CheckpointMode::Passive,
925                    )? {
926                        CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
927                        CheckpointStatus::Done(res) => {
928                            checkpoint_result = res;
929                            self.checkpoint_state.replace(CheckpointState::SyncDbFile);
930                        }
931                    };
932                }
933                CheckpointState::SyncDbFile => {
934                    if self.synchronous_mode.get() == SynchronousMode::Off {
935                        self.checkpoint_state
936                            .replace(CheckpointState::CheckpointDone);
937                    } else {
938                        sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
939                        self.checkpoint_state
940                            .replace(CheckpointState::WaitSyncDbFile);
941                    }
942                }
943                CheckpointState::WaitSyncDbFile => {
944                    if *self.syncing.borrow() {
945                        return Ok(CheckpointStatus::IO);
946                    } else {
947                        self.checkpoint_state
948                            .replace(CheckpointState::CheckpointDone);
949                    }
950                }
951                CheckpointState::CheckpointDone => {
952                    return if *self.checkpoint_inflight.borrow() > 0 {
953                        Ok(CheckpointStatus::IO)
954                    } else {
955                        self.checkpoint_state.replace(CheckpointState::Checkpoint);
956                        Ok(CheckpointStatus::Done(checkpoint_result))
957                    };
958                }
959            }
960        }
961    }
962
963    /// Invalidates entire page cache by removing all dirty and clean pages. Usually used in case
964    /// of a rollback or in case we want to invalidate page cache after starting a read transaction
965    /// right after new writes happened which would invalidate current page cache.
966    pub fn clear_page_cache(&self) {
967        self.dirty_pages.borrow_mut().clear();
968        self.page_cache.write().unset_dirty_all_pages();
969        self.page_cache
970            .write()
971            .clear()
972            .expect("Failed to clear page cache");
973    }
974
975    /// Roll back the current write transaction.
976    ///
977    /// Clears all dirty pages from the page cache, resets in-flight flush and
978    /// checkpoint state machines to their idle state, and delegates to the WAL
979    /// to undo any frames appended during this transaction.
980    pub fn rollback(&self) {
981        // Invalidate every dirty (and clean) cached page so that subsequent
982        // reads reload from the WAL / database file.
983        self.clear_page_cache();
984
985        // Reset the flush state machine so the next commit starts fresh.
986        self.flush_info.borrow_mut().state = FlushState::Start;
987        *self.flush_info.borrow().in_flight_writes.borrow_mut() = 0;
988
989        // Reset the checkpoint state machine.
990        self.checkpoint_state.replace(CheckpointState::Checkpoint);
991
992        // Reset the sync flag.
993        *self.syncing.borrow_mut() = false;
994
995        // Roll back WAL frames appended during this transaction.
996        self.wal.borrow().rollback();
997
998        // Discard all savepoints: a full ROLLBACK invalidates them all.
999        self.savepoints.borrow_mut().clear();
1000    }
1001
1002    // ── Savepoint management ──────────────────────────────────────────────────
1003
1004    /// Flush all currently dirty pages to WAL frames without clearing the page
1005    /// cache.
1006    ///
1007    /// This is called at `SAVEPOINT` open time to materialise the pre-savepoint
1008    /// state as WAL frames.  After this call the pager's `dirty_pages` set is
1009    /// empty and every modified page lives in `frame_cache`.  Subsequent
1010    /// `ROLLBACK TO` can restore to this state by pruning frames written after
1011    /// the savepoint and then relying on WAL-based reads.
1012    ///
1013    /// Unlike `cacheflush` this method intentionally does **not** clear the
1014    /// page cache, so in-progress reads/writes continue to work from the
1015    /// existing in-memory pages.
1016    fn flush_dirty_pages_to_wal(&self) -> Result<()> {
1017        if self.dirty_pages.borrow().is_empty() {
1018            return Ok(());
1019        }
1020        let db_size = self.db_header.lock().database_size;
1021        // Collect page IDs first to avoid holding the dirty_pages borrow while
1022        // iterating and calling append_frame.
1023        let dirty_ids: Vec<usize> = self.dirty_pages.borrow().iter().copied().collect();
1024        for page_id in dirty_ids {
1025            let page_key = PageCacheKey::new(page_id);
1026            // Use the write-lock variant of get() which returns Option<Arc<Page>>.
1027            let mut cache = self.page_cache.write();
1028            let page = match cache.get(&page_key) {
1029                Some(p) => p,
1030                // Page was dirtied but evicted; skip defensively.
1031                None => continue,
1032            };
1033            self.wal.borrow_mut().append_frame(
1034                page.clone(),
1035                db_size,
1036                self.flush_info.borrow().in_flight_writes.clone(),
1037            )?;
1038        }
1039        // Clear the dirty-pages set — these pages are now in WAL frames.
1040        // The page cache is left intact so in-flight reads see the same data.
1041        self.dirty_pages.borrow_mut().clear();
1042        // Advance the reader-visible max frame so that, after a ROLLBACK TO
1043        // clears the page cache, subsequent reads can find these frames via
1044        // find_frame.
1045        let new_max = self.wal.borrow().current_frame_state().0;
1046        self.wal.borrow_mut().set_reader_max_frame(new_max);
1047        Ok(())
1048    }
1049
1050    /// Open a new named savepoint by capturing the current WAL frame state.
1051    ///
1052    /// All currently dirty pages are first flushed to WAL so that the
1053    /// pre-savepoint state is recorded as actual WAL frames.  `ROLLBACK TO`
1054    /// can then restore to this point by pruning frames written after the
1055    /// savepoint and clearing the page cache.
1056    ///
1057    /// `is_txn_owner` should be `true` when this savepoint implicitly started
1058    /// the surrounding write transaction (i.e., opened in autocommit mode).
1059    pub fn open_savepoint(&self, name: String, is_txn_owner: bool) -> Result<()> {
1060        // Materialise the current state as WAL frames before snapshotting.
1061        self.flush_dirty_pages_to_wal()?;
1062        let (max_frame, checksum) = self.wal.borrow().current_frame_state();
1063        let frame = SavepointFrame {
1064            name,
1065            wal_max_frame: max_frame,
1066            wal_checksum: checksum,
1067            is_transaction_owner: is_txn_owner,
1068        };
1069        self.savepoints.borrow_mut().push(frame);
1070        Ok(())
1071    }
1072
1073    /// Roll back to a previously opened savepoint (ROLLBACK TO SAVEPOINT).
1074    ///
1075    /// Truncates the WAL to the savepoint's frame watermark, clears the page
1076    /// cache, and resets the flush state machine — exactly as a full rollback
1077    /// does, but scoped to the savepoint boundary.  The savepoint itself
1078    /// remains valid so subsequent writes can continue within the transaction.
1079    ///
1080    /// Nested savepoints opened after the target are discarded.
1081    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
1082        // Find the innermost savepoint with this name.
1083        let idx = {
1084            let sps = self.savepoints.borrow();
1085            sps.iter()
1086                .rposition(|sp| sp.name.eq_ignore_ascii_case(name))
1087                .ok_or_else(|| LimboError::ParseError(format!("no such savepoint: {name}")))?
1088        };
1089        let sp = {
1090            let sps = self.savepoints.borrow();
1091            sps[idx].clone()
1092        };
1093        // All savepoints opened after the target are invalid after rollback.
1094        self.savepoints.borrow_mut().truncate(idx + 1);
1095        // Clear page cache (and dirty_pages) so reads see only the WAL state.
1096        self.clear_page_cache();
1097        // Prune WAL frames appended after the savepoint.
1098        self.wal
1099            .borrow()
1100            .rollback_to_frame(sp.wal_max_frame, sp.wal_checksum)?;
1101        // Restore the reader-visible max frame to the savepoint boundary so
1102        // that find_frame returns exactly the savepoint-era frames.
1103        self.wal.borrow_mut().set_reader_max_frame(sp.wal_max_frame);
1104        // Reset flush state machine.  Do NOT reset in_flight_writes: writes
1105        // from the savepoint flush may still be in-flight for file-backed DBs
1106        // and will drain naturally through the IO loop during COMMIT.
1107        self.flush_info.borrow_mut().state = FlushState::Start;
1108        self.checkpoint_state.replace(CheckpointState::Checkpoint);
1109        *self.syncing.borrow_mut() = false;
1110        Ok(())
1111    }
1112
1113    /// Release a savepoint (RELEASE SAVEPOINT).
1114    ///
1115    /// Removes the named savepoint and all savepoints opened after it from the
1116    /// stack.  Returns `true` when the released savepoint was the implicit
1117    /// transaction owner — the caller must then commit the transaction.
1118    pub fn release_savepoint(&self, name: &str) -> Result<bool> {
1119        // Find the innermost savepoint with this name.
1120        let idx = {
1121            let sps = self.savepoints.borrow();
1122            sps.iter()
1123                .rposition(|sp| sp.name.eq_ignore_ascii_case(name))
1124                .ok_or_else(|| LimboError::ParseError(format!("no such savepoint: {name}")))?
1125        };
1126        let is_owner = self.savepoints.borrow()[idx].is_transaction_owner;
1127        // Release the target savepoint and all nested ones opened after it.
1128        self.savepoints.borrow_mut().truncate(idx);
1129        Ok(is_owner)
1130    }
1131
1132    pub fn checkpoint_shutdown(&self) -> Result<()> {
1133        let mut attempts = 0;
1134        {
1135            let mut wal = self.wal.borrow_mut();
1136            // fsync the wal synchronously before beginning checkpoint (unless OFF)
1137            if self.synchronous_mode.get() != SynchronousMode::Off {
1138                while let Ok(WalFsyncStatus::IO) = wal.sync() {
1139                    if attempts >= 10 {
1140                        return Err(LimboError::InternalError(
1141                            "Failed to fsync WAL before final checkpoint, fd likely closed".into(),
1142                        ));
1143                    }
1144                    self.io.run_once()?;
1145                    attempts += 1;
1146                }
1147            }
1148        }
1149        // Truncate the WAL on clean close so the .db is self-contained; if a
1150        // Truncate checkpoint cannot complete (e.g. other active readers), fall
1151        // back to a Passive checkpoint.
1152        match self.wal_checkpoint_mode(CheckpointMode::Truncate) {
1153            Ok(_) => Ok(()),
1154            Err(_) => {
1155                let _ = self.wal_checkpoint_mode(CheckpointMode::Passive);
1156                Ok(())
1157            }
1158        }
1159    }
1160
1161    pub fn wal_checkpoint(&self) -> CheckpointResult {
1162        let checkpoint_result: CheckpointResult;
1163        loop {
1164            match self.wal.borrow_mut().checkpoint(
1165                self,
1166                Rc::new(RefCell::new(0)),
1167                CheckpointMode::Passive,
1168            ) {
1169                Ok(CheckpointStatus::IO) => {
1170                    let _ = self.io.run_once();
1171                }
1172                Ok(CheckpointStatus::Done(res)) => {
1173                    checkpoint_result = res;
1174                    break;
1175                }
1176                Err(err) => panic!("error while clearing cache {}", err),
1177            }
1178        }
1179        // TODO: only clear cache of things that are really invalidated
1180        self.page_cache
1181            .write()
1182            .clear()
1183            .expect("Failed to clear page cache");
1184        checkpoint_result
1185    }
1186
1187    /// Run a blocking checkpoint in the given mode (used by close()/Drop and by
1188    /// `PRAGMA wal_checkpoint(<MODE>)` at the pager level). Unlike
1189    /// [`Self::wal_checkpoint`], this never panics and returns a `Result`.
1190    pub fn wal_checkpoint_mode(&self, mode: CheckpointMode) -> Result<CheckpointResult> {
1191        let checkpoint_result;
1192        loop {
1193            match self
1194                .wal
1195                .borrow_mut()
1196                .checkpoint(self, Rc::new(RefCell::new(0)), mode)?
1197            {
1198                CheckpointStatus::IO => {
1199                    self.io.run_once()?;
1200                }
1201                CheckpointStatus::Done(res) => {
1202                    checkpoint_result = res;
1203                    break;
1204                }
1205            }
1206        }
1207        self.page_cache.write().clear().map_err(|_| {
1208            crate::error::LimboError::InternalError("failed to clear page cache".to_string())
1209        })?;
1210        Ok(checkpoint_result)
1211    }
1212
1213    // Providing a page is optional, if provided it will be used to avoid reading the page from disk.
1214    // This is implemented in accordance with sqlite freepage2() function.
1215    pub fn free_page(&self, page: Option<PageRef>, page_id: usize) -> Result<()> {
1216        tracing::trace!("free_page(page_id={})", page_id);
1217        const TRUNK_PAGE_HEADER_SIZE: usize = 8;
1218        const LEAF_ENTRY_SIZE: usize = 4;
1219        const RESERVED_SLOTS: usize = 2;
1220
1221        const TRUNK_PAGE_NEXT_PAGE_OFFSET: usize = 0; // Offset to next trunk page pointer
1222        const TRUNK_PAGE_LEAF_COUNT_OFFSET: usize = 4; // Offset to leaf count
1223
1224        if page_id < 2 || page_id > self.db_header.lock().database_size as usize {
1225            return Err(LimboError::Corrupt(format!(
1226                "Invalid page number {} for free operation",
1227                page_id
1228            )));
1229        }
1230
1231        let page = match page {
1232            Some(page) => {
1233                assert_eq!(page.get().id, page_id, "Page id mismatch");
1234                page
1235            }
1236            None => self.read_page(page_id)?,
1237        };
1238
1239        self.db_header.lock().freelist_pages += 1;
1240
1241        let trunk_page_id = self.db_header.lock().freelist_trunk_page;
1242
1243        if trunk_page_id != 0 {
1244            // Add as leaf to current trunk
1245            let trunk_page = self.read_page(trunk_page_id as usize)?;
1246            let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap();
1247            let number_of_leaf_pages = trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET);
1248
1249            // Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE
1250            let max_free_list_entries = (self.usable_size() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS;
1251
1252            if number_of_leaf_pages < max_free_list_entries as u32 {
1253                trunk_page.set_dirty();
1254                self.add_dirty(trunk_page_id as usize);
1255
1256                trunk_page_contents
1257                    .write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1);
1258                trunk_page_contents.write_u32(
1259                    TRUNK_PAGE_HEADER_SIZE + (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE),
1260                    page_id as u32,
1261                );
1262                page.clear_uptodate();
1263                page.clear_loaded();
1264
1265                return Ok(());
1266            }
1267        }
1268
1269        // If we get here, need to make this page a new trunk
1270        page.set_dirty();
1271        self.add_dirty(page_id);
1272
1273        let contents = page.get().contents.as_mut().unwrap();
1274        // Point to previous trunk
1275        contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id);
1276        // Zero leaf count
1277        contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0);
1278        // Update page 1 to point to new trunk
1279        self.db_header.lock().freelist_trunk_page = page_id as u32;
1280        // Clear flags
1281        page.clear_uptodate();
1282        page.clear_loaded();
1283        Ok(())
1284    }
1285
1286    /*
1287        Gets a new page that increasing the size of the page or uses a free page.
1288        Currently free list pages are not yet supported.
1289    */
1290    // FIXME: handle no room in page cache
1291    #[allow(clippy::readonly_write_lock)]
1292    pub fn allocate_page(&self) -> Result<PageRef> {
1293        let header = &self.db_header;
1294        let mut header = header.lock();
1295        header.database_size += 1;
1296
1297        #[cfg(not(feature = "omit_autovacuum"))]
1298        {
1299            //  If the following conditions are met, allocate a pointer map page, add to cache and increment the database size
1300            //  - autovacuum is enabled
1301            //  - the last page is a pointer map page
1302            if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full)
1303                && is_ptrmap_page(header.database_size, header.get_page_size() as usize)
1304            {
1305                let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0);
1306                page.set_dirty();
1307                self.add_dirty(page.get().id);
1308
1309                let page_key = PageCacheKey::new(page.get().id);
1310                let mut cache = self.page_cache.write();
1311                match cache.insert(page_key, page.clone()) {
1312                    Ok(_) => (),
1313                    Err(CacheError::Full) => return Err(LimboError::CacheFull),
1314                    Err(_) => {
1315                        return Err(LimboError::InternalError(
1316                            "Unknown error inserting page to cache".into(),
1317                        ))
1318                    }
1319                }
1320                header.database_size += 1;
1321            }
1322        }
1323
1324        // update database size
1325        self.write_database_header(&mut header)?;
1326
1327        // FIXME: should reserve page cache entry before modifying the database
1328        let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0);
1329        {
1330            // setup page and add to cache
1331            page.set_dirty();
1332            self.add_dirty(page.get().id);
1333
1334            let page_key = PageCacheKey::new(page.get().id);
1335            let mut cache = self.page_cache.write();
1336            match cache.insert(page_key, page.clone()) {
1337                Err(CacheError::Full) => Err(LimboError::CacheFull),
1338                Err(_) => Err(LimboError::InternalError(
1339                    "Unknown error inserting page to cache".into(),
1340                )),
1341                Ok(_) => Ok(page),
1342            }
1343        }
1344    }
1345
1346    pub fn update_dirty_loaded_page_in_cache(
1347        &self,
1348        id: usize,
1349        page: PageRef,
1350    ) -> Result<(), LimboError> {
1351        let mut cache = self.page_cache.write();
1352        let page_key = PageCacheKey::new(id);
1353
1354        // FIXME: use specific page key for writer instead of max frame, this will make readers not conflict
1355        assert!(page.is_dirty());
1356        cache
1357            .insert_ignore_existing(page_key, page.clone())
1358            .map_err(|e| {
1359                LimboError::InternalError(format!(
1360                    "Failed to insert loaded page {} into cache: {:?}",
1361                    id, e
1362                ))
1363            })?;
1364        page.set_loaded();
1365        Ok(())
1366    }
1367
1368    pub fn usable_size(&self) -> usize {
1369        let db_header = self.db_header.lock();
1370        (db_header.get_page_size() - db_header.reserved_space as u32) as usize
1371    }
1372}
1373
1374pub fn allocate_page(page_id: usize, buffer_pool: &Rc<BufferPool>, offset: usize) -> PageRef {
1375    let page = Arc::new(Page::new(page_id));
1376    {
1377        let buffer = buffer_pool.get();
1378        let bp = buffer_pool.clone();
1379        let drop_fn = Rc::new(move |buf| {
1380            bp.put(buf);
1381        });
1382        let buffer = Arc::new(RefCell::new(Buffer::new(buffer, drop_fn)));
1383        page.set_loaded();
1384        page.get().contents = Some(PageContent::new(offset, buffer));
1385    }
1386    page
1387}
1388
1389#[derive(Debug)]
1390pub struct CreateBTreeFlags(pub u8);
1391impl CreateBTreeFlags {
1392    pub const TABLE: u8 = 0b0001;
1393    pub const INDEX: u8 = 0b0010;
1394
1395    pub fn new_table() -> Self {
1396        Self(CreateBTreeFlags::TABLE)
1397    }
1398
1399    pub fn new_index() -> Self {
1400        Self(CreateBTreeFlags::INDEX)
1401    }
1402
1403    pub fn is_table(&self) -> bool {
1404        (self.0 & CreateBTreeFlags::TABLE) != 0
1405    }
1406
1407    pub fn is_index(&self) -> bool {
1408        (self.0 & CreateBTreeFlags::INDEX) != 0
1409    }
1410
1411    pub fn get_flags(&self) -> u8 {
1412        self.0
1413    }
1414}
1415
1416/*
1417** The pointer map is a lookup table that identifies the parent page for
1418** each child page in the database file.  The parent page is the page that
1419** contains a pointer to the child.  Every page in the database contains
1420** 0 or 1 parent pages. Each pointer map entry consists of a single byte 'type'
1421** and a 4 byte parent page number.
1422**
1423** The PTRMAP_XXX identifiers below are the valid types.
1424**
1425** The purpose of the pointer map is to facilitate moving pages from one
1426** position in the file to another as part of autovacuum.  When a page
1427** is moved, the pointer in its parent must be updated to point to the
1428** new location.  The pointer map is used to locate the parent page quickly.
1429**
1430** PTRMAP_ROOTPAGE: The database page is a root-page. The page-number is not
1431**                  used in this case.
1432**
1433** PTRMAP_FREEPAGE: The database page is an unused (free) page. The page-number
1434**                  is not used in this case.
1435**
1436** PTRMAP_OVERFLOW1: The database page is the first page in a list of
1437**                   overflow pages. The page number identifies the page that
1438**                   contains the cell with a pointer to this overflow page.
1439**
1440** PTRMAP_OVERFLOW2: The database page is the second or later page in a list of
1441**                   overflow pages. The page-number identifies the previous
1442**                   page in the overflow page list.
1443**
1444** PTRMAP_BTREE: The database page is a non-root btree page. The page number
1445**               identifies the parent page in the btree.
1446*/
1447#[cfg(not(feature = "omit_autovacuum"))]
1448mod ptrmap {
1449    use crate::{storage::sqlite3_ondisk::MIN_PAGE_SIZE, LimboError, Result};
1450
1451    // Constants
1452    pub const PTRMAP_ENTRY_SIZE: usize = 5;
1453    /// Page 1 is the schema page which contains the database header.
1454    /// Page 2 is the first pointer map page if the database has any pointer map pages.
1455    pub const FIRST_PTRMAP_PAGE_NO: u32 = 2;
1456
1457    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1458    #[repr(u8)]
1459    pub enum PtrmapType {
1460        RootPage = 1,
1461        FreePage = 2,
1462        Overflow1 = 3,
1463        Overflow2 = 4,
1464        BTreeNode = 5,
1465    }
1466
1467    impl PtrmapType {
1468        pub fn from_u8(value: u8) -> Option<Self> {
1469            match value {
1470                1 => Some(PtrmapType::RootPage),
1471                2 => Some(PtrmapType::FreePage),
1472                3 => Some(PtrmapType::Overflow1),
1473                4 => Some(PtrmapType::Overflow2),
1474                5 => Some(PtrmapType::BTreeNode),
1475                _ => None,
1476            }
1477        }
1478    }
1479
1480    #[derive(Debug, Clone, Copy)]
1481    pub struct PtrmapEntry {
1482        pub entry_type: PtrmapType,
1483        pub parent_page_no: u32,
1484    }
1485
1486    impl PtrmapEntry {
1487        pub fn serialize(&self, buffer: &mut [u8]) -> Result<()> {
1488            if buffer.len() < PTRMAP_ENTRY_SIZE {
1489                return Err(LimboError::InternalError(format!(
1490                "Buffer too small to serialize ptrmap entry. Expected at least {} bytes, got {}",
1491                PTRMAP_ENTRY_SIZE,
1492                buffer.len()
1493            )));
1494            }
1495            buffer[0] = self.entry_type as u8;
1496            buffer[1..5].copy_from_slice(&self.parent_page_no.to_be_bytes());
1497            Ok(())
1498        }
1499
1500        pub fn deserialize(buffer: &[u8]) -> Option<Self> {
1501            if buffer.len() < PTRMAP_ENTRY_SIZE {
1502                return None;
1503            }
1504            let entry_type_u8 = buffer[0];
1505            let parent_bytes_slice = buffer.get(1..5)?;
1506            let parent_page_no = u32::from_be_bytes(parent_bytes_slice.try_into().ok()?);
1507            PtrmapType::from_u8(entry_type_u8).map(|entry_type| PtrmapEntry {
1508                entry_type,
1509                parent_page_no,
1510            })
1511        }
1512    }
1513
1514    /// Calculates how many database pages are mapped by a single pointer map page.
1515    /// This is based on the total page size, as ptrmap pages are filled with entries.
1516    pub fn entries_per_ptrmap_page(page_size: usize) -> usize {
1517        assert!(page_size >= MIN_PAGE_SIZE as usize);
1518        page_size / PTRMAP_ENTRY_SIZE
1519    }
1520
1521    /// Calculates the cycle length of pointer map pages
1522    /// The cycle length is the number of database pages that are mapped by a single pointer map page.
1523    pub fn ptrmap_page_cycle_length(page_size: usize) -> usize {
1524        assert!(page_size >= MIN_PAGE_SIZE as usize);
1525        (page_size / PTRMAP_ENTRY_SIZE) + 1
1526    }
1527
1528    /// Determines if a given page number `db_page_no` (1-indexed) is a pointer map page in a database with autovacuum enabled
1529    pub fn is_ptrmap_page(db_page_no: u32, page_size: usize) -> bool {
1530        //  The first page cannot be a ptrmap page because its for the schema
1531        if db_page_no == 1 {
1532            return false;
1533        }
1534        if db_page_no == FIRST_PTRMAP_PAGE_NO {
1535            return true;
1536        }
1537        return get_ptrmap_page_no_for_db_page(db_page_no, page_size) == db_page_no;
1538    }
1539
1540    /// Calculates which pointer map page (1-indexed) contains the entry for `db_page_no_to_query` (1-indexed).
1541    /// `db_page_no_to_query` is the page whose ptrmap entry we are interested in.
1542    pub fn get_ptrmap_page_no_for_db_page(db_page_no_to_query: u32, page_size: usize) -> u32 {
1543        let group_size = ptrmap_page_cycle_length(page_size) as u32;
1544        if group_size == 0 {
1545            panic!("Page size too small, a ptrmap page cannot map any db pages.");
1546        }
1547
1548        let effective_page_index = db_page_no_to_query - FIRST_PTRMAP_PAGE_NO;
1549        let group_idx = effective_page_index / group_size;
1550
1551        (group_idx * group_size) + FIRST_PTRMAP_PAGE_NO
1552    }
1553
1554    /// Calculates the byte offset of the entry for `db_page_no_to_query` (1-indexed)
1555    /// within its pointer map page (`ptrmap_page_no`, 1-indexed).
1556    pub fn get_ptrmap_offset_in_page(
1557        db_page_no_to_query: u32,
1558        ptrmap_page_no: u32,
1559        page_size: usize,
1560    ) -> Result<usize> {
1561        // The data pages mapped by `ptrmap_page_no` are:
1562        // `ptrmap_page_no + 1`, `ptrmap_page_no + 2`, ..., up to `ptrmap_page_no + n_data_pages_per_group`.
1563        // `db_page_no_to_query` must be one of these.
1564        // The 0-indexed position of `db_page_no_to_query` within this sequence of data pages is:
1565        // `db_page_no_to_query - (ptrmap_page_no + 1)`.
1566
1567        let n_data_pages_per_group = entries_per_ptrmap_page(page_size);
1568        let first_data_page_mapped = ptrmap_page_no + 1;
1569        let last_data_page_mapped = ptrmap_page_no + n_data_pages_per_group as u32;
1570
1571        if db_page_no_to_query < first_data_page_mapped
1572            || db_page_no_to_query > last_data_page_mapped
1573        {
1574            return Err(LimboError::InternalError(format!(
1575                "Page {} is not mapped by the data page range [{}, {}] of ptrmap page {}",
1576                db_page_no_to_query, first_data_page_mapped, last_data_page_mapped, ptrmap_page_no
1577            )));
1578        }
1579        if is_ptrmap_page(db_page_no_to_query, page_size) {
1580            return Err(LimboError::InternalError(format!(
1581                "Page {} is a pointer map page and should not have an entry calculated this way.",
1582                db_page_no_to_query
1583            )));
1584        }
1585
1586        let entry_index_on_page = (db_page_no_to_query - first_data_page_mapped) as usize;
1587        Ok(entry_index_on_page * PTRMAP_ENTRY_SIZE)
1588    }
1589}
1590
1591#[cfg(test)]
1592mod tests {
1593    use std::sync::Arc;
1594
1595    use parking_lot::RwLock;
1596
1597    use crate::storage::page_cache::{DumbLruPageCache, PageCacheKey};
1598
1599    use super::Page;
1600
1601    #[test]
1602    fn test_shared_cache() {
1603        // ensure cache can be shared between threads
1604        let cache = Arc::new(RwLock::new(DumbLruPageCache::new(10)));
1605
1606        let thread = {
1607            let cache = cache.clone();
1608            std::thread::spawn(move || {
1609                let mut cache = cache.write();
1610                let page_key = PageCacheKey::new(1);
1611                cache.insert(page_key, Arc::new(Page::new(1))).unwrap();
1612            })
1613        };
1614        let _ = thread.join();
1615        let mut cache = cache.write();
1616        let page_key = PageCacheKey::new(1);
1617        let page = cache.get(&page_key);
1618        assert_eq!(page.unwrap().get().id, 1);
1619    }
1620}
1621
1622#[cfg(test)]
1623#[cfg(not(feature = "omit_autovacuum"))]
1624mod ptrmap_tests {
1625    use std::cell::RefCell;
1626    use std::rc::Rc;
1627    use std::sync::Arc;
1628
1629    use super::ptrmap::*;
1630    use super::*;
1631    use crate::fast_lock::SpinLock;
1632    use crate::io::{MemoryIO, OpenFlags, IO};
1633    use crate::storage::buffer_pool::BufferPool;
1634    use crate::storage::database::{DatabaseFile, DatabaseStorage};
1635    use crate::storage::page_cache::DumbLruPageCache;
1636    use crate::storage::pager::Pager;
1637    use crate::storage::sqlite3_ondisk::DatabaseHeader;
1638    use crate::storage::sqlite3_ondisk::MIN_PAGE_SIZE;
1639    use crate::storage::wal::{WalFile, WalFileShared};
1640
1641    // Helper to create a Pager for testing
1642    fn test_pager_setup(page_size: u32, initial_db_pages: u32) -> Pager {
1643        let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
1644        let db_file_raw = io.open_file("test.db", OpenFlags::Create, true).unwrap();
1645        let db_storage: Arc<dyn DatabaseStorage> = Arc::new(DatabaseFile::new(db_file_raw));
1646
1647        //  Initialize a minimal header in autovacuum mode
1648        let mut header_data = DatabaseHeader::default();
1649        header_data.update_page_size(page_size);
1650        let db_header_arc = Arc::new(SpinLock::new(header_data));
1651        db_header_arc.lock().vacuum_mode_largest_root_page = 1;
1652
1653        //  Construct interfaces for the pager
1654        let buffer_pool = Rc::new(BufferPool::new(page_size as usize));
1655        let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(
1656            (initial_db_pages + 10) as usize,
1657        )));
1658
1659        let wal = Rc::new(RefCell::new(WalFile::new(
1660            io.clone(),
1661            page_size,
1662            WalFileShared::open_shared(&io, "test.db-wal", page_size).unwrap(),
1663            buffer_pool.clone(),
1664        )));
1665
1666        let pager = Pager::finish_open(db_header_arc, db_storage, wal, io, page_cache, buffer_pool)
1667            .unwrap();
1668        pager.set_auto_vacuum_mode(AutoVacuumMode::Full);
1669
1670        //  Allocate all the pages as btree root pages
1671        for _ in 0..initial_db_pages {
1672            match pager.btree_create(&CreateBTreeFlags::new_table()) {
1673                Ok(CursorResult::Ok(_root_page_id)) => (),
1674                Ok(CursorResult::IO) => {
1675                    panic!("test_pager_setup: btree_create returned CursorResult::IO unexpectedly");
1676                }
1677                Err(e) => {
1678                    panic!("test_pager_setup: btree_create failed: {:?}", e);
1679                }
1680            }
1681        }
1682
1683        return pager;
1684    }
1685
1686    #[test]
1687    fn test_ptrmap_page_allocation() {
1688        let page_size = 4096;
1689        let initial_db_pages = 10;
1690        let pager = test_pager_setup(page_size, initial_db_pages);
1691
1692        // Page 5 should be mapped by ptrmap page 2.
1693        let db_page_to_update: u32 = 5;
1694        let expected_ptrmap_pg_no =
1695            get_ptrmap_page_no_for_db_page(db_page_to_update, page_size as usize);
1696        assert_eq!(expected_ptrmap_pg_no, FIRST_PTRMAP_PAGE_NO);
1697
1698        //  Ensure the pointer map page ref is created and loadable via the pager
1699        let ptrmap_page_ref = pager.read_page(expected_ptrmap_pg_no as usize);
1700        assert!(ptrmap_page_ref.is_ok());
1701
1702        //  Ensure that the database header size is correctly reflected
1703        assert_eq!(pager.db_header.lock().database_size, initial_db_pages + 2); // (1+1) -> (header + ptrmap)
1704
1705        //  Read the entry from the ptrmap page and verify it
1706        let entry = pager.ptrmap_get(db_page_to_update).unwrap();
1707        assert!(matches!(entry, CursorResult::Ok(Some(_))));
1708        let CursorResult::Ok(Some(entry)) = entry else {
1709            panic!("entry is not Some");
1710        };
1711        assert_eq!(entry.entry_type, PtrmapType::RootPage);
1712        assert_eq!(entry.parent_page_no, 0);
1713    }
1714
1715    #[test]
1716    fn test_is_ptrmap_page_logic() {
1717        let page_size = MIN_PAGE_SIZE as usize;
1718        let n_data_pages = entries_per_ptrmap_page(page_size);
1719        assert_eq!(n_data_pages, 102); //   512/5 = 102
1720
1721        assert!(!is_ptrmap_page(1, page_size)); // Header
1722        assert!(is_ptrmap_page(2, page_size)); // P0
1723        assert!(!is_ptrmap_page(3, page_size)); // D0_1
1724        assert!(!is_ptrmap_page(4, page_size)); // D0_2
1725        assert!(!is_ptrmap_page(5, page_size)); // D0_3
1726        assert!(is_ptrmap_page(105, page_size)); // P1
1727        assert!(!is_ptrmap_page(106, page_size)); // D1_1
1728        assert!(!is_ptrmap_page(107, page_size)); // D1_2
1729        assert!(!is_ptrmap_page(108, page_size)); // D1_3
1730        assert!(is_ptrmap_page(208, page_size)); // P2
1731    }
1732
1733    #[test]
1734    fn test_get_ptrmap_page_no() {
1735        let page_size = MIN_PAGE_SIZE as usize; // Maps 103 data pages
1736
1737        // Test pages mapped by P0 (page 2)
1738        assert_eq!(get_ptrmap_page_no_for_db_page(3, page_size), 2); // D(3) -> P0(2)
1739        assert_eq!(get_ptrmap_page_no_for_db_page(4, page_size), 2); // D(4) -> P0(2)
1740        assert_eq!(get_ptrmap_page_no_for_db_page(5, page_size), 2); // D(5) -> P0(2)
1741        assert_eq!(get_ptrmap_page_no_for_db_page(104, page_size), 2); // D(104) -> P0(2)
1742
1743        assert_eq!(get_ptrmap_page_no_for_db_page(105, page_size), 105); // Page 105 is a pointer map page.
1744
1745        // Test pages mapped by P1 (page 6)
1746        assert_eq!(get_ptrmap_page_no_for_db_page(106, page_size), 105); // D(106) -> P1(105)
1747        assert_eq!(get_ptrmap_page_no_for_db_page(107, page_size), 105); // D(107) -> P1(105)
1748        assert_eq!(get_ptrmap_page_no_for_db_page(108, page_size), 105); // D(108) -> P1(105)
1749
1750        assert_eq!(get_ptrmap_page_no_for_db_page(208, page_size), 208); // Page 208 is a pointer map page.
1751    }
1752
1753    #[test]
1754    fn test_get_ptrmap_offset() {
1755        let page_size = MIN_PAGE_SIZE as usize; //  Maps 103 data pages
1756
1757        assert_eq!(get_ptrmap_offset_in_page(3, 2, page_size).unwrap(), 0);
1758        assert_eq!(
1759            get_ptrmap_offset_in_page(4, 2, page_size).unwrap(),
1760            1 * PTRMAP_ENTRY_SIZE
1761        );
1762        assert_eq!(
1763            get_ptrmap_offset_in_page(5, 2, page_size).unwrap(),
1764            2 * PTRMAP_ENTRY_SIZE
1765        );
1766
1767        //  P1 (page 105) maps D(106)...D(207)
1768        // D(106) is index 0 on P1. Offset 0.
1769        // D(107) is index 1 on P1. Offset 5.
1770        // D(108) is index 2 on P1. Offset 10.
1771        assert_eq!(get_ptrmap_offset_in_page(106, 105, page_size).unwrap(), 0);
1772        assert_eq!(
1773            get_ptrmap_offset_in_page(107, 105, page_size).unwrap(),
1774            1 * PTRMAP_ENTRY_SIZE
1775        );
1776        assert_eq!(
1777            get_ptrmap_offset_in_page(108, 105, page_size).unwrap(),
1778            2 * PTRMAP_ENTRY_SIZE
1779        );
1780    }
1781}