Skip to main content

obj_core/wal/
mod.rs

1//! Write-ahead log (L2).
2//!
3//! The WAL is the durability layer that sits between the pager and the
4//! main file. Writes go to an append-only sidecar (`<main>-wal`) first;
5//! a checkpoint (M3 issue #16) later rolls them into the main file.
6//! Recovery / replay on open is implemented by
7//! [`Wal::open_for_recovery`] (M3 issue #15).
8//!
9//! See `docs/format.md` § Write-ahead log for the byte layout this
10//! module is the reference implementation of, and § Recovery semantics
11//! for the algorithm `open_for_recovery` enacts.
12//!
13//! # Power-of-ten posture
14//!
15//! - **Rule 2.** Every loop in this module is bounded — either by a
16//!   `Vec`'s length (txn buffer) or by the WAL file's frame-count
17//!   limit (recovery, added in #15).
18//! - **Rule 5.** Per-frame `salt`, per-frame `crc32c`, commit-marker
19//!   pivot, and the file-level magic are layered defenses against
20//!   torn writes and stale generations. Every decision is driven by
21//!   an explicit invariant check, not an implicit cast.
22//! - **Rule 7.** No `unwrap` / `expect` in production code paths.
23//! - **Rule 8.** All file I/O goes through [`crate::platform`]; this
24//!   module is `#![forbid(unsafe_code)]`.
25
26#![forbid(unsafe_code)]
27
28pub mod frame;
29
30use std::collections::HashMap;
31use std::path::{Path, PathBuf};
32
33use rand::RngCore;
34use serde::{Deserialize, Serialize};
35
36use crate::error::{Error, Result};
37use crate::pager::page::{Page, PageId, PAGE_SIZE};
38use crate::platform::{remove_file_if_exists, FileBackend, FileHandle, SyncMode};
39use crate::wal::frame::{
40    decode_frame_header_classified, encode_frame_header, frame_size_for, FrameDecode, FrameHeader,
41    FRAME_HEADER_SIZE, FRAME_SIZE, WAL_HEADER_SIZE, WAL_MAGIC,
42};
43#[cfg(feature = "encryption")]
44use crate::wal::frame::{FRAME_AEAD_SUFFIX_SIZE, FRAME_SIZE_ENCRYPTED};
45
/// Log sequence number.
///
/// Monotonically increasing within a single WAL generation. The
/// counter restarts at [`Lsn::ONE`] whenever a fresh WAL is created
/// or the WAL is reset after a checkpoint (the salt rotation
/// disambiguates generations). The sentinel value [`Lsn::ZERO`]
/// represents "no LSN" — returned by
/// [`crate::pager::Pager::commit`] for an empty transaction and by
/// [`crate::pager::Pager::reader_snapshot`] for in-memory pagers
/// that have no WAL.
///
/// `Lsn` is a `#[repr(transparent)]` newtype over `u64` so the
/// type-system rejects implicit confusion with page counts, byte
/// offsets, or page ids (Power-of-Ten Rule 5). The serde encoding is
/// `#[serde(transparent)]` — an `Lsn` round-trips byte-identically to
/// the bare `u64` it wraps, which preserves wire compatibility with
/// any future on-disk record that names it directly.
///
/// `Lsn` deliberately does NOT implement `Add<u64>` / `AddAssign<u64>`
/// or any other arithmetic trait. Step it through the explicit
/// [`Lsn::checked_next`] / [`Lsn::prev_saturating`] helpers so every
/// mutation is auditable.
///
/// The derived `PartialOrd` / `Ord` compare the wrapped counter, so
/// ordering two LSNs of the same generation orders them by age.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
#[serde(transparent)]
pub struct Lsn(u64);
70
71impl Lsn {
72    /// The sentinel "no LSN" value. Returned by
73    /// [`crate::pager::Pager::commit`] when the transaction was empty
74    /// and by [`crate::pager::Pager::reader_snapshot`] on in-memory
75    /// pagers (no WAL exists).
76    pub const ZERO: Self = Self(0);
77
78    /// The LSN handed out for the first frame of a fresh WAL
79    /// generation.
80    pub const ONE: Self = Self(1);
81
82    /// Construct an [`Lsn`] from a raw `u64`. The underlying `u64`
83    /// has no invariants — any value (including `0`) is valid —
84    /// so this is a total function.
85    #[must_use]
86    pub const fn new(raw: u64) -> Self {
87        Self(raw)
88    }
89
90    /// The raw `u64` LSN value. Use this only when crossing into
91    /// hand-rolled byte serialization (see
92    /// [`crate::wal::frame::FrameHeader::lsn`]) or when emitting
93    /// diagnostics; arithmetic should go through the explicit step
94    /// helpers below.
95    #[must_use]
96    pub const fn get(self) -> u64 {
97        self.0
98    }
99
100    /// Monotonic step: return the next LSN, or [`Error::InvalidArgument`]
101    /// on `u64` overflow.
102    ///
103    /// # Errors
104    ///
105    /// Returns [`Error::InvalidArgument`] when the underlying counter
106    /// would wrap past `u64::MAX`. At 10⁶ commits/sec this is ~584 000
107    /// years; the check is defensive (Power-of-Ten Rule 7) and
108    /// extremely cheap.
109    pub fn checked_next(self) -> Result<Self> {
110        self.0
111            .checked_add(1)
112            .map(Self)
113            .ok_or(Error::InvalidArgument("LSN overflow"))
114    }
115
116    /// Predecessor LSN, saturating at [`Lsn::ZERO`].
117    ///
118    /// Used by [`crate::pager::Pager::commit`] / `reader_snapshot`
119    /// to report the LSN of the *last* committed frame as
120    /// `next_lsn - 1`, with the special case `next_lsn == ZERO`
121    /// mapping back to `ZERO` rather than wrapping.
122    #[must_use]
123    pub const fn prev_saturating(self) -> Self {
124        Self(self.0.saturating_sub(1))
125    }
126}
127
/// Default size cap on the WAL file, in bytes. The cap exists so that
/// a runaway "write without ever committing or checkpointing"
/// workload cannot make recovery walk unboundedly many frames
/// (power-of-ten Rule 2).
///
/// 64 MiB / 4160 bytes per plaintext frame ≈ 16 131 frames — the
/// recovery walk length we have to ship a bound for. Encrypted
/// frames are 4200 bytes each, so the bound is slightly lower there.
pub const DEFAULT_WAL_SIZE_LIMIT: u64 = 64 * 1024 * 1024;
136
/// Default automatic-checkpoint threshold, in frames. When the WAL
/// has more than this many frames committed, the pager will call its
/// checkpoint routine inline (M3 issue #16).
///
/// This is the value [`WalConfig::default`] installs as
/// `checkpoint_threshold`.
pub const DEFAULT_CHECKPOINT_THRESHOLD: u64 = 1_000;
141
/// WAL construction options.
///
/// `Copy`, so a config can be handed to a [`Wal`] by value. See the
/// `Default` impl for the stock settings.
#[derive(Debug, Clone, Copy)]
pub struct WalConfig {
    /// Per-commit durability primitive, passed to every `sync_data`
    /// call the WAL issues.
    pub sync_mode: SyncMode,
    /// Maximum WAL file size in bytes. The bound is checked when a
    /// frame is staged; exceeding it returns
    /// `Error::InvalidArgument("wal size limit exceeded")`.
    pub size_limit: u64,
    /// Auto-checkpoint threshold (in frames).
    pub checkpoint_threshold: u64,
}
153
154impl Default for WalConfig {
155    fn default() -> Self {
156        Self {
157            sync_mode: SyncMode::Full,
158            size_limit: DEFAULT_WAL_SIZE_LIMIT,
159            checkpoint_threshold: DEFAULT_CHECKPOINT_THRESHOLD,
160        }
161    }
162}
163
/// Result of walking an on-disk WAL during recovery.
///
/// `view` is the per-page-id last-committed payload, ready to be
/// merged into the pager's in-memory view. `next_lsn` and
/// `end_offset` are the seekpoints the resulting [`Wal`] uses for
/// subsequent appends. `salt` is the WAL generation salt and
/// `committed_frames` counts the committed frames found on disk
/// (torn tail excluded).
///
/// An empty WAL (missing, too short, or from a stale generation) is
/// represented by an empty `view`, no `header`, `next_lsn` of
/// [`Lsn::ONE`], zero `committed_frames`, and an `end_offset` just
/// past the WAL header.
///
/// `header` (M6 #51) carries the page-0 file-header bytes from the
/// most-recent committed frame whose `page_id` was `0`. The pager
/// applies these on adoption so the in-memory header reflects
/// WAL-staged catalog-root updates that the on-disk header at offset
/// 0 does not yet carry (until checkpoint).
#[derive(Debug)]
pub struct Recovered {
    /// Per-page-id, the body of the most-recent committed frame.
    pub view: HashMap<PageId, Page>,
    /// Header page-0 bytes recovered from a WAL frame with
    /// `page_id = 0`, if any.
    pub header: Option<Page>,
    /// LSN that the next [`WalTxn::commit`] will assign.
    pub next_lsn: Lsn,
    /// WAL generation salt (as read from the WAL header on disk).
    pub salt: u32,
    /// Number of committed frames on disk (torn-tail not counted).
    pub committed_frames: u64,
    /// Byte length where the next frame will be appended. Equals the
    /// position just past the last committed frame; torn tail (if
    /// any) sits between this offset and the file length on disk.
    pub end_offset: u64,
}
195
196impl Recovered {
197    /// Consume the [`Recovered`] and return ownership of the per-
198    /// page recovered view. Used by the pager when it adopts the
199    /// recovered state.
200    #[must_use]
201    pub fn into_view(self) -> HashMap<PageId, Page> {
202        self.view
203    }
204}
205
/// Phase 4 (issue #9): newtype wrapper around the derived 32-byte
/// WAL page-encryption key. Manual `Debug` impl redacts the bytes so
/// the key never appears in log output.
///
/// Issue #31: the inner field is [`crate::pager::MasterKeyBytes`], so
/// under the `encryption` feature the WAL's copy of the per-file page
/// key is wiped from memory when the owning [`Wal`] is dropped.
/// `Copy` is derived only on the no-`encryption` build, where the
/// field is a bare `[u8; 32]` and never holds a real key.
///
/// Construct with `WalKey::new` and read with `WalKey::as_bytes`; the
/// inner field is private to this module.
#[cfg_attr(not(feature = "encryption"), derive(Copy))]
#[derive(Clone)]
#[allow(dead_code)] // Field is read only under `feature = "encryption"`.
pub(crate) struct WalKey(crate::pager::MasterKeyBytes);
219
220impl WalKey {
221    #[must_use]
222    #[allow(dead_code)] // Reachable only under `feature = "encryption"`.
223    pub(crate) fn new(bytes: [u8; 32]) -> Self {
224        // Issue #31: wrap so the stored copy zeroizes on drop under
225        // the `encryption` feature. `wrap_master_key` is the
226        // reflexive identity on the no-feature (`[u8; 32]`) build.
227        Self(crate::pager::wrap_master_key(bytes))
228    }
229
230    #[inline]
231    #[allow(dead_code)] // Reachable only under `feature = "encryption"`.
232    pub(crate) fn as_bytes(&self) -> &[u8; 32] {
233        // Deref-coerce through `MasterKeyBytes` to a plain `&[u8; 32]`
234        // for the crypto hot path.
235        let bytes: &[u8; 32] = &self.0;
236        bytes
237    }
238}
239
240impl std::fmt::Debug for WalKey {
241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242        f.write_str("WalKey(<redacted>)")
243    }
244}
245
/// The write-ahead log.
///
/// Owns the on-disk WAL file, the current generation salt, and the LSN
/// counter. The pager talks to a `Wal` via [`Wal::begin_txn`], staging
/// per-page writes in a [`WalTxn`] and then calling [`WalTxn::commit`]
/// to make them durable.
///
/// Generic over `F: FileBackend` (Rule 9: static dispatch on the hot
/// path). Production code uses `Wal<FileHandle>`; the fault-injection
/// harness substitutes `Wal<FaultyFileHandle>` to drive recovery
/// against torn writes, dropped fsyncs, and bit flips.
///
/// Phase 4 (issue #9): when the parent pager opens an
/// encryption-capable file with the right key, the WAL also
/// encrypts each frame body with `XChaCha20-Poly1305`. The frame
/// layout gains a 40-byte suffix (`nonce || tag`), the on-disk
/// per-frame stride becomes 4200 bytes, and the frame's existing
/// CRC32C is computed over (`header_sans_crc` + PLAINTEXT body) —
/// the CRC catches in-memory bit-flips on the post-decryption
/// representation rather than running on attacker-controlled
/// ciphertext.
#[derive(Debug)]
pub struct Wal<F: FileBackend = FileHandle> {
    /// Backend handle for the WAL sidecar file.
    file: F,
    /// Path the WAL was opened at (exposed through [`Wal::path`]).
    path: PathBuf,
    /// Current generation salt; stamped into every frame header and
    /// rotated by [`Wal::reset_after_checkpoint`].
    salt: u32,
    /// LSN the next written frame will carry.
    next_lsn: Lsn,
    /// Byte offset where the next frame will be written.
    end_offset: u64,
    /// Frames-on-disk count (committed; torn-tail not counted). Used
    /// by the pager to decide when to auto-checkpoint.
    committed_frames: u64,
    /// Construction options: sync mode, size cap, checkpoint
    /// threshold.
    config: WalConfig,
    /// Phase 4 (issue #9): per-file page-encryption key, derived
    /// once at open from `HKDF-SHA256(user_key, kdf_salt,
    /// b"obj-page-encryption-v1")` by the pager. `None` =
    /// plaintext WAL (legacy behaviour). The key is the SAME as the
    /// pager's `derived_key` — the design specifically calls out
    /// that the WAL and the main file share one key.
    key: Option<WalKey>,
}
287
/// An in-progress WAL transaction.
///
/// Buffers `(page_id, page_body)` pairs in memory; the actual disk
/// writes happen at [`WalTxn::commit`]. This is how group commit
/// works: many calls to [`WalTxn::append`] amortise one `sync_data`.
#[derive(Debug)]
pub struct WalTxn<'a, F: FileBackend = FileHandle> {
    /// Exclusive borrow of the owning WAL; this is what limits the
    /// WAL to one open transaction at a time.
    wal: &'a mut Wal<F>,
    /// Staged frames in append (FIFO) order; commit writes them to
    /// disk in this same order.
    staged: Vec<(PageId, Page)>,
    /// M6 #51 / #85: per-staged-frame "is this a page-0 file-header
    /// update?" flag, index-aligned with `staged`. On commit a `true`
    /// entry is emitted with `page_id == 0` in the on-disk frame
    /// header; the `staged` tuple carries a stand-in `PageId::new(1)`
    /// because `PageId` cannot represent zero. Carried as a parallel
    /// `Vec<bool>` (not folded into the tuple) so `drain_staged` can
    /// still hand back `staged` verbatim, and allocated once per txn
    /// rather than rebuilt into a `HashSet` per commit.
    is_header: Vec<bool>,
}
308
309impl Wal<FileHandle> {
310    /// Create or truncate the WAL sidecar at `path` to a fresh,
311    /// empty WAL backed by a [`FileHandle`]. Convenience for
312    /// production callers; see [`Wal::create_fresh_with`] when the
313    /// caller already holds a backend instance (e.g. a fault-injection
314    /// harness).
315    ///
316    /// # Errors
317    ///
318    /// Returns [`Error::Io`] on syscall failure.
319    pub fn create_fresh(path: &Path, config: WalConfig) -> Result<Self> {
320        let file = FileHandle::open_or_create(path)?;
321        Self::create_fresh_with(file, path.to_path_buf(), config)
322    }
323
324    /// Walk the on-disk WAL at `path` and produce a [`Recovered`]
325    /// snapshot, opening the WAL with a production [`FileHandle`].
326    ///
327    /// See [`Wal::open_for_recovery_with`] for the documented
328    /// algorithm; see [`Wal::create_fresh`] for the file-handle
329    /// rationale.
330    ///
331    /// # Errors
332    ///
333    /// See [`Wal::open_for_recovery_with`].
334    pub fn open_for_recovery(
335        path: &Path,
336        expected_salt: u32,
337        size_limit: u64,
338    ) -> Result<Recovered> {
339        if !path.exists() {
340            return Ok(empty_recovered(expected_salt));
341        }
342        let file = FileHandle::open_or_create(path)?;
343        Self::open_for_recovery_with(&file, expected_salt, size_limit)
344    }
345}
346
347impl<F: FileBackend> Wal<F> {
348    /// Create or truncate the WAL sidecar at `path` to a fresh,
349    /// empty WAL on top of an already-opened backend `file`. Any
350    /// existing content is overwritten with a new WAL header carrying
351    /// a freshly-sampled generation salt.
352    ///
353    /// # Errors
354    ///
355    /// Returns [`Error::Io`] on syscall failure.
356    pub fn create_fresh_with(file: F, path: PathBuf, config: WalConfig) -> Result<Self> {
357        file.set_len(0)?;
358        let salt = fresh_salt();
359        write_wal_header(&file, salt)?;
360        file.sync_data(config.sync_mode)?;
361        Ok(Self {
362            file,
363            path,
364            salt,
365            next_lsn: Lsn::ONE,
366            end_offset: WAL_HEADER_SIZE as u64,
367            committed_frames: 0,
368            config,
369            key: None,
370        })
371    }
372
373    /// Phase 4 (issue #9): set the WAL's page-encryption key. Called
374    /// by the pager immediately after open / create on
375    /// encryption-capable files. `None` clears the key (no-op for
376    /// callers that already opened a plaintext WAL).
377    ///
378    /// Must be called BEFORE any `append` or recovery — the WAL
379    /// records its frame size at write/read time from
380    /// `self.key.is_some()`, so toggling the key mid-stream would
381    /// produce frames of mixed sizes that recovery cannot walk.
382    pub(crate) fn set_key(&mut self, key: Option<[u8; 32]>) {
383        self.key = key.map(WalKey::new);
384    }
385
386    /// Adopt an already-walked WAL handle. Used by `Pager::open`
387    /// after [`Wal::open_for_recovery`] has returned a [`Recovered`].
388    /// `salt`, `next_lsn`, `committed_frames`, and `end_offset` are
389    /// taken from `recovered`; the caller separately merges
390    /// `recovered.view` into the pager's in-memory state.
391    #[must_use]
392    pub fn from_recovered_meta(
393        file: F,
394        path: PathBuf,
395        salt: u32,
396        next_lsn: Lsn,
397        end_offset: u64,
398        committed_frames: u64,
399        config: WalConfig,
400    ) -> Self {
401        Self {
402            file,
403            path,
404            salt,
405            next_lsn,
406            end_offset,
407            committed_frames,
408            config,
409            key: None,
410        }
411    }
412
413    /// Walk an already-open WAL file and produce a [`Recovered`]
414    /// snapshot.
415    ///
416    /// Algorithm (matches `docs/format.md` § Recovery semantics):
417    ///
418    /// 1. If `path` does not exist, or is shorter than a WAL header,
419    ///    return an empty `Recovered` carrying `expected_salt`.
420    /// 2. Read the WAL header. If magic / format-major / page-size
421    ///    disagree with the build, fail with
422    ///    [`Error::InvalidFormat`].
423    /// 3. If the header's salt does not equal `expected_salt`, the
424    ///    WAL is from a previous generation; return an empty
425    ///    `Recovered`.
426    /// 4. **Pass 1**: scan every aligned frame in the WAL and record
427    ///    the byte offset of the *last* frame whose salt matches and
428    ///    whose CRC validates AND whose commit-marker bit is set.
429    ///    Frames whose CRC fails (or whose salt does not match) in
430    ///    pass 1 are silently skipped — they might be torn-tail noise
431    ///    that precedes a later valid commit marker.
432    /// 5. **Pass 2**: walk frames from offset [`WAL_HEADER_SIZE`] up
433    ///    to (but not past) the last-commit-end offset from pass 1.
434    ///    Any frame in this range whose salt matches MUST have a
435    ///    valid CRC; otherwise return [`Error::WalCorruption`] — the
436    ///    bad frame sits between two intact commit markers and
437    ///    recovery cannot determine if data was lost.
438    /// 6. Salt-mismatched frames inside pass 2's range are skipped
439    ///    (they are stale-generation noise, not corruption). Frames
440    ///    *past* the last commit marker are torn tail and are
441    ///    silently discarded.
442    ///
443    /// # Errors
444    ///
445    /// - [`Error::Io`] on syscall failure.
446    /// - [`Error::InvalidFormat`] when the WAL header is malformed
447    ///   in a way that indicates a config mismatch rather than torn
448    ///   tail.
449    /// - [`Error::WalCorruption`] when a CRC-invalid frame sits
450    ///   before the last committed frame in the current generation.
451    /// - [`Error::InvalidArgument`] if `size_limit` would be
452    ///   exceeded during the walk (a runaway WAL caps recovery).
453    pub fn open_for_recovery_with(
454        file: &F,
455        expected_salt: u32,
456        size_limit: u64,
457    ) -> Result<Recovered> {
458        Self::open_for_recovery_with_key(file, expected_salt, size_limit, None)
459    }
460
461    /// Phase 4 (issue #9): same as
462    /// [`Self::open_for_recovery_with`] but takes an optional
463    /// per-file page-encryption key. On encrypted WALs each frame
464    /// body is decrypted with the supplied key BEFORE the frame's
465    /// CRC32C is validated; the recovery walker therefore needs the
466    /// key at construction. The pager calls this entry point.
467    ///
468    /// # Errors
469    ///
470    /// As [`Self::open_for_recovery_with`], plus
471    /// [`Error::EncryptionKeyInvalid`] when a salt-matching frame
472    /// in the WAL fails Poly1305 verification — the smoking-gun
473    /// wrong-key signal.
474    pub fn open_for_recovery_with_key(
475        file: &F,
476        expected_salt: u32,
477        size_limit: u64,
478        key: Option<[u8; 32]>,
479    ) -> Result<Recovered> {
480        let len = file.len()?;
481        if len < WAL_HEADER_SIZE as u64 {
482            return Ok(empty_recovered(expected_salt));
483        }
484        let header_salt = read_wal_header(file)?;
485        if header_salt != expected_salt {
486            // Stale WAL from a previous generation.
487            return Ok(empty_recovered(expected_salt));
488        }
489        let key = key.map(WalKey::new);
490        walk_frames(file, header_salt, len, size_limit, key.as_ref())
491    }
492
493    /// Path the WAL was opened at. Used by the pager to remove the
494    /// sidecar on clean shutdown.
495    #[must_use]
496    pub fn path(&self) -> &Path {
497        &self.path
498    }
499
500    /// Phase 4 (issue #9): on-disk per-frame stride in bytes. Equal
501    /// to [`FRAME_SIZE`] (4160) on plaintext WALs, [`FRAME_SIZE_ENCRYPTED`]
502    /// (4200) on encrypted ones. Read at every site that walks the
503    /// WAL — the constant `FRAME_SIZE` is no longer authoritative
504    /// across all builds.
505    #[must_use]
506    fn frame_size_bytes(&self) -> usize {
507        frame_size_for(self.key.is_some())
508    }
509
510    /// Current WAL generation salt.
511    #[must_use]
512    pub fn salt(&self) -> u32 {
513        self.salt
514    }
515
516    /// LSN the next appended frame will carry.
517    #[must_use]
518    pub fn next_lsn(&self) -> Lsn {
519        self.next_lsn
520    }
521
522    /// Frames currently on disk (committed; torn-tail not counted).
523    #[must_use]
524    pub fn committed_frames(&self) -> u64 {
525        self.committed_frames
526    }
527
528    /// Configured auto-checkpoint threshold.
529    #[must_use]
530    pub fn checkpoint_threshold(&self) -> u64 {
531        self.config.checkpoint_threshold
532    }
533
534    /// Begin a new transaction. The returned [`WalTxn`] holds a
535    /// mutable borrow of the WAL; only one transaction can be open at
536    /// a time.
537    pub fn begin_txn(&mut self) -> WalTxn<'_, F> {
538        WalTxn {
539            wal: self,
540            staged: Vec::new(),
541            is_header: Vec::new(),
542        }
543    }
544
545    /// Reset the WAL after a successful checkpoint: rotate the salt,
546    /// write the new header, fsync, and truncate to header-only.
547    ///
548    /// # Errors
549    ///
550    /// Returns [`Error::Io`] on syscall failure.
551    pub fn reset_after_checkpoint(&mut self) -> Result<()> {
552        let new_salt = next_salt(self.salt);
553        write_wal_header(&self.file, new_salt)?;
554        self.file.sync_data(self.config.sync_mode)?;
555        self.file.set_len(WAL_HEADER_SIZE as u64)?;
556        self.file.sync_data(self.config.sync_mode)?;
557        self.salt = new_salt;
558        self.next_lsn = Lsn::ONE;
559        self.end_offset = WAL_HEADER_SIZE as u64;
560        self.committed_frames = 0;
561        Ok(())
562    }
563}
564
565impl<F: FileBackend> WalTxn<'_, F> {
566    /// Append `(page_id, page)` to the transaction. The frame is held
567    /// in memory until [`WalTxn::commit`].
568    ///
569    /// # Errors
570    ///
571    /// Returns [`Error::InvalidArgument`] if the resulting WAL size
572    /// would exceed `Config::wal_size_limit`.
573    pub fn append(&mut self, page_id: PageId, page: &Page) -> Result<()> {
574        self.append_raw(page_id.get(), page)
575    }
576
577    /// M6 #51: append a file-header (page-0) frame to the
578    /// transaction. The WAL frame carries `page_id = 0`; recovery's
579    /// `WalkState::absorb` routes it into a dedicated header slot.
580    /// Used by [`crate::pager::Pager::commit`] when
581    /// [`crate::pager::Pager::set_root_catalog`] dirtied the
582    /// in-memory header.
583    ///
584    /// # Errors
585    ///
586    /// As [`Self::append`].
587    pub fn append_header(&mut self, page: &Page) -> Result<()> {
588        self.append_raw(0, page)
589    }
590
591    /// Internal: stage a frame with the given raw page-id (zero for
592    /// header updates, non-zero for regular page writes). Centralises
593    /// the size-cap check so both [`Self::append`] and
594    /// [`Self::append_header`] share one bound.
595    fn append_raw(&mut self, page_id: u64, page: &Page) -> Result<()> {
596        let prospective_size = self
597            .wal
598            .end_offset
599            .checked_add(
600                (self
601                    .staged
602                    .len()
603                    .checked_add(1)
604                    .ok_or(Error::InvalidArgument("txn frame count overflow"))?
605                    as u64)
606                    .checked_mul(self.wal.frame_size_bytes() as u64)
607                    .ok_or(Error::InvalidArgument("wal frame offset overflow"))?,
608            )
609            .ok_or(Error::InvalidArgument("wal offset overflow"))?;
610        if prospective_size > self.wal.config.size_limit {
611            return Err(Error::InvalidArgument("wal size limit exceeded"));
612        }
613        // `PageId::new(0)` is `None` — page-0 (header) frames cannot
614        // be represented as a `PageId`. Use a stand-in `PageId::new(1)`
615        // for the staged tuple's first element; the actual page-id
616        // that hits the on-disk frame header is taken from
617        // `(page_id_raw == 0)` further down the commit path.
618        let staged_id = PageId::new(if page_id == 0 { 1 } else { page_id }).ok_or(
619            Error::InvalidArgument("internal: PageId::new returned None on a non-zero input"),
620        )?;
621        self.staged.push((staged_id, page.clone()));
622        // Tag header frames (index-aligned with `staged`) so commit can
623        // emit them with `page_id == 0` on disk.
624        self.is_header.push(page_id == 0);
625        debug_assert_eq!(
626            self.staged.len(),
627            self.is_header.len(),
628            "is_header must stay index-aligned with staged"
629        );
630        Ok(())
631    }
632
633    /// Number of frames currently staged in this transaction.
634    #[must_use]
635    pub fn staged_frame_count(&self) -> usize {
636        self.staged.len()
637    }
638
639    /// Commit the transaction. Writes every staged frame to disk,
640    /// stamps the last one as the commit marker, performs one
641    /// `sync_data(sync_mode)`, and returns the LSN of the last
642    /// frame.
643    ///
644    /// An empty transaction is a no-op and returns the current
645    /// `next_lsn - 1`.
646    ///
647    /// # Errors
648    ///
649    /// Returns [`Error::Io`] on syscall failure.
650    pub fn commit(self) -> Result<Lsn> {
651        if self.staged.is_empty() {
652            return Ok(self.wal.next_lsn.prev_saturating());
653        }
654        let last_index = self.staged.len() - 1;
655        let mut last_lsn: Lsn = Lsn::ZERO;
656        let mut offset = self.wal.end_offset;
657        let bound = self.staged.len();
658        debug_assert_eq!(
659            self.staged.len(),
660            self.is_header.len(),
661            "is_header must stay index-aligned with staged"
662        );
663        // #85: one frame scratch reused across the whole commit loop,
664        // re-stamped fully each iteration by `write_frame`.
665        let mut scratch = [0u8; FRAME_SIZE];
666        for (index, (page_id, page)) in self.staged.iter().enumerate().take(bound) {
667            let lsn = self.wal.next_lsn;
668            self.wal.next_lsn = self.wal.next_lsn.checked_next()?;
669            let is_commit = index == last_index;
670            // M6 #51: header frames are staged with a stand-in
671            // `PageId(1)`; on the wire they MUST carry `page_id == 0`.
672            let wire_page_id = if self.is_header[index] {
673                0
674            } else {
675                page_id.get()
676            };
677            let header = FrameHeader {
678                page_id: wire_page_id,
679                lsn: lsn.get(),
680                salt: self.wal.salt,
681                commit: is_commit,
682            };
683            write_frame(
684                &self.wal.file,
685                offset,
686                &header,
687                page,
688                self.wal.key.as_ref(),
689                &mut scratch,
690            )?;
691            last_lsn = lsn;
692            offset = offset
693                .checked_add(self.wal.frame_size_bytes() as u64)
694                .ok_or(Error::InvalidArgument("wal offset overflow"))?;
695        }
696        // One fsync per commit, regardless of frame count — group
697        // commit. This is the durability boundary the WAL promises
698        // to its caller.
699        self.wal.file.sync_data(self.wal.config.sync_mode)?;
700        self.wal.end_offset = offset;
701        let count_u64 = u64::try_from(self.staged.len())
702            .map_err(|_| Error::InvalidArgument("txn frame count overflow"))?;
703        self.wal.committed_frames = self
704            .wal
705            .committed_frames
706            .checked_add(count_u64)
707            .ok_or(Error::InvalidArgument("committed-frame count overflow"))?;
708        Ok(last_lsn)
709    }
710
711    /// Drain the staged frames into an owned `Vec` so the pager can
712    /// merge them into its in-memory view after a successful commit.
713    /// Called by `WalTxn::commit_returning_view` (see
714    /// `pager::commit`).
715    #[must_use]
716    pub fn drain_staged(self) -> Vec<(PageId, Page)> {
717        self.staged
718    }
719}
720
721// --- internals --------------------------------------------------------
722
723fn empty_recovered(salt: u32) -> Recovered {
724    Recovered {
725        view: HashMap::new(),
726        header: None,
727        next_lsn: Lsn::ONE,
728        salt,
729        committed_frames: 0,
730        end_offset: WAL_HEADER_SIZE as u64,
731    }
732}
733
734fn read_wal_header<F: FileBackend>(file: &F) -> Result<u32> {
735    let mut buf = [0u8; WAL_HEADER_SIZE];
736    file.read_exact_at(&mut buf, 0)?;
737    if buf[0..4] != WAL_MAGIC {
738        return Err(Error::InvalidFormat {
739            reason: "WAL magic does not match",
740        });
741    }
742    let major = u16::from_le_bytes([buf[4], buf[5]]);
743    // Phase 8 (issue #17): accept any major in the reader's
744    // supported set so a v1.0 build can recover a WAL written by
745    // a pre-1.0 (`format_major = 0`) writer when the main file is
746    // the same era. The main file's per-major minor enforcement
747    // still gates the open path.
748    if !crate::pager::header::is_supported_format_major(major) {
749        return Err(Error::InvalidFormat {
750            reason: "WAL format-major does not match",
751        });
752    }
753    // #50: validate the WAL header's format_minor the same way the
754    // main-file open path validates the file header's minor (see
755    // `header::is_supported_minor`, `docs/format.md` § Recovery).
756    // The WAL is a sidecar of the main file and is stamped with the
757    // build's `FORMAT_MINOR` at `write_wal_header`; a WAL whose minor
758    // is not a supported pairing for its major indicates a config /
759    // version mismatch rather than torn tail, so surface it as
760    // `InvalidFormat` before walking any frames.
761    let minor = u16::from_le_bytes([buf[6], buf[7]]);
762    if !crate::pager::header::is_supported_minor(major, minor) {
763        return Err(Error::InvalidFormat {
764            reason: "WAL format-minor is not supported",
765        });
766    }
767    let page_size = u16::from_le_bytes([buf[8], buf[9]]);
768    if usize::from(page_size) != PAGE_SIZE {
769        return Err(Error::InvalidFormat {
770            reason: "WAL page-size does not match this build",
771        });
772    }
773    Ok(u32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]))
774}
775
776/// Two-pass WAL recovery walk.
777///
778/// **Pass 1** finds the byte offset of the last frame in the current
779/// generation that satisfies (salt matches AND CRC valid AND commit
780/// flag set). Frames that fail decoding are silently skipped in pass
781/// 1 — they may be torn tail.
782///
783/// **Pass 2** walks from `WAL_HEADER_SIZE` up to (but not past) the
784/// end of the last-commit frame found in pass 1. Any frame in that
785/// range whose salt matches `salt` MUST have a valid CRC32C; a
786/// mismatch is `Error::WalCorruption`. Salt-mismatched frames inside
787/// the range are silently skipped (treated like stale-generation
788/// noise that happens to sit before a later commit).
789///
790/// If pass 1 finds no commit marker, the WAL contains no recoverable
791/// state and we return an empty `Recovered`. In that case any bad CRC
792/// past the WAL header is treated as torn tail — the standard
793/// pre-2.0 "WAL exists but no transaction ever committed" path.
794fn walk_frames<F: FileBackend>(
795    file: &F,
796    salt: u32,
797    file_len: u64,
798    size_limit: u64,
799    key: Option<&WalKey>,
800) -> Result<Recovered> {
801    let frame_size = frame_size_for(key.is_some());
802    let frame_limit = bounded_frame_limit(size_limit, frame_size);
803    let scan_end = scan_aligned_end(file_len, frame_size);
804    // Phase 4 (issue #9): pass 1 returns the last-commit offset AND
805    // a "wrong-key suspected" flag: a salt-matching frame whose
806    // body failed to decrypt is the smoking gun for an incorrect
807    // master key. We surface that as `EncryptionKeyInvalid` here
808    // BEFORE attempting pass 2, so the caller sees a precise
809    // diagnostic instead of an empty-WAL silent recovery.
810    let scan = find_last_commit_end(file, salt, scan_end, frame_limit, key, frame_size)?;
811    if key.is_some() && scan.salt_match_with_decrypt_failure {
812        return Err(Error::EncryptionKeyInvalid);
813    }
814    if scan.last_commit_end <= WAL_HEADER_SIZE as u64 {
815        // No committed frame in this generation — same as empty WAL.
816        return Ok(empty_recovered(salt));
817    }
818    replay_up_to_commit(
819        file,
820        salt,
821        scan.last_commit_end,
822        frame_limit,
823        key,
824        frame_size,
825    )
826}
827
/// Outcome of pass 1 of the WAL walk (Phase 4, issue #9).
#[derive(Clone, Copy, Debug)]
struct ScanResult {
    /// Byte offset just past the last committed frame found, or
    /// `WAL_HEADER_SIZE` when no committed frame was seen.
    last_commit_end: u64,
    /// `true` if at least one frame had a header salt equal to the
    /// generation salt and yet failed to decrypt — the indicator of a
    /// wrong-key open.
    salt_match_with_decrypt_failure: bool,
}
837
838/// Byte offset just past the last full-frame boundary that fits in
839/// `file_len`. Any bytes after this are torn tail (less than a frame
840/// worth) and never inspected.
841///
842/// Rule 7: `file_len` is OS-supplied and effectively caller-
843/// controlled in a fault-injection harness; saturate the arithmetic
844/// at `u64::MAX` rather than relying on the `payload / FRAME_SIZE`
845/// reduction to bound the final product. The saturation is benign:
846/// the recovery walker's `walked > frame_limit` check is the actual
847/// termination guarantee.
848fn scan_aligned_end(file_len: u64, frame_size: usize) -> u64 {
849    if file_len < WAL_HEADER_SIZE as u64 {
850        return WAL_HEADER_SIZE as u64;
851    }
852    let payload = file_len - WAL_HEADER_SIZE as u64;
853    let aligned_frames = payload / frame_size as u64;
854    aligned_frames
855        .checked_mul(frame_size as u64)
856        .and_then(|product| product.checked_add(WAL_HEADER_SIZE as u64))
857        .unwrap_or(u64::MAX)
858}
859
860/// Pass 1: walk every aligned frame between [`WAL_HEADER_SIZE`] and
861/// `scan_end`. Record the byte offset just past the *last* frame in
862/// the current generation whose salt matches and whose CRC validates
863/// AND whose commit flag is set. Returns `WAL_HEADER_SIZE` if no such
864/// frame exists.
865fn find_last_commit_end<F: FileBackend>(
866    file: &F,
867    salt: u32,
868    scan_end: u64,
869    frame_limit: u64,
870    key: Option<&WalKey>,
871    frame_size: usize,
872) -> Result<ScanResult> {
873    let mut offset = WAL_HEADER_SIZE as u64;
874    let mut last_commit_end = WAL_HEADER_SIZE as u64;
875    let mut salt_match_with_decrypt_failure = false;
876    let mut walked: u64 = 0;
877    // Rule 7: `scan_end` is derived from the OS-supplied file length.
878    // Use checked_add so a `u64::MAX`-saturated `scan_end` (see
879    // `scan_aligned_end`) cannot wrap the loop guard.
880    while let Some(frame_end) = offset.checked_add(frame_size as u64) {
881        if frame_end > scan_end {
882            break;
883        }
884        if walked > frame_limit {
885            return Err(Error::InvalidArgument(
886                "WAL exceeds size limit during recovery",
887            ));
888        }
889        walked = walked.saturating_add(1);
890        // Phase 4 (issue #9): read the physical frame (potentially
891        // 4200 bytes on encrypted WALs), decrypt the body into a
892        // plaintext 4160-byte view, THEN run the existing CRC +
893        // salt + flag validation. A wrong key or tampered ciphertext
894        // surfaces in pass 1 as `FrameDecode::CrcInvalid` /
895        // `FrameDecode::Malformed` — both of which the existing
896        // pass-1 logic treats as torn-tail-ish noise. The actual
897        // hard-fail (`Error::WalCorruption`) only fires in pass 2,
898        // which sits BELOW a known commit marker.
899        let frame = read_plaintext_frame_diag(file, offset, key, frame_size, salt)?;
900        if let FrameDecode::Ok(header) = decode_frame_header_classified(&frame.buf, salt) {
901            // M6 #51: `page_id == 0` frames are header updates and
902            // are valid commit-marker carriers. The original
903            // `PageId::new(header.page_id).is_some()` guard rejected
904            // them as tail; that condition is removed so a txn
905            // whose only commit frame touched the header is still
906            // recovered.
907            if header.commit {
908                last_commit_end = offset
909                    .checked_add(frame_size as u64)
910                    .ok_or(Error::InvalidArgument("wal offset overflow"))?;
911            }
912        } else if frame.salt_matched_but_decrypt_failed {
913            // Phase 4 (issue #9): the frame's header salt matches
914            // our generation salt — i.e. this frame was written by
915            // this generation of the database — yet decryption of
916            // its body failed. That is the smoking gun for a wrong
917            // master key. Record it; `walk_frames` raises
918            // `Error::EncryptionKeyInvalid` if no later frame
919            // overrides this signal.
920            salt_match_with_decrypt_failure = true;
921        }
922        offset = offset
923            .checked_add(frame_size as u64)
924            .ok_or(Error::InvalidArgument("wal offset overflow"))?;
925    }
926    Ok(ScanResult {
927        last_commit_end,
928        salt_match_with_decrypt_failure,
929    })
930}
931
/// Phase 4 (issue #9): a frame as returned by
/// `read_plaintext_frame_diag` — the frame bytes plus a wrong-key
/// diagnostic that pass 1 uses to tell "wrong-key open" apart from
/// "torn tail" / "stale generation".
struct PlaintextFrame {
    /// Frame bytes handed to the header decoder: the on-disk bytes when no
    /// key is in use, otherwise the header followed by the decrypted body
    /// (or by the leading ciphertext bytes if decryption failed).
    buf: Vec<u8>,
    /// `true` when the on-disk header salt equalled the expected salt and
    /// the body nevertheless failed to decrypt.
    salt_matched_but_decrypt_failed: bool,
}
941
942fn read_plaintext_frame_diag<F: FileBackend>(
943    file: &F,
944    offset: u64,
945    key: Option<&WalKey>,
946    frame_size: usize,
947    expected_salt: u32,
948) -> Result<PlaintextFrame> {
949    let raw = read_frame_bytes(file, offset, frame_size)?;
950    let Some(key) = key else {
951        let _ = expected_salt;
952        return Ok(PlaintextFrame {
953            buf: raw,
954            salt_matched_but_decrypt_failed: false,
955        });
956    };
957    #[cfg(feature = "encryption")]
958    {
959        let mut out = vec![0u8; FRAME_SIZE];
960        out[..FRAME_HEADER_SIZE].copy_from_slice(&raw[..FRAME_HEADER_SIZE]);
961        let mut ad = [0u8; 16];
962        ad.copy_from_slice(&raw[..16]);
963        let mut ct = [0u8; PAGE_SIZE + FRAME_AEAD_SUFFIX_SIZE];
964        ct.copy_from_slice(&raw[FRAME_HEADER_SIZE..]);
965        let mut pt = [0u8; PAGE_SIZE];
966        let salt_matched_but_decrypt_failed = if wal_decrypt(key, &ad, &ct, &mut pt).is_ok() {
967            out[FRAME_HEADER_SIZE..].copy_from_slice(&pt);
968            false
969        } else {
970            // The frame header is plaintext — extract the salt it
971            // recorded. If that salt matches the WAL header's
972            // expected_salt, this frame WAS written in the current
973            // generation and the decrypt failure is the smoking gun
974            // for a wrong key. Fall through with the ciphertext in
975            // the body slot so the CRC check downstream treats the
976            // frame as `FrameDecode::CrcInvalid` and skips it.
977            let frame_salt = u32::from_le_bytes([raw[16], raw[17], raw[18], raw[19]]);
978            out[FRAME_HEADER_SIZE..].copy_from_slice(&ct[..PAGE_SIZE]);
979            frame_salt == expected_salt
980        };
981        Ok(PlaintextFrame {
982            buf: out,
983            salt_matched_but_decrypt_failed,
984        })
985    }
986    #[cfg(not(feature = "encryption"))]
987    {
988        let _ = (key, expected_salt);
989        Ok(PlaintextFrame {
990            buf: raw,
991            salt_matched_but_decrypt_failed: false,
992        })
993    }
994}
995
996/// Pass 2: replay frames from the WAL header up to (but not past)
997/// `commit_end`. Frames whose salt matches MUST have a valid CRC;
998/// salt-mismatched frames are skipped (treated like stale-generation
999/// noise that pre-dates the current run). Returns the recovered view
1000/// with the merged committed state.
1001fn replay_up_to_commit<F: FileBackend>(
1002    file: &F,
1003    salt: u32,
1004    commit_end: u64,
1005    frame_limit: u64,
1006    key: Option<&WalKey>,
1007    frame_size: usize,
1008) -> Result<Recovered> {
1009    let mut state = WalkState::new();
1010    let mut walked: u64 = 0;
1011    while state.offset < commit_end {
1012        if walked > frame_limit {
1013            return Err(Error::InvalidArgument(
1014                "WAL exceeds size limit during recovery",
1015            ));
1016        }
1017        walked = walked.saturating_add(1);
1018        // Phase 4 (issue #9): decrypt-then-CRC. Recovery in this
1019        // window must EITHER recover a valid plaintext frame OR
1020        // raise `WalCorruption`. A decrypt failure inside the
1021        // committed window means the WAL is unrecoverable; we
1022        // surface that as `WalCorruption` rather than panic.
1023        let buf = read_plaintext_frame(file, state.offset, key, frame_size)?;
1024        match decode_frame_header_classified(&buf, salt) {
1025            FrameDecode::Ok(header) => {
1026                let mut page = Page::zeroed();
1027                page.as_bytes_mut()
1028                    .copy_from_slice(&buf[FRAME_HEADER_SIZE..]);
1029                state.absorb(header, page, frame_size)?;
1030            }
1031            FrameDecode::CrcInvalid => {
1032                return Err(Error::WalCorruption {
1033                    frame_offset: state.offset,
1034                });
1035            }
1036            FrameDecode::SaltMismatch | FrameDecode::Malformed => {
1037                // Skip: torn-tail-ish noise inside the prefix is
1038                // tolerated. The fact that we sit before the
1039                // last-known commit marker means subsequent frames
1040                // will rebuild the canonical view.
1041            }
1042        }
1043        state.offset = state
1044            .offset
1045            .checked_add(frame_size as u64)
1046            .ok_or(Error::InvalidArgument("wal offset overflow"))?;
1047    }
1048    Ok(state.into_recovered(salt))
1049}
1050
1051struct WalkState {
1052    view: HashMap<PageId, Page>,
1053    pending: HashMap<PageId, Page>,
1054    pending_count: u64,
1055    /// M6 #51: a frame with `page_id == 0` carries an updated page-0
1056    /// file header. Accumulate the most-recent uncommitted one here
1057    /// and promote on commit (alongside the regular `pending` map).
1058    pending_header: Option<Page>,
1059    /// Most-recent COMMITTED page-0 frame body.
1060    view_header: Option<Page>,
1061    offset: u64,
1062    next_lsn: Lsn,
1063    committed_frames: u64,
1064    last_committed_offset: u64,
1065}
1066
1067impl WalkState {
1068    fn new() -> Self {
1069        Self {
1070            view: HashMap::new(),
1071            pending: HashMap::new(),
1072            pending_count: 0,
1073            pending_header: None,
1074            view_header: None,
1075            offset: WAL_HEADER_SIZE as u64,
1076            next_lsn: Lsn::ONE,
1077            committed_frames: 0,
1078            last_committed_offset: WAL_HEADER_SIZE as u64,
1079        }
1080    }
1081
1082    /// Absorb one decoded frame. M6 #51: a frame with `page_id == 0`
1083    /// is a file-header (page-0) update; route it into a dedicated
1084    /// slot. Frames with non-zero `page_id` are regular page writes.
1085    /// Returns `Ok(false)` only on the malformed case where a frame
1086    /// is neither (today: never — kept as a forward-compat hook).
1087    ///
1088    /// Phase 4 (issue #9): `frame_size` is the on-disk per-frame
1089    /// stride (4160 plaintext / 4200 encrypted) so the
1090    /// `last_committed_offset` computation can step the right number
1091    /// of bytes.
1092    fn absorb(&mut self, header: FrameHeader, page: Page, frame_size: usize) -> Result<bool> {
1093        if header.page_id == 0 {
1094            // M6 #51: header (page-0) update.
1095            self.pending_header = Some(page);
1096        } else {
1097            let Some(page_id) = PageId::new(header.page_id) else {
1098                return Ok(false);
1099            };
1100            self.pending.insert(page_id, page);
1101        }
1102        self.pending_count = self
1103            .pending_count
1104            .checked_add(1)
1105            .ok_or(Error::InvalidArgument("pending frame count overflow"))?;
1106        if header.commit {
1107            promote_pending(&mut self.pending, &mut self.view);
1108            if let Some(hp) = self.pending_header.take() {
1109                self.view_header = Some(hp);
1110            }
1111            self.committed_frames = self
1112                .committed_frames
1113                .checked_add(self.pending_count)
1114                .ok_or(Error::InvalidArgument("committed frame count overflow"))?;
1115            self.pending_count = 0;
1116            self.last_committed_offset = self
1117                .offset
1118                .checked_add(frame_size as u64)
1119                .ok_or(Error::InvalidArgument("wal offset overflow"))?;
1120        }
1121        // `header.lsn` is the raw `u64` from the on-disk frame
1122        // header (see `wal::frame::FrameHeader`). Promote it to an
1123        // [`Lsn`] at this boundary and step it monotonically; a
1124        // wrap at `u64::MAX` saturates back to itself, which is
1125        // benign because the recovery walker stops at the last
1126        // committed frame anyway.
1127        self.next_lsn = Lsn::new(header.lsn.saturating_add(1));
1128        Ok(true)
1129    }
1130
1131    fn into_recovered(self, salt: u32) -> Recovered {
1132        Recovered {
1133            view: self.view,
1134            header: self.view_header,
1135            next_lsn: self.next_lsn,
1136            salt,
1137            committed_frames: self.committed_frames,
1138            end_offset: self.last_committed_offset,
1139        }
1140    }
1141}
1142
1143fn promote_pending(pending: &mut HashMap<PageId, Page>, view: &mut HashMap<PageId, Page>) {
1144    for (id, page) in pending.drain() {
1145        view.insert(id, page);
1146    }
1147}
1148
1149fn read_frame_bytes<F: FileBackend>(file: &F, offset: u64, frame_size: usize) -> Result<Vec<u8>> {
1150    let mut buf = vec![0u8; frame_size];
1151    file.read_exact_at(&mut buf, offset)?;
1152    Ok(buf)
1153}
1154
1155/// Phase 4 (issue #9): read the on-disk physical frame at `offset`
1156/// and return its **plaintext** representation (always `FRAME_SIZE`
1157/// = 4160 bytes). On plaintext WALs (`key` is `None`) this is
1158/// exactly the on-disk bytes. On encrypted WALs we read 4200
1159/// bytes, copy the 64-byte plaintext header verbatim, and
1160/// AEAD-decrypt the
1161/// remaining body. A decryption failure surfaces as a plaintext
1162/// buffer carrying the original ciphertext — `decode_frame_header_
1163/// classified` will then return `FrameDecode::CrcInvalid` (the
1164/// caller treats that as torn tail in pass 1 and as
1165/// `Error::WalCorruption` in pass 2).
1166fn read_plaintext_frame<F: FileBackend>(
1167    file: &F,
1168    offset: u64,
1169    key: Option<&WalKey>,
1170    frame_size: usize,
1171) -> Result<Vec<u8>> {
1172    let raw = read_frame_bytes(file, offset, frame_size)?;
1173    let Some(key) = key else {
1174        return Ok(raw);
1175    };
1176    #[cfg(feature = "encryption")]
1177    {
1178        // Build a FRAME_SIZE plaintext view: copy header, decrypt
1179        // body into the body slot. If decryption fails, fall back
1180        // to a buffer whose body slot is the original ciphertext;
1181        // the CRC check will fail downstream and surface as
1182        // `FrameDecode::CrcInvalid` — torn tail in pass 1 or
1183        // `WalCorruption` in pass 2, exactly the right semantics.
1184        let mut out = vec![0u8; FRAME_SIZE];
1185        out[..FRAME_HEADER_SIZE].copy_from_slice(&raw[..FRAME_HEADER_SIZE]);
1186        let mut ad = [0u8; 16];
1187        ad.copy_from_slice(&raw[..16]);
1188        let mut ct = [0u8; PAGE_SIZE + FRAME_AEAD_SUFFIX_SIZE];
1189        ct.copy_from_slice(&raw[FRAME_HEADER_SIZE..]);
1190        let mut pt = [0u8; PAGE_SIZE];
1191        match wal_decrypt(key, &ad, &ct, &mut pt) {
1192            Ok(()) => {
1193                out[FRAME_HEADER_SIZE..].copy_from_slice(&pt);
1194            }
1195            Err(_) => {
1196                // Decrypt failed (wrong key, tampered ciphertext, or
1197                // a salt-mismatched frame from a previous
1198                // generation). Pass through ciphertext into the
1199                // body slot — the CRC will mismatch and recovery
1200                // will treat this as
1201                // FrameDecode::CrcInvalid → WalCorruption (pass 2)
1202                // or torn tail (pass 1). The pager-level error
1203                // matrix ensures that "wrong key" is caught earlier
1204                // by the page-decrypt path; landing here is the
1205                // forensic case (tampered WAL).
1206                out[FRAME_HEADER_SIZE..].copy_from_slice(&ct[..PAGE_SIZE]);
1207            }
1208        }
1209        Ok(out)
1210    }
1211    #[cfg(not(feature = "encryption"))]
1212    {
1213        // Unreachable: `key` is only `Some` under the `encryption`
1214        // feature. Keep the shape consistent.
1215        let _ = key;
1216        Ok(raw)
1217    }
1218}
1219
/// Upper bound on the number of frames a recovery pass may visit.
///
/// Power-of-ten Rule 2: the bound is `size_limit / frame_size + 1`. The
/// `+ 1` covers the case where `size_limit` is exactly on a frame
/// boundary. The increment saturates at `u64::MAX` so an extreme
/// `size_limit` can neither panic (debug) nor wrap the bound to zero
/// (release).
///
/// `frame_size` must be non-zero; a zero stride panics on the division.
fn bounded_frame_limit(size_limit: u64, frame_size: usize) -> u64 {
    let whole_frames = size_limit / frame_size as u64;
    whole_frames.saturating_add(1)
}
1226
1227/// Generate a fresh 32-bit salt from the OS RNG. Used at first WAL
1228/// open and at every checkpoint rotation.
1229fn fresh_salt() -> u32 {
1230    let mut rng = rand::rng();
1231    rng.next_u32()
1232}
1233
1234/// Generate the next-generation salt. Guarantees `next != current`
1235/// even if the OS RNG returns the same value back-to-back (rare but
1236/// theoretically possible with a constant-output mock RNG).
1237fn next_salt(current: u32) -> u32 {
1238    let mut candidate = fresh_salt();
1239    if candidate == current {
1240        candidate = current.wrapping_add(1);
1241    }
1242    candidate
1243}
1244
1245fn write_wal_header<F: FileBackend>(file: &F, salt: u32) -> Result<()> {
1246    let mut buf = [0u8; WAL_HEADER_SIZE];
1247    buf[0..4].copy_from_slice(&WAL_MAGIC);
1248    buf[4..6].copy_from_slice(&crate::pager::header::FORMAT_MAJOR.to_le_bytes());
1249    buf[6..8].copy_from_slice(&crate::pager::header::FORMAT_MINOR.to_le_bytes());
1250    let page_size_u16 =
1251        u16::try_from(PAGE_SIZE).map_err(|_| Error::InvalidArgument("page size > u16"))?;
1252    buf[8..10].copy_from_slice(&page_size_u16.to_le_bytes());
1253    // bytes 10..12 reserved (zero).
1254    buf[12..16].copy_from_slice(&salt.to_le_bytes());
1255    // bytes 16..64 reserved (zero).
1256    file.write_all_at(&buf, 0)
1257}
1258
1259fn write_frame<F: FileBackend>(
1260    file: &F,
1261    offset: u64,
1262    header: &FrameHeader,
1263    page: &Page,
1264    key: Option<&WalKey>,
1265    scratch: &mut [u8],
1266) -> Result<()> {
1267    // Phase 4 (issue #9): on encrypted WALs the on-disk frame is
1268    // `[frame_header][ciphertext_body][nonce][tag]` = 4200 bytes.
1269    // The CRC in the frame header is computed over (header_sans_crc
1270    // + PLAINTEXT body) — so we stamp the header FIRST against the
1271    // plaintext body, then encrypt the body in place, then append
1272    // (nonce, tag) to the output.
1273    //
1274    // `scratch` is a caller-owned `[u8; FRAME_SIZE]` reused across the
1275    // commit loop (#85). Re-stamp it fully every call: the body is
1276    // overwritten from `page.as_bytes()` and `encode_frame_header`
1277    // zeroes the 64-byte header region, so no stale bytes survive.
1278    debug_assert_eq!(
1279        scratch.len(),
1280        FRAME_SIZE,
1281        "frame scratch must be FRAME_SIZE"
1282    );
1283    let frame_buf = scratch;
1284    frame_buf[FRAME_HEADER_SIZE..].copy_from_slice(page.as_bytes());
1285    encode_frame_header(header, frame_buf);
1286    let Some(key) = key else {
1287        // Plaintext WAL — write 4160 bytes and return.
1288        return file.write_all_at(frame_buf, offset);
1289    };
1290    // Encrypt the body. #58: the AEAD associated data is the FRAME
1291    // header's first 16 bytes only — page_id (bytes 0..8) + lsn
1292    // (bytes 8..16). It does NOT bind salt, the commit flag, or the
1293    // CRC (those live at byte offsets 16, 20, and 60 and are not fed
1294    // to the AEAD). The (page_id, lsn) pair is enough to make a
1295    // relocated frame fail decryption; the salt/flags/CRC are
1296    // integrity-protected by the frame CRC32C, not by the AEAD tag.
1297    // See `encrypt_frame_body` for the exact AD slice.
1298    encrypt_frame_body(key, frame_buf, offset, file)
1299}
1300
1301/// Phase 4 (issue #9): encrypt a stamped 4160-byte plaintext frame
1302/// buffer (`[header][plaintext_body]`) into a 4200-byte encrypted
1303/// physical frame (`[header][ciphertext_body][nonce][tag]`) and
1304/// write it to `file` at `offset`.
1305fn encrypt_frame_body<F: FileBackend>(
1306    key: &WalKey,
1307    plain_frame: &[u8],
1308    offset: u64,
1309    file: &F,
1310) -> Result<()> {
1311    debug_assert_eq!(plain_frame.len(), FRAME_SIZE);
1312    #[cfg(feature = "encryption")]
1313    {
1314        let mut out = [0u8; FRAME_SIZE_ENCRYPTED];
1315        // Copy the frame header verbatim (plaintext on disk).
1316        out[..FRAME_HEADER_SIZE].copy_from_slice(&plain_frame[..FRAME_HEADER_SIZE]);
1317        // The AEAD encrypts the body into the `body` slot of `out`,
1318        // then drops nonce + tag at `out[FRAME_HEADER_SIZE +
1319        // PAGE_SIZE..]`. Reuse the page-encryption helper by
1320        // building a fixed-size body buffer.
1321        let mut body_pt = [0u8; PAGE_SIZE];
1322        body_pt.copy_from_slice(&plain_frame[FRAME_HEADER_SIZE..]);
1323        let mut body_phys = [0u8; PAGE_SIZE + FRAME_AEAD_SUFFIX_SIZE];
1324        // AD for the WAL frame: the FRAME header's first 16 bytes —
1325        // (page_id, lsn) — uniquely identifies the frame. Using
1326        // page_id alone is not enough (the WAL may carry multiple
1327        // writes for the same page across LSNs), so we feed both.
1328        let mut ad = [0u8; 16];
1329        ad.copy_from_slice(&plain_frame[..16]);
1330        wal_encrypt(key, &ad, &body_pt, &mut body_phys)?;
1331        out[FRAME_HEADER_SIZE..].copy_from_slice(&body_phys);
1332        file.write_all_at(&out, offset)
1333    }
1334    #[cfg(not(feature = "encryption"))]
1335    {
1336        let _ = (key, plain_frame, offset, file);
1337        // Unreachable: the only way to reach here is for `key` to be
1338        // `Some`, which requires the open path to have constructed a
1339        // `WalKey` — which it only does under the `encryption`
1340        // feature. Spell out an error rather than panic.
1341        Err(Error::FormatFeatureUnsupported {
1342            feature: "encryption",
1343        })
1344    }
1345}
1346
/// Phase 4 (issue #9): XChaCha20-Poly1305 encrypt a `PAGE_SIZE` body into
/// `out`, laid out as `ciphertext || nonce || tag`, with `ad` as the
/// associated data. The 24-byte nonce is drawn fresh from `getrandom` on
/// every call.
///
/// # Errors
///
/// [`Error::Io`] if the random source fails, or
/// [`Error::EncryptionKeyInvalid`] if the AEAD itself reports an error.
#[cfg(feature = "encryption")]
fn wal_encrypt(
    key: &WalKey,
    ad: &[u8; 16],
    plaintext: &[u8; PAGE_SIZE],
    out: &mut [u8; PAGE_SIZE + FRAME_AEAD_SUFFIX_SIZE],
) -> Result<()> {
    use chacha20poly1305::aead::{AeadInPlace, KeyInit};
    use chacha20poly1305::{Key, XChaCha20Poly1305, XNonce};

    let mut nonce_bytes = [0u8; 24];
    if let Err(e) = getrandom::getrandom(&mut nonce_bytes) {
        return Err(Error::Io(std::io::Error::other(format!(
            "getrandom failure: {e}"
        ))));
    }

    // Encrypt in place inside the output buffer's body region.
    let (body, suffix) = out.split_at_mut(PAGE_SIZE);
    body.copy_from_slice(plaintext);
    let cipher = XChaCha20Poly1305::new(Key::from_slice(key.as_bytes()));
    let tag = match cipher.encrypt_in_place_detached(XNonce::from_slice(&nonce_bytes), ad, body) {
        Ok(tag) => tag,
        Err(_) => return Err(Error::EncryptionKeyInvalid),
    };

    // Suffix layout: 24-byte nonce followed by the tag.
    let (nonce_slot, tag_slot) = suffix.split_at_mut(24);
    nonce_slot.copy_from_slice(&nonce_bytes);
    tag_slot.copy_from_slice(&tag);
    Ok(())
}
1375
/// Phase 4 (issue #9): XChaCha20-Poly1305 decrypt a
/// `ciphertext || nonce || tag` buffer into a `PAGE_SIZE` body, verifying
/// the tag against `ad`.
///
/// # Errors
///
/// [`Error::EncryptionKeyInvalid`] if authentication/decryption fails.
#[cfg(feature = "encryption")]
fn wal_decrypt(
    key: &WalKey,
    ad: &[u8; 16],
    ciphertext: &[u8; PAGE_SIZE + FRAME_AEAD_SUFFIX_SIZE],
    out: &mut [u8; PAGE_SIZE],
) -> Result<()> {
    use chacha20poly1305::aead::{AeadInPlace, KeyInit};
    use chacha20poly1305::{Key, Tag, XChaCha20Poly1305, XNonce};

    // Input layout: body, then a 24-byte nonce, then the tag.
    let (body, suffix) = ciphertext.split_at(PAGE_SIZE);
    let (nonce_bytes, tag_bytes) = suffix.split_at(24);

    // Decryption happens in place, so seed the output with the ciphertext.
    out.copy_from_slice(body);
    XChaCha20Poly1305::new(Key::from_slice(key.as_bytes()))
        .decrypt_in_place_detached(
            XNonce::from_slice(nonce_bytes),
            ad,
            out,
            Tag::from_slice(tag_bytes),
        )
        .map_err(|_| Error::EncryptionKeyInvalid)
}
1400
1401/// Remove the WAL file at `path`. Idempotent — missing-file is OK.
1402///
1403/// # Errors
1404///
1405/// Returns [`Error::Io`] on any failure other than `NotFound`.
1406pub fn remove_wal(path: &Path) -> Result<()> {
1407    remove_file_if_exists(path)
1408}
1409
1410#[cfg(test)]
1411mod tests;