use std::fs::{File, OpenOptions};
use std::io::{BufReader, Read, Seek, SeekFrom, Write};
use std::path::Path;

use mentedb_core::error::{MenteError, MenteResult};
use tracing::{debug, info, trace, warn};

/// Log sequence number assigned to each WAL entry by `Wal::append`, starting at 1.
pub type Lsn = u64;
21
/// Tag identifying the kind of record stored in a WAL entry.
///
/// The discriminant is persisted as a single byte in every frame (`entry_type as u8`
/// in `Wal::append`) and decoded by `TryFrom<u8>`, so existing values must never be
/// renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalEntryType {
    /// On-disk tag `1`; presumably a page write carrying data for `page_id` — confirm.
    PageWrite = 1,
    /// On-disk tag `2`.
    Commit = 2,
    /// On-disk tag `3`.
    Checkpoint = 3,
}
30
31impl TryFrom<u8> for WalEntryType {
32 type Error = MenteError;
33 fn try_from(v: u8) -> MenteResult<Self> {
34 match v {
35 1 => Ok(Self::PageWrite),
36 2 => Ok(Self::Commit),
37 3 => Ok(Self::Checkpoint),
38 _ => Err(MenteError::Storage(format!("invalid WAL entry type: {v}"))),
39 }
40 }
41}
42
/// One record decoded from the WAL file.
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// Sequence number assigned when the entry was appended.
    pub lsn: u64,
    /// Kind of record.
    pub entry_type: WalEntryType,
    /// Page identifier supplied by the caller of `Wal::append`.
    pub page_id: u64,
    /// Payload bytes, already LZ4-decompressed.
    pub data: Vec<u8>,
    /// CRC-32 stored on disk for this frame. It covers the encoded payload (LSN, type,
    /// page id and the *compressed* data), not `data` itself.
    pub checksum: u32,
}
52
/// Append-only write-ahead log stored in a single `wal.log` file.
pub struct Wal {
    /// Handle to `wal.log`, opened for both reading and writing.
    file: File,
    /// LSN that the next successful `append` will assign.
    next_lsn: u64,
}
58
/// Size of the fixed payload header: LSN (8 bytes) + entry type (1 byte) + page id
/// (8 bytes). A frame whose declared payload is shorter than this cannot be decoded.
const MIN_PAYLOAD: usize = 8 + 1 + 8;
61
62impl Wal {
63 pub fn open(dir_path: &Path) -> MenteResult<Self> {
65 let wal_path = dir_path.join("wal.log");
66 let exists = wal_path.exists()
67 && std::fs::metadata(&wal_path)
68 .map(|m| m.len() > 0)
69 .unwrap_or(false);
70
71 let file = OpenOptions::new()
72 .read(true)
73 .write(true)
74 .create(true)
75 .truncate(false)
76 .open(&wal_path)?;
77
78 let mut wal = Self { file, next_lsn: 1 };
79
80 if exists {
81 let entries = wal.read_all_entries()?;
82 if let Some(last) = entries.last() {
83 wal.next_lsn = last.lsn + 1;
84 }
85 info!(
86 next_lsn = wal.next_lsn,
87 entries = entries.len(),
88 "opened existing WAL"
89 );
90 } else {
91 info!("created new WAL");
92 }
93
94 Ok(wal)
95 }
96
97 pub fn append(
99 &mut self,
100 entry_type: WalEntryType,
101 page_id: u64,
102 data: &[u8],
103 ) -> MenteResult<Lsn> {
104 let lsn = self.next_lsn;
105 self.next_lsn += 1;
106
107 let compressed = lz4_flex::compress_prepend_size(data);
108
109 let payload_len = 8 + 1 + 8 + compressed.len();
111 let mut payload = Vec::with_capacity(payload_len);
112 payload.extend_from_slice(&lsn.to_le_bytes());
113 payload.push(entry_type as u8);
114 payload.extend_from_slice(&page_id.to_le_bytes());
115 payload.extend_from_slice(&compressed);
116
117 let crc = {
118 let mut h = crc32fast::Hasher::new();
119 h.update(&payload);
120 h.finalize()
121 };
122
123 self.file.seek(SeekFrom::End(0))?;
124 self.file.write_all(&(payload_len as u32).to_le_bytes())?;
125 self.file.write_all(&payload)?;
126 self.file.write_all(&crc.to_le_bytes())?;
127
128 trace!(lsn, page_id, "appended WAL entry");
129 Ok(lsn)
130 }
131
132 pub fn sync(&mut self) -> MenteResult<()> {
134 self.file.sync_data()?;
135 debug!("WAL synced");
136 Ok(())
137 }
138
139 pub fn iterate(&mut self) -> MenteResult<Vec<WalEntry>> {
141 self.read_all_entries()
142 }
143
144 pub fn truncate(&mut self, before_lsn: Lsn) -> MenteResult<()> {
146 let entries = self.read_all_entries()?;
147 let to_keep: Vec<&WalEntry> = entries.iter().filter(|e| e.lsn >= before_lsn).collect();
148
149 self.file.seek(SeekFrom::Start(0))?;
150 self.file.set_len(0)?;
151
152 for entry in to_keep {
153 let compressed = lz4_flex::compress_prepend_size(&entry.data);
154
155 let payload_len = 8 + 1 + 8 + compressed.len();
156 let mut payload = Vec::with_capacity(payload_len);
157 payload.extend_from_slice(&entry.lsn.to_le_bytes());
158 payload.push(entry.entry_type as u8);
159 payload.extend_from_slice(&entry.page_id.to_le_bytes());
160 payload.extend_from_slice(&compressed);
161
162 let crc = {
163 let mut h = crc32fast::Hasher::new();
164 h.update(&payload);
165 h.finalize()
166 };
167
168 self.file.write_all(&(payload_len as u32).to_le_bytes())?;
169 self.file.write_all(&payload)?;
170 self.file.write_all(&crc.to_le_bytes())?;
171 }
172
173 self.file.sync_data()?;
174 debug!(before_lsn, "WAL truncated");
175 Ok(())
176 }
177
178 pub fn next_lsn(&self) -> Lsn {
180 self.next_lsn
181 }
182
183 fn read_all_entries(&mut self) -> MenteResult<Vec<WalEntry>> {
186 self.file.seek(SeekFrom::Start(0))?;
187 let file_len = self.file.metadata()?.len();
188 let mut offset: u64 = 0;
189 let mut entries = Vec::new();
190
191 while offset + 4 <= file_len {
192 let mut len_buf = [0u8; 4];
194 if self.file.read_exact(&mut len_buf).is_err() {
195 break;
196 }
197 let payload_len = u32::from_le_bytes(len_buf) as usize;
198 offset += 4;
199
200 if payload_len < MIN_PAYLOAD || offset + payload_len as u64 + 4 > file_len {
201 break;
202 }
203
204 let mut payload = vec![0u8; payload_len];
206 if self.file.read_exact(&mut payload).is_err() {
207 break;
208 }
209 offset += payload_len as u64;
210
211 let mut crc_buf = [0u8; 4];
213 if self.file.read_exact(&mut crc_buf).is_err() {
214 break;
215 }
216 let stored_crc = u32::from_le_bytes(crc_buf);
217 offset += 4;
218
219 let computed_crc = {
221 let mut h = crc32fast::Hasher::new();
222 h.update(&payload);
223 h.finalize()
224 };
225 if computed_crc != stored_crc {
226 break; }
228
229 let lsn = u64::from_le_bytes(payload[0..8].try_into().unwrap());
231 let entry_type = match WalEntryType::try_from(payload[8]) {
232 Ok(t) => t,
233 Err(_) => break,
234 };
235 let page_id = u64::from_le_bytes(payload[9..17].try_into().unwrap());
236 let compressed_data = &payload[17..];
237
238 let data = lz4_flex::decompress_size_prepended(compressed_data)
239 .map_err(|e| MenteError::Storage(format!("LZ4 decompress failed: {e}")))?;
240
241 entries.push(WalEntry {
242 lsn,
243 entry_type,
244 page_id,
245 data,
246 checksum: stored_crc,
247 });
248 }
249
250 Ok(entries)
251 }
252}
253
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh temp directory and opens an empty WAL inside it.
    /// The `TempDir` must be kept alive for as long as the WAL is used.
    fn setup() -> (tempfile::TempDir, Wal) {
        let dir = tempfile::tempdir().unwrap();
        let wal = Wal::open(dir.path()).unwrap();
        (dir, wal)
    }

    #[test]
    fn test_append_and_iterate() {
        let (_dir, mut wal) = setup();

        let lsn1 = wal.append(WalEntryType::PageWrite, 1, b"hello").unwrap();
        let lsn2 = wal.append(WalEntryType::PageWrite, 2, b"world").unwrap();
        assert_eq!(lsn1, 1);
        assert_eq!(lsn2, 2);

        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].lsn, 1);
        assert_eq!(entries[0].data, b"hello");
        assert_eq!(entries[1].lsn, 2);
        assert_eq!(entries[1].data, b"world");
    }

    #[test]
    fn test_sync() {
        let (_dir, mut wal) = setup();
        wal.append(WalEntryType::Commit, 0, b"").unwrap();
        wal.sync().unwrap();

        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].entry_type, WalEntryType::Commit);
    }

    #[test]
    fn test_truncate() {
        let (_dir, mut wal) = setup();

        wal.append(WalEntryType::PageWrite, 1, b"a").unwrap();
        wal.append(WalEntryType::PageWrite, 2, b"b").unwrap();
        wal.append(WalEntryType::Checkpoint, 0, b"").unwrap();

        wal.truncate(3).unwrap();

        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].lsn, 3);
    }

    #[test]
    fn test_truncate_then_append_continues_lsns() {
        let dir = tempfile::tempdir().unwrap();
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            for page in 1..=3 {
                wal.append(WalEntryType::PageWrite, page, b"x").unwrap();
            }
            wal.truncate(2).unwrap();

            // Truncation must not rewind the in-memory LSN counter.
            assert_eq!(wal.append(WalEntryType::PageWrite, 4, b"y").unwrap(), 4);
            let lsns: Vec<Lsn> = wal.iterate().unwrap().iter().map(|e| e.lsn).collect();
            assert_eq!(lsns, vec![2, 3, 4]);
            wal.sync().unwrap();
        }
        let wal = Wal::open(dir.path()).unwrap();
        assert_eq!(wal.next_lsn(), 5);
    }

    #[test]
    fn test_recovery_reopen() {
        let dir = tempfile::tempdir().unwrap();
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            wal.append(WalEntryType::PageWrite, 10, b"recovery-data")
                .unwrap();
            wal.sync().unwrap();
        }
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            assert_eq!(wal.next_lsn(), 2);
            let entries = wal.iterate().unwrap();
            assert_eq!(entries.len(), 1);
            assert_eq!(entries[0].page_id, 10);
            assert_eq!(entries[0].data, b"recovery-data");
        }
    }

    #[test]
    fn test_reopen_ignores_torn_tail() {
        let dir = tempfile::tempdir().unwrap();
        {
            let mut wal = Wal::open(dir.path()).unwrap();
            wal.append(WalEntryType::PageWrite, 1, b"a").unwrap();
            wal.append(WalEntryType::PageWrite, 2, b"b").unwrap();
            wal.sync().unwrap();
        }
        {
            // Simulate a crash mid-write: not even a complete length prefix.
            let mut f = OpenOptions::new()
                .append(true)
                .open(dir.path().join("wal.log"))
                .unwrap();
            f.write_all(&[0xFF, 0xFF, 0xFF]).unwrap();
        }
        let mut wal = Wal::open(dir.path()).unwrap();
        assert_eq!(wal.next_lsn(), 3);
        assert_eq!(wal.iterate().unwrap().len(), 2);
    }

    #[test]
    fn test_empty_data_entry() {
        let (_dir, mut wal) = setup();
        let lsn = wal.append(WalEntryType::Checkpoint, 0, b"").unwrap();
        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].lsn, lsn);
        assert!(entries[0].data.is_empty());
    }

    #[test]
    fn test_large_data_compression() {
        let (_dir, mut wal) = setup();
        let big_data = vec![0xABu8; 8192];
        wal.append(WalEntryType::PageWrite, 5, &big_data).unwrap();

        let entries = wal.iterate().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].data, big_data);
    }

    #[test]
    fn test_entry_type_round_trip() {
        for t in [
            WalEntryType::PageWrite,
            WalEntryType::Commit,
            WalEntryType::Checkpoint,
        ] {
            assert_eq!(WalEntryType::try_from(t as u8).unwrap(), t);
        }
        assert!(WalEntryType::try_from(0).is_err());
        assert!(WalEntryType::try_from(4).is_err());
    }
}