1use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15
16use mentedb_core::error::{MenteError, MenteResult};
17use tracing::{debug, info, trace};
18
/// Log sequence number: the identifier `Wal::append` assigns to each entry it writes.
pub type Lsn = u64;
21
/// Kind tag carried by every WAL entry.
///
/// The explicit discriminant is the single byte written to disk for the entry.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum WalEntryType {
    /// Stored on disk as byte `1`.
    PageWrite = 1,
    /// Stored on disk as byte `2`.
    Commit = 2,
    /// Stored on disk as byte `3`.
    Checkpoint = 3,
}
31
32impl TryFrom<u8> for WalEntryType {
33 type Error = MenteError;
34 fn try_from(v: u8) -> MenteResult<Self> {
35 match v {
36 1 => Ok(Self::PageWrite),
37 2 => Ok(Self::Commit),
38 3 => Ok(Self::Checkpoint),
39 _ => Err(MenteError::Storage(format!("invalid WAL entry type: {v}"))),
40 }
41 }
42}
43
44#[derive(Debug, Clone)]
46pub struct WalEntry {
47 pub lsn: u64,
49 pub entry_type: WalEntryType,
51 pub page_id: u64,
53 pub data: Vec<u8>,
55 pub checksum: u32,
57}
58
/// Write-ahead log stored in a single `wal.log` file inside a directory.
pub struct Wal {
    /// Open read/write handle to `wal.log`.
    file: File,
    /// Directory that contains `wal.log` (and `wal.log.tmp` while truncating).
    dir_path: std::path::PathBuf,
    /// LSN the next `append` call will assign.
    next_lsn: u64,
}
65
/// Smallest payload a frame may declare: LSN (8 bytes) + entry type (1 byte) + page id (8 bytes).
const MIN_PAYLOAD: usize = 8 + 1 + 8;
68
69impl Wal {
70 pub fn open(dir_path: &Path) -> MenteResult<Self> {
72 let wal_path = dir_path.join("wal.log");
73 let exists = wal_path.exists()
74 && std::fs::metadata(&wal_path)
75 .map(|m| m.len() > 0)
76 .unwrap_or(false);
77
78 let file = OpenOptions::new()
79 .read(true)
80 .write(true)
81 .create(true)
82 .truncate(false)
83 .open(&wal_path)?;
84
85 let mut wal = Self {
86 file,
87 dir_path: dir_path.to_path_buf(),
88 next_lsn: 1,
89 };
90
91 if exists {
92 let entries = wal.read_all_entries()?;
93 if let Some(last) = entries.last() {
94 wal.next_lsn = last.lsn + 1;
95 }
96 info!(
97 next_lsn = wal.next_lsn,
98 entries = entries.len(),
99 "opened existing WAL"
100 );
101 } else {
102 info!("created new WAL");
103 }
104
105 Ok(wal)
106 }
107
108 pub fn lock_exclusive(&self) -> MenteResult<()> {
114 use fs2::FileExt;
115 self.file
116 .lock_exclusive()
117 .map_err(|e| MenteError::Storage(format!("WAL flock failed: {e}")))
118 }
119
120 pub fn unlock(&self) -> MenteResult<()> {
122 fs2::FileExt::unlock(&self.file)
123 .map_err(|e| MenteError::Storage(format!("WAL unlock failed: {e}")))
124 }
125
126 pub fn reload_lsn(&mut self) -> MenteResult<()> {
129 let entries = self.read_all_entries()?;
130 self.next_lsn = entries.last().map_or(1, |e| e.lsn + 1);
131 debug!(next_lsn = self.next_lsn, "reloaded WAL LSN");
132 Ok(())
133 }
134
135 pub fn append(
137 &mut self,
138 entry_type: WalEntryType,
139 page_id: u64,
140 data: &[u8],
141 ) -> MenteResult<Lsn> {
142 let lsn = self.next_lsn;
143 self.next_lsn += 1;
144
145 let compressed = lz4_flex::compress_prepend_size(data);
146
147 let payload_len = 8 + 1 + 8 + compressed.len();
149 let mut payload = Vec::with_capacity(payload_len);
150 payload.extend_from_slice(&lsn.to_le_bytes());
151 payload.push(entry_type as u8);
152 payload.extend_from_slice(&page_id.to_le_bytes());
153 payload.extend_from_slice(&compressed);
154
155 let crc = {
156 let mut h = crc32fast::Hasher::new();
157 h.update(&payload);
158 h.finalize()
159 };
160
161 self.file.seek(SeekFrom::End(0))?;
162 self.file.write_all(&(payload_len as u32).to_le_bytes())?;
163 self.file.write_all(&payload)?;
164 self.file.write_all(&crc.to_le_bytes())?;
165
166 trace!(lsn, page_id, "appended WAL entry");
167 Ok(lsn)
168 }
169
170 pub fn sync(&mut self) -> MenteResult<()> {
172 self.file.sync_data()?;
173 debug!("WAL synced");
174 Ok(())
175 }
176
177 pub fn iterate(&mut self) -> MenteResult<Vec<WalEntry>> {
179 self.read_all_entries()
180 }
181
182 pub fn truncate(&mut self, before_lsn: Lsn) -> MenteResult<()> {
186 let entries = self.read_all_entries()?;
187 let to_keep: Vec<&WalEntry> = entries.iter().filter(|e| e.lsn >= before_lsn).collect();
188
189 let wal_path = self.dir_path.join("wal.log");
190 let tmp_path = self.dir_path.join("wal.log.tmp");
191
192 {
193 let mut tmp_file = OpenOptions::new()
194 .write(true)
195 .create(true)
196 .truncate(true)
197 .open(&tmp_path)?;
198
199 for entry in to_keep {
200 let compressed = lz4_flex::compress_prepend_size(&entry.data);
201
202 let payload_len = 8 + 1 + 8 + compressed.len();
203 let mut payload = Vec::with_capacity(payload_len);
204 payload.extend_from_slice(&entry.lsn.to_le_bytes());
205 payload.push(entry.entry_type as u8);
206 payload.extend_from_slice(&entry.page_id.to_le_bytes());
207 payload.extend_from_slice(&compressed);
208
209 let crc = {
210 let mut h = crc32fast::Hasher::new();
211 h.update(&payload);
212 h.finalize()
213 };
214
215 tmp_file.write_all(&(payload_len as u32).to_le_bytes())?;
216 tmp_file.write_all(&payload)?;
217 tmp_file.write_all(&crc.to_le_bytes())?;
218 }
219
220 tmp_file.sync_data()?;
221 }
222
223 std::fs::rename(&tmp_path, &wal_path)?;
224
225 let new_file = OpenOptions::new().read(true).write(true).open(&wal_path)?;
228 fs2::FileExt::lock_exclusive(&new_file)
229 .map_err(|e| MenteError::Storage(format!("WAL flock re-acquire failed: {e}")))?;
230 self.file = new_file;
231
232 debug!(before_lsn, "WAL truncated (atomic)");
233 Ok(())
234 }
235
/// Returns the LSN that the next `append` call will assign.
pub fn next_lsn(&self) -> Lsn {
    self.next_lsn
}
240
241 fn read_all_entries(&mut self) -> MenteResult<Vec<WalEntry>> {
244 self.file.seek(SeekFrom::Start(0))?;
245 let file_len = self.file.metadata()?.len();
246 let mut offset: u64 = 0;
247 let mut entries = Vec::new();
248
249 while offset + 4 <= file_len {
250 let mut len_buf = [0u8; 4];
252 if self.file.read_exact(&mut len_buf).is_err() {
253 break;
254 }
255 let payload_len = u32::from_le_bytes(len_buf) as usize;
256 offset += 4;
257
258 if payload_len < MIN_PAYLOAD || offset + payload_len as u64 + 4 > file_len {
259 break;
260 }
261
262 let mut payload = vec![0u8; payload_len];
264 if self.file.read_exact(&mut payload).is_err() {
265 break;
266 }
267 offset += payload_len as u64;
268
269 let mut crc_buf = [0u8; 4];
271 if self.file.read_exact(&mut crc_buf).is_err() {
272 break;
273 }
274 let stored_crc = u32::from_le_bytes(crc_buf);
275 offset += 4;
276
277 let computed_crc = {
279 let mut h = crc32fast::Hasher::new();
280 h.update(&payload);
281 h.finalize()
282 };
283 if computed_crc != stored_crc {
284 break; }
286
287 let lsn = u64::from_le_bytes(payload[0..8].try_into().unwrap());
289 let entry_type = match WalEntryType::try_from(payload[8]) {
290 Ok(t) => t,
291 Err(_) => break,
292 };
293 let page_id = u64::from_le_bytes(payload[9..17].try_into().unwrap());
294 let compressed_data = &payload[17..];
295
296 let data = lz4_flex::decompress_size_prepended(compressed_data)
297 .map_err(|e| MenteError::Storage(format!("LZ4 decompress failed: {e}")))?;
298
299 entries.push(WalEntry {
300 lsn,
301 entry_type,
302 page_id,
303 data,
304 checksum: stored_crc,
305 });
306 }
307
308 Ok(entries)
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a WAL in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned so the directory stays alive for the
    /// duration of the test.
    fn setup() -> (tempfile::TempDir, Wal) {
        let dir = tempfile::tempdir().unwrap();
        let wal = Wal::open(dir.path()).unwrap();
        (dir, wal)
    }

    /// Collects the LSNs of `entries` in order.
    fn lsns(entries: &[WalEntry]) -> Vec<u64> {
        entries.iter().map(|e| e.lsn).collect()
    }

    /// Collects the data slices of `entries` in order.
    fn payloads(entries: &[WalEntry]) -> Vec<&[u8]> {
        entries.iter().map(|e| e.data.as_slice()).collect()
    }

    #[test]
    fn test_append_and_iterate() {
        let (_dir, mut wal) = setup();

        let first = wal.append(WalEntryType::PageWrite, 1, b"hello").unwrap();
        let second = wal.append(WalEntryType::PageWrite, 2, b"world").unwrap();
        assert_eq!(first, 1);
        assert_eq!(second, 2);

        let entries = wal.iterate().unwrap();
        assert_eq!(lsns(&entries), [1, 2]);
        assert_eq!(payloads(&entries), [&b"hello"[..], &b"world"[..]]);
    }

    #[test]
    fn test_sync() {
        let (_dir, mut wal) = setup();
        wal.append(WalEntryType::Commit, 0, b"").unwrap();
        // Only checks that syncing after an append succeeds.
        wal.sync().unwrap();
    }

    #[test]
    fn test_truncate() {
        let (_dir, mut wal) = setup();

        for (kind, page, bytes) in [
            (WalEntryType::PageWrite, 1, &b"a"[..]),
            (WalEntryType::PageWrite, 2, &b"b"[..]),
            (WalEntryType::Checkpoint, 0, &b""[..]),
        ] {
            wal.append(kind, page, bytes).unwrap();
        }

        // Entries 1 and 2 are dropped; only LSN 3 survives.
        wal.truncate(3).unwrap();

        let entries = wal.iterate().unwrap();
        assert_eq!(lsns(&entries), [3]);
    }

    #[test]
    fn test_recovery_reopen() {
        let dir = tempfile::tempdir().unwrap();

        let mut writer = Wal::open(dir.path()).unwrap();
        writer
            .append(WalEntryType::PageWrite, 10, b"recovery-data")
            .unwrap();
        writer.sync().unwrap();
        drop(writer);

        let mut reopened = Wal::open(dir.path()).unwrap();
        assert_eq!(reopened.next_lsn(), 2);
        let entries = reopened.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        let only = &entries[0];
        assert_eq!(only.page_id, 10);
        assert_eq!(only.data, b"recovery-data");
    }

    #[test]
    fn test_empty_data_entry() {
        let (_dir, mut wal) = setup();
        let assigned = wal.append(WalEntryType::Checkpoint, 0, b"").unwrap();

        let entries = wal.iterate().unwrap();
        assert_eq!(lsns(&entries), [assigned]);
        assert!(entries[0].data.is_empty());
    }

    #[test]
    fn test_large_data_compression() {
        let (_dir, mut wal) = setup();
        let big_data = vec![0xABu8; 8192];
        wal.append(WalEntryType::PageWrite, 5, &big_data).unwrap();

        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].data, big_data);
    }

    #[test]
    fn test_append_then_sync_is_durable() {
        let dir = tempfile::tempdir().unwrap();

        let mut writer = Wal::open(dir.path()).unwrap();
        writer.append(WalEntryType::PageWrite, 1, b"batch1").unwrap();
        writer.append(WalEntryType::PageWrite, 2, b"batch2").unwrap();
        writer.sync().unwrap();
        drop(writer);

        let mut reopened = Wal::open(dir.path()).unwrap();
        let entries = reopened.iterate().unwrap();
        assert_eq!(payloads(&entries), [&b"batch1"[..], &b"batch2"[..]]);
    }

    #[test]
    fn test_truncate_atomic_preserves_kept_entries() {
        let dir = tempfile::tempdir().unwrap();

        let mut wal = Wal::open(dir.path()).unwrap();
        for (page, bytes) in [
            (1, &b"old1"[..]),
            (2, &b"old2"[..]),
            (3, &b"keep1"[..]),
            (4, &b"keep2"[..]),
        ] {
            wal.append(WalEntryType::PageWrite, page, bytes).unwrap();
        }

        wal.truncate(3).unwrap();

        let entries = wal.iterate().unwrap();
        assert_eq!(payloads(&entries), [&b"keep1"[..], &b"keep2"[..]]);
        drop(wal);

        // The kept entries must also be visible through a fresh handle.
        let mut reopened = Wal::open(dir.path()).unwrap();
        let entries = reopened.iterate().unwrap();
        assert_eq!(lsns(&entries), [3, 4]);
    }

    #[test]
    fn test_truncate_no_temp_file_left_behind() {
        let dir = tempfile::tempdir().unwrap();
        let mut wal = Wal::open(dir.path()).unwrap();
        wal.append(WalEntryType::PageWrite, 1, b"a").unwrap();
        wal.truncate(2).unwrap();

        let leftover = dir.path().join("wal.log.tmp");
        assert!(!leftover.exists());
    }

    #[test]
    fn test_append_after_truncate_works() {
        let (_dir, mut wal) = setup();
        wal.append(WalEntryType::PageWrite, 1, b"before").unwrap();
        wal.truncate(2).unwrap();

        // The handle was swapped during truncation; appends must land in the new file.
        wal.append(WalEntryType::PageWrite, 10, b"after").unwrap();

        let entries = wal.iterate().unwrap();
        assert_eq!(payloads(&entries), [&b"after"[..]]);
    }
}