1use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15
16use mentedb_core::error::{MenteError, MenteResult};
17use tracing::{debug, info, trace};
18
/// Log sequence number.
///
/// Plain `u64` alias; the WAL hands these out starting at 1 and increases the
/// value by one for every appended entry.
pub type Lsn = u64;
21
/// Kind tag carried by every WAL entry.
///
/// The explicit discriminant is the single byte written into the log for the
/// entry, so the numeric values are part of the on-disk format.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalEntryType {
    /// Page-write record (byte value `1`).
    PageWrite = 1,
    /// Commit record (byte value `2`).
    Commit = 2,
    /// Checkpoint record (byte value `3`).
    Checkpoint = 3,
}
31
32impl TryFrom<u8> for WalEntryType {
33 type Error = MenteError;
34 fn try_from(v: u8) -> MenteResult<Self> {
35 match v {
36 1 => Ok(Self::PageWrite),
37 2 => Ok(Self::Commit),
38 3 => Ok(Self::Checkpoint),
39 _ => Err(MenteError::Storage(format!("invalid WAL entry type: {v}"))),
40 }
41 }
42}
43
44#[derive(Debug, Clone)]
46pub struct WalEntry {
47 pub lsn: u64,
49 pub entry_type: WalEntryType,
51 pub page_id: u64,
53 pub data: Vec<u8>,
55 pub checksum: u32,
57}
58
/// Handle to an on-disk write-ahead log.
pub struct Wal {
    /// Open read/write handle to the `wal.log` file.
    file: File,
    /// Directory that contains the log file (and the temporary file used
    /// while truncating).
    dir_path: std::path::PathBuf,
    /// Sequence number that the next appended entry will receive.
    next_lsn: u64,
}
65
/// Smallest payload length a frame may declare: the fixed header of
/// LSN (8 bytes) + entry type (1 byte) + page id (8 bytes).
const MIN_PAYLOAD: usize = 8 + 1 + 8;
68
69impl Wal {
70 pub fn open(dir_path: &Path) -> MenteResult<Self> {
72 let wal_path = dir_path.join("wal.log");
73 let exists = wal_path.exists()
74 && std::fs::metadata(&wal_path)
75 .map(|m| m.len() > 0)
76 .unwrap_or(false);
77
78 let file = OpenOptions::new()
79 .read(true)
80 .write(true)
81 .create(true)
82 .truncate(false)
83 .open(&wal_path)?;
84
85 let mut wal = Self {
86 file,
87 dir_path: dir_path.to_path_buf(),
88 next_lsn: 1,
89 };
90
91 if exists {
92 let entries = wal.read_all_entries()?;
93 if let Some(last) = entries.last() {
94 wal.next_lsn = last.lsn + 1;
95 }
96 info!(
97 next_lsn = wal.next_lsn,
98 entries = entries.len(),
99 "opened existing WAL"
100 );
101 } else {
102 info!("created new WAL");
103 }
104
105 Ok(wal)
106 }
107
108 pub fn lock_exclusive(&self) -> MenteResult<()> {
114 use fs2::FileExt;
115 self.file
116 .lock_exclusive()
117 .map_err(|e| MenteError::Storage(format!("WAL flock failed: {e}")))
118 }
119
120 pub fn unlock(&self) -> MenteResult<()> {
122 fs2::FileExt::unlock(&self.file)
123 .map_err(|e| MenteError::Storage(format!("WAL unlock failed: {e}")))
124 }
125
126 pub fn reload_lsn(&mut self) -> MenteResult<()> {
133 self.file.seek(SeekFrom::Start(0))?;
134 let file_len = self.file.metadata()?.len();
135 let mut offset: u64 = 0;
136 let mut last_lsn: Option<u64> = None;
137
138 while offset + 4 <= file_len {
139 let mut len_buf = [0u8; 4];
141 if self.file.read_exact(&mut len_buf).is_err() {
142 break;
143 }
144 let payload_len = u32::from_le_bytes(len_buf) as usize;
145 offset += 4;
146
147 if payload_len < MIN_PAYLOAD || offset + payload_len as u64 + 4 > file_len {
148 break;
149 }
150
151 let mut payload = vec![0u8; payload_len];
153 if self.file.read_exact(&mut payload).is_err() {
154 break;
155 }
156 offset += payload_len as u64;
157
158 let mut crc_buf = [0u8; 4];
160 if self.file.read_exact(&mut crc_buf).is_err() {
161 break;
162 }
163 let stored_crc = u32::from_le_bytes(crc_buf);
164 offset += 4;
165
166 let computed_crc = {
167 let mut h = crc32fast::Hasher::new();
168 h.update(&payload);
169 h.finalize()
170 };
171 if computed_crc != stored_crc {
172 break; }
174
175 let lsn = u64::from_le_bytes(payload[0..8].try_into().unwrap());
177 last_lsn = Some(lsn);
178 }
179
180 self.next_lsn = last_lsn.map_or(1, |l| l + 1);
181 debug!(next_lsn = self.next_lsn, "reloaded WAL LSN (fast scan)");
182 Ok(())
183 }
184
185 pub fn append(
187 &mut self,
188 entry_type: WalEntryType,
189 page_id: u64,
190 data: &[u8],
191 ) -> MenteResult<Lsn> {
192 let lsn = self.next_lsn;
193 self.next_lsn += 1;
194
195 let compressed = lz4_flex::compress_prepend_size(data);
196
197 let payload_len = 8 + 1 + 8 + compressed.len();
199 let mut payload = Vec::with_capacity(payload_len);
200 payload.extend_from_slice(&lsn.to_le_bytes());
201 payload.push(entry_type as u8);
202 payload.extend_from_slice(&page_id.to_le_bytes());
203 payload.extend_from_slice(&compressed);
204
205 let crc = {
206 let mut h = crc32fast::Hasher::new();
207 h.update(&payload);
208 h.finalize()
209 };
210
211 self.file.seek(SeekFrom::End(0))?;
212 self.file.write_all(&(payload_len as u32).to_le_bytes())?;
213 self.file.write_all(&payload)?;
214 self.file.write_all(&crc.to_le_bytes())?;
215
216 trace!(lsn, page_id, "appended WAL entry");
217 Ok(lsn)
218 }
219
220 pub fn sync(&mut self) -> MenteResult<()> {
222 self.file.sync_data()?;
223 debug!("WAL synced");
224 Ok(())
225 }
226
227 pub fn iterate(&mut self) -> MenteResult<Vec<WalEntry>> {
229 self.read_all_entries()
230 }
231
232 pub fn truncate(&mut self, before_lsn: Lsn) -> MenteResult<()> {
236 let entries = self.read_all_entries()?;
237 let to_keep: Vec<&WalEntry> = entries.iter().filter(|e| e.lsn >= before_lsn).collect();
238
239 let wal_path = self.dir_path.join("wal.log");
240 let tmp_path = self.dir_path.join("wal.log.tmp");
241
242 {
243 let mut tmp_file = OpenOptions::new()
244 .write(true)
245 .create(true)
246 .truncate(true)
247 .open(&tmp_path)?;
248
249 for entry in to_keep {
250 let compressed = lz4_flex::compress_prepend_size(&entry.data);
251
252 let payload_len = 8 + 1 + 8 + compressed.len();
253 let mut payload = Vec::with_capacity(payload_len);
254 payload.extend_from_slice(&entry.lsn.to_le_bytes());
255 payload.push(entry.entry_type as u8);
256 payload.extend_from_slice(&entry.page_id.to_le_bytes());
257 payload.extend_from_slice(&compressed);
258
259 let crc = {
260 let mut h = crc32fast::Hasher::new();
261 h.update(&payload);
262 h.finalize()
263 };
264
265 tmp_file.write_all(&(payload_len as u32).to_le_bytes())?;
266 tmp_file.write_all(&payload)?;
267 tmp_file.write_all(&crc.to_le_bytes())?;
268 }
269
270 tmp_file.sync_data()?;
271 }
272
273 std::fs::rename(&tmp_path, &wal_path)?;
274
275 let new_file = OpenOptions::new().read(true).write(true).open(&wal_path)?;
278 fs2::FileExt::lock_exclusive(&new_file)
279 .map_err(|e| MenteError::Storage(format!("WAL flock re-acquire failed: {e}")))?;
280 self.file = new_file;
281
282 debug!(before_lsn, "WAL truncated (atomic)");
283 Ok(())
284 }
285
286 pub fn next_lsn(&self) -> Lsn {
288 self.next_lsn
289 }
290
291 pub fn file_size(&self) -> u64 {
293 self.file.metadata().map(|m| m.len()).unwrap_or(0)
294 }
295
296 fn read_all_entries(&mut self) -> MenteResult<Vec<WalEntry>> {
299 self.file.seek(SeekFrom::Start(0))?;
300 let file_len = self.file.metadata()?.len();
301 let mut offset: u64 = 0;
302 let mut entries = Vec::new();
303
304 while offset + 4 <= file_len {
305 let mut len_buf = [0u8; 4];
307 if self.file.read_exact(&mut len_buf).is_err() {
308 break;
309 }
310 let payload_len = u32::from_le_bytes(len_buf) as usize;
311 offset += 4;
312
313 if payload_len < MIN_PAYLOAD || offset + payload_len as u64 + 4 > file_len {
314 break;
315 }
316
317 let mut payload = vec![0u8; payload_len];
319 if self.file.read_exact(&mut payload).is_err() {
320 break;
321 }
322 offset += payload_len as u64;
323
324 let mut crc_buf = [0u8; 4];
326 if self.file.read_exact(&mut crc_buf).is_err() {
327 break;
328 }
329 let stored_crc = u32::from_le_bytes(crc_buf);
330 offset += 4;
331
332 let computed_crc = {
334 let mut h = crc32fast::Hasher::new();
335 h.update(&payload);
336 h.finalize()
337 };
338 if computed_crc != stored_crc {
339 break; }
341
342 let lsn = u64::from_le_bytes(payload[0..8].try_into().unwrap());
344 let entry_type = match WalEntryType::try_from(payload[8]) {
345 Ok(t) => t,
346 Err(_) => break,
347 };
348 let page_id = u64::from_le_bytes(payload[9..17].try_into().unwrap());
349 let compressed_data = &payload[17..];
350
351 let data = lz4_flex::decompress_size_prepended(compressed_data)
352 .map_err(|e| MenteError::Storage(format!("LZ4 decompress failed: {e}")))?;
353
354 entries.push(WalEntry {
355 lsn,
356 entry_type,
357 page_id,
358 data,
359 checksum: stored_crc,
360 });
361 }
362
363 Ok(entries)
364 }
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a brand-new WAL inside a fresh temporary directory.
    ///
    /// The directory guard is returned together with the WAL so the caller
    /// keeps it alive for the duration of the test.
    fn setup() -> (tempfile::TempDir, Wal) {
        let tmp = tempfile::tempdir().unwrap();
        let log = Wal::open(tmp.path()).unwrap();
        (tmp, log)
    }

    /// Appended entries are read back in order with LSNs 1 and 2.
    #[test]
    fn test_append_and_iterate() {
        let (_tmp, mut log) = setup();

        let first = log.append(WalEntryType::PageWrite, 1, b"hello").unwrap();
        let second = log.append(WalEntryType::PageWrite, 2, b"world").unwrap();
        assert_eq!(first, 1);
        assert_eq!(second, 2);

        let read_back = log.iterate().unwrap();
        assert_eq!(read_back.len(), 2);

        let (head, tail) = (&read_back[0], &read_back[1]);
        assert_eq!(head.lsn, 1);
        assert_eq!(head.data, b"hello");
        assert_eq!(tail.lsn, 2);
        assert_eq!(tail.data, b"world");
    }

    /// `sync` succeeds after an append.
    #[test]
    fn test_sync() {
        let (_tmp, mut log) = setup();
        log.append(WalEntryType::Commit, 0, b"").unwrap();
        log.sync().unwrap();
    }

    /// Truncating at LSN 3 leaves only the third entry.
    #[test]
    fn test_truncate() {
        let (_tmp, mut log) = setup();

        log.append(WalEntryType::PageWrite, 1, b"a").unwrap();
        log.append(WalEntryType::PageWrite, 2, b"b").unwrap();
        log.append(WalEntryType::Checkpoint, 0, b"").unwrap();

        log.truncate(3).unwrap();

        let remaining = log.iterate().unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].lsn, 3);
    }

    /// A reopened WAL continues numbering after the stored entry and returns it intact.
    #[test]
    fn test_recovery_reopen() {
        let tmp = tempfile::tempdir().unwrap();

        let mut writer = Wal::open(tmp.path()).unwrap();
        writer
            .append(WalEntryType::PageWrite, 10, b"recovery-data")
            .unwrap();
        writer.sync().unwrap();
        drop(writer);

        let mut reopened = Wal::open(tmp.path()).unwrap();
        assert_eq!(reopened.next_lsn(), 2);
        let recovered = reopened.iterate().unwrap();
        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].page_id, 10);
        assert_eq!(recovered[0].data, b"recovery-data");
    }

    /// An entry with no data round-trips as an empty byte vector.
    #[test]
    fn test_empty_data_entry() {
        let (_tmp, mut log) = setup();
        let issued = log.append(WalEntryType::Checkpoint, 0, b"").unwrap();

        let read_back = log.iterate().unwrap();
        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].lsn, issued);
        assert!(read_back[0].data.is_empty());
    }

    /// An 8 KiB buffer of a repeated byte survives the compress/decompress round trip.
    #[test]
    fn test_large_data_compression() {
        let (_tmp, mut log) = setup();
        let page_image = vec![0xABu8; 8192];
        log.append(WalEntryType::PageWrite, 5, &page_image).unwrap();

        let read_back = log.iterate().unwrap();
        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].data, page_image);
    }

    /// Entries appended and synced are visible to a second handle opened later.
    #[test]
    fn test_append_then_sync_is_durable() {
        let tmp = tempfile::tempdir().unwrap();

        let mut writer = Wal::open(tmp.path()).unwrap();
        writer.append(WalEntryType::PageWrite, 1, b"batch1").unwrap();
        writer.append(WalEntryType::PageWrite, 2, b"batch2").unwrap();
        writer.sync().unwrap();
        drop(writer);

        let mut reader = Wal::open(tmp.path()).unwrap();
        let stored = reader.iterate().unwrap();
        assert_eq!(stored.len(), 2);
        assert_eq!(stored[0].data, b"batch1");
        assert_eq!(stored[1].data, b"batch2");
    }

    /// Entries at or above the truncation LSN survive, both in the same handle and after reopening.
    #[test]
    fn test_truncate_atomic_preserves_kept_entries() {
        let tmp = tempfile::tempdir().unwrap();

        let mut log = Wal::open(tmp.path()).unwrap();
        log.append(WalEntryType::PageWrite, 1, b"old1").unwrap();
        log.append(WalEntryType::PageWrite, 2, b"old2").unwrap();
        log.append(WalEntryType::PageWrite, 3, b"keep1").unwrap();
        log.append(WalEntryType::PageWrite, 4, b"keep2").unwrap();

        log.truncate(3).unwrap();

        let kept = log.iterate().unwrap();
        assert_eq!(kept.len(), 2);
        assert_eq!(kept[0].data, b"keep1");
        assert_eq!(kept[1].data, b"keep2");
        drop(log);

        let mut reopened = Wal::open(tmp.path()).unwrap();
        let after_reopen = reopened.iterate().unwrap();
        assert_eq!(after_reopen.len(), 2);
        assert_eq!(after_reopen[0].lsn, 3);
        assert_eq!(after_reopen[1].lsn, 4);
    }

    /// The scratch file used during truncation is gone once `truncate` returns.
    #[test]
    fn test_truncate_no_temp_file_left_behind() {
        let tmp = tempfile::tempdir().unwrap();
        let mut log = Wal::open(tmp.path()).unwrap();
        log.append(WalEntryType::PageWrite, 1, b"a").unwrap();
        log.truncate(2).unwrap();

        let scratch = tmp.path().join("wal.log.tmp");
        assert!(!scratch.exists());
    }

    /// Appending keeps working on the handle that was swapped in by `truncate`.
    #[test]
    fn test_append_after_truncate_works() {
        let (_tmp, mut log) = setup();
        log.append(WalEntryType::PageWrite, 1, b"before").unwrap();
        log.truncate(2).unwrap();

        log.append(WalEntryType::PageWrite, 10, b"after").unwrap();

        let read_back = log.iterate().unwrap();
        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].data, b"after");
    }
}