Skip to main content

nodedb_wal/
double_write.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Double-write buffer for torn write protection.
4//!
5//! NVMe drives guarantee atomic 4 KiB sector writes but NOT atomic writes
6//! for larger pages (e.g., 16 KiB). If power fails mid-write on a 16 KiB
7//! page, the WAL page can be partially written (torn).
8//!
9//! CRC32C detects torn writes during replay, but without the double-write
10//! buffer, the record is lost — even though it was acknowledged to the client.
11//!
12//! The double-write buffer solves this:
13//! 1. Before writing to WAL, write the record to the double-write file.
14//! 2. `fsync` the double-write file.
15//! 3. Write to the WAL file.
16//! 4. `fsync` the WAL file.
17//!
18//! On recovery, if a WAL record's CRC fails:
19//! - Check the double-write buffer for an intact copy (verify CRC).
20//! - If found, use the double-write copy to reconstruct the WAL page.
21//! - If not found, the record is truly lost (pre-fsync crash).
22//!
23//! The double-write file is a fixed-size circular buffer. Only the most
24//! recent N records are kept — older ones are overwritten. This is fine
25//! because torn writes can only happen on the most recent write.
26//!
27//! ## O_DIRECT mode
28//!
29//! When the parent WAL uses `O_DIRECT`, the DWB can also be opened with
30//! `O_DIRECT` (`DwbMode::Direct`). This:
31//! - Keeps the page cache free of DWB bytes — the O_DIRECT WAL was
32//!   specifically designed not to warm the cache, and a buffered DWB
33//!   undoes that by writing the exact same payload through the cache.
34//! - Surfaces DWB bytes in block-layer iostat traffic alongside the WAL.
35//!
36//! The on-disk layout is the same in both modes (one aligned header block
37//! followed by fixed-stride slots, all block-aligned) so a DWB written in
38//! one mode can be read in the other.
39
40use std::fs::{File, OpenOptions};
41use std::io::{Read, Seek, SeekFrom, Write};
42use std::os::unix::fs::OpenOptionsExt;
43use std::os::unix::io::AsRawFd;
44use std::path::{Path, PathBuf};
45use std::sync::atomic::{AtomicU64, Ordering};
46
47use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT, is_aligned};
48use crate::error::{Result, WalError};
49use crate::record::{HEADER_SIZE, RecordHeader, WAL_MAGIC, WalRecord};
50
/// Maximum number of records kept in the double-write buffer.
/// Only the most recent records matter — torn writes affect the tail.
///
/// This is a compile-time constant used in slot offset arithmetic. It cannot
/// be made runtime-configurable without storing capacity in the struct and
/// adjusting all offset calculations accordingly. The value matches the
/// `WalTuning::dwb_capacity` default (64).
const DWB_CAPACITY: usize = 64;

/// Upper bound on the bytes a slot can carry after its length prefix.
///
/// The write path compares `HEADER_SIZE + payload.len()` against this limit
/// and the recovery path accepts stored lengths in
/// `HEADER_SIZE..=DWB_SLOT_PAYLOAD_MAX`, so the record header counts toward
/// the 64 KiB budget. Larger records are not stored in the DWB.
const DWB_SLOT_PAYLOAD_MAX: usize = 64 * 1024;

/// Raw slot content size: [len:4B][header][payload-up-to-64KiB].
const DWB_SLOT_RAW: usize = 4 + HEADER_SIZE + DWB_SLOT_PAYLOAD_MAX;

/// Per-slot on-disk stride, padded up to the O_DIRECT block size so every
/// slot offset is block-aligned. With `DWB_SLOT_RAW = 65570` and the default
/// 4 KiB alignment this rounds to 69632 bytes per slot.
const DWB_SLOT_STRIDE: usize = round_up_const(DWB_SLOT_RAW, DEFAULT_ALIGNMENT);

/// On-disk header occupies one aligned block (not the raw 12 bytes) so the
/// first slot starts at a block-aligned offset. The first 12 bytes of the
/// block carry the header fields; the remainder is zero-padded.
const DWB_HEADER_STRIDE: usize = DEFAULT_ALIGNMENT;
/// Bytes of meaningful header data: three little-endian `u32` fields in the
/// order magic, record count, write position.
const DWB_HEADER_FIELDS: usize = 12;
/// Magic number identifying a DWB header block. It is serialized
/// little-endian, so the on-disk byte order is the reverse of the ASCII
/// spelling below.
const DWB_MAGIC: u32 = 0x4457_4246; // "DWBF"

/// Global counter: total bytes written to any DWB across the process.
/// Surfaces the duplicate-write cost of running the DWB alongside an
/// O_DIRECT WAL. Updated with relaxed ordering; it is a statistic, not a
/// synchronization point.
static DWB_BYTES_WRITTEN_TOTAL: AtomicU64 = AtomicU64::new(0);
82
83/// Total bytes written to DWB files since process start.
84pub fn wal_dwb_bytes_written_total() -> u64 {
85    DWB_BYTES_WRITTEN_TOTAL.load(Ordering::Relaxed)
86}
87
/// I/O mode for the double-write buffer file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DwbMode {
    /// DWB disabled — no torn-write protection. `DoubleWriteBuffer::open`
    /// rejects this mode with `WalError::DwbOffNotOpenable`, so callers
    /// that want the DWB off must simply not open one.
    Off,
    /// Buffered I/O (page cache + `fsync`). Default when the parent WAL
    /// does not use `O_DIRECT`.
    Buffered,
    /// `O_DIRECT` I/O via an aligned buffer. The intended companion to an
    /// `O_DIRECT` WAL: keeps DWB bytes out of the page cache.
    Direct,
}
101
102impl DwbMode {
103    /// Choose the DWB mode that mirrors the parent writer's O_DIRECT setting
104    /// when no explicit override is configured. With `O_DIRECT` on, the DWB
105    /// should also be `O_DIRECT`, otherwise it undoes the cache-bypass.
106    pub fn default_for_parent(parent_uses_direct_io: bool) -> Self {
107        if parent_uses_direct_io {
108            Self::Direct
109        } else {
110            Self::Buffered
111        }
112    }
113}
114
/// Round `value` up to the next multiple of `align`.
///
/// `align` must be a power of two: the bit-mask arithmetic below is only
/// correct for such alignments. The visible caller (`DWB_SLOT_STRIDE`)
/// evaluates this in a `const` context, so a bad alignment there fails the
/// build instead of silently producing a misaligned slot stride.
///
/// # Panics
///
/// Panics if `align` is not a power of two (which includes zero).
const fn round_up_const(value: usize, align: usize) -> usize {
    assert!(align.is_power_of_two(), "alignment must be a power of two");
    (value + align - 1) & !(align - 1)
}
118
/// Slot stride in bytes: the distance between the start offsets of two
/// consecutive slots in the DWB file (the raw slot size rounded up to the
/// default alignment). Exposed for tests and for callers that want to
/// size DWB files ahead of time.
pub const fn slot_stride() -> usize {
    DWB_SLOT_STRIDE
}
124
125/// Byte offset of slot `idx` within the DWB file.
126fn slot_offset(idx: u32) -> u64 {
127    DWB_HEADER_STRIDE as u64 + (idx as u64 % DWB_CAPACITY as u64) * DWB_SLOT_STRIDE as u64
128}
129
/// Double-write buffer file.
///
/// Holds the open DWB file together with the in-memory copy of the header
/// state (`count`, `write_pos`) that `flush()` persists to block 0.
pub struct DoubleWriteBuffer {
    /// Open handle to the DWB file (opened with `O_DIRECT` in Direct mode).
    file: File,
    /// Path the file was opened from; returned by `path()`.
    path: PathBuf,
    /// I/O mode selected at open time.
    mode: DwbMode,
    /// Running write counter. It is bumped with wrapping arithmetic after
    /// every stored record; `slot_offset` reduces it modulo `DWB_CAPACITY`
    /// to select the slot, which is what makes the buffer circular.
    write_pos: u32,
    /// Number of valid records in the buffer, capped at `DWB_CAPACITY`.
    count: u32,
    /// Whether there are deferred writes that haven't been fsynced.
    dirty: bool,
    /// Single-slot aligned staging buffer (Direct mode only). One slot is
    /// serialized here, then pwrite'd at the slot offset.
    slot_buf: Option<AlignedBuf>,
    /// Aligned header block (Direct mode only). Written on `flush()`.
    header_buf: Option<AlignedBuf>,
}
147
148impl std::fmt::Debug for DoubleWriteBuffer {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct("DoubleWriteBuffer")
151            .field("path", &self.path)
152            .field("mode", &self.mode)
153            .field("write_pos", &self.write_pos)
154            .field("count", &self.count)
155            .finish()
156    }
157}
158
159impl DoubleWriteBuffer {
160    /// Open or create the double-write buffer file in the requested I/O mode.
161    ///
162    /// Returns `None`-wrapped errors for unsupported modes via
163    /// `Err(WalError::…)`; callers that want "off" should not call this at all.
164    pub fn open(path: &Path, mode: DwbMode) -> Result<Self> {
165        if mode == DwbMode::Off {
166            return Err(WalError::DwbOffNotOpenable);
167        }
168
169        let mut opts = OpenOptions::new();
170        opts.read(true).write(true).create(true).truncate(false);
171        if mode == DwbMode::Direct {
172            opts.custom_flags(libc::O_DIRECT);
173        }
174
175        let file = opts.open(path).map_err(|e| {
176            tracing::warn!(path = %path.display(), error = %e, mode = ?mode, "failed to open double-write buffer");
177            WalError::Io(e)
178        })?;
179
180        let (slot_buf, header_buf) = if mode == DwbMode::Direct {
181            (
182                Some(AlignedBuf::new(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT)?),
183                Some(AlignedBuf::new(DWB_HEADER_STRIDE, DEFAULT_ALIGNMENT)?),
184            )
185        } else {
186            (None, None)
187        };
188
189        let mut dwb = Self {
190            file,
191            path: path.to_path_buf(),
192            mode,
193            write_pos: 0,
194            count: 0,
195            dirty: false,
196            slot_buf,
197            header_buf,
198        };
199
200        // Try to read existing header (first DWB_HEADER_FIELDS bytes of block 0).
201        let file_len = dwb.file.metadata().map(|m| m.len()).unwrap_or(0);
202        if file_len >= DWB_HEADER_STRIDE as u64 {
203            let mut block = vec![0u8; DWB_HEADER_STRIDE];
204            dwb.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
205            if dwb.file.read_exact(&mut block).is_ok() {
206                let mut arr4 = [0u8; 4];
207                arr4.copy_from_slice(&block[0..4]);
208                let magic = u32::from_le_bytes(arr4);
209                if magic == DWB_MAGIC {
210                    arr4.copy_from_slice(&block[4..8]);
211                    dwb.count = u32::from_le_bytes(arr4);
212                    arr4.copy_from_slice(&block[8..12]);
213                    dwb.write_pos = u32::from_le_bytes(arr4);
214                }
215            }
216        }
217
218        Ok(dwb)
219    }
220
    /// I/O mode this buffer was opened with. Never [`DwbMode::Off`], because
    /// `open` refuses that mode.
    pub fn mode(&self) -> DwbMode {
        self.mode
    }
225
226    /// Write a WAL record to the double-write buffer before WAL append.
227    ///
228    /// The record is written at the current circular position and the file
229    /// is fsynced immediately. Use `write_record_deferred` + `flush` for
230    /// batch mode (multiple records per fsync).
231    pub fn write_record(&mut self, record: &WalRecord) -> Result<()> {
232        self.write_record_deferred(record)?;
233        self.flush()
234    }
235
236    /// Write a WAL record to the DWB without fsyncing.
237    ///
238    /// The data is written to the OS page cache (Buffered mode) or directly
239    /// to the block device (Direct mode) but not guaranteed durable until
240    /// `flush()` is called. Use this in batch mode: write all records in a
241    /// group commit batch, then call `flush()` once — reducing fsync calls
242    /// from N-per-batch to 1-per-batch.
243    pub fn write_record_deferred(&mut self, record: &WalRecord) -> Result<()> {
244        let total_size = HEADER_SIZE + record.payload.len();
245
246        // Max 64 KiB per slot — larger records skip the double-write buffer
247        // (they're multi-page and need different protection).
248        if total_size > DWB_SLOT_PAYLOAD_MAX {
249            return Ok(()); // Skip oversized records.
250        }
251
252        let header_bytes = record.header.to_bytes();
253        let offset = slot_offset(self.write_pos);
254
255        match self.mode {
256            DwbMode::Off => unreachable!("Off never opens a DoubleWriteBuffer"),
257            DwbMode::Buffered => {
258                self.file
259                    .seek(SeekFrom::Start(offset))
260                    .map_err(WalError::Io)?;
261                self.file
262                    .write_all(&(total_size as u32).to_le_bytes())
263                    .map_err(WalError::Io)?;
264                self.file.write_all(&header_bytes).map_err(WalError::Io)?;
265                self.file.write_all(&record.payload).map_err(WalError::Io)?;
266                DWB_BYTES_WRITTEN_TOTAL.fetch_add(
267                    (4 + header_bytes.len() + record.payload.len()) as u64,
268                    Ordering::Relaxed,
269                );
270            }
271            DwbMode::Direct => {
272                let buf = self
273                    .slot_buf
274                    .as_mut()
275                    .expect("slot_buf present in Direct mode");
276                buf.clear();
277                buf.write(&(total_size as u32).to_le_bytes());
278                buf.write(&header_bytes);
279                buf.write(&record.payload);
280                // Zero the tail so the full aligned slot can be written
281                // without leaking prior contents.
282                zero_tail(buf);
283                let slice = full_capacity_slice(buf);
284                debug_assert_eq!(slice.len(), DWB_SLOT_STRIDE);
285                debug_assert!(is_aligned(offset as usize, DEFAULT_ALIGNMENT));
286                pwrite_all(&self.file, slice, offset)?;
287                DWB_BYTES_WRITTEN_TOTAL.fetch_add(slice.len() as u64, Ordering::Relaxed);
288            }
289        }
290
291        self.write_pos = self.write_pos.wrapping_add(1);
292        self.count = self.count.saturating_add(1).min(DWB_CAPACITY as u32);
293        self.dirty = true;
294
295        Ok(())
296    }
297
298    /// Flush the DWB header and fsync the file.
299    ///
300    /// Must be called after one or more `write_record_deferred` calls to make
301    /// the records durable. The single fsync covers all deferred writes since
302    /// the last flush — amortizing the cost across the group commit batch.
303    pub fn flush(&mut self) -> Result<()> {
304        if !self.dirty {
305            return Ok(());
306        }
307
308        let mut header = [0u8; DWB_HEADER_FIELDS];
309        header[0..4].copy_from_slice(&DWB_MAGIC.to_le_bytes());
310        header[4..8].copy_from_slice(&self.count.to_le_bytes());
311        header[8..12].copy_from_slice(&self.write_pos.to_le_bytes());
312
313        match self.mode {
314            DwbMode::Off => unreachable!("invariant: flush() is gated on mode != Off by caller"),
315            DwbMode::Buffered => {
316                self.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
317                self.file.write_all(&header).map_err(WalError::Io)?;
318                DWB_BYTES_WRITTEN_TOTAL.fetch_add(header.len() as u64, Ordering::Relaxed);
319            }
320            DwbMode::Direct => {
321                let buf = self
322                    .header_buf
323                    .as_mut()
324                    .expect("header_buf present in Direct mode");
325                buf.clear();
326                buf.write(&header);
327                zero_tail(buf);
328                let slice = full_capacity_slice(buf);
329                debug_assert_eq!(slice.len(), DWB_HEADER_STRIDE);
330                pwrite_all(&self.file, slice, 0)?;
331                DWB_BYTES_WRITTEN_TOTAL.fetch_add(slice.len() as u64, Ordering::Relaxed);
332            }
333        }
334
335        self.file.sync_all().map_err(WalError::Io)?;
336        self.dirty = false;
337
338        Ok(())
339    }
340
    /// Path to the double-write buffer file, as passed to `open`.
    pub fn path(&self) -> &Path {
        &self.path
    }
345
    /// Try to recover a WAL record by LSN from the double-write buffer.
    ///
    /// Scans **all** DWB_CAPACITY slots for a record matching the given LSN
    /// with valid CRC. We scan every slot rather than relying on `count` or
    /// `write_pos` because the header itself may be stale or corrupted after
    /// a crash. Each slot is self-describing: the record's own CRC validates
    /// whether the slot contains usable data.
    ///
    /// Returns `Ok(Some(record))` for the first slot (in slot-index order)
    /// that holds a record with the requested LSN and a passing checksum,
    /// and `Ok(None)` when no slot qualifies.
    ///
    /// # Errors
    ///
    /// Fails only if the aligned read buffer cannot be allocated. Read
    /// failures on individual slots are not reported; such slots are skipped.
    pub fn recover_record(&mut self, target_lsn: u64) -> Result<Option<WalRecord>> {
        // Under O_DIRECT, reads must also use aligned buffers and aligned
        // lengths. Read one full aligned slot at a time, then parse.
        let mut slot = AlignedBuf::new(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT)?;

        for i in 0..DWB_CAPACITY as u32 {
            let offset = slot_offset(i);
            // SAFETY: slot.as_mut_ptr is valid for `capacity()` bytes.
            let read = unsafe {
                libc::pread(
                    self.file.as_raw_fd(),
                    slot.as_mut_ptr() as *mut libc::c_void,
                    DWB_SLOT_STRIDE,
                    offset as libc::off_t,
                )
            };
            // A negative return (error) or zero (offset at or past EOF) means
            // this slot has nothing usable.
            // NOTE(review): errors, including EINTR, are treated the same as
            // an empty slot — confirm this best-effort behavior is intended.
            if read <= 0 {
                continue;
            }
            // SAFETY: the kernel populated `read` bytes starting at the buffer.
            let bytes: &[u8] = unsafe { std::slice::from_raw_parts(slot.as_ptr(), read as usize) };
            // A short read is possible (e.g. a slot that ends at EOF); it
            // must still cover the length prefix and a full record header.
            if bytes.len() < 4 + HEADER_SIZE {
                continue;
            }

            // Length prefix: little-endian byte count of header + payload.
            let mut arr4 = [0u8; 4];
            arr4.copy_from_slice(&bytes[0..4]);
            let total_size = u32::from_le_bytes(arr4) as usize;
            // Reject lengths the writer could never have produced, and
            // slots where fewer bytes were read than the prefix claims.
            if !(HEADER_SIZE..=DWB_SLOT_PAYLOAD_MAX).contains(&total_size)
                || bytes.len() < 4 + total_size
            {
                continue;
            }

            let mut header_buf = [0u8; HEADER_SIZE];
            header_buf.copy_from_slice(&bytes[4..4 + HEADER_SIZE]);
            let header = RecordHeader::from_bytes(&header_buf);
            // Cheap filters first: wrong magic or a different LSN means this
            // slot is not the record being looked for.
            if header.magic != WAL_MAGIC || header.lsn != target_lsn {
                continue;
            }

            let payload_len = total_size - HEADER_SIZE;
            let payload = bytes[4 + HEADER_SIZE..4 + HEADER_SIZE + payload_len].to_vec();
            let record = WalRecord { header, payload };
            // The checksum is the final authority: a torn or stale slot with
            // a matching LSN is skipped and the scan continues.
            if record.verify_checksum().is_ok() {
                return Ok(Some(record));
            }
        }

        Ok(None)
    }
404}
405
406/// Fill the unwritten tail of `buf` with zero bytes so an O_DIRECT write of
407/// the entire aligned slot does not leak stale buffer contents to disk.
408fn zero_tail(buf: &mut AlignedBuf) {
409    let written = buf.len();
410    let cap = buf.capacity();
411    if written < cap {
412        // SAFETY: `as_mut_ptr` is valid for `capacity` bytes; we write only
413        // the uninitialized tail between `written..capacity`.
414        unsafe {
415            std::ptr::write_bytes(buf.as_mut_ptr().add(written), 0, cap - written);
416        }
417    }
418}
419
420/// View the entire allocated capacity of `buf` as a byte slice. Requires
421/// that the caller has zeroed any unwritten tail (see `zero_tail`).
422fn full_capacity_slice(buf: &AlignedBuf) -> &[u8] {
423    // SAFETY: AlignedBuf guarantees `as_ptr` points to `capacity()` valid
424    // bytes (alloc_zeroed) for the lifetime of the buffer.
425    unsafe { std::slice::from_raw_parts(buf.as_ptr(), buf.capacity()) }
426}
427
428/// `pwrite`-retry helper that handles short writes.
429fn pwrite_all(file: &File, mut data: &[u8], mut offset: u64) -> Result<()> {
430    let fd = file.as_raw_fd();
431    while !data.is_empty() {
432        let n = unsafe {
433            libc::pwrite(
434                fd,
435                data.as_ptr() as *const libc::c_void,
436                data.len(),
437                offset as libc::off_t,
438            )
439        };
440        if n < 0 {
441            return Err(WalError::Io(std::io::Error::last_os_error()));
442        }
443        let n = n as usize;
444        data = &data[n..];
445        offset += n as u64;
446    }
447    Ok(())
448}
449
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Open a DWB in buffered mode, panicking on failure.
    fn open_buffered(path: &Path) -> DoubleWriteBuffer {
        DoubleWriteBuffer::open(path, DwbMode::Buffered).unwrap()
    }

    /// Build a `Put` record with the given LSN and payload. Every test uses
    /// the same remaining constructor arguments, so they live here once.
    fn put_record(lsn: u64, payload: &[u8]) -> WalRecord {
        WalRecord::new(
            RecordType::Put as u32,
            lsn,
            1,
            0,
            payload.to_vec(),
            None,
            None,
        )
        .unwrap()
    }

    #[test]
    fn write_and_recover() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("test.dwb");

        let mut dwb = open_buffered(&dwb_path);
        dwb.write_record(&put_record(42, b"hello double-write"))
            .unwrap();

        // Recover by LSN.
        let rec = dwb
            .recover_record(42)
            .unwrap()
            .expect("LSN 42 should be recoverable");
        assert_eq!(rec.header.lsn, 42);
        assert_eq!(rec.payload, b"hello double-write");
    }

    #[test]
    fn recover_nonexistent_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("test2.dwb");

        let mut dwb = open_buffered(&dwb_path);
        let result = dwb.recover_record(999).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn survives_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("reopen.dwb");

        {
            let mut dwb = open_buffered(&dwb_path);
            dwb.write_record(&put_record(7, b"durable")).unwrap();
        }

        let mut dwb = open_buffered(&dwb_path);
        // The header state persisted by `flush` must come back on reopen.
        assert_eq!(dwb.count, 1, "record count should be restored");
        assert_eq!(dwb.write_pos, 1, "write position should be restored");

        let rec = dwb
            .recover_record(7)
            .unwrap()
            .expect("LSN 7 should survive a reopen");
        assert_eq!(rec.payload, b"durable");
    }

    #[test]
    fn batch_deferred_writes_and_flush() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("batch.dwb");

        let mut dwb = open_buffered(&dwb_path);

        for lsn in 1..=5u64 {
            let record = put_record(lsn, format!("batch-{lsn}").as_bytes());
            dwb.write_record_deferred(&record).unwrap();
        }

        assert!(dwb.dirty);
        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        for lsn in 1..=5u64 {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(recovered.is_some(), "LSN {lsn} should be recoverable");
            assert_eq!(
                recovered.unwrap().payload,
                format!("batch-{lsn}").into_bytes()
            );
        }
    }

    #[test]
    fn flush_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("idem.dwb");

        let mut dwb = open_buffered(&dwb_path);

        // Flushing a clean buffer is a no-op.
        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        dwb.write_record_deferred(&put_record(1, b"data")).unwrap();
        dwb.flush().unwrap();
        dwb.flush().unwrap();
        assert!(!dwb.dirty);
    }

    #[test]
    fn slot_stride_is_o_direct_aligned() {
        // The DWB slot stride must be a multiple of the WAL alignment
        // (4 KiB) so the file can be opened with O_DIRECT alongside an
        // O_DIRECT WAL. With a non-aligned stride, every slot after the
        // first lands at an unaligned offset and the kernel rejects the
        // write with -EINVAL.
        assert!(
            is_aligned(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT),
            "DWB slot stride {DWB_SLOT_STRIDE} bytes is not a multiple of {DEFAULT_ALIGNMENT}"
        );
        assert!(is_aligned(DWB_HEADER_STRIDE, DEFAULT_ALIGNMENT));
        for i in 0..DWB_CAPACITY as u32 {
            assert!(is_aligned(slot_offset(i) as usize, DEFAULT_ALIGNMENT));
        }
    }

    #[test]
    fn recover_after_wraparound() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("wrap.dwb");

        let mut dwb = open_buffered(&dwb_path);

        // Write five records more than the ring holds so the first five
        // slots get overwritten.
        let total = DWB_CAPACITY as u64 + 5;
        for lsn in 1..=total {
            let record = put_record(lsn, format!("wrap-{lsn}").as_bytes());
            dwb.write_record_deferred(&record).unwrap();
        }
        dwb.flush().unwrap();

        for lsn in (total - 4)..=total {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(
                recovered.is_some(),
                "LSN {lsn} should be recoverable after wrap-around"
            );
            assert_eq!(
                recovered.unwrap().payload,
                format!("wrap-{lsn}").into_bytes()
            );
        }

        for lsn in 1..=5u64 {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(
                recovered.is_none(),
                "LSN {lsn} should have been overwritten by wrap-around"
            );
        }
    }

    #[test]
    fn bytes_written_counter_increments() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("counter.dwb");
        let before = wal_dwb_bytes_written_total();

        let mut dwb = open_buffered(&dwb_path);
        dwb.write_record(&put_record(1, b"counted")).unwrap();

        assert!(wal_dwb_bytes_written_total() > before);
    }
}
666}