Skip to main content

nodedb_wal/
double_write.rs

1//! Double-write buffer for torn write protection.
2//!
3//! NVMe drives guarantee atomic 4 KiB sector writes but NOT atomic writes
4//! for larger pages (e.g., 16 KiB). If power fails mid-write on a 16 KiB
5//! page, the WAL page can be partially written (torn).
6//!
7//! CRC32C detects torn writes during replay, but without the double-write
8//! buffer, the record is lost — even though it was acknowledged to the client.
9//!
10//! The double-write buffer solves this:
11//! 1. Before writing to WAL, write the record to the double-write file.
12//! 2. `fsync` the double-write file.
13//! 3. Write to the WAL file.
14//! 4. `fsync` the WAL file.
15//!
16//! On recovery, if a WAL record's CRC fails:
17//! - Check the double-write buffer for an intact copy (verify CRC).
18//! - If found, use the double-write copy to reconstruct the WAL page.
19//! - If not found, the record is truly lost (pre-fsync crash).
20//!
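//! The double-write file is a fixed-size circular buffer. Only the most
//! recent `DWB_CAPACITY` records are kept — older ones are overwritten. This
//! is fine because a torn write can only affect the most recent WAL write,
//! which has not been fsynced yet; records from earlier, already-fsynced WAL
//! writes are intact and need no second copy. This holds as long as a single
//! WAL write (e.g. one group-commit batch) covers no more than `DWB_CAPACITY`
//! records.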
24
25use std::fs::{File, OpenOptions};
26use std::io::{Read, Seek, SeekFrom, Write};
27use std::path::{Path, PathBuf};
28
29use crate::error::{Result, WalError};
30use crate::record::{HEADER_SIZE, RecordHeader, WAL_MAGIC, WalRecord};
31
/// Maximum number of records kept in the double-write buffer.
/// Only the most recent records matter — torn writes affect the tail.
///
/// This is a compile-time constant used in slot offset arithmetic. It cannot
/// be made runtime-configurable without storing capacity in the struct and
/// adjusting all offset calculations accordingly. The value matches the
/// `WalTuning::dwb_capacity` default (64).
///
/// Must be a power of two. The in-memory write position is a `u32` that
/// advances with `wrapping_add` and is reduced modulo this capacity to pick a
/// slot. Only a capacity that divides 2^32 keeps slot selection continuous
/// when the position wraps.
const DWB_CAPACITY: usize = 64;

// Enforce the power-of-two requirement documented above at compile time.
const _: () = assert!(
    DWB_CAPACITY.is_power_of_two(),
    "DWB_CAPACITY must be a power of two"
);

/// On-disk header: [magic: 4B][count: 4B][write_pos: 4B] = 12 bytes,
/// each field a little-endian `u32`.
const DWB_HEADER_SIZE: usize = 12;

/// Magic number identifying a double-write buffer file.
///
/// Spells "DWBF" when written as hex (`0x44_57_42_46`). Because it is stored
/// little-endian, the first four bytes of the file are `b"FBWD"`.
const DWB_MAGIC: u32 = 0x4457_4246;
44
/// Double-write buffer file: a fixed ring of recently written WAL records,
/// kept so that a torn WAL write can be repaired during recovery.
///
/// The backing file holds a small header followed by `DWB_CAPACITY` slots.
pub struct DoubleWriteBuffer {
    /// Handle to the backing file.
    file: File,
    /// Location of the backing file, reported by `path()` and `Debug`.
    path: PathBuf,
    /// How many slots currently hold a record; never grows past
    /// `DWB_CAPACITY`.
    count: u32,
    /// Running slot counter. It wraps on overflow and is reduced modulo
    /// `DWB_CAPACITY` to choose where the next record goes.
    write_pos: u32,
    /// Set once a record has been written but not yet covered by an fsync.
    dirty: bool,
}
56
57impl std::fmt::Debug for DoubleWriteBuffer {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("DoubleWriteBuffer")
60            .field("path", &self.path)
61            .field("write_pos", &self.write_pos)
62            .field("count", &self.count)
63            .finish()
64    }
65}
66
67impl DoubleWriteBuffer {
68    /// Open or create the double-write buffer file.
69    pub fn open(path: &Path) -> Result<Self> {
70        let file = OpenOptions::new()
71            .read(true)
72            .write(true)
73            .create(true)
74            .truncate(false)
75            .open(path)
76            .map_err(|e| {
77                tracing::warn!(path = %path.display(), error = %e, "failed to open double-write buffer");
78                WalError::Io(e)
79            })?;
80
81        let mut dwb = Self {
82            file,
83            path: path.to_path_buf(),
84            write_pos: 0,
85            count: 0,
86            dirty: false,
87        };
88
89        // Try to read existing header.
90        let file_len = dwb.file.metadata().map(|m| m.len()).unwrap_or(0);
91        if file_len >= DWB_HEADER_SIZE as u64 {
92            let mut header = [0u8; DWB_HEADER_SIZE];
93            dwb.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
94            if dwb.file.read_exact(&mut header).is_ok() {
95                let mut arr4 = [0u8; 4];
96                arr4.copy_from_slice(&header[0..4]);
97                let magic = u32::from_le_bytes(arr4);
98                if magic == DWB_MAGIC {
99                    arr4.copy_from_slice(&header[4..8]);
100                    dwb.count = u32::from_le_bytes(arr4);
101                    arr4.copy_from_slice(&header[8..12]);
102                    dwb.write_pos = u32::from_le_bytes(arr4);
103                }
104            }
105        }
106
107        Ok(dwb)
108    }
109
110    /// Write a WAL record to the double-write buffer before WAL append.
111    ///
112    /// The record is written at the current circular position and the file
113    /// is fsynced immediately. Use `write_record_deferred` + `flush` for
114    /// batch mode (multiple records per fsync).
115    pub fn write_record(&mut self, record: &WalRecord) -> Result<()> {
116        self.write_record_deferred(record)?;
117        self.flush()
118    }
119
120    /// Write a WAL record to the DWB without fsyncing.
121    ///
122    /// The data is written to the OS page cache but not guaranteed durable
123    /// until `flush()` is called. Use this in batch mode: write all records
124    /// in a group commit batch, then call `flush()` once — reducing fsync
125    /// calls from N-per-batch to 1-per-batch.
126    pub fn write_record_deferred(&mut self, record: &WalRecord) -> Result<()> {
127        let record_bytes = record.header.to_bytes();
128        let total_size = HEADER_SIZE + record.payload.len();
129
130        // Max 64 KiB per slot — larger records skip the double-write buffer
131        // (they're multi-page and need different protection).
132        if total_size > 64 * 1024 {
133            return Ok(()); // Skip oversized records.
134        }
135
136        // Write record at current slot position.
137        // Each slot stores: [total_size: 4B][header: 30B][payload: N bytes]
138        let slot_offset = DWB_HEADER_SIZE as u64
139            + (self.write_pos as u64 % DWB_CAPACITY as u64) * (4 + HEADER_SIZE as u64 + 64 * 1024);
140
141        self.file
142            .seek(SeekFrom::Start(slot_offset))
143            .map_err(WalError::Io)?;
144        self.file
145            .write_all(&(total_size as u32).to_le_bytes())
146            .map_err(WalError::Io)?;
147        self.file.write_all(&record_bytes).map_err(WalError::Io)?;
148        self.file.write_all(&record.payload).map_err(WalError::Io)?;
149
150        // Update position.
151        self.write_pos = self.write_pos.wrapping_add(1);
152        self.count = self.count.saturating_add(1).min(DWB_CAPACITY as u32);
153        self.dirty = true;
154
155        Ok(())
156    }
157
158    /// Flush the DWB header and fsync the file.
159    ///
160    /// Must be called after one or more `write_record_deferred` calls to make
161    /// the records durable. The single fsync covers all deferred writes since
162    /// the last flush — amortizing the cost across the group commit batch.
163    pub fn flush(&mut self) -> Result<()> {
164        if !self.dirty {
165            return Ok(());
166        }
167
168        // Write header atomically as a single write_all to prevent a crash
169        // between partial header writes from corrupting the DWB metadata.
170        let mut header = [0u8; DWB_HEADER_SIZE];
171        header[0..4].copy_from_slice(&DWB_MAGIC.to_le_bytes());
172        header[4..8].copy_from_slice(&self.count.to_le_bytes());
173        header[8..12].copy_from_slice(&self.write_pos.to_le_bytes());
174
175        self.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
176        self.file.write_all(&header).map_err(WalError::Io)?;
177
178        self.file.sync_all().map_err(WalError::Io)?;
179        self.dirty = false;
180
181        Ok(())
182    }
183
184    /// Path to the double-write buffer file.
185    pub fn path(&self) -> &Path {
186        &self.path
187    }
188
189    /// Try to recover a WAL record by LSN from the double-write buffer.
190    ///
191    /// Scans **all** DWB_CAPACITY slots for a record matching the given LSN
192    /// with valid CRC. We scan every slot rather than relying on `count` or
193    /// `write_pos` because the header itself may be stale or corrupted after
194    /// a crash. Each slot is self-describing: the record's own CRC validates
195    /// whether the slot contains usable data.
196    pub fn recover_record(&mut self, target_lsn: u64) -> Result<Option<WalRecord>> {
197        let slot_size = 4 + HEADER_SIZE as u64 + 64 * 1024;
198
199        for i in 0..DWB_CAPACITY {
200            let slot_offset = DWB_HEADER_SIZE as u64 + (i as u64) * slot_size;
201
202            self.file
203                .seek(SeekFrom::Start(slot_offset))
204                .map_err(WalError::Io)?;
205
206            let mut size_buf = [0u8; 4];
207            if self.file.read_exact(&mut size_buf).is_err() {
208                continue;
209            }
210            let total_size = u32::from_le_bytes(size_buf) as usize;
211            if !(HEADER_SIZE..=64 * 1024).contains(&total_size) {
212                continue;
213            }
214
215            let mut header_buf = [0u8; HEADER_SIZE];
216            if self.file.read_exact(&mut header_buf).is_err() {
217                continue;
218            }
219            let header = RecordHeader::from_bytes(&header_buf);
220
221            if header.magic != WAL_MAGIC || header.lsn != target_lsn {
222                continue;
223            }
224
225            let payload_len = total_size - HEADER_SIZE;
226            let mut payload = vec![0u8; payload_len];
227            if self.file.read_exact(&mut payload).is_err() {
228                continue;
229            }
230
231            let record = WalRecord { header, payload };
232            if record.verify_checksum().is_ok() {
233                return Ok(Some(record));
234            }
235        }
236
237        Ok(None)
238    }
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Builds a `Put` record for `lsn` carrying `payload`. The remaining
    /// `WalRecord::new` arguments (`1`, `0`, `None`) are fixed for every test.
    fn put(lsn: u64, payload: &[u8]) -> WalRecord {
        WalRecord::new(RecordType::Put as u16, lsn, 1, 0, payload.to_vec(), None).unwrap()
    }

    /// Opens (creating if needed) the buffer file `name` inside `dir`.
    fn open_in(dir: &tempfile::TempDir, name: &str) -> DoubleWriteBuffer {
        DoubleWriteBuffer::open(&dir.path().join(name)).unwrap()
    }

    #[test]
    fn write_and_recover() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_in(&dir, "test.dwb");

        dwb.write_record(&put(42, b"hello double-write")).unwrap();

        let rec = dwb
            .recover_record(42)
            .unwrap()
            .expect("LSN 42 should be recoverable");
        assert_eq!(rec.header.lsn, 42);
        assert_eq!(rec.payload, b"hello double-write");
    }

    #[test]
    fn recover_nonexistent_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_in(&dir, "test2.dwb");

        assert!(dwb.recover_record(999).unwrap().is_none());
    }

    #[test]
    fn survives_reopen() {
        let dir = tempfile::tempdir().unwrap();

        {
            let mut writer = open_in(&dir, "reopen.dwb");
            writer.write_record(&put(7, b"durable")).unwrap();
        } // `writer` is dropped here, before the file is reopened.

        let mut reader = open_in(&dir, "reopen.dwb");
        let rec = reader
            .recover_record(7)
            .unwrap()
            .expect("LSN 7 should survive reopen");
        assert_eq!(rec.payload, b"durable");
    }

    #[test]
    fn batch_deferred_writes_and_flush() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_in(&dir, "batch.dwb");
        let payload_for = |lsn: u64| format!("batch-{lsn}").into_bytes();
        let lsns = 1..=5u64;

        // Stage every record without an fsync...
        for lsn in lsns.clone() {
            dwb.write_record_deferred(&put(lsn, &payload_for(lsn))).unwrap();
        }
        assert!(dwb.dirty);

        // ...then make the whole batch durable with one flush.
        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        for lsn in lsns {
            let rec = dwb.recover_record(lsn).unwrap();
            assert!(rec.is_some(), "LSN {lsn} should be recoverable");
            assert_eq!(rec.unwrap().payload, payload_for(lsn));
        }
    }

    #[test]
    fn flush_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_in(&dir, "idem.dwb");

        // Nothing written yet: flushing is a no-op.
        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        // After one real flush, a repeated flush is also a no-op.
        dwb.write_record_deferred(&put(1, b"data")).unwrap();
        for _ in 0..2 {
            dwb.flush().unwrap();
        }
        assert!(!dwb.dirty);
    }

    #[test]
    fn recover_after_wraparound() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_in(&dir, "wrap.dwb");
        let payload_for = |lsn: u64| format!("wrap-{lsn}").into_bytes();

        // LSNs 1..=DWB_CAPACITY fill every slot. The five extra LSNs wrap
        // around and overwrite the slots that held LSNs 1..=5.
        let total = super::DWB_CAPACITY as u64 + 5;
        for lsn in 1..=total {
            dwb.write_record_deferred(&put(lsn, &payload_for(lsn))).unwrap();
        }
        dwb.flush().unwrap();

        // The newest five records are all present...
        for lsn in (total - 4)..=total {
            let rec = dwb.recover_record(lsn).unwrap();
            assert!(
                rec.is_some(),
                "LSN {lsn} should be recoverable after wrap-around"
            );
            assert_eq!(rec.unwrap().payload, payload_for(lsn));
        }

        // ...while the five oldest were overwritten in place.
        for lsn in 1..=5u64 {
            assert!(
                dwb.recover_record(lsn).unwrap().is_none(),
                "LSN {lsn} should have been overwritten by wrap-around"
            );
        }
    }
}