Skip to main content

quiver_core/
wal.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The write-ahead log: the durability anchor of the storage engine.
3//!
4//! Every mutation is appended to the WAL and `fsync`'d before it is acknowledged
5//! (ADR-0005), so an acknowledged write survives `kill -9` and power loss. The
6//! log is a sequence of length-prefixed, CRC32C-framed records, each carrying a
7//! monotonic [`Lsn`]. A torn trailing record — the signature of a crash mid
8//! append — fails its length or CRC check and is discarded on recovery; it was
9//! never acknowledged.
10//!
11//! Recovery uses *point-in-time* semantics: it stops at the first invalid frame
12//! and treats everything after it as never-committed. Because the log is append
13//! only and each record is `fsync`'d before acknowledgement, the only place an
14//! invalid frame can legitimately appear is the tail.
15//!
16//! Each record's bytes pass through a [`PageCodec`] before framing, so when
17//! encryption-at-rest is enabled the AEAD codec seals every record and the log
18//! holds no plaintext user data; under the plaintext codec the bytes are written
19//! verbatim. The frame CRC is computed over the on-disk (sealed) bytes, so a
20//! torn or bit-rotted tail is still detected without a key, and the AEAD tag
21//! additionally authenticates each record (a wrong key or tampering on an
22//! otherwise-intact frame is a hard error, never silently dropped).
23//!
24//! File layout (little-endian):
25//!
26//! ```text
27//! 0  magic:u32  4  format_ver:u16  6  _pad:u16  8  base_lsn:u64   (16-byte header)
28//! 16 frame[0] | frame[1] | ...
29//!
30//! frame: len:u32 | crc32c:u32 | record: codec.seal_record(postcard(WalEntry)) [len bytes]
31//! ```
32
33use std::fs::{File, OpenOptions};
34use std::io::{BufReader, Read, Write};
35use std::path::{Path, PathBuf};
36
37use serde::{Deserialize, Serialize};
38
39use crate::error::{CoreError, Result};
40use crate::ids::{CollectionId, Lsn};
41use crate::page::PageCodec;
42
43/// Magic identifying a WAL segment file (`b"QVWL"`, little-endian).
44pub const WAL_MAGIC: u32 = u32::from_le_bytes(*b"QVWL");
45/// Current WAL format version.
46pub const WAL_FORMAT_VERSION: u16 = 1;
47
48const WAL_FILE_HEADER_SIZE: usize = 16;
49const FRAME_PREFIX_SIZE: usize = 8; // len:u32 + crc32c:u32
50/// Hard cap on a single record's encoded size, so a corrupt length field cannot
51/// trigger a huge allocation during recovery.
52pub const MAX_RECORD_BYTES: u32 = 64 * 1024 * 1024;
53
54/// A single logical mutation recorded in the WAL.
55///
56/// The vector and payload are stored as opaque bytes: the engine validates and
57/// interprets them (the descriptor fixes the vector dtype/dim; payloads are
58/// validated JSON), keeping the log a dumb, stable durability primitive.
59#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
60pub enum WalOp {
61    /// Create a collection with the given id, name, and postcard-encoded
62    /// descriptor.
63    CreateCollection {
64        /// Identifier assigned to the new collection.
65        collection_id: CollectionId,
66        /// Human-readable collection name, unique within the store.
67        name: String,
68        /// Postcard-encoded collection descriptor (schema, dim, dtype, metric).
69        descriptor: Vec<u8>,
70    },
71    /// Drop a collection and all of its data.
72    DropCollection {
73        /// Identifier of the collection to drop.
74        collection_id: CollectionId,
75    },
76    /// Insert or replace a point.
77    Upsert {
78        /// Owning collection.
79        collection_id: CollectionId,
80        /// Caller-supplied external identifier.
81        external_id: String,
82        /// Raw little-endian vector element bytes (dtype per the descriptor).
83        vector: Vec<u8>,
84        /// Opaque, pre-validated payload bytes (UTF-8 JSON in Phase 1).
85        payload: Vec<u8>,
86    },
87    /// Delete a point by external id.
88    Delete {
89        /// Owning collection.
90        collection_id: CollectionId,
91        /// External identifier to delete.
92        external_id: String,
93    },
94    /// Record that state up to `last_checkpointed_lsn` is durable in segments
95    /// referenced by manifest version `manifest_version`.
96    Checkpoint {
97        /// Highest LSN now captured in sealed segments.
98        last_checkpointed_lsn: Lsn,
99        /// Manifest version that references those segments.
100        manifest_version: u64,
101    },
102}
103
104/// A WAL record: a monotonic LSN paired with the operation it commits.
105#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
106pub struct WalEntry {
107    /// Monotonic log sequence number identifying this record.
108    pub lsn: Lsn,
109    /// The mutation committed at `lsn`.
110    pub op: WalOp,
111}
112
113/// Appends records to a WAL segment and controls the `fsync` durability policy.
114///
115/// LSNs are assigned by the caller (the engine owns the global counter); the
116/// writer only frames, appends, and syncs.
117#[derive(Debug)]
118pub struct WalWriter {
119    file: File,
120    path: PathBuf,
121    unsynced: bool,
122}
123
124impl WalWriter {
125    /// Create a new WAL segment file and write its header. Fails if the file
126    /// already exists.
127    pub fn create(path: &Path, base_lsn: Lsn) -> Result<Self> {
128        let mut file = OpenOptions::new()
129            .create_new(true)
130            .read(true)
131            .write(true)
132            .open(path)
133            .map_err(|e| CoreError::io(path, e))?;
134        let mut hdr = [0u8; WAL_FILE_HEADER_SIZE];
135        hdr[0..4].copy_from_slice(&WAL_MAGIC.to_le_bytes());
136        hdr[4..6].copy_from_slice(&WAL_FORMAT_VERSION.to_le_bytes());
137        // Bytes 6..8 are reserved padding (zero).
138        hdr[8..16].copy_from_slice(&base_lsn.value().to_le_bytes());
139        file.write_all(&hdr).map_err(|e| CoreError::io(path, e))?;
140        file.sync_data().map_err(|e| CoreError::io(path, e))?;
141        Ok(Self {
142            file,
143            path: path.to_path_buf(),
144            unsynced: false,
145        })
146    }
147
148    /// Open an existing WAL segment for appending, validating its header. New
149    /// records are written at the end of the file.
150    pub fn open_append(path: &Path) -> Result<Self> {
151        let mut file = OpenOptions::new()
152            .read(true)
153            .append(true)
154            .open(path)
155            .map_err(|e| CoreError::io(path, e))?;
156        let mut hdr = [0u8; WAL_FILE_HEADER_SIZE];
157        file.read_exact(&mut hdr)
158            .map_err(|e| CoreError::io(path, e))?;
159        let magic = u32::from_le_bytes([hdr[0], hdr[1], hdr[2], hdr[3]]);
160        if magic != WAL_MAGIC {
161            return Err(CoreError::BadMagic {
162                expected: WAL_MAGIC,
163                found: magic,
164            });
165        }
166        let ver = u16::from_le_bytes([hdr[4], hdr[5]]);
167        if ver != WAL_FORMAT_VERSION {
168            return Err(CoreError::UnsupportedVersion {
169                found: ver,
170                supported: WAL_FORMAT_VERSION,
171            });
172        }
173        Ok(Self {
174            file,
175            path: path.to_path_buf(),
176            unsynced: false,
177        })
178    }
179
180    /// Frame and append a record, sealing its bytes with `codec` first (so an
181    /// encrypting codec leaves no plaintext in the log). Does not `fsync`; the
182    /// record is durable only after a subsequent [`WalWriter::sync`].
183    pub fn append(&mut self, codec: &dyn PageCodec, entry: &WalEntry) -> Result<()> {
184        let plaintext = postcard::to_allocvec(entry)?;
185        let sealed = codec.seal_record(&plaintext)?;
186        let len = u32::try_from(sealed.len())
187            .map_err(|_| CoreError::TooLarge(format!("wal record {} bytes", sealed.len())))?;
188        if len > MAX_RECORD_BYTES {
189            return Err(CoreError::TooLarge(format!(
190                "wal record {len} bytes exceeds cap {MAX_RECORD_BYTES}"
191            )));
192        }
193        // The CRC covers the on-disk (sealed) bytes, so a torn or bit-rotted
194        // tail is detected on recovery without needing the key.
195        let crc = crc32c::crc32c(&sealed);
196        let mut frame = Vec::with_capacity(FRAME_PREFIX_SIZE + sealed.len());
197        frame.extend_from_slice(&len.to_le_bytes());
198        frame.extend_from_slice(&crc.to_le_bytes());
199        frame.extend_from_slice(&sealed);
200        self.file
201            .write_all(&frame)
202            .map_err(|e| CoreError::io(&self.path, e))?;
203        self.unsynced = true;
204        Ok(())
205    }
206
207    /// Flush and `fsync` the segment, making every appended record durable.
208    pub fn sync(&mut self) -> Result<()> {
209        if self.unsynced {
210            self.file
211                .sync_data()
212                .map_err(|e| CoreError::io(&self.path, e))?;
213            self.unsynced = false;
214        }
215        Ok(())
216    }
217
218    /// Append a record and immediately `fsync` — strict per-commit durability.
219    pub fn append_sync(&mut self, codec: &dyn PageCodec, entry: &WalEntry) -> Result<()> {
220        self.append(codec, entry)?;
221        self.sync()
222    }
223}
224
225/// The result of replaying a WAL segment to its end.
226#[derive(Debug, Clone, PartialEq, Eq)]
227pub struct WalReplay {
228    /// The intact records, in log order.
229    pub entries: Vec<WalEntry>,
230    /// Byte offset of the torn trailing record, if the log ended on one rather
231    /// than cleanly at a frame boundary.
232    pub torn_at: Option<u64>,
233    /// The segment's base LSN, from its header.
234    pub base_lsn: Lsn,
235}
236
237impl WalReplay {
238    /// The highest LSN among the recovered records, if any.
239    #[must_use]
240    pub fn max_lsn(&self) -> Option<Lsn> {
241        self.entries.iter().map(|e| e.lsn).max()
242    }
243}
244
245enum ReadOutcome {
246    Full,
247    Partial,
248    Eof,
249}
250
251fn read_full<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<ReadOutcome> {
252    let mut filled = 0;
253    while filled < buf.len() {
254        match reader.read(&mut buf[filled..]) {
255            Ok(0) => {
256                return Ok(if filled == 0 {
257                    ReadOutcome::Eof
258                } else {
259                    ReadOutcome::Partial
260                });
261            }
262            Ok(n) => filled += n,
263            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
264            Err(e) => return Err(CoreError::BareIo(e)),
265        }
266    }
267    Ok(ReadOutcome::Full)
268}
269
270/// Read every intact record from a WAL segment, stopping cleanly at a torn
271/// trailing record. Each record is opened with `codec` (the identity transform
272/// under the plaintext codec; authenticated decryption under the AEAD codec).
273/// A segment too short to even hold its 16-byte file header is itself a torn tail
274/// (a `kill -9` during WAL rotation, before the new segment's header became
275/// durable) and is reported as an empty replay torn at offset 0 — never an error.
276/// Errors on a header with a wrong magic or an unsupported version, an underlying
277/// I/O failure, or a frame that is intact on disk yet fails authentication (a
278/// wrong key or tampering) — a torn tail is a normal, expected outcome reported
279/// via [`WalReplay::torn_at`].
280pub fn read_all(path: &Path, codec: &dyn PageCodec) -> Result<WalReplay> {
281    let file = File::open(path).map_err(|e| CoreError::io(path, e))?;
282    let file_len = file.metadata().map_err(|e| CoreError::io(path, e))?.len();
283    let mut reader = BufReader::new(file);
284
285    let mut hdr = [0u8; WAL_FILE_HEADER_SIZE];
286    match read_full(&mut reader, &mut hdr)? {
287        ReadOutcome::Full => {}
288        _ => {
289            // A segment whose 16-byte header never became durable holds no
290            // records: `create` fsyncs the header before returning the writer, so
291            // a record can only be appended (and acknowledged) after a durable
292            // header. A `kill -9` during WAL rotation can leave such a sub-header
293            // segment beside the prior, fully-durable one; recovery must treat it
294            // as an empty torn tail — exactly like a torn frame — and never as a
295            // hard error, or the crash gate (ADR-0005) flakes. `base_lsn` is
296            // unknown here and unused by the recovery path.
297            return Ok(WalReplay {
298                entries: Vec::new(),
299                torn_at: Some(0),
300                base_lsn: Lsn(0),
301            });
302        }
303    }
304    let magic = u32::from_le_bytes([hdr[0], hdr[1], hdr[2], hdr[3]]);
305    if magic != WAL_MAGIC {
306        return Err(CoreError::BadMagic {
307            expected: WAL_MAGIC,
308            found: magic,
309        });
310    }
311    let ver = u16::from_le_bytes([hdr[4], hdr[5]]);
312    if ver != WAL_FORMAT_VERSION {
313        return Err(CoreError::UnsupportedVersion {
314            found: ver,
315            supported: WAL_FORMAT_VERSION,
316        });
317    }
318    let base_lsn = Lsn(u64::from_le_bytes([
319        hdr[8], hdr[9], hdr[10], hdr[11], hdr[12], hdr[13], hdr[14], hdr[15],
320    ]));
321
322    let mut entries = Vec::new();
323    let mut offset = WAL_FILE_HEADER_SIZE as u64;
324    let mut torn_at = None;
325    loop {
326        let mut prefix = [0u8; FRAME_PREFIX_SIZE];
327        match read_full(&mut reader, &mut prefix)? {
328            ReadOutcome::Eof => break, // clean end of log
329            ReadOutcome::Partial => {
330                torn_at = Some(offset);
331                break;
332            }
333            ReadOutcome::Full => {}
334        }
335        let len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]);
336        let crc = u32::from_le_bytes([prefix[4], prefix[5], prefix[6], prefix[7]]);
337        let frame_end = offset
338            .checked_add(FRAME_PREFIX_SIZE as u64)
339            .and_then(|o| o.checked_add(u64::from(len)));
340        match frame_end {
341            Some(end) if len != 0 && len <= MAX_RECORD_BYTES && end <= file_len => {}
342            _ => {
343                // Implausible length or a record that runs past EOF: a torn tail.
344                torn_at = Some(offset);
345                break;
346            }
347        }
348        let mut buf = vec![0u8; len as usize];
349        match read_full(&mut reader, &mut buf)? {
350            ReadOutcome::Full => {}
351            _ => {
352                torn_at = Some(offset);
353                break;
354            }
355        }
356        if crc32c::crc32c(&buf) != crc {
357            torn_at = Some(offset);
358            break;
359        }
360        // The frame is intact on disk (the CRC covers the sealed bytes). Open it:
361        // under the plaintext codec this is the identity, and under the AEAD codec
362        // a failure means a wrong key or tampering on an otherwise-complete,
363        // acknowledged record — a hard error, not a recoverable torn tail.
364        let plaintext = codec.open_record(&buf)?;
365        match postcard::from_bytes::<WalEntry>(&plaintext) {
366            Ok(entry) => {
367                entries.push(entry);
368                offset += FRAME_PREFIX_SIZE as u64 + u64::from(len);
369            }
370            Err(_) => {
371                // Authenticated bytes that nonetheless do not decode: torn.
372                torn_at = Some(offset);
373                break;
374            }
375        }
376    }
377    Ok(WalReplay {
378        entries,
379        torn_at,
380        base_lsn,
381    })
382}
383
384#[cfg(test)]
385mod tests {
386    // `super::*` also re-exports the parent module's imports (`OpenOptions`,
387    // `Write`, `Path`, `CoreError`, `CollectionId`, `Lsn`, `PageCodec`), so they
388    // need no separate `use` here. The concrete plaintext codec is a sibling type
389    // the parent does not import, so bring it in explicitly.
390    use super::*;
391    use crate::page::PlainCodec;
392    use proptest::prelude::*;
393
394    fn sample_ops() -> Vec<WalOp> {
395        vec![
396            WalOp::CreateCollection {
397                collection_id: CollectionId(1),
398                name: "alpha".into(),
399                descriptor: vec![1, 2, 3, 4],
400            },
401            WalOp::Upsert {
402                collection_id: CollectionId(1),
403                external_id: "alpha".into(),
404                vector: vec![0u8; 32],
405                payload: br#"{"k":"v"}"#.to_vec(),
406            },
407            WalOp::Delete {
408                collection_id: CollectionId(1),
409                external_id: "alpha".into(),
410            },
411            WalOp::Checkpoint {
412                last_checkpointed_lsn: Lsn(2),
413                manifest_version: 5,
414            },
415            WalOp::DropCollection {
416                collection_id: CollectionId(1),
417            },
418        ]
419    }
420
421    fn entries_from(ops: &[WalOp]) -> Vec<WalEntry> {
422        ops.iter()
423            .enumerate()
424            .map(|(i, op)| WalEntry {
425                lsn: Lsn(i as u64 + 1),
426                op: op.clone(),
427            })
428            .collect()
429    }
430
431    fn write_log(path: &Path, entries: &[WalEntry]) {
432        let mut w = WalWriter::create(path, Lsn(1)).unwrap();
433        for e in entries {
434            w.append(&PlainCodec, e).unwrap();
435        }
436        w.sync().unwrap();
437    }
438
439    #[test]
440    fn roundtrips_every_op() {
441        let dir = tempfile::tempdir().unwrap();
442        let path = dir.path().join("wal-1.log");
443        let entries = entries_from(&sample_ops());
444        write_log(&path, &entries);
445
446        let replay = read_all(&path, &PlainCodec).unwrap();
447        assert_eq!(replay.entries, entries);
448        assert_eq!(replay.torn_at, None);
449        assert_eq!(replay.base_lsn, Lsn(1));
450        assert_eq!(replay.max_lsn(), Some(Lsn(entries.len() as u64)));
451    }
452
453    #[test]
454    fn empty_log_replays_to_nothing() {
455        let dir = tempfile::tempdir().unwrap();
456        let path = dir.path().join("wal-1.log");
457        let _w = WalWriter::create(&path, Lsn(10)).unwrap();
458        let replay = read_all(&path, &PlainCodec).unwrap();
459        assert!(replay.entries.is_empty());
460        assert_eq!(replay.torn_at, None);
461        assert_eq!(replay.base_lsn, Lsn(10));
462        assert_eq!(replay.max_lsn(), None);
463    }
464
465    #[test]
466    fn reopen_and_append_continues_the_log() {
467        let dir = tempfile::tempdir().unwrap();
468        let path = dir.path().join("wal-1.log");
469        let entries = entries_from(&sample_ops());
470        {
471            let mut w = WalWriter::create(&path, Lsn(1)).unwrap();
472            w.append_sync(&PlainCodec, &entries[0]).unwrap();
473            w.append_sync(&PlainCodec, &entries[1]).unwrap();
474        }
475        {
476            let mut w = WalWriter::open_append(&path).unwrap();
477            for e in &entries[2..] {
478                w.append(&PlainCodec, e).unwrap();
479            }
480            w.sync().unwrap();
481        }
482        let replay = read_all(&path, &PlainCodec).unwrap();
483        assert_eq!(replay.entries, entries);
484        assert_eq!(replay.torn_at, None);
485    }
486
487    #[test]
488    fn torn_prefix_at_tail_is_dropped() {
489        let dir = tempfile::tempdir().unwrap();
490        let path = dir.path().join("wal-1.log");
491        let entries = entries_from(&sample_ops());
492        write_log(&path, &entries);
493        let clean_len = std::fs::metadata(&path).unwrap().len();
494        // Append a partial 8-byte frame prefix (only 3 bytes of it).
495        {
496            let mut f = OpenOptions::new().append(true).open(&path).unwrap();
497            f.write_all(&[0xFF, 0xFF, 0xFF]).unwrap();
498            f.sync_data().unwrap();
499        }
500        let replay = read_all(&path, &PlainCodec).unwrap();
501        assert_eq!(replay.entries, entries);
502        assert_eq!(replay.torn_at, Some(clean_len));
503    }
504
505    #[test]
506    fn torn_payload_at_tail_is_dropped() {
507        let dir = tempfile::tempdir().unwrap();
508        let path = dir.path().join("wal-1.log");
509        let entries = entries_from(&sample_ops());
510        write_log(&path, &entries);
511        let clean_len = std::fs::metadata(&path).unwrap().len();
512        // Append a frame claiming 100 payload bytes but supply only a few.
513        {
514            let mut f = OpenOptions::new().append(true).open(&path).unwrap();
515            f.write_all(&100u32.to_le_bytes()).unwrap();
516            f.write_all(&0u32.to_le_bytes()).unwrap();
517            f.write_all(&[1, 2, 3]).unwrap();
518            f.sync_data().unwrap();
519        }
520        let replay = read_all(&path, &PlainCodec).unwrap();
521        assert_eq!(replay.entries, entries);
522        assert_eq!(replay.torn_at, Some(clean_len));
523    }
524
525    #[test]
526    fn corruption_stops_recovery_point_in_time() {
527        let dir = tempfile::tempdir().unwrap();
528        let path = dir.path().join("wal-1.log");
529        let entries = entries_from(&sample_ops());
530        write_log(&path, &entries);
531
532        // Corrupt a byte inside the *second* record's payload region. The first
533        // record must still be recovered; the second and everything after it is
534        // treated as a torn tail (point-in-time recovery).
535        let len0 = postcard::to_allocvec(&entries[0]).unwrap().len() as u64;
536        let second_frame_offset = WAL_FILE_HEADER_SIZE as u64 + FRAME_PREFIX_SIZE as u64 + len0;
537        let corrupt_at = second_frame_offset + FRAME_PREFIX_SIZE as u64 + 1;
538
539        let mut bytes = std::fs::read(&path).unwrap();
540        bytes[corrupt_at as usize] ^= 0xFF;
541        std::fs::write(&path, &bytes).unwrap();
542
543        let replay = read_all(&path, &PlainCodec).unwrap();
544        assert_eq!(replay.entries, vec![entries[0].clone()]);
545        assert_eq!(replay.torn_at, Some(second_frame_offset));
546    }
547
548    #[test]
549    fn foreign_file_is_rejected_by_magic() {
550        let dir = tempfile::tempdir().unwrap();
551        let path = dir.path().join("wal-1.log");
552        std::fs::write(&path, vec![0u8; WAL_FILE_HEADER_SIZE + 4]).unwrap();
553        assert!(matches!(
554            read_all(&path, &PlainCodec),
555            Err(CoreError::BadMagic { .. })
556        ));
557    }
558
559    #[test]
560    fn sub_header_segment_is_an_empty_torn_tail() {
561        // A `kill -9` during WAL rotation can leave a freshly-created segment with
562        // fewer than its 16-byte header durable, beside the prior segment that
563        // holds the acknowledged writes. Recovery must treat it as an empty torn
564        // tail — never a hard error (the crash gate, ADR-0005) — or `Store::open`
565        // fails and a clean recovery is lost (the source of the crash-gate flake).
566        for len in [0usize, 1, WAL_FILE_HEADER_SIZE - 1] {
567            let dir = tempfile::tempdir().unwrap();
568            let path = dir.path().join("wal-1.log");
569            std::fs::write(&path, vec![0u8; len]).unwrap();
570            let replay = read_all(&path, &PlainCodec).unwrap();
571            assert!(replay.entries.is_empty(), "len {len}: expected no entries");
572            assert_eq!(replay.torn_at, Some(0), "len {len}: expected torn at 0");
573        }
574    }
575
576    proptest! {
577        #[test]
578        fn entries_roundtrip(seeds in proptest::collection::vec(0u8..5, 0..40)) {
579            let ops = sample_ops();
580            let entries: Vec<WalEntry> = seeds
581                .iter()
582                .enumerate()
583                .map(|(i, &s)| WalEntry { lsn: Lsn(i as u64 + 1), op: ops[s as usize].clone() })
584                .collect();
585            let dir = tempfile::tempdir().unwrap();
586            let path = dir.path().join("wal.log");
587            write_log(&path, &entries);
588            let replay = read_all(&path, &PlainCodec).unwrap();
589            prop_assert_eq!(replay.entries, entries);
590            prop_assert_eq!(replay.torn_at, None);
591        }
592
593        #[test]
594        fn truncation_yields_a_clean_prefix(
595            seeds in proptest::collection::vec(0u8..5, 1..20),
596            cut_num in 0u64..1000,
597        ) {
598            let ops = sample_ops();
599            let entries: Vec<WalEntry> = seeds
600                .iter()
601                .enumerate()
602                .map(|(i, &s)| WalEntry { lsn: Lsn(i as u64 + 1), op: ops[s as usize].clone() })
603                .collect();
604            let dir = tempfile::tempdir().unwrap();
605            let path = dir.path().join("wal.log");
606            write_log(&path, &entries);
607
608            let full = std::fs::metadata(&path).unwrap().len();
609            // Compute the byte boundary at the end of each record's frame.
610            let mut frame_ends = Vec::new();
611            let mut off = WAL_FILE_HEADER_SIZE as u64;
612            for e in &entries {
613                off += FRAME_PREFIX_SIZE as u64 + postcard::to_allocvec(e).unwrap().len() as u64;
614                frame_ends.push(off);
615            }
616            // Truncate somewhere in [header, full].
617            let cut = WAL_FILE_HEADER_SIZE as u64
618                + (cut_num % (full - WAL_FILE_HEADER_SIZE as u64 + 1));
619            let f = OpenOptions::new().write(true).open(&path).unwrap();
620            f.set_len(cut).unwrap();
621            drop(f);
622
623            let replay = read_all(&path, &PlainCodec).unwrap();
624            let survivors = frame_ends.iter().filter(|&&end| end <= cut).count();
625            prop_assert_eq!(replay.entries.as_slice(), &entries[..survivors]);
626            // A clean cut at a frame boundary has no torn tail; otherwise it does.
627            let clean = cut == WAL_FILE_HEADER_SIZE as u64 || frame_ends.contains(&cut);
628            prop_assert_eq!(replay.torn_at.is_none(), clean);
629        }
630    }
631}