Skip to main content

mentedb_storage/
wal.rs

1//! Write-Ahead Log: append-only log for crash recovery.
2//!
3//! WAL entry format on disk:
4//! ```text
5//! [length: u32][lsn: u64][type: u8][page_id: u64][compressed_data: ...][crc32: u32]
6//! ```
7//!
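//! - `length`: byte count of the payload (lsn + type + page_id + compressed_data).
//! - `compressed_data`: the data portion compressed with LZ4 (`lz4_flex` block
//!   format, prefixed with the 4-byte little-endian uncompressed length).
//! - `crc32`: CRC-32 checksum over the entire payload, i.e. exactly the
//!   `length` bytes between the length prefix and the checksum.
//!
//! All integer fields are encoded little-endian.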
11
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

use mentedb_core::error::{MenteError, MenteResult};
use tracing::{debug, info, trace, warn};
18
/// Log Sequence Number.
///
/// Assigned by [`Wal::append`] from a per-log counter that starts at 1 for an
/// empty log and increases by one for each appended entry.
pub type Lsn = u64;
21
22/// WAL entry type discriminant.
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24#[repr(u8)]
25pub enum WalEntryType {
26    PageWrite = 1,
27    Commit = 2,
28    Checkpoint = 3,
29}
30
31impl TryFrom<u8> for WalEntryType {
32    type Error = MenteError;
33    fn try_from(v: u8) -> MenteResult<Self> {
34        match v {
35            1 => Ok(Self::PageWrite),
36            2 => Ok(Self::Commit),
37            3 => Ok(Self::Checkpoint),
38            _ => Err(MenteError::Storage(format!("invalid WAL entry type: {v}"))),
39        }
40    }
41}
42
/// A single WAL entry (in-memory representation).
///
/// Built by the WAL reader during recovery (see [`Wal::iterate`]). `data`
/// holds the decompressed bytes, not the LZ4-compressed form stored on disk.
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// Log sequence number.
    pub lsn: u64,
    /// The type of WAL operation.
    pub entry_type: WalEntryType,
    /// The page affected by this entry.
    pub page_id: u64,
    /// Serialized payload, already decompressed.
    pub data: Vec<u8>,
    /// CRC32 checksum exactly as stored on disk. It covers the encoded
    /// payload (lsn, type, page_id and the *compressed* data), so it is not a
    /// checksum of `data`.
    pub checksum: u32,
}
57
58/// Append-only write-ahead log.
59pub struct Wal {
60    file: File,
61    next_lsn: u64,
62}
63
64/// Minimum payload size: lsn(8) + type(1) + page_id(8).
65const MIN_PAYLOAD: usize = 17;
66
67impl Wal {
68    /// Open or create a WAL file at `dir_path/wal.log`.
69    pub fn open(dir_path: &Path) -> MenteResult<Self> {
70        let wal_path = dir_path.join("wal.log");
71        let exists = wal_path.exists()
72            && std::fs::metadata(&wal_path)
73                .map(|m| m.len() > 0)
74                .unwrap_or(false);
75
76        let file = OpenOptions::new()
77            .read(true)
78            .write(true)
79            .create(true)
80            .truncate(false)
81            .open(&wal_path)?;
82
83        let mut wal = Self { file, next_lsn: 1 };
84
85        if exists {
86            let entries = wal.read_all_entries()?;
87            if let Some(last) = entries.last() {
88                wal.next_lsn = last.lsn + 1;
89            }
90            info!(
91                next_lsn = wal.next_lsn,
92                entries = entries.len(),
93                "opened existing WAL"
94            );
95        } else {
96            info!("created new WAL");
97        }
98
99        Ok(wal)
100    }
101
102    /// Append an entry to the WAL and return its LSN.
103    pub fn append(
104        &mut self,
105        entry_type: WalEntryType,
106        page_id: u64,
107        data: &[u8],
108    ) -> MenteResult<Lsn> {
109        let lsn = self.next_lsn;
110        self.next_lsn += 1;
111
112        let compressed = lz4_flex::compress_prepend_size(data);
113
114        // Build the payload: lsn + type + page_id + compressed_data
115        let payload_len = 8 + 1 + 8 + compressed.len();
116        let mut payload = Vec::with_capacity(payload_len);
117        payload.extend_from_slice(&lsn.to_le_bytes());
118        payload.push(entry_type as u8);
119        payload.extend_from_slice(&page_id.to_le_bytes());
120        payload.extend_from_slice(&compressed);
121
122        let crc = {
123            let mut h = crc32fast::Hasher::new();
124            h.update(&payload);
125            h.finalize()
126        };
127
128        self.file.seek(SeekFrom::End(0))?;
129        self.file.write_all(&(payload_len as u32).to_le_bytes())?;
130        self.file.write_all(&payload)?;
131        self.file.write_all(&crc.to_le_bytes())?;
132
133        trace!(lsn, page_id, "appended WAL entry");
134        Ok(lsn)
135    }
136
137    /// Flush the WAL to durable storage (fdatasync).
138    pub fn sync(&mut self) -> MenteResult<()> {
139        self.file.sync_data()?;
140        debug!("WAL synced");
141        Ok(())
142    }
143
144    /// Read all valid entries from the WAL for recovery.
145    pub fn iterate(&mut self) -> MenteResult<Vec<WalEntry>> {
146        self.read_all_entries()
147    }
148
149    /// Truncate all entries with LSN **less than** `before_lsn`.
150    pub fn truncate(&mut self, before_lsn: Lsn) -> MenteResult<()> {
151        let entries = self.read_all_entries()?;
152        let to_keep: Vec<&WalEntry> = entries.iter().filter(|e| e.lsn >= before_lsn).collect();
153
154        self.file.seek(SeekFrom::Start(0))?;
155        self.file.set_len(0)?;
156
157        for entry in to_keep {
158            let compressed = lz4_flex::compress_prepend_size(&entry.data);
159
160            let payload_len = 8 + 1 + 8 + compressed.len();
161            let mut payload = Vec::with_capacity(payload_len);
162            payload.extend_from_slice(&entry.lsn.to_le_bytes());
163            payload.push(entry.entry_type as u8);
164            payload.extend_from_slice(&entry.page_id.to_le_bytes());
165            payload.extend_from_slice(&compressed);
166
167            let crc = {
168                let mut h = crc32fast::Hasher::new();
169                h.update(&payload);
170                h.finalize()
171            };
172
173            self.file.write_all(&(payload_len as u32).to_le_bytes())?;
174            self.file.write_all(&payload)?;
175            self.file.write_all(&crc.to_le_bytes())?;
176        }
177
178        self.file.sync_data()?;
179        debug!(before_lsn, "WAL truncated");
180        Ok(())
181    }
182
183    /// Current next LSN (useful for external callers).
184    pub fn next_lsn(&self) -> Lsn {
185        self.next_lsn
186    }
187
188    // ---- internal helpers ----
189
190    fn read_all_entries(&mut self) -> MenteResult<Vec<WalEntry>> {
191        self.file.seek(SeekFrom::Start(0))?;
192        let file_len = self.file.metadata()?.len();
193        let mut offset: u64 = 0;
194        let mut entries = Vec::new();
195
196        while offset + 4 <= file_len {
197            // Read length
198            let mut len_buf = [0u8; 4];
199            if self.file.read_exact(&mut len_buf).is_err() {
200                break;
201            }
202            let payload_len = u32::from_le_bytes(len_buf) as usize;
203            offset += 4;
204
205            if payload_len < MIN_PAYLOAD || offset + payload_len as u64 + 4 > file_len {
206                break;
207            }
208
209            // Read payload
210            let mut payload = vec![0u8; payload_len];
211            if self.file.read_exact(&mut payload).is_err() {
212                break;
213            }
214            offset += payload_len as u64;
215
216            // Read CRC
217            let mut crc_buf = [0u8; 4];
218            if self.file.read_exact(&mut crc_buf).is_err() {
219                break;
220            }
221            let stored_crc = u32::from_le_bytes(crc_buf);
222            offset += 4;
223
224            // Verify CRC
225            let computed_crc = {
226                let mut h = crc32fast::Hasher::new();
227                h.update(&payload);
228                h.finalize()
229            };
230            if computed_crc != stored_crc {
231                break; // Corruption — stop here.
232            }
233
234            // Parse
235            let lsn = u64::from_le_bytes(payload[0..8].try_into().unwrap());
236            let entry_type = match WalEntryType::try_from(payload[8]) {
237                Ok(t) => t,
238                Err(_) => break,
239            };
240            let page_id = u64::from_le_bytes(payload[9..17].try_into().unwrap());
241            let compressed_data = &payload[17..];
242
243            let data = lz4_flex::decompress_size_prepended(compressed_data)
244                .map_err(|e| MenteError::Storage(format!("LZ4 decompress failed: {e}")))?;
245
246            entries.push(WalEntry {
247                lsn,
248                entry_type,
249                page_id,
250                data,
251                checksum: stored_crc,
252            });
253        }
254
255        Ok(entries)
256    }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a temporary directory holding a freshly opened, empty WAL.
    ///
    /// The `TempDir` guard is returned so the directory outlives the test body.
    fn fresh_wal() -> (tempfile::TempDir, Wal) {
        let tmp = tempfile::tempdir().unwrap();
        let log = Wal::open(tmp.path()).unwrap();
        (tmp, log)
    }

    #[test]
    fn test_append_and_iterate() {
        let (_guard, mut log) = fresh_wal();

        let first = log.append(WalEntryType::PageWrite, 1, b"hello").unwrap();
        let second = log.append(WalEntryType::PageWrite, 2, b"world").unwrap();
        assert_eq!((first, second), (1, 2));

        let recovered = log.iterate().unwrap();
        let lsns: Vec<Lsn> = recovered.iter().map(|e| e.lsn).collect();
        assert_eq!(lsns, [1, 2]);
        assert_eq!(recovered[0].data, b"hello");
        assert_eq!(recovered[1].data, b"world");
    }

    #[test]
    fn test_sync() {
        let (_guard, mut log) = fresh_wal();
        log.append(WalEntryType::Commit, 0, b"").unwrap();
        // Only checks that syncing a freshly written log succeeds.
        log.sync().unwrap();
    }

    #[test]
    fn test_truncate() {
        let (_guard, mut log) = fresh_wal();
        for (kind, page, body) in [
            (WalEntryType::PageWrite, 1, &b"a"[..]),
            (WalEntryType::PageWrite, 2, &b"b"[..]),
            (WalEntryType::Checkpoint, 0, &b""[..]),
        ] {
            log.append(kind, page, body).unwrap();
        }

        // LSNs 1 and 2 fall below the cutoff; only the checkpoint (LSN 3) survives.
        log.truncate(3).unwrap();

        let survivors: Vec<Lsn> = log.iterate().unwrap().iter().map(|e| e.lsn).collect();
        assert_eq!(survivors, [3]);
    }

    #[test]
    fn test_recovery_reopen() {
        let tmp = tempfile::tempdir().unwrap();

        // First session: write and sync one entry, then close the handle.
        let mut writer = Wal::open(tmp.path()).unwrap();
        writer
            .append(WalEntryType::PageWrite, 10, b"recovery-data")
            .unwrap();
        writer.sync().unwrap();
        drop(writer);

        // Second session: the entry is replayed and the LSN counter resumes.
        let mut reader = Wal::open(tmp.path()).unwrap();
        assert_eq!(reader.next_lsn(), 2);
        let replayed = reader.iterate().unwrap();
        assert_eq!(replayed.len(), 1);
        let only = &replayed[0];
        assert_eq!(only.page_id, 10);
        assert_eq!(only.data, b"recovery-data");
    }

    #[test]
    fn test_empty_data_entry() {
        let (_guard, mut log) = fresh_wal();
        let lsn = log.append(WalEntryType::Checkpoint, 0, b"").unwrap();

        let recovered = log.iterate().unwrap();
        match recovered.as_slice() {
            [only] => {
                assert_eq!(only.lsn, lsn);
                assert!(only.data.is_empty());
            }
            other => panic!("expected exactly one entry, got {}", other.len()),
        }
    }

    #[test]
    fn test_large_data_compression() {
        let (_guard, mut log) = fresh_wal();
        let payload = [0xABu8; 8192].to_vec();
        log.append(WalEntryType::PageWrite, 5, &payload).unwrap();

        let recovered = log.iterate().unwrap();
        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].data, payload);
    }
}