sqlrite/sql/pager/wal.rs
1//! Write-Ahead Log (WAL) file format.
2//!
3//! Phase 4b introduces the `.sqlrite-wal` sidecar file. Writes don't go to
4//! the main `.sqlrite` file anymore once the WAL is wired in (Phase 4c);
5//! instead they append **frames** to this log, and a periodic checkpoint
6//! (Phase 4d) applies frames back into the main file.
7//!
8//! This module is the format layer — header, frame, codec, reader,
9//! writer. It doesn't know anything about the `Pager` yet; that wiring is
10//! the next slice.
11//!
12//! **On-disk layout**
13//!
14//! ```text
15//! byte 0..32 WAL header
16//! 0..8 magic "SQLRWAL\0"
17//! 8..12 format version (u32 LE)
//!                      v1: pre-Phase-11
//!                      v2: Phase 11.2 — adds clock_high_water
//!                          in bytes 24..32
//!                      v3: Phase 11.9 — frames may carry MVCC
//!                          log records (page_num = u32::MAX)
21//! 12..16 page size (u32 LE) = 4096
22//! 16..20 salt (u32 LE) — random on create,
23//! re-rolled per checkpoint
24//! 20..24 checkpoint seq (u32 LE) — bumps per checkpoint
//!   24..32             v2+: clock_high_water (u64 LE) — last
26//! persisted MVCC logical clock value;
27//! `crate::mvcc::MvccClock::observe`'d on
28//! reopen so timestamps don't reuse values
29//! across restarts.
30//! v1: reserved / zero (read as 0).
31//!
32//! byte 32.. sequence of frames, each `FRAME_SIZE` bytes:
33//! 0..4 page number (u32 LE)
34//! 4..8 commit-page-count (u32 LE)
35//! 0 = dirty frame (part of an open write)
36//! >0 = commit frame; value = page count at commit
37//! 8..12 salt (u32 LE) — copied from WAL header,
38//! detects truncation / file swap
39//! 12..16 checksum (u32 LE) — rolling sum over the
40//! frame header bytes
41//! [0..12] + the payload
42//! 16..16+PAGE_SIZE page bytes
43//! ```
44//!
//! **Format version compatibility.** v1 WALs (written by pre-Phase-11
//! builds) open cleanly: their reserved bytes are zero, which we
//! interpret as `clock_high_water = 0` — exactly what a fresh-from-
//! checkpoint clock would carry. v2 WALs open unchanged. The next time
//! the WAL is rewritten (any checkpoint, including the auto-checkpoint
//! that fires past the frame threshold) it lands on disk at the current
//! format version, v3. There's no "you must upgrade your files" step.
52//!
53//! **Checksum.** A rolling `rotate_left(1) + byte` sum over the
54//! concatenation of the frame's first 12 header bytes (page_num,
55//! commit-page-count, salt) and its PAGE_SIZE body. Catches bit flips
56//! and most multi-byte corruption without pulling in a dep. The 13th
57//! through 16th header bytes (the checksum field itself) are excluded
58//! from the computation, obviously.
59//!
60//! **Torn-write recovery.** On open, the reader walks frames from the
61//! start and verifies each checksum. The first invalid or incomplete
62//! frame marks where the WAL effectively ends; anything past it is
63//! treated as if it doesn't exist. Callers learn what's committed vs
64//! what's speculative from `Wal::last_commit_offset` / the `is_commit`
65//! flag of each scanned frame.
66
67use std::collections::HashMap;
68use std::fs::{File, OpenOptions};
69use std::io::{Read, Seek, SeekFrom, Write};
70use std::path::{Path, PathBuf};
71use std::time::{SystemTime, UNIX_EPOCH};
72
73use crate::error::{Result, SQLRiteError};
74use crate::sql::pager::page::PAGE_SIZE;
75use crate::sql::pager::pager::{AccessMode, acquire_lock};
76
77pub const WAL_HEADER_SIZE: usize = 32;
78pub const WAL_MAGIC: &[u8; 8] = b"SQLRWAL\0";
79/// The version the engine writes today. Phase 11.2 bumped 1 → 2 to
80/// Bumped 2 → 3 in Phase 11.9 to mark "may contain MVCC log-record
81/// frames" — frames whose `page_num` field carries
82/// [`crate::mvcc::MVCC_FRAME_MARKER`] (`u32::MAX`) instead of a
83/// real page number. v1 and v2 readers had no special-case for that
84/// marker and a pre-Phase-11.9 checkpoint would try to flush it to
85/// the main file at offset `u32::MAX * PAGE_SIZE` (way past EOF),
86/// which is why the bump is needed.
87pub const WAL_FORMAT_VERSION: u32 = 3;
88/// Lowest format version we know how to open. v1 had the bytes that
89/// now hold `clock_high_water` reserved-as-zero, which is identical
90/// to "clock has never been persisted" and round-trips cleanly. v2
91/// adds the clock_high_water field but no MVCC frames. v3 adds MVCC
92/// frames; downgrading a v3 WAL into a v2 reader would mis-handle
93/// the MVCC marker page number — but a fresh `truncate` (every
94/// checkpoint) rewrites the header at the engine's current version,
95/// so the cross-version exposure is bounded.
96pub const WAL_FORMAT_VERSION_MIN_SUPPORTED: u32 = 1;
97pub const FRAME_HEADER_SIZE: usize = 16;
98pub const FRAME_SIZE: usize = FRAME_HEADER_SIZE + PAGE_SIZE;
99
/// In-memory form of the 32-byte WAL file header.
///
/// The magic, format version, and page size are checked when the header
/// is read but are not kept here. The page size must equal the engine's
/// compile-time `PAGE_SIZE` (anything else is rejected at open time); it
/// is persisted on disk only for forward compatibility.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WalHeader {
    /// Random on create and re-rolled on every checkpoint. Every frame
    /// stores a copy, so frames written under an older salt are detectable.
    pub salt: u32,
    /// Bumped (wrapping) on every checkpoint.
    pub checkpoint_seq: u32,
    /// Phase 11.2 — the last MVCC logical-clock value persisted to this
    /// WAL. `truncate` (i.e. every checkpoint) rewrites it, so a
    /// reopen-then-tick can never reuse a timestamp that a previous run
    /// already handed out. Always 0 for v1 WALs, whose bytes 24..32 were
    /// reserved-zero before the version bump.
    pub clock_high_water: u64,
}
115
/// Decoded 16-byte frame header. The page body that follows it on disk
/// is not part of this struct.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FrameHeader {
    /// Page whose content the frame body carries, or the MVCC marker value
    /// for MVCC log-record frames.
    pub page_num: u32,
    /// 0 for a dirty frame; otherwise the database page count after the
    /// commit that this frame seals.
    pub commit_page_count: u32,
    /// Copy of the WAL header's salt taken when the frame was written.
    pub salt: u32,
    /// Checksum as stored on disk (covers header bytes 0..12 plus the body).
    pub checksum: u32,
}

impl FrameHeader {
    /// Returns `true` when this frame is a commit frame — the transaction
    /// barrier. It seals every dirty frame written since the previous
    /// commit frame (or since the WAL header, if there is none).
    pub fn is_commit(&self) -> bool {
        let is_dirty = self.commit_page_count == 0;
        !is_dirty
    }
}
133
134pub struct Wal {
135 // File carries a Debug impl; we don't derive on Wal because we don't
136 // want to dump the whole latest_frame map into the default Debug output.
137 file: File,
138 path: PathBuf,
139 header: WalHeader,
140 /// Page → byte offset of the LATEST frame carrying that page's
141 /// content. Offsets point at the start of the 16-byte frame header.
142 /// A reader consults this to resolve a page via the WAL before
143 /// falling back to the main DB file (that's Phase 4c).
144 latest_frame: HashMap<u32, u64>,
145 /// Byte offset just past the last valid commit frame. Anything past
146 /// this is uncommitted and should be ignored by readers. Equals
147 /// `WAL_HEADER_SIZE` when there's nothing committed yet.
148 last_commit_offset: u64,
149 /// Post-commit page count carried in the most recent commit frame.
150 last_commit_page_count: Option<u32>,
151 /// Total valid frames (up to and including `last_commit_offset`).
152 /// Used by the checkpointer in Phase 4d to decide whether to run.
153 frame_count: usize,
154 /// Phase 11.9 — MVCC commit batches recovered from the WAL on
155 /// open, in the order they were committed (oldest first).
156 /// Populated by `replay_frames` from frames carrying
157 /// `page_num = MVCC_FRAME_MARKER`. `Pager::open` exposes the
158 /// vector so the engine can replay them into `MvStore` on
159 /// reopen.
160 recovered_mvcc: Vec<crate::mvcc::MvccCommitBatch>,
161}
162
163impl Wal {
164 /// Creates a fresh WAL file, truncating any existing one. Writes the
165 /// header synchronously so a subsequent `open` sees a valid file even
166 /// if the caller panics before appending any frames. Always takes an
167 /// exclusive lock — create is a write operation by definition.
168 pub fn create(path: &Path) -> Result<Self> {
169 let file = OpenOptions::new()
170 .read(true)
171 .write(true)
172 .create(true)
173 .truncate(true)
174 .open(path)?;
175 acquire_lock(&file, path, AccessMode::ReadWrite)?;
176
177 let salt = random_salt();
178 let header = WalHeader {
179 salt,
180 checkpoint_seq: 0,
181 clock_high_water: 0,
182 };
183 let mut wal = Self {
184 file,
185 path: path.to_path_buf(),
186 header,
187 latest_frame: HashMap::new(),
188 last_commit_offset: WAL_HEADER_SIZE as u64,
189 last_commit_page_count: None,
190 frame_count: 0,
191 recovered_mvcc: Vec::new(),
192 };
193 wal.write_header()?;
194 wal.file.flush()?;
195 wal.file.sync_all()?;
196 Ok(wal)
197 }
198
199 /// Opens an existing WAL file with an exclusive lock (read-write).
200 /// Convenience wrapper around [`Wal::open_with_mode`].
201 pub fn open(path: &Path) -> Result<Self> {
202 Self::open_with_mode(path, AccessMode::ReadWrite)
203 }
204
205 /// Opens an existing WAL file with the given access mode. In
206 /// `ReadOnly` mode the file descriptor is opened read-only and the
207 /// advisory lock is shared — multiple read-only openers may coexist.
208 /// Walks every frame from the start, validates checksums, and builds
209 /// the in-memory `latest_frame` index. A torn or corrupted frame is
210 /// treated as the end of the log — its bytes and anything after stay
211 /// on disk but are ignored by reads.
212 pub fn open_with_mode(path: &Path, mode: AccessMode) -> Result<Self> {
213 let mut file = match mode {
214 AccessMode::ReadWrite => OpenOptions::new().read(true).write(true).open(path)?,
215 AccessMode::ReadOnly => OpenOptions::new().read(true).open(path)?,
216 };
217 acquire_lock(&file, path, mode)?;
218
219 let header = read_header(&mut file)?;
220 let mut wal = Self {
221 file,
222 path: path.to_path_buf(),
223 header,
224 latest_frame: HashMap::new(),
225 last_commit_offset: WAL_HEADER_SIZE as u64,
226 last_commit_page_count: None,
227 frame_count: 0,
228 recovered_mvcc: Vec::new(),
229 };
230 wal.replay_frames()?;
231 Ok(wal)
232 }
233
234 pub fn header(&self) -> WalHeader {
235 self.header
236 }
237
238 pub fn frame_count(&self) -> usize {
239 self.frame_count
240 }
241
242 pub fn last_commit_page_count(&self) -> Option<u32> {
243 self.last_commit_page_count
244 }
245
246 /// Phase 11.2 — the MVCC logical-clock high-water mark persisted
247 /// in this WAL's header. Returns 0 for fresh-from-create WALs and
248 /// for v1 WALs (where the bytes were reserved-zero). The Pager
249 /// (Phase 11.3) seeds the in-memory `MvccClock` from this on open.
250 pub fn clock_high_water(&self) -> u64 {
251 self.header.clock_high_water
252 }
253
254 /// Phase 11.2 — overrides the in-memory clock high-water value.
255 /// `truncate` writes whatever value the WAL is carrying when it
256 /// runs, so callers update this just before checkpoint to persist
257 /// the latest in-memory clock value. The setter rejects a
258 /// non-monotonic update (a value below the existing high-water
259 /// mark) — that would either be a bug in the caller or evidence
260 /// of a corrupted in-memory clock. Same value is a no-op.
261 pub fn set_clock_high_water(&mut self, value: u64) -> Result<()> {
262 if value < self.header.clock_high_water {
263 return Err(SQLRiteError::General(format!(
264 "WAL clock_high_water cannot move backwards: \
265 attempted {value}, current {}",
266 self.header.clock_high_water
267 )));
268 }
269 self.header.clock_high_water = value;
270 Ok(())
271 }
272
273 /// Bulk-loads every committed page from the WAL into `dest`. Used by
274 /// `Pager::open` to warm a WAL cache so subsequent reads don't have
275 /// to seek back into the WAL file. Uncommitted frames are skipped
276 /// (same rule as `read_page`).
277 pub fn load_committed_into(
278 &mut self,
279 dest: &mut HashMap<u32, Box<[u8; PAGE_SIZE]>>,
280 ) -> Result<()> {
281 // Snapshot the page numbers upfront so we don't hold a borrow of
282 // `self` while calling the mutating `read_page`.
283 let pages: Vec<u32> = self.latest_frame.keys().copied().collect();
284 for page_num in pages {
285 if let Some(body) = self.read_page(page_num)? {
286 dest.insert(page_num, body);
287 }
288 }
289 Ok(())
290 }
291
292 /// Appends a new frame at the current end of file. `commit_page_count`
293 /// of `None` writes a dirty (in-progress) frame; `Some(n)` writes a
294 /// commit frame carrying the post-commit page count. On commit the
295 /// frame is fsync'd; dirty frames are not — torn writes are recovered
296 /// by the checksum check on next open.
297 pub fn append_frame(
298 &mut self,
299 page_num: u32,
300 content: &[u8; PAGE_SIZE],
301 commit_page_count: Option<u32>,
302 ) -> Result<()> {
303 // Build the header in a buffer so we can checksum + write it
304 // atomically alongside the body.
305 let mut header_buf = [0u8; FRAME_HEADER_SIZE];
306 header_buf[0..4].copy_from_slice(&page_num.to_le_bytes());
307 header_buf[4..8].copy_from_slice(&commit_page_count.unwrap_or(0).to_le_bytes());
308 header_buf[8..12].copy_from_slice(&self.header.salt.to_le_bytes());
309 let sum = compute_checksum(&header_buf[0..12], content);
310 header_buf[12..16].copy_from_slice(&sum.to_le_bytes());
311
312 // Frame lands at the current tail.
313 let offset = self.file.seek(SeekFrom::End(0))?;
314 self.file.write_all(&header_buf)?;
315 self.file.write_all(content)?;
316
317 // Commit frames sync; dirty frames are buffered.
318 if commit_page_count.is_some() {
319 self.file.flush()?;
320 self.file.sync_all()?;
321 }
322
323 // Update in-memory state — the latest-frame map always points at the
324 // newest frame, whether dirty or committed. Readers consult the
325 // commit-barrier separately to decide what's visible.
326 self.latest_frame.insert(page_num, offset);
327 if let Some(pc) = commit_page_count {
328 self.last_commit_offset = offset + FRAME_SIZE as u64;
329 self.last_commit_page_count = Some(pc);
330 }
331 self.frame_count += 1;
332 Ok(())
333 }
334
335 /// Reads the most recent committed copy of a page from the WAL, or
336 /// `None` if no committed frame has been written for this page since
337 /// the last checkpoint. Uncommitted (dirty) frames are skipped — a
338 /// reader must only see committed state.
339 pub fn read_page(&mut self, page_num: u32) -> Result<Option<Box<[u8; PAGE_SIZE]>>> {
340 let Some(&offset) = self.latest_frame.get(&page_num) else {
341 return Ok(None);
342 };
343 // If this frame sits past the last commit barrier it's
344 // uncommitted — not visible.
345 if offset + FRAME_SIZE as u64 > self.last_commit_offset {
346 return Ok(None);
347 }
348 let (_hdr, body) = self.read_frame_at(offset)?;
349 Ok(Some(body))
350 }
351
352 /// Truncates the WAL back to just the header and rolls the salt.
353 /// Called by the checkpointer (Phase 4d) once it has applied
354 /// accumulated frames to the main file.
355 pub fn truncate(&mut self) -> Result<()> {
356 self.header.salt = random_salt();
357 self.header.checkpoint_seq = self.header.checkpoint_seq.wrapping_add(1);
358 self.file.set_len(WAL_HEADER_SIZE as u64)?;
359 self.write_header()?;
360 self.file.flush()?;
361 self.file.sync_all()?;
362 self.latest_frame.clear();
363 self.last_commit_offset = WAL_HEADER_SIZE as u64;
364 self.last_commit_page_count = None;
365 self.frame_count = 0;
366 // Phase 11.9 — the recovered MVCC batches were a snapshot
367 // taken at WAL replay time and represent commits the
368 // current process has now seen. Clearing them on truncate
369 // matches the legacy `latest_frame.clear()` policy: the
370 // WAL is now empty on disk, so the in-memory mirror is
371 // empty too. (The engine still holds the *applied* state
372 // in `MvStore`; this vector is only the rebuildable-from-
373 // WAL portion.)
374 self.recovered_mvcc.clear();
375 Ok(())
376 }
377
378 // ---- internal helpers ------------------------------------------------
379
380 fn write_header(&mut self) -> Result<()> {
381 let mut buf = [0u8; WAL_HEADER_SIZE];
382 buf[0..8].copy_from_slice(WAL_MAGIC);
383 buf[8..12].copy_from_slice(&WAL_FORMAT_VERSION.to_le_bytes());
384 buf[12..16].copy_from_slice(&(PAGE_SIZE as u32).to_le_bytes());
385 buf[16..20].copy_from_slice(&self.header.salt.to_le_bytes());
386 buf[20..24].copy_from_slice(&self.header.checkpoint_seq.to_le_bytes());
387 // Phase 11.2: bytes 24..32 carry the MVCC clock high-water
388 // mark (u64 LE). v1 WALs left this as zeros, which v2 reads
389 // as "no timestamps have been issued" — the same value a
390 // newly-created v2 WAL starts at. That's how the v1 → v2
391 // upgrade stays seamless.
392 buf[24..32].copy_from_slice(&self.header.clock_high_water.to_le_bytes());
393 self.file.seek(SeekFrom::Start(0))?;
394 self.file.write_all(&buf)?;
395 Ok(())
396 }
397
398 /// Reads and parses one frame at `offset`. Returns `(header, body)`.
399 /// Errors if the frame is truncated or the checksum fails.
400 fn read_frame_at(&mut self, offset: u64) -> Result<(FrameHeader, Box<[u8; PAGE_SIZE]>)> {
401 self.file.seek(SeekFrom::Start(offset))?;
402 let mut header_buf = [0u8; FRAME_HEADER_SIZE];
403 self.file.read_exact(&mut header_buf)?;
404 let mut body = Box::new([0u8; PAGE_SIZE]);
405 self.file.read_exact(body.as_mut())?;
406
407 let page_num = u32::from_le_bytes(header_buf[0..4].try_into().unwrap());
408 let commit_page_count = u32::from_le_bytes(header_buf[4..8].try_into().unwrap());
409 let salt = u32::from_le_bytes(header_buf[8..12].try_into().unwrap());
410 let stored_checksum = u32::from_le_bytes(header_buf[12..16].try_into().unwrap());
411
412 if salt != self.header.salt {
413 return Err(SQLRiteError::General(format!(
414 "WAL frame at offset {offset}: salt mismatch (expected {:x}, got {:x})",
415 self.header.salt, salt
416 )));
417 }
418 let computed = compute_checksum(&header_buf[0..12], &body);
419 if computed != stored_checksum {
420 return Err(SQLRiteError::General(format!(
421 "WAL frame at offset {offset}: bad checksum (expected {stored_checksum:x}, got {computed:x})"
422 )));
423 }
424
425 Ok((
426 FrameHeader {
427 page_num,
428 commit_page_count,
429 salt,
430 checksum: stored_checksum,
431 },
432 body,
433 ))
434 }
435
436 /// Walks every frame from `WAL_HEADER_SIZE` to end-of-file, validating
437 /// each checksum and building `latest_frame`. A frame with a salt
438 /// mismatch or bad checksum marks the end of the usable log (earlier
439 /// frames are still valid). The last commit frame we successfully
440 /// read defines `last_commit_offset`.
441 ///
442 /// Key invariant: `latest_frame` only holds offsets of *committed*
443 /// frames. Dirty frames belonging to an in-progress (or crashed)
444 /// transaction accumulate in a pending map and are promoted on the
445 /// commit frame that seals them — or discarded if the log ends before
446 /// a commit arrives. Without this, an orphan dirty frame for page N
447 /// would shadow the previous committed frame for page N, erasing it
448 /// from visibility.
449 fn replay_frames(&mut self) -> Result<()> {
450 use crate::mvcc::{MVCC_FRAME_MARKER, MvccCommitBatch};
451
452 let file_len = self.file.seek(SeekFrom::End(0))?;
453 let mut offset = WAL_HEADER_SIZE as u64;
454 let mut pending: HashMap<u32, u64> = HashMap::new();
455 // Phase 11.9 — MVCC batches waiting for the next commit
456 // barrier. They share the legacy page-commit barrier's
457 // fsync, so we promote them at the same `is_commit` point
458 // page frames are promoted.
459 let mut pending_mvcc: Vec<MvccCommitBatch> = Vec::new();
460 while offset + FRAME_SIZE as u64 <= file_len {
461 match self.read_frame_at(offset) {
462 Ok((header, body)) => {
463 self.frame_count += 1;
464 if header.page_num == MVCC_FRAME_MARKER {
465 // MVCC log-record frame. Decode body now —
466 // if it's corrupt the whole frame falls
467 // out of the log (treated the same as a
468 // bad checksum).
469 match MvccCommitBatch::decode(body.as_ref()) {
470 Ok(batch) => pending_mvcc.push(batch),
471 Err(_) => break,
472 }
473 } else {
474 pending.insert(header.page_num, offset);
475 }
476 if header.is_commit() {
477 // Seal: promote all pending page frames
478 // (including this commit frame itself)
479 // into latest_frame, plus all pending
480 // MVCC batches into recovered_mvcc.
481 for (p, o) in pending.drain() {
482 self.latest_frame.insert(p, o);
483 }
484 self.recovered_mvcc.extend(pending_mvcc.drain(..));
485 self.last_commit_offset = offset + FRAME_SIZE as u64;
486 self.last_commit_page_count = Some(header.commit_page_count);
487 }
488 offset += FRAME_SIZE as u64;
489 }
490 // A bad frame is the torn-write boundary. Keep everything
491 // before it.
492 Err(_) => break,
493 }
494 }
495 // Anything still in `pending` or `pending_mvcc` belongs to
496 // a transaction that never committed (crash, or a writer
497 // that died mid-append). Drop it — the legacy and the
498 // MVCC writes both fail together, matching the atomicity
499 // contract.
500 Ok(())
501 }
502
503 /// Phase 11.9 — appends an MVCC commit batch as a single WAL
504 /// frame whose `page_num` is set to
505 /// [`MVCC_FRAME_MARKER`](crate::mvcc::MVCC_FRAME_MARKER).
506 ///
507 /// Writes the frame as a "dirty" frame
508 /// (`commit_page_count = None`) so it doesn't seal a
509 /// transaction by itself — it piggybacks on the next legacy
510 /// page-commit frame's fsync. This is the durability boundary
511 /// for `BEGIN CONCURRENT`: the MVCC batch and the legacy page
512 /// updates of the same transaction land in the WAL together
513 /// and either both survive the next fsync or both fall away
514 /// at the torn-write boundary on reopen.
515 ///
516 /// `Connection::commit_concurrent` calls this right before
517 /// `save_database`, so the same fsync covers both.
518 pub fn append_mvcc_batch(&mut self, batch: &crate::mvcc::MvccCommitBatch) -> Result<()> {
519 let body = batch.encode()?;
520 self.append_frame(crate::mvcc::MVCC_FRAME_MARKER, body.as_ref(), None)
521 }
522
523 /// Phase 11.9 — returns the MVCC commit batches recovered from
524 /// the WAL on open, in commit order. Empty if no MVCC frames
525 /// were present (a brand-new WAL, a pre-11.9 v1/v2 WAL, or one
526 /// where the last commit barrier dropped MVCC frames).
527 pub fn recovered_mvcc_commits(&self) -> &[crate::mvcc::MvccCommitBatch] {
528 &self.recovered_mvcc
529 }
530}
531
532impl std::fmt::Debug for Wal {
533 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
534 f.debug_struct("Wal")
535 .field("path", &self.path)
536 .field("salt", &format_args!("{:#x}", self.header.salt))
537 .field("checkpoint_seq", &self.header.checkpoint_seq)
538 .field("frame_count", &self.frame_count)
539 .field("last_commit_page_count", &self.last_commit_page_count)
540 .finish()
541 }
542}
543
544fn read_header(file: &mut File) -> Result<WalHeader> {
545 let mut buf = [0u8; WAL_HEADER_SIZE];
546 file.seek(SeekFrom::Start(0))?;
547 // read_exact on a short file would bubble up as a generic io error —
548 // surface it as "bad magic" instead so the caller gets a consistent
549 // diagnosis regardless of whether the file is short-and-garbage or
550 // long-and-garbage.
551 if file.read_exact(&mut buf).is_err() {
552 return Err(SQLRiteError::General(
553 "file is not a SQLRite WAL (too short / bad magic)".to_string(),
554 ));
555 }
556 if &buf[0..8] != WAL_MAGIC {
557 return Err(SQLRiteError::General(
558 "file is not a SQLRite WAL (bad magic)".to_string(),
559 ));
560 }
561 let version = u32::from_le_bytes(buf[8..12].try_into().unwrap());
562 // Phase 11.2 — accept v1 (clock bytes are reserved-zero) and v2
563 // (clock bytes carry the high-water mark). Anything else is
564 // either a corrupt header or a forward-version we don't
565 // understand; reject with a clean error.
566 if !(WAL_FORMAT_VERSION_MIN_SUPPORTED..=WAL_FORMAT_VERSION).contains(&version) {
567 return Err(SQLRiteError::General(format!(
568 "unsupported WAL format version {version}; this build reads \
569 v{WAL_FORMAT_VERSION_MIN_SUPPORTED}..=v{WAL_FORMAT_VERSION}"
570 )));
571 }
572 let page_size = u32::from_le_bytes(buf[12..16].try_into().unwrap()) as usize;
573 if page_size != PAGE_SIZE {
574 return Err(SQLRiteError::General(format!(
575 "WAL page size {page_size} doesn't match engine's {PAGE_SIZE}"
576 )));
577 }
578 let salt = u32::from_le_bytes(buf[16..20].try_into().unwrap());
579 let checkpoint_seq = u32::from_le_bytes(buf[20..24].try_into().unwrap());
580 // v1 wrote zeros into bytes 24..32; v2 puts the clock high-water
581 // there. Either way, decoding as `u64::from_le_bytes` produces
582 // the right value — 0 for v1, the persisted clock for v2.
583 let clock_high_water = u64::from_le_bytes(buf[24..32].try_into().unwrap());
584 Ok(WalHeader {
585 salt,
586 checkpoint_seq,
587 clock_high_water,
588 })
589}
590
/// Produces a new WAL salt from the current wall-clock time.
///
/// Crypto-grade randomness isn't needed here. The salt only has to make a
/// post-truncate WAL visibly different from the pre-truncate one, so that
/// stale tail bytes can't collide. If the clock reads earlier than the Unix
/// epoch, a fixed fallback value is returned.
fn random_salt() -> u32 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => {
            let low_nanos = elapsed.as_nanos() as u32;
            let mixed_secs = (elapsed.as_secs() as u32).rotate_left(13);
            low_nanos ^ mixed_secs
        }
        Err(_) => 0xdeadbeef,
    }
}
601
602/// Rolling sum over `(header_bytes ++ body)`. `rotate_left(1)` per byte
603/// makes the checksum order-sensitive, so bit flips AND byte shuffles
604/// are detected.
605fn compute_checksum(header_bytes: &[u8], body: &[u8; PAGE_SIZE]) -> u32 {
606 let mut sum: u32 = 0;
607 for &b in header_bytes {
608 sum = sum.rotate_left(1).wrapping_add(b as u32);
609 }
610 for &b in body.iter() {
611 sum = sum.rotate_left(1).wrapping_add(b as u32);
612 }
613 sum
614}
615
616#[cfg(test)]
617mod tests {
618 use super::*;
619
620 fn tmp_wal(name: &str) -> PathBuf {
621 let mut p = std::env::temp_dir();
622 let pid = std::process::id();
623 let nanos = std::time::SystemTime::now()
624 .duration_since(UNIX_EPOCH)
625 .map(|d| d.as_nanos())
626 .unwrap_or(0);
627 p.push(format!("sqlrite-wal-{pid}-{nanos}-{name}.wal"));
628 p
629 }
630
631 fn page(byte: u8) -> Box<[u8; PAGE_SIZE]> {
632 let mut b = Box::new([0u8; PAGE_SIZE]);
633 for (i, slot) in b.iter_mut().enumerate() {
634 *slot = byte.wrapping_add(i as u8);
635 }
636 b
637 }
638
639 #[test]
640 fn create_then_open_round_trips_an_empty_wal() {
641 let p = tmp_wal("empty");
642 let w = Wal::create(&p).unwrap();
643 assert_eq!(w.frame_count(), 0);
644 assert_eq!(w.last_commit_page_count(), None);
645 let salt = w.header().salt;
646 drop(w);
647
648 let w2 = Wal::open(&p).unwrap();
649 assert_eq!(w2.header().salt, salt);
650 assert_eq!(w2.frame_count(), 0);
651 assert_eq!(w2.last_commit_page_count(), None);
652
653 let _ = std::fs::remove_file(&p);
654 }
655
656 #[test]
657 fn single_commit_frame_round_trips() {
658 let p = tmp_wal("one_frame");
659 let mut w = Wal::create(&p).unwrap();
660 let content = page(0xab);
661 w.append_frame(7, &content, Some(42)).unwrap();
662 assert_eq!(w.frame_count(), 1);
663 assert_eq!(w.last_commit_page_count(), Some(42));
664 drop(w);
665
666 let mut w2 = Wal::open(&p).unwrap();
667 assert_eq!(w2.frame_count(), 1);
668 assert_eq!(w2.last_commit_page_count(), Some(42));
669 let read = w2.read_page(7).unwrap().expect("frame should be visible");
670 assert_eq!(read.as_ref(), content.as_ref());
671 assert!(
672 w2.read_page(99).unwrap().is_none(),
673 "untouched page is None"
674 );
675
676 let _ = std::fs::remove_file(&p);
677 }
678
679 #[test]
680 fn multi_frame_commits_and_latest_wins() {
681 // Three commits to the same page; the latest one should be what
682 // read_page returns.
683 let p = tmp_wal("latest_wins");
684 let mut w = Wal::create(&p).unwrap();
685 w.append_frame(1, &page(1), Some(10)).unwrap();
686 w.append_frame(1, &page(2), Some(10)).unwrap();
687 w.append_frame(1, &page(3), Some(10)).unwrap();
688 w.append_frame(2, &page(9), Some(10)).unwrap();
689 assert_eq!(w.frame_count(), 4);
690 drop(w);
691
692 let mut w2 = Wal::open(&p).unwrap();
693 assert_eq!(w2.read_page(1).unwrap().unwrap().as_ref(), page(3).as_ref());
694 assert_eq!(w2.read_page(2).unwrap().unwrap().as_ref(), page(9).as_ref());
695 let _ = std::fs::remove_file(&p);
696 }
697
698 #[test]
699 fn orphan_dirty_tail_preserves_previous_commit() {
700 // A dirty frame at the tail with no commit frame following it
701 // belongs to a transaction that never sealed. The reader must
702 // fall back to the previous committed frame for that page rather
703 // than treating the page as absent — otherwise a crash mid-write
704 // would erase the page's last durable value.
705 let p = tmp_wal("dirty_tail");
706 let mut w = Wal::create(&p).unwrap();
707 w.append_frame(5, &page(50), Some(10)).unwrap(); // committed V1
708 w.append_frame(5, &page(51), None).unwrap(); // orphan dirty V2
709 drop(w);
710
711 let mut w2 = Wal::open(&p).unwrap();
712 // latest_frame points at the committed offset, NOT the orphan's.
713 // read_page returns V1 — the orphan is invisible.
714 let got = w2
715 .read_page(5)
716 .unwrap()
717 .expect("committed V1 should still be visible");
718 assert_eq!(got.as_ref(), page(50).as_ref());
719 // Both frames are still present on disk; frame_count reflects that.
720 assert_eq!(w2.frame_count(), 2);
721 let _ = std::fs::remove_file(&p);
722 }
723
724 #[test]
725 fn uncommitted_frame_for_untouched_page_returns_none() {
726 // Contrast with the previous test: a dirty frame for a page that
727 // was never committed has no fallback, so read_page returns None.
728 let p = tmp_wal("dirty_only");
729 let mut w = Wal::create(&p).unwrap();
730 w.append_frame(7, &page(70), None).unwrap(); // dirty, no commit
731 drop(w);
732
733 let mut w2 = Wal::open(&p).unwrap();
734 assert_eq!(w2.read_page(7).unwrap(), None);
735 let _ = std::fs::remove_file(&p);
736 }
737
738 #[test]
739 fn truncate_resets_to_empty_and_rolls_salt() {
740 let p = tmp_wal("truncate");
741 let mut w = Wal::create(&p).unwrap();
742 w.append_frame(1, &page(11), Some(5)).unwrap();
743 w.append_frame(2, &page(22), Some(5)).unwrap();
744 let seq_before = w.header().checkpoint_seq;
745 let salt_before = w.header().salt;
746 w.truncate().unwrap();
747 assert_eq!(w.frame_count(), 0);
748 assert_eq!(w.last_commit_page_count(), None);
749 assert_eq!(w.header().checkpoint_seq, seq_before + 1);
750 // Salt is randomly rolled; we can't assert a specific value, but
751 // it should almost never match the previous one.
752 let _ = salt_before; // the SystemTime-based salt can collide in a
753 // theoretical tie; don't assert inequality to avoid flakes.
754
755 // Drop w so its exclusive lock releases before we reopen the same
756 // path for verification.
757 drop(w);
758
759 // After truncate, read_page returns None for pages we previously
760 // wrote — the frames are gone.
761 let mut w2 = Wal::open(&p).unwrap();
762 assert_eq!(w2.frame_count(), 0);
763 assert_eq!(w2.read_page(1).unwrap(), None);
764 assert_eq!(w2.read_page(2).unwrap(), None);
765
766 let _ = std::fs::remove_file(&p);
767 }
768
769 #[test]
770 fn bad_magic_file_is_rejected() {
771 let p = tmp_wal("bad_magic");
772 std::fs::write(&p, b"not a WAL file").unwrap();
773 let err = Wal::open(&p).unwrap_err();
774 assert!(format!("{err}").contains("bad magic"));
775 let _ = std::fs::remove_file(&p);
776 }
777
778 #[test]
779 fn corrupt_frame_body_marks_end_of_log() {
780 // Write two valid commit frames, then flip a byte in the second
781 // frame's body. The reader should accept the first frame and
782 // treat the second as end-of-log.
783 let p = tmp_wal("bit_flip");
784 let mut w = Wal::create(&p).unwrap();
785 w.append_frame(1, &page(0x11), Some(5)).unwrap();
786 w.append_frame(2, &page(0x22), Some(5)).unwrap();
787 drop(w);
788
789 // Flip a byte in the second frame's body. Frame 2's body starts
790 // at offset WAL_HEADER_SIZE + FRAME_SIZE + FRAME_HEADER_SIZE.
791 let body_offset = WAL_HEADER_SIZE + FRAME_SIZE + FRAME_HEADER_SIZE;
792 let mut buf = std::fs::read(&p).unwrap();
793 buf[body_offset] ^= 0xff;
794 std::fs::write(&p, &buf).unwrap();
795
796 let mut w2 = Wal::open(&p).unwrap();
797 // First frame survived.
798 assert_eq!(
799 w2.read_page(1).unwrap().unwrap().as_ref(),
800 page(0x11).as_ref()
801 );
802 // Second frame was truncated out — its content isn't readable.
803 assert_eq!(w2.read_page(2).unwrap(), None);
804 assert_eq!(w2.frame_count(), 1);
805
806 let _ = std::fs::remove_file(&p);
807 }
808
809 // -----------------------------------------------------------------
810 // Phase 11.2 — clock_high_water in the WAL header
811 // -----------------------------------------------------------------
812
813 #[test]
814 fn fresh_wal_starts_clock_at_zero() {
815 let p = tmp_wal("clock_fresh");
816 let w = Wal::create(&p).unwrap();
817 assert_eq!(w.clock_high_water(), 0);
818 drop(w);
819 let w2 = Wal::open(&p).unwrap();
820 assert_eq!(w2.clock_high_water(), 0);
821 let _ = std::fs::remove_file(&p);
822 }
823
824 /// Round-trip: setting the clock and triggering `truncate`
825 /// (= every checkpoint) must persist the high-water mark across a
826 /// reopen. This is the property Phase 11.6 GC relies on — without
827 /// it, two transactions on either side of a reopen could share a
828 /// timestamp and corrupt visibility.
829 #[test]
830 fn clock_high_water_round_trips_through_truncate() {
831 let p = tmp_wal("clock_truncate");
832 let mut w = Wal::create(&p).unwrap();
833 // Append a frame so truncate has something to drop. Doesn't
834 // matter what the body is — we're testing the header path.
835 w.append_frame(1, &page(0xaa), Some(1)).unwrap();
836 w.set_clock_high_water(12_345).unwrap();
837 w.truncate().unwrap();
838 assert_eq!(w.clock_high_water(), 12_345);
839 drop(w);
840
841 let w2 = Wal::open(&p).unwrap();
842 assert_eq!(w2.clock_high_water(), 12_345);
843 let _ = std::fs::remove_file(&p);
844 }
845
846 /// `truncate` rewrites the header. Bump-then-truncate-twice must
847 /// keep advancing the on-disk value as the in-memory clock moves.
848 #[test]
849 fn clock_high_water_is_monotonically_persisted_across_truncates() {
850 let p = tmp_wal("clock_monotonic_persist");
851 let mut w = Wal::create(&p).unwrap();
852 w.set_clock_high_water(100).unwrap();
853 w.truncate().unwrap();
854 w.set_clock_high_water(200).unwrap();
855 w.truncate().unwrap();
856 drop(w);
857
858 let w2 = Wal::open(&p).unwrap();
859 assert_eq!(w2.clock_high_water(), 200);
860 let _ = std::fs::remove_file(&p);
861 }
862
863 /// Setter rejects a value below the current high-water mark with
864 /// a typed error. Same value is accepted as a no-op (test below).
865 /// Same-or-greater is the contract every consumer relies on; a
866 /// silent saturate-to-current would mask a real bug in the caller.
867 #[test]
868 fn set_clock_high_water_rejects_regressions() {
869 let p = tmp_wal("clock_no_regress");
870 let mut w = Wal::create(&p).unwrap();
871 w.set_clock_high_water(500).unwrap();
872 let err = w.set_clock_high_water(499).unwrap_err();
873 let msg = format!("{err}");
874 assert!(
875 msg.contains("backwards") && msg.contains("499") && msg.contains("500"),
876 "expected typed regression error, got: {msg}"
877 );
878 // Idempotent same-value update is fine.
879 w.set_clock_high_water(500).unwrap();
880 assert_eq!(w.clock_high_water(), 500);
881 let _ = std::fs::remove_file(&p);
882 }
883
884 /// Synthetic v1 WAL — exact byte layout the previous engine
885 /// version wrote. Opening it must succeed and report
886 /// `clock_high_water == 0` (the bytes were reserved-zero in v1).
887 /// This is the "graceful upgrade" contract.
888 #[test]
889 fn v1_wal_opens_with_zero_clock() {
890 let p = tmp_wal("v1_compat");
891 // Hand-build a v1 WAL header. Frame body is intentionally
892 // omitted — we're testing header parsing, not frame replay.
893 let mut buf = vec![0u8; WAL_HEADER_SIZE];
894 buf[0..8].copy_from_slice(WAL_MAGIC);
895 buf[8..12].copy_from_slice(&1u32.to_le_bytes()); // version 1
896 buf[12..16].copy_from_slice(&(PAGE_SIZE as u32).to_le_bytes());
897 buf[16..20].copy_from_slice(&0xdead_beef_u32.to_le_bytes()); // salt
898 buf[20..24].copy_from_slice(&7u32.to_le_bytes()); // checkpoint_seq
899 // bytes 24..32 left as zero — v1's reserved bytes.
900 std::fs::write(&p, &buf).unwrap();
901
902 let w = Wal::open(&p).unwrap();
903 assert_eq!(w.header().salt, 0xdead_beef);
904 assert_eq!(w.header().checkpoint_seq, 7);
905 assert_eq!(
906 w.clock_high_water(),
907 0,
908 "v1 reserved bytes must read as clock=0"
909 );
910 let _ = std::fs::remove_file(&p);
911 }
912
913 /// Forward-versions we don't understand must error cleanly rather
914 /// than silently misinterpreting bytes. Picks a version far above
915 /// our `WAL_FORMAT_VERSION` so the test stays valid even after
916 /// future bumps.
917 #[test]
918 fn unknown_future_version_is_rejected() {
919 let p = tmp_wal("unknown_version");
920 let mut buf = vec![0u8; WAL_HEADER_SIZE];
921 buf[0..8].copy_from_slice(WAL_MAGIC);
922 buf[8..12].copy_from_slice(&999u32.to_le_bytes());
923 buf[12..16].copy_from_slice(&(PAGE_SIZE as u32).to_le_bytes());
924 std::fs::write(&p, &buf).unwrap();
925 let err = Wal::open(&p).unwrap_err();
926 let msg = format!("{err}");
927 assert!(
928 msg.contains("unsupported WAL format version") && msg.contains("999"),
929 "unexpected error shape: {msg}"
930 );
931 let _ = std::fs::remove_file(&p);
932 }
933
934 #[test]
935 fn partial_trailing_frame_is_ignored() {
936 // Write one valid frame, then append a half-frame's worth of
937 // random bytes. The reader should stop cleanly at the valid
938 // frame.
939 let p = tmp_wal("partial");
940 let mut w = Wal::create(&p).unwrap();
941 w.append_frame(42, &page(42), Some(1)).unwrap();
942 drop(w);
943 {
944 let mut f = OpenOptions::new().write(true).open(&p).unwrap();
945 f.seek(SeekFrom::End(0)).unwrap();
946 f.write_all(&[0xaa; 2000]).unwrap();
947 }
948 let mut w2 = Wal::open(&p).unwrap();
949 assert_eq!(
950 w2.read_page(42).unwrap().unwrap().as_ref(),
951 page(42).as_ref()
952 );
953 assert_eq!(w2.frame_count(), 1);
954 let _ = std::fs::remove_file(&p);
955 }
956}