Skip to main content

mentedb_storage/
wal.rs

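//! Write-Ahead Log — append-only log for crash recovery.
//!
//! WAL entry format on disk (all integers little-endian):
//! ```text
//! [length: u32][lsn: u64][type: u8][page_id: u64][compressed_data: ...][crc32: u32]
//! ```
//!
//! - `length` — byte count of the payload (lsn + type + page_id + compressed_data);
//!   it excludes the `length` field itself and the trailing `crc32`.
//! - `compressed_data` — the entry data compressed with LZ4 in `lz4_flex`'s
//!   size-prepended format (a 4-byte uncompressed length followed by the block).
//! - `crc32` — checksum over the entire payload (the `length` prefix is not covered).
//!
//! When reading, the log is scanned from the start. Scanning stops at the first
//! entry that is truncated, has an impossible length, fails its CRC check, or
//! carries an unknown entry type.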
11
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;

use mentedb_core::error::{MenteError, MenteResult};
use tracing::{debug, info, trace, warn};
18
/// Log Sequence Number.
///
/// [`Wal::append`] hands these out in increasing order, one per entry,
/// starting from 1 for a newly created log.
pub type Lsn = u64;
21
22/// WAL entry type discriminant.
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24#[repr(u8)]
25pub enum WalEntryType {
26    PageWrite = 1,
27    Commit = 2,
28    Checkpoint = 3,
29}
30
31impl TryFrom<u8> for WalEntryType {
32    type Error = MenteError;
33    fn try_from(v: u8) -> MenteResult<Self> {
34        match v {
35            1 => Ok(Self::PageWrite),
36            2 => Ok(Self::Commit),
37            3 => Ok(Self::Checkpoint),
38            _ => Err(MenteError::Storage(format!("invalid WAL entry type: {v}"))),
39        }
40    }
41}
42
/// A single WAL entry (in-memory representation), as decoded from the log
/// file when the WAL is read back.
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// Sequence number assigned when the entry was appended.
    /// NOTE(review): could be typed as `Lsn` for consistency with `Wal::append`.
    pub lsn: u64,
    /// Kind of entry, decoded from the on-disk `type` byte.
    pub entry_type: WalEntryType,
    /// Page identifier stored alongside the entry.
    pub page_id: u64,
    /// Entry payload after LZ4 decompression, i.e. the bytes originally passed
    /// to `Wal::append`.
    pub data: Vec<u8>,
    /// CRC32 read from disk for this entry. It covers the encoded payload
    /// (lsn + type + page_id + compressed data), not `data` itself.
    pub checksum: u32,
}
52
/// Append-only write-ahead log stored in a single `wal.log` file inside the
/// directory passed to `Wal::open`.
pub struct Wal {
    /// Handle to `wal.log`, opened for both reading and writing.
    file: File,
    /// LSN that the next call to `append` will assign.
    next_lsn: u64,
}
58
/// Smallest payload a valid entry can have: the fixed header fields that
/// precede the compressed data — `lsn` (u64) + `type` (u8) + `page_id` (u64),
/// 17 bytes in total.
const MIN_PAYLOAD: usize =
    std::mem::size_of::<u64>() + std::mem::size_of::<u8>() + std::mem::size_of::<u64>();
61
62impl Wal {
63    /// Open or create a WAL file at `dir_path/wal.log`.
64    pub fn open(dir_path: &Path) -> MenteResult<Self> {
65        let wal_path = dir_path.join("wal.log");
66        let exists = wal_path.exists()
67            && std::fs::metadata(&wal_path)
68                .map(|m| m.len() > 0)
69                .unwrap_or(false);
70
71        let file = OpenOptions::new()
72            .read(true)
73            .write(true)
74            .create(true)
75            .truncate(false)
76            .open(&wal_path)?;
77
78        let mut wal = Self { file, next_lsn: 1 };
79
80        if exists {
81            let entries = wal.read_all_entries()?;
82            if let Some(last) = entries.last() {
83                wal.next_lsn = last.lsn + 1;
84            }
85            info!(
86                next_lsn = wal.next_lsn,
87                entries = entries.len(),
88                "opened existing WAL"
89            );
90        } else {
91            info!("created new WAL");
92        }
93
94        Ok(wal)
95    }
96
97    /// Append an entry to the WAL and return its LSN.
98    pub fn append(
99        &mut self,
100        entry_type: WalEntryType,
101        page_id: u64,
102        data: &[u8],
103    ) -> MenteResult<Lsn> {
104        let lsn = self.next_lsn;
105        self.next_lsn += 1;
106
107        let compressed = lz4_flex::compress_prepend_size(data);
108
109        // Build the payload: lsn + type + page_id + compressed_data
110        let payload_len = 8 + 1 + 8 + compressed.len();
111        let mut payload = Vec::with_capacity(payload_len);
112        payload.extend_from_slice(&lsn.to_le_bytes());
113        payload.push(entry_type as u8);
114        payload.extend_from_slice(&page_id.to_le_bytes());
115        payload.extend_from_slice(&compressed);
116
117        let crc = {
118            let mut h = crc32fast::Hasher::new();
119            h.update(&payload);
120            h.finalize()
121        };
122
123        self.file.seek(SeekFrom::End(0))?;
124        self.file.write_all(&(payload_len as u32).to_le_bytes())?;
125        self.file.write_all(&payload)?;
126        self.file.write_all(&crc.to_le_bytes())?;
127
128        trace!(lsn, page_id, "appended WAL entry");
129        Ok(lsn)
130    }
131
132    /// Flush the WAL to durable storage (fdatasync).
133    pub fn sync(&mut self) -> MenteResult<()> {
134        self.file.sync_data()?;
135        debug!("WAL synced");
136        Ok(())
137    }
138
139    /// Read all valid entries from the WAL for recovery.
140    pub fn iterate(&mut self) -> MenteResult<Vec<WalEntry>> {
141        self.read_all_entries()
142    }
143
144    /// Truncate all entries with LSN **less than** `before_lsn`.
145    pub fn truncate(&mut self, before_lsn: Lsn) -> MenteResult<()> {
146        let entries = self.read_all_entries()?;
147        let to_keep: Vec<&WalEntry> = entries.iter().filter(|e| e.lsn >= before_lsn).collect();
148
149        self.file.seek(SeekFrom::Start(0))?;
150        self.file.set_len(0)?;
151
152        for entry in to_keep {
153            let compressed = lz4_flex::compress_prepend_size(&entry.data);
154
155            let payload_len = 8 + 1 + 8 + compressed.len();
156            let mut payload = Vec::with_capacity(payload_len);
157            payload.extend_from_slice(&entry.lsn.to_le_bytes());
158            payload.push(entry.entry_type as u8);
159            payload.extend_from_slice(&entry.page_id.to_le_bytes());
160            payload.extend_from_slice(&compressed);
161
162            let crc = {
163                let mut h = crc32fast::Hasher::new();
164                h.update(&payload);
165                h.finalize()
166            };
167
168            self.file.write_all(&(payload_len as u32).to_le_bytes())?;
169            self.file.write_all(&payload)?;
170            self.file.write_all(&crc.to_le_bytes())?;
171        }
172
173        self.file.sync_data()?;
174        debug!(before_lsn, "WAL truncated");
175        Ok(())
176    }
177
178    /// Current next LSN (useful for external callers).
179    pub fn next_lsn(&self) -> Lsn {
180        self.next_lsn
181    }
182
183    // ---- internal helpers ----
184
185    fn read_all_entries(&mut self) -> MenteResult<Vec<WalEntry>> {
186        self.file.seek(SeekFrom::Start(0))?;
187        let file_len = self.file.metadata()?.len();
188        let mut offset: u64 = 0;
189        let mut entries = Vec::new();
190
191        while offset + 4 <= file_len {
192            // Read length
193            let mut len_buf = [0u8; 4];
194            if self.file.read_exact(&mut len_buf).is_err() {
195                break;
196            }
197            let payload_len = u32::from_le_bytes(len_buf) as usize;
198            offset += 4;
199
200            if payload_len < MIN_PAYLOAD || offset + payload_len as u64 + 4 > file_len {
201                break;
202            }
203
204            // Read payload
205            let mut payload = vec![0u8; payload_len];
206            if self.file.read_exact(&mut payload).is_err() {
207                break;
208            }
209            offset += payload_len as u64;
210
211            // Read CRC
212            let mut crc_buf = [0u8; 4];
213            if self.file.read_exact(&mut crc_buf).is_err() {
214                break;
215            }
216            let stored_crc = u32::from_le_bytes(crc_buf);
217            offset += 4;
218
219            // Verify CRC
220            let computed_crc = {
221                let mut h = crc32fast::Hasher::new();
222                h.update(&payload);
223                h.finalize()
224            };
225            if computed_crc != stored_crc {
226                break; // Corruption — stop here.
227            }
228
229            // Parse
230            let lsn = u64::from_le_bytes(payload[0..8].try_into().unwrap());
231            let entry_type = match WalEntryType::try_from(payload[8]) {
232                Ok(t) => t,
233                Err(_) => break,
234            };
235            let page_id = u64::from_le_bytes(payload[9..17].try_into().unwrap());
236            let compressed_data = &payload[17..];
237
238            let data = lz4_flex::decompress_size_prepended(compressed_data)
239                .map_err(|e| MenteError::Storage(format!("LZ4 decompress failed: {e}")))?;
240
241            entries.push(WalEntry {
242                lsn,
243                entry_type,
244                page_id,
245                data,
246                checksum: stored_crc,
247            });
248        }
249
250        Ok(entries)
251    }
252}
253
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a WAL inside a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the WAL so the directory is not
    /// deleted while the test is still using it.
    fn setup() -> (tempfile::TempDir, Wal) {
        let tmp = tempfile::tempdir().expect("failed to create temp dir");
        let log = Wal::open(tmp.path()).expect("failed to open WAL");
        (tmp, log)
    }

    #[test]
    fn test_append_and_iterate() {
        let (_tmp, mut log) = setup();

        let assigned = [
            log.append(WalEntryType::PageWrite, 1, b"hello").unwrap(),
            log.append(WalEntryType::PageWrite, 2, b"world").unwrap(),
        ];
        assert_eq!(assigned, [1, 2]);

        let decoded: Vec<(Lsn, Vec<u8>)> = log
            .iterate()
            .unwrap()
            .into_iter()
            .map(|entry| (entry.lsn, entry.data))
            .collect();
        assert_eq!(
            decoded,
            vec![(1, b"hello".to_vec()), (2, b"world".to_vec())]
        );
    }

    #[test]
    fn test_sync() {
        let (_tmp, mut log) = setup();
        log.append(WalEntryType::Commit, 0, b"").unwrap();
        // Only checks that syncing an open log succeeds.
        log.sync().unwrap();
    }

    #[test]
    fn test_truncate() {
        let (_tmp, mut log) = setup();

        let writes = [
            (WalEntryType::PageWrite, 1, &b"a"[..]),
            (WalEntryType::PageWrite, 2, &b"b"[..]),
            (WalEntryType::Checkpoint, 0, &b""[..]),
        ];
        for (kind, page, bytes) in writes {
            log.append(kind, page, bytes).unwrap();
        }

        // Drop everything older than LSN 3 (the checkpoint).
        log.truncate(3).unwrap();

        let remaining: Vec<Lsn> = log.iterate().unwrap().iter().map(|e| e.lsn).collect();
        assert_eq!(remaining, vec![3]);
    }

    #[test]
    fn test_recovery_reopen() {
        let dir = tempfile::tempdir().unwrap();

        // First session: write one entry, make it durable, then close the WAL.
        let mut writer = Wal::open(dir.path()).unwrap();
        writer
            .append(WalEntryType::PageWrite, 10, b"recovery-data")
            .unwrap();
        writer.sync().unwrap();
        drop(writer);

        // Second session: the entry is recovered and the LSN counter resumes.
        let mut reader = Wal::open(dir.path()).unwrap();
        assert_eq!(reader.next_lsn(), 2);
        let entries = reader.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        let only = &entries[0];
        assert_eq!(only.page_id, 10);
        assert_eq!(only.data, b"recovery-data");
    }

    #[test]
    fn test_empty_data_entry() {
        let (_tmp, mut log) = setup();
        let lsn = log.append(WalEntryType::Checkpoint, 0, b"").unwrap();
        match log.iterate().unwrap().as_slice() {
            [entry] => {
                assert_eq!(entry.lsn, lsn);
                assert!(entry.data.is_empty());
            }
            other => panic!("expected exactly one entry, got {}", other.len()),
        }
    }

    #[test]
    fn test_large_data_compression() {
        let (_tmp, mut log) = setup();
        let payload: Vec<u8> = std::iter::repeat(0xAB).take(8192).collect();
        log.append(WalEntryType::PageWrite, 5, &payload).unwrap();

        let entries = log.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].data, payload);
    }
}