Skip to main content

geographdb_core/storage/
wal.rs

1//! Write-Ahead Log (WAL) for crash recovery.
2//!
3//! Ported from geometric_db_concept/crates/geographdb/src/storage/wal.rs,
4//! adapted to use geographdb-core's crate path conventions.
5//!
6//! Provides durable, crash-safe transaction logging with:
7//! - CRC32 integrity checking
8//! - Batch fsync for performance
9//! - Replay logic for crash recovery
10//! - Truncation after checkpoint
11
12use anyhow::{Context, Result};
13use bytemuck::{bytes_of, Pod, Zeroable};
14use crc32fast::Hasher;
15use parking_lot::Mutex;
16use std::fs::{File, OpenOptions};
17use std::io::{Read, Seek, SeekFrom, Write};
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20
21use crate::storage::data_structures::WalEntry;
22
/// WAL magic number: the ASCII bytes "WALG" read as a big-endian `u32` (0x5741_4C47).
///
/// The header is written with `bytes_of`, i.e. in host byte order, so on a
/// little-endian machine the first four bytes of the file are "GLAW".
const WAL_MAGIC: u32 = u32::from_be_bytes(*b"WALG");

/// On-disk format version; 2 is the layout whose entries carry `tx_id`/`lsn`.
/// `Wal::open` rejects files with any other version.
const WAL_VERSION: u32 = 2;
28
29/// WAL file header
30#[repr(C)]
31#[derive(Clone, Copy, Debug, Pod, Zeroable)]
32pub(crate) struct WalFileHeader {
33    magic: u32,
34    version: u32,
35    entry_count: u64,
36    last_checkpoint_lsn: u64,
37    _padding: [u8; 32],
38}
39
40/// WAL entry with CRC32 checksum for integrity
41#[repr(C)]
42#[derive(Clone, Copy, Debug, Pod, Zeroable)]
43pub(crate) struct WalEntryWithCrc {
44    entry: WalEntry,
45    crc32: u32,
46    _padding: u32,
47}
48
49impl WalEntryWithCrc {
50    /// Create new WAL entry with computed CRC
51    fn new(entry: WalEntry) -> Self {
52        let crc32 = Self::compute_crc(&entry);
53        Self {
54            entry,
55            crc32,
56            _padding: 0,
57        }
58    }
59
60    /// Compute CRC32 checksum for entry
61    fn compute_crc(entry: &WalEntry) -> u32 {
62        let mut hasher = Hasher::new();
63        hasher.update(bytes_of(entry));
64        hasher.finalize()
65    }
66
67    /// Verify CRC32 checksum
68    fn verify_crc(&self) -> bool {
69        Self::compute_crc(&self.entry) == self.crc32
70    }
71}
72
73/// Write-Ahead Log for durable transaction logging
74pub struct Wal {
75    file: Arc<Mutex<File>>,
76    _path: PathBuf,
77    dir_path: PathBuf,
78    entry_count: Arc<Mutex<u64>>,
79    pending_entries: Arc<Mutex<Vec<WalEntryWithCrc>>>,
80    batch_size: usize,
81}
82
83impl Wal {
84    /// Open or create WAL file.
85    ///
86    /// # Arguments
87    /// * `path` - Path to WAL file
88    /// * `batch_size` - Number of entries to batch before fsync
89    pub fn open<P: AsRef<Path>>(path: P, batch_size: usize) -> Result<Self> {
90        let path_buf = path.as_ref().to_path_buf();
91        let dir_path = path_buf
92            .parent()
93            .ok_or_else(|| anyhow::anyhow!("WAL path has no parent directory"))?
94            .to_path_buf();
95
96        let mut file = OpenOptions::new()
97            .read(true)
98            .write(true)
99            .create(true)
100            .truncate(false)
101            .open(&path_buf)
102            .context("Failed to open WAL file")?;
103
104        let metadata = file.metadata().context("Failed to get WAL file metadata")?;
105
106        let entry_count = if metadata.len() == 0 {
107            // New file - write header
108            Self::write_header(&mut file, 0, 0)?;
109            0
110        } else {
111            let header = Self::read_header(&mut file)?;
112            if header.magic != WAL_MAGIC {
113                anyhow::bail!(
114                    "Invalid WAL magic number: expected {:#x}, got {:#x}",
115                    WAL_MAGIC,
116                    header.magic
117                );
118            }
119            if header.version != WAL_VERSION {
120                anyhow::bail!(
121                    "Incompatible WAL version: expected {}, got {}",
122                    WAL_VERSION,
123                    header.version
124                );
125            }
126            header.entry_count
127        };
128
129        Ok(Self {
130            file: Arc::new(Mutex::new(file)),
131            _path: path_buf,
132            dir_path,
133            entry_count: Arc::new(Mutex::new(entry_count)),
134            pending_entries: Arc::new(Mutex::new(Vec::new())),
135            batch_size,
136        })
137    }
138
139    /// Append entry to WAL (buffered).
140    pub fn append(&self, entry: WalEntry) -> Result<()> {
141        let entry_with_crc = WalEntryWithCrc::new(entry);
142
143        let mut pending = self.pending_entries.lock();
144        pending.push(entry_with_crc);
145
146        if pending.len() >= self.batch_size {
147            drop(pending);
148            self.flush()?;
149        }
150
151        Ok(())
152    }
153
154    /// Flush pending entries to disk with fsync.
155    pub fn flush(&self) -> Result<()> {
156        let mut pending = self.pending_entries.lock();
157        if pending.is_empty() {
158            return Ok(());
159        }
160
161        let mut file = self.file.lock();
162        let mut count = self.entry_count.lock();
163
164        file.seek(SeekFrom::End(0))
165            .context("Failed to seek to end of WAL")?;
166
167        for entry_with_crc in pending.iter() {
168            file.write_all(bytes_of(entry_with_crc))
169                .context("Failed to write WAL entry")?;
170        }
171
172        file.sync_data().context("Failed to fsync WAL file")?;
173        *count += pending.len() as u64;
174        Self::update_header_entry_count(&mut file, *count)?;
175        Self::fsync_directory(&self.dir_path)?;
176        pending.clear();
177
178        Ok(())
179    }
180
181    /// Replay WAL entries from disk.
182    pub fn replay(&self) -> Result<Vec<WalEntry>> {
183        let mut file = self.file.lock();
184        let _header = Self::read_header(&mut file)?;
185
186        let entry_size = std::mem::size_of::<WalEntryWithCrc>();
187        let header_size = std::mem::size_of::<WalFileHeader>();
188
189        let mut entries = Vec::new();
190        file.seek(SeekFrom::Start(header_size as u64))
191            .context("Failed to seek past WAL header")?;
192
193        let mut buffer = vec![0u8; entry_size];
194        while let Ok(()) = file.read_exact(&mut buffer) {
195            let entry_with_crc: WalEntryWithCrc = *bytemuck::try_from_bytes(&buffer)
196                .map_err(|e| anyhow::anyhow!("Invalid WAL entry bytes: {}", e))?;
197
198            if !entry_with_crc.verify_crc() {
199                eprintln!("WAL corruption detected: CRC mismatch, stopping replay");
200                break;
201            }
202
203            entries.push(entry_with_crc.entry);
204        }
205
206        Ok(entries)
207    }
208
209    /// Truncate WAL after successful checkpoint.
210    pub fn truncate(&self) -> Result<()> {
211        let mut pending = self.pending_entries.lock();
212        pending.clear();
213
214        let mut count = self.entry_count.lock();
215        *count = 0;
216
217        let mut file = self.file.lock();
218        file.set_len(0)?;
219        file.seek(SeekFrom::Start(0))?;
220        Self::write_header(&mut file, 0, 0)?;
221        file.sync_data()?;
222
223        Ok(())
224    }
225
226    /// Get current total entry count (persisted + pending).
227    pub fn entry_count(&self) -> u64 {
228        *self.entry_count.lock() + self.pending_entries.lock().len() as u64
229    }
230
231    fn write_header(file: &mut File, entry_count: u64, checkpoint_lsn: u64) -> Result<()> {
232        let header = WalFileHeader {
233            magic: WAL_MAGIC,
234            version: WAL_VERSION,
235            entry_count,
236            last_checkpoint_lsn: checkpoint_lsn,
237            _padding: [0u8; 32],
238        };
239
240        file.seek(SeekFrom::Start(0))?;
241        file.write_all(bytes_of(&header))?;
242        file.flush()?;
243        Ok(())
244    }
245
246    fn read_header(file: &mut File) -> Result<WalFileHeader> {
247        let mut buffer = [0u8; std::mem::size_of::<WalFileHeader>()];
248        file.seek(SeekFrom::Start(0))?;
249        file.read_exact(&mut buffer)?;
250
251        let header: WalFileHeader = *bytemuck::try_from_bytes(&buffer)
252            .map_err(|e| anyhow::anyhow!("Invalid WAL header: {}", e))?;
253
254        Ok(header)
255    }
256
257    fn update_header_entry_count(file: &mut File, entry_count: u64) -> Result<()> {
258        let mut header = Self::read_header(file)?;
259        header.entry_count = entry_count;
260        file.seek(SeekFrom::Start(0))?;
261        file.write_all(bytes_of(&header))?;
262        file.flush()?;
263        Ok(())
264    }
265
266    fn fsync_directory(dir_path: &Path) -> Result<()> {
267        #[cfg(unix)]
268        {
269            use std::os::unix::io::AsRawFd;
270            let dir = File::open(dir_path).context("Failed to open directory for fsync")?;
271            unsafe {
272                if libc::fsync(dir.as_raw_fd()) != 0 {
273                    return Err(anyhow::anyhow!(
274                        "fsync directory failed: {}",
275                        std::io::Error::last_os_error()
276                    ));
277                }
278            }
279        }
280        Ok(())
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::data_structures::{WAL_ENTRY_EDGE_CREATE, WAL_ENTRY_NODE_CREATE};
    use tempfile::tempdir;

    /// Build a WAL entry with only `node_id` and `entry_type` set; all other fields are zero.
    fn make_wal_entry(node_id: u64, entry_type: u8) -> WalEntry {
        WalEntry {
            timestamp: 0,
            node_id,
            edge_dst: 0,
            x: 0.0,
            y: 0.0,
            z: 0.0,
            edge_w: 0.0,
            entry_type,
            _padding: [0u8; 7],
            tx_id: 0,
            lsn: 0,
        }
    }

    #[test]
    fn test_wal_create_and_open() {
        let temp_dir = tempdir().unwrap();
        let wal_path = temp_dir.path().join("test.wal");

        let wal = Wal::open(&wal_path, 10);
        assert!(wal.is_ok());
        assert!(wal_path.exists());

        // Reopening an existing, freshly created log must also succeed.
        let wal2 = Wal::open(&wal_path, 10);
        assert!(wal2.is_ok());
    }

    #[test]
    fn test_wal_append_and_replay() {
        let temp_dir = tempdir().unwrap();
        let wal_path = temp_dir.path().join("test.wal");

        let wal = Wal::open(&wal_path, 100).unwrap();

        for i in 0..5 {
            wal.append(make_wal_entry(i, WAL_ENTRY_NODE_CREATE))
                .unwrap();
        }

        wal.flush().unwrap();
        assert_eq!(wal.entry_count(), 5);

        let entries = wal.replay().unwrap();
        assert_eq!(entries.len(), 5);

        for (i, entry) in entries.iter().enumerate() {
            assert_eq!(entry.node_id, i as u64);
            assert_eq!(entry.entry_type, WAL_ENTRY_NODE_CREATE);
        }
    }

    #[test]
    fn test_wal_auto_flush_on_batch_size() {
        let temp_dir = tempdir().unwrap();
        let wal_path = temp_dir.path().join("test.wal");

        let wal = Wal::open(&wal_path, 3).unwrap();

        for i in 0..2 {
            wal.append(make_wal_entry(i, WAL_ENTRY_NODE_CREATE))
                .unwrap();
        }
        assert_eq!(wal.entry_count(), 2);

        // The third append reaches the batch size and flushes without an explicit call.
        wal.append(make_wal_entry(2, WAL_ENTRY_NODE_CREATE))
            .unwrap();
        assert_eq!(wal.entry_count(), 3);

        let entries = wal.replay().unwrap();
        assert_eq!(entries.len(), 3);
    }

    #[test]
    fn test_wal_truncate() {
        let temp_dir = tempdir().unwrap();
        let wal_path = temp_dir.path().join("test.wal");

        let wal = Wal::open(&wal_path, 100).unwrap();
        for i in 0..5 {
            wal.append(make_wal_entry(i, WAL_ENTRY_EDGE_CREATE))
                .unwrap();
        }
        wal.flush().unwrap();

        wal.truncate().unwrap();
        assert_eq!(wal.entry_count(), 0);

        let entries = wal.replay().unwrap();
        assert_eq!(entries.len(), 0);
    }

    #[test]
    fn test_wal_crc_corruption() {
        let temp_dir = tempdir().unwrap();
        let wal_path = temp_dir.path().join("test.wal");

        let wal = Wal::open(&wal_path, 1).unwrap();
        wal.append(make_wal_entry(0, WAL_ENTRY_NODE_CREATE))
            .unwrap();
        wal.flush().unwrap();

        // Flip the 5th byte of the first record (just past the header) so its
        // stored CRC no longer matches.
        let header_size = std::mem::size_of::<WalFileHeader>();
        let corrupt_offset = (header_size + 4) as u64;

        {
            let mut file = OpenOptions::new()
                .read(true)
                .write(true)
                .open(&wal_path)
                .unwrap();
            let mut byte = [0u8; 1];
            file.seek(SeekFrom::Start(corrupt_offset)).unwrap();
            file.read_exact(&mut byte).unwrap();
            byte[0] = byte[0].wrapping_add(1);
            file.seek(SeekFrom::Start(corrupt_offset)).unwrap();
            file.write_all(&byte).unwrap();
        }

        let wal2 = Wal::open(&wal_path, 100).unwrap();
        let entries = wal2.replay().unwrap();
        // The corrupted record must not be returned.
        assert_eq!(entries.len(), 0);
    }
}