Skip to main content

sqlrite/sql/pager/
wal.rs

1//! Write-Ahead Log (WAL) file format.
2//!
3//! Phase 4b introduces the `.sqlrite-wal` sidecar file. Writes don't go to
4//! the main `.sqlrite` file anymore once the WAL is wired in (Phase 4c);
5//! instead they append **frames** to this log, and a periodic checkpoint
6//! (Phase 4d) applies frames back into the main file.
7//!
//! This module is the format layer — header, frame, codec, reader,
//! writer. The `Pager` builds on top of it (e.g. `Pager::open` warms its
//! page cache from the WAL); this module itself only borrows the pager's
//! file-locking helper.
11//!
12//! **On-disk layout**
13//!
14//! ```text
15//!   byte 0..32   WAL header
16//!                   0..8    magic "SQLRWAL\0"
//!                   8..12   format version (u32 LE)
//!                            v1: pre-Phase-11
//!                            v2: Phase 11.2 — adds clock_high_water
//!                                              in bytes 24..32
//!                            v3: Phase 11.9 — frames may carry MVCC
//!                                              log records (page number
//!                                              = u32::MAX marker)
21//!                  12..16   page size     (u32 LE) = 4096
22//!                  16..20   salt          (u32 LE) — random on create,
23//!                                                    re-rolled per checkpoint
24//!                  20..24   checkpoint seq (u32 LE) — bumps per checkpoint
25//!                  24..32   v2: clock_high_water (u64 LE) — last
26//!                            persisted MVCC logical clock value;
27//!                            `crate::mvcc::MvccClock::observe`'d on
28//!                            reopen so timestamps don't reuse values
29//!                            across restarts.
30//!                            v1: reserved / zero (read as 0).
31//!
32//!   byte 32..    sequence of frames, each `FRAME_SIZE` bytes:
33//!                   0..4    page number           (u32 LE)
34//!                   4..8    commit-page-count     (u32 LE)
35//!                             0 = dirty frame (part of an open write)
36//!                            >0 = commit frame; value = page count at commit
37//!                   8..12   salt (u32 LE)         — copied from WAL header,
38//!                                                    detects truncation / file swap
39//!                  12..16   checksum (u32 LE)     — rolling sum over the
40//!                                                    frame header bytes
41//!                                                    [0..12] + the payload
42//!                  16..16+PAGE_SIZE  page bytes
43//! ```
44//!
//! **Format version compatibility.** v1 WALs (written by pre-Phase-11
//! builds) open cleanly: their reserved bytes are zero, which we
//! interpret as `clock_high_water = 0` — exactly what a fresh-from-
//! checkpoint clock would carry. v2 WALs open cleanly as well. The next
//! time the WAL header is rewritten (any checkpoint, including the
//! auto-checkpoint that fires past the frame threshold) it lands on disk
//! at the engine's current format version (v3). There's no "you must
//! upgrade your files" step.
52//!
53//! **Checksum.** A rolling `rotate_left(1) + byte` sum over the
54//! concatenation of the frame's first 12 header bytes (page_num,
55//! commit-page-count, salt) and its PAGE_SIZE body. Catches bit flips
56//! and most multi-byte corruption without pulling in a dep. The 13th
57//! through 16th header bytes (the checksum field itself) are excluded
58//! from the computation, obviously.
59//!
60//! **Torn-write recovery.** On open, the reader walks frames from the
61//! start and verifies each checksum. The first invalid or incomplete
62//! frame marks where the WAL effectively ends; anything past it is
//! treated as if it doesn't exist. Only frames sealed by a commit frame
//! (see `FrameHeader::is_commit`) count as committed; dirty frames after
//! the last commit frame belong to a transaction that never finished and
//! are ignored by reads.
66
67use std::collections::HashMap;
68use std::fs::{File, OpenOptions};
69use std::io::{Read, Seek, SeekFrom, Write};
70use std::path::{Path, PathBuf};
71use std::time::{SystemTime, UNIX_EPOCH};
72
73use crate::error::{Result, SQLRiteError};
74use crate::sql::pager::page::PAGE_SIZE;
75use crate::sql::pager::pager::{AccessMode, acquire_lock};
76
77pub const WAL_HEADER_SIZE: usize = 32;
78pub const WAL_MAGIC: &[u8; 8] = b"SQLRWAL\0";
79/// The version the engine writes today. Phase 11.2 bumped 1 → 2 to
80/// Bumped 2 → 3 in Phase 11.9 to mark "may contain MVCC log-record
81/// frames" — frames whose `page_num` field carries
82/// [`crate::mvcc::MVCC_FRAME_MARKER`] (`u32::MAX`) instead of a
83/// real page number. v1 and v2 readers had no special-case for that
84/// marker and a pre-Phase-11.9 checkpoint would try to flush it to
85/// the main file at offset `u32::MAX * PAGE_SIZE` (way past EOF),
86/// which is why the bump is needed.
87pub const WAL_FORMAT_VERSION: u32 = 3;
88/// Lowest format version we know how to open. v1 had the bytes that
89/// now hold `clock_high_water` reserved-as-zero, which is identical
90/// to "clock has never been persisted" and round-trips cleanly. v2
91/// adds the clock_high_water field but no MVCC frames. v3 adds MVCC
92/// frames; downgrading a v3 WAL into a v2 reader would mis-handle
93/// the MVCC marker page number — but a fresh `truncate` (every
94/// checkpoint) rewrites the header at the engine's current version,
95/// so the cross-version exposure is bounded.
96pub const WAL_FORMAT_VERSION_MIN_SUPPORTED: u32 = 1;
97pub const FRAME_HEADER_SIZE: usize = 16;
98pub const FRAME_SIZE: usize = FRAME_HEADER_SIZE + PAGE_SIZE;
99
/// Parsed WAL header: the fields the engine keeps in memory after open.
///
/// The magic bytes, format version and page size are validated by
/// `read_header` but not stored here. The page size must equal the
/// engine's compile-time `PAGE_SIZE`, and the version only gates which
/// files are accepted. `write_header` always re-emits the current
/// version and page size.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WalHeader {
    /// Random-ish value chosen on create and re-rolled on every
    /// `truncate`; copied into each frame so frames left over from an
    /// earlier generation of the log fail validation.
    pub salt: u32,
    /// Incremented (wrapping) by every `truncate`, i.e. every checkpoint.
    pub checkpoint_seq: u32,
    /// Phase 11.2 — the last MVCC logical-clock value persisted to
    /// this WAL. Rewritten by `truncate` (= every checkpoint) so a
    /// reopen-then-tick can never reuse a timestamp the previous run
    /// already handed out. Always 0 for v1 WALs (the bytes were
    /// reserved-zero before the bump); read back as 0 on any v1 file
    /// that pre-existed this build.
    pub clock_high_water: u64,
}
115
/// Decoded form of the 16-byte header that precedes every page body in
/// the log (the body itself is not part of this type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FrameHeader {
    /// Page the frame carries, or the MVCC marker for log-record frames.
    pub page_num: u32,
    /// Zero for a dirty frame; for a commit frame, the page count at commit.
    pub commit_page_count: u32,
    /// Copy of the WAL header's salt at the time the frame was written.
    pub salt: u32,
    /// Rolling checksum over header bytes 0..12 followed by the page body.
    pub checksum: u32,
}

impl FrameHeader {
    /// Reports whether this frame is a transaction barrier. A commit frame
    /// seals every dirty frame written since the previous commit frame
    /// (or since the WAL header, if there is none).
    pub fn is_commit(&self) -> bool {
        // `commit_page_count` is unsigned, so "positive" is the same test
        // as "non-zero".
        self.commit_page_count > 0
    }
}
133
134pub struct Wal {
135    // File carries a Debug impl; we don't derive on Wal because we don't
136    // want to dump the whole latest_frame map into the default Debug output.
137    file: File,
138    path: PathBuf,
139    header: WalHeader,
140    /// Page → byte offset of the LATEST frame carrying that page's
141    /// content. Offsets point at the start of the 16-byte frame header.
142    /// A reader consults this to resolve a page via the WAL before
143    /// falling back to the main DB file (that's Phase 4c).
144    latest_frame: HashMap<u32, u64>,
145    /// Byte offset just past the last valid commit frame. Anything past
146    /// this is uncommitted and should be ignored by readers. Equals
147    /// `WAL_HEADER_SIZE` when there's nothing committed yet.
148    last_commit_offset: u64,
149    /// Post-commit page count carried in the most recent commit frame.
150    last_commit_page_count: Option<u32>,
151    /// Total valid frames (up to and including `last_commit_offset`).
152    /// Used by the checkpointer in Phase 4d to decide whether to run.
153    frame_count: usize,
154    /// Phase 11.9 — MVCC commit batches recovered from the WAL on
155    /// open, in the order they were committed (oldest first).
156    /// Populated by `replay_frames` from frames carrying
157    /// `page_num = MVCC_FRAME_MARKER`. `Pager::open` exposes the
158    /// vector so the engine can replay them into `MvStore` on
159    /// reopen.
160    recovered_mvcc: Vec<crate::mvcc::MvccCommitBatch>,
161}
162
163impl Wal {
164    /// Creates a fresh WAL file, truncating any existing one. Writes the
165    /// header synchronously so a subsequent `open` sees a valid file even
166    /// if the caller panics before appending any frames. Always takes an
167    /// exclusive lock — create is a write operation by definition.
168    pub fn create(path: &Path) -> Result<Self> {
169        let file = OpenOptions::new()
170            .read(true)
171            .write(true)
172            .create(true)
173            .truncate(true)
174            .open(path)?;
175        acquire_lock(&file, path, AccessMode::ReadWrite)?;
176
177        let salt = random_salt();
178        let header = WalHeader {
179            salt,
180            checkpoint_seq: 0,
181            clock_high_water: 0,
182        };
183        let mut wal = Self {
184            file,
185            path: path.to_path_buf(),
186            header,
187            latest_frame: HashMap::new(),
188            last_commit_offset: WAL_HEADER_SIZE as u64,
189            last_commit_page_count: None,
190            frame_count: 0,
191            recovered_mvcc: Vec::new(),
192        };
193        wal.write_header()?;
194        wal.file.flush()?;
195        wal.file.sync_all()?;
196        Ok(wal)
197    }
198
199    /// Opens an existing WAL file with an exclusive lock (read-write).
200    /// Convenience wrapper around [`Wal::open_with_mode`].
201    pub fn open(path: &Path) -> Result<Self> {
202        Self::open_with_mode(path, AccessMode::ReadWrite)
203    }
204
205    /// Opens an existing WAL file with the given access mode. In
206    /// `ReadOnly` mode the file descriptor is opened read-only and the
207    /// advisory lock is shared — multiple read-only openers may coexist.
208    /// Walks every frame from the start, validates checksums, and builds
209    /// the in-memory `latest_frame` index. A torn or corrupted frame is
210    /// treated as the end of the log — its bytes and anything after stay
211    /// on disk but are ignored by reads.
212    pub fn open_with_mode(path: &Path, mode: AccessMode) -> Result<Self> {
213        let mut file = match mode {
214            AccessMode::ReadWrite => OpenOptions::new().read(true).write(true).open(path)?,
215            AccessMode::ReadOnly => OpenOptions::new().read(true).open(path)?,
216        };
217        acquire_lock(&file, path, mode)?;
218
219        let header = read_header(&mut file)?;
220        let mut wal = Self {
221            file,
222            path: path.to_path_buf(),
223            header,
224            latest_frame: HashMap::new(),
225            last_commit_offset: WAL_HEADER_SIZE as u64,
226            last_commit_page_count: None,
227            frame_count: 0,
228            recovered_mvcc: Vec::new(),
229        };
230        wal.replay_frames()?;
231        Ok(wal)
232    }
233
234    pub fn header(&self) -> WalHeader {
235        self.header
236    }
237
238    pub fn frame_count(&self) -> usize {
239        self.frame_count
240    }
241
242    pub fn last_commit_page_count(&self) -> Option<u32> {
243        self.last_commit_page_count
244    }
245
246    /// Phase 11.2 — the MVCC logical-clock high-water mark persisted
247    /// in this WAL's header. Returns 0 for fresh-from-create WALs and
248    /// for v1 WALs (where the bytes were reserved-zero). The Pager
249    /// (Phase 11.3) seeds the in-memory `MvccClock` from this on open.
250    pub fn clock_high_water(&self) -> u64 {
251        self.header.clock_high_water
252    }
253
254    /// Phase 11.2 — overrides the in-memory clock high-water value.
255    /// `truncate` writes whatever value the WAL is carrying when it
256    /// runs, so callers update this just before checkpoint to persist
257    /// the latest in-memory clock value. The setter rejects a
258    /// non-monotonic update (a value below the existing high-water
259    /// mark) — that would either be a bug in the caller or evidence
260    /// of a corrupted in-memory clock. Same value is a no-op.
261    pub fn set_clock_high_water(&mut self, value: u64) -> Result<()> {
262        if value < self.header.clock_high_water {
263            return Err(SQLRiteError::General(format!(
264                "WAL clock_high_water cannot move backwards: \
265                 attempted {value}, current {}",
266                self.header.clock_high_water
267            )));
268        }
269        self.header.clock_high_water = value;
270        Ok(())
271    }
272
273    /// Bulk-loads every committed page from the WAL into `dest`. Used by
274    /// `Pager::open` to warm a WAL cache so subsequent reads don't have
275    /// to seek back into the WAL file. Uncommitted frames are skipped
276    /// (same rule as `read_page`).
277    pub fn load_committed_into(
278        &mut self,
279        dest: &mut HashMap<u32, Box<[u8; PAGE_SIZE]>>,
280    ) -> Result<()> {
281        // Snapshot the page numbers upfront so we don't hold a borrow of
282        // `self` while calling the mutating `read_page`.
283        let pages: Vec<u32> = self.latest_frame.keys().copied().collect();
284        for page_num in pages {
285            if let Some(body) = self.read_page(page_num)? {
286                dest.insert(page_num, body);
287            }
288        }
289        Ok(())
290    }
291
292    /// Appends a new frame at the current end of file. `commit_page_count`
293    /// of `None` writes a dirty (in-progress) frame; `Some(n)` writes a
294    /// commit frame carrying the post-commit page count. On commit the
295    /// frame is fsync'd; dirty frames are not — torn writes are recovered
296    /// by the checksum check on next open.
297    pub fn append_frame(
298        &mut self,
299        page_num: u32,
300        content: &[u8; PAGE_SIZE],
301        commit_page_count: Option<u32>,
302    ) -> Result<()> {
303        // Build the header in a buffer so we can checksum + write it
304        // atomically alongside the body.
305        let mut header_buf = [0u8; FRAME_HEADER_SIZE];
306        header_buf[0..4].copy_from_slice(&page_num.to_le_bytes());
307        header_buf[4..8].copy_from_slice(&commit_page_count.unwrap_or(0).to_le_bytes());
308        header_buf[8..12].copy_from_slice(&self.header.salt.to_le_bytes());
309        let sum = compute_checksum(&header_buf[0..12], content);
310        header_buf[12..16].copy_from_slice(&sum.to_le_bytes());
311
312        // Frame lands at the current tail.
313        let offset = self.file.seek(SeekFrom::End(0))?;
314        self.file.write_all(&header_buf)?;
315        self.file.write_all(content)?;
316
317        // Commit frames sync; dirty frames are buffered.
318        if commit_page_count.is_some() {
319            self.file.flush()?;
320            self.file.sync_all()?;
321        }
322
323        // Update in-memory state — the latest-frame map always points at the
324        // newest frame, whether dirty or committed. Readers consult the
325        // commit-barrier separately to decide what's visible.
326        self.latest_frame.insert(page_num, offset);
327        if let Some(pc) = commit_page_count {
328            self.last_commit_offset = offset + FRAME_SIZE as u64;
329            self.last_commit_page_count = Some(pc);
330        }
331        self.frame_count += 1;
332        Ok(())
333    }
334
335    /// Reads the most recent committed copy of a page from the WAL, or
336    /// `None` if no committed frame has been written for this page since
337    /// the last checkpoint. Uncommitted (dirty) frames are skipped — a
338    /// reader must only see committed state.
339    pub fn read_page(&mut self, page_num: u32) -> Result<Option<Box<[u8; PAGE_SIZE]>>> {
340        let Some(&offset) = self.latest_frame.get(&page_num) else {
341            return Ok(None);
342        };
343        // If this frame sits past the last commit barrier it's
344        // uncommitted — not visible.
345        if offset + FRAME_SIZE as u64 > self.last_commit_offset {
346            return Ok(None);
347        }
348        let (_hdr, body) = self.read_frame_at(offset)?;
349        Ok(Some(body))
350    }
351
352    /// Truncates the WAL back to just the header and rolls the salt.
353    /// Called by the checkpointer (Phase 4d) once it has applied
354    /// accumulated frames to the main file.
355    pub fn truncate(&mut self) -> Result<()> {
356        self.header.salt = random_salt();
357        self.header.checkpoint_seq = self.header.checkpoint_seq.wrapping_add(1);
358        self.file.set_len(WAL_HEADER_SIZE as u64)?;
359        self.write_header()?;
360        self.file.flush()?;
361        self.file.sync_all()?;
362        self.latest_frame.clear();
363        self.last_commit_offset = WAL_HEADER_SIZE as u64;
364        self.last_commit_page_count = None;
365        self.frame_count = 0;
366        // Phase 11.9 — the recovered MVCC batches were a snapshot
367        // taken at WAL replay time and represent commits the
368        // current process has now seen. Clearing them on truncate
369        // matches the legacy `latest_frame.clear()` policy: the
370        // WAL is now empty on disk, so the in-memory mirror is
371        // empty too. (The engine still holds the *applied* state
372        // in `MvStore`; this vector is only the rebuildable-from-
373        // WAL portion.)
374        self.recovered_mvcc.clear();
375        Ok(())
376    }
377
378    // ---- internal helpers ------------------------------------------------
379
380    fn write_header(&mut self) -> Result<()> {
381        let mut buf = [0u8; WAL_HEADER_SIZE];
382        buf[0..8].copy_from_slice(WAL_MAGIC);
383        buf[8..12].copy_from_slice(&WAL_FORMAT_VERSION.to_le_bytes());
384        buf[12..16].copy_from_slice(&(PAGE_SIZE as u32).to_le_bytes());
385        buf[16..20].copy_from_slice(&self.header.salt.to_le_bytes());
386        buf[20..24].copy_from_slice(&self.header.checkpoint_seq.to_le_bytes());
387        // Phase 11.2: bytes 24..32 carry the MVCC clock high-water
388        // mark (u64 LE). v1 WALs left this as zeros, which v2 reads
389        // as "no timestamps have been issued" — the same value a
390        // newly-created v2 WAL starts at. That's how the v1 → v2
391        // upgrade stays seamless.
392        buf[24..32].copy_from_slice(&self.header.clock_high_water.to_le_bytes());
393        self.file.seek(SeekFrom::Start(0))?;
394        self.file.write_all(&buf)?;
395        Ok(())
396    }
397
398    /// Reads and parses one frame at `offset`. Returns `(header, body)`.
399    /// Errors if the frame is truncated or the checksum fails.
400    fn read_frame_at(&mut self, offset: u64) -> Result<(FrameHeader, Box<[u8; PAGE_SIZE]>)> {
401        self.file.seek(SeekFrom::Start(offset))?;
402        let mut header_buf = [0u8; FRAME_HEADER_SIZE];
403        self.file.read_exact(&mut header_buf)?;
404        let mut body = Box::new([0u8; PAGE_SIZE]);
405        self.file.read_exact(body.as_mut())?;
406
407        let page_num = u32::from_le_bytes(header_buf[0..4].try_into().unwrap());
408        let commit_page_count = u32::from_le_bytes(header_buf[4..8].try_into().unwrap());
409        let salt = u32::from_le_bytes(header_buf[8..12].try_into().unwrap());
410        let stored_checksum = u32::from_le_bytes(header_buf[12..16].try_into().unwrap());
411
412        if salt != self.header.salt {
413            return Err(SQLRiteError::General(format!(
414                "WAL frame at offset {offset}: salt mismatch (expected {:x}, got {:x})",
415                self.header.salt, salt
416            )));
417        }
418        let computed = compute_checksum(&header_buf[0..12], &body);
419        if computed != stored_checksum {
420            return Err(SQLRiteError::General(format!(
421                "WAL frame at offset {offset}: bad checksum (expected {stored_checksum:x}, got {computed:x})"
422            )));
423        }
424
425        Ok((
426            FrameHeader {
427                page_num,
428                commit_page_count,
429                salt,
430                checksum: stored_checksum,
431            },
432            body,
433        ))
434    }
435
436    /// Walks every frame from `WAL_HEADER_SIZE` to end-of-file, validating
437    /// each checksum and building `latest_frame`. A frame with a salt
438    /// mismatch or bad checksum marks the end of the usable log (earlier
439    /// frames are still valid). The last commit frame we successfully
440    /// read defines `last_commit_offset`.
441    ///
442    /// Key invariant: `latest_frame` only holds offsets of *committed*
443    /// frames. Dirty frames belonging to an in-progress (or crashed)
444    /// transaction accumulate in a pending map and are promoted on the
445    /// commit frame that seals them — or discarded if the log ends before
446    /// a commit arrives. Without this, an orphan dirty frame for page N
447    /// would shadow the previous committed frame for page N, erasing it
448    /// from visibility.
449    fn replay_frames(&mut self) -> Result<()> {
450        use crate::mvcc::{MVCC_FRAME_MARKER, MvccCommitBatch};
451
452        let file_len = self.file.seek(SeekFrom::End(0))?;
453        let mut offset = WAL_HEADER_SIZE as u64;
454        let mut pending: HashMap<u32, u64> = HashMap::new();
455        // Phase 11.9 — MVCC batches waiting for the next commit
456        // barrier. They share the legacy page-commit barrier's
457        // fsync, so we promote them at the same `is_commit` point
458        // page frames are promoted.
459        let mut pending_mvcc: Vec<MvccCommitBatch> = Vec::new();
460        while offset + FRAME_SIZE as u64 <= file_len {
461            match self.read_frame_at(offset) {
462                Ok((header, body)) => {
463                    self.frame_count += 1;
464                    if header.page_num == MVCC_FRAME_MARKER {
465                        // MVCC log-record frame. Decode body now —
466                        // if it's corrupt the whole frame falls
467                        // out of the log (treated the same as a
468                        // bad checksum).
469                        match MvccCommitBatch::decode(body.as_ref()) {
470                            Ok(batch) => pending_mvcc.push(batch),
471                            Err(_) => break,
472                        }
473                    } else {
474                        pending.insert(header.page_num, offset);
475                    }
476                    if header.is_commit() {
477                        // Seal: promote all pending page frames
478                        // (including this commit frame itself)
479                        // into latest_frame, plus all pending
480                        // MVCC batches into recovered_mvcc.
481                        for (p, o) in pending.drain() {
482                            self.latest_frame.insert(p, o);
483                        }
484                        self.recovered_mvcc.extend(pending_mvcc.drain(..));
485                        self.last_commit_offset = offset + FRAME_SIZE as u64;
486                        self.last_commit_page_count = Some(header.commit_page_count);
487                    }
488                    offset += FRAME_SIZE as u64;
489                }
490                // A bad frame is the torn-write boundary. Keep everything
491                // before it.
492                Err(_) => break,
493            }
494        }
495        // Anything still in `pending` or `pending_mvcc` belongs to
496        // a transaction that never committed (crash, or a writer
497        // that died mid-append). Drop it — the legacy and the
498        // MVCC writes both fail together, matching the atomicity
499        // contract.
500        Ok(())
501    }
502
503    /// Phase 11.9 — appends an MVCC commit batch as a single WAL
504    /// frame whose `page_num` is set to
505    /// [`MVCC_FRAME_MARKER`](crate::mvcc::MVCC_FRAME_MARKER).
506    ///
507    /// Writes the frame as a "dirty" frame
508    /// (`commit_page_count = None`) so it doesn't seal a
509    /// transaction by itself — it piggybacks on the next legacy
510    /// page-commit frame's fsync. This is the durability boundary
511    /// for `BEGIN CONCURRENT`: the MVCC batch and the legacy page
512    /// updates of the same transaction land in the WAL together
513    /// and either both survive the next fsync or both fall away
514    /// at the torn-write boundary on reopen.
515    ///
516    /// `Connection::commit_concurrent` calls this right before
517    /// `save_database`, so the same fsync covers both.
518    pub fn append_mvcc_batch(&mut self, batch: &crate::mvcc::MvccCommitBatch) -> Result<()> {
519        let body = batch.encode()?;
520        self.append_frame(crate::mvcc::MVCC_FRAME_MARKER, body.as_ref(), None)
521    }
522
523    /// Phase 11.9 — returns the MVCC commit batches recovered from
524    /// the WAL on open, in commit order. Empty if no MVCC frames
525    /// were present (a brand-new WAL, a pre-11.9 v1/v2 WAL, or one
526    /// where the last commit barrier dropped MVCC frames).
527    pub fn recovered_mvcc_commits(&self) -> &[crate::mvcc::MvccCommitBatch] {
528        &self.recovered_mvcc
529    }
530}
531
532impl std::fmt::Debug for Wal {
533    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
534        f.debug_struct("Wal")
535            .field("path", &self.path)
536            .field("salt", &format_args!("{:#x}", self.header.salt))
537            .field("checkpoint_seq", &self.header.checkpoint_seq)
538            .field("frame_count", &self.frame_count)
539            .field("last_commit_page_count", &self.last_commit_page_count)
540            .finish()
541    }
542}
543
544fn read_header(file: &mut File) -> Result<WalHeader> {
545    let mut buf = [0u8; WAL_HEADER_SIZE];
546    file.seek(SeekFrom::Start(0))?;
547    // read_exact on a short file would bubble up as a generic io error —
548    // surface it as "bad magic" instead so the caller gets a consistent
549    // diagnosis regardless of whether the file is short-and-garbage or
550    // long-and-garbage.
551    if file.read_exact(&mut buf).is_err() {
552        return Err(SQLRiteError::General(
553            "file is not a SQLRite WAL (too short / bad magic)".to_string(),
554        ));
555    }
556    if &buf[0..8] != WAL_MAGIC {
557        return Err(SQLRiteError::General(
558            "file is not a SQLRite WAL (bad magic)".to_string(),
559        ));
560    }
561    let version = u32::from_le_bytes(buf[8..12].try_into().unwrap());
562    // Phase 11.2 — accept v1 (clock bytes are reserved-zero) and v2
563    // (clock bytes carry the high-water mark). Anything else is
564    // either a corrupt header or a forward-version we don't
565    // understand; reject with a clean error.
566    if !(WAL_FORMAT_VERSION_MIN_SUPPORTED..=WAL_FORMAT_VERSION).contains(&version) {
567        return Err(SQLRiteError::General(format!(
568            "unsupported WAL format version {version}; this build reads \
569             v{WAL_FORMAT_VERSION_MIN_SUPPORTED}..=v{WAL_FORMAT_VERSION}"
570        )));
571    }
572    let page_size = u32::from_le_bytes(buf[12..16].try_into().unwrap()) as usize;
573    if page_size != PAGE_SIZE {
574        return Err(SQLRiteError::General(format!(
575            "WAL page size {page_size} doesn't match engine's {PAGE_SIZE}"
576        )));
577    }
578    let salt = u32::from_le_bytes(buf[16..20].try_into().unwrap());
579    let checkpoint_seq = u32::from_le_bytes(buf[20..24].try_into().unwrap());
580    // v1 wrote zeros into bytes 24..32; v2 puts the clock high-water
581    // there. Either way, decoding as `u64::from_le_bytes` produces
582    // the right value — 0 for v1, the persisted clock for v2.
583    let clock_high_water = u64::from_le_bytes(buf[24..32].try_into().unwrap());
584    Ok(WalHeader {
585        salt,
586        checkpoint_seq,
587        clock_high_water,
588    })
589}
590
/// Produces a fresh WAL salt from the wall clock.
///
/// Crypto-grade randomness isn't needed: the salt only has to make a
/// post-truncate WAL visibly different from the pre-truncate one, so that
/// stale tail bytes can't collide. Falls back to a fixed constant if the
/// system clock reads earlier than the Unix epoch.
fn random_salt() -> u32 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => {
            let low_nanos = elapsed.as_nanos() as u32;
            let mixed_secs = (elapsed.as_secs() as u32).rotate_left(13);
            low_nanos ^ mixed_secs
        }
        Err(_) => 0xdeadbeef,
    }
}
601
602/// Rolling sum over `(header_bytes ++ body)`. `rotate_left(1)` per byte
603/// makes the checksum order-sensitive, so bit flips AND byte shuffles
604/// are detected.
605fn compute_checksum(header_bytes: &[u8], body: &[u8; PAGE_SIZE]) -> u32 {
606    let mut sum: u32 = 0;
607    for &b in header_bytes {
608        sum = sum.rotate_left(1).wrapping_add(b as u32);
609    }
610    for &b in body.iter() {
611        sum = sum.rotate_left(1).wrapping_add(b as u32);
612    }
613    sum
614}
615
616#[cfg(test)]
617mod tests {
618    use super::*;
619
620    fn tmp_wal(name: &str) -> PathBuf {
621        let mut p = std::env::temp_dir();
622        let pid = std::process::id();
623        let nanos = std::time::SystemTime::now()
624            .duration_since(UNIX_EPOCH)
625            .map(|d| d.as_nanos())
626            .unwrap_or(0);
627        p.push(format!("sqlrite-wal-{pid}-{nanos}-{name}.wal"));
628        p
629    }
630
631    fn page(byte: u8) -> Box<[u8; PAGE_SIZE]> {
632        let mut b = Box::new([0u8; PAGE_SIZE]);
633        for (i, slot) in b.iter_mut().enumerate() {
634            *slot = byte.wrapping_add(i as u8);
635        }
636        b
637    }
638
639    #[test]
640    fn create_then_open_round_trips_an_empty_wal() {
641        let p = tmp_wal("empty");
642        let w = Wal::create(&p).unwrap();
643        assert_eq!(w.frame_count(), 0);
644        assert_eq!(w.last_commit_page_count(), None);
645        let salt = w.header().salt;
646        drop(w);
647
648        let w2 = Wal::open(&p).unwrap();
649        assert_eq!(w2.header().salt, salt);
650        assert_eq!(w2.frame_count(), 0);
651        assert_eq!(w2.last_commit_page_count(), None);
652
653        let _ = std::fs::remove_file(&p);
654    }
655
656    #[test]
657    fn single_commit_frame_round_trips() {
658        let p = tmp_wal("one_frame");
659        let mut w = Wal::create(&p).unwrap();
660        let content = page(0xab);
661        w.append_frame(7, &content, Some(42)).unwrap();
662        assert_eq!(w.frame_count(), 1);
663        assert_eq!(w.last_commit_page_count(), Some(42));
664        drop(w);
665
666        let mut w2 = Wal::open(&p).unwrap();
667        assert_eq!(w2.frame_count(), 1);
668        assert_eq!(w2.last_commit_page_count(), Some(42));
669        let read = w2.read_page(7).unwrap().expect("frame should be visible");
670        assert_eq!(read.as_ref(), content.as_ref());
671        assert!(
672            w2.read_page(99).unwrap().is_none(),
673            "untouched page is None"
674        );
675
676        let _ = std::fs::remove_file(&p);
677    }
678
679    #[test]
680    fn multi_frame_commits_and_latest_wins() {
681        // Three commits to the same page; the latest one should be what
682        // read_page returns.
683        let p = tmp_wal("latest_wins");
684        let mut w = Wal::create(&p).unwrap();
685        w.append_frame(1, &page(1), Some(10)).unwrap();
686        w.append_frame(1, &page(2), Some(10)).unwrap();
687        w.append_frame(1, &page(3), Some(10)).unwrap();
688        w.append_frame(2, &page(9), Some(10)).unwrap();
689        assert_eq!(w.frame_count(), 4);
690        drop(w);
691
692        let mut w2 = Wal::open(&p).unwrap();
693        assert_eq!(w2.read_page(1).unwrap().unwrap().as_ref(), page(3).as_ref());
694        assert_eq!(w2.read_page(2).unwrap().unwrap().as_ref(), page(9).as_ref());
695        let _ = std::fs::remove_file(&p);
696    }
697
698    #[test]
699    fn orphan_dirty_tail_preserves_previous_commit() {
700        // A dirty frame at the tail with no commit frame following it
701        // belongs to a transaction that never sealed. The reader must
702        // fall back to the previous committed frame for that page rather
703        // than treating the page as absent — otherwise a crash mid-write
704        // would erase the page's last durable value.
705        let p = tmp_wal("dirty_tail");
706        let mut w = Wal::create(&p).unwrap();
707        w.append_frame(5, &page(50), Some(10)).unwrap(); // committed V1
708        w.append_frame(5, &page(51), None).unwrap(); // orphan dirty V2
709        drop(w);
710
711        let mut w2 = Wal::open(&p).unwrap();
712        // latest_frame points at the committed offset, NOT the orphan's.
713        // read_page returns V1 — the orphan is invisible.
714        let got = w2
715            .read_page(5)
716            .unwrap()
717            .expect("committed V1 should still be visible");
718        assert_eq!(got.as_ref(), page(50).as_ref());
719        // Both frames are still present on disk; frame_count reflects that.
720        assert_eq!(w2.frame_count(), 2);
721        let _ = std::fs::remove_file(&p);
722    }
723
724    #[test]
725    fn uncommitted_frame_for_untouched_page_returns_none() {
726        // Contrast with the previous test: a dirty frame for a page that
727        // was never committed has no fallback, so read_page returns None.
728        let p = tmp_wal("dirty_only");
729        let mut w = Wal::create(&p).unwrap();
730        w.append_frame(7, &page(70), None).unwrap(); // dirty, no commit
731        drop(w);
732
733        let mut w2 = Wal::open(&p).unwrap();
734        assert_eq!(w2.read_page(7).unwrap(), None);
735        let _ = std::fs::remove_file(&p);
736    }
737
738    #[test]
739    fn truncate_resets_to_empty_and_rolls_salt() {
740        let p = tmp_wal("truncate");
741        let mut w = Wal::create(&p).unwrap();
742        w.append_frame(1, &page(11), Some(5)).unwrap();
743        w.append_frame(2, &page(22), Some(5)).unwrap();
744        let seq_before = w.header().checkpoint_seq;
745        let salt_before = w.header().salt;
746        w.truncate().unwrap();
747        assert_eq!(w.frame_count(), 0);
748        assert_eq!(w.last_commit_page_count(), None);
749        assert_eq!(w.header().checkpoint_seq, seq_before + 1);
750        // Salt is randomly rolled; we can't assert a specific value, but
751        // it should almost never match the previous one.
752        let _ = salt_before; // the SystemTime-based salt can collide in a
753        // theoretical tie; don't assert inequality to avoid flakes.
754
755        // Drop w so its exclusive lock releases before we reopen the same
756        // path for verification.
757        drop(w);
758
759        // After truncate, read_page returns None for pages we previously
760        // wrote — the frames are gone.
761        let mut w2 = Wal::open(&p).unwrap();
762        assert_eq!(w2.frame_count(), 0);
763        assert_eq!(w2.read_page(1).unwrap(), None);
764        assert_eq!(w2.read_page(2).unwrap(), None);
765
766        let _ = std::fs::remove_file(&p);
767    }
768
769    #[test]
770    fn bad_magic_file_is_rejected() {
771        let p = tmp_wal("bad_magic");
772        std::fs::write(&p, b"not a WAL file").unwrap();
773        let err = Wal::open(&p).unwrap_err();
774        assert!(format!("{err}").contains("bad magic"));
775        let _ = std::fs::remove_file(&p);
776    }
777
778    #[test]
779    fn corrupt_frame_body_marks_end_of_log() {
780        // Write two valid commit frames, then flip a byte in the second
781        // frame's body. The reader should accept the first frame and
782        // treat the second as end-of-log.
783        let p = tmp_wal("bit_flip");
784        let mut w = Wal::create(&p).unwrap();
785        w.append_frame(1, &page(0x11), Some(5)).unwrap();
786        w.append_frame(2, &page(0x22), Some(5)).unwrap();
787        drop(w);
788
789        // Flip a byte in the second frame's body. Frame 2's body starts
790        // at offset WAL_HEADER_SIZE + FRAME_SIZE + FRAME_HEADER_SIZE.
791        let body_offset = WAL_HEADER_SIZE + FRAME_SIZE + FRAME_HEADER_SIZE;
792        let mut buf = std::fs::read(&p).unwrap();
793        buf[body_offset] ^= 0xff;
794        std::fs::write(&p, &buf).unwrap();
795
796        let mut w2 = Wal::open(&p).unwrap();
797        // First frame survived.
798        assert_eq!(
799            w2.read_page(1).unwrap().unwrap().as_ref(),
800            page(0x11).as_ref()
801        );
802        // Second frame was truncated out — its content isn't readable.
803        assert_eq!(w2.read_page(2).unwrap(), None);
804        assert_eq!(w2.frame_count(), 1);
805
806        let _ = std::fs::remove_file(&p);
807    }
808
809    // -----------------------------------------------------------------
810    // Phase 11.2 — clock_high_water in the WAL header
811    // -----------------------------------------------------------------
812
813    #[test]
814    fn fresh_wal_starts_clock_at_zero() {
815        let p = tmp_wal("clock_fresh");
816        let w = Wal::create(&p).unwrap();
817        assert_eq!(w.clock_high_water(), 0);
818        drop(w);
819        let w2 = Wal::open(&p).unwrap();
820        assert_eq!(w2.clock_high_water(), 0);
821        let _ = std::fs::remove_file(&p);
822    }
823
824    /// Round-trip: setting the clock and triggering `truncate`
825    /// (= every checkpoint) must persist the high-water mark across a
826    /// reopen. This is the property Phase 11.6 GC relies on — without
827    /// it, two transactions on either side of a reopen could share a
828    /// timestamp and corrupt visibility.
829    #[test]
830    fn clock_high_water_round_trips_through_truncate() {
831        let p = tmp_wal("clock_truncate");
832        let mut w = Wal::create(&p).unwrap();
833        // Append a frame so truncate has something to drop. Doesn't
834        // matter what the body is — we're testing the header path.
835        w.append_frame(1, &page(0xaa), Some(1)).unwrap();
836        w.set_clock_high_water(12_345).unwrap();
837        w.truncate().unwrap();
838        assert_eq!(w.clock_high_water(), 12_345);
839        drop(w);
840
841        let w2 = Wal::open(&p).unwrap();
842        assert_eq!(w2.clock_high_water(), 12_345);
843        let _ = std::fs::remove_file(&p);
844    }
845
846    /// `truncate` rewrites the header. Bump-then-truncate-twice must
847    /// keep advancing the on-disk value as the in-memory clock moves.
848    #[test]
849    fn clock_high_water_is_monotonically_persisted_across_truncates() {
850        let p = tmp_wal("clock_monotonic_persist");
851        let mut w = Wal::create(&p).unwrap();
852        w.set_clock_high_water(100).unwrap();
853        w.truncate().unwrap();
854        w.set_clock_high_water(200).unwrap();
855        w.truncate().unwrap();
856        drop(w);
857
858        let w2 = Wal::open(&p).unwrap();
859        assert_eq!(w2.clock_high_water(), 200);
860        let _ = std::fs::remove_file(&p);
861    }
862
863    /// Setter rejects a value below the current high-water mark with
864    /// a typed error. Same value is accepted as a no-op (test below).
865    /// Same-or-greater is the contract every consumer relies on; a
866    /// silent saturate-to-current would mask a real bug in the caller.
867    #[test]
868    fn set_clock_high_water_rejects_regressions() {
869        let p = tmp_wal("clock_no_regress");
870        let mut w = Wal::create(&p).unwrap();
871        w.set_clock_high_water(500).unwrap();
872        let err = w.set_clock_high_water(499).unwrap_err();
873        let msg = format!("{err}");
874        assert!(
875            msg.contains("backwards") && msg.contains("499") && msg.contains("500"),
876            "expected typed regression error, got: {msg}"
877        );
878        // Idempotent same-value update is fine.
879        w.set_clock_high_water(500).unwrap();
880        assert_eq!(w.clock_high_water(), 500);
881        let _ = std::fs::remove_file(&p);
882    }
883
884    /// Synthetic v1 WAL — exact byte layout the previous engine
885    /// version wrote. Opening it must succeed and report
886    /// `clock_high_water == 0` (the bytes were reserved-zero in v1).
887    /// This is the "graceful upgrade" contract.
888    #[test]
889    fn v1_wal_opens_with_zero_clock() {
890        let p = tmp_wal("v1_compat");
891        // Hand-build a v1 WAL header. Frame body is intentionally
892        // omitted — we're testing header parsing, not frame replay.
893        let mut buf = vec![0u8; WAL_HEADER_SIZE];
894        buf[0..8].copy_from_slice(WAL_MAGIC);
895        buf[8..12].copy_from_slice(&1u32.to_le_bytes()); // version 1
896        buf[12..16].copy_from_slice(&(PAGE_SIZE as u32).to_le_bytes());
897        buf[16..20].copy_from_slice(&0xdead_beef_u32.to_le_bytes()); // salt
898        buf[20..24].copy_from_slice(&7u32.to_le_bytes()); // checkpoint_seq
899        // bytes 24..32 left as zero — v1's reserved bytes.
900        std::fs::write(&p, &buf).unwrap();
901
902        let w = Wal::open(&p).unwrap();
903        assert_eq!(w.header().salt, 0xdead_beef);
904        assert_eq!(w.header().checkpoint_seq, 7);
905        assert_eq!(
906            w.clock_high_water(),
907            0,
908            "v1 reserved bytes must read as clock=0"
909        );
910        let _ = std::fs::remove_file(&p);
911    }
912
913    /// Forward-versions we don't understand must error cleanly rather
914    /// than silently misinterpreting bytes. Picks a version far above
915    /// our `WAL_FORMAT_VERSION` so the test stays valid even after
916    /// future bumps.
917    #[test]
918    fn unknown_future_version_is_rejected() {
919        let p = tmp_wal("unknown_version");
920        let mut buf = vec![0u8; WAL_HEADER_SIZE];
921        buf[0..8].copy_from_slice(WAL_MAGIC);
922        buf[8..12].copy_from_slice(&999u32.to_le_bytes());
923        buf[12..16].copy_from_slice(&(PAGE_SIZE as u32).to_le_bytes());
924        std::fs::write(&p, &buf).unwrap();
925        let err = Wal::open(&p).unwrap_err();
926        let msg = format!("{err}");
927        assert!(
928            msg.contains("unsupported WAL format version") && msg.contains("999"),
929            "unexpected error shape: {msg}"
930        );
931        let _ = std::fs::remove_file(&p);
932    }
933
934    #[test]
935    fn partial_trailing_frame_is_ignored() {
936        // Write one valid frame, then append a half-frame's worth of
937        // random bytes. The reader should stop cleanly at the valid
938        // frame.
939        let p = tmp_wal("partial");
940        let mut w = Wal::create(&p).unwrap();
941        w.append_frame(42, &page(42), Some(1)).unwrap();
942        drop(w);
943        {
944            let mut f = OpenOptions::new().write(true).open(&p).unwrap();
945            f.seek(SeekFrom::End(0)).unwrap();
946            f.write_all(&[0xaa; 2000]).unwrap();
947        }
948        let mut w2 = Wal::open(&p).unwrap();
949        assert_eq!(
950            w2.read_page(42).unwrap().unwrap().as_ref(),
951            page(42).as_ref()
952        );
953        assert_eq!(w2.frame_count(), 1);
954        let _ = std::fs::remove_file(&p);
955    }
956}