Skip to main content

obj_core/pager/
mod.rs

1//! Pager (L1) — fixed-size page allocator, freelist, bounded LRU
2//! cache, and the WAL-aware write path.
3//!
4//! The pager is the lowest-level component that handles named pages:
5//! [`PageId`]s are non-zero `u64`s, page bodies are exactly
6//! [`PAGE_SIZE`] bytes. The pager hides the difference between a
7//! file-backed database and an in-memory one (`Pager::memory`) so
8//! higher layers see a single uniform interface.
9//!
10//! # Power-of-ten posture
11//!
12//! - **Rule 2.** Every loop is bounded by the page count or the cache
13//!   capacity — both are integers known at function entry.
14//! - **Rule 3.** No heap allocation occurs on the read hot path: cache
15//!   frames are allocated at [`Pager::open`] and reused. The freelist
16//!   walks a single page at a time using stack-only state.
17//! - **Rule 7.** No `unwrap`/`expect` in production code paths.
18//!   Internal invariant violations use `debug_assert!` (Rule 5).
19//! - **Rule 8.** All syscalls go through [`crate::platform`]; this
20//!   module is `#![forbid(unsafe_code)]`.
21
22#![forbid(unsafe_code)]
23
24pub mod cache;
25pub mod checksum;
26pub mod freelist;
27pub mod header;
28pub mod page;
29
30use std::collections::HashMap;
31use std::path::{Path, PathBuf};
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::sync::{Arc, Mutex};
34
35use serde::{Deserialize, Serialize};
36
37use crate::error::{Error, Result};
38use crate::pager::cache::{Cache, Evicted};
39use crate::pager::checksum::{
40    page_trailer_valid, page_trailer_valid_v1, write_page_trailer, write_page_trailer_v1,
41};
42use crate::pager::freelist::{
43    decode as decode_freelist_page, encode as encode_freelist_page, FreeListPage,
44};
45use crate::pager::header::{
46    decode_header, encode_header, FileHeader, FEATURE_FLAG_COMPRESSION, FEATURE_FLAG_ENCRYPTION,
47};
48use crate::pager::page::{Page, PageId, ENCRYPTION_OVERHEAD, PAGE_SIZE, PAGE_TRAILER_SIZE};
49use crate::platform::{FileBackend, FileHandle, SyncMode};
50use crate::wal::{Lsn, Wal, WalConfig};
51
52pub use crate::pager::page::PAGE_SIZE as PAGER_PAGE_SIZE;
53
54/// A borrowed view of a cached or WAL-staged page.
55///
56/// `PageRef` is the return type of [`Pager::read_page`]. It carries a
57/// shared reference to the page's bytes that lives no longer than
58/// the immutable borrow of the pager that produced it — so the
59/// borrow checker forbids holding a `PageRef` across any mutating
60/// call on the same pager (write, commit, checkpoint, alloc, free).
61///
62/// # Allocation contract
63///
64/// Construction of a `PageRef` performs **no heap allocation** on
65/// cache hits or WAL-overlay hits: the bytes are already resident in
66/// memory and `PageRef` is a thin wrapper around a `&[u8; PAGE_SIZE]`.
67/// On a cache miss the read-through path issues one `pread` and
68/// inserts the page into the cache, after which the `PageRef`
69/// borrows that cache frame.
70///
71/// # Examples
72///
73/// ```no_run
74/// # use obj_core::pager::{Config, Pager};
75/// # use obj_core::pager::page::PageId;
76/// # fn id(n: u64) -> PageId { PageId::new(n).unwrap() }
77/// # let mut p = Pager::memory(Config::default()).unwrap();
78/// # let a = p.alloc_page().unwrap();
79/// # let _ = p.commit().unwrap();
80/// let view = p.read_page(a)?;
81/// let header_byte = view.as_bytes()[0];
82/// // `view` must be dropped (or moved) before the next p.write_page(...).
83/// # Ok::<(), obj_core::Error>(())
84/// ```
85#[derive(Debug)]
86pub struct PageRef<'a> {
87    page_id: PageId,
88    bytes: &'a Page,
89}
90
91impl<'a> PageRef<'a> {
92    fn new(page_id: PageId, bytes: &'a Page) -> Self {
93        Self { page_id, bytes }
94    }
95
96    /// The page id this view was read for.
97    #[must_use]
98    pub fn page_id(&self) -> PageId {
99        self.page_id
100    }
101
102    /// The page's raw bytes. Includes the per-page CRC32C trailer
103    /// in its last [`PAGE_TRAILER_SIZE`]
104    /// bytes; callers that want only the payload should slice off
105    /// the trailer themselves.
106    #[must_use]
107    pub fn as_bytes(&self) -> &'a [u8; PAGE_SIZE] {
108        self.bytes.as_bytes()
109    }
110
111    /// Clone the underlying page into a fresh owned [`Page`]. This
112    /// allocates; only call it when an owned buffer is genuinely
113    /// required (e.g. to mutate before a subsequent `write_page`).
114    #[must_use]
115    pub fn to_owned_page(&self) -> Page {
116        self.bytes.clone()
117    }
118}
119
120/// Snapshot of the page-0 header fields that the M5 catalog +
121/// freelist code mutates directly (not through the WAL).
122///
123/// Returned by [`Pager::header_snapshot`] at txn-begin; passed back
124/// to [`Pager::restore_header_snapshot`] on rollback so the on-disk
125/// header is rewound alongside the WAL pending buffer.  M5 wrote
126/// these fields direct-to-disk for performance; the M6 transaction
127/// layer compensates by snapshotting them whenever a `WriteTxn`
128/// might roll back.
129#[derive(Debug, Clone)]
130pub struct HeaderSnapshot {
131    /// Catalog B-tree root page id captured at snapshot time.
132    pub root_catalog: u64,
133    /// Freelist head page id captured at snapshot time.
134    pub freelist_head: u64,
135    /// File page count captured at snapshot time.  Used so an
136    /// alloc-then-rollback does not leak the appended page.
137    pub page_count: u64,
138    /// WAL "committed view" snapshot.  See
139    /// [`Pager::header_snapshot`] for the rationale — `free_page`
140    /// removes per-page entries from the live view, and the
141    /// rollback path needs to put them back.
142    ///
143    /// #80: pages are held behind `Arc` so cloning this map (in the
144    /// rollback snapshot path) is a set of refcount bumps rather than
145    /// per-page 4 KiB memcpys, matching the live committed view.
146    pub view: HashMap<PageId, Arc<Page>>,
147}
148
149/// Opaque identifier for a single live MVCC reader snapshot.
150///
151/// The id is monotonic per-pager; the value is otherwise meaningless
152/// to callers — it only exists so the (private) `SnapshotPin` RAII
153/// guard can deregister the right entry from the pager's live-
154/// snapshots map when it drops.
155///
156/// `SnapshotId` is a `#[repr(transparent)]` newtype over `u64` (Power-
157/// of-Ten Rule 5). The serde encoding is `#[serde(transparent)]` so
158/// the bytes are identical to the bare `u64`; today `SnapshotId` is
159/// purely in-memory, but the transparent encoding preserves wire
160/// compatibility for any future diagnostics record that names it.
161#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
162#[repr(transparent)]
163#[serde(transparent)]
164pub struct SnapshotId(u64);
165
166impl SnapshotId {
167    /// Construct a [`SnapshotId`] from a raw `u64`. Total function —
168    /// any `u64` (including `0`) is a valid snapshot id.
169    #[must_use]
170    pub const fn new(raw: u64) -> Self {
171        Self(raw)
172    }
173
174    /// The raw id. Exposed for diagnostics.
175    #[must_use]
176    pub const fn get(self) -> u64 {
177        self.0
178    }
179}
180
181/// RAII handle that keeps a [`ReaderSnapshot`]'s pinned LSN in the
182/// pager's live-snapshots map until the snapshot is dropped.
183///
184/// On drop, the pin removes itself from the map so checkpoint can
185/// proceed.  A poisoned mutex on drop is silently ignored (drop
186/// must never panic — Rule 5).
187#[derive(Debug)]
188struct SnapshotPin {
189    id: SnapshotId,
190    map: Arc<Mutex<HashMap<SnapshotId, Lsn>>>,
191}
192
193impl Drop for SnapshotPin {
194    fn drop(&mut self) {
195        if let Ok(mut guard) = self.map.lock() {
196            guard.remove(&self.id);
197        }
198    }
199}
200
201/// Owning handle to a page returned by [`ReaderSnapshot::read_page`].
202///
203/// #81: `read_page` used to return an owned `Page` (a 4 KiB body
204/// clone) on every call, including the common frozen-view hit. A
205/// point read descends catalog + primary and an index lookup descends
206/// two trees, so each op paid 3-5 such 4 KiB copies. `PageHandle`
207/// removes the copy on the hot path:
208///
209/// - [`PageHandle::Shared`] — frozen-view hit. Holds an
210///   `Arc<Page>` cloned from the snapshot's view (a refcount bump,
211///   **no** body copy). Sound because committed pages are immutable:
212///   a new version is a fresh `Arc` under the same `PageId`, never an
213///   in-place mutation of a shared `Arc` (see `frozen_view`).
214/// - [`PageHandle::Owned`] — disk / cache miss. Holds the freshly
215///   read, checksum-verified `Page` produced by the existing
216///   `read_main_file_page` / `read_cache_or_main` (`read_through`)
217///   path; integrity behaviour is unchanged.
218///
219/// Concrete enum, not `dyn` (Rule 9). Both arms are a single pointer
220/// (`Arc<Page>` and `Page`'s `Box<[u8; PAGE_SIZE]>`), so the variants
221/// are balanced and `clippy::large_enum_variant` does not fire.
222#[derive(Debug, Clone)]
223pub enum PageHandle {
224    /// Frozen-view hit — shares the snapshot's `Arc<Page>` with no
225    /// 4 KiB body clone.
226    Shared(Arc<Page>),
227    /// Disk / cache miss — owns the checksum-verified page bytes.
228    Owned(Page),
229}
230
231impl PageHandle {
232    /// Borrow the page's raw bytes for decoding, without copying.
233    ///
234    /// The match over both arms is total — no `Result`, no
235    /// `unwrap`/`expect` (Rule 7): every `PageHandle` always holds a
236    /// readable page in exactly one of its two arms. Callers that only
237    /// need to decode (e.g. `BTree::get_via_snapshot`,
238    /// `Catalog::lookup_via_snapshot`) should keep the `PageHandle`
239    /// alive for the duration of the borrow and call this.
240    #[must_use]
241    pub fn as_bytes(&self) -> &[u8; PAGE_SIZE] {
242        match self {
243            PageHandle::Shared(page) => page.as_bytes(),
244            PageHandle::Owned(page) => page.as_bytes(),
245        }
246    }
247
248    /// Consume the handle and produce an owned [`Page`].
249    ///
250    /// The `Shared` arm clones the body exactly once (only when an
251    /// owned page is genuinely required, e.g. the public
252    /// [`crate::ReadTxn::read_page`] whose pager lock guard cannot
253    /// outlive the call); the `Owned` arm moves with no copy. This is
254    /// the *only* place the hot-path frozen-view clone is paid, and
255    /// only for callers that ask for ownership.
256    #[must_use]
257    pub fn into_page(self) -> Page {
258        match self {
259            PageHandle::Shared(page) => (*page).clone(),
260            PageHandle::Owned(page) => page,
261        }
262    }
263}
264
265/// A reader-side MVCC snapshot of the database.
266///
267/// Captures the WAL end-LSN at construction and a clone of the
268/// pager's in-memory committed view.  Reads through the snapshot
269/// observe `main file ∪ WAL frames with LSN ≤ pinned_lsn`; pending
270/// writes from a concurrent `WriteTxn` are NEVER visible.
271///
272/// `ReaderSnapshot` is `Send` so the user-facing read transaction
273/// can run on any thread.  It is NOT `Clone` — each snapshot owns
274/// its own pin and cloning would double-register the pin entry.
275///
276/// Generic over `F: FileBackend` (Rule 9: no `dyn`); the production
277/// snapshot is `ReaderSnapshot<FileHandle>`.  Read calls take a
278/// `&mut Pager<F>` parameter so the borrow checker can prove that
279/// the cache mutation a cache-miss read performs cannot race the
280/// snapshot's own view; in practice the `Db` wraps the pager in a
281/// `Mutex` (M6 issue #47) and the snapshot's pin is independent of
282/// the mutex.
283#[derive(Debug)]
284pub struct ReaderSnapshot<F: FileBackend> {
285    pinned_lsn: Lsn,
286    /// Frozen WAL view captured at snapshot creation.  Lookups that
287    /// hit this map return the body the writer had committed by
288    /// `pinned_lsn`.  Misses fall through to `Pager::read_main_file_page`.
289    ///
290    /// #80: each page is an `Arc<Page>` so capturing this view at pin
291    /// time (`reader_snapshot`) clones the map as refcount bumps, not
292    /// per-page memcpys. MVCC isolation is unaffected: each snapshot
293    /// owns its OWN cloned map, and committed pages are immutable —
294    /// a new page version is a fresh `Arc` inserted under the same
295    /// `PageId`, never an in-place mutation of a shared `Arc`.
296    frozen_view: HashMap<PageId, Arc<Page>>,
297    /// M11 #92: optional snapshot of the WAL's committed page-0
298    /// (file header) frame at pin time. `None` means "the on-disk
299    /// header at offset 0 is authoritative" — no WAL-staged header
300    /// update sits in the committed view. Read-side users of the
301    /// snapshot do NOT consult this directly; the
302    /// [`crate::backup`] module uses it to reconstruct the
303    /// snapshot's view of the header bytes when materialising a
304    /// hot backup.
305    frozen_header: Option<Page>,
306    /// M6 #51: snapshot of the catalog B-tree root page-id at pin
307    /// time. Captured from the pager's committed-header view; a
308    /// concurrent writer that calls `set_root_catalog` will NOT
309    /// mutate this value, so reader threads can pass their pinned
310    /// root into a [`crate::Catalog`] opened against the snapshot.
311    root_catalog: u64,
312    /// Live-map registration; deregistered on drop.
313    pin: SnapshotPin,
314    _phantom: std::marker::PhantomData<fn() -> F>,
315}
316
317impl<F: FileBackend> ReaderSnapshot<F> {
318    /// The LSN this snapshot pinned at construction.  Reads through
319    /// the snapshot only observe WAL frames with `LSN <= pinned_lsn`.
320    #[must_use]
321    pub fn pinned_lsn(&self) -> Lsn {
322        self.pinned_lsn
323    }
324
325    /// Iterate over every `(PageId, &Page)` pair in the snapshot's
326    /// frozen WAL view — i.e. every WAL frame at `LSN <= pinned_lsn`
327    /// at the moment the snapshot was created. Used by M11 #92
328    /// (`Db::backup_to`) to overlay the snapshot's view onto a
329    /// freshly-copied main file.
330    pub fn frozen_pages(&self) -> impl Iterator<Item = (PageId, &Page)> + '_ {
331        // #80: yield `&Page` (not `&Arc<Page>`) so `Db::backup_to`
332        // keeps its borrow-based contract — deref the Arc to its body.
333        self.frozen_view
334            .iter()
335            .map(|(id, page)| (*id, page.as_ref()))
336    }
337
338    /// The WAL-staged page-0 (file header) frame at the snapshot's
339    /// pinned LSN, or `None` if no header frame sits in the
340    /// committed view. Used by M11 #92 (`Db::backup_to`) to
341    /// reconstruct the header bytes the snapshot would observe.
342    #[must_use]
343    pub fn frozen_header(&self) -> Option<&Page> {
344        self.frozen_header.as_ref()
345    }
346
347    /// Snapshot id.  Diagnostic-only.
348    #[must_use]
349    pub fn id(&self) -> SnapshotId {
350        self.pin.id
351    }
352
353    /// M6 #51: the catalog B-tree root page-id this snapshot
354    /// pinned. A concurrent `WriteTxn` that calls
355    /// [`crate::pager::Pager::set_root_catalog`] does NOT mutate the
356    /// value returned here — the snapshot is frozen at pin time.
357    /// Use this when constructing a read-side
358    /// [`crate::Catalog`] handle that should observe the catalog at
359    /// the snapshot's LSN.
360    #[must_use]
361    pub fn root_catalog(&self) -> u64 {
362        self.root_catalog
363    }
364
365    /// Read page `id` consistent with the snapshot's pin.
366    ///
367    /// Lookup order (file-backed pagers):
368    /// 1. Frozen view (WAL frames at LSN ≤ `pinned_lsn` at snapshot
369    ///    creation time).  Returns [`PageHandle::Shared`] — an
370    ///    `Arc::clone` (refcount bump, no 4 KiB body copy; #81).
371    /// 2. Main file via the pager (cache-bypassed; goes through
372    ///    `read_through` which verifies the page trailer).  Returns
373    ///    [`PageHandle::Owned`].
374    ///
375    /// On in-memory pagers (`Pager::memory`) there is no WAL and no
376    /// MVCC: the snapshot's `frozen_view` is always empty and the
377    /// in-memory backend buffer may lag the cache (dirty cache
378    /// frames have not yet been written back). For that mode the
379    /// snapshot falls through to the LIVE cache (then main backend);
380    /// the WAL overlay does not exist. No concurrent writer can
381    /// race a reader on a memory pager, so the live read is the
382    /// snapshot read.
383    ///
384    /// # Errors
385    ///
386    /// - [`Error::InvalidArgument`] if `id` is out of range.
387    /// - [`Error::Io`] on syscall failure during the main-file read.
388    /// - [`Error::Corruption`] if the on-disk page trailer fails to
389    ///   verify.
390    pub fn read_page(&self, pager: &Pager<F>, id: PageId) -> Result<PageHandle> {
391        if let Some(page) = self.frozen_view.get(&id) {
392            // #81: frozen-view hit shares the snapshot's `Arc<Page>`
393            // via a refcount bump — NO 4 KiB body clone. Explicit
394            // `Arc::clone` (clippy `clone_on_ref_ptr`). Sound because
395            // committed pages are immutable (see `frozen_view`).
396            return Ok(PageHandle::Shared(Arc::clone(page)));
397        }
398        // Disk / cache miss: route through the existing read path so
399        // every page loaded FROM DISK is still checksum-verified
400        // (`read_through` → `page_trailer_valid` / `decode_page_v1`;
401        // integrity unchanged). The owned `Page` it returns is wrapped
402        // in `PageHandle::Owned` with no extra copy.
403        if pager.is_memory_backed() {
404            return Ok(PageHandle::Owned(pager.read_cache_or_main(id)?));
405        }
406        // #91: a page id that the snapshot does NOT carry in its frozen
407        // view AND that the main file does not physically hold was
408        // allocated AFTER this snapshot's pin — its body rode a WAL
409        // commit later than `pinned_lsn` and has not been checkpointed
410        // (checkpoint defers while this pin is live). MVCC isolation
411        // requires the snapshot NOT observe it: return the page's
412        // pre-existence state — a zeroed body. Before #91 such a page
413        // was physically present on the main file as a zeroed slot
414        // (alloc wrote it directly), so the snapshot read returned zeros
415        // then too; this preserves that exact observable behaviour now
416        // that alloc no longer touches the main file. A page that IS
417        // physically present (id < physical high-water) is a checkpointed
418        // page the snapshot is entitled to see and is read normally.
419        if id.get() >= pager.main_physical_page_count()? {
420            return Ok(PageHandle::Owned(Page::zeroed()));
421        }
422        Ok(PageHandle::Owned(pager.read_main_file_page(id)?))
423    }
424}
425
426/// Default LRU cache size when [`Config::default`] is used. 64 frames
427/// = 256 KiB of cached pages, comfortably within an embedded budget.
428pub const DEFAULT_CACHE_FRAMES: usize = 64;
429
430// #91: `MAIN_EXTEND_BATCH` (the #86 per-alloc file-grow batch) is gone.
431// Fresh allocations no longer extend the main file at alloc time — they
432// ride the WAL and the file is grown ONCE per checkpoint in
433// `apply_checkpoint_view::grow_main_to_cover`, so there is no per-alloc
434// `set_len` left to batch.
435
436/// Phase 3 (issue #8): per-pager compression knob. Selects whether
437/// newly-created files use the transparent LZ4 page-compression
438/// layer (`format_minor = 1`, `feature_flags` bit 0 set) or stay at
439/// the original uncompressed `format_minor = 0` layout.
440///
441/// **No-op against existing files:** when a pager opens an
442/// already-initialised database, the file's own header dictates
443/// whether compression is in use; this knob only affects file
444/// **creation**.
445#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
446#[non_exhaustive]
447pub enum CompressionMode {
448    /// Default — newly-created files use `format_minor = 0` with
449    /// the full 32-bit CRC32C per-page trailer (no compression).
450    #[default]
451    Off,
452    /// Newly-created files use `format_minor = 1` with LZ4 page
453    /// compression. Requires the `compression` Cargo feature on
454    /// `obj-core` / `obj-db`. A build WITHOUT that feature refuses
455    /// to open any `format_minor >= 1` file with
456    /// [`Error::FormatFeatureUnsupported`].
457    Lz4,
458}
459
460/// Issue #31: storage type for the in-memory copy of the caller's
461/// 32-byte master key held inside [`Config`].
462///
463/// Under the `encryption` Cargo feature this is
464/// [`zeroize::Zeroizing<[u8; 32]>`], which wipes the bytes when the
465/// owning `Config` (and therefore this field) is dropped, so the
466/// master key does not linger in freed heap/stack. `Zeroizing<T>`
467/// is a transparent newtype: it derefs to `[u8; 32]`, implements
468/// `Clone` (when the inner type does — `[u8; 32]` is `Clone`), and
469/// — with the `zeroize/serde` feature — `Serialize`/`Deserialize`,
470/// so the public surface of `Config` is byte-for-byte equivalent to
471/// the previous `[u8; 32]` apart from the added drop-glue.
472///
473/// Without the feature it is a bare `[u8; 32]`: the no-`encryption`
474/// build cannot use a key (the open path rejects it with
475/// `Error::FormatFeatureUnsupported`), so there is no secret to
476/// wipe and the type stays `Copy` to preserve the previous
477/// behaviour exactly.
478#[cfg(feature = "encryption")]
479pub type MasterKeyBytes = zeroize::Zeroizing<[u8; 32]>;
480
481/// See [`MasterKeyBytes`] — no-`encryption` build (bare array, no
482/// secret material is ever stored so nothing to wipe).
483#[cfg(not(feature = "encryption"))]
484pub type MasterKeyBytes = [u8; 32];
485
486/// Issue #31: wrap raw 32-byte key material into the
487/// feature-dependent [`MasterKeyBytes`] storage type.
488///
489/// Under the `encryption` feature this hands the bytes to
490/// [`zeroize::Zeroizing`] so they wipe on drop; without the feature
491/// it is the identity on `[u8; 32]`. Centralising the wrap in one
492/// `#[cfg]`-split function keeps the call sites feature-agnostic and
493/// avoids a `clippy::useless_conversion` on the no-feature build
494/// (where a `From<[u8; 32]> for [u8; 32]` would be a no-op).
495#[cfg(feature = "encryption")]
496#[inline]
497#[allow(dead_code)] // Reachable only where a key is actually stored.
498pub(crate) fn wrap_master_key(bytes: [u8; 32]) -> MasterKeyBytes {
499    zeroize::Zeroizing::new(bytes)
500}
501
502/// See [`wrap_master_key`] — no-`encryption` build (identity).
503#[cfg(not(feature = "encryption"))]
504#[inline]
505#[allow(dead_code)] // Reachable only where a key is actually stored.
506pub(crate) fn wrap_master_key(bytes: [u8; 32]) -> MasterKeyBytes {
507    bytes
508}
509
510/// Pager construction options.
511///
512/// `Debug` is implemented manually so the `encryption_key` field —
513/// if present — never leaks into log output (it redacts to
514/// `"<set>"` or `"<not set>"`). The `Serialize`/`Deserialize` impls
515/// are auto-derived; serialising a `Config` with a key present
516/// will round-trip the bytes (callers must decide for themselves
517/// whether persisting that is safe).
518///
519/// Issue #31: `Copy` is derived only on the no-`encryption` build.
520/// Under the `encryption` feature the `encryption_key` field is a
521/// [`zeroize::Zeroizing`] that wipes the key bytes on drop; a type
522/// with drop-glue cannot be `Copy` (and `Copy` would defeat
523/// zeroization by allowing silent bitwise duplication of the key).
524#[cfg_attr(not(feature = "encryption"), derive(Copy))]
525#[derive(Clone, serde::Serialize, serde::Deserialize)]
526pub struct Config {
527    /// Number of cache frames. Must be at least 1.
528    pub cache_frames: usize,
529    /// Durability mode used by the WAL (M3). See [`SyncMode`].
530    pub sync_mode: SyncMode,
531    /// Maximum WAL file size in bytes. Default 64 MiB.
532    pub wal_size_limit: u64,
533    /// Frame count at which the pager auto-checkpoints. Default
534    /// 1 000.
535    pub checkpoint_threshold: u64,
536    /// Phase 3 (issue #8): page-compression mode. See
537    /// [`CompressionMode`]. Default `Off`.
538    pub compression_mode: CompressionMode,
539    /// Phase 4 (issue #9): caller-supplied 32-byte master key for
540    /// XChaCha20-Poly1305 page encryption. `None` (default) =
541    /// unencrypted; `Some(key)` = encrypted (new files stamp
542    /// `format_minor = 2` + `feature_flags` bit 1; existing files
543    /// must already be `format_minor = 2`).
544    ///
545    /// Stored as a raw `[u8; 32]`; the per-file page key is
546    /// derived via HKDF-SHA256(key, `kdf_salt`) at open time. Not
547    /// persisted on disk.
548    ///
549    /// Available regardless of the `encryption` Cargo feature so
550    /// that public APIs (and serde round-trips) remain consistent
551    /// across builds. Setting a key in a build without the
552    /// `encryption` feature causes `Pager::open` to fail with
553    /// `Error::FormatFeatureUnsupported { feature: "encryption" }`
554    /// when the file is encryption-capable.
555    ///
556    /// Issue #31: the inner type is [`MasterKeyBytes`] —
557    /// `Zeroizing<[u8; 32]>` under the `encryption` feature so the
558    /// bytes are wiped when this `Config` is dropped, or a bare
559    /// `[u8; 32]` otherwise. The wrapper derefs to `[u8; 32]`, so
560    /// reads via `.as_ref()` / `.as_deref()` are unchanged.
561    pub encryption_key: Option<MasterKeyBytes>,
562}
563
564impl std::fmt::Debug for Config {
565    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
566        f.debug_struct("Config")
567            .field("cache_frames", &self.cache_frames)
568            .field("sync_mode", &self.sync_mode)
569            .field("wal_size_limit", &self.wal_size_limit)
570            .field("checkpoint_threshold", &self.checkpoint_threshold)
571            .field("compression_mode", &self.compression_mode)
572            .field(
573                "encryption_key",
574                if self.encryption_key.is_some() {
575                    &"<set>"
576                } else {
577                    &"<not set>"
578                },
579            )
580            .finish()
581    }
582}
583
584impl Default for Config {
585    fn default() -> Self {
586        Self {
587            cache_frames: DEFAULT_CACHE_FRAMES,
588            sync_mode: SyncMode::Full,
589            wal_size_limit: crate::wal::DEFAULT_WAL_SIZE_LIMIT,
590            checkpoint_threshold: crate::wal::DEFAULT_CHECKPOINT_THRESHOLD,
591            compression_mode: CompressionMode::Off,
592            encryption_key: None,
593        }
594    }
595}
596
597impl Config {
598    /// Set the cache capacity.
599    ///
600    /// # Errors
601    ///
602    /// Returns [`Error::InvalidArgument`] if `frames` is zero. The
603    /// cache requires at least one frame to make progress.
604    pub fn with_cache_frames(self, frames: usize) -> Result<Self> {
605        if frames == 0 {
606            return Err(Error::InvalidArgument("cache_frames must be >= 1"));
607        }
608        Ok(Self {
609            cache_frames: frames,
610            ..self
611        })
612    }
613
614    /// Set the durability mode the WAL uses for every commit.
615    #[must_use]
616    pub fn with_sync_mode(self, sync_mode: SyncMode) -> Self {
617        Self { sync_mode, ..self }
618    }
619
620    /// Set the WAL size cap in bytes.
621    #[must_use]
622    pub fn with_wal_size_limit(self, limit: u64) -> Self {
623        Self {
624            wal_size_limit: limit,
625            ..self
626        }
627    }
628
629    /// Set the auto-checkpoint frame threshold.
630    #[must_use]
631    pub fn with_checkpoint_threshold(self, frames: u64) -> Self {
632        Self {
633            checkpoint_threshold: frames,
634            ..self
635        }
636    }
637
638    /// Phase 3 (issue #8): set the per-pager compression mode for
639    /// new files. See [`CompressionMode`].
640    #[must_use]
641    pub fn with_compression_mode(self, mode: CompressionMode) -> Self {
642        Self {
643            compression_mode: mode,
644            ..self
645        }
646    }
647
648    /// Phase 4 (issue #9): set the caller's 32-byte master
649    /// encryption key. `None` clears any previously-set key. See
650    /// [`Config::encryption_key`].
651    #[must_use]
652    pub fn with_encryption_key(self, key: Option<[u8; 32]>) -> Self {
653        // Issue #31: wrap into `MasterKeyBytes` so the stored copy
654        // zeroizes on drop under the `encryption` feature. The public
655        // signature stays `Option<[u8; 32]>`; only the internal
656        // storage type changes. `wrap_master_key` is the reflexive
657        // identity on the no-feature (`[u8; 32]`) build and a
658        // `Zeroizing::new` on the encryption build.
659        Self {
660            encryption_key: key.map(wrap_master_key),
661            ..self
662        }
663    }
664
665    fn wal_config(&self) -> WalConfig {
666        WalConfig {
667            sync_mode: self.sync_mode,
668            size_limit: self.wal_size_limit,
669            checkpoint_threshold: self.checkpoint_threshold,
670        }
671    }
672
673    /// Issue #31: borrow the in-memory master key as a plain
674    /// `&[u8; 32]`, hiding the feature-dependent storage type
675    /// ([`MasterKeyBytes`]). On the `encryption` build the stored
676    /// value is a `Zeroizing<[u8; 32]>`, which derefs to the array;
677    /// on the no-feature build it is the array itself. Centralising
678    /// the borrow here means the open/derive call sites stay
679    /// identical across both builds and never name `Zeroizing`.
680    fn master_key(&self) -> Option<&[u8; 32]> {
681        // `Option::as_ref` -> `Option<&MasterKeyBytes>`; the closure
682        // coerces each variant to `&[u8; 32]` (deref on the encryption
683        // build, identity reborrow otherwise).
684        self.encryption_key.as_ref().map(|k| {
685            let bytes: &[u8; 32] = k;
686            bytes
687        })
688    }
689}
690
691/// The pager.
692///
693/// Owns the storage backend (file or in-memory), a [`FileHeader`]
694/// snapshot of page 0, a [`Cache`] of main-file pages, and (for
695/// file-backed databases) a [`Wal`] sidecar plus two in-memory
696/// overlays: a pending-transaction buffer and a committed-but-not-
697/// checkpointed view. All public methods take `&mut self`;
698/// concurrent access is the WAL's problem to grow into in M6.
699///
700/// Generic over `F: FileBackend` (Rule 9: hot-path dispatch is static
701/// monomorphisation, never `dyn`). The default is the production
702/// [`FileHandle`]; the fault-injection harness substitutes
703/// `Pager<FaultyFileHandle>` to drive recovery against torn writes,
704/// dropped fsyncs, and bit flips.
705#[derive(Debug)]
706pub struct Pager<F: FileBackend = FileHandle> {
707    backend: Backend<F>,
708    header: FileHeader,
709    cache: Cache,
710    /// WAL state. `None` for the in-memory pager (`Pager::memory`),
711    /// `Some` for any file-backed database.
712    wal: Option<WalState<F>>,
713    config: Config,
714    /// Live MVCC reader snapshots, keyed by an opaque snapshot id and
715    /// valued by the WAL LSN each one has pinned.  The map is
716    /// `Arc<Mutex<_>>` so the `SnapshotPin` RAII guard returned by
717    /// `reader_snapshot` can deregister itself from any thread.
718    /// Used by `checkpoint` to skip reclamation while a live reader
719    /// pins an LSN below end-of-WAL.  See M6 issue #45.
720    snapshots: Arc<Mutex<HashMap<SnapshotId, Lsn>>>,
721    /// Allocator for snapshot ids.  Monotonic; not reset across
722    /// pager lifetime.
723    next_snapshot_id: Arc<AtomicU64>,
724    /// Phase 4 (issue #9): derived per-file page-encryption key.
725    /// `Some` iff the file is encrypted AND the build has the
726    /// `encryption` Cargo feature AND the caller supplied a valid
727    /// master key. Computed once at open from
728    /// `HKDF-SHA256(user_key, header.kdf_salt,
729    /// b"obj-page-encryption-v1")` so the read/write hot path
730    /// doesn't re-derive per page. Redacted in `Debug`.
731    derived_key: Option<PageEncryptionKey>,
732    /// #86 / #91: high-water mark — the number of pages the **main file**
733    /// is physically extended to cover (page 0 plus `main_high_water - 1`
734    /// data slots). Only meaningful on the `Backend::File` arm; the
735    /// in-memory backend keeps a `Vec` sized exactly to `page_count`
736    /// and never consults this field.
737    ///
738    /// #91 decoupled this from `page_count` entirely: fresh allocations
739    /// ride the WAL and do NOT extend the main file, so between a
740    /// committed growing transaction and its next checkpoint the file is
741    /// physically SHORTER than `page_count` — `main_high_water <
742    /// page_count` is the normal state in that window. The slots in
743    /// `[main_high_water, page_count)` live only in the WAL view; every
744    /// read path resolves them from `pending`/`view` ahead of the main
745    /// file (`read_page` priority). [`Self::apply_checkpoint_view`] is
746    /// the sole place that grows the file (one bounded `set_len` to cover
747    /// the max drained `PageId`) and re-seeds this mark from the grown
748    /// length. The mark is in-memory only — it is NEVER written to disk.
749    ///
750    /// Its invariant: `file_length_for(mark - 1)` (for `mark >= 1`)
751    /// equals the true on-disk length, so a slot at index `< mark` is
752    /// guaranteed to physically exist. Seeded at `open` from the real
753    /// file length via [`Self::main_pages_for_len`].
754    main_high_water: u64,
755}
756
757/// Phase 4 (issue #9): newtype wrapper around the derived 32-byte
758/// page-encryption key. Manual `Debug` impl redacts the bytes so
759/// the key never appears in log output even if the caller dumps a
760/// `Pager`. The bytes are still accessible internally via
761/// [`PageEncryptionKey::as_bytes`].
762///
763/// On a no-`encryption`-feature build the type still exists (the
764/// `derived_key` field on `Pager` carries an `Option<_>` regardless
765/// of the feature) but is never read; the `#[allow(dead_code)]`
766/// reflects that the no-feature build sees only `None`.
767///
768/// Issue #31: the inner field is [`MasterKeyBytes`], so under the
769/// `encryption` feature the derived per-file page key is wiped from
770/// memory when the owning `Pager` (and therefore this value) is
771/// dropped. `Copy` is derived only on the no-`encryption` build,
772/// where the field is a bare `[u8; 32]` and never holds a real key.
773#[cfg_attr(not(feature = "encryption"), derive(Copy))]
774#[derive(Clone)]
775#[allow(dead_code)] // Field/method are read only under `feature = "encryption"`.
776struct PageEncryptionKey(MasterKeyBytes);
777
778#[allow(dead_code)] // Method is read only under `feature = "encryption"`.
779impl PageEncryptionKey {
780    #[inline]
781    fn as_bytes(&self) -> &[u8; 32] {
782        // Deref-coerce through `MasterKeyBytes` (`Zeroizing<[u8; 32]>`
783        // under the `encryption` feature, `[u8; 32]` otherwise) to a
784        // plain `&[u8; 32]` for the crypto hot path.
785        let bytes: &[u8; 32] = &self.0;
786        bytes
787    }
788}
789
790impl std::fmt::Debug for PageEncryptionKey {
791    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
792        f.write_str("PageEncryptionKey(<redacted>)")
793    }
794}
795
796#[derive(Debug)]
797struct WalState<F: FileBackend> {
798    wal: Wal<F>,
799    /// Pages staged for the current uncommitted transaction. Drained
800    /// into `view` on `commit`.
801    pending: HashMap<PageId, Page>,
802    /// Pages committed to the WAL but not yet checkpointed into the
803    /// main file. Populated by recovery (#15) and by every successful
804    /// `commit`; drained by `checkpoint` (#16) and by `flush` (in
805    /// this issue, as a backward-compatible alias for "make data
806    /// durable on the main file").
807    ///
808    /// #80: values are `Arc<Page>` so `reader_snapshot` / `header_
809    /// snapshot` capture the view as cheap refcount bumps instead of
810    /// 4 KiB-per-page memcpys. Each entry is replaced wholesale by a
811    /// fresh `Arc` on the next commit of that id (never mutated in
812    /// place), and the map is only touched via wholesale insert /
813    /// drain / full-replace — there is deliberately NO `get_mut` /
814    /// `entry` / index-assign on `view`, so an `Arc<Page>` shared
815    /// into a pinned snapshot can never be mutated under it.
816    view: HashMap<PageId, Arc<Page>>,
817    /// M6 #51: dirty flag for the file-header `root_catalog` slot.
818    /// Set by [`Pager::set_root_catalog`]; cleared on commit /
819    /// rollback. When set at commit time, the pager appends a
820    /// page-0 frame to the WAL carrying the current in-memory
821    /// encoded header so reader snapshots taken AFTER the commit
822    /// observe the new value AND a crash before checkpoint can
823    /// re-apply the header via replay.
824    header_dirty: bool,
825    /// M6 #51: committed-view of the header at offset 0. Holds the
826    /// encoded page-0 of the most-recent WAL frame that touched
827    /// the header (whether from recovery or from a runtime commit).
828    /// Drained into the main file by [`Pager::checkpoint`]. `None`
829    /// means "the on-disk header at offset 0 is authoritative" —
830    /// either no header frame has ever been committed, or the
831    /// most-recent checkpoint already wrote the staged copy out.
832    view_header: Option<Page>,
833    /// M6 #51: snapshot of the committed `root_catalog`. Mirrors
834    /// `Pager.header.root_catalog` but lags it by one txn — it
835    /// only advances on a successful [`Pager::commit`]. Reader
836    /// snapshots capture THIS value (not the live one) so a writer
837    /// mid-txn cannot leak its in-flight catalog root to readers
838    /// pinned at an earlier LSN.
839    committed_root_catalog: u64,
840    /// M6 #51: transaction depth — incremented by
841    /// [`Pager::begin_txn`] (called from
842    /// [`crate::txn::WriteTxn::begin`]) and decremented by
843    /// [`Pager::end_txn`]. Drives the [`Pager::in_txn`] helper that
844    /// the catalog debug-asserts at its mutation boundaries
845    /// (Rule 5).
846    txn_depth: u32,
847}
848
849/// Storage backend.
850#[derive(Debug)]
851enum Backend<F: FileBackend> {
852    /// File-backed database. `pread`/`pwrite` go through the
853    /// generic `F` (`FileHandle` in production, `FaultyFileHandle`
854    /// in fault-injection tests).
855    File(F),
856    /// Memory-backed database: one `Vec<u8>` of `page_count * PAGE_SIZE`.
857    Memory(Vec<u8>),
858}
859
860impl Pager<FileHandle> {
861    /// Open or create a database file at `path`. A new file is
862    /// initialised with a default [`FileHeader`] and no allocated
863    /// pages beyond page 0.
864    ///
865    /// Cache capacity is taken from `config`. The cache is allocated
866    /// before any read or write; subsequent operations never call
867    /// the global allocator on the cache hot path (Rule 3).
868    ///
869    /// At M3, opening a file-backed database also opens (or creates)
870    /// the WAL sidecar at `<path>-wal` and replays any committed-but-
871    /// not-checkpointed frames before any read can succeed. If no
872    /// WAL exists, or the existing WAL belongs to a previous
873    /// generation (salt mismatch), the database opens as if the WAL
874    /// were empty.
875    ///
876    /// # Errors
877    ///
878    /// - [`Error::InvalidArgument`] if `config.cache_frames == 0`.
879    /// - [`Error::Io`] if the file cannot be opened or initialised.
880    /// - [`Error::InvalidFormat`] if an existing main file does not
881    ///   look like an obj database, or if an existing WAL has a
882    ///   header that disagrees with the main file's format.
883    pub fn open<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
884        let main_path = path.as_ref().to_path_buf();
885        let main = FileHandle::open_or_create(&main_path)?;
886        let wal_path = wal_path_for(&main_path);
887        let wal = FileHandle::open_or_create(&wal_path)?;
888        Self::open_with_backends(main, wal, wal_path, config)
889    }
890
891    /// Construct a fresh in-memory pager. Cache capacity is taken from
892    /// `config`; the backing store starts at one page (the header).
893    /// The in-memory pager has no WAL — all writes go straight to the
894    /// in-memory buffer.
895    ///
896    /// # Errors
897    ///
898    /// Returns [`Error::InvalidArgument`] if `config.cache_frames` is
899    /// zero.
900    pub fn memory(config: Config) -> Result<Self> {
901        if config.cache_frames == 0 {
902            return Err(Error::InvalidArgument("cache_frames must be >= 1"));
903        }
904        refuse_compression_without_feature(config.compression_mode)?;
905        // Phase 4 (issue #9): in-memory pagers can also be
906        // encryption-capable. We don't strictly need at-rest
907        // protection for a `Vec<u8>` backing store, but supporting
908        // the knob means tests and applications can exercise the
909        // encryption code path without a temp file. Refuse the knob
910        // outright if the build lacks the feature.
911        refuse_encryption_without_feature(config.encryption_key.is_some())?;
912        let header = build_new_file_header(config.compression_mode, config.master_key())?;
913        let mut bytes = vec![0u8; PAGE_SIZE];
914        let mut p = Page::zeroed();
915        encode_header(&header, &mut p);
916        bytes[..PAGE_SIZE].copy_from_slice(p.as_bytes());
917        let derived_key = derive_key_for_open(&config, &header)?;
918        Ok(Self {
919            backend: Backend::Memory(bytes),
920            header,
921            cache: Cache::new(config.cache_frames),
922            wal: None,
923            config,
924            snapshots: Arc::new(Mutex::new(HashMap::new())),
925            next_snapshot_id: Arc::new(AtomicU64::new(1)),
926            derived_key,
927            // #86: unused on the in-memory backend (the `Vec` is sized
928            // exactly to `page_count`); kept at 0 for definiteness.
929            main_high_water: 0,
930        })
931    }
932}
933
934impl<F: FileBackend> Pager<F> {
935    /// Open a file-backed pager on top of caller-supplied backends.
936    ///
937    /// `main` is the database file; `wal` is the WAL sidecar at
938    /// `wal_path`. Both must already be open and writable. The WAL is
939    /// walked for recovery (#15 / #21) before any user-visible read
940    /// can succeed.
941    ///
942    /// Production callers SHOULD use [`Pager::open`]; the
943    /// fault-injection harness uses this entry point to drop a
944    /// `FaultyFileHandle` into the pager and a *separate* one into
945    /// the WAL.
946    ///
947    /// # Errors
948    ///
949    /// - [`Error::InvalidArgument`] if `config.cache_frames == 0`.
950    /// - [`Error::Io`] on any syscall failure.
951    /// - [`Error::InvalidFormat`] if the existing main file does not
952    ///   look like an obj database, or if the WAL header disagrees
953    ///   with the main file's format.
954    /// - [`Error::WalCorruption`] if the WAL contains a CRC-invalid
955    ///   frame before its last commit marker (#21).
956    pub fn open_with_backends(
957        main: F,
958        wal: F,
959        wal_path: std::path::PathBuf,
960        config: Config,
961    ) -> Result<Self> {
962        if config.cache_frames == 0 {
963            return Err(Error::InvalidArgument("cache_frames must be >= 1"));
964        }
965        refuse_compression_without_feature(config.compression_mode)?;
966        refuse_encryption_without_feature(config.encryption_key.is_some())?;
967        // Phase 4 (issue #9): for a brand-new file with a key in
968        // Config, we need to generate the kdf_salt up front and
969        // stamp it into the new header. For an existing file we
970        // load the header first, then derive the key against the
971        // on-disk salt.
972        let mut header = if main.is_empty()? {
973            initialise_file(&main, config.compression_mode, config.master_key())?
974        } else {
975            load_header(&main)?
976        };
977        refuse_unsupported_features(&header)?;
978        let derived_key = derive_key_for_open(&config, &header)?;
979        let (wal_state, recovered_view, view_header) = recover_or_create_wal(
980            &main,
981            wal,
982            wal_path,
983            &mut header,
984            &config,
985            derived_key.as_ref(),
986        )?;
987        // #80: recovery yields a plain `HashMap<PageId, Page>`; wrap each
988        // recovered body in its own `Arc<Page>` so the live committed
989        // view matches the `Arc<Page>` shape `reader_snapshot` clones
990        // cheaply. One-time cost at open, off every hot path.
991        let view: HashMap<PageId, Arc<Page>> = recovered_view
992            .into_iter()
993            .map(|(id, page)| (id, Arc::new(page)))
994            .collect();
995        let committed_root_catalog = header.root_catalog;
996        // #86: capture the file's REAL physical length (after recovery,
997        // which may have checkpointed pages out) so we can seed the
998        // high-water mark below via `main_pages_for_len`.
999        let file_len = main.len()?;
1000        let mut pager = Self {
1001            backend: Backend::File(main),
1002            header,
1003            cache: Cache::new(config.cache_frames),
1004            wal: Some(WalState {
1005                wal: wal_state,
1006                pending: HashMap::new(),
1007                view,
1008                header_dirty: false,
1009                view_header,
1010                committed_root_catalog,
1011                txn_depth: 0,
1012            }),
1013            config,
1014            snapshots: Arc::new(Mutex::new(HashMap::new())),
1015            next_snapshot_id: Arc::new(AtomicU64::new(1)),
1016            derived_key,
1017            // #86: seeded immediately below from the real file length;
1018            // the placeholder is never observed because the seeding
1019            // statement runs before the pager is returned.
1020            main_high_water: 0,
1021        };
1022        // #86: seed the high-water mark from the file's PHYSICAL size.
1023        // A partial trailing stride is floored, so a fresh `alloc_fresh`
1024        // rewrites it cleanly.
1025        //
1026        // #91: the file is now the CHECKPOINTED high-water, NOT a cover
1027        // of `page_count`. Recovery may set `page_count = N+1` from a WAL
1028        // header frame while the file is still N pages — the fresh page
1029        // that advanced `page_count` rode the WAL and is resident in the
1030        // recovered `view`, NOT on the main file. So the old
1031        // `main_high_water >= page_count` invariant is deliberately
1032        // RELAXED: a page id in `[main_high_water, page_count)` is the
1033        // normal between-commit-and-checkpoint state and MUST be
1034        // resolvable from the recovered WAL view (it cannot have been
1035        // lost — its WAL frame is exactly what advanced `page_count`).
1036        // `apply_checkpoint_view` grows the file and re-seeds this mark.
1037        pager.main_high_water = pager.main_pages_for_len(file_len);
1038        pager.debug_assert_recovered_pages_covered();
1039        Ok(pager)
1040    }
1041
1042    /// #91 (Rule 5): assert the post-recovery reachability invariant —
1043    /// every page id the recovered header claims (`1..page_count`) is
1044    /// resolvable, either because the main file physically covers it
1045    /// (`id < main_high_water`) or because the recovered WAL `view`
1046    /// carries its body. A page in `[main_high_water, page_count)` that
1047    /// is absent from the view would be genuinely lost (the header names
1048    /// a page neither the file nor the WAL can produce); the recovery
1049    /// contract guarantees this never happens, since the same WAL frame
1050    /// that advanced `page_count` also staged the page body. Bounded by
1051    /// `page_count` (Rule 2). Elided in release builds.
1052    fn debug_assert_recovered_pages_covered(&self) {
1053        #[cfg(debug_assertions)]
1054        {
1055            let Some(state) = self.wal.as_ref() else {
1056                return;
1057            };
1058            let mut id_raw = self.main_high_water.max(1);
1059            while id_raw < self.header.page_count {
1060                if let Some(pid) = PageId::new(id_raw) {
1061                    debug_assert!(
1062                        state.view.contains_key(&pid) || state.pending.contains_key(&pid),
1063                        "#91: recovered page {id_raw} beyond the physical \
1064                         high-water must be resident in the WAL view",
1065                    );
1066                }
1067                id_raw += 1;
1068            }
1069        }
1070    }
1071
1072    /// Total number of pages in the database, including page 0.
1073    #[must_use]
1074    pub fn page_count(&self) -> u64 {
1075        self.header.page_count
1076    }
1077
1078    /// #91: number of pages the main file is PHYSICALLY long enough to
1079    /// hold (page 0 plus `result - 1` data slots) — the real on-disk
1080    /// high-water. On the file backend this is computed from the live
1081    /// file length, NOT from `page_count`: a committed growing
1082    /// transaction advances `page_count` while its fresh pages still
1083    /// live only in the WAL view, so the file is SHORTER than
1084    /// `page_count` until the next checkpoint. The backup path
1085    /// ([`crate::backup`]) gates its main-file copy by THIS value and
1086    /// relies on `overlay_frozen_view` to fill the WAL-resident fresh
1087    /// pages — reading them off the (too-short) main file would
1088    /// `UnexpectedEof`. On the in-memory backend the `Vec` is sized
1089    /// exactly to `page_count`, so this returns `page_count`.
1090    ///
1091    /// # Errors
1092    ///
1093    /// Returns [`Error::Io`] if the file length cannot be queried.
1094    pub fn main_physical_page_count(&self) -> Result<u64> {
1095        match &self.backend {
1096            Backend::File(handle) => {
1097                let len = handle.len()?;
1098                Ok(self.main_pages_for_len(len))
1099            }
1100            Backend::Memory(_) => Ok(self.header.page_count),
1101        }
1102    }
1103
1104    /// On-disk page size in bytes (4096 at format major 0). Surfaced
1105    /// for the M12 `obj stat` CLI surface; callers who want the
1106    /// compile-time constant should reach for
1107    /// [`crate::pager::page::PAGE_SIZE`] directly.
1108    #[must_use]
1109    pub fn page_size(&self) -> u16 {
1110        self.header.page_size
1111    }
1112
1113    /// `(format_major, format_minor)` from the on-disk header.
1114    /// Surfaced for the M12 `obj stat` CLI surface so a forensic
1115    /// tool can confirm the file's format vintage without re-reading
1116    /// page 0.
1117    #[must_use]
1118    pub fn format_version(&self) -> (u16, u16) {
1119        (self.header.format_major, self.header.format_minor)
1120    }
1121
1122    /// The current freelist head (`0` = empty). Useful for tests.
1123    #[must_use]
1124    pub fn freelist_head(&self) -> u64 {
1125        self.header.freelist_head
1126    }
1127
1128    /// The catalog B-tree root page-id, or `0` if no catalog has yet
1129    /// been installed. The catalog (M5) uses this field to bootstrap
1130    /// on first open; older `format_minor = 0` databases (M2..M4)
1131    /// always carry zero here.
1132    #[must_use]
1133    pub fn root_catalog(&self) -> u64 {
1134        self.header.root_catalog
1135    }
1136
1137    /// Update the catalog B-tree root page-id and persist the change
1138    /// in the file header.
1139    ///
1140    /// The catalog (M5 issue #38) calls this exactly once per
1141    /// `open_or_init` when it allocates a fresh empty catalog root,
1142    /// and on every catalog mutation that produces a new root via
1143    /// the B+tree's copy-on-write contract.
1144    ///
1145    /// As of M6 issue #51 the header update is **WAL-staged** on
1146    /// file-backed pagers: the call records the new value in the
1147    /// in-memory header (so subsequent reads from this writer see
1148    /// it) AND stages the encoded page-0 into the current WAL
1149    /// transaction. The on-disk header at offset 0 is NOT touched
1150    /// until checkpoint; reader snapshots therefore see the
1151    /// pre-commit value of `root_catalog` (whichever value the
1152    /// committed WAL view held at snapshot time). For in-memory
1153    /// pagers the call still writes the header into the in-memory
1154    /// backend buffer immediately (no WAL exists).
1155    ///
1156    /// # Errors
1157    ///
1158    /// Returns [`Error::Io`] on syscall failure writing the header
1159    /// (in-memory backend only — the file-backed path no longer
1160    /// performs an immediate write).
1161    pub fn set_root_catalog(&mut self, root: u64) -> Result<()> {
1162        self.header.root_catalog = root;
1163        self.stage_or_write_header()
1164    }
1165
1166    /// Allocate a new page. If the freelist is non-empty, recycles its
1167    /// head; otherwise appends a brand-new page to the file.
1168    ///
1169    /// As of issue #22, `alloc_page` is **transactional**: the
1170    /// freelist-page mutation it performs is staged in the current
1171    /// WAL transaction (the freelist link page is written through
1172    /// the WAL just like a regular user page write). As of issue
1173    /// #64, the file-header update (`freelist_head` / `page_count`)
1174    /// also rides the WAL (via the same private
1175    /// `stage_or_write_header` pathway that M6.5 #51 installed for
1176    /// [`Pager::set_root_catalog`]) — a crash between the WAL frame
1177    /// durability and the header write can no longer leave the
1178    /// on-disk header pointing at a not-yet-durable freelist link
1179    /// page. Callers SHOULD call [`Pager::commit`] before relying
1180    /// on the allocation being durable; pending allocations are
1181    /// lost on `Pager::open` after a crash, exactly like
1182    /// uncommitted user writes.
1183    ///
1184    /// # Errors
1185    ///
1186    /// - [`Error::Io`] on syscall failure when extending the file.
1187    /// - [`Error::Corruption`] if the freelist head fails to decode
1188    ///   (indicates a previously-written freelist page has been
1189    ///   damaged).
1190    /// - [`Error::InvalidArgument`] in the unrealistic case that
1191    ///   `page_count` would overflow `u64` or the resulting file size
1192    ///   would overflow.
1193    pub fn alloc_page(&mut self) -> Result<PageId> {
1194        // Rule 5: catch a future regression at the public boundary.
1195        // Both downstream paths (`alloc_from_freelist`, `alloc_fresh`)
1196        // mutate the file header and route the mutation through
1197        // `stage_or_write_header`; that helper requires an open txn so
1198        // the staged page-0 frame lands inside a commit group.
1199        debug_assert!(
1200            self.in_txn(),
1201            "alloc_page must be inside a Pager txn (begin_txn/end_txn)"
1202        );
1203        if let Some(head) = PageId::new(self.header.freelist_head) {
1204            self.alloc_from_freelist(head)
1205        } else {
1206            self.alloc_fresh()
1207        }
1208    }
1209
1210    /// Read page `id`. Returns a borrow-shaped [`PageRef`] that
1211    /// references bytes resident in one of (a) the in-flight
1212    /// transaction buffer, (b) the committed-but-not-checkpointed
1213    /// WAL view, (c) the LRU cache, or (d) — on a cache miss — a
1214    /// freshly-inserted cache frame populated by a single `pread`.
1215    ///
1216    /// Read priority: in-flight transaction buffer → committed
1217    /// (WAL) view → cache → main file. The first three are
1218    /// in-memory hash-map / cache lookups; the last is a `pread`.
1219    ///
1220    /// # Allocation contract (Rule 3)
1221    ///
1222    /// `read_page` performs **no heap allocation on cache hits or
1223    /// WAL-overlay hits**. On a cache miss, a single `pread` is
1224    /// issued and the page is inserted into the cache; the returned
1225    /// `PageRef` then borrows that cache frame. The previous M2
1226    /// signature returned `Result<Page>` and cost one `Page` clone
1227    /// per call regardless of hit/miss; the borrow API removes that
1228    /// per-call clone.
1229    ///
1230    /// # Lifetime contract
1231    ///
1232    /// The returned `PageRef<'_>` borrows `self`. The borrow checker
1233    /// forbids any mutating call on the same pager (`write_page`,
1234    /// `commit`, `checkpoint`, `alloc_page`, `free_page`, `close`,
1235    /// `flush`) while a `PageRef` is alive. Callers that need an
1236    /// owned page across mutating calls can use
1237    /// [`PageRef::to_owned_page`].
1238    ///
1239    /// # Errors
1240    ///
1241    /// - [`Error::InvalidArgument`] if `id` is out of range.
1242    /// - [`Error::Io`] if a cache-miss read from disk fails.
1243    /// - [`Error::Corruption`] if the page trailer fails to verify
1244    ///   on a cache-miss path.
1245    pub fn read_page(&mut self, id: PageId) -> Result<PageRef<'_>> {
1246        debug_assert!(id.get() > 0, "PageId is non-zero by construction");
1247        debug_assert!(
1248            id.get() < self.header.page_count,
1249            "read_page called with out-of-range id",
1250        );
1251        if id.get() >= self.header.page_count {
1252            return Err(Error::InvalidArgument("page id out of range"));
1253        }
1254        // Cache miss / WAL miss path takes &mut self for the cache
1255        // mutation; resolve it first so we can then drop the &mut
1256        // borrow and reach for a shared one.
1257        if self.wal_lookup_some(id) {
1258            return self.lookup_in_wal(id);
1259        }
1260        if self.cache.get(id).is_some() {
1261            return self.lookup_in_cache(id);
1262        }
1263        let buf = self.read_through(id)?;
1264        let evicted = self.cache.insert(id, buf, false);
1265        self.handle_eviction(evicted)?;
1266        self.lookup_in_cache(id)
1267    }
1268
1269    /// `true` iff the WAL overlay carries an entry for `id`.
1270    fn wal_lookup_some(&self, id: PageId) -> bool {
1271        let Some(state) = self.wal.as_ref() else {
1272            return false;
1273        };
1274        state.pending.contains_key(&id) || state.view.contains_key(&id)
1275    }
1276
1277    /// Return a `PageRef` borrowing from the WAL overlay. Assumes
1278    /// the caller verified the entry exists via [`Self::wal_lookup_some`].
1279    fn lookup_in_wal(&self, id: PageId) -> Result<PageRef<'_>> {
1280        let state = self
1281            .wal
1282            .as_ref()
1283            .ok_or(Error::InvalidArgument("internal: wal overlay missing"))?;
1284        // #80: `view` values are `Arc<Page>`; deref to `&Page` so both
1285        // arms unify on the borrow `PageRef` needs. The Arc is not
1286        // cloned — only borrowed for the lifetime of the returned ref.
1287        let page = state
1288            .pending
1289            .get(&id)
1290            .or_else(|| state.view.get(&id).map(Arc::as_ref))
1291            .ok_or(Error::InvalidArgument("internal: wal lookup race"))?;
1292        Ok(PageRef::new(id, page))
1293    }
1294
1295    /// Return a `PageRef` borrowing from the cache. The caller must
1296    /// have just touched the cache (so the LRU is already updated).
1297    fn lookup_in_cache(&mut self, id: PageId) -> Result<PageRef<'_>> {
1298        let page = self
1299            .cache
1300            .get(id)
1301            .ok_or(Error::InvalidArgument("internal: cache miss after insert"))?;
1302        Ok(PageRef::new(id, page))
1303    }
1304
1305    /// Write `page` back to `id`. For file-backed databases, the write
1306    /// is staged in the WAL transaction buffer; for in-memory
1307    /// databases, the write goes straight to the cache as in M2.
1308    ///
1309    /// To make the write durable, call [`Pager::commit`].
1310    ///
1311    /// # Errors
1312    ///
1313    /// - [`Error::InvalidArgument`] if `id` is out of range.
1314    /// - [`Error::Io`] if a dirty eviction triggered by this insert
1315    ///   fails to write its predecessor to disk (memory pager only).
1316    pub fn write_page(&mut self, id: PageId, page: &Page) -> Result<()> {
1317        debug_assert!(id.get() < self.header.page_count);
1318        if id.get() >= self.header.page_count {
1319            return Err(Error::InvalidArgument("page id out of range"));
1320        }
1321        if let Some(state) = self.wal.as_mut() {
1322            // Stage in the txn buffer. The cache is unchanged; reads
1323            // for this id will hit `state.pending` until commit.
1324            state.pending.insert(id, page.clone());
1325            return Ok(());
1326        }
1327        // In-memory pager — preserve M2 cache-then-evict semantics.
1328        if let Some(slot) = self.cache.get_mut(id) {
1329            *slot = page.clone();
1330            return Ok(());
1331        }
1332        let evicted = self.cache.insert(id, page.clone(), true);
1333        self.handle_eviction(evicted)
1334    }
1335
1336    /// Free a previously-allocated page, returning it to the freelist.
1337    /// `id` must refer to a currently-allocated page; freeing the same
1338    /// page twice is a caller bug.
1339    ///
1340    /// As of issue #22, the freelist link page is staged in the
1341    /// current WAL transaction (file-backed pagers) rather than
1342    /// written directly to the main file. As of issue #64, the
1343    /// header `freelist_head` update also rides the WAL (via the
1344    /// same `stage_or_write_header` pathway that M6.5 #51 installed
1345    /// for [`Pager::set_root_catalog`]); a crash mid-txn no longer
1346    /// leaves the on-disk header pointing at a freelist link that is
1347    /// only durable in the WAL view. Call [`Pager::commit`] before
1348    /// relying on the free being durable.
1349    ///
1350    /// # Errors
1351    ///
1352    /// - [`Error::InvalidArgument`] if `id` is out of range.
1353    /// - [`Error::Io`] if the freelist record or header write fails.
1354    pub fn free_page(&mut self, id: PageId) -> Result<()> {
1355        debug_assert!(id.get() > 0);
1356        debug_assert!(id.get() < self.header.page_count);
1357        // Rule 5: catch a future regression at the call boundary.
1358        // `free_page` mutates the file header (`freelist_head`); on
1359        // file-backed pagers the mutation rides the WAL via
1360        // `stage_or_write_header`, which requires an open txn so the
1361        // staged page-0 frame lands inside a commit group.
1362        debug_assert!(
1363            self.in_txn(),
1364            "free_page must be inside a Pager txn (begin_txn/end_txn)"
1365        );
1366        if id.get() >= self.header.page_count {
1367            return Err(Error::InvalidArgument("page id out of range"));
1368        }
1369        let _ = self.cache.evict(id);
1370        let next = self.header.freelist_head;
1371        let mut buf = Page::zeroed();
1372        encode_freelist_page(FreeListPage::new(next), &mut buf);
1373        // Stamp the per-page trailer so the freelist page reads back
1374        // valid from the cache / WAL view / main file uniformly.
1375        write_page_trailer(&mut buf);
1376        if let Some(state) = self.wal.as_mut() {
1377            // Replace any in-flight write to this id with the freelist
1378            // encoding. Do NOT touch `state.view` here — the live
1379            // committed view must stay intact for any in-flight
1380            // [`ReaderSnapshot`] that cloned it before this free. The
1381            // pending → view replacement happens atomically inside
1382            // [`Pager::commit_inner`]. See M6 #53: the eager
1383            // `state.view.remove(&id)` here used to poison snapshots
1384            // taken between a writer's free and the writer's commit.
1385            state.pending.insert(id, buf);
1386        } else {
1387            // In-memory pager: keep the M2 direct-write semantics.
1388            self.write_back_page(id, &buf)?;
1389        }
1390        self.header.freelist_head = id.get();
1391        // #64: route the header field update through the WAL on
1392        // file-backed pagers (same path M6.5 #51 installed for
1393        // `set_root_catalog`). A direct `write_header()` here could
1394        // leave the on-disk header pointing at a `freelist_head`
1395        // whose link page is only durable in the WAL view, surfacing
1396        // as `Error::Corruption { page_id: 1 }` on reopen without
1397        // an explicit commit.
1398        self.stage_or_write_header()?;
1399        Ok(())
1400    }
1401
1402    /// Commit the in-flight transaction. Writes every staged frame to
1403    /// the WAL with a single `sync_data` at the end (group commit).
1404    /// Returns the LSN of the last committed frame, or `0` if the
1405    /// transaction was empty.
1406    ///
1407    /// If the WAL's committed-frame count exceeds
1408    /// `Config::checkpoint_threshold` after the commit, the pager
1409    /// inlines a [`Pager::checkpoint`] call. Auto-checkpoint amortises
1410    /// recovery time across writers without surfacing as a separate
1411    /// API call to the caller.
1412    ///
1413    /// For in-memory pagers (no WAL) this is a no-op returning `0`.
1414    ///
1415    /// # Errors
1416    ///
1417    /// Returns [`Error::Io`] on syscall failure.
1418    pub fn commit(&mut self) -> Result<Lsn> {
1419        let lsn = self.commit_inner()?;
1420        if let Some(state) = self.wal.as_ref() {
1421            if state.wal.committed_frames() >= self.config.checkpoint_threshold {
1422                self.checkpoint()?;
1423            }
1424        }
1425        Ok(lsn)
1426    }
1427
1428    fn commit_inner(&mut self) -> Result<Lsn> {
1429        // Pre-encode the in-memory header BEFORE we borrow
1430        // `self.wal` mutably; encoding does not need the WAL state
1431        // and avoids a borrow-checker dance later in the function.
1432        let mut header_page: Page = Page::zeroed();
1433        encode_header(&self.header, &mut header_page);
1434
1435        // #91: fresh-page allocations now ride the WAL (their zeroed
1436        // bodies are staged in `state.pending`), so the per-commit
1437        // main-file extension barrier — and the `main_extend_dirty` flag
1438        // it fired on — is GONE. A growing commit issues exactly ONE
1439        // `F_FULLFSYNC`: the WAL group-commit below. There is no longer
1440        // any un-WAL'd main-file write to order ahead of it.
1441        let Some(state) = self.wal.as_mut() else {
1442            return Ok(Lsn::ZERO);
1443        };
1444        let header_dirty = state.header_dirty;
1445        if state.pending.is_empty() && !header_dirty {
1446            // Empty commit: nothing staged, header clean — no WAL frame
1447            // to write, nothing to make durable. (A fresh alloc always
1448            // dirties the header AND stages a page, so this branch is
1449            // only reached by a truly no-op commit.)
1450            return Ok(Lsn::ZERO);
1451        }
1452        let mut txn = state.wal.begin_txn();
1453        // Iterate `pending` in `PageId` order so commits are
1454        // deterministic across machines (HashMap iteration order is
1455        // not). Deterministic ordering matters for the crash-cycle
1456        // test in #18 where the same seed must produce the same byte
1457        // sequence on every runner.
1458        let mut ids: Vec<PageId> = state.pending.keys().copied().collect();
1459        ids.sort_unstable();
1460        for id in &ids {
1461            if let Some(page) = state.pending.get(id) {
1462                txn.append(*id, page)?;
1463            }
1464        }
1465        // M6 #51: if the catalog touched the file header, append a
1466        // page-0 frame carrying the freshly-encoded in-memory header.
1467        // The frame goes LAST in the WAL transaction so its
1468        // commit-marker bit is the one that flushes the group. On
1469        // replay the page-0 frame populates the in-memory header
1470        // for the next open.
1471        if header_dirty {
1472            txn.append_header(&header_page)?;
1473        }
1474        let lsn = txn.commit()?;
1475        // Move pages from pending → view, and invalidate any cache
1476        // entries (the cache holds main-file content, which is now
1477        // stale for these pages until checkpoint).
1478        for id in ids {
1479            if let Some(page) = state.pending.remove(&id) {
1480                // #80 / M6 #53: insert a FRESH `Arc<Page>` for this id.
1481                // `HashMap::insert` REPLACES any prior `Arc` under the
1482                // same id; it never mutates a shared `Arc` in place. Any
1483                // in-flight `ReaderSnapshot` that cloned `view` before
1484                // this commit still holds its OWN map pointing at the
1485                // OLD `Arc`, so its read is isolated from this post-pin
1486                // write — that is what preserves MVCC. Do NOT regress
1487                // this to a single shared mutable map (e.g. `Arc<HashMap>`
1488                // mutated in place): that would let a pinned snapshot
1489                // observe this write and break snapshot isolation.
1490                let fresh = Arc::new(page);
1491                // Rule 5: the value we insert must be uniquely owned at
1492                // insert time (no snapshot can be aliasing the brand-new
1493                // Arc), proving we are publishing a new version rather
1494                // than mutating one a reader already pinned.
1495                debug_assert_eq!(
1496                    Arc::strong_count(&fresh),
1497                    1,
1498                    "#53: a freshly-committed page version must be \
1499                     uniquely owned when published into the view",
1500                );
1501                state.view.insert(id, fresh);
1502            }
1503            let _ = self.cache.evict(id);
1504        }
1505        if header_dirty {
1506            state.view_header = Some(header_page);
1507            state.header_dirty = false;
1508            state.committed_root_catalog = self.header.root_catalog;
1509        }
1510        Ok(lsn)
1511    }
1512
1513    /// Perform a final checkpoint and remove the WAL sidecar.
1514    ///
1515    /// `close` does NOT auto-commit a pending transaction —
1516    /// `write_page` calls without a matching `commit()` are dropped
1517    /// silently, matching the "uncommitted writes are not durable"
1518    /// half of the design.md ACID contract. If you want the pending
1519    /// txn to land on disk, call `commit()` before `close()`.
1520    ///
1521    /// After `close()` returns, a fresh `Pager::open` on the same
1522    /// path observes a database with no WAL — the design.md
1523    /// "no sidecar files left behind after a clean shutdown"
1524    /// invariant.
1525    ///
1526    /// For in-memory pagers `close` is a no-op (no WAL to remove).
1527    ///
1528    /// # Errors
1529    ///
1530    /// Returns [`Error::Io`] on syscall failure.
1531    pub fn close(mut self) -> Result<()> {
1532        // Explicitly discard pending: uncommitted writes are lost.
1533        // #91: this drops any fresh-page bodies staged but never
1534        // committed — they had no WAL frame and never extended the main
1535        // file, so there is nothing on disk to undo.
1536        if let Some(state) = self.wal.as_mut() {
1537            state.pending.clear();
1538        }
1539        self.checkpoint()?;
1540        let path = self
1541            .wal
1542            .as_ref()
1543            .map(|state| state.wal.path().to_path_buf());
1544        drop(self);
1545        if let Some(p) = path {
1546            crate::wal::remove_wal(&p)?;
1547        }
1548        Ok(())
1549    }
1550
1551    /// Backward-compatible flush. At M3 this is `commit() +
1552    /// checkpoint() + fsync(main)` for file-backed databases. For
1553    /// the in-memory backend it preserves the M2 "drain dirty
1554    /// cache + fsync" semantics. Kept as a stable alias so M2 tests
1555    /// continue to work — new code SHOULD call [`Pager::commit`] +
1556    /// [`Pager::checkpoint`] / [`Pager::close`] directly.
1557    ///
1558    /// # Errors
1559    ///
1560    /// Returns [`Error::Io`] on syscall failure.
1561    pub fn flush(&mut self) -> Result<()> {
1562        let _ = self.commit()?;
1563        self.checkpoint()?;
1564        // Drain any dirty cache frames (memory-pager path, which is
1565        // the M2 semantics for `flush`).
1566        let cap = self.cache.capacity();
1567        let pending: Vec<(PageId, Page)> = self.cache.drain_dirty().take(cap).collect();
1568        for (id, page) in pending {
1569            self.write_back_page(id, &page)?;
1570        }
1571        self.write_header()?;
1572        match &self.backend {
1573            Backend::File(handle) => handle.sync_data(self.config.sync_mode)?,
1574            Backend::Memory(_) => {}
1575        }
1576        Ok(())
1577    }
1578
1579    /// Roll every committed WAL frame forward into the main file.
1580    ///
1581    /// Protocol (see `docs/format.md` § Salt rotation):
1582    /// 1. For every page-id in the WAL view, write the page (with
1583    ///    its CRC32C trailer) into the main file.
1584    /// 2. `sync_data(SyncMode::Full)` on the main file. Only after
1585    ///    this returns Ok are the main-file writes durable.
1586    /// 3. Rotate the WAL salt via `Wal::reset_after_checkpoint` and
1587    ///    truncate the WAL to header-only with the new salt.
1588    /// 4. Stamp the new salt into the main file's `wal_salt` header
1589    ///    field and `sync_data` the main file again.
1590    ///
1591    /// Idempotent: a second invocation on an empty view is a no-op.
1592    ///
1593    /// Crash-recovery model: a crash before step 3 leaves the old
1594    /// WAL with the old salt; the next `Pager::open` recovers it
1595    /// (idempotent — re-applying writes the same bytes step 1
1596    /// already wrote). A crash after step 3 but before step 4
1597    /// leaves the WAL with the new salt and the main file with the
1598    /// old; the next open reads the OLD salt from the main header,
1599    /// fails to match, treats the WAL as empty, and proceeds —
1600    /// recovery loses no data because step 2 made the main file
1601    /// authoritative before the salt rotated.
1602    ///
1603    /// In-memory pagers have no WAL; `checkpoint` is a no-op for
1604    /// them.
1605    ///
1606    /// # Errors
1607    ///
1608    /// Returns [`Error::Io`] on syscall failure.
1609    #[cfg_attr(
1610        feature = "tracing",
1611        tracing::instrument(name = "pager.checkpoint", level = "debug", skip_all)
1612    )]
1613    pub fn checkpoint(&mut self) -> Result<()> {
1614        // M6: respect live MVCC reader snapshots.  If any reader has
1615        // pinned an LSN below the current end-of-WAL, defer the
1616        // checkpoint — the frames between the lowest pin and the
1617        // end of the WAL are exactly the frames that reader still
1618        // needs to see, and reclaiming them would race the reader's
1619        // `ReaderSnapshot::read_page` calls.
1620        if self.checkpoint_deferred_for_pinned_reader() {
1621            #[cfg(feature = "tracing")]
1622            tracing::debug!(reason = "reader_pin", "deferred");
1623            return Ok(());
1624        }
1625        // #80: the drained view is `Vec<(PageId, Arc<Page>)>` — draining
1626        // the map hands us the sole remaining owner of each `Arc` (no
1627        // live snapshot can reach a checkpointed frame: the pin check
1628        // above already deferred if any reader still needs these).
1629        let (view_pages, drained_header): (Vec<(PageId, Arc<Page>)>, Option<Page>) =
1630            if let Some(state) = self.wal.as_mut() {
1631                let pages: Vec<(PageId, Arc<Page>)> = state.view.drain().collect();
1632                let hdr = state.view_header.take();
1633                (pages, hdr)
1634            } else {
1635                return Ok(());
1636            };
1637        let nothing_to_do = view_pages.is_empty() && drained_header.is_none();
1638        self.apply_checkpoint_view(view_pages, drained_header)?;
1639        if nothing_to_do {
1640            return Ok(());
1641        }
1642        // Rotate the salt, persist it into the main header, and
1643        // truncate the WAL. Crash between the salt rotation and the
1644        // header write leaves the old WAL with the old salt still on
1645        // disk; recovery will read the OLD salt from the header and
1646        // re-apply those frames (idempotent because they write the
1647        // same bytes that the main file already received above).
1648        self.rotate_wal_salt_and_persist()
1649    }
1650
1651    /// Returns `true` when a live MVCC reader has pinned an LSN below
1652    /// the current end-of-WAL — in which case [`Self::checkpoint`]
1653    /// must defer rather than reclaim frames the reader still needs.
1654    fn checkpoint_deferred_for_pinned_reader(&self) -> bool {
1655        let Some(min_lsn) = self.min_pinned_lsn() else {
1656            return false;
1657        };
1658        let end_lsn = self
1659            .wal
1660            .as_ref()
1661            .map_or(Lsn::ZERO, |s| s.wal.next_lsn().prev_saturating());
1662        min_lsn < end_lsn
1663    }
1664
1665    /// Phase 2 of [`Self::checkpoint`]: write each WAL-view page back
1666    /// to the main file (evicting the matching cache slot so a
1667    /// subsequent read re-fetches the durable copy), apply any staged
1668    /// page-0 header, and `sync_data` the main backend so the writes
1669    /// are durable before the salt rotation.
1670    fn apply_checkpoint_view(
1671        &mut self,
1672        view_pages: Vec<(PageId, Arc<Page>)>,
1673        drained_header: Option<Page>,
1674    ) -> Result<()> {
1675        // #91 (guardrail 3): fresh allocations rode the WAL and never
1676        // extended the main file, so the drained view may name page ids
1677        // BEYOND the current physical high-water. Grow the main file ONCE
1678        // (a single bounded `set_len`, Rule 2) to cover the maximum
1679        // drained id BEFORE the write-back loop, then re-seed
1680        // `main_high_water` from the grown length. This is the sole place
1681        // the file grows on the file backend.
1682        self.grow_main_to_cover(&view_pages)?;
1683        for (id, page) in view_pages {
1684            // #80: `write_back_page` wants `&Page`; deref the `Arc`.
1685            self.write_back_page(id, page.as_ref())?;
1686            let _ = self.cache.evict(id);
1687        }
1688        // M6 #51: if the WAL view carried a staged page-0 header,
1689        // write it to the main file at offset 0. The header carries
1690        // its own CRC32C (see `pager/header.rs`) — no page-trailer
1691        // stamping is performed; that distinguishes page 0 from
1692        // every other page.
1693        if let Some(hp) = drained_header {
1694            match &mut self.backend {
1695                Backend::File(handle) => handle.write_all_at(hp.as_bytes(), 0)?,
1696                Backend::Memory(bytes) => {
1697                    if bytes.len() < PAGE_SIZE {
1698                        bytes.resize(PAGE_SIZE, 0);
1699                    }
1700                    bytes[..PAGE_SIZE].copy_from_slice(hp.as_bytes());
1701                }
1702            }
1703        }
1704        match &self.backend {
1705            Backend::File(handle) => handle.sync_data(self.config.sync_mode)?,
1706            Backend::Memory(_) => {}
1707        }
1708        Ok(())
1709    }
1710
1711    /// #91 (guardrail 3): grow the main file so it physically covers the
1712    /// maximum `PageId` in `view_pages`, in a SINGLE bounded `set_len`
1713    /// (Rule 2: the target length is `file_length_for(max_id)`, a closed
1714    /// form — no per-page syscalls). Re-seed [`Self::main_high_water`]
1715    /// from the grown length. A no-op when the file already covers every
1716    /// drained id (e.g. an all-overwrite checkpoint with no fresh pages)
1717    /// or on the in-memory backend (whose `Vec` was grown per-alloc and
1718    /// will be resized by `write_back_page` if needed).
1719    fn grow_main_to_cover(&mut self, view_pages: &[(PageId, Arc<Page>)]) -> Result<()> {
1720        if !matches!(self.backend, Backend::File(_)) {
1721            return Ok(());
1722        }
1723        let Some(max_id) = view_pages.iter().map(|(id, _)| id.get()).max() else {
1724            return Ok(());
1725        };
1726        // Already covered: the slot index `max_id` is `< main_high_water`.
1727        if max_id < self.main_high_water {
1728            return Ok(());
1729        }
1730        let new_len = self.file_length_for(max_id)?;
1731        if let Backend::File(handle) = &mut self.backend {
1732            handle.set_len(new_len)?;
1733        }
1734        // Re-seed the mark from the grown length: the file now covers
1735        // page 0 plus slots `1..=max_id`, i.e. `max_id + 1` pages.
1736        self.main_high_water = self.main_pages_for_len(new_len);
1737        debug_assert!(
1738            self.main_high_water > max_id,
1739            "#91: grown file must physically cover the max drained id",
1740        );
1741        Ok(())
1742    }
1743
1744    /// Phase 3 of [`Self::checkpoint`]: rotate the WAL salt (which
1745    /// also truncates the WAL to header-only with the new salt),
1746    /// stamp the new salt into the main file header, and `sync_data`
1747    /// the main backend.
1748    fn rotate_wal_salt_and_persist(&mut self) -> Result<()> {
1749        if let Some(state) = self.wal.as_mut() {
1750            state.wal.reset_after_checkpoint()?;
1751            stamp_salt_into_header(&mut self.header, state.wal.salt());
1752        }
1753        self.write_header()?;
1754        match &self.backend {
1755            Backend::File(handle) => handle.sync_data(self.config.sync_mode)?,
1756            Backend::Memory(_) => {}
1757        }
1758        Ok(())
1759    }
1760
1761    /// Open a new MVCC reader snapshot at the current WAL end-LSN.
1762    ///
1763    /// The snapshot captures (1) the LSN of the most-recent committed
1764    /// frame in the WAL at the moment of the call, and (2) a clone of
1765    /// the pager's in-memory committed view (`WalState.view`).
1766    /// Reads through the snapshot use the cloned view + the main
1767    /// file; pending writes from a concurrent `WriteTxn` and frames
1768    /// committed AFTER the snapshot was taken are invisible.
1769    ///
1770    /// On in-memory pagers (no WAL) the snapshot captures `pinned_lsn
1771    /// = 0` and an empty frozen view — every read falls through to
1772    /// the main backend.
1773    ///
1774    /// The returned [`ReaderSnapshot`] registers a pin in the pager's
1775    /// live-snapshots map and removes the pin on drop.  Checkpoint
1776    /// consults `snapshots.values().min()` when deciding whether it
1777    /// is safe to reclaim WAL frames.
1778    ///
1779    /// Power-of-ten Rule 9: no `dyn`. The snapshot is generic over
1780    /// `F: FileBackend`.
1781    ///
1782    /// # Errors
1783    ///
1784    /// Returns `Error::Io` only via underlying syscalls; the in-
1785    /// memory portion of this call cannot fail.
1786    pub fn reader_snapshot(&mut self) -> Result<ReaderSnapshot<F>> {
1787        let pinned_lsn = self
1788            .wal
1789            .as_ref()
1790            .map_or(Lsn::ZERO, |s| s.wal.next_lsn().prev_saturating());
1791        let frozen_view = self
1792            .wal
1793            .as_ref()
1794            .map(|s| s.view.clone())
1795            .unwrap_or_default();
1796        let frozen_header = self.wal.as_ref().and_then(|s| s.view_header.clone());
1797        // M6 #51: capture the committed-view of the catalog root.
1798        // For file-backed pagers this is the value last persisted
1799        // by a successful `commit` (which is what readers should
1800        // observe). The live `self.header.root_catalog` may have
1801        // been advanced by a writer mid-txn; the snapshot must NOT
1802        // see that.
1803        let root_catalog = match self.wal.as_ref() {
1804            Some(state) => state.committed_root_catalog,
1805            None => self.header.root_catalog,
1806        };
1807        let snapshot_id = SnapshotId::new(self.next_snapshot_id.fetch_add(1, Ordering::Relaxed));
1808        // Register the pin BEFORE constructing the snapshot value so
1809        // a panic during construction (impossible today, but Rule 5
1810        // defensive) cannot leave a phantom entry in the map.
1811        let mut guard = self
1812            .snapshots
1813            .lock()
1814            .map_err(|_| Error::InvalidArgument("snapshot map poisoned"))?;
1815        debug_assert!(
1816            !guard.contains_key(&snapshot_id),
1817            "next_snapshot_id is monotonic; collisions are impossible",
1818        );
1819        guard.insert(snapshot_id, pinned_lsn);
1820        drop(guard);
1821        Ok(ReaderSnapshot {
1822            pinned_lsn,
1823            frozen_view,
1824            frozen_header,
1825            root_catalog,
1826            pin: SnapshotPin {
1827                id: snapshot_id,
1828                map: Arc::clone(&self.snapshots),
1829            },
1830            _phantom: std::marker::PhantomData,
1831        })
1832    }
1833
1834    /// Snapshot the pager's in-memory header AND WAL committed
1835    /// view for txn-rollback purposes.  Returned to the caller and
1836    /// passed back into [`Self::restore_header_snapshot`] on
1837    /// rollback.
1838    ///
1839    /// The view is captured because [`Self::free_page`] removes
1840    /// per-page entries from the WAL view immediately (the page's
1841    /// committed content becomes stale once the id is back on the
1842    /// freelist).  Without snapshotting the view, a rolled-back txn
1843    /// that freed a page would leave readers no way to find the
1844    /// page's committed content — it sits below `state.view` (now
1845    /// missing the entry) and below the on-disk main file (never
1846    /// checkpointed).  Snapshot/restore closes that gap.
1847    ///
1848    /// Header fields (`root_catalog`, `freelist_head`,
1849    /// `page_count`) are written direct to disk (not through the
1850    /// WAL) so a pure pending-buffer discard leaves the header
1851    /// inconsistent with the rolled-back page bodies.  The
1852    /// snapshot/restore pair closes that gap for the M6
1853    /// `Db::transaction` rollback path.
1854    #[must_use]
1855    pub fn header_snapshot(&self) -> HeaderSnapshot {
1856        HeaderSnapshot {
1857            root_catalog: self.header.root_catalog,
1858            freelist_head: self.header.freelist_head,
1859            page_count: self.header.page_count,
1860            view: self
1861                .wal
1862                .as_ref()
1863                .map(|s| s.view.clone())
1864                .unwrap_or_default(),
1865        }
1866    }
1867
1868    /// Restore the in-memory header AND WAL view from a
1869    /// previously-captured snapshot, then write the restored header
1870    /// to disk.  Used by [`crate::txn::WriteTxn::rollback`] to undo
1871    /// direct header writes + view mutations that happened during
1872    /// the rolled-back txn.
1873    ///
1874    /// # Errors
1875    ///
1876    /// Returns [`Error::Io`] on syscall failure when writing the
1877    /// restored header to disk.
1878    pub fn restore_header_snapshot(&mut self, snap: HeaderSnapshot) -> Result<()> {
1879        self.header.root_catalog = snap.root_catalog;
1880        self.header.freelist_head = snap.freelist_head;
1881        self.header.page_count = snap.page_count;
1882        if let Some(state) = self.wal.as_mut() {
1883            state.view = snap.view;
1884        }
1885        self.write_header()
1886    }
1887
1888    /// Discard every page in the in-flight transaction buffer.
1889    /// Used by [`crate::txn::WriteTxn::rollback`].  Idempotent —
1890    /// calling on an in-memory pager or on a file pager with an
1891    /// empty pending buffer is a no-op.
1892    ///
1893    /// M6 #51: also clears the header-dirty flag so a rolled-back
1894    /// `set_root_catalog` does not emit a stray page-0 frame on the
1895    /// next commit. The in-memory `self.header.root_catalog`
1896    /// restoration is the caller's job — `WriteTxn` uses
1897    /// [`Self::header_snapshot`] + [`Self::restore_header_snapshot`].
1898    pub fn rollback_pending_writes(&mut self) {
1899        // #91: dropping `pending` also discards any fresh-page bodies the
1900        // rolled-back txn staged. Those never rode a WAL commit and never
1901        // extended the main file, so there is nothing on disk to undo —
1902        // the caller restores `page_count` via `restore_header_snapshot`.
1903        if let Some(state) = self.wal.as_mut() {
1904            state.pending.clear();
1905            state.header_dirty = false;
1906        }
1907    }
1908
1909    /// Number of live reader snapshots.  For diagnostics and tests.
1910    #[must_use]
1911    pub fn live_snapshot_count(&self) -> usize {
1912        self.snapshots.lock().map(|g| g.len()).unwrap_or_default()
1913    }
1914
1915    /// Lowest LSN any live reader has pinned, or `None` if no
1916    /// snapshots are live.
1917    pub fn min_pinned_lsn(&self) -> Option<Lsn> {
1918        let guard = self.snapshots.lock().ok()?;
1919        guard.values().copied().min()
1920    }
1921
1922    /// `true` iff this pager has no WAL — i.e. it was constructed
1923    /// via [`Pager::memory`]. In-memory pagers have no MVCC surface;
1924    /// a [`ReaderSnapshot`] against one reads the live cache rather
1925    /// than the (absent) WAL frozen view. Public so callers in
1926    /// peer crates (e.g. M11 `Db::backup_to`) can dispatch on the
1927    /// in-memory case without reaching across the privacy boundary.
1928    #[must_use]
1929    pub fn is_memory_backed(&self) -> bool {
1930        self.wal.is_none()
1931    }
1932
1933    /// Read page `id` consulting the cache first, then the main
1934    /// backend (no WAL overlay). Used by
1935    /// [`ReaderSnapshot::read_page`] on memory pagers, where the
1936    /// cache may be ahead of the in-memory backend buffer because
1937    /// memory pagers write to cache and only flush on eviction.
1938    /// The caller takes `&Pager`; no cache mutation occurs (a miss
1939    /// here is a `read_through`, not an insert).
1940    pub(crate) fn read_cache_or_main(&self, id: PageId) -> Result<Page> {
1941        debug_assert!(id.get() > 0);
1942        debug_assert!(id.get() < self.header.page_count);
1943        if id.get() >= self.header.page_count {
1944            return Err(Error::InvalidArgument("page id out of range"));
1945        }
1946        if let Some(page) = self.cache.peek(id) {
1947            return Ok(page.clone());
1948        }
1949        self.read_through(id)
1950    }
1951
1952    /// Read the first [`PAGE_SIZE`] bytes from the main backend
1953    /// into `buf`, bypassing the cache and the WAL overlay.
1954    /// Used by [`crate::backup`] to capture page 0 (the file
1955    /// header) for inclusion in a backup. Errors with
1956    /// [`Error::BackupNotSupportedForMemoryPager`] on an in-memory
1957    /// pager (which has no on-disk file to read from).
1958    ///
1959    /// # Errors
1960    ///
1961    /// Returns [`Error::Io`] on syscall failure or
1962    /// [`Error::BackupNotSupportedForMemoryPager`] when the pager
1963    /// has no file backend.
1964    pub fn read_main_file_page_zero(&self, buf: &mut [u8; PAGE_SIZE]) -> Result<()> {
1965        match &self.backend {
1966            Backend::File(handle) => handle.read_exact_at(buf, 0),
1967            Backend::Memory(_) => Err(Error::BackupNotSupportedForMemoryPager),
1968        }
1969    }
1970
1971    /// Read page `id` consulting ONLY the main backend (no WAL
1972    /// overlay, no cache).  Used by [`ReaderSnapshot::read_page`]
1973    /// when the frozen view does not contain the page.
1974    ///
1975    /// Internal to the snapshot path; the on-disk page's CRC32C
1976    /// trailer is verified before the page is returned.
1977    /// Read page `id` consulting ONLY the main backend (no WAL
1978    /// overlay, no cache). Verifies the on-disk page trailer
1979    /// before returning the bytes.
1980    ///
1981    /// Used by [`ReaderSnapshot::read_page`] when the frozen view
1982    /// does not contain the page, and by the [`crate::backup`]
1983    /// module to materialise the source's main-file pages into a
1984    /// destination backup file.
1985    ///
1986    /// # Errors
1987    ///
1988    /// - [`Error::InvalidArgument`] if `id` is out of range.
1989    /// - [`Error::Io`] on syscall failure.
1990    /// - [`Error::Corruption`] if the on-disk trailer fails to
1991    ///   verify.
1992    pub fn read_main_file_page(&self, id: PageId) -> Result<Page> {
1993        debug_assert!(id.get() > 0, "PageId is non-zero by construction");
1994        debug_assert!(id.get() < self.header.page_count);
1995        if id.get() >= self.header.page_count {
1996            return Err(Error::InvalidArgument("page id out of range"));
1997        }
1998        self.read_through(id)
1999    }
2000
2001    // ---------- helpers ----------
2002
2003    // wal_lookup_some + lookup_in_wal replace the previous
2004    // `wal_lookup` that cloned the page. The split avoids the per-
2005    // call clone (Rule 3) while keeping the read_page hot path easy
2006    // to audit.
2007
2008    /// Read a freelist link page using the same WAL → cache → main
2009    /// priority chain that [`Self::read_page`] uses. Required by
2010    /// #22 because [`Self::free_page`] now stages freelist pages in
2011    /// the WAL transaction buffer; a subsequent [`Self::alloc_page`]
2012    /// must observe the most-recent (possibly uncommitted) freelist
2013    /// link.
2014    fn read_freelist_page(&self, id: PageId) -> Result<Page> {
2015        if let Some(state) = self.wal.as_ref() {
2016            // #80: deref the `view` arm's `Arc<Page>` to `&Page` so it
2017            // unifies with the `pending` arm; the returned `Page` is an
2018            // owned body clone, matching the prior behaviour.
2019            if let Some(p) = state
2020                .pending
2021                .get(&id)
2022                .or_else(|| state.view.get(&id).map(Arc::as_ref))
2023            {
2024                return Ok(p.clone());
2025            }
2026        }
2027        self.read_through(id)
2028    }
2029
2030    fn alloc_from_freelist(&mut self, head: PageId) -> Result<PageId> {
2031        let head_page = self.read_freelist_page(head)?;
2032        let entry = decode_freelist_page(&head_page).ok_or(Error::Corruption {
2033            page_id: head.get(),
2034        })?;
2035        // #54 (Power-of-Ten Rule 2): the freelist is an in-page linked
2036        // list popped one head per call, so the implicit walk across
2037        // successive `alloc_from_freelist` calls is bounded only by the
2038        // on-disk links. A corrupt or double-freed link can point back
2039        // at `head` (a self-referential cycle) or past the end of the
2040        // file, which would loop or hand out a phantom id. Validate the
2041        // next link here: it must be `0` (end of list) or a real page id
2042        // strictly below `page_count` that is not the page we just
2043        // popped. Anything else is `Error::Corruption`.
2044        if entry.next != 0 && (entry.next == head.get() || entry.next >= self.header.page_count) {
2045            return Err(Error::Corruption {
2046                page_id: head.get(),
2047            });
2048        }
2049        self.header.freelist_head = entry.next;
2050        let _ = self.cache.evict(head);
2051        // The recycled id leaves the freelist: its prior `state.pending`
2052        // entry (a freelist link encoding) is stale. Remove it so the
2053        // *caller's* subsequent write_page lands as the canonical
2054        // content for this id. Do NOT touch `state.view` here — the
2055        // live committed view must stay intact for any in-flight
2056        // [`ReaderSnapshot`] that cloned it before this realloc. The
2057        // pending → view replacement happens atomically inside
2058        // [`Pager::commit_inner`]. See M6 #53.
2059        if let Some(state) = self.wal.as_mut() {
2060            state.pending.remove(&head);
2061        }
2062        // #64: route the header field update through the WAL on
2063        // file-backed pagers (same path M6.5 #51 installed for
2064        // `set_root_catalog`). A direct `write_header()` here could
2065        // leave the on-disk header pointing at a `freelist_head`
2066        // (the recycled head's successor) whose link bytes are only
2067        // durable in the WAL view, surfacing as
2068        // `Error::Corruption { page_id: 1 }` on reopen without an
2069        // explicit commit.
2070        self.stage_or_write_header()?;
2071        Ok(head)
2072    }
2073
2074    fn alloc_fresh(&mut self) -> Result<PageId> {
2075        // Rule 5: every caller goes through `alloc_page`, which
2076        // already asserts `in_txn`; assert again here so a hypothetical
2077        // future direct caller can't bypass the WAL routing below.
2078        debug_assert!(
2079            self.in_txn(),
2080            "alloc_fresh must be inside a Pager txn (begin_txn/end_txn)"
2081        );
2082        let new_id_raw = self.header.page_count;
2083        let new_id =
2084            PageId::new(new_id_raw).ok_or(Error::InvalidArgument("page_count overflow"))?;
2085        // #91: the file backend routes the fresh page through the WAL;
2086        // the in-memory backend keeps the M2 direct-write semantics.
2087        if self.wal.is_some() {
2088            self.alloc_fresh_wal(new_id, new_id_raw)
2089        } else {
2090            self.alloc_fresh_memory(new_id, new_id_raw)
2091        }
2092    }
2093
2094    /// #91: file-backend fresh allocation. Stage a zeroed, trailer-
2095    /// stamped page into `state.pending` under `new_id` so it rides the
2096    /// SAME `WalTxn` as the page-0/`page_count` frame — one group-commit
2097    /// fsync covers both. The main file is NOT extended here and the body
2098    /// is NOT written to it; `read_page` resolves the slot from `pending`
2099    /// (and, post-commit, `view`) ahead of the main file, and
2100    /// `apply_checkpoint_view` grows the file + writes the body out at
2101    /// checkpoint. This deletes the #52 past-EOF hazard at the root: the
2102    /// only durable record of a fresh page before checkpoint is its WAL
2103    /// frame, never an un-WAL'd main-file extension.
2104    fn alloc_fresh_wal(&mut self, new_id: PageId, new_id_raw: u64) -> Result<PageId> {
2105        // A freshly-allocated page is a zeroed body with its CRC32C
2106        // trailer stamped — the same shape `free_page` stages for a
2107        // freelist link. The trailer matches the v0 interpretation; it is
2108        // re-encoded (v1 / encrypted) by `write_back_page` at checkpoint,
2109        // so a compression- or encryption-capable file still lands the
2110        // correct on-disk bytes. The pre-stamped trailer keeps a direct
2111        // WAL-view read of the slot self-consistent before any
2112        // `write_page` overwrites it.
2113        let mut blank = Page::zeroed();
2114        write_page_trailer(&mut blank);
2115        // Advance `page_count` first so the slot is in range, then stage
2116        // the body. Guardrail 1: do NOT seed a DIRTY cache frame — a
2117        // dirty eviction would `write_back_page` the fresh body DIRECTLY
2118        // to a main file we have NOT extended, re-introducing the
2119        // past-EOF write. The page lives in `pending`, which `read_page`
2120        // consults first, so no cache seed is needed at all.
2121        self.header.page_count = new_id_raw + 1;
2122        let state = self
2123            .wal
2124            .as_mut()
2125            .ok_or(Error::InvalidArgument("internal: wal overlay missing"))?;
2126        state.pending.insert(new_id, blank);
2127        // Rule 5: the staged slot is reachable and resolves from pending.
2128        debug_assert!(
2129            state.pending.contains_key(&new_id),
2130            "#91: fresh page must be staged in pending before commit",
2131        );
2132        // #64: route the header field update (`page_count`, possibly
2133        // `freelist_head`) through the WAL. See the comment on
2134        // `alloc_from_freelist` for the failure mode this prevents.
2135        self.stage_or_write_header()?;
2136        Ok(new_id)
2137    }
2138
2139    /// In-memory fresh allocation — byte-for-byte the M2 semantics.
2140    /// `extend_main_for` resizes the backing `Vec` per alloc, the cache
2141    /// is seeded dirty so the zeroed body lands on the next
2142    /// flush-eviction, and the blank body is written through (its trailer
2143    /// is stamped inside `write_back_page`). The in-memory backend has no
2144    /// durability surface, so none of #91's WAL/file-grow machinery
2145    /// applies.
2146    fn alloc_fresh_memory(&mut self, new_id: PageId, new_id_raw: u64) -> Result<PageId> {
2147        self.extend_main_for(new_id_raw)?;
2148        self.header.page_count = new_id_raw + 1;
2149        let evicted = self.cache.insert(new_id, Page::zeroed(), true);
2150        self.handle_eviction(evicted)?;
2151        self.write_back_page(new_id, &Page::zeroed())?;
2152        self.stage_or_write_header()?;
2153        Ok(new_id)
2154    }
2155
2156    /// In-memory fresh-alloc helper: grow the backing `Vec` by one
2157    /// physical stride so the slot `alloc_fresh_memory` is about to hand
2158    /// out exists. Memory-only by construction — the file backend routes
2159    /// fresh pages through the WAL (#91) and grows the file lazily in
2160    /// [`Self::apply_checkpoint_view`], so it never calls this. The
2161    /// in-memory `Vec` must stay sized exactly to `page_count` because
2162    /// reads index straight into it.
2163    ///
2164    /// Power-of-ten Rule 5: a `debug_assert` confirms this is never
2165    /// reached on the file backend.
2166    fn extend_main_for(&mut self, new_id_raw: u64) -> Result<()> {
2167        let _ = new_id_raw;
2168        let stride = self.physical_stride();
2169        match &mut self.backend {
2170            Backend::Memory(bytes) => {
2171                bytes.resize(bytes.len() + stride, 0);
2172                Ok(())
2173            }
2174            Backend::File(_) => {
2175                debug_assert!(
2176                    false,
2177                    "#91: file backend extends the main file at checkpoint, not at alloc",
2178                );
2179                Err(Error::InvalidArgument(
2180                    "internal: extend_main_for on file backend",
2181                ))
2182            }
2183        }
2184    }
2185
2186    /// Compute the on-disk file size for a main file whose top data
2187    /// slot index is `top_id_raw` — i.e. a file holding `top_id_raw + 1`
2188    /// pages total (page 0 plus slots `1..=top_id_raw`). Uses the
2189    /// encrypted physical stride (4136) when the file is
2190    /// encryption-capable; otherwise the legacy 4096-byte stride.
2191    /// Page 0 always contributes 4096 bytes. Used by
2192    /// [`Self::apply_checkpoint_view`] (#91) to size the single
2193    /// checkpoint `set_len`.
2194    fn file_length_for(&self, new_id_raw: u64) -> Result<u64> {
2195        let stride = self.physical_stride() as u64;
2196        // After this alloc the file has `new_id_raw + 1` pages
2197        // total (index 0..=new_id_raw). The on-disk size is
2198        // `PAGE_SIZE + new_id_raw * stride`.
2199        let data_pages = new_id_raw;
2200        let data_bytes = data_pages
2201            .checked_mul(stride)
2202            .ok_or(Error::InvalidArgument("file too large"))?;
2203        (PAGE_SIZE as u64)
2204            .checked_add(data_bytes)
2205            .ok_or(Error::InvalidArgument("file too large"))
2206    }
2207
2208    /// #86: inverse of [`Self::file_length_for`] — the number of pages
2209    /// (page 0 plus the data slots) physically present in a main file
2210    /// of `file_len` bytes. Used at open to seed
2211    /// [`Self::main_high_water`] from the real on-disk size so an
2212    /// already-extended file is not re-grown. Returns `0` for a file
2213    /// shorter than the header page (a brand-new / empty file), letting
2214    /// the first `alloc_fresh` grow from scratch. A partial trailing
2215    /// stride (torn extension) is floored — it does not count as a
2216    /// usable slot, so `alloc_fresh` will rewrite it cleanly.
2217    fn main_pages_for_len(&self, file_len: u64) -> u64 {
2218        let stride = self.physical_stride() as u64;
2219        if file_len < PAGE_SIZE as u64 {
2220            return 0;
2221        }
2222        let data_bytes = file_len - PAGE_SIZE as u64;
2223        1 + data_bytes / stride
2224    }
2225
2226    /// Phase 3 (issue #8): `true` iff this pager was opened against
2227    /// a `format_minor >= 1` file (i.e. one whose per-page trailer
2228    /// uses the v1 interpretation and whose pages MAY be
2229    /// LZ4-compressed on disk). Cached at open time from the
2230    /// file header so the read/write hot path doesn't re-decode
2231    /// it per page.
2232    #[must_use]
2233    pub fn is_compression_capable(&self) -> bool {
2234        (self.header.feature_flags & FEATURE_FLAG_COMPRESSION) != 0
2235    }
2236
2237    /// Phase 4 (issue #9): `true` iff this pager was opened against
2238    /// an encryption-capable file (`format_minor = 2` +
2239    /// `feature_flags` bit 1). Cached from the header at open so
2240    /// hot-path callers don't re-decode per page.
2241    #[must_use]
2242    pub fn is_encryption_capable(&self) -> bool {
2243        (self.header.feature_flags & FEATURE_FLAG_ENCRYPTION) != 0
2244    }
2245
2246    /// Phase 4 (issue #9): byte offset of page `id`'s on-disk slot
2247    /// in the main file. Uses the encrypted physical stride
2248    /// (4136 bytes) when the file is encryption-capable; otherwise
2249    /// the legacy 4096-byte stride. Page 0 is always at offset 0.
2250    #[must_use]
2251    fn physical_offset(&self, id: PageId) -> u64 {
2252        crate::pager::page::physical_offset_for(id.get(), self.header.feature_flags)
2253    }
2254
2255    /// Phase 4 (issue #9): on-disk size of a single non-header page.
2256    /// Returns 4096 on unencrypted files, 4136 on encrypted ones.
2257    #[must_use]
2258    fn physical_stride(&self) -> usize {
2259        crate::pager::page::physical_page_stride(self.header.feature_flags)
2260    }
2261
2262    /// Read straight from the backend without consulting the cache.
2263    /// Verifies the page trailer; page 0 is the caller's
2264    /// responsibility (the header carries its own checksum).
2265    ///
2266    /// Phase 3 (issue #8): on `format_minor >= 1` (compression-
2267    /// capable) files the on-disk trailer is v1 (1-bit flag +
2268    /// 31-bit CRC). CRC verification is **always** performed
2269    /// BEFORE any LZ4 decompression (Rule 2 + design.md "No
2270    /// silent corruption"): malicious / corrupt input must not
2271    /// reach the decompressor without integrity first.
2272    fn read_through(&self, id: PageId) -> Result<Page> {
2273        let mut p = Page::zeroed();
2274        let off = self.physical_offset(id);
2275        // Phase 4 (issue #9): on encrypted files, the physical page
2276        // is 4136 bytes (4096 ciphertext + 24 nonce + 16 tag). Read
2277        // the full physical page, decrypt+verify Poly1305, THEN
2278        // hand the 4096-byte plaintext to the rest of the read
2279        // pipeline. Order is non-negotiable: any other check
2280        // running first would operate on attacker-controlled bytes.
2281        if self.is_encryption_capable() {
2282            self.read_encrypted_into(id, off, &mut p)?;
2283        } else {
2284            self.read_plain_into(id, off, &mut p)?;
2285        }
2286        // Defense-in-depth at a runtime boundary (Rule 5): never
2287        // hand out a frame whose checksum has not been verified
2288        // against bytes resident in memory.
2289        if self.is_compression_capable() {
2290            decode_page_v1(&p, id.get())
2291        } else {
2292            if !page_trailer_valid(&p) {
2293                return Err(Error::Corruption { page_id: id.get() });
2294            }
2295            Ok(p)
2296        }
2297    }
2298
2299    /// Phase 4 (issue #9): read a plaintext page (4096 bytes) into
2300    /// `p`. The non-encrypted physical path.
2301    fn read_plain_into(&self, id: PageId, off: u64, p: &mut Page) -> Result<()> {
2302        match &self.backend {
2303            Backend::File(handle) => handle.read_exact_at(p.as_bytes_mut(), off)?,
2304            Backend::Memory(bytes) => {
2305                let start =
2306                    usize::try_from(off).map_err(|_| Error::InvalidArgument("offset overflow"))?;
2307                let end = start
2308                    .checked_add(PAGE_SIZE)
2309                    .ok_or(Error::InvalidArgument("offset overflow"))?;
2310                if end > bytes.len() {
2311                    return Err(Error::Corruption { page_id: id.get() });
2312                }
2313                p.as_bytes_mut().copy_from_slice(&bytes[start..end]);
2314            }
2315        }
2316        Ok(())
2317    }
2318
2319    /// Phase 4 (issue #9): read an encrypted physical page (4136
2320    /// bytes) and decrypt it into `p` (4096 bytes of plaintext).
2321    /// Returns [`Error::EncryptionKeyInvalid`] if Poly1305 fails.
2322    fn read_encrypted_into(&self, id: PageId, off: u64, p: &mut Page) -> Result<()> {
2323        let stride = self.physical_stride();
2324        // Fixed-size stack buffer (4136 bytes). Rule 3: no heap.
2325        let mut phys = [0u8; PAGE_SIZE + ENCRYPTION_OVERHEAD];
2326        debug_assert_eq!(stride, phys.len(), "stride must match encrypted buffer");
2327        match &self.backend {
2328            Backend::File(handle) => handle.read_exact_at(&mut phys, off)?,
2329            Backend::Memory(bytes) => {
2330                let start =
2331                    usize::try_from(off).map_err(|_| Error::InvalidArgument("offset overflow"))?;
2332                let end = start
2333                    .checked_add(stride)
2334                    .ok_or(Error::InvalidArgument("offset overflow"))?;
2335                if end > bytes.len() {
2336                    return Err(Error::Corruption { page_id: id.get() });
2337                }
2338                phys.copy_from_slice(&bytes[start..end]);
2339            }
2340        }
2341        self.decrypt_physical(id, &phys, p)
2342    }
2343
2344    /// Phase 4 (issue #9): decrypt `phys` (4136 bytes) into `out`
2345    /// (4096 bytes). The cipher key must already be derived; if
2346    /// `self.derived_key` is `None` on an encryption-capable file
2347    /// we surface `EncryptionKeyRequired` rather than panic
2348    /// (this branch is unreachable at runtime — `derive_key_for_open`
2349    /// rejects that combination at open).
2350    fn decrypt_physical(
2351        &self,
2352        id: PageId,
2353        phys: &[u8; PAGE_SIZE + ENCRYPTION_OVERHEAD],
2354        out: &mut Page,
2355    ) -> Result<()> {
2356        let Some(key) = self.derived_key.as_ref() else {
2357            return Err(Error::EncryptionKeyRequired);
2358        };
2359        #[cfg(feature = "encryption")]
2360        {
2361            crate::crypto::decrypt_page(key.as_bytes(), id.get(), phys, out.as_bytes_mut())
2362        }
2363        #[cfg(not(feature = "encryption"))]
2364        {
2365            // Unreachable: `derive_key_for_open` returns
2366            // `FormatFeatureUnsupported` on a no-feature build
2367            // against an encrypted file, so `derived_key` is always
2368            // `None` here and the early return above fires first.
2369            let _ = (id, phys, out, key);
2370            Err(Error::FormatFeatureUnsupported {
2371                feature: "encryption",
2372            })
2373        }
2374    }
2375
2376    /// Write straight to the backend without going through the cache.
2377    /// Computes and stamps the page trailer before the write so that a
2378    /// read-back will always verify.
2379    ///
2380    /// Phase 3 (issue #8): on compression-capable files, the
2381    /// on-disk representation is produced by
2382    /// [`Self::encode_page_for_disk`] — LZ4 if it helps, raw
2383    /// otherwise. The in-memory `page` argument is always the
2384    /// 4092-byte raw body the encoders produced; compression is
2385    /// fully transparent at this layer.
2386    fn write_back_page(&mut self, id: PageId, page: &Page) -> Result<()> {
2387        let off = self.physical_offset(id);
2388        let stamped = self.encode_page_for_disk(page)?;
2389        // Phase 4 (issue #9): on encryption-capable files the
2390        // physical write is 4136 bytes (ciphertext + nonce + tag);
2391        // we encrypt the 4096-byte logical page (post-compression
2392        // / CRC) into a stack buffer and write that out.
2393        if self.is_encryption_capable() {
2394            let phys = self.encrypt_logical(id, &stamped)?;
2395            self.write_phys_encrypted(off, &phys)?;
2396        } else {
2397            self.write_phys_4096(off, stamped.as_bytes())?;
2398        }
2399        Ok(())
2400    }
2401
2402    /// Phase 4 (issue #9): encrypt a stamped 4096-byte logical
2403    /// page into a 4136-byte physical block (ciphertext || nonce
2404    /// || tag) suitable for direct write-out.
2405    fn encrypt_logical(
2406        &self,
2407        id: PageId,
2408        page: &Page,
2409    ) -> Result<[u8; PAGE_SIZE + ENCRYPTION_OVERHEAD]> {
2410        let Some(key) = self.derived_key.as_ref() else {
2411            return Err(Error::EncryptionKeyRequired);
2412        };
2413        #[cfg(feature = "encryption")]
2414        {
2415            let mut out = [0u8; PAGE_SIZE + ENCRYPTION_OVERHEAD];
2416            crate::crypto::encrypt_page(key.as_bytes(), id.get(), page.as_bytes(), &mut out)?;
2417            Ok(out)
2418        }
2419        #[cfg(not(feature = "encryption"))]
2420        {
2421            // Unreachable: build_new_file_header rejects new
2422            // encryption-capable files in a no-feature build, and
2423            // open-time guards reject existing ones. Keep the
2424            // shape consistent.
2425            let _ = (id, page, key);
2426            Err(Error::FormatFeatureUnsupported {
2427                feature: "encryption",
2428            })
2429        }
2430    }
2431
2432    /// Phase 4 (issue #9): write a 4136-byte encrypted physical
2433    /// page at `off`.
2434    fn write_phys_encrypted(
2435        &mut self,
2436        off: u64,
2437        phys: &[u8; PAGE_SIZE + ENCRYPTION_OVERHEAD],
2438    ) -> Result<()> {
2439        let stride = PAGE_SIZE + ENCRYPTION_OVERHEAD;
2440        match &mut self.backend {
2441            Backend::File(handle) => handle.write_all_at(phys, off)?,
2442            Backend::Memory(bytes) => {
2443                let start =
2444                    usize::try_from(off).map_err(|_| Error::InvalidArgument("offset overflow"))?;
2445                let end = start
2446                    .checked_add(stride)
2447                    .ok_or(Error::InvalidArgument("offset overflow"))?;
2448                if end > bytes.len() {
2449                    bytes.resize(end, 0);
2450                }
2451                bytes[start..end].copy_from_slice(phys);
2452            }
2453        }
2454        Ok(())
2455    }
2456
2457    /// Phase 4 (issue #9): write a 4096-byte plain physical page at
2458    /// `off` (the pre-encryption, pre-Phase-4 behavior).
2459    fn write_phys_4096(&mut self, off: u64, page_bytes: &[u8; PAGE_SIZE]) -> Result<()> {
2460        match &mut self.backend {
2461            Backend::File(handle) => handle.write_all_at(page_bytes, off)?,
2462            Backend::Memory(bytes) => {
2463                let start =
2464                    usize::try_from(off).map_err(|_| Error::InvalidArgument("offset overflow"))?;
2465                let end = start
2466                    .checked_add(PAGE_SIZE)
2467                    .ok_or(Error::InvalidArgument("offset overflow"))?;
2468                if end > bytes.len() {
2469                    bytes.resize(end, 0);
2470                }
2471                bytes[start..end].copy_from_slice(page_bytes);
2472            }
2473        }
2474        Ok(())
2475    }
2476
2477    /// Phase 3 (issue #8): produce the on-disk representation of a
2478    /// raw 4092-byte page body.
2479    ///
2480    /// - On `format_minor = 0` files (uncompressed): stamp the v0
2481    ///   32-bit CRC32C trailer. Existing behavior — bit-for-bit
2482    ///   identical to what the pre-#8 code produced.
2483    /// - On `format_minor = 1` files (compression-capable): try
2484    ///   LZ4 compress. If the compressed body fits in
2485    ///   `PAGE_SIZE - PAGE_TRAILER_SIZE - 2` bytes (= 4090), emit
2486    ///   the compressed layout (`u16 LE compressed_len` + LZ4
2487    ///   bytes + zero padding + v1 trailer with flag = 1).
2488    ///   Otherwise emit the raw body + v1 trailer with flag = 0.
2489    fn encode_page_for_disk(&self, page: &Page) -> Result<Page> {
2490        let mut stamped = page.clone();
2491        if !self.is_compression_capable() {
2492            write_page_trailer(&mut stamped);
2493            return Ok(stamped);
2494        }
2495        // Compression-capable file. Try to compress the raw body.
2496        // The 2-byte length prefix sits inside the 4092-byte body
2497        // region, so the compressed payload must fit in
2498        // PAYLOAD_END - 2 = 4090 bytes for it to be worth using.
2499        encode_page_v1(page)
2500    }
2501
2502    fn handle_eviction(&mut self, evicted: Option<Evicted>) -> Result<()> {
2503        if let Some(ev) = evicted {
2504            if ev.dirty {
2505                self.write_back_page(ev.page_id, &ev.buffer)?;
2506            }
2507        }
2508        Ok(())
2509    }
2510
2511    /// Re-encode and write page 0.
2512    fn write_header(&mut self) -> Result<()> {
2513        let mut p = Page::zeroed();
2514        encode_header(&self.header, &mut p);
2515        match &mut self.backend {
2516            Backend::File(handle) => handle.write_all_at(p.as_bytes(), 0)?,
2517            Backend::Memory(bytes) => {
2518                if bytes.len() < PAGE_SIZE {
2519                    bytes.resize(PAGE_SIZE, 0);
2520                }
2521                bytes[..PAGE_SIZE].copy_from_slice(p.as_bytes());
2522            }
2523        }
2524        Ok(())
2525    }
2526
2527    /// M6 #51: route a [`Self::set_root_catalog`] header update through
2528    /// the WAL on file-backed pagers; preserve the M2/M3 direct-write
2529    /// behavior on the in-memory pager (no WAL exists). Marks the
2530    /// header dirty so [`Self::commit`] knows to append a page-0
2531    /// frame.
2532    fn stage_or_write_header(&mut self) -> Result<()> {
2533        match self.wal.as_mut() {
2534            Some(state) => {
2535                state.header_dirty = true;
2536                Ok(())
2537            }
2538            None => self.write_header(),
2539        }
2540    }
2541
2542    /// M6 #51: mark the start of a WAL transaction. Called by
2543    /// [`crate::txn::WriteTxn::begin`]. The pager tracks a depth
2544    /// counter so a future nested-txn API (M8+) can bump/decrement
2545    /// without breaking the Catalog's debug-assert. For the in-
2546    /// memory pager this is a no-op (no WAL transactional surface).
2547    pub fn begin_txn(&mut self) {
2548        if let Some(state) = self.wal.as_mut() {
2549            state.txn_depth = state.txn_depth.saturating_add(1);
2550        }
2551    }
2552
2553    /// M6 #51: mark the end of a WAL transaction. Symmetric with
2554    /// [`Self::begin_txn`]; called by [`crate::txn::WriteTxn::commit`]
2555    /// and [`crate::txn::WriteTxn::rollback`] (and the implicit
2556    /// `Drop` rollback). Saturating-decrement so a stray end without
2557    /// a matching begin does not underflow.
2558    pub fn end_txn(&mut self) {
2559        if let Some(state) = self.wal.as_mut() {
2560            state.txn_depth = state.txn_depth.saturating_sub(1);
2561        }
2562    }
2563
2564    /// M6 #51: `true` if the pager is currently inside a WAL
2565    /// transaction (file-backed) or is an in-memory pager (no WAL
2566    /// transactional surface — every mutation is immediately
2567    /// visible). Catalog mutations debug-assert this at their entry
2568    /// points so the M5 direct-write bug class cannot regress.
2569    #[must_use]
2570    pub fn in_txn(&self) -> bool {
2571        match self.wal.as_ref() {
2572            Some(state) => state.txn_depth > 0,
2573            None => true,
2574        }
2575    }
2576}
2577
2578/// Construct the in-memory header for a brand-new file. Phase 3
2579/// (issue #8): when `mode == CompressionMode::Lz4` the new file
2580/// gets `format_minor = 1` and `feature_flags` bit 0 set;
2581/// otherwise it stays at the unchanged `format_minor = 0` layout.
2582/// Initialise a freshly-created file: write the default header at
2583/// offset 0 and `fsync`. Phase 3 (issue #8): the
2584/// [`CompressionMode`] knob picks the new file's `format_minor` +
2585/// `feature_flags`.
2586///
2587/// Phase 4 (issue #9): when `encryption_key` is `Some`, a fresh
2588/// 32-byte `kdf_salt` is generated from the OS CSPRNG and stamped
2589/// into the header at offset 72..104; `format_minor` is bumped to
2590/// `2` and `feature_flags` bit 1 is set. Page 0 itself remains
2591/// plaintext (the salt MUST be readable by tooling that does not
2592/// have the key).
2593fn initialise_file<F: FileBackend>(
2594    handle: &F,
2595    compression_mode: CompressionMode,
2596    encryption_key: Option<&[u8; 32]>,
2597) -> Result<FileHeader> {
2598    let header = build_new_file_header(compression_mode, encryption_key)?;
2599    let mut p = Page::zeroed();
2600    encode_header(&header, &mut p);
2601    handle.set_len(PAGE_SIZE as u64)?;
2602    handle.write_all_at(p.as_bytes(), 0)?;
2603    handle.sync_all()?;
2604    Ok(header)
2605}
2606
2607/// Phase 4 (issue #9): pick the right header for a brand-new file
2608/// given the (`compression_mode`, `encryption_key`) tuple.
2609fn build_new_file_header(
2610    compression_mode: CompressionMode,
2611    encryption_key: Option<&[u8; 32]>,
2612) -> Result<FileHeader> {
2613    match (compression_mode, encryption_key) {
2614        (CompressionMode::Off, None) => Ok(FileHeader::new_empty()),
2615        (CompressionMode::Lz4, None) => Ok(FileHeader::new_empty_with_compression()),
2616        (CompressionMode::Off, Some(_)) => {
2617            let salt = fresh_kdf_salt()?;
2618            Ok(FileHeader::new_empty_with_encryption(salt))
2619        }
2620        (CompressionMode::Lz4, Some(_)) => {
2621            let salt = fresh_kdf_salt()?;
2622            Ok(FileHeader::new_empty_with_encryption_and_compression(salt))
2623        }
2624    }
2625}
2626
2627/// Phase 4 (issue #9): pull 32 bytes of CSPRNG into a fresh KDF
2628/// salt. Returns [`Error::Io`] on CSPRNG failure (the only failure
2629/// mode). When the `encryption` Cargo feature is OFF the function
2630/// is still defined so the open path can pattern-match on
2631/// `encryption_key.is_some()` without `#[cfg]` salting at every
2632/// call site; without the feature it falls back to the `rand`
2633/// crate's OS RNG (we already pull `rand = "0.9"` in unconditionally).
2634// The `Result` return type is uniform across the two cfg arms even
2635// though the `not(feature = "encryption")` branch never fails — the
2636// `feature = "encryption"` arm uses `getrandom` and CAN fail. Keep
2637// one signature so callers don't need to fork on the feature flag
2638// themselves.
2639#[allow(clippy::unnecessary_wraps)]
2640fn fresh_kdf_salt() -> Result<[u8; 32]> {
2641    #[cfg(feature = "encryption")]
2642    {
2643        let mut out = [0u8; 32];
2644        getrandom::getrandom(&mut out)
2645            .map_err(|e| Error::Io(std::io::Error::other(format!("getrandom failure: {e}"))))?;
2646        Ok(out)
2647    }
2648    #[cfg(not(feature = "encryption"))]
2649    {
2650        use rand::RngCore;
2651        let mut out = [0u8; 32];
2652        rand::rng().fill_bytes(&mut out);
2653        Ok(out)
2654    }
2655}
2656
2657/// Phase 4 (issue #9): given the persisted on-disk header and the
2658/// caller's `Config`, derive the page-encryption key (or fall
2659/// back to `None`). This is the single function that maps the
2660/// open-time error matrix from the issue body:
2661///
2662/// | File state | Build feature | Key | Result |
2663/// |---|---|---|---|
2664/// | minor < 2 | any | None | Ok(None) |
2665/// | minor < 2 | any | Some | Err(EncryptionKeyMismatch) |
2666/// | minor = 2 (bit 1) | OFF | any | Err(FormatFeatureUnsupported) |
2667/// | minor = 2 (bit 1) | ON | None | Err(EncryptionKeyRequired) |
2668/// | minor = 2 (bit 1) | ON | Some | Ok(Some(derive(key, kdf_salt))) |
2669fn derive_key_for_open(config: &Config, header: &FileHeader) -> Result<Option<PageEncryptionKey>> {
2670    let file_is_encrypted = (header.feature_flags & FEATURE_FLAG_ENCRYPTION) != 0;
2671    let has_feature = cfg!(feature = "encryption");
2672    match (file_is_encrypted, has_feature, config.master_key()) {
2673        // Pre-encryption file, no key: business as usual.
2674        (false, _, None) => Ok(None),
2675        // Pre-encryption file, key supplied: caller is confused.
2676        (false, _, Some(_)) => Err(Error::EncryptionKeyMismatch),
2677        // Encrypted file, no `encryption` feature: refuse open.
2678        (true, false, _) => Err(Error::FormatFeatureUnsupported {
2679            feature: "encryption",
2680        }),
2681        // Encrypted file, feature on, no key: caller forgot the key.
2682        (true, true, None) => Err(Error::EncryptionKeyRequired),
2683        // Encrypted file, feature on, key supplied: derive.
2684        #[allow(unused_variables)]
2685        (true, true, Some(user_key)) => {
2686            #[cfg(feature = "encryption")]
2687            {
2688                let derived = crate::crypto::derive_page_key(user_key, &header.kdf_salt);
2689                // Issue #31: wrap the derived key so it zeroizes on
2690                // drop with the owning `Pager`.
2691                Ok(Some(PageEncryptionKey(wrap_master_key(derived))))
2692            }
2693            #[cfg(not(feature = "encryption"))]
2694            {
2695                // Unreachable: `(true, false, _)` above caught it.
2696                // Spell it out so the compiler is happy.
2697                let _ = user_key;
2698                Err(Error::FormatFeatureUnsupported {
2699                    feature: "encryption",
2700                })
2701            }
2702        }
2703    }
2704}
2705
2706/// Phase 3 (issue #8): reject opens that require features the
2707/// running binary was not compiled with. Today there is exactly
2708/// one such gate (`compression`); the function is structured so
2709/// future features (encryption, alternate codecs) plug in beside
2710/// it without touching the call site.
2711fn refuse_unsupported_features(header: &FileHeader) -> Result<()> {
2712    // Phase 3 (issue #8) compression gate: bit 0 of feature_flags
2713    // is the authoritative signal. The pre-Phase-4 code also
2714    // treated `format_minor >= 1` as an implicit compression
2715    // signal (to catch files written by a future build that
2716    // forgot to set the flag); that conflation broke once
2717    // format_minor = 2 became the encryption-capable baseline
2718    // (a file with bit 1 set but bit 0 clear is encryption-only
2719    // and MUST NOT trip the compression gate). Phase 4 narrows
2720    // the check to the explicit flag.
2721    let uses_compression = (header.feature_flags & FEATURE_FLAG_COMPRESSION) != 0;
2722    if uses_compression && !cfg!(feature = "compression") {
2723        return Err(Error::FormatFeatureUnsupported {
2724            feature: "compression",
2725        });
2726    }
2727    // Phase 4 (issue #9): encryption gate.
2728    let uses_encryption = (header.feature_flags & FEATURE_FLAG_ENCRYPTION) != 0;
2729    if uses_encryption && !cfg!(feature = "encryption") {
2730        return Err(Error::FormatFeatureUnsupported {
2731            feature: "encryption",
2732        });
2733    }
2734    Ok(())
2735}
2736
2737/// Phase 3 (issue #8): refuse to create new compression-capable
2738/// files when the build lacks the `compression` Cargo feature.
2739/// Without this guard, the open would happily produce a
2740/// `format_minor = 1` file the same build cannot reopen.
2741fn refuse_compression_without_feature(mode: CompressionMode) -> Result<()> {
2742    if matches!(mode, CompressionMode::Lz4) && !cfg!(feature = "compression") {
2743        return Err(Error::FormatFeatureUnsupported {
2744            feature: "compression",
2745        });
2746    }
2747    Ok(())
2748}
2749
2750/// Phase 4 (issue #9): refuse to create new encryption-capable
2751/// files when the build lacks the `encryption` Cargo feature.
2752fn refuse_encryption_without_feature(has_key: bool) -> Result<()> {
2753    if has_key && !cfg!(feature = "encryption") {
2754        return Err(Error::FormatFeatureUnsupported {
2755            feature: "encryption",
2756        });
2757    }
2758    Ok(())
2759}
2760
2761/// Phase 4 (issue #9): `true` iff this build was compiled with the
2762/// `encryption` Cargo feature. Exposed for diagnostic tooling and
2763/// integration tests that need to dispatch on the feature without
2764/// embedding a `cfg!` of their own.
2765#[must_use]
2766pub const fn encryption_feature_compiled_in() -> bool {
2767    cfg!(feature = "encryption")
2768}
2769
2770/// Extracted from [`Pager::open_with_backends`] so that function
2771/// stays within the power-of-ten 60-line budget (Rule 4). Either
2772/// rolls a recovered WAL forward into a `WalState` + view, or
2773/// creates a fresh WAL and stamps its salt into the main file
2774/// header.
2775///
2776/// Rationale for the `clippy::type_complexity` allow: the tuple
2777/// return shape is local to this single call site and mirrors
2778/// the immediate `(wal, view, view_header)` destructuring in
2779/// [`Pager::open_with_backends`]. Naming the tuple would be more
2780/// machinery than it deserves for a private helper that exists
2781/// solely to satisfy Rule 4.
2782#[allow(clippy::type_complexity)]
2783fn recover_or_create_wal<F: FileBackend>(
2784    main: &F,
2785    wal: F,
2786    wal_path: std::path::PathBuf,
2787    header: &mut FileHeader,
2788    config: &Config,
2789    derived_key: Option<&PageEncryptionKey>,
2790) -> Result<(Wal<F>, HashMap<PageId, Page>, Option<Page>)> {
2791    let expected_salt = salt_from_header(header);
2792    // Phase 4 (issue #9): the WAL inherits the SAME per-file
2793    // page-encryption key the main pager uses. Pass it in so
2794    // recovery decrypts each frame body before validating the
2795    // existing CRC.
2796    let wal_key_bytes = derived_key.map(|k| *k.as_bytes());
2797    let recovered = Wal::<F>::open_for_recovery_with_key(
2798        &wal,
2799        expected_salt,
2800        config.wal_size_limit,
2801        wal_key_bytes,
2802    )?;
2803    if recovered.committed_frames > 0 {
2804        let mut w = Wal::<F>::from_recovered_meta(
2805            wal,
2806            wal_path,
2807            recovered.salt,
2808            recovered.next_lsn,
2809            recovered.end_offset,
2810            recovered.committed_frames,
2811            config.wal_config(),
2812        );
2813        w.set_key(wal_key_bytes);
2814        let recovered_header = recovered.header.clone();
2815        if let Some(hp) = &recovered_header {
2816            let decoded = decode_header(hp)?;
2817            header.root_catalog = decoded.root_catalog;
2818            header.freelist_head = decoded.freelist_head;
2819            header.page_count = decoded.page_count;
2820        }
2821        Ok((w, recovered.into_view(), recovered_header))
2822    } else {
2823        let mut w = Wal::<F>::create_fresh_with(wal, wal_path, config.wal_config())?;
2824        w.set_key(wal_key_bytes);
2825        stamp_salt_into_header(header, w.salt());
2826        write_header_to_backend(main, header)?;
2827        main.sync_data(config.sync_mode)?;
2828        Ok((w, HashMap::new(), None))
2829    }
2830}
2831
2832/// Phase 3 (issue #8): byte offset of the trailer within a page.
2833/// Equal to `PAGE_SIZE - PAGE_TRAILER_SIZE = 4092` and matches the
2834/// constant of the same name in `pager::checksum`. Declared here
2835/// too so the encode/decode helpers in this module can stay
2836/// independent of the `checksum` module's private constants.
2837const V1_BODY_END: usize = PAGE_SIZE - PAGE_TRAILER_SIZE;
2838
2839/// Phase 3 (issue #8): max LZ4 compressed-payload size that fits
2840/// inside the page body alongside the 2-byte length prefix.
2841/// Equal to `V1_BODY_END - 2 = 4090`.
2842const V1_MAX_COMPRESSED_LEN: usize = V1_BODY_END - 2;
2843
2844/// Phase 3 (issue #8): encode a raw 4092-byte page body for
2845/// on-disk storage in a `format_minor = 1` (compression-capable)
2846/// file.
2847///
2848/// Tries LZ4 compression. If the compressed payload fits in
2849/// [`V1_MAX_COMPRESSED_LEN`] (= 4090) bytes the layout is
2850/// `[u16 LE compressed_len][LZ4 bytes][zero pad to 4092][v1
2851/// trailer flag=1, 31-bit CRC]`. Otherwise the layout is
2852/// `[raw 4092-byte body][v1 trailer flag=0, 31-bit CRC]`.
2853///
2854/// Without the `compression` Cargo feature this function emits
2855/// the uncompressed layout unconditionally — the `Pager::open`
2856/// refusal in [`refuse_unsupported_features`] guarantees we
2857/// never reach this code path against a `format_minor >= 1`
2858/// file in that build configuration, but defending in depth is
2859/// cheap and consistent with Rule 5.
2860///
2861/// # Errors
2862///
2863/// #62: returns [`Error::InvalidArgument`] if the LZ4-compressed
2864/// length somehow exceeds the 2-byte length prefix's range, rather
2865/// than writing a self-corrupting length-0 page. The
2866/// `V1_MAX_COMPRESSED_LEN` guard makes this unreachable today.
2867// The `Result` return is uniform across the two cfg arms even
2868// though the `not(feature = "compression")` branch never fails —
2869// only the `feature = "compression"` arm has the fallible length
2870// check. Keep one signature so the caller (`encode_page_for_disk`)
2871// does not fork on the feature flag.
2872#[cfg_attr(not(feature = "compression"), allow(clippy::unnecessary_wraps))]
2873fn encode_page_v1(page: &Page) -> Result<Page> {
2874    #[cfg(feature = "compression")]
2875    {
2876        // Stack-allocated worst-case-sized scratch. `lz4_flex::block::
2877        // compress_into` requires the destination buffer to be large
2878        // enough for the LZ4 worst-case output regardless of the
2879        // input's actual entropy; calling with a tight 4090-byte
2880        // buffer yields `OutputTooSmall` even for highly compressible
2881        // inputs that would produce a tiny output. The worst case
2882        // for a 4092-byte input is bounded by `get_maximum_output_size`,
2883        // which the lz4_flex API publishes for exactly this purpose.
2884        // The stack array (a few KiB) is allocated each call but
2885        // never escapes — no heap on the write hot path (Rule 3).
2886        let max_out = lz4_flex::block::get_maximum_output_size(V1_BODY_END);
2887        // Guard against an absurd future regression of
2888        // get_maximum_output_size — should always fit in 8 KiB for
2889        // a 4092-byte input.
2890        let mut scratch = [0u8; 8192];
2891        if max_out > scratch.len() {
2892            // Fall through to the uncompressed layout.
2893        } else {
2894            let raw = &page.as_bytes()[..V1_BODY_END];
2895            if let Ok(compressed_len) = lz4_flex::block::compress_into(raw, &mut scratch[..max_out])
2896            {
2897                if compressed_len > 0 && compressed_len <= V1_MAX_COMPRESSED_LEN {
2898                    let mut out = Page::zeroed();
2899                    let buf = out.as_bytes_mut();
2900                    // Length prefix (u16 LE). The `compressed_len <=
2901                    // V1_MAX_COMPRESSED_LEN` guard above already bounds
2902                    // this below `u16::MAX`. #62: rather than mask a
2903                    // future invariant break with `unwrap_or(0)` (which
2904                    // would write a self-corrupting length-0 page),
2905                    // surface the out-of-range case as an error so the
2906                    // corruption never reaches the disk.
2907                    let len_u16 = u16::try_from(compressed_len).map_err(|_| {
2908                        Error::InvalidArgument(
2909                            "encode_page_v1: compressed length exceeds u16 length prefix",
2910                        )
2911                    })?;
2912                    buf[0..2].copy_from_slice(&len_u16.to_le_bytes());
2913                    buf[2..2 + compressed_len].copy_from_slice(&scratch[..compressed_len]);
2914                    // bytes[2 + compressed_len..V1_BODY_END] stay zero
2915                    // from `Page::zeroed`.
2916                    write_page_trailer_v1(&mut out, true);
2917                    return Ok(out);
2918                }
2919            }
2920        }
2921    }
2922    // Either compression is disabled, didn't help, or errored.
2923    // Emit the uncompressed v1 layout.
2924    let mut out = page.clone();
2925    write_page_trailer_v1(&mut out, false);
2926    Ok(out)
2927}
2928
2929/// Phase 3 (issue #8): decode a 4096-byte on-disk page from a
2930/// `format_minor = 1` file into its raw 4092-byte body
2931/// representation (with a v0 trailer re-stamped so downstream
2932/// consumers that call [`page_trailer_valid`] continue to work).
2933///
2934/// Verifies the v1 trailer FIRST (Rule 2: CRC before
2935/// decompress); a CRC mismatch returns
2936/// `Error::Corruption { page_id }` without invoking LZ4. The
2937/// LZ4 decompress is bounded to a fixed 4092-byte output buffer;
2938/// any size mismatch is `Error::Corruption`.
2939///
2940/// Public for the fuzz harness (`obj-fuzz/fuzz_targets/
2941/// page_decode_compressed.rs`). Application code SHOULD reach
2942/// for [`Pager::read_page`] instead.
2943///
2944/// # Errors
2945///
2946/// - [`Error::Corruption`] with the supplied `page_id` if the v1
2947///   trailer's 31-bit CRC does not match the recomputed CRC, if
2948///   the compressed-length prefix is out of range, or if the
2949///   LZ4 output is not exactly 4092 bytes.
2950/// - [`Error::FormatFeatureUnsupported`] (when the `compression`
2951///   Cargo feature is OFF and the trailer's compression flag is
2952///   set — unreachable on a well-formed open path, which refuses
2953///   such files at `Db::open`).
2954pub fn decode_page_v1(disk: &Page, page_id: u64) -> Result<Page> {
2955    if !page_trailer_valid_v1(disk) {
2956        return Err(Error::Corruption { page_id });
2957    }
2958    if !crate::pager::checksum::page_trailer_flag_v1(disk) {
2959        // Uncompressed page. Bytes [0..V1_BODY_END] ARE the raw
2960        // body; copy them into a new Page and re-stamp the v0
2961        // trailer so the rest of the codebase sees a uniform
2962        // 4092-byte-body + v0-CRC representation.
2963        let mut out = Page::zeroed();
2964        out.as_bytes_mut()[..V1_BODY_END].copy_from_slice(&disk.as_bytes()[..V1_BODY_END]);
2965        write_page_trailer(&mut out);
2966        return Ok(out);
2967    }
2968    // Compressed page. Read the length prefix, then decompress
2969    // the LZ4 block into a fixed 4092-byte output.
2970    decode_compressed_page_v1(disk, page_id)
2971}
2972
2973/// Phase 3 (issue #8): decompress a `format_minor = 1` page
2974/// whose flag bit is set. Split out so the gated `compression`
2975/// feature is localised to a single function; without the
2976/// feature the only way to reach this code is a malformed file
2977/// the open-time refusal already rejected, but we keep the
2978/// shape consistent.
2979fn decode_compressed_page_v1(disk: &Page, page_id: u64) -> Result<Page> {
2980    let body = &disk.as_bytes()[..V1_BODY_END];
2981    let mut len_buf = [0u8; 2];
2982    len_buf.copy_from_slice(&body[0..2]);
2983    let compressed_len = usize::from(u16::from_le_bytes(len_buf));
2984    if compressed_len == 0 || compressed_len > V1_MAX_COMPRESSED_LEN {
2985        return Err(Error::Corruption { page_id });
2986    }
2987    #[cfg(feature = "compression")]
2988    {
2989        let input = &body[2..2 + compressed_len];
2990        let mut out = Page::zeroed();
2991        let decompressed = {
2992            let dest = &mut out.as_bytes_mut()[..V1_BODY_END];
2993            lz4_flex::block::decompress_into(input, dest)
2994                .map_err(|_| Error::Corruption { page_id })?
2995        };
2996        // The decompressed body MUST be exactly V1_BODY_END
2997        // bytes. Anything else is corruption (the input claims
2998        // to expand to a page-sized body and didn't).
2999        if decompressed != V1_BODY_END {
3000            return Err(Error::Corruption { page_id });
3001        }
3002        // Re-stamp v0 trailer so downstream `page_trailer_valid`
3003        // checks against the in-memory copy continue to work.
3004        write_page_trailer(&mut out);
3005        Ok(out)
3006    }
3007    #[cfg(not(feature = "compression"))]
3008    {
3009        let _ = compressed_len;
3010        let _ = body;
3011        Err(Error::FormatFeatureUnsupported {
3012            feature: "compression",
3013        })
3014    }
3015}
3016
3017/// Read and decode page 0 from an existing file.
3018fn load_header<F: FileBackend>(handle: &F) -> Result<FileHeader> {
3019    let len = handle.len()?;
3020    if len < PAGE_SIZE as u64 {
3021        return Err(Error::InvalidFormat {
3022            reason: "file is shorter than one page",
3023        });
3024    }
3025    let mut p = Page::zeroed();
3026    handle.read_exact_at(p.as_bytes_mut(), 0)?;
3027    decode_header(&p)
3028}
3029
3030/// Read the WAL generation salt from the main file's `wal_salt`
3031/// field. The first four bytes carry the current generation salt;
3032/// the remaining 12 are reserved (zero in format-major 0). See
3033/// `docs/format.md` § Salt rotation.
3034fn salt_from_header(header: &FileHeader) -> u32 {
3035    u32::from_le_bytes([
3036        header.wal_salt[0],
3037        header.wal_salt[1],
3038        header.wal_salt[2],
3039        header.wal_salt[3],
3040    ])
3041}
3042
3043/// Stamp `salt` into the first four bytes of `header.wal_salt`. The
3044/// remaining 12 bytes are zeroed (format-major 0 reserves them).
3045fn stamp_salt_into_header(header: &mut FileHeader, salt: u32) {
3046    let bytes = salt.to_le_bytes();
3047    header.wal_salt = [0u8; 16];
3048    header.wal_salt[0..4].copy_from_slice(&bytes);
3049}
3050
3051/// Re-encode and write the header to any file backend. Used during
3052/// open when we cannot yet borrow `&mut self` (the pager is still
3053/// being constructed).
3054fn write_header_to_backend<F: FileBackend>(handle: &F, header: &FileHeader) -> Result<()> {
3055    let mut p = Page::zeroed();
3056    encode_header(header, &mut p);
3057    handle.write_all_at(p.as_bytes(), 0)
3058}
3059
3060/// Construct the WAL sidecar path for a given main-file path. We
3061/// append `-wal` to the file name (mirroring `SQLite`'s convention);
3062/// the WAL lives next to the main file.
3063///
3064/// Exposed for integration tests and tooling that need to inspect or
3065/// manipulate the sidecar directly (e.g. the crash-cycle fault
3066/// harness and the eventual verifier CLI).
3067#[must_use]
3068pub fn wal_path_for(main: &Path) -> PathBuf {
3069    let mut buf = main.as_os_str().to_os_string();
3070    buf.push("-wal");
3071    PathBuf::from(buf)
3072}
3073
3074/// Construct the cross-process lock sidecar path for a given
3075/// main-file path. We append `-lock` to the file name (mirroring
3076/// the `<db>-wal` sidecar convention); the lock file lives next
3077/// to the main DB and is the byte-range target for `WRITER_LOCK`
3078/// / `READER_LOCK_RANGE` (see `platform::lock`).
3079///
3080/// Issue #1: keeping the lock byte in a dedicated file prevents
3081/// pager I/O on the main DB from ever overlapping the locked
3082/// byte range. This matters on Windows because `LockFileEx`
3083/// produces mandatory locks; once the main DB grew past the
3084/// old past-EOF anchor at `0x4000_0000`, page writes that
3085/// crossed that byte failed with `ERROR_LOCK_VIOLATION`. With
3086/// the sidecar, the lock handle and the pager handle target
3087/// different files, so the failure mode cannot recur.
3088#[must_use]
3089pub fn lock_path_for(main: &Path) -> PathBuf {
3090    let mut buf = main.as_os_str().to_os_string();
3091    buf.push("-lock");
3092    PathBuf::from(buf)
3093}
3094
3095#[cfg(test)]
3096mod tests;
3097
3098#[cfg(any(test, feature = "fault-injection"))]
3099#[cfg(test)]
3100mod tests_fault;