Skip to main content

dbx_core/wal/
mod.rs

1//! Write-Ahead Logging (WAL) module for crash recovery.
2//!
3//! The WAL ensures durability by logging all write operations before applying them
4//! to the database. In case of a crash, the WAL can be replayed to restore the
5//! database to a consistent state.
6//!
7//! # Architecture
8//!
9//! - **WalRecord**: Enum representing different types of log entries
10//! - **WriteAheadLog**: Core WAL implementation with append/sync/replay
11//! - **CheckpointManager**: Manages periodic checkpoints and WAL trimming
12//!
13//! # Example
14//!
15//! ```rust
16//! use dbx_core::wal::{WriteAheadLog, WalRecord};
17//! use std::path::Path;
18//!
19//! # fn main() -> dbx_core::DbxResult<()> {
20//! let wal = WriteAheadLog::open(Path::new("./wal.log"))?;
21//!
22//! // Log an insert operation
23//! let record = WalRecord::Insert {
24//!     table: "users".to_string(),
25//!     key: b"user:1".to_vec(),
26//!     value: b"Alice".to_vec(),
27//!     ts: 0,
28//! };
29//! let seq = wal.append(&record)?;
30//! wal.sync()?;  // Ensure durability
31//! # Ok(())
32//! # }
33//! ```
34
35use crate::error::{DbxError, DbxResult};
36use serde::{Deserialize, Serialize};
37use std::fs::{File, OpenOptions};
38use std::io::{BufReader, Write};
39use std::path::Path;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicU64, Ordering};
42
43pub mod buffer;
44pub mod checkpoint;
45pub mod encrypted_wal;
46pub mod partitioned_wal;
47
/// WAL record types.
///
/// Each record represents a single operation that can be replayed during recovery.
///
/// Records are written to the log via `bincode::serialize` (see
/// `WriteAheadLog::append`).
// NOTE(review): bincode presumably encodes variants and fields positionally, so
// reordering or inserting variants/fields would likely make existing logs
// unreadable — confirm before changing the layout.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum WalRecord {
    /// Insert operation: table, key, value, timestamp
    Insert {
        table: String,
        key: Vec<u8>,
        value: Vec<u8>,
        ts: u64,
    },

    /// Delete operation: table, key, timestamp
    Delete {
        table: String,
        key: Vec<u8>,
        ts: u64,
    },

    /// Checkpoint marker: sequence number
    ///
    /// When a log is opened, the largest `sequence` found in any checkpoint
    /// record is used as the starting value of the WAL's sequence counter.
    Checkpoint { sequence: u64 },

    /// Transaction commit: transaction ID
    Commit { tx_id: u64 },

    /// Transaction rollback: transaction ID
    Rollback { tx_id: u64 },

    /// Batch operation: table, list of (key, value) pairs, timestamp
    Batch {
        table: String,
        rows: Vec<(Vec<u8>, Vec<u8>)>,
        ts: u64,
    },
}
85
/// Write-Ahead Log for crash recovery.
///
/// All write operations are logged to disk before being applied to the database.
/// This ensures that the database can be recovered to a consistent state after a crash.
///
/// On disk the log is a sequence of frames: a 4-byte little-endian length
/// followed by that many bytes of a serialized [`WalRecord`].
///
/// # Thread Safety
///
/// `WriteAheadLog` is thread-safe and can be shared across multiple threads using `Arc`.
/// The file handle is guarded by a `Mutex` and the sequence counter is atomic.
pub struct WriteAheadLog {
    /// Log file handle (protected by mutex for concurrent writes)
    log_file: Mutex<File>,

    /// Path to the WAL file; `replay` opens a separate read handle on it
    path: std::path::PathBuf,

    /// Monotonically increasing sequence number.
    ///
    /// Holds the value the next `append` will return. It is seeded in `open`
    /// from the largest `WalRecord::Checkpoint` sequence found in the file
    /// (or `0` if there is none).
    sequence: AtomicU64,
}
104
105impl WriteAheadLog {
106    /// Opens or creates a WAL file at the specified path.
107    ///
108    /// If the file exists, it will be opened in append mode.
109    /// The sequence number is initialized to the highest sequence in the existing log.
110    ///
111    /// # Arguments
112    ///
113    /// * `path` - Path to the WAL file
114    ///
115    /// # Example
116    ///
117    /// ```rust
118    /// # use dbx_core::wal::WriteAheadLog;
119    /// # use std::path::Path;
120    /// # fn main() -> dbx_core::DbxResult<()> {
121    /// let wal = WriteAheadLog::open(Path::new("./wal.log"))?;
122    /// # Ok(())
123    /// # }
124    /// ```
125    pub fn open(path: &Path) -> DbxResult<Self> {
126        let file = OpenOptions::new()
127            .create(true)
128            .read(true)
129            .append(true)
130            .open(path)?;
131
132        // Scan existing log to find the highest sequence number
133        let max_seq = Self::scan_max_sequence(path)?;
134
135        Ok(Self {
136            log_file: Mutex::new(file),
137            path: path.to_path_buf(),
138            sequence: AtomicU64::new(max_seq),
139        })
140    }
141
142    /// Scans the WAL file to find the maximum sequence number.
143    fn scan_max_sequence(path: &Path) -> DbxResult<u64> {
144        let file = match File::open(path) {
145            Ok(f) => f,
146            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
147            Err(e) => return Err(e.into()),
148        };
149
150        let mut reader = BufReader::new(file);
151        let mut max_seq = 0u64;
152
153        while let Ok(len_buf) = {
154            let mut buf = [0u8; 4];
155            std::io::Read::read_exact(&mut reader, &mut buf).map(|_| buf)
156        } {
157            let len = u32::from_le_bytes(len_buf) as usize;
158            let mut data = vec![0u8; len];
159            if std::io::Read::read_exact(&mut reader, &mut data).is_err() {
160                break;
161            }
162            if let Ok(WalRecord::Checkpoint { sequence }) = bincode::deserialize::<WalRecord>(&data)
163            {
164                max_seq = max_seq.max(sequence);
165            }
166        }
167
168        Ok(max_seq)
169    }
170
171    /// Appends a record to the WAL.
172    ///
173    /// Returns the sequence number assigned to this record.
174    /// The record is buffered in memory until `sync()` is called.
175    ///
176    /// # Arguments
177    ///
178    /// * `record` - The WAL record to append
179    ///
180    /// # Returns
181    ///
182    /// The sequence number assigned to this record
183    ///
184    /// # Example
185    ///
186    /// ```rust
187    /// # use dbx_core::wal::{WriteAheadLog, WalRecord};
188    /// # use std::path::Path;
189    /// # fn main() -> dbx_core::DbxResult<()> {
190    /// let wal = WriteAheadLog::open(Path::new("./wal.log"))?;
191    /// let record = WalRecord::Insert {
192    ///     table: "users".to_string(),
193    ///     key: b"key1".to_vec(),
194    ///     value: b"value1".to_vec(),
195    ///     ts: 0,
196    /// };
197    /// let seq = wal.append(&record)?;
198    /// # Ok(())
199    /// # }
200    /// ```
201    pub fn append(&self, record: &WalRecord) -> DbxResult<u64> {
202        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
203
204        // Serialize record to Binary format
205        let encoded = bincode::serialize(record)
206            .map_err(|e| DbxError::Wal(format!("serialization failed: {}", e)))?;
207
208        // Write to log file (Length-prefixed binary)
209        let mut file = self
210            .log_file
211            .lock()
212            .map_err(|e| DbxError::Wal(format!("lock failed: {}", e)))?;
213
214        let len = (encoded.len() as u32).to_le_bytes();
215        file.write_all(&len)?;
216        file.write_all(&encoded)?;
217
218        Ok(seq)
219    }
220
221    /// Synchronizes the WAL to disk (fsync).
222    ///
223    /// This ensures that all buffered writes are persisted to disk.
224    /// Call this after critical operations to guarantee durability.
225    ///
226    /// # Example
227    ///
228    /// ```rust
229    /// # use dbx_core::wal::{WriteAheadLog, WalRecord};
230    /// # use std::path::Path;
231    /// # fn main() -> dbx_core::DbxResult<()> {
232    /// let wal = WriteAheadLog::open(Path::new("./wal.log"))?;
233    /// let record = WalRecord::Insert {
234    ///     table: "users".to_string(),
235    ///     key: b"key1".to_vec(),
236    ///     value: b"value1".to_vec(),
237    ///     ts: 0,
238    /// };
239    /// wal.append(&record)?;
240    /// wal.sync()?;  // Ensure durability
241    /// # Ok(())
242    /// # }
243    /// ```
244    pub fn sync(&self) -> DbxResult<()> {
245        let file = self
246            .log_file
247            .lock()
248            .map_err(|e| DbxError::Wal(format!("lock failed: {}", e)))?;
249
250        file.sync_all()?;
251        Ok(())
252    }
253
254    /// Replays all records from the WAL.
255    ///
256    /// Reads the entire WAL file and returns all records in order.
257    /// Used during database recovery to restore the state after a crash.
258    ///
259    /// # Returns
260    ///
261    /// A vector of all WAL records in the order they were written
262    ///
263    /// # Example
264    ///
265    /// ```rust
266    /// # use dbx_core::wal::WriteAheadLog;
267    /// # fn main() -> dbx_core::DbxResult<()> {
268    /// let tmp = tempfile::NamedTempFile::new().unwrap();
269    /// let wal = WriteAheadLog::open(tmp.path())?;
270    /// let records = wal.replay()?;
271    /// for record in records {
272    ///     // Apply record to database
273    /// }
274    /// # Ok(())
275    /// # }
276    /// ```
277    pub fn replay(&self) -> DbxResult<Vec<WalRecord>> {
278        // Open a new file handle for reading from the beginning
279        let file = File::open(&self.path)?;
280        let mut reader = BufReader::new(file);
281        let mut records = Vec::new();
282
283        while let Ok(len_buf) = {
284            let mut buf = [0u8; 4];
285            std::io::Read::read_exact(&mut reader, &mut buf).map(|_| buf)
286        } {
287            let len = u32::from_le_bytes(len_buf) as usize;
288            let mut data = vec![0u8; len];
289            if std::io::Read::read_exact(&mut reader, &mut data).is_err() {
290                break;
291            }
292
293            let record = bincode::deserialize::<WalRecord>(&data)
294                .map_err(|e| DbxError::Wal(format!("deserialization failed: {}", e)))?;
295
296            records.push(record);
297        }
298
299        Ok(records)
300    }
301
302    /// Returns the current sequence number.
303    pub fn current_sequence(&self) -> u64 {
304        self.sequence.load(Ordering::SeqCst)
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Creates a fresh temporary file and opens a WAL on it.
    ///
    /// The `NamedTempFile` is returned too so the backing file stays alive for
    /// the duration of the test.
    fn fresh_wal() -> (NamedTempFile, WriteAheadLog) {
        let backing = NamedTempFile::new().unwrap();
        let wal = WriteAheadLog::open(backing.path()).unwrap();
        (backing, wal)
    }

    /// Appended records come back from `replay` unchanged and in order, and
    /// sequence numbers start at 0.
    #[test]
    fn wal_append_and_replay() {
        let (_backing, wal) = fresh_wal();

        let insert = WalRecord::Insert {
            table: "users".to_string(),
            key: b"user:1".to_vec(),
            value: b"Alice".to_vec(),
            ts: 1,
        };
        let delete = WalRecord::Delete {
            table: "users".to_string(),
            key: b"user:2".to_vec(),
            ts: 2,
        };

        let first_seq = wal.append(&insert).unwrap();
        let second_seq = wal.append(&delete).unwrap();
        wal.sync().unwrap();

        assert_eq!(first_seq, 0);
        assert_eq!(second_seq, 1);

        let replayed = wal.replay().unwrap();
        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0], insert);
        assert_eq!(replayed[1], delete);
    }

    /// A synced record is visible through a second WAL opened on the same file.
    #[test]
    fn wal_sync_durability() {
        let (backing, wal) = fresh_wal();

        let written = WalRecord::Insert {
            table: "test".to_string(),
            key: b"key".to_vec(),
            value: b"value".to_vec(),
            ts: 5,
        };
        wal.append(&written).unwrap();
        wal.sync().unwrap();

        // Re-open the same path and read everything back.
        let reopened = WriteAheadLog::open(backing.path()).unwrap();
        let replayed = reopened.replay().unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0], written);
    }

    /// `current_sequence` advances by one for every append.
    #[test]
    fn wal_sequence_increments() {
        let (_backing, wal) = fresh_wal();
        assert_eq!(wal.current_sequence(), 0);

        let commit = WalRecord::Commit { tx_id: 1 };

        wal.append(&commit).unwrap();
        assert_eq!(wal.current_sequence(), 1);

        wal.append(&commit).unwrap();
        assert_eq!(wal.current_sequence(), 2);
    }

    /// Replaying a freshly created log yields no records.
    #[test]
    fn wal_empty_replay() {
        let (_backing, wal) = fresh_wal();

        let replayed = wal.replay().unwrap();
        assert_eq!(replayed.len(), 0);
    }

    /// A checkpoint record round-trips through append and replay.
    #[test]
    fn wal_checkpoint_record() {
        let (_backing, wal) = fresh_wal();

        let marker = WalRecord::Checkpoint { sequence: 42 };
        wal.append(&marker).unwrap();
        wal.sync().unwrap();

        let replayed = wal.replay().unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0], marker);
    }
}