1use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15
16use mentedb_core::error::{MenteError, MenteResult};
17use tracing::{debug, info, trace};
18
/// Log sequence number handed out by [`Wal::append`]; each appended entry gets the
/// previous value plus one.
pub type Lsn = u64;

/// Kind of record carried by a WAL entry.
///
/// The explicit discriminant is the tag byte written to disk (`entry_type as u8`) and
/// decoded again through `TryFrom<u8>`, so existing values must never be renumbered.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalEntryType {
    PageWrite = 1,
    Commit = 2,
    Checkpoint = 3,
}
30
31impl TryFrom<u8> for WalEntryType {
32 type Error = MenteError;
33 fn try_from(v: u8) -> MenteResult<Self> {
34 match v {
35 1 => Ok(Self::PageWrite),
36 2 => Ok(Self::Commit),
37 3 => Ok(Self::Checkpoint),
38 _ => Err(MenteError::Storage(format!("invalid WAL entry type: {v}"))),
39 }
40 }
41}
42
43#[derive(Debug, Clone)]
45pub struct WalEntry {
46 pub lsn: u64,
48 pub entry_type: WalEntryType,
50 pub page_id: u64,
52 pub data: Vec<u8>,
54 pub checksum: u32,
56}
57
/// Append-only write-ahead log backed by a single `wal.log` file.
pub struct Wal {
    /// Handle to `wal.log`, opened for both reading and writing.
    file: File,
    /// LSN that the next successfully appended entry will receive.
    next_lsn: u64,
}
63
/// Size of the fixed payload header: 8-byte LSN + 1-byte entry type + 8-byte page id.
/// The compressed record bytes follow it, so no valid payload is shorter than this.
const MIN_PAYLOAD: usize = 8 + 1 + 8;
66
67impl Wal {
68 pub fn open(dir_path: &Path) -> MenteResult<Self> {
70 let wal_path = dir_path.join("wal.log");
71 let exists = wal_path.exists()
72 && std::fs::metadata(&wal_path)
73 .map(|m| m.len() > 0)
74 .unwrap_or(false);
75
76 let file = OpenOptions::new()
77 .read(true)
78 .write(true)
79 .create(true)
80 .truncate(false)
81 .open(&wal_path)?;
82
83 let mut wal = Self { file, next_lsn: 1 };
84
85 if exists {
86 let entries = wal.read_all_entries()?;
87 if let Some(last) = entries.last() {
88 wal.next_lsn = last.lsn + 1;
89 }
90 info!(
91 next_lsn = wal.next_lsn,
92 entries = entries.len(),
93 "opened existing WAL"
94 );
95 } else {
96 info!("created new WAL");
97 }
98
99 Ok(wal)
100 }
101
102 pub fn append(
104 &mut self,
105 entry_type: WalEntryType,
106 page_id: u64,
107 data: &[u8],
108 ) -> MenteResult<Lsn> {
109 let lsn = self.next_lsn;
110 self.next_lsn += 1;
111
112 let compressed = lz4_flex::compress_prepend_size(data);
113
114 let payload_len = 8 + 1 + 8 + compressed.len();
116 let mut payload = Vec::with_capacity(payload_len);
117 payload.extend_from_slice(&lsn.to_le_bytes());
118 payload.push(entry_type as u8);
119 payload.extend_from_slice(&page_id.to_le_bytes());
120 payload.extend_from_slice(&compressed);
121
122 let crc = {
123 let mut h = crc32fast::Hasher::new();
124 h.update(&payload);
125 h.finalize()
126 };
127
128 self.file.seek(SeekFrom::End(0))?;
129 self.file.write_all(&(payload_len as u32).to_le_bytes())?;
130 self.file.write_all(&payload)?;
131 self.file.write_all(&crc.to_le_bytes())?;
132
133 trace!(lsn, page_id, "appended WAL entry");
134 Ok(lsn)
135 }
136
137 pub fn sync(&mut self) -> MenteResult<()> {
139 self.file.sync_data()?;
140 debug!("WAL synced");
141 Ok(())
142 }
143
144 pub fn iterate(&mut self) -> MenteResult<Vec<WalEntry>> {
146 self.read_all_entries()
147 }
148
149 pub fn truncate(&mut self, before_lsn: Lsn) -> MenteResult<()> {
151 let entries = self.read_all_entries()?;
152 let to_keep: Vec<&WalEntry> = entries.iter().filter(|e| e.lsn >= before_lsn).collect();
153
154 self.file.seek(SeekFrom::Start(0))?;
155 self.file.set_len(0)?;
156
157 for entry in to_keep {
158 let compressed = lz4_flex::compress_prepend_size(&entry.data);
159
160 let payload_len = 8 + 1 + 8 + compressed.len();
161 let mut payload = Vec::with_capacity(payload_len);
162 payload.extend_from_slice(&entry.lsn.to_le_bytes());
163 payload.push(entry.entry_type as u8);
164 payload.extend_from_slice(&entry.page_id.to_le_bytes());
165 payload.extend_from_slice(&compressed);
166
167 let crc = {
168 let mut h = crc32fast::Hasher::new();
169 h.update(&payload);
170 h.finalize()
171 };
172
173 self.file.write_all(&(payload_len as u32).to_le_bytes())?;
174 self.file.write_all(&payload)?;
175 self.file.write_all(&crc.to_le_bytes())?;
176 }
177
178 self.file.sync_data()?;
179 debug!(before_lsn, "WAL truncated");
180 Ok(())
181 }
182
183 pub fn next_lsn(&self) -> Lsn {
185 self.next_lsn
186 }
187
188 fn read_all_entries(&mut self) -> MenteResult<Vec<WalEntry>> {
191 self.file.seek(SeekFrom::Start(0))?;
192 let file_len = self.file.metadata()?.len();
193 let mut offset: u64 = 0;
194 let mut entries = Vec::new();
195
196 while offset + 4 <= file_len {
197 let mut len_buf = [0u8; 4];
199 if self.file.read_exact(&mut len_buf).is_err() {
200 break;
201 }
202 let payload_len = u32::from_le_bytes(len_buf) as usize;
203 offset += 4;
204
205 if payload_len < MIN_PAYLOAD || offset + payload_len as u64 + 4 > file_len {
206 break;
207 }
208
209 let mut payload = vec![0u8; payload_len];
211 if self.file.read_exact(&mut payload).is_err() {
212 break;
213 }
214 offset += payload_len as u64;
215
216 let mut crc_buf = [0u8; 4];
218 if self.file.read_exact(&mut crc_buf).is_err() {
219 break;
220 }
221 let stored_crc = u32::from_le_bytes(crc_buf);
222 offset += 4;
223
224 let computed_crc = {
226 let mut h = crc32fast::Hasher::new();
227 h.update(&payload);
228 h.finalize()
229 };
230 if computed_crc != stored_crc {
231 break; }
233
234 let lsn = u64::from_le_bytes(payload[0..8].try_into().unwrap());
236 let entry_type = match WalEntryType::try_from(payload[8]) {
237 Ok(t) => t,
238 Err(_) => break,
239 };
240 let page_id = u64::from_le_bytes(payload[9..17].try_into().unwrap());
241 let compressed_data = &payload[17..];
242
243 let data = lz4_flex::decompress_size_prepended(compressed_data)
244 .map_err(|e| MenteError::Storage(format!("LZ4 decompress failed: {e}")))?;
245
246 entries.push(WalEntry {
247 lsn,
248 entry_type,
249 page_id,
250 data,
251 checksum: stored_crc,
252 });
253 }
254
255 Ok(entries)
256 }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a WAL in a fresh temporary directory. The returned `TempDir` must outlive
    /// the `Wal`, otherwise the directory is deleted while still in use.
    fn setup() -> (tempfile::TempDir, Wal) {
        let scratch = tempfile::tempdir().unwrap();
        let log = Wal::open(scratch.path()).unwrap();
        (scratch, log)
    }

    #[test]
    fn test_append_and_iterate() {
        let (_scratch, mut log) = setup();

        let first = log.append(WalEntryType::PageWrite, 1, b"hello").unwrap();
        let second = log.append(WalEntryType::PageWrite, 2, b"world").unwrap();
        assert_eq!(first, 1);
        assert_eq!(second, 2);

        let replayed = log.iterate().unwrap();
        assert_eq!(replayed.len(), 2);
        let (head, next) = (&replayed[0], &replayed[1]);
        assert_eq!(head.lsn, 1);
        assert_eq!(head.data, b"hello");
        assert_eq!(next.lsn, 2);
        assert_eq!(next.data, b"world");
    }

    #[test]
    fn test_sync() {
        let (_scratch, mut log) = setup();
        log.append(WalEntryType::Commit, 0, b"").unwrap();
        log.sync().unwrap();
    }

    #[test]
    fn test_truncate() {
        let (_scratch, mut log) = setup();

        log.append(WalEntryType::PageWrite, 1, b"a").unwrap();
        log.append(WalEntryType::PageWrite, 2, b"b").unwrap();
        log.append(WalEntryType::Checkpoint, 0, b"").unwrap();

        // Everything below LSN 3 goes away; only the checkpoint survives.
        log.truncate(3).unwrap();

        let survivors = log.iterate().unwrap();
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].lsn, 3);
    }

    #[test]
    fn test_recovery_reopen() {
        let scratch = tempfile::tempdir().unwrap();

        // First session: write one entry, make it durable, then drop the handle.
        {
            let mut log = Wal::open(scratch.path()).unwrap();
            log.append(WalEntryType::PageWrite, 10, b"recovery-data")
                .unwrap();
            log.sync().unwrap();
        }

        // Second session: the entry is replayed and the LSN counter continues after it.
        let mut log = Wal::open(scratch.path()).unwrap();
        assert_eq!(log.next_lsn(), 2);
        let replayed = log.iterate().unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].page_id, 10);
        assert_eq!(replayed[0].data, b"recovery-data");
    }

    #[test]
    fn test_empty_data_entry() {
        let (_scratch, mut log) = setup();
        let assigned = log.append(WalEntryType::Checkpoint, 0, b"").unwrap();

        let replayed = log.iterate().unwrap();
        assert_eq!(replayed.len(), 1);
        let only = &replayed[0];
        assert_eq!(only.lsn, assigned);
        assert!(only.data.is_empty());
    }

    #[test]
    fn test_large_data_compression() {
        let (_scratch, mut log) = setup();
        let payload = vec![0xABu8; 8192];
        log.append(WalEntryType::PageWrite, 5, &payload).unwrap();

        let replayed = log.iterate().unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].data, payload);
    }
}
348}