Skip to main content

mentedb_storage/
wal.rs

1//! Write-Ahead Log: append-only log for crash recovery.
2//!
3//! WAL entry format on disk:
4//! ```text
5//! [length: u32][lsn: u64][type: u8][page_id: u64][compressed_data: ...][crc32: u32]
6//! ```
7//!
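//! - `length`: byte count of the payload (lsn + type + page_id + compressed_data).
//! - `compressed_data`: the data portion compressed with LZ4, in `lz4_flex`'s
//!   size-prepended format (uncompressed length stored first).
//! - `crc32`: CRC-32 checksum over the entire payload (the `length` field is not covered).
//!
//! All integer fields are little-endian.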
11
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15
16use mentedb_core::error::{MenteError, MenteResult};
17use tracing::{debug, info, trace};
18
/// Log Sequence Number: the ordinal handed out for each appended WAL entry.
pub type Lsn = u64;
21
/// Kind of record stored in a WAL frame.
///
/// The discriminant is the single `type` byte written to disk, so the numeric
/// values are part of the file format and must not be renumbered.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WalEntryType {
    /// Page write record (type byte `1`).
    PageWrite = 1,
    /// Reserved for future transaction support; not currently emitted (type byte `2`).
    Commit = 2,
    /// Checkpoint record (type byte `3`).
    Checkpoint = 3,
}
31
32impl TryFrom<u8> for WalEntryType {
33    type Error = MenteError;
34    fn try_from(v: u8) -> MenteResult<Self> {
35        match v {
36            1 => Ok(Self::PageWrite),
37            2 => Ok(Self::Commit),
38            3 => Ok(Self::Checkpoint),
39            _ => Err(MenteError::Storage(format!("invalid WAL entry type: {v}"))),
40        }
41    }
42}
43
44/// A single WAL entry (in-memory representation).
45#[derive(Debug, Clone)]
46pub struct WalEntry {
47    /// Log sequence number.
48    pub lsn: u64,
49    /// The type of WAL operation.
50    pub entry_type: WalEntryType,
51    /// The page affected by this entry.
52    pub page_id: u64,
53    /// Serialized payload.
54    pub data: Vec<u8>,
55    /// CRC32 checksum for integrity verification.
56    pub checksum: u32,
57}
58
/// Append-only write-ahead log stored as `wal.log` inside a directory.
pub struct Wal {
    /// Read/write handle on the log file; also the descriptor the flock is taken on.
    file: File,
    /// Directory that holds `wal.log` (and the temporary file used while truncating).
    dir_path: std::path::PathBuf,
    /// LSN the next appended entry will receive.
    next_lsn: u64,
}
65
/// Size of the fixed payload header: `lsn` (8 bytes) + `type` (1) + `page_id` (8).
/// A frame whose declared payload length is shorter than this is treated as corrupt.
const MIN_PAYLOAD: usize = 8 + 1 + 8;
68
69impl Wal {
70    /// Open or create a WAL file at `dir_path/wal.log`.
71    pub fn open(dir_path: &Path) -> MenteResult<Self> {
72        let wal_path = dir_path.join("wal.log");
73        let exists = wal_path.exists()
74            && std::fs::metadata(&wal_path)
75                .map(|m| m.len() > 0)
76                .unwrap_or(false);
77
78        let file = OpenOptions::new()
79            .read(true)
80            .write(true)
81            .create(true)
82            .truncate(false)
83            .open(&wal_path)?;
84
85        let mut wal = Self {
86            file,
87            dir_path: dir_path.to_path_buf(),
88            next_lsn: 1,
89        };
90
91        if exists {
92            let entries = wal.read_all_entries()?;
93            if let Some(last) = entries.last() {
94                wal.next_lsn = last.lsn + 1;
95            }
96            info!(
97                next_lsn = wal.next_lsn,
98                entries = entries.len(),
99                "opened existing WAL"
100            );
101        } else {
102            info!("created new WAL");
103        }
104
105        Ok(wal)
106    }
107
108    /// Acquire a blocking exclusive file lock on the WAL file.
109    ///
110    /// Uses `flock(2)` (via fs2) which works across processes on the same host.
111    /// Blocks until the lock is available — callers should hold it only for
112    /// the duration of append + fsync.
113    pub fn lock_exclusive(&self) -> MenteResult<()> {
114        use fs2::FileExt;
115        self.file
116            .lock_exclusive()
117            .map_err(|e| MenteError::Storage(format!("WAL flock failed: {e}")))
118    }
119
120    /// Release the file lock on the WAL file.
121    pub fn unlock(&self) -> MenteResult<()> {
122        fs2::FileExt::unlock(&self.file)
123            .map_err(|e| MenteError::Storage(format!("WAL unlock failed: {e}")))
124    }
125
126    /// Re-read the WAL file to find the highest LSN, updating next_lsn.
127    /// Must be called under flock to see writes from other processes.
128    ///
129    /// Fast path: reads each entry's raw payload and CRC-validates it, but
130    /// skips the expensive LZ4 decompression. Only extracts the LSN (first
131    /// 8 bytes of each payload).
132    pub fn reload_lsn(&mut self) -> MenteResult<()> {
133        self.file.seek(SeekFrom::Start(0))?;
134        let file_len = self.file.metadata()?.len();
135        let mut offset: u64 = 0;
136        let mut last_lsn: Option<u64> = None;
137
138        while offset + 4 <= file_len {
139            // Read payload length
140            let mut len_buf = [0u8; 4];
141            if self.file.read_exact(&mut len_buf).is_err() {
142                break;
143            }
144            let payload_len = u32::from_le_bytes(len_buf) as usize;
145            offset += 4;
146
147            if payload_len < MIN_PAYLOAD || offset + payload_len as u64 + 4 > file_len {
148                break;
149            }
150
151            // Read full payload (no decompress) for CRC validation
152            let mut payload = vec![0u8; payload_len];
153            if self.file.read_exact(&mut payload).is_err() {
154                break;
155            }
156            offset += payload_len as u64;
157
158            // Read and verify CRC
159            let mut crc_buf = [0u8; 4];
160            if self.file.read_exact(&mut crc_buf).is_err() {
161                break;
162            }
163            let stored_crc = u32::from_le_bytes(crc_buf);
164            offset += 4;
165
166            let computed_crc = {
167                let mut h = crc32fast::Hasher::new();
168                h.update(&payload);
169                h.finalize()
170            };
171            if computed_crc != stored_crc {
172                break; // Corruption — stop here, same as read_all_entries.
173            }
174
175            // Extract LSN from first 8 bytes of payload
176            let lsn = u64::from_le_bytes(payload[0..8].try_into().unwrap());
177            last_lsn = Some(lsn);
178        }
179
180        self.next_lsn = last_lsn.map_or(1, |l| l + 1);
181        debug!(next_lsn = self.next_lsn, "reloaded WAL LSN (fast scan)");
182        Ok(())
183    }
184
185    /// Append an entry to the WAL and return its LSN.
186    pub fn append(
187        &mut self,
188        entry_type: WalEntryType,
189        page_id: u64,
190        data: &[u8],
191    ) -> MenteResult<Lsn> {
192        let lsn = self.next_lsn;
193        self.next_lsn += 1;
194
195        let compressed = lz4_flex::compress_prepend_size(data);
196
197        // Build the payload: lsn + type + page_id + compressed_data
198        let payload_len = 8 + 1 + 8 + compressed.len();
199        let mut payload = Vec::with_capacity(payload_len);
200        payload.extend_from_slice(&lsn.to_le_bytes());
201        payload.push(entry_type as u8);
202        payload.extend_from_slice(&page_id.to_le_bytes());
203        payload.extend_from_slice(&compressed);
204
205        let crc = {
206            let mut h = crc32fast::Hasher::new();
207            h.update(&payload);
208            h.finalize()
209        };
210
211        self.file.seek(SeekFrom::End(0))?;
212        self.file.write_all(&(payload_len as u32).to_le_bytes())?;
213        self.file.write_all(&payload)?;
214        self.file.write_all(&crc.to_le_bytes())?;
215
216        trace!(lsn, page_id, "appended WAL entry");
217        Ok(lsn)
218    }
219
220    /// Flush the WAL to durable storage (fdatasync).
221    pub fn sync(&mut self) -> MenteResult<()> {
222        self.file.sync_data()?;
223        debug!("WAL synced");
224        Ok(())
225    }
226
227    /// Read all valid entries from the WAL for recovery.
228    pub fn iterate(&mut self) -> MenteResult<Vec<WalEntry>> {
229        self.read_all_entries()
230    }
231
232    /// Truncate all entries with LSN **less than** `before_lsn`.
233    ///
234    /// Uses atomic write-to-temp-then-rename to avoid data loss on crash.
235    pub fn truncate(&mut self, before_lsn: Lsn) -> MenteResult<()> {
236        let entries = self.read_all_entries()?;
237        let to_keep: Vec<&WalEntry> = entries.iter().filter(|e| e.lsn >= before_lsn).collect();
238
239        let wal_path = self.dir_path.join("wal.log");
240        let tmp_path = self.dir_path.join("wal.log.tmp");
241
242        {
243            let mut tmp_file = OpenOptions::new()
244                .write(true)
245                .create(true)
246                .truncate(true)
247                .open(&tmp_path)?;
248
249            for entry in to_keep {
250                let compressed = lz4_flex::compress_prepend_size(&entry.data);
251
252                let payload_len = 8 + 1 + 8 + compressed.len();
253                let mut payload = Vec::with_capacity(payload_len);
254                payload.extend_from_slice(&entry.lsn.to_le_bytes());
255                payload.push(entry.entry_type as u8);
256                payload.extend_from_slice(&entry.page_id.to_le_bytes());
257                payload.extend_from_slice(&compressed);
258
259                let crc = {
260                    let mut h = crc32fast::Hasher::new();
261                    h.update(&payload);
262                    h.finalize()
263                };
264
265                tmp_file.write_all(&(payload_len as u32).to_le_bytes())?;
266                tmp_file.write_all(&payload)?;
267                tmp_file.write_all(&crc.to_le_bytes())?;
268            }
269
270            tmp_file.sync_data()?;
271        }
272
273        std::fs::rename(&tmp_path, &wal_path)?;
274
275        // Reopen the renamed file and re-acquire flock so callers' subsequent
276        // unlock() releases the correct fd.
277        let new_file = OpenOptions::new().read(true).write(true).open(&wal_path)?;
278        fs2::FileExt::lock_exclusive(&new_file)
279            .map_err(|e| MenteError::Storage(format!("WAL flock re-acquire failed: {e}")))?;
280        self.file = new_file;
281
282        debug!(before_lsn, "WAL truncated (atomic)");
283        Ok(())
284    }
285
286    /// Current next LSN (useful for external callers).
287    pub fn next_lsn(&self) -> Lsn {
288        self.next_lsn
289    }
290
291    /// Returns the current WAL file size in bytes.
292    pub fn file_size(&self) -> u64 {
293        self.file.metadata().map(|m| m.len()).unwrap_or(0)
294    }
295
296    // ---- internal helpers ----
297
298    fn read_all_entries(&mut self) -> MenteResult<Vec<WalEntry>> {
299        self.file.seek(SeekFrom::Start(0))?;
300        let file_len = self.file.metadata()?.len();
301        let mut offset: u64 = 0;
302        let mut entries = Vec::new();
303
304        while offset + 4 <= file_len {
305            // Read length
306            let mut len_buf = [0u8; 4];
307            if self.file.read_exact(&mut len_buf).is_err() {
308                break;
309            }
310            let payload_len = u32::from_le_bytes(len_buf) as usize;
311            offset += 4;
312
313            if payload_len < MIN_PAYLOAD || offset + payload_len as u64 + 4 > file_len {
314                break;
315            }
316
317            // Read payload
318            let mut payload = vec![0u8; payload_len];
319            if self.file.read_exact(&mut payload).is_err() {
320                break;
321            }
322            offset += payload_len as u64;
323
324            // Read CRC
325            let mut crc_buf = [0u8; 4];
326            if self.file.read_exact(&mut crc_buf).is_err() {
327                break;
328            }
329            let stored_crc = u32::from_le_bytes(crc_buf);
330            offset += 4;
331
332            // Verify CRC
333            let computed_crc = {
334                let mut h = crc32fast::Hasher::new();
335                h.update(&payload);
336                h.finalize()
337            };
338            if computed_crc != stored_crc {
339                break; // Corruption — stop here.
340            }
341
342            // Parse
343            let lsn = u64::from_le_bytes(payload[0..8].try_into().unwrap());
344            let entry_type = match WalEntryType::try_from(payload[8]) {
345                Ok(t) => t,
346                Err(_) => break,
347            };
348            let page_id = u64::from_le_bytes(payload[9..17].try_into().unwrap());
349            let compressed_data = &payload[17..];
350
351            let data = lz4_flex::decompress_size_prepended(compressed_data)
352                .map_err(|e| MenteError::Storage(format!("LZ4 decompress failed: {e}")))?;
353
354            entries.push(WalEntry {
355                lsn,
356                entry_type,
357                page_id,
358                data,
359                checksum: stored_crc,
360            });
361        }
362
363        Ok(entries)
364    }
365}
366
#[cfg(test)]
mod tests {
    //! Unit tests for the WAL: append/iterate round-trips, reopen recovery,
    //! truncation, multi-handle LSN reload, and stopping at corrupt or
    //! partial trailing frames.
    use super::*;

    fn setup() -> (tempfile::TempDir, Wal) {
        let dir = tempfile::tempdir().unwrap();
        let wal = Wal::open(dir.path()).unwrap();
        (dir, wal)
    }

    #[test]
    fn test_append_and_iterate() {
        let (_dir, mut wal) = setup();

        let lsn1 = wal.append(WalEntryType::PageWrite, 1, b"hello").unwrap();
        let lsn2 = wal.append(WalEntryType::PageWrite, 2, b"world").unwrap();
        assert_eq!(lsn1, 1);
        assert_eq!(lsn2, 2);

        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].lsn, 1);
        assert_eq!(entries[0].data, b"hello");
        assert_eq!(entries[1].lsn, 2);
        assert_eq!(entries[1].data, b"world");
    }

    #[test]
    fn test_sync() {
        let (_dir, mut wal) = setup();
        wal.append(WalEntryType::Commit, 0, b"").unwrap();
        wal.sync().unwrap(); // should not panic
    }

    #[test]
    fn test_truncate() {
        let (_dir, mut wal) = setup();

        wal.append(WalEntryType::PageWrite, 1, b"a").unwrap();
        wal.append(WalEntryType::PageWrite, 2, b"b").unwrap();
        wal.append(WalEntryType::Checkpoint, 0, b"").unwrap();

        // Truncate everything before LSN 3.
        wal.truncate(3).unwrap();

        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].lsn, 3);
    }

    #[test]
    fn test_recovery_reopen() {
        let dir = tempfile::tempdir().unwrap();
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            wal.append(WalEntryType::PageWrite, 10, b"recovery-data")
                .unwrap();
            wal.sync().unwrap();
        }
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            assert_eq!(wal.next_lsn(), 2);
            let entries = wal.iterate().unwrap();
            assert_eq!(entries.len(), 1);
            assert_eq!(entries[0].page_id, 10);
            assert_eq!(entries[0].data, b"recovery-data");
        }
    }

    #[test]
    fn test_empty_data_entry() {
        let (_dir, mut wal) = setup();
        let lsn = wal.append(WalEntryType::Checkpoint, 0, b"").unwrap();
        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].lsn, lsn);
        assert!(entries[0].data.is_empty());
    }

    #[test]
    fn test_large_data_compression() {
        let (_dir, mut wal) = setup();
        let big_data = vec![0xABu8; 8192];
        wal.append(WalEntryType::PageWrite, 5, &big_data).unwrap();

        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].data, big_data);
    }

    #[test]
    fn test_append_then_sync_is_durable() {
        // append() alone does not fsync — callers must call sync() for durability.
        // This matches the group-commit pattern: batch appends, sync once.
        let dir = tempfile::tempdir().unwrap();
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            wal.append(WalEntryType::PageWrite, 1, b"batch1").unwrap();
            wal.append(WalEntryType::PageWrite, 2, b"batch2").unwrap();
            wal.sync().unwrap();
        }
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            let entries = wal.iterate().unwrap();
            assert_eq!(entries.len(), 2);
            assert_eq!(entries[0].data, b"batch1");
            assert_eq!(entries[1].data, b"batch2");
        }
    }

    #[test]
    fn test_truncate_atomic_preserves_kept_entries() {
        let dir = tempfile::tempdir().unwrap();
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            wal.append(WalEntryType::PageWrite, 1, b"old1").unwrap();
            wal.append(WalEntryType::PageWrite, 2, b"old2").unwrap();
            wal.append(WalEntryType::PageWrite, 3, b"keep1").unwrap();
            wal.append(WalEntryType::PageWrite, 4, b"keep2").unwrap();

            wal.truncate(3).unwrap();

            let entries = wal.iterate().unwrap();
            assert_eq!(entries.len(), 2);
            assert_eq!(entries[0].data, b"keep1");
            assert_eq!(entries[1].data, b"keep2");
        }
        // Verify survives reopen
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            let entries = wal.iterate().unwrap();
            assert_eq!(entries.len(), 2);
            assert_eq!(entries[0].lsn, 3);
            assert_eq!(entries[1].lsn, 4);
        }
    }

    #[test]
    fn test_truncate_no_temp_file_left_behind() {
        let dir = tempfile::tempdir().unwrap();
        let mut wal = Wal::open(dir.path()).unwrap();
        wal.append(WalEntryType::PageWrite, 1, b"a").unwrap();
        wal.truncate(2).unwrap();

        // Temp file should not exist after truncation
        assert!(!dir.path().join("wal.log.tmp").exists());
    }

    #[test]
    fn test_append_after_truncate_works() {
        let (_dir, mut wal) = setup();
        wal.append(WalEntryType::PageWrite, 1, b"before").unwrap();
        wal.truncate(2).unwrap();

        // Should be able to append after truncation (file handle is valid)
        wal.append(WalEntryType::PageWrite, 10, b"after").unwrap();
        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].data, b"after");
    }

    #[test]
    fn test_reload_lsn_sees_appends_from_other_handle() {
        // Two handles on the same log, as two processes would have.
        let dir = tempfile::tempdir().unwrap();
        let mut reader = Wal::open(dir.path()).unwrap();
        let mut writer = Wal::open(dir.path()).unwrap();
        writer.append(WalEntryType::PageWrite, 1, b"x").unwrap();
        writer.append(WalEntryType::PageWrite, 2, b"y").unwrap();
        writer.sync().unwrap();

        assert_eq!(reader.next_lsn(), 1);
        reader.reload_lsn().unwrap();
        assert_eq!(reader.next_lsn(), 3);
    }

    #[test]
    fn test_crc_mismatch_stops_recovery() {
        let dir = tempfile::tempdir().unwrap();
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            wal.append(WalEntryType::PageWrite, 1, b"good").unwrap();
            wal.append(WalEntryType::PageWrite, 2, b"bad").unwrap();
            wal.sync().unwrap();
        }
        // Flip one bit in the last entry's trailing CRC.
        let path = dir.path().join("wal.log");
        let mut bytes = std::fs::read(&path).unwrap();
        let last = bytes.len() - 1;
        bytes[last] ^= 0x01;
        std::fs::write(&path, &bytes).unwrap();

        let mut wal = Wal::open(dir.path()).unwrap();
        assert_eq!(wal.next_lsn(), 2);
        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].data, b"good");
    }

    #[test]
    fn test_partial_trailing_entry_ignored_on_reopen() {
        let dir = tempfile::tempdir().unwrap();
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            wal.append(WalEntryType::PageWrite, 1, b"first").unwrap();
            wal.append(WalEntryType::PageWrite, 2, b"second").unwrap();
            wal.sync().unwrap();
        }
        // Simulate a crash mid-write: drop the last 3 bytes of the final frame.
        let path = dir.path().join("wal.log");
        {
            let file = std::fs::OpenOptions::new().write(true).open(&path).unwrap();
            let len = file.metadata().unwrap().len();
            file.set_len(len - 3).unwrap();
        }

        let mut wal = Wal::open(dir.path()).unwrap();
        assert_eq!(wal.next_lsn(), 2);
        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].data, b"first");
    }
}