Skip to main content

luci/storage/
wal.rs

1use std::fs::{File, OpenOptions};
2use std::io::{self, BufReader, BufWriter, Read, Write};
3use std::path::Path;
4
5use crate::core::{DocId, LuciError, Result};
6
7use crate::storage::header::xxh3_checksum;
8
/// Byte length of the fixed header written before every WAL record payload:
/// tag (1) + reserved (3) + payload length (4) + payload checksum (8).
const RECORD_HEADER_SIZE: usize = 1 + 3 + 4 + 8;
11
/// Controls when WAL appends are forced to stable storage with `fsync`.
///
/// See [[architecture-storage-format#Durability Modes]].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityMode {
    /// Every `append()` is followed by an `fsync`, so any write that returned
    /// successfully survives a crash. Slowest for writes.
    Full,
    /// `fsync` happens only when `sync()` is called explicitly (normally by
    /// the commit path). This is the default mode: writes not yet committed
    /// may be lost in a crash, but committed data is always durable.
    Batch,
    /// Never `fsync` explicitly; the OS decides when data reaches disk.
    /// Fastest, and appropriate for ephemeral or rebuildable indexes.
    None,
}
28
29/// A single WAL record: either a document put or delete.
30///
31/// See [[architecture-storage-format#Record Format]].
32#[derive(Clone, Debug, PartialEq, Eq)]
33pub enum WalRecord {
34    /// Insert or replace a document.
35    Put {
36        doc_id: DocId,
37        /// Raw document bytes (typically JSON).
38        data: Vec<u8>,
39    },
40    /// Delete a document by ID.
41    Delete { doc_id: DocId },
42}
43
/// Header tag byte identifying a [`WalRecord::Put`] record.
const TAG_PUT: u8 = 0x01;
/// Header tag byte identifying a [`WalRecord::Delete`] record.
const TAG_DELETE: u8 = 0x02;
47
48/// Write-ahead log for buffering document operations between commits.
49///
50/// Appends records to a `.luci.wal` file. Records are self-delimiting with
51/// per-record xxHash checksums, allowing replay to detect and discard
52/// truncated or corrupted trailing records after a crash.
53///
54/// See [[architecture-storage-format#Write-Ahead Log (WAL)]].
55pub struct Wal {
56    writer: BufWriter<File>,
57    mode: DurabilityMode,
58}
59
60impl Wal {
61    /// Open (or create) a WAL file for appending.
62    ///
63    /// New records are appended to the end. If the file already contains
64    /// records from a previous session (crash recovery), they remain intact
65    /// until [`truncate`](Self::truncate) is called.
66    pub fn open(path: impl AsRef<Path>, mode: DurabilityMode) -> Result<Self> {
67        let file = OpenOptions::new()
68            .create(true)
69            .append(true)
70            .open(path.as_ref())?;
71
72        Ok(Self {
73            writer: BufWriter::new(file),
74            mode,
75        })
76    }
77
78    /// Append a record to the WAL.
79    ///
80    /// In `Full` durability mode, the write is fsynced before returning. In
81    /// `Batch` and `None` modes, the record is buffered and only fsynced on
82    /// an explicit call to [`sync`](Self::sync).
83    pub fn append(&mut self, record: &WalRecord) -> Result<()> {
84        let (tag, payload) = encode_payload(record);
85
86        let checksum = xxh3_checksum(&payload);
87
88        // Write header: tag (1) + reserved (3) + payload_length (4) + checksum (8).
89        let mut header = [0u8; RECORD_HEADER_SIZE];
90        header[0] = tag;
91        header[4..8].copy_from_slice(&(payload.len() as u32).to_le_bytes());
92        header[8..16].copy_from_slice(&checksum.to_le_bytes());
93
94        self.writer.write_all(&header)?;
95        self.writer.write_all(&payload)?;
96
97        if self.mode == DurabilityMode::Full {
98            self.writer.flush()?;
99            self.writer.get_ref().sync_all()?;
100        }
101
102        Ok(())
103    }
104
105    /// Flush the buffer and fsync the WAL file.
106    ///
107    /// Called by the commit path (in `Batch` mode) to ensure all appended
108    /// records are durable before proceeding with the atomic commit.
109    pub fn sync(&mut self) -> Result<()> {
110        self.writer.flush()?;
111        self.writer.get_ref().sync_all()?;
112        Ok(())
113    }
114
115    /// Truncate the WAL to zero bytes.
116    ///
117    /// Called after a successful commit to discard all buffered records (they
118    /// are now persisted in the main `.luci` file as a committed segment).
119    pub fn truncate(&mut self) -> Result<()> {
120        self.writer.flush()?;
121        self.writer.get_ref().set_len(0)?;
122        // In append mode, the next write goes to offset 0 (the new EOF).
123        Ok(())
124    }
125
126    /// The current durability mode.
127    pub fn mode(&self) -> DurabilityMode {
128        self.mode
129    }
130}
131
132/// Replay a WAL file, returning all valid records.
133///
134/// Reads records sequentially from the beginning of the file. Stops at the
135/// first truncated or checksum-invalid record — all preceding records are
136/// returned. This handles crash recovery: a crash mid-write leaves a partial
137/// record at the end that is safely discarded.
138///
139/// Returns an empty `Vec` if the file does not exist or is empty.
140///
141/// See [[architecture-storage-format#WAL Lifecycle]].
142pub fn replay_wal(path: impl AsRef<Path>) -> Result<Vec<WalRecord>> {
143    let file = match File::open(path.as_ref()) {
144        Ok(f) => f,
145        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
146        Err(e) => return Err(LuciError::Io(e)),
147    };
148
149    let mut reader = BufReader::new(file);
150    let mut records = Vec::new();
151
152    loop {
153        // Read record header.
154        let mut header = [0u8; RECORD_HEADER_SIZE];
155        match reader.read_exact(&mut header) {
156            Ok(()) => {}
157            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
158            Err(e) => return Err(LuciError::Io(e)),
159        }
160
161        let tag = header[0];
162        let payload_length = u32::from_le_bytes(header[4..8].try_into().unwrap()) as usize;
163        let stored_checksum = u64::from_le_bytes(header[8..16].try_into().unwrap());
164
165        // Read payload.
166        let mut payload = vec![0u8; payload_length];
167        match reader.read_exact(&mut payload) {
168            Ok(()) => {}
169            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
170            Err(e) => return Err(LuciError::Io(e)),
171        }
172
173        // Validate checksum.
174        if xxh3_checksum(&payload) != stored_checksum {
175            break;
176        }
177
178        // Decode record.
179        match decode_record(tag, &payload) {
180            Some(record) => records.push(record),
181            None => break,
182        }
183    }
184
185    Ok(records)
186}
187
188fn encode_payload(record: &WalRecord) -> (u8, Vec<u8>) {
189    match record {
190        WalRecord::Put { doc_id, data } => {
191            let mut payload = Vec::with_capacity(4 + data.len());
192            payload.extend_from_slice(&doc_id.as_u32().to_le_bytes());
193            payload.extend_from_slice(data);
194            (TAG_PUT, payload)
195        }
196        WalRecord::Delete { doc_id } => {
197            let payload = doc_id.as_u32().to_le_bytes().to_vec();
198            (TAG_DELETE, payload)
199        }
200    }
201}
202
203fn decode_record(tag: u8, payload: &[u8]) -> Option<WalRecord> {
204    match tag {
205        TAG_PUT => {
206            if payload.len() < 4 {
207                return None;
208            }
209            let doc_id = DocId::new(u32::from_le_bytes(payload[0..4].try_into().unwrap()));
210            let data = payload[4..].to_vec();
211            Some(WalRecord::Put { doc_id, data })
212        }
213        TAG_DELETE => {
214            if payload.len() < 4 {
215                return None;
216            }
217            let doc_id = DocId::new(u32::from_le_bytes(payload[0..4].try_into().unwrap()));
218            Some(WalRecord::Delete { doc_id })
219        }
220        _ => None,
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::{Seek, SeekFrom};

    /// Scratch path for a test WAL file, in a per-process temp directory.
    fn test_path(name: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!("luci_wal_test_{}", std::process::id()));
        fs::create_dir_all(&dir).unwrap();
        dir.join(name)
    }

    #[test]
    fn write_and_replay_put() {
        let path = test_path("put.wal");
        let _ = fs::remove_file(&path);

        {
            let mut wal = Wal::open(&path, DurabilityMode::Batch).unwrap();
            wal.append(&WalRecord::Put {
                doc_id: DocId::new(1),
                data: b"hello".to_vec(),
            })
            .unwrap();
            wal.sync().unwrap();
        }

        let records = replay_wal(&path).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0],
            WalRecord::Put {
                doc_id: DocId::new(1),
                data: b"hello".to_vec(),
            }
        );

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn write_and_replay_delete() {
        let path = test_path("delete.wal");
        let _ = fs::remove_file(&path);

        {
            let mut wal = Wal::open(&path, DurabilityMode::Batch).unwrap();
            wal.append(&WalRecord::Delete {
                doc_id: DocId::new(42),
            })
            .unwrap();
            wal.sync().unwrap();
        }

        let records = replay_wal(&path).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0],
            WalRecord::Delete {
                doc_id: DocId::new(42),
            }
        );

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn multiple_records() {
        let path = test_path("multi.wal");
        let _ = fs::remove_file(&path);

        {
            let mut wal = Wal::open(&path, DurabilityMode::Batch).unwrap();
            for i in 0..100 {
                wal.append(&WalRecord::Put {
                    doc_id: DocId::new(i),
                    data: format!("doc-{i}").into_bytes(),
                })
                .unwrap();
            }
            wal.append(&WalRecord::Delete {
                doc_id: DocId::new(50),
            })
            .unwrap();
            wal.sync().unwrap();
        }

        let records = replay_wal(&path).unwrap();
        assert_eq!(records.len(), 101);

        // Check first and last.
        assert_eq!(
            records[0],
            WalRecord::Put {
                doc_id: DocId::new(0),
                data: b"doc-0".to_vec(),
            }
        );
        assert_eq!(
            records[100],
            WalRecord::Delete {
                doc_id: DocId::new(50),
            }
        );

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn truncate_clears_wal() {
        let path = test_path("truncate.wal");
        let _ = fs::remove_file(&path);

        {
            let mut wal = Wal::open(&path, DurabilityMode::Batch).unwrap();
            wal.append(&WalRecord::Put {
                doc_id: DocId::new(1),
                data: b"data".to_vec(),
            })
            .unwrap();
            wal.sync().unwrap();
            wal.truncate().unwrap();
        }

        let records = replay_wal(&path).unwrap();
        assert!(records.is_empty());

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn truncated_header_is_discarded() {
        let path = test_path("trunc_header.wal");
        let _ = fs::remove_file(&path);

        // Write a valid record then append a partial header.
        {
            let mut wal = Wal::open(&path, DurabilityMode::Batch).unwrap();
            wal.append(&WalRecord::Put {
                doc_id: DocId::new(1),
                data: b"valid".to_vec(),
            })
            .unwrap();
            wal.sync().unwrap();
        }

        // Append a partial header (less than 16 bytes).
        {
            let mut file = OpenOptions::new().append(true).open(&path).unwrap();
            file.write_all(&[0u8; 8]).unwrap();
        }

        let records = replay_wal(&path).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0],
            WalRecord::Put {
                doc_id: DocId::new(1),
                data: b"valid".to_vec(),
            }
        );

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn truncated_payload_is_discarded() {
        let path = test_path("trunc_payload.wal");
        let _ = fs::remove_file(&path);

        // Write two valid records.
        {
            let mut wal = Wal::open(&path, DurabilityMode::Batch).unwrap();
            wal.append(&WalRecord::Put {
                doc_id: DocId::new(1),
                data: b"first".to_vec(),
            })
            .unwrap();
            wal.append(&WalRecord::Put {
                doc_id: DocId::new(2),
                data: b"second".to_vec(),
            })
            .unwrap();
            wal.sync().unwrap();
        }

        // Truncate the file mid-way through the second record's payload.
        {
            let meta = fs::metadata(&path).unwrap();
            let first_record_size = RECORD_HEADER_SIZE + 4 + 5; // header + doc_id + "first"
            // Keep first record + second record's header + 2 payload bytes.
            let truncated_len = first_record_size + RECORD_HEADER_SIZE + 2;
            assert!(truncated_len < meta.len() as usize);
            let file = OpenOptions::new().write(true).open(&path).unwrap();
            file.set_len(truncated_len as u64).unwrap();
        }

        let records = replay_wal(&path).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0],
            WalRecord::Put {
                doc_id: DocId::new(1),
                data: b"first".to_vec(),
            }
        );

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn corrupted_checksum_stops_replay() {
        let path = test_path("corrupt_checksum.wal");
        let _ = fs::remove_file(&path);

        {
            let mut wal = Wal::open(&path, DurabilityMode::Batch).unwrap();
            wal.append(&WalRecord::Put {
                doc_id: DocId::new(1),
                data: b"good".to_vec(),
            })
            .unwrap();
            wal.append(&WalRecord::Put {
                doc_id: DocId::new(2),
                data: b"will-be-corrupted".to_vec(),
            })
            .unwrap();
            wal.append(&WalRecord::Put {
                doc_id: DocId::new(3),
                data: b"after-corrupt".to_vec(),
            })
            .unwrap();
            wal.sync().unwrap();
        }

        // Corrupt the checksum of the second record.
        {
            let first_record_size = RECORD_HEADER_SIZE + 4 + 4; // header + doc_id + "good"
            let checksum_offset = first_record_size + 8; // second record's checksum field
            let mut file = OpenOptions::new()
                .read(true)
                .write(true)
                .open(&path)
                .unwrap();
            let mut buf = [0u8; 1];
            file.seek(SeekFrom::Start(checksum_offset as u64)).unwrap();
            file.read_exact(&mut buf).unwrap();
            buf[0] ^= 0xFF;
            file.seek(SeekFrom::Start(checksum_offset as u64)).unwrap();
            file.write_all(&buf).unwrap();
        }

        // Only the first record should be recovered.
        let records = replay_wal(&path).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0],
            WalRecord::Put {
                doc_id: DocId::new(1),
                data: b"good".to_vec(),
            }
        );

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn replay_nonexistent_file_returns_empty() {
        let path = test_path("nonexistent.wal");
        let _ = fs::remove_file(&path);
        let records = replay_wal(&path).unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn replay_empty_file() {
        let path = test_path("empty.wal");
        let _ = fs::remove_file(&path);
        File::create(&path).unwrap();

        let records = replay_wal(&path).unwrap();
        assert!(records.is_empty());

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn full_durability_mode() {
        let path = test_path("full_mode.wal");
        let _ = fs::remove_file(&path);

        // Full mode: each append flushes and fsyncs.
        let mut wal = Wal::open(&path, DurabilityMode::Full).unwrap();
        assert_eq!(wal.mode(), DurabilityMode::Full);

        wal.append(&WalRecord::Put {
            doc_id: DocId::new(1),
            data: b"durable".to_vec(),
        })
        .unwrap();

        // Replay while the Wal is still alive. Dropping it first would flush
        // the BufWriter and make this pass even if Full mode never flushed.
        let records = replay_wal(&path).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0],
            WalRecord::Put {
                doc_id: DocId::new(1),
                data: b"durable".to_vec(),
            }
        );
        drop(wal);

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn append_after_truncate() {
        let path = test_path("append_after_trunc.wal");
        let _ = fs::remove_file(&path);

        let mut wal = Wal::open(&path, DurabilityMode::Batch).unwrap();

        wal.append(&WalRecord::Put {
            doc_id: DocId::new(1),
            data: b"before".to_vec(),
        })
        .unwrap();
        wal.sync().unwrap();
        wal.truncate().unwrap();

        wal.append(&WalRecord::Put {
            doc_id: DocId::new(2),
            data: b"after".to_vec(),
        })
        .unwrap();
        wal.sync().unwrap();
        drop(wal);

        let records = replay_wal(&path).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0],
            WalRecord::Put {
                doc_id: DocId::new(2),
                data: b"after".to_vec(),
            }
        );

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn large_payload() {
        let path = test_path("large_payload.wal");
        let _ = fs::remove_file(&path);

        let big_data = vec![0xCDu8; 1_000_000];

        {
            let mut wal = Wal::open(&path, DurabilityMode::Batch).unwrap();
            wal.append(&WalRecord::Put {
                doc_id: DocId::new(1),
                data: big_data.clone(),
            })
            .unwrap();
            wal.sync().unwrap();
        }

        let records = replay_wal(&path).unwrap();
        assert_eq!(records.len(), 1);
        match &records[0] {
            WalRecord::Put { doc_id, data } => {
                assert_eq!(*doc_id, DocId::new(1));
                assert_eq!(data, &big_data);
            }
            _ => panic!("expected Put"),
        }

        fs::remove_file(&path).unwrap();
    }
}