// mentedb_storage/wal.rs

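//! Write-Ahead Log: append-only log for crash recovery.
//!
//! WAL entry format on disk (all integers little-endian):
//! ```text
//! [length: u32][lsn: u64][type: u8][page_id: u64][compressed_data: ...][crc32: u32]
//! ```
//!
//! - `length`: byte count of the payload (lsn + type + page_id + compressed_data).
//!   It does not include the `length` field itself or the trailing `crc32`.
//! - `compressed_data`: the entry data compressed with LZ4, prefixed with its
//!   uncompressed size (`lz4_flex` size-prepended format).
//! - `crc32`: CRC32 checksum over the entire payload (everything between
//!   `length` and `crc32`).
//!
//! When reading, the log ends at the first record that is incomplete or whose
//! checksum does not match.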
11
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15
16use mentedb_core::error::{MenteError, MenteResult};
17use tracing::{debug, info, trace};
18
/// Log Sequence Number.
///
/// Assigned by [`Wal::append`]: a log with no entries starts at 1, and each
/// appended entry takes the next value.
pub type Lsn = u64;
21
/// WAL entry type discriminant.
///
/// The `repr(u8)` value of each variant is the `type` byte stored on disk, so
/// existing discriminants must never be renumbered or reused.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalEntryType {
    /// Page write record (type byte `1`).
    PageWrite = 1,
    /// Reserved for future transaction support. Not currently emitted.
    /// (Type byte `2`.)
    Commit = 2,
    /// Checkpoint record (type byte `3`).
    Checkpoint = 3,
}
31
32impl TryFrom<u8> for WalEntryType {
33    type Error = MenteError;
34    fn try_from(v: u8) -> MenteResult<Self> {
35        match v {
36            1 => Ok(Self::PageWrite),
37            2 => Ok(Self::Commit),
38            3 => Ok(Self::Checkpoint),
39            _ => Err(MenteError::Storage(format!("invalid WAL entry type: {v}"))),
40        }
41    }
42}
43
/// A single WAL entry (in-memory representation).
///
/// Returned by [`Wal::iterate`] with `data` already decompressed.
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// Log sequence number.
    pub lsn: u64,
    /// The type of WAL operation.
    pub entry_type: WalEntryType,
    /// The page affected by this entry.
    pub page_id: u64,
    /// Serialized payload, already LZ4-decompressed.
    pub data: Vec<u8>,
    /// CRC32 checksum read from disk for integrity verification.
    ///
    /// It covers the encoded on-disk payload (lsn, type, page_id and the
    /// *compressed* data), not the decompressed `data` field.
    pub checksum: u32,
}
58
/// Append-only write-ahead log.
///
/// Backed by the single file `wal.log` inside `dir_path`.
pub struct Wal {
    /// Read/write handle to `dir_path/wal.log`.
    file: File,
    /// Directory holding `wal.log` (and the temporary `wal.log.tmp` written
    /// while truncating).
    dir_path: std::path::PathBuf,
    /// LSN that the next call to `append` will assign.
    next_lsn: u64,
}
65
/// Size in bytes of the fixed payload header that precedes the compressed
/// data: `lsn: u64` (8) + `type: u8` (1) + `page_id: u64` (8).
///
/// A record whose length prefix is smaller than this is treated as invalid
/// when reading the log.
const MIN_PAYLOAD: usize = 8 + 1 + 8;
68
69impl Wal {
70    /// Open or create a WAL file at `dir_path/wal.log`.
71    pub fn open(dir_path: &Path) -> MenteResult<Self> {
72        let wal_path = dir_path.join("wal.log");
73        let exists = wal_path.exists()
74            && std::fs::metadata(&wal_path)
75                .map(|m| m.len() > 0)
76                .unwrap_or(false);
77
78        let file = OpenOptions::new()
79            .read(true)
80            .write(true)
81            .create(true)
82            .truncate(false)
83            .open(&wal_path)?;
84
85        let mut wal = Self {
86            file,
87            dir_path: dir_path.to_path_buf(),
88            next_lsn: 1,
89        };
90
91        if exists {
92            let entries = wal.read_all_entries()?;
93            if let Some(last) = entries.last() {
94                wal.next_lsn = last.lsn + 1;
95            }
96            info!(
97                next_lsn = wal.next_lsn,
98                entries = entries.len(),
99                "opened existing WAL"
100            );
101        } else {
102            info!("created new WAL");
103        }
104
105        Ok(wal)
106    }
107
108    /// Acquire a blocking exclusive file lock on the WAL file.
109    ///
110    /// Uses `flock(2)` (via fs2) which works across processes on the same host.
111    /// Blocks until the lock is available — callers should hold it only for
112    /// the duration of append + fsync.
113    pub fn lock_exclusive(&self) -> MenteResult<()> {
114        use fs2::FileExt;
115        self.file
116            .lock_exclusive()
117            .map_err(|e| MenteError::Storage(format!("WAL flock failed: {e}")))
118    }
119
120    /// Release the file lock on the WAL file.
121    pub fn unlock(&self) -> MenteResult<()> {
122        fs2::FileExt::unlock(&self.file)
123            .map_err(|e| MenteError::Storage(format!("WAL unlock failed: {e}")))
124    }
125
126    /// Re-read the WAL file to find the highest LSN, updating next_lsn.
127    /// Must be called under flock to see writes from other processes.
128    pub fn reload_lsn(&mut self) -> MenteResult<()> {
129        let entries = self.read_all_entries()?;
130        self.next_lsn = entries.last().map_or(1, |e| e.lsn + 1);
131        debug!(next_lsn = self.next_lsn, "reloaded WAL LSN");
132        Ok(())
133    }
134
135    /// Append an entry to the WAL and return its LSN.
136    pub fn append(
137        &mut self,
138        entry_type: WalEntryType,
139        page_id: u64,
140        data: &[u8],
141    ) -> MenteResult<Lsn> {
142        let lsn = self.next_lsn;
143        self.next_lsn += 1;
144
145        let compressed = lz4_flex::compress_prepend_size(data);
146
147        // Build the payload: lsn + type + page_id + compressed_data
148        let payload_len = 8 + 1 + 8 + compressed.len();
149        let mut payload = Vec::with_capacity(payload_len);
150        payload.extend_from_slice(&lsn.to_le_bytes());
151        payload.push(entry_type as u8);
152        payload.extend_from_slice(&page_id.to_le_bytes());
153        payload.extend_from_slice(&compressed);
154
155        let crc = {
156            let mut h = crc32fast::Hasher::new();
157            h.update(&payload);
158            h.finalize()
159        };
160
161        self.file.seek(SeekFrom::End(0))?;
162        self.file.write_all(&(payload_len as u32).to_le_bytes())?;
163        self.file.write_all(&payload)?;
164        self.file.write_all(&crc.to_le_bytes())?;
165
166        trace!(lsn, page_id, "appended WAL entry");
167        Ok(lsn)
168    }
169
170    /// Flush the WAL to durable storage (fdatasync).
171    pub fn sync(&mut self) -> MenteResult<()> {
172        self.file.sync_data()?;
173        debug!("WAL synced");
174        Ok(())
175    }
176
177    /// Read all valid entries from the WAL for recovery.
178    pub fn iterate(&mut self) -> MenteResult<Vec<WalEntry>> {
179        self.read_all_entries()
180    }
181
182    /// Truncate all entries with LSN **less than** `before_lsn`.
183    ///
184    /// Uses atomic write-to-temp-then-rename to avoid data loss on crash.
185    pub fn truncate(&mut self, before_lsn: Lsn) -> MenteResult<()> {
186        let entries = self.read_all_entries()?;
187        let to_keep: Vec<&WalEntry> = entries.iter().filter(|e| e.lsn >= before_lsn).collect();
188
189        let wal_path = self.dir_path.join("wal.log");
190        let tmp_path = self.dir_path.join("wal.log.tmp");
191
192        {
193            let mut tmp_file = OpenOptions::new()
194                .write(true)
195                .create(true)
196                .truncate(true)
197                .open(&tmp_path)?;
198
199            for entry in to_keep {
200                let compressed = lz4_flex::compress_prepend_size(&entry.data);
201
202                let payload_len = 8 + 1 + 8 + compressed.len();
203                let mut payload = Vec::with_capacity(payload_len);
204                payload.extend_from_slice(&entry.lsn.to_le_bytes());
205                payload.push(entry.entry_type as u8);
206                payload.extend_from_slice(&entry.page_id.to_le_bytes());
207                payload.extend_from_slice(&compressed);
208
209                let crc = {
210                    let mut h = crc32fast::Hasher::new();
211                    h.update(&payload);
212                    h.finalize()
213                };
214
215                tmp_file.write_all(&(payload_len as u32).to_le_bytes())?;
216                tmp_file.write_all(&payload)?;
217                tmp_file.write_all(&crc.to_le_bytes())?;
218            }
219
220            tmp_file.sync_data()?;
221        }
222
223        std::fs::rename(&tmp_path, &wal_path)?;
224
225        // Reopen the renamed file and re-acquire flock so callers' subsequent
226        // unlock() releases the correct fd.
227        let new_file = OpenOptions::new().read(true).write(true).open(&wal_path)?;
228        fs2::FileExt::lock_exclusive(&new_file)
229            .map_err(|e| MenteError::Storage(format!("WAL flock re-acquire failed: {e}")))?;
230        self.file = new_file;
231
232        debug!(before_lsn, "WAL truncated (atomic)");
233        Ok(())
234    }
235
236    /// Current next LSN (useful for external callers).
237    pub fn next_lsn(&self) -> Lsn {
238        self.next_lsn
239    }
240
241    // ---- internal helpers ----
242
243    fn read_all_entries(&mut self) -> MenteResult<Vec<WalEntry>> {
244        self.file.seek(SeekFrom::Start(0))?;
245        let file_len = self.file.metadata()?.len();
246        let mut offset: u64 = 0;
247        let mut entries = Vec::new();
248
249        while offset + 4 <= file_len {
250            // Read length
251            let mut len_buf = [0u8; 4];
252            if self.file.read_exact(&mut len_buf).is_err() {
253                break;
254            }
255            let payload_len = u32::from_le_bytes(len_buf) as usize;
256            offset += 4;
257
258            if payload_len < MIN_PAYLOAD || offset + payload_len as u64 + 4 > file_len {
259                break;
260            }
261
262            // Read payload
263            let mut payload = vec![0u8; payload_len];
264            if self.file.read_exact(&mut payload).is_err() {
265                break;
266            }
267            offset += payload_len as u64;
268
269            // Read CRC
270            let mut crc_buf = [0u8; 4];
271            if self.file.read_exact(&mut crc_buf).is_err() {
272                break;
273            }
274            let stored_crc = u32::from_le_bytes(crc_buf);
275            offset += 4;
276
277            // Verify CRC
278            let computed_crc = {
279                let mut h = crc32fast::Hasher::new();
280                h.update(&payload);
281                h.finalize()
282            };
283            if computed_crc != stored_crc {
284                break; // Corruption — stop here.
285            }
286
287            // Parse
288            let lsn = u64::from_le_bytes(payload[0..8].try_into().unwrap());
289            let entry_type = match WalEntryType::try_from(payload[8]) {
290                Ok(t) => t,
291                Err(_) => break,
292            };
293            let page_id = u64::from_le_bytes(payload[9..17].try_into().unwrap());
294            let compressed_data = &payload[17..];
295
296            let data = lz4_flex::decompress_size_prepended(compressed_data)
297                .map_err(|e| MenteError::Storage(format!("LZ4 decompress failed: {e}")))?;
298
299            entries.push(WalEntry {
300                lsn,
301                entry_type,
302                page_id,
303                data,
304                checksum: stored_crc,
305            });
306        }
307
308        Ok(entries)
309    }
310}
311
#[cfg(test)]
mod tests {
    // Unit tests for `Wal`: append/read round-trips, durability across reopen,
    // and the temp-file-plus-rename rewrite performed by `truncate`.
    use super::*;

    /// Create a fresh WAL in a new temporary directory.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive for
    /// as long as it uses the `Wal`.
    fn setup() -> (tempfile::TempDir, Wal) {
        let dir = tempfile::tempdir().unwrap();
        let wal = Wal::open(dir.path()).unwrap();
        (dir, wal)
    }

    #[test]
    fn test_append_and_iterate() {
        // LSNs start at 1 on a new log and entries are read back in append order.
        let (_dir, mut wal) = setup();

        let lsn1 = wal.append(WalEntryType::PageWrite, 1, b"hello").unwrap();
        let lsn2 = wal.append(WalEntryType::PageWrite, 2, b"world").unwrap();
        assert_eq!(lsn1, 1);
        assert_eq!(lsn2, 2);

        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].lsn, 1);
        assert_eq!(entries[0].data, b"hello");
        assert_eq!(entries[1].lsn, 2);
        assert_eq!(entries[1].data, b"world");
    }

    #[test]
    fn test_sync() {
        // Smoke test: sync succeeds after an append.
        let (_dir, mut wal) = setup();
        wal.append(WalEntryType::Commit, 0, b"").unwrap();
        wal.sync().unwrap(); // should not panic
    }

    #[test]
    fn test_truncate() {
        // truncate(n) keeps only entries whose LSN is >= n.
        let (_dir, mut wal) = setup();

        wal.append(WalEntryType::PageWrite, 1, b"a").unwrap();
        wal.append(WalEntryType::PageWrite, 2, b"b").unwrap();
        wal.append(WalEntryType::Checkpoint, 0, b"").unwrap();

        // Truncate everything before LSN 3.
        wal.truncate(3).unwrap();

        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].lsn, 3);
    }

    #[test]
    fn test_recovery_reopen() {
        // Reopening a synced WAL recovers both the entries and next_lsn.
        let dir = tempfile::tempdir().unwrap();
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            wal.append(WalEntryType::PageWrite, 10, b"recovery-data")
                .unwrap();
            wal.sync().unwrap();
        }
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            assert_eq!(wal.next_lsn(), 2);
            let entries = wal.iterate().unwrap();
            assert_eq!(entries.len(), 1);
            assert_eq!(entries[0].page_id, 10);
            assert_eq!(entries[0].data, b"recovery-data");
        }
    }

    #[test]
    fn test_empty_data_entry() {
        // An entry with an empty data slice round-trips as empty.
        let (_dir, mut wal) = setup();
        let lsn = wal.append(WalEntryType::Checkpoint, 0, b"").unwrap();
        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].lsn, lsn);
        assert!(entries[0].data.is_empty());
    }

    #[test]
    fn test_large_data_compression() {
        // A highly compressible 8 KiB buffer survives the LZ4 round-trip intact.
        let (_dir, mut wal) = setup();
        let big_data = vec![0xABu8; 8192];
        wal.append(WalEntryType::PageWrite, 5, &big_data).unwrap();

        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].data, big_data);
    }

    #[test]
    fn test_append_then_sync_is_durable() {
        // append() alone does not fsync — callers must call sync() for durability.
        // This matches the group-commit pattern: batch appends, sync once.
        let dir = tempfile::tempdir().unwrap();
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            wal.append(WalEntryType::PageWrite, 1, b"batch1").unwrap();
            wal.append(WalEntryType::PageWrite, 2, b"batch2").unwrap();
            wal.sync().unwrap();
        }
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            let entries = wal.iterate().unwrap();
            assert_eq!(entries.len(), 2);
            assert_eq!(entries[0].data, b"batch1");
            assert_eq!(entries[1].data, b"batch2");
        }
    }

    #[test]
    fn test_truncate_atomic_preserves_kept_entries() {
        // Entries kept by truncate are readable both through the same handle
        // and after reopening the directory.
        let dir = tempfile::tempdir().unwrap();
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            wal.append(WalEntryType::PageWrite, 1, b"old1").unwrap();
            wal.append(WalEntryType::PageWrite, 2, b"old2").unwrap();
            wal.append(WalEntryType::PageWrite, 3, b"keep1").unwrap();
            wal.append(WalEntryType::PageWrite, 4, b"keep2").unwrap();

            wal.truncate(3).unwrap();

            let entries = wal.iterate().unwrap();
            assert_eq!(entries.len(), 2);
            assert_eq!(entries[0].data, b"keep1");
            assert_eq!(entries[1].data, b"keep2");
        }
        // Verify survives reopen
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            let entries = wal.iterate().unwrap();
            assert_eq!(entries.len(), 2);
            assert_eq!(entries[0].lsn, 3);
            assert_eq!(entries[1].lsn, 4);
        }
    }

    #[test]
    fn test_truncate_no_temp_file_left_behind() {
        // The rename consumes wal.log.tmp, so no temp file remains afterwards.
        let dir = tempfile::tempdir().unwrap();
        let mut wal = Wal::open(dir.path()).unwrap();
        wal.append(WalEntryType::PageWrite, 1, b"a").unwrap();
        wal.truncate(2).unwrap();

        // Temp file should not exist after truncation
        assert!(!dir.path().join("wal.log.tmp").exists());
    }

    #[test]
    fn test_append_after_truncate_works() {
        // After truncate swaps in a new file, the handle keeps appending to it.
        let (_dir, mut wal) = setup();
        wal.append(WalEntryType::PageWrite, 1, b"before").unwrap();
        wal.truncate(2).unwrap();

        // Should be able to append after truncation (file handle is valid)
        wal.append(WalEntryType::PageWrite, 10, b"after").unwrap();
        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].data, b"after");
    }
}
467        wal.append(WalEntryType::PageWrite, 10, b"after").unwrap();
468        let entries = wal.iterate().unwrap();
469        assert_eq!(entries.len(), 1);
470        assert_eq!(entries[0].data, b"after");
471    }
472}