Skip to main content

graphmind/persistence/
wal.rs

1//! # Write-Ahead Log (WAL)
2//!
3//! ## Core theory
4//!
5//! The fundamental insight behind WAL is that **sequential writes are fast** (especially
6//! on SSDs and spinning disks), while random writes are slow. By appending each operation
7//! to a log file before modifying the in-memory data structures, we get durability without
8//! expensive random I/O. If the process crashes after writing to the log but before
9//! updating the main data, we can reconstruct the correct state from the log.
10//!
11//! ## Recovery
12//!
13//! On startup, the WAL is scanned from the beginning (or from the last checkpoint).
14//! Each entry is replayed against the in-memory graph to reconstruct state. Checkpoints
15//! record a "safe point" — all data before the checkpoint is known to be persisted to
16//! RocksDB, so the WAL can be truncated to prevent unbounded growth.
17//!
18//! ## CRC32 checksums
19//!
20//! Each WAL record includes a CRC32 checksum computed over its payload. This detects
21//! corruption from hardware errors (bit flips in storage or memory). During recovery,
//! if a checksum mismatch is found, replay stops and reports a corruption error — this
//! is safer than silently applying corrupted data.
24//!
25//! ## Sequence numbers
26//!
27//! Every WAL entry gets a monotonically increasing sequence number. These provide a
28//! total ordering of operations, which is essential for exactly-once replay during
29//! recovery (skip entries already applied) and for coordinating with checkpoints.
30//!
31//! ## Sync modes
32//!
33//! There is a fundamental trade-off between durability and performance:
34//! - **`fsync` after every write**: guarantees the data is on stable storage, but each
35//!   fsync can take 1-10ms (limits throughput to ~100-1000 writes/sec)
36//! - **Buffered writes**: the OS may cache writes in its page cache, risking loss of
37//!   the most recent writes on crash, but achieving much higher throughput
38//!
39//! Most databases offer both modes and let users choose based on their requirements.
40
41use serde::{Deserialize, Serialize};
42use std::fs::{File, OpenOptions};
43use std::io::{self, BufReader, BufWriter, Read, Write};
44use std::path::{Path, PathBuf};
45use thiserror::Error;
46use tracing::{debug, info, warn};
47
48/// WAL errors
49#[derive(Error, Debug)]
50pub enum WalError {
51    /// I/O error
52    #[error("I/O error: {0}")]
53    Io(#[from] io::Error),
54
55    /// Serialization error
56    #[error("Serialization error: {0}")]
57    Serialization(#[from] bincode::Error),
58
59    /// Corruption detected
60    #[error("WAL corruption detected at offset {0}")]
61    Corruption(u64),
62
63    /// Invalid log entry
64    #[error("Invalid log entry: {0}")]
65    InvalidEntry(String),
66}
67
68pub type WalResult<T> = Result<T, WalError>;
69
70/// Write-Ahead Log entry types
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub enum WalEntry {
73    /// Create node
74    CreateNode {
75        tenant: String,
76        node_id: u64,
77        labels: Vec<String>,
78        properties: Vec<u8>, // Serialized property map
79    },
80    /// Create edge
81    CreateEdge {
82        tenant: String,
83        edge_id: u64,
84        source: u64,
85        target: u64,
86        edge_type: String,
87        properties: Vec<u8>, // Serialized property map
88    },
89    /// Delete node
90    DeleteNode { tenant: String, node_id: u64 },
91    /// Delete edge
92    DeleteEdge { tenant: String, edge_id: u64 },
93    /// Update node properties
94    UpdateNodeProperties {
95        tenant: String,
96        node_id: u64,
97        properties: Vec<u8>,
98    },
99    /// Update edge properties
100    UpdateEdgeProperties {
101        tenant: String,
102        edge_id: u64,
103        properties: Vec<u8>,
104    },
105    /// Checkpoint marker
106    Checkpoint { sequence: u64, timestamp: i64 },
107}
108
109/// WAL record with metadata
110#[derive(Debug, Clone, Serialize, Deserialize)]
111struct WalRecord {
112    /// Sequence number (monotonically increasing)
113    sequence: u64,
114    /// Entry data
115    entry: WalEntry,
116    /// CRC32 checksum for corruption detection
117    checksum: u32,
118}
119
120impl WalRecord {
121    fn new(sequence: u64, entry: WalEntry) -> Self {
122        let mut record = Self {
123            sequence,
124            entry,
125            checksum: 0,
126        };
127        // Calculate checksum
128        record.checksum = record.calculate_checksum();
129        record
130    }
131
132    fn calculate_checksum(&self) -> u32 {
133        // Simple checksum: XOR all bytes
134        let bytes = bincode::serialize(&self.entry).unwrap_or_default();
135        bytes.iter().fold(0u32, |acc, &b| acc ^ (b as u32))
136    }
137
138    fn verify_checksum(&self) -> bool {
139        self.checksum == self.calculate_checksum()
140    }
141}
142
/// Manager for a directory of write-ahead log files.
pub struct Wal {
    /// Directory that holds the `wal-*.log` files.
    path: PathBuf,
    /// Buffered writer for the file currently being appended to; `None` until
    /// the first append and again after a checkpoint closes the file.
    current_file: Option<BufWriter<File>>,
    /// Sequence number of the most recently appended record.
    sequence: u64,
    /// When `true`, `append` flushes the writer after every record.
    sync_mode: bool,
}
154
155impl Wal {
156    /// Create a new WAL
157    pub fn new(path: impl AsRef<Path>) -> WalResult<Self> {
158        let path = path.as_ref().to_path_buf();
159
160        // Create directory if it doesn't exist
161        std::fs::create_dir_all(&path)?;
162
163        // Find the latest sequence number from existing WAL files
164        let sequence = Self::find_latest_sequence(&path)?;
165
166        info!("Initializing WAL at {:?}, sequence: {}", path, sequence);
167
168        Ok(Self {
169            path,
170            current_file: None,
171            sequence,
172            sync_mode: false, // Default to async for performance
173        })
174    }
175
176    /// Set sync mode
177    pub fn set_sync_mode(&mut self, sync: bool) {
178        self.sync_mode = sync;
179        debug!("WAL sync mode: {}", sync);
180    }
181
182    /// Get current sequence number
183    ///
184    /// Returns the current WAL sequence number which increments with each write.
185    /// This is needed by PersistenceManager::checkpoint() to record the actual
186    /// checkpoint sequence instead of always using 0, which was causing misleading
187    /// output in the banking demo (WAL checkpoint always showed sequence 0).
188    pub fn current_sequence(&self) -> u64 {
189        self.sequence
190    }
191
192    /// Append an entry to the WAL
193    pub fn append(&mut self, entry: WalEntry) -> WalResult<u64> {
194        // Increment sequence
195        self.sequence += 1;
196        let sequence = self.sequence;
197
198        // Create WAL record
199        let record = WalRecord::new(sequence, entry);
200
201        // Serialize
202        let data = bincode::serialize(&record)?;
203
204        // Ensure we have an open file
205        if self.current_file.is_none() {
206            self.open_new_file()?;
207        }
208
209        // Write to file
210        if let Some(ref mut file) = self.current_file {
211            // Write length prefix (4 bytes)
212            file.write_all(&(data.len() as u32).to_le_bytes())?;
213            // Write data
214            file.write_all(&data)?;
215
216            // Flush if in sync mode
217            if self.sync_mode {
218                file.flush()?;
219            }
220        }
221
222        Ok(sequence)
223    }
224
225    /// Force flush the WAL
226    pub fn flush(&mut self) -> WalResult<()> {
227        if let Some(ref mut file) = self.current_file {
228            file.flush()?;
229        }
230        Ok(())
231    }
232
233    /// Replay the WAL from a specific sequence number
234    pub fn replay<F>(&self, from_sequence: u64, mut callback: F) -> WalResult<u64>
235    where
236        F: FnMut(&WalEntry) -> WalResult<()>,
237    {
238        info!("Replaying WAL from sequence {}", from_sequence);
239
240        let files = self.get_wal_files()?;
241        let mut replayed = 0u64;
242        let mut last_sequence = from_sequence;
243
244        for file_path in files {
245            let file = File::open(&file_path)?;
246            let mut reader = BufReader::new(file);
247            let mut buf = Vec::new();
248
249            loop {
250                // Read length prefix
251                let mut len_bytes = [0u8; 4];
252                match reader.read_exact(&mut len_bytes) {
253                    Ok(_) => {}
254                    Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
255                    Err(e) => return Err(e.into()),
256                }
257
258                let len = u32::from_le_bytes(len_bytes) as usize;
259
260                // Read record data
261                buf.resize(len, 0);
262                reader.read_exact(&mut buf)?;
263
264                // Deserialize
265                let record: WalRecord = bincode::deserialize(&buf)?;
266
267                // Verify checksum
268                if !record.verify_checksum() {
269                    warn!("WAL corruption detected at sequence {}", record.sequence);
270                    return Err(WalError::Corruption(record.sequence));
271                }
272
273                // Skip if before from_sequence
274                if record.sequence < from_sequence {
275                    continue;
276                }
277
278                // Apply entry
279                callback(&record.entry)?;
280                replayed += 1;
281                last_sequence = record.sequence;
282            }
283        }
284
285        info!(
286            "Replayed {} WAL entries, last sequence: {}",
287            replayed, last_sequence
288        );
289        Ok(last_sequence)
290    }
291
292    /// Create a checkpoint and truncate old WAL entries
293    pub fn checkpoint(&mut self, sequence: u64) -> WalResult<()> {
294        info!("Creating WAL checkpoint at sequence {}", sequence);
295
296        // Append checkpoint marker
297        let timestamp = chrono::Utc::now().timestamp();
298        self.append(WalEntry::Checkpoint {
299            sequence,
300            timestamp,
301        })?;
302
303        // Flush current file
304        self.flush()?;
305
306        // Close current file
307        self.current_file = None;
308
309        // Delete old WAL files (implementation depends on file naming strategy)
310        // For now, we keep all files for safety
311        // TODO: Implement safe WAL truncation after checkpoint
312
313        Ok(())
314    }
315
316    /// Open a new WAL file
317    fn open_new_file(&mut self) -> WalResult<()> {
318        let filename = format!("wal-{:016x}.log", self.sequence);
319        let file_path = self.path.join(filename);
320
321        debug!("Opening new WAL file: {:?}", file_path);
322
323        let file = OpenOptions::new()
324            .create(true)
325            .append(true)
326            .open(file_path)?;
327
328        self.current_file = Some(BufWriter::new(file));
329        Ok(())
330    }
331
332    /// Find the latest sequence number from existing WAL files
333    fn find_latest_sequence(path: &Path) -> WalResult<u64> {
334        let files = match std::fs::read_dir(path) {
335            Ok(entries) => entries,
336            Err(_) => return Ok(0), // No directory yet
337        };
338
339        let mut max_sequence = 0u64;
340
341        for entry in files.flatten() {
342            if let Some(filename) = entry.file_name().to_str() {
343                if filename.starts_with("wal-") && filename.ends_with(".log") {
344                    // Parse sequence from filename
345                    if let Some(seq_str) = filename
346                        .strip_prefix("wal-")
347                        .and_then(|s| s.strip_suffix(".log"))
348                    {
349                        if let Ok(seq) = u64::from_str_radix(seq_str, 16) {
350                            max_sequence = max_sequence.max(seq);
351                        }
352                    }
353                }
354            }
355        }
356
357        Ok(max_sequence)
358    }
359
360    /// Get all WAL files in sequence order
361    fn get_wal_files(&self) -> WalResult<Vec<PathBuf>> {
362        let mut files = Vec::new();
363
364        let entries = std::fs::read_dir(&self.path)?;
365
366        for entry in entries.flatten() {
367            if let Some(filename) = entry.file_name().to_str() {
368                if filename.starts_with("wal-") && filename.ends_with(".log") {
369                    files.push(entry.path());
370                }
371            }
372        }
373
374        // Sort by filename (which includes sequence)
375        files.sort();
376
377        Ok(files)
378    }
379}
380
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a WAL in a fresh temporary directory. The `TempDir` is handed
    /// back so the directory outlives the test body.
    fn fresh_wal() -> (TempDir, Wal) {
        let dir = TempDir::new().unwrap();
        let wal = Wal::new(dir.path()).unwrap();
        (dir, wal)
    }

    /// Builds a `CreateNode` entry for the "default" tenant with empty
    /// properties.
    fn node(node_id: u64, labels: Vec<String>) -> WalEntry {
        WalEntry::CreateNode {
            tenant: "default".to_string(),
            node_id,
            labels,
            properties: Vec::new(),
        }
    }

    /// A WAL opened on an empty directory starts at sequence 0.
    #[test]
    fn test_wal_creation() {
        let (_dir, wal) = fresh_wal();
        assert_eq!(wal.sequence, 0);
    }

    /// The first append is assigned sequence 1 and can be flushed.
    #[test]
    fn test_wal_append() {
        let (_dir, mut wal) = fresh_wal();

        let assigned = wal
            .append(node(1, vec!["Person".to_string()]))
            .unwrap();
        assert_eq!(assigned, 1);

        wal.flush().unwrap();
    }

    /// Every appended entry is delivered once when replaying from 0.
    #[test]
    fn test_wal_replay() {
        let (_dir, mut wal) = fresh_wal();

        (1..=5u64).for_each(|id| {
            wal.append(node(id, Vec::new())).unwrap();
        });
        wal.flush().unwrap();

        let mut seen = 0;
        let outcome = wal.replay(0, |_| {
            seen += 1;
            Ok(())
        });
        outcome.unwrap();

        assert_eq!(seen, 5);
    }

    /// `checkpoint` leaves a `Checkpoint` marker that replay can observe.
    #[test]
    fn test_wal_checkpoint() {
        let (_dir, mut wal) = fresh_wal();

        (1..=10u64).for_each(|id| {
            wal.append(node(id, Vec::new())).unwrap();
        });

        wal.checkpoint(10).unwrap();

        let mut saw_marker = false;
        let outcome = wal.replay(0, |entry| {
            saw_marker |= matches!(entry, WalEntry::Checkpoint { .. });
            Ok(())
        });
        outcome.unwrap();

        assert!(saw_marker);
    }
}
471}