Skip to main content

nodedb_wal/
double_write.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Double-write buffer for torn write protection.
4//!
5//! NVMe drives guarantee atomic 4 KiB sector writes but NOT atomic writes
6//! for larger pages (e.g., 16 KiB). If power fails mid-write on a 16 KiB
7//! page, the WAL page can be partially written (torn).
8//!
9//! CRC32C detects torn writes during replay, but without the double-write
10//! buffer, the record is lost — even though it was acknowledged to the client.
11//!
12//! The double-write buffer solves this:
13//! 1. Before writing to WAL, write the record to the double-write file.
14//! 2. `fsync` the double-write file.
15//! 3. Write to the WAL file.
16//! 4. `fsync` the WAL file.
17//!
18//! On recovery, if a WAL record's CRC fails:
19//! - Check the double-write buffer for an intact copy (verify CRC).
20//! - If found, use the double-write copy to reconstruct the WAL page.
21//! - If not found, the record is truly lost (pre-fsync crash).
22//!
23//! The double-write file is a fixed-size circular buffer. Only the most
24//! recent N records are kept — older ones are overwritten. This is fine
25//! because torn writes can only happen on the most recent write.
26//!
27//! ## O_DIRECT mode
28//!
29//! When the parent WAL uses `O_DIRECT`, the DWB can also be opened with
30//! `O_DIRECT` (`DwbMode::Direct`). This:
31//! - Keeps the page cache free of DWB bytes — the O_DIRECT WAL was
32//!   specifically designed not to warm the cache, and a buffered DWB
33//!   undoes that by writing the exact same payload through the cache.
34//! - Surfaces DWB bytes in block-layer iostat traffic alongside the WAL.
35//!
36//! The on-disk layout is the same in both modes (one aligned header block
37//! followed by fixed-stride slots, all block-aligned) so a DWB written in
38//! one mode can be read in the other.
39
40use std::fs::{File, OpenOptions};
41use std::io::{Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43use std::sync::atomic::{AtomicU64, Ordering};
44
45#[cfg(not(target_arch = "wasm32"))]
46use std::os::unix::fs::OpenOptionsExt as _;
47
48use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT, is_aligned};
49use crate::error::{Result, WalError};
50use crate::record::{HEADER_SIZE, RecordHeader, WAL_MAGIC, WalRecord};
51
/// Maximum number of records kept in the double-write buffer.
/// Only the most recent records matter — torn writes affect the tail.
///
/// This is a compile-time constant used in slot offset arithmetic. It cannot
/// be made runtime-configurable without storing capacity in the struct and
/// adjusting all offset calculations accordingly. The value matches the
/// `WalTuning::dwb_capacity` default (64).
const DWB_CAPACITY: usize = 64;

/// Upper bound on the size of a record stored in one slot.
///
/// Note that the write and recovery paths compare `HEADER_SIZE + payload.len()`
/// (not the payload alone) against this limit, so the largest payload that is
/// actually accepted is `DWB_SLOT_PAYLOAD_MAX - HEADER_SIZE` bytes. Records
/// above the limit are skipped by the write path rather than rejected.
const DWB_SLOT_PAYLOAD_MAX: usize = 64 * 1024;

/// Raw slot content size: [len:4B][header][payload-up-to-64KiB].
const DWB_SLOT_RAW: usize = 4 + HEADER_SIZE + DWB_SLOT_PAYLOAD_MAX;

/// Per-slot on-disk stride, padded up to the O_DIRECT block size so every
/// slot offset is block-aligned. With `DWB_SLOT_RAW = 65570` and the default
/// 4 KiB alignment this rounds to 69632 bytes per slot.
const DWB_SLOT_STRIDE: usize = round_up_const(DWB_SLOT_RAW, DEFAULT_ALIGNMENT);

/// On-disk header occupies one aligned block (not the raw 12 bytes) so the
/// first slot starts at a block-aligned offset. The first 12 bytes of the
/// block carry the header fields; the remainder is zero-padded.
const DWB_HEADER_STRIDE: usize = DEFAULT_ALIGNMENT;
/// Number of meaningful header bytes: three little-endian `u32` fields
/// (magic, record count, write position), in that order.
const DWB_HEADER_FIELDS: usize = 12;
/// Magic value identifying a DWB header block. It is serialized with
/// `to_le_bytes`, so the on-disk byte order is the reverse of the ASCII tag.
const DWB_MAGIC: u32 = 0x4457_4246; // "DWBF"

/// Global counter: total bytes written to any DWB across the process.
/// Surfaces the duplicate-write cost of running the DWB alongside an
/// O_DIRECT WAL.
///
/// Updated with `Ordering::Relaxed`; it is a statistic, not a synchronization
/// point.
static DWB_BYTES_WRITTEN_TOTAL: AtomicU64 = AtomicU64::new(0);
83
/// Total bytes written to DWB files since process start.
///
/// Aggregated over every `DoubleWriteBuffer` in the process. Buffered-mode
/// writes add the bytes actually handed to `write_all`; Direct-mode writes add
/// the full padded block or slot length, so the figure includes padding.
/// The load is `Relaxed`: the value is a monotonically growing statistic and
/// is not ordered against any other memory access.
pub fn wal_dwb_bytes_written_total() -> u64 {
    DWB_BYTES_WRITTEN_TOTAL.load(Ordering::Relaxed)
}
88
/// I/O mode for the double-write buffer file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DwbMode {
    /// DWB disabled — no torn-write protection. `DoubleWriteBuffer::open`
    /// rejects this mode with `WalError::DwbOffNotOpenable`; callers that
    /// want the DWB off should simply not open one.
    Off,
    /// Buffered I/O (page cache + `fsync`). Default when the parent WAL
    /// does not use `O_DIRECT`.
    Buffered,
    /// `O_DIRECT` I/O via an aligned buffer. The intended companion to an
    /// `O_DIRECT` WAL: keeps DWB bytes out of the page cache.
    Direct,
}
102
103impl DwbMode {
104    /// Choose the DWB mode that mirrors the parent writer's O_DIRECT setting
105    /// when no explicit override is configured. With `O_DIRECT` on, the DWB
106    /// should also be `O_DIRECT`, otherwise it undoes the cache-bypass.
107    pub fn default_for_parent(parent_uses_direct_io: bool) -> Self {
108        if parent_uses_direct_io {
109            Self::Direct
110        } else {
111            Self::Buffered
112        }
113    }
114}
115
/// Round `value` up to the next multiple of `align` (usable in `const` items).
///
/// The bit-mask formula is only correct when `align` is a non-zero power of
/// two, so that precondition is asserted instead of silently producing a
/// wrong stride. When used in a `const` initializer, a violation becomes a
/// compile-time error.
///
/// # Panics
///
/// Panics if `align` is zero or not a power of two.
const fn round_up_const(value: usize, align: usize) -> usize {
    assert!(
        align.is_power_of_two(),
        "round_up_const: alignment must be a non-zero power of two"
    );
    let mask = align - 1;
    (value + mask) & !mask
}
119
/// Slot stride in bytes. Exposed for tests and for callers that want to
/// size DWB files ahead of time.
///
/// This is the distance between the starts of two consecutive slots, i.e. the
/// raw slot size rounded up to `DEFAULT_ALIGNMENT`. The header block that
/// precedes the first slot is not included.
pub const fn slot_stride() -> usize {
    DWB_SLOT_STRIDE
}
125
126/// Byte offset of slot `idx` within the DWB file.
127fn slot_offset(idx: u32) -> u64 {
128    DWB_HEADER_STRIDE as u64 + (idx as u64 % DWB_CAPACITY as u64) * DWB_SLOT_STRIDE as u64
129}
130
/// Double-write buffer file.
///
/// On-disk layout: one header block of `DWB_HEADER_STRIDE` bytes followed by
/// `DWB_CAPACITY` slots of `DWB_SLOT_STRIDE` bytes each, used as a ring.
pub struct DoubleWriteBuffer {
    /// Open handle to the DWB file (opened with `O_DIRECT` in Direct mode).
    file: File,
    /// Path the file was opened from; kept for diagnostics and `path()`.
    path: PathBuf,
    /// I/O mode selected at open time; decides which write path is used.
    mode: DwbMode,
    /// Write counter selecting the next slot. It is incremented with
    /// `wrapping_add` and only reduced modulo `DWB_CAPACITY` when converted
    /// to a file offset, so the stored value itself may exceed the capacity.
    write_pos: u32,
    /// Number of valid records in the buffer (saturates at `DWB_CAPACITY`).
    count: u32,
    /// Whether there are deferred writes that haven't been fsynced.
    dirty: bool,
    /// Single-slot aligned staging buffer (Direct mode only). One slot is
    /// serialized here, then pwrite'd at the slot offset.
    slot_buf: Option<AlignedBuf>,
    /// Aligned header block (Direct mode only). Written on `flush()`.
    header_buf: Option<AlignedBuf>,
}
148
149impl std::fmt::Debug for DoubleWriteBuffer {
150    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151        f.debug_struct("DoubleWriteBuffer")
152            .field("path", &self.path)
153            .field("mode", &self.mode)
154            .field("write_pos", &self.write_pos)
155            .field("count", &self.count)
156            .finish()
157    }
158}
159
160impl DoubleWriteBuffer {
161    /// Open or create the double-write buffer file in the requested I/O mode.
162    ///
163    /// Returns `None`-wrapped errors for unsupported modes via
164    /// `Err(WalError::…)`; callers that want "off" should not call this at all.
165    pub fn open(path: &Path, mode: DwbMode) -> Result<Self> {
166        if mode == DwbMode::Off {
167            return Err(WalError::DwbOffNotOpenable);
168        }
169
170        let mut opts = OpenOptions::new();
171        opts.read(true).write(true).create(true).truncate(false);
172        #[cfg(not(target_arch = "wasm32"))]
173        if mode == DwbMode::Direct {
174            opts.custom_flags(libc::O_DIRECT);
175        }
176
177        let file = opts.open(path).map_err(|e| {
178            tracing::warn!(path = %path.display(), error = %e, mode = ?mode, "failed to open double-write buffer");
179            WalError::Io(e)
180        })?;
181
182        let (slot_buf, header_buf) = if mode == DwbMode::Direct {
183            (
184                Some(AlignedBuf::new(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT)?),
185                Some(AlignedBuf::new(DWB_HEADER_STRIDE, DEFAULT_ALIGNMENT)?),
186            )
187        } else {
188            (None, None)
189        };
190
191        let mut dwb = Self {
192            file,
193            path: path.to_path_buf(),
194            mode,
195            write_pos: 0,
196            count: 0,
197            dirty: false,
198            slot_buf,
199            header_buf,
200        };
201
202        // Try to read existing header (first DWB_HEADER_FIELDS bytes of block 0).
203        let file_len = dwb.file.metadata().map(|m| m.len()).unwrap_or(0);
204        if file_len >= DWB_HEADER_STRIDE as u64 {
205            let mut block = vec![0u8; DWB_HEADER_STRIDE];
206            dwb.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
207            if dwb.file.read_exact(&mut block).is_ok() {
208                let mut arr4 = [0u8; 4];
209                arr4.copy_from_slice(&block[0..4]);
210                let magic = u32::from_le_bytes(arr4);
211                if magic == DWB_MAGIC {
212                    arr4.copy_from_slice(&block[4..8]);
213                    dwb.count = u32::from_le_bytes(arr4);
214                    arr4.copy_from_slice(&block[8..12]);
215                    dwb.write_pos = u32::from_le_bytes(arr4);
216                }
217            }
218        }
219
220        Ok(dwb)
221    }
222
    /// I/O mode this buffer was opened with.
    ///
    /// Never `DwbMode::Off`: `open` refuses that mode.
    pub fn mode(&self) -> DwbMode {
        self.mode
    }
227
228    /// Write a WAL record to the double-write buffer before WAL append.
229    ///
230    /// The record is written at the current circular position and the file
231    /// is fsynced immediately. Use `write_record_deferred` + `flush` for
232    /// batch mode (multiple records per fsync).
233    pub fn write_record(&mut self, record: &WalRecord) -> Result<()> {
234        self.write_record_deferred(record)?;
235        self.flush()
236    }
237
238    /// Write a WAL record to the DWB without fsyncing.
239    ///
240    /// The data is written to the OS page cache (Buffered mode) or directly
241    /// to the block device (Direct mode) but not guaranteed durable until
242    /// `flush()` is called. Use this in batch mode: write all records in a
243    /// group commit batch, then call `flush()` once — reducing fsync calls
244    /// from N-per-batch to 1-per-batch.
245    pub fn write_record_deferred(&mut self, record: &WalRecord) -> Result<()> {
246        let total_size = HEADER_SIZE + record.payload.len();
247
248        // Max 64 KiB per slot — larger records skip the double-write buffer
249        // (they're multi-page and need different protection).
250        if total_size > DWB_SLOT_PAYLOAD_MAX {
251            return Ok(()); // Skip oversized records.
252        }
253
254        let header_bytes = record.header.to_bytes();
255        let offset = slot_offset(self.write_pos);
256
257        match self.mode {
258            DwbMode::Off => unreachable!("Off never opens a DoubleWriteBuffer"),
259            DwbMode::Buffered => {
260                self.file
261                    .seek(SeekFrom::Start(offset))
262                    .map_err(WalError::Io)?;
263                self.file
264                    .write_all(&(total_size as u32).to_le_bytes())
265                    .map_err(WalError::Io)?;
266                self.file.write_all(&header_bytes).map_err(WalError::Io)?;
267                self.file.write_all(&record.payload).map_err(WalError::Io)?;
268                DWB_BYTES_WRITTEN_TOTAL.fetch_add(
269                    (4 + header_bytes.len() + record.payload.len()) as u64,
270                    Ordering::Relaxed,
271                );
272            }
273            DwbMode::Direct => {
274                let buf = self
275                    .slot_buf
276                    .as_mut()
277                    .expect("slot_buf present in Direct mode");
278                buf.clear();
279                buf.write(&(total_size as u32).to_le_bytes());
280                buf.write(&header_bytes);
281                buf.write(&record.payload);
282                // Zero the tail so the full aligned slot can be written
283                // without leaking prior contents.
284                zero_tail(buf);
285                let slice = full_capacity_slice(buf);
286                debug_assert_eq!(slice.len(), DWB_SLOT_STRIDE);
287                debug_assert!(is_aligned(offset as usize, DEFAULT_ALIGNMENT));
288                pwrite_all(&self.file, slice, offset)?;
289                DWB_BYTES_WRITTEN_TOTAL.fetch_add(slice.len() as u64, Ordering::Relaxed);
290            }
291        }
292
293        self.write_pos = self.write_pos.wrapping_add(1);
294        self.count = self.count.saturating_add(1).min(DWB_CAPACITY as u32);
295        self.dirty = true;
296
297        Ok(())
298    }
299
300    /// Flush the DWB header and fsync the file.
301    ///
302    /// Must be called after one or more `write_record_deferred` calls to make
303    /// the records durable. The single fsync covers all deferred writes since
304    /// the last flush — amortizing the cost across the group commit batch.
305    pub fn flush(&mut self) -> Result<()> {
306        if !self.dirty {
307            return Ok(());
308        }
309
310        let mut header = [0u8; DWB_HEADER_FIELDS];
311        header[0..4].copy_from_slice(&DWB_MAGIC.to_le_bytes());
312        header[4..8].copy_from_slice(&self.count.to_le_bytes());
313        header[8..12].copy_from_slice(&self.write_pos.to_le_bytes());
314
315        match self.mode {
316            DwbMode::Off => unreachable!("invariant: flush() is gated on mode != Off by caller"),
317            DwbMode::Buffered => {
318                self.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
319                self.file.write_all(&header).map_err(WalError::Io)?;
320                DWB_BYTES_WRITTEN_TOTAL.fetch_add(header.len() as u64, Ordering::Relaxed);
321            }
322            DwbMode::Direct => {
323                let buf = self
324                    .header_buf
325                    .as_mut()
326                    .expect("header_buf present in Direct mode");
327                buf.clear();
328                buf.write(&header);
329                zero_tail(buf);
330                let slice = full_capacity_slice(buf);
331                debug_assert_eq!(slice.len(), DWB_HEADER_STRIDE);
332                pwrite_all(&self.file, slice, 0)?;
333                DWB_BYTES_WRITTEN_TOTAL.fetch_add(slice.len() as u64, Ordering::Relaxed);
334            }
335        }
336
337        self.file.sync_all().map_err(WalError::Io)?;
338        self.dirty = false;
339
340        Ok(())
341    }
342
    /// Path to the double-write buffer file.
    ///
    /// This is the path that was passed to `open`, stored as given.
    pub fn path(&self) -> &Path {
        &self.path
    }
347
    /// Try to recover a WAL record by LSN from the double-write buffer.
    ///
    /// Scans **all** DWB_CAPACITY slots for a record matching the given LSN
    /// with valid CRC. We scan every slot rather than relying on `count` or
    /// `write_pos` because the header itself may be stale or corrupted after
    /// a crash. Each slot is self-describing: the record's own CRC validates
    /// whether the slot contains usable data.
    ///
    /// Returns `Ok(Some(record))` for the first slot (in slot order) that
    /// holds an intact record with `target_lsn`, and `Ok(None)` when no slot
    /// qualifies. On `wasm32` the function always returns `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Only allocation of the aligned scratch buffer can fail. A slot that
    /// cannot be read (error, EOF or short read) is skipped, not reported.
    pub fn recover_record(&mut self, target_lsn: u64) -> Result<Option<WalRecord>> {
        #[cfg(target_arch = "wasm32")]
        {
            let _ = target_lsn;
            return Ok(None);
        }

        #[cfg(not(target_arch = "wasm32"))]
        {
            use std::os::unix::io::AsRawFd as _;

            // Under O_DIRECT, reads must also use aligned buffers and aligned
            // lengths. Read one full aligned slot at a time, then parse.
            let mut slot = AlignedBuf::new(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT)?;

            for i in 0..DWB_CAPACITY as u32 {
                let offset = slot_offset(i);
                // SAFETY: slot.as_mut_ptr is valid for `capacity()` bytes.
                let read = unsafe {
                    libc::pread(
                        self.file.as_raw_fd(),
                        slot.as_mut_ptr() as *mut libc::c_void,
                        DWB_SLOT_STRIDE,
                        offset as libc::off_t,
                    )
                };
                // A negative return (I/O error) and zero (slot lies past EOF)
                // are both treated as "nothing usable in this slot".
                if read <= 0 {
                    continue;
                }
                // SAFETY: the kernel populated `read` bytes starting at the buffer.
                let bytes: &[u8] =
                    unsafe { std::slice::from_raw_parts(slot.as_ptr(), read as usize) };
                // Too short to hold even the length prefix plus a header.
                if bytes.len() < 4 + HEADER_SIZE {
                    continue;
                }

                // Length prefix: header + payload size as written by the
                // write path. Reject implausible values and truncated slots.
                let mut arr4 = [0u8; 4];
                arr4.copy_from_slice(&bytes[0..4]);
                let total_size = u32::from_le_bytes(arr4) as usize;
                if !(HEADER_SIZE..=DWB_SLOT_PAYLOAD_MAX).contains(&total_size)
                    || bytes.len() < 4 + total_size
                {
                    continue;
                }

                // Cheap header checks first; the checksum is only verified
                // for slots that claim to hold the requested LSN.
                let mut header_buf = [0u8; HEADER_SIZE];
                header_buf.copy_from_slice(&bytes[4..4 + HEADER_SIZE]);
                let header = RecordHeader::from_bytes(&header_buf);
                if header.magic != WAL_MAGIC || header.lsn != target_lsn {
                    continue;
                }

                let payload_len = total_size - HEADER_SIZE;
                let payload = bytes[4 + HEADER_SIZE..4 + HEADER_SIZE + payload_len].to_vec();
                let record = WalRecord { header, payload };
                // A checksum failure means this copy is damaged too; keep
                // scanning the remaining slots.
                if record.verify_checksum().is_ok() {
                    return Ok(Some(record));
                }
            }

            Ok(None)
        }
    }
418}
419
420/// Fill the unwritten tail of `buf` with zero bytes so an O_DIRECT write of
421/// the entire aligned slot does not leak stale buffer contents to disk.
422fn zero_tail(buf: &mut AlignedBuf) {
423    let written = buf.len();
424    let cap = buf.capacity();
425    if written < cap {
426        // SAFETY: `as_mut_ptr` is valid for `capacity` bytes; we write only
427        // the uninitialized tail between `written..capacity`.
428        unsafe {
429            std::ptr::write_bytes(buf.as_mut_ptr().add(written), 0, cap - written);
430        }
431    }
432}
433
/// View the entire allocated capacity of `buf` as a byte slice. Requires
/// that the caller has zeroed any unwritten tail (see `zero_tail`).
///
/// The returned slice is `buf.capacity()` bytes long regardless of
/// `buf.len()`; it is what Direct mode hands to `pwrite` so the write length
/// stays block-aligned.
fn full_capacity_slice(buf: &AlignedBuf) -> &[u8] {
    // SAFETY: AlignedBuf guarantees `as_ptr` points to `capacity()` valid
    // bytes (alloc_zeroed) for the lifetime of the buffer.
    unsafe { std::slice::from_raw_parts(buf.as_ptr(), buf.capacity()) }
}
441
442/// `pwrite`-retry helper that handles short writes.
443fn pwrite_all(file: &File, data: &[u8], offset: u64) -> Result<()> {
444    #[cfg(not(target_arch = "wasm32"))]
445    {
446        use std::os::unix::io::AsRawFd as _;
447        let fd = file.as_raw_fd();
448        let mut remaining = data;
449        let mut write_offset = offset;
450        while !remaining.is_empty() {
451            let n = unsafe {
452                libc::pwrite(
453                    fd,
454                    remaining.as_ptr() as *const libc::c_void,
455                    remaining.len(),
456                    write_offset as libc::off_t,
457                )
458            };
459            if n < 0 {
460                return Err(WalError::Io(std::io::Error::last_os_error()));
461            }
462            let n = n as usize;
463            remaining = &remaining[n..];
464            write_offset += n as u64;
465        }
466        Ok(())
467    }
468    #[cfg(target_arch = "wasm32")]
469    {
470        let _ = (file, data, offset);
471        Err(WalError::Unsupported {
472            detail: "O_DIRECT pwrite not available on wasm32",
473        })
474    }
475}
476
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Open a Buffered-mode DWB at `path`, panicking on failure.
    fn open_buffered(path: &Path) -> DoubleWriteBuffer {
        DoubleWriteBuffer::open(path, DwbMode::Buffered).unwrap()
    }

    /// Build a `Put` record carrying `lsn` and `payload`; every other
    /// constructor argument uses the fixed values shared by all tests here.
    fn put_record(lsn: u64, payload: &[u8]) -> WalRecord {
        WalRecord::new(
            RecordType::Put as u32,
            lsn,
            1,
            0,
            0,
            payload.to_vec(),
            None,
            None,
        )
        .unwrap()
    }

    /// A record written with an immediate fsync can be read back by LSN.
    #[test]
    fn write_and_recover() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("test.dwb"));

        dwb.write_record(&put_record(42, b"hello double-write"))
            .unwrap();

        // Recover by LSN.
        let recovered = dwb.recover_record(42).unwrap();
        assert!(recovered.is_some());
        let rec = recovered.unwrap();
        assert_eq!(rec.header.lsn, 42);
        assert_eq!(rec.payload, b"hello double-write");
    }

    /// Looking up an LSN in an empty buffer yields `None`, not an error.
    #[test]
    fn recover_nonexistent_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("test2.dwb"));

        let result = dwb.recover_record(999).unwrap();
        assert!(result.is_none());
    }

    /// Records stay recoverable after the buffer is dropped and reopened.
    #[test]
    fn survives_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("reopen.dwb");

        {
            let mut dwb = open_buffered(&dwb_path);
            dwb.write_record(&put_record(7, b"durable")).unwrap();
        }

        let mut dwb = open_buffered(&dwb_path);
        let recovered = dwb.recover_record(7).unwrap();
        assert!(recovered.is_some());
        assert_eq!(recovered.unwrap().payload, b"durable");
    }

    /// Several deferred writes followed by one flush are all recoverable.
    #[test]
    fn batch_deferred_writes_and_flush() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("batch.dwb"));

        for lsn in 1..=5u64 {
            let record = put_record(lsn, format!("batch-{lsn}").as_bytes());
            dwb.write_record_deferred(&record).unwrap();
        }

        assert!(dwb.dirty);
        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        for lsn in 1..=5u64 {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(recovered.is_some(), "LSN {lsn} should be recoverable");
            assert_eq!(
                recovered.unwrap().payload,
                format!("batch-{lsn}").into_bytes()
            );
        }
    }

    /// Flushing with nothing pending, or twice in a row, succeeds and leaves
    /// the buffer clean.
    #[test]
    fn flush_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("idem.dwb"));

        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        dwb.write_record_deferred(&put_record(1, b"data")).unwrap();
        dwb.flush().unwrap();
        dwb.flush().unwrap();
        assert!(!dwb.dirty);
    }

    #[test]
    fn slot_stride_is_o_direct_aligned() {
        // The DWB slot stride must be a multiple of the WAL alignment
        // (4 KiB) so the file can be opened with O_DIRECT alongside an
        // O_DIRECT WAL. With a non-aligned stride, every slot after the
        // first lands at an unaligned offset and the kernel rejects the
        // write with -EINVAL.
        assert!(
            is_aligned(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT),
            "DWB slot stride {DWB_SLOT_STRIDE} bytes is not a multiple of {DEFAULT_ALIGNMENT}"
        );
        assert!(is_aligned(DWB_HEADER_STRIDE, DEFAULT_ALIGNMENT));
        for i in 0..DWB_CAPACITY as u32 {
            assert!(is_aligned(slot_offset(i) as usize, DEFAULT_ALIGNMENT));
        }
    }

    /// Writing more records than the ring holds keeps the newest ones and
    /// overwrites the oldest.
    #[test]
    fn recover_after_wraparound() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("wrap.dwb"));

        let total = DWB_CAPACITY as u64 + 5;
        for lsn in 1..=total {
            let record = put_record(lsn, format!("wrap-{lsn}").as_bytes());
            dwb.write_record_deferred(&record).unwrap();
        }
        dwb.flush().unwrap();

        // The five newest records reused the first five slots.
        for lsn in (total - 4)..=total {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(
                recovered.is_some(),
                "LSN {lsn} should be recoverable after wrap-around"
            );
            assert_eq!(
                recovered.unwrap().payload,
                format!("wrap-{lsn}").into_bytes()
            );
        }

        // ...which means the five oldest records are gone.
        for lsn in 1..=5u64 {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(
                recovered.is_none(),
                "LSN {lsn} should have been overwritten by wrap-around"
            );
        }
    }

    /// The process-wide byte counter grows when a record is written. The
    /// counter is shared with concurrently running tests, hence `>`.
    #[test]
    fn bytes_written_counter_increments() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("counter.dwb");
        let before = wal_dwb_bytes_written_total();

        let mut dwb = open_buffered(&dwb_path);
        dwb.write_record(&put_record(1, b"counted")).unwrap();

        assert!(wal_dwb_bytes_written_total() > before);
    }
}