1use std::fs::{File, OpenOptions};
41use std::io::{Read, Seek, SeekFrom, Write};
42use std::os::unix::fs::OpenOptionsExt;
43use std::os::unix::io::AsRawFd;
44use std::path::{Path, PathBuf};
45use std::sync::atomic::{AtomicU64, Ordering};
46
47use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT, is_aligned};
48use crate::error::{Result, WalError};
49use crate::record::{HEADER_SIZE, RecordHeader, WAL_MAGIC, WalRecord};
50
/// Number of record slots in the ring; `write_pos` rotates through them.
const DWB_CAPACITY: usize = 64;

/// Maximum payload bytes storable in a single slot.
const DWB_SLOT_PAYLOAD_MAX: usize = 64 * 1024;

/// Raw slot size: 4-byte length prefix + record header + max payload.
const DWB_SLOT_RAW: usize = 4 + HEADER_SIZE + DWB_SLOT_PAYLOAD_MAX;

/// On-disk stride between slots, rounded up so every slot offset stays
/// aligned for O_DIRECT I/O.
const DWB_SLOT_STRIDE: usize = round_up_const(DWB_SLOT_RAW, DEFAULT_ALIGNMENT);

/// On-disk space reserved for the file header block (one alignment unit).
const DWB_HEADER_STRIDE: usize = DEFAULT_ALIGNMENT;
/// Header bytes actually used: magic (4) + count (4) + write_pos (4).
const DWB_HEADER_FIELDS: usize = 12;
/// File magic, "DWBF" in ASCII.
const DWB_MAGIC: u32 = 0x4457_4246;
/// Process-wide count of bytes written through any double-write buffer.
static DWB_BYTES_WRITTEN_TOTAL: AtomicU64 = AtomicU64::new(0);
82
83pub fn wal_dwb_bytes_written_total() -> u64 {
85 DWB_BYTES_WRITTEN_TOTAL.load(Ordering::Relaxed)
86}
87
/// Operating mode for the double-write buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DwbMode {
    /// Double-write buffering disabled; `DoubleWriteBuffer::open` rejects
    /// this mode with `WalError::DwbOffNotOpenable`.
    Off,
    /// Regular (page-cache) file I/O.
    Buffered,
    /// O_DIRECT file I/O; all reads/writes use aligned scratch buffers.
    Direct,
}
101
102impl DwbMode {
103 pub fn default_for_parent(parent_uses_direct_io: bool) -> Self {
107 if parent_uses_direct_io {
108 Self::Direct
109 } else {
110 Self::Buffered
111 }
112 }
113}
114
/// Rounds `value` up to the next multiple of `align`.
///
/// `align` must be a power of two (true for `DEFAULT_ALIGNMENT`); the
/// mask trick below is only valid under that precondition.
const fn round_up_const(value: usize, align: usize) -> usize {
    let mask = align - 1;
    (value + mask) & !mask
}
118
119pub const fn slot_stride() -> usize {
122 DWB_SLOT_STRIDE
123}
124
125fn slot_offset(idx: u32) -> u64 {
127 DWB_HEADER_STRIDE as u64 + (idx as u64 % DWB_CAPACITY as u64) * DWB_SLOT_STRIDE as u64
128}
129
/// A fixed-capacity ring of WAL record slots backed by its own file.
///
/// Records are copied here (with a length prefix and their header) so that
/// `recover_record` can later retrieve a checksum-verified copy by LSN —
/// presumably as torn-write protection for the primary WAL; confirm against
/// the caller.
pub struct DoubleWriteBuffer {
    // Backing file: one header block followed by DWB_CAPACITY slots.
    file: File,
    // Location of the backing file (kept for Debug/diagnostics).
    path: PathBuf,
    // Buffered or Direct; never Off for an open buffer.
    mode: DwbMode,
    // Next slot index; increments without bound, reduced mod capacity.
    write_pos: u32,
    // Number of records written, saturating at DWB_CAPACITY.
    count: u32,
    // True when slot writes happened since the last header flush.
    dirty: bool,
    // Aligned scratch buffer for slot writes (Some only in Direct mode).
    slot_buf: Option<AlignedBuf>,
    // Aligned scratch buffer for header writes (Some only in Direct mode).
    header_buf: Option<AlignedBuf>,
}
147
// Manual Debug impl: the file handle and the aligned scratch buffers carry
// no useful debug information and are intentionally omitted.
impl std::fmt::Debug for DoubleWriteBuffer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DoubleWriteBuffer")
            .field("path", &self.path)
            .field("mode", &self.mode)
            .field("write_pos", &self.write_pos)
            .field("count", &self.count)
            .finish()
    }
}
158
159impl DoubleWriteBuffer {
160 pub fn open(path: &Path, mode: DwbMode) -> Result<Self> {
165 if mode == DwbMode::Off {
166 return Err(WalError::DwbOffNotOpenable);
167 }
168
169 let mut opts = OpenOptions::new();
170 opts.read(true).write(true).create(true).truncate(false);
171 if mode == DwbMode::Direct {
172 opts.custom_flags(libc::O_DIRECT);
173 }
174
175 let file = opts.open(path).map_err(|e| {
176 tracing::warn!(path = %path.display(), error = %e, mode = ?mode, "failed to open double-write buffer");
177 WalError::Io(e)
178 })?;
179
180 let (slot_buf, header_buf) = if mode == DwbMode::Direct {
181 (
182 Some(AlignedBuf::new(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT)?),
183 Some(AlignedBuf::new(DWB_HEADER_STRIDE, DEFAULT_ALIGNMENT)?),
184 )
185 } else {
186 (None, None)
187 };
188
189 let mut dwb = Self {
190 file,
191 path: path.to_path_buf(),
192 mode,
193 write_pos: 0,
194 count: 0,
195 dirty: false,
196 slot_buf,
197 header_buf,
198 };
199
200 let file_len = dwb.file.metadata().map(|m| m.len()).unwrap_or(0);
202 if file_len >= DWB_HEADER_STRIDE as u64 {
203 let mut block = vec![0u8; DWB_HEADER_STRIDE];
204 dwb.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
205 if dwb.file.read_exact(&mut block).is_ok() {
206 let mut arr4 = [0u8; 4];
207 arr4.copy_from_slice(&block[0..4]);
208 let magic = u32::from_le_bytes(arr4);
209 if magic == DWB_MAGIC {
210 arr4.copy_from_slice(&block[4..8]);
211 dwb.count = u32::from_le_bytes(arr4);
212 arr4.copy_from_slice(&block[8..12]);
213 dwb.write_pos = u32::from_le_bytes(arr4);
214 }
215 }
216 }
217
218 Ok(dwb)
219 }
220
221 pub fn mode(&self) -> DwbMode {
223 self.mode
224 }
225
226 pub fn write_record(&mut self, record: &WalRecord) -> Result<()> {
232 self.write_record_deferred(record)?;
233 self.flush()
234 }
235
236 pub fn write_record_deferred(&mut self, record: &WalRecord) -> Result<()> {
244 let total_size = HEADER_SIZE + record.payload.len();
245
246 if total_size > DWB_SLOT_PAYLOAD_MAX {
249 return Ok(()); }
251
252 let header_bytes = record.header.to_bytes();
253 let offset = slot_offset(self.write_pos);
254
255 match self.mode {
256 DwbMode::Off => unreachable!("Off never opens a DoubleWriteBuffer"),
257 DwbMode::Buffered => {
258 self.file
259 .seek(SeekFrom::Start(offset))
260 .map_err(WalError::Io)?;
261 self.file
262 .write_all(&(total_size as u32).to_le_bytes())
263 .map_err(WalError::Io)?;
264 self.file.write_all(&header_bytes).map_err(WalError::Io)?;
265 self.file.write_all(&record.payload).map_err(WalError::Io)?;
266 DWB_BYTES_WRITTEN_TOTAL.fetch_add(
267 (4 + header_bytes.len() + record.payload.len()) as u64,
268 Ordering::Relaxed,
269 );
270 }
271 DwbMode::Direct => {
272 let buf = self
273 .slot_buf
274 .as_mut()
275 .expect("slot_buf present in Direct mode");
276 buf.clear();
277 buf.write(&(total_size as u32).to_le_bytes());
278 buf.write(&header_bytes);
279 buf.write(&record.payload);
280 zero_tail(buf);
283 let slice = full_capacity_slice(buf);
284 debug_assert_eq!(slice.len(), DWB_SLOT_STRIDE);
285 debug_assert!(is_aligned(offset as usize, DEFAULT_ALIGNMENT));
286 pwrite_all(&self.file, slice, offset)?;
287 DWB_BYTES_WRITTEN_TOTAL.fetch_add(slice.len() as u64, Ordering::Relaxed);
288 }
289 }
290
291 self.write_pos = self.write_pos.wrapping_add(1);
292 self.count = self.count.saturating_add(1).min(DWB_CAPACITY as u32);
293 self.dirty = true;
294
295 Ok(())
296 }
297
298 pub fn flush(&mut self) -> Result<()> {
304 if !self.dirty {
305 return Ok(());
306 }
307
308 let mut header = [0u8; DWB_HEADER_FIELDS];
309 header[0..4].copy_from_slice(&DWB_MAGIC.to_le_bytes());
310 header[4..8].copy_from_slice(&self.count.to_le_bytes());
311 header[8..12].copy_from_slice(&self.write_pos.to_le_bytes());
312
313 match self.mode {
314 DwbMode::Off => unreachable!("invariant: flush() is gated on mode != Off by caller"),
315 DwbMode::Buffered => {
316 self.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
317 self.file.write_all(&header).map_err(WalError::Io)?;
318 DWB_BYTES_WRITTEN_TOTAL.fetch_add(header.len() as u64, Ordering::Relaxed);
319 }
320 DwbMode::Direct => {
321 let buf = self
322 .header_buf
323 .as_mut()
324 .expect("header_buf present in Direct mode");
325 buf.clear();
326 buf.write(&header);
327 zero_tail(buf);
328 let slice = full_capacity_slice(buf);
329 debug_assert_eq!(slice.len(), DWB_HEADER_STRIDE);
330 pwrite_all(&self.file, slice, 0)?;
331 DWB_BYTES_WRITTEN_TOTAL.fetch_add(slice.len() as u64, Ordering::Relaxed);
332 }
333 }
334
335 self.file.sync_all().map_err(WalError::Io)?;
336 self.dirty = false;
337
338 Ok(())
339 }
340
341 pub fn path(&self) -> &Path {
343 &self.path
344 }
345
346 pub fn recover_record(&mut self, target_lsn: u64) -> Result<Option<WalRecord>> {
354 let mut slot = AlignedBuf::new(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT)?;
357
358 for i in 0..DWB_CAPACITY as u32 {
359 let offset = slot_offset(i);
360 let read = unsafe {
362 libc::pread(
363 self.file.as_raw_fd(),
364 slot.as_mut_ptr() as *mut libc::c_void,
365 DWB_SLOT_STRIDE,
366 offset as libc::off_t,
367 )
368 };
369 if read <= 0 {
370 continue;
371 }
372 let bytes: &[u8] = unsafe { std::slice::from_raw_parts(slot.as_ptr(), read as usize) };
374 if bytes.len() < 4 + HEADER_SIZE {
375 continue;
376 }
377
378 let mut arr4 = [0u8; 4];
379 arr4.copy_from_slice(&bytes[0..4]);
380 let total_size = u32::from_le_bytes(arr4) as usize;
381 if !(HEADER_SIZE..=DWB_SLOT_PAYLOAD_MAX).contains(&total_size)
382 || bytes.len() < 4 + total_size
383 {
384 continue;
385 }
386
387 let mut header_buf = [0u8; HEADER_SIZE];
388 header_buf.copy_from_slice(&bytes[4..4 + HEADER_SIZE]);
389 let header = RecordHeader::from_bytes(&header_buf);
390 if header.magic != WAL_MAGIC || header.lsn != target_lsn {
391 continue;
392 }
393
394 let payload_len = total_size - HEADER_SIZE;
395 let payload = bytes[4 + HEADER_SIZE..4 + HEADER_SIZE + payload_len].to_vec();
396 let record = WalRecord { header, payload };
397 if record.verify_checksum().is_ok() {
398 return Ok(Some(record));
399 }
400 }
401
402 Ok(None)
403 }
404}
405
/// Zero-fills the unwritten tail of `buf` (bytes `len()..capacity()`) so the
/// full capacity can be written to disk without leaking stale memory.
fn zero_tail(buf: &mut AlignedBuf) {
    let written = buf.len();
    let cap = buf.capacity();
    if written < cap {
        // SAFETY: writes stay within [written, cap). Assumes AlignedBuf
        // allocates its full `capacity()` up front — consistent with how
        // full_capacity_slice reads it here; confirm against AlignedBuf's
        // contract.
        unsafe {
            std::ptr::write_bytes(buf.as_mut_ptr().add(written), 0, cap - written);
        }
    }
}
419
/// Returns a view over `buf`'s entire capacity (not just the written prefix),
/// for writing whole aligned strides to an O_DIRECT file.
//
// NOTE(review): callers must have initialized the full capacity first (they
// call `zero_tail` before this); reading uninitialized tail bytes would be UB.
fn full_capacity_slice(buf: &AlignedBuf) -> &[u8] {
    // SAFETY: the allocation spans `capacity()` bytes and the tail has been
    // zeroed by `zero_tail` at every call site in this module.
    unsafe { std::slice::from_raw_parts(buf.as_ptr(), buf.capacity()) }
}
427
428fn pwrite_all(file: &File, mut data: &[u8], mut offset: u64) -> Result<()> {
430 let fd = file.as_raw_fd();
431 while !data.is_empty() {
432 let n = unsafe {
433 libc::pwrite(
434 fd,
435 data.as_ptr() as *const libc::c_void,
436 data.len(),
437 offset as libc::off_t,
438 )
439 };
440 if n < 0 {
441 return Err(WalError::Io(std::io::Error::last_os_error()));
442 }
443 let n = n as usize;
444 data = &data[n..];
445 offset += n as u64;
446 }
447 Ok(())
448}
449
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    // Helper: open a buffered-mode DWB at `path`, panicking on failure.
    fn open_buffered(path: &Path) -> DoubleWriteBuffer {
        DoubleWriteBuffer::open(path, DwbMode::Buffered).unwrap()
    }

    // A record written through the synchronous path is recoverable by LSN
    // with its header and payload intact.
    #[test]
    fn write_and_recover() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("test.dwb");

        let mut dwb = open_buffered(&dwb_path);

        let record = WalRecord::new(
            RecordType::Put as u32,
            42,
            1,
            0,
            0,
            b"hello double-write".to_vec(),
            None,
            None,
        )
        .unwrap();

        dwb.write_record(&record).unwrap();

        let recovered = dwb.recover_record(42).unwrap();
        assert!(recovered.is_some());
        let rec = recovered.unwrap();
        assert_eq!(rec.header.lsn, 42);
        assert_eq!(rec.payload, b"hello double-write");
    }

    // Looking up an LSN that was never written yields Ok(None), not an error.
    #[test]
    fn recover_nonexistent_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("test2.dwb");

        let mut dwb = open_buffered(&dwb_path);
        let result = dwb.recover_record(999).unwrap();
        assert!(result.is_none());
    }

    // A flushed record survives dropping and reopening the buffer file.
    #[test]
    fn survives_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("reopen.dwb");

        {
            let mut dwb = open_buffered(&dwb_path);
            let record = WalRecord::new(
                RecordType::Put as u32,
                7,
                1,
                0,
                0,
                b"durable".to_vec(),
                None,
                None,
            )
            .unwrap();
            dwb.write_record(&record).unwrap();
        }

        let mut dwb = open_buffered(&dwb_path);
        let recovered = dwb.recover_record(7).unwrap();
        assert!(recovered.is_some());
        assert_eq!(recovered.unwrap().payload, b"durable");
    }

    // Deferred writes leave the buffer dirty until flush(), and all batched
    // records are recoverable afterwards.
    #[test]
    fn batch_deferred_writes_and_flush() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("batch.dwb");

        let mut dwb = open_buffered(&dwb_path);

        for lsn in 1..=5u64 {
            let record = WalRecord::new(
                RecordType::Put as u32,
                lsn,
                1,
                0,
                0,
                format!("batch-{lsn}").into_bytes(),
                None,
                None,
            )
            .unwrap();
            dwb.write_record_deferred(&record).unwrap();
        }

        assert!(dwb.dirty);
        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        for lsn in 1..=5u64 {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(recovered.is_some(), "LSN {lsn} should be recoverable");
            assert_eq!(
                recovered.unwrap().payload,
                format!("batch-{lsn}").into_bytes()
            );
        }
    }

    // flush() is a no-op when clean and safe to call repeatedly.
    #[test]
    fn flush_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("idem.dwb");

        let mut dwb = open_buffered(&dwb_path);

        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        let record = WalRecord::new(
            RecordType::Put as u32,
            1,
            1,
            0,
            0,
            b"data".to_vec(),
            None,
            None,
        )
        .unwrap();
        dwb.write_record_deferred(&record).unwrap();
        dwb.flush().unwrap();
        dwb.flush().unwrap();
        assert!(!dwb.dirty);
    }

    // Strides and every slot offset must satisfy the O_DIRECT alignment
    // invariant the Direct-mode write path debug_asserts on.
    #[test]
    fn slot_stride_is_o_direct_aligned() {
        assert!(
            is_aligned(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT),
            "DWB slot stride {DWB_SLOT_STRIDE} bytes is not a multiple of {DEFAULT_ALIGNMENT}"
        );
        assert!(is_aligned(DWB_HEADER_STRIDE, DEFAULT_ALIGNMENT));
        for i in 0..DWB_CAPACITY as u32 {
            assert!(is_aligned(slot_offset(i) as usize, DEFAULT_ALIGNMENT));
        }
    }

    // Writing more than DWB_CAPACITY records wraps the ring: the newest
    // records remain recoverable, the oldest are overwritten.
    #[test]
    fn recover_after_wraparound() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("wrap.dwb");

        let mut dwb = open_buffered(&dwb_path);

        let total = DWB_CAPACITY as u64 + 5;
        for lsn in 1..=total {
            let record = WalRecord::new(
                RecordType::Put as u32,
                lsn,
                1,
                0,
                0,
                format!("wrap-{lsn}").into_bytes(),
                None,
                None,
            )
            .unwrap();
            dwb.write_record_deferred(&record).unwrap();
        }
        dwb.flush().unwrap();

        for lsn in (total - 4)..=total {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(
                recovered.is_some(),
                "LSN {lsn} should be recoverable after wrap-around"
            );
            assert_eq!(
                recovered.unwrap().payload,
                format!("wrap-{lsn}").into_bytes()
            );
        }

        for lsn in 1..=5u64 {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(
                recovered.is_none(),
                "LSN {lsn} should have been overwritten by wrap-around"
            );
        }
    }

    // The process-wide byte counter grows after a write. Only asserts `>`
    // because the global counter is shared with other tests.
    #[test]
    fn bytes_written_counter_increments() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("counter.dwb");
        let before = wal_dwb_bytes_written_total();

        let mut dwb = open_buffered(&dwb_path);
        let rec = WalRecord::new(
            RecordType::Put as u32,
            1,
            1,
            0,
            0,
            b"counted".to_vec(),
            None,
            None,
        )
        .unwrap();
        dwb.write_record(&rec).unwrap();

        assert!(wal_dwb_bytes_written_total() > before);
    }
}