dbx_core/wal/mod.rs
1//! Write-Ahead Logging (WAL) module for crash recovery.
2//!
3//! The WAL ensures durability by logging all write operations before applying them
4//! to the database. In case of a crash, the WAL can be replayed to restore the
5//! database to a consistent state.
6//!
7//! # Architecture
8//!
9//! - **WalRecord**: Enum representing different types of log entries
10//! - **WriteAheadLog**: Core WAL implementation with append/sync/replay
11//! - **CheckpointManager**: Manages periodic checkpoints and WAL trimming
12//!
13//! # Example
14//!
15//! ```rust
16//! use dbx_core::wal::{WriteAheadLog, WalRecord};
17//! use std::path::Path;
18//!
19//! # fn main() -> dbx_core::DbxResult<()> {
20//! let wal = WriteAheadLog::open(Path::new("./wal.log"))?;
21//!
22//! // Log an insert operation
23//! let record = WalRecord::Insert {
24//! table: "users".to_string(),
25//! key: b"user:1".to_vec(),
26//! value: b"Alice".to_vec(),
27//! ts: 0,
28//! };
29//! let seq = wal.append(&record)?;
30//! wal.sync()?; // Ensure durability
31//! # Ok(())
32//! # }
33//! ```
34
35use crate::error::{DbxError, DbxResult};
36use serde::{Deserialize, Serialize};
37use std::fs::{File, OpenOptions};
38use std::io::{BufReader, Write};
39use std::path::Path;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicU64, Ordering};
42
43pub mod buffer;
44pub mod checkpoint;
45pub mod encrypted_wal;
46pub mod partitioned_wal;
47
/// WAL record types.
///
/// Each record represents a single operation that can be replayed during recovery.
///
/// Records are encoded with bincode by `WriteAheadLog::append` and decoded by
/// `WriteAheadLog::replay`. NOTE(review): reordering variants or fields presumably
/// changes the on-disk encoding of existing logs — confirm before editing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum WalRecord {
    /// Insert operation: table, key, value, timestamp
    Insert {
        table: String,
        key: Vec<u8>,
        value: Vec<u8>,
        ts: u64,
    },

    /// Delete operation: table, key, timestamp
    Delete {
        table: String,
        key: Vec<u8>,
        ts: u64,
    },

    /// Checkpoint marker: sequence number
    ///
    /// `WriteAheadLog::scan_max_sequence` uses the largest `sequence` found in an
    /// existing log to seed the in-memory counter when the log is reopened.
    Checkpoint { sequence: u64 },

    /// Transaction commit: transaction ID
    Commit { tx_id: u64 },

    /// Transaction rollback: transaction ID
    Rollback { tx_id: u64 },

    /// Batch operation: table, list of (key, value) pairs, timestamp
    Batch {
        table: String,
        rows: Vec<(Vec<u8>, Vec<u8>)>,
        ts: u64,
    },
}
85
/// Write-Ahead Log for crash recovery.
///
/// All write operations are logged to disk before being applied to the database.
/// This ensures that the database can be recovered to a consistent state after a crash.
///
/// # Thread Safety
///
/// `WriteAheadLog` is thread-safe and can be shared across multiple threads using `Arc`:
/// the file handle sits behind a `Mutex` and the sequence counter is an atomic.
pub struct WriteAheadLog {
    /// Log file handle (protected by mutex for concurrent writes)
    log_file: Mutex<File>,

    /// Path to the WAL file; `replay` opens a separate read handle from this path
    /// so that it can read from the start of the file.
    path: std::path::PathBuf,

    /// Sequence counter: holds the number that the next `append` will return.
    /// Seeded in `open` from the existing log and incremented once per append.
    sequence: AtomicU64,
}
104
105impl WriteAheadLog {
106 /// Opens or creates a WAL file at the specified path.
107 ///
108 /// If the file exists, it will be opened in append mode.
109 /// The sequence number is initialized to the highest sequence in the existing log.
110 ///
111 /// # Arguments
112 ///
113 /// * `path` - Path to the WAL file
114 ///
115 /// # Example
116 ///
117 /// ```rust
118 /// # use dbx_core::wal::WriteAheadLog;
119 /// # use std::path::Path;
120 /// # fn main() -> dbx_core::DbxResult<()> {
121 /// let wal = WriteAheadLog::open(Path::new("./wal.log"))?;
122 /// # Ok(())
123 /// # }
124 /// ```
125 pub fn open(path: &Path) -> DbxResult<Self> {
126 let file = OpenOptions::new()
127 .create(true)
128 .read(true)
129 .append(true)
130 .open(path)?;
131
132 // Scan existing log to find the highest sequence number
133 let max_seq = Self::scan_max_sequence(path)?;
134
135 Ok(Self {
136 log_file: Mutex::new(file),
137 path: path.to_path_buf(),
138 sequence: AtomicU64::new(max_seq),
139 })
140 }
141
142 /// Scans the WAL file to find the maximum sequence number.
143 fn scan_max_sequence(path: &Path) -> DbxResult<u64> {
144 let file = match File::open(path) {
145 Ok(f) => f,
146 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
147 Err(e) => return Err(e.into()),
148 };
149
150 let mut reader = BufReader::new(file);
151 let mut max_seq = 0u64;
152
153 while let Ok(len_buf) = {
154 let mut buf = [0u8; 4];
155 std::io::Read::read_exact(&mut reader, &mut buf).map(|_| buf)
156 } {
157 let len = u32::from_le_bytes(len_buf) as usize;
158 let mut data = vec![0u8; len];
159 if std::io::Read::read_exact(&mut reader, &mut data).is_err() {
160 break;
161 }
162 if let Ok(WalRecord::Checkpoint { sequence }) = bincode::deserialize::<WalRecord>(&data)
163 {
164 max_seq = max_seq.max(sequence);
165 }
166 }
167
168 Ok(max_seq)
169 }
170
171 /// Appends a record to the WAL.
172 ///
173 /// Returns the sequence number assigned to this record.
174 /// The record is buffered in memory until `sync()` is called.
175 ///
176 /// # Arguments
177 ///
178 /// * `record` - The WAL record to append
179 ///
180 /// # Returns
181 ///
182 /// The sequence number assigned to this record
183 ///
184 /// # Example
185 ///
186 /// ```rust
187 /// # use dbx_core::wal::{WriteAheadLog, WalRecord};
188 /// # use std::path::Path;
189 /// # fn main() -> dbx_core::DbxResult<()> {
190 /// let wal = WriteAheadLog::open(Path::new("./wal.log"))?;
191 /// let record = WalRecord::Insert {
192 /// table: "users".to_string(),
193 /// key: b"key1".to_vec(),
194 /// value: b"value1".to_vec(),
195 /// ts: 0,
196 /// };
197 /// let seq = wal.append(&record)?;
198 /// # Ok(())
199 /// # }
200 /// ```
201 pub fn append(&self, record: &WalRecord) -> DbxResult<u64> {
202 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
203
204 // Serialize record to Binary format
205 let encoded = bincode::serialize(record)
206 .map_err(|e| DbxError::Wal(format!("serialization failed: {}", e)))?;
207
208 // Write to log file (Length-prefixed binary)
209 let mut file = self
210 .log_file
211 .lock()
212 .map_err(|e| DbxError::Wal(format!("lock failed: {}", e)))?;
213
214 let len = (encoded.len() as u32).to_le_bytes();
215 file.write_all(&len)?;
216 file.write_all(&encoded)?;
217
218 Ok(seq)
219 }
220
221 /// Synchronizes the WAL to disk (fsync).
222 ///
223 /// This ensures that all buffered writes are persisted to disk.
224 /// Call this after critical operations to guarantee durability.
225 ///
226 /// # Example
227 ///
228 /// ```rust
229 /// # use dbx_core::wal::{WriteAheadLog, WalRecord};
230 /// # use std::path::Path;
231 /// # fn main() -> dbx_core::DbxResult<()> {
232 /// let wal = WriteAheadLog::open(Path::new("./wal.log"))?;
233 /// let record = WalRecord::Insert {
234 /// table: "users".to_string(),
235 /// key: b"key1".to_vec(),
236 /// value: b"value1".to_vec(),
237 /// ts: 0,
238 /// };
239 /// wal.append(&record)?;
240 /// wal.sync()?; // Ensure durability
241 /// # Ok(())
242 /// # }
243 /// ```
244 pub fn sync(&self) -> DbxResult<()> {
245 let file = self
246 .log_file
247 .lock()
248 .map_err(|e| DbxError::Wal(format!("lock failed: {}", e)))?;
249
250 file.sync_all()?;
251 Ok(())
252 }
253
254 /// Replays all records from the WAL.
255 ///
256 /// Reads the entire WAL file and returns all records in order.
257 /// Used during database recovery to restore the state after a crash.
258 ///
259 /// # Returns
260 ///
261 /// A vector of all WAL records in the order they were written
262 ///
263 /// # Example
264 ///
265 /// ```rust
266 /// # use dbx_core::wal::WriteAheadLog;
267 /// # fn main() -> dbx_core::DbxResult<()> {
268 /// let tmp = tempfile::NamedTempFile::new().unwrap();
269 /// let wal = WriteAheadLog::open(tmp.path())?;
270 /// let records = wal.replay()?;
271 /// for record in records {
272 /// // Apply record to database
273 /// }
274 /// # Ok(())
275 /// # }
276 /// ```
277 pub fn replay(&self) -> DbxResult<Vec<WalRecord>> {
278 // Open a new file handle for reading from the beginning
279 let file = File::open(&self.path)?;
280 let mut reader = BufReader::new(file);
281 let mut records = Vec::new();
282
283 while let Ok(len_buf) = {
284 let mut buf = [0u8; 4];
285 std::io::Read::read_exact(&mut reader, &mut buf).map(|_| buf)
286 } {
287 let len = u32::from_le_bytes(len_buf) as usize;
288 let mut data = vec![0u8; len];
289 if std::io::Read::read_exact(&mut reader, &mut data).is_err() {
290 break;
291 }
292
293 let record = bincode::deserialize::<WalRecord>(&data)
294 .map_err(|e| DbxError::Wal(format!("deserialization failed: {}", e)))?;
295
296 records.push(record);
297 }
298
299 Ok(records)
300 }
301
302 /// Returns the current sequence number.
303 pub fn current_sequence(&self) -> u64 {
304 self.sequence.load(Ordering::SeqCst)
305 }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Opens a WAL on a brand-new temporary file.
    ///
    /// The temp-file guard is returned with the WAL so the backing file stays
    /// alive for as long as the caller holds it.
    fn fresh_wal() -> (NamedTempFile, WriteAheadLog) {
        let backing = NamedTempFile::new().unwrap();
        let wal = WriteAheadLog::open(backing.path()).unwrap();
        (backing, wal)
    }

    /// Two appended records come back from `replay` unchanged and in order,
    /// and their sequence numbers start at zero.
    #[test]
    fn wal_append_and_replay() {
        let (_guard, wal) = fresh_wal();

        let insert = WalRecord::Insert {
            table: "users".to_string(),
            key: b"user:1".to_vec(),
            value: b"Alice".to_vec(),
            ts: 1,
        };
        let delete = WalRecord::Delete {
            table: "users".to_string(),
            key: b"user:2".to_vec(),
            ts: 2,
        };

        let first_seq = wal.append(&insert).unwrap();
        let second_seq = wal.append(&delete).unwrap();
        wal.sync().unwrap();

        assert_eq!((first_seq, second_seq), (0, 1));

        let replayed = wal.replay().unwrap();
        assert_eq!(replayed, vec![insert, delete]);
    }

    /// A synced record is visible through a second handle on the same file.
    #[test]
    fn wal_sync_durability() {
        let (guard, writer) = fresh_wal();

        let written = WalRecord::Insert {
            table: "test".to_string(),
            key: b"key".to_vec(),
            value: b"value".to_vec(),
            ts: 5,
        };

        writer.append(&written).unwrap();
        writer.sync().unwrap();

        // Re-open the same path and read everything back.
        let reopened = WriteAheadLog::open(guard.path()).unwrap();
        let replayed = reopened.replay().unwrap();
        assert_eq!(replayed, vec![written]);
    }

    /// `current_sequence` starts at zero and advances by one per append.
    #[test]
    fn wal_sequence_increments() {
        let (_guard, wal) = fresh_wal();
        let commit = WalRecord::Commit { tx_id: 1 };

        assert_eq!(wal.current_sequence(), 0);

        for expected in [1u64, 2] {
            wal.append(&commit).unwrap();
            assert_eq!(wal.current_sequence(), expected);
        }
    }

    /// Replaying an empty log yields no records.
    #[test]
    fn wal_empty_replay() {
        let (_guard, wal) = fresh_wal();

        assert!(wal.replay().unwrap().is_empty());
    }

    /// A checkpoint marker round-trips through append and replay.
    #[test]
    fn wal_checkpoint_record() {
        let (_guard, wal) = fresh_wal();

        let marker = WalRecord::Checkpoint { sequence: 42 };
        wal.append(&marker).unwrap();
        wal.sync().unwrap();

        let replayed = wal.replay().unwrap();
        assert_eq!(replayed, vec![marker]);
    }
}