1use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15
16use mentedb_core::error::{MenteError, MenteResult};
17use tracing::{debug, info, trace};
18
/// Log sequence number: the 64-bit identifier assigned to each WAL entry.
pub type Lsn = u64;
21
/// Kind tag stored in every WAL record.
///
/// The explicit discriminants are the single-byte values written to disk
/// (the enum is cast with `as u8` when a record is encoded), so they must
/// stay stable.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WalEntryType {
    /// On-disk tag `1`.
    PageWrite = 1,
    /// On-disk tag `2`.
    Commit = 2,
    /// On-disk tag `3`.
    Checkpoint = 3,
}
31
32impl TryFrom<u8> for WalEntryType {
33 type Error = MenteError;
34 fn try_from(v: u8) -> MenteResult<Self> {
35 match v {
36 1 => Ok(Self::PageWrite),
37 2 => Ok(Self::Commit),
38 3 => Ok(Self::Checkpoint),
39 _ => Err(MenteError::Storage(format!("invalid WAL entry type: {v}"))),
40 }
41 }
42}
43
44#[derive(Debug, Clone)]
46pub struct WalEntry {
47 pub lsn: u64,
49 pub entry_type: WalEntryType,
51 pub page_id: u64,
53 pub data: Vec<u8>,
55 pub checksum: u32,
57}
58
/// Handle to the write-ahead log file `wal.log` inside a directory.
pub struct Wal {
    /// Open read/write handle to the log file.
    file: File,
    /// Directory that contains `wal.log` (and the temporary file used while
    /// the log is being rewritten).
    dir_path: std::path::PathBuf,
    /// LSN that will be handed to the next appended entry.
    next_lsn: u64,
}
65
66const MIN_PAYLOAD: usize = 17;
68
69impl Wal {
70 pub fn open(dir_path: &Path) -> MenteResult<Self> {
72 let wal_path = dir_path.join("wal.log");
73 let exists = wal_path.exists()
74 && std::fs::metadata(&wal_path)
75 .map(|m| m.len() > 0)
76 .unwrap_or(false);
77
78 let file = OpenOptions::new()
79 .read(true)
80 .write(true)
81 .create(true)
82 .truncate(false)
83 .open(&wal_path)?;
84
85 let mut wal = Self {
86 file,
87 dir_path: dir_path.to_path_buf(),
88 next_lsn: 1,
89 };
90
91 if exists {
92 let entries = wal.read_all_entries()?;
93 if let Some(last) = entries.last() {
94 wal.next_lsn = last.lsn + 1;
95 }
96 info!(
97 next_lsn = wal.next_lsn,
98 entries = entries.len(),
99 "opened existing WAL"
100 );
101 } else {
102 info!("created new WAL");
103 }
104
105 Ok(wal)
106 }
107
108 pub fn append(
110 &mut self,
111 entry_type: WalEntryType,
112 page_id: u64,
113 data: &[u8],
114 ) -> MenteResult<Lsn> {
115 let lsn = self.next_lsn;
116 self.next_lsn += 1;
117
118 let compressed = lz4_flex::compress_prepend_size(data);
119
120 let payload_len = 8 + 1 + 8 + compressed.len();
122 let mut payload = Vec::with_capacity(payload_len);
123 payload.extend_from_slice(&lsn.to_le_bytes());
124 payload.push(entry_type as u8);
125 payload.extend_from_slice(&page_id.to_le_bytes());
126 payload.extend_from_slice(&compressed);
127
128 let crc = {
129 let mut h = crc32fast::Hasher::new();
130 h.update(&payload);
131 h.finalize()
132 };
133
134 self.file.seek(SeekFrom::End(0))?;
135 self.file.write_all(&(payload_len as u32).to_le_bytes())?;
136 self.file.write_all(&payload)?;
137 self.file.write_all(&crc.to_le_bytes())?;
138
139 trace!(lsn, page_id, "appended WAL entry");
140 Ok(lsn)
141 }
142
143 pub fn sync(&mut self) -> MenteResult<()> {
145 self.file.sync_data()?;
146 debug!("WAL synced");
147 Ok(())
148 }
149
150 pub fn iterate(&mut self) -> MenteResult<Vec<WalEntry>> {
152 self.read_all_entries()
153 }
154
155 pub fn truncate(&mut self, before_lsn: Lsn) -> MenteResult<()> {
159 let entries = self.read_all_entries()?;
160 let to_keep: Vec<&WalEntry> = entries.iter().filter(|e| e.lsn >= before_lsn).collect();
161
162 let wal_path = self.dir_path.join("wal.log");
163 let tmp_path = self.dir_path.join("wal.log.tmp");
164
165 {
166 let mut tmp_file = OpenOptions::new()
167 .write(true)
168 .create(true)
169 .truncate(true)
170 .open(&tmp_path)?;
171
172 for entry in to_keep {
173 let compressed = lz4_flex::compress_prepend_size(&entry.data);
174
175 let payload_len = 8 + 1 + 8 + compressed.len();
176 let mut payload = Vec::with_capacity(payload_len);
177 payload.extend_from_slice(&entry.lsn.to_le_bytes());
178 payload.push(entry.entry_type as u8);
179 payload.extend_from_slice(&entry.page_id.to_le_bytes());
180 payload.extend_from_slice(&compressed);
181
182 let crc = {
183 let mut h = crc32fast::Hasher::new();
184 h.update(&payload);
185 h.finalize()
186 };
187
188 tmp_file.write_all(&(payload_len as u32).to_le_bytes())?;
189 tmp_file.write_all(&payload)?;
190 tmp_file.write_all(&crc.to_le_bytes())?;
191 }
192
193 tmp_file.sync_data()?;
194 }
195
196 std::fs::rename(&tmp_path, &wal_path)?;
197
198 self.file = OpenOptions::new().read(true).write(true).open(&wal_path)?;
200
201 debug!(before_lsn, "WAL truncated (atomic)");
202 Ok(())
203 }
204
205 pub fn next_lsn(&self) -> Lsn {
207 self.next_lsn
208 }
209
210 fn read_all_entries(&mut self) -> MenteResult<Vec<WalEntry>> {
213 self.file.seek(SeekFrom::Start(0))?;
214 let file_len = self.file.metadata()?.len();
215 let mut offset: u64 = 0;
216 let mut entries = Vec::new();
217
218 while offset + 4 <= file_len {
219 let mut len_buf = [0u8; 4];
221 if self.file.read_exact(&mut len_buf).is_err() {
222 break;
223 }
224 let payload_len = u32::from_le_bytes(len_buf) as usize;
225 offset += 4;
226
227 if payload_len < MIN_PAYLOAD || offset + payload_len as u64 + 4 > file_len {
228 break;
229 }
230
231 let mut payload = vec![0u8; payload_len];
233 if self.file.read_exact(&mut payload).is_err() {
234 break;
235 }
236 offset += payload_len as u64;
237
238 let mut crc_buf = [0u8; 4];
240 if self.file.read_exact(&mut crc_buf).is_err() {
241 break;
242 }
243 let stored_crc = u32::from_le_bytes(crc_buf);
244 offset += 4;
245
246 let computed_crc = {
248 let mut h = crc32fast::Hasher::new();
249 h.update(&payload);
250 h.finalize()
251 };
252 if computed_crc != stored_crc {
253 break; }
255
256 let lsn = u64::from_le_bytes(payload[0..8].try_into().unwrap());
258 let entry_type = match WalEntryType::try_from(payload[8]) {
259 Ok(t) => t,
260 Err(_) => break,
261 };
262 let page_id = u64::from_le_bytes(payload[9..17].try_into().unwrap());
263 let compressed_data = &payload[17..];
264
265 let data = lz4_flex::decompress_size_prepended(compressed_data)
266 .map_err(|e| MenteError::Storage(format!("LZ4 decompress failed: {e}")))?;
267
268 entries.push(WalEntry {
269 lsn,
270 entry_type,
271 page_id,
272 data,
273 checksum: stored_crc,
274 });
275 }
276
277 Ok(entries)
278 }
279}
280
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a WAL in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned as well so the directory stays alive
    /// for the duration of the test.
    fn setup() -> (tempfile::TempDir, Wal) {
        let tmp = tempfile::tempdir().unwrap();
        let log = Wal::open(tmp.path()).unwrap();
        (tmp, log)
    }

    /// Appended entries come back in order with their LSNs and payloads.
    #[test]
    fn test_append_and_iterate() {
        let (_guard, mut log) = setup();

        let first = log.append(WalEntryType::PageWrite, 1, b"hello").unwrap();
        let second = log.append(WalEntryType::PageWrite, 2, b"world").unwrap();
        assert_eq!(first, 1);
        assert_eq!(second, 2);

        let replayed = log.iterate().unwrap();
        assert_eq!(replayed.len(), 2);

        let head = &replayed[0];
        assert_eq!(head.lsn, 1);
        assert_eq!(head.data, b"hello");

        let tail = &replayed[1];
        assert_eq!(tail.lsn, 2);
        assert_eq!(tail.data, b"world");
    }

    /// Syncing after an append succeeds.
    #[test]
    fn test_sync() {
        let (_guard, mut log) = setup();
        log.append(WalEntryType::Commit, 0, b"").unwrap();
        log.sync().unwrap();
    }

    /// Truncation removes everything below the given LSN.
    #[test]
    fn test_truncate() {
        let (_guard, mut log) = setup();

        log.append(WalEntryType::PageWrite, 1, b"a").unwrap();
        log.append(WalEntryType::PageWrite, 2, b"b").unwrap();
        log.append(WalEntryType::Checkpoint, 0, b"").unwrap();

        log.truncate(3).unwrap();

        let remaining = log.iterate().unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].lsn, 3);
    }

    /// Reopening a synced log restores its entries and the next LSN.
    #[test]
    fn test_recovery_reopen() {
        let tmp = tempfile::tempdir().unwrap();

        {
            let mut writer = Wal::open(tmp.path()).unwrap();
            writer
                .append(WalEntryType::PageWrite, 10, b"recovery-data")
                .unwrap();
            writer.sync().unwrap();
        }

        {
            let mut reopened = Wal::open(tmp.path()).unwrap();
            assert_eq!(reopened.next_lsn(), 2);

            let replayed = reopened.iterate().unwrap();
            assert_eq!(replayed.len(), 1);
            assert_eq!(replayed[0].page_id, 10);
            assert_eq!(replayed[0].data, b"recovery-data");
        }
    }

    /// An entry with an empty payload round-trips.
    #[test]
    fn test_empty_data_entry() {
        let (_guard, mut log) = setup();
        let assigned = log.append(WalEntryType::Checkpoint, 0, b"").unwrap();

        let replayed = log.iterate().unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].lsn, assigned);
        assert!(replayed[0].data.is_empty());
    }

    /// A large, highly compressible payload round-trips unchanged.
    #[test]
    fn test_large_data_compression() {
        let (_guard, mut log) = setup();
        let page = vec![0xABu8; 8192];
        log.append(WalEntryType::PageWrite, 5, &page).unwrap();

        let replayed = log.iterate().unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].data, page);
    }

    /// Entries appended and synced are visible after reopening.
    #[test]
    fn test_append_then_sync_is_durable() {
        let tmp = tempfile::tempdir().unwrap();

        {
            let mut writer = Wal::open(tmp.path()).unwrap();
            writer.append(WalEntryType::PageWrite, 1, b"batch1").unwrap();
            writer.append(WalEntryType::PageWrite, 2, b"batch2").unwrap();
            writer.sync().unwrap();
        }

        {
            let mut reopened = Wal::open(tmp.path()).unwrap();
            let replayed = reopened.iterate().unwrap();
            assert_eq!(replayed.len(), 2);
            assert_eq!(replayed[0].data, b"batch1");
            assert_eq!(replayed[1].data, b"batch2");
        }
    }

    /// Kept entries survive truncation, both in the live handle and after reopen.
    #[test]
    fn test_truncate_atomic_preserves_kept_entries() {
        let tmp = tempfile::tempdir().unwrap();

        {
            let mut log = Wal::open(tmp.path()).unwrap();
            log.append(WalEntryType::PageWrite, 1, b"old1").unwrap();
            log.append(WalEntryType::PageWrite, 2, b"old2").unwrap();
            log.append(WalEntryType::PageWrite, 3, b"keep1").unwrap();
            log.append(WalEntryType::PageWrite, 4, b"keep2").unwrap();

            log.truncate(3).unwrap();

            let kept = log.iterate().unwrap();
            assert_eq!(kept.len(), 2);
            assert_eq!(kept[0].data, b"keep1");
            assert_eq!(kept[1].data, b"keep2");
        }

        {
            let mut reopened = Wal::open(tmp.path()).unwrap();
            let kept = reopened.iterate().unwrap();
            assert_eq!(kept.len(), 2);
            assert_eq!(kept[0].lsn, 3);
            assert_eq!(kept[1].lsn, 4);
        }
    }

    /// The temporary rewrite file is gone once truncation finished.
    #[test]
    fn test_truncate_no_temp_file_left_behind() {
        let tmp = tempfile::tempdir().unwrap();
        let mut log = Wal::open(tmp.path()).unwrap();
        log.append(WalEntryType::PageWrite, 1, b"a").unwrap();
        log.truncate(2).unwrap();

        let leftover = tmp.path().join("wal.log.tmp");
        assert!(!leftover.exists());
    }

    /// The log accepts appends after it has been truncated.
    #[test]
    fn test_append_after_truncate_works() {
        let (_guard, mut log) = setup();
        log.append(WalEntryType::PageWrite, 1, b"before").unwrap();
        log.truncate(2).unwrap();

        log.append(WalEntryType::PageWrite, 10, b"after").unwrap();

        let replayed = log.iterate().unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].data, b"after");
    }
}