// nodedb_wal/double_write.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Double-write buffer for torn write protection.
4//!
5//! NVMe drives guarantee atomic 4 KiB sector writes but NOT atomic writes
6//! for larger pages (e.g., 16 KiB). If power fails mid-write on a 16 KiB
7//! page, the WAL page can be partially written (torn).
8//!
9//! CRC32C detects torn writes during replay, but without the double-write
10//! buffer, the record is lost — even though it was acknowledged to the client.
11//!
12//! The double-write buffer solves this:
13//! 1. Before writing to WAL, write the record to the double-write file.
14//! 2. `fsync` the double-write file.
15//! 3. Write to the WAL file.
16//! 4. `fsync` the WAL file.
17//!
18//! On recovery, if a WAL record's CRC fails:
19//! - Check the double-write buffer for an intact copy (verify CRC).
20//! - If found, use the double-write copy to reconstruct the WAL page.
21//! - If not found, the record is truly lost (pre-fsync crash).
22//!
23//! The double-write file is a fixed-size circular buffer. Only the most
24//! recent N records are kept — older ones are overwritten. This is fine
25//! because torn writes can only happen on the most recent write.
26//!
27//! ## O_DIRECT mode
28//!
29//! When the parent WAL uses `O_DIRECT`, the DWB can also be opened with
30//! `O_DIRECT` (`DwbMode::Direct`). This:
31//! - Keeps the page cache free of DWB bytes — the O_DIRECT WAL was
32//!   specifically designed not to warm the cache, and a buffered DWB
33//!   undoes that by writing the exact same payload through the cache.
34//! - Surfaces DWB bytes in block-layer iostat traffic alongside the WAL.
35//!
36//! The on-disk layout is the same in both modes (one aligned header block
37//! followed by fixed-stride slots, all block-aligned) so a DWB written in
38//! one mode can be read in the other.
39
40use std::fs::{File, OpenOptions};
41use std::io::{Read, Seek, SeekFrom, Write};
42use std::os::unix::fs::OpenOptionsExt;
43use std::os::unix::io::AsRawFd;
44use std::path::{Path, PathBuf};
45use std::sync::atomic::{AtomicU64, Ordering};
46
47use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT, is_aligned};
48use crate::error::{Result, WalError};
49use crate::record::{HEADER_SIZE, RecordHeader, WAL_MAGIC, WalRecord};
50
/// Maximum number of records kept in the double-write buffer.
/// Only the most recent records matter — torn writes affect the tail.
///
/// This is a compile-time constant used in slot offset arithmetic. It cannot
/// be made runtime-configurable without storing capacity in the struct and
/// adjusting all offset calculations accordingly. The value matches the
/// `WalTuning::dwb_capacity` default (64).
const DWB_CAPACITY: usize = 64;

/// Maximum payload bytes per slot (excluding the length prefix and header).
/// Records larger than this bypass the DWB (see `write_record_deferred`).
const DWB_SLOT_PAYLOAD_MAX: usize = 64 * 1024;

/// Raw slot content size: [len:4B][header][payload-up-to-64KiB].
const DWB_SLOT_RAW: usize = 4 + HEADER_SIZE + DWB_SLOT_PAYLOAD_MAX;

/// Per-slot on-disk stride, padded up to the O_DIRECT block size so every
/// slot offset is block-aligned. With `DWB_SLOT_RAW = 65570` and the default
/// 4 KiB alignment this rounds to 69632 bytes per slot.
const DWB_SLOT_STRIDE: usize = round_up_const(DWB_SLOT_RAW, DEFAULT_ALIGNMENT);

/// On-disk header occupies one aligned block (not the raw 12 bytes) so the
/// first slot starts at a block-aligned offset. The first 12 bytes of the
/// block carry the header fields; the remainder is zero-padded.
const DWB_HEADER_STRIDE: usize = DEFAULT_ALIGNMENT;
/// Meaningful header bytes: magic (4) + count (4) + write_pos (4).
const DWB_HEADER_FIELDS: usize = 12;
/// Magic value in the first four header bytes, little-endian.
const DWB_MAGIC: u32 = 0x4457_4246; // "DWBF"
77
/// Process-wide counter of bytes written to any DWB file.
///
/// Surfaces the duplicate-write cost of running the DWB alongside an
/// O_DIRECT WAL.
static DWB_BYTES_WRITTEN_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Total number of bytes written to DWB files since process start.
pub fn wal_dwb_bytes_written_total() -> u64 {
    DWB_BYTES_WRITTEN_TOTAL.load(Ordering::Relaxed)
}
87
/// I/O mode for the double-write buffer file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DwbMode {
    /// DWB disabled — no torn-write protection. `DoubleWriteBuffer::open`
    /// rejects this mode with an error; callers wanting "off" simply never
    /// construct a buffer.
    Off,
    /// Buffered I/O (page cache + `fsync`). Default when the parent WAL
    /// does not use `O_DIRECT`.
    Buffered,
    /// `O_DIRECT` I/O via an aligned buffer. The intended companion to an
    /// `O_DIRECT` WAL: keeps DWB bytes out of the page cache.
    Direct,
}

impl DwbMode {
    /// Pick the DWB mode mirroring the parent writer's O_DIRECT setting when
    /// no explicit override is configured. An `O_DIRECT` WAL pairs with an
    /// `O_DIRECT` DWB; a buffered DWB would undo the cache bypass.
    pub fn default_for_parent(parent_uses_direct_io: bool) -> Self {
        if parent_uses_direct_io {
            Self::Direct
        } else {
            Self::Buffered
        }
    }
}
114
/// Round `value` up to the next multiple of `align`.
///
/// Uses the division form so the result is correct for any non-zero
/// `align` — the bit-mask form `(value + align - 1) & !(align - 1)` is
/// only valid when `align` is a power of two. For power-of-two alignments
/// (the only ones used in this module) the two forms are identical.
const fn round_up_const(value: usize, align: usize) -> usize {
    ((value + align - 1) / align) * align
}
118
119/// Slot stride in bytes. Exposed for tests and for callers that want to
120/// size DWB files ahead of time.
121pub const fn slot_stride() -> usize {
122    DWB_SLOT_STRIDE
123}
124
125/// Byte offset of slot `idx` within the DWB file.
126fn slot_offset(idx: u32) -> u64 {
127    DWB_HEADER_STRIDE as u64 + (idx as u64 % DWB_CAPACITY as u64) * DWB_SLOT_STRIDE as u64
128}
129
/// Double-write buffer file.
///
/// One instance wraps one on-disk circular slot file plus the in-memory
/// cursor state. All mutating methods take `&mut self`; the type provides
/// no internal synchronization.
pub struct DoubleWriteBuffer {
    /// Backing file handle (opened with `O_DIRECT` in `DwbMode::Direct`).
    file: File,
    /// Location of the DWB file, kept for diagnostics (`path()`, `Debug`).
    path: PathBuf,
    /// I/O mode the file was opened with; selects the write path.
    mode: DwbMode,
    /// Current write position (circular, wraps at DWB_CAPACITY).
    write_pos: u32,
    /// Number of valid records in the buffer (capped at DWB_CAPACITY).
    count: u32,
    /// Whether there are deferred writes that haven't been fsynced.
    dirty: bool,
    /// Single-slot aligned staging buffer (Direct mode only). One slot is
    /// serialized here, then pwrite'd at the slot offset.
    slot_buf: Option<AlignedBuf>,
    /// Aligned header block (Direct mode only). Written on `flush()`.
    header_buf: Option<AlignedBuf>,
}
147
148impl std::fmt::Debug for DoubleWriteBuffer {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct("DoubleWriteBuffer")
151            .field("path", &self.path)
152            .field("mode", &self.mode)
153            .field("write_pos", &self.write_pos)
154            .field("count", &self.count)
155            .finish()
156    }
157}
158
159impl DoubleWriteBuffer {
160    /// Open or create the double-write buffer file in the requested I/O mode.
161    ///
162    /// Returns `None`-wrapped errors for unsupported modes via
163    /// `Err(WalError::…)`; callers that want "off" should not call this at all.
164    pub fn open(path: &Path, mode: DwbMode) -> Result<Self> {
165        if mode == DwbMode::Off {
166            return Err(WalError::DwbOffNotOpenable);
167        }
168
169        let mut opts = OpenOptions::new();
170        opts.read(true).write(true).create(true).truncate(false);
171        if mode == DwbMode::Direct {
172            opts.custom_flags(libc::O_DIRECT);
173        }
174
175        let file = opts.open(path).map_err(|e| {
176            tracing::warn!(path = %path.display(), error = %e, mode = ?mode, "failed to open double-write buffer");
177            WalError::Io(e)
178        })?;
179
180        let (slot_buf, header_buf) = if mode == DwbMode::Direct {
181            (
182                Some(AlignedBuf::new(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT)?),
183                Some(AlignedBuf::new(DWB_HEADER_STRIDE, DEFAULT_ALIGNMENT)?),
184            )
185        } else {
186            (None, None)
187        };
188
189        let mut dwb = Self {
190            file,
191            path: path.to_path_buf(),
192            mode,
193            write_pos: 0,
194            count: 0,
195            dirty: false,
196            slot_buf,
197            header_buf,
198        };
199
200        // Try to read existing header (first DWB_HEADER_FIELDS bytes of block 0).
201        let file_len = dwb.file.metadata().map(|m| m.len()).unwrap_or(0);
202        if file_len >= DWB_HEADER_STRIDE as u64 {
203            let mut block = vec![0u8; DWB_HEADER_STRIDE];
204            dwb.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
205            if dwb.file.read_exact(&mut block).is_ok() {
206                let mut arr4 = [0u8; 4];
207                arr4.copy_from_slice(&block[0..4]);
208                let magic = u32::from_le_bytes(arr4);
209                if magic == DWB_MAGIC {
210                    arr4.copy_from_slice(&block[4..8]);
211                    dwb.count = u32::from_le_bytes(arr4);
212                    arr4.copy_from_slice(&block[8..12]);
213                    dwb.write_pos = u32::from_le_bytes(arr4);
214                }
215            }
216        }
217
218        Ok(dwb)
219    }
220
221    /// I/O mode this buffer was opened with.
222    pub fn mode(&self) -> DwbMode {
223        self.mode
224    }
225
226    /// Write a WAL record to the double-write buffer before WAL append.
227    ///
228    /// The record is written at the current circular position and the file
229    /// is fsynced immediately. Use `write_record_deferred` + `flush` for
230    /// batch mode (multiple records per fsync).
231    pub fn write_record(&mut self, record: &WalRecord) -> Result<()> {
232        self.write_record_deferred(record)?;
233        self.flush()
234    }
235
236    /// Write a WAL record to the DWB without fsyncing.
237    ///
238    /// The data is written to the OS page cache (Buffered mode) or directly
239    /// to the block device (Direct mode) but not guaranteed durable until
240    /// `flush()` is called. Use this in batch mode: write all records in a
241    /// group commit batch, then call `flush()` once — reducing fsync calls
242    /// from N-per-batch to 1-per-batch.
243    pub fn write_record_deferred(&mut self, record: &WalRecord) -> Result<()> {
244        let total_size = HEADER_SIZE + record.payload.len();
245
246        // Max 64 KiB per slot — larger records skip the double-write buffer
247        // (they're multi-page and need different protection).
248        if total_size > DWB_SLOT_PAYLOAD_MAX {
249            return Ok(()); // Skip oversized records.
250        }
251
252        let header_bytes = record.header.to_bytes();
253        let offset = slot_offset(self.write_pos);
254
255        match self.mode {
256            DwbMode::Off => unreachable!("Off never opens a DoubleWriteBuffer"),
257            DwbMode::Buffered => {
258                self.file
259                    .seek(SeekFrom::Start(offset))
260                    .map_err(WalError::Io)?;
261                self.file
262                    .write_all(&(total_size as u32).to_le_bytes())
263                    .map_err(WalError::Io)?;
264                self.file.write_all(&header_bytes).map_err(WalError::Io)?;
265                self.file.write_all(&record.payload).map_err(WalError::Io)?;
266                DWB_BYTES_WRITTEN_TOTAL.fetch_add(
267                    (4 + header_bytes.len() + record.payload.len()) as u64,
268                    Ordering::Relaxed,
269                );
270            }
271            DwbMode::Direct => {
272                let buf = self
273                    .slot_buf
274                    .as_mut()
275                    .expect("slot_buf present in Direct mode");
276                buf.clear();
277                buf.write(&(total_size as u32).to_le_bytes());
278                buf.write(&header_bytes);
279                buf.write(&record.payload);
280                // Zero the tail so the full aligned slot can be written
281                // without leaking prior contents.
282                zero_tail(buf);
283                let slice = full_capacity_slice(buf);
284                debug_assert_eq!(slice.len(), DWB_SLOT_STRIDE);
285                debug_assert!(is_aligned(offset as usize, DEFAULT_ALIGNMENT));
286                pwrite_all(&self.file, slice, offset)?;
287                DWB_BYTES_WRITTEN_TOTAL.fetch_add(slice.len() as u64, Ordering::Relaxed);
288            }
289        }
290
291        self.write_pos = self.write_pos.wrapping_add(1);
292        self.count = self.count.saturating_add(1).min(DWB_CAPACITY as u32);
293        self.dirty = true;
294
295        Ok(())
296    }
297
298    /// Flush the DWB header and fsync the file.
299    ///
300    /// Must be called after one or more `write_record_deferred` calls to make
301    /// the records durable. The single fsync covers all deferred writes since
302    /// the last flush — amortizing the cost across the group commit batch.
303    pub fn flush(&mut self) -> Result<()> {
304        if !self.dirty {
305            return Ok(());
306        }
307
308        let mut header = [0u8; DWB_HEADER_FIELDS];
309        header[0..4].copy_from_slice(&DWB_MAGIC.to_le_bytes());
310        header[4..8].copy_from_slice(&self.count.to_le_bytes());
311        header[8..12].copy_from_slice(&self.write_pos.to_le_bytes());
312
313        match self.mode {
314            DwbMode::Off => unreachable!("invariant: flush() is gated on mode != Off by caller"),
315            DwbMode::Buffered => {
316                self.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
317                self.file.write_all(&header).map_err(WalError::Io)?;
318                DWB_BYTES_WRITTEN_TOTAL.fetch_add(header.len() as u64, Ordering::Relaxed);
319            }
320            DwbMode::Direct => {
321                let buf = self
322                    .header_buf
323                    .as_mut()
324                    .expect("header_buf present in Direct mode");
325                buf.clear();
326                buf.write(&header);
327                zero_tail(buf);
328                let slice = full_capacity_slice(buf);
329                debug_assert_eq!(slice.len(), DWB_HEADER_STRIDE);
330                pwrite_all(&self.file, slice, 0)?;
331                DWB_BYTES_WRITTEN_TOTAL.fetch_add(slice.len() as u64, Ordering::Relaxed);
332            }
333        }
334
335        self.file.sync_all().map_err(WalError::Io)?;
336        self.dirty = false;
337
338        Ok(())
339    }
340
341    /// Path to the double-write buffer file.
342    pub fn path(&self) -> &Path {
343        &self.path
344    }
345
346    /// Try to recover a WAL record by LSN from the double-write buffer.
347    ///
348    /// Scans **all** DWB_CAPACITY slots for a record matching the given LSN
349    /// with valid CRC. We scan every slot rather than relying on `count` or
350    /// `write_pos` because the header itself may be stale or corrupted after
351    /// a crash. Each slot is self-describing: the record's own CRC validates
352    /// whether the slot contains usable data.
353    pub fn recover_record(&mut self, target_lsn: u64) -> Result<Option<WalRecord>> {
354        // Under O_DIRECT, reads must also use aligned buffers and aligned
355        // lengths. Read one full aligned slot at a time, then parse.
356        let mut slot = AlignedBuf::new(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT)?;
357
358        for i in 0..DWB_CAPACITY as u32 {
359            let offset = slot_offset(i);
360            // SAFETY: slot.as_mut_ptr is valid for `capacity()` bytes.
361            let read = unsafe {
362                libc::pread(
363                    self.file.as_raw_fd(),
364                    slot.as_mut_ptr() as *mut libc::c_void,
365                    DWB_SLOT_STRIDE,
366                    offset as libc::off_t,
367                )
368            };
369            if read <= 0 {
370                continue;
371            }
372            // SAFETY: the kernel populated `read` bytes starting at the buffer.
373            let bytes: &[u8] = unsafe { std::slice::from_raw_parts(slot.as_ptr(), read as usize) };
374            if bytes.len() < 4 + HEADER_SIZE {
375                continue;
376            }
377
378            let mut arr4 = [0u8; 4];
379            arr4.copy_from_slice(&bytes[0..4]);
380            let total_size = u32::from_le_bytes(arr4) as usize;
381            if !(HEADER_SIZE..=DWB_SLOT_PAYLOAD_MAX).contains(&total_size)
382                || bytes.len() < 4 + total_size
383            {
384                continue;
385            }
386
387            let mut header_buf = [0u8; HEADER_SIZE];
388            header_buf.copy_from_slice(&bytes[4..4 + HEADER_SIZE]);
389            let header = RecordHeader::from_bytes(&header_buf);
390            if header.magic != WAL_MAGIC || header.lsn != target_lsn {
391                continue;
392            }
393
394            let payload_len = total_size - HEADER_SIZE;
395            let payload = bytes[4 + HEADER_SIZE..4 + HEADER_SIZE + payload_len].to_vec();
396            let record = WalRecord { header, payload };
397            if record.verify_checksum().is_ok() {
398                return Ok(Some(record));
399            }
400        }
401
402        Ok(None)
403    }
404}
405
406/// Fill the unwritten tail of `buf` with zero bytes so an O_DIRECT write of
407/// the entire aligned slot does not leak stale buffer contents to disk.
408fn zero_tail(buf: &mut AlignedBuf) {
409    let written = buf.len();
410    let cap = buf.capacity();
411    if written < cap {
412        // SAFETY: `as_mut_ptr` is valid for `capacity` bytes; we write only
413        // the uninitialized tail between `written..capacity`.
414        unsafe {
415            std::ptr::write_bytes(buf.as_mut_ptr().add(written), 0, cap - written);
416        }
417    }
418}
419
420/// View the entire allocated capacity of `buf` as a byte slice. Requires
421/// that the caller has zeroed any unwritten tail (see `zero_tail`).
422fn full_capacity_slice(buf: &AlignedBuf) -> &[u8] {
423    // SAFETY: AlignedBuf guarantees `as_ptr` points to `capacity()` valid
424    // bytes (alloc_zeroed) for the lifetime of the buffer.
425    unsafe { std::slice::from_raw_parts(buf.as_ptr(), buf.capacity()) }
426}
427
428/// `pwrite`-retry helper that handles short writes.
429fn pwrite_all(file: &File, mut data: &[u8], mut offset: u64) -> Result<()> {
430    let fd = file.as_raw_fd();
431    while !data.is_empty() {
432        let n = unsafe {
433            libc::pwrite(
434                fd,
435                data.as_ptr() as *const libc::c_void,
436                data.len(),
437                offset as libc::off_t,
438            )
439        };
440        if n < 0 {
441            return Err(WalError::Io(std::io::Error::last_os_error()));
442        }
443        let n = n as usize;
444        data = &data[n..];
445        offset += n as u64;
446    }
447    Ok(())
448}
449
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Build a minimal `Put` record with the given LSN and payload.
    /// Centralizes the nine-argument `WalRecord::new` call that every test
    /// previously repeated verbatim.
    fn make_record(lsn: u64, payload: Vec<u8>) -> WalRecord {
        WalRecord::new(RecordType::Put as u32, lsn, 1, 0, 0, payload, None, None).unwrap()
    }

    fn open_buffered(path: &Path) -> DoubleWriteBuffer {
        DoubleWriteBuffer::open(path, DwbMode::Buffered).unwrap()
    }

    #[test]
    fn write_and_recover() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("test.dwb"));

        dwb.write_record(&make_record(42, b"hello double-write".to_vec()))
            .unwrap();

        // Recover by LSN.
        let recovered = dwb.recover_record(42).unwrap();
        assert!(recovered.is_some());
        let rec = recovered.unwrap();
        assert_eq!(rec.header.lsn, 42);
        assert_eq!(rec.payload, b"hello double-write");
    }

    #[test]
    fn recover_nonexistent_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("test2.dwb"));
        assert!(dwb.recover_record(999).unwrap().is_none());
    }

    #[test]
    fn survives_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("reopen.dwb");

        {
            let mut dwb = open_buffered(&dwb_path);
            dwb.write_record(&make_record(7, b"durable".to_vec())).unwrap();
        }

        // A fresh handle over the same file must still find the record.
        let mut dwb = open_buffered(&dwb_path);
        let recovered = dwb.recover_record(7).unwrap();
        assert!(recovered.is_some());
        assert_eq!(recovered.unwrap().payload, b"durable");
    }

    #[test]
    fn batch_deferred_writes_and_flush() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("batch.dwb"));

        for lsn in 1..=5u64 {
            dwb.write_record_deferred(&make_record(lsn, format!("batch-{lsn}").into_bytes()))
                .unwrap();
        }

        assert!(dwb.dirty);
        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        for lsn in 1..=5u64 {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(recovered.is_some(), "LSN {lsn} should be recoverable");
            assert_eq!(
                recovered.unwrap().payload,
                format!("batch-{lsn}").into_bytes()
            );
        }
    }

    #[test]
    fn flush_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("idem.dwb"));

        // Flushing a clean buffer is a no-op.
        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        dwb.write_record_deferred(&make_record(1, b"data".to_vec())).unwrap();
        dwb.flush().unwrap();
        dwb.flush().unwrap();
        assert!(!dwb.dirty);
    }

    #[test]
    fn slot_stride_is_o_direct_aligned() {
        // The DWB slot stride must be a multiple of the WAL alignment
        // (4 KiB) so the file can be opened with O_DIRECT alongside an
        // O_DIRECT WAL. With a non-aligned stride, every slot after the
        // first lands at an unaligned offset and the kernel rejects the
        // write with -EINVAL.
        assert!(
            is_aligned(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT),
            "DWB slot stride {DWB_SLOT_STRIDE} bytes is not a multiple of {DEFAULT_ALIGNMENT}"
        );
        assert!(is_aligned(DWB_HEADER_STRIDE, DEFAULT_ALIGNMENT));
        for i in 0..DWB_CAPACITY as u32 {
            assert!(is_aligned(slot_offset(i) as usize, DEFAULT_ALIGNMENT));
        }
    }

    #[test]
    fn recover_after_wraparound() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("wrap.dwb"));

        // Write capacity + 5 records so the circular buffer wraps and the
        // first five slots are overwritten.
        let total = DWB_CAPACITY as u64 + 5;
        for lsn in 1..=total {
            dwb.write_record_deferred(&make_record(lsn, format!("wrap-{lsn}").into_bytes()))
                .unwrap();
        }
        dwb.flush().unwrap();

        // The newest records are still present…
        for lsn in (total - 4)..=total {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(
                recovered.is_some(),
                "LSN {lsn} should be recoverable after wrap-around"
            );
            assert_eq!(
                recovered.unwrap().payload,
                format!("wrap-{lsn}").into_bytes()
            );
        }

        // …and the oldest have been overwritten.
        for lsn in 1..=5u64 {
            assert!(
                dwb.recover_record(lsn).unwrap().is_none(),
                "LSN {lsn} should have been overwritten by wrap-around"
            );
        }
    }

    #[test]
    fn bytes_written_counter_increments() {
        let dir = tempfile::tempdir().unwrap();
        let before = wal_dwb_bytes_written_total();

        let mut dwb = open_buffered(&dir.path().join("counter.dwb"));
        dwb.write_record(&make_record(1, b"counted".to_vec())).unwrap();

        assert!(wal_dwb_bytes_written_total() > before);
    }
}