Skip to main content

quiver_core/
wal.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The write-ahead log: the durability anchor of the storage engine.
3//!
4//! Every mutation is appended to the WAL and `fsync`'d before it is acknowledged
5//! (ADR-0005), so an acknowledged write survives `kill -9` and power loss. The
6//! log is a sequence of length-prefixed, CRC32C-framed records, each carrying a
//! monotonic [`Lsn`]. A torn trailing record — the signature of a crash
//! mid-append — fails its length or CRC check and is discarded on recovery; it
//! was never acknowledged.
10//!
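//! Recovery uses *point-in-time* semantics: it stops at the first invalid frame
//! and treats everything after it as never-committed. Because the log is
//! append-only and each record is `fsync`'d before acknowledgement, the only
//! place an invalid frame can legitimately appear is the tail — provided a torn
//! tail is truncated before new records are appended after it. Otherwise those
//! later, acknowledged records would sit behind the invalid frame and be lost on
//! the next recovery.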
15//!
16//! Each record's bytes pass through a [`PageCodec`] before framing, so when
17//! encryption-at-rest is enabled the AEAD codec seals every record and the log
18//! holds no plaintext user data; under the plaintext codec the bytes are written
19//! verbatim. The frame CRC is computed over the on-disk (sealed) bytes, so a
20//! torn or bit-rotted tail is still detected without a key, and the AEAD tag
21//! additionally authenticates each record (a wrong key or tampering on an
22//! otherwise-intact frame is a hard error, never silently dropped).
23//!
24//! File layout (little-endian):
25//!
26//! ```text
27//! 0  magic:u32  4  format_ver:u16  6  _pad:u16  8  base_lsn:u64   (16-byte header)
28//! 16 frame[0] | frame[1] | ...
29//!
30//! frame: len:u32 | crc32c:u32 | record: codec.seal_record(postcard(WalEntry)) [len bytes]
31//! ```
32
33use std::fs::{File, OpenOptions};
34use std::io::{BufReader, Read, Write};
35use std::path::{Path, PathBuf};
36
37use serde::{Deserialize, Serialize};
38
39use crate::error::{CoreError, Result};
40use crate::ids::{CollectionId, Lsn};
41use crate::page::PageCodec;
42
/// Magic identifying a WAL segment file (`b"QVWL"`, little-endian).
pub const WAL_MAGIC: u32 = u32::from_le_bytes([b'Q', b'V', b'W', b'L']);
/// Current WAL format version.
pub const WAL_FORMAT_VERSION: u16 = 1;

/// Fixed file header: magic (4) + format version (2) + padding (2) + base LSN (8).
const WAL_FILE_HEADER_SIZE: usize = 4 + 2 + 2 + 8;
/// Per-frame prefix: record length `u32` (4) + CRC32C `u32` (4).
const FRAME_PREFIX_SIZE: usize = 4 + 4;
/// Hard cap on a single record's encoded size (64 MiB), so a corrupt length
/// field cannot trigger a huge allocation during recovery.
pub const MAX_RECORD_BYTES: u32 = 1 << 26;
53
/// A single logical mutation recorded in the WAL.
///
/// The vector and payload are stored as opaque bytes: the engine validates and
/// interprets them (the descriptor fixes the vector dtype/dim; payloads are
/// validated JSON), keeping the log a dumb, stable durability primitive.
///
/// Wire-format note: entries are serialized with postcard (see
/// [`WalWriter::append`] and [`read_all`]). Postcard encodes an enum variant by
/// its declaration index and struct-like fields in declaration order.
/// Reordering, inserting, or removing variants or fields therefore changes
/// the on-disk format. Readers reject a header whose version differs from
/// [`WAL_FORMAT_VERSION`], so such changes should bump it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum WalOp {
    /// Create a collection with the given id, name, and postcard-encoded
    /// descriptor.
    CreateCollection {
        /// Identifier assigned to the new collection.
        collection_id: CollectionId,
        /// Human-readable collection name, unique within the store.
        name: String,
        /// Postcard-encoded collection descriptor (schema, dim, dtype, metric).
        descriptor: Vec<u8>,
    },
    /// Drop a collection and all of its data.
    DropCollection {
        /// Identifier of the collection to drop.
        collection_id: CollectionId,
    },
    /// Insert or replace a point.
    Upsert {
        /// Owning collection.
        collection_id: CollectionId,
        /// Caller-supplied external identifier.
        external_id: String,
        /// Raw little-endian vector element bytes (dtype per the descriptor).
        vector: Vec<u8>,
        /// Opaque, pre-validated payload bytes (UTF-8 JSON in Phase 1).
        payload: Vec<u8>,
    },
    /// Delete a point by external id.
    Delete {
        /// Owning collection.
        collection_id: CollectionId,
        /// External identifier to delete.
        external_id: String,
    },
    /// Record that state up to `last_checkpointed_lsn` is durable in segments
    /// referenced by manifest version `manifest_version`.
    Checkpoint {
        /// Highest LSN now captured in sealed segments.
        last_checkpointed_lsn: Lsn,
        /// Manifest version that references those segments.
        manifest_version: u64,
    },
}
103
/// A WAL record: a monotonic LSN paired with the operation it commits.
///
/// This is the unit postcard-encodes (then the codec seals) into one frame.
/// Field order is part of the encoding, so it must not change without a
/// [`WAL_FORMAT_VERSION`] bump.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WalEntry {
    /// Monotonic log sequence number identifying this record. Assigned by the
    /// caller; the writer does not check monotonicity.
    pub lsn: Lsn,
    /// The mutation committed at `lsn`.
    pub op: WalOp,
}
112
113/// Appends records to a WAL segment and controls the `fsync` durability policy.
114///
115/// LSNs are assigned by the caller (the engine owns the global counter); the
116/// writer only frames, appends, and syncs.
117#[derive(Debug)]
118pub struct WalWriter {
119    file: File,
120    path: PathBuf,
121    unsynced: bool,
122}
123
124impl WalWriter {
125    /// Create a new WAL segment file and write its header. Fails if the file
126    /// already exists.
127    pub fn create(path: &Path, base_lsn: Lsn) -> Result<Self> {
128        let mut file = OpenOptions::new()
129            .create_new(true)
130            .read(true)
131            .write(true)
132            .open(path)
133            .map_err(|e| CoreError::io(path, e))?;
134        let mut hdr = [0u8; WAL_FILE_HEADER_SIZE];
135        hdr[0..4].copy_from_slice(&WAL_MAGIC.to_le_bytes());
136        hdr[4..6].copy_from_slice(&WAL_FORMAT_VERSION.to_le_bytes());
137        // Bytes 6..8 are reserved padding (zero).
138        hdr[8..16].copy_from_slice(&base_lsn.value().to_le_bytes());
139        file.write_all(&hdr).map_err(|e| CoreError::io(path, e))?;
140        file.sync_data().map_err(|e| CoreError::io(path, e))?;
141        Ok(Self {
142            file,
143            path: path.to_path_buf(),
144            unsynced: false,
145        })
146    }
147
148    /// Open an existing WAL segment for appending, validating its header. New
149    /// records are written at the end of the file.
150    pub fn open_append(path: &Path) -> Result<Self> {
151        let mut file = OpenOptions::new()
152            .read(true)
153            .append(true)
154            .open(path)
155            .map_err(|e| CoreError::io(path, e))?;
156        let mut hdr = [0u8; WAL_FILE_HEADER_SIZE];
157        file.read_exact(&mut hdr)
158            .map_err(|e| CoreError::io(path, e))?;
159        let magic = u32::from_le_bytes([hdr[0], hdr[1], hdr[2], hdr[3]]);
160        if magic != WAL_MAGIC {
161            return Err(CoreError::BadMagic {
162                expected: WAL_MAGIC,
163                found: magic,
164            });
165        }
166        let ver = u16::from_le_bytes([hdr[4], hdr[5]]);
167        if ver != WAL_FORMAT_VERSION {
168            return Err(CoreError::UnsupportedVersion {
169                found: ver,
170                supported: WAL_FORMAT_VERSION,
171            });
172        }
173        Ok(Self {
174            file,
175            path: path.to_path_buf(),
176            unsynced: false,
177        })
178    }
179
180    /// Frame and append a record, sealing its bytes with `codec` first (so an
181    /// encrypting codec leaves no plaintext in the log). Does not `fsync`; the
182    /// record is durable only after a subsequent [`WalWriter::sync`].
183    pub fn append(&mut self, codec: &dyn PageCodec, entry: &WalEntry) -> Result<()> {
184        let plaintext = postcard::to_allocvec(entry)?;
185        let sealed = codec.seal_record(&plaintext)?;
186        let len = u32::try_from(sealed.len())
187            .map_err(|_| CoreError::TooLarge(format!("wal record {} bytes", sealed.len())))?;
188        if len > MAX_RECORD_BYTES {
189            return Err(CoreError::TooLarge(format!(
190                "wal record {len} bytes exceeds cap {MAX_RECORD_BYTES}"
191            )));
192        }
193        // The CRC covers the on-disk (sealed) bytes, so a torn or bit-rotted
194        // tail is detected on recovery without needing the key.
195        let crc = crc32c::crc32c(&sealed);
196        let mut frame = Vec::with_capacity(FRAME_PREFIX_SIZE + sealed.len());
197        frame.extend_from_slice(&len.to_le_bytes());
198        frame.extend_from_slice(&crc.to_le_bytes());
199        frame.extend_from_slice(&sealed);
200        self.file
201            .write_all(&frame)
202            .map_err(|e| CoreError::io(&self.path, e))?;
203        self.unsynced = true;
204        Ok(())
205    }
206
207    /// Flush and `fsync` the segment, making every appended record durable.
208    pub fn sync(&mut self) -> Result<()> {
209        if self.unsynced {
210            self.file
211                .sync_data()
212                .map_err(|e| CoreError::io(&self.path, e))?;
213            self.unsynced = false;
214        }
215        Ok(())
216    }
217
218    /// Append a record and immediately `fsync` — strict per-commit durability.
219    pub fn append_sync(&mut self, codec: &dyn PageCodec, entry: &WalEntry) -> Result<()> {
220        self.append(codec, entry)?;
221        self.sync()
222    }
223}
224
225/// The result of replaying a WAL segment to its end.
226#[derive(Debug, Clone, PartialEq, Eq)]
227pub struct WalReplay {
228    /// The intact records, in log order.
229    pub entries: Vec<WalEntry>,
230    /// Byte offset of the torn trailing record, if the log ended on one rather
231    /// than cleanly at a frame boundary.
232    pub torn_at: Option<u64>,
233    /// The segment's base LSN, from its header.
234    pub base_lsn: Lsn,
235}
236
237impl WalReplay {
238    /// The highest LSN among the recovered records, if any.
239    #[must_use]
240    pub fn max_lsn(&self) -> Option<Lsn> {
241        self.entries.iter().map(|e| e.lsn).max()
242    }
243}
244
/// How a fixed-size read from the WAL ended (produced by `read_full`).
enum ReadOutcome {
    /// Nothing could be read: the reader was already at end-of-file.
    Eof,
    /// Some, but not all, of the requested bytes arrived before end-of-file.
    Partial,
    /// The whole buffer was filled.
    Full,
}
250
251fn read_full<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<ReadOutcome> {
252    let mut filled = 0;
253    while filled < buf.len() {
254        match reader.read(&mut buf[filled..]) {
255            Ok(0) => {
256                return Ok(if filled == 0 {
257                    ReadOutcome::Eof
258                } else {
259                    ReadOutcome::Partial
260                });
261            }
262            Ok(n) => filled += n,
263            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
264            Err(e) => return Err(CoreError::BareIo(e)),
265        }
266    }
267    Ok(ReadOutcome::Full)
268}
269
270/// Read every intact record from a WAL segment, stopping cleanly at a torn
271/// trailing record. Each record is opened with `codec` (the identity transform
272/// under the plaintext codec; authenticated decryption under the AEAD codec).
273/// Errors on a structurally invalid header, an underlying I/O failure, or a
274/// frame that is intact on disk yet fails authentication (a wrong key or
275/// tampering) — a torn tail is a normal, expected outcome reported via
276/// [`WalReplay::torn_at`].
277pub fn read_all(path: &Path, codec: &dyn PageCodec) -> Result<WalReplay> {
278    let file = File::open(path).map_err(|e| CoreError::io(path, e))?;
279    let file_len = file.metadata().map_err(|e| CoreError::io(path, e))?.len();
280    let mut reader = BufReader::new(file);
281
282    let mut hdr = [0u8; WAL_FILE_HEADER_SIZE];
283    match read_full(&mut reader, &mut hdr)? {
284        ReadOutcome::Full => {}
285        _ => {
286            return Err(CoreError::MalformedPage(format!(
287                "wal {} is shorter than its header",
288                path.display()
289            )));
290        }
291    }
292    let magic = u32::from_le_bytes([hdr[0], hdr[1], hdr[2], hdr[3]]);
293    if magic != WAL_MAGIC {
294        return Err(CoreError::BadMagic {
295            expected: WAL_MAGIC,
296            found: magic,
297        });
298    }
299    let ver = u16::from_le_bytes([hdr[4], hdr[5]]);
300    if ver != WAL_FORMAT_VERSION {
301        return Err(CoreError::UnsupportedVersion {
302            found: ver,
303            supported: WAL_FORMAT_VERSION,
304        });
305    }
306    let base_lsn = Lsn(u64::from_le_bytes([
307        hdr[8], hdr[9], hdr[10], hdr[11], hdr[12], hdr[13], hdr[14], hdr[15],
308    ]));
309
310    let mut entries = Vec::new();
311    let mut offset = WAL_FILE_HEADER_SIZE as u64;
312    let mut torn_at = None;
313    loop {
314        let mut prefix = [0u8; FRAME_PREFIX_SIZE];
315        match read_full(&mut reader, &mut prefix)? {
316            ReadOutcome::Eof => break, // clean end of log
317            ReadOutcome::Partial => {
318                torn_at = Some(offset);
319                break;
320            }
321            ReadOutcome::Full => {}
322        }
323        let len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]);
324        let crc = u32::from_le_bytes([prefix[4], prefix[5], prefix[6], prefix[7]]);
325        let frame_end = offset
326            .checked_add(FRAME_PREFIX_SIZE as u64)
327            .and_then(|o| o.checked_add(u64::from(len)));
328        match frame_end {
329            Some(end) if len != 0 && len <= MAX_RECORD_BYTES && end <= file_len => {}
330            _ => {
331                // Implausible length or a record that runs past EOF: a torn tail.
332                torn_at = Some(offset);
333                break;
334            }
335        }
336        let mut buf = vec![0u8; len as usize];
337        match read_full(&mut reader, &mut buf)? {
338            ReadOutcome::Full => {}
339            _ => {
340                torn_at = Some(offset);
341                break;
342            }
343        }
344        if crc32c::crc32c(&buf) != crc {
345            torn_at = Some(offset);
346            break;
347        }
348        // The frame is intact on disk (the CRC covers the sealed bytes). Open it:
349        // under the plaintext codec this is the identity, and under the AEAD codec
350        // a failure means a wrong key or tampering on an otherwise-complete,
351        // acknowledged record — a hard error, not a recoverable torn tail.
352        let plaintext = codec.open_record(&buf)?;
353        match postcard::from_bytes::<WalEntry>(&plaintext) {
354            Ok(entry) => {
355                entries.push(entry);
356                offset += FRAME_PREFIX_SIZE as u64 + u64::from(len);
357            }
358            Err(_) => {
359                // Authenticated bytes that nonetheless do not decode: torn.
360                torn_at = Some(offset);
361                break;
362            }
363        }
364    }
365    Ok(WalReplay {
366        entries,
367        torn_at,
368        base_lsn,
369    })
370}
371
#[cfg(test)]
mod tests {
    // `use super::*` brings the parent module's items into scope, including its
    // private `use` imports (`OpenOptions`, `Write`, `Path`, `CoreError`,
    // `CollectionId`, `Lsn`, `PageCodec`), so they need no separate `use` here.
    // The concrete plaintext codec is a sibling type the parent does not
    // import, so it is brought in explicitly.
    use super::*;
    use crate::page::PlainCodec;
    use proptest::prelude::*;

    /// One instance of every `WalOp` variant; the property tests index into
    /// this palette with seeds in `0..5`.
    fn sample_ops() -> Vec<WalOp> {
        vec![
            WalOp::CreateCollection {
                collection_id: CollectionId(1),
                name: "alpha".into(),
                descriptor: vec![1, 2, 3, 4],
            },
            WalOp::Upsert {
                collection_id: CollectionId(1),
                external_id: "alpha".into(),
                vector: vec![0u8; 32],
                payload: br#"{"k":"v"}"#.to_vec(),
            },
            WalOp::Delete {
                collection_id: CollectionId(1),
                external_id: "alpha".into(),
            },
            WalOp::Checkpoint {
                last_checkpointed_lsn: Lsn(2),
                manifest_version: 5,
            },
            WalOp::DropCollection {
                collection_id: CollectionId(1),
            },
        ]
    }

    /// Wrap `ops` in entries carrying LSNs 1, 2, 3, … in order.
    fn entries_from(ops: &[WalOp]) -> Vec<WalEntry> {
        ops.iter()
            .enumerate()
            .map(|(i, op)| WalEntry {
                lsn: Lsn(i as u64 + 1),
                op: op.clone(),
            })
            .collect()
    }

    /// Create a fresh log at `path` (base LSN 1), append `entries` through the
    /// plaintext codec, and `fsync` once at the end.
    fn write_log(path: &Path, entries: &[WalEntry]) {
        let mut w = WalWriter::create(path, Lsn(1)).unwrap();
        for e in entries {
            w.append(&PlainCodec, e).unwrap();
        }
        w.sync().unwrap();
    }

    #[test]
    fn roundtrips_every_op() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        write_log(&path, &entries);

        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, entries);
        assert_eq!(replay.torn_at, None);
        assert_eq!(replay.base_lsn, Lsn(1));
        assert_eq!(replay.max_lsn(), Some(Lsn(entries.len() as u64)));
    }

    #[test]
    fn empty_log_replays_to_nothing() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        // Header-only file: the writer is kept alive but never appends.
        let _w = WalWriter::create(&path, Lsn(10)).unwrap();
        let replay = read_all(&path, &PlainCodec).unwrap();
        assert!(replay.entries.is_empty());
        assert_eq!(replay.torn_at, None);
        assert_eq!(replay.base_lsn, Lsn(10));
        assert_eq!(replay.max_lsn(), None);
    }

    #[test]
    fn reopen_and_append_continues_the_log() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        // First writer session: two strictly-synced records, then dropped.
        {
            let mut w = WalWriter::create(&path, Lsn(1)).unwrap();
            w.append_sync(&PlainCodec, &entries[0]).unwrap();
            w.append_sync(&PlainCodec, &entries[1]).unwrap();
        }
        // Second session via `open_append` must continue after the first.
        {
            let mut w = WalWriter::open_append(&path).unwrap();
            for e in &entries[2..] {
                w.append(&PlainCodec, e).unwrap();
            }
            w.sync().unwrap();
        }
        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, entries);
        assert_eq!(replay.torn_at, None);
    }

    #[test]
    fn torn_prefix_at_tail_is_dropped() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        write_log(&path, &entries);
        let clean_len = std::fs::metadata(&path).unwrap().len();
        // Append a partial 8-byte frame prefix (only 3 bytes of it).
        {
            let mut f = OpenOptions::new().append(true).open(&path).unwrap();
            f.write_all(&[0xFF, 0xFF, 0xFF]).unwrap();
            f.sync_data().unwrap();
        }
        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, entries);
        // The torn frame starts exactly where the clean log ended.
        assert_eq!(replay.torn_at, Some(clean_len));
    }

    #[test]
    fn torn_payload_at_tail_is_dropped() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        write_log(&path, &entries);
        let clean_len = std::fs::metadata(&path).unwrap().len();
        // Append a frame claiming 100 payload bytes but supply only a few.
        {
            let mut f = OpenOptions::new().append(true).open(&path).unwrap();
            f.write_all(&100u32.to_le_bytes()).unwrap();
            f.write_all(&0u32.to_le_bytes()).unwrap();
            f.write_all(&[1, 2, 3]).unwrap();
            f.sync_data().unwrap();
        }
        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, entries);
        assert_eq!(replay.torn_at, Some(clean_len));
    }

    #[test]
    fn corruption_stops_recovery_point_in_time() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        let entries = entries_from(&sample_ops());
        write_log(&path, &entries);

        // Corrupt a byte inside the *second* record's payload region. The first
        // record must still be recovered; the second and everything after it is
        // treated as a torn tail (point-in-time recovery).
        // The offset math uses the postcard length directly because the
        // plaintext codec writes record bytes verbatim.
        let len0 = postcard::to_allocvec(&entries[0]).unwrap().len() as u64;
        let second_frame_offset = WAL_FILE_HEADER_SIZE as u64 + FRAME_PREFIX_SIZE as u64 + len0;
        let corrupt_at = second_frame_offset + FRAME_PREFIX_SIZE as u64 + 1;

        let mut bytes = std::fs::read(&path).unwrap();
        bytes[corrupt_at as usize] ^= 0xFF;
        std::fs::write(&path, &bytes).unwrap();

        let replay = read_all(&path, &PlainCodec).unwrap();
        assert_eq!(replay.entries, vec![entries[0].clone()]);
        assert_eq!(replay.torn_at, Some(second_frame_offset));
    }

    #[test]
    fn foreign_file_is_rejected_by_magic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("wal-1.log");
        // All-zero header: long enough to read, but the magic is wrong.
        std::fs::write(&path, vec![0u8; WAL_FILE_HEADER_SIZE + 4]).unwrap();
        assert!(matches!(
            read_all(&path, &PlainCodec),
            Err(CoreError::BadMagic { .. })
        ));
    }

    proptest! {
        // Any sequence of ops written cleanly replays exactly, with no torn tail.
        #[test]
        fn entries_roundtrip(seeds in proptest::collection::vec(0u8..5, 0..40)) {
            let ops = sample_ops();
            let entries: Vec<WalEntry> = seeds
                .iter()
                .enumerate()
                .map(|(i, &s)| WalEntry { lsn: Lsn(i as u64 + 1), op: ops[s as usize].clone() })
                .collect();
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("wal.log");
            write_log(&path, &entries);
            let replay = read_all(&path, &PlainCodec).unwrap();
            prop_assert_eq!(replay.entries, entries);
            prop_assert_eq!(replay.torn_at, None);
        }

        // Truncating anywhere after the header must recover exactly the records
        // whose frames end at or before the cut.
        #[test]
        fn truncation_yields_a_clean_prefix(
            seeds in proptest::collection::vec(0u8..5, 1..20),
            cut_num in 0u64..1000,
        ) {
            let ops = sample_ops();
            let entries: Vec<WalEntry> = seeds
                .iter()
                .enumerate()
                .map(|(i, &s)| WalEntry { lsn: Lsn(i as u64 + 1), op: ops[s as usize].clone() })
                .collect();
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("wal.log");
            write_log(&path, &entries);

            let full = std::fs::metadata(&path).unwrap().len();
            // Compute the byte boundary at the end of each record's frame.
            // (Plaintext codec: sealed length == postcard length.)
            let mut frame_ends = Vec::new();
            let mut off = WAL_FILE_HEADER_SIZE as u64;
            for e in &entries {
                off += FRAME_PREFIX_SIZE as u64 + postcard::to_allocvec(e).unwrap().len() as u64;
                frame_ends.push(off);
            }
            // Truncate somewhere in [header, full].
            let cut = WAL_FILE_HEADER_SIZE as u64
                + (cut_num % (full - WAL_FILE_HEADER_SIZE as u64 + 1));
            let f = OpenOptions::new().write(true).open(&path).unwrap();
            f.set_len(cut).unwrap();
            drop(f);

            let replay = read_all(&path, &PlainCodec).unwrap();
            let survivors = frame_ends.iter().filter(|&&end| end <= cut).count();
            prop_assert_eq!(replay.entries.as_slice(), &entries[..survivors]);
            // A clean cut at a frame boundary has no torn tail; otherwise it does.
            let clean = cut == WAL_FILE_HEADER_SIZE as u64 || frame_ends.contains(&cut);
            prop_assert_eq!(replay.torn_at.is_none(), clean);
        }
    }
}