1use anyhow::{Context, Result};
13use bytemuck::{bytes_of, Pod, Zeroable};
14use crc32fast::Hasher;
15use parking_lot::Mutex;
16use std::fs::{File, OpenOptions};
17use std::io::{Read, Seek, SeekFrom, Write};
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20
21use crate::storage::data_structures::WalEntry;
22
/// Magic number identifying a WAL file header: the ASCII bytes `W`, `A`, `L`,
/// `G` interpreted as a big-endian `u32` (`0x57414C47`).
const WAL_MAGIC: u32 = u32::from_be_bytes(*b"WALG");

/// Header format version; an existing file with any other version is rejected
/// when it is opened.
const WAL_VERSION: u32 = 2;
28
29#[repr(C)]
31#[derive(Clone, Copy, Debug, Pod, Zeroable)]
32pub(crate) struct WalFileHeader {
33 magic: u32,
34 version: u32,
35 entry_count: u64,
36 last_checkpoint_lsn: u64,
37 _padding: [u8; 32],
38}
39
40#[repr(C)]
42#[derive(Clone, Copy, Debug, Pod, Zeroable)]
43pub(crate) struct WalEntryWithCrc {
44 entry: WalEntry,
45 crc32: u32,
46 _padding: u32,
47}
48
49impl WalEntryWithCrc {
50 fn new(entry: WalEntry) -> Self {
52 let crc32 = Self::compute_crc(&entry);
53 Self {
54 entry,
55 crc32,
56 _padding: 0,
57 }
58 }
59
60 fn compute_crc(entry: &WalEntry) -> u32 {
62 let mut hasher = Hasher::new();
63 hasher.update(bytes_of(entry));
64 hasher.finalize()
65 }
66
67 fn verify_crc(&self) -> bool {
69 Self::compute_crc(&self.entry) == self.crc32
70 }
71}
72
73pub struct Wal {
75 file: Arc<Mutex<File>>,
76 _path: PathBuf,
77 dir_path: PathBuf,
78 entry_count: Arc<Mutex<u64>>,
79 pending_entries: Arc<Mutex<Vec<WalEntryWithCrc>>>,
80 batch_size: usize,
81}
82
83impl Wal {
84 pub fn open<P: AsRef<Path>>(path: P, batch_size: usize) -> Result<Self> {
90 let path_buf = path.as_ref().to_path_buf();
91 let dir_path = path_buf
92 .parent()
93 .ok_or_else(|| anyhow::anyhow!("WAL path has no parent directory"))?
94 .to_path_buf();
95
96 let mut file = OpenOptions::new()
97 .read(true)
98 .write(true)
99 .create(true)
100 .truncate(false)
101 .open(&path_buf)
102 .context("Failed to open WAL file")?;
103
104 let metadata = file.metadata().context("Failed to get WAL file metadata")?;
105
106 let entry_count = if metadata.len() == 0 {
107 Self::write_header(&mut file, 0, 0)?;
109 0
110 } else {
111 let header = Self::read_header(&mut file)?;
112 if header.magic != WAL_MAGIC {
113 anyhow::bail!(
114 "Invalid WAL magic number: expected {:#x}, got {:#x}",
115 WAL_MAGIC,
116 header.magic
117 );
118 }
119 if header.version != WAL_VERSION {
120 anyhow::bail!(
121 "Incompatible WAL version: expected {}, got {}",
122 WAL_VERSION,
123 header.version
124 );
125 }
126 header.entry_count
127 };
128
129 Ok(Self {
130 file: Arc::new(Mutex::new(file)),
131 _path: path_buf,
132 dir_path,
133 entry_count: Arc::new(Mutex::new(entry_count)),
134 pending_entries: Arc::new(Mutex::new(Vec::new())),
135 batch_size,
136 })
137 }
138
139 pub fn append(&self, entry: WalEntry) -> Result<()> {
141 let entry_with_crc = WalEntryWithCrc::new(entry);
142
143 let mut pending = self.pending_entries.lock();
144 pending.push(entry_with_crc);
145
146 if pending.len() >= self.batch_size {
147 drop(pending);
148 self.flush()?;
149 }
150
151 Ok(())
152 }
153
154 pub fn flush(&self) -> Result<()> {
156 let mut pending = self.pending_entries.lock();
157 if pending.is_empty() {
158 return Ok(());
159 }
160
161 let mut file = self.file.lock();
162 let mut count = self.entry_count.lock();
163
164 file.seek(SeekFrom::End(0))
165 .context("Failed to seek to end of WAL")?;
166
167 for entry_with_crc in pending.iter() {
168 file.write_all(bytes_of(entry_with_crc))
169 .context("Failed to write WAL entry")?;
170 }
171
172 file.sync_data().context("Failed to fsync WAL file")?;
173 *count += pending.len() as u64;
174 Self::update_header_entry_count(&mut file, *count)?;
175 Self::fsync_directory(&self.dir_path)?;
176 pending.clear();
177
178 Ok(())
179 }
180
181 pub fn replay(&self) -> Result<Vec<WalEntry>> {
183 let mut file = self.file.lock();
184 let _header = Self::read_header(&mut file)?;
185
186 let entry_size = std::mem::size_of::<WalEntryWithCrc>();
187 let header_size = std::mem::size_of::<WalFileHeader>();
188
189 let mut entries = Vec::new();
190 file.seek(SeekFrom::Start(header_size as u64))
191 .context("Failed to seek past WAL header")?;
192
193 let mut buffer = vec![0u8; entry_size];
194 while let Ok(()) = file.read_exact(&mut buffer) {
195 let entry_with_crc: WalEntryWithCrc = *bytemuck::try_from_bytes(&buffer)
196 .map_err(|e| anyhow::anyhow!("Invalid WAL entry bytes: {}", e))?;
197
198 if !entry_with_crc.verify_crc() {
199 eprintln!("WAL corruption detected: CRC mismatch, stopping replay");
200 break;
201 }
202
203 entries.push(entry_with_crc.entry);
204 }
205
206 Ok(entries)
207 }
208
209 pub fn truncate(&self) -> Result<()> {
211 let mut pending = self.pending_entries.lock();
212 pending.clear();
213
214 let mut count = self.entry_count.lock();
215 *count = 0;
216
217 let mut file = self.file.lock();
218 file.set_len(0)?;
219 file.seek(SeekFrom::Start(0))?;
220 Self::write_header(&mut file, 0, 0)?;
221 file.sync_data()?;
222
223 Ok(())
224 }
225
226 pub fn entry_count(&self) -> u64 {
228 *self.entry_count.lock() + self.pending_entries.lock().len() as u64
229 }
230
231 fn write_header(file: &mut File, entry_count: u64, checkpoint_lsn: u64) -> Result<()> {
232 let header = WalFileHeader {
233 magic: WAL_MAGIC,
234 version: WAL_VERSION,
235 entry_count,
236 last_checkpoint_lsn: checkpoint_lsn,
237 _padding: [0u8; 32],
238 };
239
240 file.seek(SeekFrom::Start(0))?;
241 file.write_all(bytes_of(&header))?;
242 file.flush()?;
243 Ok(())
244 }
245
246 fn read_header(file: &mut File) -> Result<WalFileHeader> {
247 let mut buffer = [0u8; std::mem::size_of::<WalFileHeader>()];
248 file.seek(SeekFrom::Start(0))?;
249 file.read_exact(&mut buffer)?;
250
251 let header: WalFileHeader = *bytemuck::try_from_bytes(&buffer)
252 .map_err(|e| anyhow::anyhow!("Invalid WAL header: {}", e))?;
253
254 Ok(header)
255 }
256
257 fn update_header_entry_count(file: &mut File, entry_count: u64) -> Result<()> {
258 let mut header = Self::read_header(file)?;
259 header.entry_count = entry_count;
260 file.seek(SeekFrom::Start(0))?;
261 file.write_all(bytes_of(&header))?;
262 file.flush()?;
263 Ok(())
264 }
265
266 fn fsync_directory(dir_path: &Path) -> Result<()> {
267 #[cfg(unix)]
268 {
269 use std::os::unix::io::AsRawFd;
270 let dir = File::open(dir_path).context("Failed to open directory for fsync")?;
271 unsafe {
272 if libc::fsync(dir.as_raw_fd()) != 0 {
273 return Err(anyhow::anyhow!(
274 "fsync directory failed: {}",
275 std::io::Error::last_os_error()
276 ));
277 }
278 }
279 }
280 Ok(())
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::data_structures::{WAL_ENTRY_EDGE_CREATE, WAL_ENTRY_NODE_CREATE};
    use tempfile::tempdir;

    /// Builds an entry whose only non-zero fields are `node_id` and `entry_type`.
    fn make_wal_entry(node_id: u64, entry_type: u8) -> WalEntry {
        WalEntry {
            timestamp: 0,
            node_id,
            edge_dst: 0,
            x: 0.0,
            y: 0.0,
            z: 0.0,
            edge_w: 0.0,
            entry_type,
            _padding: [0u8; 7],
            tx_id: 0,
            lsn: 0,
        }
    }

    /// Opening a missing file creates it, and the created file can be reopened.
    #[test]
    fn test_wal_create_and_open() {
        let temp_dir = tempdir().unwrap();
        let wal_path = temp_dir.path().join("test.wal");

        let wal = Wal::open(&wal_path, 10);
        assert!(wal.is_ok());
        assert!(wal_path.exists());

        let wal2 = Wal::open(&wal_path, 10);
        assert!(wal2.is_ok());
    }

    /// Flushed entries come back from `replay` in order and unchanged.
    #[test]
    fn test_wal_append_and_replay() {
        let temp_dir = tempdir().unwrap();
        let wal_path = temp_dir.path().join("test.wal");

        let wal = Wal::open(&wal_path, 100).unwrap();

        for i in 0..5 {
            wal.append(make_wal_entry(i, WAL_ENTRY_NODE_CREATE))
                .unwrap();
        }

        wal.flush().unwrap();
        assert_eq!(wal.entry_count(), 5);

        let entries = wal.replay().unwrap();
        assert_eq!(entries.len(), 5);

        for (i, entry) in entries.iter().enumerate() {
            assert_eq!(entry.node_id, i as u64);
            assert_eq!(entry.entry_type, WAL_ENTRY_NODE_CREATE);
        }
    }

    /// Reaching `batch_size` pending entries writes them without an explicit flush.
    #[test]
    fn test_wal_auto_flush_on_batch_size() {
        let temp_dir = tempdir().unwrap();
        let wal_path = temp_dir.path().join("test.wal");

        let wal = Wal::open(&wal_path, 3).unwrap();

        for i in 0..2 {
            wal.append(make_wal_entry(i, WAL_ENTRY_NODE_CREATE))
                .unwrap();
        }
        assert_eq!(wal.entry_count(), 2);

        wal.append(make_wal_entry(2, WAL_ENTRY_NODE_CREATE))
            .unwrap();
        assert_eq!(wal.entry_count(), 3);

        let entries = wal.replay().unwrap();
        assert_eq!(entries.len(), 3);
    }

    /// `truncate` removes every written entry.
    #[test]
    fn test_wal_truncate() {
        let temp_dir = tempdir().unwrap();
        let wal_path = temp_dir.path().join("test.wal");

        let wal = Wal::open(&wal_path, 100).unwrap();
        for i in 0..5 {
            wal.append(make_wal_entry(i, WAL_ENTRY_EDGE_CREATE))
                .unwrap();
        }
        wal.flush().unwrap();

        wal.truncate().unwrap();
        assert_eq!(wal.entry_count(), 0);

        let entries = wal.replay().unwrap();
        assert!(entries.is_empty());
    }

    /// Entries and the entry count survive closing and reopening the file.
    #[test]
    fn test_wal_reopen_preserves_entries() {
        let temp_dir = tempdir().unwrap();
        let wal_path = temp_dir.path().join("test.wal");

        {
            let wal = Wal::open(&wal_path, 100).unwrap();
            for i in 0..3 {
                wal.append(make_wal_entry(i, WAL_ENTRY_NODE_CREATE))
                    .unwrap();
            }
            wal.flush().unwrap();
        }

        let reopened = Wal::open(&wal_path, 100).unwrap();
        assert_eq!(reopened.entry_count(), 3);

        let entries = reopened.replay().unwrap();
        assert_eq!(entries.len(), 3);
        for (i, entry) in entries.iter().enumerate() {
            assert_eq!(entry.node_id, i as u64);
        }
    }

    /// A flipped byte inside a record makes `replay` stop before that record.
    #[test]
    fn test_wal_crc_corruption() {
        let temp_dir = tempdir().unwrap();
        let wal_path = temp_dir.path().join("test.wal");

        let wal = Wal::open(&wal_path, 1).unwrap();
        wal.append(make_wal_entry(0, WAL_ENTRY_NODE_CREATE))
            .unwrap();
        wal.flush().unwrap();

        // Four bytes into the first record, i.e. inside the checksummed entry.
        let header_size = std::mem::size_of::<WalFileHeader>();
        let corrupt_offset = (header_size + 4) as u64;

        {
            let mut file = OpenOptions::new()
                .read(true)
                .write(true)
                .open(&wal_path)
                .unwrap();
            file.seek(SeekFrom::Start(corrupt_offset)).unwrap();
            let mut byte = [0u8; 1];
            file.read_exact(&mut byte).unwrap();
            byte[0] = byte[0].wrapping_add(1);
            file.seek(SeekFrom::Start(corrupt_offset)).unwrap();
            file.write_all(&byte).unwrap();
        }

        let wal2 = Wal::open(&wal_path, 100).unwrap();
        let entries = wal2.replay().unwrap();
        assert!(entries.is_empty());
    }
}